Friday, June 29, 2012

ActiveMQ Message Sequencing : Analysis Part I

Message Sequencing POC:
In most corporate production environments, messages are not sent and received on the same broker. A message may pass through multiple hops and clusters within the infrastructure before it reaches the business consumer. I ran a small analysis of message sequencing with ActiveMQ in its out-of-the-box configuration. The results are below.
Test Case 1:
Messages are sent to a queue or topic and received on the same broker (including from a virtual destination queue). A single producer and an exclusive consumer are used.
Result:
1.       Messages are always received in sequence by a normally running consumer.
2.       Messages keep their sequence even if the consumer thread restarts partway through.
3.       Message priority is IGNORED and the sequence is maintained.
4.       The results match the documentation provided here: http://activemq.apache.org/exclusive-consumer.html
Test Case 2:
Messages pass through multiple hops before being consumed by an exclusive consumer.
Result:
1.       Messages lose their sequence.
2.       The priority set on the messages resets to the default (‘4’) regardless of the value set by the producer.

Summary:
·         If the producer and consumer use the same broker and the consumer is exclusive, message sequencing is guaranteed.
·         If a message makes multiple hops before reaching the consumer, sequencing is NOT guaranteed, even with an exclusive consumer.
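
One way to check results like these is to have the producer stamp each message with an increasing sequence number and let the consumer record the numbers in arrival order. The sketch below is plain Java with no ActiveMQ dependency. The class name SequenceChecker and the idea of a producer-assigned number are assumptions made for illustration, not part of the test setup described above.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: records sequence numbers in arrival order and reports ordering breaks. */
final class SequenceChecker {
    private final List<Long> received = new ArrayList<>();

    /** Call once per consumed message with the number the producer assigned to it. */
    synchronized void record(long sequenceNumber) {
        received.add(sequenceNumber);
    }

    /**
     * Returns the arrival index of the first message whose number is not greater
     * than its predecessor's, or -1 if every message arrived in increasing order.
     */
    synchronized int firstOutOfOrderIndex() {
        for (int i = 1; i < received.size(); i++) {
            if (received.get(i) <= received.get(i - 1)) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        SequenceChecker inOrder = new SequenceChecker();
        for (long n : new long[] {1, 2, 3, 4}) {
            inOrder.record(n);
        }
        System.out.println(inOrder.firstOutOfOrderIndex()); // prints -1

        SequenceChecker reordered = new SequenceChecker();
        for (long n : new long[] {1, 3, 2, 4}) {
            reordered.record(n);
        }
        System.out.println(reordered.firstOutOfOrderIndex()); // prints 2
    }
}
```

In a real test, record() would be called from the consumer's message listener, so the checker sees the messages exactly as the consumer does.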

 In Part II, we will use the JMSXGroupID concept (message groups) for message sequencing and see whether it gives the right results.

Tuesday, June 19, 2012

Virtual Topic SelectorAware Consumers

Making a consumer selectorAware comes with a catch.
If the selectorAware property is ‘true’, the consumer queue will not, by default, pull messages from the Topic unless at least one active consumer application is listening on the consumer queue. This holds even if no selector is defined on the consumer queue.
Conversely, if selectorAware is false, all messages from the Topic are posted to the consumer queue regardless of whether a consuming application is available.
So whether to set this flag is a design decision, and it should be made deliberately.

Tuesday, May 29, 2012

Making Virtual Topic Consumer “Selector Aware”

<virtualDestinations>
<virtualTopic name="MF.PRICE.CHANGE.TP01" prefix="VTCON.*." selectorAware="true"/>
</virtualDestinations>

Making Virtual Topics selector aware is as easy as flipping the flag to “true”. For this example we created two consumers: the first, “VTCON.GEN.MF.PRICE.CHANGE.TP01”, listens to all messages for the ‘MF.PRICE.CHANGE.TP01’ topic, and the second, “VTCON.MKT.MF.PRICE.CHANGE.TP01”, has a selector of “Market = '100'” on it.
Now, let’s send two messages, one with the “Market” string property set to 100 and another with it set to 200. As a result we should get one message in “VTCON.MKT.MF.PRICE.CHANGE.TP01”, because of its “Market = '100'” selector, and two messages in “VTCON.GEN.MF.PRICE.CHANGE.TP01”.

The Java code looks like this:

package test.consumer;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnectionFactory;
public class VirtualTopicSelectorAwareConsumer {


static Connection connection;
static Session session;

public static void main(String[] args) {
ActiveMQConnectionFactory connectionFactory =
 new ActiveMQConnectionFactory("tcp://yourserver.com:port");

try {
 connection = connectionFactory.createConnection();
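 // Transacted session: the acknowledge mode argument is ignored, so SESSION_TRANSACTED states the intent
 session = connection.createSession(true, Session.SESSION_TRANSACTED);
 // Consumer listens only to messages whose Market string property is 100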
 String queueName = "VTCON.MKT.MF.PRICE.CHANGE.TP01";
 Destination mkt = session.createQueue(queueName);
 String selector100 = "Market = '100'";
 MessageConsumer consumer100 = session.createConsumer(mkt, selector100);
 Consumer100 listener = new VirtualTopicSelectorAwareConsumer().new Consumer100();
 consumer100.setMessageListener(listener);
 session.commit();
 // Consumer listens to all messages (without a selector)
 String generalConsumer = "VTCON.GEN.MF.PRICE.CHANGE.TP01";
 Destination general = session.createQueue(generalConsumer);
 MessageConsumer genConsumer = session.createConsumer(general);
 ConsumerNormal normal = new VirtualTopicSelectorAwareConsumer().new ConsumerNormal();
 genConsumer.setMessageListener(normal);
 session.commit();

 Topic topic = session.createTopic("MF.PRICE.CHANGE.TP01");
 MessageProducer producer = session.createProducer(topic);
 Message mktMessage = session.createTextMessage("Message for Market 100");
 mktMessage.setStringProperty("Market", "100");
 producer.send(mktMessage);

 Message mktMessage2 = session.createTextMessage("Message for Market 200");
 // Set the property on the second message, not on the first one again
 mktMessage2.setStringProperty("Market", "200");
 producer.send(mktMessage2);
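 producer.close();
 session.commit();
 // A JMS connection is created stopped; start it so the listeners begin receiving.
 // Starting only after the send keeps the main thread off the shared session once delivery begins.
 connection.start();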

} catch (JMSException e) {
 e.printStackTrace();
}

}
/**
 * ConsumerNormal listens to all messages and logs them
 */
private class ConsumerNormal implements MessageListener{

@Override
public void onMessage(Message arg0) {
 try {
  System.out.println(((TextMessage)arg0).getText());
  session.commit();
 } catch (JMSException e) {
  e.printStackTrace();
 }
}
}

/**
 * Consumer100 only listens to Market=100 messages
 */
private class Consumer100 implements MessageListener{
@Override
public void onMessage(Message arg0) {
try {
 System.out.println(((TextMessage)arg0).getText());
 session.commit();
} catch (JMSException e) {
 e.printStackTrace();
}
}
}
}

Friday, May 18, 2012

ActiveMQ Virtual Topics or Virtual Destinations

Creating Virtual Topics

Virtual Topics help in the following respects:
1.       Load balancing of messages
2.       Fast failover of subscribers
3.       Re-using the same connection factory for different producers and consumers. (Durable subscribers need a unique JMS client ID, which cannot be reused by any other producer or consumer.)
Steps to define a Virtual Topic:
1.       The topic has to be created on the broker with the name VirtualTopic.<TopicName>; “VirtualTopic” is the out-of-the-box prefix for this functionality.
2.       To change the prefix, the following configuration is needed.
<broker xmlns="http://activemq.apache.org/schema/core">
    <destinationInterceptors>
      <virtualDestinationInterceptor>
        <virtualDestinations>
          <virtualTopic name=">" prefix="VTCON.*." selectorAware="false"/>
        </virtualDestinations>
      </virtualDestinationInterceptor>
    </destinationInterceptors>
</broker>
With the above settings, three things are achieved.
A.      All topics are made Virtual Topics with name=">". The name field also accepts wildcards, so different Virtual Topic policies can be applied to different topics. To apply this to a single topic, just replace the “>” with the actual topic name.
<virtualDestinations>
<virtualTopic name="TEST.TP01" prefix="VTCON.*." selectorAware="false"/>
</virtualDestinations>

B.      The prefix for consumers will be “VTCON”. Virtual Topic consumer queues will now be created with names like “VTCON.BASETBL.TEST.TP01”.
C.      selectorAware is set to false, so Virtual Topic consumers are NOT selector aware. Every message published to the topic is forwarded to the consumer queues with the “VTCON” prefix. If selectorAware is true, consumers can add a selector and only the matching messages are passed to them.
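
The naming rule in point B can be sketched as a small plain-Java helper. It fills the “*” in the prefix with the consumer name and appends the topic name. The class and method names are made up for illustration and are not part of the ActiveMQ API; the helper only mirrors the naming convention shown above.

```java
/** Hypothetical helper that builds Virtual Topic consumer queue names from a prefix pattern. */
final class VirtualTopicNames {

    /**
     * Expands a prefix pattern such as "VTCON.*." by replacing the first '*'
     * with the consumer name, then appends the topic name.
     */
    static String consumerQueue(String prefixPattern, String consumerName, String topicName) {
        int star = prefixPattern.indexOf('*');
        if (star < 0) {
            // Assumption of this sketch: the pattern always carries a '*' placeholder
            throw new IllegalArgumentException("prefix pattern must contain '*'");
        }
        return prefixPattern.substring(0, star)
                + consumerName
                + prefixPattern.substring(star + 1)
                + topicName;
    }

    public static void main(String[] args) {
        // prints VTCON.BASETBL.TEST.TP01
        System.out.println(consumerQueue("VTCON.*.", "BASETBL", "TEST.TP01"));
        // prints VTCON.MKT.MF.PRICE.CHANGE.TP01
        System.out.println(consumerQueue("VTCON.*.", "MKT", "MF.PRICE.CHANGE.TP01"));
    }
}
```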



Tuesday, May 15, 2012

CSV to XML transformation using Camel

Lots of projects I know need this transformation.
CSV to XML: you might be reading a user-uploaded CSV file, or a pipe-delimited flat file, and producing XML from it.
Below is code that does this using Camel and XStream. I am still exploring how to use the header names as XML node names, but have not succeeded yet. I will update this post as soon as I have a solution. Until then, this is working code you can use.

 

package com.XYZ;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
import com.thoughtworks.xstream.XStream;
import com.thoughtworks.xstream.io.xml.DomDriver;

public class FileCSVtoXMLConversion {
private static XStream xStream;
public static void main(String args[]) throws Exception {
// create CamelContext
CamelContext context = new DefaultCamelContext();
xStream = new XStream(new DomDriver());
// add our route to the CamelContext
context.addRoutes(new RouteBuilder() {
public void configure() {
from("file://Some/TEST?fileName=ArticleTest.csv").
unmarshal().
csv().
process(new Processor() {
  @Override
  public void process(Exchange exchange) throws Exception {
    // The CSV data format yields a List of rows; XStream serializes that list to XML
    String xmlConverted = xStream.toXML(exchange.getIn().getBody());
    exchange.getIn().setBody(xmlConverted);
  }
 }
).to("file://Some/TESTOUT?fileName=ArticleTest.XML");
}
});
// start the route and let it do its work
context.start();
Thread.sleep(10000);
// stop the CamelContext
context.stop();
}
}
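
For comparison, here is a minimal plain-Java sketch of the same kind of transformation that uses the header row as element names. It does not use Camel or XStream and is not a fix for the route above. It assumes the first line holds headers that are valid XML element names, and it splits naively, so quoted fields containing the delimiter are not handled. The class name and the "rows" and "row" element names are invented for this example.

```java
import java.util.regex.Pattern;

/** Hypothetical stand-alone converter: delimited text in, XML string out. */
final class DelimitedToXml {

    /** Converts delimited text to XML, using the first line as the element names. */
    static String toXml(String text, String delimiter) {
        String[] lines = text.split("\\R");
        // Quote the delimiter so characters like '|' are not treated as regex syntax
        String splitter = Pattern.quote(delimiter);
        String[] headers = lines[0].split(splitter, -1);
        StringBuilder xml = new StringBuilder("<rows>");
        for (int i = 1; i < lines.length; i++) {
            if (lines[i].isEmpty()) {
                continue; // skip blank lines
            }
            String[] cells = lines[i].split(splitter, -1);
            xml.append("<row>");
            for (int c = 0; c < headers.length && c < cells.length; c++) {
                String tag = headers[c].trim();
                xml.append('<').append(tag).append('>')
                   .append(escape(cells[c].trim()))
                   .append("</").append(tag).append('>');
            }
            xml.append("</row>");
        }
        return xml.append("</rows>").toString();
    }

    /** Escapes the characters that are unsafe inside XML element text. */
    static String escape(String value) {
        return value.replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;");
    }

    public static void main(String[] args) {
        // prints <rows><row><id>1</id><name>A&amp;B</name></row></rows>
        System.out.println(toXml("id|name\n1|A&B", "|"));
    }
}
```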

Monday, May 14, 2012

Sending a Message to Queue (JMS Way)

There are a thousand confusing ways to send a message to a queue, and every one of them looks correct and ready for production implementation.
Below is a simple method that is used most of the time and is also JMS API compliant. The benefit of sticking to the JMS API is that your code remains unchanged even if the provider changes. You do, however, have to take care of your exception handling and your queue access.

The steps are explained below:
1. Get the ConnectionFactory from JNDI
2. Create a Connection from the ConnectionFactory
3. Get the Destination from JNDI, or create it
4. Create a Session from the Connection
5. Create a MessageProducer from the Session using the Destination
6. Create a Message from the Session
7. Send the message using the MessageProducer
8. Close the producer, session, and connection
9. Check the queue depth (not in Java; use a queue browsing tool)



package com.xyz;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.naming.InitialContext;
import javax.naming.NamingException;
public class SampleMessageSender
{
private String jndiQCFName = null;
private ConnectionFactory factory = null;
public SampleMessageSender(String jndiQCFName){
this.jndiQCFName = jndiQCFName;
}
public void sendMessage(String jndiDestination, String messagePayload)
throws NamingException
{
InitialContext context = null;
MessageProducer producer =  null;
Connection connection = null;
Session session = null;
try
{
context = new InitialContext();
factory = (ConnectionFactory) context.lookup("java:comp/env/"+jndiQCFName);
Destination destination = (Destination) context.lookup("java:comp/env/"+jndiDestination);
connection = factory.createConnection();
session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
// Create a Producer for this Destination
producer = session.createProducer(destination);
// Create a Message from Payload
Message message = session.createTextMessage(messagePayload);
// Send the message
producer.send(message);
// The session is transacted, so the send only takes effect on commit
session.commit();
}
catch(JMSException exe)
{
exe.printStackTrace();
// Do logging
if(exe.getLinkedException() != null){
// Linked exception may have provider specific details. Log it.
exe.getLinkedException().printStackTrace();
}
}
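finally{
// Close only what was actually created, so a failed lookup does not cause a NullPointerException here
try {
if (producer != null) producer.close();
if (session != null) session.close();
if (connection != null) connection.close();
} catch (JMSException e) {
// Nothing can be done here
e.printStackTrace();
}
// Close the JNDI context last so a NamingException cannot skip the JMS cleanup
if (context != null) context.close();
}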
}
}

Tuesday, January 31, 2012

MQ Queue Manager: if you can access the MQQueueManager class directly in your working
environment, the following is the way to create the object directly.


MQEnvironment.hostname = "yourhost.yourcompany.com";
MQEnvironment.port = CLIENT_PORT; // Put your int port here
MQEnvironment.channel = CLIENT_CHANNEL; // Put your server connection channel here

/* Put your queueManagerName here */
MQQueueManager queueManager = new MQQueueManager(queueManagerName);

That's it. The hostname, port, channel and queueManagerName are enough to connect you to your queue manager.

And yes, don't forget to import these:

import com.ibm.mq.MQEnvironment;
import com.ibm.mq.MQQueueManager;