
Sentiment Analysis of Twitter using Spark

In the previous post, I worked out a way to extract real-time Twitter data using Apache Flume. By now, I have collected a lot of data from Twitter, so I want to analyze it and find trends in it. To perform sentiment analysis of the Twitter data, I am going to use another Big Data tool, Apache Spark.

According to Hortonworks, “Apache Spark is a fast, in-memory data processing engine with elegant and expressive development APIs to allow data workers to efficiently execute streaming, machine learning or SQL workloads that require fast iterative access to datasets. With Spark running on Apache Hadoop YARN, developers everywhere can now create applications to exploit Spark’s power, derive insights, and enrich their data science workloads within a single, shared dataset in Hadoop.”

A Spark program can be written in Java, Scala, Python or R. In this case, I will be using Java along with Maven. Spark ships with both the HDP and Cloudera distributions; Spark 2 is the version used here.

To perform the sentiment analysis with Spark, I am creating a new Maven project named ‘Twitter Sentiment Analyzer’. Next, I will create the classes in which I implement the required methods.

LanguageTool: Spell Corrector

During the initial POC, I found that misspelled tweets adversely affect the results of the sentiment analysis. Therefore, I am introducing a spell checker that corrects the spelling of the tweets before they are used for sentiment analysis.

I am going to do this by creating a new static method named ‘CorrectSpell’. I am going to use LanguageTool to check the spellings and correct them.

According to LanguageTool’s GitHub page, “LanguageTool is an Open Source proofreading software for English, French, German, Polish, Russian, and more than 20 other languages. It finds many errors that a simple spell checker cannot detect.”

Next, I am adding a dependency for the language tool in pom.xml:

<dependency>
    <groupId>org.languagetool</groupId>
    <artifactId>language-en</artifactId>
    <version>4.0</version>
</dependency>

After that, I am defining a static class-level variable, ‘langTool’, of class JLanguageTool and initializing it with an object of class AmericanEnglish.

Next, I am coding the ‘CorrectSpell’ method, which takes a String (the raw text) as input and returns a String (the text with corrected spellings). I am using the check method of JLanguageTool on the unchecked text, which returns a list of RuleMatch objects. According to the JLanguageTool Java docs, the RuleMatch class provides “information about an error rule that matches text and the position of the match.”

Following this, I am defining three variables: ‘result’ of type String, ‘lastPos’ of type int, and ‘tmp’ of type String. For each RuleMatch, I rebuild the sentence using the first suggested replacement from the tool, with the necessary try-catch blocks wherever required.
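Putting this together, a minimal sketch of the spell-correction class could look like the following. The class name LanguageCheck is taken from the later reference to LanguageCheck.java; the exact control flow and error handling in the original code may differ.

import java.io.IOException;
import java.util.List;

import org.languagetool.JLanguageTool;
import org.languagetool.language.AmericanEnglish;
import org.languagetool.rules.RuleMatch;

public class LanguageCheck {

    // Class-level LanguageTool instance, initialized for American English
    private static JLanguageTool langTool = new JLanguageTool(new AmericanEnglish());

    public static String CorrectSpell(String text) {
        String result = "";
        int lastPos = 0;
        String tmp = "";
        try {
            // check() returns one RuleMatch per issue found in the text
            List<RuleMatch> matches = langTool.check(text);
            for (RuleMatch match : matches) {
                List<String> suggestions = match.getSuggestedReplacements();
                if (suggestions.isEmpty()) {
                    continue;
                }
                // Rebuild the sentence using the first suggested replacement
                tmp = suggestions.get(0);
                result += text.substring(lastPos, match.getFromPos()) + tmp;
                lastPos = match.getToPos();
            }
            result += text.substring(lastPos);
        } catch (IOException | RuntimeException e) {
            // If anything goes wrong, fall back to the original text
            return text;
        }
        return result;
    }
}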


Sentiment Analyzer: Stanford CoreNLP

The next step in the sentiment analysis with Spark is to find sentiments in the text. For this, I am using the Stanford CoreNLP library to compute sentiment values. I am creating a class named ‘StanfordSentiment’ where I use the library to find the sentiment of our text.

To do that, I am adding the following dependencies in pom.xml file:

<!-- This is the Stanford CoreNLP library -->
<dependency>
    <groupId>edu.stanford.nlp</groupId>
    <artifactId>stanford-corenlp</artifactId>
    <version>3.8.0</version>
</dependency>

<!-- This is Stanford CoreNLP's models file -->
<dependency>
    <groupId>edu.stanford.nlp</groupId>
    <artifactId>stanford-corenlp</artifactId>
    <version>3.8.0</version>
    <classifier>models</classifier>
</dependency>

I am creating a static object variable, ‘props’, which defines the properties for Stanford CoreNLP’s pipeline. I have selected the minimum properties to keep it as light as possible, setting the annotators to tokenize, ssplit, pos, parse, sentiment. I am creating another static object variable, ‘pipeline’, of class StanfordCoreNLP and initializing it with the ‘props’ properties.
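A minimal sketch of this setup, assuming the pipeline is built once as a static member of StanfordSentiment:

import java.util.Properties;

import edu.stanford.nlp.pipeline.StanfordCoreNLP;

public class StanfordSentiment {

    // Minimal set of annotators needed for sentiment analysis
    private static Properties props = new Properties();
    static {
        props.setProperty("annotators", "tokenize, ssplit, pos, parse, sentiment");
    }

    // Single shared pipeline, initialized with the properties above
    private static StanfordCoreNLP pipeline = new StanfordCoreNLP(props);
}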

GetSentiment:

I am creating a method, GetSentiment, with a String as input and a Double as output. First, I call the CorrectSpell method of the LanguageCheck class, which returns the spell-corrected text of the tweet. I then pass this corrected text to StanfordCoreNLP’s annotate method. The sentiment values given by the library are:

0 => very negative
1 => negative
2 => neutral
3 => positive
4 => very positive


I subtract 2 from each sentence’s result to get the following new sentiment categories:

-2 => very negative
-1 => negative
0 => neutral
1 => positive
2 => very positive


The sentiment of a tweet is returned as the average of the sentiments of its sentences. Tweets are not written in any structured format, so I cannot give any particular line of a tweet a higher weight than the others; I therefore assume each sentence of a tweet is equally important. The method returns the variable ‘total’, of type Double, which holds the resultant sentiment value of the tweet.
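A sketch of GetSentiment along these lines (inside the StanfordSentiment class above; the exact structure of the original method may differ):

import java.util.List;

import edu.stanford.nlp.ling.CoreAnnotations;
import edu.stanford.nlp.neural.rnn.RNNCoreAnnotations;
import edu.stanford.nlp.pipeline.Annotation;
import edu.stanford.nlp.sentiment.SentimentCoreAnnotations;
import edu.stanford.nlp.trees.Tree;
import edu.stanford.nlp.util.CoreMap;

public static Double GetSentiment(String text) {
    // Correct the spelling first, as described earlier
    String corrected = LanguageCheck.CorrectSpell(text);

    Annotation annotation = new Annotation(corrected);
    pipeline.annotate(annotation);

    Double total = 0.0;
    int sentences = 0;

    List<CoreMap> sentenceList = annotation.get(CoreAnnotations.SentencesAnnotation.class);
    for (CoreMap sentence : sentenceList) {
        // Predicted class is 0..4 (very negative .. very positive)
        Tree tree = sentence.get(SentimentCoreAnnotations.SentimentAnnotatedTree.class);
        int sentiment = RNNCoreAnnotations.getPredictedClass(tree);
        // Shift to the -2..2 range and accumulate
        total += sentiment - 2;
        sentences++;
    }

    // Equal weight for every sentence: return the average
    if (sentences > 0) {
        total = total / sentences;
    }
    return total;
}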


Program Initiation:

Now that these two classes are done, we can move on to using them. I am creating a new class, “TwitterDataFlow.java”. First, I add a check that only runs the program if exactly 2 input arguments are passed; otherwise it prints a usage message and exits with status 1.

I am building a SparkSession with the app name ‘Sentiment Analyzer’. I set the Spark context’s Hadoop configuration property “mapreduce.input.fileinputformat.input.dir.recursive” to true, which lets me retrieve files recursively from folders. I am creating a String variable, ‘inputPath’, built from the input argument plus ‘/*/*’, which lets me read the partitioned data stored by Flume. I then read Flume’s JSON data into a Dataset<Row> named ‘data’.
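The start of the main method might look roughly like this. The variable names are my own; the argument check, the configuration key and the ‘/*/*’ suffix come from the description above.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public static void main(String[] args) {
    if (args.length != 2) {
        System.err.println("Usage: TwitterDataFlow <input path> <output path>");
        System.exit(1);
    }

    SparkSession spark = SparkSession.builder()
            .appName("Sentiment Analyzer")
            .getOrCreate();

    // Let Spark pick up files recursively from the Flume-partitioned folders
    spark.sparkContext().hadoopConfiguration()
            .set("mapreduce.input.fileinputformat.input.dir.recursive", "true");

    // '/*/*' reaches into the partition sub-folders written by Flume
    String inputPath = args[0] + "/*/*";
    Dataset<Row> data = spark.read().json(inputPath);
    // ...
}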

Then, I register a UDF (User Defined Function) named ‘Sentiment’ with the Spark SQL context; it takes a String, applies StanfordSentiment’s GetSentiment method to it, and returns a Double.
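A sketch of the registration, assuming GetSentiment is exposed as a static method:

import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.types.DataTypes;

// Register the 'Sentiment' UDF so it can be used inside Spark SQL queries
spark.udf().register("Sentiment",
        (UDF1<String, Double>) text -> StanfordSentiment.GetSentiment(text),
        DataTypes.DoubleType);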

Currently, I have data from Flume for the keywords Apple, Google, Tesla, Infosys, TCS, Oracle, Microsoft and Facebook. So, I am creating a list of Strings with these keywords and, for each of these companies, running the following operations.

Operations flow:

First, I am creating an outPath where I want to save the results. I am creating a temp view, ‘complete’, over the dataset ‘data’. Next, I am extracting timestamp, partitionBy (used to partition the data while storing the results), text, main_text (used for regular expressions) and followers from the data.

I am creating a temp view over these results and filtering out the particular company’s data from it. I then apply the Sentiment UDF, which returns the sentiment values in the column ‘seVal’. I persist the serialized data in memory, with spill to disk. From this data, I compute NetSentiment, the product of the number of followers and the sentiment value of that tweet, which indicates the influence the tweet can have. Different formulas could be used here.

We persist the serialized data in memory and on disk because we want the entire result to be kept: sentiment analysis is a computationally heavy task. If we did not persist it and later used several formulas to compute NetSentiment or influence, every reference to the Sentiment UDF’s value in the previous query would trigger the sentiment analysis of the tweet again.
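Continuing the sketch above, the per-company steps might look roughly like this. The column names seVal and NetSentiment and the loop variable ‘company’ come from the description; the temp view names and the filter condition are illustrative assumptions.

import org.apache.spark.storage.StorageLevel;

// Score the current company's tweets with the Sentiment UDF
Dataset<Row> scored = spark.sql(
        "SELECT timestamp, partitionBy, followers, Sentiment(main_text) AS seVal "
      + "FROM extracted WHERE main_text LIKE '%" + company + "%'");

// Persist serialized in memory with spill to disk, so the expensive
// sentiment scores are computed only once
scored.persist(StorageLevel.MEMORY_AND_DISK_SER());

// Influence of a tweet: number of followers times its sentiment value
scored.createOrReplaceTempView("scored");
Dataset<Row> influence = spark.sql(
        "SELECT timestamp, partitionBy, followers * seVal AS NetSentiment FROM scored");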

Data grouping:

Now, I group the data by the timestamp and partitionBy columns and average NetSentiment over this grouping. This gives the average influence of the company, positive or negative, in a particular minute. Finally, I write the results for each company to outPath, partitioning them by the partitionBy column.
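Continuing the sketch, the grouping and write step could be:

// Average NetSentiment per minute, then write the result partitioned by partitionBy
// (outPath is the per-company output path created earlier)
Dataset<Row> result = influence
        .groupBy("timestamp", "partitionBy")
        .avg("NetSentiment");

result.write()
        .partitionBy("partitionBy")
        .json(outPath);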

After the code is complete, I export a runnable JAR with all the dependencies in it and copy it to the server where I want to run this job. Next, I submit it with spark-submit.
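A typical invocation has the following shape (the class name, JAR name and master value shown here are placeholders; adapt them to the actual cluster and build):

spark-submit \
  --class TwitterDataFlow \
  --master yarn \
  --deploy-mode cluster \
  TwitterSentimentAnalyzer-jar-with-dependencies.jar \
  <input path> <output path>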

where (from the Spark documentation):

  • --master: The master URL for the cluster (e.g. spark://23.195.26.187:7077)
  • --deploy-mode: Whether to deploy your driver on the worker nodes (cluster) or locally as an external client (client) (default: client)
  • application-jar: Path to a bundled jar including your application and all dependencies.  Take into consideration that the URL must be globally visible inside of your cluster, for instance, an hdfs:// path or a file:// path that is present on all nodes.
  • application-arguments: Arguments passed to the main method of your main class, if any. Here, the input and output paths.

Final Steps:

We then get the results of the sentiment analysis using Spark from the output path. For example, this is a possible result for Apple:

{"timestamp":"Apr 30 2018 20:31:00","avg(NetSentiment)":-3678.768518518518}
{"timestamp":"Apr 30 2018 20:32:00","avg(NetSentiment)":-883.002824858757}

I deployed this application on CloudSigma on a 5-node HDP cluster, with each node having the following configuration:

256 GB SSD
16 GB RAM
20 GHz CPU

All in all, I was able to get results from the sentiment analysis using Spark in approximately 19 hours, with more advanced calculations than the program shown here, over a data set of 80+ GB.

The code can be found on GitHub.