Στην προηγούμενη ανάρτησή μας, βρήκα έναν τρόπο να εξάγω δεδομένα Twitter σε πραγματικό χρόνο χρησιμοποιώντας Apache Flume. Αυτήν τη στιγμή, έχω λάβει πολλά δεδομένα από το Twitter. Επομένως, θα ήθελα να τα αναλύσω και να βρω κάποιες τάσεις από αυτά. Για να πραγματοποιήσω ανάλυση συναισθήματος των δεδομένων του Twitter, πρόκειται να χρησιμοποιήσω ένα άλλο εργαλείο Big Data, Apache Spark.
Σύμφωνα με το Hortonworks, “Το Apache Spark είναι μια γρήγορη μηχανή επεξεργασίας δεδομένων στη μνήμη με κομψά και εκφραστικά API ανάπτυξης που επιτρέπουν στους εργαζόμενους δεδομένων να εκτελούν αποτελεσματικά φόρτους εργασίας ροής, μηχανικής μάθησης ή SQL που απαιτούν γρήγορη επαναληπτική πρόσβαση σε σύνολα δεδομένων. Με το Spark να εκτελείται στο Apache Hadoop YARN, οι προγραμματιστές παντού μπορούν πλέον να δημιουργούν εφαρμογές για να εκμεταλλευτούν τη δύναμη του Spark, να αντλήσουν πληροφορίες και να εμπλουτίσουν τους φόρτους εργασίας της επιστήμης δεδομένων τους μέσα σε ένα ενιαίο, κοινόχρηστο σύνολο δεδομένων στο Hadoop.”
Ένα πρόγραμμα Spark μπορεί να γραφτεί σε JAVA, Scala, Python ή R. Σε αυτήν την περίπτωση, θα χρησιμοποιήσουμε JAVA μαζί με Maven. Επιπλέον, το Spark συνοδεύεται τόσο από τη διανομή HDP όσο και από τη διανομή Cloudera. Το Spark 2 είναι η τρέχουσα έκδοση που χρησιμοποιείται.
Για να πραγματοποιήσω την ανάλυση συναισθήματος με το Spark, δημιουργώ ένα νέο έργο Maven. Το ονομάζω Twitter Sentiment Analyzer’. Στη συνέχεια, δημιουργώ μια κλάση, “TwitterDataFlow.java” στην οποία θα υλοποιήσω όλες τις απαιτούμενες μεθόδους.
Language Tool: Διορθωτής Ορθογραφίας
Αρχικά, στο POC, διαπίστωσα ότι εάν η ορθογραφία στα tweets είναι λανθασμένη, τα αποτελέσματα της Ανάλυσης Συναισθήματος επηρεάζονται αρνητικά. Επομένως, εισάγω έναν SpellChecker. Θα μας βοηθήσει να διορθώσουμε την ορθογραφία των tweets πριν τα χρησιμοποιήσουμε για την Ανάλυση Συναισθήματος.
Πρόκειται να το κάνω αυτό δημιουργώντας μια νέα στατική μέθοδο με όνομα ‘CorrectSpell’. Επιπλέον, πρόκειται να χρησιμοποιήσω ένα LanguageTool για να ελέγξω τις ορθογραφίες και να τις διορθώσω.
Σύμφωνα με το GIT του LanguageTool, “Το LanguageTool είναι ένα λογισμικό διόρθωσης κειμένου Ανοιχτού Κώδικα για Αγγλικά, Γαλλικά, Γερμανικά, Πολωνικά, Ρωσικά και περισσότερες από 20 άλλες γλώσσες. Εντοπίζει πολλά σφάλματα που ένας απλός ορθογραφικός έλεγχος δεν μπορεί να ανιχνεύσει.”
Στη συνέχεια, προσθέτω μια εξάρτηση για το εργαλείο γλώσσας στο pom.xml:
<dependency>
<groupId>org.languagetool</groupId>
<artifactId>language-en</artifactId>
<version>4.0</version>
</dependency>
Μετά από αυτό, ορίζω μια στατική μεταβλητή επιπέδου κλάσης langTool της κλάσης JLanguageTool. Στη συνέχεια, αρχικοποιώ το langTool με ένα αντικείμενο της κλάσης AmericanEnglish.
Στη συνέχεια, κωδικοποιώ τη μέθοδο με όνομα SpellChecker με είσοδο ως String text (κανονικό κείμενο) και τύπο επιστροφής επίσης ως String (Κείμενο με Σωστή Ορθογραφία). Χρησιμοποιώ τη μέθοδο check του JLanguageTool με παράμετρο το μη ελεγμένο κείμενο. Στη συνέχεια, επιστρέφει μια λίστα από RuleMatch. Σύμφωνα με τα Java Docs του JLanguageTool, η κλάση RuleMatch παρέχει “πληροφορίες σχετικά με έναν κανόνα σφάλματος που ταιριάζει με το κείμενο και τη θέση της αντιστοίχισης.”
Μετά από αυτό, ορίζω τρεις μεταβλητές, ‘result’ τύπου String, ‘lastPos’ τύπου integer, ‘tmp’ τύπου String. Επιπλέον, με κάθε RuleMatch, αναδημιουργώ την πρόταση με την πρώτη προτεινόμενη ορθογραφία από το εργαλείο. Με αυτό, έχω προσθέσει τα απαραίτητα μπλοκ try-catch όπου απαιτείται.
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 |
package com.CloudSigma.Spark.TwitterSentiment; import java.util.List; import org.languagetool.JLanguageTool; import org.languagetool.language.AmericanEnglish; import org.languagetool.rules.RuleMatch; public class LanguageCheck { static JLanguageTool langTool = new JLanguageTool(new AmericanEnglish()); public static String CorrectSpell(String text) { String query = text; try { List matches = langTool.check(query); String result = ""; int lastPos = 0; String tmp = ""; for (RuleMatch ma : matches) { try { tmp = ma.getSuggestedReplacements().get(0); result += query.substring(lastPos, ma.getFromPos()); result += tmp; lastPos = ma.getToPos(); } catch (Exception e) { return text; } } if (lastPos < query.length()) { result += query.substring(lastPos, query.length()); } return result; } catch (Exception e) { return text; } } } |
Αναλυτής Συναισθήματος: Stanford CoreNLP
Το επόμενο βήμα στην ανάλυση συναισθήματος με το Spark είναι η εύρεση συναισθημάτων από το κείμενο. Για να το κάνω αυτό, χρησιμοποιώ τη βιβλιοθήκη Core NLP του Stanford’s για να βρω τιμές συναισθήματος. Στη συνέχεια, δημιουργώ μια κλάση με όνομα ‘StanfordSentiment’ όπου πρόκειται να υλοποιήσω τη βιβλιοθήκη για να βρω τα συναισθήματα μέσα στο κείμενό μας.
Για να το κάνω αυτό, προσθέτω τις ακόλουθες εξαρτήσεις στο αρχείο pom.xml:
<!– Αυτή είναι η βιβλιοθήκη Stanford Core NLP –>
<dependency>
<groupId>edu.stanford.nlp</groupId>
<artifactId>stanford-corenlp</artifactId>
<version>3.8.0</version>
</dependency>
<!– Αυτό είναι το αρχείο μοντέλων του Stanford Core NLP’s –>
<dependency>
<groupId>edu.stanford.nlp</groupId>
<artifactId>stanford-corenlp</artifactId>
<version>3.8.0</version>
<classifier>models</classifier>
</dependency>
Δημιουργώ μια στατική μεταβλητή αντικειμένου, ‘props’, η οποία ορίζει ιδιότητες για τη διοχέτευση (pipeline) του Stanford Core NLP’s. Έχω επιλέξει τις ελάχιστες ιδιότητες για να την κάνω όσο το δυνατόν πιο ελαφριά. Μετά από αυτό, ορίζω τους σχολιαστές (annotators) σε tokenize, ssplit, pos, parse, sentiment. Δημιουργώ μια άλλη στατική μεταβλητή αντικειμένου, ‘pipeline’ της κλάσης StanfordCoreNLP. Τέλος, αρχικοποιώ τη διοχέτευση με τις ιδιότητες ‘props’.
GetSentiment:
Δημιούργησα μια μέθοδο, GetSentiment με είσοδο ως String και έξοδο ως Double. Χρησιμοποιώ τη μέθοδο CorrectSpell που δημιούργησα στο αρχείο LanguageCheck.java. Η μέθοδος CorrectSpell του αντικειμένου LanguageCheck μου επιστρέφει τη σωστή ορθογραφία του tweet που εισήχθη. Χρησιμοποιώ τη μέθοδο annotate του StanfordCoreNLP με αυτό το διορθωμένο κείμενο. Οι τιμές συναισθήματος που δίνονται από αυτήν τη βιβλιοθήκη είναι:
0 => πολύ αρνητικό
1 => αρνητικό
2 => ουδέτερο
3 => θετικό
4 => πολύ θετικό
Αφαιρώ 2 από το αποτέλεσμα ανά πρόταση για να λάβω τις ακόλουθες νέες κατηγορίες συναισθήματος (Sentiment):
-2 => πολύ αρνητικό
-1 => αρνητικό
0 => ουδέτερο
1 => θετικό
2 => πολύ θετικό
Επιστρέφω το αποτέλεσμα του συναισθήματος κάθε tweet ως τον μέσο όρο του συναισθήματος κάθε πρότασης του tweet. Τα tweets δεν είναι γραμμένα σε κάποια δομημένη μορφή. Επομένως, δεν μπορώ να εκχωρήσω σε κάποια συγκεκριμένη γραμμή ενός tweet μεγαλύτερη βαρύτητα από τις άλλες. Συνεπώς, υποθέτω ότι κάθε γραμμή σε ένα tweet έχει την ίδια σημασία. Επιστρέφω τη μεταβλητή ‘total’ τύπου Double, η οποία περιέχει την τελική τιμή συναισθήματος του tweet.
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 |
πακέτο com.CloudSigma.Spark.TwitterSentiment; import java.util.List; import java.util.Properties; import edu.stanford.nlp.ling.CoreAnnotations.SentencesAnnotation; import edu.stanford.nlp.neural.rnn.RNNCoreAnnotations; import edu.stanford.nlp.pipeline.Annotation; import edu.stanford.nlp.pipeline.StanfordCoreNLP; import edu.stanford.nlp.sentiment.SentimentCoreAnnotations; import edu.stanford.nlp.trees.Tree; import edu.stanford.nlp.util.CoreMap; import static com.CloudSigma.Spark.TwitterSentiment.LanguageCheck.*; public class StanfordSentiment { static Properties props = new Properties(); static { props.setProperty("annotators", "tokenize,ssplit,pos,parse,sentiment");} static StanfordCoreNLP pipeline = new StanfordCoreNLP(props); public static Double GetSentiment(String text) { String checkedText = CorrectSpell(text); Annotation document = new Annotation(checkedText); pipeline.annotate(document); List sentences = document.get(SentencesAnnotation.class); Double sum = 0.0; for (CoreMap sentence : sentences) { Tree tree = sentence.get(SentimentCoreAnnotations.SentimentAnnotatedTree.class); int sentiment = RNNCoreAnnotations.getPredictedClass(tree); int scaled = sentiment - 2; sum = sum + scaled; } Double total = sum / sentences.size(); System.out.println("Κείμενο Tweet: " + checkedText); System.out.println("Τιμή Συναισθήματος: " + total); return total; } } |
Έναρξη Προγράμματος:
Τώρα που αυτές οι δύο κλάσεις έχουν ολοκληρωθεί, θα προχωρήσουμε στη χρήση τους. Έτσι, δημιουργώ μια νέα κλάση, “TwitterDataFlow.java”. Αρχικά, γράφω έναν έλεγχο υπό συνθήκη που θα εκτελούσε το πρόγραμμα μόνο εάν ο αριθμός των ορισμάτων εισόδου που μεταβιβάζονται είναι ακριβώς 2. Εάν ο αριθμός των ορισμάτων δεν είναι ίσος με 2, εκτυπώνει το μήνυμα εσφαλμένης χρήσης και επίσης τερματίζει με κατάσταση εξόδου 1.
Δημιουργώ ένα SparkSession με όνομα εφαρμογής Sentiment Analyzer. Ορίζω την ιδιότητα της παραμετροποίησης hadoop του spark context, “mapreduce input fileinputformat input dir recursive” ως true. Αυτό θα μου επιτρέψει να ανακτώ αρχεία αναδρομικά από φακέλους. Δημιουργώ μια μεταβλητή, ‘inputPath’ της κλάσης String στην οποία ορίζω το όρισμα εισόδου καθώς και το ‘/*/*’ που θα μου επιτρέψει να διαβάσω τα κατανεμημένα δεδομένα που αποθηκεύονται από το Flume. Διαβάζω τα δεδομένα json του Flume στο Dataset<Row> ‘data’.
Στη συνέχεια, καταχωρώ μια UDF (User Defined Function) στο Spark SQL Context, με το όνομα ‘Sentiment’, η οποία δέχεται ένα String και εφαρμόζει τη μέθοδο GetSentiment του StanfordSentiment σε αυτό και επιστρέφει τύπο δεδομένων τιμής Double.
Επί του παρόντος, έχω δεδομένα για τις λέξεις-κλειδιά Apple, Google, Tesla, Infosys, TCS, Oracle, Microsoft και Facebook από το flume. Έτσι, δημιουργώ μια λίστα String με αυτές τις λέξεις-κλειδιά. Για καθεμία από αυτές τις εταιρείες, εκτελώ τις ακόλουθες λειτουργίες.
Ροή λειτουργιών:
Αρχικά, δημιουργώ ένα outPath όπου θέλω να αποθηκεύσω τα αποτελέσματα. Δημιουργώ μια προσωρινή προβολή (temp view), ‘complete’ πάνω στο σύνολο δεδομένων, ‘data’. Στη συνέχεια, εξάγω τα timestamp, partitionBy (προκειμένου να διαμεριστούν τα δεδομένα κατά την αποθήκευση των αποτελεσμάτων), text, main_text (για χρήση σε κανονικές εκφράσεις), followers από τα δεδομένα.
Δημιουργώ μια προσωρινή προβολή (temp view) πάνω στα αποτελέσματα και φιλτράρω τα δεδομένα της συγκεκριμένης εταιρείας από αυτήν. Επίσης, εφαρμόζω το Sentiment UDF, το οποίο μου επιστρέφει τις τιμές συναισθήματος στη στήλη ‘seVal’. Διατηρώ (persist) τα σειριοποιημένα δεδομένα στη μνήμη και ως disk spill. Από αυτά τα δεδομένα, λαμβάνω το NetSentiment, το γινόμενο του αριθμού των ακολούθων (Number of Followers) και της τιμής συναισθήματος (Sentiment Value) αυτού του tweet. Αυτό βοηθά στη γνώση της επιρροής που μπορεί να έχει αυτό το tweet. Μπορώ να έχω διαφορετικούς τύπους για το ίδιο.
Διατηρούμε (persist) τα σειριοποιημένα δεδομένα στη μνήμη και στον δίσκο καθώς θέλουμε να αποθηκευτεί ολόκληρο το αποτέλεσμα, καθώς η ανάλυση συναισθήματος είναι μια υπολογιστικά βαριά εργασία. Εάν δεν το διατηρήσουμε αυτό και σχεδιάζουμε να χρησιμοποιήσουμε πολλαπλούς τύπους για τον υπολογισμό του NetSentiment ή της επιρροής, στο προηγούμενο ερώτημα όπου θα χρησιμοποιούσαμε την τιμή της μεθόδου Sentiment πολλαπλές φορές, θα έκανε Ανάλυση Συναισθήματος του tweet πολλαπλές φορές.
Ομαδοποίηση δεδομένων:
Τώρα, ομαδοποιώ τα δεδομένα ανά timestamp και στήλη partitionBy και υπολογίζω τον μέσο όρο του NetSentiment με αυτήν την ομαδοποίηση. Αυτό μου δίνει τη μέση επιρροή της εταιρείας, θετική ή αρνητική σε ένα συγκεκριμένο λεπτό. Κατά συνέπεια, γράφω τα αποτελέσματα για κάθε εταιρεία στο outPath, διαχωρίζοντάς τα ανά στήλη partitionBy.
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 |
package com.CloudSigma.Spark.TwitterSentiment; import java.util.Arrays; import java.util.List; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.storage.StorageLevel; import static com.CloudSigma.Spark.TwitterSentiment.StanfordSentiment.*; public class TwitterDataFlow { public static void main(String[] args) { if (args.length != 2) { System.out.println("Μη έγκυρος αριθμός ορισμάτων! \n"); System.out.println( "Χρήση: Είσοδος Έξοδος \n " + "Είσοδος: Τοποθεσία από την οποία πρέπει να αναγνωστούν τα διαμερισμένα δεδομένα" + "Έξοδος: Πού πρέπει να αποθηκευτεί το τελικό αποτέλεσμα"); System.exit(1); } SparkSession spark = SparkSession.builder().appName("Sentiment Analyzer").getOrCreate(); spark.sparkContext().hadoopConfiguration().set("mapreduce.input.fileinputformat.input.dir.recursive", "true"); String inputPath = args[0] + "/*/*"; Dataset data = spark.read().json(inputPath); spark.sqlContext().udf().register("Sentiment", (String s) -> GetSentiment(s), DataTypes.DoubleType); List list = Arrays.asList("apple", "google", "tesla", "infosys", "tcs", "oracle", "microsoft", "facebook"); String query; String outPath; Dataset result; for (String company : list) { outPath = args[1] + "/" + company; data.createOrReplaceTempView("complete"); // Το tmp1 εξάγει το TimeStamp, το partitionBy (Ημερομηνία), το κείμενο του tweet, το κείμενο του tweet σε πεζά // γράμματα και το followers_count του χρήστη που κάνει το tweet Dataset tmp1 = spark.sql( "select concat(substr(created_at,5,6), substr(created_at,26,5),' ',substr(created_at,12,6),'00') as timestamp,substr(created_at,5,6) as partitionBy,text,lower(text) as main_text,user.followers_count as followers from complete"); tmp1.createOrReplaceTempView("tmp"); // Φιλτράρισμα των tweets που περιέχουν συγκεκριμένα ονόματα εταιρειών Dataset tmp2 = spark.sql("select * from tmp where main_text regexp '(" + company + ")'"); // Δημιουργεί μια προβολή με το όνομα twitter tmp2.createOrReplaceTempView("twitter"); // Το tmp3 περιέχει όλα τα επιλεγμένα δεδομένα μαζί με την τιμή Sentiment των // tweets Dataset tmp3 = spark.sql("select *, Sentiment(text) as seVal from twitter"); tmp3.persist(StorageLevel.MEMORY_AND_DISK()); // Δημιουργία άλλης μιας προβολής tmp3.createOrReplaceTempView("dataSe"); Dataset net = spark.sql( "select *,followers*seVal as NetSentiment from dataSe"); // Δημιουργία μιας τελικής προβολής για την αποθήκευση των δεδομένων net.createOrReplaceTempView("final"); // Υπολογισμός του μέσου όρου των τιμών Sentiment ανά λεπτό ομαδοποιώντας τα δεδομένα σε αυτό query = "select timestamp,partitionBy,AVG(NetSentiment) from final group by timestamp,partitionBy"; // Αποθήκευση του αποτελέσματος στο σύνολο δεδομένων result result = spark.sql(query); // Εγγραφή του αποτελέσματος στον δίσκο result.write().partitionBy("partitionBy").json(outPath); } } } |
Επίσης, μετά την ολοκλήρωση του κώδικα, εξάγω ένα εκτελέσιμο αρχείο jar με όλες τις εξαρτήσεις σε αυτό και το αντιγράφω στον διακομιστή, όπου θέλω να εκτελέσω αυτήν την εργασία. Στη συνέχεια, μπορώ να το υποβάλω χρησιμοποιώντας την ακόλουθη εντολή,
|
1 |
spark-submit --master yarn --deploy-mode cluster SentimentAnalyzer.jar /flume/Twitter/PublicStream/ /flume/output |
όπου (Πηγή):
--master: Το master URL για το cluster (π.χ.spark://23.195.26.187:7077)
--deploy-mode: Εάν θα γίνει ανάπτυξη (deploy) του driver σας στους κόμβους εργασίας (cluster) ή τοπικά ως εξωτερικός πελάτης (client) (προεπιλογή:client)application-jar: Διαδρομή προς ένα πακεταρισμένο jar που περιλαμβάνει την εφαρμογή σας και όλες τις εξαρτήσεις. Λάβετε υπόψη ότι η διεύθυνση URL πρέπει να είναι καθολικά ορατή μέσα στο cluster σας, για παράδειγμα, μια διαδρομήhdfs://ή μια διαδρομήfile://που υπάρχει σε όλους τους κόμβους.application-arguments: Ορίσματα που μεταβιβάζονται στην κύρια μέθοδο της κύριας κλάσης σας, εάν υπάρχουν. Εδώ, η διαδρομή εισόδου και εξόδου
Τελικά Βήματα:
Στη συνέχεια, θα λάβουμε τα αποτελέσματα από την ανάλυση συναισθήματος χρησιμοποιώντας το Spark από τη διαδρομή εξόδου. Για παράδειγμα, αυτό είναι ένα πιθανό αποτέλεσμα της apple:
{“timestamp”:”Apr 30 2018 20:31:00″,”avg(NetSentiment)”:-3678.768518518518}
{“timestamp”:”Apr 30 2018 20:32:00″,”avg(NetSentiment)”:-883.002824858757}
Ανέπτυξα αυτήν την εφαρμογή στο CloudSigma με ένα HDP cluster 5 κόμβων. Συγκεκριμένα, με κάθε κόμβο να έχει την ακόλουθη διαμόρφωση:
256 GB SSD
16 GB RAM
20 GHz CPU
Συνολικά, κατάφερα να λάβω αποτελέσματα από την ανάλυση συναισθήματος χρησιμοποιώντας το Spark σε περίπου 19 ώρες. Επιπλέον, συμπεριέλαβα πιο προηγμένους υπολογισμούς από το πρόγραμμα σε ένα σύνολο δεδομένων άνω των 80 GB.
Ο κώδικας βρίσκεται στο GITHUB.
Σχόλια
Δεν υπάρχουν σχόλια ακόμα. Γράψτε το πρώτο.