线程安全
线程安全概念 : 当多个线程访问某一个类(对象或方法)时,这个类始终都能表现出正确的行为,那么这个类(对象或方法)就是线程安全的.
synchronized : 可以在任意对象及方法上加锁,而加锁的这段代码称为"互斥区"或"临界区".
总结 :
当多个 线程访问myThread的run方法时,以排队的方式进行处理(这里排队是按照CPU分配的先后顺序而定的),一个线程想要执行synchronized修饰的方法里的代码,首先
是尝试获得锁,如果拿到锁,执行synchronized方法体的内容;拿不到锁,这个线程就会不断的尝试获得这把锁,直到拿到为止,而且是多个线程同时去竞争这把锁.(也就是会有锁竞争的问题)
多个线程多个锁 : 多个线程,每个线程都可以拿到自己指定的锁,分别获得锁之后,执行synchronized方法体的内容.
总结 :
关键字synchronized取得的锁都是对象锁,而不是把一段代码(方法)当做锁,所以示例代码中哪个线程先执行synchronized关键字的方法,那个线程就持有该方法所属对象的
锁(Lock),两个对象,线程获得的就是两个不同的锁,他们互不影响.
有一种情况则是相同的锁,既在静态方法上加synchronized关键字,表示锁定.class类,类一级别的锁(独占.class类)
对象锁的同步和异步
同步 : synchronized
同步的概念就是共享,我们要牢牢记住"共享"这两个字,如果不是共享的资源,就没有必要进行同步.
异步 : asynchronized
异步的概念就是独立,相互之间不受到任何制约.就好像我们学校http的时候,在页面发起的Ajax请求,我们还可以继续浏览或操作页面的内容,二者之间没有任何关系.
同步的目的就是为了线程安全,其实对于线程安全来说,需要满足两个特性 : 原子性(同步),可见性.
总结 :
A线程先持有object对象的Lock锁, B线程如果在这个时候调用对象中的同步(synchronized)方法则需等待, 也就是同步;
A线程先持有object对象的Lock锁, B线程可以以异步的方法调用对象中的非synchronized修饰的方法.
脏读
对于对象的同步和异步的方法,我们在设计自己的程序的时候,一定要考虑问题的整体,不然就会出现数据不一致的错误,很经典的错误就是脏读(dirtyread)
总结 :
在我们对一个对象的方法加锁的时候,需要考虑业务的整体性,既为setValue/getValue方法同时加锁synchronized同步关键字,保证业务(service)的原子性,不然会出现
业务的错误(也从侧面保证业务的一致性)
Oracle数据库一致性读的概念 :
undo主要有三大作用 : 提供一致性读(Consistent Read), 回滚事务(Rollback Transaction)以及实例恢复(Instance Recovery).
线程之间通信
线程通信概念 : 线程是操作系统中独立的个体,但这些个体如果不经过特殊的处理就不能成为一个整体,线程间的通信就成为整体的必用方式之一.当线程存在通信指挥,系统间
的交互性会更强大,在提高CPU利用率的同时还会使开发人员对线程任务在处理的过程中进行有效的把控与监督.
使用wait / notify 方法实现线程间的通信.(注意这两个方法都是object的类的方法,换句话说java为所有的对象都提供了这两个方法)
1 : wait和notify必须配合synchronized关键字使用;
2 : wait方法释放锁,notify方法不释放锁.
在Java中可以用wait,notify和notifyAll来实现线程间的通信.线程在运行的时候,如果发现某些条件没有被满足,可以调用wait方法暂停自己的执行,并且放弃已经获得的锁,
然后进入等待状态.当该线程被其他线程唤醒并获得锁后,可以沿着之前暂停的地方继续向后执行,而不是再次从同步代码块开始的地方开始执行.但是需要注意的一点是,对线程
等待的条件的判断要使用while而不是if来进行判断.这样在线程被唤醒后,会再次判断条件是否正真满足.
案例 :
package mythread.mythread;
import java.util.ArrayList;
import java.util.List;
public class ListAdd1 {
private volatile static List list = new ArrayList();
public void add() {
list.add("bjsxt");
}
public int size() {
return list.size();
}
public static void main(String[] args) {
final ListAdd1 list1 = new ListAdd1();
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
try {
for (int i = 0; i < 10; i++) {
list1.add();
System.out.println("当前线程 : " + Thread.currentThread().getName() + "添加了一个元素..");
Thread.sleep(500);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}, "t1");
Thread t2 = new Thread(new Runnable() {
@Override
public void run() {
while (true) {
if (list1.size() == 5) {
System.out.println("当前线程收到通知 : " + Thread.currentThread().getName() + "list size = 5线程停止");
throw new RuntimeException();
}
}
}
}, "t2");
t1.start();
try {
Thread.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
t2.start();
}
}
package mythread.mythread;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
/**
* wait notfiy 方法,wait释放锁,notfiy不释放锁
*/
public class ListAdd2 {
private volatile static List list = new ArrayList<>();
public void add() {
list.add("bjsxt");
}
public int size() {
return list.size();
}
public static void main(String[] args) {
final ListAdd2 list2 = new ListAdd2();
// 1.实例化出来一个lock
// 2.当使用wait和notify的时候,一定要配合着synchronized关键字去使用
// final Object lock = new Object();
final CountDownLatch countDownLatch = new CountDownLatch(1);
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
try {
///synchronized (lock) {
for (int i = 0; i < 10; i++) {
list2.add();
System.out.println("当前线程 : " + Thread.currentThread().getName() + "添加了一个元素..");
Thread.sleep(500);
if (list2.size() == 5) {
System.out.println("已结发出通知 ..");
countDownLatch.countDown();
//lock.notify();
}
}
// }
} catch (Exception e) {
e.printStackTrace();
}
}
}, "t1");
Thread t2 = new Thread(new Runnable() {
@Override
public void run() {
//synchronized (lock) {
if (list2.size() != 5) {
try {
System.out.println("t2进入...");
Thread.sleep(3000);
//lock.wait();
countDownLatch.await();
} catch (Exception e) {
e.printStackTrace();
}
}
System.out.println("当前线程 : " + Thread.currentThread().getName() + "收到通知线程停止..");
throw new RuntimeException();
}
//}
}, "t2");
t2.start();
t1.start();
}
}
使用wait/notify模拟Queue
BlockingQueue : 顾名思义,首先它是一个队列,并且支持阻塞的机制,阻塞的放入和得到数据.我们要实现LinkedBlockingQueue下面两个简单的方法put和take.
put(anObject) : 把anObject加到BlockingQueue里,如果BlockQueue没有空间,则调用此方法的线程被阻断,直到BlockingQueue里面有空间再继续.
take : 取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到BlockingQueue有新的数据被加入.
例子 :
package mythread.mythread;
import java.util.LinkedList;
import java.util.concurrent.atomic.AtomicInteger;
public class MyQueue {
// 1.需要一个承装元素的集合
private LinkedList<Object> list = new LinkedList<Object>();
// 2需要一个计数器
private AtomicInteger count = new AtomicInteger(0);
// 3需要制定上限和下限
private final int minSize = 0;
private final int maxSize;
// 4构造方法
public MyQueue(int maxSize) {
this.maxSize = maxSize;
}
// 5初始化一个对象,用于加锁
private final Object lock = new Object();
// put(anObject) : 把anObject加到BlockQueue里,如果BlockQueue没有空间,则调用此方法的线程被阻断,直到BlockingQueue里面有空间再继续.
public void put(Object obj) {
synchronized (lock) {
while (count.get() == this.maxSize) {
try {
lock.wait();
} catch (Exception e) {
e.printStackTrace();
}
}
//1 加入元素
list.add(obj);
//2计数器累加
count.incrementAndGet();
//3通知另外一个线程(唤醒)
lock.notify();
System.out.println("新加入的元素为 : " + obj);
}
}
//take : 取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到BlockingQueue有新的数据被加入.
public Object take() {
Object ret = null;
synchronized (lock) {
while (count.get() == this.minSize) {
try {
lock.wait();
} catch (Exception e) {
e.printStackTrace();
}
}
//1做移除元素操作
ret = list.removeFirst();
//2计数器递减
count.decrementAndGet();
//3唤醒另外一个线程
lock.notify();
}
return ret;
}
public static void main(String[] args) {
final MyQueue mq = new MyQueue(5);
mq.put("a");
mq.put("b");
mq.put("c");
mq.put("d");
mq.put("e");
System.out.println("当前容器的长度 : " + mq.getSize());
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
mq.put("f");
mq.put("g");
}
}, "t1");
t1.start();
Thread t2 = new Thread(new Runnable() {
@Override
public void run() {
Object o1 = mq.take();
System.out.println("移除的元素为 : " + o1);
Object o2 = mq.take();
System.out.println("移除的元素为 : " + o2);
}
}, "t2");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
t2.start();
}
public int getSize() {
return list.size();
}
}
ThreadLocal概念 : 线程局部变量,是一种多线程间并发访问变量的解决方案.与其synchronized等加锁的方式不同,ThreadLocal完全不提供锁,而使用以空间换时间的手段,
为每个线程提供变量的独立副本,以保障线程安全.从性能上说,ThreadLocal不具有绝对的优势,在并发不是很高的时候,加锁的性能会更好,但作为一套与锁完成无关的
线程安全解决方案,在高并发量或者竞争激烈的场景,使用ThreadLocal可以在一定程度上减少锁竞争.
例子 :
package mythread.mythread;
public class ConnThreadLocal {
public static ThreadLocal<String> th = new ThreadLocal<String>();
public void setTh(String value) {
th.set(value);
}
public void getTh() {
System.out.println(Thread.currentThread().getName() + ":" + this.th.get());
}
public static void main(String[] args) throws Exception{
final ConnThreadLocal ct = new ConnThreadLocal();
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
ct.setTh("张三");
ct.getTh();
}
}, "t1");
Thread t2 = new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1000);
ct.getTh();
} catch (Exception e) {
e.printStackTrace();
}
}
}, "t2");
t1.start();
t2.start();
}
}
单例&多线程
单例模式,最常见的就是饥饿模式,和懒汉模式,一个直接实例化对象,一个在调用方法时进行实例化对象.在多线程模式中,考虑到性能和线程安全问题,我们一般选择下面
两者经典的单例模式,在性能提高的同时,又保证了线程安全.
dubble check instance
static inner class
例子 :
package mythread.mythread;
/**
* 静态内部类实现单例模式
*/
public class Singletion {
private static class InnerSingletion {
private static Singletion singe = new Singletion();
}
public static Singletion getInstance() {
return InnerSingletion.singe;
}
}
同步类容器
同步类容器都是线程安全的,但在某些场景下可能需要加锁来保护复合操作.复合类操作如 : 迭代(反复访问元素,遍历完容器中所有的元素),跳转(根据指定的顺序找到当前
的下一个元素),以及条件运算.这些复合操作在多线程并发的修改容器时,可能会表现出意外的行为,最经典的便是ConcurrentModificationException,原因是当容器迭代的
过程中,被并发的修改了内容,这是由于早期迭代器设计的时候并没有考虑并发修改的问题.
同步类容器 : 如古老的Vector,HashTable.这些容器的同步功能其实都是由JDK的Collections.synchronized***等工厂方法区创建实现的.其底层的机制无非就是用传统的
synchronized关键字对每个公用的方法都进行同步,使得每次只能有一个线程访问容器的状态.这很明显不满足我们今天互联网时代高并发的需求,在保证线程安全的同时,
也必须要有足够好的性能.
并发类容器
jdk5.0以后提供了多种并发类容器来替代同步类容器从而改善性能.同步类容器的状态都是串行化的.他们虽然实现了线程安全,但是严重降低了并发性,在多线程环境时,
严重降低了应用程序的吞吐量.
并发类容器时专门针对并发设计的,使用ConcurrentHashMap来代替给予散列的传统的HashTable,而且在ConcurrnetHashMap中,添加了一些常见符复合操作的支持.以及使用
了CopyOnWriteArrayList代替Voctor,并发的CopyonWriteArraySet,以及并发的Queue,ConcurrentLinkedQueue和LinkedBlockingQueue,前者是高性能的队列,后者是以阻塞
形式的队列,具体实现Queue还有很多,例如ArrayBlockingQueue,PriorityBlockingQueue,SynchronousQueue等.
ConcurrentMap
ConcurrentMap接口下有两个重要的实现 :
ConcurrentHashMap
ConcurrentSkipListMap(支持并发排序功能,弥补ConcurrentHashMap)
ConcurrentHashMap内部使用段(Segment)来表示这些不同的部分,每个段其实就是一个小的HashTable,他们有自己的锁.只要多个修改操作发生在不同的段上,他们就可以
并发进行.把一个整体分成了16个段(Segment).也就是最高支持16个线程的并发修改操作.这也是在多线程场景时减少锁的粒度从而降低锁竞争的一种方案.并且代码中大
多共享变量使用volatile关键字声明,目的是第一时间获取修改的内容,性能非常好.
Copy-On-Write容器
Copy-On-Write简称COW,是一种用于程序设计中的优化策略.
JDK里的COW容器有两种 : CopyOnWriteArrayList和CopyOnWriteArraySet,COW容器非常有用,可以在非常多的并发场景中使用到.
什么是CopyOnWrite容器?
CopyOnWrite容器既写时复制的容器.通俗的理解是当我们往一个容器添加元素的时候,不直接往当前容器添加,而是先将当前容器进行Copy,复制出一个新的容器,然后
新的容器里添加元素,添加完元素之后,再将原容器的引用指向新的容器.这样做的好处是我们可以对CopyOnWrite容器进行并发的读,而不需要加锁,因为当前容器不会添加
任何元素.所以CopyOnWrite容器也是一种读写分离的思想,读和写不同的容器.
并发Queue
在并发队列上JDK提供了两套实现,一个是以ConcurrentLinkedQueue为代表的高性能队列,一个是以BlockingQueue接口为代表的阻塞队列,无论哪种继承自Queue;
ConcurrentLinkedQueue
ConcurrentLinkedQueue : 是一个适用于高并发场景下的队列,通过无锁的方式,实现了高并发状态下的高性能,通常ConcurrentLinkedQueue性能好于BlockingQueue.它
是一个基于链接节点的无界线程安全队列.该队列的元素遵循先进先出的原则.头是最先加入的,尾是最近加入的,该队列不容许null元素.
ConcurrentLinkedQueue重要方法 :
add()和offer()都是加入元素的方法(在ConcurrentLinkedQueue中,这两个方法没有任何区别)
poll()和peek()都是取头元素节点,区别在于前者会删除元素,后者不会.
BlockingQueue接口
ArrayBlockingQueue : 基于数组的阻塞队列实现,在ArrayBlockingQueue内部,维护了一个定长数组,以便缓存队列中的数据对象,其内部没实现读写分离,也就是意味着
生产和消费不能完全并行,长度是需要定义的,可以指定先进先出或者先进后出,也叫有界队列,在很多场合非常适合使用.
LinkedBlockingQueue : 基于链表的阻塞队列,同ArrayBlockingQueue类似,其内部也维持着一个数据缓冲队列(该队列由一个链表构成),LinkedBlockingQueue之所以能够
高效的处理并发数据,是因为其内部实现采用分离锁(读写分离两个锁),从而实现生产者和消费者操作的完全并行运行.他是一个无界队列.
SynchronousQueue : 一种没有缓冲的队列,生产者产生的数据直接会被消费者获取并消费.
PriorityBlockingQueue : 基于优先级的阻塞队列(优先级的判断通过构造函数传入的Compator对象来决定,也就是说传入队列的对象必须实现Comparable接口),在实现
PriorityBlockingQueue时,内部控制线程同步的锁采用的是公平锁,他也是一个无界的队列.
DelayQueue : 带有延迟时间的Queue,其中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素.DelayQueue中的元素必须实现Delayed接口,DelayQueue是
一个没有大小限制的队列,应用场景很多,比如对缓存超时的数据进行移除,任务超时处理,空闲链接的关闭等等.
Master-Worker模式
Master-Worker模式是常用的并行计算模式.它的核心思想是系统由两类进程协作工作 : Master进程和Workder进程.Master负责接收和分配任务,Worker负责处理子任务.
当各个Worker子进程处理完成后,会将结果返回给Master,由Master做归纳和总结.其好处是能将一个大任务分解成若干个小任务,并行执行,从而提高系统的吞吐量.
生产者-消费者
生产者和消费者也是一个多线程模式,我们在实际开发中应用非常广泛的思想理念.在生产-消费模式中 : 通常由两类线程,即若干个生产者的线程和若干个消费者的线程.
生产者线程负责提交用户请求,消费者线程则负责具体处理生产者提交的任务,在生产者和消费者之间通过共享内存缓存区进行通信.
Executor框架
创建线程池方法 :
newFixedThreadPool()方法,该方法返回一个固定数量的线程池,该方法的线程数始终不变,当有一个任务提交时,若线程池中空闲,则立即执行,若没有,则会被暂缓在
一个任务队列中等待有空闲的线程去执行.
newSingleThreadExecutor()方法,创建一个线程的线程池,若空闲则执行,若没有空闲线程则暂缓在任务队列中.
newCachedThreadPool()方法,返回一个可根据实际情况调整线程个数的线程池,不限制最大线程数量,若有任务,则创建线程,若无任务则不创建线程.如果 没有任务则
线程在60s后自动回收(空闲时间60s).
newScheduledThreadPool()方法,该方法返回一个SchededExecutorService对象,但该线程池可以指定线程的数量.
自定义线程池使用详细
这个构造方法对于队列是什么类型的比较关键 :
在使用有界队列时,若有新的任务需要执行,如果线程池实际线程数小于corePoolSize,则优先创建线程,若大于corePoolSize,则会将任务加入队列,若队列已满,则在
总线程数不大于maximumPoolSize的前提下,创建新的线程,若线程数大于maximumPoolSize,则执行拒绝策略.或其他自定义方式.
无界的任务队列时,LinkedBlockingQueue.与有界队列相比,除非系统资源耗尽,否则无界的任务队列不存在任务入队失败的情况.当有新任务到来,系统的线程数小于
corePoolSize时,则新建线程执行任务.当达到corePoolSize后,就不会继续增加.若后续任有新的任务加入,而有没有空闲的线程资源,则任务直接进入队列等待.若任务创建
和处理的速度差异很大,无界队列会保持快速 增长,直到耗尽系统内存.
JDK拒绝策略 :
AbortPolicy : 直接抛出异常阻止系统正常工作;
CallerRunsPolicy : 只要线程池未关闭,该策略直接在调用者线程中,运行当前被丢弃的任务.
DiscardOldestPolicy : 丢弃最老的一个请求,尝试再次提交当前任务.
DiscardPolicy : 丢弃无法处理的任务,不给予任何处理.
如果需要自定义拒绝策略可以实现RejectedExecutionHandler接口.
标签:容器,Thread,队列,编程,并发,线程,new,public From: https://blog.51cto.com/u_15829196/5756181