
本文旨在探讨在ExpressJs应用中如何高效地并发执行多个异步任务,并确保所有Promise都已完成处理后再向客户端发送响应。我们将重点介绍async/await语法与Promise.all()的结合使用,优化异步代码的可读性和健壮性,同时提供错误处理的最佳实践,以确保API行为符合预期。
异步操作与ExpressJs响应机制
在Node.%ignore_a_1%和ExpressJs开发中,处理I/O密集型或网络请求等异步操作是常态。当一个API请求需要触发多个独立的异步任务(例如,并发请求外部服务并写入文件)时,我们通常希望在所有这些任务都完成后再向客户端返回最终结果或确认信息。然而,如果不正确地管理这些异步操作,服务器可能会在所有任务完成之前就发送响应,导致数据不一致或客户端获取到不完整的信息。
问题分析:为何await Promise.all()未生效
开发者在使用async/await和Promise.all()时,常遇到的一个核心问题是,尽管代码中包含了await Promise.all(tasks);,但Express路由处理函数似乎并未等待所有任务完成。这通常是由于以下两个关键点被忽略:
Express路由处理函数必须标记为async: await关键字只能在async函数内部使用。如果Express的路由处理函数(例如app.post(‘/’, (req, res) => { … }))没有被标记为async,那么其中的await语句将不会真正暂停函数的执行,而是会直接被解析为一个普通的表达式,导致后续代码立即执行。Promise的正确返回与错误传播: 确保所有被Promise.all()聚合的Promise都能正确地返回(resolve)或拒绝(reject),并且错误能够被有效地传播。原始的processTask函数在某些情况下可能没有正确地拒绝Promise,或者在fs.writeFile的回调中没有处理错误,导致Promise链断裂或无法被Promise.all()捕获。
解决方案:使用async/await重构异步逻辑
为了确保Express路由能够正确等待所有并发的Promise完成,我们需要对代码进行两方面的优化:
1. 优化 processTask 函数
原始的processTask函数使用了new Promise构造函数和嵌套的.then().catch(),这在现代JavaScript中通常可以通过async/await来简化。同时,需要确保文件写入操作的错误也能被捕获并拒绝Promise。
原始 processTask 示例(问题中的第一版):
function processTask(task: Task, configs: Configs) { return new Promise((resolve, reject) => { try { const fileName = './output/' + task.tag + 's.json'; fetch(configs.Host + configs.APIsBasePrefix + task.parentResource + task.mostRelatedPath, { method: 'GET' }).then(result => { result.json().then(jsonResult => { fs.writeFile(fileName, JSON.stringify(jsonResult), function () { // 缺少错误处理 console.log('finished writing :' + fileName); resolve(); }); }).catch(err => reject(err)); }).catch(err => reject(err)); } catch (err) { console.log(err); // 这里的错误不会拒绝外部Promise } });}
优化后的 processTask 函数:
使用async/await和fs.promises模块可以大大简化代码,并提供更清晰的错误处理机制。
import * as fs from 'fs/promises'; // 导入fs.promisesasync function processTask(task: Task, configs: Configs): Promise { try { const fileName = './output/' + task.tag + 's.json'; // 使用 await 等待 fetch 请求完成 const result = await fetch(configs.Host + configs.APIsBasePrefix + task.parentResource + task.mostRelatedPath, { method: 'GET' }); // 使用 await 等待 JSON 解析完成 const jsonResult = await result.json(); // 使用 fs.promises.writeFile 写入文件,它返回一个 Promise await fs.writeFile(fileName, JSON.stringify(jsonResult)); console.log('finished writing :' + fileName); } catch (err) { // 捕获任何发生在 fetch、json解析或文件写入过程中的错误 console.error(`Error processing task ${task.tag}:`, err); // 重新抛出错误,以便 Promise.all 能够捕获到它 throw err; }}
注意事项:
async函数默认返回一个Promise。当函数正常执行完毕时,Promise会以undefined(如果函数没有明确return值)或return的值来resolve。当async函数内部抛出错误时,它返回的Promise会自动reject。我们使用了fs.promises模块,它提供了Promise版本的Node.js文件系统API,避免了回调地狱。
2. 优化 Express 路由处理函数
Express路由处理函数必须被标记为async,才能在其内部正确使用await。
原始 Express 路由处理函数示例(问题中的第二版):
app.post('/', (req: Request, res: Response) => { // 缺少 async 关键字 const tasksRequest = req.body as TasksRequest; let tasks = [] tasks = tasksRequest.tasks.map( (t) => processTask(t, tasksRequest.configs)); console.log(tasks); Promise.all(tasks).then(res=>{ // 缺少 await console.log('After awaiting'); });});
优化后的 Express 路由处理函数:
import { Request, Response } from 'express'; // 假设类型定义app.post('/', async (req: Request, res: Response) => { // 关键:添加 async 关键字 const tasksRequest = req.body as TasksRequest; let tasks: Promise[] = []; // 明确 Promise 类型 try { tasks = tasksRequest.tasks.map((t) => processTask(t, tasksRequest.configs)); console.log('Starting all tasks...'); // 关键:使用 await Promise.all() 等待所有 Promise 完成 await Promise.all(tasks); console.log('After awaiting all tasks.'); // 所有任务完成后,发送成功响应 res.status(200).json({ message: 'All tasks processed successfully.' }); } catch (error) { console.error('An error occurred during task processing:', error); // 如果任何一个 Promise 拒绝,Promise.all 会立即拒绝 // 在这里发送错误响应 res.status(500).json({ message: 'Failed to process some tasks.', error: error.message }); }});
注意事项:
app.post(‘/’, async (req, res) => { … })是确保await在路由处理函数中生效的关键。await Promise.all(tasks);会暂停当前async函数的执行,直到tasks数组中的所有Promise都成功解决,或者其中任何一个Promise被拒绝。当Promise.all()中的任何一个Promise被拒绝时,Promise.all()自身也会立即拒绝,并抛出第一个拒绝的原因。因此,使用try…catch块来捕获潜在的错误并向客户端发送适当的错误响应至关重要。
完整示例代码
结合上述优化,一个完整的、健壮的Express路由处理并发异步任务的示例如下:
import express, { Request, Response } from 'express';import * as fs from 'fs/promises'; // 导入fs.promisesconst app = express();app.use(express.json()); // 用于解析请求体// 假设的类型定义interface Task { tag: string; parentResource: string; mostRelatedPath: string;}interface Configs { Host: string; APIsBasePrefix: string;}interface TasksRequest { tasks: Task[]; configs: Configs;}// 异步处理单个任务的函数async function processTask(task: Task, configs: Configs): Promise { try { const fileName = `./output/${task.tag}s.json`; // 使用模板字符串更简洁 // 模拟外部 API 请求 const result = await fetch(configs.Host + configs.APIsBasePrefix + task.parentResource + task.mostRelatedPath, { method: 'GET' }); if (!result.ok) { throw new Error(`HTTP error! status: ${result.status}`); } const jsonResult = await result.json(); // 确保 output 目录存在 const outputDir = './output'; await fs.mkdir(outputDir, { recursive: true }); // 写入文件 await fs.writeFile(fileName, JSON.stringify(jsonResult, null, 2)); // 美化JSON输出 console.log(`Finished writing: ${fileName}`); } catch (err) { console.error(`Error processing task ${task.tag}:`, err); // 重新抛出错误,让调用者(Promise.all)能够捕获 throw err; }}// Express POST 路由处理函数app.post('/', async (req: Request, res: Response) => { const tasksRequest = req.body as TasksRequest; if (!tasksRequest || !tasksRequest.tasks || !Array.isArray(tasksRequest.tasks) || !tasksRequest.configs) { return res.status(400).json({ message: 'Invalid request body.' }); } const tasksPromises: Promise[] = []; try { // 为每个任务创建并收集 Promise for (const t of tasksRequest.tasks) { tasksPromises.push(processTask(t, tasksRequest.configs)); } console.log(`Processing ${tasksPromises.length} tasks concurrently...`); // 等待所有任务 Promise 完成 await Promise.all(tasksPromises); console.log('All tasks completed successfully.'); // 所有任务成功完成,发送成功响应 res.status(200).json({ message: 'All tasks processed successfully.' }); } catch (error: any) { // 捕获 Promise.all 中任何一个任务的错误 console.error('An error occurred during concurrent task processing:', error); res.status(500).json({ message: 'Failed to process some tasks.', error: error.message, details: error.stack // 生产环境不建议直接暴露堆栈信息 }); }});const PORT = process.env.PORT || 3000;app.listen(PORT, () => { console.log(`Server running on port ${PORT}`);});// 示例用法 (假设在其他地方调用此API)// curl -X POST -H "Content-Type: application/json" -d '{// "tasks": [// {"tag": "user", "parentResource": "/api/v1/", "mostRelatedPath": "users"},// {"tag": "product", "parentResource": "/api/v1/", "mostRelatedPath": "products"}// ],// "configs": {// "Host": "https://jsonplaceholder.typicode.com",// "APIsBasePrefix": "/"// }// }' http://localhost:3000/
总结与最佳实践
在ExpressJs中处理并发异步任务并确保所有Promise完成,核心在于正确利用JavaScript的async/await语法和Promise.all()方法:
标记async路由处理函数: 任何包含await关键字的Express路由处理函数都必须用async关键字标记。优化异步函数: 将复杂的.then().catch()链重构为更简洁、更易读的async/await模式。使用Promise.all()并发执行: 对于多个相互独立的异步任务,使用Promise.all()可以高效地并发执行它们,并等待所有任务完成。健壮的错误处理: 在async函数内部使用try…catch捕获并传播错误。在Express路由处理函数中,使用try…catch包裹await Promise.all(),以便在任何一个并发任务失败时能够捕获错误并向客户端发送适当的错误响应(例如500 Internal Server Error)。利用Promise-based API: 优先使用返回Promise的API(如fetch、fs.promises),而不是基于回调的API,以更好地融入async/await生态。
通过遵循这些实践,开发者可以构建出更稳定、更易维护的ExpressJs应用,有效管理复杂的异步流程。
以上就是ExpressJs中并发处理异步任务并等待所有Promise完成的详细内容,更多请关注创想鸟其它相关文章!
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 chuangxiangniao@163.com 举报,一经查实,本站将立刻删除。
发布者:程序猿,转转请注明出处:https://www.chuangxiangniao.com/p/1520466.html
微信扫一扫
支付宝扫一扫