diff --git a/app/models/queue.py b/app/models/queue.py
index 28c19fd..a388272 100644
--- a/app/models/queue.py
+++ b/app/models/queue.py
@@ -14,7 +14,7 @@ class ProcessingQueue(Base):
     id: Mapped[int] = mapped_column(BigInteger, primary_key=True)
     document_id: Mapped[int] = mapped_column(BigInteger, ForeignKey("documents.id"), nullable=False)
     stage: Mapped[str] = mapped_column(
-        Enum("extract", "classify", "embed", "preview", name="process_stage"), nullable=False
+        Enum("extract", "classify", "summarize", "embed", "preview", name="process_stage"), nullable=False
     )
     status: Mapped[str] = mapped_column(
         Enum("pending", "processing", "completed", "failed", name="process_status"),
diff --git a/app/workers/news_collector.py b/app/workers/news_collector.py
index 3e7533e..228b56f 100644
--- a/app/workers/news_collector.py
+++ b/app/workers/news_collector.py
@@ -162,7 +162,8 @@ async def _fetch_rss(session, source: NewsSource) -> int:
         session.add(doc)
         await session.flush()
 
-        # embed만 등록 (classify 불필요 — 소스/분야 이미 확정)
+        # enqueue summarize + embed (classify not needed)
+        session.add(ProcessingQueue(document_id=doc.id, stage="summarize", status="pending"))
         days_old = (datetime.now(timezone.utc) - pub_dt).days
         if days_old <= 30:
             session.add(ProcessingQueue(document_id=doc.id, stage="embed", status="pending"))
@@ -243,6 +244,7 @@ async def _fetch_api(session, source: NewsSource) -> int:
         session.add(doc)
         await session.flush()
 
+        session.add(ProcessingQueue(document_id=doc.id, stage="summarize", status="pending"))
         days_old = (datetime.now(timezone.utc) - pub_dt).days
         if days_old <= 30:
             session.add(ProcessingQueue(document_id=doc.id, stage="embed", status="pending"))
diff --git a/app/workers/queue_consumer.py b/app/workers/queue_consumer.py
index 05c8637..6a21b93 100644
--- a/app/workers/queue_consumer.py
+++ b/app/workers/queue_consumer.py
@@ -11,7 +11,7 @@ from models.queue import ProcessingQueue
 logger = setup_logger("queue_consumer")
 
 # stage별 배치 크기
-BATCH_SIZE = {"extract": 5, "classify": 3, "embed": 1, "preview": 2}
+BATCH_SIZE = {"extract": 5, "classify": 3, "summarize": 3, "embed": 1, "preview": 2}
 
 STALE_THRESHOLD_MINUTES = 10
 
@@ -65,10 +65,12 @@ async def consume_queue():
     from workers.embed_worker import process as embed_process
     from workers.extract_worker import process as extract_process
     from workers.preview_worker import process as preview_process
+    from workers.summarize_worker import process as summarize_process
 
     workers = {
         "extract": extract_process,
         "classify": classify_process,
+        "summarize": summarize_process,
         "embed": embed_process,
         "preview": preview_process,
     }
diff --git a/app/workers/summarize_worker.py b/app/workers/summarize_worker.py
new file mode 100644
index 0000000..848f8e4
--- /dev/null
+++ b/app/workers/summarize_worker.py
@@ -0,0 +1,35 @@
+"""Summary-only worker — generates AI summaries for documents (e.g. news) that skip classify."""
+
+from datetime import datetime, timezone
+
+from sqlalchemy.ext.asyncio import AsyncSession
+
+from ai.client import AIClient, strip_thinking
+from core.utils import setup_logger
+from models.document import Document
+
+logger = setup_logger("summarize_worker")
+
+
+async def process(document_id: int, session: AsyncSession) -> None:
+    """Generate an AI summary for a document (summary only, no classification)."""
+    doc = await session.get(Document, document_id)
+    if not doc:
+        raise ValueError(f"문서 ID {document_id}를 찾을 수 없음")
+
+    if not doc.extracted_text:
+        raise ValueError(f"문서 ID {document_id}: extracted_text가 비어있음")
+
+    if doc.ai_summary:
+        logger.info(f"[요약] document_id={document_id}: 이미 요약 있음, skip")
+        return
+
+    client = AIClient()
+    try:
+        summary = await client.summarize(doc.extracted_text[:15000])
+        doc.ai_summary = strip_thinking(summary)
+        doc.ai_model_version = "qwen3.5-35b-a3b"
+        doc.ai_processed_at = datetime.now(timezone.utc)
+        logger.info(f"[요약] document_id={document_id}: {len(doc.ai_summary)}자")
+    finally:
+        await client.close()