Tuesday, January 31, 2012

MQ Queue Manager: if you can access the MQQueueManager class directly in your working
environment, the following is the way to create the object directly.


MQEnvironment.hostname = "yourhost.yourcompany.com";
MQEnvironment.port = CLIENT_PORT; // Mention int port here
MQEnvironment.channel = CLIENT_CHANNEL; // Menion Server connection channel here

/* Mention queueManagerName here */
 MQQueueManagerqueueManager = new MQQueueManager(queueManagerName);

That's it. The hostname, port, channel and queueManagerName are all you need to connect to your queue manager.

And yes, don't forget to import these:

import com.ibm.mq.MQEnvironment;
import com.ibm.mq.MQQueueManager;

Thursday, January 19, 2012

Handling Big Messages with MQ (MQSeries)

It's very hard to handle big messages, especially ones larger than 3-4 MB. You will receive a JMSException beyond a certain limit. But MQ gives us the concept of grouping messages when required. The only problem is: if you have a single huge file, how can you divide it, send it, and receive it in the proper sequence?

The code below does exactly that. Hope it will help you. Please let me know of any suggestions and changes if required.

package com.xyz;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.math.BigInteger;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

import javax.jms.Connection;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.log4j.Logger;
import com.ibm.mq.jms.JMSC;
import com.ibm.mq.jms.MQConnectionFactory;
import com.ibm.mq.jms.MQQueue;

public class MQMessageSegments {
private static Logger log = Logger.getLogger(MQMessageSegments.class);

 static MQDirectConnector connector = null;
 static Connection connection = null;
 static Session session = null;
 static Queue sendToQueue = null;
 static int ch;
 static Map<String, FileOutputStream> map = new HashMap<String, FileOutputStream>();
 private static MQConnectionFactory factory = null;
 private static String hostName = "localhost.homedepot.com";
 private static int port = 1414;
 private static String queueManagerName = "COM.AAD.LCL.TST";
 private static String channelName = "SYSTEM.DEF.SVRCONN";
 private static String localQueueName = "AAD.LOCAL.QUEUE" ;

 static Connection conn2 = null;

 static Session session2 = null;
 public static void main(String args[]){
 
  try {
   log.debug("Connecting to Local Queue Manager - Start");
    factory = new MQConnectionFactory();
    factory.setHostName(hostName);
    factory.setPort(port);
    factory.setQueueManager(queueManagerName);
    factory.setChannel(channelName);

    connection = factory.createConnection();
    session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
   
    MQQueue destination = new MQQueue(localQueueName);
    destination.setTargetClient(JMSC.MQJMS_CLIENT_NONJMS_MQ);
   
    MessageProducer producer =  session.createProducer(destination);
   log.debug("Connecting to Local Queue Manager - End");
   /************* Here Starts Sending Part ********/
   log.debug("Creating Messages and Sending it across - Start");
    StringBuffer fileBuffer = new StringBuffer();
    String groupId = "ID:" + new BigInteger(24 * 8, new Random()).toString(10);
    long highst = 1000000; // Number of Characters, approximately 1.1 MB
    int i = 0;
    int sequence = 1;
    File file = new File("C:/access1.log");
    FileInputStream fin =  new FileInputStream(file);
    while ((ch = fin.read()) != -1){
     fileBuffer.append((char) ch);
     i++;
     if(i >= highst){
      TextMessage message = session.createTextMessage();
      message.setStringProperty("JMSXGroupID", groupId);
      message.setIntProperty("JMSXGroupSeq", sequence);
      message.setText(fileBuffer.toString());
      producer.send(message);
      log.debug("sequence Number "+ sequence + " Processed");
      sequence++;
      i = 0;
      fileBuffer = null;
      fileBuffer = new StringBuffer();
     }
    }
   if(fileBuffer != null &&
     fileBuffer.toString().length() > 0){
    TextMessage message = session.createTextMessage();
    message.setStringProperty("JMSXGroupID", groupId);
    message.setIntProperty("JMSXGroupSeq", sequence);
    message.setBooleanProperty("JMS_IBM_Last_Msg_In_Group", true);
    message.setText(fileBuffer.toString());
    producer.send(message);
    log.debug("sequence Number "+ sequence + " Processed as Last Mgs in Grp" );
   }
   connection.close();
   session.close();
   log.debug("Creating Messages and Sending it across - End");
   /************* Here Ends Sending Part ********/
   Thread.sleep(10000); // Just waiting for sometime before receiving starts
   /************* Here Starts Receiving Logic Part ********/
   log.debug("Receiving Logic - Start");
   conn2 = factory.createConnection();
   conn2.start();
   session2 = conn2.createSession(true, Session.AUTO_ACKNOWLEDGE);
   // First trick is to have selector in Consumer to receive last message only
   MessageConsumer consumer = session2.createConsumer(destination, "JMS_IBM_Last_Msg_In_Group=TRUE");
   GroupMessageConsumer grpConsumer = new GroupMessageConsumer();
   consumer.setMessageListener(grpConsumer);
   conn2.setExceptionListener(grpConsumer);
   Thread.sleep(10000); // Just waiting for sometime so that Consumer can listen the message
   log.debug("Receiving Logic - End");
  }catch(Exception exe){
   exe.printStackTrace();
  }finally{
   try {
   
    connection.close();
    conn2.close();
   } catch (JMSException e) {
    e.printStackTrace();
   }
  
  }
 }

 private static class GroupMessageConsumer implements MessageListener, ExceptionListener{

  @Override
  public void onMessage(Message lastMessage) {
   log.debug("lastMessage received - Start");
   FileOutputStream fos = null;
   Session consumerSession = null;
   Connection consumerConnection = null;
   try {
    consumerConnection = factory.createConnection();
    consumerConnection.start();
    consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    MQQueue destination = new MQQueue(localQueueName);
    int groupSize = lastMessage.getIntProperty("JMSXGroupSeq");
    String groupId = lastMessage.getStringProperty("JMSXGroupID");
   
    boolean failed = false;
    File outFile = new File("C:/outListen.txt");
    fos = new FileOutputStream(outFile);
    log.debug("Reading other messages in Sequence - Start");
    for (int i = 1; (i < groupSize) && !failed; i++) {
     MessageConsumer consumer = consumerSession.createConsumer(destination,
           "JMSXGroupID='" + groupId + "'AND JMSXGroupSeq=" + i);
           TextMessage message = (TextMessage) consumer.receiveNoWait();
           log.debug("Reading Sequence "+ i);
           if (message != null) {
            fos.write((message.getText()).getBytes());
           } else {
            failed = true;
           }
           consumer.close();
    }
    log.debug("Reading Last Message in group ");
    fos.write(((TextMessage) lastMessage).getText().getBytes());
    log.debug("Reading other messages in Sequence - End");
    fos.close();
    if (failed) {
     log.debug("Message Not Processed");
       } else {
          log.debug("Message Processed");
       }
   } catch (JMSException e) {
    e.printStackTrace();
   } catch (FileNotFoundException e) {
    e.printStackTrace();
   } catch (IOException e) {
    e.printStackTrace();
   } finally {
    try {
     session2.commit();
     consumerSession.close();
     consumerConnection.close();
    } catch (JMSException e) {
     e.printStackTrace();
    }
   }
  
  
  }

  @Override
  public void onException(JMSException e) {
   e.printStackTrace();
  
  }
 
 }
}

Wednesday, January 18, 2012

Streaming Messaging in ActiveMQ

There are many examples floating around on the Web with a code snippet copied from the Apache site. But here is code which actually sends a stream message and more: it also receives it with a MessageListener.

Please let me know if you have suggestions, improvements or questions. Thanks.

package com.xyz;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import javax.jms.BytesMessage;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.command.ActiveMQBytesMessage;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.util.ByteSequence;
import org.apache.log4j.Logger;

import com.xyz.AMQConnectorException;
public class ReadFileAndSendStream {
 static ActiveMQDirectConnector connector = null;
 static ActiveMQConnection connection = null;
 static Session session = null;
 static Destination sendToQueue = null;
 static int ch;
 static Map<String, FileOutputStream> map = new HashMap<String, FileOutputStream>();
 private static Logger log = Logger.getLogger(ReadFileAndSendStream.class);
 public static void main(String args[]){

  try {
   connector = new ActiveMQDirectConnector("tcp://Server.company.com:<PORT>");
   connection =   connector.getConnectionDetails().getConnection();
  
   session = connector.getConnectionDetails().getSession();
   // Reading a File and sending the stream to ActiveMQ queue
   sendToQueue = session.createQueue("AAD.STREAM.TEST");
  
   /*********  Sending a huge huge huge message to ActiveMQ Queue **************/
   log.debug("Start time to Send  ::  " + System.currentTimeMillis());
   File file = new File("C:/access1.log");
   DataOutputStream out2 =  new DataOutputStream(connection.createOutputStream(sendToQueue));
      FileInputStream fin = null;
      try {
        fin = new FileInputStream(file);
        while ((ch = fin.read()) != -1)
         out2.write((char) ch);
         fin.close();
      } catch (Exception e) {
       e.printStackTrace();
      }
   out2.close();
   session.commit();
   Thread.sleep(10000);
   log.debug("End time to Send  ::  " + System.currentTimeMillis());
 
   /********* I should write a consumer ***********/
  
   log.debug("Start time to read  ::  " + System.currentTimeMillis());
  
 
   MessageConsumer consumer = session.createConsumer(sendToQueue);
   MyConsumer myConsumer = new MyConsumer();
   consumer.setMessageListener(myConsumer);
   connection.setExceptionListener(myConsumer);
  
   log.debug("End time to read  ::  " + System.currentTimeMillis());
  
   /*********  Reading a huge huge huge message from ActiveMQ Queue
  
   DataInputStream in = new DataInputStream(connection.createInputStream(sendToQueue));
   FileWriter fstream = new FileWriter("C:/out.txt");
   BufferedWriter out = new BufferedWriter(fstream);
    try {
         while ((ch = in.read()) != -1)
        out.write((char) ch);
          in.close();
       } catch (Exception e) {
        e.printStackTrace();
       }
     out.close();
     **************/
  } catch (AMQConnectorException e) {
   e.printStackTrace();
  }catch (JMSException e) {
   e.printStackTrace();
  } catch (IOException e) {
   e.printStackTrace();
  } catch (InterruptedException e) {
   e.printStackTrace();
  }
 
 }

 private static class MyConsumer implements MessageListener, ExceptionListener{

  @Override
   public void onMessage(Message message) {
    String messageId;
    FileOutputStream fos = null;

    if (message instanceof BytesMessage) {
    try {
     messageId = ((ActiveMQBytesMessage)message).getGroupID();
   
     if(!map.containsKey(messageId)) {
      log.debug("This is the message ID:" + messageId);
      File outFile = new File("C:/outListen.txt");
      fos = new FileOutputStream(outFile);
      map.put(messageId, fos);
     } else {
      fos = (FileOutputStream) map.get(messageId);
     }
     ByteSequence bs = ((ActiveMQBytesMessage)message).getContent();
     fos.write(bs.getData());
    
     log.debug("getGroupSequence" +((ActiveMQBytesMessage)message).getGroupSequence());
    }catch (IOException e) {
     e.printStackTrace();
    } finally {
   
    }
    }else
    {
     try {
      log.debug("This is the message type:" + message.getJMSType() + "\n\n" + message);
      fos = (FileOutputStream) map.remove(((ActiveMQMessage)message).getGroupID() );

      if (fos != null)
      {
       log.debug("Closing File");
       fos.close();
      }
      } catch (JMSException e) {
       e.printStackTrace();
      } catch (IOException e) {
       e.printStackTrace();
      } finally {
       try {
        session.commit();
       } catch (JMSException e) {
        e.printStackTrace();
       }
      }

    }
   }
   @Override
   public void onException(JMSException ex) {
    System.out.println(ex);
    ex.printStackTrace();
   
   }
 }
}


Tuesday, January 17, 2012

Standalone MQ Queue Consumer Code

This small piece of code shows how to connect to MQ with the help of MQQueueConnectionFactory, without using JNDI or any context lookups.
The code creates its own factory instance and assigns the values.
This is convenient when you are writing a standalone Java application that does not run in an application server.


This class also includes an example consumer implementation.




package com.xyz;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.QueueConnection;
import javax.jms.QueueSession;
import javax.jms.Session;
import com.ibm.mq.jms.JMSC;
import com.ibm.mq.jms.MQQueueConnectionFactory;

public class MQDirectQueueConsumer2 {

 private String localQueueName;
 private String hostName;
 private int port;
 private String queueManagerName;
 private String channelName;

 private int sessionProperty = 0;
 private QueueConnection mqConnection = null;
 private QueueSession mqSession = null;

 private MQQueueConnectionFactory mqQCF = null;

 /**
  * MQDirectQueueConsumer2 constructor. Also calls connectToMQ() which returns QueueConnection
  * Object for further action.
  * @param hostName
  * @param port
  * @param queueManagerName
  * @param channelName
  * @param localQueueName
  * @throws Exception
  */
 public MQDirectQueueConsumer2(String hostName,
   int port,
   String queueManagerName,
   String channelName,
   String localQueueName) throws Exception {
  this.hostName = hostName;
  this.port = port;
  this.queueManagerName = queueManagerName;
  this.channelName = channelName;
  this.localQueueName = localQueueName;
  connectToMQ();

 }
 /**
  * connectToMQ() creates a connection with queue manager.
  * Make sure you have all necessary information to connect to QMGR
  * @return QueueConnection
  * @throws Exception
  */
 private QueueConnection connectToMQ() throws Exception {
  try {
   mqQCF = new MQQueueConnectionFactory();
   mqQCF.setHostName(hostName);
   mqQCF.setPort(port);
   mqQCF.setQueueManager(queueManagerName);
   mqQCF.setTransportType(JMSC.MQJMS_TP_CLIENT_MQ_TCPIP);
   mqQCF.setChannel(channelName);
 
   mqConnection = mqQCF.createQueueConnection();
   if(sessionProperty == 0){
    mqSession = mqConnection.createQueueSession(true, Session.AUTO_ACKNOWLEDGE);
   }else{
    mqSession = mqConnection.createQueueSession(true, sessionProperty);
   }
   mqConnection.start();
  } catch (JMSException e) {
   e.printStackTrace();
   throw new Exception("Exception in connectToMQ()",e);
  }
  return mqConnection;
 }

 /**
  * startConsumer() registers and starts the consumer for localQueue passed to constructor
  * @throws Exception
  */
 public void startConsumer() throws Exception{
  if(mqConnection == null){
   connectToMQ();
  }
  try{
    Destination queueToConsume    = mqSession.createQueue(localQueueName);
    MessageConsumer consumer = mqSession.createConsumer(queueToConsume);
    DirectQueueConsumer queueConsumer = new DirectQueueConsumer();
    mqConnection.setExceptionListener(queueConsumer);
    consumer.setMessageListener(queueConsumer);
   
  }catch(Exception exe){
   throw new Exception("Exception in initialize()"+exe);
  }
 }

 /**
  * Inner Static private class DirectQueueConsumer is
  * a MessageListener and ExceptionListener for localQueue
  * received message will be passed to onMessage() method provided.
  * @author THD
  *
  */
 private static class DirectQueueConsumer implements MessageListener, ExceptionListener
 {
  @Override
  public void onMessage(Message arg0) {
   System.out.println("Received Message :"+ arg0);
   // TODO Application Logic to handle the message
 
  }
  @Override
  public void onException(JMSException arg0) {
   // TODO Application Logic to handle the message
 
  }
 }
}