WordCount in Storm
1. Create a Maven project
2. Add the dependencies to pom.xml
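The topology below only needs the storm-core artifact. A minimal sketch of the dependency block, assuming a Storm 1.x setup (the version number is an assumption; match it to your installation):

    <!-- storm-core: the only dependency this example needs.
         The version shown is an assumption; use the version of your Storm installation. -->
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-core</artifactId>
        <version>1.2.3</version>
        <!-- When submitting to a real cluster, add <scope>provided</scope>,
             since the cluster already supplies Storm; for local-mode runs
             from the IDE, keep the default compile scope. -->
    </dependency>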
3. The WordCount class
package com.wordcount;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

/**
 * @author rjsong
 */
public class WordCount {

    public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
        // Build the topology: spout -> split bolt -> count bolt
        TopologyBuilder tb = new TopologyBuilder();
        tb.setSpout("SpoutBolt", new SpoutBolt(), 2);
        tb.setBolt("SplitBolt", new SplitBolt(), 2).shuffleGrouping("SpoutBolt");
        // fieldsGrouping on "word" routes identical words to the same CountBolt task,
        // so each task keeps a consistent count for the words it owns
        tb.setBolt("CountBolt", new CountBolt(), 4).fieldsGrouping("SplitBolt", new Fields("word"));

        // Topology configuration
        Config conf = new Config();
        // Number of worker processes
        conf.setNumWorkers(2);

        // Submit the topology
        // Cluster mode:
        // StormSubmitter.submitTopology("myWordcount", conf, tb.createTopology());
        // Local mode:
        LocalCluster localCluster = new LocalCluster();
        localCluster.submitTopology("myWordcount", conf, tb.createTopology());
    }
}
4. The SpoutBolt class
package com.wordcount;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

import java.util.Map;

/**
 * @author rjsong
 */
public class SpoutBolt extends BaseRichSpout {

    SpoutOutputCollector collector;

    /**
     * Initialization: called once when the spout task starts.
     */
    public void open(Map map, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    /**
     * Called repeatedly by Storm; emits the same test sentence each time.
     */
    public void nextTuple() {
        collector.emit(new Values("hello world this is a test"));
    }

    /**
     * Declare the output field; the single field is named "test".
     */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("test"));
    }
}
5. The SplitBolt class
package com.wordcount;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.Map;

/**
 * @author rjsong
 */
public class SplitBolt extends BaseRichBolt {

    OutputCollector collector;

    /**
     * Initialization: called once when the bolt task starts.
     */
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    /**
     * Called for every incoming tuple: split the sentence on spaces and emit one tuple per word.
     */
    public void execute(Tuple input) {
        String line = input.getString(0);
        String[] split = line.split(" ");
        for (String word : split) {
            collector.emit(new Values(word));
        }
    }

    /**
     * Declare the output field; the single field is named "word".
     */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}
6. The CountBolt class
package com.wordcount;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

import java.util.HashMap;
import java.util.Map;

/**
 * @author rjsong
 */
public class CountBolt extends BaseRichBolt {

    OutputCollector collector;

    // In-memory word counts, local to each CountBolt task
    Map<String, Integer> map = new HashMap<String, Integer>();

    /**
     * Initialization: called once when the bolt task starts.
     */
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    /**
     * Called for every incoming word tuple: increment its count.
     */
    public void execute(Tuple input) {
        String word = input.getString(0);
        if (map.containsKey(word)) {
            Integer c = map.get(word);
            map.put(word, c + 1);
        } else {
            map.put(word, 1);
        }
        // Print the running counts for testing
        System.out.println("Result: " + map);
    }

    /**
     * Terminal bolt: no output fields to declare.
     */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }
}