"""PR-Worker-Pool-Registry-1B 통합 테스트용 헬퍼. 각 helper 가 자체 engine + session 사용. fixture 의 db_session 과 격리하여 asyncpg "another operation in progress" race 회피. 호출 site 는 단순: user_id = await ensure_user("test-user") await cleanup_worker_jobs("test-prefix") """ from __future__ import annotations import os import secrets import sys sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "app")) from datetime import datetime, timedelta, timezone from jose import jwt from sqlalchemy import text from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine from sqlalchemy.pool import NullPool def get_database_url() -> str: return os.getenv( "DATABASE_URL", "postgresql+asyncpg://pkm:pkm@postgres:5432/pkm", ) def _make_engine(): """매 helper 호출 시 새 engine + NullPool (connection 격리, dispose 안전).""" return create_async_engine(get_database_url(), poolclass=NullPool) def mint_access_token(username: str, expires_minutes: int = 60) -> str: """test 용 JWT access token (`core.auth.create_access_token` 동형).""" from core.config import settings now = datetime.now(timezone.utc) payload = { "sub": username, "exp": now + timedelta(minutes=expires_minutes), "iat": int(now.timestamp()), "type": "access", } return jwt.encode(payload, settings.jwt_secret, algorithm="HS256") async def ensure_user(username: str, is_active: bool = True) -> int: """users row 존재 보장 + id 반환. 자체 engine 사용 (fixture session 비격리).""" from core.auth import hash_password engine = _make_engine() sm = async_sessionmaker(engine, expire_on_commit=False) try: async with sm() as session: result = await session.execute( text("SELECT id FROM users WHERE username = :u"), {"u": username} ) row = result.first() if row is not None: return int(row[0]) h = hash_password(secrets.token_urlsafe(32)) inserted = await session.execute( text( "INSERT INTO users (username, password_hash, is_active, password_changed_at) " "VALUES (:u, :h, :a, NOW()) RETURNING id" ), {"u": username, "h": h, "a": is_active}, ) new_id = int(inserted.scalar_one()) await session.commit() return new_id finally: await engine.dispose() async def cleanup_worker_jobs(job_type_prefix: str) -> None: engine = _make_engine() sm = async_sessionmaker(engine, expire_on_commit=False) try: async with sm() as session: await session.execute( text("DELETE FROM worker_jobs WHERE job_type LIKE :p"), {"p": f"{job_type_prefix}%"}, ) await session.commit() finally: await engine.dispose() async def cleanup_worker_capabilities(worker_id_prefix: str) -> None: engine = _make_engine() sm = async_sessionmaker(engine, expire_on_commit=False) try: async with sm() as session: await session.execute( text("DELETE FROM worker_heartbeats WHERE worker_id LIKE :p"), {"p": f"{worker_id_prefix}%"}, ) await session.execute( text("DELETE FROM worker_capabilities WHERE worker_id LIKE :p"), {"p": f"{worker_id_prefix}%"}, ) await session.commit() finally: await engine.dispose() async def insert_worker_capability(worker_id: str, user_id: int) -> None: """worker_capabilities row INSERT (FK source). 자체 engine.""" engine = _make_engine() sm = async_sessionmaker(engine, expire_on_commit=False) try: async with sm() as session: await session.execute( text( "INSERT INTO worker_capabilities (worker_id, user_id, device_label, " "worker_class, tier) VALUES (:w, :u, 'lbl', 'laptop', 'laptop_small') " "ON CONFLICT (worker_id) DO NOTHING" ), {"w": worker_id, "u": user_id}, ) await session.commit() finally: await engine.dispose() async def insert_worker_job( user_id: int, job_type: str, status: str = "pending", worker_id: str | None = None, attempts: int = 0, max_attempts: int = 3, payload: str = "{}", claimed: bool = False, ) -> int: """worker_jobs row INSERT + id 반환. 자체 engine.""" engine = _make_engine() sm = async_sessionmaker(engine, expire_on_commit=False) try: async with sm() as session: cols = ["user_id", "job_type", "status", "attempts", "max_attempts", "payload"] vals = {"u": user_id, "j": job_type, "s": status, "a": attempts, "m": max_attempts, "p": payload} placeholders = [":u", ":j", ":s", ":a", ":m", ":p::jsonb"] if worker_id is not None: cols.append("worker_id") vals["w"] = worker_id placeholders.append(":w") if claimed: cols.append("claimed_at") placeholders.append("NOW()") stmt = ( f"INSERT INTO worker_jobs ({', '.join(cols)}) " f"VALUES ({', '.join(placeholders)}) RETURNING id" ) res = await session.execute(text(stmt), vals) job_id = int(res.scalar_one()) await session.commit() return job_id finally: await engine.dispose() async def fetch_worker_job(job_id: int) -> dict | None: """worker_jobs row 1건 조회. 자체 engine.""" engine = _make_engine() sm = async_sessionmaker(engine, expire_on_commit=False) try: async with sm() as session: res = await session.execute( text( "SELECT id, status, worker_id, attempts, claimed_at, completed_at, " "result, error_message FROM worker_jobs WHERE id = :i" ), {"i": job_id}, ) row = res.first() if row is None: return None return { "id": row[0], "status": row[1], "worker_id": row[2], "attempts": row[3], "claimed_at": row[4], "completed_at": row[5], "result": row[6], "error_message": row[7], } finally: await engine.dispose() async def fetch_latest_heartbeat(worker_id: str) -> dict | None: engine = _make_engine() sm = async_sessionmaker(engine, expire_on_commit=False) try: async with sm() as session: res = await session.execute( text( "SELECT status, raw_payload FROM worker_heartbeats " "WHERE worker_id = :w ORDER BY heartbeat_at DESC LIMIT 1" ), {"w": worker_id}, ) row = res.first() if row is None: return None return {"status": row[0], "raw_payload": row[1]} finally: await engine.dispose() async def fetch_worker_capability(worker_id: str) -> dict | None: engine = _make_engine() sm = async_sessionmaker(engine, expire_on_commit=False) try: async with sm() as session: res = await session.execute( text( "SELECT device_label, last_registered_at FROM worker_capabilities " "WHERE worker_id = :w" ), {"w": worker_id}, ) row = res.first() if row is None: return None return {"device_label": row[0], "last_registered_at": row[1]} finally: await engine.dispose() async def count_worker_capability(worker_id: str) -> int: engine = _make_engine() sm = async_sessionmaker(engine, expire_on_commit=False) try: async with sm() as session: res = await session.execute( text( "SELECT count(*) FROM worker_capabilities WHERE worker_id = :w" ), {"w": worker_id}, ) return int(res.scalar_one()) finally: await engine.dispose()