hyungi_document_server/app/services/worker_recap_context.py
Hyungi Ahn eae1f48d62 feat(worker-pool): Registry-1C cap 1MB + deterministic compaction
User decision 2026-05-19: the 100KB cap is too small for the 1.36MB of 7d production data, and
raising the cap alone risks bloating the raw payload. So: cap 1MB + payload compaction together.

fetch_recap_context() changes:
- memo payload item fields reduced to id/title/ai_tldr/ai_event_kind/created_at (5 fields)
  (ai_bullets/file_type/source_channel/category/extracted_text etc. excluded)
- memo top-N = RECAP_MEMO_TOP_N env (default 200); memos beyond N are folded into the aggregate
- aggregate = memos_by_day + memos_by_kind + omitted_memos
- payload_compacted flag = whether the aggregate fallback was triggered
- events stay raw (typically zero to a handful in 7d of production data)
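A minimal, database-free sketch of the compaction rule above. It assumes memos arrive already sorted newest-first (as the `order_by(Document.created_at.desc())` query below provides) and uses a hypothetical `Memo` dataclass instead of the SQLAlchemy `Document` model; `compact_memos` is an illustrative name, not part of the codebase. A fixed UTC+9 offset stands in for `ZoneInfo("Asia/Seoul")`, which has no DST today.

```python
from collections import Counter
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Any, Optional

# Assumption: fixed UTC+9 offset in place of ZoneInfo("Asia/Seoul").
KST = timezone(timedelta(hours=9))


@dataclass
class Memo:
    """Hypothetical stand-in for a Document row, holding only the 5 kept fields."""

    id: int
    title: str
    ai_tldr: Optional[str]
    ai_event_kind: Optional[str]
    created_at: datetime  # timezone-aware


def compact_memos(memos: list[Memo], top_n: int = 200) -> dict[str, Any]:
    """Keep the newest top_n memos as 5-field items and fold the rest into counts.

    Assumes `memos` is already sorted newest-first.
    """
    kept, omitted = memos[:top_n], memos[top_n:]
    by_day = Counter(m.created_at.astimezone(KST).date().isoformat() for m in omitted)
    by_kind = Counter(m.ai_event_kind or "_unknown" for m in omitted)
    return {
        "memos": [
            {
                "id": m.id,
                "title": m.title,
                "ai_tldr": m.ai_tldr,
                "ai_event_kind": m.ai_event_kind,
                "created_at": m.created_at.astimezone(KST).isoformat(),
            }
            for m in kept
        ],
        "memos_by_day": dict(sorted(by_day.items())),  # chronological
        "memos_by_kind": dict(by_kind.most_common()),  # most frequent first
        "omitted_memos": len(omitted),
        "payload_compacted": bool(omitted),
    }
```

Because the cut is positional on an ordered list, the same ordered input always yields the same kept set and the same aggregate counts.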

internal_worker.py:
- PAYLOAD_MAX_BYTES → _payload_max_bytes() env override
  (WORKER_RECAP_PAYLOAD_MAX_BYTES default 1_000_000)
- expose payload_compacted / omitted_memos on JobsRecapResponse
- 413 detail states "after compaction" and suggests adjusting RECAP_MEMO_TOP_N

3 new tests + updated existing endpoint 413 test:
- 700 memos → 200 kept + 500 omitted + compacted=true + < 1MB
- 10 memos → compacted=false + omitted=0
- abnormally large title (still over the cap after compaction) → 413 retained
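The third case holds because compaction bounds how many memos are kept and which fields they carry, but never truncates a field's contents. A rough size estimate makes that concrete; `kept_payload_bytes` is a hypothetical helper, and measuring size as UTF-8 JSON bytes is an assumption about how the cap is checked.

```python
import json

CAP = 1_000_000  # default WORKER_RECAP_PAYLOAD_MAX_BYTES


def kept_payload_bytes(top_n: int, title_len: int) -> int:
    """Hypothetical: bytes for top_n compact memo items whose titles are title_len chars."""
    item = {
        "id": 0,
        "title": "x" * title_len,
        "ai_tldr": None,
        "ai_event_kind": None,
        "created_at": None,
    }
    return len(json.dumps({"memos": [item] * top_n}).encode("utf-8"))


# Ordinary titles: 200 compact items stay far below the cap.
normal = kept_payload_bytes(200, 80)
# A single oversized title passes through compaction untouched.
pathological = kept_payload_bytes(1, 2_000_000)
```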

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-19 12:55:51 +09:00


"""PR-Worker-Pool-Registry-1C: recap context assembly service.

Groups the 7-day memo/event recap context by user_id for use as worker_jobs.payload.

Regression risk: none.
- Isolated in a new service module; the memos/events API select clauses are untouched.
- Read-only queries only; no INSERT/UPDATE.

Policy:
- documents follow the single-user invariant (no user_id column), so all file_type='note'
  rows from the last 7 days are included.
- events are matched on user_id, excluding cancelled ones.
- timezone = Asia/Seoul (consistent with [[project_voice_memo_pipeline]] and events DEFAULT_TIMEZONE).

Deterministic compaction (follow-up to the cap policy, user decision 2026-05-19):
- memo payload item fields reduced to `id`, `title`, `ai_tldr`, `ai_event_kind`, `created_at` (these 5 only)
- memo top-N = `RECAP_MEMO_TOP_N` (default 200); memos beyond N are replaced by the aggregate
- aggregate = `memos_by_day` + `memos_by_kind` + `omitted_memos`
- `payload_compacted` flag = whether the aggregate fallback was triggered
- events are kept raw (usually small in 7 days of production data)
"""

from __future__ import annotations

import os
from collections import Counter, defaultdict
from datetime import datetime, timedelta, timezone
from typing import Any
from zoneinfo import ZoneInfo

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from models.document import Document
from models.event import Event

DEFAULT_TIMEZONE = "Asia/Seoul"
KST = ZoneInfo(DEFAULT_TIMEZONE)


def _memo_top_n() -> int:
    # Number of most recent memos kept as compact items; overridable via env.
    return int(os.getenv("RECAP_MEMO_TOP_N", "200"))


def _compact_memo(doc: Document) -> dict[str, Any]:
    # Only these 5 fields; ai_bullets / file_type / source_channel / category /
    # extracted_text etc. are excluded.
    return {
        "id": doc.id,
        "title": doc.title,
        "ai_tldr": doc.ai_tldr,
        "ai_event_kind": doc.ai_event_kind,
        "created_at": doc.created_at.astimezone(KST).isoformat() if doc.created_at else None,
    }


def _event_to_payload_item(ev: Event) -> dict[str, Any]:
    return {
        "id": ev.id,
        "title": ev.title,
        "description": ev.description,
        "kind": ev.kind,
        "status": ev.status,
        "due_at": ev.due_at.astimezone(KST).isoformat() if ev.due_at else None,
        "completed_at": ev.completed_at.astimezone(KST).isoformat() if ev.completed_at else None,
        "tags": ev.tags,
        "project_tag": ev.project_tag,
        "updated_at": ev.updated_at.astimezone(KST).isoformat() if ev.updated_at else None,
    }


def _aggregate_memos(docs: list[Document]) -> dict[str, Any]:
    by_day: dict[str, int] = defaultdict(int)
    by_kind: Counter[str] = Counter()
    for d in docs:
        if d.created_at is not None:
            day = d.created_at.astimezone(KST).date().isoformat()
            by_day[day] += 1
        by_kind[d.ai_event_kind or "_unknown"] += 1
    # by_day sorted by date; by_kind sorted by frequency (most common first)
    return {
        "memos_by_day": dict(sorted(by_day.items())),
        "memos_by_kind": dict(by_kind.most_common()),
    }


async def fetch_recap_context(
    session: AsyncSession, user_id: int, days: int = 7
) -> dict[str, Any]:
    """Recap context of the user's memos and events over the last `days` days.

    Memos are not filtered by user_id (documents are single-user; see the module
    docstring). When there are more than top_n memos, deterministic compaction
    applies: the newest top_n are kept as compact items and the rest are aggregated.
    The payload_compacted flag exposes whether that fallback was triggered.
    """
    now = datetime.now(timezone.utc)
    cutoff = now - timedelta(days=days)
    top_n = _memo_top_n()

    # Notes created in the window, newest first (this order decides which memos are kept).
    memo_res = await session.execute(
        select(Document)
        .where(
            Document.file_type == "note",
            Document.deleted_at.is_(None),
            Document.archived.is_(False),
            Document.created_at >= cutoff,
        )
        .order_by(Document.created_at.desc())
    )
    all_memos = memo_res.scalars().all()
    total_memos = len(all_memos)

    if total_memos > top_n:
        kept = all_memos[:top_n]
        omitted = all_memos[top_n:]
        memos_payload = [_compact_memo(d) for d in kept]
        aggregate = _aggregate_memos(omitted)
        omitted_count = len(omitted)
        payload_compacted = True
    else:
        memos_payload = [_compact_memo(d) for d in all_memos]
        aggregate = {"memos_by_day": {}, "memos_by_kind": {}}
        omitted_count = 0
        payload_compacted = False

    # This user's events updated in the window, excluding cancelled ones (kept raw).
    event_res = await session.execute(
        select(Event)
        .where(
            Event.user_id == user_id,
            Event.cancelled_at.is_(None),
            Event.updated_at >= cutoff,
        )
        .order_by(Event.updated_at.desc())
    )
    events = [_event_to_payload_item(e) for e in event_res.scalars().all()]

    return {
        "user_id": user_id,
        "days": days,
        "period_start": cutoff.astimezone(KST).isoformat(),
        "period_end": now.astimezone(KST).isoformat(),
        "timezone": DEFAULT_TIMEZONE,
        "memos": memos_payload,
        "events": events,
        "memo_count": total_memos,
        "event_count": len(events),
        "summary_stats": {
            "total_memos": total_memos,
            "memos_kept": len(memos_payload),
            "omitted_memos": omitted_count,
            "top_n": top_n,
            **aggregate,
        },
        "payload_compacted": payload_compacted,
    }
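`_aggregate_memos` buckets by the KST calendar date rather than the UTC date, so a memo stored late in the UTC day lands in the next day's bucket. A small standalone check of that conversion; `kst_day` is a hypothetical helper, and the fixed UTC+9 offset is an assumption standing in for `ZoneInfo("Asia/Seoul")` (equivalent today, since Seoul observes no DST).

```python
from datetime import datetime, timedelta, timezone

# Assumption: fixed UTC+9 offset in place of ZoneInfo("Asia/Seoul").
KST_FIXED = timezone(timedelta(hours=9))


def kst_day(created_at: datetime) -> str:
    """Bucket key in the style of memos_by_day: the KST date of a tz-aware timestamp."""
    return created_at.astimezone(KST_FIXED).date().isoformat()


stored = datetime(2026, 5, 18, 20, 0, tzinfo=timezone.utc)
print(stored.date().isoformat())  # 2026-05-18 (UTC date)
print(kst_day(stored))            # 2026-05-19 (KST bucket)
```

The KST day boundary therefore falls at 15:00 UTC: anything stored at or after that moment counts toward the next KST date.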