Files
hyungi_document_server/app/api/memos.py
T
hyungi 52dd7129a3 feat(memos): include source_channel=email in memo inbox list
list 쿼리 확장:
- 기존 source_channel IN (memo, voice) → OR (source_channel = email AND source_external_id IS NOT NULL)
- mailplus_archive 의 INBOX root archive row (source_external_id=NULL) 는 자동 제외
- inbox_ingest 가 만든 email memo 만 /memos UI 에 노출

MemoResponse 확장:
- source_external_id: Message-ID 또는 imap UID fallback
- email_subject: email_metadata.subject (UI 부제/툴팁)

_to_memo_response 가 email_metadata JSONB 에서 subject 추출.

ingest 가 만든 row 가 UI 에 보이는 게 PR-2B 의 분류 배지/4 버튼/promote flow 자산 재사용의 전제.

plan: ~/.claude/plans/document-enchanted-candy.md
2026-05-12 06:56:44 +00:00

792 lines
28 KiB
Python

"""메모 CRUD API — text 메모(file_type='note') + voice 메모 (file_type='immutable', category='audio', source_channel='voice')
doc_type enum = (immutable, editable, note). 기존 audio 파일이 file_type='immutable' + category='audio'
패턴을 사용하므로 voice 메모도 같은 패턴 따름 (enum 확장 회피).
"""
import hashlib
import logging
import os
import re
import uuid
from datetime import datetime, timezone
from pathlib import Path
from typing import Annotated, Any
from fastapi import APIRouter, Depends, File, Form, HTTPException, Query, UploadFile
from pydantic import BaseModel, Field
from sqlalchemy import delete, func, select, or_, and_
from sqlalchemy.ext.asyncio import AsyncSession
from core.auth import get_current_user
from core.config import settings
from core.database import get_session
from models.document import Document
from models.event import Event
from models.event_history import EventHistory
from models.queue import ProcessingQueue, enqueue_stage
from models.user import User
# Voice upload 제한 (plan v9 결정 — 10분 / 50MB)
VOICE_MAX_BYTES = 50 * 1024 * 1024
VOICE_ALLOWED_EXTS = {".m4a", ".mp3", ".wav", ".webm", ".ogg", ".opus", ".aac"}
VOICE_ALLOWED_CONTENT_PREFIXES = ("audio/",)
VOICE_NAS_SUBDIR = "PKM/Recordings" # /mnt/nas/Document_Server/PKM/Recordings/{YYYY-MM}/{uuid}.{ext}
logger = logging.getLogger(__name__)
router = APIRouter()
# markdown task line: "- [ ] ..." 또는 "- [x] ..."
TASK_LINE_RE = re.compile(r"^(\s*- \[)([ xX])(\].*)$")
# #태그 파싱 패턴: 한글/영문/숫자/밑줄, 2자 이상
TAG_PATTERN = re.compile(r"(?:^|(?<=\s))#([가-힣a-zA-Z0-9_]{2,})")
def _parse_hashtags(content: str) -> list[str]:
"""본문에서 #태그 추출, 중복 제거, 순서 유지"""
seen: set[str] = set()
tags: list[str] = []
for m in TAG_PATTERN.finditer(content):
tag = m.group(1)
if tag not in seen:
seen.add(tag)
tags.append(tag)
return tags
def _content_hash(content: str) -> str:
"""메모 본문의 SHA-256 해시 (note의 file_hash = content hash)"""
return hashlib.sha256(content.encode("utf-8")).hexdigest()
def _auto_title(content: str) -> str:
"""첫 줄에서 제목 자동 생성 (80자 절단, 마크다운 헤딩 제거)"""
first_line = content.split("\n", 1)[0].strip()
title = re.sub(r"^#+\s*", "", first_line)[:80] or "메모"
return title
def _toggle_task_line(content: str, target_index: int, checked: bool) -> tuple[str, bool]:
"""N번째 markdown task line을 찾아 checked/unchecked 상태로 설정.
(new_content, found) 반환. found=False면 target_index에 해당하는 task line이 없음
(본문 편집으로 drift된 경우).
"""
lines = content.split("\n")
ti = 0
found = False
for i, line in enumerate(lines):
m = TASK_LINE_RE.match(line)
if not m:
continue
if ti == target_index:
mark = "x" if checked else " "
lines[i] = m.group(1) + mark + m.group(3)
found = True
break
ti += 1
return "\n".join(lines), found
def _sync_task_state_with_content(content: str, existing_state: dict | None) -> dict:
"""content 의 체크리스트 상태를 memo_task_state 와 동기화.
- content 의 `- [x]` 중 state 에 checked_at 이 없으면 현재 시각으로 기록
→ 본문에 `- [x]` 로 직접 입력된 legacy 항목도 저장 시각 기준으로 10초 후 숨김 동작.
- content 의 `- [ ]` 에 해당하는 index 는 state 에서 제거.
- content 에 task 가 줄어들어 사라진 index 도 정리.
"""
state = dict(existing_state or {})
current_keys: set[str] = set()
task_idx = 0
now_iso = datetime.now(timezone.utc).isoformat()
for line in (content or "").split("\n"):
m = TASK_LINE_RE.match(line)
if not m:
continue
key = str(task_idx)
is_checked = m.group(2).lower() == "x"
if is_checked:
current_keys.add(key)
entry = state.get(key) or {}
if not entry.get("checked_at"):
state[key] = {"checked_at": now_iso}
# unchecked 는 current_keys 에 넣지 않음 → 아래에서 제거
task_idx += 1
# content 에서 unchecked 가 됐거나 아예 사라진 index 의 state 정리
for k in list(state.keys()):
if k not in current_keys:
state.pop(k, None)
return state
async def _enqueue_ai_stages(session: AsyncSession, document_id: int):
"""classify + embed + chunk 큐 등록. 기존 pending 건 정리 (중복 방지)."""
stages = ["classify", "embed", "chunk"]
await session.execute(
delete(ProcessingQueue).where(
ProcessingQueue.document_id == document_id,
ProcessingQueue.stage.in_(stages),
ProcessingQueue.status == "pending",
)
)
for stage in stages:
await enqueue_stage(session, document_id, stage)
# ─── 스키마 ───
class MemoCreate(BaseModel):
content: str
title: str | None = None # 선택적 제목 (없으면 첫 줄 자동 생성)
ask_includable: bool = True
class MemoUpdate(BaseModel):
content: str
title: str | None = None # 명시 제목 변경 (None이면 자동 생성)
class ArchiveSet(BaseModel):
archived: bool
class TaskToggle(BaseModel):
checked: bool
class MemoResponse(BaseModel):
id: int
title: str | None
content: str | None # extracted_text
file_format: str
user_tags: list | None
ai_tags: list | None
ai_domain: str | None
ai_sub_group: str | None
ai_summary: str | None
pinned: bool
archived: bool
ask_includable: bool
memo_task_state: dict # {"<task_index>": {"checked_at": "<ISO8601>"}}
# Memo Intake Upgrade PR-2B — AI 추천 분류 (사용자 1-click promote 의 hint)
ai_event_kind: str | None = None
ai_event_confidence: float | None = None
source_channel: str | None = None # voice/memo 등 진입점 식별 (UI 배지)
file_type: str | None = None # audio (voice 메모) vs note (text 메모)
file_path: str | None = None # voice 메모의 NAS audio 경로 (audio player 용)
# PR-4 Email Ingest — 이메일 source 메모 식별 + UI 표시용
source_external_id: str | None = None # email 의 Message-ID 또는 imap UID fallback
email_subject: str | None = None # email_metadata.subject — 메모 카드 부제 / 툴팁
created_at: datetime
updated_at: datetime
class Config:
from_attributes = True
class MemoListResponse(BaseModel):
items: list[MemoResponse]
total: int
page: int
page_size: int
def _to_memo_response(doc: Document) -> MemoResponse:
return MemoResponse(
id=doc.id,
title=doc.title,
content=doc.extracted_text,
file_format=doc.file_format,
user_tags=doc.user_tags,
ai_tags=doc.ai_tags,
ai_domain=doc.ai_domain,
ai_sub_group=doc.ai_sub_group,
ai_summary=doc.ai_summary,
pinned=doc.pinned,
archived=doc.archived,
ask_includable=doc.ask_includable,
memo_task_state=dict(doc.memo_task_state or {}),
ai_event_kind=doc.ai_event_kind,
ai_event_confidence=doc.ai_event_confidence,
source_channel=doc.source_channel,
file_type=doc.file_type,
file_path=doc.file_path,
source_external_id=doc.source_external_id,
email_subject=(doc.email_metadata or {}).get('subject') if doc.email_metadata else None,
created_at=doc.created_at,
updated_at=doc.updated_at,
)
# ─── 엔드포인트 ───
@router.post("/", response_model=MemoResponse, status_code=201)
async def create_memo(
body: MemoCreate,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
):
"""메모 생성 — file_type='note', 파일 없는 문서"""
content = body.content.strip()
if not content:
raise HTTPException(status_code=400, detail="메모 내용이 비어있습니다")
doc = Document(
file_path=None,
file_hash=_content_hash(content),
file_format="md",
file_size=len(content.encode("utf-8")),
file_type="note",
title=body.title.strip() if body.title and body.title.strip() else _auto_title(content),
extracted_text=content,
review_status="approved",
source_channel="memo",
user_tags=_parse_hashtags(content),
pinned=False,
archived=False,
ask_includable=body.ask_includable,
# 본문에 `- [x]` 로 입력된 체크 항목도 생성 시각 기준 10초 후 자동 숨김 대상이 되도록 sync.
memo_task_state=_sync_task_state_with_content(content, None),
)
session.add(doc)
await session.flush()
await _enqueue_ai_stages(session, doc.id)
await session.commit()
await session.refresh(doc)
return _to_memo_response(doc)
@router.get("/", response_model=MemoListResponse)
async def list_memos(
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
page: int = Query(1, ge=1),
page_size: int = Query(20, ge=1, le=100),
tag: str | None = Query(None, description="user_tags 또는 ai_tags 필터"),
archived: bool = Query(False, description="true면 아카이브 목록"),
pinned: bool | None = Query(None, description="true면 핀 고정된 메모만"),
):
"""메모 목록 — 활성: 핀 우선 + 최신순 / 아카이브: 최신순 (핀 무시)
PR-2C: source_channel='voice' (음성 메모) 도 포함. 사용자 의도 = 메모는 모든 입력의 inbox.
voice 메모는 file_type='immutable' + category='audio' + source_channel='voice' 패턴.
source_channel 만으로 분리 (file_type 필터는 immutable 다른 binary 까지 끌어옴 — 회피).
"""
# PR-4: inbox_ingest 가 만든 email memo 도 포함 (source_external_id != NULL 로 mailplus_archive 의 archive row 제외)
base = select(Document).where(
or_(
Document.source_channel.in_(("memo", "voice")),
and_(Document.source_channel == "email", Document.source_external_id.isnot(None)),
),
Document.deleted_at == None, # noqa: E711
Document.archived == archived,
)
if pinned is not None:
base = base.where(Document.pinned == pinned)
if tag:
base = base.where(
Document.user_tags.op("@>")(f'["{tag}"]')
| Document.ai_tags.op("@>")(f'["{tag}"]')
)
count_query = select(func.count()).select_from(base.subquery())
total = (await session.execute(count_query)).scalar() or 0
# 활성: pinned DESC + created_at DESC / 아카이브: created_at DESC (핀 무시)
if archived:
query = base.order_by(Document.created_at.desc())
else:
query = base.order_by(Document.pinned.desc(), Document.created_at.desc())
query = query.offset((page - 1) * page_size).limit(page_size)
result = await session.execute(query)
items = result.scalars().all()
return MemoListResponse(
items=[_to_memo_response(doc) for doc in items],
total=total,
page=page,
page_size=page_size,
)
@router.get("/{memo_id}", response_model=MemoResponse)
async def get_memo(
memo_id: int,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
):
"""메모 단건 조회"""
doc = await session.get(Document, memo_id)
if not doc or doc.file_type != "note" or doc.deleted_at is not None:
raise HTTPException(status_code=404, detail="메모를 찾을 수 없습니다")
return _to_memo_response(doc)
@router.patch("/{memo_id}", response_model=MemoResponse)
async def update_memo(
memo_id: int,
body: MemoUpdate,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
):
"""메모 수정 — content 변경 시 AI 데이터 초기화 + 재처리 큐 등록"""
doc = await session.get(Document, memo_id)
if not doc or doc.file_type != "note" or doc.deleted_at is not None:
raise HTTPException(status_code=404, detail="메모를 찾을 수 없습니다")
content = body.content.strip()
if not content:
raise HTTPException(status_code=400, detail="메모 내용이 비어있습니다")
doc.extracted_text = content
doc.file_hash = _content_hash(content)
doc.file_size = len(content.encode("utf-8"))
# 본문 편집으로 task 순서/추가/삭제가 일어났을 수 있으니 state 재동기화.
# `- [x]` 에 checked_at 없으면 이번 수정 시각으로 기록 → 10초 후 자동 숨김 동작.
doc.memo_task_state = _sync_task_state_with_content(content, doc.memo_task_state)
# PATCH semantics: title 필드를 명시적으로 보낸 경우만 덮어쓴다.
# 체크박스 토글 경로처럼 {content}만 PATCH 하면 기존 title을 보존해야 함
# (이전엔 None→_auto_title(content)로 제목이 체크박스 라인으로 덮어씌워지는 버그).
if "title" in body.model_fields_set:
doc.title = body.title.strip() if body.title and body.title.strip() else _auto_title(content)
elif not (doc.title or "").strip():
# 기존 title이 비어 있던 경우만 보강
doc.title = _auto_title(content)
doc.user_tags = _parse_hashtags(content)
# stale AI 데이터 즉시 초기화
doc.ai_summary = None
doc.ai_domain = None
doc.ai_sub_group = None
doc.ai_tags = None
doc.ai_confidence = None
doc.ai_processed_at = None
doc.embedding = None
doc.embedded_at = None
# 기존 chunks 삭제
from models.chunk import DocumentChunk
await session.execute(
delete(DocumentChunk).where(DocumentChunk.doc_id == memo_id)
)
# 재처리 큐 등록
await _enqueue_ai_stages(session, memo_id)
doc.updated_at = datetime.now(timezone.utc)
await session.commit()
await session.refresh(doc)
return _to_memo_response(doc)
@router.patch("/{memo_id}/tasks/{task_index}", response_model=MemoResponse)
async def toggle_memo_task(
memo_id: int,
task_index: int,
body: TaskToggle,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
):
"""메모 체크박스 토글 전용 엔드포인트.
N번째 markdown task line의 체크 상태를 설정하고 memo_task_state에 시각 기록.
AI 재처리(classify/embed/chunk)는 **의도적으로 스킵** — 체크박스 한 번에 재분석을 트리거하는 건 과하다.
같은 row를 동시에 토글하는 race 방지를 위해 SELECT ... FOR UPDATE 사용.
"""
# ❶ FOR UPDATE: 같은 row 동시 토글 race 차단 (JSONB 전체 replace라 필수)
doc = await session.get(Document, memo_id, with_for_update=True)
if not doc or doc.file_type != "note" or doc.deleted_at is not None:
raise HTTPException(status_code=404, detail="메모를 찾을 수 없습니다")
state = dict(doc.memo_task_state or {})
key = str(task_index)
# ❷ content의 N번째 task line 토글
new_content, found = _toggle_task_line(doc.extracted_text or "", task_index, body.checked)
if not found:
# drift: 사용자가 본문 편집으로 task_index 매칭이 깨짐 → stale state만 정리하고 200 OK
stale_removed = key in state
if stale_removed:
state.pop(key, None)
doc.memo_task_state = state
await session.commit()
await session.refresh(doc)
logger.info(
"memo_task_toggle_drift memo_id=%s task_index=%s stale_removed=%s",
memo_id, task_index, stale_removed,
)
return _to_memo_response(doc)
doc.extracted_text = new_content
doc.file_hash = _content_hash(new_content)
doc.file_size = len(new_content.encode("utf-8"))
# ❸ task_state 갱신 (JSONB 전체 replace — FOR UPDATE lock 아래라 race safe)
if body.checked:
state[key] = {"checked_at": datetime.now(timezone.utc).isoformat()}
else:
state.pop(key, None)
doc.memo_task_state = state
doc.updated_at = datetime.now(timezone.utc)
# AI 재처리 / user_tags 재파싱 / chunks 삭제 / queue enqueue — 모두 의도적 스킵.
# 왜 스킵하는지 나중에 디버깅하지 않아도 되도록 명시 로그.
logger.info(
"memo_task_toggle_skip_ai memo_id=%s task_index=%s checked=%s",
memo_id, task_index, body.checked,
)
await session.commit()
await session.refresh(doc)
return _to_memo_response(doc)
@router.delete("/{memo_id}", status_code=204)
async def delete_memo(
memo_id: int,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
):
"""메모 soft delete"""
doc = await session.get(Document, memo_id)
if not doc or doc.file_type != "note" or doc.deleted_at is not None:
raise HTTPException(status_code=404, detail="메모를 찾을 수 없습니다")
doc.deleted_at = datetime.now(timezone.utc)
await session.commit()
@router.patch("/{memo_id}/pin", response_model=MemoResponse)
async def toggle_pin(
memo_id: int,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
):
"""메모 핀 토글"""
doc = await session.get(Document, memo_id)
if not doc or doc.file_type != "note" or doc.deleted_at is not None:
raise HTTPException(status_code=404, detail="메모를 찾을 수 없습니다")
doc.pinned = not doc.pinned
doc.updated_at = datetime.now(timezone.utc)
await session.commit()
await session.refresh(doc)
return _to_memo_response(doc)
@router.patch("/{memo_id}/archive", response_model=MemoResponse)
async def set_archive(
memo_id: int,
body: ArchiveSet,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
):
"""메모 아카이브 설정 (멱등, 토글 아님)"""
doc = await session.get(Document, memo_id)
if not doc or doc.file_type != "note" or doc.deleted_at is not None:
raise HTTPException(status_code=404, detail="메모를 찾을 수 없습니다")
doc.archived = body.archived
doc.updated_at = datetime.now(timezone.utc)
await session.commit()
await session.refresh(doc)
return _to_memo_response(doc)
@router.patch("/{memo_id}/ask-includable", response_model=MemoResponse)
async def toggle_ask_includable(
memo_id: int,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
):
"""/ask 합성 포함 여부 토글"""
doc = await session.get(Document, memo_id)
if not doc or doc.file_type != "note" or doc.deleted_at is not None:
raise HTTPException(status_code=404, detail="메모를 찾을 수 없습니다")
doc.ask_includable = not doc.ask_includable
doc.updated_at = datetime.now(timezone.utc)
await session.commit()
await session.refresh(doc)
return _to_memo_response(doc)
# ─── Memo Intake Upgrade PR-2B: promote to event ───
class PromotePayload(BaseModel):
"""메모 → events 승급. kind 미지정 시 documents.ai_event_kind 사용.
AI worker 는 events row 직접 생성 X — 본 endpoint 만이 사용자 의도 channel.
"""
kind: str | None = None # 'task' | 'calendar_event' | 'activity_log'
due_at: datetime | None = None
start_at: datetime | None = None
end_at: datetime | None = None
started_at: datetime | None = None
ended_at: datetime | None = None
priority: int | None = None
project_tag: str | None = None
_PROMOTE_KIND_MAP = {
# AI 추천 (event_kind_hint) → events.kind
"task": "task",
"calendar_event": "calendar_event",
"activity_log": "activity_log",
# 'note' / 'reference' 는 promote 대상 아님 (사용자가 명시 kind 지정 필요)
}
@router.post("/{memo_id}/promote-to-event", status_code=201)
async def promote_memo_to_event(
memo_id: int,
body: PromotePayload,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
):
"""메모 1건 → events row 1건 생성. memo_document_id 자동 link.
kind 결정 순서: body.kind > documents.ai_event_kind > 400 거부.
한 메모 → N events 가능 (정책: dedup 없음, 사용자 의도 따라).
"""
doc = await session.get(Document, memo_id)
if (
not doc
or doc.deleted_at is not None
or doc.source_channel not in ("memo", "voice")
):
raise HTTPException(status_code=404, detail="메모를 찾을 수 없습니다")
# kind 결정
requested = (body.kind or "").strip().lower() or None
ai_hint = (doc.ai_event_kind or "").strip().lower() or None
chosen = requested or ai_hint
event_kind = _PROMOTE_KIND_MAP.get(chosen or "")
if not event_kind:
raise HTTPException(
status_code=400,
detail="promote 할 kind 가 명확하지 않습니다 (task/calendar_event/activity_log 중 1개 지정 또는 ai_event_kind 필요)",
)
# 시간 필드 default — activity_log 는 빠른 행동 기록 UX 그대로
now = datetime.now(timezone.utc)
started_at = body.started_at
ended_at = body.ended_at
completed_at: datetime | None = None
status_val = "inbox"
if event_kind == "activity_log":
ended_at = ended_at or now
started_at = started_at or ended_at
completed_at = now
status_val = "done"
elif event_kind == "calendar_event":
status_val = "scheduled" if body.start_at else "inbox"
title = (doc.title or "").strip() or "메모"
description = doc.extracted_text
ev = Event(
title=title,
description=description,
kind=event_kind,
status=status_val,
due_at=body.due_at,
start_at=body.start_at,
end_at=body.end_at,
started_at=started_at,
ended_at=ended_at,
completed_at=completed_at,
priority=body.priority,
project_tag=body.project_tag,
source="memo",
source_ref=str(doc.id), # 같은 메모 N promote 시 별 row → dedup 의도 X
raw_metadata={
"memo_id": doc.id,
"ai_event_kind": doc.ai_event_kind,
"ai_event_confidence": doc.ai_event_confidence,
"promoted_at": now.isoformat(),
},
memo_document_id=doc.id,
user_id=user.id,
created_by="manual",
)
session.add(ev)
await session.flush()
# events_history.create row (events 도메인 패턴 — events/api/events.py 의 _record_history 와 동일 형태)
history = EventHistory(
event_id=ev.id,
changed_by="manual",
change_kind="create",
before=None,
after={
"id": ev.id,
"title": ev.title,
"kind": ev.kind,
"status": ev.status,
"source": ev.source,
"memo_document_id": ev.memo_document_id,
},
)
session.add(history)
await session.commit()
await session.refresh(ev)
return {
"event_id": ev.id,
"kind": ev.kind,
"status": ev.status,
"memo_document_id": ev.memo_document_id,
}
@router.post("/{memo_id}/dismiss-event-suggestion", response_model=MemoResponse)
async def dismiss_event_suggestion(
memo_id: int,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
):
"""'그냥 메모' — AI 추천 무시 + ai_event_kind='note' 강제. 4 버튼 숨김 신호.
MVP: AI 추천값과 사용자 확정값을 같은 컬럼에 저장 (정확도 측정 흐려짐 가능).
백로그: user_event_kind 별 컬럼 분리 (plan Memo Intake Upgrade 백로그).
"""
doc = await session.get(Document, memo_id)
if (
not doc
or doc.deleted_at is not None
or doc.source_channel not in ("memo", "voice")
):
raise HTTPException(status_code=404, detail="메모를 찾을 수 없습니다")
doc.ai_event_kind = "note"
doc.updated_at = datetime.now(timezone.utc)
await session.commit()
await session.refresh(doc)
return _to_memo_response(doc)
# ─── Memo Intake Upgrade PR-2C: voice upload ───
@router.post("/voice", response_model=MemoResponse, status_code=201)
async def upload_voice_memo(
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
audio: UploadFile = File(...),
recorded_at: str | None = Form(None),
device_hint: str | None = Form(None),
):
"""애플워치 / 모바일 / 기타 음성 메모 업로드 → STT 큐 → 자동 분류.
PR-2C: source_channel='voice' + file_type='audio'. 기존 stt_worker → classify
파이프라인 자동 통과. plan 원칙: AI worker 는 events 직접 생성 X.
"""
# Content-Type 검증
if audio.content_type and not audio.content_type.startswith(VOICE_ALLOWED_CONTENT_PREFIXES):
raise HTTPException(status_code=415, detail=f"지원되지 않는 Content-Type: {audio.content_type}")
# 확장자 결정
orig_name = audio.filename or ""
ext = (Path(orig_name).suffix or "").lower()
if ext and ext not in VOICE_ALLOWED_EXTS:
raise HTTPException(status_code=415, detail=f"지원되지 않는 확장자: {ext}")
if not ext:
# content_type 으로 추정 (audio/m4a 등)
ext = ".m4a"
# 본문 읽기 + size 검증
payload: bytes = await audio.read()
if len(payload) > VOICE_MAX_BYTES:
raise HTTPException(status_code=413, detail=f"50MB 초과 ({len(payload)//1024//1024}MB)")
if len(payload) == 0:
raise HTTPException(status_code=400, detail="빈 audio")
# 저장 경로 (NAS) — fastapi 컨테이너 안 /documents = NAS mount
nas_root = Path(settings.nas_mount_path)
yyyy_mm = datetime.now(timezone.utc).astimezone().strftime("%Y-%m")
target_dir = nas_root / VOICE_NAS_SUBDIR / yyyy_mm
target_dir.mkdir(parents=True, exist_ok=True)
file_uuid = uuid.uuid4().hex
target_path = target_dir / f"{file_uuid}{ext}"
# fsync + rename(atomic) 패턴 — NAS soft mount 안전 (feedback_nfs_korean_path_normalize 결)
tmp_path = target_path.with_suffix(target_path.suffix + ".tmp")
try:
with open(tmp_path, "wb") as fh:
fh.write(payload)
fh.flush()
os.fsync(fh.fileno())
os.replace(tmp_path, target_path)
except OSError as e:
# NAS 쓰기 실패 graceful — DB row 미생성
if tmp_path.exists():
try:
tmp_path.unlink()
except OSError:
pass
logger.error("voice upload NAS write 실패: %s", e)
raise HTTPException(status_code=503, detail="NAS 저장 실패 (재시도 권장)")
# recorded_at 파싱
rec_at: datetime | None = None
if recorded_at:
try:
rec_at = datetime.fromisoformat(recorded_at.replace("Z", "+00:00"))
except ValueError:
rec_at = None
raw_metadata: dict[str, Any] = {}
if device_hint:
raw_metadata["device_hint"] = device_hint
if rec_at:
raw_metadata["recorded_at"] = rec_at.isoformat()
# file_path 는 NAS root 기준 상대 경로 (다른 documents 컨벤션, /api/documents/{id}/file endpoint 호환)
relative_path = target_path.relative_to(nas_root)
# Document row — file_type='immutable' (binary, doc_type enum 제약) + category='audio' + source_channel='voice'
# 기존 audio 컨테이너 인입과 같은 패턴. source_channel='voice' 로 일반 audio 와 구분.
title_seed = (orig_name or "음성 메모").rsplit(".", 1)[0]
doc = Document(
file_path=str(relative_path),
file_hash=hashlib.sha256(payload).hexdigest(),
file_format=ext.lstrip(".") or "m4a",
file_size=len(payload),
file_type="immutable",
title=title_seed[:80] or "음성 메모",
extracted_text=None, # STT 후 채움
review_status="approved",
source_channel="voice",
category="audio",
ask_includable=True,
pinned=False,
archived=False,
memo_task_state={},
extract_meta=raw_metadata or None,
)
session.add(doc)
await session.flush()
# STT 큐 등록 — 기존 stt_worker → classify → embed → chunk 파이프라인 자동
await enqueue_stage(session, doc.id, "stt")
await session.commit()
await session.refresh(doc)
return _to_memo_response(doc)