"""PR-Worker-Pool-Registry-1B: /internal/worker/* 5 endpoint 실 구현. worker-pool-policy §B.2 invariant 매핑: - inv 2: drain = heartbeat INSERT only (advisory). claim 거부 = Notebook-Pilot-1. - inv 3: /result result = raw JSONB only. canonical promote 0. - inv 4: ProcessingQueue 무변경 — worker_jobs 별 table. - inv 5: 운영 자동 분기 변경 0 — heartbeat alive 판정 SQL 부재, classify_worker/queue_consumer touch 0. 사용자 review 정정 5개 (2026-05-19): - #1: worker_jobs.user_id = job owner (실 사용자). worker 인증은 worker_id + JWT 별도. - #2: /result 소유권 검증 (WHERE id AND worker_id AND status='processing'). 매칭 0건 → 404. - #3: explicit failed 재시도 (attempts=max → final failed). - #4: /claim 204 = Response(status_code=204) body 0. - #5: mig 275 status CHECK ('pending','processing','completed','failed'). """ import json import os from datetime import datetime, timezone from typing import Annotated, Any from fastapi import APIRouter, Depends, HTTPException, Response, status from pydantic import BaseModel, Field from sqlalchemy import select, update from sqlalchemy.dialects.postgresql import insert as pg_insert from sqlalchemy.ext.asyncio import AsyncSession from core.auth import get_current_user, require_worker_user from core.database import get_session from models.worker_pool import WorkerCapability, WorkerHeartbeat, WorkerJob from services.worker_recap_context import fetch_recap_context # PR-Worker-Pool-Registry-1C — payload size guard (recap context 가 큰 경우 차단). # 사용자 결정 2026-05-19: cap 1MB 상향 + fetch_recap_context deterministic compaction # (top-N memo + daily/kind aggregate). 운영 7d 데이터 ~1.36MB → 100KB 부족 → 1MB. # 운영 조정용 env override = `WORKER_RECAP_PAYLOAD_MAX_BYTES`. 
def _payload_max_bytes() -> int:
    """Return the recap payload size cap in bytes.

    Reads ``WORKER_RECAP_PAYLOAD_MAX_BYTES`` (default 1,000,000). A blank,
    non-integer, or non-positive value falls back to the default instead of
    raising, so one bad env value cannot turn every ``/jobs/recap`` request
    into a 500.
    """
    default = 1_000_000
    raw = os.getenv("WORKER_RECAP_PAYLOAD_MAX_BYTES", "").strip()
    if not raw:
        return default
    try:
        value = int(raw)
    except ValueError:
        return default
    return value if value > 0 else default


router = APIRouter()


# ─── Pydantic schemas ───


class WorkerRegisterRequest(BaseModel):
    """Body of ``POST /register``: worker identity plus advertised capabilities."""

    worker_id: str
    device_label: str
    worker_class: str
    tier: str
    capabilities: list[str] = Field(default_factory=list)
    models_loaded: list[str] = Field(default_factory=list)
    endpoint: str | None = None


class WorkerHeartbeatRequest(BaseModel):
    """Body of ``POST /heartbeat``; persisted as one heartbeat row."""

    worker_id: str
    status: str  # starting/available/busy/draining (free-form str, not validated here)
    current_job_id: int | None = None
    battery: str | None = None
    thermal: str | None = None
    raw_payload: dict[str, Any] = Field(default_factory=dict)


class WorkerClaimRequest(BaseModel):
    """Body of ``POST /claim``: which worker wants a job of which type."""

    worker_id: str
    job_type: str


class WorkerClaimResponse(BaseModel):
    """The claimed job handed to the worker (200 response of ``/claim``)."""

    id: int
    job_type: str
    payload: dict[str, Any]
    attempts: int


class WorkerResultRequest(BaseModel):
    """Body of ``POST /result``."""

    job_id: int
    worker_id: str  # correction #2 — used for the ownership check
    status: str  # completed | failed (checked in the handler → 400 otherwise)
    result: dict[str, Any] | None = None
    error_message: str | None = None


class WorkerDrainRequest(BaseModel):
    """Body of ``POST /drain``."""

    worker_id: str
    reason: str | None = None


# ─── Endpoints ───


@router.post("/register")
async def register(
    body: WorkerRegisterRequest,
    user: Annotated[Any, Depends(require_worker_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
):
    """Upsert the worker's ``worker_capabilities`` row (first register or refresh).

    ``user_id`` and ``created_at`` are written only on first insert; a repeat
    registration for the same ``worker_id`` overwrites the descriptive columns
    and bumps ``last_registered_at``.

    NOTE(review): the conflict branch does not compare the stored ``user_id``
    with the caller, so any worker-authenticated user can overwrite another
    user's row for the same ``worker_id`` — confirm this is intended.
    """
    now = datetime.now(timezone.utc)
    stmt = (
        pg_insert(WorkerCapability)
        .values(
            worker_id=body.worker_id,
            user_id=user.id,
            device_label=body.device_label,
            worker_class=body.worker_class,
            tier=body.tier,
            capabilities=body.capabilities,
            models_loaded=body.models_loaded,
            endpoint=body.endpoint,
            created_at=now,
            last_registered_at=now,
        )
        .on_conflict_do_update(
            index_elements=["worker_id"],
            set_={
                "device_label": body.device_label,
                "worker_class": body.worker_class,
                "tier": body.tier,
                "capabilities": body.capabilities,
                "models_loaded": body.models_loaded,
                "endpoint": body.endpoint,
                "last_registered_at": now,
            },
        )
    )
    await session.execute(stmt)
    await session.commit()
    return {"ok": True, "worker_id": body.worker_id}


@router.post("/heartbeat")
async def heartbeat(
    body: WorkerHeartbeatRequest,
    user: Annotated[Any, Depends(require_worker_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
):
    """Append one ``worker_heartbeats`` row and acknowledge.

    Enforces inv 5: there is deliberately no liveness ("alive") evaluation
    here — the endpoint only inserts the row and returns ``{"ok": True}``.
    """
    session.add(
        WorkerHeartbeat(
            worker_id=body.worker_id,
            status=body.status,
            current_job_id=body.current_job_id,
            battery=body.battery,
            thermal=body.thermal,
            raw_payload=body.raw_payload,
        )
    )
    await session.commit()
    return {"ok": True}


@router.post(
    "/claim",
    responses={
        200: {"model": WorkerClaimResponse},
        204: {"description": "queue empty"},
    },
)
async def claim(
    body: WorkerClaimRequest,
    user: Annotated[Any, Depends(require_worker_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
):
    """Claim the oldest pending job of ``body.job_type`` for ``body.worker_id``.

    Uses ``SELECT ... FOR UPDATE SKIP LOCKED`` so concurrent claimers never
    receive the same row. On a hit the job moves to ``processing`` with
    ``worker_id``/``claimed_at`` set and ``attempts`` incremented.

    Correction #4: an empty queue returns a bare 204 ``Response`` with no body
    (instead of ``WorkerClaimResponse | None``).

    NOTE(review): neither the registration of ``body.worker_id`` nor its
    relation to ``user`` is checked here, and jobs are not filtered by owner —
    confirm any worker-authenticated caller is allowed to receive any user's
    payload.
    """
    now = datetime.now(timezone.utc)
    stmt = (
        select(WorkerJob)
        .where(WorkerJob.status == "pending", WorkerJob.job_type == body.job_type)
        # id breaks created_at ties so the claim order is deterministic FIFO.
        .order_by(WorkerJob.created_at, WorkerJob.id)
        .limit(1)
        .with_for_update(skip_locked=True)
    )
    job = (await session.execute(stmt)).scalar_one_or_none()
    if job is None:
        await session.commit()  # end the FOR UPDATE transaction
        return Response(status_code=status.HTTP_204_NO_CONTENT)

    job.status = "processing"
    job.worker_id = body.worker_id
    job.claimed_at = now
    job.attempts += 1
    # Build the response BEFORE commit: if the session keeps SQLAlchemy's
    # default expire_on_commit=True, reading attributes after commit would
    # need an implicit reload, which AsyncSession cannot perform.
    response = WorkerClaimResponse(
        id=job.id,
        job_type=job.job_type,
        payload=job.payload,
        attempts=job.attempts,
    )
    await session.commit()
    return response


@router.post("/result")
async def result(
    body: WorkerResultRequest,
    user: Annotated[Any, Depends(require_worker_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
):
    """Submit a job result, enforcing correction #2 (ownership) and #3 (retry).

    Ownership: the job must match ``id`` AND ``worker_id`` AND
    ``status='processing'``; zero matches → 404.

    - completed: ``status='completed'``, ``result`` stored as raw JSONB
      (inv 3 — no canonical promotion), ``completed_at`` set, error cleared.
    - failed, ``attempts < max_attempts``: back to ``pending`` with
      ``worker_id``/``claimed_at``/``completed_at`` cleared (re-claimable).
    - failed, ``attempts >= max_attempts``: final ``failed`` + ``completed_at``.

    On failure the ``result`` column is never written — ``body.result`` is
    ignored so partial results are not persisted.

    Raises:
        HTTPException 400: ``body.status`` is not ``completed``/``failed``.
        HTTPException 404: no owned job in ``processing`` state.
    """
    if body.status not in ("completed", "failed"):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="status must be 'completed' or 'failed'",
        )

    stmt = (
        select(WorkerJob)
        .where(
            WorkerJob.id == body.job_id,
            WorkerJob.worker_id == body.worker_id,
            WorkerJob.status == "processing",
        )
        # Row lock: a concurrent duplicate submission (e.g. a worker retry)
        # waits here; under READ COMMITTED, PostgreSQL then re-checks the WHERE
        # clause, no longer sees 'processing' and returns no row (→ 404),
        # instead of both transitions being applied (lost update).
        .with_for_update()
    )
    job = (await session.execute(stmt)).scalar_one_or_none()
    if job is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="job not found or not owned by this worker (or not in processing)",
        )

    now = datetime.now(timezone.utc)
    if body.status == "completed":
        job.status = "completed"
        job.result = body.result  # raw JSONB (inv 3 — canonical promote 0)
        job.completed_at = now
        job.error_message = None
    else:  # failed
        job.error_message = body.error_message
        # Correction #3: never touch the result column on failure.
        if job.attempts < job.max_attempts:
            job.status = "pending"
            job.worker_id = None
            job.claimed_at = None
            job.completed_at = None
        else:
            job.status = "failed"
            job.completed_at = now

    # Snapshot before commit so the response never depends on post-commit
    # attribute reloads (AsyncSession + expire_on_commit).
    final_status, attempts = job.status, job.attempts
    await session.commit()
    return {"ok": True, "status": final_status, "attempts": attempts}


class JobsRecapRequest(BaseModel):
    """Body of ``POST /jobs/recap``: look-back window in days (1–30)."""

    days: int = Field(default=7, ge=1, le=30)


class JobsRecapResponse(BaseModel):
    """Summary of the enqueued recap job returned to the user."""

    job_id: int
    memo_count: int
    event_count: int
    payload_bytes: int
    payload_compacted: bool
    omitted_memos: int


@router.post("/jobs/recap", response_model=JobsRecapResponse)
async def enqueue_recap(
    body: JobsRecapRequest,
    user: Annotated[Any, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
):
    """Assemble the caller's recap context and enqueue it as a ``recap`` job.

    Authenticated with a regular user JWT (``get_current_user``, not
    ``require_worker_user``); the context is fetched for ``user.id`` only.

    Size guard: if the UTF-8 JSON serialization of the context exceeds
    ``_payload_max_bytes()`` (default 1,000,000 bytes, overridable via
    ``WORKER_RECAP_PAYLOAD_MAX_BYTES``), respond 413 and enqueue nothing.

    Raises:
        HTTPException 413: payload larger than the configured cap.
    """
    context = await fetch_recap_context(session, user_id=user.id, days=body.days)
    payload_bytes = len(json.dumps(context, ensure_ascii=False).encode("utf-8"))
    cap = _payload_max_bytes()
    if payload_bytes > cap:
        raise HTTPException(
            status_code=status.HTTP_413_REQUEST_ENTITY_TOO_LARGE,
            detail=(
                f"recap context payload {payload_bytes} bytes > {cap} bytes (after compaction). "
                f"days 를 줄여 재시도 (현재 {body.days}d) 또는 운영자에게 RECAP_MEMO_TOP_N / "
                "WORKER_RECAP_PAYLOAD_MAX_BYTES 조정 요청."
            ),
        )

    job = WorkerJob(user_id=user.id, job_type="recap", payload=context)
    session.add(job)
    # flush() assigns the primary key inside the transaction; unlike a
    # refresh() after commit it does not re-read the (up to ~1MB) payload.
    await session.flush()
    job_id = job.id
    await session.commit()

    return JobsRecapResponse(
        job_id=job_id,
        memo_count=context["memo_count"],
        event_count=context["event_count"],
        payload_bytes=payload_bytes,
        payload_compacted=context["payload_compacted"],
        omitted_memos=context["summary_stats"]["omitted_memos"],
    )


@router.post("/drain")
async def drain(
    body: WorkerDrainRequest,
    user: Annotated[Any, Depends(require_worker_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
):
    """Record a drain request as a ``status='draining'`` heartbeat row.

    Advisory/audit only (inv 2): nothing here stops the worker from claiming;
    claim refusal belongs to Notebook-Pilot-1. A non-empty ``reason`` is kept
    under ``raw_payload["reason"]``.
    """
    payload: dict[str, Any] = {"reason": body.reason} if body.reason else {}
    session.add(
        WorkerHeartbeat(
            worker_id=body.worker_id,
            status="draining",
            raw_payload=payload,
        )
    )
    await session.commit()
    return {"ok": True}