使用递增计数器的线程同步工具——信号量,它的原理了解么?
haoteby 2025-07-08 18:08 6 浏览
前言
" 在 JUC 中线程同步器除了 CountDownLatch 和 CycleBarrier ,还有一个叫做 Semaphore (信号量),同样是基于 AQS 实现的。下面来看看信号量的内部原理。 "
1
介绍
一个计数信号量。 从概念上讲,信号量维护了一组许可。 如果有必要,在许可可用之前调用 acquire 方法会被阻塞,直到许可证可用。 调用 release 方法会增加了一个许可证,从而释放被阻塞的线程。
- 声明时指定初始许可数量。
- 调用 acquire(int permits) 方法,指定目标许可数量。
- 调用 release(int permits) 方法,发布指定的许可数量。
在许可数量没有到达指定目标数量时,调用 acquire 方法的线程会被阻塞。
基本使用
public class SemaphoreTest1 {
private static final Semaphore SEMAPHORE = new Semaphore(0);
public static void main(String[] args) throws InterruptedException {
ExecutorService pool = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(1024),
new ThreadFactoryBuilder().setNameFormat("Thread-pool-%d").build(),
new ThreadPoolExecutor.AbortPolicy());
for (int i = 0; i < 5; i++) {
pool.submit(() -> {
try {
Thread.sleep(1000 + new Random().nextInt(1000));
} catch (InterruptedException ignored) {
}
System.out.println("当前线程: " + Thread.currentThread().getName() + " 发布一个许可");
SEMAPHORE.release(1);
});
}
System.out.println("-----> 这里是主线程");
SEMAPHORE.acquire(5);
System.out.println("-----> 主线程执行完毕");
pool.shutdown();
}
}
-----> 这里是主线程
当前线程: Thread-pool-2 发布一个许可
当前线程: Thread-pool-4 发布一个许可
当前线程: Thread-pool-1 发布一个许可
当前线程: Thread-pool-0 发布一个许可
当前线程: Thread-pool-3 发布一个许可
-----> 主线程执行完毕
上面这个方法也是模拟了类似 CountDownLatch 的用法, 在子线程执行完毕之后,主线程继续执行。只不过 Semaphore 和 CountDownLatch 区别最大的是:
Semaphore 是从指定数值开始增加,直到到达许可数量,然后被阻塞线程开始继续执行。
CountDownLatch 是从指定数量的线程开始减少,直到为 0 时,被阻塞的线程开始继续执行。
当然这只是最简单的用法,除此让主线程等待,同样也可以让其他线程等待,然后再开始执行。
问题疑问
- Semaphore 和 AQS 有什么关系?
- Semaphore 和 CountDownLatch 有什么区别?
2
源码分析
基本结构
通过类图可以看出在 Semaphore 里面有一个静态内部类 Sync 继承了 AQS,同时为了区分公平和非公平的情况,Sync 分别有两个子类:NonfairSync 、FairSync。
下面根据案例分别从构造函数、acquire()、release() 入手,从而了解内部实现原理。
初始化
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
初始化默认非公平锁, 同时需要传入指定许可数, 可以看到这块代码是调用的 AQS 的 setState(permits) 方法。代码如下:
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;
NonfairSync(int permits) {
super(permits);
}
}
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1192457210091910933L;
Sync(int permits) {
setState(permits);
}
}
setState 方法其实就是对 AQS 的 state 进行赋值。
" 补充
在 ReentrantLock 中 state 代表加锁状态,0 没有线程获得锁,大于等于 1 已经有线程获得锁,大于 1 说明该获得锁的线程多次重入。
在 ReentrantReadWriteLock 中 state 代表锁的状态。state 为 0 ,没有线程持有锁,state 的高 16 为代表读锁状态,低 16 为代表写锁状态。通过位运算可以获取读写锁的实际值。
而在这里 (CountDownLatch)则代表门闩或者说计数的值。"
如果对 state 有所遗忘,可以阅读前面的 AQS 、CAS 相关代码。state 在这里代表的是信号量的许可数量。
acquire()
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public void acquire(int permits) throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireSharedInterruptibly(permits);
}
acquire() 和 acquire(int permits) 调用的都是 sync.acquireSharedInterruptibly(permits) 方法,只不过一个支持传递参数,一个默认为 1。
acquireSharedInterruptibly 方法,其实就是 Sync 继承自 AQS 的。
这块可以阅读 AQS 的文章,这里简单介绍下:
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
- 在失败后会使用 doAcquireSharedInterruptibly(arg); 不断获取资源;
- final Node node = addWaiter(Node.SHARED); 会创建节点以共享模式放到队列里;
- 在循环中不断判断前一个节点,如果是 head,则尝试获取共享资源;
- 在共享模式下获取到资源后会使用 setHeadAndPropagate(node, r); 设置头节点,同时唤醒后续节点。
tryAcquireShared 是需要子类实现,也就是在 Semaphore.Sync 的实现类中实现了,这里以 FairSync 做讲解:
static final class FairSync extends Sync {
private static final long serialVersionUID = 2014338818796000944L;
FairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
for (;;) {
// 如果前面有节点,则直接返回 -1 表示失败
if (hasQueuedPredecessors())
return -1;
// 获取当前信号量
int available = getState();
// 获取当前剩余量
int remaining = available - acquires;
// 如果小于 0 或者 CAS 设置信号量成功 则直接返回
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}
而这段代码的含义:
- 如果前面有节点,则直接阻塞;
- 如果当前剩余信号量小于 0 ,则返回负值,直接阻塞;
- 如果当前剩余量大于等于 0 ,会 CAS 更新信号量,并返回非负数。
" 这块数值的含义,在 AQS 中定义了,含义如下:
小于 0: 表示失败;
等于 0: 表示共享模式获取资源成功,但后续的节点不能以共享模式获取成功;
大于 0: 表示共享模式获取资源成功,后续节点在共享模式获取也可能会成功,在这种情况下,后续等待线程必须检查可用性。"
releas()
public void release() {
sync.releaseShared(1);
}
public void release(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.releaseShared(permits);
}
发布许可证的给定数量,该数量增加可用的许可数量。看其内部调用的是 Sync 的 releaseShared, 其实就是 AQS 的对应方法:
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
如果实现tryReleaseShared返回true,以共享模式释放资源。其中的 tryReleaseShared 部分由 Semaphore.Sync 中实现,逻辑如下:
protected final boolean tryReleaseShared(int releases) {
for (;;) {
// 获取当前 state
int current = getState();
// 对 state 进行增加
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
// 使用 CAS 赋值
if (compareAndSetState(current, next))
return true;
}
}
通过上面代码可以看出,在 Semaphore 的 release 方法中主要就是对 state 进行增加,增加成功后会调用 AQS 的 doReleaseShared 方法唤醒头节点。
3
总结
Q&A
Q: 既然 Semaphore 也是基于 AQS, 那在 Semaphore 中 state 的含义代表什么?
A: 在 Semaphore 中 state 代表许可数量,acquire 方法当许可小于指定数量会阻塞线程,release 方法增加许可当许可增加成功则唤醒阻塞节点。
Q: Semaphore 基于 AQS 具体是怎么实现的呢?
A:
- 初始设置 state 的初始值,即初始许可数量。
- acquire 方法设置目标数量,当目标数量大于当前数量时,会阻塞线程并将其放到阻塞队列中。此处基于 AQS 实现。
- release 对 state 进行增加,成功后会调用 AQS 的 doReleaseShared 唤醒头结点。同样是基于 AQS 实现。
Q: Semaphore 和 CountDownLatch 有什么区别?
A: 在Semaphore 的计数器是递加的,而 CountDownLatch 是递减的。相同点就是计数器都不可以重置。
结束语
在阅读 Semaphore 源码过程中,发现其主要功能都是基于 AQS 实现的,可以回顾阅读 AQS 的相关笔记。同样 Semaphore 也支持公平和非公平模式,这块就需要小伙伴自己去阅读啦。
- <End /> -
作者:刘志航,一个宅宅的北漂程序员。
公众号:liuzhihangs,记录工作学习中的技术、开发及源码笔记;时不时分享一些生活中的见闻感悟。欢迎大佬来指导!
相关推荐
- 手把手教你构建一个简单的Eclipse RCP应用
-
EclipseRCP应用,通常用来构建跨平台的图形化管理客户端,Eclipse从IBM开源以来,一直占据开源Java开发平台的头把交椅,现在仍然收到很多人的追捧。今天就带大家通过一个简单的例子:开发...
- Eclipse配置maven 环境(maven的配置、以及eclipse中配置maven)
-
Eclipse配置maven环境的先决条件是,Windows系统已经配置好maven环境Eclipse配置maven环境步骤如下:一、给Eclipse添加本地maven...
- 如何在Eclipse中搭建Zabbix源码的调试和开发环境
-
Zabbix是一款非常优秀的企业级软件,被设计用于对数万台服务器、虚拟机和网络设备的数百万个监控项进行实时监控。Zabbix是开放源码和免费的,这就意味着当出现bug时,我们可以很方便地通过调试源码来...
- Eclipse中将现有的maven项目 导入Git,并发布到
-
Eclipse中将现有的maven项目导入Git,并发布到github一、Eclipse中将现有的maven项目导入Git1.将本地的maven项目,添加他的子项目到git仓库,并发布到githu...
- eclipse安装图解(eclipse安装教程2021)
-
下载eclipse之前请先安装jdk、查看自己电脑系统是多少位第一步:打开官网https://www.eclipse.org/downloads/第二步:点击DownloadPackages第三...
- Eclipse IDE for C/C++ Developers 开发环境搭建详解
-
EclipseIDEforC/C++Developers开发环境搭建详解1.到官网下载eclipseforC/C++Developmer解压就行2.下载MinGW用来编译C/C+...
- 来来来!一文告诉你Eclipse的正确安装使用姿势,你都清楚吗?
-
前言本学习笔记是有关如何设置Eclipse的详细说明。即使你天天在使用它,但是,相信我,或许你并不足够了解它。安装Java运行时环境Eclipse是Java应用程序,因此设置Eclipse的第一步是安...
- 纯干货!Eclipse的安装与使用(eclipse 安装教程)
-
之前有人给小华君留言,说让小华君讲一讲Eclipse,那好,我们今天就简单地讲一下。讲得也是基础部分,如题,主要是Eclipse的安装与使用。废话不多说,开始讲。Eclipse是Java开发的集成开发...
- 2020 最新版jdk & eclipse下载安装 之JDK(一)
-
首次安装Eclipse,去官网下载资源找不对安装包,安装之后又报错,如果和我一样的话,那就来看我的分享吧安装eclipse前,需要先安装JDK软件首先,到oracle官网下载JDK安装包下载链接:...
- Eclipse 安装教程(附安装包下载)(eclipse安装教程最新版)
-
Eclipse软件介绍是一个开放源代码、基于Java的可扩展开发平台。它本身只是一个框架和一组服务,通过插件组件构建开发环境。幸运的是,Eclipse附带了一个标准的插件集,包括Java开发工具(Ja...
- JDK安装、Eclipse安装及运行环境配置
-
1、eclipse下载打开地址:http://www.eclipse.org/downloads/;根据自己机器的操作系统,页面上显示适应机器操作系统的Eclipse下载列表,也可以点击下图所示位置切...
- Ubuntu Linux 21.10官方壁纸现已提供下载 最高8192×4608分辨率
-
距离十月份的Ubuntu21.10Linux发行版的到来,已只有数周的时间。在今年4月介绍了与之有关的大量细节之后,Canonical现又放出了代号为“ImpishIndri”的这一大...
- Linux 4.7系统内核发布:支持RX 480
-
经过一周休假之后,LinusTorvalds今天正式发布了新版LinuxKernel4.7,可在官网直接下载。Linux4.7版内核的开发启动于5月29日,经过了七个RC候选版,加入了不少新特...
- 开发企业官网就用这个基于SpringBoot的CMS系统,真香
-
前言推荐这个项目是因为使用手册部署手册非常...
- 非常详细的Linux系统安装教程!建议收藏
-
公众号:老油条IT记一、下载ISO镜像#官网:CentOS:http://mirror-status.centos.org/#cn#其他:网易:http://mirrors.163.com/cento...