diff --git a/app/main.py b/app/main.py index 0a91a48..15c3804 100644 --- a/app/main.py +++ b/app/main.py @@ -38,6 +38,7 @@ async def lifespan(app: FastAPI): from workers.mailplus_archive import run as mailplus_run from workers.news_collector import run as news_collector_run from workers.queue_consumer import consume_queue + from workers.tier_backfill import run as tier_backfill_run from workers.upload_cleanup import cleanup_orphan_uploads # 시작: DB 연결 확인 @@ -58,6 +59,9 @@ async def lifespan(app: FastAPI): scheduler.add_job(consume_queue, "interval", minutes=1, id="queue_consumer") scheduler.add_job(watch_inbox, "interval", minutes=5, id="file_watcher") scheduler.add_job(cleanup_orphan_uploads, "interval", minutes=10, id="upload_cleanup") + # PR-B 레거시 tier 백필 — 30분 주기로 호출되지만 KST 00:00~06:00 시간대만 실제 enqueue. + # safety > law > manual 우선순위로 25건씩. 6720 레거시 → 야간당 ~150건 → 약 45일 소화. + scheduler.add_job(tier_backfill_run, "interval", minutes=30, id="tier_backfill") # 일일 스케줄 (KST) scheduler.add_job(law_monitor_run, CronTrigger(hour=7), id="law_monitor") scheduler.add_job(mailplus_run, CronTrigger(hour=7), id="mailplus_morning") diff --git a/app/workers/tier_backfill.py b/app/workers/tier_backfill.py new file mode 100644 index 0000000..38d9686 --- /dev/null +++ b/app/workers/tier_backfill.py @@ -0,0 +1,125 @@ +"""PR-B B-1 레거시 문서 tier triage 야간 자동 백필 스케줄러. + +plan: ~/.claude/plans/swirling-swimming-liskov.md — 백필 장기 운영. + +매 30분마다 트리거되어 (KST 00:00~06:00 시간대에만 실제 enqueue): + 1. 우선순위 도메인별 NULL 문서 25건씩 classify 큐 재투입 + 2. 우선순위: safety > law > manual + (drive_sync / memo / news 는 별도 판단 — 본 스케줄러 제외) + 3. classify 큐가 이미 많으면 스킵 (MLX 부하 보호) + +사유: + - 6720건 레거시 수동 처리 부담 (170h MLX 점유). + - 야간 0~6시 사용자 활동 낮은 시간대 활용. + - 30분 간격 × 25건 = 야간당 150건. 6720 / 150 = 45일이면 전체 소화. + - 한 번에 너무 많이 enqueue 하면 R2 backlog guard 가 soft escalate 를 과도 suppress. + +운영 off-switch: + settings.ai.tier_backfill.enabled = false 면 전면 중단 (config.yaml 에서). +""" + +from __future__ import annotations + +from datetime import datetime +from zoneinfo import ZoneInfo + +from sqlalchemy import text +from sqlalchemy.ext.asyncio import AsyncSession + +from core.config import settings +from core.database import async_session +from core.utils import setup_logger + +logger = setup_logger("tier_backfill") + +# 야간 운영 시간 (KST, Asia/Seoul). 이 창 바깥에서는 스케줄러가 돌아도 skip. +NIGHT_START_HOUR = 0 # 00:00 KST 부터 +NIGHT_END_HOUR = 6 # 06:00 KST 까지 (end exclusive) + +# 한 번 트리거 시 최대 enqueue. MLX 단일 Semaphore 보호. +BATCH_SIZE = 25 + +# classify 큐가 이 이상 쌓여있으면 skip (파이프 적체 방지) +QUEUE_SKIP_THRESHOLD = 40 + +# 우선순위 도메인 (첫 번째가 후보 먼저 소진) +DOMAIN_PRIORITY: list[tuple[str, str]] = [ + ("safety", "ai_domain LIKE 'Industrial_Safety%'"), + ("law", "source_channel = 'law_monitor'"), + ("manual", "source_channel = 'manual'"), +] + + +async def _classify_pending(session: AsyncSession) -> int: + return int(await session.scalar(text(""" + SELECT COUNT(*) FROM processing_queue + WHERE stage = 'classify' AND status IN ('pending', 'processing') + """)) or 0) + + +async def _enqueue_domain(session: AsyncSession, filter_clause: str, limit: int) -> int: + """도메인 조건 + NULL tier 문서 limit 건 classify 큐에 enqueue. 반환 = 실제 enqueue 수.""" + sql = text(f""" + INSERT INTO processing_queue (document_id, stage, status, attempts, max_attempts) + SELECT id, 'classify', 'pending', 0, 3 + FROM documents + WHERE deleted_at IS NULL + AND extracted_text IS NOT NULL + AND ai_analysis_tier IS NULL + AND {filter_clause} + ORDER BY created_at DESC + LIMIT :n + ON CONFLICT DO NOTHING + RETURNING document_id + """) + result = await session.execute(sql, {"n": limit}) + return len(result.all()) + + +async def run() -> None: + """APScheduler 가 30분 주기로 호출. 야간만 실제 작업.""" + now = datetime.now(tz=ZoneInfo("Asia/Seoul")) + hour = now.hour + + # off-switch + if not getattr(getattr(settings.ai, "tier_backfill", None), "enabled", True): + return + + # 야간 시간대 아니면 skip (조용히) + if not (NIGHT_START_HOUR <= hour < NIGHT_END_HOUR): + return + + async with async_session() as session: + pending = await _classify_pending(session) + if pending >= QUEUE_SKIP_THRESHOLD: + logger.info( + f"[tier_backfill] skip — classify 큐 {pending} >= {QUEUE_SKIP_THRESHOLD} " + f"(MLX 부하 보호)" + ) + return + + remaining = BATCH_SIZE + total_enqueued = 0 + for domain_name, filter_clause in DOMAIN_PRIORITY: + if remaining <= 0: + break + try: + n = await _enqueue_domain(session, filter_clause, remaining) + except Exception as exc: + logger.exception( + f"[tier_backfill] enqueue 실패 domain={domain_name}: {exc}" + ) + continue + if n > 0: + logger.info(f"[tier_backfill] enqueue {n} 건 from domain={domain_name}") + total_enqueued += n + remaining -= n + + if total_enqueued == 0: + logger.info("[tier_backfill] 소진 — 우선순위 3 도메인 모두 0 건") + else: + await session.commit() + logger.info( + f"[tier_backfill] batch 완료 enqueued={total_enqueued} " + f"hour={hour} kst now={now.isoformat()}" + )