kafka的安装及使用

1.下载kafka(kafka_2.11-0.8.2.0.tgz)

注意:强烈建议使用flume1.6,不要使用flume1.8

点击打开链接

2.上传到hadoop05节点

3.解压文件

tar -zxvf kafka_2.11-0.8.2.0.tgz -C apps/

4.单节点启动(前台)

bin/kafka-server-start.sh config/server.properties

kafka的安装及使用

5.查看topics

bin/kafka-topics.sh --list --zookeeper localhost:2181

kafka的安装及使用kafka的安装及使用

6.创建topics

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

kafka的安装及使用

7.模拟生产者和消费者

复制两个hadoop05的连接会话

下面是生产者

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

然后输入ni hao

kafka的安装及使用

下面是消费者

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test

kafka的安装及使用

发现是pull不到之前输入的ni hao的数据的

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

kafka的安装及使用

 

 

kafka集群的安装及使用(hadoop03,hadoop04,hadoop05):

 

1.修改server.properties 配置文件

cd /home/hadoop/apps/kafka_2.11-0.8.2.0/config

[[email protected] config]$ vim server.properties 

按照下图修改为自己的zookeeper集群

kafka的安装及使用

 

 

2.从hadoop05发送kafka的安装文件到hadoop04,hadoop03

 

[[email protected] apps]$ scp -r kafka_2.11-0.8.2.0/ hadoop04:$PWD

[[email protected] apps]$ scp -r kafka_2.11-0.8.2.0/ hadoop03:$PWD

3.分别进入不同的节点修改server.properties 配置文件

 

进入hadoop04修改为下图

kafka的安装及使用

进入hadoop05修改为下图

kafka的安装及使用

这样的话hadoop03的id为0,hadoop04的id为1,hadoop05的id为2

4.启动kafka

注意先启动ZK

[[email protected] kafka_2.11-0.8.2.0]$ bin/kafka-server-start.sh config/server.properties

[[email protected] kafka_2.11-0.8.2.0]$ bin/kafka-server-start.sh config/server.properties

[[email protected] kafka_2.11-0.8.2.0]$ bin/kafka-server-start.sh config/server.properties

5.创建topic

新建一个hadoop03的连接会话

这里随便指定zk集群的一个节点就好了。

[[email protected] kafka_2.11-0.8.2.0]$ bin/kafka-topics.sh --create --zookeeper hadoop02:2181 --replication-factor 3 --partitions 3 --topic topic1

kafka的安装及使用

6.查看副本列表

 

[[email protected] kafka_2.11-0.8.2.0]$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic topic1

kafka的安装及使用

 

7.向topic1中发布消息

 

[[email protected] kafka_2.11-0.8.2.0]$ bin/kafka-console-producer.sh --broker-list localhost:9092 -topic topic1

kafka的安装及使用

注意这里仅仅是去kafka中发布一条消息

8.从topic1中消费消息

 

然后再复制一个窗口

[[email protected] kafka_2.11-0.8.2.0]$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic topic1 

kafka的安装及使用

[[email protected] kafka_2.11-0.8.2.0]$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic topic1 

kafka的安装及使用

 

 

 

java的api去访问kafka集群的使用:

1.新建maven项目,在pom.xml中添加

 

<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.11</artifactId>
        <version>0.8.2.0</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.10</version>
        <scope>test</scope>
    </dependency>
</dependencies>

下载源码文件:kafka-0.8.2.0-src.tgz

在G:\spark\kafka\kafka-0.8.2.0-src\examples\src\main\java\kafka\examples下找到所有的.java文件放到IDEA中

2.修改代码:

在KafkaProperties.java中修改为下图所示

kafka的安装及使用

修改Producer.java文件

kafka的安装及使用

修改Consumer.java文件

kafka的安装及使用

新建一个ProducerDemo.java文件

把KafkaConsumerProducerDemo.java文件的第一句话剪切到自己新建的文件中

然后把KafkaConsumerProducerDemo.java的文件名改为ConsumerDemo.java

先启动hadoop03节点的kafka服务

[[email protected] kafka_2.11-0.8.2.0]$ bin/kafka-server-start.sh config/server.properties

然后先运行ConsumerDemo.java再运行ProducerDemo.java

kafka的安装及使用

kafka的安装及使用

通过观察发现全部都是0

然后继续修改Produce.java

kafka的安装及使用

然后先运行ConsumerDemo.java再运行ProducerDemo.java

kafka的安装及使用

 

下面我们要自定义分区。就需要重新实现Partition接口

 

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

public class HashPartitioner implements Partitioner {

  public HashPartitioner(VerifiableProperties verifiableProperties) {}

  @Override
  public int partition(Object key, int numPartitions) {
    if ((key instanceof Integer)) {
      return Math.abs(Integer.parseInt(key.toString())) % numPartitions;
    }
    return Math.abs(key.hashCode() % numPartitions);
  }
}


或者

 

import java.util.concurrent.atomic.AtomicLong;

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

public class RoundRobinPartitioner implements Partitioner {
  
  private static AtomicLong next = new AtomicLong();

  public RoundRobinPartitioner(VerifiableProperties verifiableProperties) {}

  @Override
  public int partition(Object key, int numPartitions) {
    long nextIndex = next.incrementAndGet();
    return (int)nextIndex % numPartitions;
  }
}


 

 

在Producer.java下增加下面代码

kafka的安装及使用