Agent Skills by ALSEL
汎用EC・マーケティング⭐ リポ 6品質スコア 73/100

social-media-api-integration

ソーシャルメディアプラットフォームのAPIと連携し、投稿の自動化、スケジューリング、分析データの取得、コンテンツシンジケーションに対応します。OAuth フロー、レート制限、複数プラットフォーム対応の戦略をカバーしており、ソーシャルメディア API の連携、自動投稿、またはプラットフォーム API のリクエストをトリガーとして動作します。

description の原文を見る

Integrate with social media platform APIs for automated posting, scheduling, analytics retrieval, and content syndication. Covers OAuth flows, rate limiting, and multi-platform strategies. Triggers on social media API integration, automated posting, or platform API requests.

SKILL.md 本文

ソーシャルメディア API インテグレーション

ソーシャルプラットフォーム API との信頼性の高いインテグレーションを構築し、自動化されたコンテンツ配信を実現します。

プラットフォーム API の概要

プラットフォームAPI スタイル認証レート制限主な制約
BlueskyAT Protocolアプリパスワード / OAuth3000/5分分散型、オープンプロトコル
MastodonRESTOAuth 2.0300/5分/IPインスタンス固有のエンドポイント
LinkedInRESTOAuth 2.0100/日投稿厳格なコンテンツポリシー
Dev.toRESTAPI キー30/30秒記事主体のプラットフォーム
MediumRESTOAuth 2.0 + Bearer100/日インポート API のみ
RSSプルベースなしN/A読み込み専用シンジケーション

認証パターン

OAuth 2.0 フロー (LinkedIn、Mastodon)

from authlib.integrations.httpx_client import AsyncOAuth2Client

class SocialOAuth:
    def __init__(self, client_id: str, client_secret: str, redirect_uri: str):
        self.client = AsyncOAuth2Client(
            client_id=client_id,
            client_secret=client_secret,
            redirect_uri=redirect_uri,
        )

    def get_auth_url(self, scope: str) -> str:
        url, state = self.client.create_authorization_url(
            "https://platform.example.com/oauth/authorize",
            scope=scope,
        )
        return url

    async def exchange_code(self, code: str) -> dict:
        token = await self.client.fetch_token(  # allow-secret
            "https://platform.example.com/oauth/token",
            code=code,
        )
        return token

API キー認証 (Dev.to、Bluesky)

import httpx

class DevToClient:
    def __init__(self, api_key: str):  # allow-secret
        self.client = httpx.AsyncClient(
            base_url="https://dev.to/api",
            headers={"api-key": api_key, "Accept": "application/json"},
        )

    async def create_article(self, title: str, body: str, tags: list[str], published: bool = False):
        return await self.client.post("/articles", json={
            "article": {
                "title": title,
                "body_markdown": body,
                "tags": tags,
                "published": published,
            }
        })

マルチプラットフォーム投稿

コンテンツ アダプター パターン

from dataclasses import dataclass
from abc import ABC, abstractmethod

@dataclass
class Post:
    title: str
    body: str
    url: str | None = None
    tags: list[str] | None = None
    image_url: str | None = None

class PlatformAdapter(ABC):
    @abstractmethod
    async def publish(self, post: Post) -> str:
        """Publish and return the post URL."""

    @abstractmethod
    def format_content(self, post: Post) -> dict:
        """Format post for platform constraints."""

class BlueskyAdapter(PlatformAdapter):
    async def publish(self, post: Post) -> str:
        content = self.format_content(post)
        # AT Protocol posting
        response = await self.client.post(
            "com.atproto.repo.createRecord",
            json=content,
        )
        return response["uri"]

    def format_content(self, post: Post) -> dict:
        text = post.body[:300]  # 300 char limit
        if post.url:
            text = f"{text}\n\n{post.url}"
        return {"collection": "app.bsky.feed.post", "record": {"text": text}}

class MastodonAdapter(PlatformAdapter):
    async def publish(self, post: Post) -> str:
        content = self.format_content(post)
        response = await self.client.post("/api/v1/statuses", json=content)
        return response.json()["url"]

    def format_content(self, post: Post) -> dict:
        text = post.body[:500]  # 500 char default
        if post.url:
            text = f"{text}\n\n{post.url}"
        return {"status": text, "visibility": "public"}

配信オーケストレーター

class ContentDistributor:
    def __init__(self, adapters: dict[str, PlatformAdapter]):
        self.adapters = adapters

    async def distribute(self, post: Post, platforms: list[str] | None = None) -> dict[str, str]:
        targets = platforms or list(self.adapters.keys())
        results = {}
        for platform in targets:
            adapter = self.adapters[platform]
            try:
                url = await adapter.publish(post)
                results[platform] = url
            except Exception as e:
                results[platform] = f"ERROR: {e}"
        return results

レート制限

クライアント側のレート制限器

import asyncio
import time

class RateLimiter:
    def __init__(self, max_requests: int, window_seconds: float):
        self.max_requests = max_requests
        self.window = window_seconds
        self.requests: list[float] = []
        self.lock = asyncio.Lock()

    async def acquire(self):
        async with self.lock:
            now = time.time()
            self.requests = [t for t in self.requests if now - t < self.window]
            if len(self.requests) >= self.max_requests:
                wait_time = self.requests[0] + self.window - now
                await asyncio.sleep(wait_time)
            self.requests.append(time.time())

レート制限時のリトライ

async def api_call_with_retry(func, *args, max_retries: int = 3):
    for attempt in range(max_retries):
        try:
            return await func(*args)
        except httpx.HTTPStatusError as e:
            if e.response.status_code == 429:
                retry_after = int(e.response.headers.get("Retry-After", 60))
                await asyncio.sleep(retry_after)
            else:
                raise
    raise Exception("Max retries exceeded")

スケジューリング

from datetime import datetime, timedelta

class PostScheduler:
    def __init__(self, distributor: ContentDistributor):
        self.distributor = distributor
        self.queue: list[tuple[datetime, Post, list[str]]] = []

    def schedule(self, post: Post, platforms: list[str], publish_at: datetime):
        self.queue.append((publish_at, post, platforms))
        self.queue.sort(key=lambda x: x[0])

    async def run(self):
        while self.queue:
            publish_at, post, platforms = self.queue[0]
            now = datetime.now()
            if now >= publish_at:
                self.queue.pop(0)
                await self.distributor.distribute(post, platforms)
            else:
                await asyncio.sleep((publish_at - now).total_seconds())

アナリティクス取得

@dataclass
class PostMetrics:
    views: int
    likes: int
    shares: int
    comments: int
    clicks: int

async def aggregate_metrics(post_urls: dict[str, str]) -> dict[str, PostMetrics]:
    metrics = {}
    for platform, url in post_urls.items():
        adapter = adapters[platform]
        metrics[platform] = await adapter.get_metrics(url)
    return metrics

アンチパターン

  • 全プラットフォームに同一内容を投稿する — プラットフォームごとにフォーマットとトーンを調整してください
  • レート制限を無視する — 必ずクライアント側のレート制限を実装してください
  • トークンをコードに保存する — 環境変数またはシークレット管理サービスを使用してください
  • 投稿失敗に対するエラーハンドリングがない — リトライ用キューに保存し、失敗をログに記録してください
  • 同期的なマルチプラットフォーム投稿 — 非同期・並列投稿と個別のエラーハンドリングを使用してください
  • プラットフォーム URL のハードコード — インスタンス URL は変動します (Mastodon など)。設定ファイルで管理してください

ライセンス: Apache-2.0(寛容ライセンスのため全文を引用しています) · 原本リポジトリ

詳細情報

作者
a-organvm
リポジトリ
a-organvm/a-i--skills
ライセンス
Apache-2.0
最終更新
2026/5/10

Source: https://github.com/a-organvm/a-i--skills / ライセンス: Apache-2.0

本サイトは GitHub 上で公開されているオープンソースの SKILL.md ファイルをクロール・インデックス化したものです。 各スキルの著作権は原作者に帰属します。掲載に問題がある場合は info@alsel.co.jp または /takedown フォームよりご連絡ください。
原作者: a-organvm · a-organvm/a-i--skills · ライセンス: Apache-2.0