diff --git a/app/workers/classify_worker.py b/app/workers/classify_worker.py index f8be607..1533b02 100644 --- a/app/workers/classify_worker.py +++ b/app/workers/classify_worker.py @@ -123,13 +123,15 @@ def _validate_domain(domain: str) -> str: # ───────────────────────── B-1 subject_domain 매칭 ───────────────────── def _match_subject_domain(doc: Document, text: str) -> str: - """본문/메타 기반으로 domain_policy.yaml 의 subject_domain 이름 결정. + """본문/메타/제목 기반으로 domain_policy.yaml 의 subject_domain 이름 결정. 매칭 우선순위 (feedback_category_vs_ai_domain_axis.md — category 매칭 금지): 1. source_channel (news / law_monitor / memo) - 2. 본문 keywords (첫 8k, 대소문자 무시) - 3. ai_domain prefix — legacy classify 가 채운 taxonomy path - 4. generic (fallback) + 2. **제목(title) 기반 매칭** — 본문에 키워드가 한두 번 스치는 문서까지 잘못 잡지 않도록 + 제목의 의도를 우선 (예: PPE 선정기준.md 가 본문 MSDS 언급 한 번으로 msds 분류되는 문제 방지) + 3. 본문 keywords (첫 8k, 대소문자 무시, 키워드 **2회 이상**) + 4. ai_domain prefix — legacy classify 가 채운 taxonomy path + 5. generic (fallback) """ sc = (doc.source_channel or "").lower() if sc in ("news", "news_collector"): @@ -139,23 +141,43 @@ def _match_subject_domain(doc: Document, text: str) -> str: if sc == "memo": return "generic" + # 제목 기반 매칭 — 우선순위 가장 높음. title 은 사용자 의도 시그널이라 정확. + title = (doc.title or "").lower() + + def _title_hit(words: list[str]) -> bool: + return any(w.lower() in title for w in words if w) + + if _title_hit(["산업안전보건법", "산안법", "중처법", "안전보건법", "시행규칙", "시행령"]): + return "safety_reference" + if _title_hit(["사고", "재해", "재발방지", "원인분석"]): + return "incident_report" + if _title_hit(["건강검진", "작업환경측정", "특수건강", "보건관리"]): + return "health_record" + if _title_hit(["밀폐공간", "추락", "끼임", "감전", "화재", "폭발", "화기작업"]): + return "hazard_specific" + if _title_hit(["msds", "sds", "물질안전보건자료", "화학물질"]): + return "msds" + if _title_hit(["위험성평가", "작업허가", "jsa", "sop", "ptw", "ppe", "보호구", "안전작업"]): + return "safety_operational" + + # 본문 keyword — 2회 이상 등장해야 도메인 매칭 (single-mention 오분류 방지) head = (text or "")[:8000].lower() - def _hit(keywords: list[str]) -> bool: - return any(kw.lower() in head for kw in keywords if kw) + def _body_count(keywords: list[str]) -> int: + return sum(head.count(kw.lower()) for kw in keywords if kw) - # 구체 먼저 - if _hit(["MSDS", "SDS", "물질안전보건자료", "화학물질", "유해화학물질"]): + # 우선순위: 구체 먼저 + if _body_count(["msds", "sds", "물질안전보건자료"]) >= 2: return "msds" - if _hit(["사고보고", "재해조사", "원인분석", "재발방지", "중대재해"]): + if _body_count(["사고보고", "재해조사", "원인분석", "재발방지", "중대재해"]) >= 2: return "incident_report" - if _hit(["건강검진", "작업환경측정", "보건관리", "특수건강진단"]): + if _body_count(["건강검진", "작업환경측정", "보건관리", "특수건강진단"]) >= 2: return "health_record" - if _hit(["밀폐공간", "추락", "끼임", "감전", "화재", "폭발", "중독", "질식"]): + if _body_count(["밀폐공간", "추락", "끼임", "감전", "화재", "폭발", "중독", "질식"]) >= 2: return "hazard_specific" - if _hit(["위험성평가", "작업허가서", "JSA", "안전작업지침", "보호구", "SOP"]): + if _body_count(["위험성평가", "작업허가서", "jsa", "sop", "보호구"]) >= 2: return "safety_operational" - if _hit(["산업안전보건법", "중대재해", "안전보건관리체계", "유해위험방지계획서"]): + if _body_count(["산업안전보건법", "안전보건관리체계", "유해위험방지계획서"]) >= 2: return "safety_reference" if (doc.ai_domain or "").startswith("Industrial_Safety"): @@ -218,8 +240,17 @@ def _classify_escalation_reason( input_chars: int, triage_limit: int, confidence_floor: float, + routing_decision=None, ) -> str | None: - """escalate_to_26b 가 True 이도록 만든 근본 사유 하나를 반환. 아니면 None.""" + """escalate_to_26b 가 True 이도록 만든 근본 사유 하나를 반환. 아니면 None. + + 우선순위 (높은 것부터 먼저 매칭): + 1. long_context (입력 초과) — hard escalate + 2. low_confidence (4B 자체 신뢰도 낮음) — hard escalate + 3. self_declare (4B 가 자기 상한 선언) — soft + 4. deep_requested (4B 가 recommend_deep_summary=true) — soft + 5. PR-A routing policy (high_impact / risk_flag_requires_26b / multi_doc) — soft + """ if input_chars > triage_limit: return "long_context" if triage_out.confidence < confidence_floor: @@ -228,6 +259,19 @@ def _classify_escalation_reason( return "self_declare" if triage_out.recommend_deep_summary: return "deep_requested" + # PR-A 정책 반영 — domain_policy.yaml 의 high_impact=true / default_risk_flags 의 + # requires_26b=true 때문에 decide_routing 이 escalate_to_26b=True 로 판정하면 + # 그 첫 reason 을 사용. safety_reference/msds/hazard_specific/incident_report 등 + # safety/health 도메인은 여기서 escalate 됨. + if routing_decision and routing_decision.escalate_to_26b: + reasons = routing_decision.escalation_reasons + if reasons: + # high_impact 는 모든 safety 도메인에 공통이라 너무 흔함. 더 구체적인 사유를 우선. + for r in reasons: + if r != "high_impact": + return r + return reasons[0] + return "policy_escalate" return None @@ -367,6 +411,20 @@ async def _run_tier_triage(client: AIClient, doc: Document, session: AsyncSessio # 입력이 triage 한도 초과면 호출 생략하고 long_context 로 escalate if input_chars > TRIAGE_TEXT_LIMIT: + # routing_decision 은 long_context 경로에선 content_chars 로 바로 판정 + rd = None + try: + rd = decide_routing( + subject_domain=subject_domain, + content_chars=input_chars, + deterministic_keyword_hits=(), + self_declared_high_impact=False, + self_declared_risk_flags=(), + confidence=0.3, + evidence_doc_count=0, + ) + except Exception: + pass await _apply_triage_result( doc=doc, session=session, @@ -376,6 +434,7 @@ async def _run_tier_triage(client: AIClient, doc: Document, session: AsyncSessio latency_ms=0, escalation_reason="long_context", parse_error=None, + routing_decision=rd, ) return @@ -411,48 +470,9 @@ async def _run_tier_triage(client: AIClient, doc: Document, session: AsyncSessio "escalate_to_26b": True, "confidence": 0.3, }) - escalation_reason: str | None = "triage_json_invalid" - else: - escalation_reason = _classify_escalation_reason( - triage_out, - input_chars=input_chars, - triage_limit=TRIAGE_TEXT_LIMIT, - confidence_floor=0.6, - ) - await _apply_triage_result( - doc=doc, - session=session, - triage_out=triage_out, - subject_domain=subject_domain, - input_chars=input_chars, - latency_ms=latency_ms, - escalation_reason=escalation_reason, - parse_error=parse_error, - ) - - -async def _apply_triage_result( - *, - doc: Document, - session: AsyncSession, - triage_out: TriageOutput, - subject_domain: str, - input_chars: int, - latency_ms: int, - escalation_reason: str | None, - parse_error: str | None, -) -> None: - """TriageOutput → Document 필드 + R2 suppression + envelope enqueue + audit.""" - document_id = doc.id - - # Document 필드 (파싱 실패 시에는 legacy ai_summary 가 노출되도록 tldr 비움) - if not parse_error: - doc.ai_tldr = (triage_out.tldr or "").strip() or None - doc.ai_bullets = triage_out.bullets or [] - doc.ai_analysis_tier = "triage" - - # decide_routing — shadow observability 용 (INV 체크) + # decide_routing 먼저 계산 — _classify_escalation_reason 이 PR-A high_impact / risk_flag + # 결정을 존중하도록 routing_decision 을 전달. routing_decision = None try: routing_decision = decide_routing( @@ -467,6 +487,56 @@ async def _apply_triage_result( except Exception as exc: logger.warning(f"[triage] decide_routing 실패 id={document_id}: {exc}") + if parse_error: + escalation_reason: str | None = "triage_json_invalid" + else: + escalation_reason = _classify_escalation_reason( + triage_out, + input_chars=input_chars, + triage_limit=TRIAGE_TEXT_LIMIT, + confidence_floor=0.6, + routing_decision=routing_decision, + ) + + await _apply_triage_result( + doc=doc, + session=session, + triage_out=triage_out, + subject_domain=subject_domain, + input_chars=input_chars, + latency_ms=latency_ms, + escalation_reason=escalation_reason, + parse_error=parse_error, + routing_decision=routing_decision, + ) + + +async def _apply_triage_result( + *, + doc: Document, + session: AsyncSession, + triage_out: TriageOutput, + subject_domain: str, + input_chars: int, + latency_ms: int, + escalation_reason: str | None, + parse_error: str | None, + routing_decision=None, +) -> None: + """TriageOutput → Document 필드 + R2 suppression + envelope enqueue + audit. + + routing_decision 은 호출자(_run_tier_triage) 가 이미 계산해 전달. 여기서 다시 + 계산하지 않는다 (escalation_reason 판정이 routing_decision 결과에 의존하므로 + 양쪽이 같은 값을 봐야 함). + """ + document_id = doc.id + + # Document 필드 (파싱 실패 시에는 legacy ai_summary 가 노출되도록 tldr 비움) + if not parse_error: + doc.ai_tldr = (triage_out.tldr or "").strip() or None + doc.ai_bullets = triage_out.bullets or [] + doc.ai_analysis_tier = "triage" + # R2 — backlog guard (hard 제외 soft escalate 만 억제) suppressed_reason: str | None = None escalate = escalation_reason is not None