fix(workers): file_watcher 파일별 세션 격리 (사이클 전체 롤백 방지)
스캔 전체(Web+PKM)가 단일 세션·단일 commit 이라 한 파일 예외(rglob↔stat 사이 삭제로 FileNotFoundError, flush 오류 등)가 watch_inbox 전체를 raise·롤백 → 그 사이클 등록분을 모두 잃거나, 결정적 poison 파일이 매 사이클 같은 지점에서 중단시켜 그 뒤 파일 영구 미등록. 파일별 독립 세션+commit + try/continue 격리 (news_collector/csb_collector 동형). file_hash 는 세션 밖에서 계산(커넥션 미점유), 무변경 파일은 쓰기/commit 없음. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
+96
-82
@@ -251,104 +251,118 @@ async def watch_inbox():
|
||||
for extra_path in settings.additional_watch_targets:
|
||||
targets.append((extra_path, "library"))
|
||||
|
||||
async with async_session() as session:
|
||||
# ─── Web/ 트랙 (devonagent) — DEVONthink Smart Rule 이 떨군 .html 만 진입 ───
|
||||
if web_root.exists():
|
||||
# rglob NFS 디렉토리 walk(blocking stat 다발)를 off-thread 로 수집 (R5).
|
||||
for file_path in await asyncio.to_thread(lambda: list(web_root.rglob("*.html"))):
|
||||
if not file_path.is_file() or should_skip(file_path):
|
||||
continue
|
||||
rel_path = str(file_path.relative_to(nas_root))
|
||||
added, _ = await _ingest_web_file(session, file_path, rel_path)
|
||||
# 파일별 독립 세션+commit 으로 격리 — 한 파일 실패(예: rglob↔stat 사이 삭제로 FileNotFoundError,
|
||||
# flush 오류)가 watch_inbox 전체를 raise·롤백해 그 사이클 등록분을 모두 잃거나, 결정적 poison
|
||||
# 파일이 매 사이클 같은 지점에서 중단시키는 것을 차단 (news_collector/csb_collector 와 동형).
|
||||
# ─── Web/ 트랙 (devonagent) — DEVONthink Smart Rule 이 떨군 .html 만 진입 ───
|
||||
if web_root.exists():
|
||||
# rglob NFS 디렉토리 walk(blocking stat 다발)를 off-thread 로 수집 (R5).
|
||||
for file_path in await asyncio.to_thread(lambda: list(web_root.rglob("*.html"))):
|
||||
if not file_path.is_file() or should_skip(file_path):
|
||||
continue
|
||||
rel_path = str(file_path.relative_to(nas_root))
|
||||
try:
|
||||
async with async_session() as session:
|
||||
added, _ = await _ingest_web_file(session, file_path, rel_path)
|
||||
await session.commit()
|
||||
new_count += added
|
||||
|
||||
# ─── PKM 트랙 (기존 drive_sync) ─────────────────────────────────────────
|
||||
for sub, expected_category in targets:
|
||||
scan_root = pkm_root / sub
|
||||
if not scan_root.exists():
|
||||
except Exception as e:
|
||||
logger.warning("[Web] 파일 처리 실패 skip path=%s: %s", rel_path, e)
|
||||
continue
|
||||
|
||||
# 안전 자료실 A-2/B-4 — 타깃 폴더 기반 (material, jurisdiction, license)
|
||||
target_mt, target_jur, target_license = _TARGET_AXIS.get(
|
||||
Path(sub).name, (None, None, None)
|
||||
# ─── PKM 트랙 (기존 drive_sync) ─────────────────────────────────────────
|
||||
for sub, expected_category in targets:
|
||||
scan_root = pkm_root / sub
|
||||
if not scan_root.exists():
|
||||
continue
|
||||
|
||||
# 안전 자료실 A-2/B-4 — 타깃 폴더 기반 (material, jurisdiction, license)
|
||||
target_mt, target_jur, target_license = _TARGET_AXIS.get(
|
||||
Path(sub).name, (None, None, None)
|
||||
)
|
||||
|
||||
# NFS 디렉토리 walk(blocking) off-thread 수집 (R5).
|
||||
for file_path in await asyncio.to_thread(lambda: list(scan_root.rglob("*"))):
|
||||
if not file_path.is_file() or should_skip(file_path):
|
||||
continue
|
||||
|
||||
category, needs_conversion, next_stage = _route_media(
|
||||
file_path, expected_category
|
||||
)
|
||||
|
||||
# NFS 디렉토리 walk(blocking) off-thread 수집 (R5).
|
||||
for file_path in await asyncio.to_thread(lambda: list(scan_root.rglob("*"))):
|
||||
if not file_path.is_file() or should_skip(file_path):
|
||||
continue
|
||||
# audio/video 폴더에 엉뚱한 확장자가 들어왔거나 Inbox 에
|
||||
# audio/video 가 잘못 떨어진 경우 — 이 라운드에서 아예 skip
|
||||
if category is None and next_stage is None:
|
||||
continue
|
||||
|
||||
category, needs_conversion, next_stage = _route_media(
|
||||
file_path, expected_category
|
||||
)
|
||||
|
||||
# audio/video 폴더에 엉뚱한 확장자가 들어왔거나 Inbox 에
|
||||
# audio/video 가 잘못 떨어진 경우 — 이 라운드에서 아예 skip
|
||||
if category is None and next_stage is None:
|
||||
continue
|
||||
|
||||
rel_path = str(file_path.relative_to(nas_root))
|
||||
rel_path = str(file_path.relative_to(nas_root))
|
||||
try:
|
||||
# GB 파일 SHA-256 은 이벤트 루프를 점유 → 같은 루프의 모든 1분 주기 consumer
|
||||
# + FastAPI 요청이 수십초~분 동시 정지. to_thread 오프로드. 스캔 루프가 이미
|
||||
# 순차라 file_hash 는 한 번에 하나만 실행(직렬화) — 병렬 해싱 X = NFS 2.5GbE
|
||||
# 대역폭·버퍼 메모리 blowup 방지 (R5).
|
||||
# 대역폭·버퍼 메모리 blowup 방지 (R5). 세션 밖에서 계산(커넥션 미점유).
|
||||
fhash = await asyncio.to_thread(file_hash, file_path)
|
||||
|
||||
result = await session.execute(
|
||||
select(Document).where(Document.file_path == rel_path)
|
||||
)
|
||||
existing = result.scalar_one_or_none()
|
||||
|
||||
if existing is None:
|
||||
ext = file_path.suffix.lstrip(".").lower() or "unknown"
|
||||
doc = Document(
|
||||
file_path=rel_path,
|
||||
file_hash=fhash,
|
||||
file_format=ext,
|
||||
file_size=file_path.stat().st_size,
|
||||
file_type="immutable",
|
||||
title=file_path.stem,
|
||||
source_channel="drive_sync",
|
||||
category=category,
|
||||
needs_conversion=needs_conversion,
|
||||
# 안전 자료실 A-2/B-4 — watch 타깃 매핑 (KGS=law/KR 등, 비대상=NULL)
|
||||
material_type=target_mt,
|
||||
jurisdiction=target_jur,
|
||||
async with async_session() as session:
|
||||
result = await session.execute(
|
||||
select(Document).where(Document.file_path == rel_path)
|
||||
)
|
||||
# B-4 — 타깃 폴더 license 주입(restricted 포함, 비대상=미주입). classify 는
|
||||
# material_type IS NULL 일 때만 제안 + extract_meta 미기록이라 주입 보존.
|
||||
if target_license:
|
||||
doc.extract_meta = {"license": dict(target_license)}
|
||||
session.add(doc)
|
||||
await session.flush()
|
||||
existing = result.scalar_one_or_none()
|
||||
|
||||
if next_stage:
|
||||
await enqueue_stage(session, doc.id, next_stage)
|
||||
new_count += 1
|
||||
if existing is None:
|
||||
ext = file_path.suffix.lstrip(".").lower() or "unknown"
|
||||
doc = Document(
|
||||
file_path=rel_path,
|
||||
file_hash=fhash,
|
||||
file_format=ext,
|
||||
file_size=file_path.stat().st_size,
|
||||
file_type="immutable",
|
||||
title=file_path.stem,
|
||||
source_channel="drive_sync",
|
||||
category=category,
|
||||
needs_conversion=needs_conversion,
|
||||
# 안전 자료실 A-2/B-4 — watch 타깃 매핑 (KGS=law/KR 등, 비대상=NULL)
|
||||
material_type=target_mt,
|
||||
jurisdiction=target_jur,
|
||||
)
|
||||
# B-4 — 타깃 폴더 license 주입(restricted 포함, 비대상=미주입). classify 는
|
||||
# material_type IS NULL 일 때만 제안 + extract_meta 미기록이라 주입 보존.
|
||||
if target_license:
|
||||
doc.extract_meta = {"license": dict(target_license)}
|
||||
session.add(doc)
|
||||
await session.flush()
|
||||
|
||||
elif existing.file_hash != fhash:
|
||||
existing.file_hash = fhash
|
||||
existing.file_size = file_path.stat().st_size
|
||||
# 기존 문서에 category/quarantine flag 가 비어있으면 보정
|
||||
if existing.category is None and category is not None:
|
||||
existing.category = category
|
||||
if needs_conversion and not getattr(existing, "needs_conversion", False):
|
||||
existing.needs_conversion = True
|
||||
# B-4 — 축/license 보정(B-4 이전 적재분이 재변경 시): material 미설정 시 주입,
|
||||
# license 부재 시에만 merge 주입(clobber 회피 — 기존 extract_meta 키 보존).
|
||||
if existing.material_type is None and target_mt is not None:
|
||||
existing.material_type = target_mt
|
||||
existing.jurisdiction = target_jur
|
||||
if target_license and not (existing.extract_meta or {}).get("license"):
|
||||
meta = dict(existing.extract_meta or {})
|
||||
meta["license"] = dict(target_license)
|
||||
existing.extract_meta = meta
|
||||
if next_stage:
|
||||
await enqueue_stage(session, doc.id, next_stage)
|
||||
await session.commit()
|
||||
new_count += 1
|
||||
|
||||
if next_stage:
|
||||
await enqueue_stage(session, existing.id, next_stage)
|
||||
changed_count += 1
|
||||
elif existing.file_hash != fhash:
|
||||
existing.file_hash = fhash
|
||||
existing.file_size = file_path.stat().st_size
|
||||
# 기존 문서에 category/quarantine flag 가 비어있으면 보정
|
||||
if existing.category is None and category is not None:
|
||||
existing.category = category
|
||||
if needs_conversion and not getattr(existing, "needs_conversion", False):
|
||||
existing.needs_conversion = True
|
||||
# B-4 — 축/license 보정(B-4 이전 적재분이 재변경 시): material 미설정 시 주입,
|
||||
# license 부재 시에만 merge 주입(clobber 회피 — 기존 extract_meta 키 보존).
|
||||
if existing.material_type is None and target_mt is not None:
|
||||
existing.material_type = target_mt
|
||||
existing.jurisdiction = target_jur
|
||||
if target_license and not (existing.extract_meta or {}).get("license"):
|
||||
meta = dict(existing.extract_meta or {})
|
||||
meta["license"] = dict(target_license)
|
||||
existing.extract_meta = meta
|
||||
|
||||
await session.commit()
|
||||
if next_stage:
|
||||
await enqueue_stage(session, existing.id, next_stage)
|
||||
await session.commit()
|
||||
changed_count += 1
|
||||
# else: 무변경 → 쓰기 없음 (세션 자동 닫힘, commit 불요)
|
||||
except Exception as e:
|
||||
logger.warning("[PKM] 파일 처리 실패 skip path=%s: %s", rel_path, e)
|
||||
continue
|
||||
|
||||
if new_count or changed_count:
|
||||
logger.info(f"[Inbox+§3] 새 파일 {new_count}건, 변경 파일 {changed_count}건 등록")
|
||||
|
||||
Reference in New Issue
Block a user