python-background-jobs
タスクキュー、ワーカー、イベント駆動アーキテクチャなど、Pythonのバックグラウンドジョブパターンを扱うスキル。非同期タスク処理やジョブキューの実装、長時間処理の管理、リクエスト/レスポンスサイクルからの処理分離が必要な際に活用できます。
description の原文を見る
Python background job patterns including task queues, workers, and event-driven architecture. Use when implementing async task processing, job queues, long-running operations, or decoupling work from request/response cycles.
SKILL.md 本文
Python Background Jobs & Task Queues
リクエスト/レスポンスのサイクルから長時間実行または不安定な処理を分離します。ユーザーにはすぐに返却し、バックグラウンドワーカーが非同期で重い処理を行います。
このスキルを使用する場合
- 数秒以上かかる処理の実行
- メール、通知、webhook の送信
- レポート生成またはデータ書き出し
- アップロードやメディア変換の処理
- 信頼性の低い外部サービスとの統合
- イベント駆動アーキテクチャの構築
コア概念
1. タスクキューパターン
API はリクエストを受け取り、ジョブをキューに入れ、ジョブID で即座に返却します。ワーカーはジョブを非同期で処理します。
2. べき等性
タスクは失敗時に再試行される場合があります。安全に再実行できるように設計します。
3. ジョブ状態マシン
ジョブは以下の状態を遷移します:pending → running → succeeded/failed
4. 最低1回の配信保証
ほとんどのキューは最低1回の配信を保証します。コードは重複を処理する必要があります。
クイックスタート
このスキルは広く採用されているタスクキューの Celery を例として使用します。RQ、Dramatiq、クラウドネイティブソリューション (AWS SQS、GCP Tasks) など、同等の選択肢もあります。
from celery import Celery
app = Celery("tasks", broker="redis://localhost:6379")
@app.task
def send_email(to: str, subject: str, body: str) -> None:
# This runs in a background worker
email_client.send(to, subject, body)
# In your API handler
send_email.delay("user@example.com", "Welcome!", "Thanks for signing up")
基本パターン
パターン1:ジョブIDを即座に返す
数秒を超える操作の場合、ジョブID を返して非同期に処理します。
from uuid import uuid4
from dataclasses import dataclass
from enum import Enum
from datetime import datetime
class JobStatus(Enum):
PENDING = "pending"
RUNNING = "running"
SUCCEEDED = "succeeded"
FAILED = "failed"
@dataclass
class Job:
id: str
status: JobStatus
created_at: datetime
started_at: datetime | None = None
completed_at: datetime | None = None
result: dict | None = None
error: str | None = None
# API endpoint
async def start_export(request: ExportRequest) -> JobResponse:
"""Start export job and return job ID."""
job_id = str(uuid4())
# Persist job record
await jobs_repo.create(Job(
id=job_id,
status=JobStatus.PENDING,
created_at=datetime.utcnow(),
))
# Enqueue task for background processing
await task_queue.enqueue(
"export_data",
job_id=job_id,
params=request.model_dump(),
)
# Return immediately with job ID
return JobResponse(
job_id=job_id,
status="pending",
poll_url=f"/jobs/{job_id}",
)
パターン2:Celery タスク設定
適切なリトライおよびタイムアウト設定で Celery タスクを設定します。
from celery import Celery
app = Celery("tasks", broker="redis://localhost:6379")
# Global configuration
app.conf.update(
task_time_limit=3600, # Hard limit: 1 hour
task_soft_time_limit=3000, # Soft limit: 50 minutes
task_acks_late=True, # Acknowledge after completion
task_reject_on_worker_lost=True,
worker_prefetch_multiplier=1, # Don't prefetch too many tasks
)
@app.task(
bind=True,
max_retries=3,
default_retry_delay=60,
autoretry_for=(ConnectionError, TimeoutError),
)
def process_payment(self, payment_id: str) -> dict:
"""Process payment with automatic retry on transient errors."""
try:
result = payment_gateway.charge(payment_id)
return {"status": "success", "transaction_id": result.id}
except PaymentDeclinedError as e:
# Don't retry permanent failures
return {"status": "declined", "reason": str(e)}
except TransientError as e:
# Retry with exponential backoff
raise self.retry(exc=e, countdown=2 ** self.request.retries * 60)
パターン3:タスクをべき等にする
ワーカーはクラッシュやタイムアウト時に再試行できます。安全に再実行できるように設計します。
@app.task(bind=True)
def process_order(self, order_id: str) -> None:
"""Process order idempotently."""
order = orders_repo.get(order_id)
# Already processed? Return early
if order.status == OrderStatus.COMPLETED:
logger.info("Order already processed", order_id=order_id)
return
# Already in progress? Check if we should continue
if order.status == OrderStatus.PROCESSING:
# Use idempotency key to avoid double-charging
pass
# Process with idempotency key
result = payment_provider.charge(
amount=order.total,
idempotency_key=f"order-{order_id}", # Critical!
)
orders_repo.update(order_id, status=OrderStatus.COMPLETED)
べき等性の戦略:
- 実行前チェック:アクション前に状態を検証
- べき等キー:外部サービスと共に一意のトークンを使用
- Upsert パターン:
INSERT ... ON CONFLICT UPDATE - 重複排除ウィンドウ:N 時間の間に処理されたID を追跡
パターン4:ジョブ状態管理
可視性とデバッグのためにジョブの状態遷移を保持します。
class JobRepository:
"""Repository for managing job state."""
async def create(self, job: Job) -> Job:
"""Create new job record."""
await self._db.execute(
"""INSERT INTO jobs (id, status, created_at)
VALUES ($1, $2, $3)""",
job.id, job.status.value, job.created_at,
)
return job
async def update_status(
self,
job_id: str,
status: JobStatus,
**fields,
) -> None:
"""Update job status with timestamp."""
updates = {"status": status.value, **fields}
if status == JobStatus.RUNNING:
updates["started_at"] = datetime.utcnow()
elif status in (JobStatus.SUCCEEDED, JobStatus.FAILED):
updates["completed_at"] = datetime.utcnow()
await self._db.execute(
"UPDATE jobs SET status = $1, ... WHERE id = $2",
updates, job_id,
)
logger.info(
"Job status updated",
job_id=job_id,
status=status.value,
)
高度なパターン
パターン5:デッドレターキュー
手動検査のため、永続的に失敗したタスクを処理します。
@app.task(bind=True, max_retries=3)
def process_webhook(self, webhook_id: str, payload: dict) -> None:
"""Process webhook with DLQ for failures."""
try:
result = send_webhook(payload)
if not result.success:
raise WebhookFailedError(result.error)
except Exception as e:
if self.request.retries >= self.max_retries:
# Move to dead letter queue for manual inspection
dead_letter_queue.send({
"task": "process_webhook",
"webhook_id": webhook_id,
"payload": payload,
"error": str(e),
"attempts": self.request.retries + 1,
"failed_at": datetime.utcnow().isoformat(),
})
logger.error(
"Webhook moved to DLQ after max retries",
webhook_id=webhook_id,
error=str(e),
)
return
# Exponential backoff retry
raise self.retry(exc=e, countdown=2 ** self.request.retries * 60)
パターン6:ステータスポーリングエンドポイント
クライアントがジョブステータスをチェックするためのエンドポイントを提供します。
from fastapi import FastAPI, HTTPException
app = FastAPI()
@app.get("/jobs/{job_id}")
async def get_job_status(job_id: str) -> JobStatusResponse:
"""Get current status of a background job."""
job = await jobs_repo.get(job_id)
if job is None:
raise HTTPException(404, f"Job {job_id} not found")
return JobStatusResponse(
job_id=job.id,
status=job.status.value,
created_at=job.created_at,
started_at=job.started_at,
completed_at=job.completed_at,
result=job.result if job.status == JobStatus.SUCCEEDED else None,
error=job.error if job.status == JobStatus.FAILED else None,
# Helpful for clients
is_terminal=job.status in (JobStatus.SUCCEEDED, JobStatus.FAILED),
)
パターン7:タスクチェーンとワークフロー
シンプルなタスクから複雑なワークフローを構成します。
from celery import chain, group, chord
# Simple chain: A → B → C
workflow = chain(
extract_data.s(source_id),
transform_data.s(),
load_data.s(destination_id),
)
# Parallel execution: A, B, C all at once
parallel = group(
send_email.s(user_email),
send_sms.s(user_phone),
update_analytics.s(event_data),
)
# Chord: Run tasks in parallel, then a callback
# Process all items, then send completion notification
workflow = chord(
[process_item.s(item_id) for item_id in item_ids],
send_completion_notification.s(batch_id),
)
workflow.apply_async()
パターン8:別のタスクキュー
ニーズに合わせて適切なツールを選択します。
RQ (Redis Queue):シンプルで Redis ベース
from rq import Queue
from redis import Redis
queue = Queue(connection=Redis())
job = queue.enqueue(send_email, "user@example.com", "Subject", "Body")
Dramatiq:最新の Celery 代替案
import dramatiq
from dramatiq.brokers.redis import RedisBroker
dramatiq.set_broker(RedisBroker())
@dramatiq.actor
def send_email(to: str, subject: str, body: str) -> None:
email_client.send(to, subject, body)
クラウドネイティブオプション:
- AWS SQS + Lambda
- Google Cloud Tasks
- Azure Functions
ベストプラクティスのまとめ
- 即座に返す - 長時間実行の操作でリクエストをブロックしない
- ジョブ状態を保持 - ステータスポーリングとデバッグを有効化
- タスクをべき等にする - どの失敗に対しても安全に再試行可能
- べき等キーを使用 - 外部サービス呼び出しに対して
- タイムアウトを設定 - ソフトとハード両方の制限
- DLQ を実装 - 永続的に失敗したタスクをキャプチャ
- 遷移をログ - ジョブ状態の変更を追跡
- 適切に再試行 - 一時的なエラーに対して指数バックオフ
- 永続的な失敗を再試行しない - 検証エラー、無効な認証情報
- キューの深さを監視 - バックログの増加でアラート
ライセンス: MIT(寛容ライセンスのため全文を引用しています) · 原本リポジトリ
詳細情報
- 作者
- wshobson
- リポジトリ
- wshobson/agents
- ライセンス
- MIT
- 最終更新
- 不明
Source: https://github.com/wshobson/agents / ライセンス: MIT
関連スキル
doubt-driven-development
重要な判断はすべて、本番環境への展開前に新しい視点から対抗的レビューを実施します。速度より正確性が重要な場合、不慣れなコードを扱う場合、本番環境・セキュリティに関わるロジック・取り消し不可の操作など影響度が高い場合、または後でバグを修正するよりも今検証する方が効率的な場合に活用してください。
apprun-skills
TypeScriptを使用したAppRunアプリケーションのMVU設計に関する総合的なガイダンスが得られます。コンポーネントパターン、イベントハンドリング、状態管理(非同期ジェネレータを含む)、パラメータと保護機能を備えたルーティング・ナビゲーション、vistestを使用したテストに対応しています。AppRunコンポーネントの設計・レビュー、ルートの配線、状態フローの管理、AppRunテストの作成時に活用してください。
desloppify
コードベースのヘルスチェックと技術負債の追跡ツールです。コード品質、技術負債、デッドコード、大規模ファイル、ゴッドクラス、重複関数、コードスメル、命名規則の問題、インポートサイクル、結合度の問題についてユーザーが質問した場合に使用してください。また、ヘルススコアの確認、次の改善項目の提案、クリーンアップ計画の作成をリクエストされた際にも対応します。29言語に対応しています。
debugging-and-error-recovery
テストが失敗したり、ビルドが壊れたり、動作が期待と異なったり、予期しないエラーが発生したりした場合に、体系的な根本原因デバッグをガイドします。推測ではなく、根本原因を見つけて修正するための体系的なアプローチが必要な場合に使用してください。
test-driven-development
テスト駆動開発により実装を進めます。ロジックの実装、バグの修正、動作の変更など、あらゆる場面で活用できます。コードが正常に動作することを証明する必要がある場合、バグ報告を受けた場合、既存機能を修正する予定がある場合に使用してください。
incremental-implementation
変更を段階的に実施します。複数のファイルに影響する機能や変更を実装する場合に使用してください。大量のコードを一度に書こうとしている場合や、タスクが一度では完結できないほど大きい場合に活用します。