Compare commits

..

3 Commits

Author SHA1 Message Date
hyungi a55bb3453d feat(presegment): PR3 테스트 — 알람·dedupe·override 검증·재개·무회귀 19건
tests/test_presegment_pr3.py: alerts no-op(env 미설정, 프로세스당 1회 로그)/
synochat·ntfy 포맷/실패 무raise(webhook 전부 fake — 실호출 0), HOLD 알람
발화+alerted_at 7일 dedupe, validate_override_boundaries(정상/dict형/중첩/
캡초과/커버리지 부족/범위 밖/TODO 잔존/공백 경고), leaf_spans 원문 재구성,
units_override 가 tier 판정(plan_summarize_units) 우회하고 map-reduce 재개,
잘못된 override(캡 초과·source_len 불일치)=재-HOLD+알람+LLM 콜 0,
override 없는 소형(단일콜)·whole(HOLD+알람) 문서 무회귀.

기존 test_summarize_units 26 + test_deep_summary_mapreduce 등 인접 100건
pass 유지 (test_pipeline_hold 1건 실패는 main 기존 결함 — 본 PR 무관).

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-07-03 05:54:23 +09:00
hyungi 9061f2e25c feat(presegment): PR3 C — 유인 분할 CLI scripts/presegment_attended.py
컨테이너 안(docker exec)에서 실행하는 3-subcommand CLI:
  list   — awaiting_split 보류 큐 행(문서·tier·over%·토큰·초과 섹션·보류/재개 시각)
  export — 문서 통계+hier 개요(overview.md)·자동 pack 유닛 제안(summarize_units
           재사용)·초과 섹션 (start,end) 스팬+본문 덤프(섹션당 파일, 200K자 분할,
           파일명에 절대 오프셋)·boundaries 템플릿 JSON(자동팩 채움+초과=todo 마커)
           +README(유인 클로드 세션 작업 안내)
  apply  — 경계 검증(단조증가·비중첩·본문 범위·커버리지 90%+공백 경고·유닛 캡
           초과 시 유닛 명시 거부·todo 잔존 거부·source_len 드리프트 거부) 통과 시
           payload.presegment.units_override 기록 + awaiting_split=false +
           deferred_until 제거(즉시 재개) + status pending·alerted_at/map_results
           정리. --dry-run 지원.

stdout = 사람이 읽는 요약 + '{' 로 시작하는 기계 파싱용 JSON 라인.
DB 접속 = 기존 scripts/ 패턴(DATABASE_URL env, backfill_tier.py 동형).
마이그레이션 없음 — payload JSONB 만 사용.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-07-03 05:54:23 +09:00
hyungi 33427d4a42 feat(presegment): PR3 A+B — HOLD 웹훅 알람 + units_override 재개 경로
A. services/alerts.py 신설 — send_alert(title, message):
   ALERT_WEBHOOK_URL 미설정=no-op(프로세스당 1회 INFO), ALERT_WEBHOOK_KIND
   synochat(기본)|ntfy, httpx 5s, 실패=WARNING만(절대 raise 금지).
   deep_summary HOLD/override 거부 시 발화 — 문서 id·제목·tier·over%·토큰·
   초과 섹션 상위3·재개 예정시각·유인 분할 힌트. dedupe=payload.presegment
   .alerted_at(7일) — 매 24h 재보류마다 재알람 방지.

B. units_override 재개 — payload.presegment.units_override 존재 시 tier
   재판정·HOLD 없이 (start,end,title) 문자 오프셋 경계로 유닛 구성 후 기존
   PR2 map-reduce 그대로(유닛 단위 멱등 commit·reduce·doc 기록). 방어:
   source_len 불일치·형식 오류·유닛 추정토큰 > CAP*1.1 이면 실패 대신
   재-HOLD + 알람(잘못된 override 의 900s 콜 재생산 차단). override 없는
   문서는 기존 경로 무회귀.

summarize_units 에 공용 순수함수 추가: choose_override_source(md_content
우선)·validate_override_boundaries(단조·비중첩·범위·커버리지 90%·유닛 캡)·
units_from_boundaries·leaf_spans + greedy_pack 유닛에 leaf_indexes 기록
(export CLI 스팬 계산용).

plan ds-presegment-mapreduce-2 / env DEEP_SUMMARY_HOLD_RETRY_MINUTES 유지.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-07-03 05:54:02 +09:00
5 changed files with 1393 additions and 15 deletions
+69
View File
@@ -0,0 +1,69 @@
"""운영 알람 webhook (presegment PR3 — HOLD 유인 전환 게이트).
deep_summary HOLD(awaiting_split) 처럼 "사람이 개입해야 풀리는" 상태를 웹훅으로 발화한다.
환경변수:
ALERT_WEBHOOK_URL — 미설정 = no-op (프로세스당 1회 INFO 로그만).
ALERT_WEBHOOK_KIND — 'synochat' | 'ntfy' (기본 synochat).
synochat: Synology Chat incoming webhook — POST form `payload={"text": "..."}`.
ntfy: POST body=message. 제목은 query param(?title=) — HTTP 헤더는 latin-1
한정이라 한글 제목이 깨진다 (ntfy 는 query param title 을 공식 지원).
불변식: 알람은 절대 raise 하지 않는다 — 실패는 WARNING 로그만. 알람이 워커를
죽이면 본전(요약 파이프라인)이 무너진다. 호출부는 반환값(bool)에 의존하지 말 것.
"""
from __future__ import annotations
import json
import os
import httpx
from core.utils import setup_logger
logger = setup_logger("alerts")
ALERT_TIMEOUT_SECONDS = 5.0
# 프로세스당 1회만 "미설정 no-op" 로그 — 매 HOLD 마다 로그 오염 방지.
_noop_logged = False
async def send_alert(title: str, message: str) -> bool:
"""webhook 으로 알람 1건 발화. 성공 True / no-op·실패 False. 절대 raise 금지."""
global _noop_logged
url = (os.getenv("ALERT_WEBHOOK_URL") or "").strip()
if not url:
if not _noop_logged:
logger.info("ALERT_WEBHOOK_URL 미설정 — 알람 no-op (이 로그는 프로세스당 1회)")
_noop_logged = True
return False
kind = (os.getenv("ALERT_WEBHOOK_KIND") or "synochat").strip().lower()
if kind not in ("synochat", "ntfy"):
logger.warning(f"ALERT_WEBHOOK_KIND={kind!r} 미지원 — synochat 으로 폴백")
kind = "synochat"
try:
async with httpx.AsyncClient(timeout=ALERT_TIMEOUT_SECONDS) as client:
if kind == "ntfy":
resp = await client.post(
url,
params={"title": title},
content=message.encode("utf-8"),
)
else: # synochat
text = f"{title}\n{message}" if title else message
resp = await client.post(
url,
data={"payload": json.dumps({"text": text}, ensure_ascii=False)},
)
if resp.status_code >= 400:
logger.warning(
f"알람 webhook({kind}) HTTP {resp.status_code}: {resp.text[:200]}"
)
return False
return True
except Exception as exc: # noqa: BLE001 — 알람 실패가 워커를 죽이면 안 됨
logger.warning(f"알람 webhook({kind}) 발화 실패: {exc}")
return False
+193 -3
View File
@@ -66,6 +66,9 @@ class SummarizeUnit:
text: str = ""
est_tokens: int = 0
over_cap: bool = False # 단독 섹션이 CAP 초과 (hybrid 시 클로드 대상)
# PR3: 이 유닛을 구성한 leaf 의 서수(extract_leaves 순서) — export CLI 가
# leaf_spans 와 결합해 유닛 (start,end) 스팬을 계산한다. 페이로드 미기록.
leaf_indexes: list[int] = field(default_factory=list)
@dataclass
@@ -92,20 +95,22 @@ def greedy_pack(leaves: list[HierNode], cap: int = CAP_TOKENS) -> list[Summarize
units: list[SummarizeUnit] = []
cur_titles: list[str | None] = []
cur_texts: list[str] = []
cur_indexes: list[int] = []
cur_tokens = 0
def _flush() -> None:
nonlocal cur_titles, cur_texts, cur_tokens
nonlocal cur_titles, cur_texts, cur_indexes, cur_tokens
if cur_texts:
units.append(SummarizeUnit(
index=len(units),
section_titles=cur_titles,
text="\n\n".join(cur_texts),
est_tokens=cur_tokens,
leaf_indexes=cur_indexes,
))
cur_titles, cur_texts, cur_tokens = [], [], 0
cur_titles, cur_texts, cur_indexes, cur_tokens = [], [], [], 0
for leaf in leaves:
for li, leaf in enumerate(leaves):
t = estimate_tokens(leaf.text)
if t > cap:
_flush()
@@ -115,12 +120,14 @@ def greedy_pack(leaves: list[HierNode], cap: int = CAP_TOKENS) -> list[Summarize
text=leaf.text,
est_tokens=t,
over_cap=True,
leaf_indexes=[li],
))
continue
if cur_tokens + t > cap:
_flush()
cur_titles.append(leaf.section_title)
cur_texts.append(leaf.text)
cur_indexes.append(li)
cur_tokens += t
_flush()
return units
@@ -222,3 +229,186 @@ def build_reduce_units_block(
block = block[: max(1, int(budget_tokens / KO_TOK_PER_CHAR))]
truncated = True
return block, truncated
# ─── PR3 — 유인 분할(units_override) 경계 순수함수 (worker + attended CLI 공용) ───
#
# HOLD(hybrid/whole) 문서를 사람이(유인 클로드 세션) 분할한 경계
# [(start, end, title)] 로 재개하는 경로. 오프셋 = 소스 텍스트의 Python 문자
# (code point) 인덱스 — export 가 덤프한 파일과 apply/워커의 슬라이스가 같은
# 기준을 공유해야 한다 (builder 의 char_start 는 UTF-16 단위라 여기서 미사용).
OVERRIDE_MIN_COVERAGE_PCT = 90.0 # apply 게이트 — 전체 본문의 90%+ 커버 필수
OVERRIDE_GAP_WARN_CHARS = 1_000 # 이보다 큰 공백 구간은 경고로 노출
@dataclass
class OverrideCheck:
"""validate_override_boundaries 결과 — ok=False 면 errors 에 사유."""
ok: bool
errors: list[str] = field(default_factory=list)
warnings: list[str] = field(default_factory=list)
coverage_pct: float = 0.0
# 정규화된 (start, end, title) — units_from_boundaries 입력으로 그대로 사용
boundaries: list[tuple[int, int, str | None]] = field(default_factory=list)
unit_tokens: list[int] = field(default_factory=list)
def choose_override_source(
md_content: str | None, extracted_text: str | None
) -> tuple[str, str]:
"""units_override 오프셋 기준 텍스트 선택 — (source_name, text).
canonical markdown(md_content) 우선, 부재/공백 시 extracted_text 폴백.
export CLI · apply CLI · 워커 재개가 반드시 같은 규칙을 공유해야
(start,end) 오프셋이 일치한다. 선택 결과는 units_override.source 에 박제.
"""
if md_content and md_content.strip():
return "md_content", md_content
return "extracted_text", extracted_text or ""
def _normalize_boundary(entry, idx: int, errors: list[str]) -> tuple[int, int, str | None] | None:
"""boundaries 1건 정규화 — [start,end,title?] 배열 또는 {start,end,title} 객체.
export 템플릿의 초과 스팬은 "todo" 키를 달고 나온다 — 미해결(todo 잔존) 상태로
apply 하면 여기서 에러 (사람이 CAP 이하 경계로 분할을 완성해야 통과).
"""
if isinstance(entry, dict):
if entry.get("todo"):
errors.append(
f"유닛 {idx}: TODO 미해결 — 초과 스팬을 CAP 이하 경계들로 분할한 뒤 todo 키를 제거해야 함"
)
return None
start, end, title = entry.get("start"), entry.get("end"), entry.get("title")
elif isinstance(entry, (list, tuple)) and 2 <= len(entry) <= 3:
start, end = entry[0], entry[1]
title = entry[2] if len(entry) == 3 else None
else:
errors.append(f"유닛 {idx}: 형식 오류 — [start, end, title] 또는 {{start, end, title}} 여야 함")
return None
if (
isinstance(start, bool) or isinstance(end, bool)
or not isinstance(start, int) or not isinstance(end, int)
):
errors.append(f"유닛 {idx}: start/end 는 정수여야 함 (start={start!r}, end={end!r})")
return None
if title is not None and not isinstance(title, str):
title = str(title)
return start, end, title
def validate_override_boundaries(
text: str,
raw_boundaries,
*,
cap: int = CAP_TOKENS,
min_coverage_pct: float = OVERRIDE_MIN_COVERAGE_PCT,
gap_warn_chars: int = OVERRIDE_GAP_WARN_CHARS,
) -> OverrideCheck:
"""units_override 경계 검증 — 단조증가·비중첩·본문 범위 내·커버리지·유닛별 캡.
apply CLI = 기본 게이트(cap=CAP_TOKENS, coverage>=90%).
워커 방어 = cap 슬랙(CAP*1.1)·coverage 0 으로 완화 호출 — 잘못된 override 가
900s 콜을 재생산하는 것만 차단하고, 품질 게이트는 apply 시점에 이미 통과했다고 본다.
"""
errors: list[str] = []
warnings: list[str] = []
if not isinstance(raw_boundaries, (list, tuple)) or not raw_boundaries:
return OverrideCheck(ok=False, errors=["boundaries 가 비어있거나 리스트가 아님"])
normalized: list[tuple[int, int, str | None]] = []
for i, entry in enumerate(raw_boundaries):
norm = _normalize_boundary(entry, i, errors)
if norm is not None:
normalized.append(norm)
if errors:
return OverrideCheck(ok=False, errors=errors, warnings=warnings)
n = len(text)
prev_end = None
unit_tokens: list[int] = []
covered = 0
for i, (start, end, title) in enumerate(normalized):
label = f"유닛 {i}" + (f" ({title})" if title else "")
if start < 0 or end > n:
errors.append(f"{label}: 본문 범위 밖 — [{start}, {end}) vs len={n}")
continue
if start >= end:
errors.append(f"{label}: start >= end ([{start}, {end}))")
continue
if prev_end is not None and start < prev_end:
errors.append(f"{label}: 직전 유닛과 중첩/역순 — start={start} < 직전 end={prev_end}")
if prev_end is not None and start - prev_end > gap_warn_chars:
warnings.append(f"{label} 앞 공백 구간 {start - prev_end:,}자 ([{prev_end}, {start})) — 의도 확인")
est = estimate_tokens(text[start:end])
unit_tokens.append(est)
if est > cap:
errors.append(f"{label}: 추정 {est:,} tok > cap {cap:,} — 이 스팬을 더 분할해야 함")
covered += end - start
prev_end = max(prev_end or 0, end)
if not errors:
head_gap = normalized[0][0]
tail_gap = n - normalized[-1][1]
if head_gap > gap_warn_chars:
warnings.append(f"문서 선두 공백 구간 {head_gap:,}자 ([0, {normalized[0][0]})) — 의도 확인")
if tail_gap > gap_warn_chars:
warnings.append(f"문서 말미 공백 구간 {tail_gap:,}자 ([{normalized[-1][1]}, {n})) — 의도 확인")
coverage_pct = round(covered * 100.0 / n, 2) if n else 0.0
if not errors and coverage_pct < min_coverage_pct:
errors.append(
f"커버리지 {coverage_pct}% < {min_coverage_pct}% — 경계가 본문 대부분을 덮어야 함"
)
return OverrideCheck(
ok=not errors,
errors=errors,
warnings=warnings,
coverage_pct=coverage_pct,
boundaries=normalized if not errors else [],
unit_tokens=unit_tokens,
)
def units_from_boundaries(
text: str, boundaries: list[tuple[int, int, str | None]]
) -> list[SummarizeUnit]:
"""정규화·검증 통과한 (start,end,title) 리스트 → SummarizeUnit 리스트.
유닛 index = 경계 서수 — boundaries 는 payload 에 박제되므로 attempt 간 안정
(map_results 멱등 재개 키와 정합).
"""
units: list[SummarizeUnit] = []
for i, (start, end, title) in enumerate(boundaries):
seg = text[start:end]
units.append(SummarizeUnit(
index=i,
section_titles=[title],
text=seg,
est_tokens=estimate_tokens(seg),
))
return units
def leaf_spans(text: str, leaves: list[HierNode]) -> list[tuple[int, int]]:
"""extract_leaves 결과 leaf 들의 원문 (start,end) 문자 스팬.
_segment 가 원문을 연속 파티션(빈 preamble 만 폐기)으로 자르므로, 커서 순차
탐색이 항상 정확한 위치를 찾는다 (동일 본문 반복이 있어도 순서가 앞선 leaf 가
앞 오프셋을 가져간다).
"""
spans: list[tuple[int, int]] = []
cursor = 0
for leaf in leaves:
pos = text.find(leaf.text, cursor)
if pos < 0:
# 이론상 불가(연속 파티션) — 방어적으로 전체 재탐색
pos = text.find(leaf.text)
if pos < 0:
raise ValueError(f"leaf 본문을 원문에서 찾지 못함 (title={leaf.section_title!r})")
spans.append((pos, pos + len(leaf.text)))
cursor = pos + len(leaf.text)
return spans
+189 -12
View File
@@ -14,7 +14,7 @@ import asyncio
import json
import os
import time
from datetime import datetime, timezone
from datetime import datetime, timedelta, timezone
from pydantic import BaseModel, Field, ValidationError
from sqlalchemy import desc, select
@@ -29,15 +29,19 @@ from core.utils import setup_logger
from models.document import Document
from models.queue import ProcessingQueue, StageDeferred
from policy.prompt_render import render_26b, policy_version as compute_policy_version
from services.alerts import send_alert
from services.document_telemetry import record_analyze_event
from services.search.llm_gate import Priority, acquire_mlx_gate
from services.summarize_units import (
CAP_TOKENS,
UnitPlan,
build_reduce_units_block,
choose_override_source,
estimate_tokens,
plan_summarize_units,
render_map_slice,
units_from_boundaries,
validate_override_boundaries,
)
logger = setup_logger("deep_summary_worker")
@@ -45,9 +49,16 @@ logger = setup_logger("deep_summary_worker")
DEEP_SUMMARY_TASK = "p3c_deep_summary"
# presegment PR2 (plan ds-presegment-mapreduce-2) — 거대문서 map-reduce
REDUCE_TASK = "p3c_deep_summary_reduce"
# HYBRID/TIER2(클로드 유인 분할 필요) HOLD 재확인 간격. PR3(알람·경계 주입) 전까지는
# 이 간격으로 재계획만 반복한다 — attempts 미소모(StageDeferred)라 영구 failed 없음.
# HYBRID/TIER2(클로드 유인 분할 필요) HOLD 재확인 간격 — attempts 미소모(StageDeferred)라
# 영구 failed 없음. PR3: HOLD 시 웹훅 알람 + units_override 주입 시 즉시 재개.
HOLD_RETRY_MINUTES = int(os.getenv("DEEP_SUMMARY_HOLD_RETRY_MINUTES", "1440"))
# HOLD 알람 dedupe — payload.presegment.alerted_at 이 이 일수 이내면 재발화 억제
# (매 24h 재보류마다 재알람 방지). apply CLI 가 override 기록 시 alerted_at 을 지워
# 다음 이벤트(예: override 거부)는 신선하게 발화된다.
ALERT_DEDUPE_DAYS = 7
# units_override 방어 캡 슬랙 — apply 게이트(CAP)보다 10% 여유. 초과 유닛은 실패
# 대신 재-HOLD + 알람 (잘못된 override 가 900s 콜을 재생산하는 것 차단).
OVERRIDE_CAP_SLACK = 1.1
# reduce 프롬프트 오버헤드가 비정상적으로 커도 유닛 블록 예산은 이 밑으로 안 내려감(방어).
REDUCE_BUDGET_FLOOR_TOKENS = 1_000
@@ -111,6 +122,17 @@ async def process(
envelope = EscalationEnvelope.from_json(json.dumps(envelope_raw))
# ─── presegment PR3 — units_override 재개 경로 (유인 분할 경계 주입) ───
# apply CLI(scripts/presegment_attended.py) 가 payload.presegment.units_override 를
# 기록한 문서는 tier 재판정·HOLD 없이 그 경계로 유닛을 구성해 기존 PR2
# map-reduce 경로를 그대로 탄다. override 없는 문서는 아래 기존 경로와 바이트 동일.
if (payload.get("presegment") or {}).get("units_override"):
await _process_units_override(
doc, queue_row, envelope, subject_domain, session,
defer_on_deep_unavailable=defer_on_deep_unavailable,
)
return
# ─── presegment PR2 게이트 (plan ds-presegment-mapreduce-2) ───
# TRIGGER(25K tok) 이하 = 아래 기존 단일콜 경로 그대로(무회귀). 초과 시 3-way:
# auto(over%==0) → 로컬 map-reduce (유닛별 26B → reduce)
@@ -123,7 +145,9 @@ async def process(
# units 빈 auto 는 이론상 불가(비어있지 않은 텍스트 = leaf >= 1)지만, 빈 reduce
# 단일콜(환각 위험)로 흐르지 않게 방어적으로 HOLD 로 보낸다.
if unit_plan.tier != "auto" or not unit_plan.units:
await _hold_awaiting_split(session, queue_row, unit_plan, document_id)
await _hold_awaiting_split(
session, queue_row, unit_plan, document_id, doc_title=doc.title
)
await _process_map_reduce(
doc, queue_row, envelope, subject_domain, unit_plan, session,
defer_on_deep_unavailable=defer_on_deep_unavailable,
@@ -250,43 +274,196 @@ async def process(
)
def _hold_alert_due(preseg: dict, now: datetime) -> bool:
"""HOLD 알람 dedupe — alerted_at 이 없거나 ALERT_DEDUPE_DAYS 초과 시에만 발화."""
ts = preseg.get("alerted_at")
if not ts:
return True
try:
prev = datetime.fromisoformat(str(ts))
except ValueError:
return True # 깨진 타임스탬프 = 기록 신뢰 불가 → 발화하고 재기록
if prev.tzinfo is None:
prev = prev.replace(tzinfo=timezone.utc)
return (now - prev) >= timedelta(days=ALERT_DEDUPE_DAYS)
async def _hold_awaiting_split(
session: AsyncSession, queue_row: ProcessingQueue, plan: UnitPlan, document_id: int
session: AsyncSession,
queue_row: ProcessingQueue,
plan: UnitPlan,
document_id: int,
doc_title: str | None = None,
) -> None:
"""HYBRID/TIER2 — 클로드 유인 분할 대기(HOLD). 맥미니 미전송, StageDeferred 보류.
payload.presegment.awaiting_split 마킹을 먼저 commit — StageDeferred 핸들러
(queue_consumer)는 새 세션에서 행을 다시 읽어 deferred_until 만 병합하므로 유실 없음.
알람(ntfy)·클로드 경계 주입은 PR3 — 그 전까지는 HOLD_RETRY_MINUTES 간격 재계획만 반복.
PR3: 유인 전환 게이트 웹훅 알람 발화(alerted_at dedupe — 매 24h 재보류마다 재알람 방지).
무인 자동 cloud 호출 금지 룰 준수(클로드 경로는 항상 유인 게이트).
"""
payload = dict(queue_row.payload or {})
preseg = dict(payload.get("presegment") or {})
now = datetime.now(timezone.utc)
alert_due = _hold_alert_due(preseg, now)
oversized = [
(u.section_titles[0] if u.section_titles else None)
for u in plan.units if u.over_cap
][:20]
preseg.update({
"awaiting_split": True,
"tier": plan.tier,
"over_pct": plan.over_pct,
"total_est_tokens": plan.total_est_tokens,
"units": len(plan.units),
# 클로드가 분할해야 할 초과 섹션 표본 (PR3 알람 본문용)
"oversized_sections": [
(u.section_titles[0] if u.section_titles else None)
for u in plan.units if u.over_cap
][:20],
# 클로드가 분할해야 할 초과 섹션 표본 (알람 본문 + export CLI 안내용)
"oversized_sections": oversized,
})
if alert_due:
preseg["alerted_at"] = now.isoformat()
payload["presegment"] = preseg
queue_row.payload = payload # 재할당 = JSONB 변경 감지
await session.commit()
logger.info(
f"[deep] id={document_id} awaiting_split tier={plan.tier} over_pct={plan.over_pct} "
f"total_est_tokens={plan.total_est_tokens} units={len(plan.units)} "
f"→ HOLD ({HOLD_RETRY_MINUTES}분 후 재확인, 클로드 분할=PR3 유인)"
f"→ HOLD ({HOLD_RETRY_MINUTES}분 후 재확인, alert={'발화' if alert_due else 'dedupe'})"
)
if alert_due:
# commit 이후 발화 — 알람이 5s 행에 걸려도 payload 마킹은 이미 영속.
resume_at = now + timedelta(minutes=HOLD_RETRY_MINUTES)
top3 = ", ".join(t for t in oversized[:3] if t) or "(제목 없음)"
await send_alert(
f"[DS] deep_summary HOLD — doc {document_id} 유인 분할 필요",
(
f"문서: {doc_title or '(제목 없음)'} (id={document_id})\n"
f"tier={plan.tier} / over%={plan.over_pct} / "
f"total_est_tokens={plan.total_est_tokens:,} / units={len(plan.units)}\n"
f"초과 섹션(상위 3): {top3}\n"
f"재개 예정(UTC): {resume_at.isoformat(timespec='minutes')}\n"
f"유인 분할: scripts/presegment_attended.py export --doc {document_id}"
),
)
raise StageDeferred(
f"awaiting_split:{plan.tier}", retry_after_minutes=HOLD_RETRY_MINUTES
)
async def _rehold_bad_override(
session: AsyncSession, queue_row: ProcessingQueue, doc: Document, reason: str
) -> None:
"""잘못된 units_override — 실패 대신 재-HOLD + 알람 (900s 콜 재생산 차단).
units_override 는 payload 에 보존(사람이 원인 조사) + override_rejected 사유 기록.
apply CLI 가 alerted_at 을 지워두므로 첫 거부는 즉시 발화되고, 이후 24h 재보류
루프는 ALERT_DEDUPE_DAYS dedupe 로 억제된다.
"""
document_id = doc.id
payload = dict(queue_row.payload or {})
preseg = dict(payload.get("presegment") or {})
now = datetime.now(timezone.utc)
alert_due = _hold_alert_due(preseg, now)
preseg.update({
"awaiting_split": True,
"override_rejected": reason,
"override_rejected_at": now.isoformat(),
})
if alert_due:
preseg["alerted_at"] = now.isoformat()
payload["presegment"] = preseg
queue_row.payload = payload # 재할당 = JSONB 변경 감지
await session.commit()
logger.warning(f"[deep] id={document_id} units_override 거부 → 재-HOLD: {reason}")
if alert_due:
resume_at = now + timedelta(minutes=HOLD_RETRY_MINUTES)
await send_alert(
f"[DS] deep_summary 유인 분할 경계 거부 — doc {document_id}",
(
f"문서: {doc.title or '(제목 없음)'} (id={document_id})\n"
f"사유: {reason}\n"
f"재개 예정(UTC): {resume_at.isoformat(timespec='minutes')}\n"
f"수정: scripts/presegment_attended.py export --doc {document_id} "
f"→ apply --doc {document_id} --boundaries FILE"
),
)
raise StageDeferred(
f"override_rejected:{reason[:80]}", retry_after_minutes=HOLD_RETRY_MINUTES
)
async def _process_units_override(
doc: Document,
queue_row: ProcessingQueue,
envelope: EscalationEnvelope,
subject_domain: str,
session: AsyncSession,
*,
defer_on_deep_unavailable: bool,
) -> None:
"""PR3 — apply CLI 가 기록한 유인 분할 경계로 map-reduce 재개.
경계 = units_override.source 텍스트의 (start, end) 문자 오프셋. 방어 검증
(source_len 일치·단조·비중첩·범위·유닛 캡*1.1) 실패 시 재-HOLD + 알람 —
apply 이후 본문이 재생성됐거나 수기 주입이 깨진 경우 900s 콜로 흐르지 않는다.
통과 시 기존 PR2 _process_map_reduce 를 그대로 탄다(맵 결과 유닛 단위 commit·
reduce·ai_detail_summary 기록 — 유닛 index 는 payload 박제 경계 서수라 안정).
"""
document_id = doc.id
preseg = dict((queue_row.payload or {}).get("presegment") or {})
override = preseg.get("units_override")
if isinstance(override, (list, tuple)):
# 수기 주입 호환 — bare [(start,end,title)] 리스트도 허용
override = {"boundaries": list(override)}
if not isinstance(override, dict):
await _rehold_bad_override(
session, queue_row, doc, f"units_override 형식 오류 (type={type(override).__name__})"
)
source = override.get("source")
if source is None:
source, text_src = choose_override_source(doc.md_content, doc.extracted_text)
elif source in ("md_content", "extracted_text"):
text_src = (doc.md_content if source == "md_content" else doc.extracted_text) or ""
else:
await _rehold_bad_override(
session, queue_row, doc, f"units_override.source={source!r} 미지원"
)
expected_len = override.get("source_len")
if expected_len is not None and expected_len != len(text_src):
await _rehold_bad_override(
session, queue_row, doc,
f"source_len 불일치 — override={expected_len:,} vs 현재 {source}={len(text_src):,}"
" (본문 재생성됨 — export 부터 재실행)",
)
check = validate_override_boundaries(
text_src,
override.get("boundaries") or [],
cap=int(CAP_TOKENS * OVERRIDE_CAP_SLACK),
min_coverage_pct=0.0, # 커버리지 품질 게이트는 apply CLI 가 이미 통과시킴
)
if not check.ok:
await _rehold_bad_override(session, queue_row, doc, "; ".join(check.errors[:5]))
units = units_from_boundaries(text_src, check.boundaries)
plan = UnitPlan(
mode="map_reduce",
tier="override",
total_est_tokens=estimate_tokens(text_src),
over_pct=float(preseg.get("over_pct") or 0.0),
units=units,
)
logger.info(
f"[deep] id={document_id} units_override 재개 — source={source} units={len(units)} "
f"coverage={check.coverage_pct}% max_unit_tokens={max(check.unit_tokens, default=0)}"
)
await _process_map_reduce(
doc, queue_row, envelope, subject_domain, plan, session,
defer_on_deep_unavailable=defer_on_deep_unavailable,
)
async def _call_26b(
client: AIClient, prompt: str, *, defer_on_deep_unavailable: bool, document_id: int
):
+456
View File
@@ -0,0 +1,456 @@
"""presegment PR3 — HOLD 거대문서 유인 분할 CLI (plan ds-presegment-mapreduce-2).
deep_summary 워커가 HOLD(payload.presegment.awaiting_split=true) 로 보류한
hybrid/whole tier 거대문서를, 사람이(유인 클로드 세션) 경계를 완성해 재개시키는 도구.
사용법 (fastapi 컨테이너 안에서 실행):
docker compose exec fastapi python /app/scripts/presegment_attended.py list
docker compose exec fastapi python /app/scripts/presegment_attended.py export --doc 44443 --out /app/logs/preseg_44443
docker compose exec fastapi python /app/scripts/presegment_attended.py apply --doc 44443 --boundaries /app/logs/preseg_44443/boundaries_template.json --dry-run
docker compose exec fastapi python /app/scripts/presegment_attended.py apply --doc 44443 --boundaries /app/logs/preseg_44443/boundaries_template.json
워크플로우:
1. list — awaiting_split 문서 확인.
2. export — 문서 통계·hier 개요·자동 pack 유닛 제안·초과 섹션 본문 덤프·boundaries
템플릿 JSON 생성. 유인 클로드 세션은 이 파일들만 읽고 TODO 스팬을
CAP 이하 경계들로 분할해 템플릿을 완성한다.
3. apply — 완성된 boundaries 검증(단조·비중첩·범위·커버리지 90%+·유닛 캡) 후
payload.presegment.units_override 기록 + awaiting_split 해제 +
deferred_until 제거(즉시 재개). 워커가 다음 사이클에 map-reduce 재개.
stdout 규약: 사람이 읽는 요약 행 + '{' 로 시작하는 기계 파싱용 JSON 라인(1건 1라인).
사람용 행은 절대 '{' 로 시작하지 않는다.
"""
from __future__ import annotations
import argparse
import asyncio
import json
import os
import re
import sys
from datetime import datetime, timezone
from pathlib import Path
sys.path.insert(0, str(Path(__file__).resolve().parent.parent / "app"))
from sqlalchemy import text as sql_text # noqa: E402
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine # noqa: E402
from services.hier_decomp.builder import build_hier_tree # noqa: E402
from services.summarize_units import ( # noqa: E402
CAP_TOKENS,
OVERRIDE_MIN_COVERAGE_PCT,
TRIGGER_TOKENS,
choose_override_source,
estimate_tokens,
extract_leaves,
leaf_spans,
plan_summarize_units,
validate_override_boundaries,
)
# 초과 섹션 본문 덤프 분할 단위 (유인 세션 컨텍스트 보호)
DUMP_CHUNK_CHARS = 200_000
def _jsonl(obj: dict) -> None:
"""기계 파싱용 JSON 라인 — 반드시 '{' 로 시작하는 단독 라인."""
print(json.dumps(obj, ensure_ascii=False, default=str))
def _session_factory():
database_url = os.getenv(
"DATABASE_URL",
"postgresql+asyncpg://pkm:pkm@postgres:5432/pkm",
)
engine = create_async_engine(database_url)
return engine, async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
# ─── list ────────────────────────────────────────────────────────────────────
LIST_SQL = """
SELECT q.id AS queue_id, q.document_id, q.status, q.attempts,
q.payload::text AS payload_text,
LEFT(COALESCE(d.title, '(제목 없음)'), 80) AS title
FROM processing_queue q
JOIN documents d ON d.id = q.document_id
WHERE q.stage = 'deep_summary'
AND q.status IN ('pending', 'processing', 'failed')
AND (q.payload -> 'presegment' ->> 'awaiting_split') = 'true'
ORDER BY q.id
"""
async def cmd_list() -> int:
engine, factory = _session_factory()
try:
async with factory() as session:
rows = (await session.execute(sql_text(LIST_SQL))).mappings().all()
finally:
await engine.dispose()
print(f"awaiting_split 보류 문서 {len(rows)}")
for r in rows:
payload = json.loads(r["payload_text"] or "{}")
preseg = payload.get("presegment") or {}
oversized = preseg.get("oversized_sections") or []
print(
f" doc {r['document_id']} [{r['title']}] queue={r['queue_id']} status={r['status']} "
f"tier={preseg.get('tier')} over%={preseg.get('over_pct')} "
f"tokens={preseg.get('total_est_tokens'):,} units={preseg.get('units')}"
if isinstance(preseg.get("total_est_tokens"), int)
else f" doc {r['document_id']} [{r['title']}] queue={r['queue_id']} status={r['status']}"
)
print(f" 초과 섹션 {len(oversized)}건: {', '.join(str(t) for t in oversized[:3] if t)}")
print(
f" 보류 알람={preseg.get('alerted_at') or '-'} / "
f"재개 예정={payload.get('deferred_until') or '(즉시)'}"
+ (f" / 거부 사유={preseg.get('override_rejected')}" if preseg.get("override_rejected") else "")
)
_jsonl({
"cmd": "list",
"queue_id": r["queue_id"],
"doc_id": r["document_id"],
"title": r["title"],
"status": r["status"],
"tier": preseg.get("tier"),
"over_pct": preseg.get("over_pct"),
"total_est_tokens": preseg.get("total_est_tokens"),
"units": preseg.get("units"),
"oversized_sections": oversized,
"alerted_at": preseg.get("alerted_at"),
"deferred_until": payload.get("deferred_until"),
"override_rejected": preseg.get("override_rejected"),
})
return 0
# ─── export ──────────────────────────────────────────────────────────────────
def _safe_name(title: str | None, fallback: str) -> str:
t = re.sub(r"[^0-9A-Za-z가-힣._-]+", "_", (title or fallback)).strip("_")
return (t or fallback)[:60]
def _build_outline(text: str) -> str:
"""hier_decomp builder 재사용 — window-split 억제(요약 계획과 동일 환경) 개요."""
nodes = build_hier_tree(text, leaf_target_max=sys.maxsize, leaf_hard_max=sys.maxsize)
lines = []
for n in nodes:
indent = " " * max(n.level, 0)
title = n.section_title or "(preamble)"
tok = estimate_tokens(n.text)
mark = " [CAP 초과]" if n.is_leaf and tok > CAP_TOKENS else ""
lines.append(f"{indent}- L{n.level} {title}{tok:,} tok{mark}")
return "\n".join(lines)
async def cmd_export(doc_id: int, out_dir: str) -> int:
engine, factory = _session_factory()
try:
async with factory() as session:
row = (await session.execute(
sql_text(
"SELECT id, title, md_content, extracted_text FROM documents WHERE id = :d"
),
{"d": doc_id},
)).mappings().first()
finally:
await engine.dispose()
if not row:
print(f"[error] 문서 id={doc_id} 없음")
_jsonl({"cmd": "export", "ok": False, "doc_id": doc_id, "error": "document_not_found"})
return 1
source, text = choose_override_source(row["md_content"], row["extracted_text"])
if not text.strip():
print(f"[error] 문서 id={doc_id} 본문 비어있음 (md_content/extracted_text 둘 다)")
_jsonl({"cmd": "export", "ok": False, "doc_id": doc_id, "error": "empty_text"})
return 1
plan = plan_summarize_units(text)
leaves = extract_leaves(text)
spans = leaf_spans(text, leaves)
out = Path(out_dir)
out.mkdir(parents=True, exist_ok=True)
files: list[str] = []
now_iso = datetime.now(timezone.utc).isoformat(timespec="seconds")
# ① 통계 + hier 개요
oversized_units = [u for u in plan.units if u.over_cap]
overview = [
f"# presegment export — doc {doc_id}",
"",
f"- 제목: {row['title'] or '(제목 없음)'}",
f"- source: {source} (len={len(text):,}자, 오프셋 기준 텍스트)",
f"- 추정 토큰: {plan.total_est_tokens:,} (trigger={TRIGGER_TOKENS:,} / cap={CAP_TOKENS:,})",
f"- plan: mode={plan.mode} tier={plan.tier} over%={plan.over_pct}",
f"- 유닛: 자동 pack {len(plan.units) - len(oversized_units)}개 + CAP 초과 {len(oversized_units)}",
f"- 생성: {now_iso}",
"",
"## 유닛 제안 (summarize_units greedy-pack)",
"",
]
for u in plan.units:
if not u.leaf_indexes:
continue
s = spans[u.leaf_indexes[0]][0]
e = spans[u.leaf_indexes[-1]][1]
titles = " · ".join(t for t in u.section_titles if t) or "(무제 구간)"
flag = " ★CAP 초과 — 분할 필요" if u.over_cap else ""
overview.append(f"- 유닛 {u.index}: [{s}, {e}) {u.est_tokens:,} tok — {titles[:120]}{flag}")
overview += ["", "## hier 개요", "", _build_outline(text), ""]
(out / "overview.md").write_text("\n".join(overview), encoding="utf-8")
files.append("overview.md")
# ③ 초과 섹션 본문 덤프 (섹션당 파일 · 200K자 단위 분할 · 파일명에 절대 스팬)
boundaries: list[dict] = []
for u in plan.units:
if not u.leaf_indexes:
continue
s = spans[u.leaf_indexes[0]][0]
e = spans[u.leaf_indexes[-1]][1]
title = next((t for t in u.section_titles if t), None)
if not u.over_cap:
# ④ 자동 pack 유닛은 템플릿에 채워둔다
boundaries.append({"start": s, "end": e, "title": title or f"유닛 {u.index}"})
continue
boundaries.append({
"start": s, "end": e, "title": title or f"유닛 {u.index}",
"todo": (
f"CAP 초과({u.est_tokens:,} tok > {CAP_TOKENS:,}) — 이 스팬을 cap 이하 "
"경계 여러 개로 교체하고 todo 키를 제거할 것"
),
})
seg = text[s:e]
base = f"oversized_{u.index:03d}_{_safe_name(title, f'unit{u.index}')}"
for k in range(0, len(seg), DUMP_CHUNK_CHARS):
cs, ce = s + k, s + min(k + DUMP_CHUNK_CHARS, len(seg))
fname = f"{base}.{cs}_{ce}.md"
# 본문 원문 그대로 (헤더 미부착 — 파일 내 로컬 오프셋 + 파일명 cs 로 절대 오프셋 계산)
(out / fname).write_text(text[cs:ce], encoding="utf-8")
files.append(fname)
# ④ boundaries 템플릿 JSON
template = {
"doc_id": doc_id,
"source": source,
"source_len": len(text),
"cap_tokens": CAP_TOKENS,
"generated_at": now_iso,
"boundaries": boundaries,
}
(out / "boundaries_template.json").write_text(
json.dumps(template, ensure_ascii=False, indent=2), encoding="utf-8"
)
files.append("boundaries_template.json")
# 유인 세션용 작업 안내
readme = f"""# doc {doc_id} 유인 분할 안내
1. overview.md 로 구조 파악 (유닛 제안 + hier 개요).
2. oversized_*.md 본문을 읽고 의미 경계를 정한다.
- 파일명 `..._<cs>_<ce>.md` 의 cs = 파일 첫 문자의 절대 오프셋.
- 절대 오프셋 = cs + 파일 내 로컬 오프셋.
3. boundaries_template.json 의 todo 항목을 cap({CAP_TOKENS:,} tok) 이하 경계 여러 개로
교체하고 todo 키를 제거한다. 나머지 자동 pack 항목은 그대로 둬도 된다.
- 토큰 추정: 한글 0.529 tok/자 · 기타 0.217 tok/자 (services/summarize_units.py).
- 규칙: start 단조증가 · 비중첩 · 전체 커버리지 {OVERRIDE_MIN_COVERAGE_PCT:.0f}%+ · 유닛당 cap 이하.
4. 검증/적용:
python /app/scripts/presegment_attended.py apply --doc {doc_id} --boundaries <FILE> --dry-run
python /app/scripts/presegment_attended.py apply --doc {doc_id} --boundaries <FILE>
"""
(out / "README.md").write_text(readme, encoding="utf-8")
files.append("README.md")
print(f"doc {doc_id} [{row['title'] or '(제목 없음)'}] export 완료 → {out}")
print(
f" source={source} len={len(text):,}자 tokens={plan.total_est_tokens:,} "
f"tier={plan.tier} over%={plan.over_pct}"
)
print(f" 자동 pack 유닛 {len(plan.units) - len(oversized_units)}개 / TODO(초과) {len(oversized_units)}")
print(f" 파일 {len(files)}개: {', '.join(files[:6])}{' ...' if len(files) > 6 else ''}")
_jsonl({
"cmd": "export", "ok": True, "doc_id": doc_id, "out": str(out),
"source": source, "source_len": len(text),
"total_est_tokens": plan.total_est_tokens, "tier": plan.tier,
"over_pct": plan.over_pct,
"units_auto": len(plan.units) - len(oversized_units),
"units_todo": len(oversized_units),
"files": files,
})
return 0
# ─── apply ───────────────────────────────────────────────────────────────────
QUEUE_ROW_SQL = """
SELECT id, status, attempts, payload::text AS payload_text
FROM processing_queue
WHERE document_id = :d AND stage = 'deep_summary'
AND status IN ('pending', 'processing', 'failed')
ORDER BY id DESC
LIMIT 1
"""
APPLY_UPDATE_SQL = """
UPDATE processing_queue
SET payload = CAST(:payload AS JSONB),
status = 'pending',
attempts = 0,
error_message = NULL
WHERE id = :qid
"""
async def cmd_apply(doc_id: int, boundaries_file: str, dry_run: bool) -> int:
raw = json.loads(Path(boundaries_file).read_text(encoding="utf-8"))
if isinstance(raw, dict):
boundaries = raw.get("boundaries") or []
declared_source = raw.get("source")
declared_len = raw.get("source_len")
if raw.get("doc_id") not in (None, doc_id):
print(f"[error] boundaries 파일 doc_id={raw.get('doc_id')} != --doc {doc_id}")
_jsonl({"cmd": "apply", "ok": False, "doc_id": doc_id, "error": "doc_id_mismatch"})
return 1
else:
boundaries, declared_source, declared_len = raw, None, None
engine, factory = _session_factory()
try:
async with factory() as session:
doc = (await session.execute(
sql_text(
"SELECT id, title, md_content, extracted_text FROM documents WHERE id = :d"
),
{"d": doc_id},
)).mappings().first()
if not doc:
print(f"[error] 문서 id={doc_id} 없음")
_jsonl({"cmd": "apply", "ok": False, "doc_id": doc_id, "error": "document_not_found"})
return 1
qrow = (await session.execute(sql_text(QUEUE_ROW_SQL), {"d": doc_id})).mappings().first()
if not qrow:
print(f"[error] doc {doc_id} 의 활성 deep_summary 큐 행 없음 (pending/processing/failed)")
_jsonl({"cmd": "apply", "ok": False, "doc_id": doc_id, "error": "queue_row_not_found"})
return 1
if qrow["status"] == "processing":
print(f"[error] queue {qrow['id']} 가 processing 중 — 워커 완료/보류 후 재시도")
_jsonl({"cmd": "apply", "ok": False, "doc_id": doc_id, "error": "queue_processing"})
return 1
# 오프셋 기준 텍스트 — export 와 동일 규칙 (파일에 source 선언 시 그 선언 우선)
if declared_source in ("md_content", "extracted_text"):
source = declared_source
text = (doc["md_content"] if source == "md_content" else doc["extracted_text"]) or ""
else:
source, text = choose_override_source(doc["md_content"], doc["extracted_text"])
if declared_len is not None and declared_len != len(text):
print(
f"[error] source_len 불일치 — 파일={declared_len:,} vs 현재 {source}={len(text):,}"
" (본문 재생성됨 — export 부터 재실행)"
)
_jsonl({"cmd": "apply", "ok": False, "doc_id": doc_id, "error": "source_len_mismatch"})
return 1
check = validate_override_boundaries(text, boundaries)
for w in check.warnings:
print(f" [warn] {w}")
if not check.ok:
print(f"[error] 경계 검증 실패 — {len(check.errors)}건:")
for e in check.errors:
print(f" - {e}")
_jsonl({
"cmd": "apply", "ok": False, "doc_id": doc_id,
"error": "validation_failed", "errors": check.errors,
"warnings": check.warnings, "coverage_pct": check.coverage_pct,
})
return 1
print(
f"doc {doc_id} [{doc['title'] or '(제목 없음)'}] 경계 검증 통과 — "
f"유닛 {len(check.boundaries)}개 / 커버리지 {check.coverage_pct}% / "
f"최대 유닛 {max(check.unit_tokens):,} tok (cap {CAP_TOKENS:,})"
)
for i, ((s, e, t), tok) in enumerate(zip(check.boundaries, check.unit_tokens)):
print(f" 유닛 {i}: [{s}, {e}) {tok:,} tok — {t or '(무제)'}")
payload = json.loads(qrow["payload_text"] or "{}")
preseg = dict(payload.get("presegment") or {})
preseg["units_override"] = {
"source": source,
"source_len": len(text),
"boundaries": [[s, e, t] for s, e, t in check.boundaries],
"applied_at": datetime.now(timezone.utc).isoformat(timespec="seconds"),
}
preseg["awaiting_split"] = False
# 알람 dedupe 리셋(다음 이벤트는 신선하게 발화) + 이전 거부/맵 잔재 제거
for k in ("alerted_at", "override_rejected", "override_rejected_at", "map_results"):
preseg.pop(k, None)
payload["presegment"] = preseg
payload.pop("deferred_until", None) # 즉시 재개
if dry_run:
print(f" [dry-run] queue {qrow['id']} 미변경 — 위 경계로 적용 가능")
_jsonl({
"cmd": "apply", "ok": True, "dry_run": True, "doc_id": doc_id,
"queue_id": qrow["id"], "units": len(check.boundaries),
"coverage_pct": check.coverage_pct, "unit_tokens": check.unit_tokens,
})
return 0
await session.execute(
sql_text(APPLY_UPDATE_SQL),
{"payload": json.dumps(payload, ensure_ascii=False), "qid": qrow["id"]},
)
await session.commit()
print(
f" 적용 완료 — queue {qrow['id']} status=pending, deferred_until 제거 "
f"(다음 queue_consumer 사이클에 재개)"
)
_jsonl({
"cmd": "apply", "ok": True, "dry_run": False, "doc_id": doc_id,
"queue_id": qrow["id"], "units": len(check.boundaries),
"coverage_pct": check.coverage_pct, "unit_tokens": check.unit_tokens,
})
return 0
finally:
await engine.dispose()
# ─── main ────────────────────────────────────────────────────────────────────
def main() -> int:
parser = argparse.ArgumentParser(description="presegment 유인 분할 CLI (PR3)")
sub = parser.add_subparsers(dest="cmd", required=True)
sub.add_parser("list", help="awaiting_split 보류 문서 목록")
p_export = sub.add_parser("export", help="유인 분할 작업 패키지 덤프")
p_export.add_argument("--doc", type=int, required=True)
p_export.add_argument("--out", default=None, help="출력 디렉토리 (기본 ./preseg_export_<doc>)")
p_apply = sub.add_parser("apply", help="완성된 boundaries 검증·적용(재개)")
p_apply.add_argument("--doc", type=int, required=True)
p_apply.add_argument("--boundaries", required=True, help="boundaries JSON 파일 경로")
p_apply.add_argument("--dry-run", action="store_true", help="검증만 하고 DB 미변경")
args = parser.parse_args()
if args.cmd == "list":
return asyncio.run(cmd_list())
if args.cmd == "export":
out = args.out or f"./preseg_export_{args.doc}"
return asyncio.run(cmd_export(args.doc, out))
if args.cmd == "apply":
return asyncio.run(cmd_apply(args.doc, args.boundaries, args.dry_run))
return 2
if __name__ == "__main__":
sys.exit(main())
+486
View File
@@ -0,0 +1,486 @@
"""presegment PR3 — HOLD 알람·units_override 재개·유인 분할 경계 검증 단위테스트.
test_deep_summary_mapreduce.py (PR2) 선례를 따르는 seam 단위 검증:
- services.alerts.send_alert: env 미설정 no-op(프로세스당 1회 로그)·synochat/ntfy
포맷·실패 시 절대 raise 금지 (webhook 은 전부 fake — 실호출 0).
- _hold_awaiting_split: 알람 발화 + alerted_at dedupe(7일).
- validate_override_boundaries: 정상/중첩/캡초과/커버리지 부족/TODO 잔존/범위 밖.
- process(): units_override 존재 시 tier 판정(plan_summarize_units) 우회 →
map-reduce 재개 / 잘못된 override 는 재-HOLD + 알람 / override 없는 문서 무회귀.
"""
from __future__ import annotations
import json
import os
import sys
from datetime import datetime, timedelta, timezone
from types import SimpleNamespace
import pytest
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "app"))
import httpx # noqa: E402
import services.alerts as alerts # noqa: E402
from ai.envelope import EscalationEnvelope # noqa: E402
from models.queue import StageDeferred # noqa: E402
from services.summarize_units import ( # noqa: E402
CAP_TOKENS,
choose_override_source,
estimate_tokens,
extract_leaves,
leaf_spans,
plan_summarize_units,
units_from_boundaries,
validate_override_boundaries,
)
import workers.deep_summary_worker as dsw # noqa: E402
# ─── fixtures (PR2 테스트와 동일 계열) ───────────────────────────────────────
# 헤딩 1개 + 한글 60,000자 단일 섹션 ≈ 31.7K tok (> CAP) → over% 100 → whole (HOLD 대상)
GIANT_WHOLE_MD = "# 통짜\n" + ("" * 60_000)
# trigger(25K tok) 이하 소형 문서 — 기존 단일콜 경로 (무회귀 확인용)
SMALL_MD = "# 소형\n" + ("" * 2_000)
MAP_JSON = (
'{"mode": "single", "tldr": "유닛 요약", "detail": "유닛 상세.",'
' "inconsistencies": [], "confidence": 0.9}'
)
REDUCE_JSON = (
'{"mode": "single", "tldr": "전체 요약", "detail": "최종 상세.",'
' "inconsistencies": [], "confidence": 0.8}'
)
class FakeSession:
def __init__(self, row=None):
self.commits = 0
self._row = row
async def commit(self):
self.commits += 1
class FakeProcSession(FakeSession):
"""process() 레벨 — session.get(Document) + execute(select queue_row) fake."""
def __init__(self, doc, row):
super().__init__(row)
self._doc = doc
async def get(self, model, pk):
return self._doc
async def execute(self, stmt):
row = self._row
return SimpleNamespace(scalar_one_or_none=lambda: row)
class FakeClient:
"""deep 슬롯 보유 — call_deep_or_defer 가 call_deep 을 탄다 (PR2 테스트 동일)."""
def __init__(self):
self.ai = SimpleNamespace(
deep=SimpleNamespace(model="qwen-macbook", context_char_limit=260_000)
)
self.prompts: list[str] = []
async def call_deep(self, prompt: str, system=None) -> str:
self.prompts.append(prompt)
if "유닛 요약 (총" in prompt: # reduce 프롬프트 마커
return REDUCE_JSON
return MAP_JSON
async def close(self):
pass
def _doc(text: str = GIANT_WHOLE_MD, md_content: str | None = None):
return SimpleNamespace(
id=999,
title="테스트 문서",
extracted_text=text,
md_content=md_content,
ai_detail_summary=None,
ai_inconsistencies=None,
ai_analysis_tier="triage",
ai_processed_at=None,
)
def _envelope_raw():
return {
"from_stage": "classify",
"escalation_reasons": ["long_context"],
"risk_flags": [],
"distilled_context": "4B 요지",
"original_pointers": {"doc_ids": [999]},
}
def _envelope():
return EscalationEnvelope.from_json(json.dumps(_envelope_raw()))
@pytest.fixture
def _patch_telemetry(monkeypatch):
events: list[dict] = []
async def fake_record(**kwargs):
events.append(kwargs)
monkeypatch.setattr(dsw, "record_analyze_event", fake_record)
return events
@pytest.fixture
def _alert_recorder(monkeypatch):
calls: list[tuple[str, str]] = []
async def fake_alert(title: str, message: str) -> bool:
calls.append((title, message))
return True
monkeypatch.setattr(dsw, "send_alert", fake_alert)
return calls
# ─── A. services.alerts ──────────────────────────────────────────────────────
class _FakeHttpxClient:
"""httpx.AsyncClient 대체 — post 호출 캡처. 실호출 0."""
captured: list[dict] = []
fail_with: Exception | None = None
status_code = 200
def __init__(self, timeout=None):
self.timeout = timeout
async def __aenter__(self):
return self
async def __aexit__(self, *exc):
return False
async def post(self, url, **kwargs):
if _FakeHttpxClient.fail_with is not None:
raise _FakeHttpxClient.fail_with
_FakeHttpxClient.captured.append({"url": url, **kwargs})
return SimpleNamespace(status_code=_FakeHttpxClient.status_code, text="ok")
@pytest.fixture
def _fake_httpx(monkeypatch):
_FakeHttpxClient.captured = []
_FakeHttpxClient.fail_with = None
_FakeHttpxClient.status_code = 200
monkeypatch.setattr(alerts.httpx, "AsyncClient", _FakeHttpxClient)
return _FakeHttpxClient
@pytest.mark.asyncio
async def test_send_alert_noop_without_env(monkeypatch, _fake_httpx):
monkeypatch.delenv("ALERT_WEBHOOK_URL", raising=False)
monkeypatch.setattr(alerts, "_noop_logged", False)
infos: list[str] = []
monkeypatch.setattr(alerts.logger, "info", lambda msg, *a, **k: infos.append(msg))
assert await alerts.send_alert("t", "m") is False
assert await alerts.send_alert("t", "m") is False
assert len(infos) == 1 # 프로세스당 1회만 로그
assert _fake_httpx.captured == [] # webhook 미호출
@pytest.mark.asyncio
async def test_send_alert_synochat_format(monkeypatch, _fake_httpx):
monkeypatch.setenv("ALERT_WEBHOOK_URL", "http://chat.local/webhook")
monkeypatch.delenv("ALERT_WEBHOOK_KIND", raising=False) # 기본 = synochat
assert await alerts.send_alert("제목", "본문 줄1\n줄2") is True
assert len(_fake_httpx.captured) == 1
call = _fake_httpx.captured[0]
assert call["url"] == "http://chat.local/webhook"
payload = json.loads(call["data"]["payload"])
assert payload == {"text": "제목\n본문 줄1\n줄2"}
@pytest.mark.asyncio
async def test_send_alert_ntfy_format(monkeypatch, _fake_httpx):
monkeypatch.setenv("ALERT_WEBHOOK_URL", "http://ntfy.local/ds")
monkeypatch.setenv("ALERT_WEBHOOK_KIND", "ntfy")
assert await alerts.send_alert("한글 제목", "알람 본문") is True
call = _fake_httpx.captured[0]
# 한글 제목은 헤더(latin-1 한정) 대신 query param
assert call["params"] == {"title": "한글 제목"}
assert call["content"] == "알람 본문".encode("utf-8")
@pytest.mark.asyncio
async def test_send_alert_failure_never_raises(monkeypatch, _fake_httpx):
monkeypatch.setenv("ALERT_WEBHOOK_URL", "http://chat.local/webhook")
_fake_httpx.fail_with = httpx.ConnectError("down")
assert await alerts.send_alert("t", "m") is False # raise 없이 False
_fake_httpx.fail_with = None
_fake_httpx.status_code = 500
assert await alerts.send_alert("t", "m") is False # HTTP 5xx 도 False
# ─── B. HOLD 알람 + alerted_at dedupe ───────────────────────────────────────
@pytest.mark.asyncio
async def test_hold_alerts_once_then_dedupes(_alert_recorder):
plan = plan_summarize_units(GIANT_WHOLE_MD)
assert plan.tier == "whole"
row = SimpleNamespace(payload={"envelope": {"x": 1}})
session = FakeSession()
# 1차 HOLD — 알람 발화 + alerted_at 기록
with pytest.raises(StageDeferred):
await dsw._hold_awaiting_split(session, row, plan, 999, doc_title="테스트 문서")
assert len(_alert_recorder) == 1
title, message = _alert_recorder[0]
assert "999" in title
assert "테스트 문서" in message and "whole" in message
assert f"export --doc 999" in message # 유인 분할 힌트
alerted_at = row.payload["presegment"]["alerted_at"]
assert datetime.fromisoformat(alerted_at)
# 2차 재보류(24h 후 재계획 시나리오) — 7일 이내 → 재알람 억제
with pytest.raises(StageDeferred):
await dsw._hold_awaiting_split(session, row, plan, 999, doc_title="테스트 문서")
assert len(_alert_recorder) == 1
assert row.payload["presegment"]["alerted_at"] == alerted_at # 미갱신
# 3차 — alerted_at 이 8일 전이면 재발화 + 갱신
stale = (datetime.now(timezone.utc) - timedelta(days=8)).isoformat()
payload = dict(row.payload)
payload["presegment"] = {**payload["presegment"], "alerted_at": stale}
row.payload = payload
with pytest.raises(StageDeferred):
await dsw._hold_awaiting_split(session, row, plan, 999, doc_title="테스트 문서")
assert len(_alert_recorder) == 2
assert row.payload["presegment"]["alerted_at"] != stale
# ─── C. validate_override_boundaries ────────────────────────────────────────
def test_validate_ok_full_coverage():
text = "" * 40_000 # ≈ 21,160 tok
bounds = [[0, 20_000, "전반"], [20_000, 40_000, "후반"]]
check = validate_override_boundaries(text, bounds)
assert check.ok and check.errors == []
assert check.coverage_pct == 100.0
assert check.boundaries == [(0, 20_000, "전반"), (20_000, 40_000, "후반")]
assert all(t <= CAP_TOKENS for t in check.unit_tokens)
def test_validate_dict_form_and_units_from_boundaries():
text = "" * 10_000
bounds = [{"start": 0, "end": 5_000, "title": "a"}, {"start": 5_000, "end": 10_000, "title": "b"}]
check = validate_override_boundaries(text, bounds)
assert check.ok
units = units_from_boundaries(text, check.boundaries)
assert [u.index for u in units] == [0, 1]
assert units[0].text == text[0:5_000]
assert units[0].section_titles == ["a"]
assert units[0].est_tokens == estimate_tokens(text[0:5_000])
def test_validate_rejects_overlap():
text = "" * 10_000
check = validate_override_boundaries(text, [[0, 6_000, "a"], [5_000, 10_000, "b"]])
assert not check.ok
assert any("중첩" in e for e in check.errors)
def test_validate_rejects_cap_exceed():
text = "" * 40_000 # 단일 유닛 ≈ 21,160 tok > CAP 12,000
check = validate_override_boundaries(text, [[0, 40_000, "통짜"]])
assert not check.ok
assert any("cap" in e and "유닛 0" in e for e in check.errors) # 어느 유닛인지 명시
def test_validate_rejects_low_coverage():
text = "" * 10_000
check = validate_override_boundaries(text, [[0, 1_000, "머리만"]])
assert not check.ok
assert any("커버리지" in e for e in check.errors)
def test_validate_rejects_out_of_range_and_todo():
text = "" * 1_000
check = validate_override_boundaries(text, [[0, 2_000, ""]])
assert not check.ok and any("범위 밖" in e for e in check.errors)
check2 = validate_override_boundaries(
text, [{"start": 0, "end": 1_000, "title": "t", "todo": "분할 필요"}]
)
assert not check2.ok and any("TODO" in e for e in check2.errors)
def test_validate_warns_on_gap_but_passes_coverage():
text = "" * 100_000
# 4% 공백 구간 — 커버리지 96% (>=90) 통과 + 경고
bounds = [[0, 20_000, "a"], [24_000, 100_000, None]]
# 24000~100000 = 76000자 ≈ 40,204 tok > CAP → cap 완화해 gap 경고만 검증
check = validate_override_boundaries(text, bounds, cap=50_000)
assert check.ok
assert any("공백 구간" in w for w in check.warnings)
def test_choose_override_source_prefers_md_content():
assert choose_override_source("# md 본문", "추출본") == ("md_content", "# md 본문")
assert choose_override_source(" \n", "추출본") == ("extracted_text", "추출본")
assert choose_override_source(None, None) == ("extracted_text", "")
def test_leaf_spans_reconstruct_source():
md = "# 절 1\n" + "" * 100 + "\n# 절 2\n" + "" * 100
leaves = extract_leaves(md)
spans = leaf_spans(md, leaves)
assert len(spans) == len(leaves)
for (s, e), leaf in zip(spans, leaves):
assert md[s:e] == leaf.text
# 인접 파티션 — 이어붙이면 원문 전체
assert spans[0][0] == 0 and spans[-1][1] == len(md)
# ─── D. units_override 재개 경로 (worker process 레벨) ──────────────────────
def _override_payload(text: str, n_units: int = 3, source: str = "extracted_text") -> dict:
step = len(text) // n_units
bounds = []
for i in range(n_units):
s = i * step
e = len(text) if i == n_units - 1 else (i + 1) * step
bounds.append([s, e, f"파트 {i + 1}"])
return {
"envelope": _envelope_raw(),
"subject_domain": "generic",
"presegment": {
"tier": "whole", "over_pct": 61.46, "awaiting_split": False,
"units_override": {
"source": source, "source_len": len(text), "boundaries": bounds,
},
},
}
@pytest.mark.asyncio
async def test_units_override_bypasses_tier_gate(monkeypatch, _patch_telemetry, _alert_recorder):
doc = _doc(GIANT_WHOLE_MD) # override 없으면 whole → HOLD 였을 문서
row = SimpleNamespace(payload=_override_payload(GIANT_WHOLE_MD, n_units=3))
session = FakeProcSession(doc, row)
client = FakeClient()
monkeypatch.setattr(dsw, "AIClient", lambda: client)
def _boom(*a, **k):
raise AssertionError("units_override 는 plan_summarize_units(tier 재판정)를 타면 안 됨")
monkeypatch.setattr(dsw, "plan_summarize_units", _boom)
await dsw.process(999, session)
# map 3 + reduce 1, 모든 콜 캡 준수 (오버헤드 = 정책 템플릿+envelope ~3K)
assert len(client.prompts) == 4
for p in client.prompts:
assert estimate_tokens(p) <= CAP_TOKENS + 3_000
assert doc.ai_detail_summary == "최종 상세."
assert doc.ai_analysis_tier == "deep"
preseg = row.payload["presegment"]
assert preseg["tier"] == "override"
assert preseg["over_pct"] == 61.46 # HOLD 당시 실측치 보존
assert len(preseg["map_results"]) == 3
assert preseg["units_override"]["boundaries"][0][2] == "파트 1" # override 보존(감사)
assert _alert_recorder == [] # 정상 재개 — 알람 없음
assert len(_patch_telemetry) == 1
@pytest.mark.asyncio
async def test_bad_override_reholds_and_alerts(monkeypatch, _patch_telemetry, _alert_recorder):
doc = _doc(GIANT_WHOLE_MD)
payload = _override_payload(GIANT_WHOLE_MD, n_units=1) # 단일 유닛 ≈ 31.7K tok > CAP*1.1
row = SimpleNamespace(payload=payload)
session = FakeProcSession(doc, row)
def _no_llm():
raise AssertionError("잘못된 override 는 LLM 콜(900s 재생산)로 흐르면 안 됨")
monkeypatch.setattr(dsw, "AIClient", _no_llm)
with pytest.raises(StageDeferred) as ei:
await dsw.process(999, session)
assert ei.value.retry_after_minutes == dsw.HOLD_RETRY_MINUTES
preseg = row.payload["presegment"]
assert preseg["awaiting_split"] is True # 재-HOLD
assert "cap" in preseg["override_rejected"]
assert "units_override" in preseg # 원인 조사용 보존
assert len(_alert_recorder) == 1 # 거부 알람
assert "거부" in _alert_recorder[0][0]
assert doc.ai_detail_summary is None
assert _patch_telemetry == []
@pytest.mark.asyncio
async def test_override_source_len_mismatch_reholds(monkeypatch, _alert_recorder):
doc = _doc(GIANT_WHOLE_MD)
payload = _override_payload(GIANT_WHOLE_MD, n_units=3)
payload["presegment"]["units_override"]["source_len"] = 12_345 # 본문 재생성 시나리오
row = SimpleNamespace(payload=payload)
session = FakeProcSession(doc, row)
monkeypatch.setattr(dsw, "AIClient", lambda: (_ for _ in ()).throw(AssertionError("no LLM")))
with pytest.raises(StageDeferred):
await dsw.process(999, session)
assert row.payload["presegment"]["awaiting_split"] is True
assert "source_len 불일치" in row.payload["presegment"]["override_rejected"]
assert len(_alert_recorder) == 1
@pytest.mark.asyncio
async def test_no_override_small_doc_keeps_single_call_path(
monkeypatch, _patch_telemetry, _alert_recorder
):
"""무회귀 — units_override 없는 소형 문서는 기존 단일콜 경로 그대로."""
doc = _doc(SMALL_MD)
row = SimpleNamespace(payload={"envelope": _envelope_raw(), "subject_domain": "generic"})
session = FakeProcSession(doc, row)
client = FakeClient()
monkeypatch.setattr(dsw, "AIClient", lambda: client)
await dsw.process(999, session)
assert len(client.prompts) == 1 # 단일콜 (map-reduce 아님)
assert SMALL_MD[:200].split("\n")[1][:50] in client.prompts[0] # 원문 슬라이스 포함
assert doc.ai_detail_summary == "유닛 상세."
assert "presegment" not in row.payload # payload 무변경
assert _alert_recorder == []
@pytest.mark.asyncio
async def test_no_override_whole_doc_still_holds_with_alert(
monkeypatch, _patch_telemetry, _alert_recorder
):
"""override 미주입 whole 문서 — 기존 HOLD 시멘틱 유지 + PR3 알람만 추가."""
doc = _doc(GIANT_WHOLE_MD)
row = SimpleNamespace(payload={"envelope": _envelope_raw(), "subject_domain": "generic"})
session = FakeProcSession(doc, row)
monkeypatch.setattr(dsw, "AIClient", lambda: (_ for _ in ()).throw(AssertionError("no LLM")))
with pytest.raises(StageDeferred):
await dsw.process(999, session)
preseg = row.payload["presegment"]
assert preseg["awaiting_split"] is True and preseg["tier"] == "whole"
assert len(_alert_recorder) == 1
assert "HOLD" in _alert_recorder[0][0]