在我們的上一篇貼文中,我研究出了一種方法,可以使用Apache Flume來擷取即時的 Twitter 資料。目前,我已經從 Twitter 獲取了大量資料。因此,我想對其進行分析並從中找出一些趨勢。為了對 Twitter 資料進行情緒分析,我將使用另一個大數據工具:Apache Spark.
根據Hortonworks, “Apache Spark 是一個快速的記憶體內資料處理引擎,具有優雅且極具表現力的開發 API,可讓資料工作者高效地執行需要對資料集進行快速反覆存取的串流、機器學習或 SQL 工作負載。隨著 Spark 在 Apache Hadoop YARN 上運行,各地的開發人員現在都可以建立應用程式來利用 Spark 的強大功能、獲取洞察,並在 Hadoop 內單一、共享的資料集中豐富其資料科學工作負載。”
Spark 程式可以使用 JAVA、Scala、Python 或 R 編寫。在此案例中,我們將使用 JAVA 搭配Maven。此外,Spark 同時提供 HDP 和 Cloudera 發行版。目前使用的是 Spark 2 版本。
為了使用 Spark 進行情緒分析,我正在建立一個新的 Maven 專案。我將其命名為 Twitter Sentiment Analyzer’。接下來,我將建立一個類別 “TwitterDataFlow.java”,並在其中實作所有必要的方法。
Language Tool:拼字糾正器
最初,在概念驗證(POC)中,我發現如果推文中的拼字錯誤,會對情緒分析的結果產生不利影響。因此,我引入了 SpellChecker。它將幫助我們在將推文用於情緒分析之前糾正其拼字。
我將透過建立一個名為 ‘CorrectSpell’ 的新靜態方法來實現這一點。此外,我將使用 LanguageTool 來檢查拼字並進行糾正。
根據LanguageTool’s GIT, “LanguageTool 是一款適用於英文、法文、德文、波蘭文、俄文以及20 多種其他語言的開源校對軟體。它能發現許多簡單拼字檢查工具無法偵測到的錯誤。”
接下來,我在 pom.xml 中為語言工具新增相依性:
<dependency>
<groupId>org.languagetool</groupId>
<artifactId>language-en</artifactId>
<version>4.0</version>
</dependency>
之後,我定義了一個 JLanguageTool 類別的靜態類別層級變數 langTool。然後,我使用 AmericanEnglish 類別的物件來初始化 langTool。
接下來,我正在編寫名為 SpellChecker 的方法,其輸入為 String text(一般文字),回傳類型也為 String(拼字正確的文字)。我使用 JLanguageTool 的 check 方法,並將未檢查的文字作為參數。接著,它會回傳一個 RuleMatch 列表。根據 JLanguageTool Java 文件,RuleMatch 類別提供了“與文字相符的錯誤規則以及相符位置的相關資訊。”
在此之後,我定義了三個變數:String 類型的 ‘result’、整數類型的 ‘lastPos’,以及 String 類型的 ‘tmp’。此外,針對每個 RuleMatch,我會使用該工具建議的第一個拼字來重新建構句子。至此,我已在需要的地方加入了必要的 try-catch 區塊。
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 |
package com.CloudSigma.Spark.TwitterSentiment; import java.util.List; import org.languagetool.JLanguageTool; import org.languagetool.language.AmericanEnglish; import org.languagetool.rules.RuleMatch; public class LanguageCheck { static JLanguageTool langTool = new JLanguageTool(new AmericanEnglish()); public static String CorrectSpell(String text) { String query = text; try { List matches = langTool.check(query); String result = ""; int lastPos = 0; String tmp = ""; for (RuleMatch ma : matches) { try { tmp = ma.getSuggestedReplacements().get(0); result += query.substring(lastPos, ma.getFromPos()); result += tmp; lastPos = ma.getToPos(); } catch (Exception e) { return text; } } if (lastPos < query.length()) { result += query.substring(lastPos, query.length()); } return result; } catch (Exception e) { return text; } } } |
情感分析器:Stanford CoreNLP
使用 Spark 進行情感分析的下一步是從文本中找出情感。為了做到這一點,我正在使用 Stanford 的 Core NLP 函式庫來尋找情感值。然後,我建立了一個名為 ‘StanfordSentiment’ 的類別,我將在其中實作該函式庫以尋找我們文本中的情感。
為此,我正在 pom.xml 檔案中新增以下相依性:
<!– 這是 stanford Core NLP 函式庫 –>
<dependency>
<groupId>edu.stanford.nlp</groupId>
<artifactId>stanford-corenlp</artifactId>
<version>3.8.0</version>
</dependency>
<!– 這是 stanford Core NLP 的模型檔案 –>
<dependency>
<groupId>edu.stanford.nlp</groupId>
<artifactId>stanford-corenlp</artifactId>
<version>3.8.0</version>
<classifier>models</classifier>
</dependency>
我正在建立一個靜態物件變數 ‘props’,它定義了 Stanford Core NLP 管線的屬性。我選擇了最少的屬性以使其儘可能輕量。之後,我將標註器設定為 tokenize、ssplit、pos、parse、sentiment。我正在建立另一個 StanfordCoreNLP 類別的靜態物件變數 ‘pipeline’。最後,我使用 ‘props’ 屬性初始化該管線。
GetSentiment:
我建立了一個名為 GetSentiment 的方法,其輸入為 String,輸出為 Double。我使用的是在 LanguageCheck.java 檔案中建立的 CorrectSpell 方法。LanguageCheck 物件的 CorrectSpell 方法會向我傳回所輸入推文的正確拼字。我對這個修正後的文字使用 StanfordCoreNLP 的 annotate 方法。該函式庫給出的情感值為:
0 => 非常負面
1 => 負面
2 => 中立
3 => 正面
4 => 非常正面
我將每句的結果減去 2,以得到以下新的情感類別:
-2 => 非常負面
-1 => 負面
0 => 中立
1 => 正面
2 => 非常正面
我將每條推文的情感結果傳回為該推文中每個句子情感的平均值。推文並非以任何結構化格式編寫。因此,我無法為推文的任何特定行分配比其他行更高的權重。因此,我假設推文中的每一行都具有同等的重要性。我傳回 Double 類型的變數 ‘total’,其中包含推文的最終情感值。
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 |
套件 com.CloudSigma.Spark.TwitterSentiment; import java.util.List; import java.util.Properties; import edu.stanford.nlp.ling.CoreAnnotations.SentencesAnnotation; import edu.stanford.nlp.neural.rnn.RNNCoreAnnotations; import edu.stanford.nlp.pipeline.Annotation; import edu.stanford.nlp.pipeline.StanfordCoreNLP; import edu.stanford.nlp.sentiment.SentimentCoreAnnotations; import edu.stanford.nlp.trees.Tree; import edu.stanford.nlp.util.CoreMap; import static com.CloudSigma.Spark.TwitterSentiment.LanguageCheck.*; public class StanfordSentiment { static Properties props = new Properties(); static { props.setProperty("annotators", "tokenize,ssplit,pos,parse,sentiment");} static StanfordCoreNLP pipeline = new StanfordCoreNLP(props); public static Double GetSentiment(String text) { String checkedText = CorrectSpell(text); Annotation document = new Annotation(checkedText); pipeline.annotate(document); List sentences = document.get(SentencesAnnotation.class); Double sum = 0.0; for (CoreMap sentence : sentences) { Tree tree = sentence.get(SentimentCoreAnnotations.SentimentAnnotatedTree.class); int sentiment = RNNCoreAnnotations.getPredictedClass(tree); int scaled = sentiment - 2; sum = sum + scaled; } Double total = sum / sentences.size(); System.out.println("Tweet Text: " + checkedText); System.out.println("Sentiment Value: " + total); return total; } } |
程式初始化:
既然這兩個類別已經完成,我們將繼續使用它們。因此,我正在建立一個新類別 “TwitterDataFlow.java”。首先,我正在編寫一個條件檢查,只有在傳入的輸入參數數量剛好為 2 時才會執行程式。如果參數數量不等於 2,它會列印錯誤的使用方式訊息,並以結束狀態碼 1 退出。
我正在建立一個 SparkSession,應用程式名稱為 Sentiment Analyzer。我正在將 spark context 的 hadoop configuration 屬性 “mapreduce input fileinputformat input dir recursive” 設定為 true。這將使我能夠從資料夾中遞迴地檢索檔案。我正在建立一個 String 類別的變數 ‘inputPath’,在其中我設定了輸入參數以及 ‘/*/*’,這將使我能夠讀取由 Flume 儲存的分區資料。我正在將 Flume 的 json 資料讀取到 Dataset<Row> ‘data’ 中。
然後,我正在向 Spark SQL Context 註冊一個名為 ‘Sentiment’ 的 UDF(使用者定義函數),它接受一個 String,並對其套用 StanfordSentiment 的 GetSentiment 方法,然後返回 Double 數值資料類型。
目前,我有來自 flume 的關鍵字 Apple、Google、Tesla、Infosys、TCS、Oracle、Microsoft 和 Facebook 的資料。因此,我正在使用這些關鍵字建立一個 String 列表。針對這些公司的每一家,我正在執行以下操作。
操作流程:
首先,我正在建立一個我想用來儲存結果的 outPath。我正在資料集 ‘data’ 上建立一個暫存檢視表 ‘complete’。接下來,我正在從資料中提取 timestamp、partitionBy(以便在儲存結果時對資料進行分區)、text、main_text(用於正規表示式)和 followers。
我正在結果上建立一個暫存檢視表,並從中篩選出特定公司的資料。此外,我正在套用 Sentiment UDF,它會在 ‘seVal’ 欄位中返回情緒值。我正在將序列化資料保留在記憶體中並溢寫至磁碟。從這些資料中,我得到了 NetSentiment,即追蹤者數量與該推文情緒值的乘積。這有助於了解該推文可能產生的影響力。我也可以為此使用不同的公式。
我們正在將序列化資料保留在記憶體和磁碟中,因為我們希望儲存整個結果,而情緒分析是一項計算繁重的任務。如果我們不保留這些資料,而我們又計劃使用多個公式來計算 NetSentiment 或影響力,那麼在先前我們會多次使用 Sentiment 方法值的查詢中,它將會對該推文進行多次情緒分析。
資料分組:
現在,我正按 timestamp 和 partitionBy 欄位對數據進行分組,並以此分組計算 NetSentiment 的平均值。這給了我該公司在特定分鐘內的平均影響力(正面或負面)。因此,我將每個公司的結果寫入 outPath,並按 partitionBy 欄位進行分區。
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 |
package com.CloudSigma.Spark.TwitterSentiment; import java.util.Arrays; import java.util.List; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.storage.StorageLevel; import static com.CloudSigma.Spark.TwitterSentiment.StanfordSentiment.*; public class TwitterDataFlow { public static void main(String[] args) { if (args.length != 2) { System.out.println("參數數量不正確! \n"); System.out.println( "用法:輸入 輸出 \n " + "輸入:需要讀取分區數據的位置" + "輸出:需要存儲最終結果的位置"); System.exit(1); } SparkSession spark = SparkSession.builder().appName("Sentiment Analyzer").getOrCreate(); spark.sparkContext().hadoopConfiguration().set("mapreduce.input.fileinputformat.input.dir.recursive", "true"); String inputPath = args[0] + "/*/*"; Dataset data = spark.read().json(inputPath); spark.sqlContext().udf().register("Sentiment", (String s) -> GetSentiment(s), DataTypes.DoubleType); List list = Arrays.asList("apple", "google", "tesla", "infosys", "tcs", "oracle", "microsoft", "facebook"); String query; String outPath; Dataset result; for (String company : list) { outPath = args[1] + "/" + company; data.createOrReplaceTempView("complete"); // tmp1 提取 TimeStamp、partitionBy (Date)、推文文字、小寫的推文文字 // 以及發推文用戶的 followers_count Dataset tmp1 = spark.sql( "select concat(substr(created_at,5,6), substr(created_at,26,5),' ',substr(created_at,12,6),'00') as timestamp,substr(created_at,5,6) as partitionBy,text,lower(text) as main_text,user.followers_count as followers from complete"); tmp1.createOrReplaceTempView("tmp"); // 過濾其中含有特定公司名稱的推文 Dataset tmp2 = spark.sql("select * from tmp where main_text regexp '(" + company + ")'"); // 建立一個名為 twitter 的視圖 tmp2.createOrReplaceTempView("twitter"); // tmp3 包含所有選取的資料以及推文的 // 情緒值 (Sentiment value) Dataset tmp3 = spark.sql("select *, Sentiment(text) as seVal from twitter"); tmp3.persist(StorageLevel.MEMORY_AND_DISK()); // 建立另一個視圖 tmp3.createOrReplaceTempView("dataSe"); Dataset net = spark.sql( "select *,followers*seVal as NetSentiment from dataSe"); // 建立最終視圖以儲存資料 net.createOrReplaceTempView("final"); // 透過對資料進行分組,計算每分鐘的平均情緒值 query = "select timestamp,partitionBy,AVG(NetSentiment) from final group by timestamp,partitionBy"; // 將結果儲存在 result 資料集中 result = spark.sql(query); // 將結果寫入磁碟 result.write().partitionBy("partitionBy").json(outPath); } } } |
此外,在程式碼完成後,我會匯出一個包含所有依賴項的可執行 jar,並將其複製到我想執行此作業的伺服器上。接下來,我可以使用以下命令來提交它:
|
1 |
spark-submit --master yarn --deploy-mode cluster SentimentAnalyzer.jar /flume/Twitter/PublicStream/ /flume/output |
其中 (來源):
--master: 該 master URL 叢集(例如spark://23.195.26.187:7077)
--deploy-mode: 是否將您的驅動程式部署在工作節點 (cluster) 上,或是作為外部用戶端 (client) 本地部署(預設值:client)application-jar: 包含您的應用程式和所有依賴項的打包 jar 路徑。請注意,該 URL 在您的叢集內部必須是全域可見的,例如hdfs://路徑或存在於所有節點上的file://路徑。application-arguments: 傳遞給主類別主方法的參數(如果有的話)。在這裡是輸入和輸出路徑
最終步驟:
然後,我們將從輸出路徑中獲取使用 Spark 進行情緒分析的結果。例如,這可能是 apple 的結果:
{“timestamp”:”Apr 30 2018 20:31:00″,”avg(NetSentiment)”:-3678.768518518518}
{“timestamp”:”Apr 30 2018 20:32:00″,”avg(NetSentiment)”:-883.002824858757}
我將此應用程式部署在 CloudSigma 上,使用 5 個節點的 HDP 叢集。具體而言,每個節點的配置如下:
256 GB SSD
16 GB RAM
20 GHz CPU
總而言之,我能夠在約 19 小時內使用 Spark 獲得情緒分析的結果。此外,我還在超過 80 GB 的資料集上進行了比該程式更進階的計算。
程式碼可以在 GITHUB.
留言
目前尚無留言。成為第一個留言的人吧。