在 Node.js 开发中,经常需要同时处理多个独立的异步任务,例如读取配置文件、调用外部接口、批量上传文件等。这些任务彼此独立,但最终结果需要协同处理。如果逐个 await,性能会大打折扣;如果盲目并发,又可能引发资源耗尽或错误处理混乱。本文基于真实项目场景,介绍 7 种成熟的异步协作方案,每种都配有可直接运行的代码示例。
1. Promise.all——应用启动加载必要配置
场景:服务启动时必须读取数据库、Redis 和第三方密钥三个配置文件,任何一个缺失或格式错误都不能继续启动。- const fs = require('fs').promises;
- async function loadConfigs() {
- const [db, redis, secrets] = await Promise.all([
- fs.readFile('./config/db.json', 'utf8').then(JSON.parse),
- fs.readFile('./config/redis.json', 'utf8').then(JSON.parse),
- fs.readFile('./config/secrets.json', 'utf8').then(JSON.parse)
- ]);
- console.log('所有配置加载完成', { db, redis, secrets });
- }
复制代码 特点:全成功或全失败,结果以数组顺序返回。适合“缺一不可”的场景。
2. Promise.allSettled——批量同步用户数据到多个外部系统
场景:用户更新个人资料后,需要同步到 CRM、邮件服务、推送系统。允许个别失败,但要记录失败原因,后续重试。- async function syncUserToExternal(user) {
- const tasks = [
- syncToCRM(user),
- syncToEmailService(user),
- syncToPushService(user)
- ];
- const results = await Promise.allSettled(tasks);
- const failed = results.filter(r => r.status === 'rejected');
- if (failed.length) {
- console.error(`同步失败 ${failed.length} 个系统`, failed.map(f => f.reason));
- // 将失败记录到数据库,等待重试队列处理
- }
- return results;
- }
复制代码 特点:等待所有任务完成,无论成功或失败,都能拿到每个任务的最终状态。
3. Promise.race——HTTP 请求超时控制
场景:调用外部 API,必须在 3 秒内返回结果,否则自动降级使用缓存数据。- function fetchWithTimeout(url, timeout = 3000) {
- const controller = new AbortController();
- const fetchPromise = fetch(url, { signal: controller.signal });
- const timeoutPromise = new Promise((_, reject) =>
- setTimeout(() => {
- controller.abort();
- reject(new Error('请求超时'));
- }, timeout)
- );
- return Promise.race([fetchPromise, timeoutPromise]);
- }
- try {
- const data = await fetchWithTimeout('https://slow-api.example.com/data', 3000);
- console.log(data);
- } catch (err) {
- console.log('使用缓存数据');
- }
复制代码 特点:只取最先完成的那个结果(成功或失败)。常用于超时控制、多源竞速。
4. Promise.any——多 CDN 资源容灾加载
场景:前端静态资源部署在三个 CDN 上,只要任意一个 CDN 返回成功,就使用该资源,忽略失败的 CDN。- async function loadScriptFromCDNs(urls) {
- const fetchTasks = urls.map(url => fetch(url).then(res => {
- if (!res.ok) throw new Error(`HTTP ${res.status}`);
- return res.text();
- }));
- try {
- const scriptContent = await Promise.any(fetchTasks);
- eval(scriptContent); // 实际项目中建议使用更安全的方式
- console.log('脚本加载成功');
- } catch (aggregateError) {
- console.error('所有 CDN 均不可用', aggregateError.errors);
- }
- }
- loadScriptFromCDNs([
- 'https://cdn1.example.com/lib.js',
- 'https://cdn2.example.com/lib.js',
- 'https://cdn3.example.com/lib.js'
- ]);
复制代码 特点:只要有一个成功就返回,全部失败才抛出异常。非常适合冗余容灾设计。
5. 事件计数器——旧式多文件写入完成后合并压缩
场景:维护一个老项目(基于回调风格),需要等三个日志文件全部写入磁盘后,再执行合并压缩操作。- const EventEmitter = require('events');
- const fs = require('fs');
- class FileWriter extends EventEmitter {
- writeAndNotify(file, data) {
- fs.writeFile(file, data, (err) => {
- if (err) this.emit('error', err);
- else this.emit('done', file);
- });
- }
- }
- const writer = new FileWriter();
- let completed = 0;
- const total = 3;
- function onAllDone() {
- console.log('所有文件写入完成,开始合并压缩');
- }
- writer.on('done', (file) => {
- console.log(`${file} 写入完成`);
- if (++completed === total) onAllDone();
- });
- writer.writeAndNotify('log1.txt', 'data1');
- writer.writeAndNotify('log2.txt', 'data2');
- writer.writeAndNotify('log3.txt', 'data3');
复制代码 特点:原始但可控,适合无法使用 Promise 的旧环境或需要细粒度事件监听时使用。
6. 流式处理——实时聚合多个传感器数据流
场景:物联网网关接收温度、湿度、气压三个传感器的实时数据流,需要每收到一组(三个传感器各一个值)就计算平均值并推送。- const { fromEvent, merge, bufferCount } = require('rxjs');
- const { EventEmitter } = require('events');
- const sensorA = new EventEmitter();
- const sensorB = new EventEmitter();
- const sensorC = new EventEmitter();
- setInterval(() => sensorA.emit('data', Math.random() * 30), 1000);
- setInterval(() => sensorB.emit('data', Math.random() * 60), 1000);
- setInterval(() => sensorC.emit('data', Math.random() * 10), 1000);
- const streamA = fromEvent(sensorA, 'data');
- const streamB = fromEvent(sensorB, 'data');
- const streamC = fromEvent(sensorC, 'data');
- merge(streamA, streamB, streamC)
- .pipe(bufferCount(3))
- .subscribe(values => {
- const avg = values.reduce((a, b) => a + b, 0) / values.length;
- console.log(`实时平均传感器值: ${avg.toFixed(2)}`);
- });
复制代码 特点:适合结果逐步产生、需要实时响应的场景。RxJS 提供了强大的组合能力。
7. 队列控制并发——限制同时上传文件的数量
场景:用户一次选择了 100 个文件上传到云存储,必须控制同时上传的并发数为 5,避免网络拥塞和服务器压力过大。- const pLimit = require('p-limit');
- const fs = require('fs').promises;
- const path = require('path');
- async function uploadFile(filePath) {
- console.log(`开始上传 ${path.basename(filePath)}`);
- await new Promise(r => setTimeout(r, 1000)); // 模拟上传
- console.log(`完成上传 ${path.basename(filePath)}`);
- return filePath;
- }
- async function uploadAll(filePaths) {
- const limit = pLimit(5); // 最多5个并发
- const tasks = filePaths.map(filePath =>
- limit(() => uploadFile(filePath))
- );
- const results = await Promise.all(tasks);
- console.log(`全部上传完成,共 ${results.length} 个文件`);
- }
- const files = Array.from({ length: 100 }, (_, i) => `/tmp/file${i}.txt`);
- uploadAll(files);
复制代码 特点:既保证并发效率,又避免资源耗尽。配合 Promise.all 可以等待所有任务完成。
总结:
- 所有任务必须全部成功,结果一起使用 → Promise.all
- 容忍部分失败,但需要知道每个任务的状态 → Promise.allSettled
- 只取最快结果(如超时、多源竞速) → Promise.race
- 只要有一个成功即可,忽略失败 → Promise.any
- 旧项目回调风格或需要细粒度控制 → 事件计数器 / EventEmitter
- 结果流式输出、复杂组合(如传感器数据) → RxJS / 异步迭代器
- 大量任务且需控制并发数量 → 队列 + p-limit
在实际项目中,90% 的异步协作需求都可以用 Promise.all 和 Promise.allSettled 解决。对于更复杂的场景,再考虑流式处理或队列控制。掌握这些模式,你的 Node.js 异步编程能力将更上一层楼。 |