基于 SharedArrayBuffer 与 Atomics 的多 Web Worker 无锁高并发队列实现方案
在现代 Web 开发中,随着 WebGL/WebGPU 渲染、大型物理引擎及复杂音频合成等重度计算任务向前端转移,传统的 postMessage 通信机制由于结构化克隆(Structured Clone)带来的高延迟,正逐渐成为系统的性能瓶颈。
为了实现零拷贝、微秒级延迟的多线程协同,基于 SharedArrayBuffer(SAB)与 Atomics 的共享内存通信方案是最优解。然而,由于 Spectre 等 CPU 侧信道漏洞,浏览器默认禁用了 SharedArrayBuffer。
本文将介绍如何在启用 COOP 和 COEP 安全头的前提下,基于 Vyukov 算法实现一个适用于多 Web Worker 并发读写的无锁(Lock-Free)环形缓冲区(Ring Buffer)队列。
一、 基础设施:配置 COOP 与 COEP 安全响应头
要在浏览器中启用 SharedArrayBuffer,服务器必须返回以下两个 HTTP 响应头,以确保当前上下文处于“跨源隔离”(Cross-Origin Isolated)状态:
Cross-Origin-Opener-Policy: same-origin
Cross-Origin-Embedder-Policy: require-corp
常见服务器配置示例
1. Nginx 配置
server {
listen 80;
server_name localhost;
location / {
add_header Cross-Origin-Opener-Policy "same-origin" always;
add_header Cross-Origin-Embedder-Policy "require-corp" always;
root /usr/share/nginx/html;
index index.html;
}
}
2. Node.js (Koa / Express) 配置
// Express 示例
app.use((req, res, next) => {
res.setHeader('Cross-Origin-Opener-Policy', 'same-origin');
res.setHeader('Cross-Origin-Embedder-Policy', 'require-corp');
next();
});
在本地开发阶段,如使用 Vite,可在 vite.config.js 中配置 server.headers:
export default {
server: {
headers: {
'Cross-Origin-Opener-Policy': 'same-origin',
'Cross-Origin-Embedder-Policy': 'require-corp',
},
},
};
二、 内存布局设计:多生产者多消费者(MPMC)无锁队列
要在共享内存中构建一个无锁队列,我们需要设计紧凑的内存布局,并直接操作二进制数组。我们采用经典的 Vyukov 无锁环形缓冲区(Multi-Producer Multi-Consumer Ring Buffer)算法。
1. 约束条件
- 队列容量 $N$ 必须是 2 的幂次方(例如 256, 1024),以便利用位运算
(index & (capacity - 1))代替高成本的取模%运算。 - 使用
Int32Array作为底层视图,以确保Atomics操作的原子性和内存对齐。
2. 内存布局
我们将整个 SharedArrayBuffer 划分为三个部分:
- Header(元数据区):
head(4 字节, Int32):消费者读取指针。tail(4 字节, Int32):生产者写入指针。
- Sequences(序号数组区):长度为 $N$ 的
Int32Array,用于检测槽位当前的可读/可写状态,避免读写冲突。 - Data(数据缓冲区):长度为 $N$ 的
Int32Array,存放实际队列数据。
SharedArrayBuffer 字节流:
+------------------+------------------+-----------------------------+-----------------------------+
| head (4 Bytes) | tail (4 Bytes) | Sequences [0 ... N-1] | Data [0 ... N-1] |
| Int32 | Int32 | Int32Array (N * 4 Bytes) | Int32Array (N * 4 Bytes) |
+------------------+------------------+-----------------------------+-----------------------------+
0 4 8 8 + 4N
三、 无锁队列核心类实现:LockFreeQueue
以下是基于 SharedArrayBuffer 与 Atomics 构建的高并发无锁环形队列实现。它不仅支持多线程安全的 enqueue 和 dequeue 操作,而且由于采用 CAS(Compare-And-Swap)机制,完全不需要加锁。
// lock-free-queue.js
export class LockFreeQueue {
/**
* 初始化队列
* @param {number|SharedArrayBuffer} capacityOrBuffer - 队列容量(必须是2的幂)或已存在的 SharedArrayBuffer
*/
constructor(capacityOrBuffer) {
if (capacityOrBuffer instanceof SharedArrayBuffer) {
this.sab = capacityOrBuffer;
this._initViews();
this.capacity = this.sequences.length;
} else {
const capacity = capacityOrBuffer;
if ((capacity & (capacity - 1)) !== 0) {
throw new Error("容量必须是 2 的幂次方(如 128, 512, 1024)");
}
this.capacity = capacity;
// 计算所需总内存字节数
// 8 (head & tail) + N * 4 (sequences) + N * 4 (data)
const totalBytes = 8 + capacity * 4 + capacity * 4;
this.sab = new SharedArrayBuffer(totalBytes);
this._initViews();
this._resetSequences();
}
this.mask = this.capacity - 1;
}
_initViews() {
this.header = new Int32Array(this.sab, 0, 2);
// 元数据映射:0 -> head, 1 -> tail
this.headRef = 0;
this.tailRef = 1;
// 偏移量 8 字节开始为 sequence 数组
this.sequences = new Int32Array(this.sab, 8, this.capacity);
// sequence 数组后为 data 数组
this.data = new Int32Array(this.sab, 8 + this.capacity * 4, this.capacity);
}
_resetSequences() {
for (let i = 0; i < this.capacity; i++) {
this.sequences[i] = i;
}
}
/**
* 获取底层的 SharedArrayBuffer,用于跨线程传递
*/
getBuffer() {
return this.sab;
}
/**
* 向队列写入一个元素 (生产者调用)
* @param {number} value - 待写入的 32 位整型数据
* @returns {boolean} - 写入成功返回 true,队列已满返回 false
*/
enqueue(value) {
const sequences = this.sequences;
const data = this.data;
const mask = this.mask;
while (true) {
// 1. 获取当前的尾指针
const tail = Atomics.load(this.header, this.tailRef);
const index = tail & mask;
// 2. 获取该槽位的序号
const seq = Atomics.load(sequences, index);
const diff = seq - tail;
if (diff === 0) {
// 槽位准备好写入。尝试抢占 tail 指针(CAS 操作)
if (Atomics.compareExchange(this.header, this.tailRef, tail, tail + 1) === tail) {
// 抢占成功,安全地写入数据并更新槽位状态
data[index] = value;
// 通知消费者该槽位可读 (将序号递增 1)
Atomics.store(sequences, index, tail + 1);
return true;
}
// 如果 CAS 失败,说明有其他生产者抢占成功,继续自旋尝试
} else if (diff < 0) {
// 槽位尚未被读取,说明队列已满
return false;
}
// diff > 0 说明当前槽位已被别的生产者抢先写入但还未更新序号,继续自旋
}
}
/**
* 从队列中读取一个元素 (消费者调用)
* @param {Object} outRef - 接收返回值的容器对象 { value: null }
* @returns {boolean} - 读取成功返回 true,队列为空返回 false
*/
dequeue(outRef) {
const sequences = this.sequences;
const data = this.data;
const mask = this.mask;
const capacity = this.capacity;
while (true) {
// 1. 获取当前的头指针
const head = Atomics.load(this.header, this.headRef);
const index = head & mask;
// 2. 获取该槽位的序号
const seq = Atomics.load(sequences, index);
const diff = seq - (head + 1);
if (diff === 0) {
// 槽位准备好读取。尝试抢占 head 指针 (CAS 操作)
if (Atomics.compareExchange(this.header, this.headRef, head, head + 1) === head) {
// 抢占成功,读取数据并重置该槽位序号为下一轮循环可用
outRef.value = data[index];
// 将槽位序号设为下一个写入周期的 tail 值:head + mask + 1 (即当前周期 + 队列容量)
Atomics.store(sequences, index, head + capacity);
return true;
}
// CAS 失败,有其他消费者抢占成功,自旋
} else if (diff < 0) {
// 说明该槽位尚无新数据写入,队列为空
return false;
}
// diff > 0 说明其他消费者正在读取,但还未更新序号,自旋
}
}
}
四、 实战:主线程与多 Worker 间的高并发协作
下面的实例模拟了主线程作为任务调度中心,分配任务到 4 个后台 Worker 中,Worker 从队列中并发读取任务并执行的场景。
1. Worker 代码 (worker.js)
在 Worker 中,我们监听初始化消息获取 SharedArrayBuffer,重建 LockFreeQueue 实例,随后进入无锁自旋读取模式:
import { LockFreeQueue } from './lock-free-queue.js';
let queue = null;
let running = true;
self.onmessage = (e) => {
if (e.data.type === 'INIT') {
// 跨线程共享内存,重建队列实例
queue = new LockFreeQueue(e.data.sab);
startConsuming();
} else if (e.data.type === 'STOP') {
running = false;
}
};
function startConsuming() {
const container = { value: null };
function poll() {
if (!running) return;
let processedCount = 0;
// 每次轮询尝试批量读取,减少调度开销
for (let i = 0; i < 50; i++) {
if (queue.dequeue(container)) {
const taskData = container.value;
// 模拟执行轻量计算任务
const result = heavyMath(taskData);
processedCount++;
} else {
// 队列为空,退出内层循环
break;
}
}
if (processedCount > 0) {
// 仅在非空轮询时使用,避免过度占用 CPU
requestAnimationFrame(poll);
} else {
// 队列为空,使用 setImmediate/setTimeout 延迟自旋,防止 CPU 空转跑满 100%
setTimeout(poll, 1);
}
}
poll();
}
function heavyMath(num) {
// 模拟耗时运算
return Math.sqrt(num) * Math.sin(num);
}
2. 主线程代码 (main.js)
在主线程中实例化队列,创建 4 个 Worker 并分配底层的 SharedArrayBuffer:
import { LockFreeQueue } from './lock-free-queue.js';
const QUEUE_CAPACITY = 2048; // 2的幂
const queue = new LockFreeQueue(QUEUE_CAPACITY);
// 创建 4 个并发 Worker
const workerCount = 4;
const workers = [];
for (let i = 0; i < workerCount; i++) {
const worker = new Worker(new URL('./worker.js', import.meta.url), { type: 'module' });
// 将 SharedArrayBuffer 发送给 Worker
worker.postMessage({
type: 'INIT',
sab: queue.getBuffer()
});
workers.push(worker);
}
// 主线程不断往队列写入数据(生产者)
let taskCounter = 0;
function produceTasks() {
let successCount = 0;
// 尝试批量写入任务
for (let i = 0; i < 100; i++) {
if (queue.enqueue(taskCounter)) {
taskCounter++;
successCount++;
} else {
// 队列已满
console.warn("Queue Full! Slowing down production.");
break;
}
}
// 持续生成任务
requestAnimationFrame(produceTasks);
}
// 开启生产线
produceTasks();
五、 性能优化与注意事项
避免 CPU 空转(Spinlock Mitigation)
当无锁队列为空(读)或已满(写)时,如果不加控制地执行while(true),会导致 CPU 核心占用率飙升到 100%。在上文的worker.js代码中,我们对空载情况引入了setTimeout(poll, 1),以退让 CPU 时间片。在极低延迟场景中,可使用几百次空转(spin)后引入短暂休眠的混合退避算法(Exponential Backoff)。利用
Atomics.wait()和Atomics.notify()进行挂起唤醒
如果队列中没有任务,可以让 Worker 线程进入阻塞状态,在主线程有新任务进入时再行唤醒:// 消费者无数据可读时挂起自身(注意:主线程不支持 Atomics.wait 阻塞) Atomics.wait(this.header, this.headRef, expectedValue); // 生产者写入新数据时唤醒一个正在等待的 Worker Atomics.notify(this.header, this.headRef, 1);JS 引擎垃圾回收(GC)开销为零
无锁队列直接在预先分配的SharedArrayBuffer内存块上直接进行就地存取(In-place reads/writes)。这意味着整个高并发读写链路中没有任何新 JS 对象的创建,实现了极致的 零 GC 开销,在平滑帧率、消除卡顿方面具有决定性优势。