前言:
这里我们需要准备两个Spring Boot项目,一个项目作为 消息生产,一个作为消费消息
window安装MQ,自行安装,可以参考教程:
https://blog.csdn.net/qq_63815371/article/details/131032508
项目1:(生产消息)
项目结构:
pom.xml
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.74</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.24</version>
</dependency>
<!--MQ-->
<!-- RocketMQ Java Client -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.9.1</version> <!-- 使用合适的RocketMQ版本 -->
</dependency>
<!-- Spring Boot Starter for RocketMQ -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.2</version> <!-- 使用合适的RocketMQ Spring Boot Starter 版本 -->
</dependency>
application.properties
server.port=8888
# RocketMQ NameServer地址
rocketmq.name-server=127.0.0.1:9876
# RocketMQ Producer(生产者)配置
rocketmq.producer.group=producer_group
rocketmq.producer.send-message-timeout=3000
发送String类型的消息:
@SpringBootTest
public class 生产者 {
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
* 测试发送 String类型的数据
* */
@Test
public void sendMessage() {
rocketMQTemplate.convertAndSend("gao", "第一个测试消息");
}
}
发送User对象类型的数据
@SpringBootTest
public class 生产者 {
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
* 测试发送 User对象 类型的数据
* */
@Test
public void sendMessage2() {
User user = new User();
user.setName("小高");
user.setAge(20);
user.setSex("男");
user.setCreateDate(new Date());
rocketMQTemplate.convertAndSend("gao", user);
}
}
User实体:
import lombok.Data;
import java.util.Date;
@Data
public class User {
private String name;
private Integer age;
private String sex;
private Date createDate;
}
发送JSON类型的数据
@SpringBootTest
public class 生产者 {
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
* 测试发送 JSON 类型的数据
* */
@Test
public void sendMessage3() {
User user = new User();
user.setName("小高");
user.setAge(20);
user.setSex("男");
user.setCreateDate(new Date());
String s = JSON.toJSONString(user);
System.out.println(s);
rocketMQTemplate.convertAndSend("gao", s);
}
}
项目2:(消费者)
项目结构:
pom.xml
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.74</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.24</version>
</dependency>
<!-- RocketMQ Java Client -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.9.1</version> <!-- 使用合适的RocketMQ版本 -->
</dependency>
<!-- Spring Boot Starter for RocketMQ -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.2</version> <!-- 使用合适的RocketMQ Spring Boot Starter 版本 -->
</dependency>
application.properties
server.port=8889
# RocketMQ NameServer地址
rocketmq.name-server=127.0.0.1:9876
# RocketMQ Consumer(消费者)配置
rocketmq.consumer.group=consumer_group
rocketmq.consumer.topic=gao
消费String类型的数据
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
@Service
@RocketMQMessageListener(topic = "gao", consumerGroup = "consumer_group")
public class 消费者String implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.println("消费者,接收到的消息String: " + message);
}
}
消费User类型的数据
import com.gao.entity.User;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
@Service
@RocketMQMessageListener(topic = "gao", consumerGroup = "consumer_group")
public class 消费者User implements RocketMQListener<User> {
@Override
public void onMessage(User user) {
System.out.println("消费者,接收到的消息User: " + user);
}
}
User实体:
import lombok.Data;
import java.util.Date;
@Data
public class User {
private String name;
private Integer age;
private String sex;
private Date createDate;
}
消费JSON的数据,原则上还是Sting类型的数据
import com.alibaba.fastjson.JSON;
import com.gao.entity.User;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
@Service
@RocketMQMessageListener(topic = "gao", consumerGroup = "consumer_group")
public class 消费者JSON implements RocketMQListener<String> {
@Override
public void onMessage(String s) {
User user = JSON.parseObject(s, User.class);
System.out.println("消费者,接收到的消息JSON: "+user);
}
}
来源:https://www.cnblogs.com/Life-QX/p/17778382.html
没有回复内容