Tuesday, May 29, 2012

Making Virtual Topic Consumer “Selector Aware”

Making Virtual Topic Consumer “Selector Aware”
<virtualDestinations>
<virtualTopic name="MF.PRICE.CHANGE.TP01" prefix="VTCON.*." selectorAware="true"/>
</virtualDestinations>

Making a virtual topic selector-aware is as easy as setting the selectorAware flag to “true”. For this example we created two consumers: the first, “VTCON.GEN.MF.PRICE.CHANGE.TP01”, listens to all messages on the ‘MF.PRICE.CHANGE.TP01’ topic, and the second, “VTCON.MKT.MF.PRICE.CHANGE.TP01”, has a selector of “Market = '100'” on it.
Now, let’s send two messages, one with the “Market” string property set to 100 and another with it set to 200. As a result, we should get one message in “VTCON.MKT.MF.PRICE.CHANGE.TP01”, as it has a selector of “Market = '100'” on it, and two messages in “VTCON.GEN.MF.PRICE.CHANGE.TP01”.

The Java code looks like this:

package test.consumer;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnectionFactory;
public class VirtualTopicSelectorAwareConsumer {


static Connection connection;
static Session session;

public static void main(String[] args) {
ActiveMQConnectionFactory connectionFactory =
 new ActiveMQConnectionFactory("tcp://yourserver.com:port");

try {
 connection = connectionFactory.createConnection();
 session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
 // Consumer Listens to message with Market String property as 100
 String queueName = "VTCON.MKT.MF.PRICE.CHANGE.TP01";
 Destination mkt = session.createQueue(queueName);
 String selector100 = "Market = '100'";
 MessageConsumer consumer100 = session.createConsumer(mkt, selector100);
 Consumer100 listener = new VirtualTopicSelectorAwareConsumer().new Consumer100();
 consumer100.setMessageListener(listener);
 session.commit();
 // Consumer listens to all the message (Without a Selector)
 String genralConsumer = "VTCON.GEN.MF.PRICE.CHANGE.TP01";
 Destination genral = session.createQueue(genralConsumer);
 MessageConsumer genConsumer = session.createConsumer(genral);
 ConsumerNormal normal = new VirtualTopicSelectorAwareConsumer().new ConsumerNormal();
 genConsumer.setMessageListener(normal);
 session.commit();

 Topic topic = session.createTopic("MF.PRICE.CHANGE.TP01");
 MessageProducer producer = session.createProducer(topic);
 Message mktMessage = session.createTextMessage("Message for Market 100");
 mktMessage.setStringProperty("Market", "100");
 producer.send(mktMessage);

 Message mktMessage2 = session.createTextMessage("Message for Market 200");
 mktMessage.setStringProperty("Market", "200");
 producer.send(mktMessage2);
 producer.close();
 session.commit();

} catch (JMSException e) {
 e.printStackTrace();
}

}
/**
 * ConsumerNormal Listens All the messages and Log it
 */
private class ConsumerNormal implements MessageListener{

@Override
public void onMessage(Message arg0) {
 try {
  System.out.println(((TextMessage)arg0).getText());
  session.commit();
 } catch (JMSException e) {
  e.printStackTrace();
 }
}
}

/**
 * Consumer100 only listens to Market=100 messages
 */
private class Consumer100 implements MessageListener{
@Override
public void onMessage(Message arg0) {
try {
 System.out.println(((TextMessage)arg0).getText());
 session.commit();
} catch (JMSException e) {
 e.printStackTrace();
}
}
}
}

No comments:

Post a Comment