cec464ae2d
stt: - services/stt/server.py: lazy → eager preload in FastAPI lifespan. STT_PRELOAD=0 으로 lazy 강제 가능 (개발/테스트). preload 실패해도 프로세스는 살아 있고 /ready false 로 남아 healthcheck 가 unhealthy 처리. - docker-compose.yml: healthcheck /health → /ready. /health 는 단순 liveness 라 모델 미적재 상태도 healthy 로 잡혀 운영 신호 부적합. queue ORM: - app/models/queue.py: process_stage enum 에 'stt'/'thumbnail' 추가 + create_type=False (migration 150/151 가 DB enum 확장 담당). 이게 없으면 stt_worker INSERT 시 SQLAlchemy 가 enum value 를 거부. dashboard 강화 (§4 선제, §3 신규 stage 까지 자동 커버): - app/api/dashboard.py: category_counts + library_pending_suggestions + queue_lag (stage 별 pending/processing/failed + oldest_pending_age_sec). - frontend/src/lib/stores/system.ts: QueueLag 타입 + DashboardSummary 확장. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
231 lines
7.2 KiB
Python
231 lines
7.2 KiB
Python
"""대시보드 위젯 데이터 API"""
|
|
|
|
from typing import Annotated
|
|
|
|
from fastapi import APIRouter, Depends
|
|
from pydantic import BaseModel
|
|
from sqlalchemy import func, select, text
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from core.auth import get_current_user
|
|
from core.database import get_session
|
|
from models.document import Document
|
|
from models.queue import ProcessingQueue
|
|
from models.user import User
|
|
|
|
# Router for this module; the dashboard endpoint defined further down registers itself on it.
router = APIRouter()
|
|
|
|
|
|
class DomainCount(BaseModel):
    """Number of documents that share one AI-assigned domain."""

    # AI domain label; None is the bucket for rows whose ai_domain is NULL.
    domain: str | None
    # How many documents fall into this bucket.
    count: int
|
|
|
|
|
|
class RecentDocument(BaseModel):
    """Compact summary of one document for the "recent documents" widget."""

    id: int
    title: str | None
    file_format: str
    ai_domain: str | None
    # ISO-8601 string; the endpoint sends "" when the row has no created_at.
    created_at: str
|
|
|
|
|
|
class PipelineStatus(BaseModel):
    """Row count of the processing queue for one (stage, status) pair."""

    stage: str
    status: str
    # Number of queue rows with this stage and status.
    count: int
|
|
|
|
|
|
class QueueLag(BaseModel):
    """Per-stage processing backlog of the pipeline — for the operations card.

    pipeline_status is a cumulative 24h statistic, so it is a weak signal for
    the backlog that exists right now. queue_lag reports the current
    pending/processing/failed counts plus the age of the oldest pending item,
    answering "is anything stuck at this moment".
    """

    stage: str
    pending: int
    processing: int
    failed: int
    # Seconds elapsed since created_at of the oldest pending item;
    # None when the stage has no pending items.
    oldest_pending_age_sec: int | None
|
|
|
|
|
|
class DashboardResponse(BaseModel):
    """Full payload returned by the dashboard endpoint."""

    today_added: int
    today_by_domain: list[DomainCount]
    inbox_count: int
    law_alerts: int
    recent_documents: list[RecentDocument]
    pipeline_status: list[PipelineStatus]
    failed_count: int
    total_documents: int

    # Split counts: document box (non-note / non-news) / memos (memo + note) / news (news).
    documents_count: int = 0
    memos_count: int = 0
    news_count: int = 0

    # §4 — category-based cards + pending approvals + queue lag.
    category_counts: dict[str, int] = {}
    library_pending_suggestions: int = 0
    queue_lag: list[QueueLag] = []
|
|
|
|
|
|
@router.get("/", response_model=DashboardResponse)
async def get_dashboard(
    user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
) -> DashboardResponse:
    """Aggregate the data shown by the dashboard widgets.

    Args:
        user: Result of the ``get_current_user`` dependency. The body does not
            read it; it is declared so that the dependency runs for every
            request to this endpoint.
        session: Async database session used for every query below.

    Returns:
        A ``DashboardResponse`` holding today's additions (total and per AI
        domain), the inbox size, today's law alerts, the seven most recent
        documents, 24h pipeline statistics, the failed-job count, document
        totals, per-category counts, pending library suggestions and the
        per-stage queue lag.
    """
    # Shared predicates. ``is_(None)`` renders ``IS NULL`` explicitly instead
    # of relying on the overloaded ``== None`` comparison.
    not_deleted = Document.deleted_at.is_(None)
    created_today = func.date(Document.created_at) == func.current_date()

    # Documents added today, grouped by AI domain.
    # NOTE(review): soft-deleted rows are still counted here — confirm intended.
    today_rows = (
        await session.execute(
            select(Document.ai_domain, func.count(Document.id))
            .where(created_today)
            .group_by(Document.ai_domain)
        )
    ).all()
    today_added = sum(count for _, count in today_rows)

    # Unclassified inbox size (review_status = pending).
    inbox_count = (
        await session.execute(
            select(func.count(Document.id)).where(
                Document.review_status == "pending",
                not_deleted,
            )
        )
    ).scalar() or 0

    # Law alerts (today).
    # NOTE(review): soft-deleted rows are still counted here — confirm intended.
    law_alerts = (
        await session.execute(
            select(func.count(Document.id)).where(
                Document.source_channel == "law_monitor",
                created_today,
            )
        )
    ).scalar() or 0

    # Seven most recent documents. Soft-deleted rows are excluded so the
    # widget agrees with the other document aggregates in this endpoint.
    recent_docs = (
        await session.execute(
            select(Document)
            .where(not_deleted)
            .order_by(Document.created_at.desc())
            .limit(7)
        )
    ).scalars().all()

    # Pipeline status (24h). Rows are materialised right away rather than
    # leaving the result object to be iterated after the later queries.
    pipeline_rows = (
        await session.execute(
            text("""
                SELECT stage, status, COUNT(*)
                FROM processing_queue
                WHERE created_at > NOW() - INTERVAL '24 hours'
                GROUP BY stage, status
            """)
        )
    ).all()

    # Number of failed queue items (no time window).
    failed_count = (
        await session.execute(
            select(func.count())
            .select_from(ProcessingQueue)
            .where(ProcessingQueue.status == "failed")
        )
    ).scalar() or 0

    # Total document count plus the per-bucket split (single query).
    # Document box: non-note and non-news / memos: memo + note /
    # news: by the news ingestion channel.
    count_result = await session.execute(
        text("""
            SELECT
                COUNT(*) AS total,
                COUNT(*) FILTER (WHERE source_channel != 'news' AND file_type != 'note') AS documents,
                COUNT(*) FILTER (WHERE source_channel = 'memo' AND file_type = 'note') AS memos,
                COUNT(*) FILTER (WHERE source_channel = 'news') AS news
            FROM documents WHERE deleted_at IS NULL
        """)
    )
    total_documents, documents_count, memos_count, news_count = count_result.one()

    # §4 — per-category counts (§1 documents.category enum).
    cat_result = await session.execute(
        text("""
            SELECT category, COUNT(*)
            FROM documents
            WHERE deleted_at IS NULL AND category IS NOT NULL
            GROUP BY category
        """)
    )
    category_counts = {category: count for category, count in cat_result.all()}

    # §4 — pending approval (library suggestions).
    pending_result = await session.execute(
        text("""
            SELECT COUNT(*)
            FROM documents
            WHERE deleted_at IS NULL
              AND ai_suggestion IS NOT NULL
              AND ai_suggestion->>'proposed_category' = 'library'
        """)
    )
    library_pending_suggestions = pending_result.scalar() or 0

    # §4 — queue lag (current per-stage backlog signal).
    # Grouping by stage means stt/thumbnail (§3) are picked up automatically
    # alongside extract/classify/embed.
    lag_result = await session.execute(
        text("""
            SELECT
                stage,
                COUNT(*) FILTER (WHERE status='pending') AS pending,
                COUNT(*) FILTER (WHERE status='processing') AS processing,
                COUNT(*) FILTER (WHERE status='failed') AS failed,
                EXTRACT(EPOCH FROM (NOW() - MIN(created_at) FILTER (WHERE status='pending')))::int
                    AS oldest_pending_age_sec
            FROM processing_queue
            GROUP BY stage
            ORDER BY stage
        """)
    )
    queue_lag = [
        QueueLag(
            stage=row[0],
            pending=row[1] or 0,
            processing=row[2] or 0,
            failed=row[3] or 0,
            # NULL (None) when the stage currently has no pending rows.
            oldest_pending_age_sec=row[4],
        )
        for row in lag_result.all()
    ]

    return DashboardResponse(
        today_added=today_added,
        today_by_domain=[
            DomainCount(domain=row[0], count=row[1]) for row in today_rows
        ],
        inbox_count=inbox_count,
        law_alerts=law_alerts,
        recent_documents=[
            RecentDocument(
                id=doc.id,
                title=doc.title,
                file_format=doc.file_format,
                ai_domain=doc.ai_domain,
                # created_at is serialised as ISO-8601; "" when missing.
                created_at=doc.created_at.isoformat() if doc.created_at else "",
            )
            for doc in recent_docs
        ],
        pipeline_status=[
            PipelineStatus(stage=row[0], status=row[1], count=row[2])
            for row in pipeline_rows
        ],
        failed_count=failed_count,
        total_documents=total_documents,
        documents_count=documents_count,
        memos_count=memos_count,
        news_count=news_count,
        category_counts=category_counts,
        library_pending_suggestions=library_pending_suggestions,
        queue_lag=queue_lag,
    )
|