非同期・並列処理の基本概念や Task・Parallel の使い分けは非同期処理と並列処理の違い完全ガイドで整理しました。本記事は実装レシピ集として、「実際の業務で直面する10種類のシナリオ」ごとに最適な書き方を示します。
Fan-out/Fan-in・レート制限付き並行処理・パイプライン・Worker Pool・Producer/Consumer・Map-Reduce・部分失敗の扱い・タイムアウト・リトライ・複合キャンセルまで、そのままコピーして使えるパターン集としてまとめています。
- 10パターン — シナリオ別早見表
- パターン① — Fan-out / Fan-in(大量の並行タスク)
- パターン② — レート制限付き並行処理(SemaphoreSlim)
- パターン③ — パイプライン処理(ステージ別最適化)
- パターン④ — Worker Pool(Channel + 複数消費者)
- パターン⑤ — プロデューサ/コンシューマ + バックプレッシャー
- パターン⑥ — Map-Reduce 風集計
- パターン⑦ — 部分失敗を許容する並行処理
- パターン⑧ — タイムアウト付き並行処理
- パターン⑨ — リトライ付き非同期処理
- パターン⑩ — 複合キャンセル(複数の中止要因)
- パフォーマンス診断のチェックリスト
- よくある質問
- まとめ
10パターン — シナリオ別早見表
| シナリオ | 推奨パターン | 鍵となる API |
|---|---|---|
| 大量の独立タスクを並行実行したい | Fan-out / Fan-in | Task.WhenAll |
| 外部 API のレート制限を守りたい | Semaphore 制御 | SemaphoreSlim |
| 処理を段階ごとに最適化したい | パイプライン | TPL Dataflow / Channel |
| 複数の消費者で並行処理したい | Worker Pool | Channel + Task.WhenAll |
| プロデューサとコンシューマが別々の速度 | Producer / Consumer + Backpressure | BoundedChannel |
| 大量データを並列集計したい | Map-Reduce | Parallel.For localFinally |
| 一部失敗しても続行したい | 部分失敗パターン | Task.WhenAll + 例外集約 |
| 全体にタイムアウトをかけたい | Timeout 付き並行処理 | CancellationTokenSource.CancelAfter |
| 失敗したらリトライしたい | リトライ付き非同期 | Polly / 自前リトライ |
| 複数のキャンセル要因を統合 | 複合キャンセル | CreateLinkedTokenSource |
パターン① — Fan-out / Fan-in(大量の並行タスク)
多数の独立したタスク(API 呼び出し・DB クエリ等)を同時に投げて、全部終わってから結果を集める基本パターンです。I/O-bound かつ互いに依存がない場合の定番。
// 100 ユーザーの情報を並行取得
public async Task<List<User>> FetchUsersAsync(IEnumerable<int> userIds)
{
var tasks = userIds.Select(id => FetchUserAsync(id));
User[] users = await Task.WhenAll(tasks);
return users.ToList();
}
// NG: foreach + await で逐次実行になってしまう
public async Task<List<User>> FetchUsersBadAsync(IEnumerable<int> userIds)
{
var result = new List<User>();
foreach (var id in userIds)
result.Add(await FetchUserAsync(id)); // 100倍遅い
return result;
}
// Task.WhenAny: 最初に終わった1つだけ取る(競争)
public async Task<string> FetchFirstOfAsync(params string[] urls)
{
var tasks = urls.Select(FetchAsync).ToList();
Task<string> first = await Task.WhenAny(tasks);
return await first; // 最速で応答したエンドポイントの結果
}
userIds.Select(id => FetchUserAsync(id)) は遅延評価なので、Task.WhenAll に渡す前に .ToList() で展開すべきケースがあります。特に Task.WhenAll の後で tasks を再列挙すると同じ処理が2回走ってしまうことがあるため、var tasks = userIds.Select(...).ToList(); としてから await Task.WhenAll(tasks); と書くのが安全です。パターン② — レート制限付き並行処理(SemaphoreSlim)
「外部 API は同時接続 10 まで」「1 秒あたり 100 リクエストまで」などの制約を守りながら大量の並行リクエストを捌くパターン。SemaphoreSlim で同時実行数を絞ります。
public sealed class RateLimitedClient
{
private readonly SemaphoreSlim _gate = new(initialCount: 10); // 同時 10 まで
private readonly HttpClient _http = new();
public async Task<string> FetchAsync(string url, CancellationToken ct)
{
await _gate.WaitAsync(ct);
try
{
return await _http.GetStringAsync(url, ct);
}
finally
{
_gate.Release(); // 必ず解放(例外時も)
}
}
public async Task<List<string>> FetchAllAsync(
IEnumerable<string> urls, CancellationToken ct)
{
var tasks = urls.Select(u => FetchAsync(u, ct));
return (await Task.WhenAll(tasks)).ToList();
}
}
// 代替: Parallel.ForEachAsync の MaxDegreeOfParallelism(.NET 6+)
public async Task FetchAllAsync2(IEnumerable<string> urls, CancellationToken ct)
{
await Parallel.ForEachAsync(urls,
new ParallelOptions { MaxDegreeOfParallelism = 10, CancellationToken = ct },
async (url, c) => await _http.GetStringAsync(url, c));
}
using System.Threading.RateLimiting;
// 1秒あたり100リクエストまで許可(.NET 7+ の System.Threading.RateLimiting)
var limiter = new TokenBucketRateLimiter(new TokenBucketRateLimiterOptions
{
TokenLimit = 100,
TokensPerPeriod = 100,
ReplenishmentPeriod = TimeSpan.FromSeconds(1),
AutoReplenishment = true,
QueueLimit = 1000,
});
public async Task<string> FetchWithLimitAsync(string url, CancellationToken ct)
{
using RateLimitLease lease = await limiter.AcquireAsync(permitCount: 1, ct);
if (!lease.IsAcquired) throw new InvalidOperationException("レート制限超過");
return await _http.GetStringAsync(url, ct);
}
パターン③ — パイプライン処理(ステージ別最適化)
using System.Threading.Tasks.Dataflow;
// ダウンロード(I/O)→ 変換(CPU)→ 保存(I/O)
// 各ステージで適切な並列度を別々に設定できる
public sealed class DocumentPipeline
{
private readonly TransformBlock<string, string> _download;
private readonly TransformBlock<string, Document> _parse;
private readonly ActionBlock<Document> _save;
public DocumentPipeline(HttpClient http, IDatabase db)
{
_download = new(async url =>
{
return await http.GetStringAsync(url);
}, new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 20, // I/O は多めに並列
BoundedCapacity = 100, // バックプレッシャー
});
_parse = new(html => ParseDocument(html),
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = Environment.ProcessorCount, // CPU コア数
BoundedCapacity = 50,
});
_save = new(async doc => await db.SaveAsync(doc),
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 4, // DB の接続プール上限
BoundedCapacity = 30,
});
var link = new DataflowLinkOptions { PropagateCompletion = true };
_download.LinkTo(_parse, link);
_parse.LinkTo(_save, link);
}
public async Task ProcessAsync(IEnumerable<string> urls)
{
foreach (var u in urls) await _download.SendAsync(u);
_download.Complete();
await _save.Completion;
}
}
パターン④ — Worker Pool(Channel + 複数消費者)
using System.Threading.Channels;
public sealed class WorkerPool<T>
{
private readonly Channel<T> _queue;
private readonly Func<T, CancellationToken, Task> _handler;
private readonly int _workerCount;
public WorkerPool(int workerCount, int queueCapacity,
Func<T, CancellationToken, Task> handler)
{
_workerCount = workerCount;
_handler = handler;
_queue = Channel.CreateBounded<T>(new BoundedChannelOptions(queueCapacity)
{
FullMode = BoundedChannelFullMode.Wait,
SingleReader = false,
SingleWriter = false,
});
}
public ValueTask EnqueueAsync(T item, CancellationToken ct)
=> _queue.Writer.WriteAsync(item, ct);
public void Complete() => _queue.Writer.Complete();
public async Task RunAsync(CancellationToken ct)
{
var workers = Enumerable.Range(0, _workerCount)
.Select(_ => Task.Run(async () =>
{
await foreach (var item in _queue.Reader.ReadAllAsync(ct))
await _handler(item, ct);
}, ct))
.ToArray();
await Task.WhenAll(workers);
}
}
// 使用例
var pool = new WorkerPool<WorkItem>(
workerCount: 8,
queueCapacity: 100,
handler: async (item, ct) => await ProcessAsync(item, ct));
var runTask = pool.RunAsync(cts.Token);
foreach (var work in LoadWorkItems())
await pool.EnqueueAsync(work, cts.Token);
pool.Complete();
await runTask;
パターン⑤ — プロデューサ/コンシューマ + バックプレッシャー
// プロデューサが速すぎてメモリが膨張するのを防ぐ
// BoundedChannel で上限を設けると、満杯時は Writer 側が自動待機
var channel = Channel.CreateBounded<LogEntry>(new BoundedChannelOptions(10_000)
{
FullMode = BoundedChannelFullMode.Wait, // 上限到達時はプロデューサが待機
// 代替: DropOldest / DropNewest / DropWrite(損失を許容する場合)
});
// プロデューサ: ログを高速生成
Task producer = Task.Run(async () =>
{
while (IsRunning)
{
LogEntry entry = await ReadFromSocketAsync();
await channel.Writer.WriteAsync(entry); // 満杯なら自動で待つ
}
channel.Writer.Complete();
});
// コンシューマ: DB への書き込みは遅い
Task consumer = Task.Run(async () =>
{
await foreach (var entry in channel.Reader.ReadAllAsync())
await db.InsertAsync(entry);
});
await Task.WhenAll(producer, consumer);
パターン⑥ — Map-Reduce 風集計
// NG: 毎回ロック → 直列化して並列のメリットが消える
long sum = 0;
object gate = new();
Parallel.For(0, items.Length, i =>
{
long local = Compute(items[i]);
lock (gate) { sum += local; } // 競合
});
// OK: スレッドローカル状態で Map→Reduce
long sumFast = 0;
Parallel.For(0, items.Length,
localInit: () => 0L,
body: (i, state, local) => local + Compute(items[i]),
localFinally: local => Interlocked.Add(ref sumFast, local));
// PLINQ 版(シンプルだが制御しにくい)
long sumPlinq = items.AsParallel()
.Select(Compute)
.Sum();
// 複数集計が必要な場合(平均・分散・最大最小)
var stats = items.AsParallel()
.Aggregate(
seedFactory: () => new Stats(),
updateAccumulatorFunc: (acc, x) => acc.Add(x),
combineAccumulatorsFunc: (a, b) => a.Merge(b),
resultSelector: acc => acc.ToResult());
パターン⑦ — 部分失敗を許容する並行処理
public sealed record FetchResult<T>(T? Value, Exception? Error, string Source)
{
public bool IsSuccess => Error is null;
}
public async Task<List<FetchResult<string>>> FetchAllSafeAsync(
IEnumerable<string> urls, CancellationToken ct)
{
async Task<FetchResult<string>> TryFetch(string url)
{
try
{
var body = await _http.GetStringAsync(url, ct);
return new FetchResult<string>(body, null, url);
}
catch (Exception ex)
{
return new FetchResult<string>(null, ex, url);
}
}
var results = await Task.WhenAll(urls.Select(TryFetch));
return results.ToList();
}
// 使用側で成功/失敗を振り分け
var results = await FetchAllSafeAsync(urls, ct);
var succeeded = results.Where(r => r.IsSuccess).Select(r => r.Value!).ToList();
var failed = results.Where(r => !r.IsSuccess).ToList();
foreach (var f in failed)
logger.LogWarning(f.Error, "Failed to fetch {Url}", f.Source);
複数タスクが失敗しても、
await Task.WhenAll(tasks) の await では最初に発生した1件の例外だけが再スローされます。残りの例外は Task.Exception.InnerExceptions に格納されていますが、参照しないとスローされません。全エラーを知りたい場合は上記のようにtry-catch を各タスク内に閉じるか、Task.WhenAll 後に tasks.Where(t => t.IsFaulted) で個別に例外を取り出してください。パターン⑧ — タイムアウト付き並行処理
// ① 全体に 30 秒のタイムアウト
public async Task RunAllWithGlobalTimeoutAsync(IEnumerable<string> urls)
{
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
try
{
var tasks = urls.Select(u => FetchAsync(u, cts.Token));
await Task.WhenAll(tasks);
}
catch (OperationCanceledException) when (cts.IsCancellationRequested)
{
Console.WriteLine("全体タイムアウト");
}
}
// ② 個別に 5 秒のタイムアウト(1つ遅くても全体は止めない)
public async Task<string?> FetchWithTimeoutAsync(string url, TimeSpan timeout)
{
using var cts = new CancellationTokenSource(timeout);
try
{
return await _http.GetStringAsync(url, cts.Token);
}
catch (OperationCanceledException)
{
return null;
}
}
// ③ .NET 6+: Task.WaitAsync でタイムアウトを手軽に
public async Task<string> FetchAsync(string url)
{
return await _http.GetStringAsync(url)
.WaitAsync(TimeSpan.FromSeconds(10)); // 10秒超過で TimeoutException
}
パターン⑨ — リトライ付き非同期処理
using Polly;
// 指数バックオフ: 1秒 → 2秒 → 4秒 で最大3回リトライ
var policy = Policy
.Handle<HttpRequestException>()
.WaitAndRetryAsync(
retryCount: 3,
sleepDurationProvider: attempt => TimeSpan.FromSeconds(Math.Pow(2, attempt - 1)),
onRetry: (ex, delay, attempt, _) =>
Console.WriteLine($"Retry {attempt} after {delay}: {ex.Message}"));
string result = await policy.ExecuteAsync(
ct => _http.GetStringAsync("https://api.example.com", ct),
cancellationToken: ct);
// .NET 8+: Microsoft.Extensions.Resilience で標準のレジリエンス
services.AddHttpClient<MyApiClient>()
.AddStandardResilienceHandler(); // リトライ・サーキットブレーカー・タイムアウトが全部入り
// 自前実装(Polly なしでシンプルに)
public async Task<T> RetryAsync<T>(
Func<CancellationToken, Task<T>> action,
int maxAttempts,
CancellationToken ct)
{
Exception? lastError = null;
for (int i = 0; i < maxAttempts; i++)
{
try { return await action(ct); }
catch (Exception ex) when (ex is not OperationCanceledException)
{
lastError = ex;
var delay = TimeSpan.FromMilliseconds(Math.Pow(2, i) * 100);
await Task.Delay(delay, ct);
}
}
throw new InvalidOperationException($"{maxAttempts}回試行して失敗", lastError);
}
パターン⑩ — 複合キャンセル(複数の中止要因)
public async Task<T> RobustCallAsync<T>(
Func<CancellationToken, Task<T>> action,
CancellationToken userToken,
TimeSpan timeout)
{
// アプリ停止用(AppLifetime 等から)
CancellationToken appShutdownToken = _appLifetime.ApplicationStopping;
// 3つを束ねた1つのトークンを作る
using var linked = CancellationTokenSource.CreateLinkedTokenSource(
userToken, appShutdownToken);
linked.CancelAfter(timeout);
try
{
return await action(linked.Token);
}
catch (OperationCanceledException)
{
// どの要因でキャンセルされたか判別
if (userToken.IsCancellationRequested) throw new UserCancelledException();
if (appShutdownToken.IsCancellationRequested) throw new AppShutdownException();
throw new TimeoutException($"{timeout} 経過");
}
}
CancellationTokenSource.CreateLinkedTokenSource は内部で親トークンに登録するため、使い終わったら必ず Dispose を呼ぶ必要があります。呼ばないと親トークンが生存している間メモリリークし続けます。using var linked = ... でスコープを閉じるのが定石です。CancelAfter を追加登録する場合も同じインスタンスに対して行えます。パフォーマンス診断のチェックリスト
| 症状 | 疑うべき原因 | 調査ツール |
|---|---|---|
| 並列度を上げても速くならない | ネットワークや DB が律速。スレッド数より帯域 | dotnet-counters・APM ツール |
| CPU 100% だが遅い | ロック競合・False Sharing | PerfView・dotTrace |
| メモリが右肩上がり | バックプレッシャー不足・Channel が無制限 | dotnet-counters gc-heap-stats |
| スレッドプール枯渇 | sync-over-async・ThreadPool 起動遅延 | dotnet-counters threadpool-queue-length |
| たまにデッドロック | UI スレッドで .Result・ConfigureAwait 漏れ | Visual Studio デッドロック診断 |
| 例外が握り潰される | async void・fire-and-forget | TaskScheduler.UnobservedTaskException |
using BenchmarkDotNet.Attributes;
using BenchmarkDotNet.Running;
[MemoryDiagnoser]
public class FanOutBench
{
private readonly int[] _ids = Enumerable.Range(0, 100).ToArray();
[Benchmark(Baseline = true)]
public async Task Sequential()
{
foreach (var id in _ids) await FetchAsync(id);
}
[Benchmark]
public async Task FanOut_Unlimited()
{
await Task.WhenAll(_ids.Select(FetchAsync));
}
[Benchmark]
public async Task FanOut_Limited_10()
{
using var gate = new SemaphoreSlim(10);
async Task Gated(int id)
{
await gate.WaitAsync();
try { await FetchAsync(id); } finally { gate.Release(); }
}
await Task.WhenAll(_ids.Select(Gated));
}
}
// 実行: BenchmarkRunner.Run<FanOutBench>();
よくある質問
Parallel.ForEachAsync + MaxDegreeOfParallelism がシンプルです。一方、異なる箇所(異なるメソッド)で同じ上限を共有したい場合は SemaphoreSlim のインスタンスをクラスに持たせて共有する必要があります。外部 API のレートを「このクライアント全体で 10 並行」と制御したいなら SemaphoreSlim、単発の処理を「この呼び出しだけ 10 並行」なら MaxDegreeOfParallelism が自然です。AddStandardResilienceHandler の宣言的設定で大半のユースケース(リトライ・サーキットブレーカー・タイムアウト・ヘッジング)をカバーできるため、まずこちらを検討してください。細かいカスタム戦略が必要なら従来の Polly API を使います。CancellationToken を受け取っているのに処理に渡さないと、「キャンセルを要求しても何も起きない」状況になります。ユーザーが「取り消し」ボタンを押しても処理が完了するまで待たされ、Task.Delay や HttpClient.GetAsync など下位の待機も最後まで進行します。特にASP.NET Core の HttpContext.RequestAbortedは「クライアントが接続を切った」サインなので、これを無視すると既に誰も待っていない計算を延々と続けるリソース浪費になります。受け取った CancellationToken は必ず全ての非同期呼び出しに渡すのが鉄則です。Channel.CreateUnbounded・Task.WhenAll に数万件のタスクを一度に詰める・ConcurrentQueue へ無制限に Enqueue する、などのパターンで起きます。対策は Channel.CreateBounded + FullMode.Wait・SemaphoreSlim による同時実行数制限・バッチ処理への分割です。SemaphoreSlim.Wait()(同期版)をスレッドプールで使うと、スレッドをブロックした状態で解放待ちするためスレッドプール枯渇の原因になります。非同期コードからは必ず await semaphore.WaitAsync() を使ってください。これなら待機中はスレッドが解放され、他の処理が進行できます。同期版 Wait() が必要なのはコンソールアプリの同期メインなど特殊な場面だけです。まとめ
| シナリオ | 推奨パターン |
|---|---|
| 独立したN個のI/O | Fan-out: Task.WhenAll(items.Select(...)) |
| 同時実行数の制限 | SemaphoreSlim or MaxDegreeOfParallelism |
| 時間窓レート制限 | .NET 7+ の RateLimiter |
| 段階的な処理 | TPL Dataflow(ステージごとの並列度) |
| 複数のワーカー | Channel + 複数の ReadAllAsync |
| プロデューサ > コンシューマ | BoundedChannel + FullMode.Wait |
| 集計処理 | Parallel.For の localInit/localFinally or PLINQ Aggregate |
| 部分失敗 | 各タスク内で try-catch し結果オブジェクトで返す |
| 全体タイムアウト | CancellationTokenSource.CancelAfter |
| 個別タイムアウト | Task.WaitAsync(.NET 6+) |
| リトライ | Polly or Microsoft.Extensions.Resilience |
| 複合キャンセル | CreateLinkedTokenSource + using |
基本概念と API の使い分けは非同期処理と並列処理の違い完全ガイド、async/await のステートマシンや ConfigureAwait・ValueTask の詳細はasync/await 完全ガイド、CancellationToken の実装パターンはCancellationToken 完全ガイド、Channels はQueueとStack完全ガイドを参照してください。

