多任务分块下载器 MultiTaskDownloader
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
/// <summary>
/// 多任务分块下载器
/// </summary>
class MultiTaskDownloader
{
/// <summary>
/// 多线程数量
/// </summary>
public int NumberOfParts { get; private set; }
/// <summary>
/// 临时文件夹
/// </summary>
public string TempFileDir { get; set; } = Environment.GetEnvironmentVariable("temp");
/// <summary>
/// 临时文件名称
/// </summary>
public string TempFileName { get; set; }
/// <summary>
/// 文件下载路径
/// </summary>
public string TargetFilePath { get; set; }
/// <summary>
/// 下载地址
/// </summary>
public string Url { get; set; }
/// <summary>
/// 下载块
/// </summary>
public List<PartialDownloader> PartialDownloaderList { get; }
/// <summary>
/// 下载文件大小
/// </summary>
public long ContentLength { get; }
private CancellationToken _taskToken;
public MultiTaskDownloader(string url, int numberOfParts, string savePath)
{
Url = url;
if (WSCommFunc.IsRangeAllowed(url))
{
NumberOfParts = numberOfParts;
}
else
{
NumberOfParts = 1;
}
TargetFilePath = savePath;
PartialDownloaderList = new List<PartialDownloader>();
ContentLength = WSCommFunc.GetContentLength(url);
}
public async Task StartAsync()
{
_taskToken = CancellationToken.None;
await StartAsync(_taskToken);
}
public async Task StartAsync(CancellationToken token)
{
_taskToken = token;
TempFileName = GuidHelper.Increment().ToString() + "_tmp";
var progress = new Progress<DownloaderProgressArg>(DownloaderProgressHandler);
PartialDownloaderList.Clear();
for (int i = 0; i < NumberOfParts; i++)
{
var temp = CreatePartialDownloader(i, progress);
PartialDownloaderList.Add(temp);
}
var listTasks = PartialDownloaderList.Select(s => s.DownladAsync(token)).ToList();
var timer = new HiPerfTimer();
timer.Start();
foreach (var item in WSCommFunc.Interleaved(listTasks))
{
var result = await item.ConfigureAwait(false);
if (item.IsCompleted)
{
WSCommFunc.PrintThreadId($"{result.Index} Completed!");
}
else
{
WSCommFunc.PrintThreadId(item.Status.ToString());
}
}
timer.Stop();
var speed = ContentLength / 1024 / timer.Duration;
Console.WriteLine($"{Url}: {timer.Duration} ms, Speed:{speed,5:f2} MB/S");
}
public void Pause()
{
foreach (var item in PartialDownloaderList)
{
item.Stopped = true;
}
WSCommFunc.PrintThreadId("Pause......");
}
public async Task ResumeAsync()
{
WSCommFunc.PrintThreadId("Resume......");
var progress = new Progress<DownloaderProgressArg>(DownloaderProgressHandler);
for (int i = 0; i < NumberOfParts; i++)
{
var temp = CreatePartialDownloader(i, progress);
if (!temp.Completed)
{
PartialDownloaderList[i] = temp;
}
}
var listTasks = PartialDownloaderList.Select(s => s.DownladAsync(_taskToken)).ToList();
var timer = new HiPerfTimer();
timer.Start();
foreach (var item in WSCommFunc.Interleaved(listTasks))
{
var result = await item.ConfigureAwait(false);
if (item.IsCompleted)
{
WSCommFunc.PrintThreadId($"{result.Index} Completed!");
}
else
{
WSCommFunc.PrintThreadId(item.Status.ToString());
}
}
timer.Stop();
var speed = ContentLength / 1024 / timer.Duration;
Console.WriteLine($"{Url}: {timer.Duration} ms, Speed:{speed,5:f2} MB/S");
}
private async void DownloaderProgressHandler(DownloaderProgressArg arg)
{
WSCommFunc.PrintThreadId($"{arg.Index,-2} {arg.Percent,6:P} {arg.Speed,10:f2}KB/s");
if (arg.Percent == 1.0 && PartialDownloaderList.Count(x => x.Completed) == PartialDownloaderList.Count)
{
WSCommFunc.PrintThreadId("Merge......");
var timer = new HiPerfTimer();
timer.Start();
await MergePartsAsync().ConfigureAwait(false);
timer.Stop();
WSCommFunc.PrintThreadId($"Merge...Completed! Size:{ContentLength} B, Time:{timer.Duration} ms ");
}
}
/// <summary>
/// 按顺序合并分块文件
/// </summary>
/// <returns></returns>
private async Task MergePartsAsync()
{
var orderList = PartialDownloaderList.OrderBy(x => x.PartialOrder);
var dir = new FileInfo(TargetFilePath).DirectoryName;
Directory.CreateDirectory(dir);
using (var fs = new FileStream(TargetFilePath, FileMode.OpenOrCreate, FileAccess.ReadWrite, FileShare.ReadWrite))
{
//long totalBytes = 0;
foreach (var item in orderList)
{
using (var reader = File.OpenRead(item.FullPath))
{
byte[] buffer = new byte[81920];
int readLen;
while ((readLen = await reader.ReadAsync(buffer, 0, buffer.Length)) > 0)
{
await fs.WriteAsync(buffer, 0, readLen);
//totalBytes += readLen;
//var percent = totalBytes * 1d / ContentLength;
//Console.WriteLine($"Merge...{percent,5:P}");
}
}
try
{
File.Delete(item.FullPath);
}
catch { }
}
}
}
private PartialDownloader CreatePartialDownloader(int order, IProgress<DownloaderProgressArg> progress)
{
int division = (int)ContentLength / NumberOfParts;
int remain = (int)ContentLength % NumberOfParts;
int start = division * order;
int end = start + division - 1;
// 最后一块内容长度较大
end += (order == NumberOfParts - 1 ? remain : 0);
var fileName = Path.Combine(TempFileDir, TempFileName) + order.ToString();
var offset = 0;
// 断点续传
if (File.Exists(fileName))
{
offset = (int)new FileInfo(fileName).Length;
}
WSCommFunc.PrintThreadId($"{order} {fileName}");
return new PartialDownloader(order, Url, fileName, start, end, offset, progress);
}
}
部分下载 PartialDownloader
/// <summary>
/// 部分下载器
/// </summary>
class PartialDownloader
{
/// <summary>
/// 下载已完成
/// </summary>
public bool Completed { get; private set; }
/// <summary>
/// 部分索引
/// </summary>
public int PartialOrder { get; set; }
/// <summary>
/// 文件路径
/// </summary>
public string FullPath { get; set; }
/// <summary>
/// 下载已停止
/// </summary>
public bool Stopped { get; set; }
/// <summary>
/// 文件链接
/// </summary>
private readonly string _url;
/// <summary>
/// 源文件的起始位置
/// </summary>
private readonly int _from;
/// <summary>
/// 源文件的结束位置
/// </summary>
private readonly int _to;
/// <summary>
/// 下载内容的字节长度
/// </summary>
private readonly long _contentLength;
/// <summary>
/// 已下载内容的字节长度
/// </summary>
private int _totalLength;
/// <summary>
/// 下载速度的记录
/// </summary>
private readonly int[] _lastSpeed;
/// <summary>
/// 记录的索引
/// </summary>
private int _index;
private readonly IProgress<DownloaderProgressArg> _progress;
/// <summary>
/// 文件下载器
/// </summary>
/// <param name="order">文件部分序号</param>
/// <param name="url">文件链接</param>
/// <param name="fileName">文件名称</param>
/// <param name="start">需要读取的起始位置</param>
/// <param name="end">需要读取的结束位置</param>
/// <param name="offset">字节偏移量</param>
/// <param name="progress">进度条</param>
public PartialDownloader(int order, string url, string fileName, int start, int end, int offset, IProgress<DownloaderProgressArg> progress)
{
PartialOrder = order;
FullPath = fileName;
_url = url;
_contentLength = end - start + 1;
_from = start + offset;
_to = end;
_totalLength = offset;
_progress = progress;
_lastSpeed = new int[10];
if (start >= end || start < 0 || end < 0)
{
Completed = true;
Stopped = true;
}
}
public async Task<DownloaderProgressArg> DownladAsync()
{
return await DownladAsync(CancellationToken.None);
}
// 考虑校验 分块已下载内容的合法性,不合法或已过期,需要重新下载
// 压缩文件的处理 GZipStream
// From、To 支持long类型
public async Task<DownloaderProgressArg> DownladAsync(CancellationToken token)
{
var rslt = new DownloaderProgressArg() { Index = PartialOrder };
if (token.IsCancellationRequested || Stopped)
{
WSCommFunc.PrintThreadId($"{PartialOrder} completed......");
return rslt;
}
WSCommFunc.PrintThreadId($"{PartialOrder} starting......");
Completed = false;
try
{
//using (var fs = new FileStream(FullPath, FileMode.Append, FileAccess.Write, FileShare.ReadWrite))
using (var fs = new FileStream(FullPath, FileMode.Append, FileAccess.Write))
{
var req = (HttpWebRequest)WebRequest.Create(_url);
req.UserAgent = WSCommFunc.UserAgent;
req.AllowAutoRedirect = true;
req.MaximumAutomaticRedirections = 5;
req.ServicePoint.ConnectionLimit = 4;
req.ServicePoint.Expect100Continue = true;
// 1.1支持部分下载
req.ProtocolVersion = HttpVersion.Version11;
req.AddRange(_from, _to);
using (token.Register(() => req.Abort(), false))
{
var timer = new HiPerfTimer();
using (var response = (HttpWebResponse)await req.GetResponseAsync())
{
using (var stream = response.GetResponseStream())
{
int readLength;
byte[] buffer = new byte[81920];
timer.Start();
while ((readLength = await stream.ReadAsync(buffer, 0, buffer.Length)) > 0)
{
timer.Stop();
_lastSpeed[_index] = (int)(readLength / timer.Duration);
if (_totalLength + readLength > _contentLength)
{
readLength = (int)(_contentLength - _totalLength);
}
await fs.WriteAsync(buffer, 0, readLength);
_totalLength += readLength;
_index = (_index >= 9) ? 0 : _index++;
_progress.Report(new DownloaderProgressArg()
{
Index = PartialOrder,
Percent = _totalLength * 1d / _contentLength,
Speed = _lastSpeed.Average(),
});
if (Stopped)
{
Completed = false;
await fs.FlushAsync();
fs.Close();
req.Abort();
}
if (_totalLength == _contentLength)
{
Completed = true;
fs.Close();
break;
}
timer.Start();
}
}
}
}
req.Abort();
}
}
catch (WebException ex)
{
Console.WriteLine(ex.Message);
throw ex;
}
catch (Exception ex)
{
if (token.IsCancellationRequested)
{
File.Delete(FullPath);
}
Console.WriteLine(ex.Message);
throw ex;
}
return rslt;
}
}
下载进度参数类型 DownloaderProgressArg
/// <summary>
/// 下载进度参数类型
/// </summary>
class DownloaderProgressArg
{
/// <summary>
/// 索引(多任务下载时有用)
/// </summary>
public int Index { get; set; }
/// <summary>
/// 下载进度(0~1)
/// </summary>
public double Percent { get; set; }
/// <summary>
/// 下载速度(KB/s)
/// </summary>
public double Speed { get; set; }
}
标签:断点续传,WSCommFunc,分块,int,timer,多任务,var,new,public
From: https://www.cnblogs.com/wesson2019-blog/p/16737038.html