From cfadaaffd9876caec316ed2fc4856dfadf376b4c Mon Sep 17 00:00:00 2001 From: hyungi Date: Sun, 24 May 2026 13:45:30 +0000 Subject: [PATCH] feat(search): hier section per-leaf analysis scaffold (Section-Summary-1 c1) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit chunk_section_analysis 테이블(migration 286) + ORM model + pilot script. document_chunks(retrieval-hot)와 분리된 절-레벨 분석 축. domain 상속, section_type 절-전용 역할 enum, status로 skip 박제, source_content_hash로 stale 탐지. script-only(scripts mount, rebuild 불필요). LLM 0 dry-run 검증 = 5225 147 analyze + 17 skip. Co-Authored-By: Claude Opus 4.7 (1M context) --- app/models/section_analysis.py | 49 ++++ migrations/286_chunk_section_analysis.sql | 37 +++ scripts/section_summary_pilot.py | 309 ++++++++++++++++++++++ 3 files changed, 395 insertions(+) create mode 100644 app/models/section_analysis.py create mode 100644 migrations/286_chunk_section_analysis.sql create mode 100644 scripts/section_summary_pilot.py diff --git a/app/models/section_analysis.py b/app/models/section_analysis.py new file mode 100644 index 0000000..c101d36 --- /dev/null +++ b/app/models/section_analysis.py @@ -0,0 +1,49 @@ +"""chunk_section_analysis 테이블 ORM (PR-DocSrv-Hier-Section-Summary-1). + +per-절(hier_section is_leaf) Mac mini 분석 결과 저장. document_chunks(retrieval-hot) +와 분리된 절-레벨 분석 축. migration 286 에서 테이블 생성. + +⚠ pilot 단계(scripts/section_summary_pilot.py)는 `./scripts` mount 로 rebuild 없이 +돌지만, 이 모델은 `app/` 이라 baked — 즉 pilot script 는 이 모델을 import 하지 않고 +raw SQL 을 쓴다. 본 모델은 (1) 스키마 문서화 (2) 향후 상시 worker 배선(별 PR, image +rebuild 동반) 용도. 컬럼 정의는 migration 286 과 단일 진실로 동기 유지. 
class ChunkSectionAnalysis(Base):
    """ORM mapping for the ``chunk_section_analysis`` table.

    Each row stores the outcome of analysing one ``document_chunks`` row
    under one prompt version: a status, the optional summary / section type /
    confidence produced by the model, and bookkeeping columns (model name,
    prompt version, source content hash, error text, timestamps).
    """

    __tablename__ = "chunk_section_analysis"

    id: Mapped[int] = mapped_column(BigInteger, primary_key=True)
    # FK with ON DELETE CASCADE — analysis rows are dependent data of
    # document_chunks (1:1). Deliberately differs from parent_id
    # (self-FK, enforced at app level).
    chunk_id: Mapped[int] = mapped_column(
        BigInteger, ForeignKey("document_chunks.id", ondelete="CASCADE"), nullable=False
    )
    # summarized | skipped_tiny | failed — skips are stored as rows too, so
    # "not processed yet" can be told apart from "intentionally skipped".
    status: Mapped[str] = mapped_column(Text, nullable=False)
    summary: Mapped[str | None] = mapped_column(Text)
    # Section-specific role enum (loose text, no CHECK constraint — to be
    # tightened after observing the pilot).
    # definition/requirement/procedure/formula/data_table/example/case_study/question/reference/overview/other
    section_type: Mapped[str | None] = mapped_column(Text)
    # Inherited snapshot of the doc-level taxonomy path (documents.ai_domain).
    domain: Mapped[str | None] = mapped_column(Text)
    confidence: Mapped[float | None] = mapped_column(Float)
    model: Mapped[str | None] = mapped_column(Text)
    prompt_version: Mapped[str] = mapped_column(Text, nullable=False)
    # Snapshot of the leaf's chunk_content_hash at analysis time — used to
    # detect staleness when the source text changes (re-chunking).
    source_content_hash: Mapped[str | None] = mapped_column(Text)
    error: Mapped[str | None] = mapped_column(Text)
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), server_default=text("now()"), nullable=False
    )
    updated_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), server_default=text("now()"), nullable=False
    )

    # UNIQUE(chunk_id, prompt_version) is defined in migration 286 (not
    # mirrored in the ORM — lookups/upserts go through raw SQL).
-- 286_chunk_section_analysis.sql
-- PR-DocSrv-Hier-Section-Summary-1: storage table for per-section (leaf) Mac mini analysis results.
--
-- A "section-level analysis axis" kept separate from document_chunks
-- (retrieval-hot: ivfflat + 2 gin + 6 btree).
-- Leaves the hot table untouched and is independent of builder/replace rewrites.
-- Fully separate from the search corpus (additive).
--
-- Column design (user review 2026-05-24):
--   status               : summarized | skipped_tiny | failed — skips are stored as rows too
--                          (distinguishes "not processed yet" from "intentionally skipped")
--   summary/section_type/domain/confidence : analysis results (may be NULL on skip/failed rows)
--   section_type         : section-specific role enum (loose text, no CHECK — tightened after the pilot)
--   domain               : inherited snapshot of the doc-level taxonomy path (d.ai_domain)
--   model/prompt_version : tracks model / prompt changes
--   source_content_hash  : snapshot of the leaf chunk_content_hash at analysis time —
--                          detects staleness when the source text changes (re-chunking)
--
-- chunk_id = FK CASCADE: chunk_section_analysis is analysis data dependent on document_chunks.
--   Unlike parent_id (self-referencing tree, kept app-level to avoid cascade ambiguity),
--   this is a plain 1:1 dependency, so a real FK is justified.
--
-- UNIQUE(chunk_id, prompt_version): one row per section per prompt. When the source text
--   changes, the same key is refreshed via ON CONFLICT.
--   (Lookups by chunk_id alone are served by the leftmost prefix of this composite UNIQUE
--   btree, so no separate index is needed.)
--
-- WARNING: single statement only (the migration runner uses exec_driver_sql).
-- Do not add BEGIN/COMMIT/ROLLBACK.
CREATE TABLE IF NOT EXISTS chunk_section_analysis (
    id                  BIGSERIAL PRIMARY KEY,
    chunk_id            BIGINT NOT NULL REFERENCES document_chunks(id) ON DELETE CASCADE,
    status              TEXT NOT NULL,
    summary             TEXT,
    section_type        TEXT,
    domain              TEXT,
    confidence          REAL,
    model               TEXT,
    prompt_version      TEXT NOT NULL,
    source_content_hash TEXT,
    error               TEXT,
    created_at          TIMESTAMPTZ NOT NULL DEFAULT now(),
    updated_at          TIMESTAMPTZ NOT NULL DEFAULT now(),
    UNIQUE (chunk_id, prompt_version)
);
def _make_engine():
    """Create the script-private async engine from ``DATABASE_URL``.

    Follows the phase1d_pilot pattern noted by the author: the script builds
    its own engine (described there as safe with respect to event-loop
    binding) instead of reusing an application-level one.

    Returns:
        The engine returned by ``create_async_engine`` with
        ``pool_pre_ping=True``.

    Raises:
        KeyError: if the ``DATABASE_URL`` environment variable is not set.
    """
    db_url = os.environ["DATABASE_URL"]
    return create_async_engine(db_url, pool_pre_ping=True)


# ── selection (idempotent) ───────────────────────────────────────────────────
# Picks the hier_section leaf chunks of one document that do not yet have an
# analysis row for the same (chunk_id, prompt_version, content hash).
#
# The hash comparison uses IS NOT DISTINCT FROM rather than "=": both
# source_content_hash and the chunk hash may be NULL, and "NULL = NULL" is not
# true in SQL, so with "=" a chunk without a hash would be re-selected (and
# re-sent to the LLM) on every run, defeating the idempotency this predicate
# exists for.
_SELECT_SQL = text("""
    SELECT dc.id                 AS chunk_id,
           dc.doc_id             AS doc_id,
           dc.chunk_index        AS chunk_index,
           dc.heading_path       AS heading_path,
           dc.section_title      AS section_title,
           dc.text               AS body,
           length(dc.text)       AS body_len,
           dc.chunk_content_hash AS content_hash,
           d.ai_domain           AS doc_domain
    FROM document_chunks dc
    JOIN documents d ON d.id = dc.doc_id
    WHERE dc.source_type = 'hier_section'
      AND dc.is_leaf = true
      AND dc.doc_id = :doc
      AND NOT EXISTS (
          SELECT 1 FROM chunk_section_analysis a
          WHERE a.chunk_id = dc.id
            AND a.prompt_version = :pv
            AND a.source_content_hash IS NOT DISTINCT FROM dc.chunk_content_hash
      )
    ORDER BY dc.chunk_index
""")
# Insert-or-refresh one analysis row. The conflict target matches the
# UNIQUE (chunk_id, prompt_version) constraint, so re-analysing the same
# section with the same prompt overwrites the previous result in place.
_UPSERT_SQL = text("""
    INSERT INTO chunk_section_analysis
        (chunk_id, status, summary, section_type, domain, confidence,
         model, prompt_version, source_content_hash, error, updated_at)
    VALUES
        (:chunk_id, :status, :summary, :section_type, :domain, :confidence,
         :model, :pv, :content_hash, :error, now())
    ON CONFLICT (chunk_id, prompt_version) DO UPDATE SET
        status              = EXCLUDED.status,
        summary             = EXCLUDED.summary,
        section_type        = EXCLUDED.section_type,
        domain              = EXCLUDED.domain,
        confidence          = EXCLUDED.confidence,
        model               = EXCLUDED.model,
        source_content_hash = EXCLUDED.source_content_hash,
        error               = EXCLUDED.error,
        updated_at          = now()
""")


async def _select_targets(session, doc: int):
    """Fetch the pending leaf sections of *doc* and split them by body length.

    Args:
        session: async DB session used to run ``_SELECT_SQL``.
        doc: id of the document whose leaf sections are selected.

    Returns:
        ``(analyze, skip)`` — rows whose body has at least ``MIN_CHARS``
        characters, and rows below that threshold, both in query order.
    """
    result = await session.execute(_SELECT_SQL, {"doc": doc, "pv": PROMPT_VERSION})
    analyze, skip = [], []
    for row in result.mappings().all():
        # length(NULL) is NULL in SQL; count a missing body as empty so the
        # comparison cannot raise TypeError and the row is recorded as a skip.
        body_len = row["body_len"] or 0
        if body_len >= MIN_CHARS:
            analyze.append(row)
        else:
            skip.append(row)
    return analyze, skip


def _coerce_type(raw_type) -> str:
    """Normalise a model-reported section type to a member of ``SECTION_TYPES``.

    Args:
        raw_type: value taken from the model's JSON; may be missing or of any
            JSON type.

    Returns:
        The stripped, lower-cased value when it is a known section type,
        otherwise ``"other"`` (this includes non-string values, which
        previously raised ``AttributeError`` on ``.strip()``).
    """
    if not isinstance(raw_type, str):
        return "other"
    normalized = raw_type.strip().lower()
    return normalized if normalized in SECTION_TYPES else "other"


def _build_prompt(row) -> str:
    """Render ``PROMPT_TEMPLATE`` for one selected row.

    The heading falls back from ``heading_path`` to ``section_title`` to a
    fixed placeholder when both are empty.
    """
    return PROMPT_TEMPLATE.format(
        heading_path=(row["heading_path"] or row["section_title"] or "(제목 없음)"),
        body=row["body"],
    )


# ── subcommands ──────────────────────────────────────────────────────────────
async def cmd_dry_run(args):
    """Print what ``run`` would process for ``args.doc`` without calling the LLM.

    Reports the analyze / skip counts, the body-length spread of the analyze
    set and the first eight headings. Writes nothing to the database.
    """
    engine = _make_engine()
    sm = async_sessionmaker(engine, expire_on_commit=False)
    try:
        async with sm() as session:
            analyze, skip = await _select_targets(session, args.doc)
    finally:
        # Dispose even when selection fails so the pool is not left open.
        await engine.dispose()

    print(f"[dry-run] doc={args.doc} prompt_version={PROMPT_VERSION} MIN_CHARS={MIN_CHARS}")
    print(f" analyze (>= {MIN_CHARS}자, 미처리분): {len(analyze)}")
    print(f" skip (< {MIN_CHARS}자, skipped_tiny 예정): {len(skip)}")
    if analyze:
        lens = [r["body_len"] for r in analyze]
        print(f" analyze 본문길이: min={min(lens)} p50={int(statistics.median(lens))} max={max(lens)}")
        print(f" 샘플 heading (앞 8개):")
        for r in analyze[:8]:
            print(f" [{r['body_len']:>5}자] {(r['heading_path'] or r['section_title'] or '')[:70]}")
    print(" ⚠ LLM 호출 0 (scaffold 검증용).")
def _coerce_confidence(value, default: float = 0.5) -> float:
    """Convert a model-reported confidence into a finite float in [0.0, 1.0].

    Missing, unparsable or non-finite (NaN / inf) values yield *default*;
    finite values outside the range are clamped.
    """
    import math

    try:
        conf = float(value)
    except (TypeError, ValueError):
        return default
    if not math.isfinite(conf):
        return default
    return min(1.0, max(0.0, conf))


def _interpret_response(raw):
    """Map raw model output to ``(status, summary, section_type, confidence, error)``.

    A response counts as ``summarized`` only when it parses to a non-empty
    JSON object with a non-blank string summary; otherwise the result is a
    ``failed`` tuple whose error is ``parse_failed`` or ``empty_summary``.
    """
    parsed = parse_json_response(strip_thinking(raw)) if raw else None
    if not (parsed and isinstance(parsed, dict)):
        return "failed", None, None, None, "parse_failed"
    raw_summary = parsed.get("summary")
    summary = raw_summary.strip() if isinstance(raw_summary, str) else ""
    if not summary:
        # A 'summarized' row must carry a summary; only skip/failed rows may
        # have NULL results.
        return "failed", None, None, None, "empty_summary"
    return (
        "summarized",
        summary,
        _coerce_type(parsed.get("section_type")),
        _coerce_confidence(parsed.get("confidence")),
        None,
    )


async def cmd_run(args):
    """Active run: record skip rows, analyse each pending leaf, upsert results.

    For ``args.doc`` this (1) upserts a ``skipped_tiny`` row for every
    too-short leaf, then (2) sends each remaining leaf (at most ``args.limit``
    when given) to ``AIClient.call_triage`` under the BACKGROUND MLX gate with
    a ``CALL_TIMEOUT_S`` timeout and upserts one row per leaf, committing
    after each. Any failure of a call is stored as ``status='failed'`` — there
    is no fallback model. Finally prints timing, section-type and confidence
    statistics.
    """
    engine = _make_engine()
    sm = async_sessionmaker(engine, expire_on_commit=False)
    timings, types, confs = [], [], []
    n_ok = n_fail = 0
    try:
        async with sm() as session:
            analyze, skip = await _select_targets(session, args.doc)

            if args.limit is not None:
                analyze = analyze[: args.limit]

            # 1) Record skip rows (no LLM calls).
            for r in skip:
                await session.execute(_UPSERT_SQL, {
                    "chunk_id": r["chunk_id"], "status": "skipped_tiny",
                    "summary": None, "section_type": None,
                    "domain": r["doc_domain"], "confidence": None,
                    "model": None, "pv": PROMPT_VERSION,
                    "content_hash": r["content_hash"], "error": None,
                })
            await session.commit()
            print(f"[run] doc={args.doc} skip 행 {len(skip)} 박제(skipped_tiny). analyze 대상 {len(analyze)} 시작.")

            # 2) Analyse — Mac mini 26B (BACKGROUND gate, no fallback).
            model_name = settings.ai.triage.model
            client = AIClient()
            try:
                for i, r in enumerate(analyze, 1):
                    prompt = _build_prompt(r)
                    status, summary, sec_type, conf, err = "failed", None, None, None, None
                    elapsed = None
                    start = time.perf_counter()
                    try:
                        async with acquire_mlx_gate(Priority.BACKGROUND):
                            # Restart the clock once the gate is held: the
                            # report below reads these timings as the time one
                            # in-flight leaf occupies the model, so waiting
                            # behind foreground work must not be counted.
                            start = time.perf_counter()
                            async with asyncio.timeout(CALL_TIMEOUT_S):
                                raw = await client.call_triage(prompt)
                            elapsed = time.perf_counter() - start
                        status, summary, sec_type, conf, err = _interpret_response(raw)
                    except Exception as exc:  # timeout / call failure — no fallback
                        if elapsed is None:
                            elapsed = time.perf_counter() - start
                        err = f"{type(exc).__name__}: {repr(exc)[:160]}"

                    # Exactly one timing sample per leaf, whichever path ran.
                    timings.append(elapsed)
                    if status == "summarized":
                        n_ok += 1
                        types.append(sec_type)
                        confs.append(conf)
                    else:
                        n_fail += 1

                    await session.execute(_UPSERT_SQL, {
                        "chunk_id": r["chunk_id"], "status": status,
                        "summary": summary, "section_type": sec_type,
                        "domain": r["doc_domain"], "confidence": conf,
                        "model": model_name, "pv": PROMPT_VERSION,
                        "content_hash": r["content_hash"], "error": err,
                    })
                    await session.commit()
                    if i % 20 == 0 or i == len(analyze):
                        print(f" ... {i}/{len(analyze)} (ok={n_ok} fail={n_fail}, last={elapsed:.1f}s)")
            finally:
                await client.close()
    finally:
        # Dispose on every path, including failures during selection or the
        # skip upserts, and only after the session has been closed.
        await engine.dispose()

    # Measurement report (input for the ETA and for judging whether the
    # foreground guard is sufficient).
    print(f"\n[run] doc={args.doc} 완료: ok={n_ok} fail={n_fail} skip={len(skip)}")
    if timings:
        print(f" leaf당 호출시간: avg={statistics.mean(timings):.2f}s "
              f"p50={statistics.median(timings):.2f}s "
              f"max={max(timings):.2f}s (n={len(timings)})")
        print(f" → foreground worst-case 지연 ≈ 진행중 leaf 1건 = max {max(timings):.1f}s")
    if types:
        from collections import Counter
        dist = Counter(types)
        other_ratio = dist.get("other", 0) / len(types)
        print(f" section_type 분포: {dict(dist.most_common())}")
        print(f" other 비율: {other_ratio:.1%} (높으면 enum 확장 신호)")
    if confs:
        print(f" confidence: avg={statistics.mean(confs):.2f} min={min(confs):.2f}")
async def cmd_report(args):
    """Print per-document status / section-type counts from chunk_section_analysis.

    With ``args.doc`` set, only that document is reported; with ``None`` all
    documents are. Each output line shows the row count and the average
    confidence (rounded to two decimals) of one (doc, status, section_type)
    group.
    """
    # Compare against None explicitly so that document id 0 is still treated
    # as a filter rather than as "all documents".
    has_doc = args.doc is not None
    # The WHERE fragment is a fixed literal; the id itself is always bound as
    # a parameter, never interpolated into the SQL text.
    where = "WHERE dc.doc_id = :doc" if has_doc else ""
    params = {"doc": args.doc} if has_doc else {}

    engine = _make_engine()
    sm = async_sessionmaker(engine, expire_on_commit=False)
    try:
        async with sm() as session:
            rows = (await session.execute(text(f"""
                SELECT dc.doc_id, a.status, a.section_type, count(*) AS n,
                       round(avg(a.confidence)::numeric, 2) AS avg_conf
                FROM chunk_section_analysis a
                JOIN document_chunks dc ON dc.id = a.chunk_id
                {where}
                GROUP BY dc.doc_id, a.status, a.section_type
                ORDER BY dc.doc_id, a.status, n DESC
            """), params)).mappings().all()
    finally:
        # Dispose even when the query fails.
        await engine.dispose()

    if not rows:
        print("[report] 분석 행 없음.")
        return
    print(f"[report] doc={args.doc if has_doc else 'ALL'}")
    for r in rows:
        print(f" doc={r['doc_id']} status={r['status']:<13} "
              f"type={str(r['section_type']):<12} n={r['n']:<4} avg_conf={r['avg_conf']}")


def main():
    """Parse the command line and run the selected subcommand.

    Subcommands: ``dry-run --doc N``, ``run --doc N [--limit K]`` and
    ``report [--doc N]``. The chosen coroutine is executed with
    ``asyncio.run``.
    """
    ap = argparse.ArgumentParser(description="hier_section per-leaf Mac mini 분석 pilot")
    sub = ap.add_subparsers(dest="cmd", required=True)

    p_dry = sub.add_parser("dry-run", help="대상/skip 집계 (LLM 0)")
    p_dry.add_argument("--doc", type=int, required=True)

    p_run = sub.add_parser("run", help="active — 26B 호출 + 저장")
    p_run.add_argument("--doc", type=int, required=True)
    p_run.add_argument("--limit", type=int, default=None, help="analyze 상한 (Step B 'first N')")

    p_rep = sub.add_parser("report", help="현황 집계")
    p_rep.add_argument("--doc", type=int, default=None)

    args = ap.parse_args()
    fn = {"dry-run": cmd_dry_run, "run": cmd_run, "report": cmd_report}[args.cmd]
    asyncio.run(fn(args))


if __name__ == "__main__":
    main()