【C#】非同期LINQ完全ガイド|IAsyncEnumerable・System.Linq.Async・EnumeratorCancellation・EF Core連携まで

【C#】非同期LINQ(System.Linq.Async)の活用方法 C#

C# 8 で導入された IAsyncEnumerable<T> は、非同期にデータを1件ずつストリーミングするための標準インターフェースです。大量データを Task<List<T>> で一括取得するとメモリを圧迫しますが、IAsyncEnumerable<T> なら1件ずつ到着した順に処理でき、DB ページネーション・API ストリーミング・SSE(Server-Sent Events)などで活躍します。

本記事では IAsyncEnumerable の内部仕組み・System.Linq.Async(Ix.NET)の全オペレーター・[EnumeratorCancellation] によるキャンセル伝搬・WhereAwait / SelectAwait の使い分け・EF Core との統合・ページネーション / SSE / バックプレッシャーの実践パターン・落とし穴まで体系的に解説します。

スポンサーリンク

IAsyncEnumerable<T> の基本 — 非同期イテレータ

yield return と await の組み合わせ
using System.Runtime.CompilerServices;

// 非同期イテレータメソッド: async + yield return + IAsyncEnumerable<T>
// CancellationToken は [EnumeratorCancellation] で受け取る
public static async IAsyncEnumerable<int> ProduceAsync(
    int count,
    [EnumeratorCancellation] CancellationToken ct = default)
{
    for (int i = 1; i <= count; i++)
    {
        await Task.Delay(100, ct);    // 疑似的な I/O
        yield return i;               // 1件ずつ戻す
    }
}

// 消費側: await foreach で1件ずつ受け取る
await foreach (var n in ProduceAsync(5))
    Console.WriteLine(n);

// キャンセルトークン付き
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(1));
try
{
    await foreach (var n in ProduceAsync(100).WithCancellation(cts.Token))
        Console.WriteLine(n);
}
catch (OperationCanceledException) { Console.WriteLine("中止"); }
[EnumeratorCancellation] は必須
IAsyncEnumerable を返すメソッドで CancellationToken を受け取る場合、パラメーターに [EnumeratorCancellation] 属性を付けないと呼び出し側の .WithCancellation(token) が伝わりません。この属性を付けると、await foreach (var x in source.WithCancellation(ct)) で渡したトークンがメソッド内の ct パラメーターに自動連結されます。付け忘れると処理がキャンセル不能になるので、常に付ける習慣をつけてください。

System.Linq.Async — Ix.NET の LINQ オペレーター

C# 8 標準の IAsyncEnumerableawait foreach で列挙できるだけで、WhereSelectToListAsync 等の LINQ メソッドは含まれていません。System.Linq.Async パッケージ(dotnet/reactive の Ix.NET プロジェクト)を追加するとLINQ オペレーター相当のメソッドが使えるようになります。

パッケージのインストールと使用
// dotnet add package System.Linq.Async

using System.Linq;   // IEnumerable に対する同期 LINQ
// System.Linq.Async は using 不要(拡張メソッドが自動で見える)

// フィルタ + 射影 + 収集 を非同期にパイプライン化
List<int> evens = await ProduceAsync(100)
    .Where(n => n % 2 == 0)            // IAsyncEnumerable → IAsyncEnumerable
    .Select(n => n * 10)
    .Take(5)
    .ToListAsync();

Console.WriteLine(string.Join(",", evens));  // 20,40,60,80,100

// LINQ-to-Entities の ToListAsync と名前が同じなので注意:
// - EF Core: using Microsoft.EntityFrameworkCore; が必要
// - System.Linq.Async: パッケージ追加だけで OK
// 両方あるプロジェクトではオーバーロード解決で衝突することがある

System.Linq.Async の主要オペレーター

カテゴリ メソッド例
フィルタリング Where, WhereAwait, Skip, SkipWhile, Take, TakeWhile, Distinct
射影 Select, SelectAwait, SelectMany, SelectManyAwait
並び替え OrderBy, OrderByDescending, ThenBy, Reverse
グルーピング GroupBy, ToLookupAsync
結合 Join, GroupJoin, Zip, Concat
集約 CountAsync, SumAsync, AverageAsync, MinAsync, MaxAsync, AggregateAsync
要素取得 FirstAsync, FirstOrDefaultAsync, SingleAsync, ElementAtAsync, LastAsync
条件判定 AnyAsync, AllAsync, ContainsAsync
収集 ToListAsync, ToArrayAsync, ToDictionaryAsync, ToHashSetAsync
その他 Chunk, WithCancellation, ToAsyncEnumerable
各オペレーターの使用例
// フィルタ + 変換 + 収集
List<string> names = await users
    .Where(u => u.IsActive)
    .Select(u => u.Name)
    .ToListAsync();

// 集約
int count = await users.CountAsync(u => u.IsAdmin);
decimal total = await orders.SumAsync(o => o.Amount);
bool anyActive = await users.AnyAsync(u => u.IsActive);

// 要素取得
User? admin = await users.FirstOrDefaultAsync(u => u.IsAdmin);
User single = await users.SingleAsync(u => u.Id == 42);

// 並び替え → ページング
var page = await users
    .OrderBy(u => u.Id)
    .Skip(20)
    .Take(10)
    .ToListAsync();

// グルーピング
var groups = users
    .GroupBy(u => u.Department);
await foreach (var g in groups)
{
    Console.WriteLine($"{g.Key}: {await g.CountAsync()}");
}

WhereAwait / SelectAwait — 非同期セレクタ

ラムダ内で await したいとき
// 通常の Where / Select は同期ラムダしか受け付けない
// 各要素に対して非同期処理をしたい場合は WhereAwait / SelectAwait

// ① SelectAwait: 各要素を非同期変換
// 例: 各 userId からユーザー詳細を非同期取得
IAsyncEnumerable<UserDetail> details = userIds
    .SelectAwait(async id => await _userService.FetchAsync(id));

// ② WhereAwait: 各要素を非同期フィルタ
// 例: 各 URL で HEAD リクエストして生存確認
IAsyncEnumerable<string> aliveUrls = urls
    .WhereAwait(async url => await IsAliveAsync(url));

// ③ キャンセル対応版 (WithCancellation)
IAsyncEnumerable<UserDetail> detailsCancellable = userIds
    .SelectAwaitWithCancellation(async (id, ct) =>
        await _userService.FetchAsync(id, ct));

// 組み合わせて非同期パイプラインを構築
var activeUserNames = await userIds
    .SelectAwaitWithCancellation((id, ct) => _userService.FetchAsync(id, ct))
    .WhereAwait(async u => await _authService.IsActiveAsync(u))
    .Select(u => u.Name)
    .ToListAsync(cancellationToken: cts.Token);
WhereAwait / SelectAwait は逐次実行される
SelectAwait の非同期ラムダは1要素ずつ順番に awaitされます。並列で実行したい場合は Parallel.ForEachAsyncTask.WhenAll(ids.Select(...)) を別途組み合わせる必要があります。「ストリームを逐次処理したい」なら SelectAwait、「ストリームの全要素を並列処理したい」なら Parallel.ForEachAsync、という使い分けが重要です。

EF Core との統合

IQueryable の非同期 LINQ(EF Core 固有)
using Microsoft.EntityFrameworkCore;

// EF Core は独自の ToListAsync / FirstOrDefaultAsync / AnyAsync 等を提供
// これらは IQueryable 上で動作し、DB に送信される SQL を非同期に発行する
var activeUsers = await dbContext.Users
    .Where(u => u.IsActive)
    .OrderBy(u => u.Name)
    .ToListAsync();

User? admin = await dbContext.Users
    .FirstOrDefaultAsync(u => u.IsAdmin);

int count = await dbContext.Users.CountAsync();

// IAsyncEnumerable としてストリーミング取得
// 大量データを全部メモリに載せずに1件ずつ処理
await foreach (var user in dbContext.Users.AsAsyncEnumerable())
{
    await ProcessUserAsync(user);   // 1件ずつ処理
    // メモリフットプリント極小
}

// System.Linq.Async との関係:
// EF Core の Extension(Microsoft.EntityFrameworkCore)は IQueryable を対象
// System.Linq.Async は IAsyncEnumerable を対象
// AsAsyncEnumerable() で変換後は System.Linq.Async のオペレーターが使える
EF Core と System.Linq.Async の ToListAsync は別物
プロジェクトで Microsoft.EntityFrameworkCoreSystem.Linq.Async の両方を参照していると、ToListAsync の呼び出しがどちらの拡張メソッドか曖昧になることがあります。特に IQueryable<T> に対して呼ぶのか、IAsyncEnumerable<T> に対して呼ぶのかで意味が変わります。AsAsyncEnumerable() で明示的に変換してから呼ぶか、完全修飾名を使うと曖昧さを避けられます。

実践パターン① — API ページネーションのストリーミング化

全ページを透過的に IAsyncEnumerable で
// REST API は通常「1ページ100件 × ページ番号」で分割して取得する
// これを1つの IAsyncEnumerable にまとめると、利用側はページを意識しなくてよい

public static async IAsyncEnumerable<User> GetAllUsersAsync(
    HttpClient http,
    [EnumeratorCancellation] CancellationToken ct = default)
{
    int page = 1;
    while (true)
    {
        var response = await http.GetFromJsonAsync<UserPage>(
            $"/api/users?page={page}&size=100", ct);

        if (response is null || response.Items.Count == 0) yield break;

        foreach (var user in response.Items)
            yield return user;

        if (!response.HasMore) yield break;
        page++;
    }
}

// 使用側は「全ユーザー」として使える(裏側でページが進んでいく)
await foreach (var user in GetAllUsersAsync(http, ct))
{
    await ProcessUserAsync(user);
}

// Take で先頭 N 件だけ処理も可能(余分な API 呼び出しを自動停止)
var first50 = await GetAllUsersAsync(http, ct)
    .Take(50)
    .ToListAsync();
// → 50 件超過時点で自動で列挙を止めるので API も呼ばれない(遅延実行の利点)

実践パターン② — サーバー送信イベント(SSE)の読み取り

HttpClient でストリーミングを受け取る
public static async IAsyncEnumerable<string> ReadSseAsync(
    HttpClient http,
    string url,
    [EnumeratorCancellation] CancellationToken ct = default)
{
    using var response = await http.GetAsync(url,
        HttpCompletionOption.ResponseHeadersRead, ct);
    response.EnsureSuccessStatusCode();

    using var stream = await response.Content.ReadAsStreamAsync(ct);
    using var reader = new StreamReader(stream);

    while (!reader.EndOfStream && !ct.IsCancellationRequested)
    {
        // .NET 7+ は ReadLineAsync(ct)。それ以前は ReadLineAsync() + 外側で ct チェック
        string? line = await reader.ReadLineAsync(ct);
        if (line is null) yield break;
        if (!line.StartsWith("data: ")) continue;

        yield return line["data: ".Length..];
    }
}

// 使用側: 届いたイベントを即座に処理
await foreach (var payload in ReadSseAsync(http, "/api/events", ct))
{
    Console.WriteLine($"受信: {payload}");
    await HandleEventAsync(payload);
}

実践パターン③ — Chunk でバッチ化してバッチ処理

ストリームを N 件ずつバッチ処理
// Chunk: IAsyncEnumerable を N 要素ごとの配列に分割(System.Linq.Async)
await foreach (User[] batch in GetAllUsersAsync(http, ct).Chunk(100))
{
    // 100 件ずつ DB に一括挿入
    await dbContext.Users.AddRangeAsync(batch, ct);
    await dbContext.SaveChangesAsync(ct);
}

// 同期 LINQ(IEnumerable<T>)にも .NET 6+ から Chunk が入っている
foreach (var batch in someList.Chunk(100))
    Process(batch);

// 時間ベースのバッチ化(5秒ごと or 100件たまったらバッチ送信)は
// System.Reactive(Rx)の Buffer(TimeSpan, count) 相当。
// IAsyncEnumerable では ToObservable() で Rx に橋渡しするか、
// 自前で Channel + Timer を組み合わせて実装するのが一般的

実践パターン④ — バックプレッシャーと並列処理

Parallel.ForEachAsync で並列消費
// IAsyncEnumerable は本質的に「逐次」ストリーム
// 各要素を「並列」で処理したい場合は Parallel.ForEachAsync を使う

await Parallel.ForEachAsync(
    source: GetAllUsersAsync(http, ct),
    parallelOptions: new ParallelOptions
    {
        MaxDegreeOfParallelism = 10,          // 10 並列
        CancellationToken      = ct,
    },
    body: async (user, token) =>
    {
        await ProcessUserAsync(user, token);   // 並列に処理
    });

// Channel 経由での「生産者 × 複数消費者」パターン
using System.Threading.Channels;

var channel = Channel.CreateBounded<User>(new BoundedChannelOptions(1000)
{
    FullMode = BoundedChannelFullMode.Wait,   // 満杯なら生産者を待たせる
});

// 生産側
var producer = Task.Run(async () =>
{
    await foreach (var user in GetAllUsersAsync(http, ct))
        await channel.Writer.WriteAsync(user, ct);
    channel.Writer.Complete();
}, ct);

// 消費側(複数並列)
var consumers = Enumerable.Range(0, 4).Select(_ => Task.Run(async () =>
{
    await foreach (var user in channel.Reader.ReadAllAsync(ct))
        await ProcessUserAsync(user, ct);
}, ct)).ToArray();

await Task.WhenAll(new[] { producer }.Concat(consumers));

ConfigureAwait(false) と IAsyncEnumerable

await foreach に ConfigureAwait を付ける
// IAsyncEnumerable にも ConfigureAwait(false) が適用できる
// ライブラリコードでは SynchronizationContext キャプチャを避けるため必須

await foreach (var item in GetItemsAsync()
    .ConfigureAwait(false)
    .WithCancellation(ct))
{
    Process(item);
}

// IAsyncEnumerable の拡張メソッドで ConfigureAwait と WithCancellation を組み合わせる
// 順序はどちらでも同じ結果

// 非同期イテレータの実装側でも ConfigureAwait を使える
public static async IAsyncEnumerable<int> StreamAsync(
    [EnumeratorCancellation] CancellationToken ct = default)
{
    for (int i = 0; i < 100; i++)
    {
        await Task.Delay(10, ct).ConfigureAwait(false);
        yield return i;
    }
}

同期コレクションとの相互変換

ToAsyncEnumerable と ToListAsync
// IEnumerable<T> → IAsyncEnumerable<T>
IEnumerable<int> syncData = Enumerable.Range(1, 100);
IAsyncEnumerable<int> asyncData = syncData.ToAsyncEnumerable();

// IAsyncEnumerable<T> → List<T>(終端で await)
List<int> result = await asyncData.ToListAsync();

// IAsyncEnumerable<T> → IEnumerable<T>(終端で列挙)
// ToListAsync().Result は NG(デッドロックの危険)
// 代わりに await foreach で取得するか、ToListAsync の戻りを使う

// よくある誤用: Task<IEnumerable<T>> と IAsyncEnumerable<T> の混同
Task<List<int>> taskResult1 = asyncData.ToListAsync().AsTask();  // Task に包む

// Task<IAsyncEnumerable<T>> は 「Task 解決後に IAsyncEnumerable が返る」という意味
// 普通は IAsyncEnumerable<T> を直接返す(値は await できる)
async Task<IAsyncEnumerable<int>> GetStreamBadAsync()
{
    await Task.Delay(100);
    return StreamAsync();  // Task を返してから IAsyncEnumerable → 2段階 await
}
// → 使用側が「await 後に await foreach」と2段階必要で使いづらい

// 推奨: IAsyncEnumerable をそのまま返す
async IAsyncEnumerable<int> GetStreamGoodAsync(
    [EnumeratorCancellation] CancellationToken ct = default)
{
    await Task.Delay(100, ct);
    await foreach (var x in StreamAsync().WithCancellation(ct))
        yield return x;
}

よくある落とし穴

落とし穴① — CancellationToken の伝搬忘れ
// NG: [EnumeratorCancellation] なし
public static async IAsyncEnumerable<int> BadAsync(CancellationToken ct)
{
    for (int i = 0; i < 1000; i++)
    {
        await Task.Delay(100, ct);
        yield return i;
    }
}

// 呼び出し側で WithCancellation(token) を渡しても
// ↑ のメソッドの ct パラメーターには届かない(新しい token が作られる)

// OK: [EnumeratorCancellation] を付ける
public static async IAsyncEnumerable<int> GoodAsync(
    [EnumeratorCancellation] CancellationToken ct = default)
{
    for (int i = 0; i < 1000; i++)
    {
        await Task.Delay(100, ct);
        yield return i;
    }
}

// 使用側
await foreach (var n in GoodAsync().WithCancellation(cts.Token))
    Console.WriteLine(n);
// → cts.Cancel() で GoodAsync 内の ct が中止される
落とし穴② — IAsyncEnumerable を2回列挙すると処理が2回走る
IAsyncEnumerable<int> source = GetExpensiveStreamAsync();

// 1回目の列挙
int count = await source.CountAsync();

// 2回目の列挙: 同じストリームを再度走査 → 処理が2回実行される!
List<int> list = await source.ToListAsync();

// 対策: 先に実体化してから複数回使う
List<int> cached = await GetExpensiveStreamAsync().ToListAsync();
int count2 = cached.Count;
var first = cached.First();

// 注: ただしメモリ使用量が増えるので、大量データでは実体化せずに
//     1パスで必要な情報を全部集めるパターンを優先する
var (count3, sum, max) = await GetExpensiveStreamAsync().AggregateAsync(
    seed: (Count: 0, Sum: 0, Max: int.MinValue),
    accumulator: (acc, x) => (acc.Count + 1, acc.Sum + x, Math.Max(acc.Max, x)));
落とし穴③ — ToListAsync().Result でデッドロック
// NG: 同期的に待つと UI スレッドや ASP.NET Classic でデッドロック
List<int> result = stream.ToListAsync().Result;
stream.ToListAsync().Wait();

// OK: 常に await する
List<int> result2 = await stream.ToListAsync();

// 同期コンテキストから逃げたい場合(やむを得ないとき)
List<int> result3 = Task.Run(async () => await stream.ToListAsync())
    .GetAwaiter().GetResult();
// → Task.Run でスレッドプールに逃がすとデッドロックしない
// が、原則 async all the way が正解
落とし穴④ — Rx(System.Reactive)と混同
// System.Linq.Async    → IAsyncEnumerable 用(Ix.NET プロジェクト)
// System.Reactive      → IObservable 用(Rx.NET プロジェクト、同じ dotnet/reactive 下)

// IAsyncEnumerable: プル型(消費側が要求すると生産される)
await foreach (var x in streamA)  // 消費側が await foreach するたびに進む
    Process(x);

// IObservable: プッシュ型(生産側が好きなタイミングで流し込む)
streamB.Subscribe(x => Process(x));  // 購読側は待たずに受け取る

// 使い分け:
// - バックプレッシャー / キャンセル / 1つずつ処理 → IAsyncEnumerable
// - UIイベント / リアルタイム配信 / 変換パイプライン → IObservable
// - 両者の相互変換は ToObservable() / ToAsyncEnumerable() で可能

よくある質問

QIAsyncEnumerable は Task<IEnumerable<T>> とどう違いますか?
ATask<IEnumerable<T>>「非同期に全件準備完了してから列挙」で、IAsyncEnumerable<T>「1件ずつ非同期に到着」します。100万件のデータを扱う場合、前者は全件メモリに載せてから処理するのに対し、後者は1件ごとに処理できるためメモリフットプリントが極小になります。DB のページネーション・大量ファイル処理・リアルタイムストリームでは IAsyncEnumerable が適切です。
QSystem.Linq.Async と Microsoft.EntityFrameworkCore の ToListAsync は何が違いますか?
A対象の型が異なります。EF Core の ToListAsyncIQueryable<T> に対するもので、SQL を組み立てて DB に非同期クエリを発行します。System.Linq.Async の ToListAsyncIAsyncEnumerable<T> に対するもので、既存の非同期ストリームを List に集約します。同じ名前で型が違うため、プロジェクトで両方を参照しているとオーバーロード解決が曖昧になることがあります。EF Core の結果を AsAsyncEnumerable() で変換してから System.Linq.Async を使うと責務が分離しやすくなります。
QSelectAwait は並列で実行されますか?
Aいいえ、逐次実行ですSelectAwait は1要素を await してから次の要素に進みます。並列化したい場合は Parallel.ForEachAsync(並列消費)や Task.WhenAll(ids.Select(FetchAsync))(全件並列)を使ってください。IAsyncEnumerable はもともと「非同期にストリーミング」する型で、「非同期ラムダを並列で実行する」目的ではありません。
QIAsyncEnumerable はキャンセルするとどうなりますか?
ACancellationToken が発火すると、現在待機中の awaitawait Task.Delay 等)が OperationCanceledException を投げ、await foreach の外に例外が伝搬します。ただし [EnumeratorCancellation] 属性がないと、呼び出し側の .WithCancellation(token) がメソッド内に伝わりません。公開する非同期イテレータでは必ず [EnumeratorCancellation] CancellationToken ct = default を引数に追加してください。
Q大量データを DB から取得する時の最適解は?
AEF Core の AsAsyncEnumerable() が第一選択です。ToListAsync() だと全件メモリに載せてしまうため、100 万件などの大量データではメモリ不足を起こします。AsAsyncEnumerable() + await foreach で1件ずつ処理すれば、メモリフットプリントは定数になります。SQL Server の AsNoTracking() と組み合わせると変更追跡コストも削減できます。

まとめ

項目 ポイント
基本 async IAsyncEnumerable<T> + yield return + await
キャンセル [EnumeratorCancellation] 属性必須
ライブラリ System.Linq.Async(dotnet/reactive の Ix.NET)
フィルタ Where(同期)/ WhereAwait(非同期ラムダ)
射影 Select / SelectAwait / SelectAwaitWithCancellation
収集 ToListAsync / ToArrayAsync / ToDictionaryAsync
集約 CountAsync / SumAsync / FirstOrDefaultAsync
EF Core AsAsyncEnumerable() で大量データをストリーミング
ページネーション yield return で「透過的な全件列挙」に
並列処理 IAsyncEnumerable は逐次。Parallel.ForEachAsync で並列
バックプレッシャー Channel<T> + BoundedChannelFullMode.Wait
Rx との違い プル型(IAsyncEnumerable)vs プッシュ型(IObservable)
落とし穴 2回列挙で処理2回・.Result でデッドロック・ConfigureAwait 忘れ

関連する非同期処理の詳細は以下を参照してください。async/await 完全ガイドでステートマシン・ConfigureAwait・ValueTask、Task<T> 完全ガイドで Task との違い、非同期と並列の違い完全ガイドで Parallel.ForEachAsync、CancellationToken 完全ガイドでキャンセル伝搬、LINQ 完全ガイドで同期 LINQ の基本を解説しています。