우리의 이전 포스트에서, 저는 Apache Flume을 사용하여 실시간 트위터 데이터를 추출하는 방법을 고안했습니다. 현재 저는 트위터로부터 많은 데이터를 확보했습니다. 따라서 이를 분석하고 여기서 몇 가지 트렌드를 찾아내고 싶습니다. 트위터 데이터의 감성 분석을 수행하기 위해, 또 다른 빅데이터 도구인 Apache Spark.
를 사용할 것입니다. Hortonworks, 에 따르면, “Apache Spark는 데이터 작업자가 스트리밍, 머신러닝 또는 SQL 워크로드를 효율적으로 실행할 수 있도록 우아하고 표현력이 뛰어난 개발 API를 갖춘 빠른 인메모리 데이터 처리 엔진으로, 데이터 세트에 대한 빠른 반복 액세스가 필요합니다. Spark가 Apache Hadoop YARN에서 실행됨에 따라, 이제 어디서나 개발자는 Spark의 강력한 기능을 활용하고, 인사이트를 도출하며, Hadoop 내의 단일 공유 데이터 세트 내에서 데이터 과학 워크로드를 풍부하게 하는 애플리케이션을 만들 수 있습니다.”
Spark 프로그램은 JAVA, Scala, Python 또는 R로 작성할 수 있습니다. 이 경우, 우리는 Maven과 함께 JAVA를 사용할 것입니다. 또한 Spark는 HDP 및 Cloudera 배포판 모두와 함께 제공됩니다. 현재 사용되는 버전은 Spark 2입니다.
Spark로 감성 분석을 수행하기 위해 새로운 Maven 프로젝트를 생성하고 있습니다. 프로젝트 이름은 'Twitter Sentiment Analyzer’'로 지정하겠습니다. 다음으로, 필요한 모든 메서드를 구현할 “TwitterDataFlow.java” 클래스를 생성합니다.
Language Tool: 맞춤법 교정기
처음 POC를 진행할 때 트윗의 맞춤법이 틀리면 감성 분석 결과에 부정적인 영향을 미친다는 것을 발견했습니다. 따라서 SpellChecker를 도입하고자 합니다. 이는 감성 분석에 트윗을 사용하기 전에 트윗의 맞춤법을 교정하는 데 도움이 될 것입니다.
저는 ‘CorrectSpell’이라는 새로운 정적 메서드를 생성하여 이를 수행할 것입니다. 또한, 맞춤법을 검사하고 교정하기 위해 LanguageTool을 사용할 예정입니다.
According to LanguageTool’s GIT, 에 따르면, “LanguageTool은 영어, 프랑스어, 독일어, 폴란드어, 러시아어 및 20개 이상의 다른 언어를 위한 오픈 소스 교정 소프트웨어입니다. 이는 단순한 맞춤법 검사기가 감지할 수 없는 많은 오류를 찾아냅니다.”
다음으로, pom.xml에 언어 도구에 대한 의존성(dependency)을 추가합니다:
<dependency>
<groupId>org.languagetool</groupId>
<artifactId>language-en</artifactId>
<version>4.0</version>
</dependency>
그 후, JLanguageTool 클래스의 정적 클래스 수준 변수인 langTool을 정의합니다. 그런 다음, AmericanEnglish 클래스의 객체로 langTool을 초기화합니다.
다음으로, 입력값으로 String text(일반 텍스트)를 받고 반환 타입도 String(맞춤법이 교정된 텍스트)인 SpellChecker라는 메서드를 코딩합니다. 검사되지 않은 텍스트를 매개변수로 하여 JLanguageTool의 check 메서드를 사용합니다. 그러면 RuleMatch 목록이 반환됩니다. JLanguageTool Java Docs에 따르면, RuleMatch 클래스는 “텍스트와 일치하는 오류 규칙 및 일치 위치에 대한 정보”를 제공합니다.
이어서 String 타입의 ‘result’, integer 타입의 ‘lastPos’, String 타입의 ‘tmp’라는 세 가지 변수를 정의합니다. 또한, 각 RuleMatch에 대해 도구에서 제안한 첫 번째 맞춤법으로 문장을 재구성합니다. 이와 함께 필요한 곳마다 적절한 try-catch 블록을 추가했습니다.
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 |
package com.CloudSigma.Spark.TwitterSentiment; import java.util.List; import org.languagetool.JLanguageTool; import org.languagetool.language.AmericanEnglish; import org.languagetool.rules.RuleMatch; public class LanguageCheck { static JLanguageTool langTool = new JLanguageTool(new AmericanEnglish()); public static String CorrectSpell(String text) { String query = text; try { List matches = langTool.check(query); String result = ""; int lastPos = 0; String tmp = ""; for (RuleMatch ma : matches) { try { tmp = ma.getSuggestedReplacements().get(0); result += query.substring(lastPos, ma.getFromPos()); result += tmp; lastPos = ma.getToPos(); } catch (Exception e) { return text; } } if (lastPos < query.length()) { result += query.substring(lastPos, query.length()); } return result; } catch (Exception e) { return text; } } } |
감성 분석기: Stanford CoreNLP
Spark를 사용한 감성 분석의 다음 단계는 텍스트에서 감성을 찾는 것입니다. 이를 위해 Stanford’s Core NLP 라이브러리를 사용하여 감성 값을 찾고 있습니다. 그런 다음, 텍스트 내에서 감성을 찾기 위해 라이브러리를 구현할 ‘StanfordSentiment’라는 클래스를 생성합니다.
이를 위해 pom.xml 파일에 다음 의존성을 추가합니다:
<!– 이것은 stanford Core NLP 라이브러리입니다 –>
<dependency>
<groupId>edu.stanford.nlp</groupId>
<artifactId>stanford-corenlp</artifactId>
<version>3.8.0</version>
</dependency>
<!– 이것은 stanford Core NLP’s 모델 파일입니다 –>
<dependency>
<groupId>edu.stanford.nlp</groupId>
<artifactId>stanford-corenlp</artifactId>
<version>3.8.0</version>
<classifier>models</classifier>
</dependency>
Stanford Core NLP’s 파이프라인의 속성을 정의하는 정적 객체 변수 ‘props’를 생성합니다. 가능한 한 가볍게 만들기 위해 최소한의 속성만 선택했습니다. 그 후, annotators를 tokenize, ssplit, pos, parse, sentiment로 설정합니다. StanfordCoreNLP 클래스의 또 다른 정적 객체 변수인 ‘pipeline’을 생성합니다. 마지막으로 ‘props’ 속성으로 파이프라인을 초기화합니다.
GetSentiment:
입력은 String이고 출력은 Double인 GetSentiment 메서드를 생성했습니다. LanguageCheck.java 파일에서 생성한 CorrectSpell 메서드를 사용하고 있습니다. LanguageCheck 객체의 CorrectSpell 메서드는 입력된 트윗의 올바른 맞춤법을 반환합니다. 이 수정된 텍스트와 함께 StanfordCoreNLP의 annotate 메서드를 사용합니다. 이 라이브러리에서 제공하는 감성 값은 다음과 같습니다:
0 => 매우 부정적
1 => 부정적
2 => 중립적
3 => 긍정적
4 => 매우 긍정적
다음과 같은 새로운 감성 카테고리를 얻기 위해 문장당 결과에서 2를 뺍니다:
-2 => 매우 부정적
-1 => 부정적
0 => 중립적
1 => 긍정적
2 => 매우 긍정적
각 트윗의 감성 결과를 트윗 내 각 문장의 감성 평균으로 반환합니다. 트윗은 정형화된 형식으로 작성되지 않습니다. 따라서 트윗의 특정 줄에 다른 줄보다 더 높은 가중치를 부여할 수 없습니다. 그러므로 트윗의 각 줄이 동일한 중요성을 갖는다고 가정합니다. 트윗의 최종 감성 값을 가지는 Double 타입의 변수 ‘total’을 반환합니다.
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 |
패키지 com.CloudSigma.Spark.TwitterSentiment; import java.util.List; import java.util.Properties; import edu.stanford.nlp.ling.CoreAnnotations.SentencesAnnotation; import edu.stanford.nlp.neural.rnn.RNNCoreAnnotations; import edu.stanford.nlp.pipeline.Annotation; import edu.stanford.nlp.pipeline.StanfordCoreNLP; import edu.stanford.nlp.sentiment.SentimentCoreAnnotations; import edu.stanford.nlp.trees.Tree; import edu.stanford.nlp.util.CoreMap; import static com.CloudSigma.Spark.TwitterSentiment.LanguageCheck.*; public class StanfordSentiment { static Properties props = new Properties(); static { props.setProperty("annotators", "tokenize,ssplit,pos,parse,sentiment");} static StanfordCoreNLP pipeline = new StanfordCoreNLP(props); public static Double GetSentiment(String text) { String checkedText = CorrectSpell(text); Annotation document = new Annotation(checkedText); pipeline.annotate(document); List sentences = document.get(SentencesAnnotation.class); Double sum = 0.0; for (CoreMap sentence : sentences) { Tree tree = sentence.get(SentimentCoreAnnotations.SentimentAnnotatedTree.class); int sentiment = RNNCoreAnnotations.getPredictedClass(tree); int scaled = sentiment - 2; sum = sum + scaled; } Double total = sum / sentences.size(); System.out.println("트윗 텍스트: " + checkedText); System.out.println("감성 값: " + total); return total; } } |
프로그램 시작:
이제 이 두 클래스가 완료되었으므로, 이를 사용하기 위해 계속 진행하겠습니다. 따라서 새로운 클래스인 “TwitterDataFlow.java”를 생성합니다. 먼저, 전달된 입력 인수의 개수가 정확히 2개인 경우에만 프로그램을 실행하는 조건부 검사를 작성합니다. 인수의 개수가 2개가 아니면 잘못된 사용법 메시지를 출력하고 종료 상태 1로 종료합니다.
앱 이름을 Sentiment Analyzer로 하여 SparkSession을 빌드하고 있습니다. spark context의 hadoop configuration 속성인 “mapreduce input fileinputformat input dir recursive”를 true로 설정하고 있습니다. 이를 통해 폴더에서 재귀적으로 파일을 가져올 수 있습니다. String 클래스의 변수 ‘inputPath’를 생성하여 입력 인수와 Flume이 저장한 파티션된 데이터를 읽을 수 있도록 하는 ‘/*/*’를 설정하고 있습니다. Dataset<Row> ‘data’에서 Flume의 json 데이터를 읽고 있습니다.
그런 다음, String을 받아 StanfordSentiment의 GetSentiment 메서드를 적용하고 Double 값 데이터 타입을 반환하는 ‘Sentiment’라는 이름의 UDF(사용자 정의 함수)를 Spark SQL Context에 등록하고 있습니다.
현재 Flume으로부터 Apple, Google, Tesla, Infosys, TCS, Oracle, Microsoft, Facebook 키워드의 데이터를 가지고 있습니다. 따라서 이 키워드들로 String 리스트를 생성하고 있습니다. 각 회사에 대해 다음 작업을 실행하고 있습니다.
작업 흐름:
먼저, 결과를 저장할 outPath를 생성하고 있습니다. 데이터 세트 ‘data’ 위에 임시 뷰 ‘complete’를 생성하고 있습니다. 다음으로 데이터에서 timestamp, partitionBy(결과를 저장하는 동안 데이터를 파티션하기 위함), text, main_text(정규 표현식에 사용하기 위함), followers를 추출하고 있습니다.
결과 위에 임시 뷰를 생성하고 거기서 특정 회사의 데이터를 필터링하고 있습니다. 또한, 열 ‘seVal’에 감성 값을 반환하는 Sentiment UDF를 적용하고 있습니다. 직렬화된 데이터를 메모리와 디스크 스필로 유지(persist)하고 있습니다. 이 데이터로부터 팔로워 수와 해당 트윗의 감성 값의 곱인 NetSentiment를 얻고 있습니다. 이는 해당 트윗이 가질 수 있는 영향력을 파악하는 데 도움이 됩니다. 이에 대해 서로 다른 공식을 적용할 수 있습니다.
감성 분석은 계산 집약적인 작업이므로 전체 결과가 저장되기를 원하기 때문에 직렬화된 데이터를 메모리와 디스크에 유지(persist)하고 있습니다. 이를 유지하지 않고 NetSentiment나 영향력을 계산하기 위해 여러 공식을 사용할 계획이라면, Sentiment 메서드의 값을 여러 번 사용하는 이전 쿼리에서 트윗의 감성 분석을 여러 번 수행하게 됩니다.
데이터 그룹화:
이제 데이터를 timestamp 및 partitionBy 열로 그룹화하고 이 그룹화로 NetSentiment의 평균을 구합니다. 이를 통해 특정 분에 해당 회사의 긍정적 또는 부정적인 평균 영향력을 얻을 수 있습니다. 결과적으로 각 회사의 결과를 partitionBy 열로 분할하여 outPath에 기록합니다.
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 |
package com.CloudSigma.Spark.TwitterSentiment; import java.util.Arrays; import java.util.List; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.storage.StorageLevel; import static com.CloudSigma.Spark.TwitterSentiment.StanfordSentiment.*; public class TwitterDataFlow { public static void main(String[] args) { if (args.length != 2) { System.out.println("인수의 개수가 올바르지 않습니다! \n"); System.out.println( "사용법: Input Output \n " + "Input: 분할된 데이터를 읽어올 위치" + "Output: 최종 결과를 저장할 위치"); System.exit(1); } SparkSession spark = SparkSession.builder().appName("Sentiment Analyzer").getOrCreate(); spark.sparkContext().hadoopConfiguration().set("mapreduce.input.fileinputformat.input.dir.recursive", "true"); String inputPath = args[0] + "/*/*"; Dataset data = spark.read().json(inputPath); spark.sqlContext().udf().register("Sentiment", (String s) -> GetSentiment(s), DataTypes.DoubleType); List list = Arrays.asList("apple", "google", "tesla", "infosys", "tcs", "oracle", "microsoft", "facebook"); String query; String outPath; Dataset result; for (String company : list) { outPath = args[1] + "/" + company; data.createOrReplaceTempView("complete"); // tmp1은 TimeStamp, partitionBy (Date), 트윗 텍스트, 소문자로 변환된 트윗 텍스트 및 // 트윗을 작성한 사용자의 followers_count를 추출합니다. Dataset tmp1 = spark.sql( "select concat(substr(created_at,5,6), substr(created_at,26,5),' ',substr(created_at,12,6),'00') as timestamp,substr(created_at,5,6) as partitionBy,text,lower(text) as main_text,user.followers_count as followers from complete"); tmp1.createOrReplaceTempView("tmp"); // 특정 회사 이름이 포함된 트윗 필터링 Dataset tmp2 = spark.sql("select * from tmp where main_text regexp '(" + company + ")'"); // twitter라는 이름의 뷰 생성 tmp2.createOrReplaceTempView("twitter"); // tmp3에는 선택된 전체 데이터와 트윗의 // 감성(Sentiment) 값이 포함됩니다. Dataset tmp3 = spark.sql("select *, Sentiment(text) as seVal from twitter"); tmp3.persist(StorageLevel.MEMORY_AND_DISK()); // 또 다른 뷰 생성 tmp3.createOrReplaceTempView("dataSe"); Dataset net = spark.sql( "select *,followers*seVal as NetSentiment from dataSe"); // 데이터를 저장하기 위한 최종 뷰 생성 net.createOrReplaceTempView("final"); // 데이터를 분 단위로 그룹화하여 감성 값의 평균 계산 query = "select timestamp,partitionBy,AVG(NetSentiment) from final group by timestamp,partitionBy"; // 결과를 result 데이터 세트에 저장 result = spark.sql(query); // 결과를 디스크에 쓰기 result.write().partitionBy("partitionBy").json(outPath); } } } |
또한, 코드가 완료된 후 모든 종속성이 포함된 실행 가능한 jar를 내보내고 이 작업을 실행하려는 서버로 복사합니다. 다음으로, 아래 명령을 사용하여 제출할 수 있습니다.
|
1 |
spark-submit --master yarn --deploy-mode cluster SentimentAnalyzer.jar /flume/Twitter/PublicStream/ /flume/output |
여기서 (Source):
--master: 클러스터의 마스터 URL (예:spark://23.195.26.187:7077)
--deploy-mode: 드라이버를 워커 노드에 배포할지 (cluster) 아니면 로컬에 외부 클라이언트로 배포할지 (client) (기본값:client)application-jar: 애플리케이션과 모든 종속성을 포함하는 번들 jar의 경로입니다. URL은 클러스터 내부에서 전역적으로 표시되어야 합니다. 예를 들어, 모든 노드에 존재하는hdfs://경로 또는file://경로여야 합니다.application-arguments: 메인 클래스의 메인 메서드로 전달되는 인수(있는 경우)입니다. 여기서는 입력 및 출력 경로입니다.
Final Steps:
최종 단계:
{“timestamp”:”Apr 30 2018 20:31:00″,”avg(NetSentiment)”:-3678.768518518518}
{“timestamp”:”Apr 30 2018 20:32:00″,”avg(NetSentiment)”:-883.002824858757}
저는 이 애플리케이션을 5개 노드 HDP 클러스터가 있는 CloudSigma에 배포했습니다. 구체적으로 각 노드는 다음과 같은 구성을 가집니다:
256 GB SSD
16 GB RAM
20 GHz CPU
대체로 Spark를 사용한 감성 분석 결과를 약 19시간 만에 얻을 수 있었습니다. 또한, 80GB 이상의 데이터 세트에 대해 프로그램보다 더 고급 계산을 포함했습니다.
코드는 다음에서 찾을 수 있습니다: GITHUB.
댓글
아직 댓글이 없습니다. 첫 번째로 작성해 보세요.