首页 > 其他分享 >当Elasticsearch遇见Kafka

当Elasticsearch遇见Kafka

时间:2023-06-02 17:33:25浏览次数:58  
标签:插件 Logstash Elasticsearch 遇见 kafka Kafka logstash


Elasticsearch作为当前主流的全文检索引擎,除了强大的全文检索能力和高扩展性之外,对多种数据源的兼容能力也是其成功的秘诀之一。而Elasticsearch强大的数据源兼容能力,主要来源于其核心组件之一的Logstash, Logstash通过插件的形式实现了对多种数据源的输入和输出。Kafka是一种高吞吐量的分布式发布订阅消息系统,是一种常见的数据源,也是Logstash支持的众多输入输出源的其中一个。本文将从实践的角度,研究使用Logstash Kafka Input插件实现将Kafka中数据导入到Elasticsearch的过程。

当Elasticsearch遇见Kafka_kafka

使用Logstash Kafka插件连接Kafka和Elasticsearch

1 Logstash Kafka input插件简介

Logstash Kafka Input插件使用Kafka API从Kafka topic中读取数据信息,使用时需要注意Kafka的版本及对应的插件版本是否一致。该插件支持通过SSL和Kerveros SASL方式连接Kafka。另外该插件提供了group管理,并使用默认的offset管理策略来操作Kafka topic。

Logstash默认情况下会使用一个单独的group来订阅Kafka消息,每个Logstash Kafka Consumer会使用多个线程来增加吞吐量。当然也可以多个Logstash实例使用同一个group_id,来均衡负载。另外建议把Consumer的个数设置为Kafka分区的大小,以提供更好的性能。

2 测试环境准备 2.1 创建Elasticsearch集群

为了简化搭建过程,本文使用了腾讯云Elasticsearch service。腾讯云Elasticsearch service不仅可以实现Elasticsearch集群的快速搭建,还提供了内置Kibana,集群监控,专用主节点,Ik分词插件等功能,极大的简化了Elasticsearch集群的创建和管理工作。

2.2 创建Kafka服务

Kafka服务的搭建采用腾讯云CKafka来完成。与Elasticsearch Service一样,腾讯云CKafka可以实现Kafka服务的快速创建,100%兼容开源Kafka API(0.9版本)。

2.3 服务器

除了准备Elasticsearch和Kafka,另外还需要准备一台服务器,用于运行Logstash以连接Elasticsearch和Kafka。本文采用腾讯云CVM服务器

2.4 注意事项

1) 需要将Elasticsearch、Kafka和服务器创建在同一个网络下,以便实现网络互通。由于本文采用的是腾讯云相关的技术服务,因此只需要将Elasticsearch service,CKafka和CVM创建在同一个私有网路(VPC)下即可。

2) 注意获取Elasticsearch serivce,CKafka和CVM的内网地址和端口,以便后续服务使用

本次测试中:

服务 ip port

 

 

 

Elasticsearch service

192.168.0.8

9200

Ckafka

192.168.13.10

9092

CVM

192.168.0.13

-

3 使用Logstash连接Elasticsearch和Kafka 3.1 Kafka准备

可以参考[CKafka 使用入门]

按照上面的教程

1) 创建名为kafka_es_test的topic

2) 安装JDK

3) 安装Kafka工具包

4) 创建producer和consumer验证kafka功能

3.2 安装Logstash

Logstash的安装和使用可以参考[一文快速上手Logstash]

3.3 配置Logstash Kafka input插件

创建kafka_test_pipeline.conf文件内容如下:

input{        kafka{
                bootstrap_servers=>"192.168.13.10:9092"
                topics=>["kafka_es_test"]
                group_id=>"logstash_kafka_test"
        }
}
output{
        elasticsearch{
                hosts=>["192.168.0.8:9200"]
        }
}

其中定义了一个kafka的input和一个elasticsearch的output

对于Kafka input插件上述三个参数为必填参数,除此之外还有一些对插件行为进行调整的一些参数如:

auto_commit_interval_ms 用于设置Consumer提交offset给Kafka的时间间隔

consumer_threads 用于设置Consumer的线程数,默认为1,实际中应设置与Kafka Topic分区数一致

fetch_max_wait_ms 用于指定Consumer等待一个fetch请求达到fetch_min_bytes的最长时间

fetch_min_bytes 用于指定Consumer fetch请求应返回的最小数据量

topics_pattern 用于通过正则订阅符合某一规则的一组topic

更多参数参考:[Kafka Input Configuration Options]

3.4 启动Logstash

以下操作在Logstash根目录中进行

1) 验证配置


./bin/logstash -f kafka_test_pipeline.conf --config.test_and_exit


如有错误,根据提示修改配置文件。若配置正确会得到如下结果


Sending Logstash's logs to /root/logstash-5.6.13/logs which is now configured via log4j2.properties[2018-11-11T15:24:01,598][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"netflow", :directory=>"/root/logstash-5.6.13/modules/netflow/configuration"}
[2018-11-11T15:24:01,603][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"fb_apache", :directory=>"/root/logstash-5.6.13/modules/fb_apache/configuration"}
Configuration OK
[2018-11-11T15:24:01,746][INFO ][logstash.runner          ] Using config.test_and_exit mode. Config Validation Result: OK. Exiting Logstash


2) 启动Logstash


./bin/logstash -f kafka_test_pipeline.conf --config.reload.automatic


观察日志是否有错误提示,并及时处理

3.4 启动Kafka Producer

以下操作在Kafka工具包根目录下进行


./bin/kafka-console-producer.sh --broker-list 192.168.13.10:9092 --topic kafka_es_test


写入测试数据


This is a message


3.5 Kibana验证结果

登录Elasticsearch对应Kibana, 在Dev Tools中进行如下操作

1) 查看索引


GET _cat/indices


可以看到一个名为logstash-xxx.xx.xx的索引被创建成功


green open .kibana             QUw45tN0SHqeHbF9-QVU6A 1 1 1 0 5.5kb 2.7kbgreen open logstash-2018.11.11 DejRdNJVQ1e1MwbyJjJjLw 5 1 1 0 8.7kb 4.3kb


2) 查看写入的数据


GET logstash-2018.11.11/_search


可以看到数据已经被成功写入

{  "took": 0,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 1,
    "max_score": 1,
    "hits": [
      {
        "_index": "logstash-2018.11.11",
        "_type": "logs",
        "_id": "AWcBsEegMu-Dkjm1ap3H",
        "_score": 1,
        "_source": {
          "message": "This is a message",
          "@version": "1",
          "@timestamp": "2018-11-11T07:33:09.079Z"
        }
      }
    ]
  }
}

4 总结

Logstash作为Elastic Stack中数据采集和处理的核心组件,为Elasticsearch提供了强大的数据源兼容能力。从测试过程可以看出,使用Logstash实现kafka和Elaticsearch的连接过程相当简单方便。另外Logstash的数据处理功能,也使得采用该架构的系统对数据映射和处理有天然的优势。

标签:插件,Logstash,Elasticsearch,遇见,kafka,Kafka,logstash
From: https://blog.51cto.com/u_16145034/6404313

相关文章

  • Elasticsearch专题精讲—— REST APIs —— Document APIs —— Delete by query API
    RESTAPIs——DocumentAPIs——DeletebyqueryAPIhttps://www.elastic.co/guide/en/elasticsearch/reference/8.8/docs-delete-by-query.htmlDeletesdocumentsthatmatchthespecifiedquery.删除与指定查询匹配的文档。curl-XPOS......
  • Elasticsearch专题精讲—— REST APIs —— Document APIs —— Delete API
    RESTAPIs——DocumentAPIs——DeleteAPIRemovesaJSONdocumentfromthespecifiedindex.从指定的索引中移除JSON文档。1、Request(请求)https://www.elastic.co/guide/en/elasticsearch/reference/8.8/docs-delete.......
  • Elasticsearch介绍及安装
    elasticsearch的作用elasticsearch是一款非常强大的开源搜索引擎,具备非常多强大功能,可以帮助我们从海量数据中快速找到需要的内容。倒排索引倒排索引的概念是基于MySQL这样的正向索引而言的。elasticsearch便是基于倒排索引实现快速查找的功能。倒排索引中有两个非常重要的概......
  • Elasticsearch专题精讲—— REST APIs —— Document APIs —— GET API
     RESTAPIs——DocumentAPIs——GETAPIhttps://www.elastic.co/guide/en/elasticsearch/reference/8.8/docs-get.html#docs-getRetrievesthespecifiedJSONdocumentfromanindex.从索引中检索指定的JSON文档。......
  • 服务注册与发现-etcd 遇见的问题
    服务注册与发现-etcd遇见的问题问题现象grpcclient调用server,通过etcd提供服务发现能力2023/06/0211:25:33scheme:etcd;{"level":"warn","ts":"2023-06-02T11:25:33.444+0800","logger":"etcd-client","caller":&qu......
  • docker安装elasticsearch
    一、环境CentOSelasticsearch5.6.12二、安装1.镜像拉取dockerpullelasticsearch:5.6.122.启动镜像dockerrun--nameelasticsearch-d-eES_JAVA_OPTS="-Xms256m-Xmx256m"-e"discovery.type=single-node"-p9200:9200-p9300:9300elasticsearch:5.6.12注......
  • kafka动态生产者
    packagecom.sunclouder.das.data.kafka.forward;importcn.hutool.core.util.StrUtil;importcn.hutool.json.JSONObject;importcn.hutool.json.JSONUtil;importcom.sunclouder.das.data.kafka.entity.ConfigEntity;importcom.sunclouder.das.data.kafka.entity.DasConfig......
  • elasticsearch常用命令总结
    目录#查看集群状态curlhttp://*:9200/_cluster/health?pretty#查看所有索引状态curl"http://*:9200/_cat/indices?pretty"#查看异常索引状态curl"http://*:9200/_cat/indices?v&health=red"#查看异常索引分片分配状态curl"http://*:9200/_cat/shards/your_inde......
  • Kafka环境的配置
    大家对于消息队列,想必不会很陌生,特别是ActiveMQ和RabbitMQ,今天我将会为大家介绍下Kafka在centOs系统中的安装。第一步:准备好包。对于kafka,你需要zookeeper,所以你需要下载zookeeper。点击zookeeper下载下载zookeeper后放入到centos中.放入文件夹software中。接着准备kafka.点击下......
  • Linux centos7 ppc64le编译安装MySQL8遇见问题
    一.关于Nopackagedevtoolset-7-gccavailable.的解决办法1.使用centos默认yum源2.依次执行以下命令yuminstall-ycentos-release-sclyuminstall-ydevtoolset-7 二.cmake3>=3.6.1isneededbymysql-community-8.0.18-1.el7.ppc64le安装cmake3yuminstall......