返回部落格

使用 Spark 進行 Twitter 情感分析

使用 Spark 進行 Twitter 情感分析

在我們的上一篇貼文中,我研究出了一種方法,可以使用Apache Flume來擷取即時的 Twitter 資料。目前,我已經從 Twitter 獲取了大量資料。因此,我想對其進行分析並從中找出一些趨勢。為了對 Twitter 資料進行情緒分析,我將使用另一個大數據工具:Apache Spark.

根據Hortonworks, “Apache Spark 是一個快速的記憶體內資料處理引擎,具有優雅且極具表現力的開發 API,可讓資料工作者高效地執行需要對資料集進行快速反覆存取的串流、機器學習或 SQL 工作負載。隨著 Spark 在 Apache Hadoop YARN 上運行,各地的開發人員現在都可以建立應用程式來利用 Spark 的強大功能、獲取洞察,並在 Hadoop 內單一、共享的資料集中豐富其資料科學工作負載。”

Spark 程式可以使用 JAVA、Scala、Python 或 R 編寫。在此案例中,我們將使用 JAVA 搭配Maven。此外,Spark 同時提供 HDP 和 Cloudera 發行版。目前使用的是 Spark 2 版本。

為了使用 Spark 進行情緒分析,我正在建立一個新的 Maven 專案。我將其命名為 Twitter Sentiment Analyzer’。接下來,我將建立一個類別 “TwitterDataFlow.java”,並在其中實作所有必要的方法。

Language Tool:拼字糾正器

最初,在概念驗證(POC)中,我發現如果推文中的拼字錯誤,會對情緒分析的結果產生不利影響。因此,我引入了 SpellChecker。它將幫助我們在將推文用於情緒分析之前糾正其拼字。

我將透過建立一個名為 ‘CorrectSpell’ 的新靜態方法來實現這一點。此外,我將使用 LanguageTool 來檢查拼字並進行糾正。

根據LanguageTool’s GIT, “LanguageTool 是一款適用於英文、法文、德文、波蘭文、俄文以及20 多種其他語言的開源校對軟體。它能發現許多簡單拼字檢查工具無法偵測到的錯誤。”

接下來,我在 pom.xml 中為語言工具新增相依性:

<dependency>
<groupId>org.languagetool</groupId>
<artifactId>language-en</artifactId>
<version>4.0</version>
</dependency>

之後,我定義了一個 JLanguageTool 類別的靜態類別層級變數 langTool。然後,我使用 AmericanEnglish 類別的物件來初始化 langTool。

接下來,我正在編寫名為 SpellChecker 的方法,其輸入為 String text(一般文字),回傳類型也為 String(拼字正確的文字)。我使用 JLanguageTool 的 check 方法,並將未檢查的文字作為參數。接著,它會回傳一個 RuleMatch 列表。根據 JLanguageTool Java 文件,RuleMatch 類別提供了“與文字相符的錯誤規則以及相符位置的相關資訊。”

在此之後,我定義了三個變數:String 類型的 ‘result’、整數類型的 ‘lastPos’,以及 String 類型的 ‘tmp’。此外,針對每個 RuleMatch,我會使用該工具建議的第一個拼字來重新建構句子。至此,我已在需要的地方加入了必要的 try-catch 區塊。

 

情感分析器:Stanford CoreNLP

使用 Spark 進行情感分析的下一步是從文本中找出情感。為了做到這一點,我正在使用 Stanford 的 Core NLP 函式庫來尋找情感值。然後,我建立了一個名為 ‘StanfordSentiment’ 的類別,我將在其中實作該函式庫以尋找我們文本中的情感。

為此,我正在 pom.xml 檔案中新增以下相依性:

<!– 這是 stanford Core NLP 函式庫 –>
<dependency>
<groupId>edu.stanford.nlp</groupId>
<artifactId>stanford-corenlp</artifactId>
<version>3.8.0</version>
</dependency>

<!– 這是 stanford Core NLP 的模型檔案 –>
<dependency>
<groupId>edu.stanford.nlp</groupId>
<artifactId>stanford-corenlp</artifactId>
<version>3.8.0</version>
<classifier>models</classifier>
</dependency>

我正在建立一個靜態物件變數 ‘props’,它定義了 Stanford Core NLP 管線的屬性。我選擇了最少的屬性以使其儘可能輕量。之後,我將標註器設定為 tokenize、ssplit、pos、parse、sentiment。我正在建立另一個 StanfordCoreNLP 類別的靜態物件變數 ‘pipeline’。最後,我使用 ‘props’ 屬性初始化該管線。

GetSentiment:

我建立了一個名為 GetSentiment 的方法,其輸入為 String,輸出為 Double。我使用的是在 LanguageCheck.java 檔案中建立的 CorrectSpell 方法。LanguageCheck 物件的 CorrectSpell 方法會向我傳回所輸入推文的正確拼字。我對這個修正後的文字使用 StanfordCoreNLP 的 annotate 方法。該函式庫給出的情感值為:

0 => 非常負面

1 => 負面

2 => 中立

3 => 正面

4 => 非常正面

 

我將每句的結果減去 2,以得到以下新的情感類別:

-2 => 非常負面

-1 => 負面

0 => 中立

1 => 正面

2 => 非常正面

 

我將每條推文的情感結果傳回為該推文中每個句子情感的平均值。推文並非以任何結構化格式編寫。因此,我無法為推文的任何特定行分配比其他行更高的權重。因此,我假設推文中的每一行都具有同等的重要性。我傳回 Double 類型的變數 ‘total’,其中包含推文的最終情感值。

 

程式初始化:

既然這兩個類別已經完成,我們將繼續使用它們。因此,我正在建立一個新類別 “TwitterDataFlow.java”。首先,我正在編寫一個條件檢查,只有在傳入的輸入參數數量剛好為 2 時才會執行程式。如果參數數量不等於 2,它會列印錯誤的使用方式訊息,並以結束狀態碼 1 退出。

我正在建立一個 SparkSession,應用程式名稱為 Sentiment Analyzer。我正在將 spark context 的 hadoop configuration 屬性 “mapreduce input fileinputformat input dir recursive” 設定為 true。這將使我能夠從資料夾中遞迴地檢索檔案。我正在建立一個 String 類別的變數 ‘inputPath’,在其中我設定了輸入參數以及 ‘/*/*’,這將使我能夠讀取由 Flume 儲存的分區資料。我正在將 Flume 的 json 資料讀取到 Dataset<Row> ‘data’ 中。

然後,我正在向 Spark SQL Context 註冊一個名為 ‘Sentiment’ 的 UDF(使用者定義函數),它接受一個 String,並對其套用 StanfordSentiment 的 GetSentiment 方法,然後返回 Double 數值資料類型。

目前,我有來自 flume 的關鍵字 Apple、Google、Tesla、Infosys、TCS、Oracle、Microsoft 和 Facebook 的資料。因此,我正在使用這些關鍵字建立一個 String 列表。針對這些公司的每一家,我正在執行以下操作。

操作流程:

首先,我正在建立一個我想用來儲存結果的 outPath。我正在資料集 ‘data’ 上建立一個暫存檢視表 ‘complete’。接下來,我正在從資料中提取 timestamp、partitionBy(以便在儲存結果時對資料進行分區)、text、main_text(用於正規表示式)和 followers。

我正在結果上建立一個暫存檢視表,並從中篩選出特定公司的資料。此外,我正在套用 Sentiment UDF,它會在 ‘seVal’ 欄位中返回情緒值。我正在將序列化資料保留在記憶體中並溢寫至磁碟。從這些資料中,我得到了 NetSentiment,即追蹤者數量與該推文情緒值的乘積。這有助於了解該推文可能產生的影響力。我也可以為此使用不同的公式。

我們正在將序列化資料保留在記憶體和磁碟中,因為我們希望儲存整個結果,而情緒分析是一項計算繁重的任務。如果我們不保留這些資料,而我們又計劃使用多個公式來計算 NetSentiment 或影響力,那麼在先前我們會多次使用 Sentiment 方法值的查詢中,它將會對該推文進行多次情緒分析。

資料分組:

現在,我正按 timestamp 和 partitionBy 欄位對數據進行分組,並以此分組計算 NetSentiment 的平均值。這給了我該公司在特定分鐘內的平均影響力(正面或負面)。因此,我將每個公司的結果寫入 outPath,並按 partitionBy 欄位進行分區。

此外,在程式碼完成後,我會匯出一個包含所有依賴項的可執行 jar,並將其複製到我想執行此作業的伺服器上。接下來,我可以使用以下命令來提交它:

其中 (來源):

  • --master: 該 master URL 叢集(例如 spark://23.195.26.187:7077)
  • --deploy-mode: 是否將您的驅動程式部署在工作節點 (cluster) 上,或是作為外部用戶端 (client) 本地部署(預設值: client)
  • application-jar: 包含您的應用程式和所有依賴項的打包 jar 路徑。請注意,該 URL 在您的叢集內部必須是全域可見的,例如 hdfs:// 路徑或存在於所有節點上的 file:// 路徑。
  • application-arguments: 傳遞給主類別主方法的參數(如果有的話)。在這裡是輸入和輸出路徑

最終步驟:

然後,我們將從輸出路徑中獲取使用 Spark 進行情緒分析的結果。例如,這可能是 apple 的結果:

{“timestamp”:”Apr 30 2018 20:31:00″,”avg(NetSentiment)”:-3678.768518518518}
{“timestamp”:”Apr 30 2018 20:32:00″,”avg(NetSentiment)”:-883.002824858757}

我將此應用程式部署在 CloudSigma 上,使用 5 個節點的 HDP 叢集。具體而言,每個節點的配置如下:

256 GB SSD
16 GB RAM
20 GHz CPU

總而言之,我能夠在約 19 小時內使用 Spark 獲得情緒分析的結果。此外,我還在超過 80 GB 的資料集上進行了比該程式更進階的計算。

程式碼可以在 GITHUB.

 

author

Akshay Nagpal

作者 · CloudSigma

Preslav Dobrev 是 CloudSigma 的創意設計師,專注於透過傳統與創新行銷渠道建立一致的企業形象。他擅長將藝術願景與策略行銷相融合,創造具有影響力的品牌敘事。

留言

目前尚無留言。成為第一個留言的人吧。