首页 > 其他分享 >ABP 集成ActiveMQ客户端

ABP 集成ActiveMQ客户端

时间:2022-11-01 17:35:22浏览次数:46  
标签:consumer ABP connection activeProducerOptions var new ActiveMQ public 客户端

客户端代码

  1 public  class ActiveMQConnectionService
  2     {
  3         public ActiveProducerOption activeProducerOptions;
  4         private readonly IDatasAppService _datasAppService;
  5 
  6         public ActiveMQConnectionService(ActiveProducerOption activeProducerOptions, IDatasAppService datasAppService)
  7         {
  8             this.activeProducerOptions = activeProducerOptions;
  9             ConnectServer();
 10             _datasAppService = datasAppService;
 11         }
 12 
 13         /// <summary>
 14         /// 连接方式一
 15         /// </summary>
 16         public void ConnectServer()
 17         {
 18             var url = "tcp://" + activeProducerOptions.IpAddress + ":" + activeProducerOptions.Port;
 19             IConnectionFactory factory = new ConnectionFactory(url);
 20             try
 21             {
 22                 //通过工厂构建连接
 23                 var connection = factory.CreateConnection(activeProducerOptions.UserName, activeProducerOptions.Password);
 24                 //这个是连接的客户端名称标识
 25                 connection.ClientId = activeProducerOptions.ClientId;
 26                 connection.Start();
 27                 //通过连接创建一个会话
 28                 ISession session = connection.CreateSession();
 29                 //通过会话创建一个消费者,这里就是Queue这种会话类型的监听参数设置
 30                 //IMessageConsumer consumer = session.CreateConsumer(new ActiveMQTopic(activeProducerOptions.Topic));
 31                 IMessageConsumer consumer = session.CreateDurableConsumer(new ActiveMQTopic(activeProducerOptions.Topic), activeProducerOptions.ClientId, null, false);
 32                 //注册监听事件
 33                 consumer.Listener += new MessageListener(consumer_Listener);
 34             }
 35             catch (Exception ex)
 36             {
 37                 var a = ex;
 38             }
 39         }
 40         /// <summary>
 41         /// 连接方式二
 42         /// </summary>
 43         public  void ConnectServer2()
 44         {
 45             #region 
 46             var url = "tcp://" + activeProducerOptions.IpAddress + ":" + activeProducerOptions.Port;
 47             //创建连接工厂
 48             IConnectionFactory factory = new ConnectionFactory(url);
 49             try
 50             {
 51                 //通过工厂构建连接
 52                 var connection = factory.CreateConnection(activeProducerOptions.UserName, activeProducerOptions.Password);
 53                 //这个是连接的客户端名称标识
 54                 connection.ClientId = activeProducerOptions.ClientId;
 55                 connection.Start();
 56                 //通过连接创建一个会话
 57                 ISession session = connection.CreateSession();
 58                 //通过会话创建一个消费者,这里就是Queue这种会话类型的监听参数设置
 59                 //IMessageConsumer consumer = session.CreateConsumer(new ActiveMQTopic("aimms.aimmsmq.topic.aimms.aimms.detection.data"));
 60                 IMessageConsumer consumer = session.CreateDurableConsumer(new ActiveMQTopic(activeProducerOptions.Topic), activeProducerOptions.ClientId, null, false);
 61                 //注册监听事件
 62                 consumer.Listener += new MessageListener(consumer_Listener);
 63                 //ITextMessage message;
 64                 //while (true)
 65                 //{
 66                 //    try
 67                 //    {
 68                 //        message = (ITextMessage)consumer.Receive();
 69                 //        Console.WriteLine("Receive msg:" + message.Text);
 70                 //    }
 71                 //    catch (Exception ex)
 72                 //    {
 73                 //        Console.WriteLine("接收消息失败!" + ex);
 74                 //    }
 75                 //}
 76                 //connection.Stop();
 77                 //connection.Close();  
 78             }
 79             catch (Exception ex)
 80             {
 81                 var a = ex;
 82             }
 83             #endregion
 84 
 85 
 86         }
 87         /// <summary>
 88         /// 监听识别结果
 89         /// </summary>
 90         /// <param name="message"></param>
 91         public  async void consumer_Listener(IMessage message)
 92         {
 93             ITextMessage msg = (ITextMessage)message;
 94 
 95             //异步调用下,否则无法回归主线程
 96             //tbReceiveMessage.Invoke(new DelegateRevMessage(RevMessage), msg);
 97             await _datasAppService.GetIndetificatioResult(msg.Text);//数据处理
 98 
 99         }
100 
101     }
public class ActiveProducerOption
    {
        public int Port { get; set; }
        public string IpAddress { get; set; }
        public string UserName { get; set; }
        public string Password { get; set; }
        public string ClientId { get; set; }
        public string Topic { get; set; }
    }

StartUp

 1  public void Configure(IApplicationBuilder app, IWebHostEnvironment env, ILoggerFactory loggerFactory, IServiceProvider serviceProvider)
 2  {
 3 
 4   ...
 5  //启动ActiveMQ连接服务
 6   ConfigureActiveMQ(env, serviceProvider);
 7 }
 8 
 9 private void ConfigureActiveMQ(IWebHostEnvironment env, IServiceProvider serviceProvider)
10 {
11             using (var scope = serviceProvider.CreateScope())
12             {
13                 var picServer = scope.ServiceProvider
14                         .GetRequiredService<IDatasAppService>();//实例化应用层对象
15                 var port = _appConfiguration["ActiveMQSetting:Port"];
16                 new ActiveMQConnectionService(new ActiveProducerOption()
17                 {
18                     IpAddress = _appConfiguration["ActiveMQSetting:IpAddress"],
19                     Port = string.IsNullOrWhiteSpace(port) ? 7018 : int.Parse(port),
20                     UserName = _appConfiguration["ActiveMQSetting:UserName"],
21                     Password = _appConfiguration["ActiveMQSetting:Password"],
22                     ClientId = _appConfiguration["ActiveMQSetting:ClientId"],
23                     Topic = _appConfiguration["ActiveMQSetting:Topic"],
24                 }, picServer);
25             }
26 }

 

标签:consumer,ABP,connection,activeProducerOptions,var,new,ActiveMQ,public,客户端
From: https://www.cnblogs.com/sugarwxx/p/16848516.html

相关文章

  • ABP Value cannot be null. (Parameter 'unitOfWork')
    ABPValuecannotbenull.(Parameter'unitOfWork') 解决方式将需要处理的代码放置到如下位置privatereadonlyIUnitOfWorkManager_unitOfWorkManager;using(v......
  • unity3d:最简单的服务器,把收到消息发回客户端
    usingUnityEngine;usingSystem.Collections;usingSystem.Collections.Generic;usingSystem.Net.Sockets;usingSystem.Net;usingSystem.Threading;usingSystem;usin......
  • ActiveMQ的使用
    应用场景:异步处理、应用解耦、流量消峰简介:Apache出品,是一个完全支持JMS1.1和J2EE1.4规范的JMS Provider实现JMS消息模型:1)P2P点对点模型(Queue队列模型)......
  • python tcp多个客户端连接服务器
    一、传输层**该层为两台主机上的应用程序提供端到端的通信。传输层有两个传输协议:TCP(传输控制协议)和UDP(用户数据报协议)。其中,TCP是一个可靠的面向连接的协议,udp是不可......
  • docker01_配置允许任何IP的客户端连接
    阅读说明操作说明来自《每天5分钟玩转Docker容器技术》,但是具体展示内容和书中不一样,要详细了解的朋友可以翻阅书中2.3节。这里仅作为阅读笔记。 操作步骤1.找......
  • grpc demo python客户端 c++服务端
    项目需啊将网站上传的图片传入c++推理引擎,网站使用flask架构,python编写,图片推理引擎是一个单独的server,c++编写,因此用grpc来传输比较合适。理论上来说只要规定好proto文件,......
  • 安装es以及使用客户端
    软件下载地址快https://repo.huaweicloud.com/elasticsearch/慢https://www.elastic.co/cn/downloads/past-releases#elasticsearch注意:安装es需要内存至少4G,否则安装......
  • Redis和数据库双写情况下,客户端展示信息未及时生效
    问题现象:后端更新完数据库记录版本号后,前端展示的记录版本号仍然为老的版本号问题分析:客户端展示的版本号是从缓存中获取的,在更新后端数据库记录的时候,采用双写的方式,但......
  • ActiveMQ
    客户端1publicvoidStartClient()2{3#region方式一4//创建连接工厂5IConnectionFactoryfactory=new......
  • 比Xshell 还好用的 SSH 客户端神器
    个人试用后的确好用界面也还凑合,主要是解决了crt的长连接问题推荐链接:https://mp.weixin.qq.com/s?__biz=MzU2NDc4MjE2Ng==&mid=2247493070&idx=3&sn=531a178d919516a0ef......