WEBKT

基于 SharedArrayBuffer 与 Atomics 的多 Web Worker 无锁高并发队列实现方案

3 0 0 0

在现代 Web 开发中,随着 WebGL/WebGPU 渲染、大型物理引擎及复杂音频合成等重度计算任务向前端转移,传统的 postMessage 通信机制由于结构化克隆(Structured Clone)带来的高延迟,正逐渐成为系统的性能瓶颈。

为了实现零拷贝、微秒级延迟的多线程协同,基于 SharedArrayBuffer(SAB)与 Atomics 的共享内存通信方案是最优解。然而,由于 Spectre 等 CPU 侧信道漏洞,浏览器默认禁用了 SharedArrayBuffer

本文将介绍如何在启用 COOPCOEP 安全头的前提下,基于 Vyukov 算法实现一个适用于多 Web Worker 并发读写的无锁(Lock-Free)环形缓冲区(Ring Buffer)队列。


一、 基础设施:配置 COOP 与 COEP 安全响应头

要在浏览器中启用 SharedArrayBuffer,服务器必须返回以下两个 HTTP 响应头,以确保当前上下文处于“跨源隔离”(Cross-Origin Isolated)状态:

Cross-Origin-Opener-Policy: same-origin
Cross-Origin-Embedder-Policy: require-corp

常见服务器配置示例

1. Nginx 配置

server {
    listen 80;
    server_name localhost;

    location / {
        add_header Cross-Origin-Opener-Policy "same-origin" always;
        add_header Cross-Origin-Embedder-Policy "require-corp" always;
        root /usr/share/nginx/html;
        index index.html;
    }
}

2. Node.js (Koa / Express) 配置

// Express 示例
app.use((req, res, next) => {
  res.setHeader('Cross-Origin-Opener-Policy', 'same-origin');
  res.setHeader('Cross-Origin-Embedder-Policy', 'require-corp');
  next();
});

在本地开发阶段,如使用 Vite,可在 vite.config.js 中配置 server.headers

export default {
  server: {
    headers: {
      'Cross-Origin-Opener-Policy': 'same-origin',
      'Cross-Origin-Embedder-Policy': 'require-corp',
    },
  },
};

二、 内存布局设计:多生产者多消费者(MPMC)无锁队列

要在共享内存中构建一个无锁队列,我们需要设计紧凑的内存布局,并直接操作二进制数组。我们采用经典的 Vyukov 无锁环形缓冲区(Multi-Producer Multi-Consumer Ring Buffer)算法。

1. 约束条件

  • 队列容量 $N$ 必须是 2 的幂次方(例如 256, 1024),以便利用位运算 (index & (capacity - 1)) 代替高成本的取模 % 运算。
  • 使用 Int32Array 作为底层视图,以确保 Atomics 操作的原子性和内存对齐。

2. 内存布局

我们将整个 SharedArrayBuffer 划分为三个部分:

  • Header(元数据区)
    • head (4 字节, Int32):消费者读取指针。
    • tail (4 字节, Int32):生产者写入指针。
  • Sequences(序号数组区):长度为 $N$ 的 Int32Array,用于检测槽位当前的可读/可写状态,避免读写冲突。
  • Data(数据缓冲区):长度为 $N$ 的 Int32Array,存放实际队列数据。
SharedArrayBuffer 字节流:
+------------------+------------------+-----------------------------+-----------------------------+
| head (4 Bytes)   | tail (4 Bytes)   | Sequences [0 ... N-1]       | Data [0 ... N-1]            |
| Int32            | Int32            | Int32Array (N * 4 Bytes)    | Int32Array (N * 4 Bytes)    |
+------------------+------------------+-----------------------------+-----------------------------+
0                  4                  8                             8 + 4N

三、 无锁队列核心类实现:LockFreeQueue

以下是基于 SharedArrayBufferAtomics 构建的高并发无锁环形队列实现。它不仅支持多线程安全的 enqueuedequeue 操作,而且由于采用 CAS(Compare-And-Swap)机制,完全不需要加锁。

// lock-free-queue.js

export class LockFreeQueue {
  /**
   * 初始化队列
   * @param {number|SharedArrayBuffer} capacityOrBuffer - 队列容量(必须是2的幂)或已存在的 SharedArrayBuffer
   */
  constructor(capacityOrBuffer) {
    if (capacityOrBuffer instanceof SharedArrayBuffer) {
      this.sab = capacityOrBuffer;
      this._initViews();
      this.capacity = this.sequences.length;
    } else {
      const capacity = capacityOrBuffer;
      if ((capacity & (capacity - 1)) !== 0) {
        throw new Error("容量必须是 2 的幂次方(如 128, 512, 1024)");
      }
      this.capacity = capacity;
      
      // 计算所需总内存字节数
      // 8 (head & tail) + N * 4 (sequences) + N * 4 (data)
      const totalBytes = 8 + capacity * 4 + capacity * 4;
      this.sab = new SharedArrayBuffer(totalBytes);
      this._initViews();
      this._resetSequences();
    }
    
    this.mask = this.capacity - 1;
  }

  _initViews() {
    this.header = new Int32Array(this.sab, 0, 2);
    // 元数据映射:0 -> head, 1 -> tail
    this.headRef = 0;
    this.tailRef = 1;

    // 偏移量 8 字节开始为 sequence 数组
    this.sequences = new Int32Array(this.sab, 8, this.capacity);
    // sequence 数组后为 data 数组
    this.data = new Int32Array(this.sab, 8 + this.capacity * 4, this.capacity);
  }

  _resetSequences() {
    for (let i = 0; i < this.capacity; i++) {
      this.sequences[i] = i;
    }
  }

  /**
   * 获取底层的 SharedArrayBuffer,用于跨线程传递
   */
  getBuffer() {
    return this.sab;
  }

  /**
   * 向队列写入一个元素 (生产者调用)
   * @param {number} value - 待写入的 32 位整型数据
   * @returns {boolean} - 写入成功返回 true,队列已满返回 false
   */
  enqueue(value) {
    const sequences = this.sequences;
    const data = this.data;
    const mask = this.mask;

    while (true) {
      // 1. 获取当前的尾指针
      const tail = Atomics.load(this.header, this.tailRef);
      const index = tail & mask;
      
      // 2. 获取该槽位的序号
      const seq = Atomics.load(sequences, index);
      const diff = seq - tail;

      if (diff === 0) {
        // 槽位准备好写入。尝试抢占 tail 指针(CAS 操作)
        if (Atomics.compareExchange(this.header, this.tailRef, tail, tail + 1) === tail) {
          // 抢占成功,安全地写入数据并更新槽位状态
          data[index] = value;
          // 通知消费者该槽位可读 (将序号递增 1)
          Atomics.store(sequences, index, tail + 1);
          return true;
        }
        // 如果 CAS 失败,说明有其他生产者抢占成功,继续自旋尝试
      } else if (diff < 0) {
        // 槽位尚未被读取,说明队列已满
        return false;
      }
      // diff > 0 说明当前槽位已被别的生产者抢先写入但还未更新序号,继续自旋
    }
  }

  /**
   * 从队列中读取一个元素 (消费者调用)
   * @param {Object} outRef - 接收返回值的容器对象 { value: null }
   * @returns {boolean} - 读取成功返回 true,队列为空返回 false
   */
  dequeue(outRef) {
    const sequences = this.sequences;
    const data = this.data;
    const mask = this.mask;
    const capacity = this.capacity;

    while (true) {
      // 1. 获取当前的头指针
      const head = Atomics.load(this.header, this.headRef);
      const index = head & mask;

      // 2. 获取该槽位的序号
      const seq = Atomics.load(sequences, index);
      const diff = seq - (head + 1);

      if (diff === 0) {
        // 槽位准备好读取。尝试抢占 head 指针 (CAS 操作)
        if (Atomics.compareExchange(this.header, this.headRef, head, head + 1) === head) {
          // 抢占成功,读取数据并重置该槽位序号为下一轮循环可用
          outRef.value = data[index];
          // 将槽位序号设为下一个写入周期的 tail 值:head + mask + 1 (即当前周期 + 队列容量)
          Atomics.store(sequences, index, head + capacity);
          return true;
        }
        // CAS 失败,有其他消费者抢占成功,自旋
      } else if (diff < 0) {
        // 说明该槽位尚无新数据写入,队列为空
        return false;
      }
      // diff > 0 说明其他消费者正在读取,但还未更新序号,自旋
    }
  }
}

四、 实战:主线程与多 Worker 间的高并发协作

下面的实例模拟了主线程作为任务调度中心,分配任务到 4 个后台 Worker 中,Worker 从队列中并发读取任务并执行的场景。

1. Worker 代码 (worker.js)

在 Worker 中,我们监听初始化消息获取 SharedArrayBuffer,重建 LockFreeQueue 实例,随后进入无锁自旋读取模式:

import { LockFreeQueue } from './lock-free-queue.js';

let queue = null;
let running = true;

self.onmessage = (e) => {
  if (e.data.type === 'INIT') {
    // 跨线程共享内存,重建队列实例
    queue = new LockFreeQueue(e.data.sab);
    startConsuming();
  } else if (e.data.type === 'STOP') {
    running = false;
  }
};

function startConsuming() {
  const container = { value: null };
  
  function poll() {
    if (!running) return;

    let processedCount = 0;
    // 每次轮询尝试批量读取,减少调度开销
    for (let i = 0; i < 50; i++) {
      if (queue.dequeue(container)) {
        const taskData = container.value;
        // 模拟执行轻量计算任务
        const result = heavyMath(taskData);
        processedCount++;
      } else {
        // 队列为空,退出内层循环
        break;
      }
    }

    if (processedCount > 0) {
      // 仅在非空轮询时使用,避免过度占用 CPU
      requestAnimationFrame(poll); 
    } else {
      // 队列为空,使用 setImmediate/setTimeout 延迟自旋,防止 CPU 空转跑满 100%
      setTimeout(poll, 1);
    }
  }

  poll();
}

function heavyMath(num) {
  // 模拟耗时运算
  return Math.sqrt(num) * Math.sin(num);
}

2. 主线程代码 (main.js)

在主线程中实例化队列,创建 4 个 Worker 并分配底层的 SharedArrayBuffer

import { LockFreeQueue } from './lock-free-queue.js';

const QUEUE_CAPACITY = 2048; // 2的幂
const queue = new LockFreeQueue(QUEUE_CAPACITY);

// 创建 4 个并发 Worker
const workerCount = 4;
const workers = [];

for (let i = 0; i < workerCount; i++) {
  const worker = new Worker(new URL('./worker.js', import.meta.url), { type: 'module' });
  // 将 SharedArrayBuffer 发送给 Worker
  worker.postMessage({
    type: 'INIT',
    sab: queue.getBuffer()
  });
  workers.push(worker);
}

// 主线程不断往队列写入数据(生产者)
let taskCounter = 0;
function produceTasks() {
  let successCount = 0;
  
  // 尝试批量写入任务
  for (let i = 0; i < 100; i++) {
    if (queue.enqueue(taskCounter)) {
      taskCounter++;
      successCount++;
    } else {
      // 队列已满
      console.warn("Queue Full! Slowing down production.");
      break;
    }
  }

  // 持续生成任务
  requestAnimationFrame(produceTasks);
}

// 开启生产线
produceTasks();

五、 性能优化与注意事项

  1. 避免 CPU 空转(Spinlock Mitigation)
    当无锁队列为空(读)或已满(写)时,如果不加控制地执行 while(true),会导致 CPU 核心占用率飙升到 100%。在上文的 worker.js 代码中,我们对空载情况引入了 setTimeout(poll, 1),以退让 CPU 时间片。在极低延迟场景中,可使用几百次空转(spin)后引入短暂休眠的混合退避算法(Exponential Backoff)。

  2. 利用 Atomics.wait()Atomics.notify() 进行挂起唤醒
    如果队列中没有任务,可以让 Worker 线程进入阻塞状态,在主线程有新任务进入时再行唤醒:

    // 消费者无数据可读时挂起自身(注意:主线程不支持 Atomics.wait 阻塞)
    Atomics.wait(this.header, this.headRef, expectedValue);
    
    // 生产者写入新数据时唤醒一个正在等待的 Worker
    Atomics.notify(this.header, this.headRef, 1);
    
  3. JS 引擎垃圾回收(GC)开销为零
    无锁队列直接在预先分配的 SharedArrayBuffer 内存块上直接进行就地存取(In-place reads/writes)。这意味着整个高并发读写链路中没有任何新 JS 对象的创建,实现了极致的 零 GC 开销,在平滑帧率、消除卡顿方面具有决定性优势。

极客零号 WebWorker无锁队列

评论点评