W naszym poprzednim poście, opracowałem sposób na wyodrębnianie danych z Twittera w czasie rzeczywistym za pomocą Apache Flume. Obecnie mam dużo danych z Twittera. Dlatego chciałbym je przeanalizować i znaleźć w nich pewne trendy. Aby przeprowadzić analizę sentymentu danych z Twittera, zamierzam użyć innego narzędzia Big Data, Apache Spark.
Według Hortonworks, “Apache Spark to szybki silnik przetwarzania danych w pamięci z eleganckimi i ekspresyjnymi interfejsami API programowania, które pozwalają pracownikom zajmującym się danymi na wydajne wykonywanie zadań strumieniowych, uczenia maszynowego lub zapytań SQL, które wymagają szybkiego, iteracyjnego dostępu do zbiorów danych. Dzięki uruchomieniu Spark na Apache Hadoop YARN, programiści na całym świecie mogą teraz tworzyć aplikacje wykorzystujące moc Spark, czerpać wnioski i wzbogacać swoje zadania z zakresu data science w ramach jednego, współdzielonego zbioru danych w Hadoop.”
Program w Sparku może być napisany w języku JAVA, Scala, Python lub R. W tym przypadku będziemy używać języka JAVA wraz z Maven. Ponadto Spark jest dostarczany zarówno z dystrybucją HDP, jak i Cloudera. Obecnie używaną wersją jest Spark 2.
Aby przeprowadzić analizę sentymentu za pomocą Sparka, tworzę nowy projekt Maven. Nazywam go Twitter Sentiment Analyzer’. Następnie tworzę klasę “TwitterDataFlow.java”, w której zaimplementuję wszystkie wymagane metody.
Narzędzie językowe: Korektor pisowni
Początkowo, w ramach POC, zauważyłem, że jeśli pisownia w tweetach jest błędna, wpływa to negatywnie na wyniki analizy sentymentu. Dlatego wprowadzam moduł sprawdzania pisowni (SpellChecker). Pomoże nam to poprawić pisownię tweetów przed użyciem ich do analizy sentymentu.
Zamierzam to zrobić poprzez utworzenie nowej metody statycznej o nazwie ‘CorrectSpell’. Ponadto zamierzam użyć narzędzia LanguageTool w celu sprawdzenia pisowni i jej poprawienia.
Według LanguageTool’s GIT, “LanguageTool to oprogramowanie open source do korekty tekstów w języku angielskim, francuskim, niemieckim, polskim, rosyjskim oraz ponad 20 innych językach. Znajduje ono wiele błędów, których zwykły moduł sprawdzania pisowni nie potrafi wykryć.”
Następnie dodaję zależność dla narzędzia językowego w pliku pom.xml:
<dependency>
<groupId>org.languagetool</groupId>
<artifactId>language-en</artifactId>
<version>4.0</version>
</dependency>
Następnie definiuję statyczną zmienną na poziomie klasy o nazwie langTool klasy JLanguageTool. Potem inicjalizuję langTool obiektem klasy AmericanEnglish.
Następnie koduję metodę o nazwie SpellChecker z parametrem wejściowym jako String text (zwykły tekst) oraz typem zwracanym również jako String (tekst z poprawioną pisownią). Używam metody check klasy JLanguageTool z parametrem jako niesprawdzony tekst. Następnie zwraca ona listę obiektów RuleMatch. Zgodnie z dokumentacją Java Docs dla JLanguageTool, klasa RuleMatch dostarcza “informacje o regule błędu pasującej do tekstu oraz o pozycji dopasowania.”
Następnie definiuję trzy zmienne: ‘result’ typu String, ‘lastPos’ typu integer oraz ‘tmp’ typu String. Ponadto przy każdym dopasowaniu RuleMatch odtwarzam zdanie z pierwszą sugerowaną pisownią z narzędzia. Wraz z tym dodałem niezbędne bloki try-catch wszędzie tam, gdzie było to wymagane.
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 |
package com.CloudSigma.Spark.TwitterSentiment; import java.util.List; import org.languagetool.JLanguageTool; import org.languagetool.language.AmericanEnglish; import org.languagetool.rules.RuleMatch; public class LanguageCheck { static JLanguageTool langTool = new JLanguageTool(new AmericanEnglish()); public static String CorrectSpell(String text) { String query = text; try { List matches = langTool.check(query); String result = ""; int lastPos = 0; String tmp = ""; for (RuleMatch ma : matches) { try { tmp = ma.getSuggestedReplacements().get(0); result += query.substring(lastPos, ma.getFromPos()); result += tmp; lastPos = ma.getToPos(); } catch (Exception e) { return text; } } if (lastPos < query.length()) { result += query.substring(lastPos, query.length()); } return result; } catch (Exception e) { return text; } } } |
Analizator wydźwięku: Stanford CoreNLP
Kolejnym krokiem w analizie wydźwięku za pomocą Spark jest znalezienie wydźwięku w tekście. Aby to zrobić, używam biblioteki Stanford’s Core NLP do znalezienia wartości wydźwięku. Następnie tworzę klasę o nazwie ‘StanfordSentiment’, w której zaimplementuję tę bibliotekę do znalezienia wydźwięku w naszym tekście.
Aby to zrobić, dodaję następujące zależności w pliku pom.xml:
<!– To jest biblioteka Stanford Core NLP –>
<dependency>
<groupId>edu.stanford.nlp</groupId>
<artifactId>stanford-corenlp</artifactId>
<version>3.8.0</version>
</dependency>
<!– To jest plik modeli Stanford Core NLP’s –>
<dependency>
<groupId>edu.stanford.nlp</groupId>
<artifactId>stanford-corenlp</artifactId>
<version>3.8.0</version>
<classifier>models</classifier>
</dependency>
Tworzę statyczną zmienną obiektową ‘props’, która definiuje właściwości dla potoku Stanford Core NLP’s. Wybrałem minimalne właściwości, aby uczynić go tak lekkim, jak to możliwe. Następnie ustawiam anotatory na tokenize, ssplit, pos, parse, sentiment. Tworzę kolejną statyczną zmienną obiektową ‘pipeline’ klasy StanfordCoreNLP. Na koniec inicjalizuję potok za pomocą właściwości ‘props’.
GetSentiment:
Utworzyłem metodę GetSentiment z parametrem wejściowym typu String i wyjściowym typu Double. Używam metody CorrectSpell, którą utworzyłem w pliku LanguageCheck.java. Metoda CorrectSpell obiektu LanguageCheck zwraca mi poprawną pisownię wprowadzonego tweeta. Używam metody annotate klasy StanfordCoreNLP z tym poprawionym tekstem. Wartości wydźwięku podawane przez tę bibliotekę to:
0 => bardzo negatywny
1 => negatywny
2 => neutralny
3 => pozytywny
4 => bardzo pozytywny
Odejmuję 2 od wyniku dla każdego zdania, aby uzyskać następujące nowe kategorie wydźwięku:
-2 => bardzo negatywny
-1 => negatywny
0 => neutralny
1 => pozytywny
2 => bardzo pozytywny
Zwracam wynik wydźwięku każdego tweeta jako średnią z wydźwięku każdego zdania w tweecie. Tweety nie są pisane w żadnym ustrukturyzowanym formacie. Dlatego nie mogę przypisać żadnej konkretnej linijce tweeta większej wagi niż innym. W związku z tym zakładam, że każda linijka w tweecie ma równe znaczenie. Zwracam zmienną ‘total’ typu Double, która zawiera wynikową wartość wydźwięku tweeta.
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 |
pakiet com.CloudSigma.Spark.TwitterSentiment; import java.util.List; import java.util.Properties; import edu.stanford.nlp.ling.CoreAnnotations.SentencesAnnotation; import edu.stanford.nlp.neural.rnn.RNNCoreAnnotations; import edu.stanford.nlp.pipeline.Annotation; import edu.stanford.nlp.pipeline.StanfordCoreNLP; import edu.stanford.nlp.sentiment.SentimentCoreAnnotations; import edu.stanford.nlp.trees.Tree; import edu.stanford.nlp.util.CoreMap; import static com.CloudSigma.Spark.TwitterSentiment.LanguageCheck.*; public class StanfordSentiment { static Properties props = new Properties(); static { props.setProperty("annotators", "tokenize,ssplit,pos,parse,sentiment");} static StanfordCoreNLP pipeline = new StanfordCoreNLP(props); public static Double GetSentiment(String text) { String checkedText = CorrectSpell(text); Annotation document = new Annotation(checkedText); pipeline.annotate(document); List sentences = document.get(SentencesAnnotation.class); Double sum = 0.0; for (CoreMap sentence : sentences) { Tree tree = sentence.get(SentimentCoreAnnotations.SentimentAnnotatedTree.class); int sentiment = RNNCoreAnnotations.getPredictedClass(tree); int scaled = sentiment - 2; sum = sum + scaled; } Double total = sum / sentences.size(); System.out.println("Tekst tweeta: " + checkedText); System.out.println("Wartość sentymentu: " + total); return total; } } |
Inicjacja programu:
Teraz, gdy te dwie klasy są gotowe, przejdziemy do ich użycia. W tym celu tworzę nową klasę, “TwitterDataFlow.java”. Najpierw piszę warunek sprawdzający, który uruchomi program tylko wtedy, gdy liczba przekazanych argumentów wejściowych wynosi dokładnie 2. Jeśli liczba argumentów nie jest równa 2, program wypisuje komunikat o nieprawidłowym użyciu i kończy działanie ze statusem wyjścia 1.
Buduję obiekt SparkSession z nazwą aplikacji ustawioną na Sentiment Analyzer. Ustawiam właściwość konfiguracji hadoop kontekstu spark, “mapreduce input fileinputformat input dir recursive” na true. Pozwoli mi to na rekurencyjne pobieranie plików z folderów. Tworzę zmienną ‘inputPath’ klasy String, w której ustawiam argument wejściowy oraz ‘/*/*’, co pozwoli mi na odczytanie partycjonowanych danych zapisanych przez Flume. Odczytuję dane json z Flume do Dataset<Row> ‘data’.
Następnie rejestruję funkcję UDF (User Defined Function) w Spark SQL Context o nazwie ‘Sentiment’, która przyjmuje obiekt String, stosuje na nim metodę GetSentiment klasy StanfordSentiment i zwraca typ danych Double.
Obecnie posiadam dane dla słów kluczowych Apple, Google, Tesla, Infosys, TCS, Oracle, Microsoft i Facebook z flume. Tworzę więc listę obiektów String z tymi słowami kluczowymi. Dla każdej z tych firm wykonuję następujące operacje.
Przebieg operacji:
Najpierw tworzę ścieżkę outPath, w której chcę zapisać wyniki. Tworzę widok tymczasowy ‘complete’ na zbiorze danych ‘data’. Następnie wyciągam z danych timestamp, partitionBy (w celu partycjonowania danych podczas zapisywania wyników), text, main_text (do użycia w wyrażeniach regularnych) oraz followers.
Tworzę widok tymczasowy na wynikach i filtruję z niego dane dotyczące konkretnej firmy. Ponadto stosuję funkcję UDF Sentiment, która zwraca mi wartości sentymentu w kolumnie ‘seVal’. Utrwalam zserializowane dane w pamięci oraz jako zrzut na dysk (disk spill). Z tych danych uzyskuję NetSentiment, czyli iloczyn liczby obserwujących (Number of Followers) i wartości sentymentu (Sentiment Value) danego tweeta. Pomaga to w określeniu wpływu, jaki może mieć ten tweet. Mogę zastosować do tego różne wzory.
Utrwalamy zserializowane dane w pamięci i na dysku, ponieważ chcemy zapisać cały wynik, jako że analiza sentymentu jest zadaniem wymagającym obliczeniowo. Jeśli tego nie utrwalimy, a planujemy użyć wielu wzorów do obliczenia NetSentiment lub wpływu, w poprzednim zapytaniu, w którym wielokrotnie użylibyśmy wartości metody Sentiment, analiza sentymentu tego samego tweeta byłaby wykonywana wielokrotnie.
Grupowanie danych:
Teraz grupuję dane według kolumn timestamp i partitionBy oraz wyciągam średnią z NetSentiment dla tego grupowania. Daje mi to średni wpływ firmy, pozytywny lub negatywny, w danej minucie. W rezultacie zapisuję wyniki dla każdej firmy w outPath, partycjonując je według kolumny partitionBy.
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 |
package com.CloudSigma.Spark.TwitterSentiment; import java.util.Arrays; import java.util.List; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.storage.StorageLevel; import static com.CloudSigma.Spark.TwitterSentiment.StanfordSentiment.*; public class TwitterDataFlow { public static void main(String[] args) { if (args.length != 2) { System.out.println("Nieprawidłowa liczba argumentów! \n"); System.out.println( "Użycie: Wejście Wyjście \n " + "Wejście: Lokalizacja, z której należy odczytać partycjonowane dane" + "Wyjście: Miejsce, w którym ma zostać zapisany wynik końcowy"); System.exit(1); } SparkSession spark = SparkSession.builder().appName("Sentiment Analyzer").getOrCreate(); spark.sparkContext().hadoopConfiguration().set("mapreduce.input.fileinputformat.input.dir.recursive", "true"); String inputPath = args[0] + "/*/*"; Dataset data = spark.read().json(inputPath); spark.sqlContext().udf().register("Sentiment", (String s) -> GetSentiment(s), DataTypes.DoubleType); List list = Arrays.asList("apple", "google", "tesla", "infosys", "tcs", "oracle", "microsoft", "facebook"); String query; String outPath; Dataset result; for (String company : list) { outPath = args[1] + "/" + company; data.createOrReplaceTempView("complete"); // tmp1 wyodrębnia TimeStamp, partitionBy (Date), tekst tweeta, tekst tweeta pisany małymi // literami oraz followers_count użytkownika tweetującego Dataset tmp1 = spark.sql( "select concat(substr(created_at,5,6), substr(created_at,26,5),' ',substr(created_at,12,6),'00') as timestamp,substr(created_at,5,6) as partitionBy,text,lower(text) as main_text,user.followers_count as followers from complete"); tmp1.createOrReplaceTempView("tmp"); // Filtrowanie tweetów zawierających określone nazwy firm Dataset tmp2 = spark.sql("select * from tmp where main_text regexp '(" + company + ")'"); // Tworzy widok o nazwie twitter tmp2.createOrReplaceTempView("twitter"); // tmp3 zawiera wszystkie wybrane dane wraz z wartością Sentiment (sentymentu) // tweetów Dataset tmp3 = spark.sql("select *, Sentiment(text) as seVal from twitter"); tmp3.persist(StorageLevel.MEMORY_AND_DISK()); // Tworzenie kolejnego widoku tmp3.createOrReplaceTempView("dataSe"); Dataset net = spark.sql( "select *,followers*seVal as NetSentiment from dataSe"); // Tworzenie ostatecznego widoku do zapisu danych net.createOrReplaceTempView("final"); // Uśrednianie wartości sentymentu na minutę poprzez grupowanie danych według niej query = "select timestamp,partitionBy,AVG(NetSentiment) from final group by timestamp,partitionBy"; // Zapisywanie wyniku w zbiorze danych result result = spark.sql(query); // Zapisywanie wyniku na dysku result.write().partitionBy("partitionBy").json(outPath); } } } |
Ponadto po zakończeniu pisania kodu eksportuję uruchamialny plik jar ze wszystkimi zależnościami i kopiuję go na serwer, na którym chcę uruchomić to zadanie. Następnie mogę je przesłać za pomocą następującego polecenia,
|
1 |
spark-submit --master yarn --deploy-mode cluster SentimentAnalyzer.jar /flume/Twitter/PublicStream/ /flume/output |
gdzie (Źródło):
--master: Główny adres URL dla klastra (np.spark://23.195.26.187:7077)
--deploy-mode: Czy wdrożyć sterownik na węzłach roboczych (cluster) czy lokalnie jako zewnętrzny klient (client) (domyślnie:client)application-jar: Ścieżka do spakowanego pliku jar zawierającego aplikację i wszystkie zależności. Należy wziąć pod uwagę, że adres URL musi być widoczny globalnie wewnątrz klastra, na przykład ścieżkahdfs://lub ścieżkafile://obecna na wszystkich węzłach.application-arguments: Argumenty przekazywane do metody głównej (main) klasy głównej, jeśli istnieją. W tym przypadku ścieżka wejściowa i wyjściowa
Kroki końcowe:
Następnie otrzymamy wyniki analizy sentymentu za pomocą platformy Spark ze ścieżki wyjściowej. Na przykład, oto możliwy wynik dla firmy apple:
{“timestamp”:”Apr 30 2018 20:31:00″,”avg(NetSentiment)”:-3678.768518518518}
{“timestamp”:”Apr 30 2018 20:32:00″,”avg(NetSentiment)”:-883.002824858757}
Wdrożyłem tę aplikację na platformie CloudSigma z 5-węzłowym klastrem HDP. Dokładniej, każdy węzeł miał następującą konfigurację:
256 GB SSD
16 GB RAM
20 GHz CPU
Podsumowując, udało mi się uzyskać wyniki analizy sentymentu przy użyciu platformy Spark w około 19 godzin. Ponadto uwzględniłem bardziej zaawansowane obliczenia niż sam program na zbiorze danych o rozmiarze ponad 80 GB.
Kod można znaleźć na GITHUB.
Komentarze
Brak komentarzy. Bądź pierwszy.