Files
hyungi_document_server/app/workers/classify_worker.py
Hyungi Ahn 24142ea605 fix: Codex 리뷰 5건 수정 (critical 1 + high 4)
1. [critical] config.yaml → settings 객체에서 taxonomy 로드 (import crash 방지)
2. [high] ODF 변환: file_path 유지, derived_path 별도 필드 (무한 중복 방지)
3. [high] 법령 분할: 첫 장 이전 조문을 "서문"으로 보존
4. [high] Inbox: review_status 필드 분리 (pending/approved/rejected)
5. [high] 삭제: soft-delete (deleted_at) + worker 방어 + active_documents 뷰
   - 모든 조회에 deleted_at IS NULL 일관 적용
   - queue_consumer: row 없으면 gracefully skip

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-06 07:15:13 +09:00

128 lines
4.4 KiB
Python

"""AI 분류 워커 — taxonomy 기반 도메인/문서타입/태그/요약 생성"""
from datetime import datetime, timezone
from sqlalchemy.ext.asyncio import AsyncSession
from ai.client import AIClient, parse_json_response, strip_thinking
from core.config import settings
from core.utils import setup_logger
from models.document import Document
logger = setup_logger("classify_worker")
MAX_CLASSIFY_TEXT = 8000
# settings에서 taxonomy/document_types 로딩
DOCUMENT_TYPES = set(settings.document_types)
def _get_taxonomy_leaf_paths(taxonomy: dict, prefix: str = "") -> set[str]:
"""taxonomy dict에서 모든 유효한 경로를 추출"""
paths = set()
for key, value in taxonomy.items():
current = f"{prefix}/{key}" if prefix else key
if isinstance(value, dict):
if not value:
paths.add(current)
else:
paths.update(_get_taxonomy_leaf_paths(value, current))
elif isinstance(value, list):
if not value:
paths.add(current)
else:
for leaf in value:
paths.add(f"{current}/{leaf}")
paths.add(current) # 2단계도 허용 (leaf가 없는 경우용)
else:
paths.add(current)
return paths
VALID_DOMAIN_PATHS = _get_taxonomy_leaf_paths(settings.taxonomy)
def _validate_domain(domain: str) -> str:
"""domain이 taxonomy에 존재하는지 검증, 없으면 최대한 가까운 경로 찾기"""
if domain in VALID_DOMAIN_PATHS:
return domain
# 부분 매칭 시도 (2단계까지)
parts = domain.split("/")
for i in range(len(parts), 0, -1):
partial = "/".join(parts[:i])
if partial in VALID_DOMAIN_PATHS:
logger.warning(f"[분류] domain '{domain}''{partial}' (부분 매칭)")
return partial
logger.warning(f"[분류] domain '{domain}' taxonomy에 없음, General/Reading_Notes로 대체")
return "General/Reading_Notes"
async def process(document_id: int, session: AsyncSession) -> None:
"""문서 AI 분류 + 요약"""
doc = await session.get(Document, document_id)
if not doc:
raise ValueError(f"문서 ID {document_id}를 찾을 수 없음")
if not doc.extracted_text:
raise ValueError(f"문서 ID {document_id}: extracted_text가 비어있음")
client = AIClient()
try:
# ─── 분류 ───
truncated = doc.extracted_text[:MAX_CLASSIFY_TEXT]
raw_response = await client.classify(truncated)
parsed = parse_json_response(raw_response)
if not parsed:
raise ValueError(f"AI 응답에서 JSON 추출 실패: {raw_response[:200]}")
# domain 검증
domain = _validate_domain(parsed.get("domain", ""))
doc.ai_domain = domain
# sub_group은 domain 경로에서 추출 (호환성)
parts = domain.split("/")
doc.ai_sub_group = parts[1] if len(parts) > 1 else ""
# document_type 검증
doc_type = parsed.get("document_type", "")
doc.document_type = doc_type if doc_type in DOCUMENT_TYPES else "Note"
# confidence
confidence = parsed.get("confidence", 0.5)
doc.ai_confidence = max(0.0, min(1.0, float(confidence)))
# importance
importance = parsed.get("importance", "medium")
doc.importance = importance if importance in ("high", "medium", "low") else "medium"
# tags
doc.ai_tags = parsed.get("tags", [])[:5]
# source/origin
if parsed.get("sourceChannel") and not doc.source_channel:
doc.source_channel = parsed["sourceChannel"]
if parsed.get("dataOrigin") and not doc.data_origin:
doc.data_origin = parsed["dataOrigin"]
# ─── 요약 ───
summary = await client.summarize(doc.extracted_text[:15000])
doc.ai_summary = strip_thinking(summary)
# ─── 메타데이터 ───
doc.ai_model_version = "qwen3.5-35b-a3b"
doc.ai_processed_at = datetime.now(timezone.utc)
logger.info(
f"[분류] document_id={document_id}: "
f"domain={domain}, type={doc.document_type}, "
f"confidence={doc.ai_confidence:.2f}, tags={doc.ai_tags}"
)
finally:
await client.close()
# _move_to_knowledge 제거됨 — 파일은 원본 위치 유지, 분류는 DB 메타데이터만