6 Commits

Author SHA1 Message Date
hyungi
dd0d7833f6 feat: DEVONthink 전체 문서 배치 임베딩 스크립트
- batch_embed.py: 9,000+ 문서 배치 임베딩
  - DB별 순차 처리, 500건씩 AppleScript 배치 텍스트 추출
  - GPU bge-m3 배치 임베딩 (32건/호출)
  - Qdrant 배치 upsert (100건/호출)
  - --sync: 삭제된 문서 Qdrant 정리 (고아 포인트 제거)
  - --force: 전체 재임베딩
  - --db: 특정 DB만 처리
  - GPU 헬스체크 + Qdrant UUID 중복 스킵
  - 페이로드: uuid, title, db_name, text_preview, embedded_at

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-30 16:41:19 +09:00
hyungi
a4f8e56633 feat: 법령 크로스 링크 2-pass + launchd 등록 + RAG thinking 필터
- law_monitor.py: 2-pass 크로스 링크 적용
  - Pass 1: 전체 법령 파싱 + 조문-장 매핑 테이블 생성
  - Pass 2: 「법령명」 제X조 → [[법명_제N장#제X조]] wiki-link 일괄 적용
  - 변경된 법령에만 크로스 링크 적용 후 DEVONthink 임포트
- pkm_api_server.py: RAG 응답에 enable_thinking=false + strip_thinking 적용
- launchd: pkm-api(Flask), law-monitor(07:00), mailplus(07:00+18:00), digest(20:00) plist

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-30 15:28:36 +09:00
hyungi
c79e26e822 fix: 법령 임포트 경로 수정 — /10_Legislation/Law/{법령명}
기존: /10_Legislation/{법령명} (Law 폴더 누락)
수정: /10_Legislation/Law/{법령명} (architecture 설계 구조와 일치)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-30 15:05:07 +09:00
hyungi
4b7ddf39c1 feat: 법령 모니터링 대폭 개선 — 장 단위 MD 분할 + 크로스 링크 + Tier 분리
- law_parser.py 신규: XML→MD 장 단위 분할, 조문 앵커 링크, 부칙 분리
  - 장/절/편 자동 식별 (<조문여부>=전문), 장 없는 법령 fallback
  - DEVONthink wiki-link 크로스 링크 (같은 법률 내 + 다른 법률 간)
  - MST 자동 조회 + 7일 TTL 캐시 + 원자적 파일 쓰기
  - 법령 약칭 매핑 (산안법→산업안전보건법 등)

- law_monitor.py 리팩터링:
  - MONITORED_LAWS → Tier 1(15개 필수) / Tier 2(8개 참고, 비활성)
  - law_id → MST 방식 (현행 법령 자동 조회)
  - XML 통짜 저장 → 장별 Markdown 분할 저장
  - DEVONthink 3단계 교체 (이동→생성→삭제, wiki-link 보존)
  - 에러 핸들링: 재시도 3회/백오프 + 부분 실패 허용 + 법령명 검증
  - 실행 결과 law_last_run.json 기록

테스트: 15개 법령 전체 성공 (148개 MD 파일 생성)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-30 15:00:28 +09:00
hyungi
dc3f03b421 fix: Phase 2 버그 픽스 — JP 번역, API 서버, AppleScript 경로
- pkm_utils.py: strip_thinking() 추가 + llm_generate() no_think 옵션
  - <think> 태그 제거 + thinking 패턴("Wait,", "Let me" 등) 필터링
  - enable_thinking: false 파라미터 지원
- law_monitor.py: JP 번역 호출에 no_think=True 적용
- pkm_api_server.py: /devonthink/stats 최적화 (children 순회 → count 사용)
  + /devonthink/search 한글 쿼리 이스케이프 수정
- auto_classify.scpt: baseDir property로 경로 변수화
- omnifocus_sync.scpt: 로그 경로 변수화

인프라: MailPlus IMAP HOST → LAN IP(192.168.1.227)로 변경
참고: 한국 법령 API IP(122.153.226.74) open.law.go.kr 등록 필요

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-30 14:00:46 +09:00
hyungi
f21f950c04 docs: architecture.md 대규모 갱신 — GPU 서버 재구성 반영
- ChromaDB → Qdrant 전체 치환 (28건)
- nomic-embed-text → bge-m3 (1024차원) 전체 치환 (12건)
- Qwen2.5-VL-7B → Surya OCR (:8400) 전체 치환 (5건)
- VRAM 다이어그램 갱신 (~11.3GB → ~7-8GB)
- 3-Tier 라우팅 전략, 모델 협업 파이프라인 갱신
- Komga 만화 서버 GPU 서버 이전 반영
- embed_to_chroma.py 삭제 (embed_to_qdrant.py로 대체)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-30 13:45:16 +09:00
10 changed files with 1234 additions and 283 deletions

View File

@@ -2,7 +2,14 @@
-- Inbox DB 새 문서 → OCR 전처리 → MLX 분류 → 태그 + 메타데이터 + 도메인 DB 이동 → Qdrant 임베딩
-- Smart Rule 설정: Event = On Import, 조건 = Tags is empty
property baseDir : "Documents/code/DEVONThink_my server"
on performSmartRule(theRecords)
set homeDir to POSIX path of (path to home folder)
set pkmRoot to homeDir & baseDir
set venvPython to pkmRoot & "/venv/bin/python3"
set logFile to pkmRoot & "/logs/auto_classify.log"
tell application id "DNtp"
repeat with theRecord in theRecords
try
@@ -13,16 +20,15 @@ on performSmartRule(theRecords)
if docText is "" then
if docType is in {"PDF Document", "JPEG image", "PNG image", "TIFF image"} then
set ocrScript to (POSIX path of (path to home folder)) & "Documents/code/DEVONThink_my server/venv/bin/python3"
set ocrPy to (POSIX path of (path to home folder)) & "Documents/code/DEVONThink_my server/scripts/ocr_preprocess.py"
set ocrPy to pkmRoot & "/scripts/ocr_preprocess.py"
try
set ocrText to do shell script ocrScript & " " & quoted form of ocrPy & " " & quoted form of docUUID
set ocrText to do shell script venvPython & " " & quoted form of ocrPy & " " & quoted form of docUUID
if length of ocrText > 0 then
set plain text of theRecord to ocrText
set docText to ocrText
end if
on error ocrErr
do shell script "echo '[OCR ERROR] " & ocrErr & "' >> ~/Documents/code/DEVONThink_my\\ server/logs/auto_classify.log"
do shell script "echo '[OCR ERROR] " & ocrErr & "' >> " & quoted form of logFile
end try
end if
end if
@@ -39,7 +45,7 @@ on performSmartRule(theRecords)
end if
-- 2. 분류 프롬프트 로딩
set promptPath to (POSIX path of (path to home folder)) & "Documents/code/DEVONThink_my server/scripts/prompts/classify_document.txt"
set promptPath to pkmRoot & "/scripts/prompts/classify_document.txt"
set promptTemplate to do shell script "cat " & quoted form of promptPath
-- 문서 텍스트를 프롬프트에 삽입 (특수문자 이스케이프)
@@ -105,14 +111,13 @@ except:
end if
-- 8. GPU 서버 벡터 임베딩 비동기 전송
set embedScript to (POSIX path of (path to home folder)) & "Documents/code/DEVONThink_my server/venv/bin/python3"
set embedPy to (POSIX path of (path to home folder)) & "Documents/code/DEVONThink_my server/scripts/embed_to_qdrant.py"
do shell script embedScript & " " & quoted form of embedPy & " " & quoted form of docUUID & " &> /dev/null &"
set embedPy to pkmRoot & "/scripts/embed_to_qdrant.py"
do shell script venvPython & " " & quoted form of embedPy & " " & quoted form of docUUID & " &> /dev/null &"
on error errMsg
-- 에러 시 로그 기록 + 검토필요 태그
set tags of theRecord to {"@상태/검토필요", "AI분류실패"}
do shell script "echo '[" & (current date) & "] [auto_classify] [ERROR] " & errMsg & "' >> ~/Documents/code/DEVONThink_my\\ server/logs/auto_classify.log"
do shell script "echo '[" & (current date) & "] [auto_classify] [ERROR] " & errMsg & "' >> " & quoted form of logFile
end try
end repeat
end tell

View File

@@ -2,7 +2,11 @@
-- Projects DB 새 문서에서 TODO 패턴 감지 → OmniFocus 작업 생성
-- Smart Rule 설정: Event = On Import, DB = Projects
property baseDir : "Documents/code/DEVONThink_my server"
on performSmartRule(theRecords)
set homeDir to POSIX path of (path to home folder)
set logFile to homeDir & baseDir & "/logs/omnifocus_sync.log"
tell application id "DNtp"
repeat with theRecord in theRecords
try
@@ -64,7 +68,7 @@ for item in items[:10]:
add custom meta data taskIDString for "omnifocusTaskID" to theRecord
on error errMsg
do shell script "echo '[" & (current date) & "] [omnifocus_sync] [ERROR] " & errMsg & "' >> ~/Documents/code/DEVONThink_my\\ server/logs/omnifocus_sync.log"
do shell script "echo '[" & (current date) & "] [omnifocus_sync] [ERROR] " & errMsg & "' >> " & quoted form of logFile
end try
end repeat
end tell

View File

@@ -169,12 +169,12 @@ DEVONthink 4의 커스텀 메타데이터 필드를 활용합니다.
### AI 결과물 저장 전략 — 중복 저장 금지
GPU 서버에서 처리된 AI 결과물은 **각자 목적에 맞는 곳에만** 저장합니다.
DEVONthink와 ChromaDB에 같은 정보를 이중으로 넣지 않습니다.
DEVONthink와 Qdrant에 같은 정보를 이중으로 넣지 않습니다.
```
처리 결과 저장 위치 이유
───────────────────────────────────────────────────────
벡터 임베딩 ChromaDB만 시맨틱 검색 전용, DEVONthink에선 쓸모없음
벡터 임베딩 Qdrant만 시맨틱 검색 전용, DEVONthink에선 쓸모없음
비전 OCR 텍스트 DEVONthink 본문에 병합 검색 가능한 텍스트가 되어야 하므로 필수
리랭킹 점수 저장 안 함 (휘발) 쿼리 시점에만 의미 있는 일회성 데이터
태그/분류 DEVONthink 태그만 Smart Group, 브라우징에 활용
@@ -183,10 +183,10 @@ OmniFocus 역링크 DEVONthink 메타데이터 양방향 참조에 필요
```
**핵심 원칙:**
- ChromaDB = 벡터 검색 엔진. 여기엔 임베딩만 들어감
- Qdrant = 벡터 검색 엔진. 여기엔 임베딩만 들어감
- DEVONthink = 원본 문서 + 사람이 읽는 메타데이터(태그, 링크)
- 요약/분석은 RAG로 실시간 생성하면 되므로 별도 캐싱 불필요
- 비전 모델의 OCR 결과만 DEVONthink 본문에 반드시 병합 (검색성 확보)
- Surya OCR 결과만 DEVONthink 본문에 반드시 병합 (검색성 확보)
---
@@ -211,7 +211,7 @@ OmniFocus 역링크 DEVONthink 메타데이터 양방향 참조에 필요
DEVONagent ────┤ ┌──────────────┐
스캔 문서 ──────┼──► Inbox ──►│ Smart Rule │──► 자동 태깅
이메일 ────────┤ │ + Ollama API │ + 적절한 DB로 이동
파일 드롭 ──────┘ │ + GPU 서버 │ + 벡터 인덱싱 (ChromaDB)
파일 드롭 ──────┘ │ + GPU 서버 │ + 벡터 인덱싱 (Qdrant)
└──────────────┘ + OCR 텍스트 병합 (스캔 시)
OmniFocus 작업 생성
@@ -225,9 +225,9 @@ DEVONagent ────┤ ┌─────────────
트리거: Inbox DB에 새 문서 추가
조건: 태그가 비어있음
동작:
1. 이미지/스캔 문서 → GPU 서버 VL-7B로 OCR → 본문에 병합
1. 이미지/스캔 문서 → GPU 서버 Surya OCR(:8400)로 OCR → 본문에 병합
2. Mac mini 35B → 태그 + 분류 대상 DB 생성 → DEVONthink 태그에만 저장
3. GPU 서버 nomic-embed → 벡터화 → ChromaDB에만 저장
3. GPU 서버 bge-m3 → 벡터화 → Qdrant에만 저장
4. 태그 기반 도메인 DB 자동 이동:
#주제/프로그래밍, #주제/AI-ML → 05_Programming
#주제/공학, #주제/네트워크 → 03_Engineering
@@ -249,7 +249,7 @@ DEVONagent ────┤ ┌─────────────
동작:
1. 발신자 기준 그룹 자동 생성/분류
2. 첨부파일 추출 → 태그 기반 도메인 DB로 복제 (기술문서→03, 도면→97 등)
3. GPU 서버에서 벡터 임베딩 → ChromaDB 인덱싱
3. GPU 서버에서 벡터 임베딩 → Qdrant 인덱싱
※ 이메일 요약은 저장하지 않음 (RAG로 검색 시 생성)
```
@@ -336,8 +336,8 @@ on performSmartRule(theRecords)
end if
end try
-- Step 4: GPU 서버 → 벡터 임베딩 → ChromaDB 인덱싱 (비동기)
do shell script "python3 ~/scripts/embed_to_chroma.py " & ¬
-- Step 4: GPU 서버 → 벡터 임베딩 → Qdrant 인덱싱 (비동기)
do shell script "python3 ~/scripts/embed_to_qdrant.py " & ¬
quoted form of docUUID & " &"
-- Step 5: 처리 완료 표시
@@ -567,59 +567,59 @@ if __name__ == "__main__":
│ RTX 4070 Ti Super 16GB VRAM │
│ │
│ ┌──────────────────────┐ ┌──────────────────────────────────┐ │
│ │ 👁️ 비전 모델 │ │ 🔍 리랭커 (Reranker) │ │
│ │ Qwen2.5-VL-7B (8Q) │ │ bge-reranker-v2-m3 │ │
│ │ VRAM: ~8GB │ │ VRAM: ~1GB │ │
│ │ │ │ │ │
│ │ 용도: │ │ 용도: │ │
│ │ · 스캔 문서 분석 │ │ · RAG 검색 품질 극대화 │ │
│ │ · 이미지 캡션/태깅 │ │ · 임베딩 검색 후 정밀 재정렬 │ │
│ │ · 차트/그래프 해석 │ │ · Top-K → Top-N 정확도 향상 │ │
│ │ · 사진 자동 분류 │ │ │ │
│ · OCR 보완 │ │ │
│ └──────────────────────┘ └──────────────────────────────────┘ │
│ │ 📄 Surya OCR │ │ 🔍 리랭커 (Reranker) │ │
│ │ FastAPI :8400 │ │ bge-reranker-v2-m3 │ │
│ │ VRAM: ~2-3GB │ │ VRAM: ~1GB │ │
│ │ │ │ │ │
│ │ 용도: │ │ 용도: │ │
│ │ · 스캔 문서 OCR │ │ · RAG 검색 품질 극대화 │ │
│ │ · 이미지 텍스트 추출 │ │ · 임베딩 검색 후 정밀 재정렬 │ │
│ │ · 만화 말풍선 OCR │ │ · Top-K → Top-N 정확도 향상 │ │
│ │ · 한/영/일 다국어 │ │ │ │
└───────────────────────┘ └──────────────────────────────────┘
│ │
│ ┌──────────────────────┐ ┌──────────────────────────────────┐ │
│ │ 🔗 임베딩 모델 │ │ 📊 VRAM 배분 │ │
│ │ nomic-embed-text │ │ │ │
│ │ VRAM: ~0.3GB │ │ 비전 모델 (8Q): ~8GB │ │
│ │ bge-m3 (1024차원) │ │ │ │
│ │ VRAM: ~1.5GB │ │ Surya OCR: ~2-3GB │ │
│ │ │ │ 리랭커: ~1GB │ │
│ │ 용도: │ │ 임베딩: ~0.3GB │ │
│ │ · 문서 벡터 임베딩 │ │ 시스템: ~2GB │ │
│ │ 용도: │ │ 임베딩: ~1.5GB │ │
│ │ · 문서 벡터 임베딩 │ │ Plex HW 트랜스: ~1-2GB │ │
│ │ · RAG 인덱싱 │ │ ───────────────────── │ │
│ │ · 쿼리 임베딩 │ │ 합계: ~11.3GB / 16GB │ │
│ │ │ │ 여유: ~4.7GB ✅ │ │
│ │ · 쿼리 임베딩 │ │ 합계: ~7-8GB / 16GB │ │
│ │ │ │ 여유: ~8-9GB ✅ │ │
│ │ ※ GPU 가속으로 │ │ │ │
│ │ 대량 임베딩 시 유리 │ │ │ │
│ └──────────────────────┘ └──────────────────────────────────┘ │
│ │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ 🎬 미디어 서비스 │ │
│ │ Plex Media Server — GPU 하드웨어 트랜스코딩 활용 │ │
│ │ 🎬 미디어 + 만화 서비스 │ │
│ │ Plex Media Server — GPU 하드웨어 트랜스코딩 │ │
│ │ Komga — 만화 서버 (Docker, NFS → NAS /Comic) │ │
│ └─────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
```
### 임베딩을 GPU 서버로 이전하는 이유
임베딩 모델(nomic-embed-text)을 Mac mini에서 GPU 서버로 이전하는 것을 **권장**합니다:
임베딩 모델(bge-m3)을 Mac mini에서 GPU 서버로 이전하는 것을 **권장**합니다:
| 비교 항목 | Mac mini에서 실행 | GPU 서버에서 실행 |
|---|---|---|
| **대량 인덱싱 속도** | CPU 기반, 느림 | CUDA 가속, 5-10배 빠름 |
| **Mac mini 부하** | 35B 모델 + 임베딩 동시 시 경합 | 35B 모델 전용, 쾌적 |
| **VRAM 영향** | 해당 없음 | +0.3GB (무시할 수준) |
| **VRAM 영향** | 해당 없음 | +1.5GB (bge-m3, 1024차원) |
| **네트워크 레이턴시** | 없음 | 2.5G 네트워크, 1ms 미만 |
| **배치 처리** | 문서 100개 인덱싱 시 수분 | 문서 100개 인덱싱 시 수십초 |
| **ChromaDB 위치** | Mac mini 유지 | Mac mini 유지 (변동 없음) |
| **Qdrant 위치** | Mac mini 유지 | Mac mini 유지 (변동 없음) |
**결론:** 임베딩 모델은 단일 요청 레이턴시보다 **배치 처리량**이 중요합니다.
GPU 서버의 CUDA 가속을 활용하면 대량 문서 인덱싱이 훨씬 빨라지고,
Mac mini의 통합메모리를 35B 모델에 온전히 할당할 수 있습니다.
nomic-embed-text는 0.3GB에 불과해 GPU 서버 VRAM에 거의 영향이 없고,
bge-m3는 ~1.5GB로 GPU 서버 VRAM 16GB 대비 여유 충분하고,
2.5G 네트워크 환경이라 API 호출 레이턴시도 무시할 수준입니다.
다만 **ChromaDB는 Mac mini에 유지**합니다. RAG 질의 시 벡터 검색 →
다만 **Qdrant는 Mac mini에 유지**합니다. RAG 질의 시 벡터 검색 →
리랭킹 → 35B 응답 생성이 연속으로 일어나는데, 벡터 DB가 로컬에 있어야
이 파이프라인이 가장 빠릅니다.
@@ -638,15 +638,15 @@ nomic-embed-text는 0.3GB에 불과해 GPU 서버 VRAM에 거의 영향이 없
│ Mac mini │ │ Claude │ │ GPU 서버 │
│ (메인) │ │ (클라우드) │ │ (보조) │
├─────────────────┤ ├──────────────┤ ├────────────────────┤
│ Qwen3.5-35B-A3B │ │ Sonnet 4.6 │ │ Qwen2.5-VL-7B (8Q)
│ Qwen3.5-35B-A3B │ │ Sonnet 4.6 │ │ Surya OCR (:8400)
│ 4Q / ~80 tok/s │ │ │ │ bge-reranker-v2-m3 │
│ │ │ │ │ nomic-embed-text
│ │ │ │ │ bge-m3 (1024차원)
├─────────────────┤ ├──────────────┤ ├────────────────────┤
│ · 자동 태깅/분류 │ │ · 심층 분석 │ │ · 이미지/스캔 분석
│ · 자동 태깅/분류 │ │ · 심층 분석 │ │ · 스캔/이미지 OCR
│ · 문서 요약 │ │ · 리서치 합성 │ │ · RAG 리랭킹 │
│ · 메타데이터 │ │ · 보고서 생성 │ │ · 문서 임베딩/인덱싱│
│ · 액션아이템추출 │ │ · 복잡한 추론 │ │ · 사진 자동 분류
│ · RAG 응답생성 │ │ · 다국어 번역 │ │ · OCR 후처리
│ · 액션아이템추출 │ │ · 복잡한 추론 │ │ · 만화 텍스트 추출
│ · RAG 응답생성 │ │ · 다국어 번역 │ │ · 한/영/일 다국어
├─────────────────┤ ├──────────────┤ ├────────────────────┤
│ 속도: ~80 tok/s │ │ 속도: ~3초 │ │ 속도: GPU 가속 │
│ 비용: 무료 │ │ 비용: 과금 │ │ 비용: 무료 │
@@ -659,13 +659,13 @@ nomic-embed-text는 0.3GB에 불과해 GPU 서버 VRAM에 거의 영향이 없
| 조건 | 라우팅 | 이유 |
|---|---|---|
| 텍스트 문서 + 태깅/분류/요약 | Tier 1 (Mac mini 35B) | 메인 범용, 품질 충분 |
| 이미지 포함 문서 / 스캔 PDF | Tier 3 → Tier 1 | 비전 모델로 텍스트 추출 후 35B로 분석 |
| 이미지 포함 문서 / 스캔 PDF | Tier 3 → Tier 1 | Surya OCR로 텍스트 추출 후 35B로 분석 |
| 심층 분석 / 긴 보고서 생성 | Tier 2 (Claude API) | 최고 품질 필요 시 |
| RAG 검색 결과 리랭킹 | Tier 3 (GPU reranker) | 검색 정확도 극대화 |
| RAG 최종 응답 생성 | Tier 1 (Mac mini 35B) | 컨텍스트 기반 응답 |
| 새 문서 벡터 인덱싱 | Tier 3 (GPU embed) | CUDA 가속 배치 처리 |
| 대량 배치 (100+ 문서) | Tier 1 + Tier 3 병렬 | 양쪽 분산 처리 |
| Synology Photos 자동 태깅 | Tier 3 (GPU vision) | 이미지 분석 특화 |
| 만화 OCR (Komga 연동) | Tier 3 (GPU Surya OCR) | GPU 서버 로컬 처리 |
### 모델 간 협업 파이프라인
@@ -674,26 +674,26 @@ nomic-embed-text는 0.3GB에 불과해 GPU 서버 VRAM에 거의 영향이 없
1. [Smart Rule 트리거] 새 PDF 감지, 이미지 기반 문서로 판단
2. [GPU 서버 · Qwen2.5-VL-7B 8Q]
이미지 분석 → 텍스트 추출 (OCR) → DEVONthink 본문에 병합
2. [GPU 서버 · Surya OCR :8400]
이미지/스캔 PDF → OCR 텍스트 추출 → DEVONthink 본문에 병합
3. [Mac mini · Qwen3.5-35B-A3B]
추출된 텍스트로 태그 생성 → DEVONthink 태그에만 저장
4. [GPU 서버 · nomic-embed-text]
문서 벡터 임베딩 → ChromaDB에만 저장
4. [GPU 서버 · bge-m3]
문서 벡터 임베딩 → Qdrant에만 저장
5. [결과] DEVONthink에는 본문(OCR)+태그+처리일시만
ChromaDB에는 벡터만. 요약은 저장하지 않음 (RAG로 실시간 생성)
Qdrant에는 벡터만. 요약은 저장하지 않음 (RAG로 실시간 생성)
예시: RAG 질의 시
1. [사용자 질문] "서버 마이그레이션 관련 자료 정리해줘"
2. [GPU 서버 · nomic-embed-text] 쿼리 임베딩
2. [GPU 서버 · bge-m3] 쿼리 임베딩
3. [Mac mini · ChromaDB] 벡터 유사도 검색 → Top-20 후보
3. [Mac mini · Qdrant] 벡터 유사도 검색 → Top-20 후보
4. [GPU 서버 · bge-reranker-v2-m3]
Top-20 → 정밀 리랭킹 → Top-5 선정
@@ -714,9 +714,9 @@ OLLAMA_MAX_LOADED_MODELS=3 # 동시 로드 모델 3개 (비전+리랭커+
OLLAMA_KEEP_ALIVE=10m # 미사용 시 10분 후 언로드
# 모델 다운로드
ollama pull qwen2.5-vl:7b-instruct-q8_0 # 비전 모델 8Q (~8GB)
# Surya OCR은 별도 systemd 서비스로 운영 (:8400)
ollama pull bge-reranker-v2-m3 # 리랭커 (~1GB)
ollama pull nomic-embed-text # 임베딩 (~0.3GB)
ollama pull bge-m3 # 임베딩 (~1.5GB, 1024차원)
# Mac mini에서 GPU 서버 호출 예시
# 비전 분석
@@ -725,11 +725,11 @@ curl http://gpu-server:11434/api/generate \
# 임베딩 (배치)
curl http://gpu-server:11434/api/embed \
-d '{"model":"nomic-embed-text", "input":["문서1 텍스트", "문서2 텍스트", ...]}'
-d '{"model":"bge-m3", "input":["문서1 텍스트", "문서2 텍스트", ...]}'
```
**`keep_alive` 활용 전략:**
- 비전 모델 (8Q): `keep_alive: "30m"` — 자주 사용, 항상 대기
- Surya OCR: systemd 서비스로 상시 구동 (포트 8400)
- 리랭커: `keep_alive: "10m"` — RAG 쿼리 시 활성
- 임베딩: `keep_alive: "30m"` — 새 문서 인덱싱 빈도에 맞춰
@@ -750,20 +750,20 @@ curl http://gpu-server:11434/api/embed \
│ [청킹] → 의미 단위로 텍스트 분할 (500토큰) │
│ │ │
│ ▼ │
│ [임베딩] → GPU 서버 Ollama (nomic-embed-text, CUDA) │
│ [임베딩] → GPU 서버 Ollama (bge-m3, CUDA) │
│ │ │
│ ▼ │
│ [벡터 저장] → ChromaDB (Mac mini 로컬) │
│ [벡터 저장] → Qdrant (Mac mini 로컬) │
│ │ │
│ ─ ─ ─ ─ ─ ─ 쿼리 시 ─ ─ ─ ─ ─ ─ │
│ │ │
│ [질문 입력] │
│ │ │
│ ▼ │
│ [쿼리 임베딩] → GPU 서버 (nomic-embed-text) │
│ [쿼리 임베딩] → GPU 서버 (bge-m3) │
│ │ │
│ ▼ │
│ [유사도 검색] → ChromaDB (Mac mini, Top-20) │
│ [유사도 검색] → Qdrant (Mac mini, Top-20) │
│ │ │
│ ▼ │
│ [리랭킹] → GPU 서버 (bge-reranker, Top-5 선정) │
@@ -841,7 +841,7 @@ Smart Rule 2차: 하위 그룹 라우팅
→ 80_Reference/Standards/
ChromaDB 벡터 인덱싱 (비동기)
Qdrant 벡터 인덱싱 (비동기)
→ RAG 검색에 즉시 반영
@@ -1023,7 +1023,7 @@ Mac mini에서는 **자동 스케줄 리서치**, 맥북에서는 **현장 수
│ 배치 + 자동화 중심 │ 인터랙티브 + 즉시성 중심 │
├────────────────────────┴────────────────────────────────┤
│ 공통: 결과는 모두 DEVONthink Inbox → CloudKit 동기화 │
│ → Mac mini Smart Rule이 자동 태깅 + ChromaDB 인덱싱 │
│ → Mac mini Smart Rule이 자동 태깅 + Qdrant 인덱싱 │
└─────────────────────────────────────────────────────────┘
```
@@ -1095,7 +1095,7 @@ DEVONthink에서 자료 검색/열람 (동기화된 DB)
[RAG 질의 시]
Tailscale 연결 → RAG API에 자연어 질문
→ Mac mini에서 GPU 임베딩 → ChromaDB 검색 → 리랭킹 → 35B 응답
→ Mac mini에서 GPU 임베딩 → Qdrant 검색 → 리랭킹 → 35B 응답
→ 결과에 x-devonthink-item:// 링크 포함
→ 맥북 DEVONthink에서 해당 문서 바로 열기
@@ -1183,7 +1183,7 @@ RAG 시스템으로 내 지식베이스에 질문
│ 완료 5건 | 신규 3건 | 기한초과 1건 │
│ │
│ ■ 시스템 상태 │
ChromaDB 벡터: 12,847개 (+15) │
Qdrant 벡터: 12,847개 (+15) │
│ Inbox 잔여: 2건 │
│ NAS 동기화: 정상 │
└─────────────────────────────────────────────┘
@@ -1194,7 +1194,7 @@ RAG 시스템으로 내 지식베이스에 질문
· Inbox 미처리 3건 이상 → "Inbox 정리 필요 (N건 미분류)"
· 시정조치 overdue → "시정조치 기한초과: [내용]" (긴급 플래그)
· 분류 실패 문서 존재 → "수동 분류 필요 (N건)"
· ChromaDB 인덱싱 실패 → "벡터 인덱싱 오류 점검"
· Qdrant 인덱싱 실패 → "벡터 인덱싱 오류 점검"
출력 3 — Synology Chat 알림 (선택, 한 줄 요약):
"📋 오늘 다이제스트: 신규 12건, 법령변경 2건, overdue 1건 ⚠"
@@ -1222,7 +1222,7 @@ RAG 시스템으로 내 지식베이스에 질문
end tell
5. 시스템 상태 — Python
ChromaDB collection.count(), NAS ping, sync 로그 확인
Qdrant collection.count(), NAS ping, sync 로그 확인
6. 상위 뉴스 요약 — Ollama 35B
오늘 수집된 뉴스 중 상위 3건을 2-3문장으로 요약
@@ -1259,8 +1259,8 @@ OmniFocus 리뷰 → 완료 작업의 DEVONthink 메타데이터 업데이트
□ DEVONsphere Express 설치
□ OmniFocus, OmniOutliner, OmniGraffle, OmniPlan 설치
□ Ollama 확인 (이미 설치됨)
□ GPU 서버에 nomic-embed-text, Qwen2.5-VL-7B 8Q, bge-reranker 다운로드
ChromaDB 설치 (pip install chromadb) — Mac mini
□ GPU 서버에 bge-m3, bge-reranker 다운로드 + Surya OCR 서비스 설치
Qdrant (Docker, Mac mini) — pkm_documents 컬렉션 (1024차원, Cosine)
□ Python 환경 설정 (venv 권장)
□ Plex Media Server를 GPU 서버로 이전
```
@@ -1288,7 +1288,7 @@ OmniFocus 리뷰 → 완료 작업의 DEVONthink 메타데이터 업데이트
```
□ Ollama 태깅/분류 프롬프트 최적화
□ Claude API 키 Keychain 등록
□ RAG 파이프라인 구축 (GPU 서버 임베딩 + Mac mini ChromaDB)
□ RAG 파이프라인 구축 (GPU bge-m3 임베딩 + Mac mini Qdrant + MLX 35B 응답)
□ DEVONthink Smart Rule과 AI 연동 테스트
□ DEVONagent 자동 검색 스케줄 설정
```
@@ -1325,7 +1325,7 @@ OmniPlan 0.5GB 낮음
OmniOutliner 0.3GB 낮음
OmniGraffle 0.5GB 낮음
MLX (Qwen3.5-35B-A3B 4bit) ~20GB 중간 MoE: 3B만 활성
ChromaDB 1-2GB 낮음
Qdrant (Docker) 1-2GB 낮음
Roon Core 2-4GB 낮음
Komga 0.5GB 낮음
기타 시스템 4-6GB -
@@ -1347,9 +1347,9 @@ Plex를 GPU 서버로 이전하고 임베딩도 GPU로 넘김으로써, Mac mini
```
서비스 VRAM 상태 비고
─────────────────────────────────────────────────────────────
Qwen2.5-VL-7B (8Q) ~8GB 상주 비전/이미지 분석
Surya OCR (systemd) ~2-3GB 상주 문서/만화 OCR
bge-reranker-v2-m3 ~1GB 상주 RAG 리랭킹
nomic-embed-text ~0.3GB 상주 임베딩 (CUDA 가속)
bge-m3 (1024차원) ~1.5GB 상주 임베딩 (CUDA 가속)
Plex HW Transcoding ~1-2GB 간헐적 NVENC/NVDEC 활용
시스템 오버헤드 ~2GB -

View File

@@ -0,0 +1,24 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE plist PUBLIC "-//Apple//DTD PLIST 1.0//EN" "http://www.apple.com/DTDs/PropertyList-1.0.dtd">
<plist version="1.0">
<dict>
<key>Label</key>
<string>net.hyungi.pkm-api</string>
<key>ProgramArguments</key>
<array>
<string>/Users/hyungi/Documents/code/DEVONThink_my server/venv/bin/python3</string>
<string>/Users/hyungi/Documents/code/DEVONThink_my server/scripts/pkm_api_server.py</string>
<string>9900</string>
</array>
<key>WorkingDirectory</key>
<string>/Users/hyungi/Documents/code/DEVONThink_my server</string>
<key>RunAtLoad</key>
<true/>
<key>KeepAlive</key>
<true/>
<key>StandardOutPath</key>
<string>/Users/hyungi/Documents/code/DEVONThink_my server/logs/pkm-api.log</string>
<key>StandardErrorPath</key>
<string>/Users/hyungi/Documents/code/DEVONThink_my server/logs/pkm-api.error.log</string>
</dict>
</plist>

359
scripts/batch_embed.py Normal file
View File

@@ -0,0 +1,359 @@
#!/usr/bin/env python3
"""
DEVONthink 전체 문서 배치 임베딩
- DB별 순차 처리, 500건씩 AppleScript 배치 텍스트 추출
- GPU bge-m3 배치 임베딩 (32건/호출)
- Qdrant 배치 upsert (100건/호출)
- --sync: 삭제된 문서 Qdrant 정리
- --force: 전체 재임베딩
- --db: 특정 DB만 처리
사용법:
python3 batch_embed.py # 신규 문서만
python3 batch_embed.py --sync # 신규 + 삭제 동기화
python3 batch_embed.py --force # 전체 재임베딩
python3 batch_embed.py --db "04_Industrial safety"
"""
import argparse
import sys
import uuid as uuid_mod
import time
import requests
from pathlib import Path
from datetime import datetime
sys.path.insert(0, str(Path(__file__).parent))
from pkm_utils import setup_logger, load_credentials, run_applescript_inline
logger = setup_logger("batch_embed")
QDRANT_URL = "http://localhost:6333"
COLLECTION = "pkm_documents"
EMBED_BATCH_SIZE = 32
QDRANT_BATCH_SIZE = 100
APPLESCRIPT_CHUNK = 500
# --- GPU 헬스체크 ---
def check_gpu_health(gpu_ip: str) -> bool:
"""GPU bge-m3 API ping"""
try:
resp = requests.post(
f"http://{gpu_ip}:11434/api/embed",
json={"model": "bge-m3", "input": ["test"]},
timeout=10,
)
return resp.status_code == 200
except Exception:
return False
# --- Qdrant ---
def get_existing_uuids_from_qdrant() -> set[str]:
"""Qdrant에 이미 저장된 UUID 집합 조회"""
uuids = set()
offset = None
while True:
body = {"limit": 1000, "with_payload": {"include": ["uuid"]}}
if offset:
body["offset"] = offset
resp = requests.post(
f"{QDRANT_URL}/collections/{COLLECTION}/points/scroll",
json=body, timeout=30,
)
resp.raise_for_status()
result = resp.json()["result"]
points = result.get("points", [])
for p in points:
uuid_val = p.get("payload", {}).get("uuid")
if uuid_val:
uuids.add(uuid_val)
offset = result.get("next_page_offset")
if not offset or not points:
break
return uuids
def delete_from_qdrant(point_ids: list[int]):
"""Qdrant에서 포인트 삭제"""
if not point_ids:
return
resp = requests.post(
f"{QDRANT_URL}/collections/{COLLECTION}/points/delete",
json={"points": point_ids},
timeout=30,
)
resp.raise_for_status()
def uuid_to_point_id(doc_uuid: str) -> int:
return uuid_mod.uuid5(uuid_mod.NAMESPACE_URL, doc_uuid).int >> 64
def store_batch_in_qdrant(docs: list[dict]):
"""Qdrant 배치 upsert"""
if not docs:
return
points = []
for doc in docs:
points.append({
"id": uuid_to_point_id(doc["uuid"]),
"vector": doc["embedding"],
"payload": {
"uuid": doc["uuid"],
"title": doc["title"],
"db_name": doc.get("db_name", ""),
"text_preview": doc.get("text", "")[:200],
"source": "devonthink",
"embedded_at": datetime.now().isoformat(),
},
})
for i in range(0, len(points), QDRANT_BATCH_SIZE):
batch = points[i:i + QDRANT_BATCH_SIZE]
resp = requests.put(
f"{QDRANT_URL}/collections/{COLLECTION}/points",
json={"points": batch},
timeout=60,
)
resp.raise_for_status()
# --- GPU 임베딩 ---
def get_embeddings_batch(texts: list[str], gpu_ip: str) -> list[list[float]]:
"""GPU bge-m3 배치 임베딩 (4000자 제한 — bge-m3 토큰 한도 고려)"""
truncated = [t[:4000] for t in texts]
resp = requests.post(
f"http://{gpu_ip}:11434/api/embed",
json={"model": "bge-m3", "input": truncated},
timeout=120,
)
resp.raise_for_status()
return resp.json().get("embeddings", [])
# --- DEVONthink 텍스트 추출 ---
def get_db_names() -> list[str]:
"""DEVONthink DB 이름 목록"""
script = '''
tell application id "DNtp"
set dbNames to {}
repeat with db in databases
set end of dbNames to name of db
end repeat
set AppleScript's text item delimiters to linefeed
return dbNames as text
end tell
'''
result = run_applescript_inline(script)
return [n.strip() for n in result.split("\n") if n.strip()]
def get_db_document_uuids(db_name: str) -> list[str]:
"""특정 DB의 임베딩 대상 UUID 목록 (그룹 제외, 텍스트 10자 이상)"""
script = f'''
tell application id "DNtp"
set theDB to database "{db_name}"
set allDocs to contents of theDB
set output to {{}}
repeat with rec in allDocs
try
set recType to type of rec as string
if recType is not "group" then
set recText to plain text of rec
if length of recText > 10 then
set end of output to uuid of rec
end if
end if
end try
end repeat
set AppleScript's text item delimiters to linefeed
return output as text
end tell
'''
try:
result = run_applescript_inline(script)
return [u.strip() for u in result.split("\n") if u.strip()]
except Exception as e:
logger.error(f"UUID 수집 실패 [{db_name}]: {e}")
return []
def get_documents_batch(uuids: list[str]) -> list[dict]:
"""UUID 리스트로 배치 텍스트 추출 (AppleScript 1회 호출)"""
if not uuids:
return []
# UUID를 AppleScript 리스트로 변환
uuid_list = ", ".join(f'"{u}"' for u in uuids)
script = f'''
tell application id "DNtp"
set uuidList to {{{uuid_list}}}
set output to {{}}
repeat with u in uuidList
try
set theRecord to get record with uuid u
set recText to plain text of theRecord
set recTitle to name of theRecord
set recDB to name of database of theRecord
if length of recText > 8000 then
set recText to text 1 thru 8000 of recText
end if
set end of output to u & "|||" & recTitle & "|||" & recDB & "|||" & recText
on error
set end of output to u & "|||ERROR|||||||"
end try
end repeat
set AppleScript's text item delimiters to linefeed & "<<<>>>" & linefeed
return output as text
end tell
'''
try:
result = run_applescript_inline(script)
except Exception as e:
logger.error(f"배치 텍스트 추출 실패: {e}")
return []
docs = []
for entry in result.split("\n<<<>>>\n"):
entry = entry.strip()
if not entry or "|||ERROR|||" in entry:
continue
parts = entry.split("|||", 3)
if len(parts) >= 4:
text = parts[3].strip()
if len(text) >= 10:
docs.append({
"uuid": parts[0].strip(),
"title": parts[1].strip(),
"db_name": parts[2].strip(),
"text": text,
})
return docs
# --- 메인 배치 ---
def run_batch(gpu_ip: str, target_db: str = None, force: bool = False, sync: bool = False):
"""배치 임베딩 실행"""
# GPU 헬스체크
if not check_gpu_health(gpu_ip):
logger.error(f"GPU 서버 연결 실패 ({gpu_ip}) — 종료")
sys.exit(1)
logger.info(f"GPU 서버 연결 확인: {gpu_ip}")
# 기존 임베딩 UUID 조회
existing_uuids = set()
if not force:
existing_uuids = get_existing_uuids_from_qdrant()
logger.info(f"Qdrant 기존 임베딩: {len(existing_uuids)}")
# DB 목록
db_names = [target_db] if target_db else get_db_names()
logger.info(f"처리 대상 DB: {db_names}")
total_embedded = 0
total_skipped = 0
total_failed = 0
all_dt_uuids = set()
for db_name in db_names:
logger.info(f"--- DB: {db_name} ---")
# UUID 수집
uuids = get_db_document_uuids(db_name)
all_dt_uuids.update(uuids)
logger.info(f" 문서: {len(uuids)}")
# 기존 스킵
if not force:
new_uuids = [u for u in uuids if u not in existing_uuids]
skipped = len(uuids) - len(new_uuids)
total_skipped += skipped
if skipped > 0:
logger.info(f" 스킵: {skipped}건 (이미 임베딩)")
uuids = new_uuids
if not uuids:
continue
# 500건씩 AppleScript 배치 텍스트 추출
for chunk_start in range(0, len(uuids), APPLESCRIPT_CHUNK):
chunk_uuids = uuids[chunk_start:chunk_start + APPLESCRIPT_CHUNK]
docs = get_documents_batch(chunk_uuids)
if not docs:
continue
# 32건씩 GPU 임베딩
for batch_start in range(0, len(docs), EMBED_BATCH_SIZE):
batch = docs[batch_start:batch_start + EMBED_BATCH_SIZE]
texts = [d["text"] for d in batch]
try:
embeddings = get_embeddings_batch(texts, gpu_ip)
if len(embeddings) != len(batch):
logger.warning(f"임베딩 수 불일치: {len(embeddings)} != {len(batch)}")
total_failed += len(batch)
continue
for doc, emb in zip(batch, embeddings):
doc["embedding"] = emb
store_batch_in_qdrant(batch)
total_embedded += len(batch)
except Exception as e:
logger.error(f"배치 임베딩 실패: {e}")
total_failed += len(batch)
progress = chunk_start + len(chunk_uuids)
logger.info(f" 진행: {progress}/{len(uuids)}")
# --sync: 고아 포인트 삭제
orphan_deleted = 0
if sync and all_dt_uuids:
orphan_uuids = existing_uuids - all_dt_uuids
if orphan_uuids:
orphan_ids = [uuid_to_point_id(u) for u in orphan_uuids]
delete_from_qdrant(orphan_ids)
orphan_deleted = len(orphan_uuids)
logger.info(f"고아 포인트 삭제: {orphan_deleted}")
# 통계
logger.info("=== 배치 임베딩 완료 ===")
logger.info(f" 임베딩: {total_embedded}")
logger.info(f" 스킵: {total_skipped}")
logger.info(f" 실패: {total_failed}")
if orphan_deleted:
logger.info(f" 고아 삭제: {orphan_deleted}")
# Qdrant 최종 카운트
try:
resp = requests.get(f"{QDRANT_URL}/collections/{COLLECTION}", timeout=10)
count = resp.json()["result"]["points_count"]
logger.info(f" Qdrant 총 포인트: {count}")
except Exception:
pass
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="DEVONthink 배치 임베딩")
parser.add_argument("--force", action="store_true", help="전체 재임베딩")
parser.add_argument("--sync", action="store_true", help="삭제 동기화 포함")
parser.add_argument("--db", type=str, help="특정 DB만 처리")
args = parser.parse_args()
creds = load_credentials()
gpu_ip = creds.get("GPU_SERVER_IP")
if not gpu_ip:
logger.error("GPU_SERVER_IP 미설정")
sys.exit(1)
run_batch(gpu_ip, target_db=args.db, force=args.force, sync=args.sync)

View File

@@ -1,104 +0,0 @@
#!/usr/bin/env python3
"""
벡터 임베딩 스크립트
- DEVONthink 문서 UUID로 텍스트 추출
- GPU 서버(nomic-embed-text)로 임베딩 생성
- ChromaDB에 저장
"""
import os
import sys
import requests
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent))
from pkm_utils import setup_logger, load_credentials, run_applescript_inline
logger = setup_logger("embed")
# ChromaDB 저장 경로
CHROMA_DIR = Path.home() / ".local" / "share" / "pkm" / "chromadb"
CHROMA_DIR.mkdir(parents=True, exist_ok=True)
def get_document_text(uuid: str) -> tuple[str, str]:
"""DEVONthink에서 UUID로 문서 텍스트 + 제목 추출"""
script = f'''
tell application id "DNtp"
set theRecord to get record with uuid "{uuid}"
set docText to plain text of theRecord
set docTitle to name of theRecord
return docTitle & "|||" & docText
end tell
'''
result = run_applescript_inline(script)
parts = result.split("|||", 1)
title = parts[0] if len(parts) > 0 else ""
text = parts[1] if len(parts) > 1 else ""
return title, text
def get_embedding(text: str, gpu_server_ip: str) -> list[float] | None:
"""GPU 서버의 nomic-embed-text로 임베딩 생성"""
url = f"http://{gpu_server_ip}:11434/api/embeddings"
try:
resp = requests.post(url, json={
"model": "nomic-embed-text",
"prompt": text[:8000] # 토큰 제한
}, timeout=60)
resp.raise_for_status()
return resp.json().get("embedding")
except Exception as e:
logger.error(f"임베딩 생성 실패: {e}")
return None
def store_in_chromadb(doc_id: str, title: str, text: str, embedding: list[float]):
"""ChromaDB에 저장"""
import chromadb
client = chromadb.PersistentClient(path=str(CHROMA_DIR))
collection = client.get_or_create_collection(
name="pkm_documents",
metadata={"hnsw:space": "cosine"}
)
collection.upsert(
ids=[doc_id],
embeddings=[embedding],
documents=[text[:2000]],
metadatas=[{"title": title, "source": "devonthink"}]
)
logger.info(f"ChromaDB 저장: {doc_id} ({title[:30]})")
def run(uuid: str):
"""단일 문서 임베딩 처리"""
logger.info(f"임베딩 처리 시작: {uuid}")
creds = load_credentials()
gpu_ip = creds.get("GPU_SERVER_IP")
if not gpu_ip:
logger.warning("GPU_SERVER_IP 미설정 — 임베딩 건너뜀")
return
try:
title, text = get_document_text(uuid)
if not text or len(text) < 10:
logger.warning(f"텍스트 부족 [{uuid}]: {len(text)}")
return
embedding = get_embedding(text, gpu_ip)
if embedding:
store_in_chromadb(uuid, title, text, embedding)
logger.info(f"임베딩 완료: {uuid}")
else:
logger.error(f"임베딩 실패: {uuid}")
except Exception as e:
logger.error(f"임베딩 처리 에러 [{uuid}]: {e}", exc_info=True)
if __name__ == "__main__":
if len(sys.argv) < 2:
print("사용법: python3 embed_to_chroma.py <DEVONthink_UUID>")
sys.exit(1)
run(sys.argv[1])

View File

@@ -17,18 +17,50 @@ from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent))
from pkm_utils import setup_logger, load_credentials, run_applescript_inline, llm_generate, PROJECT_ROOT, DATA_DIR
from law_parser import (
parse_law_xml, save_law_as_markdown, build_article_chapter_map,
add_cross_law_links, lookup_current_mst, atomic_write_json,
)
logger = setup_logger("law_monitor")
# 모니터링 대상 법령
MONITORED_LAWS = [
{"name": "산업안전보건법", "law_id": "001789", "category": "법률"},
{"name": "산업안전보건법 시행령", "law_id": "001790", "category": "대통령령"},
{"name": "산업안전보건법 시행규칙", "law_id": "001791", "category": "부령"},
{"name": "중대재해 처벌 등에 관한 법률", "law_id": "019005", "category": "법률"},
{"name": "중대재해 처벌 등에 관한 법률 시행령", "law_id": "019006", "category": "대통령령"},
{"name": "화학물질관리법", "law_id": "012354", "category": "법률"},
{"name": "위험물안전관리법", "law_id": "001478", "category": "법률"},
MST_CACHE_FILE = DATA_DIR / "law_mst_cache.json"
MD_OUTPUT_DIR = DATA_DIR / "laws" / "md"
MD_OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
# Tier 1 — 필수 모니터링 (업무 직접 관련, 매일 확인)
TIER1_LAWS = [
# 산업안전 핵심
{"name": "산업안전보건법", "category": "법률"},
{"name": "산업안전보건법 시행령", "category": "대통령령"},
{"name": "산업안전보건법 시행규칙", "category": "부령"},
{"name": "중대재해 처벌 등에 관한 법률", "category": "법률"},
{"name": "중대재해 처벌 등에 관한 법률 시행령", "category": "대통령령"},
# 화학/위험물
{"name": "화학물질관리법", "category": "법률"},
{"name": "위험물안전관리법", "category": "법률"},
{"name": "고압가스 안전관리법", "category": "법률"},
# 전기/소방/건설
{"name": "전기안전관리법", "category": "법률"},
{"name": "소방시설 설치 및 관리에 관한 법률", "category": "법률"},
{"name": "건설기술 진흥법", "category": "법률"},
# 시설물/노동
{"name": "시설물의 안전 및 유지관리에 관한 특별법", "category": "법률"},
{"name": "근로기준법", "category": "법률"},
{"name": "산업재해보상보험법", "category": "법률"},
{"name": "근로자참여 및 협력증진에 관한 법률", "category": "법률"},
]
# Tier 2 — 참고 (기본 비활성, --include-tier2 또는 설정으로 활성화)
TIER2_LAWS = [
{"name": "원자력안전법", "category": "법률"},
{"name": "방사선안전관리법", "category": "법률"},
{"name": "환경영향평가법", "category": "법률"},
{"name": "석면안전관리법", "category": "법률"},
{"name": "승강기 안전관리법", "category": "법률"},
{"name": "연구실 안전환경 조성에 관한 법률", "category": "법률"},
{"name": "재난 및 안전관리 기본법", "category": "법률"},
{"name": "고용보험법", "category": "법률"},
]
# 마지막 확인 일자 저장 파일
@@ -46,37 +78,36 @@ def load_last_check() -> dict:
def save_last_check(data: dict):
"""마지막 확인 일자 저장"""
with open(LAST_CHECK_FILE, "w") as f:
json.dump(data, f, ensure_ascii=False, indent=2)
"""마지막 확인 일자 저장 (원자적 쓰기)"""
atomic_write_json(LAST_CHECK_FILE, data)
def fetch_law_info(law_oc: str, law_id: str) -> dict | None:
"""법령 정보 조회 (법령 API)"""
url = "https://www.law.go.kr/DRF/lawSearch.do"
def fetch_law_info(law_oc: str, mst: str) -> dict | None:
"""법령 정보 조회 — lawService.do로 MST 직접 조회 (XML → 기본정보 추출)"""
url = "https://www.law.go.kr/DRF/lawService.do"
params = {
"OC": law_oc,
"target": "law",
"type": "JSON",
"MST": law_id,
"type": "XML",
"MST": mst,
}
try:
resp = requests.get(url, params=params, timeout=30)
resp.raise_for_status()
data = resp.json()
# API 에러 응답 감지
if "result" in data and "실패" in str(data.get("result", "")):
logger.error(f"법령 API 에러 [{law_id}]: {data.get('result')}{data.get('msg')}")
root = ET.fromstring(resp.content)
info_el = root.find(".//기본정보")
if info_el is None:
logger.warning(f"기본정보 없음 [MST={mst}]")
return None
if "LawSearch" in data and "law" in data["LawSearch"]:
laws = data["LawSearch"]["law"]
if isinstance(laws, list):
return laws[0] if laws else None
return laws
logger.warning(f"법령 응답에 데이터 없음 [{law_id}]: {list(data.keys())}")
return None
return {
"법령명한글": (info_el.findtext("법령명_한글", "") or "").strip(),
"공포일자": (info_el.findtext("공포일자", "") or "").strip(),
"시행일자": (info_el.findtext("시행일자", "") or "").strip(),
"법령ID": (info_el.findtext("법령ID", "") or "").strip(),
"소관부처": (info_el.findtext("소관부처", "") or "").strip(),
}
except Exception as e:
logger.error(f"법령 조회 실패 [{law_id}]: {e}")
logger.error(f"법령 조회 실패 [MST={mst}]: {e}")
return None
@@ -109,32 +140,91 @@ def save_law_file(law_name: str, content: str) -> Path:
return filepath
def import_to_devonthink(filepath: Path, law_name: str, category: str):
"""DEVONthink 04_Industrial Safety로 임포트 — 변수 방식"""
fp = str(filepath)
script = f'set fp to "{fp}"\n'
script += 'tell application id "DNtp"\n'
script += ' repeat with db in databases\n'
script += ' if name of db is "04_Industrial safety" then\n'
script += ' set targetGroup to create location "/10_Legislation/Law" in db\n'
script += ' set theRecord to import fp to targetGroup\n'
script += f' set tags of theRecord to {{"#주제/산업안전/법령", "$유형/법령", "{category}"}}\n'
script += ' add custom meta data "law_monitor" for "sourceChannel" to theRecord\n'
script += ' add custom meta data "external" for "dataOrigin" to theRecord\n'
script += ' add custom meta data (current date) for "lastAIProcess" to theRecord\n'
script += ' exit repeat\n'
script += ' end if\n'
script += ' end repeat\n'
script += 'end tell'
def import_law_to_devonthink(law_name: str, md_files: list[Path], category: str):
"""DEVONthink 04_Industrial Safety로 장별 MD 파일 임포트
3단계 교체: 기존 폴더 이동 → 신규 생성 → 구 폴더 삭제 (wiki-link 끊김 최소화)
"""
safe_name = law_name.replace(" ", "_")
group_path = f"/10_Legislation/Law/{safe_name}"
# 1단계: 기존 폴더 이동 (있으면)
rename_script = (
'tell application id "DNtp"\n'
' repeat with db in databases\n'
' if name of db is "04_Industrial safety" then\n'
f' set oldGroup to get record at "{group_path}" in db\n'
' if oldGroup is not missing value then\n'
f' set name of oldGroup to "{safe_name}_old"\n'
' end if\n'
' exit repeat\n'
' end if\n'
' end repeat\n'
'end tell'
)
try:
run_applescript_inline(script)
logger.info(f"DEVONthink 임포트 완료: {law_name}")
except Exception as e:
logger.error(f"DEVONthink 임포트 실패 [{law_name}]: {e}")
run_applescript_inline(rename_script)
except Exception:
pass # 기존 폴더 없으면 무시
# 2단계: 신규 폴더 생성 + 파일 임포트
for filepath in md_files:
fp = str(filepath)
script = f'set fp to "{fp}"\n'
script += 'tell application id "DNtp"\n'
script += ' repeat with db in databases\n'
script += ' if name of db is "04_Industrial safety" then\n'
script += f' set targetGroup to create location "{group_path}" in db\n'
script += ' set theRecord to import fp to targetGroup\n'
script += f' set tags of theRecord to {{"#주제/산업안전/법령", "$유형/법령", "{category}"}}\n'
script += ' add custom meta data "law_monitor" for "sourceChannel" to theRecord\n'
script += ' add custom meta data "external" for "dataOrigin" to theRecord\n'
script += ' add custom meta data (current date) for "lastAIProcess" to theRecord\n'
script += ' exit repeat\n'
script += ' end if\n'
script += ' end repeat\n'
script += 'end tell'
try:
run_applescript_inline(script)
except Exception as e:
logger.error(f"DEVONthink 임포트 실패 [{filepath.name}]: {e}")
# 3단계: 구 폴더 삭제
delete_script = (
'tell application id "DNtp"\n'
' repeat with db in databases\n'
' if name of db is "04_Industrial safety" then\n'
f' set oldGroup to get record at "/10_Legislation/Law/{safe_name}_old" in db\n'
' if oldGroup is not missing value then\n'
' delete record oldGroup\n'
' end if\n'
' exit repeat\n'
' end if\n'
' end repeat\n'
'end tell'
)
try:
run_applescript_inline(delete_script)
except Exception:
pass
logger.info(f"DEVONthink 임포트 완료: {law_name} ({len(md_files)}개 파일)")
def run():
"""메인 실행"""
def _fetch_with_retry(func, *args, retries=3, backoff=(5, 15, 30)):
"""API 호출 재시도 래퍼"""
import time
for i in range(retries):
result = func(*args)
if result is not None:
return result
if i < retries - 1:
logger.warning(f"재시도 {i+2}/{retries} ({backoff[i]}초 후)")
time.sleep(backoff[i])
return None
def run(include_tier2: bool = False):
"""메인 실행 — MST 자동 조회 + 장 단위 MD 분할 + DEVONthink 임포트"""
logger.info("=== 법령 모니터링 시작 ===")
creds = load_credentials()
@@ -143,41 +233,127 @@ def run():
logger.error("LAW_OC 인증키가 설정되지 않았습니다. credentials.env를 확인하세요.")
sys.exit(1)
laws = TIER1_LAWS + (TIER2_LAWS if include_tier2 else [])
last_check = load_last_check()
changes_found = 0
failures = []
parsed_laws = {} # 크로스 링크 2-pass용
for law in MONITORED_LAWS:
for law in laws:
law_name = law["name"]
law_id = law["law_id"]
category = law["category"]
logger.info(f"확인 중: {law_name} ({law_id})")
info = fetch_law_info(law_oc, law_id)
if not info:
# MST 자동 조회 (캐시 TTL 7일)
mst = lookup_current_mst(law_oc, law_name, category, cache_path=MST_CACHE_FILE)
if not mst:
failures.append({"name": law_name, "error": "MST 조회 실패"})
continue
# 시행일자 또는 공포일자로 변경 감지
announce_date = info.get("공포일자", info.get("시행일자", ""))
prev_date = last_check.get(law_id, "")
logger.info(f"확인 중: {law_name} (MST={mst})")
# XML 한 번에 다운로드 (정보 추출 + 파싱 겸용)
xml_text = _fetch_with_retry(fetch_law_text, law_oc, mst)
if not xml_text:
failures.append({"name": law_name, "error": "XML 다운로드 실패"})
continue
# XML에서 기본정보 추출
try:
root = ET.fromstring(xml_text)
info_el = root.find(".//기본정보")
returned_name = (info_el.findtext("법령명_한글", "") or "").strip() if info_el else ""
except Exception:
failures.append({"name": law_name, "error": "XML 파싱 실패"})
continue
# 법령명 검증
if law_name not in returned_name and returned_name not in law_name:
logger.warning(f"법령명 불일치: 요청='{law_name}' 응답='{returned_name}' — 스킵")
failures.append({"name": law_name, "error": f"법령명 불일치: {returned_name}"})
continue
# 공포일자로 변경 감지
announce_date = (info_el.findtext("공포일자", "") or "").strip() if info_el else ""
prev_date = last_check.get(law_name, "")
if announce_date and announce_date != prev_date:
logger.info(f"변경 감지: {law_name} — 공포일자 {announce_date} (이전: {prev_date or '없음'})")
# 법령 본문 다운로드
law_mst = info.get("법령MST", law_id)
text = fetch_law_text(law_oc, law_mst)
if text:
filepath = save_law_file(law_name, text)
import_to_devonthink(filepath, law_name, category)
changes_found += 1
# XML 저장
xml_path = save_law_file(law_name, xml_text)
last_check[law_id] = announce_date
# Pass 1: XML 파싱 + 장 분할 MD 저장 (내부 링크만)
try:
parsed = parse_law_xml(str(xml_path))
md_files = save_law_as_markdown(law_name, parsed, MD_OUTPUT_DIR)
# 크로스 링크용 매핑 수집
parsed_laws[law_name] = {
"parsed": parsed,
"md_files": md_files,
"category": category,
"article_map": build_article_chapter_map(law_name, parsed),
}
changes_found += 1
except Exception as e:
logger.error(f"법령 파싱/임포트 실패 [{law_name}]: {e}", exc_info=True)
failures.append({"name": law_name, "error": str(e)})
continue
last_check[law_name] = announce_date
else:
logger.debug(f"변경 없음: {law_name}")
# 변경 없어도 기존 파싱 데이터로 매핑 수집 (크로스 링크용)
xml_path = LAWS_DIR / f"{law_name.replace(' ', '_').replace('/', '_')}_{datetime.now().strftime('%Y%m%d')}.xml"
if not xml_path.exists():
# 오늘 날짜 파일이 없으면 가장 최근 파일 찾기
candidates = sorted(LAWS_DIR.glob(f"{law_name.replace(' ', '_').replace('/', '_')}_*.xml"))
xml_path = candidates[-1] if candidates else None
if xml_path and xml_path.exists():
try:
parsed = parse_law_xml(str(xml_path))
parsed_laws[law_name] = {
"parsed": parsed,
"md_files": [],
"category": category,
"article_map": build_article_chapter_map(law_name, parsed),
}
except Exception:
pass
# Pass 2: 크로스 링크 일괄 적용 (변경된 법령만)
if parsed_laws:
# 전체 조문-장 매핑 테이블
global_article_map = {name: data["article_map"] for name, data in parsed_laws.items()}
changed_laws = {name: data for name, data in parsed_laws.items() if data["md_files"]}
if changed_laws and len(global_article_map) > 1:
logger.info(f"크로스 링크 적용: {len(changed_laws)}개 법령, 매핑 {len(global_article_map)}")
for law_name, data in changed_laws.items():
for md_file in data["md_files"]:
if md_file.name == "00_기본정보.md" or md_file.name == "부칙.md":
continue
content = md_file.read_text(encoding="utf-8")
updated = add_cross_law_links(content, law_name, global_article_map)
if updated != content:
md_file.write_text(updated, encoding="utf-8")
# DEVONthink 임포트 (크로스 링크 적용 후)
for law_name, data in changed_laws.items():
if data["md_files"]:
import_law_to_devonthink(law_name, data["md_files"], data["category"])
save_last_check(last_check)
# 실행 결과 기록
run_result = {
"timestamp": datetime.now().isoformat(),
"total": len(laws),
"changes": changes_found,
"failures": failures,
}
atomic_write_json(DATA_DIR / "law_last_run.json", run_result)
if failures:
logger.warning(f"실패 {len(failures)}건: {[f['name'] for f in failures]}")
# ─── 외국 법령 (빈도 체크 후 실행) ───
us_count = fetch_us_osha(last_check)
jp_count = fetch_jp_mhlw(last_check)
@@ -315,11 +491,9 @@ def fetch_jp_mhlw(last_check: dict) -> int:
translated = ""
try:
translated = llm_generate(
f"다음 일본어 제목을 한국어로 번역해줘. 번역만 출력하고 다른 말은 하지 마.\n\n{title}"
f"다음 일본어 제목을 한국어로 번역해줘. 번역만 출력하고 다른 말은 하지 마.\n\n{title}",
no_think=True
)
# thinking 출력 제거 — 마지막 줄만 사용
lines = [l.strip() for l in translated.strip().split("\n") if l.strip()]
translated = lines[-1] if lines else title
except Exception:
translated = title
@@ -397,4 +571,5 @@ def fetch_eu_osha(last_check: dict) -> int:
if __name__ == "__main__":
run()
tier2 = "--include-tier2" in sys.argv
run(include_tier2=tier2)

471
scripts/law_parser.py Normal file
View File

@@ -0,0 +1,471 @@
#!/usr/bin/env python3
"""
법령 XML → Markdown 장 단위 분할 파서
- law.go.kr XML 파싱 → 장/절 구조 식별
- 장별 Markdown 파일 생성 (앵커 + 크로스 링크)
- 부칙 별도 파일 저장
"""
import re
import json
import os
import xml.etree.ElementTree as ET
from pathlib import Path
from datetime import datetime, timedelta
import sys
sys.path.insert(0, str(Path(__file__).parent))
from pkm_utils import setup_logger
logger = setup_logger("law_parser")
# 법령 약칭 매핑 (조문 내 참조 → 정식명칭)
LAW_ALIASES = {
"산안법": "산업안전보건법",
"산업안전보건법": "산업안전보건법",
"중대재해법": "중대재해 처벌 등에 관한 법률",
"중대재해처벌법": "중대재해 처벌 등에 관한 법률",
"화관법": "화학물질관리법",
"위안법": "위험물안전관리법",
"고압가스법": "고압가스 안전관리법",
"건설기술진흥법": "건설기술 진흥법",
"산재보험법": "산업재해보상보험법",
}
def atomic_write_json(filepath: Path, data: dict):
"""원자적 JSON 파일 쓰기 (경합 방지)"""
tmp = filepath.with_suffix(".json.tmp")
with open(tmp, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=2)
os.replace(str(tmp), str(filepath))
# --- XML 파싱 ---
def parse_law_xml(xml_path: str) -> dict:
"""XML 파싱 → 법령 구조 추출"""
tree = ET.parse(xml_path)
root = tree.getroot()
# 기본정보
info_el = root.find(".//기본정보")
info = {
"name": (info_el.findtext("법령명_한글", "") or "").strip(),
"law_id": (info_el.findtext("법령ID", "") or "").strip(),
"announce_date": (info_el.findtext("공포일자", "") or "").strip(),
"enforce_date": (info_el.findtext("시행일자", "") or "").strip(),
"ministry": (info_el.findtext("소관부처", "") or "").strip(),
"category": (info_el.findtext("법종구분", "") or "").strip(),
}
# 조문 추출
articles = []
for el in root.findall(".//조문단위"):
kind = (el.findtext("조문여부", "") or "").strip()
num = (el.findtext("조문번호", "") or "").strip()
title = (el.findtext("조문제목", "") or "").strip()
content = (el.findtext("조문내용", "") or "").strip()
# 항 추출
paragraphs = []
for p_el in el.findall(""):
p_num = (p_el.findtext("항번호", "") or "").strip()
p_content = (p_el.findtext("항내용", "") or "").strip()
# 호 추출
sub_items = []
for h_el in p_el.findall(""):
h_num = (h_el.findtext("호번호", "") or "").strip()
h_content = (h_el.findtext("호내용", "") or "").strip()
sub_items.append({"num": h_num, "content": h_content})
paragraphs.append({"num": p_num, "content": p_content, "sub_items": sub_items})
articles.append({
"kind": kind,
"num": num,
"title": title,
"content": content,
"paragraphs": paragraphs,
})
# 부칙 추출
appendices = []
for el in root.findall(".//부칙단위"):
date = (el.findtext("부칙공포일자", "") or "").strip()
num = (el.findtext("부칙공포번호", "") or "").strip()
content = (el.findtext("부칙내용", "") or "").strip()
appendices.append({"date": date, "num": num, "content": content})
return {"info": info, "articles": articles, "appendices": appendices}
# --- 장 분할 ---
def split_by_chapter(articles: list) -> list[dict]:
"""조문 목록을 장 단위로 그룹핑
Returns: [{"chapter": "제1장 총칙", "sections": [...], "articles": [...]}]
"""
chapters = []
current_chapter = {"chapter": "", "sections": [], "articles": []}
current_section = ""
for article in articles:
content_stripped = article["content"].strip()
if article["kind"] == "전문":
# 장/절/편 구분자
if re.match(r"\d+장", content_stripped):
# 새 장 시작
if current_chapter["chapter"] or current_chapter["articles"]:
chapters.append(current_chapter)
current_chapter = {"chapter": content_stripped, "sections": [], "articles": []}
current_section = ""
elif re.match(r"\d+절", content_stripped):
current_section = content_stripped
current_chapter["sections"].append(current_section)
elif re.match(r"\d+편", content_stripped):
# 편은 장보다 상위 — 별도 처리 없이 장 파일 내 표시
if current_chapter["articles"]:
chapters.append(current_chapter)
current_chapter = {"chapter": content_stripped, "sections": [], "articles": []}
current_section = ""
continue
if article["kind"] == "조문":
article["_section"] = current_section
current_chapter["articles"].append(article)
# 마지막 장
if current_chapter["chapter"] or current_chapter["articles"]:
chapters.append(current_chapter)
# 장이 없는 법령 (fallback)
if not chapters and articles:
chapters = [{"chapter": "", "sections": [], "articles": [
a for a in articles if a["kind"] == "조문"
]}]
return chapters
# --- Markdown 변환 ---
def _format_article_num(article: dict) -> str:
"""조문번호 + 제목 → 앵커용 ID 생성"""
num = article["num"]
title = article["title"]
# "제38조" 또는 "제38조의2" 형태 추출
content = article["content"]
match = re.match(r"(제\d+조(?:의\d+)*)\s*", content)
if match:
return match.group(1)
return f"{num}"
def article_to_markdown(article: dict) -> str:
"""단일 조문 → Markdown"""
article_id = _format_article_num(article)
title = article["title"]
# 제목 정리 (한자 괄호 등)
if title:
header = f"## {article_id} ({title})" + " {#" + article_id + "}"
else:
header = f"## {article_id}" + " {#" + article_id + "}"
lines = [header]
# 본문 내용
content = article["content"].strip()
# 조문번호 접두사 제거 (예: "제38조 (안전조치)" → 본문만)
content = re.sub(r"^제\d+조(?:의\d+)*\s*(?:\([^)]*\))?\s*", "", content)
if content:
lines.append(content)
# 항
for p in article.get("paragraphs", []):
p_content = p["content"].strip()
if p_content:
lines.append(f"\n{p_content}")
for si in p.get("sub_items", []):
si_content = si["content"].strip()
if si_content:
lines.append(f" {si_content}")
return "\n".join(lines)
def chapter_to_markdown(law_name: str, info: dict, chapter: dict) -> str:
"""장 → Markdown 파일 내용"""
chapter_name = chapter["chapter"] or law_name
enforce = info.get("enforce_date", "")
if len(enforce) == 8:
enforce = f"{enforce[:4]}-{enforce[4:6]}-{enforce[6:]}"
ministry = info.get("ministry", "")
lines = [
f"# {chapter_name}",
f"> {law_name} | 시행 {enforce} | {ministry}",
"",
]
# 절 표시
current_section = ""
for article in chapter["articles"]:
section = article.get("_section", "")
if section and section != current_section:
current_section = section
lines.append(f"\n### {section}\n")
lines.append(article_to_markdown(article))
lines.append("")
return "\n".join(lines)
def info_to_markdown(info: dict) -> str:
"""기본정보 → Markdown"""
enforce = info.get("enforce_date", "")
if len(enforce) == 8:
enforce = f"{enforce[:4]}-{enforce[4:6]}-{enforce[6:]}"
announce = info.get("announce_date", "")
if len(announce) == 8:
announce = f"{announce[:4]}-{announce[4:6]}-{announce[6:]}"
return f"""# {info['name']} — 기본정보
| 항목 | 내용 |
|------|------|
| **법령명** | {info['name']} |
| **법령구분** | {info.get('category', '')} |
| **소관부처** | {info.get('ministry', '')} |
| **공포일자** | {announce} |
| **시행일자** | {enforce} |
| **법령ID** | {info.get('law_id', '')} |
> 이 문서는 law.go.kr API에서 자동 생성되었습니다.
> 마지막 업데이트: {datetime.now().strftime('%Y-%m-%d')}
"""
def appendices_to_markdown(law_name: str, appendices: list) -> str:
"""부칙 → Markdown"""
lines = [f"# {law_name} — 부칙", ""]
for ap in appendices:
date = ap["date"]
if len(date) == 8:
date = f"{date[:4]}-{date[4:6]}-{date[6:]}"
lines.append(f"## 부칙 (공포 {date}, 제{ap['num']}호)")
lines.append(ap["content"])
lines.append("")
return "\n".join(lines)
# --- 크로스 링크 ---
def add_internal_links(text: str, article_ids: set[str]) -> str:
"""같은 법률 내 조문 참조 → Markdown 앵커 링크
{#...} 앵커 내부와 이미 링크된 부분은 스킵
"""
def replace_ref(m):
full = m.group(0)
article_ref = m.group(1) # "제38조" or "제38조의2"
if article_ref in article_ids:
return f"[{full}](#{article_ref})"
return full
# {#...} 앵커와 [...](...) 링크 내부는 보호
protected = re.sub(r'\{#[^}]+\}|\[[^\]]*\]\([^)]*\)', lambda m: '\x00' * len(m.group()), text)
# "제N조(의N)*" 패턴 매칭 (항/호 부분은 링크에 포함하지 않음)
pattern = r"(제\d+조(?:의\d+)*)(?:제\d+항)?(?:제\d+호)?"
result = []
last = 0
for m in re.finditer(pattern, protected):
result.append(text[last:m.start()])
if '\x00' in protected[m.start():m.end()]:
result.append(text[m.start():m.end()]) # 보호 영역 — 원문 유지
else:
orig = text[m.start():m.end()]
article_ref = re.match(r"(제\d+조(?:의\d+)*)", orig)
if article_ref and article_ref.group(1) in article_ids:
result.append(f"[{orig}](#{article_ref.group(1)})")
else:
result.append(orig)
last = m.end()
result.append(text[last:])
return "".join(result)
def add_cross_law_links(text: str, law_name: str, article_chapter_map: dict) -> str:
"""다른 법률 참조 → DEVONthink wiki-link
article_chapter_map: {법령명: {제X조: 파일명}}
"""
# 「법령명」 제X조 패턴
def replace_cross_ref(m):
raw_name = m.group(1).strip()
article_ref = m.group(2)
# 약칭 → 정식명칭
resolved = LAW_ALIASES.get(raw_name, raw_name)
if resolved == law_name:
return m.group(0) # 같은 법률이면 스킵 (내부 링크로 처리)
# 장 매핑 조회
law_map = article_chapter_map.get(resolved, {})
chapter_file = law_map.get(article_ref)
if chapter_file:
return f"[[{chapter_file}#{article_ref}|{m.group(0)}]]"
return m.group(0)
pattern = r"「([^」]+)」\s*(제\d+조(?:의\d+)*)"
return re.sub(pattern, replace_cross_ref, text)
# --- 파일 저장 ---
def save_law_as_markdown(law_name: str, parsed: dict, output_dir: Path) -> list[Path]:
"""파싱된 법령 → 장별 MD 파일 저장. 생성된 파일 경로 리스트 반환."""
law_dir = output_dir / law_name.replace(" ", "_")
law_dir.mkdir(parents=True, exist_ok=True)
info = parsed["info"]
chapters = split_by_chapter(parsed["articles"])
files = []
# 기본정보
info_path = law_dir / "00_기본정보.md"
info_path.write_text(info_to_markdown(info), encoding="utf-8")
files.append(info_path)
# 같은 법률 내 조문 ID 수집 (내부 링크용)
all_article_ids = set()
for ch in chapters:
for a in ch["articles"]:
all_article_ids.add(_format_article_num(a))
# 장별 파일
for i, chapter in enumerate(chapters, 1):
ch_name = chapter["chapter"] or law_name
# 파일명 안전화
safe_name = re.sub(r"[·ㆍ\s]+", "_", ch_name)
safe_name = re.sub(r"[^\w가-힣]", "", safe_name)
filename = f"{safe_name}.md"
md_content = chapter_to_markdown(law_name, info, chapter)
# 내부 링크 적용
md_content = add_internal_links(md_content, all_article_ids)
filepath = law_dir / filename
filepath.write_text(md_content, encoding="utf-8")
files.append(filepath)
# 부칙
if parsed["appendices"]:
ap_path = law_dir / "부칙.md"
ap_path.write_text(appendices_to_markdown(law_name, parsed["appendices"]), encoding="utf-8")
files.append(ap_path)
logger.info(f"{law_name}: {len(files)}개 파일 생성 → {law_dir}")
return files
def build_article_chapter_map(law_name: str, parsed: dict) -> dict:
"""조문→장 파일명 매핑 생성 (크로스 링크용)
Returns: {제X조: 파일명(확장자 없음)}
"""
chapters = split_by_chapter(parsed["articles"])
mapping = {}
for chapter in chapters:
ch_name = chapter["chapter"] or law_name
safe_name = re.sub(r"[·ㆍ\s]+", "_", ch_name)
safe_name = re.sub(r"[^\w가-힣]", "", safe_name)
file_stem = f"{law_name.replace(' ', '_')}_{safe_name}" if chapter["chapter"] else law_name.replace(" ", "_")
for article in chapter["articles"]:
article_id = _format_article_num(article)
mapping[article_id] = file_stem
return mapping
# --- MST 캐시 ---
def load_mst_cache(cache_path: Path) -> dict:
if cache_path.exists():
with open(cache_path, "r", encoding="utf-8") as f:
return json.load(f)
return {}
def save_mst_cache(cache_path: Path, data: dict):
atomic_write_json(cache_path, data)
def lookup_current_mst(law_oc: str, law_name: str, category: str = "법률",
cache_path: Path = None, cache_ttl_days: int = 7) -> str | None:
"""법령명으로 현행 MST 검색 (캐시 TTL 적용)
- category → API 법령구분코드 매핑으로 검색 정확도 향상
"""
import requests
# 캐시 확인
if cache_path:
cache = load_mst_cache(cache_path)
entry = cache.get(law_name)
if entry:
cached_at = datetime.fromisoformat(entry["cached_at"])
if datetime.now() - cached_at < timedelta(days=cache_ttl_days):
return entry["mst"]
try:
resp = requests.get("https://www.law.go.kr/DRF/lawSearch.do", params={
"OC": law_oc, "target": "law", "type": "JSON",
"query": law_name, "display": "5",
}, timeout=15)
resp.raise_for_status()
data = resp.json().get("LawSearch", {})
laws = data.get("law", [])
if isinstance(laws, dict):
laws = [laws]
# 현행 필터 + 법령명 정확 매칭
current = [l for l in laws
if l.get("현행연혁코드") == "현행"
and law_name in l.get("법령명한글", "")]
if not current:
logger.warning(f"MST 검색 실패: {law_name} — 현행 법령 없음")
return None
mst = current[0]["법령일련번호"]
# 캐시 저장
if cache_path:
cache = load_mst_cache(cache_path)
cache[law_name] = {"mst": mst, "cached_at": datetime.now().isoformat()}
save_mst_cache(cache_path, cache)
return mst
except Exception as e:
logger.error(f"MST 조회 에러 [{law_name}]: {e}")
return None
if __name__ == "__main__":
# 단독 실행: XML 파일을 MD로 변환
if len(sys.argv) < 2:
print("사용법: python3 law_parser.py <xml_path> [output_dir]")
sys.exit(1)
xml_path = sys.argv[1]
output_dir = Path(sys.argv[2]) if len(sys.argv) > 2 else Path("data/laws/md")
parsed = parse_law_xml(xml_path)
print(f"법령: {parsed['info']['name']}")
print(f"조문: {len(parsed['articles'])}개, 부칙: {len(parsed['appendices'])}")
files = save_law_as_markdown(parsed["info"]["name"], parsed, output_dir)
print(f"생성된 파일: {len(files)}")
for f in files:
print(f" {f}")

View File

@@ -35,24 +35,14 @@ def run_applescript(script: str, timeout: int = 120) -> str:
@app.route('/devonthink/stats')
def devonthink_stats():
try:
# DB별 문서 수만 빠르게 조회 (children 순회 대신 count 사용)
script = (
'tell application id "DNtp"\n'
' set today to current date\n'
' set time of today to 0\n'
' set stats to {}\n'
' repeat with db in databases\n'
' set dbName to name of db\n'
' set addedCount to 0\n'
' set modifiedCount to 0\n'
' repeat with rec in children of root of db\n'
' try\n'
' if creation date of rec >= today then set addedCount to addedCount + 1\n'
' if modification date of rec >= today then set modifiedCount to modifiedCount + 1\n'
' end try\n'
' end repeat\n'
' if addedCount > 0 or modifiedCount > 0 then\n'
' set end of stats to dbName & ":" & addedCount & ":" & modifiedCount\n'
' end if\n'
' set docCount to count of contents of db\n'
' set end of stats to dbName & ":" & docCount\n'
' end repeat\n'
' set AppleScript\'s text item delimiters to "|"\n'
' return stats as text\n'
@@ -60,17 +50,18 @@ def devonthink_stats():
)
result = run_applescript(script)
stats = {}
total = 0
if result:
for item in result.split('|'):
parts = item.split(':')
if len(parts) == 3:
stats[parts[0]] = {'added': int(parts[1]), 'modified': int(parts[2])}
total_added = sum(s['added'] for s in stats.values())
total_modified = sum(s['modified'] for s in stats.values())
if len(parts) == 2:
count = int(parts[1])
stats[parts[0]] = {'count': count}
total += count
return jsonify(success=True, data={
'databases': stats,
'total_added': total_added,
'total_modified': total_modified
'total_documents': total,
'database_count': len(stats),
})
except Exception as e:
return jsonify(success=False, error=str(e)), 500
@@ -83,9 +74,11 @@ def devonthink_search():
if not q:
return jsonify(success=False, error='q parameter required'), 400
try:
# 한글 쿼리 이스케이프 (따옴표, 백슬래시)
safe_q = q.replace('\\', '\\\\').replace('"', '\\"')
script = (
'tell application id "DNtp"\n'
f' set results to search "{q}"\n'
f' set results to search "{safe_q}"\n'
' set output to {}\n'
f' set maxCount to {limit}\n'
' set i to 0\n'
@@ -259,16 +252,19 @@ def _search_qdrant(vector: list[float], limit: int = 20) -> list[dict]:
def _llm_generate(prompt: str) -> str:
"""Mac Mini MLX로 답변 생성"""
"""Mac Mini MLX로 답변 생성 (thinking 필터링 포함)"""
import requests as req
from pkm_utils import strip_thinking
resp = req.post("http://localhost:8800/v1/chat/completions", json={
"model": "mlx-community/Qwen3.5-35B-A3B-4bit",
"messages": [{"role": "user", "content": prompt}],
"temperature": 0.3,
"max_tokens": 2048,
"enable_thinking": False,
}, timeout=120)
resp.raise_for_status()
return resp.json()["choices"][0]["message"]["content"]
content = resp.json()["choices"][0]["message"]["content"]
return strip_thinking(content)
@app.route('/rag/query', methods=['POST'])

View File

@@ -105,19 +105,40 @@ def run_applescript_inline(script: str) -> str:
raise RuntimeError("AppleScript 타임아웃 (인라인)")
def strip_thinking(text: str) -> str:
"""LLM thinking 출력 제거 — <think>...</think> 태그 및 thinking 패턴 필터링"""
import re
# <think>...</think> 태그 제거
text = re.sub(r'<think>[\s\S]*?</think>\s*', '', text)
# "Wait,", "Let me", "I'll check" 등으로 시작하는 thinking 줄 제거
lines = text.strip().split('\n')
filtered = [l for l in lines if not re.match(
r'^\s*(Wait|Let me|I\'ll|Hmm|OK,|Okay|Let\'s|Actually|So,|First)', l, re.IGNORECASE
)]
return '\n'.join(filtered).strip() if filtered else text.strip()
def llm_generate(prompt: str, model: str = "mlx-community/Qwen3.5-35B-A3B-4bit",
host: str = "http://localhost:8800", json_mode: bool = False) -> str:
"""MLX 서버 API 호출 (OpenAI 호환)"""
host: str = "http://localhost:8800", json_mode: bool = False,
no_think: bool = False) -> str:
"""MLX 서버 API 호출 (OpenAI 호환)
no_think=True: thinking 비활성화 + 응답 필터링 (번역 등 단순 작업용)
"""
import requests
messages = [{"role": "user", "content": prompt}]
resp = requests.post(f"{host}/v1/chat/completions", json={
payload = {
"model": model,
"messages": messages,
"temperature": 0.3,
"max_tokens": 4096,
}, timeout=300)
}
if no_think:
payload["enable_thinking"] = False
resp = requests.post(f"{host}/v1/chat/completions", json=payload, timeout=300)
resp.raise_for_status()
content = resp.json()["choices"][0]["message"]["content"]
if no_think:
content = strip_thinking(content)
if not json_mode:
return content
# JSON 모드: thinking 허용 → 마지막 유효 JSON 객체 추출