diff options
author | jfrijters <jfrijters> | 2014-11-10 16:37:26 +0300 |
---|---|---|
committer | jfrijters <jfrijters> | 2014-11-10 16:37:26 +0300 |
commit | f5f2461d752487698e1f90e2bdc4b0bf6a25dba7 (patch) | |
tree | 3b1db648941372d6da37bbb523203213e460d506 /openjdk/java/util | |
parent | b5ad4f360d9c86263754dd6391b09da2c4202207 (diff) |
Optimized unsafe field access.
Diffstat (limited to 'openjdk/java/util')
-rw-r--r-- | openjdk/java/util/concurrent/ForkJoinPool.java | 151 |
1 files changed, 72 insertions, 79 deletions
diff --git a/openjdk/java/util/concurrent/ForkJoinPool.java b/openjdk/java/util/concurrent/ForkJoinPool.java index 9f5e1491..4b157962 100644 --- a/openjdk/java/util/concurrent/ForkJoinPool.java +++ b/openjdk/java/util/concurrent/ForkJoinPool.java @@ -808,7 +808,7 @@ public class ForkJoinPool extends AbstractExecutorService { int j = (((a.length - 1) & b) << ASHIFT) + ABASE; if ((t = (ForkJoinTask<?>)U.getObjectVolatile(a, j)) != null && base == b && U.compareAndSwapObject(a, j, t, null)) { - U.putOrderedInt(this, QBASE, b + 1); + base = b + 1; return t; } } @@ -825,7 +825,7 @@ public class ForkJoinPool extends AbstractExecutorService { t = (ForkJoinTask<?>)U.getObjectVolatile(a, j); if (t != null) { if (U.compareAndSwapObject(a, j, t, null)) { - U.putOrderedInt(this, QBASE, b + 1); + base = b + 1; return t; } } @@ -988,7 +988,7 @@ public class ForkJoinPool extends AbstractExecutorService { if (r == root) { if (base == b && U.compareAndSwapObject(a, j, t, null)) { - U.putOrderedInt(this, QBASE, b + 1); + base = b + 1; t.doExec(); } return true; @@ -1012,7 +1012,7 @@ public class ForkJoinPool extends AbstractExecutorService { if ((o = U.getObject(a, j)) instanceof CountedCompleter) { for (t = (CountedCompleter<?>)o, r = t;;) { if (r == root) { - if (U.compareAndSwapInt(this, QLOCK, 0, 1)) { + if (compareAndSwapIntQlock(0, 1)) { if (top == s && array == a && U.compareAndSwapObject(a, j, t, null)) { top = s - 1; @@ -1068,21 +1068,17 @@ public class ForkJoinPool extends AbstractExecutorService { s != Thread.State.TIMED_WAITING); } + @ikvm.internal.InterlockedCompareAndSet("qlock") + final native boolean compareAndSwapIntQlock(int expect, int update); + // Unsafe mechanics private static final sun.misc.Unsafe U; - private static final long QBASE; - private static final long QLOCK; private static final int ABASE; private static final int ASHIFT; static { try { U = sun.misc.Unsafe.getUnsafe(); - Class<?> k = WorkQueue.class; Class<?> ak = ForkJoinTask[].class; - QBASE = U.objectFieldOffset - (k.getDeclaredField("base")); - QLOCK = U.objectFieldOffset - (k.getDeclaredField("qlock")); ABASE = U.arrayBaseOffset(ak); int scale = U.arrayIndexScale(ak); if ((scale & (scale - 1)) != 0) @@ -1284,13 +1280,13 @@ public class ForkJoinPool extends AbstractExecutorService { int spins = PL_SPINS, ps, nps; for (;;) { if (((ps = plock) & PL_LOCK) == 0 && - U.compareAndSwapInt(this, PLOCK, ps, nps = ps + PL_LOCK)) + compareAndSwapIntPlock(ps, nps = ps + PL_LOCK)) return nps; else if (spins >= 0) { if (ThreadLocalRandom.nextSecondarySeed() >= 0) --spins; } - else if (U.compareAndSwapInt(this, PLOCK, ps, ps | PL_SIGNAL)) { + else if (compareAndSwapIntPlock(ps, ps | PL_SIGNAL)) { synchronized (this) { if ((plock & PL_SIGNAL) != 0) { try { @@ -1328,7 +1324,7 @@ public class ForkJoinPool extends AbstractExecutorService { (u & SHORT_SIGN) != 0 && (e = (int)c) >= 0) { long nc = ((long)(((u + UTC_UNIT) & UTC_MASK) | ((u + UAC_UNIT) & UAC_MASK)) << 32) | (long)e; - if (U.compareAndSwapLong(this, CTL, c, nc)) { + if (compareAndSwapLongCtl(c, nc)) { ForkJoinWorkerThreadFactory fac; Throwable ex = null; ForkJoinWorkerThread wt = null; @@ -1364,12 +1360,12 @@ public class ForkJoinPool extends AbstractExecutorService { wt.setDaemon(true); if ((handler = ueh) != null) wt.setUncaughtExceptionHandler(handler); - do {} while (!U.compareAndSwapInt(this, INDEXSEED, s = indexSeed, + do {} while (!compareAndSwapIntIndexSeed(s = indexSeed, s += SEED_INCREMENT) || s == 0); // skip 0 WorkQueue w = new WorkQueue(this, wt, mode, s); if (((ps = plock) & PL_LOCK) != 0 || - !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK)) + !compareAndSwapIntPlock(ps, ps += PL_LOCK)) ps = acquirePlock(); int nps = (ps & SHUTDOWN) | ((ps + PL_LOCK) & ~SHUTDOWN); try { @@ -1392,7 +1388,7 @@ public class ForkJoinPool extends AbstractExecutorService { ws[r] = w; } } finally { - if (!U.compareAndSwapInt(this, PLOCK, ps, nps)) + if (!compareAndSwapIntPlock(ps, nps)) releasePlock(nps); } wt.setName(workerNamePrefix.concat(Integer.toString(w.poolIndex >>> 1))); @@ -1413,9 +1409,9 @@ public class ForkJoinPool extends AbstractExecutorService { if (wt != null && (w = wt.workQueue) != null) { int ps; w.qlock = -1; // ensure set - U.getAndAddLong(this, STEALCOUNT, w.nsteals); // collect steals + getAndAddLongStealCount(w.nsteals); // collect steals if (((ps = plock) & PL_LOCK) != 0 || - !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK)) + !compareAndSwapIntPlock(ps, ps += PL_LOCK)) ps = acquirePlock(); int nps = (ps & SHUTDOWN) | ((ps + PL_LOCK) & ~SHUTDOWN); try { @@ -1424,14 +1420,14 @@ public class ForkJoinPool extends AbstractExecutorService { if (ws != null && idx >= 0 && idx < ws.length && ws[idx] == w) ws[idx] = null; } finally { - if (!U.compareAndSwapInt(this, PLOCK, ps, nps)) + if (!compareAndSwapIntPlock(ps, nps)) releasePlock(nps); } } long c; // adjust ctl counts - do {} while (!U.compareAndSwapLong - (this, CTL, c = ctl, (((c - AC_UNIT) & AC_MASK) | + do {} while (!compareAndSwapLongCtl + (c = ctl, (((c - AC_UNIT) & AC_MASK) | ((c - TC_UNIT) & TC_MASK) | (c & ~(AC_MASK|TC_MASK))))); @@ -1448,7 +1444,7 @@ public class ForkJoinPool extends AbstractExecutorService { ((long)(u + UAC_UNIT) << 32)); if (v.eventCount != (e | INT_SIGN)) break; - if (U.compareAndSwapLong(this, CTL, c, nc)) { + if (compareAndSwapLongCtl(c, nc)) { v.eventCount = (e + E_SEQ) & E_MASK; if ((p = v.parker) != null) U.unpark(p); @@ -1485,7 +1481,7 @@ public class ForkJoinPool extends AbstractExecutorService { WorkQueue[] ws = workQueues; if (ps > 0 && ws != null && (m = (ws.length - 1)) >= 0 && (q = ws[m & r & SQMASK]) != null && r != 0 && - U.compareAndSwapInt(q, QLOCK, 0, 1)) { // lock + q.compareAndSwapIntQlock(0, 1)) { // lock if ((a = q.array) != null && (am = a.length - 1) > (n = (s = q.top) - q.base)) { int j = ((am & s) << ASHIFT) + ABASE; @@ -1538,16 +1534,16 @@ public class ForkJoinPool extends AbstractExecutorService { WorkQueue[] nws = ((ws = workQueues) == null || ws.length == 0 ? new WorkQueue[n] : null); if (((ps = plock) & PL_LOCK) != 0 || - !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK)) + !compareAndSwapIntPlock(ps, ps += PL_LOCK)) ps = acquirePlock(); if (((ws = workQueues) == null || ws.length == 0) && nws != null) workQueues = nws; int nps = (ps & SHUTDOWN) | ((ps + PL_LOCK) & ~SHUTDOWN); - if (!U.compareAndSwapInt(this, PLOCK, ps, nps)) + if (!compareAndSwapIntPlock(ps, nps)) releasePlock(nps); } else if ((q = ws[k = r & m & SQMASK]) != null) { - if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) { + if (q.qlock == 0 && q.compareAndSwapIntQlock(0, 1)) { ForkJoinTask<?>[] a = q.array; int s = q.top; boolean submitted = false; @@ -1573,12 +1569,12 @@ public class ForkJoinPool extends AbstractExecutorService { q = new WorkQueue(this, null, SHARED_QUEUE, r); q.poolIndex = (short)k; if (((ps = plock) & PL_LOCK) != 0 || - !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK)) + !compareAndSwapIntPlock(ps, ps += PL_LOCK)) ps = acquirePlock(); if ((ws = workQueues) != null && k < ws.length && ws[k] == null) ws[k] = q; int nps = (ps & SHUTDOWN) | ((ps + PL_LOCK) & ~SHUTDOWN); - if (!U.compareAndSwapInt(this, PLOCK, ps, nps)) + if (!compareAndSwapIntPlock(ps, nps)) releasePlock(nps); } else @@ -1595,8 +1591,8 @@ public class ForkJoinPool extends AbstractExecutorService { */ final void incrementActiveCount() { long c; - do {} while (!U.compareAndSwapLong - (this, CTL, c = ctl, ((c & ~AC_MASK) | + do {} while (!compareAndSwapLongCtl + (c = ctl, ((c & ~AC_MASK) | ((c & AC_MASK) + AC_UNIT)))); } @@ -1623,7 +1619,7 @@ public class ForkJoinPool extends AbstractExecutorService { ((long)(u + UAC_UNIT)) << 32); int ne = (e + E_SEQ) & E_MASK; if (w.eventCount == (e | INT_SIGN) && - U.compareAndSwapLong(this, CTL, c, nc)) { + compareAndSwapLongCtl(c, nc)) { w.eventCount = ne; if ((p = w.parker) != null) U.unpark(p); @@ -1683,7 +1679,7 @@ public class ForkJoinPool extends AbstractExecutorService { helpRelease(c, ws, w, q, b); else if (q.base == b && U.compareAndSwapObject(a, i, t, null)) { - U.putOrderedInt(q, QBASE, b + 1); + q.base = b + 1; if ((b + 1) - q.top < 0) signalWork(ws, q); w.runTask(t); @@ -1698,7 +1694,7 @@ public class ForkJoinPool extends AbstractExecutorService { long nc = (long)ec | ((c - AC_UNIT) & (AC_MASK|TC_MASK)); w.nextWait = e; w.eventCount = ec | INT_SIGN; - if (!U.compareAndSwapLong(this, CTL, c, nc)) + if (!compareAndSwapLongCtl(c, nc)) w.eventCount = ec; // back out } break; @@ -1734,7 +1730,7 @@ public class ForkJoinPool extends AbstractExecutorService { stat = w.qlock = -1; // pool is terminating else if ((ns = w.nsteals) != 0) { // collect steals and retry w.nsteals = 0; - U.getAndAddLong(this, STEALCOUNT, (long)ns); + getAndAddLongStealCount((long)ns); } else { long pc = ((d > 0 || ec != (e | INT_SIGN)) ? 0L : @@ -1750,15 +1746,15 @@ public class ForkJoinPool extends AbstractExecutorService { parkTime = deadline = 0L; if (w.eventCount == ec && ctl == c) { Thread wt = Thread.currentThread(); - U.putObject(wt, PARKBLOCKER, this); + wt.parkBlocker = this; w.parker = wt; // emulate LockSupport.park if (w.eventCount == ec && ctl == c) U.park(false, parkTime); // must recheck before park w.parker = null; - U.putObject(wt, PARKBLOCKER, null); + wt.parkBlocker = null; if (parkTime != 0L && ctl == c && deadline - System.nanoTime() <= 0L && - U.compareAndSwapLong(this, CTL, c, pc)) + compareAndSwapLongCtl(c, pc)) stat = w.qlock = -1; // shrink pool } } @@ -1783,7 +1779,7 @@ public class ForkJoinPool extends AbstractExecutorService { int ne = (e + E_SEQ) & E_MASK; if (q != null && q.base == b && w.eventCount < 0 && v.eventCount == (e | INT_SIGN) && - U.compareAndSwapLong(this, CTL, c, nc)) { + compareAndSwapLongCtl(c, nc)) { v.eventCount = ne; if ((p = v.parker) != null) U.unpark(p); @@ -1854,7 +1850,7 @@ public class ForkJoinPool extends AbstractExecutorService { if (t == null) break restart; if (U.compareAndSwapObject(a, i, t, null)) { - U.putOrderedInt(v, QBASE, b + 1); + v.base = b + 1; ForkJoinTask<?> ps = joiner.currentSteal; int jt = joiner.top; do { @@ -1954,7 +1950,7 @@ public class ForkJoinPool extends AbstractExecutorService { (c & (AC_MASK|TC_MASK))); int ne = (e + E_SEQ) & E_MASK; if (w.eventCount == (e | INT_SIGN) && - U.compareAndSwapLong(this, CTL, c, nc)) { + compareAndSwapLongCtl(c, nc)) { w.eventCount = ne; if ((p = w.parker) != null) U.unpark(p); @@ -1964,12 +1960,12 @@ public class ForkJoinPool extends AbstractExecutorService { else if ((tc = (short)(c >>> TC_SHIFT)) >= 0 && (int)(c >> AC_SHIFT) + pc > 1) { long nc = ((c - AC_UNIT) & AC_MASK) | (c & ~AC_MASK); - if (U.compareAndSwapLong(this, CTL, c, nc)) + if (compareAndSwapLongCtl(c, nc)) return true; // no compensation } else if (tc + pc < MAX_CAP) { long nc = ((c + TC_UNIT) & TC_MASK) | (c & ~TC_MASK); - if (U.compareAndSwapLong(this, CTL, c, nc)) { + if (compareAndSwapLongCtl(c, nc)) { ForkJoinWorkerThreadFactory fac; Throwable ex = null; ForkJoinWorkerThread wt = null; @@ -2025,8 +2021,8 @@ public class ForkJoinPool extends AbstractExecutorService { } } long c; // reactivate - do {} while (!U.compareAndSwapLong - (this, CTL, c = ctl, + do {} while (!compareAndSwapLongCtl + (c = ctl, ((c & ~AC_MASK) | ((c & AC_MASK) + AC_UNIT)))); } @@ -2098,8 +2094,8 @@ public class ForkJoinPool extends AbstractExecutorService { if ((q = findNonEmptyStealQueue()) != null) { if (!active) { // re-establish active count active = true; - do {} while (!U.compareAndSwapLong - (this, CTL, c = ctl, + do {} while (!compareAndSwapLongCtl + (c = ctl, ((c & ~AC_MASK) | ((c & AC_MASK) + AC_UNIT)))); } @@ -2110,12 +2106,12 @@ public class ForkJoinPool extends AbstractExecutorService { long nc = ((c = ctl) & ~AC_MASK) | ((c & AC_MASK) - AC_UNIT); if ((int)(nc >> AC_SHIFT) + parallelism == 0) break; // bypass decrement-then-increment - if (U.compareAndSwapLong(this, CTL, c, nc)) + if (compareAndSwapLongCtl(c, nc)) active = false; } else if ((int)((c = ctl) >> AC_SHIFT) + parallelism <= 0 && - U.compareAndSwapLong - (this, CTL, c, ((c & ~AC_MASK) | + compareAndSwapLongCtl + (c, ((c & ~AC_MASK) | ((c & AC_MASK) + AC_UNIT)))) break; } @@ -2223,10 +2219,10 @@ public class ForkJoinPool extends AbstractExecutorService { if (!enable) return false; if ((ps & PL_LOCK) != 0 || - !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK)) + !compareAndSwapIntPlock(ps, ps += PL_LOCK)) ps = acquirePlock(); int nps = ((ps + PL_LOCK) & ~SHUTDOWN) | SHUTDOWN; - if (!U.compareAndSwapInt(this, PLOCK, ps, nps)) + if (!compareAndSwapIntPlock(ps, nps)) releasePlock(nps); } for (long c;;) { @@ -2253,7 +2249,7 @@ public class ForkJoinPool extends AbstractExecutorService { } } } - if (U.compareAndSwapLong(this, CTL, c, c | STOP_BIT)) { + if (compareAndSwapLongCtl(c, c | STOP_BIT)) { for (int pass = 0; pass < 3; ++pass) { WorkQueue[] ws; WorkQueue w; Thread wt; if ((ws = workQueues) != null) { @@ -2284,7 +2280,7 @@ public class ForkJoinPool extends AbstractExecutorService { ((cc + AC_UNIT) & AC_MASK) | (cc & (TC_MASK|STOP_BIT))); if (w.eventCount == (e | INT_SIGN) && - U.compareAndSwapLong(this, CTL, cc, nc)) { + compareAndSwapLongCtl(cc, nc)) { w.eventCount = (e + E_SEQ) & E_MASK; w.qlock = -1; if ((p = w.parker) != null) @@ -2326,7 +2322,7 @@ public class ForkJoinPool extends AbstractExecutorService { (a = joiner.array) != null) { long j = (((a.length - 1) & (s - 1)) << ASHIFT) + ABASE; if (U.getObject(a, j) == task && - U.compareAndSwapInt(joiner, QLOCK, 0, 1)) { + joiner.compareAndSwapIntQlock(0, 1)) { if (joiner.top == s && joiner.array == a && U.compareAndSwapObject(a, j, task, null)) { joiner.top = s - 1; @@ -3239,39 +3235,36 @@ public class ForkJoinPool extends AbstractExecutorService { return new ForkJoinTask.AdaptedCallable<T>(callable); } + @ikvm.internal.InterlockedCompareAndSet("ctl") + private native boolean compareAndSwapLongCtl(long expect, long update); + + @ikvm.internal.InterlockedCompareAndSet("plock") + private native boolean compareAndSwapIntPlock(int expect, int update); + + @ikvm.internal.InterlockedCompareAndSet("indexSeed") + private native boolean compareAndSwapIntIndexSeed(int expect, int update); + + @ikvm.internal.InterlockedCompareAndSet("stealCount") + private native boolean compareAndSwapLongStealCount(long expect, long update); + + private long getAndAddLongStealCount(long delta) { + for (;;) { + long value = stealCount; + if (compareAndSwapLongStealCount(value, value + delta)) { + return value; + } + } + } + // Unsafe mechanics private static final sun.misc.Unsafe U; - private static final long CTL; - private static final long PARKBLOCKER; private static final int ABASE; private static final int ASHIFT; - private static final long STEALCOUNT; - private static final long PLOCK; - private static final long INDEXSEED; - private static final long QBASE; - private static final long QLOCK; static { // initialize field offsets for CAS etc try { U = sun.misc.Unsafe.getUnsafe(); - Class<?> k = ForkJoinPool.class; - CTL = U.objectFieldOffset - (k.getDeclaredField("ctl")); - STEALCOUNT = U.objectFieldOffset - (k.getDeclaredField("stealCount")); - PLOCK = U.objectFieldOffset - (k.getDeclaredField("plock")); - INDEXSEED = U.objectFieldOffset - (k.getDeclaredField("indexSeed")); - Class<?> tk = Thread.class; - PARKBLOCKER = U.objectFieldOffset - (tk.getDeclaredField("parkBlocker")); - Class<?> wk = WorkQueue.class; - QBASE = U.objectFieldOffset - (wk.getDeclaredField("base")); - QLOCK = U.objectFieldOffset - (wk.getDeclaredField("qlock")); Class<?> ak = ForkJoinTask[].class; ABASE = U.arrayBaseOffset(ak); int scale = U.arrayIndexScale(ak); |