В нашем предыдущем посте, я разработал способ извлечения данных Twitter в реальном времени с помощью Apache Flume. На данный момент я получил много данных из Twitter. Поэтому я хотел бы проанализировать их и найти в них некоторые тенденции. Чтобы выполнить анализ тональности данных Twitter, я собираюсь использовать еще один инструмент Big Data, Apache Spark.
Согласно Hortonworks, “Apache Spark — это быстрый механизм обработки данных в оперативной памяти с элегантными и выразительными API для разработки, позволяющий специалистам по данным эффективно выполнять потоковую передачу, машинное обучение или SQL-нагрузки, требующие быстрого итеративного доступа к наборам данных. Благодаря запуску Spark на Apache Hadoop YARN разработчики во всем мире теперь могут создавать приложения, использующие возможности Spark, получать аналитическую информацию и обогащать свои рабочие нагрузки в области анализа данных в рамках единого общего набора данных в Hadoop.”
Программу на Spark можно написать на JAVA, Scala, Python или R. В данном случае мы будем использовать JAVA вместе с Maven. Кроме того, Spark поставляется как в дистрибутиве HDP, так и в Cloudera. В настоящее время используется версия Spark 2.
Чтобы выполнить анализ тональности с помощью Spark, я создаю новый проект Maven. Я называю его Twitter Sentiment Analyzer’. Далее я создаю класс “TwitterDataFlow.java”, в котором реализую все необходимые методы.
Language Tool: Исправление орфографии
Изначально в рамках POC я обнаружил, что если в твитах есть орфографические ошибки, это негативно сказывается на результатах анализа тональности. Поэтому я внедряю SpellChecker. Он поможет нам исправить орфографию в твитах перед их использованием для анализа тональности.
Я собираюсь сделать это, создав новый статический метод с именем ‘CorrectSpell’. Кроме того, я буду использовать LanguageTool для проверки орфографии и ее исправления.
Согласно LanguageTool’s GIT, “LanguageTool — это программное обеспечение с открытым исходным кодом для проверки правописания на английском, французском, немецком, польском, русском и более чем 20 других языках. Оно находит множество ошибок, которые не может обнаружить простой инструмент проверки орфографии.”
Далее я добавляю зависимость для языкового инструмента в pom.xml:
<dependency>
<groupId>org.languagetool</groupId>
<artifactId>language-en</artifactId>
<version>4.0</version>
</dependency>
После этого я определяю статическую переменную уровня класса langTool класса JLanguageTool. Затем я инициализирую langTool объектом класса AmericanEnglish.
Далее я пишу код метода с именем SpellChecker, где входным параметром является String text (обычный текст), а возвращаемым типом также является String (текст с исправленной орфографией). Я использую метод check класса JLanguageTool с параметром в виде непроверенного текста. Далее он возвращает список RuleMatch. Согласно Java-документации JLanguageTool, класс RuleMatch предоставляет “информацию о правиле ошибки, соответствующем тексту, и позиции совпадения.”
После этого я определяю три переменные: ‘result’ типа String, ‘lastPos’ типа integer, ‘tmp’ типа String. Кроме того, для каждого RuleMatch я воссоздаю предложение с первым предложенным инструментом вариантом написания. Вместе с этим я добавил необходимые блоки try-catch везде, где это требуется.
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 |
package com.CloudSigma.Spark.TwitterSentiment; import java.util.List; import org.languagetool.JLanguageTool; import org.languagetool.language.AmericanEnglish; import org.languagetool.rules.RuleMatch; public class LanguageCheck { static JLanguageTool langTool = new JLanguageTool(new AmericanEnglish()); public static String CorrectSpell(String text) { String query = text; try { List matches = langTool.check(query); String result = ""; int lastPos = 0; String tmp = ""; for (RuleMatch ma : matches) { try { tmp = ma.getSuggestedReplacements().get(0); result += query.substring(lastPos, ma.getFromPos()); result += tmp; lastPos = ma.getToPos(); } catch (Exception e) { return text; } } if (lastPos < query.length()) { result += query.substring(lastPos, query.length()); } return result; } catch (Exception e) { return text; } } } |
Анализатор тональности: Stanford CoreNLP
Следующим шагом в анализе тональности с помощью Spark является определение тональности текста. Для этого я использую библиотеку Stanford’s Core NLP для поиска значений тональности. Затем я создаю класс с именем ‘StanfordSentiment’, в котором собираюсь реализовать библиотеку для поиска тональности в нашем тексте.
Для этого я добавляю следующие зависимости в файл pom.xml:
<!– Это библиотека Stanford Core NLP –>
<dependency>
<groupId>edu.stanford.nlp</groupId>
<artifactId>stanford-corenlp</artifactId>
<version>3.8.0</version>
</dependency>
<!– Это файл моделей Stanford Core NLP –>
<dependency>
<groupId>edu.stanford.nlp</groupId>
<artifactId>stanford-corenlp</artifactId>
<version>3.8.0</version>
<classifier>models</classifier>
</dependency>
Я создаю статическую переменную объекта ‘props’, которая определяет свойства для конвейера Stanford Core NLP. Я выбрал минимальные свойства, чтобы сделать его как можно более легким. После этого я настраиваю аннотаторы на tokenize, ssplit, pos, parse, sentiment. Я создаю еще одну статическую переменную объекта ‘pipeline’ класса StanfordCoreNLP. Наконец, я инициализирую конвейер со свойствами ‘props’.
GetSentiment:
Я создал метод GetSentiment с входным значением типа String и выходным значением типа Double. Я использую метод CorrectSpell, который я создал в файле LanguageCheck.java. Метод CorrectSpell объекта LanguageCheck возвращает мне правильное написание введенного твита. Я использую метод annotate класса StanfordCoreNLP с этим исправленным текстом. Значения тональности, возвращаемые этой библиотекой, следующие:
0 => очень негативный
1 => негативный
2 => нейтральный
3 => положительный
4 => очень положительный
Я вычитаю 2 из результата для каждого предложения, чтобы получить следующие новые категории тональности:
-2 => очень негативный
-1 => негативный
0 => нейтральный
1 => положительный
2 => очень положительный
Я возвращаю результат тональности каждого твита как среднее значение тональности каждого предложения твита. Твиты не пишутся в каком-либо структурированном формате. Следовательно, я не могу присвоить какой-то конкретной строке твита больший вес, чем другим. Поэтому я предполагаю, что каждая строка в твите имеет одинаковую важность. Я возвращаю переменную ‘total’ типа Double, которая содержит результирующее значение тональности твита.
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 |
пакет com.CloudSigma.Spark.TwitterSentiment; import java.util.List; import java.util.Properties; import edu.stanford.nlp.ling.CoreAnnotations.SentencesAnnotation; import edu.stanford.nlp.neural.rnn.RNNCoreAnnotations; import edu.stanford.nlp.pipeline.Annotation; import edu.stanford.nlp.pipeline.StanfordCoreNLP; import edu.stanford.nlp.sentiment.SentimentCoreAnnotations; import edu.stanford.nlp.trees.Tree; import edu.stanford.nlp.util.CoreMap; import static com.CloudSigma.Spark.TwitterSentiment.LanguageCheck.*; public class StanfordSentiment { static Properties props = new Properties(); static { props.setProperty("annotators", "tokenize,ssplit,pos,parse,sentiment");} static StanfordCoreNLP pipeline = new StanfordCoreNLP(props); public static Double GetSentiment(String text) { String checkedText = CorrectSpell(text); Annotation document = new Annotation(checkedText); pipeline.annotate(document); List sentences = document.get(SentencesAnnotation.class); Double sum = 0.0; for (CoreMap sentence : sentences) { Tree tree = sentence.get(SentimentCoreAnnotations.SentimentAnnotatedTree.class); int sentiment = RNNCoreAnnotations.getPredictedClass(tree); int scaled = sentiment - 2; sum = sum + scaled; } Double total = sum / sentences.size(); System.out.println("Текст твита: " + checkedText); System.out.println("Значение тональности: " + total); return total; } } |
Запуск программы:
Теперь, когда эти два класса готовы, мы перейдем к их использованию. Таким образом, я создаю новый класс “TwitterDataFlow.java”. Сначала я пишу условную проверку, которая запустит программу только в том случае, если количество переданных входных аргументов равно ровно 2. Если количество аргументов не равно 2, она выводит сообщение о некорректном использовании, а также завершает работу с кодом выхода 1.
Я создаю SparkSession с именем приложения Sentiment Analyzer. Я устанавливаю свойство конфигурации hadoop для spark context’s “mapreduce input fileinputformat input dir recursive” в значение true. Это позволит мне рекурсивно извлекать файлы из папок. Я создаю переменную ‘inputPath’ класса String, в которой задаю входной аргумент, а также ‘/*/*’, что позволит мне читать партиционированные данные, сохраненные Flume. Я читаю данные json из Flume в Dataset<Row> ‘data’.
Затем я регистрирую UDF (пользовательскую функцию) в Spark SQL Context под именем ‘Sentiment’, которая принимает String, применяет к ней метод GetSentiment класса StanfordSentiment’s и возвращает тип данных Double.
В настоящее время у меня есть данные по ключевым словам Apple, Google, Tesla, Infosys, TCS, Oracle, Microsoft и Facebook из flume. Поэтому я создаю список String с этими ключевыми словами. Для каждой из этих компаний я выполняю следующие операции.
Поток операций:
Сначала я создаю outPath, куда хочу сохранить результаты. Я создаю временное представление (temp view) ‘complete’ для набора данных ‘data’. Далее я извлекаю из данных timestamp, partitionBy (чтобы партиционировать данные при сохранении результатов), text, main_text (для использования в регулярных выражениях), followers.
Я создаю временное представление для результатов и отфильтровываю из него данные конкретной компании. Также я применяю UDF Sentiment, которая возвращает мне значения тональности в колонке ‘seVal’. Я сохраняю (persist) сериализованные данные в памяти и со сбросом на диск (disk spill). Из этих данных я получаю NetSentiment — произведение количества подписчиков (Number of Followers) и значения тональности (Sentiment Value) этого твита. Это помогает определить влияние, которое может иметь твит. Для этого я могу использовать различные формулы.
Мы сохраняем (persist) сериализованные данные в памяти и на диске, так как хотим сохранить весь результат, поскольку анализ тональности — это ресурсоемкая задача. Если мы не сохраним эти данные и планируем использовать несколько формул для расчета NetSentiment или влияния, то в предыдущем запросе, где мы бы использовали значение метода Sentiment’s несколько раз, анализ тональности твита выполнялся бы многократно.
Группировка данных:
Теперь я группирую данные по столбцам timestamp и partitionBy и нахожу среднее значение NetSentiment для этой группировки. Это дает мне среднее влияние компании, положительное или отрицательное, в конкретную минуту. Следовательно, я записываю результаты для каждой компании в outPath, разделяя их по столбцу partitionBy.
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 |
package com.CloudSigma.Spark.TwitterSentiment; import java.util.Arrays; import java.util.List; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.storage.StorageLevel; import static com.CloudSigma.Spark.TwitterSentiment.StanfordSentiment.*; public class TwitterDataFlow { public static void main(String[] args) { if (args.length != 2) { System.out.println("Неверное количество аргументов! \n"); System.out.println( "Использование: Вход Выход \n " + "Вход: Путь, откуда должны быть считаны секционированные данные" + "Выход: Путь, где должен быть сохранен конечный результат"); System.exit(1); } SparkSession spark = SparkSession.builder().appName("Sentiment Analyzer").getOrCreate(); spark.sparkContext().hadoopConfiguration().set("mapreduce.input.fileinputformat.input.dir.recursive", "true"); String inputPath = args[0] + "/*/*"; Dataset data = spark.read().json(inputPath); spark.sqlContext().udf().register("Sentiment", (String s) -> GetSentiment(s), DataTypes.DoubleType); List list = Arrays.asList("apple", "google", "tesla", "infosys", "tcs", "oracle", "microsoft", "facebook"); String query; String outPath; Dataset result; for (String company : list) { outPath = args[1] + "/" + company; data.createOrReplaceTempView("complete"); // tmp1 извлекает TimeStamp, partitionBy (Date), текст твита, текст твита в нижнем // регистре и followers_count пользователя, отправившего твит Dataset tmp1 = spark.sql( "select concat(substr(created_at,5,6), substr(created_at,26,5),' ',substr(created_at,12,6),'00') as timestamp,substr(created_at,5,6) as partitionBy,text,lower(text) as main_text,user.followers_count as followers from complete"); tmp1.createOrReplaceTempView("tmp"); // Фильтрация твитов, содержащих определенные названия компаний Dataset tmp2 = spark.sql("select * from tmp where main_text regexp '(" + company + ")'"); // Создает представление с именем twitter tmp2.createOrReplaceTempView("twitter"); // tmp3 содержит все выбранные данные вместе со значением Sentiment для // твитов Dataset tmp3 = spark.sql("select *, Sentiment(text) as seVal from twitter"); tmp3.persist(StorageLevel.MEMORY_AND_DISK()); // Создание еще одного представления tmp3.createOrReplaceTempView("dataSe"); Dataset net = spark.sql( "select *,followers*seVal as NetSentiment from dataSe"); // Создание финального представления для сохранения данных net.createOrReplaceTempView("final"); // Усреднение значений тональности (Sentiment) за минуту путем группировки данных по ней query = "select timestamp,partitionBy,AVG(NetSentiment) from final group by timestamp,partitionBy"; // Сохранение результата в датасет result result = spark.sql(query); // Запись результата на диск result.write().partitionBy("partitionBy").json(outPath); } } } |
Также после завершения написания кода я экспортирую исполняемый jar-файл со всеми зависимостями и копирую его на сервер, где хочу запустить эту задачу. Затем я могу отправить ее с помощью следующей команды:
|
1 |
spark-submit --master yarn --deploy-mode cluster SentimentAnalyzer.jar /flume/Twitter/PublicStream/ /flume/output |
где (Источник):
--master: главный URL-адрес для кластера (например,spark://23.195.26.187:7077)
--deploy-mode: развертывать ли драйвер на рабочих узлах (cluster) или локально в качестве внешнего клиента (client) (по умолчанию:client)application-jar: путь к собранному jar-файлу, содержащему ваше приложение и все зависимости. Учтите, что URL-адрес должен быть глобально виден внутри вашего кластера, например, путьhdfs://илиfile://путь, который присутствует на всех узлах.application-arguments: аргументы, передаваемые в метод main вашего основного класса, если таковые имеются. В данном случае — пути ввода и вывода
Финальные шаги:
Затем мы получим результаты анализа тональности с помощью Spark из выходного пути. Например, это возможный результат для apple:
{“timestamp”:”Apr 30 2018 20:31:00″,”avg(NetSentiment)”:-3678.768518518518}
{“timestamp”:”Apr 30 2018 20:32:00″,”avg(NetSentiment)”:-883.002824858757}
Я развернул это приложение на CloudSigma с 5-узловым кластером HDP. В частности, каждый узел имел следующую конфигурацию:
256 ГБ SSD
16 ГБ ОЗУ
20 ГГц ЦП
В общей сложности мне удалось получить результаты анализа тональности с помощью Spark примерно за 19 часов. Кроме того, я включил более сложные вычисления, чем в программе, для набора данных объемом более 80 ГБ.
Код можно найти на GITHUB.
Комментарии
Комментариев пока нет. Будьте первым.