0ea72c1aa6
- app/services/worker_recap_context.py — fetch_recap_context(user_id, days) documents file_type='note' 7d (single-user invariant) + events 7d (user_id 매칭 + cancelled 제외) JOIN. timezone Asia/Seoul. - /internal/worker/jobs/recap POST — 일반 user JWT 인증 + context 조립 + worker_jobs INSERT. job_type='recap' + payload JSONB. - payload 100KB guard — JSON 직렬화 100_000 bytes 초과 시 413. - 회귀 위험 0: memos/events API select 절 touch 0, read-only 쿼리만. worker-pool-policy §B.2 invariant 보존: ProcessingQueue 무변경, 운영 자동 분기 변경 0, canonical promote 0 (worker_jobs.payload JSONB only). Notebook-Pilot-1 entry condition 4항목 모두 충족 가능: manual recap E2E / payload <100KB guard / residue 0 / 권한 분리 403. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
318 lines
10 KiB
Python
318 lines
10 KiB
Python
"""PR-Worker-Pool-Registry-1B/1C: /internal/worker/* endpoints — register, heartbeat, claim, result, drain (1B) and jobs/recap (1C).
|
|
|
|
worker-pool-policy §B.2 invariant 매핑:
|
|
- inv 2: drain = heartbeat INSERT only (advisory). claim 거부 = Notebook-Pilot-1.
|
|
- inv 3: /result result = raw JSONB only. canonical promote 0.
|
|
- inv 4: ProcessingQueue 무변경 — worker_jobs 별 table.
|
|
- inv 5: 운영 자동 분기 변경 0 — heartbeat alive 판정 SQL 부재, classify_worker/queue_consumer touch 0.
|
|
|
|
사용자 review 정정 5개 (2026-05-19):
|
|
- #1: worker_jobs.user_id = job owner (실 사용자). worker 인증은 worker_id + JWT 별도.
|
|
- #2: /result 소유권 검증 (WHERE id AND worker_id AND status='processing'). 매칭 0건 → 404.
|
|
- #3: explicit failed 재시도 (attempts<max → pending 복귀, attempts>=max → final failed).
|
|
- #4: /claim 204 = Response(status_code=204) body 0.
|
|
- #5: mig 275 status CHECK ('pending','processing','completed','failed').
|
|
"""
|
|
|
|
import json
|
|
from datetime import datetime, timezone
|
|
from typing import Annotated, Any
|
|
|
|
from fastapi import APIRouter, Depends, HTTPException, Response, status
|
|
from pydantic import BaseModel, Field
|
|
from sqlalchemy import select, update
|
|
from sqlalchemy.dialects.postgresql import insert as pg_insert
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from core.auth import get_current_user, require_worker_user
|
|
from core.database import get_session
|
|
from models.worker_pool import WorkerCapability, WorkerHeartbeat, WorkerJob
|
|
from services.worker_recap_context import fetch_recap_context
|
|
|
|
# PR-Worker-Pool-Registry-1C — payload size guard that rejects oversized recap contexts.
# If the recap context serialized as JSON (UTF-8 bytes) exceeds this limit,
# POST /jobs/recap answers 413 and enqueues nothing.
# A 7-day memo/event bundle is typically < 30KB.
PAYLOAD_MAX_BYTES = 100_000

# Router collecting the worker endpoints defined in this module.
router = APIRouter()
|
|
|
|
|
|
# ─── Pydantic schemas ───
|
|
|
|
|
|
class WorkerRegisterRequest(BaseModel):
    """Body of POST /register: identity and capability description of a worker."""

    # Worker identifier; register() uses it as the UPSERT conflict key.
    worker_id: str
    device_label: str
    worker_class: str
    tier: str
    # Capability tags and loaded model names; each defaults to a fresh empty list.
    capabilities: list[str] = Field(default_factory=list)
    models_loaded: list[str] = Field(default_factory=list)
    # Optional endpoint reported by the worker (None when omitted).
    endpoint: str | None = None
|
|
|
|
|
|
class WorkerHeartbeatRequest(BaseModel):
    """Body of POST /heartbeat; every request becomes one WorkerHeartbeat row."""

    worker_id: str
    # Free-form status; expected values are starting/available/busy/draining
    # (not validated by this model).
    status: str
    current_job_id: int | None = None
    battery: str | None = None
    thermal: str | None = None
    # Extra fields copied verbatim into WorkerHeartbeat.raw_payload.
    raw_payload: dict[str, Any] = Field(default_factory=dict)
|
|
|
|
|
|
class WorkerClaimRequest(BaseModel):
    """Body of POST /claim: the requesting worker and the job type it wants."""

    # Written to the claimed job's worker_id.
    worker_id: str
    # Only pending jobs whose job_type equals this value are eligible.
    job_type: str
|
|
|
|
|
|
class WorkerClaimResponse(BaseModel):
    """200 body of POST /claim describing the job that was just claimed."""

    id: int
    job_type: str
    # The job payload exactly as stored on the WorkerJob row.
    payload: dict[str, Any]
    # Attempt counter after /claim incremented it for this claim.
    attempts: int
|
|
|
|
|
|
class WorkerResultRequest(BaseModel):
    """Body of POST /result: the outcome reported by the worker holding a job."""

    job_id: int
    # Fix #2 — must equal the job's current worker_id (ownership check).
    worker_id: str
    # 'completed' or 'failed'; anything else is rejected with 400 by result().
    status: str
    # Persisted only for status == 'completed'; ignored when the job failed.
    result: dict[str, Any] | None = None
    # Persisted for a failure; cleared when the job completes.
    error_message: str | None = None
|
|
|
|
|
|
class WorkerDrainRequest(BaseModel):
    """Body of POST /drain (advisory only)."""

    worker_id: str
    # Optional free-text reason; copied into the heartbeat raw_payload when non-empty.
    reason: str | None = None
|
|
|
|
|
|
# ─── 엔드포인트 ───
|
|
|
|
|
|
@router.post("/register")
async def register(
    body: WorkerRegisterRequest,
    user: Annotated[Any, Depends(require_worker_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
):
    """UPSERT a worker_capabilities row keyed on worker_id (register or refresh capabilities).

    A new row is owned by the authenticated worker user. On conflict only the
    descriptive columns and last_registered_at are overwritten; user_id and
    created_at keep their original values.
    """
    registered_at = datetime.now(timezone.utc)

    # Columns that both the initial INSERT and the conflict UPDATE write.
    descriptive = {
        "device_label": body.device_label,
        "worker_class": body.worker_class,
        "tier": body.tier,
        "capabilities": body.capabilities,
        "models_loaded": body.models_loaded,
        "endpoint": body.endpoint,
    }

    upsert = (
        pg_insert(WorkerCapability)
        .values(
            worker_id=body.worker_id,
            user_id=user.id,
            created_at=registered_at,
            last_registered_at=registered_at,
            **descriptive,
        )
        .on_conflict_do_update(
            index_elements=["worker_id"],
            set_={**descriptive, "last_registered_at": registered_at},
        )
    )
    await session.execute(upsert)
    await session.commit()
    return {"ok": True, "worker_id": body.worker_id}
|
|
|
|
|
|
@router.post("/heartbeat")
async def heartbeat(
    body: WorkerHeartbeatRequest,
    user: Annotated[Any, Depends(require_worker_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
):
    """Append one worker_heartbeats row and acknowledge it.

    Enforces inv 5: no liveness decision is made here. The endpoint only stores
    the row and returns ok.
    """
    session.add(
        WorkerHeartbeat(
            worker_id=body.worker_id,
            status=body.status,
            current_job_id=body.current_job_id,
            battery=body.battery,
            thermal=body.thermal,
            raw_payload=body.raw_payload,
        )
    )
    await session.commit()
    return {"ok": True}
|
|
|
|
|
|
@router.post(
    "/claim",
    responses={
        200: {"model": WorkerClaimResponse},
        204: {"description": "queue empty"},
    },
)
async def claim(
    body: WorkerClaimRequest,
    user: Annotated[Any, Depends(require_worker_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
):
    """Claim the oldest pending job of ``body.job_type`` for ``body.worker_id``.

    The row is picked with SELECT ... FOR UPDATE SKIP LOCKED, so rows already
    locked by a concurrent claimer are skipped rather than waited on.
    Fix #4: an empty queue yields a bare 204 with no body, instead of a nullable
    response model.
    """
    claimed_at = datetime.now(timezone.utc)
    oldest_pending = (
        select(WorkerJob)
        .where(WorkerJob.status == "pending", WorkerJob.job_type == body.job_type)
        .order_by(WorkerJob.created_at)
        .limit(1)
        .with_for_update(skip_locked=True)
    )
    job = (await session.execute(oldest_pending)).scalar_one_or_none()

    if job is not None:
        job.status = "processing"
        job.worker_id = body.worker_id
        job.claimed_at = claimed_at
        job.attempts += 1
        await session.commit()
        return WorkerClaimResponse(
            id=job.id,
            job_type=job.job_type,
            payload=job.payload,
            attempts=job.attempts,
        )

    # Nothing claimable: end the FOR UPDATE transaction before answering 204.
    await session.commit()
    return Response(status_code=204)
|
|
|
|
|
|
@router.post("/result")
async def result(
    body: WorkerResultRequest,
    user: Annotated[Any, Depends(require_worker_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
):
    """Record a job outcome from the worker that holds it (fix #2 ownership, fix #3 retry).

    Ownership: the job must match ``id``, ``worker_id`` and ``status='processing'``,
    otherwise 404. The row is read with SELECT ... FOR UPDATE, so concurrent or
    retried submissions for the same job are serialized. Under PostgreSQL's
    READ COMMITTED, the second caller re-checks the WHERE clause after the first
    commits, no longer matches ``processing``, and gets 404 instead of
    overwriting the first outcome.

    Outcomes:
        completed: status='completed', result stored as raw JSONB (inv 3),
            completed_at set, error_message cleared.
        failed: error_message stored; ``body.result`` is never written.
            attempts < max_attempts  -> back to 'pending' with worker_id,
                claimed_at and completed_at cleared.
            attempts >= max_attempts -> final 'failed' with completed_at set.

    Raises:
        HTTPException(400): status is neither 'completed' nor 'failed'.
        HTTPException(404): no processing job with this id is owned by this worker.
    """
    if body.status not in ("completed", "failed"):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="status must be 'completed' or 'failed'",
        )

    stmt = (
        select(WorkerJob)
        .where(
            WorkerJob.id == body.job_id,
            WorkerJob.worker_id == body.worker_id,
            WorkerJob.status == "processing",
        )
        # Hold the row lock until commit so two /result calls for the same job
        # cannot both pass the ownership check (lost update / completed job
        # flipped back to pending).
        .with_for_update()
    )
    res = await session.execute(stmt)
    job = res.scalar_one_or_none()
    if job is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="job not found or not owned by this worker (or not in processing)",
        )

    now = datetime.now(timezone.utc)
    if body.status == "completed":
        job.status = "completed"
        job.result = body.result  # raw JSONB (inv 3 — canonical promote 0)
        job.completed_at = now
        job.error_message = None
    else:  # failed
        job.error_message = body.error_message
        # Fix #3 policy: never touch the result column (body.result is ignored),
        # so partial results from a failed run are not stored.
        if job.attempts < job.max_attempts:
            # Re-queue: clear claim bookkeeping so any worker can claim it again.
            job.status = "pending"
            job.worker_id = None
            job.claimed_at = None
            job.completed_at = None
        else:
            job.status = "failed"
            job.completed_at = now

    await session.commit()
    return {"ok": True, "status": job.status, "attempts": job.attempts}
|
|
|
|
|
|
class JobsRecapRequest(BaseModel):
    """Body of POST /jobs/recap."""

    # Look-back window in days passed to fetch_recap_context (1..30, default 7).
    days: int = Field(7, ge=1, le=30)
|
|
|
|
|
|
class JobsRecapResponse(BaseModel):
    """Response of POST /jobs/recap describing the job that was enqueued."""

    # id of the newly inserted worker_jobs row.
    job_id: int
    # Counts read from the assembled recap context.
    memo_count: int
    event_count: int
    # Size of the context as UTF-8 JSON bytes (compared against PAYLOAD_MAX_BYTES).
    payload_bytes: int
|
|
|
|
|
|
@router.post("/jobs/recap", response_model=JobsRecapResponse)
async def enqueue_recap(
    body: JobsRecapRequest,
    user: Annotated[Any, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
):
    """PR-Worker-Pool-Registry-1C — assemble the caller's recap context and enqueue a 'recap' job.

    Authenticated with a regular user JWT (get_current_user, not
    require_worker_user). The context is fetched with ``user_id=user.id``.
    If the context serialized as UTF-8 JSON exceeds PAYLOAD_MAX_BYTES, nothing
    is enqueued and 413 is returned.
    """
    context = await fetch_recap_context(session, user_id=user.id, days=body.days)

    # Size guard measured on the UTF-8 bytes of the JSON (non-ASCII kept unescaped).
    size = len(json.dumps(context, ensure_ascii=False).encode("utf-8"))
    if size > PAYLOAD_MAX_BYTES:
        raise HTTPException(
            status_code=status.HTTP_413_REQUEST_ENTITY_TOO_LARGE,
            detail=(
                f"recap context payload {size} bytes > {PAYLOAD_MAX_BYTES} bytes. "
                "days 를 줄여 재시도하거나 운영자에게 cap 조정 요청."
            ),
        )

    job = WorkerJob(user_id=user.id, job_type="recap", payload=context)
    session.add(job)
    await session.commit()
    # Re-read the committed row so attributes such as id are loaded on the instance.
    await session.refresh(job)

    return JobsRecapResponse(
        job_id=job.id,
        memo_count=context["memo_count"],
        event_count=context["event_count"],
        payload_bytes=size,
    )
|
|
|
|
|
|
@router.post("/drain")
async def drain(
    body: WorkerDrainRequest,
    user: Annotated[Any, Depends(require_worker_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
):
    """Record a drain request as a heartbeat with status='draining' (advisory/audit only, inv 2).

    Nothing here prevents /claim from giving this worker more jobs; refusing
    claims is left to Notebook-Pilot-1.
    """
    raw_payload: dict[str, Any] = {"reason": body.reason} if body.reason else {}
    session.add(
        WorkerHeartbeat(
            worker_id=body.worker_id,
            status="draining",
            raw_payload=raw_payload,
        )
    )
    await session.commit()
    return {"ok": True}
|