Apache Storm Spout and Bolt Java Example

posted on Nov 20th, 2016

Apache Storm

Apache Storm is a distributed stream processing computation framework written predominantly in the Clojure programming language. Originally created by Nathan Marz and team at BackType, the project was open sourced after being acquired by Twitter. It uses custom created "spouts" and "bolts" to define information sources and manipulations to allow batch, distributed processing of streaming data. The initial release was on 17 September 2011.

A Storm application is designed as a "topology" in the shape of a directed acyclic graph (DAG) with spouts and bolts acting as the graph vertices. Edges on the graph are named streams and direct data from one node to another. Together, the topology acts as a data transformation pipeline. At a superficial level the general topology structure is similar to a MapReduce job, with the main difference being that data is processed in real time as opposed to in individual batches.

Prerequisites

1) A machine with Ubuntu 14.04 LTS operating system.

2) Apache ZooKeeper pre installed (How to install ZooKeeper on Ubuntu 14.04)

3) Apache Storm 0.10.0 pre installed (How to install Storm on Ubuntu 14.04)

Storm Spout and Bolt Example

A spout is a component that generates data for the topology. Typically, a spout implements the IRichSpout interface.

A bolt is a component that takes tuples as input, processes them, and produces new tuples as output. Bolts implement the IRichBolt interface.

Add these libraries to your java project build path.

/usr/local/storm/lib/*

FakeCallLogReaderSpout.java

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

// Create a class FakeLogReaderSpout which implement IRichSpout interface to access functionalities
public class FakeCallLogReaderSpout implements IRichSpout {
	private static final long serialVersionUID = 1L;
	// Create instance for SpoutOutputCollector which passes tuples to bolt.
	private SpoutOutputCollector collector;
	private boolean completed = false;

	// Create instance for TopologyContext which contains topology data.
	private TopologyContext context;
	// Create instance for Random class.
	private Random randomGenerator = new Random();
	private Integer idx = 0;

	@Override
	public void open(Map conf, TopologyContext context,
			SpoutOutputCollector collector) {
		this.context = context;
		this.collector = collector;
	}

	@Override
	public void nextTuple() {
		if (this.idx <= 1000) {
			List<String> mobileNumbers = new ArrayList<String>();

			mobileNumbers.add("1234123401");
			mobileNumbers.add("1234123402");
			mobileNumbers.add("1234123403");
			mobileNumbers.add("1234123404");
			Integer localIdx = 0;
			while (localIdx++ < 100 && this.idx++ < 1000) {
				String fromMobileNumber = mobileNumbers.get(randomGenerator
						.nextInt(4));
				String toMobileNumber = mobileNumbers.get(randomGenerator
						.nextInt(4));
				while (fromMobileNumber == toMobileNumber) {
					toMobileNumber = mobileNumbers.get(randomGenerator
							.nextInt(4));
				}
				Integer duration = randomGenerator.nextInt(60);
				this.collector.emit(new Values(fromMobileNumber,
						toMobileNumber, duration));
			}
		}
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("from", "to", "duration"));
	}

	// Override all the interface methods
	@Override
	public void close() {
	}

	public boolean isDistributed() {
		return false;
	}

	@Override
	public void activate() {
	}

	@Override
	public void deactivate() {
	}

	@Override
	public void ack(Object msgId) {
	}

	@Override
	public void fail(Object msgId) {
	}

	@Override
	public Map<String, Object> getComponentConfiguration() {
		return null;
	}
}

CallLogCreatorBolt.java

import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
//Create a class CallLogCreatorBolt which implement IRichBolt interface
public class CallLogCreatorBolt implements IRichBolt {
	private static final long serialVersionUID = 1L;
	// Create instance for OutputCollector which collects and emits tuples to
	// produce output
	private OutputCollector collector;

	@Override
	public void prepare(@SuppressWarnings("rawtypes") Map conf, TopologyContext context,
			OutputCollector collector) {
		this.collector = collector;
	}

	@Override
	public void execute(Tuple tuple) {
		String from = tuple.getString(0);
		String to = tuple.getString(1);
		Integer duration = tuple.getInteger(2);
		collector.emit(new Values(from + " - " + to, duration));
	}

	@Override
	public void cleanup() {
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("call", "duration"));
	}

	@Override
	public Map<String, Object> getComponentConfiguration() {
		return null;
	}
}

CallLogCounterBolt.java

import java.util.HashMap;
import java.util.Map;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

public class CallLogCounterBolt implements IRichBolt {

	private static final long serialVersionUID = 1L;
	Map<String, Integer> counterMap;
	private OutputCollector collector;

	@Override
	public void prepare(Map conf, TopologyContext context,
			OutputCollector collector) {
		this.counterMap = new HashMap<String, Integer>();
		this.collector = collector;
	}

	@Override
	public void execute(Tuple tuple) {
		String call = tuple.getString(0);
		Integer duration = tuple.getInteger(1);
		if (!counterMap.containsKey(call)) {
			counterMap.put(call, 1);
		} else {
			Integer c = counterMap.get(call) + 1;
			counterMap.put(call, c);
		}
		collector.ack(tuple);
	}

	@Override
	public void cleanup() {
		for (Map.Entry<String, Integer> entry : counterMap.entrySet()) {
			System.out.println(entry.getKey() + " : " + entry.getValue());
		}
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("call"));
	}

	@Override
	public Map<String, Object> getComponentConfiguration() {
		return null;
	}
}

LogAnalyserStorm.java

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
//Create main class LogAnalyserStorm submit topology.
public class LogAnalyserStorm {
	public static void main(String[] args) throws Exception {
		// Create Config instance for cluster configuration
		Config config = new Config();
		config.setDebug(true);
		//
		TopologyBuilder builder = new TopologyBuilder();
		builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout());
		builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt())
				.shuffleGrouping("call-log-reader-spout");
		builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt())
				.fieldsGrouping("call-log-creator-bolt", new Fields("call"));
		LocalCluster cluster = new LocalCluster();
		cluster.submitTopology("LogAnalyserStorm", config,
				builder.createTopology());
		Thread.sleep(10000);
		// Stop the topology
		cluster.shutdown();
	}
}

Step 1 - Start ZooKeeper. Open a new terminal (CTRL + ALT + T) and start zookeeper.

$ /usr/local/zookeeper/bin/zkServer.sh start

Step 2 - Open a new terminal (CTRL + ALT + T). Change the directory to /usr/local/storm

$ cd /usr/local/storm

Step 3 - Start nimbus

$ ./bin/storm nimbus

Step 4 - Open a new terminal (CTRL + ALT + T). Change the directory to /usr/local/storm

$ cd /usr/local/storm

Step 5 - Start supervisor

$ ./bin/storm supervisor

FakeCallLogReaderSpout.java
CallLogCreatorBolt.java
CallLogCounterBolt.java
LogAnalyserStorm.java

Step 6 - Compile all of the above programs and run LogAnalyserStorm

$ javac -cp "/usr/local/storm/lib/*" *.java
$ java -cp "/usr/local/storm/lib/*":. LogAnalyserStorm

Output

1234123402 - 1234123401 : 93
1234123404 - 1234123403 : 72
1234123404 - 1234123402 : 83
1234123404 - 1234123401 : 99
1234123402 - 1234123404 : 82
1234123402 - 1234123403 : 86
1234123403 - 1234123404 : 77
1234123401 - 1234123404 : 82
1234123401 - 1234123403 : 78
1234123401 - 1234123402 : 80
1234123403 - 1234123401 : 96
1234123403 - 1234123402 : 72

Please share this blog post and follow me for latest updates on

facebook             google+             twitter             feedburner

Previous Post                                                                                          Next Post

Labels : Storm Installation on Ubuntu   Storm Trident Java Example   Storm with Twitter Java Example