Files
hyungi_document_server/scripts/hier_overnight_backfill.py
T
hyungi d3aa640f65 feat(documents): hier analyze 서브커맨드 — 재분해와 독립한 절분석 self-heal (g3-t3 갭)
re-decompose 의 char_start 완료마커는 'jump-target char_start 보유'라 컨테이너 recreate/deadline 으로
analyze 가 잘린 doc(char_start 있으나 일부 leaf 미분석)을 재선별 못 함 → rail summary 영구 미수렴 갭.
`analyze` 가 LEAF_SQL(미분석 leaf 보유) 기준 독립 선별로 수렴(멱등, --doc 제한 가능, jump 무관).
sweep 로그도 `analyze` 커맨드 안내로 갱신. (2026-06-10 백필서 recreate 로 잘린 5 doc·53 leaf 수동 처리한 케이스 항구화.)

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-11 11:16:44 +09:00

495 lines
26 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""오버나이트 hier 분해 + 절 분석 backfill (ADDITIVE — 검색 코퍼스 미교체).
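Not-yet-decomposed docs (default: every domain except news; narrow with --domains or --doc) until the deadline (default 07:00 on the container's local clock — pass 22:00 on a UTC container for 07:00 KST):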
doc → persist_hier_tree(build + leaf embed, in_corpus=false) → 절 분석(Mac mini gemma-26B) → commit
검색 코퍼스(replace_doc_corpus) 미터치 → eval baseline/reindex 무관, 무위험.
시간 초과 시 leaf 경계에서 안전 중단(멱등 — 다음 실행이 미처리분만 이어서).
절 분석 상수/헬퍼는 section_summary_pilot 에서 import = PROMPT_VERSION 단일 진실(멱등 보존).
no silent fallback(call_triage 직접) / Semaphore(1) BACKGROUND gate / 새 Semaphore 금지.
실행 (GPU 서버, background):
docker compose exec -T fastapi python /app/scripts/hier_overnight_backfill.py run --deadline 07:00
docker compose exec -T fastapi python /app/scripts/hier_overnight_backfill.py dry-run
"""
import argparse
import asyncio
import os
import statistics
import sys
import time
from collections import Counter
from datetime import datetime, timedelta
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from sqlalchemy import text
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
from ai.client import AIClient, parse_json_response, strip_thinking
from core.config import settings
from services.hier_decomp.builder import build_hier_tree
from services.hier_decomp.persist import persist_hier_tree
from services.search.llm_gate import Priority, acquire_mlx_gate
# 단일 진실: 절 분석 상수/헬퍼 (PROMPT_VERSION 일치 = 멱등 보존)
from section_summary_pilot import (
CALL_TIMEOUT_S, MIN_CHARS, PROMPT_VERSION, _UPSERT_SQL, _build_prompt, _coerce_type,
)
# Default scope: every top-level domain except "news"; an explicit allowlist
# given with --domains overrides this.
EXCLUDE_DOMAINS = ["news"]
# Minimum md_content length for a doc to be worth hier decomposition (the original
# note ties it to STRUCTURE_SPLIT_THRESHOLD=4000). Bypassed when --doc is given.
DOC_MIN_CHARS = 4000
# Stop this many minutes before the deadline.
BUFFER_MIN = 10
# jump-target = non-window leaf OR a node whose node_type ends in "_split"
# (B1/B3 completion marker and B_jumptarget denominator, plan g3-t2).
# Only this set receives char_start; window children / preamble are NULL by design.
# The SQL expects the chunk table aliased as `c`; `\_` (with ESCAPE '\') makes the
# LIKE underscore literal so only a real "_split" suffix matches.
_JUMP_TARGET_PRED = r"((c.is_leaf AND c.node_type IS DISTINCT FROM 'window') OR c.node_type LIKE '%\_split' ESCAPE '\')"
def _candidate_sql(allowlist, doc_ids=None, reprocess=False):
    """Build the SELECT that picks documents to decompose (or re-decompose).

    The body is ``d.md_content`` (g0-t1: md_content is the permanent hier source,
    extracted_text is retired). char_start is an offset into md_content and has to
    agree with the FE splice basis, so decomposition must read md_content too [F1].

    reprocess=False (additive): only docs with no hier rows yet
        (NOT EXISTS hier_section, which keeps re-runs idempotent).
    reprocess=True (re-decompose): docs that already have hier rows but no
        jump-target row with char_start filled in.
        [B1] Completion marker = at least one jump-target with char_start NOT NULL
        (per the original note, one re-decompose fills them all atomically). Window
        children / preamble stay NULL by design, so an "all leaves NOT NULL" marker
        would re-select forever.
        [B3] A doc with zero jump-targets makes the NOT EXISTS vacuously TRUE and
        would be re-selected forever. The callers block this by refusing
        --reprocess without an explicit --doc list (REFINED PASS docs only).

    An explicit ``doc_ids`` list replaces the domain filter and bypasses the size
    gate. Rows come back shortest body first to maximise the number of finished docs.
    Bind parameters are supplied by ``_candidate_params``.
    """
    # Scope: either the explicit doc list, or a domain filter plus the size gate.
    if doc_ids:
        scope_filter, size_gate = "d.id = ANY(:doc_ids)", ""
    else:
        top_domain = "lower(split_part(coalesce(d.ai_domain,''), '/', 1))"
        scope_filter = (f"{top_domain} = ANY(:domains)" if allowlist
                        else f"{top_domain} <> ALL(:exclude)")
        size_gate = "AND length(d.md_content) > :minchars"

    # State: which hier state a doc must be in to be a candidate.
    if not reprocess:
        state_filter = """
          AND NOT EXISTS (SELECT 1 FROM document_chunks dc
                          WHERE dc.doc_id = d.id AND dc.source_type = 'hier_section')"""
    else:
        state_filter = f"""
          AND EXISTS (SELECT 1 FROM document_chunks dc
                      WHERE dc.doc_id = d.id AND dc.source_type = 'hier_section')
          AND NOT EXISTS (SELECT 1 FROM document_chunks c
                          WHERE c.doc_id = d.id AND c.source_type = 'hier_section'
                            AND c.char_start IS NOT NULL AND {_JUMP_TARGET_PRED})"""

    query = f"""
        SELECT d.id AS doc_id, d.md_content AS body, d.ai_domain AS ai_domain
        FROM documents d
        WHERE d.md_content IS NOT NULL AND length(d.md_content) > 0
          {size_gate}
          AND {scope_filter}
          {state_filter}
        ORDER BY length(d.md_content) ASC
    """
    return text(query)
def _candidate_params(allowlist, doc_ids=None):
    """Return the bind parameters matching the SQL built by ``_candidate_sql``.

    With ``doc_ids`` only ``:doc_ids`` is bound (no size gate, no domain filter).
    Otherwise ``:minchars`` is bound plus either ``:domains`` (allowlist) or
    ``:exclude`` (default EXCLUDE_DOMAINS).

    The SQL lower-cases the stored top-level domain before comparing, so the
    domain lists are normalised the same way here (stripped, lower-cased, blank
    items dropped). Without this, ``--domains Engineering`` or
    ``--domains engineering, industrial_safety`` would silently match nothing.
    """
    if doc_ids:
        return {"doc_ids": doc_ids}
    params = {"minchars": DOC_MIN_CHARS}
    if allowlist:
        params["domains"] = [d.strip().lower() for d in allowlist if d.strip()]
    else:
        params["exclude"] = [d.strip().lower() for d in EXCLUDE_DOMAINS if d.strip()]
    return params
def _scope_label(allowlist, doc_ids=None, reprocess=False):
tag = "RE-DECOMPOSE" if reprocess else "additive"
if doc_ids:
return f"doc-list={len(doc_ids)}건(크기게이트 우회, {tag})"
return (f"allowlist={allowlist}" if allowlist else f"all-except={EXCLUDE_DOMAINS}") + f" ({tag})"
# 멱등 leaf 선별 (재실행 시 이미 분석된 leaf 제외)
LEAF_SQL = text("""
SELECT dc.id AS chunk_id, dc.heading_path, dc.section_title,
dc.text AS body, length(dc.text) AS body_len,
dc.chunk_content_hash AS content_hash
FROM document_chunks dc
WHERE dc.doc_id = :doc AND dc.source_type = 'hier_section' AND dc.is_leaf = true
AND NOT EXISTS (SELECT 1 FROM chunk_section_analysis a
WHERE a.chunk_id = dc.id AND a.prompt_version = :pv
AND a.source_content_hash = dc.chunk_content_hash)
ORDER BY dc.chunk_index
""")
def _now():
return datetime.now()
def _log(msg):
print(f"[{_now():%H:%M:%S}] {msg}", flush=True)
def _compute_deadline(hhmm: str) -> datetime:
h, m = (int(x) for x in hhmm.split(":"))
now = _now()
target = now.replace(hour=h, minute=m, second=0, microsecond=0)
if target <= now:
target += timedelta(days=1)
return target
def _make_engine():
    """Create the async SQLAlchemy engine from $DATABASE_URL with pool_pre_ping enabled.

    Raises KeyError if DATABASE_URL is not set.
    """
    database_url = os.environ["DATABASE_URL"]
    return create_async_engine(database_url, pool_pre_ping=True)
async def _analyze_doc_leaves(session, client, doc_id, doc_domain, model_name, stop_at):
    """Analyse the not-yet-analysed hier leaves of one doc and upsert the results.

    Leaves come from LEAF_SQL (no row yet for PROMPT_VERSION + current content
    hash), so repeated calls only pick up what is missing. Leaves shorter than
    MIN_CHARS are recorded as ``skipped_tiny`` without an LLM call. Every other leaf
    is sent to ``client.call_triage`` under the BACKGROUND MLX gate with a
    CALL_TIMEOUT_S timeout, and its outcome ("summarized" or "failed") is written
    with _UPSERT_SQL and committed per leaf.

    Args:
        session: async DB session; committed after each analysed leaf and once at the end.
        client: AI client providing ``call_triage``.
        doc_id: documents.id whose leaves are processed.
        doc_domain: value written to the row's ``domain`` field.
        model_name: model name recorded on analysed (non-tiny) rows.
        stop_at: epoch seconds. It is checked before each leaf, so work stops at a
            leaf boundary once it has passed.

    Returns:
        dict with ``ok``/``fail``/``skip`` counts, ``leaves`` (number of candidate
        leaves selected, not necessarily processed), per-call ``timings`` in seconds,
        parsed section ``types`` and ``aborted`` (True when stop_at cut the loop short).
    """
    rows = (await session.execute(LEAF_SQL, {"doc": doc_id, "pv": PROMPT_VERSION})).mappings().all()
    ok = fail = skip = 0
    timings, types = [], []
    aborted = False
    for r in rows:
        # Deadline is only checked between leaves. The LLM call itself is bounded by
        # CALL_TIMEOUT_S, but waiting for the gate is not.
        if time.time() >= stop_at:
            aborted = True
            break
        # Too short to be worth an LLM call: record it as skipped_tiny (no model, no summary).
        if r["body_len"] < MIN_CHARS:
            await session.execute(_UPSERT_SQL, {
                "chunk_id": r["chunk_id"], "status": "skipped_tiny", "summary": None,
                "section_type": None, "domain": doc_domain, "confidence": None,
                "model": None, "pv": PROMPT_VERSION, "content_hash": r["content_hash"], "error": None,
            })
            skip += 1
            continue
        # Defaults describe a failed attempt; they are overwritten on a successful parse.
        status, summary, sec_type, conf, err = "failed", None, None, None, None
        start = time.perf_counter()
        try:
            # BACKGROUND priority gate; call_triage is called directly (no fallback model).
            async with acquire_mlx_gate(Priority.BACKGROUND):
                async with asyncio.timeout(CALL_TIMEOUT_S):
                    raw = await client.call_triage(_build_prompt(r))
            timings.append(time.perf_counter() - start)
            parsed = parse_json_response(strip_thinking(raw)) if raw else None
            if parsed and isinstance(parsed, dict):
                # Any parsed dict counts as "summarized", even if the summary is empty (stored as None).
                summary = (parsed.get("summary") or "").strip() or None
                sec_type = _coerce_type(parsed.get("section_type"))
                try:
                    conf = float(parsed.get("confidence"))
                except (TypeError, ValueError):
                    # Missing or non-numeric confidence falls back to 0.5.
                    conf = 0.5
                status, ok = "summarized", ok + 1
                types.append(sec_type)
            else:
                err, fail = "parse_failed", fail + 1
        except Exception as exc:  # timeout / call failure — recorded as failed, no fallback
            timings.append(time.perf_counter() - start)
            err, fail = f"{type(exc).__name__}: {repr(exc)[:160]}", fail + 1
        await session.execute(_UPSERT_SQL, {
            "chunk_id": r["chunk_id"], "status": status, "summary": summary,
            "section_type": sec_type, "domain": doc_domain, "confidence": conf,
            "model": model_name, "pv": PROMPT_VERSION,
            "content_hash": r["content_hash"], "error": err,
        })
        # Commit per leaf so an interrupted run keeps every leaf it finished.
        await session.commit()
    # Final commit also persists trailing skipped_tiny rows.
    await session.commit()
    return {"ok": ok, "fail": fail, "skip": skip, "leaves": len(rows),
            "timings": timings, "types": types, "aborted": aborted}
def _parse_doc_ids(args):
raw = getattr(args, "doc", None)
return [int(x) for x in raw.split(",") if x.strip()] if raw else None
async def cmd_dry_run(args):
    """Report the docs that ``run`` would pick, without writing anything.

    Uses the same candidate SQL/params as ``run``, then prints the candidate count
    and scope, length statistics of the bodies, and the first five candidates.
    ``--reprocess`` without ``--doc`` is refused with exit code 2 (B3 trap).
    """
    allowlist = args.domains.split(",") if args.domains else None
    doc_ids = _parse_doc_ids(args)
    reprocess = getattr(args, "reprocess", False)
    if reprocess and not doc_ids:
        print("REFUSE: --reprocess 는 --doc <list> 필수 (B3 빈 jump-target trap 차단 — REFINED PASS 리스트만)")
        sys.exit(2)
    engine = _make_engine()
    sm = async_sessionmaker(engine, expire_on_commit=False)
    try:
        async with sm() as session:
            rows = (await session.execute(_candidate_sql(allowlist, doc_ids, reprocess),
                                          _candidate_params(allowlist, doc_ids))).mappings().all()
    finally:
        # Dispose the pool even when the query fails.
        await engine.dispose()
    gate_lbl = "doc-list" if doc_ids else f">{DOC_MIN_CHARS}"
    state_lbl = "재분해 미완료(jump-target char_start 부재)" if reprocess else "미분해"
    print(f"[dry-run] 후보 doc {len(rows)} ({_scope_label(allowlist, doc_ids, reprocess)}, {gate_lbl}, {state_lbl})")
    if rows:
        lens = [len(r["body"]) for r in rows]
        print(f" 본문길이: min={min(lens)} p50={int(statistics.median(lens))} max={max(lens)} 합={sum(lens):,}")
        print(" 앞 5개:")
        for r in rows[:5]:
            # Separator between the right-aligned length and the domain (they were glued together).
            print(f" doc={r['doc_id']} {len(r['body']):>7,}  {r['ai_domain']}")
async def _sweep_unanalyzed(sm, doc_ids):
    """[g3-t3/g3-t4] Post-run sweep: count hier leaves of ``doc_ids`` that still have no
    analysis row for the current PROMPT_VERSION + content hash (half-done / stalled docs).

    The jump goal (char_start) and the rail summary (analysis) are decoupled: the
    re-decompose char_start marker will not re-select these docs, so the log points
    at the idempotent ``analyze`` command. Best-effort: query failures are logged,
    never raised.
    """
    try:
        async with sm() as session:
            pending = (await session.execute(text("""
                SELECT dc.doc_id, count(*) AS unanalyzed
                FROM document_chunks dc
                WHERE dc.doc_id = ANY(:ids) AND dc.source_type='hier_section' AND dc.is_leaf=true
                  AND NOT EXISTS (SELECT 1 FROM chunk_section_analysis a
                                  WHERE a.chunk_id = dc.id AND a.prompt_version = :pv
                                    AND a.source_content_hash = dc.chunk_content_hash)
                GROUP BY dc.doc_id ORDER BY unanalyzed DESC"""),
                {"ids": doc_ids, "pv": PROMPT_VERSION})).mappings().all()
    except Exception as exc:
        _log(f" [sweep] 잔여 집계 실패(무해): {type(exc).__name__}")
        return
    if not pending:
        _log(" [sweep] 미분석 leaf 잔여 0 — 분석 수렴.")
        return
    tot = sum(r["unanalyzed"] for r in pending)
    _log(f" [sweep] 미분석 leaf 잔여: {tot} (doc {len(pending)}) — char_start 마커는 이들을 재선별 안 함; "
         f"`analyze` 커맨드로 수렴(`analyze --deadline HH:MM`, 멱등). "
         f"상위: {[(r['doc_id'], r['unanalyzed']) for r in pending[:5]]}")


async def cmd_run(args):
    """Decompose candidate docs and (unless --skip-analysis) analyse their leaves.

    Work is time-boxed by --deadline. For each candidate (see ``_candidate_sql``),
    ``persist_hier_tree`` runs in its own session with a best-effort leaf embedder,
    then ``_analyze_doc_leaves`` runs in a fresh session. A doc whose processing
    raises is logged and skipped. The loop stops at a doc boundary once stop_at
    (deadline minus BUFFER_MIN) has passed, or right after a doc whose leaf
    analysis was aborted by the deadline. Totals are logged at the end. With --doc,
    a sweep then reports leaves that are still unanalysed. The sweep runs before
    the engine is disposed, so no connection pool is left open afterwards.
    ``--reprocess`` without ``--doc`` is refused with exit code 2.
    """
    allowlist = args.domains.split(",") if args.domains else None
    doc_ids = _parse_doc_ids(args)
    reprocess = getattr(args, "reprocess", False)
    if reprocess and not doc_ids:
        _log("REFUSE: --reprocess 는 --doc <list> 필수 (B3 빈 jump-target trap 차단 — REFINED PASS 리스트만)")
        sys.exit(2)
    skip_analysis = getattr(args, "skip_analysis", False)
    deadline = _compute_deadline(args.deadline)
    stop_at = (deadline - timedelta(minutes=BUFFER_MIN)).timestamp()
    _log(f"deadline={deadline:%m-%d %H:%M} (buffer {BUFFER_MIN}m → stop_at={datetime.fromtimestamp(stop_at):%H:%M}) "
         f"{_scope_label(allowlist, doc_ids, reprocess)}{' [SKIP-ANALYSIS: 분해+임베딩만]' if skip_analysis else ''}"
         f"{' [RE-DECOMPOSE: 기존 hier DELETE→CASCADE chunk_section_analysis→재INSERT; 스냅샷 선행 필수]' if reprocess else ''}")
    engine = _make_engine()
    sm = async_sessionmaker(engine, expire_on_commit=False)
    client = AIClient()
    model_name = settings.ai.triage.model

    async def embed_leaf(t):
        # Best-effort: an embedding failure leaves the leaf without a vector
        # instead of failing the whole doc.
        try:
            return await client.embed(t)
        except Exception as exc:
            _log(f" embed 실패(무시, in_corpus=false): {type(exc).__name__}")
            return None

    tot_docs = tot_ok = tot_fail = tot_skip = tot_leaves_created = 0
    all_timings, all_types = [], []
    run_start = time.time()
    try:
        async with sm() as session:
            cands = (await session.execute(_candidate_sql(allowlist, doc_ids, reprocess),
                                           _candidate_params(allowlist, doc_ids))).mappings().all()
        _log(f"후보 doc {len(cands)} 선별. 시작.")
        for c in cands:
            if time.time() >= stop_at:
                _log(f"⏰ deadline 버퍼 도달 — doc 경계에서 중단 (처리 {tot_docs} doc)")
                break
            doc_id, body, doc_domain = c["doc_id"], c["body"], c["ai_domain"]
            try:
                async with sm() as session:
                    pstat = await persist_hier_tree(session, doc_id, body, embed_leaf)
                leaves_created = pstat.get("leaves", 0)
                tot_leaves_created += leaves_created
                if skip_analysis:
                    # Decompose + embed only (section analysis is a separate axis, irrelevant to retrieval).
                    astat = {"ok": 0, "fail": 0, "skip": 0, "leaves": leaves_created,
                             "timings": [], "types": [], "aborted": False}
                else:
                    # NOTE(review): ai_domain may be NULL here (cmd_analyze substitutes "general").
                    async with sm() as session:
                        astat = await _analyze_doc_leaves(session, client, doc_id, doc_domain, model_name, stop_at)
            except Exception as exc:
                _log(f" ✗ doc={doc_id} 처리 실패(건너뜀): {type(exc).__name__}: {repr(exc)[:160]}")
                continue
            tot_docs += 1
            tot_ok += astat["ok"]
            tot_fail += astat["fail"]
            tot_skip += astat["skip"]
            all_timings += astat["timings"]
            all_types += astat["types"]
            avg = statistics.mean(astat["timings"]) if astat["timings"] else 0
            # ai_domain can be NULL (the default exclude scope coalesces it to ''), so
            # never call .split on None — that would abort the whole run outside the per-doc try.
            domain_lbl = (doc_domain or "-").split("/")[0]
            _log(f" ✓ doc={doc_id} ({len(body):,} {domain_lbl}) "
                 f"leaf생성={leaves_created} 분석ok={astat['ok']} fail={astat['fail']} skip={astat['skip']} "
                 f"avg={avg:.1f}s{' [ABORT]' if astat['aborted'] else ''} | 누적 {tot_docs}doc {tot_ok}leaf")
            if astat["aborted"]:
                _log("⏰ leaf 분석 중 deadline 도달 — 중단")
                break

        elapsed = (time.time() - run_start) / 60
        _log(f"=== 종료: {tot_docs} doc, leaf생성 {tot_leaves_created}, "
             f"분석 ok={tot_ok} fail={tot_fail} skip={tot_skip}, 경과 {elapsed:.0f}분 ===")
        if all_timings:
            _log(f" leaf당 {statistics.mean(all_timings):.2f}s (p50={statistics.median(all_timings):.2f} "
                 f"max={max(all_timings):.2f})")
        if all_types:
            d = Counter(all_types)
            _log(f" section_type: {dict(d.most_common())} other={d.get('other',0)/len(all_types):.1%}")
        # Leftover unanalysed leaves are absorbed by the next idempotent analyze/run (LEAF_SQL).
        if doc_ids:
            await _sweep_unanalyzed(sm, doc_ids)
    finally:
        await client.close()
        await engine.dispose()
def _is_jump_target(node) -> bool:
"""jump-target = 비-window leaf OR %_split parent (builder HierNode 판정, _JUMP_TARGET_PRED 와 일치)."""
return ((node.is_leaf and node.node_type != "window")
or bool(node.node_type and node.node_type.endswith("_split")))
async def cmd_update_char_start(args):
    """[g3-tU] Non-destructive char_start UPDATE, intended for hash_stable docs only.

    For each --doc id: rebuild the tree with build_hier_tree(md_content) and align
    the built nodes with the stored hier_section rows position by position
    (chunk_index order).
    [NEW-1] Every position's chunk_content_hash must match (all-or-nothing). A single
        mismatch, or a different node count, DEMOTEs the doc.
    [NEW-2] Rows are never matched by hash in a WHERE clause (sections with identical
        bodies would collide). Each UPDATE targets the stored row's primary key (id)
        at the same position.
    Passing docs: only char_start of jump-target nodes is UPDATEd, then committed per
        doc (no DELETE/CASCADE/embed/analyze).
    Failing docs: collected on the DEMOTE list, to be unioned into the re-decompose
        batch (NEW-4). The last stdout line is ``DEMOTE_DOC_IDS=<comma list>``.
    Docs whose md_content is NULL/blank are reported as NO-OP.
    Without --doc the command is refused with exit code 2.
    """
    doc_ids = _parse_doc_ids(args)
    if not doc_ids:
        _log("REFUSE: update-char-start 는 --doc <list> 필수 (hash_stable 32 = gm-t1 산출)")
        sys.exit(2)
    engine = _make_engine()
    sm = async_sessionmaker(engine, expire_on_commit=False)
    updated, demoted, noop = [], [], []
    try:
        for doc_id in doc_ids:
            # One session per doc; only fully verified docs get a commit.
            async with sm() as session:
                md = await session.scalar(text("SELECT md_content FROM documents WHERE id=:d"), {"d": doc_id})
                if not md or not md.strip():
                    noop.append(doc_id)
                    _log(f" doc={doc_id} md_content 없음 → no-op(suspect, V4)")
                    continue
                nodes = build_hier_tree(md)
                stored = (await session.execute(text("""
                    SELECT id, chunk_index, chunk_content_hash, node_type, is_leaf
                    FROM document_chunks
                    WHERE doc_id=:d AND source_type='hier_section'
                    ORDER BY chunk_index"""), {"d": doc_id})).mappings().all()
                # [NEW-2] Positional alignment: built node[i] <-> stored[i] (chunk_index = base + idx,
                # so the order is the same). A different node count means the structure changed
                # (= hash_changed) -> DEMOTE.
                if len(nodes) != len(stored):
                    demoted.append(doc_id)
                    _log(f" doc={doc_id} 노드수 build {len(nodes)} ≠ stored {len(stored)} → DEMOTE(re-decompose)")
                    continue
                # [NEW-1] Verify the hash at every position (positional alignment also verifies
                # ordering). Any mismatch -> DEMOTE; 100% is required because even a 1% jump-target
                # miss regresses to the whole-doc fallback.
                mismatch = next((i for i, (nd, sr) in enumerate(zip(nodes, stored))
                                 if nd.chunk_content_hash != sr["chunk_content_hash"]), None)
                if mismatch is not None:
                    demoted.append(doc_id)
                    _log(f" doc={doc_id} position {mismatch} hash 불일치 → DEMOTE(re-decompose, NEW-1)")
                    continue
                # Verified -> UPDATE char_start of jump-targets via the stored row's PK.
                n_upd = 0
                for nd, sr in zip(nodes, stored):
                    if _is_jump_target(nd) and nd.char_start is not None:
                        await session.execute(
                            text("UPDATE document_chunks SET char_start=:cs WHERE id=:id"),
                            {"cs": nd.char_start, "id": sr["id"]})
                        n_upd += 1
                await session.commit()
                updated.append(doc_id)
                _log(f" ✓ doc={doc_id} char_start UPDATE {n_upd} jump-target (VERIFY 100%, 비파괴)")
    finally:
        await engine.dispose()
    _log(f"=== update-char-start: updated={len(updated)} demoted={len(demoted)} noop={len(noop)} ===")
    if demoted:
        _log(f" DEMOTE(re-decompose 배치 합류, NEW-4): {demoted}")
    if noop:
        _log(f" NO-OP(md_content NULL suspect, V4): {noop}")
    # Machine-readable: re-decompose --doc = (gm-t1 hash_changed 230) UNION (this list).
    print("DEMOTE_DOC_IDS=" + ",".join(str(x) for x in demoted), flush=True)
# Docs that still own at least one unanalysed hier leaf. Selected independently of the
# re-decompose char_start marker: analysis is tracked on its own axis (g3-t3).
def _analyze_candidate_sql(doc_ids=None):
    """Build the SELECT for ``analyze``: distinct (doc_id, ai_domain) of docs having hier
    leaves without an analysis row for ``:pv`` and the leaf's current content hash.

    ``:pv`` is always bound. When ``doc_ids`` is given, the scan is restricted through
    ``:ids``, which the caller must then bind. Rows are ordered by doc_id.
    """
    doc_filter = ""
    if doc_ids:
        doc_filter = "AND dc.doc_id = ANY(:ids)"
    clauses = [
        "SELECT DISTINCT dc.doc_id AS doc_id, d.ai_domain AS ai_domain",
        "FROM document_chunks dc JOIN documents d ON d.id = dc.doc_id",
        f"WHERE dc.source_type = 'hier_section' AND dc.is_leaf = true {doc_filter}",
        "  AND NOT EXISTS (SELECT 1 FROM chunk_section_analysis a",
        "                  WHERE a.chunk_id = dc.id AND a.prompt_version = :pv",
        "                    AND a.source_content_hash = dc.chunk_content_hash)",
        "ORDER BY dc.doc_id",
    ]
    return text("\n" + "\n".join(clauses) + "\n")
async def cmd_analyze(args):
    """[g3-t3 self-heal] Analyse only the unanalysed hier leaves (independent of re-decompose).

    The re-decompose completion marker is "jump-target char_start present". A doc
    whose analysis was cut short (container recreate / deadline) therefore already has
    char_start but still has unanalysed leaves, and is never re-selected there. This
    command selects docs by the LEAF_SQL criterion (owning an unanalysed leaf) instead,
    so the rail summary converges eventually. Repeated runs only pick up what is
    missing (LEAF_SQL NOT EXISTS). --doc restricts the scope (default: every such doc).
    Unrelated to jump (char_start); rail-summary convergence only.
    """
    doc_ids = _parse_doc_ids(args)
    deadline = _compute_deadline(args.deadline)
    stop_at = (deadline - timedelta(minutes=BUFFER_MIN)).timestamp()
    scope_lbl = 'doc-list=' + str(len(doc_ids)) if doc_ids else 'all'
    _log(f"[analyze] deadline={deadline:%m-%d %H:%M} (stop_at={datetime.fromtimestamp(stop_at):%H:%M}) "
         f"{scope_lbl} 미분석 leaf 보유 doc 선별")
    engine = _make_engine()
    sm = async_sessionmaker(engine, expire_on_commit=False)
    client = AIClient()
    model_name = settings.ai.triage.model
    params = {"pv": PROMPT_VERSION, **({"ids": doc_ids} if doc_ids else {})}
    totals = dict.fromkeys(("docs", "ok", "fail", "skip"), 0)
    try:
        async with sm() as session:
            cands = (await session.execute(_analyze_candidate_sql(doc_ids), params)).mappings().all()
        _log(f"[analyze] 후보 doc {len(cands)} (미분석 leaf 보유). 시작.")
        for cand in cands:
            if time.time() >= stop_at:
                _log(f"⏰ deadline 버퍼 도달 — 중단 (처리 {totals['docs']} doc)")
                break
            doc_id = cand["doc_id"]
            doc_domain = cand["ai_domain"] or "general"
            try:
                async with sm() as session:
                    st = await _analyze_doc_leaves(session, client, doc_id, doc_domain, model_name, stop_at)
            except Exception as exc:
                _log(f" ✗ doc={doc_id} 분석 실패(건너뜀): {type(exc).__name__}: {repr(exc)[:160]}")
                continue
            totals["docs"] += 1
            for key in ("ok", "fail", "skip"):
                totals[key] += st[key]
            abort_tag = " [ABORT]" if st["aborted"] else ""
            _log(f" ✓ doc={doc_id} ok={st['ok']} fail={st['fail']} skip={st['skip']} leaves={st['leaves']}"
                 f"{abort_tag} | 누적 {totals['docs']}doc {totals['ok']}ok")
            if st["aborted"]:
                _log("⏰ leaf 분석 중 deadline 도달 — 중단")
                break
    finally:
        await client.close()
        await engine.dispose()
    _log(f"=== [analyze] 종료: {totals['docs']} doc, ok={totals['ok']} fail={totals['fail']} skip={totals['skip']} ===")
def main():
    """CLI entry point: parse the sub-command and run its coroutine with asyncio.run.

    Sub-commands: dry-run, run, update-char-start, analyze. Each handler receives
    the parsed argparse namespace.
    """
    ap = argparse.ArgumentParser(description="오버나이트 hier 분해+절 분석 backfill (additive)")
    sub = ap.add_subparsers(dest="cmd", required=True)

    p_dry = sub.add_parser("dry-run", help="후보 doc 집계 (작업 0)")
    p_dry.add_argument("--domains", default=None, help="comma-sep allowlist (미지정=뉴스 제외 전부)")
    p_dry.add_argument("--doc", default=None, help="comma-sep doc id (크기 게이트 우회 — 구조화 소형 문서 coverage 보정)")
    p_dry.add_argument("--reprocess", action="store_true", help="재분해 후보(기존 hier+jump-target char_start 부재) — --doc 필수")

    p_run = sub.add_parser("run", help="분해+분석 실행 (deadline time-box)")
    p_run.add_argument("--deadline", default="07:00", help="HH:MM (기본 07:00 — 컨테이너 UTC 주의, 07:00 KST=22:00 UTC)")
    p_run.add_argument("--domains", default=None, help="comma-sep allowlist (미지정=뉴스 제외 전부)")
    p_run.add_argument("--doc", default=None, help="comma-sep doc id (크기 게이트 우회 — 구조화 소형 문서 coverage 보정)")
    p_run.add_argument("--skip-analysis", action="store_true", help="절 분석(Mac mini) 생략, 분해+임베딩만 (retrieval go/no-go 측정 준비용)")
    # The --doc list is the union of the hash_changed set and the update-char-start
    # DEMOTE list (the help text previously read the garbled "hash_changeddemote").
    p_run.add_argument("--reprocess", action="store_true",
                       help="[g3-t2] RE-DECOMPOSE: 기존 hier DELETE→CASCADE→재INSERT (md_content 출처, char_start). "
                            "--doc(REFINED PASS hash_changed UNION demote) 필수 / 스냅샷 선행 필수")

    p_upd = sub.add_parser("update-char-start",
                           help="[g3-tU] hash_stable doc 비파괴 char_start UPDATE (100% VERIFY, --doc 필수)")
    p_upd.add_argument("--doc", default=None, help="comma-sep doc id (gm-t1 hash_stable 32)")

    p_an = sub.add_parser("analyze",
                          help="[g3-t3] 미분석 hier leaf 만 분석(재분해 무관, 멱등) — recreate/deadline 으로 잘린 절분석 수렴")
    p_an.add_argument("--deadline", default="07:00", help="HH:MM (컨테이너 UTC, 07:00 KST=22:00 UTC)")
    p_an.add_argument("--doc", default=None, help="comma-sep doc id (미지정=미분석 leaf 보유 전체)")

    args = ap.parse_args()
    handlers = {"dry-run": cmd_dry_run, "run": cmd_run, "update-char-start": cmd_update_char_start,
                "analyze": cmd_analyze}
    asyncio.run(handlers[args.cmd](args))


if __name__ == "__main__":
    main()