security: fix 5 review findings (2 high, 3 medium)

HIGH:
- Lock setup TOTP/NAS endpoints behind _require_setup() guard
  (prevented unauthenticated admin 2FA takeover after setup)
- Sanitize upload filename with Path().name + resolve() validation
  (prevented path traversal writing outside Inbox)

MEDIUM:
- Add score > 0.01 filter to hybrid search via subquery
  (prevented returning irrelevant documents with zero score)
- Implement Inbox → Knowledge file move after classification
  (classify_worker now moves files based on ai_domain)
- Add Anthropic Messages API support in _request()
  (premium/Claude path now sends correct format and parses
  content[0].text instead of choices[0].message.content)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Hyungi Ahn
2026-04-02 15:33:31 +09:00
parent 31d5498f8d
commit d93e50b55c
5 changed files with 114 additions and 37 deletions

View File

@@ -89,19 +89,42 @@ class AIClient:
raise raise
async def _request(self, model_config, prompt: str) -> str:
    """Call a single model endpoint and return the generated text.

    Two wire formats are supported, selected by the endpoint URL:

    * Anthropic Messages API (endpoint contains ``anthropic.com``): sends
      the ``x-api-key`` / ``anthropic-version`` headers and reads the reply
      from the ``content`` block list.
    * OpenAI-compatible chat completions (every other endpoint): reads the
      reply from ``choices[0]["message"]["content"]``.

    Args:
        model_config: Object exposing ``endpoint``, ``model``,
            ``max_tokens`` and ``timeout``.
        prompt: Text sent as the single user message.

    Returns:
        The text of the model's reply.

    Raises:
        Whatever ``response.raise_for_status()`` raises for an error
        response, and ``KeyError`` / ``IndexError`` when the response body
        does not have the expected shape.
    """
    is_anthropic = "anthropic.com" in model_config.endpoint

    # Both APIs accept the same model / messages / max_tokens body; only
    # the headers and the response layout differ.
    request_kwargs = {
        "json": {
            "model": model_config.model,
            "messages": [{"role": "user", "content": prompt}],
            "max_tokens": model_config.max_tokens,
        },
        "timeout": model_config.timeout,
    }
    if is_anthropic:
        import os

        request_kwargs["headers"] = {
            "x-api-key": os.getenv("CLAUDE_API_KEY", ""),
            "anthropic-version": "2023-06-01",
            "content-type": "application/json",
        }

    response = await self._http.post(model_config.endpoint, **request_kwargs)
    response.raise_for_status()
    data = response.json()

    if not is_anthropic:
        return data["choices"][0]["message"]["content"]

    # Return the first block that actually carries text instead of assuming
    # it is always at index 0.
    for block in data["content"]:
        if isinstance(block, dict) and "text" in block:
            return block["text"]
    # No text block found: index exactly as before so callers still see the
    # same IndexError / KeyError for a malformed response.
    return data["content"][0]["text"]
async def close(self):
    """Close the underlying async HTTP client."""
    http_client = self._http
    await http_client.aclose()

View File

@@ -127,16 +127,25 @@ async def upload_document(
if not file.filename: if not file.filename:
raise HTTPException(status_code=400, detail="파일명이 필요합니다") raise HTTPException(status_code=400, detail="파일명이 필요합니다")
# 파일명 정규화 (경로 이탈 방지)
safe_name = Path(file.filename).name
if not safe_name or safe_name.startswith("."):
raise HTTPException(status_code=400, detail="유효하지 않은 파일명")
# Inbox에 파일 저장 # Inbox에 파일 저장
inbox_dir = Path(settings.nas_mount_path) / "PKM" / "Inbox" inbox_dir = Path(settings.nas_mount_path) / "PKM" / "Inbox"
inbox_dir.mkdir(parents=True, exist_ok=True) inbox_dir.mkdir(parents=True, exist_ok=True)
target = inbox_dir / file.filename target = (inbox_dir / safe_name).resolve()
# Inbox 하위 경로 검증
if not str(target).startswith(str(inbox_dir.resolve())):
raise HTTPException(status_code=400, detail="잘못된 파일 경로")
# 중복 파일명 처리 # 중복 파일명 처리
counter = 1 counter = 1
stem, suffix = target.stem, target.suffix stem, suffix = target.stem, target.suffix
while target.exists(): while target.exists():
target = inbox_dir / f"{stem}_{counter}{suffix}" target = inbox_dir.resolve() / f"{stem}_{counter}{suffix}"
counter += 1 counter += 1
content = await file.read() content = await file.read()

View File

@@ -165,23 +165,26 @@ async def _search_hybrid(session: AsyncSession, query: str, limit: int) -> list[
result = await session.execute( result = await session.execute(
text(f""" text(f"""
SELECT d.id, d.title, d.ai_domain, d.ai_summary, d.file_format, SELECT * FROM (
( SELECT d.id, d.title, d.ai_domain, d.ai_summary, d.file_format,
:w_fts * coalesce(ts_rank( (
to_tsvector('simple', coalesce(d.title, '') || ' ' || coalesce(d.extracted_text, '')), :w_fts * coalesce(ts_rank(
plainto_tsquery('simple', :query) to_tsvector('simple', coalesce(d.title, '') || ' ' || coalesce(d.extracted_text, '')),
), 0) plainto_tsquery('simple', :query)
+ :w_trgm * coalesce(similarity( ), 0)
coalesce(d.title, '') || ' ' || coalesce(d.extracted_text, ''), + :w_trgm * coalesce(similarity(
:query coalesce(d.title, '') || ' ' || coalesce(d.extracted_text, ''),
), 0) :query
+ :w_vector * {vector_score} ), 0)
) AS score, + :w_vector * {vector_score}
left(d.extracted_text, 200) AS snippet ) AS score,
FROM documents d left(d.extracted_text, 200) AS snippet
{vector_clause} FROM documents d
WHERE coalesce(d.extracted_text, '') != '' {vector_clause}
ORDER BY score DESC WHERE coalesce(d.extracted_text, '') != ''
) sub
WHERE sub.score > 0.01
ORDER BY sub.score DESC
LIMIT :limit LIMIT :limit
"""), """),
params, params,

View File

@@ -154,8 +154,7 @@ async def totp_init(
session: Annotated[AsyncSession, Depends(get_session)], session: Annotated[AsyncSession, Depends(get_session)],
): ):
"""TOTP 시크릿 생성 + otpauth URI 반환 (DB에 저장하지 않음)""" """TOTP 시크릿 생성 + otpauth URI 반환 (DB에 저장하지 않음)"""
# 셋업 중이거나 인증된 유저만 사용 가능 await _require_setup(session)
# 셋업 중에는 admin 생성 직후 호출됨
secret = pyotp.random_base32() secret = pyotp.random_base32()
totp = pyotp.TOTP(secret) totp = pyotp.TOTP(secret)
uri = totp.provisioning_uri( uri = totp.provisioning_uri(
@@ -171,7 +170,7 @@ async def totp_verify(
session: Annotated[AsyncSession, Depends(get_session)], session: Annotated[AsyncSession, Depends(get_session)],
): ):
"""TOTP 코드 검증 후 DB에 시크릿 저장""" """TOTP 코드 검증 후 DB에 시크릿 저장"""
# 코드 검증 await _require_setup(session)
totp = pyotp.TOTP(body.secret) totp = pyotp.TOTP(body.secret)
if not totp.verify(body.code): if not totp.verify(body.code):
raise HTTPException( raise HTTPException(
@@ -194,8 +193,12 @@ async def totp_verify(
@router.post("/verify-nas", response_model=VerifyNASResponse) @router.post("/verify-nas", response_model=VerifyNASResponse)
async def verify_nas(body: VerifyNASRequest): async def verify_nas(
body: VerifyNASRequest,
session: Annotated[AsyncSession, Depends(get_session)],
):
"""NAS 마운트 경로 읽기/쓰기 테스트""" """NAS 마운트 경로 읽기/쓰기 테스트"""
await _require_setup(session)
path = Path(body.path) path = Path(body.path)
exists = path.exists() exists = path.exists()
readable = path.is_dir() and any(True for _ in path.iterdir()) if exists else False readable = path.is_dir() and any(True for _ in path.iterdir()) if exists else False

View File

@@ -1,10 +1,13 @@
"""AI 분류 워커 — Qwen3.5로 도메인/태그/요약 생성""" """AI 분류 워커 — Qwen3.5로 도메인/태그/요약 생성 + Inbox→Knowledge 이동"""
import shutil
from datetime import datetime, timezone from datetime import datetime, timezone
from pathlib import Path
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession
from ai.client import AIClient, parse_json_response from ai.client import AIClient, parse_json_response
from core.config import settings
from core.utils import setup_logger from core.utils import setup_logger
from models.document import Document from models.document import Document
@@ -67,6 +70,10 @@ async def process(document_id: int, session: AsyncSession) -> None:
doc.ai_model_version = "qwen3.5-35b-a3b" doc.ai_model_version = "qwen3.5-35b-a3b"
doc.ai_processed_at = datetime.now(timezone.utc) doc.ai_processed_at = datetime.now(timezone.utc)
# ─── Inbox → Knowledge 폴더 이동 ───
if doc.file_path.startswith("PKM/Inbox/") and domain:
_move_to_knowledge(doc, domain)
logger.info( logger.info(
f"[분류] document_id={document_id}: " f"[분류] document_id={document_id}: "
f"domain={domain}, tags={doc.ai_tags}, summary={len(summary)}" f"domain={domain}, tags={doc.ai_tags}, summary={len(summary)}"
@@ -74,3 +81,35 @@ async def process(document_id: int, session: AsyncSession) -> None:
finally: finally:
await client.close() await client.close()
def _move_to_knowledge(doc: "Document", domain: str) -> None:
    """Move a classified file from the Inbox into its domain folder.

    The destination is ``PKM/{domain}/{sub_group}/{name}`` when
    ``doc.ai_sub_group`` is set and ``PKM/{domain}/{name}`` otherwise, both
    relative to ``settings.nas_mount_path``. If the destination name is
    already taken, ``_1``, ``_2``, ... is appended to the stem. After the
    move, ``doc.file_path`` is set to the new relative path.

    Nothing is moved, and a warning is logged, when the source file is
    missing or when the computed destination would resolve outside the
    ``PKM`` tree.

    Args:
        doc: Document whose ``file_path`` and ``ai_sub_group`` are read and
            whose ``file_path`` is updated on success.
        domain: Classified domain used as the destination folder.
    """
    nas_root = Path(settings.nas_mount_path)
    old_rel = doc.file_path
    src = nas_root / old_rel
    if not src.exists():
        logger.warning(f"[이동] 원본 파일 없음: {src}")
        return

    # Destination: PKM/{domain}[/{sub_group}]/{file name}
    sub_group = doc.ai_sub_group
    if sub_group:
        new_rel = f"PKM/{domain}/{sub_group}/{src.name}"
    else:
        new_rel = f"PKM/{domain}/{src.name}"
    dst = nas_root / new_rel

    # domain / sub_group come from model output: refuse any destination that
    # resolves outside the PKM tree (e.g. via ".." segments) before touching
    # the filesystem.
    pkm_root = (nas_root / "PKM").resolve()
    try:
        dst.resolve().relative_to(pkm_root)
    except ValueError:
        logger.warning(f"[이동] 잘못된 대상 경로: {dst}")
        return

    dst.parent.mkdir(parents=True, exist_ok=True)

    # Resolve name clashes by appending a counter to the stem.
    counter = 1
    stem, suffix = dst.stem, dst.suffix
    while dst.exists():
        dst = dst.parent / f"{stem}_{counter}{suffix}"
        counter += 1
    new_rel = dst.relative_to(nas_root).as_posix()

    shutil.move(str(src), str(dst))
    doc.file_path = new_rel
    # Log the path captured before the update so both ends of the move show.
    logger.info(f"[이동] {old_rel} → {new_rel}")