在Spring Boot中使用Spring Integration对RabbitMQ消息进行分组

在构建微服务应用时,您可能会遇到需要批量处理传入消息的场景,例如创建ETL记录或批量更新数据库等。Spring Integration正是为此类需求量身定制的解决方案。

Spring Integration作为Spring框架的扩展,提供了丰富的企业集成模式,使得Spring应用间的模块通信和消息处理变得简单高效。借助这一框架,您不仅可以轻松实现不同模块或服务间的通信,还能在消息处理前对其进行灵活的操控。

本文将通过实例详细展示如何利用Spring Integration对RabbitMQ消息进行分组和处理。假设我们正在开发一个日志收集服务,该服务需要逐条接收消息,并按数量或类型进行分组后再行处理。让我们深入探讨这一过程。

依赖项

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>3.3.1</version>
</dependency>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-integration</artifactId>
    <version>3.3.1</version>
</dependency>

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-amqp</artifactId>
    <version>6.3.1</version>
</dependency>

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-http</artifactId>
    <version>6.3.1</version>
</dependency>

消息结构

我们的日志消息包含message、userId和type属性。这些消息将通过RabbitMQ发送,并通过分组进行处理。其结构如下所示:

public class LogDto {
    private String message;
    private String userId;
    private LogType type;
}

public enum LogType {
    INFO,
    WARN,
    ERROR,
    FATAL;
}

按消息数量分组

@Component
@RequiredArgsConstructor
public class LogWriterQueueHandlerByCount implements CorrelationStrategy {

    @Value("${messaging.log.writer.queue}")
    private String QUEUE_NAME;

    private final MessageConverter messageConverter;

    private final boolean QUEUE_AUTO_START = false;

    private final int BATCH_SIZE = 5;

    private final int BATCH_TIMEOUT = 1000;

    @Bean
    public IntegrationFlow logFlowByCount(ConnectionFactory connectionFactory) {
        return IntegrationFlow.from(
                        Amqp.inboundAdapter(connectionFactory, QUEUE_NAME) // listen to the queue
                                .messageConverter(messageConverter)
                                .autoStartup(QUEUE_AUTO_START))

                .aggregate(a -> a
                        .correlationStrategy(this, "getCorrelationKey")
                        .releaseStrategy(r -> r.size() == BATCH_SIZE) // complete the group after BATCH SIZE of messages
                        .expireGroupsUponCompletion(true) // don't wait to finish the group
                        .sendPartialResultOnExpiry(true) // send partial results if needed
                        .groupTimeout(BATCH_TIMEOUT)) // timeout groups after milliseconds
                .handle(this) // calls handleMessage method
                .get();
    }


    @Override
    public Object getCorrelationKey(Message<?> message) {
        // Integration flow is not allowed null values, so we need to set a default value
        if (message.getPayload() instanceof LogDto) {
            return 1;
        } else {
            throw new RuntimeException("Correlation has been failed");
        }
    }

    @ServiceActivator
    public void handleMessage(Collection<LogDto> aggregatedData) {
        System.out.println(MessageFormat.format("Collected Items {0}", aggregatedData));
    }
}

在本例中,我们设定了一个相关性策略,该策略将持续收集消息,直至满足以下任一条件:收到5条消息或时间过去1秒。Spring Integration通过相关键来定义消息组。在此示例中,我们使用getCorrelationKey方法来定义组键。由于我们的目标仅仅是按数量对消息进行分组,因此我们返回一个静态值1作为组键。

那么,如果我们希望按类型而非数量进行分组,该如何操作呢?其实非常简单!

按类型分组

@Component
@RequiredArgsConstructor
public class LogWriterQueueHandlerByType implements CorrelationStrategy {

    @Value("${messaging.log.writer.queue}")
    private String QUEUE_NAME;

    private final MessageConverter messageConverter;

    private final boolean QUEUE_AUTO_START = true;

    private final int BATCH_SIZE = 5;

    private final int BATCH_TIMEOUT = 1000;

    @Bean
    public IntegrationFlow logFlowByType(ConnectionFactory connectionFactory) {
        return IntegrationFlow.from(
                        Amqp.inboundAdapter(connectionFactory, QUEUE_NAME) // listen to the queue
                                .messageConverter(messageConverter)
                                .autoStartup(QUEUE_AUTO_START))

                .aggregate(a -> a
                        .correlationStrategy(this, "getCorrelationKey")
                        .releaseStrategy(r -> r.size() == BATCH_SIZE) // complete the group after BATCH SIZE of messages
                        .expireGroupsUponCompletion(true) // don't wait to finish the group
                        .sendPartialResultOnExpiry(true) // send partial results if needed
                        .groupTimeout(BATCH_TIMEOUT)) // timeout groups after milliseconds
                .handle(this) // calls handleMessage method
                .get();
    }


    @Override
    public Object getCorrelationKey(Message<?> message) {
        // Integration flow is not allowed null values, so we need to set a default value
        if (message.getPayload() instanceof LogDto) {
            var type = ((LogDto) message.getPayload()).getType();
            return Objects.requireNonNullElse(type, -1);
        } else {
            throw new RuntimeException("Correlation has been failed");
        }
    }

    @ServiceActivator
    public void handleMessage(Collection<LogDto> aggregatedData) {
        System.out.println(MessageFormat.format("Collected Items {0}", aggregatedData));
    }
}

正如我们所见,只需对getCorrelationKey方法稍作调整即可实现。我们从消息中提取类型值,并将其作为相关键使用。

总结

Spring Integration框架以其丰富的模式实现,极大地简化了企业级项目的开发。在本文中,我们探讨了如何利用Spring Integration创建相关性策略流,从而按照我们的需求对RabbitMQ消息进行灵活的分组和处理。期待在下一篇文章中与您继续探讨更多技术话题。

请登录后发表评论

    没有回复内容