"""오버나이트 hier 분해 + 절 분석 backfill (ADDITIVE — 검색 코퍼스 미교체). Engineering + Industrial_Safety 미분해 기술문서를 deadline(기본 07:00 KST) 전까지: doc → persist_hier_tree(build + leaf embed, in_corpus=false) → 절 분석(Mac mini gemma-26B) → commit 검색 코퍼스(replace_doc_corpus) 미터치 → eval baseline/reindex 무관, 무위험. 시간 초과 시 leaf 경계에서 안전 중단(멱등 — 다음 실행이 미처리분만 이어서). 절 분석 상수/헬퍼는 section_summary_pilot 에서 import = PROMPT_VERSION 단일 진실(멱등 보존). no silent fallback(call_triage 직접) / Semaphore(1) BACKGROUND gate / 새 Semaphore 금지. 실행 (GPU 서버, background): docker compose exec -T fastapi python /app/scripts/hier_overnight_backfill.py run --deadline 07:00 docker compose exec -T fastapi python /app/scripts/hier_overnight_backfill.py dry-run """ import argparse import asyncio import os import statistics import sys import time from collections import Counter from datetime import datetime, timedelta sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) from sqlalchemy import text from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine from ai.client import AIClient, parse_json_response, strip_thinking from core.config import settings from services.hier_decomp.persist import persist_hier_tree from services.search.llm_gate import Priority, acquire_mlx_gate # 단일 진실: 절 분석 상수/헬퍼 (PROMPT_VERSION 일치 = 멱등 보존) from section_summary_pilot import ( CALL_TIMEOUT_S, MIN_CHARS, PROMPT_VERSION, _UPSERT_SQL, _build_prompt, _coerce_type, ) EXCLUDE_DOMAINS = ["news"] # 기본 = 뉴스만 제외, 나머지 전부 (allowlist 는 --domains 로 override) DOC_MIN_CHARS = 4000 # hier 분해가 의미 있는 doc 크기 하한(STRUCTURE_SPLIT_THRESHOLD=4000) BUFFER_MIN = 10 # deadline 이 만큼 전 안전 중단 def _candidate_sql(allowlist, doc_ids=None): """allowlist 있으면 그 domain 만, 없으면 EXCLUDE_DOMAINS(news) 제외 전부. doc_ids 명시 시 = 그 doc 만(크기 게이트 DOC_MIN_CHARS + domain 필터 우회 — 구조화 소형 문서(법령 등) eval coverage 보정용. NOT EXISTS hier 멱등 가드는 유지). 작은 doc 먼저 = 완료 doc 수 최대화 + 단일 mega-doc 예산 독식 방지.""" if doc_ids: cond, gate = "d.id = ANY(:doc_ids)", "" # 명시 doc = 크기 게이트 우회 else: cond = ("lower(split_part(coalesce(d.ai_domain,''), '/', 1)) = ANY(:domains)" if allowlist else "lower(split_part(coalesce(d.ai_domain,''), '/', 1)) <> ALL(:exclude)") gate = "AND length(d.extracted_text) > :minchars" return text(f""" SELECT d.id AS doc_id, d.extracted_text AS body, d.ai_domain AS ai_domain FROM documents d WHERE d.extracted_text IS NOT NULL {gate} AND {cond} AND NOT EXISTS (SELECT 1 FROM document_chunks dc WHERE dc.doc_id = d.id AND dc.source_type = 'hier_section') ORDER BY length(d.extracted_text) ASC """) def _candidate_params(allowlist, doc_ids=None): if doc_ids: return {"doc_ids": doc_ids} p = {"minchars": DOC_MIN_CHARS} if allowlist: p["domains"] = allowlist else: p["exclude"] = EXCLUDE_DOMAINS return p def _scope_label(allowlist, doc_ids=None): if doc_ids: return f"doc-list={len(doc_ids)}건(크기게이트 우회)" return f"allowlist={allowlist}" if allowlist else f"all-except={EXCLUDE_DOMAINS}" # 멱등 leaf 선별 (재실행 시 이미 분석된 leaf 제외) LEAF_SQL = text(""" SELECT dc.id AS chunk_id, dc.heading_path, dc.section_title, dc.text AS body, length(dc.text) AS body_len, dc.chunk_content_hash AS content_hash FROM document_chunks dc WHERE dc.doc_id = :doc AND dc.source_type = 'hier_section' AND dc.is_leaf = true AND NOT EXISTS (SELECT 1 FROM chunk_section_analysis a WHERE a.chunk_id = dc.id AND a.prompt_version = :pv AND a.source_content_hash = dc.chunk_content_hash) ORDER BY dc.chunk_index """) def _now(): return datetime.now() def _log(msg): print(f"[{_now():%H:%M:%S}] {msg}", flush=True) def _compute_deadline(hhmm: str) -> datetime: h, m = (int(x) for x in hhmm.split(":")) now = _now() target = now.replace(hour=h, minute=m, second=0, microsecond=0) if target <= now: target += timedelta(days=1) return target def _make_engine(): return create_async_engine(os.environ["DATABASE_URL"], pool_pre_ping=True) async def _analyze_doc_leaves(session, client, doc_id, doc_domain, model_name, stop_at): """doc 의 미분석 hier leaf 분석 → upsert. stop_at(epoch) 넘으면 leaf 경계 중단.""" rows = (await session.execute(LEAF_SQL, {"doc": doc_id, "pv": PROMPT_VERSION})).mappings().all() ok = fail = skip = 0 timings, types = [], [] aborted = False for r in rows: if time.time() >= stop_at: aborted = True break if r["body_len"] < MIN_CHARS: await session.execute(_UPSERT_SQL, { "chunk_id": r["chunk_id"], "status": "skipped_tiny", "summary": None, "section_type": None, "domain": doc_domain, "confidence": None, "model": None, "pv": PROMPT_VERSION, "content_hash": r["content_hash"], "error": None, }) skip += 1 continue status, summary, sec_type, conf, err = "failed", None, None, None, None start = time.perf_counter() try: async with acquire_mlx_gate(Priority.BACKGROUND): async with asyncio.timeout(CALL_TIMEOUT_S): raw = await client.call_triage(_build_prompt(r)) timings.append(time.perf_counter() - start) parsed = parse_json_response(strip_thinking(raw)) if raw else None if parsed and isinstance(parsed, dict): summary = (parsed.get("summary") or "").strip() or None sec_type = _coerce_type(parsed.get("section_type")) try: conf = float(parsed.get("confidence")) except (TypeError, ValueError): conf = 0.5 status, ok = "summarized", ok + 1 types.append(sec_type) else: err, fail = "parse_failed", fail + 1 except Exception as exc: # timeout/호출 실패 — no fallback timings.append(time.perf_counter() - start) err, fail = f"{type(exc).__name__}: {repr(exc)[:160]}", fail + 1 await session.execute(_UPSERT_SQL, { "chunk_id": r["chunk_id"], "status": status, "summary": summary, "section_type": sec_type, "domain": doc_domain, "confidence": conf, "model": model_name, "pv": PROMPT_VERSION, "content_hash": r["content_hash"], "error": err, }) await session.commit() await session.commit() return {"ok": ok, "fail": fail, "skip": skip, "leaves": len(rows), "timings": timings, "types": types, "aborted": aborted} def _parse_doc_ids(args): raw = getattr(args, "doc", None) return [int(x) for x in raw.split(",") if x.strip()] if raw else None async def cmd_dry_run(args): allowlist = args.domains.split(",") if args.domains else None doc_ids = _parse_doc_ids(args) engine = _make_engine() sm = async_sessionmaker(engine, expire_on_commit=False) async with sm() as session: rows = (await session.execute(_candidate_sql(allowlist, doc_ids), _candidate_params(allowlist, doc_ids))).mappings().all() await engine.dispose() gate_lbl = "doc-list" if doc_ids else f">{DOC_MIN_CHARS}자" print(f"[dry-run] 후보 doc {len(rows)} ({_scope_label(allowlist, doc_ids)}, {gate_lbl}, 미분해)") if rows: lens = [len(r["body"]) for r in rows] print(f" 본문길이: min={min(lens)} p50={int(statistics.median(lens))} max={max(lens)} 합={sum(lens):,}") print(" 앞 5개:") for r in rows[:5]: print(f" doc={r['doc_id']} {len(r['body']):>7,}자 {r['ai_domain']}") async def cmd_run(args): allowlist = args.domains.split(",") if args.domains else None doc_ids = _parse_doc_ids(args) skip_analysis = getattr(args, "skip_analysis", False) deadline = _compute_deadline(args.deadline) stop_at = (deadline - timedelta(minutes=BUFFER_MIN)).timestamp() _log(f"deadline={deadline:%m-%d %H:%M} (buffer {BUFFER_MIN}m → stop_at={datetime.fromtimestamp(stop_at):%H:%M}) " f"{_scope_label(allowlist, doc_ids)}{' [SKIP-ANALYSIS: 분해+임베딩만]' if skip_analysis else ''}") engine = _make_engine() sm = async_sessionmaker(engine, expire_on_commit=False) client = AIClient() model_name = settings.ai.triage.model async def embed_leaf(t): try: return await client.embed(t) except Exception as exc: _log(f" embed 실패(무시, in_corpus=false): {type(exc).__name__}") return None tot_docs = tot_ok = tot_fail = tot_skip = tot_leaves_created = 0 all_timings, all_types = [], [] run_start = time.time() try: async with sm() as session: cands = (await session.execute(_candidate_sql(allowlist, doc_ids), _candidate_params(allowlist, doc_ids))).mappings().all() _log(f"후보 doc {len(cands)} 선별. 시작.") for c in cands: if time.time() >= stop_at: _log(f"⏰ deadline 버퍼 도달 — doc 경계에서 중단 (처리 {tot_docs} doc)") break doc_id, body, doc_domain = c["doc_id"], c["body"], c["ai_domain"] try: async with sm() as session: pstat = await persist_hier_tree(session, doc_id, body, embed_leaf) leaves_created = pstat.get("leaves", 0) tot_leaves_created += leaves_created if skip_analysis: # 분해+임베딩만 (절 분석 = Mac mini 별 축, retrieval 무관). 멱등. astat = {"ok": 0, "fail": 0, "skip": 0, "leaves": leaves_created, "timings": [], "types": [], "aborted": False} else: async with sm() as session: astat = await _analyze_doc_leaves(session, client, doc_id, doc_domain, model_name, stop_at) except Exception as exc: _log(f" ✗ doc={doc_id} 처리 실패(건너뜀): {type(exc).__name__}: {repr(exc)[:160]}") continue tot_docs += 1 tot_ok += astat["ok"]; tot_fail += astat["fail"]; tot_skip += astat["skip"] all_timings += astat["timings"]; all_types += astat["types"] avg = statistics.mean(astat["timings"]) if astat["timings"] else 0 _log(f" ✓ doc={doc_id} ({len(body):,}자 {doc_domain.split('/')[0]}) " f"leaf생성={leaves_created} 분석ok={astat['ok']} fail={astat['fail']} skip={astat['skip']} " f"avg={avg:.1f}s{' [ABORT]' if astat['aborted'] else ''} | 누적 {tot_docs}doc {tot_ok}leaf") if astat["aborted"]: _log("⏰ leaf 분석 중 deadline 도달 — 중단") break finally: await client.close() await engine.dispose() elapsed = (time.time() - run_start) / 60 _log(f"=== 종료: {tot_docs} doc, leaf생성 {tot_leaves_created}, " f"분석 ok={tot_ok} fail={tot_fail} skip={tot_skip}, 경과 {elapsed:.0f}분 ===") if all_timings: _log(f" leaf당 {statistics.mean(all_timings):.2f}s (p50={statistics.median(all_timings):.2f} " f"max={max(all_timings):.2f})") if all_types: d = Counter(all_types) _log(f" section_type: {dict(d.most_common())} other={d.get('other',0)/len(all_types):.1%}") def main(): ap = argparse.ArgumentParser(description="오버나이트 hier 분해+절 분석 backfill (additive)") sub = ap.add_subparsers(dest="cmd", required=True) p_dry = sub.add_parser("dry-run", help="후보 doc 집계 (작업 0)") p_dry.add_argument("--domains", default=None, help="comma-sep allowlist (미지정=뉴스 제외 전부)") p_dry.add_argument("--doc", default=None, help="comma-sep doc id (크기 게이트 우회 — 구조화 소형 문서 coverage 보정)") p_run = sub.add_parser("run", help="분해+분석 실행 (deadline time-box)") p_run.add_argument("--deadline", default="07:00", help="HH:MM (기본 07:00 — 컨테이너 UTC 주의, 07:00 KST=22:00 UTC)") p_run.add_argument("--domains", default=None, help="comma-sep allowlist (미지정=뉴스 제외 전부)") p_run.add_argument("--doc", default=None, help="comma-sep doc id (크기 게이트 우회 — 구조화 소형 문서 coverage 보정)") p_run.add_argument("--skip-analysis", action="store_true", help="절 분석(Mac mini) 생략, 분해+임베딩만 (retrieval go/no-go 측정 준비용)") args = ap.parse_args() fn = {"dry-run": cmd_dry_run, "run": cmd_run}[args.cmd] asyncio.run(fn(args)) if __name__ == "__main__": main()