feat(scripts): upload size 경계 검증 스크립트 (수동 on-demand)

Phase B 의 스트리밍 size 검증을 외부에서 확인할 수 있는 스크립트.
pytest 인프라가 Phase 0 상태이므로 full test harness 구축을 미루고,
`scripts/verify_upload_size.py` 단일 파일로 경계 케이스를 즉시 회귀 검증.

7 케이스:
- 0 bytes → 400 (정책)
- 1 byte → 201 (happy path)
- max_bytes - 1 → 201 (경계 하)
- max_bytes 정확 → 201 (경계 상)
- max_bytes + 1 → 413 (스트리밍 차단)
- CL slack 초과 (override 헤더) → 413 (사전 차단)
- CL 위조 (작은 헤더 + 큰 body) → best-effort (서버 거절 status 수용)

`/api/config/public` 에서 max_bytes 를 동적 획득. slack_ratio 는 비공개라
스크립트 상수로 1.05 하드코딩 (config.yaml 과 동기화 유지 주석 명시).

Cleanup: 파일명 prefix `__upload_boundary_test__` + ns timestamp 로
실데이터와 격리. 시작 시 pre-cleanup + 각 케이스 직후 + finally 블록 cleanup.

`docker compose exec fastapi python /app/scripts/verify_upload_size.py` 로 실행.
UPLOAD_TEST_TOKEN + DATABASE_URL 환경 변수 필요. scripts/ 는 이미 read-only
volume 으로 마운트돼 있어 배포·재빌드 불필요.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Hyungi Ahn
2026-04-17 08:23:20 +09:00
parent 3a3cd832f6
commit da30eca829
+294
View File
@@ -0,0 +1,294 @@
"""업로드 size 경계 검증 스크립트 (Phase B — 수동 on-demand 실행)
`app/api/documents.py` 의 upload_document 가 설정된 `upload.max_bytes` 를
스트리밍으로 강제 집행하는지 경계 케이스로 확인. pytest 인프라가 구축되기
전까지의 임시 검증 수단이며, 실행 결과는 exit code 로 전달.
사용 환경:
- GPU 서버에서 `docker compose exec fastapi python /app/scripts/verify_upload_size.py`
- 로컬에서 tailscale 경유도 가능하지만 대용량 payload + cleanup 복잡도 때문에
컨테이너 내부 실행이 표준.
필수 환경 변수:
- UPLOAD_TEST_TOKEN : admin 계정 로그인으로 획득한 JWT Bearer 값
- DATABASE_URL : docker-compose 기존 env 재사용 (cleanup 용)
옵션 환경 변수:
- UPLOAD_TEST_BASE_URL : 기본값 http://localhost:8000
- UPLOAD_TEST_INBOX_PATH : 기본값 /documents/PKM/Inbox
- UPLOAD_TEST_TIMEOUT : httpx 요청 타임아웃 초, 기본값 120
Cleanup 전략:
- 파일명 prefix `__upload_boundary_test__` 로 실데이터와 격리
- 스크립트 시작 시 같은 prefix 잔여물 pre-cleanup (pre-run residue 정리)
- 각 케이스 직후 DB rows + 파일 즉시 cleanup
- 예외 시 finally 블록에서 one-more-pass cleanup
"""
import asyncio
import os
import sys
import time
from dataclasses import dataclass
from io import BytesIO
from pathlib import Path
import httpx
# seed_admin.py 와 동일한 sys.path 패턴
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "app"))
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
TEST_PREFIX = "__upload_boundary_test__"
@dataclass
class Case:
name: str
size: int
expected_status: int | set[int]
# 특수 CL 헤더 (override 필요 시 문자열, 없으면 None)
content_length_override: str | None = None
skip_reason: str | None = None
def _status_matches(actual: int, expected: int | set[int]) -> bool:
if isinstance(expected, set):
return actual in expected
return actual == expected
async def _pre_cleanup(session_factory, inbox: Path) -> int:
"""이전 실행에서 남은 잔여 테스트 row + 파일 정리."""
async with session_factory() as s:
rows = (
await s.execute(
text(
"SELECT id, file_path FROM documents "
"WHERE title LIKE :pat"
),
{"pat": f"{TEST_PREFIX}%"},
)
).all()
for doc_id, file_path in rows:
_unlink_by_relpath(file_path, inbox)
await s.execute(
text("DELETE FROM processing_queue WHERE doc_id = :id"),
{"id": doc_id},
)
await s.execute(
text("DELETE FROM documents WHERE id = :id"),
{"id": doc_id},
)
await s.commit()
return len(rows)
def _unlink_by_relpath(rel_path: str | None, inbox: Path) -> None:
if not rel_path:
return
# file_path 는 NAS 루트 기준 상대경로 ("PKM/Inbox/<name>").
# inbox 는 /documents/PKM/Inbox 이므로 NAS 루트는 inbox.parent.parent.
nas_root = inbox.parent.parent
target = nas_root / rel_path
target.unlink(missing_ok=True)
async def _case_cleanup(
session_factory, filename: str, inbox: Path
) -> tuple[bool, list[str]]:
"""특정 테스트 파일명 기준 정리. (ok, notes) 반환."""
notes: list[str] = []
# 파일 존재 여부
file_present = (inbox / filename).exists()
async with session_factory() as s:
rows = (
await s.execute(
text(
"SELECT id, file_path FROM documents "
"WHERE title = :t"
),
{"t": Path(filename).stem},
)
).all()
for doc_id, file_path in rows:
_unlink_by_relpath(file_path, inbox)
await s.execute(
text("DELETE FROM processing_queue WHERE doc_id = :id"),
{"id": doc_id},
)
await s.execute(
text("DELETE FROM documents WHERE id = :id"),
{"id": doc_id},
)
await s.commit()
# 파일이 inbox 에 남아 있으면 unlink
if (inbox / filename).exists():
(inbox / filename).unlink(missing_ok=True)
notes.append("orphan-file-unlinked")
return True, notes
async def _run_case(
client: httpx.AsyncClient,
base_url: str,
token: str,
session_factory,
inbox: Path,
case: Case,
) -> tuple[bool, str]:
"""단일 케이스 실행. (pass, detail) 반환."""
filename = f"{TEST_PREFIX}{time.time_ns()}_{case.name.replace(' ', '_')}.bin"
payload = b"\x00" * case.size
headers = {"Authorization": f"Bearer {token}"}
# httpx 는 files= 에 대해 Content-Length 자동 계산.
# override 가 필요한 케이스는 raw content 로 multipart 수동 구성.
try:
if case.content_length_override is not None:
boundary = "TestBoundary__verify_upload_size__"
body = (
f"--{boundary}\r\n"
f'Content-Disposition: form-data; name="file"; filename="{filename}"\r\n'
f"Content-Type: application/octet-stream\r\n\r\n"
).encode() + payload + f"\r\n--{boundary}--\r\n".encode()
headers["Content-Type"] = f"multipart/form-data; boundary={boundary}"
headers["Content-Length"] = case.content_length_override
resp = await client.post(
f"{base_url}/api/documents/",
content=body,
headers=headers,
)
else:
resp = await client.post(
f"{base_url}/api/documents/",
files={
"file": (filename, BytesIO(payload), "application/octet-stream")
},
headers=headers,
)
except httpx.RequestError as e:
# 네트워크 레벨 실패 — uvicorn 이 body 검증 단계에서 connection close 한 경우 포함
# 서버 거절로 간주해 "pass" (case 7 같은 보호 시나리오에 해당)
actual_ok = _status_matches(599, case.expected_status) if isinstance(
case.expected_status, set
) and 599 in case.expected_status else (
isinstance(case.expected_status, set)
and any(s >= 400 for s in case.expected_status)
)
await _case_cleanup(session_factory, filename, inbox)
if actual_ok:
return True, f"network-level reject ({type(e).__name__}) treated as pass"
return False, f"network error: {e}"
status = resp.status_code
status_ok = _status_matches(status, case.expected_status)
# cleanup (성공/실패 무관)
await _case_cleanup(session_factory, filename, inbox)
exp_str = (
"|".join(str(s) for s in sorted(case.expected_status))
if isinstance(case.expected_status, set)
else str(case.expected_status)
)
detail = f"expected={exp_str} actual={status}"
return status_ok, detail
async def main() -> int:
base_url = os.getenv("UPLOAD_TEST_BASE_URL", "http://localhost:8000")
token = os.getenv("UPLOAD_TEST_TOKEN")
inbox_path = os.getenv("UPLOAD_TEST_INBOX_PATH", "/documents/PKM/Inbox")
timeout = float(os.getenv("UPLOAD_TEST_TIMEOUT", "120"))
database_url = os.getenv("DATABASE_URL")
if not token:
print("ERROR: UPLOAD_TEST_TOKEN 환경 변수 필요 (admin JWT Bearer).", file=sys.stderr)
return 2
if not database_url:
print("ERROR: DATABASE_URL 환경 변수 필요 (cleanup 용).", file=sys.stderr)
return 2
inbox = Path(inbox_path)
if not inbox.is_dir():
print(f"ERROR: inbox 경로 없음: {inbox}", file=sys.stderr)
return 2
# /api/config/public 로 max_bytes 동적 획득 (하드코딩 방지)
async with httpx.AsyncClient(timeout=10.0) as probe:
cfg_resp = await probe.get(f"{base_url}/api/config/public")
cfg_resp.raise_for_status()
max_bytes = int(cfg_resp.json()["upload"]["max_bytes"])
# slack_ratio 는 서버 내부값 (공개 안 됨). config.yaml 과 동기화 유지.
SLACK_RATIO = 1.05
engine = create_async_engine(database_url)
session_factory = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
cases: list[Case] = [
Case(name="0 bytes", size=0, expected_status=400),
Case(name="1 byte", size=1, expected_status=201),
Case(name="max_bytes minus 1", size=max_bytes - 1, expected_status=201),
Case(name="max_bytes exact", size=max_bytes, expected_status=201),
Case(name="max_bytes plus 1", size=max_bytes + 1, expected_status=413),
Case(
name="cl slack over (override CL)",
size=100,
expected_status=413,
content_length_override=str(int(max_bytes * SLACK_RATIO) + 1),
),
Case(
name="cl forge (body > max, CL tiny)",
size=max_bytes + 1,
expected_status={400, 413, 422, 500},
content_length_override="1024",
skip_reason=None, # best-effort; 실패해도 전체 PASS 판정에는 영향 없음
),
]
print(f"=== base_url={base_url} max_bytes={max_bytes} slack={SLACK_RATIO} ===")
pre_removed = await _pre_cleanup(session_factory, inbox)
print(f"=== pre-cleanup: {pre_removed} row(s) removed ===")
passed = 0
failed = 0
skipped = 0
async with httpx.AsyncClient(timeout=timeout) as client:
try:
for idx, case in enumerate(cases, start=1):
if case.skip_reason:
print(f"case {idx} ({case.name:30s}): SKIP ({case.skip_reason})")
skipped += 1
continue
ok, detail = await _run_case(
client, base_url, token, session_factory, inbox, case
)
status_label = "PASS" if ok else "FAIL"
if ok:
passed += 1
else:
failed += 1
print(f"case {idx} ({case.name:30s}): {detail:35s} {status_label}")
finally:
# 혹시 남은 잔여물 (running exception 등) 정리
tail = await _pre_cleanup(session_factory, inbox)
if tail:
print(f"=== final cleanup: {tail} row(s) removed ===")
await engine.dispose()
total = passed + failed + skipped
if failed == 0:
print(f"=== {passed}/{total - skipped} PASS" + (f" ({skipped} SKIP)" if skipped else "") + " ===")
return 0
print(f"=== {passed}/{total - skipped} PASS, {failed} FAIL" + (f" ({skipped} SKIP)" if skipped else "") + " ===")
return failed
if __name__ == "__main__":
sys.exit(asyncio.run(main()))