首页 > 其他分享 >简单的.NET 8 Web API使用Kafka 发布订阅模式,示例api示例

简单的.NET 8 Web API使用Kafka 发布订阅模式,示例api示例

时间:2024-01-16 22:23:04浏览次数:21  
标签:Web 示例 Kafka public WeatherUpdatesProducer api var new topicName

简单的.NET 8 Web API使用Kafka 发布订阅模式,示例api示例
  • kafka

当使用Kafka时,我们需要使用Kafka的客户端库来与Kafka集群进行通信。在.NET Core中,可以使用Confluent.Kafka客户端库来实现与Kafka的集成。首先,我们需要在项目中添加Confluent.Kafka库的引用。

首先,使用NuGet包管理器或者命令行工具添加Confluent.Kafka库的引用:

dotnet add package Confluent.Kafka

  

接下来,我们可以创建一个名为WeatherUpdatesProducer的类,用于生成天气预报数据并将其发布到Kafka的主题中:


// WeatherUpdatesProducer.csusing Confluent.Kafka; using System; using System.Threading; namespaceWebApiDemo { publicclassWeatherUpdatesProducer { privatereadonly IProducer<Null, string> _producer; privatereadonlystring _topicName; public WeatherUpdatesProducer(string bootstrapServers, string topicName) { _topicName = topicName; var config = new ProducerConfig { BootstrapServers = bootstrapServers }; _producer = new ProducerBuilder<Null, string>(config).Build(); } public void ProduceWeatherUpdates() { var rng = new Random(); for (int i = 0; i < 5; i++) { var forecast = new { Date = DateTime.Now.AddDays(i), TemperatureC = rng.Next(-20, 55), Summary = Summaries[rng.Next(Summaries.Length)] }; var message = $"{forecast.Date}: {forecast.Summary}"; _producer.Produce(_topicName, new Message<Null, string> { Value = message }); Thread.Sleep(1000); // 模拟每秒发布一次消息 } } privatestaticreadonlystring[] Summaries = new[] { "Freezing", "Bracing", "Chilly", "Cool", "Mild", "Warm", "Balmy", "Hot", "Sweltering", "Scorching" }; } }

  在这个示例中,我们创建了一个名为WeatherUpdatesProducer的类,它负责生成天气预报数据并将其发布到Kafka的主题中。我们使用Confluent.Kafka库中的IProducer接口来实现与Kafka的通信。

接下来,我们可以创建一个名为WeatherUpdatesConsumer的类,用于订阅Kafka的主题并处理接收到的天气更新消息:


// WeatherUpdatesConsumer.csusing Confluent.Kafka; using System; namespaceWebApiDemo { publicclassWeatherUpdatesConsumer { privatereadonly IConsumer<Ignore, string> _consumer; public WeatherUpdatesConsumer(string bootstrapServers, string groupId, string topicName) { var config = new ConsumerConfig { BootstrapServers = bootstrapServers, GroupId = groupId, AutoOffsetReset = AutoOffsetReset.Earliest }; _consumer = new ConsumerBuilder<Ignore, string>(config).Build(); _consumer.Subscribe(topicName); } public void ConsumeWeatherUpdates() { try { while (true) { try { var message = _consumer.Consume(); Console.WriteLine($"Received weather update: {message.Value}"); } catch (ConsumeException e) { Console.WriteLine($"Error occurred: {e.Error.Reason}"); } } } catch (OperationCanceledException) { _consumer.Close(); } } } }

  

在这个示例中,我们创建了一个名为WeatherUpdatesConsumer的类,它负责订阅Kafka的主题并处理接收到的天气更新消息。我们使用Confluent.Kafka库中的IConsumer接口来实现消费者的功能。

接下来,在Startup.cs中注册WeatherUpdatesProducer和WeatherUpdatesConsumer服务,并在应用启动时分别调用ProduceWeatherUpdates和ConsumeWeatherUpdates方法:

// Startup.csusing Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;

namespaceWebApiDemo
{
    publicclassStartup
    {
        public Startup(IConfiguration configuration)
        {
            Configuration = configuration;
        }

        public IConfiguration Configuration { get; }

        public void ConfigureServices(IServiceCollection services)
        {
            services.AddControllers();

            // 注册WeatherUpdatesProducer服务
            services.AddSingleton<WeatherUpdatesProducer>(provider =>
            {
                var bootstrapServers = Configuration["Kafka:BootstrapServers"];
                var topicName = Configuration["Kafka:TopicName"];
                returnnew WeatherUpdatesProducer(bootstrapServers, topicName);
            });

            // 注册WeatherUpdatesConsumer服务
            services.AddSingleton<WeatherUpdatesConsumer>(provider =>
            {
                var bootstrapServers = Configuration["Kafka:BootstrapServers"];
                var groupId = Configuration["Kafka:GroupId"];
                var topicName = Configuration["Kafka:TopicName"];
                returnnew WeatherUpdatesConsumer(bootstrapServers, groupId, topicName);
            });
        }

        public void Configure(IApplicationBuilder app, IWebHostEnvironment env, WeatherUpdatesProducer weatherUpdatesProducer, WeatherUpdatesConsumer weatherUpdatesConsumer)
        {
            if (env.IsDevelopment())
            {
                app.UseDeveloperExceptionPage();
            }

            app.UseRouting();

            app.UseEndpoints(endpoints =>
            {
                endpoints.MapControllers();
            });

            // 在应用启动时生成天气预报数据并发布到Kafka
            weatherUpdatesProducer.ProduceWeatherUpdates();

            // 在应用启动时订阅Kafka主题并处理天气更新消息
            weatherUpdatesConsumer.ConsumeWeatherUpdates();
        }
    }
}

 在这个示例中,我们使用Configuration来获取Kafka的连接配置信息,并在应用启动时分别调用WeatherUpdatesProducer的ProduceWeatherUpdates方法来生成天气预报数据并发布到Kafka,以及调用WeatherUpdatesConsumer的ConsumeWeatherUpdates方法来订阅Kafka的主题并处理接收到的天气更新消息。

这样,我们就实现了基于Kafka的消息发布和订阅功能。当应用启动时,会生成天气预报数据并发布到Kafka的主题中,同时也会订阅Kafka的主题并处理接收到的天气更新消息。

 

// WeatherUpdatesProducer.csusing Confluent.Kafka; using System; using System.Threading; namespaceWebApiDemo { publicclassWeatherUpdatesProducer { privatereadonly IProducer<Null, string> _producer; privatereadonlystring _topicName; public WeatherUpdatesProducer(string bootstrapServers, string topicName) { _topicName = topicName; var config = new ProducerConfig { BootstrapServers = bootstrapServers }; _producer = new ProducerBuilder<Null, string>(config).Build(); } public void ProduceWeatherUpdates() { var rng = new Random(); for (int i = 0; i < 5; i++) { var forecast = new { Date = DateTime.Now.AddDays(i), TemperatureC = rng.Next(-20, 55), Summary = Summaries[rng.Next(Summaries.Length)] }; var message = $"{forecast.Date}: {forecast.Summary}"; _producer.Produce(_topicName, new Message<Null, string> { Value = message }); Thread.Sleep(1000); // 模拟每秒发布一次消息 } } privatestaticreadonlystring[] Summaries = new[] { "Freezing", "Bracing", "Chilly", "Cool", "Mild", "Warm", "Balmy", "Hot", "Sweltering", "Scorching" }; } }

标签:Web,示例,Kafka,public,WeatherUpdatesProducer,api,var,new,topicName
From: https://www.cnblogs.com/Leo_wl/p/17968690

相关文章

  • Springboot3+Vue3在进行WebSocket通讯时出现No mapping for GET或者是404
    参考:在SpringBoot中整合、使用WebSocket-spring中文网(springdoc.cn)===============================原代码(此时前端访问后端,后端会出现:NomappingforGET/wspath)前端相关代码:letsocket:WebSocket|null=nullconstsocketURL=`ws://127.0.0.1:8084/w......
  • HarmonyOS4.0系列——01、下载、安装、配置环境、搭建页面以及运行示例代码
    HarmonyOS4.0应用开发安装编辑器这里安装windows版本为例安装依赖打开DevEcoStudio这八项全部打钩即可开始编写代码,如果存在x,需要安装正确的库即可开发点击CreateProject选择默认模板——nextModel部分分为Stage和FA两个应用模型,FA是支持7版本以内的模型支持JS和TS,而Stage支持最......
  • Web 实时消息推送
    总结以下内容为JavaGuide补充 介绍优点缺点短轮询客户端定时向服务端发送请求,服务端直接返回响应数据(即使没有数据更新)简单、易理解、易实现实时性太差,无效请求太多,频繁建立连接太耗费资源长轮询与短轮询不同是,长轮询接收到客户端请求之后等到有数据更新才......
  • FastAPi Celery RabbitMQ 与 Redis 的使用,并使用 Flower 监控 Celery 状态
    FastAPiCeleryRabbitMQ与Redis的使用,并使用Flower监控Celery状态本文介绍了Windows下FastAPiCelery使用RabbitMQ与Redis做代理的使用方法,本文参考了国外大佬的文章,并做了修改与补充,原文见这里,SumanDas,他文章中的完整代码,见这里,GitHubRabbitMQ与Redis的......
  • 如何通过WebDAV服务器访问铁威马NAS
    WebDAV是HTTP协议的扩展,可让用户管理存储在远程服务器上的文件,可以使用用户名和密码来进行访问,同时直接拷贝,编辑或删除共享空间内的文件。启用WebDAV服务器后,可使用支持WebDAV的客户端程序(如WinSCP、RaiDrive、MacOSFinder、Linux资源管理器)访问TNAS设备。接下来为大家分享......
  • dremio 测试特性api 的开启&外部profile查看
    以前简单说过基于代码修改开启test的外部profile能力,实际上官方是由配置参数的,可以在启动的时候添加到配置中配置添加dremio.conf文件debug{allowTestApis:true}检查选项时候开启的一个技巧使用arthas命令使用了arthas的vmtool也可以结......
  • 处理跨域请求的API接口数据
     在Web开发中,跨域请求是一个常见的问题。由于浏览器的安全策略限制,JavaScript在发送HTTP请求时只能访问同源下的资源,即协议、域名、端口号都必须一致。然而,有时我们需要从不同域名下获取数据,这就涉及到了跨域请求的问题。为了解决这个问题,我们可以使用API接口来处理跨域请求。......
  • 《eslint篇》webstorm配置eslint校验
    法1:安依赖方式参考链接:https://blog.csdn.net/weixin_43575792/article/details/1232478621.安装依赖并初始化文件首先安装eslintnpminstalleslint--save-dev安装完成后我们开始初始化eslint配置文件npminit@eslint/config上述选择大家根据自己的需求来改变,博......
  • docker jmeter分布式压测部署 jmeter websocket压测
    测试场景:1.多名用户加入房间。2.房间人数为固定人数(比如4人) 3.有人进入时,进入用户会收到反馈当前房间人员列表。4.其他人会收到反馈新加入用户的信息消息。5.当人数已满时,会自动推送消息给所有人。6.在人满后,每个人需要按固定序列,发送消息。7.所有人发送特定消息后,推进房......
  • JMeter测试WebSocket的经验总结
    最近有一个微信聊天系统的项目需要性能测试,既然是测试微信聊天,肯定绕不开websocket接口的测试,首选工具是Jmeter,网上能搜到现成的方法,但是网上提供的jar包往往不是最新的,既然是用最新版本的Jmeter4.0,那么所依赖的插件jar包也应该追求新的。所以提供了以下链接供大家下载(甚至连源码......