Compare commits

..

2 Commits

Author SHA1 Message Date
hyungi 61bb6f401b refactor(workers): 죽은 코드 law_monitor.py 삭제 (367줄)
statute_collector(safety-library-1 B-1)가 대체 — 모듈 import 0·스케줄러 미등록·동적로딩 없음 = 절대 로딩 안 되는 dead file. 'law_monitor' 잔존 출현 34건은 전부 source_channel enum 값/config 키/주석(모듈 참조 아님). 복구는 git history.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-27 06:29:02 +09:00
hyungi 2d86683636 refactor(ai): AIClient PR-B — gate 누락 경로 봉인 + 공유 httpx + public classifier/verifier
코드리뷰 AIClient 정비 PR-B (#2 gate·#3 httpx·#4 public).

#2 gate 구조 (call-site 컨벤션 — gate 는 caller-managed, AIClient self-gate 금지):
  · classify_worker consumer call_triage: gate 없이 Mac mini 직타하던 것 → acquire_mlx_gate(BACKGROUND).
    (drain 경로 call_deep_or_defer 는 맥북 deep 슬롯이라 mini gate 무관, 미적용.)
  · verifier_service: gate 없이 _request(verifier) 하던 것 → acquire_mlx_gate(FOREGROUND) + call_verifier.
    classifier/evidence 와 동일 gate 공유로 thundering-herd(22-timeout 사고) 방어.
  ★재진입 안전 검증: AIClient 메서드 내부 self-gate 0(전부 call-site) + evidence/classifier 는 이미
   독립 gate 보유 + api/search 오케스트레이터 gate 미보유 → double-acquire 데드락 불가.

#4 public 메서드: call_classifier/call_verifier 추가 → classifier/verifier_service 의 private _request
  직접호출 봉인(egress 가드 일관 적용). gate 는 caller-managed 유지(call_primary 와 동일 계약).

#3 공유 httpx: 호출마다 AsyncClient 생성(30+ 사이트)을 _get_shared_http() 단일 풀로 — keep-alive
  재사용. 이벤트루프 바인딩이라 루프 변경(테스트) 시 재생성, close() 는 no-op.

py_compile PASS. (잔여 #4: query_analyzer/digest/backends 의 _request·_call_chat 직접호출은 gated 라
안전, 후속 sweep.)

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-26 20:07:30 +09:00
5 changed files with 51 additions and 374 deletions
+40 -2
View File
@@ -1,5 +1,6 @@
"""AI 추상화 레이어 — 통합 클라이언트. 기본값은 항상 Qwen3.5."""
import asyncio
import json
import re
from pathlib import Path
@@ -188,6 +189,25 @@ def _load_prompt(name: str) -> str:
CLASSIFY_PROMPT = _load_prompt("classify.txt") if (PROMPTS_DIR / "classify.txt").exists() else ""
# 공유 httpx 클라이언트 — 호출마다 AsyncClient 를 새로 만들던 것(30+ 사이트, 연결풀 재사용 0)을
# 일원화해 keep-alive 재사용. 이벤트루프 바인딩이라 루프 변경(pytest 격리 등) 시 재생성한다.
# close() 는 공유 풀이라 no-op — 프로세스 종료 시 GC.
_shared_http: httpx.AsyncClient | None = None
_shared_http_loop: object | None = None
def _get_shared_http() -> httpx.AsyncClient:
global _shared_http, _shared_http_loop
try:
loop: object | None = asyncio.get_running_loop()
except RuntimeError:
loop = None
if _shared_http is None or _shared_http.is_closed or _shared_http_loop is not loop:
_shared_http = httpx.AsyncClient(timeout=120)
_shared_http_loop = loop
return _shared_http
class AIClient:
"""AI 모델 통합 클라이언트.
@@ -202,7 +222,7 @@ class AIClient:
def __init__(self):
self.ai = settings.ai
self._http = httpx.AsyncClient(timeout=120)
self._http = _get_shared_http()
# ─── 3-tier routing (B-0) ───────────────────────────────────────────────
@@ -240,6 +260,23 @@ class AIClient:
cfg = self.ai.deep or self.ai.primary
return await self._request(cfg, prompt, system=system)
async def call_classifier(self, prompt: str) -> str:
"""answerability classifier (config ai.classifier, Mac mini 26B MLX).
private _request 직접 호출(classifier_service)을 봉인하는 public 진입점. gate 는
caller(classifier_service)가 acquire_mlx_gate 로 관리 — call_primary 와 동일한
caller-managed 계약(여기서 self-gate 하면 caller 와 double-acquire 데드락).
"""
return await self._request(self.ai.classifier, prompt)
async def call_verifier(self, prompt: str) -> str:
"""semantic verifier (config ai.verifier, Mac mini 26B MLX).
private _request 직접 호출(verifier_service)을 봉인. gate 는 caller(verifier_service)
가 관리(caller-managed — self-gate 금지).
"""
return await self._request(self.ai.verifier, prompt)
# ─── Legacy API (classify_worker 교체 시 제거 예정) ───────────────────
async def classify(self, text: str, cfg=None) -> dict:
@@ -360,4 +397,5 @@ class AIClient:
return data["choices"][0]["message"]["content"]
async def close(self):
await self._http.aclose()
# 공유 풀(_get_shared_http) 이라 per-use close 안 함 — 연결 재사용. 프로세스 종료 시 GC.
return None
+1 -1
View File
@@ -102,7 +102,7 @@ async def classify(
# "MLX primary 호출 경로는 예외 없이 gate 획득 필수".
async with acquire_mlx_gate(Priority.FOREGROUND):
async with asyncio.timeout(LLM_TIMEOUT_MS / 1000):
raw = await client._request(settings.ai.classifier, prompt)
raw = await client.call_classifier(prompt)
_failure_count = 0
except asyncio.TimeoutError:
_failure_count += 1
+5 -3
View File
@@ -11,7 +11,7 @@
## 핵심 원칙
- **Verifier strong 단독 refuse 금지** — grounding strong 과 교차해야 refuse
- **Timeout 3s** — 느리면 없는 게 낫다 (fail open)
- MLX gate 사용 (PR #20 이후 Mac mini 26B endpoint — concurrent 안전성 별 검토)
- MLX gate 사용 (Mac mini 26B endpoint — classifier/evidence 와 동일 gate 공유, 동시 race 방지)
"""
from __future__ import annotations
@@ -25,6 +25,7 @@ from typing import TYPE_CHECKING, Literal
from ai.client import AIClient, _load_prompt, parse_json_response
from core.config import settings
from core.utils import setup_logger
from .llm_gate import Priority, acquire_mlx_gate
if TYPE_CHECKING:
from .evidence_service import EvidenceItem
@@ -132,8 +133,9 @@ async def verify(
prompt = _build_input(answer, evidence)
client = AIClient()
try:
async with asyncio.timeout(LLM_TIMEOUT_MS / 1000):
raw = await client._request(settings.ai.verifier, prompt)
async with acquire_mlx_gate(Priority.FOREGROUND):
async with asyncio.timeout(LLM_TIMEOUT_MS / 1000):
raw = await client.call_verifier(prompt)
_failure_count = 0
except asyncio.TimeoutError:
_failure_count += 1
+5 -1
View File
@@ -40,6 +40,7 @@ from ai.client import (
)
from ai.envelope import EscalationEnvelope
from core.config import settings
from services.search.llm_gate import Priority, acquire_mlx_gate
from core.utils import setup_logger
from models.document import Document
from models.queue import StageDeferred, enqueue_stage
@@ -673,7 +674,10 @@ async def _run_tier_triage(
# 는 아래 generic except 에 먹히지 않게 먼저 전파.
raw_triage = await call_deep_or_defer(client, prompt, cfg=deep_triage_cfg)
else:
raw_triage = await client.call_triage(prompt)
# consumer 경로 call_triage 는 PR #20 이후 primary 와 동일 Mac mini endpoint —
# evidence/classifier 처럼 gate 안에서 호출(영구 룰: 같은 endpoint 예외 없이 gate).
async with acquire_mlx_gate(Priority.BACKGROUND):
raw_triage = await client.call_triage(prompt)
except StageDeferred:
raise # drain 이 attempts 미소모 + 백오프로 처리 (sleep-안전)
except Exception as exc:
-367
View File
@@ -1,367 +0,0 @@
"""법령 모니터 워커 — 국가법령정보센터 API 연동
26 법령 모니터링, / 단위 분할 저장, 변경 이력 추적.
매일 07:00 실행 (APScheduler).
"""
import os
import re
from datetime import date, datetime, timezone
from pathlib import Path
from xml.etree import ElementTree as ET
import httpx
from sqlalchemy import select
from core.config import settings
from core.database import async_session
from core.utils import create_caldav_todo, file_hash, setup_logger
from models.automation import AutomationState
from models.document import Document
from models.queue import enqueue_stage
logger = setup_logger("law_monitor")
LAW_SEARCH_URL = "https://www.law.go.kr/DRF/lawSearch.do"
LAW_SERVICE_URL = "https://www.law.go.kr/DRF/lawService.do"
# 모니터링 대상 법령 (26개)
MONITORED_LAWS = [
# 산업안전보건 핵심
"산업안전보건법",
"산업안전보건법 시행령",
"산업안전보건법 시행규칙",
"산업안전보건기준에 관한 규칙",
"유해위험작업의 취업 제한에 관한 규칙",
"중대재해 처벌 등에 관한 법률",
"중대재해 처벌 등에 관한 법률 시행령",
# 건설안전
"건설기술 진흥법",
"건설기술 진흥법 시행령",
"건설기술 진흥법 시행규칙",
"시설물의 안전 및 유지관리에 관한 특별법",
# 위험물/화학
"위험물안전관리법",
"위험물안전관리법 시행령",
"위험물안전관리법 시행규칙",
"화학물질관리법",
"화학물질관리법 시행령",
"화학물질의 등록 및 평가 등에 관한 법률",
# 소방/전기/가스
"소방시설 설치 및 관리에 관한 법률",
"소방시설 설치 및 관리에 관한 법률 시행령",
"전기사업법",
"전기안전관리법",
"고압가스 안전관리법",
"고압가스 안전관리법 시행령",
"액화석유가스의 안전관리 및 사업법",
# 근로/환경
"근로기준법",
"환경영향평가법",
]
async def run():
"""법령 변경 모니터링 실행"""
law_oc = os.getenv("LAW_OC", "")
if not law_oc:
logger.warning("LAW_OC 미설정 — 법령 API 승인 대기 중")
return
async with async_session() as session:
state = await session.execute(
select(AutomationState).where(AutomationState.job_name == "law_monitor")
)
state_row = state.scalar_one_or_none()
last_check = state_row.last_check_value if state_row else None
today = datetime.now(timezone.utc).strftime("%Y%m%d")
if last_check == today:
logger.info("오늘 이미 체크 완료")
return
new_count = 0
async with httpx.AsyncClient(timeout=30) as client:
for law_name in MONITORED_LAWS:
try:
count = await _check_law(client, law_oc, law_name, session)
new_count += count
except Exception as e:
logger.error(f"[{law_name}] 체크 실패: {e}")
# 상태 업데이트
if state_row:
state_row.last_check_value = today
state_row.last_run_at = datetime.now(timezone.utc)
else:
session.add(AutomationState(
job_name="law_monitor",
last_check_value=today,
last_run_at=datetime.now(timezone.utc),
))
await session.commit()
logger.info(f"법령 모니터 완료: {new_count}건 신규/변경 감지")
async def _check_law(
client: httpx.AsyncClient,
law_oc: str,
law_name: str,
session,
) -> int:
"""단일 법령 검색 → 변경 감지 → 분할 저장"""
# 법령 검색 (lawSearch.do)
resp = await client.get(
LAW_SEARCH_URL,
params={"OC": law_oc, "target": "law", "type": "XML", "query": law_name},
)
resp.raise_for_status()
root = ET.fromstring(resp.text)
total = root.findtext(".//totalCnt", "0")
if total == "0":
logger.debug(f"[{law_name}] 검색 결과 없음")
return 0
# 정확히 일치하는 법령 찾기
for law_elem in root.findall(".//law"):
found_name = law_elem.findtext("법령명한글", "").strip()
if found_name != law_name:
continue
mst = law_elem.findtext("법령일련번호", "")
proclamation_date = law_elem.findtext("공포일자", "")
revision_type = law_elem.findtext("제개정구분명", "")
if not mst:
continue
# 이미 등록된 법령인지 확인 (같은 법령명 + 공포일자)
existing = await session.execute(
select(Document).where(
Document.title.like(f"{law_name}%"),
Document.source_channel == "law_monitor",
)
)
existing_docs = existing.scalars().all()
# 같은 공포일자 이미 있으면 skip
for doc in existing_docs:
if proclamation_date in (doc.title or ""):
return 0
# 이전 공포일 찾기 (변경 이력용)
prev_date = ""
if existing_docs:
prev_date = max(
(re.search(r'\d{8}', doc.title or "").group() for doc in existing_docs
if re.search(r'\d{8}', doc.title or "")),
default=""
)
# 본문 조회 (lawService.do)
text_resp = await client.get(
LAW_SERVICE_URL,
params={"OC": law_oc, "target": "law", "MST": mst, "type": "XML"},
)
text_resp.raise_for_status()
# 분할 저장
count = await _save_law_split(
session, text_resp.text, law_name, proclamation_date,
revision_type, prev_date,
)
# DB 먼저 커밋 (알림 실패가 저장을 막지 않도록)
await session.commit()
# CalDAV + SMTP 알림 (실패해도 무시)
try:
_send_notifications(law_name, proclamation_date, revision_type)
except Exception as e:
logger.warning(f"[{law_name}] 알림 발송 실패 (무시): {e}")
return count
return 0
async def _save_law_split(
session, xml_text: str, law_name: str, proclamation_date: str,
revision_type: str, prev_date: str,
) -> int:
"""법령 XML → 장(章) 단위 Markdown 분할 저장"""
root = ET.fromstring(xml_text)
# 조문단위에서 장 구분자 찾기 (조문키가 000으로 끝나는 조문)
units = root.findall(".//조문단위")
chapters = [] # [(장제목, [조문들])]
current_chapter = None
current_articles = []
for unit in units:
key = unit.attrib.get("조문키", "")
content = (unit.findtext("조문내용", "") or "").strip()
# 장 구분자: 키가 000으로 끝나고 내용에 "제X장" 포함
if key.endswith("000") and re.search(r"\d+장", content):
# 이전 장/서문 저장
if current_articles:
chapter_name = current_chapter or "서문"
chapters.append((chapter_name, current_articles))
chapter_match = re.search(r"(제\d+장\s*.+)", content)
current_chapter = chapter_match.group(1).strip() if chapter_match else content.strip()
current_articles = []
else:
current_articles.append(unit)
# 마지막 장 저장
if current_articles:
chapter_name = current_chapter or "서문"
chapters.append((chapter_name, current_articles))
# 장 분할 성공
sections = []
if chapters:
for chapter_title, articles in chapters:
md_lines = [f"# {law_name}\n", f"## {chapter_title}\n"]
for article in articles:
title = article.findtext("조문제목", "")
content = article.findtext("조문내용", "")
if title:
md_lines.append(f"\n### {title}\n")
if content:
md_lines.append(content.strip())
section_name = _safe_name(chapter_title)
sections.append((section_name, "\n".join(md_lines)))
else:
# 장 분할 실패 → 전체 1파일
full_md = _law_xml_to_markdown(xml_text, law_name)
sections.append(("전문", full_md))
# 각 섹션 저장
inbox_dir = Path(settings.nas_mount_path) / "PKM" / "Inbox"
inbox_dir.mkdir(parents=True, exist_ok=True)
count = 0
for section_name, content in sections:
filename = f"{law_name}_{proclamation_date}_{section_name}.md"
file_path = inbox_dir / filename
file_path.write_text(content, encoding="utf-8")
rel_path = str(file_path.relative_to(Path(settings.nas_mount_path)))
# 변경 이력 메모
note = ""
if prev_date:
note = (
f"[자동] 법령 개정 감지\n"
f"이전 공포일: {prev_date}\n"
f"현재 공포일: {proclamation_date}\n"
f"개정구분: {revision_type}"
)
# 안전 자료실 A-2 — 공포일 파싱 (law published_date = COALESCE(시행일, 공포일) 계약,
# 본 레거시 워커는 공포일만 보유 — 시행일 기반 버전 체인은 B-1 statute_collector 소관)
_digits = re.sub(r"\D", "", str(proclamation_date or ""))
pub_date = None
if len(_digits) == 8:
try:
pub_date = date(int(_digits[:4]), int(_digits[4:6]), int(_digits[6:8]))
except ValueError:
pub_date = None
doc = Document(
file_path=rel_path,
file_hash=file_hash(file_path),
file_format="md",
file_size=len(content.encode()),
file_type="immutable",
title=f"{law_name} ({proclamation_date}) {section_name}",
source_channel="law_monitor",
data_origin="work",
category="law",
# 안전 자료실 A-2 — ingest 시점 deterministic. 법령 텍스트 = 저작권법 제7조
# 비보호 저작물 (public domain). 본 워커는 휴면(LAW_OC 미설정)이나 코드 경로 유지.
material_type="law",
jurisdiction="KR",
published_date=pub_date,
extract_meta={"license": {"scheme": "public_domain", "redistribute": True,
"attribution": "국가법령정보센터"}},
user_note=note or None,
)
session.add(doc)
await session.flush()
await enqueue_stage(session, doc.id, "extract")
count += 1
logger.info(f"[법령] {law_name} ({proclamation_date}) → {count}개 섹션 저장")
return count
def _xml_section_to_markdown(elem) -> str:
"""XML 섹션(편/장)을 Markdown으로 변환"""
lines = []
for article in elem.iter():
tag = article.tag
text = (article.text or "").strip()
if not text:
continue
if "" in tag:
lines.append(f"\n### {text}\n")
elif "" in tag:
lines.append(f"\n{text}\n")
elif "" in tag:
lines.append(f"- {text}")
elif "" in tag:
lines.append(f" - {text}")
else:
lines.append(text)
return "\n".join(lines)
def _law_xml_to_markdown(xml_text: str, law_name: str) -> str:
"""법령 XML 전체를 Markdown으로 변환"""
root = ET.fromstring(xml_text)
lines = [f"# {law_name}\n"]
for elem in root.iter():
tag = elem.tag
text = (elem.text or "").strip()
if not text:
continue
if "" in tag and "제목" not in tag:
lines.append(f"\n## {text}\n")
elif "" in tag and "제목" not in tag:
lines.append(f"\n## {text}\n")
elif "" in tag:
lines.append(f"\n### {text}\n")
elif "" in tag:
lines.append(f"\n{text}\n")
elif "" in tag:
lines.append(f"- {text}")
elif "" in tag:
lines.append(f" - {text}")
return "\n".join(lines)
def _safe_name(name: str) -> str:
"""파일명 안전 변환"""
return re.sub(r'[^\w가-힣-]', '_', name).strip("_")
def _send_notifications(law_name: str, proclamation_date: str, revision_type: str):
"""CalDAV 할일 알림 (SMTP 발송은 2026-06-10 폐기 — CalDAV 가 단일 알림 채널)"""
caldav_url = os.getenv("CALDAV_URL", "")
caldav_user = os.getenv("CALDAV_USER", "")
caldav_pass = os.getenv("CALDAV_PASS", "")
if caldav_url and caldav_user:
create_caldav_todo(
caldav_url, caldav_user, caldav_pass,
title=f"법령 검토: {law_name}",
description=f"공포일자: {proclamation_date}, 개정구분: {revision_type}",
due_days=7,
)