Files
hyungi_document_server/app/api/memos.py
T
Hyungi Ahn 751cdc5be8 fix(queue): enqueue 경로 중복 방어 — partial unique index + 중앙 enqueue_stage 함수
기존 UNIQUE(document_id, stage, status)는 pending+processing 동시 존재를
허용해서 stale 복구 시 충돌 발생. 2-layer 방어로 근본 차단:

1) DB: partial unique index uq_queue_active — 활성 행(pending/processing)은
   (document_id, stage)당 최대 1개만 허용
2) App: enqueue_stage() 중앙 함수 — INSERT ON CONFLICT DO NOTHING으로
   모든 9개 경로의 check-then-insert TOCTOU race 제거

migration 117은 guard check 포함 — 활성 중복이 남아있으면 RAISE EXCEPTION
으로 중단, 수동 정리 유도.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-15 08:37:32 +09:00

346 lines
11 KiB
Python

"""메모 CRUD API — 파일 없는 문서(file_type='note')"""
import hashlib
import json
import re
from datetime import datetime, timezone
from typing import Annotated

from fastapi import APIRouter, Depends, HTTPException, Query
from pydantic import BaseModel
from sqlalchemy import delete, func, select
from sqlalchemy.ext.asyncio import AsyncSession

from core.auth import get_current_user
from core.database import get_session
from models.document import Document
from models.queue import ProcessingQueue, enqueue_stage
from models.user import User
# APIRouter that collects every memo endpoint defined in this module; the
# prefix it is mounted under is configured outside this file (not visible here).
router = APIRouter()
# #태그 파싱 패턴: 한글/영문/숫자/밑줄, 2자 이상
TAG_PATTERN = re.compile(r"(?:^|(?<=\s))#([가-힣a-zA-Z0-9_]{2,})")
def _parse_hashtags(content: str) -> list[str]:
"""본문에서 #태그 추출, 중복 제거, 순서 유지"""
seen: set[str] = set()
tags: list[str] = []
for m in TAG_PATTERN.finditer(content):
tag = m.group(1)
if tag not in seen:
seen.add(tag)
tags.append(tag)
return tags
def _content_hash(content: str) -> str:
"""메모 본문의 SHA-256 해시 (note의 file_hash = content hash)"""
return hashlib.sha256(content.encode("utf-8")).hexdigest()
def _auto_title(content: str) -> str:
"""첫 줄에서 제목 자동 생성 (80자 절단, 마크다운 헤딩 제거)"""
first_line = content.split("\n", 1)[0].strip()
title = re.sub(r"^#+\s*", "", first_line)[:80] or "메모"
return title
async def _enqueue_ai_stages(session: AsyncSession, document_id: int):
    """Queue the classify, embed and chunk stages for *document_id*.

    Any "pending" queue rows for those three stages are deleted first, then each
    stage is registered again through ``enqueue_stage``. Queue rows in any other
    status are not touched by the delete.
    """
    ai_stages = ["classify", "embed", "chunk"]
    drop_stale_pending = delete(ProcessingQueue).where(
        ProcessingQueue.document_id == document_id,
        ProcessingQueue.stage.in_(ai_stages),
        ProcessingQueue.status == "pending",
    )
    await session.execute(drop_stale_pending)
    # NOTE(review): only "pending" rows are cleared. If a stage is currently in
    # another active status (e.g. "processing"), whether enqueue_stage still adds
    # a fresh row depends on its conflict handling — confirm in models.queue.
    for stage_name in ai_stages:
        await enqueue_stage(session, document_id, stage_name)
# ─── 스키마 ───
class MemoCreate(BaseModel):
    # Request body for creating a memo (POST /).
    content: str  # memo body; the endpoint strips it and rejects it when empty
    title: str | None = None  # optional explicit title; when missing or blank, the first line is used
    ask_includable: bool = True  # initial value stored in Document.ask_includable
class MemoUpdate(BaseModel):
    # Request body for PATCH /{memo_id}; content is required on every update.
    content: str
    title: str | None = None  # explicit new title; None (or blank) re-derives it from the first line
class ArchiveSet(BaseModel):
    # Request body for PATCH /{memo_id}/archive: the desired state (set, not toggled).
    archived: bool
class MemoResponse(BaseModel):
    # Response schema for a single memo. `content` carries Document.extracted_text;
    # the other fields mirror the Document attributes of the same name.

    # Pydantic v2 configuration. This replaces the inner `class Config`, which is
    # deprecated in v2. from_attributes=True allows validating from ORM objects.
    model_config = {"from_attributes": True}

    id: int
    title: str | None
    content: str | None  # extracted_text
    file_format: str
    user_tags: list | None
    ai_tags: list | None
    ai_domain: str | None
    ai_sub_group: str | None
    ai_summary: str | None
    pinned: bool
    archived: bool
    ask_includable: bool
    created_at: datetime
    updated_at: datetime
class MemoListResponse(BaseModel):
    # Paginated response for GET /.
    items: list[MemoResponse]  # memos on the requested page
    total: int  # number of memos matching the filters across all pages
    page: int  # 1-based page number, echoed from the request
    page_size: int  # page size, echoed from the request
def _to_memo_response(doc: Document) -> MemoResponse:
    """Convert a memo ``Document`` into its API response model.

    Every response field is read from the Document attribute of the same name,
    except ``content``, which comes from ``Document.extracted_text``.
    """
    same_named = (
        "id",
        "title",
        "file_format",
        "user_tags",
        "ai_tags",
        "ai_domain",
        "ai_sub_group",
        "ai_summary",
        "pinned",
        "archived",
        "ask_includable",
        "created_at",
        "updated_at",
    )
    payload = {attr: getattr(doc, attr) for attr in same_named}
    payload["content"] = doc.extracted_text
    return MemoResponse(**payload)
# ─── 엔드포인트 ───
@router.post("/", response_model=MemoResponse, status_code=201)
async def create_memo(
    body: MemoCreate,
    user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
):
    """Create a memo: a ``file_type='note'`` document with no backing file.

    The title is the stripped ``body.title`` when it is non-blank; otherwise it is
    derived from the first line of the content. Hashtags in the body become
    ``user_tags``, and the AI processing stages are queued in the same
    transaction before the commit.

    Raises:
        HTTPException(400): the content is empty after stripping.
    """
    text = body.content.strip()
    if not text:
        raise HTTPException(status_code=400, detail="메모 내용이 비어있습니다")

    # A blank explicit title is treated the same as no title.
    explicit_title = (body.title or "").strip()
    encoded = text.encode("utf-8")

    memo = Document(
        file_path=None,
        file_hash=_content_hash(text),
        file_format="md",
        file_size=len(encoded),
        file_type="note",
        title=explicit_title or _auto_title(text),
        extracted_text=text,
        review_status="approved",
        source_channel="memo",
        user_tags=_parse_hashtags(text),
        pinned=False,
        archived=False,
        ask_includable=body.ask_includable,
    )
    session.add(memo)
    # Flush so the row is inserted and memo.id is available for the queue rows.
    await session.flush()
    await _enqueue_ai_stages(session, memo.id)
    await session.commit()
    await session.refresh(memo)
    return _to_memo_response(memo)
@router.get("/", response_model=MemoListResponse)
async def list_memos(
    user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
    page: int = Query(1, ge=1),
    page_size: int = Query(20, ge=1, le=100),
    tag: str | None = Query(None, description="user_tags 또는 ai_tags 필터"),
    archived: bool = Query(False, description="true면 아카이브 목록"),
    pinned: bool | None = Query(None, description="true면 핀 고정된 메모만"),
):
    """List memos with pagination.

    Active list (archived=False): pinned memos first, then newest first.
    Archive list (archived=True): newest first; pin state is ignored for ordering.
    ``id`` is the final tie-breaker, so pages stay stable when several memos share
    the same ``created_at``.

    Optional filters: ``pinned`` (exact match on the pin flag) and ``tag``
    (matched against either ``user_tags`` or ``ai_tags``).
    """
    base = select(Document).where(
        Document.file_type == "note",
        Document.source_channel == "memo",
        Document.deleted_at.is_(None),
        Document.archived == archived,
    )
    if pinned is not None:
        base = base.where(Document.pinned == pinned)
    if tag:
        # Build the JSON array literal with json.dumps. Interpolating the raw tag
        # into '["..."]' produced invalid or altered JSON when the tag contained
        # a quote or a backslash.
        tag_json = json.dumps([tag], ensure_ascii=False)
        base = base.where(
            Document.user_tags.op("@>")(tag_json)
            | Document.ai_tags.op("@>")(tag_json)
        )

    # Total number of matches across all pages (same filters, no paging).
    count_query = select(func.count()).select_from(base.subquery())
    total = (await session.execute(count_query)).scalar() or 0

    # created_at alone is not unique; add id as a deterministic tie-breaker so
    # OFFSET/LIMIT pages never repeat or skip rows.
    newest_first = (Document.created_at.desc(), Document.id.desc())
    if archived:
        query = base.order_by(*newest_first)
    else:
        query = base.order_by(Document.pinned.desc(), *newest_first)
    query = query.offset((page - 1) * page_size).limit(page_size)

    items = (await session.execute(query)).scalars().all()
    return MemoListResponse(
        items=[_to_memo_response(doc) for doc in items],
        total=total,
        page=page,
        page_size=page_size,
    )
@router.get("/{memo_id}", response_model=MemoResponse)
async def get_memo(
    memo_id: int,
    user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
):
    """Return a single memo.

    Raises:
        HTTPException(404): the id does not exist, is not a note, or is soft-deleted.
    """
    # NOTE(review): unlike list_memos, this lookup does not require
    # source_channel == "memo" — confirm whether other note sources should be
    # reachable here.
    found = await session.get(Document, memo_id)
    if found and found.file_type == "note" and found.deleted_at is None:
        return _to_memo_response(found)
    raise HTTPException(status_code=404, detail="메모를 찾을 수 없습니다")
@router.patch("/{memo_id}", response_model=MemoResponse)
async def update_memo(
    memo_id: int,
    body: MemoUpdate,
    user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
):
    """Update a memo's content and title.

    The title becomes the stripped ``body.title`` when it is non-blank; otherwise
    it is re-derived from the first line of the content. The content, hash, size
    and hashtag-based ``user_tags`` are always rewritten.

    Only when the stripped content differs from the stored text are the
    following done: the AI-derived fields are cleared, the memo's chunks are
    deleted, and the classify/embed/chunk stages are re-queued. A title-only
    edit therefore keeps the existing AI results.

    Raises:
        HTTPException(404): no live note with this id.
        HTTPException(400): the content is empty after stripping.
    """
    doc = await session.get(Document, memo_id)
    if not doc or doc.file_type != "note" or doc.deleted_at is not None:
        raise HTTPException(status_code=404, detail="메모를 찾을 수 없습니다")
    content = body.content.strip()
    if not content:
        raise HTTPException(status_code=400, detail="메모 내용이 비어있습니다")

    # Must be computed before extracted_text is overwritten below.
    content_changed = content != doc.extracted_text

    doc.extracted_text = content
    doc.file_hash = _content_hash(content)
    doc.file_size = len(content.encode("utf-8"))
    doc.title = body.title.strip() if body.title and body.title.strip() else _auto_title(content)
    doc.user_tags = _parse_hashtags(content)

    if content_changed:
        # Clear AI output derived from the old text right away, so it is never
        # served alongside the new content.
        doc.ai_summary = None
        doc.ai_domain = None
        doc.ai_sub_group = None
        doc.ai_tags = None
        doc.ai_confidence = None
        doc.ai_processed_at = None
        doc.embedding = None
        doc.embedded_at = None

        # Function-level import kept from the original code (possibly to avoid an
        # import cycle — not verifiable from this file).
        from models.chunk import DocumentChunk

        # Drop chunks built from the old text, then queue reprocessing.
        await session.execute(
            delete(DocumentChunk).where(DocumentChunk.doc_id == memo_id)
        )
        await _enqueue_ai_stages(session, memo_id)

    doc.updated_at = datetime.now(timezone.utc)
    await session.commit()
    await session.refresh(doc)
    return _to_memo_response(doc)
@router.delete("/{memo_id}", status_code=204)
async def delete_memo(
    memo_id: int,
    user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
):
    """Soft-delete a memo by stamping ``deleted_at`` with the current UTC time.

    Raises:
        HTTPException(404): the id does not exist, is not a note, or is already deleted.
    """
    memo = await session.get(Document, memo_id)
    if memo and memo.file_type == "note" and memo.deleted_at is None:
        # NOTE(review): queued AI stages for this document are not cancelled
        # here — confirm that the workers skip soft-deleted documents.
        memo.deleted_at = datetime.now(timezone.utc)
        await session.commit()
        return
    raise HTTPException(status_code=404, detail="메모를 찾을 수 없습니다")
@router.patch("/{memo_id}/pin", response_model=MemoResponse)
async def toggle_pin(
    memo_id: int,
    user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
):
    """Flip the memo's ``pinned`` flag and bump ``updated_at``.

    This is a toggle: repeating the request undoes it.

    Raises:
        HTTPException(404): the id does not exist, is not a note, or is soft-deleted.
    """
    memo = await session.get(Document, memo_id)
    if not (memo and memo.file_type == "note" and memo.deleted_at is None):
        raise HTTPException(status_code=404, detail="메모를 찾을 수 없습니다")

    memo.pinned = not memo.pinned
    touched_at = datetime.now(timezone.utc)
    memo.updated_at = touched_at

    await session.commit()
    await session.refresh(memo)
    return _to_memo_response(memo)
@router.patch("/{memo_id}/archive", response_model=MemoResponse)
async def set_archive(
    memo_id: int,
    body: ArchiveSet,
    user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
):
    """Set the memo's ``archived`` flag to ``body.archived`` and bump ``updated_at``.

    Idempotent: the flag is assigned, not toggled.

    Raises:
        HTTPException(404): the id does not exist, is not a note, or is soft-deleted.
    """
    target = await session.get(Document, memo_id)
    is_live_note = bool(target) and target.file_type == "note" and target.deleted_at is None
    if not is_live_note:
        raise HTTPException(status_code=404, detail="메모를 찾을 수 없습니다")

    target.archived = body.archived
    target.updated_at = datetime.now(timezone.utc)

    await session.commit()
    await session.refresh(target)
    return _to_memo_response(target)
@router.patch("/{memo_id}/ask-includable", response_model=MemoResponse)
async def toggle_ask_includable(
    memo_id: int,
    user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
):
    """Flip whether the memo may be used by /ask synthesis (``ask_includable``).

    Also bumps ``updated_at``. This is a toggle, not a setter.

    Raises:
        HTTPException(404): the id does not exist, is not a note, or is soft-deleted.
    """
    note = await session.get(Document, memo_id)
    if not (note and note.file_type == "note" and note.deleted_at is None):
        raise HTTPException(status_code=404, detail="메모를 찾을 수 없습니다")

    note.ask_includable = not note.ask_includable
    note.updated_at = datetime.now(timezone.utc)

    await session.commit()
    await session.refresh(note)
    return _to_memo_response(note)