feat(search): hier section per-leaf analysis scaffold (Section-Summary-1 c1)

chunk_section_analysis 테이블(migration 286) + ORM model + pilot script.
document_chunks(retrieval-hot)와 분리된 절-레벨 분석 축. domain 상속,
section_type 절-전용 역할 enum, status로 skip 박제, source_content_hash로 stale 탐지.
script-only(scripts mount, rebuild 불필요). LLM 0 dry-run 검증 = 5225 147 analyze + 17 skip.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
hyungi
2026-05-24 13:45:30 +00:00
parent a7b16b63db
commit cfadaaffd9
3 changed files with 395 additions and 0 deletions
+49
View File
@@ -0,0 +1,49 @@
"""chunk_section_analysis 테이블 ORM (PR-DocSrv-Hier-Section-Summary-1).
per-절(hier_section is_leaf) Mac mini 분석 결과 저장. document_chunks(retrieval-hot)
와 분리된 절-레벨 분석 축. migration 286 에서 테이블 생성.
⚠ pilot 단계(scripts/section_summary_pilot.py)는 `./scripts` mount 로 rebuild 없이
돌지만, 이 모델은 `app/` 이라 baked — 즉 pilot script 는 이 모델을 import 하지 않고
raw SQL 을 쓴다. 본 모델은 (1) 스키마 문서화 (2) 향후 상시 worker 배선(별 PR, image
rebuild 동반) 용도. 컬럼 정의는 migration 286 과 단일 진실로 동기 유지.
"""
from datetime import datetime
from sqlalchemy import BigInteger, DateTime, Float, ForeignKey, Text, text
from sqlalchemy.orm import Mapped, mapped_column
from core.database import Base
class ChunkSectionAnalysis(Base):
__tablename__ = "chunk_section_analysis"
id: Mapped[int] = mapped_column(BigInteger, primary_key=True)
# FK CASCADE — document_chunks 에 종속된 분석 데이터(1:1). parent_id(self-FK, app-level)와 의도적 차이.
chunk_id: Mapped[int] = mapped_column(
BigInteger, ForeignKey("document_chunks.id", ondelete="CASCADE"), nullable=False
)
# summarized | skipped_tiny | failed — skip 도 행으로 박제(미처리 vs 의도 skip 구분)
status: Mapped[str] = mapped_column(Text, nullable=False)
summary: Mapped[str | None] = mapped_column(Text)
# 절-전용 역할 enum (느슨한 text, CHECK 미설정 — pilot 관찰 후 조임).
# definition/requirement/procedure/formula/data_table/example/case_study/question/reference/overview/other
section_type: Mapped[str | None] = mapped_column(Text)
# doc-level taxonomy path(documents.ai_domain) 상속 스냅샷.
domain: Mapped[str | None] = mapped_column(Text)
confidence: Mapped[float | None] = mapped_column(Float)
model: Mapped[str | None] = mapped_column(Text)
prompt_version: Mapped[str] = mapped_column(Text, nullable=False)
# 분석 시점 leaf chunk_content_hash 스냅샷 — 원문 변경(재분해) stale 탐지.
source_content_hash: Mapped[str | None] = mapped_column(Text)
error: Mapped[str | None] = mapped_column(Text)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), server_default=text("now()"), nullable=False
)
updated_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), server_default=text("now()"), nullable=False
)
# UNIQUE(chunk_id, prompt_version) 는 migration 286 에 정의 (ORM 미반영 — 조회/upsert 는 raw SQL).
+37
View File
@@ -0,0 +1,37 @@
-- 286_chunk_section_analysis.sql
-- PR-DocSrv-Hier-Section-Summary-1: per-절(leaf) Mac mini 분석 결과 저장 테이블.
--
-- document_chunks(retrieval-hot: ivfflat + 2 gin + 6 btree)와 분리된 "절-레벨 분석 축".
-- hot 테이블 무손상 + builder/replace 재작성과 무관. 검색 코퍼스와 완전 분리(additive).
--
-- 컬럼 설계 (사용자 review 2026-05-24):
-- status : summarized | skipped_tiny | failed — skip 도 행으로 박제(미처리 vs 의도 skip 구분)
-- summary/section_type/domain/confidence : 분석 결과 (skip/failed 행은 NULL 가능)
-- section_type : 절-전용 역할 enum (느슨한 text, CHECK 미설정 — pilot 관찰 후 조임)
-- domain : doc-level taxonomy path(d.ai_domain) 상속 스냅샷
-- model/prompt_version: 모델·프롬프트 변경 추적
-- source_content_hash : 분석 시점 leaf chunk_content_hash 스냅샷 — 원문 변경(재분해) stale 탐지
--
-- chunk_id = FK CASCADE: chunk_section_analysis 는 document_chunks 에 종속된 분석 데이터.
-- parent_id(자기참조 트리, cascade 모호성 회피 위해 app-level)와 달리 여기는 단순 1:1 종속 → FK 정당.
--
-- UNIQUE(chunk_id, prompt_version): 같은 절을 같은 프롬프트로 1행. 원문 변경 시 동일 키 ON CONFLICT 갱신.
-- (chunk_id 단독 조회는 이 복합 UNIQUE 의 leftmost prefix btree 로 커버 → 별 인덱스 불필요)
--
-- ⚠ 단일 statement (migration runner exec_driver_sql). BEGIN/COMMIT/ROLLBACK 금지.
CREATE TABLE IF NOT EXISTS chunk_section_analysis (
id BIGSERIAL PRIMARY KEY,
chunk_id BIGINT NOT NULL REFERENCES document_chunks(id) ON DELETE CASCADE,
status TEXT NOT NULL,
summary TEXT,
section_type TEXT,
domain TEXT,
confidence REAL,
model TEXT,
prompt_version TEXT NOT NULL,
source_content_hash TEXT,
error TEXT,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
UNIQUE (chunk_id, prompt_version)
);
+309
View File
@@ -0,0 +1,309 @@
"""PR-DocSrv-Hier-Section-Summary-1 — per-절(leaf) Mac mini 분석 pilot (one-shot admin script).
hier_section is_leaf 청크(절)를 Mac mini gemma-4-26B 로 절 단위 **요약 + 기능 type 분류**.
결과를 chunk_section_analysis(migration 286)에 저장. 문서레벨 분석과 별개의 절-레벨 축.
* 영구 worker 경로 아님 — pilot 한정 수동 배치. 상시 enqueue worker 배선은 별 PR(rebuild 동반).
* `./scripts` mount 라 image rebuild 없이 실행. baked 모듈(AIClient/llm_gate/settings)만 import.
* domain 은 doc-level taxonomy(documents.ai_domain) 상속 — LLM 에 domain 안 물음(프롬프트 경량).
* no silent fallback: call_triage 직접 호출(primary→Claude 분기 없음). 실패=status='failed', Claude 호출 0.
* Semaphore(1): acquire_mlx_gate(Priority.BACKGROUND) 경유 — foreground ask 우선, 새 Semaphore 금지.
선별 멱등 predicate (재시작/중복 확대 시 26B 재호출 0):
is_leaf=true AND source_type='hier_section' AND length(text)>=MIN_CHARS
AND NOT EXISTS(분석행 with 동일 chunk_id+prompt_version+source_content_hash)
(<MIN_CHARS leaf 는 LLM 0, status='skipped_tiny' 행만 박제)
실행 (GPU 서버, fastapi 컨테이너):
docker compose exec -T fastapi python /app/scripts/section_summary_pilot.py dry-run --doc 5225
docker compose exec -T fastapi python /app/scripts/section_summary_pilot.py run --doc 5225 [--limit 30]
docker compose exec -T fastapi python /app/scripts/section_summary_pilot.py report --doc 5225
"""
import argparse
import asyncio
import json
import os
import statistics
import sys
import time
# fastapi 컨테이너는 WORKDIR=/app 에 코드를 펼쳐놓음 (app/ 디렉토리 없음).
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from sqlalchemy import text
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
from ai.client import AIClient, parse_json_response, strip_thinking
from core.config import settings
from services.search.llm_gate import Priority, acquire_mlx_gate
PROMPT_VERSION = "section-summary-v1"
MIN_CHARS = 100 # <100자 = heading-only orphan(실측 성격 확인) → skipped_tiny, LLM 0
CALL_TIMEOUT_S = 45 # gate 안쪽 안전 timeout (call_triage httpx 자체 timeout=30s 의 상위 보호)
# 절-전용 기능 역할 enum (locked). 'other' 안전판. LLM 이 이 외 값 내면 'other' coerce.
SECTION_TYPES = {
"definition", "requirement", "procedure", "formula", "data_table",
"example", "case_study", "question", "reference", "overview", "other",
}
PROMPT_TEMPLATE = """당신은 기술 문서의 한 '절(section)'을 분석하는 도우미입니다.
아래 절의 heading 경로와 본문을 읽고, JSON 하나만 출력하세요.
heading_path: {heading_path}
---본문 시작---
{body}
---본문 끝---
출력 JSON 스키마 (이 객체 하나만, 다른 텍스트 금지):
{{
"summary": "이 절의 핵심을 2~3문장 한국어로 요약",
"section_type": "<아래 enum 중 정확히 하나>",
"confidence": 0.0~1.0
}}
section_type enum (주된 역할 하나만 선택):
- definition: 용어/개념의 정의
- requirement: 요구사항/기준/규정/제약
- procedure: 절차/단계/방법/수행 지침
- formula: 수식/계산식/산식
- data_table: 표/수치 데이터 나열
- example: 예시/사례 설명
- case_study: 구체적 사례 연구
- question: 문제/질문
- reference: 참고/인용/목록/색인
- overview: 개요/서론/소개/범위
- other: 위 어디에도 해당 없음
JSON 외 다른 텍스트는 절대 출력하지 마세요."""
def _make_engine():
"""phase1d_pilot 패턴 — script 전용 engine (event-loop 바인딩 안전)."""
db_url = os.environ["DATABASE_URL"]
return create_async_engine(db_url, pool_pre_ping=True)
# ── 선별 (멱등) ──────────────────────────────────────────────────────────────
_SELECT_SQL = text("""
SELECT dc.id AS chunk_id,
dc.doc_id AS doc_id,
dc.chunk_index AS chunk_index,
dc.heading_path AS heading_path,
dc.section_title AS section_title,
dc.text AS body,
length(dc.text) AS body_len,
dc.chunk_content_hash AS content_hash,
d.ai_domain AS doc_domain
FROM document_chunks dc
JOIN documents d ON d.id = dc.doc_id
WHERE dc.source_type = 'hier_section'
AND dc.is_leaf = true
AND dc.doc_id = :doc
AND NOT EXISTS (
SELECT 1 FROM chunk_section_analysis a
WHERE a.chunk_id = dc.id
AND a.prompt_version = :pv
AND a.source_content_hash = dc.chunk_content_hash
)
ORDER BY dc.chunk_index
""")
_UPSERT_SQL = text("""
INSERT INTO chunk_section_analysis
(chunk_id, status, summary, section_type, domain, confidence,
model, prompt_version, source_content_hash, error, updated_at)
VALUES
(:chunk_id, :status, :summary, :section_type, :domain, :confidence,
:model, :pv, :content_hash, :error, now())
ON CONFLICT (chunk_id, prompt_version) DO UPDATE SET
status = EXCLUDED.status,
summary = EXCLUDED.summary,
section_type = EXCLUDED.section_type,
domain = EXCLUDED.domain,
confidence = EXCLUDED.confidence,
model = EXCLUDED.model,
source_content_hash = EXCLUDED.source_content_hash,
error = EXCLUDED.error,
updated_at = now()
""")
async def _select_targets(session, doc: int):
rows = (await session.execute(_SELECT_SQL, {"doc": doc, "pv": PROMPT_VERSION})).mappings().all()
skip = [r for r in rows if r["body_len"] < MIN_CHARS]
analyze = [r for r in rows if r["body_len"] >= MIN_CHARS]
return analyze, skip
def _coerce_type(raw_type) -> str:
t = (raw_type or "").strip().lower()
return t if t in SECTION_TYPES else "other"
def _build_prompt(row) -> str:
return PROMPT_TEMPLATE.format(
heading_path=(row["heading_path"] or row["section_title"] or "(제목 없음)"),
body=row["body"],
)
# ── subcommands ──────────────────────────────────────────────────────────────
async def cmd_dry_run(args):
"""LLM 호출 0. 대상/skip 집계 + 본문길이 분포 + 샘플 heading 출력."""
engine = _make_engine()
sm = async_sessionmaker(engine, expire_on_commit=False)
async with sm() as session:
analyze, skip = await _select_targets(session, args.doc)
await engine.dispose()
print(f"[dry-run] doc={args.doc} prompt_version={PROMPT_VERSION} MIN_CHARS={MIN_CHARS}")
print(f" analyze (>= {MIN_CHARS}자, 미처리분): {len(analyze)}")
print(f" skip (< {MIN_CHARS}자, skipped_tiny 예정): {len(skip)}")
if analyze:
lens = [r["body_len"] for r in analyze]
print(f" analyze 본문길이: min={min(lens)} p50={int(statistics.median(lens))} max={max(lens)}")
print(f" 샘플 heading (앞 8개):")
for r in analyze[:8]:
print(f" [{r['body_len']:>5}자] {(r['heading_path'] or r['section_title'] or '')[:70]}")
print(" ⚠ LLM 호출 0 (scaffold 검증용).")
async def cmd_run(args):
"""active — skip 행 박제 + analyze 절 26B 호출(gate) + upsert. leaf당 시간 측정."""
engine = _make_engine()
sm = async_sessionmaker(engine, expire_on_commit=False)
async with sm() as session:
analyze, skip = await _select_targets(session, args.doc)
if args.limit is not None:
analyze = analyze[: args.limit]
# 1) skip 행 박제 (LLM 0)
for r in skip:
await session.execute(_UPSERT_SQL, {
"chunk_id": r["chunk_id"], "status": "skipped_tiny",
"summary": None, "section_type": None,
"domain": r["doc_domain"], "confidence": None,
"model": None, "pv": PROMPT_VERSION,
"content_hash": r["content_hash"], "error": None,
})
await session.commit()
print(f"[run] doc={args.doc} skip 행 {len(skip)} 박제(skipped_tiny). analyze 대상 {len(analyze)} 시작.")
# 2) analyze — Mac mini 26B (BACKGROUND gate, no fallback)
client = AIClient()
model_name = settings.ai.triage.model
timings, types, confs = [], [], []
n_ok = n_fail = 0
try:
for i, r in enumerate(analyze, 1):
prompt = _build_prompt(r)
status, summary, sec_type, conf, err = "failed", None, None, None, None
start = time.perf_counter()
try:
async with acquire_mlx_gate(Priority.BACKGROUND):
async with asyncio.timeout(CALL_TIMEOUT_S):
raw = await client.call_triage(prompt)
elapsed = time.perf_counter() - start
timings.append(elapsed)
parsed = parse_json_response(strip_thinking(raw)) if raw else None
if parsed and isinstance(parsed, dict):
summary = (parsed.get("summary") or "").strip() or None
sec_type = _coerce_type(parsed.get("section_type"))
try:
conf = float(parsed.get("confidence"))
except (TypeError, ValueError):
conf = 0.5
status = "summarized"
n_ok += 1
types.append(sec_type)
confs.append(conf)
else:
err = "parse_failed"
n_fail += 1
except Exception as exc: # timeout / 호출 실패 — no fallback
elapsed = time.perf_counter() - start
timings.append(elapsed)
err = f"{type(exc).__name__}: {repr(exc)[:160]}"
n_fail += 1
await session.execute(_UPSERT_SQL, {
"chunk_id": r["chunk_id"], "status": status,
"summary": summary, "section_type": sec_type,
"domain": r["doc_domain"], "confidence": conf,
"model": model_name, "pv": PROMPT_VERSION,
"content_hash": r["content_hash"], "error": err,
})
await session.commit()
if i % 20 == 0 or i == len(analyze):
print(f" ... {i}/{len(analyze)} (ok={n_ok} fail={n_fail}, last={elapsed:.1f}s)")
finally:
await client.close()
await engine.dispose()
# 측정 보고 (ETA + guard 충분성 lock 입력)
print(f"\n[run] doc={args.doc} 완료: ok={n_ok} fail={n_fail} skip={len(skip)}")
if timings:
print(f" leaf당 호출시간: avg={statistics.mean(timings):.2f}s "
f"p50={statistics.median(timings):.2f}s "
f"max={max(timings):.2f}s (n={len(timings)})")
print(f" → foreground worst-case 지연 ≈ 진행중 leaf 1건 = max {max(timings):.1f}s")
if types:
from collections import Counter
dist = Counter(types)
other_ratio = dist.get("other", 0) / len(types)
print(f" section_type 분포: {dict(dist.most_common())}")
print(f" other 비율: {other_ratio:.1%} (높으면 enum 확장 신호)")
if confs:
print(f" confidence: avg={statistics.mean(confs):.2f} min={min(confs):.2f}")
async def cmd_report(args):
"""chunk_section_analysis 현황 (doc 또는 전체)."""
engine = _make_engine()
sm = async_sessionmaker(engine, expire_on_commit=False)
where = "WHERE dc.doc_id = :doc" if args.doc else ""
params = {"doc": args.doc} if args.doc else {}
async with sm() as session:
rows = (await session.execute(text(f"""
SELECT dc.doc_id, a.status, a.section_type, count(*) AS n,
round(avg(a.confidence)::numeric, 2) AS avg_conf
FROM chunk_section_analysis a
JOIN document_chunks dc ON dc.id = a.chunk_id
{where}
GROUP BY dc.doc_id, a.status, a.section_type
ORDER BY dc.doc_id, a.status, n DESC
"""), params)).mappings().all()
await engine.dispose()
if not rows:
print("[report] 분석 행 없음.")
return
print(f"[report] doc={args.doc or 'ALL'}")
for r in rows:
print(f" doc={r['doc_id']} status={r['status']:<13} "
f"type={str(r['section_type']):<12} n={r['n']:<4} avg_conf={r['avg_conf']}")
def main():
ap = argparse.ArgumentParser(description="hier_section per-leaf Mac mini 분석 pilot")
sub = ap.add_subparsers(dest="cmd", required=True)
p_dry = sub.add_parser("dry-run", help="대상/skip 집계 (LLM 0)")
p_dry.add_argument("--doc", type=int, required=True)
p_run = sub.add_parser("run", help="active — 26B 호출 + 저장")
p_run.add_argument("--doc", type=int, required=True)
p_run.add_argument("--limit", type=int, default=None, help="analyze 상한 (Step B 'first N')")
p_rep = sub.add_parser("report", help="현황 집계")
p_rep.add_argument("--doc", type=int, default=None)
args = ap.parse_args()
fn = {"dry-run": cmd_dry_run, "run": cmd_run, "report": cmd_report}[args.cmd]
asyncio.run(fn(args))
if __name__ == "__main__":
main()