Compare commits

..

4 Commits

Author SHA1 Message Date
hyungi 371ee4ebe6 feat(presegment): 영구 실패 Chat 알람 — ALERT_FAIL_STAGES allowlist(기본 deep_summary,summarize) 2026-07-03 08:14:12 +09:00
hyungi a55bb3453d feat(presegment): PR3 테스트 — 알람·dedupe·override 검증·재개·무회귀 19건
tests/test_presegment_pr3.py: alerts no-op(env 미설정, 프로세스당 1회 로그)/
synochat·ntfy 포맷/실패 무raise(webhook 전부 fake — 실호출 0), HOLD 알람
발화+alerted_at 7일 dedupe, validate_override_boundaries(정상/dict형/중첩/
캡초과/커버리지 부족/범위 밖/TODO 잔존/공백 경고), leaf_spans 원문 재구성,
units_override 가 tier 판정(plan_summarize_units) 우회하고 map-reduce 재개,
잘못된 override(캡 초과·source_len 불일치)=재-HOLD+알람+LLM 콜 0,
override 없는 소형(단일콜)·whole(HOLD+알람) 문서 무회귀.

기존 test_summarize_units 26 + test_deep_summary_mapreduce 등 인접 100건
pass 유지 (test_pipeline_hold 1건 실패는 main 기존 결함 — 본 PR 무관).

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-07-03 05:54:23 +09:00
hyungi 9061f2e25c feat(presegment): PR3 C — 유인 분할 CLI scripts/presegment_attended.py
컨테이너 안(docker exec)에서 실행하는 3-subcommand CLI:
  list   — awaiting_split 보류 큐 행(문서·tier·over%·토큰·초과 섹션·보류/재개 시각)
  export — 문서 통계+hier 개요(overview.md)·자동 pack 유닛 제안(summarize_units
           재사용)·초과 섹션 (start,end) 스팬+본문 덤프(섹션당 파일, 200K자 분할,
           파일명에 절대 오프셋)·boundaries 템플릿 JSON(자동팩 채움+초과=todo 마커)
           +README(유인 클로드 세션 작업 안내)
  apply  — 경계 검증(단조증가·비중첩·본문 범위·커버리지 90%+공백 경고·유닛 캡
           초과 시 유닛 명시 거부·todo 잔존 거부·source_len 드리프트 거부) 통과 시
           payload.presegment.units_override 기록 + awaiting_split=false +
           deferred_until 제거(즉시 재개) + status pending·alerted_at/map_results
           정리. --dry-run 지원.

stdout = 사람이 읽는 요약 + '{' 로 시작하는 기계 파싱용 JSON 라인.
DB 접속 = 기존 scripts/ 패턴(DATABASE_URL env, backfill_tier.py 동형).
마이그레이션 없음 — payload JSONB 만 사용.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-07-03 05:54:23 +09:00
hyungi 33427d4a42 feat(presegment): PR3 A+B — HOLD 웹훅 알람 + units_override 재개 경로
A. services/alerts.py 신설 — send_alert(title, message):
   ALERT_WEBHOOK_URL 미설정=no-op(프로세스당 1회 INFO), ALERT_WEBHOOK_KIND
   synochat(기본)|ntfy, httpx 5s, 실패=WARNING만(절대 raise 금지).
   deep_summary HOLD/override 거부 시 발화 — 문서 id·제목·tier·over%·토큰·
   초과 섹션 상위3·재개 예정시각·유인 분할 힌트. dedupe=payload.presegment
   .alerted_at(7일) — 매 24h 재보류마다 재알람 방지.

B. units_override 재개 — payload.presegment.units_override 존재 시 tier
   재판정·HOLD 없이 (start,end,title) 문자 오프셋 경계로 유닛 구성 후 기존
   PR2 map-reduce 그대로(유닛 단위 멱등 commit·reduce·doc 기록). 방어:
   source_len 불일치·형식 오류·유닛 추정토큰 > CAP*1.1 이면 실패 대신
   재-HOLD + 알람(잘못된 override 의 900s 콜 재생산 차단). override 없는
   문서는 기존 경로 무회귀.

summarize_units 에 공용 순수함수 추가: choose_override_source(md_content
우선)·validate_override_boundaries(단조·비중첩·범위·커버리지 90%·유닛 캡)·
units_from_boundaries·leaf_spans + greedy_pack 유닛에 leaf_indexes 기록
(export CLI 스팬 계산용).

plan ds-presegment-mapreduce-2 / env DEEP_SUMMARY_HOLD_RETRY_MINUTES 유지.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-07-03 05:54:02 +09:00
6 changed files with 1425 additions and 15 deletions
+69
View File
@@ -0,0 +1,69 @@
"""운영 알람 webhook (presegment PR3 — HOLD 유인 전환 게이트).
deep_summary HOLD(awaiting_split) 처럼 "사람이 개입해야 풀리는" 상태를 웹훅으로 발화한다.
환경변수:
ALERT_WEBHOOK_URL — 미설정 = no-op (프로세스당 1회 INFO 로그만).
ALERT_WEBHOOK_KIND — 'synochat' | 'ntfy' (기본 synochat).
synochat: Synology Chat incoming webhook — POST form `payload={"text": "..."}`.
ntfy: POST body=message. 제목은 query param(?title=) — HTTP 헤더는 latin-1
한정이라 한글 제목이 깨진다 (ntfy 는 query param title 을 공식 지원).
불변식: 알람은 절대 raise 하지 않는다 — 실패는 WARNING 로그만. 알람이 워커를
죽이면 본전(요약 파이프라인)이 무너진다. 호출부는 반환값(bool)에 의존하지 말 것.
"""
from __future__ import annotations
import json
import os
import httpx
from core.utils import setup_logger
logger = setup_logger("alerts")
ALERT_TIMEOUT_SECONDS = 5.0
# 프로세스당 1회만 "미설정 no-op" 로그 — 매 HOLD 마다 로그 오염 방지.
_noop_logged = False
async def send_alert(title: str, message: str) -> bool:
"""webhook 으로 알람 1건 발화. 성공 True / no-op·실패 False. 절대 raise 금지."""
global _noop_logged
url = (os.getenv("ALERT_WEBHOOK_URL") or "").strip()
if not url:
if not _noop_logged:
logger.info("ALERT_WEBHOOK_URL 미설정 — 알람 no-op (이 로그는 프로세스당 1회)")
_noop_logged = True
return False
kind = (os.getenv("ALERT_WEBHOOK_KIND") or "synochat").strip().lower()
if kind not in ("synochat", "ntfy"):
logger.warning(f"ALERT_WEBHOOK_KIND={kind!r} 미지원 — synochat 으로 폴백")
kind = "synochat"
try:
async with httpx.AsyncClient(timeout=ALERT_TIMEOUT_SECONDS) as client:
if kind == "ntfy":
resp = await client.post(
url,
params={"title": title},
content=message.encode("utf-8"),
)
else: # synochat
text = f"{title}\n{message}" if title else message
resp = await client.post(
url,
data={"payload": json.dumps({"text": text}, ensure_ascii=False)},
)
if resp.status_code >= 400:
logger.warning(
f"알람 webhook({kind}) HTTP {resp.status_code}: {resp.text[:200]}"
)
return False
return True
except Exception as exc: # noqa: BLE001 — 알람 실패가 워커를 죽이면 안 됨
logger.warning(f"알람 webhook({kind}) 발화 실패: {exc}")
return False
+193 -3
View File
@@ -66,6 +66,9 @@ class SummarizeUnit:
text: str = ""
est_tokens: int = 0
over_cap: bool = False # 단독 섹션이 CAP 초과 (hybrid 시 클로드 대상)
# PR3: 이 유닛을 구성한 leaf 의 서수(extract_leaves 순서) — export CLI 가
# leaf_spans 와 결합해 유닛 (start,end) 스팬을 계산한다. 페이로드 미기록.
leaf_indexes: list[int] = field(default_factory=list)
@dataclass
@@ -92,20 +95,22 @@ def greedy_pack(leaves: list[HierNode], cap: int = CAP_TOKENS) -> list[Summarize
units: list[SummarizeUnit] = []
cur_titles: list[str | None] = []
cur_texts: list[str] = []
cur_indexes: list[int] = []
cur_tokens = 0
def _flush() -> None:
nonlocal cur_titles, cur_texts, cur_tokens
nonlocal cur_titles, cur_texts, cur_indexes, cur_tokens
if cur_texts:
units.append(SummarizeUnit(
index=len(units),
section_titles=cur_titles,
text="\n\n".join(cur_texts),
est_tokens=cur_tokens,
leaf_indexes=cur_indexes,
))
cur_titles, cur_texts, cur_tokens = [], [], 0
cur_titles, cur_texts, cur_indexes, cur_tokens = [], [], [], 0
for leaf in leaves:
for li, leaf in enumerate(leaves):
t = estimate_tokens(leaf.text)
if t > cap:
_flush()
@@ -115,12 +120,14 @@ def greedy_pack(leaves: list[HierNode], cap: int = CAP_TOKENS) -> list[Summarize
text=leaf.text,
est_tokens=t,
over_cap=True,
leaf_indexes=[li],
))
continue
if cur_tokens + t > cap:
_flush()
cur_titles.append(leaf.section_title)
cur_texts.append(leaf.text)
cur_indexes.append(li)
cur_tokens += t
_flush()
return units
@@ -222,3 +229,186 @@ def build_reduce_units_block(
block = block[: max(1, int(budget_tokens / KO_TOK_PER_CHAR))]
truncated = True
return block, truncated
# ─── PR3 — 유인 분할(units_override) 경계 순수함수 (worker + attended CLI 공용) ───
#
# HOLD(hybrid/whole) 문서를 사람이(유인 클로드 세션) 분할한 경계
# [(start, end, title)] 로 재개하는 경로. 오프셋 = 소스 텍스트의 Python 문자
# (code point) 인덱스 — export 가 덤프한 파일과 apply/워커의 슬라이스가 같은
# 기준을 공유해야 한다 (builder 의 char_start 는 UTF-16 단위라 여기서 미사용).
OVERRIDE_MIN_COVERAGE_PCT = 90.0 # apply 게이트 — 전체 본문의 90%+ 커버 필수
OVERRIDE_GAP_WARN_CHARS = 1_000 # 이보다 큰 공백 구간은 경고로 노출
@dataclass
class OverrideCheck:
"""validate_override_boundaries 결과 — ok=False 면 errors 에 사유."""
ok: bool
errors: list[str] = field(default_factory=list)
warnings: list[str] = field(default_factory=list)
coverage_pct: float = 0.0
# 정규화된 (start, end, title) — units_from_boundaries 입력으로 그대로 사용
boundaries: list[tuple[int, int, str | None]] = field(default_factory=list)
unit_tokens: list[int] = field(default_factory=list)
def choose_override_source(
md_content: str | None, extracted_text: str | None
) -> tuple[str, str]:
"""units_override 오프셋 기준 텍스트 선택 — (source_name, text).
canonical markdown(md_content) 우선, 부재/공백 시 extracted_text 폴백.
export CLI · apply CLI · 워커 재개가 반드시 같은 규칙을 공유해야
(start,end) 오프셋이 일치한다. 선택 결과는 units_override.source 에 박제.
"""
if md_content and md_content.strip():
return "md_content", md_content
return "extracted_text", extracted_text or ""
def _normalize_boundary(entry, idx: int, errors: list[str]) -> tuple[int, int, str | None] | None:
"""boundaries 1건 정규화 — [start,end,title?] 배열 또는 {start,end,title} 객체.
export 템플릿의 초과 스팬은 "todo" 키를 달고 나온다 — 미해결(todo 잔존) 상태로
apply 하면 여기서 에러 (사람이 CAP 이하 경계로 분할을 완성해야 통과).
"""
if isinstance(entry, dict):
if entry.get("todo"):
errors.append(
f"유닛 {idx}: TODO 미해결 — 초과 스팬을 CAP 이하 경계들로 분할한 뒤 todo 키를 제거해야 함"
)
return None
start, end, title = entry.get("start"), entry.get("end"), entry.get("title")
elif isinstance(entry, (list, tuple)) and 2 <= len(entry) <= 3:
start, end = entry[0], entry[1]
title = entry[2] if len(entry) == 3 else None
else:
errors.append(f"유닛 {idx}: 형식 오류 — [start, end, title] 또는 {{start, end, title}} 여야 함")
return None
if (
isinstance(start, bool) or isinstance(end, bool)
or not isinstance(start, int) or not isinstance(end, int)
):
errors.append(f"유닛 {idx}: start/end 는 정수여야 함 (start={start!r}, end={end!r})")
return None
if title is not None and not isinstance(title, str):
title = str(title)
return start, end, title
def validate_override_boundaries(
text: str,
raw_boundaries,
*,
cap: int = CAP_TOKENS,
min_coverage_pct: float = OVERRIDE_MIN_COVERAGE_PCT,
gap_warn_chars: int = OVERRIDE_GAP_WARN_CHARS,
) -> OverrideCheck:
"""units_override 경계 검증 — 단조증가·비중첩·본문 범위 내·커버리지·유닛별 캡.
apply CLI = 기본 게이트(cap=CAP_TOKENS, coverage>=90%).
워커 방어 = cap 슬랙(CAP*1.1)·coverage 0 으로 완화 호출 — 잘못된 override 가
900s 콜을 재생산하는 것만 차단하고, 품질 게이트는 apply 시점에 이미 통과했다고 본다.
"""
errors: list[str] = []
warnings: list[str] = []
if not isinstance(raw_boundaries, (list, tuple)) or not raw_boundaries:
return OverrideCheck(ok=False, errors=["boundaries 가 비어있거나 리스트가 아님"])
normalized: list[tuple[int, int, str | None]] = []
for i, entry in enumerate(raw_boundaries):
norm = _normalize_boundary(entry, i, errors)
if norm is not None:
normalized.append(norm)
if errors:
return OverrideCheck(ok=False, errors=errors, warnings=warnings)
n = len(text)
prev_end = None
unit_tokens: list[int] = []
covered = 0
for i, (start, end, title) in enumerate(normalized):
label = f"유닛 {i}" + (f" ({title})" if title else "")
if start < 0 or end > n:
errors.append(f"{label}: 본문 범위 밖 — [{start}, {end}) vs len={n}")
continue
if start >= end:
errors.append(f"{label}: start >= end ([{start}, {end}))")
continue
if prev_end is not None and start < prev_end:
errors.append(f"{label}: 직전 유닛과 중첩/역순 — start={start} < 직전 end={prev_end}")
if prev_end is not None and start - prev_end > gap_warn_chars:
warnings.append(f"{label} 앞 공백 구간 {start - prev_end:,}자 ([{prev_end}, {start})) — 의도 확인")
est = estimate_tokens(text[start:end])
unit_tokens.append(est)
if est > cap:
errors.append(f"{label}: 추정 {est:,} tok > cap {cap:,} — 이 스팬을 더 분할해야 함")
covered += end - start
prev_end = max(prev_end or 0, end)
if not errors:
head_gap = normalized[0][0]
tail_gap = n - normalized[-1][1]
if head_gap > gap_warn_chars:
warnings.append(f"문서 선두 공백 구간 {head_gap:,}자 ([0, {normalized[0][0]})) — 의도 확인")
if tail_gap > gap_warn_chars:
warnings.append(f"문서 말미 공백 구간 {tail_gap:,}자 ([{normalized[-1][1]}, {n})) — 의도 확인")
coverage_pct = round(covered * 100.0 / n, 2) if n else 0.0
if not errors and coverage_pct < min_coverage_pct:
errors.append(
f"커버리지 {coverage_pct}% < {min_coverage_pct}% — 경계가 본문 대부분을 덮어야 함"
)
return OverrideCheck(
ok=not errors,
errors=errors,
warnings=warnings,
coverage_pct=coverage_pct,
boundaries=normalized if not errors else [],
unit_tokens=unit_tokens,
)
def units_from_boundaries(
text: str, boundaries: list[tuple[int, int, str | None]]
) -> list[SummarizeUnit]:
"""정규화·검증 통과한 (start,end,title) 리스트 → SummarizeUnit 리스트.
유닛 index = 경계 서수 — boundaries 는 payload 에 박제되므로 attempt 간 안정
(map_results 멱등 재개 키와 정합).
"""
units: list[SummarizeUnit] = []
for i, (start, end, title) in enumerate(boundaries):
seg = text[start:end]
units.append(SummarizeUnit(
index=i,
section_titles=[title],
text=seg,
est_tokens=estimate_tokens(seg),
))
return units
def leaf_spans(text: str, leaves: list[HierNode]) -> list[tuple[int, int]]:
"""extract_leaves 결과 leaf 들의 원문 (start,end) 문자 스팬.
_segment 가 원문을 연속 파티션(빈 preamble 만 폐기)으로 자르므로, 커서 순차
탐색이 항상 정확한 위치를 찾는다 (동일 본문 반복이 있어도 순서가 앞선 leaf 가
앞 오프셋을 가져간다).
"""
spans: list[tuple[int, int]] = []
cursor = 0
for leaf in leaves:
pos = text.find(leaf.text, cursor)
if pos < 0:
# 이론상 불가(연속 파티션) — 방어적으로 전체 재탐색
pos = text.find(leaf.text)
if pos < 0:
raise ValueError(f"leaf 본문을 원문에서 찾지 못함 (title={leaf.section_title!r})")
spans.append((pos, pos + len(leaf.text)))
cursor = pos + len(leaf.text)
return spans
+189 -12
View File
@@ -14,7 +14,7 @@ import asyncio
import json
import os
import time
from datetime import datetime, timezone
from datetime import datetime, timedelta, timezone
from pydantic import BaseModel, Field, ValidationError
from sqlalchemy import desc, select
@@ -29,15 +29,19 @@ from core.utils import setup_logger
from models.document import Document
from models.queue import ProcessingQueue, StageDeferred
from policy.prompt_render import render_26b, policy_version as compute_policy_version
from services.alerts import send_alert
from services.document_telemetry import record_analyze_event
from services.search.llm_gate import Priority, acquire_mlx_gate
from services.summarize_units import (
CAP_TOKENS,
UnitPlan,
build_reduce_units_block,
choose_override_source,
estimate_tokens,
plan_summarize_units,
render_map_slice,
units_from_boundaries,
validate_override_boundaries,
)
logger = setup_logger("deep_summary_worker")
@@ -45,9 +49,16 @@ logger = setup_logger("deep_summary_worker")
DEEP_SUMMARY_TASK = "p3c_deep_summary"
# presegment PR2 (plan ds-presegment-mapreduce-2) — 거대문서 map-reduce
REDUCE_TASK = "p3c_deep_summary_reduce"
# HYBRID/TIER2(클로드 유인 분할 필요) HOLD 재확인 간격. PR3(알람·경계 주입) 전까지는
# 이 간격으로 재계획만 반복한다 — attempts 미소모(StageDeferred)라 영구 failed 없음.
# HYBRID/TIER2(클로드 유인 분할 필요) HOLD 재확인 간격 — attempts 미소모(StageDeferred)라
# 영구 failed 없음. PR3: HOLD 시 웹훅 알람 + units_override 주입 시 즉시 재개.
HOLD_RETRY_MINUTES = int(os.getenv("DEEP_SUMMARY_HOLD_RETRY_MINUTES", "1440"))
# HOLD 알람 dedupe — payload.presegment.alerted_at 이 이 일수 이내면 재발화 억제
# (매 24h 재보류마다 재알람 방지). apply CLI 가 override 기록 시 alerted_at 을 지워
# 다음 이벤트(예: override 거부)는 신선하게 발화된다.
ALERT_DEDUPE_DAYS = 7
# units_override 방어 캡 슬랙 — apply 게이트(CAP)보다 10% 여유. 초과 유닛은 실패
# 대신 재-HOLD + 알람 (잘못된 override 가 900s 콜을 재생산하는 것 차단).
OVERRIDE_CAP_SLACK = 1.1
# reduce 프롬프트 오버헤드가 비정상적으로 커도 유닛 블록 예산은 이 밑으로 안 내려감(방어).
REDUCE_BUDGET_FLOOR_TOKENS = 1_000
@@ -111,6 +122,17 @@ async def process(
envelope = EscalationEnvelope.from_json(json.dumps(envelope_raw))
# ─── presegment PR3 — units_override 재개 경로 (유인 분할 경계 주입) ───
# apply CLI(scripts/presegment_attended.py) 가 payload.presegment.units_override 를
# 기록한 문서는 tier 재판정·HOLD 없이 그 경계로 유닛을 구성해 기존 PR2
# map-reduce 경로를 그대로 탄다. override 없는 문서는 아래 기존 경로와 바이트 동일.
if (payload.get("presegment") or {}).get("units_override"):
await _process_units_override(
doc, queue_row, envelope, subject_domain, session,
defer_on_deep_unavailable=defer_on_deep_unavailable,
)
return
# ─── presegment PR2 게이트 (plan ds-presegment-mapreduce-2) ───
# TRIGGER(25K tok) 이하 = 아래 기존 단일콜 경로 그대로(무회귀). 초과 시 3-way:
# auto(over%==0) → 로컬 map-reduce (유닛별 26B → reduce)
@@ -123,7 +145,9 @@ async def process(
# units 빈 auto 는 이론상 불가(비어있지 않은 텍스트 = leaf >= 1)지만, 빈 reduce
# 단일콜(환각 위험)로 흐르지 않게 방어적으로 HOLD 로 보낸다.
if unit_plan.tier != "auto" or not unit_plan.units:
await _hold_awaiting_split(session, queue_row, unit_plan, document_id)
await _hold_awaiting_split(
session, queue_row, unit_plan, document_id, doc_title=doc.title
)
await _process_map_reduce(
doc, queue_row, envelope, subject_domain, unit_plan, session,
defer_on_deep_unavailable=defer_on_deep_unavailable,
@@ -250,43 +274,196 @@ async def process(
)
def _hold_alert_due(preseg: dict, now: datetime) -> bool:
"""HOLD 알람 dedupe — alerted_at 이 없거나 ALERT_DEDUPE_DAYS 초과 시에만 발화."""
ts = preseg.get("alerted_at")
if not ts:
return True
try:
prev = datetime.fromisoformat(str(ts))
except ValueError:
return True # 깨진 타임스탬프 = 기록 신뢰 불가 → 발화하고 재기록
if prev.tzinfo is None:
prev = prev.replace(tzinfo=timezone.utc)
return (now - prev) >= timedelta(days=ALERT_DEDUPE_DAYS)
async def _hold_awaiting_split(
session: AsyncSession, queue_row: ProcessingQueue, plan: UnitPlan, document_id: int
session: AsyncSession,
queue_row: ProcessingQueue,
plan: UnitPlan,
document_id: int,
doc_title: str | None = None,
) -> None:
"""HYBRID/TIER2 — 클로드 유인 분할 대기(HOLD). 맥미니 미전송, StageDeferred 보류.
payload.presegment.awaiting_split 마킹을 먼저 commit — StageDeferred 핸들러
(queue_consumer)는 새 세션에서 행을 다시 읽어 deferred_until 만 병합하므로 유실 없음.
알람(ntfy)·클로드 경계 주입은 PR3 — 그 전까지는 HOLD_RETRY_MINUTES 간격 재계획만 반복.
PR3: 유인 전환 게이트 웹훅 알람 발화(alerted_at dedupe — 매 24h 재보류마다 재알람 방지).
무인 자동 cloud 호출 금지 룰 준수(클로드 경로는 항상 유인 게이트).
"""
payload = dict(queue_row.payload or {})
preseg = dict(payload.get("presegment") or {})
now = datetime.now(timezone.utc)
alert_due = _hold_alert_due(preseg, now)
oversized = [
(u.section_titles[0] if u.section_titles else None)
for u in plan.units if u.over_cap
][:20]
preseg.update({
"awaiting_split": True,
"tier": plan.tier,
"over_pct": plan.over_pct,
"total_est_tokens": plan.total_est_tokens,
"units": len(plan.units),
# 클로드가 분할해야 할 초과 섹션 표본 (PR3 알람 본문용)
"oversized_sections": [
(u.section_titles[0] if u.section_titles else None)
for u in plan.units if u.over_cap
][:20],
# 클로드가 분할해야 할 초과 섹션 표본 (알람 본문 + export CLI 안내용)
"oversized_sections": oversized,
})
if alert_due:
preseg["alerted_at"] = now.isoformat()
payload["presegment"] = preseg
queue_row.payload = payload # 재할당 = JSONB 변경 감지
await session.commit()
logger.info(
f"[deep] id={document_id} awaiting_split tier={plan.tier} over_pct={plan.over_pct} "
f"total_est_tokens={plan.total_est_tokens} units={len(plan.units)} "
f"→ HOLD ({HOLD_RETRY_MINUTES}분 후 재확인, 클로드 분할=PR3 유인)"
f"→ HOLD ({HOLD_RETRY_MINUTES}분 후 재확인, alert={'발화' if alert_due else 'dedupe'})"
)
if alert_due:
# commit 이후 발화 — 알람이 5s 행에 걸려도 payload 마킹은 이미 영속.
resume_at = now + timedelta(minutes=HOLD_RETRY_MINUTES)
top3 = ", ".join(t for t in oversized[:3] if t) or "(제목 없음)"
await send_alert(
f"[DS] deep_summary HOLD — doc {document_id} 유인 분할 필요",
(
f"문서: {doc_title or '(제목 없음)'} (id={document_id})\n"
f"tier={plan.tier} / over%={plan.over_pct} / "
f"total_est_tokens={plan.total_est_tokens:,} / units={len(plan.units)}\n"
f"초과 섹션(상위 3): {top3}\n"
f"재개 예정(UTC): {resume_at.isoformat(timespec='minutes')}\n"
f"유인 분할: scripts/presegment_attended.py export --doc {document_id}"
),
)
raise StageDeferred(
f"awaiting_split:{plan.tier}", retry_after_minutes=HOLD_RETRY_MINUTES
)
async def _rehold_bad_override(
session: AsyncSession, queue_row: ProcessingQueue, doc: Document, reason: str
) -> None:
"""잘못된 units_override — 실패 대신 재-HOLD + 알람 (900s 콜 재생산 차단).
units_override 는 payload 에 보존(사람이 원인 조사) + override_rejected 사유 기록.
apply CLI 가 alerted_at 을 지워두므로 첫 거부는 즉시 발화되고, 이후 24h 재보류
루프는 ALERT_DEDUPE_DAYS dedupe 로 억제된다.
"""
document_id = doc.id
payload = dict(queue_row.payload or {})
preseg = dict(payload.get("presegment") or {})
now = datetime.now(timezone.utc)
alert_due = _hold_alert_due(preseg, now)
preseg.update({
"awaiting_split": True,
"override_rejected": reason,
"override_rejected_at": now.isoformat(),
})
if alert_due:
preseg["alerted_at"] = now.isoformat()
payload["presegment"] = preseg
queue_row.payload = payload # 재할당 = JSONB 변경 감지
await session.commit()
logger.warning(f"[deep] id={document_id} units_override 거부 → 재-HOLD: {reason}")
if alert_due:
resume_at = now + timedelta(minutes=HOLD_RETRY_MINUTES)
await send_alert(
f"[DS] deep_summary 유인 분할 경계 거부 — doc {document_id}",
(
f"문서: {doc.title or '(제목 없음)'} (id={document_id})\n"
f"사유: {reason}\n"
f"재개 예정(UTC): {resume_at.isoformat(timespec='minutes')}\n"
f"수정: scripts/presegment_attended.py export --doc {document_id} "
f"→ apply --doc {document_id} --boundaries FILE"
),
)
raise StageDeferred(
f"override_rejected:{reason[:80]}", retry_after_minutes=HOLD_RETRY_MINUTES
)
async def _process_units_override(
doc: Document,
queue_row: ProcessingQueue,
envelope: EscalationEnvelope,
subject_domain: str,
session: AsyncSession,
*,
defer_on_deep_unavailable: bool,
) -> None:
"""PR3 — apply CLI 가 기록한 유인 분할 경계로 map-reduce 재개.
경계 = units_override.source 텍스트의 (start, end) 문자 오프셋. 방어 검증
(source_len 일치·단조·비중첩·범위·유닛 캡*1.1) 실패 시 재-HOLD + 알람 —
apply 이후 본문이 재생성됐거나 수기 주입이 깨진 경우 900s 콜로 흐르지 않는다.
통과 시 기존 PR2 _process_map_reduce 를 그대로 탄다(맵 결과 유닛 단위 commit·
reduce·ai_detail_summary 기록 — 유닛 index 는 payload 박제 경계 서수라 안정).
"""
document_id = doc.id
preseg = dict((queue_row.payload or {}).get("presegment") or {})
override = preseg.get("units_override")
if isinstance(override, (list, tuple)):
# 수기 주입 호환 — bare [(start,end,title)] 리스트도 허용
override = {"boundaries": list(override)}
if not isinstance(override, dict):
await _rehold_bad_override(
session, queue_row, doc, f"units_override 형식 오류 (type={type(override).__name__})"
)
source = override.get("source")
if source is None:
source, text_src = choose_override_source(doc.md_content, doc.extracted_text)
elif source in ("md_content", "extracted_text"):
text_src = (doc.md_content if source == "md_content" else doc.extracted_text) or ""
else:
await _rehold_bad_override(
session, queue_row, doc, f"units_override.source={source!r} 미지원"
)
expected_len = override.get("source_len")
if expected_len is not None and expected_len != len(text_src):
await _rehold_bad_override(
session, queue_row, doc,
f"source_len 불일치 — override={expected_len:,} vs 현재 {source}={len(text_src):,}"
" (본문 재생성됨 — export 부터 재실행)",
)
check = validate_override_boundaries(
text_src,
override.get("boundaries") or [],
cap=int(CAP_TOKENS * OVERRIDE_CAP_SLACK),
min_coverage_pct=0.0, # 커버리지 품질 게이트는 apply CLI 가 이미 통과시킴
)
if not check.ok:
await _rehold_bad_override(session, queue_row, doc, "; ".join(check.errors[:5]))
units = units_from_boundaries(text_src, check.boundaries)
plan = UnitPlan(
mode="map_reduce",
tier="override",
total_est_tokens=estimate_tokens(text_src),
over_pct=float(preseg.get("over_pct") or 0.0),
units=units,
)
logger.info(
f"[deep] id={document_id} units_override 재개 — source={source} units={len(units)} "
f"coverage={check.coverage_pct}% max_unit_tokens={max(check.unit_tokens, default=0)}"
)
await _process_map_reduce(
doc, queue_row, envelope, subject_domain, plan, session,
defer_on_deep_unavailable=defer_on_deep_unavailable,
)
async def _call_26b(
client: AIClient, prompt: str, *, defer_on_deep_unavailable: bool, document_id: int
):
+32
View File
@@ -23,6 +23,15 @@ logger = setup_logger("queue_consumer")
# pipeline.held_stages 안내 로그는 1분 사이클마다 반복하지 않고 최초 1회만.
_hold_logged = False
# PR3 후속(2026-07-03): 영구 실패 알람 — 사람이 개입해야 풀리는 상태라 Chat 웹훅 발화.
# allowlist 로 소음 제한: embed/chunk 류 대량 배치가 일제히 실패하면 문서 수만큼 알람이
# 쏟아지므로, 건당 가치가 높고 발생률이 낮은 LLM 스테이지만 기본 대상으로 한다.
_ALERT_FAIL_STAGES = {
s.strip()
for s in os.getenv("ALERT_FAIL_STAGES", "deep_summary,summarize").split(",")
if s.strip()
}
# stage별 배치 크기
# stt 는 GPU 단일 점유 + 회의 30분짜리도 가능 → 배치 1. thumbnail 은 ffmpeg subprocess 로 가벼움.
# deep_summary (PR-B B-1) 는 MLX 26B 단일 Semaphore(1) 경유 → 배치 1.
@@ -353,6 +362,8 @@ async def _process_stage(stage, worker_fn):
except Exception as e:
# 실패 처리
permanently_failed = False
doc_title = None
async with async_session() as session:
item = await session.get(ProcessingQueue, queue_id)
if not item:
@@ -363,7 +374,16 @@ async def _process_stage(stage, worker_fn):
item.error_message = err_text[:500]
if item.attempts >= item.max_attempts:
item.status = "failed"
permanently_failed = True
logger.error(f"[{stage}] document_id={document_id} 영구 실패: {e}")
if stage in _ALERT_FAIL_STAGES:
# 알람용 제목 best-effort — 실패해도 알람 자체는 발화한다.
try:
from models.document import Document
_doc = await session.get(Document, document_id)
doc_title = getattr(_doc, "title", None)
except Exception:
doc_title = None
# B3: marker_worker 는 변환 시작 시 doc.md_status='processing' 으로 표시한다.
# 변환이 _fail()/_set_skipped() 를 거치지 않고 예외로 죽으면(예: 대형
# batch ReadTimeout) doc.md_status 가 'processing' 에 영구 고착 = orphan
@@ -385,6 +405,18 @@ async def _process_stage(stage, worker_fn):
item.started_at = None
logger.warning(f"[{stage}] document_id={document_id} 재시도 예정 ({item.attempts}/{item.max_attempts}): {e}")
await session.commit()
if permanently_failed and stage in _ALERT_FAIL_STAGES:
# 영구 실패 = 무인 파이프라인이 스스로 못 푸는 상태 → 유인 전환 알람.
# send_alert 는 절대 raise 하지 않음(no-op/실패 = False 반환뿐).
from services.alerts import send_alert
await send_alert(
f"[DS] {stage} 영구 실패 — doc {document_id}",
(
f"{doc_title or '(제목 미상)'}\n"
f"에러: {err_text[:300]}\n"
f"확인: scripts/presegment_attended.py list (보류/거부 사유) 또는 큐 재큐"
),
)
async def consume_queue():
+456
View File
@@ -0,0 +1,456 @@
"""presegment PR3 — HOLD 거대문서 유인 분할 CLI (plan ds-presegment-mapreduce-2).
deep_summary 워커가 HOLD(payload.presegment.awaiting_split=true) 보류한
hybrid/whole tier 거대문서를, 사람이(유인 클로드 세션) 경계를 완성해 재개시키는 도구.
사용법 (fastapi 컨테이너 안에서 실행):
docker compose exec fastapi python /app/scripts/presegment_attended.py list
docker compose exec fastapi python /app/scripts/presegment_attended.py export --doc 44443 --out /app/logs/preseg_44443
docker compose exec fastapi python /app/scripts/presegment_attended.py apply --doc 44443 --boundaries /app/logs/preseg_44443/boundaries_template.json --dry-run
docker compose exec fastapi python /app/scripts/presegment_attended.py apply --doc 44443 --boundaries /app/logs/preseg_44443/boundaries_template.json
워크플로우:
1. list awaiting_split 문서 확인.
2. export 문서 통계·hier 개요·자동 pack 유닛 제안·초과 섹션 본문 덤프·boundaries
템플릿 JSON 생성. 유인 클로드 세션은 파일들만 읽고 TODO 스팬을
CAP 이하 경계들로 분할해 템플릿을 완성한다.
3. apply 완성된 boundaries 검증(단조·비중첩·범위·커버리지 90%+·유닛 )
payload.presegment.units_override 기록 + awaiting_split 해제 +
deferred_until 제거(즉시 재개). 워커가 다음 사이클에 map-reduce 재개.
stdout 규약: 사람이 읽는 요약 + '{' 시작하는 기계 파싱용 JSON 라인(1 1라인).
사람용 행은 절대 '{' 시작하지 않는다.
"""
from __future__ import annotations
import argparse
import asyncio
import json
import os
import re
import sys
from datetime import datetime, timezone
from pathlib import Path
sys.path.insert(0, str(Path(__file__).resolve().parent.parent / "app"))
from sqlalchemy import text as sql_text # noqa: E402
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine # noqa: E402
from services.hier_decomp.builder import build_hier_tree # noqa: E402
from services.summarize_units import ( # noqa: E402
CAP_TOKENS,
OVERRIDE_MIN_COVERAGE_PCT,
TRIGGER_TOKENS,
choose_override_source,
estimate_tokens,
extract_leaves,
leaf_spans,
plan_summarize_units,
validate_override_boundaries,
)
# 초과 섹션 본문 덤프 분할 단위 (유인 세션 컨텍스트 보호)
DUMP_CHUNK_CHARS = 200_000
def _jsonl(obj: dict) -> None:
"""기계 파싱용 JSON 라인 — 반드시 '{' 로 시작하는 단독 라인."""
print(json.dumps(obj, ensure_ascii=False, default=str))
def _session_factory():
database_url = os.getenv(
"DATABASE_URL",
"postgresql+asyncpg://pkm:pkm@postgres:5432/pkm",
)
engine = create_async_engine(database_url)
return engine, async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
# ─── list ────────────────────────────────────────────────────────────────────
LIST_SQL = """
SELECT q.id AS queue_id, q.document_id, q.status, q.attempts,
q.payload::text AS payload_text,
LEFT(COALESCE(d.title, '(제목 없음)'), 80) AS title
FROM processing_queue q
JOIN documents d ON d.id = q.document_id
WHERE q.stage = 'deep_summary'
AND q.status IN ('pending', 'processing', 'failed')
AND (q.payload -> 'presegment' ->> 'awaiting_split') = 'true'
ORDER BY q.id
"""
async def cmd_list() -> int:
engine, factory = _session_factory()
try:
async with factory() as session:
rows = (await session.execute(sql_text(LIST_SQL))).mappings().all()
finally:
await engine.dispose()
print(f"awaiting_split 보류 문서 {len(rows)}")
for r in rows:
payload = json.loads(r["payload_text"] or "{}")
preseg = payload.get("presegment") or {}
oversized = preseg.get("oversized_sections") or []
print(
f" doc {r['document_id']} [{r['title']}] queue={r['queue_id']} status={r['status']} "
f"tier={preseg.get('tier')} over%={preseg.get('over_pct')} "
f"tokens={preseg.get('total_est_tokens'):,} units={preseg.get('units')}"
if isinstance(preseg.get("total_est_tokens"), int)
else f" doc {r['document_id']} [{r['title']}] queue={r['queue_id']} status={r['status']}"
)
print(f" 초과 섹션 {len(oversized)}건: {', '.join(str(t) for t in oversized[:3] if t)}")
print(
f" 보류 알람={preseg.get('alerted_at') or '-'} / "
f"재개 예정={payload.get('deferred_until') or '(즉시)'}"
+ (f" / 거부 사유={preseg.get('override_rejected')}" if preseg.get("override_rejected") else "")
)
_jsonl({
"cmd": "list",
"queue_id": r["queue_id"],
"doc_id": r["document_id"],
"title": r["title"],
"status": r["status"],
"tier": preseg.get("tier"),
"over_pct": preseg.get("over_pct"),
"total_est_tokens": preseg.get("total_est_tokens"),
"units": preseg.get("units"),
"oversized_sections": oversized,
"alerted_at": preseg.get("alerted_at"),
"deferred_until": payload.get("deferred_until"),
"override_rejected": preseg.get("override_rejected"),
})
return 0
# ─── export ──────────────────────────────────────────────────────────────────
def _safe_name(title: str | None, fallback: str) -> str:
t = re.sub(r"[^0-9A-Za-z가-힣._-]+", "_", (title or fallback)).strip("_")
return (t or fallback)[:60]
def _build_outline(text: str) -> str:
"""hier_decomp builder 재사용 — window-split 억제(요약 계획과 동일 환경) 개요."""
nodes = build_hier_tree(text, leaf_target_max=sys.maxsize, leaf_hard_max=sys.maxsize)
lines = []
for n in nodes:
indent = " " * max(n.level, 0)
title = n.section_title or "(preamble)"
tok = estimate_tokens(n.text)
mark = " [CAP 초과]" if n.is_leaf and tok > CAP_TOKENS else ""
lines.append(f"{indent}- L{n.level} {title}{tok:,} tok{mark}")
return "\n".join(lines)
async def cmd_export(doc_id: int, out_dir: str) -> int:
engine, factory = _session_factory()
try:
async with factory() as session:
row = (await session.execute(
sql_text(
"SELECT id, title, md_content, extracted_text FROM documents WHERE id = :d"
),
{"d": doc_id},
)).mappings().first()
finally:
await engine.dispose()
if not row:
print(f"[error] 문서 id={doc_id} 없음")
_jsonl({"cmd": "export", "ok": False, "doc_id": doc_id, "error": "document_not_found"})
return 1
source, text = choose_override_source(row["md_content"], row["extracted_text"])
if not text.strip():
print(f"[error] 문서 id={doc_id} 본문 비어있음 (md_content/extracted_text 둘 다)")
_jsonl({"cmd": "export", "ok": False, "doc_id": doc_id, "error": "empty_text"})
return 1
plan = plan_summarize_units(text)
leaves = extract_leaves(text)
spans = leaf_spans(text, leaves)
out = Path(out_dir)
out.mkdir(parents=True, exist_ok=True)
files: list[str] = []
now_iso = datetime.now(timezone.utc).isoformat(timespec="seconds")
# ① 통계 + hier 개요
oversized_units = [u for u in plan.units if u.over_cap]
overview = [
f"# presegment export — doc {doc_id}",
"",
f"- 제목: {row['title'] or '(제목 없음)'}",
f"- source: {source} (len={len(text):,}자, 오프셋 기준 텍스트)",
f"- 추정 토큰: {plan.total_est_tokens:,} (trigger={TRIGGER_TOKENS:,} / cap={CAP_TOKENS:,})",
f"- plan: mode={plan.mode} tier={plan.tier} over%={plan.over_pct}",
f"- 유닛: 자동 pack {len(plan.units) - len(oversized_units)}개 + CAP 초과 {len(oversized_units)}",
f"- 생성: {now_iso}",
"",
"## 유닛 제안 (summarize_units greedy-pack)",
"",
]
for u in plan.units:
if not u.leaf_indexes:
continue
s = spans[u.leaf_indexes[0]][0]
e = spans[u.leaf_indexes[-1]][1]
titles = " · ".join(t for t in u.section_titles if t) or "(무제 구간)"
flag = " ★CAP 초과 — 분할 필요" if u.over_cap else ""
overview.append(f"- 유닛 {u.index}: [{s}, {e}) {u.est_tokens:,} tok — {titles[:120]}{flag}")
overview += ["", "## hier 개요", "", _build_outline(text), ""]
(out / "overview.md").write_text("\n".join(overview), encoding="utf-8")
files.append("overview.md")
# ③ 초과 섹션 본문 덤프 (섹션당 파일 · 200K자 단위 분할 · 파일명에 절대 스팬)
boundaries: list[dict] = []
for u in plan.units:
if not u.leaf_indexes:
continue
s = spans[u.leaf_indexes[0]][0]
e = spans[u.leaf_indexes[-1]][1]
title = next((t for t in u.section_titles if t), None)
if not u.over_cap:
# ④ 자동 pack 유닛은 템플릿에 채워둔다
boundaries.append({"start": s, "end": e, "title": title or f"유닛 {u.index}"})
continue
boundaries.append({
"start": s, "end": e, "title": title or f"유닛 {u.index}",
"todo": (
f"CAP 초과({u.est_tokens:,} tok > {CAP_TOKENS:,}) — 이 스팬을 cap 이하 "
"경계 여러 개로 교체하고 todo 키를 제거할 것"
),
})
seg = text[s:e]
base = f"oversized_{u.index:03d}_{_safe_name(title, f'unit{u.index}')}"
for k in range(0, len(seg), DUMP_CHUNK_CHARS):
cs, ce = s + k, s + min(k + DUMP_CHUNK_CHARS, len(seg))
fname = f"{base}.{cs}_{ce}.md"
# 본문 원문 그대로 (헤더 미부착 — 파일 내 로컬 오프셋 + 파일명 cs 로 절대 오프셋 계산)
(out / fname).write_text(text[cs:ce], encoding="utf-8")
files.append(fname)
# ④ boundaries 템플릿 JSON
template = {
"doc_id": doc_id,
"source": source,
"source_len": len(text),
"cap_tokens": CAP_TOKENS,
"generated_at": now_iso,
"boundaries": boundaries,
}
(out / "boundaries_template.json").write_text(
json.dumps(template, ensure_ascii=False, indent=2), encoding="utf-8"
)
files.append("boundaries_template.json")
# 유인 세션용 작업 안내
readme = f"""# doc {doc_id} 유인 분할 안내
1. overview.md 구조 파악 (유닛 제안 + hier 개요).
2. oversized_*.md 본문을 읽고 의미 경계를 정한다.
- 파일명 `..._<cs>_<ce>.md` cs = 파일 문자의 절대 오프셋.
- 절대 오프셋 = cs + 파일 로컬 오프셋.
3. boundaries_template.json todo 항목을 cap({CAP_TOKENS:,} tok) 이하 경계 여러 개로
교체하고 todo 키를 제거한다. 나머지 자동 pack 항목은 그대로 둬도 된다.
- 토큰 추정: 한글 0.529 tok/ · 기타 0.217 tok/ (services/summarize_units.py).
- 규칙: start 단조증가 · 비중첩 · 전체 커버리지 {OVERRIDE_MIN_COVERAGE_PCT:.0f}%+ · 유닛당 cap 이하.
4. 검증/적용:
python /app/scripts/presegment_attended.py apply --doc {doc_id} --boundaries <FILE> --dry-run
python /app/scripts/presegment_attended.py apply --doc {doc_id} --boundaries <FILE>
"""
(out / "README.md").write_text(readme, encoding="utf-8")
files.append("README.md")
print(f"doc {doc_id} [{row['title'] or '(제목 없음)'}] export 완료 → {out}")
print(
f" source={source} len={len(text):,}자 tokens={plan.total_est_tokens:,} "
f"tier={plan.tier} over%={plan.over_pct}"
)
print(f" 자동 pack 유닛 {len(plan.units) - len(oversized_units)}개 / TODO(초과) {len(oversized_units)}")
print(f" 파일 {len(files)}개: {', '.join(files[:6])}{' ...' if len(files) > 6 else ''}")
_jsonl({
"cmd": "export", "ok": True, "doc_id": doc_id, "out": str(out),
"source": source, "source_len": len(text),
"total_est_tokens": plan.total_est_tokens, "tier": plan.tier,
"over_pct": plan.over_pct,
"units_auto": len(plan.units) - len(oversized_units),
"units_todo": len(oversized_units),
"files": files,
})
return 0
# ─── apply ───────────────────────────────────────────────────────────────────
QUEUE_ROW_SQL = """
SELECT id, status, attempts, payload::text AS payload_text
FROM processing_queue
WHERE document_id = :d AND stage = 'deep_summary'
AND status IN ('pending', 'processing', 'failed')
ORDER BY id DESC
LIMIT 1
"""
APPLY_UPDATE_SQL = """
UPDATE processing_queue
SET payload = CAST(:payload AS JSONB),
status = 'pending',
attempts = 0,
error_message = NULL
WHERE id = :qid
"""
async def cmd_apply(doc_id: int, boundaries_file: str, dry_run: bool) -> int:
raw = json.loads(Path(boundaries_file).read_text(encoding="utf-8"))
if isinstance(raw, dict):
boundaries = raw.get("boundaries") or []
declared_source = raw.get("source")
declared_len = raw.get("source_len")
if raw.get("doc_id") not in (None, doc_id):
print(f"[error] boundaries 파일 doc_id={raw.get('doc_id')} != --doc {doc_id}")
_jsonl({"cmd": "apply", "ok": False, "doc_id": doc_id, "error": "doc_id_mismatch"})
return 1
else:
boundaries, declared_source, declared_len = raw, None, None
engine, factory = _session_factory()
try:
async with factory() as session:
doc = (await session.execute(
sql_text(
"SELECT id, title, md_content, extracted_text FROM documents WHERE id = :d"
),
{"d": doc_id},
)).mappings().first()
if not doc:
print(f"[error] 문서 id={doc_id} 없음")
_jsonl({"cmd": "apply", "ok": False, "doc_id": doc_id, "error": "document_not_found"})
return 1
qrow = (await session.execute(sql_text(QUEUE_ROW_SQL), {"d": doc_id})).mappings().first()
if not qrow:
print(f"[error] doc {doc_id} 의 활성 deep_summary 큐 행 없음 (pending/processing/failed)")
_jsonl({"cmd": "apply", "ok": False, "doc_id": doc_id, "error": "queue_row_not_found"})
return 1
if qrow["status"] == "processing":
print(f"[error] queue {qrow['id']} 가 processing 중 — 워커 완료/보류 후 재시도")
_jsonl({"cmd": "apply", "ok": False, "doc_id": doc_id, "error": "queue_processing"})
return 1
# 오프셋 기준 텍스트 — export 와 동일 규칙 (파일에 source 선언 시 그 선언 우선)
if declared_source in ("md_content", "extracted_text"):
source = declared_source
text = (doc["md_content"] if source == "md_content" else doc["extracted_text"]) or ""
else:
source, text = choose_override_source(doc["md_content"], doc["extracted_text"])
if declared_len is not None and declared_len != len(text):
print(
f"[error] source_len 불일치 — 파일={declared_len:,} vs 현재 {source}={len(text):,}"
" (본문 재생성됨 — export 부터 재실행)"
)
_jsonl({"cmd": "apply", "ok": False, "doc_id": doc_id, "error": "source_len_mismatch"})
return 1
check = validate_override_boundaries(text, boundaries)
for w in check.warnings:
print(f" [warn] {w}")
if not check.ok:
print(f"[error] 경계 검증 실패 — {len(check.errors)}건:")
for e in check.errors:
print(f" - {e}")
_jsonl({
"cmd": "apply", "ok": False, "doc_id": doc_id,
"error": "validation_failed", "errors": check.errors,
"warnings": check.warnings, "coverage_pct": check.coverage_pct,
})
return 1
print(
f"doc {doc_id} [{doc['title'] or '(제목 없음)'}] 경계 검증 통과 — "
f"유닛 {len(check.boundaries)}개 / 커버리지 {check.coverage_pct}% / "
f"최대 유닛 {max(check.unit_tokens):,} tok (cap {CAP_TOKENS:,})"
)
for i, ((s, e, t), tok) in enumerate(zip(check.boundaries, check.unit_tokens)):
print(f" 유닛 {i}: [{s}, {e}) {tok:,} tok — {t or '(무제)'}")
payload = json.loads(qrow["payload_text"] or "{}")
preseg = dict(payload.get("presegment") or {})
preseg["units_override"] = {
"source": source,
"source_len": len(text),
"boundaries": [[s, e, t] for s, e, t in check.boundaries],
"applied_at": datetime.now(timezone.utc).isoformat(timespec="seconds"),
}
preseg["awaiting_split"] = False
# 알람 dedupe 리셋(다음 이벤트는 신선하게 발화) + 이전 거부/맵 잔재 제거
for k in ("alerted_at", "override_rejected", "override_rejected_at", "map_results"):
preseg.pop(k, None)
payload["presegment"] = preseg
payload.pop("deferred_until", None) # 즉시 재개
if dry_run:
print(f" [dry-run] queue {qrow['id']} 미변경 — 위 경계로 적용 가능")
_jsonl({
"cmd": "apply", "ok": True, "dry_run": True, "doc_id": doc_id,
"queue_id": qrow["id"], "units": len(check.boundaries),
"coverage_pct": check.coverage_pct, "unit_tokens": check.unit_tokens,
})
return 0
await session.execute(
sql_text(APPLY_UPDATE_SQL),
{"payload": json.dumps(payload, ensure_ascii=False), "qid": qrow["id"]},
)
await session.commit()
print(
f" 적용 완료 — queue {qrow['id']} status=pending, deferred_until 제거 "
f"(다음 queue_consumer 사이클에 재개)"
)
_jsonl({
"cmd": "apply", "ok": True, "dry_run": False, "doc_id": doc_id,
"queue_id": qrow["id"], "units": len(check.boundaries),
"coverage_pct": check.coverage_pct, "unit_tokens": check.unit_tokens,
})
return 0
finally:
await engine.dispose()
# ─── main ────────────────────────────────────────────────────────────────────
def main() -> int:
parser = argparse.ArgumentParser(description="presegment 유인 분할 CLI (PR3)")
sub = parser.add_subparsers(dest="cmd", required=True)
sub.add_parser("list", help="awaiting_split 보류 문서 목록")
p_export = sub.add_parser("export", help="유인 분할 작업 패키지 덤프")
p_export.add_argument("--doc", type=int, required=True)
p_export.add_argument("--out", default=None, help="출력 디렉토리 (기본 ./preseg_export_<doc>)")
p_apply = sub.add_parser("apply", help="완성된 boundaries 검증·적용(재개)")
p_apply.add_argument("--doc", type=int, required=True)
p_apply.add_argument("--boundaries", required=True, help="boundaries JSON 파일 경로")
p_apply.add_argument("--dry-run", action="store_true", help="검증만 하고 DB 미변경")
args = parser.parse_args()
if args.cmd == "list":
return asyncio.run(cmd_list())
if args.cmd == "export":
out = args.out or f"./preseg_export_{args.doc}"
return asyncio.run(cmd_export(args.doc, out))
if args.cmd == "apply":
return asyncio.run(cmd_apply(args.doc, args.boundaries, args.dry_run))
return 2
if __name__ == "__main__":
sys.exit(main())
+486
View File
@@ -0,0 +1,486 @@
"""presegment PR3 — HOLD 알람·units_override 재개·유인 분할 경계 검증 단위테스트.
test_deep_summary_mapreduce.py (PR2) 선례를 따르는 seam 단위 검증:
- services.alerts.send_alert: env 미설정 no-op(프로세스당 1 로그)·synochat/ntfy
포맷·실패 절대 raise 금지 (webhook 전부 fake 실호출 0).
- _hold_awaiting_split: 알람 발화 + alerted_at dedupe(7).
- validate_override_boundaries: 정상/중첩/캡초과/커버리지 부족/TODO 잔존/범위 .
- process(): units_override 존재 tier 판정(plan_summarize_units) 우회
map-reduce 재개 / 잘못된 override -HOLD + 알람 / override 없는 문서 무회귀.
"""
from __future__ import annotations
import json
import os
import sys
from datetime import datetime, timedelta, timezone
from types import SimpleNamespace
import pytest
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "app"))
import httpx # noqa: E402
import services.alerts as alerts # noqa: E402
from ai.envelope import EscalationEnvelope # noqa: E402
from models.queue import StageDeferred # noqa: E402
from services.summarize_units import ( # noqa: E402
CAP_TOKENS,
choose_override_source,
estimate_tokens,
extract_leaves,
leaf_spans,
plan_summarize_units,
units_from_boundaries,
validate_override_boundaries,
)
import workers.deep_summary_worker as dsw # noqa: E402
# ─── fixtures (PR2 테스트와 동일 계열) ───────────────────────────────────────
# 헤딩 1개 + 한글 60,000자 단일 섹션 ≈ 31.7K tok (> CAP) → over% 100 → whole (HOLD 대상)
GIANT_WHOLE_MD = "# 통짜\n" + ("" * 60_000)
# trigger(25K tok) 이하 소형 문서 — 기존 단일콜 경로 (무회귀 확인용)
SMALL_MD = "# 소형\n" + ("" * 2_000)
MAP_JSON = (
'{"mode": "single", "tldr": "유닛 요약", "detail": "유닛 상세.",'
' "inconsistencies": [], "confidence": 0.9}'
)
REDUCE_JSON = (
'{"mode": "single", "tldr": "전체 요약", "detail": "최종 상세.",'
' "inconsistencies": [], "confidence": 0.8}'
)
class FakeSession:
def __init__(self, row=None):
self.commits = 0
self._row = row
async def commit(self):
self.commits += 1
class FakeProcSession(FakeSession):
"""process() 레벨 — session.get(Document) + execute(select queue_row) fake."""
def __init__(self, doc, row):
super().__init__(row)
self._doc = doc
async def get(self, model, pk):
return self._doc
async def execute(self, stmt):
row = self._row
return SimpleNamespace(scalar_one_or_none=lambda: row)
class FakeClient:
"""deep 슬롯 보유 — call_deep_or_defer 가 call_deep 을 탄다 (PR2 테스트 동일)."""
def __init__(self):
self.ai = SimpleNamespace(
deep=SimpleNamespace(model="qwen-macbook", context_char_limit=260_000)
)
self.prompts: list[str] = []
async def call_deep(self, prompt: str, system=None) -> str:
self.prompts.append(prompt)
if "유닛 요약 (총" in prompt: # reduce 프롬프트 마커
return REDUCE_JSON
return MAP_JSON
async def close(self):
pass
def _doc(text: str = GIANT_WHOLE_MD, md_content: str | None = None):
return SimpleNamespace(
id=999,
title="테스트 문서",
extracted_text=text,
md_content=md_content,
ai_detail_summary=None,
ai_inconsistencies=None,
ai_analysis_tier="triage",
ai_processed_at=None,
)
def _envelope_raw():
return {
"from_stage": "classify",
"escalation_reasons": ["long_context"],
"risk_flags": [],
"distilled_context": "4B 요지",
"original_pointers": {"doc_ids": [999]},
}
def _envelope():
return EscalationEnvelope.from_json(json.dumps(_envelope_raw()))
@pytest.fixture
def _patch_telemetry(monkeypatch):
events: list[dict] = []
async def fake_record(**kwargs):
events.append(kwargs)
monkeypatch.setattr(dsw, "record_analyze_event", fake_record)
return events
@pytest.fixture
def _alert_recorder(monkeypatch):
calls: list[tuple[str, str]] = []
async def fake_alert(title: str, message: str) -> bool:
calls.append((title, message))
return True
monkeypatch.setattr(dsw, "send_alert", fake_alert)
return calls
# ─── A. services.alerts ──────────────────────────────────────────────────────
class _FakeHttpxClient:
"""httpx.AsyncClient 대체 — post 호출 캡처. 실호출 0."""
captured: list[dict] = []
fail_with: Exception | None = None
status_code = 200
def __init__(self, timeout=None):
self.timeout = timeout
async def __aenter__(self):
return self
async def __aexit__(self, *exc):
return False
async def post(self, url, **kwargs):
if _FakeHttpxClient.fail_with is not None:
raise _FakeHttpxClient.fail_with
_FakeHttpxClient.captured.append({"url": url, **kwargs})
return SimpleNamespace(status_code=_FakeHttpxClient.status_code, text="ok")
@pytest.fixture
def _fake_httpx(monkeypatch):
_FakeHttpxClient.captured = []
_FakeHttpxClient.fail_with = None
_FakeHttpxClient.status_code = 200
monkeypatch.setattr(alerts.httpx, "AsyncClient", _FakeHttpxClient)
return _FakeHttpxClient
@pytest.mark.asyncio
async def test_send_alert_noop_without_env(monkeypatch, _fake_httpx):
monkeypatch.delenv("ALERT_WEBHOOK_URL", raising=False)
monkeypatch.setattr(alerts, "_noop_logged", False)
infos: list[str] = []
monkeypatch.setattr(alerts.logger, "info", lambda msg, *a, **k: infos.append(msg))
assert await alerts.send_alert("t", "m") is False
assert await alerts.send_alert("t", "m") is False
assert len(infos) == 1 # 프로세스당 1회만 로그
assert _fake_httpx.captured == [] # webhook 미호출
@pytest.mark.asyncio
async def test_send_alert_synochat_format(monkeypatch, _fake_httpx):
monkeypatch.setenv("ALERT_WEBHOOK_URL", "http://chat.local/webhook")
monkeypatch.delenv("ALERT_WEBHOOK_KIND", raising=False) # 기본 = synochat
assert await alerts.send_alert("제목", "본문 줄1\n줄2") is True
assert len(_fake_httpx.captured) == 1
call = _fake_httpx.captured[0]
assert call["url"] == "http://chat.local/webhook"
payload = json.loads(call["data"]["payload"])
assert payload == {"text": "제목\n본문 줄1\n줄2"}
@pytest.mark.asyncio
async def test_send_alert_ntfy_format(monkeypatch, _fake_httpx):
monkeypatch.setenv("ALERT_WEBHOOK_URL", "http://ntfy.local/ds")
monkeypatch.setenv("ALERT_WEBHOOK_KIND", "ntfy")
assert await alerts.send_alert("한글 제목", "알람 본문") is True
call = _fake_httpx.captured[0]
# 한글 제목은 헤더(latin-1 한정) 대신 query param
assert call["params"] == {"title": "한글 제목"}
assert call["content"] == "알람 본문".encode("utf-8")
@pytest.mark.asyncio
async def test_send_alert_failure_never_raises(monkeypatch, _fake_httpx):
monkeypatch.setenv("ALERT_WEBHOOK_URL", "http://chat.local/webhook")
_fake_httpx.fail_with = httpx.ConnectError("down")
assert await alerts.send_alert("t", "m") is False # raise 없이 False
_fake_httpx.fail_with = None
_fake_httpx.status_code = 500
assert await alerts.send_alert("t", "m") is False # HTTP 5xx 도 False
# ─── B. HOLD 알람 + alerted_at dedupe ───────────────────────────────────────
@pytest.mark.asyncio
async def test_hold_alerts_once_then_dedupes(_alert_recorder):
plan = plan_summarize_units(GIANT_WHOLE_MD)
assert plan.tier == "whole"
row = SimpleNamespace(payload={"envelope": {"x": 1}})
session = FakeSession()
# 1차 HOLD — 알람 발화 + alerted_at 기록
with pytest.raises(StageDeferred):
await dsw._hold_awaiting_split(session, row, plan, 999, doc_title="테스트 문서")
assert len(_alert_recorder) == 1
title, message = _alert_recorder[0]
assert "999" in title
assert "테스트 문서" in message and "whole" in message
assert f"export --doc 999" in message # 유인 분할 힌트
alerted_at = row.payload["presegment"]["alerted_at"]
assert datetime.fromisoformat(alerted_at)
# 2차 재보류(24h 후 재계획 시나리오) — 7일 이내 → 재알람 억제
with pytest.raises(StageDeferred):
await dsw._hold_awaiting_split(session, row, plan, 999, doc_title="테스트 문서")
assert len(_alert_recorder) == 1
assert row.payload["presegment"]["alerted_at"] == alerted_at # 미갱신
# 3차 — alerted_at 이 8일 전이면 재발화 + 갱신
stale = (datetime.now(timezone.utc) - timedelta(days=8)).isoformat()
payload = dict(row.payload)
payload["presegment"] = {**payload["presegment"], "alerted_at": stale}
row.payload = payload
with pytest.raises(StageDeferred):
await dsw._hold_awaiting_split(session, row, plan, 999, doc_title="테스트 문서")
assert len(_alert_recorder) == 2
assert row.payload["presegment"]["alerted_at"] != stale
# ─── C. validate_override_boundaries ────────────────────────────────────────
def test_validate_ok_full_coverage():
text = "" * 40_000 # ≈ 21,160 tok
bounds = [[0, 20_000, "전반"], [20_000, 40_000, "후반"]]
check = validate_override_boundaries(text, bounds)
assert check.ok and check.errors == []
assert check.coverage_pct == 100.0
assert check.boundaries == [(0, 20_000, "전반"), (20_000, 40_000, "후반")]
assert all(t <= CAP_TOKENS for t in check.unit_tokens)
def test_validate_dict_form_and_units_from_boundaries():
text = "" * 10_000
bounds = [{"start": 0, "end": 5_000, "title": "a"}, {"start": 5_000, "end": 10_000, "title": "b"}]
check = validate_override_boundaries(text, bounds)
assert check.ok
units = units_from_boundaries(text, check.boundaries)
assert [u.index for u in units] == [0, 1]
assert units[0].text == text[0:5_000]
assert units[0].section_titles == ["a"]
assert units[0].est_tokens == estimate_tokens(text[0:5_000])
def test_validate_rejects_overlap():
text = "" * 10_000
check = validate_override_boundaries(text, [[0, 6_000, "a"], [5_000, 10_000, "b"]])
assert not check.ok
assert any("중첩" in e for e in check.errors)
def test_validate_rejects_cap_exceed():
text = "" * 40_000 # 단일 유닛 ≈ 21,160 tok > CAP 12,000
check = validate_override_boundaries(text, [[0, 40_000, "통짜"]])
assert not check.ok
assert any("cap" in e and "유닛 0" in e for e in check.errors) # 어느 유닛인지 명시
def test_validate_rejects_low_coverage():
text = "" * 10_000
check = validate_override_boundaries(text, [[0, 1_000, "머리만"]])
assert not check.ok
assert any("커버리지" in e for e in check.errors)
def test_validate_rejects_out_of_range_and_todo():
text = "" * 1_000
check = validate_override_boundaries(text, [[0, 2_000, ""]])
assert not check.ok and any("범위 밖" in e for e in check.errors)
check2 = validate_override_boundaries(
text, [{"start": 0, "end": 1_000, "title": "t", "todo": "분할 필요"}]
)
assert not check2.ok and any("TODO" in e for e in check2.errors)
def test_validate_warns_on_gap_but_passes_coverage():
text = "" * 100_000
# 4% 공백 구간 — 커버리지 96% (>=90) 통과 + 경고
bounds = [[0, 20_000, "a"], [24_000, 100_000, None]]
# 24000~100000 = 76000자 ≈ 40,204 tok > CAP → cap 완화해 gap 경고만 검증
check = validate_override_boundaries(text, bounds, cap=50_000)
assert check.ok
assert any("공백 구간" in w for w in check.warnings)
def test_choose_override_source_prefers_md_content():
assert choose_override_source("# md 본문", "추출본") == ("md_content", "# md 본문")
assert choose_override_source(" \n", "추출본") == ("extracted_text", "추출본")
assert choose_override_source(None, None) == ("extracted_text", "")
def test_leaf_spans_reconstruct_source():
md = "# 절 1\n" + "" * 100 + "\n# 절 2\n" + "" * 100
leaves = extract_leaves(md)
spans = leaf_spans(md, leaves)
assert len(spans) == len(leaves)
for (s, e), leaf in zip(spans, leaves):
assert md[s:e] == leaf.text
# 인접 파티션 — 이어붙이면 원문 전체
assert spans[0][0] == 0 and spans[-1][1] == len(md)
# ─── D. units_override 재개 경로 (worker process 레벨) ──────────────────────
def _override_payload(text: str, n_units: int = 3, source: str = "extracted_text") -> dict:
step = len(text) // n_units
bounds = []
for i in range(n_units):
s = i * step
e = len(text) if i == n_units - 1 else (i + 1) * step
bounds.append([s, e, f"파트 {i + 1}"])
return {
"envelope": _envelope_raw(),
"subject_domain": "generic",
"presegment": {
"tier": "whole", "over_pct": 61.46, "awaiting_split": False,
"units_override": {
"source": source, "source_len": len(text), "boundaries": bounds,
},
},
}
@pytest.mark.asyncio
async def test_units_override_bypasses_tier_gate(monkeypatch, _patch_telemetry, _alert_recorder):
doc = _doc(GIANT_WHOLE_MD) # override 없으면 whole → HOLD 였을 문서
row = SimpleNamespace(payload=_override_payload(GIANT_WHOLE_MD, n_units=3))
session = FakeProcSession(doc, row)
client = FakeClient()
monkeypatch.setattr(dsw, "AIClient", lambda: client)
def _boom(*a, **k):
raise AssertionError("units_override 는 plan_summarize_units(tier 재판정)를 타면 안 됨")
monkeypatch.setattr(dsw, "plan_summarize_units", _boom)
await dsw.process(999, session)
# map 3 + reduce 1, 모든 콜 캡 준수 (오버헤드 = 정책 템플릿+envelope ~3K)
assert len(client.prompts) == 4
for p in client.prompts:
assert estimate_tokens(p) <= CAP_TOKENS + 3_000
assert doc.ai_detail_summary == "최종 상세."
assert doc.ai_analysis_tier == "deep"
preseg = row.payload["presegment"]
assert preseg["tier"] == "override"
assert preseg["over_pct"] == 61.46 # HOLD 당시 실측치 보존
assert len(preseg["map_results"]) == 3
assert preseg["units_override"]["boundaries"][0][2] == "파트 1" # override 보존(감사)
assert _alert_recorder == [] # 정상 재개 — 알람 없음
assert len(_patch_telemetry) == 1
@pytest.mark.asyncio
async def test_bad_override_reholds_and_alerts(monkeypatch, _patch_telemetry, _alert_recorder):
doc = _doc(GIANT_WHOLE_MD)
payload = _override_payload(GIANT_WHOLE_MD, n_units=1) # 단일 유닛 ≈ 31.7K tok > CAP*1.1
row = SimpleNamespace(payload=payload)
session = FakeProcSession(doc, row)
def _no_llm():
raise AssertionError("잘못된 override 는 LLM 콜(900s 재생산)로 흐르면 안 됨")
monkeypatch.setattr(dsw, "AIClient", _no_llm)
with pytest.raises(StageDeferred) as ei:
await dsw.process(999, session)
assert ei.value.retry_after_minutes == dsw.HOLD_RETRY_MINUTES
preseg = row.payload["presegment"]
assert preseg["awaiting_split"] is True # 재-HOLD
assert "cap" in preseg["override_rejected"]
assert "units_override" in preseg # 원인 조사용 보존
assert len(_alert_recorder) == 1 # 거부 알람
assert "거부" in _alert_recorder[0][0]
assert doc.ai_detail_summary is None
assert _patch_telemetry == []
@pytest.mark.asyncio
async def test_override_source_len_mismatch_reholds(monkeypatch, _alert_recorder):
doc = _doc(GIANT_WHOLE_MD)
payload = _override_payload(GIANT_WHOLE_MD, n_units=3)
payload["presegment"]["units_override"]["source_len"] = 12_345 # 본문 재생성 시나리오
row = SimpleNamespace(payload=payload)
session = FakeProcSession(doc, row)
monkeypatch.setattr(dsw, "AIClient", lambda: (_ for _ in ()).throw(AssertionError("no LLM")))
with pytest.raises(StageDeferred):
await dsw.process(999, session)
assert row.payload["presegment"]["awaiting_split"] is True
assert "source_len 불일치" in row.payload["presegment"]["override_rejected"]
assert len(_alert_recorder) == 1
@pytest.mark.asyncio
async def test_no_override_small_doc_keeps_single_call_path(
monkeypatch, _patch_telemetry, _alert_recorder
):
"""무회귀 — units_override 없는 소형 문서는 기존 단일콜 경로 그대로."""
doc = _doc(SMALL_MD)
row = SimpleNamespace(payload={"envelope": _envelope_raw(), "subject_domain": "generic"})
session = FakeProcSession(doc, row)
client = FakeClient()
monkeypatch.setattr(dsw, "AIClient", lambda: client)
await dsw.process(999, session)
assert len(client.prompts) == 1 # 단일콜 (map-reduce 아님)
assert SMALL_MD[:200].split("\n")[1][:50] in client.prompts[0] # 원문 슬라이스 포함
assert doc.ai_detail_summary == "유닛 상세."
assert "presegment" not in row.payload # payload 무변경
assert _alert_recorder == []
@pytest.mark.asyncio
async def test_no_override_whole_doc_still_holds_with_alert(
monkeypatch, _patch_telemetry, _alert_recorder
):
"""override 미주입 whole 문서 — 기존 HOLD 시멘틱 유지 + PR3 알람만 추가."""
doc = _doc(GIANT_WHOLE_MD)
row = SimpleNamespace(payload={"envelope": _envelope_raw(), "subject_domain": "generic"})
session = FakeProcSession(doc, row)
monkeypatch.setattr(dsw, "AIClient", lambda: (_ for _ in ()).throw(AssertionError("no LLM")))
with pytest.raises(StageDeferred):
await dsw.process(999, session)
preseg = row.payload["presegment"]
assert preseg["awaiting_split"] is True and preseg["tier"] == "whole"
assert len(_alert_recorder) == 1
assert "HOLD" in _alert_recorder[0][0]