Communication Between Java Applications (3) – ActiveMQ

ActiveMQ is a popular library to use messaging via JMS.

There is a very good “Hello World” tutorial exists in offical site, I must say.
http://activemq.apache.org/hello-world.html

So I’m going to write another “Hello World” example with ActiveMQ, but I also show you how you can get statistics (enqueue/dequeue count etc.) in your code.

ActiveMQ architecture simply consist of a producer which sends messages to a broker and a consumer which listens that broker to receive messages. We will create a Queue on this broker to keep messages.

package com;

public class Main {

	private static String jmsAddress = "tcp://127.0.0.1:8083";
	private static String queueName = "myQueue";

	public static void main(String[] args) throws Exception {
		Broker broker = new Broker(jmsAddress);
		broker.start();

		thread(new Producer(jmsAddress, queueName), false);
		thread(new Consumer(jmsAddress, queueName), false);
		thread(new Producer(jmsAddress, queueName), false);
		thread(new Consumer(jmsAddress, queueName), false);

		// just wait to be sure that connection is not closed before communication complete
		Thread.sleep(2000);
		
		broker.stop();
	}

	public static void thread(Runnable runnable, boolean daemon) {
		Thread brokerThread = new Thread(runnable);
		brokerThread.setDaemon(daemon);
		brokerThread.start();
	}

}


package com;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.ManagedRegionBroker;
import org.apache.activemq.broker.jmx.QueueView;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.broker.region.Region;

public class Broker {

	private BrokerService broker;

	public Broker(String jmsAddress) {
		broker = new BrokerService();
		try {
			broker.addConnector(jmsAddress);
		} catch (Exception e) {
			System.out.println("Caught: " + e);
			e.printStackTrace();
		}
		broker.setPersistent(false);
		broker.setBrokerName("myBroker");

		// TODO to see statistics on jConsole check
		// http://activemq.apache.org/jmx.html
		// broker.setUseJmx(true);
	}

	public void start() {
		try {
			broker.start();
		} catch (Exception e) {
			System.out.println("Caught: " + e);
			e.printStackTrace();
		}
	}

	public void stop() {
		try {
			getStats(); // write statics before shutting down
			broker.stop();
		} catch (Exception e) {
			System.out.println("Caught: " + e);
			e.printStackTrace();
		}
	}

	/**
	 * Return all of QueueView objects belongs to brokerService QueueView
	 * objects includes statistics like queueSize, enqueueCount, dequeueCount,
	 * inFlightCount etc.
	 */
	public List<QueueView> getQueueViews() {
		List<QueueView> myQueueViews = new ArrayList<QueueView>();
		try {
			if (broker != null && broker.getAdminView() != null && broker.getAdminView().getBroker() != null) {
				ManagedRegionBroker mrb = broker.getAdminView().getBroker();
				if (mrb.getQueueRegion() != null) {
					Region myRegion = mrb.getQueueRegion();
					if (myRegion.getDestinationMap() != null) {
						for (org.apache.activemq.broker.region.Destination d : myRegion.getDestinationMap().values()) {
							QueueView qv = new QueueView(mrb, (Queue) d);
							myQueueViews.add(qv);
						}
					}
				}
			}
		} catch (IOException e) {
			System.out.println("Caught: " + e);
			e.printStackTrace();
		} catch (Exception e) {
			System.out.println("Caught: " + e);
			e.printStackTrace();
		}
		return myQueueViews;
	}

	public void getStats() {
		List<QueueView> stat = getQueueViews();
		for (QueueView qv : stat) {
			System.out.println("Queue Name: " + qv.getName() + ", EnqueueCount: " + qv.getEnqueueCount()
					+ ", DequeueCount " + qv.getDequeueCount() + ", In-Flight Count: " + qv.getInFlightCount());
		}
	}

}


/**
 * 
 * Taken from ActiveMQ official site http://activemq.apache.org/hello-world.html
 * 
 * */

package com;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Producer implements Runnable {
	private String jmsAddress = null;
	private String queueName = null;

	public Producer(String jmsAddressParam, String queueNameParam) {
		jmsAddress = jmsAddressParam;
		queueName = queueNameParam;
	}

	public void run() {
		try {
			// Create a ConnectionFactory
			ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("", "", jmsAddress);

			// Create a Connection
			Connection connection = connectionFactory.createConnection();
			connection.start();

			// Create a Session
			Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

			// Create the destination (Topic or Queue)
			Destination destination = session.createQueue(queueName);

			// Create a MessageProducer from the Session to the Topic or
			// Queue
			MessageProducer producer = session.createProducer(destination);
			producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

			// Create a messages
			String text = "Hello world! From: " + Thread.currentThread().getName() + " : " + this.hashCode();
			TextMessage message = session.createTextMessage(text);

			// Tell the producer to send the message
			System.out.println("Sent message: " + message.hashCode() + " : " + Thread.currentThread().getName());
			producer.send(message);

			// Clean up
			session.close();
			connection.close();
		} catch (Exception e) {
			System.out.println("Caught: " + e);
			e.printStackTrace();
		}
	}
}


/**
 * 
 * Taken from ActiveMQ official site http://activemq.apache.org/hello-world.html
 * 
 * */

package com;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Consumer implements Runnable, ExceptionListener {
	private String jmsAddress = null;
	private String queueName = null;

	public Consumer(String jmsAddressParam, String queueNameParam) {
		jmsAddress = jmsAddressParam;
		queueName = queueNameParam;
	}

	public void run() {

		try {

			// Create a ConnectionFactory
			ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("", "", jmsAddress);

			// Create a Connection
			Connection connection = connectionFactory.createConnection();
			connection.start();

			connection.setExceptionListener(this);

			// Create a Session
			Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

			// Create the destination (Topic or Queue)
			Destination destination = session.createQueue(queueName);

			// Create a MessageConsumer from the Session to the Topic or
			// Queue
			MessageConsumer consumer = session.createConsumer(destination);

			// Wait for a message
			Message message = consumer.receive(1000);

			if (message instanceof TextMessage) {
				TextMessage textMessage = (TextMessage) message;
				String text = textMessage.getText();
				System.out.println("Received: " + text);
			} else {
				System.out.println("Received: " + message);
			}

			consumer.close();
			session.close();
			connection.close();
		} catch (Exception e) {
			System.out.println("Caught: " + e);
			e.printStackTrace();
		}
	}

	public synchronized void onException(JMSException ex) {
		System.out.println("JMS Exception occured.  Shutting down client.");
	}
}

It is going to look like this

INFO | Using Persistence Adapter: MemoryPersistenceAdapter
INFO | JMX consoles can connect to service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi
INFO | Apache ActiveMQ 5.12.0 (myBroker, ID:afsinka-53781-1440931152475-0:1) is starting
INFO | Listening for connections at: tcp://127.0.0.1:8083
INFO | Connector tcp://127.0.0.1:8083 started
INFO | Apache ActiveMQ 5.12.0 (myBroker, ID:afsinka-53781-1440931152475-0:1) started
INFO | For help or more information please see: http://activemq.apache.org
Sent message: 109976861 : Thread-4
Sent message: 885422328 : Thread-2
Received: Hello world! From: Thread-4 : 335545524
Received: Hello world! From: Thread-2 : 1040242525

Queue Name: myQueue, EnqueueCount: 2, DequeueCount 2, In-Flight Count: 0
INFO | Apache ActiveMQ 5.12.0 (myBroker, ID:afsinka-53781-1440931152475-0:1) is shutting down
INFO | Connector tcp://127.0.0.1:8083 stopped
INFO | Apache ActiveMQ 5.12.0 (myBroker, ID:afsinka-53781-1440931152475-0:1) uptime 2.311 seconds
INFO | Apache ActiveMQ 5.12.0 (myBroker, ID:afsinka-53781-1440931152475-0:1) is shutdown

Also you can see your messaging statistics on jConsole as explained in here:
http://activemq.apache.org/jmx.html

In this example producer, consumer and broker in the same machine. If you want to send messages between seperate machines you should also consider embedded broker concept.

Any criticism is welcome!

Advertisements
Tagged

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s

%d bloggers like this: