diff --git a/scripts/hier_overnight_backfill.py b/scripts/hier_overnight_backfill.py
index d2de1d7..eada147 100644
--- a/scripts/hier_overnight_backfill.py
+++ b/scripts/hier_overnight_backfill.py
@@ -37,20 +37,63 @@ from section_summary_pilot import (
     CALL_TIMEOUT_S, MIN_CHARS, PROMPT_VERSION, _UPSERT_SQL, _build_prompt, _coerce_type,
 )
 
-DEFAULT_DOMAINS = ["engineering", "industrial_safety"]
+EXCLUDE_DOMAINS = ["news"]  # default scope: everything except news (--domains allowlist overrides)
 DOC_MIN_CHARS = 4000  # hier 분해가 의미 있는 doc 크기 하한(STRUCTURE_SPLIT_THRESHOLD=4000)
 BUFFER_MIN = 10  # deadline 이 만큼 전 안전 중단
 
-CANDIDATE_SQL = text("""
-    SELECT d.id AS doc_id, d.extracted_text AS body, d.ai_domain AS ai_domain
-    FROM documents d
-    WHERE d.extracted_text IS NOT NULL
-      AND length(d.extracted_text) > :minchars
-      AND lower(split_part(d.ai_domain, '/', 1)) = ANY(:domains)
-      AND NOT EXISTS (SELECT 1 FROM document_chunks dc
-                      WHERE dc.doc_id = d.id AND dc.source_type = 'hier_section')
-    ORDER BY length(d.extracted_text) ASC
-""")  # 작은 doc 먼저 = 완료 doc 수 최대화 + 단일 mega-doc 예산 독식 방지
+
+def _candidate_sql(allowlist):
+    """Build the candidate-document query for the requested domain scope.
+
+    With an allowlist only those domains are selected; without one, every
+    domain except EXCLUDE_DOMAINS is selected. Smallest documents come first
+    so that more documents complete and a single huge document cannot use up
+    the whole budget.
+    """
+    cond = ("lower(split_part(coalesce(d.ai_domain,''), '/', 1)) = ANY(:domains)"
+            if allowlist else
+            "lower(split_part(coalesce(d.ai_domain,''), '/', 1)) <> ALL(:exclude)")
+    return text(f"""
+        SELECT d.id AS doc_id, d.extracted_text AS body, d.ai_domain AS ai_domain
+        FROM documents d
+        WHERE d.extracted_text IS NOT NULL
+          AND length(d.extracted_text) > :minchars
+          AND {cond}
+          AND NOT EXISTS (SELECT 1 FROM document_chunks dc
+                          WHERE dc.doc_id = d.id AND dc.source_type = 'hier_section')
+        ORDER BY length(d.extracted_text) ASC
+    """)
+
+
+def _candidate_params(allowlist):
+    """Return the bind parameters matching the query built by _candidate_sql."""
+    p = {"minchars": DOC_MIN_CHARS}
+    if allowlist:
+        p["domains"] = allowlist
+    else:
+        p["exclude"] = EXCLUDE_DOMAINS
+    return p
+
+
+def _scope_label(allowlist):
+    """Return a short description of the active domain scope for log output."""
+    return f"allowlist={allowlist}" if allowlist else f"all-except={EXCLUDE_DOMAINS}"
+
+
+def _parse_domains(raw):
+    """Parse the --domains value into a normalized allowlist.
+
+    Returns None when the flag is absent or empty, which selects the default
+    scope. Entries are stripped and lower-cased because the query compares
+    them against the lower-cased domain. Exits with an error when a non-empty
+    value contains no usable entry rather than running with a meaningless allowlist.
+    """
+    if not raw:
+        return None
+    domains = [d.strip().lower() for d in raw.split(",") if d.strip()]
+    if not domains:
+        raise SystemExit(f"--domains has no usable entries: {raw!r}")
+    return domains
 
 # 멱등 leaf 선별 (재실행 시 이미 분석된 leaf 제외)
 LEAF_SQL = text("""
@@ -140,13 +183,14 @@ async def _analyze_doc_leaves(session, client, doc_id, doc_domain, model_name, s
 
 
 async def cmd_dry_run(args):
+    allowlist = _parse_domains(args.domains)
     engine = _make_engine()
     sm = async_sessionmaker(engine, expire_on_commit=False)
     async with sm() as session:
-        rows = (await session.execute(CANDIDATE_SQL,
-                                      {"minchars": DOC_MIN_CHARS, "domains": DEFAULT_DOMAINS})).mappings().all()
+        rows = (await session.execute(_candidate_sql(allowlist),
+                                      _candidate_params(allowlist))).mappings().all()
     await engine.dispose()
-    print(f"[dry-run] 후보 doc {len(rows)} (domains={DEFAULT_DOMAINS}, >{DOC_MIN_CHARS}자, 미분해)")
+    print(f"[dry-run] 후보 doc {len(rows)} ({_scope_label(allowlist)}, >{DOC_MIN_CHARS}자, 미분해)")
     if rows:
         lens = [len(r["body"]) for r in rows]
         print(f" 본문길이: min={min(lens)} p50={int(statistics.median(lens))} max={max(lens)} 합={sum(lens):,}")
@@ -156,10 +200,11 @@ async def cmd_dry_run(args):
 
 
 async def cmd_run(args):
+    allowlist = _parse_domains(args.domains)
     deadline = _compute_deadline(args.deadline)
     stop_at = (deadline - timedelta(minutes=BUFFER_MIN)).timestamp()
     _log(f"deadline={deadline:%m-%d %H:%M} (buffer {BUFFER_MIN}m → stop_at={datetime.fromtimestamp(stop_at):%H:%M}) "
-         f"domains={DEFAULT_DOMAINS}")
+         f"{_scope_label(allowlist)}")
 
     engine = _make_engine()
     sm = async_sessionmaker(engine, expire_on_commit=False)
@@ -178,8 +223,8 @@ async def cmd_run(args):
     run_start = time.time()
     try:
         async with sm() as session:
-            cands = (await session.execute(CANDIDATE_SQL,
-                                           {"minchars": DOC_MIN_CHARS, "domains": DEFAULT_DOMAINS})).mappings().all()
+            cands = (await session.execute(_candidate_sql(allowlist),
+                                           _candidate_params(allowlist))).mappings().all()
             _log(f"후보 doc {len(cands)} 선별. 시작.")
 
             for c in cands:
@@ -226,9 +271,11 @@ async def cmd_run(args):
 def main():
     ap = argparse.ArgumentParser(description="오버나이트 hier 분해+절 분석 backfill (additive)")
     sub = ap.add_subparsers(dest="cmd", required=True)
-    sub.add_parser("dry-run", help="후보 doc 집계 (작업 0)")
+    p_dry = sub.add_parser("dry-run", help="후보 doc 집계 (작업 0)")
+    p_dry.add_argument("--domains", default=None, help="comma-sep allowlist (미지정=뉴스 제외 전부)")
     p_run = sub.add_parser("run", help="분해+분석 실행 (deadline time-box)")
-    p_run.add_argument("--deadline", default="07:00", help="HH:MM (기본 07:00, 지나면 다음날)")
+    p_run.add_argument("--deadline", default="07:00", help="HH:MM (기본 07:00 — 컨테이너 UTC 주의, 07:00 KST=22:00 UTC)")
+    p_run.add_argument("--domains", default=None, help="comma-sep allowlist (미지정=뉴스 제외 전부)")
     args = ap.parse_args()
     fn = {"dry-run": cmd_dry_run, "run": cmd_run}[args.cmd]
     asyncio.run(fn(args))