storm编程-项目实战篇-wordcount程序

部分转载:

(1)Open()

是初始化方法

(2)close()

在该spout关闭前执行,但是并不能得到保证其一定被执行,kill -9时不执行,Storm kill {topoName} 时执行

(3)activate()

当Spout已经从失效模式中**时被调用。该Spout的nextTuple()方法很快就会被调用。

(4)deactivate ()

当Spout已经失效时被调用。在Spout失效期间,nextTuple不会被调用。Spout将来可能会也可能不会被重新**。

(5)nextTuple()

当调用nextTuple()方法时,Storm要求Spout发射元组到输出收集器(OutputCollecctor)。NextTuple方法应该是非阻塞的,所以,如果Spout没有元组可以发射,该方法应该返回。nextTuple()、ack()和fail()方法都在Spout任务的单一线程内紧密循环被调用。当没有元组可以发射时,可以让nextTuple去sleep很短的时间,例如1毫秒,这样就不会浪费太多的CPU资源。

(6)ack()

成功处理tuple回调方法

(7)fail()

处理失败tuple回调方法

原则:通常情况下(Shell和事务型的除外),实现一个Spout,可以直接实现接口IRichSpout,如果不想写多余的代码,可以直接继承BaseRichSpout

bolt的最顶层抽象是IBolt接口

(1)prepare()

prepare ()方法在集群的工作进程内被初始化时被调用,提供了Bolt执行所需要的环境。

(2)execute()

接受一个tuple进行处理,也可emit数据到下一级组件。

(3)cleanup()

Cleanup方法当一个IBolt即将关闭时被调用。不能保证cleanup()方法一定会被调用,因为Supervisor可以对集群的工作进程使用kill -9命令强制杀死进程命令。

如果在本地模式下运行Storm,当拓扑被杀死的时候,可以保证cleanup()方法一定会被调用。

实现一个Bolt,可以实现IRichBolt接口或继承BaseRichBolt,如果不想自己处理结果反馈,可以实现 IBasicBolt接口或继承BaseBasicBolt,它实际上相当于自动做了prepare方法和collector.emit.ack(inputTuple)。

storm编程-项目实战篇-wordcount程序

stormAPI中分为Component,spout,bolt,tail。topology主要是由spout和bolt组成:

首先创建一个spout类用来读取数据源,并向下一个bolt 发射tuple:

package cn.com.sjzxy.edu.wordcount;

import java.util.Map;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

public class WordCountSpout extends BaseRichSpout {
    private static final long serialVersionUID = 1L;
    private SpoutOutputCollector collector=null;
    

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declare) {
        // TODO Auto-generated method stub
        declare.declare(new Fields("love"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        // TODO Auto-generated method stub
        return null;
    }

    @Override
    public void nextTuple() {
        // TODO Auto-generated method stub
        collector.emit(new Values("come baby are you my cute baby Oh you hive no number in heart ?"));
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

    @Override
    public void open(Map arg0, TopologyContext arg1, SpoutOutputCollector arg2) {
        // TODO Auto-generated method stub
        this.collector=arg2;
    }

}
创建一个WordCountSplitBolt 类将从spout发送过来的单行文本记录(句子)切分成单词

package cn.com.sjzxy.edu.wordcount;

import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class WordCountSplitBolt extends BaseRichBolt{
    /**
     * 
     */
    private static final long serialVersionUID = 1L;
    
    private OutputCollector collector=null;
    @Override
    public void execute(Tuple arg0) {
        // TODO Auto-generated method stub
        String words=arg0.getString(0);
        String[] argWords=words.split(" ");
        for(String word : argWords) {
            collector.emit(new Values(word,1));
        }
        
    }

    @Override
    public void prepare(Map arg0, TopologyContext arg1, OutputCollector arg2) {
        // TODO Auto-generated method stub
        this.collector=arg2;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer arg0) {
        // TODO Auto-generated method stub
        arg0.declare(new Fields("word","num"));
    }

}

 

创建wordCountBolt将单词的频数进行累加;

package cn.com.sjzxy.edu.wordcount;

import java.util.HashMap;
import java.util.Map;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;

public class WordCountBolt extends BaseRichBolt{
    /**
     * 
     */
    private static final long serialVersionUID = 1L;
    private Map<String, Integer> map = new HashMap<String, Integer>();
    @Override
    public void execute(Tuple arg0) {
        // TODO Auto-generated method stub
        String word=arg0.getString(0);
        Integer num=arg0.getInteger(1);
        if(map.containsKey(word)) {
            Integer count= map.get(word);
            map.put(word, count+num);
        }else {
            map.put(word, num);
        }
        System.err.println(Thread.currentThread().getId() + "  word:" + word + "  num:" + map.get(word));
        
    }

    @Override
    public void prepare(Map arg0, TopologyContext arg1, OutputCollector arg2) {
        // TODO Auto-generated method stub
        
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer arg0) {
    
    }
    
            

}
最后主程序main方法构造topology拓扑结构,本地方法运行:

package cn.com.sjzxy.edu.wordcount;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

public class WorCountMain  {
    public static void main(String[] args) {
            TopologyBuilder builder=new TopologyBuilder();
            builder.setSpout("WordCountSpout", new WordCountSpout(),1);
            builder.setBolt("WordCountSplit", new WordCountSplitBolt(),2).shuffleGrouping("WordCountSpout");
            builder.setBolt("wordwordcount", new WordCountBolt(),4).fieldsGrouping("WordCountSplit", new Fields("word"));
            Config conf=new Config();
            conf.setNumWorkers(2);
            if (args.length > 0) {
                try {
                    // 4 分布式提交
                    StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
                } catch (Exception e) {
                    e.printStackTrace();
                }
            } else {
                // 5 本地模式提交
                LocalCluster localCluster = new LocalCluster();
                localCluster.submitTopology("wordcounttopology", conf, builder.createTopology());
            }
        }
}

 

 

 

storm编程-项目实战篇-wordcount程序