feat(search): rule hierarchy builder (Hier-Decomp-1 c3)
순수 함수 build_hier_tree(text) → heading 경계 segment 트리 (DB 미접근, c4 에서 insert).
- 경계 규칙: ATX 마크다운(#{1,6}) > 한국 제N장/절/조 > 영문 Chapter/Section/Article.
- segment = heading + 다음 heading 전까지 본문 (disjoint, 100% 커버). parent/level = heading 깊이 정규화 트리.
- 과대 own-text(>HARD_MAX 5000) = 무overlap window 분해(자식 유무 무관), 부모 is_leaf=false(heading 마커, 코퍼스 제외).
- 구조 전용 heading(자식 보유 + own body<30자) = is_leaf=false. is_leaf = replace 코퍼스 편입 대상.
dry-run G2 (insert 없음, 5 pilot + headingless):
- 5140/5186/5225/5151/5124 md_content: coverage 0.9993~1.0, dup_hash 0, empty 0, dangling 0, bad_level 0, leaf_max<=4973(<5000).
- 5152 headingless extracted_text(238k): window 89 leaf, coverage 1.0, dup 0, leaf_max 3000.
관찰: tiny heading-only leaf(7~19자) 잔존(무해, tuning 후보).
plan: hierarchical-decomposition-tiered-nesting-marmot.md
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,220 @@
|
||||
"""Hierarchical decomposition rule builder (PR-DocSrv-Hierarchical-Decomposition-1 c3).
|
||||
|
||||
텍스트(주로 md_content 마크다운) → heading 경계 segment 트리.
|
||||
- 규칙 우선 경계 탐지: ATX 마크다운(#{1,6}) > 한국 구조(제N장/절/조) > 영문(Chapter/Section/Article).
|
||||
- 각 segment = heading 라인 + 다음 heading 전까지 본문 (서로 disjoint, 100% 커버).
|
||||
- parent/level = heading 깊이 기반 네비 트리. preamble(첫 heading 이전) = level 0 root 직속.
|
||||
- 과대 segment(>LEAF_HARD_MAX, 더 깊은 heading 없음) = window fallback: 본문을 무overlap
|
||||
window 로 분해해 child leaf 생성, 부모는 is_leaf=false(heading 만 보유, 코퍼스 제외).
|
||||
- is_leaf = 코퍼스 편입 대상 (replace predicate). window-split 부모만 false.
|
||||
|
||||
순수 함수 — DB 미접근. c4 에서 이 트리를 document_chunks 에 insert(parent_id 해소).
|
||||
"""
|
||||
from __future__ import annotations
|
||||
import re
|
||||
import hashlib
|
||||
from dataclasses import dataclass, field
|
||||
|
||||
STRUCTURE_SPLIT_THRESHOLD = 4000
|
||||
LEAF_TARGET_MAX = 3000
|
||||
LEAF_HARD_MAX = 5000
|
||||
MAX_DEPTH = 6
|
||||
|
||||
# 경계 패턴 (우선순위 순). group 'title' = 표시용, level 은 매처가 결정.
|
||||
_ATX = re.compile(r'^(#{1,6})\s+(?P<title>\S.*?)\s*#*\s*$')
|
||||
_KO_JANG = re.compile(r'^\s*(?P<title>제\s*\d+\s*장\b.*)$')
|
||||
_KO_JEOL = re.compile(r'^\s*(?P<title>제\s*\d+\s*절\b.*)$')
|
||||
_KO_JO = re.compile(r'^\s*(?P<title>제\s*\d+\s*조\b.*)$')
|
||||
_ENG = re.compile(r'^\s*(?P<title>(?:Chapter|Section|Article|Part|PART)\s+[\dIVXLA-Z]+\b.*)$')
|
||||
|
||||
|
||||
@dataclass
|
||||
class HierNode:
|
||||
idx: int
|
||||
parent_idx: int | None
|
||||
level: int
|
||||
node_type: str | None
|
||||
section_title: str | None
|
||||
heading_path: str | None
|
||||
text: str
|
||||
is_leaf: bool = True
|
||||
chunk_content_hash: str = field(default="")
|
||||
|
||||
def finalize_hash(self):
|
||||
self.chunk_content_hash = hashlib.sha256(self.text.encode("utf-8")).hexdigest()
|
||||
|
||||
|
||||
def _detect_heading(line: str) -> tuple[int, str, str] | None:
|
||||
"""(level, title, node_type) 또는 None. level 은 상대 깊이."""
|
||||
m = _ATX.match(line)
|
||||
if m:
|
||||
return (len(m.group(1)), m.group("title").strip(), None) # node_type 은 후처리에서
|
||||
for pat, lvl, nt in ((_KO_JANG, 1, "chapter"), (_KO_JEOL, 2, "section"),
|
||||
(_KO_JO, 3, "clause"), (_ENG, 1, "chapter")):
|
||||
m = pat.match(line)
|
||||
if m:
|
||||
return (lvl, m.group("title").strip()[:200], nt)
|
||||
return None
|
||||
|
||||
|
||||
def _segment(text: str) -> list[tuple[int, str | None, str | None, str]]:
|
||||
"""heading 경계로 분할 → [(level, title, node_type, segment_text), ...].
|
||||
|
||||
preamble(첫 heading 이전 본문) = (0, None, None, text).
|
||||
"""
|
||||
lines = text.splitlines(keepends=True)
|
||||
segs: list[tuple[int, str | None, str | None, list[str]]] = []
|
||||
cur: tuple[int, str | None, str | None, list[str]] | None = None
|
||||
preamble: list[str] = []
|
||||
for ln in lines:
|
||||
h = _detect_heading(ln.rstrip("\n"))
|
||||
if h:
|
||||
if cur is not None:
|
||||
segs.append(cur)
|
||||
elif preamble and "".join(preamble).strip():
|
||||
segs.append((0, None, None, preamble))
|
||||
cur = (h[0], h[1], h[2], [ln])
|
||||
else:
|
||||
if cur is None:
|
||||
preamble.append(ln)
|
||||
else:
|
||||
cur[3].append(ln)
|
||||
if cur is not None:
|
||||
segs.append(cur)
|
||||
elif preamble and "".join(preamble).strip():
|
||||
segs.append((0, None, None, preamble))
|
||||
return [(lvl, title, nt, "".join(body)) for (lvl, title, nt, body) in segs]
|
||||
|
||||
|
||||
def _window_split(body: str, target: int) -> list[str]:
|
||||
"""무overlap, 문단 우선 window 분해 (과대 segment fallback)."""
|
||||
paras = re.split(r'(\n\s*\n)', body) # 구분자 보존
|
||||
chunks: list[str] = []
|
||||
buf = ""
|
||||
for p in paras:
|
||||
if len(buf) + len(p) <= target:
|
||||
buf += p
|
||||
else:
|
||||
if buf.strip():
|
||||
chunks.append(buf)
|
||||
if len(p) <= target:
|
||||
buf = p
|
||||
else: # 단일 문단이 target 초과 → 문자 단위 hard split
|
||||
for i in range(0, len(p), target):
|
||||
chunks.append(p[i:i + target])
|
||||
buf = ""
|
||||
if buf.strip():
|
||||
chunks.append(buf)
|
||||
return [c for c in chunks if c.strip()]
|
||||
|
||||
|
||||
def build_hier_tree(
|
||||
text: str, *,
|
||||
split_threshold: int = STRUCTURE_SPLIT_THRESHOLD,
|
||||
leaf_target_max: int = LEAF_TARGET_MAX,
|
||||
leaf_hard_max: int = LEAF_HARD_MAX,
|
||||
max_depth: int = MAX_DEPTH,
|
||||
) -> list[HierNode]:
|
||||
"""텍스트 → HierNode 리스트 (idx 순, parent_idx 로 트리)."""
|
||||
if not text or not text.strip():
|
||||
return []
|
||||
segs = _segment(text)
|
||||
nodes: list[HierNode] = []
|
||||
# heading 깊이 정규화: 관측된 distinct level(>0) 을 1..k 로 매핑(절대 # 수 gap 제거).
|
||||
distinct = sorted({lvl for lvl, *_ in segs if lvl > 0})
|
||||
level_map = {raw: i + 1 for i, raw in enumerate(distinct)}
|
||||
|
||||
# 부모 찾기용 스택: (norm_level, idx)
|
||||
stack: list[tuple[int, int]] = []
|
||||
|
||||
def _heading_path(parent_idx: int | None, title: str | None) -> str | None:
|
||||
chain = []
|
||||
pi = parent_idx
|
||||
while pi is not None:
|
||||
if nodes[pi].section_title:
|
||||
chain.append(nodes[pi].section_title)
|
||||
pi = nodes[pi].parent_idx
|
||||
chain.reverse()
|
||||
if title:
|
||||
chain.append(title)
|
||||
return " > ".join(chain) if chain else None
|
||||
|
||||
for lvl, title, nt, body in segs:
|
||||
norm = 0 if lvl == 0 else min(level_map[lvl], max_depth)
|
||||
# 부모 = 스택에서 norm 보다 작은 가장 가까운 노드
|
||||
while stack and stack[-1][0] >= norm:
|
||||
stack.pop()
|
||||
parent_idx = stack[-1][1] if stack else None
|
||||
idx = len(nodes)
|
||||
hp = _heading_path(parent_idx, title)
|
||||
node = HierNode(idx=idx, parent_idx=parent_idx, level=norm, node_type=nt,
|
||||
section_title=title, heading_path=hp, text=body, is_leaf=True)
|
||||
nodes.append(node)
|
||||
if norm > 0:
|
||||
stack.append((norm, idx))
|
||||
|
||||
# 과대 segment fallback (window-split) — 이 segment 가 leaf 일 때만(자식 heading 이
|
||||
# 뒤에 오면 자연히 분할되므로, 여기선 일단 생성 후 후처리에서 자식 유무로 판정).
|
||||
has_child = {n.parent_idx for n in nodes if n.parent_idx is not None}
|
||||
MIN_LEAF_BODY = 30 # heading 제외 own body 가 이보다 짧고 자식 있으면 구조 전용(코퍼스 제외)
|
||||
|
||||
def _body_only(n: HierNode) -> str:
|
||||
lines = n.text.splitlines(keepends=True)
|
||||
if n.section_title and lines: # 첫 줄 = heading
|
||||
return "".join(lines[1:])
|
||||
return n.text
|
||||
|
||||
final: list[HierNode] = list(nodes)
|
||||
for n in list(final):
|
||||
is_nav_internal = n.idx in has_child
|
||||
# (B) 구조 전용 heading (자식 보유 + own body 빈약) → 코퍼스 제외. heading 은 자식 heading_path 에 보존.
|
||||
if is_nav_internal and len(_body_only(n).strip()) < MIN_LEAF_BODY:
|
||||
n.is_leaf = False
|
||||
continue
|
||||
# (A) own text 과대 → 자식 heading 유무 무관 window 분해. 부모는 heading 마커로 강등(코퍼스 제외).
|
||||
if len(n.text) > leaf_hard_max:
|
||||
wins = _window_split(n.text, leaf_target_max)
|
||||
if len(wins) > 1:
|
||||
n.is_leaf = False
|
||||
heading_line = (n.text.splitlines() or [""])[0]
|
||||
n.text = heading_line # 중복 저장 회피 (full body 는 window child 가 보유)
|
||||
n.node_type = (n.node_type or "section") + "_split"
|
||||
base_level = min(n.level + 1, max_depth)
|
||||
for wtext in wins:
|
||||
ci = len(final)
|
||||
final.append(HierNode(
|
||||
idx=ci, parent_idx=n.idx, level=base_level, node_type="window",
|
||||
section_title=n.section_title, heading_path=n.heading_path,
|
||||
text=wtext, is_leaf=True))
|
||||
for n in final:
|
||||
n.finalize_hash()
|
||||
return final
|
||||
|
||||
|
||||
def coverage_stats(text: str, nodes: list[HierNode]) -> dict:
|
||||
"""G2 검증 지표."""
|
||||
leaves = [n for n in nodes if n.is_leaf]
|
||||
leaf_chars = sum(len(n.text) for n in leaves)
|
||||
base = len(text)
|
||||
hashes = [n.chunk_content_hash for n in leaves]
|
||||
dup = len(hashes) - len(set(hashes))
|
||||
empty = sum(1 for n in leaves if not n.text.strip())
|
||||
# parent/level 무결성
|
||||
dangling = sum(1 for n in nodes if n.parent_idx is not None and (n.parent_idx < 0 or n.parent_idx >= len(nodes)))
|
||||
bad_level = 0
|
||||
for n in nodes:
|
||||
if n.parent_idx is not None:
|
||||
if n.level != nodes[n.parent_idx].level + 1 and nodes[n.parent_idx].node_type and "split" in (nodes[n.parent_idx].node_type or ""):
|
||||
pass # window child 는 base_level 규칙
|
||||
# 일반 네비: 자식 level > 부모 level 만 보장
|
||||
if n.level <= nodes[n.parent_idx].level and nodes[n.parent_idx].level > 0:
|
||||
bad_level += 1
|
||||
return {
|
||||
"nodes": len(nodes), "leaves": len(leaves),
|
||||
"coverage_ratio": round(leaf_chars / base, 4) if base else 0,
|
||||
"dup_leaf_hash": dup, "empty_leaf": empty,
|
||||
"dangling_parent": dangling, "bad_level": bad_level,
|
||||
"level_dist": {l: sum(1 for n in nodes if n.level == l) for l in sorted({n.level for n in nodes})},
|
||||
"leaf_len_min": min((len(n.text) for n in leaves), default=0),
|
||||
"leaf_len_max": max((len(n.text) for n in leaves), default=0),
|
||||
}
|
||||
Reference in New Issue
Block a user