首页 > 系统相关 >使用Nginx做页面采集, Kafka收集到对应Topic_6XwWe5qWHGM2PojVPUSejM

使用Nginx做页面采集, Kafka收集到对应Topic_6XwWe5qWHGM2PojVPUSejM

时间:2023-09-03 18:35:49浏览次数:46  
标签:6XwWe5qWHGM2PojVPUSejM log Nginx -- kafka Topic nginx Kafka

使用Nginx做页面采集, Kafka收集到对应Topic_6XwWe5qWHGM2PojVPUSejM

使用Nginx做页面采集, Kafka收集到对应Topic

0.架构简介

模拟线上的实时流,比如用户的操作日志,采集到数据后,进行处理,暂时只考虑数据的采集,使用Html+Jquery+Nginx+Ngx_kafka_module+Kafka来实现,其中Ngxkafkamodule 是开源的专门用来对接Nginx和Kafka的一个组件。

1.需求描述

1.1 用htmljquery 模拟用户请求日志

其中包括下面下面几项:

用户id:user_id, 访问时间:act_time, 操作: (action,包括click,job_collect,cv_send,cv_upload)

企业编码job_code

1.2 用Nginx接受1.1中的请求

1.3 接受完请求后,使用ngx_kafka_module将数据发送到Kafka的主题tp_individual 中。

1.4 在kafka中使用一个消费者消费该主题,观察

2.搭建步骤

2.1 Kafka安装

由于使用现成的已安装好的docker-kafka镜像,所以直接启动即可.

2.2 安装Nginx,并启动

$ cd /usr/local/src
$ git clone git@github.com:edenhill/librdkafka.git
# 进入到librdkafka,然后进行编译
$  cd librdkafka
$  yum install -y gcc gcc-c++ pcre-devel zlib-devel
$  ./configure
$  make && make install

$ yum -y install make zlib-devel gcc-c++ libtool openssl openssl-devel
$ cd /opt/hoult/software
# 1.下载
$ wget http://nginx.org/download/nginx-1.18.0.tar.gz
# 2.解压
$ tar -zxf nginx-1.18.0.tar.gz -C /opt/hoult/servers
# 3. 下载模块源码
$ cd /opt/hoult/software
$ git clone git@github.com:brg-liuwei/ngx_kafka_module.git
# 4. 编译
$ cd /opt/hoult/servers/nginx-1.18.0
$ ./configure --add-module=/opt/hoult/software/ngx_kafka_module/
$ make && make install 
# 5.删除Nginx安装包
$ rm /opt/hoult/software/nginx-1.18.0.tar.gz
# 6.启动nginx
$ cd /user/local/nginx
$ sbin/nginx 

3.相关配置

3.1 nginx配置nginx.conf

#pid        logs/nginx.pid;


events {
    worker_connections  1024;
}


http {
    include       mime.types;
    default_type  application/octet-stream;

    #log_format  main  '$remote_addr - $remote_user [$time_local] "$request" '
    #                  '$status $body_bytes_sent "$http_referer" '
    #                  '"$http_user_agent" "$http_x_forwarded_for"';

    #access_log  logs/access.log  main;

    sendfile        on;
    #tcp_nopush     on;

    #keepalive_timeout  0;
    keepalive_timeout  65;

    #gzip  on;

    kafka;
    kafka_broker_list linux121:9092;

    server {
        listen       9090;
        server_name  localhost;

        #charset koi8-r;

        #access_log  logs/host.access.log  main;

        #------------kafka相关配置开始------------
        location = /kafka/log {
                #跨域相关配置
                add_header 'Access-Control-Allow-Origin' $http_origin;
                add_header 'Access-Control-Allow-Credentials' 'true';
                add_header 'Access-Control-Allow-Methods' 'GET, POST, OPTIONS';

                kafka_topic tp_individual;
        }

        #error_page  404              /404.html;
    }

}

3.2 启动kafka 生产者和消费者

# 创建topic
kafka-topics.sh --zookeeper linux121:2181/myKafka --create --topic tp_individual --partitions 1 --replication-factor 1
# 创建消费者
kafka-console-consumer.sh --bootstrap-server linux121:9092 --topic tp_individual --from-beginning
# 创建生产者测试
kafka-console-producer.sh --broker-list linux121:9092 --topic tp_individual 

3.3 编写Html + Jquery代码

<!DOCTYPE html>
    <head>
        <meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
        <meta name="viewport" content="width=device-width, initial-scale=1,shrink-to-fit=no">
        <title>index</title>
        <!-- jquery cdn, 可换其他 -->
        <script src="https://cdn.bootcdn.net/ajax/libs/jquery/3.5.1/jquery.js"></script>
    </head>
    <body>
        <input id="click" type="button" value="点击" onclick="operate('click')" />
        <input id="collect" type="button" value="收藏" onclick="operate('job_collect')" />
        <input id="send" type="button" value="投简历" onclick="operate('cv_send')" />
        <input id="upload" type="button" value="上传简历" onclick="operate('cv_upload')" />
    </body>

    <script>

        function operate(action) {

            var json = {'user_id': 'u_donald', 'act_time': current().toString(), 'action': action, 'job_code': 'donald'};

            $.ajax({
                url:"http://linxu121:8437/kafka/log",
                type:"POST" ,
                crossDomain: true,
                data: JSON.stringify(json),
                // 下面这句话允许跨域的cookie访问
                xhrFields: {
                    withCredentials: true
                },
                success:function (data, status, xhr) {

                    // console.log("操作成功:'" + action)
                },
                error:function (err) {

                    // console.log(err.responseText);
                }
            });
        };

        function current() {
            var d   = new Date(),
                str = '';
            str += d.getFullYear() + '-';
            str += d.getMonth() + 1 + '-';
            str += d.getDate() + ' ';
            str += d.getHours() + ':';
            str += d.getMinutes() + ':';
            str += d.getSeconds();
            return str;
        }
    </script>

</html>

a.html ​放在nginx的目录下,浏览器访问192.168.18.128:9090

4.演示

4.1 首先启动zk集群,kafka集群

4.2 然后创建topic, 创建消费者,创建生产者,测试topic

4.3 启动nginx访问页面,进行点击,观察消费者状态

整个过程如下图:

标签:6XwWe5qWHGM2PojVPUSejM,log,Nginx,--,kafka,Topic,nginx,Kafka
From: https://www.cnblogs.com/hulichao/p/use-nginx-as-a-page-collection-kafka-collects-correspon

相关文章

  • kafka在工作中的使用
    @KafkaListener(topics={KafkaInitialConfig.TOPIC_RECHARGE_YTK},groupId=KafkaInitialConfig.GROUP_ID_STORE_BFF,containerFactory="kafkaListenerContainerFactory",autoStartup="${kafka.listener.autoStartup}")publicvoidre......
  • Kafka - 生产者 - 压缩算法
    总结1.Producer端压缩、Broker端保持、Consumer端解压缩。2.开启压缩的最佳实践:Producer端完成的压缩,那么启用压缩的一个条件就是Producer程序运行机器上的CPU资源要很充足。如果你的环境中带宽资源有限,那么我也建议你开启压缩。如果你的机器CPU资源有很多富余,强烈......
  • Kafka-基础
    1.简介Kafka(ApacheKafka)是一种分布式流数据平台,最初由LinkedIn开发,并于后来捐赠给Apache软件基金会,成为了一个Apache顶级项目。它被设计用于处理大规模、实时的数据流,并为构建高吞吐量、容错性强的数据流应用程序提供支持。Kafka的特点使得它在日志收集、实时处理、事件驱动架......
  • SpringAMQP--TopicExchange
             ......
  • kafka安装以及参数
    kafka安装安装JDKyuminstall-yjava-1.8.0-openjdk.x86_64查看版本java-versionkafka是分布式的,需要多台机器,并且保证机器之间是免密登录同时需要用zookeeper集群负责管理。1、kafka版本选择,从官网下载即可,我这使用的是kafka_2.12-2.70.tgz2、brokers节点分配,注......
  • Kafka - 线上集群部署方案怎么做?
    既然是集群,那必然就要有多个Kafka节点机器,因为只有单台机器构成的Kafka伪集群只能用于日常测试之用,根本无法满足实际的线上生产需求。而真正的线上环境需要仔细地考量各种因素,结合自身的业务需求而制定。下面我就分别从操作系统、磁盘、磁盘容量和带宽等方面来讨论一下。 ......
  • 生产环境 kafka 平滑迁移之旅
    背景线上kafka集群,3台机器,3个broker;其中某台机器因为硬件故障,需要停机维修;停机意味这跑在机器上的服务会停止。所以本次做kafka迁移的目标是机器可以停止但依赖kafka的上游和下游业务可不能停止,因为所属行业的特殊性,服务的停止,对业务的影响和伤害还蛮大的。分析我们知道kafka是有......
  • Kafka - 不仅是消息引擎,还是分布式流处理平台
     如果你通读全篇文字但只能记住一句话,我希望你记住的就是这句ApacheKafka是消息引擎系统,也是一个分布式流处理平台(DistributedStreamingPlatform) 作为流处理平台,Kafka与其他主流大数据流式计算框架相比,优势在哪里呢?我能想到的有两点。第一点是更容易实现端到端的正......
  • Kafka - 为什么 Kafka 不像 MySQL 那样允许追随者副本对外提供读服务?
    几个原因:1,kafka的分区已经让读是从多个broker读从而负载均衡,不是MySQL的主从,压力都在主上;2,kafka保存的数据和数据库的性质有实质的区别就是数据具有消费的概念,是流数据,kafka是消息队列,所以消费需要位移,而数据库是实体数据不存在这个概念,如果从kafka的follower读,消费端offset控制......
  • kafka的下载和了解
    可以登录Apachekafka官方下载https://kafka.apache.org/downloads.html下载Scala2.13 -kafka_2.13-3.3.1.tgz(asc,sha512)官方推荐下载scala2.13版本的。kafka作为一个分布式流平台,有哪些关键的能力?发布和订阅消息(流),在这方面,它类似于一个消息队列。以容错(故障转......