首页 > 其他分享 >如何借助Kafka持久化存储K8S事件数据?

如何借助Kafka持久化存储K8S事件数据?

时间:2023-05-22 10:02:45浏览次数:57  
标签:存储 kube name Kubernetes kafka K8S events Kafka

大家应该对 Kubernetes Events 并不陌生,特别是当你使用 kubectl describe 命令或 Event API 资源来了解集群中的故障时。  

$ kubectl get events

15m         Warning   FailedCreate                                                                                                      replicaset/ml-pipeline-visualizationserver-865c7865bc    

Error creating: pods "ml-pipeline-visualizationserver-865c7865bc-" is forbidden: error looking up service account default/default-editor: serviceaccount "default-editor" not found

 

尽管这些信息十分有用,但它只是临时的,保留时间最长为30天。如果出于审计或是故障诊断等目的,你可能想要把这些信息保留得更久,比如保存在像 Kafka 这样更持久、高效的存储中。然后你可以借助其他工具(如 Argo Events)或自己的应用程序订阅 Kafka 主题来对某些事件做出响应。  

构建K8s事件处理链路

我们将构建一整套 Kubernetes 事件处理链路,其主要构成为:

  • Eventrouter,开源的 Kubernetes event 处理器,它可以将所有集群事件整合汇总到某个 Kafka 主题中。
  • Strimzi Operator,在 Kubernetes 中轻松管理 Kafka broker。
  • 自定义 Go 二进制文件以将事件分发到相应的 Kafka 主题中。  

为什么要把事件分发到不同的主题中?比方说,在集群的每个命名空间中存在与特定客户相关的 Kubernetes 资产,那么在使用这些资产之前你当然希望将相关事件隔离开。  

本示例中所有的配置、源代码和详细设置指示都已经放在以下代码仓库中:  

 

创建 Kafka broker 和主题

我选择使用 Strimzi(strimzi.io/) 将 Kafka 部署到 Kubernetes 中。简而言之,它是用于创建和更新 Kafka broker 和主题的。你可以在官方文档中找到如何安装该 Operator 的详细说明:  

首先,创建一个新的 Kafka 集群:

apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
  name: kube-events
spec:
  entityOperator:
    topicOperator: {}
    userOperator: {}
  kafka:
    config:
      default.replication.factor: 3
      log.message.format.version: "2.6"
      offsets.topic.replication.factor: 3
      transaction.state.log.min.isr: 2
      transaction.state.log.replication.factor: 3
    listeners:
    - name: plain
      port: 9092
      tls: false
      type: internal
    - name: tls
      port: 9093
      tls: true
      type: internal
    replicas: 3
    storage:
      type: jbod
      volumes:
      - deleteClaim: false
        id: 0
        size: 10Gi
        type: persistent-claim
    version: 2.6.0
  zookeeper:
    replicas: 3
    storage:
      deleteClaim: false
      size: 10Gi
      type: persistent-claim

 

然后创建 Kafka 主题来接收我们的事件:

apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaTopic
metadata:
  name: cluster-events
spec:
  config:
    retention.ms: 7200000
    segment.bytes: 1073741824
  partitions: 1
  replicas: 1

 

设置 EventRouter

在本教程中使用 kubectl apply 命令即可,我们需要编辑 router 的配置,以指明我们的 Kafka 端点和要使用的主题:

apiVersion: v1
data:
  config.json: |-
    {
      "sink": "kafka",
      "kafkaBrokers": "kube-events-kafka-bootstrap.kube-events.svc.cluster.local:9092",
      "kafkaTopic": "cluster-events"
    }
kind: ConfigMap
metadata:
  name: eventrouter-cm

 

验证设置是否正常工作

我们的 cluster-events Kafka 的主题现在应该收到所有的事件。最简单的方法是在主题上运行一个 consumer 来检验是否如此。为了方便期间,我们使用我们的一个 Kafka broker pods,它已经有了所有必要的工具,你可以看到事件流:

kubectl -n kube-events exec kube-events-kafka-0 -- bin/kafka-console-consumer.sh \
  --bootstrap-server kube-events-kafka-bootstrap:9092 \
  --topic kube-events \
  --from-beginning
{"verb":"ADDED","event":{...}}
{"verb":"ADDED","event":{...}}
...

 

编写 Golang 消费者

现在我们想将我们的 Kubernetes 事件依据其所在的命名空间分发到多个主题中。我们将编写一个 Golang 消费者和生产者来实现这一逻辑:

  • 消费者部分在 cluster-events 主题上监听传入的集群事件
  • 生产者部分写入与事件的命名空间相匹配的 Kafka 主题中  

如果为Kafka配置了适当的选项(默认情况),就不需要特地创建新的主题,因为 Kafka 会默认为你创建主题。这是 Kafka 客户端 API 的一个非常酷的功能。

p, err := kafka.NewProducer(cfg.Endpoint)
if err != nil {
        sugar.Fatal("cannot create producer")
}
defer p.Close()

c, err := kafka.NewConsumer(cfg.Endpoint, cfg.Topic)
if err != nil {
        sugar.Fatal("cannot create consumer")
}
defer c.Close()

run := true
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
go func() {
        sig := <-sigs
        sugar.Infof("signal %s received, terminating", sig)
        run = false
}()

var wg sync.WaitGroup
go func() {
        wg.Add(1)
        for run {
                data, err := c.Read()
                if err != nil {
                        sugar.Errorf("read event error: %v", err)
                        time.Sleep(5 * time.Second)
                        continue
                }
                if data == nil {
                        continue
                }
                msg, err := event.CreateDestinationMessage(data)
                if err != nil {
                        sugar.Errorf("cannot create destination event: %v", err)
                }
                p.Write(msg.Topic, msg.Message)
        }
        sugar.Info("worker thread done")
        wg.Done()
}()

wg.Wait()

 

完整代码在此处:  

当然还有更高性能的选择,这取决于预计的事件量和扇出(fanout)逻辑的复杂性。对于一个更强大的实现,使用 Spark Structured Streaming 的消费者将是一个很好的选择。  

部署消费者

构建并将二进制文件推送到 Docker 镜像之后,我们将它封装为 Kubernetes deployment:

apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    app: events-fanout
  name: events-fanout
spec:
  replicas: 1
  selector:
    matchLabels:
      app: events-fanout
  template:
    metadata:
      labels:
        app: events-fanout
    spec:
      containers:
        - image: emmsys/events-fanout:latest
          name: events-fanout
          command: [ "./events-fanout"]
          args:
            - -logLevel=info
          env:
            - name: ENDPOINT
              value: kube-events-kafka-bootstrap:9092
            - name: TOPIC
              value: cluster-events

 

检查目标主题是否创建

现在,新的主题已经创建完成:

kubectl -n kube-events get kafkatopics.kafka.strimzi.io -o name

kafkatopic.kafka.strimzi.io/cluster-events
kafkatopic.kafka.strimzi.io/kube-system
kafkatopic.kafka.strimzi.io/default
kafkatopic.kafka.strimzi.io/kafka
kafkatopic.kafka.strimzi.io/kube-events

 

你会发现你的事件根据其命名空间整齐地存储在这些主题中。  

总结

访问 Kubernetes 历史事件日志可以使你对 Kubernetes 系统的状态有了更好的了解,但这单靠 kubectl 比较难做到。更重要的是,它可以通过对事件做出反应来实现集群或应用运维自动化,并以此来构建可靠、反应灵敏的软件。  

原文链接: https://hackernoon.com/monitor-your-kubernetes-cluster-events-with-eventrouter-golang-and-kafka-wh2a35l0

标签:存储,kube,name,Kubernetes,kafka,K8S,events,Kafka
From: https://blog.51cto.com/u_15682575/6321008

相关文章

  • 当k8s拉镜像出现问题时 需要修改 /etc/containerd/config.toml 配置
    找到plugins."io.containerd.grpc.v1.cri".registry添加此两处配置 [plugins."io.containerd.grpc.v1.cri".registry.configs][plugins."io.containerd.grpc.v1.cri".registry.configs."192.168.16.185:8088".tls]insecu......
  • 计算机组成原理:存储器实验
    实验名称:2存储器实验实验目的掌握静态随机存储器RAM工作特性及数据的读写方法。基于信号时序图,了解读写静态随机存储器的原理。掌握Cache的原理及其设计方法。熟悉FPGA应用设计及EDA软件的使用。实验设备PC机一台,TDX-CMX实验系统一套。实验预习静态随机存储器实验1......
  • C语言编程—存储知识
    存储类定义C程序中变量/函数的的存储位置、生命周期和作用域。这些说明符放置在它们所修饰的类型之前。下面列出C程序中可用的存储类:autoregisterstaticexternauto存储类auto存储类是所有局部变量默认的存储类。定义在函数中的变量默认为auto存储类,这意味着它们在函数开始......
  • 《数据结构与算法》之数据的顺存储
    导言:数据结构中,对一些数据序列我们使用的是顺序的方式存储,比较常见的有数组,链表,这些都是最基本的顺序存储的结构,我们会用几个简单的例子来描述顺序存储的方式和演变我们知道顺序存储中有链表,有链表我们就必须知道指针,所以我们先复习一下指针,再来看顺序存储一.指针在C语言中,我......
  • 阿里云对象存储OSS————跨域资源共享(CORS)(m3u8 无法加载m3u8:跨域访问被拒绝)...
    今天在做视频直播录像的时候,添加一个录制APP的.M3U8文件到OSS的一个test文件中存储,结果是访问不到了:提示:无法加载m3u8:跨域访问被拒绝!!!!!项目代码测试地址:https://github.com/Tinywan/ThinkPhpStudy阿里云帮助文档:https://help.aliyun.com/document_detail/31928.html......
  • 【中间件】通过 docker-compose 快速部署 Kafka 保姆级教程
    目录一、概述二、前期准备1)部署docker2)部署docker-compose三、创建网络四、安装Zookeeper五、Kafka编排部署1)下载Kafka2)配置3)启动脚本bootstrap.sh4)构建镜像Dockerfile5)编排docker-compose.yaml6)开始部署六、简单测试验证七、常用的Kafka客户端命令1)添加topic2)查看topic......
  • k8s 1.24.14 Ingress-nginx 的部署
    前言:本次部署使用了高可用的形式,会在每个node节点做亲和性(master不部署),让每一个pod都部署上去,然后加入NGINX去过负载,这样我们之间用NGINX的80端口访问域名就可以了。MountVolume.SetUpfailedforvolume"webhook-cert":secret"ingress-nginx-admission"notfound。......
  • 第五章 树的存储,树和森林的遍历
    双亲表示法(顺序存储)孩子表示法(顺序+链式存储)孩子兄弟表示法(链式存储)树和二叉树的转换森林和二叉树的转换知识回顾树和森林的遍历树的定义笑死树的先根遍历树的后根遍历先根遍历和后根遍历称为深度优先遍历树的层次遍历称为广度优先遍历森林的......
  • 浮点数在内存中的存储规则
    我们知道,整型在内存中的存储比较简单,在内存中都是以二进制来存储的。然而,浮点型在内存中的存储较为复杂。下面来详细探讨:直接举一个例子:intmain(){intn=9;float*pFloat=(float*)&n;printf("n的值为:%d\n",n);printf("*pFloat的值为:%f\n",*pFloat);*pFloat=9.0;pri......
  • 配置k8s的一个serviceaccount具有管理员权限并获取他的token
    创建sa账户/授定管理员角色权限cat>sa.yaml<<eofapiVersion:v1kind:ServiceAccountmetadata:name:kubepi-usernamespace:kube-systemeofcat>rolebe.yaml<<eofapiVersion:rbac.authorization.k8s.io/v1kind:ClusterRoleBindingmetadata:na......