Wednesday, April 24, 2013

Java7 NIO WatchService

Java 7: NIO WatchService

In my previous post I have written many of the Java 7 features for developing Java Telnet Server.
Here I want to go deep into one such feature WatchService.

The WatchService is a very interesting feature of the new java.nio.file package in Java 7.
One of the more interesting is the WatchService, adding the capability to watch a directory for changes.
   
The WatchService maps directly to the native file event notification mechanism, if available. If a native event notification mechanism is not available, then the default implementation will use polling. As a result, the responsiveness, ordering of events and details available are implementation specific.
 
Source Code:

Watching Directory:
We have to create a File Path for the specific directory we want to watch.
Path:
    Path faxFolder = Paths.get("C:\\foo");

The Path interface implements the register method that takes a WatchService object and varargs of type WatchEvent.Kind as arguments. There are 4 events to watch for:
ENTRY_CREATE
ENTRY_DELETE
ENTRY_MODIFY
OVERFLOW
While the first 3 types are easy to understand, OVERFLOW indicates that events may been lost or discarded. A WatchService is created by calling FileSystem.newWatchService().
Watching a directory is accomplished by registering a Path object with the WatchService:

WatchService watchService = FileSystems.getDefault().newWatchService();
 
faxFolder.register(watchService,   StandardWatchEventKinds.ENTRY_CREATE,  StandardWatchEventKinds.ENTRY_DELETE, 
StandardWatchEventKinds.ENTRY_MODIFY);

WatchKey watchKey = watchService.take();

As you can see above code register method returns a watchKey.  The WatchKey is a token that represents the registration of the Path with the WatchService.

The WatchKey
As a result of the registration process, the WatchKey is in a ‘ready’ state and is considered valid. A WatchKey remains valid until one of the following occurs:
  1. WatchKey.cancel() is called.
  2. The directory being watched is no longer available.
  3. The WatchService object is closed.
Checking For Changes
When a change is detected, the WatchKey state is set to ‘signaled’ and it is placed in a queue for processing.  Getting WatchKeys off the queue involves calling WatchService.poll() or
WatchService.take().
Here is a basic example:
WatchKey watchKey = watchService.poll(60,TimeUnit.SECONDS);
 //this will retrieve all the events for this watch key
List<WatchEvent.Kind<?>> events = watchKey.pollEvents(); 


boolean value = watchKey.reset()

The "reset" method sets the WatchKey state back to 'ready'(meaning listen for events) and returns a boolean indicating if the WatchKey is still valid. If there are any pending events, then the WatchKey will be re-queued, otherwise it will remain in the ready state until new events are detected.

How to process events
Now that we have detected an event, how do we determine:
  1. On which directory did the event happen? (assuming more than one directory registered)
  2. What was the actual event? (assuming listening for more than one event)
  3. What was the target of the event, i.e what Path object was created,deleted or updated?
//WatchKey watchable returns the calling Path object of Path.register
 Path watchedPath = (Path) watchKey.watchable();

for (WatchEvent<?> event : watchKey.pollEvents()) {
   
   //returns the event type
     StandardWatchEventKinds eventKind = event.kind();
  
   //returns the context of the event
     Path target = (Path)event.context();
}


There are two things that about the WatchService.
The WatchService does not pick up events for sub-directories of a watched directory.
We still need to poll the WatchService for events, rather than receive asynchronous notification.


 Here is the sample output:




Friday, April 5, 2013

RabbitMQ using Java

RabbitMQ is an open source message broker which implements AMQP messaging protocol. It also has multiple client libraries (Java, .NET, Erlang) which can be used to send/recieve AMQP messages to/from an AMQP broker.

It's based on Erlang/OTP which gives very good scalability. Finally, you should also look at QPID, an implementation by the Apache Foundation which is supported by Red Hat and Microsoft, and OpenAMQ.

In this post I’m going to explain how to use the RabbitMQ Java Client Library to send and Receive messages. Since RabbitMQ also can act as an AMQP server, I’ll be using it in this post.

It doesnt require any server.

What is AMQP?AMQP (Advanced Message Queuing Protocol) is a networking protocol that enables conforming client applications to communicate with conforming messaging middleware brokers.
Brokers and Their RoleMessaging brokers receive messages from producers (applications that publish them) and route them to consumers (applications that process them).
Since AMQP is a network protocol, the producers, consumers and the broker can all reside on different machines.

It implements a broker architecture, meaning that messages are queued on a central node before being sent to clients. This approach makes RabbitMQ very easy to use and deploy, because advanced scenarios like routing, load balancing or persistent message queuing are supported in just a few lines of code. However, it also makes it less scalable and slower because the central node adds latency and message envelopes are quite big.

Message Queue Servers
Message queue servers are available in various languages, Erlang (RabbitMQ), C (beanstalkd), Ruby (Starling or Sparrow), Scala (Kestrel, Kafka) or Java (ActiveMQ).

Sparrow
  • Sparrow is a lightweight queue written in Ruby that “speaks memcache”
Starling
  • Starling is a Message Queue Server based on MemCached
  • written in Ruby
  • stores jobs in memory (message queue)
Kestrel
  • Starling clone written in Scala (a port of Starling from Ruby to Scala)
  • Queues are stored in memory, but logged on disk
RabbitMQ
  • RabbitMQ is a Message Queue Server in Erlang
  • stores jobs in memory (message queue)
Apache ActiveMQ
  • ActiveMQ is an open source message broker in Java
Beanstalkd
Kafka
  • Written at LinkedIn in Scala
  • Used by LinkedIn to offload processing of all page and other views
  • Defaults to using persistence, uses OS disk cache for hot data (has higher throughput then any of the above having persistence enabled)
  • Supports both on-line as off-line processing
 HornetQ is also an option you can look into, it is JMS Complaint, a better option than ActiveMQ.

Nice working Example is given in below link

Source Code Link


Core Java to create a RabbitMQ connection. It is documented very well in RabbitMQ site.

create a connection to the server:
       ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
The connection abstracts the socket connection, and takes care of protocol version negotiation and authentication and so on for us.

we must declare a queue for us to send to; then we can publish a message to the queue:
        private final static String QUEUE_NAME = "hello";
       channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    String message = "Hello World!";
    channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
    System.out.println(" [x] Sent '" + message + "'");
 it will only be created if it doesn't exist already.
close the channel and the connection;
        channel.close();
    connection.close();

 Reciever

        ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    System.out.println(" [*] Waiting for messages.
   
    QueueingConsumer consumer = new QueueingConsumer(channel);
    channel.basicConsume(QUEUE_NAME, true, consumer);
    while (true) {
      QueueingConsumer.Delivery delivery = consumer.nextDelivery();
      String message = new String(delivery.getBody());
      System.out.println(" [x] Received '" + message + "'");
    }