Meaningless Words to Useful Phrases in Spark – word2phrase

Introduction to word2phrase

When we communicate, we know that individual words placed correctly can change the meaning of what we’re trying to say.  Add “very” in front of an adjective and you place more emphasis on the adjective.  Add “york” after the word “new” and you get a location.  Throw in “times” after that and now it’s a newspaper.

It follows that when working with data, we want these meanings to be captured.  The three separate words “new”, “york”, and “times” are very different from “New York Times” as a single phrase.  This is where the word2phrase algorithm comes into play.

Words to Phrases

At its core, word2phrase takes in a sentence of individual words and potentially turns a bigram (two consecutive words) into a phrase by joining the two words together with a symbol (an underscore in our case).  Whether or not a bigram is turned into a phrase is determined by the training set and parameters set by the user.  Note that every pair of consecutive words is considered, so in a sentence with the words w1 w2 w3, the bigrams are w1 w2 and w2 w3.
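To make the sliding window concrete, here is a minimal Scala sketch (illustrative only, not part of the library) that enumerates the bigrams of a tokenized sentence:

// Enumerate every pair of consecutive words in a sentence.
val sentence = "i read the new york times".split(" ")
val bigrams = sentence.sliding(2).map { case Array(w1, w2) => (w1, w2) }.toList
// bigrams: List((i,read), (read,the), (the,new), (new,york), (york,times))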

In our word2phrase implementation in Spark (done similarly in Gensim), there are two distinct steps: a training (estimator) step and an application (transform) step.

*For clarity, note that “new york” is a bigram, while “new_york” is a phrase.

Estimator Step

The training step is where we pass a training set to the word2phrase estimator.  The estimator takes this dataset and produces a model using the algorithm.  The model is called the transformer; we pass it the datasets we want to transform, i.e. sentences containing bigrams that we may want to turn into phrases.

The training dataset is an array of sentences.  The algorithm takes these sentences and applies the following formula to give a score to each bigram:

score(wi, wj) = (count(wiwj) – delta) / (count(wi) * count(wj))

where wi and wj are word i and word j, count(wiwj) is the number of times word j immediately follows word i, and delta is a discounting coefficient that can be set to prevent phrases from being formed out of infrequent words.

After the score for each bigram is calculated, those above a set threshold (this value can be changed by the user) will be transformed into phrases.  The model produced by the estimator step is thus an array of bigrams: the ones that should be turned into phrases.
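To illustrate the estimator logic only (the library computes this internally), the scoring and thresholding could be sketched in Scala roughly like this, assuming we already have unigram and bigram counts (the counts below are made up):

// Sketch of the scoring step, assuming precomputed counts.
val unigramCounts = Map("new" -> 500L, "york" -> 450L, "times" -> 300L)
val bigramCounts = Map(("new", "york") -> 400L)

val delta = 100.0       // discounting coefficient
val threshold = 0.00001 // minimum score for a bigram to become a phrase

val phrases = bigramCounts.collect {
  case ((w1, w2), cnt) if (cnt - delta) / (unigramCounts(w1) * unigramCounts(w2)).toDouble > threshold =>
    s"${w1}_${w2}"
}
// phrases contains "new_york" for these made-up counts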

Transformer Step

The transform step is simple: pass any array of sentences to your model and it will search for matching bigrams.  All matching bigrams in the array you passed in are then turned into phrases.

You can repeat these steps to produce trigrams (i.e. three words joined into a phrase).  For example, “I read the New York Times” may produce “I read the new_york Times” after the first run; run it again to get “I read the new_york_times”, because in the second run “new_york” is treated as a single word.
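Using the same estimator/transformer API shown in the example below, a second pass for trigrams might look roughly like this (sentencesDF and the column names are purely illustrative):

// Run the fit/transform cycle twice so that phrases like "new_york_times" can form.
val pass1 = new Word2Phrase().setInputCol("inputWords").setOutputCol("bigrams")
val withBigrams = pass1.fit(sentencesDF).transform(sentencesDF)

val pass2 = new Word2Phrase().setInputCol("bigrams").setOutputCol("trigrams")
val withTrigrams = pass2.fit(withBigrams).transform(withBigrams)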

Example

First we create our training dataset: a dataframe where the bigrams “new york” and “test drive” appear frequently.  (The sentences make no sense, as they are randomly generated words.  See below for a link to the full dataframe.)

You can copy/paste this into your spark shell to test it, so long as you have the word2phrase algorithm included (available as a maven package with coordinates com.reputation.spark:word2phrase:1.0.1).

Download the package and create our test dataframe:

spark-shell --packages com.reputation.spark:word2phrase:1.0.1

import org.apache.spark.ml.feature.Word2Phrase

val wordDataFrame = sqlContext.createDataFrame(Seq(
(0, "new york test drive cool york how always learn media new york ."),
(1, "online york new york learn to media cool time ."),
(2, "media play how cool times play ."),
(3, "code to to code york to loaded times media ."),
(4, "play awesome to york ."),
.
.
.
(1099, "work please ideone how awesome times ."),
(1100, "play how play awesome to new york york awesome use new york work please loaded always like ."),
(1101, "learn like I media online new york ."),
(1102, "media follow learn code code there to york times ."),
(1103, "cool use play work please york cool new york how follow ."),
(1104, "awesome how loaded media use us cool new york online code judge ideone like ."),
(1105, "judge media times time ideone new york new york time us fun ."),
(1106, "new york to time there media time fun there new like media time time ."),
(1107, "awesome to new times learn cool code play how to work please to learn to ."),
(1108, "there work please online new york how to play play judge how always work please ."),
(1109, "fun ideone to play loaded like how ."),
(1110, "fun york test drive awesome play times ideone new us media like follow .")
)).toDF("label", "inputWords")

We set the input and output column names and create the model (the estimator step, represented by the fit(wordDataFrame) function).

scala> val t = new Word2Phrase().setInputCol("inputWords").setOutputCol("out")
t: org.apache.spark.ml.feature.Word2Phrase = deltathresholdScal_f07fb0d91c1f

scala> val model = t.fit(wordDataFrame)

Here are some of the scores (Table 1) calculated by the algorithm before those below the threshold are removed (note that every score above the threshold appears in this table).  The default values are delta = 100, threshold = 0.00001, and minWords = 0.

Table 1
bigram         score
test drive      0.002214815139686856
work please     0.002047826661381…
new york        5.946183949006843E-4
york new       -1.64600247723372…
york york      -6.43001404062082…
york how       -6.64999302561707…
how new        -6.80666229773923…
new new        -7.42968903739342…
to new         -7.52757602015383E-5
york to        -9.25567252744992…

only showing top 10 rows

So our model produces three bigrams that will be searched for in the transform step:

test drive
work please
new york

We then use this model to transform our original dataframe sentences and view the results.  Unfortunately you can’t see the entire row in the spark-shell, but in the out column it’s clear that all instances of “new york” and “test drive” have been transformed into “new_york” and “test_drive”.

scala> val bi_gram_data = model.transform(wordDataFrame)
bi_gram_data: org.apache.spark.sql.DataFrame = [label: int, inputWords: string … 1 more field]

scala> bi_gram_data.show()

Table 2
label  inputWords          out
0      new york test dri…  new_york test_dri…
1      online york new y…  online york new_y…
2      media play how co…  media play how co…
3      code to to code y…  code to to code y…
4      play awesome to y…  play awesome to y…
5      like I I always .   like I I always .
6      how to there lear…  how to there lear…
7      judge time us pla…  judge time us pla…
8      judge test drive …  judge test_drive …
9      judge follow fun …  judge follow fun …
10     how I follow ideo…  how I follow ideo…
11     use use learn I t…  use use learn I t…
12     us new york alway…  us new_york alway…
13     there always how …  there always how …
14     always time media…  always time media…
15     how test drive to…  how test_drive to…
16     cool us online ti…  cool us online ti…
17     follow time aweso…  follow time aweso…
18     us york test driv…  us york test_driv…
19     use fun new york …  use fun new_york …

only showing top 20 rows
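If you want to inspect the full, untruncated sentences rather than the elided output above, you can tell show not to truncate:

// Print the transformed column without truncation.
bi_gram_data.select("out").show(20, false)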

The algorithm and test dataset (testSentences.scala) are available at this repository.


Using JDBC and Apache Spark

Overview

Apache Spark does many things well – one of them being the ability to take a variety of different data sources and build a clean data processing pipeline without thinking too much about the underlying details. That usually works for toy projects, but sometimes the abstractions are a little broken.

In our case, we wanted to suck in several hundred GB from Vertica into Apache Spark, and do so quickly – so we wanted the transfer to run in parallel across all of our nodes.

The Problem

Vertica runs on a set of machines in AWS – so does our Apache Spark cluster. We want the data from a few large tables to get updated in Apache Spark so that we can run our NLP processing. The main problem with transferring large amounts of data is that the network is the limiting factor for a single connection.

The default settings open up one connection, like this:

[Diagram: Single Connection]

val df = sqlContext.read.format("jdbc").options(
  Map("url" -> "jdbc:vertica://[URL]:5433/warehouse",
    "dbtable" -> "ratings",
    "user" -> "REPLACEME",
    "password" -> "REPLACEME")).load()

In AWS, you will typically get a maximum of around 1.6 Gbps between two machines. You can increase this by making sure that all your machines have a 10 Gbps network connection (only available on the higher end instance types), that everything is in the same cluster network and launched together, and a few other things, but even then you are capped by a single network connection. It is easier, cheaper and more scalable to open multiple connections between our two storage systems.

[Diagram: Multiple Connections]

val df = sqlContext.read.format("jdbc").options(
  Map("url" -> "jdbc:vertica://[replaceme]:5433/warehouse",
    "dbtable" -> "ratings",
    "partitionColumn" -> "tenant_id",
    "lowerBound" -> "1",
    "upperBound" -> "10000",
    "numPartitions" -> "10",
    "user" -> "REPLACEME",
    "password" -> "REPLACEME")).load()


If you run this and look at the database connections, you’ll see a bunch of them coming in (basically, one per Spark executor running on your cluster).
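A quick way to sanity-check the parallelism from the Spark side is to look at how many partitions the DataFrame was given, since each JDBC partition is read over its own connection:

// Should match the numPartitions option (10 in the example above).
println(df.rdd.getNumPartitions)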


Ways of looking at the data

This is a start – and the performance is definitely a step up from where we started (in our case around 8x the speed with a cluster of 6 m3.xlarge machines). What about the next steps? For us, we want to operate on locations for each tenant (in our data model, each tenant has many locations). Some tenants have 1 location, some have tens of thousands. We want our data spread evenly across partitions so that Spark can process the data efficiently.


Let’s look at the partitions and distribution using a mapping:

df.mapPartitions(iter => Array(iter.size).iterator).take(300)


We can see that the first few partitions have a bunch of data, and then nothing else. I asked the system to use 200 partitions – and it did, but the data isn’t spread out at all.  

To understand this, let me quickly explain how Spark partitions JDBC reads.


Apache Spark JDBC Sharding

There are 4 inputs when using parallel data retrieval:

  • partition field
  • min
  • max
  • numPartitions


The first thing to understand is that the driver does not look at the data before creating the jobs (this is a good thing in terms of scaling, but requires a bit of work on our part). It simply creates partition ranges using the information we gave it, e.g.:

[Diagram: JDBC Spark Partitioning]
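To make that concrete, here is a rough sketch (not Spark’s actual code) of how evenly spaced ranges over the partition column turn into per-task WHERE clauses:

// Illustrative stride-based range generation for min=1, max=10000, numPartitions=10.
val lowerBound = 1L
val upperBound = 10000L
val numPartitions = 10
val stride = (upperBound - lowerBound) / numPartitions  // Spark's exact arithmetic differs slightly

val predicates = (0 until numPartitions).map { i =>
  val lo = lowerBound + i * stride
  val hi = lo + stride
  if (i == 0) s"tenant_id < $hi"
  else if (i == numPartitions - 1) s"tenant_id >= $lo"
  else s"tenant_id >= $lo AND tenant_id < $hi"
}
// One predicate per task; none of them ever looks at how tenant_id is actually distributed.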

The real problem

In our case, I mentioned that what we really wanted was to partition the data by location_id, not tenant (downstream processing just needs each location’s data kept together, to avoid a bunch of shuffling). So why didn’t we just use location_id?  As it happens, location_id is a GUID and not an integer, so we can’t use it as a range input to Apache Spark.

First pass at a solution (sorting, etc)

So, let’s try a first simple pass at solving this issue.  What about using timestamps? Can we take all of our data, find the min and max, convert each timestamp into an int, and use that?

There are two issues with this approach:

  1. Hopefully we are collecting more data now than we were a year ago, so the data won’t be evenly spread across time slots.
  2. It requires our DB to sort the entire data set. That is a lot of work (let’s face it, there is a reason why data processing systems have a sorting competition), and it only gets worse as our data set grows. It is also expensive, since it requires sending data from node to node over the network.


Second/Final Solution – the Hashing trick

Now, an ideal solution doesn’t need to look at the data at all: we don’t have to worry about scanning it up front, and we know that we can scale out horizontally. For this, we need to start looking at hash functions.

As a quick reminder, a hash function takes an input and maps it to a fixed set of output values; typically the output space is much smaller than the input space.  A good hash function should also possess a property called uniformity: roughly, every bucket should wind up with approximately m/n records, where n is the number of buckets and m is the number of input values.
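A quick way to convince yourself of the uniformity property is to hash a pile of arbitrary keys into n buckets and look at the bucket sizes. This little Scala sketch uses MurmurHash3 purely for illustration (it is not the hash function Vertica uses):

import scala.util.hashing.MurmurHash3

// Hash 100,000 random GUID-like keys into 10 buckets and count each bucket.
val n = 10
val keys = (1 to 100000).map(_ => java.util.UUID.randomUUID.toString)
val bucketSizes = keys
  .map(k => Math.floorMod(MurmurHash3.stringHash(k), n))
  .groupBy(identity)
  .map { case (bucket, ks) => bucket -> ks.size }
  .toSeq.sortBy(_._1)
// Every bucket should hold roughly 100000 / 10 = 10000 keys.
bucketSizes.foreach(println)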

[Diagram: Hash Functions]

Since most hash functions available in a database don’t take an output range, we need to map from the hash function’s output space down to our target range (a modulo does the job). Our flow now looks like:

[Diagram: Hashing Process]

Now to use this:

val df = sqlContext.read.format("jdbc").options(
  Map("url" -> "jdbc:vertica://[replaceme]:5433/warehouse",
    "dbtable" -> "ratings",
    "partitionColumn" -> "hash(location_id) % 10",
    "lowerBound" -> "0",
    "upperBound" -> "10",
    "numPartitions" -> "10",
    "user" -> "REPLACEME",
    "password" -> "REPLACEME")).load()


With this in place, we can show that as long as our hash function is decent, the distribution across our partitions will be even. Now, we haven’t guaranteed anything about the size of the data for each location, and that is really just domain specific.


Hashing in SQL

This is what this looks like against Vertica (here in PySpark), hashing on a username:

partition = 100

sql = """
  select
    (1 + mod(hash(username), %(partition)s)) as hash_code,
    username,
    first_name,
    last_name,
    age
  from user_table
""" % {"partition": partition}

df = sqlContext.read.jdbc(
  url = "jdbc:vertica://db_host:db_port/db_name",
  table = "(%s) as tmp" % sql,
  properties = {"user": user, "password": pw},
  column = "hash_code", lowerBound = 1, upperBound = partition, numPartitions = partition)

df.registerTempTable("users")

sqlContext.sql("select * from users where first_name = 'John'").show()


Why isn’t this built into Apache Spark?

Some existing Spark connectors integrate this type of load balancing into their core (the spark-mongodb connector from Stratio, for example – https://github.com/Stratio/Spark-MongoDB). The issue is that it requires a hash function that can take in a string and return an integer. Most databases have something that can do this (md5 as an extreme example – it is fast enough that it will still help with the issue), but it is not standardized in SQL, so the Apache Spark contributors presumably felt that it was better to leave it out.