Showing posts with label active mq. Show all posts
Showing posts with label active mq. Show all posts

Thursday, October 29, 2015

Super Simple Example of Spring JMS - Producer

Step 1: You need Jar files:

A. Spring core, spring-context
B. Spring JMS
C. activemq-all 
D. slf4j , log4j etc.


Step 2: Create a Java Project in Eclipse and add "applicationContext.xml" File directly under "src" folder:

applicationContext.xml :

<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:p="http://www.springframework.org/schema/p"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:jms="http://www.springframework.org/schema/jms"
       xmlns:amq="http://activemq.apache.org/schema/core"
       xsi:schemaLocation="http://www.springframework.org/schema/beans 
                           http://www.springframework.org/schema/beans/spring-beans.xsd
                           http://www.springframework.org/schema/context 
                           http://www.springframework.org/schema/context/spring-context.xsd
                           http://www.springframework.org/schema/jms 
                           http://www.springframework.org/schema/jms/spring-jms.xsd
                           http://activemq.apache.org/schema/core 
                           http://activemq.apache.org/schema/core/activemq-core.xsd">

<context:component-scan base-package="com.itech" />
<context:annotation-config/>

<!-- JMS ConnectionFactory to use, configuring the embedded broker using XML -->
<bean id="activemqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <!-- brokerURL, You may have different IP or port -->
    <constructor-arg name="brokerURL" value="vm://localhost:61616" />
  </bean>  

<!-- Pooled Spring connection factory -->
 <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
    <constructor-arg ref="activemqConnectionFactory" />
 </bean>

<!--  Queue Destination  -->
    <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
    <!-- name of the queue -->
    <constructor-arg index="0" value="THIS.IS.TEST.QUEUE" />
  </bean>

  <!-- JmsTemplate Definition -->
  <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="defaultDestination" ref="queueDestination" />
  </bean>

</beans>


Have a look at XML, it has almost everything you need to start with: - ConnectionFactory, Destination and Spring Provides JmsTemplate which we will be using to send the message.

Step 3: Create Test Class:

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;

public class ProducerTest {

public static void main(String[] args) {
// Load Application Context 
   ApplicationContext ctx = new ClassPathXmlApplicationContext("applicationContext.xml");
  
            // Get JmsTemplate Bean from context     
   JmsTemplate template = (JmsTemplate) ctx.getBean("jmsTemplate");
 
           // Create Message 
   MessageCreator message = new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage("This is test message");
}
};

              // Send the message
template.send(message);

              // Close the context
((ClassPathXmlApplicationContext)ctx).close();
// You are Done!!
}

}

That's it. You are Done! You have written  a complete code in SpringJMS for sending a message.
Thanks!




Friday, October 12, 2012

ActiveMQ Composite Destinations


ActiveMQ Composite Destinations

Composite queue allows one-to-many relationship. Send a message to One Queue (A) and it will forward the message to Queue B, Queue C and Topic D.
To map Composite destination you need to add following section in virtualDestinations tag.
 <destinationInterceptors>
    <virtualDestinationInterceptor>
        <virtualDestinations>
<compositeQueue name="TEST.QUEUE.A">
   <forwardTo>
<queue physicalName="TEST.QUEUE.B" />
<queue physicalName="TEST.QUEUE.C" />
<queue physicalName="TEST.TOPIC.D" />
   </forwardTo>
</compositeQueue>
        </virtualDestinations>
    </virtualDestinationInterceptor>
  </destinationInterceptors>

By default, subscribers cannot consume messages directly from a composite queue or topic - it is a logical construct only. Given the configuration above, subscribers can only consume messages from TEST.QUEUE.B, TEST.QUEUE.C and TEST.TOPIC.D; but NOT TEST.QUEUE.A.

Using filtered destinations

You may wish to create a virtual destination which forwards messages to multiple destinations but applying a selector first to decide if the message really does have to go to a particular destination.
 <destinationInterceptors>
    <virtualDestinationInterceptor>
        <virtualDestinations>
<compositeQueue name="TEST.QUEUE.A">
   <forwardTo>
<filteredDestination selector="criteria = ‘B’” queue="TEST.QUEUE.B" />
<filteredDestination selector="criteria = ‘C’” queue="TEST.QUEUE.C" />
<filteredDestination selector="criteria = ‘D’” queue="TEST.TOPIC.D" />
   </forwardTo>
</compositeQueue>
        </virtualDestinations>
    </virtualDestinationInterceptor>
  </destinationInterceptors>

In the case above, message from TEST.QUEUE.A will be forwarded to filtered destinations with selector criteria. Result is same as having selector based consumer on TEST.QUEUE.B


Important Aspects of Composite Destinations:

-          Listener Queues (under forwardTo Tag) will not be created automatically. If queues are not present, message will be LOST.
-          If you want to store message in composite queue, make forwardOnly flag as ‘false’. It’s true by default.
<compositeQueue name="TEST.QUEUE.A" forwardOnly="false">   
      <forwardTo>
<queue physicalName="TEST.QUEUE.B" />
   </forwardTo>
</compositeQueue>

-          Composite queue can send the message to Virtual Topic, which will be further consumed by multiple Virtual Destinations.
-          Virtual Topic Selector Aware Consumers will not pull the message if consumer is not active; On the contrary, Filtered Composite destinations pull the message despite of consumer’s active status. This is very helpful in real world scenarios.
-          Important: Composite Destinations Supports wildcards. This is also valid for Filtered Destinations.
<destinationInterceptors>
   <virtualDestinationInterceptor>
      <virtualDestinations>
<compositeQueue name="TEST.QUEUE.A">
<forwardTo>
   <filteredDestination selector="criteria = ‘B’” queue="TEST.B.>."/>
   <queue physicalName="COMPOSITE.WILD.>."/>
</forwardTo>
</compositeQueue>
</virtualDestinations>
</virtualDestinationInterceptor>
</destinationInterceptors>

In the settings above, message will be forwarded to all the queues starting with “COMPOSITE.WILD”. Also, the message with filtered destinations will be forwarded to all the queues starting with ‘TEST.B’.

Tuesday, August 7, 2012

ActiveMQ Mirror Queue: Working Example.


If you reached here, you might have already checked few very confusing paragraphs about Mirror queues in ActiveMQ.
Mirror Queues are disabled by default. To enable them, you have to following settings in activemq.xml
<destinationInterceptors>
<mirroredQueue copyMessage = "true" postfix="" prefix="MIRROR."/>
                <virtualDestinationInterceptor>
                                <virtualDestinations>
                                                <virtualTopic name="MIRROR.TEST.>"  prefix="MIR.>." selectorAware="false"/>
                                </virtualDestinations>
                </virtualDestinationInterceptor>
</destinationInterceptors>
               
Out of above: “<mirroredQueue copyMessage = "true" postfix="" prefix="MIRROR."/>” is actually responsible for enabling Mirror Queues.

How ActiveMQ Handles Mirror Queues:

1.       When you enable Mirror Queues, with say prefix as “MIRROR.”, for every queue in the broker one Topic will be created with MIRROR. Prefix.
2.       Now to receive a message from this Topic, Virtual Destinations can be used. More about Virtual Destination can be found here.

Working Example:

                Let’s consider above configuration for prefix for Mirror queues and Virtual Destinations.
1.       Create a queue: TEST.QUEUE
2.       With above settings, Mirror Topic is automatically created as : MIRROR.TEST.QUEUE
3.       Create Virtual Topic Consumer : MIR.COPY.MIRROR.TEST.QUEUE
a.       MIR – Defined Virtual Topic Interceptor
b.      COPY – Name of the Consumer
c.       MIRROR.TEST.QUEUE – is a Topic Name
4.       Send a test message to TEST.QUEUE, same message will appear in MIR.COPY.MIRROR.TEST.QUEUE.

This is complete setting of Mirror Queues.

Problems:

1.       Mirror Topics will be created for Every Queue in the broker
2.       As broker has to track each message from the queue and copy it to topic, further to virtual destination, broker may face performance problem depending on load.

Tuesday, June 19, 2012

Virtual Topic SelectorAware Consumers

Making a Consumer selectorAware comes with a lackuna.
If selectorAware property is r’true’, by default, consumer queue will not pull the message from the Topic, unless there is atleast one ‘Active Consumer Application’ listening to consumer queue. Even if there is no selector defined on the consumer queue.
On the contrary, if the selectorAware is false, all the messages from the Topic will be posted to consumer queue regardless of consuming application availability.
So, its actually a design decision to use this flag wisely for proper implementation.

Tuesday, May 29, 2012

Making Virtual Topic Consumer “Selector Aware”

Making Virtual Topic Consumer “Selector Aware”
<virtualDestinations>
<virtualTopic name="MF.PRICE.CHANGE.TP01" prefix="VTCON.*." selectorAware="true"/>
</virtualDestinations>

Making Virtual topics selector aware is as easy as flipping the flag to “true”.  For this example we created two consumers, one - “VTCON.GEN.MF.PRICE.CHANGE.TP01” listens to all the messages for ‘MF.PRICE.CHANGE.TP01’ topic and second – “VTCON.MKT.MF.PRICE.CHANGE.TP01" has a Selector of “Market = 100” on it.
Now, let’s send two messages, one with “market” string property with value 100 and another with value 200. In a result we should get one message in VTCON.MKT.MF.PRICE.CHANGE.TP01 as it has a Selector of “Market = 100” on it and two messages in “VTCON.GEN.MF.PRICE.CHANGE.TP01”

Java Code will look like below:

package test.consumer;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnectionFactory;
public class VirtualTopicSelectorAwareConsumer {


static Connection connection;
static Session session;

public static void main(String[] args) {
ActiveMQConnectionFactory connectionFactory =
 new ActiveMQConnectionFactory("tcp://yourserver.com:port");

try {
 connection = connectionFactory.createConnection();
 session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
 // Consumer Listens to message with Market String property as 100
 String queueName = "VTCON.MKT.MF.PRICE.CHANGE.TP01";
 Destination mkt = session.createQueue(queueName);
 String selector100 = "Market = '100'";
 MessageConsumer consumer100 = session.createConsumer(mkt, selector100);
 Consumer100 listener = new VirtualTopicSelectorAwareConsumer().new Consumer100();
 consumer100.setMessageListener(listener);
 session.commit();
 // Consumer listens to all the message (Without a Selector)
 String genralConsumer = "VTCON.GEN.MF.PRICE.CHANGE.TP01";
 Destination genral = session.createQueue(genralConsumer);
 MessageConsumer genConsumer = session.createConsumer(genral);
 ConsumerNormal normal = new VirtualTopicSelectorAwareConsumer().new ConsumerNormal();
 genConsumer.setMessageListener(normal);
 session.commit();

 Topic topic = session.createTopic("MF.PRICE.CHANGE.TP01");
 MessageProducer producer = session.createProducer(topic);
 Message mktMessage = session.createTextMessage("Message for Market 100");
 mktMessage.setStringProperty("Market", "100");
 producer.send(mktMessage);

 Message mktMessage2 = session.createTextMessage("Message for Market 200");
 mktMessage.setStringProperty("Market", "200");
 producer.send(mktMessage2);
 producer.close();
 session.commit();

} catch (JMSException e) {
 e.printStackTrace();
}

}
/**
 * ConsumerNormal Listens All the messages and Log it
 */
private class ConsumerNormal implements MessageListener{

@Override
public void onMessage(Message arg0) {
 try {
  System.out.println(((TextMessage)arg0).getText());
  session.commit();
 } catch (JMSException e) {
  e.printStackTrace();
 }
}
}

/**
 * Consumer100 only listens to Market=100 messages
 */
private class Consumer100 implements MessageListener{
@Override
public void onMessage(Message arg0) {
try {
 System.out.println(((TextMessage)arg0).getText());
 session.commit();
} catch (JMSException e) {
 e.printStackTrace();
}
}
}
}

Friday, May 18, 2012

ActiveMQ Virtual Topics or Virtual Destinations

Creating Virtual Topics

Virtual Topics helps with following prospective:
1.       Load Balancing of messages
2.       Fast Failover of Subscriber
3.       Re-using same connection Factory for different Producers and Consumers. (Durable Subscribers needs a Unique JMS Client Id and same cannot be reused for any other Producer or consumer)
Steps to Define a Virtual Topic:
1.       Topic has to be created on Broker with Name: VirtualTopic.<TopicName>, However “VirtualTopic” is Out-of-the-box prefix for using this functionality.
2.       To make the change in prefix, following changes are needed.
<broker xmlns="http://activemq.apache.org/schema/core">
    <destinationInterceptors>
      <virtualDestinationInterceptor>
        <virtualDestinations>
          <virtualTopic name=">" prefix="VTCON.*." selectorAware="false"/>
        </virtualDestinations>
      </virtualDestinationInterceptor>
    </destinationInterceptors>
</broker>
In above settings two things are achieved.
A.      All Topics are made as Virtual Topic with name=">". However name field also accepts wildcard to apply different Virtual Topics policies. To change this Virtual Topic for Single Topic, just replace the “>” with actual Topic name.
<virtualDestinations>
<virtualTopic name="TEST.TP01" prefix="VTCON.*." selectorAware="false"/>
</virtualDestinations>

B.      Prefix for Consumer will be “VTCON”.  Now Virtual Topic consumers will be created with name like “VTCON.BASETBL.TEST.TP01”
C.      Settings are also made to NOT allow VirtualTopic Consumers as selectorAware. This will forward every message published to topic to consumers with “VTCON” prefix. If selectorAware is true, consumers will be able to add selector and only selected messages will be passed to consumers.



Tuesday, January 31, 2012

MQ Queue Manager: if you can directly access MQQueueManager object in your working
Environment, following is the way to create the Object directly.


MQEnvironment.hostname = "yourhost.yourcompany.com";
MQEnvironment.port = CLIENT_PORT; // Mention int port here
MQEnvironment.channel = CLIENT_CHANNEL; // Menion Server connection channel here

/* Mention queueManagerName here */
 MQQueueManagerqueueManager = new MQQueueManager(queueManagerName);

Thats it. hostname, port , channel and queueManagerName can connect you to your queue Manager.

And yes. dont forget to import this:

import com.ibm.mq.MQEnvironment;
import com.ibm.mq.MQQueueManager;

Wednesday, January 18, 2012

Streaming Messaging in ActiveMQ

Many examples on the Web floating around with a copied code snap from apache site. But here is the code which actually sends a Stream Message and More.. it also receives it with MessageListener.

Please let me know suggestions, improvements and questions if you have. thanks.

package com.xyz;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import javax.jms.BytesMessage;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.command.ActiveMQBytesMessage;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.util.ByteSequence;
import org.apache.log4j.Logger;

import com.xyz.AMQConnectorException;
public class ReadFileAndSendStream {
 static ActiveMQDirectConnector connector = null;
 static ActiveMQConnection connection = null;
 static Session session = null;
 static Destination sendToQueue = null;
 static int ch;
 static Map<String, FileOutputStream> map = new HashMap<String, FileOutputStream>();
 private static Logger log = Logger.getLogger(ReadFileAndSendStream.class);
 public static void main(String args[]){

  try {
   connector = new ActiveMQDirectConnector("tcp://Server.company.com:<PORT>");
   connection =   connector.getConnectionDetails().getConnection();
  
   session = connector.getConnectionDetails().getSession();
   // Reading a File and sending the stream to ActiveMQ queue
   sendToQueue = session.createQueue("AAD.STREAM.TEST");
  
   /*********  Sending a huge huge huge message to ActiveMQ Queue **************/
   log.debug("Start time to Send  ::  " + System.currentTimeMillis());
   File file = new File("C:/access1.log");
   DataOutputStream out2 =  new DataOutputStream(connection.createOutputStream(sendToQueue));
      FileInputStream fin = null;
      try {
        fin = new FileInputStream(file);
        while ((ch = fin.read()) != -1)
         out2.write((char) ch);
         fin.close();
      } catch (Exception e) {
       e.printStackTrace();
      }
   out2.close();
   session.commit();
   Thread.sleep(10000);
   log.debug("End time to Send  ::  " + System.currentTimeMillis());
 
   /********* I should write a consumer ***********/
  
   log.debug("Start time to read  ::  " + System.currentTimeMillis());
  
 
   MessageConsumer consumer = session.createConsumer(sendToQueue);
   MyConsumer myConsumer = new MyConsumer();
   consumer.setMessageListener(myConsumer);
   connection.setExceptionListener(myConsumer);
  
   log.debug("End time to read  ::  " + System.currentTimeMillis());
  
   /*********  Reading a huge huge huge message from ActiveMQ Queue
  
   DataInputStream in = new DataInputStream(connection.createInputStream(sendToQueue));
   FileWriter fstream = new FileWriter("C:/out.txt");
   BufferedWriter out = new BufferedWriter(fstream);
    try {
         while ((ch = in.read()) != -1)
        out.write((char) ch);
          in.close();
       } catch (Exception e) {
        e.printStackTrace();
       }
     out.close();
     **************/
  } catch (AMQConnectorException e) {
   e.printStackTrace();
  }catch (JMSException e) {
   e.printStackTrace();
  } catch (IOException e) {
   e.printStackTrace();
  } catch (InterruptedException e) {
   e.printStackTrace();
  }
 
 }

 private static class MyConsumer implements MessageListener, ExceptionListener{

  @Override
   public void onMessage(Message message) {
    String messageId;
    FileOutputStream fos = null;

    if (message instanceof BytesMessage) {
    try {
     messageId = ((ActiveMQBytesMessage)message).getGroupID();
   
     if(!map.containsKey(messageId)) {
      log.debug("This is the message ID:" + messageId);
      File outFile = new File("C:/outListen.txt");
      fos = new FileOutputStream(outFile);
      map.put(messageId, fos);
     } else {
      fos = (FileOutputStream) map.get(messageId);
     }
     ByteSequence bs = ((ActiveMQBytesMessage)message).getContent();
     fos.write(bs.getData());
    
     log.debug("getGroupSequence" +((ActiveMQBytesMessage)message).getGroupSequence());
    }catch (IOException e) {
     e.printStackTrace();
    } finally {
   
    }
    }else
    {
     try {
      log.debug("This is the message type:" + message.getJMSType() + "\n\n" + message);
      fos = (FileOutputStream) map.remove(((ActiveMQMessage)message).getGroupID() );

      if (fos != null)
      {
       log.debug("Closing File");
       fos.close();
      }
      } catch (JMSException e) {
       e.printStackTrace();
      } catch (IOException e) {
       e.printStackTrace();
      } finally {
       try {
        session.commit();
       } catch (JMSException e) {
        e.printStackTrace();
       }
      }

    }
   }
   @Override
   public void onException(JMSException ex) {
    System.out.println(ex);
    ex.printStackTrace();
   
   }
 }
}