Kafka Quick Start

Kafka relies on ZooKeeper, so you need to install ZooKeeper before installing Kafka.

1. Single-node broker deployment

First, edit the $KAFKA_HOME/config/server.properties configuration file. The main settings to change are:

broker.id=0          # each broker's ID must be unique
listeners            # the port to listen on (the default, 9092, is used here)
host.name            # the hostname of the current machine
log.dirs             # the directory where Kafka stores its message logs
num.partitions       # the default number of partitions
zookeeper.connect    # the ZooKeeper address (defaults to localhost:2181)
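Putting these together, a minimal server.properties for this walkthrough might look like the following sketch; the hostname hadoop000 and the log directory reflect this article's environment and are assumptions, so substitute your own:

broker.id=0
listeners=PLAINTEXT://:9092
host.name=hadoop000
log.dirs=/home/hadoop/app/tmp/kafka-logs
num.partitions=1
zookeeper.connect=localhost:2181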

Configure these to suit your environment, then start everything up as follows:

1) Start ZooKeeper. Note the zoo.cfg configuration file in ZooKeeper's conf directory; the main thing to change there is likewise the data storage directory.
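For reference, a standalone zoo.cfg only needs a handful of settings (the dataDir below is a placeholder path assumed for this environment; point it wherever you like), after which ZooKeeper can be started with the zkServer.sh script it ships with:

tickTime=2000
dataDir=/home/hadoop/app/tmp/zookeeper
clientPort=2181

zkServer.sh start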

2) Start Kafka: kafka-server-start.sh $KAFKA_HOME/config/server.properties

3) Create a topic; this requires pointing at ZooKeeper: kafka-topics.sh --create --zookeeper hadoop000:2181 --replication-factor 1 --partitions 1 --topic hello_topic. The replication factor, partition count, and topic name can be adjusted to your situation. To list all topics:

kafka-topics.sh --list --zookeeper hadoop000:2181

4) Send messages; this requires pointing at the broker: kafka-console-producer.sh --broker-list hadoop000:9092 --topic hello_topic

5) Consume messages; this requires pointing at ZooKeeper: kafka-console-consumer.sh --zookeeper hadoop000:2181 --topic hello_topic --from-beginning. That is, the consumer reads the topic via ZooKeeper, and --from-beginning makes it replay messages sent before it started.

2. Single node, multiple brokers

The approach is to add one server.properties file per broker; each configuration file amounts to one broker. Here we set up three:

server-1.properties

log.dirs=/home/hadoop/app/tmp/kafka-logs-1
listeners=PLAINTEXT://:9093
broker.id=1

server-2.properties

log.dirs=/home/hadoop/app/tmp/kafka-logs-2
listeners=PLAINTEXT://:9094
broker.id=2

server-3.properties

log.dirs=/home/hadoop/app/tmp/kafka-logs-3
listeners=PLAINTEXT://:9095
broker.id=3

Then start them one by one; the -daemon flag runs each broker in the background:

kafka-server-start.sh -daemon $KAFKA_HOME/config/server-1.properties

kafka-server-start.sh -daemon $KAFKA_HOME/config/server-2.properties

kafka-server-start.sh -daemon $KAFKA_HOME/config/server-3.properties
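You can check that all three came up with jps (it ships with the JDK); each broker appears as a Kafka process whose argument is the properties file it was started with:

jps -m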

The next steps mirror the single-broker case, this time with a replication factor of 3:

kafka-topics.sh --create --zookeeper hadoop000:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic

kafka-console-producer.sh --broker-list hadoop000:9093,hadoop000:9094,hadoop000:9095 --topic my-replicated-topic

kafka-console-consumer.sh --zookeeper hadoop000:2181 --topic my-replicated-topic

To view the topic's details:

kafka-topics.sh --describe --zookeeper hadoop000:2181 --topic my-replicated-topic

Note that one of the replicas acts as the leader. Replication is what gives Kafka its fault tolerance: if the broker holding the leader goes down, a new leader is automatically elected from the remaining replicas and operation continues.
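You can verify this with a rough sketch like the following; which broker is the leader varies, so check the Leader column of the describe output first (broker 2 is assumed here):

ps aux | grep server-2.properties   # find the PID of broker 2
kill <pid>                          # stop that broker
kafka-topics.sh --describe --zookeeper hadoop000:2181 --topic my-replicated-topic
# the Leader column now shows one of the surviving brokers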

If you follow the steps above, whatever you type in the producer window appears in the consumer window.

Using the Producer and Consumer APIs

Next, a simple demo: the producer side runs a thread that emits messages in a loop, and the consumer side prints, that is, consumes, what was produced.

The configuration class:

/**
 * Common Kafka configuration constants.
 */
public class KafkaProperties {

    public static final String ZK = "192.168.199.111:2181";

    public static final String TOPIC = "hello_topic";

    public static final String BROKER_LIST = "192.168.199.111:9092";

    public static final String GROUP_ID = "test_group1";
}

Producer API demo

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import java.util.Properties;

/**
 * Kafka producer: sends a numbered message every two seconds.
 */
public class KafkaProducer extends Thread {

    private String topic;

    private Producer<Integer, String> producer;

    public KafkaProducer(String topic) {
        this.topic = topic;

        Properties properties = new Properties();
        properties.put("metadata.broker.list", KafkaProperties.BROKER_LIST);
        properties.put("serializer.class", "kafka.serializer.StringEncoder");
        properties.put("request.required.acks", "1");

        producer = new Producer<Integer, String>(new ProducerConfig(properties));
    }

    @Override
    public void run() {
        int messageNo = 1;

        while (true) {
            String message = "message_" + messageNo;
            producer.send(new KeyedMessage<Integer, String>(topic, message));
            System.out.println("Sent: " + message);

            messageNo++;

            try {
                Thread.sleep(2000);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
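Note that this demo uses the legacy Scala producer client (kafka.javaapi.producer), which was removed in Kafka 2.0. On a recent Kafka, a rough equivalent with the current Java client would look like the sketch below; the class name ModernProducerSketch is my own:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class ModernProducerSketch {

    public static void main(String[] args) {
        Properties props = new Properties();
        // The modern client bootstraps from the brokers directly, not from ZooKeeper.
        props.put("bootstrap.servers", KafkaProperties.BROKER_LIST);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
        producer.send(new ProducerRecord<String, String>(KafkaProperties.TOPIC, "message_1"));
        producer.close();
    }
}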

Consumer API demo

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

/**
 * Kafka consumer: reads messages from the topic and prints them.
 */
public class KafkaConsumer extends Thread {

    private String topic;

    public KafkaConsumer(String topic) {
        this.topic = topic;
    }

    private ConsumerConnector createConnector() {
        Properties properties = new Properties();
        properties.put("zookeeper.connect", KafkaProperties.ZK);
        properties.put("group.id", KafkaProperties.GROUP_ID);
        return Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));
    }

    @Override
    public void run() {
        ConsumerConnector consumer = createConnector();

        // Map each topic name to the number of streams (threads) to consume it with.
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, 1);
        // topicCountMap.put(topic2, 1);
        // topicCountMap.put(topic3, 1);

        // Key: topic name; value: the list of data streams for that topic.
        Map<String, List<KafkaStream<byte[], byte[]>>> messageStream =
                consumer.createMessageStreams(topicCountMap);

        // Get the single stream we registered for our topic.
        KafkaStream<byte[], byte[]> stream = messageStream.get(topic).get(0);

        ConsumerIterator<byte[], byte[]> iterator = stream.iterator();

        while (iterator.hasNext()) {
            String message = new String(iterator.next().message());
            System.out.println("rec: " + message);
        }
    }
}
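The same caveat applies here: kafka.javaapi.consumer is the old high-level consumer, also removed in Kafka 2.0. A rough equivalent with the current Java client (again, the class name ModernConsumerSketch is my own) would be:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class ModernConsumerSketch {

    public static void main(String[] args) {
        Properties props = new Properties();
        // Like the modern producer, the consumer talks to the brokers directly.
        props.put("bootstrap.servers", KafkaProperties.BROKER_LIST);
        props.put("group.id", KafkaProperties.GROUP_ID);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Collections.singletonList(KafkaProperties.TOPIC));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("rec: " + record.value());
            }
        }
    }
}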

Finally, start both classes from a main method; the producer window then prints each message as it is sent, and the consumer prints each message it receives.
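A minimal entry point (the class name KafkaClientApp is my own) just starts the two threads:

public class KafkaClientApp {

    public static void main(String[] args) {
        new KafkaProducer(KafkaProperties.TOPIC).start();
        new KafkaConsumer(KafkaProperties.TOPIC).start();
    }
}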


Original article: https://blog.****.net/wing_93/article/details/78513782