Monday, January 19, 2015

Configure Flume to Collect Twitter Streams Using Cloudera Manager


Below are the steps I followed to collect Twitter streams into HDFS with Flume:


1. Before you start, make sure you have generated the following four credentials for the Twitter Streaming API (see the Twitter Streaming API documentation for details):
consumerKey
consumerSecret
accessToken
accessTokenSecret
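This is a minimal pre-flight check. It assumes you export the four values as environment variables in your shell before you paste them into the configuration in step 10. The variable names and the function check_twitter_credentials are hypothetical. Neither Flume nor Cloudera Manager reads them.

```shell
#!/usr/bin/env bash
# Hypothetical pre-flight check for step 1. The environment variable names
# are an assumption for illustration; Flume itself does not read them.
check_twitter_credentials() {
  local var missing=0
  for var in TWITTER_CONSUMER_KEY TWITTER_CONSUMER_SECRET \
             TWITTER_ACCESS_TOKEN TWITTER_ACCESS_TOKEN_SECRET; do
    # ${!var} is bash indirect expansion: the value of the variable named $var.
    if [ -z "${!var:-}" ]; then
      echo "missing: $var" >&2
      missing=1
    fi
  done
  return "$missing"
}
```

check_twitter_credentials prints the name of every missing variable to stderr and returns a non-zero status. You can use it to guard a script that writes the configuration.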

2. In Cloudera Manager, stop the currently running Flume service.

3. Create a dedicated Flume agent role group named "AgentTweets" under the "Flume" service:
- Cloudera Manager > Flume > Configuration > Role Groups > "Create new group..." > enter "AgentTweets"

4. Assign a host to the AgentTweets role group:
- Cloudera Manager > Flume > Configuration > Role Groups > select "Agent Default Group" > check "agent (sthdmgt1-pvt)" > "Actions for Selected" > "Move to Different Role Group..." > AgentTweets > Move

5. Check the Flume plugin directories:
Cloudera Manager > Flume > Configuration > AgentTweets > "Plugin directories"

In our case, the plugin directories are:
/usr/lib/flume-ng/plugins.d
/var/lib/flume-ng/plugins.d

*** Note: Because host "sthdmgt1-pvt" runs the AgentTweets agent role, this and all remaining changes are made on host "sthdmgt1-pvt" only.

6. Create the plugin directories, since they do not exist yet:
mkdir -p /usr/lib/flume-ng/plugins.d
mkdir -p /var/lib/flume-ng/plugins.d

-- Also create the location for the Twitter plugin:
mkdir -p /usr/lib/flume-ng/plugins.d/twitter-streaming/lib/
mkdir -p /var/lib/flume-ng/plugins.d/twitter-streaming/lib/

-- Then make the flume user the owner of both trees:
chown -R flume:flume /usr/lib/flume-ng/
chown -R flume:flume /var/lib/flume-ng/

7. Download the custom Flume source and copy it to both Flume plugin locations:
cd /tmp 
wget "http://files.cloudera.com/samples/flume-sources-1.0-SNAPSHOT.jar"
cp flume-sources-1.0-SNAPSHOT.jar /usr/lib/flume-ng/plugins.d/twitter-streaming/lib/
cp flume-sources-1.0-SNAPSHOT.jar /var/lib/flume-ng/plugins.d/twitter-streaming/lib/
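You can wrap the directory and copy commands from steps 6 and 7 in a small function. This sketch assumes the plugins.d layout used above (plugins.d/plugin-name/lib). install_plugin_jar is a hypothetical helper name and is not part of Flume or Cloudera Manager.

```shell
#!/usr/bin/env bash
# Sketch of steps 6 and 7: create <plugins.d>/<plugin>/lib and copy a jar into it.
# install_plugin_jar is a hypothetical helper name.
install_plugin_jar() {
  local plugins_dir=$1 plugin_name=$2 jar=$3
  local lib_dir="$plugins_dir/$plugin_name/lib"
  [ -f "$jar" ] || { echo "jar not found: $jar" >&2; return 1; }
  mkdir -p "$lib_dir" || return 1
  cp "$jar" "$lib_dir/"
}

# Usage on sthdmgt1-pvt (as root), mirroring the commands above:
# for d in /usr/lib/flume-ng/plugins.d /var/lib/flume-ng/plugins.d; do
#   install_plugin_jar "$d" twitter-streaming /tmp/flume-sources-1.0-SNAPSHOT.jar
# done
# chown -R flume:flume /usr/lib/flume-ng/ /var/lib/flume-ng/
```

The ownership change stays outside the function on purpose. It only needs to run once after both copies are done.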

8. If twitter4j-* jars of version 3 or above are on the agent's classpath, the Flume agent log may show the following error:

Unable to start EventDrivenSourceRunner: { source:com.cloudera.flume.source.TwitterSource{name:Twitter,state:IDLE} } - Exception follows.
java.lang.NoSuchMethodError: twitter4j.FilterQuery.setIncludeEntities(Z)Ltwitter4j/FilterQuery;
at com.cloudera.flume.source.TwitterSource.start(TwitterSource.java:139)

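The root cause is a library version mismatch. The error shows that the loaded twitter4j classes have no FilterQuery.setIncludeEntities(boolean) method. com.cloudera.flume.source.TwitterSource calls that method, which exists in twitter4j 2.2.x but not in the twitter4j 3.0.3 jars shipped in the CDH Flume lib directory. To resolve the error, do the following: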

8.1 Go to the default Cloudera location for the Flume libraries. Take a backup of all twitter4j-*-3.0.3.jar files before removing them:

[root@sthdmgt1-pvt ~]# ll /opt/cloudera/parcels/CDH/lib/flume-ng/lib/|grep twitter4j
lrwxrwxrwx 1 root root 38 Nov 18 11:09 twitter4j-core-3.0.3.jar -> ../../../jars/twitter4j-core-3.0.3.jar
lrwxrwxrwx 1 root root 47 Nov 18 11:09 twitter4j-media-support-3.0.3.jar -> ../../../jars/twitter4j-media-support-3.0.3.jar
lrwxrwxrwx 1 root root 40 Nov 18 11:09 twitter4j-stream-3.0.3.jar -> ../../../jars/twitter4j-stream-3.0.3.jar
[root@sthdmgt1-pvt ~]#

*** Note: Record the above links and their targets for future reference, in case you need to restore them.
8.2 In this case the jars are symbolic links, so only the links need to be removed:
# rm -f /opt/cloudera/parcels/CDH/lib/flume-ng/lib/twitter4j-*-3.0.3.jar
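Steps 8.1 and 8.2 can be combined so that the backup is written before anything is removed. This is a sketch that assumes the twitter4j 3.0.3 files are symbolic links, as in the listing above. backup_and_remove_twitter4j_links and the backup file path are hypothetical.

```shell
#!/usr/bin/env bash
# Sketch of steps 8.1-8.2: record each twitter4j 3.0.3 symlink and its target
# in a backup file, then remove the link. Function name and backup path are
# hypothetical.
backup_and_remove_twitter4j_links() {
  local lib_dir=$1 backup_file=$2 link
  : > "$backup_file" || return 1
  for link in "$lib_dir"/twitter4j-*-3.0.3.jar; do
    # Skip the literal pattern (no match) and anything that is not a symlink.
    [ -L "$link" ] || continue
    printf '%s -> %s\n' "$link" "$(readlink "$link")" >> "$backup_file"
    rm -f "$link"
  done
}

# Example:
# backup_and_remove_twitter4j_links /opt/cloudera/parcels/CDH/lib/flume-ng/lib /root/twitter4j-links.bak
# To restore a link later: ln -s TARGET LINK, using a line from the backup file.
```

Only symbolic links are touched. A regular jar file matching the pattern is left in place and is not recorded in the backup.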

8.3 Download an older version of twitter4j and copy it to both Flume plugin locations. We chose 2.2.6, the latest release before 3.x:
# cd /tmp

# wget http://twitter4j.org/maven2/org/twitter4j/twitter4j-stream/2.2.6/twitter4j-stream-2.2.6.jar
# wget http://twitter4j.org/maven2/org/twitter4j/twitter4j-core/2.2.6/twitter4j-core-2.2.6.jar
# wget http://twitter4j.org/maven2/org/twitter4j/twitter4j-media-support/2.2.6/twitter4j-media-support-2.2.6.jar

# cp twitter4j-*.jar /var/lib/flume-ng/plugins.d/twitter-streaming/lib/
# cp twitter4j-*.jar /usr/lib/flume-ng/plugins.d/twitter-streaming/lib/

# chown -R flume:flume /usr/lib/flume-ng/
# chown -R flume:flume /var/lib/flume-ng/
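The three downloads in step 8.3 follow one URL pattern, so a loop can generate them for a given version. This sketch reuses the twitter4j.org Maven URL pattern from the commands above. twitter4j_urls is a hypothetical helper, and those URLs may no longer be served.

```shell
#!/usr/bin/env bash
# Sketch of step 8.3: build the three twitter4j download URLs for one version
# from the URL pattern used above. twitter4j_urls is a hypothetical helper.
twitter4j_urls() {
  local version=$1 artifact
  for artifact in twitter4j-stream twitter4j-core twitter4j-media-support; do
    printf 'http://twitter4j.org/maven2/org/twitter4j/%s/%s/%s-%s.jar\n' \
      "$artifact" "$version" "$artifact" "$version"
  done
}

# Example (as root on sthdmgt1-pvt):
# cd /tmp && twitter4j_urls 2.2.6 | xargs -n 1 wget
# for d in /usr/lib/flume-ng /var/lib/flume-ng; do
#   cp /tmp/twitter4j-*-2.2.6.jar "$d/plugins.d/twitter-streaming/lib/"
# done
```

The copy glob includes the version number, so any other twitter4j jars left in /tmp are not copied by mistake.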

Ref: http://stackoverflow.com/questions/19189979/cannot-run-flume-because-of-jar-conflict

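9. *** Note: Make sure the system time and time zone on the agent host are correct, for example kept in sync with NTP. Twitter rejects OAuth requests whose timestamp is too far from its own clock with a 401 error.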
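One way to spot a clock problem is to compare the local clock with the Date header of an HTTP response from Twitter. This sketch assumes GNU date and, for the usage line, curl. clock_skew_seconds is a hypothetical helper, and this post does not state how much skew Twitter tolerates.

```shell
#!/usr/bin/env bash
# Sketch for step 9: absolute difference in seconds between the local clock
# and an HTTP Date header. Requires GNU date (for -d).
# clock_skew_seconds is a hypothetical helper.
clock_skew_seconds() {
  local http_date=$1 local_epoch=${2:-$(date -u +%s)}
  local remote_epoch diff
  remote_epoch=$(date -u -d "$http_date" +%s) || return 1
  diff=$(( local_epoch - remote_epoch ))
  echo "${diff#-}"   # strip the sign: absolute value in seconds
}

# Example (assumes curl is installed and api.twitter.com is reachable):
# clock_skew_seconds "$(curl -sI https://api.twitter.com | sed -n 's/^[Dd]ate: //p' | tr -d '\r')"
```

The optional second argument makes the comparison reproducible. Without it, the function uses the current local time.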

10. Configure the Flume agent role group:
10.1 Set the agent name:
Cloudera Manager > Flume > Configuration > AgentTweets > Agent Name > set the agent name to "AgentTweets" (the agent name here must match the prefix used in the configuration in the next step).
10.2 Set the Flume configuration. Make sure every property is prefixed with the agent name set in step 10.1.
Cloudera Manager > Flume > Configuration > AgentTweets > "Configuration File" > replace the entire content with the config below:
--------------------twitter flume conf start -------------
AgentTweets.sources = Twitter
AgentTweets.channels = MemChannel
AgentTweets.sinks = HDFS

AgentTweets.sources.Twitter.type = com.cloudera.flume.source.TwitterSource
AgentTweets.sources.Twitter.channels = MemChannel
AgentTweets.sources.Twitter.consumerKey = consumerKey from step 1
AgentTweets.sources.Twitter.consumerSecret = consumerSecret from step 1
AgentTweets.sources.Twitter.accessToken = accessToken from step 1
AgentTweets.sources.Twitter.accessTokenSecret = accessTokenSecret from step 1
AgentTweets.sources.Twitter.keywords = malaysia, msia

AgentTweets.sinks.HDFS.channel = MemChannel
AgentTweets.sinks.HDFS.type = hdfs
AgentTweets.sinks.HDFS.hdfs.path = hdfs://namenodeHostnameOrIP:8020/user/flume/tweets/malaysia/%Y/%m/%d/%H/
AgentTweets.sinks.HDFS.hdfs.fileType = DataStream
AgentTweets.sinks.HDFS.hdfs.writeFormat = Text
AgentTweets.sinks.HDFS.hdfs.batchSize = 1000
AgentTweets.sinks.HDFS.hdfs.rollSize = 0
AgentTweets.sinks.HDFS.hdfs.rollCount = 10000

AgentTweets.channels.MemChannel.type = memory
AgentTweets.channels.MemChannel.capacity = 10000
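# transactionCapacity must be at least hdfs.batchSize (1000); otherwise the HDFS sink can fail with a ChannelException when it takes a full batch
AgentTweets.channels.MemChannel.transactionCapacity = 1000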
--------------------twitter flume conf end -------------

Note: in the above config, hdfs.path is set to hdfs://namenodeHostnameOrIP:8020/user/flume/tweets/malaysia/%Y/%m/%d/%H/
This points to the NameNode on port 8020. Replace namenodeHostnameOrIP with your NameNode's host name or IP address.
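To double-check the rule from step 10.2 before pasting, you can save the configuration text to a local file and scan it. This sketch lists every non-blank, non-comment line that does not start with the agent name followed by a dot. check_agent_prefix and the file name in the example are hypothetical.

```shell
#!/usr/bin/env bash
# Sketch for step 10.2: report config lines not prefixed with "<agent name>.".
# check_agent_prefix is a hypothetical helper; it exits non-zero on any hit.
check_agent_prefix() {
  local agent=$1 conf=$2
  awk -v prefix="$agent." '
    { line = $0; sub(/^[ \t]+/, "", line) }          # ignore leading blanks
    line == "" || substr(line, 1, 1) == "#" { next } # skip blank and comment lines
    index(line, prefix) != 1 { print "line " NR ": " $0; bad = 1 }
    END { exit bad }
  ' "$conf"
}

# Example:
# check_agent_prefix AgentTweets flume-twitter.conf && echo "all properties prefixed"
```

The check compares against the literal prefix, so a typo such as "AgentTweet." or a different capitalization is reported too.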

11. Take note of the HDFS location in "AgentTweets.sinks.HDFS.hdfs.path". Flume creates the directories below /user/flume/ itself.
Create the path up to /user/flume/ yourself, and make sure the OS user "flume" on the target host sthdmgt1-pvt.aiu.axiata has read/write access to the HDFS directory /user/flume/:
# su - hdfs
$ hadoop fs -mkdir /user/flume
$ hadoop fs -chown flume:flume /user/flume

12. If you are behind a proxy, either create a twitter4j properties file with the proxy settings or handle the proxy at the OS level. Flume has no proxy option of its own.
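This is a minimal sketch of the twitter4j route. It assumes twitter4j picks up its proxy settings from JVM system properties named twitter4j.http.proxyHost and twitter4j.http.proxyPort, the system-property form of the http.proxyHost and http.proxyPort keys in twitter4j.properties. The helper name, the example proxy host, and where the options go in Cloudera Manager are all assumptions.

```shell
#!/usr/bin/env bash
# Sketch for step 12: print JVM options that pass proxy settings to twitter4j
# as system properties. Assumes twitter4j honours twitter4j.http.proxy*;
# twitter4j_proxy_opts is a hypothetical helper.
twitter4j_proxy_opts() {
  local host=$1 port=$2
  # Reject empty or non-numeric ports before building the options.
  case $port in
    ''|*[!0-9]*) echo "invalid port: $port" >&2; return 1 ;;
  esac
  printf '%s\n' "-Dtwitter4j.http.proxyHost=$host -Dtwitter4j.http.proxyPort=$port"
}

# Example (proxy.example.com is a placeholder):
# twitter4j_proxy_opts proxy.example.com 3128
# The printed options would then be added to the Flume agent's Java options
# in Cloudera Manager (assumption; check your Cloudera Manager version).
```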

13. Start (or restart) the Flume service from Cloudera Manager.

14. Monitor the log:
Cloudera Manager > Flume > Instances > AgentTweets > click "Agent" for host "sthdmgt1-pvt.aiu.axiata" > click "Log File"
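If you have a copy of the agent log on disk, a quick scan can flag the two failures discussed in this post. This is a sketch. The 401 pattern assumes twitter4j prefixes its error message with the status code, as in "401:Authentication credentials ...". scan_flume_log is a hypothetical helper.

```shell
#!/usr/bin/env bash
# Sketch for step 14: flag known failure signatures in a Flume agent log.
# The 401 pattern is an assumption about how twitter4j reports the error.
scan_flume_log() {
  local log=$1 problems=0
  if grep -q 'NoSuchMethodError: twitter4j' "$log"; then
    echo "twitter4j version conflict: see step 8"
    problems=1
  fi
  # "401:" not preceded by another digit, to avoid matching e.g. "1401:".
  if grep -Eq '(^|[^0-9])401:' "$log"; then
    echo "HTTP 401 from Twitter: check the credentials (step 1) and the clock (step 9)"
    problems=1
  fi
  return "$problems"
}
```

A clean scan only means these two signatures are absent. Keep an eye on the log for other exceptions as well.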

15. If the log looks OK, check the HDFS location:
# su - hdfs
$ hadoop fs -ls /user/flume/tweets/malaysia

Note: The data will be under /user/flume/tweets/malaysia/YYYY/MM/DD/HH/, one directory per hour, as set by the %Y/%m/%d/%H escapes in hdfs.path.
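You can predict the hourly directory from a timestamp, which makes it easy to list the folder for the current hour. This sketch assumes the event timestamps are close to the current time. It also assumes your shell and the agent use the same time zone, because Flume resolves the escapes in local time unless hdfs.timeZone is set. It requires GNU date. expected_hdfs_dir is a hypothetical helper.

```shell
#!/usr/bin/env bash
# Sketch for step 15: print the hourly HDFS directory for an epoch time,
# mirroring the %Y/%m/%d/%H escapes in hdfs.path (local time zone, GNU date).
# expected_hdfs_dir is a hypothetical helper.
expected_hdfs_dir() {
  local base=$1 epoch=$2
  # ${base%/} drops a trailing slash so the result never contains "//".
  printf '%s/%s/\n' "${base%/}" "$(date -d "@$epoch" +%Y/%m/%d/%H)"
}

# Example: list the directory for the current hour
# hadoop fs -ls "$(expected_hdfs_dir /user/flume/tweets/malaysia "$(date +%s)")"
```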

