diff --git a/scripts/migrate_from_devonthink.py b/scripts/migrate_from_devonthink.py index 567580a..917ef76 100644 --- a/scripts/migrate_from_devonthink.py +++ b/scripts/migrate_from_devonthink.py @@ -1,16 +1,16 @@ """DEVONthink → NAS PKM 마이그레이션 스크립트 -DEVONthink에서 "파일 및 폴더" 내보내기 한 디렉토리를 스캔하여 +.dtBase2 번들의 Files.noindex/ 디렉토리에서 직접 파일을 추출하여 NAS PKM 폴더 구조로 복사하고 DB에 등록합니다. 사용법: # Dry-run (실제 복사/DB 등록 없이 시뮬레이션) - python scripts/migrate_from_devonthink.py --source-dir /path/to/export --dry-run + python scripts/migrate_from_devonthink.py --source-dir ~/Documents/Databases --dry-run # 실제 실행 python scripts/migrate_from_devonthink.py \ - --source-dir /path/to/export \ - --target-dir /documents/PKM \ + --source-dir ~/Documents/Databases \ + --target-dir /mnt/nas/Document_Server \ --database-url postgresql+asyncpg://pkm:PASSWORD@localhost:15432/pkm """ @@ -46,44 +46,26 @@ FOLDER_MAPPING = { "99_Home File": "PKM/References", "Archive": "PKM/Archive", "Projects": "PKM/Knowledge", - # 아래는 별도 처리 또는 스킵 "99_Technicalkorea": "Technicalkorea", - "98_명일방주 엔드필드": None, # 스킵 + # 스킵 대상 + "98_명일방주 엔드필드": None, } -# 무시할 파일/디렉토리 패턴 -SKIP_NAMES = {".DS_Store", "._*", "Thumbs.db", "Icon\r", "Icon"} -SKIP_EXTENSIONS = {".dtMeta", ".dtBase2", ".sparseimage"} +# 무시할 파일 +SKIP_NAMES = {".DS_Store", "Thumbs.db", "desktop.ini", "Icon\r", "Icon"} +SKIP_EXTENSIONS = {".dtMeta", ".dtBase2", ".sparseimage", ".dtStore", ".dtCloud"} def should_skip(path: Path) -> bool: - """스킵해야 할 파일인지 확인""" if path.name in SKIP_NAMES or path.name.startswith("._"): return True if path.suffix.lower() in SKIP_EXTENSIONS: return True + if path.stat().st_size == 0: + return True return False -def resolve_target(source_file: Path, source_root: Path) -> str | None: - """소스 파일의 NAS 대상 경로 결정 (NAS 루트 기준 상대 경로)""" - relative = source_file.relative_to(source_root) - parts = relative.parts - - # 첫 번째 디렉토리가 DEVONthink DB 이름 - if not parts: - return None - db_name = parts[0] - - target_prefix = FOLDER_MAPPING.get(db_name) - if target_prefix is None: - return None # 스킵 대상 - - # 나머지 경로 조합 - sub_path = Path(*parts[1:]) if len(parts) > 1 else Path(source_file.name) - return str(Path(target_prefix) / sub_path) - - async def migrate( source_dir: str, target_dir: str, @@ -100,65 +82,90 @@ async def migrate( return engine = create_async_engine(database_url) - async_session = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) + async_session_factory = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False) stats = {"total": 0, "copied": 0, "skipped": 0, "duplicates": 0, "errors": 0} batch = [] - # 모든 파일 수집 - files = [f for f in source.rglob("*") if f.is_file() and not should_skip(f)] - logger.info(f"스캔 완료: {len(files)}개 파일 발견") + # .dtBase2 번들 탐색 + for dtbase in sorted(source.glob("*.dtBase2")): + db_name = dtbase.stem # "04_Industrial safety" + target_prefix = FOLDER_MAPPING.get(db_name) - for source_file in files: - stats["total"] += 1 - target_rel = resolve_target(source_file, source) - - if target_rel is None: - stats["skipped"] += 1 + if target_prefix is None: + logger.info(f"[스킵] {db_name} (매핑: None)") continue - target_path = target / target_rel - ext = source_file.suffix.lstrip(".").lower() or "unknown" - fhash = file_hash(source_file) - fsize = source_file.stat().st_size - - if dry_run: - logger.info(f"[DRY-RUN] {source_file.name} → {target_rel}") - stats["copied"] += 1 + files_dir = dtbase / "Files.noindex" + if not files_dir.exists(): + logger.warning(f"[스킵] {db_name}: Files.noindex 없음") continue - # 파일 복사 - target_path.parent.mkdir(parents=True, exist_ok=True) - if not target_path.exists(): - shutil.copy2(source_file, target_path) + logger.info(f"[DB] {db_name} → {target_prefix}") - # DB 등록 배치에 추가 - batch.append({ - "file_path": target_rel, - "file_hash": fhash, - "file_format": ext, - "file_size": fsize, - "file_type": "immutable", - "import_source": f"devonthink:{source_file.relative_to(source).parts[0]}", - "title": source_file.stem, - "source_channel": "manual", - }) - stats["copied"] += 1 + # Files.noindex 하위의 모든 파일 (format/hash/filename.ext 구조) + files = [f for f in files_dir.rglob("*") if f.is_file() and not should_skip(f)] - # 배치 커밋 - if len(batch) >= batch_size: - dups = await _insert_batch(async_session, batch) - stats["duplicates"] += dups - batch.clear() + for source_file in files: + stats["total"] += 1 + + # 대상 경로: PKM/{domain}/{파일명} + dest_rel = f"{target_prefix}/{source_file.name}" + dest_path = target / dest_rel + + if dry_run: + logger.info(f"[DRY-RUN] {source_file.name} → {dest_rel}") + stats["copied"] += 1 + continue + + try: + # 파일 복사 + dest_path.parent.mkdir(parents=True, exist_ok=True) + + # 중복 파일명 처리 + counter = 1 + stem, suffix = dest_path.stem, dest_path.suffix + while dest_path.exists(): + dest_path = dest_path.parent / f"{stem}_{counter}{suffix}" + dest_rel = str(dest_path.relative_to(target)) + counter += 1 + + shutil.copy2(source_file, dest_path) + + ext = source_file.suffix.lstrip(".").lower() or "unknown" + fhash = file_hash(dest_path) + fsize = dest_path.stat().st_size + + batch.append({ + "file_path": dest_rel, + "file_hash": fhash, + "file_format": ext, + "file_size": fsize, + "file_type": "immutable", + "import_source": f"devonthink:{db_name}", + "title": source_file.stem, + "source_channel": "manual", + }) + stats["copied"] += 1 + + except Exception as e: + logger.error(f"[오류] {source_file}: {e}") + stats["errors"] += 1 + + # 배치 커밋 + if len(batch) >= batch_size: + dups = await _insert_batch(async_session_factory, batch) + stats["duplicates"] += dups + batch.clear() + logger.info(f" 진행: {stats['copied']}건 처리됨") # 남은 배치 처리 if batch and not dry_run: - dups = await _insert_batch(async_session, batch) + dups = await _insert_batch(async_session_factory, batch) stats["duplicates"] += dups await engine.dispose() - # 결과 출력 logger.info("=" * 50) logger.info(f"마이그레이션 {'시뮬레이션' if dry_run else '완료'}") logger.info(f" 전체 파일: {stats['total']}") @@ -169,12 +176,11 @@ async def migrate( async def _insert_batch(async_session_factory, batch: list[dict]) -> int: - """배치 단위로 documents + processing_queue 삽입, 중복 수 반환""" + """배치 단위로 documents + processing_queue 삽입""" duplicates = 0 async with async_session_factory() as session: for item in batch: try: - # documents 삽입 result = await session.execute( text(""" INSERT INTO documents (file_path, file_hash, file_format, file_size, @@ -192,8 +198,6 @@ async def _insert_batch(async_session_factory, batch: list[dict]) -> int: continue doc_id = row[0] - - # processing_queue에 extract 등록 await session.execute( text(""" INSERT INTO processing_queue (document_id, stage, status) @@ -211,8 +215,8 @@ async def _insert_batch(async_session_factory, batch: list[dict]) -> int: if __name__ == "__main__": parser = argparse.ArgumentParser(description="DEVONthink → NAS PKM 마이그레이션") - parser.add_argument("--source-dir", required=True, help="DEVONthink 내보내기 디렉토리") - parser.add_argument("--target-dir", default="/documents/PKM", help="NAS PKM 루트 경로") + parser.add_argument("--source-dir", required=True, help="DEVONthink Databases 디렉토리") + parser.add_argument("--target-dir", default="/mnt/nas/Document_Server", help="NAS 루트 경로") parser.add_argument( "--database-url", default="postgresql+asyncpg://pkm:pkm@localhost:15432/pkm",