cfadaaffd9
chunk_section_analysis 테이블(migration 286) + ORM model + pilot script. document_chunks(retrieval-hot)와 분리된 절-레벨 분석 축. domain 상속, section_type 절-전용 역할 enum, status로 skip 박제, source_content_hash로 stale 탐지. script-only(scripts mount, rebuild 불필요). LLM 0 dry-run 검증 = 5225 147 analyze + 17 skip. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
310 lines
13 KiB
Python
310 lines
13 KiB
Python
"""PR-DocSrv-Hier-Section-Summary-1 — per-절(leaf) Mac mini 분석 pilot (one-shot admin script).
|
|
|
|
hier_section is_leaf 청크(절)를 Mac mini gemma-4-26B 로 절 단위 **요약 + 기능 type 분류**.
|
|
결과를 chunk_section_analysis(migration 286)에 저장. 문서레벨 분석과 별개의 절-레벨 축.
|
|
|
|
* 영구 worker 경로 아님 — pilot 한정 수동 배치. 상시 enqueue worker 배선은 별 PR(rebuild 동반).
|
|
* `./scripts` mount 라 image rebuild 없이 실행. baked 모듈(AIClient/llm_gate/settings)만 import.
|
|
* domain 은 doc-level taxonomy(documents.ai_domain) 상속 — LLM 에 domain 안 물음(프롬프트 경량).
|
|
* no silent fallback: call_triage 직접 호출(primary→Claude 분기 없음). 실패=status='failed', Claude 호출 0.
|
|
* Semaphore(1): acquire_mlx_gate(Priority.BACKGROUND) 경유 — foreground ask 우선, 새 Semaphore 금지.
|
|
|
|
선별 멱등 predicate (재시작/중복 확대 시 26B 재호출 0):
|
|
is_leaf=true AND source_type='hier_section' AND length(text)>=MIN_CHARS
|
|
AND NOT EXISTS(분석행 with 동일 chunk_id+prompt_version+source_content_hash)
|
|
(<MIN_CHARS leaf 는 LLM 0, status='skipped_tiny' 행만 박제)
|
|
|
|
실행 (GPU 서버, fastapi 컨테이너):
|
|
docker compose exec -T fastapi python /app/scripts/section_summary_pilot.py dry-run --doc 5225
|
|
docker compose exec -T fastapi python /app/scripts/section_summary_pilot.py run --doc 5225 [--limit 30]
|
|
docker compose exec -T fastapi python /app/scripts/section_summary_pilot.py report --doc 5225
|
|
"""
|
|
|
|
import argparse
|
|
import asyncio
|
|
import json
|
|
import os
|
|
import statistics
|
|
import sys
|
|
import time
|
|
|
|
# fastapi 컨테이너는 WORKDIR=/app 에 코드를 펼쳐놓음 (app/ 디렉토리 없음).
|
|
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
|
|
|
|
from sqlalchemy import text
|
|
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
|
|
|
|
from ai.client import AIClient, parse_json_response, strip_thinking
|
|
from core.config import settings
|
|
from services.search.llm_gate import Priority, acquire_mlx_gate
|
|
|
|
PROMPT_VERSION = "section-summary-v1"
|
|
MIN_CHARS = 100 # <100자 = heading-only orphan(실측 성격 확인) → skipped_tiny, LLM 0
|
|
CALL_TIMEOUT_S = 45 # gate 안쪽 안전 timeout (call_triage httpx 자체 timeout=30s 의 상위 보호)
|
|
|
|
# 절-전용 기능 역할 enum (locked). 'other' 안전판. LLM 이 이 외 값 내면 'other' coerce.
|
|
SECTION_TYPES = {
|
|
"definition", "requirement", "procedure", "formula", "data_table",
|
|
"example", "case_study", "question", "reference", "overview", "other",
|
|
}
|
|
|
|
PROMPT_TEMPLATE = """당신은 기술 문서의 한 '절(section)'을 분석하는 도우미입니다.
|
|
아래 절의 heading 경로와 본문을 읽고, JSON 하나만 출력하세요.
|
|
|
|
heading_path: {heading_path}
|
|
---본문 시작---
|
|
{body}
|
|
---본문 끝---
|
|
|
|
출력 JSON 스키마 (이 객체 하나만, 다른 텍스트 금지):
|
|
{{
|
|
"summary": "이 절의 핵심을 2~3문장 한국어로 요약",
|
|
"section_type": "<아래 enum 중 정확히 하나>",
|
|
"confidence": 0.0~1.0
|
|
}}
|
|
|
|
section_type enum (주된 역할 하나만 선택):
|
|
- definition: 용어/개념의 정의
|
|
- requirement: 요구사항/기준/규정/제약
|
|
- procedure: 절차/단계/방법/수행 지침
|
|
- formula: 수식/계산식/산식
|
|
- data_table: 표/수치 데이터 나열
|
|
- example: 예시/사례 설명
|
|
- case_study: 구체적 사례 연구
|
|
- question: 문제/질문
|
|
- reference: 참고/인용/목록/색인
|
|
- overview: 개요/서론/소개/범위
|
|
- other: 위 어디에도 해당 없음
|
|
|
|
JSON 외 다른 텍스트는 절대 출력하지 마세요."""
|
|
|
|
|
|
def _make_engine():
|
|
"""phase1d_pilot 패턴 — script 전용 engine (event-loop 바인딩 안전)."""
|
|
db_url = os.environ["DATABASE_URL"]
|
|
return create_async_engine(db_url, pool_pre_ping=True)
|
|
|
|
|
|
# ── 선별 (멱등) ──────────────────────────────────────────────────────────────
|
|
_SELECT_SQL = text("""
|
|
SELECT dc.id AS chunk_id,
|
|
dc.doc_id AS doc_id,
|
|
dc.chunk_index AS chunk_index,
|
|
dc.heading_path AS heading_path,
|
|
dc.section_title AS section_title,
|
|
dc.text AS body,
|
|
length(dc.text) AS body_len,
|
|
dc.chunk_content_hash AS content_hash,
|
|
d.ai_domain AS doc_domain
|
|
FROM document_chunks dc
|
|
JOIN documents d ON d.id = dc.doc_id
|
|
WHERE dc.source_type = 'hier_section'
|
|
AND dc.is_leaf = true
|
|
AND dc.doc_id = :doc
|
|
AND NOT EXISTS (
|
|
SELECT 1 FROM chunk_section_analysis a
|
|
WHERE a.chunk_id = dc.id
|
|
AND a.prompt_version = :pv
|
|
AND a.source_content_hash = dc.chunk_content_hash
|
|
)
|
|
ORDER BY dc.chunk_index
|
|
""")
|
|
|
|
_UPSERT_SQL = text("""
|
|
INSERT INTO chunk_section_analysis
|
|
(chunk_id, status, summary, section_type, domain, confidence,
|
|
model, prompt_version, source_content_hash, error, updated_at)
|
|
VALUES
|
|
(:chunk_id, :status, :summary, :section_type, :domain, :confidence,
|
|
:model, :pv, :content_hash, :error, now())
|
|
ON CONFLICT (chunk_id, prompt_version) DO UPDATE SET
|
|
status = EXCLUDED.status,
|
|
summary = EXCLUDED.summary,
|
|
section_type = EXCLUDED.section_type,
|
|
domain = EXCLUDED.domain,
|
|
confidence = EXCLUDED.confidence,
|
|
model = EXCLUDED.model,
|
|
source_content_hash = EXCLUDED.source_content_hash,
|
|
error = EXCLUDED.error,
|
|
updated_at = now()
|
|
""")
|
|
|
|
|
|
async def _select_targets(session, doc: int):
|
|
rows = (await session.execute(_SELECT_SQL, {"doc": doc, "pv": PROMPT_VERSION})).mappings().all()
|
|
skip = [r for r in rows if r["body_len"] < MIN_CHARS]
|
|
analyze = [r for r in rows if r["body_len"] >= MIN_CHARS]
|
|
return analyze, skip
|
|
|
|
|
|
def _coerce_type(raw_type) -> str:
|
|
t = (raw_type or "").strip().lower()
|
|
return t if t in SECTION_TYPES else "other"
|
|
|
|
|
|
def _build_prompt(row) -> str:
|
|
return PROMPT_TEMPLATE.format(
|
|
heading_path=(row["heading_path"] or row["section_title"] or "(제목 없음)"),
|
|
body=row["body"],
|
|
)
|
|
|
|
|
|
# ── subcommands ──────────────────────────────────────────────────────────────
|
|
async def cmd_dry_run(args):
|
|
"""LLM 호출 0. 대상/skip 집계 + 본문길이 분포 + 샘플 heading 출력."""
|
|
engine = _make_engine()
|
|
sm = async_sessionmaker(engine, expire_on_commit=False)
|
|
async with sm() as session:
|
|
analyze, skip = await _select_targets(session, args.doc)
|
|
await engine.dispose()
|
|
|
|
print(f"[dry-run] doc={args.doc} prompt_version={PROMPT_VERSION} MIN_CHARS={MIN_CHARS}")
|
|
print(f" analyze (>= {MIN_CHARS}자, 미처리분): {len(analyze)}")
|
|
print(f" skip (< {MIN_CHARS}자, skipped_tiny 예정): {len(skip)}")
|
|
if analyze:
|
|
lens = [r["body_len"] for r in analyze]
|
|
print(f" analyze 본문길이: min={min(lens)} p50={int(statistics.median(lens))} max={max(lens)}")
|
|
print(f" 샘플 heading (앞 8개):")
|
|
for r in analyze[:8]:
|
|
print(f" [{r['body_len']:>5}자] {(r['heading_path'] or r['section_title'] or '')[:70]}")
|
|
print(" ⚠ LLM 호출 0 (scaffold 검증용).")
|
|
|
|
|
|
async def cmd_run(args):
|
|
"""active — skip 행 박제 + analyze 절 26B 호출(gate) + upsert. leaf당 시간 측정."""
|
|
engine = _make_engine()
|
|
sm = async_sessionmaker(engine, expire_on_commit=False)
|
|
async with sm() as session:
|
|
analyze, skip = await _select_targets(session, args.doc)
|
|
|
|
if args.limit is not None:
|
|
analyze = analyze[: args.limit]
|
|
|
|
# 1) skip 행 박제 (LLM 0)
|
|
for r in skip:
|
|
await session.execute(_UPSERT_SQL, {
|
|
"chunk_id": r["chunk_id"], "status": "skipped_tiny",
|
|
"summary": None, "section_type": None,
|
|
"domain": r["doc_domain"], "confidence": None,
|
|
"model": None, "pv": PROMPT_VERSION,
|
|
"content_hash": r["content_hash"], "error": None,
|
|
})
|
|
await session.commit()
|
|
print(f"[run] doc={args.doc} skip 행 {len(skip)} 박제(skipped_tiny). analyze 대상 {len(analyze)} 시작.")
|
|
|
|
# 2) analyze — Mac mini 26B (BACKGROUND gate, no fallback)
|
|
client = AIClient()
|
|
model_name = settings.ai.triage.model
|
|
timings, types, confs = [], [], []
|
|
n_ok = n_fail = 0
|
|
try:
|
|
for i, r in enumerate(analyze, 1):
|
|
prompt = _build_prompt(r)
|
|
status, summary, sec_type, conf, err = "failed", None, None, None, None
|
|
start = time.perf_counter()
|
|
try:
|
|
async with acquire_mlx_gate(Priority.BACKGROUND):
|
|
async with asyncio.timeout(CALL_TIMEOUT_S):
|
|
raw = await client.call_triage(prompt)
|
|
elapsed = time.perf_counter() - start
|
|
timings.append(elapsed)
|
|
parsed = parse_json_response(strip_thinking(raw)) if raw else None
|
|
if parsed and isinstance(parsed, dict):
|
|
summary = (parsed.get("summary") or "").strip() or None
|
|
sec_type = _coerce_type(parsed.get("section_type"))
|
|
try:
|
|
conf = float(parsed.get("confidence"))
|
|
except (TypeError, ValueError):
|
|
conf = 0.5
|
|
status = "summarized"
|
|
n_ok += 1
|
|
types.append(sec_type)
|
|
confs.append(conf)
|
|
else:
|
|
err = "parse_failed"
|
|
n_fail += 1
|
|
except Exception as exc: # timeout / 호출 실패 — no fallback
|
|
elapsed = time.perf_counter() - start
|
|
timings.append(elapsed)
|
|
err = f"{type(exc).__name__}: {repr(exc)[:160]}"
|
|
n_fail += 1
|
|
|
|
await session.execute(_UPSERT_SQL, {
|
|
"chunk_id": r["chunk_id"], "status": status,
|
|
"summary": summary, "section_type": sec_type,
|
|
"domain": r["doc_domain"], "confidence": conf,
|
|
"model": model_name, "pv": PROMPT_VERSION,
|
|
"content_hash": r["content_hash"], "error": err,
|
|
})
|
|
await session.commit()
|
|
if i % 20 == 0 or i == len(analyze):
|
|
print(f" ... {i}/{len(analyze)} (ok={n_ok} fail={n_fail}, last={elapsed:.1f}s)")
|
|
finally:
|
|
await client.close()
|
|
await engine.dispose()
|
|
|
|
# 측정 보고 (ETA + guard 충분성 lock 입력)
|
|
print(f"\n[run] doc={args.doc} 완료: ok={n_ok} fail={n_fail} skip={len(skip)}")
|
|
if timings:
|
|
print(f" leaf당 호출시간: avg={statistics.mean(timings):.2f}s "
|
|
f"p50={statistics.median(timings):.2f}s "
|
|
f"max={max(timings):.2f}s (n={len(timings)})")
|
|
print(f" → foreground worst-case 지연 ≈ 진행중 leaf 1건 = max {max(timings):.1f}s")
|
|
if types:
|
|
from collections import Counter
|
|
dist = Counter(types)
|
|
other_ratio = dist.get("other", 0) / len(types)
|
|
print(f" section_type 분포: {dict(dist.most_common())}")
|
|
print(f" other 비율: {other_ratio:.1%} (높으면 enum 확장 신호)")
|
|
if confs:
|
|
print(f" confidence: avg={statistics.mean(confs):.2f} min={min(confs):.2f}")
|
|
|
|
|
|
async def cmd_report(args):
|
|
"""chunk_section_analysis 현황 (doc 또는 전체)."""
|
|
engine = _make_engine()
|
|
sm = async_sessionmaker(engine, expire_on_commit=False)
|
|
where = "WHERE dc.doc_id = :doc" if args.doc else ""
|
|
params = {"doc": args.doc} if args.doc else {}
|
|
async with sm() as session:
|
|
rows = (await session.execute(text(f"""
|
|
SELECT dc.doc_id, a.status, a.section_type, count(*) AS n,
|
|
round(avg(a.confidence)::numeric, 2) AS avg_conf
|
|
FROM chunk_section_analysis a
|
|
JOIN document_chunks dc ON dc.id = a.chunk_id
|
|
{where}
|
|
GROUP BY dc.doc_id, a.status, a.section_type
|
|
ORDER BY dc.doc_id, a.status, n DESC
|
|
"""), params)).mappings().all()
|
|
await engine.dispose()
|
|
if not rows:
|
|
print("[report] 분석 행 없음.")
|
|
return
|
|
print(f"[report] doc={args.doc or 'ALL'}")
|
|
for r in rows:
|
|
print(f" doc={r['doc_id']} status={r['status']:<13} "
|
|
f"type={str(r['section_type']):<12} n={r['n']:<4} avg_conf={r['avg_conf']}")
|
|
|
|
|
|
def main():
|
|
ap = argparse.ArgumentParser(description="hier_section per-leaf Mac mini 분석 pilot")
|
|
sub = ap.add_subparsers(dest="cmd", required=True)
|
|
|
|
p_dry = sub.add_parser("dry-run", help="대상/skip 집계 (LLM 0)")
|
|
p_dry.add_argument("--doc", type=int, required=True)
|
|
|
|
p_run = sub.add_parser("run", help="active — 26B 호출 + 저장")
|
|
p_run.add_argument("--doc", type=int, required=True)
|
|
p_run.add_argument("--limit", type=int, default=None, help="analyze 상한 (Step B 'first N')")
|
|
|
|
p_rep = sub.add_parser("report", help="현황 집계")
|
|
p_rep.add_argument("--doc", type=int, default=None)
|
|
|
|
args = ap.parse_args()
|
|
fn = {"dry-run": cmd_dry_run, "run": cmd_run, "report": cmd_report}[args.cmd]
|
|
asyncio.run(fn(args))
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|