ops(hier): add section analysis backfill runner
hier 분해(additive, in_corpus=false) + 절 분석(Mac mini gemma-26B BACKGROUND gate) 오버나이트 backfill 러너. time-box deadline + per-doc commit + 멱등 선별(NOT EXISTS). section_summary_pilot 상수 재사용(PROMPT_VERSION 단일화). no silent fallback. 검증: Engineering+Industrial_Safety 245 doc / 6066 절 요약 / fail 0 (2026-05-24~25). 컨테이너 TZ=UTC → deadline KST 환산 주의. 종료는 컨테이너 내부 PID kill 필수. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,238 @@
|
||||
"""오버나이트 hier 분해 + 절 분석 backfill (ADDITIVE — 검색 코퍼스 미교체).
|
||||
|
||||
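Until the deadline (default 07:00, intended as KST), every not-yet-decomposed
Engineering + Industrial_Safety technical document goes through:
    doc → persist_hier_tree(build + leaf embed, in_corpus=false) → section analysis (Mac mini gemma-26B) → commit
NOTE: --deadline HH:MM is resolved with a naive datetime.now(), i.e. in the container's
local timezone. In a TZ=UTC container, pass the KST deadline converted to UTC.
The search corpus (replace_doc_corpus) is never touched → independent of the eval baseline/reindex, no risk.
When time runs out, the run stops safely at a leaf boundary (idempotent — the next run
continues with only the unprocessed part).

Section-analysis constants/helpers are imported from section_summary_pilot = single source
of truth for PROMPT_VERSION (keeps idempotency).
no silent fallback (call_triage is called directly) / Semaphore(1) BACKGROUND gate / do not create a new Semaphore.

Run (GPU server, background):
    docker compose exec -T fastapi python /app/scripts/hier_overnight_backfill.py run --deadline 07:00
    docker compose exec -T fastapi python /app/scripts/hier_overnight_backfill.py dry-run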
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import asyncio
|
||||
import os
|
||||
import statistics
|
||||
import sys
|
||||
import time
|
||||
from collections import Counter
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
|
||||
|
||||
from sqlalchemy import text
|
||||
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
|
||||
|
||||
from ai.client import AIClient, parse_json_response, strip_thinking
|
||||
from core.config import settings
|
||||
from services.hier_decomp.persist import persist_hier_tree
|
||||
from services.search.llm_gate import Priority, acquire_mlx_gate
|
||||
|
||||
# 단일 진실: 절 분석 상수/헬퍼 (PROMPT_VERSION 일치 = 멱등 보존)
|
||||
from section_summary_pilot import (
|
||||
CALL_TIMEOUT_S, MIN_CHARS, PROMPT_VERSION, _UPSERT_SQL, _build_prompt, _coerce_type,
|
||||
)
|
||||
|
||||
# Lower-cased first segment of documents.ai_domain that qualifies a doc for backfill.
DEFAULT_DOMAINS = ["engineering", "industrial_safety"]

# Only docs whose extracted text is longer than this are decomposed: the lower size
# bound at which hier decomposition is meaningful (STRUCTURE_SPLIT_THRESHOLD=4000).
DOC_MIN_CHARS = 4000

# Safety margin in minutes: processing stops this long before the deadline.
BUFFER_MIN = 10
|
||||
|
||||
# Candidate documents: non-null extracted text longer than :minchars, top-level ai_domain
# segment in :domains, and no 'hier_section' chunk yet.
# Shortest docs come first = maximizes the number of completed docs and keeps a single
# mega-doc from consuming the whole time budget.
CANDIDATE_SQL = text(
    """
    SELECT d.id AS doc_id, d.extracted_text AS body, d.ai_domain AS ai_domain
    FROM documents d
    WHERE d.extracted_text IS NOT NULL
      AND length(d.extracted_text) > :minchars
      AND lower(split_part(d.ai_domain, '/', 1)) = ANY(:domains)
      AND NOT EXISTS (SELECT 1 FROM document_chunks dc
                      WHERE dc.doc_id = d.id AND dc.source_type = 'hier_section')
    ORDER BY length(d.extracted_text) ASC
    """
)
|
||||
|
||||
# Idempotent leaf selection: on a re-run, leaves that already have an analysis row for
# the same prompt version (:pv) and the same content hash are excluded.
LEAF_SQL = text(
    """
    SELECT dc.id AS chunk_id, dc.heading_path, dc.section_title,
           dc.text AS body, length(dc.text) AS body_len,
           dc.chunk_content_hash AS content_hash
    FROM document_chunks dc
    WHERE dc.doc_id = :doc AND dc.source_type = 'hier_section' AND dc.is_leaf = true
      AND NOT EXISTS (SELECT 1 FROM chunk_section_analysis a
                      WHERE a.chunk_id = dc.id AND a.prompt_version = :pv
                        AND a.source_content_hash = dc.chunk_content_hash)
    ORDER BY dc.chunk_index
    """
)
|
||||
|
||||
|
||||
def _now():
|
||||
return datetime.now()
|
||||
|
||||
|
||||
def _log(msg):
    """Print ``msg`` to stdout prefixed with ``[HH:MM:SS]`` taken from ``_now()``.

    The output is flushed on every call so progress is visible immediately.
    """
    stamp = _now().strftime("%H:%M:%S")
    print(f"[{stamp}] {msg}", flush=True)
|
||||
|
||||
|
||||
def _compute_deadline(hhmm: str) -> datetime:
    """Return the next occurrence of the wall-clock time ``hhmm`` (``"HH:MM"``).

    The time is resolved against ``_now()``, so it is interpreted in the process's
    local timezone. If that time today is not strictly in the future, the same time
    tomorrow is returned.

    Raises:
        ValueError: if ``hhmm`` is not two ``:``-separated integers or the values
            are out of range for a clock time.
    """
    hour, minute = map(int, hhmm.split(":"))
    current = _now()
    candidate = current.replace(hour=hour, minute=minute, second=0, microsecond=0)
    # Already reached or passed today → roll over to the next day.
    return candidate if candidate > current else candidate + timedelta(days=1)
|
||||
|
||||
|
||||
def _make_engine():
    """Build the async SQLAlchemy engine for the DSN in the ``DATABASE_URL`` env var.

    ``pool_pre_ping=True`` is passed to ``create_async_engine``.

    Raises:
        KeyError: if ``DATABASE_URL`` is not set.
    """
    dsn = os.environ["DATABASE_URL"]
    return create_async_engine(dsn, pool_pre_ping=True)
|
||||
|
||||
|
||||
async def _analyze_doc_leaves(session, client, doc_id, doc_domain, model_name, stop_at):
    """Analyze the not-yet-analyzed hier leaves of one document and upsert the results.

    Leaves are selected with ``LEAF_SQL`` for ``doc_id`` and the current
    ``PROMPT_VERSION``. For each leaf, in order:

    * if ``time.time() >= stop_at`` the loop stops at this leaf boundary;
    * a leaf shorter than ``MIN_CHARS`` is stored as ``skipped_tiny`` without a model call;
    * otherwise ``client.call_triage`` is awaited under the BACKGROUND MLX gate with a
      ``CALL_TIMEOUT_S`` timeout, the JSON reply is parsed, and the outcome
      (``summarized`` or ``failed``) is upserted and committed.

    Any exception from the call or from parsing is recorded as a failed row for that
    leaf (no fallback); exceptions from the database calls propagate to the caller.

    Args:
        session: async DB session used for the select, upserts and commits.
        client: object exposing an awaitable ``call_triage(prompt)``.
        doc_id: id of the document whose leaves are analyzed.
        doc_domain: value stored in the ``domain`` column of every upserted row.
        model_name: value stored in the ``model`` column of analyzed (non-skipped) rows.
        stop_at: epoch seconds; compared against ``time.time()`` before each leaf.

    Returns:
        dict with ``ok`` / ``fail`` / ``skip`` counters, ``leaves`` (number of leaves
        selected, not necessarily processed), ``timings`` (seconds per model call,
        including the wait for the gate), ``types`` (section type of each summarized
        leaf) and ``aborted`` (True when the loop stopped because of ``stop_at``).
    """
    rows = (
        await session.execute(LEAF_SQL, {"doc": doc_id, "pv": PROMPT_VERSION})
    ).mappings().all()
    ok = fail = skip = 0
    timings, types = [], []
    aborted = False
    for r in rows:
        if time.time() >= stop_at:
            aborted = True
            break
        if r["body_len"] < MIN_CHARS:
            # Too small to summarize: record a marker row so the leaf is not re-selected.
            # It is committed together with the next analyzed leaf or by the final commit.
            await session.execute(_UPSERT_SQL, {
                "chunk_id": r["chunk_id"], "status": "skipped_tiny", "summary": None,
                "section_type": None, "domain": doc_domain, "confidence": None,
                "model": None, "pv": PROMPT_VERSION, "content_hash": r["content_hash"], "error": None,
            })
            skip += 1
            continue
        status, summary, sec_type, conf, err = "failed", None, None, None, None
        start = time.perf_counter()
        try:
            try:
                async with acquire_mlx_gate(Priority.BACKGROUND):
                    async with asyncio.timeout(CALL_TIMEOUT_S):
                        raw = await client.call_triage(_build_prompt(r))
            finally:
                # Record the call duration exactly once, whether the call succeeded or
                # raised. Appending both here and in the except handler would count the
                # leaf twice if parsing raised after a successful call.
                timings.append(time.perf_counter() - start)
            parsed = parse_json_response(strip_thinking(raw)) if raw else None
            if parsed and isinstance(parsed, dict):
                # NOTE(review): an empty/missing summary is still stored as
                # "summarized" with summary=None — confirm this is intended.
                summary = (parsed.get("summary") or "").strip() or None
                sec_type = _coerce_type(parsed.get("section_type"))
                try:
                    conf = float(parsed.get("confidence"))
                except (TypeError, ValueError):
                    # Missing or non-numeric confidence → neutral default.
                    conf = 0.5
                status, ok = "summarized", ok + 1
                types.append(sec_type)
            else:
                err, fail = "parse_failed", fail + 1
        except Exception as exc:  # timeout / call failure — no fallback
            err, fail = f"{type(exc).__name__}: {repr(exc)[:160]}", fail + 1
        await session.execute(_UPSERT_SQL, {
            "chunk_id": r["chunk_id"], "status": status, "summary": summary,
            "section_type": sec_type, "domain": doc_domain, "confidence": conf,
            "model": model_name, "pv": PROMPT_VERSION,
            "content_hash": r["content_hash"], "error": err,
        })
        # Commit per leaf so finished work survives an interruption.
        await session.commit()
    # Flush trailing skipped_tiny rows that were not followed by an analyzed leaf.
    await session.commit()
    return {"ok": ok, "fail": fail, "skip": skip, "leaves": len(rows),
            "timings": timings, "types": types, "aborted": aborted}
|
||||
|
||||
|
||||
async def cmd_dry_run(args):
    """Print how many candidate documents ``CANDIDATE_SQL`` selects, without processing any.

    Reports the candidate count, body-length statistics (min / median / max / total)
    and the first five candidates in query order.

    Args:
        args: parsed CLI namespace (unused; kept for a uniform command signature).
    """
    engine = _make_engine()
    try:
        sm = async_sessionmaker(engine, expire_on_commit=False)
        async with sm() as session:
            rows = (await session.execute(
                CANDIDATE_SQL,
                {"minchars": DOC_MIN_CHARS, "domains": DEFAULT_DOMAINS},
            )).mappings().all()
    finally:
        # Dispose the engine even if the query raises, as cmd_run does.
        await engine.dispose()
    print(f"[dry-run] 후보 doc {len(rows)} (domains={DEFAULT_DOMAINS}, >{DOC_MIN_CHARS}자, 미분해)")
    if rows:
        lens = [len(r["body"]) for r in rows]
        print(f" 본문길이: min={min(lens)} p50={int(statistics.median(lens))} max={max(lens)} 합={sum(lens):,}")
        print(" 앞 5개:")
        for r in rows[:5]:
            print(f" doc={r['doc_id']} {len(r['body']):>7,}자 {r['ai_domain']}")
|
||||
|
||||
|
||||
async def cmd_run(args):
    """Run the time-boxed backfill: decompose each candidate doc, then analyze its leaves.

    The stop time is ``args.deadline`` (resolved by ``_compute_deadline``) minus
    ``BUFFER_MIN`` minutes. Candidates come from ``CANDIDATE_SQL``. Each doc is passed
    to ``persist_hier_tree`` and then to ``_analyze_doc_leaves``, each in its own
    session. A doc that raises is logged and skipped. The loop ends when the stop time
    is reached at a doc boundary or when leaf analysis reports ``aborted``. The AI
    client and the engine are released in ``finally``; a summary is logged afterwards.

    Args:
        args: parsed CLI namespace; only ``args.deadline`` ("HH:MM") is read.
    """
    deadline = _compute_deadline(args.deadline)
    stop_at = (deadline - timedelta(minutes=BUFFER_MIN)).timestamp()
    _log(f"deadline={deadline:%m-%d %H:%M} (buffer {BUFFER_MIN}m → stop_at={datetime.fromtimestamp(stop_at):%H:%M}) "
         f"domains={DEFAULT_DOMAINS}")

    engine = _make_engine()
    session_factory = async_sessionmaker(engine, expire_on_commit=False)
    client = AIClient()
    model_name = settings.ai.triage.model

    async def embed_leaf(t):
        # Embedding failures are tolerated: log the exception type and return None.
        try:
            return await client.embed(t)
        except Exception as exc:
            _log(f" embed 실패(무시, in_corpus=false): {type(exc).__name__}")
            return None

    docs_done = leaves_ok = leaves_failed = leaves_skipped = leaves_created_total = 0
    all_timings, all_types = [], []
    run_start = time.time()
    try:
        async with session_factory() as session:
            result = await session.execute(
                CANDIDATE_SQL,
                {"minchars": DOC_MIN_CHARS, "domains": DEFAULT_DOMAINS},
            )
            cands = result.mappings().all()
        _log(f"후보 doc {len(cands)} 선별. 시작.")

        # NOTE(review): CANDIDATE_SQL skips every doc that already has 'hier_section'
        # chunks. A doc whose analysis was aborted or failed after persist_hier_tree
        # (which presumably creates those chunks) is therefore not re-selected by this
        # command — confirm its remaining leaves are picked up elsewhere.
        for cand in cands:
            if time.time() >= stop_at:
                _log(f"⏰ deadline 버퍼 도달 — doc 경계에서 중단 (처리 {docs_done} doc)")
                break
            doc_id = cand["doc_id"]
            body = cand["body"]
            doc_domain = cand["ai_domain"]
            try:
                async with session_factory() as session:
                    pstat = await persist_hier_tree(session, doc_id, body, embed_leaf)
                leaves_created = pstat.get("leaves", 0)
                leaves_created_total += leaves_created
                async with session_factory() as session:
                    astat = await _analyze_doc_leaves(
                        session, client, doc_id, doc_domain, model_name, stop_at
                    )
            except Exception as exc:
                # One bad doc must not end the overnight run: log it and move on.
                _log(f" ✗ doc={doc_id} 처리 실패(건너뜀): {type(exc).__name__}: {repr(exc)[:160]}")
                continue

            docs_done += 1
            leaves_ok += astat["ok"]
            leaves_failed += astat["fail"]
            leaves_skipped += astat["skip"]
            all_timings.extend(astat["timings"])
            all_types.extend(astat["types"])
            avg = statistics.mean(astat["timings"]) if astat["timings"] else 0
            _log(f" ✓ doc={doc_id} ({len(body):,}자 {doc_domain.split('/')[0]}) "
                 f"leaf생성={leaves_created} 분석ok={astat['ok']} fail={astat['fail']} skip={astat['skip']} "
                 f"avg={avg:.1f}s{' [ABORT]' if astat['aborted'] else ''} | 누적 {docs_done}doc {leaves_ok}leaf")
            if astat["aborted"]:
                _log("⏰ leaf 분석 중 deadline 도달 — 중단")
                break
    finally:
        await client.close()
        await engine.dispose()

    elapsed = (time.time() - run_start) / 60
    _log(f"=== 종료: {docs_done} doc, leaf생성 {leaves_created_total}, "
         f"분석 ok={leaves_ok} fail={leaves_failed} skip={leaves_skipped}, 경과 {elapsed:.0f}분 ===")
    if all_timings:
        _log(f" leaf당 {statistics.mean(all_timings):.2f}s (p50={statistics.median(all_timings):.2f} "
             f"max={max(all_timings):.2f})")
    if all_types:
        type_counts = Counter(all_types)
        _log(f" section_type: {dict(type_counts.most_common())} other={type_counts.get('other',0)/len(all_types):.1%}")
|
||||
|
||||
|
||||
def main():
    """Parse the command line and run the selected sub-command to completion.

    Sub-commands: ``dry-run`` (candidate count only) and ``run`` (decompose + analyze,
    time-boxed by ``--deadline HH:MM``, default ``07:00``).
    """
    parser = argparse.ArgumentParser(description="오버나이트 hier 분해+절 분석 backfill (additive)")
    subcommands = parser.add_subparsers(dest="cmd", required=True)
    subcommands.add_parser("dry-run", help="후보 doc 집계 (작업 0)")
    run_parser = subcommands.add_parser("run", help="분해+분석 실행 (deadline time-box)")
    run_parser.add_argument("--deadline", default="07:00", help="HH:MM (기본 07:00, 지나면 다음날)")
    args = parser.parse_args()

    handlers = {"dry-run": cmd_dry_run, "run": cmd_run}
    handler = handlers[args.cmd]
    asyncio.run(handler(args))


if __name__ == "__main__":
    main()
|
||||
Reference in New Issue
Block a user