RocketMQ--(二)HelloWorld
package test1; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.client.producer.DefaultMQProducer; import com.alibaba.rocketmq.client.producer.SendResult; import com.alibaba.rocketmq.common.message.Message; public class Producer { public static void main(String[] args) throws MQClientException { DefaultMQProducer producer=new DefaultMQProducer("quickstart_producer"); producer.setNamesrvAddr("192.168.0.121:9876;192.168.0.122:9876"); producer.start(); for (int i = 0; i < 100; i++) { try { Message msg=new Message("TopicQuickStart","TagA",("Hello RocketMQ"+i).getBytes()); SendResult sendResult=producer.send(msg); System.out.println(sendResult); } catch (Exception e) { // TODO: handle exception e.printStackTrace(); } } producer.shutdown(); } }
package test1; import java.util.List; import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere; import com.alibaba.rocketmq.common.message.MessageExt; public class Consumer { public static void main(String[] args) throws MQClientException { DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("quickstart_consumer"); /** * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费 * 如果非第一次启动,那么按照上次消费的位置继续消费 */ consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.setNamesrvAddr("192.168.0.121:9876;192.168.0.122:9876"); consumer.subscribe("TopicQuickStart","*"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { try { for(MessageExt msg:msgs){ String topic=msg.getTopic(); String tags=msg.getTags(); String msgBody=new String(msg.getBody(),"utf-8"); System.out.println(topic+" -- "+tags+" -- "+msgBody); } } catch (Exception e) { return ConsumeConcurrentlyStatus.RECONSUME_LATER; } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.println("Consumer Started"); } }