Installing Kafka with Docker

Background

I have recently been learning Kafka, so I used Docker to set up both a single-node and a cluster deployment on a VPS to make further study easier.

Environment

  • Debian 10
  • Kafka: 7.2.1 (Confluent cp-kafka images, single-node) / 3.6 (Bitnami images, cluster)

Single-node deployment

Installed with Docker Compose. Note: create a data directory next to the yml and put two files in it, message.json and update_run.sh, fix the permissions (update_run.sh must be executable), then bring the stack up; see the commands below.
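A minimal sketch of the preparation, assuming the compose file below is saved as docker-compose.yml in the current directory (the data/jaas directory is only needed because the Schema Registry service mounts ./data/jaas for its JAAS config, whose contents are not covered here):

mkdir -p data data/jaas
# put update_run.sh and message.json (both shown later in this post) into ./data,
# and the Schema Registry JAAS file into ./data/jaas, then:
chmod +x data/update_run.sh
docker compose up -d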

docker-compose.yml

version: '2'
services:

  kafka1:
    image: confluentinc/cp-kafka:7.2.1
    hostname: kafka1
    container_name: kafka1
    ports:
      - "9092:9092"
      - "9997:9997"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://kafka1:29092,PLAINTEXT_HOST://154.40.46.38:9092'
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9997
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_PROCESS_ROLES: 'broker,controller'
      KAFKA_NODE_ID: 1
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka1:29093'
      KAFKA_LISTENERS: 'PLAINTEXT://kafka1:29092,CONTROLLER://kafka1:29093,PLAINTEXT_HOST://0.0.0.0:9092'
      KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
    volumes:
      - ./data/update_run.sh:/tmp/update_run.sh
    command: "bash -c 'if [ ! -f /tmp/update_run.sh ]; then echo \"ERROR: Did you forget the update_run.sh file that came with this docker-compose.yml file?\" && exit 1 ; else /tmp/update_run.sh && /etc/confluent/docker/run ; fi'"

  schemaregistry1:
    image: confluentinc/cp-schema-registry:7.2.1
    ports:
      - 18085:8085
    depends_on:
      - kafka1
    volumes:
      - ./data/jaas:/conf
    environment:
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka1:29092
      SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL: PLAINTEXT
      SCHEMA_REGISTRY_HOST_NAME: schemaregistry1
      SCHEMA_REGISTRY_LISTENERS: http://schemaregistry1:8085

      # default credentials for the SCHEMA module in the UI: admin/letmein
      SCHEMA_REGISTRY_AUTHENTICATION_METHOD: BASIC
      SCHEMA_REGISTRY_AUTHENTICATION_REALM: SchemaRegistryProps
      SCHEMA_REGISTRY_AUTHENTICATION_ROLES: admin
      SCHEMA_REGISTRY_OPTS: -Djava.security.auth.login.config=/conf/schema_registry.jaas

      SCHEMA_REGISTRY_SCHEMA_REGISTRY_INTER_INSTANCE_PROTOCOL: "http"
      SCHEMA_REGISTRY_LOG4J_ROOT_LOGLEVEL: INFO
      SCHEMA_REGISTRY_KAFKASTORE_TOPIC: _schemas

  kafka-init-topics:
    image: confluentinc/cp-kafka:7.2.1
    volumes:
      - ./data/message.json:/data/message.json
    depends_on:
      - kafka1
    command: "bash -c 'echo Waiting for Kafka to be ready... && \
              cub kafka-ready -b kafka1:29092 1 30 && \
              kafka-topics --create --topic users --partitions 3 --replication-factor 1 --if-not-exists --bootstrap-server kafka1:29092 && \
              kafka-topics --create --topic messages --partitions 2 --replication-factor 1 --if-not-exists --bootstrap-server kafka1:29092 && \
              kafka-console-producer --bootstrap-server kafka1:29092 --topic users < /data/message.json'"

  # web UI
  kafka-ui:
    container_name: kafka-ui
    image: provectuslabs/kafka-ui:latest
    ports:
      - 18080:8080
    depends_on:
      - kafka1
      - schemaregistry1
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka1:29092
      KAFKA_CLUSTERS_0_METRICS_PORT: 9997
      KAFKA_CLUSTERS_0_SCHEMAREGISTRY: http://schemaregistry1:8085
      KAFKA_CLUSTERS_0_SCHEMAREGISTRYAUTH_USERNAME: admin
      KAFKA_CLUSTERS_0_SCHEMAREGISTRYAUTH_PASSWORD: letmein
      AUTH_TYPE: "LOGIN_FORM"
      SPRING_SECURITY_USER_NAME: username
      SPRING_SECURITY_USER_PASSWORD: passwd

ref: https://blog.csdn.net/dghkgjlh/article/details/133418837

https://flxdu.cn/2023/01/23/Kafka-in-Docker-%E5%8D%95%E8%8A%82%E7%82%B9-%E5%A4%9A%E8%8A%82%E7%82%B9/
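Once the stack is up, the Schema Registry's basic auth (the admin/letmein credentials noted in the compose comments, which are assumed to be defined in the mounted JAAS file) can be sanity-checked from the VPS via the mapped port 18085; GET /subjects is a standard Schema Registry endpoint and returns an empty list on a fresh install:

# should return [] when no subjects have been registered yet
curl -u admin:letmein http://localhost:18085/subjects

# without credentials the request should be rejected (HTTP 401)
curl -i http://localhost:18085/subjects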

update_run.sh

#!/bin/sh
# This script is required to run a Kafka cluster without ZooKeeper (KRaft mode)

# Docker workaround: Remove check for KAFKA_ZOOKEEPER_CONNECT parameter
sed -i '/KAFKA_ZOOKEEPER_CONNECT/d' /etc/confluent/docker/configure

# Docker workaround: Ignore cub zk-ready
sed -i 's/cub zk-ready/echo ignore zk-ready/' /etc/confluent/docker/ensure

# KRaft required step: Format the storage directory with a new cluster ID
echo "kafka-storage format --ignore-formatted -t $(kafka-storage random-uuid) -c /etc/kafka/kafka.properties" >> /etc/confluent/docker/ensure

message.json

An empty JSON object is enough.

{}
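With update_run.sh and message.json in place and the stack up, the topics created by kafka-init-topics can be checked from inside the kafka1 container; a quick sketch using the CLI tools that ship in the cp-kafka image:

# list topics; users and messages should appear
docker exec -it kafka1 kafka-topics --list --bootstrap-server kafka1:29092

# read back the single record produced from message.json
docker exec -it kafka1 kafka-console-consumer --bootstrap-server kafka1:29092 \
  --topic users --from-beginning --max-messages 1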

Kafka cluster deployment

Here multiple Docker containers simulate the nodes of a cluster; just copy the yml and bring it up.

Reference

docker-compose.yml

# Copyright VMware, Inc.
# SPDX-License-Identifier: APACHE-2.0

version: "2"

services:
  kafka-0:
    image: docker.io/bitnami/kafka:3.6
    ports:
      - "9092:9092"
    restart: unless-stopped
    environment:
      # KRaft settings
      # node ID
      - KAFKA_CFG_NODE_ID=0
      # node roles: controller,broker (only one controller is active at any given time)
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      # nodes with a vote in controller elections
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka-0:9093,1@kafka-1:9093,2@kafka-2:9093
      # disable automatic topic creation
      - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=false
      # log retention time
      - KAFKA_CFG_LOG_RETENTION_HOURS=168
      # maximum log size retained per partition
      - KAFKA_CFG_LOG_RETENTION_BYTES=1073741824
      # enable the log cleaner (log compaction)
      - KAFKA_CFG_LOG_CLEANER_ENABLE=true
      # maximum message size, about 1 MB
      - KAFKA_CFG_MESSAGE_MAX_BYTES=1000012
      # minimum number of in-sync replicas; if the ISR falls below this, producer sends (with acks=all) fail;
      # setting it lower raises the risk of data loss and reduces reliability
      - KAFKA_CFG_MIN_IN_SYNC_REPLICAS=2
      # cluster ID
      - KAFKA_KRAFT_CLUSTER_ID=abcdefghijklmnopqrstuv
      # Listeners
      # listener section: the listener named PLAINTEXT (yes, it is just a name) listens on 9092, the CONTROLLER listener on 9093
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
      # address advertised to clients: the PLAINTEXT listener on port 9092; the public IP is needed for external access
      # (if clients go through a proxy, the in-cluster form is fine)
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://103.171.26.248:9092
      # map listener names to protocols: PLAINTEXT uses PLAINTEXT, and CONTROLLER also uses PLAINTEXT (keeps things simple)
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT
      # listener used for controller communication
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      # listener used for inter-broker communication
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
      # Clustering
      # number of replicas of the consumer-offsets topic (which records each consumer group's offsets)
      - KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=3
      # Kafka keeps replicas of the transaction state log on three different nodes in the cluster
      - KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=3
      # at least two replicas (including the leader) must be in sync for transactions to proceed
      - KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR=2
    volumes:
      - kafka_0_data:/bitnami/kafka
  kafka-1:
    image: docker.io/bitnami/kafka:3.6
    ports:
      - "9093:9092"
    restart: unless-stopped
    environment:
      # KRaft settings
      - KAFKA_CFG_NODE_ID=1
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka-0:9093,1@kafka-1:9093,2@kafka-2:9093
      - KAFKA_KRAFT_CLUSTER_ID=abcdefghijklmnopqrstuv
      - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=false
      - KAFKA_CFG_LOG_RETENTION_HOURS=168
      - KAFKA_CFG_LOG_RETENTION_BYTES=1073741824
      - KAFKA_CFG_LOG_CLEANER_ENABLE=true
      - KAFKA_CFG_MESSAGE_MAX_BYTES=1000012
      - KAFKA_CFG_MIN_IN_SYNC_REPLICAS=2
      # Listeners
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://103.171.26.248:9093
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
      # Clustering
      - KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=3
      - KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=3
      - KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR=2
    volumes:
      - kafka_1_data:/bitnami/kafka
  kafka-2:
    image: docker.io/bitnami/kafka:3.6
    ports:
      - "9094:9092"
    restart: unless-stopped
    environment:
      # KRaft settings
      - KAFKA_CFG_NODE_ID=2
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka-0:9093,1@kafka-1:9093,2@kafka-2:9093
      - KAFKA_KRAFT_CLUSTER_ID=abcdefghijklmnopqrstuv
      - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=false
      - KAFKA_CFG_LOG_RETENTION_HOURS=168
      - KAFKA_CFG_LOG_RETENTION_BYTES=1073741824
      - KAFKA_CFG_LOG_CLEANER_ENABLE=true
      - KAFKA_CFG_MESSAGE_MAX_BYTES=1000012
      - KAFKA_CFG_MIN_IN_SYNC_REPLICAS=2
      # Listeners
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://103.171.26.248:9094
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
      # Clustering
      - KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=3
      - KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=3
      - KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR=2
    volumes:
      - kafka_2_data:/bitnami/kafka

volumes:
  kafka_0_data:
    driver: local
  kafka_1_data:
    driver: local
  kafka_2_data:
    driver: local
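After docker compose up -d, the state of the three-node KRaft quorum can be checked from any broker container. A sketch using docker compose exec with the kafka-0 service name and kafka-metadata-quorum.sh, which ships with Kafka 3.3+ and is on the PATH in the Bitnami image:

# bring the cluster up
docker compose up -d

# describe the KRaft quorum: expect one leader and three voters (node IDs 0, 1, 2)
docker compose exec kafka-0 kafka-metadata-quorum.sh --bootstrap-server localhost:9092 describe --status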

List the topics in the cluster

# enter the container
docker exec -it [container name or ID] /bin/bash

# run the Kafka CLI script
kafka-topics.sh --bootstrap-server localhost:9092 --list

Create a topic in the cluster

# create with the CLI script
kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 3 --topic TOPIC_LY_TEST

Describe a topic

kafka-topics.sh --describe --topic TOPIC_LY_TEST --bootstrap-server localhost:9092
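To round-trip a few messages through the new topic, a console producer and consumer can be run inside any broker container; a sketch with acks=all so the min.insync.replicas=2 setting above is actually exercised:

# produce (type a few lines, then Ctrl-C); acks=all needs 2 in-sync replicas per the config above
kafka-console-producer.sh --bootstrap-server localhost:9092 --topic TOPIC_LY_TEST --producer-property acks=all

# consume from the beginning in another shell
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic TOPIC_LY_TEST --from-beginning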