Appearance
异步迭代器:for await...of 的实现
当你需要遍历一个异步数据源时,普通的for...of无法处理Promise序列。ES2018引入了异步迭代器(Async Iterator)和for await...of语法,让异步遍历变得简洁优雅。V8如何实现这个特性?它与同步迭代器有什么区别?本章将揭示异步迭代的底层机制。
从同步迭代器说起
理解异步迭代器,需要先回顾同步迭代器:
javascript
// 同步迭代器回顾
class SyncIteratorReview {
static demonstrate() {
console.log('=== 同步迭代器协议 ===\n');
console.log('迭代器协议要求:');
console.log(' • 实现next()方法');
console.log(' • 返回{value, done}对象');
console.log(' • done为true表示迭代结束\n');
console.log('可迭代协议要求:');
console.log(' • 实现[Symbol.iterator]()方法');
console.log(' • 返回一个迭代器对象\n');
}
static demonstrateExample() {
console.log('=== 同步迭代器示例 ===\n');
// 自定义可迭代对象
const range = {
from: 1,
to: 3,
[Symbol.iterator]() {
let current = this.from;
const last = this.to;
return {
next() {
if (current <= last) {
return { value: current++, done: false };
}
return { value: undefined, done: true };
}
};
}
};
console.log('使用for...of遍历:');
for (const num of range) {
console.log(' ', num);
}
console.log('');
}
static runAll() {
this.demonstrate();
this.demonstrateExample();
}
}
SyncIteratorReview.runAll();异步迭代器协议
异步迭代器的定义:
javascript
// 异步迭代器协议
class AsyncIteratorProtocol {
static demonstrate() {
console.log('=== 异步迭代器协议 ===\n');
console.log('异步迭代器协议:');
console.log(' • 实现next()方法');
console.log(' • 返回Promise<{value, done}>');
console.log(' • Promise解析为{value, done}对象\n');
console.log('异步可迭代协议:');
console.log(' • 实现[Symbol.asyncIterator]()方法');
console.log(' • 返回一个异步迭代器对象\n');
console.log('对比:');
console.log('');
console.log(' 同步迭代器:');
console.log(' next() → {value, done}');
console.log('');
console.log(' 异步迭代器:');
console.log(' next() → Promise<{value, done}>');
console.log('');
}
static async demonstrateExample() {
console.log('=== 异步迭代器示例 ===\n');
// 自定义异步可迭代对象
const asyncRange = {
from: 1,
to: 3,
[Symbol.asyncIterator]() {
let current = this.from;
const last = this.to;
return {
async next() {
// 模拟异步操作
await new Promise(r => setTimeout(r, 100));
if (current <= last) {
return { value: current++, done: false };
}
return { value: undefined, done: true };
}
};
}
};
console.log('使用for await...of遍历:');
for await (const num of asyncRange) {
console.log(' ', num);
}
console.log('');
}
static async runAll() {
this.demonstrate();
await this.demonstrateExample();
}
}
AsyncIteratorProtocol.runAll();for await...of的转换
V8如何处理for await...of:
javascript
// for await...of转换
class ForAwaitOfTransformation {
static demonstrate() {
console.log('=== for await...of的转换 ===\n');
console.log('原始代码:');
console.log(`
async function consume(asyncIterable) {
for await (const item of asyncIterable) {
console.log(item);
}
}
`);
console.log('转换后的等效代码:');
console.log(`
async function consume(asyncIterable) {
// 获取异步迭代器
const iterator = asyncIterable[Symbol.asyncIterator]();
try {
while (true) {
// 调用next()并等待Promise
const result = await iterator.next();
if (result.done) {
break;
}
const item = result.value;
console.log(item);
}
} finally {
// 如果有return方法,调用它进行清理
if (iterator.return) {
await iterator.return();
}
}
}
`);
}
static demonstrateEarlyExit() {
console.log('=== 提前退出的处理 ===\n');
console.log('当使用break或throw退出时:');
console.log(`
for await (const item of asyncIterable) {
if (someCondition) {
break; // 会调用iterator.return()
}
if (errorCondition) {
throw new Error(); // 也会调用iterator.return()
}
}
`);
console.log('return()方法的作用:');
console.log(' • 释放资源(如关闭文件、断开连接)');
console.log(' • 通知生产者停止生成数据');
console.log(' • 清理迭代器内部状态\n');
}
static runAll() {
this.demonstrate();
this.demonstrateEarlyExit();
}
}
ForAwaitOfTransformation.runAll();V8中的实现
V8如何处理异步迭代:
javascript
// V8实现
class V8AsyncIteratorImplementation {
static demonstrate() {
console.log('=== V8异步迭代器处理 ===\n');
console.log('for await...of在字节码层面:');
console.log(`
// 获取迭代器
GetIterator <asyncIterable>, ASYNC
// 循环开始
loop:
// 调用next()
Call iterator.next()
// 等待Promise
Await
// 检查done
JumpIfTrue result.done, end
// 获取value
GetProperty result, "value"
// 执行循环体
...
// 继续循环
Jump loop
end:
// 清理
CallIfPresent iterator.return()
`);
}
static demonstrateAsyncFromSyncIterator() {
console.log('=== AsyncFromSyncIterator ===\n');
console.log('同步迭代器在for await中的处理:');
console.log(`
const syncArray = [1, 2, 3];
// 可以用for await遍历同步可迭代对象
for await (const item of syncArray) {
console.log(item);
}
`);
console.log('V8内部处理:');
console.log(' • 检测到同步迭代器');
console.log(' • 创建AsyncFromSyncIterator包装器');
console.log(' • 包装器的next()返回Promise');
console.log(' • 兼容同步和异步迭代\n');
console.log('AsyncFromSyncIterator伪代码:');
console.log(`
class AsyncFromSyncIterator {
constructor(syncIterator) {
this.syncIterator = syncIterator;
}
async next() {
const result = this.syncIterator.next();
return {
value: await result.value,
done: result.done
};
}
}
`);
}
static runAll() {
this.demonstrate();
this.demonstrateAsyncFromSyncIterator();
}
}
V8AsyncIteratorImplementation.runAll();异步生成器
创建异步迭代器的简便方式:
javascript
// 异步生成器
class AsyncGenerators {
static demonstrate() {
console.log('=== 异步生成器语法 ===\n');
console.log('异步生成器结合了:');
console.log(' • async函数的await能力');
console.log(' • 生成器的yield能力');
console.log(' • 自动实现异步迭代器协议\n');
console.log('语法:');
console.log(`
async function* asyncGenerator() {
yield await asyncOperation1();
yield await asyncOperation2();
yield await asyncOperation3();
}
`);
}
static async demonstrateExample() {
console.log('=== 异步生成器示例 ===\n');
// 定义异步生成器
async function* fetchPages(urls) {
for (const url of urls) {
// 模拟fetch
await new Promise(r => setTimeout(r, 100));
yield { url, data: `Data from ${url}` };
}
}
const urls = ['/page1', '/page2', '/page3'];
console.log('遍历异步生成器:');
for await (const page of fetchPages(urls)) {
console.log(' 获取:', page.url);
}
console.log('');
}
static demonstrateYieldAndAwait() {
console.log('=== yield和await的配合 ===\n');
console.log('三种yield模式:');
console.log(`
async function* example() {
// 1. yield普通值
yield 1;
// 2. yield Promise(自动await)
yield Promise.resolve(2);
// 3. await后yield
const data = await fetchData();
yield data;
// 4. yield* 委托给另一个异步迭代器
yield* anotherAsyncIterable;
}
`);
}
static async runAll() {
this.demonstrate();
await this.demonstrateExample();
this.demonstrateYieldAndAwait();
}
}
AsyncGenerators.runAll();实际应用场景
异步迭代器的典型用途:
javascript
// 实际应用
class RealWorldApplications {
static demonstrateStreamProcessing() {
console.log('=== 流数据处理 ===\n');
console.log('示例:处理大文件');
console.log(`
async function* readLargeFile(filePath) {
const stream = fs.createReadStream(filePath);
for await (const chunk of stream) {
yield chunk;
}
}
async function processFile() {
for await (const chunk of readLargeFile('large.txt')) {
await processChunk(chunk);
}
}
`);
console.log('优势:');
console.log(' • 内存友好,不需要一次加载全部');
console.log(' • 可以处理无限数据流');
console.log(' • 背压控制(等待处理完再读取下一块)\n');
}
static demonstratePagination() {
console.log('=== 分页数据获取 ===\n');
console.log('示例:API分页');
console.log(`
async function* fetchAllPages(baseUrl) {
let page = 1;
let hasMore = true;
while (hasMore) {
const response = await fetch(baseUrl + '?page=' + page);
const data = await response.json();
yield* data.items; // 展开本页所有项
hasMore = data.hasMore;
page++;
}
}
// 使用
for await (const item of fetchAllPages('/api/items')) {
console.log(item);
// 所有分页数据像单个列表一样处理
}
`);
}
static demonstrateWebSocket() {
console.log('=== WebSocket消息流 ===\n');
console.log('示例:WebSocket消息迭代');
console.log(`
async function* websocketMessages(url) {
const ws = new WebSocket(url);
const messages = [];
let resolve;
ws.onmessage = (event) => {
messages.push(event.data);
if (resolve) {
resolve();
resolve = null;
}
};
try {
while (ws.readyState === WebSocket.OPEN) {
if (messages.length === 0) {
await new Promise(r => resolve = r);
}
yield messages.shift();
}
} finally {
ws.close();
}
}
// 使用
for await (const message of websocketMessages('ws://...')) {
handleMessage(message);
}
`);
}
static runAll() {
this.demonstrateStreamProcessing();
this.demonstratePagination();
this.demonstrateWebSocket();
}
}
RealWorldApplications.runAll();手动实现异步迭代器
深入理解内部机制:
javascript
// 手动实现
class ManualImplementation {
static async demonstrate() {
console.log('=== 手动实现异步可迭代对象 ===\n');
// 创建一个异步队列
class AsyncQueue {
constructor() {
this.queue = [];
this.waiters = [];
this.closed = false;
}
// 添加数据
push(value) {
if (this.waiters.length > 0) {
const waiter = this.waiters.shift();
waiter.resolve({ value, done: false });
} else {
this.queue.push(value);
}
}
// 关闭队列
close() {
this.closed = true;
for (const waiter of this.waiters) {
waiter.resolve({ value: undefined, done: true });
}
this.waiters = [];
}
// 实现异步迭代器协议
[Symbol.asyncIterator]() {
return this;
}
async next() {
if (this.queue.length > 0) {
return { value: this.queue.shift(), done: false };
}
if (this.closed) {
return { value: undefined, done: true };
}
// 等待新数据
return new Promise((resolve) => {
this.waiters.push({ resolve });
});
}
}
// 测试
const queue = new AsyncQueue();
// 生产者
setTimeout(() => queue.push(1), 100);
setTimeout(() => queue.push(2), 200);
setTimeout(() => queue.push(3), 300);
setTimeout(() => queue.close(), 400);
// 消费者
console.log('消费队列数据:');
for await (const item of queue) {
console.log(' 收到:', item);
}
console.log(' 队列关闭\n');
}
static runAll() {
this.demonstrate();
}
}
ManualImplementation.runAll();错误处理
异步迭代中的错误处理:
javascript
// 错误处理
class ErrorHandling {
static demonstrate() {
console.log('=== 异步迭代器错误处理 ===\n');
console.log('1. try-catch包裹整个循环:');
console.log(`
try {
for await (const item of asyncIterable) {
process(item);
}
} catch (error) {
// 捕获迭代器或处理过程中的错误
console.error('迭代错误:', error);
}
`);
console.log('2. 循环内部处理:');
console.log(`
for await (const item of asyncIterable) {
try {
await process(item);
} catch (error) {
// 只捕获处理错误,继续迭代
console.error('处理错误:', error);
}
}
`);
}
static demonstrateThrowMethod() {
console.log('=== throw方法 ===\n');
console.log('异步迭代器可以实现throw方法:');
console.log(`
const iterator = {
[Symbol.asyncIterator]() {
return this;
},
async next() {
return { value: 1, done: false };
},
async throw(error) {
// 处理外部抛入的错误
console.log('收到错误:', error);
return { value: undefined, done: true };
}
};
`);
console.log('用途:');
console.log(' • 通知迭代器发生错误');
console.log(' • 允许迭代器进行清理');
console.log(' • 优雅地终止迭代\n');
}
static runAll() {
this.demonstrate();
this.demonstrateThrowMethod();
}
}
ErrorHandling.runAll();性能考量
异步迭代的性能特点:
javascript
// 性能考量
class PerformanceConsiderations {
static demonstrate() {
console.log('=== 异步迭代性能 ===\n');
console.log('开销分析:');
console.log(' • 每次迭代至少一个微任务');
console.log(' • 异步迭代器对象创建开销');
console.log(' • Promise包装和解析开销\n');
console.log('与同步迭代对比:');
console.log(`
// 同步迭代:非常快
for (const item of syncArray) {
process(item);
}
// 异步迭代:每项一个微任务
for await (const item of asyncArray) {
await process(item);
}
`);
}
static demonstrateBatching() {
console.log('=== 批量处理优化 ===\n');
console.log('问题:逐项处理太慢');
console.log(`
// 每个项目一次网络请求
for await (const item of items) {
await sendToServer(item); // 串行,慢
}
`);
console.log('优化:批量处理');
console.log(`
async function* batchItems(items, batchSize = 10) {
let batch = [];
for await (const item of items) {
batch.push(item);
if (batch.length >= batchSize) {
yield batch;
batch = [];
}
}
if (batch.length > 0) {
yield batch;
}
}
// 使用批量迭代
for await (const batch of batchItems(items)) {
await sendBatchToServer(batch); // 批量发送,快
}
`);
}
static demonstrateParallelProcessing() {
console.log('=== 并行处理 ===\n');
console.log('有限并行度:');
console.log(`
async function processWithConcurrency(asyncIterable, processor, concurrency = 3) {
const executing = new Set();
for await (const item of asyncIterable) {
const promise = processor(item).then(() => {
executing.delete(promise);
});
executing.add(promise);
if (executing.size >= concurrency) {
await Promise.race(executing);
}
}
await Promise.all(executing);
}
`);
}
static runAll() {
this.demonstrate();
this.demonstrateBatching();
this.demonstrateParallelProcessing();
}
}
PerformanceConsiderations.runAll();与其他异步模式的结合
异步迭代器与其他模式配合:
javascript
// 与其他模式结合
class CombiningPatterns {
static demonstrate() {
console.log('=== 异步迭代器与RxJS ===\n');
console.log('Observable转异步迭代器:');
console.log(`
async function* fromObservable(observable) {
const queue = [];
let resolve;
let done = false;
observable.subscribe({
next(value) {
queue.push(value);
if (resolve) {
resolve();
resolve = null;
}
},
complete() {
done = true;
if (resolve) resolve();
}
});
while (!done || queue.length > 0) {
if (queue.length === 0) {
await new Promise(r => resolve = r);
}
if (queue.length > 0) {
yield queue.shift();
}
}
}
`);
}
static demonstrateWithAbortController() {
console.log('=== 与AbortController配合 ===\n');
console.log('可取消的异步迭代:');
console.log(`
async function* cancellableIterable(signal) {
let count = 0;
while (!signal.aborted) {
await new Promise(r => setTimeout(r, 100));
yield count++;
}
}
// 使用
const controller = new AbortController();
setTimeout(() => controller.abort(), 500);
for await (const num of cancellableIterable(controller.signal)) {
console.log(num);
}
`);
}
static runAll() {
this.demonstrate();
this.demonstrateWithAbortController();
}
}
CombiningPatterns.runAll();本章小结
本章探讨了异步迭代器的实现机制和应用场景。核心要点包括:
异步迭代器协议:next()方法返回
Promise<{value, done}>,通过Symbol.asyncIterator标识可异步迭代对象。for await...of转换:V8将其转换为while循环,每次await iterator.next()的结果,支持提前退出和资源清理。
异步生成器:使用
async function*语法,结合await和yield,简化异步迭代器的创建。实际应用:适用于流数据处理、分页数据获取、WebSocket消息流等场景。
错误处理:支持try-catch捕获,迭代器可实现throw方法处理外部错误。
性能优化:注意每次迭代的微任务开销,考虑批量处理和并行度控制。
兼容性:for await...of可以遍历同步可迭代对象,V8会自动包装。
异步迭代器为处理异步数据序列提供了优雅的解决方案。下一章我们将探讨Node.js中事件循环的特殊实现。