690b22fe58
- news.collect: locked() 체크 후 실제 acquire 가 별도 task 안에서 일어나 그 사이 다른 요청이 끼어들어 이중 수집 task 가 생기던 TOCTOU. 핸들러에서 동기 acquire + task finally release 로 원자화. - tier_backfill._enqueue_domain: filter_clause 가 SQL 에 직접 보간되나 allowlist 가드 부재 (retrieval_service _VALID_DOCS_TABLE 정본 대비 비대칭). DOMAIN_PRIORITY 출처 allowlist final gate 추가 — 현재 모듈 상수라 injection 0 이나 외부 입력화 시 즉시 차단. 검증: py_compile 통과. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
142 lines
5.6 KiB
Python
142 lines
5.6 KiB
Python
"""PR-B B-1 레거시 문서 tier triage 야간 자동 백필 스케줄러.
|
||
|
||
plan: ~/.claude/plans/swirling-swimming-liskov.md — 백필 장기 운영.
|
||
|
||
매 30분마다 트리거되어 (KST 00:00~06:00 시간대에만 실제 enqueue):
|
||
1. 우선순위 도메인별 NULL 문서 25건씩 classify 큐 재투입
|
||
2. 우선순위: safety > manual
|
||
(drive_sync / memo / news / law_monitor 는 본 스케줄러 제외)
|
||
- news/memo: 분야 확정, classify 무가치 (legacy 결정)
|
||
- law_monitor: classify_worker 가 진입 시 skip 처리 (plan stateless-churning-raccoon.md).
|
||
backfill 에서 enqueue 해도 skip 만 반복되므로 시작부터 제외.
|
||
3. classify 큐가 이미 많으면 스킵 (MLX 부하 보호)
|
||
|
||
사유:
|
||
- 6720건 레거시 수동 처리 부담 (170h MLX 점유).
|
||
- 야간 0~6시 사용자 활동 낮은 시간대 활용.
|
||
- 30분 간격 × 25건 = 야간당 150건. 6720 / 150 = 45일이면 전체 소화.
|
||
- 한 번에 너무 많이 enqueue 하면 R2 backlog guard 가 soft escalate 를 과도 suppress.
|
||
|
||
운영 off-switch:
|
||
settings.ai.tier_backfill.enabled = false 면 전면 중단 (config.yaml 에서).
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
from datetime import datetime
|
||
from zoneinfo import ZoneInfo
|
||
|
||
from sqlalchemy import text
|
||
from sqlalchemy.ext.asyncio import AsyncSession
|
||
|
||
from core.config import settings
|
||
from core.database import async_session
|
||
from core.utils import setup_logger
|
||
|
||
logger = setup_logger("tier_backfill")
|
||
|
||
# 야간 운영 시간 (KST, Asia/Seoul). 이 창 바깥에서는 스케줄러가 돌아도 skip.
|
||
NIGHT_START_HOUR = 0 # 00:00 KST 부터
|
||
NIGHT_END_HOUR = 6 # 06:00 KST 까지 (end exclusive)
|
||
|
||
# 한 번 트리거 시 최대 enqueue. MLX 단일 Semaphore 보호.
|
||
BATCH_SIZE = 25
|
||
|
||
# classify 큐가 이 이상 쌓여있으면 skip (파이프 적체 방지)
|
||
QUEUE_SKIP_THRESHOLD = 40
|
||
|
||
# 우선순위 도메인 (첫 번째가 후보 먼저 소진)
|
||
# law_monitor 제외: classify_worker 가 진입 시 skip — backfill 무한 루프 방지.
|
||
DOMAIN_PRIORITY: list[tuple[str, str]] = [
|
||
("safety", "ai_domain LIKE 'Industrial_Safety%' AND source_channel != 'law_monitor'"),
|
||
("manual", "source_channel = 'manual'"),
|
||
]
|
||
|
||
# R12: filter_clause 는 SQL 에 직접 보간되므로 이 allowlist(DOMAIN_PRIORITY 출처) 통과분만
|
||
# 허용 — 현재 모듈 상수라 injection 경로 0 이나, 외부 입력화 시 즉시 차단하는 final gate
|
||
# (retrieval_service 의 _VALID_DOCS_TABLE allowlist 정본 대비 비대칭 해소).
|
||
_ALLOWED_FILTER_CLAUSES: frozenset[str] = frozenset(c for _, c in DOMAIN_PRIORITY)
|
||
|
||
|
||
async def _classify_pending(session: AsyncSession) -> int:
|
||
return int(await session.scalar(text("""
|
||
SELECT COUNT(*) FROM processing_queue
|
||
WHERE stage = 'classify' AND status IN ('pending', 'processing')
|
||
""")) or 0)
|
||
|
||
|
||
async def _enqueue_domain(session: AsyncSession, filter_clause: str, limit: int) -> int:
|
||
"""도메인 조건 + NULL tier 문서 limit 건 classify 큐에 enqueue. 반환 = 실제 enqueue 수.
|
||
|
||
extracted_text 빈 문자열 (LENGTH=0) 도 제외 — classify_worker 는 not doc.extracted_text
|
||
truthy 체크라 빈 문자열에서 ValueError raise. 무한 retry 루프 방지.
|
||
"""
|
||
# R12: SQL 직접 보간 전 allowlist final gate.
|
||
if filter_clause not in _ALLOWED_FILTER_CLAUSES:
|
||
raise ValueError(f"비허용 filter_clause (allowlist 외): {filter_clause!r}")
|
||
sql = text(f"""
|
||
INSERT INTO processing_queue (document_id, stage, status, attempts, max_attempts)
|
||
SELECT id, 'classify', 'pending', 0, 3
|
||
FROM documents
|
||
WHERE deleted_at IS NULL
|
||
AND extracted_text IS NOT NULL
|
||
AND LENGTH(extracted_text) > 0
|
||
AND ai_analysis_tier IS NULL
|
||
AND {filter_clause}
|
||
ORDER BY created_at DESC
|
||
LIMIT :n
|
||
ON CONFLICT DO NOTHING
|
||
RETURNING document_id
|
||
""")
|
||
result = await session.execute(sql, {"n": limit})
|
||
return len(result.all())
|
||
|
||
|
||
async def run() -> None:
|
||
"""APScheduler 가 30분 주기로 호출. 야간만 실제 작업."""
|
||
now = datetime.now(tz=ZoneInfo("Asia/Seoul"))
|
||
hour = now.hour
|
||
|
||
# off-switch
|
||
if not getattr(getattr(settings.ai, "tier_backfill", None), "enabled", True):
|
||
return
|
||
|
||
# 야간 시간대 아니면 skip (조용히)
|
||
if not (NIGHT_START_HOUR <= hour < NIGHT_END_HOUR):
|
||
return
|
||
|
||
async with async_session() as session:
|
||
pending = await _classify_pending(session)
|
||
if pending >= QUEUE_SKIP_THRESHOLD:
|
||
logger.info(
|
||
f"[tier_backfill] skip — classify 큐 {pending} >= {QUEUE_SKIP_THRESHOLD} "
|
||
f"(MLX 부하 보호)"
|
||
)
|
||
return
|
||
|
||
remaining = BATCH_SIZE
|
||
total_enqueued = 0
|
||
for domain_name, filter_clause in DOMAIN_PRIORITY:
|
||
if remaining <= 0:
|
||
break
|
||
try:
|
||
n = await _enqueue_domain(session, filter_clause, remaining)
|
||
except Exception as exc:
|
||
logger.exception(
|
||
f"[tier_backfill] enqueue 실패 domain={domain_name}: {exc}"
|
||
)
|
||
continue
|
||
if n > 0:
|
||
logger.info(f"[tier_backfill] enqueue {n} 건 from domain={domain_name}")
|
||
total_enqueued += n
|
||
remaining -= n
|
||
|
||
if total_enqueued == 0:
|
||
logger.info("[tier_backfill] 소진 — 우선순위 3 도메인 모두 0 건")
|
||
else:
|
||
await session.commit()
|
||
logger.info(
|
||
f"[tier_backfill] batch 완료 enqueued={total_enqueued} "
|
||
f"hour={hour} kst now={now.isoformat()}"
|
||
)
|