From bbd92a840af936b70110bf8814a6ecd7f1d0d739 Mon Sep 17 00:00:00 2001 From: Hyungi Ahn Date: Mon, 18 May 2026 20:24:59 +0900 Subject: [PATCH] =?UTF-8?q?feat(worker-pool):=20Registry-1A=20scaffold=20?= =?UTF-8?q?=E2=80=94=20worker=5Fcapabilities/heartbeats=20+=20/internal/wo?= =?UTF-8?q?rker/*=205=20endpoint=20503=20stub?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit PR-Worker-Pool-Registry-1A (scaffold only, no runtime activation). 신규: - migrations/270~274 (1 statement/1 file 강제): worker_capabilities + 2 idx + worker_heartbeats + 1 idx - app/models/worker_pool.py: WorkerCapability + WorkerHeartbeat ORM (queue.py 패턴) - app/api/internal_worker.py: 5 endpoint 모두 _stub_503() — register/heartbeat/claim/result/drain - tests/test_internal_worker_stub.py: 503 응답 smoke (inline ASGI client, DB 의존 0) 수정: - app/main.py: import + include_router 각 1줄 (prefix=/internal/worker, internal_study 일관) scaffold-first + phase-gate-material-first 강제 (worker-pool-policy §1, §12): - 인증 dependency 0 (1B 에서 JWT + require_worker_user) - ProcessingQueue 변경 0 (방향 b: worker_jobs 별 table = 1B) - LLM 호출 0 / canonical DB 변경 0 / 운영 자동 분기 0 회귀 0 (1주 안전망 = app/main.py.pre-registry-1a.20260518). plan: ~/.claude/plans/floofy-exploring-mitten.md Co-Authored-By: Claude Opus 4.7 (1M context) --- app/api/internal_worker.py | 49 +++++++++++++++++ app/main.py | 2 + app/models/worker_pool.py | 52 +++++++++++++++++++ migrations/270_worker_capabilities_table.sql | 19 +++++++ .../271_worker_capabilities_idx_tier.sql | 4 ++ .../272_worker_capabilities_idx_class.sql | 4 ++ migrations/273_worker_heartbeats_table.sql | 17 ++++++ migrations/274_worker_heartbeats_idx.sql | 4 ++ tests/test_internal_worker_stub.py | 32 ++++++++++++ 9 files changed, 183 insertions(+) create mode 100644 app/api/internal_worker.py create mode 100644 app/models/worker_pool.py create mode 100644 migrations/270_worker_capabilities_table.sql create mode 100644 migrations/271_worker_capabilities_idx_tier.sql create mode 100644 migrations/272_worker_capabilities_idx_class.sql create mode 100644 migrations/273_worker_heartbeats_table.sql create mode 100644 migrations/274_worker_heartbeats_idx.sql create mode 100644 tests/test_internal_worker_stub.py diff --git a/app/api/internal_worker.py b/app/api/internal_worker.py new file mode 100644 index 0000000..ed5b6dd --- /dev/null +++ b/app/api/internal_worker.py @@ -0,0 +1,49 @@ +"""PR-Worker-Pool-Registry-1A scaffold: /internal/worker/* 라우트군 503 stub. + +worker-pool-policy §8 의 5개 라우트 (register/heartbeat/claim/result/drain) 자리잡기. +실 동작 = PR-Worker-Pool-Registry-1B (laptop-worker-bot user + worker_jobs table + recap). + +1A 시점에는: + - 인증 dependency 없음 (503 first response 라 attack surface 0) + - Pydantic schema 없음 (1B 활성화 시 추가) + - 모든 endpoint = HTTP 503 + detail +""" + +from fastapi import APIRouter, HTTPException, status + +router = APIRouter() + + +def _stub_503(endpoint: str) -> None: + raise HTTPException( + status_code=status.HTTP_503_SERVICE_UNAVAILABLE, + detail=( + f"/internal/worker/{endpoint} disabled " + "(Registry-1A stub; activates in Registry-1B)" + ), + ) + + +@router.post("/register") +async def register(): + _stub_503("register") + + +@router.post("/heartbeat") +async def heartbeat(): + _stub_503("heartbeat") + + +@router.post("/claim") +async def claim(): + _stub_503("claim") + + +@router.post("/result") +async def result(): + _stub_503("result") + + +@router.post("/drain") +async def drain(): + _stub_503("drain") diff --git a/app/main.py b/app/main.py index 499fdbc..b9c1e34 100644 --- a/app/main.py +++ b/app/main.py @@ -8,6 +8,7 @@ from sqlalchemy import func, select, text from api.audio import router as audio_router from api.internal_study import router as internal_study_router +from api.internal_worker import router as internal_worker_router from api.auth import router as auth_router from api.briefing import router as briefing_router from api.config import router as config_router @@ -145,6 +146,7 @@ app.include_router(digest_router, prefix="/api/digest", tags=["digest"]) app.include_router(briefing_router, prefix="/api/briefing", tags=["briefing"]) app.include_router(audio_router, prefix="/api/audio", tags=["audio"]) app.include_router(internal_study_router, prefix="/internal/study", tags=["internal-study"]) +app.include_router(internal_worker_router, prefix="/internal/worker", tags=["internal-worker"]) app.include_router(video_router, prefix="/api/video", tags=["video"]) app.include_router(study_sessions_router, prefix="/api/study-sessions", tags=["study-sessions"]) app.include_router(study_topics_router, prefix="/api/study-topics", tags=["study-topics"]) diff --git a/app/models/worker_pool.py b/app/models/worker_pool.py new file mode 100644 index 0000000..6da6f77 --- /dev/null +++ b/app/models/worker_pool.py @@ -0,0 +1,52 @@ +"""worker_capabilities + worker_heartbeats 테이블 ORM (PR-Worker-Pool-Registry-1A). + +1A 단계: schema only. 라우트 5개 (register/heartbeat/claim/result/drain) 모두 503 stub. +실 활성화 + WorkerJob 모델은 1B 영역. 본 모듈 import 자체는 init_db 가 mig 270~274 적용 +후 안전 (테이블 존재 보장). +""" + +from datetime import datetime + +from sqlalchemy import BigInteger, DateTime, ForeignKey, Text +from sqlalchemy.dialects.postgresql import JSONB +from sqlalchemy.orm import Mapped, mapped_column + +from core.database import Base + + +class WorkerCapability(Base): + __tablename__ = "worker_capabilities" + + worker_id: Mapped[str] = mapped_column(Text, primary_key=True) + user_id: Mapped[int] = mapped_column( + BigInteger, ForeignKey("users.id"), nullable=False + ) + device_label: Mapped[str] = mapped_column(Text, nullable=False) + worker_class: Mapped[str] = mapped_column(Text, nullable=False) + tier: Mapped[str] = mapped_column(Text, nullable=False) + capabilities: Mapped[list] = mapped_column(JSONB, default=list, nullable=False) + models_loaded: Mapped[list] = mapped_column(JSONB, default=list, nullable=False) + endpoint: Mapped[str | None] = mapped_column(Text) + created_at: Mapped[datetime] = mapped_column( + DateTime(timezone=True), default=datetime.now, nullable=False + ) + last_registered_at: Mapped[datetime] = mapped_column( + DateTime(timezone=True), default=datetime.now, nullable=False + ) + + +class WorkerHeartbeat(Base): + __tablename__ = "worker_heartbeats" + + id: Mapped[int] = mapped_column(BigInteger, primary_key=True) + worker_id: Mapped[str] = mapped_column( + Text, ForeignKey("worker_capabilities.worker_id"), nullable=False + ) + heartbeat_at: Mapped[datetime] = mapped_column( + DateTime(timezone=True), default=datetime.now, nullable=False + ) + status: Mapped[str] = mapped_column(Text, nullable=False) + current_job_id: Mapped[int | None] = mapped_column(BigInteger) + battery: Mapped[str | None] = mapped_column(Text) + thermal: Mapped[str | None] = mapped_column(Text) + raw_payload: Mapped[dict] = mapped_column(JSONB, default=dict, nullable=False) diff --git a/migrations/270_worker_capabilities_table.sql b/migrations/270_worker_capabilities_table.sql new file mode 100644 index 0000000..d70dd32 --- /dev/null +++ b/migrations/270_worker_capabilities_table.sql @@ -0,0 +1,19 @@ +-- 2026-05-18 PR-Worker-Pool-Registry-1A: worker_capabilities 테이블 신규. +-- 노트북 등 worker 가 register 시 advertise 한 device/tier/capability 1 row UPSERT. +-- worker-pool-policy §1 invariant: 노트북 = optional session worker (24/7 X, extraction backbone X). +-- 1A 단계: schema only (라우트 5개 모두 503 stub). register 활성화 = 1B (laptop-worker-bot user). +-- enum 컬럼 (worker_class, tier) 도 TEXT — policy §3,§5 가 capability bundle 을 docs 결정으로 못박음. +-- user_id NOT NULL: 1A 에서는 503 stub 이라 runtime INSERT 없음. 첫 INSERT = 1B JWT 활성화와 동시점. + +CREATE TABLE IF NOT EXISTS worker_capabilities ( + worker_id TEXT PRIMARY KEY, + user_id BIGINT NOT NULL REFERENCES users(id) ON DELETE RESTRICT, + device_label TEXT NOT NULL, + worker_class TEXT NOT NULL, + tier TEXT NOT NULL, + capabilities JSONB NOT NULL DEFAULT '[]'::jsonb, + models_loaded JSONB NOT NULL DEFAULT '[]'::jsonb, + endpoint TEXT, + created_at TIMESTAMPTZ NOT NULL DEFAULT now(), + last_registered_at TIMESTAMPTZ NOT NULL DEFAULT now() +); diff --git a/migrations/271_worker_capabilities_idx_tier.sql b/migrations/271_worker_capabilities_idx_tier.sql new file mode 100644 index 0000000..ef9388b --- /dev/null +++ b/migrations/271_worker_capabilities_idx_tier.sql @@ -0,0 +1,4 @@ +-- 2026-05-18 PR-Worker-Pool-Registry-1A: worker_capabilities tier 인덱스. +-- claim 쿼리 (1B) 가 capability + tier 로 필터링 — 인덱스로 가속. +CREATE INDEX IF NOT EXISTS idx_worker_capabilities_tier + ON worker_capabilities (tier); diff --git a/migrations/272_worker_capabilities_idx_class.sql b/migrations/272_worker_capabilities_idx_class.sql new file mode 100644 index 0000000..b79ed2a --- /dev/null +++ b/migrations/272_worker_capabilities_idx_class.sql @@ -0,0 +1,4 @@ +-- 2026-05-18 PR-Worker-Pool-Registry-1A: worker_capabilities worker_class 인덱스. +-- worker_class ('laptop'/'macmini'/'cloud') 로 필터링 시 가속. +CREATE INDEX IF NOT EXISTS idx_worker_capabilities_class + ON worker_capabilities (worker_class); diff --git a/migrations/273_worker_heartbeats_table.sql b/migrations/273_worker_heartbeats_table.sql new file mode 100644 index 0000000..7eb7d0b --- /dev/null +++ b/migrations/273_worker_heartbeats_table.sql @@ -0,0 +1,17 @@ +-- 2026-05-18 PR-Worker-Pool-Registry-1A: worker_heartbeats 테이블 신규. +-- worker 가 30~60s 주기로 status 발신 (worker-pool-policy §7). +-- append-only. 운영 alive 판정 = (worker_id 별 최신 row).heartbeat_at > now() - interval '10 min'. +-- retention 정책은 별 PR (PR-LLM-Router-Observability-1 또는 동등). +-- 1A 단계: schema only (heartbeat endpoint 503 stub). 활성화 = 1B. +-- current_job_id 는 1A 단계에서 plain BIGINT (FK 없음). 1B 의 worker_jobs 마이그레이션 후 FK 추가 검토. + +CREATE TABLE IF NOT EXISTS worker_heartbeats ( + id BIGSERIAL PRIMARY KEY, + worker_id TEXT NOT NULL REFERENCES worker_capabilities(worker_id) ON DELETE CASCADE, + heartbeat_at TIMESTAMPTZ NOT NULL DEFAULT now(), + status TEXT NOT NULL, + current_job_id BIGINT, + battery TEXT, + thermal TEXT, + raw_payload JSONB NOT NULL DEFAULT '{}'::jsonb +); diff --git a/migrations/274_worker_heartbeats_idx.sql b/migrations/274_worker_heartbeats_idx.sql new file mode 100644 index 0000000..76ddd34 --- /dev/null +++ b/migrations/274_worker_heartbeats_idx.sql @@ -0,0 +1,4 @@ +-- 2026-05-18 PR-Worker-Pool-Registry-1A: worker_heartbeats latest-row 조회 인덱스. +-- alive 판정 SQL: SELECT ... ORDER BY heartbeat_at DESC LIMIT 1 per worker_id. +CREATE INDEX IF NOT EXISTS idx_worker_heartbeats_worker_at + ON worker_heartbeats (worker_id, heartbeat_at DESC); diff --git a/tests/test_internal_worker_stub.py b/tests/test_internal_worker_stub.py new file mode 100644 index 0000000..a4f3633 --- /dev/null +++ b/tests/test_internal_worker_stub.py @@ -0,0 +1,32 @@ +"""PR-Worker-Pool-Registry-1A: /internal/worker/* 5 endpoint 503 stub 검증. + +conftest.py 의 async_client/db_session fixture 가 Phase 0 TODO 상태 (line 13) +이므로 본 모듈은 inline ASGI client 만 사용. DB schema (worker_capabilities, +worker_heartbeats) 존재 검증은 deploy 후 curl + psql 로 별도 검증 (plan §A.9). +1B 활성화 시 conftest fixture 도 함께 활성화 검토. +""" + +import pytest +from httpx import ASGITransport, AsyncClient + + +@pytest.fixture(scope="module") +def anyio_backend(): + return "asyncio" + + +@pytest.mark.asyncio +async def test_all_endpoints_return_503(): + """5 endpoint (register/heartbeat/claim/result/drain) 모두 503 + detail 패턴 확인.""" + from main import app # PYTHONPATH=app 환경에서 import + + async with AsyncClient( + transport=ASGITransport(app=app), + base_url="http://test", + ) as ac: + for ep in ("register", "heartbeat", "claim", "result", "drain"): + r = await ac.post(f"/internal/worker/{ep}") + assert r.status_code == 503, f"{ep}: expected 503, got {r.status_code}" + assert "Registry-1A stub" in r.json()["detail"], ( + f"{ep}: detail mismatch: {r.json()}" + )