首页 > 系统相关 >部署Kafka+ZK及其日志采集实战(系统版本:linux_CentOs_7.8)

部署Kafka+ZK及其日志采集实战(系统版本:linux_CentOs_7.8)

时间:2023-08-19 14:33:16浏览次数:45  
标签:CentOs ZK -- Kafka topic link 日志 kafka logback

  • 部署ZK

    docker run -d --name zookeeper -p 2181:2181 -t wurstmeister/zookeeper
    
  • 部署Kafka

    -p 9092:9092 \
    -e KAFKA_BROKER_ID=0 \
    --env KAFKA_HEAP_OPTS=-Xmx256M \
    --env KAFKA_HEAP_OPTS=-Xms128M \
    -e KAFKA_ZOOKEEPER_CONNECT=[内网ip]:2181 \
    -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://[外网ip]:9092 \
    -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 wurstmeister/kafka:2.13-2.7.0
    
  • 采用Slf4j采集日志(lombok)

  • 需求

    • 控制台输出访问日志,方便测试
    • 业务数据实际输出到kafka
    • 常用的框架 log4j、logback、self4j等
  • log4j、logback、self4j 之间有啥关系

    • SLF4J(Simple logging Facade for Java) 门面设计模式 |外观设计模式

      • 把不同的日志系统的实现进行了具体的抽象化,提供统一的日志使用接口

      • 具体的日志系统就有log4j,logback等;

      • logback也是log4j的作者完成的,有更好的特性,可以取代log4j的一个日志框架, 是slf4j的原生实现

      • log4j、logback可以单独的使用,也可以绑定slf4j一起使用

    • 编码规范建议不直接用log4j、logback的API,应该用self4j, 日后更换框架所带来的成本就很低

  • 依赖引入

    <!-- 代码自动生成依赖 end-->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>
    
  • logback.xml配置

    <?xml version="1.0" encoding="UTF-8"?>
    <configuration>
        <property name="LOG_HOME" value="./data/logs/link" />
    
        <!--采用打印到控制台,记录日志的方式-->
        <appender name="console" class="ch.qos.logback.core.ConsoleAppender">
            <encoder>
                <pattern>%date %level [%thread] %logger{36} [%file : %line] %msg%n</pattern>
            </encoder>
        </appender>
    
        <!-- 采用保存到日志文件 记录日志的方式-->
        <appender name="rollingFile" class="ch.qos.logback.core.rolling.RollingFileAppender">
            <file>${LOG_HOME}/link.log</file>
            <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
                <fileNamePattern>${LOG_HOME}/link-%d{yyyy-MM-dd}.log</fileNamePattern>
            </rollingPolicy>
            <encoder>
                <pattern>%date %level [%thread] %logger{36} [%file : %line] %msg%n</pattern>
            </encoder>
        </appender>
    
    
        <!-- 指定某个类单独打印日志 -->
        <logger name="net.xdclass.service.impl.LogServiceImpl"
                level="INFO" additivity="false">
            <appender-ref ref="rollingFile" />
            <appender-ref ref="console" />
        </logger>
    
        <root level="info" additivity="false">
            <appender-ref ref="console" />
        </root>
    
    </configuration>
    
  • LogServiceImpl

    @Service
    @Slf4j
    public class LogServiceImpl implements LogService {
    	
    	// Kafka:topic
        private static final String TOPIC_NAME = "ods_link_visit_topic";
    
        @Autowired
        private KafkaTemplate kafkaTemplate;
    
        /**
         * 记录日志
         *
         * @param request
         * @param shortLinkCode
         * @param accountNo
         * @return
         */
        @Override
        public void recodeShortLinkLog(HttpServletRequest request, String shortLinkCode, Long accountNo) {
            // ip、 浏览器信息
            String ip = CommonUtil.getIpAddr(request);
            // 全部请求头
            Map<String, String> headerMap = CommonUtil.getAllRequestHeader(request);
    
            Map<String,String> availableMap = new HashMap<>();
            availableMap.put("user-agent",headerMap.get("user-agent"));
            availableMap.put("referer",headerMap.get("referer"));
            availableMap.put("accountNo",accountNo.toString());
    
            LogRecord logRecord = LogRecord.builder()
                    //日志类型
                    .event(LogTypeEnum.SHORT_LINK_TYPE.name())
                    //日志内容
                    .data(availableMap)
                    //客户端ip
                    .ip(ip)
                    // 时间
                    .ts(CommonUtil.getCurrentTimestamp())
                    //业务唯一标识(短链码)
                    .bizId(shortLinkCode).build();
    
            String jsonLog = JsonUtil.obj2Json(logRecord);
    
            //打印日志 in 控制台
            log.info(jsonLog);
    
            // 发送kafka
            kafkaTemplate.send(TOPIC_NAME,jsonLog);
    
        }
    }
    
  • kafka命令

    ```
    创建topic
    ./kafka-topics.sh --create --zookeeper 172.17.0.1:2181 --replication-factor 1 --partitions 1 --topic ods_link_visit_topic
    
    查看topic
    ./kafka-topics.sh --list --zookeeper 172.17.0.1:2181
    
    删除topic
    ./kafka-topics.sh --zookeeper 172.17.0.1:2181 --delete --topic ods_link_visit_topic
    
    消费者消费消息
    ./kafka-console-consumer.sh --bootstrap-server 192.168.75.146:9092 --from-beginning --topic ods_link_visit_topic
    
    生产者发送消息
    ./kafka-console-producer.sh --broker-list 192.168.75.146:9092  --topic ods_link_visit_topic
    ```
    
    
  • 测试

    @Controller
    @Slf4j
    public class LinkApiController {
        @Autowired
        private ShortLinkService shortLinkService;
    
        @Autowired
        private LogService logService;
    
    
        /**
         *
         * @param shortLinkCode
         * @param request
         * @param response
         */
        @GetMapping(path = "/test")
        public void dispatch(HttpServletRequest request, HttpServletResponse response) {
    
            log.info("短链:{}", shortLinkCode);
            logService.recodeShortLinkLog(request, shortLinkCode, shortLinkVO.getAccountNo());
            
        }
    
    

标签:CentOs,ZK,--,Kafka,topic,link,日志,kafka,logback
From: https://www.cnblogs.com/xietingwei/p/17642448.html

相关文章

  • Debezium+KafkaConnect+Confluent实现企业级实时数据复制平台
    【I】集群规划5台节点IP地址  10.101.1.45 ZK、Kafka、DebeziumConnector、JDK、DebeziumUI、MySQL、Kafka-Eagle10.101.1.46 ZK、Kafka、DebeziumConnector、JDK10.101.1.47 ZK、Kafka、DebeziumConnector、JDK10.101.1.48 ZK、Kafka、DebeziumConnector、JDK10.......
  • centos7使用yum网络安装
    创建一个空文件data首先进入/etc/yum.repos.d将原来文件转移走,如下cd/etc/yum.repos.dmv ./* /date在编辑一个repo结尾的文件vimcentos.repo然后使用windows浏览器输入阿里https://mirrors.aliyun.com/repo/下载centos-7.repo下载好了用记事本打开,复制粘贴到centos.repo文件......
  • Centos7 install ZSH终端
    Centos的终端用起来太单一了。想着换成zsh终端,并配合ohmyzsh的主题。从而打造不一样的终端吧。安装ZSH我们可以用yum命令或者源码编译安装。(yum)安装的话可能zsh的版本较低。而很多主题都要用到更高版本的zsh,所以这里我使用的是源码安装。首先我们到zsh的官网下载最新版的zsh(http:......
  • kafka——命令备份——docker 启动
    dockerrun-d\--namezookeeper\-p2181:2181\-eZOOKEEPER_CLIENT_PORT=2181\confluentinc/cp-zookeeperdockerrun-d\--namekafka\-p9092:9092\-eKAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.1.100:9092\......
  • 分布式存储系统举例剖析(elasticsearch,kafka,redis-cluster)
    1.概述对于分布式系统,人们首先对现实中的分布式系统进行高层抽象,然后做出各种假设,发展了诸如CAP,FLP等理论,提出了很多一致性模型,Paxos是其中最璀璨的明珠。我们对分布式系统的时序,复制模式,一致性等基础理论特别关注。在共识算法的基础上衍生了选举算法,并且为分布式事务提供......
  • CentOS7源码安装JDK8☘️
    1.下载jdk  Java版本支持路线图 Java9、Java10、Java12、Java13、Java14、Java15和Java16均为短期版本,建议不要使用以上版本。官网下载如有旧版本请先卸载openjdk:[root@localhost~]#yumerase`rpm-qa|grepopenjdk`-y2.解压安装:[root@localhost~]#tarxvz......
  • centos7 虚拟机安装windchill11
    准备vmware17pro部署的带GUI的cetnos76g+100g环境配置网络环境虚拟机NAT模式连接配置静态ipvmware-编辑-虚拟网络编辑器-NAT设置#查看虚拟机网关和子网掩码192.168.179.2255.255.255.0虚拟机配置静态ipsurootvi/etc/sysconfig/network-s......
  • CentOS7配置VSFTP
    1.安装vsftpd#安装vsftpdyuminstall-yvsftpd#设置开机启动systemctlenablevsftpd.service#重启systemctlrestartvsftpd#查看vsftpd服务的状态systemctlstatusvsftpd.service2.配置vsftpd.conf#备份配置文件cp/etc/vsftpd/vsftpd.conf/etc/vsftpd/vsftpd......
  • centos7 centos-home 磁盘空间转移至centos-root下(磁盘空间不足,磁盘不足)
    在安装centos系统的时候,如果在安装时没有分配磁盘空间,选择的是默认分配的,根分区默认为50G大小,在安装完成后,可以发现大容量磁盘往往分配在了home下面。如果要把home下面的磁盘空间分配到root磁盘下面。可以进行如下操作1.查看CentOS的系统版本2.查看分区df-h(centos-home和c......
  • SpringBoot3集成Kafka
    目录一、简介二、环境搭建1、Kafka部署2、Kafka测试3、可视化工具三、工程搭建1、工程结构2、依赖管理3、配置文件四、基础用法1、消息生产2、消息消费五、参考源码标签:Kafka3.Kafka-eagle3;一、简介Kafka是一个开源的分布式事件流平台,常被用于高性能数据管道、流分析、数据集......