ActiveMQ 概述
JMS 只是定义了一组有关消息传送的规范和标准,并没有提供实现,也就说 JMS 只是定义了一组接口而已,就像 JDBC 抽象了关系数据库访问、JPA 抽象了对象与关系数据库映射一样。JMS 的具体实现由不同的消息中间件厂商提供,比如 Apache ActiveMQ 就是 JMS 规范的具体实现,ApacheActiveMQ 才是一个消息服务系统,而 JMS 不是。
Apache ActiveMQ 是最流行的开源的、支持多协议的、基于 Java 的消息代理。它支持行业标准协议,因此用户可以通过多种语言和平台从客户端选择中受益,可以使用 C,C ++,Python,.Net 等编写的客户端进行连接。它使用无处不在的 AMQP 协议集成您的多平台应用程序。它基于 Websocket使用 STOMP 在 Web 应用程序之间交换消息;使用 MQTT 管理您的 IoT 设备;支持现有的 JMS 基础结构及其他。 ActiveMQ 提供了强大的功能和灵活性来支持任何消息传递用例。ActiveMQ 当前有两种可用“风格”:“经典”代理和“下一代”的 Artemis 代理。一旦 Artemis 的特性达到和“经典”风格同等的充分级别时,就会成为下一个 ActiveMQ 的主要版本。Apache ActiveMQ 官网:ActiveMQ
使用 ActiveMQ 作为消息代理需要:
下载和安装 ActiveMQ
启动 ActiveMQ 服务
下载和安装 ActiveMQ
以“经典”风格为例,可通过以下 url 下载 ActiveMQ 代理:
从官网下载 apache-activemq-x.x.x-bin.zip 文件,将其放在某个目录下,解压该文件,就可以使用 ActiveMQ 了。
启动 ActiveMQ 服务
使用 ActiveMQ 提供消息服务,需要先启动 ActiveMQ 服务。可通过 activemq start 命令启动ActiveMQ 服务。如果将 ActiveMQ 的 bin 目录配置为环境变量,可以直接在命令行使用 activemqstart 命令,如果没有配置环境变量,可以先切换目录到 ActiveMQ 的 bin 目录,再使用 activemqstart 命令启动 ActiveMQ 服务。
例如,这里将下载的 apache-activemq-x.x.x-bin.zip 文件解压到 D 盘,ActiveMQ 的目录为D:\apache-activemq-5.16.2\bin,切换到该目录,使用 activemq start 命令启动 ActiveMQ 服务,如下所示:
执行 activemq start 命令启动 ActiveMQ 服务,可看到服务启动的提示信息,其中部分信息如下所示:
从启动信息可以看到,ActiveMQ 的 Web 控制台对应的 url 为:http://127.0.0.1:8161/
打开浏览器,输入该 url,输入默认的账户:
用户名为 admin,密码为 admin,
访问 ActiveMQ 的 Web控制台,并点击“Manage ActiveMQ broker”超链接,可以看到如下界面:
通过访问该 Web 控制台,可以查看创建的队列、主题、订阅者等信息,并跟踪队列和主题上消息的收发。需要注意的是:http://127.0.0.1:8161/为 ActiveMQ 的 Web 控制台的 URL,使用的 TCP 默认端口号为 8161,但是其服务端口号默认为 61616,所以 TCP 服务 URL 为 tcp://localhost:61616,当然,这里的 localhost 可替换为主机名或 IP 地址。
至此,我们可以开始使用 ActiveMQ 服务并跟踪消息的收发了。
JMS 编程模型
在 JMS API 中,javax.jms.ConnectionFactory 接口提供了一种标准方法用于创建 javax.jms.Connection连接实例,实现和一个 JMS 代理进行交互。在 JMS 编程时,涉及的组件包括:
ConnectionFactory:创建 Connection 对象的工厂,针对两种不同的 JMS 消息传送模型,分别有 QueueConnectionFactory 和 TopicConnectionFactory 。
Destination:表示目的地。目的地是消息生产者的消息发送目标和消息使用者的消息来源。对于消息生产者来说,它的目的地(Destination)是某个队列(Queue)或主题(Topic),对于消息使用者来说,它的目的地也是某个队列或主题(即消息来源)。所以,Destination 实际上就是两种类型的对象:Queue 和 Topic。
Connection:表示在客户端和 JMS 系统之间建立的连接。一个连接可以产生一个或多个Session。
Session:提供方法对消息进行操作的接口,可以通过 Session 对象创建生产者、使用者、消息等。Session 提供了事务功能,如果需要使用 Session 发送/接收多个消息时,可以将这些发送/接收操作放到一个事务中。
Producer:表示消息生产者。消息生产者由 Session 创建,并用于将消息发送到 Destination。同 样 的 , 针 对 两 种 不 同 的 消 息 模 型 , 消 息 生 产 者 分 两 种 类 型 : QueueSender 和TopicPublisher。可以调用消息生产者的方法发送消息。
Consumer:表示消息使用者,消息使用者也由 Session 创建,用于接收被发送到 Destination的 消 息 。 针 对 两 种 不 同 的 消 息 模 型 , 消 息 使 用 者 分 为 两 种 类 型 : QueueReceiver 和TopicSubscriber 。 可 分 别 通 过 Session 的 createReceiver(Queue queue) 方 法 和createSubscriber(Topic topic)方法来创建两种消息使用者。当然,也可以 Session 的createDurableSubscriber() 方法来创建持久化的订阅者。
MessageListener:表示消息监听器。如果注册了消息监听器,一旦消息到达,将自动调用监听器的 onMessage()方法以接收和使用消息。
在不使用 Spring 框架的情况下,我们可以使用 JMS API 和 ActiveMQ API 实现消息通信。
例如,如果创建一个 Java 程序,作为一个发送消息的客户端,使用点对点模型实现发送消息到指定的队列,则可以使用如下代码实现发送消息的功能:
需要的依赖
<!--JMS原生API-->
<dependency>
<groupId>javax.jms</groupId>
<artifactId>javax.jms-api</artifactId>
<version>2.0</version>
</dependency>
<!-- active-mq核心包 -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-core</artifactId>
<version>5.7.0</version>
</dependency>
客户端-生产者
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class MessProducter {
public static void main(String[] args) throws JMSException {
//1.创建连接工厂
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
//2.创建连接
Connection connection = factory.createConnection();
//3.启动连接
connection.start();
//4.创建会话
Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
//5.创建队列目的地,并指定队列名称
Destination destination = session.createQueue("queue1");
//6.创建一个消息生产者
MessageProducer producer = session.createProducer(destination);
//7.创建消息,这里使用文本消息
TextMessage message = session.createTextMessage("Hello~");
//8.发送消息
producer.send(message);
//9.关闭连接
connection.close();
}
}
在上面的代码中,我们通过 ConnectionFactory 创建了 Connection 实例,并通过 Connection 实例的 createSession()方法创建 Session 实例,用于实现消息通信。然后调用 Session 实例的createQueue()方法创建了一个名为“queue1”的队列,作为发送消息的目的地。Session 实例的createProducer()方法用于创建消息生产者,创建消息生产者需要指定发送消息的目的地。此外,我们调用了 Session 的 createTextMessage()方法创建一个文本消息,Session 实例也提供了方法创 建 其 他 类 型 的 消 息 , 如 createObjectMessage() 方 法 用 于 创 建 一 个 对 象 消 息 ,createMapMessage()方法用于创建一个 Map 消息。接着,我们调用 MessageProducer 实例的 send()方法发送消息到队列,并在使用完连接后关闭连接。
如果希望创建一个 Java 程序,作为接收和使用消息的客户端,可使用以下代码实现该客户端:
消费者客户端
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class MessConsumer {
public static void main(String[] args) throws JMSException {
//1.创建连接工厂
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
//2.创建连接
Connection connection = factory.createConnection();
//3.启动连接
connection.start();
//4.创建会话
Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
//5.创建队列目的地,并指定队列名称
Destination destination = session.createQueue("queue1");
//6.创建一个消息消费者
MessageConsumer consumer = session.createConsumer(destination);
//7.创建消息监听器,监听消息的接收
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("Message Received: "+textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
}
}
在上面的代码中,我们通过 ConnectionFactory 创建了 Connection 实例,并通过 Connection 实例的 createSession()方法创建 Session 实例,用于实现消息通信。然后调用 Session 实例的createQueue()方法创建了一个名为“queue1”的队列,作为接收消息的来源,发送消息和接收消息的目的地必须是同一个。Session 实例的 createConsumer()方法用于创建消息使用者,创建消息 使 用 者 需 要 指 定 接 收 消 息 的 目 的 地 。 此 外 , 我 们 调 用 了 MessageConsumer 接 口 的setMessageListener()方法为消息使用者注册一个消息监听器,用于监听消息的接收,当接收到消息时,onMessage() 方法被调用,在此方法中编写代码处理接收到的消息,如果希望持续监听消息接收,则不能关闭连接。 当然,也可以通过 JNDI 查找和获取 ConnectionFactory 和 Destination 实例。可以看到,使用 JMS API 和 ActiveMQ API 实现消息通信,需要进行很多的编码。Spring 框架提供了和 JMS 的集成支持,从而简化了使用 JMS API 实现消息通信,类似 Spring 对 JDBC API 的集成。
JMS 总的来说,主要实现两个功能,也就是消息的生产和消息的使用。Spring 框架提供了JmsTemplate 等类用于实现消息生产和消息的同步接收。对于异步通信,类似 Java EE 中消息驱动的 bean 风格,Spring 提供消息监听器容器,以创建消息驱动的 POJO 类。此外,Spring 提供一种声明的方式方式来创建消息监听器。