Thursday, January 19, 2012

Handling Big Messages with MQ (MQSeries)

It's very hard to handle big messages, especially ones larger than 3-4 MB. You will receive a JMSException once a message exceeds a certain size limit. However, MQ provides the concept of message grouping for exactly this situation. The only problem is: if you have a single huge file, how do you divide it, send it, and receive it in the proper sequence?

The code below does exactly that. I hope it helps you. Please let me know if you have any suggestions or if any changes are required.

package com.xyz;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.math.BigInteger;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

import javax.jms.Connection;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.log4j.Logger;
import com.ibm.mq.jms.JMSC;
import com.ibm.mq.jms.MQConnectionFactory;
import com.ibm.mq.jms.MQQueue;

public class MQMessageSegments {
private static Logger log = Logger.getLogger(MQMessageSegments.class);

 static MQDirectConnector connector = null;
 static Connection connection = null;
 static Session session = null;
 static Queue sendToQueue = null;
 static int ch;
 static Map<String, FileOutputStream> map = new HashMap<String, FileOutputStream>();
 private static MQConnectionFactory factory = null;
 private static String hostName = "localhost.homedepot.com";
 private static int port = 1414;
 private static String queueManagerName = "COM.AAD.LCL.TST";
 private static String channelName = "SYSTEM.DEF.SVRCONN";
 private static String localQueueName = "AAD.LOCAL.QUEUE" ;

 static Connection conn2 = null;

 static Session session2 = null;
 public static void main(String args[]){
 
  try {
   log.debug("Connecting to Local Queue Manager - Start");
    factory = new MQConnectionFactory();
    factory.setHostName(hostName);
    factory.setPort(port);
    factory.setQueueManager(queueManagerName);
    factory.setChannel(channelName);

    connection = factory.createConnection();
    session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
   
    MQQueue destination = new MQQueue(localQueueName);
    destination.setTargetClient(JMSC.MQJMS_CLIENT_NONJMS_MQ);
   
    MessageProducer producer =  session.createProducer(destination);
   log.debug("Connecting to Local Queue Manager - End");
   /************* Here Starts Sending Part ********/
   log.debug("Creating Messages and Sending it across - Start");
    StringBuffer fileBuffer = new StringBuffer();
    String groupId = "ID:" + new BigInteger(24 * 8, new Random()).toString(10);
    long highst = 1000000; // Number of Characters, approximately 1.1 MB
    int i = 0;
    int sequence = 1;
    File file = new File("C:/access1.log");
    FileInputStream fin =  new FileInputStream(file);
    while ((ch = fin.read()) != -1){
     fileBuffer.append((char) ch);
     i++;
     if(i >= highst){
      TextMessage message = session.createTextMessage();
      message.setStringProperty("JMSXGroupID", groupId);
      message.setIntProperty("JMSXGroupSeq", sequence);
      message.setText(fileBuffer.toString());
      producer.send(message);
      log.debug("sequence Number "+ sequence + " Processed");
      sequence++;
      i = 0;
      fileBuffer = null;
      fileBuffer = new StringBuffer();
     }
    }
   if(fileBuffer != null &&
     fileBuffer.toString().length() > 0){
    TextMessage message = session.createTextMessage();
    message.setStringProperty("JMSXGroupID", groupId);
    message.setIntProperty("JMSXGroupSeq", sequence);
    message.setBooleanProperty("JMS_IBM_Last_Msg_In_Group", true);
    message.setText(fileBuffer.toString());
    producer.send(message);
    log.debug("sequence Number "+ sequence + " Processed as Last Mgs in Grp" );
   }
   connection.close();
   session.close();
   log.debug("Creating Messages and Sending it across - End");
   /************* Here Ends Sending Part ********/
   Thread.sleep(10000); // Just waiting for sometime before receiving starts
   /************* Here Starts Receiving Logic Part ********/
   log.debug("Receiving Logic - Start");
   conn2 = factory.createConnection();
   conn2.start();
   session2 = conn2.createSession(true, Session.AUTO_ACKNOWLEDGE);
   // First trick is to have selector in Consumer to receive last message only
   MessageConsumer consumer = session2.createConsumer(destination, "JMS_IBM_Last_Msg_In_Group=TRUE");
   GroupMessageConsumer grpConsumer = new GroupMessageConsumer();
   consumer.setMessageListener(grpConsumer);
   conn2.setExceptionListener(grpConsumer);
   Thread.sleep(10000); // Just waiting for sometime so that Consumer can listen the message
   log.debug("Receiving Logic - End");
  }catch(Exception exe){
   exe.printStackTrace();
  }finally{
   try {
   
    connection.close();
    conn2.close();
   } catch (JMSException e) {
    e.printStackTrace();
   }
  
  }
 }

 private static class GroupMessageConsumer implements MessageListener, ExceptionListener{

  @Override
  public void onMessage(Message lastMessage) {
   log.debug("lastMessage received - Start");
   FileOutputStream fos = null;
   Session consumerSession = null;
   Connection consumerConnection = null;
   try {
    consumerConnection = factory.createConnection();
    consumerConnection.start();
    consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    MQQueue destination = new MQQueue(localQueueName);
    int groupSize = lastMessage.getIntProperty("JMSXGroupSeq");
    String groupId = lastMessage.getStringProperty("JMSXGroupID");
   
    boolean failed = false;
    File outFile = new File("C:/outListen.txt");
    fos = new FileOutputStream(outFile);
    log.debug("Reading other messages in Sequence - Start");
    for (int i = 1; (i < groupSize) && !failed; i++) {
     MessageConsumer consumer = consumerSession.createConsumer(destination,
           "JMSXGroupID='" + groupId + "'AND JMSXGroupSeq=" + i);
           TextMessage message = (TextMessage) consumer.receiveNoWait();
           log.debug("Reading Sequence "+ i);
           if (message != null) {
            fos.write((message.getText()).getBytes());
           } else {
            failed = true;
           }
           consumer.close();
    }
    log.debug("Reading Last Message in group ");
    fos.write(((TextMessage) lastMessage).getText().getBytes());
    log.debug("Reading other messages in Sequence - End");
    fos.close();
    if (failed) {
     log.debug("Message Not Processed");
       } else {
          log.debug("Message Processed");
       }
   } catch (JMSException e) {
    e.printStackTrace();
   } catch (FileNotFoundException e) {
    e.printStackTrace();
   } catch (IOException e) {
    e.printStackTrace();
   } finally {
    try {
     session2.commit();
     consumerSession.close();
     consumerConnection.close();
    } catch (JMSException e) {
     e.printStackTrace();
    }
   }
  
  
  }

  @Override
  public void onException(JMSException e) {
   e.printStackTrace();
  
  }
 
 }
}

No comments:

Post a Comment