在测试下面的代码时,线程0下载的文件内容正确,但后面的线程不能下载到正确的数据(我查看了下载文件的二进制数据)(也有可能是拼接的问题)。
注:Logger是一个日志类,Array是一个仿数组类(提供了数组下标访问)
public class ThreadDownload {
private Logger log = new("Download.log");
private string URL; //文件URL
private string Path; //下载后文件保存位置
private long Length; //文件大小(B)
private int Threads; //下载线程数
private long DownloadEnd = 0; //下载完成的字节
private bool Stop = false; //出错停止
private bool NoAccept = false; //目标服务器是否支持多线程下载
private Array<byte[]> DownloadBuffer;
private bool[] Changed;
public ThreadDownload(string dURL,string dPath = null,int dThreads = 64) {
URL = dURL;
Path = dPath;
Threads = dThreads;
Do();
}
private void Do() {
try {
ServicePointManager.DefaultConnectionLimit = Threads + 1;
ServicePointManager.SecurityProtocol = SecurityProtocolType.Ssl3 | SecurityProtocolType.SystemDefault | SecurityProtocolType.Tls | SecurityProtocolType.Tls11 | SecurityProtocolType.Tls12 | SecurityProtocolType.Tls13;
Length = WebRequest.Create(URL).GetResponse().ContentLength;
long BlockLong = (Length - Length % Threads) / Threads;
DownloadBuffer = new(Threads+1);
Changed = new bool[Threads+1];
Console.Clear();
Path = Path==null ? new Uri(URL).Segments[new Uri(URL).Segments.Length-1] : Path;
if (File.Exists(Path)) {
File.Delete(Path);
}
{
var readreturntmp1 = ((HttpWebRequest)WebRequest.Create(URL));
readreturntmp1.AddRange(0,1);
var readreturn = (HttpWebResponse)readreturntmp1.GetResponse();
for (int i = 0;i < readreturn.Headers.AllKeys.Length;++i) {
if (readreturn.Headers.AllKeys[i].Equals("Accept-Ranges") && readreturn.Headers[i].Equals("bytes")) {
break;
} else if (readreturn.Headers.AllKeys[i].Equals("Accept-Ranges") && readreturn.Headers[i].Equals("none")) {
NoAccept = true;
break;
}
if (i == readreturn.Headers.AllKeys.Length-1) {
NoAccept = true;
break;
}
}
}
if (!NoAccept) {
Thread[] downThs = new Thread[Threads+1];
Thread pushThs = new Thread(() => {
try {
int Changeds = 0;
var write = new FileStream(Path,FileMode.Append,FileAccess.Write);
for (;Changeds < Threads+1;) {
if (Stop) {
write.Close();
File.Delete(Path);
log = log << "将在单线程重试";
new Download(URL,Path,ref log);
return;
} else if (Changed[Changeds]) {
write.Write(DownloadBuffer[Changeds],0,DownloadBuffer[Changeds].Length);
write.Write(new byte[1]{0},0,1);
write.Flush();
DownloadBuffer[Changeds] = new byte[0];
log = log << "写入" + Changeds + "号线程的数据";
++Changeds;
} else {
Thread.Sleep(200);
}
}
write.Close();
} catch(Exception ex) {
Stop = true;
log = log << new Exception("在合并线程:",ex);
}
});
Thread tjisThs = new Thread(() => {
try {
for (;;) {
long old = DownloadEnd;
Thread.Sleep(200);
Console.Clear();
Console.WriteLine("已完成下载{0}/{2},当前下载速度{1}\\s。",ByteToBytes.ByteToString(DownloadEnd),ByteToBytes.ByteToString(DownloadEnd - old),ByteToBytes.ByteToString(Length));
}
} catch (Exception ex) {
Stop = true;
log = log << new Exception("在统计线程:",ex);
}
});
pushThs.Start();
for (int i = 0;i < downThs.Length;++i) {
downThs[i] = new Thread(info => {
const int ListenBlackLong = 1024*64;
try {
int DownloadBlock = (int)(((DownloadInfo)info).End - ((DownloadInfo)info).Start);
int ID = ((DownloadInfo)info).ID;
DownloadBuffer[ID] = new byte[DownloadBlock];
int key = 0;
Logger log = new("DownloadLog\\" + ID + ".log");
log = log << "下载区间:" + ((DownloadInfo)info).Start + "~" + ((DownloadInfo)info).End + ",共" + DownloadBlock;
for (;key < DownloadBlock;) {
long nowend;
if (DownloadBlock - key < ListenBlackLong) {
nowend = DownloadBlock;
} else {
nowend = ListenBlackLong + key;
}
int nowrange = (int)(nowend - key);
log = log << "读取区间:" + key + "~" + nowend + ",共" + nowrange;
var readstreamt = ((HttpWebRequest)WebRequest.Create(URL));
readstreamt.AddRange(key,nowend);
var readstream = readstreamt.GetResponse().GetResponseStream();
for (int readkey = 0;;) {
if (readkey == nowrange) {
break;
}
lock (DownloadBuffer) {
readkey += readstream.Read(DownloadBuffer[ID],key+readkey,nowrange-readkey);
}
}
readstream.Close();
DownloadEnd += nowrange;
key += nowrange;
}
log.Close();
if (!Stop) {
Changed[ID] = true;
} else {
File.Delete(Path + "-" + ID.ToString());
}
} catch (Exception ex) {
Stop = true;
log = log << new Exception("在" + ((DownloadInfo)info).ID.ToString() + "号线程:",ex);
}
});
var info = new DownloadInfo();
if (i!=downThs.Length-1) {
info.Start = BlockLong * i;
info.End = BlockLong * (i + 1);
} else {
info.Start = Length - Length % Threads;
info.End = Length;
}
info.ID = i;
downThs[i].Start(info);
}
tjisThs.Start();
pushThs.Join();
log.Close();
tjisThs.Abort();
} else {
try {
log = log << "无法多线程下载,开始单线程下载。";
new Download(URL,Path,ref log);
} catch (Exception ex) {
log = log << new Exception("在单线程下载:",ex);
}
}
} catch (Exception ex) {
Stop = true;
log = log << new Exception("在主控制线程:",ex);
log = log << "将在单线程下载中重试";
new Download(URL,Path,ref log);
}
}
private struct DownloadInfo {
public long Start = 0;
public long End = 0;
public int ID = 0;
public DownloadInfo() {
}
}
}
/nuget Microsoft.Extensions.Http(他本身就包含Microsoft.Extensions.Logging日志)
//nuget Microsoft.Extensions.DependencyInjection
internal class Program
{
private static IHttpClientFactory httpClientFactory;
static async Task Main(string[] args)
{
var serviceProvider = new ServiceCollection()
.AddHttpClient()
.BuildServiceProvider();
httpClientFactory = serviceProvider.GetService<System.Net.Http.IHttpClientFactory>();
await downloadfile(new Uri("http://127.0.0.1:5500/a.exe"));
Console.ReadKey();
}
public static async Task downloadfile(Uri uri, int blocksize = 4 * 1024 * 1024)
{
var test = await CheckUriCanRange(uri);
if (test.canRange)
{
var windowcount = (test.totalsize + blocksize - 1) / blocksize;
System.Net.Http.Headers.RangeHeaderValue[] ranges = new System.Net.Http.Headers.RangeHeaderValue[windowcount];
for (int i = 0; i < windowcount - 1; i++)
{
ranges[i] = new System.Net.Http.Headers.RangeHeaderValue(i * blocksize, i * blocksize + blocksize - 1);
}
ranges[windowcount - 1] = new RangeHeaderValue((windowcount - 1) * blocksize, null);
var task = ranges.Select((r, index) =>
{
var filename = $"d_{index}.temp";
return downloadfile0(uri, filename, r);
});
await Task.WhenAll(task);
//合并部分我就随便写了,反正demo只是表明个意思
FileInfo save = new FileInfo("download.t");
using (FileStream fs = save.OpenWrite())
{
for (int i = 0; i < windowcount; i++)
{
using (var fs2=new FileInfo($"d_{i}.temp").OpenRead())
{
await fs2.CopyToAsync(fs);
}
}
}
}
else
{
//不支持断点续传的我就不写了
}
}
private static SemaphoreSlim slim = new SemaphoreSlim(5);
public static async Task downloadfile0(Uri uri, string filename,
System.Net.Http.Headers.RangeHeaderValue range = null)
{
await slim.WaitAsync();
try
{
using (var httpClient = httpClientFactory.CreateClient())
{
if (range != null)
{
httpClient.DefaultRequestHeaders.Range = range;
}
using (HttpResponseMessage response =
await httpClient.GetAsync(uri, HttpCompletionOption.ResponseHeadersRead))
{
response.EnsureSuccessStatusCode();
using (FileStream fs = new FileInfo(filename).OpenWrite())
{
await response.Content.CopyToAsync(fs);
}
}
}
}
finally
{
slim.Release();
}
}
public static async Task<(bool canRange, long totalsize)> CheckUriCanRange(Uri uri)
{
long totalsize = 0;
//demo,异常处理不写了,自己完成
using (var httpClient = httpClientFactory.CreateClient())
{
httpClient.DefaultRequestHeaders.Range = new System.Net.Http.Headers.RangeHeaderValue(2, null);
//发起head请求,获取文件总大小
using (HttpResponseMessage headResponse =
await httpClient.SendAsync(new HttpRequestMessage(HttpMethod.Head, uri)))
{
// 获取文件总大小
totalsize = headResponse.Content.Headers.ContentLength ?? 0;
//发起一个get请求测试是否支持断点续传
using (HttpResponseMessage response =
await httpClient.GetAsync(uri, HttpCompletionOption.ResponseHeadersRead))
{
if (response.StatusCode == System.Net.HttpStatusCode.PartialContent)
{
return (true, totalsize);
}
else
{
return (false, totalsize);
}
}
}
}
}
}
}
引用GPT: 根据您提供的代码,下面是几个可能导致下载不正确的问题:
多线程并发访问数组:在多线程同时访问和修改DownloadBuffer
和Changed
数组时可能会发生冲突。您可以尝试使用线程安全的集合类,例如ConcurrentDictionary
或ConcurrentQueue
来存储下载缓冲区和状态信息。
数据拼接问题:在合并线程中,您使用了FileStream
的Write
方法将每个线程下载的数据写入文件。然而,您没有确保数据的顺序和完整性。您可以通过维护一个有序的缓冲区,按照字节的顺序将每个线程下载的数据放入缓冲区,并在顺序满足后将缓冲区中的数据写入文件。
启动的线程数:在代码中,您创建了Threads+1
个线程用于下载和合并工作。但是,您并没有等待所有线程完成工作再关闭文件流和写入日志。你可以使用Thread.Join()
方法来等待所有线程完成。
异常处理:您在代码中捕获了异常,并设置了Stop
变量为true
。这会导致下载停止并删除已下载的文件。您可以修改异常处理逻辑,根据具体情况进行处理,例如打印错误日志并重新尝试下载。