"""Document CRUD API.

List / get / upload / update / delete endpoints for documents.  Every
endpoint requires an authenticated user.  Uploads land in the NAS Inbox
and are auto-enqueued for the extract pipeline; deletes are DB-only
unless the caller explicitly opts into file deletion.
"""

import shutil
from datetime import datetime, timezone
from pathlib import Path
from typing import Annotated

from fastapi import APIRouter, Depends, HTTPException, Query, UploadFile, status
from pydantic import BaseModel, ConfigDict
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession

from core.auth import get_current_user
from core.config import settings
from core.database import get_session
from core.utils import file_hash
from models.document import Document
from models.queue import ProcessingQueue
from models.user import User

router = APIRouter()


# ─── Schemas ───


class DocumentResponse(BaseModel):
    """Read model mirroring a Document ORM row."""

    # pydantic v2 idiom; replaces the deprecated inner `class Config`.
    model_config = ConfigDict(from_attributes=True)

    id: int
    file_path: str
    file_format: str
    file_size: int | None
    file_type: str
    title: str | None
    ai_domain: str | None
    ai_sub_group: str | None
    ai_tags: list | None
    ai_summary: str | None
    source_channel: str | None
    data_origin: str | None
    extracted_at: datetime | None
    ai_processed_at: datetime | None
    embedded_at: datetime | None
    created_at: datetime
    updated_at: datetime


class DocumentListResponse(BaseModel):
    """One page of documents plus pagination metadata."""

    items: list[DocumentResponse]
    total: int
    page: int
    page_size: int


class DocumentUpdate(BaseModel):
    """Fields a user may manually override; unset fields are left untouched."""

    title: str | None = None
    ai_domain: str | None = None
    ai_sub_group: str | None = None
    ai_tags: list | None = None
    source_channel: str | None = None
    data_origin: str | None = None


# ─── Endpoints ───


@router.get("/", response_model=DocumentListResponse)
async def list_documents(
    user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
    page: int = Query(1, ge=1),
    page_size: int = Query(20, ge=1, le=100),
    domain: str | None = None,
    source: str | None = None,
    format: str | None = None,
):
    """List documents with pagination and optional filters.

    The filters (domain / source / format) are AND-combined; ``total``
    reflects the filtered query, not the whole table.
    """
    query = select(Document)

    if domain:
        query = query.where(Document.ai_domain == domain)
    if source:
        query = query.where(Document.source_channel == source)
    if format:
        query = query.where(Document.file_format == format)

    # Total row count of the filtered query (before pagination).
    count_query = select(func.count()).select_from(query.subquery())
    total = (await session.execute(count_query)).scalar()

    # Newest first, then apply the requested page window.
    query = query.order_by(Document.created_at.desc())
    query = query.offset((page - 1) * page_size).limit(page_size)
    result = await session.execute(query)
    items = result.scalars().all()

    return DocumentListResponse(
        items=[DocumentResponse.model_validate(doc) for doc in items],
        total=total,
        page=page,
        page_size=page_size,
    )


@router.get("/{doc_id}", response_model=DocumentResponse)
async def get_document(
    doc_id: int,
    user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
):
    """Fetch a single document by primary key; 404 when it does not exist."""
    doc = await session.get(Document, doc_id)
    if not doc:
        raise HTTPException(status_code=404, detail="문서를 찾을 수 없습니다")
    return DocumentResponse.model_validate(doc)


@router.post("/", response_model=DocumentResponse, status_code=201)
async def upload_document(
    file: UploadFile,
    user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
):
    """Upload a file: save to the Inbox, register in DB, enqueue processing.

    The client-supplied filename is untrusted; only its basename is used so
    a name like ``../../etc/passwd`` cannot escape the Inbox directory.
    """
    # FIX: path-traversal guard — strip any directory components from the
    # untrusted filename.  Path("..").name and Path("").name are both "",
    # so those degenerate names are rejected as well.
    safe_name = Path(file.filename or "").name
    if not safe_name:
        raise HTTPException(status_code=400, detail="파일명이 필요합니다")

    # Persist the upload into the Inbox directory on the NAS mount.
    inbox_dir = Path(settings.nas_mount_path) / "PKM" / "Inbox"
    inbox_dir.mkdir(parents=True, exist_ok=True)
    target = inbox_dir / safe_name

    # De-duplicate colliding filenames: name_1.ext, name_2.ext, ...
    counter = 1
    stem, suffix = target.stem, target.suffix
    while target.exists():
        target = inbox_dir / f"{stem}_{counter}{suffix}"
        counter += 1

    content = await file.read()
    target.write_bytes(content)

    # Path relative to the NAS root — this is what the DB stores.
    rel_path = str(target.relative_to(Path(settings.nas_mount_path)))
    fhash = file_hash(target)
    ext = target.suffix.lstrip(".").lower() or "unknown"

    # Register the document row.
    doc = Document(
        file_path=rel_path,
        file_hash=fhash,
        file_format=ext,
        file_size=len(content),
        file_type="immutable",
        title=target.stem,
        source_channel="manual",
    )
    session.add(doc)
    await session.flush()  # populate doc.id before creating the queue row

    # Enqueue the extract stage of the processing pipeline.
    session.add(ProcessingQueue(
        document_id=doc.id,
        stage="extract",
        status="pending",
    ))
    await session.commit()

    return DocumentResponse.model_validate(doc)


@router.patch("/{doc_id}", response_model=DocumentResponse)
async def update_document(
    doc_id: int,
    body: DocumentUpdate,
    user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
):
    """Update document metadata (manual override of AI-assigned fields)."""
    doc = await session.get(Document, doc_id)
    if not doc:
        raise HTTPException(status_code=404, detail="문서를 찾을 수 없습니다")

    # Only fields the caller explicitly sent are applied.
    update_data = body.model_dump(exclude_unset=True)
    for field, value in update_data.items():
        setattr(doc, field, value)
    doc.updated_at = datetime.now(timezone.utc)
    await session.commit()

    return DocumentResponse.model_validate(doc)


@router.delete("/{doc_id}")
async def delete_document(
    doc_id: int,
    user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
    delete_file: bool = Query(False, description="NAS 파일도 함께 삭제"),
):
    """Delete a document (default: DB row only, the NAS file is kept)."""
    doc = await session.get(Document, doc_id)
    if not doc:
        raise HTTPException(status_code=404, detail="문서를 찾을 수 없습니다")

    if delete_file:
        file_path = Path(settings.nas_mount_path) / doc.file_path
        if file_path.exists():
            file_path.unlink()

    await session.delete(doc)
    await session.commit()

    return {"message": f"문서 {doc_id} 삭제됨", "file_deleted": delete_file}
"""Hybrid search API — FTS + trigram + vector."""

from typing import Annotated

from fastapi import APIRouter, Depends, Query
from pydantic import BaseModel
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession

from ai.client import AIClient
from core.auth import get_current_user
from core.database import get_session
from models.user import User

router = APIRouter()

# Score weights (initial values, tunable).
W_FTS = 0.4
W_TRGM = 0.2
W_VECTOR = 0.4


class SearchResult(BaseModel):
    """One search hit with its combined relevance score and a text snippet."""

    id: int
    title: str | None
    ai_domain: str | None
    ai_summary: str | None
    file_format: str
    score: float
    snippet: str | None


class SearchResponse(BaseModel):
    """Search results plus the query/mode that produced them."""

    results: list[SearchResult]
    total: int
    query: str
    mode: str


@router.get("/", response_model=SearchResponse)
async def search(
    q: str,
    user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
    mode: str = Query("hybrid", regex="^(fts|trgm|vector|hybrid)$"),
    limit: int = Query(20, ge=1, le=100),
):
    """Search documents.

    mode:
    - fts: PostgreSQL full-text search (GIN index)
    - trgm: trigram partial matching (works for Korean)
    - vector: vector similarity search (semantic)
    - hybrid: weighted FTS + trigram + vector combination (default)
    """
    if mode == "fts":
        results = await _search_fts(session, q, limit)
    elif mode == "trgm":
        results = await _search_trgm(session, q, limit)
    elif mode == "vector":
        results = await _search_vector(session, q, limit)
    else:
        results = await _search_hybrid(session, q, limit)

    return SearchResponse(
        results=results,
        total=len(results),
        query=q,
        mode=mode,
    )


async def _search_fts(session: AsyncSession, query: str, limit: int) -> list[SearchResult]:
    """PostgreSQL full-text search (GIN index).

    Uses the 'simple' configuration: whitespace tokenization without
    language-specific stemming, so Korean text is not mangled.
    """
    result = await session.execute(
        text("""
            SELECT id, title, ai_domain, ai_summary, file_format,
                   ts_rank(
                       to_tsvector('simple', coalesce(title, '') || ' ' || coalesce(extracted_text, '')),
                       plainto_tsquery('simple', :query)
                   ) AS score,
                   left(extracted_text, 200) AS snippet
            FROM documents
            WHERE to_tsvector('simple', coalesce(title, '') || ' ' || coalesce(extracted_text, ''))
                  @@ plainto_tsquery('simple', :query)
            ORDER BY score DESC
            LIMIT :limit
        """),
        {"query": query, "limit": limit},
    )
    return [SearchResult(**row._mapping) for row in result]


async def _search_trgm(session: AsyncSession, query: str, limit: int) -> list[SearchResult]:
    """Trigram partial matching (pg_trgm; handles Korean substrings)."""
    # NOTE: %% is the escaped pg_trgm similarity operator (%) for the driver.
    result = await session.execute(
        text("""
            SELECT id, title, ai_domain, ai_summary, file_format,
                   similarity(
                       coalesce(title, '') || ' ' || coalesce(extracted_text, ''),
                       :query
                   ) AS score,
                   left(extracted_text, 200) AS snippet
            FROM documents
            WHERE (coalesce(title, '') || ' ' || coalesce(extracted_text, '')) %% :query
            ORDER BY score DESC
            LIMIT :limit
        """),
        {"query": query, "limit": limit},
    )
    return [SearchResult(**row._mapping) for row in result]


async def _search_vector(session: AsyncSession, query: str, limit: int) -> list[SearchResult]:
    """Vector similarity search (cosine distance)."""
    client = AIClient()
    try:
        query_embedding = await client.embed(query)
    except Exception:
        return []  # GPU server unavailable → empty result
    finally:
        await client.close()

    # pgvector cosine distance (0 = identical, 2 = opposite); score = 1 - distance.
    result = await session.execute(
        text("""
            SELECT id, title, ai_domain, ai_summary, file_format,
                   (1 - (embedding <=> :embedding::vector)) AS score,
                   left(extracted_text, 200) AS snippet
            FROM documents
            WHERE embedding IS NOT NULL
            ORDER BY embedding <=> :embedding::vector
            LIMIT :limit
        """),
        {"embedding": str(query_embedding), "limit": limit},
    )
    return [SearchResult(**row._mapping) for row in result]


async def _search_hybrid(session: AsyncSession, query: str, limit: int) -> list[SearchResult]:
    """Hybrid search — weighted sum of FTS + trigram + vector scores."""
    # Build the query embedding; on any failure degrade to FTS + trigram only.
    query_embedding = None
    try:
        client = AIClient()
        try:
            query_embedding = await client.embed(query)
        finally:
            # FIX: the original only closed the client on the success path,
            # leaking it whenever embed() raised. _search_vector already used
            # try/finally; this makes the two helpers consistent.
            await client.close()
    except Exception:
        query_embedding = None

    vector_clause = ""
    vector_score = "0"
    params = {"query": query, "limit": limit, "w_fts": W_FTS, "w_trgm": W_TRGM, "w_vector": W_VECTOR}

    if query_embedding:
        vector_clause = "LEFT JOIN LATERAL (SELECT 1 - (d.embedding <=> :embedding::vector) AS vscore) v ON true"
        vector_score = "coalesce(v.vscore, 0)"
        params["embedding"] = str(query_embedding)
    else:
        # No vector available → redistribute weights over FTS + trigram.
        params["w_fts"] = 0.6
        params["w_trgm"] = 0.4
        params["w_vector"] = 0.0

    result = await session.execute(
        text(f"""
            SELECT d.id, d.title, d.ai_domain, d.ai_summary, d.file_format,
                   (
                       :w_fts * coalesce(ts_rank(
                           to_tsvector('simple', coalesce(d.title, '') || ' ' || coalesce(d.extracted_text, '')),
                           plainto_tsquery('simple', :query)
                       ), 0)
                       + :w_trgm * coalesce(similarity(
                           coalesce(d.title, '') || ' ' || coalesce(d.extracted_text, ''),
                           :query
                       ), 0)
                       + :w_vector * {vector_score}
                   ) AS score,
                   left(d.extracted_text, 200) AS snippet
            FROM documents d
            {vector_clause}
            WHERE coalesce(d.extracted_text, '') != ''
            ORDER BY score DESC
            LIMIT :limit
        """),
        params,
    )
    return [SearchResult(**row._mapping) for row in result]
"""File watcher worker — scans the Inbox directory, auto-registers new/changed files."""

from pathlib import Path

from sqlalchemy import select

from core.config import settings
from core.database import async_session
from core.utils import file_hash, setup_logger
from models.document import Document
from models.queue import ProcessingQueue

logger = setup_logger("file_watcher")

# Files to ignore: OS metadata junk and in-progress downloads.
SKIP_NAMES = {".DS_Store", "Thumbs.db", "desktop.ini", "Icon\r"}
SKIP_EXTENSIONS = {".tmp", ".part", ".crdownload"}


def should_skip(path: Path) -> bool:
    """Return True for OS junk files, AppleDouble forks ("._*") and partial downloads."""
    if path.name in SKIP_NAMES or path.name.startswith("._"):
        return True
    return path.suffix.lower() in SKIP_EXTENSIONS


async def watch_inbox():
    """Scan the Inbox directory and register new / changed files in the DB.

    New files get a Document row plus a pending "extract" queue entry;
    files whose content hash changed are updated and re-queued from the
    extract stage unless a pending/processing entry already exists.
    Scheduled every 5 minutes via APScheduler.
    """
    inbox_path = Path(settings.nas_mount_path) / "PKM" / "Inbox"
    if not inbox_path.exists():
        return

    files = [f for f in inbox_path.rglob("*") if f.is_file() and not should_skip(f)]
    if not files:
        return

    new_count = 0
    changed_count = 0

    async with async_session() as session:
        for file_path in files:
            rel_path = str(file_path.relative_to(Path(settings.nas_mount_path)))

            # FIX: a file can vanish or become unreadable between the rglob
            # scan and hashing (drive sync in progress). The original let the
            # OSError abort the whole batch and roll back every registration;
            # skip just the offending file instead.
            try:
                fhash = file_hash(file_path)
                fsize = file_path.stat().st_size  # stat once, reuse in both branches
            except OSError as exc:
                logger.warning(f"[Inbox] 파일 접근 실패, 건너뜀: {rel_path} ({exc})")
                continue

            # Look up an existing document by its NAS-relative path.
            result = await session.execute(
                select(Document).where(Document.file_path == rel_path)
            )
            existing = result.scalar_one_or_none()

            if existing is None:
                # New file → register and enqueue extraction.
                ext = file_path.suffix.lstrip(".").lower() or "unknown"
                doc = Document(
                    file_path=rel_path,
                    file_hash=fhash,
                    file_format=ext,
                    file_size=fsize,
                    file_type="immutable",
                    title=file_path.stem,
                    source_channel="drive_sync",
                )
                session.add(doc)
                await session.flush()  # need doc.id for the queue row

                session.add(ProcessingQueue(
                    document_id=doc.id,
                    stage="extract",
                    status="pending",
                ))
                new_count += 1

            elif existing.file_hash != fhash:
                # Content hash changed → update metadata and reprocess.
                existing.file_hash = fhash
                existing.file_size = fsize

                # Restart from extract only if no pending/processing entry exists.
                queue_check = await session.execute(
                    select(ProcessingQueue).where(
                        ProcessingQueue.document_id == existing.id,
                        ProcessingQueue.status.in_(["pending", "processing"]),
                    )
                )
                if not queue_check.scalar_one_or_none():
                    session.add(ProcessingQueue(
                        document_id=existing.id,
                        stage="extract",
                        status="pending",
                    ))
                changed_count += 1

        await session.commit()

    if new_count or changed_count:
        logger.info(f"[Inbox] 새 파일 {new_count}건, 변경 파일 {changed_count}건 등록")