Files
hyungi_document_server/app/workers/classify_worker.py
T
hyungi 8998cbea8c ops(triage): PR-4B-Diagnose — exception logging 강화 (type/repr/exc_info)
Layer 1 root cause 진단을 위해 classify_worker.py:595 의 exception logging
을 lazy formatting + exc_info=True 로 강화. f-string 1줄 → 5줄 block.
- type=%s: exception class name (TimeoutError/JSONDecodeError/ValueError/etc.)
- repr=%r: full exception state
- exc_info=True: traceback 까지 capture (wrapper 정확 지점 추적)

본 PR scope = Diagnose only. Layer 1 specific fix (H1/H2/H3/H4) + Layer 2
escalate path ai_event_kind fallback set 은 별 PR queue.

plan: ~/.claude/plans/c-1-pr-infra-drift-1-phase-1b-linear-frost.md
backup: app/workers/classify_worker.py.pre-4b-diagnose.20260517
2026-05-17 06:22:27 +00:00

787 lines
33 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""AI 분류 워커 — 도메인/문서타입/태그/요약 생성 + PR-B B-1 tier triage.
Legacy 경로 (primary 26B 호출):
- app/prompts/classify.txt 기반 classify()
- AIClient.summarize() 로 ai_summary 생성
- facet_doctype + ai_suggestion (library 제안) 저장
→ ai_domain / ai_sub_group / document_type / ai_confidence / ai_tags /
ai_summary / ai_suggestion / facet_doctype / importance 필드
PR-B B-1 tier triage (Mac mini 26B MLX, config.yaml ai.models.triage):
- policy.routing.decide_routing 으로 RoutingDecision
- policy.prompt_render.render_4b("p3a_short_summary", subject_domain) 로 프롬프트 렌더
- AIClient.call_triage(rendered) 호출 (llm_gate 외부, Mac mini 26B MLX — concurrent 안전성 별 검토)
- TriageOutput pydantic validate + JSON 깨짐 시 fallback escalate (R1)
- R2 backlog guard: deep_summary 큐 ratio > threshold or pending >= threshold 이면 suppress
- R3 head/middle/tail: 260k 초과 시 envelope text_ranges 3조각
→ ai_tldr / ai_bullets / ai_analysis_tier='triage' 저장 + analyze_events 기록
→ escalate 시 EscalationEnvelope payload 로 deep_summary 큐 enqueue
"""
from __future__ import annotations

import json
import re
import time
from datetime import date, datetime, timezone

import yaml
from pydantic import BaseModel, Field, ValidationError
from sqlalchemy import text as sql_text
from sqlalchemy.ext.asyncio import AsyncSession

from ai.client import AIClient, parse_json_response, strip_thinking
from ai.envelope import EscalationEnvelope
from core.config import settings
from core.utils import setup_logger
from models.document import Document
from models.queue import enqueue_stage
from policy.prompt_render import render_4b, policy_version as compute_policy_version
from policy.routing import decide_routing
from services.document_telemetry import record_analyze_event
logger = setup_logger("classify_worker")
MAX_CLASSIFY_TEXT = 8000  # upper bound on input chars sent to the legacy classify call
TRIAGE_TEXT_LIMIT = 120_000  # upper bound on input chars for p3a summary_triage (4B)
DEEP_PRIMARY_LIMIT = 260_000  # primary context_char_limit (drives envelope slicing)
# document_types loaded from settings
DOCUMENT_TYPES = set(settings.document_types)
# Allowed facet_doctype values (practical business document types — an AI-identified
# signal that triggers the library auto-classification suggestion)
FACET_DOCTYPES = {"발주서", "세금계산서", "명세표", "도면", "증명서", "계획서", "시방서"}
# Document types that get a library auto-classification suggestion (under "거래")
LIBRARY_SUGGESTION_DOCTYPES = {"발주서", "세금계산서", "명세표"}
# Task name used for the PR-B prompt_version
SUMMARY_TRIAGE_TASK = "p3a_short_summary"
# R2 — hard escalate reasons (critical reasons that the backlog guard never suppresses).
# 'risk_flag_requires_26b' is the signal PR-A domain_policy emits when a document type is
# designated "26B required" (safety_legal_interpretation / chemical_hazard /
# incident_causation, etc.). Do not suppress it even when the backlog is large — this
# guarantees full coverage of the safety domain.
HARD_ESCALATE_REASONS = {
    "long_context", "triage_json_invalid", "low_confidence",
    "risk_flag_requires_26b",
}
# ───────────────────────── TriageOutput (R1) ─────────────────────────
class TriageOutput(BaseModel):
    """Response schema for p3a_short_summary (4B).

    Every field has a default, so a partial response still validates. On a parse or
    validation failure the triage path keeps these defaults and forces
    escalate_to_26b=True.
    """
    tldr: str = ""
    bullets: list[str] = Field(default_factory=list)
    tags: list[str] = Field(default_factory=list)
    doc_type: str = ""
    time_scope: str | None = None
    # Default 0.3 is deliberately low. Not range-checked here: values outside 0.0–1.0
    # still validate.
    confidence: float = 0.3
    high_impact_self_declared: bool = False
    high_impact_reason: str | None = None
    recommend_deep_summary: bool = False
    recommend_entity_pass: bool = False
    escalate_to_26b: bool = False
    risk_flags: list[str] = Field(default_factory=list)
    # Memo Intake Upgrade PR-2B — memo intent classification hint (optional in the response).
    # Stays None when 4B does not emit it. The AI never creates events rows automatically
    # (that only happens when the user promotes).
    event_kind_hint: str | None = None  # 'note' | 'task' | 'calendar_event' | 'activity_log' | 'reference'
    event_kind_confidence: float | None = None  # expected 0.0–1.0 (not enforced by this model)
# ───────────────────────── legacy classify (primary) ──────────────────
def _get_taxonomy_leaf_paths(taxonomy: dict, prefix: str = "") -> set[str]:
"""taxonomy dict에서 모든 유효한 경로를 추출"""
paths = set()
for key, value in taxonomy.items():
current = f"{prefix}/{key}" if prefix else key
if isinstance(value, dict):
if not value:
paths.add(current)
else:
paths.update(_get_taxonomy_leaf_paths(value, current))
elif isinstance(value, list):
if not value:
paths.add(current)
else:
for leaf in value:
paths.add(f"{current}/{leaf}")
paths.add(current) # 2단계도 허용 (leaf가 없는 경우용)
else:
paths.add(current)
return paths
# Every taxonomy path accepted by _validate_domain, computed once at import time from
# settings.taxonomy.
VALID_DOMAIN_PATHS = _get_taxonomy_leaf_paths(settings.taxonomy)
def _validate_domain(domain: str) -> str:
    """Map an LLM-proposed domain onto a path that exists in the taxonomy.

    Resolution order:
      1. exact match in VALID_DOMAIN_PATHS (after stripping surrounding whitespace);
      2. the longest "/"-separated prefix that is a valid path (logged as a partial match);
      3. "General/Reading_Notes" as the final fallback.

    ``domain`` comes straight from the LLM's JSON, so it can be ``None`` (JSON ``null``)
    or another non-string value. Those now take the fallback instead of raising
    ``AttributeError``/``TypeError`` and aborting the whole classify run.

    Args:
        domain: candidate taxonomy path such as "Industrial_Safety/Regulation".

    Returns:
        A path contained in VALID_DOMAIN_PATHS, or "General/Reading_Notes".
    """
    if not isinstance(domain, str):
        logger.warning(f"[분류] domain {domain!r} 문자열 아님, General/Reading_Notes로 대체")
        return "General/Reading_Notes"
    domain = domain.strip()
    if domain in VALID_DOMAIN_PATHS:
        return domain
    parts = domain.split("/")
    # Walk from the longest prefix down to the first segment.
    for i in range(len(parts), 0, -1):
        partial = "/".join(parts[:i])
        if partial in VALID_DOMAIN_PATHS:
            logger.warning(f"[분류] domain '{domain}' → '{partial}' (부분 매칭)")
            return partial
    logger.warning(f"[분류] domain '{domain}' taxonomy에 없음, General/Reading_Notes로 대체")
    return "General/Reading_Notes"
# ───────────────────────── B-1 subject_domain 매칭 ─────────────────────
def _match_subject_domain(doc: Document, text: str) -> str:
"""본문/메타/제목 기반으로 domain_policy.yaml 의 subject_domain 이름 결정.
매칭 우선순위 (feedback_category_vs_ai_domain_axis.md — category 매칭 금지):
1. source_channel (news / law_monitor / memo)
2. **제목(title) 기반 매칭** — 본문에 키워드가 한두 번 스치는 문서까지 잘못 잡지 않도록
제목의 의도를 우선 (예: PPE 선정기준.md 가 본문 MSDS 언급 한 번으로 msds 분류되는 문제 방지)
3. 본문 keywords (첫 8k, 대소문자 무시, 키워드 **2회 이상**)
4. ai_domain prefix — legacy classify 가 채운 taxonomy path
5. generic (fallback)
"""
sc = (doc.source_channel or "").lower()
if sc in ("news", "news_collector"):
return "news_item"
if sc == "law_monitor":
return "safety_reference"
if sc == "memo":
return "generic"
# 제목 기반 매칭 — 우선순위 가장 높음. title 은 사용자 의도 시그널이라 정확.
title = (doc.title or "").lower()
def _title_hit(words: list[str]) -> bool:
return any(w.lower() in title for w in words if w)
if _title_hit(["산업안전보건법", "산안법", "중처법", "안전보건법", "시행규칙", "시행령"]):
return "safety_reference"
if _title_hit(["사고", "재해", "재발방지", "원인분석"]):
return "incident_report"
if _title_hit(["건강검진", "작업환경측정", "특수건강", "보건관리"]):
return "health_record"
if _title_hit(["밀폐공간", "추락", "끼임", "감전", "화재", "폭발", "화기작업"]):
return "hazard_specific"
if _title_hit(["msds", "sds", "물질안전보건자료", "화학물질"]):
return "msds"
if _title_hit(["위험성평가", "작업허가", "jsa", "sop", "ptw", "ppe", "보호구", "안전작업"]):
return "safety_operational"
# 본문 keyword — 2회 이상 등장해야 도메인 매칭 (single-mention 오분류 방지)
head = (text or "")[:8000].lower()
def _body_count(keywords: list[str]) -> int:
return sum(head.count(kw.lower()) for kw in keywords if kw)
# 우선순위: 구체 먼저
if _body_count(["msds", "sds", "물질안전보건자료"]) >= 2:
return "msds"
if _body_count(["사고보고", "재해조사", "원인분석", "재발방지", "중대재해"]) >= 2:
return "incident_report"
if _body_count(["건강검진", "작업환경측정", "보건관리", "특수건강진단"]) >= 2:
return "health_record"
if _body_count(["밀폐공간", "추락", "끼임", "감전", "화재", "폭발", "중독", "질식"]) >= 2:
return "hazard_specific"
if _body_count(["위험성평가", "작업허가서", "jsa", "sop", "보호구"]) >= 2:
return "safety_operational"
if _body_count(["산업안전보건법", "안전보건관리체계", "유해위험방지계획서"]) >= 2:
return "safety_reference"
if (doc.ai_domain or "").startswith("Industrial_Safety"):
return "safety_reference"
return "generic"
def _slice_text_ranges(doc_id: int, total_len: int) -> dict:
    """R3 — build the original-text pointers that are handed to 26B.

    When ``total_len`` fits within DEEP_PRIMARY_LIMIT, a single [0, total_len) range is
    returned. Otherwise three ranges are returned:
      * a 120k head;
      * a 20k window around the middle;
      * a 120k tail.
    Each later range starts no earlier than the previous one ends.

    Returns:
        {"doc_ids": [doc_id], "text_ranges": [{"doc_id", "start", "end"}, ...]}
    """
    if total_len <= DEEP_PRIMARY_LIMIT:
        spans = [(0, total_len)]
    else:
        head = (0, 120_000)
        mid_lo = max(head[1], total_len // 2 - 10_000)
        mid = (mid_lo, mid_lo + 20_000)
        tail = (max(mid[1], total_len - 120_000), total_len)
        spans = [head, mid, tail]
    return {
        "doc_ids": [doc_id],
        "text_ranges": [
            {"doc_id": doc_id, "start": start, "end": end} for start, end in spans
        ],
    }
async def _check_backlog_guard(session: AsyncSession) -> str | None:
    """R2 — decide whether soft escalations must be suppressed because deep_summary is backed up.

    Either signal triggers suppression:
      * ratio — deep_summary rows / classify rows created within the last
        ``window_minutes`` is strictly greater than ``ratio_threshold``;
      * pending — deep_summary rows still in 'pending'/'processing' (no time window)
        reach ``pending_threshold`` (``>=``, as specified by the module's R2 contract;
        the previous ``>`` left a queue sitting exactly at the threshold unsuppressed).

    Args:
        session: DB session used for the two read-only queries.

    Returns:
        A suppression reason such as "backlog_guard(ratio=0.42,pending=17)", or None.
    """
    backlog = settings.ai.deep_summary_backlog
    # make_interval(mins => ...) takes an integer; coerce in case config yields a float.
    window = int(backlog.window_minutes)
    ratio_threshold = backlog.ratio_threshold
    pending_threshold = backlog.pending_threshold

    row = (await session.execute(sql_text("""
        SELECT
            COUNT(*) FILTER (WHERE stage = 'classify') AS classify_n,
            COUNT(*) FILTER (WHERE stage = 'deep_summary') AS deep_n
        FROM processing_queue
        WHERE created_at > NOW() - make_interval(mins => :w)
    """), {"w": window})).one()
    classify_n = row.classify_n or 0
    deep_n = row.deep_n or 0
    ratio = (deep_n / classify_n) if classify_n > 0 else 0.0

    pending_deep = int(await session.scalar(sql_text("""
        SELECT COUNT(*) FROM processing_queue
        WHERE stage = 'deep_summary' AND status IN ('pending', 'processing')
    """)) or 0)

    if ratio > ratio_threshold or pending_deep >= pending_threshold:
        return f"backlog_guard(ratio={ratio:.2f},pending={pending_deep})"
    return None
def _classify_escalation_reason(
triage_out: TriageOutput,
input_chars: int,
triage_limit: int,
confidence_floor: float,
routing_decision=None,
) -> str | None:
"""escalate_to_26b 가 True 이도록 만든 근본 사유 하나를 반환. 아니면 None.
우선순위 (높은 것부터 먼저 매칭):
1. long_context (입력 초과) — hard escalate
2. low_confidence (4B 자체 신뢰도 낮음) — hard escalate
3. self_declare (4B 가 자기 상한 선언) — soft
4. deep_requested (4B 가 recommend_deep_summary=true) — soft
5. PR-A routing policy (high_impact / risk_flag_requires_26b / multi_doc) — soft
"""
if input_chars > triage_limit:
return "long_context"
if triage_out.confidence < confidence_floor:
return "low_confidence"
if triage_out.escalate_to_26b:
return "self_declare"
if triage_out.recommend_deep_summary:
return "deep_requested"
# PR-A 정책 반영 — domain_policy.yaml 의 high_impact=true / default_risk_flags 의
# requires_26b=true 때문에 decide_routing 이 escalate_to_26b=True 로 판정하면
# 그 첫 reason 을 사용. safety_reference/msds/hazard_specific/incident_report 등
# safety/health 도메인은 여기서 escalate 됨.
if routing_decision and routing_decision.escalate_to_26b:
reasons = routing_decision.escalation_reasons
if reasons:
# high_impact 는 모든 safety 도메인에 공통이라 너무 흔함. 더 구체적인 사유를 우선.
for r in reasons:
if r != "high_impact":
return r
return reasons[0]
return "policy_escalate"
return None
def _is_hard_escalate(reason: str | None) -> bool:
    """R2 — True when ``reason`` is a hard escalate reason (never suppressed by the backlog guard)."""
    is_hard = reason in HARD_ESCALATE_REASONS
    return is_hard
def _distill(triage_out: TriageOutput, limit: int = 2000) -> str:
"""26B envelope.distilled_context 용 4B 출력 압축 텍스트."""
parts: list[str] = []
if triage_out.tldr:
parts.append(f"TL;DR: {triage_out.tldr}")
if triage_out.bullets:
parts.append("핵심:")
parts.extend(f"- {b}" for b in triage_out.bullets)
if triage_out.doc_type:
parts.append(f"doc_type: {triage_out.doc_type}")
if triage_out.tags:
parts.append(f"tags: {', '.join(triage_out.tags)}")
return "\n".join(parts)[:limit]
# ───────────────────── frontmatter 파싱 (옵션 C) ──────────────────────
# YAML frontmatter (--- ... ---) + body 분리. body 가 없거나 frontmatter 가 형식 오류여도 안전하게 fallback.
_FM_PATTERN = re.compile("^---\\s*\\n(.*?)\\n---\\s*\\n?(.*)$", re.DOTALL)
def _parse_frontmatter(extracted_text: str) -> tuple[dict, str]:
"""extracted_text 시작에 YAML frontmatter 가 있으면 (frontmatter_dict, body) 반환.
없으면 ({}, extracted_text). YAML 파싱 실패 시도 ({}, extracted_text) 로 안전 fallback.
"""
if not extracted_text or not extracted_text.startswith("---"):
return {}, extracted_text
m = _FM_PATTERN.match(extracted_text)
if not m:
return {}, extracted_text
fm_text, body = m.group(1), m.group(2)
try:
fm = yaml.safe_load(fm_text)
if not isinstance(fm, dict):
return {}, extracted_text
return fm, body
except yaml.YAMLError:
return {}, extracted_text
# Frontmatter-first recognition: source-tracing metadata must never be overwritten by the
# LLM. This covers code / section / source_pdf / source_pages / source_basis /
# verified_level / verification_pending and the other keys below.
# NOTE(review): this set is not referenced anywhere in the visible part of this module.
# In the visible code, protection comes only from storing the whole frontmatter in
# documents.md_frontmatter. Confirm whether another module consumes it, or wire it in.
_FRONTMATTER_PRESERVED_KEYS = {
    "code", "section", "source_pdf", "source_pages", "source_basis",
    "verified_level", "verification_pending", "source_type", "kgs_code",
}
# ───────────────────────── main process ────────────────────────────────
async def process(document_id: int, session: AsyncSession) -> None:
    """Classify, summarize and tier-triage one document.

    Steps:
      1) Legacy: classify() -> ai_domain / document_type / ai_tags / ai_confidence / ai_suggestion
      2) Legacy: summarize() -> ai_summary
      3) PR-B B-1: summary_triage (4B) -> ai_tldr / ai_bullets / ai_analysis_tier='triage'

    Special paths:
      * source_channel='law_monitor': statutes have an external source of truth
        (law.go.kr), are immutable and are re-collected automatically. AI classification
        adds no value and risks hallucinated interpretation, so the 26B legacy calls and
        the 4B triage are all skipped. Only minimal fields are set before returning, and
        queue_consumer chains embed/chunk automatically. See
        feedback_category_vs_ai_domain_axis.md and plan stateless-churning-raccoon.md.
      * source_channel='devonagent': safety net for web/blog ingest that reaches classify
        through a bypass path (e.g. manual enqueue); minimal fields only.
      * YAML frontmatter ("option C"): frontmatter values are authoritative.
        - When title, ai_domain and ai_summary are all set after applying it, every LLM
          call is skipped.
        - Otherwise the LLM only fills what frontmatter left empty. Frontmatter tags are
          kept (LLM tags are appended). Frontmatter importance and ai_summary are not
          overwritten; summarize() is not called when ai_summary came from frontmatter.

    Raises:
        ValueError: the document is missing, extracted_text is empty, or the classify
            response contains no JSON object.

    Note:
        The main path does not commit here. Presumably the caller (queue_consumer)
        commits — confirm. Tier-triage failures are logged and swallowed so the legacy
        results survive.
    """
    doc = await session.get(Document, document_id)
    if not doc:
        raise ValueError(f"문서 ID {document_id}를 찾을 수 없음")

    if doc.source_channel == "law_monitor":
        if not doc.ai_domain:
            doc.ai_domain = "법령"
        if not doc.ai_tags:
            doc.ai_tags = ["법령"]
        if not doc.importance:
            doc.importance = "medium"
        await session.commit()
        logger.info(f"doc {document_id}: law_monitor → classify skip")
        return

    # Web/Blog ingest (devonagent track) — plan db-snuggly-petal.md.
    # The queue_consumer override already skips classify for this channel. This branch is
    # the safety net for bypass paths (e.g. manual enqueue). LLM derivations such as
    # ai_tldr/ai_bullets belong to a separate PR (Mac mini derived-worker).
    if doc.source_channel == "devonagent":
        from urllib.parse import urlparse

        if not doc.ai_domain:
            doc.ai_domain = "Web"
        if not doc.ai_tags:
            host = (urlparse(doc.edit_url or "").hostname or "web").lower()
            doc.ai_tags = [f"Web/{host}"]
        if not doc.importance:
            doc.importance = "medium"
        await session.commit()
        logger.info(f"doc {document_id}: devonagent → classify skip")
        return

    if not doc.extracted_text:
        raise ValueError(f"문서 ID {document_id}: extracted_text가 비어있음")

    # ─── Option C: markdown frontmatter first ───────────────────────────
    # Externally authored markdown (e.g. KGS Code) carries exact metadata in its frontmatter.
    # title / tags / ai_summary / ai_domain are taken from frontmatter when present, and the
    # LLM only fills the remaining fields. Source-tracing metadata (code / section /
    # source_pages / verified_level, ...) is kept in the documents.md_frontmatter JSONB.
    fm, _ = _parse_frontmatter(doc.extracted_text)
    # Remember what frontmatter supplied, so the LLM steps below do not overwrite it.
    fm_tag_list: list[str] = []
    fm_has_importance = False
    fm_has_summary = False
    if fm:
        # Store the whole frontmatter (single source for source tracing).
        doc.md_frontmatter = fm
        # Apply first: frontmatter is authoritative (more reliable than the LLM).
        if fm.get("title"):
            doc.title = str(fm["title"])
        fm_tags = fm.get("tags")
        if isinstance(fm_tags, list) and fm_tags:
            # Frontmatter tags go into ai_tags first; the LLM may only append.
            fm_tag_list = [str(t) for t in fm_tags]
            doc.ai_tags = list(fm_tag_list)
        if fm.get("ai_domain"):
            doc.ai_domain = str(fm["ai_domain"])
            parts = doc.ai_domain.split("/")
            if len(parts) > 1 and not doc.ai_sub_group:
                doc.ai_sub_group = parts[1]
        if fm.get("ai_sub_group"):
            doc.ai_sub_group = str(fm["ai_sub_group"])
        if fm.get("document_type"):
            doc.document_type = str(fm["document_type"])
        if fm.get("ai_summary"):
            doc.ai_summary = str(fm["ai_summary"])
            fm_has_summary = True
        if fm.get("importance") in ("high", "medium", "low"):
            doc.importance = fm["importance"]
            fm_has_importance = True
        # If the core metadata (title + ai_domain + ai_summary) is all set, skip LLM
        # classify/summarize and the tier triage too. Fields frontmatter does not cover
        # are left as-is for manual UI entry.
        if doc.title and doc.ai_domain and doc.ai_summary:
            if not doc.ai_confidence:
                doc.ai_confidence = 1.0  # frontmatter is a human-authored, definitive value
            doc.ai_processed_at = datetime.now(timezone.utc)
            doc.ai_model_version = "frontmatter@manual"
            await session.commit()
            logger.info(f"doc {document_id}: frontmatter 옵션 C → classify/summarize/triage 전부 skip")
            return
        # Partial frontmatter: the LLM fills only the unset fields below.
        logger.info(f"doc {document_id}: frontmatter 부분 인식 → LLM 으로 미설정 필드 보완")

    client = AIClient()
    try:
        # ─── 1. Legacy classify (primary 26B) ───
        truncated = doc.extracted_text[:MAX_CLASSIFY_TEXT]
        raw_response = await client.classify(truncated)
        parsed = parse_json_response(raw_response)
        if not parsed or not isinstance(parsed, dict):
            raise ValueError(f"AI 응답에서 JSON 추출 실패: {raw_response[:200]}")

        # domain (LLM result ignored when frontmatter already set it)
        domain = _validate_domain(parsed.get("domain", ""))
        if not doc.ai_domain:
            doc.ai_domain = domain
            # sub_group derived from the domain path (compatibility)
            parts = domain.split("/")
            doc.ai_sub_group = parts[1] if len(parts) > 1 else ""

        # document_type (LLM result ignored when frontmatter already set it)
        doc_type = parsed.get("document_type", "")
        if not doc.document_type:
            doc.document_type = (
                doc_type if isinstance(doc_type, str) and doc_type in DOCUMENT_TYPES else "Note"
            )

        # confidence — tolerate null / non-numeric values from the LLM
        try:
            confidence = float(parsed.get("confidence", 0.5))
        except (TypeError, ValueError):
            confidence = 0.5
        doc.ai_confidence = max(0.0, min(1.0, confidence))

        # importance — frontmatter value wins
        if not fm_has_importance:
            importance = parsed.get("importance", "medium")
            doc.importance = importance if importance in ("high", "medium", "low") else "medium"

        # tags — at most 5 string tags from the LLM; appended after frontmatter tags
        raw_tags = parsed.get("tags")
        llm_tags = (
            [t for t in raw_tags if isinstance(t, str) and t][:5]
            if isinstance(raw_tags, list)
            else []
        )
        if fm_tag_list:
            doc.ai_tags = fm_tag_list + [t for t in llm_tags if t not in fm_tag_list]
        else:
            doc.ai_tags = llm_tags

        # source/origin
        if parsed.get("sourceChannel") and not doc.source_channel:
            doc.source_channel = parsed["sourceChannel"]
        # data_origin enum guard: if the AI sends a doc_purpose value such as 'knowledge',
        # asyncpg raises InvalidTextRepresentationError and later calls on the same session
        # fail in cascade. Values other than work/external are skipped (NULL kept).
        if parsed.get("dataOrigin") and not doc.data_origin:
            origin = parsed["dataOrigin"]
            if origin in ("work", "external"):
                doc.data_origin = origin
        # purpose (AI only fills an empty value; manual/upload values win)
        if parsed.get("docPurpose") and not doc.doc_purpose:
            purpose = parsed["docPurpose"]
            if purpose in ("business", "knowledge"):
                doc.doc_purpose = purpose

        # ─── facet_doctype (§1 practical document type signal) ───
        ai_doctype_raw = parsed.get("facet_doctype")
        ai_doctype = (
            ai_doctype_raw
            if isinstance(ai_doctype_raw, str) and ai_doctype_raw in FACET_DOCTYPES
            else None
        )
        if ai_doctype and not doc.facet_doctype:
            doc.facet_doctype = ai_doctype

        # ─── ai_suggestion (library approval inbox suggestion, §1) ───
        if ai_doctype in LIBRARY_SUGGESTION_DOCTYPES:
            year = doc.facet_year or datetime.now(timezone.utc).year
            doc.ai_suggestion = {
                "proposed_category": "library",
                "proposed_path": f"@library/거래/{year}/{ai_doctype}",
                "proposed_doctype": ai_doctype,
                "confidence": doc.ai_confidence,
                "source_updated_at": (
                    doc.updated_at.isoformat() if doc.updated_at else None
                ),
                "reason": "classify pipeline",
            }

        # ─── 2. Legacy summary (primary 26B) — skipped when frontmatter supplied it ───
        if not fm_has_summary:
            summary = await client.summarize(doc.extracted_text[:50000])
            doc.ai_summary = strip_thinking(summary)

        # ─── metadata (legacy done) ───
        doc.ai_model_version = settings.ai.primary.model
        doc.ai_processed_at = datetime.now(timezone.utc)
        logger.info(
            f"[분류] document_id={document_id}: "
            f"domain={domain}, type={doc.document_type}, "
            f"confidence={doc.ai_confidence:.2f}, tags={doc.ai_tags}"
        )

        # ─── 3. PR-B B-1 — tier triage (4B; a failure keeps the legacy result) ───
        # NOTE(review): triage runs on the same session. A DB error inside it (e.g. the
        # backlog-guard query) may leave the transaction aborted, so the caller's commit
        # could still lose the legacy fields. Consider a SAVEPOINT — verify first.
        try:
            await _run_tier_triage(client, doc, session)
        except Exception as exc:
            logger.exception(f"[triage] id={document_id} 전체 실패 — legacy 유지: {exc}")
    finally:
        await client.close()
async def _run_tier_triage(client: AIClient, doc: Document, session: AsyncSession) -> None:
    """Run the summary_triage (p3a_short_summary, 4B) path for one document.

    Flow:
      * Input longer than TRIAGE_TEXT_LIMIT: skip the 4B call and escalate as
        long_context.
      * Otherwise render the prompt, call the triage model, then parse and validate the
        JSON (R1). Three cases all set ``parse_error``, force ``escalate_to_26b=True`` /
        ``confidence=0.3`` and use the ``triage_json_invalid`` reason:
          - a failed call;
          - a response without a JSON object;
          - a schema violation.
      * Compute the PR-A routing decision, derive the escalation reason, and hand
        everything to _apply_triage_result.

    A render_4b failure is logged and triage is skipped entirely (nothing is recorded).
    """
    document_id = doc.id
    text = doc.extracted_text or ""
    input_chars = len(text)
    subject_domain = _match_subject_domain(doc, text)
    triage_start = time.perf_counter()
    parse_error: str | None = None
    triage_out = TriageOutput()

    # Input over the triage limit: skip the call and escalate as long_context.
    if input_chars > TRIAGE_TEXT_LIMIT:
        # On the long_context path the routing decision is driven by content_chars alone.
        rd = None
        try:
            rd = decide_routing(
                subject_domain=subject_domain,
                content_chars=input_chars,
                deterministic_keyword_hits=(),
                self_declared_high_impact=False,
                self_declared_risk_flags=(),
                confidence=0.3,
                evidence_doc_count=0,
            )
        except Exception as exc:
            # Best effort: continue without a routing decision, but leave a trace instead
            # of silently swallowing the failure.
            logger.warning(
                "[triage] decide_routing 실패 (long_context) id=%s type=%s repr=%r",
                document_id,
                type(exc).__name__,
                exc,
            )
        await _apply_triage_result(
            doc=doc,
            session=session,
            triage_out=triage_out,
            subject_domain=subject_domain,
            input_chars=input_chars,
            latency_ms=0,
            escalation_reason="long_context",
            parse_error=None,
            routing_decision=rd,
        )
        return

    try:
        rendered = render_4b(SUMMARY_TRIAGE_TASK, subject_domain)
    except Exception as exc:
        logger.exception(f"[triage] render_4b 실패 subject={subject_domain}: {exc}")
        return

    # The template's {extracted_text} placeholder survives format() as single braces;
    # inject the text with str.replace.
    prompt = rendered.replace("{extracted_text}", text[:TRIAGE_TEXT_LIMIT])

    try:
        raw_triage = await client.call_triage(prompt)
    except Exception as exc:
        logger.warning(
            "[triage] 4B 호출 실패 id=%s type=%s repr=%r",
            document_id,
            type(exc).__name__,
            exc,
            exc_info=True,
        )
        parse_error = "call_failed"
        raw_triage = ""
    latency_ms = int((time.perf_counter() - triage_start) * 1000)

    # R1 — JSON parsing + schema validation + fallback
    if raw_triage:
        try:
            parsed_triage = parse_json_response(raw_triage)
            if not parsed_triage or not isinstance(parsed_triage, dict):
                # No JSON object was found. Previously this collapsed to {} and validated
                # as an all-default TriageOutput, which recorded broken output as a
                # successful (empty) triage instead of triage_json_invalid.
                raise ValueError("triage 응답에서 JSON 객체 추출 실패")
            triage_out = TriageOutput.model_validate(parsed_triage)
        except (ValidationError, ValueError, TypeError) as exc:
            parse_error = f"parse:{type(exc).__name__}"
            logger.warning(
                "[triage] JSON 파싱/검증 실패 → fallback id=%s type=%s: %s",
                document_id,
                type(exc).__name__,
                exc,
            )

    if parse_error:
        triage_out = triage_out.model_copy(update={
            "escalate_to_26b": True,
            "confidence": 0.3,
        })

    # Compute decide_routing first so _classify_escalation_reason respects the PR-A
    # high_impact / risk_flag decision.
    routing_decision = None
    try:
        routing_decision = decide_routing(
            subject_domain=subject_domain,
            content_chars=input_chars,
            deterministic_keyword_hits=(),
            self_declared_high_impact=triage_out.high_impact_self_declared,
            self_declared_risk_flags=tuple(triage_out.risk_flags),
            confidence=triage_out.confidence,
            evidence_doc_count=0,
        )
    except Exception as exc:
        logger.warning(f"[triage] decide_routing 실패 id={document_id}: {exc}")

    if parse_error:
        escalation_reason: str | None = "triage_json_invalid"
    else:
        escalation_reason = _classify_escalation_reason(
            triage_out,
            input_chars=input_chars,
            triage_limit=TRIAGE_TEXT_LIMIT,
            confidence_floor=0.6,
            routing_decision=routing_decision,
        )

    await _apply_triage_result(
        doc=doc,
        session=session,
        triage_out=triage_out,
        subject_domain=subject_domain,
        input_chars=input_chars,
        latency_ms=latency_ms,
        escalation_reason=escalation_reason,
        parse_error=parse_error,
        routing_decision=routing_decision,
    )
async def _apply_triage_result(
    *,
    doc: Document,
    session: AsyncSession,
    triage_out: TriageOutput,
    subject_domain: str,
    input_chars: int,
    latency_ms: int,
    escalation_reason: str | None,
    parse_error: str | None,
    routing_decision=None,
) -> None:
    """Persist a TriageOutput on the Document, apply R2 suppression, enqueue deep_summary, audit.

    routing_decision is computed by the caller (_run_tier_triage) and passed in. It is not
    recomputed here because escalation_reason was derived from it, and both sides must see
    the same value.

    Args:
        doc: document being triaged (fields are mutated in place).
        session: DB session used by the backlog guard and the deep_summary enqueue.
        triage_out: validated (or default/fallback) 4B output.
        subject_domain: domain_policy subject_domain name.
        input_chars: length of the extracted text.
        latency_ms: 4B call latency.
        escalation_reason: root-cause escalation reason, or None for no escalation.
        parse_error: error code when the 4B call or its parsing failed, else None.
        routing_decision: PR-A routing decision, or None if it could not be computed.
    """
    document_id = doc.id

    # Document fields. On a parse failure tldr/bullets are cleared, so the legacy
    # ai_summary is shown instead of a stale tldr left over from an earlier run.
    if parse_error:
        doc.ai_tldr = None
        doc.ai_bullets = []
    else:
        doc.ai_tldr = (triage_out.tldr or "").strip() or None
        doc.ai_bullets = triage_out.bullets or []
        # Memo Intake Upgrade PR-2B — event kind hint (only when 4B emitted one).
        # Values outside the allowed enum are ignored (DB enum constraint). The AI worker
        # never creates events rows directly.
        valid_kinds = {"note", "task", "calendar_event", "activity_log", "reference"}
        hint = (triage_out.event_kind_hint or "").strip().lower() or None
        if hint in valid_kinds:
            doc.ai_event_kind = hint
            try:
                conf = triage_out.event_kind_confidence
                if conf is not None and 0.0 <= float(conf) <= 1.0:
                    doc.ai_event_confidence = float(conf)
            except (TypeError, ValueError):
                pass
    doc.ai_analysis_tier = "triage"

    # R2 — backlog guard (only soft escalations are suppressed; hard ones always pass)
    suppressed_reason: str | None = None
    escalate = escalation_reason is not None
    if escalate and not _is_hard_escalate(escalation_reason):
        suppressed_reason = await _check_backlog_guard(session)
        if suppressed_reason:
            escalate = False
            escalation_reason = None

    # enqueue deep_summary
    if escalate:
        try:
            # Per PR-A envelope.ValidFromStage, a P3a escalation uses 'summarize_short'.
            # The internal task name 'summary_triage' only appears in
            # analyze_events.prompt_version; envelope.from_stage follows PR-A's enum.
            envelope = EscalationEnvelope(
                from_stage="summarize_short",
                escalation_reasons=tuple([escalation_reason] if escalation_reason else []),
                risk_flags=tuple(routing_decision.risk_flags) if routing_decision
                else tuple(triage_out.risk_flags),
                distilled_context=_distill(triage_out),
                original_pointers=_slice_text_ranges(document_id, input_chars),
                synthesis_directives=(
                    tuple(routing_decision.synthesis_directives) if routing_decision else ()
                ),
                user_intent=None,
                draft_hint=(triage_out.tldr or None),
            )
            await enqueue_stage(
                session,
                document_id,
                "deep_summary",
                payload={
                    "envelope": json.loads(envelope.to_json()),
                    "subject_domain": subject_domain,
                },
            )
        except Exception as exc:
            logger.exception(f"[triage] envelope enqueue 실패 id={document_id}: {exc}")
            # Nothing was queued, so the audit row must not claim a 26B escalation.
            escalate = False

    # analyze_events
    try:
        pv = compute_policy_version(SUMMARY_TRIAGE_TASK)
    except Exception:
        pv = None
    escalation_reasons_list = (
        list(routing_decision.escalation_reasons) if routing_decision
        else ([escalation_reason] if escalation_reason else [])
    )
    shadow_target = (
        "primary" if (routing_decision and routing_decision.escalate_to_26b) else "triage"
    )
    await record_analyze_event(
        doc_id=document_id,
        user_id=None,
        mode="summary_triage",
        text_limit=TRIAGE_TEXT_LIMIT,
        truncated=input_chars > TRIAGE_TEXT_LIMIT,
        layers_returned=["tldr", "bullets"] if not parse_error else [],
        cached=False,
        latency_ms=latency_ms,
        model_name=settings.ai.triage.model,
        prompt_version=(f"{SUMMARY_TRIAGE_TASK}@{pv}" if pv else SUMMARY_TRIAGE_TASK),
        error_code=parse_error,
        source="document_server",
        subject_domain=subject_domain,
        risk_flags=(list(routing_decision.risk_flags) if routing_decision
                    else list(triage_out.risk_flags)),
        high_impact_task=(routing_decision.high_impact_task if routing_decision else None),
        escalation_reasons=escalation_reasons_list,
        confidence=triage_out.confidence,
        policy_version=pv,
        shadow_would_route_to=shadow_target,
        tier="triage",
        escalated_to_26b=escalate,
        suppressed_reason=suppressed_reason,
    )

    logger.info(
        f"[triage] id={document_id} subject={subject_domain} "
        f"escalate={escalate} reason={escalation_reason} "
        f"suppressed={bool(suppressed_reason)} "
        f"confidence={triage_out.confidence:.2f} "
        f"tldr_len={len(doc.ai_tldr or '')} bullets={len(doc.ai_bullets or [])}"
    )