feat(ops): tier triage 레거시 백필 스크립트
PR-B B-1 배포 이전에 classify 된 6770건 레거시 문서에 대해 ai_tldr /
ai_bullets / ai_detail_summary 등 tier 산출물을 채우기 위한 백필 도구.
사용:
docker exec hyungi_document_server-fastapi-1 \
python /app/scripts/backfill_tier.py --domain safety --limit 50 --dry-run
docker exec hyungi_document_server-fastapi-1 \
python /app/scripts/backfill_tier.py --domain safety --limit 50 --apply
도메인 필터: safety / law / manual / news / drive_sync / memo
ORDER BY created_at DESC 로 최신 우선. ON CONFLICT DO NOTHING 이라
기존 pending/processing 행 있으면 중복 enqueue 방지.
MLX 26B 단일 Semaphore 경로라 처리 속도 ~1건/분. 50건 ≈ 1시간.
대량 백필은 야간 분할 권장. 이번 세션 Industrial_Safety 50건이
첫 smoke 대상.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,173 @@
|
||||
"""PR-B B-1 레거시 문서 tier triage 백필 도구.
|
||||
|
||||
사용법 (GPU 서버 fastapi 컨테이너 안에서 실행):
|
||||
docker exec hyungi_document_server-fastapi-1 \\
|
||||
python /app/scripts/backfill_tier.py --domain safety --limit 50 --dry-run
|
||||
docker exec hyungi_document_server-fastapi-1 \\
|
||||
python /app/scripts/backfill_tier.py --domain safety --limit 50 --apply
|
||||
|
||||
대상 선정 옵션 (--domain):
|
||||
safety : ai_domain LIKE 'Industrial_Safety%%'
|
||||
law : source_channel='law_monitor'
|
||||
manual : source_channel='manual'
|
||||
news : source_channel='news' (대부분 짧아 deep escalate 거의 없음)
|
||||
drive_sync : source_channel='drive_sync'
|
||||
memo : source_channel='memo'
|
||||
|
||||
선정 규칙:
|
||||
ai_analysis_tier IS NULL — 아직 tier triage 안 탄 문서
|
||||
extracted_text IS NOT NULL — 본문이 있어야 classify 가능
|
||||
deleted_at IS NULL
|
||||
ORDER BY created_at DESC — 최근 문서 우선 (사용자 관심도 가정)
|
||||
|
||||
실행:
|
||||
- --dry-run: SELECT 카운트 + 샘플 5건 출력, 큐 변경 없음
|
||||
- --apply: classify stage 에 pending 으로 INSERT (ON CONFLICT DO NOTHING)
|
||||
기존 pending/processing 행이 있으면 skip
|
||||
|
||||
MLX 부하 고려:
|
||||
legacy classify (primary 26B) + tier triage (4B) + deep_summary (26B escalate 시)
|
||||
~60~120초/건. 50건 ≈ 1시간. 100건+ 백필은 여러 세션 또는 야간 분할 권장.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import asyncio
|
||||
import os
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
sys.path.insert(0, str(Path(__file__).resolve().parent.parent / "app"))
|
||||
|
||||
from sqlalchemy import text
|
||||
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
|
||||
|
||||
|
||||
DOMAIN_FILTERS: dict[str, str] = {
|
||||
"safety": "ai_domain LIKE 'Industrial_Safety%'",
|
||||
"law": "source_channel = 'law_monitor'",
|
||||
"manual": "source_channel = 'manual'",
|
||||
"news": "source_channel = 'news'",
|
||||
"drive_sync": "source_channel = 'drive_sync'",
|
||||
"memo": "source_channel = 'memo'",
|
||||
}
|
||||
|
||||
|
||||
COUNT_SQL = """
|
||||
SELECT COUNT(*)
|
||||
FROM documents
|
||||
WHERE deleted_at IS NULL
|
||||
AND extracted_text IS NOT NULL
|
||||
AND ai_analysis_tier IS NULL
|
||||
AND {filter}
|
||||
"""
|
||||
|
||||
SAMPLE_SQL = """
|
||||
SELECT id, LEFT(title, 60) AS title, ai_domain, source_channel,
|
||||
LENGTH(extracted_text) AS tlen, created_at::date AS created
|
||||
FROM documents
|
||||
WHERE deleted_at IS NULL
|
||||
AND extracted_text IS NOT NULL
|
||||
AND ai_analysis_tier IS NULL
|
||||
AND {filter}
|
||||
ORDER BY created_at DESC
|
||||
LIMIT :n
|
||||
"""
|
||||
|
||||
APPLY_SQL = """
|
||||
INSERT INTO processing_queue (document_id, stage, status, attempts, max_attempts)
|
||||
SELECT id, 'classify', 'pending', 0, 3
|
||||
FROM documents
|
||||
WHERE deleted_at IS NULL
|
||||
AND extracted_text IS NOT NULL
|
||||
AND ai_analysis_tier IS NULL
|
||||
AND {filter}
|
||||
ORDER BY created_at DESC
|
||||
LIMIT :n
|
||||
ON CONFLICT DO NOTHING
|
||||
RETURNING document_id
|
||||
"""
|
||||
|
||||
|
||||
async def run(domain: str, limit: int, apply: bool) -> int:
|
||||
if domain not in DOMAIN_FILTERS:
|
||||
print(f"[error] --domain 은 {list(DOMAIN_FILTERS)} 중 하나여야 함. got={domain!r}")
|
||||
return 1
|
||||
|
||||
filter_clause = DOMAIN_FILTERS[domain]
|
||||
database_url = os.getenv(
|
||||
"DATABASE_URL",
|
||||
"postgresql+asyncpg://pkm:pkm@postgres:5432/pkm",
|
||||
)
|
||||
|
||||
engine = create_async_engine(database_url)
|
||||
session_factory = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
|
||||
|
||||
async with session_factory() as session:
|
||||
total = (await session.execute(
|
||||
text(COUNT_SQL.format(filter=filter_clause))
|
||||
)).scalar()
|
||||
print(f"=== {domain} 백필 후보 ===")
|
||||
print(f" 총 미처리 문서: {total} 건")
|
||||
print(f" 이번 실행 대상 limit: {limit} 건")
|
||||
print()
|
||||
|
||||
if total == 0:
|
||||
print("[done] 백필할 문서 없음.")
|
||||
await engine.dispose()
|
||||
return 0
|
||||
|
||||
print("=== 샘플 (상위 5건) ===")
|
||||
rows = (await session.execute(
|
||||
text(SAMPLE_SQL.format(filter=filter_clause)),
|
||||
{"n": 5},
|
||||
)).all()
|
||||
for r in rows:
|
||||
print(f" id={r.id} ({r.created}) [{r.source_channel} / {r.ai_domain or '-'}] "
|
||||
f"tlen={r.tlen} title={r.title}")
|
||||
print()
|
||||
|
||||
if not apply:
|
||||
print(f"[dry-run] --apply 로 실제 enqueue 수행. 예상 {min(limit, total)} 건.")
|
||||
await engine.dispose()
|
||||
return 0
|
||||
|
||||
print(f"[apply] classify 큐에 {limit} 건 enqueue...")
|
||||
result = await session.execute(
|
||||
text(APPLY_SQL.format(filter=filter_clause)),
|
||||
{"n": limit},
|
||||
)
|
||||
enqueued_ids = [r.document_id for r in result.all()]
|
||||
await session.commit()
|
||||
print(f" enqueue 성공: {len(enqueued_ids)} 건")
|
||||
if enqueued_ids:
|
||||
sample = enqueued_ids[:10]
|
||||
print(f" 첫 10건 id: {sample}")
|
||||
print()
|
||||
print("경과 관찰 쿼리:")
|
||||
print(" docker exec hyungi_document_server-postgres-1 psql -U pkm -d pkm -c \\")
|
||||
print(" \"SELECT stage, status, COUNT(*) FROM processing_queue \\")
|
||||
print(" WHERE created_at > NOW() - INTERVAL '3 hours' GROUP BY 1,2 ORDER BY 1,2\"")
|
||||
|
||||
await engine.dispose()
|
||||
return 0
|
||||
|
||||
|
||||
def main() -> None:
|
||||
parser = argparse.ArgumentParser(description="PR-B tier triage 백필 도구")
|
||||
parser.add_argument("--domain", required=True, choices=list(DOMAIN_FILTERS),
|
||||
help="백필 대상 도메인 선택")
|
||||
parser.add_argument("--limit", type=int, default=50,
|
||||
help="한 번에 enqueue 할 최대 건수 (기본 50)")
|
||||
mode = parser.add_mutually_exclusive_group(required=True)
|
||||
mode.add_argument("--dry-run", action="store_true", help="쿼리만 실행, 큐 변경 없음")
|
||||
mode.add_argument("--apply", action="store_true", help="classify 큐 실제 enqueue")
|
||||
args = parser.parse_args()
|
||||
|
||||
rc = asyncio.run(run(args.domain, args.limit, args.apply))
|
||||
sys.exit(rc)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user