Node.js 分布式任务系统:Redis Pub/Sub 实现实时任务分发与状态同步
Node.js 分布式任务系统:Redis Pub/Sub 实现实时任务分发与状态同步
为啥要用 Redis Pub/Sub?
实战:搭建一个简单的任务分发系统
1. 环境准备
2. 任务发布者(Publisher)
3. 任务消费者(Subscriber)
4. 运行测试
进阶:解决消息可靠性和消费者竞争问题
1. 消息可靠性
2. 消费者竞争
总结与进阶
Node.js 分布式任务系统:Redis Pub/Sub 实现实时任务分发与状态同步
大家好,我是你们的赛博朋克老友“码农老炮儿”。今天咱们来聊聊 Node.js 分布式任务系统里一个挺有意思的话题:怎么用 Redis 的 Pub/Sub(发布/订阅)机制来实现任务的实时分发和状态同步。这玩意儿在实际开发中用处可大了,特别是对于那些需要构建实时任务处理系统的 Node.js 开发者来说,绝对是居家旅行必备良药。
为啥要用 Redis Pub/Sub?
在分布式系统中,任务的分发和状态同步是个老大难问题。你想啊,一堆任务扔过来,怎么保证每个任务都能被可靠地执行,而且执行状态还能实时反馈?传统的轮询方式效率太低,消息队列又太重,这时候 Redis Pub/Sub 就闪亮登场了。
Redis Pub/Sub 有以下几个优点:
- 轻量级:Redis 本身就以高性能著称,Pub/Sub 又是它自带的功能,不需要额外的组件,用起来非常方便。
- 实时性:消息发布后,订阅者几乎可以立即收到,延迟非常低。
- 简单易用:Redis 的 API 设计得非常简洁,几行代码就能搞定 Pub/Sub 的基本功能。
- 支持多订阅者:一个频道可以有多个订阅者,方便实现任务的广播。
当然,Redis Pub/Sub 也有一些缺点,主要是:
- 消息不可靠:如果订阅者不在线,消息就丢失了,Redis 不会持久化 Pub/Sub 消息。
- 不支持消息确认:发布者不知道订阅者是否成功处理了消息。
所以,在使用 Redis Pub/Sub 的时候,我们需要根据实际情况权衡利弊,并采取一些措施来弥补它的不足。接下来,我们就来详细聊聊怎么做。
实战:搭建一个简单的任务分发系统
为了让大家更好地理解,咱们来动手搭建一个简单的任务分发系统。这个系统包含两个角色:
- 任务发布者(Publisher):负责发布任务。
- 任务消费者(Subscriber):负责订阅任务并执行。
1. 环境准备
首先,确保你已经安装了 Node.js 和 Redis。然后,创建一个新的 Node.js 项目,并安装 Redis 客户端库:
npm init -y npm install redis
2. 任务发布者(Publisher)
// publisher.js const redis = require('redis'); // 创建 Redis 客户端 const publisher = redis.createClient(); (async () => { await publisher.connect(); // 模拟任务发布 let taskId = 1; setInterval(async () => { const task = { id: taskId++, data: `任务数据-${taskId}`, }; // 发布任务到 'task_queue' 频道 await publisher.publish('task_queue', JSON.stringify(task)); console.log(`发布任务:${JSON.stringify(task)}`); }, 1000); })();
这段代码创建了一个 Redis 客户端,然后每隔 1 秒发布一个任务到名为 task_queue
的频道。任务数据是一个简单的 JSON 对象。
3. 任务消费者(Subscriber)
// subscriber.js const redis = require('redis'); // 创建 Redis 客户端 const subscriber = redis.createClient(); (async () => { await subscriber.connect(); // 订阅 'task_queue' 频道 await subscriber.subscribe('task_queue', (message) => { const task = JSON.parse(message); console.log(`收到任务:${JSON.stringify(task)}`); // 模拟任务处理 setTimeout(() => { console.log(`任务 ${task.id} 处理完成`); }, 500); }); console.log('开始监听任务...'); })();
这段代码创建了另一个 Redis 客户端,并订阅了 task_queue
频道。当收到消息时,它会解析 JSON 数据,模拟任务处理,并在 500 毫秒后打印任务完成的消息。
4. 运行测试
分别运行 publisher.js
和 subscriber.js
:
node publisher.js node subscriber.js
你会看到发布者不断发布任务,消费者不断接收并处理任务。
进阶:解决消息可靠性和消费者竞争问题
上面的例子只是一个简单的演示,实际应用中还需要考虑消息可靠性和消费者竞争的问题。
1. 消息可靠性
Redis Pub/Sub 不保证消息可靠性,如果订阅者不在线,消息就丢失了。为了解决这个问题,我们可以结合 Redis 的 List 数据结构来实现一个简单的任务队列。
- 发布者:发布任务时,除了使用
publish
发布消息外,还使用lpush
将任务添加到 List 中。 - 消费者:启动时,先尝试从 List 中使用
rpop
获取任务(这样可以处理离线期间积压的任务)。如果没有获取到任务,再使用subscribe
订阅频道。
// publisher.js (改进版) const redis = require('redis'); const publisher = redis.createClient(); (async () => { await publisher.connect(); let taskId = 1; setInterval(async () => { const task = { id: taskId++, data: `任务数据-${taskId}`, }; await publisher.publish('task_queue', JSON.stringify(task)); // 将任务添加到 List await publisher.lPush('task_list', JSON.stringify(task)); console.log(`发布任务:${JSON.stringify(task)}`); }, 1000); })(); // subscriber.js (改进版) const redis = require('redis'); const subscriber = redis.createClient(); (async () => { await subscriber.connect(); const processTask = async (task) => { console.log(`收到任务:${JSON.stringify(task)}`); setTimeout(() => { console.log(`任务 ${task.id} 处理完成`); }, 500); } // 优先从 List 中获取任务 const getTaskFromList = async()=>{ let task = await subscriber.rPop('task_list'); while(task){ await processTask(JSON.parse(task)); task = await subscriber.rPop('task_list'); } } await getTaskFromList(); await subscriber.subscribe('task_queue', async (message) => { const task = JSON.parse(message); await processTask(task); }); console.log('开始监听任务...'); })();
通过这种方式,即使消费者离线,任务也会保存在List中,消费者上线后可以继续处理。
2. 消费者竞争
如果有多个消费者同时订阅同一个频道,它们会收到相同的消息,导致任务重复执行。为了解决这个问题,我们可以使用 Redis 的 SETNX
命令来实现一个简单的分布式锁。
- 消费者:在处理消息之前,先尝试获取锁(
SETNX
)。如果获取成功,表示当前消费者可以处理该任务;否则,表示其他消费者正在处理,当前消费者忽略该消息。
// subscriber.js (再次改进版) const redis = require('redis'); const subscriber = redis.createClient(); (async () => { await subscriber.connect(); const processTask = async (task) => { console.log(`收到任务:${JSON.stringify(task)}`); setTimeout(() => { console.log(`任务 ${task.id} 处理完成`); // 释放锁 subscriber.del(`task_lock:${task.id}`); }, 500); }; const getTaskFromList = async () => { let task = await subscriber.rPop('task_list'); while (task) { const taskObj = JSON.parse(task); // 尝试获取锁 const lock = await subscriber.setNX(`task_lock:${taskObj.id}`, '1'); if (lock) { await subscriber.expire(`task_lock:${taskObj.id}`,10); //设置过期时间 await processTask(taskObj); } task = await subscriber.rPop('task_list'); } }; await getTaskFromList(); await subscriber.subscribe('task_queue', async (message) => { const task = JSON.parse(message); // 尝试获取锁 const lock = await subscriber.setNX(`task_lock:${task.id}`, '1'); if (lock) { await subscriber.expire(`task_lock:${task.id}`,10); //设置过期时间 await processTask(task); } }); console.log('开始监听任务...'); })();
通过分布式锁,我们可以确保同一时刻只有一个消费者处理同一个任务。
总结与进阶
今天我们学习了如何使用 Redis Pub/Sub 机制在 Node.js 分布式任务系统中实现实时任务分发和状态同步。我们通过一个简单的例子演示了基本用法,并针对消息可靠性和消费者竞争问题提出了解决方案。
当然,这只是一个入门级的方案,实际应用中还需要考虑更多因素,例如:
- 任务优先级:如何让重要的任务优先执行?
- 任务重试:如果任务执行失败,如何自动重试?
- 任务监控:如何监控任务的执行状态,及时发现问题?
- 水平扩展:如何让系统支持更多的任务和消费者?
- 更完善的错误处理: 包括网络错误、Redis连接错误、任务处理过程中的错误等。
这些问题都需要我们在实践中不断探索和优化。希望今天的分享能给大家带来一些启发,让大家在构建 Node.js 分布式任务系统时更加得心应手。
如果你有任何问题或者想法,欢迎在评论区留言,咱们一起交流学习!