hyungi_document_server/tests/test_internal_worker_stub.py
Hyungi Ahn bbd92a840a feat(worker-pool): Registry-1A scaffold — worker_capabilities/heartbeats + /internal/worker/* 5 endpoint 503 stub
PR-Worker-Pool-Registry-1A (scaffold only, no runtime activation).

New:
- migrations/270~274 (one statement per file, enforced): worker_capabilities + 2 idx + worker_heartbeats + 1 idx
- app/models/worker_pool.py: WorkerCapability + WorkerHeartbeat ORM (follows the queue.py pattern)
- app/api/internal_worker.py: all 5 endpoints call _stub_503(): register/heartbeat/claim/result/drain
- tests/test_internal_worker_stub.py: smoke test for the 503 responses (inline ASGI client, no DB dependency)

Modified:
- app/main.py: one line each for the import and include_router (prefix=/internal/worker, consistent with internal_study)

scaffold-first + phase-gate-material-first enforced (worker-pool-policy §1, §12):
- No auth dependency (JWT + require_worker_user arrive in 1B)
- No changes to ProcessingQueue (direction b: a separate worker_jobs table, in 1B)
- No LLM calls / no canonical DB changes / no automatic branching in production

No regressions (one-week safety net: app/main.py.pre-registry-1a.20260518).

plan: ~/.claude/plans/floofy-exploring-mitten.md

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-18 20:24:59 +09:00

33 lines
1.2 KiB
Python

"""PR-Worker-Pool-Registry-1A: verify the 503 stubs for the 5 /internal/worker/* endpoints.

The async_client/db_session fixtures in conftest.py are still Phase 0 TODOs
(line 13), so this module uses only an inline ASGI client. The existence of the
DB schema (worker_capabilities, worker_heartbeats) is verified separately after
deploy with curl + psql (plan §A.9).
When 1B is activated, consider enabling the conftest fixtures as well.
"""
import pytest
from httpx import ASGITransport, AsyncClient


@pytest.fixture(scope="module")
def anyio_backend():
    return "asyncio"


@pytest.mark.asyncio
async def test_all_endpoints_return_503():
    """Confirm all 5 endpoints (register/heartbeat/claim/result/drain) return 503 with the expected detail pattern."""
    from main import app  # imported with PYTHONPATH=app

    async with AsyncClient(
        transport=ASGITransport(app=app),
        base_url="http://test",
    ) as ac:
        for ep in ("register", "heartbeat", "claim", "result", "drain"):
            r = await ac.post(f"/internal/worker/{ep}")
            assert r.status_code == 503, f"{ep}: expected 503, got {r.status_code}"
            assert "Registry-1A stub" in r.json()["detail"], (
                f"{ep}: detail mismatch: {r.json()}"
            )
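
For readers without the app on hand, the behavior this test asserts can be sketched with the standard library alone. This is an illustrative stand-in, not the contents of app/api/internal_worker.py: `stub_app`, `call_inline`, `check_all_stubs`, and the exact detail wording are assumptions. Only the five endpoint names, the /internal/worker prefix, the 503 status, and the "Registry-1A stub" substring come from the test above. The sketch drives a bare ASGI callable in-process, which is roughly what an inline ASGI client does: no socket and no database.

```python
import asyncio
import json

STUB_ENDPOINTS = ("register", "heartbeat", "claim", "result", "drain")
PREFIX = "/internal/worker/"


async def stub_app(scope, receive, send):
    """Bare ASGI app (hypothetical stand-in): known worker endpoints answer 503."""
    assert scope["type"] == "http"
    path = scope["path"]
    name = path[len(PREFIX):] if path.startswith(PREFIX) else None
    if scope["method"] == "POST" and name in STUB_ENDPOINTS:
        # Detail wording is assumed; only the "Registry-1A stub" substring is known.
        status, payload = 503, {"detail": f"Registry-1A stub: {name} not active"}
    else:
        status, payload = 404, {"detail": "Not Found"}
    await send({
        "type": "http.response.start",
        "status": status,
        "headers": [(b"content-type", b"application/json")],
    })
    await send({"type": "http.response.body", "body": json.dumps(payload).encode()})


async def call_inline(app, method, path):
    """Call the ASGI app in-process and return (status, parsed JSON body)."""
    scope = {
        "type": "http",
        "asgi": {"version": "3.0"},
        "http_version": "1.1",
        "method": method,
        "path": path,
        "raw_path": path.encode(),
        "query_string": b"",
        "headers": [],
    }
    messages = []

    async def receive():
        # Empty request body, delivered in a single chunk.
        return {"type": "http.request", "body": b"", "more_body": False}

    async def send(message):
        messages.append(message)

    await app(scope, receive, send)
    status = next(m["status"] for m in messages if m["type"] == "http.response.start")
    body = b"".join(
        m.get("body", b"") for m in messages if m["type"] == "http.response.body"
    )
    return status, json.loads(body)


def check_all_stubs():
    """POST to every stub endpoint and collect {endpoint: (status, body)}."""
    async def run():
        return {
            ep: await call_inline(stub_app, "POST", PREFIX + ep)
            for ep in STUB_ENDPOINTS
        }
    return asyncio.run(run())
```

Calling `check_all_stubs()` returns one `(status, body)` pair per endpoint, so the same two assertions as in the test (status 503, "Registry-1A stub" in the detail) can be applied to each entry.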