Natural Language Processing with Spark
One of the goals of the Analytics team has been to provide newer, more in-depth ways to analyze the millions of comments that Reputation aggregates from various sources for each customer. One way to do this is through natural language processing (NLP) techniques like part-of-speech(POS) tagging, named entity recognition(NER), and stemming/lemmatization. Combining these NLP techniques with our existing segmentation tools allows us to begin comparing statistics across sets defined by the language content of those comments. For example, we could look at the set of Walgreens comments that mention Rite-Aid and see that these had higher than average ratings in comparison to the total set of Walgreens comments.
These evaluations, however, initially required us to load the set of comments that we wished to analyze into Python, then run each comment through a natural language parser one at a time locally each time we wanted to run an analysis. The overhead required to parse each of these reviews began to impede our ability to rapidly test different types of analyses, so we began to look into alternative methods for achieving this goal. What we were ultimately looking for was a pre-processed database that would allow us to look up a comment by id and receive a set of POS tags, named entities, and lemmas without having to re-parse each comment each time. This natural language pre-processing would need to be done retroactively to the tens of millions of comments already stored in our database, as well as incrementally on any new comments that have been pulled in every few days.
Since much of our analysis framework was already implemented in Python, we began adding this new NLP piece in Python as well. Of the various NLP libraries available to Python at the time of this writing, the one that seemed to work best on the 2-3 sentence reviews in our database was the CoreNLP library from Stanford. Essentially CoreNLP comes with a series of models that have been trained on a large corpus of sample words for different languages (presently English, Arabic, Chinese, French and German). These models are then used to evaluate the likely part-of-speech of new inputs based on patterns learned from the original training data. The library also uses similar processes to determine which words in a given input are references to some named entity (for example an organization, individual name, or location name) and to identify the stem form of each word for easier pattern analysis.
The downside of using CoreNLP, however, is that in order to run, it starts up a new, separate Java process which is then passed one comment at a time for parsing. Starting up this Java process creates 5-10 minutes of overhead for processing a set of comments of any size, and even once this separate process is running it can take a few minutes to fully parse an average length comment (3-5 sentences). Thus to run all the millions of historical comments through CoreNLP in a serial fashion would be computationally infeasible. Instead, we decided to use Apache Spark to bring up a distributed cluster to run these comments through CoreNLP in parallel.
Spark provides a set of libraries in either Python, Scala, R, or Java that handle the hassle of creating a distributed cluster of nodes and efficiently distributing data between them. While it can be used for a wide variety of purposes, we used it to take the set of comments that we needed to evaluate and figure out how to split those comments amongst clusters of varying sizes in order to reduce the time necessary to run all of our historical data through CoreNLP. Using Spark also provided the added bonus of easily integrating with AWS’ Elastic Map-Reduce (EMR) service, which has an easy-to-use command line interface for bringing up clusters of EC2 nodes. Amazon has preconfigured settings to automatically pass the relevant information about each EMR cluster through to Spark so that we can easily bring up any number of nodes with the same code. This makes it easy to setup a cron task to automatically parse the last few days worth of reviews on a regular basis.
Additionally, while we originally set out to create a Python application to interact with Spark and CoreNLP, we eventually discovered that we needed the ability to more carefully control which information CoreNLP passed to each Spark process. Since Spark is capable of running multiple threads on each node in order to better parallelize and since each thread runs a separate version of our Spark application, we noticed that each Python application in each thread was instantiating its own CoreNLP Java process. This meant that if we had 4 threads running on the same node, we would also have 4 CoreNLP Java processes running on that node, which would slow that node’s performance to a crawl. To get around this, we had to translate our application into Scala instead. Scala allows for the existence of transient variables, which allowed us to write our code in such a way that when multiple threads are running on the same node, they all use the same CoreNLP Java process, but whenever a new node is brought up it brings up a new process. (Thanks to Databrick’s Spark/CoreNLP wrapper for this idea!)
Below is some of the code from our Scala-based Spark application. It is designed to do the following:
- Pull in some number of reviews from our Vertica database.
- Distribute those reviews to a cluster of independent nodes.
- Run each review through the CoreNLP process for that node.
- Format CoreNLP’s output so that it can uploaded back into Vertica
- Upload the natural language data (POS tags, NER tags, and lemmas) back into the database
Once our Spark application was working on local developer machines, we began testing running it through EMR’s distributed clusters instead. Initially we ran into some headaches getting Spark to fully utilize the resources made available to it through EMR. There is a line in the code above that talks about pulling in the number of nodes available through the Spark Config (val num_exec = sc.getConf.get(“spark.executor.instances”).toInt). This line tells spark how many nodes it has available so that it can partition the data accordingly. Below are two screenshots of the CPU usage per node in AWS from before this change and after it:
Before proper partitioning – Notice that in this case, the node in blue is the only one that appears to be actually doing any parsing. This is because Spark defaults to assuming a single data partition, so it runs all the comments through the master node.
After proper partitioning – By explicitly telling Spark how many nodes to use, we can see that it now runs some comments through all 8 nodes. (Thanks to Cloudera for explaining this and more about how to properly tune Spark jobs!)
Additionally we ran into some trouble getting EMR to communicate with Vertica through the database’s security restrictions, which involved playing with our VPN settings. Once these hurdles were dealt with though, we were able to begin testing the scaling power of this CoreNLP/Spark/EMR solution. The following graph shows the number of minutes it took Spark to run as dependent on the number of thousands of comments run per each instance in the EMR cluster. As you can see, the time to run increases linearly as a function of how many comments each node is required to run.
Minute to Run vs. # of thousands of comments per node in cluster – This graph shows the time it takes Spark to run our process as a function of number of comments per each distributed node in the cluster. It shows a linear relationship more or less up until the point where there are more than a million comments per node.
The outlier point at 1000 on the x-axis (= 1 million comments per node) is from when we ran all of our historical comments. Further research is required to figure out why performance seems to have degraded for that point.
Interestingly, we also found that it seems when the number of comments per node increases above a about a million or so, the EMR task would fail without outputting any errors in the logs (this is what happened with the rightmost datapoint on the above graph). This may be due to insufficient resources to run the number of comments assigned to that node(we used Amazon’s m3.xlarge instances for each node on each run), but we haven’t done enough analysis to confirm this. The short-term solution to this problem was simply to provide more nodes and get the ratio of comments per node back down to around 1 million or so.