# Commit 73734d5585 (file: 168 lines, 5.7 KiB, Python).
# Commit message: asyncpg rejects the implicit int -> text conversion in
# `:days || ' days'`; switching to make_interval lets :days be bound as a plain int.
# Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
"""PR-News-Prep-Layer-1 backfill: regenerate chunks for recent news documents and fill ``country``.

Usage (run inside the fastapi container on the GPU server):

    docker exec hyungi_document_server-fastapi-1 \\
        python /app/scripts/news_chunk_country_backfill.py --days 7 --dry-run

    docker exec hyungi_document_server-fastapi-1 \\
        python /app/scripts/news_chunk_country_backfill.py --days 7 --apply

Selection rules:
    source_channel = 'news'
    created_at >= NOW() - make_interval(days => :days)
    extracted_text IS NOT NULL
    deleted_at IS NULL
    ORDER BY created_at ASC  (oldest first, to stay separated in time from
                              newly fired jobs)

Execution (--apply):
    Small per-document batches: if a document fails, only that document is
    skipped and logged.
    1. Record the existing chunk count.
    2. DELETE FROM document_chunks WHERE doc_id = :id
    3. Call chunk_worker.process(doc_id, session) directly.
    4. Verify the new chunk count and the count of chunks with a country.
    5. Print progress every 50 documents.

bge-m3 embedding calls run with concurrency 1 (a single container, so they are
naturally serialized). Expect roughly 15-30 minutes per 100 documents.
"""

from __future__ import annotations
|
|
|
|
import argparse
|
|
import asyncio
|
|
import sys
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
|
|
# Make the application code importable when this file runs as a standalone
# script: try <repo>/app first, then <repo>, and prepend the first candidate
# that has a ``core`` directory and is not already on sys.path.
_repo_root = Path(__file__).resolve().parent.parent
_import_root = next(
    (
        candidate
        for candidate in (_repo_root / "app", _repo_root)
        if (candidate / "core").is_dir() and str(candidate) not in sys.path
    ),
    None,
)
if _import_root is not None:
    sys.path.insert(0, str(_import_root))

from sqlalchemy import text
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
|
|
# Candidate filter shared by the count, sample and id-list queries: live news
# documents with extracted text, created within the last :days days. Defining it
# once keeps the dry-run preview and the --apply selection from drifting apart.
# make_interval(days => :days) lets :days be bound as a plain int (asyncpg
# rejects the implicit int -> text cast that `:days || ' days'` would need).
# Composed from constants only; every value is a bound parameter.
_CANDIDATE_FILTER = """\
d.source_channel = 'news'
  AND d.deleted_at IS NULL
  AND d.extracted_text IS NOT NULL
  AND d.created_at >= NOW() - make_interval(days => :days)"""

# Number of candidate documents (params: days).
COUNT_SQL = f"""
SELECT COUNT(*)
FROM documents d
WHERE {_CANDIDATE_FILTER}
"""

# First 5 candidates (oldest first) with their current chunk counts (params: days).
SAMPLE_SQL = f"""
SELECT d.id, LEFT(d.title, 60) AS title, d.ai_sub_group,
       (SELECT COUNT(*) FROM document_chunks dc WHERE dc.doc_id = d.id) AS chunks,
       (SELECT COUNT(*) FROM document_chunks dc
         WHERE dc.doc_id = d.id AND dc.country IS NOT NULL) AS chunks_w_country
FROM documents d
WHERE {_CANDIDATE_FILTER}
ORDER BY d.created_at ASC
LIMIT 5
"""

# All candidate ids, oldest first (params: days).
ID_LIST_SQL = f"""
SELECT d.id
FROM documents d
WHERE {_CANDIDATE_FILTER}
ORDER BY d.created_at ASC
"""

# Chunk totals for one document (params: doc_id).
CHUNK_COUNT_SQL = """
SELECT
    COUNT(*) AS total,
    COUNT(*) FILTER (WHERE country IS NOT NULL) AS with_country
FROM document_chunks
WHERE doc_id = :doc_id
"""


async def run_dry_run(session: AsyncSession, days: int) -> None:
    """Print the candidate count and a 5-row sample without modifying anything.

    Args:
        session: Open async SQLAlchemy session; only SELECTs are issued here.
        days: Look-back window in days, bound as ``:days`` in COUNT_SQL/SAMPLE_SQL.
    """
    params = {"days": days}

    total = (await session.execute(text(COUNT_SQL), params)).scalar()
    print(f"\n[dry-run] 최근 {days}일 news 후보: {total}건")

    print("\n샘플 5건:")
    print(f" {'id':>6} | {'title':<60} | {'sub_group':<12} | chunks | with_country")
    print(" " + "-" * 110)

    rows = (await session.execute(text(SAMPLE_SQL), params)).all()
    for r in rows:
        # Formatting None with a width spec ("<60") raises TypeError, so nullable
        # text columns are normalized first. ai_sub_group was already treated as
        # nullable; title is guarded the same way (NOTE(review): schema nullability
        # of documents.title is not visible here).
        title = r.title or ""
        sub_group = r.ai_sub_group or ""
        print(f" {r.id:>6} | {title:<60} | {sub_group:<12} | {r.chunks:>6} | {r.chunks_w_country:>12}")


async def run_apply(session: AsyncSession, days: int) -> None:
    """Rebuild chunks for every candidate news document, committing per document.

    For each id returned by ``ID_LIST_SQL`` (oldest first):

    1. record the existing chunk count,
    2. delete the document's rows from ``document_chunks``,
    3. call ``workers.chunk_worker.process(doc_id, session)``,
    4. commit,
    5. re-count the chunks (total / with ``country``).

    A failure in steps 1-4 rolls the session back and skips only that document.
    A failure in step 5 is printed as a warning; the document still counts as
    ``ok`` because its rebuild was already committed. Progress is printed every
    50 documents and a summary at the end.

    Args:
        session: Open async SQLAlchemy session used for every query and commit.
        days: Look-back window in days, bound as ``:days`` in ID_LIST_SQL.
    """
    # Imported here rather than at module level so --dry-run never imports the worker.
    from workers.chunk_worker import process as chunk_process

    rows = (await session.execute(text(ID_LIST_SQL), {"days": days})).all()
    doc_ids = [r.id for r in rows]
    total = len(doc_ids)
    print(f"\n[apply] 최근 {days}일 news doc 백필 시작: {total}건")
    started_at = datetime.now(timezone.utc)

    ok = 0
    skipped = 0
    chunks_deleted = 0
    chunks_created = 0
    chunks_with_country = 0

    for idx, doc_id in enumerate(doc_ids, 1):
        try:
            # 1. Existing chunk count, recorded before the delete.
            before = (
                await session.execute(text(CHUNK_COUNT_SQL), {"doc_id": doc_id})
            ).one()
            # 2. Per-document delete of the old chunks.
            await session.execute(
                text("DELETE FROM document_chunks WHERE doc_id = :doc_id"),
                {"doc_id": doc_id},
            )
            # 3. Call chunk_worker.process directly (chunking + embedding + country lookup).
            await chunk_process(doc_id, session)
            # 4. Commit this document on its own.
            await session.commit()
        except Exception as e:
            # Best-effort backfill: undo this document's partial work, log it, move on.
            await session.rollback()
            skipped += 1
            print(f" [skip] doc_id={doc_id}: {type(e).__name__}: {e}")
        else:
            ok += 1
            chunks_deleted += before.total
            # 5. Verify outside the rebuild's try: a failed re-count must not turn an
            #    already-committed document into a "skip".
            try:
                after = (
                    await session.execute(text(CHUNK_COUNT_SQL), {"doc_id": doc_id})
                ).one()
            except Exception as e:
                # Clear the failed statement so the next document starts cleanly.
                await session.rollback()
                print(f" [warn] doc_id={doc_id}: verify failed: {type(e).__name__}: {e}")
            else:
                chunks_created += after.total
                chunks_with_country += after.with_country

        if idx % 50 == 0:
            elapsed = (datetime.now(timezone.utc) - started_at).total_seconds()
            rate = idx / elapsed if elapsed > 0 else 0
            print(
                f" [progress] {idx}/{total} "
                f"(ok={ok} skipped={skipped} chunks={chunks_created} "
                f"with_country={chunks_with_country} rate={rate:.1f}/s)"
            )

    elapsed = (datetime.now(timezone.utc) - started_at).total_seconds()
    print(
        f"\n[done] total={total} ok={ok} skipped={skipped} "
        f"chunks_deleted={chunks_deleted} "
        f"chunks_created={chunks_created} chunks_with_country={chunks_with_country} "
        f"elapsed={elapsed:.0f}s"
    )


def _positive_days(value: str) -> int:
    """argparse ``type=`` converter for ``--days``: accept only integers >= 1.

    The queries select ``created_at >= NOW() - make_interval(days => :days)``,
    so 0 or a negative window selects no past documents; reject it up front
    instead of silently running an empty backfill.
    """
    try:
        days = int(value)
    except ValueError:
        raise argparse.ArgumentTypeError(f"invalid int value: {value!r}") from None
    if days < 1:
        raise argparse.ArgumentTypeError(f"must be >= 1, got {days}")
    return days


async def main() -> None:
    """Parse CLI arguments, then run the dry-run preview or the apply backfill."""
    parser = argparse.ArgumentParser(
        description=__doc__,
        # The module docstring holds command lines and a numbered list; keep its
        # line breaks instead of letting argparse re-wrap it into one paragraph.
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument("--days", type=_positive_days, default=7, help="최근 N일 (기본 7)")
    mode = parser.add_mutually_exclusive_group(required=True)
    mode.add_argument("--dry-run", action="store_true")
    mode.add_argument("--apply", action="store_true")
    args = parser.parse_args()

    # Imported only after argument parsing: --help and usage errors exit before
    # the application's database module is loaded.
    from core.database import async_session

    async with async_session() as session:
        if args.dry_run:
            await run_dry_run(session, args.days)
        else:
            await run_apply(session, args.days)


if __name__ == "__main__":
    asyncio.run(main())