Dans notre article précédent, j'ai trouvé un moyen d'extraire des données Twitter en temps réel à l'aide de Apache Flume. Actuellement, j'ai récupéré beaucoup de données de Twitter. Par conséquent, je souhaiterais les analyser et en dégager des tendances. Afin d'effectuer une analyse de sentiment des données Twitter, je vais utiliser un autre outil Big Data, Apache Spark.
Selon Hortonworks, “Apache Spark est un moteur de traitement de données en mémoire rapide, doté d'API de développement élégantes et expressives pour permettre aux professionnels des données d'exécuter efficacement des charges de travail de streaming, de machine learning ou SQL qui nécessitent un accès itératif rapide aux ensembles de données. Avec Spark s'exécutant sur Apache Hadoop YARN, les développeurs du monde entier peuvent désormais créer des applications pour exploiter la puissance de Spark, en tirer des informations et enrichir leurs charges de travail de science des données au sein d'un ensemble de données unique et partagé dans Hadoop.”
Un programme Spark peut être écrit en JAVA, Scala, Python ou R. Dans ce cas, nous utiliserons JAVA avec Maven. De plus, Spark est fourni avec les distributions HDP et Cloudera. Spark 2 est la version actuellement utilisée.
Afin d'effectuer l'analyse de sentiment avec Spark, je crée un nouveau projet Maven. Je le nomme Twitter Sentiment Analyzer’. Ensuite, je crée une classe, “TwitterDataFlow.java” dans laquelle j'implémenterai toutes les méthodes requises.
Language Tool : Correcteur d'orthographe
Initialement, dans le POC, j'ai constaté que si l'orthographe des tweets est incorrecte, les résultats de l'analyse de sentiment sont négativement affectés. Par conséquent, j'introduis un SpellChecker. Il nous aidera à corriger l'orthographe des tweets avant de les utiliser pour l'analyse de sentiment.
Je vais faire cela en créant une nouvelle méthode statique nommée ‘CorrectSpell’. De plus, je vais utiliser un LanguageTool afin de vérifier les orthographes et de les corriger.
Selon le GIT de LanguageTool, “LanguageTool est un logiciel de correction open source pour l'anglais, le français, l'allemand, le polonais, le russe et plus de 20 autres langues. Il trouve de nombreuses erreurs qu'un simple correcteur orthographique ne peut pas détecter.”
Ensuite, j'ajoute une dépendance pour l'outil linguistique dans le fichier pom.xml :
<dependency>
<groupId>org.languagetool</groupId>
<artifactId>language-en</artifactId>
<version>4.0</version>
</dependency>
Après cela, je définis une variable statique au niveau de la classe, langTool, de la classe JLanguageTool. Ensuite, j'initialise langTool avec un objet de la classe AmericanEnglish.
Ensuite, je code la méthode nommée SpellChecker avec comme entrée un texte de type String (texte normal) et également comme type de retour String (texte avec orthographe corrigée). J'utilise la méthode check de JLanguageTool avec comme paramètre le texte non vérifié. Ensuite, elle renvoie une liste de RuleMatch. Selon la Javadoc de JLanguageTool, la classe RuleMatch fournit “des informations sur une règle d'erreur qui correspond au texte et la position de la correspondance.”
À la suite de cela, je définis trois variables, ‘result’ de type String, ‘lastPos’ de type entier, ‘tmp’ de type String. De plus, avec chaque RuleMatch, je recrée la phrase avec la première suggestion d'orthographe de l'outil. Avec cela, j'ai ajouté les blocs try-catch nécessaires partout où cela est requis.
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 |
package com.CloudSigma.Spark.TwitterSentiment; import java.util.List; import org.languagetool.JLanguageTool; import org.languagetool.language.AmericanEnglish; import org.languagetool.rules.RuleMatch; public class LanguageCheck { static JLanguageTool langTool = new JLanguageTool(new AmericanEnglish()); public static String CorrectSpell(String text) { String query = text; try { List matches = langTool.check(query); String result = ""; int lastPos = 0; String tmp = ""; for (RuleMatch ma : matches) { try { tmp = ma.getSuggestedReplacements().get(0); result += query.substring(lastPos, ma.getFromPos()); result += tmp; lastPos = ma.getToPos(); } catch (Exception e) { return text; } } if (lastPos < query.length()) { result += query.substring(lastPos, query.length()); } return result; } catch (Exception e) { return text; } } } |
Analyseur de sentiment : Stanford CoreNLP
La prochaine étape de l'analyse de sentiment avec Spark consiste à trouver des sentiments à partir du texte. Pour ce faire, j'utilise la bibliothèque Stanford’s Core NLP pour trouver les valeurs de sentiment. Ensuite, je crée une classe nommée ‘StanfordSentiment’ dans laquelle je vais implémenter la bibliothèque pour trouver les sentiments au sein de notre texte.
Pour ce faire, j'ajoute les dépendances suivantes dans le fichier pom.xml :
<!– Il s'agit de la bibliothèque Stanford Core NLP –>
<dependency>
<groupId>edu.stanford.nlp</groupId>
<artifactId>stanford-corenlp</artifactId>
<version>3.8.0</version>
</dependency>
<!– Il s'agit du fichier de modèles de Stanford Core NLP –>
<dependency>
<groupId>edu.stanford.nlp</groupId>
<artifactId>stanford-corenlp</artifactId>
<version>3.8.0</version>
<classifier>models</classifier>
</dependency>
Je crée une variable d'objet statique, ‘props’, qui définit les propriétés du pipeline de Stanford Core NLP. J'ai sélectionné les propriétés minimales pour le rendre aussi léger que possible. Après cela, je configure les annotateurs sur tokenize, ssplit, pos, parse, sentiment. Je crée une autre variable d'objet statique, ‘pipeline’ de la classe StanfordCoreNLP. Enfin, j'initialise le pipeline avec les propriétés ‘props’.
GetSentiment :
J'ai créé une méthode, GetSentiment, avec une chaîne de caractères (String) en entrée et un Double en sortie. J'utilise la méthode CorrectSpell que j'ai créée dans le fichier LanguageCheck.java. La méthode CorrectSpell de l'objet LanguageCheck me renvoie l'orthographe correcte du tweet saisi. J'utilise la méthode annotate de StanfordCoreNLP avec ce texte corrigé. Les valeurs de sentiment fournies par cette bibliothèque sont :
0 => très négatif
1 => négatif
2 => neutre
3 => positif
4 => très positif
Je soustrais 2 au résultat par phrase pour obtenir les nouvelles catégories de sentiment suivantes :
-2 => très négatif
-1 => négatif
0 => neutre
1 => positif
2 => très positif
Je renvoie le résultat du sentiment de chaque tweet sous forme de moyenne du sentiment de chaque phrase du tweet. Les tweets ne sont pas écrits dans un format structuré. Par conséquent, je ne peux pas attribuer un poids plus élevé à une ligne spécifique d'un tweet par rapport aux autres. Par conséquent, je suppose que chaque ligne d'un tweet a une importance égale. Je renvoie la variable ‘total’ de type Double qui contient la valeur de sentiment résultante du tweet.
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 |
package com.CloudSigma.Spark.TwitterSentiment; import java.util.List; import java.util.Properties; import edu.stanford.nlp.ling.CoreAnnotations.SentencesAnnotation; import edu.stanford.nlp.neural.rnn.RNNCoreAnnotations; import edu.stanford.nlp.pipeline.Annotation; import edu.stanford.nlp.pipeline.StanfordCoreNLP; import edu.stanford.nlp.sentiment.SentimentCoreAnnotations; import edu.stanford.nlp.trees.Tree; import edu.stanford.nlp.util.CoreMap; import static com.CloudSigma.Spark.TwitterSentiment.LanguageCheck.*; public class StanfordSentiment { static Properties props = new Properties(); static { props.setProperty("annotators", "tokenize,ssplit,pos,parse,sentiment");} static StanfordCoreNLP pipeline = new StanfordCoreNLP(props); public static Double GetSentiment(String text) { String checkedText = CorrectSpell(text); Annotation document = new Annotation(checkedText); pipeline.annotate(document); List sentences = document.get(SentencesAnnotation.class); Double sum = 0.0; for (CoreMap sentence : sentences) { Tree tree = sentence.get(SentimentCoreAnnotations.SentimentAnnotatedTree.class); int sentiment = RNNCoreAnnotations.getPredictedClass(tree); int scaled = sentiment - 2; sum = sum + scaled; } Double total = sum / sentences.size(); System.out.println("Texte du tweet : " + checkedText); System.out.println("Valeur du sentiment : " + total); return total; } } |
Initialisation du programme :
Maintenant que ces deux classes sont prêtes, nous allons passer à leur utilisation. Ainsi, je crée une nouvelle classe, “TwitterDataFlow.java”. Tout d'abord, j'écris une vérification conditionnelle qui n'exécutera le programme que si le nombre d'arguments d'entrée transmis est exactement égal à 2. Si le nombre d'arguments n'est pas égal à 2, elle affiche le message d'utilisation incorrecte et quitte également avec un code de sortie 1.
Je construis une SparkSession avec le nom d'application Sentiment Analyzer. Je définis la propriété de configuration hadoop du contexte spark, “mapreduce input fileinputformat input dir recursive” sur true. Cela me permettra de récupérer des fichiers de manière récursive à partir des dossiers. Je crée une variable, ‘inputPath’ de la classe String dans laquelle je définis l'argument d'entrée ainsi que ‘/*/*’ ce qui me permettra de lire les données partitionnées stockées par Flume. Je lis les données json de Flume dans Dataset<Row> ‘data’.
Ensuite, j'enregistre une UDF (User Defined Function) auprès de Spark SQL Context, nommée ‘Sentiment’ qui prend une String et lui applique la méthode GetSentiment de StanfordSentiment et renvoie un type de données de valeur Double.
Actuellement, je dispose des données des mots-clés Apple, Google, Tesla, Infosys, TCS, Oracle, Microsoft et Facebook provenant de flume. Je crée donc une liste de String avec ces mots-clés. Pour chacune de ces entreprises, j'exécute les opérations suivantes.
Flux d'opérations :
Tout d'abord, je crée un outPath où je souhaite enregistrer les résultats. Je crée une vue temporaire, ‘complete’ sur le jeu de données, ‘data’. Ensuite, j'extrais le timestamp, partitionBy (afin de partitionner les données lors du stockage des résultats), text, main_text (à utiliser pour les expressions régulières), followers à partir des données.
Je crée une vue temporaire sur les résultats et je filtre les données d'une entreprise particulière à partir de là. De plus, j'applique l'UDF Sentiment, qui me renvoie les valeurs de sentiment dans la colonne ‘seVal’. Je persiste les données sérialisées en mémoire et sous forme de débordement sur disque (disk spill). À partir de ces données, j'obtiens NetSentiment, le produit du nombre de followers (Number of Followers) et de la valeur de sentiment (Sentiment Value) de ce tweet. Cela permet de connaître l'influence que ce tweet peut avoir. Je peux avoir différentes formules pour cela.
Nous persistons les données sérialisées en mémoire et sur disque car nous voulons que l'intégralité du résultat soit stockée, l'analyse de sentiment étant une tâche lourde en calculs. Si nous ne persistons pas cela et que nous prévoyons d'utiliser plusieurs formules pour calculer le NetSentiment ou l'influence, dans la requête précédente où nous utiliserions la valeur de la méthode Sentiment plusieurs fois, elle effectuerait l'analyse de sentiment du tweet plusieurs fois.
Groupement de données :
Maintenant, je regroupe les données par timestamp et par colonne partitionBy et je calcule la moyenne du NetSentiment avec ce regroupement. Cela me donne l'influence moyenne de l'entreprise, positive ou négative, à une minute précise. Par conséquent, j'écris les résultats pour chaque entreprise dans outPath en les partitionnant par la colonne partitionBy.
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 |
package com.CloudSigma.Spark.TwitterSentiment; import java.util.Arrays; import java.util.List; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.storage.StorageLevel; import static com.CloudSigma.Spark.TwitterSentiment.StanfordSentiment.*; public class TwitterDataFlow { public static void main(String[] args) { if (args.length != 2) { System.out.println("Nombre incorrect d'arguments ! \n"); System.out.println( "Utilisation : Entrée Sortie \n " + "Entrée : Emplacement à partir duquel les données partitionnées doivent être lues" + "Sortie : Emplacement où le résultat final doit être stocké"); System.exit(1); } SparkSession spark = SparkSession.builder().appName("Sentiment Analyzer").getOrCreate(); spark.sparkContext().hadoopConfiguration().set("mapreduce.input.fileinputformat.input.dir.recursive", "true"); String inputPath = args[0] + "/*/*"; Dataset data = spark.read().json(inputPath); spark.sqlContext().udf().register("Sentiment", (String s) -> GetSentiment(s), DataTypes.DoubleType); List list = Arrays.asList("apple", "google", "tesla", "infosys", "tcs", "oracle", "microsoft", "facebook"); String query; String outPath; Dataset result; for (String company : list) { outPath = args[1] + "/" + company; data.createOrReplaceTempView("complete"); // tmp1 extrait TimeStamp, partitionBy (Date), le texte du tweet, le texte du tweet en minuscules et followers_count de l'utilisateur qui tweete Dataset tmp1 = spark.sql( "select concat(substr(created_at,5,6), substr(created_at,26,5),' ',substr(created_at,12,6),'00') as timestamp,substr(created_at,5,6) as partitionBy,text,lower(text) as main_text,user.followers_count as followers from complete"); tmp1.createOrReplaceTempView("tmp"); // Filtrage des tweets contenant certains noms d'entreprises Dataset tmp2 = spark.sql("select * from tmp where main_text regexp '(" + company + ")'"); // Crée une vue nommée twitter tmp2.createOrReplaceTempView("twitter"); // tmp3 contient l'intégralité des données sélectionnées ainsi que la valeur de sentiment des // tweets Dataset tmp3 = spark.sql("select *, Sentiment(text) as seVal from twitter"); tmp3.persist(StorageLevel.MEMORY_AND_DISK()); // Création d'une autre vue tmp3.createOrReplaceTempView("dataSe"); Dataset net = spark.sql( "select *,followers*seVal as NetSentiment from dataSe"); // Création d'une vue finale pour sauvegarder les données net.createOrReplaceTempView("final"); // Calcul de la moyenne des valeurs de sentiment par minute en y groupant les données query = "select timestamp,partitionBy,AVG(NetSentiment) from final group by timestamp,partitionBy"; // Sauvegarde du résultat dans le jeu de données result result = spark.sql(query); // Écriture du résultat sur le disque result.write().partitionBy("partitionBy").json(outPath); } } } |
De plus, une fois le code terminé, j'exporte un jar exécutable contenant toutes les dépendances et je le copie sur le serveur où je souhaite exécuter cette tâche. Ensuite, je peux le soumettre à l'aide de la commande suivante,
|
1 |
spark-submit --master yarn --deploy-mode cluster SentimentAnalyzer.jar /flume/Twitter/PublicStream/ /flume/output |
où (Source):
--master: L' URL master pour le cluster (par exemple,spark://23.195.26.187:7077)
--deploy-mode: S'il faut déployer votre driver sur les nœuds de travail (cluster) ou localement en tant que client externe (client) (par défaut :client)application-jar: Chemin vers un jar packagé incluant votre application et toutes ses dépendances. Prenez en considération que l'URL doit être visible globalement à l'intérieur de votre cluster, par exemple, un cheminhdfs://ou un cheminfile://présent sur tous les nœuds.application-arguments: Arguments transmis à la méthode principale de votre classe principale, le cas échéant. Ici, les chemins d'entrée et de sortie
Étapes finales :
Ensuite, nous obtiendrons les résultats de l'analyse de sentiment à l'aide de Spark à partir du chemin de sortie. Par exemple, voici un résultat possible pour apple :
{“timestamp”:”Apr 30 2018 20:31:00″,”avg(NetSentiment)”:-3678.768518518518}
{“timestamp”:”Apr 30 2018 20:32:00″,”avg(NetSentiment)”:-883.002824858757}
J'ai déployé cette application sur CloudSigma avec un cluster HDP de 5 nœuds. Plus précisément, chaque nœud ayant la configuration suivante :
256 GB SSD
16 GB RAM
20 GHz CPU
Dans l'ensemble, j'ai pu obtenir les résultats de l'analyse de sentiment à l'aide de Spark en environ 19 heures. De plus, j'ai inclus des calculs plus avancés que le programme sur un ensemble de données de plus de 80 Go.
Le code est disponible sur GITHUB.
Commentaires
Aucun commentaire pour l'instant. Soyez le premier.