首页 > 其他分享 >手写MSMQ微软消息队列收发工具类

手写MSMQ微软消息队列收发工具类

时间:2024-10-18 08:50:45浏览次数:1  
标签:fs MSMQ cfg 收发 ex file 手写 null message

一、MSMQ介绍

MSMQ(Microsoft Message Queuing)是微软开发的消息队列技术,支持事务,支持异步发送和接收消息。

两个重要的概念:队列和消息。队列是存放消息的容器和传输消息的通道。消息是在队列上存储和传输的数据的基本单元;这个消息在计算机上的存在形式可以是任意格式的文件;在C#程序中的消息是各种类型的Class类的实例,在程序中的消息类型可以在创建XmlMessageFormatter实例时指定。消息最大4M。

应用场景可以是异步处理,如系统短时间收到大量数据/请求,到达程序能够处理的请求数上限,可以将待处理的数据/请求全部放入队列中,程序再从队列中读取消息逐个处理。应用场景也可以是系统解耦,最常见的举例如电商平台中,订单系统要将订单数据发给支付系统时,订单系统可以将数据存入队列中,支付系统从队列中读取后处理。这时订单系统不需要调用支付系统的接口,这样订单系统和支付系统可以独立部署减少了依赖。

二、工具类封装

封装的好处:甲方早期项目中大量用到微软的技术,消息队列都是用的MSMQ。我们的多个项目都有对接MQ的需求,将消息收发的功能从多个项目中提取出来放一个独立程序中,后期维护中只需要修改一份代码。同时MQ这部分功能也从业务系统中剥离出来,更新MQ程序时不影响业务系统。封装中需要兼容的两点:

  • 功能较全,适用性强。满足多种格式的数据收发,同时兼容事务性队列和非事务性队列。

  • 配置简单。

  • 同时支持多个队列的收发处理。

     

几个核心类

  • MQHandler:抽象类,实现消息发送和备份、消息接收的主流程,抽象方法Send和Receive,这两个方法在下一层的实现类中实现具体功能。为什么要有这个抽象类?这里主要考虑项目中将来可能出现别的类型队列,但对【发送-备份-接收】的主流程来说不管什么类型的队列都不变,那么这部分功能对不同队列来说是相同的代码,因此在MQHandler实现,因队列类型不同的部分代码分别在下一层中实现。

   public abstract class MQHandler
    {
        protected MqCfgEntity cfg = null;
        //是否正在处理消息
        private bool IsProcessingMessage = false;

        public MQHandler(MqCfgEntity cfg)
        {
            this.cfg = cfg;
        }

        public void Start()
        {
            while (true)
            {
                try
                {
                    if (IsProcessingMessage) return;
                    IsProcessingMessage = true;//正在处理消息
                    if ("Send".Equals(cfg.ProcessType))
                    {
                        Send();
                    }
                    else if ("Receive".Equals(cfg.ProcessType))
                    {
                        Receive();
                    }
                    IsProcessingMessage = false; //消息处理完成
                }
                catch (Exception ex)
                {
                    Log4Net.Error($"{ex.Message},{ex.StackTrace}");
                    IsProcessingMessage = false; //消息处理完成
                }
                Thread.Sleep(cfg.SplitSeconds * 1000);
            }
        }

        /// <summary>
        /// 备份已发送的文件,file为待备份完整文件名
        /// </summary>
        protected void BackSendFile(string file)
        {
            if (File.Exists(file) && cfg.BackPath.IsNotNullOrEmpty())
            {
                FileInfo info = new FileInfo(file);
                string backPath = cfg.BackPath;
                if (cfg.BackFormat.IsNotNullOrEmpty())
                {
                    backPath = $"{backPath}/{DateTime.Now.ToString(cfg.BackFormat)}";
                    if (!Directory.Exists(backPath))
                    {
                        Directory.CreateDirectory(backPath);
                    }
                }
                string backName = $"{backPath}/{info.Name}";
                if (File.Exists(backName))
                {
                    backName = $"{backPath}/{Guid.NewGuid().ToString()}_{info.Name}";
                }
                File.Move(file, backName);
            }
        }

        private void Send()
        {
            var names = Directory.GetFiles(cfg.FilePath, cfg.FileFilter);
            if (names != null && names.Count() > 0)
            {
                var files = names.ToList().Select(f => f.ForMatPath());
                foreach (var file in files)
                {
                    if (file.TryOpenFile() && Send(file))
                    {
                        Log4Net.Info($"{file}已发送");
                        BackSendFile(file);
                        Log4Net.Info($"{file}已备份");
                    }
                }
            }
        }

        public abstract bool Send(string file);

        public abstract void Receive();
    }
View Code
  • MSMQHandler,继承自MQHandler,实现Send和Receive方法。这里面的XmlMessageFormatter参数用来指定消息类型。如文件byte[],文本string,xml对象XmlDocument。

   public class MSMQHandler : MQHandler
    {
        private XmlMessageFormatter formatter = null;
        //把xml当作txt读取在msmq中传输时,使用utf8编码,Unicode可能会造成部分报文数据紊乱
        private Encoding encoding = Encoding.UTF8;

        public MSMQHandler(MqCfgEntity cfg) : base(cfg)
        {
            Type type = GetMsgType(cfg.MessageType);
            formatter = new XmlMessageFormatter(new Type[] { type });
        }

        public override void Receive()
        {
            MessageQueue queue = null;
            try
            {
                queue = new MessageQueue(cfg.Queue);
                int num = queue.GetAllMessages().Length;
                for (int i = 0; i < num; i++)
                {
                    ReceiveMessage(queue);
                }
            }
            catch (Exception ex)
            {
                Log4Net.Error($"{ex.Message},{ex.StackTrace}");
            }
            finally
            {
                if (queue != null) queue.Dispose();
            }
        }

        private void ReceiveMessage(MessageQueue queue)
        {
            System.Messaging.Message message = null;
            try
            {
                message = queue.Receive();
                message.Formatter = formatter;
                string toFile = $"{cfg.FilePath}/{message.Label}";
                if ("file".Equals(cfg.MessageType))
                {
                    SaveMessageAsBinaryFile(message, toFile);
                }
                else if ("xml".Equals(cfg.MessageType))
                {
                    var doc = (XmlDocument)message.Body;
                    doc.Save(toFile);
                }
                else if ("txt".Equals(cfg.MessageType))
                {
                    var txt = (string)message.Body;
                    SaveMessageAsTxt(message, toFile);
                }
                Log4Net.Info($"收到消息,已保存,{toFile}");
            }
            catch (Exception ex)
            {
                Log4Net.Error($"{ex.Message},{ex.StackTrace}");
            }
            finally
            {
                if (message != null) message.Dispose();
            }
        }

        private void SaveMessageAsTxt(Message message, string toFile)
        {
            FileStream fs = null;
            try
            {
                fs = new FileStream(toFile, FileMode.Create);
                string content = (string)message.Body;
                var bts = encoding.GetBytes(content);
                fs.Write(bts, 0, bts.Length);
            }
            catch (Exception ex)
            {
                Log4Net.Error($"{ex.Message},{ex.StackTrace}");
            }
            finally
            {
                if (fs != null) fs.Dispose();
            }
        }

        private void SaveMessageAsBinaryFile(Message message, string toFile)
        {
            FileStream fs = null;
            try
            {
                fs = new FileStream(toFile, FileMode.Create);
                var bts = (byte[])message.Body;
                fs.Write(bts, 0, bts.Length);
            }
            catch (Exception ex)
            {
                Log4Net.Error($"{ex.Message},{ex.StackTrace}");
            }
            finally
            {
                if (fs != null) fs.Dispose();
            }
        }

        public override bool Send(string file)
        {
            bool success = true;
            FileInfo fileInfo = new FileInfo(file);
            MessageQueue myQueue = null;
            try
            {
                myQueue = new MessageQueue(cfg.Queue);
                object body = null;
                if ("file".Equals(cfg.MessageType))
                {
                    FileStream fs = null;
                    try
                    {
                        fs = new FileStream(file, FileMode.Open);
                        byte[] bts = new byte[fs.Length];
                        fs.Read(bts, 0, bts.Length);
                        body = bts;
                    }
                    catch (Exception ex)
                    {
                        Log4Net.Error($"{ex.Message},{ex.StackTrace}");
                    }
                    finally
                    {
                        if (fs != null) fs.Dispose();
                    }
                }
                else if ("xml".Equals(cfg.MessageType))
                {
                    XmlDocument doc = new XmlDocument();
                    doc.Load(file);
                    body = doc;
                }
                else if ("txt".Equals(cfg.MessageType))
                {
                    FileStream fs = null;
                    try
                    {
                        fs = new FileStream(file, FileMode.Open);
                        byte[] bts = new byte[fs.Length];
                        fs.Read(bts, 0, bts.Length);
                        string content = encoding.GetString(bts);
                        body = content;
                    }
                    catch (Exception ex)
                    {
                        Log4Net.Error($"{ex.Message},{ex.StackTrace}");
                    }
                    finally
                    {
                        if (fs != null) fs.Dispose();
                    }
                }
                Push(fileInfo.Name, myQueue, body);
            }
            catch (Exception ex)
            {
                success = false;
                Log4Net.Error($"{ex.Message},{ex.StackTrace}");
            }
            finally
            {
                if (myQueue != null) myQueue.Dispose();
            }
            return success;
        }

        //往队列上推送消息
        private void Push(string fileName, MessageQueue myQueue, object body)
        {
            System.Messaging.Message message = null;
            try
            {
                message = new System.Messaging.Message(body);
                message.Formatter = formatter;
                message.Label = fileName;
                if (cfg.IsTransQueue)
                {
                    using (MessageQueueTransaction trans = new MessageQueueTransaction())
                    {
                        trans.Begin();
                        myQueue.Send(message, trans);
                        trans.Commit();
                    }
                }
                else
                {
                    myQueue.Send(message);
                }
            }
            catch (Exception ex)
            {
                Log4Net.Error($"{ex.Message},{ex.StackTrace}");
            }
            finally
            {
                if (message != null) message.Dispose();
            }
        }

        /// <summary>
        /// 根据配置文件的类型,返回MQ队列上的消息类型
        /// </summary>
        private Type GetMsgType(string code)
        {
            Type type = null;
            switch (code)
            {
                case "file": type = typeof(byte[]); break;
                case "txt": type = typeof(string); break;
                case "xml": type = typeof(XmlDocument); break;
            }
            return type;
        }
    }
View Code

 

配置文件说明

  • mq.xml,在exe同级目录中,根节点为Config,其中可以包含多个Msmq节点,一个Msmq节点对应一个接收或发送任务。

  • Msmq节点字段说明:

    • ProcessType:Send或Receive,表示用于发送或接收消息。

    • Queue:队列名称。

    • FilePath:待发送的文件所在目录,或接收到的文件的存放目录。

    • FileFilter:Send时才配置,表示待发送目录中哪些后缀格式的文件需要处理,如*.txt,*.xml,*.jpg,*.*。

    • SplitSeconds:每一轮任务处理完成后暂停多少秒再进入下一个轮循。

    • BackPath:Send时才配置,消息发送以后文件备份到哪个目录。

    • BackFormat:跟BackPath配合使用,BackPath是备份目录,BackPath表示备份文件在BackPath下按小时/天/月/年来分文件夹备份。可以为yyyyMM、yyyyMMdd等。

    • MessageType:消息类型,可以为file、xml、txt,表示消息以哪种类型(对应XmlMessageFormatter中的文件byte[]、文本string、xml对象XmlDocument)发送。

    • IsTransQueue:true或false,表示队列是否为事务性队列。

其它说明

  • 程序运行环境:.net framework 4.5+

  • 程序启动:直接运行MsmqClient.exe,后台进程,无前台界面。

  • 完整项目代码:关注以下公众号,后台回复"msmq"获取

标签:fs,MSMQ,cfg,收发,ex,file,手写,null,message
From: https://www.cnblogs.com/cy2011/p/18472959

相关文章

  • QT学习第一战串口调试助手(3)实现收发数据以及显示
    前情概述在之前的文章中我们以及完成了串口调试助手页面的制作,同时在打开串口按键的槽函数中实现串口的打开功能本章节将注重于实现在串口打开后数据的收发问题以及一系列优化本章流程准备工作 1.在头文件中定义以下变量privateslots:voidon_btnCloseorOpenSer......
  • 【数据结构】自己动手写一个C++链表功能
    链表数据结构在操作数据时具有更高的性能,但同时因为其结构的原因会造成时间复杂度为O(N),因此理解链表结构的底层实现能够让我们在开发中对程序的性能进行进一步优化。如果你不知道什么是链表或者时间复杂度,可以参考我另外两篇文章:【数据结构】数组、链表、堆栈、队列到......
  • Exchange 2016与国内版O365混合部署(3):安装Exchange2016并配置邮件的外网收发
    Exchange2016与国内版O365混合部署(3):安装Exchange2016并配置邮件的外网收发Exchange2016安装和内网邮件收发测试:Exchange2016的安装这块这里就不做详细介绍了,网上也有很多教程,并不复杂。安装完成后登录https://localhost/ecp管理员中心页面查看:(略)登录https://localhost/owa:......
  • 手写持向量机(SVM)实现
    下面是一个简单的支持向量机(SVM)实现,用于解决线性可分问题。这个实现不使用任何机器学习库,只使用NumPy进行矩阵运算。请注意,这个实现主要用于教学目的,实际应用中推荐使用成熟的库,如scikit-learn。importnumpyasnpclassSVM:def__init__(self,learning_rate=0.001,l......
  • Chromium 中HTML5 WebSocket收发消息分析c++(一)
    一、WebSocket前端接口定义:WebSocket 对象提供了用于创建和管理 WebSocket 连接,以及可以通过该连接发送和接收数据的API。使用 WebSocket() 构造函数来构造一个 WebSocket。构造函数WebSocket(url[,protocols])返回一个 WebSocket 对象。常量ConstantValueWeb......
  • Chromium 中HTML5 WebSocket收发消息分析c++(二)
    看下websocket调用过程:基本定义参考上一篇:Chromium中HTML5WebSocket收发消息分析c++(一)-CSDN博客一、前端测试用例 参考:HTML5WebSocket|菜鸟教程(runoob.com) websocket.html文件如下:<!DOCTYPEHTML><html><head><metacharset="utf-8"><title>Web......
  • 运维技巧(4):管理邮箱收发附件限制(精华)
    运维技巧(4):管理邮箱收发附件限制(精华)进行收发邮件大小的限制是很有必要的,因为邮件服务器不能当作文件服务器来使用,不符合最佳实践的要求,也不合理。太大的附件可以通过网盘或者大附件共享的方式进行发送。exchange使用的是ESE的数据库,在不进行脱机整理的情况下,很难自动减小空间,如......
  • C#使用 MailKit 收发邮件
    目录获取QQ邮箱授权码安装MailKit配置邮件服务器信息实现邮件收发方法测试邮件收发参考文章获取QQ邮箱授权码打开QQ邮箱,进入设置->账号页面:在POP3/IMAP/SMTP中开启SMTP服务,然后点击授权码复制授权码:QQ邮箱服务器的参数如下,详细内容参考SMTP/IMAP服务:接收邮件服务器......
  • 基于MATLAB的BP神经网络手写数字识别系统
    介绍*:本课题为基于MATLAB的BP神经网络手写数字识别系统。带有GUI人机交互式界面。读入测试图片,通过截取某个数字,进行预处理,经过bp网络训练,得出识别的结果。可经过二次改造成识别中文汉字,英文字符等课题。运行效果示例图:......
  • 【AD2426/7/8_A2B收发器技术参考手册】第四章 事件控制
    【AD2426/7/8_A2B收发器技术参考手册】第四章事件控制四,A2B事件控制4.1错误管理4.1.1下行数据错误检测4.1.2上行数据错误检测4.1.3数据时隙错误纠正4.1.4控制和响应错误处理4.1.5错误信号4.1.6A2B通信和位错误4.1.7从属节点中断处理4.1.8错误管理寄存器4.1.......