首页 > 其他分享 >K8s - 安装部署Kafka、Zookeeper集群教程(支持从K8s外部访问)

K8s - 安装部署Kafka、Zookeeper集群教程(支持从K8s外部访问)

时间:2023-10-26 12:13:40浏览次数:52  
标签:name service zk -- Zookeeper kafka K8s local Kafka

本文演示如何在K8s集群下部署Kafka集群,并且搭建后除了可以K8s内部访问Kafka服务,也支持从K8s集群外部访问Kafka服务。服务的集群部署通常有两种方式:一种是 StatefulSet,另一种是 Service&Deployment。本次我们使用 StatefulSet 方式搭建 ZooKeeper 集群,使用 Service&Deployment 搭建 Kafka 集群。

一、创建 NFS 存储

    NFS 存储主要是为了给 Kafka、ZooKeeper 提供稳定的后端存储,当 Kafka、ZooKeeper 的 Pod 发生故障重启或迁移后,依然能获得原先的数据。

1,安装 NFS

这里我选择在 master 节点创建 NFS 存储,首先执行如下命令安装 NFS:
yum -y install nfs-utils
yum -y install rpcbind

2,创建共享文件夹

(1)执行如下命令创建 6 个文件夹:
mkdir -p /usr/local/k8s/zookeeper/pv{1..3}
mkdir -p /usr/local/k8s/kafka/pv{1..3}

(2)编辑 /etc/exports 文件:

vim /etc/exports

(3)在里面添加如下内容:

/usr/local/k8s/kafka/pv1 *(rw,sync,no_root_squash)
/usr/local/k8s/kafka/pv2 *(rw,sync,no_root_squash)
/usr/local/k8s/kafka/pv3 *(rw,sync,no_root_squash)
/usr/local/k8s/zookeeper/pv1 *(rw,sync,no_root_squash)
/usr/local/k8s/zookeeper/pv2 *(rw,sync,no_root_squash)
/usr/local/k8s/zookeeper/pv3 *(rw,sync,no_root_squash)

(4)保存退出后执行如下命令重启服务:

如果执行 systemctl restart nfs 报“Failed to restart nfs.service: Unit nfs.service not found.”错误,可以尝试改用如下命令:

    • sudo service nfs-server start
systemctl restart rpcbind
systemctl restart nfs
systemctl enable nfs

(5)执行 exportfs -v 命令可以显示出所有的共享目录:

 (6)而其他的 Node 节点上需要执行如下命令安装 nfs-utils 客户端:

yum -y install nfs-util

(7)然后其他的 Node 节点上可执行如下命令(ip 为 Master 节点 IP)查看 Master 节点上共享的文件夹:

showmount -e  107.106.37.33(nfs服务端的IP)

 

二、创建 ZooKeeper 集群

1,创建 ZooKeeper PV

(1)首先创建一个 zookeeper-pv.yaml 文件,内容如下:

注意:170.106.37.33 需要改成实际 NFS 服务器地址:
apiVersion: v1
kind: PersistentVolume
metadata:
  name: k8s-pv-zk01
  labels:
    app: zk
  annotations:
    volume.beta.kubernetes.io/storage-class: "anything"
spec:
  capacity:
    storage: 1Gi
  accessModes:
    - ReadWriteOnce
  nfs:
    server: 170.106.37.33
    path: "/usr/local/k8s/zookeeper/pv1"
  persistentVolumeReclaimPolicy: Recycle
---
apiVersion: v1
kind: PersistentVolume
metadata:
  name: k8s-pv-zk02
  labels:
    app: zk
  annotations:
    volume.beta.kubernetes.io/storage-class: "anything"
spec:
  capacity:
    storage: 1Gi
  accessModes:
    - ReadWriteOnce
  nfs:
    server: 170.106.37.33
    path: "/usr/local/k8s/zookeeper/pv2"
  persistentVolumeReclaimPolicy: Recycle
---
apiVersion: v1
kind: PersistentVolume
metadata:
  name: k8s-pv-zk03
  labels:
    app: zk
  annotations:
    volume.beta.kubernetes.io/storage-class: "anything"
spec:
  capacity:
    storage: 1Gi
  accessModes:
    - ReadWriteOnce
  nfs:
    server: 170.106.37.33
    path: "/usr/local/k8s/zookeeper/pv3"
  persistentVolumeReclaimPolicy: Recycle

(2)然后执行如下命令创建 PV:

kubectl apply -f zookeeper-pv.yaml

(3)执行如下命令可以查看是否创建成功:

kubectl get pv

2,创建 ZooKeeper 集群

(1)我们这里要搭建一个包含 3 个节点的 ZooKeeper 集群。首先创建一个 zookeeper.yaml 文件,内容如下:
apiVersion: v1
kind: Service
metadata:
  name: zk-hs
  labels:
    app: zk
spec:
  selector:
    app: zk
  clusterIP: None
  ports:
    - name: server
      port: 2888
    - name: leader-election
      port: 3888
---
apiVersion: v1
kind: Service
metadata:
  name: zk-cs
  labels:
    app: zk
spec:
  selector:
    app: zk
  type: NodePort
  ports:
    - name: client
      port: 2181
      nodePort: 31811
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: zk
spec:
  serviceName: "zk-hs"
  replicas: 3 # by default is 1
  selector:
    matchLabels:
      app: zk # has to match .spec.template.metadata.labels
  updateStrategy:
    type: RollingUpdate
  podManagementPolicy: Parallel
  template:
    metadata:
      labels:
        app: zk # has to match .spec.selector.matchLabels
    spec:
      containers:
        - name: zk
          imagePullPolicy: Always
          image: leolee32/kubernetes-library:kubernetes-zookeeper1.0-3.4.10
          ports:
            - containerPort: 2181
              name: client
            - containerPort: 2888
              name: server
            - containerPort: 3888
              name: leader-election
          command:
            - sh
            - -c
            - "start-zookeeper \
        --servers=3 \
        --data_dir=/var/lib/zookeeper/data \
        --data_log_dir=/var/lib/zookeeper/data/log \
        --conf_dir=/opt/zookeeper/conf \
        --client_port=2181 \
        --election_port=3888 \
        --server_port=2888 \
        --tick_time=2000 \
        --init_limit=10 \
        --sync_limit=5 \
        --heap=4G \
        --max_client_cnxns=60 \
        --snap_retain_count=3 \
        --purge_interval=12 \
        --max_session_timeout=40000 \
        --min_session_timeout=4000 \
        --log_level=INFO"
          readinessProbe:
            exec:
              command:
                - sh
                - -c
                - "zookeeper-ready 2181"
            initialDelaySeconds: 10
            timeoutSeconds: 5
          livenessProbe:
            exec:
              command:
                - sh
                - -c
                - "zookeeper-ready 2181"
            initialDelaySeconds: 10
            timeoutSeconds: 5
          volumeMounts:
            - name: datadir
              mountPath: /var/lib/zookeeper
  volumeClaimTemplates:
    - metadata:
        name: datadir
        annotations:
          volume.beta.kubernetes.io/storage-class: "anything"
      spec:
        accessModes: [ "ReadWriteOnce" ]
        resources:
          requests:
            storage: 1Gi

(2)然后执行如下命令开始创建:

kubectl apply -f zookeeper.yaml

(3)执行如下命令可以查看是否创建成功:

kubectl get pods
kubectl get service

 

三、创建 Kafka 集群

(1)我们这里要搭建一个包含 3 个节点的 Kafka 集群。首先创建一个 kafka.yaml 文件,内容如下: 注意:
  • nfs 地址需要改成实际 NFS 服务器地址。
  • status.hostIP 表示宿主机的 IP,即 Pod 实际最终部署的 Node 节点 IP(本文我是直接部署到 Master 节点上),将 KAFKA_ADVERTISED_HOST_NAME 设置为宿主机 IP 可以确保 K8s 集群外部也可以访问 Kafka

apiVersion: v1
kind: Service
metadata:
  name: kafka-service-1
labels:
  app: kafka-service-1
spec:
  type: NodePort
  ports:
    - port: 9092
      name: kafka-service-1
      targetPort: 9092
      nodePort: 30901
      protocol: TCP
    selector:
      app: kafka-1
---
apiVersion: v1
kind: Service
metadata:
  name: kafka-service-2
labels:
  app: kafka-service-2
spec:
  type: NodePort
  ports:
    - port: 9092
      name: kafka-service-2
      targetPort: 9092
      nodePort: 30902
      protocol: TCP
    selector:
      app: kafka-2
---
apiVersion: v1
kind: Service
metadata:
  name: kafka-service-3
labels:
  app: kafka-service-3
spec:
  type: NodePort
  ports:
    - port: 9092
      name: kafka-service-3
      targetPort: 9092
      nodePort: 30903
      protocol: TCP
    selector:
      app: kafka-3
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafka-deployment-1
spec:
  replicas: 1
  selector:
    matchLabels:
      app: kafka-1
  template:
    metadata:
      labels:
        app: kafka-1
    spec:
      containers:
        - name: kafka-1
          image: wurstmeister/kafka
          imagePullPolicy: IfNotPresent
          ports:
            - containerPort: 9092
          env:
            - name: KAFKA_ZOOKEEPER_CONNECT
              value: zk-0.zk-hs.default.svc.cluster.local:2181,zk-1.zk-hs.default.svc.cluster.local:2181,zk-2.zk-hs.default.svc.cluster.local:2181
            - name: KAFKA_BROKER_ID
              value: "1"
            - name: KAFKA_CREATE_TOPICS
              value: mytopic:2:1
            - name: KAFKA_LISTENERS
              value: PLAINTEXT://0.0.0.0:9092
            - name: KAFKA_ADVERTISED_PORT
              value: "30901"
            - name: KAFKA_ADVERTISED_HOST_NAME
              valueFrom:
                fieldRef:
                  fieldPath: status.hostIP
          volumeMounts:
            - name: datadir
              mountPath: /var/lib/kafka
      volumes:
         - name: datadir
           nfs:
           server: 170.106.37.33
           path: "/usr/local/k8s/kafka/pv1"
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafka-deployment-2
spec:
  replicas: 1
  selector:
    matchLabels:
      app: kafka-2
  template:
    metadata:
      labels:
        app: kafka-2
    spec:
      containers:
        - name: kafka-2
          image: wurstmeister/kafka
          imagePullPolicy: IfNotPresent
          ports:
            - containerPort: 9092
          env:
            - name: KAFKA_ZOOKEEPER_CONNECT
              value: zk-0.zk-hs.default.svc.cluster.local:2181,zk-1.zk-hs.default.svc.cluster.local:2181,zk-2.zk-hs.default.svc.cluster.local:2181
            - name: KAFKA_BROKER_ID
              value: "2"
            - name: KAFKA_LISTENERS
              value: PLAINTEXT://0.0.0.0:9092
            - name: KAFKA_ADVERTISED_PORT

              value: "30902"
            - name: KAFKA_ADVERTISED_HOST_NAME
              valueFrom:
                fieldRef:
                  fieldPath: status.hostIP
         volumeMounts:
            - name: datadir
              mountPath: /var/lib/kafka
      volumes:
        - name: datadir
          nfs:
            server: 170.106.37.33
            path: "/usr/local/k8s/kafka/pv2"
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafka-deployment-3
spec:
  replicas: 1
  selector:
    matchLabels:
      app: kafka-3
  template:
    metadata:
      labels:
        app: kafka-3
    spec:
      containers:
        - name: kafka-3
          image: wurstmeister/kafka
          imagePullPolicy: IfNotPresent
          ports:
            - containerPort: 9092
          env:
            - name: KAFKA_ZOOKEEPER_CONNECT
              value: zk-0.zk-hs.default.svc.cluster.local:2181,zk-1.zk-hs.default.svc.cluster.local:2181,zk-2.zk-hs.default.svc.cluster.local:2181
            - name: KAFKA_BROKER_ID
              value: "3"
            - name: KAFKA_LISTENERS
              value: PLAINTEXT://0.0.0.0:9092
            - name: KAFKA_ADVERTISED_PORT
              value: "30903"
            - name: KAFKA_ADVERTISED_HOST_NAME
              valueFrom:
                fieldRef:
                  fieldPath: status.hostIP
         volumeMounts:
            - name: datadir
              mountPath: /var/lib/kafka
      volumes:
         - name: datadir
           nfs:
             server: 170.106.37.33
             path: "/usr/local/k8s/kafka/pv3"

(2)然后执行如下命令开始创建:

kubectl apply -f kafka.yaml

(3)执行如下命令可以查看是否创建成功:

kubectl get pods
kubectl get service

 

四、开始测试

1,K8s 集群内部测试

(1)首先执行如下命令进入一个容器:
kubectl exec -it kafka-deployment-1-59f87c7cbb-99k46 /bin/bash

(2)接着执行如下命令创建一个名为 test_topic 的 topic:

kafka-topics.sh --create --topic test_topic --zookeeper zk-0.zk-hs.default.svc.cluster.local:2181,zk-1.zk-hs.default.svc.cluster.local:2181,zk-2.zk-hs.default.svc.cluster.local:2181 --partitions 1 --replication-factor 1

(3)创建后执行如下命令开启一个生产者,启动后可以直接在控制台中输入消息来发送,控制台中的每一行数据都会被视为一条消息来发送。

kafka-console-producer.sh --broker-list kafka-service-1:9092,kafka-service-2:9092,kafka-service-3:9092 --topic test_topic

(4)重新再打开一个终端连接服务器,然后进入容器后执行如下命令开启一个消费者:

kafka-console-consumer.sh --bootstrap-server kafka-service-1:9092,kafka-service-2:9092,kafka-service-3:9092 --topic test_topic

(5)再次打开之前的消息生产客户端来发送消息,并观察消费者这边对消息的输出来体验 Kafka 对消息的基础处理。

 

2,集群外出测试

    使用 Kafka 客户端工具(Offset Explorer)连接 Kafka 集群(可以通过 zookeeper 地址连接,也可以通过 kafka 地址连接),可以连接成功并能查看到数据。

 

更多的测试命令参考:

二、Kafka生产者消费者实例(基于命令行)
1.创建一个itcasttopic的主题
代码如下(示例):
kafka-topics.sh --create --topic itcasttopic --partitions 3 --replication-factor 2 -zookeeper 10.0.0.27:2181,10.0.0.103:2181,10.0.0.37:2181

2.hadoop01当生产者
代码如下(示例):
kafka-console-producer.sh --broker-list kafka-service-1:9092,kafka-service-2:9092,kafka-service-3:9092 --topic itcasttopic

3.hadoop02当消费者
代码如下(示例):
kafka-console-consumer.sh --from-beginning --topic itcasttopic --bootstrap-server kafka-service-1:9092,kafka-service-2:9092,kafka-service-3:9092

3.–list查看所有主题
代码如下(示例):
kafka-topics.sh --list --zookeeper 10.0.0.27:2181,10.0.0.103:2181,10.0.0.37:2181

4.删除主题
代码如下(示例):
kafka-topics.sh --delete --zookeeper 10.0.0.27:2181,10.0.0.103:2181,10.0.0.37:2181 --topic itcasttopic

5.关闭kafka
代码如下(示例):
bin/kafka-server-stop.sh config/server.properties

 

 

 

 

 

标签:name,service,zk,--,Zookeeper,kafka,K8s,local,Kafka
From: https://www.cnblogs.com/fengyuanfei/p/17789107.html

相关文章

  • Kafka 生产者API,消费者API,拦截器,流计算
    pom文件如下:<dependencies> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.0.0</version> </dependency> <dependency> <groupId......
  • Kafka 简介、集群架构、安装部署、基本命令
    一、kafka是什么?在实时计算中,Kafka主要是用来缓存数据,storm可以通过消费kafka中的数据进行实时计算。一套开源的分布式的消息队列系统,由scala写成,支持javaAPI。Kafka读消息采用topic进行归类。二、kafka中有哪两种角色?发送消息:Producer(生产者)接收消息:Consumer(消费者)三......
  • zookeeper 的安装和配置
    一、下载zookeeper以zookeeper-3.4.10为例:https://archive.apache.org/dist/zookeeper/zookeeper-3.4.10/二、上传到Linux服务器我使用的是WinSCP进行上传,下载地址:https://dl.pconline.com.cn/html_2/1/86/id=7244&pn=0&linkPage=1.html三、安装解压tar包(我是解压到家目录......
  • 利用 zookeeper 的分布式锁实现秒杀
    常见的业务场景:x年x月x日x点x分x秒,限时抢购10件商品。前提:分布式的环境,多用户高并发访问。依赖的jar包<projectxmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0htt......
  • kafka基于SCRAM认证,快速配置启用ACL
    启动和停止服务zookeeper/usr/local/apache-zookeeper-3.8.2-bin/bin/zkServer.shstart/usr/local/apache-zookeeper-3.8.2-bin/bin/zkServer.shstopkafka/usr/local/kafka_2.13-3.2.3/bin/kafka-server-stop.sh/usr/local/kafka_2.13-3.2.3/bin/kafka-server-start.sh-......
  • docker-compose部署SASL认证的kafka
    前言测试服务器:10.255.60.149一.编写docker-compose文件1.docker-compose.ymlversion:'3.8'services:zookeeper:image:wurstmeister/zookeepervolumes:-/data/zookeeper/data:/data-/home/docker-compose/kafka/config:/opt/zookeeper-......
  • helm部署kafka鉴权以及ACL
    官方文档https://github.com/bitnami/charts/tree/main/bitnami/kafkahttps://blog.csdn.net/u011618288/article/details/129105777(包含zookeeper与broker认证、鉴权流程)一.修改values.yaml文件按通用部署方案拉下来kafka安装包,修改values.yaml文件,开启scram鉴权,ACL以......
  • kafka-ACL
    本文档时在centos7直接部署添加认证的kafka文件基础上,做下面的修改实现ACL访问控制topic参考:https://www.seaxiang.com/blog/Qpsqii一.添加多个kafka用户及相关的配置文件1.kafka_server_jaas.confKafkaServer{org.apache.kafka.common.security.plain.PlainLoginModule......
  • centos7直接部署添加认证的kafka
    前言测试服务器:10.255.60.149一.安装jdk官网下载jdk1.8版本以上的https://www.oracle.com/java/technologies/downloads/测试系统版本为centos7,选择了最后一个下载后,使用rpm-ivh即可安装二.安装zookeeper和kafka软件版本:kafka_2.12-2.4.0(带zookeeper)下载链接:http://a......
  • k8s service访问偶发超时问题
    问题现象在某个集群节点上的服务访问service服务:端口,会出现偶发timeout的问题,集群有的节点不会出现访问timeout的问题问题处理查看bridge-nf-call-iptables参数是否开启cat/proc/sys/net/bridge/bridge-nf-call-iptables0问题解决开启内核参数echo"net.bridge.br......