如何在 Spring Boot 中实现 SAGA 模式?-Spring专区论坛-技术-SpringForAll社区

如何在 Spring Boot 中实现 SAGA 模式?

微服务有其自身的优点和缺点,其中一个缺点是管理分布式事务。如何确保您的事务在所有 4 个任务都成功的情况下成功提交,或者如果任何任务未完成(已完成的任务回滚),则成功失败?Spring Boot 提供了 @Transactional 来管理事务,但这仅适用于单个方法和单个项目。

有一种设计模式可以解决微服务中分布式事务的问题。它最初是由计算机科学家 Hector Gracia Molina 和 Kenneth Salem 提出的,如他们的论文中所建议的,他们为长期事务 (Long Lived Transactions,LLT) 而不是微服务创建了这个系统,但它对微服务非常有用。

论文链接:https://www.cs.cornell.edu/andru/cs711/2002fa/reading/sagas.pdf

长期事务是需要较长时间的事务,可能是几分钟、几小时甚至几天。在此类事务中的所有任务完成之前,您无法锁定数据库,因为它会严重影响应用程序的性能。因此,他们提出了 SAGA 设计模式(命名为 SAGA,可能是因为他们创建它是为了处理长事务 — SAGA 的意思是一个很长的故事)。

SAGA模式

如果您的事务包含 4 个任务,您可以为每个任务(最后一个任务除外)创建一个补偿任务。因此,如果任何任务失败,则运行先前任务的补偿任务以回滚原始效果。

所以如果有四个任务 T1、T2、T3 和 T4,那么你就有三个对应的补偿任务 C1、C2、C3。

假如 T1 和 T2 成功,而 T3 失败,则运行 C2 和 C1 以按后进先出顺序回滚 T1 和 T2 的效果。所以事务内执行顺序是这样的:T1 -> T2 -> T3 (失败) -> C2 -> C1。

您不需要为最后一个任务设置补偿任务,因为如果最后一个任务失败,您只需回滚之前的任务。这称为 Backward Recovery,因为您返回并执行已完成成功任务的补偿任务。如果您的业务使用案例需要,您还可以通过重试 T3 和 T4 来尝试正向恢复。不过,向后恢复更常见。

这就是 SAGA 模式,它可以通过两种方式实现:Choreography 和O rchestration

Choreography 编排:编排意味着任务独立执行。完成一个任务后,它会调用序列中的下一个任务。如果下一个任务失败,则它会为前一个任务调用补偿任务。编排意味着任务由另一个父任务调用。它扮演着编排器的角色。它按顺序调用每个任务,并根据它们的响应决定是调用下一个任务还是补偿任务。在此示例中,让我们使用 Choreography 实现 SAGA。与 Orchestration 相比,它更简单、更整洁。

实现案例

让我们考虑一个电子商务应用程序的示例:客户下订单,订单已发货。这是业务用例。

假设有四种不同的微服务来处理此流程:

  • 处理客户订单的订单微服务。
  • 一个支付微服务,用于处理订单的付款。
  • 一个清单微服务,在下订单后更新清单。
  • 一个运输微服务,用于交付订单。

 

d2b5ca33bd20241226100313

 

请注意,在实际情况下,将这些功能分成四个不同的应用程序可能不是一个好的设计。如果您的用户群很小,您可以在单个整体式应用程序中完成上述所有操作,而不是四个不同的微服务,这将增加您的网络调用和基础设施成本。此外,在整体式架构中处理事务也要容易得多。

不过,对于这个例子,我们将使用这个设计来理解 SAGA 模式。

现在,当客户下订单时,我们来考虑每个微服务中的以下函数:

  • createOrder() — Order microservice
  • processPayment() — Payment microservice
  • updateInventory() — Inventory microservice
  • shipOrder() — Shipping Microservice

当客户下订单并且 createOrder() 、 processPayment() 方法成功且 updateInventory() 方法失败时,系统将具有错误的库存信息。而且客户不会发货!

因此,所有这些任务都必须是单个事务的一部分。您可以使用 SAGA 设计模式来实现分布式事务。要解决上述问题,您可以使用向后恢复回滚整个事务。

您可以为上述每项任务设置一个补偿任务。以下是补偿任务:

  • reverseOrder() — Order microservice
  • reversePayment() — Payment microservice
  • reverseInventory() — Inventory microservice

 

d2b5ca33bd20241226100521

现在,如果 updateInventory() 方法失败,则调用 reversePayment(),然后调用 reverseOrder(),订单将回滚!

代码实现

Order Microservice

我们首先看一下 order 微服务。

Controller:

@RestController
public class OrderController {
    @Autowired
    private OrderRepository repository;
    @Autowired
    private KafkaTemplate<String, OrderEvent> kafkaTemplate;
    @PostMapping("/orders")
    public void createOrder(@RequestBody CustomerOrder customerOrder) {
        OrderEntity order = new OrderEntity();
        try {
            // save order in database
            order.setAmount(customerOrder.getAmount());
            order.setItem(customerOrder.getItem());
            order.setQuantity(customerOrder.getQuantity());
            order.setStatus("CREATED");
            order = this.repository.save(order);
            customerOrder.setOrderId(order.getId());
            // publish order created event for payment microservice to consume.
            OrderEvent event = new OrderEvent();
            event.setOrder(customerOrder);
            event.setType("ORDER_CREATED");
            this.kafkaTemplate.send("new-orders", event);
        } catch (Exception e) {
            order.setStatus("FAILED");
            this.repository.save(order);
        }
    }

}

如您所见,下订单时,我们会在数据库中创建一个条目并将其标记为成功。然后我们使用 Apache Kafka 发布一个 order created 事件,以便支付微服务可以接收该事件并处理付款。如果上述逻辑失败,我们将订单状态标记为失败并将其保存到数据库。

现在让我们看看 order 微服务的补偿任务。如果 payment 微服务发布事件来反转顺序,则将执行此任务。

让我们创建一个类来实现这一点:

@Component
public class ReverseOrder {
    @Autowired
    private OrderRepository repository;
    @KafkaListener(topics = "reversed-orders", groupId = "orders-group")
    public void reverseOrder(String event) {
        try {
            OrderEvent orderEvent = new ObjectMapper().readValue(event, OrderEvent.class);
            Optional<OrderEntity> order = this.repository.findById(orderEvent.getOrder().getOrderId());
            order.ifPresent(o -> {
                o.setStatus("FAILED");
                this.repository.save(o);
            });
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

如您所见,我们在 Apache Kafka 中侦听“反向订单”主题,一旦我们收到该主题下的任何事件,我们就会获取它,从中提取订单详细信息并将数据库中的状态标记为失败(您也可以完全删除该行,但将其标记为失败让我们有机会稍后重试订单并进行分析)。

在我们的示例中,我们使用内存数据库和在本地运行的 Apache Kafka。以下是 order 微服务的配置:

spring.h2.console.enabled=true
spring.datasource.url=jdbc:h2:mem:ordersdb
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=orders-group
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer

Payment Microservice 

触发订单事件后,支付微服务会获取该事件,提取订单详细信息,然后从中提取支付信息,然后将其存储在支付数据库中。然后它会触发一个 payment 事件,该事件将被 inventory 微服务选取。

@Controller
public class PaymentController {
    @Autowired
    private PaymentRepository repository;
    @Autowired
    private KafkaTemplate<String, PaymentEvent> kafkaTemplate;
    @Autowired
    private KafkaTemplate<String, OrderEvent> kafkaOrderTemplate;
    @KafkaListener(topics = "new-orders", groupId = "orders-group")
    public void processPayment(String event) throws JsonMappingException, JsonProcessingException {
        System.out.println("Recieved event" + event);
        OrderEvent orderEvent = new ObjectMapper().readValue(event, OrderEvent.class);
        CustomerOrder order = orderEvent.getOrder();
        Payment payment = new Payment();
        try {
            // save payment details in db
            payment.setAmount(order.getAmount());
            payment.setMode(order.getPaymentMode());
            payment.setOrderId(order.getOrderId());
            payment.setStatus("SUCCESS");
            this.repository.save(payment);
            // publish payment created event for inventory microservice to consume.
            PaymentEvent paymentEvent = new PaymentEvent();
            paymentEvent.setOrder(orderEvent.getOrder());
            paymentEvent.setType("PAYMENT_CREATED");
            this.kafkaTemplate.send("new-payments", paymentEvent);
        } catch (Exception e) {
            payment.setOrderId(order.getOrderId());
            payment.setStatus("FAILED");
            repository.save(payment);
            // reverse previous task
            OrderEvent oe = new OrderEvent();
            oe.setOrder(order);
            oe.setType("ORDER_REVERSED");
            this.kafkaOrderTemplate.send("reversed-orders", orderEvent);
        }
    }
}

此外,当您查看付款是否由于某种原因而失败时,您将状态标记为“失败”并更新数据库。您还可以通过向 Kafka 发布事件来触发 order 服务的补偿任务。

为了撤销付款,我们创建一个补偿任务。

@Component
public class ReversePayment {
    @Autowired
    private PaymentRepository repository;
    @Autowired
    private KafkaTemplate<String, OrderEvent> kafkaTemplate;
    @KafkaListener(topics = "reversed-payments", groupId = "payments-group")
    public void reversePayment(String event) {
        try {
            PaymentEvent paymentEvent = new ObjectMapper().readValue(event, PaymentEvent.class);
            CustomerOrder order = paymentEvent.getOrder();
            // do refund..
            // update status as failed
            Iterable<Payment> payments = this.repository.findByOrderId(order.getOrderId());
            payments.forEach(p -> {
                p.setStatus("FAILED");
                this.repository.save(p);
            });
            // reverse previous task
            OrderEvent orderEvent = new OrderEvent();
            orderEvent.setOrder(paymentEvent.getOrder());
            orderEvent.setType("ORDER_REVERSED");
            this.kafkaTemplate.send("reversed-orders", orderEvent);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

当 inventory 微服务在 failure 时触发上述补偿任务时,我们会在特定订单的 payment 数据库中更新状态 failed 。我们还需要反转为此付款创建的订单,因此我们也触发了 order service 的补偿任务。

Inventory Microservice

与支付微服务类似,在库存微服务中,我们监听支付事件,取货,提取订单详情,然后在数据库中更新库存(我们根据购买的物品数量减少库存)。如果 stock 在 database 中不存在,我们将引发异常。我们可以使用上述错误场景来测试补偿任务稍后是否按预期工作。此外,一旦库存更新,我们就会触发一个新事件,以便货运微服务提取。

@RestController
public class InventoryController {
    @Autowired
    private InventoryRepository repository;
    @Autowired
    private KafkaTemplate<String, InventoryEvent> kafkaTemplate;
    @Autowired
    private KafkaTemplate<String, PaymentEvent> kafkaPaymentTemplate;
    @KafkaListener(topics = "new-payments", groupId = "payments-group")
    public void updateInventory(String paymentEvent) throws JsonMappingException, JsonProcessingException {
        InventoryEvent event = new InventoryEvent();
        PaymentEvent p = new ObjectMapper().readValue(paymentEvent, PaymentEvent.class);
        CustomerOrder order = p.getOrder();
        try {
            // update stock in database
            Iterable<Inventory> inventories = this.repository.findByItem(order.getItem());
            boolean exists = inventories.iterator().hasNext();
            if (!exists)
                throw new Exception("Stock not available");
            inventories.forEach(
                    i -> {
                        i.setQuantity(i.getQuantity() - order.getQuantity());
                        this.repository.save(i);
                    });
            event.setType("INVENTORY_UPDATED");
            event.setOrder(p.getOrder());
            this.kafkaTemplate.send("new-inventory", event);
        } catch (Exception e) {
            // reverse previous task
            PaymentEvent pe = new PaymentEvent();
            pe.setOrder(order);
            pe.setType("PAYMENT_REVERSED");
            this.kafkaPaymentTemplate.send("reversed-payments", pe);
        }
    }
    @PostMapping("/inventory")
    public void addInventory(@RequestBody Stock stock) {
        Iterable<Inventory> items = this.repository.findByItem(stock.getItem());
        if (items.iterator().hasNext()) {
            items.forEach(i -> {
                i.setQuantity(stock.getQuantity() + i.getQuantity());
                this.repository.save(i);
            });
        } else {
            Inventory i = new Inventory();
            i.setItem(stock.getItem());
            i.setQuantity(stock.getQuantity());
            this.repository.save(i);
        }
    }
  
}

如果在更新库存时出现异常,我们会通过向 Kafka 发布事件来触发支付服务的补偿任务。

为了在下一个任务失败时撤消对清单所做的更新,我们在 inventory service 中创建以下补偿任务。我们增加了这里的库存,因为我们之前减少了库存。我们还触发了链中的下一个补偿任务,反向支付任务,它将进行退款并反向支付,然后它自己反过来将调用反向顺序补偿任务。

@RestController
public class ReverseInventory {
    @Autowired
    private InventoryRepository repository;
    @Autowired
    private KafkaTemplate<String, PaymentEvent> kafkaTemplate;
    @KafkaListener(topics = "reversed-inventory", groupId = "inventory-group")
    public void reverseInventory(String event) {
        try {
            InventoryEvent inventoryEvent = new ObjectMapper().readValue(event, InventoryEvent.class);
            Iterable<Inventory> inv = this.repository.findByItem(inventoryEvent.getOrder().getItem());
            inv.forEach(i -> {
                i.setQuantity(i.getQuantity() + inventoryEvent.getOrder().getQuantity());
                this.repository.save(i);
            });
            // reverse previous task
            PaymentEvent paymentEvent = new PaymentEvent();
            paymentEvent.setOrder(inventoryEvent.getOrder());
            paymentEvent.setType("PAYMENT_REVERSED");
            this.kafkaTemplate.send("reversed-payments", paymentEvent);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Shipment Microservice

库存更新后,我们会调用最终服务将商品运送给买家。

@Controller
public class ShipmentController {
    @Autowired
    private ShipmentRepository repository;
    @Autowired
    private KafkaTemplate<String, InventoryEvent> kafkaTemplate;
    @KafkaListener(topics = "new-inventory", groupId = "inventory-group")
    public void shipOrder(String event) throws JsonMappingException, JsonProcessingException {
        Shipment shipment = new Shipment();
        InventoryEvent inventoryEvent = new ObjectMapper().readValue(event, InventoryEvent.class);
        CustomerOrder order = inventoryEvent.getOrder();
        try {
            if (order.getAddress() == null) {
                throw new Exception("Address not present");
            }
            shipment.setAddress(order.getAddress());
            shipment.setOrderId(order.getOrderId());
            shipment.setStatus("success");
            this.repository.save(shipment);
            // do other shipment logic ..
        } catch (Exception e) {
            shipment.setOrderId(order.getOrderId());
            shipment.setStatus("failed");
            this.repository.save(shipment);
            InventoryEvent reverseEvent = new InventoryEvent();
            reverseEvent.setType("INVENTORY_REVERSED");
            System.out.println(order);
            reverseEvent.setOrder(order);
            this.kafkaTemplate.send("reversed-inventory", reverseEvent);
        }
    }
}

我们检查客户的地址,如果为空,则抛出异常,这反过来将触发库存服务的补偿任务,并在数据库中将发货状态标记为失败。如果由于任何其他原因也导致发货失败,我们会按照上述方式进行。货件服务不需要任何补偿任务,因为它是流程中的最后一项服务。如果失败,我们只触发前一个服务 (inventory service) 的补偿任务。

 

请登录后发表评论

    没有回复内容