From 8f25d396df1c469da5cf88245683277179ec2c76 Mon Sep 17 00:00:00 2001 From: Hyungi Ahn Date: Fri, 24 Apr 2026 06:57:02 +0900 Subject: [PATCH] =?UTF-8?q?feat(upload):=20=C2=A74-=EB=8F=85=EB=A6=BD=20?= =?UTF-8?q?=E2=80=94=20error=5Fcode=20=EC=B2=B4=EA=B3=84=20+=20.uploading?= =?UTF-8?q?=20orphan=20cleanup=20+=20=EC=A7=84=ED=96=89=EB=A5=A0/abort=20U?= =?UTF-8?q?X?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit plan: ~/.claude/plans/luminous-sprouting-hamster.md §4 (1GB/stt/dashboard 외 독립 항목) backend: - _upload_error(status, code, msg) 헬퍼 정의 (§3 가 호출만 추가했던 누락 수정). detail = {error_code, message} — 프론트가 error_code 로 분기. - upload_document 의 모든 HTTPException 을 _upload_error 로 전환: body_too_large / invalid_input / empty_file / unsupported_codec / internal - ClientDisconnect → 499 network_abort + 임시파일 정리. asyncio.TimeoutError → 408 upload_timeout. - 쓰기 중 .uploading 임시명 → 완료 후 staging.replace(target) atomic rename. → 프로세스 크래시 잔존물은 cleanup_orphan_uploads 가 수거. - file_watcher SKIP_EXTENSIONS 에 .uploading 추가 (오해 픽업 방지). cleanup scheduler: - workers/upload_cleanup.py 신규. 10분 주기로 Inbox 하위 *.uploading 중 mtime > orphan_max_age_sec(3600) 인 파일 삭제. - 최근 3회 (≈30분) 누적 삭제 수가 cleanup_warn_threshold(10) 이상이면 WARNING 로그. in-memory deque (재시작 시 리셋) — 집요한 이슈만 잡는 목적. - core/config.py UploadConfig 에 두 임계치 필드 (defaults — config.yaml override 무관). frontend: - api.ts: ApiError 에 optional errorCode/errorMessage 필드 (detail string 유지로 기존 5+ 소비자 호환). parseDetail() 가 {error_code, message} 객체 응답을 풀어 정규화. uploadFile(path, formData, {signal, onProgress}) XHR 헬퍼 신규 (fetch() 가 upload progress 미지원이라 XHR). 401 refresh 1회 정책 동일. - UploadDropzone.svelte 재작성: 진행률 바, 파일별/전체 abort 버튼, 페이지 이탈 beforeunload 경고, errorCode 별 토스트 메시지 분기 (7 코드 — body_too_large / upload_timeout / network_abort / empty_file / invalid_input / unsupported_codec / internal). 컴포넌트 unmount 시 진행 중 업로드 abort. 보류: - max_bytes 1GB 상향 + Caddyfile 1100MB (별도 결정으로 100MB 유지) - /dashboard 카테고리 카드 (별도 plan) - docs/categories.md (§1-3 정의 안착 후) Co-Authored-By: Claude Opus 4.7 (1M context) --- app/api/documents.py | 89 ++++++-- app/core/config.py | 3 + app/main.py | 2 + app/workers/file_watcher.py | 2 +- app/workers/upload_cleanup.py | 78 +++++++ frontend/src/lib/api.ts | 144 +++++++++++- .../src/lib/components/UploadDropzone.svelte | 208 ++++++++++++++---- 7 files changed, 458 insertions(+), 68 deletions(-) create mode 100644 app/workers/upload_cleanup.py diff --git a/app/api/documents.py b/app/api/documents.py index 196a430..b484c8b 100644 --- a/app/api/documents.py +++ b/app/api/documents.py @@ -25,6 +25,7 @@ from fastapi.responses import FileResponse from pydantic import BaseModel from sqlalchemy import func, select from sqlalchemy.ext.asyncio import AsyncSession +from starlette.requests import ClientDisconnect from ai.client import AIClient, _load_prompt, parse_json_response from core.auth import get_current_user @@ -39,6 +40,25 @@ from services.prompt_versions import ANALYZE_PROMPT_VERSION, resolve_primary_mod from services.search.llm_gate import get_mlx_gate router = APIRouter() +logger = logging.getLogger(__name__) + + +def _upload_error(status_code: int, error_code: str, message: str) -> HTTPException: + """업로드 실패 응답. detail 은 객체 — 프론트가 error_code 로 분기. + + error_code 종류: + body_too_large — Content-Length 또는 스트리밍 누적이 max_bytes 초과 (413) + upload_timeout — 서버 read timeout (408) + network_abort — 클라이언트 abort / 연결 끊김 (499) + empty_file — 0바이트 (400) + invalid_input — 파일명/경로/필드 검증 실패 (400) + unsupported_codec — 웹 업로드에서 direct-play 불가 비디오 (400, §3 video) + internal — 그 외 알 수 없는 에러 (500) + """ + return HTTPException( + status_code=status_code, + detail={"error_code": error_code, "message": message}, + ) # ─── 스키마 ─── @@ -541,11 +561,14 @@ async def upload_document( """파일 업로드 → Inbox 저장 + DB 등록 + 처리 큐 등록. Size 한도: `settings.upload.max_bytes` (authoritative). - - Content-Length 사전 차단 (slack_ratio 여유) → 413 - - 스트리밍 누적 검사 (Content-Length 위조 방어) → 413 - - 0바이트 파일은 400 reject + - Content-Length 사전 차단 (slack_ratio 여유) → 413 body_too_large + - 스트리밍 누적 검사 (Content-Length 위조 방어) → 413 body_too_large + - 0바이트 파일은 400 empty_file reject + - 쓰기 중에는 `.uploading` 임시명 → 완료 후 atomic rename. + → 프로세스 크래시 시 잔존물은 cleanup_orphan_uploads 스케줄러가 수거. + - 클라이언트 abort (`ClientDisconnect`) → 499 network_abort + 임시파일 정리 - 파일 저장 완료 후에만 DB 레코드 생성 (고아 레코드 방지) - - 예외 발생 시 partial file cleanup + - 에러 응답 detail 은 `{error_code, message}` 객체 — 프론트가 코드별 분기. """ from core.library import DEFAULT_LIBRARY_PATH, LIBRARY_PREFIX, normalize_library_path @@ -561,7 +584,7 @@ async def upload_document( try: cl = int(content_length_header) if cl > int(max_bytes * slack_ratio): - raise HTTPException(status_code=413, detail="파일이 너무 큽니다") + raise _upload_error(413, "body_too_large", "파일이 너무 큽니다") except ValueError: pass # 잘못된 헤더는 스트리밍 단계에서 max_bytes 로 차단 @@ -571,7 +594,7 @@ async def upload_document( if doc_purpose == "": doc_purpose = None elif doc_purpose not in ("business", "knowledge"): - raise HTTPException(status_code=400, detail="doc_purpose는 business 또는 knowledge만 가능") + raise _upload_error(400, "invalid_input", "doc_purpose는 business 또는 knowledge만 가능") # library_path 검증 + 정규화 library_tag = None @@ -580,19 +603,19 @@ async def upload_document( normalized = normalize_library_path(library_path) library_tag = f"{LIBRARY_PREFIX}{normalized}" except ValueError as e: - raise HTTPException(status_code=400, detail=f"잘못된 자료실 경로: {e}") + raise _upload_error(400, "invalid_input", f"잘못된 자료실 경로: {e}") # 자료실 업로드인데 경로 미지정 → 미분류 자동 태깅 if doc_purpose == "business" and not library_tag: library_tag = f"{LIBRARY_PREFIX}{DEFAULT_LIBRARY_PATH}" if not file.filename: - raise HTTPException(status_code=400, detail="파일명이 필요합니다") + raise _upload_error(400, "invalid_input", "파일명이 필요합니다") # 파일명 정규화 (경로 이탈 방지) safe_name = Path(file.filename).name if not safe_name or safe_name.startswith("."): - raise HTTPException(status_code=400, detail="유효하지 않은 파일명") + raise _upload_error(400, "invalid_input", "유효하지 않은 파일명") # §3: 웹 업로드는 direct-play 불가 비디오 거부 (NAS 드롭은 file_watcher 가 # quarantine 으로 수용). UploadDropzone 이 error_code='unsupported_codec' 로 @@ -613,32 +636,55 @@ async def upload_document( # Inbox 하위 경로 검증 if not str(target).startswith(str(inbox_dir.resolve())): - raise HTTPException(status_code=400, detail="잘못된 파일 경로") + raise _upload_error(400, "invalid_input", "잘못된 파일 경로") - # 중복 파일명 처리 + # 중복 파일명 처리 — 최종 target 도, 임시 .uploading 파일도 모두 충돌 회피 counter = 1 stem, suffix = target.stem, target.suffix - while target.exists(): + staging = target.with_name(target.name + ".uploading") + while target.exists() or staging.exists(): target = inbox_dir.resolve() / f"{stem}_{counter}{suffix}" + staging = target.with_name(target.name + ".uploading") counter += 1 - # ── 스트리밍 저장 + 누적 size 검사 ── + # ── 스트리밍 저장 + 누적 size 검사 (`.uploading` 임시명) ── written = 0 try: - with target.open("wb") as f: + with staging.open("wb") as f: while chunk := await file.read(chunk_size): written += len(chunk) if written > max_bytes: - raise HTTPException(status_code=413, detail="파일이 너무 큽니다") + raise _upload_error(413, "body_too_large", "파일이 너무 큽니다") f.write(chunk) - # with 블록 종료 시 자동 flush + close if written == 0: - raise HTTPException(status_code=400, detail="빈 파일은 업로드할 수 없습니다") - except Exception: - # partial file cleanup. KeyboardInterrupt/SystemExit 등 BaseException 계열은 잡지 않음. - target.unlink(missing_ok=True) + raise _upload_error(400, "empty_file", "빈 파일은 업로드할 수 없습니다") + except ClientDisconnect: + staging.unlink(missing_ok=True) + logger.info("upload aborted by client: %s (written=%d)", safe_name, written) + # 499 = nginx 관용 (Client Closed Request). 응답 도달 가능성 낮지만 일관 형식 유지. + raise _upload_error(499, "network_abort", "업로드가 취소되었습니다") + except asyncio.TimeoutError: + staging.unlink(missing_ok=True) + logger.warning("upload timeout: %s (written=%d)", safe_name, written) + raise _upload_error(408, "upload_timeout", "업로드 시간 초과") + except HTTPException: + # _upload_error 가 만든 예외는 그대로 통과 + 임시 파일 정리 + staging.unlink(missing_ok=True) raise + except Exception: + staging.unlink(missing_ok=True) + logger.exception("upload internal error: %s (written=%d)", safe_name, written) + raise _upload_error(500, "internal", "업로드 처리 중 오류가 발생했습니다") + + # ── 파일 저장 완료: atomic rename → 최종 경로 ── + + try: + staging.replace(target) + except OSError: + staging.unlink(missing_ok=True) + logger.exception("upload rename failed: %s -> %s", staging, target) + raise _upload_error(500, "internal", "파일 저장 후 정리 중 오류가 발생했습니다") # ── 파일 저장 완료 후에만 hash + DB 레코드 ── @@ -909,9 +955,6 @@ async def delete_document( return {"message": f"문서 {doc_id} soft-delete 완료"} -logger = logging.getLogger(__name__) - - @router.get("/{doc_id}/content") async def get_document_content( doc_id: int, diff --git a/app/core/config.py b/app/core/config.py index 3bc030b..43260b4 100644 --- a/app/core/config.py +++ b/app/core/config.py @@ -11,6 +11,9 @@ class UploadConfig(BaseModel): max_bytes: int = 100_000_000 content_length_slack_ratio: float = 1.05 stream_chunk_bytes: int = 1_048_576 + # orphan cleanup (`*.uploading` — 크래시/abort 후 잔존물) + orphan_max_age_sec: int = 3600 + cleanup_warn_threshold: int = 10 class AIModelConfig(BaseModel): diff --git a/app/main.py b/app/main.py index 568c5e6..0a91a48 100644 --- a/app/main.py +++ b/app/main.py @@ -38,6 +38,7 @@ async def lifespan(app: FastAPI): from workers.mailplus_archive import run as mailplus_run from workers.news_collector import run as news_collector_run from workers.queue_consumer import consume_queue + from workers.upload_cleanup import cleanup_orphan_uploads # 시작: DB 연결 확인 await init_db() @@ -56,6 +57,7 @@ async def lifespan(app: FastAPI): # 상시 실행 scheduler.add_job(consume_queue, "interval", minutes=1, id="queue_consumer") scheduler.add_job(watch_inbox, "interval", minutes=5, id="file_watcher") + scheduler.add_job(cleanup_orphan_uploads, "interval", minutes=10, id="upload_cleanup") # 일일 스케줄 (KST) scheduler.add_job(law_monitor_run, CronTrigger(hour=7), id="law_monitor") scheduler.add_job(mailplus_run, CronTrigger(hour=7), id="mailplus_morning") diff --git a/app/workers/file_watcher.py b/app/workers/file_watcher.py index 7548fa6..15eaeed 100644 --- a/app/workers/file_watcher.py +++ b/app/workers/file_watcher.py @@ -24,7 +24,7 @@ logger = setup_logger("file_watcher") # 무시할 파일 SKIP_NAMES = {".DS_Store", "Thumbs.db", "desktop.ini", "Icon\r"} -SKIP_EXTENSIONS = {".tmp", ".part", ".crdownload"} +SKIP_EXTENSIONS = {".tmp", ".part", ".crdownload", ".uploading"} # §3 확장자 매핑 AUDIO_EXTS = {".mp3", ".m4a", ".opus", ".wav", ".flac", ".ogg"} diff --git a/app/workers/upload_cleanup.py b/app/workers/upload_cleanup.py new file mode 100644 index 0000000..b47bfc2 --- /dev/null +++ b/app/workers/upload_cleanup.py @@ -0,0 +1,78 @@ +"""업로드 임시파일 cleanup 워커. + +업로드 엔드포인트는 `.uploading` 임시명으로 NAS Inbox 에 쓴 뒤 +완료 시 atomic rename 한다. 정상 abort 는 endpoint 의 except 절이 정리하지만 +프로세스 크래시 / 강제 종료 / 비정상 종료 시 `*.uploading` 잔존물이 남는다. + +이 워커는 10분 주기로 Inbox 하위를 스캔해서 + - mtime 이 `orphan_max_age_sec` (기본 1시간) 보다 오래된 `*.uploading` 삭제 + - 최근 3회 (≈30분) 누적 삭제 수가 `cleanup_warn_threshold` (기본 10) 이상이면 WARNING + +카운터는 in-memory deque (프로세스 재시작 시 리셋). 집요한 이슈만 잡는 것이 목적. +""" + +from __future__ import annotations + +import time +from collections import deque +from pathlib import Path + +from core.config import settings +from core.utils import setup_logger + +logger = setup_logger("upload_cleanup") + +# 최근 3회 run 의 삭제 카운트. 30분 윈도우 (10분 주기 × 3). +_recent_deletes: deque[int] = deque(maxlen=3) + + +async def cleanup_orphan_uploads() -> int: + """`*.uploading` orphan 파일을 수거. 삭제 수 반환. + + 호출은 APScheduler 가 10분 주기로 트리거. + """ + inbox_path = Path(settings.nas_mount_path) / "PKM" / "Inbox" + if not inbox_path.exists(): + return 0 + + max_age = settings.upload.orphan_max_age_sec + threshold = settings.upload.cleanup_warn_threshold + now = time.time() + + deleted = 0 + total_bytes = 0 + for f in inbox_path.rglob("*.uploading"): + try: + if not f.is_file(): + continue + age = now - f.stat().st_mtime + if age < max_age: + continue + size = f.stat().st_size + f.unlink() + deleted += 1 + total_bytes += size + logger.info("orphan upload deleted: %s (age=%ds, size=%d)", f.name, int(age), size) + except OSError as e: + # 다른 프로세스가 정리 중이거나 권한 문제 — 다음 주기에 재시도 + logger.warning("orphan upload cleanup skipped %s: %s", f, e) + + _recent_deletes.append(deleted) + window_total = sum(_recent_deletes) + if window_total >= threshold: + logger.warning( + "upload orphan cleanup high — window=%d (last %d runs), threshold=%d. " + "abort 가 구조적으로 많거나 대용량 업로드 실패 반복 의심.", + window_total, + len(_recent_deletes), + threshold, + ) + elif deleted > 0: + logger.info( + "upload orphan cleanup: deleted=%d bytes=%d window_sum=%d", + deleted, + total_bytes, + window_total, + ) + + return deleted diff --git a/frontend/src/lib/api.ts b/frontend/src/lib/api.ts index 59014dc..887f41c 100644 --- a/frontend/src/lib/api.ts +++ b/frontend/src/lib/api.ts @@ -73,11 +73,36 @@ async function handleTokenRefresh(): Promise { } } +/** + * 일반 에러는 `detail` 만 string. 업로드 엔드포인트는 `{error_code, message}` + * 객체를 보내므로 (§4), 응답 파서가 그 객체를 풀어 `errorCode` / `errorMessage` + * 필드로 분리하고 `detail` 은 사람이 읽을 메시지(string) 로 정규화한다. + * 덕분에 기존 `err.detail` 소비자(toast 등)는 그대로 동작. + */ export type ApiError = { status: number; detail: string; + errorCode?: string; + errorMessage?: string; }; +function parseDetail(body: unknown, fallback: string): { + detail: string; + errorCode?: string; + errorMessage?: string; +} { + if (body && typeof body === 'object' && 'detail' in body) { + const d = (body as { detail: unknown }).detail; + if (typeof d === 'string' && d) return { detail: d }; + if (d && typeof d === 'object') { + const obj = d as { error_code?: string; message?: string }; + const message = obj.message || fallback; + return { detail: message, errorCode: obj.error_code, errorMessage: obj.message }; + } + } + return { detail: fallback }; +} + export async function api( path: string, options: RequestInit = {}, @@ -113,8 +138,9 @@ export async function api( credentials: 'include', }); if (!retryRes.ok) { - const err = await retryRes.json().catch(() => ({ detail: 'Unknown error' })); - throw { status: retryRes.status, detail: err.detail || retryRes.statusText } as ApiError; + const body = await retryRes.json().catch(() => null); + const parsed = parseDetail(body, retryRes.statusText); + throw { status: retryRes.status, ...parsed } as ApiError; } return retryRes.json(); } catch (e) { @@ -124,8 +150,9 @@ export async function api( } if (!res.ok) { - const err = await res.json().catch(() => ({ detail: res.statusText })); - throw { status: res.status, detail: err.detail || res.statusText } as ApiError; + const body = await res.json().catch(() => null); + const parsed = parseDetail(body, res.statusText); + throw { status: res.status, ...parsed } as ApiError; } // 204 No Content @@ -133,3 +160,112 @@ export async function api( return res.json(); } + +/** + * 업로드 전용 헬퍼 — XMLHttpRequest 기반. + * + * 이유: fetch() 는 upload progress 이벤트를 표준에서 지원하지 않음. + * 진행률 표시 + 사용자 abort 가 필수인 업로드만 XHR 로 분리. + * + * - access token 자동 첨부 + * - 401 → refresh 1회 재시도 (api() 와 같은 정책) + * - signal: AbortSignal — 사용자 취소시 xhr.abort() 호출 + * - onProgress(loaded, total): 전송 중 콜백 + * - 에러는 ApiError 형식으로 reject + */ +export interface UploadOptions { + signal?: AbortSignal; + onProgress?: (loaded: number, total: number) => void; +} + +export async function uploadFile( + path: string, + formData: FormData, + opts: UploadOptions = {}, +): Promise { + const tryOnce = (token: string | null) => + new Promise((resolve, reject) => { + const xhr = new XMLHttpRequest(); + xhr.open('POST', `${API_BASE}${path}`); + xhr.withCredentials = true; + if (token) xhr.setRequestHeader('Authorization', `Bearer ${token}`); + + const abortHandler = () => xhr.abort(); + if (opts.signal) { + if (opts.signal.aborted) { + xhr.abort(); + } else { + opts.signal.addEventListener('abort', abortHandler); + } + } + const cleanup = () => { + if (opts.signal) opts.signal.removeEventListener('abort', abortHandler); + }; + + if (opts.onProgress) { + xhr.upload.addEventListener('progress', (e) => { + if (e.lengthComputable) opts.onProgress!(e.loaded, e.total); + }); + } + + xhr.addEventListener('load', () => { + cleanup(); + if (xhr.status >= 200 && xhr.status < 300) { + if (!xhr.responseText) return resolve({} as T); + try { + resolve(JSON.parse(xhr.responseText)); + } catch { + resolve({} as T); + } + } else { + let body: unknown = null; + try { + body = JSON.parse(xhr.responseText); + } catch { + // 응답 본문 파싱 실패 — fallback 사용 + } + const parsed = parseDetail(body, xhr.statusText || '업로드 실패'); + reject({ status: xhr.status, ...parsed } as ApiError); + } + }); + + xhr.addEventListener('error', () => { + cleanup(); + reject({ + status: 0, + detail: '네트워크 오류', + errorCode: 'network_abort', + errorMessage: '네트워크 오류', + } as ApiError); + }); + + xhr.addEventListener('abort', () => { + cleanup(); + reject({ + status: 0, + detail: '업로드가 취소되었습니다', + errorCode: 'network_abort', + errorMessage: '업로드가 취소되었습니다', + } as ApiError); + }); + + xhr.send(formData); + }); + + try { + return await tryOnce(accessToken); + } catch (e) { + const err = e as ApiError; + // 401 → refresh 1회. abort/네트워크 에러는 retry 안 함. + if (err.status === 401 && accessToken) { + try { + await handleTokenRefresh(); + return await tryOnce(accessToken); + } catch (e2) { + if ((e2 as ApiError).detail) throw e2; + throw { status: 401, detail: '인증이 만료되었습니다' } as ApiError; + } + } + throw err; + } +} diff --git a/frontend/src/lib/components/UploadDropzone.svelte b/frontend/src/lib/components/UploadDropzone.svelte index a420438..cbb8e98 100644 --- a/frontend/src/lib/components/UploadDropzone.svelte +++ b/frontend/src/lib/components/UploadDropzone.svelte @@ -1,23 +1,71 @@ @@ -139,6 +233,7 @@

여기에 파일을 놓으세요

+

최대 {maxBytesLabel}

{/if} @@ -146,19 +241,52 @@ {#if uploading && uploadFiles.length > 0}
-

업로드 중...

-
- {#each uploadFiles as f} -
- {f.name} - - {f.status === 'done' ? '✓' : f.status === 'failed' ? '✗' : f.status === 'uploading' ? '↑' : '…'} - +
+

업로드 중...

+ {#if uploadFiles.some(f => f.status === 'uploading')} + + {/if} +
+
+ {#each uploadFiles as f, i} +
+
+ {f.name} + {formatSize(f.size)} + + {f.status === 'done' ? '✓' : + f.status === 'failed' ? '✗' : + f.status === 'aborted' ? '⊘' : + f.status === 'uploading' ? `${pct(f)}%` : '…'} + + {#if f.status === 'uploading'} + + {/if} +
+ {#if f.status === 'uploading'} +
+
+
+ {/if}
{/each}