单服务操作多数据库情况
多服务操作单数据库情况
多服务操作多数据库,服务a和服务b之间相互调用情况
优点:
缺点:
三个阶段的具体内容:
所有参与者成功反馈
最终一致性
RocketMQ是一种最终一致性的分布式事务
Half Message(半消息)
是指暂不能被Consumer消费的消息暂不能投递
二次确认
消息回查
长期处于半消息的消息
Commit或者Rollback 消息回查
流程说明:
实战场景:
<!--开启web服务-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--jdbc-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jdbc</artifactId>
</dependency>
<!--mysql驱动连接-->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.0</version>
</dependency>
<!--工具包-->
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.7.22</version>
</dependency>
@Service
public class OrderServiceImpl implements OrderService {
@Autowired
JdbcTemplate jdbcTemplate;
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Override
@Transactional
public Integer createOrder(OrderDto orderDto) {
String sql = "insert into t_order(order_id,shop_id) values(?,?)";
int result = jdbcTemplate.update(sql, new Object[]{orderDto.getOrderId(), orderDto.getShopId()});
return result;
}
@Override
public String createOrderId(Integer id) {
//生成订单id
String order_id = UUID.randomUUID().toString();
//通过商品信息生成订单信息
OrderDto orderDto = new OrderDto(order_id, id);
//组织生成mq消息
MessageBuilder<String> tMessageBuilder = MessageBuilder.withPayload(JSONUtil.toJsonStr(orderDto));
Message message = tMessageBuilder.build();
//该消息不会被消费
rocketMQTemplate.sendMessageInTransaction("springboot-producer-group", message, null);
return order_id;
}
@Override
public Integer queryOrder(String orderId) {
String sql = "select count(1) from t_order where order_id=?";
//根据订单查询是否存在订单信息
Integer result = jdbcTemplate.queryForObject(sql, Integer.class, orderId);
return result;
}
}
@Component
@RocketMQTransactionListener
public class OrderProducerListener implements RocketMQLocalTransactionListener {
private static final Logger logger = LoggerFactory.getLogger(OrderProducerListener.class);
@Autowired
OrderService orderService;
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
logger.info("开始执行本地事务{}", JSONUtil.toJsonStr(message));
//根据不同的header信息分发到不同的业务场景中
//MessageHeaders messageHeaders= message.getHeaders();
//messageHeaders.get("");
String msgJson = new String((byte[])message.getPayload());
OrderDto orderDto=JSONUtil.toBean(msgJson,OrderDto.class);
if (orderDto == null) {
logger.info("订单数据为空事务状态设置为ROLLBACK。。。");
return RocketMQLocalTransactionState.ROLLBACK;
}
try {
//执行生成订单业务
Integer result = orderService.createOrder(orderDto);
if (result > 0) {
//数据执行成功则确认消息
logger.info("插入订单数据成功{}", result);
//模拟发生异常
//int i=1/0;
return RocketMQLocalTransactionState.COMMIT;
} else {
//数据执行失败则回滚消息
logger.info("插入订单数据失败{}", result);
return RocketMQLocalTransactionState.ROLLBACK;
}
} catch (Exception e) {
//发生异常进行回滚
logger.error("执行本地事务发生异常:{}", e);
return RocketMQLocalTransactionState.UNKNOWN;
}
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
logger.info("开始回查本地事务{}", JSONUtil.toJsonStr(message));
try {
String msgJson = new String((byte[])message.getPayload());
//TODO
if (ObjectUtil.isNotNull(msgJson)) {
OrderDto orderDto=JSONUtil.toBean(msgJson,OrderDto.class);
//回查时检查是否生成订单数据
Integer result = orderService.queryOrder(orderDto.getOrderId());
if (result > 0) {
logger.info("数据已插入{}", result);
return RocketMQLocalTransactionState.COMMIT;
} else {
//如果没有数据可以重新提交或者进行其他处理
return RocketMQLocalTransactionState.ROLLBACK;
}
}
}catch (Exception e){
logger.error("检查本地事务发生异常:{}",e);
return RocketMQLocalTransactionState.ROLLBACK;
}
logger.info("结束本地事务状态查询:{}", JSONUtil.toJsonStr(message));
return RocketMQLocalTransactionState.UNKNOWN;
}
}
@Service
public class DistributionServiceImpl implements DistributionService {
private final List<Integer> distributorList = new ArrayList<Integer>(){
{
//模拟配送员id
add(1);
add(2);
add(3);
}
};
@Autowired
JdbcTemplate jdbcTemplate;
@Override
@Transactional
public Integer distribution(String orderId) {
String sql="insert into order_distribution (order_id,distributor) values (?,?)";
//随机获取配送员id
Integer distributor=distributorList.get(RandomUtil.randomInt(0,distributorList.size()));
//插入数据
int updateResult=jdbcTemplate.update(sql,new Object[]{orderId,distributor});
return updateResult;
}
}
@RocketMQMessageListener(topic = "springboot-producer-group", consumerGroup = "spring_boot_distribution", selectorExpression = "*")
@Service
public class DistributionLisTener implements RocketMQListener<String>, RocketMQPushConsumerLifecycleListener {
private static final Logger logger = LoggerFactory.getLogger(DistributionLisTener.class);
@Autowired
DistributionService distributionService;
@Override
public void onMessage(String o) {
logger.info("开始消费消息:{}", o);
Map<String, Object> msgMap = JSONUtil.toBean(o, Map.class);
//模拟异常
int i = 1 / 0;
//执行分配业务
distributionService.distribution((String) msgMap.get("orderId"));
}
@Override
public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
// 设置最大重试次数
defaultMQPushConsumer.setMaxReconsumeTimes(2);
}
}
3.7 执行流程
订单服务:
派送服务:
为什么要先发送Half Message(半消息)
什么情况会回查
为什么说MQ是最终一致性事务
没有回复内容