Skip to content

异步迭代器:for await...of 的实现

当你需要遍历一个异步数据源时,普通的for...of无法处理Promise序列。ES2018引入了异步迭代器(Async Iterator)和for await...of语法,让异步遍历变得简洁优雅。V8如何实现这个特性?它与同步迭代器有什么区别?本章将揭示异步迭代的底层机制。

从同步迭代器说起

理解异步迭代器,需要先回顾同步迭代器:

javascript
// 同步迭代器回顾
class SyncIteratorReview {
  static demonstrate() {
    console.log('=== 同步迭代器协议 ===\n');
    
    console.log('迭代器协议要求:');
    console.log('  • 实现next()方法');
    console.log('  • 返回{value, done}对象');
    console.log('  • done为true表示迭代结束\n');
    
    console.log('可迭代协议要求:');
    console.log('  • 实现[Symbol.iterator]()方法');
    console.log('  • 返回一个迭代器对象\n');
  }
  
  static demonstrateExample() {
    console.log('=== 同步迭代器示例 ===\n');
    
    // 自定义可迭代对象
    const range = {
      from: 1,
      to: 3,
      
      [Symbol.iterator]() {
        let current = this.from;
        const last = this.to;
        
        return {
          next() {
            if (current <= last) {
              return { value: current++, done: false };
            }
            return { value: undefined, done: true };
          }
        };
      }
    };
    
    console.log('使用for...of遍历:');
    for (const num of range) {
      console.log('  ', num);
    }
    console.log('');
  }
  
  static runAll() {
    this.demonstrate();
    this.demonstrateExample();
  }
}

SyncIteratorReview.runAll();

异步迭代器协议

异步迭代器的定义:

javascript
// 异步迭代器协议
class AsyncIteratorProtocol {
  static demonstrate() {
    console.log('=== 异步迭代器协议 ===\n');
    
    console.log('异步迭代器协议:');
    console.log('  • 实现next()方法');
    console.log('  • 返回Promise<{value, done}>');
    console.log('  • Promise解析为{value, done}对象\n');
    
    console.log('异步可迭代协议:');
    console.log('  • 实现[Symbol.asyncIterator]()方法');
    console.log('  • 返回一个异步迭代器对象\n');
    
    console.log('对比:');
    console.log('');
    console.log('  同步迭代器:');
    console.log('    next() → {value, done}');
    console.log('');
    console.log('  异步迭代器:');
    console.log('    next() → Promise<{value, done}>');
    console.log('');
  }
  
  static async demonstrateExample() {
    console.log('=== 异步迭代器示例 ===\n');
    
    // 自定义异步可迭代对象
    const asyncRange = {
      from: 1,
      to: 3,
      
      [Symbol.asyncIterator]() {
        let current = this.from;
        const last = this.to;
        
        return {
          async next() {
            // 模拟异步操作
            await new Promise(r => setTimeout(r, 100));
            
            if (current <= last) {
              return { value: current++, done: false };
            }
            return { value: undefined, done: true };
          }
        };
      }
    };
    
    console.log('使用for await...of遍历:');
    for await (const num of asyncRange) {
      console.log('  ', num);
    }
    console.log('');
  }
  
  static async runAll() {
    this.demonstrate();
    await this.demonstrateExample();
  }
}

AsyncIteratorProtocol.runAll();

for await...of的转换

V8如何处理for await...of:

javascript
// for await...of转换
class ForAwaitOfTransformation {
  static demonstrate() {
    console.log('=== for await...of的转换 ===\n');
    
    console.log('原始代码:');
    console.log(`
    async function consume(asyncIterable) {
      for await (const item of asyncIterable) {
        console.log(item);
      }
    }
    `);
    
    console.log('转换后的等效代码:');
    console.log(`
    async function consume(asyncIterable) {
      // 获取异步迭代器
      const iterator = asyncIterable[Symbol.asyncIterator]();
      
      try {
        while (true) {
          // 调用next()并等待Promise
          const result = await iterator.next();
          
          if (result.done) {
            break;
          }
          
          const item = result.value;
          console.log(item);
        }
      } finally {
        // 如果有return方法,调用它进行清理
        if (iterator.return) {
          await iterator.return();
        }
      }
    }
    `);
  }
  
  static demonstrateEarlyExit() {
    console.log('=== 提前退出的处理 ===\n');
    
    console.log('当使用break或throw退出时:');
    console.log(`
    for await (const item of asyncIterable) {
      if (someCondition) {
        break;  // 会调用iterator.return()
      }
      
      if (errorCondition) {
        throw new Error();  // 也会调用iterator.return()
      }
    }
    `);
    
    console.log('return()方法的作用:');
    console.log('  • 释放资源(如关闭文件、断开连接)');
    console.log('  • 通知生产者停止生成数据');
    console.log('  • 清理迭代器内部状态\n');
  }
  
  static runAll() {
    this.demonstrate();
    this.demonstrateEarlyExit();
  }
}

ForAwaitOfTransformation.runAll();

V8中的实现

V8如何处理异步迭代:

javascript
// V8实现
class V8AsyncIteratorImplementation {
  static demonstrate() {
    console.log('=== V8异步迭代器处理 ===\n');
    
    console.log('for await...of在字节码层面:');
    console.log(`
    // 获取迭代器
    GetIterator <asyncIterable>, ASYNC
    
    // 循环开始
    loop:
      // 调用next()
      Call iterator.next()
      
      // 等待Promise
      Await
      
      // 检查done
      JumpIfTrue result.done, end
      
      // 获取value
      GetProperty result, "value"
      
      // 执行循环体
      ...
      
      // 继续循环
      Jump loop
      
    end:
      // 清理
      CallIfPresent iterator.return()
    `);
  }
  
  static demonstrateAsyncFromSyncIterator() {
    console.log('=== AsyncFromSyncIterator ===\n');
    
    console.log('同步迭代器在for await中的处理:');
    console.log(`
    const syncArray = [1, 2, 3];
    
    // 可以用for await遍历同步可迭代对象
    for await (const item of syncArray) {
      console.log(item);
    }
    `);
    
    console.log('V8内部处理:');
    console.log('  • 检测到同步迭代器');
    console.log('  • 创建AsyncFromSyncIterator包装器');
    console.log('  • 包装器的next()返回Promise');
    console.log('  • 兼容同步和异步迭代\n');
    
    console.log('AsyncFromSyncIterator伪代码:');
    console.log(`
    class AsyncFromSyncIterator {
      constructor(syncIterator) {
        this.syncIterator = syncIterator;
      }
      
      async next() {
        const result = this.syncIterator.next();
        return {
          value: await result.value,
          done: result.done
        };
      }
    }
    `);
  }
  
  static runAll() {
    this.demonstrate();
    this.demonstrateAsyncFromSyncIterator();
  }
}

V8AsyncIteratorImplementation.runAll();

异步生成器

创建异步迭代器的简便方式:

javascript
// 异步生成器
class AsyncGenerators {
  static demonstrate() {
    console.log('=== 异步生成器语法 ===\n');
    
    console.log('异步生成器结合了:');
    console.log('  • async函数的await能力');
    console.log('  • 生成器的yield能力');
    console.log('  • 自动实现异步迭代器协议\n');
    
    console.log('语法:');
    console.log(`
    async function* asyncGenerator() {
      yield await asyncOperation1();
      yield await asyncOperation2();
      yield await asyncOperation3();
    }
    `);
  }
  
  static async demonstrateExample() {
    console.log('=== 异步生成器示例 ===\n');
    
    // 定义异步生成器
    async function* fetchPages(urls) {
      for (const url of urls) {
        // 模拟fetch
        await new Promise(r => setTimeout(r, 100));
        yield { url, data: `Data from ${url}` };
      }
    }
    
    const urls = ['/page1', '/page2', '/page3'];
    
    console.log('遍历异步生成器:');
    for await (const page of fetchPages(urls)) {
      console.log('  获取:', page.url);
    }
    console.log('');
  }
  
  static demonstrateYieldAndAwait() {
    console.log('=== yield和await的配合 ===\n');
    
    console.log('三种yield模式:');
    console.log(`
    async function* example() {
      // 1. yield普通值
      yield 1;
      
      // 2. yield Promise(自动await)
      yield Promise.resolve(2);
      
      // 3. await后yield
      const data = await fetchData();
      yield data;
      
      // 4. yield* 委托给另一个异步迭代器
      yield* anotherAsyncIterable;
    }
    `);
  }
  
  static async runAll() {
    this.demonstrate();
    await this.demonstrateExample();
    this.demonstrateYieldAndAwait();
  }
}

AsyncGenerators.runAll();

实际应用场景

异步迭代器的典型用途:

javascript
// 实际应用
class RealWorldApplications {
  static demonstrateStreamProcessing() {
    console.log('=== 流数据处理 ===\n');
    
    console.log('示例:处理大文件');
    console.log(`
    async function* readLargeFile(filePath) {
      const stream = fs.createReadStream(filePath);
      
      for await (const chunk of stream) {
        yield chunk;
      }
    }
    
    async function processFile() {
      for await (const chunk of readLargeFile('large.txt')) {
        await processChunk(chunk);
      }
    }
    `);
    
    console.log('优势:');
    console.log('  • 内存友好,不需要一次加载全部');
    console.log('  • 可以处理无限数据流');
    console.log('  • 背压控制(等待处理完再读取下一块)\n');
  }
  
  static demonstratePagination() {
    console.log('=== 分页数据获取 ===\n');
    
    console.log('示例:API分页');
    console.log(`
    async function* fetchAllPages(baseUrl) {
      let page = 1;
      let hasMore = true;
      
      while (hasMore) {
        const response = await fetch(baseUrl + '?page=' + page);
        const data = await response.json();
        
        yield* data.items;  // 展开本页所有项
        
        hasMore = data.hasMore;
        page++;
      }
    }
    
    // 使用
    for await (const item of fetchAllPages('/api/items')) {
      console.log(item);
      // 所有分页数据像单个列表一样处理
    }
    `);
  }
  
  static demonstrateWebSocket() {
    console.log('=== WebSocket消息流 ===\n');
    
    console.log('示例:WebSocket消息迭代');
    console.log(`
    async function* websocketMessages(url) {
      const ws = new WebSocket(url);
      const messages = [];
      let resolve;
      
      ws.onmessage = (event) => {
        messages.push(event.data);
        if (resolve) {
          resolve();
          resolve = null;
        }
      };
      
      try {
        while (ws.readyState === WebSocket.OPEN) {
          if (messages.length === 0) {
            await new Promise(r => resolve = r);
          }
          yield messages.shift();
        }
      } finally {
        ws.close();
      }
    }
    
    // 使用
    for await (const message of websocketMessages('ws://...')) {
      handleMessage(message);
    }
    `);
  }
  
  static runAll() {
    this.demonstrateStreamProcessing();
    this.demonstratePagination();
    this.demonstrateWebSocket();
  }
}

RealWorldApplications.runAll();

手动实现异步迭代器

深入理解内部机制:

javascript
// 手动实现
class ManualImplementation {
  static async demonstrate() {
    console.log('=== 手动实现异步可迭代对象 ===\n');
    
    // 创建一个异步队列
    class AsyncQueue {
      constructor() {
        this.queue = [];
        this.waiters = [];
        this.closed = false;
      }
      
      // 添加数据
      push(value) {
        if (this.waiters.length > 0) {
          const waiter = this.waiters.shift();
          waiter.resolve({ value, done: false });
        } else {
          this.queue.push(value);
        }
      }
      
      // 关闭队列
      close() {
        this.closed = true;
        for (const waiter of this.waiters) {
          waiter.resolve({ value: undefined, done: true });
        }
        this.waiters = [];
      }
      
      // 实现异步迭代器协议
      [Symbol.asyncIterator]() {
        return this;
      }
      
      async next() {
        if (this.queue.length > 0) {
          return { value: this.queue.shift(), done: false };
        }
        
        if (this.closed) {
          return { value: undefined, done: true };
        }
        
        // 等待新数据
        return new Promise((resolve) => {
          this.waiters.push({ resolve });
        });
      }
    }
    
    // 测试
    const queue = new AsyncQueue();
    
    // 生产者
    setTimeout(() => queue.push(1), 100);
    setTimeout(() => queue.push(2), 200);
    setTimeout(() => queue.push(3), 300);
    setTimeout(() => queue.close(), 400);
    
    // 消费者
    console.log('消费队列数据:');
    for await (const item of queue) {
      console.log('  收到:', item);
    }
    console.log('  队列关闭\n');
  }
  
  static runAll() {
    this.demonstrate();
  }
}

ManualImplementation.runAll();

错误处理

异步迭代中的错误处理:

javascript
// 错误处理
class ErrorHandling {
  static demonstrate() {
    console.log('=== 异步迭代器错误处理 ===\n');
    
    console.log('1. try-catch包裹整个循环:');
    console.log(`
    try {
      for await (const item of asyncIterable) {
        process(item);
      }
    } catch (error) {
      // 捕获迭代器或处理过程中的错误
      console.error('迭代错误:', error);
    }
    `);
    
    console.log('2. 循环内部处理:');
    console.log(`
    for await (const item of asyncIterable) {
      try {
        await process(item);
      } catch (error) {
        // 只捕获处理错误,继续迭代
        console.error('处理错误:', error);
      }
    }
    `);
  }
  
  static demonstrateThrowMethod() {
    console.log('=== throw方法 ===\n');
    
    console.log('异步迭代器可以实现throw方法:');
    console.log(`
    const iterator = {
      [Symbol.asyncIterator]() {
        return this;
      },
      
      async next() {
        return { value: 1, done: false };
      },
      
      async throw(error) {
        // 处理外部抛入的错误
        console.log('收到错误:', error);
        return { value: undefined, done: true };
      }
    };
    `);
    
    console.log('用途:');
    console.log('  • 通知迭代器发生错误');
    console.log('  • 允许迭代器进行清理');
    console.log('  • 优雅地终止迭代\n');
  }
  
  static runAll() {
    this.demonstrate();
    this.demonstrateThrowMethod();
  }
}

ErrorHandling.runAll();

性能考量

异步迭代的性能特点:

javascript
// 性能考量
class PerformanceConsiderations {
  static demonstrate() {
    console.log('=== 异步迭代性能 ===\n');
    
    console.log('开销分析:');
    console.log('  • 每次迭代至少一个微任务');
    console.log('  • 异步迭代器对象创建开销');
    console.log('  • Promise包装和解析开销\n');
    
    console.log('与同步迭代对比:');
    console.log(`
    // 同步迭代:非常快
    for (const item of syncArray) {
      process(item);
    }
    
    // 异步迭代:每项一个微任务
    for await (const item of asyncArray) {
      await process(item);
    }
    `);
  }
  
  static demonstrateBatching() {
    console.log('=== 批量处理优化 ===\n');
    
    console.log('问题:逐项处理太慢');
    console.log(`
    // 每个项目一次网络请求
    for await (const item of items) {
      await sendToServer(item);  // 串行,慢
    }
    `);
    
    console.log('优化:批量处理');
    console.log(`
    async function* batchItems(items, batchSize = 10) {
      let batch = [];
      
      for await (const item of items) {
        batch.push(item);
        
        if (batch.length >= batchSize) {
          yield batch;
          batch = [];
        }
      }
      
      if (batch.length > 0) {
        yield batch;
      }
    }
    
    // 使用批量迭代
    for await (const batch of batchItems(items)) {
      await sendBatchToServer(batch);  // 批量发送,快
    }
    `);
  }
  
  static demonstrateParallelProcessing() {
    console.log('=== 并行处理 ===\n');
    
    console.log('有限并行度:');
    console.log(`
    async function processWithConcurrency(asyncIterable, processor, concurrency = 3) {
      const executing = new Set();
      
      for await (const item of asyncIterable) {
        const promise = processor(item).then(() => {
          executing.delete(promise);
        });
        executing.add(promise);
        
        if (executing.size >= concurrency) {
          await Promise.race(executing);
        }
      }
      
      await Promise.all(executing);
    }
    `);
  }
  
  static runAll() {
    this.demonstrate();
    this.demonstrateBatching();
    this.demonstrateParallelProcessing();
  }
}

PerformanceConsiderations.runAll();

与其他异步模式的结合

异步迭代器与其他模式配合:

javascript
// 与其他模式结合
class CombiningPatterns {
  static demonstrate() {
    console.log('=== 异步迭代器与RxJS ===\n');
    
    console.log('Observable转异步迭代器:');
    console.log(`
    async function* fromObservable(observable) {
      const queue = [];
      let resolve;
      let done = false;
      
      observable.subscribe({
        next(value) {
          queue.push(value);
          if (resolve) {
            resolve();
            resolve = null;
          }
        },
        complete() {
          done = true;
          if (resolve) resolve();
        }
      });
      
      while (!done || queue.length > 0) {
        if (queue.length === 0) {
          await new Promise(r => resolve = r);
        }
        if (queue.length > 0) {
          yield queue.shift();
        }
      }
    }
    `);
  }
  
  static demonstrateWithAbortController() {
    console.log('=== 与AbortController配合 ===\n');
    
    console.log('可取消的异步迭代:');
    console.log(`
    async function* cancellableIterable(signal) {
      let count = 0;
      
      while (!signal.aborted) {
        await new Promise(r => setTimeout(r, 100));
        yield count++;
      }
    }
    
    // 使用
    const controller = new AbortController();
    
    setTimeout(() => controller.abort(), 500);
    
    for await (const num of cancellableIterable(controller.signal)) {
      console.log(num);
    }
    `);
  }
  
  static runAll() {
    this.demonstrate();
    this.demonstrateWithAbortController();
  }
}

CombiningPatterns.runAll();

本章小结

本章探讨了异步迭代器的实现机制和应用场景。核心要点包括:

  1. 异步迭代器协议:next()方法返回Promise<{value, done}>,通过Symbol.asyncIterator标识可异步迭代对象。

  2. for await...of转换:V8将其转换为while循环,每次await iterator.next()的结果,支持提前退出和资源清理。

  3. 异步生成器:使用async function*语法,结合await和yield,简化异步迭代器的创建。

  4. 实际应用:适用于流数据处理、分页数据获取、WebSocket消息流等场景。

  5. 错误处理:支持try-catch捕获,迭代器可实现throw方法处理外部错误。

  6. 性能优化:注意每次迭代的微任务开销,考虑批量处理和并行度控制。

  7. 兼容性:for await...of可以遍历同步可迭代对象,V8会自动包装。

异步迭代器为处理异步数据序列提供了优雅的解决方案。下一章我们将探讨Node.js中事件循环的特殊实现。

异步迭代器:for await...of 的实现 has loaded