"""PR-B B-1 레거시 문서 tier triage 야간 자동 백필 스케줄러. plan: ~/.claude/plans/swirling-swimming-liskov.md — 백필 장기 운영. 매 30분마다 트리거되어 (KST 00:00~06:00 시간대에만 실제 enqueue): 1. 우선순위 도메인별 NULL 문서 25건씩 classify 큐 재투입 2. 우선순위: safety > manual (drive_sync / memo / news / law_monitor 는 본 스케줄러 제외) - news/memo: 분야 확정, classify 무가치 (legacy 결정) - law_monitor: classify_worker 가 진입 시 skip 처리 (plan stateless-churning-raccoon.md). backfill 에서 enqueue 해도 skip 만 반복되므로 시작부터 제외. 3. classify 큐가 이미 많으면 스킵 (MLX 부하 보호) 사유: - 6720건 레거시 수동 처리 부담 (170h MLX 점유). - 야간 0~6시 사용자 활동 낮은 시간대 활용. - 30분 간격 × 25건 = 야간당 150건. 6720 / 150 = 45일이면 전체 소화. - 한 번에 너무 많이 enqueue 하면 R2 backlog guard 가 soft escalate 를 과도 suppress. 운영 off-switch: settings.ai.tier_backfill.enabled = false 면 전면 중단 (config.yaml 에서). """ from __future__ import annotations from datetime import datetime from zoneinfo import ZoneInfo from sqlalchemy import text from sqlalchemy.ext.asyncio import AsyncSession from core.config import settings from core.database import async_session from core.utils import setup_logger logger = setup_logger("tier_backfill") # 야간 운영 시간 (KST, Asia/Seoul). 이 창 바깥에서는 스케줄러가 돌아도 skip. NIGHT_START_HOUR = 0 # 00:00 KST 부터 NIGHT_END_HOUR = 6 # 06:00 KST 까지 (end exclusive) # 한 번 트리거 시 최대 enqueue. MLX 단일 Semaphore 보호. BATCH_SIZE = 25 # classify 큐가 이 이상 쌓여있으면 skip (파이프 적체 방지) QUEUE_SKIP_THRESHOLD = 40 # 우선순위 도메인 (첫 번째가 후보 먼저 소진) # law_monitor 제외: classify_worker 가 진입 시 skip — backfill 무한 루프 방지. DOMAIN_PRIORITY: list[tuple[str, str]] = [ ("safety", "ai_domain LIKE 'Industrial_Safety%' AND source_channel != 'law_monitor'"), ("manual", "source_channel = 'manual'"), ] # R12: filter_clause 는 SQL 에 직접 보간되므로 이 allowlist(DOMAIN_PRIORITY 출처) 통과분만 # 허용 — 현재 모듈 상수라 injection 경로 0 이나, 외부 입력화 시 즉시 차단하는 final gate # (retrieval_service 의 _VALID_DOCS_TABLE allowlist 정본 대비 비대칭 해소). _ALLOWED_FILTER_CLAUSES: frozenset[str] = frozenset(c for _, c in DOMAIN_PRIORITY) async def _classify_pending(session: AsyncSession) -> int: return int(await session.scalar(text(""" SELECT COUNT(*) FROM processing_queue WHERE stage = 'classify' AND status IN ('pending', 'processing') """)) or 0) async def _enqueue_domain(session: AsyncSession, filter_clause: str, limit: int) -> int: """도메인 조건 + NULL tier 문서 limit 건 classify 큐에 enqueue. 반환 = 실제 enqueue 수. extracted_text 빈 문자열 (LENGTH=0) 도 제외 — classify_worker 는 not doc.extracted_text truthy 체크라 빈 문자열에서 ValueError raise. 무한 retry 루프 방지. """ # R12: SQL 직접 보간 전 allowlist final gate. if filter_clause not in _ALLOWED_FILTER_CLAUSES: raise ValueError(f"비허용 filter_clause (allowlist 외): {filter_clause!r}") sql = text(f""" INSERT INTO processing_queue (document_id, stage, status, attempts, max_attempts) SELECT id, 'classify', 'pending', 0, 3 FROM documents WHERE deleted_at IS NULL AND extracted_text IS NOT NULL AND LENGTH(extracted_text) > 0 AND ai_analysis_tier IS NULL AND {filter_clause} ORDER BY created_at DESC LIMIT :n ON CONFLICT DO NOTHING RETURNING document_id """) result = await session.execute(sql, {"n": limit}) return len(result.all()) async def run() -> None: """APScheduler 가 30분 주기로 호출. 야간만 실제 작업.""" now = datetime.now(tz=ZoneInfo("Asia/Seoul")) hour = now.hour # off-switch if not getattr(getattr(settings.ai, "tier_backfill", None), "enabled", True): return # 야간 시간대 아니면 skip (조용히) if not (NIGHT_START_HOUR <= hour < NIGHT_END_HOUR): return async with async_session() as session: pending = await _classify_pending(session) if pending >= QUEUE_SKIP_THRESHOLD: logger.info( f"[tier_backfill] skip — classify 큐 {pending} >= {QUEUE_SKIP_THRESHOLD} " f"(MLX 부하 보호)" ) return remaining = BATCH_SIZE total_enqueued = 0 for domain_name, filter_clause in DOMAIN_PRIORITY: if remaining <= 0: break try: n = await _enqueue_domain(session, filter_clause, remaining) except Exception as exc: logger.exception( f"[tier_backfill] enqueue 실패 domain={domain_name}: {exc}" ) continue if n > 0: logger.info(f"[tier_backfill] enqueue {n} 건 from domain={domain_name}") total_enqueued += n remaining -= n if total_enqueued == 0: logger.info("[tier_backfill] 소진 — 우선순위 3 도메인 모두 0 건") else: await session.commit() logger.info( f"[tier_backfill] batch 완료 enqueued={total_enqueued} " f"hour={hour} kst now={now.isoformat()}" )