spark-streaming消费kafka写入hbase踩坑实战
场景
otter同步mysql的数据到kafka,spark-streaming消费kafka,采用motator异步写hbase。mysql的insert,update,delete特别频繁,每秒都有大量的insert,delete,update
以下都是基于同一条数据操作
1. 坑 Delete(byte [] row), Put(byte[] row),
bufferedmutator写hbase的时候 同一条数据先 delete 再put ,
遇到的问题:
hbase显示没这个数据;就是bufferedmutator处理数据乱序了(每次测试都是hbase找不到数据)
解决的途径:
遇坑后,百度找到https://developer.aliyun.com/ask/129312?spm=a2c6h.13159736这篇文章。意思就是new 对象 加上时间戳(增加ms级别极端情况判断,对后者加1ms,见case5)
2.坑 Delete(byte [] row, long timestamp) ,Put(byte[] row, long ts)
代码里时间戳使用System.currentTimeMillis()
遇到的问题:
spark-streaming处理数据的时候,一个批次rdd遍历起来很快,难免两个System.currentTimeMillis()一样,此时就和方法1一样,达不到先delete,后insert的效果,依然没有解决(反复测试,确实有两个时间戳一样)
解决的途径:
然后又找到更细粒度的纳秒,System.nanoTime()作为参数传递
3.坑 Delete(byte [] row, long timestamp) ,Put(byte[] row, long ts)
代码里使用System.nanoTime() 作为参数传到delete和put对象里,写hbase是没问题的,此时看似解决问题了,但是又有其他新问题出现,写入hbase的timestamp的值就是System.nanoTime() 的返回值
以前对这个System.nanoTime()不了解,百度找了资料了解了点https://www.cnblogs.com/andy-songwei/p/10784049.html
遇到的问题:
1:HTable设置的ttl是根据System.currentTimeMillis()判断的,达不到想要的效果
2:System.nanoTime()这个返回值是 返回正在运行的Java虚拟机的高分辨率时间源的当前值,以纳秒计。还是随机的
3:System.nanoTime()不能与java.util.Date转换
4:服务器重启,System.nanoTime()的返回值与以前的返回值没有关系,可能比之前的值更小,那么hbase 的数据就不能操作了
5:还了解了hbase的时间戳 https://blog.****.net/inte_sleeper/article/details/11689059
每次操作的时间戳必须比以前旧数据的时间戳大,否则数据不会变化
4.坑 hbase-issues里找到1所说的bug
问题描述:
解决方法:
5.代码参照4方法优化
val keyTimesMap = new mutable.HashMap[String,Long]()
可变集合 保存 key和key对应的时间戳
在event_type是delete 的时候调用下边方法,在event_class不是delete的时候也调用这个方法