flume+kafka+storm的集成
第一步:启动storm:
1.1启动storm集群
master:
python bin/storm nimbus &
python bin/storm ui &
python bin/storm logviewer &
slave:
python bin/storm supervisor &
python bin/storm logviewer &
1.2开发storm+kafka的集成代码:
stormKafka.java:
package stormKafkaPackage;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
/**
 * Demo topology wiring a Kafka spout to a console-printing bolt.
 *
 * <p>With a command-line argument the topology is submitted to the remote
 * Storm cluster under that name (args[0]); without arguments it runs inside
 * an in-process {@link LocalCluster} for local testing.
 */
public class stormKafka {
    public static void main(String[] args) throws Exception {
        // Kafka topic to consume, ZooKeeper node for offset storage, and
        // the spout's consumer id.
        final String kafkaTopic = "badou_storm_kafka_test";
        final String offsetZkRoot = "/badou_storm_kafka_test";
        final String consumerId = "kafkaSpout";

        // ZooKeeper ensemble that tracks the Kafka brokers.
        BrokerHosts zkHosts = new ZkHosts("master:2181");
        SpoutConfig spoutConfig = new SpoutConfig(zkHosts, kafkaTopic, offsetZkRoot, consumerId);
        // Replay the topic from the earliest offset on every (re)start.
        spoutConfig.forceFromStart = true;
        // Decode raw Kafka messages as plain strings.
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

        // spout(2 executors) -> printer, tuples shuffled across printer tasks.
        TopologyBuilder topology = new TopologyBuilder();
        topology.setSpout("spout", new KafkaSpout(spoutConfig), 2);
        topology.setBolt("printer", new PrinterBolt())
                .shuffleGrouping("spout");

        Config stormConfig = new Config();
        stormConfig.setDebug(false);

        boolean submitToCluster = args != null && args.length > 0;
        if (submitToCluster) {
            stormConfig.setNumWorkers(3);
            StormSubmitter.submitTopology(args[0], stormConfig, topology.createTopology());
        } else {
            stormConfig.setMaxTaskParallelism(3);
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology("kafka", stormConfig, topology.createTopology());
            // Runs until killed; sleep + localCluster.shutdown() here would
            // bound the run for an automated test.
        }
    }
}
PrinterBolt.java:
package stormKafkaPackage;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;
/**
 * Terminal (sink) bolt that writes every incoming tuple to stdout — used to
 * verify end-to-end that messages flow through the flume/kafka/storm pipeline.
 */
public class PrinterBolt extends BaseBasicBolt {

    /** Print the received tuple; nothing is emitted downstream. */
    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        System.out.println(input);
    }

    /** Declares no output fields: this bolt is a pure sink. */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }
}
1.3运行storm程序脚本:
python /usr/local/src/apache-storm-0.9.3/bin/storm jar \
/root/IdeaProjects/stormtest/target/stormtest-1.0-SNAPSHOT.jar \
stormKafkaPackage.stormKafka \
guoqing_remote
第二步:启动kafka:
./bin/kafka-server-start.sh config/server.properties
第三步:启动flume:
3.1编写flume配置文件(即下面启动命令引用的 ./conf/flume_kafka.conf,其中 sink 指向 kafka 的 badou_storm_kafka_test topic):
3.2启动命令:
./bin/flume-ng agent --conf conf --conf-file ./conf/flume_kafka.conf --name a1 -Dflume.root.logger=INFO,console
3.3测试: