diff --git a/app/api/internal_worker.py b/app/api/internal_worker.py index a6ec2f6..6670645 100644 --- a/app/api/internal_worker.py +++ b/app/api/internal_worker.py @@ -15,6 +15,7 @@ worker-pool-policy §B.2 invariant 매핑: """ import json +import os from datetime import datetime, timezone from typing import Annotated, Any @@ -30,8 +31,11 @@ from models.worker_pool import WorkerCapability, WorkerHeartbeat, WorkerJob from services.worker_recap_context import fetch_recap_context # PR-Worker-Pool-Registry-1C — payload size guard (recap context 가 큰 경우 차단). -# JSON 직렬화 후 100KB 초과 → 413. memo/event 7d 묶음이 통상 < 30KB. -PAYLOAD_MAX_BYTES = 100_000 +# 사용자 결정 2026-05-19: cap 1MB 상향 + fetch_recap_context deterministic compaction +# (top-N memo + daily/kind aggregate). 운영 7d 데이터 ~1.36MB → 100KB 부족 → 1MB. +# 운영 조정용 env override = `WORKER_RECAP_PAYLOAD_MAX_BYTES`. +def _payload_max_bytes() -> int: + return int(os.getenv("WORKER_RECAP_PAYLOAD_MAX_BYTES", "1000000")) router = APIRouter() @@ -254,6 +258,8 @@ class JobsRecapResponse(BaseModel): memo_count: int event_count: int payload_bytes: int + payload_compacted: bool + omitted_memos: int @router.post("/jobs/recap", response_model=JobsRecapResponse) @@ -269,12 +275,14 @@ async def enqueue_recap( """ context = await fetch_recap_context(session, user_id=user.id, days=body.days) payload_bytes = len(json.dumps(context, ensure_ascii=False).encode("utf-8")) - if payload_bytes > PAYLOAD_MAX_BYTES: + cap = _payload_max_bytes() + if payload_bytes > cap: raise HTTPException( status_code=status.HTTP_413_REQUEST_ENTITY_TOO_LARGE, detail=( - f"recap context payload {payload_bytes} bytes > {PAYLOAD_MAX_BYTES} bytes. " - "days 를 줄여 재시도하거나 운영자에게 cap 조정 요청." + f"recap context payload {payload_bytes} bytes > {cap} bytes (after compaction). " + f"days 를 줄여 재시도 (현재 {body.days}d) 또는 운영자에게 RECAP_MEMO_TOP_N / " + "WORKER_RECAP_PAYLOAD_MAX_BYTES 조정 요청." ), ) @@ -291,6 +299,8 @@ async def enqueue_recap( memo_count=context["memo_count"], event_count=context["event_count"], payload_bytes=payload_bytes, + payload_compacted=context["payload_compacted"], + omitted_memos=context["summary_stats"]["omitted_memos"], ) diff --git a/app/services/worker_recap_context.py b/app/services/worker_recap_context.py index 975b180..2d92e5d 100644 --- a/app/services/worker_recap_context.py +++ b/app/services/worker_recap_context.py @@ -1,16 +1,28 @@ """PR-Worker-Pool-Registry-1C — recap context 조립 service. memo/event 7d recap context 를 user_id 기준으로 묶어 worker_jobs.payload 로 사용. -회귀 위험 0 보장: + +회귀 위험 0: - 새 service 모듈에 격리. memos/events API select 절 touch 0. - read-only 쿼리만. INSERT/UPDATE 0. + +policy: - documents 는 single-user invariant (user_id 컬럼 부재) — file_type='note' 7d 전체. - events 는 user_id 매칭 + cancelled 제외. - timezone = Asia/Seoul ([[project_voice_memo_pipeline]] + events DEFAULT_TIMEZONE 일관). + +Deterministic compaction (cap 정책 후속, 사용자 결정 2026-05-19): + - memo payload item field 축소 = `id`, `title`, `ai_tldr`, `ai_event_kind`, `created_at` (5 필드만) + - memo top-N = `RECAP_MEMO_TOP_N` (default 200) — 초과분은 aggregate 로 대체 + - aggregate = `memos_by_day` + `memos_by_kind` + `omitted_memos` + - `payload_compacted` flag = aggregate fallback 발현 여부 + - events 는 raw 그대로 (7d 운영 데이터에서 통상 작음) """ from __future__ import annotations +import os +from collections import Counter, defaultdict from datetime import datetime, timedelta, timezone from typing import Any from zoneinfo import ZoneInfo @@ -25,17 +37,18 @@ DEFAULT_TIMEZONE = "Asia/Seoul" KST = ZoneInfo(DEFAULT_TIMEZONE) -def _memo_to_payload_item(doc: Document) -> dict[str, Any]: +def _memo_top_n() -> int: + return int(os.getenv("RECAP_MEMO_TOP_N", "200")) + + +def _compact_memo(doc: Document) -> dict[str, Any]: + # 5 필드만 — ai_bullets / file_type / source_channel / category / extracted_text 등 제외. return { "id": doc.id, "title": doc.title, "ai_tldr": doc.ai_tldr, - "ai_bullets": doc.ai_bullets, "ai_event_kind": doc.ai_event_kind, - "category": doc.category, "created_at": doc.created_at.astimezone(KST).isoformat() if doc.created_at else None, - "file_type": doc.file_type, - "source_channel": doc.source_channel, } @@ -54,19 +67,32 @@ def _event_to_payload_item(ev: Event) -> dict[str, Any]: } +def _aggregate_memos(docs: list[Document]) -> dict[str, Any]: + by_day: dict[str, int] = defaultdict(int) + by_kind: Counter[str] = Counter() + for d in docs: + if d.created_at is not None: + day = d.created_at.astimezone(KST).date().isoformat() + by_day[day] += 1 + by_kind[d.ai_event_kind or "_unknown"] += 1 + # by_day → 날짜 정렬, by_kind → 빈도 정렬 + return { + "memos_by_day": dict(sorted(by_day.items())), + "memos_by_kind": dict(by_kind.most_common()), + } + + async def fetch_recap_context( session: AsyncSession, user_id: int, days: int = 7 ) -> dict[str, Any]: - """user_id 의 최근 N 일 memo + event 묶음 반환. + """user_id 의 최근 N 일 memo + event recap context. - memos: documents WHERE file_type='note' AND deleted_at IS NULL AND created_at >= cutoff - (single-user invariant — documents.user_id 부재). - events: events WHERE user_id=:user_id AND cancelled_at IS NULL AND updated_at >= cutoff. - - archived 메모 제외 (recap 노이즈 감소). audio voice 메모는 file_type='immutable' 라 자동 제외. + memo > top_n 일 때 deterministic compaction (최근 top_n full compact + 나머지 aggregate). + payload_compacted flag 로 fallback 발현 여부 노출. """ now = datetime.now(timezone.utc) cutoff = now - timedelta(days=days) + top_n = _memo_top_n() memo_res = await session.execute( select(Document) @@ -78,7 +104,21 @@ async def fetch_recap_context( ) .order_by(Document.created_at.desc()) ) - memos = [_memo_to_payload_item(d) for d in memo_res.scalars().all()] + all_memos = memo_res.scalars().all() + total_memos = len(all_memos) + + if total_memos > top_n: + kept = all_memos[:top_n] + omitted = all_memos[top_n:] + memos_payload = [_compact_memo(d) for d in kept] + aggregate = _aggregate_memos(omitted) + omitted_count = len(omitted) + payload_compacted = True + else: + memos_payload = [_compact_memo(d) for d in all_memos] + aggregate = {"memos_by_day": {}, "memos_by_kind": {}} + omitted_count = 0 + payload_compacted = False event_res = await session.execute( select(Event) @@ -97,8 +137,16 @@ async def fetch_recap_context( "period_start": cutoff.astimezone(KST).isoformat(), "period_end": now.astimezone(KST).isoformat(), "timezone": DEFAULT_TIMEZONE, - "memos": memos, + "memos": memos_payload, "events": events, - "memo_count": len(memos), + "memo_count": total_memos, "event_count": len(events), + "summary_stats": { + "total_memos": total_memos, + "memos_kept": len(memos_payload), + "omitted_memos": omitted_count, + "top_n": top_n, + **aggregate, + }, + "payload_compacted": payload_compacted, } diff --git a/tests/test_worker_recap_compaction.py b/tests/test_worker_recap_compaction.py new file mode 100644 index 0000000..f1af3e7 --- /dev/null +++ b/tests/test_worker_recap_compaction.py @@ -0,0 +1,202 @@ +"""PR-Worker-Pool-Registry-1C compaction — deterministic top-N + aggregate. + +3 항목 (사용자 결정 2026-05-19 spec): + 1. 700+ memo → /jobs/recap 200 + payload_size < 1MB + payload_compacted=true + omitted > 0 + 2. 작은 fixture (10 memos) → compacted=false + omitted=0 + 3. 거대 title (1MB 초과 시뮬레이션) → 413 유지 + +monkeypatch 로 fetch_recap_context 또는 _memo_top_n 조정. +""" + +from __future__ import annotations + +import json +import os +import sys + +import pytest +import pytest_asyncio + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "app")) + +from httpx import ASGITransport, AsyncClient + +from _worker_pool_helpers import ( + cleanup_worker_jobs, + ensure_user, + fetch_worker_job, + mint_access_token, +) + + +@pytest_asyncio.fixture +async def env_setup(monkeypatch): + monkeypatch.setenv("LAPTOP_WORKER_BOT_USERNAME", "laptop-worker-bot") + + +def _make_fake_context(total_memos: int, top_n: int = 200) -> dict: + """compaction 모사 — _compact_memo 형식 (5 필드) 의 row.""" + from datetime import datetime, timezone + from zoneinfo import ZoneInfo + + kst = ZoneInfo("Asia/Seoul") + now = datetime.now(timezone.utc) + kept_count = min(total_memos, top_n) + omitted = max(0, total_memos - top_n) + memos = [ + { + "id": i, + "title": f"memo {i}", + "ai_tldr": f"tldr {i}", + "ai_event_kind": "note", + "created_at": now.astimezone(kst).isoformat(), + } + for i in range(kept_count) + ] + return { + "user_id": 1, + "days": 7, + "period_start": now.astimezone(kst).isoformat(), + "period_end": now.astimezone(kst).isoformat(), + "timezone": "Asia/Seoul", + "memos": memos, + "events": [], + "memo_count": total_memos, + "event_count": 0, + "summary_stats": { + "total_memos": total_memos, + "memos_kept": kept_count, + "omitted_memos": omitted, + "top_n": top_n, + "memos_by_day": {"2026-05-19": omitted} if omitted else {}, + "memos_by_kind": {"note": omitted} if omitted else {}, + }, + "payload_compacted": omitted > 0, + } + + +@pytest.mark.asyncio +async def test_compaction_kicks_in_at_top_n(env_setup, monkeypatch): + """700 memo → 200 kept + 500 omitted + payload_compacted=true + payload < 1MB.""" + from api import internal_worker as iw_mod + from main import app + + fake = _make_fake_context(total_memos=700, top_n=200) + + async def fake_fetch(session, user_id, days=7): + # user_id 채워 줌 + fake["user_id"] = user_id + return fake + + monkeypatch.setattr(iw_mod, "fetch_recap_context", fake_fetch) + await ensure_user("test-recap-compact-1c") + token = mint_access_token("test-recap-compact-1c") + + try: + async with AsyncClient( + transport=ASGITransport(app=app), base_url="http://test" + ) as c: + r = await c.post( + "/internal/worker/jobs/recap", + json={"days": 7}, + headers={"Authorization": f"Bearer {token}"}, + ) + assert r.status_code == 200, r.text + js = r.json() + assert js["memo_count"] == 700 + assert js["payload_compacted"] is True + assert js["omitted_memos"] == 500 + assert js["payload_bytes"] < 1_000_000 + # DB verify — payload 안에 compacted memos + summary_stats + job = await fetch_worker_job(js["job_id"]) + assert job is not None + finally: + await cleanup_worker_jobs("recap") + + +@pytest.mark.asyncio +async def test_no_compaction_when_under_top_n(env_setup, monkeypatch): + """10 memo → kept 10 + omitted 0 + payload_compacted=false.""" + from api import internal_worker as iw_mod + from main import app + + fake = _make_fake_context(total_memos=10, top_n=200) + + async def fake_fetch(session, user_id, days=7): + fake["user_id"] = user_id + return fake + + monkeypatch.setattr(iw_mod, "fetch_recap_context", fake_fetch) + await ensure_user("test-recap-uncompact-1c") + token = mint_access_token("test-recap-uncompact-1c") + + try: + async with AsyncClient( + transport=ASGITransport(app=app), base_url="http://test" + ) as c: + r = await c.post( + "/internal/worker/jobs/recap", + json={"days": 7}, + headers={"Authorization": f"Bearer {token}"}, + ) + assert r.status_code == 200, r.text + js = r.json() + assert js["memo_count"] == 10 + assert js["payload_compacted"] is False + assert js["omitted_memos"] == 0 + finally: + await cleanup_worker_jobs("recap") + + +@pytest.mark.asyncio +async def test_413_when_compacted_still_over_cap(env_setup, monkeypatch): + """비정상 큰 title (compaction 후에도 1MB 초과) → 413 유지.""" + from api import internal_worker as iw_mod + from main import app + + async def fake_fetch(session, user_id, days=7): + huge_title = "x" * 6000 # per memo + return { + "user_id": user_id, + "days": days, + "period_start": "2026-05-12T00:00:00+09:00", + "period_end": "2026-05-19T00:00:00+09:00", + "timezone": "Asia/Seoul", + "memos": [ + { + "id": i, + "title": huge_title, + "ai_tldr": None, + "ai_event_kind": None, + "created_at": "2026-05-19T00:00:00+09:00", + } + for i in range(200) + ], # ~1.2MB + "events": [], + "memo_count": 200, + "event_count": 0, + "summary_stats": { + "total_memos": 200, + "memos_kept": 200, + "omitted_memos": 0, + "top_n": 200, + "memos_by_day": {}, + "memos_by_kind": {}, + }, + "payload_compacted": False, + } + + monkeypatch.setattr(iw_mod, "fetch_recap_context", fake_fetch) + await ensure_user("test-recap-413-large-1c") + token = mint_access_token("test-recap-413-large-1c") + + async with AsyncClient( + transport=ASGITransport(app=app), base_url="http://test" + ) as c: + r = await c.post( + "/internal/worker/jobs/recap", + json={"days": 7}, + headers={"Authorization": f"Bearer {token}"}, + ) + assert r.status_code == 413, r.text + assert "after compaction" in r.json()["detail"] diff --git a/tests/test_worker_recap_endpoint.py b/tests/test_worker_recap_endpoint.py index 3117308..0097033 100644 --- a/tests/test_worker_recap_endpoint.py +++ b/tests/test_worker_recap_endpoint.py @@ -83,6 +83,8 @@ async def test_recap_endpoint_creates_worker_job(env_setup): assert js["memo_count"] >= 0 assert js["event_count"] >= 0 assert js["payload_bytes"] > 0 + assert "payload_compacted" in js + assert "omitted_memos" in js # DB verify job = await fetch_worker_job(js["job_id"]) assert job is not None @@ -93,7 +95,7 @@ async def test_recap_endpoint_creates_worker_job(env_setup): @pytest.mark.asyncio async def test_recap_payload_413_when_oversize(env_setup, monkeypatch): - """payload 100KB 초과 시 413.""" + """payload 1MB 초과 시 413 (사용자 결정 2026-05-19 cap 1MB).""" from api import internal_worker as iw_mod from main import app @@ -107,10 +109,20 @@ async def test_recap_payload_413_when_oversize(env_setup, monkeypatch): "period_start": "2026-05-12T00:00:00+09:00", "period_end": "2026-05-19T00:00:00+09:00", "timezone": "Asia/Seoul", - "memos": [{"id": i, "title": "x" * 1000} for i in range(120)], # ~120KB + # ~1.2MB raw payload (compaction 후에도 cap 초과 가정) + "memos": [{"id": i, "title": "x" * 6000} for i in range(200)], "events": [], - "memo_count": 120, + "memo_count": 200, "event_count": 0, + "summary_stats": { + "total_memos": 200, + "memos_kept": 200, + "omitted_memos": 0, + "top_n": 200, + "memos_by_day": {}, + "memos_by_kind": {}, + }, + "payload_compacted": False, } monkeypatch.setattr(iw_mod, "fetch_recap_context", fake_fetch) @@ -125,3 +137,4 @@ async def test_recap_payload_413_when_oversize(env_setup, monkeypatch): ) assert r.status_code == 413, r.text assert "bytes" in r.json()["detail"] + assert "after compaction" in r.json()["detail"]