JohnShen's Blog.

[回顾并发基础] 常用并发工具类

字数统计: 1.2k阅读时长: 6 min
2019/09/22 Share

Semaphore

Semaphore(信号量)用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理地使用公共资源。

Semaphore 有 Lock 不易实现的功能:Semaphore 可以允许多个线程访问一个临界区

1
2
3
4
5
6
7
8
9
10
11
12
public Semaphore(int permits);
public Semaphore(int permits, boolean fair);
public void acquire() throws InterruptedException;
public void acquireUninterruptibly();
public void acquire(int permits) throws InterruptedException;
public void acquireUninterruptibly(int permits);
public boolean tryAcquire();
public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException;
public boolean tryAcquire(int permits);
public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException;
public void release();
public void release(int permits);

其他方法:

1
2
3
4
5
public int availablePermits(); // 返回此信号量中当前可用的许可证
public final int getQueueLength(); // 返回正在等待获取许可证的线程数
public final boolean hasQueuedThreads(); // 是否有线程正在等待获取许可证
protected void reducePermits(int reduction); // 减少reduction个许可证
protected Collection<Thread> getQueuedThreads(); // 返回所有等待获取许可证的线程集合

应用示例

示例一:共10个线程,每次只有两个线程可以获取到资源。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
 * Demonstrates throttling with a {@link Semaphore}: {@code THREAD_COUNT} tasks are submitted,
 * but the semaphore holds only two permits, so at most two tasks run the guarded section
 * at the same time.
 */
public class SemaphoreTest {
    private static final int THREAD_COUNT = 10;
    private static final ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT);
    private static final Semaphore s = new Semaphore(2);

    /**
     * Submits {@code THREAD_COUNT} tasks that each take a permit, simulate two seconds of work,
     * print {@code "save data"}, and hand the permit back.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        for (int i = 0; i < THREAD_COUNT; i++) {
            threadPool.execute(() -> {
                try {
                    s.acquire();
                } catch (InterruptedException e) {
                    // No permit was obtained, so there is nothing to release.
                    Thread.currentThread().interrupt();
                    return;
                }
                try {
                    TimeUnit.SECONDS.sleep(2);
                    System.out.println("save data");
                } catch (InterruptedException e) {
                    // Restore the flag so the pool can observe the interruption.
                    Thread.currentThread().interrupt();
                } finally {
                    // Always return the permit, even if the work was interrupted.
                    s.release();
                }
            });
        }
        threadPool.shutdown();
    }
}

示例二:Semaphore的许可量为0,后两个线程分别release,使得acquire(2)的线程可以继续下去。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
 * Demonstrates using a {@link Semaphore} that starts with zero permits as a completion signal:
 * the main thread blocks in {@code acquire(2)} until two worker tasks have each released
 * one permit.
 */
public class SemaphoreTest {
    private static final Semaphore semaphore = new Semaphore(0);

    /**
     * Runs two worker tasks and waits for both of them to finish.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while waiting for permits
     */
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        try {
            Runnable worker = () -> {
                try {
                    System.out.println(Thread.currentThread() + " over ");
                } finally {
                    // Release unconditionally; otherwise a failing task would leave
                    // the main thread blocked in acquire(2) forever.
                    semaphore.release();
                }
            };
            executorService.submit(worker);
            executorService.submit(worker);

            // Needs both permits, i.e. both workers must have released.
            semaphore.acquire(2);
            System.out.println("all child thread over ");
        } finally {
            // Shut the pool down even when acquire(2) is interrupted so the JVM can exit.
            executorService.shutdown();
        }
    }
}

示例三:官方示例

Semaphores are often used to restrict the number of threads that can access some (physical or logical) resource. For example, here is a class that uses a semaphore to control access to a pool of items:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
// Hands out items from a fixed set of MAX_AVAILABLE slots. A semaphore with one permit
// per slot makes getItem() block while every slot is checked out.
class Pool {
// Number of slots, and therefore the number of permits.
private static final int MAX_AVAILABLE = 100;
// One permit per slot; the second constructor argument (true) selects the fair variant.
private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);

// Takes one permit (blocking until one is available), then returns the item picked by
// getNextAvailableItem(). Throws InterruptedException if interrupted while waiting.
public Object getItem() throws InterruptedException {
available.acquire();
return getNextAvailableItem();
}

// Returns an item to the pool. A permit is released only when markAsUnused(x) reports
// that x was actually checked out, so unknown or already-returned objects add no permits.
public void putItem(Object x) {
if (markAsUnused(x))
available.release();
}

// Not a particularly efficient data structure; just for demo

// NOTE: the initializer below is a placeholder, not valid Java; replace it with the
// real array of pooled objects. The loops below read items[0 .. MAX_AVAILABLE-1].
protected Object[] items = ... whatever kinds of items being managed
// used[i] is true while items[i] is checked out.
protected boolean[] used = new boolean[MAX_AVAILABLE];

// Marks the first slot whose used flag is false as used and returns its item.
// Synchronized on this Pool, as is markAsUnused.
protected synchronized Object getNextAvailableItem() {
for (int i = 0; i < MAX_AVAILABLE; ++i) {
if (!used[i]) {
used[i] = true;
return items[i];
}
}
// Falls through only if every slot is marked used; presumably unreachable because
// getItem() acquires a permit first (TODO confirm no other callers).
return null; // not reached
}

// Clears the used flag for the slot holding this exact object (identity comparison).
// Returns true only if that slot was marked used; returns false if it was already
// free or if the object is not one of the pooled items.
protected synchronized boolean markAsUnused(Object item) {
for (int i = 0; i < MAX_AVAILABLE; ++i) {
if (item == items[i]) {
if (used[i]) {
used[i] = false;
return true;
} else
// Found the item, but it was not checked out.
return false;
}
}
// Not one of the pooled items.
return false;
}
}

实现

Semaphore 还是使用 AQS 实现的。Sync 只是对 AQS 的一个修饰,并且 Sync 有两个实现类(NonfairSync,FairSync),用来指定获取信号量时是否采用公平策略,默认采用非公平策略。相对而言,比较简单,这里不赘述。把源码的文档注释删掉,其实也就是一百多行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
/**
 * Counting semaphore layered on {@link AbstractQueuedSynchronizer}. The synchronizer state
 * is used as the number of permits currently available; acquiring subtracts from it and
 * releasing adds to it.
 */
public class Semaphore implements java.io.Serializable {
    private static final long serialVersionUID = -3222578661600680210L;

    /** Synchronizer every public method delegates to; fair or non-fair, fixed at construction. */
    private final Sync sync;

    /**
     * Shared synchronizer logic. Subclasses only decide how {@code tryAcquireShared} behaves.
     */
    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1192457210091910933L;

        /** Stores the initial permit count in the synchronizer state. */
        Sync(int permits) {
            setState(permits);
        }

        /** Returns the current state, i.e. the available permit count. */
        final int getPermits() {
            return getState();
        }

        /**
         * Tries to subtract {@code acquires} from the state without consulting the wait queue.
         *
         * @return the permit count left after the attempt; negative means not enough permits
         *         were available and the state was left untouched
         */
        final int nonfairTryAcquireShared(int acquires) {
            while (true) {
                final int before = getState();
                final int after = before - acquires;
                if (after < 0) {
                    return after;
                }
                if (compareAndSetState(before, after)) {
                    return after;
                }
                // Lost the CAS race: re-read the state and retry.
            }
        }

        /**
         * Adds {@code releases} to the state, retrying until the CAS succeeds.
         *
         * @return always {@code true}
         * @throws Error if the addition wraps around
         */
        @Override
        protected final boolean tryReleaseShared(int releases) {
            int before;
            int after;
            do {
                before = getState();
                after = before + releases;
                if (after < before) { // overflow
                    throw new Error("Maximum permit count exceeded");
                }
            } while (!compareAndSetState(before, after));
            return true;
        }

        /**
         * Subtracts {@code reductions} from the state, retrying until the CAS succeeds.
         *
         * @throws Error if the subtraction wraps around
         */
        final void reducePermits(int reductions) {
            int before;
            int after;
            do {
                before = getState();
                after = before - reductions;
                if (after > before) { // underflow
                    throw new Error("Permit count underflow");
                }
            } while (!compareAndSetState(before, after));
        }

        /**
         * Sets the state to zero and returns the value it replaced. A state that is
         * already zero is returned as-is without a CAS.
         */
        final int drainPermits() {
            int held;
            do {
                held = getState();
            } while (held != 0 && !compareAndSetState(held, 0));
            return held;
        }
    }

    /** Variant whose acquire attempts never look at the wait queue. */
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -2694183684443567898L;

        NonfairSync(int permits) {
            super(permits);
        }

        @Override
        protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }
    }

    /** Variant that refuses to acquire while {@code hasQueuedPredecessors()} is true. */
    static final class FairSync extends Sync {
        private static final long serialVersionUID = 2014338818796000944L;

        FairSync(int permits) {
            super(permits);
        }

        /**
         * Same CAS loop as the non-fair path, except that each iteration first checks
         * {@code hasQueuedPredecessors()} and gives up with {@code -1} when it is true.
         */
        @Override
        protected int tryAcquireShared(int acquires) {
            while (true) {
                if (hasQueuedPredecessors()) {
                    return -1;
                }
                final int before = getState();
                final int after = before - acquires;
                if (after < 0) {
                    return after;
                }
                if (compareAndSetState(before, after)) {
                    return after;
                }
            }
        }
    }

    /** Creates a non-fair semaphore with the given initial permit count. */
    public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }

    /**
     * Creates a semaphore with the given initial permit count.
     *
     * @param fair {@code true} to use {@code FairSync}, {@code false} to use {@code NonfairSync}
     */
    public Semaphore(int permits, boolean fair) {
        if (fair) {
            sync = new FairSync(permits);
        } else {
            sync = new NonfairSync(permits);
        }
    }

    /** Acquires one permit via {@code acquireSharedInterruptibly}. */
    public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    /** Acquires one permit via {@code acquireShared}. */
    public void acquireUninterruptibly() {
        sync.acquireShared(1);
    }

    /**
     * Attempts to take one permit without waiting. Always uses the non-fair path,
     * even when this semaphore was created as fair.
     *
     * @return {@code true} if a permit was taken
     */
    public boolean tryAcquire() {
        final int left = sync.nonfairTryAcquireShared(1);
        return left >= 0;
    }

    /** Attempts to take one permit via {@code tryAcquireSharedNanos} with the given timeout. */
    public boolean tryAcquire(long timeout, TimeUnit unit)
            throws InterruptedException {
        final long nanos = unit.toNanos(timeout);
        return sync.tryAcquireSharedNanos(1, nanos);
    }

    /** Releases one permit. */
    public void release() {
        sync.releaseShared(1);
    }

    /**
     * Acquires {@code permits} permits via {@code acquireSharedInterruptibly}.
     *
     * @throws IllegalArgumentException if {@code permits} is negative
     */
    public void acquire(int permits) throws InterruptedException {
        requireNonNegative(permits);
        sync.acquireSharedInterruptibly(permits);
    }

    /**
     * Acquires {@code permits} permits via {@code acquireShared}.
     *
     * @throws IllegalArgumentException if {@code permits} is negative
     */
    public void acquireUninterruptibly(int permits) {
        requireNonNegative(permits);
        sync.acquireShared(permits);
    }

    /**
     * Attempts to take {@code permits} permits without waiting, using the non-fair path.
     *
     * @return {@code true} if the permits were taken
     * @throws IllegalArgumentException if {@code permits} is negative
     */
    public boolean tryAcquire(int permits) {
        requireNonNegative(permits);
        final int left = sync.nonfairTryAcquireShared(permits);
        return left >= 0;
    }

    /**
     * Attempts to take {@code permits} permits via {@code tryAcquireSharedNanos}.
     *
     * @throws IllegalArgumentException if {@code permits} is negative
     */
    public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
            throws InterruptedException {
        requireNonNegative(permits);
        final long nanos = unit.toNanos(timeout);
        return sync.tryAcquireSharedNanos(permits, nanos);
    }

    /**
     * Releases {@code permits} permits.
     *
     * @throws IllegalArgumentException if {@code permits} is negative
     */
    public void release(int permits) {
        requireNonNegative(permits);
        sync.releaseShared(permits);
    }

    /** Returns the current number of available permits. */
    public int availablePermits() {
        return sync.getPermits();
    }

    /** Takes all currently available permits and returns how many were taken. */
    public int drainPermits() {
        return sync.drainPermits();
    }

    /**
     * Lowers the available permit count by {@code reduction}.
     *
     * @throws IllegalArgumentException if {@code reduction} is negative
     */
    protected void reducePermits(int reduction) {
        requireNonNegative(reduction);
        sync.reducePermits(reduction);
    }

    /** Returns {@code true} if this semaphore was built with {@code FairSync}. */
    public boolean isFair() {
        return sync instanceof FairSync;
    }

    /** Delegates to the synchronizer's {@code hasQueuedThreads()}. */
    public final boolean hasQueuedThreads() {
        return sync.hasQueuedThreads();
    }

    /** Delegates to the synchronizer's {@code getQueueLength()}. */
    public final int getQueueLength() {
        return sync.getQueueLength();
    }

    /** Delegates to the synchronizer's {@code getQueuedThreads()}. */
    protected Collection<Thread> getQueuedThreads() {
        return sync.getQueuedThreads();
    }

    /** Appends the current permit count to the default {@code Object} string form. */
    @Override
    public String toString() {
        final int permits = sync.getPermits();
        return super.toString() + "[Permits = " + permits + "]";
    }

    /** Shared argument check: rejects negative counts with a message-less exception. */
    private static void requireNonNegative(int count) {
        if (count < 0) {
            throw new IllegalArgumentException();
        }
    }
}
CATALOG
  1. 1. Semaphore
    1. 1.1. 应用示例
    2. 1.2. 实现