Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| a55bb3453d | |||
| 9061f2e25c | |||
| 33427d4a42 |
@@ -0,0 +1,69 @@
|
||||
"""운영 알람 webhook (presegment PR3 — HOLD 유인 전환 게이트).
|
||||
|
||||
deep_summary HOLD(awaiting_split) 처럼 "사람이 개입해야 풀리는" 상태를 웹훅으로 발화한다.
|
||||
환경변수:
|
||||
ALERT_WEBHOOK_URL — 미설정 = no-op (프로세스당 1회 INFO 로그만).
|
||||
ALERT_WEBHOOK_KIND — 'synochat' | 'ntfy' (기본 synochat).
|
||||
synochat: Synology Chat incoming webhook — POST form `payload={"text": "..."}`.
|
||||
ntfy: POST body=message. 제목은 query param(?title=) — HTTP 헤더는 latin-1
|
||||
한정이라 한글 제목이 깨진다 (ntfy 는 query param title 을 공식 지원).
|
||||
|
||||
불변식: 알람은 절대 raise 하지 않는다 — 실패는 WARNING 로그만. 알람이 워커를
|
||||
죽이면 본전(요약 파이프라인)이 무너진다. 호출부는 반환값(bool)에 의존하지 말 것.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import os
|
||||
|
||||
import httpx
|
||||
|
||||
from core.utils import setup_logger
|
||||
|
||||
logger = setup_logger("alerts")
|
||||
|
||||
ALERT_TIMEOUT_SECONDS = 5.0
|
||||
|
||||
# 프로세스당 1회만 "미설정 no-op" 로그 — 매 HOLD 마다 로그 오염 방지.
|
||||
_noop_logged = False
|
||||
|
||||
|
||||
async def send_alert(title: str, message: str) -> bool:
|
||||
"""webhook 으로 알람 1건 발화. 성공 True / no-op·실패 False. 절대 raise 금지."""
|
||||
global _noop_logged
|
||||
url = (os.getenv("ALERT_WEBHOOK_URL") or "").strip()
|
||||
if not url:
|
||||
if not _noop_logged:
|
||||
logger.info("ALERT_WEBHOOK_URL 미설정 — 알람 no-op (이 로그는 프로세스당 1회)")
|
||||
_noop_logged = True
|
||||
return False
|
||||
|
||||
kind = (os.getenv("ALERT_WEBHOOK_KIND") or "synochat").strip().lower()
|
||||
if kind not in ("synochat", "ntfy"):
|
||||
logger.warning(f"ALERT_WEBHOOK_KIND={kind!r} 미지원 — synochat 으로 폴백")
|
||||
kind = "synochat"
|
||||
|
||||
try:
|
||||
async with httpx.AsyncClient(timeout=ALERT_TIMEOUT_SECONDS) as client:
|
||||
if kind == "ntfy":
|
||||
resp = await client.post(
|
||||
url,
|
||||
params={"title": title},
|
||||
content=message.encode("utf-8"),
|
||||
)
|
||||
else: # synochat
|
||||
text = f"{title}\n{message}" if title else message
|
||||
resp = await client.post(
|
||||
url,
|
||||
data={"payload": json.dumps({"text": text}, ensure_ascii=False)},
|
||||
)
|
||||
if resp.status_code >= 400:
|
||||
logger.warning(
|
||||
f"알람 webhook({kind}) HTTP {resp.status_code}: {resp.text[:200]}"
|
||||
)
|
||||
return False
|
||||
return True
|
||||
except Exception as exc: # noqa: BLE001 — 알람 실패가 워커를 죽이면 안 됨
|
||||
logger.warning(f"알람 webhook({kind}) 발화 실패: {exc}")
|
||||
return False
|
||||
@@ -66,6 +66,9 @@ class SummarizeUnit:
|
||||
text: str = ""
|
||||
est_tokens: int = 0
|
||||
over_cap: bool = False # 단독 섹션이 CAP 초과 (hybrid 시 클로드 대상)
|
||||
# PR3: 이 유닛을 구성한 leaf 의 서수(extract_leaves 순서) — export CLI 가
|
||||
# leaf_spans 와 결합해 유닛 (start,end) 스팬을 계산한다. 페이로드 미기록.
|
||||
leaf_indexes: list[int] = field(default_factory=list)
|
||||
|
||||
|
||||
@dataclass
|
||||
@@ -92,20 +95,22 @@ def greedy_pack(leaves: list[HierNode], cap: int = CAP_TOKENS) -> list[Summarize
|
||||
units: list[SummarizeUnit] = []
|
||||
cur_titles: list[str | None] = []
|
||||
cur_texts: list[str] = []
|
||||
cur_indexes: list[int] = []
|
||||
cur_tokens = 0
|
||||
|
||||
def _flush() -> None:
|
||||
nonlocal cur_titles, cur_texts, cur_tokens
|
||||
nonlocal cur_titles, cur_texts, cur_indexes, cur_tokens
|
||||
if cur_texts:
|
||||
units.append(SummarizeUnit(
|
||||
index=len(units),
|
||||
section_titles=cur_titles,
|
||||
text="\n\n".join(cur_texts),
|
||||
est_tokens=cur_tokens,
|
||||
leaf_indexes=cur_indexes,
|
||||
))
|
||||
cur_titles, cur_texts, cur_tokens = [], [], 0
|
||||
cur_titles, cur_texts, cur_indexes, cur_tokens = [], [], [], 0
|
||||
|
||||
for leaf in leaves:
|
||||
for li, leaf in enumerate(leaves):
|
||||
t = estimate_tokens(leaf.text)
|
||||
if t > cap:
|
||||
_flush()
|
||||
@@ -115,12 +120,14 @@ def greedy_pack(leaves: list[HierNode], cap: int = CAP_TOKENS) -> list[Summarize
|
||||
text=leaf.text,
|
||||
est_tokens=t,
|
||||
over_cap=True,
|
||||
leaf_indexes=[li],
|
||||
))
|
||||
continue
|
||||
if cur_tokens + t > cap:
|
||||
_flush()
|
||||
cur_titles.append(leaf.section_title)
|
||||
cur_texts.append(leaf.text)
|
||||
cur_indexes.append(li)
|
||||
cur_tokens += t
|
||||
_flush()
|
||||
return units
|
||||
@@ -222,3 +229,186 @@ def build_reduce_units_block(
|
||||
block = block[: max(1, int(budget_tokens / KO_TOK_PER_CHAR))]
|
||||
truncated = True
|
||||
return block, truncated
|
||||
|
||||
|
||||
# ─── PR3 — 유인 분할(units_override) 경계 순수함수 (worker + attended CLI 공용) ───
|
||||
#
|
||||
# HOLD(hybrid/whole) 문서를 사람이(유인 클로드 세션) 분할한 경계
|
||||
# [(start, end, title)] 로 재개하는 경로. 오프셋 = 소스 텍스트의 Python 문자
|
||||
# (code point) 인덱스 — export 가 덤프한 파일과 apply/워커의 슬라이스가 같은
|
||||
# 기준을 공유해야 한다 (builder 의 char_start 는 UTF-16 단위라 여기서 미사용).
|
||||
|
||||
OVERRIDE_MIN_COVERAGE_PCT = 90.0 # apply 게이트 — 전체 본문의 90%+ 커버 필수
|
||||
OVERRIDE_GAP_WARN_CHARS = 1_000 # 이보다 큰 공백 구간은 경고로 노출
|
||||
|
||||
|
||||
@dataclass
|
||||
class OverrideCheck:
|
||||
"""validate_override_boundaries 결과 — ok=False 면 errors 에 사유."""
|
||||
ok: bool
|
||||
errors: list[str] = field(default_factory=list)
|
||||
warnings: list[str] = field(default_factory=list)
|
||||
coverage_pct: float = 0.0
|
||||
# 정규화된 (start, end, title) — units_from_boundaries 입력으로 그대로 사용
|
||||
boundaries: list[tuple[int, int, str | None]] = field(default_factory=list)
|
||||
unit_tokens: list[int] = field(default_factory=list)
|
||||
|
||||
|
||||
def choose_override_source(
|
||||
md_content: str | None, extracted_text: str | None
|
||||
) -> tuple[str, str]:
|
||||
"""units_override 오프셋 기준 텍스트 선택 — (source_name, text).
|
||||
|
||||
canonical markdown(md_content) 우선, 부재/공백 시 extracted_text 폴백.
|
||||
export CLI · apply CLI · 워커 재개가 반드시 같은 규칙을 공유해야
|
||||
(start,end) 오프셋이 일치한다. 선택 결과는 units_override.source 에 박제.
|
||||
"""
|
||||
if md_content and md_content.strip():
|
||||
return "md_content", md_content
|
||||
return "extracted_text", extracted_text or ""
|
||||
|
||||
|
||||
def _normalize_boundary(entry, idx: int, errors: list[str]) -> tuple[int, int, str | None] | None:
|
||||
"""boundaries 1건 정규화 — [start,end,title?] 배열 또는 {start,end,title} 객체.
|
||||
|
||||
export 템플릿의 초과 스팬은 "todo" 키를 달고 나온다 — 미해결(todo 잔존) 상태로
|
||||
apply 하면 여기서 에러 (사람이 CAP 이하 경계로 분할을 완성해야 통과).
|
||||
"""
|
||||
if isinstance(entry, dict):
|
||||
if entry.get("todo"):
|
||||
errors.append(
|
||||
f"유닛 {idx}: TODO 미해결 — 초과 스팬을 CAP 이하 경계들로 분할한 뒤 todo 키를 제거해야 함"
|
||||
)
|
||||
return None
|
||||
start, end, title = entry.get("start"), entry.get("end"), entry.get("title")
|
||||
elif isinstance(entry, (list, tuple)) and 2 <= len(entry) <= 3:
|
||||
start, end = entry[0], entry[1]
|
||||
title = entry[2] if len(entry) == 3 else None
|
||||
else:
|
||||
errors.append(f"유닛 {idx}: 형식 오류 — [start, end, title] 또는 {{start, end, title}} 여야 함")
|
||||
return None
|
||||
if (
|
||||
isinstance(start, bool) or isinstance(end, bool)
|
||||
or not isinstance(start, int) or not isinstance(end, int)
|
||||
):
|
||||
errors.append(f"유닛 {idx}: start/end 는 정수여야 함 (start={start!r}, end={end!r})")
|
||||
return None
|
||||
if title is not None and not isinstance(title, str):
|
||||
title = str(title)
|
||||
return start, end, title
|
||||
|
||||
|
||||
def validate_override_boundaries(
|
||||
text: str,
|
||||
raw_boundaries,
|
||||
*,
|
||||
cap: int = CAP_TOKENS,
|
||||
min_coverage_pct: float = OVERRIDE_MIN_COVERAGE_PCT,
|
||||
gap_warn_chars: int = OVERRIDE_GAP_WARN_CHARS,
|
||||
) -> OverrideCheck:
|
||||
"""units_override 경계 검증 — 단조증가·비중첩·본문 범위 내·커버리지·유닛별 캡.
|
||||
|
||||
apply CLI = 기본 게이트(cap=CAP_TOKENS, coverage>=90%).
|
||||
워커 방어 = cap 슬랙(CAP*1.1)·coverage 0 으로 완화 호출 — 잘못된 override 가
|
||||
900s 콜을 재생산하는 것만 차단하고, 품질 게이트는 apply 시점에 이미 통과했다고 본다.
|
||||
"""
|
||||
errors: list[str] = []
|
||||
warnings: list[str] = []
|
||||
|
||||
if not isinstance(raw_boundaries, (list, tuple)) or not raw_boundaries:
|
||||
return OverrideCheck(ok=False, errors=["boundaries 가 비어있거나 리스트가 아님"])
|
||||
|
||||
normalized: list[tuple[int, int, str | None]] = []
|
||||
for i, entry in enumerate(raw_boundaries):
|
||||
norm = _normalize_boundary(entry, i, errors)
|
||||
if norm is not None:
|
||||
normalized.append(norm)
|
||||
if errors:
|
||||
return OverrideCheck(ok=False, errors=errors, warnings=warnings)
|
||||
|
||||
n = len(text)
|
||||
prev_end = None
|
||||
unit_tokens: list[int] = []
|
||||
covered = 0
|
||||
for i, (start, end, title) in enumerate(normalized):
|
||||
label = f"유닛 {i}" + (f" ({title})" if title else "")
|
||||
if start < 0 or end > n:
|
||||
errors.append(f"{label}: 본문 범위 밖 — [{start}, {end}) vs len={n}")
|
||||
continue
|
||||
if start >= end:
|
||||
errors.append(f"{label}: start >= end ([{start}, {end}))")
|
||||
continue
|
||||
if prev_end is not None and start < prev_end:
|
||||
errors.append(f"{label}: 직전 유닛과 중첩/역순 — start={start} < 직전 end={prev_end}")
|
||||
if prev_end is not None and start - prev_end > gap_warn_chars:
|
||||
warnings.append(f"{label} 앞 공백 구간 {start - prev_end:,}자 ([{prev_end}, {start})) — 의도 확인")
|
||||
est = estimate_tokens(text[start:end])
|
||||
unit_tokens.append(est)
|
||||
if est > cap:
|
||||
errors.append(f"{label}: 추정 {est:,} tok > cap {cap:,} — 이 스팬을 더 분할해야 함")
|
||||
covered += end - start
|
||||
prev_end = max(prev_end or 0, end)
|
||||
|
||||
if not errors:
|
||||
head_gap = normalized[0][0]
|
||||
tail_gap = n - normalized[-1][1]
|
||||
if head_gap > gap_warn_chars:
|
||||
warnings.append(f"문서 선두 공백 구간 {head_gap:,}자 ([0, {normalized[0][0]})) — 의도 확인")
|
||||
if tail_gap > gap_warn_chars:
|
||||
warnings.append(f"문서 말미 공백 구간 {tail_gap:,}자 ([{normalized[-1][1]}, {n})) — 의도 확인")
|
||||
|
||||
coverage_pct = round(covered * 100.0 / n, 2) if n else 0.0
|
||||
if not errors and coverage_pct < min_coverage_pct:
|
||||
errors.append(
|
||||
f"커버리지 {coverage_pct}% < {min_coverage_pct}% — 경계가 본문 대부분을 덮어야 함"
|
||||
)
|
||||
|
||||
return OverrideCheck(
|
||||
ok=not errors,
|
||||
errors=errors,
|
||||
warnings=warnings,
|
||||
coverage_pct=coverage_pct,
|
||||
boundaries=normalized if not errors else [],
|
||||
unit_tokens=unit_tokens,
|
||||
)
|
||||
|
||||
|
||||
def units_from_boundaries(
|
||||
text: str, boundaries: list[tuple[int, int, str | None]]
|
||||
) -> list[SummarizeUnit]:
|
||||
"""정규화·검증 통과한 (start,end,title) 리스트 → SummarizeUnit 리스트.
|
||||
|
||||
유닛 index = 경계 서수 — boundaries 는 payload 에 박제되므로 attempt 간 안정
|
||||
(map_results 멱등 재개 키와 정합).
|
||||
"""
|
||||
units: list[SummarizeUnit] = []
|
||||
for i, (start, end, title) in enumerate(boundaries):
|
||||
seg = text[start:end]
|
||||
units.append(SummarizeUnit(
|
||||
index=i,
|
||||
section_titles=[title],
|
||||
text=seg,
|
||||
est_tokens=estimate_tokens(seg),
|
||||
))
|
||||
return units
|
||||
|
||||
|
||||
def leaf_spans(text: str, leaves: list[HierNode]) -> list[tuple[int, int]]:
|
||||
"""extract_leaves 결과 leaf 들의 원문 (start,end) 문자 스팬.
|
||||
|
||||
_segment 가 원문을 연속 파티션(빈 preamble 만 폐기)으로 자르므로, 커서 순차
|
||||
탐색이 항상 정확한 위치를 찾는다 (동일 본문 반복이 있어도 순서가 앞선 leaf 가
|
||||
앞 오프셋을 가져간다).
|
||||
"""
|
||||
spans: list[tuple[int, int]] = []
|
||||
cursor = 0
|
||||
for leaf in leaves:
|
||||
pos = text.find(leaf.text, cursor)
|
||||
if pos < 0:
|
||||
# 이론상 불가(연속 파티션) — 방어적으로 전체 재탐색
|
||||
pos = text.find(leaf.text)
|
||||
if pos < 0:
|
||||
raise ValueError(f"leaf 본문을 원문에서 찾지 못함 (title={leaf.section_title!r})")
|
||||
spans.append((pos, pos + len(leaf.text)))
|
||||
cursor = pos + len(leaf.text)
|
||||
return spans
|
||||
|
||||
@@ -14,7 +14,7 @@ import asyncio
|
||||
import json
|
||||
import os
|
||||
import time
|
||||
from datetime import datetime, timezone
|
||||
from datetime import datetime, timedelta, timezone
|
||||
|
||||
from pydantic import BaseModel, Field, ValidationError
|
||||
from sqlalchemy import desc, select
|
||||
@@ -29,15 +29,19 @@ from core.utils import setup_logger
|
||||
from models.document import Document
|
||||
from models.queue import ProcessingQueue, StageDeferred
|
||||
from policy.prompt_render import render_26b, policy_version as compute_policy_version
|
||||
from services.alerts import send_alert
|
||||
from services.document_telemetry import record_analyze_event
|
||||
from services.search.llm_gate import Priority, acquire_mlx_gate
|
||||
from services.summarize_units import (
|
||||
CAP_TOKENS,
|
||||
UnitPlan,
|
||||
build_reduce_units_block,
|
||||
choose_override_source,
|
||||
estimate_tokens,
|
||||
plan_summarize_units,
|
||||
render_map_slice,
|
||||
units_from_boundaries,
|
||||
validate_override_boundaries,
|
||||
)
|
||||
|
||||
logger = setup_logger("deep_summary_worker")
|
||||
@@ -45,9 +49,16 @@ logger = setup_logger("deep_summary_worker")
|
||||
DEEP_SUMMARY_TASK = "p3c_deep_summary"
|
||||
# presegment PR2 (plan ds-presegment-mapreduce-2) — 거대문서 map-reduce
|
||||
REDUCE_TASK = "p3c_deep_summary_reduce"
|
||||
# HYBRID/TIER2(클로드 유인 분할 필요) HOLD 재확인 간격. PR3(알람·경계 주입) 전까지는
|
||||
# 이 간격으로 재계획만 반복한다 — attempts 미소모(StageDeferred)라 영구 failed 없음.
|
||||
# HYBRID/TIER2(클로드 유인 분할 필요) HOLD 재확인 간격 — attempts 미소모(StageDeferred)라
|
||||
# 영구 failed 없음. PR3: HOLD 시 웹훅 알람 + units_override 주입 시 즉시 재개.
|
||||
HOLD_RETRY_MINUTES = int(os.getenv("DEEP_SUMMARY_HOLD_RETRY_MINUTES", "1440"))
|
||||
# HOLD 알람 dedupe — payload.presegment.alerted_at 이 이 일수 이내면 재발화 억제
|
||||
# (매 24h 재보류마다 재알람 방지). apply CLI 가 override 기록 시 alerted_at 을 지워
|
||||
# 다음 이벤트(예: override 거부)는 신선하게 발화된다.
|
||||
ALERT_DEDUPE_DAYS = 7
|
||||
# units_override 방어 캡 슬랙 — apply 게이트(CAP)보다 10% 여유. 초과 유닛은 실패
|
||||
# 대신 재-HOLD + 알람 (잘못된 override 가 900s 콜을 재생산하는 것 차단).
|
||||
OVERRIDE_CAP_SLACK = 1.1
|
||||
# reduce 프롬프트 오버헤드가 비정상적으로 커도 유닛 블록 예산은 이 밑으로 안 내려감(방어).
|
||||
REDUCE_BUDGET_FLOOR_TOKENS = 1_000
|
||||
|
||||
@@ -111,6 +122,17 @@ async def process(
|
||||
|
||||
envelope = EscalationEnvelope.from_json(json.dumps(envelope_raw))
|
||||
|
||||
# ─── presegment PR3 — units_override 재개 경로 (유인 분할 경계 주입) ───
|
||||
# apply CLI(scripts/presegment_attended.py) 가 payload.presegment.units_override 를
|
||||
# 기록한 문서는 tier 재판정·HOLD 없이 그 경계로 유닛을 구성해 기존 PR2
|
||||
# map-reduce 경로를 그대로 탄다. override 없는 문서는 아래 기존 경로와 바이트 동일.
|
||||
if (payload.get("presegment") or {}).get("units_override"):
|
||||
await _process_units_override(
|
||||
doc, queue_row, envelope, subject_domain, session,
|
||||
defer_on_deep_unavailable=defer_on_deep_unavailable,
|
||||
)
|
||||
return
|
||||
|
||||
# ─── presegment PR2 게이트 (plan ds-presegment-mapreduce-2) ───
|
||||
# TRIGGER(25K tok) 이하 = 아래 기존 단일콜 경로 그대로(무회귀). 초과 시 3-way:
|
||||
# auto(over%==0) → 로컬 map-reduce (유닛별 26B → reduce)
|
||||
@@ -123,7 +145,9 @@ async def process(
|
||||
# units 빈 auto 는 이론상 불가(비어있지 않은 텍스트 = leaf >= 1)지만, 빈 reduce
|
||||
# 단일콜(환각 위험)로 흐르지 않게 방어적으로 HOLD 로 보낸다.
|
||||
if unit_plan.tier != "auto" or not unit_plan.units:
|
||||
await _hold_awaiting_split(session, queue_row, unit_plan, document_id)
|
||||
await _hold_awaiting_split(
|
||||
session, queue_row, unit_plan, document_id, doc_title=doc.title
|
||||
)
|
||||
await _process_map_reduce(
|
||||
doc, queue_row, envelope, subject_domain, unit_plan, session,
|
||||
defer_on_deep_unavailable=defer_on_deep_unavailable,
|
||||
@@ -250,43 +274,196 @@ async def process(
|
||||
)
|
||||
|
||||
|
||||
def _hold_alert_due(preseg: dict, now: datetime) -> bool:
|
||||
"""HOLD 알람 dedupe — alerted_at 이 없거나 ALERT_DEDUPE_DAYS 초과 시에만 발화."""
|
||||
ts = preseg.get("alerted_at")
|
||||
if not ts:
|
||||
return True
|
||||
try:
|
||||
prev = datetime.fromisoformat(str(ts))
|
||||
except ValueError:
|
||||
return True # 깨진 타임스탬프 = 기록 신뢰 불가 → 발화하고 재기록
|
||||
if prev.tzinfo is None:
|
||||
prev = prev.replace(tzinfo=timezone.utc)
|
||||
return (now - prev) >= timedelta(days=ALERT_DEDUPE_DAYS)
|
||||
|
||||
|
||||
async def _hold_awaiting_split(
|
||||
session: AsyncSession, queue_row: ProcessingQueue, plan: UnitPlan, document_id: int
|
||||
session: AsyncSession,
|
||||
queue_row: ProcessingQueue,
|
||||
plan: UnitPlan,
|
||||
document_id: int,
|
||||
doc_title: str | None = None,
|
||||
) -> None:
|
||||
"""HYBRID/TIER2 — 클로드 유인 분할 대기(HOLD). 맥미니 미전송, StageDeferred 보류.
|
||||
|
||||
payload.presegment.awaiting_split 마킹을 먼저 commit — StageDeferred 핸들러
|
||||
(queue_consumer)는 새 세션에서 행을 다시 읽어 deferred_until 만 병합하므로 유실 없음.
|
||||
알람(ntfy)·클로드 경계 주입은 PR3 — 그 전까지는 HOLD_RETRY_MINUTES 간격 재계획만 반복.
|
||||
PR3: 유인 전환 게이트 웹훅 알람 발화(alerted_at dedupe — 매 24h 재보류마다 재알람 방지).
|
||||
무인 자동 cloud 호출 금지 룰 준수(클로드 경로는 항상 유인 게이트).
|
||||
"""
|
||||
payload = dict(queue_row.payload or {})
|
||||
preseg = dict(payload.get("presegment") or {})
|
||||
now = datetime.now(timezone.utc)
|
||||
alert_due = _hold_alert_due(preseg, now)
|
||||
oversized = [
|
||||
(u.section_titles[0] if u.section_titles else None)
|
||||
for u in plan.units if u.over_cap
|
||||
][:20]
|
||||
preseg.update({
|
||||
"awaiting_split": True,
|
||||
"tier": plan.tier,
|
||||
"over_pct": plan.over_pct,
|
||||
"total_est_tokens": plan.total_est_tokens,
|
||||
"units": len(plan.units),
|
||||
# 클로드가 분할해야 할 초과 섹션 표본 (PR3 알람 본문용)
|
||||
"oversized_sections": [
|
||||
(u.section_titles[0] if u.section_titles else None)
|
||||
for u in plan.units if u.over_cap
|
||||
][:20],
|
||||
# 클로드가 분할해야 할 초과 섹션 표본 (알람 본문 + export CLI 안내용)
|
||||
"oversized_sections": oversized,
|
||||
})
|
||||
if alert_due:
|
||||
preseg["alerted_at"] = now.isoformat()
|
||||
payload["presegment"] = preseg
|
||||
queue_row.payload = payload # 재할당 = JSONB 변경 감지
|
||||
await session.commit()
|
||||
logger.info(
|
||||
f"[deep] id={document_id} awaiting_split tier={plan.tier} over_pct={plan.over_pct} "
|
||||
f"total_est_tokens={plan.total_est_tokens} units={len(plan.units)} "
|
||||
f"→ HOLD ({HOLD_RETRY_MINUTES}분 후 재확인, 클로드 분할=PR3 유인)"
|
||||
f"→ HOLD ({HOLD_RETRY_MINUTES}분 후 재확인, alert={'발화' if alert_due else 'dedupe'})"
|
||||
)
|
||||
if alert_due:
|
||||
# commit 이후 발화 — 알람이 5s 행에 걸려도 payload 마킹은 이미 영속.
|
||||
resume_at = now + timedelta(minutes=HOLD_RETRY_MINUTES)
|
||||
top3 = ", ".join(t for t in oversized[:3] if t) or "(제목 없음)"
|
||||
await send_alert(
|
||||
f"[DS] deep_summary HOLD — doc {document_id} 유인 분할 필요",
|
||||
(
|
||||
f"문서: {doc_title or '(제목 없음)'} (id={document_id})\n"
|
||||
f"tier={plan.tier} / over%={plan.over_pct} / "
|
||||
f"total_est_tokens={plan.total_est_tokens:,} / units={len(plan.units)}\n"
|
||||
f"초과 섹션(상위 3): {top3}\n"
|
||||
f"재개 예정(UTC): {resume_at.isoformat(timespec='minutes')}\n"
|
||||
f"유인 분할: scripts/presegment_attended.py export --doc {document_id}"
|
||||
),
|
||||
)
|
||||
raise StageDeferred(
|
||||
f"awaiting_split:{plan.tier}", retry_after_minutes=HOLD_RETRY_MINUTES
|
||||
)
|
||||
|
||||
|
||||
async def _rehold_bad_override(
|
||||
session: AsyncSession, queue_row: ProcessingQueue, doc: Document, reason: str
|
||||
) -> None:
|
||||
"""잘못된 units_override — 실패 대신 재-HOLD + 알람 (900s 콜 재생산 차단).
|
||||
|
||||
units_override 는 payload 에 보존(사람이 원인 조사) + override_rejected 사유 기록.
|
||||
apply CLI 가 alerted_at 을 지워두므로 첫 거부는 즉시 발화되고, 이후 24h 재보류
|
||||
루프는 ALERT_DEDUPE_DAYS dedupe 로 억제된다.
|
||||
"""
|
||||
document_id = doc.id
|
||||
payload = dict(queue_row.payload or {})
|
||||
preseg = dict(payload.get("presegment") or {})
|
||||
now = datetime.now(timezone.utc)
|
||||
alert_due = _hold_alert_due(preseg, now)
|
||||
preseg.update({
|
||||
"awaiting_split": True,
|
||||
"override_rejected": reason,
|
||||
"override_rejected_at": now.isoformat(),
|
||||
})
|
||||
if alert_due:
|
||||
preseg["alerted_at"] = now.isoformat()
|
||||
payload["presegment"] = preseg
|
||||
queue_row.payload = payload # 재할당 = JSONB 변경 감지
|
||||
await session.commit()
|
||||
logger.warning(f"[deep] id={document_id} units_override 거부 → 재-HOLD: {reason}")
|
||||
if alert_due:
|
||||
resume_at = now + timedelta(minutes=HOLD_RETRY_MINUTES)
|
||||
await send_alert(
|
||||
f"[DS] deep_summary 유인 분할 경계 거부 — doc {document_id}",
|
||||
(
|
||||
f"문서: {doc.title or '(제목 없음)'} (id={document_id})\n"
|
||||
f"사유: {reason}\n"
|
||||
f"재개 예정(UTC): {resume_at.isoformat(timespec='minutes')}\n"
|
||||
f"수정: scripts/presegment_attended.py export --doc {document_id} "
|
||||
f"→ apply --doc {document_id} --boundaries FILE"
|
||||
),
|
||||
)
|
||||
raise StageDeferred(
|
||||
f"override_rejected:{reason[:80]}", retry_after_minutes=HOLD_RETRY_MINUTES
|
||||
)
|
||||
|
||||
|
||||
async def _process_units_override(
|
||||
doc: Document,
|
||||
queue_row: ProcessingQueue,
|
||||
envelope: EscalationEnvelope,
|
||||
subject_domain: str,
|
||||
session: AsyncSession,
|
||||
*,
|
||||
defer_on_deep_unavailable: bool,
|
||||
) -> None:
|
||||
"""PR3 — apply CLI 가 기록한 유인 분할 경계로 map-reduce 재개.
|
||||
|
||||
경계 = units_override.source 텍스트의 (start, end) 문자 오프셋. 방어 검증
|
||||
(source_len 일치·단조·비중첩·범위·유닛 캡*1.1) 실패 시 재-HOLD + 알람 —
|
||||
apply 이후 본문이 재생성됐거나 수기 주입이 깨진 경우 900s 콜로 흐르지 않는다.
|
||||
통과 시 기존 PR2 _process_map_reduce 를 그대로 탄다(맵 결과 유닛 단위 commit·
|
||||
reduce·ai_detail_summary 기록 — 유닛 index 는 payload 박제 경계 서수라 안정).
|
||||
"""
|
||||
document_id = doc.id
|
||||
preseg = dict((queue_row.payload or {}).get("presegment") or {})
|
||||
override = preseg.get("units_override")
|
||||
if isinstance(override, (list, tuple)):
|
||||
# 수기 주입 호환 — bare [(start,end,title)] 리스트도 허용
|
||||
override = {"boundaries": list(override)}
|
||||
if not isinstance(override, dict):
|
||||
await _rehold_bad_override(
|
||||
session, queue_row, doc, f"units_override 형식 오류 (type={type(override).__name__})"
|
||||
)
|
||||
|
||||
source = override.get("source")
|
||||
if source is None:
|
||||
source, text_src = choose_override_source(doc.md_content, doc.extracted_text)
|
||||
elif source in ("md_content", "extracted_text"):
|
||||
text_src = (doc.md_content if source == "md_content" else doc.extracted_text) or ""
|
||||
else:
|
||||
await _rehold_bad_override(
|
||||
session, queue_row, doc, f"units_override.source={source!r} 미지원"
|
||||
)
|
||||
|
||||
expected_len = override.get("source_len")
|
||||
if expected_len is not None and expected_len != len(text_src):
|
||||
await _rehold_bad_override(
|
||||
session, queue_row, doc,
|
||||
f"source_len 불일치 — override={expected_len:,} vs 현재 {source}={len(text_src):,}"
|
||||
" (본문 재생성됨 — export 부터 재실행)",
|
||||
)
|
||||
|
||||
check = validate_override_boundaries(
|
||||
text_src,
|
||||
override.get("boundaries") or [],
|
||||
cap=int(CAP_TOKENS * OVERRIDE_CAP_SLACK),
|
||||
min_coverage_pct=0.0, # 커버리지 품질 게이트는 apply CLI 가 이미 통과시킴
|
||||
)
|
||||
if not check.ok:
|
||||
await _rehold_bad_override(session, queue_row, doc, "; ".join(check.errors[:5]))
|
||||
|
||||
units = units_from_boundaries(text_src, check.boundaries)
|
||||
plan = UnitPlan(
|
||||
mode="map_reduce",
|
||||
tier="override",
|
||||
total_est_tokens=estimate_tokens(text_src),
|
||||
over_pct=float(preseg.get("over_pct") or 0.0),
|
||||
units=units,
|
||||
)
|
||||
logger.info(
|
||||
f"[deep] id={document_id} units_override 재개 — source={source} units={len(units)} "
|
||||
f"coverage={check.coverage_pct}% max_unit_tokens={max(check.unit_tokens, default=0)}"
|
||||
)
|
||||
await _process_map_reduce(
|
||||
doc, queue_row, envelope, subject_domain, plan, session,
|
||||
defer_on_deep_unavailable=defer_on_deep_unavailable,
|
||||
)
|
||||
|
||||
|
||||
async def _call_26b(
|
||||
client: AIClient, prompt: str, *, defer_on_deep_unavailable: bool, document_id: int
|
||||
):
|
||||
|
||||
@@ -0,0 +1,456 @@
|
||||
"""presegment PR3 — HOLD 거대문서 유인 분할 CLI (plan ds-presegment-mapreduce-2).
|
||||
|
||||
deep_summary 워커가 HOLD(payload.presegment.awaiting_split=true) 로 보류한
|
||||
hybrid/whole tier 거대문서를, 사람이(유인 클로드 세션) 경계를 완성해 재개시키는 도구.
|
||||
|
||||
사용법 (fastapi 컨테이너 안에서 실행):
|
||||
docker compose exec fastapi python /app/scripts/presegment_attended.py list
|
||||
docker compose exec fastapi python /app/scripts/presegment_attended.py export --doc 44443 --out /app/logs/preseg_44443
|
||||
docker compose exec fastapi python /app/scripts/presegment_attended.py apply --doc 44443 --boundaries /app/logs/preseg_44443/boundaries_template.json --dry-run
|
||||
docker compose exec fastapi python /app/scripts/presegment_attended.py apply --doc 44443 --boundaries /app/logs/preseg_44443/boundaries_template.json
|
||||
|
||||
워크플로우:
|
||||
1. list — awaiting_split 문서 확인.
|
||||
2. export — 문서 통계·hier 개요·자동 pack 유닛 제안·초과 섹션 본문 덤프·boundaries
|
||||
템플릿 JSON 생성. 유인 클로드 세션은 이 파일들만 읽고 TODO 스팬을
|
||||
CAP 이하 경계들로 분할해 템플릿을 완성한다.
|
||||
3. apply — 완성된 boundaries 검증(단조·비중첩·범위·커버리지 90%+·유닛 캡) 후
|
||||
payload.presegment.units_override 기록 + awaiting_split 해제 +
|
||||
deferred_until 제거(즉시 재개). 워커가 다음 사이클에 map-reduce 재개.
|
||||
|
||||
stdout 규약: 사람이 읽는 요약 행 + '{' 로 시작하는 기계 파싱용 JSON 라인(1건 1라인).
|
||||
사람용 행은 절대 '{' 로 시작하지 않는다.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import asyncio
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
import sys
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
|
||||
sys.path.insert(0, str(Path(__file__).resolve().parent.parent / "app"))
|
||||
|
||||
from sqlalchemy import text as sql_text # noqa: E402
|
||||
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine # noqa: E402
|
||||
|
||||
from services.hier_decomp.builder import build_hier_tree # noqa: E402
|
||||
from services.summarize_units import ( # noqa: E402
|
||||
CAP_TOKENS,
|
||||
OVERRIDE_MIN_COVERAGE_PCT,
|
||||
TRIGGER_TOKENS,
|
||||
choose_override_source,
|
||||
estimate_tokens,
|
||||
extract_leaves,
|
||||
leaf_spans,
|
||||
plan_summarize_units,
|
||||
validate_override_boundaries,
|
||||
)
|
||||
|
||||
# 초과 섹션 본문 덤프 분할 단위 (유인 세션 컨텍스트 보호)
|
||||
DUMP_CHUNK_CHARS = 200_000
|
||||
|
||||
|
||||
def _jsonl(obj: dict) -> None:
|
||||
"""기계 파싱용 JSON 라인 — 반드시 '{' 로 시작하는 단독 라인."""
|
||||
print(json.dumps(obj, ensure_ascii=False, default=str))
|
||||
|
||||
|
||||
def _session_factory():
|
||||
database_url = os.getenv(
|
||||
"DATABASE_URL",
|
||||
"postgresql+asyncpg://pkm:pkm@postgres:5432/pkm",
|
||||
)
|
||||
engine = create_async_engine(database_url)
|
||||
return engine, async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
|
||||
|
||||
|
||||
# ─── list ────────────────────────────────────────────────────────────────────
|
||||
|
||||
LIST_SQL = """
|
||||
SELECT q.id AS queue_id, q.document_id, q.status, q.attempts,
|
||||
q.payload::text AS payload_text,
|
||||
LEFT(COALESCE(d.title, '(제목 없음)'), 80) AS title
|
||||
FROM processing_queue q
|
||||
JOIN documents d ON d.id = q.document_id
|
||||
WHERE q.stage = 'deep_summary'
|
||||
AND q.status IN ('pending', 'processing', 'failed')
|
||||
AND (q.payload -> 'presegment' ->> 'awaiting_split') = 'true'
|
||||
ORDER BY q.id
|
||||
"""
|
||||
|
||||
|
||||
async def cmd_list() -> int:
|
||||
engine, factory = _session_factory()
|
||||
try:
|
||||
async with factory() as session:
|
||||
rows = (await session.execute(sql_text(LIST_SQL))).mappings().all()
|
||||
finally:
|
||||
await engine.dispose()
|
||||
|
||||
print(f"awaiting_split 보류 문서 {len(rows)}건")
|
||||
for r in rows:
|
||||
payload = json.loads(r["payload_text"] or "{}")
|
||||
preseg = payload.get("presegment") or {}
|
||||
oversized = preseg.get("oversized_sections") or []
|
||||
print(
|
||||
f" doc {r['document_id']} [{r['title']}] queue={r['queue_id']} status={r['status']} "
|
||||
f"tier={preseg.get('tier')} over%={preseg.get('over_pct')} "
|
||||
f"tokens={preseg.get('total_est_tokens'):,} units={preseg.get('units')}"
|
||||
if isinstance(preseg.get("total_est_tokens"), int)
|
||||
else f" doc {r['document_id']} [{r['title']}] queue={r['queue_id']} status={r['status']}"
|
||||
)
|
||||
print(f" 초과 섹션 {len(oversized)}건: {', '.join(str(t) for t in oversized[:3] if t)}")
|
||||
print(
|
||||
f" 보류 알람={preseg.get('alerted_at') or '-'} / "
|
||||
f"재개 예정={payload.get('deferred_until') or '(즉시)'}"
|
||||
+ (f" / 거부 사유={preseg.get('override_rejected')}" if preseg.get("override_rejected") else "")
|
||||
)
|
||||
_jsonl({
|
||||
"cmd": "list",
|
||||
"queue_id": r["queue_id"],
|
||||
"doc_id": r["document_id"],
|
||||
"title": r["title"],
|
||||
"status": r["status"],
|
||||
"tier": preseg.get("tier"),
|
||||
"over_pct": preseg.get("over_pct"),
|
||||
"total_est_tokens": preseg.get("total_est_tokens"),
|
||||
"units": preseg.get("units"),
|
||||
"oversized_sections": oversized,
|
||||
"alerted_at": preseg.get("alerted_at"),
|
||||
"deferred_until": payload.get("deferred_until"),
|
||||
"override_rejected": preseg.get("override_rejected"),
|
||||
})
|
||||
return 0
|
||||
|
||||
|
||||
# ─── export ──────────────────────────────────────────────────────────────────
|
||||
|
||||
def _safe_name(title: str | None, fallback: str) -> str:
|
||||
t = re.sub(r"[^0-9A-Za-z가-힣._-]+", "_", (title or fallback)).strip("_")
|
||||
return (t or fallback)[:60]
|
||||
|
||||
|
||||
def _build_outline(text: str) -> str:
|
||||
"""hier_decomp builder 재사용 — window-split 억제(요약 계획과 동일 환경) 개요."""
|
||||
nodes = build_hier_tree(text, leaf_target_max=sys.maxsize, leaf_hard_max=sys.maxsize)
|
||||
lines = []
|
||||
for n in nodes:
|
||||
indent = " " * max(n.level, 0)
|
||||
title = n.section_title or "(preamble)"
|
||||
tok = estimate_tokens(n.text)
|
||||
mark = " [CAP 초과]" if n.is_leaf and tok > CAP_TOKENS else ""
|
||||
lines.append(f"{indent}- L{n.level} {title} — {tok:,} tok{mark}")
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
async def cmd_export(doc_id: int, out_dir: str) -> int:
|
||||
engine, factory = _session_factory()
|
||||
try:
|
||||
async with factory() as session:
|
||||
row = (await session.execute(
|
||||
sql_text(
|
||||
"SELECT id, title, md_content, extracted_text FROM documents WHERE id = :d"
|
||||
),
|
||||
{"d": doc_id},
|
||||
)).mappings().first()
|
||||
finally:
|
||||
await engine.dispose()
|
||||
|
||||
if not row:
|
||||
print(f"[error] 문서 id={doc_id} 없음")
|
||||
_jsonl({"cmd": "export", "ok": False, "doc_id": doc_id, "error": "document_not_found"})
|
||||
return 1
|
||||
|
||||
source, text = choose_override_source(row["md_content"], row["extracted_text"])
|
||||
if not text.strip():
|
||||
print(f"[error] 문서 id={doc_id} 본문 비어있음 (md_content/extracted_text 둘 다)")
|
||||
_jsonl({"cmd": "export", "ok": False, "doc_id": doc_id, "error": "empty_text"})
|
||||
return 1
|
||||
|
||||
plan = plan_summarize_units(text)
|
||||
leaves = extract_leaves(text)
|
||||
spans = leaf_spans(text, leaves)
|
||||
|
||||
out = Path(out_dir)
|
||||
out.mkdir(parents=True, exist_ok=True)
|
||||
files: list[str] = []
|
||||
now_iso = datetime.now(timezone.utc).isoformat(timespec="seconds")
|
||||
|
||||
# ① 통계 + hier 개요
|
||||
oversized_units = [u for u in plan.units if u.over_cap]
|
||||
overview = [
|
||||
f"# presegment export — doc {doc_id}",
|
||||
"",
|
||||
f"- 제목: {row['title'] or '(제목 없음)'}",
|
||||
f"- source: {source} (len={len(text):,}자, 오프셋 기준 텍스트)",
|
||||
f"- 추정 토큰: {plan.total_est_tokens:,} (trigger={TRIGGER_TOKENS:,} / cap={CAP_TOKENS:,})",
|
||||
f"- plan: mode={plan.mode} tier={plan.tier} over%={plan.over_pct}",
|
||||
f"- 유닛: 자동 pack {len(plan.units) - len(oversized_units)}개 + CAP 초과 {len(oversized_units)}개",
|
||||
f"- 생성: {now_iso}",
|
||||
"",
|
||||
"## 유닛 제안 (summarize_units greedy-pack)",
|
||||
"",
|
||||
]
|
||||
for u in plan.units:
|
||||
if not u.leaf_indexes:
|
||||
continue
|
||||
s = spans[u.leaf_indexes[0]][0]
|
||||
e = spans[u.leaf_indexes[-1]][1]
|
||||
titles = " · ".join(t for t in u.section_titles if t) or "(무제 구간)"
|
||||
flag = " ★CAP 초과 — 분할 필요" if u.over_cap else ""
|
||||
overview.append(f"- 유닛 {u.index}: [{s}, {e}) {u.est_tokens:,} tok — {titles[:120]}{flag}")
|
||||
overview += ["", "## hier 개요", "", _build_outline(text), ""]
|
||||
(out / "overview.md").write_text("\n".join(overview), encoding="utf-8")
|
||||
files.append("overview.md")
|
||||
|
||||
# ③ 초과 섹션 본문 덤프 (섹션당 파일 · 200K자 단위 분할 · 파일명에 절대 스팬)
|
||||
boundaries: list[dict] = []
|
||||
for u in plan.units:
|
||||
if not u.leaf_indexes:
|
||||
continue
|
||||
s = spans[u.leaf_indexes[0]][0]
|
||||
e = spans[u.leaf_indexes[-1]][1]
|
||||
title = next((t for t in u.section_titles if t), None)
|
||||
if not u.over_cap:
|
||||
# ④ 자동 pack 유닛은 템플릿에 채워둔다
|
||||
boundaries.append({"start": s, "end": e, "title": title or f"유닛 {u.index}"})
|
||||
continue
|
||||
boundaries.append({
|
||||
"start": s, "end": e, "title": title or f"유닛 {u.index}",
|
||||
"todo": (
|
||||
f"CAP 초과({u.est_tokens:,} tok > {CAP_TOKENS:,}) — 이 스팬을 cap 이하 "
|
||||
"경계 여러 개로 교체하고 todo 키를 제거할 것"
|
||||
),
|
||||
})
|
||||
seg = text[s:e]
|
||||
base = f"oversized_{u.index:03d}_{_safe_name(title, f'unit{u.index}')}"
|
||||
for k in range(0, len(seg), DUMP_CHUNK_CHARS):
|
||||
cs, ce = s + k, s + min(k + DUMP_CHUNK_CHARS, len(seg))
|
||||
fname = f"{base}.{cs}_{ce}.md"
|
||||
# 본문 원문 그대로 (헤더 미부착 — 파일 내 로컬 오프셋 + 파일명 cs 로 절대 오프셋 계산)
|
||||
(out / fname).write_text(text[cs:ce], encoding="utf-8")
|
||||
files.append(fname)
|
||||
|
||||
# ④ boundaries 템플릿 JSON
|
||||
template = {
|
||||
"doc_id": doc_id,
|
||||
"source": source,
|
||||
"source_len": len(text),
|
||||
"cap_tokens": CAP_TOKENS,
|
||||
"generated_at": now_iso,
|
||||
"boundaries": boundaries,
|
||||
}
|
||||
(out / "boundaries_template.json").write_text(
|
||||
json.dumps(template, ensure_ascii=False, indent=2), encoding="utf-8"
|
||||
)
|
||||
files.append("boundaries_template.json")
|
||||
|
||||
# 유인 세션용 작업 안내
|
||||
readme = f"""# doc {doc_id} 유인 분할 안내
|
||||
|
||||
1. overview.md 로 구조 파악 (유닛 제안 + hier 개요).
|
||||
2. oversized_*.md 본문을 읽고 의미 경계를 정한다.
|
||||
- 파일명 `..._<cs>_<ce>.md` 의 cs = 파일 첫 문자의 절대 오프셋.
|
||||
- 절대 오프셋 = cs + 파일 내 로컬 오프셋.
|
||||
3. boundaries_template.json 의 todo 항목을 cap({CAP_TOKENS:,} tok) 이하 경계 여러 개로
|
||||
교체하고 todo 키를 제거한다. 나머지 자동 pack 항목은 그대로 둬도 된다.
|
||||
- 토큰 추정: 한글 0.529 tok/자 · 기타 0.217 tok/자 (services/summarize_units.py).
|
||||
- 규칙: start 단조증가 · 비중첩 · 전체 커버리지 {OVERRIDE_MIN_COVERAGE_PCT:.0f}%+ · 유닛당 cap 이하.
|
||||
4. 검증/적용:
|
||||
python /app/scripts/presegment_attended.py apply --doc {doc_id} --boundaries <FILE> --dry-run
|
||||
python /app/scripts/presegment_attended.py apply --doc {doc_id} --boundaries <FILE>
|
||||
"""
|
||||
(out / "README.md").write_text(readme, encoding="utf-8")
|
||||
files.append("README.md")
|
||||
|
||||
print(f"doc {doc_id} [{row['title'] or '(제목 없음)'}] export 완료 → {out}")
|
||||
print(
|
||||
f" source={source} len={len(text):,}자 tokens={plan.total_est_tokens:,} "
|
||||
f"tier={plan.tier} over%={plan.over_pct}"
|
||||
)
|
||||
print(f" 자동 pack 유닛 {len(plan.units) - len(oversized_units)}개 / TODO(초과) {len(oversized_units)}개")
|
||||
print(f" 파일 {len(files)}개: {', '.join(files[:6])}{' ...' if len(files) > 6 else ''}")
|
||||
_jsonl({
|
||||
"cmd": "export", "ok": True, "doc_id": doc_id, "out": str(out),
|
||||
"source": source, "source_len": len(text),
|
||||
"total_est_tokens": plan.total_est_tokens, "tier": plan.tier,
|
||||
"over_pct": plan.over_pct,
|
||||
"units_auto": len(plan.units) - len(oversized_units),
|
||||
"units_todo": len(oversized_units),
|
||||
"files": files,
|
||||
})
|
||||
return 0
|
||||
|
||||
|
||||
# ─── apply ───────────────────────────────────────────────────────────────────
|
||||
|
||||
QUEUE_ROW_SQL = """
|
||||
SELECT id, status, attempts, payload::text AS payload_text
|
||||
FROM processing_queue
|
||||
WHERE document_id = :d AND stage = 'deep_summary'
|
||||
AND status IN ('pending', 'processing', 'failed')
|
||||
ORDER BY id DESC
|
||||
LIMIT 1
|
||||
"""
|
||||
|
||||
APPLY_UPDATE_SQL = """
|
||||
UPDATE processing_queue
|
||||
SET payload = CAST(:payload AS JSONB),
|
||||
status = 'pending',
|
||||
attempts = 0,
|
||||
error_message = NULL
|
||||
WHERE id = :qid
|
||||
"""
|
||||
|
||||
|
||||
async def cmd_apply(doc_id: int, boundaries_file: str, dry_run: bool) -> int:
|
||||
raw = json.loads(Path(boundaries_file).read_text(encoding="utf-8"))
|
||||
if isinstance(raw, dict):
|
||||
boundaries = raw.get("boundaries") or []
|
||||
declared_source = raw.get("source")
|
||||
declared_len = raw.get("source_len")
|
||||
if raw.get("doc_id") not in (None, doc_id):
|
||||
print(f"[error] boundaries 파일 doc_id={raw.get('doc_id')} != --doc {doc_id}")
|
||||
_jsonl({"cmd": "apply", "ok": False, "doc_id": doc_id, "error": "doc_id_mismatch"})
|
||||
return 1
|
||||
else:
|
||||
boundaries, declared_source, declared_len = raw, None, None
|
||||
|
||||
engine, factory = _session_factory()
|
||||
try:
|
||||
async with factory() as session:
|
||||
doc = (await session.execute(
|
||||
sql_text(
|
||||
"SELECT id, title, md_content, extracted_text FROM documents WHERE id = :d"
|
||||
),
|
||||
{"d": doc_id},
|
||||
)).mappings().first()
|
||||
if not doc:
|
||||
print(f"[error] 문서 id={doc_id} 없음")
|
||||
_jsonl({"cmd": "apply", "ok": False, "doc_id": doc_id, "error": "document_not_found"})
|
||||
return 1
|
||||
|
||||
qrow = (await session.execute(sql_text(QUEUE_ROW_SQL), {"d": doc_id})).mappings().first()
|
||||
if not qrow:
|
||||
print(f"[error] doc {doc_id} 의 활성 deep_summary 큐 행 없음 (pending/processing/failed)")
|
||||
_jsonl({"cmd": "apply", "ok": False, "doc_id": doc_id, "error": "queue_row_not_found"})
|
||||
return 1
|
||||
if qrow["status"] == "processing":
|
||||
print(f"[error] queue {qrow['id']} 가 processing 중 — 워커 완료/보류 후 재시도")
|
||||
_jsonl({"cmd": "apply", "ok": False, "doc_id": doc_id, "error": "queue_processing"})
|
||||
return 1
|
||||
|
||||
# 오프셋 기준 텍스트 — export 와 동일 규칙 (파일에 source 선언 시 그 선언 우선)
|
||||
if declared_source in ("md_content", "extracted_text"):
|
||||
source = declared_source
|
||||
text = (doc["md_content"] if source == "md_content" else doc["extracted_text"]) or ""
|
||||
else:
|
||||
source, text = choose_override_source(doc["md_content"], doc["extracted_text"])
|
||||
if declared_len is not None and declared_len != len(text):
|
||||
print(
|
||||
f"[error] source_len 불일치 — 파일={declared_len:,} vs 현재 {source}={len(text):,}"
|
||||
" (본문 재생성됨 — export 부터 재실행)"
|
||||
)
|
||||
_jsonl({"cmd": "apply", "ok": False, "doc_id": doc_id, "error": "source_len_mismatch"})
|
||||
return 1
|
||||
|
||||
check = validate_override_boundaries(text, boundaries)
|
||||
for w in check.warnings:
|
||||
print(f" [warn] {w}")
|
||||
if not check.ok:
|
||||
print(f"[error] 경계 검증 실패 — {len(check.errors)}건:")
|
||||
for e in check.errors:
|
||||
print(f" - {e}")
|
||||
_jsonl({
|
||||
"cmd": "apply", "ok": False, "doc_id": doc_id,
|
||||
"error": "validation_failed", "errors": check.errors,
|
||||
"warnings": check.warnings, "coverage_pct": check.coverage_pct,
|
||||
})
|
||||
return 1
|
||||
|
||||
print(
|
||||
f"doc {doc_id} [{doc['title'] or '(제목 없음)'}] 경계 검증 통과 — "
|
||||
f"유닛 {len(check.boundaries)}개 / 커버리지 {check.coverage_pct}% / "
|
||||
f"최대 유닛 {max(check.unit_tokens):,} tok (cap {CAP_TOKENS:,})"
|
||||
)
|
||||
for i, ((s, e, t), tok) in enumerate(zip(check.boundaries, check.unit_tokens)):
|
||||
print(f" 유닛 {i}: [{s}, {e}) {tok:,} tok — {t or '(무제)'}")
|
||||
|
||||
payload = json.loads(qrow["payload_text"] or "{}")
|
||||
preseg = dict(payload.get("presegment") or {})
|
||||
preseg["units_override"] = {
|
||||
"source": source,
|
||||
"source_len": len(text),
|
||||
"boundaries": [[s, e, t] for s, e, t in check.boundaries],
|
||||
"applied_at": datetime.now(timezone.utc).isoformat(timespec="seconds"),
|
||||
}
|
||||
preseg["awaiting_split"] = False
|
||||
# 알람 dedupe 리셋(다음 이벤트는 신선하게 발화) + 이전 거부/맵 잔재 제거
|
||||
for k in ("alerted_at", "override_rejected", "override_rejected_at", "map_results"):
|
||||
preseg.pop(k, None)
|
||||
payload["presegment"] = preseg
|
||||
payload.pop("deferred_until", None) # 즉시 재개
|
||||
|
||||
if dry_run:
|
||||
print(f" [dry-run] queue {qrow['id']} 미변경 — 위 경계로 적용 가능")
|
||||
_jsonl({
|
||||
"cmd": "apply", "ok": True, "dry_run": True, "doc_id": doc_id,
|
||||
"queue_id": qrow["id"], "units": len(check.boundaries),
|
||||
"coverage_pct": check.coverage_pct, "unit_tokens": check.unit_tokens,
|
||||
})
|
||||
return 0
|
||||
|
||||
await session.execute(
|
||||
sql_text(APPLY_UPDATE_SQL),
|
||||
{"payload": json.dumps(payload, ensure_ascii=False), "qid": qrow["id"]},
|
||||
)
|
||||
await session.commit()
|
||||
print(
|
||||
f" 적용 완료 — queue {qrow['id']} status=pending, deferred_until 제거 "
|
||||
f"(다음 queue_consumer 사이클에 재개)"
|
||||
)
|
||||
_jsonl({
|
||||
"cmd": "apply", "ok": True, "dry_run": False, "doc_id": doc_id,
|
||||
"queue_id": qrow["id"], "units": len(check.boundaries),
|
||||
"coverage_pct": check.coverage_pct, "unit_tokens": check.unit_tokens,
|
||||
})
|
||||
return 0
|
||||
finally:
|
||||
await engine.dispose()
|
||||
|
||||
|
||||
# ─── main ────────────────────────────────────────────────────────────────────
|
||||
|
||||
def main() -> int:
|
||||
parser = argparse.ArgumentParser(description="presegment 유인 분할 CLI (PR3)")
|
||||
sub = parser.add_subparsers(dest="cmd", required=True)
|
||||
|
||||
sub.add_parser("list", help="awaiting_split 보류 문서 목록")
|
||||
|
||||
p_export = sub.add_parser("export", help="유인 분할 작업 패키지 덤프")
|
||||
p_export.add_argument("--doc", type=int, required=True)
|
||||
p_export.add_argument("--out", default=None, help="출력 디렉토리 (기본 ./preseg_export_<doc>)")
|
||||
|
||||
p_apply = sub.add_parser("apply", help="완성된 boundaries 검증·적용(재개)")
|
||||
p_apply.add_argument("--doc", type=int, required=True)
|
||||
p_apply.add_argument("--boundaries", required=True, help="boundaries JSON 파일 경로")
|
||||
p_apply.add_argument("--dry-run", action="store_true", help="검증만 하고 DB 미변경")
|
||||
|
||||
args = parser.parse_args()
|
||||
if args.cmd == "list":
|
||||
return asyncio.run(cmd_list())
|
||||
if args.cmd == "export":
|
||||
out = args.out or f"./preseg_export_{args.doc}"
|
||||
return asyncio.run(cmd_export(args.doc, out))
|
||||
if args.cmd == "apply":
|
||||
return asyncio.run(cmd_apply(args.doc, args.boundaries, args.dry_run))
|
||||
return 2
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
||||
@@ -0,0 +1,486 @@
|
||||
"""presegment PR3 — HOLD 알람·units_override 재개·유인 분할 경계 검증 단위테스트.
|
||||
|
||||
test_deep_summary_mapreduce.py (PR2) 선례를 따르는 seam 단위 검증:
|
||||
- services.alerts.send_alert: env 미설정 no-op(프로세스당 1회 로그)·synochat/ntfy
|
||||
포맷·실패 시 절대 raise 금지 (webhook 은 전부 fake — 실호출 0).
|
||||
- _hold_awaiting_split: 알람 발화 + alerted_at dedupe(7일).
|
||||
- validate_override_boundaries: 정상/중첩/캡초과/커버리지 부족/TODO 잔존/범위 밖.
|
||||
- process(): units_override 존재 시 tier 판정(plan_summarize_units) 우회 →
|
||||
map-reduce 재개 / 잘못된 override 는 재-HOLD + 알람 / override 없는 문서 무회귀.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from types import SimpleNamespace
|
||||
|
||||
import pytest
|
||||
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "app"))
|
||||
|
||||
import httpx # noqa: E402
|
||||
|
||||
import services.alerts as alerts # noqa: E402
|
||||
from ai.envelope import EscalationEnvelope # noqa: E402
|
||||
from models.queue import StageDeferred # noqa: E402
|
||||
from services.summarize_units import ( # noqa: E402
|
||||
CAP_TOKENS,
|
||||
choose_override_source,
|
||||
estimate_tokens,
|
||||
extract_leaves,
|
||||
leaf_spans,
|
||||
plan_summarize_units,
|
||||
units_from_boundaries,
|
||||
validate_override_boundaries,
|
||||
)
|
||||
import workers.deep_summary_worker as dsw # noqa: E402
|
||||
|
||||
|
||||
# ─── fixtures (PR2 테스트와 동일 계열) ───────────────────────────────────────
|
||||
|
||||
# 헤딩 1개 + 한글 60,000자 단일 섹션 ≈ 31.7K tok (> CAP) → over% 100 → whole (HOLD 대상)
|
||||
GIANT_WHOLE_MD = "# 통짜\n" + ("가" * 60_000)
|
||||
# trigger(25K tok) 이하 소형 문서 — 기존 단일콜 경로 (무회귀 확인용)
|
||||
SMALL_MD = "# 소형\n" + ("가" * 2_000)
|
||||
|
||||
MAP_JSON = (
|
||||
'{"mode": "single", "tldr": "유닛 요약", "detail": "유닛 상세.",'
|
||||
' "inconsistencies": [], "confidence": 0.9}'
|
||||
)
|
||||
REDUCE_JSON = (
|
||||
'{"mode": "single", "tldr": "전체 요약", "detail": "최종 상세.",'
|
||||
' "inconsistencies": [], "confidence": 0.8}'
|
||||
)
|
||||
|
||||
|
||||
class FakeSession:
|
||||
def __init__(self, row=None):
|
||||
self.commits = 0
|
||||
self._row = row
|
||||
|
||||
async def commit(self):
|
||||
self.commits += 1
|
||||
|
||||
|
||||
class FakeProcSession(FakeSession):
|
||||
"""process() 레벨 — session.get(Document) + execute(select queue_row) fake."""
|
||||
|
||||
def __init__(self, doc, row):
|
||||
super().__init__(row)
|
||||
self._doc = doc
|
||||
|
||||
async def get(self, model, pk):
|
||||
return self._doc
|
||||
|
||||
async def execute(self, stmt):
|
||||
row = self._row
|
||||
return SimpleNamespace(scalar_one_or_none=lambda: row)
|
||||
|
||||
|
||||
class FakeClient:
|
||||
"""deep 슬롯 보유 — call_deep_or_defer 가 call_deep 을 탄다 (PR2 테스트 동일)."""
|
||||
|
||||
def __init__(self):
|
||||
self.ai = SimpleNamespace(
|
||||
deep=SimpleNamespace(model="qwen-macbook", context_char_limit=260_000)
|
||||
)
|
||||
self.prompts: list[str] = []
|
||||
|
||||
async def call_deep(self, prompt: str, system=None) -> str:
|
||||
self.prompts.append(prompt)
|
||||
if "유닛 요약 (총" in prompt: # reduce 프롬프트 마커
|
||||
return REDUCE_JSON
|
||||
return MAP_JSON
|
||||
|
||||
async def close(self):
|
||||
pass
|
||||
|
||||
|
||||
def _doc(text: str = GIANT_WHOLE_MD, md_content: str | None = None):
|
||||
return SimpleNamespace(
|
||||
id=999,
|
||||
title="테스트 문서",
|
||||
extracted_text=text,
|
||||
md_content=md_content,
|
||||
ai_detail_summary=None,
|
||||
ai_inconsistencies=None,
|
||||
ai_analysis_tier="triage",
|
||||
ai_processed_at=None,
|
||||
)
|
||||
|
||||
|
||||
def _envelope_raw():
|
||||
return {
|
||||
"from_stage": "classify",
|
||||
"escalation_reasons": ["long_context"],
|
||||
"risk_flags": [],
|
||||
"distilled_context": "4B 요지",
|
||||
"original_pointers": {"doc_ids": [999]},
|
||||
}
|
||||
|
||||
|
||||
def _envelope():
|
||||
return EscalationEnvelope.from_json(json.dumps(_envelope_raw()))
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def _patch_telemetry(monkeypatch):
|
||||
events: list[dict] = []
|
||||
|
||||
async def fake_record(**kwargs):
|
||||
events.append(kwargs)
|
||||
|
||||
monkeypatch.setattr(dsw, "record_analyze_event", fake_record)
|
||||
return events
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def _alert_recorder(monkeypatch):
|
||||
calls: list[tuple[str, str]] = []
|
||||
|
||||
async def fake_alert(title: str, message: str) -> bool:
|
||||
calls.append((title, message))
|
||||
return True
|
||||
|
||||
monkeypatch.setattr(dsw, "send_alert", fake_alert)
|
||||
return calls
|
||||
|
||||
|
||||
# ─── A. services.alerts ──────────────────────────────────────────────────────
|
||||
|
||||
class _FakeHttpxClient:
|
||||
"""httpx.AsyncClient 대체 — post 호출 캡처. 실호출 0."""
|
||||
|
||||
captured: list[dict] = []
|
||||
fail_with: Exception | None = None
|
||||
status_code = 200
|
||||
|
||||
def __init__(self, timeout=None):
|
||||
self.timeout = timeout
|
||||
|
||||
async def __aenter__(self):
|
||||
return self
|
||||
|
||||
async def __aexit__(self, *exc):
|
||||
return False
|
||||
|
||||
async def post(self, url, **kwargs):
|
||||
if _FakeHttpxClient.fail_with is not None:
|
||||
raise _FakeHttpxClient.fail_with
|
||||
_FakeHttpxClient.captured.append({"url": url, **kwargs})
|
||||
return SimpleNamespace(status_code=_FakeHttpxClient.status_code, text="ok")
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def _fake_httpx(monkeypatch):
|
||||
_FakeHttpxClient.captured = []
|
||||
_FakeHttpxClient.fail_with = None
|
||||
_FakeHttpxClient.status_code = 200
|
||||
monkeypatch.setattr(alerts.httpx, "AsyncClient", _FakeHttpxClient)
|
||||
return _FakeHttpxClient
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_send_alert_noop_without_env(monkeypatch, _fake_httpx):
|
||||
monkeypatch.delenv("ALERT_WEBHOOK_URL", raising=False)
|
||||
monkeypatch.setattr(alerts, "_noop_logged", False)
|
||||
infos: list[str] = []
|
||||
monkeypatch.setattr(alerts.logger, "info", lambda msg, *a, **k: infos.append(msg))
|
||||
|
||||
assert await alerts.send_alert("t", "m") is False
|
||||
assert await alerts.send_alert("t", "m") is False
|
||||
assert len(infos) == 1 # 프로세스당 1회만 로그
|
||||
assert _fake_httpx.captured == [] # webhook 미호출
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_send_alert_synochat_format(monkeypatch, _fake_httpx):
|
||||
monkeypatch.setenv("ALERT_WEBHOOK_URL", "http://chat.local/webhook")
|
||||
monkeypatch.delenv("ALERT_WEBHOOK_KIND", raising=False) # 기본 = synochat
|
||||
|
||||
assert await alerts.send_alert("제목", "본문 줄1\n줄2") is True
|
||||
assert len(_fake_httpx.captured) == 1
|
||||
call = _fake_httpx.captured[0]
|
||||
assert call["url"] == "http://chat.local/webhook"
|
||||
payload = json.loads(call["data"]["payload"])
|
||||
assert payload == {"text": "제목\n본문 줄1\n줄2"}
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_send_alert_ntfy_format(monkeypatch, _fake_httpx):
|
||||
monkeypatch.setenv("ALERT_WEBHOOK_URL", "http://ntfy.local/ds")
|
||||
monkeypatch.setenv("ALERT_WEBHOOK_KIND", "ntfy")
|
||||
|
||||
assert await alerts.send_alert("한글 제목", "알람 본문") is True
|
||||
call = _fake_httpx.captured[0]
|
||||
# 한글 제목은 헤더(latin-1 한정) 대신 query param
|
||||
assert call["params"] == {"title": "한글 제목"}
|
||||
assert call["content"] == "알람 본문".encode("utf-8")
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_send_alert_failure_never_raises(monkeypatch, _fake_httpx):
|
||||
monkeypatch.setenv("ALERT_WEBHOOK_URL", "http://chat.local/webhook")
|
||||
_fake_httpx.fail_with = httpx.ConnectError("down")
|
||||
assert await alerts.send_alert("t", "m") is False # raise 없이 False
|
||||
|
||||
_fake_httpx.fail_with = None
|
||||
_fake_httpx.status_code = 500
|
||||
assert await alerts.send_alert("t", "m") is False # HTTP 5xx 도 False
|
||||
|
||||
|
||||
# ─── B. HOLD 알람 + alerted_at dedupe ───────────────────────────────────────
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_hold_alerts_once_then_dedupes(_alert_recorder):
|
||||
plan = plan_summarize_units(GIANT_WHOLE_MD)
|
||||
assert plan.tier == "whole"
|
||||
row = SimpleNamespace(payload={"envelope": {"x": 1}})
|
||||
session = FakeSession()
|
||||
|
||||
# 1차 HOLD — 알람 발화 + alerted_at 기록
|
||||
with pytest.raises(StageDeferred):
|
||||
await dsw._hold_awaiting_split(session, row, plan, 999, doc_title="테스트 문서")
|
||||
assert len(_alert_recorder) == 1
|
||||
title, message = _alert_recorder[0]
|
||||
assert "999" in title
|
||||
assert "테스트 문서" in message and "whole" in message
|
||||
assert f"export --doc 999" in message # 유인 분할 힌트
|
||||
alerted_at = row.payload["presegment"]["alerted_at"]
|
||||
assert datetime.fromisoformat(alerted_at)
|
||||
|
||||
# 2차 재보류(24h 후 재계획 시나리오) — 7일 이내 → 재알람 억제
|
||||
with pytest.raises(StageDeferred):
|
||||
await dsw._hold_awaiting_split(session, row, plan, 999, doc_title="테스트 문서")
|
||||
assert len(_alert_recorder) == 1
|
||||
assert row.payload["presegment"]["alerted_at"] == alerted_at # 미갱신
|
||||
|
||||
# 3차 — alerted_at 이 8일 전이면 재발화 + 갱신
|
||||
stale = (datetime.now(timezone.utc) - timedelta(days=8)).isoformat()
|
||||
payload = dict(row.payload)
|
||||
payload["presegment"] = {**payload["presegment"], "alerted_at": stale}
|
||||
row.payload = payload
|
||||
with pytest.raises(StageDeferred):
|
||||
await dsw._hold_awaiting_split(session, row, plan, 999, doc_title="테스트 문서")
|
||||
assert len(_alert_recorder) == 2
|
||||
assert row.payload["presegment"]["alerted_at"] != stale
|
||||
|
||||
|
||||
# ─── C. validate_override_boundaries ────────────────────────────────────────
|
||||
|
||||
def test_validate_ok_full_coverage():
|
||||
text = "가" * 40_000 # ≈ 21,160 tok
|
||||
bounds = [[0, 20_000, "전반"], [20_000, 40_000, "후반"]]
|
||||
check = validate_override_boundaries(text, bounds)
|
||||
assert check.ok and check.errors == []
|
||||
assert check.coverage_pct == 100.0
|
||||
assert check.boundaries == [(0, 20_000, "전반"), (20_000, 40_000, "후반")]
|
||||
assert all(t <= CAP_TOKENS for t in check.unit_tokens)
|
||||
|
||||
|
||||
def test_validate_dict_form_and_units_from_boundaries():
|
||||
text = "가" * 10_000
|
||||
bounds = [{"start": 0, "end": 5_000, "title": "a"}, {"start": 5_000, "end": 10_000, "title": "b"}]
|
||||
check = validate_override_boundaries(text, bounds)
|
||||
assert check.ok
|
||||
units = units_from_boundaries(text, check.boundaries)
|
||||
assert [u.index for u in units] == [0, 1]
|
||||
assert units[0].text == text[0:5_000]
|
||||
assert units[0].section_titles == ["a"]
|
||||
assert units[0].est_tokens == estimate_tokens(text[0:5_000])
|
||||
|
||||
|
||||
def test_validate_rejects_overlap():
|
||||
text = "가" * 10_000
|
||||
check = validate_override_boundaries(text, [[0, 6_000, "a"], [5_000, 10_000, "b"]])
|
||||
assert not check.ok
|
||||
assert any("중첩" in e for e in check.errors)
|
||||
|
||||
|
||||
def test_validate_rejects_cap_exceed():
|
||||
text = "가" * 40_000 # 단일 유닛 ≈ 21,160 tok > CAP 12,000
|
||||
check = validate_override_boundaries(text, [[0, 40_000, "통짜"]])
|
||||
assert not check.ok
|
||||
assert any("cap" in e and "유닛 0" in e for e in check.errors) # 어느 유닛인지 명시
|
||||
|
||||
|
||||
def test_validate_rejects_low_coverage():
|
||||
text = "가" * 10_000
|
||||
check = validate_override_boundaries(text, [[0, 1_000, "머리만"]])
|
||||
assert not check.ok
|
||||
assert any("커버리지" in e for e in check.errors)
|
||||
|
||||
|
||||
def test_validate_rejects_out_of_range_and_todo():
|
||||
text = "가" * 1_000
|
||||
check = validate_override_boundaries(text, [[0, 2_000, "밖"]])
|
||||
assert not check.ok and any("범위 밖" in e for e in check.errors)
|
||||
|
||||
check2 = validate_override_boundaries(
|
||||
text, [{"start": 0, "end": 1_000, "title": "t", "todo": "분할 필요"}]
|
||||
)
|
||||
assert not check2.ok and any("TODO" in e for e in check2.errors)
|
||||
|
||||
|
||||
def test_validate_warns_on_gap_but_passes_coverage():
|
||||
text = "가" * 100_000
|
||||
# 4% 공백 구간 — 커버리지 96% (>=90) 통과 + 경고
|
||||
bounds = [[0, 20_000, "a"], [24_000, 100_000, None]]
|
||||
# 24000~100000 = 76000자 ≈ 40,204 tok > CAP → cap 완화해 gap 경고만 검증
|
||||
check = validate_override_boundaries(text, bounds, cap=50_000)
|
||||
assert check.ok
|
||||
assert any("공백 구간" in w for w in check.warnings)
|
||||
|
||||
|
||||
def test_choose_override_source_prefers_md_content():
|
||||
assert choose_override_source("# md 본문", "추출본") == ("md_content", "# md 본문")
|
||||
assert choose_override_source(" \n", "추출본") == ("extracted_text", "추출본")
|
||||
assert choose_override_source(None, None) == ("extracted_text", "")
|
||||
|
||||
|
||||
def test_leaf_spans_reconstruct_source():
|
||||
md = "# 절 1\n" + "가" * 100 + "\n# 절 2\n" + "나" * 100
|
||||
leaves = extract_leaves(md)
|
||||
spans = leaf_spans(md, leaves)
|
||||
assert len(spans) == len(leaves)
|
||||
for (s, e), leaf in zip(spans, leaves):
|
||||
assert md[s:e] == leaf.text
|
||||
# 인접 파티션 — 이어붙이면 원문 전체
|
||||
assert spans[0][0] == 0 and spans[-1][1] == len(md)
|
||||
|
||||
|
||||
# ─── D. units_override 재개 경로 (worker process 레벨) ──────────────────────
|
||||
|
||||
def _override_payload(text: str, n_units: int = 3, source: str = "extracted_text") -> dict:
|
||||
step = len(text) // n_units
|
||||
bounds = []
|
||||
for i in range(n_units):
|
||||
s = i * step
|
||||
e = len(text) if i == n_units - 1 else (i + 1) * step
|
||||
bounds.append([s, e, f"파트 {i + 1}"])
|
||||
return {
|
||||
"envelope": _envelope_raw(),
|
||||
"subject_domain": "generic",
|
||||
"presegment": {
|
||||
"tier": "whole", "over_pct": 61.46, "awaiting_split": False,
|
||||
"units_override": {
|
||||
"source": source, "source_len": len(text), "boundaries": bounds,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_units_override_bypasses_tier_gate(monkeypatch, _patch_telemetry, _alert_recorder):
|
||||
doc = _doc(GIANT_WHOLE_MD) # override 없으면 whole → HOLD 였을 문서
|
||||
row = SimpleNamespace(payload=_override_payload(GIANT_WHOLE_MD, n_units=3))
|
||||
session = FakeProcSession(doc, row)
|
||||
|
||||
client = FakeClient()
|
||||
monkeypatch.setattr(dsw, "AIClient", lambda: client)
|
||||
|
||||
def _boom(*a, **k):
|
||||
raise AssertionError("units_override 는 plan_summarize_units(tier 재판정)를 타면 안 됨")
|
||||
|
||||
monkeypatch.setattr(dsw, "plan_summarize_units", _boom)
|
||||
|
||||
await dsw.process(999, session)
|
||||
|
||||
# map 3 + reduce 1, 모든 콜 캡 준수 (오버헤드 = 정책 템플릿+envelope ~3K)
|
||||
assert len(client.prompts) == 4
|
||||
for p in client.prompts:
|
||||
assert estimate_tokens(p) <= CAP_TOKENS + 3_000
|
||||
assert doc.ai_detail_summary == "최종 상세."
|
||||
assert doc.ai_analysis_tier == "deep"
|
||||
preseg = row.payload["presegment"]
|
||||
assert preseg["tier"] == "override"
|
||||
assert preseg["over_pct"] == 61.46 # HOLD 당시 실측치 보존
|
||||
assert len(preseg["map_results"]) == 3
|
||||
assert preseg["units_override"]["boundaries"][0][2] == "파트 1" # override 보존(감사)
|
||||
assert _alert_recorder == [] # 정상 재개 — 알람 없음
|
||||
assert len(_patch_telemetry) == 1
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_bad_override_reholds_and_alerts(monkeypatch, _patch_telemetry, _alert_recorder):
|
||||
doc = _doc(GIANT_WHOLE_MD)
|
||||
payload = _override_payload(GIANT_WHOLE_MD, n_units=1) # 단일 유닛 ≈ 31.7K tok > CAP*1.1
|
||||
row = SimpleNamespace(payload=payload)
|
||||
session = FakeProcSession(doc, row)
|
||||
|
||||
def _no_llm():
|
||||
raise AssertionError("잘못된 override 는 LLM 콜(900s 재생산)로 흐르면 안 됨")
|
||||
|
||||
monkeypatch.setattr(dsw, "AIClient", _no_llm)
|
||||
|
||||
with pytest.raises(StageDeferred) as ei:
|
||||
await dsw.process(999, session)
|
||||
|
||||
assert ei.value.retry_after_minutes == dsw.HOLD_RETRY_MINUTES
|
||||
preseg = row.payload["presegment"]
|
||||
assert preseg["awaiting_split"] is True # 재-HOLD
|
||||
assert "cap" in preseg["override_rejected"]
|
||||
assert "units_override" in preseg # 원인 조사용 보존
|
||||
assert len(_alert_recorder) == 1 # 거부 알람
|
||||
assert "거부" in _alert_recorder[0][0]
|
||||
assert doc.ai_detail_summary is None
|
||||
assert _patch_telemetry == []
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_override_source_len_mismatch_reholds(monkeypatch, _alert_recorder):
|
||||
doc = _doc(GIANT_WHOLE_MD)
|
||||
payload = _override_payload(GIANT_WHOLE_MD, n_units=3)
|
||||
payload["presegment"]["units_override"]["source_len"] = 12_345 # 본문 재생성 시나리오
|
||||
row = SimpleNamespace(payload=payload)
|
||||
session = FakeProcSession(doc, row)
|
||||
monkeypatch.setattr(dsw, "AIClient", lambda: (_ for _ in ()).throw(AssertionError("no LLM")))
|
||||
|
||||
with pytest.raises(StageDeferred):
|
||||
await dsw.process(999, session)
|
||||
|
||||
assert row.payload["presegment"]["awaiting_split"] is True
|
||||
assert "source_len 불일치" in row.payload["presegment"]["override_rejected"]
|
||||
assert len(_alert_recorder) == 1
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_no_override_small_doc_keeps_single_call_path(
|
||||
monkeypatch, _patch_telemetry, _alert_recorder
|
||||
):
|
||||
"""무회귀 — units_override 없는 소형 문서는 기존 단일콜 경로 그대로."""
|
||||
doc = _doc(SMALL_MD)
|
||||
row = SimpleNamespace(payload={"envelope": _envelope_raw(), "subject_domain": "generic"})
|
||||
session = FakeProcSession(doc, row)
|
||||
client = FakeClient()
|
||||
monkeypatch.setattr(dsw, "AIClient", lambda: client)
|
||||
|
||||
await dsw.process(999, session)
|
||||
|
||||
assert len(client.prompts) == 1 # 단일콜 (map-reduce 아님)
|
||||
assert SMALL_MD[:200].split("\n")[1][:50] in client.prompts[0] # 원문 슬라이스 포함
|
||||
assert doc.ai_detail_summary == "유닛 상세."
|
||||
assert "presegment" not in row.payload # payload 무변경
|
||||
assert _alert_recorder == []
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_no_override_whole_doc_still_holds_with_alert(
|
||||
monkeypatch, _patch_telemetry, _alert_recorder
|
||||
):
|
||||
"""override 미주입 whole 문서 — 기존 HOLD 시멘틱 유지 + PR3 알람만 추가."""
|
||||
doc = _doc(GIANT_WHOLE_MD)
|
||||
row = SimpleNamespace(payload={"envelope": _envelope_raw(), "subject_domain": "generic"})
|
||||
session = FakeProcSession(doc, row)
|
||||
monkeypatch.setattr(dsw, "AIClient", lambda: (_ for _ in ()).throw(AssertionError("no LLM")))
|
||||
|
||||
with pytest.raises(StageDeferred):
|
||||
await dsw.process(999, session)
|
||||
|
||||
preseg = row.payload["presegment"]
|
||||
assert preseg["awaiting_split"] is True and preseg["tier"] == "whole"
|
||||
assert len(_alert_recorder) == 1
|
||||
assert "HOLD" in _alert_recorder[0][0]
|
||||
Reference in New Issue
Block a user