Tuesday, May 29, 2012

Making Virtual Topic Consumer “Selector Aware”

Making Virtual Topic Consumer “Selector Aware”
<virtualDestinations>
<virtualTopic name="MF.PRICE.CHANGE.TP01" prefix="VTCON.*." selectorAware="true"/>
</virtualDestinations>

Making Virtual topics selector aware is as easy as flipping the flag to “true”.  For this example we created two consumers, one - “VTCON.GEN.MF.PRICE.CHANGE.TP01” listens to all the messages for ‘MF.PRICE.CHANGE.TP01’ topic and second – “VTCON.MKT.MF.PRICE.CHANGE.TP01" has a Selector of “Market = 100” on it.
Now, let’s send two messages, one with “market” string property with value 100 and another with value 200. In a result we should get one message in VTCON.MKT.MF.PRICE.CHANGE.TP01 as it has a Selector of “Market = 100” on it and two messages in “VTCON.GEN.MF.PRICE.CHANGE.TP01”

Java Code will look like below:

package test.consumer;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnectionFactory;
public class VirtualTopicSelectorAwareConsumer {


static Connection connection;
static Session session;

public static void main(String[] args) {
ActiveMQConnectionFactory connectionFactory =
 new ActiveMQConnectionFactory("tcp://yourserver.com:port");

try {
 connection = connectionFactory.createConnection();
 session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
 // Consumer Listens to message with Market String property as 100
 String queueName = "VTCON.MKT.MF.PRICE.CHANGE.TP01";
 Destination mkt = session.createQueue(queueName);
 String selector100 = "Market = '100'";
 MessageConsumer consumer100 = session.createConsumer(mkt, selector100);
 Consumer100 listener = new VirtualTopicSelectorAwareConsumer().new Consumer100();
 consumer100.setMessageListener(listener);
 session.commit();
 // Consumer listens to all the message (Without a Selector)
 String genralConsumer = "VTCON.GEN.MF.PRICE.CHANGE.TP01";
 Destination genral = session.createQueue(genralConsumer);
 MessageConsumer genConsumer = session.createConsumer(genral);
 ConsumerNormal normal = new VirtualTopicSelectorAwareConsumer().new ConsumerNormal();
 genConsumer.setMessageListener(normal);
 session.commit();

 Topic topic = session.createTopic("MF.PRICE.CHANGE.TP01");
 MessageProducer producer = session.createProducer(topic);
 Message mktMessage = session.createTextMessage("Message for Market 100");
 mktMessage.setStringProperty("Market", "100");
 producer.send(mktMessage);

 Message mktMessage2 = session.createTextMessage("Message for Market 200");
 mktMessage.setStringProperty("Market", "200");
 producer.send(mktMessage2);
 producer.close();
 session.commit();

} catch (JMSException e) {
 e.printStackTrace();
}

}
/**
 * ConsumerNormal Listens All the messages and Log it
 */
private class ConsumerNormal implements MessageListener{

@Override
public void onMessage(Message arg0) {
 try {
  System.out.println(((TextMessage)arg0).getText());
  session.commit();
 } catch (JMSException e) {
  e.printStackTrace();
 }
}
}

/**
 * Consumer100 only listens to Market=100 messages
 */
private class Consumer100 implements MessageListener{
@Override
public void onMessage(Message arg0) {
try {
 System.out.println(((TextMessage)arg0).getText());
 session.commit();
} catch (JMSException e) {
 e.printStackTrace();
}
}
}
}

Friday, May 18, 2012

ActiveMQ Virtual Topics or Virtual Destinations

Creating Virtual Topics

Virtual Topics helps with following prospective:
1.       Load Balancing of messages
2.       Fast Failover of Subscriber
3.       Re-using same connection Factory for different Producers and Consumers. (Durable Subscribers needs a Unique JMS Client Id and same cannot be reused for any other Producer or consumer)
Steps to Define a Virtual Topic:
1.       Topic has to be created on Broker with Name: VirtualTopic.<TopicName>, However “VirtualTopic” is Out-of-the-box prefix for using this functionality.
2.       To make the change in prefix, following changes are needed.
<broker xmlns="http://activemq.apache.org/schema/core">
    <destinationInterceptors>
      <virtualDestinationInterceptor>
        <virtualDestinations>
          <virtualTopic name=">" prefix="VTCON.*." selectorAware="false"/>
        </virtualDestinations>
      </virtualDestinationInterceptor>
    </destinationInterceptors>
</broker>
In above settings two things are achieved.
A.      All Topics are made as Virtual Topic with name=">". However name field also accepts wildcard to apply different Virtual Topics policies. To change this Virtual Topic for Single Topic, just replace the “>” with actual Topic name.
<virtualDestinations>
<virtualTopic name="TEST.TP01" prefix="VTCON.*." selectorAware="false"/>
</virtualDestinations>

B.      Prefix for Consumer will be “VTCON”.  Now Virtual Topic consumers will be created with name like “VTCON.BASETBL.TEST.TP01”
C.      Settings are also made to NOT allow VirtualTopic Consumers as selectorAware. This will forward every message published to topic to consumers with “VTCON” prefix. If selectorAware is true, consumers will be able to add selector and only selected messages will be passed to consumers.



Tuesday, May 15, 2012

CSV to XML transformation using Camel

Lots of Projects I know needs this transformation.
CSV to XML, where you might be reading user uploaded CSV file or it can be Pipe delimited flat file and make a XML out of it.
Below is the code doing this using camel and Xstream. I am exploring more to get XML nodes to populate as headers, but not yet successful. I will update that as soon as I get the solution. till then.. this is the good code u can use.

 

package com.XYZ;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
import com.thoughtworks.xstream.XStream;
import com.thoughtworks.xstream.io.xml.DomDriver;

public class FileCSVtoXMLConversion {
private static XStream xStream;
public static void main(String args[]) throws Exception {
// create CamelContext
CamelContext context = new DefaultCamelContext();
xStream = new XStream(new DomDriver());
// add our route to the CamelContext
context.addRoutes(new RouteBuilder() {
public void configure() {
from("
file://Some/TEST?fileName=ArticleTest.csv").
unmarshal().
csv().
process(new Processor() {
  @Override
  public void process(Exchange exchange) throws Exception {
    String xmlConverted = xStream.toXML(exchange.getIn().getBody());
    exchange.getIn().setBody(xmlConverted);
  }
 }
).to("
file://Some/TESTOUT?fileName=ArticleTest.XML");
}
});
// start the route and let it do its work
context.start();
Thread.sleep(10000);
// stop the CamelContext
context.stop();
}
}

Monday, May 14, 2012

Sending a Message to Queue (JMS Way)

There are thousand confusing ways to send a message to queue and every method looks correct and perfect for production implemetation.
Below is simple method which is used most of the time and also JMS API Complient. Benefit of using JMS Complient provider is: your code will remain unchanged even if provider changes.  But yes, you have to make sure about you exception handling mechanism and your queue access.

Steps are explained below:
1. Get the ConnectionFactory from JNDI
2. Create Connection from Connection Factory
3. Get the destination from JNDI or Create Destination
3. Create a Session from Connection
4. Create a Message Producer from Session using Destination
5. Create a Message from Session
6. Send the message using message producer.
7. Close the mess.
8. Check the queue depth - Not in Java (using queue browsing tools)



package com.xyz;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.naming.InitialContext;
import javax.naming.NamingException;
public class SampleMessageSender
{
private String jndiQCFName = null;
private ConnectionFactory factory = null;
public SampleMessageSender(String jndiQCFName){
this.jndiQCFName = jndiQCFName;
}
public void sendMessage(String jndiDestination, String messagePayload)
throws NamingException
{
InitialContext context = null;
MessageProducer producer =  null;
Connection connection = null;
Session session = null;
try
{
context = new InitialContext();
factory = (ConnectionFactory) context.lookup("java:comp/env/"+jndiQCFName);
Destination destination = (Destination) context.lookup("java:comp/env/"+jndiDestination);
connection = factory.createConnection();
session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
// Create a Producer for this Destination
producer = session.createProducer(destination);
// Create a Message from Payload
Message message = session.createTextMessage(messagePayload);
// Send the message
producer.send(message);
}
catch(JMSException exe)
{
exe.printStackTrace();
// Do logging
if(exe.getLinkedException() != null){
// Linked exception may have provider specific details. Log it.
exe.getLinkedException().printStackTrace();
}
}
finally{
context.close();
try {
producer.close();
session.close();
connection.close();
} catch (JMSException e) {
// Nothing can be done here
e.printStackTrace();
}
}
}
}