diff --git a/app/api/documents.py b/app/api/documents.py index 99770e1..2c2acf8 100644 --- a/app/api/documents.py +++ b/app/api/documents.py @@ -9,7 +9,17 @@ from pathlib import Path from typing import Annotated, Literal from urllib.parse import quote -from fastapi import APIRouter, Depends, Form, HTTPException, Query, UploadFile, status +from fastapi import ( + APIRouter, + BackgroundTasks, + Depends, + Form, + Header, + HTTPException, + Query, + UploadFile, + status, +) from fastapi.responses import FileResponse from pydantic import BaseModel from sqlalchemy import func, select @@ -23,6 +33,8 @@ from core.utils import file_hash from models.document import Document from models.queue import ProcessingQueue, enqueue_stage from models.user import User +from services.document_telemetry import record_analyze_event, sanitize_source +from services.prompt_versions import ANALYZE_PROMPT_VERSION, resolve_primary_model from services.search.llm_gate import get_mlx_gate router = APIRouter() @@ -756,125 +768,168 @@ async def analyze_document( doc_id: int, user: Annotated[User, Depends(get_current_user)], session: Annotated[AsyncSession, Depends(get_session)], + background_tasks: BackgroundTasks, + x_source: Annotated[str | None, Header(alias="X-Source")] = None, ) -> AnalyzeResponse: - """문서 전문을 Gemma 4로 구조화 분석. 층(근거/해설/사례/요약) 중 해당 없는 것은 생략.""" + """문서 전문을 Gemma 4로 구조화 분석. 층(근거/해설/사례/요약) 중 해당 없는 것은 생략. + + Phase E.2: analyze_events 로깅. try/finally로 성공/에러 모두 background insert. + X-Source 헤더로 호출자 식별 (document_server / synology_chat / ui_search / ui_detail / eval). + """ t_start = time.perf_counter() + source = sanitize_source(x_source) + # telemetry 변수 (try/finally에서 참조) + truncated_flag = False + cached_flag = False + layers_returned: list[str] = [] + error_code: str | None = None - # 1. 문서 조회 - doc = await session.get(Document, doc_id) - if not doc: - raise HTTPException(status_code=404, detail="문서를 찾을 수 없습니다") - - # 2. 텍스트 확보 - raw_text = doc.extracted_text or "" - if not raw_text.strip(): - raise HTTPException(status_code=404, detail="텍스트 추출 미완료") - - truncated = len(raw_text) > ANALYZE_TEXT_LIMIT - doc_text = raw_text[:ANALYZE_TEXT_LIMIT] - - # 3. 캐시 확인 (키: doc_id + updated_at/created_at) - cache_key = _analyze_cache_key(doc_id, doc.updated_at, doc.created_at) - cached = _analyze_cache_get(cache_key) - if cached is not None: - logger.info("document.analyze cache_hit doc_id=%s user=%s", doc_id, getattr(user, "username", "?")) - return AnalyzeResponse( - id=cached.id, - title=cached.title, - layers=cached.layers, - elapsed_ms=(time.perf_counter() - t_start) * 1000, - truncated=cached.truncated, - cached=True, - ) - - # 4. 프롬프트 구성 - if not ANALYZE_PROMPT: - raise HTTPException(status_code=500, detail="분석 프롬프트 미설치") - prompt = ANALYZE_PROMPT.replace("{document_title}", doc.title or "").replace( - "{document_text}", doc_text - ) - - # 5. LLM 호출 (MLX gate + timeout 안쪽) - ai_client = AIClient() - raw: str | None = None try: - async with get_mlx_gate(): - async with asyncio.timeout(ANALYZE_TIMEOUT_S): - raw = await ai_client._call_chat(ai_client.ai.primary, prompt) - except asyncio.TimeoutError: - logger.warning("document.analyze timeout doc_id=%s", doc_id) - raise HTTPException(status_code=504, detail="분석 시간이 초과되었습니다") - except Exception as exc: - logger.warning("document.analyze llm_error doc_id=%s err=%s", doc_id, type(exc).__name__) - raise HTTPException(status_code=502, detail="AI 서버 일시 오류") - finally: - try: - await ai_client.close() - except Exception: - pass + # 1. 문서 조회 + doc = await session.get(Document, doc_id) + if not doc: + error_code = "not_found" + raise HTTPException(status_code=404, detail="문서를 찾을 수 없습니다") - # 6. JSON 파싱 - parsed = parse_json_response(raw or "") - if not isinstance(parsed, dict): - logger.warning("document.analyze parse_failed doc_id=%s raw_preview=%s", doc_id, (raw or "")[:200]) - raise HTTPException(status_code=422, detail="분석 결과 파싱 실패") + # 2. 텍스트 확보 + raw_text = doc.extracted_text or "" + if not raw_text.strip(): + error_code = "no_text" + raise HTTPException(status_code=404, detail="텍스트 추출 미완료") - # 7. 층 검증 + 억지 채움 제거 - raw_layers = parsed.get("layers") or [] - if not isinstance(raw_layers, list): - raise HTTPException(status_code=422, detail="분석 결과 형식 오류") + truncated_flag = len(raw_text) > ANALYZE_TEXT_LIMIT + doc_text = raw_text[:ANALYZE_TEXT_LIMIT] - layer_titles = { - "evidence": "근거", - "explanation": "해설", - "examples": "사례", - "summary": "요약", - } - valid_layers: list[AnalysisLayer] = [] - seen_layers: set[str] = set() - for item in raw_layers: - if not isinstance(item, dict): - continue - layer_type = item.get("layer") - content = (item.get("content") or "").strip() - if layer_type not in layer_titles: - continue - if layer_type in seen_layers: - continue - if len(content) < ANALYZE_LAYER_MIN_CHARS: - continue - if _is_skip_content(content): - continue - valid_layers.append( - AnalysisLayer( - layer=layer_type, # type: ignore[arg-type] - title=item.get("title") or layer_titles[layer_type], - content=content, + # 3. 캐시 확인 (키: doc_id + updated_at/created_at) + cache_key = _analyze_cache_key(doc_id, doc.updated_at, doc.created_at) + cached = _analyze_cache_get(cache_key) + if cached is not None: + logger.info("document.analyze cache_hit doc_id=%s user=%s", doc_id, getattr(user, "username", "?")) + cached_flag = True + layers_returned = [la.layer for la in cached.layers] + truncated_flag = cached.truncated + return AnalyzeResponse( + id=cached.id, + title=cached.title, + layers=cached.layers, + elapsed_ms=(time.perf_counter() - t_start) * 1000, + truncated=cached.truncated, + cached=True, ) + + # 4. 프롬프트 구성 + if not ANALYZE_PROMPT: + error_code = "llm" # 설정 오류도 llm 범주 + raise HTTPException(status_code=500, detail="분석 프롬프트 미설치") + prompt = ANALYZE_PROMPT.replace("{document_title}", doc.title or "").replace( + "{document_text}", doc_text ) - seen_layers.add(layer_type) - if not valid_layers or "summary" not in seen_layers: - logger.warning("document.analyze missing_summary doc_id=%s layers=%s", doc_id, seen_layers) - raise HTTPException(status_code=422, detail="분석 결과에 요약이 없습니다") + # 5. LLM 호출 (MLX gate + timeout 안쪽) + ai_client = AIClient() + raw: str | None = None + try: + async with get_mlx_gate(): + async with asyncio.timeout(ANALYZE_TIMEOUT_S): + raw = await ai_client._call_chat(ai_client.ai.primary, prompt) + except asyncio.TimeoutError: + logger.warning("document.analyze timeout doc_id=%s", doc_id) + error_code = "timeout" + raise HTTPException(status_code=504, detail="분석 시간이 초과되었습니다") + except Exception as exc: + logger.warning("document.analyze llm_error doc_id=%s err=%s", doc_id, type(exc).__name__) + error_code = "llm" + raise HTTPException(status_code=502, detail="AI 서버 일시 오류") + finally: + try: + await ai_client.close() + except Exception: + pass - # 8. 응답 + 캐시 저장 - elapsed_ms = (time.perf_counter() - t_start) * 1000 - result = AnalyzeResponse( - id=doc.id, - title=doc.title, - layers=valid_layers, - elapsed_ms=elapsed_ms, - truncated=truncated, - cached=False, - ) - _analyze_cache_set(cache_key, result) + # 6. JSON 파싱 + parsed = parse_json_response(raw or "") + if not isinstance(parsed, dict): + logger.warning("document.analyze parse_failed doc_id=%s raw_preview=%s", doc_id, (raw or "")[:200]) + error_code = "parse" + raise HTTPException(status_code=422, detail="분석 결과 파싱 실패") - logger.info( - "document.analyze ok doc_id=%s user=%s layers=%d elapsed_ms=%.0f", - doc_id, - getattr(user, "username", "?"), - len(valid_layers), - elapsed_ms, - ) - return result + # 7. 층 검증 + 억지 채움 제거 + raw_layers = parsed.get("layers") or [] + if not isinstance(raw_layers, list): + error_code = "parse" + raise HTTPException(status_code=422, detail="분석 결과 형식 오류") + + layer_titles = { + "evidence": "근거", + "explanation": "해설", + "examples": "사례", + "summary": "요약", + } + valid_layers: list[AnalysisLayer] = [] + seen_layers: set[str] = set() + for item in raw_layers: + if not isinstance(item, dict): + continue + layer_type = item.get("layer") + content = (item.get("content") or "").strip() + if layer_type not in layer_titles: + continue + if layer_type in seen_layers: + continue + if len(content) < ANALYZE_LAYER_MIN_CHARS: + continue + if _is_skip_content(content): + continue + valid_layers.append( + AnalysisLayer( + layer=layer_type, # type: ignore[arg-type] + title=item.get("title") or layer_titles[layer_type], + content=content, + ) + ) + seen_layers.add(layer_type) + + if not valid_layers or "summary" not in seen_layers: + logger.warning("document.analyze missing_summary doc_id=%s layers=%s", doc_id, seen_layers) + error_code = "missing_summary" + raise HTTPException(status_code=422, detail="분석 결과에 요약이 없습니다") + + # 8. 응답 + 캐시 저장 + elapsed_ms = (time.perf_counter() - t_start) * 1000 + result = AnalyzeResponse( + id=doc.id, + title=doc.title, + layers=valid_layers, + elapsed_ms=elapsed_ms, + truncated=truncated_flag, + cached=False, + ) + _analyze_cache_set(cache_key, result) + layers_returned = [la.layer for la in valid_layers] + + logger.info( + "document.analyze ok doc_id=%s user=%s layers=%d elapsed_ms=%.0f", + doc_id, + getattr(user, "username", "?"), + len(valid_layers), + elapsed_ms, + ) + return result + finally: + # Phase E.2: 모든 exit (성공/cache/에러) 에서 analyze_events INSERT + latency_ms = int((time.perf_counter() - t_start) * 1000) + background_tasks.add_task( + record_analyze_event, + doc_id=doc_id, + user_id=getattr(user, "id", None), + mode="quick", + text_limit=ANALYZE_TEXT_LIMIT, + truncated=truncated_flag, + layers_returned=layers_returned, + cached=cached_flag, + latency_ms=latency_ms, + model_name=resolve_primary_model(), + prompt_version=ANALYZE_PROMPT_VERSION, + error_code=error_code, + source=source, + ) diff --git a/app/models/analyze_event.py b/app/models/analyze_event.py new file mode 100644 index 0000000..dbabbe7 --- /dev/null +++ b/app/models/analyze_event.py @@ -0,0 +1,42 @@ +"""analyze_events 테이블 ORM — POST /documents/{id}/analyze 호출 관측 (Phase E.2) + +목적: 분석 failure mode 분류 (timeout / parse / llm / missing_summary) + + source 별 사용 패턴 (document_server / synology_chat / ui_search / ui_detail / eval). + 단계 3 snapshot DB 설계 입력이 됨. +""" + +from datetime import datetime +from typing import Any + +from sqlalchemy import BigInteger, Boolean, DateTime, ForeignKey, Integer, Text +from sqlalchemy.dialects.postgresql import JSONB +from sqlalchemy.orm import Mapped, mapped_column + +from core.database import Base + + +class AnalyzeEvent(Base): + __tablename__ = "analyze_events" + + id: Mapped[int] = mapped_column(BigInteger, primary_key=True) + doc_id: Mapped[int] = mapped_column( + BigInteger, ForeignKey("documents.id", ondelete="CASCADE"), nullable=False + ) + user_id: Mapped[int | None] = mapped_column( + BigInteger, ForeignKey("users.id", ondelete="SET NULL") + ) + mode: Mapped[str] = mapped_column(Text, default="quick", nullable=False) # quick / full + text_limit: Mapped[int | None] = mapped_column(Integer) + truncated: Mapped[bool] = mapped_column(Boolean, default=False) + layers_returned: Mapped[list[Any] | None] = mapped_column(JSONB, default=list) + cached: Mapped[bool] = mapped_column(Boolean, default=False) + latency_ms: Mapped[int | None] = mapped_column(Integer) + model_name: Mapped[str | None] = mapped_column(Text) + prompt_version: Mapped[str | None] = mapped_column(Text) + # None (success) | "timeout" | "llm" | "parse" | "missing_summary" | "no_text" + error_code: Mapped[str | None] = mapped_column(Text) + # document_server / synology_chat / ui_search / ui_detail / eval / unknown + source: Mapped[str] = mapped_column(Text, default="document_server", nullable=False) + created_at: Mapped[datetime] = mapped_column( + DateTime(timezone=True), default=datetime.now, nullable=False + ) diff --git a/app/services/document_telemetry.py b/app/services/document_telemetry.py new file mode 100644 index 0000000..c789b2d --- /dev/null +++ b/app/services/document_telemetry.py @@ -0,0 +1,79 @@ +"""document 관련 telemetry — Phase E.2 (analyze_events). + +/documents/{id}/analyze 호출을 background task로 DB에 기록. +search_telemetry.py 패턴 동일 (단독 세션 + 에러 흡수). +""" + +from __future__ import annotations + +import logging +from typing import Any + +from sqlalchemy.exc import SQLAlchemyError + +from core.database import async_session +from models.analyze_event import AnalyzeEvent + +logger = logging.getLogger("document_telemetry") + +# source enum validation — 서버 강제 fallback +VALID_SOURCES: set[str] = { + "document_server", + "synology_chat", + "ui_search", + "ui_detail", + "eval", + "unknown", +} +DEFAULT_SOURCE = "document_server" + + +def sanitize_source(raw: str | None) -> str: + """source 값 서버 강제. enum 외 값은 unknown, None은 document_server.""" + if raw is None: + return DEFAULT_SOURCE + lowered = raw.strip().lower() + if lowered in VALID_SOURCES: + return lowered + return "unknown" + + +async def record_analyze_event( + doc_id: int, + user_id: int | None, + mode: str, + text_limit: int | None, + truncated: bool, + layers_returned: list[str], + cached: bool, + latency_ms: int, + model_name: str | None, + prompt_version: str | None, + error_code: str | None, + source: str, +) -> None: + """analyze_events INSERT. background task에서 호출 — 에러 삼킴. + + layers_returned: 성공 시 ["evidence","summary"] 등 layer 문자열 리스트. 실패 시 []. + error_code: None (성공) | "timeout" | "llm" | "parse" | "missing_summary" | "no_text" | "not_found" + """ + try: + async with async_session() as session: + row = AnalyzeEvent( + doc_id=doc_id, + user_id=user_id, + mode=mode, + text_limit=text_limit, + truncated=truncated, + layers_returned=layers_returned, + cached=cached, + latency_ms=latency_ms, + model_name=model_name, + prompt_version=prompt_version, + error_code=error_code, + source=source, + ) + session.add(row) + await session.commit() + except SQLAlchemyError as exc: + logger.warning(f"analyze_event insert failed: {exc}") diff --git a/migrations/137_analyze_events.sql b/migrations/137_analyze_events.sql new file mode 100644 index 0000000..464e304 --- /dev/null +++ b/migrations/137_analyze_events.sql @@ -0,0 +1 @@ +CREATE TABLE IF NOT EXISTS analyze_events (id BIGSERIAL PRIMARY KEY, doc_id BIGINT NOT NULL REFERENCES documents(id) ON DELETE CASCADE, user_id BIGINT REFERENCES users(id) ON DELETE SET NULL, mode TEXT NOT NULL DEFAULT 'quick', text_limit INT, truncated BOOLEAN DEFAULT false, layers_returned JSONB DEFAULT '[]'::jsonb, cached BOOLEAN DEFAULT false, latency_ms INT, model_name TEXT, prompt_version TEXT, error_code TEXT, source TEXT NOT NULL DEFAULT 'document_server', created_at TIMESTAMPTZ DEFAULT now())