C# 8 で導入された IAsyncEnumerable<T> は、非同期にデータを1件ずつストリーミングするための標準インターフェースです。大量データを Task<List<T>> で一括取得するとメモリを圧迫しますが、IAsyncEnumerable<T> なら1件ずつ到着した順に処理でき、DB ページネーション・API ストリーミング・SSE(Server-Sent Events)などで活躍します。
本記事では IAsyncEnumerable の内部仕組み・System.Linq.Async(Ix.NET)の全オペレーター・[EnumeratorCancellation] によるキャンセル伝搬・WhereAwait / SelectAwait の使い分け・EF Core との統合・ページネーション / SSE / バックプレッシャーの実践パターン・落とし穴まで体系的に解説します。
- IAsyncEnumerable<T> の基本 — 非同期イテレータ
- System.Linq.Async — Ix.NET の LINQ オペレーター
- System.Linq.Async の主要オペレーター
- WhereAwait / SelectAwait — 非同期セレクタ
- EF Core との統合
- 実践パターン① — API ページネーションのストリーミング化
- 実践パターン② — サーバー送信イベント(SSE)の読み取り
- 実践パターン③ — Chunk でバッチ化してバッチ処理
- 実践パターン④ — バックプレッシャーと並列処理
- ConfigureAwait(false) と IAsyncEnumerable
- 同期コレクションとの相互変換
- よくある落とし穴
- よくある質問
- まとめ
IAsyncEnumerable<T> の基本 — 非同期イテレータ
using System.Runtime.CompilerServices;
// 非同期イテレータメソッド: async + yield return + IAsyncEnumerable<T>
// CancellationToken は [EnumeratorCancellation] で受け取る
public static async IAsyncEnumerable<int> ProduceAsync(
int count,
[EnumeratorCancellation] CancellationToken ct = default)
{
for (int i = 1; i <= count; i++)
{
await Task.Delay(100, ct); // 疑似的な I/O
yield return i; // 1件ずつ戻す
}
}
// 消費側: await foreach で1件ずつ受け取る
await foreach (var n in ProduceAsync(5))
Console.WriteLine(n);
// キャンセルトークン付き
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(1));
try
{
await foreach (var n in ProduceAsync(100).WithCancellation(cts.Token))
Console.WriteLine(n);
}
catch (OperationCanceledException) { Console.WriteLine("中止"); }
IAsyncEnumerable を返すメソッドで CancellationToken を受け取る場合、パラメーターに [EnumeratorCancellation] 属性を付けないと呼び出し側の .WithCancellation(token) が伝わりません。この属性を付けると、await foreach (var x in source.WithCancellation(ct)) で渡したトークンがメソッド内の ct パラメーターに自動連結されます。付け忘れると処理がキャンセル不能になるので、常に付ける習慣をつけてください。System.Linq.Async — Ix.NET の LINQ オペレーター
C# 8 標準の IAsyncEnumerable は await foreach で列挙できるだけで、Where・Select・ToListAsync 等の LINQ メソッドは含まれていません。System.Linq.Async パッケージ(dotnet/reactive の Ix.NET プロジェクト)を追加するとLINQ オペレーター相当のメソッドが使えるようになります。
// dotnet add package System.Linq.Async
using System.Linq; // IEnumerable に対する同期 LINQ
// System.Linq.Async は using 不要(拡張メソッドが自動で見える)
// フィルタ + 射影 + 収集 を非同期にパイプライン化
List<int> evens = await ProduceAsync(100)
.Where(n => n % 2 == 0) // IAsyncEnumerable → IAsyncEnumerable
.Select(n => n * 10)
.Take(5)
.ToListAsync();
Console.WriteLine(string.Join(",", evens)); // 20,40,60,80,100
// LINQ-to-Entities の ToListAsync と名前が同じなので注意:
// - EF Core: using Microsoft.EntityFrameworkCore; が必要
// - System.Linq.Async: パッケージ追加だけで OK
// 両方あるプロジェクトではオーバーロード解決で衝突することがある
System.Linq.Async の主要オペレーター
| カテゴリ | メソッド例 |
|---|---|
| フィルタリング | Where, WhereAwait, Skip, SkipWhile, Take, TakeWhile, Distinct |
| 射影 | Select, SelectAwait, SelectMany, SelectManyAwait |
| 並び替え | OrderBy, OrderByDescending, ThenBy, Reverse |
| グルーピング | GroupBy, ToLookupAsync |
| 結合 | Join, GroupJoin, Zip, Concat |
| 集約 | CountAsync, SumAsync, AverageAsync, MinAsync, MaxAsync, AggregateAsync |
| 要素取得 | FirstAsync, FirstOrDefaultAsync, SingleAsync, ElementAtAsync, LastAsync |
| 条件判定 | AnyAsync, AllAsync, ContainsAsync |
| 収集 | ToListAsync, ToArrayAsync, ToDictionaryAsync, ToHashSetAsync |
| その他 | Chunk, WithCancellation, ToAsyncEnumerable |
// フィルタ + 変換 + 収集
List<string> names = await users
.Where(u => u.IsActive)
.Select(u => u.Name)
.ToListAsync();
// 集約
int count = await users.CountAsync(u => u.IsAdmin);
decimal total = await orders.SumAsync(o => o.Amount);
bool anyActive = await users.AnyAsync(u => u.IsActive);
// 要素取得
User? admin = await users.FirstOrDefaultAsync(u => u.IsAdmin);
User single = await users.SingleAsync(u => u.Id == 42);
// 並び替え → ページング
var page = await users
.OrderBy(u => u.Id)
.Skip(20)
.Take(10)
.ToListAsync();
// グルーピング
var groups = users
.GroupBy(u => u.Department);
await foreach (var g in groups)
{
Console.WriteLine($"{g.Key}: {await g.CountAsync()}");
}
WhereAwait / SelectAwait — 非同期セレクタ
// 通常の Where / Select は同期ラムダしか受け付けない
// 各要素に対して非同期処理をしたい場合は WhereAwait / SelectAwait
// ① SelectAwait: 各要素を非同期変換
// 例: 各 userId からユーザー詳細を非同期取得
IAsyncEnumerable<UserDetail> details = userIds
.SelectAwait(async id => await _userService.FetchAsync(id));
// ② WhereAwait: 各要素を非同期フィルタ
// 例: 各 URL で HEAD リクエストして生存確認
IAsyncEnumerable<string> aliveUrls = urls
.WhereAwait(async url => await IsAliveAsync(url));
// ③ キャンセル対応版 (WithCancellation)
IAsyncEnumerable<UserDetail> detailsCancellable = userIds
.SelectAwaitWithCancellation(async (id, ct) =>
await _userService.FetchAsync(id, ct));
// 組み合わせて非同期パイプラインを構築
var activeUserNames = await userIds
.SelectAwaitWithCancellation((id, ct) => _userService.FetchAsync(id, ct))
.WhereAwait(async u => await _authService.IsActiveAsync(u))
.Select(u => u.Name)
.ToListAsync(cancellationToken: cts.Token);
SelectAwait の非同期ラムダは1要素ずつ順番に awaitされます。並列で実行したい場合は Parallel.ForEachAsync やTask.WhenAll(ids.Select(...)) を別途組み合わせる必要があります。「ストリームを逐次処理したい」なら SelectAwait、「ストリームの全要素を並列処理したい」なら Parallel.ForEachAsync、という使い分けが重要です。EF Core との統合
using Microsoft.EntityFrameworkCore;
// EF Core は独自の ToListAsync / FirstOrDefaultAsync / AnyAsync 等を提供
// これらは IQueryable 上で動作し、DB に送信される SQL を非同期に発行する
var activeUsers = await dbContext.Users
.Where(u => u.IsActive)
.OrderBy(u => u.Name)
.ToListAsync();
User? admin = await dbContext.Users
.FirstOrDefaultAsync(u => u.IsAdmin);
int count = await dbContext.Users.CountAsync();
// IAsyncEnumerable としてストリーミング取得
// 大量データを全部メモリに載せずに1件ずつ処理
await foreach (var user in dbContext.Users.AsAsyncEnumerable())
{
await ProcessUserAsync(user); // 1件ずつ処理
// メモリフットプリント極小
}
// System.Linq.Async との関係:
// EF Core の Extension(Microsoft.EntityFrameworkCore)は IQueryable を対象
// System.Linq.Async は IAsyncEnumerable を対象
// AsAsyncEnumerable() で変換後は System.Linq.Async のオペレーターが使える
プロジェクトで
Microsoft.EntityFrameworkCore と System.Linq.Async の両方を参照していると、ToListAsync の呼び出しがどちらの拡張メソッドか曖昧になることがあります。特に IQueryable<T> に対して呼ぶのか、IAsyncEnumerable<T> に対して呼ぶのかで意味が変わります。AsAsyncEnumerable() で明示的に変換してから呼ぶか、完全修飾名を使うと曖昧さを避けられます。実践パターン① — API ページネーションのストリーミング化
// REST API は通常「1ページ100件 × ページ番号」で分割して取得する
// これを1つの IAsyncEnumerable にまとめると、利用側はページを意識しなくてよい
public static async IAsyncEnumerable<User> GetAllUsersAsync(
HttpClient http,
[EnumeratorCancellation] CancellationToken ct = default)
{
int page = 1;
while (true)
{
var response = await http.GetFromJsonAsync<UserPage>(
$"/api/users?page={page}&size=100", ct);
if (response is null || response.Items.Count == 0) yield break;
foreach (var user in response.Items)
yield return user;
if (!response.HasMore) yield break;
page++;
}
}
// 使用側は「全ユーザー」として使える(裏側でページが進んでいく)
await foreach (var user in GetAllUsersAsync(http, ct))
{
await ProcessUserAsync(user);
}
// Take で先頭 N 件だけ処理も可能(余分な API 呼び出しを自動停止)
var first50 = await GetAllUsersAsync(http, ct)
.Take(50)
.ToListAsync();
// → 50 件超過時点で自動で列挙を止めるので API も呼ばれない(遅延実行の利点)
実践パターン② — サーバー送信イベント(SSE)の読み取り
public static async IAsyncEnumerable<string> ReadSseAsync(
HttpClient http,
string url,
[EnumeratorCancellation] CancellationToken ct = default)
{
using var response = await http.GetAsync(url,
HttpCompletionOption.ResponseHeadersRead, ct);
response.EnsureSuccessStatusCode();
using var stream = await response.Content.ReadAsStreamAsync(ct);
using var reader = new StreamReader(stream);
while (!reader.EndOfStream && !ct.IsCancellationRequested)
{
// .NET 7+ は ReadLineAsync(ct)。それ以前は ReadLineAsync() + 外側で ct チェック
string? line = await reader.ReadLineAsync(ct);
if (line is null) yield break;
if (!line.StartsWith("data: ")) continue;
yield return line["data: ".Length..];
}
}
// 使用側: 届いたイベントを即座に処理
await foreach (var payload in ReadSseAsync(http, "/api/events", ct))
{
Console.WriteLine($"受信: {payload}");
await HandleEventAsync(payload);
}
実践パターン③ — Chunk でバッチ化してバッチ処理
// Chunk: IAsyncEnumerable を N 要素ごとの配列に分割(System.Linq.Async)
await foreach (User[] batch in GetAllUsersAsync(http, ct).Chunk(100))
{
// 100 件ずつ DB に一括挿入
await dbContext.Users.AddRangeAsync(batch, ct);
await dbContext.SaveChangesAsync(ct);
}
// 同期 LINQ(IEnumerable<T>)にも .NET 6+ から Chunk が入っている
foreach (var batch in someList.Chunk(100))
Process(batch);
// 時間ベースのバッチ化(5秒ごと or 100件たまったらバッチ送信)は
// System.Reactive(Rx)の Buffer(TimeSpan, count) 相当。
// IAsyncEnumerable では ToObservable() で Rx に橋渡しするか、
// 自前で Channel + Timer を組み合わせて実装するのが一般的
実践パターン④ — バックプレッシャーと並列処理
// IAsyncEnumerable は本質的に「逐次」ストリーム
// 各要素を「並列」で処理したい場合は Parallel.ForEachAsync を使う
await Parallel.ForEachAsync(
source: GetAllUsersAsync(http, ct),
parallelOptions: new ParallelOptions
{
MaxDegreeOfParallelism = 10, // 10 並列
CancellationToken = ct,
},
body: async (user, token) =>
{
await ProcessUserAsync(user, token); // 並列に処理
});
// Channel 経由での「生産者 × 複数消費者」パターン
using System.Threading.Channels;
var channel = Channel.CreateBounded<User>(new BoundedChannelOptions(1000)
{
FullMode = BoundedChannelFullMode.Wait, // 満杯なら生産者を待たせる
});
// 生産側
var producer = Task.Run(async () =>
{
await foreach (var user in GetAllUsersAsync(http, ct))
await channel.Writer.WriteAsync(user, ct);
channel.Writer.Complete();
}, ct);
// 消費側(複数並列)
var consumers = Enumerable.Range(0, 4).Select(_ => Task.Run(async () =>
{
await foreach (var user in channel.Reader.ReadAllAsync(ct))
await ProcessUserAsync(user, ct);
}, ct)).ToArray();
await Task.WhenAll(new[] { producer }.Concat(consumers));
ConfigureAwait(false) と IAsyncEnumerable
// IAsyncEnumerable にも ConfigureAwait(false) が適用できる
// ライブラリコードでは SynchronizationContext キャプチャを避けるため必須
await foreach (var item in GetItemsAsync()
.ConfigureAwait(false)
.WithCancellation(ct))
{
Process(item);
}
// IAsyncEnumerable の拡張メソッドで ConfigureAwait と WithCancellation を組み合わせる
// 順序はどちらでも同じ結果
// 非同期イテレータの実装側でも ConfigureAwait を使える
public static async IAsyncEnumerable<int> StreamAsync(
[EnumeratorCancellation] CancellationToken ct = default)
{
for (int i = 0; i < 100; i++)
{
await Task.Delay(10, ct).ConfigureAwait(false);
yield return i;
}
}
同期コレクションとの相互変換
// IEnumerable<T> → IAsyncEnumerable<T>
IEnumerable<int> syncData = Enumerable.Range(1, 100);
IAsyncEnumerable<int> asyncData = syncData.ToAsyncEnumerable();
// IAsyncEnumerable<T> → List<T>(終端で await)
List<int> result = await asyncData.ToListAsync();
// IAsyncEnumerable<T> → IEnumerable<T>(終端で列挙)
// ToListAsync().Result は NG(デッドロックの危険)
// 代わりに await foreach で取得するか、ToListAsync の戻りを使う
// よくある誤用: Task<IEnumerable<T>> と IAsyncEnumerable<T> の混同
Task<List<int>> taskResult1 = asyncData.ToListAsync().AsTask(); // Task に包む
// Task<IAsyncEnumerable<T>> は 「Task 解決後に IAsyncEnumerable が返る」という意味
// 普通は IAsyncEnumerable<T> を直接返す(値は await できる)
async Task<IAsyncEnumerable<int>> GetStreamBadAsync()
{
await Task.Delay(100);
return StreamAsync(); // Task を返してから IAsyncEnumerable → 2段階 await
}
// → 使用側が「await 後に await foreach」と2段階必要で使いづらい
// 推奨: IAsyncEnumerable をそのまま返す
async IAsyncEnumerable<int> GetStreamGoodAsync(
[EnumeratorCancellation] CancellationToken ct = default)
{
await Task.Delay(100, ct);
await foreach (var x in StreamAsync().WithCancellation(ct))
yield return x;
}
よくある落とし穴
// NG: [EnumeratorCancellation] なし
public static async IAsyncEnumerable<int> BadAsync(CancellationToken ct)
{
for (int i = 0; i < 1000; i++)
{
await Task.Delay(100, ct);
yield return i;
}
}
// 呼び出し側で WithCancellation(token) を渡しても
// ↑ のメソッドの ct パラメーターには届かない(新しい token が作られる)
// OK: [EnumeratorCancellation] を付ける
public static async IAsyncEnumerable<int> GoodAsync(
[EnumeratorCancellation] CancellationToken ct = default)
{
for (int i = 0; i < 1000; i++)
{
await Task.Delay(100, ct);
yield return i;
}
}
// 使用側
await foreach (var n in GoodAsync().WithCancellation(cts.Token))
Console.WriteLine(n);
// → cts.Cancel() で GoodAsync 内の ct が中止される
IAsyncEnumerable<int> source = GetExpensiveStreamAsync();
// 1回目の列挙
int count = await source.CountAsync();
// 2回目の列挙: 同じストリームを再度走査 → 処理が2回実行される!
List<int> list = await source.ToListAsync();
// 対策: 先に実体化してから複数回使う
List<int> cached = await GetExpensiveStreamAsync().ToListAsync();
int count2 = cached.Count;
var first = cached.First();
// 注: ただしメモリ使用量が増えるので、大量データでは実体化せずに
// 1パスで必要な情報を全部集めるパターンを優先する
var (count3, sum, max) = await GetExpensiveStreamAsync().AggregateAsync(
seed: (Count: 0, Sum: 0, Max: int.MinValue),
accumulator: (acc, x) => (acc.Count + 1, acc.Sum + x, Math.Max(acc.Max, x)));
// NG: 同期的に待つと UI スレッドや ASP.NET Classic でデッドロック
List<int> result = stream.ToListAsync().Result;
stream.ToListAsync().Wait();
// OK: 常に await する
List<int> result2 = await stream.ToListAsync();
// 同期コンテキストから逃げたい場合(やむを得ないとき)
List<int> result3 = Task.Run(async () => await stream.ToListAsync())
.GetAwaiter().GetResult();
// → Task.Run でスレッドプールに逃がすとデッドロックしない
// が、原則 async all the way が正解
// System.Linq.Async → IAsyncEnumerable 用(Ix.NET プロジェクト)
// System.Reactive → IObservable 用(Rx.NET プロジェクト、同じ dotnet/reactive 下)
// IAsyncEnumerable: プル型(消費側が要求すると生産される)
await foreach (var x in streamA) // 消費側が await foreach するたびに進む
Process(x);
// IObservable: プッシュ型(生産側が好きなタイミングで流し込む)
streamB.Subscribe(x => Process(x)); // 購読側は待たずに受け取る
// 使い分け:
// - バックプレッシャー / キャンセル / 1つずつ処理 → IAsyncEnumerable
// - UIイベント / リアルタイム配信 / 変換パイプライン → IObservable
// - 両者の相互変換は ToObservable() / ToAsyncEnumerable() で可能
よくある質問
Task<IEnumerable<T>> は「非同期に全件準備完了してから列挙」で、IAsyncEnumerable<T> は「1件ずつ非同期に到着」します。100万件のデータを扱う場合、前者は全件メモリに載せてから処理するのに対し、後者は1件ごとに処理できるためメモリフットプリントが極小になります。DB のページネーション・大量ファイル処理・リアルタイムストリームでは IAsyncEnumerable が適切です。ToListAsync は IQueryable<T> に対するもので、SQL を組み立てて DB に非同期クエリを発行します。System.Linq.Async の ToListAsync は IAsyncEnumerable<T> に対するもので、既存の非同期ストリームを List に集約します。同じ名前で型が違うため、プロジェクトで両方を参照しているとオーバーロード解決が曖昧になることがあります。EF Core の結果を AsAsyncEnumerable() で変換してから System.Linq.Async を使うと責務が分離しやすくなります。SelectAwait は1要素を await してから次の要素に進みます。並列化したい場合は Parallel.ForEachAsync(並列消費)や Task.WhenAll(ids.Select(FetchAsync))(全件並列)を使ってください。IAsyncEnumerable はもともと「非同期にストリーミング」する型で、「非同期ラムダを並列で実行する」目的ではありません。CancellationToken が発火すると、現在待機中の await(await Task.Delay 等)が OperationCanceledException を投げ、await foreach の外に例外が伝搬します。ただし [EnumeratorCancellation] 属性がないと、呼び出し側の .WithCancellation(token) がメソッド内に伝わりません。公開する非同期イテレータでは必ず [EnumeratorCancellation] CancellationToken ct = default を引数に追加してください。AsAsyncEnumerable() が第一選択です。ToListAsync() だと全件メモリに載せてしまうため、100 万件などの大量データではメモリ不足を起こします。AsAsyncEnumerable() + await foreach で1件ずつ処理すれば、メモリフットプリントは定数になります。SQL Server の AsNoTracking() と組み合わせると変更追跡コストも削減できます。まとめ
| 項目 | ポイント |
|---|---|
| 基本 | async IAsyncEnumerable<T> + yield return + await |
| キャンセル | [EnumeratorCancellation] 属性必須 |
| ライブラリ | System.Linq.Async(dotnet/reactive の Ix.NET) |
| フィルタ | Where(同期)/ WhereAwait(非同期ラムダ) |
| 射影 | Select / SelectAwait / SelectAwaitWithCancellation |
| 収集 | ToListAsync / ToArrayAsync / ToDictionaryAsync |
| 集約 | CountAsync / SumAsync / FirstOrDefaultAsync |
| EF Core | AsAsyncEnumerable() で大量データをストリーミング |
| ページネーション | yield return で「透過的な全件列挙」に |
| 並列処理 | IAsyncEnumerable は逐次。Parallel.ForEachAsync で並列 |
| バックプレッシャー | Channel<T> + BoundedChannelFullMode.Wait |
| Rx との違い | プル型(IAsyncEnumerable)vs プッシュ型(IObservable) |
| 落とし穴 | 2回列挙で処理2回・.Result でデッドロック・ConfigureAwait 忘れ |
関連する非同期処理の詳細は以下を参照してください。async/await 完全ガイドでステートマシン・ConfigureAwait・ValueTask、Task<T> 完全ガイドで Task との違い、非同期と並列の違い完全ガイドで Parallel.ForEachAsync、CancellationToken 完全ガイドでキャンセル伝搬、LINQ 完全ガイドで同期 LINQ の基本を解説しています。

