Files
hyungi_document_server/app/workers/classify_worker.py
Hyungi Ahn 299fac3904 feat: implement Phase 1 data pipeline and migration
- Implement kordoc /parse endpoint (HWP/HWPX/PDF via kordoc lib,
  text files direct read, images flagged for OCR)
- Add queue consumer with APScheduler (1min interval, stage chaining
  extract→classify→embed, stale item recovery, retry logic)
- Add extract worker (kordoc HTTP call + direct text read)
- Add classify worker (Qwen3.5 AI classification with think-tag
  stripping and robust JSON extraction from AI responses)
- Add embed worker (GPU server nomic-embed-text, graceful failure)
- Add DEVONthink migration script with folder mapping for 16 DBs,
  dry-run mode, batch commits, and idempotent file_path UNIQUE
- Enhance ai/client.py with strip_thinking() and parse_json_response()

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-02 14:35:36 +09:00

77 lines
2.5 KiB
Python

"""AI 분류 워커 — Qwen3.5로 도메인/태그/요약 생성"""
from datetime import datetime, timezone
from sqlalchemy.ext.asyncio import AsyncSession
from ai.client import AIClient, parse_json_response
from core.utils import setup_logger
from models.document import Document
# Module-level logger for this worker, created by the project's setup_logger
# helper under the name "classify_worker".
logger = setup_logger("classify_worker")
# Upper bound on the number of characters of extracted text sent for
# classification (keeps the Qwen3.5 context size manageable).
MAX_CLASSIFY_TEXT = 8_000

# Domains accepted from the classifier; process() replaces any other value
# with "Knowledge/General".
VALID_DOMAINS = {
    "Reference",
    "Knowledge/General",
    "Knowledge/Engineering",
    "Knowledge/Industrial_Safety",
    "Knowledge/Language",
    "Knowledge/Philosophy",
    "Knowledge/Programming",
}
async def process(document_id: int, session: AsyncSession) -> None:
    """Classify and summarize one document with the AI model.

    Loads the ``Document`` row, sends a truncated copy of its extracted text
    to ``AIClient.classify``, validates the parsed JSON, and assigns the
    domain, sub-group, tags, summary, model version and processing timestamp
    to the row. Only attributes are assigned here; nothing in this function
    commits the session (presumably the caller does -- confirm).

    Args:
        document_id: Primary key of the document to process.
        session: Async SQLAlchemy session used to load the document.

    Raises:
        ValueError: If the document does not exist, has no extracted text,
            or the AI response does not yield a non-empty JSON object.
    """
    doc = await session.get(Document, document_id)
    if not doc:
        raise ValueError(f"문서 ID {document_id}를 찾을 수 없음")
    if not doc.extracted_text:
        raise ValueError(f"문서 ID {document_id}: extracted_text가 비어있음")

    client = AIClient()
    try:
        # --- Classification ---
        truncated = doc.extracted_text[:MAX_CLASSIFY_TEXT]
        raw_response = await client.classify(truncated)
        parsed = parse_json_response(raw_response)
        # Everything below needs key lookup, so a JSON array or scalar is as
        # unusable as no JSON at all. str() keeps the error message buildable
        # even if the client returned a non-string (e.g. None).
        if not parsed or not isinstance(parsed, dict):
            raise ValueError(
                f"AI 응답에서 JSON 추출 실패: {str(raw_response)[:200]}"
            )

        # Validate the model output, then update the row.
        domain = parsed.get("domain", "")
        # The isinstance check must come first: an unhashable value (list or
        # dict) would make the set membership test raise TypeError.
        if not isinstance(domain, str) or domain not in VALID_DOMAINS:
            logger.warning(f"[분류] document_id={document_id}: 알 수 없는 도메인 '{domain}', Knowledge/General로 대체")
            domain = "Knowledge/General"

        doc.ai_domain = domain
        doc.ai_sub_group = parsed.get("sub_group", "")
        # Treat an explicit JSON null like a missing key.
        tags = parsed.get("tags", [])
        doc.ai_tags = [] if tags is None else tags

        # Only fill these in when the document does not already carry a value.
        if parsed.get("sourceChannel") and not doc.source_channel:
            doc.source_channel = parsed["sourceChannel"]
        if parsed.get("dataOrigin") and not doc.data_origin:
            doc.data_origin = parsed["dataOrigin"]

        # --- Summary ---
        # The summary uses a larger slice of the text (15000 characters)
        # than classification does.
        summary = await client.summarize(doc.extracted_text[:15000])
        doc.ai_summary = summary

        # --- Metadata ---
        doc.ai_model_version = "qwen3.5-35b-a3b"
        doc.ai_processed_at = datetime.now(timezone.utc)

        # Guard the length so an empty/None summary cannot fail the job
        # after all the real work has been done.
        summary_length = len(summary) if summary else 0
        logger.info(
            f"[분류] document_id={document_id}: "
            f"domain={domain}, tags={doc.ai_tags}, summary={summary_length}"
        )
    finally:
        # Always release the AI client, including on failure.
        await client.close()