c2077b3108
plan ds-presegment-mapreduce-2. TRIGGER(25K tok) 이하 = 기존 단일콜 byte-불변 무회귀. 초과 시 3-way over% 게이트: auto=유닛별 map(26B)→reduce(26B, p3c_deep_summary_reduce 변형) → ai_detail_summary 동일 기록(불일치=reduce+map 합본 dedup) / hybrid·whole= HOLD(payload.presegment.awaiting_split + StageDeferred 24h, 맥미니 미전송 — 알람· 클로드 유인 분할은 PR3). - 유닛 단위 멱등 재개: 성공 유닛 즉시 payload.map_results commit — 502/defer/재시작 후 완료 유닛 skip, 실패 유닛만 raise→기존 attempts/백오프 재사용 - 모든 LLM 콜 캡(12K tok) 이하 — map=greedy-pack 보장, reduce=build_reduce_units_block 비례 절단 보장, est_tokens 로그로 단정 가능 - 콜 사이 gate 해제 → 짧은 인터랙티브 요청 interleave (허브 굶김 해소 본체) - fix: summarize_units 의 `from app.services...` 절대 import — 컨테이너(빌드 컨텍스트 ./app)에 app 패키지가 없어 배선 시 ModuleNotFoundError 나는 PR1 잠복 버그 → 상대 import 로 수정 (컨테이너/repo-root 테스트 양쪽 동작) - tests: 헬퍼 6 + worker seam 5 (map-reduce e2e·재개·유닛실패·drain 보류·HOLD) — PR1 15 포함 26 passed, 인접 policy/hier_decomp/fair_share 123 passed Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
225 lines
8.5 KiB
Python
225 lines
8.5 KiB
Python
"""summarize_units — 거대문서 요약 전용 분할(map-reduce 유닛) 순수함수 (presegment PR1).
|
|
|
|
plan ds-presegment-mapreduce-2 (2026-06-29 설계 합의 · PR0 실측 봉인):
|
|
- CAP_TOKENS = 12,000 tok/unit — greedy-pack 상한 (PR0: giant 236건 실측 캘리브레이션)
|
|
- TRIGGER_TOKENS = 25,000 tok — 이하는 단일콜 유지, 초과 시 map-reduce
|
|
- 3-way over% 게이트 (단독 CAP 초과 섹션의 토큰 비중. 헤딩 개수는 무의미 — ASME 1,494개):
|
|
over% == 0 → 'auto' (TIER1: 로컬 자동 분할, PR0 실측 78%)
|
|
0 < over% <= 40 → 'hybrid' (패킹분 로컬 + 초과 섹션만 클로드, 8%)
|
|
over% > 40 → 'whole' (TIER2: 클로드 전체 분할, 14%)
|
|
- 토큰 추정 = PR0 실 Qwen 토크나이저 캘리브레이션: 한글 0.529 tok/char · 기타 0.217.
|
|
구 휴리스틱(0.625/0.25)은 ~15% 과대라 폐기.
|
|
|
|
불변식:
|
|
- 순수함수 — DB/네트워크/파일 접촉 0. 분할 = 요약 전용 아티팩트(문서 아님·검색/임베딩 미편입).
|
|
- leaf 추출 = hier_decomp.builder 재사용, leaf_hard_max=∞ 로 window-split 억제
|
|
(헤딩 leaf 만 — PR0 측정환경과 동일). 인접 섹션만 greedy-pack(순서 보존·중간 폐기 0
|
|
— 구 deep_summary 의 head/mid/tail 가운데 폐기 버그를 커버리지로 대체).
|
|
- 배선(deep_summary 분기·HOLD·클로드 알람)은 PR2/PR3 — 본 모듈은 계획만 산출.
|
|
|
|
호출: plan_summarize_units(md_text) -> UnitPlan
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import sys
|
|
from dataclasses import dataclass, field
|
|
|
|
# 상대 import — 컨테이너(services.*)와 repo-root 테스트(app.services.*) 양쪽에서 동작.
|
|
# (구 `from app.services...` 절대 import 는 컨테이너에 app 패키지가 없어 ModuleNotFoundError —
|
|
# PR1 은 소비자 0 이라 잠복했던 버그, PR2 배선 시점에 수정.)
|
|
from .hier_decomp.builder import HierNode, build_hier_tree
|
|
|
|
CAP_TOKENS = 12_000
|
|
TRIGGER_TOKENS = 25_000
|
|
HYBRID_MAX_OVER_PCT = 40.0
|
|
|
|
# PR0 실 Qwen tokenizer 캘리브레이션 (tok/char)
|
|
KO_TOK_PER_CHAR = 0.529
|
|
OTHER_TOK_PER_CHAR = 0.217
|
|
|
|
_HANGUL_RANGES = (
|
|
(0xAC00, 0xD7A3), # 완성형 음절
|
|
(0x1100, 0x11FF), # 자모
|
|
(0x3130, 0x318F), # 호환 자모
|
|
)
|
|
|
|
|
|
def _is_hangul(ch: str) -> bool:
|
|
cp = ord(ch)
|
|
return any(lo <= cp <= hi for lo, hi in _HANGUL_RANGES)
|
|
|
|
|
|
def estimate_tokens(text: str) -> int:
|
|
"""PR0 캘리브레이션 기반 토큰 추정 (한글 0.529 · 기타 0.217 tok/char)."""
|
|
if not text:
|
|
return 0
|
|
ko = sum(1 for ch in text if _is_hangul(ch))
|
|
other = len(text) - ko
|
|
return round(ko * KO_TOK_PER_CHAR + other * OTHER_TOK_PER_CHAR)
|
|
|
|
|
|
@dataclass
|
|
class SummarizeUnit:
|
|
"""map-reduce 1유닛 — 인접 leaf 섹션들의 greedy-pack (요약 전용, 문서 아님)."""
|
|
index: int
|
|
section_titles: list[str | None] = field(default_factory=list)
|
|
text: str = ""
|
|
est_tokens: int = 0
|
|
over_cap: bool = False # 단독 섹션이 CAP 초과 (hybrid 시 클로드 대상)
|
|
|
|
|
|
@dataclass
|
|
class UnitPlan:
|
|
mode: str # 'single' | 'map_reduce'
|
|
tier: str | None # map_reduce 시 'auto' | 'hybrid' | 'whole'
|
|
total_est_tokens: int = 0
|
|
over_pct: float = 0.0
|
|
units: list[SummarizeUnit] = field(default_factory=list)
|
|
|
|
|
|
def extract_leaves(md_text: str) -> list[HierNode]:
|
|
"""헤딩 leaf 만 추출 — leaf_hard_max=∞ 로 window-split 억제 (PR0 측정환경 동일)."""
|
|
nodes = build_hier_tree(
|
|
md_text,
|
|
leaf_target_max=sys.maxsize,
|
|
leaf_hard_max=sys.maxsize,
|
|
)
|
|
return [n for n in nodes if n.is_leaf]
|
|
|
|
|
|
def greedy_pack(leaves: list[HierNode], cap: int = CAP_TOKENS) -> list[SummarizeUnit]:
|
|
"""인접 leaf 를 순서 보존하며 est_tokens<=cap 으로 pack. 단독 초과 leaf = 전용 유닛(over_cap)."""
|
|
units: list[SummarizeUnit] = []
|
|
cur_titles: list[str | None] = []
|
|
cur_texts: list[str] = []
|
|
cur_tokens = 0
|
|
|
|
def _flush() -> None:
|
|
nonlocal cur_titles, cur_texts, cur_tokens
|
|
if cur_texts:
|
|
units.append(SummarizeUnit(
|
|
index=len(units),
|
|
section_titles=cur_titles,
|
|
text="\n\n".join(cur_texts),
|
|
est_tokens=cur_tokens,
|
|
))
|
|
cur_titles, cur_texts, cur_tokens = [], [], 0
|
|
|
|
for leaf in leaves:
|
|
t = estimate_tokens(leaf.text)
|
|
if t > cap:
|
|
_flush()
|
|
units.append(SummarizeUnit(
|
|
index=len(units),
|
|
section_titles=[leaf.section_title],
|
|
text=leaf.text,
|
|
est_tokens=t,
|
|
over_cap=True,
|
|
))
|
|
continue
|
|
if cur_tokens + t > cap:
|
|
_flush()
|
|
cur_titles.append(leaf.section_title)
|
|
cur_texts.append(leaf.text)
|
|
cur_tokens += t
|
|
_flush()
|
|
return units
|
|
|
|
|
|
def over_pct(leaves: list[HierNode], cap: int = CAP_TOKENS) -> float:
|
|
"""단독 CAP 초과 섹션들의 토큰 비중(%) — 3-way 게이트 입력."""
|
|
total = 0
|
|
over = 0
|
|
for leaf in leaves:
|
|
t = estimate_tokens(leaf.text)
|
|
total += t
|
|
if t > cap:
|
|
over += t
|
|
if total == 0:
|
|
return 0.0
|
|
return over * 100.0 / total
|
|
|
|
|
|
def gate(over: float) -> str:
|
|
"""over% → tier. 0=auto / (0,40]=hybrid / >40=whole. 클로드 결과 재검증에도 재사용."""
|
|
if over <= 0.0:
|
|
return "auto"
|
|
if over <= HYBRID_MAX_OVER_PCT:
|
|
return "hybrid"
|
|
return "whole"
|
|
|
|
|
|
def plan_summarize_units(
|
|
md_text: str, *,
|
|
cap: int = CAP_TOKENS,
|
|
trigger: int = TRIGGER_TOKENS,
|
|
) -> UnitPlan:
|
|
"""문서 → 요약 실행 계획. trigger 이하=single(현행 단일콜), 초과=map_reduce(tier+units)."""
|
|
total = estimate_tokens(md_text)
|
|
if total <= trigger:
|
|
return UnitPlan(mode="single", tier=None, total_est_tokens=total)
|
|
leaves = extract_leaves(md_text)
|
|
pct = over_pct(leaves, cap)
|
|
return UnitPlan(
|
|
mode="map_reduce",
|
|
tier=gate(pct),
|
|
total_est_tokens=total,
|
|
over_pct=round(pct, 2),
|
|
units=greedy_pack(leaves, cap),
|
|
)
|
|
|
|
|
|
# ─── PR2 — map/reduce 프롬프트 조립 순수함수 (deep_summary_worker 가 소비) ───
|
|
|
|
def render_map_slice(unit: SummarizeUnit, total_units: int) -> str:
|
|
"""map 콜의 {original_text_slices} 대체 — 유닛 위치·섹션 라벨 + 본문."""
|
|
titles = " · ".join(t for t in unit.section_titles if t) or "(무제 구간)"
|
|
return f"[유닛 {unit.index + 1}/{total_units} — 섹션: {titles}]\n{unit.text}"
|
|
|
|
|
|
def _format_unit_summary(res: dict, total_units: int) -> str:
|
|
"""map 결과 1건 → reduce 입력 블록. res 키 = index/titles/tldr/detail/inconsistencies."""
|
|
titles = " · ".join(t for t in (res.get("titles") or []) if t) or "(무제 구간)"
|
|
lines = [f"[유닛 {int(res.get('index', 0)) + 1}/{total_units} — 섹션: {titles}]"]
|
|
if res.get("tldr"):
|
|
lines.append(f"TLDR: {res['tldr']}")
|
|
if res.get("detail"):
|
|
lines.append(str(res["detail"]))
|
|
for inc in res.get("inconsistencies") or []:
|
|
if isinstance(inc, dict):
|
|
lines.append(f"불일치({inc.get('kind', '')}): {inc.get('desc', '')}")
|
|
return "\n".join(lines)
|
|
|
|
|
|
def build_reduce_units_block(
|
|
results: list[dict],
|
|
budget_tokens: int,
|
|
*,
|
|
min_detail_chars: int = 200,
|
|
) -> tuple[str, bool]:
|
|
"""reduce 입력 블록 조립 — budget_tokens 이하 보장(캡 초과 0 검증 게이트의 reduce 측).
|
|
|
|
초과 시 detail 만 비례 절단(라벨·TLDR·불일치 보전, 원문 순서 유지). 반환 (block, truncated).
|
|
"""
|
|
total_units = len(results)
|
|
work = [dict(r) for r in results]
|
|
truncated = False
|
|
for _ in range(4):
|
|
block = "\n\n".join(_format_unit_summary(r, total_units) for r in work)
|
|
est = estimate_tokens(block)
|
|
if est <= budget_tokens:
|
|
return block, truncated
|
|
ratio = budget_tokens / est
|
|
for r in work:
|
|
detail = str(r.get("detail") or "")
|
|
keep = max(min_detail_chars, int(len(detail) * ratio * 0.9))
|
|
if len(detail) > keep:
|
|
r["detail"] = detail[:keep] + "…(절단)"
|
|
truncated = True
|
|
# 최후 방어 — 비례 절단이 floor(min_detail_chars)에 막히면 문자 하드 컷(KO 최악 비율 가정)
|
|
block = "\n\n".join(_format_unit_summary(r, total_units) for r in work)
|
|
if estimate_tokens(block) > budget_tokens:
|
|
block = block[: max(1, int(budget_tokens / KO_TOK_PER_CHAR))]
|
|
truncated = True
|
|
return block, truncated
|