Semaphore
Semaphore(信号量)用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理地使用公共资源。
Semaphore 有 Lock 不易实现的功能:Semaphore 可以允许多个线程同时访问一个临界区。
public Semaphore(int permits);
public Semaphore(int permits, boolean fair);
public void acquire() throws InterruptedException;
public void acquireUninterruptibly();
public void acquire(int permits) throws InterruptedException;
public void acquireUninterruptibly(int permits);
public boolean tryAcquire();
public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException;
public boolean tryAcquire(int permits);
public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException;
public void release();
public void release(int permits);
其他方法:
public int availablePermits();
public final int getQueueLength();
public final boolean hasQueuedThreads();
protected void reducePermits(int reduction);
protected Collection<Thread> getQueuedThreads();
应用示例 示例一:共10个线程,每次只有两个线程可以获取到资源。
/**
 * Demo: ten worker threads compete for a semaphore that holds two permits,
 * so at most two of them are inside the guarded section at any moment.
 */
public class SemaphoreTest {
    private static final int THREAD_COUNT = 10;
    private static ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT);
    private static Semaphore s = new Semaphore(2);

    /**
     * Submits {@code THREAD_COUNT} tasks and then asks the pool to shut down
     * once the submitted tasks have finished.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        for (int i = 0; i < THREAD_COUNT; i++) {
            threadPool.execute(SemaphoreTest::saveData);
        }
        threadPool.shutdown();
    }

    /** Acquires one permit, simulates two seconds of work, and always gives the permit back. */
    private static void saveData() {
        try {
            s.acquire();
        } catch (InterruptedException e) {
            // No permit was obtained, so there is nothing to release; keep the interrupt visible.
            Thread.currentThread().interrupt();
            return;
        }
        try {
            TimeUnit.SECONDS.sleep(2);
            System.out.println("save data");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            // Release in finally so an interrupted sleep cannot leak the permit.
            s.release();
        }
    }
}
示例二:Semaphore 的初始许可量为 0,两个子线程各自 release 一个许可后,主线程中阻塞的 acquire(2) 才能继续执行下去。
/**
 * Demo: the semaphore starts with zero permits; the main thread blocks in
 * {@code acquire(2)} until two child tasks have each released one permit.
 */
public class SemaphoreTest {
    private static Semaphore semaphore = new Semaphore(0);

    /**
     * Starts two child tasks, waits for both of them, then shuts the pool down.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while waiting for the permits
     */
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        // Both children run the same body, so it is defined once.
        Runnable child = () -> {
            try {
                System.out.println(Thread.currentThread() + " over ");
            } finally {
                // Always release, otherwise a failure in the task would leave main blocked forever.
                semaphore.release();
            }
        };
        try {
            executorService.submit(child);
            executorService.submit(child);

            semaphore.acquire(2);
            System.out.println("all child thread over ");
        } finally {
            // Shut down even when acquire(2) is interrupted, so the worker threads do not keep the JVM alive.
            executorService.shutdown();
        }
    }
}
示例三:官方示例
Semaphores are often used to restrict the number of threads that can access some (physical or logical) resource. For example, here is a class that uses a semaphore to control access to a pool of items:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 class Pool { private static final int MAX_AVAILABLE = 100 ; private final Semaphore available = new Semaphore(MAX_AVAILABLE, true ); public Object getItem () throws InterruptedException { available.acquire(); return getNextAvailableItem(); } public void putItem (Object x) { if (markAsUnused(x)) available.release(); } protected Object[] items = ... whatever kinds of items being managed protected boolean [] used = new boolean [MAX_AVAILABLE]; protected synchronized Object getNextAvailableItem () { for (int i = 0 ; i < MAX_AVAILABLE; ++i) { if (!used[i]) { used[i] = true ; return items[i]; } } return null ; } protected synchronized boolean markAsUnused (Object item) { for (int i = 0 ; i < MAX_AVAILABLE; ++i) { if (item == items[i]) { if (used[i]) { used[i] = false ; return true ; } else return false ; } } return false ; } }
实现
Semaphore 同样是基于 AQS 实现的。Sync 只是对 AQS 的一层封装,并且 Sync 有两个实现类(NonfairSync、FairSync),用来指定获取信号量时是否采用公平策略,默认采用非公平策略。相对而言比较简单,这里不赘述。把源码的文档注释删掉,其实也就是一百多行。
/**
 * Counting semaphore layered on {@link AbstractQueuedSynchronizer}: the
 * synchronizer state stores the number of currently available permits.
 */
public class Semaphore implements java.io.Serializable {
    private static final long serialVersionUID = -3222578661600680210L;

    /** All operations are delegated to this synchronizer. */
    private final Sync sync;

    /** Shared permit bookkeeping; subclasses only decide how a shared acquire is attempted. */
    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1192457210091910933L;

        Sync(int permits) {
            setState(permits);
        }

        final int getPermits() {
            return getState();
        }

        /**
         * Tries to take {@code acquires} permits without consulting the wait queue.
         *
         * @return the remaining permit count; negative when not enough permits were available
         */
        final int nonfairTryAcquireShared(int acquires) {
            int before;
            int after;
            do {
                before = getState();
                after = before - acquires;
                if (after < 0) {
                    return after;
                }
            } while (!compareAndSetState(before, after));
            return after;
        }

        /** Adds {@code releases} permits, retrying the CAS until it succeeds. */
        @Override
        protected final boolean tryReleaseShared(int releases) {
            int before;
            int after;
            do {
                before = getState();
                after = before + releases;
                if (after < before) {
                    // The sum wrapped around.
                    throw new Error("Maximum permit count exceeded");
                }
            } while (!compareAndSetState(before, after));
            return true;
        }

        /** Subtracts {@code reductions} permits, retrying the CAS until it succeeds. */
        final void reducePermits(int reductions) {
            int before;
            int after;
            do {
                before = getState();
                after = before - reductions;
                if (after > before) {
                    // The difference wrapped around.
                    throw new Error("Permit count underflow");
                }
            } while (!compareAndSetState(before, after));
        }

        /** Sets the permit count to zero and returns the count that was taken. */
        final int drainPermits() {
            int before;
            do {
                before = getState();
                if (before == 0) {
                    return 0;
                }
            } while (!compareAndSetState(before, 0));
            return before;
        }
    }

    /** Acquire strategy that never checks for queued threads. */
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -2694183684443567898L;

        NonfairSync(int permits) {
            super(permits);
        }

        @Override
        protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }
    }

    /** Acquire strategy that fails immediately while another thread is queued ahead. */
    static final class FairSync extends Sync {
        private static final long serialVersionUID = 2014338818796000944L;

        FairSync(int permits) {
            super(permits);
        }

        @Override
        protected int tryAcquireShared(int acquires) {
            int before;
            int after;
            do {
                if (hasQueuedPredecessors()) {
                    return -1;
                }
                before = getState();
                after = before - acquires;
                if (after < 0) {
                    return after;
                }
            } while (!compareAndSetState(before, after));
            return after;
        }
    }

    /** Creates a semaphore with the given permit count using the non-fair strategy. */
    public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }

    /** Creates a semaphore with the given permit count; {@code fair} selects the fair strategy. */
    public Semaphore(int permits, boolean fair) {
        if (fair) {
            sync = new FairSync(permits);
        } else {
            sync = new NonfairSync(permits);
        }
    }

    /** Rejects negative permit arguments. */
    private static void requireNonNegative(int value) {
        if (value < 0) {
            throw new IllegalArgumentException();
        }
    }

    // ---- single-permit operations ----

    public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    public void acquireUninterruptibly() {
        sync.acquireShared(1);
    }

    /** Uses the non-fair attempt regardless of which strategy the semaphore was built with. */
    public boolean tryAcquire() {
        int remaining = sync.nonfairTryAcquireShared(1);
        return remaining >= 0;
    }

    public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        return sync.tryAcquireSharedNanos(1, nanos);
    }

    public void release() {
        sync.releaseShared(1);
    }

    // ---- multi-permit operations; each throws IllegalArgumentException for a negative count ----

    public void acquire(int permits) throws InterruptedException {
        requireNonNegative(permits);
        sync.acquireSharedInterruptibly(permits);
    }

    public void acquireUninterruptibly(int permits) {
        requireNonNegative(permits);
        sync.acquireShared(permits);
    }

    /** Uses the non-fair attempt regardless of which strategy the semaphore was built with. */
    public boolean tryAcquire(int permits) {
        requireNonNegative(permits);
        int remaining = sync.nonfairTryAcquireShared(permits);
        return remaining >= 0;
    }

    public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException {
        requireNonNegative(permits);
        long nanos = unit.toNanos(timeout);
        return sync.tryAcquireSharedNanos(permits, nanos);
    }

    public void release(int permits) {
        requireNonNegative(permits);
        sync.releaseShared(permits);
    }

    // ---- inspection and maintenance ----

    public int availablePermits() {
        return sync.getPermits();
    }

    public int drainPermits() {
        return sync.drainPermits();
    }

    protected void reducePermits(int reduction) {
        requireNonNegative(reduction);
        sync.reducePermits(reduction);
    }

    public boolean isFair() {
        return sync instanceof FairSync;
    }

    public final boolean hasQueuedThreads() {
        return sync.hasQueuedThreads();
    }

    public final int getQueueLength() {
        return sync.getQueueLength();
    }

    protected Collection<Thread> getQueuedThreads() {
        return sync.getQueuedThreads();
    }

    @Override
    public String toString() {
        return super.toString() + "[Permits = " + sync.getPermits() + "]";
    }
}