fix(queue): enqueue 경로 중복 방어 — partial unique index + 중앙 enqueue_stage 함수

기존 UNIQUE(document_id, stage, status)는 pending+processing 동시 존재를
허용해서 stale 복구 시 충돌 발생. 2-layer 방어로 근본 차단:

1) DB: partial unique index uq_queue_active — 활성 행(pending/processing)은
   (document_id, stage)당 최대 1개만 허용
2) App: enqueue_stage() 중앙 함수 — INSERT ON CONFLICT DO NOTHING으로
   모든 9개 경로의 check-then-insert TOCTOU race 제거

migration 117은 guard check 포함 — 활성 중복이 남아있으면 RAISE EXCEPTION
으로 중단, 수동 정리 유도.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Hyungi Ahn
2026-04-15 08:37:32 +09:00
parent 8ec1e53ca4
commit 751cdc5be8
9 changed files with 77 additions and 65 deletions
+2 -6
View File
@@ -17,7 +17,7 @@ from core.config import settings
from core.database import get_session
from core.utils import file_hash
from models.document import Document
from models.queue import ProcessingQueue
from models.queue import ProcessingQueue, enqueue_stage
from models.user import User
router = APIRouter()
@@ -473,11 +473,7 @@ async def upload_document(
await session.flush()
# 처리 큐 등록
session.add(ProcessingQueue(
document_id=doc.id,
stage="extract",
status="pending",
))
await enqueue_stage(session, doc.id, "extract")
await session.commit()
return DocumentResponse.model_validate(doc)
+2 -6
View File
@@ -13,7 +13,7 @@ from sqlalchemy.ext.asyncio import AsyncSession
from core.auth import get_current_user
from core.database import get_session
from models.document import Document
from models.queue import ProcessingQueue
from models.queue import ProcessingQueue, enqueue_stage
from models.user import User
router = APIRouter()
@@ -57,11 +57,7 @@ async def _enqueue_ai_stages(session: AsyncSession, document_id: int):
)
)
for stage in stages:
session.add(ProcessingQueue(
document_id=document_id,
stage=stage,
status="pending",
))
await enqueue_stage(session, document_id, stage)
# ─── 스키마 ───
+23 -3
View File
@@ -2,7 +2,9 @@
from datetime import datetime
from sqlalchemy import BigInteger, DateTime, Enum, ForeignKey, SmallInteger, Text, UniqueConstraint
from sqlalchemy import BigInteger, DateTime, Enum, ForeignKey, SmallInteger, Text, text
from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import Mapped, mapped_column
from core.database import Base
@@ -29,6 +31,24 @@ class ProcessingQueue(Base):
started_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
completed_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
__table_args__ = (
UniqueConstraint("document_id", "stage", "status"),
# DB 제약은 partial unique index uq_queue_active로 관리 (migration 117)
async def enqueue_stage(
    session: AsyncSession,
    document_id: int,
    stage: str,
    *,
    status: str = "pending",
) -> bool:
    """Insert a ProcessingQueue row, skipping it on an active-row conflict.

    Issues a PostgreSQL ``INSERT ... ON CONFLICT DO NOTHING`` whose conflict
    target is ``(document_id, stage)`` restricted to rows where
    ``status IN ('pending', 'processing')``.  When such a conflict occurs the
    statement inserts nothing instead of raising.

    This function only executes the statement on ``session``; it does not
    commit.

    Args:
        session: Async session used to execute the INSERT.
        document_id: Value for the ``document_id`` column of the new row.
        stage: Value for the ``stage`` column of the new row.
        status: Value for the ``status`` column (keyword-only).

    Returns:
        True if the statement reports at least one affected row (the row was
        inserted), False otherwise (the insert was skipped).
    """
    new_row = {"document_id": document_id, "stage": stage, "status": status}
    # Conflict-target predicate; written to match the WHERE clause used for
    # conflict inference against the partial unique index.
    active_only = text("status IN ('pending', 'processing')")

    insert_stmt = pg_insert(ProcessingQueue).values(**new_row)
    insert_stmt = insert_stmt.on_conflict_do_nothing(
        index_elements=["document_id", "stage"],
        index_where=active_only,
    )

    outcome = await session.execute(insert_stmt)
    # A skipped insert (ON CONFLICT DO NOTHING) affects zero rows.
    inserted = outcome.rowcount > 0
    return inserted
+3 -21
View File
@@ -2,13 +2,11 @@
from pathlib import Path
from sqlalchemy import select
from core.config import settings
from core.database import async_session
from core.utils import file_hash, setup_logger
from models.document import Document
from models.queue import ProcessingQueue
from models.queue import enqueue_stage
logger = setup_logger("file_watcher")
@@ -67,11 +65,7 @@ async def watch_inbox():
session.add(doc)
await session.flush()
session.add(ProcessingQueue(
document_id=doc.id,
stage="extract",
status="pending",
))
await enqueue_stage(session, doc.id, "extract")
new_count += 1
elif existing.file_hash != fhash:
@@ -79,19 +73,7 @@ async def watch_inbox():
existing.file_hash = fhash
existing.file_size = file_path.stat().st_size
# 기존 pending/processing 큐 항목이 없으면 extract부터 재시작
queue_check = await session.execute(
select(ProcessingQueue).where(
ProcessingQueue.document_id == existing.id,
ProcessingQueue.status.in_(["pending", "processing"]),
)
)
if not queue_check.scalar_one_or_none():
session.add(ProcessingQueue(
document_id=existing.id,
stage="extract",
status="pending",
))
await enqueue_stage(session, existing.id, "extract")
changed_count += 1
await session.commit()
+2 -4
View File
@@ -18,7 +18,7 @@ from core.database import async_session
from core.utils import create_caldav_todo, escape_ical_text, file_hash, send_smtp_email, setup_logger
from models.automation import AutomationState
from models.document import Document
from models.queue import ProcessingQueue
from models.queue import enqueue_stage
logger = setup_logger("law_monitor")
@@ -276,9 +276,7 @@ async def _save_law_split(
session.add(doc)
await session.flush()
session.add(ProcessingQueue(
document_id=doc.id, stage="extract", status="pending",
))
await enqueue_stage(session, doc.id, "extract")
count += 1
logger.info(f"[법령] {law_name} ({proclamation_date}) → {count}개 섹션 저장")
+2 -4
View File
@@ -20,7 +20,7 @@ from core.database import async_session
from core.utils import file_hash, send_smtp_email, setup_logger
from models.automation import AutomationState
from models.document import Document
from models.queue import ProcessingQueue
from models.queue import enqueue_stage
logger = setup_logger("mailplus_archive")
@@ -178,9 +178,7 @@ async def run():
# TODO: extract_worker가 eml 본문/첨부 파싱 지원 시 이 조건 제거
if doc.file_format != "eml":
session.add(ProcessingQueue(
document_id=doc.id, stage="extract", status="pending",
))
await enqueue_stage(session, doc.id, "extract")
else:
logger.debug(f"[메일] {safe_subj} — eml extract 미지원, 큐 스킵")
+5 -5
View File
@@ -14,7 +14,7 @@ from core.database import async_session
from core.utils import setup_logger
from models.document import Document
from models.news_source import NewsSource
from models.queue import ProcessingQueue
from models.queue import enqueue_stage
logger = setup_logger("news_collector")
@@ -218,10 +218,10 @@ async def _fetch_rss(session, source: NewsSource) -> int:
await session.flush()
# summarize + embed 등록 (classify 불필요)
session.add(ProcessingQueue(document_id=doc.id, stage="summarize", status="pending"))
await enqueue_stage(session, doc.id, "summarize")
days_old = (datetime.now(timezone.utc) - pub_dt).days
if days_old <= 30:
session.add(ProcessingQueue(document_id=doc.id, stage="embed", status="pending"))
await enqueue_stage(session, doc.id, "embed")
count += 1
@@ -309,10 +309,10 @@ async def _fetch_api(session, source: NewsSource) -> int:
session.add(doc)
await session.flush()
session.add(ProcessingQueue(document_id=doc.id, stage="summarize", status="pending"))
await enqueue_stage(session, doc.id, "summarize")
days_old = (datetime.now(timezone.utc) - pub_dt).days
if days_old <= 30:
session.add(ProcessingQueue(document_id=doc.id, stage="embed", status="pending"))
await enqueue_stage(session, doc.id, "embed")
count += 1
+2 -16
View File
@@ -8,7 +8,7 @@ from sqlalchemy.orm import aliased
from core.database import async_session
from core.utils import setup_logger
from models.queue import ProcessingQueue
from models.queue import ProcessingQueue, enqueue_stage
logger = setup_logger("queue_consumer")
@@ -103,21 +103,7 @@ async def enqueue_next_stage(document_id: int, current_stage: str):
async with async_session() as session:
for next_stage in stages:
existing = await session.execute(
select(ProcessingQueue).where(
ProcessingQueue.document_id == document_id,
ProcessingQueue.stage == next_stage,
ProcessingQueue.status.in_(["pending", "processing"]),
)
)
if existing.scalar_one_or_none():
continue
session.add(ProcessingQueue(
document_id=document_id,
stage=next_stage,
status="pending",
))
await enqueue_stage(session, document_id, next_stage)
await session.commit()
+36
View File
@@ -0,0 +1,36 @@
-- Purpose: replace the full unique constraint on processing_queue with a
-- partial unique index, so that each (document_id, stage) pair has at most
-- one active row (status 'pending' or 'processing').
--
-- Step 1: delete stale duplicates — a 'processing' row whose started_at is
-- more than 10 minutes in the past AND that has a 'pending' row for the same
-- (document_id, stage). The 'pending' row is kept.
-- Note: a 'processing' row with NULL started_at does not satisfy the
-- started_at comparison and is therefore left in place.
DELETE FROM processing_queue a
USING processing_queue b
WHERE a.document_id = b.document_id
AND a.stage = b.stage
AND a.status = 'processing'
AND a.started_at < NOW() - INTERVAL '10 minutes'
AND b.status = 'pending';
-- Step 2: guard check — count the (document_id, stage) groups that still
-- have more than one active row; if any remain, abort the migration with an
-- exception so they can be cleaned up manually before retrying.
DO $$
DECLARE
dup_count INTEGER;
BEGIN
SELECT COUNT(*) INTO dup_count
FROM (
SELECT document_id, stage
FROM processing_queue
WHERE status IN ('pending', 'processing')
GROUP BY document_id, stage
HAVING COUNT(*) > 1
) sub;
IF dup_count > 0 THEN
RAISE EXCEPTION 'migration 117 blocked: % active duplicate(s) remain. Run manual cleanup first.', dup_count;
END IF;
END $$;
-- Step 3: drop the existing unique constraint.
-- NOTE(review): the name below presumably is the auto-generated name of the
-- old UNIQUE (document_id, stage, status) constraint — confirm it matches
-- the deployed schema; this statement fails if the constraint is absent.
ALTER TABLE processing_queue
DROP CONSTRAINT processing_queue_document_id_stage_status_key;
-- Step 4: create the partial unique index — among rows whose status is
-- 'pending' or 'processing', (document_id, stage) must be unique. Rows with
-- any other status are not covered by this index.
CREATE UNIQUE INDEX uq_queue_active
ON processing_queue (document_id, stage)
WHERE status IN ('pending', 'processing');