feat(ai): B-1 summary tier 분할 — triage(4B) + deep_summary(26B)

PR-A policy 레이어를 재사용하여 classify_worker 에 tier triage 경로를 추가.
Legacy ai_summary / ai_domain / ai_suggestion 은 유지 (회귀 0), tldr/bullets/
detail/inconsistencies 는 별도 필드로 분리.

Migrations (156~160):
- 156 documents: ai_tldr, ai_bullets, ai_detail_summary, ai_inconsistencies,
  ai_analysis_tier 5컬럼
- 157 process_stage 에 'deep_summary' ADD VALUE 단독 (Postgres 동일 트랜잭션
  제약 회피)
- 158 processing_queue.payload JSONB (envelope 전달)
- 159 analyze_events 에 tier + suppressed_reason
- 160 suppressed_reason partial index

Models/ORM:
- Document: 5컬럼 Mapped 추가
- ProcessingQueue: deep_summary enum 확장 + payload 필드, enqueue_stage 에
  payload 옵션
- AnalyzeEvent: PR-A shadow 6컬럼 + PR-B tier/suppressed_reason

Workers:
- classify_worker: 기존 legacy 경로 뒤에 _run_tier_triage 추가.
  - _match_subject_domain(doc, text): source_channel + 본문 keywords + ai_domain
    prefix 로 PR-A policy 의 subject_domain 이름 결정 (category 매칭 금지).
  - R1 TriageOutput pydantic + JSON 깨짐 fallback (triage_json_invalid).
  - R2 _check_backlog_guard(): 30분 window ratio > threshold OR pending 초과면
    soft escalate suppress. hard escalate 는 통과.
  - R3 _slice_text_ranges(): 260k 초과 시 head 120k + mid 20k + tail 120k 3조각.
  - escalate 시 EscalationEnvelope 구성 + {envelope, subject_domain} payload 로
    deep_summary enqueue.
- deep_summary_worker (신규): queue payload 에서 envelope + subject_domain 읽기 →
  render_26b("p3c_deep_summary", subject_domain) + MLX 호출 (llm_gate Semaphore(1)
  경유) → ai_detail_summary + ai_inconsistencies 저장 + ai_analysis_tier='deep'.
  _filter_inconsistencies 로 허용 kind (version_drift / procedure_conflict /
  source_conflict / missing_basis) 만 통과 — 구매/계약 kind drop.
- queue_consumer: workers dict 에 deep_summary 추가 + BATCH_SIZE=1. next_stages
  는 건드리지 않음 — classify → embed/chunk 는 그대로, deep_summary 는 독립 체인.

Telemetry:
- record_analyze_event: subject_domain / risk_flags / escalation_reasons /
  confidence / policy_version / shadow_would_route_to / tier / escalated_to_26b /
  suppressed_reason 파라미터 확장. classify/deep worker 가 mode="summary_triage"
  또는 "summary_deep" 로 기록.

API:
- DocumentResponse 에 ai_tldr / ai_bullets / ai_detail_summary /
  ai_inconsistencies / ai_analysis_tier 5필드 노출.

Prompts:
- classify.txt 에 DEPRECATED 주석만 추가 (파일 유지 — rollback 경로 보존).
- PR-A 의 app/prompts/policy/p3a_short_summary.txt (4B) 와 p3c_deep_summary.txt
  (26B) 를 그대로 사용. 내 소유의 summary_triage.txt / summary_deep.txt 는 중복
  이라 별도 커밋에서 제거하지 않고 바로 생성 전 삭제.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Hyungi Ahn
2026-04-24 10:22:09 +09:00
parent 18d684b501
commit 6fdc48e5b6
15 changed files with 776 additions and 37 deletions
+6
View File
@@ -98,6 +98,12 @@ class DocumentResponse(BaseModel):
facet_doctype: str | None = None facet_doctype: str | None = None
category: str | None = None category: str | None = None
ai_suggestion: dict | None = None ai_suggestion: dict | None = None
# PR-B B-1: summary_triage (4B) / summary_deep (26B) 분할 산출
ai_tldr: str | None = None
ai_bullets: list | None = None
ai_detail_summary: str | None = None
ai_inconsistencies: list | None = None
ai_analysis_tier: str | None = None # 'triage' | 'deep' | null
extracted_at: datetime | None extracted_at: datetime | None
ai_processed_at: datetime | None ai_processed_at: datetime | None
embedded_at: datetime | None embedded_at: datetime | None
+18 -2
View File
@@ -8,7 +8,7 @@
from datetime import datetime from datetime import datetime
from typing import Any from typing import Any
from sqlalchemy import BigInteger, Boolean, DateTime, ForeignKey, Integer, Text from sqlalchemy import ARRAY, BigInteger, Boolean, DateTime, Float, ForeignKey, Integer, Text
from sqlalchemy.dialects.postgresql import JSONB from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.orm import Mapped, mapped_column from sqlalchemy.orm import Mapped, mapped_column
@@ -25,7 +25,7 @@ class AnalyzeEvent(Base):
user_id: Mapped[int | None] = mapped_column( user_id: Mapped[int | None] = mapped_column(
BigInteger, ForeignKey("users.id", ondelete="SET NULL") BigInteger, ForeignKey("users.id", ondelete="SET NULL")
) )
mode: Mapped[str] = mapped_column(Text, default="quick", nullable=False) # quick / full mode: Mapped[str] = mapped_column(Text, default="quick", nullable=False) # quick / full / summary_triage / summary_deep / retrieval_select / synthesis
text_limit: Mapped[int | None] = mapped_column(Integer) text_limit: Mapped[int | None] = mapped_column(Integer)
truncated: Mapped[bool] = mapped_column(Boolean, default=False) truncated: Mapped[bool] = mapped_column(Boolean, default=False)
layers_returned: Mapped[list[Any] | None] = mapped_column(JSONB, default=list) layers_returned: Mapped[list[Any] | None] = mapped_column(JSONB, default=list)
@@ -40,3 +40,19 @@ class AnalyzeEvent(Base):
created_at: Mapped[datetime] = mapped_column( created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), default=datetime.now, nullable=False DateTime(timezone=True), default=datetime.now, nullable=False
) )
# PR-A (migration 153) — routing shadow observability
subject_domain: Mapped[str | None] = mapped_column(Text)
risk_flags: Mapped[list[str] | None] = mapped_column(ARRAY(Text))
high_impact_task: Mapped[bool | None] = mapped_column(Boolean)
escalated_to_26b: Mapped[bool | None] = mapped_column(Boolean)
escalation_reasons: Mapped[list[str] | None] = mapped_column(ARRAY(Text))
confidence: Mapped[float | None] = mapped_column(Float)
policy_violation: Mapped[bool | None] = mapped_column(Boolean)
policy_violation_ids: Mapped[list[str] | None] = mapped_column(ARRAY(Text))
shadow_would_route_to: Mapped[str | None] = mapped_column(Text)
policy_version: Mapped[str | None] = mapped_column(Text)
# PR-B (migration 159) — 실제 호출 tier 와 R2 backlog guard 이벤트
tier: Mapped[str | None] = mapped_column(Text) # 'triage' | 'primary' | 'fallback'
suppressed_reason: Mapped[str | None] = mapped_column(Text) # 'backlog_guard(ratio=0.42,pending=7)'
+8
View File
@@ -115,6 +115,14 @@ class Document(Base):
# /accept-suggestion 승인 시에만 category / user_tags 반영 (자동 전이 금지) # /accept-suggestion 승인 시에만 category / user_tags 반영 (자동 전이 금지)
ai_suggestion: Mapped[dict | None] = mapped_column(JSONB) ai_suggestion: Mapped[dict | None] = mapped_column(JSONB)
# PR-B B-1: summary_triage (4B, 상시) / summary_deep (26B, 에스컬레이션) 분할 산출
ai_tldr: Mapped[str | None] = mapped_column(Text) # ≤60자 TL;DR
ai_bullets: Mapped[list | None] = mapped_column(JSONB) # 3~5개 핵심 bullets
ai_detail_summary: Mapped[str | None] = mapped_column(Text) # 26B 2~3문단
ai_inconsistencies: Mapped[list | None] = mapped_column(JSONB) # [{kind, desc}]
# 'triage' | 'deep' | NULL — 현재 문서가 어느 tier 까지 분석 완료됐는지
ai_analysis_tier: Mapped[str | None] = mapped_column(String(10))
# 비디오 썸네일 (§3) — ffmpeg 50% 지점 1장. PKM/Videos/.thumbs/{id}.jpg 절대경로. # 비디오 썸네일 (§3) — ffmpeg 50% 지점 1장. PKM/Videos/.thumbs/{id}.jpg 절대경로.
thumbnail_path: Mapped[str | None] = mapped_column(Text) thumbnail_path: Mapped[str | None] = mapped_column(Text)
+19 -4
View File
@@ -3,7 +3,7 @@
from datetime import datetime from datetime import datetime
from sqlalchemy import BigInteger, DateTime, Enum, ForeignKey, SmallInteger, Text, text from sqlalchemy import BigInteger, DateTime, Enum, ForeignKey, SmallInteger, Text, text
from sqlalchemy.dialects.postgresql import insert as pg_insert from sqlalchemy.dialects.postgresql import JSONB, insert as pg_insert
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import Mapped, mapped_column from sqlalchemy.orm import Mapped, mapped_column
@@ -17,10 +17,11 @@ class ProcessingQueue(Base):
document_id: Mapped[int] = mapped_column(BigInteger, ForeignKey("documents.id"), nullable=False) document_id: Mapped[int] = mapped_column(BigInteger, ForeignKey("documents.id"), nullable=False)
stage: Mapped[str] = mapped_column( stage: Mapped[str] = mapped_column(
# 'stt' (audio): migration 150 / 'thumbnail' (video): queue_consumer 가 enqueue. # 'stt' (audio): migration 150 / 'thumbnail' (video): queue_consumer 가 enqueue.
# 'deep_summary' (PR-B B-1): classify_worker 가 에스컬레이션 시 enqueue.
# DB enum 변경은 마이그레이션이 처리하므로 create_type=False. # DB enum 변경은 마이그레이션이 처리하므로 create_type=False.
Enum( Enum(
"extract", "classify", "summarize", "embed", "chunk", "preview", "extract", "classify", "summarize", "embed", "chunk", "preview",
"stt", "thumbnail", "stt", "thumbnail", "deep_summary",
name="process_stage", name="process_stage",
create_type=False, create_type=False,
), ),
@@ -33,6 +34,8 @@ class ProcessingQueue(Base):
attempts: Mapped[int] = mapped_column(SmallInteger, default=0) attempts: Mapped[int] = mapped_column(SmallInteger, default=0)
max_attempts: Mapped[int] = mapped_column(SmallInteger, default=3) max_attempts: Mapped[int] = mapped_column(SmallInteger, default=3)
error_message: Mapped[str | None] = mapped_column(Text) error_message: Mapped[str | None] = mapped_column(Text)
# B-1: deep_summary stage 가 EscalationEnvelope 를 payload 로 싣는다. 다른 stage 는 NULL.
payload: Mapped[dict | None] = mapped_column(JSONB)
created_at: Mapped[datetime] = mapped_column( created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), default=datetime.now DateTime(timezone=True), default=datetime.now
) )
@@ -43,16 +46,28 @@ class ProcessingQueue(Base):
async def enqueue_stage( async def enqueue_stage(
session: AsyncSession, document_id: int, stage: str, *, status: str = "pending", session: AsyncSession,
document_id: int,
stage: str,
*,
status: str = "pending",
payload: dict | None = None,
) -> bool: ) -> bool:
"""ProcessingQueue에 행 추가 (DB 레벨 중복 방어). """ProcessingQueue에 행 추가 (DB 레벨 중복 방어).
같은 (document_id, stage)에 활성 행(pending/processing)이 이미 있으면 같은 (document_id, stage)에 활성 행(pending/processing)이 이미 있으면
아무것도 하지 않고 False 반환. 아무것도 하지 않고 False 반환.
B-1: payload 옵션으로 deep_summary 에 EscalationEnvelope JSON 을 실을 수 있다.
같은 문서 deep_summary 가 재제안될 경우 on_conflict_do_nothing 으로 기존 payload
유지 (최초 envelope 가 원본). 이후 재처리 시 재분석은 새 classify 가 트리거.
""" """
values: dict = {"document_id": document_id, "stage": stage, "status": status}
if payload is not None:
values["payload"] = payload
stmt = ( stmt = (
pg_insert(ProcessingQueue) pg_insert(ProcessingQueue)
.values(document_id=document_id, stage=stage, status=status) .values(**values)
.on_conflict_do_nothing( .on_conflict_do_nothing(
index_elements=["document_id", "stage"], index_elements=["document_id", "stage"],
index_where=text("status IN ('pending', 'processing')"), index_where=text("status IN ('pending', 'processing')"),
+4
View File
@@ -1,3 +1,7 @@
[DEPRECATED 2026-04-24] — summary_triage.txt 로 이관됨 (PR-B B-1 tier routing).
이 파일은 B-1 안정화 기간 동안 rollback 경로를 위해 유지. 신규 호출 경로는
summary_triage.txt + summary_deep.txt 조합 사용. 실제 삭제는 별도 cleanup PR.
You are a document classification AI. Analyze the document below and respond ONLY in JSON format. No other text. You are a document classification AI. Analyze the document below and respond ONLY in JSON format. No other text.
## Response Format ## Response Format
+24
View File
@@ -51,11 +51,25 @@ async def record_analyze_event(
prompt_version: str | None, prompt_version: str | None,
error_code: str | None, error_code: str | None,
source: str, source: str,
# PR-A shadow observability — 아래 6개는 routing 이 동반될 때만 세팅, 그 외는 None 유지.
subject_domain: str | None = None,
risk_flags: list[str] | None = None,
high_impact_task: bool | None = None,
escalation_reasons: list[str] | None = None,
confidence: float | None = None,
policy_version: str | None = None,
shadow_would_route_to: str | None = None,
# PR-B B-1 — 실제 호출 tier 와 R2 backlog guard
tier: str | None = None,
escalated_to_26b: bool | None = None,
suppressed_reason: str | None = None,
) -> None: ) -> None:
"""analyze_events INSERT. background task에서 호출 — 에러 삼킴. """analyze_events INSERT. background task에서 호출 — 에러 삼킴.
layers_returned: 성공 시 ["evidence","summary"] 등 layer 문자열 리스트. 실패 시 []. layers_returned: 성공 시 ["evidence","summary"] 등 layer 문자열 리스트. 실패 시 [].
error_code: None (성공) | "timeout" | "llm" | "parse" | "missing_summary" | "no_text" | "not_found" error_code: None (성공) | "timeout" | "llm" | "parse" | "missing_summary" | "no_text" | "not_found"
tier: 'triage' | 'primary' | 'fallback' — 실제 호출된 tier (PR-B B-0~B-2).
suppressed_reason: R2 backlog guard 로 soft escalate 가 suppress 된 경우의 이유 문자열.
""" """
try: try:
async with async_session() as session: async with async_session() as session:
@@ -72,6 +86,16 @@ async def record_analyze_event(
prompt_version=prompt_version, prompt_version=prompt_version,
error_code=error_code, error_code=error_code,
source=source, source=source,
subject_domain=subject_domain,
risk_flags=risk_flags,
high_impact_task=high_impact_task,
escalated_to_26b=escalated_to_26b,
escalation_reasons=escalation_reasons,
confidence=confidence,
policy_version=policy_version,
shadow_would_route_to=shadow_would_route_to,
tier=tier,
suppressed_reason=suppressed_reason,
) )
session.add(row) session.add(row)
await session.commit() await session.commit()
+6
View File
@@ -23,6 +23,12 @@ ASK_PROMPT_VERSION: str = "search_synthesis.v2-600char"
# documents.py analyze 라우트가 로드하는 app/prompts/document_analyze.txt 기준 # documents.py analyze 라우트가 로드하는 app/prompts/document_analyze.txt 기준
ANALYZE_PROMPT_VERSION: str = "document_analyze.v1" ANALYZE_PROMPT_VERSION: str = "document_analyze.v1"
# ─── PR-B B-1: summary tier 분할 task 이름 ─────────────────────
# classify_worker / deep_summary_worker 가 PR-A 정책 템플릿 + policy_version 해시
# 조합으로 analyze_events.prompt_version 을 기록한다. (예: "p3a_short_summary@abc123")
SUMMARY_TRIAGE_TASK: str = "p3a_short_summary" # 4B gemma Ollama
SUMMARY_DEEP_TASK: str = "p3c_deep_summary" # 26B MLX
def resolve_primary_model() -> str | None: def resolve_primary_model() -> str | None:
"""런타임 config에서 primary 모델명을 resolve. """런타임 config에서 primary 모델명을 resolve.
+406 -30
View File
@@ -1,17 +1,48 @@
"""AI 분류 워커 — taxonomy 기반 도메인/문서타입/태그/요약 생성""" """AI 분류 워커 — 도메인/문서타입/태그/요약 생성 + PR-B B-1 tier triage.
Legacy 경로 (primary 26B 호출):
- app/prompts/classify.txt 기반 classify()
- AIClient.summarize() 로 ai_summary 생성
- facet_doctype + ai_suggestion (library 제안) 저장
→ ai_domain / ai_sub_group / document_type / ai_confidence / ai_tags /
ai_summary / ai_suggestion / facet_doctype / importance 필드
PR-B B-1 tier triage (신규, 4B gemma Ollama):
- policy.routing.decide_routing 으로 RoutingDecision
- policy.prompt_render.render_4b("p3a_short_summary", subject_domain) 로 프롬프트 렌더
- AIClient.call_triage(rendered) 호출 (llm_gate 외부, Ollama concurrent OK)
- TriageOutput pydantic validate + JSON 깨짐 시 fallback escalate (R1)
- R2 backlog guard: deep_summary 큐 ratio > threshold or pending >= threshold 이면 suppress
- R3 head/middle/tail: 260k 초과 시 envelope text_ranges 3조각
→ ai_tldr / ai_bullets / ai_analysis_tier='triage' 저장 + analyze_events 기록
→ escalate 시 EscalationEnvelope payload 로 deep_summary 큐 enqueue
"""
from __future__ import annotations
import json
import time
from datetime import datetime, timezone from datetime import datetime, timezone
from pydantic import BaseModel, Field, ValidationError
from sqlalchemy import text as sql_text
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession
from ai.client import AIClient, parse_json_response, strip_thinking from ai.client import AIClient, parse_json_response, strip_thinking
from ai.envelope import EscalationEnvelope
from core.config import settings from core.config import settings
from core.utils import setup_logger from core.utils import setup_logger
from models.document import Document from models.document import Document
from models.queue import enqueue_stage
from policy.prompt_render import render_4b, policy_version as compute_policy_version
from policy.routing import decide_routing
from services.document_telemetry import record_analyze_event
logger = setup_logger("classify_worker") logger = setup_logger("classify_worker")
MAX_CLASSIFY_TEXT = 8000 MAX_CLASSIFY_TEXT = 8000 # legacy classify 입력 상한
TRIAGE_TEXT_LIMIT = 120_000 # p3a summary_triage 입력 상한 (4B)
DEEP_PRIMARY_LIMIT = 260_000 # primary context_char_limit (envelope slicing)
# settings에서 taxonomy/document_types 로딩 # settings에서 taxonomy/document_types 로딩
DOCUMENT_TYPES = set(settings.document_types) DOCUMENT_TYPES = set(settings.document_types)
@@ -22,6 +53,33 @@ FACET_DOCTYPES = {"발주서", "세금계산서", "명세표", "도면", "증명
# 자료실 자동 분류 제안 대상 (거래 하위) # 자료실 자동 분류 제안 대상 (거래 하위)
LIBRARY_SUGGESTION_DOCTYPES = {"발주서", "세금계산서", "명세표"} LIBRARY_SUGGESTION_DOCTYPES = {"발주서", "세금계산서", "명세표"}
# PR-B prompt_version task 이름
SUMMARY_TRIAGE_TASK = "p3a_short_summary"
# R2 — hard escalate 사유 (suppress 되지 않는 critical reason)
HARD_ESCALATE_REASONS = {"long_context", "triage_json_invalid", "low_confidence"}
# ───────────────────────── TriageOutput (R1) ─────────────────────────
class TriageOutput(BaseModel):
"""p3a_short_summary (4B) 응답 스키마. 파싱 실패 시 기본값 + escalate=True fallback."""
tldr: str = ""
bullets: list[str] = Field(default_factory=list)
tags: list[str] = Field(default_factory=list)
doc_type: str = ""
time_scope: str | None = None
confidence: float = 0.3
high_impact_self_declared: bool = False
high_impact_reason: str | None = None
recommend_deep_summary: bool = False
recommend_entity_pass: bool = False
escalate_to_26b: bool = False
risk_flags: list[str] = Field(default_factory=list)
# ───────────────────────── legacy classify (primary) ──────────────────
def _get_taxonomy_leaf_paths(taxonomy: dict, prefix: str = "") -> set[str]: def _get_taxonomy_leaf_paths(taxonomy: dict, prefix: str = "") -> set[str]:
"""taxonomy dict에서 모든 유효한 경로를 추출""" """taxonomy dict에서 모든 유효한 경로를 추출"""
@@ -52,45 +110,165 @@ def _validate_domain(domain: str) -> str:
"""domain이 taxonomy에 존재하는지 검증, 없으면 최대한 가까운 경로 찾기""" """domain이 taxonomy에 존재하는지 검증, 없으면 최대한 가까운 경로 찾기"""
if domain in VALID_DOMAIN_PATHS: if domain in VALID_DOMAIN_PATHS:
return domain return domain
# 부분 매칭 시도 (2단계까지)
parts = domain.split("/") parts = domain.split("/")
for i in range(len(parts), 0, -1): for i in range(len(parts), 0, -1):
partial = "/".join(parts[:i]) partial = "/".join(parts[:i])
if partial in VALID_DOMAIN_PATHS: if partial in VALID_DOMAIN_PATHS:
logger.warning(f"[분류] domain '{domain}''{partial}' (부분 매칭)") logger.warning(f"[분류] domain '{domain}''{partial}' (부분 매칭)")
return partial return partial
logger.warning(f"[분류] domain '{domain}' taxonomy에 없음, General/Reading_Notes로 대체") logger.warning(f"[분류] domain '{domain}' taxonomy에 없음, General/Reading_Notes로 대체")
return "General/Reading_Notes" return "General/Reading_Notes"
# ───────────────────────── B-1 subject_domain 매칭 ─────────────────────
def _match_subject_domain(doc: Document, text: str) -> str:
"""본문/메타 기반으로 domain_policy.yaml 의 subject_domain 이름 결정.
매칭 우선순위 (feedback_category_vs_ai_domain_axis.md — category 매칭 금지):
1. source_channel (news / law_monitor / memo)
2. 본문 keywords (첫 8k, 대소문자 무시)
3. ai_domain prefix — legacy classify 가 채운 taxonomy path
4. generic (fallback)
"""
sc = (doc.source_channel or "").lower()
if sc in ("news", "news_collector"):
return "news_item"
if sc == "law_monitor":
return "safety_reference"
if sc == "memo":
return "generic"
head = (text or "")[:8000].lower()
def _hit(keywords: list[str]) -> bool:
return any(kw.lower() in head for kw in keywords if kw)
# 구체 먼저
if _hit(["MSDS", "SDS", "물질안전보건자료", "화학물질", "유해화학물질"]):
return "msds"
if _hit(["사고보고", "재해조사", "원인분석", "재발방지", "중대재해"]):
return "incident_report"
if _hit(["건강검진", "작업환경측정", "보건관리", "특수건강진단"]):
return "health_record"
if _hit(["밀폐공간", "추락", "끼임", "감전", "화재", "폭발", "중독", "질식"]):
return "hazard_specific"
if _hit(["위험성평가", "작업허가서", "JSA", "안전작업지침", "보호구", "SOP"]):
return "safety_operational"
if _hit(["산업안전보건법", "중대재해", "안전보건관리체계", "유해위험방지계획서"]):
return "safety_reference"
if (doc.ai_domain or "").startswith("Industrial_Safety"):
return "safety_reference"
return "generic"
def _slice_text_ranges(doc_id: int, total_len: int) -> dict:
"""R3 — 26B 에 전달할 원문 슬라이스. total_len ≤ primary limit 이면 단일 range, 초과 시 head/mid/tail 3조각."""
if total_len <= DEEP_PRIMARY_LIMIT:
return {
"doc_ids": [doc_id],
"text_ranges": [{"doc_id": doc_id, "start": 0, "end": total_len}],
}
head_end = 120_000
mid_start = max(head_end, total_len // 2 - 10_000)
mid_end = mid_start + 20_000
tail_start = max(mid_end, total_len - 120_000)
return {
"doc_ids": [doc_id],
"text_ranges": [
{"doc_id": doc_id, "start": 0, "end": head_end},
{"doc_id": doc_id, "start": mid_start, "end": mid_end},
{"doc_id": doc_id, "start": tail_start, "end": total_len},
],
}
async def _check_backlog_guard(session: AsyncSession) -> str | None:
"""R2 — deep_summary 큐 적체 시 soft escalate 억제. 억제 사유 문자열 반환, 아니면 None."""
backlog = settings.ai.deep_summary_backlog
window = backlog.window_minutes
ratio_threshold = backlog.ratio_threshold
pending_threshold = backlog.pending_threshold
row = (await session.execute(sql_text("""
SELECT
COUNT(*) FILTER (WHERE stage = 'classify') AS classify_n,
COUNT(*) FILTER (WHERE stage = 'deep_summary') AS deep_n
FROM processing_queue
WHERE created_at > NOW() - make_interval(mins => :w)
"""), {"w": window})).one()
classify_n = row.classify_n or 0
deep_n = row.deep_n or 0
ratio = (deep_n / classify_n) if classify_n > 0 else 0.0
pending_deep = int(await session.scalar(sql_text("""
SELECT COUNT(*) FROM processing_queue
WHERE stage = 'deep_summary' AND status IN ('pending', 'processing')
""")) or 0)
if ratio > ratio_threshold or pending_deep > pending_threshold:
return f"backlog_guard(ratio={ratio:.2f},pending={pending_deep})"
return None
def _classify_escalation_reason(
triage_out: TriageOutput,
input_chars: int,
triage_limit: int,
confidence_floor: float,
) -> str | None:
"""escalate_to_26b 가 True 이도록 만든 근본 사유 하나를 반환. 아니면 None."""
if input_chars > triage_limit:
return "long_context"
if triage_out.confidence < confidence_floor:
return "low_confidence"
if triage_out.escalate_to_26b:
return "self_declare"
if triage_out.recommend_deep_summary:
return "deep_requested"
return None
def _is_hard_escalate(reason: str | None) -> bool:
"""R2 — hard escalate 는 suppress 되지 않는다."""
return reason in HARD_ESCALATE_REASONS
def _distill(triage_out: TriageOutput, limit: int = 2000) -> str:
"""26B envelope.distilled_context 용 4B 출력 압축 텍스트."""
parts: list[str] = []
if triage_out.tldr:
parts.append(f"TL;DR: {triage_out.tldr}")
if triage_out.bullets:
parts.append("핵심:")
parts.extend(f"- {b}" for b in triage_out.bullets)
if triage_out.doc_type:
parts.append(f"doc_type: {triage_out.doc_type}")
if triage_out.tags:
parts.append(f"tags: {', '.join(triage_out.tags)}")
return "\n".join(parts)[:limit]
# ───────────────────────── main process ────────────────────────────────
async def process(document_id: int, session: AsyncSession) -> None: async def process(document_id: int, session: AsyncSession) -> None:
"""문서 AI 분류 + 요약""" """문서 분류 + 요약 + tier triage.
1) Legacy: classify() → ai_domain/document_type/ai_tags/ai_confidence/ai_suggestion
2) Legacy: summarize() → ai_summary
3) PR-B B-1: summary_triage (4B) → ai_tldr/ai_bullets/ai_analysis_tier='triage'
"""
doc = await session.get(Document, document_id) doc = await session.get(Document, document_id)
if not doc: if not doc:
raise ValueError(f"문서 ID {document_id}를 찾을 수 없음") raise ValueError(f"문서 ID {document_id}를 찾을 수 없음")
# 법령은 구조 고정 + 외부 source of truth (law.go.kr) + 자동 재수집.
# AI 분류 skip, downstream(embed/chunk) 은 queue_consumer NEXT_STAGES 가 자동 chain.
# ai_domain 단일 "법령" — PR-A policy.domain_policy.yaml 에서 source_channel 기준 세분화.
if doc.source_channel == "law_monitor":
if not doc.ai_domain:
doc.ai_domain = "법령"
if not doc.ai_tags:
doc.ai_tags = ["법령"]
if not doc.importance:
doc.importance = "medium"
await session.commit()
logger.info(f"doc {document_id}: law_monitor → classify skip")
return
if not doc.extracted_text: if not doc.extracted_text:
raise ValueError(f"문서 ID {document_id}: extracted_text가 비어있음") raise ValueError(f"문서 ID {document_id}: extracted_text가 비어있음")
client = AIClient() client = AIClient()
try: try:
# ─── 분류 ─── # ─── 1. Legacy classify (primary 26B) ───
truncated = doc.extracted_text[:MAX_CLASSIFY_TEXT] truncated = doc.extracted_text[:MAX_CLASSIFY_TEXT]
raw_response = await client.classify(truncated) raw_response = await client.classify(truncated)
parsed = parse_json_response(raw_response) parsed = parse_json_response(raw_response)
@@ -134,16 +312,12 @@ async def process(document_id: int, session: AsyncSession) -> None:
doc.doc_purpose = purpose doc.doc_purpose = purpose
# ─── facet_doctype 식별 (§1 실무 문서 유형 신호) ─── # ─── facet_doctype 식별 (§1 실무 문서 유형 신호) ───
# AI 식별값이 허용 enum 이면 facet_doctype 저장. 기존 값이 있으면 덮어쓰지 않음
# (수동 수정 / Phase 2 facet 우선). document.category / user_tags 는 **건드리지 않음**.
ai_doctype_raw = parsed.get("facet_doctype") ai_doctype_raw = parsed.get("facet_doctype")
ai_doctype = ai_doctype_raw if ai_doctype_raw in FACET_DOCTYPES else None ai_doctype = ai_doctype_raw if ai_doctype_raw in FACET_DOCTYPES else None
if ai_doctype and not doc.facet_doctype: if ai_doctype and not doc.facet_doctype:
doc.facet_doctype = ai_doctype doc.facet_doctype = ai_doctype
# ─── ai_suggestion 저장 (자료실 승인 대기함 제안, §1) ─── # ─── ai_suggestion 저장 (자료실 승인 대기함 제안, §1) ───
# 발주서/세금계산서/명세표 → 자료실 '거래' 분류 제안. 자동 전이 금지.
# /accept-suggestion 승인 UI 에서만 실제 category='library' + @library/... 부여.
if ai_doctype in LIBRARY_SUGGESTION_DOCTYPES: if ai_doctype in LIBRARY_SUGGESTION_DOCTYPES:
year = doc.facet_year or datetime.now(timezone.utc).year year = doc.facet_year or datetime.now(timezone.utc).year
doc.ai_suggestion = { doc.ai_suggestion = {
@@ -157,12 +331,12 @@ async def process(document_id: int, session: AsyncSession) -> None:
"reason": "classify pipeline", "reason": "classify pipeline",
} }
# ─── 요약 ─── # ─── 2. Legacy 요약 (primary 26B) ───
summary = await client.summarize(doc.extracted_text[:50000]) summary = await client.summarize(doc.extracted_text[:50000])
doc.ai_summary = strip_thinking(summary) doc.ai_summary = strip_thinking(summary)
# ─── 메타데이터 ─── # ─── 메타데이터 (legacy 완료) ───
doc.ai_model_version = "qwen3.5-35b-a3b" doc.ai_model_version = settings.ai.primary.model
doc.ai_processed_at = datetime.now(timezone.utc) doc.ai_processed_at = datetime.now(timezone.utc)
logger.info( logger.info(
@@ -171,7 +345,209 @@ async def process(document_id: int, session: AsyncSession) -> None:
f"confidence={doc.ai_confidence:.2f}, tags={doc.ai_tags}" f"confidence={doc.ai_confidence:.2f}, tags={doc.ai_tags}"
) )
# ─── 3. PR-B B-1 — tier triage (4B, 실패는 legacy 결과 보존) ───
try:
await _run_tier_triage(client, doc, session)
except Exception as exc:
logger.exception(f"[triage] id={document_id} 전체 실패 — legacy 유지: {exc}")
finally: finally:
await client.close() await client.close()
# _move_to_knowledge 제거됨 — 파일은 원본 위치 유지, 분류는 DB 메타데이터만
async def _run_tier_triage(client: AIClient, doc: Document, session: AsyncSession) -> None:
"""summary_triage (p3a_short_summary) 경로."""
document_id = doc.id
text = doc.extracted_text or ""
input_chars = len(text)
subject_domain = _match_subject_domain(doc, text)
triage_start = time.perf_counter()
parse_error: str | None = None
triage_out = TriageOutput()
# 입력이 triage 한도 초과면 호출 생략하고 long_context 로 escalate
if input_chars > TRIAGE_TEXT_LIMIT:
await _apply_triage_result(
doc=doc,
session=session,
triage_out=triage_out,
subject_domain=subject_domain,
input_chars=input_chars,
latency_ms=0,
escalation_reason="long_context",
parse_error=None,
)
return
try:
rendered = render_4b(SUMMARY_TRIAGE_TASK, subject_domain)
except Exception as exc:
logger.exception(f"[triage] render_4b 실패 subject={subject_domain}: {exc}")
return
# 템플릿의 {extracted_text} 는 단일중괄호로 format 후 남아있음. str.replace 로 주입.
prompt = rendered.replace("{extracted_text}", text[:TRIAGE_TEXT_LIMIT])
try:
raw_triage = await client.call_triage(prompt)
except Exception as exc:
logger.warning(f"[triage] 4B 호출 실패 id={document_id}: {exc}")
parse_error = "call_failed"
raw_triage = ""
latency_ms = int((time.perf_counter() - triage_start) * 1000)
# R1 — JSON parsing + schema validation + fallback
if raw_triage:
try:
parsed_triage = parse_json_response(raw_triage) or {}
triage_out = TriageOutput.model_validate(parsed_triage)
except (ValidationError, ValueError, TypeError) as exc:
parse_error = f"parse:{type(exc).__name__}"
logger.warning(f"[triage] JSON 파싱/검증 실패 → fallback: {exc}")
if parse_error:
triage_out = triage_out.model_copy(update={
"escalate_to_26b": True,
"confidence": 0.3,
})
escalation_reason: str | None = "triage_json_invalid"
else:
escalation_reason = _classify_escalation_reason(
triage_out,
input_chars=input_chars,
triage_limit=TRIAGE_TEXT_LIMIT,
confidence_floor=0.6,
)
await _apply_triage_result(
doc=doc,
session=session,
triage_out=triage_out,
subject_domain=subject_domain,
input_chars=input_chars,
latency_ms=latency_ms,
escalation_reason=escalation_reason,
parse_error=parse_error,
)
async def _apply_triage_result(
*,
doc: Document,
session: AsyncSession,
triage_out: TriageOutput,
subject_domain: str,
input_chars: int,
latency_ms: int,
escalation_reason: str | None,
parse_error: str | None,
) -> None:
"""TriageOutput → Document 필드 + R2 suppression + envelope enqueue + audit."""
document_id = doc.id
# Document 필드 (파싱 실패 시에는 legacy ai_summary 가 노출되도록 tldr 비움)
if not parse_error:
doc.ai_tldr = (triage_out.tldr or "").strip() or None
doc.ai_bullets = triage_out.bullets or []
doc.ai_analysis_tier = "triage"
# decide_routing — shadow observability 용 (INV 체크)
routing_decision = None
try:
routing_decision = decide_routing(
subject_domain=subject_domain,
content_chars=input_chars,
deterministic_keyword_hits=(),
self_declared_high_impact=triage_out.high_impact_self_declared,
self_declared_risk_flags=tuple(triage_out.risk_flags),
confidence=triage_out.confidence,
evidence_doc_count=0,
)
except Exception as exc:
logger.warning(f"[triage] decide_routing 실패 id={document_id}: {exc}")
# R2 — backlog guard (hard 제외 soft escalate 만 억제)
suppressed_reason: str | None = None
escalate = escalation_reason is not None
if escalate and not _is_hard_escalate(escalation_reason):
suppressed_reason = await _check_backlog_guard(session)
if suppressed_reason:
escalate = False
escalation_reason = None
# enqueue deep_summary
if escalate:
try:
envelope = EscalationEnvelope(
from_stage="summary_triage",
escalation_reasons=tuple([escalation_reason] if escalation_reason else []),
risk_flags=tuple(routing_decision.risk_flags) if routing_decision
else tuple(triage_out.risk_flags),
distilled_context=_distill(triage_out),
original_pointers=_slice_text_ranges(document_id, input_chars),
synthesis_directives=(
tuple(routing_decision.synthesis_directives) if routing_decision else ()
),
user_intent=None,
draft_hint=(triage_out.tldr or None),
)
await enqueue_stage(
session,
document_id,
"deep_summary",
payload={
"envelope": json.loads(envelope.to_json()),
"subject_domain": subject_domain,
},
)
except Exception as exc:
logger.exception(f"[triage] envelope enqueue 실패 id={document_id}: {exc}")
# analyze_events
try:
pv = compute_policy_version(SUMMARY_TRIAGE_TASK)
except Exception:
pv = None
escalation_reasons_list = (
list(routing_decision.escalation_reasons) if routing_decision
else ([escalation_reason] if escalation_reason else [])
)
shadow_target = (
"primary" if (routing_decision and routing_decision.escalate_to_26b) else "triage"
)
await record_analyze_event(
doc_id=document_id,
user_id=None,
mode="summary_triage",
text_limit=TRIAGE_TEXT_LIMIT,
truncated=input_chars > TRIAGE_TEXT_LIMIT,
layers_returned=["tldr", "bullets"] if not parse_error else [],
cached=False,
latency_ms=latency_ms,
model_name=settings.ai.triage.model,
prompt_version=(f"{SUMMARY_TRIAGE_TASK}@{pv}" if pv else SUMMARY_TRIAGE_TASK),
error_code=parse_error,
source="document_server",
subject_domain=subject_domain,
risk_flags=(list(routing_decision.risk_flags) if routing_decision
else list(triage_out.risk_flags)),
high_impact_task=(routing_decision.high_impact_task if routing_decision else None),
escalation_reasons=escalation_reasons_list,
confidence=triage_out.confidence,
policy_version=pv,
shadow_would_route_to=shadow_target,
tier="triage",
escalated_to_26b=escalate,
suppressed_reason=suppressed_reason,
)
logger.info(
f"[triage] id={document_id} subject={subject_domain} "
f"escalate={escalate} reason={escalation_reason} "
f"suppressed={bool(suppressed_reason)} "
f"confidence={triage_out.confidence:.2f} "
f"tldr_len={len(doc.ai_tldr or '')} bullets={len(doc.ai_bullets or [])}"
)
+210
View File
@@ -0,0 +1,210 @@
"""PR-B B-1 Deep Summary 워커 — 26B (primary MLX) 에스컬레이션 분석.
stage 'deep_summary' 에서 pickup. classify_worker enqueue payload 실은
EscalationEnvelope + subject_domain 읽어, PR-A policy 템플릿 `p3c_deep_summary`
렌더링한 26B primary 호출한다. llm_gate Semaphore(1) 경유 MLX 단일 인퍼런스 보호.
출력을 documents.ai_detail_summary / ai_inconsistencies 저장하고 ai_analysis_tier
'deep' 으로 전이. 실패는 무해하게 legacy 결과 보존.
"""
from __future__ import annotations
import json
import time
from datetime import datetime, timezone
from pydantic import BaseModel, Field, ValidationError
from sqlalchemy import desc, select
from sqlalchemy.ext.asyncio import AsyncSession
from ai.client import AIClient, parse_json_response
from ai.envelope import EscalationEnvelope
from core.config import settings
from core.utils import setup_logger
from models.document import Document
from models.queue import ProcessingQueue
from policy.prompt_render import render_26b, policy_version as compute_policy_version
from services.document_telemetry import record_analyze_event
from services.search.llm_gate import get_mlx_gate
logger = setup_logger("deep_summary_worker")
DEEP_SUMMARY_TASK = "p3c_deep_summary"
# inconsistencies kind 허용 목록 (feedback_document_server_domain_scope.md — 구매/계약 제외)
ALLOWED_INCONSISTENCY_KINDS = {
"version_drift", "procedure_conflict", "source_conflict", "missing_basis",
}
class DeepSummaryOutput(BaseModel):
"""p3c_deep_summary (26B) 응답 스키마. 파싱 실패 시 기본값 + skip."""
mode: str = "single"
tldr: str = ""
bullets: list[str] = Field(default_factory=list)
detail: str = "" # 26B 가 채우는 상세 (detail_summary 로 저장)
bundle_flow: list[str] | None = None
inconsistencies: list[dict] | None = None
entities_confirmed: dict | None = None
directives_applied: list[str] | None = None
confidence: float = 0.5
async def process(document_id: int, session: AsyncSession) -> None:
"""deep_summary 큐 pickup → 26B 호출 → 필드 저장."""
doc = await session.get(Document, document_id)
if not doc:
raise ValueError(f"deep_summary: document id={document_id} 없음")
# 최신 deep_summary 큐 행의 payload 조회 (queue_consumer 가 status='processing' 으로 세팅한 상태)
queue_row = (await session.execute(
select(ProcessingQueue)
.where(
ProcessingQueue.document_id == document_id,
ProcessingQueue.stage == "deep_summary",
ProcessingQueue.status == "processing",
)
.order_by(desc(ProcessingQueue.id))
.limit(1)
)).scalar_one_or_none()
if not queue_row:
logger.warning(f"[deep] processing 상태의 deep_summary row 없음 id={document_id}")
return
payload = queue_row.payload or {}
envelope_raw = payload.get("envelope")
subject_domain = payload.get("subject_domain") or "generic"
if not envelope_raw:
logger.error(f"[deep] envelope 없음 id={document_id} payload_keys={list(payload.keys())}")
raise ValueError("deep_summary payload 에 envelope 없음")
envelope = EscalationEnvelope.from_json(json.dumps(envelope_raw))
# 원문 슬라이스 추출 (envelope.original_pointers.text_ranges 기반)
slices = _build_text_slices(doc.extracted_text or "", envelope.original_pointers)
# PR-A 템플릿 렌더
try:
rendered = render_26b(DEEP_SUMMARY_TASK, subject_domain)
except Exception as exc:
logger.exception(f"[deep] render_26b 실패 subject={subject_domain}: {exc}")
raise
prompt = (
rendered
.replace("{escalation_envelope_json}", envelope.to_system_injection())
.replace("{original_text_slices}", slices)
)
client = AIClient()
latency_ms = 0
parse_error: str | None = None
deep_out = DeepSummaryOutput()
try:
start = time.perf_counter()
async with get_mlx_gate(): # primary(26B) 보호 Semaphore(1)
raw = await client.call_primary(prompt)
latency_ms = int((time.perf_counter() - start) * 1000)
except Exception as exc:
logger.warning(f"[deep] 26B 호출 실패 id={document_id}: {exc}")
parse_error = "call_failed"
raw = ""
finally:
await client.close()
if raw:
try:
parsed = parse_json_response(raw) or {}
deep_out = DeepSummaryOutput.model_validate(parsed)
except (ValidationError, ValueError, TypeError) as exc:
parse_error = f"parse:{type(exc).__name__}"
logger.warning(f"[deep] JSON 파싱/검증 실패 id={document_id}: {exc}")
if not parse_error:
doc.ai_detail_summary = (deep_out.detail or "").strip() or None
doc.ai_inconsistencies = _filter_inconsistencies(deep_out.inconsistencies or [])
doc.ai_analysis_tier = "deep"
doc.ai_processed_at = datetime.now(timezone.utc)
try:
pv = compute_policy_version(DEEP_SUMMARY_TASK)
except Exception:
pv = None
await record_analyze_event(
doc_id=document_id,
user_id=None,
mode="summary_deep",
text_limit=settings.ai.primary.context_char_limit or 260000,
truncated=False,
layers_returned=["detail_summary", "inconsistencies"] if not parse_error else [],
cached=False,
latency_ms=latency_ms,
model_name=settings.ai.primary.model,
prompt_version=(f"{DEEP_SUMMARY_TASK}@{pv}" if pv else DEEP_SUMMARY_TASK),
error_code=parse_error,
source="document_server",
subject_domain=subject_domain,
risk_flags=list(envelope.risk_flags),
high_impact_task=None,
escalation_reasons=list(envelope.escalation_reasons),
confidence=deep_out.confidence,
policy_version=pv,
shadow_would_route_to="primary",
tier="primary",
escalated_to_26b=True,
suppressed_reason=None,
)
logger.info(
f"[deep] id={document_id} subject={subject_domain} "
f"detail_len={len(doc.ai_detail_summary or '')} "
f"inc={len(doc.ai_inconsistencies or [])} latency_ms={latency_ms} "
f"parse_error={parse_error}"
)
def _build_text_slices(text: str, pointers: dict) -> str:
"""original_pointers.text_ranges 의 [{start, end}] 를 실제 본문 슬라이스로 합친다.
3조각 이상이면 조각 앞에 [slice N head/middle/tail] 라벨을 붙여 26B 순서 인지.
단일 슬라이스면 라벨 없이 원문 그대로.
"""
ranges = (pointers or {}).get("text_ranges") or []
if not ranges:
return text[:260_000]
if len(ranges) == 1:
r = ranges[0]
return text[r.get("start", 0):r.get("end", len(text))]
labels = ["head", "middle", "tail"]
parts: list[str] = []
for idx, r in enumerate(ranges):
label = labels[idx] if idx < len(labels) else f"slice{idx}"
chunk = text[r.get("start", 0):r.get("end", len(text))]
parts.append(f"[slice {idx}{label}]\n{chunk}")
return "\n\n".join(parts)
def _filter_inconsistencies(items: list) -> list[dict]:
"""허용 kind 목록 (safety/news 도메인 한정) 만 통과시킨다.
`amount_mismatch` 같은 구매/계약 kind 여기서 drop (feedback_document_server_domain_scope.md).
구조 오류 (kind/desc 누락) drop.
"""
out: list[dict] = []
for it in items or []:
if not isinstance(it, dict):
continue
kind = str(it.get("kind") or "")
desc_ = str(it.get("desc") or "").strip()
if kind not in ALLOWED_INCONSISTENCY_KINDS:
continue
if not desc_:
continue
out.append({"kind": kind, "desc": desc_})
return out
+6 -1
View File
@@ -14,8 +14,9 @@ logger = setup_logger("queue_consumer")
# stage별 배치 크기 # stage별 배치 크기
# stt 는 GPU 단일 점유 + 회의 30분짜리도 가능 → 배치 1. thumbnail 은 ffmpeg subprocess 로 가벼움. # stt 는 GPU 단일 점유 + 회의 30분짜리도 가능 → 배치 1. thumbnail 은 ffmpeg subprocess 로 가벼움.
# deep_summary (PR-B B-1) 는 MLX 26B 단일 Semaphore(1) 경유 → 배치 1.
BATCH_SIZE = {"extract": 5, "classify": 3, "summarize": 3, "embed": 1, "chunk": 1, BATCH_SIZE = {"extract": 5, "classify": 3, "summarize": 3, "embed": 1, "chunk": 1,
"preview": 2, "stt": 1, "thumbnail": 3} "preview": 2, "stt": 1, "thumbnail": 3, "deep_summary": 1}
STALE_THRESHOLD_MINUTES = 10 STALE_THRESHOLD_MINUTES = 10
@@ -122,6 +123,7 @@ async def consume_queue():
"""큐에서 pending 항목을 가져와 stage별 워커 실행""" """큐에서 pending 항목을 가져와 stage별 워커 실행"""
from workers.classify_worker import process as classify_process from workers.classify_worker import process as classify_process
from workers.chunk_worker import process as chunk_process from workers.chunk_worker import process as chunk_process
from workers.deep_summary_worker import process as deep_summary_process
from workers.embed_worker import process as embed_process from workers.embed_worker import process as embed_process
from workers.extract_worker import process as extract_process from workers.extract_worker import process as extract_process
from workers.preview_worker import process as preview_process from workers.preview_worker import process as preview_process
@@ -138,6 +140,9 @@ async def consume_queue():
"preview": preview_process, "preview": preview_process,
"stt": stt_process, "stt": stt_process,
"thumbnail": thumbnail_process, "thumbnail": thumbnail_process,
# PR-B B-1: classify 가 에스컬레이션 판단 시 enqueue → 26B 가 detail_summary 작성.
# next_stages 에 추가하지 않음 — deep_summary 는 leaf (classify→embed/chunk 흐름과 독립).
"deep_summary": deep_summary_process,
} }
try: try:
+18
View File
@@ -0,0 +1,18 @@
-- 156_ai_analysis_cols.sql
-- PR-B B-1: documents 에 4B triage / 26B deep 분할 컬럼 추가.
-- plan: ~/.claude/plans/swirling-swimming-liskov.md
--
-- ai_tldr / ai_bullets → summary_triage 산출 (4B, 상시)
-- ai_detail_summary / ai_inconsistencies → summary_deep 산출 (26B, escalation)
-- ai_analysis_tier → 'triage' | 'deep' — 현재 문서가 어느 tier 까지 처리됐는지
-- ai_summary (legacy TEXT) → ai_tldr 을 복제하여 backward compat
--
-- 주의: _run_migrations 는 asyncpg exec_driver_sql 로 단일 prepared statement — ALTER TABLE
-- 의 다중 ADD COLUMN 절은 단일 statement 라 허용. enum ADD VALUE 는 별도 (157).
ALTER TABLE documents
ADD COLUMN IF NOT EXISTS ai_tldr TEXT,
ADD COLUMN IF NOT EXISTS ai_bullets JSONB,
ADD COLUMN IF NOT EXISTS ai_detail_summary TEXT,
ADD COLUMN IF NOT EXISTS ai_inconsistencies JSONB,
ADD COLUMN IF NOT EXISTS ai_analysis_tier TEXT;
@@ -0,0 +1,13 @@
-- 157_queue_stage_deep_summary.sql
-- PR-B B-1: processing_queue.stage enum 에 'deep_summary' 추가.
-- plan: ~/.claude/plans/swirling-swimming-liskov.md §B-1
--
-- PostgreSQL 제약: ALTER TYPE ADD VALUE 는 새 값을 같은 트랜잭션 안에서 사용할 수 없다.
-- (unsafe use of new value of enum type)
-- 따라서 이 migration 은 enum 확장만 단독으로 처리하고, processing_queue.payload 컬럼
-- 추가는 158 로 분리. classify_worker 가 deep_summary 를 enqueue 하는 시점엔 157 이
-- 이미 별도 트랜잭션으로 commit 돼 있다.
--
-- audio/video 파이프와 동일 패턴 (147~151 참조).
ALTER TYPE process_stage ADD VALUE IF NOT EXISTS 'deep_summary';
@@ -0,0 +1,13 @@
-- 158_processing_queue_payload.sql
-- PR-B B-1: processing_queue 에 payload JSONB 컬럼 추가.
-- plan: ~/.claude/plans/swirling-swimming-liskov.md §B-1
--
-- deep_summary stage 가 EscalationEnvelope (from_stage / escalation_reasons /
-- risk_flags / distilled_context / original_pointers / synthesis_directives / ...)
-- 를 payload 로 싣고 전달. 다른 stage 는 NULL.
--
-- 157 의 enum ADD VALUE 가 별도 트랜잭션으로 commit 된 뒤에만 실행돼야 한다.
-- (asyncpg migrations 러너가 파일 순서대로 단일-statement 실행하므로 안전.)
ALTER TABLE processing_queue
ADD COLUMN IF NOT EXISTS payload JSONB;
@@ -0,0 +1,18 @@
-- 159_analyze_events_pr_b_tier.sql
-- PR-B B-1: analyze_events 에 실제 호출 tier + suppressed_reason 추가.
-- plan: ~/.claude/plans/swirling-swimming-liskov.md §B-1 R2
--
-- PR-A (153) 이 shadow_would_route_to 로 "정책이 제안한 tier" 는 이미 기록.
-- 여기서는 PR-B 가 실제로 호출한 tier 와, R2 backlog guard 로 suppress 된 이유를 추가.
--
-- tier : 'triage' | 'primary' | 'fallback' — classify_worker / deep_summary_worker /
-- evidence_service / synthesis_service 가 실제 호출 시 기록.
-- suppressed_reason : 'backlog_guard(ratio=0.42,pending=7)' 등. escalation 판정이 soft 였으나
-- backlog guard 로 suppress 된 건을 사후 디버깅 하기 위해 별도 컬럼화.
-- dashboard "Backlog Suppression (24h)" 카드의 소스.
--
-- 단일 statement (ALTER TABLE 다중 ADD COLUMN 은 허용). 인덱스는 160 으로 분리.
ALTER TABLE analyze_events
ADD COLUMN IF NOT EXISTS tier TEXT,
ADD COLUMN IF NOT EXISTS suppressed_reason TEXT;
@@ -0,0 +1,7 @@
-- 160_analyze_events_pr_b_idx.sql
-- PR-B B-1: suppressed_reason partial index (backlog guard 발생 조회용).
-- plan: ~/.claude/plans/swirling-swimming-liskov.md §B-1 R2
CREATE INDEX IF NOT EXISTS idx_analyze_events_suppressed
ON analyze_events (created_at)
WHERE suppressed_reason IS NOT NULL;