"""AI classification worker.

Uses the taxonomy from config.yaml to assign a domain, document type, tags,
importance and a summary to a document, storing the results on the
``Document`` row.
"""

import math
from datetime import datetime, timezone
from pathlib import Path

import yaml
from sqlalchemy.ext.asyncio import AsyncSession

from ai.client import AIClient, parse_json_response, strip_thinking
from core.config import settings
from core.utils import setup_logger
from models.document import Document

logger = setup_logger("classify_worker")

# Maximum number of characters of extracted text sent to each AI call.
MAX_CLASSIFY_TEXT = 8000
MAX_SUMMARY_TEXT = 15000

# Maximum number of AI-generated tags stored per document.
MAX_TAGS = 5

# Values used when the model omits a field or returns something unusable.
DEFAULT_CONFIDENCE = 0.5
DEFAULT_DOCUMENT_TYPE = "Note"
DEFAULT_IMPORTANCE = "medium"
VALID_IMPORTANCE = ("high", "medium", "low")
FALLBACK_DOMAIN = "General/Reading_Notes"

# Identifier recorded in Document.ai_model_version.
AI_MODEL_VERSION = "qwen3.5-35b-a3b"

# Load the taxonomy and the allowed document types from config.yaml.
# yaml.safe_load returns None for an empty file, and a key that is present
# without a value is also None, hence the `or` fallbacks.
_config_path = Path(__file__).resolve().parent.parent / "config.yaml"
_config = yaml.safe_load(_config_path.read_text(encoding="utf-8")) or {}
DOCUMENT_TYPES = set(_config.get("document_types") or [])


def _get_taxonomy_leaf_paths(taxonomy: dict, prefix: str = "") -> set[str]:
    """Collect every valid domain path from a (nested) taxonomy mapping.

    Args:
        taxonomy: Mapping of category name to a nested dict, a list of leaf
            names, or any other value.
        prefix: Slash-joined path of the parent categories ("" at the root).

    Returns:
        A set of slash-joined paths. A non-empty dict contributes only the
        paths of its children; a non-empty list contributes each
        ``current/leaf`` path plus ``current`` itself; anything else
        (including an empty dict or list) contributes ``current``.
    """
    paths: set[str] = set()
    for key, value in taxonomy.items():
        current = f"{prefix}/{key}" if prefix else key
        if isinstance(value, dict) and value:
            paths.update(_get_taxonomy_leaf_paths(value, current))
        elif isinstance(value, list) and value:
            # The parent of a leaf list is itself a valid path, so the model
            # may stop one level early when no leaf fits.
            paths.add(current)
            paths.update(f"{current}/{leaf}" for leaf in value)
        else:
            paths.add(current)
    return paths


VALID_DOMAIN_PATHS = _get_taxonomy_leaf_paths(_config.get("taxonomy") or {})


def _validate_domain(domain: str) -> str:
    """Map a model-provided domain onto a path that exists in the taxonomy.

    Args:
        domain: Domain path returned by the model. Non-string values are
            tolerated and treated as "not found".

    Returns:
        The domain itself (with surrounding whitespace and slashes removed)
        when it is a known path, otherwise the longest leading sub-path that
        is known, otherwise ``FALLBACK_DOMAIN``.
    """
    if isinstance(domain, str):
        candidate = domain.strip().strip("/")
        if candidate in VALID_DOMAIN_PATHS:
            return candidate

        # Try progressively shorter prefixes of the path.
        parts = candidate.split("/")
        for depth in range(len(parts) - 1, 0, -1):
            partial = "/".join(parts[:depth])
            if partial in VALID_DOMAIN_PATHS:
                logger.warning(f"[분류] domain '{domain}' → '{partial}' (부분 매칭)")
                return partial

    logger.warning(f"[분류] domain '{domain}' taxonomy에 없음, {FALLBACK_DOMAIN}로 대체")
    return FALLBACK_DOMAIN


def _coerce_confidence(value: object, default: float = DEFAULT_CONFIDENCE) -> float:
    """Convert a model-provided confidence into a float clamped to [0.0, 1.0].

    Returns ``default`` when the value is missing, not convertible to float,
    or NaN (which would otherwise slip through the clamp as 1.0).
    """
    try:
        score = float(value)
    except (TypeError, ValueError):
        return default
    if math.isnan(score):
        return default
    return max(0.0, min(1.0, score))


def _clean_tags(value: object, limit: int = MAX_TAGS) -> list[str]:
    """Return at most ``limit`` non-blank string tags from a model response.

    Anything that is not a list or tuple yields an empty list; in particular
    a bare string is not sliced into characters.
    """
    if not isinstance(value, (list, tuple)):
        return []
    tags = [tag.strip() for tag in value if isinstance(tag, str) and tag.strip()]
    return tags[:limit]


async def process(document_id: int, session: AsyncSession) -> None:
    """Classify and summarize one document with the AI client.

    Loads the document, sends its (truncated) extracted text to the
    classifier and the summarizer, and writes the validated results onto the
    ``Document`` object. The function does not commit the session itself.

    Args:
        document_id: Primary key of the document to process.
        session: Async SQLAlchemy session used to load the document.

    Raises:
        ValueError: If the document does not exist, has no extracted text, or
            the classifier response does not contain a JSON object.
    """
    doc = await session.get(Document, document_id)
    if not doc:
        raise ValueError(f"문서 ID {document_id}를 찾을 수 없음")

    if not doc.extracted_text:
        raise ValueError(f"문서 ID {document_id}: extracted_text가 비어있음")

    client = AIClient()
    try:
        # --- Classification ---
        truncated = doc.extracted_text[:MAX_CLASSIFY_TEXT]
        raw_response = await client.classify(truncated)
        parsed = parse_json_response(raw_response)
        # Every field below is read with .get(), so a non-dict payload
        # (e.g. a JSON list) is rejected up front.
        if not parsed or not isinstance(parsed, dict):
            raise ValueError(f"AI 응답에서 JSON 추출 실패: {str(raw_response)[:200]}")

        # Domain, validated against the taxonomy.
        domain = _validate_domain(parsed.get("domain", ""))
        doc.ai_domain = domain

        # sub_group is the second path segment of the domain (kept for
        # compatibility).
        parts = domain.split("/")
        doc.ai_sub_group = parts[1] if len(parts) > 1 else ""

        # Document type; the isinstance guard keeps unhashable values out of
        # the set membership test.
        doc_type = parsed.get("document_type", "")
        if isinstance(doc_type, str) and doc_type in DOCUMENT_TYPES:
            doc.document_type = doc_type
        else:
            doc.document_type = DEFAULT_DOCUMENT_TYPE

        # Confidence, clamped to [0, 1].
        doc.ai_confidence = _coerce_confidence(parsed.get("confidence"))

        # Importance.
        importance = parsed.get("importance", DEFAULT_IMPORTANCE)
        doc.importance = (
            importance if importance in VALID_IMPORTANCE else DEFAULT_IMPORTANCE
        )

        # Tags.
        doc.ai_tags = _clean_tags(parsed.get("tags"))

        # Source channel / data origin: only fill in values that are not
        # already set on the document.
        if parsed.get("sourceChannel") and not doc.source_channel:
            doc.source_channel = parsed["sourceChannel"]
        if parsed.get("dataOrigin") and not doc.data_origin:
            doc.data_origin = parsed["dataOrigin"]

        # --- Summary ---
        summary = await client.summarize(doc.extracted_text[:MAX_SUMMARY_TEXT])
        doc.ai_summary = strip_thinking(summary)

        # --- Metadata ---
        doc.ai_model_version = AI_MODEL_VERSION
        doc.ai_processed_at = datetime.now(timezone.utc)

        logger.info(
            f"[분류] document_id={document_id}: "
            f"domain={domain}, type={doc.document_type}, "
            f"confidence={doc.ai_confidence:.2f}, tags={doc.ai_tags}"
        )
    finally:
        await client.close()


# _move_to_knowledge was removed: files stay at their original location and
# classification only updates DB metadata.