使用 Spring Boot + Temporal 构建复杂的分布式工作流应用-Spring专区论坛-技术-SpringForAll社区

使用 Spring Boot + Temporal 构建复杂的分布式工作流应用

构建可靠的分布式应用程序可能会面临许多挑战。系统崩溃、网络故障以及进程在执行过程中可能会出现卡顿。这正是 Temporal 发挥作用的地方。它是一个开源的工作流编排平台。当与 Spring Boot 结合使用时,它能够帮助您构建强大的应用程序,这些应用程序能够应对现实世界中的各种挑战。

为什么需要 Temporal?

如果您曾经使用过分布式系统,您可能会遇到流程故障。这些故障可能导致系统不一致。

例如,它们可能处理了付款但未更新订单,或者发送了一封电子邮件却没有更新数据库。Temporal 可以解决这些问题,它提供持久执行、自动重试和状态管理功能。

Spring Boot + Temporal 为您提供了处理复杂流程的工具,同时保留了您熟悉的 Spring 生态系统。您可以享受两者的优势,既能获得 Spring 的依赖注入和配置功能,又能享受到 Temporal 的工作流管理能力。

设置环境

在我们详细分析代码之前,首先需要配置我们的开发环境。我们需要两个要素:一个包含 Temporal 依赖项的 Spring Boot 应用程序,以及一个正在运行的 Temporal 服务器。

首先,让我们创建具有必要依赖项的项目:

pom.xml :

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.4.0</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>me.vrnsky</groupId>
    <artifactId>temporal-spring-boot</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>temporal-spring-boot</name>
    <description>temporal-spring-boot</description>
    <properties>
        <java.version>17</java.version>
        <temporal.version>1.17.0</temporal.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>io.temporal</groupId>
            <artifactId>temporal-sdk</artifactId>
            <version>${temporal.version}</version>
        </dependency>
        <dependency>
            <groupId>io.micrometer</groupId>
            <artifactId>micrometer-registry-prometheus</artifactId>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <annotationProcessorPaths>
                        <path>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                            <version>1.18.32</version>
                        </path>
                    </annotationProcessorPaths>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>

接下来,我们需要一个正在运行的 Temporal 服务器。

使用 Docker 开始最简单:

version: '3.5'
services:
  postgresql:
    image: postgres:13
    environment:
      POSTGRES_USER: temporal
      POSTGRES_PASSWORD: temporal
    ports:
      - "5432:5432"
    networks:
      - temporal-network

  temporal:
    image: temporalio/auto-setup:1.20.0
    depends_on:
      - postgresql
    environment:
      - DB=postgresql
      - DB_PORT=5432
      - POSTGRES_USER=temporal
      - POSTGRES_PWD=temporal
      - POSTGRES_SEEDS=postgresql
    ports:
      - "7233:7233"
    networks:
      - temporal-network

  temporal-web:
    image: temporalio/web:1.15.0
    environment:
      - TEMPORAL_GRPC_ENDPOINT=temporal:7233
      - TEMPORAL_PERMIT_WRITE_API=true
    ports:
      - "8088:8088"
    depends_on:
      - temporal
    networks:
      - temporal-network

networks:
  temporal-network:
    driver: bridge

 Temporal 的工作原理

d2b5ca33bd20241226112232

将 Temporal 看作是一个复杂的任务管理器,用于分布式进程。当您的 Spring Boot 应用程序需要启动工作流时,它会通过 WorkflowClient 与 Temporal 进行通信。随后,Temporal 会管理整个流程,确保每个步骤都能成功完成。即使在系统崩溃或网络故障的情况下,它也能维护工作流的状态。

案例学习

让我们来构建一个实用的系统——订单处理系统。这是一个常见的应用场景,可能会出现很多问题:付款可能失败、库存可能不足或运输服务可能暂停。工作流程将如下所示:

d2b5ca33bd20241226112448

现在,让我们将此工作流放入代码中

import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;

@WorkflowInterface
public interface OrderProcessingWorkflow {
    @WorkflowMethod
    void processOrder(String orderId);
}
import io.temporal.activity.ActivityInterface;

@ActivityInterface
public interface OrderActivity {

    void validateOrder(String orderId);

    void processPayment(String orderId);

    void shipOrder(String orderId);

    void sendConfirmation(String orderId);
}

这是工作流程的实现。

import io.temporal.activity.ActivityOptions;
import io.temporal.workflow.Workflow;
import me.vrnsky.temporalspringboot.activity.OrderActivity;

import java.time.Duration;

public class OrderProcessWorkflowImpl implements OrderProcessingWorkflow {
    private final ActivityOptions options = ActivityOptions.newBuilder()
            .setScheduleToCloseTimeout(Duration.ofMinutes(5))
            .build();

    private final OrderActivity orderActivity = Workflow.newActivityStub(OrderActivity.class, options);

    @Override
    public void processOrder(String orderId) {
        try {
            orderActivity.validateOrder(orderId);
            orderActivity.processPayment(orderId);
            orderActivity.shipOrder(orderId);
            orderActivity.sendConfirmation(orderId);
        } catch (Exception e) {
            throw e;
        }
    }
}

我们还需要实现活动界面并配置 Workflow 客户端。

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class OrderActivityImpl implements OrderActivity {

    @Override
    public void validateOrder(String orderId) {
        log.info("Validating order {}", orderId);
    }

    @Override
    public void processPayment(String orderId) {
        log.info("Processing payment {}", orderId);
    }

    @Override
    public void shipOrder(String orderId) {
        log.info("Shipping order {}", orderId);
    }

    @Override
    public void sendConfirmation(String orderId) {
        log.info("Sending confirmation {}", orderId);
    }
}
import io.temporal.client.WorkflowClient;
import io.temporal.serviceclient.WorkflowServiceStubs;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class TemporalConfig {

    @Bean
    public WorkflowClient workflowClient() {
        WorkflowServiceStubs service = WorkflowServiceStubs.newLocalServiceStubs();
        return WorkflowClient.newInstance(service);
    }
}

此时,我们可以添加一个 Controller,并通过手动过程测试我们的工作流程。

import io.temporal.client.WorkflowClient;
import io.temporal.client.WorkflowOptions;
import lombok.RequiredArgsConstructor;
import me.vrnsky.temporalspringboot.workflow.OrderProcessingWorkflow;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequiredArgsConstructor
public class OrderController {

    private final WorkflowClient workflowClient;

    @PostMapping("/orders/{orderId}/process")
    public ResponseEntity<String> processOrder(@PathVariable String orderId) {
        OrderProcessingWorkflow workflow = workflowClient.newWorkflowStub(
                OrderProcessingWorkflow.class,
                WorkflowOptions.newBuilder()
                        .setTaskQueue("OrderProcessingQueue")
                        .setWorkflowId("Order-" + orderId)
                        .build());

        WorkflowClient.start(workflow::processOrder, orderId);
        return ResponseEntity.accepted().body("Order processing started");
    }
}

使用以下 cURL 命令发送请求测试一下:

curl -X POST http://localhost:8080/orders/1/process

可以看到 Order processing started 的请求响应。同时,在 Web UI 中,您应该会看到工作流正在运行:

d2b5ca33bd20241226112807

由于没有工作程序来轮询和执行工作流程,因此工作流程一直在运行。让我们添加一个新的 worker。

import io.temporal.client.WorkflowClient;
import io.temporal.worker.Worker;
import io.temporal.worker.WorkerFactory;
import me.vrnsky.temporalspringboot.activity.OrderActivityImpl;
import me.vrnsky.temporalspringboot.workflow.OrderProcessWorkflowImpl;
import org.springframework.stereotype.Component;

@Component
public class OrderWorker {
    private final Worker worker;

    public OrderWorker(WorkflowClient workflowClient) {
        WorkerFactory factory = WorkerFactory.newInstance(workflowClient);
        this.worker = factory.newWorker("OrderProcessingQueue");

        worker.registerWorkflowImplementationTypes(OrderProcessWorkflowImpl.class);
        worker.registerActivitiesImplementations(new OrderActivityImpl());
        factory.start();
    }
}

现在,如果您再次运行 Spring Boot 应用程序,您应该会看到工作流已完成。

d2b5ca33bd20241226112912

这种实现的好处在于 Temporal 为您解决了分布式系统中的所有复杂问题。如果任何活动出现失败,Temporal 会根据您的配置对该活动进行重试。如果您的应用程序在处理过程中崩溃,Temporal 会在您重新启动时从上次中断的地方继续。

失败处理

让我们再优化下工作流,以包含补偿逻辑

import io.temporal.workflow.QueryMethod;
import io.temporal.workflow.SignalMethod;
import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;
import me.vrnsky.temporalspringboot.model.OrderResult;
import me.vrnsky.temporalspringboot.model.OrderStatus;

@WorkflowInterface
public interface OrderProcessingWorkflow {
    @WorkflowMethod
    OrderResult processOrder(String orderId);

    @QueryMethod
    OrderStatus getOrderStatus();

    @SignalMethod
    void cancelOrder(String reason);
}
public record OrderResult(
        String orderId,
        OrderStatus status,
        String message
) {
}
public enum OrderStatus {
    CREATED,
    VALIDATED,
    PAYMENT_PROCESSED,
    SHIPPED,
    COMPLETED,
    FAILED,
    COMPENSATING,
    CANCELLED
}
public enum AlertLevel {
    INFO,
    WARNING,
    ERROR,
    CRITICAL
}
import io.temporal.activity.ActivityInterface;
import me.vrnsky.temporalspringboot.model.AlertLevel;
import me.vrnsky.temporalspringboot.model.OrderStatus;

@ActivityInterface
public interface OrderActivity {

    void validateOrder(String orderId);

    void processPayment(String orderId);

    void shipOrder(String orderId);

    void sendConfirmation(String orderId);
    
    void refundPayment(String orderId);
    
    void cancelShipment(String orderId);
    
    void sendCancellationNotification(String orderId, String reason);
    
    void logOrderEvent(String orderId, String event, OrderStatus status);
    void sendAlert(String orderId, AlertLevel level, String message);
}

实现 OrderActivity

import lombok.extern.log4j.Log4j2;
import lombok.extern.slf4j.Slf4j;
import me.vrnsky.temporalspringboot.model.AlertLevel;
import me.vrnsky.temporalspringboot.model.OrderStatus;

@Log4j2
public class OrderActivityImpl implements OrderActivity {

    @Override
    public void validateOrder(String orderId) {
        log.info("Validating order {}", orderId);
    }

    @Override
    public void processPayment(String orderId) {
        log.info("Processing payment {}", orderId);
    }

    @Override
    public void shipOrder(String orderId) {
        log.info("Shipping order {}", orderId);
    }

    @Override
    public void sendConfirmation(String orderId) {
        log.info("Sending confirmation {}", orderId);
    }

    @Override
    public void refundPayment(String orderId) {
        log.info("Refunding payment {}", orderId);
    }

    @Override
    public void cancelShipment(String orderId) {
        log.info("Cancelling shipment {}", orderId);
    }

    @Override
    public void sendCancellationNotification(String orderId, String reason) {
        log.info("Cancelling notification {}", orderId);
    }

    @Override
    public void logOrderEvent(String orderId, String event, OrderStatus status) {
        log.info("Order {} event {}", orderId, event);
    }

    @Override
    public void sendAlert(String orderId, AlertLevel level, String message) {
        log.info("Sending alert {}", orderId);
    }
}

现在,让我们使用补偿逻辑和监控来构建增强的工作流程。

import io.temporal.activity.ActivityOptions;
import io.temporal.common.RetryOptions;
import io.temporal.workflow.Workflow;
import me.vrnsky.temporalspringboot.activity.OrderActivity;
import me.vrnsky.temporalspringboot.model.AlertLevel;
import me.vrnsky.temporalspringboot.model.OrderResult;
import me.vrnsky.temporalspringboot.model.OrderStatus;

import java.time.Duration;

public class OrderProcessWorkflowImpl implements OrderProcessingWorkflow {
    private OrderStatus currentStatus = OrderStatus.CREATED;
    private boolean paymentProcessed = false;
    private boolean shipmentCreated = false;

    private final ActivityOptions options = ActivityOptions.newBuilder()
            .setScheduleToCloseTimeout(Duration.ofMinutes(5))
            .setRetryOptions(RetryOptions.newBuilder()
                    .setMaximumAttempts(3)
                    .setInitialInterval(Duration.ofSeconds(1))
                    .setMaximumInterval(Duration.ofSeconds(10))
                    .build())
            .build();

    private final OrderActivity orderActivity = Workflow.newActivityStub(OrderActivity.class, options);


    @Override
    public OrderResult processOrder(String orderId) {
        try {
            try {
                orderActivity.validateOrder(orderId);
                currentStatus = OrderStatus.VALIDATED;
                orderActivity.logOrderEvent(orderId, "Order validated", currentStatus);
            } catch (Exception e) {
                handleFailure(orderId, "Order validation failed", e);
                return new OrderResult(orderId, currentStatus, "Validation failed: " + e.getMessage());
            }

            // Process Payment
            try {
                orderActivity.processPayment(orderId);
                paymentProcessed = true;
                currentStatus = OrderStatus.PAYMENT_PROCESSED;
                orderActivity.logOrderEvent(orderId, "Payment processed", currentStatus);
            } catch (Exception e) {
                handleFailure(orderId, "Payment processing failed", e);
                return new OrderResult(orderId, currentStatus, "Payment failed: " + e.getMessage());
            }

            // Ship Order
            try {
                orderActivity.shipOrder(orderId);
                shipmentCreated = true;
                currentStatus = OrderStatus.SHIPPED;
                orderActivity.logOrderEvent(orderId, "Order shipped", currentStatus);
            } catch (Exception e) {
                handleFailure(orderId, "Shipping failed", e);
                return new OrderResult(orderId, currentStatus, "Shipping failed: " + e.getMessage());
            }

            // Send Confirmation
            try {
                orderActivity.sendConfirmation(orderId);
                currentStatus = OrderStatus.COMPLETED;
                orderActivity.logOrderEvent(orderId, "Order completed", currentStatus);
                return new OrderResult(orderId, currentStatus, "Order completed successfully");
            } catch (Exception e) {
                handleFailure(orderId, "Confirmation failed", e);
                return new OrderResult(orderId, currentStatus, "Confirmation failed: " + e.getMessage());
            }

        } catch (Exception e) {
            handleFailure(orderId, "Unexpected error", e);
            return new OrderResult(orderId, currentStatus, "Unexpected error: " + e.getMessage());
        }
    }

    @Override
    public OrderStatus getOrderStatus() {
        return currentStatus;
    }

    @Override
    public void cancelOrder(String reason) {
        if (currentStatus == OrderStatus.COMPLETED || currentStatus == OrderStatus.CANCELLED) {
            throw new IllegalStateException("Cannot cancel completed or already cancelled order");
        }
        compensate(Workflow.getInfo().getWorkflowId(), reason);
    }

    private void handleFailure(String orderId, String message, Exception e) {
        currentStatus = OrderStatus.COMPENSATING;
        orderActivity.logOrderEvent(orderId, message, currentStatus);
        orderActivity.sendAlert(orderId, AlertLevel.ERROR, message + ":" + e.getMessage());
    }

    private void compensate(String orderId, String reason) {
        try {
            if (shipmentCreated) {
                orderActivity.cancelShipment(orderId);
                orderActivity.logOrderEvent(orderId, "Shipment cancelled", currentStatus);
            }

            if (paymentProcessed) {
                orderActivity.refundPayment(orderId);
                orderActivity.logOrderEvent(orderId, "Payment refunded", currentStatus);
            }

            orderActivity.sendCancellationNotification(orderId, reason);
            currentStatus = OrderStatus.CANCELLED;
            orderActivity.logOrderEvent(orderId, "Order cancelled", currentStatus);
        } catch (Exception e) {
            currentStatus = OrderStatus.FAILED;
            orderActivity.sendAlert(orderId, AlertLevel.CRITICAL, "Compensation failed: " + e.getMessage());
        }
    }

}

总结

Spring Boot 和 Temporal 为构建复杂分布式应用程序提供了强大的基础支持。我们构建的示例展示了其主要优势:

  • 自动重试机制和状态持久性
  • 平滑的管理故障和补偿逻辑
  • 工作流和活动之间有明确的分离

在执行复杂的业务流程时,请谨慎选择工具。Temporal 的工作流编排增强了 Spring Boot 的依赖注入和配置功能。这使得开发人员能够专注于业务逻辑,而 Temporal 则负责处理分布式系统的可靠性问题。

 

请登录后发表评论

    没有回复内容