b6ce228f6e
A-1: _detect_heading 의 ATX 분기가 절번호 식별자(UG-79/PG-27.4.1/UW-11/A-69 등,
[A-Z]{1,4}-\d+(\.\d+)*)를 node_type='clause' 로 분류(과거 ATX=무조건 None).
ASME clause=0 사각지대의 근본 원인 — 절은 이미 ATX heading 으로 탐지되나 'clause'
타이핑이 한국 제N조 전용이었음(5180 Sec I = clause 0, heading_path 1637 = window/None).
C-4: _clean_label 로 marker LaTeX/markdown/페이지번호 아티팩트
('$\textbf{PG-20.1 ...}', '(25) **A-69**')를 패턴 매칭 전 정제 — 없으면 노이즈에
막혀 매칭 0. 표시 라벨도 동시 정제. 한국 법령/일반 ATX 엔 inert(무회귀).
A-2: 큰 절(>LEAF_HARD_MAX)은 기존 window-split 이 'clause'→'clause_split'
(char_start 점프 타깃 보존)로 자동 처리 — 추가 코드 없음.
검증(순수함수, DB/GPU/재마크다운 0): test_asme_clause 6/6 신규 + test_eng_matcher 4/4
(PG-1 계약을 clause 로 갱신) + test_builder_char_start 7/7(char_start 무영향).
DS 적용(V-0 스모크 → 기존 md V-1 0-cost 검증)은 후속.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
319 lines
15 KiB
Python
319 lines
15 KiB
Python
"""Hierarchical decomposition rule builder (PR-DocSrv-Hierarchical-Decomposition-1 c3).
|
|
|
|
텍스트(주로 md_content 마크다운) → heading 경계 segment 트리.
|
|
- 규칙 우선 경계 탐지: ATX 마크다운(#{1,6}) > 한국 구조(제N장/절/조) > 영문(Chapter/Section/Article).
|
|
- 각 segment = heading 라인 + 다음 heading 전까지 본문 (서로 disjoint, 100% 커버).
|
|
- parent/level = heading 깊이 기반 네비 트리. preamble(첫 heading 이전) = level 0 root 직속.
|
|
- 과대 segment(>LEAF_HARD_MAX, 더 깊은 heading 없음) = window fallback: 본문을 무overlap
|
|
window 로 분해해 child leaf 생성, 부모는 is_leaf=false(heading 만 보유, 코퍼스 제외).
|
|
- is_leaf = 코퍼스 편입 대상 (replace predicate). window-split 부모만 false.
|
|
|
|
순수 함수 — DB 미접근. c4 에서 이 트리를 document_chunks 에 insert(parent_id 해소).
|
|
"""
|
|
from __future__ import annotations
|
|
import re
|
|
import hashlib
|
|
import unicodedata
|
|
from dataclasses import dataclass, field
|
|
|
|
STRUCTURE_SPLIT_THRESHOLD = 4000
|
|
LEAF_TARGET_MAX = 3000
|
|
LEAF_HARD_MAX = 5000
|
|
MAX_DEPTH = 6
|
|
|
|
# 경계 패턴 (우선순위 순). group 'title' = 표시용, level 은 매처가 결정.
|
|
_ATX = re.compile(r'^(#{1,6})\s+(?P<title>\S.*?)\s*#*\s*$')
|
|
_KO_JANG = re.compile(r'^\s*(?P<title>제\s*\d+\s*장\b.*)$')
|
|
_KO_JEOL = re.compile(r'^\s*(?P<title>제\s*\d+\s*절\b.*)$')
|
|
_KO_JO = re.compile(r'^\s*(?P<title>제\s*\d+\s*조\b.*)$')
|
|
# _ENG: 영문 구조 헤딩(ATX 미사용 문서용). ASME 파트는 보통 ATX(`# PART PG`)로 잡혀 _ENG 의존 낮음.
|
|
# D1: 식별자 뒤가 소문자 문장연속이면("Part III to demonstrate to the satisfaction…") 본문이므로
|
|
# 미탐지 — 가짜 절 차단. 선택 제목은 대문자/괄호/숫자로 시작해야 헤딩 인정(소문자 시작=문장으로 봄).
|
|
# 식별자는 번호/PG/3.31/UHX/A-1 등 (.·- 소수·하이픈 확장 허용).
|
|
_ENG = re.compile(
|
|
r'^\s*(?P<title>(?:Chapter|Section|Article|Part|PART)\s+'
|
|
r'[\dIVXLA-Z]+(?:[.\-][\dA-Za-z]+)*'
|
|
r'(?:\s+[A-Z(\d][^\n]*)?'
|
|
r')\s*$'
|
|
)
|
|
|
|
# 코드펜스 경계 (FE outlineAnchors.ts:60 `/^\s{0,3}(```|~~~)/` 와 동일). 펜스 내부 라인은
|
|
# heading 미탐지 — 코드블록 안 '# foo' 가 가짜 절을 만들지 않게(O3).
|
|
_FENCE = re.compile(r'^\s{0,3}(```|~~~)')
|
|
|
|
|
|
# ASME 절 식별자 (A-1): UG-79 · PG-27.4.1 · UW-11 · UCS-56 · A-69 · PFT-14
|
|
# (대문자 1~4 + 하이픈 + 숫자[.숫자]*). _detect_heading 의 ATX 분기에서 node_type='clause' 판정에 사용.
|
|
# 한국 법령(제N조)은 _KO_JO 가 별도 처리 — 본 패턴/정제와 무관(무회귀).
|
|
_ASME_CLAUSE = re.compile(r'^[A-Z]{1,4}-\d+(?:\.\d+)*\b')
|
|
|
|
|
|
def _clean_label(title: str) -> str:
|
|
r"""C-4: marker 가 박는 LaTeX/markdown/페이지번호 아티팩트 제거 — 절번호 패턴 매칭의 전처리 겸 표시 라벨 정제.
|
|
실데이터 예: '$\textbf{PG-20.1 …} \hspace{0.2cm} \textbf{(25)}$' → 'PG-20.1 …' / '(25) **A-69**' → 'A-69'.
|
|
노이즈 없는 제목(한국 법령·일반 ATX 등)엔 inert(무회귀)."""
|
|
t = re.sub(r'\\textbf|\\textit|\\mathbf|\\hspace\{[^}]*\}|[${}]|\*\*', '', title)
|
|
t = re.sub(r'^\s*\(\d+\)\s*', '', t) # 선두 페이지번호 '(25) '
|
|
return re.sub(r'\s{2,}', ' ', t).strip()
|
|
|
|
|
|
def _utf16_units(s: str) -> int:
|
|
"""JS 문자열 .length(= UTF-16 code unit 수) 와 동일. astral(BMP 밖)=surrogate pair=2 units.
|
|
FE 의 `raw.length` / `out.slice(off)` 가 UTF-16 code unit 단위라 char_start 도 같은 단위여야 함.
|
|
len(s.encode('utf-16-le'))//2 = code unit 수 (utf-16-le 는 BOM 미부착)."""
|
|
return len(s.encode("utf-16-le")) // 2
|
|
|
|
|
|
@dataclass
|
|
class HierNode:
|
|
idx: int
|
|
parent_idx: int | None
|
|
level: int
|
|
node_type: str | None
|
|
section_title: str | None
|
|
heading_path: str | None
|
|
text: str
|
|
is_leaf: bool = True
|
|
chunk_content_hash: str = field(default="")
|
|
# md_content 내 heading 라인 시작 offset(UTF-16 code unit). jump-target(비-window leaf / %_split parent)만
|
|
# 값 보유; window-child / preamble(title None) = None(점프 타깃 아님, g0-t2/g2-t3).
|
|
char_start: int | None = None
|
|
|
|
def finalize_hash(self):
|
|
self.chunk_content_hash = hashlib.sha256(self.text.encode("utf-8")).hexdigest()
|
|
|
|
|
|
def _detect_heading(line: str) -> tuple[int, str, str] | None:
|
|
"""(level, title, node_type) 또는 None. level 은 상대 깊이."""
|
|
m = _ATX.match(line)
|
|
if m:
|
|
title = _clean_label(m.group("title").strip()) # C-4: LaTeX/md/페이지번호 정제(전처리)
|
|
nt = "clause" if _ASME_CLAUSE.match(title) else None # A-1: ASME 절 식별자(UG-79 등) → clause
|
|
return (len(m.group(1)), title, nt)
|
|
for pat, lvl, nt in ((_KO_JANG, 1, "chapter"), (_KO_JEOL, 2, "section"),
|
|
(_KO_JO, 3, "clause"), (_ENG, 1, "chapter")):
|
|
m = pat.match(line)
|
|
if m:
|
|
return (lvl, m.group("title").strip()[:200], nt)
|
|
return None
|
|
|
|
|
|
def _segment(text: str) -> list[tuple[int, str | None, str | None, str, int | None]]:
|
|
"""heading 경계로 분할 → [(level, title, node_type, segment_text, char_start), ...].
|
|
|
|
라인 모델 = FE outlineAnchors.ts:55-65 와 동일: `text.split('\n')` + UTF-16 code-unit offset +
|
|
코드펜스 추적(splitlines(keepends=True) 폐기 — JS 와 라인경계 \v\f\x1c… 7종을 다르게 쪼개는 문제 제거).
|
|
char_start = 그 segment 첫 라인(=heading 라인)의 UTF-16 offset. preamble = None(점프 타깃 아님).
|
|
node.text 보존(라인모델 변경에 hash-neutral): 그룹을 '\n'.join 하되 마지막 그룹이 아니면 분리용 '\n'
|
|
을 그 그룹 끝에 되돌려 붙여(= splitlines(keepends) 가 마지막 라인에 \n 을 남기던 동작) 원문과 동일.
|
|
CR 미strip(CRLF 면 '\r' 잔류 → FE raw.length 와 동일), NFC 무변환.
|
|
"""
|
|
raw_lines = text.split("\n")
|
|
n = len(raw_lines)
|
|
# 라인별 (offset, heading) 선계산 — 펜스 내부/경계 라인은 heading 미탐지.
|
|
offs: list[int] = []
|
|
headings: list[tuple[int, str, str | None] | None] = []
|
|
off = 0
|
|
in_fence = False
|
|
for raw in raw_lines:
|
|
fence_toggle = bool(_FENCE.match(raw))
|
|
fenced_here = in_fence or fence_toggle
|
|
offs.append(off)
|
|
headings.append(None if fenced_here else _detect_heading(raw))
|
|
if fence_toggle:
|
|
in_fence = not in_fence
|
|
off += _utf16_units(raw) + 1 # '\n'
|
|
|
|
# 그룹 경계 = 첫 heading 이전(preamble) + 각 heading 라인. (start_idx, meta) 리스트.
|
|
first_heading = next((i for i in range(n) if headings[i] is not None), None)
|
|
starts: list[int] = []
|
|
metas: list[tuple[int, str | None, str | None] | None] = []
|
|
if first_heading is None:
|
|
starts.append(0)
|
|
metas.append(None) # 전체 = preamble
|
|
else:
|
|
if first_heading > 0:
|
|
starts.append(0)
|
|
metas.append(None)
|
|
for i in range(first_heading, n):
|
|
h = headings[i]
|
|
if h is not None:
|
|
starts.append(i)
|
|
metas.append((h[0], h[1], h[2]))
|
|
|
|
segs: list[tuple[int, str | None, str | None, str, int | None]] = []
|
|
for gi, s_idx in enumerate(starts):
|
|
e_idx = starts[gi + 1] if gi + 1 < len(starts) else n
|
|
seg_text = "\n".join(raw_lines[s_idx:e_idx])
|
|
if e_idx < n:
|
|
seg_text += "\n" # 분리용 '\n' 을 앞 그룹에 귀속(splitlines keepends 동치)
|
|
meta = metas[gi]
|
|
if meta is None:
|
|
if not seg_text.strip(): # 빈 preamble 폐기(기존 동작)
|
|
continue
|
|
segs.append((0, None, None, seg_text, None))
|
|
else:
|
|
lvl, title, nt = meta
|
|
segs.append((lvl, title, nt, seg_text, offs[s_idx]))
|
|
return segs
|
|
|
|
|
|
def _window_split(body: str, target: int) -> list[str]:
|
|
"""무overlap, 문단 우선 window 분해 (과대 segment fallback)."""
|
|
paras = re.split(r'(\n\s*\n)', body) # 구분자 보존
|
|
chunks: list[str] = []
|
|
buf = ""
|
|
for p in paras:
|
|
if len(buf) + len(p) <= target:
|
|
buf += p
|
|
else:
|
|
if buf.strip():
|
|
chunks.append(buf)
|
|
if len(p) <= target:
|
|
buf = p
|
|
else: # 단일 문단이 target 초과 → 문자 단위 hard split
|
|
for i in range(0, len(p), target):
|
|
chunks.append(p[i:i + target])
|
|
buf = ""
|
|
if buf.strip():
|
|
chunks.append(buf)
|
|
return [c for c in chunks if c.strip()]
|
|
|
|
|
|
def build_hier_tree(
|
|
text: str, *,
|
|
split_threshold: int = STRUCTURE_SPLIT_THRESHOLD,
|
|
leaf_target_max: int = LEAF_TARGET_MAX,
|
|
leaf_hard_max: int = LEAF_HARD_MAX,
|
|
max_depth: int = MAX_DEPTH,
|
|
) -> list[HierNode]:
|
|
"""텍스트 → HierNode 리스트 (idx 순, parent_idx 로 트리)."""
|
|
if not text or not text.strip():
|
|
return []
|
|
segs = _segment(text)
|
|
nodes: list[HierNode] = []
|
|
# heading 깊이 정규화: 관측된 distinct level(>0) 을 1..k 로 매핑(절대 # 수 gap 제거).
|
|
distinct = sorted({lvl for lvl, *_ in segs if lvl > 0})
|
|
level_map = {raw: i + 1 for i, raw in enumerate(distinct)}
|
|
|
|
# 부모 찾기용 스택: (norm_level, idx)
|
|
stack: list[tuple[int, int]] = []
|
|
|
|
def _heading_path(parent_idx: int | None, title: str | None) -> str | None:
|
|
chain = []
|
|
pi = parent_idx
|
|
while pi is not None:
|
|
if nodes[pi].section_title:
|
|
chain.append(nodes[pi].section_title)
|
|
pi = nodes[pi].parent_idx
|
|
chain.reverse()
|
|
if title:
|
|
chain.append(title)
|
|
return " > ".join(chain) if chain else None
|
|
|
|
for lvl, title, nt, body, cstart in segs:
|
|
norm = 0 if lvl == 0 else min(level_map[lvl], max_depth)
|
|
# 부모 = 스택에서 norm 보다 작은 가장 가까운 노드
|
|
while stack and stack[-1][0] >= norm:
|
|
stack.pop()
|
|
parent_idx = stack[-1][1] if stack else None
|
|
idx = len(nodes)
|
|
hp = _heading_path(parent_idx, title)
|
|
# char_start = 생성 시점 할당(window-split 가 n.text 를 heading 라인으로 truncate 하기 전에 박제).
|
|
# split-parent 가 돼도 이 값(heading 라인 offset)이 windowed section 단일 jump target 으로 보존된다.
|
|
node = HierNode(idx=idx, parent_idx=parent_idx, level=norm, node_type=nt,
|
|
section_title=title, heading_path=hp, text=body, is_leaf=True,
|
|
char_start=cstart)
|
|
nodes.append(node)
|
|
if norm > 0:
|
|
stack.append((norm, idx))
|
|
|
|
# 과대 segment fallback (window-split) — 이 segment 가 leaf 일 때만(자식 heading 이
|
|
# 뒤에 오면 자연히 분할되므로, 여기선 일단 생성 후 후처리에서 자식 유무로 판정).
|
|
has_child = {n.parent_idx for n in nodes if n.parent_idx is not None}
|
|
MIN_LEAF_BODY = 30 # heading 제외 own body 가 이보다 짧고 자식 있으면 구조 전용(코퍼스 제외)
|
|
|
|
def _body_only(n: HierNode) -> str:
|
|
lines = n.text.splitlines(keepends=True)
|
|
if n.section_title and lines: # 첫 줄 = heading
|
|
return "".join(lines[1:])
|
|
return n.text
|
|
|
|
final: list[HierNode] = list(nodes)
|
|
for n in list(final):
|
|
is_nav_internal = n.idx in has_child
|
|
# (B) 구조 전용 heading (자식 보유 + own body 빈약) → 코퍼스 제외. heading 은 자식 heading_path 에 보존.
|
|
if is_nav_internal and len(_body_only(n).strip()) < MIN_LEAF_BODY:
|
|
n.is_leaf = False
|
|
continue
|
|
# (A) own text 과대 → 자식 heading 유무 무관 window 분해. 부모는 heading 마커로 강등(코퍼스 제외).
|
|
if len(n.text) > leaf_hard_max:
|
|
wins = _window_split(n.text, leaf_target_max)
|
|
if len(wins) > 1:
|
|
n.is_leaf = False
|
|
heading_line = (n.text.splitlines() or [""])[0]
|
|
n.text = heading_line # 중복 저장 회피 (full body 는 window child 가 보유)
|
|
n.node_type = (n.node_type or "section") + "_split" # chapter_split/clause_split/section_split
|
|
# n.char_start 보존 = windowed section 의 단일 jump target(생성시점 heading offset).
|
|
base_level = min(n.level + 1, max_depth)
|
|
for wtext in wins:
|
|
ci = len(final)
|
|
# window child = char_start None(_window_split 가 whitespace buf 를 drop 해
|
|
# char-preserving 이 아니므로 합산 offset 이 거짓; 점프 타깃도 아님, B1/#1).
|
|
final.append(HierNode(
|
|
idx=ci, parent_idx=n.idx, level=base_level, node_type="window",
|
|
section_title=n.section_title, heading_path=n.heading_path,
|
|
text=wtext, is_leaf=True, char_start=None))
|
|
for n in final:
|
|
n.finalize_hash()
|
|
return final
|
|
|
|
|
|
def coverage_stats(text: str, nodes: list[HierNode]) -> dict:
|
|
"""G2 검증 지표."""
|
|
leaves = [n for n in nodes if n.is_leaf]
|
|
leaf_chars = sum(len(n.text) for n in leaves)
|
|
base = len(text)
|
|
hashes = [n.chunk_content_hash for n in leaves]
|
|
dup = len(hashes) - len(set(hashes))
|
|
empty = sum(1 for n in leaves if not n.text.strip())
|
|
# parent/level 무결성
|
|
dangling = sum(1 for n in nodes if n.parent_idx is not None and (n.parent_idx < 0 or n.parent_idx >= len(nodes)))
|
|
bad_level = 0
|
|
for n in nodes:
|
|
if n.parent_idx is not None:
|
|
if n.level != nodes[n.parent_idx].level + 1 and nodes[n.parent_idx].node_type and "split" in (nodes[n.parent_idx].node_type or ""):
|
|
pass # window child 는 base_level 규칙
|
|
# 일반 네비: 자식 level > 부모 level 만 보장
|
|
if n.level <= nodes[n.parent_idx].level and nodes[n.parent_idx].level > 0:
|
|
bad_level += 1
|
|
# char_start O5 검증 (UTF-16 슬라이스 == heading 라인) + NFC telemetry (g2-t4).
|
|
# 검증은 FE 가 실제 쓰는 방식과 동일: md.encode('utf-16-le')[2*cs:2*(cs+n)].decode == heading_line
|
|
# (Python code-point 슬라이스 md[cs:cs+n] 가 아님 — astral 시 어긋남).
|
|
md_u16 = text.encode("utf-16-le")
|
|
cs_total = cs_verified = 0
|
|
for n in nodes:
|
|
if n.char_start is None:
|
|
continue
|
|
cs_total += 1
|
|
first_line = n.text.split("\n", 1)[0]
|
|
nu = _utf16_units(first_line)
|
|
seg = md_u16[2 * n.char_start: 2 * (n.char_start + nu)]
|
|
try:
|
|
if seg.decode("utf-16-le") == first_line:
|
|
cs_verified += 1
|
|
except UnicodeDecodeError:
|
|
pass
|
|
non_nfc = 1 if unicodedata.normalize("NFC", text) != text else 0
|
|
return {
|
|
"nodes": len(nodes), "leaves": len(leaves),
|
|
"coverage_ratio": round(leaf_chars / base, 4) if base else 0,
|
|
"dup_leaf_hash": dup, "empty_leaf": empty,
|
|
"dangling_parent": dangling, "bad_level": bad_level,
|
|
"level_dist": {l: sum(1 for n in nodes if n.level == l) for l in sorted({n.level for n in nodes})},
|
|
"leaf_len_min": min((len(n.text) for n in leaves), default=0),
|
|
"leaf_len_max": max((len(n.text) for n in leaves), default=0),
|
|
"char_start_total": cs_total, "char_start_verified": cs_verified,
|
|
"non_nfc": non_nfc,
|
|
}
|