Monday, January 19, 2015

Configure Flume to Collect Twitter Streams Using Cloudera Manager


Below are the steps I followed to collect Twitter streams into HDFS with Flume:


1. Before you start, make sure you have generated the following four items for the Twitter Streaming API. For more information, see the Twitter Streaming API documentation.
consumerKey
consumerSecret
accessToken
accessTokenSecret

2. From Cloudera Manager, stop the currently running Flume service.

3. Create a dedicated Flume agent role group named "AgentTweets" under the "Flume" service:
- Cloudera manager-> flume-> configuration-> Role Groups > "Create new group..." > enter the name "AgentTweets"

4. Assign a host to the AgentTweets role group:
- Cloudera manager->flume->configuration-> Role Groups > select "Agent Default Group" > check "agent (sthdmgt1-pvt)" > "Action for Selected" > Move to Different Role Group .. > AgentTweets > Move

5. Check the Flume plugin directories:
Cloudera manager->flume->configuration->AgentTweets > "Plugin directories"

In our case the plugin directories are:
/usr/lib/flume-ng/plugins.d
/var/lib/flume-ng/plugins.d

*** Note: As we are using host "sthdmgt1-pvt" for the AgentTweets agent role, these and the rest of the changes are made on host "sthdmgt1-pvt" only.

6. Create the plugin directories, as they do not exist yet:
mkdir -p /usr/lib/flume-ng/plugins.d
mkdir -p /var/lib/flume-ng/plugins.d

-- Also create the location for the Twitter plugin:
mkdir -p /usr/lib/flume-ng/plugins.d/twitter-streaming/lib/
mkdir -p /var/lib/flume-ng/plugins.d/twitter-streaming/lib/

chown -R flume:flume /usr/lib/flume-ng/
chown -R flume:flume /var/lib/flume-ng/

7. Download the custom Flume source and copy it to both Flume plugin locations:
cd /tmp 
wget "http://files.cloudera.com/samples/flume-sources-1.0-SNAPSHOT.jar"
cp flume-sources-1.0-SNAPSHOT.jar /usr/lib/flume-ng/plugins.d/twitter-streaming/lib/
cp flume-sources-1.0-SNAPSHOT.jar /var/lib/flume-ng/plugins.d/twitter-streaming/lib/
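Steps 6 and 7 repeat the same mkdir/cp pair for each plugin root. As a rough sketch, the same work can be done in one loop. The function name `install_plugin_jars` and the `PLUGIN_ROOTS` variable are my own and not part of Flume or Cloudera Manager; the roots default to the two directories from step 5. Unlike the commands above, it only changes ownership of the plugins.d directories, and only if a local `flume` user exists.

```shell
#!/usr/bin/env bash
# Hypothetical helper (not part of Flume or Cloudera Manager): copy jars into
# <root>/twitter-streaming/lib/ for every Flume plugin root.
# Roots default to the two "Plugin directories" from step 5.
PLUGIN_ROOTS=${PLUGIN_ROOTS:-"/usr/lib/flume-ng/plugins.d /var/lib/flume-ng/plugins.d"}

install_plugin_jars() {
  local root jar dest
  if [ "$#" -eq 0 ]; then
    echo "usage: install_plugin_jars JAR..." >&2
    return 2
  fi
  # Roots are split on whitespace, so the paths must not contain spaces.
  for root in $PLUGIN_ROOTS; do
    dest="$root/twitter-streaming/lib"
    mkdir -p "$dest" || return 1
    for jar in "$@"; do
      cp "$jar" "$dest/" || return 1
    done
    # Hand the plugin tree to the flume user, but only if that user exists
    # on this host (it should on a Flume agent host).
    if id flume >/dev/null 2>&1; then
      chown -R flume:flume "$root" || return 1
    fi
  done
}
```

For example, `cd /tmp && install_plugin_jars flume-sources-1.0-SNAPSHOT.jar`. The same call works for the twitter4j 2.2.6 jars in step 8.3.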

8. If the twitter4j-* jars are version 3 or above, you may see the following error in the Flume agent log:

Unable to start EventDrivenSourceRunner: { source:com.cloudera.flume.source.TwitterSource{name:Twitter,state:IDLE} } - Exception follows.
java.lang.NoSuchMethodError: twitter4j.FilterQuery.setIncludeEntities(Z)Ltwitter4j/FilterQuery;
at com.cloudera.flume.source.TwitterSource.start(TwitterSource.java:139)

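The cause is a version mismatch: flume-sources-1.0-SNAPSHOT.jar was built against twitter4j 2.x and calls FilterQuery.setIncludeEntities(), which does not exist in the twitter4j 3.0.3 jars shipped in the CDH Flume lib directory. To resolve the error, follow the steps below: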

8.1 Go to the default Cloudera location for the Flume libraries and take a backup of all the twitter4j-*-3.0.3.jar files before removing them:

[root@sthdmgt1-pvt ~]# ll /opt/cloudera/parcels/CDH/lib/flume-ng/lib/|grep twitter4j
lrwxrwxrwx 1 root root 38 Nov 18 11:09 twitter4j-core-3.0.3.jar -> ../../../jars/twitter4j-core-3.0.3.jar
lrwxrwxrwx 1 root root 47 Nov 18 11:09 twitter4j-media-support-3.0.3.jar -> ../../../jars/twitter4j-media-support-3.0.3.jar
lrwxrwxrwx 1 root root 40 Nov 18 11:09 twitter4j-stream-3.0.3.jar -> ../../../jars/twitter4j-stream-3.0.3.jar
[root@sthdmgt1-pvt ~]#

*** Note: Make sure to record the links above for future reference.
8.2 In this case the jars are only symlinks, so just remove the links:
# rm -rf /opt/cloudera/parcels/CDH/lib/flume-ng/lib/twitter4j-*-3.0.3.jar
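Steps 8.1 and 8.2 can be combined so that the backup is a restore script rather than a note. The sketch below is only an illustration: `backup_and_remove_twitter4j3` is a hypothetical function, and `FLUME_LIB` defaults to the CDH parcel path from step 8.1. It records each symlink's target with `readlink` before deleting it, so running the generated script puts the 3.0.3 links back. Unlike the `rm -rf` above, it leaves regular (non-symlink) files in place.

```shell
#!/usr/bin/env bash
# Hypothetical helper: save the twitter4j 3.0.3 symlinks as a restore script,
# then delete them. FLUME_LIB defaults to the CDH parcel path from step 8.1.
FLUME_LIB=${FLUME_LIB:-/opt/cloudera/parcels/CDH/lib/flume-ng/lib}

backup_and_remove_twitter4j3() {
  local restore=$1 link
  if [ -z "$restore" ]; then
    echo "usage: backup_and_remove_twitter4j3 RESTORE_SCRIPT" >&2
    return 2
  fi
  printf '#!/bin/bash\n' > "$restore" || return 1
  for link in "$FLUME_LIB"/twitter4j-*-3.0.3.jar; do
    # Skip the literal pattern (no match) and anything that is not a symlink.
    [ -L "$link" ] || continue
    # Record "ln -s <original target> <link path>", shell-quoted with %q.
    printf 'ln -s %q %q\n' "$(readlink "$link")" "$link" >> "$restore" || return 1
    rm -f "$link" || return 1
  done
  chmod +x "$restore"
}
```

For example, `backup_and_remove_twitter4j3 /root/twitter4j-3.0.3-links.sh` (the restore script path is arbitrary); running that script later recreates the links.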

8.3 Download an older version of twitter4j and copy it to both Flume plugin locations. We chose 2.2.6, the latest release before 3.x:
# cd /tmp

# wget http://twitter4j.org/maven2/org/twitter4j/twitter4j-stream/2.2.6/twitter4j-stream-2.2.6.jar
# wget http://twitter4j.org/maven2/org/twitter4j/twitter4j-core/2.2.6/twitter4j-core-2.2.6.jar
# wget http://twitter4j.org/maven2/org/twitter4j/twitter4j-media-support/2.2.6/twitter4j-media-support-2.2.6.jar

# cp twitter4j-*.jar /var/lib/flume-ng/plugins.d/twitter-streaming/lib/
# cp twitter4j-*.jar /usr/lib/flume-ng/plugins.d/twitter-streaming/lib/

# chown -R flume:flume /usr/lib/flume-ng/
# chown -R flume:flume /var/lib/flume-ng/

Ref: http://stackoverflow.com/questions/19189979/cannot-run-flume-because-of-jar-conflict
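Before restarting the agent, a quick check can confirm that the jars are where step 8 intends. In this sketch, `check_twitter4j_jars` is hypothetical and the paths from steps 5 and 8.1 are used as defaults. It fails if any twitter4j 3.x jar remains in the CDH Flume lib directory or if one of the three 2.2.6 jars is missing from a plugin lib/ directory. It only looks at file names, not at what is inside the jars.

```shell
#!/usr/bin/env bash
# Hypothetical check: fail if a twitter4j 3.x jar is still in the CDH Flume lib
# directory, or if a twitter4j 2.2.6 jar is missing from a plugin lib/ dir.
# Only file names are checked, not jar contents.
FLUME_LIB=${FLUME_LIB:-/opt/cloudera/parcels/CDH/lib/flume-ng/lib}
PLUGIN_ROOTS=${PLUGIN_ROOTS:-"/usr/lib/flume-ng/plugins.d /var/lib/flume-ng/plugins.d"}

check_twitter4j_jars() {
  local status=0 root name
  # ls succeeds only if the glob matched at least one entry (links included).
  if ls "$FLUME_LIB"/twitter4j-*-3.*.jar >/dev/null 2>&1; then
    echo "FAIL: twitter4j 3.x jar(s) still present in $FLUME_LIB" >&2
    status=1
  fi
  for root in $PLUGIN_ROOTS; do
    for name in core stream media-support; do
      if [ ! -f "$root/twitter-streaming/lib/twitter4j-$name-2.2.6.jar" ]; then
        echo "FAIL: twitter4j-$name-2.2.6.jar missing under $root" >&2
        status=1
      fi
    done
  done
  return "$status"
}
```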

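9. *** Note: Make sure the system time and time zone on the agent host are correct (for example, kept in sync with NTP). Twitter rejects OAuth requests whose timestamps are too far off, which shows up as a 401 error.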
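One way to estimate clock skew, assuming GNU `date` and outbound HTTPS access, is to compare local time with the `Date` header returned by a Twitter server. The function name `clock_skew_seconds` is hypothetical, and the api.twitter.com URL in the comment is only an example endpoint; an NTP query such as `ntpdate -q` is the more usual check.

```shell
#!/usr/bin/env bash
# Hypothetical helper: seconds by which the local clock is ahead of a remote
# clock, given an HTTP Date header value such as
# "Mon, 19 Jan 2015 10:00:00 GMT". A negative result means the local clock
# is behind. Requires GNU date (for -d).
clock_skew_seconds() {
  local remote_epoch
  if [ -z "$1" ]; then
    echo "usage: clock_skew_seconds HTTP_DATE" >&2
    return 2
  fi
  remote_epoch=$(date -u -d "$1" +%s) || return 1
  echo $(( $(date -u +%s) - remote_epoch ))
}

# Example (assumes curl and network access; the URL is only an example):
#   remote=$(curl -sI https://api.twitter.com | tr -d '\r' | sed -n 's/^[Dd]ate: //p')
#   clock_skew_seconds "$remote"
```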

10. Configure the Flume agent role group:
10.1 Set the agent name:
Cloudera manager->flume->configuration->AgentTweets > Agent Name > set the agent name to "AgentTweets" [make sure the agent name here matches the prefix used in the configuration in the next step]
10.2 Set the Flume configuration, making sure every property is prefixed with the agent name set in step 10.1:
Cloudera manager->flume->configuration->AgentTweets > "Configuration File" > replace the entire content with the configuration below:
--------------------twitter flume conf start -------------
AgentTweets.sources = Twitter
AgentTweets.channels = MemChannel
AgentTweets.sinks = HDFS

AgentTweets.sources.Twitter.type = com.cloudera.flume.source.TwitterSource
AgentTweets.sources.Twitter.channels = MemChannel
AgentTweets.sources.Twitter.consumerKey = consumerKey from step 1
AgentTweets.sources.Twitter.consumerSecret = consumerSecret from step 1
AgentTweets.sources.Twitter.accessToken = accessToken from step 1
AgentTweets.sources.Twitter.accessTokenSecret = accessTokenSecret from step 1
AgentTweets.sources.Twitter.keywords = malaysia, msia

AgentTweets.sinks.HDFS.channel = MemChannel
AgentTweets.sinks.HDFS.type = hdfs
AgentTweets.sinks.HDFS.hdfs.path = hdfs://namenodeHostnameOrIP:8020/user/flume/tweets/malaysia/%Y/%m/%d/%H/
AgentTweets.sinks.HDFS.hdfs.fileType = DataStream
AgentTweets.sinks.HDFS.hdfs.writeFormat = Text
AgentTweets.sinks.HDFS.hdfs.batchSize = 1000
AgentTweets.sinks.HDFS.hdfs.rollSize = 0
AgentTweets.sinks.HDFS.hdfs.rollCount = 10000

AgentTweets.channels.MemChannel.type = memory
AgentTweets.channels.MemChannel.capacity = 10000
# transactionCapacity must be at least the HDFS sink batchSize (1000)
AgentTweets.channels.MemChannel.transactionCapacity = 1000
--------------------twitter flume conf end -------------

Note: in the configuration above, hdfs.path is set to hdfs://namenodeHostnameOrIP:8020/user/flume/tweets/malaysia/%Y/%m/%d/%H/, which points to the NameNode on port 8020 (the NameNode RPC port). Replace namenodeHostnameOrIP with your NameNode's host name or IP address.
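Flume applies only the lines whose keys start with the agent's name, so a mismatched prefix means this agent simply does not see those settings. Assuming the configuration has first been saved to a local file, the sketch below (hypothetical function `check_agent_prefix`) prints every non-blank, non-comment line that does not start with `AgentTweets.` or whatever agent name you pass.

```shell
#!/usr/bin/env bash
# Hypothetical check: print (with line numbers) every non-blank, non-comment
# line of a Flume configuration file that does not start with "<agent>.".
# Returns 1 if such lines exist, 0 if none, 2 on usage/read errors.
# The agent name is used inside a regex, so keep it to letters/digits/_.
check_agent_prefix() {
  local agent=$1 file=$2
  if [ -z "$agent" ] || [ ! -r "$file" ]; then
    echo "usage: check_agent_prefix AGENT_NAME CONFIG_FILE" >&2
    return 2
  fi
  # -v selects lines matching neither "blank/comment" nor "<agent>." prefix.
  if grep -nEv "^[[:space:]]*(#|\$)|^${agent}\." "$file"; then
    return 1
  fi
  return 0
}

# Example (file name is arbitrary):
#   check_agent_prefix AgentTweets agent-tweets.conf
```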

11. Take note of the HDFS location in "AgentTweets.sinks.HDFS.hdfs.path". Flume creates the directories below /user/flume/ itself, but /user/flume/ must already exist. Make sure that the OS user "flume" on the target host sthdmgt1-pvt.aiu.axiata has read/write access to the HDFS directory /user/flume/:
# su - hdfs
$ hadoop fs -mkdir /user/flume
$ hadoop fs -chown flume:flume /user/flume

12. If you are behind a proxy, create a twitter4j properties file for it or handle it at the OS level; the Twitter source itself has no proxy option.
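A minimal sketch of the properties-file approach, assuming twitter4j reads `twitter4j.properties` from the root of the classpath and honours the `http.proxyHost` and `http.proxyPort` keys (check the twitter4j configuration documentation for the version you deploy). Flume puts the jars inside a plugin's lib/ and libext/ directories on the classpath rather than the directories themselves, so the file is wrapped in a jar. The function name, output directory, proxy host and port are all placeholders.

```shell
#!/usr/bin/env bash
# Hypothetical helper: write a twitter4j.properties file with proxy settings
# and, if the JDK "jar" tool is on PATH, wrap it in twitter4j-proxy.jar.
# Assumes twitter4j looks for twitter4j.properties at the classpath root.
write_twitter4j_proxy_props() {
  local outdir=$1 host=$2 port=$3
  if [ -z "$outdir" ] || [ -z "$host" ] || [ -z "$port" ]; then
    echo "usage: write_twitter4j_proxy_props OUTDIR PROXY_HOST PROXY_PORT" >&2
    return 2
  fi
  mkdir -p "$outdir" || return 1
  cat > "$outdir/twitter4j.properties" <<EOF
http.proxyHost=$host
http.proxyPort=$port
EOF
  if command -v jar >/dev/null 2>&1; then
    # Put the file at the root of the jar so it sits at the classpath root.
    (cd "$outdir" && jar cf twitter4j-proxy.jar twitter4j.properties)
  fi
}

# Example (placeholder proxy):
#   write_twitter4j_proxy_props /tmp/t4j-proxy proxy.example.com 3128
```

The resulting twitter4j-proxy.jar would then be copied into plugins.d/twitter-streaming/libext/ under both plugin roots from step 5 before restarting the agent.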

13. Start (or restart) the Flume service from Cloudera Manager.

14. Monitor the log:
Cloudera manager->flume->instances->AgentTweets > click "Agent" for host "sthdmgt1-pvt.aiu.axiata" > click "Log File"
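The same log can also be scanned on the agent host for the problems described earlier: the twitter4j NoSuchMethodError (step 8) and 401 errors (step 9). This is a sketch: `scan_flume_log` is hypothetical, the log path has to be taken from the "Log File" link in Cloudera Manager, and the `401:` pattern assumes twitter4j's usual "status code, colon, message" error text.

```shell
#!/usr/bin/env bash
# Hypothetical helper: grep a Flume agent log for the errors covered in this
# post. Prints matching lines with line numbers; returns 1 if any were found,
# 0 if none, 2 if the log cannot be read.
scan_flume_log() {
  local log=$1
  if [ ! -r "$log" ]; then
    echo "cannot read log file: $log" >&2
    return 2
  fi
  # NoSuchMethodError -> twitter4j version conflict (step 8)
  # TwitterException  -> any twitter4j error
  # "401:"            -> authentication failure (steps 1 and 9); assumes the
  #                      "<status>:<message>" format of twitter4j errors
  if grep -nE 'NoSuchMethodError|TwitterException|401:' "$log"; then
    return 1
  fi
  return 0
}
```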

15. If the log looks fine, check the HDFS location:
# su - hdfs
$ hadoop fs -ls /user/flume/tweets/malaysia

Note: The data will be under /user/flume/tweets/malaysia/, in year/month/day/hour subdirectories that follow the %Y/%m/%d/%H pattern in hdfs.path.
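To list the current hour's directory directly, the %Y/%m/%d/%H part of hdfs.path can be rebuilt with `date`. The sketch assumes GNU `date` and the HDFS sink's default local time zone (`hdfs.timeZone`); `tweets_hour_path` is a hypothetical helper. Because the sink resolves the escapes from each event's timestamp header, events written around the top of the hour may appear in the neighbouring hour's directory.

```shell
#!/usr/bin/env bash
# Hypothetical helper: print the HDFS directory that %Y/%m/%d/%H in hdfs.path
# resolves to for a given epoch time (default: now), in the local time zone.
# Requires GNU date (for -d @EPOCH).
tweets_hour_path() {
  local base=${1%/} epoch=${2:-$(date +%s)}
  printf '%s/%s\n' "$base" "$(date -d "@$epoch" +%Y/%m/%d/%H)"
}

# Example on the agent host (as in step 15):
#   hadoop fs -ls "$(tweets_hour_path /user/flume/tweets/malaysia)"
```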

