Meaningless Words to Useful Phrases in Spark – word2phrase

Introduction to word2phrase

When we communicate, we know that individual words in the right places can change the meaning of what we’re trying to say.  Add “very” in front of an adjective and you place more emphasis on the adjective.  Add “york” after the word “new” and you get a location.  Throw in “times” after that and now it’s a newspaper.

It follows that when working with data, these meanings should be captured.  The three separate words “new”, “york”, and “times” are very different from “New York Times” as one phrase.  This is where the word2phrase algorithm comes into play.

Words to Phrases

At its core, word2phrase takes in a sentence of individual words and potentially turns bigrams (two consecutive words) into a phrase by joining the two words together with a symbol (an underscore in our case).  Whether or not a bigram is turned into a phrase is determined by the training set and parameters set by the user.  Note that every pair of consecutive words is considered, so in a sentence w1 w2 w3, the bigrams are w1w2 and w2w3.

In our word2phrase implementation in Spark (done similarly to Gensim’s), there are two distinct steps: a training (estimator) step and an application (transform) step.

*For clarity, note that “new york” is a bigram, while “new_york” is a phrase.

Estimator Step

The training step is where we pass a training set to the word2phrase estimator.  The estimator takes this dataset and produces a model using the algorithm.  The model is the transformer: we pass it the datasets we want to transform, i.e. sentences with bigrams that we may want to turn into phrases.

In the training step, the dataset is an array of sentences.  The algorithm takes these sentences and applies the following formula to give a score to each bigram:

score(wi, wj) = (count(wiwj) – delta) / (count(wi) * count(wj))

where wi and wj are words i and j, count(wiwj) is the number of times word j follows word i, and delta is a discounting coefficient that prevents phrases from being formed out of very infrequent words.
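To see how delta suppresses rare bigrams, here is a hypothetical worked example (the counts are made up purely for illustration): suppose “new york” occurs 150 times, “new” occurs 1,000 times, “york” occurs 400 times, and delta = 100.  Then score = (150 − 100) / (1,000 × 400) = 0.000125, which is above the default threshold of 0.00001, so “new york” would become a phrase.  A bigram seen only 90 times would get a negative score and be rejected outright, no matter how rare its component words are.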

After the score for each bigram is calculated, bigrams whose scores are above a set threshold (this value can be changed by the user) will be transformed into phrases.  The model produced by the estimator step is thus an array of bigrams: the ones that should be turned into phrases.
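As a rough sketch of the estimator logic (this is illustrative only, not the internals of the Word2Phrase package; the function and parameter names are made up):

def learnPhrases(sentences: Seq[Seq[String]],
                 delta: Double = 100.0,
                 threshold: Double = 0.00001): Set[(String, String)] = {
  // count individual words and consecutive word pairs across the corpus
  val unigramCounts = sentences.flatten.groupBy(identity).mapValues(_.size.toDouble)
  val bigramCounts = sentences
    .flatMap(s => s.zip(s.drop(1)))          // every pair of consecutive words
    .groupBy(identity).mapValues(_.size.toDouble)

  // keep only the bigrams whose score clears the threshold
  bigramCounts.collect {
    case ((w1, w2), count)
      if (count - delta) / (unigramCounts(w1) * unigramCounts(w2)) > threshold =>
        (w1, w2)
  }.toSet
}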

Transformer Step

The transform step is simple: pass any array of sentences to your model and it will search for matching bigrams.  All matching bigrams in the array you passed in are then turned into phrases.

You can repeat these steps to produce trigrams (i.e. three words joined into a phrase).  For example, “I read the New York Times” may produce “I read the new_york times” after the first run; run it again and you get “I read the new_york_times”, because in the second run “new_york” is treated as a single word.
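The transform itself can be sketched in a few lines (again, illustrative names only, not the package’s code):

// join any learned bigram into an underscore-separated phrase
def applyPhrases(words: Seq[String], phrases: Set[(String, String)]): Seq[String] =
  words.foldLeft(List.empty[String]) {
    case (prev :: rest, w) if phrases.contains((prev, w)) => s"${prev}_$w" :: rest
    case (acc, w)                                         => w :: acc
  }.reverse

// applyPhrases("i read the new york times".split(" "), Set(("new", "york")))
// => List(i, read, the, new_york, times)

A second pass only produces “new_york_times” if the retrained model also learned the (“new_york”, “times”) bigram.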

Example

First we create our training dataset; it’s a dataframe in which the bigrams “new york” and “test drive” appear frequently.  (The sentences make no sense, as they are randomly generated words.  See below for a link to the full dataframe.)

You can copy/paste this into your spark shell to test it, as long as you have the word2phrase algorithm included (available as a Maven package with coordinates com.reputation.spark:word2phrase:1.0.1).

Download the package and create our test dataframe:

spark-shell --packages com.reputation.spark:word2phrase:1.0.1

import org.apache.spark.ml.feature.Word2Phrase

val wordDataFrame = sqlContext.createDataFrame(Seq(
  (0, "new york test drive cool york how always learn media new york ."),
  (1, "online york new york learn to media cool time ."),
  (2, "media play how cool times play ."),
  (3, "code to to code york to loaded times media ."),
  (4, "play awesome to york ."),
  ...
  (1099, "work please ideone how awesome times ."),
  (1100, "play how play awesome to new york york awesome use new york work please loaded always like ."),
  (1101, "learn like I media online new york ."),
  (1102, "media follow learn code code there to york times ."),
  (1103, "cool use play work please york cool new york how follow ."),
  (1104, "awesome how loaded media use us cool new york online code judge ideone like ."),
  (1105, "judge media times time ideone new york new york time us fun ."),
  (1106, "new york to time there media time fun there new like media time time ."),
  (1107, "awesome to new times learn cool code play how to work please to learn to ."),
  (1108, "there work please online new york how to play play judge how always work please ."),
  (1109, "fun ideone to play loaded like how ."),
  (1110, "fun york test drive awesome play times ideone new us media like follow .")
)).toDF("label", "inputWords")

We set the input and output column names and create the model (the estimator step, represented by the fit(wordDataFrame) function).

scala> val t = new Word2Phrase().setInputCol("inputWords").setOutputCol("out")
t: org.apache.spark.ml.feature.Word2Phrase = deltathresholdScal_f07fb0d91c1f

scala> val model = t.fit(wordDataFrame)

Here are some of the scores (Table 1) calculated by the algorithm before those below the threshold are removed (note that every bigram scoring above the threshold appears here).  The default values are delta -> 100, threshold -> 0.00001, and minWords -> 0.

Table 1
bigram        score
test drive    0.002214815139686856
work please   0.002047826661381…
new york      5.946183949006843E-4
york new      -1.64600247723372…
york york     -6.43001404062082…
york how      -6.64999302561707…
how new       -6.80666229773923…
new new       -7.42968903739342…
to new        -7.52757602015383E-5
york to       -9.25567252744992…

only showing top 10 rows

So our model produces three bigrams that will be searched for in the transform step:

test drive
work please
new york

We then use this model to transform our original dataframe sentences and view the results.  Unfortunately you can’t see the entire row in the spark-shell, but in the out column it’s clear that all instances of “new york” and “test drive” have been transformed into “new_york” and “test_drive”.

scala> val bi_gram_data = model.transform(wordDataFrame)
bi_gram_data: org.apache.spark.sql.DataFrame = [label: int, inputWords: string … 1 more field]

scala> bi_gram_data.show()

Table 2
label  inputWords            out
0      new york test dri…    new_york test_dri…
1      online york new y…    online york new_y…
2      media play how co…    media play how co…
3      code to to code y…    code to to code y…
4      play awesome to y…    play awesome to y…
5      like I I always .     like I I always .
6      how to there lear…    how to there lear…
7      judge time us pla…    judge time us pla…
8      judge test drive …    judge test_drive …
9      judge follow fun …    judge follow fun …
10     how I follow ideo…    how I follow ideo…
11     use use learn I t…    use use learn I t…
12     us new york alway…    us new_york alway…
13     there always how …    there always how …
14     always time media…    always time media…
15     how test drive to…    how test_drive to…
16     cool us online ti…    cool us online ti…
17     follow time aweso…    follow time aweso…
18     us york test driv…    us york test_driv…
19     use fun new york …    use fun new_york …

only showing top 20 rows

The algorithm and test dataset (testSentences.scala) are available at this repository.


Parallel Data Transfer from Vertica to Apache Spark

Overview

Apache Spark does many things well – one of them is taking a variety of different data sources and building a clean data processing pipeline without thinking too much about the underlying details. Usually that works for toy projects, but sometimes the abstractions are a little broken.

In our case, we wanted to suck in several hundred GBs from Vertica into Apache Spark and do so quickly – so we wanted it to run in parallel across all of our nodes.

The Problem

Vertica runs on a set of machines in AWS – so does our Apache Spark cluster. We want the data from a few large tables to get updated in Apache Spark for us to run our NLP processing. The main problem with transferring large amounts of data is that the network is the limiting factor for a single connection.

The default settings open up one connection, like this:


val df = sqlContext.read.format("jdbc").options(
  Map("url" -> "jdbc:vertica://[URL]:5433/warehouse",
    "dbtable" -> "ratings",
    "user" -> "REPLACEME",
    "password" -> "REPLACEME")).load()

In AWS, you will typically get a max of around 1.6GB/s between two machines. You can increase this by making sure that all your machines have a 10Gb/s network connection (only available on the high-end machines) and that everything is in the same cluster network, launched together, and a few other things, but even then you are capped by a single network connection. It is easier, cheaper, and more scalable to have multiple connections between our two storage systems.

Spark’s JDBC reader can open multiple connections if we tell it how to partition the query:

val df = sqlContext.read.format("jdbc").options(
  Map("url" -> "jdbc:vertica://[replaceme]:5433/warehouse",
    "dbtable" -> "ratings",
    "partitionColumn" -> "tenant_id",
    "lowerBound" -> "1",
    "upperBound" -> "10000",
    "numPartitions" -> "10",
    "user" -> "REPLACEME",
    "password" -> "REPLACEME")).load()

 

If you run this and look at the connections, you’ll see a bunch of connections coming in (basically, it will be the number of Spark Executors running on your cluster).

 

Ways of looking at the data

This is a start – and the performance is definitely a step up from where we started (in our case around 8x the speed with a cluster of 6 m3.xlarge machines). What about the next steps? For us, we want to operate on locations for each tenant (in our data model, each tenant has many locations). Some tenants have 1 location, some have 10,000s. We want our data spread evenly across partitions so that Spark can process it efficiently.

 

Let’s look at the partitions and their distribution using mapPartitions:

df.mapPartitions(iter => Array(iter.size).iterator).take(300)

 

We can see that the first few partitions have a bunch of data, and then nothing else. I asked the system to use 200 partitions – and it did, but the data isn’t spread out at all.  

To understand this, let me quickly explain how Spark is doing partitioning of JDBC jobs.

 

Apache Spark JDBC Sharding

There are 4 inputs when using parallel data retrieval:

  • partition field (partitionColumn)
  • min (lowerBound)
  • max (upperBound)
  • numPartitions

 

The first thing to understand is that the driver does not look at the data before creating the jobs (this is a good thing in terms of scaling, but requires a bit of work on our part). It simply creates partition ranges using the information we gave it, e.g.:

[Diagram: JDBC Spark partitioning]
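As an illustration of what the driver does with those four inputs (this is a simplified sketch that mirrors the behavior, not Spark’s actual source; the real logic also routes NULLs and out-of-range values into the first and last partitions):

val lower = 1L
val upper = 10000L
val numPartitions = 10
val stride = (upper - lower) / numPartitions        // roughly 1000 ids per partition

val predicates = (0 until numPartitions).map { i =>
  val start = lower + i * stride
  val end   = lower + (i + 1) * stride
  if (i == numPartitions - 1) s"tenant_id >= $start"
  else s"tenant_id >= $start AND tenant_id < $end"
}
// predicates(0) == "tenant_id >= 1 AND tenant_id < 1000"

Each predicate becomes one task and one JDBC connection, which is why the data lands wherever the range boundaries happen to fall, not where the rows actually are.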

The real problem

In our case, I mentioned that what we really wanted was to partition the data by location_id, not tenant (since downstream processing just needs the data for each location kept together, to avoid a bunch of shuffling). So why didn’t we just use location_id?  As it happens, location_id is a GUID and not an integer, so we can’t use it as a range input to Apache Spark.

First pass at a solution (sorting, etc)

So, let’s try a first simple pass at solving this issue.  What about using timestamps? Can we take all of our data, find the min and max, convert the timestamp into an int, and use that?

There are two issues with this approach:

  1. Hopefully we are collecting more data now than we were a year ago, so the data won’t be evenly spread across time slots.
  2. It requires our DB to sort the entire data set. That is a lot of work (let’s face it, there is a reason why data processing systems have a sorting competition), and it is only going to get worse as our data set grows. It is also expensive, since it requires sending data from node to node over the network.

 

Second/Final Solution – the Hashing trick

Now, an ideal solution doesn’t need to look at the data at all. Then we don’t have to worry about reading the data, and we know that we can scale out horizontally. For this, we need to start looking at hash functions.

As a quick reminder, a hash function takes an input and maps it to a set of values; typically the output space is much smaller than the input space.  A good hash function should also possess a property called uniformity: any bucket should wind up with approximately m/n records, where n is the number of buckets and m is the number of input values.
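A quick illustration of the uniformity property (the hash and bucket count here are just for demonstration, not what the database uses):

// hash 100,000 random string ids into n buckets and count how many land in each
val n = 10
val ids = (1 to 100000).map(_ => java.util.UUID.randomUUID.toString)
val bucketSizes = ids
  .groupBy(id => Math.floorMod(id.hashCode, n))     // bucket_id in [0, n)
  .mapValues(_.size)
// with a reasonably uniform hash, every bucket ends up close to 10,000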

[Diagram: hash function mapping inputs to buckets]

Since most hash functions available in a database don’t take an output range, we need to convert from the hash output space down to our target number of partitions (e.g. with a modulo). Our flow now looks like:

[Diagram: hashing process (id -> hash -> modulo -> bucket)]

Now to use this:

val df = sqlContext.read.format("jdbc").options(
  Map("url" -> "jdbc:vertica://[replaceme]:5433/warehouse",
    "dbtable" -> "ratings",
    "partitionColumn" -> "(hash(location_id) % 10) as bucket_id",
    "lowerBound" -> "1",
    "upperBound" -> "10000",
    "numPartitions" -> "10",
    "user" -> "REPLACEME",
    "password" -> "REPLACEME")).load()

 

With this in place, we can show that as long as our hash function is decent, the distribution across our partitions will be even. Note that we haven’t guaranteed anything about the size of the data for each location; that part is really domain specific.
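To verify, the partition-size check from earlier can be rerun against the hashed read (a sketch; the earlier mapPartitions call works just as well):

// with the hash bucket as the partition column, the row counts per partition
// should now be roughly equal
df.rdd.mapPartitions(iter => Iterator(iter.size)).collect()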

 

Hashing in SQL

This is what this looks like in Vertica, hashing on a username:

partition = 100

sql = """
  select
    (1 + mod(hash(username), %(partition)s)) as hash_code,
    username,
    first_name,
    last_name,
    age
  from user_table
""" % {"partition": partition}

df = sqlContext.read.jdbc(
  url = "jdbc:vertica://db_host:db_port/db_name",
  table = "(%s) as tmp" % sql,
  properties = {"user": user, "password": pw},
  column = 'hash_code', lowerBound = 1, upperBound = partition, numPartitions = partition)

df.registerTempTable("users")

sqlContext.sql("select * from users where first_name = 'John'").show()

 

Why isn’t this built into Apache Spark?

Some existing Spark connectors integrate this type of load balancing into their core (the spark-mongodb connector from Stratio, for example – https://github.com/Stratio/Spark-MongoDB ). The issue is that it requires a hash function that can take in a string and return an integer. Most databases have something that can do this (md5 as an extreme example – it is fast enough that it will still help here), but it is not standardized in SQL, so the Apache Spark contributors presumably felt it was better to leave it out.

 


Natural Language Processing with Spark

One of the goals of the Analytics team has been to provide newer, more in-depth ways to analyze the millions of comments that Reputation aggregates from various sources for each customer. One way to do this is through natural language processing (NLP) techniques like part-of-speech (POS) tagging, named entity recognition (NER), and stemming/lemmatization. Combining these NLP techniques with our existing segmentation tools allows us to begin comparing statistics across sets defined by the language content of those comments. For example, we could look at the set of Walgreens comments that mention Rite-Aid and see that these had higher than average ratings in comparison to the total set of Walgreens comments.

These evaluations, however, initially required us to load the set of comments that we wished to analyze into Python and then run each comment through a natural language parser, one at a time, locally, every time we wanted to run an analysis. The overhead required to parse each of these reviews began to impede our ability to rapidly test different types of analyses, so we began to look into alternative methods for achieving this goal. What we were ultimately looking for was a pre-processed database that would allow us to look up a comment by id and receive a set of POS tags, named entities, and lemmas without having to re-parse each comment each time. This natural language pre-processing would need to be done retroactively on the tens of millions of comments already stored in our database, as well as incrementally on any new comments that are pulled in every few days.

Since much of our analysis framework was already implemented in Python, we began adding this new NLP piece in Python as well. Of the various NLP libraries available to Python at the time of this writing, the one that seemed to work best on the 2-3 sentence reviews in our database was the CoreNLP library from Stanford. Essentially CoreNLP comes with a series of models that have been trained on a large corpus of sample words for different languages (presently English, Arabic, Chinese, French and German). These models are then used to evaluate the likely part-of-speech of new inputs based on patterns learned from the original training data. The library also uses similar processes to determine which words in a given input are references to some named entity (for example an organization, individual name, or location name) and to identify the stem form of each word for easier pattern analysis.

The downside of using CoreNLP, however, is that in order to run, it starts up a new, separate Java process which is then passed one comment at a time for parsing. Starting up this Java process creates 5-10 minutes of overhead for processing a set of comments of any size, and even once this separate process is running it can take a few minutes to fully parse an average length comment (3-5 sentences). Thus to run all the millions of historical comments through CoreNLP in a serial fashion would be computationally infeasible. Instead, we decided to use Apache Spark to bring up a distributed cluster to run these comments through CoreNLP in parallel.

Spark provides a set of libraries in either Python, Scala, R, or Java that handle the hassle of creating a distributed cluster of nodes and efficiently distributing data between them. While it can be used for a wide variety of purposes, we used it to take the set of comments that we needed to evaluate and figure out how to split those comments amongst clusters of varying sizes in order to reduce the time necessary to run all of our historical data through CoreNLP. Using Spark also provided the added bonus of easily integrating with AWS’ Elastic Map-Reduce (EMR) service, which has an easy-to-use command line interface for bringing up clusters of EC2 nodes. Amazon has preconfigured settings to automatically pass the relevant information about each EMR cluster through to Spark so that we can easily bring up any number of nodes with the same code. This makes it easy to setup a cron task to automatically parse the last few days worth of reviews on a regular basis.

Additionally, while we originally set out to create a Python application to interact with Spark and CoreNLP, we eventually discovered that we needed the ability to more carefully control which information CoreNLP passed to each Spark process. Since Spark is capable of running multiple threads on each node in order to better parallelize, and since each thread runs a separate instance of our Spark application, we noticed that each Python application in each thread was instantiating its own CoreNLP Java process. This meant that if we had 4 threads running on the same node, we would also have 4 CoreNLP Java processes running on that node, which would slow that node’s performance to a crawl. To get around this, we had to translate our application into Scala instead. Scala allows for the existence of transient variables, which allowed us to write our code in such a way that when multiple threads are running on the same node, they all use the same CoreNLP Java process, but whenever a new node is brought up it brings up a new process. (Thanks to Databricks’ Spark/CoreNLP wrapper for this idea!)
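A sketch of that one-pipeline-per-JVM pattern looks roughly like the following (this follows the Databricks wrapper idea rather than reproducing our code; the annotator settings are illustrative):

import java.util.Properties
import edu.stanford.nlp.pipeline.{Annotation, StanfordCoreNLP}

// a singleton object is initialized at most once per JVM, so every Spark task
// thread running inside the same executor shares a single CoreNLP pipeline
object NLPPipeline {
  @transient lazy val pipeline: StanfordCoreNLP = {
    val props = new Properties()
    props.setProperty("annotators", "tokenize, ssplit, pos, lemma, ner")
    new StanfordCoreNLP(props)
  }
}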

Below is some of the code from our Scala-based Spark application. It is designed to do the following:

  1. Pull in some number of reviews from our Vertica database.
  2. Distribute those reviews to a cluster of independent nodes.
  3. Run each review through the CoreNLP process for that node.
  4. Format CoreNLP’s output so that it can be uploaded back into Vertica.
  5. Upload the natural language data (POS tags, NER tags, and lemmas) back into the database.

Click here for Github Gist
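The gist itself is not reproduced here, but the overall shape of the job looks roughly like the sketch below. Table and column names, and the formatAnnotation helper, are placeholders rather than our actual code.

// 1. pull reviews in from Vertica over JDBC (placeholder table/credentials)
val comments = sqlContext.read.format("jdbc").options(Map(
  "url"      -> "jdbc:vertica://[replaceme]:5433/warehouse",
  "dbtable"  -> "comments",
  "user"     -> "REPLACEME",
  "password" -> "REPLACEME")).load()

// 2. spread the reviews across the cluster, one partition per executor
val numExec = sc.getConf.get("spark.executor.instances").toInt
val parsed = comments.repartition(numExec).rdd.mapPartitions { iter =>
  // 3. run each review through the shared per-JVM CoreNLP pipeline
  iter.map { row =>
    val doc = new edu.stanford.nlp.pipeline.Annotation(row.getString(1))
    NLPPipeline.pipeline.annotate(doc)
    // 4. format CoreNLP's output (POS tags, NER tags, lemmas) for upload
    (row.getLong(0), formatAnnotation(doc))   // formatAnnotation is a placeholder
  }
}

// 5. write the (comment_id, nlp_output) pairs back into Vertica, e.g. with the
//    DataFrame JDBC writer or Vertica's bulk loader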

Once our Spark application was working on local developer machines, we began running it through EMR’s distributed clusters instead. Initially we ran into some headaches getting Spark to fully utilize the resources made available to it through EMR. There is a line in the code above that pulls in the number of nodes available from the Spark config (val num_exec = sc.getConf.get("spark.executor.instances").toInt). This line tells Spark how many nodes it has available so that it can partition the data accordingly. Below are two screenshots of the CPU usage per node in AWS, from before this change and after it:

[Screenshot: CPU usage per node, before the partitioning fix]

Before proper partitioning – Notice that in this case, the node in blue is the only one that appears to be actually doing any parsing. This is because Spark defaults to assuming a single data partition, so it runs all the comments through the master node.

[Screenshot: CPU usage per node, after the partitioning fix]

After proper partitioning – By explicitly telling Spark how many nodes to use, we can see that it now runs some comments through all 8 nodes. (Thanks to Cloudera for explaining this and more about how to properly tune Spark jobs!)

 

Additionally, we ran into some trouble getting EMR to communicate with Vertica through the database’s security restrictions, which involved playing with our VPN settings. Once these hurdles were dealt with, though, we were able to begin testing the scaling power of this CoreNLP/Spark/EMR solution. The following graph shows the number of minutes it took Spark to run as a function of the number of thousands of comments run per instance in the EMR cluster. As you can see, the time to run increases roughly linearly with the number of comments each node is required to run.

[Graph: minutes to run vs. thousands of comments per node]

Minutes to Run vs. # of thousands of comments per node in cluster – This graph shows the time it takes Spark to run our process as a function of the number of comments per distributed node in the cluster. It shows a more or less linear relationship up until the point where there are more than a million comments per node.

The outlier point at 1000 on the x-axis (= 1 million comments per node) is from when we ran all of our historical comments. Further research is required to figure out why performance seems to have degraded for that point.

Interestingly, we also found that when the number of comments per node increases above about a million or so, the EMR task would fail without outputting any errors in the logs (this is what happened with the rightmost datapoint on the above graph). This may be due to insufficient resources to run the number of comments assigned to that node (we used Amazon’s m3.xlarge instances for each node on each run), but we haven’t done enough analysis to confirm this. The short-term solution to this problem was simply to provide more nodes and get the ratio of comments per node back down to around 1 million or so.