fix: Codex 리뷰 P1/P2 버그 4건 수정
- [P1] migration runner 도입: schema_migrations 추적, advisory lock, 단일 트랜잭션 실행, SQL 검증 (기존 DB 업그레이드 대응) - [P1] eml extract 큐 조건 분기: extract_worker 미지원 포맷 큐 스킵 - [P2] iCalendar escape_ical_text() 추가: RFC 5545 준수 - [P2] 이메일 charset 감지: get_content_charset() 사용 + payload None 방어 Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -1,10 +1,17 @@
|
|||||||
"""PostgreSQL 연결 — SQLAlchemy async engine + session factory"""
|
"""PostgreSQL 연결 — SQLAlchemy async engine + session factory"""
|
||||||
|
|
||||||
|
import logging
|
||||||
|
import re
|
||||||
|
import time
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
|
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
|
||||||
from sqlalchemy.orm import DeclarativeBase
|
from sqlalchemy.orm import DeclarativeBase
|
||||||
|
|
||||||
from core.config import settings
|
from core.config import settings
|
||||||
|
|
||||||
|
logger = logging.getLogger("migration")
|
||||||
|
|
||||||
engine = create_async_engine(
|
engine = create_async_engine(
|
||||||
settings.database_url,
|
settings.database_url,
|
||||||
echo=False,
|
echo=False,
|
||||||
@@ -19,12 +26,116 @@ class Base(DeclarativeBase):
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
# NOTE: 모든 pending migration은 단일 트랜잭션으로 실행됨.
|
||||||
|
# DDL이 많거나 대량 데이터 변경이 포함된 migration은 장시간 lock을 유발할 수 있음.
|
||||||
|
_MIGRATION_VERSION_RE = re.compile(r"^(\d+)_")
|
||||||
|
_MIGRATION_LOCK_KEY = 938475
|
||||||
|
|
||||||
|
|
||||||
|
def _parse_migration_files(migrations_dir: Path) -> list[tuple[int, str, Path]]:
|
||||||
|
"""migration 파일 스캔 → (version, name, path) 리스트, 버전순 정렬"""
|
||||||
|
files = []
|
||||||
|
for p in sorted(migrations_dir.glob("*.sql")):
|
||||||
|
m = _MIGRATION_VERSION_RE.match(p.name)
|
||||||
|
if not m:
|
||||||
|
continue
|
||||||
|
version = int(m.group(1))
|
||||||
|
files.append((version, p.name, p))
|
||||||
|
|
||||||
|
# 중복 버전 검사
|
||||||
|
seen: dict[int, str] = {}
|
||||||
|
for version, name, _ in files:
|
||||||
|
if version in seen:
|
||||||
|
raise RuntimeError(
|
||||||
|
f"migration 버전 중복: {seen[version]} vs {name} (version={version})"
|
||||||
|
)
|
||||||
|
seen[version] = name
|
||||||
|
|
||||||
|
files.sort(key=lambda x: x[0])
|
||||||
|
return files
|
||||||
|
|
||||||
|
|
||||||
|
def _validate_sql_content(name: str, sql: str) -> None:
|
||||||
|
"""migration SQL에 BEGIN/COMMIT이 포함되어 있으면 에러 (외부 트랜잭션 깨짐 방지)"""
|
||||||
|
# 주석(-- ...) 라인 제거 후 검사
|
||||||
|
lines = [
|
||||||
|
line for line in sql.splitlines()
|
||||||
|
if not line.strip().startswith("--")
|
||||||
|
]
|
||||||
|
stripped = "\n".join(lines).upper()
|
||||||
|
for keyword in ("BEGIN", "COMMIT", "ROLLBACK"):
|
||||||
|
# 단어 경계로 매칭 (예: BEGIN_SOMETHING은 제외)
|
||||||
|
if re.search(rf"\b{keyword}\b", stripped):
|
||||||
|
raise RuntimeError(
|
||||||
|
f"migration {name}에 {keyword} 포함됨 — "
|
||||||
|
f"migration SQL에는 트랜잭션 제어문을 넣지 마세요"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
async def _run_migrations(conn) -> None:
|
||||||
|
"""미적용 migration 실행 (호출자가 트랜잭션 관리)"""
|
||||||
|
from sqlalchemy import text
|
||||||
|
|
||||||
|
# schema_migrations 테이블 생성
|
||||||
|
await conn.execute(text("""
|
||||||
|
CREATE TABLE IF NOT EXISTS schema_migrations (
|
||||||
|
version INT PRIMARY KEY,
|
||||||
|
name TEXT NOT NULL,
|
||||||
|
applied_at TIMESTAMPTZ DEFAULT NOW()
|
||||||
|
)
|
||||||
|
"""))
|
||||||
|
|
||||||
|
# advisory lock 획득 (트랜잭션 끝나면 자동 해제)
|
||||||
|
await conn.execute(text(
|
||||||
|
f"SELECT pg_advisory_xact_lock({_MIGRATION_LOCK_KEY})"
|
||||||
|
))
|
||||||
|
|
||||||
|
# 적용 이력 조회
|
||||||
|
result = await conn.execute(text("SELECT version FROM schema_migrations"))
|
||||||
|
applied = {row[0] for row in result}
|
||||||
|
|
||||||
|
# migration 파일 스캔
|
||||||
|
migrations_dir = Path(__file__).resolve().parent.parent.parent / "migrations"
|
||||||
|
if not migrations_dir.is_dir():
|
||||||
|
logger.info("[migration] migrations/ 디렉토리 없음, 스킵")
|
||||||
|
return
|
||||||
|
|
||||||
|
files = _parse_migration_files(migrations_dir)
|
||||||
|
pending = [(v, name, path) for v, name, path in files if v not in applied]
|
||||||
|
|
||||||
|
if not pending:
|
||||||
|
logger.info("[migration] 미적용 migration 없음")
|
||||||
|
return
|
||||||
|
|
||||||
|
start = time.monotonic()
|
||||||
|
logger.info(f"[migration] {len(pending)}건 적용 시작")
|
||||||
|
|
||||||
|
for version, name, path in pending:
|
||||||
|
sql = path.read_text(encoding="utf-8")
|
||||||
|
_validate_sql_content(name, sql)
|
||||||
|
logger.info(f"[migration] {name} 실행 중...")
|
||||||
|
await conn.execute(text(sql))
|
||||||
|
await conn.execute(
|
||||||
|
text("INSERT INTO schema_migrations (version, name) VALUES (:v, :n)"),
|
||||||
|
{"v": version, "n": name},
|
||||||
|
)
|
||||||
|
logger.info(f"[migration] {name} 완료")
|
||||||
|
|
||||||
|
elapsed = time.monotonic() - start
|
||||||
|
logger.info(f"[migration] 전체 {len(pending)}건 완료 ({elapsed:.1f}s)")
|
||||||
|
|
||||||
|
|
||||||
async def init_db():
|
async def init_db():
|
||||||
"""DB 연결 확인 (스키마는 migrations/로 관리)"""
|
"""DB 연결 확인 + pending migration 실행"""
|
||||||
from sqlalchemy import text
|
from sqlalchemy import text
|
||||||
|
|
||||||
async with engine.begin() as conn:
|
async with engine.begin() as conn:
|
||||||
await conn.execute(text("SELECT 1"))
|
await conn.execute(text("SELECT 1"))
|
||||||
|
try:
|
||||||
|
await _run_migrations(conn)
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"[migration] 실패: {e} — 전체 트랜잭션 롤백")
|
||||||
|
raise
|
||||||
|
|
||||||
|
|
||||||
async def get_session() -> AsyncSession:
|
async def get_session() -> AsyncSession:
|
||||||
|
|||||||
@@ -49,6 +49,20 @@ def count_log_errors(log_path: str) -> int:
|
|||||||
# ─── CalDAV 헬퍼 ───
|
# ─── CalDAV 헬퍼 ───
|
||||||
|
|
||||||
|
|
||||||
|
def escape_ical_text(text: str | None) -> str:
|
||||||
|
"""iCalendar TEXT 값 이스케이프 (RFC 5545 §3.3.11).
|
||||||
|
SUMMARY, DESCRIPTION, LOCATION 등 TEXT 프로퍼티에 사용.
|
||||||
|
"""
|
||||||
|
if not text:
|
||||||
|
return ""
|
||||||
|
text = text.replace("\r\n", "\n").replace("\r", "\n") # CRLF 정규화
|
||||||
|
text = text.replace("\\", "\\\\") # 백슬래시 먼저
|
||||||
|
text = text.replace("\n", "\\n")
|
||||||
|
text = text.replace(",", "\\,")
|
||||||
|
text = text.replace(";", "\\;")
|
||||||
|
return text
|
||||||
|
|
||||||
|
|
||||||
def create_caldav_todo(
|
def create_caldav_todo(
|
||||||
caldav_url: str,
|
caldav_url: str,
|
||||||
username: str,
|
username: str,
|
||||||
@@ -79,8 +93,8 @@ def create_caldav_todo(
|
|||||||
VERSION:2.0
|
VERSION:2.0
|
||||||
BEGIN:VTODO
|
BEGIN:VTODO
|
||||||
UID:{uid}
|
UID:{uid}
|
||||||
SUMMARY:{title}
|
SUMMARY:{escape_ical_text(title)}
|
||||||
DESCRIPTION:{description}
|
DESCRIPTION:{escape_ical_text(description)}
|
||||||
DUE:{due_str}
|
DUE:{due_str}
|
||||||
STATUS:NEEDS-ACTION
|
STATUS:NEEDS-ACTION
|
||||||
PRIORITY:5
|
PRIORITY:5
|
||||||
|
|||||||
@@ -139,13 +139,23 @@ async def run():
|
|||||||
|
|
||||||
# 본문 추출 (텍스트 파트)
|
# 본문 추출 (텍스트 파트)
|
||||||
body = ""
|
body = ""
|
||||||
|
charset = None
|
||||||
if msg.is_multipart():
|
if msg.is_multipart():
|
||||||
for part in msg.walk():
|
for part in msg.walk():
|
||||||
if part.get_content_type() == "text/plain":
|
if part.get_content_type() == "text/plain":
|
||||||
body = part.get_payload(decode=True).decode("utf-8", errors="replace")
|
payload = part.get_payload(decode=True)
|
||||||
|
if payload is not None:
|
||||||
|
charset = part.get_content_charset() or "utf-8"
|
||||||
|
body = payload.decode(charset, errors="replace")
|
||||||
break
|
break
|
||||||
else:
|
else:
|
||||||
body = msg.get_payload(decode=True).decode("utf-8", errors="replace")
|
payload = msg.get_payload(decode=True)
|
||||||
|
if payload is not None:
|
||||||
|
charset = msg.get_content_charset() or "utf-8"
|
||||||
|
body = payload.decode(charset, errors="replace")
|
||||||
|
|
||||||
|
if "\ufffd" in body[:1000]:
|
||||||
|
logger.debug(f"[메일] charset={charset or 'unknown'} 디코딩 중 replacement 발생")
|
||||||
|
|
||||||
# DB 등록
|
# DB 등록
|
||||||
rel_path = str(eml_path.relative_to(Path(settings.nas_mount_path)))
|
rel_path = str(eml_path.relative_to(Path(settings.nas_mount_path)))
|
||||||
@@ -164,11 +174,17 @@ async def run():
|
|||||||
session.add(doc)
|
session.add(doc)
|
||||||
await session.flush()
|
await session.flush()
|
||||||
|
|
||||||
session.add(ProcessingQueue(
|
safe_subj = subject.replace("\n", " ").replace("\r", " ")[:200]
|
||||||
document_id=doc.id, stage="extract", status="pending",
|
|
||||||
))
|
|
||||||
|
|
||||||
archived.append(subject)
|
# TODO: extract_worker가 eml 본문/첨부 파싱 지원 시 이 조건 제거
|
||||||
|
if doc.file_format != "eml":
|
||||||
|
session.add(ProcessingQueue(
|
||||||
|
document_id=doc.id, stage="extract", status="pending",
|
||||||
|
))
|
||||||
|
else:
|
||||||
|
logger.debug(f"[메일] {safe_subj} — eml extract 미지원, 큐 스킵")
|
||||||
|
|
||||||
|
archived.append(safe_subj)
|
||||||
max_uid = max(max_uid, uid)
|
max_uid = max(max_uid, uid)
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
|||||||
Reference in New Issue
Block a user