From 751cdc5be8d04acdefca1ab622011c1bcc2de227 Mon Sep 17 00:00:00 2001 From: Hyungi Ahn Date: Wed, 15 Apr 2026 08:37:32 +0900 Subject: [PATCH] =?UTF-8?q?fix(queue):=20enqueue=20=EA=B2=BD=EB=A1=9C=20?= =?UTF-8?q?=EC=A4=91=EB=B3=B5=20=EB=B0=A9=EC=96=B4=20=E2=80=94=20partial?= =?UTF-8?q?=20unique=20index=20+=20=EC=A4=91=EC=95=99=20enqueue=5Fstage=20?= =?UTF-8?q?=ED=95=A8=EC=88=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 기존 UNIQUE(document_id, stage, status)는 pending+processing 동시 존재를 허용해서 stale 복구 시 충돌 발생. 2-layer 방어로 근본 차단: 1) DB: partial unique index uq_queue_active — 활성 행(pending/processing)은 (document_id, stage)당 최대 1개만 허용 2) App: enqueue_stage() 중앙 함수 — INSERT ON CONFLICT DO NOTHING으로 모든 9개 경로의 check-then-insert TOCTOU race 제거 migration 117은 guard check 포함 — 활성 중복이 남아있으면 RAISE EXCEPTION 으로 중단, 수동 정리 유도. Co-Authored-By: Claude Opus 4.6 (1M context) --- app/api/documents.py | 8 ++---- app/api/memos.py | 8 ++---- app/models/queue.py | 26 ++++++++++++++++--- app/workers/file_watcher.py | 24 +++-------------- app/workers/law_monitor.py | 6 ++--- app/workers/mailplus_archive.py | 6 ++--- app/workers/news_collector.py | 10 +++---- app/workers/queue_consumer.py | 18 ++----------- migrations/117_queue_active_unique.sql | 36 ++++++++++++++++++++++++++ 9 files changed, 77 insertions(+), 65 deletions(-) create mode 100644 migrations/117_queue_active_unique.sql diff --git a/app/api/documents.py b/app/api/documents.py index c8d2de9..58058b9 100644 --- a/app/api/documents.py +++ b/app/api/documents.py @@ -17,7 +17,7 @@ from core.config import settings from core.database import get_session from core.utils import file_hash from models.document import Document -from models.queue import ProcessingQueue +from models.queue import ProcessingQueue, enqueue_stage from models.user import User router = APIRouter() @@ -473,11 +473,7 @@ async def upload_document( await session.flush() # 처리 큐 등록 - session.add(ProcessingQueue( - document_id=doc.id, - stage="extract", - status="pending", - )) + await enqueue_stage(session, doc.id, "extract") await session.commit() return DocumentResponse.model_validate(doc) diff --git a/app/api/memos.py b/app/api/memos.py index 35f033a..9c94b47 100644 --- a/app/api/memos.py +++ b/app/api/memos.py @@ -13,7 +13,7 @@ from sqlalchemy.ext.asyncio import AsyncSession from core.auth import get_current_user from core.database import get_session from models.document import Document -from models.queue import ProcessingQueue +from models.queue import ProcessingQueue, enqueue_stage from models.user import User router = APIRouter() @@ -57,11 +57,7 @@ async def _enqueue_ai_stages(session: AsyncSession, document_id: int): ) ) for stage in stages: - session.add(ProcessingQueue( - document_id=document_id, - stage=stage, - status="pending", - )) + await enqueue_stage(session, document_id, stage) # ─── 스키마 ─── diff --git a/app/models/queue.py b/app/models/queue.py index 480729b..0110f65 100644 --- a/app/models/queue.py +++ b/app/models/queue.py @@ -2,7 +2,9 @@ from datetime import datetime -from sqlalchemy import BigInteger, DateTime, Enum, ForeignKey, SmallInteger, Text, UniqueConstraint +from sqlalchemy import BigInteger, DateTime, Enum, ForeignKey, SmallInteger, Text, text +from sqlalchemy.dialects.postgresql import insert as pg_insert +from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import Mapped, mapped_column from core.database import Base @@ -29,6 +31,24 @@ class ProcessingQueue(Base): started_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True)) completed_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True)) - __table_args__ = ( - UniqueConstraint("document_id", "stage", "status"), + # DB 제약은 partial unique index uq_queue_active로 관리 (migration 117) + + +async def enqueue_stage( + session: AsyncSession, document_id: int, stage: str, *, status: str = "pending", +) -> bool: + """ProcessingQueue에 행 추가 (DB 레벨 중복 방어). + + 같은 (document_id, stage)에 활성 행(pending/processing)이 이미 있으면 + 아무것도 하지 않고 False 반환. + """ + stmt = ( + pg_insert(ProcessingQueue) + .values(document_id=document_id, stage=stage, status=status) + .on_conflict_do_nothing( + index_elements=["document_id", "stage"], + index_where=text("status IN ('pending', 'processing')"), + ) ) + result = await session.execute(stmt) + return result.rowcount > 0 diff --git a/app/workers/file_watcher.py b/app/workers/file_watcher.py index c4cacca..992f19c 100644 --- a/app/workers/file_watcher.py +++ b/app/workers/file_watcher.py @@ -2,13 +2,11 @@ from pathlib import Path -from sqlalchemy import select - from core.config import settings from core.database import async_session from core.utils import file_hash, setup_logger from models.document import Document -from models.queue import ProcessingQueue +from models.queue import enqueue_stage logger = setup_logger("file_watcher") @@ -67,11 +65,7 @@ async def watch_inbox(): session.add(doc) await session.flush() - session.add(ProcessingQueue( - document_id=doc.id, - stage="extract", - status="pending", - )) + await enqueue_stage(session, doc.id, "extract") new_count += 1 elif existing.file_hash != fhash: @@ -79,19 +73,7 @@ async def watch_inbox(): existing.file_hash = fhash existing.file_size = file_path.stat().st_size - # 기존 pending/processing 큐 항목이 없으면 extract부터 재시작 - queue_check = await session.execute( - select(ProcessingQueue).where( - ProcessingQueue.document_id == existing.id, - ProcessingQueue.status.in_(["pending", "processing"]), - ) - ) - if not queue_check.scalar_one_or_none(): - session.add(ProcessingQueue( - document_id=existing.id, - stage="extract", - status="pending", - )) + await enqueue_stage(session, existing.id, "extract") changed_count += 1 await session.commit() diff --git a/app/workers/law_monitor.py b/app/workers/law_monitor.py index 5505e1f..e95bbb6 100644 --- a/app/workers/law_monitor.py +++ b/app/workers/law_monitor.py @@ -18,7 +18,7 @@ from core.database import async_session from core.utils import create_caldav_todo, escape_ical_text, file_hash, send_smtp_email, setup_logger from models.automation import AutomationState from models.document import Document -from models.queue import ProcessingQueue +from models.queue import enqueue_stage logger = setup_logger("law_monitor") @@ -276,9 +276,7 @@ async def _save_law_split( session.add(doc) await session.flush() - session.add(ProcessingQueue( - document_id=doc.id, stage="extract", status="pending", - )) + await enqueue_stage(session, doc.id, "extract") count += 1 logger.info(f"[법령] {law_name} ({proclamation_date}) → {count}개 섹션 저장") diff --git a/app/workers/mailplus_archive.py b/app/workers/mailplus_archive.py index c8b2e2a..7bbfd4d 100644 --- a/app/workers/mailplus_archive.py +++ b/app/workers/mailplus_archive.py @@ -20,7 +20,7 @@ from core.database import async_session from core.utils import file_hash, send_smtp_email, setup_logger from models.automation import AutomationState from models.document import Document -from models.queue import ProcessingQueue +from models.queue import enqueue_stage logger = setup_logger("mailplus_archive") @@ -178,9 +178,7 @@ async def run(): # TODO: extract_worker가 eml 본문/첨부 파싱 지원 시 이 조건 제거 if doc.file_format != "eml": - session.add(ProcessingQueue( - document_id=doc.id, stage="extract", status="pending", - )) + await enqueue_stage(session, doc.id, "extract") else: logger.debug(f"[메일] {safe_subj} — eml extract 미지원, 큐 스킵") diff --git a/app/workers/news_collector.py b/app/workers/news_collector.py index d709195..a1cbdea 100644 --- a/app/workers/news_collector.py +++ b/app/workers/news_collector.py @@ -14,7 +14,7 @@ from core.database import async_session from core.utils import setup_logger from models.document import Document from models.news_source import NewsSource -from models.queue import ProcessingQueue +from models.queue import enqueue_stage logger = setup_logger("news_collector") @@ -218,10 +218,10 @@ async def _fetch_rss(session, source: NewsSource) -> int: await session.flush() # summarize + embed 등록 (classify 불필요) - session.add(ProcessingQueue(document_id=doc.id, stage="summarize", status="pending")) + await enqueue_stage(session, doc.id, "summarize") days_old = (datetime.now(timezone.utc) - pub_dt).days if days_old <= 30: - session.add(ProcessingQueue(document_id=doc.id, stage="embed", status="pending")) + await enqueue_stage(session, doc.id, "embed") count += 1 @@ -309,10 +309,10 @@ async def _fetch_api(session, source: NewsSource) -> int: session.add(doc) await session.flush() - session.add(ProcessingQueue(document_id=doc.id, stage="summarize", status="pending")) + await enqueue_stage(session, doc.id, "summarize") days_old = (datetime.now(timezone.utc) - pub_dt).days if days_old <= 30: - session.add(ProcessingQueue(document_id=doc.id, stage="embed", status="pending")) + await enqueue_stage(session, doc.id, "embed") count += 1 diff --git a/app/workers/queue_consumer.py b/app/workers/queue_consumer.py index 35f6d51..ac47db3 100644 --- a/app/workers/queue_consumer.py +++ b/app/workers/queue_consumer.py @@ -8,7 +8,7 @@ from sqlalchemy.orm import aliased from core.database import async_session from core.utils import setup_logger -from models.queue import ProcessingQueue +from models.queue import ProcessingQueue, enqueue_stage logger = setup_logger("queue_consumer") @@ -103,21 +103,7 @@ async def enqueue_next_stage(document_id: int, current_stage: str): async with async_session() as session: for next_stage in stages: - existing = await session.execute( - select(ProcessingQueue).where( - ProcessingQueue.document_id == document_id, - ProcessingQueue.stage == next_stage, - ProcessingQueue.status.in_(["pending", "processing"]), - ) - ) - if existing.scalar_one_or_none(): - continue - - session.add(ProcessingQueue( - document_id=document_id, - stage=next_stage, - status="pending", - )) + await enqueue_stage(session, document_id, next_stage) await session.commit() diff --git a/migrations/117_queue_active_unique.sql b/migrations/117_queue_active_unique.sql new file mode 100644 index 0000000..087c414 --- /dev/null +++ b/migrations/117_queue_active_unique.sql @@ -0,0 +1,36 @@ +-- Step 1: stale duplicate 삭제 (processing 10분+ 방치 + 같은 doc/stage에 pending 존재) +DELETE FROM processing_queue a +USING processing_queue b +WHERE a.document_id = b.document_id + AND a.stage = b.stage + AND a.status = 'processing' + AND a.started_at < NOW() - INTERVAL '10 minutes' + AND b.status = 'pending'; + +-- Step 2: guard check — 활성 중복이 남아있으면 migration 중단 +DO $$ +DECLARE + dup_count INTEGER; +BEGIN + SELECT COUNT(*) INTO dup_count + FROM ( + SELECT document_id, stage + FROM processing_queue + WHERE status IN ('pending', 'processing') + GROUP BY document_id, stage + HAVING COUNT(*) > 1 + ) sub; + + IF dup_count > 0 THEN + RAISE EXCEPTION 'migration 117 blocked: % active duplicate(s) remain. Run manual cleanup first.', dup_count; + END IF; +END $$; + +-- Step 3: 기존 constraint 제거 +ALTER TABLE processing_queue + DROP CONSTRAINT processing_queue_document_id_stage_status_key; + +-- Step 4: partial unique index (활성 행은 (document_id, stage)당 최대 1개) +CREATE UNIQUE INDEX uq_queue_active + ON processing_queue (document_id, stage) + WHERE status IN ('pending', 'processing');