"""AI 분류 워커 — 도메인/문서타입/태그/요약 생성 + PR-B B-1 tier triage. Legacy 경로 (primary 26B 호출): - app/prompts/classify.txt 기반 classify() - AIClient.summarize() 로 ai_summary 생성 - facet_doctype + ai_suggestion (library 제안) 저장 → ai_domain / ai_sub_group / document_type / ai_confidence / ai_tags / ai_summary / ai_suggestion / facet_doctype / importance 필드 PR-B B-1 tier triage (신규, 4B gemma Ollama): - policy.routing.decide_routing 으로 RoutingDecision - policy.prompt_render.render_4b("p3a_short_summary", subject_domain) 로 프롬프트 렌더 - AIClient.call_triage(rendered) 호출 (llm_gate 외부, Ollama concurrent OK) - TriageOutput pydantic validate + JSON 깨짐 시 fallback escalate (R1) - R2 backlog guard: deep_summary 큐 ratio > threshold or pending >= threshold 이면 suppress - R3 head/middle/tail: 260k 초과 시 envelope text_ranges 3조각 → ai_tldr / ai_bullets / ai_analysis_tier='triage' 저장 + analyze_events 기록 → escalate 시 EscalationEnvelope payload 로 deep_summary 큐 enqueue """ from __future__ import annotations import json import time from datetime import datetime, timezone from pydantic import BaseModel, Field, ValidationError from sqlalchemy import text as sql_text from sqlalchemy.ext.asyncio import AsyncSession from ai.client import AIClient, parse_json_response, strip_thinking from ai.envelope import EscalationEnvelope from core.config import settings from core.utils import setup_logger from models.document import Document from models.queue import enqueue_stage from policy.prompt_render import render_4b, policy_version as compute_policy_version from policy.routing import decide_routing from services.document_telemetry import record_analyze_event logger = setup_logger("classify_worker") MAX_CLASSIFY_TEXT = 8000 # legacy classify 입력 상한 TRIAGE_TEXT_LIMIT = 120_000 # p3a summary_triage 입력 상한 (4B) DEEP_PRIMARY_LIMIT = 260_000 # primary context_char_limit (envelope slicing) # settings에서 taxonomy/document_types 로딩 DOCUMENT_TYPES = set(settings.document_types) # facet_doctype 허용값 (실무 문서 유형 — AI 식별 신호, library 자동 분류 제안 트리거) FACET_DOCTYPES = {"발주서", "세금계산서", "명세표", "도면", "증명서", "계획서", "시방서"} # 자료실 자동 분류 제안 대상 (거래 하위) LIBRARY_SUGGESTION_DOCTYPES = {"발주서", "세금계산서", "명세표"} # PR-B prompt_version task 이름 SUMMARY_TRIAGE_TASK = "p3a_short_summary" # R2 — hard escalate 사유 (suppress 되지 않는 critical reason). # 'risk_flag_requires_26b' 는 PR-A domain_policy 가 "이 문서 유형은 26B 필수" 로 지정한 # 신호 (safety_legal_interpretation / chemical_hazard / incident_causation 등). backlog # 가 많아도 억제하지 말 것 — safety 도메인 전수 coverage 보장. HARD_ESCALATE_REASONS = { "long_context", "triage_json_invalid", "low_confidence", "risk_flag_requires_26b", } # ───────────────────────── TriageOutput (R1) ───────────────────────── class TriageOutput(BaseModel): """p3a_short_summary (4B) 응답 스키마. 파싱 실패 시 기본값 + escalate=True fallback.""" tldr: str = "" bullets: list[str] = Field(default_factory=list) tags: list[str] = Field(default_factory=list) doc_type: str = "" time_scope: str | None = None confidence: float = 0.3 high_impact_self_declared: bool = False high_impact_reason: str | None = None recommend_deep_summary: bool = False recommend_entity_pass: bool = False escalate_to_26b: bool = False risk_flags: list[str] = Field(default_factory=list) # ───────────────────────── legacy classify (primary) ────────────────── def _get_taxonomy_leaf_paths(taxonomy: dict, prefix: str = "") -> set[str]: """taxonomy dict에서 모든 유효한 경로를 추출""" paths = set() for key, value in taxonomy.items(): current = f"{prefix}/{key}" if prefix else key if isinstance(value, dict): if not value: paths.add(current) else: paths.update(_get_taxonomy_leaf_paths(value, current)) elif isinstance(value, list): if not value: paths.add(current) else: for leaf in value: paths.add(f"{current}/{leaf}") paths.add(current) # 2단계도 허용 (leaf가 없는 경우용) else: paths.add(current) return paths VALID_DOMAIN_PATHS = _get_taxonomy_leaf_paths(settings.taxonomy) def _validate_domain(domain: str) -> str: """domain이 taxonomy에 존재하는지 검증, 없으면 최대한 가까운 경로 찾기""" if domain in VALID_DOMAIN_PATHS: return domain parts = domain.split("/") for i in range(len(parts), 0, -1): partial = "/".join(parts[:i]) if partial in VALID_DOMAIN_PATHS: logger.warning(f"[분류] domain '{domain}' → '{partial}' (부분 매칭)") return partial logger.warning(f"[분류] domain '{domain}' taxonomy에 없음, General/Reading_Notes로 대체") return "General/Reading_Notes" # ───────────────────────── B-1 subject_domain 매칭 ───────────────────── def _match_subject_domain(doc: Document, text: str) -> str: """본문/메타/제목 기반으로 domain_policy.yaml 의 subject_domain 이름 결정. 매칭 우선순위 (feedback_category_vs_ai_domain_axis.md — category 매칭 금지): 1. source_channel (news / law_monitor / memo) 2. **제목(title) 기반 매칭** — 본문에 키워드가 한두 번 스치는 문서까지 잘못 잡지 않도록 제목의 의도를 우선 (예: PPE 선정기준.md 가 본문 MSDS 언급 한 번으로 msds 분류되는 문제 방지) 3. 본문 keywords (첫 8k, 대소문자 무시, 키워드 **2회 이상**) 4. ai_domain prefix — legacy classify 가 채운 taxonomy path 5. generic (fallback) """ sc = (doc.source_channel or "").lower() if sc in ("news", "news_collector"): return "news_item" if sc == "law_monitor": return "safety_reference" if sc == "memo": return "generic" # 제목 기반 매칭 — 우선순위 가장 높음. title 은 사용자 의도 시그널이라 정확. title = (doc.title or "").lower() def _title_hit(words: list[str]) -> bool: return any(w.lower() in title for w in words if w) if _title_hit(["산업안전보건법", "산안법", "중처법", "안전보건법", "시행규칙", "시행령"]): return "safety_reference" if _title_hit(["사고", "재해", "재발방지", "원인분석"]): return "incident_report" if _title_hit(["건강검진", "작업환경측정", "특수건강", "보건관리"]): return "health_record" if _title_hit(["밀폐공간", "추락", "끼임", "감전", "화재", "폭발", "화기작업"]): return "hazard_specific" if _title_hit(["msds", "sds", "물질안전보건자료", "화학물질"]): return "msds" if _title_hit(["위험성평가", "작업허가", "jsa", "sop", "ptw", "ppe", "보호구", "안전작업"]): return "safety_operational" # 본문 keyword — 2회 이상 등장해야 도메인 매칭 (single-mention 오분류 방지) head = (text or "")[:8000].lower() def _body_count(keywords: list[str]) -> int: return sum(head.count(kw.lower()) for kw in keywords if kw) # 우선순위: 구체 먼저 if _body_count(["msds", "sds", "물질안전보건자료"]) >= 2: return "msds" if _body_count(["사고보고", "재해조사", "원인분석", "재발방지", "중대재해"]) >= 2: return "incident_report" if _body_count(["건강검진", "작업환경측정", "보건관리", "특수건강진단"]) >= 2: return "health_record" if _body_count(["밀폐공간", "추락", "끼임", "감전", "화재", "폭발", "중독", "질식"]) >= 2: return "hazard_specific" if _body_count(["위험성평가", "작업허가서", "jsa", "sop", "보호구"]) >= 2: return "safety_operational" if _body_count(["산업안전보건법", "안전보건관리체계", "유해위험방지계획서"]) >= 2: return "safety_reference" if (doc.ai_domain or "").startswith("Industrial_Safety"): return "safety_reference" return "generic" def _slice_text_ranges(doc_id: int, total_len: int) -> dict: """R3 — 26B 에 전달할 원문 슬라이스. total_len ≤ primary limit 이면 단일 range, 초과 시 head/mid/tail 3조각.""" if total_len <= DEEP_PRIMARY_LIMIT: return { "doc_ids": [doc_id], "text_ranges": [{"doc_id": doc_id, "start": 0, "end": total_len}], } head_end = 120_000 mid_start = max(head_end, total_len // 2 - 10_000) mid_end = mid_start + 20_000 tail_start = max(mid_end, total_len - 120_000) return { "doc_ids": [doc_id], "text_ranges": [ {"doc_id": doc_id, "start": 0, "end": head_end}, {"doc_id": doc_id, "start": mid_start, "end": mid_end}, {"doc_id": doc_id, "start": tail_start, "end": total_len}, ], } async def _check_backlog_guard(session: AsyncSession) -> str | None: """R2 — deep_summary 큐 적체 시 soft escalate 억제. 억제 사유 문자열 반환, 아니면 None.""" backlog = settings.ai.deep_summary_backlog window = backlog.window_minutes ratio_threshold = backlog.ratio_threshold pending_threshold = backlog.pending_threshold row = (await session.execute(sql_text(""" SELECT COUNT(*) FILTER (WHERE stage = 'classify') AS classify_n, COUNT(*) FILTER (WHERE stage = 'deep_summary') AS deep_n FROM processing_queue WHERE created_at > NOW() - make_interval(mins => :w) """), {"w": window})).one() classify_n = row.classify_n or 0 deep_n = row.deep_n or 0 ratio = (deep_n / classify_n) if classify_n > 0 else 0.0 pending_deep = int(await session.scalar(sql_text(""" SELECT COUNT(*) FROM processing_queue WHERE stage = 'deep_summary' AND status IN ('pending', 'processing') """)) or 0) if ratio > ratio_threshold or pending_deep > pending_threshold: return f"backlog_guard(ratio={ratio:.2f},pending={pending_deep})" return None def _classify_escalation_reason( triage_out: TriageOutput, input_chars: int, triage_limit: int, confidence_floor: float, routing_decision=None, ) -> str | None: """escalate_to_26b 가 True 이도록 만든 근본 사유 하나를 반환. 아니면 None. 우선순위 (높은 것부터 먼저 매칭): 1. long_context (입력 초과) — hard escalate 2. low_confidence (4B 자체 신뢰도 낮음) — hard escalate 3. self_declare (4B 가 자기 상한 선언) — soft 4. deep_requested (4B 가 recommend_deep_summary=true) — soft 5. PR-A routing policy (high_impact / risk_flag_requires_26b / multi_doc) — soft """ if input_chars > triage_limit: return "long_context" if triage_out.confidence < confidence_floor: return "low_confidence" if triage_out.escalate_to_26b: return "self_declare" if triage_out.recommend_deep_summary: return "deep_requested" # PR-A 정책 반영 — domain_policy.yaml 의 high_impact=true / default_risk_flags 의 # requires_26b=true 때문에 decide_routing 이 escalate_to_26b=True 로 판정하면 # 그 첫 reason 을 사용. safety_reference/msds/hazard_specific/incident_report 등 # safety/health 도메인은 여기서 escalate 됨. if routing_decision and routing_decision.escalate_to_26b: reasons = routing_decision.escalation_reasons if reasons: # high_impact 는 모든 safety 도메인에 공통이라 너무 흔함. 더 구체적인 사유를 우선. for r in reasons: if r != "high_impact": return r return reasons[0] return "policy_escalate" return None def _is_hard_escalate(reason: str | None) -> bool: """R2 — hard escalate 는 suppress 되지 않는다.""" return reason in HARD_ESCALATE_REASONS def _distill(triage_out: TriageOutput, limit: int = 2000) -> str: """26B envelope.distilled_context 용 4B 출력 압축 텍스트.""" parts: list[str] = [] if triage_out.tldr: parts.append(f"TL;DR: {triage_out.tldr}") if triage_out.bullets: parts.append("핵심:") parts.extend(f"- {b}" for b in triage_out.bullets) if triage_out.doc_type: parts.append(f"doc_type: {triage_out.doc_type}") if triage_out.tags: parts.append(f"tags: {', '.join(triage_out.tags)}") return "\n".join(parts)[:limit] # ───────────────────────── main process ──────────────────────────────── async def process(document_id: int, session: AsyncSession) -> None: """문서 분류 + 요약 + tier triage. 1) Legacy: classify() → ai_domain/document_type/ai_tags/ai_confidence/ai_suggestion 2) Legacy: summarize() → ai_summary 3) PR-B B-1: summary_triage (4B) → ai_tldr/ai_bullets/ai_analysis_tier='triage' """ doc = await session.get(Document, document_id) if not doc: raise ValueError(f"문서 ID {document_id}를 찾을 수 없음") if not doc.extracted_text: raise ValueError(f"문서 ID {document_id}: extracted_text가 비어있음") client = AIClient() try: # ─── 1. Legacy classify (primary 26B) ─── truncated = doc.extracted_text[:MAX_CLASSIFY_TEXT] raw_response = await client.classify(truncated) parsed = parse_json_response(raw_response) if not parsed: raise ValueError(f"AI 응답에서 JSON 추출 실패: {raw_response[:200]}") # domain 검증 domain = _validate_domain(parsed.get("domain", "")) doc.ai_domain = domain # sub_group은 domain 경로에서 추출 (호환성) parts = domain.split("/") doc.ai_sub_group = parts[1] if len(parts) > 1 else "" # document_type 검증 doc_type = parsed.get("document_type", "") doc.document_type = doc_type if doc_type in DOCUMENT_TYPES else "Note" # confidence confidence = parsed.get("confidence", 0.5) doc.ai_confidence = max(0.0, min(1.0, float(confidence))) # importance importance = parsed.get("importance", "medium") doc.importance = importance if importance in ("high", "medium", "low") else "medium" # tags doc.ai_tags = parsed.get("tags", [])[:5] # source/origin if parsed.get("sourceChannel") and not doc.source_channel: doc.source_channel = parsed["sourceChannel"] if parsed.get("dataOrigin") and not doc.data_origin: doc.data_origin = parsed["dataOrigin"] # 용도 (AI는 빈 값만 채움 — 수동/업로드 명시값 우선) if parsed.get("docPurpose") and not doc.doc_purpose: purpose = parsed["docPurpose"] if purpose in ("business", "knowledge"): doc.doc_purpose = purpose # ─── facet_doctype 식별 (§1 실무 문서 유형 신호) ─── ai_doctype_raw = parsed.get("facet_doctype") ai_doctype = ai_doctype_raw if ai_doctype_raw in FACET_DOCTYPES else None if ai_doctype and not doc.facet_doctype: doc.facet_doctype = ai_doctype # ─── ai_suggestion 저장 (자료실 승인 대기함 제안, §1) ─── if ai_doctype in LIBRARY_SUGGESTION_DOCTYPES: year = doc.facet_year or datetime.now(timezone.utc).year doc.ai_suggestion = { "proposed_category": "library", "proposed_path": f"@library/거래/{year}/{ai_doctype}", "proposed_doctype": ai_doctype, "confidence": doc.ai_confidence, "source_updated_at": ( doc.updated_at.isoformat() if doc.updated_at else None ), "reason": "classify pipeline", } # ─── 2. Legacy 요약 (primary 26B) ─── summary = await client.summarize(doc.extracted_text[:50000]) doc.ai_summary = strip_thinking(summary) # ─── 메타데이터 (legacy 완료) ─── doc.ai_model_version = settings.ai.primary.model doc.ai_processed_at = datetime.now(timezone.utc) logger.info( f"[분류] document_id={document_id}: " f"domain={domain}, type={doc.document_type}, " f"confidence={doc.ai_confidence:.2f}, tags={doc.ai_tags}" ) # ─── 3. PR-B B-1 — tier triage (4B, 실패는 legacy 결과 보존) ─── try: await _run_tier_triage(client, doc, session) except Exception as exc: logger.exception(f"[triage] id={document_id} 전체 실패 — legacy 유지: {exc}") finally: await client.close() async def _run_tier_triage(client: AIClient, doc: Document, session: AsyncSession) -> None: """summary_triage (p3a_short_summary) 경로.""" document_id = doc.id text = doc.extracted_text or "" input_chars = len(text) subject_domain = _match_subject_domain(doc, text) triage_start = time.perf_counter() parse_error: str | None = None triage_out = TriageOutput() # 입력이 triage 한도 초과면 호출 생략하고 long_context 로 escalate if input_chars > TRIAGE_TEXT_LIMIT: # routing_decision 은 long_context 경로에선 content_chars 로 바로 판정 rd = None try: rd = decide_routing( subject_domain=subject_domain, content_chars=input_chars, deterministic_keyword_hits=(), self_declared_high_impact=False, self_declared_risk_flags=(), confidence=0.3, evidence_doc_count=0, ) except Exception: pass await _apply_triage_result( doc=doc, session=session, triage_out=triage_out, subject_domain=subject_domain, input_chars=input_chars, latency_ms=0, escalation_reason="long_context", parse_error=None, routing_decision=rd, ) return try: rendered = render_4b(SUMMARY_TRIAGE_TASK, subject_domain) except Exception as exc: logger.exception(f"[triage] render_4b 실패 subject={subject_domain}: {exc}") return # 템플릿의 {extracted_text} 는 단일중괄호로 format 후 남아있음. str.replace 로 주입. prompt = rendered.replace("{extracted_text}", text[:TRIAGE_TEXT_LIMIT]) try: raw_triage = await client.call_triage(prompt) except Exception as exc: logger.warning(f"[triage] 4B 호출 실패 id={document_id}: {exc}") parse_error = "call_failed" raw_triage = "" latency_ms = int((time.perf_counter() - triage_start) * 1000) # R1 — JSON parsing + schema validation + fallback if raw_triage: try: parsed_triage = parse_json_response(raw_triage) or {} triage_out = TriageOutput.model_validate(parsed_triage) except (ValidationError, ValueError, TypeError) as exc: parse_error = f"parse:{type(exc).__name__}" logger.warning(f"[triage] JSON 파싱/검증 실패 → fallback: {exc}") if parse_error: triage_out = triage_out.model_copy(update={ "escalate_to_26b": True, "confidence": 0.3, }) # decide_routing 먼저 계산 — _classify_escalation_reason 이 PR-A high_impact / risk_flag # 결정을 존중하도록 routing_decision 을 전달. routing_decision = None try: routing_decision = decide_routing( subject_domain=subject_domain, content_chars=input_chars, deterministic_keyword_hits=(), self_declared_high_impact=triage_out.high_impact_self_declared, self_declared_risk_flags=tuple(triage_out.risk_flags), confidence=triage_out.confidence, evidence_doc_count=0, ) except Exception as exc: logger.warning(f"[triage] decide_routing 실패 id={document_id}: {exc}") if parse_error: escalation_reason: str | None = "triage_json_invalid" else: escalation_reason = _classify_escalation_reason( triage_out, input_chars=input_chars, triage_limit=TRIAGE_TEXT_LIMIT, confidence_floor=0.6, routing_decision=routing_decision, ) await _apply_triage_result( doc=doc, session=session, triage_out=triage_out, subject_domain=subject_domain, input_chars=input_chars, latency_ms=latency_ms, escalation_reason=escalation_reason, parse_error=parse_error, routing_decision=routing_decision, ) async def _apply_triage_result( *, doc: Document, session: AsyncSession, triage_out: TriageOutput, subject_domain: str, input_chars: int, latency_ms: int, escalation_reason: str | None, parse_error: str | None, routing_decision=None, ) -> None: """TriageOutput → Document 필드 + R2 suppression + envelope enqueue + audit. routing_decision 은 호출자(_run_tier_triage) 가 이미 계산해 전달. 여기서 다시 계산하지 않는다 (escalation_reason 판정이 routing_decision 결과에 의존하므로 양쪽이 같은 값을 봐야 함). """ document_id = doc.id # Document 필드 (파싱 실패 시에는 legacy ai_summary 가 노출되도록 tldr 비움) if not parse_error: doc.ai_tldr = (triage_out.tldr or "").strip() or None doc.ai_bullets = triage_out.bullets or [] doc.ai_analysis_tier = "triage" # R2 — backlog guard (hard 제외 soft escalate 만 억제) suppressed_reason: str | None = None escalate = escalation_reason is not None if escalate and not _is_hard_escalate(escalation_reason): suppressed_reason = await _check_backlog_guard(session) if suppressed_reason: escalate = False escalation_reason = None # enqueue deep_summary if escalate: try: # PR-A envelope.ValidFromStage 기준 — P3a 에서 에스컬레이션은 'summarize_short'. # 내부 task 이름 'summary_triage' 는 analyze_events.prompt_version 에만 쓰고, # envelope.from_stage 는 PR-A 가 정의한 enum 값을 따른다. envelope = EscalationEnvelope( from_stage="summarize_short", escalation_reasons=tuple([escalation_reason] if escalation_reason else []), risk_flags=tuple(routing_decision.risk_flags) if routing_decision else tuple(triage_out.risk_flags), distilled_context=_distill(triage_out), original_pointers=_slice_text_ranges(document_id, input_chars), synthesis_directives=( tuple(routing_decision.synthesis_directives) if routing_decision else () ), user_intent=None, draft_hint=(triage_out.tldr or None), ) await enqueue_stage( session, document_id, "deep_summary", payload={ "envelope": json.loads(envelope.to_json()), "subject_domain": subject_domain, }, ) except Exception as exc: logger.exception(f"[triage] envelope enqueue 실패 id={document_id}: {exc}") # analyze_events try: pv = compute_policy_version(SUMMARY_TRIAGE_TASK) except Exception: pv = None escalation_reasons_list = ( list(routing_decision.escalation_reasons) if routing_decision else ([escalation_reason] if escalation_reason else []) ) shadow_target = ( "primary" if (routing_decision and routing_decision.escalate_to_26b) else "triage" ) await record_analyze_event( doc_id=document_id, user_id=None, mode="summary_triage", text_limit=TRIAGE_TEXT_LIMIT, truncated=input_chars > TRIAGE_TEXT_LIMIT, layers_returned=["tldr", "bullets"] if not parse_error else [], cached=False, latency_ms=latency_ms, model_name=settings.ai.triage.model, prompt_version=(f"{SUMMARY_TRIAGE_TASK}@{pv}" if pv else SUMMARY_TRIAGE_TASK), error_code=parse_error, source="document_server", subject_domain=subject_domain, risk_flags=(list(routing_decision.risk_flags) if routing_decision else list(triage_out.risk_flags)), high_impact_task=(routing_decision.high_impact_task if routing_decision else None), escalation_reasons=escalation_reasons_list, confidence=triage_out.confidence, policy_version=pv, shadow_would_route_to=shadow_target, tier="triage", escalated_to_26b=escalate, suppressed_reason=suppressed_reason, ) logger.info( f"[triage] id={document_id} subject={subject_domain} " f"escalate={escalate} reason={escalation_reason} " f"suppressed={bool(suppressed_reason)} " f"confidence={triage_out.confidence:.2f} " f"tldr_len={len(doc.ai_tldr or '')} bullets={len(doc.ai_bullets or [])}" )