首页 > 编程语言 >java使用nio多线程读文件

java使用nio多线程读文件

时间:2023-02-28 10:03:09浏览次数:56  
标签:java nio int newStrByte new byte 多线程 public String


模拟多线程nio读取文件,并输出,output方法自己补一下。

ReadFile代码:

public class ReadFile extends Observable {

private int bufSize = 1024;
// 换行符
private byte key = "\n".getBytes()[0];
// 当前行数
private long lineNum = 0;
// 文件编码,默认为gb2312
private String encode = "gb2312";
// 具体业务逻辑监听器
private ReaderFileListener readerListener;

public void setEncode(String encode) {
this.encode = encode;
}

public void setReaderListener(ReaderFileListener readerListener) {
this.readerListener = readerListener;
}

/**
* 获取准确开始位置
* @param file
* @param position
* @return
* @throws Exception
*/
public long getStartNum(File file, long position) throws Exception {
long startNum = position;
FileChannel fcin = new RandomAccessFile(file, "r").getChannel();
fcin.position(position);
try {
int cache = 1024;
ByteBuffer rBuffer = ByteBuffer.allocate(cache);
// 每次读取的内容
byte[] bs = new byte[cache];
// 缓存
byte[] tempBs = new byte[0];
String line = "";
while (fcin.read(rBuffer) != -1) {
int rSize = rBuffer.position();
rBuffer.rewind();
rBuffer.get(bs);
rBuffer.clear();
byte[] newStrByte = bs;
// 如果发现有上次未读完的缓存,则将它加到当前读取的内容前面
if (null != tempBs) {
int tL = tempBs.length;
newStrByte = new byte[rSize + tL];
System.arraycopy(tempBs, 0, newStrByte, 0, tL);
System.arraycopy(bs, 0, newStrByte, tL, rSize);
}
// 获取开始位置之后的第一个换行符
int endIndex = indexOf(newStrByte, 0);
if (endIndex != -1) {
return startNum + endIndex;
}
tempBs = substring(newStrByte, 0, newStrByte.length);
startNum += 1024;
}
} catch (Exception e) {
e.printStackTrace();
} finally {
fcin.close();
}
return position;
}

/**
* 从设置的开始位置读取文件,一直到结束为止。如果 end设置为负数,刚读取到文件末尾
* @param fullPath
* @param start
* @param end
* @throws Exception
*/
public void readFileByLine(String fullPath, long start, long end) throws Exception {
File fin = new File(fullPath);
if (fin.exists()) {
FileChannel fcin = new RandomAccessFile(fin, "r").getChannel();
fcin.position(start);
try {
ByteBuffer rBuffer = ByteBuffer.allocate(bufSize);
// 每次读取的内容
byte[] bs = new byte[bufSize];
// 缓存
byte[] tempBs = new byte[0];
String line = "";
// 当前读取文件位置
long nowCur = start;
while (fcin.read(rBuffer) != -1) {
nowCur += bufSize;

int rSize = rBuffer.position();
rBuffer.rewind();
rBuffer.get(bs);
rBuffer.clear();
byte[] newStrByte = bs;
// 如果发现有上次未读完的缓存,则将它加到当前读取的内容前面
if (null != tempBs) {
int tL = tempBs.length;
newStrByte = new byte[rSize + tL];
System.arraycopy(tempBs, 0, newStrByte, 0, tL);
System.arraycopy(bs, 0, newStrByte, tL, rSize);
}
// 是否已经读到最后一位
boolean isEnd = false;
// 如果当前读取的位数已经比设置的结束位置大的时候,将读取的内容截取到设置的结束位置
if (end > 0 && nowCur > end) {
// 缓存长度 - 当前已经读取位数 - 最后位数
int l = newStrByte.length - (int) (nowCur - end);
newStrByte = substring(newStrByte, 0, l);
isEnd = true;
}
int fromIndex = 0;
int endIndex = 0;
// 每次读一行内容,以 key(默认为\n) 作为结束符
while ((endIndex = indexOf(newStrByte, fromIndex)) != -1) {
byte[] bLine = substring(newStrByte, fromIndex, endIndex);
line = new String(bLine, 0, bLine.length, encode);
lineNum++;
// 输出一行内容,处理方式由调用方提供
readerListener.outLine(line.trim(), lineNum, false);
fromIndex = endIndex + 1;
}
// 将未读取完成的内容放到缓存中
tempBs = substring(newStrByte, fromIndex, newStrByte.length);
if (isEnd) {
break;
}
}
// 将剩下的最后内容作为一行,输出,并指明这是最后一行
String lineStr = new String(tempBs, 0, tempBs.length, encode);
readerListener.outLine(lineStr.trim(), lineNum, true);
} catch (Exception e) {
e.printStackTrace();
} finally {
fcin.close();
}

} else {
throw new FileNotFoundException("没有找到文件:" + fullPath);
}
// 通知观察者,当前工作已经完成
setChanged();
notifyObservers(start+"-"+end);
}

/**
* 查找一个byte[]从指定位置之后的一个换行符位置
*
* @param src
* @param fromIndex
* @return
* @throws Exception
*/
private int indexOf(byte[] src, int fromIndex) throws Exception {

for (int i = fromIndex; i < src.length; i++) {
if (src[i] == key) {
return i;
}
}
return -1;
}

/**
* 从指定开始位置读取一个byte[]直到指定结束位置为止生成一个全新的byte[]
*
* @param src
* @param fromIndex
* @param endIndex
* @return
* @throws Exception
*/
private byte[] substring(byte[] src, int fromIndex, int endIndex) throws Exception {
int size = endIndex - fromIndex;
byte[] ret = new byte[size];
System.arraycopy(src, fromIndex, ret, 0, size);
return ret;
}

}

ReadFileThread代码:

public class ReadFileThread extends Thread {

private ReaderFileListener processPoiDataListeners;
private String filePath;
private long start;
private long end;

public ReadFileThread(ReaderFileListener processPoiDataListeners,long start,long end,String file) {
this.setName(this.getName()+"-ReadFileThread");
this.start = start;
this.end = end;
this.filePath = file;
this.processPoiDataListeners = processPoiDataListeners;
}

@Override
public void run() {
ReadFile readFile = new ReadFile();
readFile.setReaderListener(processPoiDataListeners);
readFile.setEncode(processPoiDataListeners.getEncode());
// readFile.addObserver();
try {
readFile.readFileByLine(filePath, start, end + 1);
} catch (Exception e) {
e.printStackTrace();
}
}
}

ReaderFileListener代码:

public abstract class ReaderFileListener {

// 一次读取行数,默认为500
private int readColNum = 500;

private String encode;

private List<String> list = new ArrayList<String>();

/**
* 设置一次读取行数
* @param readColNum
*/
protected void setReadColNum(int readColNum) {
this.readColNum = readColNum;
}

public String getEncode() {
return encode;
}

public void setEncode(String encode) {
this.encode = encode;
}

/**
* 每读取到一行数据,添加到缓存中
* @param lineStr 读取到的数据
* @param lineNum 行号
* @param over 是否读取完成
* @throws Exception
*/
public void outLine(String lineStr, long lineNum, boolean over) throws Exception {
if(null != lineStr)
list.add(lineStr);
if (!over && (lineNum % readColNum == 0)) {
output(list);
list.clear();
} else if (over) {
output(list);
list.clear();
}
}

/**
* 批量输出
*
* @param stringList
* @throws Exception
*/
public abstract void output(List<String> stringList) throws Exception;

}

ProcessDataByPostgisListeners代码:
很多网上教程没有这个类,手动补下。

/*
* 很多网上资料没有这个实现类
* 手动加了个,其实就是初始化父类传个encode
*/
public class ProcessDataByPostgisListeners extends ReaderFileListener{
public ProcessDataByPostgisListeners(String encode) {
super.setEncode(encode);
}

@Override
public void output(List<String> stringList) throws Exception {
// 这个方法记得写 要不nio没有输出
}
}

BuildData代码:

public class BuildData {
public static void main(String[] args) throws Exception {

String userHome = System.getProperties().getProperty("user.home"); // 用户目录,如:C:\Users\chushiyun
String fileName = userHome+"/1G.mp4"; // 文件路径
File file = new File(fileName);
FileInputStream fis = null;
try {
ReadFile readFile = new ReadFile();
fis = new FileInputStream(file);
int available = fis.available();
int maxThreadNum = 50;
// 线程粗略开始位置
int i = available / maxThreadNum;
for (int j = 0; j < maxThreadNum; j++) {
// 计算精确开始位置
long startNum = j == 0 ? 0 : readFile.getStartNum(file, i * j);
long endNum = j + 1 < maxThreadNum ? readFile.getStartNum(file, i * (j + 1)) : -2;
// 具体监听实现
ProcessDataByPostgisListeners listeners = new ProcessDataByPostgisListeners("GBK");
new ReadFileThread(listeners, startNum, endNum, file.getPath()).start();
}
} catch (IOException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}
}
}

github地址:
​​​https://github.com/1054294965/git-nio​



标签:java,nio,int,newStrByte,new,byte,多线程,public,String
From: https://blog.51cto.com/u_7341513/6090172

相关文章

  • java-jdbc
    0、简介Java数据库连接,(JavaDatabaseConnectivity,简称JDBC)是Java语言中用来规范客户端程序如何来访问数据库的应用程序接口,提供了诸如查询和更新数据库中数据的方法。J......
  • java网络编程-客户端和服务器
    基于java.net包,实现一个简单的服务端和客户端,客户端只管发,服务端只管收缺点:服务端只能处理一个客户端的请求,因为服务端是单线程的。一次只能与一个客户端进行消息通信服......
  • Failed to start bean ‘documentationPluginsBootstrapper’; nested exception is j
    Youmightencounterthe“Failedtostartbean'documentationPluginsBootstrapper';nestedexceptionisjava.lang.NullPointerException”errorwhileupgradingS......
  • JavaFX 学习记录
    使用JavaFX时一些奇怪的问题继承自Application类的构造函数会被执行两次先看代码://FXTestMain.javaimportjavafx.application.Application;importjavafx.stage......
  • java中&的使用
    &是位于运算当它的左右是两个int类型数时,要将它们转化为二进制进行位于运算(即将两个二进制数上的每一位进行且运算)例如:4&3即(100&101)结果为:100......
  • python 如何实现多线程
    今天本来打算学习学习多进程的,但是由于我现在的电脑没有Linux系统,无法通过Linux系统编辑一些多进程的程序,因此我打算从多线程入手。多线程我们的程序一般都是多任务的,如......
  • 吐血整理!2万字Java基础面试题(带答案)请收好!
    熬夜整理了这么多年来的Java基础面试题,欢迎学习收藏,手机上可以点击这里,效果更佳https://mp.weixin.qq.com/s/ncbEQqQdJo0UaogQSgA0bQ1.1Hashmap与concurrentHashMap......
  • java正则匹配demo
    java正则匹配实现1.问题描述根据指定的字段名限制条件,提取出sql语句中的对应字段名并返回。字段名限制条件如下:必须以${开头,}结尾;中间只能包含字母、数字和下划......
  • javaSE学习二
    使用Scanner实现用户交互   注意点:使用next方法时一定读取到有效字符后才能结束输入,有效字符前的空白自动去除,有效字符后的空白为结束符,next不能得到有空格的字符串......
  • Java语言概述
    Java概述是SUN(StanfordUniversityNetwork,斯坦福大学网络公司)1995年推出的一门高级编程语言。是一种面向Internet的编程语言。Java一开始富有吸引力是因为Java程序......