Apache Storm Trident Java Example

posted on Nov 20th, 2016

Apache Storm

Apache Storm is a distributed stream processing computation framework written predominantly in the Clojure programming language. Originally created by Nathan Marz and his team at BackType, the project was open sourced after being acquired by Twitter. It uses custom-created "spouts" and "bolts" to define information sources and manipulations, allowing batch, distributed processing of streaming data. The initial release was on 17 September 2011.

A Storm application is designed as a "topology" in the shape of a directed acyclic graph (DAG), with spouts and bolts acting as the graph vertices. Edges on the graph are named streams and direct data from one node to another. Together, the topology acts as a data transformation pipeline. At a superficial level the general topology structure is similar to a MapReduce job, with the main difference being that data is processed in real time rather than in discrete batches.

Prerequisites

1) A machine running the Ubuntu 14.04 LTS operating system.

2) Apache ZooKeeper pre-installed (How to install ZooKeeper on Ubuntu 14.04)

3) Apache Storm 0.10.0 pre-installed (How to install Storm on Ubuntu 14.04)

Storm Trident Java Example

Trident is an extension of Storm. Like Storm, Trident was developed at Twitter. The main motivation behind Trident is to provide a high-level abstraction on top of Storm, along with stateful stream processing and low-latency distributed querying.

Trident still uses spouts and bolts, but these low-level components are auto-generated by Trident before execution. Trident provides higher-level primitives: functions, filters, joins, grouping, and aggregations.
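The example below uses functions (FormatCall, CSVSplit); for comparison, a Trident filter simply decides whether to keep a tuple. Here is a minimal sketch; the MinDuration class and its threshold are hypothetical and not part of the example that follows:

import storm.trident.operation.BaseFilter;
import storm.trident.tuple.TridentTuple;

// Hypothetical filter: keeps only calls at least minSeconds long.
public class MinDuration extends BaseFilter {
	private static final long serialVersionUID = 1L;
	private final int minSeconds;

	public MinDuration(int minSeconds) {
		this.minSeconds = minSeconds;
	}

	@Override
	public boolean isKeep(TridentTuple tuple) {
		// Keep the tuple only if its first field (duration) meets the threshold.
		return tuple.getInteger(0) >= minSeconds;
	}
}

A filter is attached with the same each(...) call used for functions, e.g. .each(new Fields("duration"), new MinDuration(10)).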

Add these libraries to your Java project's build path.

/usr/local/storm/lib/*

FormatCall.java

import backtype.storm.tuple.Values;
import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

// Trident function that combines the caller and receiver numbers
// into a single "from - to" string identifying the call.
public class FormatCall extends BaseFunction {
	private static final long serialVersionUID = 1L;

	@Override
	public void execute(TridentTuple tuple, TridentCollector collector) {
		String fromMobileNumber = tuple.getString(0);
		String toMobileNumber = tuple.getString(1);
		// Emit a single new field containing the formatted call key.
		collector.emit(new Values(fromMobileNumber + " - " + toMobileNumber));
	}
}
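In the topology below, FormatCall is applied to the first two fields of each record and its output is attached as a new "call" field, which becomes the key for grouping and counting.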

CSVSplit.java

import backtype.storm.tuple.Values;
import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

// Trident function that splits a comma-separated string and emits
// one tuple per non-empty entry.
public class CSVSplit extends BaseFunction {
	private static final long serialVersionUID = 1L;

	@Override
	public void execute(TridentTuple tuple, TridentCollector collector) {
		for (String word : tuple.getString(0).split(",")) {
			if (word.length() > 0) {
				collector.emit(new Values(word));
			}
		}
	}
}
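CSVSplit is used on the DRPC stream: it splits a comma-separated query argument such as "1234123401 - 1234123402,1234123401 - 1234123403" into individual call keys, one tuple per key.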

LogAnalyserTrident.java

import java.util.*;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import storm.trident.TridentState;
import storm.trident.TridentTopology;
import storm.trident.operation.builtin.FilterNull;
import storm.trident.operation.builtin.Count;
import storm.trident.operation.builtin.Sum;
import storm.trident.operation.builtin.MapGet;
import storm.trident.operation.builtin.Debug;
import storm.trident.testing.FeederBatchSpout;
import storm.trident.testing.MemoryMapState;
import org.apache.storm.shade.com.google.common.collect.*;

public class LogAnalyserTrident {
	public static void main(String[] args) throws Exception {
		System.out.println("Log Analyser Trident");
		TridentTopology topology = new TridentTopology();

		// Test spout that lets us feed batches of call records by hand.
		FeederBatchSpout testSpout = new FeederBatchSpout(ImmutableList.of(
				"fromMobileNumber", "toMobileNumber", "duration"));

		// Format each record as a "from - to" call key, group by that key,
		// and keep a running count per key in in-memory state.
		TridentState callCounts = topology
				.newStream("fixed-batch-spout", testSpout)
				.each(new Fields("fromMobileNumber", "toMobileNumber"),
						new FormatCall(), new Fields("call"))
				.groupBy(new Fields("call"))
				.persistentAggregate(new MemoryMapState.Factory(), new Count(),
						new Fields("count"));

		LocalDRPC drpc = new LocalDRPC();

		// DRPC stream 1: look up the count for a single call key.
		topology.newDRPCStream("call_count", drpc).stateQuery(callCounts,
				new Fields("args"), new MapGet(), new Fields("count"));

		// DRPC stream 2: split a comma-separated list of call keys, look up
		// each key's count, echo it via Debug, drop nulls, and sum the counts.
		topology.newDRPCStream("multiple_call_count", drpc)
				.each(new Fields("args"), new CSVSplit(), new Fields("call"))
				.groupBy(new Fields("call"))
				.stateQuery(callCounts, new Fields("call"), new MapGet(),
						new Fields("count"))
				.each(new Fields("call", "count"), new Debug())
				.each(new Fields("count"), new FilterNull())
				.aggregate(new Fields("count"), new Sum(), new Fields("sum"));

		// Run the topology on an in-process local cluster.
		Config conf = new Config();
		LocalCluster cluster = new LocalCluster();
		cluster.submitTopology("trident", conf, topology.build());

		// Feed the same four call pairs ten times with random durations.
		Random randomGenerator = new Random();
		int idx = 0;
		while (idx < 10) {
			testSpout.feed(ImmutableList.of(new Values("1234123401",
					"1234123402", randomGenerator.nextInt(60))));
			testSpout.feed(ImmutableList.of(new Values("1234123401",
					"1234123403", randomGenerator.nextInt(60))));
			testSpout.feed(ImmutableList.of(new Values("1234123401",
					"1234123404", randomGenerator.nextInt(60))));
			testSpout.feed(ImmutableList.of(new Values("1234123402",
					"1234123403", randomGenerator.nextInt(60))));
			idx = idx + 1;
		}

		System.out.println("DRPC : Query starts");
		System.out.println(drpc
				.execute("call_count", "1234123401 - 1234123402"));
		System.out.println(drpc.execute("multiple_call_count",
				"1234123401 - 1234123402,1234123401 - 1234123403"));
		System.out.println("DRPC : Query ends");
		cluster.shutdown();
		drpc.shutdown();
		// For a remote DRPC server, a DRPCClient would replace LocalDRPC:
		// DRPCClient client = new DRPCClient("drpc.server.location", 3772);
	}
}
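The commented-out DRPCClient line points at querying a standalone DRPC server instead of the in-process LocalDRPC. A minimal sketch of that usage is below; the host name is a placeholder, and the DRPCClient constructor has changed across Storm versions (newer releases expect a config map argument), so check the API of your release:

import backtype.storm.utils.DRPCClient;

public class RemoteCallCountClient {
	public static void main(String[] args) throws Exception {
		// Placeholder host; 3772 is Storm's default DRPC port.
		DRPCClient client = new DRPCClient("drpc.server.location", 3772);
		// Same function name and argument format as the LocalDRPC queries above.
		System.out.println(client.execute("call_count", "1234123401 - 1234123402"));
	}
}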

Step 1 - Open a new terminal (CTRL + ALT + T) and start ZooKeeper.

$ /usr/local/zookeeper/bin/zkServer.sh start

Step 2 - Open a new terminal (CTRL + ALT + T). Change the directory to /usr/local/storm

$ cd /usr/local/storm

Step 3 - Start Nimbus

$ ./bin/storm nimbus

Step 4 - Open a new terminal (CTRL + ALT + T). Change the directory to /usr/local/storm

$ cd /usr/local/storm

Step 5 - Start the supervisor

$ ./bin/storm supervisor

FormatCall.java
CSVSplit.java
LogAnalyserTrident.java

Step 6 - Compile all of the above programs and run the LogAnalyserTrident class.

$ javac -cp "/usr/local/storm/lib/*" *.java
$ java -cp "/usr/local/storm/lib/*":. LogAnalyserTrident

Output

DRPC : Query starts
[["1234123401 - 1234123402",10]]
DEBUG: [1234123401 - 1234123402, 10]
DEBUG: [1234123401 - 1234123403, 10]
[[20]]
DRPC : Query ends
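The counts line up with the feed loop: each call pair is fed once per iteration over ten iterations, so "1234123401 - 1234123402" has a count of 10, and the multiple_call_count query sums the counts of its two keys (10 + 10) to produce 20.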
