RocketMQ Study Notes ---- Hello world!

The deployment architecture of a RocketMQ cluster is shown in the figure:

[Figure: RocketMQ cluster deployment architecture]

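During environment setup, mqnamesrv and the broker were started. These are the service-discovery module and the message-storage module of a RocketMQ cluster, so the application itself only needs to implement a Producer and a Consumer.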
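To make this division of labor concrete, here is a toy in-memory model of the lookup a client performs: a broker announces the topics it hosts to the name server, and a client asks the name server which broker addresses serve a topic. This is only an illustration and not RocketMQ code. The class `ToyNameServer`, its methods, and the broker address used in `main` are hypothetical names chosen for this sketch.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of service discovery; NOT the real mqnamesrv implementation.
class ToyNameServer {
    // topic -> addresses of the brokers that store this topic
    private final Map<String, List<String>> routes = new HashMap<>();

    // Called by a broker to announce that it hosts a topic.
    void registerBroker(String topic, String brokerAddr) {
        routes.computeIfAbsent(topic, t -> new ArrayList<>()).add(brokerAddr);
    }

    // Called by a producer or consumer to find the brokers for a topic.
    List<String> lookup(String topic) {
        return routes.getOrDefault(topic, Collections.emptyList());
    }

    public static void main(String[] args) {
        ToyNameServer nameServer = new ToyNameServer();
        // Assumed broker address, for illustration only
        nameServer.registerBroker("orders", "10.9.200.245:10911");
        System.out.println(nameServer.lookup("orders"));  // [10.9.200.245:10911]
        System.out.println(nameServer.lookup("unknown")); // []
    }
}
```

The point of the indirection is that the Producer and Consumer are configured only with the name server address and never with a broker address.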

The Producer simulates sending messages:

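import java.nio.charset.StandardCharsets;

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

// Imports assume the Apache RocketMQ 4.x client (org.apache.rocketmq packages).
public class Producer {

	public static void main(String[] args) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
		DefaultMQProducer producer = new DefaultMQProducer("group");
		// Address of the name server started during environment setup
		producer.setNamesrvAddr("10.9.200.245:9876");
		producer.start();

		for (int i = 0; i < 5; i++) {
			// Topic "orders", body "order0" .. "order4"; the charset is named explicitly
			Message msg = new Message("orders", ("order" + i).getBytes(StandardCharsets.UTF_8));
			// Synchronous send: blocks until the broker responds
			SendResult result = producer.send(msg);
			System.out.println(msg + " send out...");
			System.out.println(result);
			Thread.sleep(500);
		}
		producer.shutdown();
	}

}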
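In the output further down, the five messages land on queueId 0, 1, 2, 3 and then 0 again, which is consistent with the producer rotating over the topic's queues when `send(msg)` is called without choosing a queue. A minimal sketch of such a rotation, assuming four queues and a starting index of 0. The class `RoundRobinSelector` is hypothetical, and the real client's selection logic is more involved than this.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper: rotates over queue ids 0..queueCount-1.
class RoundRobinSelector {
    private final int queueCount;
    private final AtomicInteger counter = new AtomicInteger(0);

    RoundRobinSelector(int queueCount) {
        if (queueCount <= 0) {
            throw new IllegalArgumentException("queueCount must be positive");
        }
        this.queueCount = queueCount;
    }

    // Returns the next queue id; floorMod keeps the result non-negative
    // even after the counter overflows.
    int nextQueueId() {
        return Math.floorMod(counter.getAndIncrement(), queueCount);
    }

    public static void main(String[] args) {
        RoundRobinSelector selector = new RoundRobinSelector(4); // four queues assumed
        for (int i = 0; i < 5; i++) {
            System.out.println("order" + i + " -> queueId=" + selector.nextQueueId());
        }
    }
}
```

Spreading messages over queues this way balances load, but it also means ordering is only preserved within a single queue, not across the whole topic.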

The Consumer receives messages:

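import java.nio.charset.StandardCharsets;
import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

// Imports assume the Apache RocketMQ 4.x client (org.apache.rocketmq packages).
public class Consumer {

	public static void main(String[] args) throws MQClientException {
		DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumers");
		consumer.setNamesrvAddr("10.9.200.245:9876");

		// "*" subscribes to every tag of the topic (same effect as a null expression)
		consumer.subscribe("orders", "*");
		// Only matters for a consumer group with no stored offset: start from the earliest message
		consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
		consumer.registerMessageListener(new MessageListenerConcurrently() {

			@Override
			public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
				// Handle every message in the batch, not only the first one
				for (MessageExt msg : msgs) {
					System.out.println(new String(msg.getBody(), StandardCharsets.UTF_8));
				}
				return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
			}
		});
		// start() returns immediately; the client's own threads deliver messages to the listener
		consumer.start();
	}
}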
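The message body travels as raw bytes: the producer turns a string into a byte array, and the consumer turns it back. This round trip can be checked without a broker. The sketch below names UTF-8 explicitly so the result does not depend on the platform default charset. `BodyRoundTrip` is a hypothetical class name used only for this illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

class BodyRoundTrip {
    // What the producer side does: text -> bytes.
    static byte[] encode(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    // What the consumer side does: bytes -> text.
    static String decode(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] body = encode("order0");
        System.out.println(Arrays.toString(body)); // [111, 114, 100, 101, 114, 48]
        System.out.println(decode(body));          // order0
    }
}
```

The printed array is the same as the `body=[...]` field that the producer logs for its first message.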

Output of the Producer and the Consumer:

Message{topic='orders', flag=0, properties={UNIQ_KEY=0A09C86813F018B4AAC250C17FBB0000, WAIT=true}, body=[111, 114, 100, 101, 114, 48], transactionId='null'} send out...
SendResult [sendStatus=SEND_OK, msgId=0A09C86813F018B4AAC250C17FBB0000, offsetMsgId=0A09C8F500002A9F0000000000009AAA, messageQueue=MessageQueue [topic=orders, brokerName=broker-a, queueId=0], queueOffset=63]
Message{topic='orders', flag=0, properties={UNIQ_KEY=0A09C86813F018B4AAC250C181C50001, WAIT=true}, body=[111, 114, 100, 101, 114, 49], transactionId='null'} send out...
SendResult [sendStatus=SEND_OK, msgId=0A09C86813F018B4AAC250C181C50001, offsetMsgId=0A09C8F500002A9F0000000000009B45, messageQueue=MessageQueue [topic=orders, brokerName=broker-a, queueId=1], queueOffset=63]
Message{topic='orders', flag=0, properties={UNIQ_KEY=0A09C86813F018B4AAC250C183BD0002, WAIT=true}, body=[111, 114, 100, 101, 114, 50], transactionId='null'} send out...
SendResult [sendStatus=SEND_OK, msgId=0A09C86813F018B4AAC250C183BD0002, offsetMsgId=0A09C8F500002A9F0000000000009BE0, messageQueue=MessageQueue [topic=orders, brokerName=broker-a, queueId=2], queueOffset=64]
Message{topic='orders', flag=0, properties={UNIQ_KEY=0A09C86813F018B4AAC250C185BC0003, WAIT=true}, body=[111, 114, 100, 101, 114, 51], transactionId='null'} send out...
SendResult [sendStatus=SEND_OK, msgId=0A09C86813F018B4AAC250C185BC0003, offsetMsgId=0A09C8F500002A9F0000000000009C7B, messageQueue=MessageQueue [topic=orders, brokerName=broker-a, queueId=3], queueOffset=64]
Message{topic='orders', flag=0, properties={UNIQ_KEY=0A09C86813F018B4AAC250C187B50004, WAIT=true}, body=[111, 114, 100, 101, 114, 52], transactionId='null'} send out...
SendResult [sendStatus=SEND_OK, msgId=0A09C86813F018B4AAC250C187B50004, offsetMsgId=0A09C8F500002A9F0000000000009D16, messageQueue=MessageQueue [topic=orders, brokerName=broker-a, queueId=0], queueOffset=64]
order0
order1
order2
order3
order4
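The `offsetMsgId` values in the `SendResult` lines can be read as structured data. Assuming the layout used by RocketMQ 4.x with an IPv4 broker (4 bytes of broker IP, 4 bytes of broker port, 8 bytes of commit log offset, all hex-encoded), the sketch below decodes them. `OffsetMsgIdDecoder` is a hypothetical helper written for this note and is not part of the RocketMQ client.

```java
// Hypothetical decoder for a 32-character offsetMsgId (IPv4 layout assumed).
class OffsetMsgIdDecoder {
    // Broker address in "ip:port" form, taken from the first 16 hex characters.
    static String brokerAddress(String offsetMsgId) {
        requireLength(offsetMsgId);
        StringBuilder ip = new StringBuilder();
        for (int i = 0; i < 8; i += 2) {
            if (i > 0) {
                ip.append('.');
            }
            ip.append(Integer.parseInt(offsetMsgId.substring(i, i + 2), 16));
        }
        int port = Integer.parseInt(offsetMsgId.substring(8, 16), 16);
        return ip + ":" + port;
    }

    // Commit log offset, taken from the last 16 hex characters.
    static long commitLogOffset(String offsetMsgId) {
        requireLength(offsetMsgId);
        return Long.parseUnsignedLong(offsetMsgId.substring(16, 32), 16);
    }

    private static void requireLength(String id) {
        if (id == null || id.length() != 32) {
            throw new IllegalArgumentException("expected 32 hex characters");
        }
    }

    public static void main(String[] args) {
        String first = "0A09C8F500002A9F0000000000009AAA";  // order0
        String second = "0A09C8F500002A9F0000000000009B45"; // order1
        System.out.println(brokerAddress(first));   // 10.9.200.245:10911
        System.out.println(commitLogOffset(first)); // 39594
        System.out.println(commitLogOffset(second) - commitLogOffset(first)); // 155
    }
}
```

Under this reading, the broker runs on the same host as the name server (10.9.200.245) on port 10911, and consecutive messages sit 155 bytes apart in the commit log.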