Showing posts with label virtual destinations. Show all posts
Showing posts with label virtual destinations. Show all posts

Tuesday, June 19, 2012

Virtual Topic SelectorAware Consumers

Making a Consumer selectorAware comes with a lackuna.
If selectorAware property is r’true’, by default, consumer queue will not pull the message from the Topic, unless there is atleast one ‘Active Consumer Application’ listening to consumer queue. Even if there is no selector defined on the consumer queue.
On the contrary, if the selectorAware is false, all the messages from the Topic will be posted to consumer queue regardless of consuming application availability.
So, its actually a design decision to use this flag wisely for proper implementation.

Tuesday, May 29, 2012

Making Virtual Topic Consumer “Selector Aware”

Making Virtual Topic Consumer “Selector Aware”
<virtualDestinations>
<virtualTopic name="MF.PRICE.CHANGE.TP01" prefix="VTCON.*." selectorAware="true"/>
</virtualDestinations>

Making Virtual topics selector aware is as easy as flipping the flag to “true”.  For this example we created two consumers, one - “VTCON.GEN.MF.PRICE.CHANGE.TP01” listens to all the messages for ‘MF.PRICE.CHANGE.TP01’ topic and second – “VTCON.MKT.MF.PRICE.CHANGE.TP01" has a Selector of “Market = 100” on it.
Now, let’s send two messages, one with “market” string property with value 100 and another with value 200. In a result we should get one message in VTCON.MKT.MF.PRICE.CHANGE.TP01 as it has a Selector of “Market = 100” on it and two messages in “VTCON.GEN.MF.PRICE.CHANGE.TP01”

Java Code will look like below:

package test.consumer;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnectionFactory;
public class VirtualTopicSelectorAwareConsumer {


static Connection connection;
static Session session;

public static void main(String[] args) {
ActiveMQConnectionFactory connectionFactory =
 new ActiveMQConnectionFactory("tcp://yourserver.com:port");

try {
 connection = connectionFactory.createConnection();
 session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
 // Consumer Listens to message with Market String property as 100
 String queueName = "VTCON.MKT.MF.PRICE.CHANGE.TP01";
 Destination mkt = session.createQueue(queueName);
 String selector100 = "Market = '100'";
 MessageConsumer consumer100 = session.createConsumer(mkt, selector100);
 Consumer100 listener = new VirtualTopicSelectorAwareConsumer().new Consumer100();
 consumer100.setMessageListener(listener);
 session.commit();
 // Consumer listens to all the message (Without a Selector)
 String genralConsumer = "VTCON.GEN.MF.PRICE.CHANGE.TP01";
 Destination genral = session.createQueue(genralConsumer);
 MessageConsumer genConsumer = session.createConsumer(genral);
 ConsumerNormal normal = new VirtualTopicSelectorAwareConsumer().new ConsumerNormal();
 genConsumer.setMessageListener(normal);
 session.commit();

 Topic topic = session.createTopic("MF.PRICE.CHANGE.TP01");
 MessageProducer producer = session.createProducer(topic);
 Message mktMessage = session.createTextMessage("Message for Market 100");
 mktMessage.setStringProperty("Market", "100");
 producer.send(mktMessage);

 Message mktMessage2 = session.createTextMessage("Message for Market 200");
 mktMessage.setStringProperty("Market", "200");
 producer.send(mktMessage2);
 producer.close();
 session.commit();

} catch (JMSException e) {
 e.printStackTrace();
}

}
/**
 * ConsumerNormal Listens All the messages and Log it
 */
private class ConsumerNormal implements MessageListener{

@Override
public void onMessage(Message arg0) {
 try {
  System.out.println(((TextMessage)arg0).getText());
  session.commit();
 } catch (JMSException e) {
  e.printStackTrace();
 }
}
}

/**
 * Consumer100 only listens to Market=100 messages
 */
private class Consumer100 implements MessageListener{
@Override
public void onMessage(Message arg0) {
try {
 System.out.println(((TextMessage)arg0).getText());
 session.commit();
} catch (JMSException e) {
 e.printStackTrace();
}
}
}
}

Friday, May 18, 2012

ActiveMQ Virtual Topics or Virtual Destinations

Creating Virtual Topics

Virtual Topics helps with following prospective:
1.       Load Balancing of messages
2.       Fast Failover of Subscriber
3.       Re-using same connection Factory for different Producers and Consumers. (Durable Subscribers needs a Unique JMS Client Id and same cannot be reused for any other Producer or consumer)
Steps to Define a Virtual Topic:
1.       Topic has to be created on Broker with Name: VirtualTopic.<TopicName>, However “VirtualTopic” is Out-of-the-box prefix for using this functionality.
2.       To make the change in prefix, following changes are needed.
<broker xmlns="http://activemq.apache.org/schema/core">
    <destinationInterceptors>
      <virtualDestinationInterceptor>
        <virtualDestinations>
          <virtualTopic name=">" prefix="VTCON.*." selectorAware="false"/>
        </virtualDestinations>
      </virtualDestinationInterceptor>
    </destinationInterceptors>
</broker>
In above settings two things are achieved.
A.      All Topics are made as Virtual Topic with name=">". However name field also accepts wildcard to apply different Virtual Topics policies. To change this Virtual Topic for Single Topic, just replace the “>” with actual Topic name.
<virtualDestinations>
<virtualTopic name="TEST.TP01" prefix="VTCON.*." selectorAware="false"/>
</virtualDestinations>

B.      Prefix for Consumer will be “VTCON”.  Now Virtual Topic consumers will be created with name like “VTCON.BASETBL.TEST.TP01”
C.      Settings are also made to NOT allow VirtualTopic Consumers as selectorAware. This will forward every message published to topic to consumers with “VTCON” prefix. If selectorAware is true, consumers will be able to add selector and only selected messages will be passed to consumers.