首页 > 其他分享 >BlockingQueue读取文本内容,多线程处理数据

BlockingQueue读取文本内容,多线程处理数据

时间:2023-04-09 12:22:44浏览次数:29  
标签:String 队列 queue 线程 BlockingQueue new 多线程 读取

现在有一个txt文本,每个文本中每行的内容是:id,商品id。

要求:启动一个线程去读取文本的内容,把每行的内容通过使用BlockingQueue发送到队列里面,然后多线程,最好是10个线程,从BlockingQueue队列里面取出来,将地址作为请求参数,请求api接口,把返回的内容解析出来,把原内容id,商品id,结果集写入到新的文本里面。

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.FileWriter;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class ceshi {

    public static void main(String[] args) throws Exception {
        String inputFilePath = "/Users/flyme/data/input-100.txt"; // 输入文件路径
        String outputFilePath = "/Users/flyme/data/output-100.txt"; // 输出文件路径
        int numThreads = 100; // 线程数量

        BlockingQueue<String> queue = new LinkedBlockingQueue<>(); // 创建一个阻塞队列
        Thread readerThread = new Thread(new Reader(inputFilePath, queue)); // 创建读取线程
        readerThread.start(); // 启动读取线程

        Thread[] workerThreads = new Thread[numThreads]; // 创建工作线程
        for (int i = 0; i < numThreads; i++) {
            workerThreads[i] = new Thread(new Worker(queue, outputFilePath)); // 创建工作线程
            workerThreads[i].start(); // 启动工作线程
        }

        readerThread.join(); // 等待读取线程结束
        for (int i = 0; i < numThreads; i++) {
            workerThreads[i].join(); // 等待所有工作线程结束
        }
    }

    /**
     * 读取线程,将文件内容放到阻塞队列中。
     */
    static class Reader implements Runnable {
        private String filePath;
        private BlockingQueue<String> queue;

        public Reader(String filePath, BlockingQueue<String> queue) {
            this.filePath = filePath;
            this.queue = queue;
        }

        public void run() {
            try (BufferedReader reader = new BufferedReader(new FileReader(filePath))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    queue.put(line); // 将每行内容放入队列
                    System.out.println(Thread.currentThread().getName() + ": 发送到队列中:" + line);
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                try {
                    queue.put("EOF"); // 放置一个结束标记
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    /**
     * 工作线程,从阻塞队列中读取内容并处理,将结果写入到文件中。
     */
    static class Worker implements Runnable {
        private BlockingQueue<String> queue;
        private String outputFilePath;

        public Worker(BlockingQueue<String> queue, String outputFilePath) {
            this.queue = queue;
            this.outputFilePath = outputFilePath;
        }

        public void run() {
            try (FileWriter writer = new FileWriter(outputFilePath, true)) { // 文件追加写入
                String line;
                while (true) {
                    line = queue.poll(1, TimeUnit.SECONDS); // 从队列中取出一条记录,等待1秒钟
                    if (line != null) {
                        if (line.equals("EOF")) { // 如果遇到结束标记,则退出循环
                            break;
                        }
                        String[] fields = line.split(",");
                        String id = fields[0];
                        String address = fields[1];
                        String result = callAPI(address); // 调用API接口,获取结果集
                        String[] coords = result.split(",");
                        writer.write(String.format("%s,%s,%s,%s\n", id, address, coords[0], coords[1])); // 将结果写入输出文件
                    } else {
                        if (isQueueReadyToTerminate(queue)) { // 判断队列是否准备结束
                            break;
                        } else {
                            Thread.sleep(100); // 防止CPU占用率过高
                        }
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        /**
         * 模拟API接口调用,返回结果数据。
         */
        private String callAPI(String address) {
            return "a1,a2";
        }

        /**
         * 判断队列是否已准备结束。如果队列为空或者只剩下"EOF"标记,则说明队列已准备结束。
         */
        private boolean isQueueReadyToTerminate(BlockingQueue<String> queue) {
            return queue.isEmpty() || (queue.size() == 1 && queue.contains("EOF"));
        }
    }
}
  • 从队列中取出一条记录,为什么要等待1秒钟

等待1秒钟是为了防止工作线程无限制地轮询队列,这样会让CPU占用率很高,影响系统的性能。在poll方法中加入等待时间,可以让线程在没有任何任务时进入休眠状态,降低CPU的占用率,提高系统的响应速度。

在代码中,工作线程使用了queue.poll(1, TimeUnit.SECONDS)方法来从队列中取出数据,这个方法会等待1秒钟,如果在等待期间未能从队列中成功取出数据,则方法会返回null,线程就可以进行下一个判断了。如果队列中有新的数据,则可以直接读取,不需要等待。如果队列为空,线程也不会继续执行,并且在等待1秒钟后,会再次尝试读取队列中的数据,避免过度消耗系统资源。

标签:String,队列,queue,线程,BlockingQueue,new,多线程,读取
From: https://www.cnblogs.com/flyme6/p/17300122.html

相关文章

  • 12.保存和读取XML和YMAL文件
    除了图像数据之外,有时程序中的尺寸较小的Mat类矩阵、字符串、数组等数据也需要进行保存,这些数据通常保存成XML文件或者YAML文件。本小节中将介绍如何利用OpenCV4中的函数将数据保存成XML文件或者YAML文件以及如何读取这两种文件中的数据。XML是一种元标记语言,所谓元标记就是......
  • 11.视频读取与保存
    1、视频数据读取虽然视频文件是由多张图片组成的,但是imread()函数并不能直接读取视频文件,需要由专门的视频读取函数进行视频读取,并将每一帧图像保存到Mat类矩阵中,代码清单2-27中给出了VideoCapture类在读取视频文件时的构造方式。代码清单2-27读取视频文件VideoCapture类构造......
  • Java SpringBoot Test 单元测试中包括多线程时,没跑完就结束了
    如何阻止JavaSpringBootTest单元测试中包括多线程时,没跑完就结束了使用CountDownLatchCountDownLatch、CyclicBarrier使用区别多线程ThreadPoolTaskExecutor应用JavaBasePooledObjectFactory对象池化技术@SpringBootTestpublicclassPoolTest{@Testvoid......
  • 10.图像的读取与显示
    1、图像读取函数imread()代码清单2-24imread()函数的原型cv::Matcv::imread(constString&filename,intflags=IMREAD_COLOR)filename:需要读取图像的文件名称,包含图像地址、名称和图像文件扩展名flags:读取图像形式的标志,如将彩色图像按......
  • 任意文件读取漏洞
    任意文件读取漏洞漏洞概念及成因任意文件读取漏洞(ArbitraryFileReadVulnerability)是指攻击者可以通过web应用程序读取任意文件而不受访问控制限制的漏洞。这种漏洞可能导致敏感信息泄露、系统崩溃等问题。攻击者可以利用任意文件读取漏洞访问服务器上的任意文件,包括密码文件......
  • C++多核多线程同步实现
    使用MakefileC++11工程模拟dsp的多核同步逻辑,使用多线程模拟多核,多个线程通过C++11的条件变量实现同步。当某一线程执行到同步函数syn_func时,判断是否其他线程执行到此处了,若有其他线程没有执行到此处,本线程就应阻塞。当最后一个线程执行到同步函数时,通知所有线程解除阻塞,实现......
  • Qt-FFmpeg开发-回调函数读取数据(8)
    音视频/FFmpeg#QtQt-FFmpeg开发-使libavformat解复用器通过自定义AVIOContext读取回调访问媒体内容目录音视频/FFmpeg#QtQt-FFmpeg开发-使libavformat解复用器通过自定义AVIOContext读取回调访问媒体内容1、概述2、实现效果3、主要代码4、完整源代码更多精彩内容......
  • 3、XmlBeanFactory 对xml文件读取
    全局目录.md引子1、容器最基本使用.md系列1-bean标签解析:2、XmlBeanFactory的类图介绍.md3、XmlBeanFactory对xml文件读取.md4、xml配置文件解析之【默认】命名空间【标签】的解析.md5、xml配置文件解析之【自定义】命名空间【标签】的解析.md系列2-bean获取:get......
  • Modbus功能码的学习与实验 01 读取线圈状态
    01读取线圈状态    查询帧  应答帧 ......
  • 利用ExcelJS读取Excel文件
    参考文档https://blog.csdn.net/qq_41374651/article/details/115663014https://blog.csdn.net/qiuliaiali123/article/details/104392662https://www.jianshu.com/p/fc96e22c4df1最终效果代码<template><divclass="test"><inputtype="......