当实体被更新、删除或持久化时,系统会发布事件以通知其他系统这些变更。我们还将通过引入 DTO 对象来增强通知过程,从而避免获取更新后的数据。这一增强解决了事件通知模式与事件溯源相比的一个缺点。
完整的应用程序代码可在 GitHub 上找到。
1. 实体监听器
首先,我们使用 @EntityListeners
为实体指定监听器类。以下示例展示了 Book
实体通过 BookEntityListener
类监听生命周期事件:
@Entity
@Table(name = "books")
@EntityListeners(BookEntityListener.class)
public class Book extends AbstractEntity {
// fields
}
监听器类本身是一个 Spring Bean,使用 @Component
注解。它可以处理多个实体的事件,并使用统一的对象结构发布相应的事件。
@Component
@RequiredArgsConstructor
public class BookEntityListener {
private final ApplicationEventQueue applicationEventQueue;
@PostUpdate
public void postUpdate(AbstractEntity entity) {
switch (entity) {
case Book book -> publishBookEvent(book, OperationType.UPDATE);
default ->
log.error(...)
}
}
@PostRemove
public void postRemove(AbstractEntity entity) {
// ...
}
@PostPersist
public void postPersist(AbstractEntity entity) {
// ...
}
private void publishBookEvent(Book book, OperationType operationType) {
DataChangeEvent entityUpdated =
DataChangeEvent.builder()
.eventName("book")
.id(book.getId())
.operationType(operationType)
.databaseVersion(book.getDatabaseVersion())
.build();
applicationEventQueue.enqueue(entityUpdated);
}
}
2. 事件队列
我们不会直接发布事件,而是将其加入队列,原因如下:
-
过滤同一事务中的重复事件:在同一事务中对同一实体操作的多个事件会被过滤,确保只发布最后一个事件。实体中的
@Version
字段在此非常有用。 -
在事务提交后发布事件:事件仅在事务成功提交到数据库后发布,避免了发送可能被回滚的变更事件的风险。
ApplicationEventQueue
类使用 ThreadLocal
安全地存储当前线程中的事件。
@Component
@Getter
@Setter
@Slf4j
public class ApplicationEventQueue {
private static final ThreadLocal<Set<DataChangeEvent>> events =
ThreadLocal.withInitial(HashSet::new);
public void enqueue(DataChangeEvent event) {
events.get().add(event);
}
public void clear() {
events.remove();
}
public Set<DataChangeEvent> consumeEvents() {
Set<DataChangeEvent> allEvents = filterByLatestDatabaseVersion(events.get());
this.clear();
return allEvents;
}
private Set<DataChangeEvent> filterByLatestDatabaseVersion(
Set<DataChangeEvent> dataChangeEventSet) {
// return events filtered;
}
}
虽然 @RequestScope
Bean 可以工作,但当事件在请求范围外创建时(例如在 @Async
方法中),它会失效。ThreadLocal
提供了线程安全的替代方案,但需要谨慎管理。特别是,我们必须在事件处理完毕后清除存储的事件,以避免事件在请求之间泄漏。
此外,Oracle 不建议池化虚拟线程,因此在使用轻量级虚拟线程时,ThreadLocal
是一个安全的选择。详情请参阅 Oracle 关于虚拟线程的文档。
3. 清除 ThreadLocal 的拦截器
为了防止在线程池化时事件泄漏,我们在请求完成后清除 ThreadLocal
。这可以通过拦截器实现:
@RequiredArgsConstructor
@Slf4j
public class ThreadLocalClearingInterceptor implements HandlerInterceptor {
private final ApplicationEventQueue applicationEventQueue;
@Override
public void afterCompletion(
HttpServletRequest request,
HttpServletResponse response,
Object handler,
Exception ex) {
log.trace("Clearing thread local request events");
applicationEventQueue.clear(); // Clear ThreadLocal events after request completion
}
}
在 WebMvcConfigurer
中注册拦截器:
@Configuration
@RequiredArgsConstructor
@Slf4j
public class WebConfig implements WebMvcConfigurer {
private final ApplicationEventQueue applicationEventQueue;
@Override
public void addInterceptors(InterceptorRegistry registry) {
log.debug("WebConfig::addInterceptors()");
registry.addInterceptor(new ThreadLocalClearingInterceptor(applicationEventQueue));
}
}
4. 事务同步的事件通知
TransactionSynchronizationAspect
确保事件仅在事务提交后发送。同步逻辑避免了为失败的事务发送事件。
该切面围绕所有使用 @Transactional
注解的方法执行,包括类级别注解的方法。切面在事务提交后立即注册事件通知。每个事务都会注册一个新的同步。
@Aspect
@Component
@ConditionalOnProperty(value = "events.notification-enabled", havingValue = "true")
public class TransactionSynchronizationAspect {
private final ApplicationEventQueue applicationEventQueue;
private final ApplicationEventPublisher springEventPublisher;
@Before(
"execution(* (@org.springframework.transaction.annotation.Transactional *).*(..)) || "
+ "@annotation(org.springframework.transaction.annotation.Transactional)")
public void beforeWriteEndpoint(JoinPoint joinPoint) throws Throwable {
if (isReadOnlyTransaction(joinPoint)) {
return;
}
if (!TransactionSynchronizationManager.isSynchronizationActive()) {
return;
}
boolean alreadyRegistered =
TransactionSynchronizationManager.getSynchronizations().stream()
.anyMatch(DataChangeEventSynchronization.class::isInstance);
if (alreadyRegistered) {
return;
}
TransactionSynchronizationManager.registerSynchronization(
new DataChangeEventSynchronization());
}
private boolean isReadOnlyTransaction(JoinPoint joinPoint) throws NoSuchMethodException {
Method method = getTargetMethod(joinPoint);
Transactional transactional = method.getAnnotation(Transactional.class);
if (transactional == null) {
transactional = joinPoint.getTarget().getClass().getAnnotation(Transactional.class);
}
return transactional != null && transactional.readOnly();
}
private Method getTargetMethod(JoinPoint joinPoint) throws NoSuchMethodException {
Method signatureMethod = ((MethodSignature) joinPoint.getSignature()).getMethod();
return joinPoint
.getTarget()
.getClass()
.getMethod(signatureMethod.getName(), signatureMethod.getParameterTypes());
}
private class DataChangeEventSynchronization implements TransactionSynchronization {
private void publishEvents() {
Set<DataChangeEvent> eventsToPublish = applicationEventQueue.consumeEvents();
if (CollectionUtils.isEmpty(eventsToPublish)) {
return;
}
for (DataChangeEvent event : eventsToPublish) {
springEventPublisher.publishEvent(event);
}
}
@Override
public void afterCommit() {
publishEvents();
}
}
}
由于切面在事务提交后执行,它确保仅在事务成功时发送事件。如果事务失败,事件不会被发送,且变更会被回滚。
5. 使用 DTO 响应的事件通知
此增强通过 ResponseBodyAdvice
实现,它在响应体发送到客户端之前拦截响应体。由于事件在事务提交后发送,数据保证已持久化到数据库中。
@ControllerAdvice
@RequiredArgsConstructor
@ConditionalOnProperty(value = "events.notification-response-enabled", havingValue = "true")
public class EventNotificationResponseBodyAdvice implements ResponseBodyAdvice<Object> {
private final ApplicationEventQueue applicationEventQueue;
private final ApplicationEventPublisher springEventPublisher;
@Override
public boolean supports(
MethodParameter returnType, Class<? extends HttpMessageConverter<?>> converterType) {
return true;
}
@Override
public Object beforeBodyWrite(
Object body,
MethodParameter returnType,
MediaType selectedContentType,
Class<? extends HttpMessageConverter<?>> selectedConverterType,
ServerHttpRequest request,
ServerHttpResponse response) {
if (isWriteMethod(request.getMethod())) {
try {
this.publishEvents(body);
} catch (Exception e) {
log.error("Error while sending spring events", e);
}
}
return body;
}
private void publishEvents(Object body) {
Set<DataChangeEvent> eventsToPublish = applicationEventQueue.consumeEvents();
if (eventsToPublish.isEmpty()) {
return;
}
for (DataChangeEvent event : eventsToPublish) {
event.setBody(body);
springEventPublisher.publishEvent(event);
}
}
}
6. 消费事件
可以使用 @EventListener
注解消费 DataChangeEvent
事件。通过添加 @Async
,事件监听器会异步运行,避免阻塞主线程。
@Service
@RequiredArgsConstructor
@Slf4j
public class UpdateEventListener {
@Async
@EventListener
public void on(DataChangeEvent event) {
// do something with the event, e.g. update a cache, use a message broker, etc.
}
}
事件被消费后,可以触发各种适合应用程序需求的操作。
此外,Spring Modulith 提供了一种无缝的方式来外部化事件。要了解更多关于使用 Spring Modulith 外部化事件的信息,请查看 Spring Modulith 简化事件外部化。
7. 替代方案与结论
Debezium 是一个开源平台,实现了变更数据捕获(CDC)模式,直接从数据库事务日志中捕获实时变更。Debezium 使“轻松”实现事件驱动架构成为可能。
没有回复内容