CUHK-IEMS5730-HW3
Question 1: Kafka Integration with Storm
(a) Multi-node Kafka Cluster Setup
The screenshots show that all processes are running correctly.
Here is the description of the topic "test":
Here is the console-producer demo:
Here is the console-consumer demo:
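For reference, these demos can be reproduced with Kafka's bundled command-line scripts. The commands below are a sketch: the exact flags depend on the Kafka release (older consumers take --zookeeper, newer ones take --bootstrap-server), and the hostnames match the ones used elsewhere in this report.
$ bin/kafka-topics.sh --describe --zookeeper master:2181 --topic test
$ bin/kafka-console-producer.sh --broker-list master:9092 --topic test
$ bin/kafka-console-consumer.sh --zookeeper master:2181 --topic test --from-beginning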
(b) Find the Most Frequently Used Words
Here are the top-20 most frequently used words:
~/hw3$ head -20 wordcount_result.txt
the 80777
and 52754
of 47008
to 25478
I 23545
in 19698
that 18190
And 18141
a 17507
his 13299
he 13133
is 12715
my 12306
shall 12102
for 11443
be 11284
with 11121
not 11110
unto 9212
they 8353
Here is the source code of the Kafka producer, which reads the input file and writes it line by line into the Kafka pipeline:
package kafka;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class KafkaFileProducer extends Thread {
private static final String topicName = "wordcount";
public static final String fileName = "/home/tian/hw2-src/StormData.txt";
private final KafkaProducer<String, String> producer;
private final Boolean isAsync;
public KafkaFileProducer(String topic, Boolean isAsync) {
Properties props = new Properties();
props.put("bootstrap.servers", "master:9092");
props.put("acks","1");
props.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
producer = new KafkaProducer<String, String>(props);
this.isAsync = isAsync;
}
public void sendMessage(String key, String value) {
// only the synchronous path is implemented: send() followed by get() blocks
// until the broker acknowledges the record, so the isAsync flag has no effect;
// the key argument is not attached to the record
try {
producer.send(new ProducerRecord<String, String>(topicName, value)).get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
public static void main(String [] args){
KafkaFileProducer producer = new KafkaFileProducer(topicName, false);
int lineCount = 0;
FileInputStream fis;
BufferedReader br = null;
try {
fis = new FileInputStream(fileName);
//Construct BufferedReader from InputStreamReader
br = new BufferedReader(new InputStreamReader(fis));
String line = null;
while ((line = br.readLine()) != null) {
lineCount++;
producer.sendMessage(lineCount+"", line);
}
producer.producer.close();
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
// br may still be null if opening the file failed
if (br != null) {
br.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
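Once the project is packaged (see the pom.xml section below), the producer can be launched from the assembled jar. This is a sketch; the jar name assumes the default jar-with-dependencies naming produced by the assembly plugin:
$ java -cp target/KafkaWordCount-1.0-jar-with-dependencies.jar kafka.KafkaFileProducer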
Here is the source code of KafkaWordCount.java:
package kafka;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.kafka.*;
import org.apache.storm.spout.SchemeAsMultiScheme;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.util.*;
public class KafkaWordCount {
public static class SplitSentenceBolt extends BaseRichBolt {
private OutputCollector collector;
@Override
public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
this.collector = outputCollector;
}
@Override
public void execute(Tuple tuple) {
String line = tuple.getStringByField("str");
// split with space
String[] words = line.split(" ");
for (String word : words) {
if (!word.isEmpty()) {
this.collector.emit(tuple,new Values(word));
counter(CounterType.EMIT);
}
}
this.collector.ack(tuple);
counter(CounterType.ACK);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("word"));
}
}
public static class WordCountBolt extends BaseRichBolt {
private OutputCollector collector;
private LinkedHashMap<String, Integer> counterMap;
private int NumOfThe = 10;
@Override
public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
this.collector = outputCollector;
this.counterMap = new LinkedHashMap<String, Integer>();
}
@Override
public void execute(Tuple tuple) {
String word = tuple.getStringByField("word");
// deliberately fail the first few tuples carrying the word "the";
// the anchored tuple tree is failed and the KafkaSpout replays the original line
if (word.equals("the") && NumOfThe > 0){
NumOfThe--;
System.out.println("failed word \"the\", " + NumOfThe + " failures remaining.");
this.collector.fail(tuple);
}else{
if (counterMap.containsKey(word)) {
counterMap.put(word, counterMap.get(word) + 1);
} else {
counterMap.put(word, 1);
}
this.collector.ack(tuple);
counter(CounterType.ACK);
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {}
@Override
public void cleanup() {
// dump counters into the console
dumpCounters();
System.out.println("cleanup, sortByValue counterMap start");
// sort and save result into local file
Utils.sortByValue(counterMap, new Comparator<Integer>() {
@Override
public int compare(Integer o1, Integer o2) {
return o2.compareTo(o1);
}
});
System.out.println("cleanup, start to save counterMap into file");
FileWriter fw = null;
BufferedWriter writer = null;
try {
fw = new FileWriter(RESULT_PATH);
writer = new BufferedWriter(fw);
for (Map.Entry<String, Integer> entry : counterMap.entrySet()) {
writer.write(entry.getKey() + "\t" + String.valueOf(entry.getValue()));
writer.newLine();
}
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
if (writer != null) {
writer.close();
}
if (fw != null) {
fw.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
System.out.println("cleanup, end save counterMap into file");
}
}
// Counter Code START
// !!! LOCAL USED ONLY
public static int emit_counter = 0;
public static int ack_counter = 0;
public static enum CounterType {EMIT, ACK};
public static synchronized void counter(KafkaWordCount.CounterType type) {
if (type == KafkaWordCount.CounterType.EMIT) {
emit_counter += 1;
} else if (type == KafkaWordCount.CounterType.ACK) {
ack_counter += 1;
}
}
public static void dumpCounters() {
System.out.println("--------DUMP COUNTERS START--------");
System.out.println("The number of tuple emitted:" + emit_counter);
System.out.println("The number of tuple acked:" + ack_counter);
System.out.println("The number of tuple failed:" + (emit_counter - ack_counter));
System.out.println("--------DUMP COUNTERS END--------");
}
// Counter Code END
// these two paths need to be replaced with local paths when debugging locally
public static String DATA_PATH = "/home/tian/hw2-src/StormData.txt";
public static final String RESULT_PATH = "/home/tian/hw3/wordcount_result.txt";
public static void main(String[] args) {
TopologyBuilder builder = new TopologyBuilder();
String zkConnString = "master:2181,slave-1:2181,slave-2:2181";
String topicName = "wordcount";
BrokerHosts hosts = new ZkHosts(zkConnString);
SpoutConfig spoutConfig = new SpoutConfig(hosts, topicName , "/" + topicName, UUID.randomUUID().toString());
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
// the KafkaSpout emits each Kafka message as a tuple with the single field "str"
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
builder.setSpout("spout", kafkaSpout, 1);
builder.setBolt("split", new KafkaWordCount.SplitSentenceBolt(), 3).shuffleGrouping("spout");
builder.setBolt("count", new KafkaWordCount.WordCountBolt(), 3).globalGrouping("split");
Config conf = new Config();
conf.setDebug(false);
conf.setMaxSpoutPending(1024);
try {
if (args != null && args.length > 0) {
conf.setNumWorkers(4);
StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
} else {
conf.setMaxTaskParallelism(3);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("word-count", conf, builder.createTopology());
// keep the local cluster alive long enough for the topology to finish;
// increase the sleep time if processing takes longer than one minute
Thread.sleep(60 * 1000);
cluster.shutdown();
}
} catch (Exception e) {
System.out.println("submit failed with error:" + e.toString());
}
}
}
class Utils {
public static <K, V> void sortByValue(
LinkedHashMap<K, V> m, final Comparator<? super V> c) {
List<Map.Entry<K, V>> entries = new ArrayList<Map.Entry<K, V>>(m.entrySet());
Collections.sort(entries, new Comparator<Map.Entry<K, V>>() {
@Override
public int compare(Map.Entry<K, V> lhs, Map.Entry<K, V> rhs) {
return c.compare(lhs.getValue(), rhs.getValue());
}
});
m.clear();
for (Map.Entry<K, V> e : entries) {
m.put(e.getKey(), e.getValue());
}
}
}
Here is the pom.xml, which indicates the dependencies I use:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>test</groupId>
<artifactId>KafkaWordCount</artifactId>
<version>1.0</version>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<!-- see comment below... This fixes an annoyance with intellij -->
<provided.scope>provided</provided.scope>
<hbase.version>0.98.4-hadoop2</hbase.version>
</properties>
<profiles>
<profile>
<id>intellij</id>
<properties>
<provided.scope>compile</provided.scope>
</properties>
</profile>
</profiles>
<dependencies>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>1.0.6</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-kafka-client</artifactId>
<version>1.0.6</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.8.2</artifactId>
<version>0.8.1</version>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId><!-- important -->
<artifactId>zookeeper</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-kafka</artifactId>
<version>1.1.1</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.7</source>
<target>1.7</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
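With this pom.xml, the topology can be built and submitted roughly as follows (a sketch; the topology name passed as args[0] is illustrative):
$ mvn clean package
$ storm jar target/KafkaWordCount-1.0-jar-with-dependencies.jar kafka.KafkaWordCount wordcount-topology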
(c) Scaling the topology
This is the screenshot of the Storm UI when testing with 3 Kafka spouts, 3 split bolts, and 3 count bolts.
This is the screenshot of the Storm UI when testing with 1 Kafka spout, 3 split bolts, and 3 count bolts:
This is the screenshot of the Storm UI when testing with 1 Kafka spout, 1 split bolt, and 3 count bolts:
This is the screenshot of the Storm UI when testing with 3 Kafka spouts, 10 split bolts, and 10 count bolts:
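The configurations above correspond to different parallelism hints passed to setSpout()/setBolt() in main(), with the topology resubmitted each time. Alternatively, Storm's rebalance command can change the executor counts of a running topology; a sketch with an illustrative topology name:
$ storm rebalance wordcount-topology -n 4 -e spout=3 -e split=10 -e count=10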
I don't know how to check the exact processing time of each configuration. Could the TA give us a tutorial on this part of the homework?