feat(search): hier backfill --skip-analysis + --doc gate-bypass flags
PR-DocSrv-Hier-Replace-Diagnose-1 c2. 구조화 소형 문서(법령 등) eval coverage 보정용 — --doc 명시 리스트로 DOC_MIN_CHARS=4000 게이트 우회, --skip-analysis 로 절분석(Mac mini) 생략하고 분해+임베딩만. retrieval go/no-go 측정 준비. additive, in_corpus 무영향. NOT EXISTS hier 멱등 가드 유지. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -42,17 +42,23 @@ DOC_MIN_CHARS = 4000 # hier 분해가 의미 있는 doc 크기 하한(STRUCTUR
|
||||
BUFFER_MIN = 10 # deadline 이 만큼 전 안전 중단
|
||||
|
||||
|
||||
def _candidate_sql(allowlist):
|
||||
def _candidate_sql(allowlist, doc_ids=None):
|
||||
"""allowlist 있으면 그 domain 만, 없으면 EXCLUDE_DOMAINS(news) 제외 전부.
|
||||
doc_ids 명시 시 = 그 doc 만(크기 게이트 DOC_MIN_CHARS + domain 필터 우회 —
|
||||
구조화 소형 문서(법령 등) eval coverage 보정용. NOT EXISTS hier 멱등 가드는 유지).
|
||||
작은 doc 먼저 = 완료 doc 수 최대화 + 단일 mega-doc 예산 독식 방지."""
|
||||
cond = ("lower(split_part(coalesce(d.ai_domain,''), '/', 1)) = ANY(:domains)"
|
||||
if allowlist else
|
||||
"lower(split_part(coalesce(d.ai_domain,''), '/', 1)) <> ALL(:exclude)")
|
||||
if doc_ids:
|
||||
cond, gate = "d.id = ANY(:doc_ids)", "" # 명시 doc = 크기 게이트 우회
|
||||
else:
|
||||
cond = ("lower(split_part(coalesce(d.ai_domain,''), '/', 1)) = ANY(:domains)"
|
||||
if allowlist else
|
||||
"lower(split_part(coalesce(d.ai_domain,''), '/', 1)) <> ALL(:exclude)")
|
||||
gate = "AND length(d.extracted_text) > :minchars"
|
||||
return text(f"""
|
||||
SELECT d.id AS doc_id, d.extracted_text AS body, d.ai_domain AS ai_domain
|
||||
FROM documents d
|
||||
WHERE d.extracted_text IS NOT NULL
|
||||
AND length(d.extracted_text) > :minchars
|
||||
{gate}
|
||||
AND {cond}
|
||||
AND NOT EXISTS (SELECT 1 FROM document_chunks dc
|
||||
WHERE dc.doc_id = d.id AND dc.source_type = 'hier_section')
|
||||
@@ -60,7 +66,9 @@ def _candidate_sql(allowlist):
|
||||
""")
|
||||
|
||||
|
||||
def _candidate_params(allowlist):
|
||||
def _candidate_params(allowlist, doc_ids=None):
|
||||
if doc_ids:
|
||||
return {"doc_ids": doc_ids}
|
||||
p = {"minchars": DOC_MIN_CHARS}
|
||||
if allowlist:
|
||||
p["domains"] = allowlist
|
||||
@@ -69,7 +77,9 @@ def _candidate_params(allowlist):
|
||||
return p
|
||||
|
||||
|
||||
def _scope_label(allowlist):
|
||||
def _scope_label(allowlist, doc_ids=None):
|
||||
if doc_ids:
|
||||
return f"doc-list={len(doc_ids)}건(크기게이트 우회)"
|
||||
return f"allowlist={allowlist}" if allowlist else f"all-except={EXCLUDE_DOMAINS}"
|
||||
|
||||
# 멱등 leaf 선별 (재실행 시 이미 분석된 leaf 제외)
|
||||
@@ -159,15 +169,22 @@ async def _analyze_doc_leaves(session, client, doc_id, doc_domain, model_name, s
|
||||
"timings": timings, "types": types, "aborted": aborted}
|
||||
|
||||
|
||||
def _parse_doc_ids(args):
|
||||
raw = getattr(args, "doc", None)
|
||||
return [int(x) for x in raw.split(",") if x.strip()] if raw else None
|
||||
|
||||
|
||||
async def cmd_dry_run(args):
|
||||
allowlist = args.domains.split(",") if args.domains else None
|
||||
doc_ids = _parse_doc_ids(args)
|
||||
engine = _make_engine()
|
||||
sm = async_sessionmaker(engine, expire_on_commit=False)
|
||||
async with sm() as session:
|
||||
rows = (await session.execute(_candidate_sql(allowlist),
|
||||
_candidate_params(allowlist))).mappings().all()
|
||||
rows = (await session.execute(_candidate_sql(allowlist, doc_ids),
|
||||
_candidate_params(allowlist, doc_ids))).mappings().all()
|
||||
await engine.dispose()
|
||||
print(f"[dry-run] 후보 doc {len(rows)} ({_scope_label(allowlist)}, >{DOC_MIN_CHARS}자, 미분해)")
|
||||
gate_lbl = "doc-list" if doc_ids else f">{DOC_MIN_CHARS}자"
|
||||
print(f"[dry-run] 후보 doc {len(rows)} ({_scope_label(allowlist, doc_ids)}, {gate_lbl}, 미분해)")
|
||||
if rows:
|
||||
lens = [len(r["body"]) for r in rows]
|
||||
print(f" 본문길이: min={min(lens)} p50={int(statistics.median(lens))} max={max(lens)} 합={sum(lens):,}")
|
||||
@@ -178,10 +195,12 @@ async def cmd_dry_run(args):
|
||||
|
||||
async def cmd_run(args):
|
||||
allowlist = args.domains.split(",") if args.domains else None
|
||||
doc_ids = _parse_doc_ids(args)
|
||||
skip_analysis = getattr(args, "skip_analysis", False)
|
||||
deadline = _compute_deadline(args.deadline)
|
||||
stop_at = (deadline - timedelta(minutes=BUFFER_MIN)).timestamp()
|
||||
_log(f"deadline={deadline:%m-%d %H:%M} (buffer {BUFFER_MIN}m → stop_at={datetime.fromtimestamp(stop_at):%H:%M}) "
|
||||
f"{_scope_label(allowlist)}")
|
||||
f"{_scope_label(allowlist, doc_ids)}{' [SKIP-ANALYSIS: 분해+임베딩만]' if skip_analysis else ''}")
|
||||
|
||||
engine = _make_engine()
|
||||
sm = async_sessionmaker(engine, expire_on_commit=False)
|
||||
@@ -200,8 +219,8 @@ async def cmd_run(args):
|
||||
run_start = time.time()
|
||||
try:
|
||||
async with sm() as session:
|
||||
cands = (await session.execute(_candidate_sql(allowlist),
|
||||
_candidate_params(allowlist))).mappings().all()
|
||||
cands = (await session.execute(_candidate_sql(allowlist, doc_ids),
|
||||
_candidate_params(allowlist, doc_ids))).mappings().all()
|
||||
_log(f"후보 doc {len(cands)} 선별. 시작.")
|
||||
|
||||
for c in cands:
|
||||
@@ -214,8 +233,13 @@ async def cmd_run(args):
|
||||
pstat = await persist_hier_tree(session, doc_id, body, embed_leaf)
|
||||
leaves_created = pstat.get("leaves", 0)
|
||||
tot_leaves_created += leaves_created
|
||||
async with sm() as session:
|
||||
astat = await _analyze_doc_leaves(session, client, doc_id, doc_domain, model_name, stop_at)
|
||||
if skip_analysis:
|
||||
# 분해+임베딩만 (절 분석 = Mac mini 별 축, retrieval 무관). 멱등.
|
||||
astat = {"ok": 0, "fail": 0, "skip": 0, "leaves": leaves_created,
|
||||
"timings": [], "types": [], "aborted": False}
|
||||
else:
|
||||
async with sm() as session:
|
||||
astat = await _analyze_doc_leaves(session, client, doc_id, doc_domain, model_name, stop_at)
|
||||
except Exception as exc:
|
||||
_log(f" ✗ doc={doc_id} 처리 실패(건너뜀): {type(exc).__name__}: {repr(exc)[:160]}")
|
||||
continue
|
||||
@@ -250,9 +274,12 @@ def main():
|
||||
sub = ap.add_subparsers(dest="cmd", required=True)
|
||||
p_dry = sub.add_parser("dry-run", help="후보 doc 집계 (작업 0)")
|
||||
p_dry.add_argument("--domains", default=None, help="comma-sep allowlist (미지정=뉴스 제외 전부)")
|
||||
p_dry.add_argument("--doc", default=None, help="comma-sep doc id (크기 게이트 우회 — 구조화 소형 문서 coverage 보정)")
|
||||
p_run = sub.add_parser("run", help="분해+분석 실행 (deadline time-box)")
|
||||
p_run.add_argument("--deadline", default="07:00", help="HH:MM (기본 07:00 — 컨테이너 UTC 주의, 07:00 KST=22:00 UTC)")
|
||||
p_run.add_argument("--domains", default=None, help="comma-sep allowlist (미지정=뉴스 제외 전부)")
|
||||
p_run.add_argument("--doc", default=None, help="comma-sep doc id (크기 게이트 우회 — 구조화 소형 문서 coverage 보정)")
|
||||
p_run.add_argument("--skip-analysis", action="store_true", help="절 분석(Mac mini) 생략, 분해+임베딩만 (retrieval go/no-go 측정 준비용)")
|
||||
args = ap.parse_args()
|
||||
fn = {"dry-run": cmd_dry_run, "run": cmd_run}[args.cmd]
|
||||
asyncio.run(fn(args))
|
||||
|
||||
Reference in New Issue
Block a user