Realtime Twitter Data Ingestion using Flume

With more than 330 million active users, Twitter is one of the top platforms where people like to share their thoughts. Twitter data can be used for a variety of purposes such as research, consumer insights, demographic analysis, and more. These insights are especially useful for businesses, as they allow the analysis of large amounts of online data that would otherwise be nearly impossible to investigate.

In a previous blog post, we learned how to get a sample of tweets with the Twitter API using Python. With that program, we were able to collect only around 200 tweets per keyword. With a big data tool like Apache Flume, we can extract real-time tweets. With a standard account, we can extract around 1% of the full Twitter stream; for the entire stream, we would need an Enterprise account to access Twitter’s PowerTrack API. The PowerTrack API provides customers with the ability to filter the full Twitter firehose and receive only the data that they or their customers are interested in.

Introduction to Flume

We are using Flume to access the real-time streaming data.

According to Apache.org, “Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data. It has a simple and flexible architecture based on streaming data flows. It is robust and fault tolerant with tunable reliability mechanisms and many failover and recovery mechanisms. It uses a simple extensible data model that allows for an online analytic application.”

Flume has three components – Source, Channel, and Sink.

The source is where the data is received from. In our case, the source is Twitter.

The channel is the mediator where the data is stored until it’s consumed by the sink.

The sink is where the data is finally stored or consumed, for example, HDFS.

Accessing Twitter Source

As our source is Twitter, we need a way to access the real-time data from Twitter. Cloudera has released their own implementation for real-time data extraction, which can be found here.

When I used their implementation, I faced the following problems:

  1. Keyword Search wasn’t exactly working. I got many tweets which did not have the keywords I specified.
  2. I couldn’t filter by language. Storing all the data and then creating, say, a Spark job just to retrieve English-language tweets would be an additional overhead.

When I explored their implementation, I found out that they are using the Twitter4j library (v3.0.5). Updating to a newer version (v4.0.6) gives us access to a variety of filters:

  • Keywords Filter
    Specifies the keywords to filter the data by. The tweets are filtered against the specified set of keywords.
  • Language Filter
    Specifies the languages to filter the data by. In our case, we only want English, so we specify ‘en’. More language codes can be found in the ISO 639-1 list at this link.
  • Follow Filter
    Specifies the Twitter IDs whose tweets we want.
  • Locations Filter
    Specifies four coordinates that form a bounding box (the southwest and northeast corners). Tweets geotagged inside that box are retrieved.
  • Count of Previous Tweets
    Retrieves the specified number of past tweets before switching to the live stream. This feature is available only with the premium version.

Configuring the Source Jar

First, I am changing the dependency of Twitter4j from version 3.0.5 to 4.0.6. For that, in the pom.xml file, I am replacing the following code:

Old Dependency:
<dependency>
    <groupId>org.twitter4j</groupId>
    <artifactId>twitter4j-stream</artifactId>
    <version>3.0.5</version>
</dependency>

New Dependency:

<dependency>
    <groupId>org.twitter4j</groupId>
    <artifactId>twitter4j-stream</artifactId>
    <version>4.0.6</version>
    <scope>compile</scope>
</dependency>

In TwitterSourceConstants.java, I have defined the following new Strings. These act as the keys for reading the new filter values from the Flume configuration file.
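As a sketch, assuming key names that match the configuration keys we use later in TwitterStream.properties (the exact names in the linked implementation may differ):

public class TwitterSourceConstants {
    // Existing keys (consumerKey, consumerSecret, accessToken, accessTokenSecret,
    // keywords, batchSize) stay as they are. The new keys below are illustrative;
    // they just have to match the property names in the Flume configuration file.
    public static final String LANGUAGES_KEY = "language";
    public static final String FOLLOW_KEY = "follow";
    public static final String LOCATIONS_KEY = "locations";
    public static final String COUNT_KEY = "count";
}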

Now, moving on to TwitterSource.java, I am defining the following new variables.

The values for these variables are read from the Flume configuration file through Context, using the keys defined in TwitterSourceConstants.java.
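A rough sketch of what this could look like inside TwitterSource.configure(Context); the variable names are my own, for illustration:

// New fields in TwitterSource.java (illustrative names).
private String[] languages;
private long[] followIds;
private double[][] locations;
private int countPrevTweets;

// Inside configure(Context context): the raw values arrive as Strings from
// TwitterStream.properties, keyed by the constants defined above.
String languageString = context.getString(TwitterSourceConstants.LANGUAGES_KEY, "");
String followString = context.getString(TwitterSourceConstants.FOLLOW_KEY, "");
String locationString = context.getString(TwitterSourceConstants.LOCATIONS_KEY, "");
String countString = context.getString(TwitterSourceConstants.COUNT_KEY, "0");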

Now, we will be parsing the values that we have got in the form of Strings and converting them into the desired types.

For language, we are converting it into an array of String.

For follow_ids, we are converting it into an array of long.

For locations, we are converting it into a 2D array.

For count_prev_tweets, we are converting it into an int value.
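Put together, the parsing could look roughly like this, assuming comma-separated values in the configuration file (a sketch rather than the exact code from the repository):

// Languages: a comma-separated list such as "en,es" becomes a String[].
languages = languageString.trim().split("\\s*,\\s*");

// Follow IDs: comma-separated Twitter user IDs become a long[].
String[] followParts = followString.trim().split("\\s*,\\s*");
followIds = new long[followParts.length];
for (int i = 0; i < followParts.length; i++) {
    followIds[i] = Long.parseLong(followParts[i]);
}

// Locations: comma-separated "lon,lat,lon,lat" values become a double[][],
// where each row is one {longitude, latitude} corner of the bounding box.
String[] coords = locationString.trim().split("\\s*,\\s*");
locations = new double[coords.length / 2][2];
for (int i = 0; i < locations.length; i++) {
    locations[i][0] = Double.parseDouble(coords[2 * i]);     // longitude
    locations[i][1] = Double.parseDouble(coords[2 * i + 1]); // latitude
}

// Count of previous tweets becomes an int.
countPrevTweets = Integer.parseInt(countString.trim());

// In practice, each block should be skipped when the corresponding property is empty.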

Now, along with the existing keyword filter, I am adding the filters we just defined (a combined sketch is shown after this list):

  • the filter for previous tweets
  • the filter for languages
  • the filter for Twitter IDs
  • the filter for geo-locations
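Wiring these into Twitter4j’s FilterQuery could look like this, assuming the keywords and twitterStream fields from the original source and the values parsed above:

import twitter4j.FilterQuery;

// Build the streaming filter from the parsed configuration values.
FilterQuery query = new FilterQuery();
query.track(keywords);        // keyword filter, already present in the original source
query.count(countPrevTweets); // number of previous tweets (premium-only feature)
query.language(languages);    // e.g. { "en" }
query.follow(followIds);      // Twitter IDs to get tweets from
query.locations(locations);   // bounding box corners: {swLon, swLat}, {neLon, neLat}
twitterStream.filter(query);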

We can run mvn clean install to get the output jar in the target folder.
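For example, from the root of the flume-sources module (the module and jar names here follow the Cloudera example project and may differ in your checkout):

cd flume-sources
mvn clean install
# The rebuilt jar appears under target/, e.g. target/flume-sources-1.0-SNAPSHOT.jar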

Configuring Flume

First of all, remove the old twitter4j jars from $FLUME_HOME/lib and add the newer versions, which can be downloaded from the Maven repositories. Delete the flume-twitter-source-1.8.0 jar from $FLUME_HOME/lib and copy your output jar to this folder.
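Something along these lines (the exact jar names and versions depend on your Flume distribution, so treat this as a sketch):

cd $FLUME_HOME/lib
rm twitter4j-*.jar flume-twitter-source-1.8.0.jar
cp /path/to/twitter4j-core-4.0.6.jar /path/to/twitter4j-stream-4.0.6.jar .
cp /path/to/your-output-jar.jar .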

We are going to create a file, TwitterStream.properties, which we place in $FLUME_HOME. The content of the file would be along the following lines.
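Here is a sketch of the file, matching the description that follows. The credential values are placeholders, the source type assumes the Cloudera package name for the custom source, and the sink and channel values shown are illustrative rather than the exact ones I used:

# TwitterStream.properties (a sketch; credentials are placeholders)
TwitterAgent.sources = Twitter
TwitterAgent.channels = MemCh
TwitterAgent.sinks = HDFS

# Source
TwitterAgent.sources.Twitter.type = com.cloudera.flume.source.TwitterSource
TwitterAgent.sources.Twitter.channels = MemCh
TwitterAgent.sources.Twitter.consumerKey = <your consumer key>
TwitterAgent.sources.Twitter.consumerSecret = <your consumer secret>
TwitterAgent.sources.Twitter.accessToken = <your access token>
TwitterAgent.sources.Twitter.accessTokenSecret = <your access token secret>
TwitterAgent.sources.Twitter.keywords = Big Data, Data, Hadoop
TwitterAgent.sources.Twitter.language = en
# Optional filters; uncomment and fill in if needed
# TwitterAgent.sources.Twitter.follow = <comma-separated Twitter IDs>
# TwitterAgent.sources.Twitter.locations = <swLon,swLat,neLon,neLat>
# TwitterAgent.sources.Twitter.count = <number of previous tweets; premium only>

# Sink
TwitterAgent.sinks.HDFS.channel = MemCh
TwitterAgent.sinks.HDFS.type = hdfs
# Time escapes like %Y/%m/%d/%H need a timestamp header from the source
# or hdfs.useLocalTimeStamp = true
TwitterAgent.sinks.HDFS.hdfs.path = flume/Twitter/%Y/%m/%d/%H
TwitterAgent.sinks.HDFS.hdfs.fileType = DataStream
TwitterAgent.sinks.HDFS.hdfs.writeFormat = Text
TwitterAgent.sinks.HDFS.hdfs.rollInterval = 30
TwitterAgent.sinks.HDFS.hdfs.rollSize = 1024
TwitterAgent.sinks.HDFS.hdfs.rollCount = 10
TwitterAgent.sinks.HDFS.hdfs.batchSize = 100

# Channel
TwitterAgent.channels.MemCh.type = memory
TwitterAgent.channels.MemCh.capacity = 100
TwitterAgent.channels.MemCh.transactionCapacity = 100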

I am defining the agent’s source, channel, and sink here. I am naming the agent TwitterAgent. As mentioned above, there are three components of the agent – Source, Channel, and Sink. We are naming our source Twitter, channel MemCh and sink HDFS.

Defining the source properties for TwitterAgent. The type of the Twitter source is the fully qualified class name of our custom source. Its channel is MemCh, which binds the source Twitter to the channel MemCh. The Consumer Key, Consumer Secret, Access Token and Access Token Secret can be obtained by creating an app on Twitter. keywords is where we put the list of keywords we want in the filtered tweets’ text, for example, Big Data, Data, Hadoop. We can use locations to filter tweets by geolocation and language to filter tweets by language (‘en’ for English). follow is for the list of Twitter IDs you want the tweets from, and count is the number of previous tweets you’d want to retrieve before starting with the real-time data. The count feature is only available for enterprise accounts on Twitter.

Defining the sink properties for HDFS. The channel that we have set for the HDFS sink is MemCh; this connects our source to the sink through the channel. The type of the sink is hdfs. The path is flume/Twitter/ on my HDFS. Inside that folder, the data is partitioned into multiple subfolders based on when it was acquired. I am listing down the definitions of the other properties from Flume’s documentation here, with their default values:

  • hdfs.rollInterval (default 30): Number of seconds to wait before rolling the current file (0 = never roll based on time interval)
  • hdfs.rollSize (default 1024): File size to trigger a roll, in bytes (0 = never roll based on file size)
  • hdfs.rollCount (default 10): Number of events written to the file before it is rolled (0 = never roll based on number of events)
  • hdfs.batchSize (default 100): Number of events written to the file before it is flushed to HDFS
  • hdfs.fileType (default SequenceFile): File format; currently SequenceFile, DataStream or CompressedStream. DataStream will not compress the output file, so don’t set codeC; CompressedStream requires setting hdfs.codeC to an available codec
  • hdfs.writeFormat (default Writable): Format for sequence file records. One of Text or Writable. Set to Text before creating data files with Flume; otherwise, those files cannot be read by either Apache Impala (incubating) or Apache Hive

Defining the channel properties for the memory channel. The type of MemCh is memory. I am listing down the definitions of the properties from Flume’s documentation here, with their default values:

  • capacity (default 100): The maximum number of events stored in the channel
  • transactionCapacity (default 100): The maximum number of events the channel will take from a source or give to a sink per transaction

I found the above recommended values here.

We can execute the following command in the $FLUME_HOME directory to start the ingestion. On another note, you can add the bin directory of $FLUME_HOME to your PATH to access the commands from anywhere. Following is the description of the parameters of this command.
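The standard flume-ng invocation looks like this (assuming TwitterStream.properties sits in $FLUME_HOME, as set up above):

bin/flume-ng agent --conf conf/ --conf-file TwitterStream.properties --name TwitterAgent -Dflume.root.logger=INFO,console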

  • conf-file : The flume configuration file where we have configured the source, channel, sink and the related properties.
  • name : Name of the Agent. In this case, TwitterAgent
  • conf : Configuration directory of Flume. Generally, $FLUME_HOME/conf
  • Dflume.root.logger=INFO,console : Writes the logs to the console

This would run a Flume agent that takes real-time tweet data from Twitter and stores it in the HDFS sink through the memory channel. It allows us to collect real-time data for the specified keywords and find meaningful trends and insights from the ongoing conversations.

The modified implementation can be found here. The configuration file and the jar can be found in the ‘output’ folder on the same link.
