feat: summarize 전용 stage — 뉴스 AI 요약 (classify 없이)
- summarize_worker: 요약만 생성 (분류 안 함)
- queue_consumer: summarize stage 추가 (batch 3)
- news_collector: summarize + embed 큐 등록
- process_stage enum에 'summarize' 추가

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -14,7 +14,7 @@ class ProcessingQueue(Base):
|
|||||||
id: Mapped[int] = mapped_column(BigInteger, primary_key=True)
|
id: Mapped[int] = mapped_column(BigInteger, primary_key=True)
|
||||||
document_id: Mapped[int] = mapped_column(BigInteger, ForeignKey("documents.id"), nullable=False)
|
document_id: Mapped[int] = mapped_column(BigInteger, ForeignKey("documents.id"), nullable=False)
|
||||||
stage: Mapped[str] = mapped_column(
|
stage: Mapped[str] = mapped_column(
|
||||||
Enum("extract", "classify", "embed", "preview", name="process_stage"), nullable=False
|
Enum("extract", "classify", "summarize", "embed", "preview", name="process_stage"), nullable=False
|
||||||
)
|
)
|
||||||
status: Mapped[str] = mapped_column(
|
status: Mapped[str] = mapped_column(
|
||||||
Enum("pending", "processing", "completed", "failed", name="process_status"),
|
Enum("pending", "processing", "completed", "failed", name="process_status"),
|
||||||
|
|||||||
@@ -162,7 +162,8 @@ async def _fetch_rss(session, source: NewsSource) -> int:
|
|||||||
session.add(doc)
|
session.add(doc)
|
||||||
await session.flush()
|
await session.flush()
|
||||||
|
|
||||||
# embed만 등록 (classify 불필요 — 소스/분야 이미 확정)
|
# summarize + embed 등록 (classify 불필요)
|
||||||
|
session.add(ProcessingQueue(document_id=doc.id, stage="summarize", status="pending"))
|
||||||
days_old = (datetime.now(timezone.utc) - pub_dt).days
|
days_old = (datetime.now(timezone.utc) - pub_dt).days
|
||||||
if days_old <= 30:
|
if days_old <= 30:
|
||||||
session.add(ProcessingQueue(document_id=doc.id, stage="embed", status="pending"))
|
session.add(ProcessingQueue(document_id=doc.id, stage="embed", status="pending"))
|
||||||
@@ -243,6 +244,7 @@ async def _fetch_api(session, source: NewsSource) -> int:
|
|||||||
session.add(doc)
|
session.add(doc)
|
||||||
await session.flush()
|
await session.flush()
|
||||||
|
|
||||||
|
session.add(ProcessingQueue(document_id=doc.id, stage="summarize", status="pending"))
|
||||||
days_old = (datetime.now(timezone.utc) - pub_dt).days
|
days_old = (datetime.now(timezone.utc) - pub_dt).days
|
||||||
if days_old <= 30:
|
if days_old <= 30:
|
||||||
session.add(ProcessingQueue(document_id=doc.id, stage="embed", status="pending"))
|
session.add(ProcessingQueue(document_id=doc.id, stage="embed", status="pending"))
|
||||||
|
|||||||
@@ -11,7 +11,7 @@ from models.queue import ProcessingQueue
|
|||||||
logger = setup_logger("queue_consumer")
|
logger = setup_logger("queue_consumer")
|
||||||
|
|
||||||
# stage별 배치 크기
|
# stage별 배치 크기
|
||||||
BATCH_SIZE = {"extract": 5, "classify": 3, "embed": 1, "preview": 2}
|
BATCH_SIZE = {"extract": 5, "classify": 3, "summarize": 3, "embed": 1, "preview": 2}
|
||||||
STALE_THRESHOLD_MINUTES = 10
|
STALE_THRESHOLD_MINUTES = 10
|
||||||
|
|
||||||
|
|
||||||
@@ -65,10 +65,12 @@ async def consume_queue():
|
|||||||
from workers.embed_worker import process as embed_process
|
from workers.embed_worker import process as embed_process
|
||||||
from workers.extract_worker import process as extract_process
|
from workers.extract_worker import process as extract_process
|
||||||
from workers.preview_worker import process as preview_process
|
from workers.preview_worker import process as preview_process
|
||||||
|
from workers.summarize_worker import process as summarize_process
|
||||||
|
|
||||||
workers = {
|
workers = {
|
||||||
"extract": extract_process,
|
"extract": extract_process,
|
||||||
"classify": classify_process,
|
"classify": classify_process,
|
||||||
|
"summarize": summarize_process,
|
||||||
"embed": embed_process,
|
"embed": embed_process,
|
||||||
"preview": preview_process,
|
"preview": preview_process,
|
||||||
}
|
}
|
||||||
|
|||||||
35
app/workers/summarize_worker.py
Normal file
35
app/workers/summarize_worker.py
Normal file
@@ -0,0 +1,35 @@
|
|||||||
|
"""요약 전용 워커 — 뉴스 등 classify 불필요한 문서의 AI 요약만 생성"""
|
||||||
|
|
||||||
|
from datetime import datetime, timezone
|
||||||
|
|
||||||
|
from sqlalchemy.ext.asyncio import AsyncSession
|
||||||
|
|
||||||
|
from ai.client import AIClient, strip_thinking
|
||||||
|
from core.utils import setup_logger
|
||||||
|
from models.document import Document
|
||||||
|
|
||||||
|
logger = setup_logger("summarize_worker")
|
||||||
|
|
||||||
|
|
||||||
|
async def process(document_id: int, session: AsyncSession) -> None:
    """Generate an AI summary for a document, with no classification step.

    Dedicated worker stage for documents (e.g. news articles) whose
    source/category is already fixed, so only the summary is produced.

    Args:
        document_id: Primary key of the target ``Document`` row.
        session: Async DB session; the caller owns commit/rollback.

    Raises:
        ValueError: If the document does not exist or has no extracted text.
    """
    doc = await session.get(Document, document_id)
    if doc is None:
        raise ValueError(f"문서 ID {document_id}를 찾을 수 없음")
    if not doc.extracted_text:
        raise ValueError(f"문서 ID {document_id}: extracted_text가 비어있음")

    # Idempotency guard: a document that already carries a summary is skipped.
    if doc.ai_summary:
        logger.info(f"[요약] document_id={document_id}: 이미 요약 있음, skip")
        return

    client = AIClient()
    try:
        # Truncate input to 15,000 chars — presumably a model context-size
        # guard; confirm against AIClient.summarize's actual limit.
        raw_summary = await client.summarize(doc.extracted_text[:15000])
        # Strip any thinking/chain-of-thought markup before persisting.
        doc.ai_summary = strip_thinking(raw_summary)
        doc.ai_model_version = "qwen3.5-35b-a3b"
        doc.ai_processed_at = datetime.now(timezone.utc)
        logger.info(f"[요약] document_id={document_id}: {len(doc.ai_summary)}자")
    finally:
        # Always release the AI client, even when summarization fails.
        await client.close()
|
||||||
Reference in New Issue
Block a user