Java 中 的 Semaphore (信号量) 是多线程编程中一种重要的同步工具, 用于控制对共享资源的访问。
通过 Semaphore, 我们可以限制同时访问共享资源的线程数量, 有效地管理并发访问, 确保程序在多线程环境下的稳定性和效率。
在一些资源有限制场景下, Semaphore 是特别合适的, 比如流量控制, 数据库连接池等。
1 Semaphore 的构造方法
public class Semaphore implements java.io.Serializable {
public Semaphore(int permits) {
// permits 同时可以有多少资源可以获取, 默认的实现为非公平实现
sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
// 参数 2 可以配置是否为公平实现
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
}
通过上面的构造函数可以看出, Semaphore 最终还是依靠 AQS 实现的, 关于 AQS 就不展开了。
核心的思想就是传入一个整数, 表示资源的数量, 同时提供了一个布尔值, 来决定资源的竞争是否按照公平原则。
2 Semaphore 的主要方法
方法名 | 说明 |
---|---|
void acquire() throws InterruptedException |
获取一个资源,如果无法获取到, 则阻塞等待直至能够获取为止 |
void acquire(int permits) throws InterruptedException |
同 acquire 方法功能基本一样, 只不过该方法可以一次性获取多个资源 |
void release() |
释放一个资源 |
void release(int permits) |
同 release 方法功能基本一样, 只不过该方法可以一次性释放多个资源 |
boolean tryAcquire() |
尝试获取一个资源, 如果能够获取成功则立即返回 true, 否则, 则返回 false |
boolean tryAcquire(int permits) |
与 tryAcquire 方法一致, 只不过这里可以指定获取多个资源 |
boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException |
尝试获取一个资源, 如果能够立即获取到或者在指定时间内能够获取到, 则返回 true, 否则返回 false |
boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException |
与上一个方法一致, 只不过这里能够获取多个资源 |
int availablePermits() |
返回当前可用的资源个数 |
int getQueueLength() |
返回正在等待获取资源的线程数 |
boolean hasQueuedThreads() |
是否有线程正在等待获取资源 |
Collection<Thread> getQueuedThreads() |
获取所有正在等待资源的线程集合 |
下面用一个简单的例子来说明 Semaphore 的具体使用。 我们来模拟这样一样场景。
有一天, 班主任需要班上 10 个同学到讲台上来填写一个表格, 但是老师只准备了 5 支笔, 因此, 只能保证同时只有 5
个同学能够拿到笔并填写表格,
没有获取到笔的同学只能够等前面的同学用完之后, 才能拿到笔去填写表格。该示例代码如下:
public class SemaphoreDemo {
// 表示老师只有 10 支笔
private static Semaphore semaphore = new Semaphore(5);
public static void main(String[] args) {
// 表示 10 个学生
ExecutorService service = Executors.newFixedThreadPool(10);
for (int i = 0; i < 10; i++) {
service.execute(() -> {
try {
System.out.println(Thread.currentThread().getName() + " 同学准备获取笔......");
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + " 同学获取到笔");
System.out.println(Thread.currentThread().getName() + " 填写表格ing.....");
TimeUnit.SECONDS.sleep(3);
semaphore.release();
System.out.println(Thread.currentThread().getName() + " 填写完表格, 归还了笔!!!!!!");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
service.shutdown();
}
}
输出结果
pool-1-thread-1 同学准备获取笔......
pool-1-thread-1 同学获取到笔
pool-1-thread-1 填写表格ing.....
pool-1-thread-2 同学准备获取笔......
pool-1-thread-2 同学获取到笔
pool-1-thread-2 填写表格ing.....
pool-1-thread-3 同学准备获取笔......
pool-1-thread-4 同学准备获取笔......
pool-1-thread-3 同学获取到笔
pool-1-thread-4 同学获取到笔
pool-1-thread-4 填写表格ing.....
pool-1-thread-3 填写表格ing.....
pool-1-thread-5 同学准备获取笔......
pool-1-thread-5 同学获取到笔
pool-1-thread-5 填写表格ing.....
pool-1-thread-6 同学准备获取笔......
pool-1-thread-7 同学准备获取笔......
pool-1-thread-8 同学准备获取笔......
pool-1-thread-9 同学准备获取笔......
pool-1-thread-10 同学准备获取笔......
pool-1-thread-4 填写完表格, 归还了笔!!!!!!
pool-1-thread-9 同学获取到笔
pool-1-thread-9 填写表格ing.....
pool-1-thread-5 填写完表格, 归还了笔!!!!!!
pool-1-thread-7 同学获取到笔
pool-1-thread-7 填写表格ing.....
pool-1-thread-8 同学获取到笔
pool-1-thread-8 填写表格ing.....
pool-1-thread-1 填写完表格, 归还了笔!!!!!!
pool-1-thread-6 同学获取到笔
pool-1-thread-6 填写表格ing.....
pool-1-thread-3 填写完表格, 归还了笔!!!!!!
pool-1-thread-2 填写完表格, 归还了笔!!!!!!
pool-1-thread-10 同学获取到笔
pool-1-thread-10 填写表格ing.....
pool-1-thread-7 填写完表格, 归还了笔!!!!!!
pool-1-thread-9 填写完表格, 归还了笔!!!!!!
pool-1-thread-8 填写完表格, 归还了笔!!!!!!
pool-1-thread-6 填写完表格, 归还了笔!!!!!!
pool-1-thread-10 填写完表格, 归还了笔!!!!!!
根据输出结果进行分析, Semaphore 允许的最大资源为 5, 也就是允许的最大并发执行的线程个数为 5, 可以看出, 前 5 个线程(前 5 个学生)先获取到笔, 然后填写表格。
而 6-10 这 5 个线程, 由于获取不到资源, 只能阻塞等待。当线程 pool-1-thread-4 释放了资源后, pool-1-thread-9 就可以获取到许可, 继续往下执行,
对其他线程的执行过程, 也是同样的道理。 从这个例子就可以看出, Semaphore 用来做特殊资源的并发访问控制是相当合适的, 如果有业务场景需要进行流量控制, 可以优先考虑 Semaphore。
3 Semaphore 的源码实现
Semaphore 内部是通过 AQS 的共享锁实现的, 所以只要理解了 Semaphore 的同步器, 基本就能了解大体的实现了。
3.1 Semaphore 中的 同步器
public class Semaphore implements java.io.Serializable {
/**
* 内部定义的同步器
*/
abstract static class Sync extends AbstractQueuedSynchronizer {
Sync(int permits) {
setState(permits);
}
final int nonfairTryAcquireShared(int acquires) {
// 非公平的获取锁
for (;;) {
// 当前的状态
int available = getState();
// 当前的状态 - 需要的状态, 得到剩下的状态
int remaining = available - acquires;
// remaining 小于 0 或者通 cas 设置为新的状态
if (remaining < 0 || compareAndSetState(available, remaining))
return remaining;
}
}
protected final boolean tryReleaseShared(int releases) {
// 共享锁的释放
for (;;) {
// 当前的状态
int current = getState();
// 增加释放的值
int next = current + releases;
// 增加后的值还是小于当前的状态值, 抛出异常
if (next < current)
throw new Error("Maximum permit count exceeded");
// 通过 cas 设置, 成功了释放锁成功
if (compareAndSetState(current, next))
return true;
}
}
final void reducePermits(int reductions) {
// 减少许可证, 即减少状态值
for (;;) {
// 获取当前的状态
int current = getState();
// 计算出新的状态值
int next = current - reductions;
// 新的状态值大于当前的状态值
if (next > current)
throw new Error("Permit count underflow");
// cas 交换
if (compareAndSetState(current, next))
return;
}
}
final int drainPermits() {
// 将当前的状态值设置为 0
for (;;) {
// 获取当前的状态值
int current = getState();
// 当前的状态值等于 0 了
// 或者通过 cas 将当前的状态值设置为 0
if (current == 0 || compareAndSetState(current, 0))
return current;
}
}
}
/**
* 非公平锁的实现
* 很简单, 全部都是直接复用 Sync 的方法
*/
static final class NonfairSync extends Sync {
NonfairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
/**
* 公平锁的实现
*/
static final class FairSync extends Sync {
// 尝试获取共享锁
// 这里是公平锁的实现, 而非公平锁的实现, 就是直接调用 nonfairTryAcquireShared 方法
// FairSync 和 NonfairSync 只是在 Sync 的基础上重写了这个方法, 没有其他的改变了
protected int tryAcquireShared(int acquires) {
for (;;) {
// 调用 AbstractQueuedSynchronizer 的 hasQueuedPredecessors 方法,
// 判断当前同步队列中是否有符合条件的候选节点, 即同步队列中有没有状态不是取消状态的节点,
// 有的话, 返回 -1, 尝试获取锁失败
if (hasQueuedPredecessors())
return -1;
// 获取可用的状态
int available = getState();
// 可用的状态 - 需要的状态, 得到剩下的状态
int remaining = available - acquires;
// 如果剩余的状态小于 0 了 获取通过 cas 设置新的状态失败
if (remaining < 0 || compareAndSetState(available, remaining))
// 返回剩余的状态
return remaining;
}
}
}
}
3.2 Semaphore 的 acquire 方法
public class Semaphore implements java.io.Serializable {
public void acquire() throws InterruptedException {
// 1. 先调用到 AbstractQueuedSynchronizer 的 acquireSharedInterruptibly
// 2. 在 AQS 的 acquireSharedInterruptibly 中先通过 Semaphore 自定义的 Sync 的 tryAcquireShared() 方法判断是否可以获取锁
// 在 tryAcquireShared 方法获取当前的状态值, 通过当前的状态值 - 需要获取的状态值, 得到剩余的状态值
// 如果剩余的状态值小于 0, 否则通过 cas 交换当前的状态值为剩余值
// 最后返回剩余值
// 3. 获取锁失败后, 会加入同步队列, 等待唤醒
sync.acquireSharedInterruptibly(1);
}
}
3.3 Semaphore 的 release 方法
public class Semaphore implements java.io.Serializable {
public void release() {
// 1. 先调用到 AbstractQueuedSynchronizer 的 releaseShared
// 2. 在 AQS 的 releaseShared 中先通过 CountDownLatch 自定义的 Sync 的 tryReleaseShared() 方法判断是否可以释放锁
// 在 tryReleaseShared 方法中, 获取到当前的状态值, 当前的状态值 + 释放的状态值, 得到最新的状态值
// 通过 cas 设置当前的状态值为最新的状态值, 释放锁成功
sync.releaseShared(1);
}
}
几乎所有的方法都是基于同步器 AQS 实现的, 所有理解了 AQS 的实现, Semaphore 的实现也就不难理解了。
至于其他的方法, 比如 tryAcquire, tryRelease, availablePermits 等方法也都是同样的思想, 这里就不再赘述了。