首页 > 编程语言 >C#使用MJpeg实现视频流发送与显示

C#使用MJpeg实现视频流发送与显示

时间:2024-06-22 17:21:04浏览次数:25  
标签:MJpeg C# void 视频流 System private using new public

1、发送视频流:

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Drawing;
using System.Globalization;
using System.IO;
using System.Linq;
using System.Net.Sockets;
using System.Text;
using System.Threading.Tasks;

namespace Mjpeg
{
    public class MJpegStreamingServer
    {
        private static string _contentLengthString = "__PayloadHeaderContentLength__";
        private AsyncTcpServer _server;
        private ConcurrentDictionary<string, TcpClient> _clients;

        public MJpegStreamingServer(int listenPort)
          : this(listenPort, "--dennisgao")
        {
        }

        public MJpegStreamingServer(int listenPort, string boundary)
        {
            Port = listenPort;
            Boundary = boundary;

            _server = new AsyncTcpServer(Port);
            _server.Encoding = Encoding.ASCII;
            _clients = new ConcurrentDictionary<string, TcpClient>();
        }

        /// <summary>
        /// 监听的端口
        /// </summary>
        public int Port { get; private set; }

        /// <summary>
        /// 分隔符
        /// </summary>
        public string Boundary { get; private set; }

        /// <summary>
        /// 流头部
        /// </summary>
        public string StreamHeader
        {
            get
            {
                return "HTTP/1.1 200 OK" +
                    "\r\n" +
                         "Content-Type: multipart/x-mixed-replace; boundary=" + this.Boundary +
                          "\r\n";
            }
        }

        /// <summary>
        /// 图片头部
        /// </summary>
        public string PayloadHeader
        {
            get
            {
                return "\r\n" +
                           this.Boundary +
                               "\r\n" +
                             "Content-Type: image/jpeg" +
                             "\r\n" +
                             "Content-Length: " + _contentLengthString +
                             "\r\n\r\n";
            }
        }

        public void Start()
        {
            _server.Start(10);
            _server.ClientConnected += new EventHandler<TcpClientConnectedEventArgs>(OnClientConnected);
            _server.ClientDisconnected += new EventHandler<TcpClientDisconnectedEventArgs>(OnClientDisconnected);
        }

        public void Stop()
        {
            _server.Stop();
            _server.ClientConnected -= new EventHandler<TcpClientConnectedEventArgs>(OnClientConnected);
            _server.ClientDisconnected -= new EventHandler<TcpClientDisconnectedEventArgs>(OnClientDisconnected);
        }

        private void OnClientConnected(object sender, TcpClientConnectedEventArgs e)
        {
            _clients.AddOrUpdate(e.TcpClient.Client.RemoteEndPoint.ToString(), e.TcpClient, (n, o) => { return e.TcpClient; });
        }

        private void OnClientDisconnected(object sender, TcpClientDisconnectedEventArgs e)
        {
            TcpClient clientToBeThrowAway;
            _clients.TryRemove(e.TcpClient.Client.RemoteEndPoint.ToString(), out clientToBeThrowAway);
        }

        public void Write(Bitmap image)
        {
            if (_server.IsRunning)
            {
                byte[] payload = BytesOf(image);

                WriteStreamHeader();
                WritePayload(payload);
            }
        }

        private void WriteStreamHeader()
        {
            if (_clients.Count > 0)
            {
                foreach (var item in _clients)
                {
                    //Logger.Debug(string.Format(CultureInfo.InvariantCulture,  "Writing stream header, {0}, {1}{2}", item.Key, Environment.NewLine, StreamHeader));

                    _server.Send(item.Value, StreamHeader);

                    TcpClient clientToBeThrowAway;
                    _clients.TryRemove(item.Key, out clientToBeThrowAway);
                }
            }
        }

        private void WritePayload(byte[] payload)
        {
            string payloadHeader = this.PayloadHeader.Replace(_contentLengthString, payload.Length.ToString());
            string payloadTail = "\r\n";

            //Logger.Debug(string.Format(CultureInfo.InvariantCulture, "Writing payload header, {0}{1}", Environment.NewLine, payloadHeader));

            byte[] payloadHeaderBytes = _server.Encoding.GetBytes(payloadHeader);
            byte[] payloadTailBytes = _server.Encoding.GetBytes(payloadTail);
            byte[] packet = new byte[payloadHeaderBytes.Length + payload.Length + payloadTailBytes.Length];
            Buffer.BlockCopy(payloadHeaderBytes, 0, packet, 0, payloadHeaderBytes.Length);
            Buffer.BlockCopy(payload, 0, packet, payloadHeaderBytes.Length, payload.Length);
            Buffer.BlockCopy(payloadTailBytes, 0, packet, payloadHeaderBytes.Length + payload.Length, payloadTailBytes.Length);

            _server.SendAll(packet);
        }

        private byte[] BytesOf(Bitmap image)
        {
            MemoryStream ms = new MemoryStream();
            image.Save(ms, System.Drawing.Imaging.ImageFormat.Jpeg);

            byte[] payload = ms.ToArray();

            return payload;
        }
    }
}

用到的AsyncTcpServer类:

using System.Collections.Generic;
using System.Net.Sockets;
using System.Net;
using System.Reflection.Emit;
using System.Text;
using System;

namespace Mjpeg
{
    /// <summary>
    /// 异步TCP服务器
    /// </summary>
    public class AsyncTcpServer : IDisposable
    {
        #region Fields

        private TcpListener listener;
        private List<TcpClientState> clients;
        private bool disposed = false;

        #endregion

        #region Ctors

        /// <summary>
        /// 异步TCP服务器
        /// </summary>
        /// <param name="listenPort">监听的端口</param>
        public AsyncTcpServer(int listenPort)
          : this(IPAddress.Any, listenPort)
        {
        }

        /// <summary>
        /// 异步TCP服务器
        /// </summary>
        /// <param name="localEP">监听的终结点</param>
        public AsyncTcpServer(IPEndPoint localEP)
          : this(localEP.Address, localEP.Port)
        {
        }

        /// <summary>
        /// 异步TCP服务器
        /// </summary>
        /// <param name="localIPAddress">监听的IP地址</param>
        /// <param name="listenPort">监听的端口</param>
        public AsyncTcpServer(IPAddress localIPAddress, int listenPort)
        {
            Address = localIPAddress;
            Port = listenPort;
            this.Encoding = Encoding.Default;

            clients = new List<TcpClientState>();

            listener = new TcpListener(Address, Port);
            listener.AllowNatTraversal(true);
        }

        #endregion

        #region Properties

        /// <summary>
        /// 服务器是否正在运行
        /// </summary>
        public bool IsRunning { get; private set; }
        /// <summary>
        /// 监听的IP地址
        /// </summary>
        public IPAddress Address { get; private set; }
        /// <summary>
        /// 监听的端口
        /// </summary>
        public int Port { get; private set; }
        /// <summary>
        /// 通信使用的编码
        /// </summary>
        public Encoding Encoding { get; set; }

        #endregion

        #region Server

        /// <summary>
        /// 启动服务器
        /// </summary>
        /// <returns>异步TCP服务器</returns>
        public AsyncTcpServer Start()
        {
            if (!IsRunning)
            {
                IsRunning = true;
                listener.Start();
                listener.BeginAcceptTcpClient(
                  new AsyncCallback(HandleTcpClientAccepted), listener);
            }
            return this;
        }

        /// <summary>
        /// 启动服务器
        /// </summary>
        /// <param name="backlog">
        /// 服务器所允许的挂起连接序列的最大长度
        /// </param>
        /// <returns>异步TCP服务器</returns>
        public AsyncTcpServer Start(int backlog)
        {
            if (!IsRunning)
            {
                IsRunning = true;
                listener.Start(backlog);
                listener.BeginAcceptTcpClient(
                  new AsyncCallback(HandleTcpClientAccepted), listener);
            }
            return this;
        }

        /// <summary>
        /// 停止服务器
        /// </summary>
        /// <returns>异步TCP服务器</returns>
        public AsyncTcpServer Stop()
        {
            if (IsRunning)
            {
                IsRunning = false;
                listener.Stop();

                lock (this.clients)
                {
                    for (int i = 0; i < this.clients.Count; i++)
                    {
                        this.clients[i].TcpClient.Client.Disconnect(false);
                    }
                    this.clients.Clear();
                }

            }
            return this;
        }

        #endregion

        #region Receive

        private void HandleTcpClientAccepted(IAsyncResult ar)
        {
            if (IsRunning)
            {
                TcpListener tcpListener = (TcpListener)ar.AsyncState;

                TcpClient tcpClient = tcpListener.EndAcceptTcpClient(ar);
                byte[] buffer = new byte[tcpClient.ReceiveBufferSize];

                TcpClientState internalClient
                  = new TcpClientState(tcpClient, buffer);
                lock (this.clients)
                {
                    this.clients.Add(internalClient);
                    RaiseClientConnected(tcpClient);
                }

                NetworkStream networkStream = internalClient.NetworkStream;
                networkStream.BeginRead(
                  internalClient.Buffer,
                  0,
                  internalClient.Buffer.Length,
                  HandleDatagramReceived,
                  internalClient);

                tcpListener.BeginAcceptTcpClient(
                  new AsyncCallback(HandleTcpClientAccepted), ar.AsyncState);
            }
        }

        private void HandleDatagramReceived(IAsyncResult ar)
        {
            if (IsRunning)
            {
                TcpClientState internalClient = (TcpClientState)ar.AsyncState;
                NetworkStream networkStream = internalClient.NetworkStream;

                int numberOfReadBytes = 0;
                try
                {
                    numberOfReadBytes = networkStream.EndRead(ar);
                }
                catch
                {
                    numberOfReadBytes = 0;
                }

                if (numberOfReadBytes == 0)
                {
                    // connection has been closed
                    lock (this.clients)
                    {
                        this.clients.Remove(internalClient);
                        RaiseClientDisconnected(internalClient.TcpClient);
                        return;
                    }
                }

                // received byte and trigger event notification
                byte[] receivedBytes = new byte[numberOfReadBytes];
                Buffer.BlockCopy(
                  internalClient.Buffer, 0,
                  receivedBytes, 0, numberOfReadBytes);
                RaiseDatagramReceived(internalClient.TcpClient, receivedBytes);
                RaisePlaintextReceived(internalClient.TcpClient, receivedBytes);

                // continue listening for tcp datagram packets
                networkStream.BeginRead(
                  internalClient.Buffer,
                  0,
                  internalClient.Buffer.Length,
                  HandleDatagramReceived,
                  internalClient);
            }
        }

        #endregion

        #region Events

        /// <summary>
        /// 接收到数据报文事件
        /// </summary>
        public event EventHandler<TcpDatagramReceivedEventArgs<byte[]>> DatagramReceived;
        /// <summary>
        /// 接收到数据报文明文事件
        /// </summary>
        public event EventHandler<TcpDatagramReceivedEventArgs<string>> PlaintextReceived;

        private void RaiseDatagramReceived(TcpClient sender, byte[] datagram)
        {
            if (DatagramReceived != null)
            {
                DatagramReceived(this, new TcpDatagramReceivedEventArgs<byte[]>(sender, datagram));
            }
        }

        private void RaisePlaintextReceived(TcpClient sender, byte[] datagram)
        {
            if (PlaintextReceived != null)
            {
                PlaintextReceived(this, new TcpDatagramReceivedEventArgs<string>(
                  sender, this.Encoding.GetString(datagram, 0, datagram.Length)));
            }
        }

        /// <summary>
        /// 与客户端的连接已建立事件
        /// </summary>
        public event EventHandler<TcpClientConnectedEventArgs> ClientConnected;
        /// <summary>
        /// 与客户端的连接已断开事件
        /// </summary>
        public event EventHandler<TcpClientDisconnectedEventArgs> ClientDisconnected;

        private void RaiseClientConnected(TcpClient tcpClient)
        {
            if (ClientConnected != null)
            {
                ClientConnected(this, new TcpClientConnectedEventArgs(tcpClient));
            }
        }

        private void RaiseClientDisconnected(TcpClient tcpClient)
        {
            if (ClientDisconnected != null)
            {
                ClientDisconnected(this, new TcpClientDisconnectedEventArgs(tcpClient));
            }
        }

        #endregion

        #region Send

        /// <summary>
        /// 发送报文至指定的客户端
        /// </summary>
        /// <param name="tcpClient">客户端</param>
        /// <param name="datagram">报文</param>
        public void Send(TcpClient tcpClient, byte[] datagram)
        {
            if (!IsRunning)
                throw new InvalidProgramException("This TCP server has not been started.");

            if (tcpClient == null)
                throw new ArgumentNullException("tcpClient");

            if (datagram == null)
                throw new ArgumentNullException("datagram");

            tcpClient.GetStream().BeginWrite(
              datagram, 0, datagram.Length, HandleDatagramWritten, tcpClient);
        }

        private void HandleDatagramWritten(IAsyncResult ar)
        {
            ((TcpClient)ar.AsyncState).GetStream().EndWrite(ar);
        }

        /// <summary>
        /// 发送报文至指定的客户端
        /// </summary>
        /// <param name="tcpClient">客户端</param>
        /// <param name="datagram">报文</param>
        public void Send(TcpClient tcpClient, string datagram)
        {
            Send(tcpClient, this.Encoding.GetBytes(datagram));
        }

        /// <summary>
        /// 发送报文至所有客户端
        /// </summary>
        /// <param name="datagram">报文</param>
        public void SendAll(byte[] datagram)
        {
            if (!IsRunning)
                throw new InvalidProgramException("This TCP server has not been started.");

            for (int i = 0; i < this.clients.Count; i++)
            {
                Send(this.clients[i].TcpClient, datagram);
            }
        }

        /// <summary>
        /// 发送报文至所有客户端
        /// </summary>
        /// <param name="datagram">报文</param>
        public void SendAll(string datagram)
        {
            if (!IsRunning)
                throw new InvalidProgramException("This TCP server has not been started.");

            SendAll(this.Encoding.GetBytes(datagram));
        }

        #endregion

        #region IDisposable Members

        /// <summary>
        /// Performs application-defined tasks associated with freeing,
        /// releasing, or resetting unmanaged resources.
        /// </summary>
        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }

        /// <summary>
        /// Releases unmanaged and - optionally - managed resources
        /// </summary>
        /// <param name="disposing"><c>true</c> to release
        /// both managed and unmanaged resources; <c>false</c>
        /// to release only unmanaged resources.</param>
        protected virtual void Dispose(bool disposing)
        {
            if (!this.disposed)
            {
                if (disposing)
                {
                    try
                    {
                        Stop();

                        if (listener != null)
                        {
                            listener = null;
                        }
                    }
                    catch (SocketException ex)
                    {
                        //ExceptionHandler.Handle(ex);
                    }
                }

                disposed = true;
            }
        }

        #endregion
    }
}

调用:实现截屏发送

 MJpegStreamingServer server = new MJpegStreamingServer(1022);
            server.Start();
            Task.Run(() =>
            {
                while (true)
                {
                    try
                    {
                        var bit = CaptureActiveScreen();
                        server.Write(bit);
                        Thread.Sleep(50);
                    }
                    catch { }
                };

            });

截屏代码:

/// <summary>
        /// 截屏
        /// </summary>
        /// <returns></returns>
        public Bitmap CaptureActiveScreen()
        {
            // 创建一个和当前屏幕一样大小的Bitmap
            Rectangle bounds = Screen.GetBounds(Point.Empty);
            Bitmap bmp = new Bitmap(bounds.Width, bounds.Height, PixelFormat.Format32bppArgb);

            // 创建一个画布
            Graphics g = Graphics.FromImage(bmp);

            // 截取整个屏幕
            g.CopyFromScreen(Point.Empty, Point.Empty, bounds.Size);

            // 释放画布资源
            g.Dispose();

            return bmp;
        }

2、显示Mjpeg流:

using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace Mjpeg
{
    public class MjpegUtils : IDisposable
    {
        public bool IsExit = false;
        /// <summary>
        /// 播放MJPEG视频流
        /// </summary>
        /// <param name="mjpegStreamUrl">MJPEG视频流地址</param>
        /// <param name="accessImageHandler">读到图片后处理</param>
        /// <param name="sleep">每次读取文件睡眠时间,太长会导致卡顿,太短可能频繁矫正流</param>
        public MjpegUtils(string mjpegStreamUrl, Action<byte[]> accessImageHandler, int sleep = 50)
        {
            HttpWebRequest hwRequest = (System.Net.HttpWebRequest)WebRequest.Create(mjpegStreamUrl);
            hwRequest.Method = "GET";
            HttpWebResponse hwResponse = (HttpWebResponse)hwRequest.GetResponse();
            string boundary = hwRequest.Headers["Content-Type:"];
            //响应流中没有响应头信息,全是响应体内容
            Stream stream = hwResponse.GetResponseStream();
            string headerName = "Content-Length:";
            StringBuilder sb = new StringBuilder();
            int len = 1024;
            while (true)
            {
                while (true)
                {
                    char c = (char)stream.ReadByte();
                    //Console.Write(c);
                    if (c == '\n')
                    {
                        break;
                    }
                    sb.Append(c);
                }
                string line = sb.ToString();
                sb.Remove(0, sb.Length);
                int i = line.IndexOf(headerName);
                if (i != -1)
                {

                    int imageFileLength = Convert.ToInt32(line.Substring(i + headerName.Length).Trim());
                    stream.Read(new byte[2], 0, 2);
                    byte[] imageFileBytes = new byte[imageFileLength];
                    stream.Read(imageFileBytes, 0, imageFileBytes.Length);
                    //Console.WriteLine("文件头:" + imageFileBytes[0].ToString("X") + " " + imageFileBytes[1].ToString("X") + " " + imageFileBytes[2].ToString("X") + " " + imageFileBytes[3].ToString("X") + " " + imageFileBytes[4].ToString("X"));
                    //Console.WriteLine("文件尾:" + imageFileBytes[imageFileLength - 2].ToString("X") + " " + imageFileBytes[imageFileLength - 1].ToString("X"));
                    if (imageFileBytes[imageFileLength - 2].ToString("X") != "FF" && imageFileBytes[imageFileLength - 1].ToString("X") != "D9")
                    {
                        //Console.WriteLine("开始矫正...");
                        //修正
                        char l = '0';
                        while (true)
                        {
                            char c = (char)stream.ReadByte();
                            if (l == '-' && c == '-')
                            {
                                break;
                            }
                            l = c;
                        }
                    }
                    else
                    {
                        //读取图片成功!
                        accessImageHandler(imageFileBytes);
                    }
                    Thread.Sleep(sleep);
                }

            }
            stream.Close();
            hwResponse.Close();
            Console.Read();
        }

        public int StreamFindContentLength(Stream stream)
        {
            StringBuilder sb = new StringBuilder();
            sb.Append((char)stream.ReadByte());
            sb.Append((char)stream.ReadByte());
            sb.Append((char)stream.ReadByte());
            sb.Append((char)stream.ReadByte());
            Console.WriteLine("num:" + sb);
            int num = Convert.ToInt32(sb.ToString().Trim());
            //跳过\r\n
            char c = '0';
            do
            {
                c = (char)stream.ReadByte();
            } while (c == '\r' || c == '\n');
            return num;
        }

        public void Dispose()
        {
            IsExit = true;
        }



    }
}

调用:

    var thread = new Thread(new ThreadStart(delegate ()
             {
                 var mjpegUtils = new MjpegUtils("http://127.0.0.1:1022", delegate (byte[] bytes)
                   {
                       //Console.WriteLine(bytes.Length);
                       MemoryStream ms = new MemoryStream(bytes);
                       pictureBox1.Image = Image.FromStream(ms);
                       ms.Close();
                   }, 50);
             }));
            thread.IsBackground = true;
            thread.Start();

最终效果:

源码下载

 参考文章:

https://blog.csdn.net/weixin_33709364/article/details/86419401

https://www.jb51.net/article/206675.htm

https://www.cnblogs.com/gaochundong/archive/2013/04/14/csharp_async_tcp_server.html

https://www.cnblogs.com/gaochundong/archive/2013/04/14/csharp_async_tcp_client.html

 

标签:MJpeg,C#,void,视频流,System,private,using,new,public
From: https://www.cnblogs.com/mqxs/p/18262531

相关文章

  • ABC358
    ABC358E-AlphabetTiles一句话题意:给定\(K\)和\(C_{1\sim26}\),问共有多少个长度为\(1\simK\)的字符串,满足第\(i\)种英文字母的出现次数不大于\(C_i\),不小于\(0\).标签:动态规划,组合数,动态拆分记\(f[i][j]\)表示使用前\(i\)种字母,组成长度为\(j\)的字......
  • C#的无边框窗体项目模板 - 开源研究系列文章
          继续整理和编写代码及博文。      这次将笔者自己整理的C#的无边框窗体项目的基本模板进行总结,得出了基于C#的.netframework的Winform的4个项目模板,这些模板具有基本的功能,即已经初步将代码写了,直接在其基础上添加业务代码即可: 1、空项目;这个......
  • AnchorPane锚点布局
    JavaFX的AnchorPane是一种布局方式,允许你通过指定锚点来定位子节点。锚点是相对于父节点边缘的位置,你可以使用这些锚点来控制子节点的位置和大小。AnchorPane非常适合用来创建复杂的布局,其中组件的位置需要相对于其他组件或父容器的边缘进行定位。以下是AnchorPane的一些基......
  • C++学习笔记----重载运算符
    运算符重载运算符重载可以在通过成员函数或者全局函数进行重载,编译器提供了内置的运算符;我们可以通过定义对应的函数和参数进行重载,也可以使用编译器提供的名称`operator运算符()`进行重载;运算符重载本质是对内置的运算符函数进行重载:函数相同,参数不同;返回引用和地址需要思......
  • excel 转json 工具、json检测工具(来自个人学习,编写,自己使用工具)
    推荐一款自己编写的一款软件,excel转json工具(绿色、纯净、免费、共享);因为游戏需要将excel或其它表格转成json工具,以供cocoscreator使用,其它一些工具的比较:1.cocoscreator插件,也有免费的,但主要是效率低,如果excels文件多的时候,转换时间特别的长,无法接受。就是入口方便自......
  • Memcached分布式特性解析:高效缓存策略的关键
    在现代的互联网应用中,缓存是提高性能和扩展性的关键技术之一。Memcached作为一个高性能的分布式内存缓存系统,广泛用于减轻数据库负载、加快数据访问速度。本文将深入探讨Memcached的分布式特性,包括其工作原理、集群管理、数据一致性、故障恢复以及与其他分布式系统的集成等......
  • C语言-运算、输出函数
    学习目标:运算输入输出函数学习内容:位运算:复合运算符条件运算符:?:问号是三目运算符,形式为:<表达式1>?<表达式2>:<表达式3>如:x>y?x+18:y-20;解释:如果x>y,则运行x+18,反之,运行y-20。逗号运算符:运算规则:先计算左侧的表达式,在计算右侧的表达式,最后返回右侧表达式的值(......
  • 怎样利用 Clojure 的宏来创建自定义的控制结构,并且如何避免常见的错误?
    在Clojure中,宏是一种宏展开的机制,它可以用于创建自定义的控制结构。通过使用宏,你可以在编写代码时引入新的语法,从而使代码更具可读性和表达力。要创建一个宏,你需要使用defmacro宏,并将宏名称与一个参数列表和一个展开形式绑定。这个展开形式将在宏被调用时用于生成代码。......
  • 计算机系统基础实训五—CacheLab实验
    实验目的与要求1、让学生更好地应用程序性能的优化方法;2、让学生更好地理解存储器层次结构在程序运行过程中所起的重要作用;3、让学生更好地理解高速缓存对程序性能的影响;实验原理与内容本实验将帮助您了解缓存对C程序性能的影响。实验由两部分组成。在第一部分中,您将编写......
  • AUCell和AddModuleScore函数进行基因集评分
    AUCell和AddModuleScore分析是两种主流的用于单细胞RNA测序数据的基因集活性分析的方法。这些基因集可以来自文献、数据库或者根据具体研究问题进行自行定义。AUCell分析原理:1、AUCell分析可以将细胞中的所有基因按表达量进行排序,生成一个基因排名列表,表达量越高的基因排名......