"""PR-Worker-Pool-Registry-1B — /internal/worker/* 5 endpoint 정상 동작. 6 항목 (claim 두 경로 분리): 1. /register UPSERT — 같은 worker_id 두번 호출 시 row 1 + device_label 갱신. 2. /heartbeat — row INSERT + status='available' 저장. 3. /claim — queue empty 시 204 + body 0 (정정 #4). 4. /claim — pending row 1건 + 200 + status='processing' 전이. 5. /result completed — status='completed' + result JSONB 저장. 6. /drain — heartbeat row status='draining' INSERT. """ from __future__ import annotations import os import sys import uuid import pytest import pytest_asyncio sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "app")) from httpx import ASGITransport, AsyncClient from _worker_pool_helpers import ( cleanup_worker_capabilities, cleanup_worker_jobs, count_worker_capability, ensure_user, fetch_latest_heartbeat, fetch_worker_capability, fetch_worker_job, insert_worker_capability, insert_worker_job, mint_access_token, ) @pytest_asyncio.fixture async def env_setup(monkeypatch): monkeypatch.setenv("LAPTOP_WORKER_BOT_USERNAME", "laptop-worker-bot") @pytest_asyncio.fixture async def worker_token(env_setup): await ensure_user("laptop-worker-bot") return mint_access_token("laptop-worker-bot") @pytest_asyncio.fixture async def owner_id(): return await ensure_user("test-owner-ep-1b") @pytest_asyncio.fixture async def worker_id_with_capability(owner_id): wid = f"test-ep-1b-{uuid.uuid4().hex[:8]}" await insert_worker_capability(wid, owner_id) yield wid await cleanup_worker_jobs("test-ep-1b") await cleanup_worker_capabilities("test-ep-1b") @pytest_asyncio.fixture async def client(): from main import app async with AsyncClient( transport=ASGITransport(app=app), base_url="http://test" ) as ac: yield ac def _auth(token: str) -> dict: return {"Authorization": f"Bearer {token}"} @pytest.mark.asyncio async def test_register_upsert(client, worker_token): wid = f"test-ep-1b-reg-{uuid.uuid4().hex[:6]}" try: body = { "worker_id": wid, "device_label": "MacBook Pro M3 Pro", "worker_class": "laptop", "tier": "laptop_small", "capabilities": ["short_summary"], "models_loaded": ["gemma-3-4b-it"], } r1 = await client.post("/internal/worker/register", json=body, headers=_auth(worker_token)) assert r1.status_code == 200, r1.text body["device_label"] = "MacBook Pro M3 Pro (renamed)" r2 = await client.post("/internal/worker/register", json=body, headers=_auth(worker_token)) assert r2.status_code == 200 assert await count_worker_capability(wid) == 1 cap = await fetch_worker_capability(wid) assert cap is not None assert cap["device_label"] == "MacBook Pro M3 Pro (renamed)" finally: await cleanup_worker_capabilities(wid) @pytest.mark.asyncio async def test_heartbeat_insert(client, worker_token, worker_id_with_capability): wid = worker_id_with_capability r = await client.post( "/internal/worker/heartbeat", json={"worker_id": wid, "status": "available"}, headers=_auth(worker_token), ) assert r.status_code == 200, r.text hb = await fetch_latest_heartbeat(wid) assert hb is not None assert hb["status"] == "available" @pytest.mark.asyncio async def test_claim_204_when_empty(client, worker_token): r = await client.post( "/internal/worker/claim", json={"worker_id": "test-ep-1b-empty", "job_type": "test-ep-1b-NOEXIST"}, headers=_auth(worker_token), ) assert r.status_code == 204 assert r.content == b"" # 정정 #4 @pytest.mark.asyncio async def test_claim_processing_transition( client, worker_token, worker_id_with_capability, owner_id ): wid = worker_id_with_capability jt = "test-ep-1b-claim" job_id = await insert_worker_job(owner_id, jt, payload='{"week":"2026-W20"}') r = await client.post( "/internal/worker/claim", json={"worker_id": wid, "job_type": jt}, headers=_auth(worker_token), ) assert r.status_code == 200, r.text js = r.json() assert js["id"] == job_id assert js["job_type"] == jt assert js["attempts"] == 1 assert js["payload"]["week"] == "2026-W20" job = await fetch_worker_job(job_id) assert job["status"] == "processing" assert job["worker_id"] == wid assert job["attempts"] == 1 @pytest.mark.asyncio async def test_result_completed(client, worker_token, worker_id_with_capability, owner_id): wid = worker_id_with_capability job_id = await insert_worker_job( owner_id, "test-ep-1b-result", status="processing", worker_id=wid, attempts=1, claimed=True, ) r = await client.post( "/internal/worker/result", json={ "job_id": job_id, "worker_id": wid, "status": "completed", "result": {"summary": "test"}, }, headers=_auth(worker_token), ) assert r.status_code == 200, r.text job = await fetch_worker_job(job_id) assert job["status"] == "completed" assert job["result"] == {"summary": "test"} @pytest.mark.asyncio async def test_drain_heartbeat_insert(client, worker_token, worker_id_with_capability): wid = worker_id_with_capability r = await client.post( "/internal/worker/drain", json={"worker_id": wid, "reason": "sleep"}, headers=_auth(worker_token), ) assert r.status_code == 200 hb = await fetch_latest_heartbeat(wid) assert hb is not None assert hb["status"] == "draining" assert hb["raw_payload"] == {"reason": "sleep"}