Exchanger用于两个线程之间(也可以多个线程)交换数据,交换器将自动匹配两个线程,将其数据互相传递.
public class Test { public static void main(String[] args) { doubleThread(); } public static void doubleThread() { Exchanger<String> exchanger = new Exchanger<>(); Thread t1 = new Thread(() -> { try { System.out.println(Thread.currentThread().getName() + "~~~~~~~~~~~~~" + exchanger.exchange("AAA")); } catch (Exception e) { e.printStackTrace(); } }); t1.setName("线程11111111111111111111111111"); t1.start(); Thread t2 = new Thread(() -> { try { System.out.println(Thread.currentThread().getName() + "~~~~~~~~~~~~~" + exchanger.exchange("BBB")); } catch (Exception e) { e.printStackTrace(); } }); t2.setName("线程22222222222222222222222222"); t2.start(); } }
先进入Exchanger的线程会阻塞等待,直到有新的线程进入,两个线程交换数据后返回.还可以使用 public V exchange(V x, long timeout, TimeUnit unit);来设置超时等待时间,如果在等待时间内还没有配对成功,则抛出异常.
构造函数,创建了一个Participant其实是ThreadLocal,了解ThreadLocal的同学会知道ThreadLocal中其实是不存储数据的,其数据都存放到Thread中,标识符为ThreadLocal值为传递的值,当前的值为Node.
这块的关系看不懂的可以看我的另一篇博客 ThreadLocal源码
public Exchanger() { participant = new Participant(); }
static final class Participant extends ThreadLocal<Node> { public Node initialValue() { return new Node(); } }
@sun.misc.Contended static final class Node { int index; // Arena index int bound; // Last recorded value of Exchanger.bound int collides; // Number of CAS failures at current bound int hash; // Pseudo-random for spins Object item; // This thread's current item volatile Object match; // Item provided by releasing thread volatile Thread parked; // Set to this thread when parked, else null }
下边我们从带超时的 exchange(v)方法查看源码,超时方法与非超时方法只是在线程park()时,传递了休眠时间,如果是被其他线程唤醒的则不会抛出异常,如果是park()睡眠时间到达后自然唤醒的则抛出异常.
public V exchange(V x) throws InterruptedException { Object v; Object item = (x == null) ? NULL_ITEM : x; //传入的是null则给默认值 if ((arena != null ||(v = slotExchange(item, false, 0L)) == null)
&& ((Thread.interrupted() || (v = arenaExchange(item, false, 0L)) == null))) throw new InterruptedException(); return (v == NULL_ITEM) ? null : (V)v;//如果交换线程给的数据是NULL_ITEM则表示对方给的null }
在这个方法中最主要的两个分支是 slotExchange 和 arenaExchange 前者表示线程并发低的情况,例如A,B两个线程,则不会触发arena机制,无论A,B哪个线程先到达,只需要等待另外一个线程到达与之交换数据即可.而后者则是Exchanger处于高并发情况,线程
数量很多需要使用arena数组存放.
先分析slotExchange
当线程1到达标记1时,先检查slot是否为null,简单说在低并发情况下slot就是表示等待线程的,如果slot为null则表示没有需要配对的线程,那当前线程就需要需要等待.随后自旋到标记2将自己设置为slot.
出了自旋后,走到标记3, match如果有值则表示有线程与之交换.因为此时就一条线程所以继续走到标记4将调用park,将自己阻塞.
此时线程2也到达,也是到达标记1,检查到slot已经有值是线程1,则先将slot设置为null然后将自己要交换的数据设置到线程1的math,在标记5处唤醒线程1,
而自己则将从线程1处获取的交换数据返回.
线程1被唤醒后重新自旋到标记3获取到交换数据,走到标记6完成交换
在整个slotExchange里因为多线程原因,某条线程获取到slot后想通过CAS的方式将其设置为null从而交换数据,但发现slot已经不是自己获取的也就是被其他变量抢先交换了.那就会触发标记7创建arena从而导致使用arena交换策略.
private final Object slotExchange(Object item, boolean timed, long ns) {
Node p = participant.get();
Thread t = Thread.currentThread();
if (t.isInterrupted()) // preserve interrupt status so caller can recheck
return null;
for (Node q;;) {
if ((q = slot) != null) {//标记1
if (U.compareAndSwapObject(this, SLOT, q, null)) {
Object v = q.item;
q.match = item;
Thread w = q.parked;
if (w != null)
U.unpark(w);//标记5
return v;
}
// create arena on contention, but continue until slot null
if (NCPU > 1 && bound == 0 &&
U.compareAndSwapInt(this, BOUND, 0, SEQ))
arena = new Node[(FULL + 2) << ASHIFT];//标记7
}
else if (arena != null)
return null; // caller must reroute to arenaExchange
else {//标记2
p.item = item;
if (U.compareAndSwapObject(this, SLOT, null, p))
break;
p.item = null;
}
}
// await release
int h = p.hash;
long end = timed ? System.nanoTime() + ns : 0L;
int spins = (NCPU > 1) ? SPINS : 1;
Object v;
while ((v = p.match) == null) {//标记3
if (spins > 0) {
h ^= h << 1; h ^= h >>> 3; h ^= h << 10;
if (h == 0)
h = SPINS | (int)t.getId();
else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0)
Thread.yield();
}
else if (slot != p)
spins = SPINS;
else if (!t.isInterrupted() && arena == null &&
(!timed || (ns = end - System.nanoTime()) > 0L)) {
U.putObject(t, BLOCKER, this);
p.parked = t;
if (slot == p)
U.park(false, ns);//标记4
p.parked = null;
U.putObject(t, BLOCKER, null);
}
else if (U.compareAndSwapObject(this, SLOT, p, null)) {
v = timed && ns <= 0L && !t.isInterrupted() ? TIMED_OUT : null;
break;
}
}
U.putOrderedObject(p, MATCH, null);//标记6
p.item = null;
p.hash = h;
return v;
}
arenaExchange策略一旦触发则需要配对的线程就不在设置到slot属性上了,而是存档到arena数组中.arenaExchange策略也就是从数组中寻找能与之交换的线程.
当线程1走到标记1拿到自己的index它默认为0也就是从arena的0号下标开始,到标记2处寻找数组中是否有其他等待的线程,当前肯定是没有则到达标记3将自身存档到数组中,然后到达标记4阻塞线程
当线程2走到标记1拿到自己的index后到标记2寻找数组中是否有等待的线程,此时发现线程1处于等待.此处的交换数据与slotExchange相同,最终唤醒线程1.
线程1唤醒后自旋到标记5判断已经有线程与之交换则拿到数据返回.
private final Object arenaExchange(Object item, boolean timed, long ns) { Node[] a = arena; Node p = participant.get(); for (int i = p.index;;) { //标记1 // access slot at i int b, m, c; long j; // j is raw array offset Node q = (Node)U.getObjectVolatile(a, j = (i << ASHIFT) + ABASE);//标记2 if (q != null && U.compareAndSwapObject(a, j, q, null)) { Object v = q.item; // release q.match = item; Thread w = q.parked; if (w != null) U.unpark(w); return v; } else if (i <= (m = (b = bound) & MMASK) && q == null) { p.item = item; // offer if (U.compareAndSwapObject(a, j, null, p)) {//标记3 long end = (timed && m == 0) ? System.nanoTime() + ns : 0L; Thread t = Thread.currentThread(); // wait for (int h = p.hash, spins = SPINS;;) { Object v = p.match; if (v != null) {//标记5 U.putOrderedObject(p, MATCH, null); p.item = null; // clear for next use p.hash = h; return v; } else if (spins > 0) { h ^= h << 1; h ^= h >>> 3; h ^= h << 10; // xorshift if (h == 0) // initialize hash h = SPINS | (int)t.getId(); else if (h < 0 && // approx 50% true (--spins & ((SPINS >>> 1) - 1)) == 0) Thread.yield(); // two yields per wait } else if (U.getObjectVolatile(a, j) != p) spins = SPINS; // releaser hasn't set match yet else if (!t.isInterrupted() && m == 0 && (!timed || (ns = end - System.nanoTime()) > 0L)) { U.putObject(t, BLOCKER, this); // emulate LockSupport p.parked = t; // minimize window if (U.getObjectVolatile(a, j) == p) U.park(false, ns);//标记4 p.parked = null; U.putObject(t, BLOCKER, null); } else if (U.getObjectVolatile(a, j) == p && U.compareAndSwapObject(a, j, p, null)) { if (m != 0) // try to shrink U.compareAndSwapInt(this, BOUND, b, b + SEQ - 1); p.item = null; p.hash = h; i = p.index >>>= 1; // descend if (Thread.interrupted()) return null; if (timed && m == 0 && ns <= 0L) return TIMED_OUT; break; // expired; restart } } } else p.item = null; // clear offer } else { if (p.bound != b) { // stale; reset p.bound = b; p.collides = 0; i = (i != m || m == 0) ? m : m - 1; } else if ((c = p.collides) < m || m == FULL || !U.compareAndSwapInt(this, BOUND, b, b + SEQ + 1)) { p.collides = c + 1; i = (i == 0) ? m : i - 1; // cyclically traverse } else i = m + 1; // grow p.index = i; } } }
以上就是Exchanger多线程交换的两种策略,本文中主要标记了两种策略的核心流程,源码中对于两种策略中可能发生的情况处理的更细腻,特别是arena策略实际情况如果触发了arena策略觉不是本文所描述的两个线程这么简单.
有想要详细了解的同学可以自行查看源码,也可以和我互相交流.
这篇博客送给孙航,感谢我的朋友.
标签:slot,多线程,Thread,标记,交换,item,线程,Exchanger,null From: https://www.cnblogs.com/zumengjie/p/17135306.html