U našem prethodnom postu, razradio sam način za izdvajanje Twitter podataka u stvarnom vremenu koristeći Apache Flume. Trenutno imam mnogo podataka s Twittera. Stoga bih ih želio analizirati i pronaći neke trendove u njima. Kako bih proveo analizu sentimenta podataka s Twittera, koristit ću još jedan Big Data alat, Apache Spark.
Prema Hortonworks, “Apache Spark je brz pokretač za obradu podataka u memoriji s elegantnim i izražajnim razvojnim API-jima koji stručnjacima za podatke omogućuju učinkovito izvršavanje strujanja, strojnog učenja ili SQL radnih opterećenja koja zahtijevaju brz iterativni pristup skupovima podataka. Uz Spark koji se izvodi na Apache Hadoop YARN-u, razvojni programeri posvuda sada mogu stvarati aplikacije kako bi iskoristili snagu Sparka, izvukli uvide i obogatili svoja radna opterećenja znanosti o podacima unutar jednog, zajedničkog skupa podataka u Hadoopu.”
Spark program može se napisati u JAVI, Scali, Pythonu ili R-u. U ovom slučaju koristit ćemo JAVU zajedno s Maven. Osim toga, Spark dolazi s HDP i Cloudera distribucijom. Spark 2 je trenutna verzija koja se koristi.
Kako bih proveo analizu sentimenta pomoću Sparka, kreiram novi Maven projekt. Nazivam ga Twitter Sentiment Analyzer’. Zatim kreiram klasu “TwitterDataFlow.java” u kojoj ću implementirati sve potrebne metode.
Language Tool: Korektor pravopisa
U početku sam u POC-u otkrio da ako je pravopis u tvitovima pogrešan, to negativno utječe na rezultate analize sentimenta. Stoga uvodim SpellChecker. On će nam pomoći ispraviti pravopis u tvitovima prije nego što ih upotrijebimo za analizu sentimenta.
To ću učiniti stvaranjem nove statičke metode pod nazivom ‘CorrectSpell’. Nadalje, koristit ću LanguageTool kako bih provjerio i ispravio pravopis.
Prema LanguageTool’ovom GIT-u, “LanguageTool je softver otvorenog koda za lekturu za engleski, francuski, njemački, poljski, ruski i više od 20 drugih jezika. Pronalazi mnoge pogreške koje jednostavna provjera pravopisa ne može otkriti.”
Zatim dodajem ovisnost za jezični alat u pom.xml:
<dependency>
<groupId>org.languagetool</groupId>
<artifactId>language-en</artifactId>
<version>4.0</version>
</dependency>
Nakon toga, definiram statičku varijablu na razini klase langTool klase JLanguageTool. Zatim inicijaliziram langTool s objektom klase AmericanEnglish.
Zatim kodiram metodu nazvanu SpellChecker s ulazom kao String text (normalan tekst) i povratnim tipom također kao String (tekst s ispravnim pravopisom). Koristim metodu check klase JLanguageTool s parametrom kao neprovjereni tekst. Zatim ona vraća popis RuleMatch. Prema JLanguageTool Java Docs, klasa RuleMatch pruža “informacije o pravilu pogreške koje se podudara s tekstom i položaju podudaranja.”
Nakon toga, definiram tri varijable, ‘result’ tipa String, ‘lastPos’ tipa integer, ‘tmp’ tipa String. Osim toga, uz svaki RuleMatch, ponovno kreiram rečenicu s prvim predloženim načinom pisanja iz alata. Time sam dodao potrebne try-catch blokove gdje god je to potrebno.
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 |
paket com.CloudSigma.Spark.TwitterSentiment; uvoz java.util.Lista; uvoz org.languagetool.JLanguageTool; uvoz org.languagetool.jezik.AmericanEnglish; uvoz org.languagetool.pravila.RuleMatch; javno klasa LanguageCheck { statički JLanguageTool langTool = novi JLanguageTool(novi AmericanEnglish()); javno statički String CorrectSpell(String tekst) { String upit = tekst; pokušaj { Lista podudaranja = langTool.provjeri(upit); String rezultat = ""; int lastPos = 0; String tmp = ""; for (RuleMatch ma : podudaranja) { try { tmp = ma.dohvatiPredloženeZamjene().dohvati(0); rezultat += upit.podniz(zadnjaPoz, ma.dohvatiOdPoz()); rezultat += privremeno; zadnjaPoz = ma.dohvatiDoPoz(); } uhvati (Iznimka e) { vrati tekst; } } ako (lastPos < upit.duljina()) { rezultat += upit.podniz(lastPos, upit.duljina()); } return rezultat; } catch (Exception e) { return tekst; } } } |
Analizator sentimenta: Stanford CoreNLP
Sljedeći korak u analizi sentimenta pomoću Sparka je pronalaženje sentimenta iz teksta. Kako bih to učinio, koristim Stanford’s Core NLP knjižnicu za pronalaženje vrijednosti sentimenta. Zatim kreiram klasu pod nazivom ‘StanfordSentiment’ u kojoj ću implementirati knjižnicu za pronalaženje sentimenta unutar našeg teksta.
Kako bih to učinio, dodajem sljedeće ovisnosti u pom.xml datoteku:
<!– Ovo je Stanford Core NLP knjižnica –>
<dependency>
<groupId>edu.stanford.nlp</groupId>
<artifactId>stanford-corenlp</artifactId>
<version>3.8.0</version>
</dependency>
<!– Ovo je datoteka s modelima za stanford Core NLP –>
<dependency>
<groupId>edu.stanford.nlp</groupId>
<artifactId>stanford-corenlp</artifactId>
<version>3.8.0</version>
<classifier>models</classifier>
</dependency>
Stvaram statičku varijablu objekta, ‘props’, koja definira svojstva za cjevovod za Stanford Core NLP. Odabrao sam minimalna svojstva kako bi bio što lakši. Nakon toga, postavljam anotatore na tokenize, ssplit, pos, parse, sentiment. Stvaram još jednu statičku varijablu objekta, ‘pipeline’ klase StanfordCoreNLP. Na kraju, inicijaliziram cjevovod sa svojstvima ‘props’.
GetSentiment:
Kreirao sam metodu GetSentiment s ulazom kao String i izlazom kao Double. Koristim metodu CorrectSpell koju sam kreirao u datoteci LanguageCheck.java. Metoda CorrectSpell objekta LanguageCheck vraća mi ispravan pravopis unesenog tweeta. Koristim metodu annotate biblioteke StanfordCoreNLP s ovim ispravljenim tekstom. Vrijednosti sentimenta koje daje ova biblioteka su:
0 => vrlo negativan
1 => negativan
2 => neutralan
3 => pozitivan
4 => vrlo pozitivan
Oduzimam 2 od rezultata po rečenici kako bih dobio sljedeće nove kategorije sentimenta:
-2 => vrlo negativan
-1 => negativno
0 => neutralno
1 => pozitivno
2 => vrlo pozitivno
Vraćam rezultat sentimenta svakog tvita kao prosjek sentimenta svake rečenice tvita. Tvitovi nisu napisani u nikakvom strukturiranom formatu. Stoga ne mogu dodijeliti veću težinu pojedinom retku tvita u odnosu na ostale. Zbog toga pretpostavljam da svaki redak u tvitu ima jednaku važnost. Vraćam varijablu ‘total’ tipa Double koja sadrži rezultirajuću vrijednost sentimenta tvita.
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 |
package com.CloudSigma.Spark.TwitterSentiment; import java.util.List; import java.util.Properties; import edu.stanford.nlp.ling.CoreAnnotations.SentencesAnnotation; import edu.stanford.nlp.neural.rnn.RNNCoreAnnotations; import edu.stanford.nlp.pipeline.Annotation; import edu.stanford.nlp.pipeline.StanfordCoreNLP; import edu.stanford.nlp.sentiment.SentimentCoreAnnotations; import edu.stanford.nlp.trees.Tree; import edu.stanford.nlp.util.CoreMap; import static com.CloudSigma.Spark.TwitterSentiment.LanguageCheck.*; public class StanfordSentiment { static Properties props = new Properties(); static { props.setProperty("annotators", "tokenize,ssplit,pos,parse,sentiment");} static StanfordCoreNLP pipeline = new StanfordCoreNLP(props); public static Double GetSentiment(String text) { String checkedText = CorrectSpell(text); Annotation document = new Annotation(checkedText); pipeline.annotate(document); List sentences = document.get(SentencesAnnotation.class); Double sum = 0.0; for (CoreMap sentence : sentences) { Tree tree = sentence.get(SentimentCoreAnnotations.SentimentAnnotatedTree.class); int sentiment = RNNCoreAnnotations.getPredictedClass(tree); int scaled = sentiment - 2; sum = sum + scaled; } Double total = sum / sentences.size(); System.out.println("Tekst tweeta: " + checkedText); System.out.println("Vrijednost sentimenta: " + ukupno); povrat ukupno; } } |
Pokretanje programa:
Sada kada su ove dvije klase gotove, krenut ćemo dalje s njihovim korištenjem. Stoga kreiram novu klasu, “TwitterDataFlow.java”. Prvo pišem uvjetnu provjeru koja bi pokrenula program samo ako je broj proslijeđenih ulaznih argumenata točno 2. Ako broj argumenata nije jednak 2, ispisuje poruku o neispravnoj upotrebi i također završava s izlaznim statusom 1.
Gradim SparkSession s nazivom aplikacije Sentiment Analyzer. Postavljam svojstvo hadoop konfiguracije spark konteksta, “mapreduce input fileinputformat input dir recursive” na true. To bi mi omogućilo rekurzivno dohvaćanje datoteka iz mapa. Kreiram varijablu ‘inputPath’ klase String u kojoj postavljam ulazni argument kao i ‘/*/*’ što bi mi omogućilo čitanje particioniranih podataka koje je pohranio Flume. Čitam JSON podatke iz Flumea u Dataset<Row> ‘data’.
Zatim registriram UDF (korisnički definiranu funkciju) u Spark SQL Context, pod nazivom ‘Sentiment’ koja prima String, primjenjuje StanfordSentiment’s metodu GetSentiment nad njim i vraća Double tip podatka.
Trenutno imam podatke za ključne riječi Apple, Google, Tesla, Infosys, TCS, Oracle, Microsoft i Facebook iz flume. Zato kreiram popis Stringova s tim ključnim riječima. Za svaku od tih tvrtki pokrećem sljedeće operacije.
Tijek operacija:
Prvo kreiram outPath u koji želim spremiti rezultate. Kreiram privremeni prikaz, ‘complete’ nad skupom podataka, ‘data’. Zatim izdvajam timestamp, partitionBy (kako bih particionirao podatke tijekom pohranjivanja rezultata), text, main_text (za korištenje u regularnim izrazima), followers iz podataka.
Kreiram privremeni prikaz nad rezultatima i iz njega filtriram podatke određene tvrtke. Također, primjenjujem Sentiment UDF, koji mi vraća vrijednosti sentimenta u stupcu ‘seVal’. Pohranjujem serijalizirane podatke u memoriji i kao disk spill. Iz tih podataka dobivam NetSentiment, umnožak Number of Followers i Sentiment Value tog tvita. To pomaže u razumijevanju utjecaja koji taj tvit može imati. Za isto mogu imati različite formule.
Perzistiramo serijalizirane podatke u memoriji i na disku jer želimo da se pohrani cijeli rezultat, budući da je analiza sentimenta računalno zahtjevan zadatak. Ako to ne perzistiramo, a planiramo koristiti više formula za izračun NetSentiment ili utjecaja, u prethodnom upitu u kojem bismo više puta koristili vrijednost metode Sentiment, analiza sentimenta tweeta izvršavala bi se više puta.
Grupiranje podataka:
Sada grupiram podatke prema timestampu i stupcu partitionBy te računam prosjek za NetSentiment s ovim grupiranjem. To mi daje prosječni utjecaj tvrtke, pozitivan ili negativan, u određenoj minuti. Slijedom toga, zapisujem rezultate za svaku tvrtku u outPath, particionirajući ih prema stupcu partitionBy.
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 |
package com.CloudSigma.Spark.TwitterSentiment; import java.util.Arrays; import java.util.List; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.storage.StorageLevel; import static com.CloudSigma.Spark.TwitterSentiment.StanfordSentiment.*; public class TwitterDataFlow { public static void main(String[] args) { if (args.length != 2) { System.out.println("Neispravan broj argumenata! \n"); System.out.println( "Upotreba: Ulaz Izlaz \n " + "Ulaz: Lokacija s koje je potrebno čitati particionirane podatke" + "Izlaz: Gdje se konačni rezultat treba pohraniti"); System.exit(1); } SparkSession spark = SparkSession.builder().appName("Analizator sentimenta").getOrCreate(); spark.sparkContext().hadoopConfiguration().set("mapreduce.input.fileinputformat.input.dir.recursive", "true"); String inputPath = args[0] + "/*/*"; Dataset data = spark.read().json(inputPath); spark.sqlContext().udf().register("Sentiment", (String s) -> GetSentiment(s), DataTypes.DoubleType); List list = Arrays.asList("apple", "google", "tesla", "infosys", "tcs", "oracle", "microsoft", "facebook"); String query; String outPath; Dataset result; for (String company : list) { outPath = args[1] + "/" + tvrtka; podaci.createOrReplaceTempView("complete"); // tmp1 izdvaja TimeStamp, partitionBy (datum), tekst tweeta, tekst tweeta u malim // slovima i followers_count korisnika koji tweeta Dataset tmp1 = spark.sql( "select concat(substr(created_at,5,6), substr(created_at,26,5),' ',substr(created_at,12,6),'00') as timestamp,substr(created_at,5,6) as partitionBy,text,lower(text) as main_text,user.followers_count as followers from complete"); tmp1.createOrReplaceTempView("tmp"); // Filtriranje tweetova koji sadrže određene nazive tvrtki Dataset tmp2 = spark.sql("select * from tmp where main_text regexp '(" + tvrtka + ")'"); // Kreira pogled pod nazivom twitter tmp2.createOrReplaceTempView("twitter"); // tmp3 sadrži cjelokupne odabrane podatke zajedno s vrijednošću Sentiment // tweetova Dataset tmp3 = spark.sql("select *, Sentiment(text) as seVal from twitter"); tmp3.persist(StorageLevel.MEMORY_AND_DISK()); // Kreiranje još jednog pogleda tmp3.createOrReplaceTempView("dataSe"); Dataset net = spark.sql( "select *,followers*seVal as NetSentiment from dataSe"); // Kreiranje konačnog pogleda za spremanje podataka net.createOrReplaceTempView("final"); // Izračunavanje prosjeka vrijednosti sentimenta po minuti grupiranjem podataka po tome query = "select timestamp,partitionBy,AVG(NetSentiment) from final group by timestamp,partitionBy"; // Spremanje rezultata u skup podataka rezultata result = spark.sql(query); // Zapisivanje rezultata na disk result.write().partitionBy("partitionBy").json(outPath); } } } |
Također, nakon dovršetka koda, izvozim izvršni jar sa svim ovisnostima u njemu i kopiram ga na poslužitelj na kojem želim pokrenuti ovaj posao. Zatim ga mogu poslati pomoću sljedeće naredbe,
|
1 |
spark-podnošenje --master yarn --razmještaj-način klaster SentimentAnalyzer.jar /flume/Twitter/PublicStream/ /flume/izlaz |
gdje (Izvor):
--master: master URL za klaster (npr.spark://23.195.26.187:7077)
--deploy-mode: Treba li razmjestiti vaš driver na radne čvorove (klaster) ili lokalno kao vanjski klijent (klijent) (zadano:klijent)application-jar: Put do pakirane jar datoteke koja uključuje vašu aplikaciju i sve ovisnosti. Uzmite u obzir da URL mora biti globalno vidljiv unutar vašeg klastera, na primjer,hdfs://putanja ilifile://putanja koja je prisutna na svim čvorovima.application-arguments: Argumenti koji se prosljeđuju glavnoj metodi vaše glavne klase, ako ih ima. Ovdje, ulazna i izlazna putanja
Završni koraci:
Zatim ćemo dobiti rezultate analize sentimenta pomoću Sparka iz izlazne putanje. Na primjer, ovo je mogući rezultat za apple:
{“timestamp”:”Apr 30 2018 20:31:00″,”avg(NetSentiment)”:-3678.768518518518}
{“timestamp”:”Apr 30 2018 20:32:00″,”avg(NetSentiment)”:-883.002824858757}
Implementirao sam ovu aplikaciju na CloudSigma s HDP klasterom od 5 čvorova. Točnije, svaki čvor ima sljedeću konfiguraciju:
256 GB SSD
16 GB RAM
20 GHz CPU
Sve u svemu, uspio sam dobiti rezultate analize sentimenta koristeći Spark za otprilike 19 sati. Nadalje, uključio sam naprednije izračune od programa nad skupom podataka od 80+ GB.
Kod se može pronaći na GITHUB.
Komentari
Još nema komentara. Budite prvi.