Nel nostro post precedente, ho trovato un modo per estrarre dati di Twitter in tempo reale utilizzando Apache Flume. Attualmente, ho ottenuto molti dati da Twitter. Pertanto, vorrei analizzarli e trovarvi alcune tendenze. Al fine di eseguire l'analisi del sentiment dei dati di Twitter, utilizzerò un altro strumento Big Data, Apache Spark.
Secondo Hortonworks, “Apache Spark è un motore di elaborazione dati in-memory veloce, con API di sviluppo eleganti ed espressive per consentire ai professionisti dei dati di eseguire in modo efficiente carichi di lavoro di streaming, machine learning o SQL che richiedono un accesso iterativo rapido ai dataset. Con Spark in esecuzione su Apache Hadoop YARN, gli sviluppatori di tutto il mondo possono ora creare applicazioni per sfruttare la potenza di Spark, trarre informazioni e arricchire i propri carichi di lavoro di data science all'interno di un unico dataset condiviso in Hadoop.”
Un programma Spark può essere scritto in JAVA, Scala, Python o R. In questo caso, utilizzeremo JAVA insieme a Maven. Inoltre, Spark viene fornito sia con la distribuzione HDP che con quella Cloudera. Spark 2 è la versione attualmente utilizzata.
Al fine di eseguire l'analisi del sentiment con Spark, sto creando un nuovo progetto Maven. Lo sto chiamando Twitter Sentiment Analyzer’. Successivamente, sto creando una classe, “TwitterDataFlow.java” nella quale implementerò tutti i metodi richiesti.
Language Tool: correttore ortografico
Inizialmente, nel POC, ho riscontrato che se l'ortografia nei tweet è errata, i risultati della Sentiment Analysis ne risentono negativamente. Pertanto, sto introducendo uno SpellChecker. Ci aiuterà a correggere l'ortografia dei tweet prima di utilizzarli per la Sentiment Analysis.
Farò questo creando un nuovo metodo statico chiamato ‘CorrectSpell’. Inoltre, utilizzerò un LanguageTool per controllare le ortografie e correggerle.
Secondo il GIT di LanguageTool’, “LanguageTool è un software di correzione di bozze Open Source per inglese, francese, tedesco, polacco, russo e più di altre 20 lingue. Trova molti errori che un semplice correttore ortografico non è in grado di rilevare.”
Successivamente, sto aggiungendo una dipendenza per lo strumento linguistico in pom.xml:
<dependency>
<groupId>org.languagetool</groupId>
<artifactId>language-en</artifactId>
<version>4.0</version>
</dependency>
Dopodiché, definisco una variabile statica a livello di classe langTool della classe JLanguageTool. Quindi, inizializzo langTool con un oggetto della classe AmericanEnglish.
Successivamente, sto codificando il metodo chiamato SpellChecker con input come String text (testo normale) e anche tipo di ritorno come String (testo con ortografia corretta). Sto utilizzando il metodo check di JLanguageTool con il parametro come testo non controllato. Successivamente, restituisce un elenco di RuleMatch. Secondo i Java Doc di JLanguageTool, la classe RuleMatch fornisce “informazioni su una regola di errore che corrisponde al testo e sulla posizione della corrispondenza.”
In seguito a ciò, definisco tre variabili, ‘result’ di tipo String, ‘lastPos’ di tipo intero, ‘tmp’ di tipo String. Inoltre, con ogni RuleMatch, sto ricreando la frase con la prima ortografia suggerita dallo strumento. Con questo, ho aggiunto i blocchi try-catch necessari ovunque richiesto.
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 |
package com.CloudSigma.Spark.TwitterSentiment; import java.util.List; import org.languagetool.JLanguageTool; import org.languagetool.language.AmericanEnglish; import org.languagetool.rules.RuleMatch; public class LanguageCheck { static JLanguageTool langTool = new JLanguageTool(new AmericanEnglish()); public static String CorrectSpell(String text) { String query = text; try { List matches = langTool.check(query); String result = ""; int lastPos = 0; String tmp = ""; for (RuleMatch ma : matches) { try { tmp = ma.getSuggestedReplacements().get(0); result += query.substring(lastPos, ma.getFromPos()); result += tmp; lastPos = ma.getToPos(); } catch (Exception e) { return text; } } if (lastPos < query.length()) { result += query.substring(lastPos, query.length()); } return result; } catch (Exception e) { return text; } } } |
Analizzatore di sentiment: Stanford CoreNLP
Il passo successivo nell'analisi del sentiment con Spark consiste nel trovare i sentiment dal testo. Per fare questo, sto utilizzando la libreria Core NLP di Stanford per trovare i valori di sentiment. Successivamente, sto creando una classe chiamata ‘StanfordSentiment’ in cui implementerò la libreria per trovare i sentiment all'interno del nostro testo.
Per fare questo, sto aggiungendo le seguenti dipendenze nel file pom.xml:
<!– Questa è la libreria Stanford Core NLP –>
<dependency>
<groupId>edu.stanford.nlp</groupId>
<artifactId>stanford-corenlp</artifactId>
<version>3.8.0</version>
</dependency>
<!– Questo è il file dei modelli di Stanford Core NLP –>
<dependency>
<groupId>edu.stanford.nlp</groupId>
<artifactId>stanford-corenlp</artifactId>
<version>3.8.0</version>
<classifier>models</classifier>
</dependency>
Sto creando una variabile oggetto statica, ‘props’, che definisce le proprietà per la pipeline di Stanford Core NLP. Ho selezionato le proprietà minime per renderla il più leggera possibile. Successivamente, sto impostando gli annotatori su tokenize, ssplit, pos, parse, sentiment. Sto creando un'altra variabile oggetto statica, ‘pipeline’ della classe StanfordCoreNLP. Infine, inizializzo la pipeline con le proprietà ‘props’.
GetSentiment:
Ho creato un metodo, GetSentiment, con input di tipo String e output di tipo Double. Sto utilizzando il metodo CorrectSpell che ho creato nel file LanguageCheck.java. Il metodo CorrectSpell dell'oggetto LanguageCheck mi restituisce l'ortografia corretta del tweet inserito. Utilizzo il metodo annotate di StanfordCoreNLP con questo testo corretto. I valori di sentiment forniti da questa libreria sono:
0 => molto negativo
1 => negativo
2 => neutrale
3 => positivo
4 => molto positivo
Sto sottraendo 2 al risultato per frase per ottenere le seguenti nuove categorie di sentiment:
-2 => molto negativo
-1 => negativo
0 => neutrale
1 => positivo
2 => molto positivo
Sto restituendo il risultato del sentiment di ciascun tweet come media del sentiment di ogni frase del tweet. I tweet non sono scritti in alcun formato strutturato. Di conseguenza, non posso attribuire a una riga specifica di un tweet un peso maggiore rispetto alle altre. Pertanto, assumo che ogni riga in un tweet abbia la stessa importanza. Restituisco la variabile ‘total’ di tipo Double che contiene il valore di sentiment risultante del tweet.
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 |
package com.CloudSigma.Spark.TwitterSentiment; import java.util.List; import java.util.Properties; import edu.stanford.nlp.ling.CoreAnnotations.SentencesAnnotation; import edu.stanford.nlp.neural.rnn.RNNCoreAnnotations; import edu.stanford.nlp.pipeline.Annotation; import edu.stanford.nlp.pipeline.StanfordCoreNLP; import edu.stanford.nlp.sentiment.SentimentCoreAnnotations; import edu.stanford.nlp.trees.Tree; import edu.stanford.nlp.util.CoreMap; import static com.CloudSigma.Spark.TwitterSentiment.LanguageCheck.*; public class StanfordSentiment { static Properties props = new Properties(); static { props.setProperty("annotators", "tokenize,ssplit,pos,parse,sentiment");} static StanfordCoreNLP pipeline = new StanfordCoreNLP(props); public static Double GetSentiment(String text) { String checkedText = CorrectSpell(text); Annotation document = new Annotation(checkedText); pipeline.annotate(document); List sentences = document.get(SentencesAnnotation.class); Double sum = 0.0; for (CoreMap sentence : sentences) { Tree tree = sentence.get(SentimentCoreAnnotations.SentimentAnnotatedTree.class); int sentiment = RNNCoreAnnotations.getPredictedClass(tree); int scaled = sentiment - 2; sum = sum + scaled; } Double total = sum / sentences.size(); System.out.println("Testo del Tweet: " + checkedText); System.out.println("Valore del Sentiment: " + total); return total; } } |
Inizializzazione del programma:
Ora che queste due classi sono pronte, procederemo a utilizzarle. Pertanto, sto creando una nuova classe, “TwitterDataFlow.java”. Per prima cosa, sto scrivendo un controllo condizionale che eseguirà il programma solo se il numero di argomenti di input passati è esattamente 2. Se il numero di argomenti non è uguale a 2, stampa il messaggio di utilizzo errato ed esce con uno stato di uscita 1.
Sto creando una SparkSession con il nome dell'applicazione impostato su Sentiment Analyzer. Sto impostando la proprietà della configurazione hadoop del contesto spark, “mapreduce input fileinputformat input dir recursive” su true. Questo mi consentirà di recuperare i file in modo ricorsivo dalle cartelle. Sto creando una variabile, ‘inputPath’ della classe String, in cui sto impostando l'argomento di input e ‘/*/*’ che mi consentirà di leggere i dati partizionati memorizzati da Flume. Sto leggendo i dati json di Flume in Dataset<Row> ‘data’.
Successivamente, sto registrando una UDF (User Defined Function) con Spark SQL Context, denominata ‘Sentiment’, che accetta una String e applica ad essa il metodo GetSentiment di StanfordSentiment, restituendo un tipo di dati con valore Double.
Attualmente, ho i dati delle parole chiave Apple, Google, Tesla, Infosys, TCS, Oracle, Microsoft e Facebook da flume. Quindi, sto creando una lista di String con queste parole chiave. Per ciascuna di queste aziende, sto eseguendo le seguenti operazioni.
Flusso delle operazioni:
Per prima cosa, sto creando un outPath in cui voglio salvare i risultati. Sto creando una vista temporanea, ‘complete’, sul dataset ‘data’. Successivamente, sto estraendo timestamp, partitionBy (per partizionare i dati durante il salvataggio dei risultati), text, main_text (da utilizzare per le espressioni regolari), followers dai dati.
Sto creando una vista temporanea sui risultati e filtrando i dati di una particolare azienda da essa. Inoltre, sto applicando la UDF Sentiment, che mi restituisce i valori di sentiment nella colonna ‘seVal’. Sto persistendo i dati serializzati in memoria e come spill su disco. Da questi dati, sto ottenendo NetSentiment, il prodotto del numero di follower (Number of Followers) e del valore di sentiment (Sentiment Value) di quel tweet. Questo aiuta a conoscere l'influenza che quel tweet può avere. Posso avere formule diverse per lo stesso scopo.
Stiamo persistendo i dati serializzati in memoria e su disco poiché vogliamo che l'intero risultato venga memorizzato, dato che l'analisi del sentiment è un'attività computazionalmente pesante. Se non persistiamo questi dati e prevediamo di utilizzare più formule per calcolare il NetSentiment o l'influenza, nella query precedente in cui utilizzeremmo il valore del metodo Sentiment più volte, verrebbe eseguita l'analisi del sentiment del tweet più volte.
Raggruppamento dei dati:
Ora sto raggruppando i dati per timestamp e colonna partitionBy e calcolando la media di NetSentiment con questo raggruppamento. Questo mi dà l'influenza media dell'azienda, positiva o negativa in un particolare minuto. Di conseguenza, sto scrivendo i risultati per ciascuna azienda in outPath partizionandoli per colonna partitionBy.
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 |
package com.CloudSigma.Spark.TwitterSentiment; import java.util.Arrays; import java.util.List; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.storage.StorageLevel; import static com.CloudSigma.Spark.TwitterSentiment.StanfordSentiment.*; public class TwitterDataFlow { public static void main(String[] args) { if (args.length != 2) { System.out.println("Numero errato di argomenti! \n"); System.out.println( "Uso: Input Output \n " + "Input: Posizione da cui devono essere letti i dati partizionati" + "Output: Dove deve essere memorizzato il risultato finale"); System.exit(1); } SparkSession spark = SparkSession.builder().appName("Sentiment Analyzer").getOrCreate(); spark.sparkContext().hadoopConfiguration().set("mapreduce.input.fileinputformat.input.dir.recursive", "true"); String inputPath = args[0] + "/*/*"; Dataset data = spark.read().json(inputPath); spark.sqlContext().udf().register("Sentiment", (String s) -> GetSentiment(s), DataTypes.DoubleType); List list = Arrays.asList("apple", "google", "tesla", "infosys", "tcs", "oracle", "microsoft", "facebook"); String query; String outPath; Dataset result; for (String company : list) { outPath = args[1] + "/" + company; data.createOrReplaceTempView("complete"); // tmp1 estrae TimeStamp, partitionBy (Date), testo del tweet, testo del tweet in minuscolo // e followers_count dell'utente che twitta Dataset tmp1 = spark.sql( "select concat(substr(created_at,5,6), substr(created_at,26,5),' ',substr(created_at,12,6),'00') as timestamp,substr(created_at,5,6) as partitionBy,text,lower(text) as main_text,user.followers_count as followers from complete"); tmp1.createOrReplaceTempView("tmp"); // Filtro dei tweet che contengono determinati nomi di aziende Dataset tmp2 = spark.sql("select * from tmp where main_text regexp '(" + company + ")'"); // Crea una vista chiamata twitter tmp2.createOrReplaceTempView("twitter"); // tmp3 contiene l'intero set di dati selezionati insieme al valore di Sentiment dei // tweet Dataset tmp3 = spark.sql("select *, Sentiment(text) as seVal from twitter"); tmp3.persist(StorageLevel.MEMORY_AND_DISK()); // Creazione di un'altra vista tmp3.createOrReplaceTempView("dataSe"); Dataset net = spark.sql( "select *,followers*seVal as NetSentiment from dataSe"); // Creazione di una vista finale per salvare i dati net.createOrReplaceTempView("final"); // Calcolo della media dei valori di Sentiment al minuto raggruppando i dati su di esso query = "select timestamp,partitionBy,AVG(NetSentiment) from final group by timestamp,partitionBy"; // Salvataggio del risultato nel dataset dei risultati result = spark.sql(query); // Scrittura del risultato sul disco result.write().partitionBy("partitionBy").json(outPath); } } } |
Inoltre, al completamento del codice, esporto un jar eseguibile con tutte le dipendenze al suo interno e lo copio sul server, dove voglio eseguire questo job. Successivamente, posso inviarlo utilizzando il seguente comando,
|
1 |
spark-submit --master yarn --deploy-mode cluster SentimentAnalyzer.jar /flume/Twitter/PublicStream/ /flume/output |
dove (Origine):
--master: L' URL master per il cluster (ad es.spark://23.195.26.187:7077)
--deploy-mode: Se distribuire il driver sui nodi worker (cluster) o localmente come client esterno (client) (predefinito:client)application-jar: Percorso di un file jar pacchettizzato che include l'applicazione e tutte le dipendenze. Tieni in considerazione che l'URL deve essere visibile globalmente all'interno del cluster, ad esempio un percorsohdfs://o un percorsofile://presente su tutti i nodi.application-arguments: Argomenti passati al metodo main della classe principale, se presenti. Qui, percorso di input e output
Passaggi finali:
Quindi, otterremo i risultati dell'analisi del sentiment utilizzando Spark dal percorso di output. Ad esempio, questo è un possibile risultato di apple:
{“timestamp”:”Apr 30 2018 20:31:00″,”avg(NetSentiment)”:-3678.768518518518}
{“timestamp”:”Apr 30 2018 20:32:00″,”avg(NetSentiment)”:-883.002824858757}
Ho distribuito questa applicazione su CloudSigma con un cluster HDP a 5 nodi. Nello specifico, ogni nodo ha la seguente configurazione:
256 GB SSD
16 GB RAM
20 GHz CPU
Tutto sommato, sono stato in grado di ottenere i risultati dell'analisi del sentiment utilizzando Spark in circa 19 ore. Inoltre, ho incluso calcoli più avanzati rispetto al programma su un set di dati di oltre 80 GB.
Il codice è disponibile su GITHUB.
Commenti
Ancora nessun commento. Scrivi il primo.