"""PR-News-Prep-Layer-1 백필 — 최근 7일 news 문서 chunk 재생성 + country 채움. 사용법 (GPU 서버 fastapi 컨테이너 안에서 실행): docker exec hyungi_document_server-fastapi-1 \\ python /app/scripts/news_chunk_country_backfill.py --days 7 --dry-run docker exec hyungi_document_server-fastapi-1 \\ python /app/scripts/news_chunk_country_backfill.py --days 7 --apply 선정 규칙: source_channel = 'news' created_at >= NOW() - INTERVAL ':days days' extracted_text IS NOT NULL deleted_at IS NULL ORDER BY created_at ASC (오래된 것부터 — 새 fire 와 시간차 분리) 실행 (--apply): doc 단위 small batch — 한 doc 라도 실패하면 그 doc 만 skip + 로그. 1. 기존 chunks count 기록 2. DELETE FROM document_chunks WHERE doc_id = :id 3. chunk_worker.process(doc_id, session) 직접 호출 4. 신규 chunks + chunks_with_country count 검증 5. 매 50 doc 마다 progress 출력 bge-m3 embedding 호출 동시성 1 — 컨테이너 1개 자연 직렬. 100건 ~ 15~30분 예상. """ from __future__ import annotations import argparse import asyncio import sys from datetime import datetime, timezone from pathlib import Path _repo_root = Path(__file__).resolve().parent.parent for _candidate in (_repo_root / "app", _repo_root): if (_candidate / "core").is_dir() and str(_candidate) not in sys.path: sys.path.insert(0, str(_candidate)) break from sqlalchemy import text from sqlalchemy.ext.asyncio import AsyncSession COUNT_SQL = """ SELECT COUNT(*) FROM documents WHERE source_channel = 'news' AND deleted_at IS NULL AND extracted_text IS NOT NULL AND created_at >= NOW() - make_interval(days => :days) """ SAMPLE_SQL = """ SELECT d.id, LEFT(d.title, 60) AS title, d.ai_sub_group, (SELECT COUNT(*) FROM document_chunks dc WHERE dc.doc_id = d.id) AS chunks, (SELECT COUNT(*) FROM document_chunks dc WHERE dc.doc_id = d.id AND dc.country IS NOT NULL) AS chunks_w_country FROM documents d WHERE d.source_channel = 'news' AND d.deleted_at IS NULL AND d.extracted_text IS NOT NULL AND d.created_at >= NOW() - make_interval(days => :days) ORDER BY d.created_at ASC LIMIT 5 """ ID_LIST_SQL = """ SELECT d.id FROM documents d WHERE d.source_channel = 'news' AND d.deleted_at IS NULL AND d.extracted_text IS NOT NULL AND d.created_at >= NOW() - make_interval(days => :days) ORDER BY d.created_at ASC """ CHUNK_COUNT_SQL = """ SELECT COUNT(*) AS total, COUNT(*) FILTER (WHERE country IS NOT NULL) AS with_country FROM document_chunks WHERE doc_id = :doc_id """ async def run_dry_run(session: AsyncSession, days: int) -> None: total = (await session.execute(text(COUNT_SQL), {"days": days})).scalar() print(f"\n[dry-run] 최근 {days}일 news 후보: {total}건") print("\n샘플 5건:") print(f" {'id':>6} | {'title':<60} | {'sub_group':<12} | chunks | with_country") print(" " + "-" * 110) rows = (await session.execute(text(SAMPLE_SQL), {"days": days})).all() for r in rows: print(f" {r.id:>6} | {r.title:<60} | {(r.ai_sub_group or ''):<12} | {r.chunks:>6} | {r.chunks_w_country:>12}") async def run_apply(session: AsyncSession, days: int) -> None: from workers.chunk_worker import process as chunk_process rows = (await session.execute(text(ID_LIST_SQL), {"days": days})).all() doc_ids = [r.id for r in rows] total = len(doc_ids) print(f"\n[apply] 최근 {days}일 news doc 백필 시작: {total}건") started_at = datetime.now(timezone.utc) ok = 0 skipped = 0 chunks_created = 0 chunks_with_country = 0 for idx, doc_id in enumerate(doc_ids, 1): try: # 1. doc 단위 delete (기존 chunks 정리) await session.execute( text("DELETE FROM document_chunks WHERE doc_id = :doc_id"), {"doc_id": doc_id}, ) # 2. chunk_worker.process 직접 호출 (chunking + embedding + country lookup) await chunk_process(doc_id, session) await session.commit() # 3. 결과 검증 r = (await session.execute(text(CHUNK_COUNT_SQL), {"doc_id": doc_id})).one() chunks_created += r.total chunks_with_country += r.with_country ok += 1 except Exception as e: await session.rollback() skipped += 1 print(f" [skip] doc_id={doc_id}: {type(e).__name__}: {e}") if idx % 50 == 0: elapsed = (datetime.now(timezone.utc) - started_at).total_seconds() rate = idx / elapsed if elapsed > 0 else 0 print( f" [progress] {idx}/{total} " f"(ok={ok} skipped={skipped} chunks={chunks_created} " f"with_country={chunks_with_country} rate={rate:.1f}/s)" ) elapsed = (datetime.now(timezone.utc) - started_at).total_seconds() print( f"\n[done] total={total} ok={ok} skipped={skipped} " f"chunks_created={chunks_created} chunks_with_country={chunks_with_country} " f"elapsed={elapsed:.0f}s" ) async def main() -> None: parser = argparse.ArgumentParser(description=__doc__) parser.add_argument("--days", type=int, default=7, help="최근 N일 (기본 7)") mode = parser.add_mutually_exclusive_group(required=True) mode.add_argument("--dry-run", action="store_true") mode.add_argument("--apply", action="store_true") args = parser.parse_args() from core.database import async_session async with async_session() as session: if args.dry_run: await run_dry_run(session, args.days) else: await run_apply(session, args.days) if __name__ == "__main__": asyncio.run(main())