"""PR-Worker-Pool-Registry-1B: /internal/worker/* 5 endpoint 실 구현. worker-pool-policy §B.2 invariant 매핑: - inv 2: drain = heartbeat INSERT only (advisory). claim 거부 = Notebook-Pilot-1. - inv 3: /result result = raw JSONB only. canonical promote 0. - inv 4: ProcessingQueue 무변경 — worker_jobs 별 table. - inv 5: 운영 자동 분기 변경 0 — heartbeat alive 판정 SQL 부재, classify_worker/queue_consumer touch 0. 사용자 review 정정 5개 (2026-05-19): - #1: worker_jobs.user_id = job owner (실 사용자). worker 인증은 worker_id + JWT 별도. - #2: /result 소유권 검증 (WHERE id AND worker_id AND status='processing'). 매칭 0건 → 404. - #3: explicit failed 재시도 (attempts=max → final failed). - #4: /claim 204 = Response(status_code=204) body 0. - #5: mig 275 status CHECK ('pending','processing','completed','failed'). """ from datetime import datetime, timezone from typing import Annotated, Any from fastapi import APIRouter, Depends, HTTPException, Response, status from pydantic import BaseModel from sqlalchemy import select, update from sqlalchemy.dialects.postgresql import insert as pg_insert from sqlalchemy.ext.asyncio import AsyncSession from core.auth import require_worker_user from core.database import get_session from models.worker_pool import WorkerCapability, WorkerHeartbeat, WorkerJob router = APIRouter() # ─── Pydantic schemas ─── class WorkerRegisterRequest(BaseModel): worker_id: str device_label: str worker_class: str tier: str capabilities: list[str] = [] models_loaded: list[str] = [] endpoint: str | None = None class WorkerHeartbeatRequest(BaseModel): worker_id: str status: str # starting/available/busy/draining current_job_id: int | None = None battery: str | None = None thermal: str | None = None raw_payload: dict[str, Any] = {} class WorkerClaimRequest(BaseModel): worker_id: str job_type: str class WorkerClaimResponse(BaseModel): id: int job_type: str payload: dict[str, Any] attempts: int class WorkerResultRequest(BaseModel): job_id: int worker_id: str # 정정 #2 — 소유권 검증 status: str # completed | failed result: dict[str, Any] | None = None error_message: str | None = None class WorkerDrainRequest(BaseModel): worker_id: str reason: str | None = None # ─── 엔드포인트 ─── @router.post("/register") async def register( body: WorkerRegisterRequest, user: Annotated[Any, Depends(require_worker_user)], session: Annotated[AsyncSession, Depends(get_session)], ): """worker_capabilities UPSERT — register 또는 capability 갱신.""" now = datetime.now(timezone.utc) stmt = pg_insert(WorkerCapability).values( worker_id=body.worker_id, user_id=user.id, device_label=body.device_label, worker_class=body.worker_class, tier=body.tier, capabilities=body.capabilities, models_loaded=body.models_loaded, endpoint=body.endpoint, created_at=now, last_registered_at=now, ).on_conflict_do_update( index_elements=["worker_id"], set_={ "device_label": body.device_label, "worker_class": body.worker_class, "tier": body.tier, "capabilities": body.capabilities, "models_loaded": body.models_loaded, "endpoint": body.endpoint, "last_registered_at": now, }, ) await session.execute(stmt) await session.commit() return {"ok": True, "worker_id": body.worker_id} @router.post("/heartbeat") async def heartbeat( body: WorkerHeartbeatRequest, user: Annotated[Any, Depends(require_worker_user)], session: Annotated[AsyncSession, Depends(get_session)], ): """worker_heartbeats append-only INSERT. inv 5 강제: alive 판정 SQL 부재. 본 endpoint 는 row 추가 + ok 반환만. """ hb = WorkerHeartbeat( worker_id=body.worker_id, status=body.status, current_job_id=body.current_job_id, battery=body.battery, thermal=body.thermal, raw_payload=body.raw_payload, ) session.add(hb) await session.commit() return {"ok": True} @router.post( "/claim", responses={ 200: {"model": WorkerClaimResponse}, 204: {"description": "queue empty"}, }, ) async def claim( body: WorkerClaimRequest, user: Annotated[Any, Depends(require_worker_user)], session: Annotated[AsyncSession, Depends(get_session)], ): """SELECT FOR UPDATE SKIP LOCKED 로 pending job 1건 claim. 정정 #4: miss → Response(status_code=204) body 0. WorkerClaimResponse | None 회피. """ now = datetime.now(timezone.utc) stmt = ( select(WorkerJob) .where(WorkerJob.status == "pending", WorkerJob.job_type == body.job_type) .order_by(WorkerJob.created_at) .limit(1) .with_for_update(skip_locked=True) ) result = await session.execute(stmt) job = result.scalar_one_or_none() if job is None: await session.commit() # FOR UPDATE 트랜잭션 해제 return Response(status_code=204) job.status = "processing" job.worker_id = body.worker_id job.claimed_at = now job.attempts = job.attempts + 1 await session.commit() return WorkerClaimResponse( id=job.id, job_type=job.job_type, payload=job.payload, attempts=job.attempts, ) @router.post("/result") async def result( body: WorkerResultRequest, user: Annotated[Any, Depends(require_worker_user)], session: Annotated[AsyncSession, Depends(get_session)], ): """job 결과 제출. 정정 #2 (소유권) + #3 (재시도) 강제. 소유권 검증: WHERE id AND worker_id AND status='processing'. 매칭 0건 → 404. completed: status='completed' + result + completed_at. failed: attempts < max_attempts → status='pending' (worker_id/claimed_at/completed_at NULL). attempts >= max_attempts → status='failed' final + completed_at. result 컬럼 절대 갱신 X — request.result 무시 (failed 시 partial result 저장 차단). """ if body.status not in ("completed", "failed"): raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="status must be 'completed' or 'failed'", ) stmt = select(WorkerJob).where( WorkerJob.id == body.job_id, WorkerJob.worker_id == body.worker_id, WorkerJob.status == "processing", ) res = await session.execute(stmt) job = res.scalar_one_or_none() if job is None: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="job not found or not owned by this worker (or not in processing)", ) now = datetime.now(timezone.utc) if body.status == "completed": job.status = "completed" job.result = body.result # raw JSONB (inv 3 — canonical promote 0) job.completed_at = now job.error_message = None else: # failed job.error_message = body.error_message # 정정 #3 정책: result 컬럼 절대 갱신 X (request.result 무시) if job.attempts < job.max_attempts: job.status = "pending" job.worker_id = None job.claimed_at = None job.completed_at = None else: job.status = "failed" job.completed_at = now await session.commit() return {"ok": True, "status": job.status, "attempts": job.attempts} @router.post("/drain") async def drain( body: WorkerDrainRequest, user: Annotated[Any, Depends(require_worker_user)], session: Annotated[AsyncSession, Depends(get_session)], ): """drain = heartbeat INSERT status='draining' (advisory/audit only, inv 2). claim 거부 로직 부재 = Notebook-Pilot-1 영역. """ payload: dict[str, Any] = {} if body.reason: payload["reason"] = body.reason hb = WorkerHeartbeat( worker_id=body.worker_id, status="draining", raw_payload=payload, ) session.add(hb) await session.commit() return {"ok": True}