在构建微服务应用时,您可能会遇到需要批量处理传入消息的场景,例如创建ETL记录或批量更新数据库等。Spring Integration正是为此类需求量身定制的解决方案。
Spring Integration作为Spring框架的扩展,提供了丰富的企业集成模式,使得Spring应用间的模块通信和消息处理变得简单高效。借助这一框架,您不仅可以轻松实现不同模块或服务间的通信,还能在消息处理前对其进行灵活的操控。
本文将通过实例详细展示如何利用Spring Integration对RabbitMQ消息进行分组和处理。假设我们正在开发一个日志收集服务,该服务需要逐条接收消息,并按数量或类型进行分组后再行处理。让我们深入探讨这一过程。
依赖项
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>3.3.1</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
<version>3.3.1</version>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-amqp</artifactId>
<version>6.3.1</version>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-http</artifactId>
<version>6.3.1</version>
</dependency>
消息结构
我们的日志消息包含message、userId和type属性。这些消息将通过RabbitMQ发送,并通过分组进行处理。其结构如下所示:
public class LogDto {
private String message;
private String userId;
private LogType type;
}
public enum LogType {
INFO,
WARN,
ERROR,
FATAL;
}
按消息数量分组
@Component
@RequiredArgsConstructor
public class LogWriterQueueHandlerByCount implements CorrelationStrategy {
@Value("${messaging.log.writer.queue}")
private String QUEUE_NAME;
private final MessageConverter messageConverter;
private final boolean QUEUE_AUTO_START = false;
private final int BATCH_SIZE = 5;
private final int BATCH_TIMEOUT = 1000;
@Bean
public IntegrationFlow logFlowByCount(ConnectionFactory connectionFactory) {
return IntegrationFlow.from(
Amqp.inboundAdapter(connectionFactory, QUEUE_NAME) // listen to the queue
.messageConverter(messageConverter)
.autoStartup(QUEUE_AUTO_START))
.aggregate(a -> a
.correlationStrategy(this, "getCorrelationKey")
.releaseStrategy(r -> r.size() == BATCH_SIZE) // complete the group after BATCH SIZE of messages
.expireGroupsUponCompletion(true) // don't wait to finish the group
.sendPartialResultOnExpiry(true) // send partial results if needed
.groupTimeout(BATCH_TIMEOUT)) // timeout groups after milliseconds
.handle(this) // calls handleMessage method
.get();
}
@Override
public Object getCorrelationKey(Message<?> message) {
// Integration flow is not allowed null values, so we need to set a default value
if (message.getPayload() instanceof LogDto) {
return 1;
} else {
throw new RuntimeException("Correlation has been failed");
}
}
@ServiceActivator
public void handleMessage(Collection<LogDto> aggregatedData) {
System.out.println(MessageFormat.format("Collected Items {0}", aggregatedData));
}
}
在本例中,我们设定了一个相关性策略,该策略将持续收集消息,直至满足以下任一条件:收到5条消息或时间过去1秒。Spring Integration通过相关键来定义消息组。在此示例中,我们使用getCorrelationKey方法来定义组键。由于我们的目标仅仅是按数量对消息进行分组,因此我们返回一个静态值1作为组键。
那么,如果我们希望按类型而非数量进行分组,该如何操作呢?其实非常简单!
按类型分组
@Component
@RequiredArgsConstructor
public class LogWriterQueueHandlerByType implements CorrelationStrategy {
@Value("${messaging.log.writer.queue}")
private String QUEUE_NAME;
private final MessageConverter messageConverter;
private final boolean QUEUE_AUTO_START = true;
private final int BATCH_SIZE = 5;
private final int BATCH_TIMEOUT = 1000;
@Bean
public IntegrationFlow logFlowByType(ConnectionFactory connectionFactory) {
return IntegrationFlow.from(
Amqp.inboundAdapter(connectionFactory, QUEUE_NAME) // listen to the queue
.messageConverter(messageConverter)
.autoStartup(QUEUE_AUTO_START))
.aggregate(a -> a
.correlationStrategy(this, "getCorrelationKey")
.releaseStrategy(r -> r.size() == BATCH_SIZE) // complete the group after BATCH SIZE of messages
.expireGroupsUponCompletion(true) // don't wait to finish the group
.sendPartialResultOnExpiry(true) // send partial results if needed
.groupTimeout(BATCH_TIMEOUT)) // timeout groups after milliseconds
.handle(this) // calls handleMessage method
.get();
}
@Override
public Object getCorrelationKey(Message<?> message) {
// Integration flow is not allowed null values, so we need to set a default value
if (message.getPayload() instanceof LogDto) {
var type = ((LogDto) message.getPayload()).getType();
return Objects.requireNonNullElse(type, -1);
} else {
throw new RuntimeException("Correlation has been failed");
}
}
@ServiceActivator
public void handleMessage(Collection<LogDto> aggregatedData) {
System.out.println(MessageFormat.format("Collected Items {0}", aggregatedData));
}
}
正如我们所见,只需对getCorrelationKey方法稍作调整即可实现。我们从消息中提取类型值,并将其作为相关键使用。
总结
Spring Integration框架以其丰富的模式实现,极大地简化了企业级项目的开发。在本文中,我们探讨了如何利用Spring Integration创建相关性策略流,从而按照我们的需求对RabbitMQ消息进行灵活的分组和处理。期待在下一篇文章中与您继续探讨更多技术话题。
没有回复内容