Files
hyungi_document_server/app/workers/classify_worker.py
T
Hyungi Ahn 8fdea88676 feat(documents): §1 category enum + ai_suggestion 승인 파이프
plan: ~/.claude/plans/luminous-sprouting-hamster.md §1

- migrations/143_category.sql: doc_category enum (6 활성 + 3 유보) +
  documents.category + documents.ai_suggestion JSONB + 2 idx.
- app/models/document.py: category (Enum, create_type=False), ai_suggestion (JSONB).
- app/prompts/classify.txt: document_type enum 에 7 실무 doctype 추가
  (발주서/세금계산서/명세표/도면/증명서/계획서/시방서) + facet_doctype
  필드 directive.
- config.yaml: document_types 에 7 항목 추가 (worker 검증 통과).
- app/workers/classify_worker.py: FACET_DOCTYPES / LIBRARY_SUGGESTION_DOCTYPES
  상수, facet_doctype 파싱(기존값 미덮어씀), 발주서/세금계산서/명세표
  감지 시 ai_suggestion={proposed_category=library, proposed_path=@library/
  거래/{YYYY}/{doctype}, source_updated_at=doc.updated_at.isoformat(), ...}.
  category / user_tags 자동 전이 금지 (suggestion-only).
- app/api/documents.py:
  · DocumentResponse 에 category / ai_suggestion 노출
  · GET /documents ?category=<cat> / ?has_suggestion / ?proposed_category
    (category 지정 시 기본 news/memo 제외 해제 — §2 승인 UI 계약)
  · GET /documents/library 를 Document.category=='library' 기반으로 재구현
    (path subquery 는 user_tags 유지 — 분류 내부 서가 경로)
  · POST /documents/{id}/accept-suggestion — FOR UPDATE + idempotent no-op +
    dual 409 stale (payload source_updated_at / documents.updated_at) +
    user_tags idempotent append
  · DELETE /documents/{id}/suggestion — idempotent, stale 검사 없음
- scripts/backfill_category.py: dry-run / apply. 매핑(news/memo/@library/else)
  + 3-way 상대 검증 (all_rows==categorized, uncategorized==0,
  cat_library==has_library_tag — 자동 전이 금지 정책 검증).

남은 DoD (원격 배포 후): docker compose up → migration 143 적용 → backfill
apply → smoke (drive_sync 발주서 업로드 suggestion 생성 / category 유지,
accept-suggestion idempotency + 409 stale 두 벡터, /documents?category=library
== /documents/library 건수 일치).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-23 15:32:01 +09:00

164 lines
6.4 KiB
Python

"""AI 분류 워커 — taxonomy 기반 도메인/문서타입/태그/요약 생성"""
from datetime import datetime, timezone
from sqlalchemy.ext.asyncio import AsyncSession
from ai.client import AIClient, parse_json_response, strip_thinking
from core.config import settings
from core.utils import setup_logger
from models.document import Document
logger = setup_logger("classify_worker")
MAX_CLASSIFY_TEXT = 8000
# settings에서 taxonomy/document_types 로딩
DOCUMENT_TYPES = set(settings.document_types)
# facet_doctype 허용값 (실무 문서 유형 — AI 식별 신호, library 자동 분류 제안 트리거)
FACET_DOCTYPES = {"발주서", "세금계산서", "명세표", "도면", "증명서", "계획서", "시방서"}
# 자료실 자동 분류 제안 대상 (거래 하위)
LIBRARY_SUGGESTION_DOCTYPES = {"발주서", "세금계산서", "명세표"}
def _get_taxonomy_leaf_paths(taxonomy: dict, prefix: str = "") -> set[str]:
"""taxonomy dict에서 모든 유효한 경로를 추출"""
paths = set()
for key, value in taxonomy.items():
current = f"{prefix}/{key}" if prefix else key
if isinstance(value, dict):
if not value:
paths.add(current)
else:
paths.update(_get_taxonomy_leaf_paths(value, current))
elif isinstance(value, list):
if not value:
paths.add(current)
else:
for leaf in value:
paths.add(f"{current}/{leaf}")
paths.add(current) # 2단계도 허용 (leaf가 없는 경우용)
else:
paths.add(current)
return paths
VALID_DOMAIN_PATHS = _get_taxonomy_leaf_paths(settings.taxonomy)
def _validate_domain(domain: str) -> str:
"""domain이 taxonomy에 존재하는지 검증, 없으면 최대한 가까운 경로 찾기"""
if domain in VALID_DOMAIN_PATHS:
return domain
# 부분 매칭 시도 (2단계까지)
parts = domain.split("/")
for i in range(len(parts), 0, -1):
partial = "/".join(parts[:i])
if partial in VALID_DOMAIN_PATHS:
logger.warning(f"[분류] domain '{domain}''{partial}' (부분 매칭)")
return partial
logger.warning(f"[분류] domain '{domain}' taxonomy에 없음, General/Reading_Notes로 대체")
return "General/Reading_Notes"
async def process(document_id: int, session: AsyncSession) -> None:
"""문서 AI 분류 + 요약"""
doc = await session.get(Document, document_id)
if not doc:
raise ValueError(f"문서 ID {document_id}를 찾을 수 없음")
if not doc.extracted_text:
raise ValueError(f"문서 ID {document_id}: extracted_text가 비어있음")
client = AIClient()
try:
# ─── 분류 ───
truncated = doc.extracted_text[:MAX_CLASSIFY_TEXT]
raw_response = await client.classify(truncated)
parsed = parse_json_response(raw_response)
if not parsed:
raise ValueError(f"AI 응답에서 JSON 추출 실패: {raw_response[:200]}")
# domain 검증
domain = _validate_domain(parsed.get("domain", ""))
doc.ai_domain = domain
# sub_group은 domain 경로에서 추출 (호환성)
parts = domain.split("/")
doc.ai_sub_group = parts[1] if len(parts) > 1 else ""
# document_type 검증
doc_type = parsed.get("document_type", "")
doc.document_type = doc_type if doc_type in DOCUMENT_TYPES else "Note"
# confidence
confidence = parsed.get("confidence", 0.5)
doc.ai_confidence = max(0.0, min(1.0, float(confidence)))
# importance
importance = parsed.get("importance", "medium")
doc.importance = importance if importance in ("high", "medium", "low") else "medium"
# tags
doc.ai_tags = parsed.get("tags", [])[:5]
# source/origin
if parsed.get("sourceChannel") and not doc.source_channel:
doc.source_channel = parsed["sourceChannel"]
if parsed.get("dataOrigin") and not doc.data_origin:
doc.data_origin = parsed["dataOrigin"]
# 용도 (AI는 빈 값만 채움 — 수동/업로드 명시값 우선)
if parsed.get("docPurpose") and not doc.doc_purpose:
purpose = parsed["docPurpose"]
if purpose in ("business", "knowledge"):
doc.doc_purpose = purpose
# ─── facet_doctype 식별 (§1 실무 문서 유형 신호) ───
# AI 식별값이 허용 enum 이면 facet_doctype 저장. 기존 값이 있으면 덮어쓰지 않음
# (수동 수정 / Phase 2 facet 우선). document.category / user_tags 는 **건드리지 않음**.
ai_doctype_raw = parsed.get("facet_doctype")
ai_doctype = ai_doctype_raw if ai_doctype_raw in FACET_DOCTYPES else None
if ai_doctype and not doc.facet_doctype:
doc.facet_doctype = ai_doctype
# ─── ai_suggestion 저장 (자료실 승인 대기함 제안, §1) ───
# 발주서/세금계산서/명세표 → 자료실 '거래' 분류 제안. 자동 전이 금지.
# /accept-suggestion 승인 UI 에서만 실제 category='library' + @library/... 부여.
if ai_doctype in LIBRARY_SUGGESTION_DOCTYPES:
year = doc.facet_year or datetime.now(timezone.utc).year
doc.ai_suggestion = {
"proposed_category": "library",
"proposed_path": f"@library/거래/{year}/{ai_doctype}",
"proposed_doctype": ai_doctype,
"confidence": doc.ai_confidence,
"source_updated_at": (
doc.updated_at.isoformat() if doc.updated_at else None
),
"reason": "classify pipeline",
}
# ─── 요약 ───
summary = await client.summarize(doc.extracted_text[:50000])
doc.ai_summary = strip_thinking(summary)
# ─── 메타데이터 ───
doc.ai_model_version = "qwen3.5-35b-a3b"
doc.ai_processed_at = datetime.now(timezone.utc)
logger.info(
f"[분류] document_id={document_id}: "
f"domain={domain}, type={doc.document_type}, "
f"confidence={doc.ai_confidence:.2f}, tags={doc.ai_tags}"
)
finally:
await client.close()
# _move_to_knowledge 제거됨 — 파일은 원본 위치 유지, 분류는 DB 메타데이터만