WEBKT

Node.js 分布式任务系统:Redis Pub/Sub 实现实时任务分发与状态同步

86 0 0 0

Node.js 分布式任务系统:Redis Pub/Sub 实现实时任务分发与状态同步

为啥要用 Redis Pub/Sub?

实战:搭建一个简单的任务分发系统

1. 环境准备

2. 任务发布者(Publisher)

3. 任务消费者(Subscriber)

4. 运行测试

进阶:解决消息可靠性和消费者竞争问题

1. 消息可靠性

2. 消费者竞争

总结与进阶

Node.js 分布式任务系统:Redis Pub/Sub 实现实时任务分发与状态同步

大家好,我是你们的赛博朋克老友“码农老炮儿”。今天咱们来聊聊 Node.js 分布式任务系统里一个挺有意思的话题:怎么用 Redis 的 Pub/Sub(发布/订阅)机制来实现任务的实时分发和状态同步。这玩意儿在实际开发中用处可大了,特别是对于那些需要构建实时任务处理系统的 Node.js 开发者来说,绝对是居家旅行必备良药。

为啥要用 Redis Pub/Sub?

在分布式系统中,任务的分发和状态同步是个老大难问题。你想啊,一堆任务扔过来,怎么保证每个任务都能被可靠地执行,而且执行状态还能实时反馈?传统的轮询方式效率太低,消息队列又太重,这时候 Redis Pub/Sub 就闪亮登场了。

Redis Pub/Sub 有以下几个优点:

  1. 轻量级:Redis 本身就以高性能著称,Pub/Sub 又是它自带的功能,不需要额外的组件,用起来非常方便。
  2. 实时性:消息发布后,订阅者几乎可以立即收到,延迟非常低。
  3. 简单易用:Redis 的 API 设计得非常简洁,几行代码就能搞定 Pub/Sub 的基本功能。
  4. 支持多订阅者:一个频道可以有多个订阅者,方便实现任务的广播。

当然,Redis Pub/Sub 也有一些缺点,主要是:

  1. 消息不可靠:如果订阅者不在线,消息就丢失了,Redis 不会持久化 Pub/Sub 消息。
  2. 不支持消息确认:发布者不知道订阅者是否成功处理了消息。

所以,在使用 Redis Pub/Sub 的时候,我们需要根据实际情况权衡利弊,并采取一些措施来弥补它的不足。接下来,我们就来详细聊聊怎么做。

实战:搭建一个简单的任务分发系统

为了让大家更好地理解,咱们来动手搭建一个简单的任务分发系统。这个系统包含两个角色:

  1. 任务发布者(Publisher):负责发布任务。
  2. 任务消费者(Subscriber):负责订阅任务并执行。

1. 环境准备

首先,确保你已经安装了 Node.js 和 Redis。然后,创建一个新的 Node.js 项目,并安装 Redis 客户端库:

npm init -y
npm install redis

2. 任务发布者(Publisher)

// publisher.js
const redis = require('redis');
// 创建 Redis 客户端
const publisher = redis.createClient();
(async () => {
await publisher.connect();
// 模拟任务发布
let taskId = 1;
setInterval(async () => {
const task = {
id: taskId++,
data: `任务数据-${taskId}`,
};
// 发布任务到 'task_queue' 频道
await publisher.publish('task_queue', JSON.stringify(task));
console.log(`发布任务:${JSON.stringify(task)}`);
}, 1000);
})();

这段代码创建了一个 Redis 客户端,然后每隔 1 秒发布一个任务到名为 task_queue 的频道。任务数据是一个简单的 JSON 对象。

3. 任务消费者(Subscriber)

// subscriber.js
const redis = require('redis');
// 创建 Redis 客户端
const subscriber = redis.createClient();
(async () => {
await subscriber.connect();
// 订阅 'task_queue' 频道
await subscriber.subscribe('task_queue', (message) => {
const task = JSON.parse(message);
console.log(`收到任务:${JSON.stringify(task)}`);
// 模拟任务处理
setTimeout(() => {
console.log(`任务 ${task.id} 处理完成`);
}, 500);
});
console.log('开始监听任务...');
})();

这段代码创建了另一个 Redis 客户端,并订阅了 task_queue 频道。当收到消息时,它会解析 JSON 数据,模拟任务处理,并在 500 毫秒后打印任务完成的消息。

4. 运行测试

分别运行 publisher.jssubscriber.js

node publisher.js
node subscriber.js

你会看到发布者不断发布任务,消费者不断接收并处理任务。

进阶:解决消息可靠性和消费者竞争问题

上面的例子只是一个简单的演示,实际应用中还需要考虑消息可靠性和消费者竞争的问题。

1. 消息可靠性

Redis Pub/Sub 不保证消息可靠性,如果订阅者不在线,消息就丢失了。为了解决这个问题,我们可以结合 Redis 的 List 数据结构来实现一个简单的任务队列。

  • 发布者:发布任务时,除了使用 publish 发布消息外,还使用 lpush 将任务添加到 List 中。
  • 消费者:启动时,先尝试从 List 中使用rpop获取任务(这样可以处理离线期间积压的任务)。如果没有获取到任务,再使用subscribe订阅频道。
// publisher.js (改进版)
const redis = require('redis');
const publisher = redis.createClient();
(async () => {
await publisher.connect();
let taskId = 1;
setInterval(async () => {
const task = {
id: taskId++,
data: `任务数据-${taskId}`,
};
await publisher.publish('task_queue', JSON.stringify(task));
// 将任务添加到 List
await publisher.lPush('task_list', JSON.stringify(task));
console.log(`发布任务:${JSON.stringify(task)}`);
}, 1000);
})();
// subscriber.js (改进版)
const redis = require('redis');
const subscriber = redis.createClient();
(async () => {
await subscriber.connect();
const processTask = async (task) => {
console.log(`收到任务:${JSON.stringify(task)}`);
setTimeout(() => {
console.log(`任务 ${task.id} 处理完成`);
}, 500);
}
// 优先从 List 中获取任务
const getTaskFromList = async()=>{
let task = await subscriber.rPop('task_list');
while(task){
await processTask(JSON.parse(task));
task = await subscriber.rPop('task_list');
}
}
await getTaskFromList();
await subscriber.subscribe('task_queue', async (message) => {
const task = JSON.parse(message);
await processTask(task);
});
console.log('开始监听任务...');
})();

通过这种方式,即使消费者离线,任务也会保存在List中,消费者上线后可以继续处理。

2. 消费者竞争

如果有多个消费者同时订阅同一个频道,它们会收到相同的消息,导致任务重复执行。为了解决这个问题,我们可以使用 Redis 的 SETNX 命令来实现一个简单的分布式锁。

  • 消费者:在处理消息之前,先尝试获取锁(SETNX)。如果获取成功,表示当前消费者可以处理该任务;否则,表示其他消费者正在处理,当前消费者忽略该消息。
// subscriber.js (再次改进版)
const redis = require('redis');
const subscriber = redis.createClient();
(async () => {
await subscriber.connect();
const processTask = async (task) => {
console.log(`收到任务:${JSON.stringify(task)}`);
setTimeout(() => {
console.log(`任务 ${task.id} 处理完成`);
// 释放锁
subscriber.del(`task_lock:${task.id}`);
}, 500);
};
const getTaskFromList = async () => {
let task = await subscriber.rPop('task_list');
while (task) {
const taskObj = JSON.parse(task);
// 尝试获取锁
const lock = await subscriber.setNX(`task_lock:${taskObj.id}`, '1');
if (lock) {
await subscriber.expire(`task_lock:${taskObj.id}`,10); //设置过期时间
await processTask(taskObj);
}
task = await subscriber.rPop('task_list');
}
};
await getTaskFromList();
await subscriber.subscribe('task_queue', async (message) => {
const task = JSON.parse(message);
// 尝试获取锁
const lock = await subscriber.setNX(`task_lock:${task.id}`, '1');
if (lock) {
await subscriber.expire(`task_lock:${task.id}`,10); //设置过期时间
await processTask(task);
}
});
console.log('开始监听任务...');
})();

通过分布式锁,我们可以确保同一时刻只有一个消费者处理同一个任务。

总结与进阶

今天我们学习了如何使用 Redis Pub/Sub 机制在 Node.js 分布式任务系统中实现实时任务分发和状态同步。我们通过一个简单的例子演示了基本用法,并针对消息可靠性和消费者竞争问题提出了解决方案。

当然,这只是一个入门级的方案,实际应用中还需要考虑更多因素,例如:

  • 任务优先级:如何让重要的任务优先执行?
  • 任务重试:如果任务执行失败,如何自动重试?
  • 任务监控:如何监控任务的执行状态,及时发现问题?
  • 水平扩展:如何让系统支持更多的任务和消费者?
  • 更完善的错误处理: 包括网络错误、Redis连接错误、任务处理过程中的错误等。

这些问题都需要我们在实践中不断探索和优化。希望今天的分享能给大家带来一些启发,让大家在构建 Node.js 分布式任务系统时更加得心应手。

如果你有任何问题或者想法,欢迎在评论区留言,咱们一起交流学习!

码农老炮儿 Node.jsRedis分布式任务

评论点评

打赏赞助
sponsor

感谢您的支持让我们更好的前行

分享

QRcode

https://www.webkt.com/article/7947