"""Hierarchical decomposition rule builder (PR-DocSrv-Hierarchical-Decomposition-1 c3). 텍스트(주로 md_content 마크다운) → heading 경계 segment 트리. - 규칙 우선 경계 탐지: ATX 마크다운(#{1,6}) > 한국 구조(제N장/절/조) > 영문(Chapter/Section/Article). - 각 segment = heading 라인 + 다음 heading 전까지 본문 (서로 disjoint, 100% 커버). - parent/level = heading 깊이 기반 네비 트리. preamble(첫 heading 이전) = level 0 root 직속. - 과대 segment(>LEAF_HARD_MAX, 더 깊은 heading 없음) = window fallback: 본문을 무overlap window 로 분해해 child leaf 생성, 부모는 is_leaf=false(heading 만 보유, 코퍼스 제외). - is_leaf = 코퍼스 편입 대상 (replace predicate). window-split 부모만 false. 순수 함수 — DB 미접근. c4 에서 이 트리를 document_chunks 에 insert(parent_id 해소). """ from __future__ import annotations import re import hashlib import unicodedata from dataclasses import dataclass, field STRUCTURE_SPLIT_THRESHOLD = 4000 LEAF_TARGET_MAX = 3000 LEAF_HARD_MAX = 5000 MAX_DEPTH = 6 # 경계 패턴 (우선순위 순). group 'title' = 표시용, level 은 매처가 결정. _ATX = re.compile(r'^(#{1,6})\s+(?P\S.*?)\s*#*\s*$') _KO_JANG = re.compile(r'^\s*(?P<title>제\s*\d+\s*장\b.*)$') _KO_JEOL = re.compile(r'^\s*(?P<title>제\s*\d+\s*절\b.*)$') _KO_JO = re.compile(r'^\s*(?P<title>제\s*\d+\s*조\b.*)$') _ENG = re.compile(r'^\s*(?P<title>(?:Chapter|Section|Article|Part|PART)\s+[\dIVXLA-Z]+\b.*)$') # 코드펜스 경계 (FE outlineAnchors.ts:60 `/^\s{0,3}(```|~~~)/` 와 동일). 펜스 내부 라인은 # heading 미탐지 — 코드블록 안 '# foo' 가 가짜 절을 만들지 않게(O3). _FENCE = re.compile(r'^\s{0,3}(```|~~~)') def _utf16_units(s: str) -> int: """JS 문자열 .length(= UTF-16 code unit 수) 와 동일. astral(BMP 밖)=surrogate pair=2 units. FE 의 `raw.length` / `out.slice(off)` 가 UTF-16 code unit 단위라 char_start 도 같은 단위여야 함. len(s.encode('utf-16-le'))//2 = code unit 수 (utf-16-le 는 BOM 미부착).""" return len(s.encode("utf-16-le")) // 2 @dataclass class HierNode: idx: int parent_idx: int | None level: int node_type: str | None section_title: str | None heading_path: str | None text: str is_leaf: bool = True chunk_content_hash: str = field(default="") # md_content 내 heading 라인 시작 offset(UTF-16 code unit). jump-target(비-window leaf / %_split parent)만 # 값 보유; window-child / preamble(title None) = None(점프 타깃 아님, g0-t2/g2-t3). char_start: int | None = None def finalize_hash(self): self.chunk_content_hash = hashlib.sha256(self.text.encode("utf-8")).hexdigest() def _detect_heading(line: str) -> tuple[int, str, str] | None: """(level, title, node_type) 또는 None. level 은 상대 깊이.""" m = _ATX.match(line) if m: return (len(m.group(1)), m.group("title").strip(), None) # node_type 은 후처리에서 for pat, lvl, nt in ((_KO_JANG, 1, "chapter"), (_KO_JEOL, 2, "section"), (_KO_JO, 3, "clause"), (_ENG, 1, "chapter")): m = pat.match(line) if m: return (lvl, m.group("title").strip()[:200], nt) return None def _segment(text: str) -> list[tuple[int, str | None, str | None, str, int | None]]: """heading 경계로 분할 → [(level, title, node_type, segment_text, char_start), ...]. 라인 모델 = FE outlineAnchors.ts:55-65 와 동일: `text.split('\n')` + UTF-16 code-unit offset + 코드펜스 추적(splitlines(keepends=True) 폐기 — JS 와 라인경계 \v\f\x1c… 7종을 다르게 쪼개는 문제 제거). char_start = 그 segment 첫 라인(=heading 라인)의 UTF-16 offset. preamble = None(점프 타깃 아님). node.text 보존(라인모델 변경에 hash-neutral): 그룹을 '\n'.join 하되 마지막 그룹이 아니면 분리용 '\n' 을 그 그룹 끝에 되돌려 붙여(= splitlines(keepends) 가 마지막 라인에 \n 을 남기던 동작) 원문과 동일. CR 미strip(CRLF 면 '\r' 잔류 → FE raw.length 와 동일), NFC 무변환. """ raw_lines = text.split("\n") n = len(raw_lines) # 라인별 (offset, heading) 선계산 — 펜스 내부/경계 라인은 heading 미탐지. offs: list[int] = [] headings: list[tuple[int, str, str | None] | None] = [] off = 0 in_fence = False for raw in raw_lines: fence_toggle = bool(_FENCE.match(raw)) fenced_here = in_fence or fence_toggle offs.append(off) headings.append(None if fenced_here else _detect_heading(raw)) if fence_toggle: in_fence = not in_fence off += _utf16_units(raw) + 1 # '\n' # 그룹 경계 = 첫 heading 이전(preamble) + 각 heading 라인. (start_idx, meta) 리스트. first_heading = next((i for i in range(n) if headings[i] is not None), None) starts: list[int] = [] metas: list[tuple[int, str | None, str | None] | None] = [] if first_heading is None: starts.append(0) metas.append(None) # 전체 = preamble else: if first_heading > 0: starts.append(0) metas.append(None) for i in range(first_heading, n): h = headings[i] if h is not None: starts.append(i) metas.append((h[0], h[1], h[2])) segs: list[tuple[int, str | None, str | None, str, int | None]] = [] for gi, s_idx in enumerate(starts): e_idx = starts[gi + 1] if gi + 1 < len(starts) else n seg_text = "\n".join(raw_lines[s_idx:e_idx]) if e_idx < n: seg_text += "\n" # 분리용 '\n' 을 앞 그룹에 귀속(splitlines keepends 동치) meta = metas[gi] if meta is None: if not seg_text.strip(): # 빈 preamble 폐기(기존 동작) continue segs.append((0, None, None, seg_text, None)) else: lvl, title, nt = meta segs.append((lvl, title, nt, seg_text, offs[s_idx])) return segs def _window_split(body: str, target: int) -> list[str]: """무overlap, 문단 우선 window 분해 (과대 segment fallback).""" paras = re.split(r'(\n\s*\n)', body) # 구분자 보존 chunks: list[str] = [] buf = "" for p in paras: if len(buf) + len(p) <= target: buf += p else: if buf.strip(): chunks.append(buf) if len(p) <= target: buf = p else: # 단일 문단이 target 초과 → 문자 단위 hard split for i in range(0, len(p), target): chunks.append(p[i:i + target]) buf = "" if buf.strip(): chunks.append(buf) return [c for c in chunks if c.strip()] def build_hier_tree( text: str, *, split_threshold: int = STRUCTURE_SPLIT_THRESHOLD, leaf_target_max: int = LEAF_TARGET_MAX, leaf_hard_max: int = LEAF_HARD_MAX, max_depth: int = MAX_DEPTH, ) -> list[HierNode]: """텍스트 → HierNode 리스트 (idx 순, parent_idx 로 트리).""" if not text or not text.strip(): return [] segs = _segment(text) nodes: list[HierNode] = [] # heading 깊이 정규화: 관측된 distinct level(>0) 을 1..k 로 매핑(절대 # 수 gap 제거). distinct = sorted({lvl for lvl, *_ in segs if lvl > 0}) level_map = {raw: i + 1 for i, raw in enumerate(distinct)} # 부모 찾기용 스택: (norm_level, idx) stack: list[tuple[int, int]] = [] def _heading_path(parent_idx: int | None, title: str | None) -> str | None: chain = [] pi = parent_idx while pi is not None: if nodes[pi].section_title: chain.append(nodes[pi].section_title) pi = nodes[pi].parent_idx chain.reverse() if title: chain.append(title) return " > ".join(chain) if chain else None for lvl, title, nt, body, cstart in segs: norm = 0 if lvl == 0 else min(level_map[lvl], max_depth) # 부모 = 스택에서 norm 보다 작은 가장 가까운 노드 while stack and stack[-1][0] >= norm: stack.pop() parent_idx = stack[-1][1] if stack else None idx = len(nodes) hp = _heading_path(parent_idx, title) # char_start = 생성 시점 할당(window-split 가 n.text 를 heading 라인으로 truncate 하기 전에 박제). # split-parent 가 돼도 이 값(heading 라인 offset)이 windowed section 단일 jump target 으로 보존된다. node = HierNode(idx=idx, parent_idx=parent_idx, level=norm, node_type=nt, section_title=title, heading_path=hp, text=body, is_leaf=True, char_start=cstart) nodes.append(node) if norm > 0: stack.append((norm, idx)) # 과대 segment fallback (window-split) — 이 segment 가 leaf 일 때만(자식 heading 이 # 뒤에 오면 자연히 분할되므로, 여기선 일단 생성 후 후처리에서 자식 유무로 판정). has_child = {n.parent_idx for n in nodes if n.parent_idx is not None} MIN_LEAF_BODY = 30 # heading 제외 own body 가 이보다 짧고 자식 있으면 구조 전용(코퍼스 제외) def _body_only(n: HierNode) -> str: lines = n.text.splitlines(keepends=True) if n.section_title and lines: # 첫 줄 = heading return "".join(lines[1:]) return n.text final: list[HierNode] = list(nodes) for n in list(final): is_nav_internal = n.idx in has_child # (B) 구조 전용 heading (자식 보유 + own body 빈약) → 코퍼스 제외. heading 은 자식 heading_path 에 보존. if is_nav_internal and len(_body_only(n).strip()) < MIN_LEAF_BODY: n.is_leaf = False continue # (A) own text 과대 → 자식 heading 유무 무관 window 분해. 부모는 heading 마커로 강등(코퍼스 제외). if len(n.text) > leaf_hard_max: wins = _window_split(n.text, leaf_target_max) if len(wins) > 1: n.is_leaf = False heading_line = (n.text.splitlines() or [""])[0] n.text = heading_line # 중복 저장 회피 (full body 는 window child 가 보유) n.node_type = (n.node_type or "section") + "_split" # chapter_split/clause_split/section_split # n.char_start 보존 = windowed section 의 단일 jump target(생성시점 heading offset). base_level = min(n.level + 1, max_depth) for wtext in wins: ci = len(final) # window child = char_start None(_window_split 가 whitespace buf 를 drop 해 # char-preserving 이 아니므로 합산 offset 이 거짓; 점프 타깃도 아님, B1/#1). final.append(HierNode( idx=ci, parent_idx=n.idx, level=base_level, node_type="window", section_title=n.section_title, heading_path=n.heading_path, text=wtext, is_leaf=True, char_start=None)) for n in final: n.finalize_hash() return final def coverage_stats(text: str, nodes: list[HierNode]) -> dict: """G2 검증 지표.""" leaves = [n for n in nodes if n.is_leaf] leaf_chars = sum(len(n.text) for n in leaves) base = len(text) hashes = [n.chunk_content_hash for n in leaves] dup = len(hashes) - len(set(hashes)) empty = sum(1 for n in leaves if not n.text.strip()) # parent/level 무결성 dangling = sum(1 for n in nodes if n.parent_idx is not None and (n.parent_idx < 0 or n.parent_idx >= len(nodes))) bad_level = 0 for n in nodes: if n.parent_idx is not None: if n.level != nodes[n.parent_idx].level + 1 and nodes[n.parent_idx].node_type and "split" in (nodes[n.parent_idx].node_type or ""): pass # window child 는 base_level 규칙 # 일반 네비: 자식 level > 부모 level 만 보장 if n.level <= nodes[n.parent_idx].level and nodes[n.parent_idx].level > 0: bad_level += 1 # char_start O5 검증 (UTF-16 슬라이스 == heading 라인) + NFC telemetry (g2-t4). # 검증은 FE 가 실제 쓰는 방식과 동일: md.encode('utf-16-le')[2*cs:2*(cs+n)].decode == heading_line # (Python code-point 슬라이스 md[cs:cs+n] 가 아님 — astral 시 어긋남). md_u16 = text.encode("utf-16-le") cs_total = cs_verified = 0 for n in nodes: if n.char_start is None: continue cs_total += 1 first_line = n.text.split("\n", 1)[0] nu = _utf16_units(first_line) seg = md_u16[2 * n.char_start: 2 * (n.char_start + nu)] try: if seg.decode("utf-16-le") == first_line: cs_verified += 1 except UnicodeDecodeError: pass non_nfc = 1 if unicodedata.normalize("NFC", text) != text else 0 return { "nodes": len(nodes), "leaves": len(leaves), "coverage_ratio": round(leaf_chars / base, 4) if base else 0, "dup_leaf_hash": dup, "empty_leaf": empty, "dangling_parent": dangling, "bad_level": bad_level, "level_dist": {l: sum(1 for n in nodes if n.level == l) for l in sorted({n.level for n in nodes})}, "leaf_len_min": min((len(n.text) for n in leaves), default=0), "leaf_len_max": max((len(n.text) for n in leaves), default=0), "char_start_total": cs_total, "char_start_verified": cs_verified, "non_nfc": non_nfc, }