En nuestra publicación anterior, ideé una forma de extraer datos de Twitter en tiempo real utilizando Apache Flume. Actualmente, tengo muchos datos de Twitter. Por lo tanto, me gustaría analizarlos y encontrar algunas tendencias en ellos. Para realizar el análisis de sentimiento de los datos de Twitter, voy a utilizar otra herramienta de Big Data, Apache Spark.
Según Hortonworks, “Apache Spark es un motor de procesamiento de datos rápido y en memoria con APIs de desarrollo elegantes y expresivas que permiten a los profesionales de datos ejecutar de manera eficiente cargas de trabajo de streaming, aprendizaje automático o SQL que requieren un acceso iterativo rápido a los conjuntos de datos. Con Spark ejecutándose en Apache Hadoop YARN, los desarrolladores de todo el mundo ahora pueden crear aplicaciones para aprovechar el poder de Spark, obtener información y enriquecer sus cargas de trabajo de ciencia de datos dentro de un único conjunto de datos compartido en Hadoop.”
Un programa de Spark se puede escribir en JAVA, Scala, Python o R. En este caso, utilizaremos JAVA junto con Maven. Además, Spark viene con las distribuciones HDP y Cloudera. Spark 2 es la versión actual que se está utilizando.
Para realizar el análisis de sentimiento con Spark, estoy creando un nuevo proyecto Maven. Lo estoy nombrando Twitter Sentiment Analyzer’. A continuación, estoy creando una clase, “TwitterDataFlow.java” en la que implementaré todos los métodos requeridos.
Language Tool: Corrector ortográfico
Inicialmente, en la POC, descubrí que si la ortografía de los tweets es incorrecta, los resultados del Análisis de Sentimiento se ven afectados negativamente. Por lo tanto, voy a introducir un SpellChecker. Nos ayudará a corregir la ortografía de los tweets antes de usarlos para el Análisis de Sentimiento.
Voy a hacer esto creando un nuevo método estático llamado ‘CorrectSpell’. Además, voy a utilizar un LanguageTool para comprobar las ortografías y corregirlas.
Según el GIT de LanguageTool’, “LanguageTool es un software de corrección de código abierto para inglés, francés, alemán, polaco, ruso y más de otros 20 idiomas. Encuentra muchos errores que un simple corrector ortográfico no puede detectar.”
A continuación, estoy añadiendo una dependencia para la herramienta de idioma en pom.xml:
<dependency>
<groupId>org.languagetool</groupId>
<artifactId>language-en</artifactId>
<version>4.0</version>
</dependency>
Después de eso, estoy definiendo una variable estática a nivel de clase langTool de la clase JLanguageTool. Luego, estoy inicializando langTool con un objeto de la clase AmericanEnglish.
A continuación, estoy codificando el método llamado SpellChecker con la entrada como String text (texto normal) y el tipo de retorno como String (texto con ortografía correcta) también. Estoy utilizando el método check de JLanguageTool con el parámetro como texto no verificado. A continuación, devuelve una lista de RuleMatch. Según los Java Docs de JLanguageTool, la clase RuleMatch proporciona “información sobre una regla de error que coincide con el texto y la posición de la coincidencia.”
Después de esto, estoy definiendo tres variables, ‘result’ de tipo String, ‘lastPos’ de tipo entero, ‘tmp’ de tipo String. Además, con cada RuleMatch, estoy recreando la oración con la primera sugerencia de ortografía de la herramienta. Con eso, he añadido los bloques try-catch necesarios donde sea requerido.
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 |
package com.CloudSigma.Spark.TwitterSentiment; import java.util.List; import org.languagetool.JLanguageTool; import org.languagetool.language.AmericanEnglish; import org.languagetool.rules.RuleMatch; public class LanguageCheck { static JLanguageTool langTool = new JLanguageTool(new AmericanEnglish()); public static String CorrectSpell(String text) { String query = text; try { List matches = langTool.check(query); String result = ""; int lastPos = 0; String tmp = ""; for (RuleMatch ma : matches) { try { tmp = ma.getSuggestedReplacements().get(0); result += query.substring(lastPos, ma.getFromPos()); result += tmp; lastPos = ma.getToPos(); } catch (Exception e) { return text; } } if (lastPos < query.length()) { result += query.substring(lastPos, query.length()); } return result; } catch (Exception e) { return text; } } } |
Analizador de sentimientos: Stanford CoreNLP
El siguiente paso en el análisis de sentimientos con Spark es encontrar sentimientos a partir del texto. Para hacer esto, estoy utilizando la biblioteca Core NLP de Stanford para encontrar valores de sentimiento. Luego, estoy creando una clase llamada ‘StanfordSentiment’ donde voy a implementar la biblioteca para encontrar los sentimientos dentro de nuestro texto.
Para hacer eso, estoy agregando las siguientes dependencias en el archivo pom.xml:
<!– Esta es la biblioteca Core NLP de Stanford –>
<dependency>
<groupId>edu.stanford.nlp</groupId>
<artifactId>stanford-corenlp</artifactId>
<version>3.8.0</version>
</dependency>
<!– Este es el archivo de modelos de Core NLP de Stanford –>
<dependency>
<groupId>edu.stanford.nlp</groupId>
<artifactId>stanford-corenlp</artifactId>
<version>3.8.0</version>
<classifier>models</classifier>
</dependency>
Estoy creando una variable de objeto estática, ‘props’, que define las propiedades para la canalización de Core NLP de Stanford. He seleccionado las propiedades mínimas para que sea lo más ligera posible. Después de eso, estoy configurando los anotadores en tokenize, ssplit, pos, parse, sentiment. Estoy creando otra variable de objeto estática, ‘pipeline’ de la clase StanfordCoreNLP. Finalmente, inicializo la canalización con las propiedades de ‘props’.
GetSentiment:
Creé un método, GetSentiment, con una entrada de tipo String y una salida de tipo Double. Estoy utilizando el método CorrectSpell que creé en el archivo LanguageCheck.java. El método CorrectSpell del objeto LanguageCheck me devuelve la ortografía correcta del tweet ingresado. Utilizo el método annotate de StanfordCoreNLP con este texto corregido. Los valores de sentimiento que proporciona esta biblioteca son:
0 => muy negativo
1 => negativo
2 => neutral
3 => positivo
4 => muy positivo
Estoy restando 2 al resultado por oración para obtener las siguientes nuevas categorías de sentimiento:
-2 => muy negativo
-1 => negativo
0 => neutral
1 => positivo
2 => muy positivo
Estoy devolviendo el resultado del sentimiento de cada tweet como el promedio del sentimiento de cada oración del tweet. Los tweets no están escritos en ningún formato estructurado. Por lo tanto, no puedo asignar a ninguna línea específica de un tweet un peso mayor que a las demás. Por lo tanto, asumo que cada línea de un tweet tiene la misma importancia. Estoy devolviendo la variable ‘total’ de tipo Double, que contiene el valor de sentimiento resultante del tweet.
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 |
paquete com.CloudSigma.Spark.TwitterSentiment; import java.util.List; import java.util.Properties; import edu.stanford.nlp.ling.CoreAnnotations.SentencesAnnotation; import edu.stanford.nlp.neural.rnn.RNNCoreAnnotations; import edu.stanford.nlp.pipeline.Annotation; import edu.stanford.nlp.pipeline.StanfordCoreNLP; import edu.stanford.nlp.sentiment.SentimentCoreAnnotations; import edu.stanford.nlp.trees.Tree; import edu.stanford.nlp.util.CoreMap; import static com.CloudSigma.Spark.TwitterSentiment.LanguageCheck.*; public class StanfordSentiment { static Properties props = new Properties(); static { props.setProperty("annotators", "tokenize,ssplit,pos,parse,sentiment");} static StanfordCoreNLP pipeline = new StanfordCoreNLP(props); public static Double GetSentiment(String text) { String checkedText = CorrectSpell(text); Annotation document = new Annotation(checkedText); pipeline.annotate(document); List sentences = document.get(SentencesAnnotation.class); Double sum = 0.0; for (CoreMap sentence : sentences) { Tree tree = sentence.get(SentimentCoreAnnotations.SentimentAnnotatedTree.class); int sentiment = RNNCoreAnnotations.getPredictedClass(tree); int scaled = sentiment - 2; sum = sum + scaled; } Double total = sum / sentences.size(); System.out.println("Texto del Tweet: " + checkedText); System.out.println("Valor de Sentimiento: " + total); return total; } } |
Inicio del programa:
Ahora que estas dos clases están listas, avanzaremos para utilizarlas. Por lo tanto, estoy creando una nueva clase, “TwitterDataFlow.java”. Primero, estoy escribiendo una verificación condicional que solo ejecutaría el programa si el número de argumentos de entrada pasados es exactamente 2. Si el número de argumentos no es igual a 2, imprime el mensaje de uso incorrecto y también sale con un estado de salida 1.
Estoy construyendo una SparkSession con el nombre de la aplicación como Sentiment Analyzer. Estoy configurando la propiedad de la configuración de hadoop del contexto de spark, “mapreduce input fileinputformat input dir recursive” como true. Esto me permitirá recuperar archivos de forma recursiva desde las carpetas. Estoy creando una variable, ‘inputPath’ de la clase String en la que estoy configurando el argumento de entrada así como ‘/*/*’, lo que me permitirá leer los datos particionados almacenados por Flume. Estoy leyendo los datos json de Flume en Dataset<Row> ‘data’.
Luego, estoy registrando una UDF (Función Definida por el Usuario) con Spark SQL Context, llamada ‘Sentiment’, que toma un String y aplica el método GetSentiment de StanfordSentiment sobre él y devuelve un tipo de datos de valor Double.
Actualmente, tengo datos de las palabras clave Apple, Google, Tesla, Infosys, TCS, Oracle, Microsoft y Facebook de flume. Por lo tanto, estoy creando una lista de String con estas palabras clave. Para cada una de estas empresas, estoy ejecutando las siguientes operaciones.
Flujo de operaciones:
Primero, estoy creando un outPath donde quiero guardar los resultados. Estoy creando una vista temporal, ‘complete’ sobre el conjunto de datos, ‘data’. A continuación, estoy extrayendo timestamp, partitionBy (para particionar los datos al almacenar los resultados), text, main_text (para usar en expresiones regulares), followers de los datos.
Estoy creando una vista temporal sobre los resultados y filtrando los datos de una empresa en particular a partir de ahí. Además, estoy aplicando la UDF Sentiment, que me devuelve los valores de sentimiento en la columna ‘seVal’. Estoy persistiendo los datos serializados en memoria y como desbordamiento en disco. A partir de estos datos, obtengo NetSentiment, el producto del Número de Seguidores y el Valor de Sentimiento de ese tweet. Esto ayuda a conocer la influencia que puede tener ese tweet. Puedo tener diferentes fórmulas para lo mismo.
Estamos persistiendo los datos serializados en memoria y disco ya que queremos que se almacene todo el resultado, dado que el análisis de sentimiento es una tarea computacionalmente pesada. Si no persistimos esto y planeamos usar múltiples fórmulas para calcular el NetSentiment o la influencia, en la consulta anterior donde estaríamos usando el valor del método Sentiment múltiples veces, estaría realizando el Análisis de Sentimiento del tweet múltiples veces.
Agrupación de datos:
Ahora, estoy agrupando los datos por la columna timestamp y partitionBy y promediando el NetSentiment con esta agrupación. Esto me da la influencia promedio de la empresa, positiva o negativa en un minuto en particular. En consecuencia, estoy escribiendo los resultados para cada empresa en outPath particionándolo por la columna partitionBy.
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 |
package com.CloudSigma.Spark.TwitterSentiment; import java.util.Arrays; import java.util.List; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.storage.StorageLevel; import static com.CloudSigma.Spark.TwitterSentiment.StanfordSentiment.*; public class TwitterDataFlow { public static void main(String[] args) { if (args.length != 2) { System.out.println("¡Número incorrecto de argumentos! \n"); System.out.println( "Uso: Entrada Salida \n " + "Entrada: Ubicación desde donde se deben leer los datos particionados" + "Salida: Donde se debe almacenar el resultado final"); System.exit(1); } SparkSession spark = SparkSession.builder().appName("Sentiment Analyzer").getOrCreate(); spark.sparkContext().hadoopConfiguration().set("mapreduce.input.fileinputformat.input.dir.recursive", "true"); String inputPath = args[0] + "/*/*"; Dataset data = spark.read().json(inputPath); spark.sqlContext().udf().register("Sentiment", (String s) -> GetSentiment(s), DataTypes.DoubleType); List list = Arrays.asList("apple", "google", "tesla", "infosys", "tcs", "oracle", "microsoft", "facebook"); String query; String outPath; Dataset result; for (String company : list) { outPath = args[1] + "/" + company; data.createOrReplaceTempView("complete"); // tmp1 extrae TimeStamp, partitionBy (Fecha), texto del tweet, texto del tweet en minúsculas // y followers_count del usuario que tuitea Dataset tmp1 = spark.sql( "select concat(substr(created_at,5,6), substr(created_at,26,5),' ',substr(created_at,12,6),'00') as timestamp,substr(created_at,5,6) as partitionBy,text,lower(text) as main_text,user.followers_count as followers from complete"); tmp1.createOrReplaceTempView("tmp"); // Filtrando tweets que contienen ciertos nombres de empresas Dataset tmp2 = spark.sql("select * from tmp where main_text regexp '(" + company + ")'"); // Crea una vista llamada twitter tmp2.createOrReplaceTempView("twitter"); // tmp3 contiene todos los datos seleccionados junto con el valor de Sentiment de los // tweets Dataset tmp3 = spark.sql("select *, Sentiment(text) as seVal from twitter"); tmp3.persist(StorageLevel.MEMORY_AND_DISK()); // Creando otra vista tmp3.createOrReplaceTempView("dataSe"); Dataset net = spark.sql( "select *,followers*seVal as NetSentiment from dataSe"); // Creando una vista final para guardar los datos net.createOrReplaceTempView("final"); // Promediando los Valores de Sentimiento por minuto agrupando los datos por este query = "select timestamp,partitionBy,AVG(NetSentiment) from final group by timestamp,partitionBy"; // Guardando el resultado en el dataset result result = spark.sql(query); // Escribiendo el resultado en el disco result.write().partitionBy("partitionBy").json(outPath); } } } |
Además, después de completar el código, exporto un archivo jar ejecutable con todas las dependencias en él y lo copio al servidor, donde quiero ejecutar esta tarea. A continuación, puedo enviarlo usando el siguiente comando,
|
1 |
spark-submit --master yarn --deploy-mode cluster SentimentAnalyzer.jar /flume/Twitter/PublicStream/ /flume/output |
donde (Origen):
--master: La URL del master para el clúster (por ejemplo,spark://23.195.26.187:7077)
--deploy-mode: Si se debe desplegar su driver en los nodos de trabajo (cluster) o localmente como un cliente externo (client) (por defecto:client)application-jar: Ruta a un jar empaquetado que incluye su aplicación y todas las dependencias. Tenga en cuenta que la URL debe ser visible globalmente dentro de su clúster, por ejemplo, una rutahdfs://o una rutafile://que esté presente en todos los nodos.application-arguments: Argumentos pasados al método principal de su clase principal, si los hay. Aquí, la ruta de entrada y salida
Pasos finales:
Luego, obtendremos los resultados del análisis de sentimiento usando Spark desde la ruta de salida. Por ejemplo, este es un posible resultado de apple:
{“timestamp”:”Apr 30 2018 20:31:00″,”avg(NetSentiment)”:-3678.768518518518}
{“timestamp”:”Apr 30 2018 20:32:00″,”avg(NetSentiment)”:-883.002824858757}
Desplegué esta aplicación en CloudSigma con un clúster HDP de 5 nodos. Específicamente, cada nodo tiene la siguiente configuración:
256 GB SSD
16 GB RAM
20 GHz CPU
En resumen, pude obtener resultados del análisis de sentimiento usando Spark en aproximadamente 19 horas. Además, incluí cálculos más avanzados que el programa sobre un conjunto de datos de más de 80 GB.
El código se puede encontrar en GITHUB.
Comentarios
Aún no hay comentarios. Sea el primero.