概述、生产者消费者模型、锁对象、集合的线程安全问题、Callable的使用、计数器、队列、线程池、ForkJoin、异步回调、单例模式、CAS、锁
概述
1
多线程下变量访问存在问题
变量访问不可见性
2
JMM特点
所有共享变量存于主内存中
每个线程有自己的工作内存
线程对变量的操作都必须在工作内存中完成
不同线程之间不可以直接访问对方的工作内存
3
不可见性的解决方案
用synchronized加锁
对变量使用volatile关键字修饰
4
volatile特点
1)保持可见性,可作为刷新之前变量的出发前
2)但不保证原子性
3)会生成内存屏障,由于内存屏障,可避免指令重排现象
4)volatile可是long和double的赋值是原子的
5
保证可见性和原子性
1)变量使用volatile修饰,对操作使用synchronized加锁
2)对目标变量使用原子类
6
volatile与synchronized区别
1)volatile只能修饰实例变量和类变量,而synchronized可修饰方法和代码块
2)两者机制不同,volatile保证数据可见性,不保证原子性;synchronized是一种排他机制
3)volatile能禁止指令重排
4)volatile可看作轻量级的synchronized,如果是对一个共享变量进行多线程赋值,可使用volatile代替synchronized,以保证线程安全
生产者消费者模型
1)synchronized方式
public class Data {
public int product=0;
public synchronized void get(){
if (product!=0){
try {
this.wait ();
} catch (InterruptedException e) {
e.printStackTrace ( );
}
}
System.out.println (Thread.currentThread().getName()+"=>"+product);
product++;
this.notifyAll ();
}
public synchronized void sale(){
if (product==0){
try {
this.wait ();
} catch (InterruptedException e) {
e.printStackTrace ( );
}
}
System.out.println (Thread.currentThread().getName()+"=>"+product);
product--;
this.notifyAll ();//唤醒所有线程
}
}
同一时刻只能由一种操作,要么存放,要么消耗,存放完成后唤醒消耗线程,消耗完成后唤醒存放线程
public class test {
@Test
public void test1() {
Data data=new Data ();
//两个线程生成,两个线程消费
new Thread (()->{
for (int i = 0; i <5 ; i++) {
data.get ();
}
}).start ();
new Thread (()->{
for (int i = 0; i <5 ; i++) {
data.sale ();
}
}).start ();
new Thread (()->{
for (int i = 0; i <5 ; i++) {
data.get ();
}
}).start ();
new Thread (()->{
for (int i = 0; i <5 ; i++) {
data.sale ();
}
}).start ();
}
}
执行结果
Thread-0=>0
Thread-1=>1
Thread-0=>0
Thread-1=>1
Thread-0=>0
Thread-1=>1
Thread-0=>0
Thread-1=>1
Thread-2=>0
Thread-0=>1
Thread-2=>2 #会出现这样的结果,是虚假唤醒造成的
Thread-1=>3
Thread-2=>2
Thread-3=>3
Thread-3=>2
Thread-3=>1
Thread-2=>0
Thread-3=>1
Thread-2=>0
Thread-3=>1
什么是虚假唤醒?
在notify或者notifyAll时不知道具体是唤醒的哪个线程,notify唤醒随机一个,notifyAll会唤醒所有对应的wait的线程,但是并非所有都是需要唤醒的,这个就是所谓的虚假唤醒
解决办法(二选一):
(1)将notifyAll换成notify
(2)将if判断换成while
2)ReentrantLock方式
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class Data {
public int product=0;
Lock lock=new ReentrantLock ();
Condition condition1=lock.newCondition ();//监视器
Condition condition2=lock.newCondition ();
public void get(){
lock.lock ();
try {
while (product!=0){
condition1.await ();//线程休眠
}
System.out.println (Thread.currentThread().getName()+"=>"+product);
product++;
condition2.signal ();//唤醒监视器2休眠的线程
} catch (InterruptedException e) {
e.printStackTrace ( );
}finally {
lock.unlock ();
}
}
public void sale(){
lock.lock ();
try {
while (product==0){
condition2.await ();
}
System.out.println (Thread.currentThread().getName()+"=>"+product);
product--;
condition1.signal ();//唤醒监视器1休眠的线程
} catch (InterruptedException e) {
e.printStackTrace ( );
}finally {
lock.unlock ();
}
}
}
import org.junit.Test;
public class test {
@Test
public void test1() {
Data data=new Data ();
new Thread (()->{
for (int i = 0; i <5 ; i++) {
data.get ();//虽然get方法里有while循环,但是data.get ()执行一次,锁之间的代码只会执行一次
}
}).start ();
new Thread (()->{
for (int i = 0; i <5 ; i++) {
data.sale ();
}
}).start ();
new Thread (()->{
for (int i = 0; i <5 ; i++) {
data.get ();
}
}).start ();
new Thread (()->{
for (int i = 0; i <5 ; i++) {
data.sale ();
}
}).start ();
}
}
执行结果
Thread-0=>0
Thread-1=>1
Thread-0=>0
Thread-1=>1
Thread-0=>0
Thread-1=>1
Thread-0=>0
Thread-1=>1
Thread-0=>0
Thread-1=>1
Thread-2=>0
Thread-3=>1
Thread-2=>0
Thread-3=>1
Thread-2=>0
Thread-3=>1
Thread-2=>0
Thread-3=>1
Thread-2=>0
Thread-3=>1
不存在虚假唤醒
锁对象
1.一个类里定义一个成员synchronized修饰的方法,锁对象是该类对应的实例对象
2.一个类里定义一个静态synchronized修饰的方法,锁对象是该类
集合的线程安全问题
常用的ArrayList,HashSet有线程安全问题,在多线程下会报错ConcurrentModificationException
可通过以下三种方式解决
//将其换成Vector
List<String> list=new Vector<> ();
//使用Collections.synchronizedList包裹
List<String> list1= Collections.synchronizedList (new ArrayList<> ());
//换成CopyOnWriteArrayList
List<String> list2=new CopyOnWriteArrayList<> ();
对于set集合可用opyOnWriteArraySet替换,HashMap可用ConcurrentHashMap替换
Callable的使用
Callable和Runnable类似,但有返回值,并可抛出错误
不能直接使用Callable的实现类开启线程,因为Thread接收参数对象只能是Runnable的实现类,所以需要借助一个同时实现Callable和Runnable的适配器类
import com.sun.xml.internal.ws.policy.privateutil.PolicyUtils;
import org.junit.Test;
import java.util.*;
import java.util.concurrent.*;
@Test
public void test2() throws ExecutionException, InterruptedException {
//要开启线程需要实例化一个Thread类,并传入一个实现Runable接口的类,要使用Callable需要借助一个实现类(实现了Runable接口)来进行适配
FutureTask futureTask=new FutureTask (new myThread ());
new Thread (futureTask).start ();
Integer num=(Integer) futureTask.get ();//get方法可能会阻塞,应放到最后或使用异步通信
System.out.println (num);
}
class myThread implements Callable<Integer>{//定义一个Callable的实现类
@Override
public Integer call() throws Exception {
return 100;
}
}
}
计数器
1
减数器
//部分代码
@Test
public void test3() throws InterruptedException {
CountDownLatch countDownLatch=new CountDownLatch (6);//定义计数器,从6开始倒计
for (int i = 0; i <6 ; i++) {
new Thread (()->{
System.out.println (Thread.currentThread ().getName ());
countDownLatch.countDown ();//计数器减一
}).start ();
}
countDownLatch.await ();//等待计数器归0
System.out.println ("All Done");
}
2
加数器
//部分代码
@Test
public void test4(){
CyclicBarrier cyclicBarrier=new CyclicBarrier (7,()->{//计数器到7时,会执行方法
System.out.println ("集齐七颗龙珠");
});
for (int i = 0; i <7 ; i++) {
final int temp=i;
new Thread (()->{
System.out.println (temp);
try {
cyclicBarrier.await ();//计数器+1,并等待
} catch (InterruptedException e) {
e.printStackTrace ( );
} catch (BrokenBarrierException e) {
e.printStackTrace ( );
}
}).start ();
}
}
3
信号量
@Test
public void test5(){
//控制最大线程数
Semaphore semaphore=new Semaphore (3);
for (int i = 0; i <6 ; i++) {
new Thread (()->{
try {
semaphore.acquire ();
System.out.println (Thread.currentThread ().getName ()+"进入停车位");
System.out.println (Thread.currentThread ().getName ()+"离开停车位");
} catch (InterruptedException e) {
e.printStackTrace ( );
} finally {
semaphore.release ();
}
}).start ();
}
}
读写锁
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class myCache {
Map<String,String> map=new HashMap<> ();
ReentrantReadWriteLock.ReadLock readLock=new ReentrantReadWriteLock ().readLock ();//读锁
ReentrantReadWriteLock.WriteLock writeLock=new ReentrantReadWriteLock().writeLock ();//写锁
public void put(String key,String value){
try {
writeLock.lock ();
System.out.println (Thread.currentThread ().getName ()+"开始存入");
map.put (key,value);
System.out.println (Thread.currentThread ().getName ()+"存入成功");
} catch (Exception e) {
e.printStackTrace ( );
} finally {
writeLock.unlock ();
}
}
public String get(String key){
String value=null;
try {
readLock.lock ();
System.out.println (Thread.currentThread ().getName ()+"开始取值");
value= map.get (key);
System.out.println (Thread.currentThread ().getName ()+"取值成功");
} catch (Exception e) {
e.printStackTrace ( );
} finally {
readLock.unlock ();
}
return value;
}
}
//部分代码
@Test
public void test6(){
myCache myCache=new myCache ();
for (int i = 0; i <5 ; i++) {//放数据
final int temp=i;
new Thread (()->{
myCache.put (temp+"",temp+"");
}).start ();
}
for (int i = 0; i <5 ; i++) {//取数据
final int temp=i;
new Thread (()->{
myCache.get (temp+"");
}).start ();
}
}
队列
1
阻塞队列
1)四组API
方式 | 抛出异常 | 有返回值,不抛出异常 | 阻塞等待 | 超时等待 |
---|---|---|---|---|
添加 | add | offer | put | offer(,,) |
删除 | remove | poll | take | pull(,) |
检测首元素 | element | peek | - | - |
2)代码
//部分代码
@Test
public void test7() throws InterruptedException {
ArrayBlockingQueue blockingDeque=new ArrayBlockingQueue<> (3);
blockingDeque.add ("a");
blockingDeque.element ();
blockingDeque.remove ();
blockingDeque.offer ("a");
blockingDeque.peek ();
blockingDeque.poll ();
blockingDeque.put ("a");
blockingDeque.take ();
blockingDeque.take ();//会阻塞
}
2
同步队列
只能存一个元素的队列
//部分代码
@Test
public void test8(){
SynchronousQueue synchronousQueue=new SynchronousQueue ();
new Thread (()->{
for (int i = 0; i <3 ; i++) {
try {
synchronousQueue.put (i);
} catch (InterruptedException e) {
e.printStackTrace ( );
}
}
}).start ();
new Thread (()->{
try {
System.out.println (synchronousQueue.take ( ));
System.out.println (synchronousQueue.take ( ));
System.out.println (synchronousQueue.take ( ));
} catch (InterruptedException e) {
e.printStackTrace ( );
}
}).start ();
}
线程池
1
Executors创建线程池的3中方式
//部分代码
//创建线程池3大方法
ExecutorService pool1=Executors.newSingleThreadExecutor ();//创建只有一个线程的线程池
ExecutorService pool2=Executors.newFixedThreadPool (3);//创建有3个线程的线程池
ExecutorService pool3=Executors.newCachedThreadPool ();//自动配置线程池大小
提交线程
//部分代码
@Test
public void test9(){
//创建线程池3大方法
ExecutorService pool=Executors.newSingleThreadExecutor ();//创建只有一个线程的线程池
try {
pool.submit (()->{
System.out.println ("go");
});
} catch (Exception e) {
e.printStackTrace ( );
} finally {
pool.shutdown ();//使用完成后关闭
}
}
}
2
另一种创建线程池的方式
//部分代码
@Test
public void test9(){
ExecutorService pool=new ThreadPoolExecutor (
2,//corePoolSize
5,//maximumPoolSize
3,//keepAliveTime
TimeUnit.SECONDS,
new ArrayBlockingQueue<> (3),//等待队列
Executors.defaultThreadFactory (),
new ThreadPoolExecutor.DiscardPolicy ()//拒绝策略
);
}
3
拒绝策略
线程池最大容量是maximumPoolSize+队列长度,如果提交的线程数量超过线程池的最大容量,会进行拒绝,有4个拒绝策略
//不处理,直接抛出异常
new ThreadPoolExecutor.AbortPolicy ();
//哪里来返回哪里
new ThreadPoolExecutor.CallerRunsPolicy ();
//丢掉任务,不抛出异常
new ThreadPoolExecutor.DiscardOldestPolicy ();
//尝试和最早的线程竞争,不抛出错误
new ThreadPoolExecutor.DiscardPolicy ();
ForkJoin
数据处理量非常大的时候,可使用
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.RecursiveTask;
public class ForkJoinDemo extends RecursiveTask<Long> {//定义一个处理类
private long start;
private long end;
private long temp=1000L;
public ForkJoinDemo(long start, long end) {
this.start = start;
this.end = end;
}
@Override
protected Long compute() {//归并思想
if((end-start)<temp){
long sum=0L;
for (long i = start; i <end ; i++) {
sum+=i;
}
return sum;
}else {
long mid=(start+end)/2;
ForkJoinDemo task1=new ForkJoinDemo (start,mid);
task1.fork ();
ForkJoinDemo task2=new ForkJoinDemo (mid+1,end);
task2.fork ();
return task1.join ()+task2.join ();
}
}
}
//部分代码
@Test
public void test10() throws ExecutionException, InterruptedException {
ForkJoinPool pool=new ForkJoinPool ();//创建线程池
ForkJoinTask<Long> task=new ForkJoinDemo (0,1000000000L);//创建任务
ForkJoinTask<Long> result=pool.submit (task);//提交任务
System.out.println (result.get ( ));//获取结果
}
@Test
public void test11(){//不使用forkjoin的方式
//并行流方式
long sum= LongStream.rangeClosed (0,1000000000L).parallel ().reduce(0,Long::sum);
System.out.println (sum);
}
异步回调
//部分代码
@Test
public void test12() throws ExecutionException, InterruptedException {
//没有返回值的异步回调
CompletableFuture<Void> completableFuture=CompletableFuture.runAsync (()->{
try {
TimeUnit.SECONDS.sleep (1);
} catch (InterruptedException e) {
e.printStackTrace ( );
}
System.out.println ("没有返回值");
});
System.out.println ("这句话先完成执行1");
//有返回值的异步回调
CompletableFuture<Integer> completableFuture1=CompletableFuture.supplyAsync (()->{
try {
TimeUnit.SECONDS.sleep (1);
} catch (InterruptedException e) {
e.printStackTrace ( );
}
return 1000;
});
completableFuture1.whenComplete ((t,u)->{//成功时执行
System.out.println (t);//t是返回值
System.out.println (u);
}).exceptionally ((e)->{//失败时执行
e.printStackTrace ();
return 500;//必须返回值
});
System.out.println ("这句话先完成执行2");
System.out.println (completableFuture1.get ( ));//这个是同步获取,执行到这里会阻塞
System.out.println ("这句话在获取到返回值后完成执行");
}
这句话先完成执行1
这句话先完成执行2
没有返回值
1000
null
1000
这句话在获取到返回值后完成执行
单例模式
1
构建单例的两种模式
1)饿汉模式
public class Hungry {
public static Hungry hungry=new Hungry ();
public static Hungry getIncetance(){
return hungry;
}
}
//没有用到Hungry对象的时候对象就创建出来了,会占用一定空间
2)懒汉模式
public class Lazy {
public static Lazy lazy=null;
public static Lazy getIncetance(){
if(lazy==null){
lazy=new Lazy ();
}
return lazy;
}
}
//多线程下会创建多个对象
2
双重检查的单例
public class Lazy {
public volatile static Lazy lazy=null;
public static Lazy getIncetance(){
if(lazy==null){
synchronized (Lazy.class){
if(lazy==null){
lazy=new Lazy ();
}
}
}
return lazy;
}
}
通过volatile保证可见性,通过synchronized保证线程安全
3
匿名内部类方式
public class Singleton1 {
private static class singletonIncetance{
private static final Singleton1 incetance=new Singleton1 ();
}
public static Singleton1 getInstance(){
return singletonIncetance.incetance;
}
}
4
枚举的单例
public enum enumSingle {
INCETANCE;
public enumSingle getIncetance(){
return INCETANCE;
}
}
CAS
比较并交换
//部分代码
@Test
public void test13(){
AtomicInteger atomicInteger=new AtomicInteger (100);
atomicInteger.compareAndSet (100,200);//期望值是100,更新值是200,如果当前值和期望值相同才会执行更新操作
System.out.println (atomicInteger.get ( ));
}
}
//存在ABA问题
解决ABA问题
//部分代码
@Test
public void test13(){
AtomicStampedReference<Integer> atomicStampedReference=new AtomicStampedReference<> (10,1);//新增一个时间戳,防止ABA问题
atomicStampedReference.compareAndSet (10,20,atomicStampedReference.getStamp (),atomicStampedReference.getStamp ()+1);
//expectedReference,newReference,expectedStamp,newStamp
System.out.println (atomicStampedReference.getReference ( ));
}
锁
1
按公平划分
公平锁和非公平锁,大部分是非公平锁
2
可重入锁
1)同一把锁
public class Phone {
public synchronized void sms(){
System.out.println ("发送信息"+Thread.currentThread ().getName ());
call ();
}
public synchronized void call(){
System.out.println ("打电话"+Thread.currentThread ().getName ());
}
}
//部分代码
@Test
public void test14(){
Phone phone=new Phone ();
new Thread (()->{
phone.sms ();
}).start ();
new Thread (()->{
phone.sms ();
}).start ();
}
发送信息Thread-0
打电话Thread-0
发送信息Thread-1
打电话Thread-1
2)不同的锁
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class Phone {
Lock lock=new ReentrantLock ();
public void sms(){
lock.lock ();//第一把锁
try {
System.out.println ("发送信息"+Thread.currentThread ().getName ());
call ();
} catch (Exception e) {
e.printStackTrace ( );
} finally {
lock.unlock ();
}
}
public void call(){
lock.lock ();//第二把锁
try {
System.out.println ("打电话"+Thread.currentThread ().getName ());
} catch (Exception e) {
e.printStackTrace ( );
} finally {
lock.unlock ();
}
}
}
//部分代码
@Test
public void test14(){
Phone phone=new Phone ();
new Thread (()->{
phone.sms ();
}).start ();
new Thread (()->{
phone.sms ();
}).start ();
}
发送信息Thread-0
打电话Thread-0
发送信息Thread-1
打电话Thread-1
锁必须配对,否则程序会卡住
3)自旋锁
import java.util.concurrent.atomic.AtomicReference;
public class SpinLock {//构建锁对象
AtomicReference<Thread> atomicReference=new AtomicReference<> ();
public void lock(){
Thread thread=Thread.currentThread ();
while (!atomicReference.compareAndSet (null,thread)){}//未释放锁将进入循环,直至释放锁
}
public void unlock(){
Thread thread=Thread.currentThread ();
atomicReference.compareAndSet (thread,null);
}
}
//部分代码
@Test
public void test16() throws InterruptedException {
SpinLock spinLock=new SpinLock ();
new Thread (()->{
spinLock.lock ();
try {
TimeUnit.SECONDS.sleep (1);
System.out.println ("第一个线程执行完成");
} catch (InterruptedException e) {
e.printStackTrace ( );
} finally {
spinLock.unlock ();
}
}).start ();
new Thread (()->{
spinLock.lock ();
try {
System.out.println ("第二个线程执行完成");
} catch (Exception e) {
e.printStackTrace ( );
} finally {
spinLock.unlock ();
}
}).start ();
System.out.println ("最先执行完成");
Thread.sleep (2000);
}
最先执行完成
第一个线程执行完成
第二个线程执行完成
多线程概念
1)notify和notifyAll区别
调用notify时,只有一个等待线程会被唤醒而且它不能保证哪个线程会被唤醒,这取决于线程调度器
调用notifyAll方法,等待该锁的所有线程都会被唤醒
2)newFixedThreadPool 与 newSingleThreadExecutor 区别
newSingleThreadExecutor采用FIFO,保证线程执行顺序,先提交的任务先执行,而newFixedThreadPool不保证
在newSingleThreadExecutor方法中,当线程执行出现异常时,它会重新创建一个线程替换之前的线程继续执行,而newFixedThreadPool不行
自定义线程池
1)定义线程工厂
package com.koal;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
@Slf4j
public class CommonThreadFactory implements ThreadFactory {
private final String namePrefix;
private final AtomicInteger nextId = new AtomicInteger(1);
public CommonThreadFactory(String namePrefix) {
this.namePrefix = "From CommonThreadFactory's " +namePrefix + "-Worker-";
}
@Override
public Thread newThread(Runnable task) {
String name = namePrefix + nextId.getAndIncrement();
Thread thread = new Thread(task);
log.info(name);
return thread;
}
}
2)创建线程池
package com.koal;
import org.junit.jupiter.api.Test;
import java.util.concurrent.*;
public class NormalTest {
@Test
public void test() throws InterruptedException {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2,4,20,
TimeUnit.SECONDS,new LinkedBlockingDeque<Runnable>(),
new CommonThreadFactory("ResumeParser"),new ThreadPoolExecutor.DiscardOldestPolicy());//创建线程池
threadPoolExecutor.execute(new Runnable() {
@Override
public void run() {
System.out.println("hello");
}
});//执行任务
}
}
标签:Thread,编程,28,System,2020,new,println,线程,public
From: https://www.cnblogs.com/sylvesterzhang/p/18089863