"""DEVONthink → NAS PKM 마이그레이션 스크립트 DEVONthink에서 "파일 및 폴더" 내보내기 한 디렉토리를 스캔하여 NAS PKM 폴더 구조로 복사하고 DB에 등록합니다. 사용법: # Dry-run (실제 복사/DB 등록 없이 시뮬레이션) python scripts/migrate_from_devonthink.py --source-dir /path/to/export --dry-run # 실제 실행 python scripts/migrate_from_devonthink.py \ --source-dir /path/to/export \ --target-dir /documents/PKM \ --database-url postgresql+asyncpg://pkm:PASSWORD@localhost:15432/pkm """ import argparse import asyncio import os import shutil import sys from pathlib import Path sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "app")) from sqlalchemy import text from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine from core.utils import file_hash, setup_logger logger = setup_logger("migrate") # ─── DEVONthink DB → NAS PKM 폴더 매핑 ─── FOLDER_MAPPING = { "00_Inbox DB": "PKM/Inbox", "Inbox": "PKM/Inbox", "00_Note_BOX": "PKM/Knowledge", "01_Philosophie": "PKM/Knowledge/Philosophy", "02_Language": "PKM/Knowledge/Language", "03_Engineering": "PKM/Knowledge/Engineering", "04_Industrial safety": "PKM/Knowledge/Industrial_Safety", "05_Programming": "PKM/Knowledge/Programming", "07_General Book": "PKM/Knowledge/General", "97_Production drawing": "PKM/References", "99_Reference Data": "PKM/References", "99_Home File": "PKM/References", "Archive": "PKM/Archive", "Projects": "PKM/Knowledge", # 아래는 별도 처리 또는 스킵 "99_Technicalkorea": "Technicalkorea", "98_명일방주 엔드필드": None, # 스킵 } # 무시할 파일/디렉토리 패턴 SKIP_NAMES = {".DS_Store", "._*", "Thumbs.db", "Icon\r", "Icon"} SKIP_EXTENSIONS = {".dtMeta", ".dtBase2", ".sparseimage"} def should_skip(path: Path) -> bool: """스킵해야 할 파일인지 확인""" if path.name in SKIP_NAMES or path.name.startswith("._"): return True if path.suffix.lower() in SKIP_EXTENSIONS: return True return False def resolve_target(source_file: Path, source_root: Path) -> str | None: """소스 파일의 NAS 대상 경로 결정 (NAS 루트 기준 상대 경로)""" relative = source_file.relative_to(source_root) parts = relative.parts # 첫 번째 디렉토리가 DEVONthink DB 이름 if not parts: return None db_name = parts[0] target_prefix = FOLDER_MAPPING.get(db_name) if target_prefix is None: return None # 스킵 대상 # 나머지 경로 조합 sub_path = Path(*parts[1:]) if len(parts) > 1 else Path(source_file.name) return str(Path(target_prefix) / sub_path) async def migrate( source_dir: str, target_dir: str, database_url: str, dry_run: bool = False, batch_size: int = 100, ): """마이그레이션 실행""" source = Path(source_dir) target = Path(target_dir) if not source.exists(): logger.error(f"소스 디렉토리 없음: {source}") return engine = create_async_engine(database_url) async_session = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) stats = {"total": 0, "copied": 0, "skipped": 0, "duplicates": 0, "errors": 0} batch = [] # 모든 파일 수집 files = [f for f in source.rglob("*") if f.is_file() and not should_skip(f)] logger.info(f"스캔 완료: {len(files)}개 파일 발견") for source_file in files: stats["total"] += 1 target_rel = resolve_target(source_file, source) if target_rel is None: stats["skipped"] += 1 continue target_path = target / target_rel ext = source_file.suffix.lstrip(".").lower() or "unknown" fhash = file_hash(source_file) fsize = source_file.stat().st_size if dry_run: logger.info(f"[DRY-RUN] {source_file.name} → {target_rel}") stats["copied"] += 1 continue # 파일 복사 target_path.parent.mkdir(parents=True, exist_ok=True) if not target_path.exists(): shutil.copy2(source_file, target_path) # DB 등록 배치에 추가 batch.append({ "file_path": target_rel, "file_hash": fhash, "file_format": ext, "file_size": fsize, "file_type": "immutable", "import_source": f"devonthink:{source_file.relative_to(source).parts[0]}", "title": source_file.stem, "source_channel": "manual", }) stats["copied"] += 1 # 배치 커밋 if len(batch) >= batch_size: dups = await _insert_batch(async_session, batch) stats["duplicates"] += dups batch.clear() # 남은 배치 처리 if batch and not dry_run: dups = await _insert_batch(async_session, batch) stats["duplicates"] += dups await engine.dispose() # 결과 출력 logger.info("=" * 50) logger.info(f"마이그레이션 {'시뮬레이션' if dry_run else '완료'}") logger.info(f" 전체 파일: {stats['total']}") logger.info(f" 복사/등록: {stats['copied']}") logger.info(f" 스킵: {stats['skipped']}") logger.info(f" 중복: {stats['duplicates']}") logger.info(f" 오류: {stats['errors']}") async def _insert_batch(async_session_factory, batch: list[dict]) -> int: """배치 단위로 documents + processing_queue 삽입, 중복 수 반환""" duplicates = 0 async with async_session_factory() as session: for item in batch: try: # documents 삽입 result = await session.execute( text(""" INSERT INTO documents (file_path, file_hash, file_format, file_size, file_type, import_source, title, source_channel) VALUES (:file_path, :file_hash, :file_format, :file_size, :file_type, :import_source, :title, :source_channel) ON CONFLICT (file_path) DO NOTHING RETURNING id """), item, ) row = result.fetchone() if row is None: duplicates += 1 continue doc_id = row[0] # processing_queue에 extract 등록 await session.execute( text(""" INSERT INTO processing_queue (document_id, stage, status) VALUES (:doc_id, 'extract', 'pending') ON CONFLICT DO NOTHING """), {"doc_id": doc_id}, ) except Exception as e: logger.error(f"등록 실패: {item['file_path']}: {e}") await session.commit() return duplicates if __name__ == "__main__": parser = argparse.ArgumentParser(description="DEVONthink → NAS PKM 마이그레이션") parser.add_argument("--source-dir", required=True, help="DEVONthink 내보내기 디렉토리") parser.add_argument("--target-dir", default="/documents/PKM", help="NAS PKM 루트 경로") parser.add_argument( "--database-url", default="postgresql+asyncpg://pkm:pkm@localhost:15432/pkm", help="PostgreSQL 연결 URL", ) parser.add_argument("--dry-run", action="store_true", help="시뮬레이션만 실행") parser.add_argument("--batch-size", type=int, default=100, help="배치 커밋 크기") args = parser.parse_args() asyncio.run(migrate( source_dir=args.source_dir, target_dir=args.target_dir, database_url=args.database_url, dry_run=args.dry_run, batch_size=args.batch_size, ))