Files
hyungi_document_server/tests/test_worker_jobs_ownership.py
Hyungi Ahn 0cbd97fcba refactor(worker-pool): Registry-1B test fixture — NullPool helper standalone
각 helper 가 자체 engine + NullPool 사용 (connection 격리). fixture chain 의
asyncpg "another operation in progress" race 회피. 호출 site 단순화.

같은 파일 sequential 실행 시 module-level app + global engine pool 충돌은
별 follow-up `PR-Worker-Pool-Test-Fixture-Isolation` (P3) 영역.

단독 PASS 검증: auth 5/5 + smoke 3/3 + ownership 1/1.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-19 12:43:53 +09:00

80 lines
2.2 KiB
Python

"""PR-Worker-Pool-Registry-1B 정정 #2 — /result 소유권 검증.
worker A 가 claim 한 job 을 worker B 가 /result → 404.
"""
from __future__ import annotations
import os
import sys
import uuid
import pytest
import pytest_asyncio
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "app"))
from httpx import ASGITransport, AsyncClient
from _worker_pool_helpers import (
cleanup_worker_capabilities,
cleanup_worker_jobs,
ensure_user,
fetch_worker_job,
insert_worker_capability,
insert_worker_job,
mint_access_token,
)
@pytest_asyncio.fixture
async def env_setup(monkeypatch):
monkeypatch.setenv("LAPTOP_WORKER_BOT_USERNAME", "laptop-worker-bot")
@pytest_asyncio.fixture
async def worker_token(env_setup):
await ensure_user("laptop-worker-bot")
return mint_access_token("laptop-worker-bot")
@pytest.mark.asyncio
async def test_other_worker_cannot_submit_result(worker_token):
from main import app
owner_id = await ensure_user("test-owner-own-1b")
w_a = f"test-own-1b-a-{uuid.uuid4().hex[:6]}"
w_b = f"test-own-1b-b-{uuid.uuid4().hex[:6]}"
await insert_worker_capability(w_a, owner_id)
await insert_worker_capability(w_b, owner_id)
job_id = await insert_worker_job(
owner_id,
"test-own-1b",
status="processing",
worker_id=w_a,
attempts=1,
claimed=True,
)
try:
async with AsyncClient(
transport=ASGITransport(app=app), base_url="http://test"
) as c:
r = await c.post(
"/internal/worker/result",
json={
"job_id": job_id,
"worker_id": w_b, # 다른 worker
"status": "completed",
"result": {"ok": True},
},
headers={"Authorization": f"Bearer {worker_token}"},
)
assert r.status_code == 404, r.text
job = await fetch_worker_job(job_id)
assert job["status"] == "processing"
assert job["worker_id"] == w_a
finally:
await cleanup_worker_jobs("test-own-1b")
await cleanup_worker_capabilities("test-own-1b")