From 08cf676c268f490691f0df8a47b5a86cadbeaea1 Mon Sep 17 00:00:00 2001 From: Hyungi Ahn Date: Fri, 15 May 2026 16:35:38 +0900 Subject: [PATCH] =?UTF-8?q?fix(news):=20news=20=EB=AC=B8=EC=84=9C=20chunk?= =?UTF-8?q?=20stage=20enqueue=20=EC=B6=94=EA=B0=80=20+=207=EC=9D=BC=20?= =?UTF-8?q?=EB=B0=B1=ED=95=84=20=EC=8A=A4=ED=81=AC=EB=A6=BD=ED=8A=B8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit document_chunks.country 가 7일 분포 기준 99.9% NULL 이었던 root cause = news_collector 가 summarize + embed 만 enqueue 하고 chunk 를 enqueue 하지 않아 chunk_worker 가 news 문서에 한 번도 안 돌고 있었음. queue_consumer.next_stages 의 summarize 키 부재가 follow-up 미연결 원인. news 외 summarize 흐름 부수영향 회피를 위해 next_stages 가 아니라 news_collector RSS/API 양쪽에 chunk enqueue 1줄씩 명시 추가. days_old <= 30 가드 안에서 embed 와 동일 정책. scripts/news_chunk_country_backfill.py — doc 단위 small batch, 실패 doc skip, 50건마다 progress. queue 우회 직접 chunk_worker.process 호출로 timing 통제. Gate (PR closure): A) chunked_doc_pct > 95% 최근 7일 news doc 중 chunk 보유 비율 B) country null_pct < 5% 최근 7일 news chunk country NULL 비율 plan: ~/.claude/plans/7-whimsical-crab.md (PR-News-Prep-Layer-1) Co-Authored-By: Claude Opus 4.7 (1M context) --- app/workers/news_collector.py | 4 +- scripts/news_chunk_country_backfill.py | 163 +++++++++++++++++++++++++ 2 files changed, 166 insertions(+), 1 deletion(-) create mode 100644 scripts/news_chunk_country_backfill.py diff --git a/app/workers/news_collector.py b/app/workers/news_collector.py index a1cbdea..99d515f 100644 --- a/app/workers/news_collector.py +++ b/app/workers/news_collector.py @@ -217,11 +217,12 @@ async def _fetch_rss(session, source: NewsSource) -> int: session.add(doc) await session.flush() - # summarize + embed 등록 (classify 불필요) + # summarize + embed + chunk 등록 (classify 불필요) await enqueue_stage(session, doc.id, "summarize") days_old = (datetime.now(timezone.utc) - pub_dt).days if days_old <= 30: await enqueue_stage(session, doc.id, "embed") + await enqueue_stage(session, doc.id, "chunk") count += 1 @@ -313,6 +314,7 @@ async def _fetch_api(session, source: NewsSource) -> int: days_old = (datetime.now(timezone.utc) - pub_dt).days if days_old <= 30: await enqueue_stage(session, doc.id, "embed") + await enqueue_stage(session, doc.id, "chunk") count += 1 diff --git a/scripts/news_chunk_country_backfill.py b/scripts/news_chunk_country_backfill.py new file mode 100644 index 0000000..39b8533 --- /dev/null +++ b/scripts/news_chunk_country_backfill.py @@ -0,0 +1,163 @@ +"""PR-News-Prep-Layer-1 백필 — 최근 7일 news 문서 chunk 재생성 + country 채움. + +사용법 (GPU 서버 fastapi 컨테이너 안에서 실행): + docker exec hyungi_document_server-fastapi-1 \\ + python /app/scripts/news_chunk_country_backfill.py --days 7 --dry-run + docker exec hyungi_document_server-fastapi-1 \\ + python /app/scripts/news_chunk_country_backfill.py --days 7 --apply + +선정 규칙: + source_channel = 'news' + created_at >= NOW() - INTERVAL ':days days' + extracted_text IS NOT NULL + deleted_at IS NULL + ORDER BY created_at ASC (오래된 것부터 — 새 fire 와 시간차 분리) + +실행 (--apply): + doc 단위 small batch — 한 doc 라도 실패하면 그 doc 만 skip + 로그. + 1. 기존 chunks count 기록 + 2. DELETE FROM document_chunks WHERE doc_id = :id + 3. chunk_worker.process(doc_id, session) 직접 호출 + 4. 신규 chunks + chunks_with_country count 검증 + 5. 매 50 doc 마다 progress 출력 + +bge-m3 embedding 호출 동시성 1 — 컨테이너 1개 자연 직렬. 100건 ~ 15~30분 예상. +""" + +from __future__ import annotations + +import argparse +import asyncio +import sys +from datetime import datetime, timezone +from pathlib import Path + +sys.path.insert(0, str(Path(__file__).resolve().parent.parent / "app")) + +from sqlalchemy import text +from sqlalchemy.ext.asyncio import AsyncSession + + +COUNT_SQL = """ +SELECT COUNT(*) +FROM documents +WHERE source_channel = 'news' + AND deleted_at IS NULL + AND extracted_text IS NOT NULL + AND created_at >= NOW() - (:days || ' days')::INTERVAL +""" + +SAMPLE_SQL = """ +SELECT d.id, LEFT(d.title, 60) AS title, d.ai_sub_group, + (SELECT COUNT(*) FROM document_chunks dc WHERE dc.doc_id = d.id) AS chunks, + (SELECT COUNT(*) FROM document_chunks dc WHERE dc.doc_id = d.id AND dc.country IS NOT NULL) AS chunks_w_country +FROM documents d +WHERE d.source_channel = 'news' + AND d.deleted_at IS NULL + AND d.extracted_text IS NOT NULL + AND d.created_at >= NOW() - (:days || ' days')::INTERVAL +ORDER BY d.created_at ASC +LIMIT 5 +""" + +ID_LIST_SQL = """ +SELECT d.id +FROM documents d +WHERE d.source_channel = 'news' + AND d.deleted_at IS NULL + AND d.extracted_text IS NOT NULL + AND d.created_at >= NOW() - (:days || ' days')::INTERVAL +ORDER BY d.created_at ASC +""" + +CHUNK_COUNT_SQL = """ +SELECT + COUNT(*) AS total, + COUNT(*) FILTER (WHERE country IS NOT NULL) AS with_country +FROM document_chunks +WHERE doc_id = :doc_id +""" + + +async def run_dry_run(session: AsyncSession, days: int) -> None: + total = (await session.execute(text(COUNT_SQL), {"days": days})).scalar() + print(f"\n[dry-run] 최근 {days}일 news 후보: {total}건") + print("\n샘플 5건:") + print(f" {'id':>6} | {'title':<60} | {'sub_group':<12} | chunks | with_country") + print(" " + "-" * 110) + rows = (await session.execute(text(SAMPLE_SQL), {"days": days})).all() + for r in rows: + print(f" {r.id:>6} | {r.title:<60} | {(r.ai_sub_group or ''):<12} | {r.chunks:>6} | {r.chunks_w_country:>12}") + + +async def run_apply(session: AsyncSession, days: int) -> None: + from workers.chunk_worker import process as chunk_process + + rows = (await session.execute(text(ID_LIST_SQL), {"days": days})).all() + doc_ids = [r.id for r in rows] + total = len(doc_ids) + print(f"\n[apply] 최근 {days}일 news doc 백필 시작: {total}건") + started_at = datetime.now(timezone.utc) + + ok = 0 + skipped = 0 + chunks_created = 0 + chunks_with_country = 0 + + for idx, doc_id in enumerate(doc_ids, 1): + try: + # 1. doc 단위 delete (기존 chunks 정리) + await session.execute( + text("DELETE FROM document_chunks WHERE doc_id = :doc_id"), + {"doc_id": doc_id}, + ) + # 2. chunk_worker.process 직접 호출 (chunking + embedding + country lookup) + await chunk_process(doc_id, session) + await session.commit() + + # 3. 결과 검증 + r = (await session.execute(text(CHUNK_COUNT_SQL), {"doc_id": doc_id})).one() + chunks_created += r.total + chunks_with_country += r.with_country + ok += 1 + except Exception as e: + await session.rollback() + skipped += 1 + print(f" [skip] doc_id={doc_id}: {type(e).__name__}: {e}") + + if idx % 50 == 0: + elapsed = (datetime.now(timezone.utc) - started_at).total_seconds() + rate = idx / elapsed if elapsed > 0 else 0 + print( + f" [progress] {idx}/{total} " + f"(ok={ok} skipped={skipped} chunks={chunks_created} " + f"with_country={chunks_with_country} rate={rate:.1f}/s)" + ) + + elapsed = (datetime.now(timezone.utc) - started_at).total_seconds() + print( + f"\n[done] total={total} ok={ok} skipped={skipped} " + f"chunks_created={chunks_created} chunks_with_country={chunks_with_country} " + f"elapsed={elapsed:.0f}s" + ) + + +async def main() -> None: + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument("--days", type=int, default=7, help="최근 N일 (기본 7)") + mode = parser.add_mutually_exclusive_group(required=True) + mode.add_argument("--dry-run", action="store_true") + mode.add_argument("--apply", action="store_true") + args = parser.parse_args() + + from core.database import async_session + + async with async_session() as session: + if args.dry_run: + await run_dry_run(session, args.days) + else: + await run_apply(session, args.days) + + +if __name__ == "__main__": + asyncio.run(main())