自定义kafka分区器Partitioner
研究分区器先从ProducerRecord看起,因为分区是在每条record的基础上实现的。
ProducerRecord的字段:
在构造函数中可以指定partition,消息会直接放在指定的分区上。如果不指定partion,那么就会以默认分区器,按照key的散列算法进行分区,分布在主题的所有分区上,有可能放在不可用的分区上。相同的key会有相同的散列值,相同的散列值会在同一个分区,也就是相同key的记录会被放在同一个分区,但是相同的分区内不一定都是相同的key。如果是记录中没有Key,就按照轮询算法将消息均匀的分布在主题的可用分区上。
在KafkaProducer中的partition方法注释中可以看到该方法就是用来计算partition,如果发过来的记录中含有partition信息,那么就返回partition信息,否则就调用配置的分区器去计算partition。
点进上面的第1210行的partition方法可以跳转到默认的分区器(DefaultPartitioner)中是如何实现分区的:
在分区器中,先从cluster中获取topic的分区列表,然后得到分区数。判断记录中的key是否为NUll,如果key是null,通过nextValue()得到topic的一个值,nextValue(topic)这个方法中第一次使用topic时会先产生一个随机数,以后再使用的时候会在这个随机数基础上进行自增。然后从cluster中获取这个topic中可用的partition,如果有可用的partition,就把刚才那个nextValue(topic)的值变成正数然后对可用的partition数量进行取余,返回一个分区。如果没有可用的分区,就会对整个partition列表的长度取余,这个时候返回的就是一个不可用的分区。(因为前面已经对可用的分区进行计算而没有得到结果,说明现在只剩下不可用的分区)。也就是在key为null的时候,优先分配可用分区。而在Key不为null时,是直接对所有分区列表的长度进行取余,而这种情况和Key为null时不一样,是因为这里没有先对可用分区进行计算,所以得到的就是可能是可用分区也可能是不可用分区。
在key不为null时,使用Utils.murmur2(keyBytes)对key的序列化字节数组进行hash算法,返回一个hash值,也就是这样会使相同的key会在同一个分区:
kafka的分区选择策略:
1.消息的key为null
加果key为null,则上次计算分区时使用的自增整数加一。然后判断topic的可用分区数是否大于0,如果大于0则使用获取的nextValue()的值和可用分区数取余。如果topic的可用分区数小于0,则用nextValue()的值与总的分区数取余。
2.消息key不为Null
这种情况就是根据key的hash算法(murmur2)计算得到的值与总的分区数取余。
虽然kafka为我们提供了默认的分区选择器,但是在有些业务中,比如我们只想让某一个key有一个专门的partition,这个partition中不包含其他key的信息,就可以自定义一个分区器,让这个Key进到特定的partition中,其余的key使用散列算法来处理,这时就可以用到自定义分区器。下面就来实现一下自定义分区器:
1.pom文件:
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.1.0</version>
</dependency>
</dependencies>
2.自定义分区器:
package demo;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.record.InvalidRecordException;
import org.apache.kafka.common.utils.Utils;
public class CustomPartitioner implements Partitioner{
@Override
public void configure(Map<String,?> configs){
}
//只需要重写partition方法实现我们自己的分区逻辑
@Override
public int partition(String topic, Object key, byte[] key, byte[] keyBytes,
Object value, byte[] valueBytes, Cluster cluster){
//从clusten中获取这个topicn分区信息,在此我门建立了一个partition为5的topic
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
//获取分区总个数
int numPartitions = partitions.size();
if(key == null){
throw new InvalidRecordException("We expect all messages to have customer name as key...");
}else{
//假设这里我们让key为hangzhou的记录放在一个专门的partition中
if(((String)key).equals("hangzhou")){
/*
* 返回partition,partition是int类型的,值就是一个在numPartitions范围内的一个值,范围从0开始,
* 这里我们定义返回partition为最后一个分区(也就是序号最大的partition),这样才可以保证在else的情况中我们的partition分配的范围是在除去专属于"hangzhou"的partition中,
* 因为在else中减1之后,Partition不会达到这个原本最大的partition序号,也就不会再分到这个partition
* 原本的partition序号是0~4,如果我们指定的是4,在else中取余的时候减1,partition的范围就变成了0~3,肯定不会分到4
* 但如果我们在这里指定的不是4(那就是0~3),而在else中的范围是0~3,就肯定会有记录分配到专属partition中
* 否则,假如在这里指定的是partition为3的partition,那在else的情况中虽然partition个数减1了,
* 但是并没有体现是排除了这里专属的partition,就导致其他的key的记录在hash算法时还会分到这里
*/
return numPartitions - 1;
}else{
//其余的记录按照散列算法的方式分布在其他partition中,此时partition的个数是在总个数上减1,而且是上面的专属partition已经排除了
return (Math.abs(Utils.murmur2(keyBytes)) % (numPartitions - 1));
}
}
}
@Override
public void close(){
}
}
3.创建Producer:
package demo;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
public class MyProducer{
public static void main(String[] args)throws InterruptedException, ExecutionException{
Properties props = new Properties();
props.put("bootstrap.servers","192.168.184.128:9092");
props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
//指定分区器为自定义分区器,值是自定义分区器的全路径,如果不指定就是默认的分区器
props.put("partitioner.class","demo。CustomPartitioner");
Producer<String, String> producer = new Kafkaproducer<>(props);
for(int i = 0; i < 10; i++){
ProducerRecord<String, String> record1 = new ProducerRecord<String, String>("wyh-partitioner-topic", "hangzhou", Integer.toString(i));
RecordMetadata recordMetadata = producer.send(record1).get();
ProducerRecord<String, String> record2 = new ProducerRecord<String, String>("wyh-partitioner-topic", "beijing-"+i, Integer.toString(i));
RecordMetadata recordMetadata2 = producer.send(record2).get();
System.out.printin("key:"+record1.key()+"***partition:"+recordMetadata.partition());
System.out.printin("key:"+record2.key()+"***partition:"+recordMetadata2.partition());
}
producer.close();
}
}
运行:
可以看到只有hangzhou被分配到了partition4中,其他的key都分配在剩余的partition中。
这样就实现了自定义分区器。