Node.js 多线程进阶:worker_threads 中 Atomics 与 SharedArrayBuffer 的深度同步实践
你好,我是你们的“老朋友”——“代码挖掘机”。今天咱们不聊那些花里胡哨的框架,来聊点 Node.js 多线程编程中的硬核知识:worker_threads 模块里的 Atomics 对象以及它在 SharedArrayBuffer 线程同步中的作用。如果你已经对 Node.js 的多线程编程有了一定了解,那么相信这篇文章能让你对线程同步的理解更上一层楼。
为什么需要 Atomics 和 SharedArrayBuffer?
在传统的 JavaScript 单线程模型中,我们很少需要考虑并发问题。但是,当 Node.js 引入 worker_threads 后,多线程编程成为可能,并发问题也随之而来。多个线程同时访问和修改同一块内存区域(SharedArrayBuffer)时,如果没有适当的同步机制,就会出现数据竞争(Data Race),导致程序行为不可预测。
Atomics 对象提供了一组原子操作,可以保证对 SharedArrayBuffer 的读写操作是“原子”的,即不可中断的。这意味着,在一个线程执行原子操作期间,其他线程无法访问或修改同一内存位置的数据。这就像给共享数据加了一把“锁”,确保同一时刻只有一个线程能操作它。
Atomics 对象的核心方法
Atomics 对象提供了一系列静态方法,用于对 SharedArrayBuffer 进行原子操作。下面介绍几个核心方法,并通过代码示例演示其用法。
1. Atomics.load(typedArray, index)
原子地读取 typedArray 中指定 index 位置的值。
const sab = new SharedArrayBuffer(1024);
const int32 = new Int32Array(sab);
// 初始值
int32[0] = 10;
// 在一个 worker 线程中
// const { port } = require('worker_threads');
// port.on('message', () => {
// console.log('从主线程读取的值:', Atomics.load(int32, 0)); // 原子地读取值
// port.close();
// });
// 模拟主线程的操作,直接打印
console.log('从主线程读取的值:', Atomics.load(int32, 0));
2. Atomics.store(typedArray, index, value)
原子地将 value 写入 typedArray 中指定 index 的位置,并返回旧值。
const sab = new SharedArrayBuffer(1024);
const int32 = new Int32Array(sab);
// 在一个 worker 线程中
// const { port } = require('worker_threads');
// port.on('message', () => {
// const oldValue = Atomics.store(int32, 0, 20); // 原子地写入新值
// console.log('旧值:', oldValue); // 输出旧值
// console.log('新值:', Atomics.load(int32, 0)); // 再次读取确认
// port.close();
// });
// 模拟主线程
const oldValue = Atomics.store(int32, 0, 20); // 原子地写入新值
console.log('旧值:', oldValue); // 输出旧值
console.log('新值:', Atomics.load(int32, 0)); // 再次读取确认
3. Atomics.compareExchange(typedArray, index, expectedValue, replacementValue)
原子地比较 typedArray 中 index 位置的值与 expectedValue。如果相等,则将 index 位置的值替换为 replacementValue,并返回旧值;否则,不进行任何操作,直接返回旧值。
这个方法是实现互斥锁(Mutex)的基础。
const sab = new SharedArrayBuffer(1024);
const int32 = new Int32Array(sab);
// 初始值
int32[0] = 0; // 0 表示锁空闲,1 表示锁被占用
// 尝试获取锁
function acquireLock() {
while (true) {
const oldValue = Atomics.compareExchange(int32, 0, 0, 1);
if (oldValue === 0) {
// 获取锁成功
return;
}
// 锁被占用,等待一段时间后重试,或者使用 Atomics.wait
Atomics.wait(int32, 0, 1); //等待
}
}
// 释放锁
function releaseLock() {
const oldValue = Atomics.store(int32, 0, 0);
if (oldValue !== 1) {
throw new Error('释放锁失败');
}
Atomics.notify(int32, 0, 1);
}
// 在一个 worker 线程中
// const { port } = require('worker_threads');
// port.on('message', () => {
// acquireLock();
// console.log('Worker 线程获取到锁,开始执行临界区代码...');
// // 模拟临界区代码执行
// for (let i = 0; i < 1000000; i++) { }
// console.log('Worker 线程执行完毕,释放锁...');
// releaseLock();
// port.close();
// });
acquireLock();
console.log('主线程获取到锁,开始执行临界区代码...');
// 模拟临界区代码执行
for (let i = 0; i < 1000000; i++) { }
console.log('主线程执行完毕,释放锁...');
releaseLock();
4. Atomics.wait(typedArray, index, value, timeout)
原子地检查 typedArray 中 index 位置的值是否等于 value。如果相等,则当前线程进入休眠状态,直到被 Atomics.notify 唤醒或超时(timeout,以毫秒为单位)。如果 index 位置的值不等于 value,则立即返回 'not-equal'。
Atomics.wait 和 Atomics.notify 结合使用,可以实现条件变量。
5. Atomics.notify(typedArray, index, count)
唤醒在 typedArray 中 index 位置等待的最多 count 个线程。返回实际唤醒的线程数。
// 续上例 compareExchange
其他原子操作方法
除了上述几个核心方法外,Atomics 对象还提供了一些其他原子操作方法,如:
Atomics.add(typedArray, index, value):原子地将value加到typedArray中index位置的值上,并返回旧值。Atomics.sub(typedArray, index, value):原子地从typedArray中index位置的值减去value,并返回旧值。Atomics.and(typedArray, index, value):原子地将typedArray中index位置的值与value进行按位与操作,并返回旧值。Atomics.or(typedArray, index, value):原子地将typedArray中index位置的值与value进行按位或操作,并返回旧值。Atomics.xor(typedArray, index, value):原子地将typedArray中index位置的值与value进行按位异或操作,并返回旧值。Atomics.exchange(typedArray, index, value): 原子地将typedArray中index位置的值设置为value,并返回旧值. 相当于Atomics.store。
这些方法的用法与 Atomics.load 和 Atomics.store 类似,只是执行的操作不同。你可以根据实际需要选择合适的方法。
使用 Atomics 实现互斥锁和条件变量
上面我们已经通过 Atomics.compareExchange 演示了一个简单的互斥锁实现。下面我们再来看一个更完整的互斥锁和条件变量的实现示例。
// Mutex.js
class Mutex {
constructor(sharedArrayBuffer, offset = 0) {
this.int32 = new Int32Array(sharedArrayBuffer, offset, 1);
}
acquire() {
while (true) {
const oldValue = Atomics.compareExchange(this.int32, 0, 0, 1);
if (oldValue === 0) {
return;
}
Atomics.wait(this.int32, 0, 1);
}
}
release() {
const oldValue = Atomics.store(this.int32, 0, 0);
if (oldValue !== 1) {
throw new Error('释放锁失败');
}
Atomics.notify(this.int32, 0, 1); // 只唤醒一个等待的线程
}
}
// ConditionVariable.js
class ConditionVariable {
constructor(sharedArrayBuffer, offset = 0) {
this.int32 = new Int32Array(sharedArrayBuffer, offset, 1);
this.waiters = 0; // 等待的线程数
}
wait(mutex) {
this.waiters++;
mutex.release();
Atomics.wait(this.int32, 0, 0); // 在条件变量上等待
this.waiters--;
mutex.acquire();
}
signal() {
if (this.waiters > 0) {
Atomics.notify(this.int32, 0, 1); // 唤醒一个等待的线程
}
}
broadcast() {
if (this.waiters > 0) {
Atomics.notify(this.int32, 0, this.waiters); // 唤醒所有等待的线程
}
}
}
// 使用示例
const sab = new SharedArrayBuffer(1024);
const mutex = new Mutex(sab);
const cond = new ConditionVariable(sab, 4); // 注意偏移量
let sharedCounter = new Int32Array(sab, 8); //共享计数器
// 在一个 worker 线程中
// const { Worker, isMainThread } = require('worker_threads');
// if (!isMainThread) {
// mutex.acquire();
// while (sharedCounter[0] < 10) {
// console.log("等待条件满足...");
// cond.wait(mutex);
// }
// console.log('条件满足,开始执行...');
// sharedCounter[0] = 0;
// mutex.release();
// process.exit();
// }
// 模拟主线程
mutex.acquire();
console.log("主线程增加计数器...");
for(let i = 0; i< 11; i++){
sharedCounter[0] = i;
}
cond.signal(); // 发送信号,唤醒一个等待的线程
mutex.release();
// 另一个worker线程
// new Worker(__filename);
这个例子中,我们定义了 Mutex 和 ConditionVariable 两个类,分别用于实现互斥锁和条件变量。Mutex 类使用 Atomics.compareExchange 实现锁的获取和释放,ConditionVariable 类使用 Atomics.wait 和 Atomics.notify 实现线程的等待和唤醒。
在使用示例中,主线程和 worker 线程通过 Mutex 和 ConditionVariable 进行同步。worker 线程首先获取锁,然后检查共享计数器 sharedCounter 的值。如果值小于 10,则调用 cond.wait(mutex) 在条件变量上等待,并释放锁。主线程获取锁后,增加计数器的值,然后调用 cond.signal() 发送信号,唤醒一个等待的 worker 线程。worker 线程被唤醒后,重新获取锁,并继续执行。
总结与注意事项
Atomics 对象和 SharedArrayBuffer 为 Node.js 多线程编程提供了强大的同步机制。通过原子操作,我们可以避免数据竞争,实现线程安全的共享内存访问。但是,在使用 Atomics 和 SharedArrayBuffer 时,需要注意以下几点:
- 性能开销:原子操作虽然可以保证线程安全,但也会带来一定的性能开销。因此,应避免过度使用原子操作,只在必要时使用。
- 死锁:在使用互斥锁时,要注意避免死锁。例如,一个线程持有锁 A,等待锁 B,而另一个线程持有锁 B,等待锁 A,就会导致死锁。避免死锁的常见方法包括:按固定顺序获取锁、避免持有多个锁、使用超时机制等。
- 复杂性:多线程编程本身就比较复杂,使用
Atomics和SharedArrayBuffer会进一步增加复杂性。因此,在编写多线程代码时,要格外小心,充分测试,确保代码的正确性。 SharedArrayBuffer的兼容性:虽然大多数现代浏览器和 Node.js 版本都支持SharedArrayBuffer,但仍有一些旧版本不支持。在使用SharedArrayBuffer之前,最好检查一下目标环境的兼容性。Atomics.wait和Atomics.notify必须配对使用: 必须在同一个SharedArrayBuffer的同一个位置上进行wait和notify操作. 否则会出现不可预期的结果。
希望这篇文章能帮助你深入理解 Node.js 中 Atomics 和 SharedArrayBuffer 的用法。如果你有任何问题或想法,欢迎在评论区留言,我们一起探讨!