`

activemq-queue

阅读更多
mq意为:消息队列。其中active mq 是apache的一个顶级开源项目,是jms的一个标准实现。

为什么要用mq?
1、通信,mq最本质的需求就是机器间的通信,即消息传递。
2、系统解耦,异步通信。mq消息的发送,不需要等待对方的返回,吞吐量高。
3、生产者和消费者的消息通信可靠性保证。一旦消息到了mq,消息的接收有保障。

mq消息传递的两种模式:
1、PTP:Point to Point,即点对点的消息模型;
    一条消息仅能被一个consumer收到。如果在message发送的时候没有可用的consumer,那么它将被保存一直到能处理该message的consumer可用。

2、Pub/Sub:Publish/Subscribe,即发布/订阅的消息模型;
    一条消息被publish时,它将发到所有感兴趣的订阅者,所以零到多个subscriber将接收到消息的一个拷贝。
其中:Pub/Sub又有Nondurable subscription(非持久订阅)和durable subscription (持久化订阅)2种消息处理方式。

非持久订阅:只有当客户端处于激活状态,也就是和JMS Provider 保持连接状态才能收到发送到某个主题的消息,而当客户端处于离线状态,这个时间段发到主题的消息将会丢失,永远不会收到。
持久订阅时:客户端向JMS 注册一个识别自己身份的ID,当这个客户端处于离线时,JMS Provider 会为这个ID 保存所有发送到主题的消息,当客户再次连接到JMS Provider 时,会根据自己的ID 得到所有当自己处于离线时发送到主题的消息。


典型术语:
JMS Provider:实现JMS 接口的消息中间件;
Queue:队列目标;
Topic:主题目标;
  主题可以被认为是消息的传输中介,发布者(publisher)发布消息到主题,订阅者(s ubscribe) 从主题订阅消息。主题使得消息订阅者和消息发布者保持互相独立,不需要 接触即可保证消息的传送。

ConnectionFactory:连接工厂,JMS 用它创建连接;
Connection:JMS 客户端到JMS Provider 的连接;
Destination:消息的目的地;
Session:会话,一个发送或接收消息的线程; 客户端用Session 创建MessageProducer 和MessageConsumer对象。如果在Session 关闭时,有一些消息已经被收到,但还没有被签收(acknowledged),那么,当消费者下次连接到相同的队列时,这些消息还会被再次接收。
MessageProducer:由Session 对象创建的用来发送消息的对象;
MessageConsumer:由Session 对象创建的用来接收消息的对象; 客户端用MessageConsumer 接收队列中的消息,如果用户在receive 方法中设定了消息选择条件,那么不符合条件的消息会留在队列中,不会被接收到。

Acknowledge:签收;
Transaction:事务。

执行流程:
 按照JMS的规范,我们首先需要获得一个JMS connection factory.,通过这个connection factory来创建connection.在这个基础之上我们再创建session, destination, producer和consumer。因此主要的几个步骤如下:
1. 获得JMS connection factory. 通过我们提供特定环境的连接信息来构造factory。
2. 利用factory构造JMS connection
3. 启动connection
4. 通过connection创建JMS session.
5. 指定JMS destination.
6. 创建JMS producer或者创建JMS message并提供destination.
7. 创建JMS consumer或注册JMS message listener.
8. 发送和接收JMS message.
9. 关闭所有JMS资源,包括connection, session, producer, consumer等。

最简单的案例:
Producer code:
public class Producer {
    private static final String BROKER_URL = "tcp://localhost:61616";
    private static final Boolean NON_TRANSACTED = false;
    private static final int NUM_MESSAGES_TO_SEND = 100;
    private static final long DELAY = 100;

    public static void main(String[] args) {
        String url = BROKER_URL;
        if (args.length > 0) {
            url = args[0].trim();
        }
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin", "password", url);
        Connection connection = null;
        try {

            connection = connectionFactory.createConnection();
            connection.start();

            Session session = connection.createSession(NON_TRANSACTED, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createQueue("test-queue");
            MessageProducer producer = session.createProducer(destination);

            for (int i = 0; i < NUM_MESSAGES_TO_SEND; i++) {
                TextMessage message = session.createTextMessage("Message #" + i);
                System.out.println("Sending message #" + i);
                producer.send(message);
                Thread.sleep(DELAY);
            }

            producer.close();
            session.close();

        } catch (Exception e) {
            System.out.println("Caught exception!");
        }
        finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    System.out.println("Could not close an open connection...");
                }
            }
        }
    }
}


consumer:
public class Consumer {
    private static final String BROKER_URL = "tcp://localhost:61616";
    private static final Boolean NON_TRANSACTED = false;
    private static final long TIMEOUT = 20000;

    public static void main(String[] args) {
        String url = BROKER_URL;
        if (args.length > 0) {
            url = args[0].trim();
        }
        System.out.println("\nWaiting to receive messages... will timeout after " + TIMEOUT / 1000 +"s");
        //MQ 连接工厂对象
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin", "password", url);
        Connection connection = null;

        try {

        	//建连接
            connection = connectionFactory.createConnection();
            connection.start();

            //创建会话
            Session session = connection.createSession(NON_TRANSACTED, Session.AUTO_ACKNOWLEDGE);
            //获取消息的目的地
            Destination destination = session.createQueue("test-queue");
            
            //生成消息消费者
            MessageConsumer consumer = session.createConsumer(destination);

            int i = 0;
            while (true) {
            	//从目的地获取消息
                Message message = consumer.receive(TIMEOUT);
                if (message != null) {
                    if (message instanceof TextMessage) {
                        String text = ((TextMessage) message).getText();
                        System.out.println("Got " + i++ + ". message: " + text);
                    }
                } else {
                    break;
                }
            }

            consumer.close();
            session.close();

        } catch (Exception e) {
            System.out.println("Caught exception!");
        }
        finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    System.out.println("Could not close an open connection...");
                }
            }
        }
    }
}


broker:
/**
 * 嵌入式AMQ broker 启动服务
 * @author xinchun.wang
 *
 */
public class MQBroker {
	public static void main(String[] args) throws Exception {
		BrokerService brokerService = new BrokerService();
		brokerService.setBrokerName("tpBroker");
		brokerService.setPersistent(false);
		brokerService.addConnector("tcp://localhost:61616");
		brokerService.start();
		System.out.println("brokerService start ok ~");
	}

}




activemq 学习资源准备

官方网址:
http://activemq.apache.org/

源码svn:
https://svn.apache.org/repos/asf/activemq/trunk/

下载activemq
http://www.apache.org/dyn/closer.cgi?path=/activemq/5.10.0/apache-activemq-5.10.0-bin.zip

运行MQ:

解压缩apache-activemq-5.5.1-bin.zip,然后双击apache-activemq-5.5.1\bin\activemq.bat 即启动了mq的 broker。

另外AMQ 还可以通过代码的方式嵌入式启动:
/**
 * 嵌入式AMQ broker 启动服务
 * @author xinchun.wang
 *
 */
public class MQBroker {
	public static void main(String[] args) throws Exception {
		BrokerService brokerService = new BrokerService();
		brokerService.setBrokerName("tpBroker");
		brokerService.setPersistent(false);
		brokerService.addConnector("tcp://localhost:61616");
		brokerService.start();
		System.out.println("brokerService start ok ~");
	}

}


嵌入式启动AMQ,pom 引入active-all.jar包
	<dependency>
		<groupId>org.apache.activemq</groupId>
		<artifactId>activemq-all</artifactId>
		<version>5.10.0</version>
		<exclusions>
			<exclusion>
				<groupId>org.slf4j</groupId>
				<artifactId>jcl-over-slf4j</artifactId>
			</exclusion>
			<exclusion>
				<groupId>org.slf4j</groupId>
				<artifactId>slf4j-api</artifactId>
			</exclusion>

			<exclusion>
				<groupId>org.slf4j</groupId>
				<artifactId>slf4j-log4j12</artifactId>
			</exclusion>
		</exclusions>
	</dependency>  



client 发送和接受消息需要引入active-client包
pom xml 为:
         <dependency>
            <groupId>org.apache.geronimo.specs</groupId>
            <artifactId>geronimo-jms_1.1_spec</artifactId>
            <version>1.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-client</artifactId>
            <version>5.10.0</version>
        </dependency>
0
0
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics