SpringBoot从入门到精通-第17章 消息中间件ActiveMQ
一、ActiveMQ简介
消息中间件基于队列与消息传递技术,在网络环境中为应用系统提供同步或异步的消息传输。在传递消息的过程中,发送消息的一方被称为生产者,接收消息的一方被称为消费者。这是一种概括性的称呼,凡是实现消息传递的双方都可以被称作生产者和消费者,它们可以是两个系统、两个程序、两个线程,甚至可以是同一个类中的两个方法。
消息中间件两个功能
- 异步调用
- 消减峰值
消息中间件两种传递模式
- 点对点
-
- 一个消息队列可以同时服务多个生产者和多个消费者,生产者将消息放到消息队列中,供消费者进行消费,一个消息只能被一个消费者消费的传递模式称作“点对点”模式,也叫一对一模式。
- 发布/订阅
-
- 生产者放入队列中的消息可以同时被多个消费者消费的传递模式称作“发布/订阅”模式。
二、示例
利用消息中间件异步保存订单数据
- 模拟表单提交消息到ActiveMQ;
- 提交成功返回“提交订单成功”,否则返回“传入的数据有误”。
三、整合ActiveMQ
2.1整体操作步骤
- 安装部署ActiveMQ
- 创建SpringBoot程序
- 添加ActiveMQ依赖
- 添加ActiveMQ配置
- 编写实体类
- 编写ActiveMQ配置类
- 编写ActiveMQ消费者
- 编写ActiveMQ生产者
- 编写SpringBoot控制器
- 编写index.html
- 访问测试
2.2详细操作步骤
- 安装部署ActiveMQ
– 解压activemq.tar
— tar xf activemq.tar
– 740授权
— chmod 740 activemq
– 配置好java环境
–启动ActiveMQ
—cd bin && ./activemq start
—ps -ef|grep activema
端口8161 61616
管理路径为 http://localhost:8161 admin admin
业务路径为 tcp://localhost:61616
1、管理页面创建队列并发送消息
Queue–name–create
右侧operations—send to
2、命令行查看队列
./activemq browse event01
./activemq browse --amqurl tcp://127.0.0.1:61616 --user admin --password admin event01
创建SpringBoot程序
添加ActiveMQ依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
整体pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.5.5</version>
<relativePath /> <!-- lookup parent from repository -->
</parent>
<groupId>com.mr</groupId>
<artifactId>OrderDemo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>OrderDemo</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>11</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-thymeleaf</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
- 添加ActiveMQ配置
spring.activemq.broker-url=tcp://10.0.0.132:61616
activemq.queue.name=order_queue
- 编写实体类
package com.mr.dto;
public class Order {
private String orderId;// 订单流水号
private String goodsId;// 商品编号
private String goodsName;// 商品名称
private String goodsType;// 商品类型
private String goodsCount;// 商品数量
private String goodsUnit;// 商品单位
private String createDate;// 下单日期
private String creator;// 创建人
public Order() {}
public Order(String orderId, String goodsId, String goodsName, String goodsType, String goodsCount,
String goodsUnit, String createDate, String creator) {
this.orderId = orderId;
this.goodsId = goodsId;
this.goodsName = goodsName;
this.goodsType = goodsType;
this.goodsCount = goodsCount;
this.goodsUnit = goodsUnit;
this.createDate = createDate;
this.creator = creator;
}
public String getOrderId() {
return orderId;
}
public void setOrderId(String orderId) {
this.orderId = orderId;
}
public String getGoodsId() {
return goodsId;
}
public void setGoodsId(String goodsId) {
this.goodsId = goodsId;
}
public String getGoodsName() {
return goodsName;
}
public void setGoodsName(String goodsName) {
this.goodsName = goodsName;
}
public String getGoodsType() {
return goodsType;
}
public void setGoodsType(String goodsType) {
this.goodsType = goodsType;
}
public String getGoodsCount() {
return goodsCount;
}
public void setGoodsCount(String goodsCount) {
this.goodsCount = goodsCount;
}
public String getGoodsUnit() {
return goodsUnit;
}
public void setGoodsUnit(String goodsUnit) {
this.goodsUnit = goodsUnit;
}
public String getCreateDate() {
return createDate;
}
public void setCreateDate(String createDate) {
this.createDate = createDate;
}
public String getCreator() {
return creator;
}
public void setCreator(String creator) {
this.creator = creator;
}
@Override
public String toString() {
return "Order [orderId=" + orderId + ", goodsId=" + goodsId + ", goodsName=" + goodsName + ", goodsType="
+ goodsType + ", goodsCount=" + goodsCount + ", goodsUnit=" + goodsUnit + ", createDate=" + createDate
+ ", creator=" + creator + "]";
}
}
- 编写ActiveMQ配置类
package com.mr.config;
import javax.jms.Queue;
import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ActiveMqConfig {
@Value("${activemq.queue.name}")
String queueName;
@Bean
public Queue queue() {
return new ActiveMQQueue(queueName);
}
}
- 编写ActiveMQ消费者
package com.mr.service;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Service;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.mr.dto.Order;
@Service
public class QueueConsumer {
@Autowired
ObjectMapper jackson;
@JmsListener(destination = "${activemq.queue.name}")
public void receiveQueueMsg(String msg) throws JsonMappingException, JsonProcessingException {
System.out.println("接收的订单数据:" + msg);
Order order = jackson.readValue(msg, Order.class);
System.out.println("(模拟)将数据保存到数据库,提交的单号为:" + order.getGoodsId());
}
}
- 编写ActiveMQ生产者
package com.mr.service;
import javax.jms.Queue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsOperations;
import org.springframework.stereotype.Service;
@Service
public class QueueProducer {
@Autowired
JmsOperations jmsTemplate;
@Autowired
Queue queue;
public void sendMessage(String message) {
jmsTemplate.convertAndSend(queue, message);
}
}
- 编写SpringBoot控制器
package com.mr.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.mr.dto.Order;
import com.mr.service.QueueProducer;
@Controller
public class OrderController {
@Autowired
QueueProducer producer;
@Autowired
ObjectMapper jackson;
@RequestMapping("/submit")
@ResponseBody
public String getOrder(String orderId, String goodsId, String goodsName, String goodsType,
String goodsCount, String goodsUnit, String createDate, String creator) {
Order order = new Order(orderId, goodsId, goodsName, goodsType, goodsCount,
goodsUnit, createDate, creator);
try {
producer.sendMessage(jackson.writeValueAsString(order));
} catch (JsonProcessingException e) {
e.printStackTrace();
return "传入的数据有误";
}
return "提交订单成功";
}
@RequestMapping("/index")
public String index() {
return "index";
}
}
- 编写index.html
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
</head>
<body>
<form action="/submit" method="post">
<a>订单流水号</a>
<input type="text" name="orderId" /><br>
<a>商品编号</a>
<input type="text" name="goodsId" /><br>
<a>商品名称</a>
<input type="text" name="goodsName" /><br>
<a>商品类型</a>
<input type="text" name="goodsType" /><br>
<a>商品数量</a>
<input type="text" name="goodsCount" /><br>
<a>商品单位</a>
<input type="text" name="goodsUnit" /><br>
<a>下单日期</a>
<input type="text" name="createDate" /><br>
<a>创建人</a>
<input type="text" name="creator" /><br>
<input type="submit" value="提交" />
</form>
</body>
</html>
- 访问测试
http://127.0.0.1:8080/index
填写表单信息,点击提交
查看下ActiveMQ变化