首页 > 编程语言 >Java 并发

Java 并发

时间:2023-07-10 11:33:32浏览次数:33  
标签:Java Thread int 并发 static new 线程 public

Java 并发

线程基础

进程线程概念

  • 进程是一个独立的运行环境,而线程是在进程中执行的一个任务。他们两个本质的区别是是否单独占有内存地址空间及其它系统资源(比如 I/O):
  • 进程单独占有一定的内存地址空间,所以进程间存在内存隔离,数据是分开的,数据共享复杂但是同步简单,各个进程之间互不干扰;而线程共享所属进程占有的内存地址空间和资源,数据共享简单,但是同步复杂。
  • 进程单独占有一定的内存地址空间,一个进程出现问题不会影响其他进程,不影响主程序的稳定性,可靠性高;一个线程崩溃可能影响整个程序的稳定性,可靠性较低。
  • 进程单独占有一定的内存地址空间,进程的创建和销毁不仅需要保存寄存器和栈信息,还需要资源的分配回收以及页调度,开销较大;线程只需要保存寄存器和栈信息,开销较小。
  • 另外一个重要区别是,进程是操作系统进行资源分配的基本单位,而线程是操作系统进行调度的基本单位,即 CPU 分配时间的单位 。

线程状态和转化方法

https://tva1.sinaimg.cn/large/008i3skNly1gsc0izeluhj30r20fojsv.jpg

线程状态转化图

  • NEW: 线程此时尚未启动,还未调用 Thread.start()
  • RUNNABLE:当前线程正在运行中
  • BLOCKED:处于 BLOCKED 状态的线程正等待锁的释放以进入同步区
  • WAITING:等待状态。处于等待状态的线程变成 RUNNABLE 状态需要其他线程唤醒。
  • TIMED_WAITING:超时等待状态。线程等待一个具体的时间,时间到后会被自动唤醒。
  • TERMINATED:终止状态。此时线程已执行完毕。

锁和同步

synchronized 关键字来给一段代码或一个方法上锁。它同一时刻只能由一个线程执行。如果 synchronized 关键字在方法上,那临界区就是整个方法内部。而如果是使用 synchronized 代码块,那临界区就指的是代码块内部的区域。

同步

可以通过检测进入某个状态的条件是否满足,从而决定是继续执行还是等待别的线程把前置条件准备充足。

  • wait() :进入等待状态。
  • notify() :随机叫醒一个正在等待的线程
  • notifyAll() :叫醒所有正在等待的线程

Java 多线程

Thread 类

publicclass Thread implements Runnable {}

构造方法如下图:

https://image.baidu.com/search/down?url=https://tva1.sinaimg.cn/large/008i3skNly1gsc6ya3onfj30ka0aaahd.jpg

构造方法

  • g:线程组,指定这个线程是在哪个线程组下;
  • target:指定要执行的任务;
  • name:线程的名字,多个线程的名字是可以重复的;
  • acc:用于初始化私有变量    inheritedAccessControlContext。
  • inheritThreadLocals:可继承的    ThreadLocal

Thread 类的几个常用的方法:

  • currentThread() :静态方法,返回对当前正在执行的线程对象的引用;
  • start() :开始执行线程的方法,java 虚拟机会调用线程内的 run() 方法;
  • yield() :yield 在英语里有放弃的意思,同样,这里的 yield() 指的是当前线程愿意让出对当前处理器的占用。这里需要注意的是,就算当前线程调用了 yield() 方法,程序在调度的时候,也还有可能继续运行这个线程的
  • sleep() :静态方法,使当前线程睡眠一段时间;
  • join() :使当前线程等待另一个线程执行完毕之后再继续执行,内部调用的是 Object 类的 wait 方法实现的

Thread 类:Thread 是 Runnable 的实现类,可以通过继承 Thread 类实现线程

Runnable 接口

@FunctionalInterface
public interface Runnable {
    public abstract void run();
}
  • 通过实现Runnable 接口的 run 方法,可以实现线程。

Thread 类与 Runnable 接口的比较

  • 实现一个自定义的线程类,可以有继承 Thread 类或者实现 Runnable 接口这两种方式,两者之间的优缺点:
  • 由于 Java“单继承,多实现”的特性,Runnable 接口使用起来比 Thread 更灵活。
  • Runnable 接口出现更符合面向对象,将线程单独进行对象的封装。
  • Runnable 接口出现,降低了线程对象和线程任务的耦合性。
  • 如果使用线程时不需要使用 Thread 类的诸多方法,显然使用 Runnable 接口更为轻量。

Guava 异步

  • ListenableFuture 实现线程回调功能

案例

AB 线程顺序执行各自任务

/**
 * <p>
 * 测试 AB 线程优先执行完某一个线程再执行另一个线程
 * </p>
 *
 * @author zhangjiahao
 * @date 2021/7/10 21:59
 */
public class ThreadTestOne {

    private static final Object lock = new Object();

    static class ThreadTestOneA implements Runnable {
        @Override
        public void run() {
            synchronized (lock) {
                for (int i = 0; i < 5; i++) {
                    System.out.println("ThreadTestOneA:\t" + i);
                }
            }
        }
    }
    static class ThreadTestOneB implements Runnable {
        @Override
        public void run() {
            synchronized (lock) {
                for (int i = 0; i < 5; i++) {
                    System.out.println("ThreadTestOneB:\t" + i);
                }
            }
        }
    }

    public static void main(String[] args) {
        new Thread(new ThreadTestOneA()).start();
        new Thread(new ThreadTestOneB()).start();
    }
}

运行结果如下图:

https://tva1.sinaimg.cn/large/008i3skNly1gsc7effunkj30bu0bqmxs.jpg

ThreadTestOne

AB 交替执行各自任务

/**
 * @author zhangjiahao
 * @date
 */
public class ThreadTestTwo {

    private static final Object LOCK = new Object();

    static class ThreadTestTwoA implements Runnable {

        @Override
        public void run() {
            synchronized (LOCK) {
                for (int i = 0; i < 5; i++) {
                    try {
                        System.out.println("ThreadTestTwoA:\t" + i);
                        LOCK.notifyAll();
                        LOCK.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                LOCK.notifyAll();
            }
        }
    }

    static class ThreadTestTwoB implements Runnable {
        @Override
        public void run() {
            synchronized (LOCK) {
                for (int i = 0; i < 5; i++) {
                    try {
                        System.out.println("ThreadTestTwoB:\t" + i);
                        LOCK.notifyAll();
                        LOCK.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }

    public static void main(String[] args) {
        new Thread(new ThreadTestTwoA()).start();
        new Thread(new ThreadTestTwoB()).start();
    }
}

注意:一定要先去叫醒别人再去等待,否则先进入等待就无法叫醒别人。

执行结果如下图:

https://tva1.sinaimg.cn/large/008i3skNly1gsc7i8zchqj30cc0biq3o.jpg

ThreadTestTwo

AB 交替执行同一个任务

/**
 * <p>
 * </p>
 *
 * @author zhangjiahao
 * @date 2021/7/7 14:43
 */
public class ThreadTestThree {

    private static AtomicInteger signal = new AtomicInteger(0);

    static class ThreadTestThreeA implements Runnable {
        @Override
        public void run() {
            while (signal.get() < 5) {
                if (signal.get() % 2 == 0) {
                    System.out.println("ThreadTestThreeA:\t" + signal);
                    signal.incrementAndGet();
                }
            }
        }
    }

    static class ThreadTestThreeB implements Runnable {
        @Override
        public void run() {
            while (signal.get() < 5) {
                if (signal.get() % 2 == 1) {
                    System.out.println("ThreadTestThreeB:\t" + signal);
                    signal.incrementAndGet();
                }
            }
        }
    }

    public static void main(String[] args) {
        new Thread(new ThreadTestThreeA()).start();
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        new Thread(new ThreadTestThreeB()).start();
    }
}

注意:信号量的增加要做原子性,不能让其可以多个线程同时去操作。

执行结果如下图:

https://tva1.sinaimg.cn/large/008i3skNly1gsc7n484fqj30bc05qq2z.jpg

ThreadTestThree

线程进阶

Java 线程池

线程的创建和销毁像数据库连接的创建和销毁一样,需要消耗系统资源,所以像数据库连接池一样采用线程池可以降低系统资源的消耗,主要的有点有以下几点:

  • 创建/销毁线程需要消耗系统资源,线程池可以复用已创建的线程。
  • 控制并发的数量。并发数量过多,可能会导致资源消耗过多,从而造成服务器崩溃。(主要原因)
  • 可以对线程做统一管理。

ThreadPoolExecutor

// 五个参数的构造函数
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue)

// 六个参数的构造函数-1
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory)

// 六个参数的构造函数-2
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          RejectedExecutionHandler handler)

// 七个参数的构造函数
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)
  • int corePoolSize:该线程池中核心线程数最大值核心线程:线程池中有两类线程,核心线程和非核心线程。核心线程默认情况下会一直存在于线程池中,即使这个核心线程什么都不干(铁饭碗),而非核心线程如果长时间的闲置,就会被销毁(临时工)。
  • int maximumPoolSize:该线程池中线程总数最大值 。 > 该值等于核心线程数量 + 非核心线程数量。
  • long keepAliveTime:非核心线程闲置超时时长。 > 非核心线程如果处于闲置状态超过该值,就会被销毁。如果设置 allowCoreThreadTimeOut(true),则会也作用于核心线程。
  • TimeUnit unit:keepAliveTime 的单位。 > NANOSECONDS : 1 微毫秒 = 1 微秒 / 1000 MICROSECONDS : 1 微秒 = 1 毫秒 / 1000 MILLISECONDS : 1 毫秒 = 1 秒 /1000 SECONDS : 秒 MINUTES : 分 HOURS : 小时 DAYS : 天
  • BlockingQueue workQueue:阻塞队列,维护着等待执行的 Runnable 任务对象。常用的几个阻塞队列: - LinkedBlockingQueue: 链式阻塞队列,底层数据结构是链表,默认大小是 Integer.MAX_VALUE,也可以指定大小。
    • ArrayBlockingQueue: 数组阻塞队列,底层数据结构是数组,需要指定队列的大小。
    • SynchronousQueue: 同步队列,内部容量为 0,每个 put 操作必须等待一个 take 操作,反之亦然。
    • DelayQueue: 延迟队列,该队列中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素 。
  • ThreadFactory threadFactory: 创建线程的工厂 ,用于批量创建线程,统一在创建线程时设置一些参数,如是否守护线程、线程的优先级等。如果不指定,会新建一个默认的线程工厂。
static class DefaultThreadFactory implements ThreadFactory {
    // 省略属性
    // 构造函数
    DefaultThreadFactory() {
        SecurityManager s = System.getSecurityManager();
        group = (s != null) ? s.getThreadGroup() :
        Thread.currentThread().getThreadGroup();
        namePrefix = "pool-" +
            poolNumber.getAndIncrement() +
            "-thread-";
    }

    // 省略
}
  • RejectedExecutionHandler handler: 拒绝处理策略,线程数量大于最大线程数就会采用拒绝处理策略,四种拒绝处理的策略为 :
    • ThreadPoolExecutor.AbortPolicy:默认拒绝处理策略,丢弃任务并抛 RejectedExecutionException 异常。
    • ThreadPoolExecutor.DiscardPolicy:丢弃新来的任务,但是不抛出异常。
    • ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列头部(最旧的)的任务,然后重新尝试执行程序(如果再次失败,重复此过程)。
    • ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务。
/**
 * <p>
 * 线程池测试
 * </p>
 *
 * @author zhangjiahao
 * @date 2021/7/10 22:35
 */
public class ThreadPoolTest {
    private static ArrayBlockingQueue<Runnable> arrayBlockingQueue = new ArrayBlockingQueue<Runnable>(5);
    private static ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 5, 0, TimeUnit.NANOSECONDS,
        arrayBlockingQueue, new MyThreadFactory(), new MyRejectedExecutionHandler());

    static class MyThreadFactory implements ThreadFactory {
        private static AtomicInteger atomicInteger = new AtomicInteger(0);

        @Override
        public Thread newThread(Runnable runnable) {
            return new Thread(runnable, "MyThreadFactory-" + atomicInteger.incrementAndGet());
        }
    }

    static class MyRejectedExecutionHandler implements RejectedExecutionHandler {
        @Override
        public void rejectedExecution(Runnable runnable, ThreadPoolExecutor executor) {
            if (runnable instanceof ThreadPoolTestThread) {
                System.out.println(
                    " this poll is full , this thread message is " + ((ThreadPoolTestThread)runnable).getMessage());
            }
        }
    }

    @Data
    static class ThreadPoolTestThread implements Runnable {
        private String message;

        @Override
        public void run() {
            try {
                Thread.sleep(1000);
                System.out.println("ThreadPoolTestThread task is completed, this is message \t" + message
                    + " thread name is " + Thread.currentThread().getName());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        public ThreadPoolTestThread(String message) {
            this.message = message;
        }
    }

    public static void main(String[] args) {
        for (int i = 0; i < 20; i++) {
            threadPoolExecutor.execute(new ThreadPoolTestThread("index-" + i));
        }
    }
}

执行结果如下图:

https://tva1.sinaimg.cn/large/008i3skNly1gsc93o0zxjj31400n4n4b.jpg

ThreadPoolTest

newCachedThreadPool

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

newFixedThreadPool

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
}

newSingleThreadExecutor

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

newScheduledThreadPool

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}

//ScheduledThreadPoolExecutor():
public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE,
          DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
          new DelayedWorkQueue());
}

并发集合

  • 并发 Map:ConcurrentHashMap、ConcurrentNavigableMap、ConcurrentSkipListMap
  • 并发 Queue:ConcurrentLinkedQueue、ConcurrentLinkedDeque
  • 并发 Set:ConcurrentSkipListSet、Guava 的 ConcurrentHashSet

通信工具类

类名 作用
Semaphore 限制线程的数量
Exchanger 两个线程交换数据
Phaser 增强的 CyclicBarrier
CyclicBarrier 作用跟 CountDownLatch 类似,但是可以重复使用
CountDownLatch 线程等待直到计数器减为 0 时开始工作

Semaphore

Semaphore 往往用于资源有限的场景中,去限制线程的数量。

public class SemaphoreDemo {

    static class SemaphoreDemoThread implements Runnable {
        private Semaphore semaphore;
        private int value;

        public SemaphoreDemoThread(Semaphore semaphore, int value) {
            this.semaphore = semaphore;
            this.value = value;
        }

        @Override
        public void run() {
            try {
                semaphore.acquire();
                System.out.println(String.format("当前线程是%d, 还剩%d个资源,还有%d个线程在等待", value, semaphore.availablePermits(),
                    semaphore.getQueueLength()));
                // 睡眠随机时间,打乱释放顺序
                Random random = new Random();
                Thread.sleep(random.nextInt(1000));
                System.out.println(String.format("线程%d释放了资源", value));
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                semaphore.release();
            }
        }
    }

    public static void main(String[] args) {
        Semaphore semaphore = new Semaphore(3);
        for (int i = 0; i < 10; i++) {
            new Thread(new SemaphoreDemoThread(semaphore, i)).start();
        }
    }

}

执行结果如下:

https://tva1.sinaimg.cn/large/008i3skNly1gscwh4oc2hj30hi0mg41v.jpg

SemaphoreDemo

Exchanger

Exchanger 类用于两个线程交换数据。它支持泛型,也就是说你可以在两个线程之间传送任何数据。

public class ExchangerDemo {
    public static void main(String[] args) throws InterruptedException {
        Exchanger<String> exchanger = new Exchanger<>();

        new Thread(() -> {
            try {
                System.out.println("这是线程A,得到了另一个线程的数据:"
                        + exchanger.exchange("这是来自线程A的数据"));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();

        System.out.println("这个时候线程A是阻塞的,在等待线程B的数据");
        Thread.sleep(1000);

        new Thread(() -> {
            try {
                System.out.println("这是线程B,得到了另一个线程的数据:"
                        + exchanger.exchange("这是来自线程B的数据"));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
    }
}

CountDownLatch

先来解读一下 CountDownLatch 这个类名字的意义。CountDown 代表计数递减,Latch 是“门闩”的意思。也有人把它称为“屏障”。而 CountDownLatch 这个类的作用也很贴合这个名字的意义,假设某个线程在执行任务之前,需要等待其它线程完成一些前置任务,必须等所有的前置任务都完成,才能开始执行本线程的任务。

CyclicBarrier

CyclicBarrirer 从名字上来理解是“循环的屏障”的意思。前面提到了 CountDownLatch 一旦计数值 count 被降为 0 后,就不能再重新设置了,它只能起一次“屏障”的作用。而 CyclicBarrier 拥有 CountDownLatch 的所有功能,还可以使用 reset() 方法重置屏障。

Phaser

Fork/Join 框架

public class ForkJoinTest {
    static class ForkJoinTestBean extends RecursiveTask<Integer> {
        private static final int THRESHOLD = 2;
        private int n;

        public ForkJoinTestBean(int n) {
            this.n = n;
        }

        @Override
        protected Integer compute() {
            int sum = 0;
            if (n<=1) {
                return n;
            } else {
                ForkJoinTestBean left = new ForkJoinTestBean(n-1);
                left.fork();
                ForkJoinTestBean right = new ForkJoinTestBean(n-2);
                right.fork();
                Integer leftResult = left.join();
                Integer rightResult = right.join();
                sum = leftResult + rightResult;
            }
            return sum;
        }
    }

    public static void main(String[] args) {
        long l = System.currentTimeMillis();
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        ForkJoinTestBean forkJoinTestBean = new ForkJoinTestBean(40);
        ForkJoinTask<Integer> joinTask = forkJoinPool.submit(forkJoinTestBean);
        try {
            System.out.println("time is "+(System.currentTimeMillis()-l)+"\tresult"+joinTask.get());

        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        } finally {
            forkJoinPool.shutdown();
        }
    }
}

结果如下图:

https://tva1.sinaimg.cn/large/008i3skNly1gscsl4wak5j30bg04q3zb.jpg

ForkJoinTest

标签:Java,Thread,int,并发,static,new,线程,public
From: https://www.cnblogs.com/jiuxialb/p/17540463.html

相关文章

  • JavaCV实现旋转图像识别和旋转角度预测
    引言本文将介绍如何使用JavaCV库来实现图像识别和旋转角度预测,并结合直方图统计和dhash算法来比较图片的相似度。JavaCV是一个基于OpenCV的Java库,提供了丰富的图像处理和计算机视觉功能。环境搭建在开始之前,需要安装JavaCV库和相关依赖。可以通过Maven或手动下载jar包的方式进......
  • 如何实现java Docker Engine API的具体操作步骤
    使用Java实现DockerEngineAPI引言Docker是一款非常流行的容器化平台,它可以让开发者更方便地构建、交付和运行应用程序。Docker提供了一系列的API,用于管理和操作Docker引擎,通过这些API可以实现容器的创建、启动、停止等操作。本文将向你介绍如何使用Java来实现DockerEngineAPI......
  • java项目 报错 maven jdk.tools 缺失 解决方法
    一、解决方法配置文件pom.xml<dependency><groupId>jdk.tools</groupId><artifactId>jdk.tools</artifactId><version>1.7</version><scope>system</scope><systemPath>${......
  • java实现上传zip解压及判断压缩包文件夹功能
    直接上Service,通过代码看思路贯穿整个功能,很多工具类可以复用,文件路径可以去看我博客里的(使用ResourceBundle国际化资源文件读取properties详解) 这篇制作方法url:html页面<span>ZIP:</span><inputtype="file"style="width:170px"name="hostFileBatch"/><spanid="host......
  • Java获取名字首字母拼音及用户按名字拼音分组工具
    一、需求分析最近在做一个类似于微信用户通讯录的功能,所以考虑通过查找的好友列表,在后台遍历按照26个字母分组,前台获取到Json循环26个字母直接解析对应的字符下的名称为一组分隔,没有则不显示,工具如下↓二、引入Pom<dependency> <groupId>com.belerweb</groupId> <artif......
  • Java大厂面试必考真题算法篇(持续更新)十一、java 统计字符串中每个字符出现的次数
    一、写出一个程序,接受一个字符串,然后输出该字符串反转后的字符串。答案importjava.util.*;publicclassSolution{/***反转字符串*@paramstrstring字符串*@returnstring字符串*/publicStringsolve(Stringstr){if(str......
  • java使用百度翻译接口实现前后端翻译功能
    java 百度翻译工具类 分别有前端和后端的 例子及工具使用百度翻译接口需要网上申请key,代码里面有URL。packagecn.secure.util;importjava.io.BufferedReader;importjava.io.Closeable;importjava.io.IOException;importjava.io.InputStream;importjava.io.InputStrea......
  • 新版Springboot3.0打造能落地的高并发仿12306售票系统
    第1章课程介绍与学习指南3节|22分钟本章主要对课程做整体介绍,其中包括:课程要解决的问题、课程特色和亮点、课程内容安排、学完大家的收获,以及在学习方法上提出的建议与指导。 第2章12306这个系统架构到底有多牛?8节|71分钟本章主要对课程为什么选择12306课程作为实战......
  • java 线程等待和唤醒方法
    java线程状态变迁图从图中可以看出Java线程等待方法是将线程从Runnable状态转换为Waiting状态,Java线程的唤醒方法是将线程从Waiting状态唤醒进入Runnable状态在Java中线程的等待和唤醒主要是分为3组:Object.wait()和Object.notify()LockSupport.park()和LockSupport.unp......
  • Java自签名证书的信任处理
    一、概要1.问题的由来该问题是由于Java访问的域名是https且使用了自签名证书,Java客户端无法验证证书的合法性,进而报出异常。该问题有以下解决思路:a.在Java客户端忽略证书的校验,这种方式适用于自己掌握Code的情况,且客户端服务端处于互相信任的环境中;b.将服务端的自签名证......