Comet/WebSocket? Introducing the Atmosphere framework

Pushing messages to connected clients has always been a need on the web, growing with the apparition of new Rich Internet Applications, like realtime feeds (Gmail, news, market quotes), social feeds (The Twitter, Facebook and consort) and many other providing realtime collaboration, monitoring and control like new Internet of Things applications (FlightRadar24, Arduino #1, #2).

Comet technics (ie. polling, long-polling, streaming) and more recently, the WebSocket protocol, have made possible various webpush applications.

Today, when you want to enable realtime push on your own Java-based webapp, you have several solutions:

In this article, I will focus on Atmosphere. After a brief presentation of the framework, I will demonstrate how easy it is to make push-capable applications, whatever the container you use and the nature of your clients.

Before we start, you can take a look at the following video, that demonstrates what I achieved thanks to the Atmosphere framework (more details below, of course):


http://youtu.be/1Abv88t5igc

What is Atmosphere ?

Atmosphere is a Java OpenSource framework led by Jean-Francois Arcand. Started around 2009, it evolved significantly and powers today websites like smartmoney.com. The project is very active (daily commits, lively mailing list, 300 Github followers). Currently in version 0.9, Atmosphere is close to the major 1.0 release planned for May.

One of the strength of the framework is that it is container agnostic: you can deploy an Atmosphere application in Jetty, Tomcat, JBoss, Grizzly,… Indeed, Atmosphere will seamlessly use specific compatibility modules and enable WebSocket depending on the detected container. The user does not need to learn container-specific Comet or WebSocket implementations anymore, only the way Atmosphere runs.

Atmosphere takes advantage of asynchronous I/O assuming the container supports it (e.g. Tomcat APR Connector). Indeed, as realtime connections are all maintained server side (either with WebSocket or Comet), clients are permanently connected. If you don’t use advanced mechanisms, available threads in connection pool can run out very quickly. Here comes async I/O, that breaks the 1 client / 1 thread paradigm and allows one server to handle numerous connected clients asynchronously. Of course, if your container can’t run async I/O, Atmosphere will fallback with common blocking thread per request.

Atmosphere can run in two modes: embedded or standalone. The first one is the classic approach and the one you generally choose when you start from scratch. The second one, called Nettosphere, relies on the combination of the Netty container and the Atmosphere framework, allowing you to extend your existing Java webapp with realtime push capabilities in a non-intrusive way. Moreover, Nettosphere fits well when you need to do integration test on your realtime resources as you can instantiate it programmatically on demand, similarly to the Jersey-test-framework.

Usually, when you want to achieve standard comet (long-polling, streaming) communication, you don’t have to worry about the underneath container: they are compatible with Atmosphere. About WebSocket nonetheless, the compatibility will depend on the version of the WebSocket specification implemented by the container running Atmosphere. To this day, Jetty, Grizzly, Netty and even Tomcat are some WebSocket-capable containers.

A variety of modules and plugins are bundled with the framework, letting the user benefit from Atmosphere in various way: you are able to write Atmosphere applications in Java, Scala, JRuby and Groovy, and use other frameworks such as Jersey, GWT, Spring or JSF.

Another interesting thing: Atmosphere was built to scale thanks to clustering plugins. These plugins actually wrap a clustering layer implementation among JMS, Redis, XMPP or Hazelcast to name a few…  So if one server is not enough, you have tools to scale-out as needed.

Among all Atmosphere’s components, we find the jQuery plugin: this Javascript API connects your web pages to your server and offers methods to exchange realtime data. It can detect client’s capabilities and switch between protocols as necessary via a fallback mechanism. Therefore, the same user experience can be offered from IE6 to Chrome18.

From now on, we will focus on the Jersey’s Atmosphere extension, very convenient as it largely reduces lines to code thanks to JAX-RS implementation (annotations, json mapping, etc…).

Grab your keyboard, launch your Eclipse!

To demonstrate the use of Atmosphere, I chose to build an application that draws realtime events on a map: user can generate events by clicking on the map, and server can generate random events. A generated event (by either client or server) is delivered to each connected clients.

Thus, this application – called MapPush (I was not able to find anything more explicit!) – is composed of:

  • The client, a simple HTML page that uses Javascript/jQuery to process logic
  • The server, a JEE webapp backed by Atmosphere and Jersey frameworks

Note: the application is hosted on Github at the address http://github.com/ncolomer/MapPush. All the snippets below are extracted from this project. Feel free to browse the source code or clone the project!

Project bootstrap

In Eclipse, create a Maven project with a “webapp” archetype. As Atmosphere is available on Sonatype repositories (snapshotsreleases), you can simply add the following to your pom.xml :

<!-- Sonatype repositories -->
<repositories>
	<repository>
		<id>Sonatype snapshots</id>
		<url>https://oss.sonatype.org/content/repositories/snapshots</url>
	</repository>
	<repository>
		<id>Sonatype releases</id>
		<url>https://oss.sonatype.org/content/repositories/releases</url>
	</repository>
</repositories>

<!-- Dependencies -->
<dependencies>
	<!-- Atmosphere -->
	<dependency>
		<!-- Atmosphere's Comet Portable Runtime (CPR) -->
		<groupId>org.atmosphere</groupId>
		<artifactId>atmosphere-runtime</artifactId>
		<version>0.9.7</version>
	</dependency>
	<dependency>
		<!-- Atmosphere's Jersey module -->
		<!-- Transitivity will pull all necessary dependencies -->
		<!-- ie. Jersey 1.10 core, server, etc... -->
		<groupId>org.atmosphere</groupId>
		<artifactId>atmosphere-jersey</artifactId>
		<version>0.9.7</version>
	</dependency>
	<dependency>
		<!-- Atmosphere's jQuery plugin -->
		<groupId>org.atmosphere</groupId>
		<artifactId>atmosphere-jquery</artifactId>
		<version>0.9.7</version>
		<type>war</type>
	</dependency>
	<!-- Jersey's JSON mapper -->
	<dependency>
		<groupId>com.sun.jersey</groupId>
		<artifactId>jersey-json</artifactId>
		<version>1.10</version>
	</dependency>
</dependencies>

To start the framework we need a servlet, and not any… an AtmosphereServlet! In case the Atmosphere’s Jersey module is detected at load time, the framework will wrap around the Jersey Servlet (ContainerServlet) to load our resources and extend it with Atmosphere capabilities (IoC, annotations, etc…). Therefore, Jersey’s init-params are still available.

Open the web.xml file of your project and paste the following snippet:

<!-- Atmosphere -->
<servlet>
    <description>AtmosphereServlet</description>
    <servlet-name>AtmosphereServlet</servlet-name>
    <servlet-class>org.atmosphere.cpr.AtmosphereServlet</servlet-class>
    <init-param>
        <!-- Jersey base package of your resources -->
        <param-name>com.sun.jersey.config.property.packages</param-name>
        <param-value>org.mappush.resource</param-value>
    </init-param>
    <init-param>
        <!-- Enable Jersey's JSON mapping feature -->
        <param-name>com.sun.jersey.api.json.POJOMappingFeature</param-name>
        <param-value>true</param-value>
    </init-param>
    <load-on-startup>0</load-on-startup>
</servlet>
<servlet-mapping>
    <servlet-name>AtmosphereServlet</servlet-name>
    <url-pattern>/api/*</url-pattern>
</servlet-mapping>

We are now ready to code both client and server application.

The server

If you have already used the official JAX-RS implementation, Jersey, you won’t be disapointed. For others, you’ll find out that programming Atmosphere push APIs is really… dead simple!

Among the essential pieces of Atmosphere, one is called Broadcaster. This is the object that manages connected clients and delivers broadcasted messages to them. A connected client is seen as an AtmosphereResource by the framework.

In the push communication lifecycle, we can define three steps:

  • the client sends a request, that is suspended (in case of comet) or upgraded (in case of websocket) by the server.
  • then, server and client can exchange data : server generally broadcasts/pushes data to clients, and in case of a duplex protocol like WebSocket, client can send data back to it.
  • finally, either one closes the connection.

Atmosphere offers two convenient ways to handle this lifecycle: you can choose between annotation or programmatic API (or even a combination of both), the last one allowing more customization from my point of view.

  • To suspend a request, use the @Suspend annotation, or simply return a SuspendResponse
  • To broadcast data, use the  @Broadcast annotation and simply return a Broadcastable. For a more specific use of broadcast mechanism, you are able to manually deal with the Broadcaster and its broadcast(…) methods to push messages to all, a subset or a particular AtmosphereResource.

Here is an exemple of Atmosphere resource using the main concepts we have described above:

@Path("/")
@Singleton
public class EventResource {

	private final Logger logger = LoggerFactory.getLogger(EventResource.class);

	private EventListener listener;
	private EventGenerator generator;

	private @Context BroadcasterFactory bf;

	/**
	 * Programmatic way to get a Broadcaster instance
	 * @return the MapPush Broadcaster
	 */
	private Broadcaster getBroadcaster() {
		return bf.lookup(DefaultBroadcaster.class, "MapPush", true);
    }

	/**
	 * The @PostConstruct annotation makes this method executed by the
	 * container after this resource is instanciated. It is one way
	 * to initialize the Broadcaster (e.g. by adding some Filters)
	 */
	@PostConstruct
	public void init() {
		logger.info("Initializing EventResource");
		BroadcasterConfig config = getBroadcaster().getBroadcasterConfig();
		config.addFilter(new BoundsFilter());
		listener = new EventListener();
		generator = new EventGenerator(getBroadcaster(), 100);
	}

	/**
	 * When the client connects to this URI, the response is suspended or
	 * upgraded if both client and server arc WebSocket capable. A Broadcaster
	 * is affected to deliver future messages and manage the
	 * communication lifecycle.
	 * @param res the AtmosphereResource (injected by the container)
	 * @param bounds the bounds (extracted from header and deserialized)
	 * @return a SuspendResponse
	 */
	@GET
	@Produces(MediaType.APPLICATION_JSON)
	public SuspendResponse<String> connect(@Context AtmosphereResource res,
			@HeaderParam("X-Map-Bounds") Bounds bounds) {
		if (bounds != null) res.getRequest().setAttribute("bounds", bounds);
		return new SuspendResponse.SuspendResponseBuilder<String>()
				.broadcaster(getBroadcaster())
				.outputComments(true)
				.addListener(listener)
				.build();
	}

	/**
	 * This URI allows a client to send a new Event that will be broadcaster
	 * to all other connected clients.
	 * @param event the Event (deserialized from JSON by Jersey)
	 * @return a Response
	 */
	@POST
	@Path("event")
	@Consumes(MediaType.APPLICATION_JSON)
	public Response broadcastEvent(Event event) {
		logger.info("New event: {}", event);
		getBroadcaster().broadcast(event);
		return Response.ok().build();
	}

	// ...

}

Listen for client messages

In case of a duplex protocol, we said a client can send data back to the server using the connection. But how do we handle such messages? Atmosphere provides a listener mechanism, implementable via interfaces such as the AtmosphereResourceEventListener or its WebSocket specialization, the WebSocketEventListener. The WebSocket interface exposes useful methods like onConnectonHandshakeonDisconnect, or onMessage. Listeners are declared when the client connects (ie. when suspending a response).

public class EventListener extends WebSocketEventListenerAdapter {

	private final Logger logger = LoggerFactory.getLogger(EventListener.class);

	@Override
	public void onMessage(WebSocketEvent event) {
		Bounds bounds = JsonUtils.fromJson(event.message(), Bounds.class);
		if (bounds == null) return;
		logger.info("New bounds {} for resource {}",
				event.message(), event.webSocket().resource().hashCode());
		AtmosphereRequest req = event.webSocket().resource().getRequest();
		req.setAttribute("bounds", bounds);
	}

}

Filter your broadcasted messages

You may also have observed the init() method… but what is done inside exactly ?

A possibility offered by the framework is the ability to include logic before delivering message to each connected client. We can achieve that thanks to the BroadcastFilter interface and its specialization, the PerRequestBroadcastFilter interface. The first interface allows to execute logic once (common to all clients) whereas the second one can apply to each client according to their associated context (session data, first connection headers, etc…).

Each BroadcastFilter can CONTINUE or ABORT a broadcast. While the broadcast is not aborted, the filter chain is executed in order (BroadcastFilters first, PerRequestBroadcastFilters then) and finally delivered (or not) to each client. The following snippet is an example of PerRequestBroadcastFilter implementation:

public class BoundsFilter implements PerRequestBroadcastFilter {

    private final Logger logger = LoggerFactory.getLogger(BoundsFilter.class);

    @Override
    public BroadcastAction filter(Object originalMessage, Object message) {
        return new BroadcastAction(ACTION.CONTINUE, originalMessage);
    }

    @Override
    public BroadcastAction filter(AtmosphereResource res,
            Object originalMessage, Object message) {
        logger.info("BoundsFilter triggered for AtmosphereResource {} "+
				"with message {}", res.hashCode(), message);
        Event event = (Event) message;
        try {
            Bounds bounds = (Bounds) res.getRequest().getAttribute("bounds");
            if (bounds == null) throw new NoBoundsException("no bounds");
            if (bounds.contains(event)) {
                String json = JsonUtils.toJson(event); // Manual serialization
                return new BroadcastAction(ACTION.CONTINUE, json);
            } else {
                return new BroadcastAction(ACTION.ABORT, message);
            }
        } catch (NoBoundsException e) {
            logger.info("Applying default action CONTINUE, cause: {}",
					e.getMessage());
            String json = JsonUtils.toJson(event); // Manual serialization
            return new BroadcastAction(ACTION.CONTINUE, json);
        } catch (Exception e) {
            logger.info("Filter BoundsFilter aborted, cause: {}",
					e.getMessage());
            return new BroadcastAction(ACTION.ABORT, message);
        }
    }

}

The client

Now that all is ready server side, we are able to connect our clients to the realtime endpoint. If you plan to use WebSocket, several clients are compatible with Atmosphere. Let’s focus on Java and Javascript ones:

  • In java, you find several projects like Java-WebSocket or async-http-client. Some webapp containers also provide a WebSocket client implementation (e.g. Jetty)
  • In Javascript, most of recent browsers implement the WebSocket interface. But, in case the browser is not compatible with WebSocket, this API can’t fallback into another protocol. Here comes the jQuery Atmosphere Plugin and its fallback mechanism.

In our case, you probably guessed, we’ll use the Atmosphere jQuery plugin. Please note that due to the jQuery dependency, we have to import the jQuery library in addition to the Atmosphere plugin.

Once done, connect to the server is no more complicated than the following connect() javascript routine:

var endpoint;
function connect() {

    var callback = function callback(response) {
        // Websocket events.
        if (response.state == "opening") {
            console.log("Connected to realtime endpoint");
        } else if (response.state == "closed") {
            console.log("Disconnected from realtime endpoint");
        } else if (response.transport != 'polling' &&
                response.state == 'messageReceived') {
            if (response.status == 200) {
                var data = response.responseBody;
                if (data.length > 0) {
                    statsAgent.notify();
                    console.log("Message Received: " + data +
                            " & detected transport is " + response.transport);
                    var json = JSON.parse(data);
                    mapsAgent.drawEvent(json);
                }
            }
        }
    };

    var bounds = mapsAgent.getBounds();
    var header = bounds.southLat + "," + bounds.northLat +
            "," + bounds.westLng + "," + bounds.eastLng;
    endpoint = $.atmosphere.subscribe(url, callback, {
        transport: 'websocket',
		/* available transports: websocket, jsonp, long-polling,
			polling, streaming */
        attachHeadersAsQueryString: true,
        headers: {"X-Map-Bounds": header}
    });
}

You can see that we attach headers when connecting. They are used here to transmit some client context when connecting (actually, the current bounds of the map). As the WebSocket handshake doesn’t allow the client to pass any headers, they have to be serialized in the query string, justifying the attachHeadersAsQueryString: true entry. On the server side, the query string will be translated to headers so application wise, you don’t have to care about the difference.

To process message pushed from the server, we can add a callback. It will be triggered on each received message, passing a response (Javascript object) containing values as status, state, transport etc…

The endpoint variable – that stores the connection – was made global to be used when you want to disconnect the client from the realtime endpoint or push data to the server. The following snippet shows you the two corresponding javascript routines.

function disconnect() {
	$.atmosphere.unsubscribe();
	endpoint = null;
}

function update(bounds) {
	console.log("### Map bounds changed:", JSON.stringify(bounds));
	if (!endpoint) return;
	endpoint.push(JSON.stringify(bounds));
}

Testing your WebSocket resources

In addition to regular browser testing, you can easily try your realtime URIs with a shell and cURL:

curl -v -N -XGET http://localhost:8080/MapPush/api
  • -v/--verbose: Make the operation more talkative
  • -N/--no-buffer: Disable buffering of the output stream
  • -X/--request <command>: Specify request command to use

With the second option, you will be able to observe all data sent by the server. Nonetheless, note that you’ll not be able to send data back to it.

cURL also allows you to send headers with the request:

boundsHeader='48.0,49.0,2.0,3.0'
curl -v -N -XGET http://localhost:8080/MapPush/api -H "X-MAP-BOUNDS: $boundsHeader"
  • -H/--header <line>: Custom header to pass to server

You can also go deeper and analyse WebSocket frames with tools such as ngrep or Wireshark: these are a bit more complex tools but it may become very useful in some situations!

Conclusion

Atmophere provides a powerful ecosystem that simplifies the creation of push applications and makes easy full-duplex communication between a server and any kind of client. Its intensive use of async I/O and its ready-to-use clustering plugins give it both performance and scalability. Moreover, the project is under Open source Apache license, the community is growing quickly and the last 0.9 version is stabilizing fast.

In short, the Atmosphere framework has a bright future :)

Additional Resources

  • Atmosphere project is hosted on GitHub:
  • The community is reachable via Atmosphere’s Google Group
  • You can also follow Atmosphere via Jean-Francois Arcand’s blog and Twitter
Leave a comment ?

17 Comments.

  1. Thanks for a great writeup on Atmosphere. Have you tried if it works on IE?

  2. Hey thanks for the excellent writeup and providing the code on GitHub. I wish all software was this easy to use :)

  3. It seems like you are having fun with this framework. I might use it for my project. Talk to you later. ;)

  4. The framework seems to be nice but I cannot build your maven project. It says: “org/mappush/resource/EventResource.java:[100,4] error: cannot access JResponse” :(

  5. Great tutorial.
    I have one question, and is if you are sure that events out of client’s bounds are not received. I opened the console both in Firefox and Chrome and they where receiving all the events. The app is deployed in tomcat 6 (AFAIK supports streaming).

    • Hi Jon, thanks for your message!
      An event that is triggered in one client will be received by another client only if the fired event is in its map bounds. To reproduce this behavior, open two clients, zoom in France in one of them, and trigger two events (e.g. USA and France) in the other one. The first client should receive only the event triggered on France.
      You can take a look at the sendMessage_withBoundsHeader_shouldBroadcast() JUnit test case that checks this behavior.

  6. Hey, thanks for this tutorial. I tried to run this on GlassFish B23, I had to change the script.js line 163 to socket = $.atmosphere.subscribe(apiUrl, null, request); but when connecting, this error is sent : GRAVE: 14840 [http-thread-pool-8080(1)] WARN org.atmosphere.container.GlassFishWebSocketSupport – failed to connect to web socket…but WebSocket are enabled in my Glassfish (checked in domain.xml)…any idea ?

    • Hi Yannick,
      I’m surprised you had to change the line #163 of script.js: according to the 0.9 migration note (see this article), the right method signature is now socket.subscribe(request).
      I also checked the Atmosphere / GlassFish compatibility: the atmosphere-runtime dependency pulls by default the atmoshpere-compat-tomcat7 jar (see this thread), so this should not be the issue.

  7. Hi Nicolas,

    Thanks for sharing your work.

    I’m experiencing some issue, Just wondering if you can help:

    I have deployed the application on jetty server 8. All works fine. No issues.
    Now I’m trying to deploy the same app on Tomcat 7.0.29. When i try to press the Connect button on the UI, in the firebug I can see the following:
    Message Received using websocket: java.lang.NumberFormatException: empty String
    scripts.js (line 156)

    Tomcat logs has the following:
    23:50:02.833 [http-bio-8080-exec-6] WARN o.a.w.protocol.SimpleHttpProtocol – St
    atus code higher than 400 Status 400 Message OK
    23:50:02.833 [http-bio-8080-exec-6] WARN o.a.w.protocol.SimpleHttpProtocol – No
    AtmosphereResource has been suspended. The WebSocket will be closed: /MapPush/
    api Status 400 Message OK

    Can you please advise?
    Thanks
    JP

    • Hi JP, thanks for your feedback!
      According to a recent @jfarcand’s tweet, Tomcat 7.0.29 release brings some backward incompatibilities on its WebSocket API (WebSocket implementation is still new in Tomcat). Hopefully, the latest version of Atmosphere handles this perfectly.
      After succeeding in reproduce the problem, I updated the Atmosphere dependency from 0.9.1 to 0.9.7 and finally made it run as expected. Just git pull and redeploy on your Tomcat ;)

  8. I try to deploy to tomcat 7.0.27 and tomcat7.0.29 both FF/Chrome work fine, but IE 8 & 9 does not work. IE sends message back to server, but IE could not get back message from server. IE sends message to server, and server can send message to FF/Chrome, but not IE.

    • Hi Frank,
      I succeed to reproduce the issue and get some NPE and ISE server-side using IE. However, as mentioned in this wiki page, Tomcat 7.0.27+ is not yet fully supported by Atmosphere.
      This is due to the youth (and related issues) of the Tomcat’s WebSocket implementation. Please refer to the link above to get some workarounds.

  9. Hi,Nicolas

    I’m a student from Taiwan, I’m very newbie in Atmosphere and glassfish

    just want to say thank you for this tutorial.

    and that’s one more thing I want to ask, I tried to run this on GlassFishv3
    could you give me some advice? it seems like need some maven plugin to use glassfish?

    it will be very appreciated if you can help!

    thanks

    • Hi Arvin,
      Just git clone the project and run mvn package to generate a war file (You need both Git and Maven).
      I never used GlassFish but I suppose you just have to put the generated war somewhere in the GlassFish’s hierarchy (as the webapps/ directory for Tomcat).
      If you are looking for a simpler way, just run mvn jetty:run as described in the readme to start a Jetty container with a running MapPush instance.

Leave a Comment