diff --git a/scripts/backfill_tier.py b/scripts/backfill_tier.py new file mode 100644 index 0000000..ece998d --- /dev/null +++ b/scripts/backfill_tier.py @@ -0,0 +1,173 @@ +"""PR-B B-1 레거시 문서 tier triage 백필 도구. + +사용법 (GPU 서버 fastapi 컨테이너 안에서 실행): + docker exec hyungi_document_server-fastapi-1 \\ + python /app/scripts/backfill_tier.py --domain safety --limit 50 --dry-run + docker exec hyungi_document_server-fastapi-1 \\ + python /app/scripts/backfill_tier.py --domain safety --limit 50 --apply + +대상 선정 옵션 (--domain): + safety : ai_domain LIKE 'Industrial_Safety%%' + law : source_channel='law_monitor' + manual : source_channel='manual' + news : source_channel='news' (대부분 짧아 deep escalate 거의 없음) + drive_sync : source_channel='drive_sync' + memo : source_channel='memo' + +선정 규칙: + ai_analysis_tier IS NULL — 아직 tier triage 안 탄 문서 + extracted_text IS NOT NULL — 본문이 있어야 classify 가능 + deleted_at IS NULL + ORDER BY created_at DESC — 최근 문서 우선 (사용자 관심도 가정) + +실행: + - --dry-run: SELECT 카운트 + 샘플 5건 출력, 큐 변경 없음 + - --apply: classify stage 에 pending 으로 INSERT (ON CONFLICT DO NOTHING) + 기존 pending/processing 행이 있으면 skip + +MLX 부하 고려: + legacy classify (primary 26B) + tier triage (4B) + deep_summary (26B escalate 시) + ~60~120초/건. 50건 ≈ 1시간. 100건+ 백필은 여러 세션 또는 야간 분할 권장. +""" + +from __future__ import annotations + +import argparse +import asyncio +import os +import sys +from pathlib import Path + +sys.path.insert(0, str(Path(__file__).resolve().parent.parent / "app")) + +from sqlalchemy import text +from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine + + +DOMAIN_FILTERS: dict[str, str] = { + "safety": "ai_domain LIKE 'Industrial_Safety%'", + "law": "source_channel = 'law_monitor'", + "manual": "source_channel = 'manual'", + "news": "source_channel = 'news'", + "drive_sync": "source_channel = 'drive_sync'", + "memo": "source_channel = 'memo'", +} + + +COUNT_SQL = """ +SELECT COUNT(*) +FROM documents +WHERE deleted_at IS NULL + AND extracted_text IS NOT NULL + AND ai_analysis_tier IS NULL + AND {filter} +""" + +SAMPLE_SQL = """ +SELECT id, LEFT(title, 60) AS title, ai_domain, source_channel, + LENGTH(extracted_text) AS tlen, created_at::date AS created +FROM documents +WHERE deleted_at IS NULL + AND extracted_text IS NOT NULL + AND ai_analysis_tier IS NULL + AND {filter} +ORDER BY created_at DESC +LIMIT :n +""" + +APPLY_SQL = """ +INSERT INTO processing_queue (document_id, stage, status, attempts, max_attempts) +SELECT id, 'classify', 'pending', 0, 3 +FROM documents +WHERE deleted_at IS NULL + AND extracted_text IS NOT NULL + AND ai_analysis_tier IS NULL + AND {filter} +ORDER BY created_at DESC +LIMIT :n +ON CONFLICT DO NOTHING +RETURNING document_id +""" + + +async def run(domain: str, limit: int, apply: bool) -> int: + if domain not in DOMAIN_FILTERS: + print(f"[error] --domain 은 {list(DOMAIN_FILTERS)} 중 하나여야 함. got={domain!r}") + return 1 + + filter_clause = DOMAIN_FILTERS[domain] + database_url = os.getenv( + "DATABASE_URL", + "postgresql+asyncpg://pkm:pkm@postgres:5432/pkm", + ) + + engine = create_async_engine(database_url) + session_factory = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) + + async with session_factory() as session: + total = (await session.execute( + text(COUNT_SQL.format(filter=filter_clause)) + )).scalar() + print(f"=== {domain} 백필 후보 ===") + print(f" 총 미처리 문서: {total} 건") + print(f" 이번 실행 대상 limit: {limit} 건") + print() + + if total == 0: + print("[done] 백필할 문서 없음.") + await engine.dispose() + return 0 + + print("=== 샘플 (상위 5건) ===") + rows = (await session.execute( + text(SAMPLE_SQL.format(filter=filter_clause)), + {"n": 5}, + )).all() + for r in rows: + print(f" id={r.id} ({r.created}) [{r.source_channel} / {r.ai_domain or '-'}] " + f"tlen={r.tlen} title={r.title}") + print() + + if not apply: + print(f"[dry-run] --apply 로 실제 enqueue 수행. 예상 {min(limit, total)} 건.") + await engine.dispose() + return 0 + + print(f"[apply] classify 큐에 {limit} 건 enqueue...") + result = await session.execute( + text(APPLY_SQL.format(filter=filter_clause)), + {"n": limit}, + ) + enqueued_ids = [r.document_id for r in result.all()] + await session.commit() + print(f" enqueue 성공: {len(enqueued_ids)} 건") + if enqueued_ids: + sample = enqueued_ids[:10] + print(f" 첫 10건 id: {sample}") + print() + print("경과 관찰 쿼리:") + print(" docker exec hyungi_document_server-postgres-1 psql -U pkm -d pkm -c \\") + print(" \"SELECT stage, status, COUNT(*) FROM processing_queue \\") + print(" WHERE created_at > NOW() - INTERVAL '3 hours' GROUP BY 1,2 ORDER BY 1,2\"") + + await engine.dispose() + return 0 + + +def main() -> None: + parser = argparse.ArgumentParser(description="PR-B tier triage 백필 도구") + parser.add_argument("--domain", required=True, choices=list(DOMAIN_FILTERS), + help="백필 대상 도메인 선택") + parser.add_argument("--limit", type=int, default=50, + help="한 번에 enqueue 할 최대 건수 (기본 50)") + mode = parser.add_mutually_exclusive_group(required=True) + mode.add_argument("--dry-run", action="store_true", help="쿼리만 실행, 큐 변경 없음") + mode.add_argument("--apply", action="store_true", help="classify 큐 실제 enqueue") + args = parser.parse_args() + + rc = asyncio.run(run(args.domain, args.limit, args.apply)) + sys.exit(rc) + + +if __name__ == "__main__": + main()