feat(upload): 스트리밍 size 검증 + 0바이트 reject + 고아 레코드 방지

기존 `await file.read()` 는 임의 크기 파일을 메모리에 전부 적재한 후 저장해
디스크 고갈 / OOM 공격 벡터 였음. Caddy/home-caddy 프록시 한도에만 의존했고
FastAPI 측 policy enforcement 가 전무했음. 이 커밋으로 서버가 authoritative
으로 강제 집행.

변경:
- `Request` DI 추가 → Content-Length 사전 차단 (max_bytes * slack_ratio 초과 시 413)
- `await file.read()` → 청크 루프 스트리밍 (stream_chunk_bytes 단위)
- 누적 size > max_bytes 시 스트리밍 중 413 (Content-Length 위조 방어)
- 0바이트 파일 → 400 reject (정책: 유의미한 문서 ingest 대상 아님)
- 파일 저장 완료 + close 이후 에만 file_hash 및 DB 레코드 생성
- Document 레코드 와 processing_queue 는 단일 트랜잭션으로 묶고,
  DB 예외 시 session rollback + partial file unlink 로 원자적 정리
- 예외 시 `except Exception` 으로 cleanup (BaseException 계열은 의도적으로 패스)

설정 값: config.yaml `upload.{max_bytes, content_length_slack_ratio, stream_chunk_bytes}`.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Hyungi Ahn
2026-04-17 08:03:43 +09:00
parent 8622a97e7d
commit 7d2e678ea1
+74 -27
View File
@@ -17,6 +17,7 @@ from fastapi import (
Header,
HTTPException,
Query,
Request,
UploadFile,
status,
)
@@ -434,6 +435,7 @@ async def get_document_file(
@router.post("/", response_model=DocumentResponse, status_code=201)
async def upload_document(
request: Request,
file: UploadFile,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
@@ -444,9 +446,33 @@ async def upload_document(
facet_year: int | None = Form(None),
facet_doctype: str | None = Form(None),
):
"""파일 업로드 → Inbox 저장 + DB 등록 + 처리 큐 등록"""
"""파일 업로드 → Inbox 저장 + DB 등록 + 처리 큐 등록.
Size 한도: `settings.upload.max_bytes` (authoritative).
- Content-Length 사전 차단 (slack_ratio 여유) → 413
- 스트리밍 누적 검사 (Content-Length 위조 방어) → 413
- 0바이트 파일은 400 reject
- 파일 저장 완료 후에만 DB 레코드 생성 (고아 레코드 방지)
- 예외 발생 시 partial file cleanup
"""
from core.library import DEFAULT_LIBRARY_PATH, LIBRARY_PREFIX, normalize_library_path
max_bytes = settings.upload.max_bytes
slack_ratio = settings.upload.content_length_slack_ratio
chunk_size = settings.upload.stream_chunk_bytes
# ── 사전 검증 (스트리밍 IO 시작 전) ──
# Content-Length 사전 차단 (multipart 오버헤드 여유 반영)
content_length_header = request.headers.get("content-length")
if content_length_header:
try:
cl = int(content_length_header)
if cl > int(max_bytes * slack_ratio):
raise HTTPException(status_code=413, detail="파일이 너무 큽니다")
except ValueError:
pass # 잘못된 헤더는 스트리밍 단계에서 max_bytes 로 차단
# doc_purpose 검증
if doc_purpose is not None:
doc_purpose = doc_purpose.strip().lower()
@@ -476,7 +502,8 @@ async def upload_document(
if not safe_name or safe_name.startswith("."):
raise HTTPException(status_code=400, detail="유효하지 않은 파일명")
# Inbox에 파일 저장
# ── 대상 경로 결정 ──
inbox_dir = Path(settings.nas_mount_path) / "PKM" / "Inbox"
inbox_dir.mkdir(parents=True, exist_ok=True)
target = (inbox_dir / safe_name).resolve()
@@ -492,36 +519,56 @@ async def upload_document(
target = inbox_dir.resolve() / f"{stem}_{counter}{suffix}"
counter += 1
content = await file.read()
target.write_bytes(content)
# ── 스트리밍 저장 + 누적 size 검사 ──
written = 0
try:
with target.open("wb") as f:
while chunk := await file.read(chunk_size):
written += len(chunk)
if written > max_bytes:
raise HTTPException(status_code=413, detail="파일이 너무 큽니다")
f.write(chunk)
# with 블록 종료 시 자동 flush + close
if written == 0:
raise HTTPException(status_code=400, detail="빈 파일은 업로드할 수 없습니다")
except Exception:
# partial file cleanup. KeyboardInterrupt/SystemExit 등 BaseException 계열은 잡지 않음.
target.unlink(missing_ok=True)
raise
# ── 파일 저장 완료 후에만 hash + DB 레코드 ──
# 상대 경로 (NAS 루트 기준)
rel_path = str(target.relative_to(Path(settings.nas_mount_path)))
fhash = file_hash(target)
ext = target.suffix.lstrip(".").lower() or "unknown"
# DB 등록
doc = Document(
file_path=rel_path,
file_hash=fhash,
file_format=ext,
file_size=len(content),
file_type="immutable",
title=target.stem,
source_channel="manual",
doc_purpose=doc_purpose,
user_tags=[library_tag] if library_tag else [],
facet_company=facet_company or None,
facet_topic=facet_topic or None,
facet_year=facet_year,
facet_doctype=facet_doctype or None,
)
session.add(doc)
await session.flush()
# 처리 큐 등록
await enqueue_stage(session, doc.id, "extract")
await session.commit()
try:
doc = Document(
file_path=rel_path,
file_hash=fhash,
file_format=ext,
file_size=written,
file_type="immutable",
title=target.stem,
source_channel="manual",
doc_purpose=doc_purpose,
user_tags=[library_tag] if library_tag else [],
facet_company=facet_company or None,
facet_topic=facet_topic or None,
facet_year=facet_year,
facet_doctype=facet_doctype or None,
)
session.add(doc)
await session.flush()
# document + processing_queue 는 단일 트랜잭션으로 묶어 원자적 정리
await enqueue_stage(session, doc.id, "extract")
await session.commit()
except Exception:
# DB 예외 시 session 은 get_session 컨텍스트 종료로 자동 rollback.
# 파일시스템 자원은 DB 와 분리된 자원이므로 명시적 unlink.
target.unlink(missing_ok=True)
raise
return DocumentResponse.model_validate(doc)