feat(api): Phase E.2 — analyze_events 테이블 + 로깅

POST /documents/{id}/analyze 호출을 DB에 기록. failure mode 분류 + source 식별.

- migrations/137: analyze_events 테이블 (doc_id FK, mode, truncated, layers_returned JSONB, cached, latency_ms, error_code, source TEXT NOT NULL DEFAULT 'document_server', prompt_version)
- ORM: models/analyze_event.py 신규
- services/document_telemetry.py: record_analyze_event() + sanitize_source() 서버 fallback 강제 (enum 외 → unknown, None → document_server)
- app/api/documents.py:
  · X-Source 헤더 + BackgroundTasks 의존성 추가
  · try/finally 패턴으로 성공/cache/에러 모든 exit에서 background insert
  · error_code: None(성공) | not_found | no_text | timeout | llm | parse | missing_summary

Phase F에서 nanoclaude가 X-Source: synology_chat 헤더로 호출하면 source 구분 가능.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Hyungi Ahn
2026-04-16 13:58:58 +09:00
parent 72b7e65fca
commit 8a8096a444
4 changed files with 288 additions and 111 deletions
+166 -111
View File
@@ -9,7 +9,17 @@ from pathlib import Path
from typing import Annotated, Literal
from urllib.parse import quote
from fastapi import APIRouter, Depends, Form, HTTPException, Query, UploadFile, status
from fastapi import (
APIRouter,
BackgroundTasks,
Depends,
Form,
Header,
HTTPException,
Query,
UploadFile,
status,
)
from fastapi.responses import FileResponse
from pydantic import BaseModel
from sqlalchemy import func, select
@@ -23,6 +33,8 @@ from core.utils import file_hash
from models.document import Document
from models.queue import ProcessingQueue, enqueue_stage
from models.user import User
from services.document_telemetry import record_analyze_event, sanitize_source
from services.prompt_versions import ANALYZE_PROMPT_VERSION, resolve_primary_model
from services.search.llm_gate import get_mlx_gate
router = APIRouter()
@@ -756,125 +768,168 @@ async def analyze_document(
doc_id: int,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
background_tasks: BackgroundTasks,
x_source: Annotated[str | None, Header(alias="X-Source")] = None,
) -> AnalyzeResponse:
"""문서 전문을 Gemma 4로 구조화 분석. 층(근거/해설/사례/요약) 중 해당 없는 것은 생략."""
"""문서 전문을 Gemma 4로 구조화 분석. 층(근거/해설/사례/요약) 중 해당 없는 것은 생략.
Phase E.2: analyze_events 로깅. try/finally로 성공/에러 모두 background insert.
X-Source 헤더로 호출자 식별 (document_server / synology_chat / ui_search / ui_detail / eval).
"""
t_start = time.perf_counter()
source = sanitize_source(x_source)
# telemetry 변수 (try/finally에서 참조)
truncated_flag = False
cached_flag = False
layers_returned: list[str] = []
error_code: str | None = None
# 1. 문서 조회
doc = await session.get(Document, doc_id)
if not doc:
raise HTTPException(status_code=404, detail="문서를 찾을 수 없습니다")
# 2. 텍스트 확보
raw_text = doc.extracted_text or ""
if not raw_text.strip():
raise HTTPException(status_code=404, detail="텍스트 추출 미완료")
truncated = len(raw_text) > ANALYZE_TEXT_LIMIT
doc_text = raw_text[:ANALYZE_TEXT_LIMIT]
# 3. 캐시 확인 (키: doc_id + updated_at/created_at)
cache_key = _analyze_cache_key(doc_id, doc.updated_at, doc.created_at)
cached = _analyze_cache_get(cache_key)
if cached is not None:
logger.info("document.analyze cache_hit doc_id=%s user=%s", doc_id, getattr(user, "username", "?"))
return AnalyzeResponse(
id=cached.id,
title=cached.title,
layers=cached.layers,
elapsed_ms=(time.perf_counter() - t_start) * 1000,
truncated=cached.truncated,
cached=True,
)
# 4. 프롬프트 구성
if not ANALYZE_PROMPT:
raise HTTPException(status_code=500, detail="분석 프롬프트 미설치")
prompt = ANALYZE_PROMPT.replace("{document_title}", doc.title or "").replace(
"{document_text}", doc_text
)
# 5. LLM 호출 (MLX gate + timeout 안쪽)
ai_client = AIClient()
raw: str | None = None
try:
async with get_mlx_gate():
async with asyncio.timeout(ANALYZE_TIMEOUT_S):
raw = await ai_client._call_chat(ai_client.ai.primary, prompt)
except asyncio.TimeoutError:
logger.warning("document.analyze timeout doc_id=%s", doc_id)
raise HTTPException(status_code=504, detail="분석 시간이 초과되었습니다")
except Exception as exc:
logger.warning("document.analyze llm_error doc_id=%s err=%s", doc_id, type(exc).__name__)
raise HTTPException(status_code=502, detail="AI 서버 일시 오류")
finally:
try:
await ai_client.close()
except Exception:
pass
# 1. 문서 조회
doc = await session.get(Document, doc_id)
if not doc:
error_code = "not_found"
raise HTTPException(status_code=404, detail="문서를 찾을 수 없습니다")
# 6. JSON 파싱
parsed = parse_json_response(raw or "")
if not isinstance(parsed, dict):
logger.warning("document.analyze parse_failed doc_id=%s raw_preview=%s", doc_id, (raw or "")[:200])
raise HTTPException(status_code=422, detail="분석 결과 파싱 실패")
# 2. 텍스트 확보
raw_text = doc.extracted_text or ""
if not raw_text.strip():
error_code = "no_text"
raise HTTPException(status_code=404, detail="텍스트 추출 미완료")
# 7. 층 검증 + 억지 채움 제거
raw_layers = parsed.get("layers") or []
if not isinstance(raw_layers, list):
raise HTTPException(status_code=422, detail="분석 결과 형식 오류")
truncated_flag = len(raw_text) > ANALYZE_TEXT_LIMIT
doc_text = raw_text[:ANALYZE_TEXT_LIMIT]
layer_titles = {
"evidence": "근거",
"explanation": "해설",
"examples": "사례",
"summary": "요약",
}
valid_layers: list[AnalysisLayer] = []
seen_layers: set[str] = set()
for item in raw_layers:
if not isinstance(item, dict):
continue
layer_type = item.get("layer")
content = (item.get("content") or "").strip()
if layer_type not in layer_titles:
continue
if layer_type in seen_layers:
continue
if len(content) < ANALYZE_LAYER_MIN_CHARS:
continue
if _is_skip_content(content):
continue
valid_layers.append(
AnalysisLayer(
layer=layer_type, # type: ignore[arg-type]
title=item.get("title") or layer_titles[layer_type],
content=content,
# 3. 캐시 확인 (키: doc_id + updated_at/created_at)
cache_key = _analyze_cache_key(doc_id, doc.updated_at, doc.created_at)
cached = _analyze_cache_get(cache_key)
if cached is not None:
logger.info("document.analyze cache_hit doc_id=%s user=%s", doc_id, getattr(user, "username", "?"))
cached_flag = True
layers_returned = [la.layer for la in cached.layers]
truncated_flag = cached.truncated
return AnalyzeResponse(
id=cached.id,
title=cached.title,
layers=cached.layers,
elapsed_ms=(time.perf_counter() - t_start) * 1000,
truncated=cached.truncated,
cached=True,
)
# 4. 프롬프트 구성
if not ANALYZE_PROMPT:
error_code = "llm" # 설정 오류도 llm 범주
raise HTTPException(status_code=500, detail="분석 프롬프트 미설치")
prompt = ANALYZE_PROMPT.replace("{document_title}", doc.title or "").replace(
"{document_text}", doc_text
)
seen_layers.add(layer_type)
if not valid_layers or "summary" not in seen_layers:
logger.warning("document.analyze missing_summary doc_id=%s layers=%s", doc_id, seen_layers)
raise HTTPException(status_code=422, detail="분석 결과에 요약이 없습니다")
# 5. LLM 호출 (MLX gate + timeout 안쪽)
ai_client = AIClient()
raw: str | None = None
try:
async with get_mlx_gate():
async with asyncio.timeout(ANALYZE_TIMEOUT_S):
raw = await ai_client._call_chat(ai_client.ai.primary, prompt)
except asyncio.TimeoutError:
logger.warning("document.analyze timeout doc_id=%s", doc_id)
error_code = "timeout"
raise HTTPException(status_code=504, detail="분석 시간이 초과되었습니다")
except Exception as exc:
logger.warning("document.analyze llm_error doc_id=%s err=%s", doc_id, type(exc).__name__)
error_code = "llm"
raise HTTPException(status_code=502, detail="AI 서버 일시 오류")
finally:
try:
await ai_client.close()
except Exception:
pass
# 8. 응답 + 캐시 저장
elapsed_ms = (time.perf_counter() - t_start) * 1000
result = AnalyzeResponse(
id=doc.id,
title=doc.title,
layers=valid_layers,
elapsed_ms=elapsed_ms,
truncated=truncated,
cached=False,
)
_analyze_cache_set(cache_key, result)
# 6. JSON 파싱
parsed = parse_json_response(raw or "")
if not isinstance(parsed, dict):
logger.warning("document.analyze parse_failed doc_id=%s raw_preview=%s", doc_id, (raw or "")[:200])
error_code = "parse"
raise HTTPException(status_code=422, detail="분석 결과 파싱 실패")
logger.info(
"document.analyze ok doc_id=%s user=%s layers=%d elapsed_ms=%.0f",
doc_id,
getattr(user, "username", "?"),
len(valid_layers),
elapsed_ms,
)
return result
# 7. 층 검증 + 억지 채움 제거
raw_layers = parsed.get("layers") or []
if not isinstance(raw_layers, list):
error_code = "parse"
raise HTTPException(status_code=422, detail="분석 결과 형식 오류")
layer_titles = {
"evidence": "근거",
"explanation": "해설",
"examples": "사례",
"summary": "요약",
}
valid_layers: list[AnalysisLayer] = []
seen_layers: set[str] = set()
for item in raw_layers:
if not isinstance(item, dict):
continue
layer_type = item.get("layer")
content = (item.get("content") or "").strip()
if layer_type not in layer_titles:
continue
if layer_type in seen_layers:
continue
if len(content) < ANALYZE_LAYER_MIN_CHARS:
continue
if _is_skip_content(content):
continue
valid_layers.append(
AnalysisLayer(
layer=layer_type, # type: ignore[arg-type]
title=item.get("title") or layer_titles[layer_type],
content=content,
)
)
seen_layers.add(layer_type)
if not valid_layers or "summary" not in seen_layers:
logger.warning("document.analyze missing_summary doc_id=%s layers=%s", doc_id, seen_layers)
error_code = "missing_summary"
raise HTTPException(status_code=422, detail="분석 결과에 요약이 없습니다")
# 8. 응답 + 캐시 저장
elapsed_ms = (time.perf_counter() - t_start) * 1000
result = AnalyzeResponse(
id=doc.id,
title=doc.title,
layers=valid_layers,
elapsed_ms=elapsed_ms,
truncated=truncated_flag,
cached=False,
)
_analyze_cache_set(cache_key, result)
layers_returned = [la.layer for la in valid_layers]
logger.info(
"document.analyze ok doc_id=%s user=%s layers=%d elapsed_ms=%.0f",
doc_id,
getattr(user, "username", "?"),
len(valid_layers),
elapsed_ms,
)
return result
finally:
# Phase E.2: 모든 exit (성공/cache/에러) 에서 analyze_events INSERT
latency_ms = int((time.perf_counter() - t_start) * 1000)
background_tasks.add_task(
record_analyze_event,
doc_id=doc_id,
user_id=getattr(user, "id", None),
mode="quick",
text_limit=ANALYZE_TEXT_LIMIT,
truncated=truncated_flag,
layers_returned=layers_returned,
cached=cached_flag,
latency_ms=latency_ms,
model_name=resolve_primary_model(),
prompt_version=ANALYZE_PROMPT_VERSION,
error_code=error_code,
source=source,
)
+42
View File
@@ -0,0 +1,42 @@
"""analyze_events 테이블 ORM — POST /documents/{id}/analyze 호출 관측 (Phase E.2)
목적: 분석 failure mode 분류 (timeout / parse / llm / missing_summary) +
source 별 사용 패턴 (document_server / synology_chat / ui_search / ui_detail / eval).
단계 3 snapshot DB 설계 입력이 됨.
"""
from datetime import datetime
from typing import Any
from sqlalchemy import BigInteger, Boolean, DateTime, ForeignKey, Integer, Text
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.orm import Mapped, mapped_column
from core.database import Base
class AnalyzeEvent(Base):
    """ORM row for one POST /documents/{id}/analyze call (Phase E.2 telemetry).

    Each row captures the outcome of a single analyze request: which document
    and user, whether the result was served from cache, which layers were
    returned, how long it took, and — on failure — a short error code.
    """

    __tablename__ = "analyze_events"

    id: Mapped[int] = mapped_column(BigInteger, primary_key=True)
    # Analyzed document; event rows are deleted together with the document.
    # NOTE(review): this FK is NOT NULL, so an event whose doc_id has no
    # matching documents row (the "not_found" case) presumably fails to
    # insert — confirm whether such events are meant to be stored.
    doc_id: Mapped[int] = mapped_column(
        BigInteger, ForeignKey("documents.id", ondelete="CASCADE"), nullable=False
    )
    # Calling user; reset to NULL when the user row is deleted.
    user_id: Mapped[int | None] = mapped_column(
        BigInteger, ForeignKey("users.id", ondelete="SET NULL")
    )
    mode: Mapped[str] = mapped_column(Text, default="quick", nullable=False)  # quick / full
    text_limit: Mapped[int | None] = mapped_column(Integer)
    truncated: Mapped[bool] = mapped_column(Boolean, default=False)
    # Layer identifiers returned to the caller; an empty list on failure.
    layers_returned: Mapped[list[Any] | None] = mapped_column(JSONB, default=list)
    cached: Mapped[bool] = mapped_column(Boolean, default=False)
    latency_ms: Mapped[int | None] = mapped_column(Integer)
    model_name: Mapped[str | None] = mapped_column(Text)
    prompt_version: Mapped[str | None] = mapped_column(Text)
    # None (success) | "not_found" | "no_text" | "timeout" | "llm" | "parse" | "missing_summary"
    error_code: Mapped[str | None] = mapped_column(Text)
    # document_server / synology_chat / ui_search / ui_detail / eval / unknown
    source: Mapped[str] = mapped_column(Text, default="document_server", nullable=False)
    # The column is timezone-aware, so the Python-side default must be an
    # aware datetime as well; a naive datetime.now() leaves the stored instant
    # dependent on how the driver/session interprets naive values.
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        default=lambda: datetime.now().astimezone(),
        nullable=False,
    )
+79
View File
@@ -0,0 +1,79 @@
"""document 관련 telemetry — Phase E.2 (analyze_events).
/documents/{id}/analyze 호출을 background task로 DB에 기록.
search_telemetry.py 패턴 동일 (단독 세션 + 에러 흡수).
"""
from __future__ import annotations
import logging
from typing import Any
from sqlalchemy.exc import SQLAlchemyError
from core.database import async_session
from models.analyze_event import AnalyzeEvent
logger = logging.getLogger("document_telemetry")
# source enum validation — 서버 강제 fallback
VALID_SOURCES: set[str] = {
"document_server",
"synology_chat",
"ui_search",
"ui_detail",
"eval",
"unknown",
}
DEFAULT_SOURCE = "document_server"
def sanitize_source(raw: str | None) -> str:
"""source 값 서버 강제. enum 외 값은 unknown, None은 document_server."""
if raw is None:
return DEFAULT_SOURCE
lowered = raw.strip().lower()
if lowered in VALID_SOURCES:
return lowered
return "unknown"
async def record_analyze_event(
    doc_id: int,
    user_id: int | None,
    mode: str,
    text_limit: int | None,
    truncated: bool,
    layers_returned: list[str],
    cached: bool,
    latency_ms: int,
    model_name: str | None,
    prompt_version: str | None,
    error_code: str | None,
    source: str,
) -> None:
    """Insert one analyze_events row using a dedicated session.

    Intended to run as a background task: a SQLAlchemyError raised while
    inserting or committing is logged as a warning and swallowed so that
    telemetry cannot fail the request. Other exception types are not caught.

    Args:
        doc_id: Id of the analyzed document.
        user_id: Id of the calling user, or None.
        mode: Analysis mode label stored on the row.
        text_limit: Character limit that was applied to the document text.
        truncated: Whether the document text was cut to the limit.
        layers_returned: Layer identifiers returned on success
            (e.g. ["evidence", "summary"]); an empty list on failure.
        cached: Whether the response was served from cache.
        latency_ms: Request latency in milliseconds.
        model_name: Name of the model used, or None.
        prompt_version: Version tag of the analyze prompt, or None.
        error_code: None on success, otherwise one of "timeout", "llm",
            "parse", "missing_summary", "no_text", "not_found".
        source: Caller label stored on the row.
    """
    try:
        async with async_session() as session:
            session.add(
                AnalyzeEvent(
                    doc_id=doc_id,
                    user_id=user_id,
                    mode=mode,
                    text_limit=text_limit,
                    truncated=truncated,
                    layers_returned=layers_returned,
                    cached=cached,
                    latency_ms=latency_ms,
                    model_name=model_name,
                    prompt_version=prompt_version,
                    error_code=error_code,
                    source=source,
                )
            )
            await session.commit()
    except SQLAlchemyError as exc:
        # Lazy %-style arguments: the message is only rendered if a handler
        # actually emits the record.
        logger.warning("analyze_event insert failed: %s", exc)
+1
View File
@@ -0,0 +1 @@
-- analyze_events: one row per POST /documents/{id}/analyze call (Phase E.2 telemetry).
CREATE TABLE IF NOT EXISTS analyze_events (
    id              BIGSERIAL PRIMARY KEY,
    doc_id          BIGINT NOT NULL REFERENCES documents(id) ON DELETE CASCADE,
    user_id         BIGINT REFERENCES users(id) ON DELETE SET NULL,
    mode            TEXT NOT NULL DEFAULT 'quick',
    text_limit      INT,
    truncated       BOOLEAN DEFAULT false,
    layers_returned JSONB DEFAULT '[]'::jsonb,
    cached          BOOLEAN DEFAULT false,
    latency_ms      INT,
    model_name      TEXT,
    prompt_version  TEXT,
    error_code      TEXT,
    source          TEXT NOT NULL DEFAULT 'document_server',
    created_at      TIMESTAMPTZ DEFAULT now()
)