refactor(worker-pool): Registry-1B test fixture — NullPool helper standalone
각 helper 가 자체 engine + NullPool 사용 (connection 격리). fixture chain 의 asyncpg "another operation in progress" race 회피. 호출 site 단순화. 같은 파일 sequential 실행 시 module-level app + global engine pool 충돌은 별 follow-up `PR-Worker-Pool-Test-Fixture-Isolation` (P3) 영역. 단독 PASS 검증: auth 5/5 + smoke 3/3 + ownership 1/1. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
+207
-47
@@ -1,13 +1,10 @@
|
||||
"""PR-Worker-Pool-Registry-1B 통합 테스트용 헬퍼.
|
||||
|
||||
각 테스트 파일이 import 해서 자체 fixture 안에서 호출. conftest.py 갱신은 회피
|
||||
(타 테스트 영향 가능성, [[feedback_residue_grep_live_vs_history]] 정신).
|
||||
각 helper 가 자체 engine + session 사용. fixture 의 db_session 과 격리하여
|
||||
asyncpg "another operation in progress" race 회피. 호출 site 는 단순:
|
||||
|
||||
사용 예:
|
||||
from _worker_pool_helpers import (
|
||||
get_database_url, mint_access_token,
|
||||
ensure_user, cleanup_worker_jobs, cleanup_worker_capabilities,
|
||||
)
|
||||
user_id = await ensure_user("test-user")
|
||||
await cleanup_worker_jobs("test-prefix")
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
@@ -22,7 +19,8 @@ from datetime import datetime, timedelta, timezone
|
||||
|
||||
from jose import jwt
|
||||
from sqlalchemy import text
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
|
||||
from sqlalchemy.pool import NullPool
|
||||
|
||||
|
||||
def get_database_url() -> str:
|
||||
@@ -32,8 +30,13 @@ def get_database_url() -> str:
|
||||
)
|
||||
|
||||
|
||||
def _make_engine():
|
||||
"""매 helper 호출 시 새 engine + NullPool (connection 격리, dispose 안전)."""
|
||||
return create_async_engine(get_database_url(), poolclass=NullPool)
|
||||
|
||||
|
||||
def mint_access_token(username: str, expires_minutes: int = 60) -> str:
|
||||
"""test 용 JWT access token (`core.auth.create_access_token` 와 동일 페이로드)."""
|
||||
"""test 용 JWT access token (`core.auth.create_access_token` 동형)."""
|
||||
from core.config import settings
|
||||
|
||||
now = datetime.now(timezone.utc)
|
||||
@@ -46,47 +49,204 @@ def mint_access_token(username: str, expires_minutes: int = 60) -> str:
|
||||
return jwt.encode(payload, settings.jwt_secret, algorithm="HS256")
|
||||
|
||||
|
||||
async def ensure_user(
|
||||
session: AsyncSession, username: str, is_active: bool = True
|
||||
) -> int:
|
||||
"""users row 존재 보장 + id 반환. 비밀번호 = random bcrypt hash."""
|
||||
async def ensure_user(username: str, is_active: bool = True) -> int:
|
||||
"""users row 존재 보장 + id 반환. 자체 engine 사용 (fixture session 비격리)."""
|
||||
from core.auth import hash_password
|
||||
|
||||
result = await session.execute(
|
||||
text("SELECT id FROM users WHERE username = :u"), {"u": username}
|
||||
)
|
||||
row = result.first()
|
||||
if row is not None:
|
||||
return int(row[0])
|
||||
|
||||
h = hash_password(secrets.token_urlsafe(32))
|
||||
inserted = await session.execute(
|
||||
text(
|
||||
"INSERT INTO users (username, password_hash, is_active, password_changed_at) "
|
||||
"VALUES (:u, :h, :a, NOW()) RETURNING id"
|
||||
),
|
||||
{"u": username, "h": h, "a": is_active},
|
||||
)
|
||||
new_id = int(inserted.scalar_one())
|
||||
await session.commit()
|
||||
return new_id
|
||||
engine = _make_engine()
|
||||
sm = async_sessionmaker(engine, expire_on_commit=False)
|
||||
try:
|
||||
async with sm() as session:
|
||||
result = await session.execute(
|
||||
text("SELECT id FROM users WHERE username = :u"), {"u": username}
|
||||
)
|
||||
row = result.first()
|
||||
if row is not None:
|
||||
return int(row[0])
|
||||
h = hash_password(secrets.token_urlsafe(32))
|
||||
inserted = await session.execute(
|
||||
text(
|
||||
"INSERT INTO users (username, password_hash, is_active, password_changed_at) "
|
||||
"VALUES (:u, :h, :a, NOW()) RETURNING id"
|
||||
),
|
||||
{"u": username, "h": h, "a": is_active},
|
||||
)
|
||||
new_id = int(inserted.scalar_one())
|
||||
await session.commit()
|
||||
return new_id
|
||||
finally:
|
||||
await engine.dispose()
|
||||
|
||||
|
||||
async def cleanup_worker_jobs(session: AsyncSession, job_type_prefix: str) -> None:
|
||||
await session.execute(
|
||||
text("DELETE FROM worker_jobs WHERE job_type LIKE :p"),
|
||||
{"p": f"{job_type_prefix}%"},
|
||||
)
|
||||
await session.commit()
|
||||
async def cleanup_worker_jobs(job_type_prefix: str) -> None:
|
||||
engine = _make_engine()
|
||||
sm = async_sessionmaker(engine, expire_on_commit=False)
|
||||
try:
|
||||
async with sm() as session:
|
||||
await session.execute(
|
||||
text("DELETE FROM worker_jobs WHERE job_type LIKE :p"),
|
||||
{"p": f"{job_type_prefix}%"},
|
||||
)
|
||||
await session.commit()
|
||||
finally:
|
||||
await engine.dispose()
|
||||
|
||||
|
||||
async def cleanup_worker_capabilities(session: AsyncSession, worker_id_prefix: str) -> None:
|
||||
await session.execute(
|
||||
text("DELETE FROM worker_heartbeats WHERE worker_id LIKE :p"),
|
||||
{"p": f"{worker_id_prefix}%"},
|
||||
)
|
||||
await session.execute(
|
||||
text("DELETE FROM worker_capabilities WHERE worker_id LIKE :p"),
|
||||
{"p": f"{worker_id_prefix}%"},
|
||||
)
|
||||
await session.commit()
|
||||
async def cleanup_worker_capabilities(worker_id_prefix: str) -> None:
|
||||
engine = _make_engine()
|
||||
sm = async_sessionmaker(engine, expire_on_commit=False)
|
||||
try:
|
||||
async with sm() as session:
|
||||
await session.execute(
|
||||
text("DELETE FROM worker_heartbeats WHERE worker_id LIKE :p"),
|
||||
{"p": f"{worker_id_prefix}%"},
|
||||
)
|
||||
await session.execute(
|
||||
text("DELETE FROM worker_capabilities WHERE worker_id LIKE :p"),
|
||||
{"p": f"{worker_id_prefix}%"},
|
||||
)
|
||||
await session.commit()
|
||||
finally:
|
||||
await engine.dispose()
|
||||
|
||||
|
||||
async def insert_worker_capability(worker_id: str, user_id: int) -> None:
|
||||
"""worker_capabilities row INSERT (FK source). 자체 engine."""
|
||||
engine = _make_engine()
|
||||
sm = async_sessionmaker(engine, expire_on_commit=False)
|
||||
try:
|
||||
async with sm() as session:
|
||||
await session.execute(
|
||||
text(
|
||||
"INSERT INTO worker_capabilities (worker_id, user_id, device_label, "
|
||||
"worker_class, tier) VALUES (:w, :u, 'lbl', 'laptop', 'laptop_small') "
|
||||
"ON CONFLICT (worker_id) DO NOTHING"
|
||||
),
|
||||
{"w": worker_id, "u": user_id},
|
||||
)
|
||||
await session.commit()
|
||||
finally:
|
||||
await engine.dispose()
|
||||
|
||||
|
||||
async def insert_worker_job(
|
||||
user_id: int,
|
||||
job_type: str,
|
||||
status: str = "pending",
|
||||
worker_id: str | None = None,
|
||||
attempts: int = 0,
|
||||
max_attempts: int = 3,
|
||||
payload: str = "{}",
|
||||
claimed: bool = False,
|
||||
) -> int:
|
||||
"""worker_jobs row INSERT + id 반환. 자체 engine."""
|
||||
engine = _make_engine()
|
||||
sm = async_sessionmaker(engine, expire_on_commit=False)
|
||||
try:
|
||||
async with sm() as session:
|
||||
cols = ["user_id", "job_type", "status", "attempts", "max_attempts", "payload"]
|
||||
vals = {"u": user_id, "j": job_type, "s": status, "a": attempts, "m": max_attempts, "p": payload}
|
||||
placeholders = [":u", ":j", ":s", ":a", ":m", ":p::jsonb"]
|
||||
if worker_id is not None:
|
||||
cols.append("worker_id")
|
||||
vals["w"] = worker_id
|
||||
placeholders.append(":w")
|
||||
if claimed:
|
||||
cols.append("claimed_at")
|
||||
placeholders.append("NOW()")
|
||||
stmt = (
|
||||
f"INSERT INTO worker_jobs ({', '.join(cols)}) "
|
||||
f"VALUES ({', '.join(placeholders)}) RETURNING id"
|
||||
)
|
||||
res = await session.execute(text(stmt), vals)
|
||||
job_id = int(res.scalar_one())
|
||||
await session.commit()
|
||||
return job_id
|
||||
finally:
|
||||
await engine.dispose()
|
||||
|
||||
|
||||
async def fetch_worker_job(job_id: int) -> dict | None:
|
||||
"""worker_jobs row 1건 조회. 자체 engine."""
|
||||
engine = _make_engine()
|
||||
sm = async_sessionmaker(engine, expire_on_commit=False)
|
||||
try:
|
||||
async with sm() as session:
|
||||
res = await session.execute(
|
||||
text(
|
||||
"SELECT id, status, worker_id, attempts, claimed_at, completed_at, "
|
||||
"result, error_message FROM worker_jobs WHERE id = :i"
|
||||
),
|
||||
{"i": job_id},
|
||||
)
|
||||
row = res.first()
|
||||
if row is None:
|
||||
return None
|
||||
return {
|
||||
"id": row[0],
|
||||
"status": row[1],
|
||||
"worker_id": row[2],
|
||||
"attempts": row[3],
|
||||
"claimed_at": row[4],
|
||||
"completed_at": row[5],
|
||||
"result": row[6],
|
||||
"error_message": row[7],
|
||||
}
|
||||
finally:
|
||||
await engine.dispose()
|
||||
|
||||
|
||||
async def fetch_latest_heartbeat(worker_id: str) -> dict | None:
|
||||
engine = _make_engine()
|
||||
sm = async_sessionmaker(engine, expire_on_commit=False)
|
||||
try:
|
||||
async with sm() as session:
|
||||
res = await session.execute(
|
||||
text(
|
||||
"SELECT status, raw_payload FROM worker_heartbeats "
|
||||
"WHERE worker_id = :w ORDER BY heartbeat_at DESC LIMIT 1"
|
||||
),
|
||||
{"w": worker_id},
|
||||
)
|
||||
row = res.first()
|
||||
if row is None:
|
||||
return None
|
||||
return {"status": row[0], "raw_payload": row[1]}
|
||||
finally:
|
||||
await engine.dispose()
|
||||
|
||||
|
||||
async def fetch_worker_capability(worker_id: str) -> dict | None:
|
||||
engine = _make_engine()
|
||||
sm = async_sessionmaker(engine, expire_on_commit=False)
|
||||
try:
|
||||
async with sm() as session:
|
||||
res = await session.execute(
|
||||
text(
|
||||
"SELECT device_label, last_registered_at FROM worker_capabilities "
|
||||
"WHERE worker_id = :w"
|
||||
),
|
||||
{"w": worker_id},
|
||||
)
|
||||
row = res.first()
|
||||
if row is None:
|
||||
return None
|
||||
return {"device_label": row[0], "last_registered_at": row[1]}
|
||||
finally:
|
||||
await engine.dispose()
|
||||
|
||||
|
||||
async def count_worker_capability(worker_id: str) -> int:
|
||||
engine = _make_engine()
|
||||
sm = async_sessionmaker(engine, expire_on_commit=False)
|
||||
try:
|
||||
async with sm() as session:
|
||||
res = await session.execute(
|
||||
text(
|
||||
"SELECT count(*) FROM worker_capabilities WHERE worker_id = :w"
|
||||
),
|
||||
{"w": worker_id},
|
||||
)
|
||||
return int(res.scalar_one())
|
||||
finally:
|
||||
await engine.dispose()
|
||||
|
||||
@@ -1,10 +1,8 @@
|
||||
"""PR-Worker-Pool-Registry-1B — /internal/worker/* 권한 분리 (정정 #2 보조).
|
||||
"""PR-Worker-Pool-Registry-1B — /internal/worker/* 권한 분리.
|
||||
|
||||
worker user 외 모든 사용자 = 403.
|
||||
1. voice-memo-bot JWT → 403
|
||||
2. 일반 user JWT → 403
|
||||
|
||||
(401 = token 자체 invalid 시. 본 테스트는 토큰 자체는 유효 + 권한만 부족.)
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
@@ -18,20 +16,25 @@ import pytest_asyncio
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "app"))
|
||||
|
||||
from httpx import ASGITransport, AsyncClient
|
||||
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
|
||||
|
||||
from _worker_pool_helpers import ensure_user, get_database_url, mint_access_token
|
||||
from _worker_pool_helpers import ensure_user, mint_access_token
|
||||
|
||||
|
||||
@pytest_asyncio.fixture
|
||||
async def setup(monkeypatch):
|
||||
async def env_setup(monkeypatch):
|
||||
monkeypatch.setenv("LAPTOP_WORKER_BOT_USERNAME", "laptop-worker-bot")
|
||||
engine = create_async_engine(get_database_url())
|
||||
sm = async_sessionmaker(engine, expire_on_commit=False)
|
||||
async with sm() as session:
|
||||
await ensure_user(session, "voice-memo-bot")
|
||||
await ensure_user(session, "test-regular-user-1b")
|
||||
await engine.dispose()
|
||||
|
||||
|
||||
@pytest_asyncio.fixture
|
||||
async def voice_memo_token(env_setup):
|
||||
await ensure_user("voice-memo-bot")
|
||||
return mint_access_token("voice-memo-bot")
|
||||
|
||||
|
||||
@pytest_asyncio.fixture
|
||||
async def regular_user_token(env_setup):
|
||||
await ensure_user("test-regular-user-1b")
|
||||
return mint_access_token("test-regular-user-1b")
|
||||
|
||||
|
||||
@pytest_asyncio.fixture
|
||||
@@ -39,37 +42,26 @@ async def client():
|
||||
from main import app
|
||||
|
||||
async with AsyncClient(
|
||||
transport=ASGITransport(app=app),
|
||||
base_url="http://test",
|
||||
transport=ASGITransport(app=app), base_url="http://test"
|
||||
) as ac:
|
||||
yield ac
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_voice_memo_bot_jwt_rejected(client, setup):
|
||||
"""voice-memo-bot JWT 로 /internal/worker/register 호출 → 403."""
|
||||
token = mint_access_token("voice-memo-bot")
|
||||
async def test_voice_memo_bot_jwt_rejected(client, voice_memo_token):
|
||||
r = await client.post(
|
||||
"/internal/worker/register",
|
||||
json={
|
||||
"worker_id": "x",
|
||||
"device_label": "x",
|
||||
"worker_class": "x",
|
||||
"tier": "x",
|
||||
},
|
||||
headers={"Authorization": f"Bearer {token}"},
|
||||
json={"worker_id": "x", "device_label": "x", "worker_class": "x", "tier": "x"},
|
||||
headers={"Authorization": f"Bearer {voice_memo_token}"},
|
||||
)
|
||||
assert r.status_code == 403, r.text
|
||||
assert "worker user" in r.json().get("detail", "").lower()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_regular_user_jwt_rejected(client, setup):
|
||||
"""일반 user JWT 로 /internal/worker/heartbeat 호출 → 403."""
|
||||
token = mint_access_token("test-regular-user-1b")
|
||||
async def test_regular_user_jwt_rejected(client, regular_user_token):
|
||||
r = await client.post(
|
||||
"/internal/worker/heartbeat",
|
||||
json={"worker_id": "x", "status": "available"},
|
||||
headers={"Authorization": f"Bearer {token}"},
|
||||
headers={"Authorization": f"Bearer {regular_user_token}"},
|
||||
)
|
||||
assert r.status_code == 403, r.text
|
||||
|
||||
@@ -1,13 +1,12 @@
|
||||
"""PR-Worker-Pool-Registry-1B — /internal/worker/* 5 endpoint 정상 동작 검증.
|
||||
"""PR-Worker-Pool-Registry-1B — /internal/worker/* 5 endpoint 정상 동작.
|
||||
|
||||
5 항목:
|
||||
1. /register UPSERT — 같은 worker_id 두 번 호출하면 단일 row 유지 + last_registered_at 갱신.
|
||||
2. /heartbeat — INSERT row 추가 + status='available' 저장 확인.
|
||||
3. /claim — queue empty 시 204 + body 0. pending row 있으면 200 + payload + status='processing'.
|
||||
4. /result — completed 시 status='completed' + result JSONB 저장.
|
||||
5. /drain — heartbeat row status='draining' INSERT.
|
||||
|
||||
DB integration. GPU docker compose exec fastapi pytest 환경 가정.
|
||||
6 항목 (claim 두 경로 분리):
|
||||
1. /register UPSERT — 같은 worker_id 두번 호출 시 row 1 + device_label 갱신.
|
||||
2. /heartbeat — row INSERT + status='available' 저장.
|
||||
3. /claim — queue empty 시 204 + body 0 (정정 #4).
|
||||
4. /claim — pending row 1건 + 200 + status='processing' 전이.
|
||||
5. /result completed — status='completed' + result JSONB 저장.
|
||||
6. /drain — heartbeat row status='draining' INSERT.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
@@ -22,26 +21,44 @@ import pytest_asyncio
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "app"))
|
||||
|
||||
from httpx import ASGITransport, AsyncClient
|
||||
from sqlalchemy import text
|
||||
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
|
||||
|
||||
from _worker_pool_helpers import (
|
||||
cleanup_worker_capabilities,
|
||||
cleanup_worker_jobs,
|
||||
count_worker_capability,
|
||||
ensure_user,
|
||||
get_database_url,
|
||||
fetch_latest_heartbeat,
|
||||
fetch_worker_capability,
|
||||
fetch_worker_job,
|
||||
insert_worker_capability,
|
||||
insert_worker_job,
|
||||
mint_access_token,
|
||||
)
|
||||
|
||||
|
||||
@pytest_asyncio.fixture
|
||||
async def db_session(monkeypatch):
|
||||
async def env_setup(monkeypatch):
|
||||
monkeypatch.setenv("LAPTOP_WORKER_BOT_USERNAME", "laptop-worker-bot")
|
||||
engine = create_async_engine(get_database_url())
|
||||
sm = async_sessionmaker(engine, expire_on_commit=False)
|
||||
async with sm() as session:
|
||||
yield session
|
||||
await engine.dispose()
|
||||
|
||||
|
||||
@pytest_asyncio.fixture
|
||||
async def worker_token(env_setup):
|
||||
await ensure_user("laptop-worker-bot")
|
||||
return mint_access_token("laptop-worker-bot")
|
||||
|
||||
|
||||
@pytest_asyncio.fixture
|
||||
async def owner_id():
|
||||
return await ensure_user("test-owner-ep-1b")
|
||||
|
||||
|
||||
@pytest_asyncio.fixture
|
||||
async def worker_id_with_capability(owner_id):
|
||||
wid = f"test-ep-1b-{uuid.uuid4().hex[:8]}"
|
||||
await insert_worker_capability(wid, owner_id)
|
||||
yield wid
|
||||
await cleanup_worker_jobs("test-ep-1b")
|
||||
await cleanup_worker_capabilities("test-ep-1b")
|
||||
|
||||
|
||||
@pytest_asyncio.fixture
|
||||
@@ -49,227 +66,128 @@ async def client():
|
||||
from main import app
|
||||
|
||||
async with AsyncClient(
|
||||
transport=ASGITransport(app=app),
|
||||
base_url="http://test",
|
||||
transport=ASGITransport(app=app), base_url="http://test"
|
||||
) as ac:
|
||||
yield ac
|
||||
|
||||
|
||||
@pytest_asyncio.fixture
|
||||
async def worker_token(db_session):
|
||||
await ensure_user(db_session, "laptop-worker-bot")
|
||||
return mint_access_token("laptop-worker-bot")
|
||||
|
||||
|
||||
@pytest_asyncio.fixture
|
||||
async def owner_id(db_session):
|
||||
return await ensure_user(db_session, "test-owner-user-1b")
|
||||
|
||||
|
||||
@pytest_asyncio.fixture
|
||||
async def worker_id_unique():
|
||||
wid = f"test-mbp-1b-{uuid.uuid4().hex[:8]}"
|
||||
yield wid
|
||||
|
||||
|
||||
@pytest_asyncio.fixture
|
||||
async def cleanup_after(db_session, worker_id_unique):
|
||||
yield
|
||||
await cleanup_worker_jobs(db_session, "test-rec-1b")
|
||||
await cleanup_worker_capabilities(db_session, worker_id_unique)
|
||||
|
||||
|
||||
def _auth(token: str) -> dict:
|
||||
return {"Authorization": f"Bearer {token}"}
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_register_upsert(client, db_session, worker_token, worker_id_unique, cleanup_after):
|
||||
body = {
|
||||
"worker_id": worker_id_unique,
|
||||
"device_label": "MacBook Pro M3 Pro",
|
||||
"worker_class": "laptop",
|
||||
"tier": "laptop_small",
|
||||
"capabilities": ["short_summary"],
|
||||
"models_loaded": ["gemma-3-4b-it"],
|
||||
}
|
||||
r1 = await client.post("/internal/worker/register", json=body, headers=_auth(worker_token))
|
||||
assert r1.status_code == 200, r1.text
|
||||
# 두번째 호출 = upsert 갱신
|
||||
body["device_label"] = "MacBook Pro M3 Pro (renamed)"
|
||||
r2 = await client.post("/internal/worker/register", json=body, headers=_auth(worker_token))
|
||||
assert r2.status_code == 200
|
||||
res = await db_session.execute(
|
||||
text(
|
||||
"SELECT device_label, last_registered_at FROM worker_capabilities WHERE worker_id = :w"
|
||||
),
|
||||
{"w": worker_id_unique},
|
||||
)
|
||||
rows = res.all()
|
||||
assert len(rows) == 1
|
||||
assert rows[0][0] == "MacBook Pro M3 Pro (renamed)"
|
||||
async def test_register_upsert(client, worker_token):
|
||||
wid = f"test-ep-1b-reg-{uuid.uuid4().hex[:6]}"
|
||||
try:
|
||||
body = {
|
||||
"worker_id": wid,
|
||||
"device_label": "MacBook Pro M3 Pro",
|
||||
"worker_class": "laptop",
|
||||
"tier": "laptop_small",
|
||||
"capabilities": ["short_summary"],
|
||||
"models_loaded": ["gemma-3-4b-it"],
|
||||
}
|
||||
r1 = await client.post("/internal/worker/register", json=body, headers=_auth(worker_token))
|
||||
assert r1.status_code == 200, r1.text
|
||||
body["device_label"] = "MacBook Pro M3 Pro (renamed)"
|
||||
r2 = await client.post("/internal/worker/register", json=body, headers=_auth(worker_token))
|
||||
assert r2.status_code == 200
|
||||
assert await count_worker_capability(wid) == 1
|
||||
cap = await fetch_worker_capability(wid)
|
||||
assert cap is not None
|
||||
assert cap["device_label"] == "MacBook Pro M3 Pro (renamed)"
|
||||
finally:
|
||||
await cleanup_worker_capabilities(wid)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_heartbeat_insert(
|
||||
client, db_session, worker_token, worker_id_unique, owner_id, cleanup_after
|
||||
):
|
||||
# 사전: worker_capabilities row 필요 (FK)
|
||||
await db_session.execute(
|
||||
text(
|
||||
"INSERT INTO worker_capabilities (worker_id, user_id, device_label, "
|
||||
"worker_class, tier) VALUES (:w, :u, 'lbl', 'laptop', 'laptop_small') "
|
||||
"ON CONFLICT (worker_id) DO NOTHING"
|
||||
),
|
||||
{"w": worker_id_unique, "u": owner_id},
|
||||
)
|
||||
await db_session.commit()
|
||||
|
||||
async def test_heartbeat_insert(client, worker_token, worker_id_with_capability):
|
||||
wid = worker_id_with_capability
|
||||
r = await client.post(
|
||||
"/internal/worker/heartbeat",
|
||||
json={"worker_id": worker_id_unique, "status": "available"},
|
||||
json={"worker_id": wid, "status": "available"},
|
||||
headers=_auth(worker_token),
|
||||
)
|
||||
assert r.status_code == 200
|
||||
res = await db_session.execute(
|
||||
text(
|
||||
"SELECT status FROM worker_heartbeats WHERE worker_id = :w ORDER BY heartbeat_at DESC LIMIT 1"
|
||||
),
|
||||
{"w": worker_id_unique},
|
||||
)
|
||||
assert res.scalar_one() == "available"
|
||||
assert r.status_code == 200, r.text
|
||||
hb = await fetch_latest_heartbeat(wid)
|
||||
assert hb is not None
|
||||
assert hb["status"] == "available"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_claim_204_when_empty(client, worker_token, cleanup_after):
|
||||
async def test_claim_204_when_empty(client, worker_token):
|
||||
r = await client.post(
|
||||
"/internal/worker/claim",
|
||||
json={"worker_id": "test-empty-1b", "job_type": "test-rec-1b-empty"},
|
||||
json={"worker_id": "test-ep-1b-empty", "job_type": "test-ep-1b-NOEXIST"},
|
||||
headers=_auth(worker_token),
|
||||
)
|
||||
assert r.status_code == 204
|
||||
# 정정 #4: body 0
|
||||
assert r.content == b""
|
||||
assert r.content == b"" # 정정 #4
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_claim_processing_transition(
|
||||
client, db_session, worker_token, worker_id_unique, owner_id, cleanup_after
|
||||
client, worker_token, worker_id_with_capability, owner_id
|
||||
):
|
||||
# seed worker_capabilities + worker_jobs
|
||||
await db_session.execute(
|
||||
text(
|
||||
"INSERT INTO worker_capabilities (worker_id, user_id, device_label, "
|
||||
"worker_class, tier) VALUES (:w, :u, 'lbl', 'laptop', 'laptop_small') "
|
||||
"ON CONFLICT (worker_id) DO NOTHING"
|
||||
),
|
||||
{"w": worker_id_unique, "u": owner_id},
|
||||
)
|
||||
jt = "test-rec-1b-claim"
|
||||
job_id = (
|
||||
await db_session.execute(
|
||||
text(
|
||||
"INSERT INTO worker_jobs (user_id, job_type, payload) "
|
||||
"VALUES (:u, :j, '{\"week\":\"2026-W20\"}'::jsonb) RETURNING id"
|
||||
),
|
||||
{"u": owner_id, "j": jt},
|
||||
)
|
||||
).scalar_one()
|
||||
await db_session.commit()
|
||||
wid = worker_id_with_capability
|
||||
jt = "test-ep-1b-claim"
|
||||
job_id = await insert_worker_job(owner_id, jt, payload='{"week":"2026-W20"}')
|
||||
|
||||
r = await client.post(
|
||||
"/internal/worker/claim",
|
||||
json={"worker_id": worker_id_unique, "job_type": jt},
|
||||
json={"worker_id": wid, "job_type": jt},
|
||||
headers=_auth(worker_token),
|
||||
)
|
||||
assert r.status_code == 200
|
||||
assert r.status_code == 200, r.text
|
||||
js = r.json()
|
||||
assert js["id"] == job_id
|
||||
assert js["job_type"] == jt
|
||||
assert js["attempts"] == 1
|
||||
assert js["payload"]["week"] == "2026-W20"
|
||||
|
||||
res = await db_session.execute(
|
||||
text("SELECT status, worker_id, attempts FROM worker_jobs WHERE id = :i"),
|
||||
{"i": job_id},
|
||||
)
|
||||
status_, w_id, attempts = res.first()
|
||||
assert status_ == "processing"
|
||||
assert w_id == worker_id_unique
|
||||
assert attempts == 1
|
||||
job = await fetch_worker_job(job_id)
|
||||
assert job["status"] == "processing"
|
||||
assert job["worker_id"] == wid
|
||||
assert job["attempts"] == 1
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_result_completed(
|
||||
client, db_session, worker_token, worker_id_unique, owner_id, cleanup_after
|
||||
):
|
||||
# seed
|
||||
await db_session.execute(
|
||||
text(
|
||||
"INSERT INTO worker_capabilities (worker_id, user_id, device_label, "
|
||||
"worker_class, tier) VALUES (:w, :u, 'lbl', 'laptop', 'laptop_small') "
|
||||
"ON CONFLICT (worker_id) DO NOTHING"
|
||||
),
|
||||
{"w": worker_id_unique, "u": owner_id},
|
||||
async def test_result_completed(client, worker_token, worker_id_with_capability, owner_id):
|
||||
wid = worker_id_with_capability
|
||||
job_id = await insert_worker_job(
|
||||
owner_id,
|
||||
"test-ep-1b-result",
|
||||
status="processing",
|
||||
worker_id=wid,
|
||||
attempts=1,
|
||||
claimed=True,
|
||||
)
|
||||
jt = "test-rec-1b-result"
|
||||
job_id = (
|
||||
await db_session.execute(
|
||||
text(
|
||||
"INSERT INTO worker_jobs (user_id, job_type, status, worker_id, "
|
||||
"attempts, claimed_at) VALUES (:u, :j, 'processing', :w, 1, NOW()) RETURNING id"
|
||||
),
|
||||
{"u": owner_id, "j": jt, "w": worker_id_unique},
|
||||
)
|
||||
).scalar_one()
|
||||
await db_session.commit()
|
||||
|
||||
r = await client.post(
|
||||
"/internal/worker/result",
|
||||
json={
|
||||
"job_id": job_id,
|
||||
"worker_id": worker_id_unique,
|
||||
"worker_id": wid,
|
||||
"status": "completed",
|
||||
"result": {"summary": "test"},
|
||||
},
|
||||
headers=_auth(worker_token),
|
||||
)
|
||||
assert r.status_code == 200
|
||||
res = await db_session.execute(
|
||||
text("SELECT status, result FROM worker_jobs WHERE id = :i"), {"i": job_id}
|
||||
)
|
||||
s, result_jsonb = res.first()
|
||||
assert s == "completed"
|
||||
assert result_jsonb == {"summary": "test"}
|
||||
assert r.status_code == 200, r.text
|
||||
job = await fetch_worker_job(job_id)
|
||||
assert job["status"] == "completed"
|
||||
assert job["result"] == {"summary": "test"}
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_drain_heartbeat_insert(
|
||||
client, db_session, worker_token, worker_id_unique, owner_id, cleanup_after
|
||||
):
|
||||
await db_session.execute(
|
||||
text(
|
||||
"INSERT INTO worker_capabilities (worker_id, user_id, device_label, "
|
||||
"worker_class, tier) VALUES (:w, :u, 'lbl', 'laptop', 'laptop_small') "
|
||||
"ON CONFLICT (worker_id) DO NOTHING"
|
||||
),
|
||||
{"w": worker_id_unique, "u": owner_id},
|
||||
)
|
||||
await db_session.commit()
|
||||
|
||||
async def test_drain_heartbeat_insert(client, worker_token, worker_id_with_capability):
|
||||
wid = worker_id_with_capability
|
||||
r = await client.post(
|
||||
"/internal/worker/drain",
|
||||
json={"worker_id": worker_id_unique, "reason": "sleep"},
|
||||
json={"worker_id": wid, "reason": "sleep"},
|
||||
headers=_auth(worker_token),
|
||||
)
|
||||
assert r.status_code == 200
|
||||
res = await db_session.execute(
|
||||
text(
|
||||
"SELECT status, raw_payload FROM worker_heartbeats WHERE worker_id = :w "
|
||||
"ORDER BY heartbeat_at DESC LIMIT 1"
|
||||
),
|
||||
{"w": worker_id_unique},
|
||||
)
|
||||
s, rp = res.first()
|
||||
assert s == "draining"
|
||||
assert rp == {"reason": "sleep"}
|
||||
hb = await fetch_latest_heartbeat(wid)
|
||||
assert hb is not None
|
||||
assert hb["status"] == "draining"
|
||||
assert hb["raw_payload"] == {"reason": "sleep"}
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
"""PR-Worker-Pool-Registry-1B 정정 #2 — /result 소유권 검증.
|
||||
|
||||
worker A 가 claim 한 job 을 worker B 가 /result 호출 → 404.
|
||||
worker A 가 claim 한 job 을 worker B 가 /result → 404.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
@@ -15,61 +15,46 @@ import pytest_asyncio
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "app"))
|
||||
|
||||
from httpx import ASGITransport, AsyncClient
|
||||
from sqlalchemy import text
|
||||
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
|
||||
|
||||
from _worker_pool_helpers import (
|
||||
cleanup_worker_capabilities,
|
||||
cleanup_worker_jobs,
|
||||
ensure_user,
|
||||
get_database_url,
|
||||
fetch_worker_job,
|
||||
insert_worker_capability,
|
||||
insert_worker_job,
|
||||
mint_access_token,
|
||||
)
|
||||
|
||||
|
||||
@pytest_asyncio.fixture
|
||||
async def db_session(monkeypatch):
|
||||
async def env_setup(monkeypatch):
|
||||
monkeypatch.setenv("LAPTOP_WORKER_BOT_USERNAME", "laptop-worker-bot")
|
||||
engine = create_async_engine(get_database_url())
|
||||
sm = async_sessionmaker(engine, expire_on_commit=False)
|
||||
async with sm() as session:
|
||||
yield session
|
||||
await engine.dispose()
|
||||
|
||||
|
||||
@pytest_asyncio.fixture
|
||||
async def worker_token(db_session):
|
||||
await ensure_user(db_session, "laptop-worker-bot")
|
||||
async def worker_token(env_setup):
|
||||
await ensure_user("laptop-worker-bot")
|
||||
return mint_access_token("laptop-worker-bot")
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_other_worker_cannot_submit_result(db_session, worker_token):
|
||||
"""worker A 의 processing job → worker B 가 /result → 404."""
|
||||
async def test_other_worker_cannot_submit_result(worker_token):
|
||||
from main import app
|
||||
|
||||
owner_id = await ensure_user(db_session, "test-owner-own-1b")
|
||||
owner_id = await ensure_user("test-owner-own-1b")
|
||||
w_a = f"test-own-1b-a-{uuid.uuid4().hex[:6]}"
|
||||
w_b = f"test-own-1b-b-{uuid.uuid4().hex[:6]}"
|
||||
for w in (w_a, w_b):
|
||||
await db_session.execute(
|
||||
text(
|
||||
"INSERT INTO worker_capabilities (worker_id, user_id, device_label, "
|
||||
"worker_class, tier) VALUES (:w, :u, 'lbl', 'laptop', 'laptop_small')"
|
||||
),
|
||||
{"w": w, "u": owner_id},
|
||||
)
|
||||
job_id = (
|
||||
await db_session.execute(
|
||||
text(
|
||||
"INSERT INTO worker_jobs (user_id, job_type, status, worker_id, "
|
||||
"attempts, claimed_at) VALUES (:u, 'test-own-1b', 'processing', :w, 1, NOW()) "
|
||||
"RETURNING id"
|
||||
),
|
||||
{"u": owner_id, "w": w_a},
|
||||
)
|
||||
).scalar_one()
|
||||
await db_session.commit()
|
||||
await insert_worker_capability(w_a, owner_id)
|
||||
await insert_worker_capability(w_b, owner_id)
|
||||
job_id = await insert_worker_job(
|
||||
owner_id,
|
||||
"test-own-1b",
|
||||
status="processing",
|
||||
worker_id=w_a,
|
||||
attempts=1,
|
||||
claimed=True,
|
||||
)
|
||||
|
||||
try:
|
||||
async with AsyncClient(
|
||||
@@ -79,21 +64,16 @@ async def test_other_worker_cannot_submit_result(db_session, worker_token):
|
||||
"/internal/worker/result",
|
||||
json={
|
||||
"job_id": job_id,
|
||||
"worker_id": w_b,
|
||||
"worker_id": w_b, # 다른 worker
|
||||
"status": "completed",
|
||||
"result": {"ok": True},
|
||||
},
|
||||
headers={"Authorization": f"Bearer {worker_token}"},
|
||||
)
|
||||
assert r.status_code == 404, r.text
|
||||
# job 은 여전히 processing (worker_id=w_a)
|
||||
res = await db_session.execute(
|
||||
text("SELECT status, worker_id FROM worker_jobs WHERE id = :i"),
|
||||
{"i": job_id},
|
||||
)
|
||||
s, w_id = res.first()
|
||||
assert s == "processing"
|
||||
assert w_id == w_a
|
||||
job = await fetch_worker_job(job_id)
|
||||
assert job["status"] == "processing"
|
||||
assert job["worker_id"] == w_a
|
||||
finally:
|
||||
await cleanup_worker_jobs(db_session, "test-own-1b")
|
||||
await cleanup_worker_capabilities(db_session, "test-own-1b")
|
||||
await cleanup_worker_jobs("test-own-1b")
|
||||
await cleanup_worker_capabilities("test-own-1b")
|
||||
|
||||
@@ -1,11 +1,9 @@
|
||||
"""PR-Worker-Pool-Registry-1B 정정 #3 — explicit /result failed 재시도 정책.
|
||||
"""PR-Worker-Pool-Registry-1B 정정 #3 — explicit /result failed 재시도.
|
||||
|
||||
3 case:
|
||||
A. initial attempts=0, max_attempts=3 → claim 후 attempts=1 → failed → status='pending' 복귀.
|
||||
(worker_id=NULL, claimed_at=NULL, completed_at=NULL, error_message 저장)
|
||||
B. initial attempts=2, max_attempts=3 → claim 후 attempts=3 → failed → final 'failed'.
|
||||
(completed_at=now)
|
||||
C. failed 요청에 result={"partial":"..."} 포함 → DB result IS NULL 유지 (request.result 무시).
|
||||
A. initial attempts=0, max=3 → claim 후 attempts=1 → failed → status='pending' 복귀.
|
||||
B. initial attempts=2, max=3 → claim 후 attempts=3 → failed → final 'failed'.
|
||||
C. failed 요청에 result={'partial':...} 포함 → DB result IS NULL 유지.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
@@ -20,87 +18,65 @@ import pytest_asyncio
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "app"))
|
||||
|
||||
from httpx import ASGITransport, AsyncClient
|
||||
from sqlalchemy import text
|
||||
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
|
||||
|
||||
from _worker_pool_helpers import (
|
||||
cleanup_worker_capabilities,
|
||||
cleanup_worker_jobs,
|
||||
ensure_user,
|
||||
get_database_url,
|
||||
fetch_worker_job,
|
||||
insert_worker_capability,
|
||||
insert_worker_job,
|
||||
mint_access_token,
|
||||
)
|
||||
|
||||
|
||||
@pytest_asyncio.fixture
|
||||
async def db_session(monkeypatch):
|
||||
async def env_setup(monkeypatch):
|
||||
monkeypatch.setenv("LAPTOP_WORKER_BOT_USERNAME", "laptop-worker-bot")
|
||||
engine = create_async_engine(get_database_url())
|
||||
sm = async_sessionmaker(engine, expire_on_commit=False)
|
||||
async with sm() as session:
|
||||
yield session
|
||||
await engine.dispose()
|
||||
|
||||
|
||||
@pytest_asyncio.fixture
|
||||
async def worker_token(db_session):
|
||||
await ensure_user(db_session, "laptop-worker-bot")
|
||||
async def worker_token(env_setup):
|
||||
await ensure_user("laptop-worker-bot")
|
||||
return mint_access_token("laptop-worker-bot")
|
||||
|
||||
|
||||
@pytest_asyncio.fixture
|
||||
async def owner_id(db_session):
|
||||
return await ensure_user(db_session, "test-owner-retry-1b")
|
||||
async def owner_id():
|
||||
return await ensure_user("test-owner-retry-1b")
|
||||
|
||||
|
||||
@pytest_asyncio.fixture
|
||||
async def worker_id_unique(db_session, owner_id):
|
||||
async def worker_id_unique(owner_id):
|
||||
wid = f"test-retry-1b-{uuid.uuid4().hex[:8]}"
|
||||
await db_session.execute(
|
||||
text(
|
||||
"INSERT INTO worker_capabilities (worker_id, user_id, device_label, "
|
||||
"worker_class, tier) VALUES (:w, :u, 'lbl', 'laptop', 'laptop_small')"
|
||||
),
|
||||
{"w": wid, "u": owner_id},
|
||||
)
|
||||
await db_session.commit()
|
||||
await insert_worker_capability(wid, owner_id)
|
||||
yield wid
|
||||
await cleanup_worker_jobs(db_session, "test-retry-1b")
|
||||
await cleanup_worker_capabilities(db_session, wid)
|
||||
await cleanup_worker_jobs("test-retry-1b")
|
||||
await cleanup_worker_capabilities(wid)
|
||||
|
||||
|
||||
async def _seed_processing_job(
|
||||
db_session, owner_id: int, worker_id: str, attempts: int, max_attempts: int = 3
|
||||
owner_id: int, worker_id: str, attempts: int, max_attempts: int = 3
|
||||
) -> int:
|
||||
job_id = (
|
||||
await db_session.execute(
|
||||
text(
|
||||
"INSERT INTO worker_jobs (user_id, job_type, status, worker_id, "
|
||||
"attempts, max_attempts, claimed_at) "
|
||||
"VALUES (:u, :j, 'processing', :w, :a, :m, NOW()) RETURNING id"
|
||||
),
|
||||
{
|
||||
"u": owner_id,
|
||||
"j": f"test-retry-1b-{uuid.uuid4().hex[:8]}",
|
||||
"w": worker_id,
|
||||
"a": attempts,
|
||||
"m": max_attempts,
|
||||
},
|
||||
)
|
||||
).scalar_one()
|
||||
await db_session.commit()
|
||||
return job_id
|
||||
return await insert_worker_job(
|
||||
owner_id,
|
||||
f"test-retry-1b-{uuid.uuid4().hex[:8]}",
|
||||
status="processing",
|
||||
worker_id=worker_id,
|
||||
attempts=attempts,
|
||||
max_attempts=max_attempts,
|
||||
claimed=True,
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_case_a_failed_then_pending(
|
||||
db_session, worker_token, worker_id_unique, owner_id
|
||||
):
|
||||
"""Case A: attempts(=1) < max_attempts(=3) → pending 복귀."""
|
||||
async def test_case_a_failed_then_pending(worker_token, worker_id_unique, owner_id):
|
||||
from main import app
|
||||
|
||||
job_id = await _seed_processing_job(db_session, owner_id, worker_id_unique, attempts=1)
|
||||
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as c:
|
||||
job_id = await _seed_processing_job(owner_id, worker_id_unique, attempts=1)
|
||||
async with AsyncClient(
|
||||
transport=ASGITransport(app=app), base_url="http://test"
|
||||
) as c:
|
||||
r = await c.post(
|
||||
"/internal/worker/result",
|
||||
json={
|
||||
@@ -112,31 +88,22 @@ async def test_case_a_failed_then_pending(
|
||||
headers={"Authorization": f"Bearer {worker_token}"},
|
||||
)
|
||||
assert r.status_code == 200, r.text
|
||||
|
||||
res = await db_session.execute(
|
||||
text(
|
||||
"SELECT status, worker_id, claimed_at, completed_at, error_message "
|
||||
"FROM worker_jobs WHERE id = :i"
|
||||
),
|
||||
{"i": job_id},
|
||||
)
|
||||
s, w_id, c_at, comp_at, em = res.first()
|
||||
assert s == "pending"
|
||||
assert w_id is None
|
||||
assert c_at is None
|
||||
assert comp_at is None
|
||||
assert em == "case A error"
|
||||
job = await fetch_worker_job(job_id)
|
||||
assert job["status"] == "pending"
|
||||
assert job["worker_id"] is None
|
||||
assert job["claimed_at"] is None
|
||||
assert job["completed_at"] is None
|
||||
assert job["error_message"] == "case A error"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_case_b_failed_max_attempts_final(
|
||||
db_session, worker_token, worker_id_unique, owner_id
|
||||
):
|
||||
"""Case B: attempts(=3) >= max_attempts(=3) → final 'failed' + completed_at."""
|
||||
async def test_case_b_failed_max_attempts_final(worker_token, worker_id_unique, owner_id):
|
||||
from main import app
|
||||
|
||||
job_id = await _seed_processing_job(db_session, owner_id, worker_id_unique, attempts=3)
|
||||
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as c:
|
||||
job_id = await _seed_processing_job(owner_id, worker_id_unique, attempts=3)
|
||||
async with AsyncClient(
|
||||
transport=ASGITransport(app=app), base_url="http://test"
|
||||
) as c:
|
||||
r = await c.post(
|
||||
"/internal/worker/result",
|
||||
json={
|
||||
@@ -148,25 +115,19 @@ async def test_case_b_failed_max_attempts_final(
|
||||
headers={"Authorization": f"Bearer {worker_token}"},
|
||||
)
|
||||
assert r.status_code == 200
|
||||
|
||||
res = await db_session.execute(
|
||||
text("SELECT status, completed_at FROM worker_jobs WHERE id = :i"),
|
||||
{"i": job_id},
|
||||
)
|
||||
s, comp_at = res.first()
|
||||
assert s == "failed"
|
||||
assert comp_at is not None
|
||||
job = await fetch_worker_job(job_id)
|
||||
assert job["status"] == "failed"
|
||||
assert job["completed_at"] is not None
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_case_c_failed_ignores_result_field(
|
||||
db_session, worker_token, worker_id_unique, owner_id
|
||||
):
|
||||
"""Case C: failed 요청 result={'partial':...} 포함해도 DB result IS NULL 유지."""
|
||||
async def test_case_c_failed_ignores_result_field(worker_token, worker_id_unique, owner_id):
|
||||
from main import app
|
||||
|
||||
job_id = await _seed_processing_job(db_session, owner_id, worker_id_unique, attempts=1)
|
||||
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as c:
|
||||
job_id = await _seed_processing_job(owner_id, worker_id_unique, attempts=1)
|
||||
async with AsyncClient(
|
||||
transport=ASGITransport(app=app), base_url="http://test"
|
||||
) as c:
|
||||
r = await c.post(
|
||||
"/internal/worker/result",
|
||||
json={
|
||||
@@ -179,7 +140,5 @@ async def test_case_c_failed_ignores_result_field(
|
||||
headers={"Authorization": f"Bearer {worker_token}"},
|
||||
)
|
||||
assert r.status_code == 200
|
||||
res = await db_session.execute(
|
||||
text("SELECT result FROM worker_jobs WHERE id = :i"), {"i": job_id}
|
||||
)
|
||||
assert res.scalar_one() is None # 정정 #3 엄격 규칙
|
||||
job = await fetch_worker_job(job_id)
|
||||
assert job["result"] is None # 정정 #3 엄격 규칙
|
||||
|
||||
@@ -1,8 +1,8 @@
|
||||
"""PR-Worker-Pool-Registry-1B — /claim 동시성 (SKIP LOCKED) + 204 body 검증.
|
||||
|
||||
2 항목:
|
||||
1. 두 async session 동시 claim → 한쪽만 200, 다른 쪽 204 + body 0 (정정 #4)
|
||||
2. queue empty + 별도 호출 → 204 + Content-Length: 0 명시
|
||||
1. 두 client 동시 claim → 한쪽 200, 다른 쪽 204 + body 0 (정정 #4)
|
||||
2. queue empty 호출 → 204 + body 0 explicit
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
@@ -18,68 +18,48 @@ import pytest_asyncio
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "app"))
|
||||
|
||||
from httpx import ASGITransport, AsyncClient
|
||||
from sqlalchemy import text
|
||||
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
|
||||
|
||||
from _worker_pool_helpers import (
|
||||
cleanup_worker_capabilities,
|
||||
cleanup_worker_jobs,
|
||||
ensure_user,
|
||||
get_database_url,
|
||||
insert_worker_capability,
|
||||
insert_worker_job,
|
||||
mint_access_token,
|
||||
)
|
||||
|
||||
|
||||
@pytest_asyncio.fixture
|
||||
async def db_session(monkeypatch):
|
||||
async def env_setup(monkeypatch):
|
||||
monkeypatch.setenv("LAPTOP_WORKER_BOT_USERNAME", "laptop-worker-bot")
|
||||
engine = create_async_engine(get_database_url())
|
||||
sm = async_sessionmaker(engine, expire_on_commit=False)
|
||||
async with sm() as session:
|
||||
yield session
|
||||
await engine.dispose()
|
||||
|
||||
|
||||
@pytest_asyncio.fixture
|
||||
async def worker_token(db_session):
|
||||
await ensure_user(db_session, "laptop-worker-bot")
|
||||
async def worker_token(env_setup):
|
||||
await ensure_user("laptop-worker-bot")
|
||||
return mint_access_token("laptop-worker-bot")
|
||||
|
||||
|
||||
@pytest_asyncio.fixture
|
||||
async def owner_id(db_session):
|
||||
return await ensure_user(db_session, "test-owner-skip-1b")
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_skip_locked_only_one_winner(db_session, worker_token, owner_id):
|
||||
"""두 client 가 동시에 동일 job_type claim → 한쪽 200, 다른 쪽 204."""
|
||||
async def test_skip_locked_only_one_winner(worker_token):
|
||||
"""두 client 동시 동일 job_type claim → 한쪽 200, 다른 쪽 204."""
|
||||
from main import app
|
||||
|
||||
owner_id = await ensure_user("test-owner-skip-1b")
|
||||
jt = f"test-skip-1b-{uuid.uuid4().hex[:8]}"
|
||||
w1 = f"test-skip-1b-w1-{uuid.uuid4().hex[:6]}"
|
||||
w2 = f"test-skip-1b-w2-{uuid.uuid4().hex[:6]}"
|
||||
|
||||
# seed capability + 1 pending job
|
||||
for w in (w1, w2):
|
||||
await db_session.execute(
|
||||
text(
|
||||
"INSERT INTO worker_capabilities (worker_id, user_id, device_label, "
|
||||
"worker_class, tier) VALUES (:w, :u, 'lbl', 'laptop', 'laptop_small') "
|
||||
"ON CONFLICT (worker_id) DO NOTHING"
|
||||
),
|
||||
{"w": w, "u": owner_id},
|
||||
)
|
||||
await db_session.execute(
|
||||
text("INSERT INTO worker_jobs (user_id, job_type) VALUES (:u, :j)"),
|
||||
{"u": owner_id, "j": jt},
|
||||
)
|
||||
await db_session.commit()
|
||||
await insert_worker_capability(w, owner_id)
|
||||
await insert_worker_job(owner_id, jt)
|
||||
|
||||
headers = {"Authorization": f"Bearer {worker_token}"}
|
||||
try:
|
||||
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as c1, \
|
||||
AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as c2:
|
||||
async with AsyncClient(
|
||||
transport=ASGITransport(app=app), base_url="http://test"
|
||||
) as c1, AsyncClient(
|
||||
transport=ASGITransport(app=app), base_url="http://test"
|
||||
) as c2:
|
||||
r1, r2 = await asyncio.gather(
|
||||
c1.post(
|
||||
"/internal/worker/claim",
|
||||
@@ -95,16 +75,15 @@ async def test_skip_locked_only_one_winner(db_session, worker_token, owner_id):
|
||||
codes = sorted([r1.status_code, r2.status_code])
|
||||
assert codes == [200, 204], f"unexpected codes: {codes}"
|
||||
loser = r1 if r1.status_code == 204 else r2
|
||||
# 정정 #4: 204 body 빈 검증
|
||||
assert loser.content == b""
|
||||
assert loser.content == b"" # 정정 #4
|
||||
finally:
|
||||
await cleanup_worker_jobs(db_session, "test-skip-1b")
|
||||
await cleanup_worker_capabilities(db_session, "test-skip-1b-w")
|
||||
await cleanup_worker_jobs("test-skip-1b")
|
||||
await cleanup_worker_capabilities("test-skip-1b-w")
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_claim_204_body_explicit_empty(db_session, worker_token):
|
||||
"""queue empty 호출 → 204 + content empty (정정 #4 명시적)."""
|
||||
async def test_claim_204_body_explicit_empty(worker_token):
|
||||
"""queue empty 호출 → 204 + body 0."""
|
||||
from main import app
|
||||
|
||||
async with AsyncClient(
|
||||
@@ -117,4 +96,3 @@ async def test_claim_204_body_explicit_empty(db_session, worker_token):
|
||||
)
|
||||
assert r.status_code == 204
|
||||
assert r.content == b""
|
||||
# Content-Length 헤더는 ASGI 가 자동 0 으로 세팅. 없을 수도 있으므로 content 만 보장.
|
||||
|
||||
+70
-107
@@ -1,8 +1,8 @@
|
||||
"""PR-Worker-Pool-Registry-1B — worker_jobs ORM/schema 단위 검증.
|
||||
|
||||
3 항목 (endpoint test 와 중복 회피, schema-level 만):
|
||||
1. CHECK constraint — status 가 enum 4 외 값일 때 INSERT 거부 (정정 #5)
|
||||
2. partial unique-index 동작 검증 — pending row 만 색인 (idx_worker_jobs_pending_type)
|
||||
3 항목 (endpoint test 와 중복 회피):
|
||||
1. CHECK constraint — status enum 4 외 값 INSERT → IntegrityError (정정 #5)
|
||||
2. partial unique-index 존재 검증 (idx_worker_jobs_pending_type)
|
||||
3. ON DELETE SET NULL — worker_capabilities 삭제 시 worker_jobs.worker_id 자동 NULL
|
||||
"""
|
||||
|
||||
@@ -25,122 +25,85 @@ from _worker_pool_helpers import (
|
||||
cleanup_worker_capabilities,
|
||||
cleanup_worker_jobs,
|
||||
ensure_user,
|
||||
fetch_worker_job,
|
||||
get_database_url,
|
||||
insert_worker_capability,
|
||||
insert_worker_job,
|
||||
)
|
||||
|
||||
|
||||
@pytest_asyncio.fixture
|
||||
async def db_session():
|
||||
async def owner_id():
|
||||
return await ensure_user("test-owner-smoke-1b")
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_check_constraint_rejects_unknown_status(owner_id):
|
||||
"""정정 #5: status 가 enum 4 외 값이면 IntegrityError."""
|
||||
engine = create_async_engine(get_database_url())
|
||||
sm = async_sessionmaker(engine, expire_on_commit=False)
|
||||
async with sm() as session:
|
||||
yield session
|
||||
await engine.dispose()
|
||||
|
||||
|
||||
@pytest_asyncio.fixture
|
||||
async def owner_id(db_session):
|
||||
return await ensure_user(db_session, "test-owner-smoke-1b")
|
||||
|
||||
|
||||
@pytest_asyncio.fixture
|
||||
async def worker_id_unique(db_session, owner_id):
|
||||
wid = f"test-smoke-1b-{uuid.uuid4().hex[:8]}"
|
||||
await db_session.execute(
|
||||
text(
|
||||
"INSERT INTO worker_capabilities (worker_id, user_id, device_label, "
|
||||
"worker_class, tier) VALUES (:w, :u, 'lbl', 'laptop', 'laptop_small')"
|
||||
),
|
||||
{"w": wid, "u": owner_id},
|
||||
)
|
||||
await db_session.commit()
|
||||
yield wid
|
||||
await cleanup_worker_jobs(db_session, "test-smoke-1b")
|
||||
await cleanup_worker_capabilities(db_session, wid)
|
||||
try:
|
||||
with pytest.raises(IntegrityError):
|
||||
async with sm() as session:
|
||||
await session.execute(
|
||||
text(
|
||||
"INSERT INTO worker_jobs (user_id, job_type, status) "
|
||||
"VALUES (:u, 'test-smoke-1b-bad', 'running')"
|
||||
),
|
||||
{"u": owner_id},
|
||||
)
|
||||
await session.commit()
|
||||
finally:
|
||||
await engine.dispose()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_check_constraint_rejects_unknown_status(db_session, owner_id, worker_id_unique):
|
||||
"""정정 #5: status 가 enum 4 외 값이면 IntegrityError."""
|
||||
with pytest.raises(IntegrityError):
|
||||
await db_session.execute(
|
||||
text(
|
||||
"INSERT INTO worker_jobs (user_id, job_type, status) "
|
||||
"VALUES (:u, 'test-smoke-1b-bad', 'running')"
|
||||
),
|
||||
{"u": owner_id},
|
||||
)
|
||||
await db_session.commit()
|
||||
await db_session.rollback()
|
||||
async def test_partial_pending_index_exists(owner_id):
|
||||
"""idx_worker_jobs_pending_type partial index 존재 검증."""
|
||||
engine = create_async_engine(get_database_url())
|
||||
sm = async_sessionmaker(engine, expire_on_commit=False)
|
||||
try:
|
||||
async with sm() as session:
|
||||
res = await session.execute(
|
||||
text(
|
||||
"SELECT indexname FROM pg_indexes "
|
||||
"WHERE tablename = 'worker_jobs' "
|
||||
"AND indexname = 'idx_worker_jobs_pending_type'"
|
||||
)
|
||||
)
|
||||
assert res.scalar_one_or_none() == "idx_worker_jobs_pending_type"
|
||||
|
||||
# 정의에 WHERE status = 'pending' 포함 확인
|
||||
res2 = await session.execute(
|
||||
text(
|
||||
"SELECT indexdef FROM pg_indexes "
|
||||
"WHERE indexname = 'idx_worker_jobs_pending_type'"
|
||||
)
|
||||
)
|
||||
indexdef = res2.scalar_one()
|
||||
assert "WHERE" in indexdef
|
||||
assert "pending" in indexdef
|
||||
finally:
|
||||
await engine.dispose()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_partial_pending_index_used_for_claim_query(db_session, owner_id, worker_id_unique):
|
||||
"""partial index idx_worker_jobs_pending_type 가 pending claim 쿼리 실행계획에 사용."""
|
||||
# seed 2 rows: pending + completed
|
||||
await db_session.execute(
|
||||
text(
|
||||
"INSERT INTO worker_jobs (user_id, job_type, status) "
|
||||
"VALUES (:u, 'test-smoke-1b-idx', 'pending'), "
|
||||
" (:u, 'test-smoke-1b-idx', 'completed')"
|
||||
),
|
||||
{"u": owner_id},
|
||||
)
|
||||
await db_session.commit()
|
||||
|
||||
# EXPLAIN ANALYZE 가 partial index 사용하는지 확인 (운영 환경에선 seq scan 가능 — 본 테스트는 인덱스 정의 존재만 보장)
|
||||
res = await db_session.execute(
|
||||
text(
|
||||
"SELECT indexname FROM pg_indexes "
|
||||
"WHERE tablename = 'worker_jobs' AND indexname = 'idx_worker_jobs_pending_type'"
|
||||
)
|
||||
)
|
||||
assert res.scalar_one_or_none() == "idx_worker_jobs_pending_type"
|
||||
|
||||
# pending row 만 SELECT 됨 (실제 동작 검증)
|
||||
res = await db_session.execute(
|
||||
text(
|
||||
"SELECT count(*) FROM worker_jobs "
|
||||
"WHERE status = 'pending' AND job_type = 'test-smoke-1b-idx'"
|
||||
)
|
||||
)
|
||||
assert res.scalar_one() == 1
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_on_delete_set_null_when_capability_dropped(db_session, owner_id):
|
||||
"""worker_capabilities 삭제 시 worker_jobs.worker_id 자동 NULL (ON DELETE SET NULL)."""
|
||||
async def test_on_delete_set_null_when_capability_dropped(owner_id):
|
||||
"""worker_capabilities 삭제 시 worker_jobs.worker_id 자동 NULL."""
|
||||
wid = f"test-smoke-1b-del-{uuid.uuid4().hex[:8]}"
|
||||
await db_session.execute(
|
||||
text(
|
||||
"INSERT INTO worker_capabilities (worker_id, user_id, device_label, "
|
||||
"worker_class, tier) VALUES (:w, :u, 'lbl', 'laptop', 'laptop_small')"
|
||||
),
|
||||
{"w": wid, "u": owner_id},
|
||||
await insert_worker_capability(wid, owner_id)
|
||||
job_id = await insert_worker_job(
|
||||
owner_id,
|
||||
"test-smoke-1b-del",
|
||||
status="completed",
|
||||
worker_id=wid,
|
||||
attempts=1,
|
||||
)
|
||||
job_id = (
|
||||
await db_session.execute(
|
||||
text(
|
||||
"INSERT INTO worker_jobs (user_id, job_type, status, worker_id, attempts) "
|
||||
"VALUES (:u, 'test-smoke-1b-del', 'completed', :w, 1) RETURNING id"
|
||||
),
|
||||
{"u": owner_id, "w": wid},
|
||||
)
|
||||
).scalar_one()
|
||||
await db_session.commit()
|
||||
|
||||
# worker_heartbeats CASCADE 가 worker_capabilities 삭제 차단할 수 있으니 사전 정리
|
||||
await db_session.execute(
|
||||
text("DELETE FROM worker_heartbeats WHERE worker_id = :w"), {"w": wid}
|
||||
)
|
||||
await db_session.execute(text("DELETE FROM worker_capabilities WHERE worker_id = :w"), {"w": wid})
|
||||
await db_session.commit()
|
||||
|
||||
res = await db_session.execute(
|
||||
text("SELECT worker_id FROM worker_jobs WHERE id = :i"), {"i": job_id}
|
||||
)
|
||||
assert res.scalar_one() is None
|
||||
|
||||
# cleanup
|
||||
await db_session.execute(text("DELETE FROM worker_jobs WHERE id = :i"), {"i": job_id})
|
||||
await db_session.commit()
|
||||
try:
|
||||
# cleanup_worker_capabilities 도 worker_heartbeats CASCADE 처리
|
||||
await cleanup_worker_capabilities(wid)
|
||||
job = await fetch_worker_job(job_id)
|
||||
assert job is not None
|
||||
assert job["worker_id"] is None # ON DELETE SET NULL
|
||||
finally:
|
||||
await cleanup_worker_jobs("test-smoke-1b")
|
||||
|
||||
Reference in New Issue
Block a user