diff --git a/scripts/hier_overnight_backfill.py b/scripts/hier_overnight_backfill.py index eada147..5270541 100644 --- a/scripts/hier_overnight_backfill.py +++ b/scripts/hier_overnight_backfill.py @@ -42,17 +42,23 @@ DOC_MIN_CHARS = 4000 # hier 분해가 의미 있는 doc 크기 하한(STRUCTUR BUFFER_MIN = 10 # deadline 이 만큼 전 안전 중단 -def _candidate_sql(allowlist): +def _candidate_sql(allowlist, doc_ids=None): """allowlist 있으면 그 domain 만, 없으면 EXCLUDE_DOMAINS(news) 제외 전부. + doc_ids 명시 시 = 그 doc 만(크기 게이트 DOC_MIN_CHARS + domain 필터 우회 — + 구조화 소형 문서(법령 등) eval coverage 보정용. NOT EXISTS hier 멱등 가드는 유지). 작은 doc 먼저 = 완료 doc 수 최대화 + 단일 mega-doc 예산 독식 방지.""" - cond = ("lower(split_part(coalesce(d.ai_domain,''), '/', 1)) = ANY(:domains)" - if allowlist else - "lower(split_part(coalesce(d.ai_domain,''), '/', 1)) <> ALL(:exclude)") + if doc_ids: + cond, gate = "d.id = ANY(:doc_ids)", "" # 명시 doc = 크기 게이트 우회 + else: + cond = ("lower(split_part(coalesce(d.ai_domain,''), '/', 1)) = ANY(:domains)" + if allowlist else + "lower(split_part(coalesce(d.ai_domain,''), '/', 1)) <> ALL(:exclude)") + gate = "AND length(d.extracted_text) > :minchars" return text(f""" SELECT d.id AS doc_id, d.extracted_text AS body, d.ai_domain AS ai_domain FROM documents d WHERE d.extracted_text IS NOT NULL - AND length(d.extracted_text) > :minchars + {gate} AND {cond} AND NOT EXISTS (SELECT 1 FROM document_chunks dc WHERE dc.doc_id = d.id AND dc.source_type = 'hier_section') @@ -60,7 +66,9 @@ def _candidate_sql(allowlist): """) -def _candidate_params(allowlist): +def _candidate_params(allowlist, doc_ids=None): + if doc_ids: + return {"doc_ids": doc_ids} p = {"minchars": DOC_MIN_CHARS} if allowlist: p["domains"] = allowlist @@ -69,7 +77,9 @@ def _candidate_params(allowlist): return p -def _scope_label(allowlist): +def _scope_label(allowlist, doc_ids=None): + if doc_ids: + return f"doc-list={len(doc_ids)}건(크기게이트 우회)" return f"allowlist={allowlist}" if allowlist else f"all-except={EXCLUDE_DOMAINS}" # 멱등 leaf 선별 (재실행 시 이미 분석된 leaf 제외) @@ -159,15 +169,22 @@ async def _analyze_doc_leaves(session, client, doc_id, doc_domain, model_name, s "timings": timings, "types": types, "aborted": aborted} +def _parse_doc_ids(args): + raw = getattr(args, "doc", None) + return [int(x) for x in raw.split(",") if x.strip()] if raw else None + + async def cmd_dry_run(args): allowlist = args.domains.split(",") if args.domains else None + doc_ids = _parse_doc_ids(args) engine = _make_engine() sm = async_sessionmaker(engine, expire_on_commit=False) async with sm() as session: - rows = (await session.execute(_candidate_sql(allowlist), - _candidate_params(allowlist))).mappings().all() + rows = (await session.execute(_candidate_sql(allowlist, doc_ids), + _candidate_params(allowlist, doc_ids))).mappings().all() await engine.dispose() - print(f"[dry-run] 후보 doc {len(rows)} ({_scope_label(allowlist)}, >{DOC_MIN_CHARS}자, 미분해)") + gate_lbl = "doc-list" if doc_ids else f">{DOC_MIN_CHARS}자" + print(f"[dry-run] 후보 doc {len(rows)} ({_scope_label(allowlist, doc_ids)}, {gate_lbl}, 미분해)") if rows: lens = [len(r["body"]) for r in rows] print(f" 본문길이: min={min(lens)} p50={int(statistics.median(lens))} max={max(lens)} 합={sum(lens):,}") @@ -178,10 +195,12 @@ async def cmd_dry_run(args): async def cmd_run(args): allowlist = args.domains.split(",") if args.domains else None + doc_ids = _parse_doc_ids(args) + skip_analysis = getattr(args, "skip_analysis", False) deadline = _compute_deadline(args.deadline) stop_at = (deadline - timedelta(minutes=BUFFER_MIN)).timestamp() _log(f"deadline={deadline:%m-%d %H:%M} (buffer {BUFFER_MIN}m → stop_at={datetime.fromtimestamp(stop_at):%H:%M}) " - f"{_scope_label(allowlist)}") + f"{_scope_label(allowlist, doc_ids)}{' [SKIP-ANALYSIS: 분해+임베딩만]' if skip_analysis else ''}") engine = _make_engine() sm = async_sessionmaker(engine, expire_on_commit=False) @@ -200,8 +219,8 @@ async def cmd_run(args): run_start = time.time() try: async with sm() as session: - cands = (await session.execute(_candidate_sql(allowlist), - _candidate_params(allowlist))).mappings().all() + cands = (await session.execute(_candidate_sql(allowlist, doc_ids), + _candidate_params(allowlist, doc_ids))).mappings().all() _log(f"후보 doc {len(cands)} 선별. 시작.") for c in cands: @@ -214,8 +233,13 @@ async def cmd_run(args): pstat = await persist_hier_tree(session, doc_id, body, embed_leaf) leaves_created = pstat.get("leaves", 0) tot_leaves_created += leaves_created - async with sm() as session: - astat = await _analyze_doc_leaves(session, client, doc_id, doc_domain, model_name, stop_at) + if skip_analysis: + # 분해+임베딩만 (절 분석 = Mac mini 별 축, retrieval 무관). 멱등. + astat = {"ok": 0, "fail": 0, "skip": 0, "leaves": leaves_created, + "timings": [], "types": [], "aborted": False} + else: + async with sm() as session: + astat = await _analyze_doc_leaves(session, client, doc_id, doc_domain, model_name, stop_at) except Exception as exc: _log(f" ✗ doc={doc_id} 처리 실패(건너뜀): {type(exc).__name__}: {repr(exc)[:160]}") continue @@ -250,9 +274,12 @@ def main(): sub = ap.add_subparsers(dest="cmd", required=True) p_dry = sub.add_parser("dry-run", help="후보 doc 집계 (작업 0)") p_dry.add_argument("--domains", default=None, help="comma-sep allowlist (미지정=뉴스 제외 전부)") + p_dry.add_argument("--doc", default=None, help="comma-sep doc id (크기 게이트 우회 — 구조화 소형 문서 coverage 보정)") p_run = sub.add_parser("run", help="분해+분석 실행 (deadline time-box)") p_run.add_argument("--deadline", default="07:00", help="HH:MM (기본 07:00 — 컨테이너 UTC 주의, 07:00 KST=22:00 UTC)") p_run.add_argument("--domains", default=None, help="comma-sep allowlist (미지정=뉴스 제외 전부)") + p_run.add_argument("--doc", default=None, help="comma-sep doc id (크기 게이트 우회 — 구조화 소형 문서 coverage 보정)") + p_run.add_argument("--skip-analysis", action="store_true", help="절 분석(Mac mini) 생략, 분해+임베딩만 (retrieval go/no-go 측정 준비용)") args = ap.parse_args() fn = {"dry-run": cmd_dry_run, "run": cmd_run}[args.cmd] asyncio.run(fn(args))