e8c348ab21
3일 telemetry (599 triage / 555 deep) 기반 임계치 재평가:
1. 에스컬레이션 비율 — 임계치 의미 reframe
- 기존: >20% 적색 (튜닝 필요) → 항상 적색 (운영 패턴 97%)
- 신규: <80% 적색 (정책 매칭 실패 증가)
- 메시지: "safety 정책상 95~100% 가 정상" 보조 표시
- safety_reference 99.7%, generic 100% (fallback risk_flag), msds 46.2%
→ 운영 정상 패턴 확인
2. Deep summary 안정성 — 신규 카드 추가
- mode='summary_deep' 의 error_code IS NOT NULL 비율
- 현재 5.2% (call_failed 21 + parse:ValidationError 8)
- >5% 적색 임계
- MLX 호출 timeout / JSON 파싱 실패 모니터
3. triage JSON 건강도, Backlog Suppression — 임계치 유지
- 현재 0%, 1% — 매우 안정. 보수적 임계 유효.
Backend: TierHealthStack 에 deep_total / deep_err_total 추가
Frontend: 카드 그리드 3열 → 4열 (lg), Day 4 신규 카드.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
301 lines
11 KiB
Python
301 lines
11 KiB
Python
"""대시보드 위젯 데이터 API"""
|
|
|
|
from typing import Annotated
|
|
|
|
from fastapi import APIRouter, Depends
|
|
from pydantic import BaseModel
|
|
from sqlalchemy import func, select, text
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from core.auth import get_current_user
|
|
from core.database import get_session
|
|
from models.document import Document
|
|
from models.queue import ProcessingQueue
|
|
from models.user import User
|
|
|
|
router = APIRouter()
|
|
|
|
|
|
class DomainCount(BaseModel):
|
|
domain: str | None
|
|
count: int
|
|
|
|
|
|
class RecentDocument(BaseModel):
|
|
id: int
|
|
title: str | None
|
|
file_format: str
|
|
ai_domain: str | None
|
|
created_at: str
|
|
|
|
|
|
class PipelineStatus(BaseModel):
|
|
stage: str
|
|
status: str
|
|
count: int
|
|
|
|
|
|
class QueueLag(BaseModel):
|
|
"""파이프라인 stage 별 처리 지연 — 운영 카드용.
|
|
|
|
pipeline_status 는 24h 누적 통계라 현재 적체 신호로 부족.
|
|
queue_lag 는 현재 시점 pending/processing/failed + oldest pending age 로
|
|
"지금 막힌 게 있는가" 를 보여준다.
|
|
"""
|
|
stage: str
|
|
pending: int
|
|
processing: int
|
|
failed: int
|
|
oldest_pending_age_sec: int | None # 가장 오래된 pending 의 created_at 기준 경과 (초)
|
|
|
|
|
|
class TierHealthStack(BaseModel):
|
|
"""PR-B B-3 — tier 관측성 카드 소스 (24h 윈도우).
|
|
|
|
대시보드 카드 (Day 4 튜닝 — 2026-04-27 임계치 재조정):
|
|
- "에스컬레이션 비율": escalated_total / triage_total
|
|
· <80% 적색 (정책 매칭 실패 증가 — 진짜 튜닝 필요)
|
|
· 80~99% 정상 (safety/health 정책 의도)
|
|
- "triage JSON 건강도": triage_json_invalid / triage_total (>5% 적색)
|
|
- "Backlog Suppression": suppressed_total / triage_total (>10% 주황)
|
|
- "Deep summary 안정성": deep_err_total / deep_total (>5% 적색)
|
|
"""
|
|
triage_total: int = 0
|
|
escalated_total: int = 0
|
|
escalation_by_reason: dict[str, int] = {} # long_context / low_confidence / deep_requested / self_declare
|
|
escalation_by_domain: dict[str, int] = {} # safety_reference / news_item / ...
|
|
triage_json_invalid: int = 0 # error_code='triage_json_invalid'
|
|
suppressed_total: int = 0 # suppressed_reason IS NOT NULL
|
|
# Day 4 튜닝 신규 — deep_summary 호출 안정성
|
|
deep_total: int = 0 # mode='summary_deep' 전체
|
|
deep_err_total: int = 0 # error_code IS NOT NULL (call_failed / parse:*)
|
|
|
|
|
|
class DashboardResponse(BaseModel):
|
|
today_added: int
|
|
today_by_domain: list[DomainCount]
|
|
inbox_count: int
|
|
law_alerts: int
|
|
recent_documents: list[RecentDocument]
|
|
pipeline_status: list[PipelineStatus]
|
|
failed_count: int
|
|
total_documents: int
|
|
# 카운트 분리: 문서함(비-note/비-news) / 메모(memo+note) / 뉴스(news)
|
|
documents_count: int = 0
|
|
memos_count: int = 0
|
|
news_count: int = 0
|
|
# §4 — category 기반 카드 + 승인 pending + queue lag
|
|
category_counts: dict[str, int] = {}
|
|
library_pending_suggestions: int = 0
|
|
queue_lag: list[QueueLag] = []
|
|
# PR-B B-3 — tier 관측성
|
|
tier_health: TierHealthStack = TierHealthStack()
|
|
|
|
|
|
@router.get("/", response_model=DashboardResponse)
|
|
async def get_dashboard(
|
|
user: Annotated[User, Depends(get_current_user)],
|
|
session: Annotated[AsyncSession, Depends(get_session)],
|
|
):
|
|
"""대시보드 위젯 데이터 집계"""
|
|
|
|
# 오늘 추가된 문서
|
|
today_result = await session.execute(
|
|
select(Document.ai_domain, func.count(Document.id))
|
|
.where(func.date(Document.created_at) == func.current_date())
|
|
.group_by(Document.ai_domain)
|
|
)
|
|
today_rows = today_result.all()
|
|
today_added = sum(row[1] for row in today_rows)
|
|
|
|
# Inbox 미분류 수 (review_status = pending)
|
|
inbox_result = await session.execute(
|
|
select(func.count(Document.id))
|
|
.where(
|
|
Document.review_status == "pending",
|
|
Document.deleted_at == None,
|
|
)
|
|
)
|
|
inbox_count = inbox_result.scalar() or 0
|
|
|
|
# 법령 알림 (오늘)
|
|
law_result = await session.execute(
|
|
select(func.count(Document.id))
|
|
.where(
|
|
Document.source_channel == "law_monitor",
|
|
func.date(Document.created_at) == func.current_date(),
|
|
)
|
|
)
|
|
law_alerts = law_result.scalar() or 0
|
|
|
|
# 최근 문서 7건
|
|
recent_result = await session.execute(
|
|
select(Document)
|
|
.order_by(Document.created_at.desc())
|
|
.limit(7)
|
|
)
|
|
recent_docs = recent_result.scalars().all()
|
|
|
|
# 파이프라인 상태 (24h)
|
|
pipeline_result = await session.execute(
|
|
text("""
|
|
SELECT stage, status, COUNT(*)
|
|
FROM processing_queue
|
|
WHERE created_at > NOW() - INTERVAL '24 hours'
|
|
GROUP BY stage, status
|
|
""")
|
|
)
|
|
|
|
# 실패 건수
|
|
failed_result = await session.execute(
|
|
select(func.count())
|
|
.select_from(ProcessingQueue)
|
|
.where(ProcessingQueue.status == "failed")
|
|
)
|
|
failed_count = failed_result.scalar() or 0
|
|
|
|
# 전체 문서 수 + 카테고리별 분리 (단일 쿼리)
|
|
# 문서함: 비-note, 비-news / 메모: memo+note / 뉴스: news 유입 경로 기준
|
|
count_result = await session.execute(
|
|
text("""
|
|
SELECT
|
|
COUNT(*) AS total,
|
|
COUNT(*) FILTER (WHERE source_channel NOT IN ('news', 'law_monitor') AND file_type != 'note') AS documents,
|
|
COUNT(*) FILTER (WHERE source_channel = 'memo' AND file_type = 'note') AS memos,
|
|
COUNT(*) FILTER (WHERE source_channel = 'news') AS news
|
|
FROM documents WHERE deleted_at IS NULL
|
|
""")
|
|
)
|
|
counts = count_result.one()
|
|
total_documents = counts[0]
|
|
documents_count = counts[1]
|
|
memos_count = counts[2]
|
|
news_count = counts[3]
|
|
|
|
# §4 — 카테고리별 count (§1 documents.category enum)
|
|
cat_result = await session.execute(
|
|
text("""
|
|
SELECT category, COUNT(*)
|
|
FROM documents
|
|
WHERE deleted_at IS NULL AND category IS NOT NULL
|
|
GROUP BY category
|
|
""")
|
|
)
|
|
category_counts = {row[0]: row[1] for row in cat_result.all()}
|
|
|
|
# §4 — 승인 대기 (library 제안)
|
|
pending_result = await session.execute(
|
|
text("""
|
|
SELECT COUNT(*)
|
|
FROM documents
|
|
WHERE deleted_at IS NULL
|
|
AND ai_suggestion IS NOT NULL
|
|
AND ai_suggestion->>'proposed_category' = 'library'
|
|
""")
|
|
)
|
|
library_pending_suggestions = pending_result.scalar() or 0
|
|
|
|
# §4 — queue lag (현재 시점 stage 별 적체 신호)
|
|
# extract/classify/embed 외에 stt/thumbnail (§3) 도 자동 포함.
|
|
lag_result = await session.execute(
|
|
text("""
|
|
SELECT
|
|
stage,
|
|
COUNT(*) FILTER (WHERE status='pending') AS pending,
|
|
COUNT(*) FILTER (WHERE status='processing') AS processing,
|
|
COUNT(*) FILTER (WHERE status='failed') AS failed,
|
|
EXTRACT(EPOCH FROM (NOW() - MIN(created_at) FILTER (WHERE status='pending')))::int
|
|
AS oldest_pending_age_sec
|
|
FROM processing_queue
|
|
GROUP BY stage
|
|
ORDER BY stage
|
|
""")
|
|
)
|
|
queue_lag = [
|
|
QueueLag(
|
|
stage=row[0],
|
|
pending=row[1] or 0,
|
|
processing=row[2] or 0,
|
|
failed=row[3] or 0,
|
|
oldest_pending_age_sec=row[4],
|
|
)
|
|
for row in lag_result.all()
|
|
]
|
|
|
|
# ─── PR-B B-3 — tier 관측성 (24h) + Day 4 deep_err 추가 ───
|
|
tier_rows = (await session.execute(text("""
|
|
SELECT
|
|
COUNT(*) FILTER (WHERE mode = 'summary_triage') AS triage_total,
|
|
COUNT(*) FILTER (WHERE mode = 'summary_triage' AND escalated_to_26b = true) AS escalated_total,
|
|
COUNT(*) FILTER (WHERE mode = 'summary_triage' AND error_code = 'triage_json_invalid') AS json_invalid,
|
|
COUNT(*) FILTER (WHERE mode = 'summary_triage' AND suppressed_reason IS NOT NULL) AS suppressed_total,
|
|
COUNT(*) FILTER (WHERE mode = 'summary_deep') AS deep_total,
|
|
COUNT(*) FILTER (WHERE mode = 'summary_deep' AND error_code IS NOT NULL) AS deep_err_total
|
|
FROM analyze_events
|
|
WHERE created_at > NOW() - INTERVAL '24 hours'
|
|
"""))).one()
|
|
|
|
reason_rows = await session.execute(text("""
|
|
SELECT unnest(escalation_reasons) AS reason, COUNT(*) AS n
|
|
FROM analyze_events
|
|
WHERE created_at > NOW() - INTERVAL '24 hours'
|
|
AND mode = 'summary_triage'
|
|
AND escalated_to_26b = true
|
|
GROUP BY 1 ORDER BY 2 DESC
|
|
"""))
|
|
escalation_by_reason = {r[0]: r[1] for r in reason_rows if r[0]}
|
|
|
|
domain_rows = await session.execute(text("""
|
|
SELECT subject_domain, COUNT(*) AS n
|
|
FROM analyze_events
|
|
WHERE created_at > NOW() - INTERVAL '24 hours'
|
|
AND mode = 'summary_triage'
|
|
AND escalated_to_26b = true
|
|
AND subject_domain IS NOT NULL
|
|
GROUP BY 1 ORDER BY 2 DESC
|
|
"""))
|
|
escalation_by_domain = {r[0]: r[1] for r in domain_rows}
|
|
|
|
tier_health = TierHealthStack(
|
|
triage_total=int(tier_rows.triage_total or 0),
|
|
escalated_total=int(tier_rows.escalated_total or 0),
|
|
triage_json_invalid=int(tier_rows.json_invalid or 0),
|
|
suppressed_total=int(tier_rows.suppressed_total or 0),
|
|
deep_total=int(tier_rows.deep_total or 0),
|
|
deep_err_total=int(tier_rows.deep_err_total or 0),
|
|
escalation_by_reason=escalation_by_reason,
|
|
escalation_by_domain=escalation_by_domain,
|
|
)
|
|
|
|
return DashboardResponse(
|
|
today_added=today_added,
|
|
today_by_domain=[
|
|
DomainCount(domain=row[0], count=row[1]) for row in today_rows
|
|
],
|
|
inbox_count=inbox_count,
|
|
law_alerts=law_alerts,
|
|
recent_documents=[
|
|
RecentDocument(
|
|
id=doc.id,
|
|
title=doc.title,
|
|
file_format=doc.file_format,
|
|
ai_domain=doc.ai_domain,
|
|
created_at=doc.created_at.isoformat() if doc.created_at else "",
|
|
)
|
|
for doc in recent_docs
|
|
],
|
|
pipeline_status=[
|
|
PipelineStatus(stage=row[0], status=row[1], count=row[2])
|
|
for row in pipeline_result
|
|
],
|
|
failed_count=failed_count,
|
|
total_documents=total_documents,
|
|
documents_count=documents_count,
|
|
memos_count=memos_count,
|
|
news_count=news_count,
|
|
category_counts=category_counts,
|
|
library_pending_suggestions=library_pending_suggestions,
|
|
queue_lag=queue_lag,
|
|
tier_health=tier_health,
|
|
)
|