Apache Storm With Twitter Java Example

posted on Nov 20th, 2016

Apache Storm

Apache Storm is a distributed stream processing computation framework written predominantly in the Clojure programming language. Originally created by Nathan Marz and his team at BackType, the project was open sourced after being acquired by Twitter. It uses custom-created "spouts" and "bolts" to define information sources and transformations, allowing distributed processing of streaming data. The initial release was on 17 September 2011.

A Storm application is designed as a "topology" in the shape of a directed acyclic graph (DAG) with spouts and bolts acting as the graph vertices. Edges on the graph are named streams and direct data from one node to another. Together, the topology acts as a data transformation pipeline. At a superficial level the general topology structure is similar to a MapReduce job, with the main difference being that data is processed in real time as opposed to in individual batches.
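As a preview of the program built later in this post, the spouts and bolts are wired into the topology DAG with a TopologyBuilder (Storm 0.10 "backtype" API; the spout and bolt classes are developed in the sections below, and the spout is constructed with Twitter credentials and keywords in the full program):

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("twitter-spout", new TwitterSampleSpout());          // vertex: tweet source
builder.setBolt("hashtag-reader", new HashtagReaderBolt())
        .shuffleGrouping("twitter-spout");                            // edge: stream of tweets
builder.setBolt("hashtag-counter", new HashtagCounterBolt())
        .fieldsGrouping("hashtag-reader", new Fields("hashtag"));     // edge: grouped by hashtag value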

Prerequisites

1) A machine with Ubuntu 14.04 LTS operating system.

2) Apache ZooKeeper pre-installed (How to install ZooKeeper on Ubuntu 14.04)

3) Apache Storm 0.10.0 pre-installed (How to install Storm on Ubuntu 14.04)

4) Twitter4j library files (Download)

Storm with Twitter Java Example

Twitter is an online social networking service that provides a platform to send and receive user tweets. Registered users can read and post tweets, while unregistered users can only read them. A hashtag categorizes tweets by keyword: the # symbol is prepended to the relevant keyword. Let us take a real-time scenario: finding the most-used hashtags among tweets that match a set of keywords.

Add these libraries to your Java project's build path.

/usr/local/storm/lib/*

twitter4j-async-4.0.4.jar
twitter4j-core-4.0.4.jar
twitter4j-media-support-4.0.4.jar
twitter4j-stream-4.0.4.jar

TwitterSampleSpout.java

import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;

import twitter4j.FilterQuery;
import twitter4j.StallWarning;
import twitter4j.Status;
import twitter4j.StatusDeletionNotice;
import twitter4j.StatusListener;
import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;
import twitter4j.conf.ConfigurationBuilder;
import backtype.storm.Config;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

@SuppressWarnings("serial")
public class TwitterSampleSpout extends BaseRichSpout {
	SpoutOutputCollector _collector;
	LinkedBlockingQueue<Status> queue = null;
	TwitterStream _twitterStream;
	String consumerKey;
	String consumerSecret;
	String accessToken;
	String accessTokenSecret;
	String[] keyWords;

	public TwitterSampleSpout(String consumerKey, String consumerSecret,
			String accessToken, String accessTokenSecret, String[] keyWords) {
		this.consumerKey = consumerKey;
		this.consumerSecret = consumerSecret;
		this.accessToken = accessToken;
		this.accessTokenSecret = accessTokenSecret;
		this.keyWords = keyWords;
	}

	public TwitterSampleSpout() {
		// No-argument constructor (not used in this example)
	}

	@Override
	public void open(Map conf, TopologyContext context,
			SpoutOutputCollector collector) {
		queue = new LinkedBlockingQueue<Status>(1000);
		_collector = collector;
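		// Register a Twitter4j StatusListener that offers each incoming tweet to the
		// bounded queue; tweets are silently dropped when the queue is full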
		StatusListener listener = new StatusListener() {
			@Override
			public void onStatus(Status status) {
				queue.offer(status);
			}

			@Override
			public void onDeletionNotice(StatusDeletionNotice sdn) {
			}

			@Override
			public void onTrackLimitationNotice(int i) {
			}

			@Override
			public void onScrubGeo(long l, long l1) {
			}

			@Override
			public void onException(Exception ex) {
			}

			@Override
			public void onStallWarning(StallWarning warning) {
			}
		};
		ConfigurationBuilder cb = new ConfigurationBuilder();
		cb.setDebugEnabled(true).setOAuthConsumerKey(consumerKey)
				.setOAuthConsumerSecret(consumerSecret)
				.setOAuthAccessToken(accessToken)
				.setOAuthAccessTokenSecret(accessTokenSecret);
		_twitterStream = new TwitterStreamFactory(cb.build()).getInstance();
		_twitterStream.addListener(listener);
		if (keyWords.length == 0) {
			_twitterStream.sample();
		} else {
			FilterQuery query = new FilterQuery().track(keyWords);
			_twitterStream.filter(query);
		}
	}

	@Override
	public void nextTuple() {
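		// Take one tweet off the queue; if none is available, back off for 50 ms instead of busy-spinning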
		Status ret = queue.poll();
		if (ret == null) {
			Utils.sleep(50);
		} else {
			_collector.emit(new Values(ret));
		}
	}

	@Override
	public void close() {
		_twitterStream.shutdown();
	}

	@Override
	public Map<String, Object> getComponentConfiguration() {
		Config ret = new Config();
		ret.setMaxTaskParallelism(1);
		return ret;
	}

	@Override
	public void ack(Object id) {
	}

	@Override
	public void fail(Object id) {
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("tweet"));
	}
}

HashtagReaderBolt.java

import java.util.Map;

import twitter4j.HashtagEntity;
import twitter4j.Status;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class HashtagReaderBolt implements IRichBolt {
	private static final long serialVersionUID = 1L;
	private OutputCollector collector;

	@Override
	public void prepare(Map conf, TopologyContext context,
			OutputCollector collector) {
		this.collector = collector;
	}

	@Override
	public void execute(Tuple tuple) {
		Status tweet = (Status) tuple.getValueByField("tweet");
		for (HashtagEntity hashtag : tweet.getHashtagEntities()) {
			System.out.println("Hashtag: " + hashtag.getText());
			this.collector.emit(new Values(hashtag.getText()));
		}
	}

	@Override
	public void cleanup() {
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("hashtag"));
	}

	@Override
	public Map<String, Object> getComponentConfiguration() {
		return null;
	}
}
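Note that HashtagReaderBolt emits unanchored tuples and never acks its input. That is acceptable here because the spout emits tuples without message ids, so Storm does not track them. If you wanted reliable processing, a minimal sketch of an anchored execute() (same Storm 0.10 API, not part of the original example) would look like this:

	@Override
	public void execute(Tuple tuple) {
		Status tweet = (Status) tuple.getValueByField("tweet");
		for (HashtagEntity hashtag : tweet.getHashtagEntities()) {
			// anchor the emitted tuple to the input tuple so a failure can be replayed
			this.collector.emit(tuple, new Values(hashtag.getText()));
		}
		this.collector.ack(tuple);
	}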

HashtagCounterBolt.java

import java.util.HashMap;
import java.util.Map;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

public class HashtagCounterBolt implements IRichBolt {
	Map<String, Integer> counterMap;
	private OutputCollector collector;

	@Override
	public void prepare(Map conf, TopologyContext context,
			OutputCollector collector) {
		this.counterMap = new HashMap<String, Integer>();
		this.collector = collector;
	}

	@Override
	public void execute(Tuple tuple) {
		String key = tuple.getString(0);
		if (!counterMap.containsKey(key)) {
			counterMap.put(key, 1);
		} else {
			Integer c = counterMap.get(key) + 1;
			counterMap.put(key, c);
		}
		collector.ack(tuple);
	}

	@Override
	public void cleanup() {
		for (Map.Entry<String, Integer> entry : counterMap.entrySet()) {
			System.out.println("Result: " + entry.getKey() + " : "
					+ entry.getValue());
		}
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("hashtag"));
	}

	@Override
	public Map<String, Object> getComponentConfiguration() {
		return null;
	}
}
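HashtagCounterBolt only prints its counts in cleanup(), which is guaranteed to be called only when the topology runs in local mode. As a hedged variation (not part of the original example), execute() could also emit the running count downstream; this assumes declareOutputFields is changed to declarer.declare(new Fields("hashtag", "count")) and that some further bolt consumes the stream:

	@Override
	public void execute(Tuple tuple) {
		String key = tuple.getString(0);
		int count = counterMap.containsKey(key) ? counterMap.get(key) + 1 : 1;
		counterMap.put(key, count);
		// forward the hashtag together with its current running count
		collector.emit(new Values(key, count));
		collector.ack(tuple);
	}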

TwitterHashtagStorm.java

import java.util.Arrays;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

public class TwitterHashtagStorm {
	
	public static void main(String[] args) throws Exception {
		String consumerKey = "bVd3fwceBGCvjghPqjVF6A2jW";
		String consumerSecret = "86EPCj7ByjPpPTx4vNN1nTYqOsdjN0v7ZsainjEgjGY6KzwjFV";
		String accessToken = "******************-0NpAbHQt1WW2NM5njFieh6xVA0BwedG";
		String accessTokenSecret = "lUcbFDxu08lRE6uIISHE9fgAsEdZXKCh6MTpJqbplYUXy";
		String[] keyWords = {"tweet","hello","hadoop"};
		Config config = new Config();
		config.setDebug(true);
		TopologyBuilder builder = new TopologyBuilder();
		builder.setSpout("twitter-spout", new TwitterSampleSpout(consumerKey,
				consumerSecret, accessToken, accessTokenSecret, keyWords));
		builder.setBolt("twitter-hashtag-reader-bolt", new HashtagReaderBolt())
				.shuffleGrouping("twitter-spout");
		builder.setBolt("twitter-hashtag-counter-bolt",
				new HashtagCounterBolt()).fieldsGrouping(
				"twitter-hashtag-reader-bolt", new Fields("hashtag"));
		LocalCluster cluster = new LocalCluster();
		cluster.submitTopology("TwitterHashtagStorm", config,
				builder.createTopology());
		Thread.sleep(10000);
		cluster.shutdown();
	}
}
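The main method above runs the topology in a LocalCluster inside the launching JVM, keeps it alive for ten seconds and then shuts it down. If you instead want to submit the topology to the Nimbus/supervisor cluster started in the steps below, a minimal sketch (assuming the compiled classes are packaged into a jar and launched with bin/storm jar) replaces the LocalCluster block with:

// requires: import backtype.storm.StormSubmitter;
config.setNumWorkers(1);  // number of worker processes to request from the cluster
StormSubmitter.submitTopology("TwitterHashtagStorm", config, builder.createTopology());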

Step 1 - Replace the consumer key, consumer secret, access token and access token secret in TwitterHashtagStorm.java with your own Twitter credentials. The credentials shown below will not work.

Step 2 - Change the keywords to the topics you want to track.

String consumerKey = "bVd3fwceBGCvjghPqjVF6A2jW";
String consumerSecret = "86EPCj7ByjPpPTx4vNN1nTYqOsdjN0v7ZsainjEgjGY6KzwjFV";
String accessToken = "******************-0NpAbHQt1WW2NM5njFieh6xVA0BwedG";
String accessTokenSecret = "lUcbFDxu08lRE6uIISHE9fgAsEdZXKCh6MTpJqbplYUXy";
String[] keyWords = {"tweet","hello","hadoop"};

Step 3 - Open a new terminal (CTRL + ALT + T) and start ZooKeeper.

$ /usr/local/zookeeper/bin/zkServer.sh start
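
To verify that ZooKeeper started, you can optionally check its status:

$ /usr/local/zookeeper/bin/zkServer.sh status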

Step 4 - Open a new terminal (CTRL + ALT + T). Change the directory to /usr/local/storm

$ cd /usr/local/storm

Step 5 - Start nimbus

$ ./bin/storm nimbus

Step 6 - Open a new terminal (CTRL + ALT + T). Change the directory to /usr/local/storm

$ cd /usr/local/storm

Step 7 - Start supervisor

$ ./bin/storm supervisor
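
Optionally, you can also start the Storm UI and browse the cluster at http://localhost:8080 (the default UI port). Note that this example submits its topology to an in-process LocalCluster, so it will not appear in the UI.

$ ./bin/storm ui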

Step 8 - Compile the four programs above (TwitterSampleSpout.java, HashtagReaderBolt.java, HashtagCounterBolt.java and TwitterHashtagStorm.java) and run TwitterHashtagStorm. The topology runs locally for about ten seconds (the Thread.sleep(10000) in main), prints the hashtag counts it collected, and shuts down.

$ javac -cp "/usr/local/storm/lib/*":"/twitter4j-4.0.4/lib/*" *.java

$ java -cp "/usr/local/storm/lib/*":"/twitter4j-4.0.4/lib/*":. TwitterHashtagStorm 

Output

Result: ROBLOX : 2
Result: restaurantsneednotapply : 1
Result: ARIASONEDIRECTION : 1
Result: TuitUtil : 1
Result: EVOLution : 1
Result: YouAreMySundayAtLFF : 1

NOTE

Your output will likely differ; it depends on the hashtag keywords given in the program and on the tweets being posted at the time.

Make sure you have an internet connection before executing.

