Files
hyungi_document_server/scripts/news_chunk_country_backfill.py
T
Hyungi Ahn 73734d5585 fix(news): backfill INTERVAL bind 을 make_interval(days=>:days) 로 교체
asyncpg 가 :days || ' days' 의 int → text 암묵 변환을 거부함.
make_interval 사용으로 int 그대로 바인딩 가능.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-15 16:40:11 +09:00

168 lines
5.7 KiB
Python

"""PR-News-Prep-Layer-1 백필 — 최근 7일 news 문서 chunk 재생성 + country 채움.
사용법 (GPU 서버 fastapi 컨테이너 안에서 실행):
docker exec hyungi_document_server-fastapi-1 \\
python /app/scripts/news_chunk_country_backfill.py --days 7 --dry-run
docker exec hyungi_document_server-fastapi-1 \\
python /app/scripts/news_chunk_country_backfill.py --days 7 --apply
선정 규칙:
source_channel = 'news'
created_at >= NOW() - make_interval(days => :days)
extracted_text IS NOT NULL
deleted_at IS NULL
ORDER BY created_at ASC (오래된 것부터 — 새 fire 와 시간차 분리)
실행 (--apply):
doc 단위 small batch — 한 doc 라도 실패하면 그 doc 만 skip + 로그.
1. 기존 chunks count 기록
2. DELETE FROM document_chunks WHERE doc_id = :id
3. chunk_worker.process(doc_id, session) 직접 호출
4. 신규 chunks + chunks_with_country count 검증
5. 매 50 doc 마다 progress 출력
bge-m3 embedding 호출 동시성 1 — 컨테이너 1개 자연 직렬. 100건 ~ 15~30분 예상.
"""
from __future__ import annotations
import argparse
import asyncio
import sys
from datetime import datetime, timezone
from pathlib import Path
# Make the project's packages importable when this file is run directly as a
# script: of <repo>/app and <repo> (checked in that order), the first one that
# contains a `core` directory and is not already on sys.path is prepended.
_repo_root = Path(__file__).resolve().parent.parent
for _candidate in (_repo_root / "app", _repo_root):
    if not (_candidate / "core").is_dir() or str(_candidate) in sys.path:
        continue
    sys.path.insert(0, str(_candidate))
    break
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession
# Counts the backfill candidates: non-deleted documents from the 'news'
# channel that have extracted text and were created within the last :days days.
# :days is bound as an integer and converted to an interval by make_interval().
COUNT_SQL = """
SELECT COUNT(*)
FROM documents
WHERE source_channel = 'news'
AND deleted_at IS NULL
AND extracted_text IS NOT NULL
AND created_at >= NOW() - make_interval(days => :days)
"""
# Returns the 5 oldest candidate documents (same filter as the candidate
# count) with a title truncated to 60 characters, ai_sub_group, the current
# number of chunks, and how many of those chunks have a non-NULL country.
SAMPLE_SQL = """
SELECT d.id, LEFT(d.title, 60) AS title, d.ai_sub_group,
(SELECT COUNT(*) FROM document_chunks dc WHERE dc.doc_id = d.id) AS chunks,
(SELECT COUNT(*) FROM document_chunks dc WHERE dc.doc_id = d.id AND dc.country IS NOT NULL) AS chunks_w_country
FROM documents d
WHERE d.source_channel = 'news'
AND d.deleted_at IS NULL
AND d.extracted_text IS NOT NULL
AND d.created_at >= NOW() - make_interval(days => :days)
ORDER BY d.created_at ASC
LIMIT 5
"""
# Lists the ids of all candidate documents (non-deleted 'news' documents with
# extracted text created within the last :days days), oldest first.
ID_LIST_SQL = """
SELECT d.id
FROM documents d
WHERE d.source_channel = 'news'
AND d.deleted_at IS NULL
AND d.extracted_text IS NOT NULL
AND d.created_at >= NOW() - make_interval(days => :days)
ORDER BY d.created_at ASC
"""
# For one document (:doc_id): total number of chunks and the number of chunks
# whose country column is not NULL, returned as a single row.
CHUNK_COUNT_SQL = """
SELECT
COUNT(*) AS total,
COUNT(*) FILTER (WHERE country IS NOT NULL) AS with_country
FROM document_chunks
WHERE doc_id = :doc_id
"""
async def run_dry_run(session: AsyncSession, days: int) -> None:
    """Print the number of backfill candidates and a sample of up to 5 rows.

    Only the COUNT_SQL and SAMPLE_SQL SELECT statements are executed; nothing
    is written to the database.

    Args:
        session: Async SQLAlchemy session used to run the queries.
        days: Look-back window in days, bound to the ``:days`` SQL parameter.
    """
    params = {"days": days}
    total = (await session.execute(text(COUNT_SQL), params)).scalar()
    print(f"\n[dry-run] 최근 {days}일 news 후보: {total}")
    print("\n샘플 5건:")
    print(f" {'id':>6} | {'title':<60} | {'sub_group':<12} | chunks | with_country")
    print(" " + "-" * 110)
    rows = (await session.execute(text(SAMPLE_SQL), params)).all()
    for r in rows:
        # LEFT(NULL, 60) is NULL, and formatting None with a width spec raises
        # TypeError, so a NULL title (like a NULL ai_sub_group) is shown as an
        # empty column instead of aborting the whole dry run.
        title = r.title or ""
        sub_group = r.ai_sub_group or ""
        print(f" {r.id:>6} | {title:<60} | {sub_group:<12} | {r.chunks:>6} | {r.chunks_w_country:>12}")
async def run_apply(session: AsyncSession, days: int) -> None:
    """Regenerate the chunks of every candidate news document, one at a time.

    The candidate ids come from ID_LIST_SQL (oldest first). For each document
    the existing chunks are deleted, the chunk worker is invoked, and the
    transaction is committed; the resulting chunk counts are then read back
    and added to the running totals. Any exception raised while handling a
    document rolls the session back, prints a ``[skip]`` line and moves on to
    the next document. A progress line is printed every 50 documents and a
    summary line at the end.

    Args:
        session: Async SQLAlchemy session shared by all documents.
        days: Look-back window in days, bound to the ``:days`` SQL parameter.
    """
    from workers.chunk_worker import process as chunk_process

    id_rows = (await session.execute(text(ID_LIST_SQL), {"days": days})).all()
    doc_ids = [row.id for row in id_rows]
    total = len(doc_ids)
    print(f"\n[apply] 최근 {days}일 news doc 백필 시작: {total}")

    started_at = datetime.now(timezone.utc)
    succeeded = failed = 0
    chunk_total = country_total = 0

    for position, doc_id in enumerate(doc_ids, start=1):
        doc_params = {"doc_id": doc_id}
        try:
            # Step 1: drop the document's existing chunks.
            await session.execute(
                text("DELETE FROM document_chunks WHERE doc_id = :doc_id"),
                doc_params,
            )
            # Step 2: rebuild them through the chunk worker (per the original
            # note: chunking + embedding + country lookup), then commit.
            await chunk_process(doc_id, session)
            await session.commit()
            # Step 3: read back what was produced for the running totals.
            counts = (await session.execute(text(CHUNK_COUNT_SQL), doc_params)).one()
            chunk_total += counts.total
            country_total += counts.with_country
            succeeded += 1
        except Exception as exc:
            # Best-effort per document: undo the partial work and keep going.
            await session.rollback()
            failed += 1
            print(f" [skip] doc_id={doc_id}: {type(exc).__name__}: {exc}")

        if position % 50:
            continue
        seconds = (datetime.now(timezone.utc) - started_at).total_seconds()
        rate = position / seconds if seconds > 0 else 0
        print(
            f" [progress] {position}/{total} "
            f"(ok={succeeded} skipped={failed} chunks={chunk_total} "
            f"with_country={country_total} rate={rate:.1f}/s)"
        )

    total_seconds = (datetime.now(timezone.utc) - started_at).total_seconds()
    print(
        f"\n[done] total={total} ok={succeeded} skipped={failed} "
        f"chunks_created={chunk_total} chunks_with_country={country_total} "
        f"elapsed={total_seconds:.0f}s"
    )
async def main() -> None:
    """Parse the command line and run the backfill in dry-run or apply mode.

    Exactly one of ``--dry-run`` / ``--apply`` is required. ``--days`` sets
    the look-back window (default 7) and must be at least 1; invalid
    arguments terminate the process through ``argparse`` (exit status 2).
    """
    parser = argparse.ArgumentParser(
        description=__doc__,
        # The module docstring holds multi-line usage examples; the default
        # formatter would re-wrap them into one paragraph in --help output.
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument("--days", type=int, default=7, help="최근 N일 (기본 7)")
    mode = parser.add_mutually_exclusive_group(required=True)
    mode.add_argument("--dry-run", action="store_true")
    mode.add_argument("--apply", action="store_true")
    args = parser.parse_args()
    if args.days < 1:
        # A zero or negative window puts the cutoff at or after NOW(), so the
        # run would silently select nothing; fail fast with a clear message.
        parser.error(f"--days must be >= 1 (got {args.days})")

    # Imported after argument parsing so --help and argument errors do not
    # depend on importing the project's database module.
    from core.database import async_session

    async with async_session() as session:
        if args.dry_run:
            await run_dry_run(session, args.days)
        else:
            await run_apply(session, args.days)


if __name__ == "__main__":
    asyncio.run(main())