Files
hyungi_document_server/app/models/queue.py
T
Hyungi Ahn 6fdc48e5b6 feat(ai): B-1 summary tier 분할 — triage(4B) + deep_summary(26B)
PR-A policy 레이어를 재사용하여 classify_worker 에 tier triage 경로를 추가.
Legacy ai_summary / ai_domain / ai_suggestion 은 유지 (회귀 0), tldr/bullets/
detail/inconsistencies 는 별도 필드로 분리.

Migrations (156~160):
- 156 documents: ai_tldr, ai_bullets, ai_detail_summary, ai_inconsistencies,
  ai_analysis_tier 5컬럼
- 157 process_stage 에 'deep_summary' ADD VALUE 단독 (Postgres 동일 트랜잭션
  제약 회피)
- 158 processing_queue.payload JSONB (envelope 전달)
- 159 analyze_events 에 tier + suppressed_reason
- 160 suppressed_reason partial index

Models/ORM:
- Document: 5컬럼 Mapped 추가
- ProcessingQueue: deep_summary enum 확장 + payload 필드, enqueue_stage 에
  payload 옵션
- AnalyzeEvent: PR-A shadow 6컬럼 + PR-B tier/suppressed_reason

Workers:
- classify_worker: 기존 legacy 경로 뒤에 _run_tier_triage 추가.
  - _match_subject_domain(doc, text): source_channel + 본문 keywords + ai_domain
    prefix 로 PR-A policy 의 subject_domain 이름 결정 (category 매칭 금지).
  - R1 TriageOutput pydantic + JSON 깨짐 fallback (triage_json_invalid).
  - R2 _check_backlog_guard(): 30분 window ratio > threshold OR pending 초과면
    soft escalate suppress. hard escalate 는 통과.
  - R3 _slice_text_ranges(): 260k 초과 시 head 120k + mid 20k + tail 120k 3조각.
  - escalate 시 EscalationEnvelope 구성 + {envelope, subject_domain} payload 로
    deep_summary enqueue.
- deep_summary_worker (신규): queue payload 에서 envelope + subject_domain 읽기 →
  render_26b("p3c_deep_summary", subject_domain) + MLX 호출 (llm_gate Semaphore(1)
  경유) → ai_detail_summary + ai_inconsistencies 저장 + ai_analysis_tier='deep'.
  _filter_inconsistencies 로 허용 kind (version_drift / procedure_conflict /
  source_conflict / missing_basis) 만 통과 — 구매/계약 kind drop.
- queue_consumer: workers dict 에 deep_summary 추가 + BATCH_SIZE=1. next_stages
  는 건드리지 않음 — classify → embed/chunk 는 그대로, deep_summary 는 독립 체인.

Telemetry:
- record_analyze_event: subject_domain / risk_flags / escalation_reasons /
  confidence / policy_version / shadow_would_route_to / tier / escalated_to_26b /
  suppressed_reason 파라미터 확장. classify/deep worker 가 mode="summary_triage"
  또는 "summary_deep" 로 기록.

API:
- DocumentResponse 에 ai_tldr / ai_bullets / ai_detail_summary /
  ai_inconsistencies / ai_analysis_tier 5필드 노출.

Prompts:
- classify.txt 에 DEPRECATED 주석만 추가 (파일 유지 — rollback 경로 보존).
- PR-A 의 app/prompts/policy/p3a_short_summary.txt (4B) 와 p3c_deep_summary.txt
  (26B) 를 그대로 사용. 내 소유의 summary_triage.txt / summary_deep.txt 는 중복
  이라 별도 커밋에서 제거하지 않고 바로 생성 전 삭제.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-24 10:22:40 +09:00

78 lines
3.1 KiB
Python

"""processing_queue 테이블 ORM (비동기 가공 큐)"""
from datetime import datetime
from sqlalchemy import BigInteger, DateTime, Enum, ForeignKey, SmallInteger, Text, text
from sqlalchemy.dialects.postgresql import JSONB, insert as pg_insert
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import Mapped, mapped_column
from core.database import Base
class ProcessingQueue(Base):
__tablename__ = "processing_queue"
id: Mapped[int] = mapped_column(BigInteger, primary_key=True)
document_id: Mapped[int] = mapped_column(BigInteger, ForeignKey("documents.id"), nullable=False)
stage: Mapped[str] = mapped_column(
# 'stt' (audio): migration 150 / 'thumbnail' (video): queue_consumer 가 enqueue.
# 'deep_summary' (PR-B B-1): classify_worker 가 에스컬레이션 시 enqueue.
# DB enum 변경은 마이그레이션이 처리하므로 create_type=False.
Enum(
"extract", "classify", "summarize", "embed", "chunk", "preview",
"stt", "thumbnail", "deep_summary",
name="process_stage",
create_type=False,
),
nullable=False,
)
status: Mapped[str] = mapped_column(
Enum("pending", "processing", "completed", "failed", name="process_status"),
default="pending"
)
attempts: Mapped[int] = mapped_column(SmallInteger, default=0)
max_attempts: Mapped[int] = mapped_column(SmallInteger, default=3)
error_message: Mapped[str | None] = mapped_column(Text)
# B-1: deep_summary stage 가 EscalationEnvelope 를 payload 로 싣는다. 다른 stage 는 NULL.
payload: Mapped[dict | None] = mapped_column(JSONB)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), default=datetime.now
)
started_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
completed_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
# DB 제약은 partial unique index uq_queue_active로 관리 (migration 117)
async def enqueue_stage(
session: AsyncSession,
document_id: int,
stage: str,
*,
status: str = "pending",
payload: dict | None = None,
) -> bool:
"""ProcessingQueue에 행 추가 (DB 레벨 중복 방어).
같은 (document_id, stage)에 활성 행(pending/processing)이 이미 있으면
아무것도 하지 않고 False 반환.
B-1: payload 옵션으로 deep_summary 에 EscalationEnvelope JSON 을 실을 수 있다.
같은 문서 deep_summary 가 재제안될 경우 on_conflict_do_nothing 으로 기존 payload
유지 (최초 envelope 가 원본). 이후 재처리 시 재분석은 새 classify 가 트리거.
"""
values: dict = {"document_id": document_id, "stage": stage, "status": status}
if payload is not None:
values["payload"] = payload
stmt = (
pg_insert(ProcessingQueue)
.values(**values)
.on_conflict_do_nothing(
index_elements=["document_id", "stage"],
index_where=text("status IN ('pending', 'processing')"),
)
)
result = await session.execute(stmt)
return result.rowcount > 0