
Tuesday, June 19, 2012

Virtual Topic SelectorAware Consumers

Making a consumer selectorAware comes with a caveat.
If the selectorAware property is ‘true’, the consumer queue will not receive messages from the Topic unless at least one active consumer application is listening on that queue. This holds even when no selector is defined on the consumer queue.
By contrast, if selectorAware is false, every message from the Topic is posted to the consumer queue regardless of whether a consuming application is available.
So setting this flag is a design decision: choose it with this trade-off in mind.
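
The rule above can be sketched without a broker. The following is a toy model only, not ActiveMQ code: the class names `ConsumerQueueModel` and `SelectorAwareSketch` and the `hasActiveConsumer` field are made up for illustration, and the forwarding rule is simply the behaviour described in this post.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for a consumer queue; not an ActiveMQ class.
final class ConsumerQueueModel {
    final String name;
    final boolean hasActiveConsumer; // is an application currently listening?
    final List<String> messages = new ArrayList<>();

    ConsumerQueueModel(String name, boolean hasActiveConsumer) {
        this.name = name;
        this.hasActiveConsumer = hasActiveConsumer;
    }
}

final class SelectorAwareSketch {

    // Copies one published message to each consumer queue under the rule from the post:
    // selectorAware=false always copies, selectorAware=true copies only if someone is listening.
    static void publish(String body, boolean selectorAware, List<ConsumerQueueModel> queues) {
        for (ConsumerQueueModel queue : queues) {
            if (!selectorAware || queue.hasActiveConsumer) {
                queue.messages.add(body);
            }
        }
    }

    public static void main(String[] args) {
        ConsumerQueueModel idleAware = new ConsumerQueueModel("VTCON.GEN.MF.PRICE.CHANGE.TP01", false);
        publish("price update", true, Arrays.asList(idleAware));
        System.out.println("selectorAware=true, no listener:  " + idleAware.messages.size());

        ConsumerQueueModel idlePlain = new ConsumerQueueModel("VTCON.GEN.MF.PRICE.CHANGE.TP01", false);
        publish("price update", false, Arrays.asList(idlePlain));
        System.out.println("selectorAware=false, no listener: " + idlePlain.messages.size());
    }
}
```

In this model a queue with no listener ends up empty when the flag is true and holds the message when the flag is false, which is the practical difference to weigh: with selectorAware on, messages published while the application is down are not waiting for it when it comes back.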

Tuesday, May 29, 2012

Making Virtual Topic Consumer “Selector Aware”

<virtualDestinations>
<virtualTopic name="MF.PRICE.CHANGE.TP01" prefix="VTCON.*." selectorAware="true"/>
</virtualDestinations>

Making a Virtual Topic selector aware is as easy as setting the selectorAware flag to “true”. For this example we created two consumers: the first, “VTCON.GEN.MF.PRICE.CHANGE.TP01”, listens to all messages for the ‘MF.PRICE.CHANGE.TP01’ topic, and the second, “VTCON.MKT.MF.PRICE.CHANGE.TP01”, has a selector of “Market = '100'” on it.
Now, let’s send two messages, one with the “Market” string property set to 100 and another with it set to 200. As a result we should get one message in “VTCON.MKT.MF.PRICE.CHANGE.TP01”, because its selector only matches “Market = '100'”, and two messages in “VTCON.GEN.MF.PRICE.CHANGE.TP01”.

The Java code looks like this:

package test.consumer;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnectionFactory;
public class VirtualTopicSelectorAwareConsumer {

    static Connection connection;
    // Consumer session; the listeners below commit on it after each message.
    static Session session;

    public static void main(String[] args) {
        // Placeholder URL: substitute your broker's host and port.
        ActiveMQConnectionFactory connectionFactory =
                new ActiveMQConnectionFactory("tcp://yourserver.com:port");

        try {
            connection = connectionFactory.createConnection();
            // Transacted session; the acknowledge mode argument is ignored when transacted is true.
            session = connection.createSession(true, Session.SESSION_TRANSACTED);

            // Consumer that receives only messages whose "Market" string property is '100'
            String queueName = "VTCON.MKT.MF.PRICE.CHANGE.TP01";
            Destination mkt = session.createQueue(queueName);
            String selector100 = "Market = '100'";
            MessageConsumer consumer100 = session.createConsumer(mkt, selector100);
            Consumer100 listener = new VirtualTopicSelectorAwareConsumer().new Consumer100();
            consumer100.setMessageListener(listener);

            // Consumer that receives every message (no selector)
            String generalQueueName = "VTCON.GEN.MF.PRICE.CHANGE.TP01";
            Destination general = session.createQueue(generalQueueName);
            MessageConsumer genConsumer = session.createConsumer(general);
            ConsumerNormal normal = new VirtualTopicSelectorAwareConsumer().new ConsumerNormal();
            genConsumer.setMessageListener(normal);

            // Delivery does not begin until the connection is started.
            connection.start();

            // Publish on a separate session: a session with registered listeners
            // should not also be used from the main thread.
            Session producerSession = connection.createSession(true, Session.SESSION_TRANSACTED);
            Topic topic = producerSession.createTopic("MF.PRICE.CHANGE.TP01");
            MessageProducer producer = producerSession.createProducer(topic);

            Message mktMessage = producerSession.createTextMessage("Message for Market 100");
            mktMessage.setStringProperty("Market", "100");
            producer.send(mktMessage);

            Message mktMessage2 = producerSession.createTextMessage("Message for Market 200");
            // Set the property on the second message (the original set it on the first one again).
            mktMessage2.setStringProperty("Market", "200");
            producer.send(mktMessage2);

            producer.close();
            // Nothing is published until the transacted sends are committed.
            producerSession.commit();

        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    /**
     * ConsumerNormal listens to all messages and logs them.
     */
    private class ConsumerNormal implements MessageListener {

        @Override
        public void onMessage(Message message) {
            try {
                System.out.println(((TextMessage) message).getText());
                // Commit the transacted session to acknowledge the message.
                session.commit();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Consumer100 listens only to Market = '100' messages.
     */
    private class Consumer100 implements MessageListener {

        @Override
        public void onMessage(Message message) {
            try {
                System.out.println(((TextMessage) message).getText());
                // Commit the transacted session to acknowledge the message.
                session.commit();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
}
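
The expected outcome of the run (one message on the MKT queue, two on the GEN queue) can be sanity-checked without a broker. The sketch below is only a toy model of selector matching, assuming both consumers are active when the messages are sent; `SelectorRoutingSketch`, `route` and `demo` are hypothetical names, and a plain Java `Predicate` stands in for the JMS selector “Market = '100'”.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Toy model, not ActiveMQ code: counts how many messages each consumer queue would accept.
final class SelectorRoutingSketch {

    // For each queue name, count the messages whose properties satisfy that queue's selector.
    static Map<String, Integer> route(List<Map<String, String>> messages,
                                      Map<String, Predicate<Map<String, String>>> selectors) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (Map.Entry<String, Predicate<Map<String, String>>> entry : selectors.entrySet()) {
            int accepted = 0;
            for (Map<String, String> properties : messages) {
                if (entry.getValue().test(properties)) {
                    accepted++;
                }
            }
            counts.put(entry.getKey(), accepted);
        }
        return counts;
    }

    // Reproduces the scenario from the post: two messages, Market "100" and Market "200".
    static Map<String, Integer> demo() {
        Map<String, Predicate<Map<String, String>>> selectors = new LinkedHashMap<>();
        // Stand-in for the JMS selector "Market = '100'"
        selectors.put("VTCON.MKT.MF.PRICE.CHANGE.TP01", p -> "100".equals(p.get("Market")));
        // No selector: every message is accepted
        selectors.put("VTCON.GEN.MF.PRICE.CHANGE.TP01", p -> true);

        List<Map<String, String>> messages = Arrays.asList(
                Collections.singletonMap("Market", "100"),
                Collections.singletonMap("Market", "200"));
        return route(messages, selectors);
    }

    public static void main(String[] args) {
        // prints {VTCON.MKT.MF.PRICE.CHANGE.TP01=1, VTCON.GEN.MF.PRICE.CHANGE.TP01=2}
        System.out.println(demo());
    }
}
```

If the real run shows different counts, two things worth checking are that the “Market” property was set on the intended message and that both consumers were registered before the messages were sent.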