使用 Spring Data 实现事件通知模式

当实体被更新、删除或持久化时,系统会发布事件以通知其他系统这些变更。我们还将通过引入 DTO 对象来增强通知过程,从而避免获取更新后的数据。这一增强解决了事件通知模式事件溯源相比的一个缺点。

完整的应用程序代码可在 GitHub 上找到。

1. 实体监听器

首先,我们使用 @EntityListeners 为实体指定监听器类。以下示例展示了 Book 实体通过 BookEntityListener 类监听生命周期事件:

@Entity
@Table(name = "books")
@EntityListeners(BookEntityListener.class)
public class Book extends AbstractEntity {
   
   // fields
   
}
 

监听器类本身是一个 Spring Bean,使用 @Component 注解。它可以处理多个实体的事件,并使用统一的对象结构发布相应的事件。

@Component
@RequiredArgsConstructor
public class BookEntityListener {
    private final ApplicationEventQueue applicationEventQueue;

    @PostUpdate
    public void postUpdate(AbstractEntity entity) {
        switch (entity) {
            case Book book -> publishBookEvent(book, OperationType.UPDATE);
            default ->
                    log.error(...)
        }
    }

    @PostRemove
    public void postRemove(AbstractEntity entity) {
        // ...
    }

    @PostPersist
    public void postPersist(AbstractEntity entity) {
        // ...
    }

    private void publishBookEvent(Book book, OperationType operationType) {
        DataChangeEvent entityUpdated =
                DataChangeEvent.builder()
                        .eventName("book")
                        .id(book.getId())
                        .operationType(operationType)
                        .databaseVersion(book.getDatabaseVersion())
                        .build();
        applicationEventQueue.enqueue(entityUpdated);
    }
}

2. 事件队列

我们不会直接发布事件,而是将其加入队列,原因如下:

  1. 过滤同一事务中的重复事件:在同一事务中对同一实体操作的多个事件会被过滤,确保只发布最后一个事件。实体中的 @Version 字段在此非常有用。

  2. 在事务提交后发布事件:事件仅在事务成功提交到数据库后发布,避免了发送可能被回滚的变更事件的风险。

ApplicationEventQueue 类使用 ThreadLocal 安全地存储当前线程中的事件。

@Component
@Getter
@Setter
@Slf4j
public class ApplicationEventQueue {

    private static final ThreadLocal<Set<DataChangeEvent>> events =
            ThreadLocal.withInitial(HashSet::new);

    public void enqueue(DataChangeEvent event) {
        events.get().add(event);
    }

    public void clear() {
        events.remove();
    }

    public Set<DataChangeEvent> consumeEvents() {
        Set<DataChangeEvent> allEvents = filterByLatestDatabaseVersion(events.get());
        this.clear();
        return allEvents;
    }

    private Set<DataChangeEvent> filterByLatestDatabaseVersion(
            Set<DataChangeEvent> dataChangeEventSet) {
        // return events filtered;
    }

}
 

虽然 @RequestScope Bean 可以工作,但当事件在请求范围外创建时(例如在 @Async 方法中),它会失效。ThreadLocal 提供了线程安全的替代方案,但需要谨慎管理。特别是,我们必须在事件处理完毕后清除存储的事件,以避免事件在请求之间泄漏。

此外,Oracle 不建议池化虚拟线程,因此在使用轻量级虚拟线程时,ThreadLocal 是一个安全的选择。详情请参阅 Oracle 关于虚拟线程的文档


3. 清除 ThreadLocal 的拦截器

为了防止在线程池化时事件泄漏,我们在请求完成后清除 ThreadLocal。这可以通过拦截器实现:

@RequiredArgsConstructor
@Slf4j
public class ThreadLocalClearingInterceptor implements HandlerInterceptor {

    private final ApplicationEventQueue applicationEventQueue;

    @Override
    public void afterCompletion(
            HttpServletRequest request,
            HttpServletResponse response,
            Object handler,
            Exception ex) {
        log.trace("Clearing thread local request events");
        applicationEventQueue.clear(); // Clear ThreadLocal events after request completion
    }
}
 

在 WebMvcConfigurer 中注册拦截器:

@Configuration
@RequiredArgsConstructor
@Slf4j
public class WebConfig implements WebMvcConfigurer {

    private final ApplicationEventQueue applicationEventQueue;

    @Override
    public void addInterceptors(InterceptorRegistry registry) {
        log.debug("WebConfig::addInterceptors()");
        registry.addInterceptor(new ThreadLocalClearingInterceptor(applicationEventQueue));
    }
}

4. 事务同步的事件通知

TransactionSynchronizationAspect 确保事件仅在事务提交后发送。同步逻辑避免了为失败的事务发送事件。

该切面围绕所有使用 @Transactional 注解的方法执行,包括类级别注解的方法。切面在事务提交后立即注册事件通知。每个事务都会注册一个新的同步。

@Aspect
@Component
@ConditionalOnProperty(value = "events.notification-enabled", havingValue = "true")
public class TransactionSynchronizationAspect {

    private final ApplicationEventQueue applicationEventQueue;
    private final ApplicationEventPublisher springEventPublisher;

    @Before(
            "execution(* (@org.springframework.transaction.annotation.Transactional *).*(..)) || "
                    + "@annotation(org.springframework.transaction.annotation.Transactional)")
    public void beforeWriteEndpoint(JoinPoint joinPoint) throws Throwable {
        if (isReadOnlyTransaction(joinPoint)) {
            return;
        }
        if (!TransactionSynchronizationManager.isSynchronizationActive()) {
            return;
        }
        boolean alreadyRegistered =
                TransactionSynchronizationManager.getSynchronizations().stream()
                        .anyMatch(DataChangeEventSynchronization.class::isInstance);
        if (alreadyRegistered) {
            return;
        }
        TransactionSynchronizationManager.registerSynchronization(
                new DataChangeEventSynchronization());
    }

    private boolean isReadOnlyTransaction(JoinPoint joinPoint) throws NoSuchMethodException {
        Method method = getTargetMethod(joinPoint);
        Transactional transactional = method.getAnnotation(Transactional.class);
        if (transactional == null) {
            transactional = joinPoint.getTarget().getClass().getAnnotation(Transactional.class);
        }
        return transactional != null && transactional.readOnly();
    }

    private Method getTargetMethod(JoinPoint joinPoint) throws NoSuchMethodException {
        Method signatureMethod = ((MethodSignature) joinPoint.getSignature()).getMethod();
        return joinPoint
                .getTarget()
                .getClass()
                .getMethod(signatureMethod.getName(), signatureMethod.getParameterTypes());
    }

    private class DataChangeEventSynchronization implements TransactionSynchronization {

        private void publishEvents() {
            Set<DataChangeEvent> eventsToPublish = applicationEventQueue.consumeEvents();
            if (CollectionUtils.isEmpty(eventsToPublish)) {
                return;
            }
            for (DataChangeEvent event : eventsToPublish) {
                springEventPublisher.publishEvent(event);
            }
        }

        @Override
        public void afterCommit() {
            publishEvents();
        }
    }
}
 

由于切面在事务提交后执行,它确保仅在事务成功时发送事件。如果事务失败,事件不会被发送,且变更会被回滚。

5. 使用 DTO 响应的事件通知

此增强通过 ResponseBodyAdvice 实现,它在响应体发送到客户端之前拦截响应体。由于事件在事务提交后发送,数据保证已持久化到数据库中。

@ControllerAdvice
@RequiredArgsConstructor
@ConditionalOnProperty(value = "events.notification-response-enabled", havingValue = "true")
public class EventNotificationResponseBodyAdvice implements ResponseBodyAdvice<Object> {

    private final ApplicationEventQueue applicationEventQueue;
    private final ApplicationEventPublisher springEventPublisher;

    @Override
    public boolean supports(
            MethodParameter returnType, Class<? extends HttpMessageConverter<?>> converterType) {
        return true;
    }

    @Override
    public Object beforeBodyWrite(
            Object body,
            MethodParameter returnType,
            MediaType selectedContentType,
            Class<? extends HttpMessageConverter<?>> selectedConverterType,
            ServerHttpRequest request,
            ServerHttpResponse response) {

        if (isWriteMethod(request.getMethod())) {
            try {
                this.publishEvents(body);
            } catch (Exception e) {
                log.error("Error while sending spring events", e);
            }
        }

        return body;
    }

    private void publishEvents(Object body) {

        Set<DataChangeEvent> eventsToPublish = applicationEventQueue.consumeEvents();
        if (eventsToPublish.isEmpty()) {
            return;
        }
        for (DataChangeEvent event : eventsToPublish) {
            event.setBody(body);
            springEventPublisher.publishEvent(event);
        }
    }
}

6. 消费事件

可以使用 @EventListener 注解消费 DataChangeEvent 事件。通过添加 @Async,事件监听器会异步运行,避免阻塞主线程。

@Service
@RequiredArgsConstructor
@Slf4j
public class UpdateEventListener {

    @Async
    @EventListener
    public void on(DataChangeEvent event) {
        // do something with the event, e.g. update a cache, use a message broker, etc.
    }

}

事件被消费后,可以触发各种适合应用程序需求的操作。

此外,Spring Modulith 提供了一种无缝的方式来外部化事件。要了解更多关于使用 Spring Modulith 外部化事件的信息,请查看 Spring Modulith 简化事件外部化


7. 替代方案与结论

Debezium 是一个开源平台,实现了变更数据捕获(CDC)模式,直接从数据库事务日志中捕获实时变更。Debezium 使“轻松”实现事件驱动架构成为可能。

请登录后发表评论

    没有回复内容