首页 > 其他分享 >BlockingQueue读取文本内容,多线程处理数据(线程池版本)

BlockingQueue读取文本内容,多线程处理数据(线程池版本)

时间:2023-04-09 12:46:27浏览次数:39  
标签:String queue 线程 new line 多线程 public BlockingQueue

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.util.concurrent.*;

public class ceshi2 {

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(100);
        ExecutorService executor = Executors.newFixedThreadPool(10); // 创建一个固定大小的线程池,最多同时执行10个线程

        // 将读取文件任务提交给线程池
        Future<?> readerFuture = executor.submit(new ReaderTask(queue, "/Users/flyme/data/input-50w-ex.txt"));

        // 创建10个处理任务,将它们都提交给线程池
        Future<?>[] workerFutures = new Future[10];
        for (int i = 0; i < 10; i++) {
            workerFutures[i] = executor.submit(new WorkerTask(queue));
        }

        // 等待读取任务完成
        readerFuture.get();

        // 等待处理任务完成
        for (Future<?> workerFuture : workerFutures) {
            workerFuture.get();
        }

        // 关闭线程池
        executor.shutdown();

        System.out.println("所有任务完成");
    }
}

class ReaderTask implements Runnable {
    private LinkedBlockingQueue<String> queue;
    private String filePath;

    public ReaderTask(LinkedBlockingQueue<String> queue, String filePath) {
        this.queue = queue;
        this.filePath = filePath;
    }

    @Override
    public void run() {
        try (BufferedReader reader = new BufferedReader(new FileReader(filePath))) {
            String line;
            while ((line = reader.readLine()) != null) {
                queue.put(line); // 将每行内容放入队列
//                System.out.println(Thread.currentThread().getName() + ": 发送到队列中:" + line);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                queue.put("EOF"); // 放置一个结束标记
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

class WorkerTask implements Runnable {
    private LinkedBlockingQueue<String> queue;

    public WorkerTask(LinkedBlockingQueue<String> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            String line;
            while ((line = queue.poll(1, TimeUnit.SECONDS)) != null) { // 从队列中取出一行数据
                if ("EOF".equals(line)) {
                    queue.put(line); // 将结束标记放回队列
                    break;
                }

                System.out.println(Thread.currentThread().getName() + ": 队列中取到:" + line);

                String[] parts = line.split(",");
                String id = parts[0];
                String address = parts[1];

                // 向API接口发送请求,并获取返回结果
                String result = sendRequest(address);

                // 解析返回结果,提取商品信息
                String[] coordinates = parseCoordinates(result);

                // 拼接id、商品id、商品信息成一行记录
                String record = id + "," + address + "," + coordinates[0] + "," + coordinates[1];

                // 将记录写入输出文件
                writeRecordToFile(record);
            }
        } catch (InterruptedException | IOException e) {
            e.printStackTrace();
        }
    }

    private String sendRequest(String address) {
        // 发送http请求代码省略
        return "名称,销量"; // 模拟返回结果
    }

    private String[] parseCoordinates(String result) {
        // 解析商品信息代码省略
        return new String[]{"名称", "销量"}; // 模拟解析结果
    }

    private void writeRecordToFile(String record) throws IOException {
        try (FileWriter writer = new FileWriter("/Users/flyme/data/output-50w-ex.txt", true)) { // 追加模式写入
            writer.write(record + System.lineSeparator());
        }
    }
}

标签:String,queue,线程,new,line,多线程,public,BlockingQueue
From: https://www.cnblogs.com/flyme6/p/17300144.html

相关文章

  • BlockingQueue读取文本内容,多线程处理数据
    现在有一个txt文本,每个文本中每行的内容是:id,商品id。要求:启动一个线程去读取文本的内容,把每行的内容通过使用BlockingQueue发送到队列里面,然后多线程,最好是10个线程,从BlockingQueue队列里面取出来,将地址作为请求参数,请求api接口,把返回的内容解析出来,把原内容id,商品id,结果集......
  • Java SpringBoot Test 单元测试中包括多线程时,没跑完就结束了
    如何阻止JavaSpringBootTest单元测试中包括多线程时,没跑完就结束了使用CountDownLatchCountDownLatch、CyclicBarrier使用区别多线程ThreadPoolTaskExecutor应用JavaBasePooledObjectFactory对象池化技术@SpringBootTestpublicclassPoolTest{@Testvoid......
  • 进程、线程
    1、进程、线程区别进程是以资源分配的基本单位,线程是CPU调度的基本单位。进程有自己独立的地址空间,线程属于进程,没有独立的地址空间。进程上下文切换大、线程上下文切换小。2、进程、线程分类进程分类:(1)、按进程特点:交互式进程:由shell终端启动的进程,常与用户交互。可位......
  • C++多核多线程同步实现
    使用MakefileC++11工程模拟dsp的多核同步逻辑,使用多线程模拟多核,多个线程通过C++11的条件变量实现同步。当某一线程执行到同步函数syn_func时,判断是否其他线程执行到此处了,若有其他线程没有执行到此处,本线程就应阻塞。当最后一个线程执行到同步函数时,通知所有线程解除阻塞,实现......
  • C++ 并发编程实战 第二章 线程管控
    第二章线程管控std::thread简介构造和析构函数///默认构造///创建一个线程,什么也不做thread()noexcept;///带参构造///创建一个线程,以A为参数执行F函数template<classFn,class...Args>explicitthread(Fn&&F,Args&&...A);///拷贝构造(不可用)thread......
  • 自定义线程池详解
    自定义线程池ThreadPoolExecutorexecutor=newThreadPoolExecutor(5,10,200,TimeUnit.MILLISECONDS, newArrayBlockingQueue<Runnable>(5));第一个参数:核心线程池大小,默认创建后就不会销毁,需要设置allowCoreThreadTimeOut为true时会销毁第二个参数:线程池最大大......
  • 面试题百日百刷-HBase中HTable API有没有线程安全问题,在程序是单例还是多例?
    锁屏面试题百日百刷,每个工作日坚持更新面试题。请看到最后就能获取你想要的,接下来的是今日的面试题: 1.HBase内部机制是什么?Hbase是一个能适应联机业务的数据库系统物理存储:hbase的持久化数据是将数据存储在HDFS上。存储管理:一个表是划分为很多region的,这些region分布式地......
  • 在android的fragment中使用子线程查询的数据如何实时更新在主界面的listview中
    主要是使用handler来对ui界面进行实时更新 public class YourFragment extends Fragment {    private ListView mListView;    private YourAdapter mAdapter = new YourAdapter(getContext(), new ArrayList<YourData>());//注意这一步的初始化如果闪......
  • 协程 goroutine,线程,进程,GPM,的介绍
    前言:进程,线程,协程,并发,并行介绍正文:线程,进程介绍:1.线程是程序执行的最小单位,而进程是操作系统分配资源的最小单位;2.一个进程由一个或多个线程组成,线程是一个进程中代码的不同执行路线3.进程之间相互独立,但同一进程下的各个线程之间共享程序的内存空间4.调度和切换:线程上......
  • IO流中「线程」模型总结
    IO流模块:经常看、经常用、经常忘;一、基础简介在IO流的网络模型中,以常见的「客户端-服务端」交互场景为例;客户端与服务端进行通信「交互」,可能是同步或者异步,服务端进行「流」处理时,可能是阻塞或者非阻塞模式,当然也有自定义的业务流程需要执行,从处理逻辑看就是「读取数据-业务......