Docker安装Kafka

由于kafka依赖zookeeper,因此需要使用 docker 同时安装zookeeperkafka

1-拉取安装镜像

# 1、下载zookeeper镜像 
docker pull wurstmeister/zookeeper
# 2、下载kafka镜像
docker pull wurstmeister/kafka

2-启动服务

# 启动 zookeeper
## docker run -d --name zookeeper -p 2181:2181 -v /etc/localtime:/etc/localtime wurstmeister/zookeeper
docker run -d --restart=always --log-driver json-file --log-opt max-size=100m --log-opt max-file=2 --name zookeeper -p 2181:2181 -v /etc/localtime:/etc/localtime wurstmeister/zookeeper
# 启动kafka镜像生成容器
## docker run -d --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT={host-ip}:{zookeeper-port}/kafka -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://{host-ip}:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -v /etc/localtime:/etc/localtime wurstmeister/kafka
docker run -d --restart=always --log-driver json-file --log-opt max-size=100m --log-opt max-file=2 --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=host.docker.internal:2181/kafka -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://host.docker.internal:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -v /etc/localtime:/etc/localtime wurstmeister/kafka

参数说明:
-d:参数指定容器后台运行
–name kafka:参数指定容器别名

-e 参数可以设置 docker 容器内环境变量,每个变量的解释:

-e KAFKA_BROKER_ID=0:在kafka集群中,每个kafka都有一个BROKER_ID来区分自己
-e KAFKA_ZOOKEEPER_CONNECT=host.docker.internal:2181/kafka:配置zookeeper管理kafka的路径host.docker.internal:2181/kafka
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://host.docker.internal:9092:把kafka的地址端口注册给zookeeper,如果是远程访问要改成外网IP,类如 Java 程序访问出现无法连接。
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092:配置kafka的监听端口

-v参数设置:

-v /etc/localtime:/etc/localtime 容器时间同步虚拟机的时间

注意点:

比如我的电脑是 mac,在 host-ip 这块就不能填本机 ip(windows 和 linux 可以),需要填docker.for.mac.host.internal或者host.docker.internal,zookeeper 端口启动在 2181,kafka 即将启动在 9092,那么我的命令可以是这样的docker run -d –name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=host.docker.internal:2181/kafka -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://host.docker.internal:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 wurstmeister/kafka,如果要对日志文件做限制,可以参靠上面的命令。

由于 macOS 的 docker 底层实现的不同,主要原因是 macOS 的 docker 在容器和宿主之间无法通过 ip 直接通信。因此在安装的时候需要特殊注意与 ip 相关的设置,当容器需要访问宿主ip时,需要使用docker.for.mac.host.internal或者host.docker.internal代替。

这里向zookeeper注册的时候,使用的是host.docker.internal,我们在程序中连接kafka的时候,直接使用localhost会报错,如:Error connecting to node host.docker.internal:9092,其处理方式:

  • 我们可以在本机的 host 文件中,添加映射,将 127.0.0.1 host.docker.internal;
  • 不使用host.docker.internal,而是使用自己主机IP,但是当IP发生变化的时候,就要重新配置。
# 测试容器内访问宿主机ip 
curl host.docker.internal:2181

3-验证

3.1-进入 kafka 容器

docker exec -it kafka bash

3.2-进入 kafka 容器中的脚本目录

注意,此时应该已经进入到了容器中的bash。进入 kafka 的脚本目录,其中kafka_2.13-2.7.0可能会随着版本而变化数字。

bash-5.1# cd opt/kafka_2.13-2.7.0/bin/

通过 ls 可以看到许多的.sh 脚本

bash-5.1# ls
connect-distributed.sh kafka-console-producer.sh kafka-leader-election.sh kafka-run-class.sh trogdor.sh
connect-mirror-maker.sh kafka-consumer-groups.sh kafka-log-dirs.sh kafka-server-start.sh windows
connect-standalone.sh kafka-consumer-perf-test.sh kafka-mirror-maker.sh kafka-server-stop.sh zookeeper-security-migration.sh
kafka-acls.sh kafka-delegation-tokens.sh kafka-preferred-replica-election.sh kafka-streams-application-reset.sh zookeeper-server-start.sh
kafka-broker-api-versions.sh kafka-delete-records.sh kafka-producer-perf-test.sh kafka-topics.sh zookeeper-server-stop.sh
kafka-configs.sh kafka-dump-log.sh kafka-reassign-partitions.sh kafka-verifiable-consumer.sh zookeeper-shell.sh
kafka-console-consumer.sh kafka-features.sh kafka-replica-verification.sh kafka-verifiable-producer.sh

3.3-测试 kafka 生产者和消费者

1. 启动 kafka 生产者

运行 kafka 生产者发送消息

./kafka-console-producer.sh --broker-list localhost:9092 --topic test-topic

看到出现了个对话提示的小>就可以发送消息了,不过不要着急,先把消费者启动了。

2. 启动 kafka 消费者

另起一个终端,进入 kafka 容器,进入/opt/kafka_2.13-2.7.0/bin目录,运行 kafka 消费者接收消息

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic --from-beginning

3. 测试发送和接收消息

在生产者中发送消息,消费者中应该能够收到对应的消息。

生产者

bash-5.1# ./kafka-console-producer.sh --broker-list localhost:9092 --topic test-topic
>111111
>222222
>33333
>

消费者

bash-5.1# ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic --from-beginning
111111
222222
33333

4. 创建topic并查看

创建topic

# 创建脚本
./kafka-topics.sh --create --zookeeper host.docker.internal:2181/kafka --replication-factor 1 --partitions 1 --topic topic01
# 返回结果
Created topic topic01.
# 或者下面这种方式
bin/kafka-topics.sh --create --topic topic02 --partitions 2 --replication-factor 1 --bootstrap-server 127.0.0.1:9092

查看topic

# 查看topic脚本
kafka-topics.sh --zookeeper host.docker.internal:2181/kafka --list
# 返回结果
__consumer_offsets
test-topic
topic01
# 或者以下面这种方式
kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --list

5. 查看消费者组的消费情况

查看的脚本

kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group group02

返回结果

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                     HOST            CLIENT-ID
group02 topic02 0 0 0 0 consumer-2-f4a165d3-dc0d-49cf-82e3-130a171f8538 /172.17.0.1 consumer-2
group02 topic02 1 0 0 0 consumer-3-08cdd82c-dcd1-4c7e-bdb1-287e76fa290c /172.17.0.1 consumer-3
  • TOPIC:topic名字
  • PARTITION:分区id
  • CURRENT-OFFSET:当前已消费的条数
  • LOG-END-OFFSET:总条数
  • LAG:未消费的条数(延迟)
  • CONSUMER-ID:消费id
  • HOST:主机ip
  • CLIENT-ID:客户端id