微服务有其自身的优点和缺点,其中一个缺点是管理分布式事务。如何确保您的事务在所有 4 个任务都成功的情况下成功提交,或者如果任何任务未完成(已完成的任务回滚),则成功失败?Spring Boot 提供了 @Transactional 来管理事务,但这仅适用于单个方法和单个项目。
有一种设计模式可以解决微服务中分布式事务的问题。它最初是由计算机科学家 Hector Gracia Molina 和 Kenneth Salem 提出的,如他们的论文中所建议的,他们为长期事务 (Long Lived Transactions,LLT) 而不是微服务创建了这个系统,但它对微服务非常有用。
论文链接:https://www.cs.cornell.edu/andru/cs711/2002fa/reading/sagas.pdf
长期事务是需要较长时间的事务,可能是几分钟、几小时甚至几天。在此类事务中的所有任务完成之前,您无法锁定数据库,因为它会严重影响应用程序的性能。因此,他们提出了 SAGA 设计模式(命名为 SAGA,可能是因为他们创建它是为了处理长事务 — SAGA 的意思是一个很长的故事)。
SAGA模式
如果您的事务包含 4 个任务,您可以为每个任务(最后一个任务除外)创建一个补偿任务。因此,如果任何任务失败,则运行先前任务的补偿任务以回滚原始效果。
所以如果有四个任务 T1、T2、T3 和 T4,那么你就有三个对应的补偿任务 C1、C2、C3。
假如 T1 和 T2 成功,而 T3 失败,则运行 C2 和 C1 以按后进先出顺序回滚 T1 和 T2 的效果。所以事务内执行顺序是这样的:T1 -> T2 -> T3 (失败) -> C2 -> C1。
您不需要为最后一个任务设置补偿任务,因为如果最后一个任务失败,您只需回滚之前的任务。这称为 Backward Recovery,因为您返回并执行已完成成功任务的补偿任务。如果您的业务使用案例需要,您还可以通过重试 T3 和 T4 来尝试正向恢复。不过,向后恢复更常见。
这就是 SAGA 模式,它可以通过两种方式实现:Choreography 和O rchestration
Choreography 编排:编排意味着任务独立执行。完成一个任务后,它会调用序列中的下一个任务。如果下一个任务失败,则它会为前一个任务调用补偿任务。编排意味着任务由另一个父任务调用。它扮演着编排器的角色。它按顺序调用每个任务,并根据它们的响应决定是调用下一个任务还是补偿任务。在此示例中,让我们使用 Choreography 实现 SAGA。与 Orchestration 相比,它更简单、更整洁。
实现案例
让我们考虑一个电子商务应用程序的示例:客户下订单,订单已发货。这是业务用例。
假设有四种不同的微服务来处理此流程:
- 处理客户订单的订单微服务。
- 一个支付微服务,用于处理订单的付款。
- 一个清单微服务,在下订单后更新清单。
- 一个运输微服务,用于交付订单。
请注意,在实际情况下,将这些功能分成四个不同的应用程序可能不是一个好的设计。如果您的用户群很小,您可以在单个整体式应用程序中完成上述所有操作,而不是四个不同的微服务,这将增加您的网络调用和基础设施成本。此外,在整体式架构中处理事务也要容易得多。
不过,对于这个例子,我们将使用这个设计来理解 SAGA 模式。
现在,当客户下订单时,我们来考虑每个微服务中的以下函数:
- createOrder() — Order microservice
- processPayment() — Payment microservice
- updateInventory() — Inventory microservice
- shipOrder() — Shipping Microservice
当客户下订单并且 createOrder() 、 processPayment() 方法成功且 updateInventory() 方法失败时,系统将具有错误的库存信息。而且客户不会发货!
因此,所有这些任务都必须是单个事务的一部分。您可以使用 SAGA 设计模式来实现分布式事务。要解决上述问题,您可以使用向后恢复回滚整个事务。
您可以为上述每项任务设置一个补偿任务。以下是补偿任务:
- reverseOrder() — Order microservice
- reversePayment() — Payment microservice
- reverseInventory() — Inventory microservice
现在,如果 updateInventory() 方法失败,则调用 reversePayment(),然后调用 reverseOrder(),订单将回滚!
代码实现
Order Microservice
我们首先看一下 order 微服务。
Controller:
@RestController
public class OrderController {
@Autowired
private OrderRepository repository;
@Autowired
private KafkaTemplate<String, OrderEvent> kafkaTemplate;
@PostMapping("/orders")
public void createOrder(@RequestBody CustomerOrder customerOrder) {
OrderEntity order = new OrderEntity();
try {
// save order in database
order.setAmount(customerOrder.getAmount());
order.setItem(customerOrder.getItem());
order.setQuantity(customerOrder.getQuantity());
order.setStatus("CREATED");
order = this.repository.save(order);
customerOrder.setOrderId(order.getId());
// publish order created event for payment microservice to consume.
OrderEvent event = new OrderEvent();
event.setOrder(customerOrder);
event.setType("ORDER_CREATED");
this.kafkaTemplate.send("new-orders", event);
} catch (Exception e) {
order.setStatus("FAILED");
this.repository.save(order);
}
}
}
如您所见,下订单时,我们会在数据库中创建一个条目并将其标记为成功。然后我们使用 Apache Kafka 发布一个 order created 事件,以便支付微服务可以接收该事件并处理付款。如果上述逻辑失败,我们将订单状态标记为失败并将其保存到数据库。
现在让我们看看 order 微服务的补偿任务。如果 payment 微服务发布事件来反转顺序,则将执行此任务。
让我们创建一个类来实现这一点:
@Component
public class ReverseOrder {
@Autowired
private OrderRepository repository;
@KafkaListener(topics = "reversed-orders", groupId = "orders-group")
public void reverseOrder(String event) {
try {
OrderEvent orderEvent = new ObjectMapper().readValue(event, OrderEvent.class);
Optional<OrderEntity> order = this.repository.findById(orderEvent.getOrder().getOrderId());
order.ifPresent(o -> {
o.setStatus("FAILED");
this.repository.save(o);
});
} catch (Exception e) {
e.printStackTrace();
}
}
}
如您所见,我们在 Apache Kafka 中侦听“反向订单”主题,一旦我们收到该主题下的任何事件,我们就会获取它,从中提取订单详细信息并将数据库中的状态标记为失败(您也可以完全删除该行,但将其标记为失败让我们有机会稍后重试订单并进行分析)。
在我们的示例中,我们使用内存数据库和在本地运行的 Apache Kafka。以下是 order 微服务的配置:
spring.h2.console.enabled=true
spring.datasource.url=jdbc:h2:mem:ordersdb
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=orders-group
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
Payment Microservice
触发订单事件后,支付微服务会获取该事件,提取订单详细信息,然后从中提取支付信息,然后将其存储在支付数据库中。然后它会触发一个 payment 事件,该事件将被 inventory 微服务选取。
@Controller
public class PaymentController {
@Autowired
private PaymentRepository repository;
@Autowired
private KafkaTemplate<String, PaymentEvent> kafkaTemplate;
@Autowired
private KafkaTemplate<String, OrderEvent> kafkaOrderTemplate;
@KafkaListener(topics = "new-orders", groupId = "orders-group")
public void processPayment(String event) throws JsonMappingException, JsonProcessingException {
System.out.println("Recieved event" + event);
OrderEvent orderEvent = new ObjectMapper().readValue(event, OrderEvent.class);
CustomerOrder order = orderEvent.getOrder();
Payment payment = new Payment();
try {
// save payment details in db
payment.setAmount(order.getAmount());
payment.setMode(order.getPaymentMode());
payment.setOrderId(order.getOrderId());
payment.setStatus("SUCCESS");
this.repository.save(payment);
// publish payment created event for inventory microservice to consume.
PaymentEvent paymentEvent = new PaymentEvent();
paymentEvent.setOrder(orderEvent.getOrder());
paymentEvent.setType("PAYMENT_CREATED");
this.kafkaTemplate.send("new-payments", paymentEvent);
} catch (Exception e) {
payment.setOrderId(order.getOrderId());
payment.setStatus("FAILED");
repository.save(payment);
// reverse previous task
OrderEvent oe = new OrderEvent();
oe.setOrder(order);
oe.setType("ORDER_REVERSED");
this.kafkaOrderTemplate.send("reversed-orders", orderEvent);
}
}
}
此外,当您查看付款是否由于某种原因而失败时,您将状态标记为“失败”并更新数据库。您还可以通过向 Kafka 发布事件来触发 order 服务的补偿任务。
为了撤销付款,我们创建一个补偿任务。
@Component
public class ReversePayment {
@Autowired
private PaymentRepository repository;
@Autowired
private KafkaTemplate<String, OrderEvent> kafkaTemplate;
@KafkaListener(topics = "reversed-payments", groupId = "payments-group")
public void reversePayment(String event) {
try {
PaymentEvent paymentEvent = new ObjectMapper().readValue(event, PaymentEvent.class);
CustomerOrder order = paymentEvent.getOrder();
// do refund..
// update status as failed
Iterable<Payment> payments = this.repository.findByOrderId(order.getOrderId());
payments.forEach(p -> {
p.setStatus("FAILED");
this.repository.save(p);
});
// reverse previous task
OrderEvent orderEvent = new OrderEvent();
orderEvent.setOrder(paymentEvent.getOrder());
orderEvent.setType("ORDER_REVERSED");
this.kafkaTemplate.send("reversed-orders", orderEvent);
} catch (Exception e) {
e.printStackTrace();
}
}
}
当 inventory 微服务在 failure 时触发上述补偿任务时,我们会在特定订单的 payment 数据库中更新状态 failed 。我们还需要反转为此付款创建的订单,因此我们也触发了 order service 的补偿任务。
Inventory Microservice
与支付微服务类似,在库存微服务中,我们监听支付事件,取货,提取订单详情,然后在数据库中更新库存(我们根据购买的物品数量减少库存)。如果 stock 在 database 中不存在,我们将引发异常。我们可以使用上述错误场景来测试补偿任务稍后是否按预期工作。此外,一旦库存更新,我们就会触发一个新事件,以便货运微服务提取。
@RestController
public class InventoryController {
@Autowired
private InventoryRepository repository;
@Autowired
private KafkaTemplate<String, InventoryEvent> kafkaTemplate;
@Autowired
private KafkaTemplate<String, PaymentEvent> kafkaPaymentTemplate;
@KafkaListener(topics = "new-payments", groupId = "payments-group")
public void updateInventory(String paymentEvent) throws JsonMappingException, JsonProcessingException {
InventoryEvent event = new InventoryEvent();
PaymentEvent p = new ObjectMapper().readValue(paymentEvent, PaymentEvent.class);
CustomerOrder order = p.getOrder();
try {
// update stock in database
Iterable<Inventory> inventories = this.repository.findByItem(order.getItem());
boolean exists = inventories.iterator().hasNext();
if (!exists)
throw new Exception("Stock not available");
inventories.forEach(
i -> {
i.setQuantity(i.getQuantity() - order.getQuantity());
this.repository.save(i);
});
event.setType("INVENTORY_UPDATED");
event.setOrder(p.getOrder());
this.kafkaTemplate.send("new-inventory", event);
} catch (Exception e) {
// reverse previous task
PaymentEvent pe = new PaymentEvent();
pe.setOrder(order);
pe.setType("PAYMENT_REVERSED");
this.kafkaPaymentTemplate.send("reversed-payments", pe);
}
}
@PostMapping("/inventory")
public void addInventory(@RequestBody Stock stock) {
Iterable<Inventory> items = this.repository.findByItem(stock.getItem());
if (items.iterator().hasNext()) {
items.forEach(i -> {
i.setQuantity(stock.getQuantity() + i.getQuantity());
this.repository.save(i);
});
} else {
Inventory i = new Inventory();
i.setItem(stock.getItem());
i.setQuantity(stock.getQuantity());
this.repository.save(i);
}
}
}
如果在更新库存时出现异常,我们会通过向 Kafka 发布事件来触发支付服务的补偿任务。
为了在下一个任务失败时撤消对清单所做的更新,我们在 inventory service 中创建以下补偿任务。我们增加了这里的库存,因为我们之前减少了库存。我们还触发了链中的下一个补偿任务,反向支付任务,它将进行退款并反向支付,然后它自己反过来将调用反向顺序补偿任务。
@RestController
public class ReverseInventory {
@Autowired
private InventoryRepository repository;
@Autowired
private KafkaTemplate<String, PaymentEvent> kafkaTemplate;
@KafkaListener(topics = "reversed-inventory", groupId = "inventory-group")
public void reverseInventory(String event) {
try {
InventoryEvent inventoryEvent = new ObjectMapper().readValue(event, InventoryEvent.class);
Iterable<Inventory> inv = this.repository.findByItem(inventoryEvent.getOrder().getItem());
inv.forEach(i -> {
i.setQuantity(i.getQuantity() + inventoryEvent.getOrder().getQuantity());
this.repository.save(i);
});
// reverse previous task
PaymentEvent paymentEvent = new PaymentEvent();
paymentEvent.setOrder(inventoryEvent.getOrder());
paymentEvent.setType("PAYMENT_REVERSED");
this.kafkaTemplate.send("reversed-payments", paymentEvent);
} catch (Exception e) {
e.printStackTrace();
}
}
}
Shipment Microservice
库存更新后,我们会调用最终服务将商品运送给买家。
@Controller
public class ShipmentController {
@Autowired
private ShipmentRepository repository;
@Autowired
private KafkaTemplate<String, InventoryEvent> kafkaTemplate;
@KafkaListener(topics = "new-inventory", groupId = "inventory-group")
public void shipOrder(String event) throws JsonMappingException, JsonProcessingException {
Shipment shipment = new Shipment();
InventoryEvent inventoryEvent = new ObjectMapper().readValue(event, InventoryEvent.class);
CustomerOrder order = inventoryEvent.getOrder();
try {
if (order.getAddress() == null) {
throw new Exception("Address not present");
}
shipment.setAddress(order.getAddress());
shipment.setOrderId(order.getOrderId());
shipment.setStatus("success");
this.repository.save(shipment);
// do other shipment logic ..
} catch (Exception e) {
shipment.setOrderId(order.getOrderId());
shipment.setStatus("failed");
this.repository.save(shipment);
InventoryEvent reverseEvent = new InventoryEvent();
reverseEvent.setType("INVENTORY_REVERSED");
System.out.println(order);
reverseEvent.setOrder(order);
this.kafkaTemplate.send("reversed-inventory", reverseEvent);
}
}
}
我们检查客户的地址,如果为空,则抛出异常,这反过来将触发库存服务的补偿任务,并在数据库中将发货状态标记为失败。如果由于任何其他原因也导致发货失败,我们会按照上述方式进行。货件服务不需要任何补偿任务,因为它是流程中的最后一项服务。如果失败,我们只触发前一个服务 (inventory service) 的补偿任务。
没有回复内容