Az előző bejegyzésünkben kidolgoztam egy módszert a valós idejű Twitter-adatok kinyerésére az Apache Flume segítségével. Jelenleg rengeteg adatom van a Twitterről. Ezért szeretném elemezni őket, és trendeket keresni bennük. A Twitter-adatok érzelemelemzésének elvégzéséhez egy másik Big Data eszközt fogok használni, az Apache Spark.
-ot. A Hortonworks, szerint: “Az Apache Spark egy gyors, memórián belüli adatfeldolgozó motor elegáns és kifejező fejlesztési API-kkal, amelyek lehetővé teszik az adatkezelők számára a gyors, iteratív adathalmaz-hozzáférést igénylő streaming, gépi tanulási vagy SQL feladatok hatékony végrehajtását. A Spark Apache Hadoop YARN-on való futtatásával a fejlesztők mostantól bárhol létrehozhatnak olyan alkalmazásokat, amelyek kihasználják a Spark erejét, felismeréseket nyernek, és gazdagítják adat-tudományi munkáikat egyetlen, megosztott adathalmazon belül a Hadoopban.”
Egy Spark program megírható JAVA, Scala, Python vagy R nyelven. Ebben az esetben JAVA-t fogunk használni a Maven eszközzel együtt. Emellett a Spark mind a HDP, mind a Cloudera disztribúcióban megtalálható. Jelenleg a Spark 2-es verziót használják.
A Spark segítségével végzett érzelemelemzéshez egy új Maven projektet hozok létre. A „Twitter Sentiment Analyzer” nevet adom neki. Ezután létrehozok egy „TwitterDataFlow.java” osztályt, amelyben implementálom az összes szükséges metódust.
Nyelvi eszköz: Helyesírás-javító
Kezdetben, a POC (koncepcióigazolás) során azt tapasztaltam, hogy ha a tweetek helyesírása hibás, az hátrányosan befolyásolja az érzelemelemzés eredményeit. Ezért bevezetek egy SpellChecker-t (helyesírás-ellenőrzőt). Ez segít kijavítani a tweetek helyesírását, mielőtt felhasználnánk őket az érzelemelemzéshez.
Ezt egy új, „CorrectSpell” nevű statikus metódus létrehozásával fogom megtenni. Továbbá egy LanguageTool-t fogok használni a helyesírás ellenőrzésére és javítására.
A LanguageTool’s GIT, szerint: “A LanguageTool egy nyílt forráskódú szövegellenőrző szoftver angol, francia, német, lengyel, orosz és több mint 20 egyéb nyelvhez. Számos olyan hibát megtalál, amelyet egy egyszerű helyesírás-ellenőrző nem képes észlelni.”
Ezután hozzáadok egy függőséget a nyelvi eszközhöz a pom.xml fájlban:
<dependency>
<groupId>org.languagetool</groupId>
<artifactId>language-en</artifactId>
<version>4.0</version>
</dependency>
Ezt követően definiálok egy JLanguageTool osztályú, statikus osztályszintű langTool változót. Majd inicializálom a langTool-t egy AmericanEnglish osztályú objektummal.
Ezután megírom a SpellChecker nevű metódust, amelynek bemenete egy String text (normál szöveg), és a visszatérési típusa szintén String (helyesírás-javított szöveg). A JLanguageTool check metódusát használom, ahol a paraméter az ellenőrizetlen szöveg. Ezután ez egy RuleMatch listát ad vissza. A JLanguageTool Java dokumentációja szerint a RuleMatch osztály “információt nyújt a szövegre illeszkedő hibaszabályról és az illeszkedés pozíciójáról.”
Ezt követően három változót definiálok: egy String típusú „result”-ot, egy integer típusú „lastPos”-t és egy String típusú „tmp”-t. Emellett minden egyes RuleMatch esetén újraalkotom a mondatot az eszköz által javasolt első helyesírással. Ezzel együtt hozzáadtam a szükséges try-catch blokkokat is, ahol szükséges volt.
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 |
package com.CloudSigma.Spark.TwitterSentiment; import java.util.List; import org.languagetool.JLanguageTool; import org.languagetool.language.AmericanEnglish; import org.languagetool.rules.RuleMatch; public class LanguageCheck { static JLanguageTool langTool = new JLanguageTool(new AmericanEnglish()); public static String CorrectSpell(String text) { String query = text; try { List matches = langTool.check(query); String result = ""; int lastPos = 0; String tmp = ""; for (RuleMatch ma : matches) { try { tmp = ma.getSuggestedReplacements().get(0); result += query.substring(lastPos, ma.getFromPos()); result += tmp; lastPos = ma.getToPos(); } catch (Exception e) { return text; } } if (lastPos < query.length()) { result += query.substring(lastPos, query.length()); } return result; } catch (Exception e) { return text; } } } |
Érzelemelemző: Stanford CoreNLP
A Spark segítségével végzett érzelemelemzés következő lépése az érzelmek kinyerése a szövegből. Ennek érdekében a Stanford’s Core NLP könyvtárat használom az érzelemértékek meghatározásához. Ezután létrehozok egy ‘StanfordSentiment’ nevű osztályt, amelyben implementálni fogom a könyvtárat a szövegünkben található érzelmek keresésére.
Ehhez a következő függőségeket adom hozzá a pom.xml fájlhoz:
<!– Ez a stanford Core NLP könyvtár –>
<dependency>
<groupId>edu.stanford.nlp</groupId>
<artifactId>stanford-corenlp</artifactId>
<version>3.8.0</version>
</dependency>
<!– Ez a stanford Core NLP’s modellfájlja –>
<dependency>
<groupId>edu.stanford.nlp</groupId>
<artifactId>stanford-corenlp</artifactId>
<version>3.8.0</version>
<classifier>models</classifier>
</dependency>
Létrehozok egy statikus objektumváltozót, a ‘props’-ot, amely meghatározza a Stanford Core NLP’s csővezetékének (pipeline) tulajdonságait. A minimális tulajdonságokat választottam ki, hogy a lehető legkönnyebb legyen. Ezután az annotátorokat a tokenize, ssplit, pos, parse, sentiment értékekre állítom be. Létrehozok egy másik statikus objektumváltozót, a StanfordCoreNLP osztályú ‘pipeline’-t. Végül inicializálom a csővezetéket a ‘props’ tulajdonságokkal.
GetSentiment:
Létrehoztam egy GetSentiment metódust, amelynek bemenete String, kimenete pedig Double. A LanguageCheck.java fájlban létrehozott CorrectSpell metódust használom. A LanguageCheck objektum CorrectSpell metódusa a beírt tweet helyes írásmódját adja vissza. A StanfordCoreNLP annotate metódusát használom ezzel a javított szöveggel. A könyvtár által visszaadott érzelemértékek a következők:
0 => nagyon negatív
1 => negatív
2 => semleges
3 => pozitív
4 => nagyon pozitív
Mondatonként kivonok 2-t az eredményből, hogy a következő új érzelemkategóriákat kapjam:
-2 => nagyon negatív
-1 => negatív
0 => semleges
1 => pozitív
2 => nagyon pozitív
Az egyes tweetek érzelmi eredményét a tweet egyes mondatainak átlagos érzelmi értékeként adom vissza. A tweetek nincsenek strukturált formátumban megírva. Ezért nem tudok a tweet egyetlen konkrét sorának sem nagyobb súlyt adni, mint a többinek. Ebből kifolyólag feltételezem, hogy a tweet minden sora egyenlő fontosságú. Visszaadom a Double típusú ‘total’ változót, amely a tweet eredő érzelemértékét tartalmazza.
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 |
csomag com.CloudSigma.Spark.TwitterSentiment; import java.util.List; import java.util.Properties; import edu.stanford.nlp.ling.CoreAnnotations.SentencesAnnotation; import edu.stanford.nlp.neural.rnn.RNNCoreAnnotations; import edu.stanford.nlp.pipeline.Annotation; import edu.stanford.nlp.pipeline.StanfordCoreNLP; import edu.stanford.nlp.sentiment.SentimentCoreAnnotations; import edu.stanford.nlp.trees.Tree; import edu.stanford.nlp.util.CoreMap; import static com.CloudSigma.Spark.TwitterSentiment.LanguageCheck.*; public class StanfordSentiment { static Properties props = new Properties(); static { props.setProperty("annotators", "tokenize,ssplit,pos,parse,sentiment");} static StanfordCoreNLP pipeline = new StanfordCoreNLP(props); public static Double GetSentiment(String text) { String checkedText = CorrectSpell(text); Annotation document = new Annotation(checkedText); pipeline.annotate(document); List sentences = document.get(SentencesAnnotation.class); Double sum = 0.0; for (CoreMap sentence : sentences) { Tree tree = sentence.get(SentimentCoreAnnotations.SentimentAnnotatedTree.class); int sentiment = RNNCoreAnnotations.getPredictedClass(tree); int scaled = sentiment - 2; sum = sum + scaled; } Double total = sum / sentences.size(); System.out.println("Tweet szöveg: " + checkedText); System.out.println("Szentiment érték: " + total); return total; } } |
Program indítása:
Most, hogy ez a két osztály elkészült, továbblépünk a használatukra. Ezért létrehozok egy új osztályt, “TwitterDataFlow.java” néven. Először egy feltételes ellenőrzést írok, amely csak akkor futtatja a programot, ha a megadott bemeneti argumentumok száma pontosan 2. Ha az argumentumok száma nem egyenlő 2-vel, kiírja a helytelen használatra figyelmeztető üzenetet, és 1-es kilépési kóddal leáll.
Létrehozok egy SparkSession-t, amelynek az alkalmazásneve Sentiment Analyzer. Beállítom a spark context hadoop konfigurációjának “mapreduce input fileinputformat input dir recursive” tulajdonságát true-ra. Ez lehetővé teszi a fájlok rekurzív lekérését a mappákból. Létrehozok egy String osztályú ‘inputPath’ változót, amelyben beállítom a bemeneti argumentumot, valamint a ‘/*/*’ értéket, ami lehetővé teszi a Flume által tárolt particionált adatok beolvasását. Beolvasom a Flume json adatait a Dataset<Row> ‘data’ változóba.
Ezután regisztrálok egy UDF-et (felhasználó által definiált függvényt) a Spark SQL Context-ben ‘Sentiment’ néven, amely egy Stringet fogad, alkalmazza rá a StanfordSentiment GetSentiment metódusát, és Double típusú értéket ad vissza.
Jelenleg az Apple, Google, Tesla, Infosys, TCS, Oracle, Microsoft és Facebook kulcsszavak adatai állnak rendelkezésemre a flume-ból. Ezért létrehozok egy String listát ezekkel a kulcsszavakkal. Mindegyik vállalat esetében a következő műveleteket futtatom.
Műveleti folyamat:
Először létrehozok egy outPath-t, ahová az eredményeket menteni szeretném. Létrehozok egy ‘complete’ ideiglenes nézetet (temp view) a ‘data’ adatkészleten. Ezután kinyerem az adatból az időbélyeget (timestamp), a partitionBy-t (az adatok particionálásához az eredmények mentése során), a text-et, a main_text-et (reguláris kifejezésekhez való használatra) és a követőket (followers).
Létrehozok egy ideiglenes nézetet az eredményeken, és kiszűröm belőle az adott vállalat adatait. Továbbá alkalmazom a Sentiment UDF-et, amely a szentiment értékeket adja vissza a ‘seVal’ oszlopban. A szerializált adatokat a memóriában és lemezre kiírva (disk spill) is megőrzöm (persist). Ebből az adatból megkapom a NetSentiment-et, ami a követők számának (Number of Followers) és az adott tweet szentiment értékének (Sentiment Value) a szorzata. Ez segít megismerni a tweet lehetséges hatását (influence). Erre különböző képleteket is használhatok.
A szerializált adatokat a memóriában és a lemezen is megőrizzük (persist), mivel azt szeretnénk, hogy a teljes eredmény tárolva legyen, ugyanis a szentimentelemzés számításigényes feladat. Ha ezt nem tennénk meg, és több képletet terveznénk használni a NetSentiment vagy a hatás kiszámítására, akkor az előző lekérdezésben, ahol a Sentiment metódus értékét többször is felhasználnánk, a tweet szentimentelemzése többször is lefutna.
Adatcsoportosítás:
Most a timestamp és a partitionBy oszlop szerint csoportosítom az adatokat, és átlagolom a NetSentiment értéket ezzel a csoportosítással. Ez megadja a vállalat átlagos befolyását, legyen az pozitív vagy negatív egy adott percben. Következésképpen az egyes vállalatok eredményeit az outPath-ba írom, a partitionBy oszlop szerint particionálva.
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 |
package com.CloudSigma.Spark.TwitterSentiment; import java.util.Arrays; import java.util.List; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.storage.StorageLevel; import static com.CloudSigma.Spark.TwitterSentiment.StanfordSentiment.*; public class TwitterDataFlow { public static void main(String[] args) { if (args.length != 2) { System.out.println("Helytelen számú argumentum! \n"); System.out.println( "Használat: Bemenet Kimenet \n " + "Bemenet: Az a hely, ahonnan a particionált adatokat be kell olvasni" + "Kimenet: Ahol a végső eredményt tárolni kell"); System.exit(1); } SparkSession spark = SparkSession.builder().appName("Sentiment Analyzer").getOrCreate(); spark.sparkContext().hadoopConfiguration().set("mapreduce.input.fileinputformat.input.dir.recursive", "true"); String inputPath = args[0] + "/*/*"; Dataset data = spark.read().json(inputPath); spark.sqlContext().udf().register("Sentiment", (String s) -> GetSentiment(s), DataTypes.DoubleType); List list = Arrays.asList("apple", "google", "tesla", "infosys", "tcs", "oracle", "microsoft", "facebook"); String query; String outPath; Dataset result; for (String company : list) { outPath = args[1] + "/" + company; data.createOrReplaceTempView("complete"); // tmp1 extracts TimeStamp, partitionBy (Date), tweet text, tweet text in lower // case and followers_count of the user tweeting Dataset tmp1 = spark.sql( "select concat(substr(created_at,5,6), substr(created_at,26,5),' ',substr(created_at,12,6),'00') as timestamp,substr(created_at,5,6) as partitionBy,text,lower(text) as main_text,user.followers_count as followers from complete"); tmp1.createOrReplaceTempView("tmp"); // Filtering tweets having certain company names in it Dataset tmp2 = spark.sql("select * from tmp where main_text regexp '(" + company + ")'"); // Creates a view named twitter tmp2.createOrReplaceTempView("twitter"); // tmp3 contains the entire selected data along with the Sentiment value of the // tweets Dataset tmp3 = spark.sql("select *, Sentiment(text) as seVal from twitter"); tmp3.persist(StorageLevel.MEMORY_AND_DISK()); // Creating another view tmp3.createOrReplaceTempView("dataSe"); Dataset net = spark.sql( "select *,followers*seVal as NetSentiment from dataSe"); // Creating a final view to save the data net.createOrReplaceTempView("final"); // Averaging the Sentiment Values per minute by grouping the data onto it query = "select timestamp,partitionBy,AVG(NetSentiment) from final group by timestamp,partitionBy"; // Saving the result in result dataset result = spark.sql(query); // Writing the result onto the disk result.write().partitionBy("partitionBy").json(outPath); } } } |
Emellett a kód befejezése után exportálok egy futtatható jar fájlt az összes függőséggel együtt, és átmásolom a szerverre, ahol futtatni szeretném ezt a feladatot. Ezután a következő paranccsal küldhetem be,
|
1 |
spark-submit --master yarn --deploy-mode cluster SentimentAnalyzer.jar /flume/Twitter/PublicStream/ /flume/output |
ahol (Forrás):
--master: A master URL a fürthöz (pl.spark://23.195.26.187:7077)
--deploy-mode: Azt határozza meg, hogy a drivert a munkás csomópontokon (cluster) vagy helyileg, külső kliensként (client) fusson-e (alapértelmezett:client)application-jar: Az alkalmazást és az összes függőséget tartalmazó, becsomagolt jar fájl elérési útja. Vegye figyelembe, hogy az URL-nek globálisan láthatónak kell lennie a fürtön belül, például egyhdfs://elérési út vagy egyfile://elérési út, amely minden csomóponton jelen van.application-arguments: A fő osztály main metódusának átadott argumentumok, ha vannak. Itt a bemeneti és kimeneti elérési út
Utolsó lépések:
Ezután megkapjuk a Spark segítségével végzett érzelemelemzés eredményeit a kimeneti útvonalról. Például ez az apple egy lehetséges eredménye:
{“timestamp”:”Apr 30 2018 20:31:00″,”avg(NetSentiment)”:-3678.768518518518}
{“timestamp”:”Apr 30 2018 20:32:00″,”avg(NetSentiment)”:-883.002824858757}
Ezt az alkalmazást a CloudSigma-n telepítettem egy 5 csomópontos HDP fürttel. Pontosabban, az egyes csomópontok a következő konfigurációval rendelkeznek:
256 GB SSD
16 GB RAM
20 GHz CPU
Összességében körülbelül 19 óra alatt sikerült eredményeket kapnom a Spark segítségével végzett érzelemelemzésből. Ezenkívül a programnál fejlettebb számításokat is beépítettem egy több mint 80 GB-os adatkészleten.
A kód megtalálható a GITHUB.
Hozzászólások
Még nincsenek hozzászólások. Legyen Ön az első.