0cbd97fcba
각 helper 가 자체 engine + NullPool 사용 (connection 격리). fixture chain 의 asyncpg "another operation in progress" race 회피. 호출 site 단순화. 같은 파일 sequential 실행 시 module-level app + global engine pool 충돌은 별 follow-up `PR-Worker-Pool-Test-Fixture-Isolation` (P3) 영역. 단독 PASS 검증: auth 5/5 + smoke 3/3 + ownership 1/1. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
253 lines
8.4 KiB
Python
253 lines
8.4 KiB
Python
"""PR-Worker-Pool-Registry-1B 통합 테스트용 헬퍼.
|
|
|
|
Each helper uses its own engine + session, isolated from the fixture's db_session,
to avoid the asyncpg "another operation in progress" race. Call sites stay simple:
|
|
|
|
user_id = await ensure_user("test-user")
|
|
await cleanup_worker_jobs("test-prefix")
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
import secrets
|
|
import sys
|
|
|
|
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "app"))
|
|
|
|
from datetime import datetime, timedelta, timezone
|
|
|
|
from jose import jwt
|
|
from sqlalchemy import text
|
|
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
|
|
from sqlalchemy.pool import NullPool
|
|
|
|
|
|
def get_database_url() -> str:
    """Return the database URL the test helpers connect to.

    The ``DATABASE_URL`` environment variable takes precedence when it is
    set; otherwise the hard-coded asyncpg URL below is returned.
    """
    fallback_url = "postgresql+asyncpg://pkm:pkm@postgres:5432/pkm"
    return os.environ.get("DATABASE_URL", fallback_url)
|
|
|
|
|
|
def _make_engine():
    """Create a fresh async engine backed by ``NullPool``.

    A new engine is built on every call so each helper gets its own
    connection (isolation) and can safely dispose the engine when done.
    """
    database_url = get_database_url()
    return create_async_engine(database_url, poolclass=NullPool)
|
|
|
|
|
|
def mint_access_token(username: str, expires_minutes: int = 60) -> str:
    """Mint a JWT access token for tests.

    Per the original note, this is meant to have the same shape as
    ``core.auth.create_access_token``.

    Args:
        username: Value stored in the ``sub`` claim.
        expires_minutes: Lifetime of the token, counted from now (UTC).

    Returns:
        The HS256-signed token, signed with ``settings.jwt_secret``.
    """
    # Imported lazily: ``core`` only resolves after the sys.path tweak at
    # module import time.
    from core.config import settings

    issued_at = datetime.now(timezone.utc)
    expires_at = issued_at + timedelta(minutes=expires_minutes)
    claims = {
        "sub": username,
        "exp": expires_at,
        "iat": int(issued_at.timestamp()),
        "type": "access",
    }
    return jwt.encode(claims, settings.jwt_secret, algorithm="HS256")
|
|
|
|
|
|
async def ensure_user(username: str, is_active: bool = True) -> int:
    """Make sure a ``users`` row exists for *username* and return its id.

    Runs on its own engine rather than the fixture's session.

    Args:
        username: Username to look up or create.
        is_active: Value for ``is_active`` when a new row is inserted. An
            already existing row is returned as-is and is not updated.

    Returns:
        The id of the existing or newly inserted user.
    """
    from core.auth import hash_password

    engine = _make_engine()
    session_factory = async_sessionmaker(engine, expire_on_commit=False)
    try:
        async with session_factory() as db:
            lookup = await db.execute(
                text("SELECT id FROM users WHERE username = :u"), {"u": username}
            )
            existing = lookup.first()
            if existing is not None:
                return int(existing[0])

            # The user never logs in with a password in tests, so a random
            # throwaway secret is hashed just to satisfy the column.
            password_hash = hash_password(secrets.token_urlsafe(32))
            insert_sql = text(
                "INSERT INTO users (username, password_hash, is_active, password_changed_at) "
                "VALUES (:u, :h, :a, NOW()) RETURNING id"
            )
            created = await db.execute(
                insert_sql, {"u": username, "h": password_hash, "a": is_active}
            )
            user_id = int(created.scalar_one())
            await db.commit()
            return user_id
    finally:
        await engine.dispose()
|
|
|
|
|
|
async def cleanup_worker_jobs(job_type_prefix: str) -> None:
    """Delete every ``worker_jobs`` row whose ``job_type`` starts with a prefix.

    Runs on its own engine (see ``_make_engine``) and commits before
    returning.

    Args:
        job_type_prefix: Literal prefix to match. LIKE metacharacters in it
            (``%`` and ``_``) are escaped so they match themselves instead of
            acting as wildcards; otherwise a prefix such as ``"test_a"``
            would also delete rows like ``"testXa..."``.
    """
    # "!" is the escape character declared in the ESCAPE clause below; it is
    # escaped first so the later replacements are not double-processed.
    literal_prefix = (
        job_type_prefix.replace("!", "!!").replace("%", "!%").replace("_", "!_")
    )
    engine = _make_engine()
    sm = async_sessionmaker(engine, expire_on_commit=False)
    try:
        async with sm() as session:
            await session.execute(
                text("DELETE FROM worker_jobs WHERE job_type LIKE :p ESCAPE '!'"),
                {"p": f"{literal_prefix}%"},
            )
            await session.commit()
    finally:
        await engine.dispose()
|
|
|
|
|
|
async def cleanup_worker_capabilities(worker_id_prefix: str) -> None:
    """Delete heartbeats and capabilities for workers whose id starts with a prefix.

    ``worker_heartbeats`` rows are deleted before ``worker_capabilities``
    rows, then both deletes are committed together. Runs on its own engine
    (see ``_make_engine``).

    Args:
        worker_id_prefix: Literal prefix to match. LIKE metacharacters in it
            (``%`` and ``_``) are escaped so they match themselves instead of
            acting as wildcards; otherwise a prefix such as ``"w_1"`` would
            also delete rows for ids like ``"wX1..."``.
    """
    # "!" is the escape character declared in the ESCAPE clauses below; it is
    # escaped first so the later replacements are not double-processed.
    literal_prefix = (
        worker_id_prefix.replace("!", "!!").replace("%", "!%").replace("_", "!_")
    )
    params = {"p": f"{literal_prefix}%"}
    engine = _make_engine()
    sm = async_sessionmaker(engine, expire_on_commit=False)
    try:
        async with sm() as session:
            await session.execute(
                text(
                    "DELETE FROM worker_heartbeats WHERE worker_id LIKE :p ESCAPE '!'"
                ),
                params,
            )
            await session.execute(
                text(
                    "DELETE FROM worker_capabilities WHERE worker_id LIKE :p ESCAPE '!'"
                ),
                params,
            )
            await session.commit()
    finally:
        await engine.dispose()
|
|
|
|
|
|
async def insert_worker_capability(worker_id: str, user_id: int) -> None:
    """Insert a ``worker_capabilities`` row (the FK source) on its own engine.

    The row uses fixed test values for ``device_label``, ``worker_class`` and
    ``tier``. If a row with the same ``worker_id`` already exists the insert
    is a no-op (``ON CONFLICT ... DO NOTHING``).

    Args:
        worker_id: Worker identifier for the new row.
        user_id: Owning user's id.
    """
    insert_sql = text(
        "INSERT INTO worker_capabilities (worker_id, user_id, device_label, "
        "worker_class, tier) VALUES (:w, :u, 'lbl', 'laptop', 'laptop_small') "
        "ON CONFLICT (worker_id) DO NOTHING"
    )
    engine = _make_engine()
    session_factory = async_sessionmaker(engine, expire_on_commit=False)
    try:
        async with session_factory() as db:
            await db.execute(insert_sql, {"w": worker_id, "u": user_id})
            await db.commit()
    finally:
        await engine.dispose()
|
|
|
|
|
|
async def insert_worker_job(
    user_id: int,
    job_type: str,
    status: str = "pending",
    worker_id: str | None = None,
    attempts: int = 0,
    max_attempts: int = 3,
    payload: str = "{}",
    claimed: bool = False,
) -> int:
    """Insert a ``worker_jobs`` row on its own engine and return its id.

    Args:
        user_id: Owning user's id.
        job_type: Value for the ``job_type`` column.
        status: Value for the ``status`` column.
        worker_id: When not ``None``, also sets the ``worker_id`` column.
        attempts: Value for the ``attempts`` column.
        max_attempts: Value for the ``max_attempts`` column.
        payload: JSON text; cast to ``jsonb`` by the database.
        claimed: When true, ``claimed_at`` is set to ``NOW()``.

    Returns:
        The id of the inserted row.
    """
    # Column name -> SQL expression. The payload uses CAST(... AS jsonb)
    # rather than the ``:p::jsonb`` shorthand: a bind name directly followed
    # by a colon is not reliably recognised as a bind parameter by
    # SQLAlchemy's text(), while CAST is equivalent SQL and parses cleanly.
    columns = {
        "user_id": ":u",
        "job_type": ":j",
        "status": ":s",
        "attempts": ":a",
        "max_attempts": ":m",
        "payload": "CAST(:p AS jsonb)",
    }
    params = {
        "u": user_id,
        "j": job_type,
        "s": status,
        "a": attempts,
        "m": max_attempts,
        "p": payload,
    }
    if worker_id is not None:
        columns["worker_id"] = ":w"
        params["w"] = worker_id
    if claimed:
        # Server-side timestamp, so no bind parameter is needed.
        columns["claimed_at"] = "NOW()"

    # Only fixed column names and placeholders are interpolated here; every
    # caller-supplied value travels as a bind parameter.
    stmt = (
        f"INSERT INTO worker_jobs ({', '.join(columns)}) "
        f"VALUES ({', '.join(columns.values())}) RETURNING id"
    )

    engine = _make_engine()
    sm = async_sessionmaker(engine, expire_on_commit=False)
    try:
        async with sm() as session:
            res = await session.execute(text(stmt), params)
            job_id = int(res.scalar_one())
            await session.commit()
            return job_id
    finally:
        await engine.dispose()
|
|
|
|
|
|
async def fetch_worker_job(job_id: int) -> dict | None:
    """Fetch one ``worker_jobs`` row by id on its own engine.

    Args:
        job_id: Primary key of the row to read.

    Returns:
        A dict with keys ``id``, ``status``, ``worker_id``, ``attempts``,
        ``claimed_at``, ``completed_at``, ``result`` and ``error_message``,
        or ``None`` when no row has that id.
    """
    # Keys in the same order as the SELECT list below.
    field_names = (
        "id",
        "status",
        "worker_id",
        "attempts",
        "claimed_at",
        "completed_at",
        "result",
        "error_message",
    )
    query = text(
        "SELECT id, status, worker_id, attempts, claimed_at, completed_at, "
        "result, error_message FROM worker_jobs WHERE id = :i"
    )
    engine = _make_engine()
    session_factory = async_sessionmaker(engine, expire_on_commit=False)
    try:
        async with session_factory() as db:
            found = (await db.execute(query, {"i": job_id})).first()
            if found is None:
                return None
            return dict(zip(field_names, found))
    finally:
        await engine.dispose()
|
|
|
|
|
|
async def fetch_latest_heartbeat(worker_id: str) -> dict | None:
    """Fetch the most recent ``worker_heartbeats`` row for a worker.

    "Most recent" means the row with the greatest ``heartbeat_at``. Runs on
    its own engine.

    Args:
        worker_id: Worker whose heartbeat is read.

    Returns:
        A dict with keys ``status`` and ``raw_payload``, or ``None`` when the
        worker has no heartbeat rows.
    """
    query = text(
        "SELECT status, raw_payload FROM worker_heartbeats "
        "WHERE worker_id = :w ORDER BY heartbeat_at DESC LIMIT 1"
    )
    engine = _make_engine()
    session_factory = async_sessionmaker(engine, expire_on_commit=False)
    try:
        async with session_factory() as db:
            latest = (await db.execute(query, {"w": worker_id})).first()
            if latest is None:
                return None
            status, raw_payload = latest[0], latest[1]
            return {"status": status, "raw_payload": raw_payload}
    finally:
        await engine.dispose()
|
|
|
|
|
|
async def fetch_worker_capability(worker_id: str) -> dict | None:
    """Fetch selected ``worker_capabilities`` columns for a worker.

    Runs on its own engine.

    Args:
        worker_id: Worker whose capability row is read.

    Returns:
        A dict with keys ``device_label`` and ``last_registered_at``, or
        ``None`` when no row exists for that worker.
    """
    query = text(
        "SELECT device_label, last_registered_at FROM worker_capabilities "
        "WHERE worker_id = :w"
    )
    engine = _make_engine()
    session_factory = async_sessionmaker(engine, expire_on_commit=False)
    try:
        async with session_factory() as db:
            found = (await db.execute(query, {"w": worker_id})).first()
            if found is None:
                return None
            device_label, last_registered_at = found[0], found[1]
            return {
                "device_label": device_label,
                "last_registered_at": last_registered_at,
            }
    finally:
        await engine.dispose()
|
|
|
|
|
|
async def count_worker_capability(worker_id: str) -> int:
    """Count ``worker_capabilities`` rows for a worker on its own engine.

    Args:
        worker_id: Worker id to count rows for.

    Returns:
        The number of matching rows.
    """
    query = text("SELECT count(*) FROM worker_capabilities WHERE worker_id = :w")
    engine = _make_engine()
    session_factory = async_sessionmaker(engine, expire_on_commit=False)
    try:
        async with session_factory() as db:
            counted = await db.execute(query, {"w": worker_id})
            return int(counted.scalar_one())
    finally:
        await engine.dispose()
|