模拟多线程nio读取文件,并输出,output方法自己补一下。
ReadFile代码:
public class ReadFile extends Observable {
private int bufSize = 1024;
// 换行符
private byte key = "\n".getBytes()[0];
// 当前行数
private long lineNum = 0;
// 文件编码,默认为gb2312
private String encode = "gb2312";
// 具体业务逻辑监听器
private ReaderFileListener readerListener;
public void setEncode(String encode) {
this.encode = encode;
}
public void setReaderListener(ReaderFileListener readerListener) {
this.readerListener = readerListener;
}
/**
* 获取准确开始位置
* @param file
* @param position
* @return
* @throws Exception
*/
public long getStartNum(File file, long position) throws Exception {
long startNum = position;
FileChannel fcin = new RandomAccessFile(file, "r").getChannel();
fcin.position(position);
try {
int cache = 1024;
ByteBuffer rBuffer = ByteBuffer.allocate(cache);
// 每次读取的内容
byte[] bs = new byte[cache];
// 缓存
byte[] tempBs = new byte[0];
String line = "";
while (fcin.read(rBuffer) != -1) {
int rSize = rBuffer.position();
rBuffer.rewind();
rBuffer.get(bs);
rBuffer.clear();
byte[] newStrByte = bs;
// 如果发现有上次未读完的缓存,则将它加到当前读取的内容前面
if (null != tempBs) {
int tL = tempBs.length;
newStrByte = new byte[rSize + tL];
System.arraycopy(tempBs, 0, newStrByte, 0, tL);
System.arraycopy(bs, 0, newStrByte, tL, rSize);
}
// 获取开始位置之后的第一个换行符
int endIndex = indexOf(newStrByte, 0);
if (endIndex != -1) {
return startNum + endIndex;
}
tempBs = substring(newStrByte, 0, newStrByte.length);
startNum += 1024;
}
} catch (Exception e) {
e.printStackTrace();
} finally {
fcin.close();
}
return position;
}
/**
* 从设置的开始位置读取文件,一直到结束为止。如果 end设置为负数,刚读取到文件末尾
* @param fullPath
* @param start
* @param end
* @throws Exception
*/
public void readFileByLine(String fullPath, long start, long end) throws Exception {
File fin = new File(fullPath);
if (fin.exists()) {
FileChannel fcin = new RandomAccessFile(fin, "r").getChannel();
fcin.position(start);
try {
ByteBuffer rBuffer = ByteBuffer.allocate(bufSize);
// 每次读取的内容
byte[] bs = new byte[bufSize];
// 缓存
byte[] tempBs = new byte[0];
String line = "";
// 当前读取文件位置
long nowCur = start;
while (fcin.read(rBuffer) != -1) {
nowCur += bufSize;
int rSize = rBuffer.position();
rBuffer.rewind();
rBuffer.get(bs);
rBuffer.clear();
byte[] newStrByte = bs;
// 如果发现有上次未读完的缓存,则将它加到当前读取的内容前面
if (null != tempBs) {
int tL = tempBs.length;
newStrByte = new byte[rSize + tL];
System.arraycopy(tempBs, 0, newStrByte, 0, tL);
System.arraycopy(bs, 0, newStrByte, tL, rSize);
}
// 是否已经读到最后一位
boolean isEnd = false;
// 如果当前读取的位数已经比设置的结束位置大的时候,将读取的内容截取到设置的结束位置
if (end > 0 && nowCur > end) {
// 缓存长度 - 当前已经读取位数 - 最后位数
int l = newStrByte.length - (int) (nowCur - end);
newStrByte = substring(newStrByte, 0, l);
isEnd = true;
}
int fromIndex = 0;
int endIndex = 0;
// 每次读一行内容,以 key(默认为\n) 作为结束符
while ((endIndex = indexOf(newStrByte, fromIndex)) != -1) {
byte[] bLine = substring(newStrByte, fromIndex, endIndex);
line = new String(bLine, 0, bLine.length, encode);
lineNum++;
// 输出一行内容,处理方式由调用方提供
readerListener.outLine(line.trim(), lineNum, false);
fromIndex = endIndex + 1;
}
// 将未读取完成的内容放到缓存中
tempBs = substring(newStrByte, fromIndex, newStrByte.length);
if (isEnd) {
break;
}
}
// 将剩下的最后内容作为一行,输出,并指明这是最后一行
String lineStr = new String(tempBs, 0, tempBs.length, encode);
readerListener.outLine(lineStr.trim(), lineNum, true);
} catch (Exception e) {
e.printStackTrace();
} finally {
fcin.close();
}
} else {
throw new FileNotFoundException("没有找到文件:" + fullPath);
}
// 通知观察者,当前工作已经完成
setChanged();
notifyObservers(start+"-"+end);
}
/**
* 查找一个byte[]从指定位置之后的一个换行符位置
*
* @param src
* @param fromIndex
* @return
* @throws Exception
*/
private int indexOf(byte[] src, int fromIndex) throws Exception {
for (int i = fromIndex; i < src.length; i++) {
if (src[i] == key) {
return i;
}
}
return -1;
}
/**
* 从指定开始位置读取一个byte[]直到指定结束位置为止生成一个全新的byte[]
*
* @param src
* @param fromIndex
* @param endIndex
* @return
* @throws Exception
*/
private byte[] substring(byte[] src, int fromIndex, int endIndex) throws Exception {
int size = endIndex - fromIndex;
byte[] ret = new byte[size];
System.arraycopy(src, fromIndex, ret, 0, size);
return ret;
}
}
ReadFileThread代码:
public class ReadFileThread extends Thread {
private ReaderFileListener processPoiDataListeners;
private String filePath;
private long start;
private long end;
public ReadFileThread(ReaderFileListener processPoiDataListeners,long start,long end,String file) {
this.setName(this.getName()+"-ReadFileThread");
this.start = start;
this.end = end;
this.filePath = file;
this.processPoiDataListeners = processPoiDataListeners;
}
@Override
public void run() {
ReadFile readFile = new ReadFile();
readFile.setReaderListener(processPoiDataListeners);
readFile.setEncode(processPoiDataListeners.getEncode());
// readFile.addObserver();
try {
readFile.readFileByLine(filePath, start, end + 1);
} catch (Exception e) {
e.printStackTrace();
}
}
}
ReaderFileListener代码:
public abstract class ReaderFileListener {
// 一次读取行数,默认为500
private int readColNum = 500;
private String encode;
private List<String> list = new ArrayList<String>();
/**
* 设置一次读取行数
* @param readColNum
*/
protected void setReadColNum(int readColNum) {
this.readColNum = readColNum;
}
public String getEncode() {
return encode;
}
public void setEncode(String encode) {
this.encode = encode;
}
/**
* 每读取到一行数据,添加到缓存中
* @param lineStr 读取到的数据
* @param lineNum 行号
* @param over 是否读取完成
* @throws Exception
*/
public void outLine(String lineStr, long lineNum, boolean over) throws Exception {
if(null != lineStr)
list.add(lineStr);
if (!over && (lineNum % readColNum == 0)) {
output(list);
list.clear();
} else if (over) {
output(list);
list.clear();
}
}
/**
* 批量输出
*
* @param stringList
* @throws Exception
*/
public abstract void output(List<String> stringList) throws Exception;
}
ProcessDataByPostgisListeners代码:
很多网上教程没有这个类,手动补下。
/*
* 很多网上资料没有这个实现类
* 手动加了个,其实就是初始化父类传个encode
*/
public class ProcessDataByPostgisListeners extends ReaderFileListener{
public ProcessDataByPostgisListeners(String encode) {
super.setEncode(encode);
}
@Override
public void output(List<String> stringList) throws Exception {
// 这个方法记得写 要不nio没有输出
}
}
BuildData代码:
public class BuildData {
public static void main(String[] args) throws Exception {
String userHome = System.getProperties().getProperty("user.home"); // 用户目录,如:C:\Users\chushiyun
String fileName = userHome+"/1G.mp4"; // 文件路径
File file = new File(fileName);
FileInputStream fis = null;
try {
ReadFile readFile = new ReadFile();
fis = new FileInputStream(file);
int available = fis.available();
int maxThreadNum = 50;
// 线程粗略开始位置
int i = available / maxThreadNum;
for (int j = 0; j < maxThreadNum; j++) {
// 计算精确开始位置
long startNum = j == 0 ? 0 : readFile.getStartNum(file, i * j);
long endNum = j + 1 < maxThreadNum ? readFile.getStartNum(file, i * (j + 1)) : -2;
// 具体监听实现
ProcessDataByPostgisListeners listeners = new ProcessDataByPostgisListeners("GBK");
new ReadFileThread(listeners, startNum, endNum, file.getPath()).start();
}
} catch (IOException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}
}
}
github地址:
https://github.com/1054294965/git-nio