From 6fdc48e5b6c0ef0094ddd782bba873a26723d421 Mon Sep 17 00:00:00 2001 From: Hyungi Ahn Date: Fri, 24 Apr 2026 10:22:09 +0900 Subject: [PATCH] =?UTF-8?q?feat(ai):=20B-1=20summary=20tier=20=EB=B6=84?= =?UTF-8?q?=ED=95=A0=20=E2=80=94=20triage(4B)=20+=20deep=5Fsummary(26B)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit PR-A policy 레이어를 재사용하여 classify_worker 에 tier triage 경로를 추가. Legacy ai_summary / ai_domain / ai_suggestion 은 유지 (회귀 0), tldr/bullets/ detail/inconsistencies 는 별도 필드로 분리. Migrations (156~160): - 156 documents: ai_tldr, ai_bullets, ai_detail_summary, ai_inconsistencies, ai_analysis_tier 5컬럼 - 157 process_stage 에 'deep_summary' ADD VALUE 단독 (Postgres 동일 트랜잭션 제약 회피) - 158 processing_queue.payload JSONB (envelope 전달) - 159 analyze_events 에 tier + suppressed_reason - 160 suppressed_reason partial index Models/ORM: - Document: 5컬럼 Mapped 추가 - ProcessingQueue: deep_summary enum 확장 + payload 필드, enqueue_stage 에 payload 옵션 - AnalyzeEvent: PR-A shadow 6컬럼 + PR-B tier/suppressed_reason Workers: - classify_worker: 기존 legacy 경로 뒤에 _run_tier_triage 추가. - _match_subject_domain(doc, text): source_channel + 본문 keywords + ai_domain prefix 로 PR-A policy 의 subject_domain 이름 결정 (category 매칭 금지). - R1 TriageOutput pydantic + JSON 깨짐 fallback (triage_json_invalid). - R2 _check_backlog_guard(): 30분 window ratio > threshold OR pending 초과면 soft escalate suppress. hard escalate 는 통과. - R3 _slice_text_ranges(): 260k 초과 시 head 120k + mid 20k + tail 120k 3조각. - escalate 시 EscalationEnvelope 구성 + {envelope, subject_domain} payload 로 deep_summary enqueue. - deep_summary_worker (신규): queue payload 에서 envelope + subject_domain 읽기 → render_26b("p3c_deep_summary", subject_domain) + MLX 호출 (llm_gate Semaphore(1) 경유) → ai_detail_summary + ai_inconsistencies 저장 + ai_analysis_tier='deep'. _filter_inconsistencies 로 허용 kind (version_drift / procedure_conflict / source_conflict / missing_basis) 만 통과 — 구매/계약 kind drop. - queue_consumer: workers dict 에 deep_summary 추가 + BATCH_SIZE=1. next_stages 는 건드리지 않음 — classify → embed/chunk 는 그대로, deep_summary 는 독립 체인. Telemetry: - record_analyze_event: subject_domain / risk_flags / escalation_reasons / confidence / policy_version / shadow_would_route_to / tier / escalated_to_26b / suppressed_reason 파라미터 확장. classify/deep worker 가 mode="summary_triage" 또는 "summary_deep" 로 기록. API: - DocumentResponse 에 ai_tldr / ai_bullets / ai_detail_summary / ai_inconsistencies / ai_analysis_tier 5필드 노출. Prompts: - classify.txt 에 DEPRECATED 주석만 추가 (파일 유지 — rollback 경로 보존). - PR-A 의 app/prompts/policy/p3a_short_summary.txt (4B) 와 p3c_deep_summary.txt (26B) 를 그대로 사용. 내 소유의 summary_triage.txt / summary_deep.txt 는 중복 이라 별도 커밋에서 제거하지 않고 바로 생성 전 삭제. Co-Authored-By: Claude Opus 4.7 (1M context) --- app/api/documents.py | 6 + app/models/analyze_event.py | 20 +- app/models/document.py | 8 + app/models/queue.py | 23 +- app/prompts/classify.txt | 4 + app/services/document_telemetry.py | 24 ++ app/services/prompt_versions.py | 6 + app/workers/classify_worker.py | 436 ++++++++++++++++++-- app/workers/deep_summary_worker.py | 210 ++++++++++ app/workers/queue_consumer.py | 7 +- migrations/156_ai_analysis_cols.sql | 18 + migrations/157_queue_stage_deep_summary.sql | 13 + migrations/158_processing_queue_payload.sql | 13 + migrations/159_analyze_events_pr_b_tier.sql | 18 + migrations/160_analyze_events_pr_b_idx.sql | 7 + 15 files changed, 776 insertions(+), 37 deletions(-) create mode 100644 app/workers/deep_summary_worker.py create mode 100644 migrations/156_ai_analysis_cols.sql create mode 100644 migrations/157_queue_stage_deep_summary.sql create mode 100644 migrations/158_processing_queue_payload.sql create mode 100644 migrations/159_analyze_events_pr_b_tier.sql create mode 100644 migrations/160_analyze_events_pr_b_idx.sql diff --git a/app/api/documents.py b/app/api/documents.py index 89ec126..69c7615 100644 --- a/app/api/documents.py +++ b/app/api/documents.py @@ -98,6 +98,12 @@ class DocumentResponse(BaseModel): facet_doctype: str | None = None category: str | None = None ai_suggestion: dict | None = None + # PR-B B-1: summary_triage (4B) / summary_deep (26B) 분할 산출 + ai_tldr: str | None = None + ai_bullets: list | None = None + ai_detail_summary: str | None = None + ai_inconsistencies: list | None = None + ai_analysis_tier: str | None = None # 'triage' | 'deep' | null extracted_at: datetime | None ai_processed_at: datetime | None embedded_at: datetime | None diff --git a/app/models/analyze_event.py b/app/models/analyze_event.py index dbabbe7..c5c79fe 100644 --- a/app/models/analyze_event.py +++ b/app/models/analyze_event.py @@ -8,7 +8,7 @@ from datetime import datetime from typing import Any -from sqlalchemy import BigInteger, Boolean, DateTime, ForeignKey, Integer, Text +from sqlalchemy import ARRAY, BigInteger, Boolean, DateTime, Float, ForeignKey, Integer, Text from sqlalchemy.dialects.postgresql import JSONB from sqlalchemy.orm import Mapped, mapped_column @@ -25,7 +25,7 @@ class AnalyzeEvent(Base): user_id: Mapped[int | None] = mapped_column( BigInteger, ForeignKey("users.id", ondelete="SET NULL") ) - mode: Mapped[str] = mapped_column(Text, default="quick", nullable=False) # quick / full + mode: Mapped[str] = mapped_column(Text, default="quick", nullable=False) # quick / full / summary_triage / summary_deep / retrieval_select / synthesis text_limit: Mapped[int | None] = mapped_column(Integer) truncated: Mapped[bool] = mapped_column(Boolean, default=False) layers_returned: Mapped[list[Any] | None] = mapped_column(JSONB, default=list) @@ -40,3 +40,19 @@ class AnalyzeEvent(Base): created_at: Mapped[datetime] = mapped_column( DateTime(timezone=True), default=datetime.now, nullable=False ) + + # PR-A (migration 153) — routing shadow observability + subject_domain: Mapped[str | None] = mapped_column(Text) + risk_flags: Mapped[list[str] | None] = mapped_column(ARRAY(Text)) + high_impact_task: Mapped[bool | None] = mapped_column(Boolean) + escalated_to_26b: Mapped[bool | None] = mapped_column(Boolean) + escalation_reasons: Mapped[list[str] | None] = mapped_column(ARRAY(Text)) + confidence: Mapped[float | None] = mapped_column(Float) + policy_violation: Mapped[bool | None] = mapped_column(Boolean) + policy_violation_ids: Mapped[list[str] | None] = mapped_column(ARRAY(Text)) + shadow_would_route_to: Mapped[str | None] = mapped_column(Text) + policy_version: Mapped[str | None] = mapped_column(Text) + + # PR-B (migration 159) — 실제 호출 tier 와 R2 backlog guard 이벤트 + tier: Mapped[str | None] = mapped_column(Text) # 'triage' | 'primary' | 'fallback' + suppressed_reason: Mapped[str | None] = mapped_column(Text) # 'backlog_guard(ratio=0.42,pending=7)' diff --git a/app/models/document.py b/app/models/document.py index af69454..e877f3e 100644 --- a/app/models/document.py +++ b/app/models/document.py @@ -115,6 +115,14 @@ class Document(Base): # /accept-suggestion 승인 시에만 category / user_tags 반영 (자동 전이 금지) ai_suggestion: Mapped[dict | None] = mapped_column(JSONB) + # PR-B B-1: summary_triage (4B, 상시) / summary_deep (26B, 에스컬레이션) 분할 산출 + ai_tldr: Mapped[str | None] = mapped_column(Text) # ≤60자 TL;DR + ai_bullets: Mapped[list | None] = mapped_column(JSONB) # 3~5개 핵심 bullets + ai_detail_summary: Mapped[str | None] = mapped_column(Text) # 26B 2~3문단 + ai_inconsistencies: Mapped[list | None] = mapped_column(JSONB) # [{kind, desc}] + # 'triage' | 'deep' | NULL — 현재 문서가 어느 tier 까지 분석 완료됐는지 + ai_analysis_tier: Mapped[str | None] = mapped_column(String(10)) + # 비디오 썸네일 (§3) — ffmpeg 50% 지점 1장. PKM/Videos/.thumbs/{id}.jpg 절대경로. thumbnail_path: Mapped[str | None] = mapped_column(Text) diff --git a/app/models/queue.py b/app/models/queue.py index 9fcd94d..a621d2b 100644 --- a/app/models/queue.py +++ b/app/models/queue.py @@ -3,7 +3,7 @@ from datetime import datetime from sqlalchemy import BigInteger, DateTime, Enum, ForeignKey, SmallInteger, Text, text -from sqlalchemy.dialects.postgresql import insert as pg_insert +from sqlalchemy.dialects.postgresql import JSONB, insert as pg_insert from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import Mapped, mapped_column @@ -17,10 +17,11 @@ class ProcessingQueue(Base): document_id: Mapped[int] = mapped_column(BigInteger, ForeignKey("documents.id"), nullable=False) stage: Mapped[str] = mapped_column( # 'stt' (audio): migration 150 / 'thumbnail' (video): queue_consumer 가 enqueue. + # 'deep_summary' (PR-B B-1): classify_worker 가 에스컬레이션 시 enqueue. # DB enum 변경은 마이그레이션이 처리하므로 create_type=False. Enum( "extract", "classify", "summarize", "embed", "chunk", "preview", - "stt", "thumbnail", + "stt", "thumbnail", "deep_summary", name="process_stage", create_type=False, ), @@ -33,6 +34,8 @@ class ProcessingQueue(Base): attempts: Mapped[int] = mapped_column(SmallInteger, default=0) max_attempts: Mapped[int] = mapped_column(SmallInteger, default=3) error_message: Mapped[str | None] = mapped_column(Text) + # B-1: deep_summary stage 가 EscalationEnvelope 를 payload 로 싣는다. 다른 stage 는 NULL. + payload: Mapped[dict | None] = mapped_column(JSONB) created_at: Mapped[datetime] = mapped_column( DateTime(timezone=True), default=datetime.now ) @@ -43,16 +46,28 @@ class ProcessingQueue(Base): async def enqueue_stage( - session: AsyncSession, document_id: int, stage: str, *, status: str = "pending", + session: AsyncSession, + document_id: int, + stage: str, + *, + status: str = "pending", + payload: dict | None = None, ) -> bool: """ProcessingQueue에 행 추가 (DB 레벨 중복 방어). 같은 (document_id, stage)에 활성 행(pending/processing)이 이미 있으면 아무것도 하지 않고 False 반환. + + B-1: payload 옵션으로 deep_summary 에 EscalationEnvelope JSON 을 실을 수 있다. + 같은 문서 deep_summary 가 재제안될 경우 on_conflict_do_nothing 으로 기존 payload + 유지 (최초 envelope 가 원본). 이후 재처리 시 재분석은 새 classify 가 트리거. """ + values: dict = {"document_id": document_id, "stage": stage, "status": status} + if payload is not None: + values["payload"] = payload stmt = ( pg_insert(ProcessingQueue) - .values(document_id=document_id, stage=stage, status=status) + .values(**values) .on_conflict_do_nothing( index_elements=["document_id", "stage"], index_where=text("status IN ('pending', 'processing')"), diff --git a/app/prompts/classify.txt b/app/prompts/classify.txt index 1745d6f..e06d074 100644 --- a/app/prompts/classify.txt +++ b/app/prompts/classify.txt @@ -1,3 +1,7 @@ +[DEPRECATED 2026-04-24] — summary_triage.txt 로 이관됨 (PR-B B-1 tier routing). +이 파일은 B-1 안정화 기간 동안 rollback 경로를 위해 유지. 신규 호출 경로는 +summary_triage.txt + summary_deep.txt 조합 사용. 실제 삭제는 별도 cleanup PR. + You are a document classification AI. Analyze the document below and respond ONLY in JSON format. No other text. ## Response Format diff --git a/app/services/document_telemetry.py b/app/services/document_telemetry.py index c789b2d..1e287ed 100644 --- a/app/services/document_telemetry.py +++ b/app/services/document_telemetry.py @@ -51,11 +51,25 @@ async def record_analyze_event( prompt_version: str | None, error_code: str | None, source: str, + # PR-A shadow observability — 아래 6개는 routing 이 동반될 때만 세팅, 그 외는 None 유지. + subject_domain: str | None = None, + risk_flags: list[str] | None = None, + high_impact_task: bool | None = None, + escalation_reasons: list[str] | None = None, + confidence: float | None = None, + policy_version: str | None = None, + shadow_would_route_to: str | None = None, + # PR-B B-1 — 실제 호출 tier 와 R2 backlog guard + tier: str | None = None, + escalated_to_26b: bool | None = None, + suppressed_reason: str | None = None, ) -> None: """analyze_events INSERT. background task에서 호출 — 에러 삼킴. layers_returned: 성공 시 ["evidence","summary"] 등 layer 문자열 리스트. 실패 시 []. error_code: None (성공) | "timeout" | "llm" | "parse" | "missing_summary" | "no_text" | "not_found" + tier: 'triage' | 'primary' | 'fallback' — 실제 호출된 tier (PR-B B-0~B-2). + suppressed_reason: R2 backlog guard 로 soft escalate 가 suppress 된 경우의 이유 문자열. """ try: async with async_session() as session: @@ -72,6 +86,16 @@ async def record_analyze_event( prompt_version=prompt_version, error_code=error_code, source=source, + subject_domain=subject_domain, + risk_flags=risk_flags, + high_impact_task=high_impact_task, + escalated_to_26b=escalated_to_26b, + escalation_reasons=escalation_reasons, + confidence=confidence, + policy_version=policy_version, + shadow_would_route_to=shadow_would_route_to, + tier=tier, + suppressed_reason=suppressed_reason, ) session.add(row) await session.commit() diff --git a/app/services/prompt_versions.py b/app/services/prompt_versions.py index 59d76dd..3c15ff7 100644 --- a/app/services/prompt_versions.py +++ b/app/services/prompt_versions.py @@ -23,6 +23,12 @@ ASK_PROMPT_VERSION: str = "search_synthesis.v2-600char" # documents.py analyze 라우트가 로드하는 app/prompts/document_analyze.txt 기준 ANALYZE_PROMPT_VERSION: str = "document_analyze.v1" +# ─── PR-B B-1: summary tier 분할 task 이름 ───────────────────── +# classify_worker / deep_summary_worker 가 PR-A 정책 템플릿 + policy_version 해시 +# 조합으로 analyze_events.prompt_version 을 기록한다. (예: "p3a_short_summary@abc123") +SUMMARY_TRIAGE_TASK: str = "p3a_short_summary" # 4B gemma Ollama +SUMMARY_DEEP_TASK: str = "p3c_deep_summary" # 26B MLX + def resolve_primary_model() -> str | None: """런타임 config에서 primary 모델명을 resolve. diff --git a/app/workers/classify_worker.py b/app/workers/classify_worker.py index cce5cd7..bb28943 100644 --- a/app/workers/classify_worker.py +++ b/app/workers/classify_worker.py @@ -1,17 +1,48 @@ -"""AI 분류 워커 — taxonomy 기반 도메인/문서타입/태그/요약 생성""" +"""AI 분류 워커 — 도메인/문서타입/태그/요약 생성 + PR-B B-1 tier triage. +Legacy 경로 (primary 26B 호출): + - app/prompts/classify.txt 기반 classify() + - AIClient.summarize() 로 ai_summary 생성 + - facet_doctype + ai_suggestion (library 제안) 저장 + → ai_domain / ai_sub_group / document_type / ai_confidence / ai_tags / + ai_summary / ai_suggestion / facet_doctype / importance 필드 + +PR-B B-1 tier triage (신규, 4B gemma Ollama): + - policy.routing.decide_routing 으로 RoutingDecision + - policy.prompt_render.render_4b("p3a_short_summary", subject_domain) 로 프롬프트 렌더 + - AIClient.call_triage(rendered) 호출 (llm_gate 외부, Ollama concurrent OK) + - TriageOutput pydantic validate + JSON 깨짐 시 fallback escalate (R1) + - R2 backlog guard: deep_summary 큐 ratio > threshold or pending >= threshold 이면 suppress + - R3 head/middle/tail: 260k 초과 시 envelope text_ranges 3조각 + → ai_tldr / ai_bullets / ai_analysis_tier='triage' 저장 + analyze_events 기록 + → escalate 시 EscalationEnvelope payload 로 deep_summary 큐 enqueue +""" + +from __future__ import annotations + +import json +import time from datetime import datetime, timezone +from pydantic import BaseModel, Field, ValidationError +from sqlalchemy import text as sql_text from sqlalchemy.ext.asyncio import AsyncSession from ai.client import AIClient, parse_json_response, strip_thinking +from ai.envelope import EscalationEnvelope from core.config import settings from core.utils import setup_logger from models.document import Document +from models.queue import enqueue_stage +from policy.prompt_render import render_4b, policy_version as compute_policy_version +from policy.routing import decide_routing +from services.document_telemetry import record_analyze_event logger = setup_logger("classify_worker") -MAX_CLASSIFY_TEXT = 8000 +MAX_CLASSIFY_TEXT = 8000 # legacy classify 입력 상한 +TRIAGE_TEXT_LIMIT = 120_000 # p3a summary_triage 입력 상한 (4B) +DEEP_PRIMARY_LIMIT = 260_000 # primary context_char_limit (envelope slicing) # settings에서 taxonomy/document_types 로딩 DOCUMENT_TYPES = set(settings.document_types) @@ -22,6 +53,33 @@ FACET_DOCTYPES = {"발주서", "세금계산서", "명세표", "도면", "증명 # 자료실 자동 분류 제안 대상 (거래 하위) LIBRARY_SUGGESTION_DOCTYPES = {"발주서", "세금계산서", "명세표"} +# PR-B prompt_version task 이름 +SUMMARY_TRIAGE_TASK = "p3a_short_summary" + +# R2 — hard escalate 사유 (suppress 되지 않는 critical reason) +HARD_ESCALATE_REASONS = {"long_context", "triage_json_invalid", "low_confidence"} + + +# ───────────────────────── TriageOutput (R1) ───────────────────────── + +class TriageOutput(BaseModel): + """p3a_short_summary (4B) 응답 스키마. 파싱 실패 시 기본값 + escalate=True fallback.""" + + tldr: str = "" + bullets: list[str] = Field(default_factory=list) + tags: list[str] = Field(default_factory=list) + doc_type: str = "" + time_scope: str | None = None + confidence: float = 0.3 + high_impact_self_declared: bool = False + high_impact_reason: str | None = None + recommend_deep_summary: bool = False + recommend_entity_pass: bool = False + escalate_to_26b: bool = False + risk_flags: list[str] = Field(default_factory=list) + + +# ───────────────────────── legacy classify (primary) ────────────────── def _get_taxonomy_leaf_paths(taxonomy: dict, prefix: str = "") -> set[str]: """taxonomy dict에서 모든 유효한 경로를 추출""" @@ -52,45 +110,165 @@ def _validate_domain(domain: str) -> str: """domain이 taxonomy에 존재하는지 검증, 없으면 최대한 가까운 경로 찾기""" if domain in VALID_DOMAIN_PATHS: return domain - - # 부분 매칭 시도 (2단계까지) parts = domain.split("/") for i in range(len(parts), 0, -1): partial = "/".join(parts[:i]) if partial in VALID_DOMAIN_PATHS: logger.warning(f"[분류] domain '{domain}' → '{partial}' (부분 매칭)") return partial - logger.warning(f"[분류] domain '{domain}' taxonomy에 없음, General/Reading_Notes로 대체") return "General/Reading_Notes" +# ───────────────────────── B-1 subject_domain 매칭 ───────────────────── + +def _match_subject_domain(doc: Document, text: str) -> str: + """본문/메타 기반으로 domain_policy.yaml 의 subject_domain 이름 결정. + + 매칭 우선순위 (feedback_category_vs_ai_domain_axis.md — category 매칭 금지): + 1. source_channel (news / law_monitor / memo) + 2. 본문 keywords (첫 8k, 대소문자 무시) + 3. ai_domain prefix — legacy classify 가 채운 taxonomy path + 4. generic (fallback) + """ + sc = (doc.source_channel or "").lower() + if sc in ("news", "news_collector"): + return "news_item" + if sc == "law_monitor": + return "safety_reference" + if sc == "memo": + return "generic" + + head = (text or "")[:8000].lower() + + def _hit(keywords: list[str]) -> bool: + return any(kw.lower() in head for kw in keywords if kw) + + # 구체 먼저 + if _hit(["MSDS", "SDS", "물질안전보건자료", "화학물질", "유해화학물질"]): + return "msds" + if _hit(["사고보고", "재해조사", "원인분석", "재발방지", "중대재해"]): + return "incident_report" + if _hit(["건강검진", "작업환경측정", "보건관리", "특수건강진단"]): + return "health_record" + if _hit(["밀폐공간", "추락", "끼임", "감전", "화재", "폭발", "중독", "질식"]): + return "hazard_specific" + if _hit(["위험성평가", "작업허가서", "JSA", "안전작업지침", "보호구", "SOP"]): + return "safety_operational" + if _hit(["산업안전보건법", "중대재해", "안전보건관리체계", "유해위험방지계획서"]): + return "safety_reference" + + if (doc.ai_domain or "").startswith("Industrial_Safety"): + return "safety_reference" + + return "generic" + + +def _slice_text_ranges(doc_id: int, total_len: int) -> dict: + """R3 — 26B 에 전달할 원문 슬라이스. total_len ≤ primary limit 이면 단일 range, 초과 시 head/mid/tail 3조각.""" + if total_len <= DEEP_PRIMARY_LIMIT: + return { + "doc_ids": [doc_id], + "text_ranges": [{"doc_id": doc_id, "start": 0, "end": total_len}], + } + head_end = 120_000 + mid_start = max(head_end, total_len // 2 - 10_000) + mid_end = mid_start + 20_000 + tail_start = max(mid_end, total_len - 120_000) + return { + "doc_ids": [doc_id], + "text_ranges": [ + {"doc_id": doc_id, "start": 0, "end": head_end}, + {"doc_id": doc_id, "start": mid_start, "end": mid_end}, + {"doc_id": doc_id, "start": tail_start, "end": total_len}, + ], + } + + +async def _check_backlog_guard(session: AsyncSession) -> str | None: + """R2 — deep_summary 큐 적체 시 soft escalate 억제. 억제 사유 문자열 반환, 아니면 None.""" + backlog = settings.ai.deep_summary_backlog + window = backlog.window_minutes + ratio_threshold = backlog.ratio_threshold + pending_threshold = backlog.pending_threshold + + row = (await session.execute(sql_text(""" + SELECT + COUNT(*) FILTER (WHERE stage = 'classify') AS classify_n, + COUNT(*) FILTER (WHERE stage = 'deep_summary') AS deep_n + FROM processing_queue + WHERE created_at > NOW() - make_interval(mins => :w) + """), {"w": window})).one() + classify_n = row.classify_n or 0 + deep_n = row.deep_n or 0 + ratio = (deep_n / classify_n) if classify_n > 0 else 0.0 + + pending_deep = int(await session.scalar(sql_text(""" + SELECT COUNT(*) FROM processing_queue + WHERE stage = 'deep_summary' AND status IN ('pending', 'processing') + """)) or 0) + + if ratio > ratio_threshold or pending_deep > pending_threshold: + return f"backlog_guard(ratio={ratio:.2f},pending={pending_deep})" + return None + + +def _classify_escalation_reason( + triage_out: TriageOutput, + input_chars: int, + triage_limit: int, + confidence_floor: float, +) -> str | None: + """escalate_to_26b 가 True 이도록 만든 근본 사유 하나를 반환. 아니면 None.""" + if input_chars > triage_limit: + return "long_context" + if triage_out.confidence < confidence_floor: + return "low_confidence" + if triage_out.escalate_to_26b: + return "self_declare" + if triage_out.recommend_deep_summary: + return "deep_requested" + return None + + +def _is_hard_escalate(reason: str | None) -> bool: + """R2 — hard escalate 는 suppress 되지 않는다.""" + return reason in HARD_ESCALATE_REASONS + + +def _distill(triage_out: TriageOutput, limit: int = 2000) -> str: + """26B envelope.distilled_context 용 4B 출력 압축 텍스트.""" + parts: list[str] = [] + if triage_out.tldr: + parts.append(f"TL;DR: {triage_out.tldr}") + if triage_out.bullets: + parts.append("핵심:") + parts.extend(f"- {b}" for b in triage_out.bullets) + if triage_out.doc_type: + parts.append(f"doc_type: {triage_out.doc_type}") + if triage_out.tags: + parts.append(f"tags: {', '.join(triage_out.tags)}") + return "\n".join(parts)[:limit] + + +# ───────────────────────── main process ──────────────────────────────── + async def process(document_id: int, session: AsyncSession) -> None: - """문서 AI 분류 + 요약""" + """문서 분류 + 요약 + tier triage. + + 1) Legacy: classify() → ai_domain/document_type/ai_tags/ai_confidence/ai_suggestion + 2) Legacy: summarize() → ai_summary + 3) PR-B B-1: summary_triage (4B) → ai_tldr/ai_bullets/ai_analysis_tier='triage' + """ doc = await session.get(Document, document_id) if not doc: raise ValueError(f"문서 ID {document_id}를 찾을 수 없음") - - # 법령은 구조 고정 + 외부 source of truth (law.go.kr) + 자동 재수집. - # AI 분류 skip, downstream(embed/chunk) 은 queue_consumer NEXT_STAGES 가 자동 chain. - # ai_domain 단일 "법령" — PR-A policy.domain_policy.yaml 에서 source_channel 기준 세분화. - if doc.source_channel == "law_monitor": - if not doc.ai_domain: - doc.ai_domain = "법령" - if not doc.ai_tags: - doc.ai_tags = ["법령"] - if not doc.importance: - doc.importance = "medium" - await session.commit() - logger.info(f"doc {document_id}: law_monitor → classify skip") - return - if not doc.extracted_text: raise ValueError(f"문서 ID {document_id}: extracted_text가 비어있음") client = AIClient() try: - # ─── 분류 ─── + # ─── 1. Legacy classify (primary 26B) ─── truncated = doc.extracted_text[:MAX_CLASSIFY_TEXT] raw_response = await client.classify(truncated) parsed = parse_json_response(raw_response) @@ -134,16 +312,12 @@ async def process(document_id: int, session: AsyncSession) -> None: doc.doc_purpose = purpose # ─── facet_doctype 식별 (§1 실무 문서 유형 신호) ─── - # AI 식별값이 허용 enum 이면 facet_doctype 저장. 기존 값이 있으면 덮어쓰지 않음 - # (수동 수정 / Phase 2 facet 우선). document.category / user_tags 는 **건드리지 않음**. ai_doctype_raw = parsed.get("facet_doctype") ai_doctype = ai_doctype_raw if ai_doctype_raw in FACET_DOCTYPES else None if ai_doctype and not doc.facet_doctype: doc.facet_doctype = ai_doctype # ─── ai_suggestion 저장 (자료실 승인 대기함 제안, §1) ─── - # 발주서/세금계산서/명세표 → 자료실 '거래' 분류 제안. 자동 전이 금지. - # /accept-suggestion 승인 UI 에서만 실제 category='library' + @library/... 부여. if ai_doctype in LIBRARY_SUGGESTION_DOCTYPES: year = doc.facet_year or datetime.now(timezone.utc).year doc.ai_suggestion = { @@ -157,12 +331,12 @@ async def process(document_id: int, session: AsyncSession) -> None: "reason": "classify pipeline", } - # ─── 요약 ─── + # ─── 2. Legacy 요약 (primary 26B) ─── summary = await client.summarize(doc.extracted_text[:50000]) doc.ai_summary = strip_thinking(summary) - # ─── 메타데이터 ─── - doc.ai_model_version = "qwen3.5-35b-a3b" + # ─── 메타데이터 (legacy 완료) ─── + doc.ai_model_version = settings.ai.primary.model doc.ai_processed_at = datetime.now(timezone.utc) logger.info( @@ -171,7 +345,209 @@ async def process(document_id: int, session: AsyncSession) -> None: f"confidence={doc.ai_confidence:.2f}, tags={doc.ai_tags}" ) + # ─── 3. PR-B B-1 — tier triage (4B, 실패는 legacy 결과 보존) ─── + try: + await _run_tier_triage(client, doc, session) + except Exception as exc: + logger.exception(f"[triage] id={document_id} 전체 실패 — legacy 유지: {exc}") + finally: await client.close() - # _move_to_knowledge 제거됨 — 파일은 원본 위치 유지, 분류는 DB 메타데이터만 + +async def _run_tier_triage(client: AIClient, doc: Document, session: AsyncSession) -> None: + """summary_triage (p3a_short_summary) 경로.""" + document_id = doc.id + text = doc.extracted_text or "" + input_chars = len(text) + subject_domain = _match_subject_domain(doc, text) + triage_start = time.perf_counter() + parse_error: str | None = None + triage_out = TriageOutput() + + # 입력이 triage 한도 초과면 호출 생략하고 long_context 로 escalate + if input_chars > TRIAGE_TEXT_LIMIT: + await _apply_triage_result( + doc=doc, + session=session, + triage_out=triage_out, + subject_domain=subject_domain, + input_chars=input_chars, + latency_ms=0, + escalation_reason="long_context", + parse_error=None, + ) + return + + try: + rendered = render_4b(SUMMARY_TRIAGE_TASK, subject_domain) + except Exception as exc: + logger.exception(f"[triage] render_4b 실패 subject={subject_domain}: {exc}") + return + + # 템플릿의 {extracted_text} 는 단일중괄호로 format 후 남아있음. str.replace 로 주입. + prompt = rendered.replace("{extracted_text}", text[:TRIAGE_TEXT_LIMIT]) + + try: + raw_triage = await client.call_triage(prompt) + except Exception as exc: + logger.warning(f"[triage] 4B 호출 실패 id={document_id}: {exc}") + parse_error = "call_failed" + raw_triage = "" + + latency_ms = int((time.perf_counter() - triage_start) * 1000) + + # R1 — JSON parsing + schema validation + fallback + if raw_triage: + try: + parsed_triage = parse_json_response(raw_triage) or {} + triage_out = TriageOutput.model_validate(parsed_triage) + except (ValidationError, ValueError, TypeError) as exc: + parse_error = f"parse:{type(exc).__name__}" + logger.warning(f"[triage] JSON 파싱/검증 실패 → fallback: {exc}") + + if parse_error: + triage_out = triage_out.model_copy(update={ + "escalate_to_26b": True, + "confidence": 0.3, + }) + escalation_reason: str | None = "triage_json_invalid" + else: + escalation_reason = _classify_escalation_reason( + triage_out, + input_chars=input_chars, + triage_limit=TRIAGE_TEXT_LIMIT, + confidence_floor=0.6, + ) + + await _apply_triage_result( + doc=doc, + session=session, + triage_out=triage_out, + subject_domain=subject_domain, + input_chars=input_chars, + latency_ms=latency_ms, + escalation_reason=escalation_reason, + parse_error=parse_error, + ) + + +async def _apply_triage_result( + *, + doc: Document, + session: AsyncSession, + triage_out: TriageOutput, + subject_domain: str, + input_chars: int, + latency_ms: int, + escalation_reason: str | None, + parse_error: str | None, +) -> None: + """TriageOutput → Document 필드 + R2 suppression + envelope enqueue + audit.""" + document_id = doc.id + + # Document 필드 (파싱 실패 시에는 legacy ai_summary 가 노출되도록 tldr 비움) + if not parse_error: + doc.ai_tldr = (triage_out.tldr or "").strip() or None + doc.ai_bullets = triage_out.bullets or [] + doc.ai_analysis_tier = "triage" + + # decide_routing — shadow observability 용 (INV 체크) + routing_decision = None + try: + routing_decision = decide_routing( + subject_domain=subject_domain, + content_chars=input_chars, + deterministic_keyword_hits=(), + self_declared_high_impact=triage_out.high_impact_self_declared, + self_declared_risk_flags=tuple(triage_out.risk_flags), + confidence=triage_out.confidence, + evidence_doc_count=0, + ) + except Exception as exc: + logger.warning(f"[triage] decide_routing 실패 id={document_id}: {exc}") + + # R2 — backlog guard (hard 제외 soft escalate 만 억제) + suppressed_reason: str | None = None + escalate = escalation_reason is not None + if escalate and not _is_hard_escalate(escalation_reason): + suppressed_reason = await _check_backlog_guard(session) + if suppressed_reason: + escalate = False + escalation_reason = None + + # enqueue deep_summary + if escalate: + try: + envelope = EscalationEnvelope( + from_stage="summary_triage", + escalation_reasons=tuple([escalation_reason] if escalation_reason else []), + risk_flags=tuple(routing_decision.risk_flags) if routing_decision + else tuple(triage_out.risk_flags), + distilled_context=_distill(triage_out), + original_pointers=_slice_text_ranges(document_id, input_chars), + synthesis_directives=( + tuple(routing_decision.synthesis_directives) if routing_decision else () + ), + user_intent=None, + draft_hint=(triage_out.tldr or None), + ) + await enqueue_stage( + session, + document_id, + "deep_summary", + payload={ + "envelope": json.loads(envelope.to_json()), + "subject_domain": subject_domain, + }, + ) + except Exception as exc: + logger.exception(f"[triage] envelope enqueue 실패 id={document_id}: {exc}") + + # analyze_events + try: + pv = compute_policy_version(SUMMARY_TRIAGE_TASK) + except Exception: + pv = None + + escalation_reasons_list = ( + list(routing_decision.escalation_reasons) if routing_decision + else ([escalation_reason] if escalation_reason else []) + ) + shadow_target = ( + "primary" if (routing_decision and routing_decision.escalate_to_26b) else "triage" + ) + + await record_analyze_event( + doc_id=document_id, + user_id=None, + mode="summary_triage", + text_limit=TRIAGE_TEXT_LIMIT, + truncated=input_chars > TRIAGE_TEXT_LIMIT, + layers_returned=["tldr", "bullets"] if not parse_error else [], + cached=False, + latency_ms=latency_ms, + model_name=settings.ai.triage.model, + prompt_version=(f"{SUMMARY_TRIAGE_TASK}@{pv}" if pv else SUMMARY_TRIAGE_TASK), + error_code=parse_error, + source="document_server", + subject_domain=subject_domain, + risk_flags=(list(routing_decision.risk_flags) if routing_decision + else list(triage_out.risk_flags)), + high_impact_task=(routing_decision.high_impact_task if routing_decision else None), + escalation_reasons=escalation_reasons_list, + confidence=triage_out.confidence, + policy_version=pv, + shadow_would_route_to=shadow_target, + tier="triage", + escalated_to_26b=escalate, + suppressed_reason=suppressed_reason, + ) + + logger.info( + f"[triage] id={document_id} subject={subject_domain} " + f"escalate={escalate} reason={escalation_reason} " + f"suppressed={bool(suppressed_reason)} " + f"confidence={triage_out.confidence:.2f} " + f"tldr_len={len(doc.ai_tldr or '')} bullets={len(doc.ai_bullets or [])}" + ) diff --git a/app/workers/deep_summary_worker.py b/app/workers/deep_summary_worker.py new file mode 100644 index 0000000..c644609 --- /dev/null +++ b/app/workers/deep_summary_worker.py @@ -0,0 +1,210 @@ +"""PR-B B-1 Deep Summary 워커 — 26B (primary MLX) 에스컬레이션 분석. + +큐 stage 'deep_summary' 에서 pickup. classify_worker 가 enqueue 시 payload 로 실은 +EscalationEnvelope + subject_domain 을 읽어, PR-A policy 템플릿 `p3c_deep_summary` 를 +렌더링한 뒤 26B primary 를 호출한다. llm_gate Semaphore(1) 경유 — MLX 단일 인퍼런스 보호. + +출력을 documents.ai_detail_summary / ai_inconsistencies 에 저장하고 ai_analysis_tier 를 +'deep' 으로 전이. 실패는 무해하게 legacy 결과 보존. +""" + +from __future__ import annotations + +import json +import time +from datetime import datetime, timezone + +from pydantic import BaseModel, Field, ValidationError +from sqlalchemy import desc, select +from sqlalchemy.ext.asyncio import AsyncSession + +from ai.client import AIClient, parse_json_response +from ai.envelope import EscalationEnvelope +from core.config import settings +from core.utils import setup_logger +from models.document import Document +from models.queue import ProcessingQueue +from policy.prompt_render import render_26b, policy_version as compute_policy_version +from services.document_telemetry import record_analyze_event +from services.search.llm_gate import get_mlx_gate + +logger = setup_logger("deep_summary_worker") + +DEEP_SUMMARY_TASK = "p3c_deep_summary" + +# inconsistencies kind 허용 목록 (feedback_document_server_domain_scope.md — 구매/계약 제외) +ALLOWED_INCONSISTENCY_KINDS = { + "version_drift", "procedure_conflict", "source_conflict", "missing_basis", +} + + +class DeepSummaryOutput(BaseModel): + """p3c_deep_summary (26B) 응답 스키마. 파싱 실패 시 기본값 + skip.""" + + mode: str = "single" + tldr: str = "" + bullets: list[str] = Field(default_factory=list) + detail: str = "" # 26B 가 채우는 상세 (detail_summary 로 저장) + bundle_flow: list[str] | None = None + inconsistencies: list[dict] | None = None + entities_confirmed: dict | None = None + directives_applied: list[str] | None = None + confidence: float = 0.5 + + +async def process(document_id: int, session: AsyncSession) -> None: + """deep_summary 큐 pickup → 26B 호출 → 필드 저장.""" + doc = await session.get(Document, document_id) + if not doc: + raise ValueError(f"deep_summary: document id={document_id} 없음") + + # 최신 deep_summary 큐 행의 payload 조회 (queue_consumer 가 status='processing' 으로 세팅한 상태) + queue_row = (await session.execute( + select(ProcessingQueue) + .where( + ProcessingQueue.document_id == document_id, + ProcessingQueue.stage == "deep_summary", + ProcessingQueue.status == "processing", + ) + .order_by(desc(ProcessingQueue.id)) + .limit(1) + )).scalar_one_or_none() + if not queue_row: + logger.warning(f"[deep] processing 상태의 deep_summary row 없음 id={document_id}") + return + + payload = queue_row.payload or {} + envelope_raw = payload.get("envelope") + subject_domain = payload.get("subject_domain") or "generic" + if not envelope_raw: + logger.error(f"[deep] envelope 없음 id={document_id} payload_keys={list(payload.keys())}") + raise ValueError("deep_summary payload 에 envelope 없음") + + envelope = EscalationEnvelope.from_json(json.dumps(envelope_raw)) + + # 원문 슬라이스 추출 (envelope.original_pointers.text_ranges 기반) + slices = _build_text_slices(doc.extracted_text or "", envelope.original_pointers) + + # PR-A 템플릿 렌더 + try: + rendered = render_26b(DEEP_SUMMARY_TASK, subject_domain) + except Exception as exc: + logger.exception(f"[deep] render_26b 실패 subject={subject_domain}: {exc}") + raise + + prompt = ( + rendered + .replace("{escalation_envelope_json}", envelope.to_system_injection()) + .replace("{original_text_slices}", slices) + ) + + client = AIClient() + latency_ms = 0 + parse_error: str | None = None + deep_out = DeepSummaryOutput() + + try: + start = time.perf_counter() + async with get_mlx_gate(): # primary(26B) 보호 Semaphore(1) + raw = await client.call_primary(prompt) + latency_ms = int((time.perf_counter() - start) * 1000) + except Exception as exc: + logger.warning(f"[deep] 26B 호출 실패 id={document_id}: {exc}") + parse_error = "call_failed" + raw = "" + finally: + await client.close() + + if raw: + try: + parsed = parse_json_response(raw) or {} + deep_out = DeepSummaryOutput.model_validate(parsed) + except (ValidationError, ValueError, TypeError) as exc: + parse_error = f"parse:{type(exc).__name__}" + logger.warning(f"[deep] JSON 파싱/검증 실패 id={document_id}: {exc}") + + if not parse_error: + doc.ai_detail_summary = (deep_out.detail or "").strip() or None + doc.ai_inconsistencies = _filter_inconsistencies(deep_out.inconsistencies or []) + doc.ai_analysis_tier = "deep" + doc.ai_processed_at = datetime.now(timezone.utc) + + try: + pv = compute_policy_version(DEEP_SUMMARY_TASK) + except Exception: + pv = None + + await record_analyze_event( + doc_id=document_id, + user_id=None, + mode="summary_deep", + text_limit=settings.ai.primary.context_char_limit or 260000, + truncated=False, + layers_returned=["detail_summary", "inconsistencies"] if not parse_error else [], + cached=False, + latency_ms=latency_ms, + model_name=settings.ai.primary.model, + prompt_version=(f"{DEEP_SUMMARY_TASK}@{pv}" if pv else DEEP_SUMMARY_TASK), + error_code=parse_error, + source="document_server", + subject_domain=subject_domain, + risk_flags=list(envelope.risk_flags), + high_impact_task=None, + escalation_reasons=list(envelope.escalation_reasons), + confidence=deep_out.confidence, + policy_version=pv, + shadow_would_route_to="primary", + tier="primary", + escalated_to_26b=True, + suppressed_reason=None, + ) + + logger.info( + f"[deep] id={document_id} subject={subject_domain} " + f"detail_len={len(doc.ai_detail_summary or '')} " + f"inc={len(doc.ai_inconsistencies or [])} latency_ms={latency_ms} " + f"parse_error={parse_error}" + ) + + +def _build_text_slices(text: str, pointers: dict) -> str: + """original_pointers.text_ranges 의 [{start, end}] 를 실제 본문 슬라이스로 합친다. + + 3조각 이상이면 각 조각 앞에 [slice N — head/middle/tail] 라벨을 붙여 26B 가 순서 인지. + 단일 슬라이스면 라벨 없이 원문 그대로. + """ + ranges = (pointers or {}).get("text_ranges") or [] + if not ranges: + return text[:260_000] + + if len(ranges) == 1: + r = ranges[0] + return text[r.get("start", 0):r.get("end", len(text))] + + labels = ["head", "middle", "tail"] + parts: list[str] = [] + for idx, r in enumerate(ranges): + label = labels[idx] if idx < len(labels) else f"slice{idx}" + chunk = text[r.get("start", 0):r.get("end", len(text))] + parts.append(f"[slice {idx} — {label}]\n{chunk}") + return "\n\n".join(parts) + + +def _filter_inconsistencies(items: list) -> list[dict]: + """허용 kind 목록 (safety/news 도메인 한정) 만 통과시킨다. + + `amount_mismatch` 같은 구매/계약 kind 는 여기서 drop (feedback_document_server_domain_scope.md). + 구조 오류 (kind/desc 누락) 도 drop. + """ + out: list[dict] = [] + for it in items or []: + if not isinstance(it, dict): + continue + kind = str(it.get("kind") or "") + desc_ = str(it.get("desc") or "").strip() + if kind not in ALLOWED_INCONSISTENCY_KINDS: + continue + if not desc_: + continue + out.append({"kind": kind, "desc": desc_}) + return out diff --git a/app/workers/queue_consumer.py b/app/workers/queue_consumer.py index f205811..0e3c689 100644 --- a/app/workers/queue_consumer.py +++ b/app/workers/queue_consumer.py @@ -14,8 +14,9 @@ logger = setup_logger("queue_consumer") # stage별 배치 크기 # stt 는 GPU 단일 점유 + 회의 30분짜리도 가능 → 배치 1. thumbnail 은 ffmpeg subprocess 로 가벼움. +# deep_summary (PR-B B-1) 는 MLX 26B 단일 Semaphore(1) 경유 → 배치 1. BATCH_SIZE = {"extract": 5, "classify": 3, "summarize": 3, "embed": 1, "chunk": 1, - "preview": 2, "stt": 1, "thumbnail": 3} + "preview": 2, "stt": 1, "thumbnail": 3, "deep_summary": 1} STALE_THRESHOLD_MINUTES = 10 @@ -122,6 +123,7 @@ async def consume_queue(): """큐에서 pending 항목을 가져와 stage별 워커 실행""" from workers.classify_worker import process as classify_process from workers.chunk_worker import process as chunk_process + from workers.deep_summary_worker import process as deep_summary_process from workers.embed_worker import process as embed_process from workers.extract_worker import process as extract_process from workers.preview_worker import process as preview_process @@ -138,6 +140,9 @@ async def consume_queue(): "preview": preview_process, "stt": stt_process, "thumbnail": thumbnail_process, + # PR-B B-1: classify 가 에스컬레이션 판단 시 enqueue → 26B 가 detail_summary 작성. + # next_stages 에 추가하지 않음 — deep_summary 는 leaf (classify→embed/chunk 흐름과 독립). + "deep_summary": deep_summary_process, } try: diff --git a/migrations/156_ai_analysis_cols.sql b/migrations/156_ai_analysis_cols.sql new file mode 100644 index 0000000..e1557db --- /dev/null +++ b/migrations/156_ai_analysis_cols.sql @@ -0,0 +1,18 @@ +-- 156_ai_analysis_cols.sql +-- PR-B B-1: documents 에 4B triage / 26B deep 분할 컬럼 추가. +-- plan: ~/.claude/plans/swirling-swimming-liskov.md +-- +-- ai_tldr / ai_bullets → summary_triage 산출 (4B, 상시) +-- ai_detail_summary / ai_inconsistencies → summary_deep 산출 (26B, escalation) +-- ai_analysis_tier → 'triage' | 'deep' — 현재 문서가 어느 tier 까지 처리됐는지 +-- ai_summary (legacy TEXT) → ai_tldr 을 복제하여 backward compat +-- +-- 주의: _run_migrations 는 asyncpg exec_driver_sql 로 단일 prepared statement — ALTER TABLE +-- 의 다중 ADD COLUMN 절은 단일 statement 라 허용. enum ADD VALUE 는 별도 (157). + +ALTER TABLE documents + ADD COLUMN IF NOT EXISTS ai_tldr TEXT, + ADD COLUMN IF NOT EXISTS ai_bullets JSONB, + ADD COLUMN IF NOT EXISTS ai_detail_summary TEXT, + ADD COLUMN IF NOT EXISTS ai_inconsistencies JSONB, + ADD COLUMN IF NOT EXISTS ai_analysis_tier TEXT; diff --git a/migrations/157_queue_stage_deep_summary.sql b/migrations/157_queue_stage_deep_summary.sql new file mode 100644 index 0000000..ef08d54 --- /dev/null +++ b/migrations/157_queue_stage_deep_summary.sql @@ -0,0 +1,13 @@ +-- 157_queue_stage_deep_summary.sql +-- PR-B B-1: processing_queue.stage enum 에 'deep_summary' 추가. +-- plan: ~/.claude/plans/swirling-swimming-liskov.md §B-1 +-- +-- PostgreSQL 제약: ALTER TYPE ADD VALUE 는 새 값을 같은 트랜잭션 안에서 사용할 수 없다. +-- (unsafe use of new value of enum type) +-- 따라서 이 migration 은 enum 확장만 단독으로 처리하고, processing_queue.payload 컬럼 +-- 추가는 158 로 분리. classify_worker 가 deep_summary 를 enqueue 하는 시점엔 157 이 +-- 이미 별도 트랜잭션으로 commit 돼 있다. +-- +-- audio/video 파이프와 동일 패턴 (147~151 참조). + +ALTER TYPE process_stage ADD VALUE IF NOT EXISTS 'deep_summary'; diff --git a/migrations/158_processing_queue_payload.sql b/migrations/158_processing_queue_payload.sql new file mode 100644 index 0000000..6bc6004 --- /dev/null +++ b/migrations/158_processing_queue_payload.sql @@ -0,0 +1,13 @@ +-- 158_processing_queue_payload.sql +-- PR-B B-1: processing_queue 에 payload JSONB 컬럼 추가. +-- plan: ~/.claude/plans/swirling-swimming-liskov.md §B-1 +-- +-- deep_summary stage 가 EscalationEnvelope (from_stage / escalation_reasons / +-- risk_flags / distilled_context / original_pointers / synthesis_directives / ...) +-- 를 payload 로 싣고 전달. 다른 stage 는 NULL. +-- +-- 157 의 enum ADD VALUE 가 별도 트랜잭션으로 commit 된 뒤에만 실행돼야 한다. +-- (asyncpg migrations 러너가 파일 순서대로 단일-statement 실행하므로 안전.) + +ALTER TABLE processing_queue + ADD COLUMN IF NOT EXISTS payload JSONB; diff --git a/migrations/159_analyze_events_pr_b_tier.sql b/migrations/159_analyze_events_pr_b_tier.sql new file mode 100644 index 0000000..0c4f804 --- /dev/null +++ b/migrations/159_analyze_events_pr_b_tier.sql @@ -0,0 +1,18 @@ +-- 159_analyze_events_pr_b_tier.sql +-- PR-B B-1: analyze_events 에 실제 호출 tier + suppressed_reason 추가. +-- plan: ~/.claude/plans/swirling-swimming-liskov.md §B-1 R2 +-- +-- PR-A (153) 이 shadow_would_route_to 로 "정책이 제안한 tier" 는 이미 기록. +-- 여기서는 PR-B 가 실제로 호출한 tier 와, R2 backlog guard 로 suppress 된 이유를 추가. +-- +-- tier : 'triage' | 'primary' | 'fallback' — classify_worker / deep_summary_worker / +-- evidence_service / synthesis_service 가 실제 호출 시 기록. +-- suppressed_reason : 'backlog_guard(ratio=0.42,pending=7)' 등. escalation 판정이 soft 였으나 +-- backlog guard 로 suppress 된 건을 사후 디버깅 하기 위해 별도 컬럼화. +-- dashboard "Backlog Suppression (24h)" 카드의 소스. +-- +-- 단일 statement (ALTER TABLE 다중 ADD COLUMN 은 허용). 인덱스는 160 으로 분리. + +ALTER TABLE analyze_events + ADD COLUMN IF NOT EXISTS tier TEXT, + ADD COLUMN IF NOT EXISTS suppressed_reason TEXT; diff --git a/migrations/160_analyze_events_pr_b_idx.sql b/migrations/160_analyze_events_pr_b_idx.sql new file mode 100644 index 0000000..72bfcd6 --- /dev/null +++ b/migrations/160_analyze_events_pr_b_idx.sql @@ -0,0 +1,7 @@ +-- 160_analyze_events_pr_b_idx.sql +-- PR-B B-1: suppressed_reason partial index (backlog guard 발생 조회용). +-- plan: ~/.claude/plans/swirling-swimming-liskov.md §B-1 R2 + +CREATE INDEX IF NOT EXISTS idx_analyze_events_suppressed + ON analyze_events (created_at) + WHERE suppressed_reason IS NOT NULL;