关于 Spring For All

关于 Spring For All

Spring For All 的一切
最新动态

最新动态

Spring 5 会是咋样呢
Spring Boot

Spring Boot

快速构建并运行 Spring 应用程序
Spring Cloud

Spring Cloud

分布式系统的一套工具,可用于构建微服务
Spring Framework

Spring Framework

提供依赖注入、事务、Web应用、数据访问等模块
Spring Data

Spring Data

提供一致性数据访问模块
Spring Security

Spring Security

提供应用身份验证和授权支持
Spring Batch

Spring Batch

提供高容批处理操作模块
Spring AMQP

Spring AMQP

基于AMQP消息解决方案
Micro Service Arch.

Micro Service Arch.

微服务架构相关
开源项目及视频教程

开源项目及视频教程

做中国最好的 Spring 开源项目及视频教程
小马哥专栏

小马哥专栏

阿里技术布道者,专注 Spring Boot 及微服务

spring amqp消息发送成功,消息确认异常,急求解决

回复

zuoyc 发起了问题 • 1 人关注 • 0 个回复 • 160 次浏览 • 2017-08-28 10:50 • 来自相关话题

我想问下大家MQ事物的原子性是怎么控制的?

xiaobaxi 回复了问题 • 2 人关注 • 1 个回复 • 258 次浏览 • 2017-07-18 23:15 • 来自相关话题

RabbitMQ中的消息属性中的type、messageId、correlationId三个属性分别是干啥的?

xiaobaxi 回复了问题 • 3 人关注 • 1 个回复 • 365 次浏览 • 2017-07-11 11:06 • 来自相关话题

kafka问题

JThink 回复了问题 • 2 人关注 • 1 个回复 • 288 次浏览 • 2017-06-16 15:36 • 来自相关话题

如何使用消息解决分布式事务问题

xiaobaxi 回复了问题 • 2 人关注 • 1 个回复 • 492 次浏览 • 2017-06-13 16:42 • 来自相关话题

RabbitMQ 的消息持久化与 Spring AMQP 的实现剖析

梁桂钊 发表了文章 • 0 个评论 • 672 次浏览 • 2017-06-12 08:21 • 来自相关话题

要从奔溃的 RabbitMQ 中恢复的消息,我们需要做消息持久化。如果消息要从 RabbitMQ 奔溃中恢复,那么必须满足三点,且三者缺一不可。
交换器必须是持久化。队列必须是持久化的。消息必须是持久化的。

原生的实现方式

原生的 RabbitMQ 客户端需要完成三个步骤。

第一步,交换器的持久化。$(document).ready(function() {$('pre code').each(function(i, block) { hljs.highlightBlock( block); }); });// 参数1 exchange :交换器名
// 参数2 type :交换器类型
// 参数3 durable :是否持久化
channel.exchangeDeclare(EXCHANGE_NAME, "topic", true);
第二步,队列的持久化。// 参数1 queue :队列名
// 参数2 durable :是否持久化
// 参数3 exclusive :仅创建者可以使用的私有队列,断开后自动删除
// 参数4 autoDelete : 当所有消费客户端连接断开后,是否自动删除队列
// 参数5 arguments
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
第三步,消息的持久化。// 参数1 exchange :交换器
// 参数2 routingKey : 路由键
// 参数3 props : 消息的其他参数,其中 MessageProperties.PERSISTENT_TEXT_PLAIN 表示持久化
// 参数4 body : 消息体
channel.basicPublish("", queue_name, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
Spring AMQP 的实现方式

Spring AMQP 是对原生的 RabbitMQ 客户端的封装。一般情况下,我们只需要定义交换器的持久化和队列的持久化。

其中,交换器的持久化配置如下。// 参数1 name :交互器名
// 参数2 durable :是否持久化
// 参数3 autoDelete :当所有消费客户端连接断开后,是否自动删除队列
new TopicExchange(name, durable, autoDelete)
此外,还需要再配置队列的持久化。// 参数1 name :队列名
// 参数2 durable :是否持久化
// 参数3 exclusive :仅创建者可以使用的私有队列,断开后自动删除
// 参数4 autoDelete : 当所有消费客户端连接断开后,是否自动删除队列
new Queue(name, durable, exclusive, autoDelete);
至此,RabbitMQ 的消息持久化配置完毕。

那么,消息的持久化难道不需要配置么?确实如此,我们来看下源码。

一般情况下,我们会通过这种方式发送消息。rabbitTemplate.convertAndSend(exchange, routeKey, message);
其中,调用了 convertAndSend(String exchange, String routingKey, final Object object) 方法。@Override
public void convertAndSend(String exchange, String routingKey, final Object object) throws AmqpException {
convertAndSend(exchange, routingKey, object, (CorrelationData) null);
}
接着,用调用了 convertAndSend(String exchange, String routingKey, final Object object, CorrelationData correlationData) 方法。public void convertAndSend(String exchange, String routingKey, final Object object, CorrelationData correlationData) throws AmqpException {
send(exchange, routingKey, convertMessageIfNecessary(object), correlationData);
}
此时,最关键的方法出现了,它是 convertMessageIfNecessary(final Object object)。protected Message convertMessageIfNecessary(final Object object) {
if (object instanceof Message) {
return (Message) object;
}
return getRequiredMessageConverter().toMessage(object, new MessageProperties());
}
其中,关键的是 MessageProperties 类,它持久化的策略是 MessageDeliveryMode.PERSISTENT,因此它会初始化时默认消息是持久化的。public class MessageProperties implements Serializable {
public MessageProperties() {
this.deliveryMode = DEFAULT_DELIVERY_MODE;
this.priority = DEFAULT_PRIORITY;
}

static {
DEFAULT_DELIVERY_MODE = MessageDeliveryMode.PERSISTENT;
DEFAULT_PRIORITY = Integer.valueOf(0);
}

(完)
 

本文作者: 梁桂钊
原文链接: http://blog.720ui.com/2017/rab ... able/
版权归作者所有,转载请注明出处 查看全部
要从奔溃的 RabbitMQ 中恢复的消息,我们需要做消息持久化。如果消息要从 RabbitMQ 奔溃中恢复,那么必须满足三点,且三者缺一不可。
  1. 交换器必须是持久化。
  2. 队列必须是持久化的。
  3. 消息必须是持久化的。


原生的实现方式

原生的 RabbitMQ 客户端需要完成三个步骤。

第一步,交换器的持久化。
// 参数1 exchange :交换器名
// 参数2 type :交换器类型
// 参数3 durable :是否持久化
channel.exchangeDeclare(EXCHANGE_NAME, "topic", true);

第二步,队列的持久化。
// 参数1 queue :队列名
// 参数2 durable :是否持久化
// 参数3 exclusive :仅创建者可以使用的私有队列,断开后自动删除
// 参数4 autoDelete : 当所有消费客户端连接断开后,是否自动删除队列
// 参数5 arguments
channel.queueDeclare(QUEUE_NAME, true, false, false, null);

第三步,消息的持久化。
// 参数1 exchange :交换器
// 参数2 routingKey : 路由键
// 参数3 props : 消息的其他参数,其中 MessageProperties.PERSISTENT_TEXT_PLAIN 表示持久化
// 参数4 body : 消息体
channel.basicPublish("", queue_name, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

Spring AMQP 的实现方式

Spring AMQP 是对原生的 RabbitMQ 客户端的封装。一般情况下,我们只需要定义交换器的持久化和队列的持久化。

其中,交换器的持久化配置如下。
// 参数1 name :交互器名
// 参数2 durable :是否持久化
// 参数3 autoDelete :当所有消费客户端连接断开后,是否自动删除队列
new TopicExchange(name, durable, autoDelete)

此外,还需要再配置队列的持久化。
// 参数1 name :队列名
// 参数2 durable :是否持久化
// 参数3 exclusive :仅创建者可以使用的私有队列,断开后自动删除
// 参数4 autoDelete : 当所有消费客户端连接断开后,是否自动删除队列
new Queue(name, durable, exclusive, autoDelete);

至此,RabbitMQ 的消息持久化配置完毕。

那么,消息的持久化难道不需要配置么?确实如此,我们来看下源码。

一般情况下,我们会通过这种方式发送消息。
rabbitTemplate.convertAndSend(exchange, routeKey, message);

其中,调用了 convertAndSend(String exchange, String routingKey, final Object object) 方法。
@Override
public void convertAndSend(String exchange, String routingKey, final Object object) throws AmqpException {
convertAndSend(exchange, routingKey, object, (CorrelationData) null);
}

接着,用调用了 convertAndSend(String exchange, String routingKey, final Object object, CorrelationData correlationData) 方法。
public void convertAndSend(String exchange, String routingKey, final Object object, CorrelationData correlationData) throws AmqpException {
send(exchange, routingKey, convertMessageIfNecessary(object), correlationData);
}

此时,最关键的方法出现了,它是 convertMessageIfNecessary(final Object object)。
protected Message convertMessageIfNecessary(final Object object) {
if (object instanceof Message) {
return (Message) object;
}
return getRequiredMessageConverter().toMessage(object, new MessageProperties());
}

其中,关键的是 MessageProperties 类,它持久化的策略是 MessageDeliveryMode.PERSISTENT,因此它会初始化时默认消息是持久化的。
public class MessageProperties implements Serializable {
public MessageProperties() {
this.deliveryMode = DEFAULT_DELIVERY_MODE;
this.priority = DEFAULT_PRIORITY;
}

static {
DEFAULT_DELIVERY_MODE = MessageDeliveryMode.PERSISTENT;
DEFAULT_PRIORITY = Integer.valueOf(0);
}
}
 
(完)
 


本文作者: 梁桂钊
原文链接: http://blog.720ui.com/2017/rab ... able/
版权归作者所有,转载请注明出处


Spring AMQP 实战 - 整合 RabbitMQ 发送邮件

梁桂钊 发表了文章 • 6 个评论 • 860 次浏览 • 2017-06-12 08:15 • 来自相关话题

这篇文章,我们开始 Spring AMQP 项目实战旅程。

介绍

通过这个项目实战旅程,你会学习到如何使用 Spring Boot 整合 Spring AMQP,并且使用 RabbitMQ 的消息队列机制发送邮件。其中,消息生产者负责将用户的邮件消息发送至消息队列,而消息消费者从消息队列中获取邮件消息进行发送。这个过程,你可以理解成邮局:当你将要发布的邮件放在邮箱中时,您可以确信邮差最终会将邮件发送给收件人。

准备

本教程假定 RabbitMQ 已在标准端口(5672) 的 localhost 上安装并运行。如果使用不同的主机,端口,连接设置将需要调整。host = localhost
username = guest
password = guest
port = 5672
vhost = /
实战旅程
准备工作

这个实战教程会构建两个工程项目:email-server-producer 与 email-server-consumer。其中,email-server-producer 是消息生产者工程,email-server-consumer 是消息消费者工程。

在教程的最后,我会将完整的代码提交至 github 上面,你可以结合源码来阅读这个教程,会有更好的效果。

现在开始旅程吧。我们使用 Spring Boot 整合 Spring AMQP,并通过 Maven 构建依赖关系。(由于篇幅的问题,我并不会粘贴完整的 pom.xml 配置信息,你可以在 github 源码中查看完整的配置文件)
 
<dependencies>
<!-- spring boot-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context-support</artifactId>
</dependency>

<dependency>
<groupId>javax.mail</groupId>
<artifactId>mail</artifactId>
<version>${javax.mail.version}</version>
</dependency>

</dependencies>

构建消息生产者






我们使用 Java Config 的方式配置消息生产者。@Configuration
@ComponentScan(basePackages = {"com.lianggzone.rabbitmq"})
@PropertySource(value = {"classpath:application.properties"})
public class RabbitMQConfig {
@Autowired
private Environment env;

@Bean
public ConnectionFactory connectionFactory() throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(env.getProperty("mq.host").trim());
connectionFactory.setPort(Integer.parseInt(env.getProperty("mq.port").trim()));
connectionFactory.setVirtualHost(env.getProperty("mq.vhost").trim());
connectionFactory.setUsername(env.getProperty("mq.username").trim());
connectionFactory.setPassword(env.getProperty("mq.password").trim());
return connectionFactory;
}

@Bean
public CachingConnectionFactory cachingConnectionFactory() throws Exception {
return new CachingConnectionFactory(connectionFactory());
}

@Bean
public RabbitTemplate rabbitTemplate() throws Exception {
RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory());
rabbitTemplate.setChannelTransacted(true);
return rabbitTemplate;
}

@Bean
public AmqpAdmin amqpAdmin() throws Exception {
return new RabbitAdmin(cachingConnectionFactory());
}

@Bean
Queue queue() {
String name = env.getProperty("mq.queue").trim();
// 是否持久化
boolean durable = StringUtils.isNotBlank(env.getProperty("mq.queue.durable").trim())?
Boolean.valueOf(env.getProperty("mq.queue.durable").trim()) : true;
// 仅创建者可以使用的私有队列,断开后自动删除
boolean exclusive = StringUtils.isNotBlank(env.getProperty("mq.queue.exclusive").trim())?
Boolean.valueOf(env.getProperty("mq.queue.exclusive").trim()) : false;
// 当所有消费客户端连接断开后,是否自动删除队列
boolean autoDelete = StringUtils.isNotBlank(env.getProperty("mq.queue.autoDelete").trim())?
Boolean.valueOf(env.getProperty("mq.queue.autoDelete").trim()) : false;
return new Queue(name, durable, exclusive, autoDelete);
}

@Bean
TopicExchange exchange() {
String name = env.getProperty("mq.exchange").trim();
// 是否持久化
boolean durable = StringUtils.isNotBlank(env.getProperty("mq.exchange.durable").trim())?
Boolean.valueOf(env.getProperty("mq.exchange.durable").trim()) : true;
// 当所有消费客户端连接断开后,是否自动删除队列
boolean autoDelete = StringUtils.isNotBlank(env.getProperty("mq.exchange.autoDelete").trim())?
Boolean.valueOf(env.getProperty("mq.exchange.autoDelete").trim()) : false;
return new TopicExchange(name, durable, autoDelete);
}

@Bean
Binding binding() {
String routekey = env.getProperty("mq.routekey").trim();
return BindingBuilder.bind(queue()).to(exchange()).with(routekey);
}
}
其中,定义了队列、交换器,以及绑定。事实上,通过这种方式当队列或交换器不存在的时候,Spring AMQP 会自动创建它。(如果你不希望自动创建,可以在 RabbitMQ 的管理后台开通队列和交换器,并注释掉 queue() 方法和 exchange() 方法)。此外,我们为了更好地扩展,将创建队列或交换器的配置信息抽离到了配置文件 application.properties。其中,还包括 RabbitMQ 的配置信息。mq.host=localhost
mq.username=guest
mq.password=guest
mq.port=5672
mq.vhost=/

mq.exchange=email_exchange
mq.exchange.durable=true
mq.exchange.autoDelete=false

mq.queue=email_queue
mq.queue.durable=true
mq.queue.exclusive=false
mq.queue.autoDelete=false

mq.routekey=email_routekey
此外,假设一个生产者发送到一个交换器,而一个消费者从一个队列接收消息。此时,将队列绑定到交换器对于连接这些生产者和消费者至关重要。在 Spring AMQP 中,我们定义一个 Binding 类来表示这些连接。我们使用 BindingBuilder 来构建 “流式的 API” 风格。BindingBuilder.bind(queue()).to(exchange()).with(routekey);
现在,我们离大功告成已经很近了,需要再定义一个发送邮件任务存入消息队列的方法。此时,为了更好地扩展,我们定义一个接口和一个实现类,基于接口编程嘛。public interface EmailService {
/**
* 发送邮件任务存入消息队列
* @param message
* @throws Exception
*/
void sendEmail(String message) throws Exception;
}
它的实现类中重写 sendEmail() 方法,将消息转码并写入到消息队列中。@Service
public class EmailServiceImpl implements EmailService{
private static Logger logger = LoggerFactory.getLogger(EmailServiceImpl.class);

@Resource( name = "rabbitTemplate" )
private RabbitTemplate rabbitTemplate;

@Value("${mq.exchange}")
private String exchange;

@Value("${mq.routekey}")
private String routeKey;

@Override
public void sendEmail(String message) throws Exception {
try {
rabbitTemplate.convertAndSend(exchange, routeKey, message);
}catch (Exception e){
logger.error("EmailServiceImpl.sendEmail", ExceptionUtils.getMessage(e));
}
}
}
那么,我们再模拟一个 RESTful API 接口调用的场景,来模拟真实的场景。@RestController()
@RequestMapping(value = "/v1/emails")
public class EmailController {

@Resource
private EmailService emailService;

@RequestMapping(method = RequestMethod.POST)
public JSONObject add(@RequestBody JSONObject jsonObject) throws Exception {
emailService.sendEmail(jsonObject.toJSONString());
return jsonObject;
}
}
最后,再写一个 main 方法,将 Spring Boot 服务运行起来吧。@RestController
@EnableAutoConfiguration
@ComponentScan(basePackages = {"com.lianggzone.rabbitmq"})
public class WebMain {

public static void main(String[] args) throws Exception {
SpringApplication.run(WebMain.class, args);
}
}
至此,已经大功告成了。我们可以通过 Postman 发送一个 HTTP 请求。(Postman是一款功能强大的网页调试与发送网页HTTP请求的Chrome插件。){
"to":"lianggzone@163.com",
"subject":"email-server-producer",
"text":"<html><head></head><body><h1>邮件测试</h1><p>hello!this is mail test。</p></body></html>"
}
请参见图示。






来看看 RabbitMQ 的管理后台吧,它会出现一个未处理的消息。(地址:http://localhost:15672/#/queues) 
 






注意的是,千万别向我的邮箱发测试消息哟,不然我的邮箱会邮件爆炸的/(ㄒoㄒ)/~~。

构建消息消费者






完成消息生产者之后,我们再来构建一个消息消费者的工程。同样地,我们使用 Java Config 的方式配置消息消费者。@Configuration
@ComponentScan(basePackages = {"com.lianggzone.rabbitmq"})
@PropertySource(value = {"classpath:application.properties"})
public class RabbitMQConfig {
@Autowired
private Environment env;

@Bean
public ConnectionFactory connectionFactory() throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(env.getProperty("mq.host").trim());
connectionFactory.setPort(Integer.parseInt(env.getProperty("mq.port").trim()));
connectionFactory.setVirtualHost(env.getProperty("mq.vhost").trim());
connectionFactory.setUsername(env.getProperty("mq.username").trim());
connectionFactory.setPassword(env.getProperty("mq.password").trim());
return connectionFactory;
}

@Bean
public CachingConnectionFactory cachingConnectionFactory() throws Exception {
return new CachingConnectionFactory(connectionFactory());
}

@Bean
public RabbitTemplate rabbitTemplate() throws Exception {
RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory());
rabbitTemplate.setChannelTransacted(true);
return rabbitTemplate;
}

@Bean
public AmqpAdmin amqpAdmin() throws Exception {
return new RabbitAdmin(cachingConnectionFactory());
}

@Bean
public SimpleMessageListenerContainer listenerContainer(
@Qualifier("mailMessageListenerAdapter") MailMessageListenerAdapter mailMessageListenerAdapter) throws Exception {
String queueName = env.getProperty("mq.queue").trim();

SimpleMessageListenerContainer simpleMessageListenerContainer =
new SimpleMessageListenerContainer(cachingConnectionFactory());
simpleMessageListenerContainer.setQueueNames(queueName);
simpleMessageListenerContainer.setMessageListener(mailMessageListenerAdapter);
// 设置手动 ACK
simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);
return simpleMessageListenerContainer;
}
}
聪明的你,应该发现了其中的不同。这个代码中多了一个 listenerContainer() 方法。是的,它是一个监听器容器,用来监听消息队列进行消息处理的。注意的是,我们这里设置手动 ACK 的方式。默认的情况下,它采用自动应答,这种方式中消息队列会发送消息后立即从消息队列中删除该消息。此时,我们通过手动 ACK 方式,如果消费者因宕机或链接失败等原因没有发送 ACK,RabbitMQ 会将消息重新发送给其他监听在队列的下一个消费者,保证消息的可靠性。

当然,我们也定义 application.properties 配置文件。mq.host=localhost
mq.username=guest
mq.password=guest
mq.port=5672
mq.vhost=/

mq.queue=email_queue
此外,我们创建了一个 MailMessageListenerAdapter 类来消费消息。@Component("mailMessageListenerAdapter")
public class MailMessageListenerAdapter extends MessageListenerAdapter {

@Resource
private JavaMailSender mailSender;

@Value("${mail.username}")
private String mailUsername;

@Override
public void onMessage(Message message, Channel channel) throws Exception {
try {
// 解析RabbitMQ消息体
String messageBody = new String(message.getBody());
MailMessageModel mailMessageModel = JSONObject.toJavaObject(JSONObject.parseObject(messageBody), MailMessageModel.class);
// 发送邮件
String to = mailMessageModel.getTo();
String subject = mailMessageModel.getSubject();
String text = mailMessageModel.getText();
sendHtmlMail(to, subject, text);
// 手动ACK
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}catch (Exception e){
e.printStackTrace();
}
}

/**
* 发送邮件
* @param to
* @param subject
* @param text
* @throws Exception
*/
private void sendHtmlMail(String to, String subject, String text) throws Exception {
MimeMessage mimeMessage = mailSender.createMimeMessage();
MimeMessageHelper mimeMessageHelper = new MimeMessageHelper(mimeMessage);
mimeMessageHelper.setFrom(mailUsername);
mimeMessageHelper.setTo(to);
mimeMessageHelper.setSubject(subject);
mimeMessageHelper.setText(text, true);
// 发送邮件
mailSender.send(mimeMessage);
}
}
在 onMessage() 方法中,我们完成了三件事情:
从 RabbitMQ 的消息队列中解析消息体。根据消息体的内容,发送邮件给目标的邮箱。手动应答 ACK,让消息队列删除该消息。

这里,JSONObject.toJavaObject() 方法使用 fastjson 将 json 字符串转换成实体对象 MailMessageModel。注意的是,@Data 是 lombok 类库的一个注解。@Data
public class MailMessageModel {
@JSONField(name = "from")
private String from;

@JSONField(name = "to")
private String to;

@JSONField(name = "subject")
private String subject;

@JSONField(name = "text")
private String text;

@Override
public String toString() {
StringBuffer sb = new StringBuffer();
sb.append("Email{from:").append(this.from).append(", ");
sb.append("to:").append(this.to).append(", ");
sb.append("subject:").append(this.subject).append(", ");
sb.append("text:").append(this.text).append("}");
return sb.toString();
}
}
Spring 对 Java Mail 有很好的支持。其中,邮件包括几种类型:简单文本的邮件、 HTML 文本的邮件、 内嵌图片的邮件、 包含附件的邮件。这里,我们封装了一个简单的 sendHtmlMail() 进行邮件发送。

对了,我们还少了一个邮件的配置类。@Configuration
@PropertySource(value = {"classpath:mail.properties"})
@ComponentScan(basePackages = {"com.lianggzone.rabbitmq"})
public class EmailConfig {
@Autowired
private Environment env;

@Bean(name = "mailSender")
public JavaMailSender mailSender() {
// 创建邮件发送器, 主要提供了邮件发送接口、透明创建Java Mail的MimeMessage、及邮件发送的配置
JavaMailSenderImpl mailSender = new JavaMailSenderImpl();
// 如果为普通邮箱, 非ssl认证等
mailSender.setHost(env.getProperty("mail.host").trim());
mailSender.setPort(Integer.parseInt(env.getProperty("mail.port").trim()));
mailSender.setUsername(env.getProperty("mail.username").trim());
mailSender.setPassword(env.getProperty("mail.password").trim());
mailSender.setDefaultEncoding("utf-8");
// 配置邮件服务器
Properties props = new Properties();
// 让服务器进行认证,认证用户名和密码是否正确
props.put("mail.smtp.auth", "true");
props.put("mail.smtp.timeout", "25000");
mailSender.setJavaMailProperties(props);
return mailSender;
}
}
这些配置信息,我们在配置文件 mail.properties 中维护。mail.host=smtp.163.com
mail.port=25
mail.username=用户名
mail.password=密码
最后,我们写一个 main 方法,将 Spring Boot 服务运行起来吧。

至此,我们也完成了一个消息消费者的工程,它将不断地从消息队列中处理邮件消息。

源代码

相关示例完整代码: https://github.com/lianggzone/rabbitmq-server 
 
(完)
 

本文作者: 梁桂钊
原文链接: http://blog.720ui.com/2017/rabbitmq_action_email/
版权归作者所有,转载请注明出处 查看全部
这篇文章,我们开始 Spring AMQP 项目实战旅程。

介绍

通过这个项目实战旅程,你会学习到如何使用 Spring Boot 整合 Spring AMQP,并且使用 RabbitMQ 的消息队列机制发送邮件。其中,消息生产者负责将用户的邮件消息发送至消息队列,而消息消费者从消息队列中获取邮件消息进行发送。这个过程,你可以理解成邮局:当你将要发布的邮件放在邮箱中时,您可以确信邮差最终会将邮件发送给收件人。

准备

本教程假定 RabbitMQ 已在标准端口(5672) 的 localhost 上安装并运行。如果使用不同的主机,端口,连接设置将需要调整。
host = localhost
username = guest
password = guest
port = 5672
vhost = /

实战旅程
准备工作

这个实战教程会构建两个工程项目:email-server-producer 与 email-server-consumer。其中,email-server-producer 是消息生产者工程,email-server-consumer 是消息消费者工程。

在教程的最后,我会将完整的代码提交至 github 上面,你可以结合源码来阅读这个教程,会有更好的效果。

现在开始旅程吧。我们使用 Spring Boot 整合 Spring AMQP,并通过 Maven 构建依赖关系。(由于篇幅的问题,我并不会粘贴完整的 pom.xml 配置信息,你可以在 github 源码中查看完整的配置文件)
 
<dependencies>
<!-- spring boot-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context-support</artifactId>
</dependency>

<dependency>
<groupId>javax.mail</groupId>
<artifactId>mail</artifactId>
<version>${javax.mail.version}</version>
</dependency>

</dependencies>

构建消息生产者

spring-amqp-email-list1.PNG


我们使用 Java Config 的方式配置消息生产者。
@Configuration
@ComponentScan(basePackages = {"com.lianggzone.rabbitmq"})
@PropertySource(value = {"classpath:application.properties"})
public class RabbitMQConfig {
@Autowired
private Environment env;

@Bean
public ConnectionFactory connectionFactory() throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(env.getProperty("mq.host").trim());
connectionFactory.setPort(Integer.parseInt(env.getProperty("mq.port").trim()));
connectionFactory.setVirtualHost(env.getProperty("mq.vhost").trim());
connectionFactory.setUsername(env.getProperty("mq.username").trim());
connectionFactory.setPassword(env.getProperty("mq.password").trim());
return connectionFactory;
}

@Bean
public CachingConnectionFactory cachingConnectionFactory() throws Exception {
return new CachingConnectionFactory(connectionFactory());
}

@Bean
public RabbitTemplate rabbitTemplate() throws Exception {
RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory());
rabbitTemplate.setChannelTransacted(true);
return rabbitTemplate;
}

@Bean
public AmqpAdmin amqpAdmin() throws Exception {
return new RabbitAdmin(cachingConnectionFactory());
}

@Bean
Queue queue() {
String name = env.getProperty("mq.queue").trim();
// 是否持久化
boolean durable = StringUtils.isNotBlank(env.getProperty("mq.queue.durable").trim())?
Boolean.valueOf(env.getProperty("mq.queue.durable").trim()) : true;
// 仅创建者可以使用的私有队列,断开后自动删除
boolean exclusive = StringUtils.isNotBlank(env.getProperty("mq.queue.exclusive").trim())?
Boolean.valueOf(env.getProperty("mq.queue.exclusive").trim()) : false;
// 当所有消费客户端连接断开后,是否自动删除队列
boolean autoDelete = StringUtils.isNotBlank(env.getProperty("mq.queue.autoDelete").trim())?
Boolean.valueOf(env.getProperty("mq.queue.autoDelete").trim()) : false;
return new Queue(name, durable, exclusive, autoDelete);
}

@Bean
TopicExchange exchange() {
String name = env.getProperty("mq.exchange").trim();
// 是否持久化
boolean durable = StringUtils.isNotBlank(env.getProperty("mq.exchange.durable").trim())?
Boolean.valueOf(env.getProperty("mq.exchange.durable").trim()) : true;
// 当所有消费客户端连接断开后,是否自动删除队列
boolean autoDelete = StringUtils.isNotBlank(env.getProperty("mq.exchange.autoDelete").trim())?
Boolean.valueOf(env.getProperty("mq.exchange.autoDelete").trim()) : false;
return new TopicExchange(name, durable, autoDelete);
}

@Bean
Binding binding() {
String routekey = env.getProperty("mq.routekey").trim();
return BindingBuilder.bind(queue()).to(exchange()).with(routekey);
}
}

其中,定义了队列、交换器,以及绑定。事实上,通过这种方式当队列或交换器不存在的时候,Spring AMQP 会自动创建它。(如果你不希望自动创建,可以在 RabbitMQ 的管理后台开通队列和交换器,并注释掉 queue() 方法和 exchange() 方法)。此外,我们为了更好地扩展,将创建队列或交换器的配置信息抽离到了配置文件 application.properties。其中,还包括 RabbitMQ 的配置信息。
mq.host=localhost
mq.username=guest
mq.password=guest
mq.port=5672
mq.vhost=/

mq.exchange=email_exchange
mq.exchange.durable=true
mq.exchange.autoDelete=false

mq.queue=email_queue
mq.queue.durable=true
mq.queue.exclusive=false
mq.queue.autoDelete=false

mq.routekey=email_routekey

此外,假设一个生产者发送到一个交换器,而一个消费者从一个队列接收消息。此时,将队列绑定到交换器对于连接这些生产者和消费者至关重要。在 Spring AMQP 中,我们定义一个 Binding 类来表示这些连接。我们使用 BindingBuilder 来构建 “流式的 API” 风格。
BindingBuilder.bind(queue()).to(exchange()).with(routekey);

现在,我们离大功告成已经很近了,需要再定义一个发送邮件任务存入消息队列的方法。此时,为了更好地扩展,我们定义一个接口和一个实现类,基于接口编程嘛。
public interface EmailService {
/**
* 发送邮件任务存入消息队列
* @param message
* @throws Exception
*/
void sendEmail(String message) throws Exception;
}

它的实现类中重写 sendEmail() 方法,将消息转码并写入到消息队列中。
@Service
public class EmailServiceImpl implements EmailService{
private static Logger logger = LoggerFactory.getLogger(EmailServiceImpl.class);

@Resource( name = "rabbitTemplate" )
private RabbitTemplate rabbitTemplate;

@Value("${mq.exchange}")
private String exchange;

@Value("${mq.routekey}")
private String routeKey;

@Override
public void sendEmail(String message) throws Exception {
try {
rabbitTemplate.convertAndSend(exchange, routeKey, message);
}catch (Exception e){
logger.error("EmailServiceImpl.sendEmail", ExceptionUtils.getMessage(e));
}
}
}

那么,我们再模拟一个 RESTful API 接口调用的场景,来模拟真实的场景。
@RestController()
@RequestMapping(value = "/v1/emails")
public class EmailController {

@Resource
private EmailService emailService;

@RequestMapping(method = RequestMethod.POST)
public JSONObject add(@RequestBody JSONObject jsonObject) throws Exception {
emailService.sendEmail(jsonObject.toJSONString());
return jsonObject;
}
}

最后,再写一个 main 方法,将 Spring Boot 服务运行起来吧。
@RestController
@EnableAutoConfiguration
@ComponentScan(basePackages = {"com.lianggzone.rabbitmq"})
public class WebMain {

public static void main(String[] args) throws Exception {
SpringApplication.run(WebMain.class, args);
}
}

至此,已经大功告成了。我们可以通过 Postman 发送一个 HTTP 请求。(Postman是一款功能强大的网页调试与发送网页HTTP请求的Chrome插件。)
{
"to":"lianggzone@163.com",
"subject":"email-server-producer",
"text":"<html><head></head><body><h1>邮件测试</h1><p>hello!this is mail test。</p></body></html>"
}

请参见图示。

spring-amqp-email.PNG


来看看 RabbitMQ 的管理后台吧,它会出现一个未处理的消息。(地址:http://localhost:15672/#/queues) 
 

spring-amqp-email-admin.PNG


注意的是,千万别向我的邮箱发测试消息哟,不然我的邮箱会邮件爆炸的/(ㄒoㄒ)/~~。

构建消息消费者

spring-amqp-email-list2.PNG


完成消息生产者之后,我们再来构建一个消息消费者的工程。同样地,我们使用 Java Config 的方式配置消息消费者。
@Configuration
@ComponentScan(basePackages = {"com.lianggzone.rabbitmq"})
@PropertySource(value = {"classpath:application.properties"})
public class RabbitMQConfig {
@Autowired
private Environment env;

@Bean
public ConnectionFactory connectionFactory() throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(env.getProperty("mq.host").trim());
connectionFactory.setPort(Integer.parseInt(env.getProperty("mq.port").trim()));
connectionFactory.setVirtualHost(env.getProperty("mq.vhost").trim());
connectionFactory.setUsername(env.getProperty("mq.username").trim());
connectionFactory.setPassword(env.getProperty("mq.password").trim());
return connectionFactory;
}

@Bean
public CachingConnectionFactory cachingConnectionFactory() throws Exception {
return new CachingConnectionFactory(connectionFactory());
}

@Bean
public RabbitTemplate rabbitTemplate() throws Exception {
RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory());
rabbitTemplate.setChannelTransacted(true);
return rabbitTemplate;
}

@Bean
public AmqpAdmin amqpAdmin() throws Exception {
return new RabbitAdmin(cachingConnectionFactory());
}

@Bean
public SimpleMessageListenerContainer listenerContainer(
@Qualifier("mailMessageListenerAdapter") MailMessageListenerAdapter mailMessageListenerAdapter) throws Exception {
String queueName = env.getProperty("mq.queue").trim();

SimpleMessageListenerContainer simpleMessageListenerContainer =
new SimpleMessageListenerContainer(cachingConnectionFactory());
simpleMessageListenerContainer.setQueueNames(queueName);
simpleMessageListenerContainer.setMessageListener(mailMessageListenerAdapter);
// 设置手动 ACK
simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);
return simpleMessageListenerContainer;
}
}

聪明的你,应该发现了其中的不同。这个代码中多了一个 listenerContainer() 方法。是的,它是一个监听器容器,用来监听消息队列进行消息处理的。注意的是,我们这里设置手动 ACK 的方式。默认的情况下,它采用自动应答,这种方式中消息队列会发送消息后立即从消息队列中删除该消息。此时,我们通过手动 ACK 方式,如果消费者因宕机或链接失败等原因没有发送 ACK,RabbitMQ 会将消息重新发送给其他监听在队列的下一个消费者,保证消息的可靠性。

当然,我们也定义 application.properties 配置文件。
mq.host=localhost
mq.username=guest
mq.password=guest
mq.port=5672
mq.vhost=/

mq.queue=email_queue

此外,我们创建了一个 MailMessageListenerAdapter 类来消费消息。
@Component("mailMessageListenerAdapter")
public class MailMessageListenerAdapter extends MessageListenerAdapter {

@Resource
private JavaMailSender mailSender;

@Value("${mail.username}")
private String mailUsername;

@Override
public void onMessage(Message message, Channel channel) throws Exception {
try {
// 解析RabbitMQ消息体
String messageBody = new String(message.getBody());
MailMessageModel mailMessageModel = JSONObject.toJavaObject(JSONObject.parseObject(messageBody), MailMessageModel.class);
// 发送邮件
String to = mailMessageModel.getTo();
String subject = mailMessageModel.getSubject();
String text = mailMessageModel.getText();
sendHtmlMail(to, subject, text);
// 手动ACK
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}catch (Exception e){
e.printStackTrace();
}
}

/**
* 发送邮件
* @param to
* @param subject
* @param text
* @throws Exception
*/
private void sendHtmlMail(String to, String subject, String text) throws Exception {
MimeMessage mimeMessage = mailSender.createMimeMessage();
MimeMessageHelper mimeMessageHelper = new MimeMessageHelper(mimeMessage);
mimeMessageHelper.setFrom(mailUsername);
mimeMessageHelper.setTo(to);
mimeMessageHelper.setSubject(subject);
mimeMessageHelper.setText(text, true);
// 发送邮件
mailSender.send(mimeMessage);
}
}

在 onMessage() 方法中,我们完成了三件事情:
  1. 从 RabbitMQ 的消息队列中解析消息体。
  2. 根据消息体的内容,发送邮件给目标的邮箱。
  3. 手动应答 ACK,让消息队列删除该消息。


这里,JSONObject.toJavaObject() 方法使用 fastjson 将 json 字符串转换成实体对象 MailMessageModel。注意的是,@Data 是 lombok 类库的一个注解。
@Data
public class MailMessageModel {
@JSONField(name = "from")
private String from;

@JSONField(name = "to")
private String to;

@JSONField(name = "subject")
private String subject;

@JSONField(name = "text")
private String text;

@Override
public String toString() {
StringBuffer sb = new StringBuffer();
sb.append("Email{from:").append(this.from).append(", ");
sb.append("to:").append(this.to).append(", ");
sb.append("subject:").append(this.subject).append(", ");
sb.append("text:").append(this.text).append("}");
return sb.toString();
}
}

Spring 对 Java Mail 有很好的支持。其中,邮件包括几种类型:简单文本的邮件、 HTML 文本的邮件、 内嵌图片的邮件、 包含附件的邮件。这里,我们封装了一个简单的 sendHtmlMail() 进行邮件发送。

对了,我们还少了一个邮件的配置类。
@Configuration
@PropertySource(value = {"classpath:mail.properties"})
@ComponentScan(basePackages = {"com.lianggzone.rabbitmq"})
public class EmailConfig {
@Autowired
private Environment env;

@Bean(name = "mailSender")
public JavaMailSender mailSender() {
// 创建邮件发送器, 主要提供了邮件发送接口、透明创建Java Mail的MimeMessage、及邮件发送的配置
JavaMailSenderImpl mailSender = new JavaMailSenderImpl();
// 如果为普通邮箱, 非ssl认证等
mailSender.setHost(env.getProperty("mail.host").trim());
mailSender.setPort(Integer.parseInt(env.getProperty("mail.port").trim()));
mailSender.setUsername(env.getProperty("mail.username").trim());
mailSender.setPassword(env.getProperty("mail.password").trim());
mailSender.setDefaultEncoding("utf-8");
// 配置邮件服务器
Properties props = new Properties();
// 让服务器进行认证,认证用户名和密码是否正确
props.put("mail.smtp.auth", "true");
props.put("mail.smtp.timeout", "25000");
mailSender.setJavaMailProperties(props);
return mailSender;
}
}

这些配置信息,我们在配置文件 mail.properties 中维护。
mail.host=smtp.163.com
mail.port=25
mail.username=用户名
mail.password=密码

最后,我们写一个 main 方法,将 Spring Boot 服务运行起来吧。

至此,我们也完成了一个消息消费者的工程,它将不断地从消息队列中处理邮件消息。

源代码

相关示例完整代码: https://github.com/lianggzone/rabbitmq-server 
 
(完)
 


本文作者: 梁桂钊
原文链接: http://blog.720ui.com/2017/rabbitmq_action_email/
版权归作者所有,转载请注明出处