在我们的前一篇文章中,我研究出了一种利用Apache Flume提取实时 Twitter 数据的方法。目前,我已经从 Twitter 获取了大量数据。因此,我想对其进行分析并从中发现一些趋势。为了对 Twitter 数据进行情感分析,我将使用另一个大数据工具,Apache Spark.
根据Hortonworks, “Apache Spark 是一种快速的内存数据处理引擎,具有优雅且富有表现力的开发 API,可让数据工作者高效地执行需要对数据集进行快速迭代访问的流式处理、机器学习或 SQL 工作负载。随着 Spark 在 Apache Hadoop YARN 上运行,各地的开发人员现在都可以创建应用程序来利用 Spark 的强大功能、获取洞察力,并在 Hadoop 的单个共享数据集中丰富其数据科学工作负载。”
Spark 程序可以用 JAVA、Scala、Python 或 R 编写。在本例中,我们将使用 JAVA 以及Maven。此外,Spark 同时支持 HDP 和 Cloudera 发行版。目前使用的是 Spark 2 版本。
为了使用 Spark 进行情感分析,我正在创建一个新的 Maven 项目。我将其命名为 Twitter Sentiment Analyzer’。接下来,我将创建一个类 “TwitterDataFlow.java”,在其中实现所有必需的方法。
语言工具:拼写纠错器
最初,在概念验证(POC)中,我发现如果推文中的拼写错误,情感分析的结果会受到不利影响。因此,我引入了一个拼写检查器(SpellChecker)。它将帮助我们在将推文用于情感分析之前纠正其拼写。
我将通过创建一个名为 ‘CorrectSpell’ 的新静态方法来实现这一点。此外,我将使用 LanguageTool 来检查拼写并进行纠正。
根据LanguageTool’s GIT, “LanguageTool 是一款开源的校对软件,支持英语、法语、德语、波兰语、俄语以及20 多种其他语言。它能发现许多简单拼写检查器无法检测到的错误。”
接下来,我在 pom.xml 中添加语言工具的依赖项:
<dependency>
<groupId>org.languagetool</groupId>
<artifactId>language-en</artifactId>
<version>4.0</version>
</dependency>
之后,我定义了一个 JLanguageTool 类的静态类级变量 langTool。然后,我用 AmericanEnglish 类的对象初始化 langTool。
接下来,我编写名为 SpellChecker 的方法,其输入为 String text(普通文本),返回类型也为 String(拼写纠正后的文本)。我使用 JLanguageTool 的 check 方法,参数为未检查的文本。接着,它返回一个 RuleMatch 列表。根据 JLanguageTool Java 文档,RuleMatch 类提供了“有关与文本匹配的错误规则以及匹配位置的信息。”
在此之后,我定义了三个变量:String 类型的 ‘result’、integer 类型的 ‘lastPos’ 以及 String 类型的 ‘tmp’。此外,对于每个 RuleMatch,我都会使用该工具推荐的第一个拼写来重新构建句子。至此,我已在需要的地方添加了必要的 try-catch 块。
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 |
package com.CloudSigma.Spark.TwitterSentiment; import java.util.List; import org.languagetool.JLanguageTool; import org.languagetool.language.AmericanEnglish; import org.languagetool.rules.RuleMatch; public class LanguageCheck { static JLanguageTool langTool = new JLanguageTool(new AmericanEnglish()); public static String CorrectSpell(String text) { String query = text; try { List matches = langTool.check(query); String result = ""; int lastPos = 0; String tmp = ""; for (RuleMatch ma : matches) { try { tmp = ma.getSuggestedReplacements().get(0); result += query.substring(lastPos, ma.getFromPos()); result += tmp; lastPos = ma.getToPos(); } catch (Exception e) { return text; } } if (lastPos < query.length()) { result += query.substring(lastPos, query.length()); } return result; } catch (Exception e) { return text; } } } |
情感分析器:Stanford CoreNLP
使用 Spark 进行情感分析的下一步是从文本中寻找情感。为了做到这一点,我正在使用 Stanford’s Core NLP 库来寻找情感值。然后,我创建了一个名为 ‘StanfordSentiment’ 的类,我将在其中实现该库以寻找我们文本中的情感。
为此,我正在 pom.xml 文件中添加以下依赖项:
<!– This is stanford Core NLP Library –>
<dependency>
<groupId>edu.stanford.nlp</groupId>
<artifactId>stanford-corenlp</artifactId>
<version>3.8.0</version>
</dependency>
<!– This is stanford Core NLP’s models file –>
<dependency>
<groupId>edu.stanford.nlp</groupId>
<artifactId>stanford-corenlp</artifactId>
<version>3.8.0</version>
<classifier>models</classifier>
</dependency>
我正在创建一个静态对象变量 ‘props’,它定义了 Stanford Core NLP’s 管道的属性。我选择了最少的属性以使其尽可能轻量。之后,我将标注器(annotators)设置为 tokenize, ssplit, pos, parse, sentiment。我正在创建另一个 StanfordCoreNLP 类的静态对象变量 ‘pipeline’。最后,我使用 ‘props’ 属性初始化该管道。
GetSentiment:
我创建了一个名为 GetSentiment 的方法,其输入为 String,输出为 Double。我正在使用在 LanguageCheck.java 文件中创建的 CorrectSpell 方法。LanguageCheck 对象的 CorrectSpell 方法会向我返回输入的推文的正确拼写。我对这个纠正后的文本使用 StanfordCoreNLP 的 annotate 方法。该库提供的情感值如下:
0 => 非常消极
1 => 消极
2 => 中性
3 => 积极
4 => 非常积极
我将每句话的结果减去 2,以获得以下新的情感类别:
-2 => 非常消极
-1 => 消极
0 => 中性
1 => 积极
2 => 非常积极
我将每条推文的情感结果作为该推文中每句话情感的平均值返回。推文不是以任何结构化格式编写的。因此,我无法为推文的任何特定行分配比其他行更高的权重。因此,我假设推文中的每一行都具有同等的重要性。我返回 Double 类型的变量 ‘total’,其中包含推文的最终情感值。
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 |
包 com.CloudSigma.Spark.TwitterSentiment; import java.util.List; import java.util.Properties; import edu.stanford.nlp.ling.CoreAnnotations.SentencesAnnotation; import edu.stanford.nlp.neural.rnn.RNNCoreAnnotations; import edu.stanford.nlp.pipeline.Annotation; import edu.stanford.nlp.pipeline.StanfordCoreNLP; import edu.stanford.nlp.sentiment.SentimentCoreAnnotations; import edu.stanford.nlp.trees.Tree; import edu.stanford.nlp.util.CoreMap; import static com.CloudSigma.Spark.TwitterSentiment.LanguageCheck.*; public class StanfordSentiment { static Properties props = new Properties(); static { props.setProperty("annotators", "tokenize,ssplit,pos,parse,sentiment");} static StanfordCoreNLP pipeline = new StanfordCoreNLP(props); public static Double GetSentiment(String text) { String checkedText = CorrectSpell(text); Annotation document = new Annotation(checkedText); pipeline.annotate(document); List sentences = document.get(SentencesAnnotation.class); Double sum = 0.0; for (CoreMap sentence : sentences) { Tree tree = sentence.get(SentimentCoreAnnotations.SentimentAnnotatedTree.class); int sentiment = RNNCoreAnnotations.getPredictedClass(tree); int scaled = sentiment - 2; sum = sum + scaled; } Double total = sum / sentences.size(); System.out.println("推文文本:" + checkedText); System.out.println("情感值:" + total); return total; } } |
程序启动:
现在这两个类已经完成了,我们将继续使用它们。因此,我正在创建一个新类 “TwitterDataFlow.java”。首先,我正在编写一个条件检查,只有在传入的输入参数数量正好为 2 时才会运行该程序。如果参数数量不等于 2,它会打印错误的使用信息,并以退出状态 1 退出。
我正在构建一个应用名称为 Sentiment Analyzer 的 SparkSession。我正在将 spark context’s hadoop configuration’s 属性 “mapreduce input fileinputformat input dir recursive” 设置为 true。这将允许我从文件夹中递归检索文件。我正在创建一个 String 类的变量 ‘inputPath’,在其中设置输入参数以及 ‘/*/*’,这将允许我读取由 Flume 存储的分区数据。我正在将 Flume 的 json 数据读取到 Dataset<Row> ‘data’ 中。
然后,我正在向 Spark SQL Context 注册一个名为 ‘Sentiment’ 的 UDF(用户自定义函数),它接收一个 String 并对其应用 StanfordSentiment’s GetSentiment 方法,然后返回 Double 值数据类型。
目前,我拥有来自 flume 的关键词 Apple、Google、Tesla、Infosys、TCS、Oracle、Microsoft 和 Facebook 的数据。因此,我正在使用这些关键词创建一个 String 列表。对于这些公司中的每一家,我都在运行以下操作。
操作流程:
首先,我正在创建一个用于保存结果的 outPath。我正在数据集 ‘data’ 上创建一个临时视图 ‘complete’。接下来,我正在从数据中提取 timestamp、partitionBy(以便在存储结果时对数据进行分区)、text、main_text(用于正则表达式)和 followers。
我正在对结果创建一个临时视图,并从中过滤特定公司的数据。此外,我正在应用 Sentiment UDF,它会在 ‘seVal’ 列中返回情感值。我正在将序列化数据持久化到内存中并溢写到磁盘。从这些数据中,我得到了 NetSentiment,即关注者数量与该推文情感值的乘积。这有助于了解该推文可能产生的影响。我可以为此使用不同的公式。
我们将序列化数据持久化在内存和磁盘中,因为我们希望存储整个结果,因为情感分析是一项计算密集型任务。如果我们不持久化这些数据,并且计划使用多个公式来计算 NetSentiment 或影响力,那么在之前的查询中,如果我们多次使用 Sentiment 方法的值,它将对推文进行多次情感分析。
数据分组:
现在,我按时间戳和 partitionBy 列对数据进行分组,并在此分组下对 NetSentiment 求平均值。这为我提供了该公司在特定分钟内的平均影响力(正面或负面)。因此,我将每个公司的结果写入 outPath,并按 partitionBy 列对其进行分区。
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 |
package com.CloudSigma.Spark.TwitterSentiment; import java.util.Arrays; import java.util.List; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.storage.StorageLevel; import static com.CloudSigma.Spark.TwitterSentiment.StanfordSentiment.*; public class TwitterDataFlow { public static void main(String[] args) { if (args.length != 2) { System.out.println("参数数量不正确! \n"); System.out.println( "用法:输入 输出 \n " + "输入:需要从中读取分区数据的位置" + "输出:需要存储最终结果的位置"); System.exit(1); } SparkSession spark = SparkSession.builder().appName("Sentiment Analyzer").getOrCreate(); spark.sparkContext().hadoopConfiguration().set("mapreduce.input.fileinputformat.input.dir.recursive", "true"); String inputPath = args[0] + "/*/*"; Dataset data = spark.read().json(inputPath); spark.sqlContext().udf().register("Sentiment", (String s) -> GetSentiment(s), DataTypes.DoubleType); List list = Arrays.asList("apple", "google", "tesla", "infosys", "tcs", "oracle", "microsoft", "facebook"); String query; String outPath; Dataset result; for (String company : list) { outPath = args[1] + "/" + company; data.createOrReplaceTempView("complete"); // tmp1 提取 TimeStamp、partitionBy (Date)、推文文本、小写 形式的推文文本以及发布推文用户的 followers_count Dataset tmp1 = spark.sql( "select concat(substr(created_at,5,6), substr(created_at,26,5),' ',substr(created_at,12,6),'00') as timestamp,substr(created_at,5,6) as partitionBy,text,lower(text) as main_text,user.followers_count as followers from complete"); tmp1.createOrReplaceTempView("tmp"); // 过滤其中包含特定公司名称的推文 Dataset tmp2 = spark.sql("select * from tmp where main_text regexp '(" + company + ")'"); // 创建一个名为 twitter 的视图 tmp2.createOrReplaceTempView("twitter"); // tmp3 包含整个选定的数据以及推文的 情感值 Dataset tmp3 = spark.sql("select *, Sentiment(text) as seVal from twitter"); tmp3.persist(StorageLevel.MEMORY_AND_DISK()); // 创建另一个视图 tmp3.createOrReplaceTempView("dataSe"); Dataset net = spark.sql( "select *,followers*seVal as NetSentiment from dataSe"); // 创建最终视图以保存数据 net.createOrReplaceTempView("final"); // 通过对数据进行分组,计算每分钟的平均情感值 query = "select timestamp,partitionBy,AVG(NetSentiment) from final group by timestamp,partitionBy"; // 将结果保存到 result 数据集中 result = spark.sql(query); // 将结果写入磁盘 result.write().partitionBy("partitionBy").json(outPath); } } } |
此外,在代码完成后,我导出一个包含所有依赖项的可运行 jar,并将其复制到我想运行此作业的服务器上。接下来,我可以使用以下命令提交它,
|
1 |
spark-submit --master yarn --deploy-mode cluster SentimentAnalyzer.jar /flume/Twitter/PublicStream/ /flume/output |
其中 (源):
--master: master URL,用于集群(例如spark://23.195.26.187:7077)
--deploy-mode: 是否在工作节点(cluster)上部署驱动程序,还是在本地作为外部客户端(client)部署(默认:client)application-jar: 包含您的应用程序和所有依赖项的打包 jar 的路径。请注意,该 URL 在集群内部必须是全局可见的,例如,存在于所有节点上的hdfs://路径或file://路径。application-arguments: 传递给主类主方法的参数(如果有)。这里是输入和输出路径
最终步骤:
然后,我们将从输出路径中获取使用 Spark 进行情感分析的结果。例如,以下是 apple 的一个可能结果:
{“timestamp”:”Apr 30 2018 20:31:00″,”avg(NetSentiment)”:-3678.768518518518}
{“timestamp”:”Apr 30 2018 20:32:00″,”avg(NetSentiment)”:-883.002824858757}
我将此应用程序部署在具有 5 个节点的 HDP 集群的 CloudSigma 上。具体而言,每个节点具有以下配置:
256 GB SSD
16 GB RAM
20 GHz CPU
总而言之,我能够在约 19 小时内通过 Spark 情感分析获得结果。此外,我在超过 80 GB 的数据集上进行了比该程序更高级的计算。
代码可以在 GITHUB.
评论
暂无评论。发表第一条评论吧。