Files
hyungi_document_server/app/models/queue.py
T
hyungi 7cd8cfde0a feat(news): crawl-24x7 A그룹 — 레지스트리 증축·조건부 GET·fulltext 승격·politeness·source_health
A-3 migrations 319-323 (news_sources 9컬럼 + source_channel 'crawl' + process_stage 'fulltext' + source_health)
A-1 조건부 GET(ETag/Last-Modified 그대로 재전송)+콘텐츠 해시 변경감지, A-4 politeness 코어(per-domain 직렬+robots+정직UA),
A-2+A-7 fulltext_worker(4-tier 재사용·NAS crawl_raw gzip 보존·격하 경로·03:40 reconcile 안전망),
A-5 circuit breaker(3/10 임계, enabled 미터치), A-6 포털 전재 2차 dedup(제목+3일, 12자 게이트).
기존 소스 fulltext_policy='none' 기본 = 무회귀. plan crawl-24x7-1, 예외 박제 crawl-24x7-exec1-20260610.md
2026-06-10 13:03:31 +09:00

79 lines
3.2 KiB
Python

"""processing_queue 테이블 ORM (비동기 가공 큐)"""
from datetime import datetime
from sqlalchemy import BigInteger, DateTime, Enum, ForeignKey, SmallInteger, Text, text
from sqlalchemy.dialects.postgresql import JSONB, insert as pg_insert
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import Mapped, mapped_column
from core.database import Base
class ProcessingQueue(Base):
__tablename__ = "processing_queue"
id: Mapped[int] = mapped_column(BigInteger, primary_key=True)
document_id: Mapped[int] = mapped_column(BigInteger, ForeignKey("documents.id"), nullable=False)
stage: Mapped[str] = mapped_column(
# 'stt' (audio): migration 150 / 'thumbnail' (video): queue_consumer 가 enqueue.
# 'deep_summary' (PR-B B-1): classify_worker 가 에스컬레이션 시 enqueue.
# 'fulltext' (crawl-24x7 A-2): migration 321 — 기사 페이지 fetch 후 본문 승격.
# DB enum 변경은 마이그레이션이 처리하므로 create_type=False.
Enum(
"extract", "classify", "summarize", "embed", "chunk", "preview",
"stt", "thumbnail", "deep_summary", "markdown", "fulltext",
name="process_stage",
create_type=False,
),
nullable=False,
)
status: Mapped[str] = mapped_column(
Enum("pending", "processing", "completed", "failed", name="process_status"),
default="pending"
)
attempts: Mapped[int] = mapped_column(SmallInteger, default=0)
max_attempts: Mapped[int] = mapped_column(SmallInteger, default=3)
error_message: Mapped[str | None] = mapped_column(Text)
# B-1: deep_summary stage 가 EscalationEnvelope 를 payload 로 싣는다. 다른 stage 는 NULL.
payload: Mapped[dict | None] = mapped_column(JSONB)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), default=datetime.now
)
started_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
completed_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
# DB 제약은 partial unique index uq_queue_active로 관리 (migration 117)
async def enqueue_stage(
session: AsyncSession,
document_id: int,
stage: str,
*,
status: str = "pending",
payload: dict | None = None,
) -> bool:
"""ProcessingQueue에 행 추가 (DB 레벨 중복 방어).
같은 (document_id, stage)에 활성 행(pending/processing)이 이미 있으면
아무것도 하지 않고 False 반환.
B-1: payload 옵션으로 deep_summary 에 EscalationEnvelope JSON 을 실을 수 있다.
같은 문서 deep_summary 가 재제안될 경우 on_conflict_do_nothing 으로 기존 payload
유지 (최초 envelope 가 원본). 이후 재처리 시 재분석은 새 classify 가 트리거.
"""
values: dict = {"document_id": document_id, "stage": stage, "status": status}
if payload is not None:
values["payload"] = payload
stmt = (
pg_insert(ProcessingQueue)
.values(**values)
.on_conflict_do_nothing(
index_elements=["document_id", "stage"],
index_where=text("status IN ('pending', 'processing')"),
)
)
result = await session.execute(stmt)
return result.rowcount > 0