首页 > 编程语言 >【并发编程】同步容器与并发容器

【并发编程】同步容器与并发容器

时间:2023-01-20 11:35:31浏览次数:41  
标签:容器 队列 编程 System 并发 int 线程 new


文章目录

  • ​​1.同步容器类​​
  • ​​2.并发容器类​​

1.同步容器类

(1)为什么会出现同步容器

  • Java集合框架中,主要有四大类别:List、Set、Queue、Map。
  • List、Set、Queue接口分别继承了Collection接口,Map本身是一个接口。
  • 注意Collection和Map是一个顶层接口,而List、Set、Queue则继承了Collection接口,分别代表数组、集合和对列这三大容器。
  • 像ArrayList、LinkedList都是实现List接口,HashSet实现了Set接口,而Deque继承了Queue接口,PriorityQueue实现了Queue接口。另外LinkedList实现了Deque接口。
  • 像ArrayList、LinkedList、HashMap这些容器都是非线程安全的。如果有多个线程并发地访问这些容器时,就会出现问题。
  • 因此,在编写程序时,必须要求程序员手动地在任何访问到这些容器的地方进行同步处理,这样导致在使用这些容器的时候非常地不方便。
  • 所以,Java提供了同步容器供用户使用。

(2)Java中的同步容器类

  • Vector、Stack、HashTable
  • Collections类中提供的静态工厂方法创建的类
  • Vector实现了List接口,Vector实际上就是一个数组,和ArrayList类似,但是Vector中的方法都是synchronized方法,即进行了同步措施。
  • Stack也是一个同步容器,它的方法也是用synchronzied进行了同步,继承Vector类。
  • HashTable实现了Map接口,他和HashMap相似,但是HashTable进行了同步处理,而HashMap没有。
  • Collections类是一个工具提供类,注意,它和Collection不同,Collection是一个顶层的接口。在Collections类中提供了大量的方法,比如对集合或者容器进行排序、查找等操作。最重要的是,在它里面提供了几个静态工厂方法来创建同步容器类

【并发编程】同步容器与并发容器_i++

(3)同步容器的缺陷

  • 1.性能问题:传统的非同步容器和同步容器的性能差异,我们以ArrayList和Vector为例
public class Demo1 {
public static void main(String[] args) {
ArrayList<Integer> arrayList = new ArrayList<>();
Vector<Integer> vector = new Vector<>();

long startTime = System.currentTimeMillis();
for (int i = 0; i < 100000; i++) {
vector.add(i);
}
long endTime = System.currentTimeMillis();
System.out.println("vector循环10万次添加元素消耗的时间:"+(endTime-startTime)+"ms");

long startTime1 = System.currentTimeMillis();
for (int i = 0; i < 100000; i++) {
arrayList.add(i);
}
long endTime1 = System.currentTimeMillis();
System.out.println("ArrayList循环10万次添加元素消耗的时间:"+(endTime1-startTime1)+"ms");
}
}

【并发编程】同步容器与并发容器_List_02

  • 进行同样多的插入操作,Vector的耗时是ArrayList的两倍。
  • 这只是其中的一方面性能问题上的反映。
  • 另外,由于Vector中的add方法和get方法都进行了同步,因此,在有多个线程进行访问时,如果多个线程都只是进行读取操作,那么每个时刻就只能有一个线程进行读取,其他线程便只能等待,这些线程必须竞争同一把锁。
  • 因此为了解决同步容器的性能问题,在Java 1.5中提供了并发容器,位于java.util.concurrent目录下。
  • 2.同步容器的安全性问题
  • Vector中的方法都进行了同步处理,那么一定就是线程安全的,事实上这可不一定。
public class Demo2 {
private static Vector<Integer> vector = new Vector<>();

public static void main(String[] args) {

while (true) {
for (int i = 0; i < 10; i++) {
vector.add(i);
}
Thread thread1 = new Thread(() -> {

for (int i = 0; i < vector.size(); i++) {
vector.remove(i);
}
});
Thread thread2 = new Thread(() -> {

for (int i = 0; i < vector.size(); i++) {
vector.get(i);
}
});
thread1.start();
thread2.start();
}
}
}

【并发编程】同步容器与并发容器_后端_03

正如大家所看到的,这段代码报错了:数组下标越界。

也许有朋友会问:Vector是线程安全的,为什么还会报这个错?很简单,对于Vector,虽然能保证每一个时刻只能有一个线程访问它,但是不排除这种可能:

当某个线程在某个时刻执行这句时:

for(int i= 0;i < vector.size();i++) vector.get(i);

假若此时vector的size方法返回的是10,i的值为9

然后另外一个线程执行了这句:

for(int i= 0;i < vector.size();i++) vector.remove(i);

将下标为9的元素删除了。

那么通过get方法访问下标为9的元素肯定就会出问题了。

因此为了保证线程安全,必须在方法调用端做额外的同步措施,如下面所示:

public class Demo2 {
private static Vector<Integer> vector = new Vector<>();
private static Object object = new Object();

public static void main(String[] args) {

while (true) {
for (int i = 0; i < 10; i++) {
vector.add(i);
}
Thread thread1 = new Thread(() -> {

synchronized (object) {
for (int i = 0; i < vector.size(); i++) {
vector.remove(i);
}
}
});
Thread thread2 = new Thread(() -> {

synchronized (object) {
for (int i = 0; i < vector.size(); i++) {
vector.get(i);
}
}
});
thread1.start();
thread2.start();
}
}
}
  • 3.ConcurrentModificationException异常
  • 在对Vector等容器并发地进行迭代修改时,会报ConcurrentModificationException异常,关于这个异常将会在后续文章中讲述。但是在并发容器中不会出现这个问题。

2.并发容器类

(1)ConcurrectHashMap

【并发编程】同步容器与并发容器_java_04

  • HashMap、HashTable与ConcurrentHashMap都是实现的哈希表数据结构,在随机读取的时候效率很高。HashTable实现同步是利用synchronzied关键字进行锁定的,其实针对整张Hash表进行锁定,及每次锁住整张表让线程独占,在线程安全的背后是巨大的浪费。
  • ConcurrentHashMap和HashTable主要的区别就在于围绕这锁的粒度进行区别以及如何区锁定。
  • 上图中,左边是HashTable的实现方式,可以看到锁住的是整张哈希表,而右边是ConcurrectHashMap的实现方式,单独锁住每一个桶(segment),ConcurrentHashMap将哈希表分为16个桶(默认值),诸如get()、put()、remove()等常用操作之锁定当前需要的桶,而size()才锁定整张表。原来只能一个线程进入,现在却能同时接受16个写线程并发进入(写线程需要锁定,而读线程几乎不受限制),并发性的提升是显而易见的。
  • 而在迭代时,ConcurrentHashMap使用了不同于传统集合的快速失败迭代器(fast-fail iterator)的另外一种迭代方式,称为弱一致迭代器。在这种迭代方式中,当iterator被创建后集合在发生改变就不再是抛出ConcurrentModificationException,取而代之的是在改变时实例化出新的数据从而不影响原有的数据,iterator完成后在将头指针替换成为新的数据,这样iterator线程可以使用原来老的数据,而写线程也可以并发的完成改变,更重要的,这保证了多个线程并发执行的连续性和扩展性,是性能提升的关键。
public class Demo3 {
public static void main(String[] args) {
//Map<String,String> map = new ConcurrentHashMap<>(); //84ms
//Map<String,String> map = new ConcurrentSkipListMap<>(); //108ms
//Map<String,String> map = new Hashtable<>(); //92ms
Map<String,String> map = new HashMap<>(); //139ms

Random random = new Random();

Thread[] threads = new Thread[100];

CountDownLatch latch = new CountDownLatch(threads.length);
long startTime = System.currentTimeMillis();
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread(()->{
for (int j = 0; j < 10000; j++) {
map.put(random.nextInt(1000000)+"",random.nextInt()+"");
latch.countDown();
}
});
}

Arrays.asList(threads).forEach(thread -> thread.start());

try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
long endTime = System.currentTimeMillis();
System.out.println(endTime-startTime+"ms");
}
}
  • 运行结果:
ConcurrentHashMap:84ms
ConcurrentSkipListMap:108ms
Hashtable:92ms
HashMap:139ms
  • 启动100个线程,向每个容器中添加100000个元素,最终发现,ConcurrentHashMap要比HashMap效率高,ConcurrentHashMap是将大锁分成若干小锁,实现多个线程共同运行,锁以效率有很大差距。ConcurrentSkipListMap较ConcurrentHashMap除了实现高并发外还能对元素进行排序。

(2)ConcurrentQueue

  • 与ConcurrentHashMap相同,ConcurrentQueue也是通过同样的方式来提高并发性能的。
  • 同步容器中提到过火车票问题:
  • 有N张火车票,每张车票都有一个编号,同时有10个窗口对外售票。
  • 使用ConcurrentQueue进一步提高并发性:
public class Demo4 {
private static Queue<String> queues = new ConcurrentLinkedDeque<>();

static {
for (int i = 0; i < 10000; i++) {
queues.add("票编号:"+i);
}
}

public static void main(String[] args) throws InterruptedException {
long start = System.currentTimeMillis();
for (int i = 0; i < 10; i++) {
new Thread(()->{
while(true){
String s = queues.poll();
if (s ==null) break;
else System.out.println("销售了---"+s);
}
}).start();
}
long end = System.currentTimeMillis();
Thread.sleep(3000L);
System.out.println("总耗时:"+(end-start)+"ms");
}

}

【并发编程】同步容器与并发容器_System_05

  • 常用的API
Queue<String> strings = new ConcurrentLinkedQueue<String>();

strings.offer(元素) //相当于add,放进队列

strings.size() //获取当前队列的元素个数

strings.poll() //取出并移除

strings.peek() //取出不会移除,相当于get();

(3)CopyOnWriteArrayList

  • 写时复制容器,即copy-on-write,多线程环境下,写时效率低,读时效率高,适合写少读多的环境。
public class Demo5 implements Runnable{
private static List<String> lists = new ArrayList<>();
//private static List<String> lists = new Vector<>();
//private static List<String> lists = new CopyOnWriteArrayList<>();
private Random random = new Random();

@Override
public void run() {
for (int i = 0; i < 10000; i++) {
lists.add(random.nextInt()+"");
}
}

public static void main(String[] args) throws InterruptedException {
long start = System.currentTimeMillis();
System.out.println("线程开始操作");
for (int i = 0; i < 10; i++) {
new Thread(new Demo5()).start();
}
for (int i = 0; i < 100; i++) {
int finalI = i;
new Thread(()->{
for (int j = 0; j < lists.size(); j++) {
lists.get(finalI);
}
}).start();
}
long end = System.currentTimeMillis();
Thread.sleep(6000L);
System.out.println("耗时:"+(end-start)+"ms");
}
}
  • 运行结果:
ArrayList:报错:Exception in thread "Thread-1" java.lang.ArrayIndexOutOfBoundsException: 244
Vector:117ms
CopyOnWriteArrayList:222ms
  • 从JDK5开始Java并发包里面提供了两个使用CopyOnWrite机制实现的并发容器,它们是CopyOnWriteArrayList和CopyOnWriteArraySet。
  • 当我们往一个容器添加元素的时候,不直接往当前容器添加,而是先将当前容器进行Copy,复制出一个新的容器,然后向新的容器添加元素,添加完成元素后,再将原来的容器的引用指向新的容器。这样做的好处就是我们可以对CopyOnWrite容器进行并发的读,而不需要加锁,因为在当前读的容器不会添加任何元素。所以CopyOnWrite容器是一种读写分离的思想,读和写写对应不同的容器。

(4)BlockingQueue

  • 在新增的Concurrent包中,BlockingQueue很好的解决了多线程中,如何高效安全“传输”数据问题。通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利。
  • 阻塞对列,顾名思义,首先他是一个队列,一个队列在数据结构当中起到的作用大致如下:

【并发编程】同步容器与并发容器_List_06

  • 队列可以使得数据由队列的一端输入,从另一端输出。
  • 先进先出(FIFO):先插入的队列的元素也是最先出队列,这种队列体现了一种公平性。
  • 后进先出(LIFO):后插入队列的元素最先出队列,这种队列优先处理最近发生的事件。

【并发编程】同步容器与并发容器_i++_07

(5)LinkedBlockingQueue

  • 这中并发容器,会自动实现阻塞式的生产者/消费者模式。使用队列解耦合,在实现异步事物的时候很有用。
  • 案例
public class Demo6 {

//实例化时指定容器容量
private static LinkedBlockingQueue<String> linkedBlockingQueue = new LinkedBlockingQueue<>(1000);

public static void main(String[] args) {
for (int i = 0; i < 100; i++) {
int finalI = i;
new Thread(() -> {
for (int j = 0; j < 1000; j++) {
try {
//向对列中添加元素,如果对列满了 就等待1s在进行添加
boolean b = linkedBlockingQueue.offer(finalI + "", 1, TimeUnit.SECONDS);
if (b) {
System.out.println(finalI + "队列添加成功");
} else {
System.out.println(finalI + "队列添加失败,进入等待");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}

for (int i = 0; i < 10; i++) {
int finalI = i;
new Thread(() -> {
for (int j = 0; j < 1000; j++) {
try {
//消费队列,如果为空就等待消费
String take = linkedBlockingQueue.take();
System.out.println("消费队列元素:" + take);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

}).start();
}

}

}

【并发编程】同步容器与并发容器_后端_08

  • 常用API
//实例化时指定容器容量
private static LinkedBlockingQueue<String> linkedBlockingQueue = new LinkedBlockingQueue<>(1000);

linkedBlockingQueue.add(元素) //如果队列满了,再次添加就会抛出异常:java.lang.IllegalStateException: Queue full

linkedBlockingQueue.offer(元素,时间,时间单位) //队列满了,等待时间后,再次添加,失败返回false

linkedBlockingQueue.offer(元素) //队列满了,添加失败返回false,成功返回true

linkedBlockingQueue.put(元素) //加入队列,如果满了就等待阻塞

linkedBlockingQueue.take() //取出队列中的元素,如果空了,就会等待阻塞

(6)ArrayBlockingQueue

  • ArrayBlockingQueue和LinkedBlockingQueue对象的方法都是一样的,用法是一样的。
  • 二者的区别:
  • LinkedBlockingQueue是一个单向链表实现的阻塞队列,在链表一头加入元素,如果队列满了,就会阻塞,另一头取出元素,如果队列为空,就会阻塞。
  • LinkedBlockingQueue内部使用ReetrantLock实现插入锁(putLock)和取出锁(takeLock)。
  • ArrayBlockingQueue基于数组实现,成为有界队列,LinkedBlockingQueue认为是无界队列。当然LinkedBlockingQueue也可以指定队列容量。

【并发编程】同步容器与并发容器_System_09

(7)DelayQueue

  • DelayQueue也是一个BlockingQueue,用于放置实现了Delayed接口的对象,只能是实现了Delayed接口的对象,其中对象只能在其到期时才能从队列中取走。
  • Delayed扩展了Comparable接口,比较的基准为延时的时间,Delayed接口实现类getDelay()返回值为固定值(final),DelayedQueue内部是使用PriorityQueue实现的;即 (DelayQueue = BlockingQueue + PriorityQueue + Delayed
  • 可以说,DelayQueue是一个使用优先队列(PriorityQueue)实现的BlockingQueue,优先队列的比较基准是时间。是一个无界的BlockingQueue,用于放置实现了Delayed接口的对象,其中的对象只能在其到期时能从队列中取走。这种队列是有序的,及队头对象的延迟到期时间最长。但是要注意不能将null元素放置到队列中。
  • Delayed,一种混合风格的接口,用来标记那些应该在给定延迟时间之后执行的对象。此接口的实现类必须重写一个compareTo()方法,该方法提供于此接口的getDelay()方法一致的排序。
  • DelayQueue存储的对象是实现了Delayed接口的对象,在这个对象中,需要重写compareTo()和getDelay()方法。
  • 自定义MyTask类实现Delayed
public class MyTask implements Delayed {

private long time;
private String name;
private long start = System.currentTimeMillis();

public MyTask(String name,long time) {
this.time = time;
this.name = name;
}

@Override
public long getDelay(TimeUnit unit) {
return (start+time) - System.currentTimeMillis();
}

@Override
public int compareTo(Delayed o) {
return (int)(this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
}

@Override
public String toString() {
return "MyTask{" +
"time=" + time +
", name='" + name + '\'' +
'}';
}
}
  • 测试main
public class Main {
public static void main(String[] args) throws InterruptedException {

DelayQueue <MyTask> myTasks = new DelayQueue<>();

new Thread(()->{
myTasks.offer(new MyTask("task1",10000));
myTasks.offer(new MyTask("task2",4000));
myTasks.offer(new MyTask("task3",4200));
myTasks.offer(new MyTask("task4",6200));
myTasks.offer(new MyTask("task5",9800));
}).start();
long start = System.currentTimeMillis();
Thread.sleep(2000);
System.out.println("队列中存放数据:");
for (MyTask myTask : myTasks) {
System.out.println(myTask);
}

System.out.println();
System.out.println("队列中取出数据:");
while(true){
MyTask myTask = myTasks.take();
System.out.println(myTask+":取出耗时:"+(System.currentTimeMillis()-start)+"ms");
}
}
}

【并发编程】同步容器与并发容器_后端_10

  • DelayQueue能做什么
  • 淘宝订单业务:下单之后如果30分钟之内没有付款就自动取消订单。
  • 饿了么定餐通知:下单成功后60s后给用户发短信。
  • 关闭空闲连接:服务器中,很多客户端的连接,空闲一段时间之后需要关闭。
  • 缓存:缓存中的对象,超过了空闲时间,需要从缓存中移出。
  • 任务超时处理:在网络协议滑动窗口请求应答交互时,处理超时未响应的请求等。

(8)LinkedTransferQueue

  • TransferQueue是一个继承了BlockingQueue的接口,并且增加了若干新方法。
  • LinkedTransferQueue是TransferQueue接口的实现类,其定义一个无界的队列,具有先进先出(FIFO)的特性。
  • TransferQueue接口含有下面几个重要方法:
  • transfer(E e)
  • 若当前存在一个正在等待获取的消费者线程,即立刻移交之,否则,会插入当前元素e到队列尾部,并且等待进入阻塞状态,到有消费者线程取走该元素。
  • tryTransfer(E e)
  • 若当前存在一个正在等待获取的消费者线程(使用take()或者poll()函数),使用该方法会即刻转移\传输对象元素e;如不存在,则返回false,并且不进入队列。这是一个不阻塞的操作。
  • tryTransfer(E e,long timeout,TimeUnit nuit)
  • 若当前存在一个正在等待的消费者线程,会立即传输给它,否则将插入元素e到队列尾部,并且等待被消费者线程获取消费掉,若在指定的时间内元素e无法被消费者线程获取,则返回false,同时该元素被移除。
  • hasWaitingConsumer()
  • 判断是否由消费者线程。
  • getWaitingConsumerCount()
  • 获取所有等待获取元素的消费者线程数量。
  • size()
  • 因为队列的异步特性,检测当前队列的元素个数需要逐一迭代,无法保证原子性,可能会得到一个不太准确的结果,尤其是在遍历时有可能队列发生更改。
  • 消费者生产者案例
  • Producer
public class Producer implements Runnable {

private final TransferQueue<String> queue;

//构造传入LinkedTransferQueue队列
public Producer(TransferQueue<String> queue) {
this.queue = queue;
}

@Override
public void run() {
try {
//生产者循环
while (true){
//判断当前队列是否还有消费者,有的话就生产产品交由消费者线程
if(queue.hasWaitingConsumer()) queue.transfer(produce());
//休眠1s
TimeUnit.SECONDS.sleep(1);
}
}catch (Exception e){
e.printStackTrace();
}
}
//生产产品方法
private String produce(){
return "Your lucky number:"+(new Random().nextInt(100));
}
}
  • Consumer
public class Consumer implements Runnable{
private final TransferQueue<String> queue;

public Consumer(TransferQueue<String> queue) {
this.queue = queue;
}

@Override
public void run() {
try{
//消费者线程取出队列元素
System.out.println("Consumer--"+Thread.currentThread().getName()+"--"+queue.take());
}catch (Exception e){
e.printStackTrace();
}
}
}
  • main测试
public class Main {
public static void main(String[] args) {
TransferQueue<String> queue = new LinkedTransferQueue<>();

Thread producer = new Thread(new Producer(queue));
producer.setDaemon(true);
producer.start();

for (int i = 0; i < 20; i++) {
Thread consumer = new Thread(new Consumer(queue));
consumer.setDaemon(true);
consumer.start();
try{
Thread.sleep(1000L);
}catch (Exception e){
e.printStackTrace();
}
}
}
}

【并发编程】同步容器与并发容器_java_11

(9)SynchronousQueue

  • SynchronousQueue也是一种BlockingQueue,是一种无缓冲的等待队列。所以在某次添加元素后必须等待其他线程取走后才能继续添加,可以认为SynchronousQueue是一个缓存值为0的阻塞队列(也可以是1),它的isEmpty()方法永远返回时true,remainingCapacity()方法永远返回时0。
  • remove和removeAll方法返回永远是false,iterator()方法永远返回空,peek()方法永远返回null。
  • 使用put()方法时,会一直阻塞在这里,等待被消费。
  • 案例代码
public class SynchronousQueueDemo {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<String> strings = new SynchronousQueue<>();

for (int i = 0; i < 2; i++) {
new Thread(()->{
try{
System.out.println("取出数据:"+strings.take());
}catch (Exception e){
e.printStackTrace();
}
}).start();
}
strings.put("aaa");
strings.put("bbb");

}
}

【并发编程】同步容器与并发容器_System_12

【并发编程】同步容器与并发容器_i++_13


标签:容器,队列,编程,System,并发,int,线程,new
From: https://blog.51cto.com/u_15646271/6020578

相关文章

  • Function函数式编程汇总
    publicclassParent{publicvoidprint(){System.out.println("Parent");}protectedIntegergetName(Stringname){return0;......
  • 【并发编程】并发工具类
    文章目录​​1.并发工具CountDownLatch类​​​​1.1.CountDownLatch简介​​​​1.2.CountDownLatch数据结构​​​​3.CountDownLatch源码分析​​​​4.CountDownLatch案......
  • 【并发编程】原子类
    文章目录​​1.什么是原子类​​​​2.原子更新基本类型​​​​3.原子更新数组类型​​​​4.原子地更新属性​​1.什么是原子类(1)原子类简介一度认为原子是不可分割的最小......
  • 读函数式编程思维笔记01_演化的语言
    1. 范式转变1.1. 学习一种全新的编程范式,困难并不在于掌握新的语言1.1.1. 真正考验人的,是怎么学会用另一种方式去思考1.2. 计算机科学的间歇式进步,好思路有时搁置......
  • 资深程序员在编程中有哪些特殊的习惯或方法?
    知乎上有一个问答:高级程序员在编程中有哪些特殊的习惯或方法?我是一个有着22年编程经验的老程序员,谈不上高级,我来谈谈自己的三点心得。一定要有自己的代码库以前有程......
  • 17种编程语言实现排序算法-快速排序
    开源地址https://gitee.com/lblbc/simple-works/tree/master/sort/1.安卓Java版privatestaticvoidsort(int[]array){sortMe(array,0,array.length-1);......
  • 链式编程
    目录一、什么是链式编程二、链式编程的实现(指针)三、链式编程(引用)一、什么是链式编程链式编程在C++中使用的地方很多,比如说输出的时候可以使用很多的<<,这就使用了链式编程......
  • redis高并发经典问题
    缓存穿透当用户访问的数据既不在缓存也不在数据库中时,就会导致每个用户查询都会“穿透”缓存“直抵”数据库。这种情况就称为缓存穿透。当高度发的访问请求到达时,缓存穿......
  • 【学懂Java】(四)面向对象编程-2
    一.局部变量和成员变量局部变量成员变量(全局变量)定义在方法中定义在方法外,类之内的变量栈内存中堆内存中局部变量没有默认值成员变量有默认值当前方法当前类的方法不同的方......
  • Docker获取tomcat镜像,安装 tomcat容器
    Docker获取tomcat镜像,安装tomcat容器1、搜索tomcat镜像 dockersearchtomcat2、拉取tomcat镜像 dockerpulltomcat:7-jre73、创建tomcat容器 dockerrun-di......