refactor: preview 병렬 트리거 + 파일 이동 제거 + domain 색상 바

- queue_consumer: extract 완료 시 classify + preview 동시 등록
- classify_worker: _move_to_knowledge() 제거, 파일 원본 위치 유지
- DocumentCard: 좌측 domain별 색상 바 (4px) 추가

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Hyungi Ahn
2026-04-03 12:31:57 +09:00
parent 47e9981660
commit 6893ea132d
3 changed files with 38 additions and 51 deletions

View File

@@ -1,6 +1,5 @@
"""AI 분류 워커 — Qwen3.5로 도메인/태그/요약 생성 + Inbox→Knowledge 이동"""
import shutil
from datetime import datetime, timezone
from pathlib import Path
@@ -70,9 +69,7 @@ async def process(document_id: int, session: AsyncSession) -> None:
doc.ai_model_version = "qwen3.5-35b-a3b"
doc.ai_processed_at = datetime.now(timezone.utc)
# ─── Inbox → Knowledge 폴더 이동 ───
if doc.file_path.startswith("PKM/Inbox/") and domain:
_move_to_knowledge(doc, domain)
# 파일은 원본 위치 유지 (물리 이동 없음, DB 메타데이터만 관리)
logger.info(
f"[분류] document_id={document_id}: "
@@ -83,33 +80,4 @@ async def process(document_id: int, session: AsyncSession) -> None:
await client.close()
def _move_to_knowledge(doc: Document, domain: str):
"""분류 완료 후 Inbox에서 Knowledge 폴더로 파일 이동"""
nas_root = Path(settings.nas_mount_path)
src = nas_root / doc.file_path
if not src.exists():
logger.warning(f"[이동] 원본 파일 없음: {src}")
return
# 대상 경로: PKM/{domain}/{파일명}
sub_group = doc.ai_sub_group
if sub_group:
new_rel = f"PKM/{domain}/{sub_group}/{src.name}"
else:
new_rel = f"PKM/{domain}/{src.name}"
dst = nas_root / new_rel
dst.parent.mkdir(parents=True, exist_ok=True)
# 중복 파일명 처리
counter = 1
stem, suffix = dst.stem, dst.suffix
while dst.exists():
dst = dst.parent / f"{stem}_{counter}{suffix}"
new_rel = str(dst.relative_to(nas_root))
counter += 1
shutil.move(str(src), str(dst))
doc.file_path = new_rel
logger.info(f"[이동] {doc.file_path}{new_rel}")
# _move_to_knowledge 제거됨 — 파일은 원본 위치 유지, 분류는 DB 메타데이터만

View File

@@ -34,27 +34,28 @@ async def reset_stale_items():
async def enqueue_next_stage(document_id: int, current_stage: str):
"""현재 stage 완료 후 다음 stage를 pending으로 등록"""
next_stages = {"extract": "classify", "classify": "embed", "embed": "preview"}
next_stage = next_stages.get(current_stage)
if not next_stage:
next_stages = {"extract": ["classify", "preview"], "classify": ["embed"]}
stages = next_stages.get(current_stage, [])
if not stages:
return
async with async_session() as session:
existing = await session.execute(
select(ProcessingQueue).where(
ProcessingQueue.document_id == document_id,
ProcessingQueue.stage == next_stage,
ProcessingQueue.status.in_(["pending", "processing"]),
for next_stage in stages:
existing = await session.execute(
select(ProcessingQueue).where(
ProcessingQueue.document_id == document_id,
ProcessingQueue.stage == next_stage,
ProcessingQueue.status.in_(["pending", "processing"]),
)
)
)
if existing.scalar_one_or_none():
return
if existing.scalar_one_or_none():
continue
session.add(ProcessingQueue(
document_id=document_id,
stage=next_stage,
status="pending",
))
session.add(ProcessingQueue(
document_id=document_id,
stage=next_stage,
status="pending",
))
await session.commit()