IBM MQ消息监听器

问题描述:

嗨有谁知道如何使用IBM MQ创建消息监听器?我知道如何使用JMS规范来做到这一点,但我不确定如何为IBM MQ做到这一点。任何链接或指针非常感谢。IBM MQ消息监听器

+0

如果你知道如何做到这一点的JMS,为什么你需要使它具体化MQ? – skaffman 2009-10-28 09:42:31

+0

嘿,你能告诉我如何使用JMS连接到IBM MQ,就像我想知道如何指定队列管理器,通道等 – 2010-02-24 06:19:23

看一看IBM的帮助:Writing WebSphere MQ base Java applications

IBM具有与队列交互的API。以下是他们的示例:

import com.ibm.mq.*;   // Include the WebSphere MQ classes for Java package 


public class MQSample 
{ 
    private String qManager = "your_Q_manager"; // define name of queue 
               // manager to connect to. 
    private MQQueueManager qMgr;     // define a queue manager 
               // object 
    public static void main(String args[]) { 
    new MQSample(); 
    } 

    public MQSample() { 
    try { 

     // Create a connection to the queue manager 

     qMgr = new MQQueueManager(qManager); 

     // Set up the options on the queue we wish to open... 
     // Note. All WebSphere MQ Options are prefixed with MQC in Java. 

     int openOptions = MQC.MQOO_INPUT_AS_Q_DEF | 
         MQC.MQOO_OUTPUT ; 

     // Now specify the queue that we wish to open, 
     // and the open options... 

     MQQueue system_default_local_queue = 
       qMgr.accessQueue("SYSTEM.DEFAULT.LOCAL.QUEUE", 
           openOptions); 

     // Define a simple WebSphere MQ message, and write some text in UTF format.. 

     MQMessage hello_world = new MQMessage(); 
     hello_world.writeUTF("Hello World!"); 

     // specify the message options... 

     MQPutMessageOptions pmo = new MQPutMessageOptions(); // accept the // defaults, 
                  // same as MQPMO_DEFAULT 

     // put the message on the queue 

     system_default_local_queue.put(hello_world,pmo); 

     // get the message back again... 
     // First define a WebSphere MQ message buffer to receive the message into.. 

     MQMessage retrievedMessage = new MQMessage(); 
     retrievedMessage.messageId = hello_world.messageId; 

     // Set the get message options... 

     MQGetMessageOptions gmo = new MQGetMessageOptions(); // accept the defaults 
                  // same as MQGMO_DEFAULT 
     // get the message off the queue... 

     system_default_local_queue.get(retrievedMessage, gmo); 

     // And prove we have the message by displaying the UTF message text 

     String msgText = retrievedMessage.readUTF(); 
     System.out.println("The message is: " + msgText); 
     // Close the queue... 
     system_default_local_queue.close(); 
     // Disconnect from the queue manager 

     qMgr.disconnect(); 
    } 
     // If an error has occurred in the above, try to identify what went wrong 
     // Was it a WebSphere MQ error? 
    catch (MQException ex) 
    { 
     System.out.println("A WebSphere MQ error occurred : Completion code " + 
         ex.completionCode + " Reason code " + ex.reasonCode); 
    } 
     // Was it a Java buffer space error? 
    catch (java.io.IOException ex) 
    { 
     System.out.println("An error occurred whilst writing to the message buffer: " + ex); 
    } 
    } 
} // end of sample 

我不确定IBM jar是否位于基本Maven回购站。我知道在过去,我必须从本地IBM安装中提取它们,并将它们放入本地SVN回购。我正在使用以下罐子:

<dependency> 
    <groupId>com.ibm</groupId> 
    <artifactId>com.ibm.mq</artifactId> 
    <version>5.3.00</version> 
    <scope>compile</scope> 
</dependency> 
    <dependency> 
    <groupId>com.ibm</groupId> 
    <artifactId>com.ibm.mq.pcf</artifactId> 
    <version>5.3.00</version> 
    <scope>compile</scope> 
</dependency> 
    <dependency> 
    <groupId>com.ibm</groupId> 
    <artifactId>com.ibm.mqbind</artifactId> 
    <version>5.3.00</version> 
    <scope>compile</scope> 
</dependency> 
    <dependency> 
    <groupId>com.ibm</groupId> 
    <artifactId>com.ibm.mqjms</artifactId> 
    <version>5.3.00</version> 
    <scope>compile</scope> 
</dependency> 
+0

嗨,谢谢你的回应。我已经检查过这些网站,但我仍然没有找到使用异步消息监听器的示例。有任何想法吗?谢谢。 – x1a0 2009-10-06 15:50:02

+0

我们创建一个线程,该线程在位于队列管理器中的队列上每隔X秒查找一次消息。另外,如果这是您真正想要的,您可能需要编辑原始问题。 – Droo 2009-10-07 03:39:51

查看上面提供的示例。

具体的线路

MQGetMessageOptions gmo = new MQGetMessageOptions();  
system_default_local_queue.get(retrievedMessage, gmo); 

您可以配置得到抛出一个异常MQRC_NO_MSG_AVAILABLE之前等待指定的时间。或者你可以永远等待。

gmo.waitInterval= qTimeout; 
gmo.options = MQC.MQGMO_WAIT 

因此,你可以创建一个线程,不断寻找新的消息,然后将它们传递给处理程序。获取和放置不需要处于同一个线程甚至应用程序中。

我希望这有助于回答你的问题。

尽管前面的响应者提到了WMQ Java API,但WMQ也支持JMS,因此这里有一些资源可以帮助您开始使用。

看看这篇文章:IBM WebSphere Developer Technical Journal: Running a standalone Java application on WebSphere MQ V6.0

而且,如果你已经安装了完整的WMQ客户,而不仅仅是抓住罐子,那么你将有很多的安装示例代码。默认情况下,这些文件将位于C:\ Program Files \ IBM \ WebSphere MQ \ tools \ jms或/ opt/mqm/samp中,具体取决于您的平台。

如果您需要WMQ客户端安装介质,请获取它here。请注意,这是WMQ v7客户端,而不是v6客户端。它与v6 QMgr兼容,但自从v6到2011年9月的报废之后,您应该在v7客户端上进行新的开发,并且如果可能的话,还需要v7 QMgr。如果双方都是v7,有很多功能和性能增强。

如果您需要,您可以获得产品手册here

最后,请确定何时得到JMS异常以打印链接的异常。这不是一个WMQ的东西,而是一个JMS的东西。 Sun为JMS异常提供了一个多层次的数据结构,真正有趣的部分通常是嵌套的层次。这不是什么大不了的事,可在几行来实现:

try { 
    . 
    . code that might throw a JMSException 
    . 
} catch (JMSException je) { 
    System.err.println("caught "+je); 
    Exception e = je.getLinkedException(); 
    if (e != null) { 
    System.err.println("linked exception: "+e); 
    } else { 
    System.err.println("No linked exception found."); 
    } 
} 

这有助于确定一个JMS误差与输送误差之间的差异。例如,JMS安全错误可能是WMQ 2035,也可能是JSSE配置,或者应用程序可能无法访问文件系统中的某些内容。这些值中只有一个值得花费大量的时间来挖掘WMQ错误日志,并且只有通过打印链接的异常,您才能分辨出是否是那一个。

除现有答案外,还有一个重要的观点:JMS提供了MessageListener,它允许您以异步回调的形式接收消息。

本地API有没有等效的功能!必须适当地重复拨打get(...)

在循环

得到消息之前,您可以指定如下

gmo.options = MQC.MQGMO_WAIT 
gmo.waitInterval = MQConstants.MQWI_UNLIMITED; 

这使得循环会等到有队列中的消息。 对我来说,这是以防万一有人类似于MessageListerner

将谷歌计算器的MQ监听像我一样...... 这可能是没有答案的,由于JMS实现,但是这就是我一直在寻找。 事情是这样的:

MQQueueConnectionFactory cf = new MQQueueConnectionFactory(); 
MQQueueConnection conn = (MQQueueConnection)cf.createQueueConnection(); 
MQQueueSession session = (MQQueueSession)conn.createSession(false, 1); 

Queue queue = session.createQueue("QUEUE"); 

MQQueueReceiver receiver = (MQQueueReceiver)session.createReceiver(queue); 

receiver.setMessageListener(new YourListener()); 

conn.start(); 

YourListener应该实现MessageListener接口,您将收到你的邮件进入的onMessage(信息MSG)方法。

您好,这里是IBM MQ的消息监听器的工作示例。在这里,我用春天也创造豆类等...

package queue.app; 

import javax.annotation.PostConstruct; 
import javax.jms.Message; 
import javax.jms.MessageListener; 
import javax.jms.Queue; 
import javax.jms.QueueConnection; 
import javax.jms.QueueReceiver; 
import javax.jms.QueueSession; 
import javax.jms.Session; 
import javax.jms.TextMessage; 

import org.apache.log4j.Logger; 
import org.springframework.beans.factory.annotation.Value; 
import org.springframework.stereotype.Component; 

import com.ibm.mq.jms.MQQueue; 
import com.ibm.mq.jms.MQQueueConnectionFactory; 
import com.ibm.msg.client.wmq.WMQConstants; 


@Component 
public class QueueConsumer implements MessageListener{ 

    private Logger logger = Logger.getLogger(getClass()); 

    MQQueueConnectionFactory qcf = new MQQueueConnectionFactory(); 
    QueueConnection qc; 
    Queue queue; 
    QueueSession queueSession; 
    QueueReceiver qr; 

    @Value("${jms.hostName}") 
    String jmsHost; 
    @Value("${jms.port}") 
    String jmsPort; 
    @Value("${jms.queue.name}") 
    String QUEUE_NAME; 
    @Value("${jms.queueManager}") 
    String jmsQueueMgr; 
    @Value("${jms.username}") 
    String jmsUserName; 
    @Value("${jms.channel}") 
    String jmsChannel; 

    @PostConstruct 
    public void init() throws Exception{ 
     qcf.setHostName (jmsHost); 
     qcf.setPort (Integer.parseInt(jmsPort)); 
     qcf.setQueueManager (jmsQueueMgr); 
     qcf.setChannel (jmsChannel); 
     qcf.setTransportType (WMQConstants.WMQ_CM_CLIENT); 
     qc = qcf.createQueueConnection(); 

     queue = new MQQueue(QUEUE_NAME); 
     qc.createQueueSession (false, Session.AUTO_ACKNOWLEDGE); 
     queueSession = qc.createQueueSession (false, Session.AUTO_ACKNOWLEDGE); 
     qr = queueSession.createReceiver(queue); 
     qr.setMessageListener(this); 
     qc.start(); 

    } 


    @Override 
    public void onMessage(Message message) { 
     logger.info("Inside On Message..."); 
     long t1 = System.currentTimeMillis(); 
     logger.info("Message consumed at ...."+t1); 

     try{ 
      if(message instanceof TextMessage) { 
       logger.info("String message recieved >> "+((TextMessage) message).getText()); 
      } 

     }catch(Exception e){ 
      e.printStackTrace(); 
     } 

    } 
} 

下面是依赖我有...

<dependency> 
      <groupId>com.sun.messaging.mq</groupId> 
      <artifactId>fscontext</artifactId> 
      <version>4.2</version> 
      <scope>test</scope> 
     </dependency> 

     <dependency> 
      <groupId>com.ibm</groupId> 
      <artifactId>jms</artifactId> 
      <version>1.0</version> 
     </dependency> 

     <dependency> 
      <groupId>org.springframework</groupId> 
      <artifactId>spring-jms</artifactId> 
      <version>3.2.17.RELEASE</version> 
     </dependency> 


     <dependency> 
      <groupId>com.ibm</groupId> 
      <artifactId>com.ibm.mq</artifactId> 
      <version>1.0</version> 
     </dependency> 
     <dependency> 
      <groupId>com.ibm</groupId> 
      <artifactId>com.ibm.mq.allclient</artifactId> 
      <version>1.0</version> 
     </dependency> 
     <dependency> 
      <groupId>com.ibm</groupId> 
      <artifactId>com.ibm.mq.jmqi</artifactId> 
      <version>1.0</version> 
     </dependency> 
     <dependency> 
      <groupId>com.ibm</groupId> 
      <artifactId>com.ibm.mqjms</artifactId> 
      <version>1.0</version> 
     </dependency>