CUHK-IEMS5730-HW3

Question 1: Kafka Integration with Storm

(a) Multi-node Kafka Cluster Setup

The screenshots below show that all processes run well.
[Screenshot: Kafka broker and ZooKeeper processes running on all nodes]
Here is the description of the topic "test", together with a console-producer demo:
[Screenshot: topic description and console-producer demo]
Here is the console-consumer demo:
[Screenshot: console-consumer demo]
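
For reference, here is a sketch of the commands behind these screenshots; the Kafka installation directory, replication factor, and partition count are assumptions, while the hostnames match the cluster used elsewhere in this report:

# create and inspect the topic (run from the Kafka installation directory)
bin/kafka-topics.sh --create --zookeeper master:2181 --replication-factor 2 --partitions 3 --topic test
bin/kafka-topics.sh --describe --zookeeper master:2181 --topic test

# console demos: lines typed into the producer appear in the consumer
bin/kafka-console-producer.sh --broker-list master:9092 --topic test
bin/kafka-console-consumer.sh --zookeeper master:2181 --topic test --from-beginning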

(b) Find the Most Frequently Used Words

Here are the top 20 most frequently used words:

tian@master:~/hw3$ head -20 wordcount_result.txt
the     80777
and     52754
of      47008
to      25478
I       23545
in      19698
that    18190
And     18141
a       17507
his     13299
he      13133
is      12715
my      12306
shall   12102
for     11443
be      11284
with    11121
not     11110
unto    9212
they    8353

Here is the source code of the Kafka producer, which reads the input file and writes each line into the Kafka pipeline:

package kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class KafkaFileProducer {

    private static final String topicName = "wordcount";
    public static final String fileName = "/home/tian/hw2-src/StormData.txt";

    private final KafkaProducer<String, String> producer;
    private final Boolean isAsync;

    public KafkaFileProducer(String topic, Boolean isAsync) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "master:9092");
        props.put("acks","1");
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        producer = new KafkaProducer<String, String>(props);
        this.isAsync = isAsync;
    }

    public void sendMessage(String key, String value) {
        try {
            if (isAsync) { // asynchronous: hand the record to the producer and return at once
                producer.send(new ProducerRecord<String, String>(topicName, key, value));
            } else { // synchronous: block until the broker acknowledges this record
                producer.send(new ProducerRecord<String, String>(topicName, key, value)).get();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        KafkaFileProducer producer = new KafkaFileProducer(topicName, false);
        int lineCount = 0;
        BufferedReader br = null;
        try {
            // read the input file line by line
            br = new BufferedReader(new InputStreamReader(new FileInputStream(fileName)));
            String line;
            while ((line = br.readLine()) != null) {
                lineCount++;
                // use the line number as the record key
                producer.sendMessage(lineCount + "", line);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (br != null) {
                    br.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
            // flush and release the producer even if reading failed
            producer.producer.close();
        }
    }
}
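
This producer uses the synchronous path (isAsync is false), so each line is sent only after the previous one is acknowledged, which is slow but keeps ordering and the line count reliable. As a minimal sketch of the asynchronous alternative with a completion callback, using the same kafka-clients API (the class name AsyncSendDemo and the sample record are illustrative assumptions):

package kafka;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;

public class AsyncSendDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "master:9092");
        props.put("acks", "1");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);

        // send() returns immediately; the callback runs once the broker acks (or rejects) the record
        producer.send(new ProducerRecord<String, String>("wordcount", "1", "hello storm"),
                new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata metadata, Exception e) {
                        if (e != null) {
                            e.printStackTrace();
                        } else {
                            System.out.println("acked: partition=" + metadata.partition()
                                    + ", offset=" + metadata.offset());
                        }
                    }
                });

        // close() blocks until buffered records (and their callbacks) are flushed
        producer.close();
    }
}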

Here is the source code of KafkaWordCount.java:

package kafka;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.kafka.*;
import org.apache.storm.spout.SchemeAsMultiScheme;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.util.*;

public class KafkaWordCount {

    public static class SplitSentenceBolt extends BaseRichBolt {

        private OutputCollector collector;

        @Override
        public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
            this.collector = outputCollector;
        }

        @Override
        public void execute(Tuple tuple) {
            String line = tuple.getStringByField("str");
            // split the line on spaces
            String[] words = line.split(" ");
            for (String word : words) {
                if (!word.isEmpty()) {
                    // anchor each word to its input line so a failed word replays the line
                    this.collector.emit(tuple, new Values(word));
                    counter(CounterType.EMIT);
                }
            }
            // ack the input line; do not bump the ACK counter here, so that
            // emit_counter (words emitted) and ack_counter (words acked) stay comparable
            this.collector.ack(tuple);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
            outputFieldsDeclarer.declare(new Fields("word"));
        }
    }

    public static class WordCountBolt extends BaseRichBolt {

        private OutputCollector collector;
        private LinkedHashMap<String, Integer> counterMap;
        private int NumOfThe = 10;

        @Override
        public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
            this.collector = outputCollector;
            this.counterMap = new LinkedHashMap<String, Integer>();
        }

        @Override
        public void execute(Tuple tuple) {
            String word = tuple.getStringByField("word");
            if (word.equals("the") && NumOfThe > 0){
                NumOfThe--;
                System.out.println("fail word \"the\" "+ NumOfThe + " time.");
                this.collector.fail(tuple);
            }else{
                if (counterMap.containsKey(word)) {
                    counterMap.put(word, counterMap.get(word) + 1);
                } else {
                    counterMap.put(word, 1);
                }
                this.collector.ack(tuple);
                counter(CounterType.ACK);
            }

        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {}

        @Override
        public void cleanup() {
            // dump counters into the console
            dumpCounters();

            System.out.println("cleanup, sortByValue counterMap start");
            // sort and save result into local file
            Utils.sortByValue(counterMap, new Comparator<Integer>() {
                @Override
                public int compare(Integer o1, Integer o2) {
                    return o2.compareTo(o1);
                }
            });

            System.out.println("cleanup, start to save counterMap into file");
            FileWriter fw = null;
            BufferedWriter writer = null;
            try {
                fw = new FileWriter(RESULT_PATH);
                writer = new BufferedWriter(fw);
                for (Map.Entry<String, Integer> entry : counterMap.entrySet()) {
                    writer.write(entry.getKey() + "\t" + String.valueOf(entry.getValue()));
                    writer.newLine();
                }
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                try {
                    if (writer != null) {
                        writer.close();
                    }
                    if (fw != null) {
                        fw.close();
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            System.out.println("cleanup, end save counterMap into file");
        }
    }

    // Counter Code START
    // !!! LOCAL USED ONLY
    public static int emit_counter = 0;
    public static int ack_counter = 0;

    public enum CounterType {EMIT, ACK}

    public static synchronized void counter(KafkaWordCount.CounterType type) {
        if (type == KafkaWordCount.CounterType.EMIT) {
            emit_counter += 1;
        } else if (type == KafkaWordCount.CounterType.ACK) {
            ack_counter += 1;
        }
    }

    public static void dumpCounters() {
        System.out.println("--------DUMP COUNTERS START--------");
        System.out.println("The number of tuple emitted:" + emit_counter);
        System.out.println("The number of tuple acked:" + ack_counter);
        System.out.println("The number of tuple failed:" + (emit_counter - ack_counter));
        System.out.println("--------DUMP COUNTERS END--------");
    }
    // Counter Code END

    // these two paths need to be changed to local paths when debugging locally
    public static String DATA_PATH = "/home/tian/hw2-src/StormData.txt";
    public static final String RESULT_PATH = "/home/tian/hw3/wordcount_result.txt";

    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();

        String zkConnString = "master:2181,slave-1:2181,slave-2:2181";
        String topicName = "wordcount";

        BrokerHosts hosts = new ZkHosts(zkConnString);
        SpoutConfig spoutConfig = new SpoutConfig(hosts, topicName , "/" + topicName, UUID.randomUUID().toString());
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

        // the KafkaSpout reads messages from the topic (via the ZooKeeper offsets above)
        // and emits each one as a tuple with a single field named "str"
        KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
        builder.setSpout("spout", kafkaSpout, 1);
        builder.setBolt("split", new KafkaWordCount.SplitSentenceBolt(), 3).shuffleGrouping("spout");
        builder.setBolt("count", new KafkaWordCount.WordCountBolt(), 3).globalGrouping("split");

        Config conf = new Config();
        conf.setDebug(false);
        conf.setMaxSpoutPending(1024);
        try {
            if (args != null && args.length > 0) {
                conf.setNumWorkers(4);
                StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
            } else {
                conf.setMaxTaskParallelism(3);
                LocalCluster cluster = new LocalCluster();
                cluster.submitTopology("word-count", conf, builder.createTopology());

                // keep the local cluster alive long enough for the topology to finish;
                // extend this sleep (or stop manually) if more debugging time is needed
                Thread.sleep(60 * 1000);

                cluster.shutdown();
            }
        } catch (Exception e) {
            System.out.println("submit failed with error:" + e.toString());
        }
    }

}

class Utils {
    public static <K, V> void sortByValue(
            LinkedHashMap<K, V> m, final Comparator<? super V> c) {
        List<Map.Entry<K, V>> entries = new ArrayList<Map.Entry<K, V>>(m.entrySet());

        Collections.sort(entries, new Comparator<Map.Entry<K, V>>() {
            @Override
            public int compare(Map.Entry<K, V> lhs, Map.Entry<K, V> rhs) {
                return c.compare(lhs.getValue(), rhs.getValue());
            }
        });

        m.clear();
        for (Map.Entry<K, V> e : entries) {
            m.put(e.getKey(), e.getValue());
        }
    }
}

Here is the pom.xml, which shows the dependencies I use:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

<modelVersion>4.0.0</modelVersion>
    <groupId>test</groupId>
    <artifactId>KafkaWordCount</artifactId>
    <version>1.0</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <!-- see comment below... This fixes an annoyance with intellij -->
        <provided.scope>provided</provided.scope>
        <hbase.version>0.98.4-hadoop2</hbase.version>
    </properties>

    <profiles>
        <profile>
            <id>intellij</id>
            <properties>
                <provided.scope>compile</provided.scope>
            </properties>
        </profile>
    </profiles>

    <dependencies>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>1.0.6</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-kafka-client</artifactId>
            <version>1.0.6</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.8.2</artifactId>
            <version>0.8.1</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.zookeeper</groupId><!-- important -->
                    <artifactId>zookeeper</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-kafka</artifactId>
            <version>1.1.1</version>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>1.7</source>
                    <target>1.7</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
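
For completeness, here is a sketch of how I build and submit the topology; the jar name follows from the artifactId and version above, and the topology name argument (read as args[0] in main()) is my own choice:

mvn clean package
storm jar target/KafkaWordCount-1.0-jar-with-dependencies.jar kafka.KafkaWordCount kafka-wordcount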

(c) Scaling the Topology

This is the screenshot of the Storm UI when testing with 3 Kafka spouts, 3 split bolts, and 3 count bolts:
[Screenshot: Storm UI, 3 spouts / 3 split bolts / 3 count bolts]
This is the screenshot of the Storm UI when testing with 1 Kafka spout, 3 split bolts, and 3 count bolts:
[Screenshot: Storm UI, 1 spout / 3 split bolts / 3 count bolts]
This is the screenshot of the Storm UI when testing with 1 Kafka spout, 1 split bolt, and 3 count bolts:
[Screenshot: Storm UI, 1 spout / 1 split bolt / 3 count bolts]
This is the screenshot of the Storm UI when testing with 3 Kafka spouts, 10 split bolts, and 10 count bolts:
[Screenshot: Storm UI, 3 spouts / 10 split bolts / 10 count bolts]
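
The only difference between these runs is the set of parallelism hints passed when wiring the topology in main(); for example, the last configuration corresponds to the following excerpt, with the surrounding code unchanged from part (b):

        // excerpt from main(): only the parallelism hints change between runs
        builder.setSpout("spout", kafkaSpout, 3);   // 3 Kafka spout executors
        builder.setBolt("split", new KafkaWordCount.SplitSentenceBolt(), 10).shuffleGrouping("spout");
        builder.setBolt("count", new KafkaWordCount.WordCountBolt(), 10).globalGrouping("split");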
I don't know how to check the exact running time of each process, so could the TA give us a short tutorial on this part of the homework?

Question 2: PageRank

empty…