From 0ea72c1aa677eb9c6af3e379840b921ae8427ec3 Mon Sep 17 00:00:00 2001 From: Hyungi Ahn Date: Tue, 19 May 2026 12:44:07 +0900 Subject: [PATCH] feat(worker-pool): Registry-1C recap context + /jobs/recap + 100KB guard MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - app/services/worker_recap_context.py — fetch_recap_context(user_id, days) documents file_type='note' 7d (single-user invariant) + events 7d (user_id 매칭 + cancelled 제외) JOIN. timezone Asia/Seoul. - /internal/worker/jobs/recap POST — 일반 user JWT 인증 + context 조립 + worker_jobs INSERT. job_type='recap' + payload JSONB. - payload 100KB guard — JSON 직렬화 100_000 bytes 초과 시 413. - 회귀 위험 0: memos/events API select 절 touch 0, read-only 쿼리만. worker-pool-policy §B.2 invariant 보존: ProcessingQueue 무변경, 운영 자동 분기 변경 0, canonical promote 0 (worker_jobs.payload JSONB only). Notebook-Pilot-1 entry condition 4항목 모두 충족 가능: manual recap E2E / payload <100KB guard / residue 0 / 권한 분리 403. Co-Authored-By: Claude Opus 4.7 (1M context) --- app/api/internal_worker.py | 59 ++++++++++++- app/services/worker_recap_context.py | 104 ++++++++++++++++++++++ tests/test_worker_recap_endpoint.py | 127 +++++++++++++++++++++++++++ 3 files changed, 288 insertions(+), 2 deletions(-) create mode 100644 app/services/worker_recap_context.py create mode 100644 tests/test_worker_recap_endpoint.py diff --git a/app/api/internal_worker.py b/app/api/internal_worker.py index b5c6b56..a6ec2f6 100644 --- a/app/api/internal_worker.py +++ b/app/api/internal_worker.py @@ -14,18 +14,24 @@ worker-pool-policy §B.2 invariant 매핑: - #5: mig 275 status CHECK ('pending','processing','completed','failed'). 
""" +import json from datetime import datetime, timezone from typing import Annotated, Any from fastapi import APIRouter, Depends, HTTPException, Response, status -from pydantic import BaseModel +from pydantic import BaseModel, Field from sqlalchemy import select, update from sqlalchemy.dialects.postgresql import insert as pg_insert from sqlalchemy.ext.asyncio import AsyncSession -from core.auth import require_worker_user +from core.auth import get_current_user, require_worker_user from core.database import get_session from models.worker_pool import WorkerCapability, WorkerHeartbeat, WorkerJob +from services.worker_recap_context import fetch_recap_context + +# PR-Worker-Pool-Registry-1C — payload size guard (recap context 가 큰 경우 차단). +# JSON 직렬화 후 100KB 초과 → 413. memo/event 7d 묶음이 통상 < 30KB. +PAYLOAD_MAX_BYTES = 100_000 router = APIRouter() @@ -239,6 +245,55 @@ async def result( return {"ok": True, "status": job.status, "attempts": job.attempts} +class JobsRecapRequest(BaseModel): + days: int = Field(default=7, ge=1, le=30) + + +class JobsRecapResponse(BaseModel): + job_id: int + memo_count: int + event_count: int + payload_bytes: int + + +@router.post("/jobs/recap", response_model=JobsRecapResponse) +async def enqueue_recap( + body: JobsRecapRequest, + user: Annotated[Any, Depends(get_current_user)], + session: Annotated[AsyncSession, Depends(get_session)], +): + """PR-Worker-Pool-Registry-1C — recap context 조립 + worker_jobs INSERT. + + 인증 = 일반 user JWT (require_worker_user 아님). Events are filtered by the caller's user_id; note memos are NOT user-scoped — fetch_recap_context returns every file_type='note' document in the window (documents has no user_id column, single-user invariant). + payload size guard = JSON 직렬화 100KB 초과 시 413 (정정 #4 정신, recap-specific). + """ + context = await fetch_recap_context(session, user_id=user.id, days=body.days) + payload_bytes = len(json.dumps(context, ensure_ascii=False).encode("utf-8")) + if payload_bytes > PAYLOAD_MAX_BYTES: + raise HTTPException( + status_code=status.HTTP_413_REQUEST_ENTITY_TOO_LARGE, + detail=( + f"recap context payload {payload_bytes} bytes > {PAYLOAD_MAX_BYTES} bytes. 
" + "days 를 줄여 재시도하거나 운영자에게 cap 조정 요청." + ), + ) + + job = WorkerJob( + user_id=user.id, + job_type="recap", + payload=context, + ) + session.add(job) + await session.commit() + await session.refresh(job) + return JobsRecapResponse( + job_id=job.id, + memo_count=context["memo_count"], + event_count=context["event_count"], + payload_bytes=payload_bytes, + ) + + @router.post("/drain") async def drain( body: WorkerDrainRequest, diff --git a/app/services/worker_recap_context.py b/app/services/worker_recap_context.py new file mode 100644 index 0000000..975b180 --- /dev/null +++ b/app/services/worker_recap_context.py @@ -0,0 +1,104 @@ +"""PR-Worker-Pool-Registry-1C — recap context 조립 service. + +memo/event 7d recap context 를 user_id 기준으로 묶어 worker_jobs.payload 로 사용. +회귀 위험 0 보장: + - 새 service 모듈에 격리. memos/events API select 절 touch 0. + - read-only 쿼리만. INSERT/UPDATE 0. + - documents 는 single-user invariant (user_id 컬럼 부재) — file_type='note' 7d 전체. + - events 는 user_id 매칭 + cancelled 제외. + - timezone = Asia/Seoul ([[project_voice_memo_pipeline]] + events DEFAULT_TIMEZONE 일관). 
+""" + +from __future__ import annotations + +from datetime import datetime, timedelta, timezone +from typing import Any +from zoneinfo import ZoneInfo + +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from models.document import Document +from models.event import Event + +DEFAULT_TIMEZONE = "Asia/Seoul" +KST = ZoneInfo(DEFAULT_TIMEZONE) + + +def _memo_to_payload_item(doc: Document) -> dict[str, Any]: + return { + "id": doc.id, + "title": doc.title, + "ai_tldr": doc.ai_tldr, + "ai_bullets": doc.ai_bullets, + "ai_event_kind": doc.ai_event_kind, + "category": doc.category, + "created_at": doc.created_at.astimezone(KST).isoformat() if doc.created_at else None, + "file_type": doc.file_type, + "source_channel": doc.source_channel, + } + + +def _event_to_payload_item(ev: Event) -> dict[str, Any]: + return { + "id": ev.id, + "title": ev.title, + "description": ev.description, + "kind": ev.kind, + "status": ev.status, + "due_at": ev.due_at.astimezone(KST).isoformat() if ev.due_at else None, + "completed_at": ev.completed_at.astimezone(KST).isoformat() if ev.completed_at else None, + "tags": ev.tags, + "project_tag": ev.project_tag, + "updated_at": ev.updated_at.astimezone(KST).isoformat() if ev.updated_at else None, + } + + +async def fetch_recap_context( + session: AsyncSession, user_id: int, days: int = 7 +) -> dict[str, Any]: + """user_id 의 최근 N 일 memo + event 묶음 반환. + + memos: documents WHERE file_type='note' AND deleted_at IS NULL AND created_at >= cutoff + (single-user invariant — documents.user_id 부재). + events: events WHERE user_id=:user_id AND cancelled_at IS NULL AND updated_at >= cutoff. + + archived 메모 제외 (recap 노이즈 감소). audio voice 메모는 file_type='immutable' 라 자동 제외. 
+ """ + now = datetime.now(timezone.utc) + cutoff = now - timedelta(days=days) + + memo_res = await session.execute( + select(Document) + .where( + Document.file_type == "note", + Document.deleted_at.is_(None), + Document.archived.is_(False), + Document.created_at >= cutoff, + ) + .order_by(Document.created_at.desc()) + ) + memos = [_memo_to_payload_item(d) for d in memo_res.scalars().all()] + + event_res = await session.execute( + select(Event) + .where( + Event.user_id == user_id, + Event.cancelled_at.is_(None), + Event.updated_at >= cutoff, + ) + .order_by(Event.updated_at.desc()) + ) + events = [_event_to_payload_item(e) for e in event_res.scalars().all()] + + return { + "user_id": user_id, + "days": days, + "period_start": cutoff.astimezone(KST).isoformat(), + "period_end": now.astimezone(KST).isoformat(), + "timezone": DEFAULT_TIMEZONE, + "memos": memos, + "events": events, + "memo_count": len(memos), + "event_count": len(events), + } diff --git a/tests/test_worker_recap_endpoint.py b/tests/test_worker_recap_endpoint.py new file mode 100644 index 0000000..3117308 --- /dev/null +++ b/tests/test_worker_recap_endpoint.py @@ -0,0 +1,127 @@ +"""PR-Worker-Pool-Registry-1C — /internal/worker/jobs/recap + recap context. + +3 항목 (단독 실행 기준 PASS; 같은 파일 sequential 한계 = test fixture isolation follow-up): + 1. fetch_recap_context — empty user (memo/event 0) → memo_count=0 + event_count=0 + 타임존 표기 + 2. /jobs/recap endpoint — 일반 user JWT 로 호출 → 200 + worker_jobs INSERT + payload JSONB + 3. 
payload size guard — fetch_recap_context monkeypatch 로 거대 context 반환 → 413 +""" + +from __future__ import annotations + +import json +import os +import sys + +import pytest +import pytest_asyncio + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "app")) + +from httpx import ASGITransport, AsyncClient +from sqlalchemy import text +from sqlalchemy.ext.asyncio import async_sessionmaker + +from _worker_pool_helpers import ( + _make_engine, + cleanup_worker_jobs, + ensure_user, + fetch_worker_job, + get_database_url, + mint_access_token, +) + + +@pytest_asyncio.fixture +async def env_setup(monkeypatch): + monkeypatch.setenv("LAPTOP_WORKER_BOT_USERNAME", "laptop-worker-bot") + + +@pytest.mark.asyncio +async def test_fetch_recap_context_shape(env_setup): + """fetch_recap_context — signature, event_count == 0 for a fresh user, Asia/Seoul timezone label; memos are not user-scoped, so memo_count is only checked against len(memos).""" + from services.worker_recap_context import fetch_recap_context + + owner_id = await ensure_user("test-recap-empty-1c") + engine = _make_engine() + sm = async_sessionmaker(engine, expire_on_commit=False) + try: + async with sm() as session: + ctx = await fetch_recap_context(session, user_id=owner_id, days=7) + finally: + await engine.dispose() + + assert ctx["user_id"] == owner_id + assert ctx["days"] == 7 + assert ctx["timezone"] == "Asia/Seoul" + assert isinstance(ctx["memos"], list) + assert isinstance(ctx["events"], list) + assert ctx["memo_count"] == len(ctx["memos"]) + assert ctx["event_count"] == len(ctx["events"]) + # 신규 user 이므로 event 0 + assert ctx["event_count"] == 0 + + +@pytest.mark.asyncio +async def test_recap_endpoint_creates_worker_job(env_setup): + """/jobs/recap 호출 → 200 + worker_jobs INSERT.""" + from main import app + + owner_id = await ensure_user("test-recap-endpoint-1c") + token = mint_access_token("test-recap-endpoint-1c") + try: + async with AsyncClient( + transport=ASGITransport(app=app), base_url="http://test" + ) as c: + r = await c.post( + "/internal/worker/jobs/recap", + json={"days": 7}, + 
headers={"Authorization": f"Bearer {token}"}, + ) + assert r.status_code == 200, r.text + js = r.json() + assert "job_id" in js + assert js["memo_count"] >= 0 + assert js["event_count"] >= 0 + assert js["payload_bytes"] > 0 + # DB verify + job = await fetch_worker_job(js["job_id"]) + assert job is not None + # payload 는 JSONB dict 로 저장됨 + finally: + await cleanup_worker_jobs("recap") + + +@pytest.mark.asyncio +async def test_recap_payload_413_when_oversize(env_setup, monkeypatch): + """payload 100KB 초과 시 413.""" + from api import internal_worker as iw_mod + from main import app + + owner_id = await ensure_user("test-recap-413-1c") + token = mint_access_token("test-recap-413-1c") + + async def fake_fetch(session, user_id, days=7): + return { + "user_id": user_id, + "days": days, + "period_start": "2026-05-12T00:00:00+09:00", + "period_end": "2026-05-19T00:00:00+09:00", + "timezone": "Asia/Seoul", + "memos": [{"id": i, "title": "x" * 1000} for i in range(120)], # ~120KB + "events": [], + "memo_count": 120, + "event_count": 0, + } + + monkeypatch.setattr(iw_mod, "fetch_recap_context", fake_fetch) + + async with AsyncClient( + transport=ASGITransport(app=app), base_url="http://test" + ) as c: + r = await c.post( + "/internal/worker/jobs/recap", + json={"days": 7}, + headers={"Authorization": f"Bearer {token}"}, + ) + assert r.status_code == 413, r.text + assert "bytes" in r.json()["detail"]