为什么
由于下载的文件较大,单线程下载会导致时间较长,影响体验,因此在串行程序基础上引入多线程,提高下载速度;
实现功能
- 设置指定下载线程数
- 根据线程进行分段
- 分段下载
- 下载进度展示
- 合并分段
- 清理临时文件
分段、合并需要用到RandomAccessFile
此类的实例支持对随机访问文件的读取和写入。随机访问文件的行为类似存储在文件系统中的一个大型 byte 数组。存在指向该隐含数组的光标或索引,称为文件指针;输入操作从文件指针开始读取字节,并随着对字节的读取而前移此文件指针。如果随机访问文件以读取/写入模式创建,则输出操作也可用;输出操作从文件指针开始写入字节,并随着对字节的写入而前移此文件指针。写入隐含数组的当前末尾之后的输出操作导致该数组扩展。该文件指针可以通过 getFilePointer 方法读取,并通过 seek 方法设置。
主要构造方法
RandomAccessFile(File file, String mode)
创建从中读取和向其中写入(可选)的随机访问文件流,该文件由 File 参数指定。
RandomAccessFile(String name, String mode)
创建从中读取和向其中写入(可选)的随机访问文件流,该文件具有指定名称。
mode定义:mode 参数指定用以打开文件的访问模式。允许的值及其含意为:
“r” | 以只读方式打开。调用结果对象的任何 write 方法都将导致抛出 IOException。 |
“rw“ | 打开以便读取和写入。如果该文件尚不存在,则尝试创建该文件。 |
“rws” | 打开以便读取和写入,对于 “rw”,还要求对文件的内容或元数据的每个更新都同步写入到底层存储设备。 |
"rwd” | 打开以便读取和写入,对于 “rw”,还要求对文件内容的每个更新都同步写入到底层存储设备。 |
代码实现:
controller
/**
*
* @param url 下载链接
*/
@GetMapping("/down")
public void downLoad(@RequestParam("url") String url) throws Exception {
multiThreadDownService.download(url);
}
/**
* 获取下载状态
*/
@GetMapping("/downStates")
public DownStatesVO downStates() {
return LogCache.cache;
}
service
@Service
@Slf4j
public class MultiThreadDownServiceImpl implements MultiThreadDownService {
private DownStatesVO downStatesVO;
// 下载线程数量
public static final int DOWNLOAD_THREAD_NUM = 5;
// 下载线程池
private static final ExecutorService pool = Executors.newFixedThreadPool(DOWNLOAD_THREAD_NUM + 1);
// 临时文件后缀
public static final String FILE_TEMP_SUFFIX = ".temp";
@Override
public void download(String url) {
try {
//获取文件名称
String httpFileName = HttpUtls.getHttpFileName(url);
//获取已经下载到本地的文件大小
long localSize = FileUtils.getFileContentLength(httpFileName);
//获取文件实际大小
long fileSize = HttpUtls.getHttpFileContentLength(url);
//判断是否已经下载完成
if (localSize >= fileSize){
log.info("文件已经下载完成,不需要重复下载");
return;
}
log.info("开始下载时间"+ LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd hh:mm:ss")));
long startTime = System.currentTimeMillis();
//下载任务集合--为了阻塞下载
List<Future<Boolean>> futureList = new ArrayList<>();
//对任务进行切分
splitDownload(url, futureList);
//将ri
LogThread logThread = new LogThread(fileSize);
Future<Boolean> future = pool.submit(logThread);
futureList.add(future);
//阻塞下载--等待所有下载完毕
for (Future<Boolean> booleanFuture : futureList) {
booleanFuture.get();
}
//每段下载完成后--合并
Boolean finish = mergeFile(httpFileName);
if (finish){
//下载
downloadWeb(new File(httpFileName));
//删除分段分文件
deletePartFile(httpFileName);
}
log.info("下载结束");
}catch (Exception e){
log.error(e.getMessage());
throw new RuntimeException("下载出现异常");
}
}
private void downloadWeb( File file) {
RequestAttributes attributes = RequestContextHolder.getRequestAttributes();
ServletRequestAttributes attributes1 = (ServletRequestAttributes) attributes;
HttpServletResponse response = attributes1.getResponse();
try (BufferedInputStream fis = new BufferedInputStream(Files.newInputStream(file.toPath()));
OutputStream toClient = new BufferedOutputStream(response.getOutputStream());) {
byte[] buffer = new byte[fis.available()];
fis.read(buffer);
// 清空response
response.reset();
response.setCharacterEncoding("UTF-8");
response.setContentType("application/octet-stream");
// 对文件名进行 URL 编码
response.setCharacterEncoding(String.valueOf(StandardCharsets.UTF_8));
String fileName = URLEncoder.encode(file.getName(), String.valueOf(StandardCharsets.UTF_8));
response.setHeader("Content-disposition", "attachment;filename*=utf-8''" + fileName);
toClient.write(buffer);
toClient.flush();
} catch (Exception e) {
}finally {
file.delete();
}
}
private void deletePartFile(String httpFileName) {
for (int i = 0; i < DOWNLOAD_THREAD_NUM; i++) {
File file = new File(httpFileName + FILE_TEMP_SUFFIX + i);
file.delete();
}
}
private Boolean mergeFile(String httpFileName) {
byte[] bytes = new byte[1024];
int len = -1;
try (RandomAccessFile rw = new RandomAccessFile(httpFileName,"rw");
) {
for (int i = 0; i < DOWNLOAD_THREAD_NUM; i++) {
try (FileInputStream fileInputStream = new FileInputStream(httpFileName+FILE_TEMP_SUFFIX+i);
BufferedInputStream bufferedInputStream = new BufferedInputStream(fileInputStream)
){
while ((len=bufferedInputStream.read(bytes)) != -1){
rw.write(bytes,0,len);
}
}
}
} catch (Exception e) {
return false;
}
return true;
}
private void splitDownload(String url, List<Future<Boolean>> futureList) throws IOException {
long httpFileContentLength = HttpUtls.getHttpFileContentLength(url);
// 任务切分
long size = httpFileContentLength / DOWNLOAD_THREAD_NUM;
long lastSize = httpFileContentLength - (httpFileContentLength / DOWNLOAD_THREAD_NUM * (DOWNLOAD_THREAD_NUM - 1));
for (int i = 0; i < DOWNLOAD_THREAD_NUM; i++) {
long start = i * size;
long downloadSize = size;
if (i==DOWNLOAD_THREAD_NUM - 1){
downloadSize= lastSize;
}
long end = start + downloadSize;
if (start != 0) {
start++;
}
DownLoadThread downloadThread = new DownLoadThread(url, start, end, i, httpFileContentLength);
Future<Boolean> future = pool.submit(downloadThread);
futureList.add(future);
}
}
}
线程类:
下载线程
@Slf4j
public class DownLoadThread implements Callable<Boolean> {
/**
* 每次读取的数据块大小
*/
private static int BYTE_SIZE = 1024 * 100;
/**
* 下载链接
*/
private String url;
/**
* 下载开始位置
*/
private long startPos;
/**
* 要下载的文件区块大小
*/
private Long endPos;
/**
* 标识多线程下载切分的第几部分
*/
private Integer part;
/**
* 文件总大小
*/
private Long contentLenth;
public DownLoadThread(String url, long startPos, Long endPos, Integer part, Long contentLenth) {
this.url = url;
this.startPos = startPos;
this.endPos = endPos;
this.part = part;
this.contentLenth = contentLenth;
}
@Override
public Boolean call() throws Exception {
//url校验
if (url==null || url.trim().equals("")){
throw new RuntimeException("下载路径不正确");
}
//给这分片起名字
String fileName = HttpUtls.getHttpFileName(url);
if (part != null){
fileName = fileName+".temp"+part;
}
//判断这一部分是否下载过
long localFileSize = FileUtils.getFileContentLength(fileName);
//下载
if (localFileSize>=endPos-startPos){
log.info("已经下载完毕");
}
if (endPos.equals(contentLenth)) {
endPos = null;
}
//写入本地
HttpURLConnection httpUrlConnection = HttpUtls.getHttpUrlConnection(url, startPos + localFileSize, endPos);
try (InputStream inputStream = httpUrlConnection.getInputStream();
BufferedInputStream bufferedInputStream = new BufferedInputStream(inputStream);
RandomAccessFile rw = new RandomAccessFile(fileName,"rw"))
{
rw.seek(localFileSize);
byte[] bytes = new byte[BYTE_SIZE];
int len = -1;
while ((len = bufferedInputStream.read(bytes)) != -1 ){
rw.write(bytes,0,len);
//每次写入都加入到日志线程
LogThread.DOWNLOAD_SIZE.addAndGet(len);
}
}catch (Exception e){
log.error("下载出现异常");
return false;
}finally {
httpUrlConnection.disconnect();
//每完成一段,—+1 用以日志线程判断
LogThread.DOWNLOAD_FINISH_THREAD.addAndGet(1);
}
return true;
}
}
日志线程
/**
* <p>
* 多线程下载日志记录
*/
public class LogThread implements Callable<Boolean> {
public static AtomicLong LOCAL_FINISH_SIZE = new AtomicLong();
public static AtomicLong DOWNLOAD_SIZE = new AtomicLong();
public static AtomicLong DOWNLOAD_FINISH_THREAD = new AtomicLong();
private final long httpFileContentLength;
public LogThread(long httpFileContentLength) {
this.httpFileContentLength = httpFileContentLength;
}
@Override
public Boolean call() throws Exception {
int[] downSizeArr = new int[5];
int i = 0;
double size = 0;
double mb = 1024d * 1024d;
// 文件总大小
// 文件总大小
String httpFileSize = String.format("%.2f", httpFileContentLength / mb);
while (DOWNLOAD_FINISH_THREAD.get() != MultiThreadDownServiceImpl.DOWNLOAD_THREAD_NUM) {
double downloadSize = DOWNLOAD_SIZE.get();
downSizeArr[++i % 5] = Double.valueOf(downloadSize - size).intValue();
size = downloadSize;
// 每秒速度
double fiveSecDownloadSize = Arrays.stream(downSizeArr).sum();
//每秒速度
int speed = (int) ((fiveSecDownloadSize / 1024d) / (i < 5d ? i : 5d));
// 剩余时间
double surplusSize = httpFileContentLength - downloadSize - LOCAL_FINISH_SIZE.get();
//剩余时间
String surplusTime = String.format("%.1f", surplusSize / 1024d / speed);
if (surplusTime.equals("Infinity")) {
surplusTime = "-";
}
// 已下大小
//已经下载大小
String currentFileSize = String.format("%.2f", downloadSize / mb + LOCAL_FINISH_SIZE.get() / mb);
String speedLog = String.format("> 已下载 %smb/%smb,速度 %skb/s,剩余时间 %ss", currentFileSize, httpFileSize, speed, surplusTime);
System.out.print("\r");
System.out.print(speedLog);
Thread.sleep(1000);
//将日志相关值赋值,便于前端获取
DownStatesVO cache = LogCache.cache;
cache.setCurrentFileSize(currentFileSize);
cache.setSpeed(speed);
cache.setHttpFileSize(httpFileSize);
cache.setSurplusTime(surplusTime);
}
return true;
}
}
工具类:
/**
* <p>
* 文件操作工具类
*/
public class FileUtils {
/**
* 获取文件内容长度
*
* @param name
* @return
*/
public static long getFileContentLength(String name) {
File file = new File(name);
return file.exists() && file.isFile() ? file.length() : 0;
}
}
/**
* <p>
* 网络请求操作工具类
* @author woniu
*/
@Slf4j
public class HttpUtls {
/**
* 获取 HTTP 链接
*
* @param url
* @return
* @throws IOException
*/
public static HttpURLConnection getHttpUrlConnection(String url) throws IOException {
URL httpUrl = new URL(url);
HttpURLConnection httpConnection = (HttpURLConnection)httpUrl.openConnection();
httpConnection.setRequestProperty("User-Agent", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.116 Safari/537.36");
return httpConnection;
}
/**
* 获取 HTTP 链接
*
* @param url
* @param start
* @param end
* @return
* @throws IOException
*/
public static HttpURLConnection getHttpUrlConnection(String url, long start, Long end) throws IOException {
HttpURLConnection httpUrlConnection = getHttpUrlConnection(url);
log.debug("此线程下载内容区间 {}-{}", start, end);
if (end != null) {
httpUrlConnection.setRequestProperty("RANGE", "bytes=" + start + "-" + end);
} else {
httpUrlConnection.setRequestProperty("RANGE", "bytes=" + start + "-");
}
Map<String, List<String>> headerFields = httpUrlConnection.getHeaderFields();
for (String s : headerFields.keySet()) {
log.debug("此线程相应头{}:{}", s, headerFields.get(s));
}
return httpUrlConnection;
}
/**
* 获取网络文件大小 bytes
*
* @param url
* @return
* @throws IOException
*/
public static long getHttpFileContentLength(String url) throws IOException {
HttpURLConnection httpUrlConnection = getHttpUrlConnection(url);
int contentLength = httpUrlConnection.getContentLength();
httpUrlConnection.disconnect();
return contentLength;
}
/**
* 获取网络文件 Etag
*
* @param url
* @return
* @throws IOException
*/
public static String getHttpFileEtag(String url) throws IOException {
HttpURLConnection httpUrlConnection = getHttpUrlConnection(url);
Map<String, List<String>> headerFields = httpUrlConnection.getHeaderFields();
List<String> eTagList = headerFields.get("ETag");
httpUrlConnection.disconnect();
return eTagList.get(0);
}
/**
* 获取网络文件名
*
* @param url
* @return
*/
public static String getHttpFileName(String url) {
int indexOf = url.lastIndexOf("/");
return url.substring(indexOf + 1);
}
}
获取状态实体
@Data
public class DownStatesVO {
//已经下载大小
private String currentFileSize;
// 文件总大小
private String httpFileSize;
//剩余时间
private String surplusTime;
//每秒速度
private int speed;
}
控制台打印
获取状态
参考:蜗牛高并发;
标签:下载速度,return,String,url,new,多线程,public,下载 From: https://blog.csdn.net/m0_73363097/article/details/141781173