Using JDBC and Apache Spark

Overview

Apache Spark does many things well. One of them is taking a variety of data sources and building a clean data processing pipeline without thinking too much about the underlying details. For toy projects that usually works, but sometimes the abstractions are a little broken.

In our case, we wanted to pull several hundred GB from Vertica into Apache Spark and do so quickly, so we wanted the load to run in parallel across all of our nodes.

The Problem

Vertica runs on a set of machines in AWS, and so does our Apache Spark cluster. We want the data from a few large tables to be refreshed in Apache Spark so that we can run our NLP processing. The main problem with transferring large amounts of data is that, for a single connection, the network is the limiting factor.

The default settings open one connection, like this:

[Figure: Single Connection]

val df = sqlContext.read.format("jdbc").options(
  Map("url" -> "jdbc:vertica://[URL]:5433/warehouse",
      "dbtable" -> "ratings",
      "user" -> "REPLACEME",
      "password" -> "REPLACEME")).load()

In AWS, you will typically get a maximum of around 1.6 GB/s between two machines. You can increase this by making sure that all your machines have a 10 Gb/s network connection (only available on the high-end machines), that everything is in the same cluster network and launched together, and a few other things, but even then you are capped by a single network connection. It is easier, cheaper and more scalable to have multiple connections between our two storage systems.

[Figure: Multiple Connections]

val df = sqlContext.read.format("jdbc").options(
  Map("url" -> "jdbc:vertica://[replaceme]:5433/warehouse",
      "dbtable" -> "ratings",
      "partitionColumn" -> "tenant_id",
      "lowerBound" -> "1",
      "upperBound" -> "10000",
      "numPartitions" -> "10",
      "user" -> "REPLACEME",
      "password" -> "REPLACEME")).load()

 

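If you run this and look at the connections on the database side, you’ll see a bunch of connections coming in. Spark opens one connection per partition, so you will see up to numPartitions of them at once, limited by how many tasks your Spark executors can run concurrently.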

 

Ways of looking at the data

This is a start, and the performance is definitely a step up from where we began (in our case around 8x the speed with a cluster of six m3.xlarge machines). What about the next steps? For us, we want to operate on locations for each tenant (in our data model, each tenant has many locations). Some tenants have one location, some have tens of thousands. We want our data spread evenly across partitions so that Spark can process it efficiently.

 

Let’s look at the partitions and distribution using a mapping:

df.mapPartitions(iter => Array(iter.size).iterator).take(300)

 

We can see that the first few partitions hold a lot of data and the rest hold nothing. I asked the system to use 200 partitions, and it did, but the data isn’t spread out at all.

To understand this, let me quickly explain how Spark partitions JDBC reads.

 

Apache Spark JDBC Sharding

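There are four inputs when using parallel data retrieval:

  • the partition column (partitionColumn)
  • the minimum value (lowerBound)
  • the maximum value (upperBound)
  • the number of partitions (numPartitions)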

 

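The first thing to understand is that the driver does not look at the data before creating the jobs (this is a good thing in terms of scaling, but requires a bit of work on our part). The bounds are only used to decide the width of each range; rows outside them are not filtered out, they simply land in the first or last partition. Spark just creates partition ranges using the information we gave it, e.g.: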

[Figure: JDBC Spark Partitioning]
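
To make the range calculation concrete, here is a small Python sketch of it. It is an approximation of the logic as I understand it, assuming non-negative integer bounds and at least two partitions; the function name is made up for illustration, and the exact rules may differ between Spark versions.

```python
def jdbc_partition_predicates(column, lower_bound, upper_bound, num_partitions):
    """Approximate the WHERE clauses generated for a partitioned JDBC read.

    Illustration only: assumes non-negative integer bounds and
    num_partitions >= 2. This is not Spark's actual code.
    """
    # The bounds only decide how wide each range is.
    stride = upper_bound // num_partitions - lower_bound // num_partitions
    predicates = []
    current = lower_bound
    for i in range(num_partitions):
        lower = f"{column} >= {current}" if i != 0 else None
        current += stride
        upper = f"{column} < {current}" if i != num_partitions - 1 else None
        if upper is None:
            # Last partition: open-ended above, so it also takes values > upper_bound.
            predicates.append(lower)
        elif lower is None:
            # First partition: open-ended below, and it also takes NULLs.
            predicates.append(f"{upper} or {column} is null")
        else:
            predicates.append(f"{lower} AND {upper}")
    return predicates


predicates = jdbc_partition_predicates("tenant_id", 1, 10000, 10)
for p in predicates:
    print(p)
```

Each predicate becomes the WHERE clause of one partition's query. If most tenant_id values fall inside one or two of these ranges, those partitions get most of the rows no matter how many partitions we ask for.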

The real problem

In our case, I mentioned that what we really wanted was to partition the data by location_id, not by tenant (downstream processing just needs each location’s data together, which avoids a bunch of shuffling). So why didn’t we just use location_id? As it happens, location_id is a GUID and not an integer, so we can’t use it as a range input to Apache Spark.

First pass at a solution (sorting, etc)

So, let’s try a first simple pass at solving this issue. What about using timestamps? Can we take all of our data, find the min and max, convert the timestamp into an int and use that?

There are two issues with this approach:

  1. Hopefully we are collecting more data now than we were a year ago, so the data won’t be evenly spread across time slots.
  2. It requires our DB to sort the entire data set. That is a lot of work (let’s face it, there is a reason why data processing systems have a sorting competition), and it is expensive because it requires sending data from node to node over the network. It will only get worse as our data set grows.
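
The first issue is easy to see with made-up numbers. The sketch below assumes, purely for illustration, that the number of rows collected doubles every quarter over two years, and then splits the time span into four equal-width ranges.

```python
# Hypothetical data: rows collected per quarter, doubling each quarter for two years.
rows_per_quarter = [1000 * 2 ** q for q in range(8)]

# Four equal-width time ranges means two quarters per partition.
num_partitions = 4
quarters_per_partition = len(rows_per_quarter) // num_partitions
partition_sizes = [
    sum(rows_per_quarter[i:i + quarters_per_partition])
    for i in range(0, len(rows_per_quarter), quarters_per_partition)
]
print(partition_sizes)
```

With these numbers the most recent range holds 64 times as many rows as the oldest one, so equal-width time slots do not give evenly sized partitions.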

 

Second/Final Solution – the Hashing trick

Now, an ideal solution doesn’t need to look at the data at all. Then we don’t have to worry about reading the data, and we know that we can scale out horizontally. For this, we need to start looking at hash functions.

As a quick reminder, a hash function takes an input and maps it to one of a fixed set of output values. Typically the output space is much smaller than the input space. A good hash function should also possess a property called uniformity. Basically, this means that any bucket should wind up with approximately m/n records, where n is the number of buckets and m is the number of input values.
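
A quick way to get a feel for uniformity is to hash a batch of GUID-like strings into buckets and count them. The sketch below uses MD5 from Python’s standard library as a stand-in for whatever hash the database offers, and the keys are generated placeholders rather than real location ids.

```python
import hashlib
import uuid
from collections import Counter


def bucket_for(key, num_buckets):
    """Map a string key to a bucket in [0, num_buckets) using MD5 as the hash."""
    digest = hashlib.md5(key.encode("utf-8")).hexdigest()
    return int(digest, 16) % num_buckets


num_buckets = 10   # n
num_keys = 10000   # m

# Deterministic stand-ins for location GUIDs (hypothetical data).
keys = [str(uuid.uuid5(uuid.NAMESPACE_DNS, f"location-{i}")) for i in range(num_keys)]

counts = Counter(bucket_for(k, num_buckets) for k in keys)
for bucket in range(num_buckets):
    print(bucket, counts[bucket])
```

Each bucket should come out close to m/n = 1000 keys, even though the keys themselves are strings with no natural ordering.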

[Figure: Hash Functions]

Since most hash functions that are available on a database don’t take an output range, we will need to convert from the hash function’s output space to our target range of n buckets, which we can do with a modulo. Our flow now looks like:

[Figure: Hashing Process]

Now to use this:

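val df = sqlContext.read.format("jdbc").options(
  Map("url" -> "jdbc:vertica://[replaceme]:5433/warehouse",
      // Compute the bucket in a subquery so that Spark can filter on a plain column name.
      "dbtable" -> "(select r.*, mod(hash(r.location_id), 10) as bucket_id from ratings r) as tmp",
      "partitionColumn" -> "bucket_id",
      // bucket_id takes the values 0 to 9, so these bounds give one bucket per partition.
      "lowerBound" -> "0",
      "upperBound" -> "10",
      "numPartitions" -> "10",
      "user" -> "REPLACEME",
      "password" -> "REPLACEME")).load()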

 

With this in place, we can show that as long as our hash function is decent, the distribution of locations across our partitions will be even. Now, we haven’t guaranteed anything about the size of the data for each location; that is really just domain-specific.
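
One detail worth checking is that lowerBound and upperBound line up with the range of bucket ids. The sketch below assumes equal-width range splitting with open-ended first and last ranges, and bucket ids 0 to 9; the helper name is made up for illustration. It compares bounds of 1 and 10000 against bounds of 0 and 10.

```python
from collections import Counter


def partition_index(value, lower_bound, upper_bound, num_partitions):
    """Which partition a value lands in under equal-width range partitioning.

    Values below the first range go to partition 0 and values above the last
    range go to the final partition. Assumes the computed stride is positive.
    """
    stride = upper_bound // num_partitions - lower_bound // num_partitions
    index = (value - lower_bound) // stride
    return max(0, min(num_partitions - 1, index))


buckets = range(10)  # possible values of the bucket id

# Bounds that do not match the bucket range: stride is 1000.
mismatched = Counter(partition_index(b, 1, 10000, 10) for b in buckets)
# Bounds that match the bucket range: stride is 1.
matched = Counter(partition_index(b, 0, 10, 10) for b in buckets)

print(dict(mismatched))
print(dict(matched))
```

With mismatched bounds every bucket falls into partition 0, so the read is effectively serial again. With matching bounds each bucket gets its own partition.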

 

Hashing in SQL

This is what it looks like in Vertica when hashing on a username:

partition = 100

# Vertica computes a bucket between 1 and `partition` for every row.
sql = """
  select
    (1 + mod(hash(username), %(partition)s)) as hash_code,
    username,
    first_name,
    last_name,
    age
  from user_table
""" % {"partition": partition}

# db_host, db_port, db_name, user and pw are placeholders for your own settings.
# upperBound is partition + 1 so that each bucket from 1 to `partition` gets its own range.
df = sqlContext.read.jdbc(
  url="jdbc:vertica://db_host:db_port/db_name",
  table="(%s) as tmp" % sql,
  properties={"user": user, "password": pw},
  column="hash_code", lowerBound=1, upperBound=partition + 1, numPartitions=partition)

df.registerTempTable("users")
sqlContext.sql("select * from users where first_name = 'John'").show()

 

Why isn’t this built in to Apache Spark?

Some existing Spark connectors integrate this type of load balancing into their core (the spark-mongodb connector from Stratio, for example: https://github.com/Stratio/Spark-MongoDB). The issue is that it requires a hash function that can take in a string and return an integer. Most databases have something that can do this (md5 as an extreme example; it is fast enough that it will still help with the issue), but it is not standardized in SQL, so the Apache Spark contributors presumably felt that it was better to leave it out.