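Hyungi Ahn 95bcdb851b fix(ops): exclude empty extracted_text from backfill queries to prevent infinite retries
After 3 days in operation, docs 4811 and 5181 turned out to have extracted_text='' (empty string).
Because the query only checked IS NOT NULL, they were enqueued → the `not doc.extracted_text`
truthiness check in classify_worker raised ValueError → max_attempts(3) was reached → status=failed.
The next backfill cycle enqueued them again; this repeated 12 times and accumulated 24 failed rows.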
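
The failure loop described above can be reproduced with a small simulation. This is only a sketch: `classify` and `run_backfill_cycles` are hypothetical stand-ins, not the real classify_worker or backfill code, and the only assumption is that each backfill cycle creates one queue row per selected document.

```python
# Hypothetical stand-in for the worker's truthiness check; the real
# classify_worker is not shown in this commit.
def classify(extracted_text):
    if not extracted_text:
        raise ValueError("extracted_text is empty")
    return "classified"


def run_backfill_cycles(docs, cycles, max_attempts=3):
    """Simulate the old selection rule (IS NOT NULL only) over several cycles."""
    failed_rows = []
    for cycle in range(cycles):
        # Old rule: '' is not None, so empty documents are still selected.
        candidates = [doc_id for doc_id, body in docs.items() if body is not None]
        for doc_id in candidates:
            last_error = ""
            for _attempt in range(max_attempts):
                try:
                    classify(docs[doc_id])
                    break
                except ValueError as exc:
                    last_error = str(exc)
            else:
                # Every attempt failed: the queue row ends up as status=failed.
                failed_rows.append((cycle, doc_id, last_error))
    return failed_rows


docs = {4811: "", 5181: ""}
failed = run_backfill_cycles(docs, cycles=12)
print(len(failed))  # 2 documents x 12 cycles = 24 failed rows
```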

Fix: add LENGTH(extracted_text) > 0 to the SQL in both tier_backfill.py and backfill_tier.py.
Documents with an empty string are now excluded before they are ever enqueued.
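
The difference between the two predicates can be checked in isolation. The sketch below uses an in-memory SQLite table as a stand-in for the real PostgreSQL `documents` table (an assumption made only to keep the example self-contained); the ids and the one-column schema are illustrative.

```python
import sqlite3

# In-memory SQLite stand-in for the PostgreSQL documents table.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE documents (id INTEGER PRIMARY KEY, extracted_text TEXT)")
conn.executemany(
    "INSERT INTO documents (id, extracted_text) VALUES (?, ?)",
    [(1, "body text"), (2, ""), (3, None)],
)

# Old predicate: an empty string is not NULL, so id 2 is still selected.
old_ids = [row[0] for row in conn.execute(
    "SELECT id FROM documents WHERE extracted_text IS NOT NULL ORDER BY id"
)]

# New predicate: the LENGTH check also drops the empty string.
new_ids = [row[0] for row in conn.execute(
    "SELECT id FROM documents "
    "WHERE extracted_text IS NOT NULL AND LENGTH(extracted_text) > 0 "
    "ORDER BY id"
)]

print(old_ids)  # [1, 2]
print(new_ids)  # [1]
conn.close()
```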

Cleanup SQL for the 24 existing failed rows (to be run manually by the user):
  DELETE FROM processing_queue
  WHERE stage='classify' AND status='failed'
    AND error_message LIKE '%extracted_text%';

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-27 08:25:12 +09:00


"""PR-B B-1: backfill tool for tier triage of legacy documents.

Usage (run inside the fastapi container on the GPU server):
    docker exec hyungi_document_server-fastapi-1 \\
        python /app/scripts/backfill_tier.py --domain safety --limit 50 --dry-run
    docker exec hyungi_document_server-fastapi-1 \\
        python /app/scripts/backfill_tier.py --domain safety --limit 50 --apply

Target selection (--domain):
    safety     : ai_domain LIKE 'Industrial_Safety%'
    law        : source_channel='law_monitor'
    manual     : source_channel='manual'
    news       : source_channel='news' (mostly short, so deep escalation is rare)
    drive_sync : source_channel='drive_sync'
    memo       : source_channel='memo'

Selection rules:
    ai_analysis_tier IS NULL     : documents that have not gone through tier triage yet
    extracted_text IS NOT NULL   : a body is required for classify
    LENGTH(extracted_text) > 0   : empty-string bodies are excluded as well
    deleted_at IS NULL
    ORDER BY created_at DESC     : most recent documents first (assumed to matter most to the user)

Modes:
    --dry-run : prints the SELECT count plus 5 sample rows; the queue is not modified
    --apply   : INSERTs into the classify stage as pending (ON CONFLICT DO NOTHING);
                skipped if a pending/processing row already exists

MLX load:
    legacy classify (primary 26B) + tier triage (4B) + deep_summary (26B, on escalation)
    takes about 60 to 120 seconds per document, so 50 documents take roughly 50 to 100 minutes.
    For backfills of 100+ documents, splitting across several sessions or overnight runs
    is recommended.
"""
from __future__ import annotations

import argparse
import asyncio
import os
import sys
from pathlib import Path

# Put the sibling app/ directory on sys.path.
sys.path.insert(0, str(Path(__file__).resolve().parent.parent / "app"))

from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine

DOMAIN_FILTERS: dict[str, str] = {
    "safety": "ai_domain LIKE 'Industrial_Safety%'",
    "law": "source_channel = 'law_monitor'",
    "manual": "source_channel = 'manual'",
    "news": "source_channel = 'news'",
    "drive_sync": "source_channel = 'drive_sync'",
    "memo": "source_channel = 'memo'",
}

COUNT_SQL = """
SELECT COUNT(*)
FROM documents
WHERE deleted_at IS NULL
  AND extracted_text IS NOT NULL
  AND LENGTH(extracted_text) > 0
  AND ai_analysis_tier IS NULL
  AND {filter}
"""

SAMPLE_SQL = """
SELECT id, LEFT(title, 60) AS title, ai_domain, source_channel,
       LENGTH(extracted_text) AS tlen, created_at::date AS created
FROM documents
WHERE deleted_at IS NULL
  AND extracted_text IS NOT NULL
  AND LENGTH(extracted_text) > 0
  AND ai_analysis_tier IS NULL
  AND {filter}
ORDER BY created_at DESC
LIMIT :n
"""

APPLY_SQL = """
INSERT INTO processing_queue (document_id, stage, status, attempts, max_attempts)
SELECT id, 'classify', 'pending', 0, 3
FROM documents
WHERE deleted_at IS NULL
  AND extracted_text IS NOT NULL
  AND LENGTH(extracted_text) > 0
  AND ai_analysis_tier IS NULL
  AND {filter}
ORDER BY created_at DESC
LIMIT :n
ON CONFLICT DO NOTHING
RETURNING document_id
"""


async def run(domain: str, limit: int, apply: bool) -> int:
    if domain not in DOMAIN_FILTERS:
        print(f"[error] --domain must be one of {list(DOMAIN_FILTERS)}. got={domain!r}")
        return 1
    # The filter comes from a fixed whitelist, so formatting it into the SQL is safe.
    filter_clause = DOMAIN_FILTERS[domain]

    database_url = os.getenv(
        "DATABASE_URL",
        "postgresql+asyncpg://pkm:pkm@postgres:5432/pkm",
    )
    engine = create_async_engine(database_url)
    session_factory = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)

    async with session_factory() as session:
        total = (await session.execute(
            text(COUNT_SQL.format(filter=filter_clause))
        )).scalar()
        print(f"=== {domain} backfill candidates ===")
        print(f" total unprocessed documents: {total}")
        print(f" limit for this run: {limit}")
        print()

        if total == 0:
            print("[done] No documents to backfill.")
            await engine.dispose()
            return 0

        print("=== sample (top 5) ===")
        rows = (await session.execute(
            text(SAMPLE_SQL.format(filter=filter_clause)),
            {"n": 5},
        )).all()
        for r in rows:
            print(f" id={r.id} ({r.created}) [{r.source_channel} / {r.ai_domain or '-'}] "
                  f"tlen={r.tlen} title={r.title}")
        print()

        if not apply:
            print(f"[dry-run] Re-run with --apply to actually enqueue. "
                  f"Expected: {min(limit, total)} documents.")
            await engine.dispose()
            return 0

        print(f"[apply] enqueueing up to {limit} documents into the classify queue...")
        result = await session.execute(
            text(APPLY_SQL.format(filter=filter_clause)),
            {"n": limit},
        )
        # RETURNING yields only the rows that were actually inserted.
        enqueued_ids = [r.document_id for r in result.all()]
        await session.commit()
        print(f" enqueued: {len(enqueued_ids)}")
        if enqueued_ids:
            sample = enqueued_ids[:10]
            print(f" first 10 ids: {sample}")
        print()
        print("Query to monitor progress:")
        print(" docker exec hyungi_document_server-postgres-1 psql -U pkm -d pkm -c \\")
        print(" \"SELECT stage, status, COUNT(*) FROM processing_queue \\")
        print(" WHERE created_at > NOW() - INTERVAL '3 hours' GROUP BY 1,2 ORDER BY 1,2\"")

    await engine.dispose()
    return 0


def main() -> None:
    parser = argparse.ArgumentParser(description="PR-B tier triage backfill tool")
    parser.add_argument("--domain", required=True, choices=list(DOMAIN_FILTERS),
                        help="domain to backfill")
    parser.add_argument("--limit", type=int, default=50,
                        help="maximum number of documents to enqueue per run (default 50)")
    # Exactly one of --dry-run / --apply must be given.
    mode = parser.add_mutually_exclusive_group(required=True)
    mode.add_argument("--dry-run", action="store_true",
                      help="run the queries only; the queue is not modified")
    mode.add_argument("--apply", action="store_true",
                      help="actually enqueue into the classify queue")
    args = parser.parse_args()
    rc = asyncio.run(run(args.domain, args.limit, args.apply))
    sys.exit(rc)


if __name__ == "__main__":
    main()