WEBKT

Node.js 多线程进阶:worker_threads 中 Atomics 与 SharedArrayBuffer 的深度同步实践

178 0 0 0

你好,我是你们的“老朋友”——“代码挖掘机”。今天咱们不聊那些花里胡哨的框架,来聊点 Node.js 多线程编程中的硬核知识:worker_threads 模块里的 Atomics 对象以及它在 SharedArrayBuffer 线程同步中的作用。如果你已经对 Node.js 的多线程编程有了一定了解,那么相信这篇文章能让你对线程同步的理解更上一层楼。

为什么需要 Atomics 和 SharedArrayBuffer?

在传统的 JavaScript 单线程模型中,我们很少需要考虑并发问题。但是,当 Node.js 引入 worker_threads 后,多线程编程成为可能,并发问题也随之而来。多个线程同时访问和修改同一块内存区域(SharedArrayBuffer)时,如果没有适当的同步机制,就会出现数据竞争(Data Race),导致程序行为不可预测。

Atomics 对象提供了一组原子操作,可以保证对 SharedArrayBuffer 的读写操作是“原子”的,即不可中断的。这意味着,在一个线程执行原子操作期间,其他线程无法访问或修改同一内存位置的数据。这就像给共享数据加了一把“锁”,确保同一时刻只有一个线程能操作它。

Atomics 对象的核心方法

Atomics 对象提供了一系列静态方法,用于对 SharedArrayBuffer 进行原子操作。下面介绍几个核心方法,并通过代码示例演示其用法。

1. Atomics.load(typedArray, index)

原子地读取 typedArray 中指定 index 位置的值。

const sab = new SharedArrayBuffer(1024);
const int32 = new Int32Array(sab);

// 初始值
int32[0] = 10;

// 在一个 worker 线程中
// const { port } = require('worker_threads');
// port.on('message', () => {
//   console.log('从主线程读取的值:', Atomics.load(int32, 0)); // 原子地读取值
//   port.close();
// });

// 模拟主线程的操作,直接打印
console.log('从主线程读取的值:', Atomics.load(int32, 0));

2. Atomics.store(typedArray, index, value)

原子地将 value 写入 typedArray 中指定 index 的位置,并返回旧值。

const sab = new SharedArrayBuffer(1024);
const int32 = new Int32Array(sab);

// 在一个 worker 线程中
// const { port } = require('worker_threads');
// port.on('message', () => {
//   const oldValue = Atomics.store(int32, 0, 20); // 原子地写入新值
//   console.log('旧值:', oldValue); // 输出旧值
//   console.log('新值:', Atomics.load(int32, 0)); // 再次读取确认
//   port.close();
// });

// 模拟主线程
const oldValue = Atomics.store(int32, 0, 20); // 原子地写入新值
console.log('旧值:', oldValue); // 输出旧值
console.log('新值:', Atomics.load(int32, 0)); // 再次读取确认

3. Atomics.compareExchange(typedArray, index, expectedValue, replacementValue)

原子地比较 typedArrayindex 位置的值与 expectedValue。如果相等,则将 index 位置的值替换为 replacementValue,并返回旧值;否则,不进行任何操作,直接返回旧值。

这个方法是实现互斥锁(Mutex)的基础。

const sab = new SharedArrayBuffer(1024);
const int32 = new Int32Array(sab);

// 初始值
int32[0] = 0; // 0 表示锁空闲,1 表示锁被占用

// 尝试获取锁
function acquireLock() {
  while (true) {
    const oldValue = Atomics.compareExchange(int32, 0, 0, 1);
    if (oldValue === 0) {
      // 获取锁成功
      return;
    }
    // 锁被占用,等待一段时间后重试,或者使用 Atomics.wait
    Atomics.wait(int32, 0, 1); //等待
  }
}

// 释放锁
function releaseLock() {
  const oldValue = Atomics.store(int32, 0, 0);
  if (oldValue !== 1) {
    throw new Error('释放锁失败');
  }
    Atomics.notify(int32, 0, 1);
}

// 在一个 worker 线程中
// const { port } = require('worker_threads');
// port.on('message', () => {
//   acquireLock();
//   console.log('Worker 线程获取到锁,开始执行临界区代码...');
//   // 模拟临界区代码执行
//   for (let i = 0; i < 1000000; i++) { }
//   console.log('Worker 线程执行完毕,释放锁...');
//   releaseLock();
//   port.close();
// });

acquireLock();
console.log('主线程获取到锁,开始执行临界区代码...');
// 模拟临界区代码执行
for (let i = 0; i < 1000000; i++) { }
console.log('主线程执行完毕,释放锁...');
releaseLock();

4. Atomics.wait(typedArray, index, value, timeout)

原子地检查 typedArrayindex 位置的值是否等于 value。如果相等,则当前线程进入休眠状态,直到被 Atomics.notify 唤醒或超时(timeout,以毫秒为单位)。如果 index 位置的值不等于 value,则立即返回 'not-equal'

Atomics.waitAtomics.notify 结合使用,可以实现条件变量。

5. Atomics.notify(typedArray, index, count)

唤醒在 typedArrayindex 位置等待的最多 count 个线程。返回实际唤醒的线程数。

// 续上例 compareExchange

其他原子操作方法

除了上述几个核心方法外,Atomics 对象还提供了一些其他原子操作方法,如:

  • Atomics.add(typedArray, index, value):原子地将 value 加到 typedArrayindex 位置的值上,并返回旧值。
  • Atomics.sub(typedArray, index, value):原子地从 typedArrayindex 位置的值减去 value,并返回旧值。
  • Atomics.and(typedArray, index, value):原子地将 typedArrayindex 位置的值与 value 进行按位与操作,并返回旧值。
  • Atomics.or(typedArray, index, value):原子地将 typedArrayindex 位置的值与 value 进行按位或操作,并返回旧值。
  • Atomics.xor(typedArray, index, value):原子地将 typedArrayindex 位置的值与 value 进行按位异或操作,并返回旧值。
  • Atomics.exchange(typedArray, index, value): 原子地将typedArrayindex位置的值设置为value,并返回旧值. 相当于 Atomics.store

这些方法的用法与 Atomics.loadAtomics.store 类似,只是执行的操作不同。你可以根据实际需要选择合适的方法。

使用 Atomics 实现互斥锁和条件变量

上面我们已经通过 Atomics.compareExchange 演示了一个简单的互斥锁实现。下面我们再来看一个更完整的互斥锁和条件变量的实现示例。

// Mutex.js
class Mutex {
  constructor(sharedArrayBuffer, offset = 0) {
    this.int32 = new Int32Array(sharedArrayBuffer, offset, 1);
  }

  acquire() {
    while (true) {
      const oldValue = Atomics.compareExchange(this.int32, 0, 0, 1);
      if (oldValue === 0) {
        return;
      }
      Atomics.wait(this.int32, 0, 1);
    }
  }

  release() {
    const oldValue = Atomics.store(this.int32, 0, 0);
    if (oldValue !== 1) {
      throw new Error('释放锁失败');
    }
    Atomics.notify(this.int32, 0, 1); // 只唤醒一个等待的线程
  }
}

// ConditionVariable.js
class ConditionVariable {
  constructor(sharedArrayBuffer, offset = 0) {
    this.int32 = new Int32Array(sharedArrayBuffer, offset, 1);
    this.waiters = 0; // 等待的线程数
  }

  wait(mutex) {
    this.waiters++;
    mutex.release();
    Atomics.wait(this.int32, 0, 0); // 在条件变量上等待
     this.waiters--;
    mutex.acquire();

  }

  signal() {
    if (this.waiters > 0) {
       Atomics.notify(this.int32, 0, 1); // 唤醒一个等待的线程
    }

  }

  broadcast() {
    if (this.waiters > 0) {
      Atomics.notify(this.int32, 0, this.waiters); // 唤醒所有等待的线程
    }
  }
}

// 使用示例
const sab = new SharedArrayBuffer(1024);
const mutex = new Mutex(sab);
const cond = new ConditionVariable(sab, 4); // 注意偏移量

let sharedCounter = new Int32Array(sab, 8); //共享计数器

// 在一个 worker 线程中
// const { Worker, isMainThread } = require('worker_threads');
// if (!isMainThread) {
//   mutex.acquire();
//   while (sharedCounter[0] < 10) {
//     console.log("等待条件满足...");
//     cond.wait(mutex);
//   }
//   console.log('条件满足,开始执行...');
//    sharedCounter[0]  = 0;
//   mutex.release();
//   process.exit();
// }


// 模拟主线程
mutex.acquire();
console.log("主线程增加计数器...");
for(let i = 0; i< 11; i++){
    sharedCounter[0] = i;
}
cond.signal(); // 发送信号,唤醒一个等待的线程
mutex.release();


// 另一个worker线程
// new Worker(__filename);

这个例子中,我们定义了 MutexConditionVariable 两个类,分别用于实现互斥锁和条件变量。Mutex 类使用 Atomics.compareExchange 实现锁的获取和释放,ConditionVariable 类使用 Atomics.waitAtomics.notify 实现线程的等待和唤醒。

在使用示例中,主线程和 worker 线程通过 MutexConditionVariable 进行同步。worker 线程首先获取锁,然后检查共享计数器 sharedCounter 的值。如果值小于 10,则调用 cond.wait(mutex) 在条件变量上等待,并释放锁。主线程获取锁后,增加计数器的值,然后调用 cond.signal() 发送信号,唤醒一个等待的 worker 线程。worker 线程被唤醒后,重新获取锁,并继续执行。

总结与注意事项

Atomics 对象和 SharedArrayBuffer 为 Node.js 多线程编程提供了强大的同步机制。通过原子操作,我们可以避免数据竞争,实现线程安全的共享内存访问。但是,在使用 AtomicsSharedArrayBuffer 时,需要注意以下几点:

  1. 性能开销:原子操作虽然可以保证线程安全,但也会带来一定的性能开销。因此,应避免过度使用原子操作,只在必要时使用。
  2. 死锁:在使用互斥锁时,要注意避免死锁。例如,一个线程持有锁 A,等待锁 B,而另一个线程持有锁 B,等待锁 A,就会导致死锁。避免死锁的常见方法包括:按固定顺序获取锁、避免持有多个锁、使用超时机制等。
  3. 复杂性:多线程编程本身就比较复杂,使用 AtomicsSharedArrayBuffer 会进一步增加复杂性。因此,在编写多线程代码时,要格外小心,充分测试,确保代码的正确性。
  4. SharedArrayBuffer 的兼容性:虽然大多数现代浏览器和 Node.js 版本都支持 SharedArrayBuffer,但仍有一些旧版本不支持。在使用 SharedArrayBuffer 之前,最好检查一下目标环境的兼容性。
  5. Atomics.waitAtomics.notify 必须配对使用: 必须在同一个SharedArrayBuffer的同一个位置上进行wait和notify操作. 否则会出现不可预期的结果。

希望这篇文章能帮助你深入理解 Node.js 中 AtomicsSharedArrayBuffer 的用法。如果你有任何问题或想法,欢迎在评论区留言,我们一起探讨!

代码挖掘机 Node.js多线程Atomics

评论点评