【C#】非同期・並列処理の実践パターン集|Fan-out/Rate Limit/Pipeline/Worker Pool/エラーハンドリングまで

【C#】非同期処理と並列処理の違い|TaskとParallelの使い分け C#

非同期・並列処理の基本概念TaskParallel の使い分けは非同期処理と並列処理の違い完全ガイドで整理しました。本記事は実装レシピ集として、「実際の業務で直面する10種類のシナリオ」ごとに最適な書き方を示します。

Fan-out/Fan-in・レート制限付き並行処理・パイプライン・Worker Pool・Producer/Consumer・Map-Reduce・部分失敗の扱い・タイムアウト・リトライ・複合キャンセルまで、そのままコピーして使えるパターン集としてまとめています。

スポンサーリンク

10パターン — シナリオ別早見表

シナリオ 推奨パターン 鍵となる API
大量の独立タスクを並行実行したい Fan-out / Fan-in Task.WhenAll
外部 API のレート制限を守りたい Semaphore 制御 SemaphoreSlim
処理を段階ごとに最適化したい パイプライン TPL Dataflow / Channel
複数の消費者で並行処理したい Worker Pool Channel + Task.WhenAll
プロデューサとコンシューマが別々の速度 Producer / Consumer + Backpressure BoundedChannel
大量データを並列集計したい Map-Reduce Parallel.For localFinally
一部失敗しても続行したい 部分失敗パターン Task.WhenAll + 例外集約
全体にタイムアウトをかけたい Timeout 付き並行処理 CancellationTokenSource.CancelAfter
失敗したらリトライしたい リトライ付き非同期 Polly / 自前リトライ
複数のキャンセル要因を統合 複合キャンセル CreateLinkedTokenSource

パターン① — Fan-out / Fan-in(大量の並行タスク)

多数の独立したタスク(API 呼び出し・DB クエリ等)を同時に投げて、全部終わってから結果を集める基本パターンです。I/O-bound かつ互いに依存がない場合の定番。

Task.WhenAll による Fan-out / Fan-in
// 100 ユーザーの情報を並行取得
public async Task<List<User>> FetchUsersAsync(IEnumerable<int> userIds)
{
    var tasks = userIds.Select(id => FetchUserAsync(id));
    User[] users = await Task.WhenAll(tasks);
    return users.ToList();
}

// NG: foreach + await で逐次実行になってしまう
public async Task<List<User>> FetchUsersBadAsync(IEnumerable<int> userIds)
{
    var result = new List<User>();
    foreach (var id in userIds)
        result.Add(await FetchUserAsync(id));  // 100倍遅い
    return result;
}

// Task.WhenAny: 最初に終わった1つだけ取る(競争)
public async Task<string> FetchFirstOfAsync(params string[] urls)
{
    var tasks = urls.Select(FetchAsync).ToList();
    Task<string> first = await Task.WhenAny(tasks);
    return await first;   // 最速で応答したエンドポイントの結果
}
Select + Task.WhenAll の落とし穴
userIds.Select(id => FetchUserAsync(id)) は遅延評価なので、Task.WhenAll に渡す前に .ToList()展開すべきケースがあります。特に Task.WhenAll の後で tasks を再列挙すると同じ処理が2回走ってしまうことがあるため、var tasks = userIds.Select(...).ToList(); としてから await Task.WhenAll(tasks); と書くのが安全です。

パターン② — レート制限付き並行処理(SemaphoreSlim)

「外部 API は同時接続 10 まで」「1 秒あたり 100 リクエストまで」などの制約を守りながら大量の並行リクエストを捌くパターン。SemaphoreSlim で同時実行数を絞ります。

SemaphoreSlim で同時実行数を制限
public sealed class RateLimitedClient
{
    private readonly SemaphoreSlim _gate = new(initialCount: 10);  // 同時 10 まで
    private readonly HttpClient _http = new();

    public async Task<string> FetchAsync(string url, CancellationToken ct)
    {
        await _gate.WaitAsync(ct);
        try
        {
            return await _http.GetStringAsync(url, ct);
        }
        finally
        {
            _gate.Release();  // 必ず解放(例外時も)
        }
    }

    public async Task<List<string>> FetchAllAsync(
        IEnumerable<string> urls, CancellationToken ct)
    {
        var tasks = urls.Select(u => FetchAsync(u, ct));
        return (await Task.WhenAll(tasks)).ToList();
    }
}

// 代替: Parallel.ForEachAsync の MaxDegreeOfParallelism(.NET 6+)
public async Task FetchAllAsync2(IEnumerable<string> urls, CancellationToken ct)
{
    await Parallel.ForEachAsync(urls,
        new ParallelOptions { MaxDegreeOfParallelism = 10, CancellationToken = ct },
        async (url, c) => await _http.GetStringAsync(url, c));
}
時間窓レート制限(.NET 7+ RateLimiter)
using System.Threading.RateLimiting;

// 1秒あたり100リクエストまで許可(.NET 7+ の System.Threading.RateLimiting)
var limiter = new TokenBucketRateLimiter(new TokenBucketRateLimiterOptions
{
    TokenLimit        = 100,
    TokensPerPeriod   = 100,
    ReplenishmentPeriod = TimeSpan.FromSeconds(1),
    AutoReplenishment = true,
    QueueLimit        = 1000,
});

public async Task<string> FetchWithLimitAsync(string url, CancellationToken ct)
{
    using RateLimitLease lease = await limiter.AcquireAsync(permitCount: 1, ct);
    if (!lease.IsAcquired) throw new InvalidOperationException("レート制限超過");

    return await _http.GetStringAsync(url, ct);
}

パターン③ — パイプライン処理(ステージ別最適化)

段階ごとに異なる並列度を設定
using System.Threading.Tasks.Dataflow;

// ダウンロード(I/O)→ 変換(CPU)→ 保存(I/O)
// 各ステージで適切な並列度を別々に設定できる
public sealed class DocumentPipeline
{
    private readonly TransformBlock<string, string> _download;
    private readonly TransformBlock<string, Document> _parse;
    private readonly ActionBlock<Document> _save;

    public DocumentPipeline(HttpClient http, IDatabase db)
    {
        _download = new(async url =>
        {
            return await http.GetStringAsync(url);
        }, new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 20,   // I/O は多めに並列
            BoundedCapacity         = 100, // バックプレッシャー
        });

        _parse = new(html => ParseDocument(html),
            new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = Environment.ProcessorCount, // CPU コア数
                BoundedCapacity         = 50,
            });

        _save = new(async doc => await db.SaveAsync(doc),
            new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 4,   // DB の接続プール上限
                BoundedCapacity         = 30,
            });

        var link = new DataflowLinkOptions { PropagateCompletion = true };
        _download.LinkTo(_parse, link);
        _parse.LinkTo(_save, link);
    }

    public async Task ProcessAsync(IEnumerable<string> urls)
    {
        foreach (var u in urls) await _download.SendAsync(u);
        _download.Complete();
        await _save.Completion;
    }
}

パターン④ — Worker Pool(Channel + 複数消費者)

1つのキューから N 本のワーカーで並行処理
using System.Threading.Channels;

public sealed class WorkerPool<T>
{
    private readonly Channel<T> _queue;
    private readonly Func<T, CancellationToken, Task> _handler;
    private readonly int _workerCount;

    public WorkerPool(int workerCount, int queueCapacity,
        Func<T, CancellationToken, Task> handler)
    {
        _workerCount = workerCount;
        _handler     = handler;
        _queue       = Channel.CreateBounded<T>(new BoundedChannelOptions(queueCapacity)
        {
            FullMode = BoundedChannelFullMode.Wait,
            SingleReader = false,
            SingleWriter = false,
        });
    }

    public ValueTask EnqueueAsync(T item, CancellationToken ct)
        => _queue.Writer.WriteAsync(item, ct);

    public void Complete() => _queue.Writer.Complete();

    public async Task RunAsync(CancellationToken ct)
    {
        var workers = Enumerable.Range(0, _workerCount)
            .Select(_ => Task.Run(async () =>
            {
                await foreach (var item in _queue.Reader.ReadAllAsync(ct))
                    await _handler(item, ct);
            }, ct))
            .ToArray();

        await Task.WhenAll(workers);
    }
}

// 使用例
var pool = new WorkerPool<WorkItem>(
    workerCount: 8,
    queueCapacity: 100,
    handler: async (item, ct) => await ProcessAsync(item, ct));

var runTask = pool.RunAsync(cts.Token);
foreach (var work in LoadWorkItems())
    await pool.EnqueueAsync(work, cts.Token);
pool.Complete();
await runTask;

パターン⑤ — プロデューサ/コンシューマ + バックプレッシャー

プロデューサ > コンシューマ時の自動待機
// プロデューサが速すぎてメモリが膨張するのを防ぐ
// BoundedChannel で上限を設けると、満杯時は Writer 側が自動待機
var channel = Channel.CreateBounded<LogEntry>(new BoundedChannelOptions(10_000)
{
    FullMode = BoundedChannelFullMode.Wait,  // 上限到達時はプロデューサが待機
    // 代替: DropOldest / DropNewest / DropWrite(損失を許容する場合)
});

// プロデューサ: ログを高速生成
Task producer = Task.Run(async () =>
{
    while (IsRunning)
    {
        LogEntry entry = await ReadFromSocketAsync();
        await channel.Writer.WriteAsync(entry);  // 満杯なら自動で待つ
    }
    channel.Writer.Complete();
});

// コンシューマ: DB への書き込みは遅い
Task consumer = Task.Run(async () =>
{
    await foreach (var entry in channel.Reader.ReadAllAsync())
        await db.InsertAsync(entry);
});

await Task.WhenAll(producer, consumer);

パターン⑥ — Map-Reduce 風集計

Parallel.For の localInit/localFinally で集計
// NG: 毎回ロック → 直列化して並列のメリットが消える
long sum = 0;
object gate = new();
Parallel.For(0, items.Length, i =>
{
    long local = Compute(items[i]);
    lock (gate) { sum += local; }  // 競合
});

// OK: スレッドローカル状態で Map→Reduce
long sumFast = 0;
Parallel.For(0, items.Length,
    localInit: () => 0L,
    body: (i, state, local) => local + Compute(items[i]),
    localFinally: local => Interlocked.Add(ref sumFast, local));

// PLINQ 版(シンプルだが制御しにくい)
long sumPlinq = items.AsParallel()
                     .Select(Compute)
                     .Sum();

// 複数集計が必要な場合(平均・分散・最大最小)
var stats = items.AsParallel()
    .Aggregate(
        seedFactory: () => new Stats(),
        updateAccumulatorFunc: (acc, x) => acc.Add(x),
        combineAccumulatorsFunc: (a, b) => a.Merge(b),
        resultSelector: acc => acc.ToResult());

パターン⑦ — 部分失敗を許容する並行処理

一部が失敗しても残りは続行したい
public sealed record FetchResult<T>(T? Value, Exception? Error, string Source)
{
    public bool IsSuccess => Error is null;
}

public async Task<List<FetchResult<string>>> FetchAllSafeAsync(
    IEnumerable<string> urls, CancellationToken ct)
{
    async Task<FetchResult<string>> TryFetch(string url)
    {
        try
        {
            var body = await _http.GetStringAsync(url, ct);
            return new FetchResult<string>(body, null, url);
        }
        catch (Exception ex)
        {
            return new FetchResult<string>(null, ex, url);
        }
    }

    var results = await Task.WhenAll(urls.Select(TryFetch));
    return results.ToList();
}

// 使用側で成功/失敗を振り分け
var results = await FetchAllSafeAsync(urls, ct);
var succeeded = results.Where(r => r.IsSuccess).Select(r => r.Value!).ToList();
var failed    = results.Where(r => !r.IsSuccess).ToList();

foreach (var f in failed)
    logger.LogWarning(f.Error, "Failed to fetch {Url}", f.Source);
Task.WhenAll は「最初の例外」しか再スローしない
複数タスクが失敗しても、await Task.WhenAll(tasks)await では最初に発生した1件の例外だけが再スローされます。残りの例外は Task.Exception.InnerExceptions に格納されていますが、参照しないとスローされません。全エラーを知りたい場合は上記のようにtry-catch を各タスク内に閉じるか、Task.WhenAll 後に tasks.Where(t => t.IsFaulted) で個別に例外を取り出してください。

パターン⑧ — タイムアウト付き並行処理

全体タイムアウトと個別タイムアウトの書き分け
// ① 全体に 30 秒のタイムアウト
public async Task RunAllWithGlobalTimeoutAsync(IEnumerable<string> urls)
{
    using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
    try
    {
        var tasks = urls.Select(u => FetchAsync(u, cts.Token));
        await Task.WhenAll(tasks);
    }
    catch (OperationCanceledException) when (cts.IsCancellationRequested)
    {
        Console.WriteLine("全体タイムアウト");
    }
}

// ② 個別に 5 秒のタイムアウト(1つ遅くても全体は止めない)
public async Task<string?> FetchWithTimeoutAsync(string url, TimeSpan timeout)
{
    using var cts = new CancellationTokenSource(timeout);
    try
    {
        return await _http.GetStringAsync(url, cts.Token);
    }
    catch (OperationCanceledException)
    {
        return null;
    }
}

// ③ .NET 6+: Task.WaitAsync でタイムアウトを手軽に
public async Task<string> FetchAsync(string url)
{
    return await _http.GetStringAsync(url)
        .WaitAsync(TimeSpan.FromSeconds(10));  // 10秒超過で TimeoutException
}

パターン⑨ — リトライ付き非同期処理

Polly でエクスポネンシャルバックオフ
using Polly;

// 指数バックオフ: 1秒 → 2秒 → 4秒 で最大3回リトライ
var policy = Policy
    .Handle<HttpRequestException>()
    .WaitAndRetryAsync(
        retryCount: 3,
        sleepDurationProvider: attempt => TimeSpan.FromSeconds(Math.Pow(2, attempt - 1)),
        onRetry: (ex, delay, attempt, _) =>
            Console.WriteLine($"Retry {attempt} after {delay}: {ex.Message}"));

string result = await policy.ExecuteAsync(
    ct => _http.GetStringAsync("https://api.example.com", ct),
    cancellationToken: ct);

// .NET 8+: Microsoft.Extensions.Resilience で標準のレジリエンス
services.AddHttpClient<MyApiClient>()
    .AddStandardResilienceHandler();  // リトライ・サーキットブレーカー・タイムアウトが全部入り

// 自前実装(Polly なしでシンプルに)
public async Task<T> RetryAsync<T>(
    Func<CancellationToken, Task<T>> action,
    int maxAttempts,
    CancellationToken ct)
{
    Exception? lastError = null;
    for (int i = 0; i < maxAttempts; i++)
    {
        try { return await action(ct); }
        catch (Exception ex) when (ex is not OperationCanceledException)
        {
            lastError = ex;
            var delay = TimeSpan.FromMilliseconds(Math.Pow(2, i) * 100);
            await Task.Delay(delay, ct);
        }
    }
    throw new InvalidOperationException($"{maxAttempts}回試行して失敗", lastError);
}

パターン⑩ — 複合キャンセル(複数の中止要因)

ユーザーキャンセル+タイムアウト+シャットダウンを束ねる
public async Task<T> RobustCallAsync<T>(
    Func<CancellationToken, Task<T>> action,
    CancellationToken userToken,
    TimeSpan timeout)
{
    // アプリ停止用(AppLifetime 等から)
    CancellationToken appShutdownToken = _appLifetime.ApplicationStopping;

    // 3つを束ねた1つのトークンを作る
    using var linked = CancellationTokenSource.CreateLinkedTokenSource(
        userToken, appShutdownToken);
    linked.CancelAfter(timeout);

    try
    {
        return await action(linked.Token);
    }
    catch (OperationCanceledException)
    {
        // どの要因でキャンセルされたか判別
        if (userToken.IsCancellationRequested)      throw new UserCancelledException();
        if (appShutdownToken.IsCancellationRequested) throw new AppShutdownException();
        throw new TimeoutException($"{timeout} 経過");
    }
}
CreateLinkedTokenSource は必ず using で囲む
CancellationTokenSource.CreateLinkedTokenSource内部で親トークンに登録するため、使い終わったら必ず Dispose を呼ぶ必要があります。呼ばないと親トークンが生存している間メモリリークし続けます。using var linked = ... でスコープを閉じるのが定石です。CancelAfter を追加登録する場合も同じインスタンスに対して行えます。

パフォーマンス診断のチェックリスト

症状 疑うべき原因 調査ツール
並列度を上げても速くならない ネットワークや DB が律速。スレッド数より帯域 dotnet-counters・APM ツール
CPU 100% だが遅い ロック競合・False Sharing PerfView・dotTrace
メモリが右肩上がり バックプレッシャー不足・Channel が無制限 dotnet-counters gc-heap-stats
スレッドプール枯渇 sync-over-async・ThreadPool 起動遅延 dotnet-counters threadpool-queue-length
たまにデッドロック UI スレッドで .Result・ConfigureAwait 漏れ Visual Studio デッドロック診断
例外が握り潰される async void・fire-and-forget TaskScheduler.UnobservedTaskException
よくあるベンチマーク計測パターン
using BenchmarkDotNet.Attributes;
using BenchmarkDotNet.Running;

[MemoryDiagnoser]
public class FanOutBench
{
    private readonly int[] _ids = Enumerable.Range(0, 100).ToArray();

    [Benchmark(Baseline = true)]
    public async Task Sequential()
    {
        foreach (var id in _ids) await FetchAsync(id);
    }

    [Benchmark]
    public async Task FanOut_Unlimited()
    {
        await Task.WhenAll(_ids.Select(FetchAsync));
    }

    [Benchmark]
    public async Task FanOut_Limited_10()
    {
        using var gate = new SemaphoreSlim(10);
        async Task Gated(int id)
        {
            await gate.WaitAsync();
            try { await FetchAsync(id); } finally { gate.Release(); }
        }
        await Task.WhenAll(_ids.Select(Gated));
    }
}

// 実行: BenchmarkRunner.Run<FanOutBench>();

よくある質問

QSemaphoreSlim と MaxDegreeOfParallelism はどちらを使うべきですか?
Aコレクション全体を並行処理するなら Parallel.ForEachAsync + MaxDegreeOfParallelism がシンプルです。一方、異なる箇所(異なるメソッド)で同じ上限を共有したい場合は SemaphoreSlim のインスタンスをクラスに持たせて共有する必要があります。外部 API のレートを「このクライアント全体で 10 並行」と制御したいなら SemaphoreSlim、単発の処理を「この呼び出しだけ 10 並行」なら MaxDegreeOfParallelism が自然です。
QPolly と Microsoft.Extensions.Resilience はどう違いますか?
AMicrosoft.Extensions.Resilience(.NET 8+)は Polly を下敷きにした標準レジリエンスライブラリで、Polly の柔軟性と MS.DI 統合の両方を備えています。新規プロジェクトでは AddStandardResilienceHandler の宣言的設定で大半のユースケース(リトライ・サーキットブレーカー・タイムアウト・ヘッジング)をカバーできるため、まずこちらを検討してください。細かいカスタム戦略が必要なら従来の Polly API を使います。
Q非同期処理でキャンセルを無視するとどうなりますか?
ACancellationToken を受け取っているのに処理に渡さないと、「キャンセルを要求しても何も起きない」状況になります。ユーザーが「取り消し」ボタンを押しても処理が完了するまで待たされ、Task.DelayHttpClient.GetAsync など下位の待機も最後まで進行します。特にASP.NET Core の HttpContext.RequestAbortedは「クライアントが接続を切った」サインなので、これを無視すると既に誰も待っていない計算を延々と続けるリソース浪費になります。受け取った CancellationToken必ず全ての非同期呼び出しに渡すのが鉄則です。
Q大量の並行処理でメモリが増え続けます。原因は?
A最もよくあるのはバックプレッシャーの欠如です。プロデューサが出力する速度がコンシューマの処理速度を上回ると、バッファに溜まり続けてメモリが溢れます。Channel.CreateUnboundedTask.WhenAll に数万件のタスクを一度に詰める・ConcurrentQueue へ無制限に Enqueue する、などのパターンで起きます。対策は Channel.CreateBounded + FullMode.WaitSemaphoreSlim による同時実行数制限・バッチ処理への分割です。
QTask.Run 内で SemaphoreSlim を待つと何か問題はありますか?
ASemaphoreSlim.Wait()(同期版)をスレッドプールで使うと、スレッドをブロックした状態で解放待ちするためスレッドプール枯渇の原因になります。非同期コードからは必ず await semaphore.WaitAsync() を使ってください。これなら待機中はスレッドが解放され、他の処理が進行できます。同期版 Wait() が必要なのはコンソールアプリの同期メインなど特殊な場面だけです。

まとめ

シナリオ 推奨パターン
独立したN個のI/O Fan-out: Task.WhenAll(items.Select(...))
同時実行数の制限 SemaphoreSlim or MaxDegreeOfParallelism
時間窓レート制限 .NET 7+ の RateLimiter
段階的な処理 TPL Dataflow(ステージごとの並列度)
複数のワーカー Channel + 複数の ReadAllAsync
プロデューサ > コンシューマ BoundedChannel + FullMode.Wait
集計処理 Parallel.For の localInit/localFinally or PLINQ Aggregate
部分失敗 各タスク内で try-catch し結果オブジェクトで返す
全体タイムアウト CancellationTokenSource.CancelAfter
個別タイムアウト Task.WaitAsync(.NET 6+)
リトライ Polly or Microsoft.Extensions.Resilience
複合キャンセル CreateLinkedTokenSource + using

基本概念と API の使い分けは非同期処理と並列処理の違い完全ガイド、async/await のステートマシンや ConfigureAwait・ValueTask の詳細はasync/await 完全ガイド、CancellationToken の実装パターンはCancellationToken 完全ガイド、Channels はQueueとStack完全ガイドを参照してください。