Many examples floating around the Web are just copies of the code snippet from the Apache site. Here is code that actually sends a stream message to ActiveMQ and, in addition, receives it with a MessageListener.
Please let me know if you have any suggestions, improvements, or questions. Thanks.
package com.xyz;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import javax.jms.BytesMessage;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.command.ActiveMQBytesMessage;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.util.ByteSequence;
import org.apache.log4j.Logger;
import com.xyz.AMQConnectorException;
/**
 * Demo that streams a local file into an ActiveMQ queue through
 * {@code ActiveMQConnection.createOutputStream} and then reads the chunks
 * back with a {@link MessageListener}, writing them to a second local file.
 *
 * <p>All state is kept in static fields; the class is meant to be run once
 * through {@link #main(String[])}.
 */
public class ReadFileAndSendStream {
    /** Size of the in-memory buffer used when copying the file into the queue stream. */
    private static final int COPY_BUFFER_SIZE = 8192;

    static ActiveMQDirectConnector connector = null;
    static ActiveMQConnection connection = null;
    static Session session = null;
    static Destination sendToQueue = null;
    /** Retained for backward compatibility; the copy loop no longer uses it. */
    static int ch;
    /** Open output files keyed by the group ID of the stream being received. */
    static Map<String, FileOutputStream> map = new HashMap<String, FileOutputStream>();
    private static Logger log = Logger.getLogger(ReadFileAndSendStream.class);

    /**
     * Sends {@code C:/access1.log} to the {@code AAD.STREAM.TEST} queue as a
     * stream, commits the session, waits ten seconds, and then registers a
     * {@link MyConsumer} on the same queue to receive the stream.
     *
     * @param args ignored
     */
    public static void main(String args[]) {
        try {
            connector = new ActiveMQDirectConnector("tcp://Server.company.com:<PORT>");
            connection = connector.getConnectionDetails().getConnection();
            session = connector.getConnectionDetails().getSession();

            // Reading a File and sending the stream to ActiveMQ queue
            sendToQueue = session.createQueue("AAD.STREAM.TEST");

            /********* Sending a huge huge huge message to ActiveMQ Queue **************/
            log.debug("Start time to Send :: " + System.currentTimeMillis());
            File file = new File("C:/access1.log");
            DataOutputStream out2 = new DataOutputStream(connection.createOutputStream(sendToQueue));
            try {
                copyFileToStream(file, out2);
            } catch (IOException e) {
                // Best effort, as before: report the failure, then still close and commit below.
                log.error("Failed to stream " + file + " to the queue", e);
            } finally {
                // Always close the queue stream, even when the copy failed part-way.
                out2.close();
            }
            session.commit();
            Thread.sleep(10000);
            log.debug("End time to Send :: " + System.currentTimeMillis());

            /********* Consumer: receive the stream asynchronously ***********/
            log.debug("Start time to read :: " + System.currentTimeMillis());
            MessageConsumer consumer = session.createConsumer(sendToQueue);
            MyConsumer myConsumer = new MyConsumer();
            consumer.setMessageListener(myConsumer);
            connection.setExceptionListener(myConsumer);
            log.debug("End time to read :: " + System.currentTimeMillis());
            // Alternative (not used here): read the stream back synchronously through
            // connection.createInputStream(sendToQueue) instead of a MessageListener.
        } catch (AMQConnectorException e) {
            log.error("Could not connect to the broker", e);
        } catch (JMSException e) {
            log.error("JMS failure while sending or subscribing", e);
        } catch (IOException e) {
            log.error("I/O failure while closing the queue stream", e);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can observe it.
            Thread.currentThread().interrupt();
            log.error("Interrupted while waiting after send", e);
        }
    }

    /**
     * Copies the whole content of {@code source} into {@code target} using a
     * fixed-size buffer. The file is always closed; {@code target} is left open.
     *
     * @param source file to read
     * @param target stream that receives the file's bytes
     * @throws IOException if the file cannot be opened or read, or the target cannot be written
     */
    private static void copyFileToStream(File source, DataOutputStream target) throws IOException {
        FileInputStream fin = new FileInputStream(source);
        try {
            byte[] buffer = new byte[COPY_BUFFER_SIZE];
            int read;
            while ((read = fin.read(buffer)) != -1) {
                target.write(buffer, 0, read);
            }
        } finally {
            fin.close();
        }
    }

    /**
     * Receives the messages of a stream: every {@link BytesMessage} is appended
     * to {@code C:/outListen.txt}; any other message closes the file registered
     * for its group ID and commits the session.
     */
    private static class MyConsumer implements MessageListener, ExceptionListener {

        @Override
        public void onMessage(Message message) {
            if (message instanceof BytesMessage) {
                appendChunk((ActiveMQBytesMessage) message);
            } else {
                finishStream(message);
            }
        }

        /**
         * Appends the payload of one chunk to the output file of its group,
         * opening the file the first time a group ID is seen.
         */
        private void appendChunk(ActiveMQBytesMessage chunk) {
            String messageId = chunk.getGroupID();
            try {
                FileOutputStream fos = map.get(messageId);
                if (fos == null) {
                    log.debug("This is the message ID:" + messageId);
                    File outFile = new File("C:/outListen.txt");
                    fos = new FileOutputStream(outFile);
                    map.put(messageId, fos);
                }
                ByteSequence bs = chunk.getContent();
                if (bs != null) {
                    // Write only the payload window: the backing array may be larger than
                    // the content, and the content may not start at index 0.
                    fos.write(bs.getData(), bs.getOffset(), bs.getLength());
                }
                log.debug("getGroupSequence" + chunk.getGroupSequence());
            } catch (IOException e) {
                log.error("Failed to write chunk for group " + messageId, e);
            }
        }

        /**
         * Handles a non-bytes message: closes the output file registered for the
         * message's group ID (if any) and commits the session in every case.
         */
        private void finishStream(Message message) {
            try {
                log.debug("This is the message type:" + message.getJMSType() + "\n\n" + message);
                FileOutputStream fos = map.remove(((ActiveMQMessage) message).getGroupID());
                if (fos != null) {
                    log.debug("Closing File");
                    fos.close();
                }
            } catch (JMSException e) {
                log.error("Failed to inspect end-of-stream message", e);
            } catch (IOException e) {
                log.error("Failed to close output file", e);
            } finally {
                try {
                    session.commit();
                } catch (JMSException e) {
                    log.error("Failed to commit session", e);
                }
            }
        }

        @Override
        public void onException(JMSException ex) {
            log.error("Connection reported a JMS exception", ex);
        }
    }
}
Please let me know if you have any suggestions, improvements, or questions. Thanks.
package com.xyz;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import javax.jms.BytesMessage;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.command.ActiveMQBytesMessage;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.util.ByteSequence;
import org.apache.log4j.Logger;
import com.xyz.AMQConnectorException;
/**
 * Demo that streams a local file into an ActiveMQ queue through
 * {@code ActiveMQConnection.createOutputStream} and then reads the chunks
 * back with a {@link MessageListener}, writing them to a second local file.
 *
 * <p>All state is kept in static fields; the class is meant to be run once
 * through {@link #main(String[])}.
 */
public class ReadFileAndSendStream {
    /** Size of the in-memory buffer used when copying the file into the queue stream. */
    private static final int COPY_BUFFER_SIZE = 8192;

    static ActiveMQDirectConnector connector = null;
    static ActiveMQConnection connection = null;
    static Session session = null;
    static Destination sendToQueue = null;
    /** Retained for backward compatibility; the copy loop no longer uses it. */
    static int ch;
    /** Open output files keyed by the group ID of the stream being received. */
    static Map<String, FileOutputStream> map = new HashMap<String, FileOutputStream>();
    private static Logger log = Logger.getLogger(ReadFileAndSendStream.class);

    /**
     * Sends {@code C:/access1.log} to the {@code AAD.STREAM.TEST} queue as a
     * stream, commits the session, waits ten seconds, and then registers a
     * {@link MyConsumer} on the same queue to receive the stream.
     *
     * @param args ignored
     */
    public static void main(String args[]) {
        try {
            connector = new ActiveMQDirectConnector("tcp://Server.company.com:<PORT>");
            connection = connector.getConnectionDetails().getConnection();
            session = connector.getConnectionDetails().getSession();

            // Reading a File and sending the stream to ActiveMQ queue
            sendToQueue = session.createQueue("AAD.STREAM.TEST");

            /********* Sending a huge huge huge message to ActiveMQ Queue **************/
            log.debug("Start time to Send :: " + System.currentTimeMillis());
            File file = new File("C:/access1.log");
            DataOutputStream out2 = new DataOutputStream(connection.createOutputStream(sendToQueue));
            try {
                copyFileToStream(file, out2);
            } catch (IOException e) {
                // Best effort, as before: report the failure, then still close and commit below.
                log.error("Failed to stream " + file + " to the queue", e);
            } finally {
                // Always close the queue stream, even when the copy failed part-way.
                out2.close();
            }
            session.commit();
            Thread.sleep(10000);
            log.debug("End time to Send :: " + System.currentTimeMillis());

            /********* Consumer: receive the stream asynchronously ***********/
            log.debug("Start time to read :: " + System.currentTimeMillis());
            MessageConsumer consumer = session.createConsumer(sendToQueue);
            MyConsumer myConsumer = new MyConsumer();
            consumer.setMessageListener(myConsumer);
            connection.setExceptionListener(myConsumer);
            log.debug("End time to read :: " + System.currentTimeMillis());
            // Alternative (not used here): read the stream back synchronously through
            // connection.createInputStream(sendToQueue) instead of a MessageListener.
        } catch (AMQConnectorException e) {
            log.error("Could not connect to the broker", e);
        } catch (JMSException e) {
            log.error("JMS failure while sending or subscribing", e);
        } catch (IOException e) {
            log.error("I/O failure while closing the queue stream", e);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can observe it.
            Thread.currentThread().interrupt();
            log.error("Interrupted while waiting after send", e);
        }
    }

    /**
     * Copies the whole content of {@code source} into {@code target} using a
     * fixed-size buffer. The file is always closed; {@code target} is left open.
     *
     * @param source file to read
     * @param target stream that receives the file's bytes
     * @throws IOException if the file cannot be opened or read, or the target cannot be written
     */
    private static void copyFileToStream(File source, DataOutputStream target) throws IOException {
        FileInputStream fin = new FileInputStream(source);
        try {
            byte[] buffer = new byte[COPY_BUFFER_SIZE];
            int read;
            while ((read = fin.read(buffer)) != -1) {
                target.write(buffer, 0, read);
            }
        } finally {
            fin.close();
        }
    }

    /**
     * Receives the messages of a stream: every {@link BytesMessage} is appended
     * to {@code C:/outListen.txt}; any other message closes the file registered
     * for its group ID and commits the session.
     */
    private static class MyConsumer implements MessageListener, ExceptionListener {

        @Override
        public void onMessage(Message message) {
            if (message instanceof BytesMessage) {
                appendChunk((ActiveMQBytesMessage) message);
            } else {
                finishStream(message);
            }
        }

        /**
         * Appends the payload of one chunk to the output file of its group,
         * opening the file the first time a group ID is seen.
         */
        private void appendChunk(ActiveMQBytesMessage chunk) {
            String messageId = chunk.getGroupID();
            try {
                FileOutputStream fos = map.get(messageId);
                if (fos == null) {
                    log.debug("This is the message ID:" + messageId);
                    File outFile = new File("C:/outListen.txt");
                    fos = new FileOutputStream(outFile);
                    map.put(messageId, fos);
                }
                ByteSequence bs = chunk.getContent();
                if (bs != null) {
                    // Write only the payload window: the backing array may be larger than
                    // the content, and the content may not start at index 0.
                    fos.write(bs.getData(), bs.getOffset(), bs.getLength());
                }
                log.debug("getGroupSequence" + chunk.getGroupSequence());
            } catch (IOException e) {
                log.error("Failed to write chunk for group " + messageId, e);
            }
        }

        /**
         * Handles a non-bytes message: closes the output file registered for the
         * message's group ID (if any) and commits the session in every case.
         */
        private void finishStream(Message message) {
            try {
                log.debug("This is the message type:" + message.getJMSType() + "\n\n" + message);
                FileOutputStream fos = map.remove(((ActiveMQMessage) message).getGroupID());
                if (fos != null) {
                    log.debug("Closing File");
                    fos.close();
                }
            } catch (JMSException e) {
                log.error("Failed to inspect end-of-stream message", e);
            } catch (IOException e) {
                log.error("Failed to close output file", e);
            } finally {
                try {
                    session.commit();
                } catch (JMSException e) {
                    log.error("Failed to commit session", e);
                }
            }
        }

        @Override
        public void onException(JMSException ex) {
            log.error("Connection reported a JMS exception", ex);
        }
    }
}
No comments:
Post a Comment