Java Semaphore


Java 中 的 Semaphore (信号量) 是多线程编程中一种重要的同步工具, 用于控制对共享资源的访问。
通过 Semaphore, 我们可以限制同时访问共享资源的线程数量, 有效地管理并发访问, 确保程序在多线程环境下的稳定性和效率。
在一些资源有限制场景下, Semaphore 是特别合适的, 比如流量控制, 数据库连接池等。

1 Semaphore 的构造方法

public class Semaphore implements java.io.Serializable {

    public Semaphore(int permits) {
        // permits 同时可以有多少资源可以获取, 默认的实现为非公平实现
        sync = new NonfairSync(permits);
    }

    public Semaphore(int permits, boolean fair) {
        // 参数 2 可以配置是否为公平实现
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }
}

通过上面的构造函数可以看出, Semaphore 最终还是依靠 AQS 实现的, 关于 AQS 就不展开了。
核心的思想就是传入一个整数, 表示资源的数量, 同时提供了一个布尔值, 来决定资源的竞争是否按照公平原则。

2 Semaphore 的主要方法

方法名 说明
void acquire() throws InterruptedException 获取一个资源,如果无法获取到, 则阻塞等待直至能够获取为止
void acquire(int permits) throws InterruptedException 同 acquire 方法功能基本一样, 只不过该方法可以一次性获取多个资源
void release() 释放一个资源
void release(int permits) 同 release 方法功能基本一样, 只不过该方法可以一次性释放多个资源
boolean tryAcquire() 尝试获取一个资源, 如果能够获取成功则立即返回 true, 否则, 则返回 false
boolean tryAcquire(int permits) 与 tryAcquire 方法一致, 只不过这里可以指定获取多个资源
boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException 尝试获取一个资源, 如果能够立即获取到或者在指定时间内能够获取到, 则返回 true, 否则返回 false
boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException 与上一个方法一致, 只不过这里能够获取多个资源
int availablePermits() 返回当前可用的资源个数
int getQueueLength() 返回正在等待获取资源的线程数
boolean hasQueuedThreads() 是否有线程正在等待获取资源
Collection<Thread> getQueuedThreads() 获取所有正在等待资源的线程集合

下面用一个简单的例子来说明 Semaphore 的具体使用。 我们来模拟这样一样场景。
有一天, 班主任需要班上 10 个同学到讲台上来填写一个表格, 但是老师只准备了 5 支笔, 因此, 只能保证同时只有 5
个同学能够拿到笔并填写表格,
没有获取到笔的同学只能够等前面的同学用完之后, 才能拿到笔去填写表格。该示例代码如下:

public class SemaphoreDemo {

    // 表示老师只有 10 支笔
    private static Semaphore semaphore = new Semaphore(5);

    public static void main(String[] args) {

        // 表示 10 个学生
        ExecutorService service = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 10; i++) {
            service.execute(() -> {
                try {
                    System.out.println(Thread.currentThread().getName() + "  同学准备获取笔......");
                    semaphore.acquire();
                    System.out.println(Thread.currentThread().getName() + "  同学获取到笔");
                    System.out.println(Thread.currentThread().getName() + "  填写表格ing.....");
                    TimeUnit.SECONDS.sleep(3);
                    semaphore.release();
                    System.out.println(Thread.currentThread().getName() + "  填写完表格, 归还了笔!!!!!!");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
        service.shutdown();
    }
}

输出结果

pool-1-thread-1  同学准备获取笔......
pool-1-thread-1  同学获取到笔
pool-1-thread-1  填写表格ing.....
pool-1-thread-2  同学准备获取笔......
pool-1-thread-2  同学获取到笔
pool-1-thread-2  填写表格ing.....
pool-1-thread-3  同学准备获取笔......
pool-1-thread-4  同学准备获取笔......
pool-1-thread-3  同学获取到笔
pool-1-thread-4  同学获取到笔
pool-1-thread-4  填写表格ing.....
pool-1-thread-3  填写表格ing.....
pool-1-thread-5  同学准备获取笔......
pool-1-thread-5  同学获取到笔
pool-1-thread-5  填写表格ing.....
pool-1-thread-6  同学准备获取笔......
pool-1-thread-7  同学准备获取笔......
pool-1-thread-8  同学准备获取笔......
pool-1-thread-9  同学准备获取笔......
pool-1-thread-10  同学准备获取笔......

pool-1-thread-4  填写完表格, 归还了笔!!!!!!
pool-1-thread-9  同学获取到笔
pool-1-thread-9  填写表格ing.....
pool-1-thread-5  填写完表格, 归还了笔!!!!!!
pool-1-thread-7  同学获取到笔
pool-1-thread-7  填写表格ing.....
pool-1-thread-8  同学获取到笔
pool-1-thread-8  填写表格ing.....
pool-1-thread-1  填写完表格, 归还了笔!!!!!!
pool-1-thread-6  同学获取到笔
pool-1-thread-6  填写表格ing.....
pool-1-thread-3  填写完表格, 归还了笔!!!!!!
pool-1-thread-2  填写完表格, 归还了笔!!!!!!
pool-1-thread-10  同学获取到笔
pool-1-thread-10  填写表格ing.....
pool-1-thread-7  填写完表格, 归还了笔!!!!!!
pool-1-thread-9  填写完表格, 归还了笔!!!!!!
pool-1-thread-8  填写完表格, 归还了笔!!!!!!
pool-1-thread-6  填写完表格, 归还了笔!!!!!!
pool-1-thread-10  填写完表格, 归还了笔!!!!!!

根据输出结果进行分析, Semaphore 允许的最大资源为 5, 也就是允许的最大并发执行的线程个数为 5, 可以看出, 前 5 个线程(前 5 个学生)先获取到笔, 然后填写表格。
而 6-10 这 5 个线程, 由于获取不到资源, 只能阻塞等待。当线程 pool-1-thread-4 释放了资源后, pool-1-thread-9 就可以获取到许可, 继续往下执行,
对其他线程的执行过程, 也是同样的道理。 从这个例子就可以看出, Semaphore 用来做特殊资源的并发访问控制是相当合适的, 如果有业务场景需要进行流量控制, 可以优先考虑 Semaphore。

3 Semaphore 的源码实现

Semaphore 内部是通过 AQS 的共享锁实现的, 所以只要理解了 Semaphore 的同步器, 基本就能了解大体的实现了。

3.1 Semaphore 中的 同步器

public class Semaphore implements java.io.Serializable {

    /**
     * 内部定义的同步器
     */
    abstract static class Sync extends AbstractQueuedSynchronizer {

        Sync(int permits) {
            setState(permits);
        }

        final int nonfairTryAcquireShared(int acquires) {
            // 非公平的获取锁
            for (;;) {
            
                // 当前的状态
                int available = getState();
                // 当前的状态 - 需要的状态, 得到剩下的状态
                int remaining = available - acquires;
                // remaining 小于 0 或者通 cas 设置为新的状态
                if (remaining < 0 || compareAndSetState(available, remaining))
                    return remaining;
            }
        }

        protected final boolean tryReleaseShared(int releases) {
            // 共享锁的释放
            for (;;) {
                // 当前的状态
                int current = getState();
                // 增加释放的值
                int next = current + releases;
                // 增加后的值还是小于当前的状态值, 抛出异常
                if (next < current) 
                    throw new Error("Maximum permit count exceeded");
                // 通过 cas 设置, 成功了释放锁成功    
                if (compareAndSetState(current, next))
                    return true;
            }
        }

        final void reducePermits(int reductions) {
            // 减少许可证, 即减少状态值
            for (;;) {
                // 获取当前的状态
                int current = getState();
                // 计算出新的状态值
                int next = current - reductions;
                // 新的状态值大于当前的状态值
                if (next > current)
                    throw new Error("Permit count underflow");
                // cas 交换    
                if (compareAndSetState(current, next))
                    return;
            }
        }

        final int drainPermits() {
            // 将当前的状态值设置为 0 
            for (;;) {
                // 获取当前的状态值
                int current = getState();
                // 当前的状态值等于 0 了
                // 或者通过 cas 将当前的状态值设置为 0
                if (current == 0 || compareAndSetState(current, 0))
                    return current;
            }
        }
    }

    /**
     * 非公平锁的实现
     * 很简单, 全部都是直接复用 Sync 的方法
     */
    static final class NonfairSync extends Sync {

        NonfairSync(int permits) {
            super(permits);
        }

        protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }
    }

    /**
     * 公平锁的实现
     */
    static final class FairSync extends Sync {

        // 尝试获取共享锁
        // 这里是公平锁的实现, 而非公平锁的实现, 就是直接调用 nonfairTryAcquireShared 方法
        // FairSync 和 NonfairSync 只是在 Sync 的基础上重写了这个方法, 没有其他的改变了
        protected int tryAcquireShared(int acquires) {
            for (;;) {

                // 调用 AbstractQueuedSynchronizer 的 hasQueuedPredecessors 方法, 
                // 判断当前同步队列中是否有符合条件的候选节点, 即同步队列中有没有状态不是取消状态的节点, 
                // 有的话, 返回 -1, 尝试获取锁失败
                if (hasQueuedPredecessors())
                    return -1;
                // 获取可用的状态    
                int available = getState();
                // 可用的状态 - 需要的状态, 得到剩下的状态
                int remaining = available - acquires;
                // 如果剩余的状态小于 0 了 获取通过 cas 设置新的状态失败
                if (remaining < 0 || compareAndSetState(available, remaining))
                    // 返回剩余的状态
                    return remaining;
            }
        }
    }
    
}

3.2 Semaphore 的 acquire 方法

public class Semaphore implements java.io.Serializable {

    public void acquire() throws InterruptedException {

        // 1. 先调用到 AbstractQueuedSynchronizer 的 acquireSharedInterruptibly
        // 2. 在 AQS 的 acquireSharedInterruptibly 中先通过 Semaphore 自定义的 Sync 的 tryAcquireShared() 方法判断是否可以获取锁
        // 在 tryAcquireShared 方法获取当前的状态值, 通过当前的状态值 - 需要获取的状态值, 得到剩余的状态值
        // 如果剩余的状态值小于 0, 否则通过 cas 交换当前的状态值为剩余值
        // 最后返回剩余值
        // 3. 获取锁失败后, 会加入同步队列, 等待唤醒
        sync.acquireSharedInterruptibly(1);
    }
}

3.3 Semaphore 的 release 方法

public class Semaphore implements java.io.Serializable {

    public void release() {

        // 1. 先调用到 AbstractQueuedSynchronizer 的 releaseShared
        // 2. 在 AQS 的 releaseShared 中先通过 CountDownLatch 自定义的 Sync 的 tryReleaseShared() 方法判断是否可以释放锁
        // 在 tryReleaseShared 方法中, 获取到当前的状态值, 当前的状态值 + 释放的状态值, 得到最新的状态值
        // 通过 cas 设置当前的状态值为最新的状态值, 释放锁成功
        sync.releaseShared(1);
    }
}

几乎所有的方法都是基于同步器 AQS 实现的, 所有理解了 AQS 的实现, Semaphore 的实现也就不难理解了。
至于其他的方法, 比如 tryAcquire, tryRelease, availablePermits 等方法也都是同样的思想, 这里就不再赘述了。

4 参考

大白话说Java并发工具类-Semaphore, Exchanger


  目录