feat(documents): hier analyze 서브커맨드 — 재분해와 독립한 절분석 self-heal (g3-t3 갭)
re-decompose 의 char_start 완료마커는 'jump-target char_start 보유'라 컨테이너 recreate/deadline 으로 analyze 가 잘린 doc(char_start 있으나 일부 leaf 미분석)을 재선별 못 함 → rail summary 영구 미수렴 갭. `analyze` 가 LEAF_SQL(미분석 leaf 보유) 기준 독립 선별로 수렴(멱등, --doc 제한 가능, jump 무관). sweep 로그도 `analyze` 커맨드 안내로 갱신. (2026-06-10 백필서 recreate 로 잘린 5 doc·53 leaf 수동 처리한 케이스 항구화.) Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -317,7 +317,8 @@ async def cmd_run(args):
|
||||
{"ids": doc_ids, "pv": PROMPT_VERSION})).mappings().all()
|
||||
if pending:
|
||||
tot = sum(r["unanalyzed"] for r in pending)
|
||||
_log(f" [sweep] 미분석 leaf 잔여: {tot} (doc {len(pending)}) — 다음 실행이 이어서 분석(멱등). "
|
||||
_log(f" [sweep] 미분석 leaf 잔여: {tot} (doc {len(pending)}) — char_start 마커는 이들을 재선별 안 함; "
|
||||
f"`analyze` 커맨드로 수렴(`analyze --deadline HH:MM`, 멱등). "
|
||||
f"상위: {[(r['doc_id'], r['unanalyzed']) for r in pending[:5]]}")
|
||||
else:
|
||||
_log(" [sweep] 미분석 leaf 잔여 0 — 분석 수렴.")
|
||||
@@ -397,6 +398,70 @@ async def cmd_update_char_start(args):
|
||||
print("DEMOTE_DOC_IDS=" + ",".join(str(x) for x in demoted), flush=True)
|
||||
|
||||
|
||||
# 미분석 hier leaf 보유 doc 선별 (재분해 마커와 독립 — analyze 추적 별도 축, g3-t3).
|
||||
def _analyze_candidate_sql(doc_ids=None):
|
||||
scope = "AND dc.doc_id = ANY(:ids)" if doc_ids else ""
|
||||
return text(f"""
|
||||
SELECT DISTINCT dc.doc_id AS doc_id, d.ai_domain AS ai_domain
|
||||
FROM document_chunks dc JOIN documents d ON d.id = dc.doc_id
|
||||
WHERE dc.source_type = 'hier_section' AND dc.is_leaf = true {scope}
|
||||
AND NOT EXISTS (SELECT 1 FROM chunk_section_analysis a
|
||||
WHERE a.chunk_id = dc.id AND a.prompt_version = :pv
|
||||
AND a.source_content_hash = dc.chunk_content_hash)
|
||||
ORDER BY dc.doc_id
|
||||
""")
|
||||
|
||||
|
||||
async def cmd_analyze(args):
|
||||
"""[g3-t3 self-heal] 미분석 hier leaf 만 분석 (재분해/char_start 마커와 독립, 멱등).
|
||||
|
||||
re-decompose 의 char_start 완료마커는 'jump-target char_start 보유'라서, 컨테이너 recreate/deadline 으로
|
||||
analyze 가 잘린 doc(char_start 는 있으나 일부 leaf 미분석)을 재선별하지 못한다 → 이 커맨드가 LEAF_SQL 기준
|
||||
(미분석 leaf 보유)으로 독립 선별해 eventually-consistent rail summary 를 수렴시킨다. 멱등(LEAF_SQL NOT EXISTS).
|
||||
--doc 로 제한 가능(미지정=전체). jump(char_start)와 무관 — rail summary 수렴 전용."""
|
||||
doc_ids = _parse_doc_ids(args)
|
||||
deadline = _compute_deadline(args.deadline)
|
||||
stop_at = (deadline - timedelta(minutes=BUFFER_MIN)).timestamp()
|
||||
_log(f"[analyze] deadline={deadline:%m-%d %H:%M} (stop_at={datetime.fromtimestamp(stop_at):%H:%M}) "
|
||||
f"{'doc-list='+str(len(doc_ids)) if doc_ids else 'all'} 미분석 leaf 보유 doc 선별")
|
||||
|
||||
engine = _make_engine()
|
||||
sm = async_sessionmaker(engine, expire_on_commit=False)
|
||||
client = AIClient()
|
||||
model_name = settings.ai.triage.model
|
||||
params = {"pv": PROMPT_VERSION}
|
||||
if doc_ids:
|
||||
params["ids"] = doc_ids
|
||||
|
||||
tot_docs = tot_ok = tot_fail = tot_skip = 0
|
||||
try:
|
||||
async with sm() as session:
|
||||
cands = (await session.execute(_analyze_candidate_sql(doc_ids), params)).mappings().all()
|
||||
_log(f"[analyze] 후보 doc {len(cands)} (미분석 leaf 보유). 시작.")
|
||||
for c in cands:
|
||||
if time.time() >= stop_at:
|
||||
_log(f"⏰ deadline 버퍼 도달 — 중단 (처리 {tot_docs} doc)")
|
||||
break
|
||||
doc_id, doc_domain = c["doc_id"], c["ai_domain"] or "general"
|
||||
try:
|
||||
async with sm() as session:
|
||||
st = await _analyze_doc_leaves(session, client, doc_id, doc_domain, model_name, stop_at)
|
||||
except Exception as exc:
|
||||
_log(f" ✗ doc={doc_id} 분석 실패(건너뜀): {type(exc).__name__}: {repr(exc)[:160]}")
|
||||
continue
|
||||
tot_docs += 1
|
||||
tot_ok += st["ok"]; tot_fail += st["fail"]; tot_skip += st["skip"]
|
||||
_log(f" ✓ doc={doc_id} ok={st['ok']} fail={st['fail']} skip={st['skip']} leaves={st['leaves']}"
|
||||
f"{' [ABORT]' if st['aborted'] else ''} | 누적 {tot_docs}doc {tot_ok}ok")
|
||||
if st["aborted"]:
|
||||
_log("⏰ leaf 분석 중 deadline 도달 — 중단")
|
||||
break
|
||||
finally:
|
||||
await client.close()
|
||||
await engine.dispose()
|
||||
_log(f"=== [analyze] 종료: {tot_docs} doc, ok={tot_ok} fail={tot_fail} skip={tot_skip} ===")
|
||||
|
||||
|
||||
def main():
|
||||
ap = argparse.ArgumentParser(description="오버나이트 hier 분해+절 분석 backfill (additive)")
|
||||
sub = ap.add_subparsers(dest="cmd", required=True)
|
||||
@@ -415,8 +480,13 @@ def main():
|
||||
p_upd = sub.add_parser("update-char-start",
|
||||
help="[g3-tU] hash_stable doc 비파괴 char_start UPDATE (100% VERIFY, --doc 필수)")
|
||||
p_upd.add_argument("--doc", default=None, help="comma-sep doc id (gm-t1 hash_stable 32)")
|
||||
p_an = sub.add_parser("analyze",
|
||||
help="[g3-t3] 미분석 hier leaf 만 분석(재분해 무관, 멱등) — recreate/deadline 으로 잘린 절분석 수렴")
|
||||
p_an.add_argument("--deadline", default="07:00", help="HH:MM (컨테이너 UTC, 07:00 KST=22:00 UTC)")
|
||||
p_an.add_argument("--doc", default=None, help="comma-sep doc id (미지정=미분석 leaf 보유 전체)")
|
||||
args = ap.parse_args()
|
||||
fn = {"dry-run": cmd_dry_run, "run": cmd_run, "update-char-start": cmd_update_char_start}[args.cmd]
|
||||
fn = {"dry-run": cmd_dry_run, "run": cmd_run, "update-char-start": cmd_update_char_start,
|
||||
"analyze": cmd_analyze}[args.cmd]
|
||||
asyncio.run(fn(args))
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user