Files
hyungi_document_server/app/workers/tier_backfill.py
hyungi 690b22fe58 fix(hardening): collect-lock TOCTOU 제거 (R9) + tier_backfill fstring allowlist (R12)
- news.collect: locked() 체크 후 실제 acquire 가 별도 task 안에서 일어나 그 사이 다른 요청이
  끼어들어 이중 수집 task 가 생기던 TOCTOU. 핸들러에서 동기 acquire + task finally release 로 원자화.
- tier_backfill._enqueue_domain: filter_clause 가 SQL 에 직접 보간되나 allowlist 가드 부재
  (retrieval_service _VALID_DOCS_TABLE 정본 대비 비대칭). DOMAIN_PRIORITY 출처 allowlist final
  gate 추가 — 현재 모듈 상수라 injection 0 이나 외부 입력화 시 즉시 차단.

검증: py_compile 통과.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-16 14:07:07 +09:00

142 lines
5.6 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""PR-B B-1 레거시 문서 tier triage 야간 자동 백필 스케줄러.
plan: ~/.claude/plans/swirling-swimming-liskov.md — 백필 장기 운영.
매 30분마다 트리거되어 (KST 00:00~06:00 시간대에만 실제 enqueue):
1. 우선순위 도메인별 NULL 문서 25건씩 classify 큐 재투입
2. 우선순위: safety > manual
(drive_sync / memo / news / law_monitor 는 본 스케줄러 제외)
- news/memo: 분야 확정, classify 무가치 (legacy 결정)
- law_monitor: classify_worker 가 진입 시 skip 처리 (plan stateless-churning-raccoon.md).
backfill 에서 enqueue 해도 skip 만 반복되므로 시작부터 제외.
3. classify 큐가 이미 많으면 스킵 (MLX 부하 보호)
사유:
- 6720건 레거시 수동 처리 부담 (170h MLX 점유).
- 야간 0~6시 사용자 활동 낮은 시간대 활용.
- 30분 간격 × 25건 = 야간당 150건. 6720 / 150 = 45일이면 전체 소화.
- 한 번에 너무 많이 enqueue 하면 R2 backlog guard 가 soft escalate 를 과도 suppress.
운영 off-switch:
settings.ai.tier_backfill.enabled = false 면 전면 중단 (config.yaml 에서).
"""
from __future__ import annotations
from datetime import datetime
from zoneinfo import ZoneInfo
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession
from core.config import settings
from core.database import async_session
from core.utils import setup_logger
logger = setup_logger("tier_backfill")
# 야간 운영 시간 (KST, Asia/Seoul). 이 창 바깥에서는 스케줄러가 돌아도 skip.
NIGHT_START_HOUR = 0 # 00:00 KST 부터
NIGHT_END_HOUR = 6 # 06:00 KST 까지 (end exclusive)
# 한 번 트리거 시 최대 enqueue. MLX 단일 Semaphore 보호.
BATCH_SIZE = 25
# classify 큐가 이 이상 쌓여있으면 skip (파이프 적체 방지)
QUEUE_SKIP_THRESHOLD = 40
# 우선순위 도메인 (첫 번째가 후보 먼저 소진)
# law_monitor 제외: classify_worker 가 진입 시 skip — backfill 무한 루프 방지.
DOMAIN_PRIORITY: list[tuple[str, str]] = [
("safety", "ai_domain LIKE 'Industrial_Safety%' AND source_channel != 'law_monitor'"),
("manual", "source_channel = 'manual'"),
]
# R12: filter_clause 는 SQL 에 직접 보간되므로 이 allowlist(DOMAIN_PRIORITY 출처) 통과분만
# 허용 — 현재 모듈 상수라 injection 경로 0 이나, 외부 입력화 시 즉시 차단하는 final gate
# (retrieval_service 의 _VALID_DOCS_TABLE allowlist 정본 대비 비대칭 해소).
_ALLOWED_FILTER_CLAUSES: frozenset[str] = frozenset(c for _, c in DOMAIN_PRIORITY)
async def _classify_pending(session: AsyncSession) -> int:
return int(await session.scalar(text("""
SELECT COUNT(*) FROM processing_queue
WHERE stage = 'classify' AND status IN ('pending', 'processing')
""")) or 0)
async def _enqueue_domain(session: AsyncSession, filter_clause: str, limit: int) -> int:
"""도메인 조건 + NULL tier 문서 limit 건 classify 큐에 enqueue. 반환 = 실제 enqueue 수.
extracted_text 빈 문자열 (LENGTH=0) 도 제외 — classify_worker 는 not doc.extracted_text
truthy 체크라 빈 문자열에서 ValueError raise. 무한 retry 루프 방지.
"""
# R12: SQL 직접 보간 전 allowlist final gate.
if filter_clause not in _ALLOWED_FILTER_CLAUSES:
raise ValueError(f"비허용 filter_clause (allowlist 외): {filter_clause!r}")
sql = text(f"""
INSERT INTO processing_queue (document_id, stage, status, attempts, max_attempts)
SELECT id, 'classify', 'pending', 0, 3
FROM documents
WHERE deleted_at IS NULL
AND extracted_text IS NOT NULL
AND LENGTH(extracted_text) > 0
AND ai_analysis_tier IS NULL
AND {filter_clause}
ORDER BY created_at DESC
LIMIT :n
ON CONFLICT DO NOTHING
RETURNING document_id
""")
result = await session.execute(sql, {"n": limit})
return len(result.all())
async def run() -> None:
"""APScheduler 가 30분 주기로 호출. 야간만 실제 작업."""
now = datetime.now(tz=ZoneInfo("Asia/Seoul"))
hour = now.hour
# off-switch
if not getattr(getattr(settings.ai, "tier_backfill", None), "enabled", True):
return
# 야간 시간대 아니면 skip (조용히)
if not (NIGHT_START_HOUR <= hour < NIGHT_END_HOUR):
return
async with async_session() as session:
pending = await _classify_pending(session)
if pending >= QUEUE_SKIP_THRESHOLD:
logger.info(
f"[tier_backfill] skip — classify 큐 {pending} >= {QUEUE_SKIP_THRESHOLD} "
f"(MLX 부하 보호)"
)
return
remaining = BATCH_SIZE
total_enqueued = 0
for domain_name, filter_clause in DOMAIN_PRIORITY:
if remaining <= 0:
break
try:
n = await _enqueue_domain(session, filter_clause, remaining)
except Exception as exc:
logger.exception(
f"[tier_backfill] enqueue 실패 domain={domain_name}: {exc}"
)
continue
if n > 0:
logger.info(f"[tier_backfill] enqueue {n} 건 from domain={domain_name}")
total_enqueued += n
remaining -= n
if total_enqueued == 0:
logger.info("[tier_backfill] 소진 — 우선순위 3 도메인 모두 0 건")
else:
await session.commit()
logger.info(
f"[tier_backfill] batch 완료 enqueued={total_enqueued} "
f"hour={hour} kst now={now.isoformat()}"
)