首页 > 数据库 >使用mongodb、Kafka保存mqtt消息

使用mongodb、Kafka保存mqtt消息

时间:2024-06-22 09:22:06浏览次数:26  
标签:mqttEvents mongodb Kafka mqtt var new Server mqttServer

一、引言

随着物联网技术的迅猛发展,大量的设备和传感器产生了海量的数据。本文利用了 MQTT、Kafka 和 MongoDB 各自的优点,满足实时数据处理和大规模数据存储的需求。 如图: 0

二、总结

优点:

1. 可靠和解耦:

Kafka的复制机制和持久化存储确保了数据在传输过程中的可靠性,即使某个节点发生故障,也不会导致数据丢失,将数据生产者和消费者解耦,各模块可以独立扩展和优化,减少了相互影响。
2. 高可用和灵活性:

MongoDB的复制集和分片机制提供了数据的高可用性和容错能力,保证了数据存储的可靠性和灵活性。

缺点:

1. 复杂度高:

包含多个组件(MQTT、Kafka、MongoDB)配置、部署和维护、各组件之间的协调和集成也增加了实现的复杂性。
2. 延迟:

数据从设备上传到最终存储在MongoDB之间经过多个处理环节,每个环节都可能增加一些延迟。
3. 一致性:

数据在Kafka和MongoDB之间传递时可能需要额外的处理机制来确保一致性。

三、实现

准备工作

使用docker-compose.yml创建Kafka服务和MongoDB,简易代码如下:
version: '3.8'

networks:
  app-tier:
    driver: bridge

services:
  kafka:
    image: 'bitnami/kafka:latest'
    networks:
      - app-tier
    ports:
      - "9092:9092"
    environment:
      - KAFKA_CFG_NODE_ID=0
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      - KAFKA_CFG_LISTENERS=PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
    volumes:
      - kafka-data:/bitnami/kafka

  mongodb:
    image: 'mongo:latest'
    networks:
      - app-tier
    container_name: mongodb
    ports:
      - "27017:27017"
    volumes:
      - mongo-data:/data/db

volumes:
  kafka-data:
    driver: local
  mongo-data:
    driver: local
View Code

实现步骤

1. 设备数据上传: 服务端代码
var mqttFactory = new MqttFactory();
            var mqttServerOptions = new MqttServerOptionsBuilder()
                .WithDefaultEndpointPort(1883)//监听的端口
                .WithDefaultEndpoint()
                .WithoutEncryptedEndpoint()// 不启用tls
                .WithDefaultCommunicationTimeout(TimeSpan.FromSeconds(10 * 1000))//10秒超时
                .WithPersistentSessions(true)//启用session
                .WithConnectionBacklog(1000)//积累的最大连接请求数
                .Build();
            using (var mqttServer = mqttFactory.CreateMqttServer(mqttServerOptions))
            {
                AddMqttEvents(mqttServer);

                await mqttServer.StartAsync();
                Console.WriteLine("Press Enter Ctrl+C to exit.");
                Console.ReadLine();
                Console.CancelKeyPress += async (sender, e) =>
                {
                    e.Cancel = true; // 防止进程直接终止
                    await mqttServer.StopAsync();
                    Environment.Exit(0);
                };
            }

private static void AddMqttEvents(MqttServer mqttServer)
        {
            MqttServerEvents mqttEvents = new MqttServerEvents();
            mqttServer.ClientConnectedAsync += mqttEvents.Server_ClientConnectedAsync;
            mqttServer.StartedAsync += mqttEvents.Server_StartedAsync;
            mqttServer.StoppedAsync += mqttEvents.Server_StoppedAsync;
            mqttServer.ClientSubscribedTopicAsync += mqttEvents.Server_ClientSubscribedTopicAsync;
            mqttServer.ClientUnsubscribedTopicAsync += mqttEvents.Server_ClientUnsubscribedTopicAsync;
            mqttServer.ValidatingConnectionAsync += mqttEvents.Server_ValidatingConnectionAsync;
            mqttServer.ClientDisconnectedAsync += mqttEvents.Server_ClientDisconnectedAsync;
            mqttServer.InterceptingInboundPacketAsync += mqttEvents.Server_InterceptingInboundPacketAsync;
            mqttServer.InterceptingOutboundPacketAsync += mqttEvents.Server_InterceptingOutboundPacketAsync;
            mqttServer.InterceptingPublishAsync += mqttEvents.Server_InterceptingPublishAsync;
            mqttServer.ApplicationMessageNotConsumedAsync += mqttEvents.Server_ApplicationMessageNotConsumedAsync;
            mqttServer.ClientAcknowledgedPublishPacketAsync += mqttEvents.Server_ClientAcknowledgedPublishPacketAsync;
        }
View Code

客户端代码

 var mqttFactory = new MqttFactory();
            var mqttClient = mqttFactory.CreateMqttClient();

            var mqttOptions = new MqttClientOptionsBuilder()
                .WithClientId("MqttServiceClient")
                .WithTcpServer("127.0.0.1", 1883)
                .Build();
            mqttClient.ConnectedAsync+=(e =>
            {
                Console.WriteLine("MQTT连接成功");
                return Task.CompletedTask;
            });

            mqttClient.DisconnectedAsync+=(e =>
            {
                Console.WriteLine("MQTT连接断开");
                return Task.CompletedTask;
            });
            await mqttClient.ConnectAsync(mqttOptions, CancellationToken.None);
    //发送消息
MqttApplicationMessage applicationMessage = new MqttApplicationMessage
                    {
                        Topic = "mqtttest",
                        PayloadSegment = new ArraySegment<byte>(System.Text.Encoding.UTF8.GetBytes(input))
                    };

      var res = await mqttClient.PublishAsync(applicationMessage);
View Code 2. Kafka消息处理:

生产者代码

        var config = new ProducerConfig
            {
                BootstrapServers = "localhost:9092"
            };
            using var producer = new ProducerBuilder<string, string>(config).Build();
            try
            {
                var message = new Message<string, string>
                {
                    Key = e.ClientId,
                    Value = JsonConvert.SerializeObject(e.Packet)
                };
                var deliveryResult = await producer.ProduceAsync("mqttMsg-topic", message);
                Console.WriteLine($"Delivered '{deliveryResult.Value}' to '{deliveryResult.TopicPartitionOffset}'");
            }
            catch (ProduceException<string, string> ke)
            {
                Console.WriteLine($"Delivery failed: {ke.Error.Reason}");
            }    
View Code

消费者代码

var config = new ConsumerConfig
            {
                GroupId = "my-consumer-group",
                BootstrapServers = "127.0.0.1:9092",
                AutoOffsetReset = AutoOffsetReset.Earliest
            };
            using var consumer = new ConsumerBuilder<string, string>(config).Build();
            consumer.Subscribe("mqttMsg-topic");
//消费消息并保存到mongodb
 var client = new MongoClient("mongodb://127.0.0.1:27017");
                var collection = client.GetDatabase("mqtttest").GetCollection<BsonDocument>($"history_{DateTime.UtcNow.Year}_{DateTime.UtcNow.Month}");
                while (true)
                {
                    try
                    {
                        var consumeResult = consumer.Consume(cancellationToken.Token);
                        Console.WriteLine($"收到Kafka消息 '{consumeResult.Message.Value}' at: '{consumeResult.TopicPartitionOffset}'.");
                        var document = new BsonDocument
                        {
                            { "clientId", consumeResult.Message.Key },
                            { "JsonData", MongoDB.Bson.Serialization.BsonSerializer.Deserialize<BsonDocument>(consumeResult.Message.Value) },//不同设备上报数据格式不一定一样
                            { "created", DateTime.UtcNow }
                        };
                        await collection.InsertOneAsync(document);
                    }
                    catch (ConsumeException e)
                    {
                        Console.WriteLine($"处理Kafka消息异常: {e.Error.Reason}");
                    }
                }
View Code

源码地址:https://github.com/jclown/MqttPersistence

标签:mqttEvents,mongodb,Kafka,mqtt,var,new,Server,mqttServer
From: https://www.cnblogs.com/chumochen/p/18259584

相关文章

  • 消息队列kafka中间件详解:案例解析(第10天)
    系列文章目录1-消息队列(熟悉)2-Kafka的基本介绍(掌握架构,其他了解)3-Kafka的相关使用(掌握kafka常用shell命令)4-Kafka的PythonAPI的操作(熟悉)文章目录系列文章目录前言一、消息队列(熟悉)1、产生背景2、消息队列介绍2.1常见的消息队列产品2.2应用场景2.3消息队列中两......
  • Kafka 新的消费组默认的偏移量设置和消费行为
    个人名片......
  • 面试官问:Kafka 会不会丢消息?怎么处理的?
    作者:Java3y链接:https://www.zhihu.com/question/628325953/answer/3281764326来源:知乎著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。面试官:今天我想问下,你觉得Kafka会丢数据吗?候选者:嗯,使用Kafka时,有可能会有以下场景会丢消息候选者:比如说,我们用Produce......
  • kafka 如何保证不重复消费又不丢失数据?
    作者:Java3y链接:https://www.zhihu.com/question/483747691/answer/2392949203来源:知乎著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。面试官:今天我想问下,你觉得Kafka会丢数据吗?候选者:嗯,使用Kafka时,有可能会有以下场景会丢消息候选者:比如说,我们用Produce......
  • 剖析 Kafka 消息丢失的原因
    目录前言一、生产者导致消息丢失的场景场景1:消息体太大解决方案:1、减少生产者发送消息体体积2、调整参数max.request.size场景2:异步发送机制解决方案:1、使用带回调函数的发送方法场景3:网络问题和配置不当解决方案:1、设置acks参数设置为"all"2、设置重试参数3、设置min.insync.......
  • KAFKA配置 SASL_SSL双重认证
    1.背景kafka提供了多种安全认证机制,主要分为SASL和SSL两大类。SASL:是一种身份验证机制,用于在客户端和服务器之间进行身份验证的过程,其中SASL/PLAIN是基于账号密码的认证方式。SSL:是一种加密协议,用于在网络通信中提供数据的保密性和完整性。它使用公钥和私钥来建立安全的连接,并......
  • Linux 安装mongodb
    1.1Mongodb要求使用最新稳定版本安装包下载地址:https://www.mongodb.com/try/download/community本次选择:mongodb-linux-x86_64-rhel70-4.4.13.tgz1.2安装步骤上传安装包到服务器,并解压#tar–zxvfmongodb-linux-x86_64-rhel70-4.4.13.tgz 重命名解压后的文件名#mvmo......
  • 剖析 Kafka 消息丢失的原因
    文章目录前言一、生产者导致的消息丢失的场景场景1:消息太大解决方案:1、减少生产者发送消息体体积2、调整参数max.request.size场景2:异步发送机制解决方案:1、使用带回调函数的发送方法场景3:网络问题和配置不当解决方案:1、设置`acks`参数设置为"all"2、设置重试参数......
  • Docker部署安装应用大集合(Tomcat、Nginx、Mysql、Redis、MQ、Nacos、Zookeeper、Port
    Docker部署安装应用大集合(Tomcat、Nginx、Mysql、Redis、MQ、Nacos、Zookeeper、Portainer、MongoDB......) 精选 原创CodeDevMaster2022-11-1608:42:24博主文章分类:Docker©著作权文章标签dockermysqlNginxNacosMQ文章分类Docker云计算yyds干货盘点 Docker部署......
  • Kafka集群保姆级部署教程
    目录资源列表基础环境修改主机名关闭防火墙关闭selinux安装JAVA安装Kafka下载Kafka解压修改配置文件kafka01kafka02kafka03启动服务启动ZK启动Kafka验证测试创建topic查看topic        今天给大家分享的是Kafka分布式集群部署,上次分享的单机版的k......