ebbcaf86d8
processing_queue 는 파이프라인 stage 전용이라 hier_overnight_backfill 같은 off-queue 관리 스크립트 작업이 대시보드 보드에 안 잡혀, 다른 세션이 모르고 fastapi 를 재생성해 in-flight 재분해를 끊는 사고가 발생(2026-06-14). 사각지대 해소. - migrations/357_background_jobs.sql: background_jobs 테이블(kind/label/state/processed/ total/heartbeat). worker_jobs(user_id 필수, worker-pool 전용)와 별개. - services/background_jobs.py: start/heartbeat/finish 헬퍼 — 자율 트랜잭션(즉시 commit → 실시간 가시화) + best-effort(관측 실패가 본작업 안 깸). - hier_overnight_backfill: 작업 시작/절 ~10개마다 heartbeat/종료 계측. - queue_overview: /api/queue/overview 응답에 background_jobs 추가(running + 최근 6h 완료, stale=heartbeat 끊김 추정). SAVEPOINT 로 테이블 부재/오류 시 보드 본체 무영향. - ProcessingFlowBoard: "백그라운드 작업" 패널(진행/경과/state, stale 끊김 경고). Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
512 lines
27 KiB
Python
512 lines
27 KiB
Python
"""오버나이트 hier 분해 + 절 분석 backfill (ADDITIVE — 검색 코퍼스 미교체).
|
||
|
||
Engineering + Industrial_Safety 미분해 기술문서를 deadline(기본 07:00 KST) 전까지:
|
||
doc → persist_hier_tree(build + leaf embed, in_corpus=false) → 절 분석(Mac mini gemma-26B) → commit
|
||
검색 코퍼스(replace_doc_corpus) 미터치 → eval baseline/reindex 무관, 무위험.
|
||
시간 초과 시 leaf 경계에서 안전 중단(멱등 — 다음 실행이 미처리분만 이어서).
|
||
|
||
절 분석 상수/헬퍼는 section_summary_pilot 에서 import = PROMPT_VERSION 단일 진실(멱등 보존).
|
||
no silent fallback(call_triage 직접) / Semaphore(1) BACKGROUND gate / 새 Semaphore 금지.
|
||
|
||
실행 (GPU 서버, background):
|
||
docker compose exec -T fastapi python /app/scripts/hier_overnight_backfill.py run --deadline 07:00
|
||
docker compose exec -T fastapi python /app/scripts/hier_overnight_backfill.py dry-run
|
||
"""
|
||
|
||
import argparse
|
||
import asyncio
|
||
import os
|
||
import statistics
|
||
import sys
|
||
import time
|
||
from collections import Counter
|
||
from datetime import datetime, timedelta
|
||
|
||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
|
||
|
||
from sqlalchemy import text
|
||
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
|
||
|
||
from ai.client import AIClient, parse_json_response, strip_thinking
|
||
from core.config import settings
|
||
from services.hier_decomp.builder import build_hier_tree
|
||
from services.hier_decomp.persist import persist_hier_tree
|
||
from services.search.llm_gate import Priority, acquire_mlx_gate
|
||
from services.background_jobs import finish_job, heartbeat, start_job
|
||
|
||
# 단일 진실: 절 분석 상수/헬퍼 (PROMPT_VERSION 일치 = 멱등 보존)
|
||
from section_summary_pilot import (
|
||
CALL_TIMEOUT_S, MIN_CHARS, PROMPT_VERSION, _UPSERT_SQL, _build_prompt, _coerce_type,
|
||
)
|
||
|
||
EXCLUDE_DOMAINS = ["news"]  # default: exclude only news, take everything else (--domains supplies an allowlist override)
DOC_MIN_CHARS = 4000  # lower bound on doc size for which hier decomposition is meaningful (STRUCTURE_SPLIT_THRESHOLD=4000)
BUFFER_MIN = 10  # stop safely this many minutes before the deadline


# jump-target = non-window leaf OR %_split parent (B1/B3 completion marker + B_jumptarget denominator, plan g3-t2).
# Only this set receives char_start (window-child/preamble rows are NULL by design).
_JUMP_TARGET_PRED = r"((c.is_leaf AND c.node_type IS DISTINCT FROM 'window') OR c.node_type LIKE '%\_split' ESCAPE '\')"
|
||
|
||
|
||
def _candidate_sql(allowlist, doc_ids=None, reprocess=False):
    """Build the SELECT that picks candidate documents for (re-)decomposition.

    The decomposition source is ``d.md_content`` (g0-t1: md_content is the permanent hier
    source and extracted_text is retired; char_start is an md_content offset, so the
    decomposition source must be md_content to match the FE splice basis [F1]).

    reprocess=False (additive): only docs without any hier_section chunk are selected
        (the NOT EXISTS check keeps re-runs idempotent).
    reprocess=True (re-decompose): docs that already have hier chunks but whose jump-target
        rows have no char_start yet.
        [B1] Completion marker = a jump-target row with char_start NOT NULL exists (one
             re-decomposition fills all of them atomically); window-child/preamble rows are
             NULL by design, so an "all leaves NOT NULL" marker would trap forever.
        [B3] For a doc with zero jump-targets the NOT EXISTS is vacuously TRUE and the doc
             would be re-selected forever; callers block this by requiring --doc (REFINED
             PASS list, B_jumptarget >= 1) whenever --reprocess is used.

    When ``doc_ids`` is given the size gate is bypassed. Rows are ordered smallest body first
    to maximise the number of completed docs.

    Returns a SQLAlchemy ``text`` clause; the matching bind values are produced by
    ``_candidate_params``.
    """
    if doc_ids:
        # Explicit doc list: select by id and skip the size gate.
        scope_cond = "d.id = ANY(:doc_ids)"
        size_gate = ""
    else:
        top_domain = "lower(split_part(coalesce(d.ai_domain,''), '/', 1))"
        if allowlist:
            scope_cond = f"{top_domain} = ANY(:domains)"
        else:
            scope_cond = f"{top_domain} <> ALL(:exclude)"
        size_gate = "AND length(d.md_content) > :minchars"

    if not reprocess:
        marker = """
      AND NOT EXISTS (SELECT 1 FROM document_chunks dc
                      WHERE dc.doc_id = d.id AND dc.source_type = 'hier_section')"""
    else:
        marker = f"""
      AND EXISTS (SELECT 1 FROM document_chunks dc
                  WHERE dc.doc_id = d.id AND dc.source_type = 'hier_section')
      AND NOT EXISTS (SELECT 1 FROM document_chunks c
                      WHERE c.doc_id = d.id AND c.source_type = 'hier_section'
                        AND c.char_start IS NOT NULL AND {_JUMP_TARGET_PRED})"""

    return text(f"""
    SELECT d.id AS doc_id, d.md_content AS body, d.ai_domain AS ai_domain
    FROM documents d
    WHERE d.md_content IS NOT NULL AND length(d.md_content) > 0
      {size_gate}
      AND {scope_cond}
      {marker}
    ORDER BY length(d.md_content) ASC
    """)
|
||
|
||
|
||
def _candidate_params(allowlist, doc_ids=None):
    """Return the bind parameters that go with the statement from ``_candidate_sql``.

    With ``doc_ids`` only ``doc_ids`` is bound. Otherwise ``minchars`` is bound to
    DOC_MIN_CHARS together with either ``domains`` (the allowlist) or ``exclude``
    (EXCLUDE_DOMAINS).
    """
    if doc_ids:
        return {"doc_ids": doc_ids}
    domain_binding = {"domains": allowlist} if allowlist else {"exclude": EXCLUDE_DOMAINS}
    return {"minchars": DOC_MIN_CHARS, **domain_binding}
|
||
|
||
|
||
def _scope_label(allowlist, doc_ids=None, reprocess=False):
    """Return a short human-readable description of the selection scope for log lines."""
    mode = "additive"
    if reprocess:
        mode = "RE-DECOMPOSE"

    if doc_ids:
        return f"doc-list={len(doc_ids)}건(크기게이트 우회, {mode})"

    if allowlist:
        scope = f"allowlist={allowlist}"
    else:
        scope = f"all-except={EXCLUDE_DOMAINS}"
    return scope + f" ({mode})"
|
||
|
||
# Idempotent leaf selection: on a re-run, leaves that already have an analysis row for the
# same prompt version and the same content hash are excluded.
LEAF_SQL = text("""
    SELECT dc.id AS chunk_id, dc.heading_path, dc.section_title,
           dc.text AS body, length(dc.text) AS body_len,
           dc.chunk_content_hash AS content_hash
    FROM document_chunks dc
    WHERE dc.doc_id = :doc AND dc.source_type = 'hier_section' AND dc.is_leaf = true
      AND NOT EXISTS (SELECT 1 FROM chunk_section_analysis a
                      WHERE a.chunk_id = dc.id AND a.prompt_version = :pv
                        AND a.source_content_hash = dc.chunk_content_hash)
    ORDER BY dc.chunk_index
""")
|
||
|
||
|
||
def _now():
    """Return the current local time as a naive ``datetime`` (single clock source for this script)."""
    current = datetime.now()
    return current
|
||
|
||
|
||
def _log(msg):
    """Print ``msg`` prefixed with the current HH:MM:SS, flushing immediately."""
    stamp = f"{_now():%H:%M:%S}"
    print(f"[{stamp}] {msg}", flush=True)
|
||
|
||
|
||
def _compute_deadline(hhmm: str) -> datetime:
    """Resolve an ``HH:MM`` string to the next occurrence of that wall-clock time.

    The result is today at HH:MM when that is still in the future, otherwise tomorrow at
    HH:MM. Uses the same naive clock as ``_now``. Raises ValueError for input that is not
    two colon-separated integers.
    """
    hour, minute = (int(part) for part in hhmm.split(":"))
    now = _now()
    candidate = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    # A time that has already passed (or is exactly now) means "tomorrow".
    return candidate if candidate > now else candidate + timedelta(days=1)
|
||
|
||
|
||
def _make_engine():
    """Create the async SQLAlchemy engine from the DATABASE_URL environment variable.

    Raises KeyError when DATABASE_URL is not set.
    """
    dsn = os.environ["DATABASE_URL"]
    return create_async_engine(dsn, pool_pre_ping=True)
|
||
|
||
|
||
async def _analyze_doc_leaves(session, client, doc_id, doc_domain, model_name, stop_at,
                              engine=None, job_id=None, base_processed=0):
    """Analyze the not-yet-analyzed hier leaves of one doc and upsert one row per leaf.

    Leaves come from LEAF_SQL for the current PROMPT_VERSION. Processing stops at a leaf
    boundary once ``time.time()`` reaches ``stop_at`` (epoch seconds). Leaves shorter than
    MIN_CHARS are recorded as ``skipped_tiny`` without calling the model. A model-call
    error or timeout is recorded as a ``failed`` row; there is no fallback model.

    When ``job_id`` is truthy, ``heartbeat(engine, job_id, processed=...)`` is called after
    a model-analyzed leaf whenever the number of leaves handled so far in this call is a
    multiple of 10; the reported value is ``base_processed`` plus that number.

    Returns a dict with keys ok / fail / skip / leaves / timings / types / aborted.
    """
    def _upsert_params(leaf, status, summary=None, section_type=None, confidence=None,
                       model=None, error=None):
        # One place for the bind-parameter shape shared by every upsert in this function.
        return {
            "chunk_id": leaf["chunk_id"], "status": status, "summary": summary,
            "section_type": section_type, "domain": doc_domain, "confidence": confidence,
            "model": model, "pv": PROMPT_VERSION, "content_hash": leaf["content_hash"],
            "error": error,
        }

    pending = (await session.execute(LEAF_SQL, {"doc": doc_id, "pv": PROMPT_VERSION})).mappings().all()
    n_ok = n_fail = n_skip = 0
    timings, types = [], []
    aborted = False

    for leaf in pending:
        if time.time() >= stop_at:
            aborted = True
            break

        if leaf["body_len"] < MIN_CHARS:
            # Too short to analyze: record the skip and move on (committed later).
            await session.execute(_UPSERT_SQL, _upsert_params(leaf, "skipped_tiny"))
            n_skip += 1
            continue

        status = "failed"
        summary = sec_type = conf = err = None
        t0 = time.perf_counter()
        try:
            async with acquire_mlx_gate(Priority.BACKGROUND), asyncio.timeout(CALL_TIMEOUT_S):
                raw = await client.call_triage(_build_prompt(leaf))
            timings.append(time.perf_counter() - t0)
            parsed = parse_json_response(strip_thinking(raw)) if raw else None
            if parsed and isinstance(parsed, dict):
                summary = (parsed.get("summary") or "").strip() or None
                sec_type = _coerce_type(parsed.get("section_type"))
                try:
                    conf = float(parsed.get("confidence"))
                except (TypeError, ValueError):
                    # Missing or non-numeric confidence falls back to a neutral 0.5.
                    conf = 0.5
                status = "summarized"
                n_ok += 1
                types.append(sec_type)
            else:
                err = "parse_failed"
                n_fail += 1
        except Exception as exc:  # timeout / call failure — no fallback
            timings.append(time.perf_counter() - t0)
            err = f"{type(exc).__name__}: {repr(exc)[:160]}"
            n_fail += 1

        await session.execute(
            _UPSERT_SQL,
            _upsert_params(leaf, status, summary, sec_type, conf, model_name, err))
        # Commit per analyzed leaf so an interrupted run keeps what it finished.
        await session.commit()

        handled = n_ok + n_fail + n_skip
        if job_id and handled % 10 == 0:
            await heartbeat(engine, job_id, processed=base_processed + handled)

    # Flush skip rows written since the last per-leaf commit.
    await session.commit()
    return {"ok": n_ok, "fail": n_fail, "skip": n_skip, "leaves": len(pending),
            "timings": timings, "types": types, "aborted": aborted}
|
||
|
||
|
||
def _parse_doc_ids(args):
    """Parse the comma-separated ``args.doc`` option into a list of ints.

    Returns None when the attribute is missing or empty. Blank items between commas are
    ignored; a non-integer item raises ValueError.
    """
    raw = getattr(args, "doc", None)
    if not raw:
        return None
    ids = []
    for piece in raw.split(","):
        if piece.strip():
            ids.append(int(piece))
    return ids
|
||
|
||
|
||
async def cmd_dry_run(args):
    """Count and preview the candidate docs without doing any work.

    Prints the number of candidates for the selected scope, body-length statistics and the
    first five candidates. Exits with status 2 when --reprocess is given without --doc.

    The engine is disposed in a ``finally`` so a failing candidate query does not leave the
    connection pool open (same pattern as the other commands in this file).
    """
    allowlist = args.domains.split(",") if args.domains else None
    doc_ids = _parse_doc_ids(args)
    reprocess = getattr(args, "reprocess", False)
    if reprocess and not doc_ids:
        print("REFUSE: --reprocess 는 --doc <list> 필수 (B3 빈 jump-target trap 차단 — REFINED PASS 리스트만)")
        sys.exit(2)

    engine = _make_engine()
    try:
        sm = async_sessionmaker(engine, expire_on_commit=False)
        async with sm() as session:
            rows = (await session.execute(_candidate_sql(allowlist, doc_ids, reprocess),
                                          _candidate_params(allowlist, doc_ids))).mappings().all()
    finally:
        await engine.dispose()

    gate_lbl = "doc-list" if doc_ids else f">{DOC_MIN_CHARS}자"
    state_lbl = "재분해 미완료(jump-target char_start 부재)" if reprocess else "미분해"
    print(f"[dry-run] 후보 doc {len(rows)} ({_scope_label(allowlist, doc_ids, reprocess)}, {gate_lbl}, {state_lbl})")
    if rows:
        lens = [len(r["body"]) for r in rows]
        print(f" 본문길이: min={min(lens)} p50={int(statistics.median(lens))} max={max(lens)} 합={sum(lens):,}")
        print(" 앞 5개:")
        for r in rows[:5]:
            print(f" doc={r['doc_id']} {len(r['body']):>7,}자 {r['ai_domain']}")
|
||
|
||
|
||
async def cmd_run(args):
    """Decompose (and optionally analyze) candidate docs until the deadline buffer is hit.

    Per candidate doc: ``persist_hier_tree`` builds and stores the hier tree (leaf
    embeddings via ``embed_leaf``), then, unless --skip-analysis is set, the doc's leaves
    are analyzed with ``_analyze_doc_leaves``. A doc whose processing raises is logged and
    skipped. Progress is reported to background_jobs through start_job / heartbeat /
    finish_job so the work is visible outside this process.

    Exits with status 2 when --reprocess is given without --doc.

    Notes on behaviour:
    - ``ai_domain`` may be NULL (the candidate SQL itself coalesces it), so it is
      normalized to "general" here, the same fallback ``cmd_analyze`` uses. Without this
      the per-doc summary log (which splits the domain on "/") raised outside the per-doc
      try/except and aborted the whole run on the first NULL-domain doc.
    - The post-run sweep opens a session after the engine was disposed, which makes the
      engine create a fresh pool; the engine is disposed again after the sweep.
    """
    allowlist = args.domains.split(",") if args.domains else None
    doc_ids = _parse_doc_ids(args)
    reprocess = getattr(args, "reprocess", False)
    if reprocess and not doc_ids:
        _log("REFUSE: --reprocess 는 --doc <list> 필수 (B3 빈 jump-target trap 차단 — REFINED PASS 리스트만)")
        sys.exit(2)
    skip_analysis = getattr(args, "skip_analysis", False)
    deadline = _compute_deadline(args.deadline)
    stop_at = (deadline - timedelta(minutes=BUFFER_MIN)).timestamp()
    _log(f"deadline={deadline:%m-%d %H:%M} (buffer {BUFFER_MIN}m → stop_at={datetime.fromtimestamp(stop_at):%H:%M}) "
         f"{_scope_label(allowlist, doc_ids, reprocess)}{' [SKIP-ANALYSIS: 분해+임베딩만]' if skip_analysis else ''}"
         f"{' [RE-DECOMPOSE: 기존 hier DELETE→CASCADE chunk_section_analysis→재INSERT; 스냅샷 선행 필수]' if reprocess else ''}")

    engine = _make_engine()
    sm = async_sessionmaker(engine, expire_on_commit=False)
    client = AIClient()
    model_name = settings.ai.triage.model

    async def embed_leaf(t):
        # Embedding failure is tolerated: the leaf is stored without a vector.
        try:
            return await client.embed(t)
        except Exception as exc:
            _log(f" embed 실패(무시, in_corpus=false): {type(exc).__name__}")
            return None

    tot_docs = tot_ok = tot_fail = tot_skip = tot_leaves_created = 0
    all_timings, all_types = [], []
    run_start = time.time()
    try:
        async with sm() as session:
            cands = (await session.execute(_candidate_sql(allowlist, doc_ids, reprocess),
                                           _candidate_params(allowlist, doc_ids))).mappings().all()
        _log(f"후보 doc {len(cands)} 선별. 시작.")

        # Observability: this work runs outside the processing queue, so the dashboard board
        # cannot see it; expose progress through background_jobs (best-effort).
        _job_kind = "hier_redecompose" if reprocess else "hier_backfill"
        _job_label = (f"doc {args.doc} {'재분해' if reprocess else '분해'}" if doc_ids
                      else f"{len(cands)}개 문서 {'재분해' if reprocess else '분해'}")
        job_id = await start_job(engine, _job_kind, _job_label, total=None)

        for c in cands:
            if time.time() >= stop_at:
                _log(f"⏰ deadline 버퍼 도달 — doc 경계에서 중단 (처리 {tot_docs} doc)")
                break
            doc_id, body = c["doc_id"], c["body"]
            # NULL ai_domain -> "general" (matches cmd_analyze; keeps the log line below safe).
            doc_domain = c["ai_domain"] or "general"
            try:
                async with sm() as session:
                    pstat = await persist_hier_tree(session, doc_id, body, embed_leaf)
                leaves_created = pstat.get("leaves", 0)
                tot_leaves_created += leaves_created
                if skip_analysis:
                    # Decomposition + embedding only (section analysis is a separate axis
                    # that does not affect retrieval). Idempotent.
                    astat = {"ok": 0, "fail": 0, "skip": 0, "leaves": leaves_created,
                             "timings": [], "types": [], "aborted": False}
                else:
                    async with sm() as session:
                        astat = await _analyze_doc_leaves(
                            session, client, doc_id, doc_domain, model_name, stop_at,
                            engine=engine, job_id=job_id,
                            base_processed=(tot_ok + tot_fail + tot_skip))
            except Exception as exc:
                _log(f" ✗ doc={doc_id} 처리 실패(건너뜀): {type(exc).__name__}: {repr(exc)[:160]}")
                continue

            tot_docs += 1
            tot_ok += astat["ok"]
            tot_fail += astat["fail"]
            tot_skip += astat["skip"]
            all_timings += astat["timings"]
            all_types += astat["types"]
            await heartbeat(engine, job_id, processed=(tot_ok + tot_fail + tot_skip),
                            total=tot_leaves_created)
            avg = statistics.mean(astat["timings"]) if astat["timings"] else 0
            _log(f" ✓ doc={doc_id} ({len(body):,}자 {doc_domain.split('/')[0]}) "
                 f"leaf생성={leaves_created} 분석ok={astat['ok']} fail={astat['fail']} skip={astat['skip']} "
                 f"avg={avg:.1f}s{' [ABORT]' if astat['aborted'] else ''} | 누적 {tot_docs}doc {tot_ok}leaf")
            if astat["aborted"]:
                _log("⏰ leaf 분석 중 deadline 도달 — 중단")
                break
        await finish_job(engine, job_id, state="done")
    finally:
        await client.close()
        await engine.dispose()

    elapsed = (time.time() - run_start) / 60
    _log(f"=== 종료: {tot_docs} doc, leaf생성 {tot_leaves_created}, "
         f"분석 ok={tot_ok} fail={tot_fail} skip={tot_skip}, 경과 {elapsed:.0f}분 ===")
    if all_timings:
        _log(f" leaf당 {statistics.mean(all_timings):.2f}s (p50={statistics.median(all_timings):.2f} "
             f"max={max(all_timings):.2f})")
    if all_types:
        d = Counter(all_types)
        _log(f" section_type: {dict(d.most_common())} other={d.get('other',0)/len(all_types):.1%}")

    # [g3-t3/g3-t4] post-run sweep: count leaves still unanalyzed among the requested docs
    # (detects half-finished / stalled state). GOAL(jump=char_start) and rail-summary
    # (re-analyze) are decoupled — leftovers are absorbed by a later run through the
    # idempotent LEAF_SQL selection.
    if doc_ids:
        try:
            async with sm() as session:
                pending = (await session.execute(text("""
                    SELECT dc.doc_id, count(*) AS unanalyzed
                    FROM document_chunks dc
                    WHERE dc.doc_id = ANY(:ids) AND dc.source_type='hier_section' AND dc.is_leaf=true
                      AND NOT EXISTS (SELECT 1 FROM chunk_section_analysis a
                                      WHERE a.chunk_id = dc.id AND a.prompt_version = :pv
                                        AND a.source_content_hash = dc.chunk_content_hash)
                    GROUP BY dc.doc_id ORDER BY unanalyzed DESC"""),
                    {"ids": doc_ids, "pv": PROMPT_VERSION})).mappings().all()
            if pending:
                tot = sum(r["unanalyzed"] for r in pending)
                _log(f" [sweep] 미분석 leaf 잔여: {tot} (doc {len(pending)}) — char_start 마커는 이들을 재선별 안 함; "
                     f"`analyze` 커맨드로 수렴(`analyze --deadline HH:MM`, 멱등). "
                     f"상위: {[(r['doc_id'], r['unanalyzed']) for r in pending[:5]]}")
            else:
                _log(" [sweep] 미분석 leaf 잔여 0 — 분석 수렴.")
        except Exception as exc:
            _log(f" [sweep] 잔여 집계 실패(무해): {type(exc).__name__}")
        finally:
            # The sweep session re-opened the pool that was disposed above; release it so
            # the process does not exit with a live connection.
            await engine.dispose()
|
||
|
||
|
||
def _is_jump_target(node) -> bool:
    """Return True when ``node`` is a jump-target.

    jump-target = non-window leaf OR a parent whose node_type ends with "_split"
    (evaluated on the builder's node object; intended to agree with _JUMP_TARGET_PRED).
    """
    if node.is_leaf and node.node_type != "window":
        return True
    kind = node.node_type
    return bool(kind) and kind.endswith("_split")
|
||
|
||
|
||
async def cmd_update_char_start(args):
    """[g3-tU] Non-destructive char_start UPDATE for hash_stable docs only.

    For each doc: rebuild the tree from md_content and align it position-by-position
    (chunk_index order) with the stored hier rows.
    [NEW-1] ALL-OR-NOTHING verification: every position's content hash must match; a single
            mismatch (or a differing node count) demotes the doc.
    [NEW-2] Rows are not matched by hash (sections with identical bodies would collide);
            the stored row's primary key at the same position is used for the UPDATE.
    Passing doc: UPDATE document_chunks SET char_start on jump-target rows only (no DELETE,
            no CASCADE, no embedding, no analysis).
    Failing doc: added to the demote list, to be merged into the re-decompose batch (NEW-4).
    The last stdout line is the machine-readable ``DEMOTE_DOC_IDS=...``.

    Exits with status 2 when --doc is not given.
    """
    doc_ids = _parse_doc_ids(args)
    if not doc_ids:
        _log("REFUSE: update-char-start 는 --doc <list> 필수 (hash_stable 32 = gm-t1 산출)")
        sys.exit(2)

    engine = _make_engine()
    sm = async_sessionmaker(engine, expire_on_commit=False)

    # Statements are loop-invariant, so build them once.
    md_stmt = text("SELECT md_content FROM documents WHERE id=:d")
    stored_stmt = text("""
        SELECT id, chunk_index, chunk_content_hash, node_type, is_leaf
        FROM document_chunks
        WHERE doc_id=:d AND source_type='hier_section'
        ORDER BY chunk_index""")
    update_stmt = text("UPDATE document_chunks SET char_start=:cs WHERE id=:id")

    updated, demoted, noop = [], [], []
    try:
        for doc_id in doc_ids:
            async with sm() as session:
                md = await session.scalar(md_stmt, {"d": doc_id})
                if not md or not md.strip():
                    noop.append(doc_id)
                    _log(f" doc={doc_id} md_content 없음 → no-op(suspect, V4)")
                    continue

                nodes = build_hier_tree(md)
                stored = (await session.execute(stored_stmt, {"d": doc_id})).mappings().all()

                # [NEW-2] positional alignment: built node[i] <-> stored[i]
                # (chunk_index = base + idx, hence the same order). A differing node count
                # means the structure changed = hash_changed -> DEMOTE.
                if len(nodes) != len(stored):
                    demoted.append(doc_id)
                    _log(f" doc={doc_id} 노드수 build {len(nodes)} ≠ stored {len(stored)} → DEMOTE(re-decompose)")
                    continue

                # [NEW-1] verify the hash at every position (positional alignment also
                # verifies ordering). Any mismatch -> DEMOTE: even a 1% jump-target miss
                # brings back the whole-doc fallback, so 100% is required.
                mismatch = None
                for pos, (node, row) in enumerate(zip(nodes, stored)):
                    if node.chunk_content_hash != row["chunk_content_hash"]:
                        mismatch = pos
                        break
                if mismatch is not None:
                    demoted.append(doc_id)
                    _log(f" doc={doc_id} position {mismatch} hash 불일치 → DEMOTE(re-decompose, NEW-1)")
                    continue

                # Verified: write char_start on jump-target rows, addressed by stored PK.
                n_upd = 0
                for node, row in zip(nodes, stored):
                    if not _is_jump_target(node) or node.char_start is None:
                        continue
                    await session.execute(update_stmt, {"cs": node.char_start, "id": row["id"]})
                    n_upd += 1
                await session.commit()
                updated.append(doc_id)
                _log(f" ✓ doc={doc_id} char_start UPDATE {n_upd} jump-target (VERIFY 100%, 비파괴)")
    finally:
        await engine.dispose()

    _log(f"=== update-char-start: updated={len(updated)} demoted={len(demoted)} noop={len(noop)} ===")
    if demoted:
        _log(f" DEMOTE(re-decompose 배치 합류, NEW-4): {demoted}")
    if noop:
        _log(f" NO-OP(md_content NULL suspect, V4): {noop}")
    # Machine-readable: re-decompose --doc = (gm-t1 hash_changed 230) UNION (this list)
    print("DEMOTE_DOC_IDS=" + ",".join(str(x) for x in demoted), flush=True)
|
||
|
||
|
||
# Selects docs that still own unanalyzed hier leaves (independent of the re-decompose
# marker — analysis tracking is a separate axis, g3-t3).
def _analyze_candidate_sql(doc_ids=None):
    """Build the SELECT of docs having at least one unanalyzed hier leaf.

    Binds ``:pv`` (prompt version) and, only when ``doc_ids`` is given, ``:ids``.
    """
    scope = ""
    if doc_ids:
        scope = "AND dc.doc_id = ANY(:ids)"
    return text(f"""
    SELECT DISTINCT dc.doc_id AS doc_id, d.ai_domain AS ai_domain
    FROM document_chunks dc JOIN documents d ON d.id = dc.doc_id
    WHERE dc.source_type = 'hier_section' AND dc.is_leaf = true {scope}
      AND NOT EXISTS (SELECT 1 FROM chunk_section_analysis a
                      WHERE a.chunk_id = dc.id AND a.prompt_version = :pv
                        AND a.source_content_hash = dc.chunk_content_hash)
    ORDER BY dc.doc_id
    """)
|
||
|
||
|
||
async def cmd_analyze(args):
    """[g3-t3 self-heal] Analyze only the unanalyzed hier leaves (idempotent).

    The re-decompose completion marker is "jump-target rows have char_start", so a doc
    whose analysis was cut short (container recreate / deadline) — char_start present but
    some leaves unanalyzed — is never re-selected by that marker. This command selects docs
    independently, by "owns an unanalyzed leaf", and converges the rail summaries
    eventually. Idempotent through the NOT EXISTS in LEAF_SQL.

    --doc restricts the scope (unset = all). Unrelated to jump (char_start); this is only
    for converging rail summaries.
    """
    doc_ids = _parse_doc_ids(args)
    deadline = _compute_deadline(args.deadline)
    stop_at = (deadline - timedelta(minutes=BUFFER_MIN)).timestamp()
    _log(f"[analyze] deadline={deadline:%m-%d %H:%M} (stop_at={datetime.fromtimestamp(stop_at):%H:%M}) "
         f"{'doc-list='+str(len(doc_ids)) if doc_ids else 'all'} 미분석 leaf 보유 doc 선별")

    engine = _make_engine()
    sm = async_sessionmaker(engine, expire_on_commit=False)
    client = AIClient()
    model_name = settings.ai.triage.model

    bind = {"pv": PROMPT_VERSION}
    if doc_ids:
        bind["ids"] = doc_ids

    done_docs = sum_ok = sum_fail = sum_skip = 0
    try:
        async with sm() as session:
            candidates = (await session.execute(_analyze_candidate_sql(doc_ids), bind)).mappings().all()
        _log(f"[analyze] 후보 doc {len(candidates)} (미분석 leaf 보유). 시작.")

        for cand in candidates:
            if time.time() >= stop_at:
                _log(f"⏰ deadline 버퍼 도달 — 중단 (처리 {done_docs} doc)")
                break

            doc_id = cand["doc_id"]
            doc_domain = cand["ai_domain"] or "general"
            try:
                async with sm() as session:
                    st = await _analyze_doc_leaves(session, client, doc_id, doc_domain, model_name, stop_at)
            except Exception as exc:
                _log(f" ✗ doc={doc_id} 분석 실패(건너뜀): {type(exc).__name__}: {repr(exc)[:160]}")
                continue

            done_docs += 1
            sum_ok += st["ok"]
            sum_fail += st["fail"]
            sum_skip += st["skip"]
            _log(f" ✓ doc={doc_id} ok={st['ok']} fail={st['fail']} skip={st['skip']} leaves={st['leaves']}"
                 f"{' [ABORT]' if st['aborted'] else ''} | 누적 {done_docs}doc {sum_ok}ok")
            if st["aborted"]:
                _log("⏰ leaf 분석 중 deadline 도달 — 중단")
                break
    finally:
        await client.close()
        await engine.dispose()

    _log(f"=== [analyze] 종료: {done_docs} doc, ok={sum_ok} fail={sum_fail} skip={sum_skip} ===")
|
||
|
||
|
||
def main():
    """Parse the command line and run the selected sub-command under ``asyncio.run``.

    Sub-commands: dry-run, run, update-char-start, analyze.
    """
    parser = argparse.ArgumentParser(description="오버나이트 hier 분해+절 분석 backfill (additive)")
    commands = parser.add_subparsers(dest="cmd", required=True)

    dry = commands.add_parser("dry-run", help="후보 doc 집계 (작업 0)")
    dry.add_argument("--domains", default=None, help="comma-sep allowlist (미지정=뉴스 제외 전부)")
    dry.add_argument("--doc", default=None, help="comma-sep doc id (크기 게이트 우회 — 구조화 소형 문서 coverage 보정)")
    dry.add_argument("--reprocess", action="store_true", help="재분해 후보(기존 hier+jump-target char_start 부재) — --doc 필수")

    run = commands.add_parser("run", help="분해+분석 실행 (deadline time-box)")
    run.add_argument("--deadline", default="07:00", help="HH:MM (기본 07:00 — 컨테이너 UTC 주의, 07:00 KST=22:00 UTC)")
    run.add_argument("--domains", default=None, help="comma-sep allowlist (미지정=뉴스 제외 전부)")
    run.add_argument("--doc", default=None, help="comma-sep doc id (크기 게이트 우회 — 구조화 소형 문서 coverage 보정)")
    run.add_argument("--skip-analysis", action="store_true", help="절 분석(Mac mini) 생략, 분해+임베딩만 (retrieval go/no-go 측정 준비용)")
    run.add_argument("--reprocess", action="store_true",
                     help="[g3-t2] RE-DECOMPOSE: 기존 hier DELETE→CASCADE→재INSERT (md_content 출처, char_start). "
                          "--doc(REFINED PASS hash_changed∪demote) 필수 / 스냅샷 선행 필수")

    upd = commands.add_parser("update-char-start",
                              help="[g3-tU] hash_stable doc 비파괴 char_start UPDATE (100% VERIFY, --doc 필수)")
    upd.add_argument("--doc", default=None, help="comma-sep doc id (gm-t1 hash_stable 32)")

    ana = commands.add_parser("analyze",
                              help="[g3-t3] 미분석 hier leaf 만 분석(재분해 무관, 멱등) — recreate/deadline 으로 잘린 절분석 수렴")
    ana.add_argument("--deadline", default="07:00", help="HH:MM (컨테이너 UTC, 07:00 KST=22:00 UTC)")
    ana.add_argument("--doc", default=None, help="comma-sep doc id (미지정=미분석 leaf 보유 전체)")

    args = parser.parse_args()

    # Sub-command name -> coroutine function.
    handlers = {
        "dry-run": cmd_dry_run,
        "run": cmd_run,
        "update-char-start": cmd_update_char_start,
        "analyze": cmd_analyze,
    }
    handler = handlers[args.cmd]
    asyncio.run(handler(args))
|
||
|
||
|
||
# Script entry point: run the CLI only when executed directly, not when imported.
if __name__ == "__main__":
    main()
|