windows 下 kafka 安裝与使用

zookper 安装使用

在我的另一篇安装使用博客中

https://blog.csdn.net/weixin_41670914/article/details/82884480

下载 kafka

下载 http://kafka.apache.org/downloads.html 

注意要下载二进制版本的,一开始我下载的不是二进制版本的,一直启动打不开,踩坑时觉得很奇怪

  1. 点开 http://kafka.apache.org/downloads.html 下载
    windows 下 kafka 安裝与使用
    windows 下 kafka 安裝与使用
  2. 启动问题提前放这里
  • 不是二进制启动
    windows 下 kafka 安裝与使用
  • 二进制启动
    windows 下 kafka 安裝与使用

解压启动 kafka

  1. 修改配置文件
    windows 下 kafka 安裝与使用
    这里默认就是连本机 zookeeper 至于复杂的集群这里不讨论
    windows 下 kafka 安裝与使用
    还可以修改日志启动文件路径,这里也使用默认路径好了

  2. 启动 kafka
    注意:请确保在启动Kafka服务器前,Zookeeper实例已经准备好并开始运行
    进入 kafka 根目录
    windows 下 kafka 安裝与使用

输入 .\bin\windows\kafka-server-start.bat .\config\server.properties   

windows 下 kafka 安裝与使用
启动完成

测试验证 kafka

  1. 往 kafka 发送数据
@Test
    public void kafka() {
        //String topic = environment.getProperty("dafei.data.kafka.topic");
        UserContactEntity entity = new UserContactEntity();
        entity.setContName("aaa");

        kafkaService.send(TopicType.GPS.getTableName(), BaseJsonUtils.writeValue(entity));
        
    }
  1. 接收 kafka 中数据
@Test
    public void recive() {
        if (kafkaService != null) {
            kafkaService.subscribe(Arrays.stream(TopicType.values()).map(p -> p.getTableName()).collect(Collectors.toList()).
                        toArray(new String[TopicType.values().length]));
            while (true) {
                try {
                    if (!BaseProps.commitShutDown()) {
                        log.info("服务准备关闭,不再消费");
                        return;
                    }
                    Map<String, List<String>> messages = kafkaService.poll();

                    messages.keySet().forEach(p -> {
                        List<String> list = messages.get(p);
                        if (CollectionUtils.isNotEmpty(list)) {
                            for (String s : list) {
                                System.out.println("key--" + p + "--s--->" + s);
                            }
                        }
                    });
                    BaseProps.shutDown();
                } catch (Exception e) {
                    log.error("处理kafka信息异常", e);
                }

            }
        }
    }

可以发送,可以接收,安装验证完成

常用命令

  • 查看现有的 topic

.\bin\windows\kafka-topics.bat --zookeeper localhost:2181 --list

windows 下 kafka 安裝与使用

  • 查看topic详细信息

.\bin\windows\kafka-topics.bat --zookeeper localhost:2181 --describe

windows 下 kafka 安裝与使用

单机版 kafka 安装到此结束

linux 项目开发环境中用 kafka 常用操作

  1. 查看 linux 主机 kafka 运行情况
    windows 下 kafka 安裝与使用
    我在开发中由于 kafka (开发环境单节点)不是自己搭的是同事搭的,老是需要知道安装目录,端口情况,如上文所示查看即可
    2.关闭 kafka

	./bin/kafka-server-stop.sh ./config/server.properties