0cbd97fcba
각 helper 가 자체 engine + NullPool 사용 (connection 격리). fixture chain 의 asyncpg "another operation in progress" race 회피. 호출 site 단순화. 같은 파일 sequential 실행 시 module-level app + global engine pool 충돌은 별 follow-up `PR-Worker-Pool-Test-Fixture-Isolation` (P3) 영역. 단독 PASS 검증: auth 5/5 + smoke 3/3 + ownership 1/1. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
194 lines
5.7 KiB
Python
194 lines
5.7 KiB
Python
"""PR-Worker-Pool-Registry-1B — /internal/worker/* 5 endpoint 정상 동작.
|
|
|
|
6 항목 (claim 두 경로 분리):
|
|
1. /register UPSERT — 같은 worker_id 두번 호출 시 row 1 + device_label 갱신.
|
|
2. /heartbeat — row INSERT + status='available' 저장.
|
|
3. /claim — queue empty 시 204 + body 0 (정정 #4).
|
|
4. /claim — pending row 1건 + 200 + status='processing' 전이.
|
|
5. /result completed — status='completed' + result JSONB 저장.
|
|
6. /drain — heartbeat row status='draining' INSERT.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
import sys
|
|
import uuid
|
|
|
|
import pytest
|
|
import pytest_asyncio
|
|
|
|
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "app"))
|
|
|
|
from httpx import ASGITransport, AsyncClient
|
|
|
|
from _worker_pool_helpers import (
|
|
cleanup_worker_capabilities,
|
|
cleanup_worker_jobs,
|
|
count_worker_capability,
|
|
ensure_user,
|
|
fetch_latest_heartbeat,
|
|
fetch_worker_capability,
|
|
fetch_worker_job,
|
|
insert_worker_capability,
|
|
insert_worker_job,
|
|
mint_access_token,
|
|
)
|
|
|
|
|
|
@pytest_asyncio.fixture
async def env_setup(monkeypatch):
    """Point LAPTOP_WORKER_BOT_USERNAME at the test bot account.

    The variable is set through pytest's ``monkeypatch`` fixture.
    """
    bot_username = "laptop-worker-bot"
    monkeypatch.setenv("LAPTOP_WORKER_BOT_USERNAME", bot_username)
|
|
|
|
|
|
@pytest_asyncio.fixture
async def worker_token(env_setup):
    """Return an access token minted for the ``laptop-worker-bot`` user.

    Requests ``env_setup`` so the bot-username environment variable is set
    first, then calls ``ensure_user`` for that username before minting.
    """
    bot_username = "laptop-worker-bot"
    await ensure_user(bot_username)
    token = mint_access_token(bot_username)
    return token
|
|
|
|
|
|
@pytest_asyncio.fixture
async def owner_id():
    """Return the value ``ensure_user`` gives for the ``test-owner-ep-1b`` user."""
    owner = await ensure_user("test-owner-ep-1b")
    return owner
|
|
|
|
|
|
@pytest_asyncio.fixture
async def worker_id_with_capability(owner_id):
    """Yield a fresh worker id that already has a capability row.

    The id is ``test-ep-1b-`` followed by 8 random hex characters, and a
    capability row is inserted for it under ``owner_id`` before the test runs.

    Teardown calls both cleanup helpers with the shared ``test-ep-1b``
    argument (presumably matched as a prefix; confirm in
    ``_worker_pool_helpers``).
    """
    wid = f"test-ep-1b-{uuid.uuid4().hex[:8]}"
    await insert_worker_capability(wid, owner_id)
    yield wid
    # Jobs are cleaned before capabilities. The ``finally`` guarantees the
    # capability cleanup still runs when the job cleanup raises, so one failed
    # teardown step cannot leave capability rows behind for later tests.
    try:
        await cleanup_worker_jobs("test-ep-1b")
    finally:
        await cleanup_worker_capabilities("test-ep-1b")
|
|
|
|
|
|
@pytest_asyncio.fixture
async def client():
    """Yield an httpx ``AsyncClient`` that talks to the ASGI ``app`` in-process.

    ``main.app`` is imported inside the fixture body rather than at module
    import time.
    """
    from main import app

    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as http_client:
        yield http_client
|
|
|
|
|
|
def _auth(token: str) -> dict:
|
|
return {"Authorization": f"Bearer {token}"}
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_register_upsert(client, worker_token):
    """/register is an UPSERT: the same worker_id registered twice keeps one row.

    The second request changes only ``device_label``. Afterwards exactly one
    capability row must exist for the worker and it must carry the new label.
    The capability row is removed in ``finally`` regardless of the outcome.
    """
    wid = f"test-ep-1b-reg-{uuid.uuid4().hex[:6]}"
    try:
        body = {
            "worker_id": wid,
            "device_label": "MacBook Pro M3 Pro",
            "worker_class": "laptop",
            "tier": "laptop_small",
            "capabilities": ["short_summary"],
            "models_loaded": ["gemma-3-4b-it"],
        }
        r1 = await client.post(
            "/internal/worker/register", json=body, headers=_auth(worker_token)
        )
        assert r1.status_code == 200, r1.text

        body["device_label"] = "MacBook Pro M3 Pro (renamed)"
        r2 = await client.post(
            "/internal/worker/register", json=body, headers=_auth(worker_token)
        )
        # Report the response body on failure, same as the first request, so
        # a rejected re-registration is diagnosable from the test output.
        assert r2.status_code == 200, r2.text

        assert await count_worker_capability(wid) == 1
        cap = await fetch_worker_capability(wid)
        assert cap is not None
        assert cap["device_label"] == "MacBook Pro M3 Pro (renamed)"
    finally:
        await cleanup_worker_capabilities(wid)
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_heartbeat_insert(client, worker_token, worker_id_with_capability):
    """/heartbeat returns 200 and the newest heartbeat row has status 'available'."""
    wid = worker_id_with_capability
    payload = {"worker_id": wid, "status": "available"}
    response = await client.post(
        "/internal/worker/heartbeat", json=payload, headers=_auth(worker_token)
    )
    assert response.status_code == 200, response.text

    latest = await fetch_latest_heartbeat(wid)
    assert latest is not None
    assert latest["status"] == "available"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_claim_204_when_empty(client, worker_token):
    """/claim answers 204 with an empty body when there is nothing to claim.

    The requested job_type is one this file never inserts.
    """
    claim_request = {"worker_id": "test-ep-1b-empty", "job_type": "test-ep-1b-NOEXIST"}
    headers = _auth(worker_token)
    response = await client.post(
        "/internal/worker/claim", json=claim_request, headers=headers
    )
    assert response.status_code == 204
    assert response.content == b""  # correction #4: the 204 reply carries no body
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_claim_processing_transition(
    client, worker_token, worker_id_with_capability, owner_id
):
    """/claim hands out the queued job and moves it to 'processing'.

    A job is inserted with the helper's default status (pending, per the
    module docstring). The response must echo the job id, job type, parsed
    payload, and ``attempts == 1``; the stored row must then be 'processing'
    and assigned to the claiming worker.
    """
    wid = worker_id_with_capability
    job_type = "test-ep-1b-claim"
    job_id = await insert_worker_job(owner_id, job_type, payload='{"week":"2026-W20"}')

    claim_request = {"worker_id": wid, "job_type": job_type}
    response = await client.post(
        "/internal/worker/claim", json=claim_request, headers=_auth(worker_token)
    )
    assert response.status_code == 200, response.text

    # Response-side checks.
    claimed = response.json()
    assert claimed["id"] == job_id
    assert claimed["job_type"] == job_type
    assert claimed["attempts"] == 1
    assert claimed["payload"]["week"] == "2026-W20"

    # Database-side checks.
    stored = await fetch_worker_job(job_id)
    assert stored["status"] == "processing"
    assert stored["worker_id"] == wid
    assert stored["attempts"] == 1
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_result_completed(client, worker_token, worker_id_with_capability, owner_id):
    """/result with status 'completed' stores the status and the result payload.

    The job is seeded as already 'processing' and claimed by this worker, so
    only the result-reporting step is exercised.
    """
    wid = worker_id_with_capability
    job_id = await insert_worker_job(
        owner_id, "test-ep-1b-result", status="processing", worker_id=wid, attempts=1, claimed=True
    )

    report = {
        "job_id": job_id,
        "worker_id": wid,
        "status": "completed",
        "result": {"summary": "test"},
    }
    response = await client.post(
        "/internal/worker/result", json=report, headers=_auth(worker_token)
    )
    assert response.status_code == 200, response.text

    stored = await fetch_worker_job(job_id)
    assert stored["status"] == "completed"
    assert stored["result"] == {"summary": "test"}
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_drain_heartbeat_insert(client, worker_token, worker_id_with_capability):
    """/drain returns 200 and records a heartbeat row with status 'draining'.

    The newest heartbeat row for the worker must have status 'draining' and a
    ``raw_payload`` equal to the submitted reason.
    """
    wid = worker_id_with_capability
    r = await client.post(
        "/internal/worker/drain",
        json={"worker_id": wid, "reason": "sleep"},
        headers=_auth(worker_token),
    )
    # Report the response body on failure, matching the other endpoint tests.
    assert r.status_code == 200, r.text

    hb = await fetch_latest_heartbeat(wid)
    assert hb is not None
    assert hb["status"] == "draining"
    assert hb["raw_payload"] == {"reason": "sleep"}
|