"""업로드 size 경계 검증 스크립트 (Phase B — 수동 on-demand 실행) `app/api/documents.py` 의 upload_document 가 설정된 `upload.max_bytes` 를 스트리밍으로 강제 집행하는지 경계 케이스로 확인. pytest 인프라가 구축되기 전까지의 임시 검증 수단이며, 실행 결과는 exit code 로 전달. 사용 환경: - GPU 서버에서 `docker compose exec fastapi python /app/scripts/verify_upload_size.py` - 로컬에서 tailscale 경유도 가능하지만 대용량 payload + cleanup 복잡도 때문에 컨테이너 내부 실행이 표준. 필수 환경 변수: - UPLOAD_TEST_TOKEN : admin 계정 로그인으로 획득한 JWT Bearer 값 - DATABASE_URL : docker-compose 기존 env 재사용 (cleanup 용) 옵션 환경 변수: - UPLOAD_TEST_BASE_URL : 기본값 http://localhost:8000 - UPLOAD_TEST_INBOX_PATH : 기본값 /documents/PKM/Inbox - UPLOAD_TEST_TIMEOUT : httpx 요청 타임아웃 초, 기본값 120 Cleanup 전략: - 파일명 prefix `__upload_boundary_test__` 로 실데이터와 격리 - 스크립트 시작 시 같은 prefix 잔여물 pre-cleanup (pre-run residue 정리) - 각 케이스 직후 DB rows + 파일 즉시 cleanup - 예외 시 finally 블록에서 one-more-pass cleanup """ import asyncio import os import sys import time from dataclasses import dataclass from io import BytesIO from pathlib import Path import httpx # seed_admin.py 와 동일한 sys.path 패턴 sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "app")) from sqlalchemy import text from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine TEST_PREFIX = "__upload_boundary_test__" @dataclass class Case: name: str size: int expected_status: int | set[int] # 특수 CL 헤더 (override 필요 시 문자열, 없으면 None) content_length_override: str | None = None skip_reason: str | None = None def _status_matches(actual: int, expected: int | set[int]) -> bool: if isinstance(expected, set): return actual in expected return actual == expected async def _pre_cleanup(session_factory, inbox: Path) -> int: """이전 실행에서 남은 잔여 테스트 row + 파일 정리.""" async with session_factory() as s: rows = ( await s.execute( text( "SELECT id, file_path FROM documents " "WHERE title LIKE :pat" ), {"pat": f"{TEST_PREFIX}%"}, ) ).all() for doc_id, file_path in rows: _unlink_by_relpath(file_path, inbox) await s.execute( text("DELETE FROM processing_queue WHERE document_id = :id"), {"id": doc_id}, ) await s.execute( text("DELETE FROM documents WHERE id = :id"), {"id": doc_id}, ) await s.commit() return len(rows) def _unlink_by_relpath(rel_path: str | None, inbox: Path) -> None: if not rel_path: return # file_path 는 NAS 루트 기준 상대경로 ("PKM/Inbox/"). # inbox 는 /documents/PKM/Inbox 이므로 NAS 루트는 inbox.parent.parent. nas_root = inbox.parent.parent target = nas_root / rel_path target.unlink(missing_ok=True) async def _case_cleanup( session_factory, filename: str, inbox: Path ) -> tuple[bool, list[str]]: """특정 테스트 파일명 기준 정리. (ok, notes) 반환.""" notes: list[str] = [] # 파일 존재 여부 file_present = (inbox / filename).exists() async with session_factory() as s: rows = ( await s.execute( text( "SELECT id, file_path FROM documents " "WHERE title = :t" ), {"t": Path(filename).stem}, ) ).all() for doc_id, file_path in rows: _unlink_by_relpath(file_path, inbox) await s.execute( text("DELETE FROM processing_queue WHERE document_id = :id"), {"id": doc_id}, ) await s.execute( text("DELETE FROM documents WHERE id = :id"), {"id": doc_id}, ) await s.commit() # 파일이 inbox 에 남아 있으면 unlink if (inbox / filename).exists(): (inbox / filename).unlink(missing_ok=True) notes.append("orphan-file-unlinked") return True, notes async def _run_case( client: httpx.AsyncClient, base_url: str, token: str, session_factory, inbox: Path, case: Case, ) -> tuple[bool, str]: """단일 케이스 실행. (pass, detail) 반환.""" filename = f"{TEST_PREFIX}{time.time_ns()}_{case.name.replace(' ', '_')}.bin" payload = b"\x00" * case.size headers = {"Authorization": f"Bearer {token}"} # httpx 는 files= 에 대해 Content-Length 자동 계산. # override 가 필요한 케이스는 raw content 로 multipart 수동 구성. try: if case.content_length_override is not None: boundary = "TestBoundary__verify_upload_size__" body = ( f"--{boundary}\r\n" f'Content-Disposition: form-data; name="file"; filename="{filename}"\r\n' f"Content-Type: application/octet-stream\r\n\r\n" ).encode() + payload + f"\r\n--{boundary}--\r\n".encode() headers["Content-Type"] = f"multipart/form-data; boundary={boundary}" headers["Content-Length"] = case.content_length_override resp = await client.post( f"{base_url}/api/documents/", content=body, headers=headers, ) else: resp = await client.post( f"{base_url}/api/documents/", files={ "file": (filename, BytesIO(payload), "application/octet-stream") }, headers=headers, ) except httpx.RequestError as e: # 네트워크 레벨 실패 — uvicorn 이 body 검증 단계에서 connection close 한 경우 포함 # 서버 거절로 간주해 "pass" (case 7 같은 보호 시나리오에 해당) actual_ok = _status_matches(599, case.expected_status) if isinstance( case.expected_status, set ) and 599 in case.expected_status else ( isinstance(case.expected_status, set) and any(s >= 400 for s in case.expected_status) ) await _case_cleanup(session_factory, filename, inbox) if actual_ok: return True, f"network-level reject ({type(e).__name__}) treated as pass" return False, f"network error: {e}" status = resp.status_code status_ok = _status_matches(status, case.expected_status) # cleanup (성공/실패 무관) await _case_cleanup(session_factory, filename, inbox) exp_str = ( "|".join(str(s) for s in sorted(case.expected_status)) if isinstance(case.expected_status, set) else str(case.expected_status) ) detail = f"expected={exp_str} actual={status}" return status_ok, detail async def main() -> int: base_url = os.getenv("UPLOAD_TEST_BASE_URL", "http://localhost:8000") token = os.getenv("UPLOAD_TEST_TOKEN") inbox_path = os.getenv("UPLOAD_TEST_INBOX_PATH", "/documents/PKM/Inbox") timeout = float(os.getenv("UPLOAD_TEST_TIMEOUT", "120")) database_url = os.getenv("DATABASE_URL") if not token: print("ERROR: UPLOAD_TEST_TOKEN 환경 변수 필요 (admin JWT Bearer).", file=sys.stderr) return 2 if not database_url: print("ERROR: DATABASE_URL 환경 변수 필요 (cleanup 용).", file=sys.stderr) return 2 inbox = Path(inbox_path) if not inbox.is_dir(): print(f"ERROR: inbox 경로 없음: {inbox}", file=sys.stderr) return 2 # /api/config/public 로 max_bytes 동적 획득 (하드코딩 방지) async with httpx.AsyncClient(timeout=10.0) as probe: cfg_resp = await probe.get(f"{base_url}/api/config/public") cfg_resp.raise_for_status() max_bytes = int(cfg_resp.json()["upload"]["max_bytes"]) # slack_ratio 는 서버 내부값 (공개 안 됨). config.yaml 과 동기화 유지. SLACK_RATIO = 1.05 engine = create_async_engine(database_url) session_factory = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) # CL override + body size mismatch 는 httpx 의 h11 레이어가 client-side 에서 # LocalProtocolError 로 거절하므로 서버 pre-check 경로를 외부에서 격리 테스트 # 하기 어렵다. 대신 body 가 정직하게 slack 임계치 초과인 케이스로 pre-check # 경로도 간접 확인 (body + multipart 오버헤드가 slack 초과 → CL 헤더 자동 # 계산 시에도 초과되어 pre-check 가 먼저 catch). slack_over_size = int(max_bytes * SLACK_RATIO) + 1 cases: list[Case] = [ Case(name="0 bytes", size=0, expected_status=400), Case(name="1 byte", size=1, expected_status=201), Case(name="max_bytes minus 1", size=max_bytes - 1, expected_status=201), Case(name="max_bytes exact", size=max_bytes, expected_status=201), Case(name="max_bytes plus 1", size=max_bytes + 1, expected_status=413), Case(name="slack over (body > max*slack)", size=slack_over_size, expected_status=413), ] print(f"=== base_url={base_url} max_bytes={max_bytes} slack={SLACK_RATIO} ===") pre_removed = await _pre_cleanup(session_factory, inbox) print(f"=== pre-cleanup: {pre_removed} row(s) removed ===") passed = 0 failed = 0 skipped = 0 async with httpx.AsyncClient(timeout=timeout) as client: try: for idx, case in enumerate(cases, start=1): if case.skip_reason: print(f"case {idx} ({case.name:30s}): SKIP ({case.skip_reason})") skipped += 1 continue ok, detail = await _run_case( client, base_url, token, session_factory, inbox, case ) status_label = "PASS" if ok else "FAIL" if ok: passed += 1 else: failed += 1 print(f"case {idx} ({case.name:30s}): {detail:35s} {status_label}") finally: # 혹시 남은 잔여물 (running exception 등) 정리 tail = await _pre_cleanup(session_factory, inbox) if tail: print(f"=== final cleanup: {tail} row(s) removed ===") await engine.dispose() total = passed + failed + skipped if failed == 0: print(f"=== {passed}/{total - skipped} PASS" + (f" ({skipped} SKIP)" if skipped else "") + " ===") return 0 print(f"=== {passed}/{total - skipped} PASS, {failed} FAIL" + (f" ({skipped} SKIP)" if skipped else "") + " ===") return failed if __name__ == "__main__": sys.exit(asyncio.run(main()))