1、思路
多线程首先要对文件进行分割,这里使用每个子线程的任务大小固定的方法,根据文件大小分配不同数量的子线程。
要实现断点下载,必须要记录已经复制的位置,每次继续时从上次下载的结束位置继续复制,这里将已经复制的文件位置以long类型写入一个日志文件,继续下载时每个线程从对应的日志文件位置继续复制。
2、实现
子线程类:
实现runnable接口,重写run方法。
使用stopwatch计算每个线程的下载时间。
package com.cn.threadtest;
import org.apache.commons.lang3.time.StopWatch;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
/**
* IO流线程
*
* @author Tobieance
* @date 2023-08-29 20:17
*/
public class DownloadThread implements Runnable {
RandomAccessFile inputStream;
RandomAccessFile outputStream;
RandomAccessFile offsetStream;
long start;
long end;
int length;
File toFile = null;
File fromFile = null;
File logFile = null;
boolean isStack = false;
int index;
byte[] bytes = new byte[1024];
public DownloadThread(File fromFile, long start, long end, File toFile, int index, File logFile) {
this.start = start;
this.end = end;
this.fromFile = fromFile;
this.toFile = toFile;
this.index = index;
this.logFile = logFile;
}
@Override
public void run() {
try {
inputStream = new RandomAccessFile(fromFile, "rw");
outputStream = new RandomAccessFile(toFile, "rw");
offsetStream = new RandomAccessFile(logFile, "rw");
//从记录文件中偏移量 index * 8 的位置开始,记录的是该线程的偏移量
offsetStream.seek(index * 8L);
//获取该线程到达的偏移量
long offset = offsetStream.readLong();
//偏移量为0,即为第一次复制,初始化偏移量为默认偏移量
if (offset == 0) {
offset = start;
} else {
System.out.println("线程" + (index + 1) + "继续下载\n");
}
// 要传输的数据长度
long sum = end - offset + 1;
System.out.println("线程" + (index + 1) +
"\t开始: " + (offset / (1024 * 1024)) +
"MB\t结束:" + (end / (1024 * 1024)) +
"MB\t写入大小:" + (end - offset + 1) / (1024 * 1024) + "MB\n");
StopWatch stopWatch = new StopWatch();
stopWatch.start();
//读写流偏移
inputStream.seek(offset);
outputStream.seek(offset);
//偏移量
long pointer;
while (sum > 0) {
if (sum >= 1024) {
inputStream.read(bytes);
length = 1024;
sum -= 1024;
} else {
inputStream.read(bytes, 0, (int) sum);
length = (int) sum;
sum = 0;
}
//写入文件
outputStream.write(bytes, 0, length);
//获取复制文件输出流的偏移量
pointer = outputStream.getFilePointer();
//把记录文件流的偏移量设置回起点
offsetStream.seek(index * 8L);
//写入新的偏移量
offsetStream.writeLong(pointer);
//判断偏移量是否已经超过应该到达的偏移量
if (pointer >= end) {
break;
}
}
outputStream.close();
offsetStream.close();
stopWatch.stop();
System.err.println("线程" + (index + 1) + "\t任务结束\t\t耗时:" + stopWatch.getTime() + "ms\n");
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} catch (Exception e) {
} finally {
// 关流
try {
if (inputStream != null) {
inputStream.close();
}
if (outputStream != null) {
outputStream.close();
}
if (offsetStream != null) {
offsetStream.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
**具体实现类:
文件过大时会生成过多子线程,故使用线程池限制正常工作线程和最大工作线程。
将每个线程需要完成的任务信息,如源文件,目标文件,起始文件位置,结束文件位置,日志文件
存入节点中,使用节点的列表来创建多个线程。**
package com.cn.threadtest;
import java.io.*;
import java.util.ArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* IO流测试
*
* @author 87036
* @date 2023/08/29
*/
public class Download {
/**
* 单个分片大小 500MB
*/
public static final int PART_SIZE = 1024 * 1024 * 500;
/**
* 目标文件路径
*/
private static String toPathName = null;
/**
* 长度
*/
private static long length;
/**
* 源文件
*/
private static File fromFile = null;
/**
* 目标文件
*/
private static File toFile = null;
/**
* 日志文件
*/
private static File logFile = null;
/**
* 节点列表
*/
private static ArrayList<Node> arrayList = null;
/**
* 实例
*/
private static Download INSTANCE = null;
/**
* 访问内部实例
*
* @return {@link Download}
*/
public static Download getDownloadInstance(String fileName) {
if(INSTANCE==null){
INSTANCE=new Download(fileName);
return INSTANCE;
}
else {
return INSTANCE;
}
}
/**
* 私有构造方法
*
* @param fileName 文件名称
*/
private Download(String fileName) {
fromFile = new File(fileName);
//获取文件长度
length = fromFile.length();
System.out.println("下载大文件\t文件大小:" + (length/(1024*1024))+"MB");
//节点列表
arrayList = getNodeList(length);
if (fromFile.exists()) {
toPathName = fromFile.getParent() + "copy\\" + fromFile.getName();
System.out.println("目标文件目录:\t" + toPathName);
}
//实例化目标文件
toFile = new File(toPathName);
//日志文件
logFile = new File("F:\\log.txt");
System.out.println("初始化除日志文件外各属性\n");
}
public void downloadSingleThread(){
//ThreadPoolExecutor创建线程池
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
//正常工作的线程个数
5,
//最大可以工作的线程个数
8,
//线程释放时间
60,
//线程释放时间单位
TimeUnit.SECONDS,
//超过3个线程等待则出发最大线程
new LinkedBlockingDeque<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.DiscardOldestPolicy());
threadPool.execute(
new DownloadThread(fromFile,
0,
length-1,
toFile,
0,
logFile));
threadPool.shutdown();
}
public void downloadMultipleThread() {
//ThreadPoolExecutor创建线程池
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
//正常工作的线程个数
5,
//最大可以工作的线程个数
8,
//线程释放时间
60,
//线程释放时间单位
TimeUnit.SECONDS,
//超过3个线程等待则出发最大线程
new LinkedBlockingDeque<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.DiscardOldestPolicy());
for (int i = 0; i < arrayList.size(); i++) {
threadPool.execute(
new DownloadThread(fromFile,
arrayList.get(i).start,
arrayList.get(i).end,
toFile,
i,
logFile));
}
threadPool.shutdown();
}
public void init(){
//初始化日志文件
System.out.println("初始化日志文件\n");
try {
RandomAccessFile raf = new RandomAccessFile(logFile, "rw");
raf.setLength(0);
for (int i = 0; i < arrayList.size(); i++) {
raf.seek(i * 8L);
raf.writeLong(0);
}
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 获取每个线程的开始和结束节点列表
*
* @param length 长度
* @return {@link ArrayList}
*/
private static ArrayList getNodeList(long length) {
ArrayList<Node> arrayList = new ArrayList<>();
long split = length / PART_SIZE;
long start = 0;
if(split==0){
System.err.println("小于最小分块,单线程处理");
arrayList.add(new Node(start, length - 1));
return arrayList;
}
for (int i = 0; i <= split; i++) {
if (i != split ) {
arrayList.add(new Node(start, start + PART_SIZE - 1));
start += PART_SIZE;
}else {
arrayList.add(new Node(start, length - 1));
}
}
return arrayList;
}
/**
* 节点对象,包含开始和结束
*
* @author Tobieance
* @date 2023/08/29
*/
private static class Node {
long start;
long end;
public Node(long start, long end) {
this.start = start;
this.end = end;
}
}
}
测试类:
package com.cn.threadtest;
import lombok.val;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.Scanner;
/**
* @author Tobieance
* @date 2023-08-29 19:09
*/
public class DownloadTest {
public static void main(String[] args) {
// 文件路径
String fileName="D:\\TestBig.zip";
val list=new LinkedList<DownloadThread>();
val arrlist=new ArrayList<DownloadThread>();
Scanner scanner=new Scanner(System.in);
System.out.println("请输入n\nn等于0多线程下载\t等于1单线程下载");
int n=scanner.nextInt();
//n等于0多线程下载 等于1单线程下载
if(n==0) {
System.out.println("多线程");
System.out.println("请输入m\nm等于0第一次下载\t等于1继续下载");
int m= scanner.nextInt();
if(m==0){
//初始化日志文件
Download.getDownloadInstance(fileName).init();
Download.getDownloadInstance(fileName).downloadMultipleThread();
}else if(m==1){
Download.getDownloadInstance(fileName).downloadMultipleThread();
}
}else if(n==1){
System.out.println("单线程");
System.out.println("请输入m\nm等于0第一次下载\t等于1继续下载");
int m= scanner.nextInt();
if(m==0){
//初始化日志文件
Download.getDownloadInstance(fileName).init();
Download.getDownloadInstance(fileName).downloadSingleThread();
}else if(m==1){
Download.getDownloadInstance(fileName).downloadSingleThread();
}
}
}
}
3、运行截图:
标签:文件,System,length,复制,线程,new,import,多线程,断点 From: https://www.cnblogs.com/tobieance/p/17744361.html