
Pod Log Collection with DaemonSet (ElFK Solution)



Collection Approach

The collector is deployed as a DaemonSet. Logs are classified by the Kubernetes cluster's namespaces, and a separate Kafka topic is created for each namespace name.

K8s Log File Layout

Normally, when a container writes its logs to standard output (stdout), they are saved under /var/lib/docker/containers with a -json.log naming scheme. If Docker's data directory has been changed, the logs live under the new data directory instead, for example:
Here you can see a file like /data/docker/containers/<container id>/<container id>-json.log. Kubernetes then, by default, creates symlinks to these log files in /var/log/containers and /var/log/pods, as shown below:

cattle-node-agent-tvhlq_cattle-system_agent-8accba2d42cbc907a412be9ea3a628a90624fb8ef0b9aa2bc6ff10eab21cf702.log
etcd-k8s-master01_kube-system_etcd-248e250c64d89ee6b03e4ca28ba364385a443cc220af2863014b923e7f982800.log

This directory contains the logs of every container on the host, named as follows:

[podName]_[nameSpace]_[deploymentName]-[containerId].log

That is the naming for a Deployment; other controllers such as DaemonSet and StatefulSet differ slightly, but they all share one common pattern:

*_[nameSpace]_*.log
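A quick way to confirm the pattern on a node is to filter the symlinks by namespace; kube-system is just an example here:

# list the container log symlinks belonging to one namespace on this node
ls -l /var/log/containers/ | grep "_kube-system_"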

Kafka Deployment

Operator deployment

Strimzi is currently the most mainstream operator option. If the cluster's data volume is small, NFS shared storage can be used; for larger data volumes, use local PV storage.
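As an alternative to downloading the chart archive by hand (next step), the operator chart can also be installed straight from the public Strimzi Helm repository; a minimal sketch, assuming the cluster has outbound internet access:

helm repo add strimzi https://strimzi.io/charts/
helm repo update
helm install strimzi strimzi/strimzi-kafka-operator -n kafka --create-namespace --version 0.31.1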

Operator download

https://strimzi.io/downloads/

Check the corresponding version, then choose the .tgz download (screenshots omitted).

Installation

[root@master01 kafka]# ll
total 80
drwxr-xr-x 4 root root   122 Oct 25 16:09 strimzi-kafka-operator
-rw-r--r-- 1 root root 81283 Oct 25 16:06 strimzi-kafka-operator-helm-3-chart-0.31.1.tgz

[root@master01 kafka]# helm install  strimzi  ./strimzi-kafka-operator
NAME: strimzi
LAST DEPLOYED: Sun Oct  8 21:16:31 2023
NAMESPACE: kafka
STATUS: deployed
REVISION: 1
TEST SUITE: None
NOTES:
Thank you for installing strimzi-kafka-operator-0.31.1
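Note that the release above ended up in the kafka namespace; this assumes the namespace already exists and was either the current kubectl context namespace or passed explicitly, roughly:

[root@master01 kafka]# kubectl create namespace kafka
[root@master01 kafka]# helm install strimzi ./strimzi-kafka-operator -n kafka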

Resource manifest download

Download the YAML manifests for the matching version:

https://github.com/strimzi/strimzi-kafka-operator/releases

wget https://github.com/strimzi/strimzi-kafka-operator/releases/download/0.31.1/strimzi-0.31.1.tar.gz


Extract

tar -xf strimzi-0.31.1.tar.gz

YAML manifest descriptions

  • kafka-persistent.yaml: deploys a persistent cluster with three ZooKeeper and three Kafka nodes. (recommended)
  • kafka-jbod.yaml: deploys a persistent cluster with three ZooKeeper and three Kafka nodes, each Kafka node using multiple persistent volumes.
  • kafka-persistent-single.yaml: deploys a persistent cluster with a single ZooKeeper node and a single Kafka node.
  • kafka-ephemeral.yaml: deploys an ephemeral cluster with three ZooKeeper and three Kafka nodes.
  • kafka-ephemeral-single.yaml: deploys an ephemeral cluster with a single ZooKeeper node and a single Kafka node.
[root@master01 kafka20231025]# cd strimzi-0.31.1/examples/kafka/
[root@master01 kafka]# ll
total 20
-rw-r--r-- 1 redis docker 713 Sep 21  2022 kafka-ephemeral-single.yaml
-rw-r--r-- 1 redis docker 713 Sep 21  2022 kafka-ephemeral.yaml
-rw-r--r-- 1 redis docker 957 Sep 21  2022 kafka-jbod.yaml
-rw-r--r-- 1 redis docker 865 Sep 21  2022 kafka-persistent-single.yaml
-rw-r--r-- 1 redis docker 865 Sep 21  2022 kafka-persistent.yaml

Create PVCs

NFS storage is used as the example here. Create the PVC resources in advance; they provide persistent data storage for the three ZooKeeper and three Kafka nodes.

  • Create the PVCs
[root@master01 kafka]# cat kafka-pvc.yaml
kind: PersistentVolumeClaim
apiVersion: v1
metadata:
  name: data-my-cluster-zookeeper-0
  namespace: kafka
spec:
  storageClassName: nfs-client
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 100Gi
---
kind: PersistentVolumeClaim
apiVersion: v1
metadata:
  name: data-my-cluster-zookeeper-1
  namespace: kafka
spec:
  storageClassName: nfs-client
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 100Gi
---
kind: PersistentVolumeClaim
apiVersion: v1
metadata:
  name: data-my-cluster-zookeeper-2
  namespace: kafka
spec:
  storageClassName: nfs-client
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 100Gi
---
kind: PersistentVolumeClaim
apiVersion: v1
metadata:
  name: data-0-my-cluster-kafka-0
  namespace: kafka
spec:
  storageClassName: nfs-client
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 100Gi
---
kind: PersistentVolumeClaim
apiVersion: v1
metadata:
  name: data-0-my-cluster-kafka-1
  namespace: kafka
spec:
  storageClassName: nfs-client
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 100Gi
---
kind: PersistentVolumeClaim
apiVersion: v1
metadata:
  name: data-0-my-cluster-kafka-2
  namespace: kafka
spec:
  storageClassName: nfs-client
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 100Gi
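With the PVCs in place, the Kafka cluster itself can be created from one of the example manifests. A minimal sketch, assuming the default cluster name my-cluster from the examples (which matches the PVC names above) and that the manifest's storage section is adjusted to the nfs-client storage class:

[root@master01 kafka]# kubectl -n kafka apply -f strimzi-0.31.1/examples/kafka/kafka-persistent.yaml
[root@master01 kafka]# kubectl -n kafka get pods -w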

Verify the installation

[root@master01 kafka]# kubectl get pod -n kafka
NAME                                          READY   STATUS    RESTARTS   AGE
my-cluster-entity-operator-7c68d4b9d9-tg56j   3/3     Running   0          2m15s
my-cluster-kafka-0                            1/1     Running   0          2m54s
my-cluster-kafka-1                            1/1     Running   0          2m54s
my-cluster-kafka-2                            1/1     Running   0          2m54s
my-cluster-zookeeper-0                        1/1     Running   0          3m19s
my-cluster-zookeeper-1                        1/1     Running   0          3m19s
my-cluster-zookeeper-2                        1/1     Running   0          3m19s
strimzi-cluster-operator-56fdbb99cb-gznkw     1/1     Running   0          97m

Kafka UI deployment

docker run -d \
    -p 9096:8080 \
    -v /data/kafka-client:/usr/local/kafka-map/data \
    -e DEFAULT_USERNAME=admin \
    -e DEFAULT_PASSWORD=admin \
    --name kafka-map \
    --restart always dushixiang/kafka-map:latest
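Once the container is up, the UI is reachable on host port 9096 (mapped to the container's port 8080); log in with the DEFAULT_USERNAME/DEFAULT_PASSWORD values set above and register the cluster with its broker address (10.0.0.15:9092 in this setup) to browse topics and messages.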

Data formatting (screenshot: messages rendered in the kafka-map UI omitted)

Filebeat deployment

Filebeat is deployed as a DaemonSet. There is nothing special about it; deploy it directly following the official documentation.

filebeat-rbac.yaml

apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: filebeat
subjects:
- kind: ServiceAccount
  name: filebeat
  namespace: kube-system
roleRef:
  kind: ClusterRole
  name: filebeat
  apiGroup: rbac.authorization.k8s.io
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: filebeat
  labels:
    k8s-app: filebeat
rules:
- apiGroups: [""] # "" indicates the core API group
  resources:
  - namespaces
  - pods
  - nodes
  verbs:
  - get
  - watch
  - list
- apiGroups: ["apps"]
  resources:
    - replicasets
  verbs: ["get", "list", "watch"]
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: filebeat
  namespace: kube-system
  labels:
    k8s-app: filebeat

filebeat-cm.yaml

apiVersion: v1
kind: ConfigMap
metadata:
  name: filebeat-config
  namespace: kube-system
data:
  filebeat.yml: |-
    filebeat.inputs:
    - type: container
      enabled: true
      paths:
        - /var/log/containers/*_pi6000_*log
      fields:
        log_topic: pi6000
        env: dev
      multiline.pattern: '(^\[[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2}\.[0-9]{3}\])|(^[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2}\.[0-9]{3})|(^[0-9]{2}:[0-9]{2}:[0-9]{2})'
      multiline.negate: true
      multiline.match: after
      multiline.max_lines: 100

    - type: container
      enabled: true
      paths:
        - /var/log/containers/*_default_*log
      fields:
        log_topic: default
        env: dev
      multiline.pattern: '(^\[[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2}\.[0-9]{3}\])|(^[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2}\.[0-9]{3})|(^[0-9]{2}:[0-9]{2}:[0-9]{2})'
      multiline.negate: true
      multiline.match: after
      multiline.max_lines: 100

    filebeat.config.modules:
      path: ${path.config}/modules.d/*.yml
      reload.enabled: false

    processors:
    # add Kubernetes metadata to each event
    - add_kubernetes_metadata:
        host: ${NODE_NAME}
        matchers:
        - logs_path:
            logs_path: "/var/log/containers/"
            
            
    # remove redundant fields
    - drop_fields:
        fields:
          - host
          - ecs
          - log
          - agent
          - input
          - stream
          - container
          - kubernetes.pod.uid
          - kubernetes.namespace_uid
          - kubernetes.namespace_labels
          - kubernetes.node.uid
          - kubernetes.node.labels
          - kubernetes.replicaset
          - kubernetes.labels
          - kubernetes.node.name
        ignore_missing: true

    - script:
        lang: javascript
        id: format_time
        tag: enable
        source: |
          function process(event) {
            var str = event.Get("message");
            // extract the bracketed timestamp
            var regex = /^\[(.*?)\]/;
            var match = str.match(regex);
            if (match && match.length > 1) {
              var time = match[1]; // the extracted timestamp, without brackets
              event.Put("time", time);
            }
            // extract a timestamp that has no brackets
            var regex2 = /^\d{2}:\d{2}:\d{2}/;
            var match2 = str.match(regex2);
            if (match2) {
              time = match2[0]; // Extracted timestamp
              event.Put("time", time);
            }
          }
    # simplify the field hierarchy
    - script:
        lang: javascript
        id: format_k8s
        tag: enable
        source: |
          function process(event) {
            var k8s = event.Get("kubernetes");
            var newK8s = {
              podName: k8s.pod.name,
              nameSpace: k8s.namespace,
              imageAddr: k8s.container.name,
              hostName: k8s.node.hostname
            };
            event.Put("k8s", newK8s);
          }
    # set the event timestamp; this could also be done in Logstash
    - timestamp:
        field: time
        timezone: Asia/Shanghai
        layouts:
          - '2006-01-02 15:04:05'
          - '2006-01-02 15:04:05.999'
        test:
          - '2019-06-22 16:33:51'

    output.kafka:
      hosts: ["10.0.0.15:9092"]
      topic: '%{[fields.log_topic]}'
      partition.round_robin:
        reachable_only: true
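To collect another namespace, copy one of the container inputs above into the ConfigMap and change the path glob and log_topic (add the same multiline settings if the logs need stack-trace joining). A sketch with a hypothetical namespace name my-namespace:

    - type: container
      enabled: true
      paths:
        - /var/log/containers/*_my-namespace_*log    # hypothetical namespace name
      fields:
        log_topic: my-namespace    # becomes the Kafka topic via '%{[fields.log_topic]}'
        env: dev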

filebeat-daemonset.yaml

apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: filebeat
  namespace: kube-system
  labels:
    k8s-app: filebeat
spec:
  selector:
    matchLabels:
      k8s-app: filebeat
  template:
    metadata:
      labels:
        k8s-app: filebeat
    spec:
      serviceAccountName: filebeat
      terminationGracePeriodSeconds: 30
      hostNetwork: true
      dnsPolicy: ClusterFirstWithHostNet
      containers:
      - name: filebeat
        image: registry.us-east-1.aliyuncs.com/oll/filebeat:7.12.0
        args: [
          "-c", "/etc/filebeat.yml",
          "-e",
        ]
        env:
        - name: NODE_NAME
          valueFrom:
            fieldRef:
              fieldPath: spec.nodeName
        securityContext:
          runAsUser: 0
          # If using Red Hat OpenShift uncomment this:
          #privileged: true
        resources:
          limits:
            memory: 200Mi
          requests:
            cpu: 100m
            memory: 100Mi
        volumeMounts:
        - name: config
          mountPath: /etc/filebeat.yml
          readOnly: true
          subPath: filebeat.yml
        - name: data
          mountPath: /usr/share/filebeat/data
        - name: varlibdockercontainers
          mountPath: /var/lib/docker/containers
          readOnly: true
        - name: varlog
          mountPath: /var/log
          readOnly: true
      volumes:
      - name: config
        configMap:
          defaultMode: 0640
          name: filebeat-config
      - name: varlibdockercontainers
        hostPath:
          path: /var/lib/docker/containers
      - name: varlog
        hostPath:
          path: /var/log
      # data folder stores a registry of read status for all files, so we don't send everything again on a Filebeat pod restart
      - name: data
        hostPath:
          # When filebeat runs as non-root user, this directory needs to be writable by group (g+w).
          path: /var/lib/filebeat-data
          type: DirectoryOrCreate

Deploy

[root@master01 ds-filebeat-7.12]# kubectl  apply  -f .
[root@master01 ds-filebeat-7.12]# kubectl  -n kube-system  get  pods | grep filebeat
filebeat-5pvvq                             1/1     Running   0                74m
filebeat-74rbc                             1/1     Running   0                74m
filebeat-md8k4                             1/1     Running   0                74m
filebeat-ssg6g                             1/1     Running   0                74m
filebeat-stlxt                             1/1     Running   0                74m
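To confirm that an agent is actually shipping, check the logs of one of the pods listed above and look for harvester start and Kafka connection messages:

[root@master01 ds-filebeat-7.12]# kubectl -n kube-system logs filebeat-5pvvq --tail=20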

Verify the data in Kafka

Checking the Kafka topics shows that a topic named pi6000 has been created successfully.

With Filebeat the log structure and fields have been simplified, which makes the data easier to analyze (screenshot omitted).
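The topic list can also be queried from inside one of the broker pods; a quick sketch, assuming the standard Strimzi image layout under /opt/kafka:

kubectl -n kafka exec -it my-cluster-kafka-0 -- /opt/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --list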

Logstash deployment

For deploying the Elasticsearch cluster, see: https://www.cnblogs.com/Unstoppable9527/p/18329622

Installation

[root@node03 ~]# wget https://artifacts.elastic.co/downloads/logstash/logstash-8.8.2-x86_64.rpm
[root@node03 ~]# rpm -ivh logstash-8.8.2-x86_64.rpm
[root@node03 ~]# systemctl enable logstash
Created symlink /etc/systemd/system/multi-user.target.wants/logstash.service → /usr/lib/systemd/system/logstash.service.

Add the environment variable

[root@node03 ~]# vim  /etc/profile
export PATH=$PATH:/usr/share/logstash/bin
[root@node03 ~]# source /etc/profile
[root@node03 ~]# logstash -V
Using bundled JDK: /usr/share/logstash/jdk
logstash 8.8.2

Configure the pipeline file

[root@node03 ~]# cat > /etc/logstash/conf.d/log-to-es.conf << EOF
input {
  kafka {
    bootstrap_servers => "10.0.0.15:9092"
    auto_offset_reset => "latest" # start consuming from the latest offset
    decorate_events => true # adds the current topic, offset, group, partition, etc. to the event
    topics => ["pi6000"]
    group_id => "test"
    # consume with 3 threads to process the 3 partitions in parallel
    consumer_threads => 3
    codec => "json"
  }

}

filter {
  
    # match several log formats
    grok {
        match => { "message" => [
            "%{TIMESTAMP_ISO8601:log_time} +%{LOGLEVEL:log_level} +%{JAVACLASS:log_class} +%{INT:log_line} - %{GREEDYDATA:log_message}",
            "%{TIMESTAMP_ISO8601:log_time} +%{LOGLEVEL:log_level}\s*-%{GREEDYDATA:log_message}",
            "%{TIMESTAMP_ISO8601:log_time} +%{LOGLEVEL:log_level} +%{JAVACLASS:log_class} - %{GREEDYDATA:log_message}",
            "\[%{TIMESTAMP_ISO8601:log_time}\] +%{LOGLEVEL:log_level} +%{JAVACLASS:log_class} +%{INT:log_line} - %{GREEDYDATA:log_message}",
            "%{TIME:log_time} \[%{DATA:thread}\] \[%{DATA:empty_field}\] %{LOGLEVEL:log_level}\s*%{JAVACLASS:log_class} - %{GREEDYDATA:log_message}",
            "%{TIME:log_time} %{LOGLEVEL:log_level}\s* - %{GREEDYDATA:log_message}"
        ]}
    }
   # drop events that fail all grok patterns; not recommended in production
   if "_grokparsefailure" in [tags] {
       drop {}
   }

    # remove unneeded fields
    mutate {
      #remove_field => [ "message","ecs","agent","log","host","input" ]
      rename => { "[kubernetes][container][image]" => "[kubernetes][container][image_name]" }
      remove_field => [ "ecs","agent","log","host","input" ]

   }

    date {
        match => ["log_time", "yyyy-MM-dd HH:mm:ss.SSS"]
        target => "@timestamp"
        timezone => "Asia/Shanghai" # adjust to your timezone
    }
}

output {
  stdout { codec => rubydebug }
  elasticsearch {
    hosts => ["https://10.0.0.14:9200", "https://10.0.0.15:9200", "https://10.0.0.16:9200"]
    index => "pi6000-ds-%{+yyyy.MM.dd}"
    template_overwrite => true
    user => "elastic"
    password => "123456"
    ssl => true
    ssl_certificate_verification => false
  }
}
EOF
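Before starting the service, the pipeline syntax can be checked; a sketch, using --path.settings to point at the rpm install's settings directory:

[root@node03 ~]# logstash -f /etc/logstash/conf.d/log-to-es.conf --path.settings /etc/logstash --config.test_and_exit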

Start Logstash

[root@node03 ~]# systemctl start logstash
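To verify that events are flowing, watch the service logs and check that the daily index shows up in Elasticsearch (hosts and credentials are the ones configured in the pipeline above):

[root@node03 ~]# journalctl -u logstash -f
[root@node03 ~]# curl -k -u elastic:123456 "https://10.0.0.14:9200/_cat/indices/pi6000-ds-*?v"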

Analyze the data in Kibana

For Kibana deployment, see https://www.cnblogs.com/Unstoppable9527/p/18329632

  • Services can be distinguished by their Deployment (screenshot omitted).

Summary

In my view, doing some preprocessing with Filebeat at the first layer of log collection shortens the overall processing time, because the bottleneck usually sits at Elasticsearch and Logstash. So push as much of the expensive work as possible into the Filebeat stage, and only fall back to Logstash for what Filebeat cannot handle. Another important point that is easy to overlook is trimming the log content, which significantly reduces log volume: in my tests, the same number of log lines came to 20 GB unoptimized versus less than 10 GB after optimization, which makes a very noticeable difference to the performance of the whole Elasticsearch cluster. In addition, the Filebeat configuration files can be kept under version control, which makes record keeping and change management much easier during maintenance.

From: https://www.cnblogs.com/Unstoppable9527/p/18334767
