首页 > 编程语言 >Java并发

Java并发

时间:2023-10-22 20:16:00浏览次数:56  
标签:Java Thread 队列 并发 任务 线程 new 执行

Java并发基础

并发:多个任务在同一时间段交替执行

并行:多个任务在同一时刻同时执行

Java线程

创建方式

继承Thread和实现接口

继承Thread类并重写run,之后调用start方法启动线程注意:调用run只是普通的方法调用,不会新开线程。例如如下匿名类继承Thread。

   Thread thread = new Thread() {
        @Override
        public void run() {
            // 线程执行的逻辑
            System.out.println("Thread is running");
        }
    };
	thread.start(); // 启动线程

这种方法局限在于不能再次调用start再次执行该方法。 // TODO why,start的原理?

实现Runnable或者实现Callable接口

两者区别:前者没有返回值,

后者可以使用FutureTask<>包裹Callable,然后传递给Thread执行或者Executor框架中的submit()方法提交Callable任务,并返回一个Future对象,通过该对象获得返回值。

实现Runnable例子,注意和上面的区别,是把匿名实现类当参数传给Thread。如果想多次执行该方法,再创建一个Thread传进去,调用start即可。 // TODO,一个线程能不能多次执行我给的方法?

        Thread thread = new Thread(() -> {
            Thread.sleep(10);
            IntStream.range(0, 10).forEach(System.out::println);
        });
        thread.start();

原理:Thread的start方法会调用run,内部如下,targetRunnable类型的

    @Override
    public void run() {
        if (target != null) {
            target.run();
        }
    }

实现Callable

  • 使用FutureTaskThread
        FutureTask<String> task = new FutureTask<>(new Callable<String>() {
            @Override
            public String call() throws Exception {
                Thread.sleep(2000);
                return "HEllo";
            }
        });
        new Thread(task).start();
        System.out.println(task.get());
  • 使用FutureExecutor线程池。把Callable对象提交给线程池后,会返回一个Future,调用get时如果没有执行完,当前线程会阻塞,和FutureTask一样。如果执行时间短,最好用while(!task.isDone())来忙等待,这样可以避免发生阻塞,省去多余的线程上下文切换时间。
        // SingleThreadExecutor仅有的一个线程来执行任务
		ExecutorService executorService = Executors.newSingleThreadExecutor();
        Future<String> task = executorService.submit(new Callable<String>() {
            @Override
            public String call() throws InterruptedException {
                Thread.sleep(500);
                return "HEllo";
            }
        });
        System.out.println(task.get());
		executorService.shutdown();//关闭线程池

用线程池创建

创建例子上面给出过了,实现接口加Future接收结果(如果有)。有5钟常见线程池,后面讲。

如果任务抛出过异常,Future.get()时候也会抛出异常。

submit()方法用于提交带返回值的任务,通过Future对象可以获取任务的执行结果;execute()方法用于提交不带返回值的任务,有异常也只能在任务里面做好捕获处理。

线程池中调用这两个方法之后,内部会将传入的Runnable(我们的任务),封装成一个Worker对象放入Hashset.详细见[后文](# 复用线程的原理)

LifeCycle和基本方法

这两个放一起是因为生命周期有很多地方的转换是因为调用了某些线程方法。

PicSrc

image-20231020090711823

:图中几点说明

  • Waiting和TimedWaiting区别:后者有最长时间,时间到线程会被唤醒,前者不会
  • 进入Blocked的机会一般只有等待锁的时候
  • 两个wait状态调用notify之后有两个箭头,一个时runnable,一个blocked,大概是因为被唤醒后又进入了等锁状态。这两条线似乎是多余的。

常用方法

Object的wait notify

调用该对象的wait释放该对象的锁,notifynotifyAll唤醒所有等待该锁的线程

注:获取锁使用sychronized

Thread的interrupt stop

interrupt: 其他线程调用该线程的此方法,中断标志位置为true,若该线程处于阻塞或者等待状态,会抛出InterruptedException,可以自行捕获异常处理,若处于正常运行状态,不会抛出异常。可见,此方法不会直接导致线程退出,接收到信号之后是否退出取决于线程中的逻辑,也可以不退出继续执行。线程可以使用Thread.interruptd重置标记位为false.

注意:有的阻塞状态不会响应中断,比如synchronized

stop:强制中断,不建议使用。因为可能当前线程有重要任务或者资源在使用。

其他

  • sleep
  • setDaemon
  • join

线程池

概念 ref

  • 核心线程数(Core Pool Size):表示核心线程的数量。所谓核心线程,是在"大多数情况下(未开启allowCoreThreadTimeOut参数)"都不会被销毁的线程(即使它们一直处于空闲状态);
  • 最大线程数(Max Pool Size):当任务堆积,且池中没有空闲的核心线程来处理任务时,线程池会创建一些临时的线程来处理堆积的任务。这些临时的线程会在空闲一定时间后被销毁掉, 最大的临时线程数量为MaxPoolSize - CorePoolSize;
  • 存活时间(Keep Alive Time):分为具体的数值和单位,如60s。表示临时线程空闲存活时间(在开启allowCoreThreadTimeOut参数后也表示核心线程的空闲存活时间)。 当临时线程空闲时间超过该值之后,就会被销毁掉;
  • 工作队列(Work Queue):存放挤压的任务;
  • 拒绝策略(Reject Handler):当线程池的线程数量已经达到上限(Max Pool Size),全部都处于非空闲的状态,且工作队列已满无法再堆积任务时, 会按照预先设定的方式拒绝新的任务

:核心线程和非核心线程没有本质区别,主要是后者一个空闲一定时间后被销毁。

注注:线程池allowCoreThreadTimeOut(boolean)如果设置true,核心线程空闲一定时也会被销毁。

加入新任务的流程

此流程对应线程池中execute()

  • 先判断核心线程是否已满,否,直接执行
    • 是,看工作队列是否已满,否,加入队列等待
      • 是,查看能否创建临时线程,否,拒绝策略
        • 是,创建临时线程并执行

总结:核心线程->工作队列->临时线程

以上概念有两个需要仔细理解:工作队列,拒绝策略。

工作队列

在java队列的文章中具体学习

拒绝策略

工作的线程达到上限,并且队列也满,会对新来的执行拒绝策略。

默认提供四种。当然,也可以自己实现RejectedExecutionHandler接口自定义策略。

  • AbortPolicy:默认的拒绝策略。会抛出RejectedExecutionException异常。

  • DiscardPolicy:静默拒绝策略。同AbortPolicy类似,但不会抛出异常。

  • CallerRunsPolicy:交由调用者所在线程来执行任务。源码

        // 判断线程池是否关闭
        if (!e.isShutdown()) {
            // 在线程池没有关闭的情况下,直接执行任务,注意:当前执行的线程是调用线程
            r.run();
        }
    
  • DiscardOldestPolicy:丢弃掉工作队列中等待时间最长的任务,也就是当前队列中的第一个任务,并将新的任务添加到队列尾部。

        if (!e.isShutdown()) {
            e.getQueue().poll(); // 取出对头头部的数据
            e.execute(r); // 提交新任务,注意,新任务会自然放入到队列尾部
        }
    

测试,自己创建(后面讲如何创建)一个最大线程数为1,队列容量为1的线程池,然后提交3个任务。分别指定不同的策略看看执行结果。

    // 交给调用者线程执行
	RejectedExecutionHandler policy = new ThreadPoolExecutor.CallerRunsPolicy();
    ThreadPoolExecutor executor = new ThreadPoolExecutor(
            1, 1, 0,
            TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(1),
            policy);
    Runnable runnable = () -> {
        try {
            System.out.println("任务由" + Thread.currentThread().getName() + "执行 then sleep");
            Thread.sleep(1000 * 5); // 模拟任务一直占用线程
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    };
    for (int i = 0; i < 3; ++i) {
        executor.submit(runnable);
    }

这个例子的输出就是

任务由main执行 then sleep
任务由pool-1-thread-1执行 then sleep
任务由pool-1-thread-1执行 then sleep

创建线程池

构造函数如下,对应上面5个概念,存活时间为值和单位。

另外注意ThreadFactory是一个接口,只有一个Thread newThread(Runnable r);

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {

也可以使用Executors类预制的几个线程池,通过他们创建方法可以大概看出他们的特性。

但他们中有的存在一定问题,阿里《Java开发手册》中提到要手动用ThreadPoolExecutor创建比较好

FixedThreadPool

无界队列(LinkedBlockingQueue)。没有指定拒绝策略,则使用默认的AbortPolicy策略。

因为队列无限制,所以不会抛出RejectedExecutionException,但是会出现OutOfMemoryError

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue());
}

// 调用
Executors.newFixedThreadPool();

CachedThreadPool

同步队列(SynchronousQueue),本质上无存储空间,存和取都会阻塞操作者,底层实现是先自旋,超过一定次数再加锁。

总线程数为INTMAX,抛出的东西同上

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue());
}

SingleThreadExecutor

只有一个线程,无界队列

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));

ScheduledThreadPoolExecutor

继承ThreadExecutor

可执行定时任务的线程池。其中ScheduledThreadPoolExecutor.DelayedWorkQueue()是私有内部类。

public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 10L, TimeUnit.MILLISECONDS, new ScheduledThreadPoolExecutor.DelayedWorkQueue());
}

该线程池两个核心方法

scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)

两者都是隔一段时间执行一次,区别:

  • 前者的间隔时间是从每一次执行开始计算,如果时间到了,但是上一次还没执行完,则等待结束后再执行下一次。
  • 后者间隔时间从一次执行结束后开始计算。

**WorkStealingPool **

ForkJoin框架的一部分。

特点是每个线程都有自己的任务队列,当某个线程完成了自己队列中的任务后,它可以从其他线程的队列中偷取任务来执行。

选用线程池

// TODO 根据不同场景选用不同的线程池

关闭线程池

ref(25张图+原理)

线程池的所有状态

image-20231020101335615

shutdown()会等待线程池和队列任务都完成,shutdownNow()会抛弃队列任务,尝试终止正在执行的任务,具体是调用线程的interrupt()但具体是否终止,取决于线程本身。

复用线程的原理

线程池最大优点就是复用线程。原理是将ThreadRunnable封装进一个Worker对象,这个对象也是实现了Runnable接口,他的run方法内部是一个死循环,该worker中的线程负责执行这个run方法,并且不断从队列中读取任务并执行,达到线程复用的目的。

简单来说就是让一个线程死循环,在循环中读取队列任务并执行。

	// 省略无关代码    
	public void execute(Runnable command) {
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true)) // 添加新的worker
                return;
        }
        if (isRunning(c) && workQueue.offer(command)) {
        }
    }

解释 : 一开始会尝试addWorker新建线程,如果失败就放入队列。内部如下,创建后会直接开始新建的线程

private boolean addWorker(Runnable firstTask, boolean core) {
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        // ...
        if (workerAdded) {
            t.start(); // 开启线程,这个线程相当于 new Thread(worker).start(); 只不过是Thread工厂创建的
        }
    }
}

Worker的run实际调final void runWorker(Worker w),在该循环中会不断用getTask()从队列获取任务然后执行

    final void runWorker(Worker w) {
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
     	while (task != null || (task = getTask()) != null) {
        	// ....
            task.run();
        }
    }

Java锁

锁的底层到底是什么

ref

Java中每个对象都隐含关联一个监视器ObjectMonitor,监视器通过cpp实现内置在JVM中,监视器地址记录在对象的MarkWord上,synchronized通过ObjectMonitor实现对象的锁操作。

涉及知识:java对象的内存模型

MarkWord是什么

JVM在内存中将对象划为三部分:对象头、实例数据和填充数据。对象头分为MarkWord和类型指针两部分。MarkWord存储对象自身的运行数据,如哈希值、GC分代年龄等。对象在不同状态下,MarkWord中存储的信息不同,例如下图,最左边是状态,右边是该状态对应的存储信息。注意重量级锁定的状态中,有指向重量级锁的指针,即ObjectMonitor对象的地址

image-20231022172720201

ObjectMonitor是什么

内部有如下记录

ObjectMonitor() {
    _header       = NULL;
    _count        = 0; //记录个数
    _waiters      = 0,
    _recursions   = 0;
    _object       = NULL;
    _owner        = NULL; //持有Monitor的线程
    _WaitSet      = NULL; //处于wait状态的线程
    _WaitSetLock  = 0 ;
    _Responsible  = NULL ;
    _succ         = NULL ;
    _cxq          = NULL ;
    FreeNext      = NULL ;
    _EntryList    = NULL ; //处于等待锁block状态的线程
    _SpinFreq     = 0 ;
    _SpinClock    = 0 ;
    OwnerIsThread = 0 ;
}

注意,以上都是cpp在JVM中的实现

一个通俗易懂的图

image-20231022173149966

每个等待锁的线程都会被封装成ObjectWaiter对象,ObjectWaiter首先会进入 Entry Set等着,当线程获取到对象的monitor后把monitor中的owner变量设置为当前线程,同时monitor中的计数器count加1,若线程调用wait()方法,将释放当前持有的monitorowner变量恢复为nullcount自减1,同时该线程进入 WaitSet集合中等待被唤醒。若当前线程执行完毕也将释放monitor并复位变量的值,以便其他线程进入获取对象的monitor

到这里,大致理解了锁底层就是一个ObjectMonitor对象,需要访问对象的线程排成队列,这个队列协调不同的线程按一定顺序规则访问对象,而协调的操作由JVM来做。

多线程共享数据时必然会出现竞争问题,于是就要使用锁来控制数据访问。

:后续讲到的各种锁并不是非此即彼的关系,只是站在不同角度分类有不同的名字。

各种锁简单概念

锁优化策略

有什么点可优化

互斥同步涉及的挂起/唤醒线程都涉及内核态转换,消耗大。jvm会根据同步块代码和锁的访问情况对锁进行不同程度的优化或者升级(轻到重),尽量避免系统的同步或者切换线程的操作。

ref 文章后半段锁优化

ref 文章中锁机制部分

Synchronized 锁优化

  • 锁消除和锁粗化都是在运行时的一些优化方案,比如我们某段代码虽然加了锁,但是在运行时根本不可能出现各个线程之间资源争夺的情况,这种情况下,完全不需要任何加锁机制,所以锁会被消除锁粗化则是我们代码中频繁地出现互斥同步操作,比如在一个循环内部加锁,这样明显是非常消耗性能的,所以JVM一旦检测到这种操作,会将整个同步范围进行扩展。

  • 偏向锁:目的是消除数据在无竞争情况下的同步原语。线程获取偏向锁后,持有锁的线程以后每次进入相应同步块时,都不需要再进行任何同步操作。偏向锁不会主动释放,只有当其他线程尝试获取锁时,才会检查持有线程是否可以释放锁。如可以释放则替换为新线程ID,不可释放则升级为轻量级锁。

  • 轻量级锁:进入同步代码块前,JVM首先查看同步对象的MarkWord看看有没有被占用,如果没有,在当前线程栈帧当中建立一个锁记录Lock Record,用于存储同步对象的MarkWord的拷贝(why? 用于后面比较);然后虚拟机使用CAS操作将对象的LockRecord指针指向当前线程的栈帧(也就是尝试获取锁)。如果更新失败,则检查对象当前的MarkWord和之前拷贝是否一致,如果不一致,说明别的线程抢先一步,就进入自旋并在自旋达到一定次数(自适应自旋锁)后升级为重量级锁。自旋的同时如果有第三个线程尝试获取锁,也会直接升级到重量级锁。

    为什么要用轻量级锁而不是重量级锁?

    因为重量级锁请求失败会将线程挂起,而线程上下文切换涉及OS内核态转换,有一定耗时。所以如果线程很少,每个线程占用锁时间不长,就直接自旋一直占着CPU,等锁释放。

  • 重量级锁:如果加锁作用于代码块,会调用monitorentermonitorexit这种耗时高的操作(一般比之前的自旋和CAS高)来进行同步。作用于方法则会给方法加上ACC_SYNCHRONIZED标记,原理类似。

锁API

锁API和上面提到的锁不同,这些锁是Java API层面实现的锁机制。

ref 文章锁框架部分

Lock && Condition

Lock有如下方法,不同于synchronized,Lock更像是一把真的锁,可以lock和unlock

public interface Lock {
  	//获取锁,拿不到锁会阻塞,等待其他线程释放锁,获取到锁后返回
    void lock();
  	//同上,但是等待过程中会响应中断
    void lockInterruptibly() throws InterruptedException;
  	//尝试获取锁,但是不会阻塞,如果能获取到会返回true,不能返回false
    boolean tryLock();
  	//尝试获取锁,但是可以限定超时时间,如果超出时间还没拿到锁返回false,否则返回true,可以响应中断
    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
  	//释放锁
    void unlock();
  	//暂时可以理解为替代传统的Object的wait()、notify()等操作的工具
    Condition newCondition();
}

Condition:

public interface Condition {
  	//与调用锁对象的wait方法一样,会进入到等待状态,但是这里需要调用Condition的signal或signalAll方法进行唤醒(感觉就是和普通对象的wait和notify是对应的)同时,等待状态下是可以响应中断的
    void await() throws InterruptedException;
  	//同上,但不响应中断(看名字都能猜到)
  	void awaitUninterruptibly();
  	//等待指定时间,如果在指定时间(纳秒)内被唤醒,会返回剩余时间,如果超时,会返回0或负数,可以响应中断
  	long awaitNanos(long nanosTimeout) throws InterruptedException;
  	//等待指定时间(可以指定时间单位),如果等待时间内被唤醒,返回true,否则返回false,可以响应中断
  	boolean await(long time, TimeUnit unit) throws InterruptedException;
  	//可以指定一个明确的时间点,如果在时间点之前被唤醒,返回true,否则返回false,可以响应中断
  	boolean awaitUntil(Date deadline) throws InterruptedException;
  	//唤醒一个处于等待状态的线程,注意还得获得锁才能接着运行
  	void signal();
  	//同上,但是是唤醒所有等待线程
  	void signalAll();
}

注意signal和Object的notify一样,要有持有锁才能调用,而且需要是同一个Condition对象,每次用Lock new的是不一样的。

Lock的实现类

ReentrantLock

可重入锁。同一个线程可以多次加锁和释放锁。完全释放后其他线程才能加锁。初始化的时候可以指定其是否为公平锁。即线程按照请求顺序获取锁。非公平锁则是每到一个新线程申请锁,则直接尝试,失败后再放入等待队列。

用例

    Lock testLock = new ReentrantLock();
    Condition condition = testLock.newCondition();
    new Thread(() -> {
        testLock.lock();   //和synchronized一样,必须持有锁的情况下才能使用await
        try {
            condition.await();   //进入等待状态
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        testLock.unlock();
    }).start();
    Thread.sleep(100); //防止线程2先跑
    new Thread(() -> {
        testLock.lock();
        condition.signal();   //唤醒线程1,但是此时线程1还必须要拿到锁才能继续运行
        testLock.unlock();   //这里释放锁之后,线程1就可以拿到锁继续运行了
    }).start();

**ReentrantReadWriteLock **

读写锁。实现ReadWriteLock没实现Lock

public interface ReadWriteLock {
    //获取读锁
    Lock readLock();
  	//获取写锁
    Lock writeLock();
}

规则:如果只加了读锁,别的线程还可以继续加读锁,不可加写锁,如果有一个加了写锁,其他线程什么锁都不能加,当然,他自己依然可以加读锁,并且可以加完之后把写锁释放,这叫锁降级

Lock底层原理:AQS队列同步器

ref AQS原理

ref 文章队列同步器AQS 部分

GPT :

Java中的同步器主要有两种实现:基于内置的synchronized关键字的隐式监视器锁(monitor lock)和基于显示的同步器(如AbstractQueuedSynchronizer,简称AQS)。

AQS内部不使用底层提供的monitor,而是自身维护队列,当线程调用lock()方法来申请锁,便会触发AQS的一系列判断,如果不符合条件,比如已经有线程申请了。那就将该线程阻塞(这个是要用底层API的),放入等待队列。另外,线程之间竞争锁的时候也要用底层CAS操作

我们可以通过AQS实现自己的锁,只需要实现不多的几个方法即可。

其他基于AQS实现的"锁"

CyclicBarrier && CountDownLatch

简单说前者用于同步(多个线程),后者用于等待(一个线程等其他线程)

前者可以在多个线程中调用其await方法,用于等待其他线程也都调用这个函数。

后者可以在多个线程中调用其countDown方法,然后在等待线程中调用await方法,当计数器为0,该线程继续执行。

Samphore

使用aquirerelease方法,原理类似信号量机制。

Fork/Join并发框架

方便大任务分成小任务并发执行。主要由以下几个类。

  • ForkJoinPool:实现ExecutorService是线程池。不会为每个任务分配一个线程,会分开放到队列中让不同线程执行,创建线程池可以指定LIFO还是FIFO。
  • ForkJoinTask:实现Future,核心方法:forkjoin,子类有RecursiveAction(有返回值)和RecusiveTask(无返回值)
  • ForkJoinWorkerThread:继承Thread,是线程池中的工作线程,拥有自己的工作队列
  • WorkQueue:保存当前线程的工作。当前线程完成工作会窃取其他线程的队列中的工作来执行

用例

  • 和普通线程池一样,这里也要提交任务,只不过任务是继承了他规定的ForkJoinTask的类
  • 在任务内部可以调用fork来递归产生子任务, join等待子任务返回
  • 注意,接收poolsubmit返回值用的是ForkJoinTask,对应Future,而任务内部接收的join返回值就是目标类型long
  • 窃取其他线程工作的时候是从队尾窃取,减少和原线程的冲突。
public class Main {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        ForkJoinTask<Long> rootTask = forkJoinPool.submit(new SumForkJoinTask(1L, 10_0000_0000L));
        System.out.println("计算结果:" + rootTask.get());
    }
}
class SumForkJoinTask extends RecursiveTask<Long> {
    private final Long min;
    private final Long max;
    private Long threshold = 1000L;

    public SumForkJoinTask(Long min, Long max) {
        this.min = min;
        this.max = max;
    }
    // 继承对应的task,重写compute方法
    @Override
    protected Long compute() {
        // 小于阈值时直接计算
        if ((max - min) <= threshold) {
            long sum = 0;
            for (long i = min; i < max; i++) {
                sum = sum + i;
            }
            return sum;
        }
        // 拆分成小任务
        long middle = (max + min) >>> 1;
        SumForkJoinTask leftTask = new SumForkJoinTask(min, middle);
        leftTask.fork();
        SumForkJoinTask rightTask = new SumForkJoinTask(middle, max);
        rightTask.fork();
        // 汇总结果
        return leftTask.join() + rightTask.join();
    }
}

简化代码,并行流还能提高计算效率。支持如下操作

  • 中间操作
    • filter
    • map
    • distinct
    • skip sort sorted(cmp)....
  • 终端操作
    • count
    • findFirst
    • forEach forEachOrdered(按顺序)
    • reduce(根据指定的计算模型计算Stream中的每一个值,参数1初始值,参数2计算模型) max min allMatch.....

中间操作用于将一个流转成另一个流,执行终端操作时才会真正计算。

分为并行和串行流。

创建流的几种方式

        Arrays.asList(1,2,3).stream();	//集合或者数组
        Stream.of(1, 2, 3);				// of方法
        Stream.iterate(0, n->n + 2);	//iterate方法
        Files.lines(Paths.get("D://a.txt"));//用文件创建

串行

        Arrays.asList(1, 2, 3).stream().filter(n -> n > 1).count();

并行

        Arrays.asList(1, 2, 3).stream().parallel().filter(n -> n > 1).count();

并行流适合CPU密集型计算,太简单的计算,和cpu上下文切换耗时比起来比较亏。而且并行流用的是全局线程池,可能会影响到别的线程执行。

可以用所谓的线程池隔离技术,其实就是另外用ForkJoinPool来提交任务并执行。例如

int sum = forkJoinPool.submit(()->Arrays.asList(1, 3, 4).parallelStream().reduce(0, Integer::sum)).get();

标签:Java,Thread,队列,并发,任务,线程,new,执行
From: https://www.cnblogs.com/BayMax0-0/p/17780964.html

相关文章

  • JAVA项目中的常用的异常处理情况
    在Java项目开发中,异常处理是非常重要的一部分。异常是指在程序运行过程中出现的错误或异常情况,如空指针异常、数组越界异常等。合理处理异常可以提高程序的健壮性和可靠性,保证程序的正常运行。首先在Java中,异常处理的基本原则是“捕获异常、处理异常、抛出异常”。在程序......
  • java异常总结
    JAVA项目中的异常处理在Java项目中,异常处理是非常重要的一部分,它可以帮助我们更好地管理和控制程序的运行流程,提高代码的可读性和可维护性。本文将介绍Java项目中常见的异常处理情况,包括异常的分类、处理方式以及最佳实践。一、Java异常的分类Java异常主要分为两大类:受检查异常......
  • java中使用Graphics绘制图形验证图片,为什么图中的文字没有呈现?
       项目中做了一个图形验证的功能。可选择图形中的文字,想出现的效果如上。图形上有文字。而在实际做的过程中,发到测试环境linux系统上去之后,是下面的情况: 只有图,没有文字!于是问了问度娘,说是字体的原因。项目中使用了linux中没有的字体会导致上图的问题。但我是使用了......
  • JAVA
    1.Java中的泛型是什么?使用泛型的好处是什么?这是在各种Java泛型面试中,一开场你就会被问到的问题中的一个,主要集中在初级和中级面试中。那些拥有Java1.4或更早版本的开发背景的人都知道,在集合中存储对象并在使用前进行类型转换是多么的不方便。泛型防止了那种情况的发生。它......
  • Java基础 字节输出流 写出数据的三种方式
    void write(int b)  →  一次写一个字节数据 void write(byte[] b)  →  一次写一个字节数组数据 void write(byte[] b, int off, int len)  →  一次写一个字节数组的部分数据参数一:装着所有数据的数组。参数二:起始索引。参数......
  • Java基础 字节输出流写出数据的细节
    1.创建字节输出流对象:FileOutputStreamfos=newFileOutputStream("E:\\Java基础资料\\a.txt");细节①:参数可以是字符串表示的路径,也可以是File对象细节②:如果文件不存在,会创建一个新的空文件,但是要保证父级路径是存在的细节③:如果文件已经存在,则会清空文件 2.写数据:f......
  • Java基础 IO流的体系 和 字节输出流基本语法
     FileOutputStream:操作本地文件的字节输出流,可以把程序中的数据写到本地文件中 步骤:1.创建字节输出流对象2.写数据3.释放资源 eg:publicstaticvoidmain(String[]args)throwsIOException{FileOutputStreamfos=newFileOutputStream("E:\\Java基础资料......
  • Java基础 IO 流
    IO流:存储和和读取数据的解决方案IO流用于读写文件中的数据(可以读写文件,或网络中的数据...)IO流可以把程序中的数据保存到本地文件当中,也叫写出数据(output),还可以把本地文件中的数据加载到程序当中,也叫读取数据(input) 问:IO流中,谁在读,谁在写?以谁为参照物看读写的方向呢?答:以......
  • 巧用枚举解决业务场景的 Switch 语句—Java 实践
    巧用枚举解决业务场景的Switch语句——Java实践Switch语句是一种常见的流程控制语句,用于根据不同的条件执行不同的代码块。然而,当业务场景变得复杂时,使用大量的Switch语句可能导致代码冗长、难以维护和扩展。本文将介绍如何巧妙地使用枚举类型来优化和简化Switch语句的使......
  • java 日志框架
    日志框架:什么是日志框架? a.是一套能实现日志输出的工具包. b.能够记录系统运行状态的所有时间的日志.场景: 用户下线、接口超时、数据库崩溃、HelloWorld日志框架的能力:定制输出目标: 并不只是输出到控制台,如文件、数据库、第三方日志服务.运行时选择性输出: ......