mq意为:消息队列。其中active mq 是apache的一个顶级开源项目,是jms的一个标准实现。
为什么要用mq?
1、通信,mq最本质的需求就是机器间的通信,即消息传递。
2、系统解耦,异步通信。mq消息的发送,不需要等待对方的返回,吞吐量高。
3、生产者和消费者的消息通信可靠性保证。一旦消息到了mq,消息的接收有保障。
mq消息传递的两种模式:
1、PTP:Point to Point,即点对点的消息模型;
一条消息仅能被一个consumer收到。如果在message发送的时候没有可用的consumer,那么它将被保存一直到能处理该message的consumer可用。
2、Pub/Sub:Publish/Subscribe,即发布/订阅的消息模型;
一条消息被publish时,它将发到所有感兴趣的订阅者,所以零到多个subscriber将接收到消息的一个拷贝。
其中:Pub/Sub又有Nondurable subscription(非持久订阅)和durable subscription (持久化订阅)2种消息处理方式。
非持久订阅:只有当客户端处于激活状态,也就是和JMS Provider 保持连接状态才能收到发送到某个主题的消息,而当客户端处于离线状态,这个时间段发到主题的消息将会丢失,永远不会收到。
持久订阅时:客户端向JMS 注册一个识别自己身份的ID,当这个客户端处于离线时,JMS Provider 会为这个ID 保存所有发送到主题的消息,当客户再次连接到JMS Provider 时,会根据自己的ID 得到所有当自己处于离线时发送到主题的消息。
典型术语:
JMS Provider:实现JMS 接口的消息中间件;
Queue:队列目标;
Topic:主题目标;
主题可以被认为是消息的传输中介,发布者(publisher)发布消息到主题,订阅者(s ubscribe) 从主题订阅消息。主题使得消息订阅者和消息发布者保持互相独立,不需要 接触即可保证消息的传送。
ConnectionFactory:连接工厂,JMS 用它创建连接;
Connection:JMS 客户端到JMS Provider 的连接;
Destination:消息的目的地;
Session:会话,一个发送或接收消息的线程; 客户端用Session 创建MessageProducer 和MessageConsumer对象。如果在Session 关闭时,有一些消息已经被收到,但还没有被签收(acknowledged),那么,当消费者下次连接到相同的队列时,这些消息还会被再次接收。
MessageProducer:由Session 对象创建的用来发送消息的对象;
MessageConsumer:由Session 对象创建的用来接收消息的对象; 客户端用MessageConsumer 接收队列中的消息,如果用户在receive 方法中设定了消息选择条件,那么不符合条件的消息会留在队列中,不会被接收到。
Acknowledge:签收;
Transaction:事务。
执行流程:
按照JMS的规范,我们首先需要获得一个JMS connection factory.,通过这个connection factory来创建connection.在这个基础之上我们再创建session, destination, producer和consumer。因此主要的几个步骤如下:
1. 获得JMS connection factory. 通过我们提供特定环境的连接信息来构造factory。
2. 利用factory构造JMS connection
3. 启动connection
4. 通过connection创建JMS session.
5. 指定JMS destination.
6. 创建JMS producer或者创建JMS message并提供destination.
7. 创建JMS consumer或注册JMS message listener.
8. 发送和接收JMS message.
9. 关闭所有JMS资源,包括connection, session, producer, consumer等。
最简单的案例:
Producer code:
public class Producer {
private static final String BROKER_URL = "tcp://localhost:61616";
private static final Boolean NON_TRANSACTED = false;
private static final int NUM_MESSAGES_TO_SEND = 100;
private static final long DELAY = 100;
public static void main(String[] args) {
String url = BROKER_URL;
if (args.length > 0) {
url = args[0].trim();
}
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin", "password", url);
Connection connection = null;
try {
connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(NON_TRANSACTED, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("test-queue");
MessageProducer producer = session.createProducer(destination);
for (int i = 0; i < NUM_MESSAGES_TO_SEND; i++) {
TextMessage message = session.createTextMessage("Message #" + i);
System.out.println("Sending message #" + i);
producer.send(message);
Thread.sleep(DELAY);
}
producer.close();
session.close();
} catch (Exception e) {
System.out.println("Caught exception!");
}
finally {
if (connection != null) {
try {
connection.close();
} catch (JMSException e) {
System.out.println("Could not close an open connection...");
}
}
}
}
}
consumer:
public class Consumer {
private static final String BROKER_URL = "tcp://localhost:61616";
private static final Boolean NON_TRANSACTED = false;
private static final long TIMEOUT = 20000;
public static void main(String[] args) {
String url = BROKER_URL;
if (args.length > 0) {
url = args[0].trim();
}
System.out.println("\nWaiting to receive messages... will timeout after " + TIMEOUT / 1000 +"s");
//MQ 连接工厂对象
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin", "password", url);
Connection connection = null;
try {
//建连接
connection = connectionFactory.createConnection();
connection.start();
//创建会话
Session session = connection.createSession(NON_TRANSACTED, Session.AUTO_ACKNOWLEDGE);
//获取消息的目的地
Destination destination = session.createQueue("test-queue");
//生成消息消费者
MessageConsumer consumer = session.createConsumer(destination);
int i = 0;
while (true) {
//从目的地获取消息
Message message = consumer.receive(TIMEOUT);
if (message != null) {
if (message instanceof TextMessage) {
String text = ((TextMessage) message).getText();
System.out.println("Got " + i++ + ". message: " + text);
}
} else {
break;
}
}
consumer.close();
session.close();
} catch (Exception e) {
System.out.println("Caught exception!");
}
finally {
if (connection != null) {
try {
connection.close();
} catch (JMSException e) {
System.out.println("Could not close an open connection...");
}
}
}
}
}
broker:
/**
* 嵌入式AMQ broker 启动服务
* @author xinchun.wang
*
*/
public class MQBroker {
public static void main(String[] args) throws Exception {
BrokerService brokerService = new BrokerService();
brokerService.setBrokerName("tpBroker");
brokerService.setPersistent(false);
brokerService.addConnector("tcp://localhost:61616");
brokerService.start();
System.out.println("brokerService start ok ~");
}
}
activemq 学习资源准备
官方网址:
http://activemq.apache.org/
源码svn:
https://svn.apache.org/repos/asf/activemq/trunk/
下载activemq
http://www.apache.org/dyn/closer.cgi?path=/activemq/5.10.0/apache-activemq-5.10.0-bin.zip
运行MQ:
解压缩apache-activemq-5.5.1-bin.zip,然后双击apache-activemq-5.5.1\bin\activemq.bat 即启动了mq的 broker。
另外AMQ 还可以通过代码的方式嵌入式启动:
/**
* 嵌入式AMQ broker 启动服务
* @author xinchun.wang
*
*/
public class MQBroker {
public static void main(String[] args) throws Exception {
BrokerService brokerService = new BrokerService();
brokerService.setBrokerName("tpBroker");
brokerService.setPersistent(false);
brokerService.addConnector("tcp://localhost:61616");
brokerService.start();
System.out.println("brokerService start ok ~");
}
}
嵌入式启动AMQ,pom 引入active-all.jar包
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.10.0</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>jcl-over-slf4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
client 发送和接受消息需要引入active-client包
pom xml 为:
<dependency>
<groupId>org.apache.geronimo.specs</groupId>
<artifactId>geronimo-jms_1.1_spec</artifactId>
<version>1.1</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-client</artifactId>
<version>5.10.0</version>
</dependency>
分享到:
相关推荐
解压缩apache-activemq-5.5.1-bin.zip,然后双击...包含了apache-activemq-5.5.1-bin.zip以及ActiveMQ一个helloworld的demo启动ActiveMQ以后,登陆:http://localhost:8161/admin/,创建一个Queue,命名为FirstQueue。
1.一个是admin,用来显示和管理所有的queue、topic、connection等等。 2.一个是demo,有一些使用jms和activemq的简单例子。 3.还有一个fileserver,用来支持通过activemq发送文件时的中转服务器。blob message时配置...
ActiveMQ-Queues点对点消息-Receive+Listener方式:参考博文:http://blog.csdn.net/ABAP_Brave/article/details/53443725
使用SpringBoot方式集成ActiveMQ ActiveMQ消息中间件的点对点模式point to point 消息队列 * 消息消费者从queue中取出并且消费消息 * 消息被消费以后,queue中不再有存储,所以消息消费者不可能消费到已经被消费的...
MQ是消息中间件,是一种在分布式系统中应用程序借以传递消息的媒介,常用的有ActiveMQ,RabbitMQ,kafka。ActiveMQ是Apache下的开源项目,完全支持JMS1.1和J2EE1.4规范的JMS ...1、点对点(queue) 2、一对多(topic)
activemq的Java依赖包,课构建Queue/topic 生产、消费
spring boot activemq集成示例,包含queue和topic消息的发送、接收,连接池的支持。
消息中间件(Adaptive Message Queue,简称:AMQ)是一个高性能的消息中间件,AMQ提供了基于Publish/Subscribe的消息通讯服务,同时AMQ可以提供服务总线功能,供其他进程可以在AMQ上进行服务注册,客户端软件可以...
activemq的queue队列模式的maven,spring的demo
wget -P /usr/share/munin/plugins/ https://raw.github.com/kipsnak/munin-activemq-plugin/master/queue_ 安装要求 通过以下命令测试系统是否具有必需的模块: /usr/share/munin/plugins/queue_ autoconf 因此...
Apache ActiveMQ Queue Topic 详解 教程 加入代码解释说明
activemq queue模式,事务、应答、转发模式、MessageConsumer的receive阻塞方法的测试
MessageQueue API ActiveMQ API, jms 消息队列API
百度spring整合activemq 发现几乎都只是在xml文件配置固定的消息队列而且太麻烦。并没有根据需求进行动态生成主题和队列。本文档参考了纯粹的activemq java代码和百度上的demo,很简洁的实现了动态消息队列的生成和...
本人在学习activemq,然后 测试完成的demo, 包含了queue,topic,持久化到mysql,订阅模式,包好用
<artifactId>db-queue <version>11.0.0 为什么? 有以下几个原因: 您需要简单,高效和灵活的任务处理工具,以支持延迟的作业执行。 您已经有一个数据库,并且不想在基础结构中引入其他工具(例如ActiveMq或...
启动ActiveMQ以后,登陆:http://localhost:8161/admin/,创建一个Queue,命名为FirstQueue。创建project:ActiveMQ-5.5,并导入apache-activemq-5.5.1\lib目录下需要用到的jar文件,项目结构如下图所示最后接收者跟...
2.使用通配符订阅Queue(因为设备很多,而且是动态新增,主题是不确定的) springBoot整合 1.依赖 org.messaginghub pooled-jms org.springframework.boot spring-boot-starter-activemq org.apache....
message-queue-parent 消息队列(ActiveMQ、Kafka、RabbitMQ、RocketMQ等)示例