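I. Introduction
With the rapid development of IoT technology, large numbers of devices and sensors generate massive volumes of data. This article combines the strengths of MQTT, Kafka, and MongoDB to meet the needs of real-time data processing and large-scale data storage: devices publish over MQTT, Kafka buffers and distributes the messages, and MongoDB stores them. As shown in the figure:
[Figure]
II. Summary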
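Advantages:
1. Reliability and decoupling:
Kafka's replication mechanism and persistent storage keep data reliable in transit. With a replication factor greater than one, the failure of a single node does not cause data loss. Kafka also decouples data producers from consumers, so each module can be scaled and optimized independently with less mutual impact.
2. High availability and flexibility:
MongoDB's replica sets and sharding provide high availability and fault tolerance, which keeps data storage both reliable and flexible.
Disadvantages:
1. High complexity:
The architecture contains several components (MQTT, Kafka, MongoDB), each of which has to be configured, deployed, and maintained. Coordinating and integrating them adds further implementation complexity.
2. Latency:
Data passes through several processing stages between being uploaded by a device and being stored in MongoDB, and each stage can add some latency.
3. Consistency:
Extra handling may be needed to keep data consistent as it moves from Kafka to MongoDB.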
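One common way to address the consistency concern is to make the MongoDB write idempotent, so that a message redelivered by Kafka overwrites its earlier copy instead of creating a duplicate. The following is a minimal sketch under that assumption, not the article's code: `DocumentIdFor` and `Upsert` are hypothetical helpers, and a `Dictionary` stands in for a MongoDB collection keyed by `_id`.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: derive a deterministic document id from the Kafka
// coordinates of a message. The same message always maps to the same id.
static string DocumentIdFor(string topic, int partition, long offset) =>
    $"{topic}:{partition}:{offset}";

// In-memory stand-in for a MongoDB collection keyed by _id (assumption).
// Writing an existing id overwrites the entry, which mimics an upsert.
var store = new Dictionary<string, string>();

void Upsert(string topic, int partition, long offset, string payload) =>
    store[DocumentIdFor(topic, partition, offset)] = payload;

// Simulate a redelivery: offset 42 is processed twice, for example after
// a consumer restart that happened before the offset was committed.
Upsert("mqttMsg-topic", 0, 41, "{\"t\":20.5}");
Upsert("mqttMsg-topic", 0, 42, "{\"t\":20.7}");
Upsert("mqttMsg-topic", 0, 42, "{\"t\":20.7}");

Console.WriteLine(store.Count); // prints 2
```

In the real pipeline the same idea would mean disabling automatic offset commits (`EnableAutoCommit = false` in `ConsumerConfig`) and committing an offset only after the upsert keyed on this id has succeeded.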
III. Implementation
Preparation
Use a docker-compose.yml file to create the Kafka and MongoDB services. A simple version looks like this:

    version: '3.8'

    networks:
      app-tier:
        driver: bridge

    services:
      kafka:
        image: 'bitnami/kafka:latest'
        networks:
          - app-tier
        ports:
          - "9092:9092"
        environment:
          - KAFKA_CFG_NODE_ID=0
          - KAFKA_CFG_PROCESS_ROLES=controller,broker
          - KAFKA_CFG_LISTENERS=PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
          - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092
          - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
          - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
          - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
        volumes:
          - kafka-data:/bitnami/kafka
      mongodb:
        image: 'mongo:latest'
        networks:
          - app-tier
        container_name: mongodb
        ports:
          - "27017:27017"
        volumes:
          - mongo-data:/data/db

    volumes:
      kafka-data:
        driver: local
      mongo-data:
        driver: local
Implementation steps
1. Device data upload:
Server code

    using MQTTnet;
    using MQTTnet.Server;

    // The statements below run inside an async Main method.
    var mqttFactory = new MqttFactory();
    var mqttServerOptions = new MqttServerOptionsBuilder()
        .WithDefaultEndpointPort(1883) // listening port
        .WithDefaultEndpoint()
        .WithoutEncryptedEndpoint() // TLS disabled
        .WithDefaultCommunicationTimeout(TimeSpan.FromSeconds(10)) // 10-second timeout
        .WithPersistentSessions(true) // enable persistent sessions
        .WithConnectionBacklog(1000) // maximum number of pending connection requests
        .Build();

    using (var mqttServer = mqttFactory.CreateMqttServer(mqttServerOptions))
    {
        AddMqttEvents(mqttServer);
        await mqttServer.StartAsync();

        // Register the Ctrl+C handler before blocking on ReadLine,
        // so that it is active while the server is running.
        Console.CancelKeyPress += async (sender, e) =>
        {
            e.Cancel = true; // keep the process from terminating immediately
            await mqttServer.StopAsync();
            Environment.Exit(0);
        };

        Console.WriteLine("Press Enter or Ctrl+C to exit.");
        Console.ReadLine();
        await mqttServer.StopAsync();
    }

    // Method of the same class as Main. MqttServerEvents is the handler
    // class from the project's source code.
    private static void AddMqttEvents(MqttServer mqttServer)
    {
        MqttServerEvents mqttEvents = new MqttServerEvents();
        mqttServer.ClientConnectedAsync += mqttEvents.Server_ClientConnectedAsync;
        mqttServer.StartedAsync += mqttEvents.Server_StartedAsync;
        mqttServer.StoppedAsync += mqttEvents.Server_StoppedAsync;
        mqttServer.ClientSubscribedTopicAsync += mqttEvents.Server_ClientSubscribedTopicAsync;
        mqttServer.ClientUnsubscribedTopicAsync += mqttEvents.Server_ClientUnsubscribedTopicAsync;
        mqttServer.ValidatingConnectionAsync += mqttEvents.Server_ValidatingConnectionAsync;
        mqttServer.ClientDisconnectedAsync += mqttEvents.Server_ClientDisconnectedAsync;
        mqttServer.InterceptingInboundPacketAsync += mqttEvents.Server_InterceptingInboundPacketAsync;
        mqttServer.InterceptingOutboundPacketAsync += mqttEvents.Server_InterceptingOutboundPacketAsync;
        mqttServer.InterceptingPublishAsync += mqttEvents.Server_InterceptingPublishAsync;
        mqttServer.ApplicationMessageNotConsumedAsync += mqttEvents.Server_ApplicationMessageNotConsumedAsync;
        mqttServer.ClientAcknowledgedPublishPacketAsync += mqttEvents.Server_ClientAcknowledgedPublishPacketAsync;
    }
Client code

    using MQTTnet;
    using MQTTnet.Client;

    var mqttFactory = new MqttFactory();
    var mqttClient = mqttFactory.CreateMqttClient();
    var mqttOptions = new MqttClientOptionsBuilder()
        .WithClientId("MqttServiceClient")
        .WithTcpServer("127.0.0.1", 1883)
        .Build();

    mqttClient.ConnectedAsync += e =>
    {
        Console.WriteLine("MQTT connected");
        return Task.CompletedTask;
    };
    mqttClient.DisconnectedAsync += e =>
    {
        Console.WriteLine("MQTT disconnected");
        return Task.CompletedTask;
    };

    await mqttClient.ConnectAsync(mqttOptions, CancellationToken.None);

    // Send a message. `input` is the message text, defined elsewhere.
    MqttApplicationMessage applicationMessage = new MqttApplicationMessage
    {
        Topic = "mqtttest",
        PayloadSegment = new ArraySegment<byte>(System.Text.Encoding.UTF8.GetBytes(input))
    };
    var res = await mqttClient.PublishAsync(applicationMessage);

2. Kafka message processing:
Producer code

    using Confluent.Kafka;
    using Newtonsoft.Json;

    var config = new ProducerConfig { BootstrapServers = "localhost:9092" };
    using var producer = new ProducerBuilder<string, string>(config).Build();
    try
    {
        // `e` is the event argument of the MQTT server handler in which this code runs.
        var message = new Message<string, string>
        {
            Key = e.ClientId,
            Value = JsonConvert.SerializeObject(e.Packet)
        };
        var deliveryResult = await producer.ProduceAsync("mqttMsg-topic", message);
        Console.WriteLine($"Delivered '{deliveryResult.Value}' to '{deliveryResult.TopicPartitionOffset}'");
    }
    catch (ProduceException<string, string> ke)
    {
        Console.WriteLine($"Delivery failed: {ke.Error.Reason}");
    }
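The producer serializes the whole MQTT packet, while the consumer expects every Kafka message value to parse as a JSON object. One possible variation, shown here only as a sketch and not as the article's approach, is to publish a small envelope with just the fields the consumer needs. `BuildEnvelope` is a hypothetical helper, it uses `System.Text.Json` instead of the article's Newtonsoft.Json, and it assumes payloads are UTF-8 text.

```csharp
using System;
using System.Text;
using System.Text.Json;

// Hypothetical envelope builder: keeps only the fields the consumer needs.
static string BuildEnvelope(string clientId, string topic, byte[] payload, DateTime receivedUtc) =>
    JsonSerializer.Serialize(new
    {
        clientId,
        topic,
        payload = Encoding.UTF8.GetString(payload), // assumes UTF-8 text payloads
        receivedUtc
    });

var json = BuildEnvelope(
    "MqttServiceClient",
    "mqtttest",
    Encoding.UTF8.GetBytes("23.5"),
    new DateTime(2024, 6, 20, 8, 0, 0, DateTimeKind.Utc));
Console.WriteLine(json);

// Round-trip check: the envelope parses back into a JSON object.
using var parsed = JsonDocument.Parse(json);
Console.WriteLine(parsed.RootElement.GetProperty("topic").GetString()); // prints mqtttest
```

The resulting string could be used as the `Value` of the Kafka message in place of the serialized packet. Binary payloads would need a different encoding, such as Base64.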
Consumer code

    using Confluent.Kafka;
    using MongoDB.Bson;
    using MongoDB.Driver;

    var config = new ConsumerConfig
    {
        GroupId = "my-consumer-group",
        BootstrapServers = "127.0.0.1:9092",
        AutoOffsetReset = AutoOffsetReset.Earliest
    };
    using var consumer = new ConsumerBuilder<string, string>(config).Build();
    consumer.Subscribe("mqttMsg-topic");

    // Consume messages and save them to MongoDB
    var client = new MongoClient("mongodb://127.0.0.1:27017");
    var collection = client.GetDatabase("mqtttest")
        .GetCollection<BsonDocument>($"history_{DateTime.UtcNow.Year}_{DateTime.UtcNow.Month}");

    // `cancellationToken` is a CancellationTokenSource that is not shown in this snippet.
    while (true)
    {
        try
        {
            var consumeResult = consumer.Consume(cancellationToken.Token);
            Console.WriteLine($"Received Kafka message '{consumeResult.Message.Value}' at: '{consumeResult.TopicPartitionOffset}'.");
            var document = new BsonDocument
            {
                { "clientId", consumeResult.Message.Key },
                // Devices may report data in different formats, so the value is stored as a nested document
                { "JsonData", MongoDB.Bson.Serialization.BsonSerializer.Deserialize<BsonDocument>(consumeResult.Message.Value) },
                { "created", DateTime.UtcNow }
            };
            await collection.InsertOneAsync(document);
        }
        catch (ConsumeException e)
        {
            Console.WriteLine($"Error processing Kafka message: {e.Error.Reason}");
        }
        catch (OperationCanceledException)
        {
            consumer.Close(); // leave the consumer group cleanly when cancellation is requested
            break;
        }
    }
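The consumer picks its monthly collection once at startup, so a process that keeps running across a month boundary would continue writing to the previous month's collection. A minimal sketch of resolving the name per message follows. `CollectionNameFor` is a hypothetical helper, and the zero-padded month is an assumption that differs from the original naming, which would produce `history_2024_6`.

```csharp
using System;
using System.Globalization;

// Hypothetical helper: map a UTC timestamp to its monthly collection name.
// Zero-padding the month keeps the names in chronological order when sorted.
static string CollectionNameFor(DateTime utc) =>
    string.Format(CultureInfo.InvariantCulture, "history_{0:D4}_{1:D2}", utc.Year, utc.Month);

var lastOfJune = new DateTime(2024, 6, 30, 23, 59, 59, DateTimeKind.Utc);
var firstOfJuly = new DateTime(2024, 7, 1, 0, 0, 0, DateTimeKind.Utc);

Console.WriteLine(CollectionNameFor(lastOfJune));  // prints history_2024_06
Console.WriteLine(CollectionNameFor(firstOfJuly)); // prints history_2024_07
```

Inside the consume loop, the collection could then be looked up with `GetCollection<BsonDocument>(CollectionNameFor(DateTime.UtcNow))` for each message instead of once before the loop.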
Source code: https://github.com/jclown/MqttPersistence
From: https://www.cnblogs.com/chumochen/p/18259584