首页 > 编程语言 >实例讲解:NodeJS 操作 Kafka

实例讲解:NodeJS 操作 Kafka

时间:2023-11-22 15:31:34浏览次数:44  
标签:NodeJS -- zookeeper kafka topic windows 实例 Kafka consumer


本人是C#出身的程序员,c#很简单就能实现,有需要的可以加我私聊。但是就目前流行的开发语言,尤其是面向web方向应用的,我感觉就是Nodejs最简单了。下面介绍:

本文将会介绍在windows环境下启动Kafka,并通过nodejs作为客户端,生产和消费消息。

步骤一,Kafka需要java运行时,先安装配置java环境。下载jdk安装配置。通过在命令行中输入java -version 确认java是否成功安装(可能需要查看windows的环境变量中是否有java,java版本是多少)。

步骤二Kafka官网下载最新版本的压缩包(.tgz格式),并解压。分别在两个命令行里面启动zookeeper、kafka(解压缩路径下)

bin\windows\zookeeper-server-start.bat config\zookeeper.properties

bin\windows\kafka-server-start.bat config\server.properties

说明一下zookeeper和kafka的关系:zookeeper是集群的调度者,kafka才是消息队列。 zookeeper的默认端口:2181,kafka的默认端口:9092
相关配置可以在config文件下的server.properties和zookeeper.properties中找到

实例讲解:NodeJS 操作 Kafka_kafka

实例讲解:NodeJS 操作 Kafka_java_02编辑

用记事本打开就可以编辑

建立data,logs,kafka-logs目录,用于日志,备用。

消费者客户端需要的group.id可以在config->consumer.properties中找到。

实例讲解:NodeJS 操作 Kafka_nodejs_03

实例讲解:NodeJS 操作 Kafka_nodejs_04编辑

步骤三,使用DOS的CMD管理员命令行方式测试生产者生产、消费者消费。
//创建一个topic:test
bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

//查看topic
bin\windows\kafka-topics.bat --list --zookeeper localhost:2181

//创建生产者主题mytest
bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test-nodetopic

//创建消费者消费mytest
bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test-nodetopic --from-beginning

步骤四,生产者发送消息
在生产者窗口,随意输入一条消息,可以在消费者窗口看到该消息。

实例讲解:NodeJS 操作 Kafka_zookeeper_05

实例讲解:NodeJS 操作 Kafka_C#_06编辑

最后,使用nodejs访问kafka  首先安装kafkajs

初始化项目

npm init -y

没有安装kafkajs的,在这里可以安装,互联网在线安装。
npm install kafkajs

新建demo.js,输入以下代码

const { Kafka } = require('kafkajs')

const kafka = new Kafka({

  clientId: 'my-app',

  brokers: ['localhost:9092']

})

const producer = kafka.producer()

const consumer = kafka.consumer({ groupId: 'test-consumer-group' })

const run = async () => {

  // Producing

  await producer.connect()

  await producer.send({

    topic: 'test-nodetopic',

    messages: [

      { value: ' Hello KafkaJS user,I am producer ! ' },

    ],

  })

  // Consuming

  await consumer.connect()

  await consumer.subscribe({ topic: 'test-nodetopic', fromBeginning: true })

  await consumer.run({

    eachMessage: async ({ topic, partition, message }) => {

      console.log({

        partition,

        offset: message.offset,

        value: message.value.toString(),

      })

    },

  })

}

run().catch(console.error)

最后执行命令
node demo.js



标签:NodeJS,--,zookeeper,kafka,topic,windows,实例,Kafka,consumer
From: https://blog.51cto.com/u_15377820/8517663

相关文章

  • 实例讲解C++连接各种数据库,包含SQL Server、MySQL、Oracle、ACCESS、SQLite 和 Postgr
     C++是一种通用的编程语言,可以使用不同的库和驱动程序来连接各种数据库。以下是一些示例代码,演示如何使用C++连接SQLServer、MySQL、Oracle、ACCESS、SQLite和PostgreSQL、MongoDB数据库。连接SQLServer数据库要使用C++连接SQLServer数据库,可以使用Micro......
  • 单例模式 ----实例化类的方法
    定义:确保一个类最多只有一个实例,并提供一个全局访问点单例模式可以分为两种:预加载和懒加载预加载:顾名思义,就是预先加载。再进一步解释就是还没有使用该单例对象,但是,该单例对象就已经被加载到内存了。         很明显,没有使用该单例对象,该对象就被加载到了内......
  • Redis集群的实例什么情况使用redis集群和哨兵
    当考虑Redis集群和哨兵的使用时,我们可以考虑一个在线购物系统的场景,其中需要处理用户会话数据。这个例子将涵盖横向扩展、高可用性和故障处理的方面。场景描述:假设你的在线购物系统使用Redis存储用户会话数据,以提供个性化的购物体验。用户的会话数据包括购物车、用户偏好设置等......
  • nodejs学习04——express框架
    搭建环境新建一个文件夹LearnExpress,命令行://初始化包npminit//安装expressnpmiexpress初体验//1.导入expressconstexpress=require('express');//2.创建应用对象constapp=express();//3.创建路由规则app.get('/home',(req,res)=>{ //res.end('......
  • nodejs升级引起的构建错误
      参考实际使用升级webpacknpminstall-Dwebpack@latest升级vue-clinpminstall-g@vue/cli创建vue-clidemo重新配置vue.configwebpack>5报错问题 1、运行下面这行指令,安装在Webpack中PolyfillNode.js核心模块。npminstallnode-polyfill-webpack......
  • Lightsail VPS 实例在哪些方面胜过 EC2 实例?
    文章作者:Libai引言LightsailVPS 实例和 EC2 实例是云计算领域中两种受欢迎的技术。虽然两者都提供虚拟服务器解决方案,但了解 LightsailVPS 实例在哪些方面胜过 EC2 实例非常重要。在本文中,我们将探讨这两种技术之间的关键区别,并突出使用 LightsailVPS 实例的优势。......
  • centos7安装Kafka
    参考:https://blog.csdn.net/yang1393214887/article/details/1234257151.官网下载https://kafka.apache.org/downloadshttps://dlcdn.apache.org/kafka/3.1.0/kafka_2.12-3.1.0.tgz2.上传到centos7/data/kafka/目录,解压到此目录pwd#当前文件位置mv/home/sili/kafka_2.12-......
  • strimzi operator 部署kafka集群
    环境说明本环境使用了单节点、临时存储集群的kafka-ephemeral-single配置。线上环境推荐kafka-persistent.yaml配置并修改storage配置为自动创建pv/pvc类型。配置清单说明1.kafka-ephemeral-single.yaml:非持久化存储,单节点集群;2.kafka-ephemeral.yaml:非持久化存储,多节点集群......
  • Kafka异常——The coordinator is not available
    之前架设了一个Kafka集群,跑了很久没有什么错误,最近开发的小伙伴跟我说部分kafka不能消费了,了解详细情况后,自己也赶紧作了个测试,发现是有报错...Causedby:rg.apache.kafka.common.errors.CoordinatorNotAvailableException:Thecoordinatorisnotavailable....报错在网上......
  • kafka
    kafka下载路径:(https://kafka.apache.org/downloads)一、kafka单机安装1.1上传jdk环境jdk-8u202-linux-x64.tar.gzkafka_2.12-3.5.1.tgz1.2解压安装包tarxfjdk-8u202-linux-x64.tar.gz-C/usr/local/cd/usr/local/mvjdk1.8.0_202/java1.3编写环境变量文......