"""PR-DocSrv-Hier-Section-Summary-1 — per-절(leaf) Mac mini 분석 pilot (one-shot admin script). hier_section is_leaf 청크(절)를 Mac mini gemma-4-26B 로 절 단위 **요약 + 기능 type 분류**. 결과를 chunk_section_analysis(migration 286)에 저장. 문서레벨 분석과 별개의 절-레벨 축. * 영구 worker 경로 아님 — pilot 한정 수동 배치. 상시 enqueue worker 배선은 별 PR(rebuild 동반). * `./scripts` mount 라 image rebuild 없이 실행. baked 모듈(AIClient/llm_gate/settings)만 import. * domain 은 doc-level taxonomy(documents.ai_domain) 상속 — LLM 에 domain 안 물음(프롬프트 경량). * no silent fallback: call_triage 직접 호출(primary→Claude 분기 없음). 실패=status='failed', Claude 호출 0. * Semaphore(1): acquire_mlx_gate(Priority.BACKGROUND) 경유 — foreground ask 우선, 새 Semaphore 금지. 선별 멱등 predicate (재시작/중복 확대 시 26B 재호출 0): is_leaf=true AND source_type='hier_section' AND length(text)>=MIN_CHARS AND NOT EXISTS(분석행 with 동일 chunk_id+prompt_version+source_content_hash) (", "confidence": 0.0~1.0 }} section_type enum (주된 역할 하나만 선택): - definition: 용어/개념의 정의 - requirement: 요구사항/기준/규정/제약 - procedure: 절차/단계/방법/수행 지침 - formula: 수식/계산식/산식 - data_table: 표/수치 데이터 나열 - example: 예시/사례 설명 - case_study: 구체적 사례 연구 - question: 문제/질문 - reference: 참고/인용/목록/색인 - overview: 개요/서론/소개/범위 - other: 위 어디에도 해당 없음 JSON 외 다른 텍스트는 절대 출력하지 마세요.""" def _make_engine(): """phase1d_pilot 패턴 — script 전용 engine (event-loop 바인딩 안전).""" db_url = os.environ["DATABASE_URL"] return create_async_engine(db_url, pool_pre_ping=True) # ── 선별 (멱등) ────────────────────────────────────────────────────────────── _SELECT_SQL = text(""" SELECT dc.id AS chunk_id, dc.doc_id AS doc_id, dc.chunk_index AS chunk_index, dc.heading_path AS heading_path, dc.section_title AS section_title, dc.text AS body, length(dc.text) AS body_len, dc.chunk_content_hash AS content_hash, d.ai_domain AS doc_domain FROM document_chunks dc JOIN documents d ON d.id = dc.doc_id WHERE dc.source_type = 'hier_section' AND dc.is_leaf = true AND dc.doc_id = :doc AND NOT EXISTS ( SELECT 1 FROM chunk_section_analysis a WHERE a.chunk_id = dc.id AND a.prompt_version = :pv AND a.source_content_hash = dc.chunk_content_hash ) ORDER BY dc.chunk_index """) _UPSERT_SQL = text(""" INSERT INTO chunk_section_analysis (chunk_id, status, summary, section_type, domain, confidence, model, prompt_version, source_content_hash, error, updated_at) VALUES (:chunk_id, :status, :summary, :section_type, :domain, :confidence, :model, :pv, :content_hash, :error, now()) ON CONFLICT (chunk_id, prompt_version) DO UPDATE SET status = EXCLUDED.status, summary = EXCLUDED.summary, section_type = EXCLUDED.section_type, domain = EXCLUDED.domain, confidence = EXCLUDED.confidence, model = EXCLUDED.model, source_content_hash = EXCLUDED.source_content_hash, error = EXCLUDED.error, updated_at = now() """) async def _select_targets(session, doc: int): rows = (await session.execute(_SELECT_SQL, {"doc": doc, "pv": PROMPT_VERSION})).mappings().all() skip = [r for r in rows if r["body_len"] < MIN_CHARS] analyze = [r for r in rows if r["body_len"] >= MIN_CHARS] return analyze, skip def _coerce_type(raw_type) -> str: t = (raw_type or "").strip().lower() return t if t in SECTION_TYPES else "other" def _build_prompt(row) -> str: return PROMPT_TEMPLATE.format( heading_path=(row["heading_path"] or row["section_title"] or "(제목 없음)"), body=row["body"], ) # ── subcommands ────────────────────────────────────────────────────────────── async def cmd_dry_run(args): """LLM 호출 0. 대상/skip 집계 + 본문길이 분포 + 샘플 heading 출력.""" engine = _make_engine() sm = async_sessionmaker(engine, expire_on_commit=False) async with sm() as session: analyze, skip = await _select_targets(session, args.doc) await engine.dispose() print(f"[dry-run] doc={args.doc} prompt_version={PROMPT_VERSION} MIN_CHARS={MIN_CHARS}") print(f" analyze (>= {MIN_CHARS}자, 미처리분): {len(analyze)}") print(f" skip (< {MIN_CHARS}자, skipped_tiny 예정): {len(skip)}") if analyze: lens = [r["body_len"] for r in analyze] print(f" analyze 본문길이: min={min(lens)} p50={int(statistics.median(lens))} max={max(lens)}") print(f" 샘플 heading (앞 8개):") for r in analyze[:8]: print(f" [{r['body_len']:>5}자] {(r['heading_path'] or r['section_title'] or '')[:70]}") print(" ⚠ LLM 호출 0 (scaffold 검증용).") async def cmd_run(args): """active — skip 행 박제 + analyze 절 26B 호출(gate) + upsert. leaf당 시간 측정.""" engine = _make_engine() sm = async_sessionmaker(engine, expire_on_commit=False) async with sm() as session: analyze, skip = await _select_targets(session, args.doc) if args.limit is not None: analyze = analyze[: args.limit] # 1) skip 행 박제 (LLM 0) for r in skip: await session.execute(_UPSERT_SQL, { "chunk_id": r["chunk_id"], "status": "skipped_tiny", "summary": None, "section_type": None, "domain": r["doc_domain"], "confidence": None, "model": None, "pv": PROMPT_VERSION, "content_hash": r["content_hash"], "error": None, }) await session.commit() print(f"[run] doc={args.doc} skip 행 {len(skip)} 박제(skipped_tiny). analyze 대상 {len(analyze)} 시작.") # 2) analyze — Mac mini 26B (BACKGROUND gate, no fallback) client = AIClient() model_name = settings.ai.triage.model timings, types, confs = [], [], [] n_ok = n_fail = 0 try: for i, r in enumerate(analyze, 1): prompt = _build_prompt(r) status, summary, sec_type, conf, err = "failed", None, None, None, None start = time.perf_counter() try: async with acquire_mlx_gate(Priority.BACKGROUND): async with asyncio.timeout(CALL_TIMEOUT_S): raw = await client.call_triage(prompt) elapsed = time.perf_counter() - start timings.append(elapsed) parsed = parse_json_response(strip_thinking(raw)) if raw else None if parsed and isinstance(parsed, dict): summary = (parsed.get("summary") or "").strip() or None sec_type = _coerce_type(parsed.get("section_type")) try: conf = float(parsed.get("confidence")) except (TypeError, ValueError): conf = 0.5 status = "summarized" n_ok += 1 types.append(sec_type) confs.append(conf) else: err = "parse_failed" n_fail += 1 except Exception as exc: # timeout / 호출 실패 — no fallback elapsed = time.perf_counter() - start timings.append(elapsed) err = f"{type(exc).__name__}: {repr(exc)[:160]}" n_fail += 1 await session.execute(_UPSERT_SQL, { "chunk_id": r["chunk_id"], "status": status, "summary": summary, "section_type": sec_type, "domain": r["doc_domain"], "confidence": conf, "model": model_name, "pv": PROMPT_VERSION, "content_hash": r["content_hash"], "error": err, }) await session.commit() if i % 20 == 0 or i == len(analyze): print(f" ... {i}/{len(analyze)} (ok={n_ok} fail={n_fail}, last={elapsed:.1f}s)") finally: await client.close() await engine.dispose() # 측정 보고 (ETA + guard 충분성 lock 입력) print(f"\n[run] doc={args.doc} 완료: ok={n_ok} fail={n_fail} skip={len(skip)}") if timings: print(f" leaf당 호출시간: avg={statistics.mean(timings):.2f}s " f"p50={statistics.median(timings):.2f}s " f"max={max(timings):.2f}s (n={len(timings)})") print(f" → foreground worst-case 지연 ≈ 진행중 leaf 1건 = max {max(timings):.1f}s") if types: from collections import Counter dist = Counter(types) other_ratio = dist.get("other", 0) / len(types) print(f" section_type 분포: {dict(dist.most_common())}") print(f" other 비율: {other_ratio:.1%} (높으면 enum 확장 신호)") if confs: print(f" confidence: avg={statistics.mean(confs):.2f} min={min(confs):.2f}") async def cmd_report(args): """chunk_section_analysis 현황 (doc 또는 전체).""" engine = _make_engine() sm = async_sessionmaker(engine, expire_on_commit=False) where = "WHERE dc.doc_id = :doc" if args.doc else "" params = {"doc": args.doc} if args.doc else {} async with sm() as session: rows = (await session.execute(text(f""" SELECT dc.doc_id, a.status, a.section_type, count(*) AS n, round(avg(a.confidence)::numeric, 2) AS avg_conf FROM chunk_section_analysis a JOIN document_chunks dc ON dc.id = a.chunk_id {where} GROUP BY dc.doc_id, a.status, a.section_type ORDER BY dc.doc_id, a.status, n DESC """), params)).mappings().all() await engine.dispose() if not rows: print("[report] 분석 행 없음.") return print(f"[report] doc={args.doc or 'ALL'}") for r in rows: print(f" doc={r['doc_id']} status={r['status']:<13} " f"type={str(r['section_type']):<12} n={r['n']:<4} avg_conf={r['avg_conf']}") def main(): ap = argparse.ArgumentParser(description="hier_section per-leaf Mac mini 분석 pilot") sub = ap.add_subparsers(dest="cmd", required=True) p_dry = sub.add_parser("dry-run", help="대상/skip 집계 (LLM 0)") p_dry.add_argument("--doc", type=int, required=True) p_run = sub.add_parser("run", help="active — 26B 호출 + 저장") p_run.add_argument("--doc", type=int, required=True) p_run.add_argument("--limit", type=int, default=None, help="analyze 상한 (Step B 'first N')") p_rep = sub.add_parser("report", help="현황 집계") p_rep.add_argument("--doc", type=int, default=None) args = ap.parse_args() fn = {"dry-run": cmd_dry_run, "run": cmd_run, "report": cmd_report}[args.cmd] asyncio.run(fn(args)) if __name__ == "__main__": main()