- Implement kordoc /parse endpoint (HWP/HWPX/PDF via kordoc lib, text files direct read, images flagged for OCR) - Add queue consumer with APScheduler (1min interval, stage chaining extract→classify→embed, stale item recovery, retry logic) - Add extract worker (kordoc HTTP call + direct text read) - Add classify worker (Qwen3.5 AI classification with think-tag stripping and robust JSON extraction from AI responses) - Add embed worker (GPU server nomic-embed-text, graceful failure) - Add DEVONthink migration script with folder mapping for 16 DBs, dry-run mode, batch commits, and idempotent file_path UNIQUE - Enhance ai/client.py with strip_thinking() and parse_json_response() Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
118 lines
4.2 KiB
Python
118 lines
4.2 KiB
Python
"""처리 큐 소비자 — APScheduler에서 1분 간격으로 호출"""
|
|
|
|
from datetime import datetime, timedelta, timezone
|
|
|
|
from sqlalchemy import select, update
|
|
|
|
from core.database import async_session
|
|
from core.utils import setup_logger
|
|
from models.queue import ProcessingQueue
|
|
|
|
logger = setup_logger("queue_consumer")
|
|
|
|
# stage별 배치 크기
|
|
BATCH_SIZE = {"extract": 5, "classify": 3, "embed": 1}
|
|
STALE_THRESHOLD_MINUTES = 10
|
|
|
|
|
|
async def reset_stale_items():
|
|
"""processing 상태로 10분 이상 방치된 항목 복구"""
|
|
cutoff = datetime.now(timezone.utc) - timedelta(minutes=STALE_THRESHOLD_MINUTES)
|
|
async with async_session() as session:
|
|
result = await session.execute(
|
|
update(ProcessingQueue)
|
|
.where(
|
|
ProcessingQueue.status == "processing",
|
|
ProcessingQueue.started_at < cutoff,
|
|
)
|
|
.values(status="pending", started_at=None)
|
|
)
|
|
if result.rowcount > 0:
|
|
await session.commit()
|
|
logger.warning(f"stale 항목 {result.rowcount}건 복구")
|
|
|
|
|
|
async def enqueue_next_stage(document_id: int, current_stage: str, session):
|
|
"""현재 stage 완료 후 다음 stage를 pending으로 등록"""
|
|
next_stages = {"extract": "classify", "classify": "embed"}
|
|
next_stage = next_stages.get(current_stage)
|
|
if not next_stage:
|
|
return
|
|
|
|
# 이미 존재하는지 확인 (중복 방지)
|
|
existing = await session.execute(
|
|
select(ProcessingQueue).where(
|
|
ProcessingQueue.document_id == document_id,
|
|
ProcessingQueue.stage == next_stage,
|
|
ProcessingQueue.status.in_(["pending", "processing"]),
|
|
)
|
|
)
|
|
if existing.scalar_one_or_none():
|
|
return
|
|
|
|
session.add(ProcessingQueue(
|
|
document_id=document_id,
|
|
stage=next_stage,
|
|
status="pending",
|
|
))
|
|
|
|
|
|
async def consume_queue():
|
|
"""큐에서 pending 항목을 가져와 stage별 워커 실행"""
|
|
# 지연 임포트 (순환 참조 방지)
|
|
from workers.extract_worker import process as extract_process
|
|
from workers.classify_worker import process as classify_process
|
|
from workers.embed_worker import process as embed_process
|
|
|
|
workers = {
|
|
"extract": extract_process,
|
|
"classify": classify_process,
|
|
"embed": embed_process,
|
|
}
|
|
|
|
# stale 항목 복구
|
|
await reset_stale_items()
|
|
|
|
for stage, worker_fn in workers.items():
|
|
batch_size = BATCH_SIZE.get(stage, 3)
|
|
|
|
async with async_session() as session:
|
|
result = await session.execute(
|
|
select(ProcessingQueue)
|
|
.where(
|
|
ProcessingQueue.stage == stage,
|
|
ProcessingQueue.status == "pending",
|
|
)
|
|
.order_by(ProcessingQueue.created_at)
|
|
.limit(batch_size)
|
|
)
|
|
items = result.scalars().all()
|
|
|
|
for item in items:
|
|
item.status = "processing"
|
|
item.started_at = datetime.now(timezone.utc)
|
|
item.attempts += 1
|
|
await session.commit()
|
|
|
|
try:
|
|
await worker_fn(item.document_id, session)
|
|
item.status = "completed"
|
|
item.completed_at = datetime.now(timezone.utc)
|
|
await enqueue_next_stage(item.document_id, stage, session)
|
|
await session.commit()
|
|
logger.info(f"[{stage}] document_id={item.document_id} 완료")
|
|
|
|
except Exception as e:
|
|
await session.rollback()
|
|
# 세션에서 item 다시 로드
|
|
item = await session.get(ProcessingQueue, item.id)
|
|
item.error_message = str(e)[:500]
|
|
if item.attempts >= item.max_attempts:
|
|
item.status = "failed"
|
|
logger.error(f"[{stage}] document_id={item.document_id} 영구 실패: {e}")
|
|
else:
|
|
item.status = "pending"
|
|
item.started_at = None
|
|
logger.warning(f"[{stage}] document_id={item.document_id} 재시도 예정 ({item.attempts}/{item.max_attempts}): {e}")
|
|
await session.commit()
|