Friday, April 5, 2013

RabbitMQ using Java

RabbitMQ is an open source message broker that implements the AMQP messaging protocol. It also has multiple client libraries (Java, .NET, Erlang) which can be used to send and receive AMQP messages to and from an AMQP broker.

It's built on Erlang/OTP, which gives it very good scalability. You should also look at Qpid, an AMQP implementation from the Apache Software Foundation that is supported by Red Hat and Microsoft, and at OpenAMQ.

In this post I'm going to explain how to use the RabbitMQ Java client library to send and receive messages. Since RabbitMQ itself acts as an AMQP server (broker), I'll be using it as the broker in this post.

The client code doesn't require any application server: plain core Java is enough.

What is AMQP?
AMQP (Advanced Message Queuing Protocol) is a networking protocol that enables conforming client applications to communicate with conforming messaging middleware brokers.

Brokers and Their Role
Messaging brokers receive messages from producers (applications that publish them) and route them to consumers (applications that process them).
Since AMQP is a network protocol, the producers, consumers and the broker can all reside on different machines.

RabbitMQ implements a broker architecture, meaning that messages are queued on a central node before being sent to clients. This approach makes RabbitMQ very easy to use and deploy, because advanced scenarios like routing, load balancing or persistent message queuing are supported in just a few lines of code. The trade-off is that it scales less well and is slower than a brokerless design, because the central node adds latency and message envelopes are quite big.
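
To make the broker idea concrete, here is a minimal sketch of a central node that queues messages between producers and consumers. It is a toy in-memory model and not the RabbitMQ API: the class ToyBroker and its publish/consume methods are names I made up for illustration, and a real broker does this over the network with routing, acknowledgements and persistence on top.

```java
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

// Toy in-memory stand-in for a broker (hypothetical, NOT the RabbitMQ API).
class ToyBroker {
    // One FIFO buffer per named queue, created on first use.
    private final Map<String, BlockingQueue<String>> queues = new ConcurrentHashMap<>();

    private BlockingQueue<String> queue(String name) {
        return queues.computeIfAbsent(name, n -> new LinkedBlockingQueue<>());
    }

    // Producer side: hand the message to the central node and return.
    public void publish(String queueName, String message) {
        queue(queueName).add(message);
    }

    // Consumer side: take the oldest waiting message, or null if the queue is empty.
    public String consume(String queueName) {
        return queue(queueName).poll();
    }

    public static void main(String[] args) {
        ToyBroker broker = new ToyBroker();
        broker.publish("hello", "first");
        broker.publish("hello", "second");
        System.out.println(broker.consume("hello")); // prints "first"
        System.out.println(broker.consume("hello")); // prints "second"
    }
}
```

The producer and the consumer never talk to each other directly; they only know the queue name. That is what makes the model easy to use, and also why every message pays for a hop through the central node.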

Message Queue Servers
Message queue servers are written in various languages: Erlang (RabbitMQ), C (beanstalkd), Ruby (Starling or Sparrow), Scala (Kestrel, Kafka) and Java (ActiveMQ).

Sparrow
  • Sparrow is a lightweight queue written in Ruby that “speaks memcache”
Starling
  • Starling is a Message Queue Server based on MemCached
  • written in Ruby
  • stores jobs in memory (message queue)
Kestrel
  • Starling clone written in Scala (a port of Starling from Ruby to Scala)
  • Queues are stored in memory, but logged on disk
RabbitMQ
  • RabbitMQ is a Message Queue Server in Erlang
  • stores jobs in memory (message queue)
Apache ActiveMQ
  • ActiveMQ is an open source message broker in Java
Beanstalkd
  • written in C
Kafka
  • Written at LinkedIn in Scala
  • Used by LinkedIn to offload processing of all page and other views
  • Defaults to using persistence, uses OS disk cache for hot data (has higher throughput than any of the above with persistence enabled)
  • Supports both online and offline processing
HornetQ is also an option you can look into; it is JMS compliant and, in my view, a better option than ActiveMQ.

A nice working example is given at the link below.

Source Code Link


Below is the core Java code to create a RabbitMQ connection. It is documented very well on the RabbitMQ site.

Create a connection to the server:
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;

    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
The connection abstracts the socket connection, and takes care of protocol version negotiation, authentication and so on for us.

We must declare a queue to send to; then we can publish a message to that queue:
    // Class-level constant shared by the sender and the receiver
    private final static String QUEUE_NAME = "hello";

    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    String message = "Hello World!";
    // The empty string selects the default exchange, which routes by queue name
    channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
    System.out.println(" [x] Sent '" + message + "'");
Declaring a queue is idempotent: it will only be created if it doesn't exist already.
Finally, close the channel and the connection:
    channel.close();
    connection.close();
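
The "only created if it doesn't exist already" behaviour can be sketched with a small toy model. This is not the RabbitMQ API: ToyQueueRegistry and its declare method are hypothetical names, and I track only the durable flag to keep it short. As far as I know, a real broker also rejects a redeclaration whose attributes differ from the existing queue, which is what the exception below imitates.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of idempotent queue declaration (hypothetical, NOT the RabbitMQ API).
class ToyQueueRegistry {
    // Queue name -> durable flag recorded on the first declaration.
    private final Map<String, Boolean> declared = new HashMap<>();

    // Returns true if this call created the queue, false if it already existed
    // with the same durable flag. A conflicting redeclaration is rejected.
    public boolean declare(String name, boolean durable) {
        Boolean existing = declared.putIfAbsent(name, durable);
        if (existing == null) {
            return true;
        }
        if (existing != durable) {
            throw new IllegalStateException(
                "queue '" + name + "' already declared with durable=" + existing);
        }
        return false;
    }

    public static void main(String[] args) {
        ToyQueueRegistry registry = new ToyQueueRegistry();
        System.out.println(registry.declare("hello", false)); // prints "true": created
        System.out.println(registry.declare("hello", false)); // prints "false": no-op
    }
}
```

This is why both the sender and the receiver can safely declare the same queue with the same arguments: whichever starts first creates it, and the other call changes nothing.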

Receiver

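    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    // Declare the same queue as the sender so it exists whichever side starts first
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    System.out.println(" [*] Waiting for messages.");

    // QueueingConsumer comes from older versions of the Java client
    // (deprecated in 4.x, removed in 5.0)
    QueueingConsumer consumer = new QueueingConsumer(channel);
    // true = auto-acknowledge each message as soon as it is delivered
    channel.basicConsume(QUEUE_NAME, true, consumer);
    while (true) {
        // Blocks until the next message arrives
        QueueingConsumer.Delivery delivery = consumer.nextDelivery();
        String message = new String(delivery.getBody());
        System.out.println(" [x] Received '" + message + "'");
    }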
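
One detail worth a small sketch: the sender calls message.getBytes() and the receiver calls new String(...) without naming a charset, so both fall back to the platform default. If the two programs run on machines with different defaults, non-ASCII text may come out garbled. The helper below is my own suggestion, not something from the RabbitMQ docs; MessageCodec, encode and decode are hypothetical names, and UTF-8 is simply an assumed choice.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper that pins the charset on both sides of the queue.
class MessageCodec {
    // Encode the text the same way on every machine, regardless of locale.
    public static byte[] encode(String message) {
        return message.getBytes(StandardCharsets.UTF_8);
    }

    // Decode a message body that was produced by encode().
    public static String decode(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // \u00e9 and \u00f6 are the accented letters e-acute and o-umlaut
        byte[] body = encode("H\u00e9llo W\u00f6rld!");
        System.out.println(body.length + " bytes on the wire");
        System.out.println(decode(body).equals("H\u00e9llo W\u00f6rld!"));
    }
}
```

With a helper like this, the sender would publish MessageCodec.encode(message) and the receiver would build the string with MessageCodec.decode(delivery.getBody()).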

