
Setting Up a Single-Node ELK Stack to Receive Kafka Logs


1. Install the single-node ELK environment

For a single-node ELK installation, refer to: https://www.cnblogs.com/zuouncle/p/17332191.html
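Before moving on, it is worth confirming that Elasticsearch is actually responding. A minimal check, assuming Elasticsearch listens on its default port 9200 on this host:

curl http://localhost:9200

If the install succeeded, this returns a JSON banner with the cluster name and version.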

2. Set up Kafka

Download Kafka 2.6.2 from: https://archive.apache.org/dist/kafka/2.6.2/
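If the server has direct internet access, the archive can be fetched in place with wget; the filename below is the Scala 2.12 build used throughout this guide:

wget https://archive.apache.org/dist/kafka/2.6.2/kafka_2.12-2.6.2.tgz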

Extract the archive (these steps assume you are working under /usr/local, which the commands later in this guide rely on):

tar -zxvf kafka_2.12-2.6.2.tgz

mv kafka_2.12-2.6.2 kafka

Create directories for the Kafka and ZooKeeper data and logs:

mkdir -p /data/kafka/logs/

mkdir -p /data/zookeeper/data/

mkdir -p /data/zookeeper/logs

Configure ZooKeeper (edit config/zookeeper.properties):

# the directory where the snapshot is stored.
#dataDir=/tmp/zookeeper
dataDir=/data/zookeeper/data
dataLogDir=/data/zookeeper/logs
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0
# Disable the adminserver by default to avoid port conflicts.
# Set the port to something non-conflicting if choosing to enable this
admin.enableServer=false
# admin.serverPort=8080

Configure Kafka (edit config/server.properties):


# see kafka.server.KafkaConfig for additional details and defaults

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0
# host.name is deprecated in Kafka 2.x; listeners/advertised.listeners below take precedence
host.name=<internal IP>

############################# Socket Server Settings #############################

# The address the socket server listens on. It will get the value returned from 
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://:9092

# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured.  Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092
advertised.listeners=PLAINTEXT://<public IP>:9092

# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3

# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600


############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/data/kafka/logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=6

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings  #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=10.10.0.208:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000


############################# Group Coordinator Settings #############################

# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
# The default value for this is 3 seconds.
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=0
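
That completes server.properties. For reference, with the internal address 10.10.0.208 already used in zookeeper.connect above, the two placeholder lines would end up looking like this (203.0.113.10 is a made-up public address from the TEST-NET documentation range; substitute your own):

host.name=10.10.0.208
advertised.listeners=PLAINTEXT://203.0.113.10:9092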

3. Start Kafka and run a smoke test

Open the firewall ports:

firewall-cmd --add-port=9092/tcp --permanent
firewall-cmd --add-port=2181/tcp --permanent
firewall-cmd --reload
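
To confirm both ports are now open:

firewall-cmd --list-ports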

cd /usr/local/kafka/bin

Start ZooKeeper:

./zookeeper-server-start.sh ../config/zookeeper.properties &
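
Before starting the broker, check that ZooKeeper is really listening. Kafka ships a ZooKeeper shell that can run a single command; listing the root znode is a quick health check:

./zookeeper-shell.sh localhost:2181 ls /

A healthy standalone ZooKeeper prints the children of / (at least [zookeeper]).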

Start Kafka:

./kafka-server-start.sh ../config/server.properties &
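
To verify the broker is up and its listener on 9092 is reachable, ask it which API versions it supports; a list of APIs (rather than a connection error) means it is ready:

./kafka-broker-api-versions.sh --bootstrap-server localhost:9092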

Create a topic:

./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

List the topics that exist:
./kafka-topics.sh --list --zookeeper localhost:2181

Send messages:
./kafka-console-producer.sh --broker-list localhost:9092 --topic test

Receive messages:
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
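
The two console tools above are interactive: lines typed into the producer should appear in the consumer. For a non-interactive smoke test, pipe one message in and read exactly one back:

echo "hello kafka" | ./kafka-console-producer.sh --broker-list localhost:9092 --topic test
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning --max-messages 1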

4. Configure ELK to receive Kafka logs

cd /etc/logstash/conf.d

vim syslog.conf

input{
  kafka{
    bootstrap_servers => "<Kafka public IP>:9092"
    client_id => "logstash"
    auto_offset_reset => "latest"
    consumer_threads => 5
    topics => ["test"]  # to subscribe to multiple topics, separate them with commas
    type => "demo"
  }
}

# Output to Elasticsearch
output{
    stdout { }
    elasticsearch{
        hosts => ["<ELK internal IP>:9200"]   # Elasticsearch address
        index => "shopmalllogs-%{+YYYY.MM}"   # index to write to
      }
}
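
Before relying on the pipeline, syntax-check the file and then confirm the index appears in Elasticsearch. A sketch, assuming a standard package install (Logstash binary under /usr/share/logstash/bin, services managed by systemd):

# validate the pipeline definition without starting Logstash
/usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/syslog.conf --config.test_and_exit

# apply it, produce a few test lines into the "test" topic, then look for the index
systemctl restart logstash
curl 'http://localhost:9200/_cat/indices?v' | grep shopmalllogs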


From: https://www.cnblogs.com/zuouncle/p/17874381.html