Showing posts with label streaming messages. Show all posts
Showing posts with label streaming messages. Show all posts

Tuesday, May 15, 2012

CSV to XML transformation using Camel

Many projects I know of need this transformation.
CSV to XML: you might be reading a user-uploaded CSV file, or a pipe-delimited flat file, and need to produce XML from it.
Below is code that does this using Camel and XStream. I am still exploring how to get the XML nodes populated as headers, but have not succeeded yet. I will update this post as soon as I find a solution; until then, this is working code you can use.

 

package com.XYZ;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
import com.thoughtworks.xstream.XStream;
import com.thoughtworks.xstream.io.xml.DomDriver;

public class FileCSVtoXMLConversion {
private static XStream xStream;
public static void main(String args[]) throws Exception {
// create CamelContext
CamelContext context = new DefaultCamelContext();
xStream = new XStream(new DomDriver());
// add our route to the CamelContext
context.addRoutes(new RouteBuilder() {
public void configure() {
from("
file://Some/TEST?fileName=ArticleTest.csv").
unmarshal().
csv().
process(new Processor() {
  @Override
  public void process(Exchange exchange) throws Exception {
    String xmlConverted = xStream.toXML(exchange.getIn().getBody());
    exchange.getIn().setBody(xmlConverted);
  }
 }
).to("
file://Some/TESTOUT?fileName=ArticleTest.XML");
}
});
// start the route and let it do its work
context.start();
Thread.sleep(10000);
// stop the CamelContext
context.stop();
}
}

Wednesday, January 18, 2012

Streaming Messaging in ActiveMQ

Many examples floating around the web are just a code snippet copied from the Apache site. Here is code that actually sends a stream message, and more: it also receives it with a MessageListener.

Please let me know if you have any suggestions, improvements or questions. Thanks.

package com.xyz;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import javax.jms.BytesMessage;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.command.ActiveMQBytesMessage;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.util.ByteSequence;
import org.apache.log4j.Logger;

import com.xyz.AMQConnectorException;
public class ReadFileAndSendStream {
 static ActiveMQDirectConnector connector = null;
 static ActiveMQConnection connection = null;
 static Session session = null;
 static Destination sendToQueue = null;
 static int ch;
 static Map<String, FileOutputStream> map = new HashMap<String, FileOutputStream>();
 private static Logger log = Logger.getLogger(ReadFileAndSendStream.class);
 public static void main(String args[]){

  try {
   connector = new ActiveMQDirectConnector("tcp://Server.company.com:<PORT>");
   connection =   connector.getConnectionDetails().getConnection();
  
   session = connector.getConnectionDetails().getSession();
   // Reading a File and sending the stream to ActiveMQ queue
   sendToQueue = session.createQueue("AAD.STREAM.TEST");
  
   /*********  Sending a huge huge huge message to ActiveMQ Queue **************/
   log.debug("Start time to Send  ::  " + System.currentTimeMillis());
   File file = new File("C:/access1.log");
   DataOutputStream out2 =  new DataOutputStream(connection.createOutputStream(sendToQueue));
      FileInputStream fin = null;
      try {
        fin = new FileInputStream(file);
        while ((ch = fin.read()) != -1)
         out2.write((char) ch);
         fin.close();
      } catch (Exception e) {
       e.printStackTrace();
      }
   out2.close();
   session.commit();
   Thread.sleep(10000);
   log.debug("End time to Send  ::  " + System.currentTimeMillis());
 
   /********* I should write a consumer ***********/
  
   log.debug("Start time to read  ::  " + System.currentTimeMillis());
  
 
   MessageConsumer consumer = session.createConsumer(sendToQueue);
   MyConsumer myConsumer = new MyConsumer();
   consumer.setMessageListener(myConsumer);
   connection.setExceptionListener(myConsumer);
  
   log.debug("End time to read  ::  " + System.currentTimeMillis());
  
   /*********  Reading a huge huge huge message from ActiveMQ Queue
  
   DataInputStream in = new DataInputStream(connection.createInputStream(sendToQueue));
   FileWriter fstream = new FileWriter("C:/out.txt");
   BufferedWriter out = new BufferedWriter(fstream);
    try {
         while ((ch = in.read()) != -1)
        out.write((char) ch);
          in.close();
       } catch (Exception e) {
        e.printStackTrace();
       }
     out.close();
     **************/
  } catch (AMQConnectorException e) {
   e.printStackTrace();
  }catch (JMSException e) {
   e.printStackTrace();
  } catch (IOException e) {
   e.printStackTrace();
  } catch (InterruptedException e) {
   e.printStackTrace();
  }
 
 }

 private static class MyConsumer implements MessageListener, ExceptionListener{

  @Override
   public void onMessage(Message message) {
    String messageId;
    FileOutputStream fos = null;

    if (message instanceof BytesMessage) {
    try {
     messageId = ((ActiveMQBytesMessage)message).getGroupID();
   
     if(!map.containsKey(messageId)) {
      log.debug("This is the message ID:" + messageId);
      File outFile = new File("C:/outListen.txt");
      fos = new FileOutputStream(outFile);
      map.put(messageId, fos);
     } else {
      fos = (FileOutputStream) map.get(messageId);
     }
     ByteSequence bs = ((ActiveMQBytesMessage)message).getContent();
     fos.write(bs.getData());
    
     log.debug("getGroupSequence" +((ActiveMQBytesMessage)message).getGroupSequence());
    }catch (IOException e) {
     e.printStackTrace();
    } finally {
   
    }
    }else
    {
     try {
      log.debug("This is the message type:" + message.getJMSType() + "\n\n" + message);
      fos = (FileOutputStream) map.remove(((ActiveMQMessage)message).getGroupID() );

      if (fos != null)
      {
       log.debug("Closing File");
       fos.close();
      }
      } catch (JMSException e) {
       e.printStackTrace();
      } catch (IOException e) {
       e.printStackTrace();
      } finally {
       try {
        session.commit();
       } catch (JMSException e) {
        e.printStackTrace();
       }
      }

    }
   }
   @Override
   public void onException(JMSException ex) {
    System.out.println(ex);
    ex.printStackTrace();
   
   }
 }
}