kafka的安装及使用
1.下载kafka(kafka_2.11-0.8.2.0.tgz)
注意:强烈建议使用flume1.6,不要使用flume1.8
2.上传到hadoop05节点
3.解压文件
tar -zxvf kafka_2.11-0.8.2.0.tgz -C apps/
4.单节点启动(前台)
bin
/kafka-server-start
.sh config
/server
.properties
5.查看topics
bin
/kafka-topics
.sh --list --zookeeper localhost:2181
6.创建topics
bin
/kafka-topics
.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic
test
7.模拟生产者和消费者
复制两个hadoop05的连接会话
下面是生产者
bin
/kafka-console-producer
.sh --broker-list localhost:9092 --topic
test
然后输入ni hao
下面是消费者
bin
/kafka-console-consumer
.sh --zookeeper localhost:2181 --topic
test
发现是pull不到之前输入的ni hao的数据的
bin
/kafka-console-consumer
.sh --zookeeper localhost:2181 --topic
test --from-beginning
kafka集群的安装及使用(hadoop03,hadoop04,hadoop05):
1.修改server.properties 配置文件
cd /home/hadoop/apps/kafka_2.11-0.8.2.0/config
[[email protected] config]$ vim server.properties
按照下图修改为自己的zookeeper集群
2.从hadoop05发送kafka的安装文件到hadoop04,hadoop03
[[email protected] apps]$ scp -r kafka_2.11-0.8.2.0/ hadoop04:$PWD
[[email protected] apps]$ scp -r kafka_2.11-0.8.2.0/ hadoop03:$PWD
3.分别进入不同的节点修改server.properties 配置文件
进入hadoop04修改为下图
进入hadoop05修改为下图
这样的话hadoop03的id为0,hadoop04的id为1,hadoop05的id为2
4.启动kafka
注意先启动ZK
[[email protected] kafka_2.11-0.8.2.0]$ bin/kafka-server-start.sh config/server.properties
[[email protected] kafka_2.11-0.8.2.0]$ bin/kafka-server-start.sh config/server.properties
[[email protected] kafka_2.11-0.8.2.0]$ bin/kafka-server-start.sh config/server.properties
5.创建topic
新建一个hadoop03的连接会话
这里随便指定zk集群的一个节点就好了。
[[email protected] kafka_2.11-0.8.2.0]$ bin/kafka-topics.sh --create --zookeeper hadoop02:2181 --replication-factor 3 --partitions 3 --topic topic1
6.查看副本列表
[[email protected] kafka_2.11-0.8.2.0]$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic topic1
7.向topic1中发布消息
[[email protected] kafka_2.11-0.8.2.0]$ bin/kafka-console-producer.sh --broker-list localhost:9092 -topic topic1
注意这里仅仅是去kafka中发布一条消息
8.从topic1中消费消息
然后再复制一个窗口
[[email protected] kafka_2.11-0.8.2.0]$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic topic1
[[email protected] kafka_2.11-0.8.2.0]$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic topic1
java的api去访问kafka集群的使用:
1.新建maven项目,在pom.xml中添加
<dependencies> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.11</artifactId> <version>0.8.2.0</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.10</version> <scope>test</scope> </dependency> </dependencies>
下载源码文件:kafka-0.8.2.0-src.tgz
在G:\spark\kafka\kafka-0.8.2.0-src\examples\src\main\java\kafka\examples下找到所有的.java文件放到IDEA中
2.修改代码:
在KafkaProperties.java中修改为下图所示
修改Producer.java文件
修改Consumer.java文件
新建一个ProducerDemo.java文件
把KafkaConsumerProducerDemo.java文件的第一句话剪切到自己新建的文件中
然后把KafkaConsumerProducerDemo.java的文件名改为ConsumerDemo.java
先启动hadoop03节点的kafka服务
[[email protected] kafka_2.11-0.8.2.0]$ bin/kafka-server-start.sh config/server.properties
然后先运行ConsumerDemo.java再运行ProducerDemo.java
通过观察发现全部都是0
然后继续修改Produce.java
然后先运行ConsumerDemo.java再运行ProducerDemo.java
下面我们要自定义分区。就需要重新实现Partition接口
import kafka.producer.Partitioner; import kafka.utils.VerifiableProperties; public class HashPartitioner implements Partitioner { public HashPartitioner(VerifiableProperties verifiableProperties) {} @Override public int partition(Object key, int numPartitions) { if ((key instanceof Integer)) { return Math.abs(Integer.parseInt(key.toString())) % numPartitions; } return Math.abs(key.hashCode() % numPartitions); } }
或者
import java.util.concurrent.atomic.AtomicLong; import kafka.producer.Partitioner; import kafka.utils.VerifiableProperties; public class RoundRobinPartitioner implements Partitioner { private static AtomicLong next = new AtomicLong(); public RoundRobinPartitioner(VerifiableProperties verifiableProperties) {} @Override public int partition(Object key, int numPartitions) { long nextIndex = next.incrementAndGet(); return (int)nextIndex % numPartitions; } }
在Producer.java下增加下面代码