Wednesday, January 18, 2012

Streaming Messaging in ActiveMQ

Many examples floating around the Web simply copy the code snippet from the Apache site. Here is code that actually sends a stream message and, beyond that, also receives it with a MessageListener.

Please let me know if you have any suggestions, improvements, or questions. Thanks.

package com.xyz;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import javax.jms.BytesMessage;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.command.ActiveMQBytesMessage;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.util.ByteSequence;
import org.apache.log4j.Logger;

import com.xyz.AMQConnectorException;
public class ReadFileAndSendStream {
 static ActiveMQDirectConnector connector = null;
 static ActiveMQConnection connection = null;
 static Session session = null;
 static Destination sendToQueue = null;
 static int ch;
 static Map<String, FileOutputStream> map = new HashMap<String, FileOutputStream>();
 private static Logger log = Logger.getLogger(ReadFileAndSendStream.class);
 public static void main(String args[]){

  try {
   connector = new ActiveMQDirectConnector("tcp://Server.company.com:<PORT>");
   connection =   connector.getConnectionDetails().getConnection();
  
   session = connector.getConnectionDetails().getSession();
   // Reading a File and sending the stream to ActiveMQ queue
   sendToQueue = session.createQueue("AAD.STREAM.TEST");
  
   /*********  Sending a huge huge huge message to ActiveMQ Queue **************/
   log.debug("Start time to Send  ::  " + System.currentTimeMillis());
   File file = new File("C:/access1.log");
   DataOutputStream out2 =  new DataOutputStream(connection.createOutputStream(sendToQueue));
      FileInputStream fin = null;
      try {
        fin = new FileInputStream(file);
        while ((ch = fin.read()) != -1)
         out2.write((char) ch);
         fin.close();
      } catch (Exception e) {
       e.printStackTrace();
      }
   out2.close();
   session.commit();
   Thread.sleep(10000);
   log.debug("End time to Send  ::  " + System.currentTimeMillis());
 
   /********* I should write a consumer ***********/
  
   log.debug("Start time to read  ::  " + System.currentTimeMillis());
  
 
   MessageConsumer consumer = session.createConsumer(sendToQueue);
   MyConsumer myConsumer = new MyConsumer();
   consumer.setMessageListener(myConsumer);
   connection.setExceptionListener(myConsumer);
  
   log.debug("End time to read  ::  " + System.currentTimeMillis());
  
   /*********  Reading a huge huge huge message from ActiveMQ Queue
  
   DataInputStream in = new DataInputStream(connection.createInputStream(sendToQueue));
   FileWriter fstream = new FileWriter("C:/out.txt");
   BufferedWriter out = new BufferedWriter(fstream);
    try {
         while ((ch = in.read()) != -1)
        out.write((char) ch);
          in.close();
       } catch (Exception e) {
        e.printStackTrace();
       }
     out.close();
     **************/
  } catch (AMQConnectorException e) {
   e.printStackTrace();
  }catch (JMSException e) {
   e.printStackTrace();
  } catch (IOException e) {
   e.printStackTrace();
  } catch (InterruptedException e) {
   e.printStackTrace();
  }
 
 }

 private static class MyConsumer implements MessageListener, ExceptionListener{

  @Override
   public void onMessage(Message message) {
    String messageId;
    FileOutputStream fos = null;

    if (message instanceof BytesMessage) {
    try {
     messageId = ((ActiveMQBytesMessage)message).getGroupID();
   
     if(!map.containsKey(messageId)) {
      log.debug("This is the message ID:" + messageId);
      File outFile = new File("C:/outListen.txt");
      fos = new FileOutputStream(outFile);
      map.put(messageId, fos);
     } else {
      fos = (FileOutputStream) map.get(messageId);
     }
     ByteSequence bs = ((ActiveMQBytesMessage)message).getContent();
     fos.write(bs.getData());
    
     log.debug("getGroupSequence" +((ActiveMQBytesMessage)message).getGroupSequence());
    }catch (IOException e) {
     e.printStackTrace();
    } finally {
   
    }
    }else
    {
     try {
      log.debug("This is the message type:" + message.getJMSType() + "\n\n" + message);
      fos = (FileOutputStream) map.remove(((ActiveMQMessage)message).getGroupID() );

      if (fos != null)
      {
       log.debug("Closing File");
       fos.close();
      }
      } catch (JMSException e) {
       e.printStackTrace();
      } catch (IOException e) {
       e.printStackTrace();
      } finally {
       try {
        session.commit();
       } catch (JMSException e) {
        e.printStackTrace();
       }
      }

    }
   }
   @Override
   public void onException(JMSException ex) {
    System.out.println(ex);
    ex.printStackTrace();
   
   }
 }
}


No comments:

Post a Comment