Monday, December 22, 2014

Getting started: Cassandra + Spark with Vagrant

I play with a lot of different technologies and I like to keep my workstations clean. I do this by having a lot of Vagrant VMs. My latest is Apache Spark with Apache Cassandra. We're going to install a working setup of Cassandra/Spark using Vagrant and Ansible. The Vagrant and Ansible files are on GitHub here.

To get going you'll need Vagrant and Ansible installed. If you haven't used Ansible before, ignore the paid-for Ansible Tower and install Ansible with your favourite package manager, e.g. Homebrew or apt.

Once that's installed, check out the Vagrantfile.

Then launch the VM with "vagrant up". This can take some time as it actually installs:
  • Java
  • Cassandra
  • Spark
  • Spark Cassandra connector
I could have baked a VirtualBox image with all of this in, but the Ansible playbook also documents how to install each of these (for you, and for me once I've forgotten). As well as being slow, this approach has the disadvantage that it downloads Cassandra and Spark, so if their repositories are down it won't work.

The VM runs on port:

Your Spark master should be up and running on:

You'll also have OpsCenter installed at:

To add the cluster, simply click "Add existing cluster..." and then enter the IP.

If you want to use cqlsh, simply "vagrant ssh" in and run "cqlsh".

To get the Spark shell up and running, just "vagrant ssh" in and run the spark-shell command:

The spark-shell command has been aliased to include the Spark Cassandra connector, so you can start using Cassandra-backed RDDs right away!

Any questions or problems, just ping me on Twitter: @chbatey

Monday, December 8, 2014

Streaming large payloads over HTTP from Cassandra with a small heap: Java Driver + RxJava

Cassandra's normal use case is a vast number of small read and write operations distributed evenly across the cluster.

However, every so often you need to extract a large quantity of data in a single query. For example, say you are storing customer events that you normally query in small slices, but once a day you want to extract all of them for a particular customer. For customers with a lot of events, this could be many hundreds of megabytes of data.

If you want to write a Java application that executes this query against a version of Cassandra prior to 2.0, you may run into some issues. Let's look at the first one:

Coordinator out of memory:

Versions of Cassandra prior to 2.0 brought all of the rows back to the coordinator before sending them to your application, so if the result was too large for the coordinator's heap, it would run out of memory.

Even if you had just enough memory in the coordinator for the result, you then ran the risk of...

Application out of memory:

To get around this, you had to implement your own paging: split the query into many small queries and process the results in batches. This can be achieved by limiting the number of rows each query returns and starting each subsequent query after the last row returned by the previous one.
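Here's a rough sketch of that manual paging loop in plain Java, with no driver involved. It's a hypothetical stand-in: a TreeMap plays the part of a single partition sorted by a clustering column (an event time), and fetchPage simulates a query of the form SELECT ... WHERE customer_id = ? AND time > ? LIMIT ?. The class and method names are made up for illustration; a real version would issue CQL through the driver.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

class ManualPaging {

    // Simulates "SELECT ... WHERE customer_id = ? AND time > ? LIMIT ?" against a
    // hypothetical partition whose rows are sorted by a clustering column (event time).
    // A null 'after' means "start from the first row".
    static List<Map.Entry<Long, String>> fetchPage(NavigableMap<Long, String> partition,
                                                   Long after, int limit) {
        NavigableMap<Long, String> remaining =
                after == null ? partition : partition.tailMap(after, false);
        List<Map.Entry<Long, String>> page = new ArrayList<>(limit);
        for (Map.Entry<Long, String> row : remaining.entrySet()) {
            if (page.size() == limit) {
                break;
            }
            page.add(row);
        }
        return page;
    }

    // Reads the whole partition one page at a time, so only a single page is held
    // in memory. Returns the number of rows processed.
    static int processAll(NavigableMap<Long, String> partition, int pageSize) {
        Long lastSeen = null;
        int processed = 0;
        while (true) {
            List<Map.Entry<Long, String>> page = fetchPage(partition, lastSeen, pageSize);
            processed += page.size(); // a real application would stream these rows out
            if (page.size() < pageSize) {
                return processed; // a short (or empty) page means there are no more rows
            }
            lastSeen = page.get(page.size() - 1).getKey(); // next query starts after this row
        }
    }

    public static void main(String[] args) {
        NavigableMap<Long, String> partition = new TreeMap<>();
        for (long time = 1; time <= 10; time++) {
            partition.put(time, "event-" + time);
        }
        System.out.println(processAll(partition, 3)); // prints 10 (pages of 3, 3, 3 and 1)
    }
}
```

Note the loop stops on a short page; if the last page happens to be exactly full, one extra query comes back empty, which is the usual price of this style of paging.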

If your application streams the results over HTTP, the architecture could look something like this:

Here we place some kind of queue, say an ArrayBlockingQueue if using Java, between the thread executing the queries and the thread streaming the results out over HTTP. If the queue fills up, the DAO thread blocks, meaning it won't bring back any more rows from Cassandra. If the DAO falls behind, the web thread (perhaps a Tomcat thread) blocks waiting for more rows to arrive in the queue. This works very nicely with JAX-RS StreamingOutput.
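A minimal sketch of that hand-off, assuming nothing beyond the JDK: a background "DAO" thread puts rows into a bounded ArrayBlockingQueue, and the calling "web" thread takes them off and writes them to an OutputStream. The fake rows, the end-of-stream marker and the class name are all hypothetical; the point is that put() blocks when the queue is full and take() blocks when it's empty, which gives you back-pressure in both directions.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class QueueStreaming {

    // End-of-stream marker, compared by reference so it can't clash with a real row.
    private static final String END = new String("<end>");

    // Streams 'totalRows' fake rows through a bounded queue into 'out'.
    static void stream(int totalRows, int queueCapacity, OutputStream out)
            throws IOException, InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(queueCapacity);

        // "DAO" thread: stands in for the paged queries. put() blocks while the queue is
        // full, so no more rows are fetched until the web thread catches up.
        Thread dao = new Thread(() -> {
            try {
                for (int i = 0; i < totalRows; i++) {
                    queue.put("row-" + i + "\n");
                }
                queue.put(END);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        dao.start();

        // "Web" thread (the caller): take() blocks while the queue is empty,
        // i.e. whenever the DAO has fallen behind.
        while (true) {
            String row = queue.take();
            if (row == END) {
                break;
            }
            out.write(row.getBytes(StandardCharsets.UTF_8));
        }
        out.flush();
        dao.join();
    }

    public static void main(String[] args) throws Exception {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        stream(1000, 16, out);
        String body = new String(out.toByteArray(), StandardCharsets.UTF_8);
        System.out.println(body.split("\n").length); // prints 1000
    }
}
```

However many rows go through, at most queueCapacity of them are waiting in memory at any one time. In a real application the "web" side would be the container thread inside StreamingOutput's write method, and you'd also want a way to stop the DAO thread if the client disconnects.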

This all sounds like a lot of hard work...

The 2.0+ solution

From version 2.0, Cassandra no longer suffers from the coordinator running out of memory, because the coordinator pages the response to the driver rather than bringing the whole result into memory. However, if your application reads the whole ResultSet into memory, then your application running out of memory is still an issue.

However, the DataStax driver's ResultSet pages as well, which works really nicely with RxJava and JAX-RS StreamingOutput. Time to get real: let's take the following schema:

Say you want to get all the events for a particular customer_id (the partition key). First, let's write the DAO:

Let's go through this line by line:

2: Asynchronously execute the query, which will bring back more rows than will fit in memory.
4: Convert the ListenableFuture to an RxJava Observable. The Observable has a really nice callback interface and way of doing transformations.
5: As ResultSet implements Iterable, we can flatMap it into Rows!
6: Finally, map each Row to a CustomerEvent so that no driver classes escape the DAO.
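To see why flatMapping over a paged ResultSet doesn't pull the whole partition into memory, here's a toy Iterable that pages lazily. To be clear, this is not the driver's implementation: PagedResults, the (offset, pageSize) fetch function and the page counter are all made up for illustration, and the real driver tracks its position with paging state rather than offsets. It holds one page at a time and only fetches the next page when iteration reaches the end of the current one.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.BiFunction;

class PagingSketch {

    // Hypothetical lazily paged result: NOT the DataStax driver's code. It keeps one page
    // in memory and fetches the next page only when iteration runs off the current one.
    static class PagedResults<T> implements Iterable<T> {
        private final BiFunction<Integer, Integer, List<T>> fetchPage; // (offset, pageSize) -> rows
        private final int pageSize;
        int pagesFetched = 0; // exposed so the demo can show how many "queries" ran

        PagedResults(BiFunction<Integer, Integer, List<T>> fetchPage, int pageSize) {
            this.fetchPage = fetchPage;
            this.pageSize = pageSize;
        }

        @Override
        public Iterator<T> iterator() {
            return new Iterator<T>() {
                private List<T> page = new ArrayList<>();
                private int index = 0;
                private int offset = 0;
                private boolean exhausted = false;

                @Override
                public boolean hasNext() {
                    if (index < page.size()) {
                        return true;
                    }
                    if (exhausted) {
                        return false;
                    }
                    page = fetchPage.apply(offset, pageSize); // fetch the next page on demand
                    pagesFetched++;
                    offset += page.size();
                    index = 0;
                    exhausted = page.size() < pageSize; // short page: nothing left after it
                    return !page.isEmpty();
                }

                @Override
                public T next() {
                    if (!hasNext()) {
                        throw new NoSuchElementException();
                    }
                    return page.get(index++);
                }
            };
        }
    }

    public static void main(String[] args) {
        // 25 fake rows served 10 at a time.
        PagedResults<Integer> rows = new PagedResults<>((offset, size) -> {
            List<Integer> page = new ArrayList<>();
            for (int i = offset; i < Math.min(offset + size, 25); i++) {
                page.add(i);
            }
            return page;
        }, 10);

        int count = 0;
        for (Integer row : rows) {
            count++; // each row would be mapped to a CustomerEvent here
        }
        System.out.println(count + " rows in " + rows.pagesFetched + " pages"); // prints "25 rows in 3 pages"
    }
}
```

Because consumers only ever see an Iterable, anything that walks it (a for-each loop, or RxJava's flatMap over an Iterable) gets the paging for free, and pages that have already been consumed can be garbage collected.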

Then let's look at the JAX-RS resource class:

It looks complicated, but it really isn't. First, a little about JAX-RS streaming.

JAX-RS gives us a StreamingOutput interface, which we implement to get hold of the raw OutputStream. The container (e.g. Tomcat or Jetty) calls its write method, and it is our responsibility to keep the container's thread in that method until we have finished streaming. With that knowledge, let's go through the code:

5: Get the Observable<CustomerEvent> from the DAO.
6: Create a CountDownLatch which we'll use to block the container thread.
7: Register a callback to consume all the rows and write them to the OutputStream.
12: When the rows are finished, close the OutputStream.
16: Count down the latch to release the container thread on line 33.
26: Each time we get a CustomerEvent, write it to the OutputStream.
33: Await the latch to keep the container thread blocked.
39: Return the StreamingOutput instance to the container so it can call write.
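Stripped of RxJava and JAX-RS, the latch trick looks roughly like this. StreamingWriter is a hypothetical stand-in with the same shape as StreamingOutput's write method (so the sketch needs only the JDK), and subscribe is a made-up async source playing the part of the DAO's Observable, emitting events on another thread. write() registers the callbacks, then blocks on the CountDownLatch until the completion (or error) callback counts it down.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

class LatchStreaming {

    // Hypothetical stand-in with the same shape as JAX-RS StreamingOutput.write,
    // so this sketch compiles without a JAX-RS dependency.
    interface StreamingWriter {
        void write(OutputStream out) throws IOException;
    }

    // Hypothetical async source playing the part of the DAO's Observable: emits each
    // event on a background thread, then calls onCompleted (or onError if a callback fails).
    static void subscribe(List<String> events, Consumer<String> onNext,
                          Consumer<Throwable> onError, Runnable onCompleted) {
        new Thread(() -> {
            try {
                events.forEach(onNext);
            } catch (RuntimeException e) {
                onError.accept(e);
                return;
            }
            onCompleted.run();
        }).start();
    }

    static StreamingWriter streamEvents(List<String> events) {
        return out -> {
            CountDownLatch done = new CountDownLatch(1);
            AtomicReference<Throwable> failure = new AtomicReference<>();

            subscribe(events,
                    event -> { // each event: write it to the OutputStream
                        try {
                            out.write((event + "\n").getBytes(StandardCharsets.UTF_8));
                        } catch (IOException e) {
                            throw new UncheckedIOException(e);
                        }
                    },
                    error -> { // on error: remember it and release the container thread
                        failure.set(error);
                        done.countDown();
                    },
                    () -> { // on completion: close the stream and release the container thread
                        try {
                            out.close();
                        } catch (IOException ignored) {
                            // nothing more we can do for this response
                        }
                        done.countDown();
                    });

            try {
                done.await(); // keep the container's thread inside write() until we're done
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IOException("Interrupted while streaming", e);
            }
            if (failure.get() != null) {
                throw new IOException("Streaming failed", failure.get());
            }
        };
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        // Play the container: call write() and wait for it to return.
        streamEvents(List.of("event-1", "event-2", "event-3")).write(out);
        System.out.print(new String(out.toByteArray(), StandardCharsets.UTF_8)); // one event per line
    }
}
```

Counting down in the error callback as well as on completion matters: if only the completion path released the latch, a failed query would leave the container thread blocked forever.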

Given that we're dealing with the rows from Cassandra asynchronously, you didn't expect the code to run in order, did you? ;)

The full working example is on my GitHub. To test it, I put around 600MB of data into a single partition in a Cassandra cluster. There is a sample class in the test directory that does this.

I then started the application with a MaxHeapSize of 256MB and used curl to hit the events/stream endpoint:

As you can see, 610MB came back in 7 minutes. The whole time, I had VisualVM attached to both the application and the coordinator to monitor memory usage.

Here's the graph from the application:

The test was run from 14:22 to 14:37. Even though we were pumping 610MB of data through the application, the heap jittered between 50MB and 200MB, easily reclaiming the memory of the data we had already streamed out.

For those new to Cassandra and other distributed databases this might not seem that spectacular, but I once wrote a rather large project to do what we can manage here in a few lines. Awesome work by the Apache Cassandra committers and the DataStax Java driver team.

Friday, December 5, 2014

Cassandra summit EU - British Gas, i20 water, testing Cassandra and paging!

Yesterday was the EU Cassandra Summit in London, with 1000 crazy Cassandra lovers. I've only just recovered from what was a hectic day.

Over the course of the day I got to chat with two awesome companies: Michael Williams from i20 water and Josep Casals from British Gas Connected Homes. Both of these companies are using Cassandra to store time series data from devices. Dare I use the ever-popular buzz phrase Internet of Things?

But really, they are: i20 water enables water companies to place sensors all around their networks and gather the data to detect leaks, saving them hundreds of millions of litres of water a day.

British Gas Connected Homes are enabling their customers to turn their central heating on and off from their mobiles, and are expanding into monitoring boilers and predicting when they'll fail or need a service.

In addition to speaking with Cassandra users, I also snuck in a talk and a lightning talk. The talk was on how to test Cassandra applications and the lightning talk on server-side paging.

Here are the slides for the talk; the video will no doubt be online soon:


And for the lightning talk:

Monday, December 1, 2014

Talking about Cassandra at the LJC open conference 2014

The LJC conference is a yearly event for Java/JVM developers in London to get together and see some (hopefully) great talks :)

IBM kindly provided an awesome venue on the South Bank at no charge.

I had the good fortune of being scheduled to talk first, which meant I could get my talk done and then enjoy the rest of the day. I chose to speak about Cassandra for Java devs, which went down really well, and I had people coming up to me all day asking about Cassandra.

Here are the slides:

Overall it was an awesome day and I look forward to next year :)