Storm-kafka集成——1.1.0版本storm中tuple取KafkaSpout数据详解
问题描述:
KafkaSpout拉取kafka topic数据,下一级bolt从kafkaspout获取数据,tuple到底采用什么方法取出spout中的消息呢?
KafkaSpout创建:
/* *根据数据源topic和zk_id创建并返回kafkaSpout * */ public static KafkaSpout init(String spout_topic,String zk_id){ KafkaSpoutConfig<String,String> kafkaSpoutConfig = KafkaSpoutConfig .builder(bootstrap_address,spout_topic) .setProp(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true") .setProp(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000) .setProp(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,30000) .setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getCanonicalName()) .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getCanonicalName()) .setOffsetCommitPeriodMs(10000)//控制spout多久向kafka提交一次offset .setGroupId(zk_id) .setMaxUncommittedOffsets(250) .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST) .build(); KafkaSpout<String, String> kafkaSpout = new KafkaSpout<>(kafkaSpoutConfig); return kafkaSpout; }
bolt逻辑处理:
public void execute(Tuple tuple, BasicOutputCollector collector) { this.collector = collector; System.out.println("MC开始读取数据"); String message = tuple.getStringByField("value"); //038,2018-03-21 14:51:47,17134906630,2018-03-21 14:53:03,V0330700,E02,036,null,E020005,01,V0350100 String strs[] = message.split(",", -1); String userProv = strs[0];//归属省份编码 String occurtime = strs[1];//最新信令产生时间 String deviceNumber = strs[2];//电话号码 String eventArea = strs[4]; //信令发生地市 String eventProv = strs[6]; //信令发生省份 String userArea = strs[10];//归属地市编码 //过滤漫入用户,只要漫出用户 if(userProv.equals("038") && !eventProv.equals("038")){ String provDescGuishu = null; String areaDescGuishu = null; String provDescOccur = null; String areaDescOccur = null; String typeGuishu = "省内"; String community = null; String mrmcType = "漫出"; String roamType = "国内"; String longitude = null; String latitude = null; String areaInfoGuishu = JedisUtil.get("zbarea|" + userArea);//用户归属地市新信息 if(areaInfoGuishu != null && !areaInfoGuishu.equals("")){ String strs1[] = areaInfoGuishu.split(":"); provDescGuishu = strs1[0]; areaDescGuishu = strs1[1]; } String areaInfo_occur = JedisUtil.get("zbarea|" + eventArea); if(areaInfo_occur != null && !areaInfo_occur.equals("")){ String strs2[] = areaInfo_occur.split(":"); provDescOccur = strs2[0]; areaDescOccur = strs2[1]; } //格式化目标数据: //电话|信令发生时间|用户归属省份描述|用户归属地市描述|用户归属类型|用户到访省份描述|用户到访地市描述|用户到访小区描述|漫入漫出类型描述|漫游类型|经度|纬度 String mrmc2merge = deviceNumber + "#" + occurtime + "#" + provDescGuishu + "#" + areaDescGuishu + "#" + typeGuishu + "#" + provDescOccur + "#" + areaDescOccur + "#" + community + "#" + mrmcType + "#" + roamType + "#" + longitude + "#" + latitude; System.out.println("MC数据" + mrmc2merge); collector.emit(new Values(mrmc2merge)); }else{ System.out.println(deviceNumber + "******被过滤漫入用户"); } }
topo结构构造
// Wire the topology: three independent Kafka sources, each feeding its own
// split bolt via shuffle grouping. Numbers are the parallelism hints.
TopologyBuilder builder = new TopologyBuilder();

// S1-MME signalling: 12 spout executors -> 36 split bolts.
builder.setSpout("s1mme_spout", init(s1mme_spout_topic, zk_s1mme_id), 12);
builder.setBolt("s1mme_split", new S1mmeSplitBolt(), 36).shuffleGrouping("s1mme_spout");

// SGs signalling: 3 spout executors -> 6 split bolts.
builder.setSpout("sgs_spout", init(sgs_spout_topic, zk_sgs_id), 3);
builder.setBolt("sgs_split", new SgsSpiltBolt(), 6).shuffleGrouping("sgs_spout");

// MR/MC signalling: 1 spout executor -> 3 split bolts.
builder.setSpout("mrmc_spout", init(mrmc_spout_topic, zk_mrmc_id), 1);
builder.setBolt("mrmc_split", new MrmcSplitBolt(), 3).shuffleGrouping("mrmc_spout");
首先我们来看tuple具备的属性:
tuple的几个主要方法:
tuple.getFields();//获取kafkaSpout的topic相关属性。tuple的属性列表:topic, partition, offset, key, value
tuple.getValues();//获取所有属性的值
tuple.getValue(i);//根据属性列表下标i获取对应的值(注意:该方法需要传入下标参数)
tuple.toString();//获取tuple的所有信息,包括数据来源,消息id,数据value等信息
tuple.getMessageId();//获取消息id
tuple.getSourceComponent();//获取数据源
tuple.getSourceStreamId();//获取本条消息所属stream(流)的id,注意不是消息id(消息id由getMessageId()获取)
tuple.getSourceTask();
上述代码的输出结果对应下图:
总结:取值时主要根据tuple的属性列表(topic, partition, offset, key, value)及对应的字段名或下标进行取值,例如tuple.getStringByField("value")。特别是使用getString、getLong这类方法时,1.X版本与0.X版本有所不同:在1.X版本中必须按上述字段名/下标方式取值。