activemq-topic-durable

Other references:
http://wangxinchun.iteye.com/blog/2146172
http://wangxinchun.iteye.com/blog/2146120
http://wangxinchun.iteye.com/blog/2145998
http://wangxinchun.iteye.com/blog/2145958
http://wangxinchun.iteye.com/blog/2147335
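Consider this scenario: while the Publisher is publishing messages, one of the Subscribers goes down for some reason, such as a network outage or a system crash. When that Subscriber restarts, it should still receive the messages the Publisher sent while it was down. AMQ (ActiveMQ) provides this capability through durable subscriptions.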

How to do it:
Only the way the Subscriber creates its MessageConsumer needs to change:
MessageConsumer consumer = session.createDurableSubscriber(destination, clientId) ;

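That is all: specify a clientId. After a restart, the broker uses the clientId to deliver the messages that were missed. The clientId must not collide with that of any other client, so a combination such as machine IP + port + businessID is a good choice. Strictly speaking, the second argument of createDurableSubscriber is the subscription name, and the client ID itself is set on the connection with setClientID. The example below uses the same value for both; the broker identifies a durable subscription by the pair of client ID and subscription name.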
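The ip+port+businessID suggestion can be sketched as a small helper. This is only an illustration: the class name ClientIds, its methods, and the "-" separator are hypothetical choices, not part of ActiveMQ or JMS. Whatever format is used, the value has to be the same on every restart of the same subscriber, otherwise the broker treats it as a new subscription.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

/** Hypothetical helper that builds a client ID of the form ip-port-businessId. */
final class ClientIds {
    private ClientIds() {
    }

    /** Joins the three parts with "-" after basic validation. */
    static String build(String ip, int port, String businessId) {
        if (ip == null || ip.isEmpty() || businessId == null || businessId.isEmpty()) {
            throw new IllegalArgumentException("ip and businessId must be non-empty");
        }
        if (port < 1 || port > 65535) {
            throw new IllegalArgumentException("port out of range: " + port);
        }
        return ip + "-" + port + "-" + businessId;
    }

    /**
     * Uses the address reported for the local host. On some machines this is a
     * loopback address, in which case the IP should be passed in explicitly.
     */
    static String forLocalHost(int port, String businessId) throws UnknownHostException {
        return build(InetAddress.getLocalHost().getHostAddress(), port, businessId);
    }
}
```

For example, ClientIds.build("10.0.0.5", 8080, "orders") returns "10.0.0.5-8080-orders", which could then be passed to the Subscriber as -DclientId=10.0.0.5-8080-orders.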

Here is a complete example:
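import java.util.concurrent.CountDownLatch;

// Assumes the javax.jms API used by ActiveMQ 5.x; newer releases use jakarta.jms.
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Subscriber implements MessageListener {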
    private static final String BROKER_URL = "tcp://localhost:61616";
    private static final boolean NON_TRANSACTED = false;
    private final CountDownLatch countDownLatch;
    public Subscriber(CountDownLatch latch) {
        countDownLatch = latch;
    }

    public static void main(String[] args) {
        String url = BROKER_URL;
        if (args.length > 0) {
            url = args[0].trim();
        }
        System.out.println("\nWaiting to receive messages... Either waiting for END message or press Ctrl+C to exit");
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin", "password", url);
        Connection connection = null;
        final CountDownLatch latch = new CountDownLatch(1);

        try {

            connection = connectionFactory.createConnection();
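            // The client ID must be unique per connection and stable across restarts.
            // Pass it with -DclientId=...; the fallback value is only illustrative.
            String clientId = System.getProperty("clientId", "durable-subscriber-1");
            connection.setClientID(clientId);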

            connection.start();

            Session session = connection.createSession(NON_TRANSACTED, Session.AUTO_ACKNOWLEDGE);
            Topic destination = session.createTopic("test-topic");

            // The second argument is the subscription name; this example reuses the client ID.
            MessageConsumer consumer = session.createDurableSubscriber(destination, clientId);
            consumer.setMessageListener(new Subscriber(latch));
            latch.await();
            consumer.close();
            session.close();

        } catch (Exception e) {
            System.out.println("Caught exception: " + e);
        }
        finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    System.out.println("Could not close an open connection...");
                }
            }
        }
    }

    @Override
    public void onMessage(Message message) {
        try {
            if (message instanceof TextMessage) {
                String text = ((TextMessage) message).getText();
                if ("END".equalsIgnoreCase(text)) {
                    System.out.println("Received END message!");
                    countDownLatch.countDown();
                }
                else {
                    System.out.println("Received message:" +text);
                }
            }
        } catch (JMSException e) {
            System.out.println("Got a JMS Exception: " + e);
        }
    }
}



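Verification:
1. Start the Subscriber.
2. Start the Publisher.
3. Stop the Subscriber.
4. Publish messages with the Publisher.
5. Start the Subscriber again with the same clientId. Note: the Subscriber now receives the messages that were published while it was down.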
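To make the expected outcome of these steps concrete, here is a toy in-memory model of a topic with durable subscriptions. It is a simplified sketch for reasoning about the behaviour, not how ActiveMQ is implemented; the class ToyDurableTopic and all of its methods are hypothetical. It assumes one subscription per client ID and ignores persistence, acknowledgements and message expiry.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

/** Toy model of one topic with durable subscriptions; not ActiveMQ code. */
final class ToyDurableTopic {
    // Messages stored for each registered subscription while it is offline.
    private final Map<String, Queue<String>> backlog = new HashMap<>();
    // Client IDs that are currently connected.
    private final Set<String> online = new HashSet<>();
    // Everything delivered so far, per client ID.
    private final Map<String, List<String>> delivered = new HashMap<>();

    /** Connects a subscriber and flushes whatever was stored while it was away. */
    void connect(String clientId) {
        if (!online.add(clientId)) {
            throw new IllegalStateException("client ID already in use: " + clientId);
        }
        Queue<String> pending = backlog.computeIfAbsent(clientId, k -> new ArrayDeque<>());
        List<String> inbox = delivered.computeIfAbsent(clientId, k -> new ArrayList<>());
        while (!pending.isEmpty()) {
            inbox.add(pending.poll());
        }
    }

    /** Marks the subscriber offline; its subscription stays registered. */
    void disconnect(String clientId) {
        online.remove(clientId);
    }

    /** Delivers to online subscribers and stores the message for offline ones. */
    void publish(String text) {
        for (Map.Entry<String, Queue<String>> entry : backlog.entrySet()) {
            if (online.contains(entry.getKey())) {
                delivered.get(entry.getKey()).add(text);
            } else {
                entry.getValue().add(text);
            }
        }
    }

    List<String> received(String clientId) {
        return delivered.getOrDefault(clientId, new ArrayList<>());
    }

    public static void main(String[] args) {
        ToyDurableTopic topic = new ToyDurableTopic();
        topic.connect("sub-1");     // step 1: start the Subscriber
        topic.publish("m1");        // step 2: the Publisher sends while it is online
        topic.disconnect("sub-1");  // step 3: stop the Subscriber
        topic.publish("m2");        // step 4: messages sent while it is down
        topic.publish("m3");
        topic.connect("sub-1");     // step 5: restart with the same client ID
        System.out.println(topic.received("sub-1")); // prints [m1, m2, m3]
    }
}
```

In this model a message published before the subscription was first registered is not stored for it, which matches the usual durable-subscriber behaviour: a subscriber only receives messages sent after its subscription was created.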