From 7a8aced2a9b2fd383791ace474de6ceb63781259 Mon Sep 17 00:00:00 2001 From: hyungi Date: Mon, 29 Jun 2026 13:10:19 +0900 Subject: [PATCH] =?UTF-8?q?fix(workers):=20file=5Fwatcher=20=ED=8C=8C?= =?UTF-8?q?=EC=9D=BC=EB=B3=84=20=EC=84=B8=EC=85=98=20=EA=B2=A9=EB=A6=AC=20?= =?UTF-8?q?(=EC=82=AC=EC=9D=B4=ED=81=B4=20=EC=A0=84=EC=B2=B4=20=EB=A1=A4?= =?UTF-8?q?=EB=B0=B1=20=EB=B0=A9=EC=A7=80)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 스캔 전체(Web+PKM)가 단일 세션·단일 commit 이라 한 파일 예외(rglob↔stat 사이 삭제로 FileNotFoundError, flush 오류 등)가 watch_inbox 전체를 raise·롤백 → 그 사이클 등록분을 모두 잃거나, 결정적 poison 파일이 매 사이클 같은 지점에서 중단시켜 그 뒤 파일 영구 미등록. 파일별 독립 세션+commit + try/continue 격리 (news_collector/csb_collector 동형). file_hash 는 세션 밖에서 계산(커넥션 미점유), 무변경 파일은 쓰기/commit 없음. Co-Authored-By: Claude Opus 4.8 (1M context) --- app/workers/file_watcher.py | 178 +++++++++++++++++++----------------- 1 file changed, 96 insertions(+), 82 deletions(-) diff --git a/app/workers/file_watcher.py b/app/workers/file_watcher.py index a96e9df..3e9a4dc 100644 --- a/app/workers/file_watcher.py +++ b/app/workers/file_watcher.py @@ -251,104 +251,118 @@ async def watch_inbox(): for extra_path in settings.additional_watch_targets: targets.append((extra_path, "library")) - async with async_session() as session: - # ─── Web/ 트랙 (devonagent) — DEVONthink Smart Rule 이 떨군 .html 만 진입 ─── - if web_root.exists(): - # rglob NFS 디렉토리 walk(blocking stat 다발)를 off-thread 로 수집 (R5). - for file_path in await asyncio.to_thread(lambda: list(web_root.rglob("*.html"))): - if not file_path.is_file() or should_skip(file_path): - continue - rel_path = str(file_path.relative_to(nas_root)) - added, _ = await _ingest_web_file(session, file_path, rel_path) + # 파일별 독립 세션+commit 으로 격리 — 한 파일 실패(예: rglob↔stat 사이 삭제로 FileNotFoundError, + # flush 오류)가 watch_inbox 전체를 raise·롤백해 그 사이클 등록분을 모두 잃거나, 결정적 poison + # 파일이 매 사이클 같은 지점에서 중단시키는 것을 차단 (news_collector/csb_collector 와 동형). + # ─── Web/ 트랙 (devonagent) — DEVONthink Smart Rule 이 떨군 .html 만 진입 ─── + if web_root.exists(): + # rglob NFS 디렉토리 walk(blocking stat 다발)를 off-thread 로 수집 (R5). + for file_path in await asyncio.to_thread(lambda: list(web_root.rglob("*.html"))): + if not file_path.is_file() or should_skip(file_path): + continue + rel_path = str(file_path.relative_to(nas_root)) + try: + async with async_session() as session: + added, _ = await _ingest_web_file(session, file_path, rel_path) + await session.commit() new_count += added - - # ─── PKM 트랙 (기존 drive_sync) ───────────────────────────────────────── - for sub, expected_category in targets: - scan_root = pkm_root / sub - if not scan_root.exists(): + except Exception as e: + logger.warning("[Web] 파일 처리 실패 skip path=%s: %s", rel_path, e) continue - # 안전 자료실 A-2/B-4 — 타깃 폴더 기반 (material, jurisdiction, license) - target_mt, target_jur, target_license = _TARGET_AXIS.get( - Path(sub).name, (None, None, None) + # ─── PKM 트랙 (기존 drive_sync) ───────────────────────────────────────── + for sub, expected_category in targets: + scan_root = pkm_root / sub + if not scan_root.exists(): + continue + + # 안전 자료실 A-2/B-4 — 타깃 폴더 기반 (material, jurisdiction, license) + target_mt, target_jur, target_license = _TARGET_AXIS.get( + Path(sub).name, (None, None, None) + ) + + # NFS 디렉토리 walk(blocking) off-thread 수집 (R5). + for file_path in await asyncio.to_thread(lambda: list(scan_root.rglob("*"))): + if not file_path.is_file() or should_skip(file_path): + continue + + category, needs_conversion, next_stage = _route_media( + file_path, expected_category ) - # NFS 디렉토리 walk(blocking) off-thread 수집 (R5). - for file_path in await asyncio.to_thread(lambda: list(scan_root.rglob("*"))): - if not file_path.is_file() or should_skip(file_path): - continue + # audio/video 폴더에 엉뚱한 확장자가 들어왔거나 Inbox 에 + # audio/video 가 잘못 떨어진 경우 — 이 라운드에서 아예 skip + if category is None and next_stage is None: + continue - category, needs_conversion, next_stage = _route_media( - file_path, expected_category - ) - - # audio/video 폴더에 엉뚱한 확장자가 들어왔거나 Inbox 에 - # audio/video 가 잘못 떨어진 경우 — 이 라운드에서 아예 skip - if category is None and next_stage is None: - continue - - rel_path = str(file_path.relative_to(nas_root)) + rel_path = str(file_path.relative_to(nas_root)) + try: # GB 파일 SHA-256 은 이벤트 루프를 점유 → 같은 루프의 모든 1분 주기 consumer # + FastAPI 요청이 수십초~분 동시 정지. to_thread 오프로드. 스캔 루프가 이미 # 순차라 file_hash 는 한 번에 하나만 실행(직렬화) — 병렬 해싱 X = NFS 2.5GbE - # 대역폭·버퍼 메모리 blowup 방지 (R5). + # 대역폭·버퍼 메모리 blowup 방지 (R5). 세션 밖에서 계산(커넥션 미점유). fhash = await asyncio.to_thread(file_hash, file_path) - result = await session.execute( - select(Document).where(Document.file_path == rel_path) - ) - existing = result.scalar_one_or_none() - - if existing is None: - ext = file_path.suffix.lstrip(".").lower() or "unknown" - doc = Document( - file_path=rel_path, - file_hash=fhash, - file_format=ext, - file_size=file_path.stat().st_size, - file_type="immutable", - title=file_path.stem, - source_channel="drive_sync", - category=category, - needs_conversion=needs_conversion, - # 안전 자료실 A-2/B-4 — watch 타깃 매핑 (KGS=law/KR 등, 비대상=NULL) - material_type=target_mt, - jurisdiction=target_jur, + async with async_session() as session: + result = await session.execute( + select(Document).where(Document.file_path == rel_path) ) - # B-4 — 타깃 폴더 license 주입(restricted 포함, 비대상=미주입). classify 는 - # material_type IS NULL 일 때만 제안 + extract_meta 미기록이라 주입 보존. - if target_license: - doc.extract_meta = {"license": dict(target_license)} - session.add(doc) - await session.flush() + existing = result.scalar_one_or_none() - if next_stage: - await enqueue_stage(session, doc.id, next_stage) - new_count += 1 + if existing is None: + ext = file_path.suffix.lstrip(".").lower() or "unknown" + doc = Document( + file_path=rel_path, + file_hash=fhash, + file_format=ext, + file_size=file_path.stat().st_size, + file_type="immutable", + title=file_path.stem, + source_channel="drive_sync", + category=category, + needs_conversion=needs_conversion, + # 안전 자료실 A-2/B-4 — watch 타깃 매핑 (KGS=law/KR 등, 비대상=NULL) + material_type=target_mt, + jurisdiction=target_jur, + ) + # B-4 — 타깃 폴더 license 주입(restricted 포함, 비대상=미주입). classify 는 + # material_type IS NULL 일 때만 제안 + extract_meta 미기록이라 주입 보존. + if target_license: + doc.extract_meta = {"license": dict(target_license)} + session.add(doc) + await session.flush() - elif existing.file_hash != fhash: - existing.file_hash = fhash - existing.file_size = file_path.stat().st_size - # 기존 문서에 category/quarantine flag 가 비어있으면 보정 - if existing.category is None and category is not None: - existing.category = category - if needs_conversion and not getattr(existing, "needs_conversion", False): - existing.needs_conversion = True - # B-4 — 축/license 보정(B-4 이전 적재분이 재변경 시): material 미설정 시 주입, - # license 부재 시에만 merge 주입(clobber 회피 — 기존 extract_meta 키 보존). - if existing.material_type is None and target_mt is not None: - existing.material_type = target_mt - existing.jurisdiction = target_jur - if target_license and not (existing.extract_meta or {}).get("license"): - meta = dict(existing.extract_meta or {}) - meta["license"] = dict(target_license) - existing.extract_meta = meta + if next_stage: + await enqueue_stage(session, doc.id, next_stage) + await session.commit() + new_count += 1 - if next_stage: - await enqueue_stage(session, existing.id, next_stage) - changed_count += 1 + elif existing.file_hash != fhash: + existing.file_hash = fhash + existing.file_size = file_path.stat().st_size + # 기존 문서에 category/quarantine flag 가 비어있으면 보정 + if existing.category is None and category is not None: + existing.category = category + if needs_conversion and not getattr(existing, "needs_conversion", False): + existing.needs_conversion = True + # B-4 — 축/license 보정(B-4 이전 적재분이 재변경 시): material 미설정 시 주입, + # license 부재 시에만 merge 주입(clobber 회피 — 기존 extract_meta 키 보존). + if existing.material_type is None and target_mt is not None: + existing.material_type = target_mt + existing.jurisdiction = target_jur + if target_license and not (existing.extract_meta or {}).get("license"): + meta = dict(existing.extract_meta or {}) + meta["license"] = dict(target_license) + existing.extract_meta = meta - await session.commit() + if next_stage: + await enqueue_stage(session, existing.id, next_stage) + await session.commit() + changed_count += 1 + # else: 무변경 → 쓰기 없음 (세션 자동 닫힘, commit 불요) + except Exception as e: + logger.warning("[PKM] 파일 처리 실패 skip path=%s: %s", rel_path, e) + continue if new_count or changed_count: logger.info(f"[Inbox+§3] 새 파일 {new_count}건, 변경 파일 {changed_count}건 등록")