ソースを参照

完善细小功能,增加临时速率升级记录 用于后续处理计算错误的问题

xudm 2 ヶ月 前
コミット
dd480a5087

+ 0 - 1
src/main/java/com/xs/core/XsTgGameApplication.java

@@ -2,7 +2,6 @@ package com.xs.core;
 
 import org.mybatis.spring.annotation.MapperScan;
 import org.springframework.boot.SpringApplication;
-import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
 
 @SpringBootApplication

+ 8 - 0
src/main/java/com/xs/core/model/coin/entity/BoostTemporaryRateRecord.java

@@ -5,8 +5,10 @@ import com.baomidou.mybatisplus.annotation.TableField;
 import com.baomidou.mybatisplus.annotation.TableId;
 import com.baomidou.mybatisplus.annotation.TableName;
 import com.baomidou.mybatisplus.extension.activerecord.Model;
+
 import java.io.Serializable;
 import java.time.LocalDateTime;
+
 import lombok.Getter;
 import lombok.Setter;
 
@@ -35,6 +37,12 @@ public class BoostTemporaryRateRecord extends Model<BoostTemporaryRateRecord> {
     @TableField("user_id")
     private Long userId;
 
+    /**
+     * 批次id
+     */
+    @TableField("batch_id")
+    private String batchId;
+
     /**
      * 升级时间戳
      */

+ 2 - 2
src/main/java/com/xs/core/mq/consumer/AirdropMessageConsumer.java

@@ -30,7 +30,7 @@ public class AirdropMessageConsumer {
     @Autowired
     private IUserWalletService walletService;
 
-    @RabbitListener(queues = RabbitMQConfig.AIRDROP_QUEUE, concurrency = "10")
+    @RabbitListener(queues = RabbitMQConfig.AIRDROP_QUEUE)
     public void handleComplete(Message m, Channel channel) throws IOException {
         String msg = new String(m.getBody());
         AirdropManagerMessage airdropManagerMessage = JSON.parseObject(msg, AirdropManagerMessage.class);
@@ -72,10 +72,10 @@ public class AirdropMessageConsumer {
                     walletService.coinTransaction(CoinTransactionTypeEnum.SUBTRACT, CoinTransactionCategoryEnum.RESOURCE_PLACEMENT, "空投减少金币", "空投减少金币,用户id:" + airdropManagerMessage.getTargetUser() + ",用户空投金币不足,实际减去金额为" + reduce.toPlainString(), reduce, airdropManagerMessage.getTargetUser());
                 }
             }
+            channel.basicAck(m.getMessageProperties().getDeliveryTag(), false);
         } catch (Exception e) {
             log.error("空投消息处理失败,消息内容:{}", msg, e);
             channel.basicNack(m.getMessageProperties().getDeliveryTag(), false, true);
         }
-        channel.basicAck(m.getMessageProperties().getDeliveryTag(), false);
     }
 }

+ 7 - 8
src/main/java/com/xs/core/mq/consumer/GoldCoinProductMessageConsumer.java

@@ -62,7 +62,6 @@ public class GoldCoinProductMessageConsumer {
                         BeanUtils.copyProperties(coinProdState, newMessage);
                         //重新设置消息id
                         newMessage.setMsgId(IdWorker.getIdStr());
-
                     } else {
                         //发送金币结算消息
                         newMessage = new CoinProducerMessage();
@@ -73,20 +72,20 @@ public class GoldCoinProductMessageConsumer {
                     }
                 }
             }
+            //如过没有发生异常才发送下一次延迟计算消息
+            if (completed) {
+                goldCoinProducer.sendCalculationCompleteMessage(newMessage);
+            } else {
+                goldCoinProducer.sendDelayCalculationMessage(newMessage);
+            }
             channel.basicAck(m.getMessageProperties().getDeliveryTag(), false);
         } catch (Exception e) {
-            log.error("Failed to handle calculation for user: {}", message.getUserId(), e);
+            log.error("Failed to handle calculation for user: {}", message, e);
             //删除消息的处理状态
             deleteMessageProcessed(message.getMsgId());
             // 拒绝消息 并重新入队
             channel.basicNack(m.getMessageProperties().getDeliveryTag(), false, true);
         }
-        //如过没有发生异常才发送下一次延迟计算消息
-        if (completed) {
-            goldCoinProducer.sendCalculationCompleteMessage(newMessage);
-        } else {
-            goldCoinProducer.sendDelayCalculationMessage(newMessage);
-        }
     }
 
     // 使用Redis做幂等性检查

+ 15 - 0
src/main/java/com/xs/core/mq/producer/ConfirmCallbackService.java

@@ -0,0 +1,15 @@
+package com.xs.core.mq.producer;
+
+import org.springframework.amqp.rabbit.connection.CorrelationData;
+import org.springframework.amqp.rabbit.core.RabbitTemplate;
+
+public class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback {
+    @Override
+    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
+        if (ack) {
+            System.out.println("消息发送成功");
+        } else {
+            System.out.println("消息发送失败");
+        }
+    }
+}

+ 4 - 3
src/main/java/com/xs/core/mq/producer/GoldCoinMessageProducer.java

@@ -8,6 +8,7 @@ import com.xs.core.model.coin.msg.RetryableMessage;
 import com.xs.core.model.coin.msg.TeamShareMessage;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.amqp.core.MessageDeliveryMode;
+import org.springframework.amqp.rabbit.connection.CorrelationData;
 import org.springframework.amqp.rabbit.core.RabbitTemplate;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
@@ -35,7 +36,7 @@ public class GoldCoinMessageProducer {
                 //延时10秒
                 msg.getMessageProperties().setHeader("x-delay", 10 * 1000L);
                 return msg;
-            });
+            }, new CorrelationData(coinProducerMessage.getMsgId()));
         });
     }
 
@@ -46,7 +47,7 @@ public class GoldCoinMessageProducer {
      */
     public void sendCalculationCompleteMessage(CoinProducerMessage coinProducerMessage) {
         sendMessageWithRetry(coinProducerMessage, MAX_RETRY_COUNT, m -> {
-            rabbitTemplate.convertAndSend(RabbitMQConfig.GOLD_COIN_COMPLETE_EXCHANGE, RabbitMQConfig.GOLD_COIN_COMPLETE_QUEUE, JSON.toJSONString(m));
+            rabbitTemplate.convertAndSend(RabbitMQConfig.GOLD_COIN_COMPLETE_EXCHANGE, RabbitMQConfig.GOLD_COIN_COMPLETE_QUEUE, JSON.toJSONString(m),new CorrelationData(coinProducerMessage.getMsgId()));
             log.info("发送计算结束的消息: {}", JSON.toJSONString(m));
         });
     }
@@ -61,7 +62,7 @@ public class GoldCoinMessageProducer {
             //延时5秒
             msg.getMessageProperties().setHeader("x-delay", 5 * 1000L);
             return msg;
-        }));
+        },new CorrelationData(teamShareMessage.getMsgId())));
     }
 
     /**

+ 3 - 0
src/main/java/com/xs/core/service/Impl/coin/GoldCoinServiceImpl.java

@@ -273,6 +273,7 @@ public class GoldCoinServiceImpl implements GoldCoinService {
         //获取用户的金币产出状态
         GoldCoinProdState goldCoinProdState = stateService.getGoldCoinProdStateByUser(context.getId());
         CheckUtils.throwIf(Boolean.FALSE.equals(goldCoinProdState.getRunning()), "error.gold.product.not.start");
+        String batchId = goldCoinProdState.getBatchId();
         //获取用户的临时速率
         String rateTemporarilyKey = GoldCoinConstant.GOLD_COIN_RATE_UPGRADES_KEY + context.getId();
         String goldCoinStateKey = GoldCoinConstant.GOLD_COIN_STATE_KEY + context.getId();
@@ -313,6 +314,8 @@ public class GoldCoinServiceImpl implements GoldCoinService {
             record.setUserId(context.getId());
             record.setTemporaryRate(rateNum.toPlainString());
             record.setBoostTime(LocalDateTime.now());
+            record.setBoostTimestamp(epochSecond);
+            record.setBatchId(batchId);
             boostTemporaryRateRecordService.save(record);
         });
     }

+ 6 - 12
src/main/resources/mapper/BoostTemporaryRateRecordMapper.xml

@@ -1,19 +1,13 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
 <mapper namespace="com.xs.core.mapper.coin.BoostTemporaryRateRecordMapper">
-
     <!-- 通用查询映射结果 -->
     <resultMap id="BaseResultMap" type="com.xs.core.model.coin.entity.BoostTemporaryRateRecord">
-        <id column="id" property="id" />
-        <result column="user_id" property="userId" />
-        <result column="boost_timestamp" property="boostTimestamp" />
-        <result column="boost_time" property="boostTime" />
-        <result column="temporary_rate" property="temporaryRate" />
+        <id column="id" property="id"/>
+        <result column="user_id" property="userId"/>
+        <result column="batch_id" property="batchId"/>
+        <result column="boost_timestamp" property="boostTimestamp"/>
+        <result column="boost_time" property="boostTime"/>
+        <result column="temporary_rate" property="temporaryRate"/>
     </resultMap>
-
-    <!-- 通用查询结果列 -->
-    <sql id="Base_Column_List">
-        id, user_id, boost_timestamp, boost_time, temporary_rate
-    </sql>
-
 </mapper>