rocketmq学习笔记 ---- Hello world!
RocketMQ的集群部署机构如图所示:
在环境搭建过程中,启动了mqnamesrv以及broker,这就是RocketMQ集群中的服务发现以及消息存储的模块,程序中只需要实现Producer以及Consumer即可。
Producer模拟发消息功能:
public class Producer {
public static void main(String[] args) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("group");
producer.setNamesrvAddr("10.9.200.245:9876");
producer.start();
for (int i = 0; i < 5; i++) {
Message msg = new Message("orders", "order".concat(String.valueOf(i)).getBytes());
SendResult result = producer.send(msg);
System.out.println(msg + " send out...");
System.out.println(result);
Thread.sleep(500);
}
producer.shutdown();
}
}
Consumer接收消息:
public class Consumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumers");
consumer.setNamesrvAddr("10.9.200.245:9876");
consumer.subscribe("orders", (String)null);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
MessageExt msg = msgs.get(0);
System.out.println(new String(msg.getBody()));
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
}
Producer以及Consumer输出:
Message{topic='orders', flag=0, properties={UNIQ_KEY=0A09C86813F018B4AAC250C17FBB0000, WAIT=true}, body=[111, 114, 100, 101, 114, 48], transactionId='null'} send out...
SendResult [sendStatus=SEND_OK, msgId=0A09C86813F018B4AAC250C17FBB0000, offsetMsgId=0A09C8F500002A9F0000000000009AAA, messageQueue=MessageQueue [topic=orders, brokerName=broker-a, queueId=0], queueOffset=63]
Message{topic='orders', flag=0, properties={UNIQ_KEY=0A09C86813F018B4AAC250C181C50001, WAIT=true}, body=[111, 114, 100, 101, 114, 49], transactionId='null'} send out...
SendResult [sendStatus=SEND_OK, msgId=0A09C86813F018B4AAC250C181C50001, offsetMsgId=0A09C8F500002A9F0000000000009B45, messageQueue=MessageQueue [topic=orders, brokerName=broker-a, queueId=1], queueOffset=63]
Message{topic='orders', flag=0, properties={UNIQ_KEY=0A09C86813F018B4AAC250C183BD0002, WAIT=true}, body=[111, 114, 100, 101, 114, 50], transactionId='null'} send out...
SendResult [sendStatus=SEND_OK, msgId=0A09C86813F018B4AAC250C183BD0002, offsetMsgId=0A09C8F500002A9F0000000000009BE0, messageQueue=MessageQueue [topic=orders, brokerName=broker-a, queueId=2], queueOffset=64]
Message{topic='orders', flag=0, properties={UNIQ_KEY=0A09C86813F018B4AAC250C185BC0003, WAIT=true}, body=[111, 114, 100, 101, 114, 51], transactionId='null'} send out...
SendResult [sendStatus=SEND_OK, msgId=0A09C86813F018B4AAC250C185BC0003, offsetMsgId=0A09C8F500002A9F0000000000009C7B, messageQueue=MessageQueue [topic=orders, brokerName=broker-a, queueId=3], queueOffset=64]
Message{topic='orders', flag=0, properties={UNIQ_KEY=0A09C86813F018B4AAC250C187B50004, WAIT=true}, body=[111, 114, 100, 101, 114, 52], transactionId='null'} send out...
SendResult [sendStatus=SEND_OK, msgId=0A09C86813F018B4AAC250C187B50004, offsetMsgId=0A09C8F500002A9F0000000000009D16, messageQueue=MessageQueue [topic=orders, brokerName=broker-a, queueId=0], queueOffset=64]
order0
order1
order2
order3
order4