// Java concurrent collections: ConcurrentHashMap (concurrent hash table) demo
package me.grass.demo.concurrent;
import java.util.Date;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * Demonstrates {@link java.util.concurrent.ConcurrentHashMap} as a store shared by concurrent
 * producer and consumer tasks.
 *
 * <p>{@link #main(String[])} submits 100 producer tasks to a pool of 10 threads and 50 consumer
 * tasks to a pool of 5 threads. Every producer publishes one entry; consumers take keys from a
 * shared queue and remove the matching entries. Changing the pool sizes changes the elapsed
 * time that is reported.
 *
 * <p>The class keeps its working state in static fields, so only one run may be in progress at
 * a time.
 *
 * @author xxj
 */
public class ConcurrentHashMapDemo {

    /** Latch of the most recent run; counted down once per finished task. */
    private static volatile CountDownLatch latch;

    /** Entries published by producers and removed by consumers. */
    private static final ConcurrentHashMap<Integer, Integer> STORE =
            new ConcurrentHashMap<Integer, Integer>();

    /** Keys that still have to be consumed. */
    private static final ConcurrentLinkedQueue<Integer> PENDING_KEYS =
            new ConcurrentLinkedQueue<Integer>();

    /**
     * Runs the demo with more producer threads (10) than consumer threads (5) and a 500 ms
     * pause after every produce/consume step, then prints whether the map was drained, the
     * elapsed whole seconds and the remaining latch count.
     *
     * @param args ignored
     * @throws InterruptedException if the main thread is interrupted while waiting for the tasks
     * @author xxj
     */
    public static void main(String[] args) throws InterruptedException {
        long startNanos = System.nanoTime();
        int leftover = runDemo(
                100,  // total producer tasks
                10,   // active producer threads
                50,   // total consumer tasks
                5,    // active consumer threads
                500L);
        long elapsedSeconds = (System.nanoTime() - startNanos) / 1000000000L;
        System.out.println("队列为空:" + (leftover == 0));
        System.out.println("耗时:" + elapsedSeconds);
        System.out.println("同步队列:" + latch.getCount());
    }

    /**
     * Executes one complete producer/consumer run and waits for every task to finish.
     *
     * <p>Keys {@code 0 .. producerTasks-1} are queued for consumption; producer {@code i}
     * publishes the entry {@code i -> i}. Not safe to call concurrently with itself because the
     * shared static state is reset at the start of each run.
     *
     * @param producerTasks   number of producer tasks (one entry each)
     * @param producerThreads size of the producer thread pool
     * @param consumerTasks   number of consumer tasks
     * @param consumerThreads size of the consumer thread pool
     * @param pauseMillis     pause after each produce/consume step, in milliseconds
     * @return number of entries left in the map when all tasks have finished; {@code 0} whenever
     *         at least one consumer task ran
     * @throws IllegalArgumentException if {@code pauseMillis} or a task count is negative, or a
     *         pool size is not positive
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    static int runDemo(int producerTasks, int producerThreads, int consumerTasks,
            int consumerThreads, long pauseMillis) throws InterruptedException {
        if (pauseMillis < 0) {
            throw new IllegalArgumentException("pauseMillis must not be negative: " + pauseMillis);
        }
        if (producerTasks < 0 || consumerTasks < 0) {
            throw new IllegalArgumentException("task counts must not be negative");
        }
        STORE.clear();
        PENDING_KEYS.clear();
        for (int key = 0; key < producerTasks; key++) {
            PENDING_KEYS.add(key);
        }
        CountDownLatch done = new CountDownLatch(producerTasks + consumerTasks);
        latch = done;
        startProducer(producerThreads, producerTasks, pauseMillis, done);
        startConsumer(consumerThreads, consumerTasks, pauseMillis, done);
        done.await(); // wait until every task has finished
        return STORE.size();
    }

    /** Submits {@code totalTasks} producers to a fixed pool of {@code active} threads. */
    private static void startProducer(int active, int totalTasks, long pauseMillis,
            CountDownLatch done) {
        ExecutorService pool = Executors.newFixedThreadPool(active);
        try {
            for (int key = 0; key < totalTasks; key++) {
                pool.execute(new Producer(key, pauseMillis, done));
            }
        } finally {
            // Already-submitted tasks still run; without this the idle non-daemon workers
            // would keep the JVM alive after main returns.
            pool.shutdown();
        }
    }

    /** Submits {@code totalTasks} consumers to a fixed pool of {@code active} threads. */
    private static void startConsumer(int active, int totalTasks, long pauseMillis,
            CountDownLatch done) {
        ExecutorService pool = Executors.newFixedThreadPool(active);
        try {
            for (int i = 0; i < totalTasks; i++) {
                pool.execute(new Consumer(pauseMillis, done));
            }
        } finally {
            pool.shutdown();
        }
    }

    /**
     * Sleeps for the given time.
     *
     * @return {@code false} if the thread was interrupted (the interrupt flag is restored)
     */
    private static boolean pause(long millis) {
        try {
            Thread.sleep(millis);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /**
     * Producer: publishes a single entry {@code key -> key}.
     *
     * @author xxj
     */
    private static class Producer implements Runnable {
        private final int key;
        private final long pauseMillis;
        private final CountDownLatch done;

        Producer(int key, long pauseMillis, CountDownLatch done) {
            this.key = key;
            this.pauseMillis = pauseMillis;
            this.done = done;
        }

        @Override
        public void run() {
            try {
                STORE.put(key, key); // produce
                System.out.println("已创建:" + key);
                pause(pauseMillis);
            } finally {
                done.countDown(); // always signal completion, even on failure
            }
        }
    }

    /**
     * Consumer: removes entries until no pending key is left.
     *
     * @author xxj
     */
    private static class Consumer implements Runnable {
        private final long pauseMillis;
        private final CountDownLatch done;

        Consumer(long pauseMillis, CountDownLatch done) {
            this.pauseMillis = pauseMillis;
            this.done = done;
        }

        @Override
        public void run() {
            try {
                Integer key;
                // poll() is the single atomic "is there work?" check; a separate isEmpty()
                // test could race with other consumers and hand back a null key.
                while ((key = PENDING_KEYS.poll()) != null) {
                    Integer value = STORE.remove(key); // consume
                    if (value == null) {
                        // The producer for this key has not run yet: put it back and retry.
                        PENDING_KEYS.add(key);
                    } else {
                        System.err.println("消费:" + value);
                    }
                    if (!pause(pauseMillis)) { // pause after every attempt
                        return;
                    }
                }
            } finally {
                done.countDown(); // always signal completion, even on failure
            }
        }
    }
}