feat: implement Phase 2 core features

- Add document CRUD API (list/get/upload/update/delete with auth)
  - Upload saves to Inbox + auto-enqueues processing pipeline
  - Delete defaults to DB-only, explicit flag for file deletion
- Add hybrid search API (FTS 0.4 + trigram 0.2 + vector 0.4 weighted)
  - Modes: fts, trgm, vector, hybrid (default)
  - Vector search gracefully degrades if GPU unavailable
- Add Inbox file watcher (5min interval, new file + hash change detection)
- Register documents/search routers and file_watcher scheduler in main.py
- Add IVFFLAT vector index migration (lists=50, with tuning guide)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Hyungi Ahn
2026-04-02 14:49:12 +09:00
parent 2dfb05e653
commit 4b695332b9
5 changed files with 520 additions and 4 deletions

215
app/api/documents.py Normal file
View File

@@ -0,0 +1,215 @@
"""문서 CRUD API"""
import shutil
from datetime import datetime, timezone
from pathlib import Path
from typing import Annotated
from fastapi import APIRouter, Depends, HTTPException, Query, UploadFile, status
from pydantic import BaseModel
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
from core.auth import get_current_user
from core.config import settings
from core.database import get_session
from core.utils import file_hash
from models.document import Document
from models.queue import ProcessingQueue
from models.user import User
router = APIRouter()
# ─── 스키마 ───
class DocumentResponse(BaseModel):
    """API representation of a stored document row.

    Mirrors the ORM ``Document`` model and is built from ORM instances via
    ``DocumentResponse.model_validate(doc)`` (enabled by ``from_attributes``).
    """
    id: int
    file_path: str          # path relative to the NAS mount root
    file_format: str        # lowercased extension, e.g. "pdf"
    file_size: int | None   # bytes; None when unknown
    file_type: str
    title: str | None
    ai_domain: str | None
    ai_sub_group: str | None
    ai_tags: list | None
    ai_summary: str | None
    source_channel: str | None
    data_origin: str | None
    extracted_at: datetime | None
    ai_processed_at: datetime | None
    embedded_at: datetime | None
    created_at: datetime
    updated_at: datetime

    # Pydantic v2 configuration. The legacy nested `class Config` style is
    # deprecated in v2 (this file already uses the v2 `model_validate` API);
    # a plain dict avoids importing ConfigDict.
    model_config = {"from_attributes": True}
class DocumentListResponse(BaseModel):
    """Paginated envelope returned by the document list endpoint."""
    items: list[DocumentResponse]
    total: int       # total rows matching the filters (before pagination)
    page: int        # 1-based page number that was requested
    page_size: int   # requested page size (1..100)
class DocumentUpdate(BaseModel):
    """Partial-update payload for PATCH; only explicitly-set fields are applied
    (the endpoint uses ``model_dump(exclude_unset=True)``)."""
    title: str | None = None
    ai_domain: str | None = None
    ai_sub_group: str | None = None
    ai_tags: list | None = None
    source_channel: str | None = None
    data_origin: str | None = None
# ─── 엔드포인트 ───
@router.get("/", response_model=DocumentListResponse)
async def list_documents(
    user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
    page: int = Query(1, ge=1),
    page_size: int = Query(20, ge=1, le=100),
    domain: str | None = None,
    source: str | None = None,
    format: str | None = None,
):
    """List documents with pagination and optional equality filters."""
    # Collect the active filters, then apply them in one shot.
    conditions = []
    if domain:
        conditions.append(Document.ai_domain == domain)
    if source:
        conditions.append(Document.source_channel == source)
    if format:
        conditions.append(Document.file_format == format)

    base = select(Document)
    if conditions:
        base = base.where(*conditions)

    # Total row count for the filtered set (pre-pagination).
    total = (
        await session.execute(select(func.count()).select_from(base.subquery()))
    ).scalar()

    # Newest first, then the requested page window.
    paged = (
        base.order_by(Document.created_at.desc())
        .offset((page - 1) * page_size)
        .limit(page_size)
    )
    docs = (await session.execute(paged)).scalars().all()

    return DocumentListResponse(
        items=[DocumentResponse.model_validate(d) for d in docs],
        total=total,
        page=page,
        page_size=page_size,
    )
@router.get("/{doc_id}", response_model=DocumentResponse)
async def get_document(
    doc_id: int,
    user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
):
    """Fetch one document by primary key; 404 when it does not exist."""
    document = await session.get(Document, doc_id)
    if document is None:
        raise HTTPException(status_code=404, detail="문서를 찾을 수 없습니다")
    return DocumentResponse.model_validate(document)
@router.post("/", response_model=DocumentResponse, status_code=201)
async def upload_document(
    file: UploadFile,
    user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
):
    """Upload a file: save into Inbox, register in DB, enqueue processing.

    Raises 400 when the upload carries no usable filename.
    """
    if not file.filename:
        raise HTTPException(status_code=400, detail="파일명이 필요합니다")

    # SECURITY: keep only the final path component of the client-supplied
    # filename. Without this, a crafted name such as "../../etc/passwd"
    # would be joined into the path and escape the Inbox directory.
    safe_name = Path(file.filename).name
    if not safe_name or safe_name in (".", ".."):
        raise HTTPException(status_code=400, detail="파일명이 필요합니다")

    # Save the file into the Inbox.
    inbox_dir = Path(settings.nas_mount_path) / "PKM" / "Inbox"
    inbox_dir.mkdir(parents=True, exist_ok=True)
    target = inbox_dir / safe_name

    # De-duplicate the name: name.ext -> name_1.ext, name_2.ext, ...
    counter = 1
    stem, suffix = target.stem, target.suffix
    while target.exists():
        target = inbox_dir / f"{stem}_{counter}{suffix}"
        counter += 1

    content = await file.read()
    target.write_bytes(content)

    # Path stored relative to the NAS mount root.
    rel_path = str(target.relative_to(Path(settings.nas_mount_path)))
    fhash = file_hash(target)
    ext = target.suffix.lstrip(".").lower() or "unknown"

    # Register the document row.
    doc = Document(
        file_path=rel_path,
        file_hash=fhash,
        file_format=ext,
        file_size=len(content),
        file_type="immutable",
        title=target.stem,
        source_channel="manual",
    )
    session.add(doc)
    await session.flush()  # populate doc.id for the queue row below

    # Enqueue the first pipeline stage.
    session.add(ProcessingQueue(
        document_id=doc.id,
        stage="extract",
        status="pending",
    ))
    await session.commit()
    return DocumentResponse.model_validate(doc)
@router.patch("/{doc_id}", response_model=DocumentResponse)
async def update_document(
    doc_id: int,
    body: DocumentUpdate,
    user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
):
    """Apply a manual metadata override to an existing document."""
    document = await session.get(Document, doc_id)
    if document is None:
        raise HTTPException(status_code=404, detail="문서를 찾을 수 없습니다")

    # Touch only the fields the client explicitly sent.
    for name, value in body.model_dump(exclude_unset=True).items():
        setattr(document, name, value)
    document.updated_at = datetime.now(timezone.utc)

    await session.commit()
    return DocumentResponse.model_validate(document)
@router.delete("/{doc_id}")
async def delete_document(
    doc_id: int,
    user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
    delete_file: bool = Query(False, description="NAS 파일도 함께 삭제"),
):
    """Delete a document row; by default the NAS file itself is kept."""
    document = await session.get(Document, doc_id)
    if document is None:
        raise HTTPException(status_code=404, detail="문서를 찾을 수 없습니다")

    # Optionally remove the backing file from the NAS mount.
    if delete_file:
        target = Path(settings.nas_mount_path) / document.file_path
        if target.exists():
            target.unlink()

    await session.delete(document)
    await session.commit()
    return {"message": f"문서 {doc_id} 삭제됨", "file_deleted": delete_file}

189
app/api/search.py Normal file
View File

@@ -0,0 +1,189 @@
"""하이브리드 검색 API — FTS + 트리그램 + 벡터"""
from typing import Annotated
from fastapi import APIRouter, Depends, Query
from pydantic import BaseModel
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession
from ai.client import AIClient
from core.auth import get_current_user
from core.database import get_session
from models.user import User
router = APIRouter()
# Hybrid-score weights (initial values; tunable).
W_FTS = 0.4
W_TRGM = 0.2
W_VECTOR = 0.4
class SearchResult(BaseModel):
    """A single search hit with its relevance score."""
    id: int
    title: str | None
    ai_domain: str | None
    ai_summary: str | None
    file_format: str
    score: float        # scale depends on mode: ts_rank / similarity / 1 - cosine distance
    snippet: str | None  # first 200 chars of extracted_text
class SearchResponse(BaseModel):
    """Envelope returned by the search endpoint."""
    results: list[SearchResult]
    total: int   # number of results returned (== len(results))
    query: str   # echo of the query string
    mode: str    # search mode that was executed
@router.get("/", response_model=SearchResponse)
async def search(
    q: str,
    user: Annotated[User, Depends(get_current_user)],
    session: Annotated[AsyncSession, Depends(get_session)],
    # `pattern=` replaces the `regex=` keyword, which is deprecated in
    # FastAPI >= 0.100 / Pydantic v2 (this codebase already uses the v2
    # `model_validate` API, so that baseline holds).
    mode: str = Query("hybrid", pattern="^(fts|trgm|vector|hybrid)$"),
    limit: int = Query(20, ge=1, le=100),
):
    """Search documents.

    mode:
    - fts: PostgreSQL full-text search (GIN index)
    - trgm: trigram partial matching (works for Korean)
    - vector: vector similarity (semantic search)
    - hybrid: FTS + trigram + vector combined (default)
    """
    # Dispatch table keeps the mode -> implementation mapping in one place;
    # "hybrid" (the validated default) is the fallback.
    handlers = {
        "fts": _search_fts,
        "trgm": _search_trgm,
        "vector": _search_vector,
    }
    results = await handlers.get(mode, _search_hybrid)(session, q, limit)
    return SearchResponse(
        results=results,
        total=len(results),
        query=q,
        mode=mode,
    )
async def _search_fts(session: AsyncSession, query: str, limit: int) -> list[SearchResult]:
    """Full-text search via PostgreSQL (GIN-indexable tsvector).

    Uses the 'simple' configuration: whitespace tokenisation with no
    language-specific stemming, so Korean text is split as-is.
    """
    sql = text("""
        SELECT id, title, ai_domain, ai_summary, file_format,
               ts_rank(
                   to_tsvector('simple', coalesce(title, '') || ' ' || coalesce(extracted_text, '')),
                   plainto_tsquery('simple', :query)
               ) AS score,
               left(extracted_text, 200) AS snippet
        FROM documents
        WHERE to_tsvector('simple', coalesce(title, '') || ' ' || coalesce(extracted_text, ''))
              @@ plainto_tsquery('simple', :query)
        ORDER BY score DESC
        LIMIT :limit
    """)
    rows = await session.execute(sql, {"query": query, "limit": limit})
    return [SearchResult(**r._mapping) for r in rows]
async def _search_trgm(session: AsyncSession, query: str, limit: int) -> list[SearchResult]:
    """Trigram partial matching via pg_trgm (handles Korean substrings)."""
    # NOTE(review): `%%` below is the pg_trgm similarity operator `%`, doubled
    # to survive drivers that treat `%` as a paramstyle marker (psycopg2
    # pyformat). asyncpg does NOT do %-formatting, so under an asyncpg-backed
    # AsyncSession this would reach the server as a literal `%%` and fail —
    # confirm which async driver this project configures.
    result = await session.execute(
        text("""
            SELECT id, title, ai_domain, ai_summary, file_format,
                   similarity(
                       coalesce(title, '') || ' ' || coalesce(extracted_text, ''),
                       :query
                   ) AS score,
                   left(extracted_text, 200) AS snippet
            FROM documents
            WHERE (coalesce(title, '') || ' ' || coalesce(extracted_text, '')) %% :query
            ORDER BY score DESC
            LIMIT :limit
        """),
        {"query": query, "limit": limit},
    )
    return [SearchResult(**row._mapping) for row in result]
async def _search_vector(session: AsyncSession, query: str, limit: int) -> list[SearchResult]:
    """Semantic search via pgvector cosine distance (<=>; 0 = identical)."""
    client = AIClient()
    try:
        embedding = await client.embed(query)
    except Exception:
        # Embedding backend (GPU server) unreachable -> degrade to no results.
        return []
    finally:
        await client.close()

    sql = text("""
        SELECT id, title, ai_domain, ai_summary, file_format,
               (1 - (embedding <=> :embedding::vector)) AS score,
               left(extracted_text, 200) AS snippet
        FROM documents
        WHERE embedding IS NOT NULL
        ORDER BY embedding <=> :embedding::vector
        LIMIT :limit
    """)
    rows = await session.execute(sql, {"embedding": str(embedding), "limit": limit})
    return [SearchResult(**r._mapping) for r in rows]
async def _search_hybrid(session: AsyncSession, query: str, limit: int) -> list[SearchResult]:
    """Hybrid search: weighted sum of FTS + trigram + vector scores.

    When the embedding backend is unavailable the vector term is dropped
    and the FTS/trigram weights are re-balanced to sum to 1.0.
    """
    # Embed the query; on any failure fall back to FTS + trigram only.
    # close() sits in a finally so the client is released on both the
    # success and the failure path (previously it leaked when embed() raised).
    query_embedding = None
    try:
        client = AIClient()
        try:
            query_embedding = await client.embed(query)
        finally:
            await client.close()
    except Exception:
        query_embedding = None

    vector_clause = ""
    vector_score = "0"
    params = {"query": query, "limit": limit, "w_fts": W_FTS, "w_trgm": W_TRGM, "w_vector": W_VECTOR}
    if query_embedding:
        # vector_clause / vector_score are internal constants (not user
        # input), so the f-string below is not an injection vector.
        vector_clause = "LEFT JOIN LATERAL (SELECT 1 - (d.embedding <=> :embedding::vector) AS vscore) v ON true"
        vector_score = "coalesce(v.vscore, 0)"
        params["embedding"] = str(query_embedding)
    else:
        # No embedding available: re-weight FTS + trigram to sum to 1.0.
        params["w_fts"] = 0.6
        params["w_trgm"] = 0.4
        params["w_vector"] = 0.0

    result = await session.execute(
        text(f"""
            SELECT d.id, d.title, d.ai_domain, d.ai_summary, d.file_format,
                   (
                       :w_fts * coalesce(ts_rank(
                           to_tsvector('simple', coalesce(d.title, '') || ' ' || coalesce(d.extracted_text, '')),
                           plainto_tsquery('simple', :query)
                       ), 0)
                       + :w_trgm * coalesce(similarity(
                           coalesce(d.title, '') || ' ' || coalesce(d.extracted_text, ''),
                           :query
                       ), 0)
                       + :w_vector * {vector_score}
                   ) AS score,
                   left(d.extracted_text, 200) AS snippet
            FROM documents d
            {vector_clause}
            WHERE coalesce(d.extracted_text, '') != ''
            ORDER BY score DESC
            LIMIT :limit
        """),
        params,
    )
    return [SearchResult(**row._mapping) for row in result]

View File

@@ -7,6 +7,8 @@ from fastapi.responses import RedirectResponse
from sqlalchemy import func, select, text
from api.auth import router as auth_router
from api.documents import router as documents_router
from api.search import router as search_router
from api.setup import router as setup_router
from core.config import settings
from core.database import async_session, engine, init_db
@@ -17,14 +19,16 @@ from models.user import User
async def lifespan(app: FastAPI):
"""앱 시작/종료 시 실행되는 lifespan 핸들러"""
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from workers.file_watcher import watch_inbox
from workers.queue_consumer import consume_queue
# 시작: DB 연결 확인
await init_db()
# APScheduler: 큐 소비자 1분 간격 실행
# APScheduler: 백그라운드 작업
scheduler = AsyncIOScheduler()
scheduler.add_job(consume_queue, "interval", minutes=1, id="queue_consumer")
scheduler.add_job(watch_inbox, "interval", minutes=5, id="file_watcher")
scheduler.start()
yield
@@ -44,10 +48,10 @@ app = FastAPI(
# ─── 라우터 등록 ───
app.include_router(setup_router, prefix="/api/setup", tags=["setup"])
app.include_router(auth_router, prefix="/api/auth", tags=["auth"])
app.include_router(documents_router, prefix="/api/documents", tags=["documents"])
app.include_router(search_router, prefix="/api/search", tags=["search"])
# TODO: Phase 2에서 추가
# app.include_router(documents.router, prefix="/api/documents", tags=["documents"])
# app.include_router(search.router, prefix="/api/search", tags=["search"])
# TODO: Phase 3~4에서 추가
# app.include_router(tasks.router, prefix="/api/tasks", tags=["tasks"])
# app.include_router(dashboard.router, prefix="/api/dashboard", tags=["dashboard"])
# app.include_router(export.router, prefix="/api/export", tags=["export"])

View File

@@ -0,0 +1,97 @@
"""파일 감시 워커 — Inbox 디렉토리 스캔, 새 파일/변경 파일 자동 등록"""
from pathlib import Path
from sqlalchemy import select
from core.config import settings
from core.database import async_session
from core.utils import file_hash, setup_logger
from models.document import Document
from models.queue import ProcessingQueue
logger = setup_logger("file_watcher")
# Files to ignore during Inbox scans: OS artifacts and in-progress downloads.
SKIP_NAMES = {".DS_Store", "Thumbs.db", "desktop.ini", "Icon\r"}
SKIP_EXTENSIONS = {".tmp", ".part", ".crdownload"}


def should_skip(path: Path) -> bool:
    """Return True when *path* is an OS artifact, an AppleDouble resource
    fork ("._*"), or a temporary/partial download."""
    name = path.name
    return (
        name in SKIP_NAMES
        or name.startswith("._")
        or path.suffix.lower() in SKIP_EXTENSIONS
    )
async def watch_inbox():
    """Scan the Inbox directory and register new/changed files in the DB.

    Runs on a 5-minute APScheduler interval. New files are inserted with an
    'extract' queue entry; files whose content hash changed are updated and
    re-queued unless a pending/processing entry already exists.
    """
    inbox_path = Path(settings.nas_mount_path) / "PKM" / "Inbox"
    if not inbox_path.exists():
        return

    files = [f for f in inbox_path.rglob("*") if f.is_file() and not should_skip(f)]
    if not files:
        return

    nas_root = Path(settings.nas_mount_path)
    new_count = 0
    changed_count = 0
    async with async_session() as session:
        for file_path in files:
            # A file can vanish or still be mid-copy between the directory
            # scan and the hash/stat calls (NAS sync). Skip it and let the
            # next scan pick it up, instead of aborting the whole run.
            try:
                fhash = file_hash(file_path)
                size = file_path.stat().st_size
            except OSError as exc:
                logger.warning(f"[Inbox] 파일 접근 실패, 건너뜀: {file_path} ({exc})")
                continue

            rel_path = str(file_path.relative_to(nas_root))
            result = await session.execute(
                select(Document).where(Document.file_path == rel_path)
            )
            existing = result.scalar_one_or_none()

            if existing is None:
                # New file: register and enqueue the first pipeline stage.
                ext = file_path.suffix.lstrip(".").lower() or "unknown"
                doc = Document(
                    file_path=rel_path,
                    file_hash=fhash,
                    file_format=ext,
                    file_size=size,
                    file_type="immutable",
                    title=file_path.stem,
                    source_channel="drive_sync",
                )
                session.add(doc)
                await session.flush()  # need doc.id for the queue row
                session.add(ProcessingQueue(
                    document_id=doc.id,
                    stage="extract",
                    status="pending",
                ))
                new_count += 1
            elif existing.file_hash != fhash:
                # Content changed: update metadata and restart the pipeline
                # from 'extract' — unless a run is already pending/in-flight.
                existing.file_hash = fhash
                existing.file_size = size
                queue_check = await session.execute(
                    select(ProcessingQueue).where(
                        ProcessingQueue.document_id == existing.id,
                        ProcessingQueue.status.in_(["pending", "processing"]),
                    )
                )
                if not queue_check.scalar_one_or_none():
                    session.add(ProcessingQueue(
                        document_id=existing.id,
                        stage="extract",
                        status="pending",
                    ))
                changed_count += 1
        await session.commit()

    if new_count or changed_count:
        logger.info(f"[Inbox] 새 파일 {new_count}건, 변경 파일 {changed_count}건 등록")

View File

@@ -0,0 +1,11 @@
-- Vector similarity index (cosine distance, IVFFlat).
-- NOTE: tune the `lists` value to the document count:
--   fewer than 1,000 docs:  no index needed (seq scan is faster)
--   1,000 - 10,000 docs:    lists = doc_count / 50
--   10,000+ docs:           lists = doc_count / 100
-- Re-check the document count after the initial migration and adjust.
-- Starting value lists=50 (optimal for roughly 500 - 2,500 docs).
CREATE INDEX IF NOT EXISTS idx_documents_embedding
ON documents USING ivfflat (embedding vector_cosine_ops)
WITH (lists = 50);