Files
hyungi_document_server/app/workers/tier_backfill.py
T
Hyungi Ahn c6335c9a1e fix(classify): law_monitor skip 분기 복원 + tier_backfill law 제외
PR-B refactor 과정에서 e88640d 의 process() 진입부 source_channel='law_monitor'
skip 분기가 사라져 매일 07:00 신규 법령 분할마다 26B legacy classify(8s) +
26B legacy summarize(10s) + 4B triage(1.5s) 전부 호출되고 있었다.

법령 분리 PR (stateless-churning-raccoon) 의 명제:
  "법령은 외부 source-of-truth + immutable + 자동 재수집 → 다른 수명주기"
와 일치하도록 process() 진입부에 skip 분기 복원. 최소 필드 (ai_domain='법령',
ai_tags=['법령'], importance='medium') 만 세팅 후 return. queue_consumer 의
NEXT_STAGES['classify']=['embed','chunk'] 가 자동 chain 하므로 검색 영향 0.

법령 도메인 AI 산출물 가치 분석:
  - ai_summary: 법령 해석 환각 위험 (ASME/안전 엔지니어 사고 책임 소지)
  - ai_tldr/bullets: 이미 title 이 같은 정보 노출 — redundant
  - ai_inconsistencies: 공식 정합 문서라 100% false positive
  → 비용 (월 ~14분 26B 점유) 대비 가치 음수, skip 합당.

tier_backfill.py 도 함께 수정:
  - DOMAIN_PRIORITY 에서 ('law', source_channel='law_monitor') 항목 제거
  - safety 필터에 source_channel != 'law_monitor' 추가 (기존 ai_domain LIKE
    'Industrial_Safety%' 매칭 안에 backfill 기 처리한 법령 doc 들이 잡혀
    들어가는 case 차단)
  - 사유: skip 처리될 doc 을 enqueue 하면 야간마다 enqueue→skip→NULL→
    enqueue 무한 루프

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-27 07:35:27 +09:00

129 lines
4.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""PR-B B-1 레거시 문서 tier triage 야간 자동 백필 스케줄러.
plan: ~/.claude/plans/swirling-swimming-liskov.md — 백필 장기 운영.
매 30분마다 트리거되어 (KST 00:00~06:00 시간대에만 실제 enqueue):
1. 우선순위 도메인별 NULL 문서 25건씩 classify 큐 재투입
2. 우선순위: safety > manual
(drive_sync / memo / news / law_monitor 는 본 스케줄러 제외)
- news/memo: 분야 확정, classify 무가치 (legacy 결정)
- law_monitor: classify_worker 가 진입 시 skip 처리 (plan stateless-churning-raccoon.md).
backfill 에서 enqueue 해도 skip 만 반복되므로 시작부터 제외.
3. classify 큐가 이미 많으면 스킵 (MLX 부하 보호)
사유:
- 6720건 레거시 수동 처리 부담 (170h MLX 점유).
- 야간 0~6시 사용자 활동 낮은 시간대 활용.
- 30분 간격 × 25건 = 야간당 150건. 6720 / 150 = 45일이면 전체 소화.
- 한 번에 너무 많이 enqueue 하면 R2 backlog guard 가 soft escalate 를 과도 suppress.
운영 off-switch:
settings.ai.tier_backfill.enabled = false 면 전면 중단 (config.yaml 에서).
"""
from __future__ import annotations
from datetime import datetime
from zoneinfo import ZoneInfo
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession
from core.config import settings
from core.database import async_session
from core.utils import setup_logger
logger = setup_logger("tier_backfill")
# 야간 운영 시간 (KST, Asia/Seoul). 이 창 바깥에서는 스케줄러가 돌아도 skip.
NIGHT_START_HOUR = 0 # 00:00 KST 부터
NIGHT_END_HOUR = 6 # 06:00 KST 까지 (end exclusive)
# 한 번 트리거 시 최대 enqueue. MLX 단일 Semaphore 보호.
BATCH_SIZE = 25
# classify 큐가 이 이상 쌓여있으면 skip (파이프 적체 방지)
QUEUE_SKIP_THRESHOLD = 40
# 우선순위 도메인 (첫 번째가 후보 먼저 소진)
# law_monitor 제외: classify_worker 가 진입 시 skip — backfill 무한 루프 방지.
DOMAIN_PRIORITY: list[tuple[str, str]] = [
("safety", "ai_domain LIKE 'Industrial_Safety%' AND source_channel != 'law_monitor'"),
("manual", "source_channel = 'manual'"),
]
async def _classify_pending(session: AsyncSession) -> int:
return int(await session.scalar(text("""
SELECT COUNT(*) FROM processing_queue
WHERE stage = 'classify' AND status IN ('pending', 'processing')
""")) or 0)
async def _enqueue_domain(session: AsyncSession, filter_clause: str, limit: int) -> int:
"""도메인 조건 + NULL tier 문서 limit 건 classify 큐에 enqueue. 반환 = 실제 enqueue 수."""
sql = text(f"""
INSERT INTO processing_queue (document_id, stage, status, attempts, max_attempts)
SELECT id, 'classify', 'pending', 0, 3
FROM documents
WHERE deleted_at IS NULL
AND extracted_text IS NOT NULL
AND ai_analysis_tier IS NULL
AND {filter_clause}
ORDER BY created_at DESC
LIMIT :n
ON CONFLICT DO NOTHING
RETURNING document_id
""")
result = await session.execute(sql, {"n": limit})
return len(result.all())
async def run() -> None:
"""APScheduler 가 30분 주기로 호출. 야간만 실제 작업."""
now = datetime.now(tz=ZoneInfo("Asia/Seoul"))
hour = now.hour
# off-switch
if not getattr(getattr(settings.ai, "tier_backfill", None), "enabled", True):
return
# 야간 시간대 아니면 skip (조용히)
if not (NIGHT_START_HOUR <= hour < NIGHT_END_HOUR):
return
async with async_session() as session:
pending = await _classify_pending(session)
if pending >= QUEUE_SKIP_THRESHOLD:
logger.info(
f"[tier_backfill] skip — classify 큐 {pending} >= {QUEUE_SKIP_THRESHOLD} "
f"(MLX 부하 보호)"
)
return
remaining = BATCH_SIZE
total_enqueued = 0
for domain_name, filter_clause in DOMAIN_PRIORITY:
if remaining <= 0:
break
try:
n = await _enqueue_domain(session, filter_clause, remaining)
except Exception as exc:
logger.exception(
f"[tier_backfill] enqueue 실패 domain={domain_name}: {exc}"
)
continue
if n > 0:
logger.info(f"[tier_backfill] enqueue {n} 건 from domain={domain_name}")
total_enqueued += n
remaining -= n
if total_enqueued == 0:
logger.info("[tier_backfill] 소진 — 우선순위 3 도메인 모두 0 건")
else:
await session.commit()
logger.info(
f"[tier_backfill] batch 완료 enqueued={total_enqueued} "
f"hour={hour} kst now={now.isoformat()}"
)