Non-Blocking Socket I/O in JDK 1.4

6/13/2006 - I wrote this tutorial in the early days of NIO in the JSE. NIO is fairly tricky to deal with and so now I recommend that people use MINA from the Apache project. I leave this tutorial here for historical reasons. Best regards, Tim.

By Tim Burns

Last changed: 12/14/2001 7:11 AM

 

Non-Blocking Socket I/O in JDK 1.4

Selectors and Channels

Serving Sockets with Keys

Writing to a Non-Blocking Socket

Reading From a Non-Blocking Socket

The Callback Pattern

Context

Problem

Solution

Conclusions

Thanks

References

Source Code

 

JDK 1.4 provides developers with non-blocking I/O on sockets, alongside new channel-based file I/O (file channels themselves always block).  For Java network programmers, non-blocking I/O is very exciting, because it makes writing scalable, portable socket applications simpler.

 

Previously, Java programmers would have to deal with multiple socket connections by starting a thread for each connection.  Inevitably, they would encounter issues such as operating system limits, deadlocks, or thread safety violations.  Now, the developer can use selectors to manage multiple simultaneous socket connections on a single thread.   This article deals with how to write a simple non-blocking server using the new I/O in JDK 1.4 for non-blocking sockets. 

Selectors and Channels

Non-blocking I/O is built around the Selector object, which multiplexes selectable channels.  The Selector maintains a set of keys, one for each channel registered with it, and a set of selected keys whose channels are ready for I/O at a given time.  Keys are added by registering a channel with the selector and removed by cancelling the key or closing its channel.  Each client connection has its own key, as does the server channel that accepts connections.  The selector itself manages the keys, and the programmer uses the key state to manage callbacks that perform individual client requests.

 

A Selector object can be created using itself as a factory or using a SelectorProvider factory.  The external SelectorProvider factory is available because programmers may want to take advantage of existing proprietary non-blocking socket libraries for higher or tuned performance.

 

The simplest way to create a selector is the following command:

 

            Selector selector = Selector.open();

 

Similarly, one can use the SelectorProvider factory:

 

            Selector selector = SelectorProvider.provider().openSelector();
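
The two factory calls above can be compared directly. The following is a minimal sketch, written for a current JDK rather than JDK 1.4; the class name SelectorFactorySketch and the method usesDefaultProvider are hypothetical names chosen for illustration. It checks that a selector created with Selector.open() reports the system-wide default provider as its creator.

```java
import java.io.IOException;
import java.nio.channels.Selector;
import java.nio.channels.spi.SelectorProvider;

public class SelectorFactorySketch {

    // Returns true when a selector created by Selector.open() reports the
    // system-wide default provider as the provider that created it.
    static boolean usesDefaultProvider() throws IOException {
        Selector viaOpen = Selector.open();
        try {
            return viaOpen.provider() == SelectorProvider.provider();
        } finally {
            viaOpen.close();
        }
    }

    public static void main(String[] args) throws IOException {
        // The explicit route through the provider factory.
        Selector viaProvider = SelectorProvider.provider().openSelector();
        System.out.println("open: " + viaProvider.isOpen());
        viaProvider.close();

        System.out.println("default provider: " + usesDefaultProvider());
    }
}
```

A custom provider would only matter if the default one were replaced, so for most programs the shorter Selector.open() form should be sufficient.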

 

To demultiplex incoming data, create a channel and register that channel with the Selector.  Initially, one should configure a ServerSocketChannel to accept new connections by binding it to a port on the local host.

 

      ServerSocketChannel channel = ServerSocketChannel.open();

      channel.configureBlocking(false);

      InetAddress lh = InetAddress.getLocalHost();

      InetSocketAddress isa = new InetSocketAddress(lh, port );

      channel.socket().bind(isa);

 

 

Channels should be registered according to the task they will perform in the program.  For instance, the channel that accepts new connections should be registered as follows:

 

SelectionKey acceptKey = channel.register( selector,
SelectionKey.OP_ACCEPT );

 

A SocketChannel that reads and writes data (a ServerSocketChannel supports only OP_ACCEPT) will be registered as follows:

SelectionKey readWriteKey = channel.register( selector,
    SelectionKey.OP_READ | SelectionKey.OP_WRITE );
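
Which operations a channel may be registered for depends on the channel type. The sketch below, assuming a current JDK and using hypothetical names (RegisterSketch, openServer, rejectsReadInterest), registers a server channel for OP_ACCEPT and shows that the same channel refuses a registration for OP_READ. It binds to an ephemeral loopback port instead of the article's fixed port so that it can run anywhere.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

public class RegisterSketch {

    // Opens a non-blocking server channel bound to an ephemeral loopback port.
    static ServerSocketChannel openServer() throws IOException {
        ServerSocketChannel server = ServerSocketChannel.open();
        server.configureBlocking(false);
        server.socket().bind(
            new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
        return server;
    }

    // Returns true when registering the server channel for OP_READ is
    // rejected because OP_READ is not among its valid operations.
    static boolean rejectsReadInterest(Selector selector,
                                       ServerSocketChannel server)
            throws IOException {
        try {
            server.register(selector, SelectionKey.OP_READ);
            return false;
        } catch (IllegalArgumentException expected) {
            return true;
        }
    }

    public static void main(String[] args) throws IOException {
        Selector selector = Selector.open();
        ServerSocketChannel server = openServer();
        SelectionKey acceptKey = server.register(selector, SelectionKey.OP_ACCEPT);

        System.out.println("interest is OP_ACCEPT: "
            + (acceptKey.interestOps() == SelectionKey.OP_ACCEPT));
        System.out.println("OP_READ rejected: "
            + rejectsReadInterest(selector, server));

        server.close();
        selector.close();
    }
}
```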

 

The selector works by returning keys when a client sends a request.  Generally, one would select in a loop.

 

 

      int keysAdded;

      while ((keysAdded = selector.select()) > 0) {

          Set readyKeys = selector.selectedKeys();

          Iterator i = readyKeys.iterator();

          // Walk through the ready keys collection and process
          // requests.

          while (i.hasNext()) {

            SelectionKey sk = (SelectionKey)i.next();

            // Remove the key from the selected set; the selector
            // never clears it for us.
            i.remove();

            // … (Accept the connection and process the request)

          }

      }
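
To see the loop run end to end, here is a minimal sketch, assuming a current JDK (it uses generics, which JDK 1.4 lacks). The class SelectLoopSketch and its method acceptOne are hypothetical. A client in the same thread connects over the loopback interface, and the loop runs until the selector has reported that connection as acceptable. Unlike the fragment above, the sketch does not stop when select() returns zero; it simply selects again.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

public class SelectLoopSketch {

    // Accepts exactly one loopback connection through a selector and
    // returns the number of connections accepted.
    static int acceptOne() throws IOException {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        Selector selector = Selector.open();

        ServerSocketChannel server = ServerSocketChannel.open();
        server.configureBlocking(false);
        server.socket().bind(new InetSocketAddress(loopback, 0));
        server.register(selector, SelectionKey.OP_ACCEPT);

        // A blocking client connect; the connection waits in the server's
        // backlog until the selection loop accepts it.
        SocketChannel client = SocketChannel.open(
            new InetSocketAddress(loopback, server.socket().getLocalPort()));

        int accepted = 0;
        while (accepted == 0) {
            selector.select();
            Iterator<SelectionKey> i = selector.selectedKeys().iterator();
            while (i.hasNext()) {
                SelectionKey key = i.next();
                i.remove(); // the selector does not clear the selected set
                if (key.isAcceptable()) {
                    ServerSocketChannel ready = (ServerSocketChannel) key.channel();
                    SocketChannel peer = ready.accept();
                    if (peer != null) { // null if nothing is pending after all
                        accepted++;
                        peer.close();
                    }
                }
            }
        }

        client.close();
        server.close();
        selector.close();
        return accepted;
    }

    public static void main(String[] args) throws IOException {
        System.out.println("accepted connections: " + acceptOne());
    }
}
```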

 

Serving Sockets with Keys

The key in the previous example will be acceptable, readable, writable, or connectable.  The corresponding boolean checks are key.isAcceptable(), key.isReadable(), key.isWritable(), and key.isConnectable().  On a new connection from a client, the key returned by the selector is an acceptable key.  On subsequent I/O events, the selector returns keys that are readable or writable, according to the event.

 

On a new connection the key’s isAcceptable() returns true.  During the connection we will have to read from and write to that socket, but we can’t write until the key reports isWritable() and we can’t read until it reports isReadable().  In the meantime, we attach the SocketChannel to the key so we can pick it up on the callback.

 

                if ( key.isAcceptable() ) {

                    ServerSocketChannel nextReady =

                        (ServerSocketChannel)key.channel();

                    SocketChannel channel = nextReady.accept();

                    channel.configureBlocking( false );

                    SelectionKey readKey =

                        channel.register( this.selector,

                            SelectionKey.OP_READ|SelectionKey.OP_WRITE  );

                    readKey.attach( channel );

                    this.callback.put( channel, new ChannelCallback( channel ) );

                }

 

 

After the accept() operation the new channel is registered with the selector, and once its key becomes writable we can send the greeting.  In the main selection loop the selector returns one key for the incoming connection and another once the new channel is writable.

 

        while (( this.keysAdded = acceptKey.selector().select()) > 0 ) {

            Set readyKeys = this.selector.selectedKeys();

            Iterator i = readyKeys.iterator();

            while (i.hasNext()) {

                SelectionKey key = (SelectionKey)i.next();

                i.remove();

                if ( key.isAcceptable() ) {

                     // … (Accept the socket channel)

                }

                else if ( key.isWritable() ) {

                     // … (Write to the socket channel)

                }

            }

        }

 

When the client sends a request to the server, the Selector returns a key with isReadable() true.  There is new data pending, so we should process it.  Note that during the accept process we attached the SocketChannel to the key, so we can retrieve it again for reading and writing.  A connected socket is normally writable whenever its send buffer has room, so in this case we don’t need to check whether the socket is writable before replying.

 

                else if ( key.isReadable() ) {

                    SelectableChannel nextReady =

                        (SelectableChannel) key.channel();

                    (… Read and write stuff to and from the socket)

                }

 

Writing to a Non-Blocking Socket

Writing to a non-blocking socket is a little tricky.  Here is what the naïve programmer might do:

 

                else if ( key.isWritable() ) {

                    Socket socket = (Socket) key.attachment();

                    PrintWriter out =

                            new PrintWriter( socket.getOutputStream(), true );

                    out.println( "What is your name? " );

                }

 

The problem with this code is that PrintWriter and the other java.io stream classes are blocking APIs and do not support the underlying non-blocking I/O mechanisms; the streams of a socket whose channel is in non-blocking mode throw IllegalBlockingModeException when used.  To deal with this problem, we cannot use any of the standard I/O utilities, but instead must wrap our message in a ByteBuffer object and send it through the SocketChannel object.


                else if ( key.isWritable() ) {

                    SocketChannel channel = (SocketChannel) key.attachment();

                    String message = "What is your name? ";

                    ByteBuffer buf = ByteBuffer.wrap( message.getBytes() );

                    int nbytes = channel.write( buf );

                }
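
A non-blocking write() may send only part of the buffer, and a channel left registered for OP_WRITE is reported as writable on almost every pass through the selection loop. One way to handle both points is sketched below, assuming a current JDK. The class WriteSketch and its methods writePending and demo are hypothetical helpers, not part of the server in this article. The helper keeps OP_WRITE in the key's interest set only while unsent bytes remain.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

public class WriteSketch {

    // Writes as much of buf as the socket accepts right now. Returns true
    // when the buffer is fully sent; otherwise leaves OP_WRITE in the
    // interest set so the selector reports when more can be written.
    static boolean writePending(SelectionKey key, ByteBuffer buf)
            throws IOException {
        SocketChannel channel = (SocketChannel) key.channel();
        channel.write(buf);
        if (buf.hasRemaining()) {
            key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
            return false;
        }
        key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
        return true;
    }

    // Sends the greeting over a loopback connection and returns the
    // key's interest set afterwards.
    static int demo() throws IOException {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        Selector selector = Selector.open();
        ServerSocketChannel server = ServerSocketChannel.open();
        server.socket().bind(new InetSocketAddress(loopback, 0));

        SocketChannel client = SocketChannel.open(
            new InetSocketAddress(loopback, server.socket().getLocalPort()));
        // The server channel is still blocking here, so accept() waits
        // for the connection made above.
        SocketChannel peer = server.accept();
        peer.configureBlocking(false);
        SelectionKey key = peer.register(selector,
            SelectionKey.OP_READ | SelectionKey.OP_WRITE);

        ByteBuffer greeting =
            ByteBuffer.wrap("What is your name? ".getBytes("US-ASCII"));
        writePending(key, greeting);
        int ops = key.interestOps();

        peer.close();
        client.close();
        server.close();
        selector.close();
        return ops;
    }

    public static void main(String[] args) throws IOException {
        System.out.println("only OP_READ left: "
            + (demo() == SelectionKey.OP_READ));
    }
}
```

When writePending returns false, the caller would keep the buffer (for example in the key's attachment) and call the helper again the next time the key is writable.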

 

Reading From a Non-Blocking Socket

Non-blocking socket reads also rely on the ByteBuffer.  Since we don’t know how much data we will receive, it is a bit more sophisticated than writing to a socket. 

 

        ByteBuffer byteBuffer = ByteBuffer.allocate( BUFSIZE );

        int nbytes = channel.read( byteBuffer );

 

When the channel has been read, the byteBuffer is not yet ready for decoding.  You need to fix the limit at the current position, and then set the current position to zero before decoding the message.

 

     byteBuffer.flip();
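
The effect of flip() is easiest to see by printing the buffer state before and after the call. This small sketch uses put() to stand in for a channel read that stored two bytes; FlipSketch and state are hypothetical names.

```java
import java.nio.ByteBuffer;

public class FlipSketch {

    // Formats the three indices that describe a buffer's state.
    static String state(ByteBuffer b) {
        return "position=" + b.position()
            + " limit=" + b.limit()
            + " capacity=" + b.capacity();
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(8);

        // Stand-in for channel.read( buf ) having stored two bytes.
        buf.put((byte) 'h').put((byte) 'i');
        System.out.println(state(buf)); // position=2 limit=8 capacity=8

        // flip() sets the limit to the current position and the
        // position to zero, so exactly the bytes just stored are readable.
        buf.flip();
        System.out.println(state(buf)); // position=0 limit=2 capacity=8
    }
}
```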

 

Extracting the data from the ByteBuffer requires the package java.nio.charset.*.  This package has three abstract classes: Charset, CharsetDecoder, and CharsetEncoder.  Generally, one will decode ByteBuffer messages using CharsetDecoder.

 

        Charset charset = Charset.forName( "us-ascii" );

        CharsetDecoder decoder = charset.newDecoder();

        CharBuffer charBuffer = decoder.decode( byteBuffer );

        String result = charBuffer.toString();
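
Decoding each read on its own works for US-ASCII, where every byte is one character. With a multi-byte charset such as UTF-8, a character can be split across two reads. The sketch below shows one way to carry the leftover bytes forward, using the three-argument CharsetDecoder.decode method. The class ChunkDecoder, its 64-byte carry-over buffer, and the assumption that every chunk fits in that buffer are illustrative choices, not part of the server in this article.

```java
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CoderResult;

public class ChunkDecoder {

    private final CharsetDecoder decoder;

    // Bytes carried over between reads; 64 is an arbitrary size chosen
    // for this sketch.
    private final ByteBuffer pending = ByteBuffer.allocate(64);

    public ChunkDecoder(String charsetName) {
        this.decoder = Charset.forName(charsetName).newDecoder();
    }

    // Decodes one chunk that is already flipped for reading. Bytes that
    // end in the middle of a character stay in 'pending' until the next
    // chunk arrives. Assumes the chunk fits in the space left in 'pending'.
    public String decode(ByteBuffer chunk) throws CharacterCodingException {
        pending.put(chunk);
        pending.flip();

        int maxChars =
            (int) Math.ceil(pending.remaining() * decoder.maxCharsPerByte());
        CharBuffer out = CharBuffer.allocate(maxChars);

        // 'false' tells the decoder that more input may follow, so an
        // incomplete trailing character is left unread instead of failing.
        CoderResult result = decoder.decode(pending, out, false);
        if (result.isError()) {
            result.throwException();
        }

        pending.compact(); // keep the unread bytes for the next call
        out.flip();
        return out.toString();
    }

    public static void main(String[] args) throws CharacterCodingException {
        ChunkDecoder utf8 = new ChunkDecoder("UTF-8");
        // The two bytes of U+00E9 arrive in separate chunks.
        String first = utf8.decode(ByteBuffer.wrap(new byte[] { 'a', (byte) 0xC3 }));
        String second = utf8.decode(ByteBuffer.wrap(new byte[] { (byte) 0xA9, 'b' }));
        System.out.println(first.length() + " char, then " + second.length() + " chars");
    }
}
```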

 

The Callback Pattern

Context

The context of the pattern is that we have multiple reads coming in from multiple sources. 

Problem

We cannot execute a command until we get a complete message.  Messages may be arriving from multiple clients in any order.  We need to complete the messages and execute a command when the messages are complete.  The design should be simple.

Solution

Attach a ChannelCallback object to every unique socket.  The callback holds the data read so far and can service an append(String) message and an execute() message.  Append the result of each read to the object every time a new ByteBuffer is received.  When a read completes a message, execute the callback.
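
The pattern can be tried without any sockets. The sketch below, assuming a current JDK, uses a hypothetical LineCallback class as a stand-in for ChannelCallback: instead of writing to a channel, execute() records the completed line in a list. As a simplification it treats a message as complete once any newline has arrived, and it does not split a chunk that contains a newline followed by more data.

```java
import java.util.ArrayList;
import java.util.List;

public class CallbackSketch {

    // Hypothetical stand-in for ChannelCallback, without the channel.
    static class LineCallback {
        private final StringBuilder buffer = new StringBuilder();
        final List<String> executed = new ArrayList<String>();

        // Adds the result of one read to the pending message.
        void append(String values) {
            buffer.append(values);
        }

        // A message is complete once a newline has arrived.
        boolean isComplete() {
            return buffer.indexOf("\n") >= 0;
        }

        // Runs the command; here it only records the line and resets.
        void execute() {
            executed.add(buffer.toString().trim());
            buffer.setLength(0);
        }
    }

    // Feeds chunks, as separate reads would deliver them, to one callback
    // and returns the messages that were executed.
    static List<String> feed(String... chunks) {
        LineCallback callback = new LineCallback();
        for (String chunk : chunks) {
            callback.append(chunk);
            if (callback.isComplete()) {
                callback.execute();
            }
        }
        return callback.executed;
    }

    public static void main(String[] args) {
        // Two messages, each split across two reads.
        System.out.println(feed("hel", "lo\n", "wor", "ld\n")); // [hello, world]
    }
}
```

In the server, one such object per connection is stored as the key's attachment, so chunks from different clients never mix.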

Conclusions

Non-blocking I/O is a very exciting development in Java.  It will make writing portable, scalable server-side applications in Java simpler.  To use it, you need the package java.nio.* and its subpackages.  The java.nio.* package contains the buffers needed to marshal the data in and out of the channel, the java.nio.channels.* package contains the objects needed to perform the I/O operations, and the java.nio.charset.* package is needed to translate byte data to and from specified character sets such as US-ASCII and UTF-8.

Thanks

Thanks to members of the Saint Louis Developers Roundtable, especially Brian Button, and Edwin van der Elst.

 

References

[Sun01] Sun Microsystems, “New I/O APIs”, http://java.sun.com/j2se/1.4/docs/guide/nio.

[Lea99] Lea, Doug, Concurrent Programming in Java, Addison Wesley, 1999.

[Ous96] Ousterhout, John, “Why Threads Are a Bad Idea (for Most Purposes)”, USENIX Technical Conference, 1996.

[Ren98] Renesse, Robbert van, “Goal-Oriented Programming, or Composition Using Events, or Threads Considered Harmful”, ACM SIGOPS 98.

[Ver96] Vermeulen, Allan, “An Asynchronous Design Pattern”, Dr. Dobb’s Journal, June 1996.

 

Source Code

The source code here is derived from Sun’s example server [Sun01].

package com.owlmountain.concurrent;

import java.io.*;
import java.nio.*;
import java.nio.channels.*;
import java.nio.channels.spi.*;
import java.nio.charset.*;
import java.net.*;
import java.util.*;
import org.apache.log4j.*;

public class NonBlockingServer {

    int port = 4001;
    Selector selector = null;
    ServerSocketChannel selectableChannel = null;
    int keysAdded = 0;

    static Category log =
        Category.getInstance(NonBlockingServer.class.getName());

    static String QUIT_SERVER = "quit";
    static String SHUTDOWN = "shutdown";

    public NonBlockingServer() {
    }

    public NonBlockingServer( int port ) {
        this.port = port;
    }

    public void initialize()

    throws IOException {
        this.selector = SelectorProvider.provider().openSelector();
        this.selectableChannel = ServerSocketChannel.open();
        this.selectableChannel.configureBlocking(false);
        InetAddress lh = InetAddress.getLocalHost();
        InetSocketAddress isa = new InetSocketAddress(lh, this.port );
        this.selectableChannel.socket().bind(isa);
    }

    public void finalize()

    throws IOException {
        this.selectableChannel.close();
        this.selector.close();
    }

    public void acceptConnections()
    throws IOException, InterruptedException {

      SelectionKey acceptKey =
            this.selectableChannel.register( this.selector,
                SelectionKey.OP_ACCEPT );

        log.debug( "Acceptor loop..." );
        while (( this.keysAdded = acceptKey.selector().select()) > 0 ) {

            log.debug( "Selector returned "
                + this.keysAdded + " ready for IO operations" );

            Set readyKeys = this.selector.selectedKeys();
            Iterator i = readyKeys.iterator();
 
            while (i.hasNext()) {
                SelectionKey key = (SelectionKey)i.next();
                i.remove();

                if ( key.isAcceptable() ) {
                    ServerSocketChannel nextReady =
                        (ServerSocketChannel)key.channel();

                    log.debug( "Processing selection key read="
                        + key.isReadable() + " write=" + key.isWritable() +
                        " accept=" + key.isAcceptable() );

                    SocketChannel channel = nextReady.accept();
                    channel.configureBlocking( false );
                    SelectionKey readKey =
                        channel.register( this.selector,
                            SelectionKey.OP_READ|SelectionKey.OP_WRITE  );
                    readKey.attach( new ChannelCallback( channel ) );
                }

                else if ( key.isReadable() ) {
                    SelectableChannel nextReady =
                        (SelectableChannel) key.channel();
                    log.debug( "Processing selection key read="
                        + key.isReadable() + " write=" + key.isWritable() +
                        " accept=" + key.isAcceptable() );
                    this.readMessage( (ChannelCallback)  key.attachment() );
                }

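                else if ( key.isWritable() ) {
                    ChannelCallback callback = (ChannelCallback) key.attachment();
                    String message = "What is your name? ";
                    ByteBuffer buf = ByteBuffer.wrap( message.getBytes() );
                    int nbytes = callback.getChannel().write( buf );
                    // Send the greeting only once: a connected socket is
                    // almost always writable, so leaving OP_WRITE set would
                    // make select() return immediately on every pass.
                    key.interestOps( SelectionKey.OP_READ );
                }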
            }
        }

        log.debug( "End acceptor loop..." );

    }

 

    public void writeMessage( SocketChannel channel, String message )
    throws IOException {
        ByteBuffer buf = ByteBuffer.wrap( message.getBytes()  );
        int nbytes = channel.write( buf );
        log.debug( "Wrote " + nbytes + " to channel." );
    }

 

    static final int BUFSIZE = 8;

    public String decode( ByteBuffer byteBuffer )
    throws CharacterCodingException {
        Charset charset = Charset.forName( "us-ascii" );
        CharsetDecoder decoder = charset.newDecoder();
        CharBuffer charBuffer = decoder.decode( byteBuffer );
        String result = charBuffer.toString();
        return result;
    }

    public void readMessage( ChannelCallback callback )
    throws IOException, InterruptedException {
        ByteBuffer byteBuffer = ByteBuffer.allocate( BUFSIZE );
        int nbytes = callback.getChannel().read( byteBuffer );
        // read() returns -1 once the client has closed the connection.
        if ( nbytes < 0 ) {
            callback.getChannel().close();
            return;
        }
        byteBuffer.flip();
        String result = this.decode( byteBuffer );
        log.debug( result );
        if ( result.indexOf( QUIT_SERVER ) >= 0 ) callback.getChannel().close();
        else if ( result.indexOf( SHUTDOWN ) >= 0 ) {
            callback.getChannel().close();
            throw new InterruptedException();
        }
        else {
            callback.append( result );
            //If we are done with the line then we execute the callback.
            if ( result.indexOf( "\n" ) >= 0 )
                callback.execute();
        }
    }

    public class ChannelCallback {
        private SocketChannel channel;
        private StringBuffer buffer;

        public ChannelCallback( SocketChannel channel ) {
            this.channel = channel;
            this.buffer = new StringBuffer();
        }

        public void execute() throws IOException {
            log.debug( this.buffer.toString() );
            writeMessage( this.channel, this.buffer.toString() );
            buffer = new StringBuffer();
        }

        public SocketChannel getChannel() {
            return this.channel;
        }

        public void append( String values ) {
            buffer.append( values );
        }

    }

 

    public static void main( String[] args ) {
        BasicConfigurator.configure();

        NonBlockingServer nbServer = new NonBlockingServer();

        try {
            nbServer.initialize();
        } catch ( IOException e ) {
            e.printStackTrace();
            System.exit( -1 );
        }

        try {
            nbServer.acceptConnections();
        }

        catch ( IOException e ) {
            e.printStackTrace();
            log.error( e );
        }

        catch ( InterruptedException e ) {
            log.info( "Exiting normally..." );
        }

    }

}