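I haven't written a post in a long time. I'm writing this one so that others can avoid the detours I took: while doing this configuration I couldn't find a single decent article on Google or Baidu, and the official documentation isn't very clear either. It was painful, but I got it working in the end.
Kafka SSL Configuration
You may want to read the official documentation first, although I found it rather misleading.
First, let's look at the overall cluster architecture: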
Kafka1 | Kafka2 | Kafka3
192.168.56.100 | 192.168.56.101 | 192.168.56.102
ZooKeeper | ZooKeeper | ZooKeeper
Kafka broker 100 | Kafka broker 101 | Kafka broker 102
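The cluster has three nodes, as shown above.
The configuration steps are as follows.
I won't cover installing and deploying ZooKeeper and Kafka here; the focus is the SSL configuration.
1. Configure the server.properties file on all three nodes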
# kafka-0.9.0.1/config/server.properties
broker.id=100
#port=9020
port=9093
host.name=192.168.56.100
advertised.host.name=192.168.56.100
zookeeper.connect=192.168.56.100:2181,192.168.56.101:2181,192.168.56.102:2181/kafka91
allow.everyone.if.no.acl.found=true
#allow.everyone.if.no.acl.found=false
#super.users=User:Bob;User:Alice
super.users=User:CN=kafka1,OU=test,O=test,L=test,ST=test,C=test
#listeners=PLAINTEXT://192.168.56.100:9020, SSL://192.168.56.100:9093
#advertised.listeners=PLAINTEXT://192.168.56.100:9020, SSL://192.168.56.100:9093
listeners=SSL://192.168.56.100:9093
advertised.listeners=SSL://192.168.56.100:9093
ssl.keystore.location=/root/kafka1/kafka.server.keystore.jks
ssl.keystore.password=zbdba94
ssl.key.password=zbdba94
ssl.truststore.location=/root/kafka1/kafka.server.truststore.jks
ssl.truststore.password=zbdba94
ssl.client.auth=required
ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
ssl.keystore.type=JKS
ssl.truststore.type=JKS
security.inter.broker.protocol=SSL

#zookeeper.set.acl=true
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
principal.builder.class=org.apache.kafka.common.security.auth.DefaultPrincipalBuilder

# Replication configurations
num.replica.fetchers=4
replica.fetch.max.bytes=1048576
replica.fetch.wait.max.ms=500
replica.high.watermark.checkpoint.interval.ms=5000
replica.socket.timeout.ms=30000
replica.socket.receive.buffer.bytes=65536
replica.lag.time.max.ms=10000
controller.socket.timeout.ms=30000
controller.message.queue.size=10
default.replication.factor=3

# Log configuration
log.dir=/data1/kafka-0.9.0.1/data
kafka.logs.dir=logs
num.partitions=20
message.max.bytes=1000000
auto.create.topics.enable=true
log.index.interval.bytes=4096
log.index.size.max.bytes=10485760
log.retention.hours=720
log.flush.interval.ms=10000
log.flush.interval.messages=20000
log.flush.scheduler.interval.ms=2000
log.roll.hours=168
log.retention.check.interval.ms=300000
log.segment.bytes=1073741824
delete.topic.enable=true

# ZK configuration
zookeeper.connection.timeout.ms=6000
zookeeper.sync.time.ms=2000

# Socket server configuration
num.io.threads=8
num.network.threads=8
socket.request.max.bytes=104857600
socket.receive.buffer.bytes=1048576
socket.send.buffer.bytes=1048576
queued.max.requests=16
fetch.purgatory.purge.interval.requests=100
producer.purgatory.purge.interval.requests=100
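Only the file for broker 100 is shown here; the other two nodes need the same file with the node-specific values changed. A minimal sketch of the lines that differ per node, assuming (as in the architecture table) that the broker id equals the last octet of the node's IP address. The function name broker_overrides is hypothetical and not part of Kafka:

```shell
# Print the server.properties lines that differ between nodes.
# Assumption: broker.id is the last octet of the IP (100, 101, 102 here).
broker_overrides() {
    local ip=$1
    local id=${ip##*.}   # strip everything up to and including the last dot
    printf 'broker.id=%s\n' "$id"
    printf 'host.name=%s\n' "$ip"
    printf 'advertised.host.name=%s\n' "$ip"
    printf 'listeners=SSL://%s:9093\n' "$ip"
    printf 'advertised.listeners=SSL://%s:9093\n' "$ip"
}

# Usage: broker_overrides 192.168.56.101
```

Everything else in the file, including the keystore paths and super.users, stays the same on each node in this setup.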
2. Generate the certificate and CA files on the kafka1 node
#!/bin/bash
name=$HOSTNAME
folder=securityDemo

cd /root
rm -rf "$folder"
mkdir "$folder"
cd "$folder"

# Create the broker keystore. The piped answers are, in order: keystore password (twice),
# first and last name (CN), OU, O, L, ST, C, confirmation, key password (empty = same as keystore).
printf "zbdba94\nzbdba94\nkafka1\ntest\ntest\ntest\ntest\ntest\nyes\n\n" | keytool -keystore kafka.server.keystore.jks -alias $name -validity 365 -genkey

# Create the CA private key and a self-signed CA certificate.
printf "te\ntest\ntest\ntest\ntest\nkafka1\n[email protected]\n" | openssl req -new -x509 -keyout ca-key -out ca-cert -days 365 -passout pass:zbdba94
echo "done"

# Import the CA certificate into the server and client truststores.
printf "zbdba94\nzbdba94\nyes\n" | keytool -keystore kafka.server.truststore.jks -alias CARoot -import -file ca-cert
printf "zbdba94\nzbdba94\nyes\n" | keytool -keystore kafka.client.truststore.jks -alias CARoot -import -file ca-cert

# Export a signing request, sign it with the CA, then import the CA certificate
# and the signed certificate back into the server keystore.
printf "zbdba94\n" | keytool -keystore kafka.server.keystore.jks -alias $name -certreq -file cert-file
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 365 -CAcreateserial -passin pass:zbdba94
printf "zbdba94\nyes\n" | keytool -keystore kafka.server.keystore.jks -alias CARoot -import -file ca-cert
printf "zbdba94\nyes\n" | keytool -keystore kafka.server.keystore.jks -alias $name -import -file cert-signed

# producer.properties
rm -rf producer.properties

printf '%s\n' "$PWD"

echo "bootstrap.servers=$name:9093" >> producer.properties
echo "security.protocol=SSL" >> producer.properties
echo "ssl.truststore.location=$PWD/kafka.client.truststore.jks" >> producer.properties
echo "ssl.truststore.password=zbdba94" >> producer.properties
echo "ssl.keystore.location=$PWD/kafka.server.keystore.jks" >> producer.properties
echo "ssl.keystore.password=zbdba94" >> producer.properties
echo "ssl.key.password=zbdba94" >> producer.properties
Note: replace kafka1 with your own machine's hostname.
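Feeding keytool's interactive prompts through printf is fragile: a single missing \n shifts every later answer by one prompt. As an alternative sketch, keytool also accepts the same answers as command-line options (-dname, -storepass, -keypass). The helper names dname_for and gen_keystore are hypothetical, and -keyalg RSA is an addition that the script in this post does not set; the password and the "test" organisation fields are the ones used in this post:

```shell
# Build the distinguished name used in this post for a given host.
dname_for() {
    printf 'CN=%s,OU=test,O=test,L=test,ST=test,C=test' "$1"
}

# Create a keystore holding a new key pair for the host, without any prompts.
# Usage: gen_keystore kafka1 kafka.server.keystore.jks zbdba94
gen_keystore() {
    local host=$1 store=$2 pass=$3
    keytool -genkeypair -keyalg RSA \
        -keystore "$store" -alias "$host" -validity 365 \
        -dname "$(dname_for "$host")" \
        -storepass "$pass" -keypass "$pass"
}
```

For kafka1, dname_for produces the same DN string that appears in the super.users entry of server.properties.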
3. On the kafka2 machine, generate a client certificate and sign it with the CA files generated on kafka1
Run the following script, client1.sh:
#!/bin/bash
name=$HOSTNAME
cd /root
dirname=securityDemo
rm -rf "$dirname"
mkdir "$dirname"
cd "$dirname"

# Create the client keystore, using this host's name as the CN.
printf "zbdba94\nzbdba94\n$name\ntest\ntest\ntest\ntest\ntest\nyes\n\n" | keytool -keystore kafka.client.keystore.jks -alias $name -validity 36 -genkey
# Export a certificate signing request for the client key.
printf "zbdba94\nzbdba94\nyes\n" | keytool -keystore kafka.client.keystore.jks -alias $name -certreq -file cert-file

cp cert-file cert-file-$name
On the kafka2 node, copy over the files generated on kafka1:
cd /root/ && scp -r kafka1:/root/securityDemo /root/kafka1
Then run the following script, client2.sh:
#!/bin/bash
name=$HOSTNAME
cd /root
# Sign the client's request with the CA key and certificate copied from kafka1.
openssl x509 -req -CA /root/kafka1/ca-cert -CAkey /root/kafka1/ca-key -in /root/securityDemo/cert-file-$name -out /root/securityDemo/cert-signed-$name -days 36 -CAcreateserial -passin pass:zbdba94
Then run the following script, client3.sh:
#!/bin/bash
name=$HOSTNAME

cd /root/securityDemo
# Import the CA certificate, then the signed client certificate, into the client keystore.
printf "zbdba94\nyes\n" | keytool -keystore kafka.client.keystore.jks -alias CARoot -import -file /root/kafka1/ca-cert
printf "zbdba94\nyes\n" | keytool -keystore kafka.client.keystore.jks -alias $name -import -file /root/securityDemo/cert-signed-$name

# producer.properties
rm -rf producer.properties

printf '%s\n' "$PWD"

echo "bootstrap.servers=localhost:9093" >> producer.properties
echo "security.protocol=SSL" >> producer.properties
# The client keystore doubles as the truststore because the CA certificate (alias CARoot) was imported into it above.
echo "ssl.truststore.location=$PWD/kafka.client.keystore.jks" >> producer.properties
echo "ssl.truststore.password=zbdba94" >> producer.properties
echo "ssl.keystore.location=$PWD/kafka.client.keystore.jks" >> producer.properties
echo "ssl.keystore.password=zbdba94" >> producer.properties
echo "ssl.key.password=zbdba94" >> producer.properties
Configure the kafka3 node the same way as kafka2.
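The producer.properties written by these scripts contains only connection and SSL settings, which is why the same file can also be passed to the console consumer during verification. A sketch of a reusable helper that writes such a file in one go; write_client_props is a hypothetical name, and it overwrites the file with > rather than appending line by line, so no rm is needed first:

```shell
# Write an SSL client config usable by both the console producer and consumer.
# Arguments: output file, bootstrap server, truststore path, keystore path, password.
write_client_props() {
    local out=$1 bootstrap=$2 truststore=$3 keystore=$4 pass=$5
    cat > "$out" <<EOF
bootstrap.servers=$bootstrap
security.protocol=SSL
ssl.truststore.location=$truststore
ssl.truststore.password=$pass
ssl.keystore.location=$keystore
ssl.keystore.password=$pass
ssl.key.password=$pass
EOF
}

# Usage, matching client3.sh:
# write_client_props producer.properties localhost:9093 \
#     "$PWD/kafka.client.keystore.jks" "$PWD/kafka.client.keystore.jks" zbdba94
```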
4. Start the cluster
After startup, each of the three nodes prints a line like the following in its log file:
INFO Registered broker 100 at path /brokers/ids/100 with addresses: SSL -> EndPoint(192.168.56.100,9093,SSL) (kafka.utils.ZkUtils)
INFO Registered broker 101 at path /brokers/ids/101 with addresses: SSL -> EndPoint(192.168.56.101,9093,SSL) (kafka.utils.ZkUtils)
INFO Registered broker 102 at path /brokers/ids/102 with addresses: SSL -> EndPoint(192.168.56.102,9093,SSL) (kafka.utils.ZkUtils)
Next, verify the setup. You can follow the check given in the official documentation:
To check quickly if the server keystore and truststore are setup properly you can run the following command
openssl s_client -debug -connect localhost:9093 -tls1
(Note: TLSv1 should be listed under ssl.enabled.protocols)
In the output of this command you should see server's certificate:
-----BEGIN CERTIFICATE-----
{variable sized random bytes}
-----END CERTIFICATE-----
subject=/C=US/ST=CA/L=Santa Clara/O=org/OU=org/CN=Sriharsha Chintalapani
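To script this check instead of reading the output by eye, here is a small sketch. The two helpers read openssl s_client output on stdin and report whether a certificate block is present and which subject it carries. The helper names are hypothetical:

```shell
# Succeeds if the input contains a PEM certificate header.
has_certificate() {
    grep -q -e '-----BEGIN CERTIFICATE-----'
}

# Prints the value of any "subject=" line in the input.
cert_subject() {
    sed -n 's/^subject=//p'
}

# Usage against a broker (redirecting stdin from /dev/null lets s_client
# exit instead of waiting for input):
# openssl s_client -connect localhost:9093 -tls1 </dev/null 2>/dev/null | cert_subject
```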
Now verify by producing and consuming messages.
Produce messages on kafka1:
/data1/kafka-0.9.0.1/bin/kafka-console-producer.sh --broker-list kafka1:9093 --topic jingbotest5 --producer.config /root/securityDemo/producer.properties
Consume messages on kafka2:
/data1/kafka-0.9.0.1/bin/kafka-console-consumer.sh --bootstrap-server kafka2:9093 --topic jingbotest5 --new-consumer --consumer.config /root/securityDemo/producer.properties
If the messages are consumed normally, everything is working.
Next, let's look at how a Java client connects.
This is only a simple demo, mainly to show how to connect.
package com.jingbo.test;

import java.util.Properties;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.config.SslConfigs;

public class ProducerZbdba {
    public static void main(String[] args) {
        Properties producerProps = new Properties();
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.56.102:9093");
        producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "myApiKey");
        // The client keystore also contains the CA certificate, so it is used as the truststore too.
        producerProps.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "C:/Users/zbdba/Downloads/kafka.client.keystore.jks");
        producerProps.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "zbdba94");
        producerProps.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "C:/Users/zbdba/Downloads/kafka.client.keystore.jks");
        producerProps.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "zbdba94");
        producerProps.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "JKS");
        producerProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");

        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(producerProps);
        // Send 100 small messages; key and value are both the loop counter.
        for (int i = 0; i < 100; i++) {
            producer.send(new ProducerRecord<String, String>("jingbotest5", Integer.toString(i), Integer.toString(i)));
        }
        System.out.println("test");
        // close() waits for previously sent records to complete.
        producer.close();
    }
}
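kafka.client.keystore.jks can be copied from any node where it was generated (kafka2 or kafka3 in this setup).
You can leave the console consumer you started earlier running, then run this producer and see whether the consumer receives the messages. If it does, SSL is configured successfully.
Kafka ACL Configuration
I originally planned a separate post for this, but it seemed simple enough not to need one. So why mention it at all? Because there are still a few pitfalls.
You can first refer to the official documentation.
I followed it and ended up with the following error: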
[2016-09-05 06:32:35,144] ERROR [KafkaApi-100] error when handling request Name:UpdateMetadataRequest;Version:1;Controller:100;ControllerEpoch:39;CorrelationId:116;ClientId:100;AliveBrokers:102 : (EndPoint(192.168.56.102,9093,SSL)),101 : (EndPoint(192.168.56.101,9093,SSL)),100 : (EndPoint(192.168.56.100,9093,SSL));PartitionState:[jingbotest5,2] -> (LeaderAndIsrInfo:(Leader:102,ISR:102,LeaderEpoch:42,ControllerEpoch:39),ReplicationFactor:3),AllReplicas:102,101,100),[jingbotest5,5] -> (LeaderAndIsrInfo:(Leader:102,ISR:102,LeaderEpoch:42,ControllerEpoch:39),ReplicationFactor:3),AllReplicas:102,100,101),[jingbotest5,8] -> (LeaderAndIsrInfo:(Leader:102,ISR:102,LeaderEpoch:40,ControllerEpoch:39),ReplicationFactor:3),AllReplicas:102,101,100) (kafka.server.KafkaApis)
kafka.common.ClusterAuthorizationException: Request Request(1,192.168.56.100:9093-192.168.56.100:43909,Session(User:CN=zbdba2,OU=test,O=test,L=test,ST=test,C=test,zbdba2/192.168.56.100),null,1473071555140,SSL) is not authorized.
    at kafka.server.KafkaApis.authorizeClusterAction(KafkaApis.scala:910)
    at kafka.server.KafkaApis.handleUpdateMetadataRequest(KafkaApis.scala:158)
    at kafka.server.KafkaApis.handle(KafkaApis.scala:74)
    at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
    at java.lang.Thread.run(Thread.java:744)
[2016-09-05 06:32:35,310] ERROR [ReplicaFetcherThread-2-101], Error for partition [jingbotest5,4] to broker 101:org.apache.kafka.common.errors.AuthorizationException: Topic authorization failed. (kafka.server.ReplicaFetcherThread)
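The official documentation says that setting
principal.builder.class=CustomizedPrincipalBuilderClass
enables ACL checks for SSL users. But CustomizedPrincipalBuilderClass is out of date; searching the documentation, I found that it had been changed to: class org.apache.kafka.common.security.auth.DefaultPrincipalBuilder
So I happily put that into the configuration, and the broker failed to start. The log showed that the leading "class" must not be included; the value is just org.apache.kafka.common.security.auth.DefaultPrincipalBuilder
So for people who love reading official documentation, Kafka can be rather painful.
The broker finally started, but it still reported the error above. If you are reading this post you won't fall into this pit, because the configuration shown earlier already takes care of it: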
super.users=User:CN=kafka1,OU=test,O=test,L=test,ST=test,C=test
That is, simply make the SSL user a superuser.
After that, ACLs work normally.
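The exception in the log names the principal of the connecting broker (User:CN=zbdba2,... in that run), so the DN in super.users has to match whatever certificate the brokers actually present. If the brokers used different certificates, each DN would need its own entry, separated by semicolons as in the commented-out User:Bob;User:Alice example. A sketch that builds such a line, assuming every certificate uses the OU=test,...,C=test fields from this post; super_users_line is a hypothetical helper:

```shell
# Build a super.users line with one SSL principal per broker certificate CN.
super_users_line() {
    local line="super.users=" sep="" host
    for host in "$@"; do
        line="${line}${sep}User:CN=${host},OU=test,O=test,L=test,ST=test,C=test"
        sep=";"
    done
    printf '%s\n' "$line"
}

# Usage: super_users_line kafka1 kafka2 kafka3
```

In the setup described in this post all three brokers load the keystore copied from kafka1, so the single CN=kafka1 entry is enough.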
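Combined, Kafka's SSL and ACL features look like they can provide a complete access-control scheme, but I don't know whether pitfalls show up once it is really running in production. For the performance impact, you can refer to a slide deck on SlideShare.
You can of course run your own stress tests; according to the slides, the performance loss is around 30%.

I also asked around at several large companies: few are using it, and some are preparing to roll it out. We are also considering whether to adopt it, since the business need is fairly strong.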
I have tidied up the scripts above and put them on GitHub:
https://github.com/zbdba/Kafka-SSL-config
References:
https://github.com/confluentinc/securing-kafka-blog (this one automates the whole setup with Vagrant)
Reposted from: http://blog.****.net/zbdba/article/details/52458654