阅读《java并发编程实战》第五章
Semaphore的应用举例
- Semaphore的应用举例:实现一个固定大小的Set。当容器满了之后,无法add,线程阻塞。
public class BoundedHashSet {
// invariant: size of Set always less than or equal to given size
private final Set<Integer> set;
private final Semaphore semaphore;
public BoundedHashSet(final int size) {
if (size <= 0) {
throw new IllegalArgumentException("size should be above 0.");
}
this.set = Collections.synchronizedSet(new HashSet<>());
this.semaphore = new Semaphore(size);
}
public boolean add(int x) throws InterruptedException {
semaphore.acquire();
boolean wasAdded = false;
try {
wasAdded = set.add(x);
return wasAdded;
} finally {
if (!wasAdded) {
semaphore.release();
}
}
}
public boolean remove(int x) {
boolean res = set.remove(x);
if (res) {
semaphore.release(); // release不会抛 InterruptedException
}
return res;
}
}
构建可伸缩的结果缓存(实践同步的思路)
version 1: 线程安全,但并发度很差
public interface Computable <A, V> {
V compute(A arg);
}
/**
* ThreadSafe, but performance/flexibility is very bad.
* Not Good
*/
@ThreadSafe
public class Memorizer1<A, V> implements Computable<A, V> {
// 有点像静态代理模式,类似Collections.synchronizedMap()
private final Computable<A, V> task;
// GuardedBy(this)
private final Map<A, V> cache = new HashMap<>();
public Memorizer1(Computable<A, V> task) {
this.task = task;
}
@Override
public synchronized V compute(A arg) {
V ans = cache.get(arg);
if (ans == null) {
ans = task.compute(arg);
cache.put(arg, ans);
}
return ans;
}
}
version 2: 线程安全,但是task.compute()方法有可能被调用两次,因为compute
public class Memorizer2<A, V> implements Computable<A, V> {
private final Computable<A, V> task;
private final Map<A, V> cache = new ConcurrentHashMap<>();
public Memorizer2(Computable<A, V> task) {
this.task = task;
}
@Override
public synchronized V compute(A arg) {
V ans = cache.get(arg);
if (ans == null) {
ans = task.compute(arg);
cache.put(arg, ans);
}
return ans;
}
}
version 3: 线程安全,但是同样有version 2的问题,会存在多个线程进入到 if (ans == null) 里面并发执行,重复计算。
public class Memorizer3<A, V> implements Computable<A, V> {
// have an idea: if the param needs to be passed from outter, then initialize it in constructor
// otherwise initialize in declaration.
private final Map<A, Future<V>> cache = new ConcurrentHashMap<>();
private final Computable<A, V> task;
public Memorizer3(Computable<A, V> task) {
this.task = task;
}
@Override
public V compute(A arg) {
Future<V> ans = cache.get(arg);
if (ans == null) {
Callable<V> eval = () -> task.compute(arg);
FutureTask<V> ft = new FutureTask<>(eval);
cache.put(arg, ft);
ans = ft;
ft.run();
}
try {
return ans.get();
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
}
}
version 4: 线程安全,并发度也高,也解决了同时计算的问题,主要是利用了并发容器ConcurrentHashMap的putIfAbsent原子操作,只有成功添加的线程才有机会执行计算。
public class Memorizer4<A, V> implements Computable<A, V> {
private final Computable<A, V> task;
private final Map<A, Future<V>> cache = new ConcurrentHashMap<>();
public Memorizer4(Computable<A, V> task) {
this.task = task;
}
@Override
public V compute(A arg) {
while (true) {
Future<V> ans = cache.get(arg);
if (ans == null) {
FutureTask<V> ft = new FutureTask<>(() -> task.compute(arg));
// if already exists, it will return the current value, so that ans will not be null, thus not run task
ans = cache.putIfAbsent(arg, ft);
if (ans == null) { // it means put success
ans = ft;
ft.run();
}
}
try {
return ans.get();
} catch (CancellationException e) {
cache.remove(arg, ans);
} catch (ExecutionException e) {
throw new RuntimeException(e);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
}
总结:通过这个例子,可以看到,充分利用并发容器类,以及并发工具类(FutureTask类,Task接口)可以简化并发编程的难度,以及提高并发的可伸缩性。
标签:task,java,编程,并发,ans,cache,arg,Computable,public From: https://www.cnblogs.com/xianzhon/p/17437793.html