feat(worker-pool): Registry-1A scaffold — worker_capabilities/heartbeats + /internal/worker/* 5 endpoint 503 stub

PR-Worker-Pool-Registry-1A (scaffold only, no runtime activation).

신규:
- migrations/270~274 (1 statement/1 file 강제): worker_capabilities + 2 idx + worker_heartbeats + 1 idx
- app/models/worker_pool.py: WorkerCapability + WorkerHeartbeat ORM (queue.py 패턴)
- app/api/internal_worker.py: 5 endpoint 모두 _stub_503() — register/heartbeat/claim/result/drain
- tests/test_internal_worker_stub.py: 503 응답 smoke (inline ASGI client, DB 의존 0)

수정:
- app/main.py: import + include_router 각 1줄 (prefix=/internal/worker, internal_study 일관)

scaffold-first + phase-gate-material-first 강제 (worker-pool-policy §1, §12):
- 인증 dependency 0 (1B 에서 JWT + require_worker_user)
- ProcessingQueue 변경 0 (방향 b: worker_jobs 별 table = 1B)
- LLM 호출 0 / canonical DB 변경 0 / 운영 자동 분기 0

회귀 0 (1주 안전망 = app/main.py.pre-registry-1a.20260518).

plan: ~/.claude/plans/floofy-exploring-mitten.md

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Hyungi Ahn
2026-05-18 20:24:59 +09:00
parent 406b810e28
commit bbd92a840a
9 changed files with 183 additions and 0 deletions
+49
View File
@@ -0,0 +1,49 @@
"""PR-Worker-Pool-Registry-1A scaffold: /internal/worker/* 라우트군 503 stub.
worker-pool-policy §8 의 5개 라우트 (register/heartbeat/claim/result/drain) 자리잡기.
실 동작 = PR-Worker-Pool-Registry-1B (laptop-worker-bot user + worker_jobs table + recap).
1A 시점에는:
- 인증 dependency 없음 (503 first response 라 attack surface 0)
- Pydantic schema 없음 (1B 활성화 시 추가)
- 모든 endpoint = HTTP 503 + detail
"""
from fastapi import APIRouter, HTTPException, status
router = APIRouter()
def _stub_503(endpoint: str) -> None:
raise HTTPException(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
detail=(
f"/internal/worker/{endpoint} disabled "
"(Registry-1A stub; activates in Registry-1B)"
),
)
@router.post("/register")
async def register():
_stub_503("register")
@router.post("/heartbeat")
async def heartbeat():
_stub_503("heartbeat")
@router.post("/claim")
async def claim():
_stub_503("claim")
@router.post("/result")
async def result():
_stub_503("result")
@router.post("/drain")
async def drain():
_stub_503("drain")
+2
View File
@@ -8,6 +8,7 @@ from sqlalchemy import func, select, text
from api.audio import router as audio_router
from api.internal_study import router as internal_study_router
from api.internal_worker import router as internal_worker_router
from api.auth import router as auth_router
from api.briefing import router as briefing_router
from api.config import router as config_router
@@ -145,6 +146,7 @@ app.include_router(digest_router, prefix="/api/digest", tags=["digest"])
app.include_router(briefing_router, prefix="/api/briefing", tags=["briefing"])
app.include_router(audio_router, prefix="/api/audio", tags=["audio"])
app.include_router(internal_study_router, prefix="/internal/study", tags=["internal-study"])
app.include_router(internal_worker_router, prefix="/internal/worker", tags=["internal-worker"])
app.include_router(video_router, prefix="/api/video", tags=["video"])
app.include_router(study_sessions_router, prefix="/api/study-sessions", tags=["study-sessions"])
app.include_router(study_topics_router, prefix="/api/study-topics", tags=["study-topics"])
+52
View File
@@ -0,0 +1,52 @@
"""worker_capabilities + worker_heartbeats 테이블 ORM (PR-Worker-Pool-Registry-1A).
1A 단계: schema only. 라우트 5개 (register/heartbeat/claim/result/drain) 모두 503 stub.
실 활성화 + WorkerJob 모델은 1B 영역. 본 모듈 import 자체는 init_db 가 mig 270~274 적용
후 안전 (테이블 존재 보장).
"""
from datetime import datetime
from sqlalchemy import BigInteger, DateTime, ForeignKey, Text
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.orm import Mapped, mapped_column
from core.database import Base
class WorkerCapability(Base):
__tablename__ = "worker_capabilities"
worker_id: Mapped[str] = mapped_column(Text, primary_key=True)
user_id: Mapped[int] = mapped_column(
BigInteger, ForeignKey("users.id"), nullable=False
)
device_label: Mapped[str] = mapped_column(Text, nullable=False)
worker_class: Mapped[str] = mapped_column(Text, nullable=False)
tier: Mapped[str] = mapped_column(Text, nullable=False)
capabilities: Mapped[list] = mapped_column(JSONB, default=list, nullable=False)
models_loaded: Mapped[list] = mapped_column(JSONB, default=list, nullable=False)
endpoint: Mapped[str | None] = mapped_column(Text)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), default=datetime.now, nullable=False
)
last_registered_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), default=datetime.now, nullable=False
)
class WorkerHeartbeat(Base):
__tablename__ = "worker_heartbeats"
id: Mapped[int] = mapped_column(BigInteger, primary_key=True)
worker_id: Mapped[str] = mapped_column(
Text, ForeignKey("worker_capabilities.worker_id"), nullable=False
)
heartbeat_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), default=datetime.now, nullable=False
)
status: Mapped[str] = mapped_column(Text, nullable=False)
current_job_id: Mapped[int | None] = mapped_column(BigInteger)
battery: Mapped[str | None] = mapped_column(Text)
thermal: Mapped[str | None] = mapped_column(Text)
raw_payload: Mapped[dict] = mapped_column(JSONB, default=dict, nullable=False)
@@ -0,0 +1,19 @@
-- 2026-05-18 PR-Worker-Pool-Registry-1A: worker_capabilities 테이블 신규.
-- 노트북 등 worker 가 register 시 advertise 한 device/tier/capability 1 row UPSERT.
-- worker-pool-policy §1 invariant: 노트북 = optional session worker (24/7 X, extraction backbone X).
-- 1A 단계: schema only (라우트 5개 모두 503 stub). register 활성화 = 1B (laptop-worker-bot user).
-- enum 컬럼 (worker_class, tier) 도 TEXT — policy §3,§5 가 capability bundle 을 docs 결정으로 못박음.
-- user_id NOT NULL: 1A 에서는 503 stub 이라 runtime INSERT 없음. 첫 INSERT = 1B JWT 활성화와 동시점.
CREATE TABLE IF NOT EXISTS worker_capabilities (
worker_id TEXT PRIMARY KEY,
user_id BIGINT NOT NULL REFERENCES users(id) ON DELETE RESTRICT,
device_label TEXT NOT NULL,
worker_class TEXT NOT NULL,
tier TEXT NOT NULL,
capabilities JSONB NOT NULL DEFAULT '[]'::jsonb,
models_loaded JSONB NOT NULL DEFAULT '[]'::jsonb,
endpoint TEXT,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
last_registered_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
@@ -0,0 +1,4 @@
-- 2026-05-18 PR-Worker-Pool-Registry-1A: worker_capabilities tier 인덱스.
-- claim 쿼리 (1B) 가 capability + tier 로 필터링 — 인덱스로 가속.
CREATE INDEX IF NOT EXISTS idx_worker_capabilities_tier
ON worker_capabilities (tier);
@@ -0,0 +1,4 @@
-- 2026-05-18 PR-Worker-Pool-Registry-1A: worker_capabilities worker_class 인덱스.
-- worker_class ('laptop'/'macmini'/'cloud') 로 필터링 시 가속.
CREATE INDEX IF NOT EXISTS idx_worker_capabilities_class
ON worker_capabilities (worker_class);
@@ -0,0 +1,17 @@
-- 2026-05-18 PR-Worker-Pool-Registry-1A: worker_heartbeats 테이블 신규.
-- worker 가 30~60s 주기로 status 발신 (worker-pool-policy §7).
-- append-only. 운영 alive 판정 = (worker_id 별 최신 row).heartbeat_at > now() - interval '10 min'.
-- retention 정책은 별 PR (PR-LLM-Router-Observability-1 또는 동등).
-- 1A 단계: schema only (heartbeat endpoint 503 stub). 활성화 = 1B.
-- current_job_id 는 1A 단계에서 plain BIGINT (FK 없음). 1B 의 worker_jobs 마이그레이션 후 FK 추가 검토.
CREATE TABLE IF NOT EXISTS worker_heartbeats (
id BIGSERIAL PRIMARY KEY,
worker_id TEXT NOT NULL REFERENCES worker_capabilities(worker_id) ON DELETE CASCADE,
heartbeat_at TIMESTAMPTZ NOT NULL DEFAULT now(),
status TEXT NOT NULL,
current_job_id BIGINT,
battery TEXT,
thermal TEXT,
raw_payload JSONB NOT NULL DEFAULT '{}'::jsonb
);
+4
View File
@@ -0,0 +1,4 @@
-- 2026-05-18 PR-Worker-Pool-Registry-1A: worker_heartbeats latest-row 조회 인덱스.
-- alive 판정 SQL: SELECT ... ORDER BY heartbeat_at DESC LIMIT 1 per worker_id.
CREATE INDEX IF NOT EXISTS idx_worker_heartbeats_worker_at
ON worker_heartbeats (worker_id, heartbeat_at DESC);
+32
View File
@@ -0,0 +1,32 @@
"""PR-Worker-Pool-Registry-1A: /internal/worker/* 5 endpoint 503 stub 검증.
conftest.py 의 async_client/db_session fixture 가 Phase 0 TODO 상태 (line 13)
이므로 본 모듈은 inline ASGI client 만 사용. DB schema (worker_capabilities,
worker_heartbeats) 존재 검증은 deploy 후 curl + psql 로 별도 검증 (plan §A.9).
1B 활성화 시 conftest fixture 도 함께 활성화 검토.
"""
import pytest
from httpx import ASGITransport, AsyncClient
@pytest.fixture(scope="module")
def anyio_backend():
return "asyncio"
@pytest.mark.asyncio
async def test_all_endpoints_return_503():
"""5 endpoint (register/heartbeat/claim/result/drain) 모두 503 + detail 패턴 확인."""
from main import app # PYTHONPATH=app 환경에서 import
async with AsyncClient(
transport=ASGITransport(app=app),
base_url="http://test",
) as ac:
for ep in ("register", "heartbeat", "claim", "result", "drain"):
r = await ac.post(f"/internal/worker/{ep}")
assert r.status_code == 503, f"{ep}: expected 503, got {r.status_code}"
assert "Registry-1A stub" in r.json()["detail"], (
f"{ep}: detail mismatch: {r.json()}"
)