a6db6c999b
적대 리뷰(10에이전트) 확정 반영: - license_filter.py 신설 — restricted_exclude_sql(raw)/restricted_exclude_orm(ORM) 단일 정의. retrieval _license_sql·digest·briefing·study 풀이가 공유(드리프트 방지). - major: explanation_rag(study 문제 AI 풀이 RAG)에 술어 누락 → doc_meta 쿼리에 ORM 적용(valid_doc_ids 경유로 청크도 차단). briefing/loader 2쿼리에 누락 → digest 와 동일 술어 추가(news restricted 부재=방어적·경로 일관성). - blocker(low-impact): file_watcher changed-doc 경로 material/license 보정(merge 주입· license 부재 시만 — extract_meta clobber 회피, pre-B-4 적재분 동기화). - 테스트: 단일-source 검증 + ORM 구성 스모크 2건 추가. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
344 lines
16 KiB
Python
344 lines
16 KiB
Python
"""파일 감시 워커 — PKM(Inbox/Recordings/Videos) + Web(devonagent) 스캔, 자동 등록.
|
|
|
|
§3 확장:
|
|
- 스캔 대상: PKM/Inbox (문서) + PKM/Recordings (오디오) + PKM/Videos (비디오)
|
|
- 확장자 → category 매핑 (audio/video)
|
|
- video 채널 정책: 웹 업로드는 upload 엔드포인트에서 mov/mkv/avi 거부.
|
|
NAS 드롭은 여기서 quarantine import (category='video', needs_conversion=true, stage 없음).
|
|
- Roon 음원 경로(prefix match) skip — settings.roon_library_path
|
|
- 파이프 분기: audio → stage='stt', video direct-play → stage='thumbnail',
|
|
video quarantine → stage 없음 (처리 안 함, UI 에서 재생 불가 안내)
|
|
|
|
Web/Blog ingest (devonagent 트랙, plan db-snuggly-petal.md):
|
|
- 스캔 대상: NAS/Web/{domain}/{YYYY-MM-DD}/{slug}.{html,md,json}
|
|
- DEVONthink Smart Rule 이 3종 export → 여기서 .html 만 진입 (sidecar 는 메타 소스)
|
|
- source_channel='devonagent', dedup = file_hash = sha256(canonical_url)
|
|
- first-wins 정책: 같은 canonical_url 재저장은 ingest 안 함
|
|
- sidecar (.json) 누락 시: skip 안 하고 ingest, web_meta.sidecar_missing=true
|
|
"""
|
|
|
|
import hashlib
|
|
import json
|
|
from pathlib import Path
|
|
from urllib.parse import parse_qsl, urlencode, urlparse, urlunparse
|
|
|
|
from sqlalchemy import select
|
|
|
|
from core.config import settings
|
|
from core.database import async_session
|
|
from core.utils import file_hash, setup_logger
|
|
from models.document import Document
|
|
from models.queue import enqueue_stage
|
|
|
|
logger = setup_logger("file_watcher")
|
|
|
|
# 무시할 파일
|
|
SKIP_NAMES = {".DS_Store", "Thumbs.db", "desktop.ini", "Icon\r"}
|
|
SKIP_EXTENSIONS = {".tmp", ".part", ".crdownload", ".uploading"}
|
|
|
|
# §3 확장자 매핑
|
|
AUDIO_EXTS = {".mp3", ".m4a", ".opus", ".wav", ".flac", ".ogg"}
|
|
VIDEO_DIRECT_EXTS = {".mp4", ".webm"} # 브라우저 direct play
|
|
VIDEO_QUARANTINE_EXTS = {".mov", ".mkv", ".avi"} # 변환 필요, 보관만
|
|
|
|
# library (외부 작성 학습 자료) 폴더 — md/pdf/docx 등 문서 확장자만 수락
|
|
LIBRARY_DOC_EXTS = {".md", ".pdf", ".docx", ".doc", ".txt", ".rtf", ".html", ".odt"}
|
|
|
|
# Web ingest — canonical URL 정규화 시 strip 할 추적 파라미터
|
|
TRACKING_PARAMS = {
|
|
"utm_source", "utm_medium", "utm_campaign", "utm_term", "utm_content",
|
|
"fbclid", "gclid", "msclkid", "ref", "ref_src", "ref_url", "mc_cid", "mc_eid",
|
|
}
|
|
|
|
# 스캔 대상: (PKM 상대 하위경로, 예상 category) — None 은 문서함(카테고리 미지정)
|
|
# 모든 PKM 스캔은 source_channel='drive_sync'. Web 트랙은 별도 처리 (watch_inbox 안).
|
|
SCAN_TARGETS: list[tuple[str, str | None]] = [
|
|
("Inbox", None),
|
|
("Recordings", "audio"),
|
|
("Videos", "video"),
|
|
]
|
|
|
|
# 안전 자료실 A-2/B-4 — watch 타깃별 (material_type, jurisdiction, license) deterministic 축.
|
|
# 키 = 타깃 경로의 마지막 성분. license = extract_meta.license 에 그대로 주입(None=미주입).
|
|
# restricted=true → retrieval_service._license_sql 가 RAG 증거·digest 에서 제외(a안 U-2① —
|
|
# 구매자료 verbatim span 차단, 색인 자체는 허용. 개인 파일 열람은 미차단).
|
|
# 사용자 결정(2026-06-13): Books/Papers=proprietary+restricted / Manuals=proprietary·restricted=false
|
|
# (검색·RAG 활용) / KGS=법정 위임 상세기준 law/KR·KOGL 공공·restricted 아님.
|
|
_TARGET_AXIS: dict[str, tuple[str, str | None, dict | None]] = {
|
|
"KGS_Code": ("law", "KR", {"scheme": "kogl", "redistribute": True,
|
|
"restricted": False, "attribution": "한국가스안전공사(KGS)"}),
|
|
"Books": ("book", None, {"scheme": "proprietary", "redistribute": False,
|
|
"restricted": True, "attribution": "구매 도서"}),
|
|
"Papers_Purchased": ("paper", None, {"scheme": "proprietary", "redistribute": False,
|
|
"restricted": True, "attribution": "구매 논문"}),
|
|
"Manuals": ("manual", None, {"scheme": "proprietary", "redistribute": False,
|
|
"restricted": False, "attribution": "기술 매뉴얼"}),
|
|
}
|
|
|
|
|
|
def should_skip(path: Path) -> bool:
|
|
if path.name in SKIP_NAMES or path.name.startswith("._"):
|
|
return True
|
|
if path.suffix.lower() in SKIP_EXTENSIONS:
|
|
return True
|
|
# .derived / .preview / .thumbs 는 파생물 디렉토리
|
|
if ".derived" in path.parts or ".preview" in path.parts or ".thumbs" in path.parts:
|
|
return True
|
|
# Roon 라이브러리 skip (설정된 경우만)
|
|
roon = settings.roon_library_path
|
|
if roon and str(path).startswith(roon):
|
|
return True
|
|
return False
|
|
|
|
|
|
def _route_media(path: Path, expected_category: str | None) -> tuple[str | None, bool, str | None]:
|
|
"""확장자 기반으로 (category, needs_conversion, next_stage) 결정.
|
|
|
|
- Inbox 드롭: expected_category=None — 문서 확장자면 기존 'extract' 파이프,
|
|
audio/video 확장자면 혼란 방지로 skip (사용자가 Recordings/Videos 로 넣도록 유도)
|
|
- Recordings 드롭: audio 확장자만 수락. 그 외는 skip (log)
|
|
- Videos 드롭: direct-play → category+thumbnail, quarantine → category만 (needs_conversion=true)
|
|
"""
|
|
ext = path.suffix.lower()
|
|
|
|
if expected_category == "audio":
|
|
if ext in AUDIO_EXTS:
|
|
return ("audio", False, "stt")
|
|
return (None, False, None) # audio 폴더에 엉뚱한 포맷 → skip
|
|
|
|
if expected_category == "video":
|
|
if ext in VIDEO_DIRECT_EXTS:
|
|
return ("video", False, "thumbnail")
|
|
if ext in VIDEO_QUARANTINE_EXTS:
|
|
# quarantine — category 설정하되 stage 안 걸어둠 (재생 불가 안내만)
|
|
return ("video", True, None)
|
|
return (None, False, None) # 기타 → skip
|
|
|
|
if expected_category == "library":
|
|
# 외부 작성 학습 자료 (KGS Code, 시행규칙 등). 문서 확장자만 수락.
|
|
# frontmatter 해석은 classify_worker (옵션 C) 가 담당. file_watcher 는 라우팅만.
|
|
if ext in LIBRARY_DOC_EXTS:
|
|
return ("library", False, "extract")
|
|
if ext in AUDIO_EXTS or ext in VIDEO_DIRECT_EXTS or ext in VIDEO_QUARANTINE_EXTS:
|
|
return (None, False, None) # audio/video 잘못 들어오면 skip
|
|
return (None, False, None) # 기타 알 수 없는 확장자 skip
|
|
|
|
# Inbox: 문서 파이프 (기존). audio/video 확장자가 실수로 여기 들어오면 skip.
|
|
if ext in AUDIO_EXTS or ext in VIDEO_DIRECT_EXTS or ext in VIDEO_QUARANTINE_EXTS:
|
|
return (None, False, None)
|
|
return (None, False, "extract")
|
|
|
|
|
|
# ─── Web/Blog ingest (devonagent 트랙) 헬퍼 ──────────────────────────────────
|
|
|
|
def _canonicalize_url(url: str) -> str:
|
|
"""URL 정규화 — UTM/fbclid/fragment/trailing-slash 제거. dedup 의 진짜 기준.
|
|
|
|
같은 글의 utm 변형 (`?utm_source=foo`) 과 fragment 변형 (`#section`) 을
|
|
한 row 로 수렴시키기 위해 file_hash 산출 전 반드시 거친다.
|
|
"""
|
|
if not url:
|
|
return ""
|
|
try:
|
|
p = urlparse(url.strip())
|
|
clean_qs = [
|
|
(k, v) for k, v in parse_qsl(p.query, keep_blank_values=True)
|
|
if k.lower() not in TRACKING_PARAMS
|
|
]
|
|
clean_qs.sort()
|
|
path = p.path.rstrip("/") or "/"
|
|
netloc = p.netloc.lower()
|
|
return urlunparse((p.scheme.lower(), netloc, path, "", urlencode(clean_qs), ""))
|
|
except Exception:
|
|
return url.strip()
|
|
|
|
|
|
def _load_web_sidecar(html_path: Path) -> dict | None:
|
|
"""sibling .json sidecar 읽기. 부재/파싱실패 시 None."""
|
|
json_path = html_path.with_suffix(".json")
|
|
if not json_path.is_file():
|
|
return None
|
|
try:
|
|
return json.loads(json_path.read_text(encoding="utf-8", errors="replace"))
|
|
except Exception as e:
|
|
logger.warning(f"[devonagent] sidecar parse 실패 {json_path}: {e}")
|
|
return None
|
|
|
|
|
|
async def _ingest_web_file(session, file_path: Path, rel_path: str) -> tuple[int, int]:
|
|
"""devonagent 트랙: .html 1건을 documents row + extract enqueue 로 등록.
|
|
|
|
- .md/.json 은 sidecar 라 caller 가 skip (여기 진입 안 함)
|
|
- sidecar (.json) 있으면: canonical_url 기반 dedup, web_meta 풍부
|
|
- sidecar 없으면: ingest 하되 web_meta.sidecar_missing=true (조용한 누락 방지)
|
|
- first-wins: 같은 canonical_url 재저장 시 변경 ingest 안 함
|
|
"""
|
|
sidecar = _load_web_sidecar(file_path)
|
|
if sidecar and sidecar.get("url"):
|
|
raw_url = str(sidecar["url"])
|
|
canonical_url = _canonicalize_url(raw_url)
|
|
fhash = hashlib.sha256(canonical_url.encode("utf-8")).hexdigest()
|
|
title = str(sidecar.get("title") or file_path.stem)
|
|
web_meta = {
|
|
"raw_url": raw_url,
|
|
"devonthink_uuid": sidecar.get("devonthink_uuid"),
|
|
"pub_date": sidecar.get("pub_date"),
|
|
"author": sidecar.get("author"),
|
|
"source_agent": sidecar.get("source_agent"),
|
|
}
|
|
edit_url = canonical_url
|
|
else:
|
|
canonical_url = None
|
|
fhash = hashlib.sha256(f"NO_URL:{rel_path}".encode("utf-8")).hexdigest()
|
|
title = file_path.stem
|
|
web_meta = {"sidecar_missing": True}
|
|
edit_url = None
|
|
|
|
# devonagent dedup: file_path OR file_hash (URL identity 우선, path re-slug 흡수)
|
|
result = await session.execute(
|
|
select(Document).where(
|
|
(Document.file_path == rel_path) | (Document.file_hash == fhash)
|
|
)
|
|
)
|
|
existing = result.scalar_one_or_none()
|
|
if existing is not None:
|
|
# first-wins: 변경 ingest 안 함 (Phase 1 정책. 업데이트는 별 PR)
|
|
return (0, 0)
|
|
|
|
doc = Document(
|
|
file_path=rel_path,
|
|
file_hash=fhash,
|
|
file_format="html",
|
|
file_size=file_path.stat().st_size,
|
|
file_type="immutable",
|
|
title=title,
|
|
source_channel="devonagent",
|
|
category="document",
|
|
data_origin="external",
|
|
import_source="devonthink",
|
|
edit_url=edit_url,
|
|
extract_meta={"web_meta": web_meta},
|
|
)
|
|
session.add(doc)
|
|
await session.flush()
|
|
await enqueue_stage(session, doc.id, "extract")
|
|
return (1, 0)
|
|
|
|
|
|
async def watch_inbox():
|
|
"""PKM 하위 디렉토리 + Web/ 를 스캔하여 새/변경 파일을 DB 등록 + 파이프 투입."""
|
|
nas_root = Path(settings.nas_mount_path)
|
|
pkm_root = nas_root / "PKM"
|
|
web_root = nas_root / "Web"
|
|
|
|
if not pkm_root.exists() and not web_root.exists():
|
|
return
|
|
|
|
new_count = 0
|
|
changed_count = 0
|
|
|
|
# 동적 스캔 대상 합성: 기본 (Inbox/Recordings/Videos) + env 로 확장된 library 경로
|
|
# settings.additional_watch_targets 는 PKM 상대 경로 리스트 (예: "Knowledge/Industrial_Safety/가스기사/KGS_Code")
|
|
targets = list(SCAN_TARGETS)
|
|
for extra_path in settings.additional_watch_targets:
|
|
targets.append((extra_path, "library"))
|
|
|
|
async with async_session() as session:
|
|
# ─── Web/ 트랙 (devonagent) — DEVONthink Smart Rule 이 떨군 .html 만 진입 ───
|
|
if web_root.exists():
|
|
for file_path in web_root.rglob("*.html"):
|
|
if not file_path.is_file() or should_skip(file_path):
|
|
continue
|
|
rel_path = str(file_path.relative_to(nas_root))
|
|
added, _ = await _ingest_web_file(session, file_path, rel_path)
|
|
new_count += added
|
|
|
|
# ─── PKM 트랙 (기존 drive_sync) ─────────────────────────────────────────
|
|
for sub, expected_category in targets:
|
|
scan_root = pkm_root / sub
|
|
if not scan_root.exists():
|
|
continue
|
|
|
|
# 안전 자료실 A-2/B-4 — 타깃 폴더 기반 (material, jurisdiction, license)
|
|
target_mt, target_jur, target_license = _TARGET_AXIS.get(
|
|
Path(sub).name, (None, None, None)
|
|
)
|
|
|
|
for file_path in scan_root.rglob("*"):
|
|
if not file_path.is_file() or should_skip(file_path):
|
|
continue
|
|
|
|
category, needs_conversion, next_stage = _route_media(
|
|
file_path, expected_category
|
|
)
|
|
|
|
# audio/video 폴더에 엉뚱한 확장자가 들어왔거나 Inbox 에
|
|
# audio/video 가 잘못 떨어진 경우 — 이 라운드에서 아예 skip
|
|
if category is None and next_stage is None:
|
|
continue
|
|
|
|
rel_path = str(file_path.relative_to(nas_root))
|
|
fhash = file_hash(file_path)
|
|
|
|
result = await session.execute(
|
|
select(Document).where(Document.file_path == rel_path)
|
|
)
|
|
existing = result.scalar_one_or_none()
|
|
|
|
if existing is None:
|
|
ext = file_path.suffix.lstrip(".").lower() or "unknown"
|
|
doc = Document(
|
|
file_path=rel_path,
|
|
file_hash=fhash,
|
|
file_format=ext,
|
|
file_size=file_path.stat().st_size,
|
|
file_type="immutable",
|
|
title=file_path.stem,
|
|
source_channel="drive_sync",
|
|
category=category,
|
|
needs_conversion=needs_conversion,
|
|
# 안전 자료실 A-2/B-4 — watch 타깃 매핑 (KGS=law/KR 등, 비대상=NULL)
|
|
material_type=target_mt,
|
|
jurisdiction=target_jur,
|
|
)
|
|
# B-4 — 타깃 폴더 license 주입(restricted 포함, 비대상=미주입). classify 는
|
|
# material_type IS NULL 일 때만 제안 + extract_meta 미기록이라 주입 보존.
|
|
if target_license:
|
|
doc.extract_meta = {"license": dict(target_license)}
|
|
session.add(doc)
|
|
await session.flush()
|
|
|
|
if next_stage:
|
|
await enqueue_stage(session, doc.id, next_stage)
|
|
new_count += 1
|
|
|
|
elif existing.file_hash != fhash:
|
|
existing.file_hash = fhash
|
|
existing.file_size = file_path.stat().st_size
|
|
# 기존 문서에 category/quarantine flag 가 비어있으면 보정
|
|
if existing.category is None and category is not None:
|
|
existing.category = category
|
|
if needs_conversion and not getattr(existing, "needs_conversion", False):
|
|
existing.needs_conversion = True
|
|
# B-4 — 축/license 보정(B-4 이전 적재분이 재변경 시): material 미설정 시 주입,
|
|
# license 부재 시에만 merge 주입(clobber 회피 — 기존 extract_meta 키 보존).
|
|
if existing.material_type is None and target_mt is not None:
|
|
existing.material_type = target_mt
|
|
existing.jurisdiction = target_jur
|
|
if target_license and not (existing.extract_meta or {}).get("license"):
|
|
meta = dict(existing.extract_meta or {})
|
|
meta["license"] = dict(target_license)
|
|
existing.extract_meta = meta
|
|
|
|
if next_stage:
|
|
await enqueue_stage(session, existing.id, next_stage)
|
|
changed_count += 1
|
|
|
|
await session.commit()
|
|
|
|
if new_count or changed_count:
|
|
logger.info(f"[Inbox+§3] 새 파일 {new_count}건, 변경 파일 {changed_count}건 등록")
|
|
else:
|
|
# idle fire 가시화 — PR-NAS-Watch-Folder 검증 시 silent fire 추적 부재 보완
|
|
logger.info("[Inbox+§3] watch_inbox fire — 변경 없음 (idle)")
|