diff --git a/app/api/auth.py b/app/api/auth.py index 110cd9b..a2f3a7a 100644 --- a/app/api/auth.py +++ b/app/api/auth.py @@ -15,6 +15,7 @@ from sqlalchemy.ext.asyncio import AsyncSession from core.auth import ( REFRESH_TOKEN_EXPIRE_DAYS, create_access_token, + create_laptop_worker_bot_token, create_refresh_token, create_voice_memo_bot_token, decode_token, @@ -124,6 +125,11 @@ async def login( if bot_token is not None: return AccessTokenResponse(access_token=bot_token) + # PR-Worker-Pool-Registry-1B — laptop-worker-bot 한정 long-expiry token (voice-memo 분기 우선 평가). + laptop_bot_token = create_laptop_worker_bot_token(user.username) + if laptop_bot_token is not None: + return AccessTokenResponse(access_token=laptop_bot_token) + # refresh token → HttpOnly cookie _set_refresh_cookie(response, create_refresh_token(user.username)) diff --git a/app/api/internal_worker.py b/app/api/internal_worker.py index ed5b6dd..b5c6b56 100644 --- a/app/api/internal_worker.py +++ b/app/api/internal_worker.py @@ -1,49 +1,262 @@ -"""PR-Worker-Pool-Registry-1A scaffold: /internal/worker/* 라우트군 503 stub. +"""PR-Worker-Pool-Registry-1B: /internal/worker/* 5 endpoint 실 구현. -worker-pool-policy §8 의 5개 라우트 (register/heartbeat/claim/result/drain) 자리잡기. -실 동작 = PR-Worker-Pool-Registry-1B (laptop-worker-bot user + worker_jobs table + recap). +worker-pool-policy §B.2 invariant 매핑: + - inv 2: drain = heartbeat INSERT only (advisory). claim 거부 = Notebook-Pilot-1. + - inv 3: /result result = raw JSONB only. canonical promote 0. + - inv 4: ProcessingQueue 무변경 — worker_jobs 별 table. + - inv 5: 운영 자동 분기 변경 0 — heartbeat alive 판정 SQL 부재, classify_worker/queue_consumer touch 0. -1A 시점에는: - - 인증 dependency 없음 (503 first response 라 attack surface 0) - - Pydantic schema 없음 (1B 활성화 시 추가) - - 모든 endpoint = HTTP 503 + detail +사용자 review 정정 5개 (2026-05-19): + - #1: worker_jobs.user_id = job owner (실 사용자). worker 인증은 worker_id + JWT 별도. + - #2: /result 소유권 검증 (WHERE id AND worker_id AND status='processing'). 매칭 0건 → 404. + - #3: explicit failed 재시도 (attempts=max → final failed). + - #4: /claim 204 = Response(status_code=204) body 0. + - #5: mig 275 status CHECK ('pending','processing','completed','failed'). """ -from fastapi import APIRouter, HTTPException, status +from datetime import datetime, timezone +from typing import Annotated, Any + +from fastapi import APIRouter, Depends, HTTPException, Response, status +from pydantic import BaseModel +from sqlalchemy import select, update +from sqlalchemy.dialects.postgresql import insert as pg_insert +from sqlalchemy.ext.asyncio import AsyncSession + +from core.auth import require_worker_user +from core.database import get_session +from models.worker_pool import WorkerCapability, WorkerHeartbeat, WorkerJob router = APIRouter() -def _stub_503(endpoint: str) -> None: - raise HTTPException( - status_code=status.HTTP_503_SERVICE_UNAVAILABLE, - detail=( - f"/internal/worker/{endpoint} disabled " - "(Registry-1A stub; activates in Registry-1B)" - ), - ) +# ─── Pydantic schemas ─── + + +class WorkerRegisterRequest(BaseModel): + worker_id: str + device_label: str + worker_class: str + tier: str + capabilities: list[str] = [] + models_loaded: list[str] = [] + endpoint: str | None = None + + +class WorkerHeartbeatRequest(BaseModel): + worker_id: str + status: str # starting/available/busy/draining + current_job_id: int | None = None + battery: str | None = None + thermal: str | None = None + raw_payload: dict[str, Any] = {} + + +class WorkerClaimRequest(BaseModel): + worker_id: str + job_type: str + + +class WorkerClaimResponse(BaseModel): + id: int + job_type: str + payload: dict[str, Any] + attempts: int + + +class WorkerResultRequest(BaseModel): + job_id: int + worker_id: str # 정정 #2 — 소유권 검증 + status: str # completed | failed + result: dict[str, Any] | None = None + error_message: str | None = None + + +class WorkerDrainRequest(BaseModel): + worker_id: str + reason: str | None = None + + +# ─── 엔드포인트 ─── @router.post("/register") -async def register(): - _stub_503("register") +async def register( + body: WorkerRegisterRequest, + user: Annotated[Any, Depends(require_worker_user)], + session: Annotated[AsyncSession, Depends(get_session)], +): + """worker_capabilities UPSERT — register 또는 capability 갱신.""" + now = datetime.now(timezone.utc) + stmt = pg_insert(WorkerCapability).values( + worker_id=body.worker_id, + user_id=user.id, + device_label=body.device_label, + worker_class=body.worker_class, + tier=body.tier, + capabilities=body.capabilities, + models_loaded=body.models_loaded, + endpoint=body.endpoint, + created_at=now, + last_registered_at=now, + ).on_conflict_do_update( + index_elements=["worker_id"], + set_={ + "device_label": body.device_label, + "worker_class": body.worker_class, + "tier": body.tier, + "capabilities": body.capabilities, + "models_loaded": body.models_loaded, + "endpoint": body.endpoint, + "last_registered_at": now, + }, + ) + await session.execute(stmt) + await session.commit() + return {"ok": True, "worker_id": body.worker_id} @router.post("/heartbeat") -async def heartbeat(): - _stub_503("heartbeat") +async def heartbeat( + body: WorkerHeartbeatRequest, + user: Annotated[Any, Depends(require_worker_user)], + session: Annotated[AsyncSession, Depends(get_session)], +): + """worker_heartbeats append-only INSERT. + + inv 5 강제: alive 판정 SQL 부재. 본 endpoint 는 row 추가 + ok 반환만. + """ + hb = WorkerHeartbeat( + worker_id=body.worker_id, + status=body.status, + current_job_id=body.current_job_id, + battery=body.battery, + thermal=body.thermal, + raw_payload=body.raw_payload, + ) + session.add(hb) + await session.commit() + return {"ok": True} -@router.post("/claim") -async def claim(): - _stub_503("claim") +@router.post( + "/claim", + responses={ + 200: {"model": WorkerClaimResponse}, + 204: {"description": "queue empty"}, + }, +) +async def claim( + body: WorkerClaimRequest, + user: Annotated[Any, Depends(require_worker_user)], + session: Annotated[AsyncSession, Depends(get_session)], +): + """SELECT FOR UPDATE SKIP LOCKED 로 pending job 1건 claim. + + 정정 #4: miss → Response(status_code=204) body 0. WorkerClaimResponse | None 회피. + """ + now = datetime.now(timezone.utc) + stmt = ( + select(WorkerJob) + .where(WorkerJob.status == "pending", WorkerJob.job_type == body.job_type) + .order_by(WorkerJob.created_at) + .limit(1) + .with_for_update(skip_locked=True) + ) + result = await session.execute(stmt) + job = result.scalar_one_or_none() + if job is None: + await session.commit() # FOR UPDATE 트랜잭션 해제 + return Response(status_code=204) + + job.status = "processing" + job.worker_id = body.worker_id + job.claimed_at = now + job.attempts = job.attempts + 1 + await session.commit() + + return WorkerClaimResponse( + id=job.id, + job_type=job.job_type, + payload=job.payload, + attempts=job.attempts, + ) @router.post("/result") -async def result(): - _stub_503("result") +async def result( + body: WorkerResultRequest, + user: Annotated[Any, Depends(require_worker_user)], + session: Annotated[AsyncSession, Depends(get_session)], +): + """job 결과 제출. 정정 #2 (소유권) + #3 (재시도) 강제. + + 소유권 검증: WHERE id AND worker_id AND status='processing'. 매칭 0건 → 404. + completed: status='completed' + result + completed_at. + failed: + attempts < max_attempts → status='pending' (worker_id/claimed_at/completed_at NULL). + attempts >= max_attempts → status='failed' final + completed_at. + result 컬럼 절대 갱신 X — request.result 무시 (failed 시 partial result 저장 차단). + """ + if body.status not in ("completed", "failed"): + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="status must be 'completed' or 'failed'", + ) + + stmt = select(WorkerJob).where( + WorkerJob.id == body.job_id, + WorkerJob.worker_id == body.worker_id, + WorkerJob.status == "processing", + ) + res = await session.execute(stmt) + job = res.scalar_one_or_none() + if job is None: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail="job not found or not owned by this worker (or not in processing)", + ) + + now = datetime.now(timezone.utc) + if body.status == "completed": + job.status = "completed" + job.result = body.result # raw JSONB (inv 3 — canonical promote 0) + job.completed_at = now + job.error_message = None + else: # failed + job.error_message = body.error_message + # 정정 #3 정책: result 컬럼 절대 갱신 X (request.result 무시) + if job.attempts < job.max_attempts: + job.status = "pending" + job.worker_id = None + job.claimed_at = None + job.completed_at = None + else: + job.status = "failed" + job.completed_at = now + + await session.commit() + return {"ok": True, "status": job.status, "attempts": job.attempts} @router.post("/drain") -async def drain(): - _stub_503("drain") +async def drain( + body: WorkerDrainRequest, + user: Annotated[Any, Depends(require_worker_user)], + session: Annotated[AsyncSession, Depends(get_session)], +): + """drain = heartbeat INSERT status='draining' (advisory/audit only, inv 2). + + claim 거부 로직 부재 = Notebook-Pilot-1 영역. + """ + payload: dict[str, Any] = {} + if body.reason: + payload["reason"] = body.reason + hb = WorkerHeartbeat( + worker_id=body.worker_id, + status="draining", + raw_payload=payload, + ) + session.add(hb) + await session.commit() + return {"ok": True} diff --git a/app/core/auth.py b/app/core/auth.py index c0cf632..d35257f 100644 --- a/app/core/auth.py +++ b/app/core/auth.py @@ -51,6 +51,17 @@ def create_voice_memo_bot_token(username: str) -> str | None: return create_access_token(username, expires_minutes=expire_days * 24 * 60) +def create_laptop_worker_bot_token(username: str) -> str | None: + # PR-Worker-Pool-Registry-1B — laptop-worker-bot 계정 한정 long-expiry token (voice-memo 동형). + if os.getenv("LAPTOP_WORKER_BOT_TOKEN_ENABLED", "false").lower() != "true": + return None + bot_username = os.getenv("LAPTOP_WORKER_BOT_USERNAME", "laptop-worker-bot") + if username != bot_username: + return None + expire_days = int(os.getenv("LAPTOP_WORKER_BOT_TOKEN_EXPIRE_DAYS", "365")) + return create_access_token(username, expires_minutes=expire_days * 24 * 60) + + def create_refresh_token(subject: str) -> str: now = datetime.now(timezone.utc) expire = now + timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS) @@ -129,3 +140,21 @@ async def require_admin( detail="관리자 권한 필요", ) return user + + +async def require_worker_user( + credentials: Annotated[HTTPAuthorizationCredentials, Depends(security)], + session: Annotated[AsyncSession, Depends(get_session)], +): + """PR-Worker-Pool-Registry-1B — /internal/worker/* 인증. + + laptop-worker-bot 만 허용. voice-memo-bot 또는 일반 사용자 토큰 → 403. + """ + user = await get_current_user(credentials, session) + bot_username = os.getenv("LAPTOP_WORKER_BOT_USERNAME", "laptop-worker-bot") + if user.username != bot_username: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="worker user only", + ) + return user diff --git a/app/models/worker_pool.py b/app/models/worker_pool.py index 6da6f77..0ef9370 100644 --- a/app/models/worker_pool.py +++ b/app/models/worker_pool.py @@ -1,13 +1,11 @@ -"""worker_capabilities + worker_heartbeats 테이블 ORM (PR-Worker-Pool-Registry-1A). +"""worker_capabilities + worker_heartbeats + worker_jobs 테이블 ORM. -1A 단계: schema only. 라우트 5개 (register/heartbeat/claim/result/drain) 모두 503 stub. -실 활성화 + WorkerJob 모델은 1B 영역. 본 모듈 import 자체는 init_db 가 mig 270~274 적용 -후 안전 (테이블 존재 보장). +1A scaffold (mig 270~274) + 1B 활성화 (mig 275~276). 1B = WorkerJob 신규 + 5 endpoint 실 구현. """ from datetime import datetime -from sqlalchemy import BigInteger, DateTime, ForeignKey, Text +from sqlalchemy import BigInteger, DateTime, ForeignKey, SmallInteger, Text from sqlalchemy.dialects.postgresql import JSONB from sqlalchemy.orm import Mapped, mapped_column @@ -50,3 +48,29 @@ class WorkerHeartbeat(Base): battery: Mapped[str | None] = mapped_column(Text) thermal: Mapped[str | None] = mapped_column(Text) raw_payload: Mapped[dict] = mapped_column(JSONB, default=dict, nullable=False) + + +class WorkerJob(Base): + # user_id = job owner user_id (실 사용자). worker bot 아님. worker 인증은 worker_id+JWT 별도. + # result = raw JSONB only (policy §B.2 invariant 3 — canonical promote = Notebook-Pilot-1). + __tablename__ = "worker_jobs" + + id: Mapped[int] = mapped_column(BigInteger, primary_key=True) + user_id: Mapped[int] = mapped_column( + BigInteger, ForeignKey("users.id"), nullable=False + ) + job_type: Mapped[str] = mapped_column(Text, nullable=False) + status: Mapped[str] = mapped_column(Text, nullable=False, default="pending") + worker_id: Mapped[str | None] = mapped_column( + Text, ForeignKey("worker_capabilities.worker_id") + ) + payload: Mapped[dict] = mapped_column(JSONB, default=dict, nullable=False) + result: Mapped[dict | None] = mapped_column(JSONB) + error_message: Mapped[str | None] = mapped_column(Text) + attempts: Mapped[int] = mapped_column(SmallInteger, default=0, nullable=False) + max_attempts: Mapped[int] = mapped_column(SmallInteger, default=3, nullable=False) + created_at: Mapped[datetime] = mapped_column( + DateTime(timezone=True), default=datetime.now, nullable=False + ) + claimed_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True)) + completed_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True)) diff --git a/credentials.env.example b/credentials.env.example index aefa347..1b31e28 100644 --- a/credentials.env.example +++ b/credentials.env.example @@ -57,6 +57,13 @@ LAW_OC= # B3 FP 검증 (true FP < 20%) 통과 후만 production 적용. VERIFIER_NUMERIC_PROMOTE=0 +# ─── PR-Worker-Pool-Registry-1B: laptop-worker-bot long-expiry token ─── +# voice-memo-bot 패턴 동형. password = scripts/seed_laptop_worker_bot.py 로 별도 INSERT. +# 활성화 = ENABLED=true 토글. 회전 = users.password_changed_at 갱신 시 JWT 자동 무효화. +LAPTOP_WORKER_BOT_TOKEN_ENABLED=false +LAPTOP_WORKER_BOT_USERNAME=laptop-worker-bot +LAPTOP_WORKER_BOT_TOKEN_EXPIRE_DAYS=365 + # ─── Phase 3.5 fix2: eval runner shared secret ─── # /ask 엔드포인트의 X-Source=eval / X-Eval-Case-Id 헤더 신뢰 검증 토큰. # 비어있거나 클라이언트 X-Eval-Token 와 불일치 시 eval 헤더 거부 (warning log + source='document_server' 강등). diff --git a/migrations/275_worker_jobs_table.sql b/migrations/275_worker_jobs_table.sql new file mode 100644 index 0000000..5d6df51 --- /dev/null +++ b/migrations/275_worker_jobs_table.sql @@ -0,0 +1,29 @@ +-- 2026-05-20 PR-Worker-Pool-Registry-1B: worker_jobs 테이블 신규. +-- worker-pool-policy §8 방향 (b) — ProcessingQueue 무변경, worker 전용 큐 분리. +-- §8 결정: document_id 컬럼 의도적 부재. memo/event recap + future code_review/long_summary +-- /repo_analysis 모두 document 단위 아님. context 는 payload JSONB 안에. +-- user_id 의미 = job owner user_id (실 사용자). worker bot 의 user_id 가 아님. +-- 1C recap context 가 user_id 기준 memos/events JOIN. worker 인증은 worker_id + JWT 별도. +-- §B.2 invariant 3: result = raw JSONB only (canonical promote 0, Notebook-Pilot-1 영역). +-- §B.2 invariant 4: ProcessingQueue 무변경 — 이 테이블은 document FK 없음. +-- claim 방식 = SELECT FOR UPDATE SKIP LOCKED (queue_consumer 의 단순 UPDATE 와 다름). +-- status CHECK constraint = 신규 table 부터 enum 문자열 drift 차단. +-- explicit /result failed 재시도 (attempts < max_attempts → pending 복귀) = 1B 영역. +-- stale recovery (timeout 기반) = Notebook-Pilot-1 영역. + +CREATE TABLE IF NOT EXISTS worker_jobs ( + id BIGSERIAL PRIMARY KEY, + user_id BIGINT NOT NULL REFERENCES users(id) ON DELETE RESTRICT, + job_type TEXT NOT NULL, + status TEXT NOT NULL DEFAULT 'pending' + CHECK (status IN ('pending', 'processing', 'completed', 'failed')), + worker_id TEXT REFERENCES worker_capabilities(worker_id) ON DELETE SET NULL, + payload JSONB NOT NULL DEFAULT '{}'::jsonb, + result JSONB, + error_message TEXT, + attempts SMALLINT NOT NULL DEFAULT 0, + max_attempts SMALLINT NOT NULL DEFAULT 3, + created_at TIMESTAMPTZ NOT NULL DEFAULT now(), + claimed_at TIMESTAMPTZ, + completed_at TIMESTAMPTZ +); diff --git a/migrations/276_worker_jobs_claim_idx.sql b/migrations/276_worker_jobs_claim_idx.sql new file mode 100644 index 0000000..b3a093c --- /dev/null +++ b/migrations/276_worker_jobs_claim_idx.sql @@ -0,0 +1,7 @@ +-- 2026-05-20 PR-Worker-Pool-Registry-1B: worker_jobs claim 핫패스 partial index. +-- claim 쿼리: WHERE status='pending' AND job_type=$1 ORDER BY created_at LIMIT 1 FOR UPDATE SKIP LOCKED. +-- partial index = pending row 만 색인 (테이블 성장과 무관 — completed/failed row 누적해도 인덱스 크기 일정). + +CREATE INDEX IF NOT EXISTS idx_worker_jobs_pending_type + ON worker_jobs (job_type, created_at) + WHERE status = 'pending'; diff --git a/scripts/seed_laptop_worker_bot.py b/scripts/seed_laptop_worker_bot.py new file mode 100644 index 0000000..026464e --- /dev/null +++ b/scripts/seed_laptop_worker_bot.py @@ -0,0 +1,78 @@ +"""laptop-worker-bot 계정 생성 / 비밀번호 갱신 스크립트 (PR-Worker-Pool-Registry-1B) + +사용법: + docker compose exec fastapi env \\ + LAPTOP_WORKER_BOT_USERNAME=laptop-worker-bot \\ + LAPTOP_WORKER_BOT_PASSWORD='' \\ + python /app/scripts/seed_laptop_worker_bot.py + +env 우선순위: + - LAPTOP_WORKER_BOT_USERNAME (default 'laptop-worker-bot') + - LAPTOP_WORKER_BOT_PASSWORD (required, prompt fallback) + +worker-pool-policy §9 (Token 발급 path) Sec-1H 패턴 동형. +password_changed_at 갱신 시 기존 JWT 자동 무효화 (Sec-1H mig 269). +""" + +import asyncio +import getpass +import os +import sys + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "app")) + +from sqlalchemy import text +from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine + +from core.auth import hash_password + + +async def seed_laptop_worker_bot(): + database_url = os.getenv( + "DATABASE_URL", + "postgresql+asyncpg://pkm:pkm@localhost:5432/pkm", + ) + + username = os.getenv("LAPTOP_WORKER_BOT_USERNAME", "laptop-worker-bot").strip() + password = os.getenv("LAPTOP_WORKER_BOT_PASSWORD", "") + if not password: + password = getpass.getpass(f"'{username}' 비밀번호: ") + if not password: + print("비밀번호가 비어 있습니다.", file=sys.stderr) + sys.exit(1) + + password_hash = hash_password(password) + engine = create_async_engine(database_url) + async_session = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) + + async with async_session() as session: + existing = await session.execute( + text("SELECT id FROM users WHERE username = :username"), + {"username": username}, + ) + row = existing.first() + if row is not None: + await session.execute( + text( + "UPDATE users SET password_hash = :hash, is_active = TRUE, " + "password_changed_at = NOW() WHERE username = :username" + ), + {"hash": password_hash, "username": username}, + ) + print(f"'{username}' 계정 비밀번호 갱신 + password_changed_at 동기화.") + else: + await session.execute( + text( + "INSERT INTO users (username, password_hash, is_active, password_changed_at) " + "VALUES (:username, :hash, TRUE, NOW())" + ), + {"username": username, "hash": password_hash}, + ) + print(f"'{username}' 계정 신규 생성.") + await session.commit() + + await engine.dispose() + + +if __name__ == "__main__": + asyncio.run(seed_laptop_worker_bot()) diff --git a/tests/_worker_pool_helpers.py b/tests/_worker_pool_helpers.py new file mode 100644 index 0000000..8478b2f --- /dev/null +++ b/tests/_worker_pool_helpers.py @@ -0,0 +1,92 @@ +"""PR-Worker-Pool-Registry-1B 통합 테스트용 헬퍼. + +각 테스트 파일이 import 해서 자체 fixture 안에서 호출. conftest.py 갱신은 회피 +(타 테스트 영향 가능성, [[feedback_residue_grep_live_vs_history]] 정신). + +사용 예: + from _worker_pool_helpers import ( + get_database_url, mint_access_token, + ensure_user, cleanup_worker_jobs, cleanup_worker_capabilities, + ) +""" + +from __future__ import annotations + +import os +import secrets +import sys + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "app")) + +from datetime import datetime, timedelta, timezone + +from jose import jwt +from sqlalchemy import text +from sqlalchemy.ext.asyncio import AsyncSession + + +def get_database_url() -> str: + return os.getenv( + "DATABASE_URL", + "postgresql+asyncpg://pkm:pkm@postgres:5432/pkm", + ) + + +def mint_access_token(username: str, expires_minutes: int = 60) -> str: + """test 용 JWT access token (`core.auth.create_access_token` 와 동일 페이로드).""" + from core.config import settings + + now = datetime.now(timezone.utc) + payload = { + "sub": username, + "exp": now + timedelta(minutes=expires_minutes), + "iat": int(now.timestamp()), + "type": "access", + } + return jwt.encode(payload, settings.jwt_secret, algorithm="HS256") + + +async def ensure_user( + session: AsyncSession, username: str, is_active: bool = True +) -> int: + """users row 존재 보장 + id 반환. 비밀번호 = random bcrypt hash.""" + from core.auth import hash_password + + result = await session.execute( + text("SELECT id FROM users WHERE username = :u"), {"u": username} + ) + row = result.first() + if row is not None: + return int(row[0]) + + h = hash_password(secrets.token_urlsafe(32)) + inserted = await session.execute( + text( + "INSERT INTO users (username, password_hash, is_active, password_changed_at) " + "VALUES (:u, :h, :a, NOW()) RETURNING id" + ), + {"u": username, "h": h, "a": is_active}, + ) + new_id = int(inserted.scalar_one()) + await session.commit() + return new_id + + +async def cleanup_worker_jobs(session: AsyncSession, job_type_prefix: str) -> None: + await session.execute( + text("DELETE FROM worker_jobs WHERE job_type LIKE :p"), + {"p": f"{job_type_prefix}%"}, + ) + await session.commit() + + +async def cleanup_worker_capabilities(session: AsyncSession, worker_id_prefix: str) -> None: + await session.execute( + text("DELETE FROM worker_heartbeats WHERE worker_id LIKE :p"), + {"p": f"{worker_id_prefix}%"}, + ) + await session.execute( + text("DELETE FROM worker_capabilities WHERE worker_id LIKE :p"), + {"p": f"{worker_id_prefix}%"}, + ) + await session.commit() diff --git a/tests/test_internal_worker_authz.py b/tests/test_internal_worker_authz.py new file mode 100644 index 0000000..5bfb5bc --- /dev/null +++ b/tests/test_internal_worker_authz.py @@ -0,0 +1,75 @@ +"""PR-Worker-Pool-Registry-1B — /internal/worker/* 권한 분리 (정정 #2 보조). + +worker user 외 모든 사용자 = 403. + 1. voice-memo-bot JWT → 403 + 2. 일반 user JWT → 403 + +(401 = token 자체 invalid 시. 본 테스트는 토큰 자체는 유효 + 권한만 부족.) +""" + +from __future__ import annotations + +import os +import sys + +import pytest +import pytest_asyncio + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "app")) + +from httpx import ASGITransport, AsyncClient +from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine + +from _worker_pool_helpers import ensure_user, get_database_url, mint_access_token + + +@pytest_asyncio.fixture +async def setup(monkeypatch): + monkeypatch.setenv("LAPTOP_WORKER_BOT_USERNAME", "laptop-worker-bot") + engine = create_async_engine(get_database_url()) + sm = async_sessionmaker(engine, expire_on_commit=False) + async with sm() as session: + await ensure_user(session, "voice-memo-bot") + await ensure_user(session, "test-regular-user-1b") + await engine.dispose() + + +@pytest_asyncio.fixture +async def client(): + from main import app + + async with AsyncClient( + transport=ASGITransport(app=app), + base_url="http://test", + ) as ac: + yield ac + + +@pytest.mark.asyncio +async def test_voice_memo_bot_jwt_rejected(client, setup): + """voice-memo-bot JWT 로 /internal/worker/register 호출 → 403.""" + token = mint_access_token("voice-memo-bot") + r = await client.post( + "/internal/worker/register", + json={ + "worker_id": "x", + "device_label": "x", + "worker_class": "x", + "tier": "x", + }, + headers={"Authorization": f"Bearer {token}"}, + ) + assert r.status_code == 403, r.text + assert "worker user" in r.json().get("detail", "").lower() + + +@pytest.mark.asyncio +async def test_regular_user_jwt_rejected(client, setup): + """일반 user JWT 로 /internal/worker/heartbeat 호출 → 403.""" + token = mint_access_token("test-regular-user-1b") + r = await client.post( + "/internal/worker/heartbeat", + json={"worker_id": "x", "status": "available"}, + headers={"Authorization": f"Bearer {token}"}, + ) + assert r.status_code == 403, r.text diff --git a/tests/test_internal_worker_endpoints.py b/tests/test_internal_worker_endpoints.py new file mode 100644 index 0000000..43c4fce --- /dev/null +++ b/tests/test_internal_worker_endpoints.py @@ -0,0 +1,275 @@ +"""PR-Worker-Pool-Registry-1B — /internal/worker/* 5 endpoint 정상 동작 검증. + +5 항목: + 1. /register UPSERT — 같은 worker_id 두 번 호출하면 단일 row 유지 + last_registered_at 갱신. + 2. /heartbeat — INSERT row 추가 + status='available' 저장 확인. + 3. /claim — queue empty 시 204 + body 0. pending row 있으면 200 + payload + status='processing'. + 4. /result — completed 시 status='completed' + result JSONB 저장. + 5. /drain — heartbeat row status='draining' INSERT. + +DB integration. GPU docker compose exec fastapi pytest 환경 가정. +""" + +from __future__ import annotations + +import os +import sys +import uuid + +import pytest +import pytest_asyncio + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "app")) + +from httpx import ASGITransport, AsyncClient +from sqlalchemy import text +from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine + +from _worker_pool_helpers import ( + cleanup_worker_capabilities, + cleanup_worker_jobs, + ensure_user, + get_database_url, + mint_access_token, +) + + +@pytest_asyncio.fixture +async def db_session(monkeypatch): + monkeypatch.setenv("LAPTOP_WORKER_BOT_USERNAME", "laptop-worker-bot") + engine = create_async_engine(get_database_url()) + sm = async_sessionmaker(engine, expire_on_commit=False) + async with sm() as session: + yield session + await engine.dispose() + + +@pytest_asyncio.fixture +async def client(): + from main import app + + async with AsyncClient( + transport=ASGITransport(app=app), + base_url="http://test", + ) as ac: + yield ac + + +@pytest_asyncio.fixture +async def worker_token(db_session): + await ensure_user(db_session, "laptop-worker-bot") + return mint_access_token("laptop-worker-bot") + + +@pytest_asyncio.fixture +async def owner_id(db_session): + return await ensure_user(db_session, "test-owner-user-1b") + + +@pytest_asyncio.fixture +async def worker_id_unique(): + wid = f"test-mbp-1b-{uuid.uuid4().hex[:8]}" + yield wid + + +@pytest_asyncio.fixture +async def cleanup_after(db_session, worker_id_unique): + yield + await cleanup_worker_jobs(db_session, "test-rec-1b") + await cleanup_worker_capabilities(db_session, worker_id_unique) + + +def _auth(token: str) -> dict: + return {"Authorization": f"Bearer {token}"} + + +@pytest.mark.asyncio +async def test_register_upsert(client, db_session, worker_token, worker_id_unique, cleanup_after): + body = { + "worker_id": worker_id_unique, + "device_label": "MacBook Pro M3 Pro", + "worker_class": "laptop", + "tier": "laptop_small", + "capabilities": ["short_summary"], + "models_loaded": ["gemma-3-4b-it"], + } + r1 = await client.post("/internal/worker/register", json=body, headers=_auth(worker_token)) + assert r1.status_code == 200, r1.text + # 두번째 호출 = upsert 갱신 + body["device_label"] = "MacBook Pro M3 Pro (renamed)" + r2 = await client.post("/internal/worker/register", json=body, headers=_auth(worker_token)) + assert r2.status_code == 200 + res = await db_session.execute( + text( + "SELECT device_label, last_registered_at FROM worker_capabilities WHERE worker_id = :w" + ), + {"w": worker_id_unique}, + ) + rows = res.all() + assert len(rows) == 1 + assert rows[0][0] == "MacBook Pro M3 Pro (renamed)" + + +@pytest.mark.asyncio +async def test_heartbeat_insert( + client, db_session, worker_token, worker_id_unique, owner_id, cleanup_after +): + # 사전: worker_capabilities row 필요 (FK) + await db_session.execute( + text( + "INSERT INTO worker_capabilities (worker_id, user_id, device_label, " + "worker_class, tier) VALUES (:w, :u, 'lbl', 'laptop', 'laptop_small') " + "ON CONFLICT (worker_id) DO NOTHING" + ), + {"w": worker_id_unique, "u": owner_id}, + ) + await db_session.commit() + + r = await client.post( + "/internal/worker/heartbeat", + json={"worker_id": worker_id_unique, "status": "available"}, + headers=_auth(worker_token), + ) + assert r.status_code == 200 + res = await db_session.execute( + text( + "SELECT status FROM worker_heartbeats WHERE worker_id = :w ORDER BY heartbeat_at DESC LIMIT 1" + ), + {"w": worker_id_unique}, + ) + assert res.scalar_one() == "available" + + +@pytest.mark.asyncio +async def test_claim_204_when_empty(client, worker_token, cleanup_after): + r = await client.post( + "/internal/worker/claim", + json={"worker_id": "test-empty-1b", "job_type": "test-rec-1b-empty"}, + headers=_auth(worker_token), + ) + assert r.status_code == 204 + # 정정 #4: body 0 + assert r.content == b"" + + +@pytest.mark.asyncio +async def test_claim_processing_transition( + client, db_session, worker_token, worker_id_unique, owner_id, cleanup_after +): + # seed worker_capabilities + worker_jobs + await db_session.execute( + text( + "INSERT INTO worker_capabilities (worker_id, user_id, device_label, " + "worker_class, tier) VALUES (:w, :u, 'lbl', 'laptop', 'laptop_small') " + "ON CONFLICT (worker_id) DO NOTHING" + ), + {"w": worker_id_unique, "u": owner_id}, + ) + jt = "test-rec-1b-claim" + job_id = ( + await db_session.execute( + text( + "INSERT INTO worker_jobs (user_id, job_type, payload) " + "VALUES (:u, :j, '{\"week\":\"2026-W20\"}'::jsonb) RETURNING id" + ), + {"u": owner_id, "j": jt}, + ) + ).scalar_one() + await db_session.commit() + + r = await client.post( + "/internal/worker/claim", + json={"worker_id": worker_id_unique, "job_type": jt}, + headers=_auth(worker_token), + ) + assert r.status_code == 200 + js = r.json() + assert js["id"] == job_id + assert js["job_type"] == jt + assert js["attempts"] == 1 + assert js["payload"]["week"] == "2026-W20" + + res = await db_session.execute( + text("SELECT status, worker_id, attempts FROM worker_jobs WHERE id = :i"), + {"i": job_id}, + ) + status_, w_id, attempts = res.first() + assert status_ == "processing" + assert w_id == worker_id_unique + assert attempts == 1 + + +@pytest.mark.asyncio +async def test_result_completed( + client, db_session, worker_token, worker_id_unique, owner_id, cleanup_after +): + # seed + await db_session.execute( + text( + "INSERT INTO worker_capabilities (worker_id, user_id, device_label, " + "worker_class, tier) VALUES (:w, :u, 'lbl', 'laptop', 'laptop_small') " + "ON CONFLICT (worker_id) DO NOTHING" + ), + {"w": worker_id_unique, "u": owner_id}, + ) + jt = "test-rec-1b-result" + job_id = ( + await db_session.execute( + text( + "INSERT INTO worker_jobs (user_id, job_type, status, worker_id, " + "attempts, claimed_at) VALUES (:u, :j, 'processing', :w, 1, NOW()) RETURNING id" + ), + {"u": owner_id, "j": jt, "w": worker_id_unique}, + ) + ).scalar_one() + await db_session.commit() + + r = await client.post( + "/internal/worker/result", + json={ + "job_id": job_id, + "worker_id": worker_id_unique, + "status": "completed", + "result": {"summary": "test"}, + }, + headers=_auth(worker_token), + ) + assert r.status_code == 200 + res = await db_session.execute( + text("SELECT status, result FROM worker_jobs WHERE id = :i"), {"i": job_id} + ) + s, result_jsonb = res.first() + assert s == "completed" + assert result_jsonb == {"summary": "test"} + + +@pytest.mark.asyncio +async def test_drain_heartbeat_insert( + client, db_session, worker_token, worker_id_unique, owner_id, cleanup_after +): + await db_session.execute( + text( + "INSERT INTO worker_capabilities (worker_id, user_id, device_label, " + "worker_class, tier) VALUES (:w, :u, 'lbl', 'laptop', 'laptop_small') " + "ON CONFLICT (worker_id) DO NOTHING" + ), + {"w": worker_id_unique, "u": owner_id}, + ) + await db_session.commit() + + r = await client.post( + "/internal/worker/drain", + json={"worker_id": worker_id_unique, "reason": "sleep"}, + headers=_auth(worker_token), + ) + assert r.status_code == 200 + res = await db_session.execute( + text( + "SELECT status, raw_payload FROM worker_heartbeats WHERE worker_id = :w " + "ORDER BY heartbeat_at DESC LIMIT 1" + ), + {"w": worker_id_unique}, + ) + s, rp = res.first() + assert s == "draining" + assert rp == {"reason": "sleep"} diff --git a/tests/test_internal_worker_stub.py b/tests/test_internal_worker_stub.py deleted file mode 100644 index a4f3633..0000000 --- a/tests/test_internal_worker_stub.py +++ /dev/null @@ -1,32 +0,0 @@ -"""PR-Worker-Pool-Registry-1A: /internal/worker/* 5 endpoint 503 stub 검증. - -conftest.py 의 async_client/db_session fixture 가 Phase 0 TODO 상태 (line 13) -이므로 본 모듈은 inline ASGI client 만 사용. DB schema (worker_capabilities, -worker_heartbeats) 존재 검증은 deploy 후 curl + psql 로 별도 검증 (plan §A.9). -1B 활성화 시 conftest fixture 도 함께 활성화 검토. -""" - -import pytest -from httpx import ASGITransport, AsyncClient - - -@pytest.fixture(scope="module") -def anyio_backend(): - return "asyncio" - - -@pytest.mark.asyncio -async def test_all_endpoints_return_503(): - """5 endpoint (register/heartbeat/claim/result/drain) 모두 503 + detail 패턴 확인.""" - from main import app # PYTHONPATH=app 환경에서 import - - async with AsyncClient( - transport=ASGITransport(app=app), - base_url="http://test", - ) as ac: - for ep in ("register", "heartbeat", "claim", "result", "drain"): - r = await ac.post(f"/internal/worker/{ep}") - assert r.status_code == 503, f"{ep}: expected 503, got {r.status_code}" - assert "Registry-1A stub" in r.json()["detail"], ( - f"{ep}: detail mismatch: {r.json()}" - ) diff --git a/tests/test_laptop_worker_bot_auth.py b/tests/test_laptop_worker_bot_auth.py new file mode 100644 index 0000000..897d23a --- /dev/null +++ b/tests/test_laptop_worker_bot_auth.py @@ -0,0 +1,145 @@ +"""PR-Worker-Pool-Registry-1B invariant 1 강제 — voice-memo-bot wrapper 회귀 0. + +테스트 5개: + 1. voice-memo-bot legacy 발급 그대로 (env=true, username=voice-memo-bot → token) + 2. voice-memo 분기 우선 평가 (둘 다 env true 라도 voice-memo 가 먼저 hit) + 3. laptop-worker-bot env disabled → None + 4. laptop-worker-bot env enabled + username match → token (TTL 365d) + 5. password_changed_at 갱신 시 401 (legacy NULL 호환 포함, decode_token + verify_password_changed_at) + +DB 의존 0 (token 함수 + decode_token + monkeypatch only). +""" + +from __future__ import annotations + +import os +import sys +from datetime import datetime, timedelta, timezone + +import pytest + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "app")) + + +@pytest.fixture(scope="module") +def anyio_backend(): + return "asyncio" + + +@pytest.fixture +def env_clean(monkeypatch): + for k in ( + "VOICE_MEMO_BOT_TOKEN_ENABLED", + "VOICE_MEMO_BOT_USERNAME", + "VOICE_MEMO_BOT_TOKEN_EXPIRE_DAYS", + "LAPTOP_WORKER_BOT_TOKEN_ENABLED", + "LAPTOP_WORKER_BOT_USERNAME", + "LAPTOP_WORKER_BOT_TOKEN_EXPIRE_DAYS", + ): + monkeypatch.delenv(k, raising=False) + return monkeypatch + + +def test_voice_memo_bot_legacy_unchanged(env_clean): + """invariant 1 ①: voice-memo-bot env=true 시 365d token 발급 그대로.""" + from core.auth import create_voice_memo_bot_token, decode_token + + env_clean.setenv("VOICE_MEMO_BOT_TOKEN_ENABLED", "true") + token = create_voice_memo_bot_token("voice-memo-bot") + assert token is not None + payload = decode_token(token) + assert payload is not None + assert payload["sub"] == "voice-memo-bot" + assert payload["type"] == "access" + + +def test_voice_memo_branch_preferred_over_laptop(env_clean): + """invariant 1 ②: 두 env 동시 true 일 때 voice-memo 분기가 먼저 평가. + + 실 호출 site (app/api/auth.py login) 의 분기 순서 검증: + bot_token = create_voice_memo_bot_token(...) # 먼저 + if bot_token is not None: return ... + laptop_bot_token = create_laptop_worker_bot_token(...) # 그 다음 + voice-memo-bot username 으로 호출하면 voice-memo 분기에서 return, + laptop_bot_token 은 평가되지 않는다. 본 테스트는 username 별 결과로 검증. + """ + from core.auth import create_laptop_worker_bot_token, create_voice_memo_bot_token + + env_clean.setenv("VOICE_MEMO_BOT_TOKEN_ENABLED", "true") + env_clean.setenv("LAPTOP_WORKER_BOT_TOKEN_ENABLED", "true") + + # voice-memo-bot username 에서는 voice-memo 가 hit, laptop 은 hit X + assert create_voice_memo_bot_token("voice-memo-bot") is not None + assert create_laptop_worker_bot_token("voice-memo-bot") is None + + # laptop-worker-bot username 에서는 반대 + assert create_voice_memo_bot_token("laptop-worker-bot") is None + assert create_laptop_worker_bot_token("laptop-worker-bot") is not None + + +def test_laptop_worker_bot_env_disabled_returns_none(env_clean): + """정정 #1 ③: env 미설정/false 일 때 None.""" + from core.auth import create_laptop_worker_bot_token + + # env 없음 + assert create_laptop_worker_bot_token("laptop-worker-bot") is None + # env false + env_clean.setenv("LAPTOP_WORKER_BOT_TOKEN_ENABLED", "false") + assert create_laptop_worker_bot_token("laptop-worker-bot") is None + # env true + username mismatch + env_clean.setenv("LAPTOP_WORKER_BOT_TOKEN_ENABLED", "true") + assert create_laptop_worker_bot_token("not-laptop-worker-bot") is None + + +def test_laptop_worker_bot_env_enabled_returns_token(env_clean): + """정정 #1 ④: env true + username match → 365d token.""" + from core.auth import create_laptop_worker_bot_token, decode_token + + env_clean.setenv("LAPTOP_WORKER_BOT_TOKEN_ENABLED", "true") + env_clean.setenv("LAPTOP_WORKER_BOT_TOKEN_EXPIRE_DAYS", "365") + token = create_laptop_worker_bot_token("laptop-worker-bot") + assert token is not None + payload = decode_token(token) + assert payload is not None + assert payload["sub"] == "laptop-worker-bot" + assert payload["type"] == "access" + now_ts = int(datetime.now(timezone.utc).timestamp()) + exp = int(payload["exp"]) + diff_days = (exp - now_ts) / 86400 + # 365d ± 1d 허용 (테스트 실행 시간 marginal) + assert 364 <= diff_days <= 366, f"expected ~365d TTL, got {diff_days:.2f}d" + + +def test_laptop_worker_bot_password_changed_at_invalidates(env_clean): + """invariant 1 ⑤: password_changed_at 갱신 시 401 (legacy NULL 호환 포함).""" + from fastapi import HTTPException + + from core.auth import ( + create_laptop_worker_bot_token, + decode_token, + verify_password_changed_at, + ) + + env_clean.setenv("LAPTOP_WORKER_BOT_TOKEN_ENABLED", "true") + token = create_laptop_worker_bot_token("laptop-worker-bot") + assert token is not None + payload = decode_token(token) + + class FakeUser: + password_changed_at: datetime | None + + def __init__(self, dt): + self.password_changed_at = dt + + # ⑤-a: NULL → skip (legacy 호환) + verify_password_changed_at(payload, FakeUser(None)) # raise 없음 + + # ⑤-b: password 변경이 token iat 보다 이전 → 통과 + past = datetime.now(timezone.utc) - timedelta(hours=1) + verify_password_changed_at(payload, FakeUser(past)) # raise 없음 + + # ⑤-c: password 변경이 token iat 보다 이후 → 401 + future = datetime.now(timezone.utc) + timedelta(hours=1) + with pytest.raises(HTTPException) as exc: + verify_password_changed_at(payload, FakeUser(future)) + assert exc.value.status_code == 401 diff --git a/tests/test_worker_jobs_ownership.py b/tests/test_worker_jobs_ownership.py new file mode 100644 index 0000000..b1cc47d --- /dev/null +++ b/tests/test_worker_jobs_ownership.py @@ -0,0 +1,99 @@ +"""PR-Worker-Pool-Registry-1B 정정 #2 — /result 소유권 검증. + +worker A 가 claim 한 job 을 worker B 가 /result 호출 → 404. +""" + +from __future__ import annotations + +import os +import sys +import uuid + +import pytest +import pytest_asyncio + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "app")) + +from httpx import ASGITransport, AsyncClient +from sqlalchemy import text +from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine + +from _worker_pool_helpers import ( + cleanup_worker_capabilities, + cleanup_worker_jobs, + ensure_user, + get_database_url, + mint_access_token, +) + + +@pytest_asyncio.fixture +async def db_session(monkeypatch): + monkeypatch.setenv("LAPTOP_WORKER_BOT_USERNAME", "laptop-worker-bot") + engine = create_async_engine(get_database_url()) + sm = async_sessionmaker(engine, expire_on_commit=False) + async with sm() as session: + yield session + await engine.dispose() + + +@pytest_asyncio.fixture +async def worker_token(db_session): + await ensure_user(db_session, "laptop-worker-bot") + return mint_access_token("laptop-worker-bot") + + +@pytest.mark.asyncio +async def test_other_worker_cannot_submit_result(db_session, worker_token): + """worker A 의 processing job → worker B 가 /result → 404.""" + from main import app + + owner_id = await ensure_user(db_session, "test-owner-own-1b") + w_a = f"test-own-1b-a-{uuid.uuid4().hex[:6]}" + w_b = f"test-own-1b-b-{uuid.uuid4().hex[:6]}" + for w in (w_a, w_b): + await db_session.execute( + text( + "INSERT INTO worker_capabilities (worker_id, user_id, device_label, " + "worker_class, tier) VALUES (:w, :u, 'lbl', 'laptop', 'laptop_small')" + ), + {"w": w, "u": owner_id}, + ) + job_id = ( + await db_session.execute( + text( + "INSERT INTO worker_jobs (user_id, job_type, status, worker_id, " + "attempts, claimed_at) VALUES (:u, 'test-own-1b', 'processing', :w, 1, NOW()) " + "RETURNING id" + ), + {"u": owner_id, "w": w_a}, + ) + ).scalar_one() + await db_session.commit() + + try: + async with AsyncClient( + transport=ASGITransport(app=app), base_url="http://test" + ) as c: + r = await c.post( + "/internal/worker/result", + json={ + "job_id": job_id, + "worker_id": w_b, + "status": "completed", + "result": {"ok": True}, + }, + headers={"Authorization": f"Bearer {worker_token}"}, + ) + assert r.status_code == 404, r.text + # job 은 여전히 processing (worker_id=w_a) + res = await db_session.execute( + text("SELECT status, worker_id FROM worker_jobs WHERE id = :i"), + {"i": job_id}, + ) + s, w_id = res.first() + assert s == "processing" + assert w_id == w_a + finally: + await cleanup_worker_jobs(db_session, "test-own-1b") + await cleanup_worker_capabilities(db_session, "test-own-1b") diff --git a/tests/test_worker_jobs_retry.py b/tests/test_worker_jobs_retry.py new file mode 100644 index 0000000..92407d0 --- /dev/null +++ b/tests/test_worker_jobs_retry.py @@ -0,0 +1,185 @@ +"""PR-Worker-Pool-Registry-1B 정정 #3 — explicit /result failed 재시도 정책. + +3 case: + A. initial attempts=0, max_attempts=3 → claim 후 attempts=1 → failed → status='pending' 복귀. + (worker_id=NULL, claimed_at=NULL, completed_at=NULL, error_message 저장) + B. initial attempts=2, max_attempts=3 → claim 후 attempts=3 → failed → final 'failed'. + (completed_at=now) + C. failed 요청에 result={"partial":"..."} 포함 → DB result IS NULL 유지 (request.result 무시). +""" + +from __future__ import annotations + +import os +import sys +import uuid + +import pytest +import pytest_asyncio + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "app")) + +from httpx import ASGITransport, AsyncClient +from sqlalchemy import text +from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine + +from _worker_pool_helpers import ( + cleanup_worker_capabilities, + cleanup_worker_jobs, + ensure_user, + get_database_url, + mint_access_token, +) + + +@pytest_asyncio.fixture +async def db_session(monkeypatch): + monkeypatch.setenv("LAPTOP_WORKER_BOT_USERNAME", "laptop-worker-bot") + engine = create_async_engine(get_database_url()) + sm = async_sessionmaker(engine, expire_on_commit=False) + async with sm() as session: + yield session + await engine.dispose() + + +@pytest_asyncio.fixture +async def worker_token(db_session): + await ensure_user(db_session, "laptop-worker-bot") + return mint_access_token("laptop-worker-bot") + + +@pytest_asyncio.fixture +async def owner_id(db_session): + return await ensure_user(db_session, "test-owner-retry-1b") + + +@pytest_asyncio.fixture +async def worker_id_unique(db_session, owner_id): + wid = f"test-retry-1b-{uuid.uuid4().hex[:8]}" + await db_session.execute( + text( + "INSERT INTO worker_capabilities (worker_id, user_id, device_label, " + "worker_class, tier) VALUES (:w, :u, 'lbl', 'laptop', 'laptop_small')" + ), + {"w": wid, "u": owner_id}, + ) + await db_session.commit() + yield wid + await cleanup_worker_jobs(db_session, "test-retry-1b") + await cleanup_worker_capabilities(db_session, wid) + + +async def _seed_processing_job( + db_session, owner_id: int, worker_id: str, attempts: int, max_attempts: int = 3 +) -> int: + job_id = ( + await db_session.execute( + text( + "INSERT INTO worker_jobs (user_id, job_type, status, worker_id, " + "attempts, max_attempts, claimed_at) " + "VALUES (:u, :j, 'processing', :w, :a, :m, NOW()) RETURNING id" + ), + { + "u": owner_id, + "j": f"test-retry-1b-{uuid.uuid4().hex[:8]}", + "w": worker_id, + "a": attempts, + "m": max_attempts, + }, + ) + ).scalar_one() + await db_session.commit() + return job_id + + +@pytest.mark.asyncio +async def test_case_a_failed_then_pending( + db_session, worker_token, worker_id_unique, owner_id +): + """Case A: attempts(=1) < max_attempts(=3) → pending 복귀.""" + from main import app + + job_id = await _seed_processing_job(db_session, owner_id, worker_id_unique, attempts=1) + async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as c: + r = await c.post( + "/internal/worker/result", + json={ + "job_id": job_id, + "worker_id": worker_id_unique, + "status": "failed", + "error_message": "case A error", + }, + headers={"Authorization": f"Bearer {worker_token}"}, + ) + assert r.status_code == 200, r.text + + res = await db_session.execute( + text( + "SELECT status, worker_id, claimed_at, completed_at, error_message " + "FROM worker_jobs WHERE id = :i" + ), + {"i": job_id}, + ) + s, w_id, c_at, comp_at, em = res.first() + assert s == "pending" + assert w_id is None + assert c_at is None + assert comp_at is None + assert em == "case A error" + + +@pytest.mark.asyncio +async def test_case_b_failed_max_attempts_final( + db_session, worker_token, worker_id_unique, owner_id +): + """Case B: attempts(=3) >= max_attempts(=3) → final 'failed' + completed_at.""" + from main import app + + job_id = await _seed_processing_job(db_session, owner_id, worker_id_unique, attempts=3) + async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as c: + r = await c.post( + "/internal/worker/result", + json={ + "job_id": job_id, + "worker_id": worker_id_unique, + "status": "failed", + "error_message": "case B final", + }, + headers={"Authorization": f"Bearer {worker_token}"}, + ) + assert r.status_code == 200 + + res = await db_session.execute( + text("SELECT status, completed_at FROM worker_jobs WHERE id = :i"), + {"i": job_id}, + ) + s, comp_at = res.first() + assert s == "failed" + assert comp_at is not None + + +@pytest.mark.asyncio +async def test_case_c_failed_ignores_result_field( + db_session, worker_token, worker_id_unique, owner_id +): + """Case C: failed 요청 result={'partial':...} 포함해도 DB result IS NULL 유지.""" + from main import app + + job_id = await _seed_processing_job(db_session, owner_id, worker_id_unique, attempts=1) + async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as c: + r = await c.post( + "/internal/worker/result", + json={ + "job_id": job_id, + "worker_id": worker_id_unique, + "status": "failed", + "result": {"partial": "should be ignored"}, + "error_message": "case C", + }, + headers={"Authorization": f"Bearer {worker_token}"}, + ) + assert r.status_code == 200 + res = await db_session.execute( + text("SELECT result FROM worker_jobs WHERE id = :i"), {"i": job_id} + ) + assert res.scalar_one() is None # 정정 #3 엄격 규칙 diff --git a/tests/test_worker_jobs_skip_locked.py b/tests/test_worker_jobs_skip_locked.py new file mode 100644 index 0000000..19946a6 --- /dev/null +++ b/tests/test_worker_jobs_skip_locked.py @@ -0,0 +1,120 @@ +"""PR-Worker-Pool-Registry-1B — /claim 동시성 (SKIP LOCKED) + 204 body 검증. + +2 항목: + 1. 두 async session 동시 claim → 한쪽만 200, 다른 쪽 204 + body 0 (정정 #4) + 2. queue empty + 별도 호출 → 204 + Content-Length: 0 명시 +""" + +from __future__ import annotations + +import asyncio +import os +import sys +import uuid + +import pytest +import pytest_asyncio + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "app")) + +from httpx import ASGITransport, AsyncClient +from sqlalchemy import text +from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine + +from _worker_pool_helpers import ( + cleanup_worker_capabilities, + cleanup_worker_jobs, + ensure_user, + get_database_url, + mint_access_token, +) + + +@pytest_asyncio.fixture +async def db_session(monkeypatch): + monkeypatch.setenv("LAPTOP_WORKER_BOT_USERNAME", "laptop-worker-bot") + engine = create_async_engine(get_database_url()) + sm = async_sessionmaker(engine, expire_on_commit=False) + async with sm() as session: + yield session + await engine.dispose() + + +@pytest_asyncio.fixture +async def worker_token(db_session): + await ensure_user(db_session, "laptop-worker-bot") + return mint_access_token("laptop-worker-bot") + + +@pytest_asyncio.fixture +async def owner_id(db_session): + return await ensure_user(db_session, "test-owner-skip-1b") + + +@pytest.mark.asyncio +async def test_skip_locked_only_one_winner(db_session, worker_token, owner_id): + """두 client 가 동시에 동일 job_type claim → 한쪽 200, 다른 쪽 204.""" + from main import app + + jt = f"test-skip-1b-{uuid.uuid4().hex[:8]}" + w1 = f"test-skip-1b-w1-{uuid.uuid4().hex[:6]}" + w2 = f"test-skip-1b-w2-{uuid.uuid4().hex[:6]}" + + # seed capability + 1 pending job + for w in (w1, w2): + await db_session.execute( + text( + "INSERT INTO worker_capabilities (worker_id, user_id, device_label, " + "worker_class, tier) VALUES (:w, :u, 'lbl', 'laptop', 'laptop_small') " + "ON CONFLICT (worker_id) DO NOTHING" + ), + {"w": w, "u": owner_id}, + ) + await db_session.execute( + text("INSERT INTO worker_jobs (user_id, job_type) VALUES (:u, :j)"), + {"u": owner_id, "j": jt}, + ) + await db_session.commit() + + headers = {"Authorization": f"Bearer {worker_token}"} + try: + async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as c1, \ + AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as c2: + r1, r2 = await asyncio.gather( + c1.post( + "/internal/worker/claim", + json={"worker_id": w1, "job_type": jt}, + headers=headers, + ), + c2.post( + "/internal/worker/claim", + json={"worker_id": w2, "job_type": jt}, + headers=headers, + ), + ) + codes = sorted([r1.status_code, r2.status_code]) + assert codes == [200, 204], f"unexpected codes: {codes}" + loser = r1 if r1.status_code == 204 else r2 + # 정정 #4: 204 body 빈 검증 + assert loser.content == b"" + finally: + await cleanup_worker_jobs(db_session, "test-skip-1b") + await cleanup_worker_capabilities(db_session, "test-skip-1b-w") + + +@pytest.mark.asyncio +async def test_claim_204_body_explicit_empty(db_session, worker_token): + """queue empty 호출 → 204 + content empty (정정 #4 명시적).""" + from main import app + + async with AsyncClient( + transport=ASGITransport(app=app), base_url="http://test" + ) as c: + r = await c.post( + "/internal/worker/claim", + json={"worker_id": "test-skip-1b-empty", "job_type": "test-skip-1b-NOEXIST"}, + headers={"Authorization": f"Bearer {worker_token}"}, + ) + assert r.status_code == 204 + assert r.content == b"" + # Content-Length 헤더는 ASGI 가 자동 0 으로 세팅. 없을 수도 있으므로 content 만 보장. diff --git a/tests/test_worker_jobs_smoke.py b/tests/test_worker_jobs_smoke.py new file mode 100644 index 0000000..4d841e5 --- /dev/null +++ b/tests/test_worker_jobs_smoke.py @@ -0,0 +1,146 @@ +"""PR-Worker-Pool-Registry-1B — worker_jobs ORM/schema 단위 검증. + +3 항목 (endpoint test 와 중복 회피, schema-level 만): + 1. CHECK constraint — status 가 enum 4 외 값일 때 INSERT 거부 (정정 #5) + 2. partial unique-index 동작 검증 — pending row 만 색인 (idx_worker_jobs_pending_type) + 3. ON DELETE SET NULL — worker_capabilities 삭제 시 worker_jobs.worker_id 자동 NULL +""" + +from __future__ import annotations + +import os +import sys +import uuid + +import pytest +import pytest_asyncio + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "app")) + +from sqlalchemy import text +from sqlalchemy.exc import IntegrityError +from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine + +from _worker_pool_helpers import ( + cleanup_worker_capabilities, + cleanup_worker_jobs, + ensure_user, + get_database_url, +) + + +@pytest_asyncio.fixture +async def db_session(): + engine = create_async_engine(get_database_url()) + sm = async_sessionmaker(engine, expire_on_commit=False) + async with sm() as session: + yield session + await engine.dispose() + + +@pytest_asyncio.fixture +async def owner_id(db_session): + return await ensure_user(db_session, "test-owner-smoke-1b") + + +@pytest_asyncio.fixture +async def worker_id_unique(db_session, owner_id): + wid = f"test-smoke-1b-{uuid.uuid4().hex[:8]}" + await db_session.execute( + text( + "INSERT INTO worker_capabilities (worker_id, user_id, device_label, " + "worker_class, tier) VALUES (:w, :u, 'lbl', 'laptop', 'laptop_small')" + ), + {"w": wid, "u": owner_id}, + ) + await db_session.commit() + yield wid + await cleanup_worker_jobs(db_session, "test-smoke-1b") + await cleanup_worker_capabilities(db_session, wid) + + +@pytest.mark.asyncio +async def test_check_constraint_rejects_unknown_status(db_session, owner_id, worker_id_unique): + """정정 #5: status 가 enum 4 외 값이면 IntegrityError.""" + with pytest.raises(IntegrityError): + await db_session.execute( + text( + "INSERT INTO worker_jobs (user_id, job_type, status) " + "VALUES (:u, 'test-smoke-1b-bad', 'running')" + ), + {"u": owner_id}, + ) + await db_session.commit() + await db_session.rollback() + + +@pytest.mark.asyncio +async def test_partial_pending_index_used_for_claim_query(db_session, owner_id, worker_id_unique): + """partial index idx_worker_jobs_pending_type 가 pending claim 쿼리 실행계획에 사용.""" + # seed 2 rows: pending + completed + await db_session.execute( + text( + "INSERT INTO worker_jobs (user_id, job_type, status) " + "VALUES (:u, 'test-smoke-1b-idx', 'pending'), " + " (:u, 'test-smoke-1b-idx', 'completed')" + ), + {"u": owner_id}, + ) + await db_session.commit() + + # EXPLAIN ANALYZE 가 partial index 사용하는지 확인 (운영 환경에선 seq scan 가능 — 본 테스트는 인덱스 정의 존재만 보장) + res = await db_session.execute( + text( + "SELECT indexname FROM pg_indexes " + "WHERE tablename = 'worker_jobs' AND indexname = 'idx_worker_jobs_pending_type'" + ) + ) + assert res.scalar_one_or_none() == "idx_worker_jobs_pending_type" + + # pending row 만 SELECT 됨 (실제 동작 검증) + res = await db_session.execute( + text( + "SELECT count(*) FROM worker_jobs " + "WHERE status = 'pending' AND job_type = 'test-smoke-1b-idx'" + ) + ) + assert res.scalar_one() == 1 + + +@pytest.mark.asyncio +async def test_on_delete_set_null_when_capability_dropped(db_session, owner_id): + """worker_capabilities 삭제 시 worker_jobs.worker_id 자동 NULL (ON DELETE SET NULL).""" + wid = f"test-smoke-1b-del-{uuid.uuid4().hex[:8]}" + await db_session.execute( + text( + "INSERT INTO worker_capabilities (worker_id, user_id, device_label, " + "worker_class, tier) VALUES (:w, :u, 'lbl', 'laptop', 'laptop_small')" + ), + {"w": wid, "u": owner_id}, + ) + job_id = ( + await db_session.execute( + text( + "INSERT INTO worker_jobs (user_id, job_type, status, worker_id, attempts) " + "VALUES (:u, 'test-smoke-1b-del', 'completed', :w, 1) RETURNING id" + ), + {"u": owner_id, "w": wid}, + ) + ).scalar_one() + await db_session.commit() + + # worker_heartbeats CASCADE 가 worker_capabilities 삭제 차단할 수 있으니 사전 정리 + await db_session.execute( + text("DELETE FROM worker_heartbeats WHERE worker_id = :w"), {"w": wid} + ) + await db_session.execute(text("DELETE FROM worker_capabilities WHERE worker_id = :w"), {"w": wid}) + await db_session.commit() + + res = await db_session.execute( + text("SELECT worker_id FROM worker_jobs WHERE id = :i"), {"i": job_id} + ) + assert res.scalar_one() is None + + # cleanup + await db_session.execute(text("DELETE FROM worker_jobs WHERE id = :i"), {"i": job_id}) + await db_session.commit()