Making Virtual Topic Consumer “Selector Aware”
<virtualDestinations>
<virtualTopic name="MF.PRICE.CHANGE.TP01" prefix="VTCON.*." selectorAware="true"/>
</virtualDestinations>
Making Virtual topics selector aware is as easy as flipping the flag to “true”. For this example we created two consumers, one - “VTCON.GEN.MF.PRICE.CHANGE.TP01” listens to all the messages for ‘MF.PRICE.CHANGE.TP01’ topic and second – “VTCON.MKT.MF.PRICE.CHANGE.TP01" has a Selector of “Market = 100” on it.
Now, let’s send two messages, one with “market” string property with value 100 and another with value 200. In a result we should get one message in VTCON.MKT.MF.PRICE.CHANGE.TP01 as it has a Selector of “Market = 100” on it and two messages in “VTCON.GEN.MF.PRICE.CHANGE.TP01”
Java Code will look like below:
package test.consumer;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnectionFactory;
public class VirtualTopicSelectorAwareConsumer {
static Connection connection;
static Session session;
public static void main(String[] args) {
ActiveMQConnectionFactory connectionFactory =
new ActiveMQConnectionFactory("tcp://yourserver.com:port");
try {
connection = connectionFactory.createConnection();
session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
// Consumer Listens to message with Market String property as 100
String queueName = "VTCON.MKT.MF.PRICE.CHANGE.TP01";
Destination mkt = session.createQueue(queueName);
String selector100 = "Market = '100'";
MessageConsumer consumer100 = session.createConsumer(mkt, selector100);
Consumer100 listener = new VirtualTopicSelectorAwareConsumer().new Consumer100();
consumer100.setMessageListener(listener);
session.commit();
// Consumer listens to all the message (Without a Selector)
String genralConsumer = "VTCON.GEN.MF.PRICE.CHANGE.TP01";
Destination genral = session.createQueue(genralConsumer);
MessageConsumer genConsumer = session.createConsumer(genral);
ConsumerNormal normal = new VirtualTopicSelectorAwareConsumer().new ConsumerNormal();
genConsumer.setMessageListener(normal);
session.commit();
Topic topic = session.createTopic("MF.PRICE.CHANGE.TP01");
MessageProducer producer = session.createProducer(topic);
Message mktMessage = session.createTextMessage("Message for Market 100");
mktMessage.setStringProperty("Market", "100");
producer.send(mktMessage);
Message mktMessage2 = session.createTextMessage("Message for Market 200");
mktMessage.setStringProperty("Market", "200");
producer.send(mktMessage2);
producer.close();
session.commit();
} catch (JMSException e) {
e.printStackTrace();
}
}
/**
* ConsumerNormal Listens All the messages and Log it
*/
private class ConsumerNormal implements MessageListener{
@Override
public void onMessage(Message arg0) {
try {
System.out.println(((TextMessage)arg0).getText());
session.commit();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
/**
* Consumer100 only listens to Market=100 messages
*/
private class Consumer100 implements MessageListener{
@Override
public void onMessage(Message arg0) {
try {
System.out.println(((TextMessage)arg0).getText());
session.commit();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}