生产环境推荐的kafka部署方式为operator方式部署,Strimzi是目前最主流的operator方案。集群数据量较小的话,可以采用NFS共享存储,数据量较大的话可使用local pv存储。
部署operator
[root@k8s-master31 01-kafka]# helm repo add strimzi https://strimzi.io/charts/
"strimzi" has been added to your repositories
[root@k8s-master31 01-kafka]# helm install strimzi -n kafka strimzi/strimzi-kafka-operator
NAME: strimzi
LAST DEPLOYED: Fri Aug 2 11:17:49 2024
NAMESPACE: kafka
STATUS: deployed
REVISION: 1
TEST SUITE: None
NOTES:
Thank you for installing strimzi-kafka-operator-0.42.0
To create a Kafka cluster refer to the following documentation.
https://strimzi.io/docs/operators/latest/deploying.html#deploying-cluster-operator-helm-chart-str
[root@k8s-master31 01-kafka]# kubectl get pod -n kafka
NAME READY STATUS RESTARTS AGE
strimzi-cluster-operator-bc8c67c48-7s4gf 1/1 Running 0 7m39s
[root@k8s-master31 01-kafka]# helm -n kafka list
NAME NAMESPACE REVISION UPDATED STATUS CHART APP VERSI
strimzi kafka 1 2024-08-02 11:17:49.681761827 +0800 CST deployed strimzi-kafka-operator-0.42.0 0.42.0
创建 Kafka 集群
Strimzi官方仓库为我们提供了各种场景下的示例文件,资源清单下载地址:https://github.com/strimzi/strimzi-kafka-operator/releases
- kafka-persistent.yaml:部署具有三个 ZooKeeper 和三个 Kafka 节点的持久集群。(推荐)
- kafka-jbod.yaml:部署具有三个 ZooKeeper 和三个 Kafka 节点(每个节点使用多个持久卷)的持久集群。
- kafka-persistent-single.yaml:部署具有单个 ZooKeeper 节点和单个 Kafka 节点的持久集群。
- kafka-ephemeral.yaml:部署具有三个 ZooKeeper 和三个 Kafka 节点的临时群集。
- kafka-ephemeral-single.yaml:部署具有三个 ZooKeeper 节点和一个 Kafka 节点的临时群集。
[root@k8s-master31 01-kafka]# wget https://github.com/strimzi/strimzi-kafka-operator/releases/download/0.42.0/strimzi-0.42.0.tar.gz
[root@k8s-master31 01-kafka]# tar xf strimzi-0.42.0.tar.gz
[root@k8s-master31 01-kafka]# cd strimzi-0.42.0/examples/kafka/
[root@k8s-master31 kafka]# ls
kafka-ephemeral-single.yaml kafka-ephemeral.yaml kafka-jbod.yaml kafka-persistent-single.yaml kafka-persistent.yaml kafka-with-node-pools.yaml kraft
创建 pvc
此处以nfs存储为例,提前创建pvc资源,分别用于3个zookeeper和3个kafka持久化存储数据使用。
[root@tiaoban kafka]# cat kafka-pvc.yaml
kind: PersistentVolumeClaim
apiVersion: v1
metadata:
name: data-my-cluster-zookeeper-0
namespace: kafka
spec:
storageClassName: nfs-client
accessModes:
- ReadWriteOnce
resources:
requests:
storage: 100Gi
---
kind: PersistentVolumeClaim
apiVersion: v1
metadata:
name: data-my-cluster-zookeeper-1
namespace: kafka
spec:
storageClassName: nfs-client
accessModes:
- ReadWriteOnce
resources:
requests:
storage: 100Gi
---
kind: PersistentVolumeClaim
apiVersion: v1
metadata:
name: data-my-cluster-zookeeper-2
namespace: kafka
spec:
storageClassName: nfs-client
accessModes:
- ReadWriteOnce
resources:
requests:
storage: 100Gi
---
kind: PersistentVolumeClaim
apiVersion: v1
metadata:
name: data-0-my-cluster-kafka-0
namespace: kafka
spec:
storageClassName: nfs-client
accessModes:
- ReadWriteOnce
resources:
requests:
storage: 100Gi
---
kind: PersistentVolumeClaim
apiVersion: v1
metadata:
name: data-0-my-cluster-kafka-1
namespace: kafka
spec:
storageClassName: nfs-client
accessModes:
- ReadWriteOnce
resources:
requests:
storage: 100Gi
---
kind: PersistentVolumeClaim
apiVersion: v1
metadata:
name: data-0-my-cluster-kafka-2
namespace: kafka
spec:
storageClassName: nfs-client
accessModes:
- ReadWriteOnce
resources:
requests:
storage: 100Gi
部署kafka和zookeeper
参考官方仓库的kafka-persistent.yaml示例文件,部署三个 ZooKeeper 和三个 Kafka 节点的持久集群。
apiVersion: kafka.strimzi.io/v1beta2 # API版本,指定使用Strimzi Kafka Operator的v1beta2版本
kind: Kafka # 资源类型为Kafka
metadata:
name: my-cluster # Kafka集群的名称
spec: # Kafka集群的规范
kafka: # Kafka集群的配置
version: 3.7.1 # Kafka的版本
replicas: 3 # Kafka的副本数量
listeners: # Kafka监听器配置
- name: plain # 监听器名称为plain
port: 9092 # 监听器端口为9092
type: internal # 监听器类型为内部
tls: false # 是否启用TLS(传输层安全性),此处为false
- name: tls # 监听器名称为tls
port: 9093 # 监听器端口为9093
type: internal # 监听器类型为内部
tls: true # 是否启用TLS,此处为true
config: # Kafka的配置
offsets.topic.replication.factor: 3 # offsets主题的副本因子
transaction.state.log.replication.factor: 3 # 事务状态日志的副本因子
transaction.state.log.min.isr: 2 # 事务状态日志的最小同步副本数
default.replication.factor: 3 # 默认副本因子
min.insync.replicas: 2 # 最小同步副本数
inter.broker.protocol.version: "3.7" # Kafka代理间的协议版本
storage: # Kafka存储配置
type: jbod # 存储类型为JBOD(Just a Bunch of Disks)
volumes: # 存储卷配置
- id: 0 # 存储卷ID
type: persistent-claim # 存储卷类型为持久化声明
size: 100Gi # 存储卷大小为100Gi
deleteClaim: false # 是否在删除Kafka集群时删除存储声明,此处为false
zookeeper: # Zookeeper的配置
replicas: 3 # Zookeeper的副本数量
storage: # Zookeeper存储配置
type: persistent-claim # 存储类型为持久化声明
size: 100Gi # 存储大小为100Gi
deleteClaim: false # 是否在删除Zookeeper集群时删除存储声明,此处为false
entityOperator: # 实体操作器的配置
topicOperator: {} # 主题操作器的配置(空表示启用默认配置)
userOperator: {} # 用户操作器的配置(空表示启用默认配置)
访问验证
[root@k8s-master31 kafka]# kubectl get pod -n kafka
NAME READY STATUS RESTARTS AGE
kafka-ui-dcfdd54c8-q45g5 1/1 Running 0 50m
my-cluster-entity-operator-86f6555b5b-hkkfk 2/2 Running 0 57m
my-cluster-kafka-0 1/1 Running 0 57m
my-cluster-kafka-1 1/1 Running 0 57m
my-cluster-kafka-2 1/1 Running 0 57m
my-cluster-zookeeper-0 1/1 Running 0 59m
my-cluster-zookeeper-1 1/1 Running 0 59m
my-cluster-zookeeper-2 1/1 Running 0 59m
strimzi-cluster-operator-bc8c67c48-p6ztc 1/1 Running 0 4h24m
[root@k8s-master31 kafka]#
[root@k8s-master31 kafka]# kubectl get svc -n kafka
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
my-cluster-kafka-bootstrap ClusterIP 172.101.111.112 <none> 9091/TCP,9092/TCP,9093/TCP 58m
my-cluster-kafka-brokers ClusterIP None <none> 9090/TCP,9091/TCP,8443/TCP,9092/TCP,9093/TCP 58m
my-cluster-zookeeper-client ClusterIP 172.105.10.29 <none> 2181/TCP 60m
my-cluster-zookeeper-nodes ClusterIP None <none> 2181/TCP,2888/TCP,3888/TCP 60m
[root@k8s-master31 kafka]#
部署kafka-ui
:::info
官网: https://docs.kafka-ui.provectus.io/configuration/helm-charts/quick-start
:::
# 添加helm仓库地址
helm repo add kafka-ui https://provectus.github.io/kafka-ui-charts
创建configmap资源,在configmap中指定kafka连接地址
[root@tiaoban kafka]# cat kafka-ui.yaml
apiVersion: v1
kind: ConfigMap
metadata:
name: kafka-ui-helm-values
namespace: kafka
data:
KAFKA_CLUSTERS_0_NAME: "kafka-cluster"
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: "my-cluster-kafka-brokers.kafka.svc:9092"
AUTH_TYPE: "DISABLED"
MANAGEMENT_HEALTH_LDAP_ENABLED: "FALSE"
创建 configmap
[root@tiaoban kafka]# kubectl apply -f kafka-ui.yaml
configmap/kafka-ui-helm-values created
helm方式部署kafka-ui并指定 configmap 配置文件
[root@k8s-master31 kafka]# helm install kafka-ui kafka-ui/kafka-ui -n kafka --set existingConfigMap="kafka-ui-helm-values"
NAME: kafka-ui
LAST DEPLOYED: Fri Aug 2 15:01:32 2024
NAMESPACE: kafka
STATUS: deployed
REVISION: 1
TEST SUITE: None
NOTES:
1. Get the application URL by running these commands:
export POD_NAME=$(kubectl get pods --namespace kafka -l "app.kubernetes.io/name=kafka-ui,app.kubernetes.io/instance=kafka-ui" -o jsonpath="{.items[0].metadata.name}")
echo "Visit http://127.0.0.1:8080 to use your application"
kubectl --namespace kafka port-forward $POD_NAME 8080:8080
# 端口映射
[root@k8s-master31 kafka]# kubectl --namespace kafka port-forward --address 0.0.0.0 $POD_NAME 8080:8080
Forwarding from 0.0.0.0:8080 -> 8080
测试
要创建一个可以用作 Kafka 客户端的 Pod,请运行以下命令:
kubectl run kafka-cluster-client --restart='Never' --image bitnami/kafka:3.4.1 --namespace kafka --command -- sleep infinity
#进入容器
kubectl exec --tty -i kafka-cluster-client --namespace kafka -- bash
# 启动生产者
kafka-console-producer.sh \
--broker-list my-cluster-kafka-brokers.kafka.svc:9092 \
--topic test
# 启动消费者
kafka-console-consumer.sh \
--bootstrap-server my-cluster-kafka-brokers.kafka.svc:9092 \
--topic test \
--from-beginning
浏览器访问 ip:8080