feat(ops): 야간 auto tier 백필 스케줄러 (PR-B 레거시 해소)

6720건 레거시 문서를 야간에 자동으로 tier triage + deep_summary 처리.

app/workers/tier_backfill.py (신규):
- APScheduler 30분 주기 트리거. KST 00:00~06:00 시간대만 실제 enqueue.
- safety > law > manual 우선순위 25건씩 classify 큐 재투입.
- classify 큐 40건 이상 쌓여있으면 MLX 부하 보호로 skip.
- drive_sync / memo / news 는 제외 (plan 스코프 밖 또는 가치 낮음).
- off-switch: settings.ai.tier_backfill.enabled = false 로 전면 중단 가능.

app/main.py lifespan:
- scheduler.add_job(tier_backfill_run, interval=30min, id='tier_backfill').
- AsyncIOScheduler 이미 timezone='Asia/Seoul' 로 설정돼 tier_backfill 내부의
  zoneinfo('Asia/Seoul') 와 일치.

수치 예상: 야간 6시간 × 2회/시간 × 25건 = 150건/야간.
6720 / 150 = 약 45일이면 전체 레거시 소화.
MLX 부하 제어가 가장 강한 관심 — R2 backlog guard 와 중복 안전장치.

운영 중 과부하 감지 시: config.yaml 에 `ai.tier_backfill.enabled: false` 만
넣으면 즉시 정지 (재시작 없이 스케줄러가 매번 체크).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Hyungi Ahn
2026-04-24 15:28:28 +09:00
parent 814882a0fe
commit a95294ff42
2 changed files with 129 additions and 0 deletions
+4
View File
@@ -38,6 +38,7 @@ async def lifespan(app: FastAPI):
from workers.mailplus_archive import run as mailplus_run
from workers.news_collector import run as news_collector_run
from workers.queue_consumer import consume_queue
from workers.tier_backfill import run as tier_backfill_run
from workers.upload_cleanup import cleanup_orphan_uploads
# 시작: DB 연결 확인
@@ -58,6 +59,9 @@ async def lifespan(app: FastAPI):
scheduler.add_job(consume_queue, "interval", minutes=1, id="queue_consumer")
scheduler.add_job(watch_inbox, "interval", minutes=5, id="file_watcher")
scheduler.add_job(cleanup_orphan_uploads, "interval", minutes=10, id="upload_cleanup")
# PR-B 레거시 tier 백필 — 30분 주기로 호출되지만 KST 00:00~06:00 시간대만 실제 enqueue.
# safety > law > manual 우선순위로 25건씩. 6720 레거시 → 야간당 ~150건 → 약 45일 소화.
scheduler.add_job(tier_backfill_run, "interval", minutes=30, id="tier_backfill")
# 일일 스케줄 (KST)
scheduler.add_job(law_monitor_run, CronTrigger(hour=7), id="law_monitor")
scheduler.add_job(mailplus_run, CronTrigger(hour=7), id="mailplus_morning")
+125
View File
@@ -0,0 +1,125 @@
"""PR-B B-1 레거시 문서 tier triage 야간 자동 백필 스케줄러.
plan: ~/.claude/plans/swirling-swimming-liskov.md — 백필 장기 운영.
매 30분마다 트리거되어 (KST 00:00~06:00 시간대에만 실제 enqueue):
1. 우선순위 도메인별 NULL 문서 25건씩 classify 큐 재투입
2. 우선순위: safety > law > manual
(drive_sync / memo / news 는 별도 판단 — 본 스케줄러 제외)
3. classify 큐가 이미 많으면 스킵 (MLX 부하 보호)
사유:
- 6720건 레거시 수동 처리 부담 (170h MLX 점유).
- 야간 0~6시 사용자 활동 낮은 시간대 활용.
- 30분 간격 × 25건 = 야간당 150건. 6720 / 150 = 45일이면 전체 소화.
- 한 번에 너무 많이 enqueue 하면 R2 backlog guard 가 soft escalate 를 과도 suppress.
운영 off-switch:
settings.ai.tier_backfill.enabled = false 면 전면 중단 (config.yaml 에서).
"""
from __future__ import annotations
from datetime import datetime
from zoneinfo import ZoneInfo
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession
from core.config import settings
from core.database import async_session
from core.utils import setup_logger
logger = setup_logger("tier_backfill")
# 야간 운영 시간 (KST, Asia/Seoul). 이 창 바깥에서는 스케줄러가 돌아도 skip.
NIGHT_START_HOUR = 0 # 00:00 KST 부터
NIGHT_END_HOUR = 6 # 06:00 KST 까지 (end exclusive)
# 한 번 트리거 시 최대 enqueue. MLX 단일 Semaphore 보호.
BATCH_SIZE = 25
# classify 큐가 이 이상 쌓여있으면 skip (파이프 적체 방지)
QUEUE_SKIP_THRESHOLD = 40
# 우선순위 도메인 (첫 번째가 후보 먼저 소진)
DOMAIN_PRIORITY: list[tuple[str, str]] = [
("safety", "ai_domain LIKE 'Industrial_Safety%'"),
("law", "source_channel = 'law_monitor'"),
("manual", "source_channel = 'manual'"),
]
async def _classify_pending(session: AsyncSession) -> int:
return int(await session.scalar(text("""
SELECT COUNT(*) FROM processing_queue
WHERE stage = 'classify' AND status IN ('pending', 'processing')
""")) or 0)
async def _enqueue_domain(session: AsyncSession, filter_clause: str, limit: int) -> int:
"""도메인 조건 + NULL tier 문서 limit 건 classify 큐에 enqueue. 반환 = 실제 enqueue 수."""
sql = text(f"""
INSERT INTO processing_queue (document_id, stage, status, attempts, max_attempts)
SELECT id, 'classify', 'pending', 0, 3
FROM documents
WHERE deleted_at IS NULL
AND extracted_text IS NOT NULL
AND ai_analysis_tier IS NULL
AND {filter_clause}
ORDER BY created_at DESC
LIMIT :n
ON CONFLICT DO NOTHING
RETURNING document_id
""")
result = await session.execute(sql, {"n": limit})
return len(result.all())
async def run() -> None:
"""APScheduler 가 30분 주기로 호출. 야간만 실제 작업."""
now = datetime.now(tz=ZoneInfo("Asia/Seoul"))
hour = now.hour
# off-switch
if not getattr(getattr(settings.ai, "tier_backfill", None), "enabled", True):
return
# 야간 시간대 아니면 skip (조용히)
if not (NIGHT_START_HOUR <= hour < NIGHT_END_HOUR):
return
async with async_session() as session:
pending = await _classify_pending(session)
if pending >= QUEUE_SKIP_THRESHOLD:
logger.info(
f"[tier_backfill] skip — classify 큐 {pending} >= {QUEUE_SKIP_THRESHOLD} "
f"(MLX 부하 보호)"
)
return
remaining = BATCH_SIZE
total_enqueued = 0
for domain_name, filter_clause in DOMAIN_PRIORITY:
if remaining <= 0:
break
try:
n = await _enqueue_domain(session, filter_clause, remaining)
except Exception as exc:
logger.exception(
f"[tier_backfill] enqueue 실패 domain={domain_name}: {exc}"
)
continue
if n > 0:
logger.info(f"[tier_backfill] enqueue {n} 건 from domain={domain_name}")
total_enqueued += n
remaining -= n
if total_enqueued == 0:
logger.info("[tier_backfill] 소진 — 우선순위 3 도메인 모두 0 건")
else:
await session.commit()
logger.info(
f"[tier_backfill] batch 완료 enqueued={total_enqueued} "
f"hour={hour} kst now={now.isoformat()}"
)