Wednesday, January 21, 2015

Spark 1.2 with Cassandra 2.1: Setting up a SparkSQL + Cassandra environment

In my previous post on Cassandra and Spark I showed how to get a development environment set up with Vagrant/Ansible/VirtualBox without installing Cassandra/Spark on your dev station.

This update will get us to a point where we can run SQL (yes, SQL, not CQL) on Cassandra. It is just a trivial example to show the setup working.

The previous article was back in the days of Spark 1.0. With Spark 1.1+ we can now run SparkSQL directly against Cassandra.

I've updated the Vagrant/Ansible provisioning to install Spark 1.2 and Cassandra 2.1, and I've added a new "fatjar" with the latest Cassandra Spark connector so that we can use it in the Spark shell and show this magic working. The 1.2 connector isn't released yet, so I have built against the alpha; see here for details. We're just that cutting edge here...
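If you'd rather build the fat jar yourself than use the one checked into the repo, the connector alpha is published to Maven Central, so a dependency along these lines should pull it in (a sketch assuming sbt and Scala 2.10):

// assumed sbt dependency; version matches the alpha jar used below
libraryDependencies += "com.datastax.spark" %% "spark-cassandra-connector" % "1.2.0-alpha1"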

So, once you have run vagrant up (this will take a while as it downloads and installs all of the above), you'll need to SSH in so we can get into the Spark shell.
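For completeness, the full round trip from a fresh checkout is just this (the last command relying on the alias defined below):

vagrant up
vagrant ssh
spark-shell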

I've set up the following alias so there's no worrying about classpaths:

alias spark-shell='spark-shell --jars /vagrant/spark-connector-1.2.0-alpha1-driver-2.1.4-1.0.0-SNAPSHOT-fat.jar'

First let's jump into cqlsh and create a keyspace and table to play with:
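Something along these lines will do; the keyspace is called test to match what we use below, while the kv table and the sample rows are just for illustration:

CREATE KEYSPACE test WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};

USE test;

-- a trivial key/value table (name and columns illustrative)
CREATE TABLE kv (key text PRIMARY KEY, value int);

INSERT INTO kv (key, value) VALUES ('chris', 10);
INSERT INTO kv (key, value) VALUES ('dan', 1);
INSERT INTO kv (key, value) VALUES ('charlie', 2);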


Not the most exciting schema you'll ever see, but this is all about getting the Hello World of SparkSQL on Cassandra working!

Now that we have some data to play with, let's access it from the Spark shell.
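In text form the session boils down to roughly this, assuming the kv table from the sketch above:

scala> import org.apache.spark.sql.cassandra.CassandraSQLContext
scala> val cc = new CassandraSQLContext(sc)
scala> cc.setKeyspace("test")
scala> val rdd = cc.sql("SELECT * FROM kv")
scala> rdd.collect().foreach(println)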


Let's go through what happened here:

  • The mandatory Spark ASCII art greets you on startup
  • Import the connector so we can access Cassandra
  • Create a CassandraSQLContext
  • Set it to the test keyspace we created above
  • Select the whole table (very exciting I know!)
  • Get Spark to execute an action (the collect) so all the magic happens

That's all for now, tune in later for a more complicated example :)

Here's the link to all the Vagrant/Ansible code.



3 comments:

Anonymous said...

Thanks Christopher for your great post! Maybe a little typo in your spark-shell console, but I do not see the cc init:

val cc = new CassandraSQLContext(sc)

Nicolas

chbatey said...

Thanks Nicolas - must have deleted it when I was getting rid of Spark's verbose logging!

Unknown said...

I am trying to query data in Cassandra from Spark.
I want to use CassandraSQLContext, but while I am trying I am getting the following error.
Could anyone please help me out.
scala> import org.apache.spark.sql.cassandra.CassandraSQLContext
scala> val csc = new CassandraSQLContext(sc)
scala> csc.setKeyspace("spark_cassandra")
scala> val results = csc.sql("SELECT * from spark_cassandra.test")
scala> results.collect().foreach(println)
Error:
com.google.common.util.concurrent.ExecutionError: java.lang.NoSuchMethodError: org.apache.spark.sql.execution.datasources.LogicalRelation.<init>(Lorg/apache/spark/sql/sources/BaseRelation;)V
at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2199)
at com.google.common.cache.LocalCache.get(LocalCache.java:3934)
at com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3938)
at com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4821)
at org.apache.spark.sql.cassandra.CassandraCatalog.lookupRelation(CassandraCatalog.scala:34)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.getTable(Analyzer.scala:257)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$6.applyOrElse(Analyzer.scala:268)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$6.applyOrElse(Analyzer.scala:264)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:57)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:57)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:51)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:56)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$1.apply(LogicalPlan.scala:54)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$1.apply(LogicalPlan.scala:54)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:249)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:370)