95bcdb851b
3일 운영 결과 doc 4811, 5181 가 extracted_text='' (빈 문자열) 인데
IS NOT NULL 만 걸려 enqueue → classify_worker 의 not doc.extracted_text
truthy 체크에서 ValueError → max_attempts(3) 도달 → status=failed.
다음 backfill 사이클에서 다시 enqueue 되어 12회 반복, failed 24건 누적.
수정: tier_backfill.py + backfill_tier.py 양쪽 SQL 에
LENGTH(extracted_text) > 0 추가. 빈 문자열 문서는 enqueue 자체에서 제외.
기존 failed 24건 정리 SQL (사용자가 수동 실행):
DELETE FROM processing_queue
WHERE stage='classify' AND status='failed'
AND error_message LIKE '%extracted_text%';
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
134 lines
5.0 KiB
Python
134 lines
5.0 KiB
Python
"""PR-B B-1 레거시 문서 tier triage 야간 자동 백필 스케줄러.
|
||
|
||
plan: ~/.claude/plans/swirling-swimming-liskov.md — 백필 장기 운영.
|
||
|
||
매 30분마다 트리거되어 (KST 00:00~06:00 시간대에만 실제 enqueue):
|
||
1. 우선순위 도메인별 NULL 문서 25건씩 classify 큐 재투입
|
||
2. 우선순위: safety > manual
|
||
(drive_sync / memo / news / law_monitor 는 본 스케줄러 제외)
|
||
- news/memo: 분야 확정, classify 무가치 (legacy 결정)
|
||
- law_monitor: classify_worker 가 진입 시 skip 처리 (plan stateless-churning-raccoon.md).
|
||
backfill 에서 enqueue 해도 skip 만 반복되므로 시작부터 제외.
|
||
3. classify 큐가 이미 많으면 스킵 (MLX 부하 보호)
|
||
|
||
사유:
|
||
- 6720건 레거시 수동 처리 부담 (170h MLX 점유).
|
||
- 야간 0~6시 사용자 활동 낮은 시간대 활용.
|
||
- 30분 간격 × 25건 = 야간당 150건. 6720 / 150 = 45일이면 전체 소화.
|
||
- 한 번에 너무 많이 enqueue 하면 R2 backlog guard 가 soft escalate 를 과도 suppress.
|
||
|
||
운영 off-switch:
|
||
settings.ai.tier_backfill.enabled = false 면 전면 중단 (config.yaml 에서).
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
from datetime import datetime
|
||
from zoneinfo import ZoneInfo
|
||
|
||
from sqlalchemy import text
|
||
from sqlalchemy.ext.asyncio import AsyncSession
|
||
|
||
from core.config import settings
|
||
from core.database import async_session
|
||
from core.utils import setup_logger
|
||
|
||
logger = setup_logger("tier_backfill")
|
||
|
||
# 야간 운영 시간 (KST, Asia/Seoul). 이 창 바깥에서는 스케줄러가 돌아도 skip.
|
||
NIGHT_START_HOUR = 0 # 00:00 KST 부터
|
||
NIGHT_END_HOUR = 6 # 06:00 KST 까지 (end exclusive)
|
||
|
||
# 한 번 트리거 시 최대 enqueue. MLX 단일 Semaphore 보호.
|
||
BATCH_SIZE = 25
|
||
|
||
# classify 큐가 이 이상 쌓여있으면 skip (파이프 적체 방지)
|
||
QUEUE_SKIP_THRESHOLD = 40
|
||
|
||
# 우선순위 도메인 (첫 번째가 후보 먼저 소진)
|
||
# law_monitor 제외: classify_worker 가 진입 시 skip — backfill 무한 루프 방지.
|
||
DOMAIN_PRIORITY: list[tuple[str, str]] = [
|
||
("safety", "ai_domain LIKE 'Industrial_Safety%' AND source_channel != 'law_monitor'"),
|
||
("manual", "source_channel = 'manual'"),
|
||
]
|
||
|
||
|
||
async def _classify_pending(session: AsyncSession) -> int:
|
||
return int(await session.scalar(text("""
|
||
SELECT COUNT(*) FROM processing_queue
|
||
WHERE stage = 'classify' AND status IN ('pending', 'processing')
|
||
""")) or 0)
|
||
|
||
|
||
async def _enqueue_domain(session: AsyncSession, filter_clause: str, limit: int) -> int:
|
||
"""도메인 조건 + NULL tier 문서 limit 건 classify 큐에 enqueue. 반환 = 실제 enqueue 수.
|
||
|
||
extracted_text 빈 문자열 (LENGTH=0) 도 제외 — classify_worker 는 not doc.extracted_text
|
||
truthy 체크라 빈 문자열에서 ValueError raise. 무한 retry 루프 방지.
|
||
"""
|
||
sql = text(f"""
|
||
INSERT INTO processing_queue (document_id, stage, status, attempts, max_attempts)
|
||
SELECT id, 'classify', 'pending', 0, 3
|
||
FROM documents
|
||
WHERE deleted_at IS NULL
|
||
AND extracted_text IS NOT NULL
|
||
AND LENGTH(extracted_text) > 0
|
||
AND ai_analysis_tier IS NULL
|
||
AND {filter_clause}
|
||
ORDER BY created_at DESC
|
||
LIMIT :n
|
||
ON CONFLICT DO NOTHING
|
||
RETURNING document_id
|
||
""")
|
||
result = await session.execute(sql, {"n": limit})
|
||
return len(result.all())
|
||
|
||
|
||
async def run() -> None:
|
||
"""APScheduler 가 30분 주기로 호출. 야간만 실제 작업."""
|
||
now = datetime.now(tz=ZoneInfo("Asia/Seoul"))
|
||
hour = now.hour
|
||
|
||
# off-switch
|
||
if not getattr(getattr(settings.ai, "tier_backfill", None), "enabled", True):
|
||
return
|
||
|
||
# 야간 시간대 아니면 skip (조용히)
|
||
if not (NIGHT_START_HOUR <= hour < NIGHT_END_HOUR):
|
||
return
|
||
|
||
async with async_session() as session:
|
||
pending = await _classify_pending(session)
|
||
if pending >= QUEUE_SKIP_THRESHOLD:
|
||
logger.info(
|
||
f"[tier_backfill] skip — classify 큐 {pending} >= {QUEUE_SKIP_THRESHOLD} "
|
||
f"(MLX 부하 보호)"
|
||
)
|
||
return
|
||
|
||
remaining = BATCH_SIZE
|
||
total_enqueued = 0
|
||
for domain_name, filter_clause in DOMAIN_PRIORITY:
|
||
if remaining <= 0:
|
||
break
|
||
try:
|
||
n = await _enqueue_domain(session, filter_clause, remaining)
|
||
except Exception as exc:
|
||
logger.exception(
|
||
f"[tier_backfill] enqueue 실패 domain={domain_name}: {exc}"
|
||
)
|
||
continue
|
||
if n > 0:
|
||
logger.info(f"[tier_backfill] enqueue {n} 건 from domain={domain_name}")
|
||
total_enqueued += n
|
||
remaining -= n
|
||
|
||
if total_enqueued == 0:
|
||
logger.info("[tier_backfill] 소진 — 우선순위 3 도메인 모두 0 건")
|
||
else:
|
||
await session.commit()
|
||
logger.info(
|
||
f"[tier_backfill] batch 완료 enqueued={total_enqueued} "
|
||
f"hour={hour} kst now={now.isoformat()}"
|
||
)
|