fix(ai): B-1 subject_domain 매칭 + RoutingDecision.escalate_to_26b 존중

실측 발견 (safety md 8건 tier triage 결과):
1. **분류 오분류**: 본문에 "MSDS" 한 번 스쳐도 msds 도메인 매칭됨.
   개인보호구/중대재해/밀폐공간/산업안전보건법 전부 msds 로 잘못 판정.
2. **RoutingDecision 무시**: PR-A domain_policy 의 high_impact=true 와
   risk_flag_requires_26b 때문에 RoutingDecision.escalate_to_26b=True 이지만
   내 _classify_escalation_reason 이 이걸 안 봐서 escalate=False 로 마감.
   safety/msds/hazard_specific 전부 4B 만 돌고 26B 정책 우회.

수정:
- _match_subject_domain: (a) title 기반 매칭 우선 추가 — 파일명이 의도의
  1차 시그널. (b) 본문 키워드는 **2회 이상 등장**해야 match (single-mention
  오분류 방지). 우선순위도 재배열 (msds 맨 앞 → hazard/safety 뒤로).
- _classify_escalation_reason: routing_decision 파라미터 추가. 4B 자체
  판정 (long_context / low_confidence / self_declare / deep_requested)
  이후 PR-A routing_decision.escalate_to_26b 가 True 이면 그 escalation_reasons
  중 "high_impact" 외의 구체 사유(risk_flag_requires_26b 등) 를 채택.
- _run_tier_triage: routing_decision 을 먼저 계산하여 _classify_escalation_reason
  에 전달. _apply_triage_result 는 routing_decision 을 param 으로 받음
  (중복 계산 제거).

이 변경 후 safety/msds/hazard_specific/incident_report 도메인 문서는 항상
26B escalate → deep_summary 큐. MLX 부하 증가하지만 plan 의도대로 정책 준수.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Hyungi Ahn
2026-04-24 11:18:59 +09:00
parent f872e4666f
commit 165b00f917
+125 -55
View File
@@ -123,13 +123,15 @@ def _validate_domain(domain: str) -> str:
# ───────────────────────── B-1 subject_domain 매칭 ─────────────────────
def _match_subject_domain(doc: Document, text: str) -> str:
"""본문/메타 기반으로 domain_policy.yaml 의 subject_domain 이름 결정.
"""본문/메타/제목 기반으로 domain_policy.yaml 의 subject_domain 이름 결정.
매칭 우선순위 (feedback_category_vs_ai_domain_axis.md — category 매칭 금지):
1. source_channel (news / law_monitor / memo)
2. 본문 keywords (첫 8k, 대소문자 무시)
3. ai_domain prefix — legacy classify 가 채운 taxonomy path
4. generic (fallback)
2. **제목(title) 기반 매칭** — 본문에 키워드가 한두 번 스치는 문서까지 잘못 잡지 않도록
제목의 의도를 우선 (예: PPE 선정기준.md 가 본문 MSDS 언급 한 번으로 msds 분류되는 문제 방지)
3. 본문 keywords (첫 8k, 대소문자 무시, 키워드 **2회 이상**)
4. ai_domain prefix — legacy classify 가 채운 taxonomy path
5. generic (fallback)
"""
sc = (doc.source_channel or "").lower()
if sc in ("news", "news_collector"):
@@ -139,23 +141,43 @@ def _match_subject_domain(doc: Document, text: str) -> str:
if sc == "memo":
return "generic"
# 제목 기반 매칭 — 우선순위 가장 높음. title 은 사용자 의도 시그널이라 정확.
title = (doc.title or "").lower()
def _title_hit(words: list[str]) -> bool:
return any(w.lower() in title for w in words if w)
if _title_hit(["산업안전보건법", "산안법", "중처법", "안전보건법", "시행규칙", "시행령"]):
return "safety_reference"
if _title_hit(["사고", "재해", "재발방지", "원인분석"]):
return "incident_report"
if _title_hit(["건강검진", "작업환경측정", "특수건강", "보건관리"]):
return "health_record"
if _title_hit(["밀폐공간", "추락", "끼임", "감전", "화재", "폭발", "화기작업"]):
return "hazard_specific"
if _title_hit(["msds", "sds", "물질안전보건자료", "화학물질"]):
return "msds"
if _title_hit(["위험성평가", "작업허가", "jsa", "sop", "ptw", "ppe", "보호구", "안전작업"]):
return "safety_operational"
# 본문 keyword — 2회 이상 등장해야 도메인 매칭 (single-mention 오분류 방지)
head = (text or "")[:8000].lower()
def _hit(keywords: list[str]) -> bool:
return any(kw.lower() in head for kw in keywords if kw)
def _body_count(keywords: list[str]) -> int:
return sum(head.count(kw.lower()) for kw in keywords if kw)
# 구체 먼저
if _hit(["MSDS", "SDS", "물질안전보건자료", "화학물질", "유해화학물질"]):
# 우선순위: 구체 먼저
if _body_count(["msds", "sds", "물질안전보건자료"]) >= 2:
return "msds"
if _hit(["사고보고", "재해조사", "원인분석", "재발방지", "중대재해"]):
if _body_count(["사고보고", "재해조사", "원인분석", "재발방지", "중대재해"]) >= 2:
return "incident_report"
if _hit(["건강검진", "작업환경측정", "보건관리", "특수건강진단"]):
if _body_count(["건강검진", "작업환경측정", "보건관리", "특수건강진단"]) >= 2:
return "health_record"
if _hit(["밀폐공간", "추락", "끼임", "감전", "화재", "폭발", "중독", "질식"]):
if _body_count(["밀폐공간", "추락", "끼임", "감전", "화재", "폭발", "중독", "질식"]) >= 2:
return "hazard_specific"
if _hit(["위험성평가", "작업허가서", "JSA", "안전작업지침", "보호구", "SOP"]):
if _body_count(["위험성평가", "작업허가서", "jsa", "sop", "보호구"]) >= 2:
return "safety_operational"
if _hit(["산업안전보건법", "중대재해", "안전보건관리체계", "유해위험방지계획서"]):
if _body_count(["산업안전보건법", "안전보건관리체계", "유해위험방지계획서"]) >= 2:
return "safety_reference"
if (doc.ai_domain or "").startswith("Industrial_Safety"):
@@ -218,8 +240,17 @@ def _classify_escalation_reason(
input_chars: int,
triage_limit: int,
confidence_floor: float,
routing_decision=None,
) -> str | None:
"""escalate_to_26b 가 True 이도록 만든 근본 사유 하나를 반환. 아니면 None."""
"""escalate_to_26b 가 True 이도록 만든 근본 사유 하나를 반환. 아니면 None.
우선순위 (높은 것부터 먼저 매칭):
1. long_context (입력 초과) — hard escalate
2. low_confidence (4B 자체 신뢰도 낮음) — hard escalate
3. self_declare (4B 가 자기 상한 선언) — soft
4. deep_requested (4B 가 recommend_deep_summary=true) — soft
5. PR-A routing policy (high_impact / risk_flag_requires_26b / multi_doc) — soft
"""
if input_chars > triage_limit:
return "long_context"
if triage_out.confidence < confidence_floor:
@@ -228,6 +259,19 @@ def _classify_escalation_reason(
return "self_declare"
if triage_out.recommend_deep_summary:
return "deep_requested"
# PR-A 정책 반영 — domain_policy.yaml 의 high_impact=true / default_risk_flags 의
# requires_26b=true 때문에 decide_routing 이 escalate_to_26b=True 로 판정하면
# 그 첫 reason 을 사용. safety_reference/msds/hazard_specific/incident_report 등
# safety/health 도메인은 여기서 escalate 됨.
if routing_decision and routing_decision.escalate_to_26b:
reasons = routing_decision.escalation_reasons
if reasons:
# high_impact 는 모든 safety 도메인에 공통이라 너무 흔함. 더 구체적인 사유를 우선.
for r in reasons:
if r != "high_impact":
return r
return reasons[0]
return "policy_escalate"
return None
@@ -367,6 +411,20 @@ async def _run_tier_triage(client: AIClient, doc: Document, session: AsyncSessio
# 입력이 triage 한도 초과면 호출 생략하고 long_context 로 escalate
if input_chars > TRIAGE_TEXT_LIMIT:
# routing_decision 은 long_context 경로에선 content_chars 로 바로 판정
rd = None
try:
rd = decide_routing(
subject_domain=subject_domain,
content_chars=input_chars,
deterministic_keyword_hits=(),
self_declared_high_impact=False,
self_declared_risk_flags=(),
confidence=0.3,
evidence_doc_count=0,
)
except Exception:
pass
await _apply_triage_result(
doc=doc,
session=session,
@@ -376,6 +434,7 @@ async def _run_tier_triage(client: AIClient, doc: Document, session: AsyncSessio
latency_ms=0,
escalation_reason="long_context",
parse_error=None,
routing_decision=rd,
)
return
@@ -411,48 +470,9 @@ async def _run_tier_triage(client: AIClient, doc: Document, session: AsyncSessio
"escalate_to_26b": True,
"confidence": 0.3,
})
escalation_reason: str | None = "triage_json_invalid"
else:
escalation_reason = _classify_escalation_reason(
triage_out,
input_chars=input_chars,
triage_limit=TRIAGE_TEXT_LIMIT,
confidence_floor=0.6,
)
await _apply_triage_result(
doc=doc,
session=session,
triage_out=triage_out,
subject_domain=subject_domain,
input_chars=input_chars,
latency_ms=latency_ms,
escalation_reason=escalation_reason,
parse_error=parse_error,
)
async def _apply_triage_result(
*,
doc: Document,
session: AsyncSession,
triage_out: TriageOutput,
subject_domain: str,
input_chars: int,
latency_ms: int,
escalation_reason: str | None,
parse_error: str | None,
) -> None:
"""TriageOutput → Document 필드 + R2 suppression + envelope enqueue + audit."""
document_id = doc.id
# Document 필드 (파싱 실패 시에는 legacy ai_summary 가 노출되도록 tldr 비움)
if not parse_error:
doc.ai_tldr = (triage_out.tldr or "").strip() or None
doc.ai_bullets = triage_out.bullets or []
doc.ai_analysis_tier = "triage"
# decide_routing — shadow observability 용 (INV 체크)
# decide_routing 먼저 계산 — _classify_escalation_reason 이 PR-A high_impact / risk_flag
# 결정을 존중하도록 routing_decision 을 전달.
routing_decision = None
try:
routing_decision = decide_routing(
@@ -467,6 +487,56 @@ async def _apply_triage_result(
except Exception as exc:
logger.warning(f"[triage] decide_routing 실패 id={document_id}: {exc}")
if parse_error:
escalation_reason: str | None = "triage_json_invalid"
else:
escalation_reason = _classify_escalation_reason(
triage_out,
input_chars=input_chars,
triage_limit=TRIAGE_TEXT_LIMIT,
confidence_floor=0.6,
routing_decision=routing_decision,
)
await _apply_triage_result(
doc=doc,
session=session,
triage_out=triage_out,
subject_domain=subject_domain,
input_chars=input_chars,
latency_ms=latency_ms,
escalation_reason=escalation_reason,
parse_error=parse_error,
routing_decision=routing_decision,
)
async def _apply_triage_result(
*,
doc: Document,
session: AsyncSession,
triage_out: TriageOutput,
subject_domain: str,
input_chars: int,
latency_ms: int,
escalation_reason: str | None,
parse_error: str | None,
routing_decision=None,
) -> None:
"""TriageOutput → Document 필드 + R2 suppression + envelope enqueue + audit.
routing_decision 은 호출자(_run_tier_triage) 가 이미 계산해 전달. 여기서 다시
계산하지 않는다 (escalation_reason 판정이 routing_decision 결과에 의존하므로
양쪽이 같은 값을 봐야 함).
"""
document_id = doc.id
# Document 필드 (파싱 실패 시에는 legacy ai_summary 가 노출되도록 tldr 비움)
if not parse_error:
doc.ai_tldr = (triage_out.tldr or "").strip() or None
doc.ai_bullets = triage_out.bullets or []
doc.ai_analysis_tier = "triage"
# R2 — backlog guard (hard 제외 soft escalate 만 억제)
suppressed_reason: str | None = None
escalate = escalation_reason is not None