From 46537ee11af7ec8d3300392862db43f4550bbcac Mon Sep 17 00:00:00 2001
From: Hyungi Ahn
Date: Thu, 2 Apr 2026 15:55:38 +0900
Subject: [PATCH] =?UTF-8?q?fix:=20Codex=20=EB=A6=AC=EB=B7=B0=20P1/P2=20?=
 =?UTF-8?q?=EB=B2=84=EA=B7=B8=204=EA=B1=B4=20=EC=88=98=EC=A0=95?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

- [P1] migration runner 도입: schema_migrations 추적, advisory lock, 단일 트랜잭션 실행, SQL 검증 (기존 DB 업그레이드 대응)
- [P1] eml extract 큐 조건 분기: extract_worker 미지원 포맷 큐 스킵
- [P2] iCalendar escape_ical_text() 추가: RFC 5545 준수
- [P2] 이메일 charset 감지: get_content_charset() 사용 + payload None 방어

Co-Authored-By: Claude Opus 4.6 (1M context)
---
 app/core/database.py            | 132 +++++++++++++++++++++++++++++++-
 app/core/utils.py               |  18 ++++-
 app/workers/mailplus_archive.py |  28 ++++++--
 3 files changed, 169 insertions(+), 9 deletions(-)

diff --git a/app/core/database.py b/app/core/database.py
index a1da05d..f4fef53 100644
--- a/app/core/database.py
+++ b/app/core/database.py
@@ -1,10 +1,17 @@
 """PostgreSQL 연결 — SQLAlchemy async engine + session factory"""
 
+import logging
+import re
+import time
+from pathlib import Path
+
 from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
 from sqlalchemy.orm import DeclarativeBase
 
 from core.config import settings
 
+logger = logging.getLogger("migration")
+
 engine = create_async_engine(
     settings.database_url,
     echo=False,
@@ -19,12 +26,135 @@ class Base(DeclarativeBase):
     pass
 
 
+# NOTE: every pending migration runs inside one single transaction.
+# Migrations with heavy DDL or bulk data changes may hold locks for a long time.
+_MIGRATION_VERSION_RE = re.compile(r"^(\d+)_")
+_MIGRATION_LOCK_KEY = 938475
+
+# Spans that cannot hold top-level transaction control: comments, quoted
+# literals/identifiers and dollar-quoted bodies (e.g. PL/pgSQL BEGIN ... END).
+_SQL_NON_CODE_RE = re.compile(
+    r"--[^\n]*"
+    r"|/\*.*?\*/"
+    r"|'(?:[^']|'')*'"
+    r'|"(?:[^"]|"")*"'
+    r"|\$(\w*)\$.*?\$\1\$",
+    re.DOTALL,
+)
+
+
+def _parse_migration_files(migrations_dir: Path) -> list[tuple[int, str, Path]]:
+    """Scan migration files and return (version, name, path) sorted by version.
+
+    Files whose name lacks a numeric prefix followed by "_" are ignored.
+    Raises RuntimeError when two files share the same version number.
+    """
+    files = []
+    for p in sorted(migrations_dir.glob("*.sql")):
+        m = _MIGRATION_VERSION_RE.match(p.name)
+        if not m:
+            continue
+        version = int(m.group(1))
+        files.append((version, p.name, p))
+
+    # reject duplicate versions
+    seen: dict[int, str] = {}
+    for version, name, _ in files:
+        if version in seen:
+            raise RuntimeError(
+                f"migration 버전 중복: {seen[version]} vs {name} (version={version})"
+            )
+        seen[version] = name
+
+    files.sort(key=lambda x: x[0])
+    return files
+
+
+def _validate_sql_content(name: str, sql: str) -> None:
+    """Reject migration SQL that contains BEGIN/COMMIT/ROLLBACK statements.
+
+    Such statements would break the transaction managed by the caller.
+    Comments, quoted text and dollar-quoted bodies are blanked out first, so
+    a PL/pgSQL "BEGIN ... END" block or a keyword inside a literal passes.
+    Raises RuntimeError naming the offending keyword.
+    """
+    code = _SQL_NON_CODE_RE.sub(" ", sql)
+    for keyword in ("BEGIN", "COMMIT", "ROLLBACK"):
+        # word-boundary match (BEGIN_SOMETHING is not a hit)
+        if re.search(rf"\b{keyword}\b", code, re.IGNORECASE):
+            raise RuntimeError(
+                f"migration {name}에 {keyword} 포함됨 — "
+                f"migration SQL에는 트랜잭션 제어문을 넣지 마세요"
+            )
+
+
+async def _run_migrations(conn) -> None:
+    """Apply unapplied migrations; the caller owns the transaction."""
+    from sqlalchemy import text
+
+    # Take the advisory lock first (released automatically at transaction end)
+    # so concurrent starters cannot race on CREATE TABLE IF NOT EXISTS.
+    await conn.execute(text(
+        f"SELECT pg_advisory_xact_lock({_MIGRATION_LOCK_KEY})"
+    ))
+
+    # create the schema_migrations bookkeeping table
+    await conn.execute(text("""
+        CREATE TABLE IF NOT EXISTS schema_migrations (
+            version INT PRIMARY KEY,
+            name TEXT NOT NULL,
+            applied_at TIMESTAMPTZ DEFAULT NOW()
+        )
+    """))
+
+    # load already-applied versions
+    result = await conn.execute(text("SELECT version FROM schema_migrations"))
+    applied = {row[0] for row in result}
+
+    # scan migration files
+    migrations_dir = Path(__file__).resolve().parent.parent.parent / "migrations"
+    if not migrations_dir.is_dir():
+        logger.info("[migration] migrations/ 디렉토리 없음, 스킵")
+        return
+
+    files = _parse_migration_files(migrations_dir)
+    pending = [(v, name, path) for v, name, path in files if v not in applied]
+
+    if not pending:
+        logger.info("[migration] 미적용 migration 없음")
+        return
+
+    start = time.monotonic()
+    logger.info(f"[migration] {len(pending)}건 적용 시작")
+
+    for version, name, path in pending:
+        sql = path.read_text(encoding="utf-8")
+        _validate_sql_content(name, sql)
+        logger.info(f"[migration] {name} 실행 중...")
+        # NOTE(review): text() parses ":name" as a bind parameter; a literal
+        # colon-word in migration SQL presumably needs escaping as "\:" — confirm.
+        await conn.execute(text(sql))
+        await conn.execute(
+            text("INSERT INTO schema_migrations (version, name) VALUES (:v, :n)"),
+            {"v": version, "n": name},
+        )
+        logger.info(f"[migration] {name} 완료")
+
+    elapsed = time.monotonic() - start
+    logger.info(f"[migration] 전체 {len(pending)}건 완료 ({elapsed:.1f}s)")
+
+
 async def init_db():
-    """DB 연결 확인 (스키마는 migrations/로 관리)"""
+    """Check the DB connection and run pending migrations."""
     from sqlalchemy import text
 
     async with engine.begin() as conn:
         await conn.execute(text("SELECT 1"))
+        try:
+            await _run_migrations(conn)
+        except Exception as e:
+            logger.error(f"[migration] 실패: {e} — 전체 트랜잭션 롤백")
+            raise
 
 
 async def get_session() -> AsyncSession:
diff --git a/app/core/utils.py b/app/core/utils.py
index 646c77b..09763bb 100644
--- a/app/core/utils.py
+++ b/app/core/utils.py
@@ -49,6 +49,20 @@ def count_log_errors(log_path: str) -> int:
 
 # ─── CalDAV 헬퍼 ───
 
+def escape_ical_text(text: str | None) -> str:
+    """Escape an iCalendar TEXT value (RFC 5545 §3.3.11).
+    Use for TEXT properties such as SUMMARY, DESCRIPTION and LOCATION.
+    """
+    if not text:
+        return ""
+    text = text.replace("\r\n", "\n").replace("\r", "\n")  # normalize CRLF/CR to LF
+    text = text.replace("\\", "\\\\")  # backslash first
+    text = text.replace("\n", "\\n")
+    text = text.replace(",", "\\,")
+    text = text.replace(";", "\\;")
+    return text
+
+
 def create_caldav_todo(
     caldav_url: str,
     username: str,
@@ -79,8 +93,8 @@ def create_caldav_todo(
 VERSION:2.0
 BEGIN:VTODO
 UID:{uid}
-SUMMARY:{title}
-DESCRIPTION:{description}
+SUMMARY:{escape_ical_text(title)}
+DESCRIPTION:{escape_ical_text(description)}
 DUE:{due_str}
 STATUS:NEEDS-ACTION
 PRIORITY:5
diff --git a/app/workers/mailplus_archive.py b/app/workers/mailplus_archive.py
index 75be597..c8b2e2a 100644
--- a/app/workers/mailplus_archive.py
+++ b/app/workers/mailplus_archive.py
@@ -139,13 +139,23 @@ async def run():
 
                 # 본문 추출 (텍스트 파트)
                 body = ""
+                charset = None
                 if msg.is_multipart():
                     for part in msg.walk():
                         if part.get_content_type() == "text/plain":
-                            body = part.get_payload(decode=True).decode("utf-8", errors="replace")
+                            payload = part.get_payload(decode=True)
+                            if payload is not None:
+                                charset = part.get_content_charset() or "utf-8"
+                                body = payload.decode(charset, errors="replace")
                             break
                 else:
-                    body = msg.get_payload(decode=True).decode("utf-8", errors="replace")
+                    payload = msg.get_payload(decode=True)
+                    if payload is not None:
+                        charset = msg.get_content_charset() or "utf-8"
+                        body = payload.decode(charset, errors="replace")
+
+                if "\ufffd" in body[:1000]:
+                    logger.debug(f"[메일] charset={charset or 'unknown'} 디코딩 중 replacement 발생")
 
                 # DB 등록
                 rel_path = str(eml_path.relative_to(Path(settings.nas_mount_path)))
@@ -164,11 +174,17 @@ async def run():
                 session.add(doc)
                 await session.flush()
 
-                session.add(ProcessingQueue(
-                    document_id=doc.id, stage="extract", status="pending",
-                ))
+                safe_subj = subject.replace("\n", " ").replace("\r", " ")[:200]
 
-                archived.append(subject)
+                # TODO: drop this condition once extract_worker can parse eml bodies/attachments
+                if doc.file_format != "eml":
+                    session.add(ProcessingQueue(
+                        document_id=doc.id, stage="extract", status="pending",
+                    ))
+                else:
+                    logger.debug(f"[메일] {safe_subj} — eml extract 미지원, 큐 스킵")
+
+                archived.append(safe_subj)
                 max_uid = max(max_uid, uid)
 
             except Exception as e: