前回の投稿において、私はApache Flumeを使用したリアルタイムのTwitterデータの抽出方法を考案しました。現在、Twitterから大量のデータを取得しています。そのため、それを分析して何らかのトレンドを見つけたいと考えています。Twitterデータの感情分析を行うために、別のビッグデータツールであるApache Spark.
を使用します。Hortonworks, によると、“Apache Sparkは、データセットへの高速な反復アクセスを必要とするストリーミング、機械学習、またはSQLワークロードをデータワーカーが効率的に実行できるようにする、エレガントで表現力豊かな開発APIを備えた高速なインメモリデータ処理エンジンです。SparkがApache Hadoop YARN上で動作することで、世界中の開発者がSparkのパワーを活用し、インサイトを導き出し、Hadoop内の単一の共有データセット内でデータサイエンスのワークロードを充実させるアプリケーションを作成できるようになります。”
SparkプログラムはJAVA、Scala、Python、またはRで記述できます。今回は、JAVAとMavenを併用します。さらに、SparkはHDPとClouderaディストリビューションの両方に付属しています。現在使用されているバージョンはSpark 2です。
Sparkで感情分析を実行するために、新しいMavenプロジェクトを作成します。名前は『Twitter Sentiment Analyzer’』とします。次に、必要なすべてのメソッドを実装するクラス、“TwitterDataFlow.java”を作成します。
Language Tool: スペル修正ツール
当初、POCにおいて、ツイートのスペルが間違っていると感情分析の結果に悪影響が及ぶことがわかりました。そのため、SpellCheckerを導入します。これにより、感情分析に使用する前にツイートのスペルを修正することができます。
これは、‘CorrectSpell’という名前 of 新しい静的メソッドを作成することで行います。さらに、スペルをチェックして修正するためにLanguageToolを使用します。
According to LanguageTool’s GIT, によると、“LanguageToolは、英語、フランス語、ドイツ語、ポーランド語、ロシア語、およびその他20以上の言語に対応したオープンソースの校正ソフトウェアです。単純なスペルチェッカーでは検出できない多くのエラーを見つけます。”
次に、pom.xmlにlanguage toolの依存関係を追加します:
<dependency>
<groupId>org.languagetool</groupId>
<artifactId>language-en</artifactId>
<version>4.0</version>
</dependency>
その後、JLanguageToolクラスの静的クラスレベル変数langToolを定義します。そして、AmericanEnglishクラス of オブジェクトでlangToolを初期化します。
次に、入力としてString text(通常のテキスト)、戻り値の型もString(正しいスペルのテキスト)とするSpellCheckerという名前のメソッドをコーディングします。未チェックのテキストをパラメータとしてJLanguageToolのcheckメソッドを使用します。次に、RuleMatchのリストが返されます。JLanguageToolのJavaドキュメントによると、RuleMatchクラスは“テキストに一致するエラー・ルールと、その一致した位置に関する情報”
これに続いて、String型の‘result’、integer型の‘lastPos’、String型の‘tmp’という3つの変数を定義します。さらに、各RuleMatchについて、ツールから最初に提案されたスペルを使用して文を再構築します。それに伴い、必要に応じて適切なtry-catchブロックを追加しました。
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 |
package com.CloudSigma.Spark.TwitterSentiment; import java.util.List; import org.languagetool.JLanguageTool; import org.languagetool.language.AmericanEnglish; import org.languagetool.rules.RuleMatch; public class LanguageCheck { static JLanguageTool langTool = new JLanguageTool(new AmericanEnglish()); public static String CorrectSpell(String text) { String query = text; try { List matches = langTool.check(query); String result = ""; int lastPos = 0; String tmp = ""; for (RuleMatch ma : matches) { try { tmp = ma.getSuggestedReplacements().get(0); result += query.substring(lastPos, ma.getFromPos()); result += tmp; lastPos = ma.getToPos(); } catch (Exception e) { return text; } } if (lastPos < query.length()) { result += query.substring(lastPos, query.length()); } return result; } catch (Exception e) { return text; } } } |
感情分析器: Stanford CoreNLP
Sparkを使用した感情分析の次のステップは、テキストから感情を検出することです。これを行うために、StanfordのCore NLPライブラリを使用して感情値を取得しています。次に、テキスト内の感情を検出するためにこのライブラリを実装する「StanfordSentiment」という名前のクラスを作成します。
そのためには、pom.xmlファイルに以下の依存関係を追加します:
<!– これはstanford Core NLPライブラリです –>
<dependency>
<groupId>edu.stanford.nlp</groupId>
<artifactId>stanford-corenlp</artifactId>
<version>3.8.0</version>
</dependency>
<!– これはstanford Core NLPのモデルファイルです –>
<dependency>
<groupId>edu.stanford.nlp</groupId>
<artifactId>stanford-corenlp</artifactId>
<version>3.8.0</version>
<classifier>models</classifier>
</dependency>
Stanford Core NLPのパイプラインのプロパティを定義する静的オブジェクト変数、‘props’を作成しています。できるだけ軽量にするために、最小限のプロパティを選択しました。その後、アノテーターをtokenize、ssplit、pos、parse、sentimentに設定しています。StanfordCoreNLPクラスの別の静的オブジェクト変数、‘pipeline’を作成します。最後に、‘props’プロパティを使用してパイプラインを初期化します。
GetSentiment:
入力をString、出力をDoubleとするGetSentimentメソッドを作成しました。LanguageCheck.javaファイルで作成したCorrectSpellメソッドを使用しています。LanguageCheckオブジェクトのCorrectSpellメソッドは、入力されたツイートの正しいスペルを返します。この修正されたテキストを使用して、StanfordCoreNLPのannotateメソッドを使用します。このライブラリによって提供される感情値は以下の通りです:
0 => 非常にネガティブ
1 => ネガティブ
2 => ニュートラル
3 => ポジティブ
4 => 非常にポジティブ
文ごとに結果から2を引くことで、以下の新しい感情カテゴリを取得しています:
-2 => 非常にネガティブ
-1 => ネガティブ
0 => ニュートラル
1 => ポジティブ
2 => 非常にポジティブ
各ツイートの感情の結果を、ツイートの各文の感情の平均として返しています。ツイートは構造化された形式で書かれていません。そのため、ツイートの特定の行に他の行よりも高い重みを割り当てることはできません。したがって、ツイートの各行は等しい重要性を持つと仮定しています。ツイートの最終的な感情値を持つDouble型の変数、‘total’を返しています。
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 |
パッケージ com.CloudSigma.Spark.TwitterSentiment; import java.util.List; import java.util.Properties; import edu.stanford.nlp.ling.CoreAnnotations.SentencesAnnotation; import edu.stanford.nlp.neural.rnn.RNNCoreAnnotations; import edu.stanford.nlp.pipeline.Annotation; import edu.stanford.nlp.pipeline.StanfordCoreNLP; import edu.stanford.nlp.sentiment.SentimentCoreAnnotations; import edu.stanford.nlp.trees.Tree; import edu.stanford.nlp.util.CoreMap; import static com.CloudSigma.Spark.TwitterSentiment.LanguageCheck.*; public class StanfordSentiment { static Properties props = new Properties(); static { props.setProperty("annotators", "tokenize,ssplit,pos,parse,sentiment");} static StanfordCoreNLP pipeline = new StanfordCoreNLP(props); public static Double GetSentiment(String text) { String checkedText = CorrectSpell(text); Annotation document = new Annotation(checkedText); pipeline.annotate(document); List sentences = document.get(SentencesAnnotation.class); Double sum = 0.0; for (CoreMap sentence : sentences) { Tree tree = sentence.get(SentimentCoreAnnotations.SentimentAnnotatedTree.class); int sentiment = RNNCoreAnnotations.getPredictedClass(tree); int scaled = sentiment - 2; sum = sum + scaled; } Double total = sum / sentences.size(); System.out.println("ツイート本文: " + checkedText); System.out.println("感情値: " + total); return total; } } |
プログラムの開始:
これら2つのクラスが完成したので、これらを使用することに進みます。そのため、新しいクラス「“TwitterDataFlow.java”」を作成します。まず、渡された入力引数の数が正確に2である場合にのみプログラムを実行する条件チェックを記述します。引数の数が2に等しくない場合、不適切な使用方法のメッセージを出力し、終了ステータス1で終了します。
アプリ名を「Sentiment Analyzer」としてSparkSessionを構築しています。SparkコンテキストのHadoop設定のプロパティ「“mapreduce input fileinputformat input dir recursive”」をtrueに設定しています。これにより、フォルダから再帰的にファイルを取得できるようになります。Stringクラスの変数「‘inputPath’」を作成し、そこに入力引数と、Flumeによって保存されたパーティション分割データを読み込むための「‘/*/*’」を設定しています。Dataset<Row>「‘data’」にFlumeのJSONデータを読み込んでいます。
次に、Spark SQL Contextに「‘Sentiment’」という名前のUDF(ユーザー定義関数)を登録します。これはStringを受け取り、その上でStanfordSentimentのGetSentimentメソッドを適用し、Double値のデータ型を返します。
現在、FlumeからApple、Google、Tesla、Infosys、TCS、Oracle、Microsoft、Facebookのキーワードのデータがあります。そのため、これらのキーワードを含むStringのリストを作成しています。これらの各企業について、以下の操作を実行しています。
操作フロー:
まず、結果を保存したいoutPathを作成します。データセット「‘data’」の上に一時ビュー「‘complete’」を作成します。次に、データからtimestamp、partitionBy(結果を保存する際にデータをパーティション分割するため)、text、main_text(正規表現に使用するため)、followersを抽出します。
結果の上に一時ビューを作成し、そこから特定の企業のデータをフィルタリングしています。また、感情分析UDFを適用しており、これにより「‘seVal’」列に感情値が返されます。シリアル化されたデータをメモリおよびディスクへのスピルとして永続化しています。このデータから、フォロワー数とそのツイートの感情値の積であるNetSentimentを取得しています。これにより、そのツイートが持ち得る影響力を知ることができます。これには異なる数式を使用することもできます。
感情分析は計算負荷の高いタスクであるため、結果全体を保存したいために、シリアル化されたデータをメモリとディスクに永続化しています。これを永続化せず、NetSentimentや影響力を計算するために複数の数式を使用する予定がある場合、前のクエリでSentimentメソッドの値を複数回使用することになり、ツイートの感情分析が複数回実行されることになります。
データグループ化:
現在、データをtimestampとpartitionBy列でグループ化し、このグループ化でNetSentimentを平均化しています。これにより、特定の1分間における企業の平均的な影響力(ポジティブまたはネガティブ)が得られます。その結果、各企業の結果をpartitionBy列でパーティショニングしてoutPathに書き込んでいます。
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 |
package com.CloudSigma.Spark.TwitterSentiment; import java.util.Arrays; import java.util.List; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.storage.StorageLevel; import static com.CloudSigma.Spark.TwitterSentiment.StanfordSentiment.*; public class TwitterDataFlow { public static void main(String[] args) { if (args.length != 2) { System.out.println("引数の数が正しくありません! \n"); System.out.println( "使用法: Input Output \n " + "Input: パーティション分割されたデータの読み込み元となる場所" + "Output: 最終結果の保存先"); System.exit(1); } SparkSession spark = SparkSession.builder().appName("Sentiment Analyzer").getOrCreate(); spark.sparkContext().hadoopConfiguration().set("mapreduce.input.fileinputformat.input.dir.recursive", "true"); String inputPath = args[0] + "/*/*"; Dataset data = spark.read().json(inputPath); spark.sqlContext().udf().register("Sentiment", (String s) -> GetSentiment(s), DataTypes.DoubleType); List list = Arrays.asList("apple", "google", "tesla", "infosys", "tcs", "oracle", "microsoft", "facebook"); String query; String outPath; Dataset result; for (String company : list) { outPath = args[1] + "/" + company; data.createOrReplaceTempView("complete"); // tmp1はTimeStamp、partitionBy (Date)、ツイート本文、小文字のツイート本文、 // およびツイートしたユーザーのfollowers_countを抽出します Dataset tmp1 = spark.sql( "select concat(substr(created_at,5,6), substr(created_at,26,5),' ',substr(created_at,12,6),'00') as timestamp,substr(created_at,5,6) as partitionBy,text,lower(text) as main_text,user.followers_count as followers from complete"); tmp1.createOrReplaceTempView("tmp"); // 特定の会社名が含まれるツイートをフィルタリングします Dataset tmp2 = spark.sql("select * from tmp where main_text regexp '(" + company + ")'"); // twitterという名前のビューを作成します tmp2.createOrReplaceTempView("twitter"); // tmp3には、選択されたすべてのデータとツイートの // 感情値(Sentiment)が含まれます Dataset tmp3 = spark.sql("select *, Sentiment(text) as seVal from twitter"); tmp3.persist(StorageLevel.MEMORY_AND_DISK()); // 別のビューを作成します tmp3.createOrReplaceTempView("dataSe"); Dataset net = spark.sql( "select *,followers*seVal as NetSentiment from dataSe"); // データを保存するための最終ビューを作成します net.createOrReplaceTempView("final"); // データを分単位でグループ化し、感情値を平均化します query = "select timestamp,partitionBy,AVG(NetSentiment) from final group by timestamp,partitionBy"; // 結果をresultデータセットに保存します result = spark.sql(query); // 結果をディスクに書き込みます result.write().partitionBy("partitionBy").json(outPath); } } } |
また、コードの完了後、すべての依存関係が含まれる実行可能なjarをエクスポートし、このジョブを実行したいサーバーにコピーします。次に、以下のコマンドを使用して送信できます。
|
1 |
spark-submit --master yarn --deploy-mode cluster SentimentAnalyzer.jar /flume/Twitter/PublicStream/ /flume/output |
ここで (ソース):
--master: クラスターの マスターURL (例:spark://23.195.26.187:7077)
--deploy-mode: ドライバーをワーカーノードにデプロイするか(cluster)、またはローカルに外部クライアントとしてデプロイするか(client)(デフォルト:client)application-jar: アプリケーションとすべての依存関係を含むバンドルされたjarへのパス。URLはクラスターの内部からグローバルに参照可能である必要があることに注意してください。例えば、すべてのノードに存在するhdfs://パスやfile://パスなどです。application-arguments: メインクラスのメインメソッドに渡される引数(ある場合)。ここでは、入力パスと出力パスです。
最終ステップ:
その後、出力パスからSparkを使用した感情分析の結果を取得します。例えば、以下はappleの考えられる結果です:
{“timestamp”:”Apr 30 2018 20:31:00″,”avg(NetSentiment)”:-3678.768518518518}
{“timestamp”:”Apr 30 2018 20:32:00″,”avg(NetSentiment)”:-883.002824858757}
私はこのアプリケーションを5ノードのHDPクラスター構成のCloudSigmaにデプロイしました。具体的には、各ノードの構成は以下の通りです:
256 GB SSD
16 GB RAM
20 GHz CPU
全体として、Sparkを使用した感情分析の結果を約19時間で取得することができました。さらに、80 GB以上のデータセットに対して、プログラムよりも高度な計算を含めました。
コードは以下で確認できます: GITHUB.
コメント
コメントはまだありません。最初のコメントを投稿しましょう。