8ec1e53ca4
stale processing 행을 pending으로 bulk UPDATE 시 이미 같은 (document_id, stage, pending) 행이 존재하면 unique constraint 위반으로 APScheduler consume_queue 잡 전체가 크래시. 2-step 접근으로 변경: 1) pending 중복 있는 stale processing 행은 DELETE 2) 나머지만 pending으로 UPDATE + 예외 삼키기로 stale reset 실패가 전체 큐 소비를 죽이지 않게 방어 Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
228 lines
9.6 KiB
Python
228 lines
9.6 KiB
Python
"""처리 큐 소비자 — APScheduler에서 1분 간격으로 호출"""
|
|
|
|
from datetime import datetime, timedelta, timezone
|
|
|
|
from sqlalchemy import select, update, delete, exists
|
|
from sqlalchemy.exc import IntegrityError, SQLAlchemyError
|
|
from sqlalchemy.orm import aliased
|
|
|
|
from core.database import async_session
|
|
from core.utils import setup_logger
|
|
from models.queue import ProcessingQueue
|
|
|
|
logger = setup_logger("queue_consumer")

# Batch size per stage: the maximum number of pending rows consume_queue
# fetches for that stage in a single run.
BATCH_SIZE = {"extract": 5, "classify": 3, "summarize": 3, "embed": 1, "chunk": 1, "preview": 2}
# Minutes a row may sit in "processing" before reset_stale_items treats it as stale.
STALE_THRESHOLD_MINUTES = 10
|
|
|
|
|
|
async def reset_stale_items():
    """Recover queue rows that have been left in ``processing`` for too long.

    A row is stale when its status is ``processing`` and its ``started_at`` is
    set and older than ``STALE_THRESHOLD_MINUTES``. Two statements run in one
    session and are committed together:

    1. Stale rows whose ``(document_id, stage)`` already has a ``pending`` row
       are duplicates and are deleted.
    2. Stale rows without such a ``pending`` row are put back to ``pending``
       with ``started_at`` cleared.

    Any ``Exception`` is logged and swallowed, so a failed reset only skips
    this cycle instead of propagating to the caller.
    """
    stale_before = datetime.now(timezone.utc) - timedelta(minutes=STALE_THRESHOLD_MINUTES)
    # Two aliases of the same table: the stale candidate and its possible
    # pending counterpart inside the correlated EXISTS.
    stuck = aliased(ProcessingQueue)
    waiting = aliased(ProcessingQueue)

    try:
        # Predicates shared by both steps.
        is_stale = (
            stuck.status == "processing",
            stuck.started_at.is_not(None),
            stuck.started_at < stale_before,
        )
        has_pending_twin = exists(
            select(1).where(
                waiting.document_id == stuck.document_id,
                waiting.stage == stuck.stage,
                waiting.status == "pending",
            )
        )

        duplicate_ids = select(stuck.id).where(*is_stale, has_pending_twin)
        recoverable_ids = select(stuck.id).where(*is_stale, ~has_pending_twin)

        # Step A: delete stale processing rows that already have a pending duplicate.
        purge_duplicates = delete(ProcessingQueue).where(
            ProcessingQueue.id.in_(duplicate_ids)
        )
        # Step B: return the remaining stale processing rows to pending.
        requeue_stale = (
            update(ProcessingQueue)
            .where(ProcessingQueue.id.in_(recoverable_ids))
            .values(status="pending", started_at=None)
        )

        async with async_session() as session:
            purge_result = await session.execute(purge_duplicates)
            requeue_result = await session.execute(requeue_stale)
            await session.commit()

            deleted = purge_result.rowcount or 0
            recovered = requeue_result.rowcount or 0
            if deleted > 0:
                logger.warning(
                    "deleted %s stale processing rows that already had pending duplicates",
                    deleted,
                )
            if recovered > 0:
                logger.warning(
                    "recovered %s stale processing rows back to pending",
                    recovered,
                )
    except IntegrityError:
        logger.exception("reset_stale_items failed with IntegrityError; skipping this cycle")
    except SQLAlchemyError:
        logger.exception("reset_stale_items failed with database error; skipping this cycle")
    except Exception:
        logger.exception("reset_stale_items failed unexpectedly; skipping this cycle")
|
|
|
|
|
|
async def enqueue_next_stage(document_id: int, current_stage: str):
    """Register the follow-up stage(s) of ``current_stage`` as ``pending`` rows.

    ``extract`` is followed by ``classify`` and ``preview``; ``classify`` is
    followed by ``embed`` and ``chunk``. Any other stage has no follow-up and
    the function returns without touching the database.

    A follow-up stage is skipped when the document already has a row for it in
    ``pending`` or ``processing`` state. All new rows are added in one session
    and committed together.

    Args:
        document_id: Document whose next stages should be queued.
        current_stage: Name of the stage that has just finished.
    """
    next_stages = {"extract": ["classify", "preview"], "classify": ["embed", "chunk"]}
    stages = next_stages.get(current_stage, [])
    if not stages:
        return

    async with async_session() as session:
        for next_stage in stages:
            # Existence probe only. A pending row and a processing row can both
            # exist for the same (document_id, stage) -- reset_stale_items
            # handles exactly that situation -- so the filter may match more
            # than one row. scalar_one_or_none() would raise
            # MultipleResultsFound there; LIMIT 1 + first() cannot.
            existing = await session.execute(
                select(ProcessingQueue.id)
                .where(
                    ProcessingQueue.document_id == document_id,
                    ProcessingQueue.stage == next_stage,
                    ProcessingQueue.status.in_(["pending", "processing"]),
                )
                .limit(1)
            )
            if existing.first() is not None:
                continue

            session.add(ProcessingQueue(
                document_id=document_id,
                stage=next_stage,
                status="pending",
            ))
        await session.commit()
|
|
|
|
|
|
async def _claim_queue_item(queue_id, stage: str) -> bool:
    """Flip one queue row from ``pending`` to ``processing`` in its own session.

    Sets ``started_at`` to the current UTC time and increments ``attempts``.

    Returns:
        True when the row was claimed and committed. False when the row no
        longer exists, is no longer ``pending``, or the commit raised
        ``IntegrityError`` (logged; the row is left untouched for a later run).
    """
    async with async_session() as session:
        item = await session.get(ProcessingQueue, queue_id)
        if not item or item.status != "pending":
            return False
        item.status = "processing"
        item.started_at = datetime.now(timezone.utc)
        item.attempts += 1
        try:
            await session.commit()
        except IntegrityError:
            # A constraint conflict on one row must not abort the whole run.
            await session.rollback()
            logger.exception(
                "[%s] queue_id=%s could not be claimed (IntegrityError); skipping this cycle",
                stage,
                queue_id,
            )
            return False
    return True


async def _record_failure(queue_id, document_id: int, stage: str, exc: Exception) -> None:
    """Store a worker failure on the queue row and decide retry vs. permanent failure.

    The error text (at most 500 characters) is saved in ``error_message``. When
    ``attempts`` has reached ``max_attempts`` the row becomes ``failed``;
    otherwise it goes back to ``pending`` with ``started_at`` cleared.

    A database error while recording is logged and swallowed so that the
    remaining items and stages of the current run are still processed. The row
    then stays in ``processing``.
    """
    try:
        async with async_session() as session:
            item = await session.get(ProcessingQueue, queue_id)
            if not item:
                logger.warning(f"[{stage}] queue_id={queue_id} 없음 (삭제됨?), skip")
                return
            # Guard against empty messages: fall back str -> repr -> class name.
            err_text = str(exc) or repr(exc) or type(exc).__name__
            item.error_message = err_text[:500]
            if item.attempts >= item.max_attempts:
                item.status = "failed"
                logger.error(f"[{stage}] document_id={document_id} 영구 실패: {exc}")
            else:
                item.status = "pending"
                item.started_at = None
                logger.warning(f"[{stage}] document_id={document_id} 재시도 예정 ({item.attempts}/{item.max_attempts}): {exc}")
            await session.commit()
    except SQLAlchemyError:
        logger.exception(
            "[%s] queue_id=%s failure bookkeeping failed; leaving row as-is for this cycle",
            stage,
            queue_id,
        )


async def consume_queue():
    """Fetch pending queue rows and run the matching worker for each stage.

    Per run:

    1. ``reset_stale_items`` is called first; a failure there is logged and
       does not stop consumption.
    2. For every stage, up to ``BATCH_SIZE[stage]`` (default 3) pending rows
       are read, oldest ``created_at`` first.
    3. Each row is claimed, processed and finalized in separate sessions:
       the worker runs in its own session, the row is marked ``completed``,
       and ``enqueue_next_stage`` queues the follow-up stages.
    4. For the ``extract`` and ``preview`` stages, documents whose
       ``file_type`` is ``"note"`` are marked completed without running the
       worker.
    5. Any exception raised while processing a row is recorded on that row
       (retry or permanent failure) and the loop moves on to the next row.
    """
    # Worker modules are imported at call time, as in the original code.
    from workers.classify_worker import process as classify_process
    from workers.chunk_worker import process as chunk_process
    from workers.embed_worker import process as embed_process
    from workers.extract_worker import process as extract_process
    from workers.preview_worker import process as preview_process
    from workers.summarize_worker import process as summarize_process

    workers = {
        "extract": extract_process,
        "classify": classify_process,
        "summarize": summarize_process,
        "embed": embed_process,
        "chunk": chunk_process,
        "preview": preview_process,
    }

    try:
        await reset_stale_items()
    except Exception:
        logger.exception("stale reset failed, but continuing queue consumption")

    for stage, worker_fn in workers.items():
        batch_size = BATCH_SIZE.get(stage, 3)

        # Look up the pending rows for this stage.
        async with async_session() as session:
            result = await session.execute(
                select(ProcessingQueue.id, ProcessingQueue.document_id)
                .where(
                    ProcessingQueue.stage == stage,
                    ProcessingQueue.status == "pending",
                )
                .order_by(ProcessingQueue.created_at)
                .limit(batch_size)
            )
            pending_items = result.all()

        # Every row is handled in independent sessions.
        for queue_id, document_id in pending_items:
            # Move the row to processing; skip it if it cannot be claimed.
            if not await _claim_queue_item(queue_id, stage):
                continue

            try:
                # Notes already carry extracted_text, so extract/preview are skipped.
                if stage in ("extract", "preview"):
                    from models.document import Document

                    async with async_session() as check_session:
                        doc = await check_session.get(Document, document_id)
                        is_note = bool(doc and doc.file_type == "note")

                    if is_note:
                        async with async_session() as skip_session:
                            item = await skip_session.get(ProcessingQueue, queue_id)
                            if item:
                                item.status = "completed"
                                item.completed_at = datetime.now(timezone.utc)
                                await skip_session.commit()
                        await enqueue_next_stage(document_id, stage)
                        logger.info(f"[{stage}] document_id={document_id} skip (note)")
                        continue

                # Run the worker in its own session.
                async with async_session() as worker_session:
                    await worker_fn(document_id, worker_session)
                    await worker_session.commit()

                # Mark the row completed.
                async with async_session() as session:
                    item = await session.get(ProcessingQueue, queue_id)
                    if not item:
                        logger.warning(f"[{stage}] queue_id={queue_id} 없음 (삭제됨?), skip")
                        continue
                    item.status = "completed"
                    item.completed_at = datetime.now(timezone.utc)
                    await session.commit()

                await enqueue_next_stage(document_id, stage)
                logger.info(f"[{stage}] document_id={document_id} 완료")

            except Exception as e:
                # Record the failure; bookkeeping errors are contained in the helper.
                await _record_failure(queue_id, document_id, stage, e)
|