diff --git a/tests/_worker_pool_helpers.py b/tests/_worker_pool_helpers.py index 8478b2f..d74779f 100644 --- a/tests/_worker_pool_helpers.py +++ b/tests/_worker_pool_helpers.py @@ -1,13 +1,10 @@ """PR-Worker-Pool-Registry-1B 통합 테스트용 헬퍼. -각 테스트 파일이 import 해서 자체 fixture 안에서 호출. conftest.py 갱신은 회피 -(타 테스트 영향 가능성, [[feedback_residue_grep_live_vs_history]] 정신). +각 helper 가 자체 engine + session 사용. fixture 의 db_session 과 격리하여 +asyncpg "another operation in progress" race 회피. 호출 site 는 단순: -사용 예: - from _worker_pool_helpers import ( - get_database_url, mint_access_token, - ensure_user, cleanup_worker_jobs, cleanup_worker_capabilities, - ) + user_id = await ensure_user("test-user") + await cleanup_worker_jobs("test-prefix") """ from __future__ import annotations @@ -22,7 +19,8 @@ from datetime import datetime, timedelta, timezone from jose import jwt from sqlalchemy import text -from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine +from sqlalchemy.pool import NullPool def get_database_url() -> str: @@ -32,8 +30,13 @@ def get_database_url() -> str: ) +def _make_engine(): + """매 helper 호출 시 새 engine + NullPool (connection 격리, dispose 안전).""" + return create_async_engine(get_database_url(), poolclass=NullPool) + + def mint_access_token(username: str, expires_minutes: int = 60) -> str: - """test 용 JWT access token (`core.auth.create_access_token` 와 동일 페이로드).""" + """test 용 JWT access token (`core.auth.create_access_token` 동형).""" from core.config import settings now = datetime.now(timezone.utc) @@ -46,47 +49,204 @@ def mint_access_token(username: str, expires_minutes: int = 60) -> str: return jwt.encode(payload, settings.jwt_secret, algorithm="HS256") -async def ensure_user( - session: AsyncSession, username: str, is_active: bool = True -) -> int: - """users row 존재 보장 + id 반환. 비밀번호 = random bcrypt hash.""" +async def ensure_user(username: str, is_active: bool = True) -> int: + """users row 존재 보장 + id 반환. 자체 engine 사용 (fixture session 비격리).""" from core.auth import hash_password - result = await session.execute( - text("SELECT id FROM users WHERE username = :u"), {"u": username} - ) - row = result.first() - if row is not None: - return int(row[0]) - - h = hash_password(secrets.token_urlsafe(32)) - inserted = await session.execute( - text( - "INSERT INTO users (username, password_hash, is_active, password_changed_at) " - "VALUES (:u, :h, :a, NOW()) RETURNING id" - ), - {"u": username, "h": h, "a": is_active}, - ) - new_id = int(inserted.scalar_one()) - await session.commit() - return new_id + engine = _make_engine() + sm = async_sessionmaker(engine, expire_on_commit=False) + try: + async with sm() as session: + result = await session.execute( + text("SELECT id FROM users WHERE username = :u"), {"u": username} + ) + row = result.first() + if row is not None: + return int(row[0]) + h = hash_password(secrets.token_urlsafe(32)) + inserted = await session.execute( + text( + "INSERT INTO users (username, password_hash, is_active, password_changed_at) " + "VALUES (:u, :h, :a, NOW()) RETURNING id" + ), + {"u": username, "h": h, "a": is_active}, + ) + new_id = int(inserted.scalar_one()) + await session.commit() + return new_id + finally: + await engine.dispose() -async def cleanup_worker_jobs(session: AsyncSession, job_type_prefix: str) -> None: - await session.execute( - text("DELETE FROM worker_jobs WHERE job_type LIKE :p"), - {"p": f"{job_type_prefix}%"}, - ) - await session.commit() +async def cleanup_worker_jobs(job_type_prefix: str) -> None: + engine = _make_engine() + sm = async_sessionmaker(engine, expire_on_commit=False) + try: + async with sm() as session: + await session.execute( + text("DELETE FROM worker_jobs WHERE job_type LIKE :p"), + {"p": f"{job_type_prefix}%"}, + ) + await session.commit() + finally: + await engine.dispose() -async def cleanup_worker_capabilities(session: AsyncSession, worker_id_prefix: str) -> None: - await session.execute( - text("DELETE FROM worker_heartbeats WHERE worker_id LIKE :p"), - {"p": f"{worker_id_prefix}%"}, - ) - await session.execute( - text("DELETE FROM worker_capabilities WHERE worker_id LIKE :p"), - {"p": f"{worker_id_prefix}%"}, - ) - await session.commit() +async def cleanup_worker_capabilities(worker_id_prefix: str) -> None: + engine = _make_engine() + sm = async_sessionmaker(engine, expire_on_commit=False) + try: + async with sm() as session: + await session.execute( + text("DELETE FROM worker_heartbeats WHERE worker_id LIKE :p"), + {"p": f"{worker_id_prefix}%"}, + ) + await session.execute( + text("DELETE FROM worker_capabilities WHERE worker_id LIKE :p"), + {"p": f"{worker_id_prefix}%"}, + ) + await session.commit() + finally: + await engine.dispose() + + +async def insert_worker_capability(worker_id: str, user_id: int) -> None: + """worker_capabilities row INSERT (FK source). 자체 engine.""" + engine = _make_engine() + sm = async_sessionmaker(engine, expire_on_commit=False) + try: + async with sm() as session: + await session.execute( + text( + "INSERT INTO worker_capabilities (worker_id, user_id, device_label, " + "worker_class, tier) VALUES (:w, :u, 'lbl', 'laptop', 'laptop_small') " + "ON CONFLICT (worker_id) DO NOTHING" + ), + {"w": worker_id, "u": user_id}, + ) + await session.commit() + finally: + await engine.dispose() + + +async def insert_worker_job( + user_id: int, + job_type: str, + status: str = "pending", + worker_id: str | None = None, + attempts: int = 0, + max_attempts: int = 3, + payload: str = "{}", + claimed: bool = False, +) -> int: + """worker_jobs row INSERT + id 반환. 자체 engine.""" + engine = _make_engine() + sm = async_sessionmaker(engine, expire_on_commit=False) + try: + async with sm() as session: + cols = ["user_id", "job_type", "status", "attempts", "max_attempts", "payload"] + vals = {"u": user_id, "j": job_type, "s": status, "a": attempts, "m": max_attempts, "p": payload} + placeholders = [":u", ":j", ":s", ":a", ":m", ":p::jsonb"] + if worker_id is not None: + cols.append("worker_id") + vals["w"] = worker_id + placeholders.append(":w") + if claimed: + cols.append("claimed_at") + placeholders.append("NOW()") + stmt = ( + f"INSERT INTO worker_jobs ({', '.join(cols)}) " + f"VALUES ({', '.join(placeholders)}) RETURNING id" + ) + res = await session.execute(text(stmt), vals) + job_id = int(res.scalar_one()) + await session.commit() + return job_id + finally: + await engine.dispose() + + +async def fetch_worker_job(job_id: int) -> dict | None: + """worker_jobs row 1건 조회. 자체 engine.""" + engine = _make_engine() + sm = async_sessionmaker(engine, expire_on_commit=False) + try: + async with sm() as session: + res = await session.execute( + text( + "SELECT id, status, worker_id, attempts, claimed_at, completed_at, " + "result, error_message FROM worker_jobs WHERE id = :i" + ), + {"i": job_id}, + ) + row = res.first() + if row is None: + return None + return { + "id": row[0], + "status": row[1], + "worker_id": row[2], + "attempts": row[3], + "claimed_at": row[4], + "completed_at": row[5], + "result": row[6], + "error_message": row[7], + } + finally: + await engine.dispose() + + +async def fetch_latest_heartbeat(worker_id: str) -> dict | None: + engine = _make_engine() + sm = async_sessionmaker(engine, expire_on_commit=False) + try: + async with sm() as session: + res = await session.execute( + text( + "SELECT status, raw_payload FROM worker_heartbeats " + "WHERE worker_id = :w ORDER BY heartbeat_at DESC LIMIT 1" + ), + {"w": worker_id}, + ) + row = res.first() + if row is None: + return None + return {"status": row[0], "raw_payload": row[1]} + finally: + await engine.dispose() + + +async def fetch_worker_capability(worker_id: str) -> dict | None: + engine = _make_engine() + sm = async_sessionmaker(engine, expire_on_commit=False) + try: + async with sm() as session: + res = await session.execute( + text( + "SELECT device_label, last_registered_at FROM worker_capabilities " + "WHERE worker_id = :w" + ), + {"w": worker_id}, + ) + row = res.first() + if row is None: + return None + return {"device_label": row[0], "last_registered_at": row[1]} + finally: + await engine.dispose() + + +async def count_worker_capability(worker_id: str) -> int: + engine = _make_engine() + sm = async_sessionmaker(engine, expire_on_commit=False) + try: + async with sm() as session: + res = await session.execute( + text( + "SELECT count(*) FROM worker_capabilities WHERE worker_id = :w" + ), + {"w": worker_id}, + ) + return int(res.scalar_one()) + finally: + await engine.dispose() diff --git a/tests/test_internal_worker_authz.py b/tests/test_internal_worker_authz.py index 5bfb5bc..483c0b5 100644 --- a/tests/test_internal_worker_authz.py +++ b/tests/test_internal_worker_authz.py @@ -1,10 +1,8 @@ -"""PR-Worker-Pool-Registry-1B — /internal/worker/* 권한 분리 (정정 #2 보조). +"""PR-Worker-Pool-Registry-1B — /internal/worker/* 권한 분리. worker user 외 모든 사용자 = 403. 1. voice-memo-bot JWT → 403 2. 일반 user JWT → 403 - -(401 = token 자체 invalid 시. 본 테스트는 토큰 자체는 유효 + 권한만 부족.) """ from __future__ import annotations @@ -18,20 +16,25 @@ import pytest_asyncio sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "app")) from httpx import ASGITransport, AsyncClient -from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine -from _worker_pool_helpers import ensure_user, get_database_url, mint_access_token +from _worker_pool_helpers import ensure_user, mint_access_token @pytest_asyncio.fixture -async def setup(monkeypatch): +async def env_setup(monkeypatch): monkeypatch.setenv("LAPTOP_WORKER_BOT_USERNAME", "laptop-worker-bot") - engine = create_async_engine(get_database_url()) - sm = async_sessionmaker(engine, expire_on_commit=False) - async with sm() as session: - await ensure_user(session, "voice-memo-bot") - await ensure_user(session, "test-regular-user-1b") - await engine.dispose() + + +@pytest_asyncio.fixture +async def voice_memo_token(env_setup): + await ensure_user("voice-memo-bot") + return mint_access_token("voice-memo-bot") + + +@pytest_asyncio.fixture +async def regular_user_token(env_setup): + await ensure_user("test-regular-user-1b") + return mint_access_token("test-regular-user-1b") @pytest_asyncio.fixture @@ -39,37 +42,26 @@ async def client(): from main import app async with AsyncClient( - transport=ASGITransport(app=app), - base_url="http://test", + transport=ASGITransport(app=app), base_url="http://test" ) as ac: yield ac @pytest.mark.asyncio -async def test_voice_memo_bot_jwt_rejected(client, setup): - """voice-memo-bot JWT 로 /internal/worker/register 호출 → 403.""" - token = mint_access_token("voice-memo-bot") +async def test_voice_memo_bot_jwt_rejected(client, voice_memo_token): r = await client.post( "/internal/worker/register", - json={ - "worker_id": "x", - "device_label": "x", - "worker_class": "x", - "tier": "x", - }, - headers={"Authorization": f"Bearer {token}"}, + json={"worker_id": "x", "device_label": "x", "worker_class": "x", "tier": "x"}, + headers={"Authorization": f"Bearer {voice_memo_token}"}, ) assert r.status_code == 403, r.text - assert "worker user" in r.json().get("detail", "").lower() @pytest.mark.asyncio -async def test_regular_user_jwt_rejected(client, setup): - """일반 user JWT 로 /internal/worker/heartbeat 호출 → 403.""" - token = mint_access_token("test-regular-user-1b") +async def test_regular_user_jwt_rejected(client, regular_user_token): r = await client.post( "/internal/worker/heartbeat", json={"worker_id": "x", "status": "available"}, - headers={"Authorization": f"Bearer {token}"}, + headers={"Authorization": f"Bearer {regular_user_token}"}, ) assert r.status_code == 403, r.text diff --git a/tests/test_internal_worker_endpoints.py b/tests/test_internal_worker_endpoints.py index 43c4fce..a9816d6 100644 --- a/tests/test_internal_worker_endpoints.py +++ b/tests/test_internal_worker_endpoints.py @@ -1,13 +1,12 @@ -"""PR-Worker-Pool-Registry-1B — /internal/worker/* 5 endpoint 정상 동작 검증. +"""PR-Worker-Pool-Registry-1B — /internal/worker/* 5 endpoint 정상 동작. -5 항목: - 1. /register UPSERT — 같은 worker_id 두 번 호출하면 단일 row 유지 + last_registered_at 갱신. - 2. /heartbeat — INSERT row 추가 + status='available' 저장 확인. - 3. /claim — queue empty 시 204 + body 0. pending row 있으면 200 + payload + status='processing'. - 4. /result — completed 시 status='completed' + result JSONB 저장. - 5. /drain — heartbeat row status='draining' INSERT. - -DB integration. GPU docker compose exec fastapi pytest 환경 가정. +6 항목 (claim 두 경로 분리): + 1. /register UPSERT — 같은 worker_id 두번 호출 시 row 1 + device_label 갱신. + 2. /heartbeat — row INSERT + status='available' 저장. + 3. /claim — queue empty 시 204 + body 0 (정정 #4). + 4. /claim — pending row 1건 + 200 + status='processing' 전이. + 5. /result completed — status='completed' + result JSONB 저장. + 6. /drain — heartbeat row status='draining' INSERT. """ from __future__ import annotations @@ -22,26 +21,44 @@ import pytest_asyncio sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "app")) from httpx import ASGITransport, AsyncClient -from sqlalchemy import text -from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine from _worker_pool_helpers import ( cleanup_worker_capabilities, cleanup_worker_jobs, + count_worker_capability, ensure_user, - get_database_url, + fetch_latest_heartbeat, + fetch_worker_capability, + fetch_worker_job, + insert_worker_capability, + insert_worker_job, mint_access_token, ) @pytest_asyncio.fixture -async def db_session(monkeypatch): +async def env_setup(monkeypatch): monkeypatch.setenv("LAPTOP_WORKER_BOT_USERNAME", "laptop-worker-bot") - engine = create_async_engine(get_database_url()) - sm = async_sessionmaker(engine, expire_on_commit=False) - async with sm() as session: - yield session - await engine.dispose() + + +@pytest_asyncio.fixture +async def worker_token(env_setup): + await ensure_user("laptop-worker-bot") + return mint_access_token("laptop-worker-bot") + + +@pytest_asyncio.fixture +async def owner_id(): + return await ensure_user("test-owner-ep-1b") + + +@pytest_asyncio.fixture +async def worker_id_with_capability(owner_id): + wid = f"test-ep-1b-{uuid.uuid4().hex[:8]}" + await insert_worker_capability(wid, owner_id) + yield wid + await cleanup_worker_jobs("test-ep-1b") + await cleanup_worker_capabilities("test-ep-1b") @pytest_asyncio.fixture @@ -49,227 +66,128 @@ async def client(): from main import app async with AsyncClient( - transport=ASGITransport(app=app), - base_url="http://test", + transport=ASGITransport(app=app), base_url="http://test" ) as ac: yield ac -@pytest_asyncio.fixture -async def worker_token(db_session): - await ensure_user(db_session, "laptop-worker-bot") - return mint_access_token("laptop-worker-bot") - - -@pytest_asyncio.fixture -async def owner_id(db_session): - return await ensure_user(db_session, "test-owner-user-1b") - - -@pytest_asyncio.fixture -async def worker_id_unique(): - wid = f"test-mbp-1b-{uuid.uuid4().hex[:8]}" - yield wid - - -@pytest_asyncio.fixture -async def cleanup_after(db_session, worker_id_unique): - yield - await cleanup_worker_jobs(db_session, "test-rec-1b") - await cleanup_worker_capabilities(db_session, worker_id_unique) - - def _auth(token: str) -> dict: return {"Authorization": f"Bearer {token}"} @pytest.mark.asyncio -async def test_register_upsert(client, db_session, worker_token, worker_id_unique, cleanup_after): - body = { - "worker_id": worker_id_unique, - "device_label": "MacBook Pro M3 Pro", - "worker_class": "laptop", - "tier": "laptop_small", - "capabilities": ["short_summary"], - "models_loaded": ["gemma-3-4b-it"], - } - r1 = await client.post("/internal/worker/register", json=body, headers=_auth(worker_token)) - assert r1.status_code == 200, r1.text - # 두번째 호출 = upsert 갱신 - body["device_label"] = "MacBook Pro M3 Pro (renamed)" - r2 = await client.post("/internal/worker/register", json=body, headers=_auth(worker_token)) - assert r2.status_code == 200 - res = await db_session.execute( - text( - "SELECT device_label, last_registered_at FROM worker_capabilities WHERE worker_id = :w" - ), - {"w": worker_id_unique}, - ) - rows = res.all() - assert len(rows) == 1 - assert rows[0][0] == "MacBook Pro M3 Pro (renamed)" +async def test_register_upsert(client, worker_token): + wid = f"test-ep-1b-reg-{uuid.uuid4().hex[:6]}" + try: + body = { + "worker_id": wid, + "device_label": "MacBook Pro M3 Pro", + "worker_class": "laptop", + "tier": "laptop_small", + "capabilities": ["short_summary"], + "models_loaded": ["gemma-3-4b-it"], + } + r1 = await client.post("/internal/worker/register", json=body, headers=_auth(worker_token)) + assert r1.status_code == 200, r1.text + body["device_label"] = "MacBook Pro M3 Pro (renamed)" + r2 = await client.post("/internal/worker/register", json=body, headers=_auth(worker_token)) + assert r2.status_code == 200 + assert await count_worker_capability(wid) == 1 + cap = await fetch_worker_capability(wid) + assert cap is not None + assert cap["device_label"] == "MacBook Pro M3 Pro (renamed)" + finally: + await cleanup_worker_capabilities(wid) @pytest.mark.asyncio -async def test_heartbeat_insert( - client, db_session, worker_token, worker_id_unique, owner_id, cleanup_after -): - # 사전: worker_capabilities row 필요 (FK) - await db_session.execute( - text( - "INSERT INTO worker_capabilities (worker_id, user_id, device_label, " - "worker_class, tier) VALUES (:w, :u, 'lbl', 'laptop', 'laptop_small') " - "ON CONFLICT (worker_id) DO NOTHING" - ), - {"w": worker_id_unique, "u": owner_id}, - ) - await db_session.commit() - +async def test_heartbeat_insert(client, worker_token, worker_id_with_capability): + wid = worker_id_with_capability r = await client.post( "/internal/worker/heartbeat", - json={"worker_id": worker_id_unique, "status": "available"}, + json={"worker_id": wid, "status": "available"}, headers=_auth(worker_token), ) - assert r.status_code == 200 - res = await db_session.execute( - text( - "SELECT status FROM worker_heartbeats WHERE worker_id = :w ORDER BY heartbeat_at DESC LIMIT 1" - ), - {"w": worker_id_unique}, - ) - assert res.scalar_one() == "available" + assert r.status_code == 200, r.text + hb = await fetch_latest_heartbeat(wid) + assert hb is not None + assert hb["status"] == "available" @pytest.mark.asyncio -async def test_claim_204_when_empty(client, worker_token, cleanup_after): +async def test_claim_204_when_empty(client, worker_token): r = await client.post( "/internal/worker/claim", - json={"worker_id": "test-empty-1b", "job_type": "test-rec-1b-empty"}, + json={"worker_id": "test-ep-1b-empty", "job_type": "test-ep-1b-NOEXIST"}, headers=_auth(worker_token), ) assert r.status_code == 204 - # 정정 #4: body 0 - assert r.content == b"" + assert r.content == b"" # 정정 #4 @pytest.mark.asyncio async def test_claim_processing_transition( - client, db_session, worker_token, worker_id_unique, owner_id, cleanup_after + client, worker_token, worker_id_with_capability, owner_id ): - # seed worker_capabilities + worker_jobs - await db_session.execute( - text( - "INSERT INTO worker_capabilities (worker_id, user_id, device_label, " - "worker_class, tier) VALUES (:w, :u, 'lbl', 'laptop', 'laptop_small') " - "ON CONFLICT (worker_id) DO NOTHING" - ), - {"w": worker_id_unique, "u": owner_id}, - ) - jt = "test-rec-1b-claim" - job_id = ( - await db_session.execute( - text( - "INSERT INTO worker_jobs (user_id, job_type, payload) " - "VALUES (:u, :j, '{\"week\":\"2026-W20\"}'::jsonb) RETURNING id" - ), - {"u": owner_id, "j": jt}, - ) - ).scalar_one() - await db_session.commit() + wid = worker_id_with_capability + jt = "test-ep-1b-claim" + job_id = await insert_worker_job(owner_id, jt, payload='{"week":"2026-W20"}') r = await client.post( "/internal/worker/claim", - json={"worker_id": worker_id_unique, "job_type": jt}, + json={"worker_id": wid, "job_type": jt}, headers=_auth(worker_token), ) - assert r.status_code == 200 + assert r.status_code == 200, r.text js = r.json() assert js["id"] == job_id assert js["job_type"] == jt assert js["attempts"] == 1 assert js["payload"]["week"] == "2026-W20" - res = await db_session.execute( - text("SELECT status, worker_id, attempts FROM worker_jobs WHERE id = :i"), - {"i": job_id}, - ) - status_, w_id, attempts = res.first() - assert status_ == "processing" - assert w_id == worker_id_unique - assert attempts == 1 + job = await fetch_worker_job(job_id) + assert job["status"] == "processing" + assert job["worker_id"] == wid + assert job["attempts"] == 1 @pytest.mark.asyncio -async def test_result_completed( - client, db_session, worker_token, worker_id_unique, owner_id, cleanup_after -): - # seed - await db_session.execute( - text( - "INSERT INTO worker_capabilities (worker_id, user_id, device_label, " - "worker_class, tier) VALUES (:w, :u, 'lbl', 'laptop', 'laptop_small') " - "ON CONFLICT (worker_id) DO NOTHING" - ), - {"w": worker_id_unique, "u": owner_id}, +async def test_result_completed(client, worker_token, worker_id_with_capability, owner_id): + wid = worker_id_with_capability + job_id = await insert_worker_job( + owner_id, + "test-ep-1b-result", + status="processing", + worker_id=wid, + attempts=1, + claimed=True, ) - jt = "test-rec-1b-result" - job_id = ( - await db_session.execute( - text( - "INSERT INTO worker_jobs (user_id, job_type, status, worker_id, " - "attempts, claimed_at) VALUES (:u, :j, 'processing', :w, 1, NOW()) RETURNING id" - ), - {"u": owner_id, "j": jt, "w": worker_id_unique}, - ) - ).scalar_one() - await db_session.commit() - r = await client.post( "/internal/worker/result", json={ "job_id": job_id, - "worker_id": worker_id_unique, + "worker_id": wid, "status": "completed", "result": {"summary": "test"}, }, headers=_auth(worker_token), ) - assert r.status_code == 200 - res = await db_session.execute( - text("SELECT status, result FROM worker_jobs WHERE id = :i"), {"i": job_id} - ) - s, result_jsonb = res.first() - assert s == "completed" - assert result_jsonb == {"summary": "test"} + assert r.status_code == 200, r.text + job = await fetch_worker_job(job_id) + assert job["status"] == "completed" + assert job["result"] == {"summary": "test"} @pytest.mark.asyncio -async def test_drain_heartbeat_insert( - client, db_session, worker_token, worker_id_unique, owner_id, cleanup_after -): - await db_session.execute( - text( - "INSERT INTO worker_capabilities (worker_id, user_id, device_label, " - "worker_class, tier) VALUES (:w, :u, 'lbl', 'laptop', 'laptop_small') " - "ON CONFLICT (worker_id) DO NOTHING" - ), - {"w": worker_id_unique, "u": owner_id}, - ) - await db_session.commit() - +async def test_drain_heartbeat_insert(client, worker_token, worker_id_with_capability): + wid = worker_id_with_capability r = await client.post( "/internal/worker/drain", - json={"worker_id": worker_id_unique, "reason": "sleep"}, + json={"worker_id": wid, "reason": "sleep"}, headers=_auth(worker_token), ) assert r.status_code == 200 - res = await db_session.execute( - text( - "SELECT status, raw_payload FROM worker_heartbeats WHERE worker_id = :w " - "ORDER BY heartbeat_at DESC LIMIT 1" - ), - {"w": worker_id_unique}, - ) - s, rp = res.first() - assert s == "draining" - assert rp == {"reason": "sleep"} + hb = await fetch_latest_heartbeat(wid) + assert hb is not None + assert hb["status"] == "draining" + assert hb["raw_payload"] == {"reason": "sleep"} diff --git a/tests/test_worker_jobs_ownership.py b/tests/test_worker_jobs_ownership.py index b1cc47d..10cf3e3 100644 --- a/tests/test_worker_jobs_ownership.py +++ b/tests/test_worker_jobs_ownership.py @@ -1,6 +1,6 @@ """PR-Worker-Pool-Registry-1B 정정 #2 — /result 소유권 검증. -worker A 가 claim 한 job 을 worker B 가 /result 호출 → 404. +worker A 가 claim 한 job 을 worker B 가 /result → 404. """ from __future__ import annotations @@ -15,61 +15,46 @@ import pytest_asyncio sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "app")) from httpx import ASGITransport, AsyncClient -from sqlalchemy import text -from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine from _worker_pool_helpers import ( cleanup_worker_capabilities, cleanup_worker_jobs, ensure_user, - get_database_url, + fetch_worker_job, + insert_worker_capability, + insert_worker_job, mint_access_token, ) @pytest_asyncio.fixture -async def db_session(monkeypatch): +async def env_setup(monkeypatch): monkeypatch.setenv("LAPTOP_WORKER_BOT_USERNAME", "laptop-worker-bot") - engine = create_async_engine(get_database_url()) - sm = async_sessionmaker(engine, expire_on_commit=False) - async with sm() as session: - yield session - await engine.dispose() @pytest_asyncio.fixture -async def worker_token(db_session): - await ensure_user(db_session, "laptop-worker-bot") +async def worker_token(env_setup): + await ensure_user("laptop-worker-bot") return mint_access_token("laptop-worker-bot") @pytest.mark.asyncio -async def test_other_worker_cannot_submit_result(db_session, worker_token): - """worker A 의 processing job → worker B 가 /result → 404.""" +async def test_other_worker_cannot_submit_result(worker_token): from main import app - owner_id = await ensure_user(db_session, "test-owner-own-1b") + owner_id = await ensure_user("test-owner-own-1b") w_a = f"test-own-1b-a-{uuid.uuid4().hex[:6]}" w_b = f"test-own-1b-b-{uuid.uuid4().hex[:6]}" - for w in (w_a, w_b): - await db_session.execute( - text( - "INSERT INTO worker_capabilities (worker_id, user_id, device_label, " - "worker_class, tier) VALUES (:w, :u, 'lbl', 'laptop', 'laptop_small')" - ), - {"w": w, "u": owner_id}, - ) - job_id = ( - await db_session.execute( - text( - "INSERT INTO worker_jobs (user_id, job_type, status, worker_id, " - "attempts, claimed_at) VALUES (:u, 'test-own-1b', 'processing', :w, 1, NOW()) " - "RETURNING id" - ), - {"u": owner_id, "w": w_a}, - ) - ).scalar_one() - await db_session.commit() + await insert_worker_capability(w_a, owner_id) + await insert_worker_capability(w_b, owner_id) + job_id = await insert_worker_job( + owner_id, + "test-own-1b", + status="processing", + worker_id=w_a, + attempts=1, + claimed=True, + ) try: async with AsyncClient( @@ -79,21 +64,16 @@ async def test_other_worker_cannot_submit_result(db_session, worker_token): "/internal/worker/result", json={ "job_id": job_id, - "worker_id": w_b, + "worker_id": w_b, # 다른 worker "status": "completed", "result": {"ok": True}, }, headers={"Authorization": f"Bearer {worker_token}"}, ) assert r.status_code == 404, r.text - # job 은 여전히 processing (worker_id=w_a) - res = await db_session.execute( - text("SELECT status, worker_id FROM worker_jobs WHERE id = :i"), - {"i": job_id}, - ) - s, w_id = res.first() - assert s == "processing" - assert w_id == w_a + job = await fetch_worker_job(job_id) + assert job["status"] == "processing" + assert job["worker_id"] == w_a finally: - await cleanup_worker_jobs(db_session, "test-own-1b") - await cleanup_worker_capabilities(db_session, "test-own-1b") + await cleanup_worker_jobs("test-own-1b") + await cleanup_worker_capabilities("test-own-1b") diff --git a/tests/test_worker_jobs_retry.py b/tests/test_worker_jobs_retry.py index 92407d0..fa71eaa 100644 --- a/tests/test_worker_jobs_retry.py +++ b/tests/test_worker_jobs_retry.py @@ -1,11 +1,9 @@ -"""PR-Worker-Pool-Registry-1B 정정 #3 — explicit /result failed 재시도 정책. +"""PR-Worker-Pool-Registry-1B 정정 #3 — explicit /result failed 재시도. 3 case: - A. initial attempts=0, max_attempts=3 → claim 후 attempts=1 → failed → status='pending' 복귀. - (worker_id=NULL, claimed_at=NULL, completed_at=NULL, error_message 저장) - B. initial attempts=2, max_attempts=3 → claim 후 attempts=3 → failed → final 'failed'. - (completed_at=now) - C. failed 요청에 result={"partial":"..."} 포함 → DB result IS NULL 유지 (request.result 무시). + A. initial attempts=0, max=3 → claim 후 attempts=1 → failed → status='pending' 복귀. + B. initial attempts=2, max=3 → claim 후 attempts=3 → failed → final 'failed'. + C. failed 요청에 result={'partial':...} 포함 → DB result IS NULL 유지. """ from __future__ import annotations @@ -20,87 +18,65 @@ import pytest_asyncio sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "app")) from httpx import ASGITransport, AsyncClient -from sqlalchemy import text -from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine from _worker_pool_helpers import ( cleanup_worker_capabilities, cleanup_worker_jobs, ensure_user, - get_database_url, + fetch_worker_job, + insert_worker_capability, + insert_worker_job, mint_access_token, ) @pytest_asyncio.fixture -async def db_session(monkeypatch): +async def env_setup(monkeypatch): monkeypatch.setenv("LAPTOP_WORKER_BOT_USERNAME", "laptop-worker-bot") - engine = create_async_engine(get_database_url()) - sm = async_sessionmaker(engine, expire_on_commit=False) - async with sm() as session: - yield session - await engine.dispose() @pytest_asyncio.fixture -async def worker_token(db_session): - await ensure_user(db_session, "laptop-worker-bot") +async def worker_token(env_setup): + await ensure_user("laptop-worker-bot") return mint_access_token("laptop-worker-bot") @pytest_asyncio.fixture -async def owner_id(db_session): - return await ensure_user(db_session, "test-owner-retry-1b") +async def owner_id(): + return await ensure_user("test-owner-retry-1b") @pytest_asyncio.fixture -async def worker_id_unique(db_session, owner_id): +async def worker_id_unique(owner_id): wid = f"test-retry-1b-{uuid.uuid4().hex[:8]}" - await db_session.execute( - text( - "INSERT INTO worker_capabilities (worker_id, user_id, device_label, " - "worker_class, tier) VALUES (:w, :u, 'lbl', 'laptop', 'laptop_small')" - ), - {"w": wid, "u": owner_id}, - ) - await db_session.commit() + await insert_worker_capability(wid, owner_id) yield wid - await cleanup_worker_jobs(db_session, "test-retry-1b") - await cleanup_worker_capabilities(db_session, wid) + await cleanup_worker_jobs("test-retry-1b") + await cleanup_worker_capabilities(wid) async def _seed_processing_job( - db_session, owner_id: int, worker_id: str, attempts: int, max_attempts: int = 3 + owner_id: int, worker_id: str, attempts: int, max_attempts: int = 3 ) -> int: - job_id = ( - await db_session.execute( - text( - "INSERT INTO worker_jobs (user_id, job_type, status, worker_id, " - "attempts, max_attempts, claimed_at) " - "VALUES (:u, :j, 'processing', :w, :a, :m, NOW()) RETURNING id" - ), - { - "u": owner_id, - "j": f"test-retry-1b-{uuid.uuid4().hex[:8]}", - "w": worker_id, - "a": attempts, - "m": max_attempts, - }, - ) - ).scalar_one() - await db_session.commit() - return job_id + return await insert_worker_job( + owner_id, + f"test-retry-1b-{uuid.uuid4().hex[:8]}", + status="processing", + worker_id=worker_id, + attempts=attempts, + max_attempts=max_attempts, + claimed=True, + ) @pytest.mark.asyncio -async def test_case_a_failed_then_pending( - db_session, worker_token, worker_id_unique, owner_id -): - """Case A: attempts(=1) < max_attempts(=3) → pending 복귀.""" +async def test_case_a_failed_then_pending(worker_token, worker_id_unique, owner_id): from main import app - job_id = await _seed_processing_job(db_session, owner_id, worker_id_unique, attempts=1) - async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as c: + job_id = await _seed_processing_job(owner_id, worker_id_unique, attempts=1) + async with AsyncClient( + transport=ASGITransport(app=app), base_url="http://test" + ) as c: r = await c.post( "/internal/worker/result", json={ @@ -112,31 +88,22 @@ async def test_case_a_failed_then_pending( headers={"Authorization": f"Bearer {worker_token}"}, ) assert r.status_code == 200, r.text - - res = await db_session.execute( - text( - "SELECT status, worker_id, claimed_at, completed_at, error_message " - "FROM worker_jobs WHERE id = :i" - ), - {"i": job_id}, - ) - s, w_id, c_at, comp_at, em = res.first() - assert s == "pending" - assert w_id is None - assert c_at is None - assert comp_at is None - assert em == "case A error" + job = await fetch_worker_job(job_id) + assert job["status"] == "pending" + assert job["worker_id"] is None + assert job["claimed_at"] is None + assert job["completed_at"] is None + assert job["error_message"] == "case A error" @pytest.mark.asyncio -async def test_case_b_failed_max_attempts_final( - db_session, worker_token, worker_id_unique, owner_id -): - """Case B: attempts(=3) >= max_attempts(=3) → final 'failed' + completed_at.""" +async def test_case_b_failed_max_attempts_final(worker_token, worker_id_unique, owner_id): from main import app - job_id = await _seed_processing_job(db_session, owner_id, worker_id_unique, attempts=3) - async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as c: + job_id = await _seed_processing_job(owner_id, worker_id_unique, attempts=3) + async with AsyncClient( + transport=ASGITransport(app=app), base_url="http://test" + ) as c: r = await c.post( "/internal/worker/result", json={ @@ -148,25 +115,19 @@ async def test_case_b_failed_max_attempts_final( headers={"Authorization": f"Bearer {worker_token}"}, ) assert r.status_code == 200 - - res = await db_session.execute( - text("SELECT status, completed_at FROM worker_jobs WHERE id = :i"), - {"i": job_id}, - ) - s, comp_at = res.first() - assert s == "failed" - assert comp_at is not None + job = await fetch_worker_job(job_id) + assert job["status"] == "failed" + assert job["completed_at"] is not None @pytest.mark.asyncio -async def test_case_c_failed_ignores_result_field( - db_session, worker_token, worker_id_unique, owner_id -): - """Case C: failed 요청 result={'partial':...} 포함해도 DB result IS NULL 유지.""" +async def test_case_c_failed_ignores_result_field(worker_token, worker_id_unique, owner_id): from main import app - job_id = await _seed_processing_job(db_session, owner_id, worker_id_unique, attempts=1) - async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as c: + job_id = await _seed_processing_job(owner_id, worker_id_unique, attempts=1) + async with AsyncClient( + transport=ASGITransport(app=app), base_url="http://test" + ) as c: r = await c.post( "/internal/worker/result", json={ @@ -179,7 +140,5 @@ async def test_case_c_failed_ignores_result_field( headers={"Authorization": f"Bearer {worker_token}"}, ) assert r.status_code == 200 - res = await db_session.execute( - text("SELECT result FROM worker_jobs WHERE id = :i"), {"i": job_id} - ) - assert res.scalar_one() is None # 정정 #3 엄격 규칙 + job = await fetch_worker_job(job_id) + assert job["result"] is None # 정정 #3 엄격 규칙 diff --git a/tests/test_worker_jobs_skip_locked.py b/tests/test_worker_jobs_skip_locked.py index 19946a6..3ff1c67 100644 --- a/tests/test_worker_jobs_skip_locked.py +++ b/tests/test_worker_jobs_skip_locked.py @@ -1,8 +1,8 @@ """PR-Worker-Pool-Registry-1B — /claim 동시성 (SKIP LOCKED) + 204 body 검증. 2 항목: - 1. 두 async session 동시 claim → 한쪽만 200, 다른 쪽 204 + body 0 (정정 #4) - 2. queue empty + 별도 호출 → 204 + Content-Length: 0 명시 + 1. 두 client 동시 claim → 한쪽 200, 다른 쪽 204 + body 0 (정정 #4) + 2. queue empty 호출 → 204 + body 0 explicit """ from __future__ import annotations @@ -18,68 +18,48 @@ import pytest_asyncio sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "app")) from httpx import ASGITransport, AsyncClient -from sqlalchemy import text -from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine from _worker_pool_helpers import ( cleanup_worker_capabilities, cleanup_worker_jobs, ensure_user, - get_database_url, + insert_worker_capability, + insert_worker_job, mint_access_token, ) @pytest_asyncio.fixture -async def db_session(monkeypatch): +async def env_setup(monkeypatch): monkeypatch.setenv("LAPTOP_WORKER_BOT_USERNAME", "laptop-worker-bot") - engine = create_async_engine(get_database_url()) - sm = async_sessionmaker(engine, expire_on_commit=False) - async with sm() as session: - yield session - await engine.dispose() @pytest_asyncio.fixture -async def worker_token(db_session): - await ensure_user(db_session, "laptop-worker-bot") +async def worker_token(env_setup): + await ensure_user("laptop-worker-bot") return mint_access_token("laptop-worker-bot") -@pytest_asyncio.fixture -async def owner_id(db_session): - return await ensure_user(db_session, "test-owner-skip-1b") - - @pytest.mark.asyncio -async def test_skip_locked_only_one_winner(db_session, worker_token, owner_id): - """두 client 가 동시에 동일 job_type claim → 한쪽 200, 다른 쪽 204.""" +async def test_skip_locked_only_one_winner(worker_token): + """두 client 동시 동일 job_type claim → 한쪽 200, 다른 쪽 204.""" from main import app + owner_id = await ensure_user("test-owner-skip-1b") jt = f"test-skip-1b-{uuid.uuid4().hex[:8]}" w1 = f"test-skip-1b-w1-{uuid.uuid4().hex[:6]}" w2 = f"test-skip-1b-w2-{uuid.uuid4().hex[:6]}" - - # seed capability + 1 pending job for w in (w1, w2): - await db_session.execute( - text( - "INSERT INTO worker_capabilities (worker_id, user_id, device_label, " - "worker_class, tier) VALUES (:w, :u, 'lbl', 'laptop', 'laptop_small') " - "ON CONFLICT (worker_id) DO NOTHING" - ), - {"w": w, "u": owner_id}, - ) - await db_session.execute( - text("INSERT INTO worker_jobs (user_id, job_type) VALUES (:u, :j)"), - {"u": owner_id, "j": jt}, - ) - await db_session.commit() + await insert_worker_capability(w, owner_id) + await insert_worker_job(owner_id, jt) headers = {"Authorization": f"Bearer {worker_token}"} try: - async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as c1, \ - AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as c2: + async with AsyncClient( + transport=ASGITransport(app=app), base_url="http://test" + ) as c1, AsyncClient( + transport=ASGITransport(app=app), base_url="http://test" + ) as c2: r1, r2 = await asyncio.gather( c1.post( "/internal/worker/claim", @@ -95,16 +75,15 @@ async def test_skip_locked_only_one_winner(db_session, worker_token, owner_id): codes = sorted([r1.status_code, r2.status_code]) assert codes == [200, 204], f"unexpected codes: {codes}" loser = r1 if r1.status_code == 204 else r2 - # 정정 #4: 204 body 빈 검증 - assert loser.content == b"" + assert loser.content == b"" # 정정 #4 finally: - await cleanup_worker_jobs(db_session, "test-skip-1b") - await cleanup_worker_capabilities(db_session, "test-skip-1b-w") + await cleanup_worker_jobs("test-skip-1b") + await cleanup_worker_capabilities("test-skip-1b-w") @pytest.mark.asyncio -async def test_claim_204_body_explicit_empty(db_session, worker_token): - """queue empty 호출 → 204 + content empty (정정 #4 명시적).""" +async def test_claim_204_body_explicit_empty(worker_token): + """queue empty 호출 → 204 + body 0.""" from main import app async with AsyncClient( @@ -117,4 +96,3 @@ async def test_claim_204_body_explicit_empty(db_session, worker_token): ) assert r.status_code == 204 assert r.content == b"" - # Content-Length 헤더는 ASGI 가 자동 0 으로 세팅. 없을 수도 있으므로 content 만 보장. diff --git a/tests/test_worker_jobs_smoke.py b/tests/test_worker_jobs_smoke.py index 4d841e5..3e2abe8 100644 --- a/tests/test_worker_jobs_smoke.py +++ b/tests/test_worker_jobs_smoke.py @@ -1,8 +1,8 @@ """PR-Worker-Pool-Registry-1B — worker_jobs ORM/schema 단위 검증. -3 항목 (endpoint test 와 중복 회피, schema-level 만): - 1. CHECK constraint — status 가 enum 4 외 값일 때 INSERT 거부 (정정 #5) - 2. partial unique-index 동작 검증 — pending row 만 색인 (idx_worker_jobs_pending_type) +3 항목 (endpoint test 와 중복 회피): + 1. CHECK constraint — status enum 4 외 값 INSERT → IntegrityError (정정 #5) + 2. partial unique-index 존재 검증 (idx_worker_jobs_pending_type) 3. ON DELETE SET NULL — worker_capabilities 삭제 시 worker_jobs.worker_id 자동 NULL """ @@ -25,122 +25,85 @@ from _worker_pool_helpers import ( cleanup_worker_capabilities, cleanup_worker_jobs, ensure_user, + fetch_worker_job, get_database_url, + insert_worker_capability, + insert_worker_job, ) @pytest_asyncio.fixture -async def db_session(): +async def owner_id(): + return await ensure_user("test-owner-smoke-1b") + + +@pytest.mark.asyncio +async def test_check_constraint_rejects_unknown_status(owner_id): + """정정 #5: status 가 enum 4 외 값이면 IntegrityError.""" engine = create_async_engine(get_database_url()) sm = async_sessionmaker(engine, expire_on_commit=False) - async with sm() as session: - yield session - await engine.dispose() - - -@pytest_asyncio.fixture -async def owner_id(db_session): - return await ensure_user(db_session, "test-owner-smoke-1b") - - -@pytest_asyncio.fixture -async def worker_id_unique(db_session, owner_id): - wid = f"test-smoke-1b-{uuid.uuid4().hex[:8]}" - await db_session.execute( - text( - "INSERT INTO worker_capabilities (worker_id, user_id, device_label, " - "worker_class, tier) VALUES (:w, :u, 'lbl', 'laptop', 'laptop_small')" - ), - {"w": wid, "u": owner_id}, - ) - await db_session.commit() - yield wid - await cleanup_worker_jobs(db_session, "test-smoke-1b") - await cleanup_worker_capabilities(db_session, wid) + try: + with pytest.raises(IntegrityError): + async with sm() as session: + await session.execute( + text( + "INSERT INTO worker_jobs (user_id, job_type, status) " + "VALUES (:u, 'test-smoke-1b-bad', 'running')" + ), + {"u": owner_id}, + ) + await session.commit() + finally: + await engine.dispose() @pytest.mark.asyncio -async def test_check_constraint_rejects_unknown_status(db_session, owner_id, worker_id_unique): - """정정 #5: status 가 enum 4 외 값이면 IntegrityError.""" - with pytest.raises(IntegrityError): - await db_session.execute( - text( - "INSERT INTO worker_jobs (user_id, job_type, status) " - "VALUES (:u, 'test-smoke-1b-bad', 'running')" - ), - {"u": owner_id}, - ) - await db_session.commit() - await db_session.rollback() +async def test_partial_pending_index_exists(owner_id): + """idx_worker_jobs_pending_type partial index 존재 검증.""" + engine = create_async_engine(get_database_url()) + sm = async_sessionmaker(engine, expire_on_commit=False) + try: + async with sm() as session: + res = await session.execute( + text( + "SELECT indexname FROM pg_indexes " + "WHERE tablename = 'worker_jobs' " + "AND indexname = 'idx_worker_jobs_pending_type'" + ) + ) + assert res.scalar_one_or_none() == "idx_worker_jobs_pending_type" + + # 정의에 WHERE status = 'pending' 포함 확인 + res2 = await session.execute( + text( + "SELECT indexdef FROM pg_indexes " + "WHERE indexname = 'idx_worker_jobs_pending_type'" + ) + ) + indexdef = res2.scalar_one() + assert "WHERE" in indexdef + assert "pending" in indexdef + finally: + await engine.dispose() @pytest.mark.asyncio -async def test_partial_pending_index_used_for_claim_query(db_session, owner_id, worker_id_unique): - """partial index idx_worker_jobs_pending_type 가 pending claim 쿼리 실행계획에 사용.""" - # seed 2 rows: pending + completed - await db_session.execute( - text( - "INSERT INTO worker_jobs (user_id, job_type, status) " - "VALUES (:u, 'test-smoke-1b-idx', 'pending'), " - " (:u, 'test-smoke-1b-idx', 'completed')" - ), - {"u": owner_id}, - ) - await db_session.commit() - - # EXPLAIN ANALYZE 가 partial index 사용하는지 확인 (운영 환경에선 seq scan 가능 — 본 테스트는 인덱스 정의 존재만 보장) - res = await db_session.execute( - text( - "SELECT indexname FROM pg_indexes " - "WHERE tablename = 'worker_jobs' AND indexname = 'idx_worker_jobs_pending_type'" - ) - ) - assert res.scalar_one_or_none() == "idx_worker_jobs_pending_type" - - # pending row 만 SELECT 됨 (실제 동작 검증) - res = await db_session.execute( - text( - "SELECT count(*) FROM worker_jobs " - "WHERE status = 'pending' AND job_type = 'test-smoke-1b-idx'" - ) - ) - assert res.scalar_one() == 1 - - -@pytest.mark.asyncio -async def test_on_delete_set_null_when_capability_dropped(db_session, owner_id): - """worker_capabilities 삭제 시 worker_jobs.worker_id 자동 NULL (ON DELETE SET NULL).""" +async def test_on_delete_set_null_when_capability_dropped(owner_id): + """worker_capabilities 삭제 시 worker_jobs.worker_id 자동 NULL.""" wid = f"test-smoke-1b-del-{uuid.uuid4().hex[:8]}" - await db_session.execute( - text( - "INSERT INTO worker_capabilities (worker_id, user_id, device_label, " - "worker_class, tier) VALUES (:w, :u, 'lbl', 'laptop', 'laptop_small')" - ), - {"w": wid, "u": owner_id}, + await insert_worker_capability(wid, owner_id) + job_id = await insert_worker_job( + owner_id, + "test-smoke-1b-del", + status="completed", + worker_id=wid, + attempts=1, ) - job_id = ( - await db_session.execute( - text( - "INSERT INTO worker_jobs (user_id, job_type, status, worker_id, attempts) " - "VALUES (:u, 'test-smoke-1b-del', 'completed', :w, 1) RETURNING id" - ), - {"u": owner_id, "w": wid}, - ) - ).scalar_one() - await db_session.commit() - - # worker_heartbeats CASCADE 가 worker_capabilities 삭제 차단할 수 있으니 사전 정리 - await db_session.execute( - text("DELETE FROM worker_heartbeats WHERE worker_id = :w"), {"w": wid} - ) - await db_session.execute(text("DELETE FROM worker_capabilities WHERE worker_id = :w"), {"w": wid}) - await db_session.commit() - - res = await db_session.execute( - text("SELECT worker_id FROM worker_jobs WHERE id = :i"), {"i": job_id} - ) - assert res.scalar_one() is None - - # cleanup - await db_session.execute(text("DELETE FROM worker_jobs WHERE id = :i"), {"i": job_id}) - await db_session.commit() + try: + # cleanup_worker_capabilities 도 worker_heartbeats CASCADE 처리 + await cleanup_worker_capabilities(wid) + job = await fetch_worker_job(job_id) + assert job is not None + assert job["worker_id"] is None # ON DELETE SET NULL + finally: + await cleanup_worker_jobs("test-smoke-1b")