Compare commits

..

1 Commits

Author SHA1 Message Date
hyungi 23bb5ac9c9 feat(presegment): G2 PR-3 — LLM 경계 폴백 (flag-gated, 기본 OFF, scaffold-first)
ToC 없는/게이트 미달 대형 PDF(>=60p)에 한해 off-card Qwen(맥북, call_deep_or_defer,
StageDeferred-safe) 경계 제안 → 동일 검증게이트(_is_clear_bundle) 통과 시에만 deterministic 과
공유하는 _create_children 로 분할. is_bundle=false/파싱·검증 실패=단일문서(오늘과 동일)+로깅.
- env PRESEGMENT_LLM_FALLBACK 기본 false → 배포 동작 무변(LLM 미호출, 검증=unit test)
- 자식생성 _create_children 공유 헬퍼로 리팩터(deterministic+LLM 단일 경로, 동작 동일)
- SegmentationOutput Pydantic + parse_json_response(house 패턴) + per-page heading 샘플(본문 미전송)
- prompt app/prompts/presegment_boundaries.txt + tests/test_presegment_llm.py(14, fitz/DB/LLM mock)
no direct HTTP·no silent fallback. 활성=flag ON + 실 router fixture 검증 후.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-18 17:52:27 +09:00
62 changed files with 705 additions and 1997 deletions
-3
View File
@@ -47,6 +47,3 @@ caddy_data/
*.bak_*
*.pre-*
.pre-*/
# SQLite 로컬 아티팩트 (Django/툴링 잔재)
*.sqlite3
-7
View File
@@ -12,13 +12,6 @@ http://document.hyungi.net {
# 명시 Content-Type match — 기본 match 의 text/* 는 text/event-stream 까지 포함해
# SSE(/api/eid/chat)의 첫 ~512B 를 gzip 버퍼링함. SSE 제외, 기존 압축 대상은 보존.
# (응답 매처는 header <필드> <값> 한 쌍씩 — 여러 줄 = OR. 한 줄 다중 값은 파싱 에러)
# 2026-06-20 보안 헤더 (M: 클릭재킹·MIME 스니핑 방어). HSTS 는 TLS 종단 edge(home-caddy) 소관.
header {
X-Content-Type-Options nosniff
X-Frame-Options SAMEORIGIN
Referrer-Policy strict-origin-when-cross-origin
-Server
}
encode {
gzip
match {
+2 -44
View File
@@ -1,6 +1,5 @@
"""AI 추상화 레이어 — 통합 클라이언트. 기본값은 항상 Qwen3.5."""
import asyncio
import json
import re
from pathlib import Path
@@ -189,25 +188,6 @@ def _load_prompt(name: str) -> str:
CLASSIFY_PROMPT = _load_prompt("classify.txt") if (PROMPTS_DIR / "classify.txt").exists() else ""
# 공유 httpx 클라이언트 — 호출마다 AsyncClient 를 새로 만들던 것(30+ 사이트, 연결풀 재사용 0)을
# 일원화해 keep-alive 재사용. 이벤트루프 바인딩이라 루프 변경(pytest 격리 등) 시 재생성한다.
# close() 는 공유 풀이라 no-op — 프로세스 종료 시 GC.
_shared_http: httpx.AsyncClient | None = None
_shared_http_loop: object | None = None
def _get_shared_http() -> httpx.AsyncClient:
global _shared_http, _shared_http_loop
try:
loop: object | None = asyncio.get_running_loop()
except RuntimeError:
loop = None
if _shared_http is None or _shared_http.is_closed or _shared_http_loop is not loop:
_shared_http = httpx.AsyncClient(timeout=120)
_shared_http_loop = loop
return _shared_http
class AIClient:
"""AI 모델 통합 클라이언트.
@@ -222,7 +202,7 @@ class AIClient:
def __init__(self):
self.ai = settings.ai
self._http = _get_shared_http()
self._http = httpx.AsyncClient(timeout=120)
# ─── 3-tier routing (B-0) ───────────────────────────────────────────────
@@ -260,23 +240,6 @@ class AIClient:
cfg = self.ai.deep or self.ai.primary
return await self._request(cfg, prompt, system=system)
async def call_classifier(self, prompt: str) -> str:
"""answerability classifier (config ai.classifier, Mac mini 26B MLX).
private _request 직접 호출(classifier_service)을 봉인하는 public 진입점. gate 는
caller(classifier_service)가 acquire_mlx_gate 로 관리 — call_primary 와 동일한
caller-managed 계약(여기서 self-gate 하면 caller 와 double-acquire 데드락).
"""
return await self._request(self.ai.classifier, prompt)
async def call_verifier(self, prompt: str) -> str:
"""semantic verifier (config ai.verifier, Mac mini 26B MLX).
private _request 직접 호출(verifier_service)을 봉인. gate 는 caller(verifier_service)
가 관리(caller-managed — self-gate 금지).
"""
return await self._request(self.ai.verifier, prompt)
# ─── Legacy API (classify_worker 교체 시 제거 예정) ───────────────────
async def classify(self, text: str, cfg=None) -> dict:
@@ -383,10 +346,6 @@ class AIClient:
payload["temperature"] = model_config.temperature
if model_config.top_p is not None:
payload["top_p"] = model_config.top_p
if model_config.repetition_penalty is not None:
payload["repetition_penalty"] = model_config.repetition_penalty
if model_config.top_k is not None:
payload["top_k"] = model_config.top_k
response = await self._http.post(
model_config.endpoint,
json=payload,
@@ -397,5 +356,4 @@ class AIClient:
return data["choices"][0]["message"]["content"]
async def close(self):
# 공유 풀(_get_shared_http) 이라 per-use close 안 함 — 연결 재사용. 프로세스 종료 시 GC.
return None
await self._http.aclose()
+1 -74
View File
@@ -672,71 +672,6 @@ async def list_duplicates(
)
class ClauseHit(BaseModel):
doc_id: int
doc_title: str
section_title: str | None = None
char_start: int | None = None
chunk_id: int
node_type: str | None = None
class ClauseLookupResponse(BaseModel):
label: str
hits: list[ClauseHit]
# NOTE: '/{doc_id}' (int path param) 라우트보다 먼저 선언해야 '/clause-lookup' 이 doc_id 로
# 잘못 매칭되지 않는다 (FastAPI 선언 순서 매칭). 이동 금지.
@router.get("/clause-lookup", response_model=ClauseLookupResponse)
async def clause_lookup(
label: str,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
):
"""절 식별자(예: UG-79)로 크로스-doc 절 위치 조회 — 'UG-79 보여줘' 진입점 (U-1).
절(node_type=clause/clause_split)은 in_corpus=false(검색 비활성)라 의미검색으론 못 찾으므로,
라벨 prefix 정확매칭으로 (doc, char_start) 를 직접 해소해 읽기뷰 점프를 가능케 한다.
대부분 1건; 부록(A-/E-/F-) 등 doc 간 공유 라벨만 다중 반환(에디션 선택). /sections 와 동일하게
document_chunks 직접 조회 — corpus_chunks 우회는 retrieval 아닌 정확지목이므로 의도적 예외.
"""
from sqlalchemy import text as sql_text
lab = (label or "").strip()
if not lab:
return ClauseLookupResponse(label=label, hits=[])
rows = (
await session.execute(
sql_text(
"""
SELECT c.doc_id, d.title AS doc_title, c.section_title, c.char_start, c.node_type,
-- 점프 타깃 = outline(/sections: is_leaf 또는 %_split)에 있는 chunk 여야 딥링크 동작.
-- 자신이 그러면 자신, 아니면(컨테이너 절: 자식 heading 보유·is_leaf=false) 문서순서상
-- 자신 이후 첫 딥링크 가능 chunk(=그 절 내용 시작)로 해소. 그래도 없으면 자신(폴백).
COALESCE(
CASE WHEN c.is_leaf = true OR c.node_type LIKE '%\\_split' ESCAPE '\\' THEN c.id END,
(SELECT ch.id FROM document_chunks ch
WHERE ch.doc_id = c.doc_id AND ch.source_type = 'hier_section'
AND ch.chunk_index >= c.chunk_index
AND (ch.is_leaf = true OR ch.node_type LIKE '%\\_split' ESCAPE '\\')
ORDER BY ch.chunk_index LIMIT 1),
c.id
) AS chunk_id
FROM document_chunks c
JOIN documents d ON d.id = c.doc_id
WHERE c.node_type IN ('clause', 'clause_split')
AND (c.section_title ILIKE :lab_sp OR c.section_title ILIKE :lab_eq)
AND d.deleted_at IS NULL
ORDER BY c.doc_id, c.char_start NULLS LAST
LIMIT 50
"""
).bindparams(lab_sp=lab + " %", lab_eq=lab)
)
).mappings().all()
return ClauseLookupResponse(label=lab, hits=[ClauseHit(**dict(r)) for r in rows])
@router.get("/{doc_id}", response_model=DocumentDetailResponse)
async def get_document(
doc_id: int,
@@ -1277,14 +1212,6 @@ async def update_document(
if val is not None and val not in ("business", "knowledge"):
raise HTTPException(status_code=400, detail="doc_purpose는 business 또는 knowledge만 가능")
# edit_url SSRF 가드 (2026-06-20 M1): 내부/메타데이터 주소 후속 fetch 차단 (news.py 동형 검증)
if update_data.get("edit_url"):
from core.url_validator import validate_feed_url
try:
await asyncio.to_thread(validate_feed_url, update_data["edit_url"])
except Exception as e:
raise HTTPException(status_code=400, detail=f"edit_url 검증 실패: {e}")
for field, value in update_data.items():
setattr(doc, field, value)
doc.updated_at = datetime.now(timezone.utc)
@@ -1565,7 +1492,7 @@ ANALYZE_PROMPT = (
)
ANALYZE_TEXT_LIMIT = 12000 # chars (15000 → 12000, 실측 timeout 빈발)
ANALYZE_TIMEOUT_S = settings.llm_call_timeout_s # 2026-06-20 config 단일소스 (구 60s=빠른 Gemma)
ANALYZE_TIMEOUT_S = 60 # 15,000자 입력 + 4층 출력. 실측 7~45초, safety margin 포함
ANALYZE_CACHE_TTL_S = 1800 # 30분
ANALYZE_CACHE_MAXSIZE = 100
ANALYZE_LAYER_MIN_CHARS = 50 # 이 미만이면 억지 채움으로 보고 제거
-230
View File
@@ -1,230 +0,0 @@
"""뷰어 write-back ingest (study-to-viewer P2) — 뷰어 로컬 풀이 세션을 DS 로 흘려 finalize 재생.
흐름(plan study-to-viewer-slice1 P2, r2/r3 불변식):
뷰어 outbox POST /ingest/study/attempts (Bearer VIEWER_SYNC_TOKEN, study_ingest_enabled gate)
pub_idpublished.source_idStudyQuestion 해소(부재 graceful skip) principal=question.user_id
topic 그룹(뷰어 subject 퀴즈가 여러 DS topic 걸칠 있음) topic 마다 DS quiz_session
(source='viewer', client_session_uuid) 생성 + attempt(derive_outcome=채점 단일 소스) + 세션 done
finalize_session **무수정 재생**(SR/pattern/progress + 4-A/4-B enqueue) finalized_at 마커
전부 1 트랜잭션(원자) commit.
멱등(r2 P2-2): client_session_uuid 기존 세션 있으면 이미 적재된 캐시 요약 반환(재실행 0).
원자 1-tx 'uuid 존재 ⟺ finalize 완료' at-least-once outbox 재전송에도 SR 이중 advance 없음.
user_id 리터럴 금지(r2): principal = 해소된 질문의 owner(단일, mixed 거부).
"""
from __future__ import annotations
import hmac
import logging
from collections import defaultdict
from datetime import datetime, timezone
from fastapi import APIRouter, Depends, Header, HTTPException
from pydantic import BaseModel
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from core.config import settings
from core.database import async_session
from models.published import Published
from models.study_question import StudyQuestion, StudyQuestionAttempt
from models.study_quiz_session import StudyQuizSession
from services.study.outcome import derive_outcome
from services.study.publish_projection import KIND_QUESTION
from services.study.session_finalize import finalize_session
logger = logging.getLogger(__name__)
router = APIRouter()
def _verify_token(authorization: str | None = Header(default=None)) -> None:
"""뷰어↔DS 발행 채널 Bearer(read 와 동일 토큰, r3 단일토큰 수용). default-deny(미설정=503)."""
if not settings.viewer_sync_token:
raise HTTPException(status_code=503, detail="viewer_sync_token not configured")
if not authorization or not authorization.lower().startswith("bearer "):
raise HTTPException(status_code=401, detail="missing Bearer token")
token = authorization[7:].strip()
if not hmac.compare_digest(token, settings.viewer_sync_token):
raise HTTPException(status_code=403, detail="invalid token")
async def _session() -> AsyncSession:
async with async_session() as s:
yield s
class IngestAttempt(BaseModel):
question_pub_id: str
selected_choice: int | None = None
is_unsure: bool = False
answered_at: str | None = None # 클라(오프라인) ISO 시각 — 미래 스큐 클램프, id 가 타이브레이커
class IngestBody(BaseModel):
client_session_uuid: str
attempts: list[IngestAttempt]
def _parse_answered_at(s: str | None, now: datetime) -> datetime:
if not s:
return now
try:
dt = datetime.fromisoformat(s.replace("Z", "+00:00"))
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
return min(dt, now) # 미래 스큐는 now 로 클램프(클라 시계 오염 방지)
except Exception:
return now
@router.post("/attempts")
async def ingest_attempts(
body: IngestBody,
_auth: None = Depends(_verify_token),
session: AsyncSession = Depends(_session),
):
if not settings.study_ingest_enabled:
raise HTTPException(status_code=503, detail="study_ingest not enabled")
if not body.client_session_uuid or not body.attempts:
raise HTTPException(status_code=400, detail="client_session_uuid 와 attempts 필요")
# 멱등: 이 uuid 로 이미 적재됐나(원자 1-tx 라 존재=완료). 있으면 캐시 요약 반환(재실행 0).
existing = (
await session.execute(
select(StudyQuizSession).where(
StudyQuizSession.client_session_uuid == body.client_session_uuid
)
)
).scalars().all()
if existing:
return {
"status": "already_ingested",
"sessions": [
{
"topic_id": s.study_topic_id,
"correct": s.correct_count,
"wrong": s.wrong_count,
"unsure": s.unsure_count,
}
for s in existing
],
}
# pub_id → source_id(내부 질문 id) 해소. deleted tombstone 제외.
pub_ids = list({a.question_pub_id for a in body.attempts})
pub_rows = (
await session.execute(
select(Published.pub_id, Published.source_id).where(
Published.kind == KIND_QUESTION,
Published.pub_id.in_(pub_ids),
Published.deleted.is_(False),
)
)
).all()
src_by_pubid = {r.pub_id: r.source_id for r in pub_rows}
# 질문 fetch(미삭제). principal = owner(단일).
source_ids = list(set(src_by_pubid.values()))
q_rows = (
await session.execute(
select(StudyQuestion).where(
StudyQuestion.id.in_(source_ids), StudyQuestion.deleted_at.is_(None)
)
)
).scalars().all()
q_by_id = {q.id: q for q in q_rows}
owners = {q.user_id for q in q_by_id.values()}
if len(owners) > 1:
raise HTTPException(status_code=400, detail="여러 사용자 소유 질문 혼재 — 단일 principal 위반")
if not owners:
raise HTTPException(status_code=404, detail="해소 가능한 질문 없음")
user_id = owners.pop()
now = datetime.now(timezone.utc)
# topic 별 그룹(해소 실패 attempt 는 graceful skip). 같은 (uuid, topic) 1 세션.
by_topic: dict[int, list[tuple[IngestAttempt, StudyQuestion]]] = defaultdict(list)
skipped: list[str] = []
for a in body.attempts:
src = src_by_pubid.get(a.question_pub_id)
q = q_by_id.get(src) if src is not None else None
if q is None:
skipped.append(a.question_pub_id)
continue
by_topic[q.study_topic_id].append((a, q))
if not by_topic:
raise HTTPException(status_code=404, detail="해소된 attempt 없음")
summaries = []
for topic_id, items in by_topic.items():
qids = [q.id for (_, q) in items]
qs = StudyQuizSession(
user_id=user_id,
study_topic_id=topic_id,
question_ids=qids,
subject_distribution={},
status="done",
cursor=len(qids),
source="viewer",
client_session_uuid=body.client_session_uuid,
finished_at=now,
created_at=now,
updated_at=now,
)
session.add(qs)
await session.flush() # qs.id
c = w = u = 0
for a, q in items:
try:
sel, is_corr, outcome = derive_outcome(a.selected_choice, a.is_unsure, q.correct_choice)
except ValueError:
skipped.append(a.question_pub_id) # 선택 없고 unsure 아님 = 무효 → skip
continue
if outcome == "correct":
c += 1
elif outcome == "wrong":
w += 1
elif outcome == "unsure":
u += 1
session.add(
StudyQuestionAttempt(
user_id=user_id,
study_question_id=q.id,
study_topic_id=topic_id,
selected_choice=sel,
correct_choice=q.correct_choice,
is_correct=is_corr,
outcome=outcome,
quiz_session_id=qs.id,
answered_at=_parse_answered_at(a.answered_at, now),
)
)
qs.correct_count, qs.wrong_count, qs.unsure_count = c, w, u
await session.flush()
# finalize 무수정 재생(progress/SR/pattern + 4-A/4-B enqueue). 그 후 멱등 마커.
summary = await finalize_session(
session, user_id=user_id, study_topic_id=topic_id, quiz_session_id=qs.id
)
qs.finalized_at = now
summaries.append(
{
"topic_id": topic_id,
"quiz_session_id": qs.id,
"correct": summary.correct,
"wrong": summary.wrong,
"unsure": summary.unsure,
"newly_correct": summary.newly_correct,
"relapsed": summary.relapsed,
"recovered": summary.recovered,
}
)
await session.commit()
logger.info(
"study_ingest uuid=%s user=%s sessions=%s skipped=%s",
body.client_session_uuid, user_id, len(summaries), len(skipped),
)
return {"status": "ingested", "skipped": skipped, "sessions": summaries}
+5 -5
View File
@@ -9,7 +9,7 @@ from sqlalchemy import func, select
from sqlalchemy import text as sql_text
from sqlalchemy.ext.asyncio import AsyncSession
from core.auth import get_current_user, require_admin
from core.auth import get_current_user
from core.database import get_session
from core.library import LIBRARY_PREFIX, MAX_DEPTH, normalize_library_path
from models.category import LibraryCategory
@@ -78,7 +78,7 @@ async def list_categories(
@router.post("/categories", response_model=CategoryResponse, status_code=201)
async def create_category(
body: CategoryCreate,
user: Annotated[User, Depends(require_admin)],
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
):
"""카테고리 생성 (조상 자동 생성 포함)"""
@@ -133,7 +133,7 @@ async def create_category(
@router.patch("/categories", response_model=CategoryResponse)
async def rename_category(
body: CategoryRename,
user: Annotated[User, Depends(require_admin)],
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
):
"""카테고리 이름 변경 (leaf only, path 기반 식별)"""
@@ -214,7 +214,7 @@ async def rename_category(
@router.delete("/categories", status_code=204)
async def delete_category(
path: str = Query(..., description="삭제할 카테고리 경로"),
user: Annotated[User, Depends(require_admin)] = None,
user: Annotated[User, Depends(get_current_user)] = None,
session: Annotated[AsyncSession, Depends(get_session)] = None,
):
"""카테고리 삭제 (leaf only, 문서 없는 경우만)"""
@@ -410,7 +410,7 @@ async def get_facet_values(
@router.post("/facets", response_model=FacetValueResponse, status_code=201)
async def add_facet_value(
body: FacetValueResponse,
user: Annotated[User, Depends(require_admin)],
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
):
"""facet 사전에 새 값 추가"""
-254
View File
@@ -1,254 +0,0 @@
"""발행 read API (docsrv-viewer-publish P0-2) — 뷰어가 pull-sync 로 당기는 feed.
published 테이블(발행 워커가 rev 커밋순 gapless 부여) rev 커서로 페이지네이션해 반환.
뷰어 = Bearer(settings.viewer_sync_token) 인증, default-deny. read-only(SELECT ).
GET /published/feed?since={rev}&kind={kind}&limit={n}
rev > since 행을 rev ASC limit 만큼. kind 옵션(study_question|study_explanation|... 후속).
tombstone(deleted=true) 1 이벤트로 포함 뷰어가 pub_id 로컬 삭제(stale 회피).
rev 커서 안전성: 워커가 pg_advisory_xact_lock 단일 라이터로 배치 rev 트랜잭션에
부여·커밋 리더는 rev N N-1 없이 보지 못함(부분가시 0). 뷰어는 next_since 반복.
엔벨로프 schema_version = 전송 계약 버전(payload 행별 schema_version 별개).
미지원 버전 가시거부는 뷰어 책임(no-silent-fallback) 여기선 행별 schema_version 그대로 전달.
"""
from __future__ import annotations
import hmac
import logging
import logging
from datetime import datetime, timezone
from fastapi import APIRouter, Depends, Header, HTTPException, Query
from pydantic import BaseModel
from sqlalchemy import select, text
from sqlalchemy.ext.asyncio import AsyncSession
from core.config import settings
from core.database import async_session
from models.published import Published
from models.published import Published
from services.queue_overview import build_overview
logger = logging.getLogger(__name__)
router = APIRouter()
# feed 엔벨로프(전송 계약) 버전 — payload schema_version 과 독립.
FEED_SCHEMA_VERSION = 1
DEFAULT_LIMIT = 200
MAX_LIMIT = 500
def _verify_token(authorization: str | None = Header(default=None)) -> None:
"""뷰어↔DS 발행 채널 Bearer 인증. default-deny(미설정=503). 상수시간 비교(internal_study 정본).
토큰은 정답 포함 study payload 노출하므로 hmac.compare_digest timing side-channel 차단.
"""
if not settings.viewer_sync_token:
raise HTTPException(status_code=503, detail="viewer_sync_token not configured")
if not authorization or not authorization.lower().startswith("bearer "):
raise HTTPException(status_code=401, detail="missing Bearer token")
token = authorization[7:].strip()
if not hmac.compare_digest(token, settings.viewer_sync_token):
raise HTTPException(status_code=403, detail="invalid token")
async def _session() -> AsyncSession:
async with async_session() as s:
yield s
class FeedItem(BaseModel):
pub_id: str # opaque+stable = 뷰어 dedup키 = progress키
kind: str
source_id: int # DS 내부 소스 행 id (ingest write-back 역해소용, P2)
rev: int
deleted: bool # tombstone — 뷰어 로컬 삭제 트리거
schema_version: int # payload 모양 버전(뷰어 range 수용)
payload: dict # render-ready projection (tombstone 이면 {})
class FeedResponse(BaseModel):
schema_version: int # 엔벨로프(전송 계약) 버전
items: list[FeedItem]
next_since: int # 다음 호출 since (이 배치 max rev; 빈 배치면 입력 since 유지)
has_more: bool # limit 가득 = 더 있을 수 있음(뷰어 반복)
@router.get("/feed", response_model=FeedResponse)
async def published_feed(
since: int = Query(0, ge=0),
kind: str | None = Query(None, max_length=40),
limit: int = Query(DEFAULT_LIMIT, ge=1, le=MAX_LIMIT),
_auth: None = Depends(_verify_token),
session: AsyncSession = Depends(_session),
):
"""rev > since 행을 rev ASC 로 limit 만큼 반환. 뷰어가 next_since 로 incremental pull."""
stmt = select(Published).where(Published.rev > since)
if kind:
stmt = stmt.where(Published.kind == kind)
stmt = stmt.order_by(Published.rev.asc()).limit(limit)
rows = (await session.execute(stmt)).scalars().all()
items = [
FeedItem(
pub_id=r.pub_id,
kind=r.kind,
source_id=r.source_id,
rev=r.rev,
deleted=r.deleted,
schema_version=r.schema_version,
payload=r.payload if r.payload is not None else {},
)
for r in rows
]
next_since = items[-1].rev if items else since
has_more = len(rows) == limit
logger.info(
"published_feed since=%s kind=%s returned=%s next_since=%s has_more=%s",
since, kind, len(items), next_since, has_more,
)
return FeedResponse(
schema_version=FEED_SCHEMA_VERSION,
items=items,
next_since=next_since,
has_more=has_more,
)
# ── P1-1: 뉴스/다이제스트 발행 read API (docsrv-viewer-publish) ────────────────────
# global_digests(일간 컨테이너) + digest_topics(토픽 N, digest_id FK) -> render-ready
# read-time projection. content-type 파라미터화(plan r2): version 커서=global_digests.id
# (일간 단일 라이터라 gapless 불요·gap 무해) · pub_id=date-as-id(admin-gated feed 라 opacity
# 불필요) · tombstone 없음(다이제스트 미삭제). 엔벨로프는 /feed 와 동일(FeedResponse)=뷰어 재사용.
# scaffold-first: DIGEST_PUBLISH_ENABLED off(기본)=503(명시적 미가동, no-silent).
DIGEST_PAYLOAD_SCHEMA_VERSION = 1
@router.get("/digest", response_model=FeedResponse)
async def published_digest(
since: int = Query(0, ge=0),
limit: int = Query(DEFAULT_LIMIT, ge=1, le=MAX_LIMIT),
_auth: None = Depends(_verify_token),
session: AsyncSession = Depends(_session),
):
"""global_digests.id > since 를 id ASC 로 limit 만큼. 각 digest 에 topics 조인해 render-ready 반환."""
if not settings.digest_publish_enabled:
raise HTTPException(status_code=503, detail="digest publish not enabled (scaffold)")
drows = (await session.execute(
text(
"SELECT id, digest_date, status, total_articles, total_topics, total_countries, created_at "
"FROM global_digests WHERE id > :since ORDER BY id ASC LIMIT :limit"
),
{"since": since, "limit": limit},
)).mappings().all()
if not drows:
return FeedResponse(schema_version=FEED_SCHEMA_VERSION, items=[], next_since=since, has_more=False)
ids = [r["id"] for r in drows]
trows = (await session.execute(
text(
"SELECT digest_id, topic_rank, topic_label, summary, country, article_count, importance_score "
"FROM digest_topics WHERE digest_id = ANY(:ids) ORDER BY digest_id ASC, topic_rank ASC"
),
{"ids": ids},
)).mappings().all()
topics_by_digest: dict[int, list[dict]] = {}
for t in trows:
topics_by_digest.setdefault(t["digest_id"], []).append({
"rank": t["topic_rank"],
"label": t["topic_label"],
"summary": t["summary"],
"country": t["country"],
"article_count": t["article_count"],
"importance": t["importance_score"],
})
items = []
for r in drows:
d_date = r["digest_date"].isoformat() if r["digest_date"] else None
items.append(FeedItem(
pub_id=f"digest:{d_date}",
kind="digest",
source_id=r["id"],
rev=r["id"],
deleted=False,
schema_version=DIGEST_PAYLOAD_SCHEMA_VERSION,
payload={
"digest_date": d_date,
"status": r["status"],
"total_articles": r["total_articles"],
"total_topics": r["total_topics"],
"total_countries": r["total_countries"],
"generated_at": r["created_at"].isoformat() if r["created_at"] else None,
"topics": topics_by_digest.get(r["id"], []),
},
))
next_since = items[-1].rev
has_more = len(drows) == limit
logger.info(
"published_digest since=%s returned=%s next_since=%s has_more=%s",
since, len(items), next_since, has_more,
)
return FeedResponse(
schema_version=FEED_SCHEMA_VERSION,
items=items,
next_since=next_since,
has_more=has_more,
)
# ── P1-2: 가공현황 라이브 스냅샷 API (+P1-4 점검 플래그) ──────────────────────────
# 뷰어 리포트 '문서 가공현황' 섹션용. build_overview(기존 서비스) 재사용 + source_health
# 조인 요약. pull-through(저장 X) — 라이브 수치라 캐시 없음, 소비자(뷰어)가 2~3s timeout 책임
# (plan P1-2). P1-4: maintenance 플래그 동봉 — 소프트락/점검이 워커를 멈춰 수치가 정체로
# 보일 때 뷰어가 '점검·실험 중' 배너로 구분(표면 != 데이터). read-only.
@router.get("/processing-status")
async def published_processing_status(
_auth: None = Depends(_verify_token),
session: AsyncSession = Depends(_session),
):
"""가공현황 스냅샷: queue overview + source_health 요약 + maintenance 플래그."""
overview = await build_overview(session)
sh_rows = (await session.execute(text(
"SELECT ns.name, ns.category, sh.circuit_state, sh.consecutive_failures, sh.empty_streak, "
"sh.last_success_at, sh.last_probe_ok "
"FROM source_health sh JOIN news_sources ns ON ns.id = sh.source_id "
"ORDER BY (sh.circuit_state <> 'closed') DESC, sh.consecutive_failures DESC"
))).mappings().all()
by_state: dict[str, int] = {}
problems: list[dict] = []
for r in sh_rows:
st = r["circuit_state"]
by_state[st] = by_state.get(st, 0) + 1
if st != "closed":
problems.append({
"name": r["name"],
"category": r["category"],
"circuit_state": st,
"consecutive_failures": r["consecutive_failures"],
"empty_streak": r["empty_streak"],
"last_success_at": r["last_success_at"].isoformat() if r["last_success_at"] else None,
"last_probe_ok": r["last_probe_ok"],
})
return {
"schema_version": 1,
"generated_at": datetime.now(timezone.utc).isoformat(),
"overview": overview,
"sources": {
"total": len(sh_rows),
"by_circuit_state": by_state,
"problems": problems,
},
"maintenance": {
"active": settings.maintenance_mode,
"note": settings.maintenance_note,
},
}
+1 -21
View File
@@ -21,14 +21,12 @@ from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import AsyncSession
from core.auth import get_current_user
from core.config import settings
from core.database import get_session
from models.study_memo_card import StudyMemoCard, StudyMemoCardEvidence, record_card_view
from models.study_memo_card_progress import StudyMemoCardProgress, rate_card
from models.study_question import StudyQuestion
from models.user import User
from services.study.card_normalize import compute_dedup_hash
from services.study.publish_enqueue import enqueue_card_progress_publish, enqueue_card_publish
router = APIRouter()
@@ -250,18 +248,9 @@ async def approve_batch(
StudyMemoCard.needs_review,
)
.values(needs_review=False, flagged_by=None, flagged_at=None)
.returning(StudyMemoCard.id)
)
approved_ids = list(result.scalars().all())
# 방금 검수완료된 카드 발행(같은 tx, flag off 면 no-op). S-2.
if settings.study_publish_enabled and approved_ids:
cards = (
await session.execute(select(StudyMemoCard).where(StudyMemoCard.id.in_(approved_ids)))
).scalars().all()
for c in cards:
await enqueue_card_publish(session, c)
await session.commit()
return {"approved": len(approved_ids)}
return {"approved": result.rowcount or 0}
# ─── 복습(SR) 트랙 ───
@@ -321,9 +310,6 @@ async def rate(
if outcome is None:
raise HTTPException(status_code=422, detail=f"invalid outcome: {body.outcome!r}")
progress = await rate_card(session, card=card, outcome=outcome, now=datetime.now(timezone.utc))
# 카드 SR 상태 발행(같은 tx, flag off=no-op) — ALL row(sentinel/terminal 포함). S-4.
if settings.study_publish_enabled:
await enqueue_card_progress_publish(session, progress)
await session.commit()
return RateResult(
card_id=card.id, outcome=outcome, review_stage=progress.review_stage, due_at=progress.due_at
@@ -406,9 +392,6 @@ async def update_card(
card.flagged_by = None
card.flagged_at = None
# 발행 재투영/tombstone(같은 tx) — 검수완료=발행·검수대기복귀=tombstone(상태 기반). S-2.
if settings.study_publish_enabled:
await enqueue_card_publish(session, card)
try:
await session.commit()
except IntegrityError:
@@ -431,7 +414,4 @@ async def delete_card(
card = await session.get(StudyMemoCard, card_id)
card = _verify_card(card, user)
card.deleted_at = datetime.now(timezone.utc)
# 발행 tombstone(같은 tx) — 삭제는 feed 1급 이벤트. S-2.
if settings.study_publish_enabled:
await enqueue_card_publish(session, card)
await session.commit()
+16 -35
View File
@@ -39,9 +39,6 @@ from services.study.explanation_rag import (
gather_explanation_context,
render_evidence_block,
)
from services.study.publish_enqueue import enqueue_publish, enqueue_question_publish
from services.study.publish_projection import KIND_CARD, KIND_EXPLANATION, KIND_QUESTION
from services.study.outcome import derive_outcome
logger = logging.getLogger(__name__)
router = APIRouter()
@@ -546,9 +543,6 @@ async def create_question_in_topic(
)
session.add(q)
await session.flush()
# 발행 outbox 적재(같은 tx, flag off 면 no-op) — 신규 문항 발행. P0-1b.
if settings.study_publish_enabled:
await enqueue_question_publish(session, q)
await session.commit()
stats = QuestionAttemptStats(attempt_count=0, correct_count=0, wrong_count=0)
@@ -911,16 +905,9 @@ async def update_question(
# 카드는 '구' ai_explanation 에서 추출됐으므로 정정 후 stale 가능 — 즉시 가시화 플래그.
# 최종 stale 정리는 card_extract 워커의 supersede 가 책임(새 버전 추출 시 구버전 retire).
if AI_STALE_TRIGGER & fields_set:
flagged_card_ids = await flag_cards_for_source(session, source_question_id=q.id, reason="source_changed")
# 발행 자격 잃은(검수대기 복귀) 파생 카드 tombstone(같은 tx). S-2.
if settings.study_publish_enabled:
for cid in flagged_card_ids:
await enqueue_publish(session, kind=KIND_CARD, source_id=cid, payload=None, deleted=True)
await flag_cards_for_source(session, source_question_id=q.id, reason="source_changed")
q.updated_at = datetime.now(timezone.utc)
# 발행 재투영(같은 tx) — 문항 갱신 반영. 해설은 ready 일 때만 동봉, stale→tombstone 은 P1-3. P0-1b.
if settings.study_publish_enabled:
await enqueue_question_publish(session, q)
await session.commit()
stats = await _attempt_stats(session, user.id, question_id)
@@ -983,16 +970,7 @@ async def soft_delete_question(
)
# 공부 암기노트: 소스 문제 삭제 시 파생 암기카드를 검토 대기로 마킹(source_deleted).
# study_questions 는 soft-delete 만이라 카드 FK CASCADE 는 미발동 — 이 훅이 실 경로.
flagged_card_ids = await flag_cards_for_source(session, source_question_id=q.id, reason="source_deleted")
# 발행 자격 잃은 파생 카드 tombstone(같은 tx). S-2.
if settings.study_publish_enabled:
for cid in flagged_card_ids:
await enqueue_publish(session, kind=KIND_CARD, source_id=cid, payload=None, deleted=True)
# 발행 tombstone(같은 tx) — 삭제는 feed 1급 이벤트(raw DELETE 금지·워커 경유). 해설 본문 있으면 그 kind 도. P0-1b.
if settings.study_publish_enabled:
await enqueue_publish(session, kind=KIND_QUESTION, source_id=q.id, payload=None, deleted=True)
if q.ai_explanation:
await enqueue_publish(session, kind=KIND_EXPLANATION, source_id=q.id, payload=None, deleted=True)
await flag_cards_for_source(session, source_question_id=q.id, reason="source_deleted")
await session.commit()
@@ -1014,13 +992,19 @@ async def submit_attempt(
q = await session.get(StudyQuestion, question_id)
q = _verify_question_ownership(q, user)
# 채점 단일 소스 — 뷰어 ingest 와 동일 함수(P2). 선택 없고 unsure 아니면 422.
try:
selected, is_correct, outcome = derive_outcome(
body.selected_choice, body.is_unsure, q.correct_choice
if body.is_unsure:
selected = None
is_correct = False
outcome = "unsure"
elif body.selected_choice is None:
raise HTTPException(
status_code=422,
detail="selected_choice (1~4) 또는 is_unsure=true 가 필요합니다",
)
except ValueError as e:
raise HTTPException(status_code=422, detail=str(e))
else:
selected = body.selected_choice
is_correct = selected == q.correct_choice
outcome = "correct" if is_correct else "wrong"
# PR-10: 세션 연동. 기본은 None.
quiz_session: StudyQuizSession | None = None
@@ -1559,8 +1543,8 @@ async def delete_question_image(
# ─── PR-3: AI 풀이 생성 엔드포인트 ───
# 2026-06-20: config 단일소스 (구 하드코딩 30s = 빠른 Gemma 기준).
LLM_TIMEOUT_S = settings.llm_call_timeout_s
# MLX 호출 timeout (초). MLX gate + 26B 추론 평균 ~10s, 안전 마진.
LLM_TIMEOUT_S = 30.0
# 프롬프트 템플릿 lazy load
_PROMPT_PATH = "study_question_explanation.txt"
_prompt_cache: str | None = None
@@ -1729,9 +1713,6 @@ async def generate_ai_explanation(
primary_name = ai_client.ai.primary.model if hasattr(ai_client.ai.primary, "model") else "primary"
q.ai_explanation_model = f"mlx:{primary_name}"
q.updated_at = q.ai_explanation_generated_at
# 발행 재투영(같은 tx) — 실시간 해설 ready → 문항+해설 발행. P0-1b.
if settings.study_publish_enabled:
await enqueue_question_publish(session, q)
await session.commit()
return AIExplanationResponse(
+2 -15
View File
@@ -33,7 +33,6 @@ from ai.client import AIClient, strip_thinking
from eid.ai import EidAIClient
from eid.compose import compose
from core.auth import get_current_user
from core.config import settings
from core.database import get_session
from core.library import LIBRARY_PREFIX, normalize_library_path
from models.document import Document
@@ -47,8 +46,6 @@ from models.eid_study_weakness import EidStudyWeakness
from models.eid_review_set_draft import EidReviewSetDraft
from models.user import User
from services.search.llm_gate import Priority, acquire_mlx_gate
from services.study.publish_enqueue import enqueue_publish, enqueue_topic_publish
from services.study.publish_projection import KIND_TOPIC
from services.study.subject_note_rag import (
SubjectNoteContext,
gather_subject_note_context,
@@ -469,9 +466,6 @@ async def create_study_topic(
session.add(topic)
try:
await session.flush()
# 발행 outbox 적재(같은 tx, flag off 면 no-op) — 신규 주제 발행. S-1.
if settings.study_publish_enabled:
await enqueue_topic_publish(session, topic)
await session.commit()
except IntegrityError:
await session.rollback()
@@ -701,10 +695,6 @@ async def update_study_topic(
topic.focused_at = datetime.now(timezone.utc) if body.focused else None
topic.updated_at = datetime.now(timezone.utc)
# 발행 재투영(같은 tx) — 주제 메타 갱신 반영. payload(name·exam_round_size) 무변경(focused 등)
# 은 워커 (payload_hash, deleted) 디둡이 rev 안 올리고 흡수 = churn 없음. S-1.
if settings.study_publish_enabled:
await enqueue_topic_publish(session, topic)
try:
await session.commit()
except IntegrityError:
@@ -780,9 +770,6 @@ async def delete_study_topic(
)
topic.deleted_at = datetime.now(timezone.utc)
# 발행 tombstone(같은 tx) — 삭제는 feed 1급 이벤트(raw DELETE 금지·워커 경유). S-1.
if settings.study_publish_enabled:
await enqueue_publish(session, kind=KIND_TOPIC, source_id=topic.id, payload=None, deleted=True)
await session.commit()
@@ -1028,7 +1015,7 @@ async def detach_session_from_topic(
# ─── PR-9: 분야 설명 (study_topic_subject_notes) ───
SUBJECT_NOTE_TIMEOUT_S = settings.llm_call_timeout_s
SUBJECT_NOTE_TIMEOUT_S = 30.0
_SUBJECT_NOTE_PROMPT_PATH = "study_subject_note.txt"
_subject_note_prompt_cache: str | None = None
@@ -1255,7 +1242,7 @@ async def generate_subject_note(
# 워커(study_weakness)가 산출한 최신 eid_study_weakness 스냅샷을 '학습 진단 코치'(study overlay)
# 로 번역. 약점/태도 '판정'은 코드 derived(스냅샷) — LLM 은 스냅샷 블록 값만 인용(환각 약점 차단).
# compose("study_diagnosis") = persona+rules+study overlay(+{placeholder}) → 표면이 블록 substitute.
DIAGNOSIS_TIMEOUT_S = settings.llm_call_timeout_s
DIAGNOSIS_TIMEOUT_S = 40.0
class StudyDiagnosisResponse(BaseModel):
-36
View File
@@ -30,11 +30,6 @@ class AIModelConfig(BaseModel):
# None = MLX/OpenAI server default. Anthropic branch 는 미적용 (별 plan 범위).
temperature: float | None = None
top_p: float | None = None
# mlx 네이티브 샘플링 — 한국어 장문 코드스위칭(CJK/라틴 누수)·반복루프 억제용.
# Qwen3 권장: top_k=20, repetition_penalty 1.05~1.1. None = 서버 기본값(주입 안 함).
# OpenAI 호환 분기(mlx)만 적용 — Anthropic 분기는 미적용(별 범위).
repetition_penalty: float | None = None
top_k: int | None = None
class DeepSummaryBacklogConfig(BaseModel):
@@ -181,29 +176,16 @@ class Settings(BaseModel):
digest_llm_timeout_s: int = 200
digest_llm_attempts: int = 2
digest_pipeline_hard_cap_s: int = 1800
# 2026-06-20: study/analyze 단일 primary-call 타임아웃 (구 하드코딩 30~60s = 빠른 Gemma 기준,
# Qwen 27B 교체 sweep 누락 → 사용자 대면 504 + 워커 영구 stuck). digest 와 동형 단일소스.
llm_call_timeout_s: int = 200
# PR-MacMini-Derived-Worker-1: study explanation owner = Mac mini
# GPU 측은 false 로 설정 (.env), explanation 분기 skip guard 트리거.
study_explanation_enabled: bool = True
# 공부 암기노트 Phase 1: card_extract 폴러/consumer 게이트. owner 분리 시 false 로.
study_card_extract_enabled: bool = True
# 발행 레이어(docsrv-viewer-publish): publish_outbox 워커 게이트. 저자/4-A enqueue 결선(P0-1b) 후 true.
study_publish_enabled: bool = False
digest_publish_enabled: bool = False # docsrv-viewer-publish P1-1 (뉴스/다이제스트 발행 feed gate)
maintenance_mode: bool = False # P1-4: 점검/실험 중 = 가공현황 배너(표면 != 데이터)
maintenance_note: str = ""
# 뷰어 write-back ingest(study-to-viewer P2) 게이트. /ingest/study/attempts 활성. 기본 false=inert(503).
study_ingest_enabled: bool = False
# internal endpoint Bearer token (Mac mini derived-worker 호출용)
internal_worker_token: str = ""
# 뷰어↔DS 발행 채널 Bearer token (publish read API P0-2 + ingest P2). Mac mini 토큰과 분리(폭발반경 격리).
viewer_sync_token: str = ""
def load_settings() -> Settings:
"""config.yaml + 환경변수에서 설정 로딩"""
@@ -211,13 +193,7 @@ def load_settings() -> Settings:
database_url = os.getenv("DATABASE_URL", "")
study_explanation_enabled = os.getenv("STUDY_EXPLANATION_ENABLED", "true").lower() in ("1", "true", "yes")
study_card_extract_enabled = os.getenv("STUDY_CARD_EXTRACT_ENABLED", "true").lower() in ("1", "true", "yes")
study_publish_enabled = os.getenv("STUDY_PUBLISH_ENABLED", "false").lower() in ("1", "true", "yes")
digest_publish_enabled = os.getenv("DIGEST_PUBLISH_ENABLED", "false").lower() in ("1", "true", "yes")
maintenance_mode = os.getenv("MAINTENANCE_MODE", "false").lower() in ("1", "true", "yes")
maintenance_note = os.getenv("MAINTENANCE_NOTE", "")
study_ingest_enabled = os.getenv("STUDY_INGEST_ENABLED", "false").lower() in ("1", "true", "yes")
internal_worker_token = os.getenv("INTERNAL_WORKER_TOKEN", "")
viewer_sync_token = os.getenv("VIEWER_SYNC_TOKEN", "")
jwt_secret = os.getenv("JWT_SECRET", "")
totp_secret = os.getenv("TOTP_SECRET", "")
eval_runner_token = os.getenv("EVAL_RUNNER_TOKEN", "")
@@ -292,7 +268,6 @@ def load_settings() -> Settings:
digest_llm_timeout_s = 200
digest_llm_attempts = 2
digest_pipeline_hard_cap_s = 1800
llm_call_timeout_s = 200
if config_path.exists() and raw and "pipeline" in raw:
held_raw = (raw.get("pipeline") or {}).get("held_stages") or []
# 스칼라(문자열) 오기입 시 char-split 방지 — 단일 항목 리스트로 수용.
@@ -318,10 +293,6 @@ def load_settings() -> Settings:
digest_pipeline_hard_cap_s = max(60, int(_pl.get("digest_pipeline_hard_cap_s", 1800)))
except (TypeError, ValueError):
digest_pipeline_hard_cap_s = 1800
try:
llm_call_timeout_s = max(1, int(_pl.get("llm_call_timeout_s", 200)))
except (TypeError, ValueError):
llm_call_timeout_s = 200
taxonomy = raw.get("taxonomy", {}) if config_path.exists() and raw else {}
document_types = raw.get("document_types", []) if config_path.exists() and raw else []
@@ -350,19 +321,12 @@ def load_settings() -> Settings:
upload=upload_cfg,
study_explanation_enabled=study_explanation_enabled,
study_card_extract_enabled=study_card_extract_enabled,
study_publish_enabled=study_publish_enabled,
digest_publish_enabled=digest_publish_enabled,
maintenance_mode=maintenance_mode,
maintenance_note=maintenance_note,
study_ingest_enabled=study_ingest_enabled,
internal_worker_token=internal_worker_token,
viewer_sync_token=viewer_sync_token,
pipeline_held_stages=pipeline_held_stages,
mlx_gate_concurrency=mlx_gate_concurrency,
digest_llm_timeout_s=digest_llm_timeout_s,
digest_llm_attempts=digest_llm_attempts,
digest_pipeline_hard_cap_s=digest_pipeline_hard_cap_s,
llm_call_timeout_s=llm_call_timeout_s,
)
+1 -4
View File
@@ -2,7 +2,6 @@
import hashlib
import logging
from logging.handlers import RotatingFileHandler
from pathlib import Path
@@ -14,9 +13,7 @@ def setup_logger(name: str, log_dir: str = "logs") -> logging.Logger:
if not logger.handlers:
# 파일 핸들러
fh = RotatingFileHandler(
f"{log_dir}/{name}.log", maxBytes=10 * 1024 * 1024, backupCount=3, encoding="utf-8"
)
fh = logging.FileHandler(f"{log_dir}/{name}.log", encoding="utf-8")
fh.setFormatter(logging.Formatter(
"%(asctime)s [%(levelname)s] %(message)s",
datefmt="%Y-%m-%d %H:%M:%S"
+2 -22
View File
@@ -9,8 +9,6 @@ from sqlalchemy import func, select, text
from api.audio import router as audio_router
from api.internal_study import router as internal_study_router
from api.internal_worker import router as internal_worker_router
from api.published import router as published_router
from api.ingest_study import router as ingest_study_router
from api.auth import router as auth_router
from api.briefing import router as briefing_router
from api.config import router as config_router
@@ -72,7 +70,6 @@ async def lifespan(app: FastAPI):
from workers.study_session_queue_consumer import consume_study_session_queue
from workers.study_memo_card_jobs_consumer import consume_study_memo_card_queue
from workers.study_card_enqueue import run as study_card_enqueue_run
from workers.study_publish_worker import consume_publish_outbox
from workers.study_reminder import run as study_reminder_run
from workers.study_weakness import run as study_weakness_run
from workers.study_question_embed_worker import (
@@ -87,13 +84,6 @@ async def lifespan(app: FastAPI):
# 시작: DB 연결 확인
await init_db()
# 2026-06-20: JWT_SECRET 빈값 fail-loud — credentials.env 미로드/누락 시 빈 키로 전 토큰
# 서명하며 부팅하던 침묵 인증붕괴 차단 (totp_secret 은 per-user 라 미가드).
if not settings.jwt_secret:
raise RuntimeError(
"JWT_SECRET 미설정 — 빈 키 서명 방지. credentials.env / 환경변수 확인."
)
# NAS 마운트 확인 (NFS 미마운트 시 로컬 빈 디렉토리에 쓰는 것 방지)
from pathlib import Path
nas_check = Path(settings.nas_mount_path) / "PKM"
@@ -104,12 +94,7 @@ async def lifespan(app: FastAPI):
)
# APScheduler: 백그라운드 작업
scheduler = AsyncIOScheduler(
timezone="Asia/Seoul",
# 2026-06-20 H4: 기본 misfire_grace_time=1s 는 단일 asyncio 루프가 1초만 혼잡해도
# 1분 컨슈머 틱을 run time missed 로 침묵 스킵(에러·failed row 0). 45s 완화 + coalesce.
job_defaults={"misfire_grace_time": 45, "coalesce": True, "max_instances": 1},
)
scheduler = AsyncIOScheduler(timezone="Asia/Seoul")
# 상시 실행
scheduler.add_job(consume_queue, "interval", minutes=1, id="queue_consumer")
# PR-DocSrv-Markdown-Consumer-Split-1: markdown(marker) 전용 consumer.
@@ -143,9 +128,6 @@ async def lifespan(app: FastAPI):
# 별 테이블/별 consumer 로 기존 study queue 와 격리. settings.study_card_extract_enabled 게이트.
scheduler.add_job(consume_study_memo_card_queue, "interval", minutes=1, id="study_memo_card_consumer")
scheduler.add_job(study_card_enqueue_run, "interval", minutes=1, id="study_card_enqueue")
# 발행 레이어(docsrv-viewer-publish): publish_outbox drain → published rev 부여.
# study_publish_enabled=false(기본) 면 worker 내부 no-op. 단일 라이터(pg_advisory_xact_lock) max_instances=1.
scheduler.add_job(consume_publish_outbox, "interval", minutes=1, id="publish_outbox_consumer", max_instances=1)
# PR-B 레거시 tier 백필 — 30분 주기로 호출되지만 KST 00:00~06:00 시간대만 실제 enqueue.
# safety > law > manual 우선순위로 25건씩. 6720 레거시 → 야간당 ~150건 → 약 45일 소화.
scheduler.add_job(tier_backfill_run, "interval", minutes=30, id="tier_backfill")
@@ -162,7 +144,7 @@ async def lifespan(app: FastAPI):
scheduler.add_job(study_reminder_run, CronTrigger(hour="9,13,19", timezone=KST), id="study_reminder")
# 이드 W3-2: 공부중 토픽 약점 derived 스냅샷 (nightly 04:30 KST, LLM 0). study_diagnosis 표면 source.
scheduler.add_job(study_weakness_run, CronTrigger(hour=4, minute=30, timezone=KST), id="study_weakness")
scheduler.add_job(news_collector_run, CronTrigger(hour="0,6,12,18", timezone=KST), id="news_collector")
scheduler.add_job(news_collector_run, "interval", hours=6, id="news_collector")
# crawl-24x7 A-2 안전망: fulltext 영구 실패(3회 소진) 문서를 RSS 요약 기준으로
# 후속 enqueue (silent skip 누적 방지). 03:40 = dedup_reconcile(03:30) 직후 비충돌 슬롯.
scheduler.add_job(fulltext_reconcile_run, CronTrigger(hour=3, minute=40, timezone=KST), id="fulltext_reconcile")
@@ -238,8 +220,6 @@ app.include_router(briefing_router, prefix="/api/briefing", tags=["briefing"])
app.include_router(audio_router, prefix="/api/audio", tags=["audio"])
app.include_router(internal_study_router, prefix="/internal/study", tags=["internal-study"])
app.include_router(internal_worker_router, prefix="/internal/worker", tags=["internal-worker"])
app.include_router(published_router, prefix="/published", tags=["published"])
app.include_router(ingest_study_router, prefix="/ingest/study", tags=["ingest-study"])
app.include_router(video_router, prefix="/api/video", tags=["video"])
app.include_router(study_sessions_router, prefix="/api/study-sessions", tags=["study-sessions"])
app.include_router(study_topics_router, prefix="/api/study-topics", tags=["study-topics"])
-60
View File
@@ -1,60 +0,0 @@
"""발행 레이어 ORM (docsrv-viewer-publish) — published projection + publish_outbox.
관계(relationship) 없음 = 독립 테이블, configure_mappers 무영향. 마이그 367~372.
published = 뷰어가 read API(P0-2) 당기는 render-ready projection(kind-discriminated).
publish_outbox = 저작/4-A 트랜잭션이 같은 tx에서 INSERT, 발행 워커가 drain 하며 rev 부여.
불변식(plan study-to-viewer-slice1):
pub_id opaque+stable = dedup키 = progress키 / rev = 워커 커밋순 gapless(pg_advisory_lock 단일 라이터)
/ (payload_hash, deleted) 디둡 / 삭제 = tombstone(deleted=true) / schema_version = 엔벨로프 버전.
"""
from __future__ import annotations
from datetime import datetime
from sqlalchemy import BigInteger, Boolean, DateTime, SmallInteger, String, Text
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.orm import Mapped, mapped_column
from core.database import Base
class Published(Base):
__tablename__ = "published"
id: Mapped[int] = mapped_column(BigInteger, primary_key=True)
kind: Mapped[str] = mapped_column(String(40), nullable=False)
source_id: Mapped[int] = mapped_column(BigInteger, nullable=False)
pub_id: Mapped[str] = mapped_column(Text, nullable=False)
payload: Mapped[dict] = mapped_column(JSONB, nullable=False)
payload_hash: Mapped[str] = mapped_column(Text, nullable=False)
schema_version: Mapped[int] = mapped_column(SmallInteger, nullable=False, default=1)
rev: Mapped[int] = mapped_column(BigInteger, nullable=False)
deleted: Mapped[bool] = mapped_column(Boolean, nullable=False, default=False)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), default=datetime.now, nullable=False
)
updated_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), default=datetime.now, nullable=False
)
# UNIQUE(kind, pub_id)=mig368, UNIQUE(kind, source_id)=mig369, idx(rev)=mig370.
class PublishOutbox(Base):
__tablename__ = "publish_outbox"
id: Mapped[int] = mapped_column(BigInteger, primary_key=True)
kind: Mapped[str] = mapped_column(String(40), nullable=False)
source_id: Mapped[int] = mapped_column(BigInteger, nullable=False)
payload: Mapped[dict] = mapped_column(JSONB, nullable=False)
payload_hash: Mapped[str] = mapped_column(Text, nullable=False)
schema_version: Mapped[int] = mapped_column(SmallInteger, nullable=False, default=1)
deleted: Mapped[bool] = mapped_column(Boolean, nullable=False, default=False)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), default=datetime.now, nullable=False
)
processed_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
# 미처리 부분 인덱스 idx(id) WHERE processed_at IS NULL = mig372.
+8 -32
View File
@@ -25,7 +25,6 @@ from sqlalchemy import (
String,
Text,
func,
select,
text,
update,
)
@@ -100,25 +99,13 @@ async def supersede_old_cards(
*,
source_question_id: int,
keep_generated_at: datetime | None,
) -> list[int]:
) -> int:
"""같은 문제의 '다른 버전' 카드를 deleted_at 마킹(retire).
source_generated_at 카드 적재 '전에' 호출 살아있는 구버전 카드가 dedup PARTIAL
UNIQUE 추출을 막는 것을 방지(정정- stale 잔류 0). 같은 버전은 보존.
Returns: retire 되며 '발행 중이던'(needs_review=False) 카드 id 목록 발행 tombstone
대상(호출측이 enqueue). 검수 됐던(미발행) retire 카드는 tombstone 불요라 제외.
Returns: retire .
"""
# 발행 중이던 retire 대상 선캡처(update 전) — 미발행 카드 스푸리어스 tombstone 회피.
published_retired = (
await session.execute(
select(StudyMemoCard.id).where(
StudyMemoCard.source_question_id == source_question_id,
StudyMemoCard.deleted_at.is_(None),
StudyMemoCard.source_generated_at.is_distinct_from(keep_generated_at),
StudyMemoCard.needs_review.is_(False),
)
)
).scalars().all()
stmt = (
update(StudyMemoCard)
.where(
@@ -128,8 +115,8 @@ async def supersede_old_cards(
)
.values(deleted_at=func.now())
)
await session.execute(stmt)
return list(published_retired)
result = await session.execute(stmt)
return result.rowcount or 0
async def append_card(
@@ -229,24 +216,13 @@ async def flag_cards_for_source(
*,
source_question_id: int,
reason: str,
) -> list[int]:
) -> int:
"""소스 문제 정정/삭제 시 파생 카드를 needs_review=auto 마킹(임시 플래그).
최종 stale 정리는 워커 supersede 책임 이건 사용자 가시화용 즉시 플래그.
reason: 'source_changed' | 'source_deleted'.
Returns: 플래그로 '발행 자격을 잃은'(직전 needs_review=False) 카드 id 목록 발행
tombstone 대상(호출측 enqueue). 이미 검수대기였던(미발행) 카드는 제외.
Returns: 마킹된 .
"""
# 발행 중이던 카드 선캡처(update 전) — 플래그로 needs_review=True 가 되면 발행 자격 상실.
published_ids = (
await session.execute(
select(StudyMemoCard.id).where(
StudyMemoCard.source_question_id == source_question_id,
StudyMemoCard.deleted_at.is_(None),
StudyMemoCard.needs_review.is_(False),
)
)
).scalars().all()
stmt = (
update(StudyMemoCard)
.where(
@@ -255,5 +231,5 @@ async def flag_cards_for_source(
)
.values(needs_review=True, flagged_by=reason, flagged_at=func.now())
)
await session.execute(stmt)
return list(published_ids)
result = await session.execute(stmt)
return result.rowcount or 0
-4
View File
@@ -50,10 +50,6 @@ class StudyQuizSession(Base):
chronic_remaining_count: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
finished_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
# study-to-viewer P2: 뷰어 ingest 멱등/출처. 라이브 세션=finalized_at·client_session_uuid NULL, source='live'.
finalized_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True)) # 멱등 마커(mig 373)
client_session_uuid: Mapped[str | None] = mapped_column(String(64)) # 뷰어 세션 UUID(mig 374, uq mig376)
source: Mapped[str] = mapped_column(String(20), nullable=False, default="live") # live|viewer(mig 375)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), default=datetime.now, nullable=False
)
-2
View File
@@ -42,7 +42,6 @@ _NEWS_WINDOW_SQL = text(f"""
AND d.created_at < :window_end
AND d.embedding IS NOT NULL
AND d.ai_summary IS NOT NULL
AND length(d.ai_summary) > 0
-- 안전 자료실 B-4: licensed_restricted 발행 차단 (digest 동일 공유 술어, 경로 일관성)
AND {restricted_exclude_sql("d")}
""")
@@ -67,7 +66,6 @@ _HISTORICAL_CANDIDATES_SQL = text(f"""
AND d.created_at < :hist_end
AND d.embedding IS NOT NULL
AND d.ai_summary IS NOT NULL
AND length(d.ai_summary) > 0
-- 안전 자료실 B-4: licensed_restricted 발행 차단 (공유 술어)
AND {restricted_exclude_sql("d")}
""")
-1
View File
@@ -42,7 +42,6 @@ _NEWS_WINDOW_SQL = text(f"""
AND d.created_at < :window_end
AND d.embedding IS NOT NULL
AND d.ai_summary IS NOT NULL
AND length(d.ai_summary) > 0
-- 안전 자료실 B-4: licensed_restricted 발행 차단 (모든 경로 공유 술어 = license_filter).
-- news 채널엔 현재 restricted 부재 = 방어적 게이트(미래 유료 news 소스 대비, 경로 누락 방지).
AND {restricted_exclude_sql("d")}
+1 -18
View File
@@ -42,21 +42,6 @@ _ENG = re.compile(
_FENCE = re.compile(r'^\s{0,3}(```|~~~)')
# ASME 절 식별자 (A-1): UG-79 · PG-27.4.1 · UW-11 · UCS-56 · A-69 · PFT-14
# (대문자 1~4 + 하이픈 + 숫자[.숫자]*). _detect_heading 의 ATX 분기에서 node_type='clause' 판정에 사용.
# 한국 법령(제N조)은 _KO_JO 가 별도 처리 — 본 패턴/정제와 무관(무회귀).
_ASME_CLAUSE = re.compile(r'^[A-Z]{1,4}-\d+(?:\.\d+)*\b')
def _clean_label(title: str) -> str:
r"""C-4: marker 가 박는 LaTeX/markdown/페이지번호 아티팩트 제거 — 절번호 패턴 매칭의 전처리 겸 표시 라벨 정제.
실데이터 : '$\textbf{PG-20.1 …} \hspace{0.2cm} \textbf{(25)}$' 'PG-20.1 …' / '(25) **A-69**' 'A-69'.
노이즈 없는 제목(한국 법령·일반 ATX ) inert(무회귀)."""
t = re.sub(r'\\textbf|\\textit|\\mathbf|\\hspace\{[^}]*\}|[${}]|\*\*', '', title)
t = re.sub(r'^\s*\(\d+\)\s*', '', t) # 선두 페이지번호 '(25) '
return re.sub(r'\s{2,}', ' ', t).strip()
def _utf16_units(s: str) -> int:
"""JS 문자열 .length(= UTF-16 code unit 수) 와 동일. astral(BMP 밖)=surrogate pair=2 units.
FE `raw.length` / `out.slice(off)` UTF-16 code unit 단위라 char_start 같은 단위여야 .
@@ -87,9 +72,7 @@ def _detect_heading(line: str) -> tuple[int, str, str] | None:
"""(level, title, node_type) 또는 None. level 은 상대 깊이."""
m = _ATX.match(line)
if m:
title = _clean_label(m.group("title").strip()) # C-4: LaTeX/md/페이지번호 정제(전처리)
nt = "clause" if _ASME_CLAUSE.match(title) else None # A-1: ASME 절 식별자(UG-79 등) → clause
return (len(m.group(1)), title, nt)
return (len(m.group(1)), m.group("title").strip(), None) # node_type 은 후처리에서
for pat, lvl, nt in ((_KO_JANG, 1, "chapter"), (_KO_JEOL, 2, "section"),
(_KO_JO, 3, "clause"), (_ENG, 1, "chapter")):
m = pat.match(line)
+1 -1
View File
@@ -102,7 +102,7 @@ async def classify(
# "MLX primary 호출 경로는 예외 없이 gate 획득 필수".
async with acquire_mlx_gate(Priority.FOREGROUND):
async with asyncio.timeout(LLM_TIMEOUT_MS / 1000):
raw = await client.call_classifier(prompt)
raw = await client._request(settings.ai.classifier, prompt)
_failure_count = 0
except asyncio.TimeoutError:
_failure_count += 1
+14 -19
View File
@@ -1,6 +1,6 @@
"""Time-aware retrieval freshness decay (PR-RAG-Time-1).
뉴스(source_channel='news') / 재해사례(material_type='incident', KOSHA) 도메인은
뉴스(source_channel='news') / 법령 알림(source_channel='law_monitor') 도메인은
시간이 중요한 문서. 단순 relevance score 만으로는 오래된 문서가 상위에 머물러
검색 품질이 떨어짐. 모듈은 reranker 이후 final score 합성 단계에서
soft multiplier 시간 가중치 적용. 삭제는 없음 ranking demote.
@@ -9,10 +9,9 @@ soft multiplier 로 시간 가중치 적용. 삭제는 없음 — ranking 만 de
- reranker = 의미 관련도, freshness decay = 운영 정책. 단계 분리 유지.
- floor 0.7 (multiplier 0.7 미만으로 떨어짐) 오래되어도 죽지 않음.
- 일반 업로드 / 학습 자료 / KGS Code 원문 / ai_drafted 비적용 (no-op).
- 법령(law) C-1 후속에서 freshness 제외 현행성은 version_status(B-1 버전체인) 처리.
published_date 컬럼이 documents 없음 created_at(수집 시점) 임시 proxy.
news/KOSHA 워커가 수집 즉시 indexing 하므로 created_at published_date.
news/law_monitor 워커가 수집 즉시 indexing 하므로 created_at published_date.
정확도 향상은 후속 PR (worker published_date 메타 채우기) 분리.
"""
@@ -33,10 +32,10 @@ if TYPE_CHECKING:
# ─── Policy ────────────────────────────────────────────────────────
# half-life (일). 90 일: 한 달 ~0.79 / 6개월 ~0.25.
# C-1 후속(2026-06-13): law_365d 폐기 — 법령 현행성은 version_status(B-1 버전체인)가 처리,
# age-decay 는 current 법령을 부당 강등(의도 변경 기록). 재해사례(incident)는 news_90d 흡수.
# 365 일: 1년 ~0.5 / 3년 ~0.13.
HALF_LIFE_DAYS: dict[str, int] = {
"news_90d": 90,
"law_365d": 365,
}
# soft multiplier — final = base * (FLOOR + (1-FLOOR) * decay).
@@ -53,35 +52,32 @@ class _DocMeta:
source_channel: str | None
content_origin: str | None
created_at: datetime | None
material_type: str | None = None
def freshness_policy(meta: _DocMeta | None) -> str | None:
"""문서 메타 → freshness 정책 이름 또는 None (no-op).
적용:
- material_type='incident' (KOSHA 재해사례/사망사고) news_90d (C-1 후속 흡수, 시간 민감)
- source_channel='news' news_90d
- source_channel='news' news_90d
- source_channel='law_monitor' law_365d
비적용 (None 반환):
- meta 자체가 None
- content_origin='ai_drafted' (생성 시점 = 가치 시점, 시간 demote 부적합)
- 법령(source_channel='law_monitor'/material_type='law'): C-1 후속에서 law_365d 폐기.
법령 현행성은 version_status(B-1 버전체인 current/superseded) 처리 age-decay
current 법령을 부당 강등(의도 변경 기록). law 검색 ranking = version_status decorate.
- 모든 source_channel (manual, drive_sync, inbox_route, memo 자연 비적용)
- 모든 source_channel (manual, drive_sync, inbox_route, memo,
Study/Manual/Reference/Academic/Checklist 자연 비적용)
"""
if meta is None:
return None
# 가드 2: content_origin='ai_drafted' 비적용
if meta.content_origin == "ai_drafted":
return None
# 재해사례/사망사고 = 시간 민감 → news 와 동일 90d (source 무관, 업로드 incident 도 포함)
if meta.material_type == "incident":
sc = meta.source_channel
if sc == "news":
return "news_90d"
if meta.source_channel == "news":
return "news_90d"
# 법령 law_365d 폐기 + unknown source_channel → no decay
if sc == "law_monitor":
return "law_365d"
# 가드 6: unknown source_channel → no decay
return None
@@ -133,7 +129,7 @@ async def _fetch_meta(
text(
"""
SELECT id, source_channel::text AS source_channel,
content_origin, material_type, created_at
content_origin, created_at
FROM documents
WHERE id = ANY(:ids)
"""
@@ -145,7 +141,6 @@ async def _fetch_meta(
source_channel=row.source_channel,
content_origin=row.content_origin,
created_at=row.created_at,
material_type=getattr(row, "material_type", None),
)
for row in rows
}
+3 -5
View File
@@ -11,7 +11,7 @@
## 핵심 원칙
- **Verifier strong 단독 refuse 금지** grounding strong 교차해야 refuse
- **Timeout 3s** 느리면 없는 낫다 (fail open)
- MLX gate 사용 (Mac mini 26B endpoint classifier/evidence 동일 gate 공유, 동시 race 방지)
- MLX gate 사용 (PR #20 이후 Mac mini 26B endpoint — concurrent 안전성 별 검토)
"""
from __future__ import annotations
@@ -25,7 +25,6 @@ from typing import TYPE_CHECKING, Literal
from ai.client import AIClient, _load_prompt, parse_json_response
from core.config import settings
from core.utils import setup_logger
from .llm_gate import Priority, acquire_mlx_gate
if TYPE_CHECKING:
from .evidence_service import EvidenceItem
@@ -133,9 +132,8 @@ async def verify(
prompt = _build_input(answer, evidence)
client = AIClient()
try:
async with acquire_mlx_gate(Priority.FOREGROUND):
async with asyncio.timeout(LLM_TIMEOUT_MS / 1000):
raw = await client.call_verifier(prompt)
async with asyncio.timeout(LLM_TIMEOUT_MS / 1000):
raw = await client._request(settings.ai.verifier, prompt)
_failure_count = 0
except asyncio.TimeoutError:
_failure_count += 1
-25
View File
@@ -1,25 +0,0 @@
"""채점(outcome) 산출 단일 소스 (study-to-viewer P2).
라이브 attempt 엔드포인트(submit_attempt) 뷰어 ingest **동일 함수** 채점
정오 어휘가 (서버)에서 결정(plan r2: ingest raw 신호 selected+unsure 싣고
DS 산출 = '무수정 재생' 실제로 성립시키는 형태). correct_choice 항상 현재 DB .
규칙(라이브 study_questions.py:1008-1020 동일):
is_unsure=True (None, False, 'unsure') # unsure 가 정오 override, selected 폐기
selected None ValueError # 선택 없고 unsure 도 아니면 무효(엔드포인트가 처리)
selected==correct (selected, is_correct, 'correct'|'wrong')
"""
from __future__ import annotations
def derive_outcome(
selected_choice: int | None, is_unsure: bool, correct_choice: int
) -> tuple[int | None, bool, str]:
"""(selected, is_correct, outcome) 반환. skipped 는 여기서 안 나옴(선택 없으면 호출측이 거부/skip)."""
if is_unsure:
return None, False, "unsure"
if selected_choice is None:
raise ValueError("selected_choice (1~4) 또는 is_unsure=true 가 필요합니다")
is_correct = selected_choice == correct_choice
return selected_choice, is_correct, ("correct" if is_correct else "wrong")
-174
View File
@@ -1,174 +0,0 @@
"""발행 outbox enqueue + 초기 백필 (docsrv-viewer-publish).
enqueue_publish: 저작/4-A 트랜잭션이 같은 session(=같은 Postgres tx)에서 호출 caller commit
(P0-1 규율: 콘텐츠 변경과 outbox INSERT 원자성, dual-write 회피). payload/hash 스냅샷.
enqueue_question_publish: 문항 + (ready면)해설을 함께 적재. 저작 쓰기/4-A 완료/백필 공용.
backfill_publish_questions: 기존 active 문항을 bounded 1 outbox 적재(초기 백필, P2-1 bounded page).
멱등 = 발행 워커의 (payload_hash, deleted) 디둡이 no-op 재투영 흡수(중복 enqueue 무해).
주의: 저작 엔드포인트(study_questions create/update)·4-A 워커에서의 enqueue 결선은 P0-1b
(기존 hot 파일 수정이라 increment). 모듈은 호출 라이브러리 + 수동/백필 진입점.
"""
from __future__ import annotations
from typing import Any
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from models.published import PublishOutbox
from models.study_memo_card import StudyMemoCard
from models.study_memo_card_progress import StudyMemoCardProgress
from models.study_question import StudyQuestion
from models.study_topic import StudyTopic
from services.study.publish_projection import (
KIND_CARD,
KIND_CARD_PROGRESS,
KIND_EXPLANATION,
KIND_QUESTION,
KIND_TOPIC,
SCHEMA_VERSION,
payload_hash,
project_card,
project_card_progress,
project_explanation,
project_question,
project_topic,
)
async def enqueue_publish(
session: AsyncSession,
*,
kind: str,
source_id: int,
payload: dict[str, Any] | None,
deleted: bool = False,
) -> None:
"""outbox 1행 INSERT. caller 가 commit (저자 tx 동봉). deleted=True 면 tombstone(payload={})."""
body: dict[str, Any] = payload if payload is not None else {}
session.add(
PublishOutbox(
kind=kind,
source_id=source_id,
payload=body,
payload_hash=payload_hash(body),
schema_version=SCHEMA_VERSION,
deleted=deleted,
)
)
async def enqueue_question_publish(session: AsyncSession, q: Any) -> None:
"""문항 + (ready면)해설을 outbox 적재. caller commit."""
await enqueue_publish(session, kind=KIND_QUESTION, source_id=q.id, payload=project_question(q))
expl = project_explanation(q)
if expl is not None:
await enqueue_publish(session, kind=KIND_EXPLANATION, source_id=q.id, payload=expl)
async def backfill_publish_questions(session: AsyncSession, *, after_id: int = 0, limit: int = 200) -> int:
"""active(미삭제) 문항을 id>after_id 부터 bounded 로 outbox 적재.
반환 = enqueue 문항 (0 이면 ). 셋은 마지막 id 페이지 반복. caller commit.
"""
rows = (
await session.execute(
select(StudyQuestion)
.where(StudyQuestion.deleted_at.is_(None), StudyQuestion.id > after_id)
.order_by(StudyQuestion.id.asc())
.limit(limit)
)
).scalars().all()
for q in rows:
await enqueue_question_publish(session, q)
return len(rows)
async def enqueue_topic_publish(session: AsyncSession, topic: Any) -> None:
"""주제 메타를 outbox 적재(S-1). caller commit. 저작 create/update 결선 + 백필 공용."""
await enqueue_publish(session, kind=KIND_TOPIC, source_id=topic.id, payload=project_topic(topic))
async def backfill_publish_topics(session: AsyncSession, *, after_id: int = 0, limit: int = 200) -> int:
"""active(미삭제) 주제를 id>after_id 부터 bounded 로 outbox 적재(S-1 초기 백필).
반환 = enqueue 주제 (0 이면 ). 셋은 마지막 id 페이지 반복. caller commit.
멱등 = 발행 워커의 (payload_hash, deleted) 디둡이 no-op 재투영 흡수(중복 enqueue 무해).
"""
rows = (
await session.execute(
select(StudyTopic)
.where(StudyTopic.deleted_at.is_(None), StudyTopic.id > after_id)
.order_by(StudyTopic.id.asc())
.limit(limit)
)
).scalars().all()
for t in rows:
await enqueue_topic_publish(session, t)
return len(rows)
async def enqueue_card_publish(session: AsyncSession, card: Any) -> None:
"""카드 상태 기반 발행/tombstone (S-2). caller commit.
검수완료(needs_review=False) & 미삭제 발행 (검수대기 복귀·삭제·retire)
tombstone(feed 1 삭제 이벤트). 발행 자격이 카드 상태에 매여 있어 호출측은 '카드를
건드렸다'만 알면 되고 publish/tombstone 분기는 여기 단일화(경로별 가드 기억 회피).
"""
if card.deleted_at is not None or card.needs_review:
await enqueue_publish(session, kind=KIND_CARD, source_id=card.id, payload=None, deleted=True)
else:
await enqueue_publish(session, kind=KIND_CARD, source_id=card.id, payload=project_card(card))
async def backfill_publish_cards(session: AsyncSession, *, after_id: int = 0, limit: int = 200) -> int:
"""검수완료(needs_review=False)·미삭제 카드를 id>after_id 부터 bounded 로 outbox 적재(S-2 초기 백필).
반환 = enqueue 카드 (0 이면 ). 멱등 = 워커 (payload_hash, deleted) 디둡. caller commit.
"""
rows = (
await session.execute(
select(StudyMemoCard)
.where(
StudyMemoCard.deleted_at.is_(None),
StudyMemoCard.needs_review.is_(False),
StudyMemoCard.id > after_id,
)
.order_by(StudyMemoCard.id.asc())
.limit(limit)
)
).scalars().all()
for c in rows:
await enqueue_card_publish(session, c)
return len(rows)
async def enqueue_card_progress_publish(session: AsyncSession, progress: Any) -> None:
"""카드 SR progress row 발행(S-4). caller commit. rate_card 결과(ALL row, sentinel/terminal 포함)."""
await enqueue_publish(
session,
kind=KIND_CARD_PROGRESS,
source_id=progress.id,
payload=project_card_progress(progress),
)
async def backfill_publish_card_progress(session: AsyncSession, *, after_id: int = 0, limit: int = 200) -> int:
"""모든 card progress row 를 id>after_id 부터 bounded 로 outbox 적재(S-4 초기 백필).
필터 없음 = ALL row(due_at NULL sentinel·terminal 포함) due-only 백필은 sentinel 누락.
반환 = enqueue row (0 이면 ). 멱등 = 워커 디둡. caller commit.
"""
rows = (
await session.execute(
select(StudyMemoCardProgress)
.where(StudyMemoCardProgress.id > after_id)
.order_by(StudyMemoCardProgress.id.asc())
.limit(limit)
)
).scalars().all()
for p in rows:
await enqueue_card_progress_publish(session, p)
return len(rows)
-112
View File
@@ -1,112 +0,0 @@
"""발행 projection — 소스 행을 render-ready payload + 안정 해시로 변환 (순수 함수).
뷰어가 보는 '단일 진실' payload 까지 (DS 내부 실험 스키마는 계약 격리).
kind projector. payload_hash = 정렬된 JSON sha256 = (payload_hash, deleted) 디둡 .
주의(plan study-to-viewer-slice1 r2): 과목/시험메타를 per-question payload 인라인
bulk subject rename N행 churn. 정규화(과목= kind subject ref) churn 최적화 후속(P0-1b),
읽기 정합엔 무영향. 지금은 인라인(상관관계 단순)으로 두고 후속 PR 에서 분리.
SCHEMA_VERSION = 엔벨로프 버전. payload 모양 진화 bump + 뷰어 range 수용(P0-2).
"""
from __future__ import annotations
import hashlib
import json
from typing import Any
SCHEMA_VERSION = 1
KIND_QUESTION = "study_question"
KIND_EXPLANATION = "study_explanation"
KIND_TOPIC = "study_topic"
KIND_CARD = "study_card" # ★뷰어 pubstudy.ts 의 KIND_CARD 와 일치 필수(S-3 forward-contract).
KIND_CARD_PROGRESS = "study_card_progress" # 카드 SR 상태 read model (S-4, viewer C-4 소비).
def payload_hash(payload: dict[str, Any]) -> str:
"""정렬 JSON 의 sha256 — (payload_hash, deleted) 디둡 키. 키 순서/공백 비의존."""
canonical = json.dumps(payload, ensure_ascii=False, sort_keys=True, separators=(",", ":"))
return hashlib.sha256(canonical.encode("utf-8")).hexdigest()
def project_question(q: Any) -> dict[str, Any]:
"""study_question → 발행 payload. 정답 포함(개인 학습툴, plan Q2). 이미지는 ref 만(P0-4, 후속)."""
return {
"topic_id": q.study_topic_id,
"question_text": q.question_text,
"choices": [q.choice_1, q.choice_2, q.choice_3, q.choice_4],
"correct_choice": q.correct_choice,
"subject": q.subject,
"scope": q.scope,
"exam_name": q.exam_name,
"exam_round": q.exam_round,
"exam_question_number": q.exam_question_number,
"explanation": q.explanation, # 수동 해설(있으면). AI 해설은 별 kind.
}
def project_explanation(q: Any) -> dict[str, Any] | None:
"""study_question 의 AI 해설 → 별 발행 kind. ready 일 때만(없으면 None=발행 안 함).
재조우 표시용 선발행. 신규 오답은 4-A 워커가 ~90s ready재발행(P2-3 결선, P0-1b).
"""
if getattr(q, "ai_explanation_status", None) != "ready" or not getattr(q, "ai_explanation", None):
return None
gen = getattr(q, "ai_explanation_generated_at", None)
return {
"question_source_id": q.id,
"explanation_md": q.ai_explanation,
"model": getattr(q, "ai_explanation_model", None),
"generated_at": gen.isoformat() if gen else None,
}
def project_card(c: Any) -> dict[str, Any]:
"""study_memo_card → 발행 payload (S-2). 순수 변환 — 발행 자격(needs_review=false &
미삭제) 판단은 호출측(enqueue_card_publish) 카드 상태로. payload 계약 = 뷰어
pubstudy.ts getCards 동형(format·cue·fact·cloze_text·source_question_id·source_generated_at).
"""
gen = getattr(c, "source_generated_at", None)
return {
"format": c.format,
"cue": c.cue,
"fact": c.fact,
"cloze_text": c.cloze_text,
"source_question_id": c.source_question_id,
"source_generated_at": gen.isoformat() if gen else None,
}
def project_card_progress(p: Any) -> dict[str, Any]:
"""study_memo_card_progress → 발행 payload (S-4) = 카드 SR 상태 read model.
ALL row 발행(due_at NULL sentinel=-on-new · terminal=졸업 포함). due-only 발행하면
sentinel 누락 viewer '미확인' 오분류. SR 계산은 DS(sr_schedule), 여긴 결과만.
card_id = pub_card source_id(=DS card.id) viewer C-4 pub_card LEFT JOIN 하는 .
"""
due = getattr(p, "due_at", None)
rev = getattr(p, "last_reviewed_at", None)
return {
"card_id": p.card_id,
"topic_id": p.study_topic_id,
"last_outcome": p.last_outcome,
"last_reviewed_at": rev.isoformat() if rev else None,
"due_at": due.isoformat() if due else None,
"review_stage": p.review_stage,
}
def project_topic(t: Any) -> dict[str, Any]:
"""study_topic → 발행 payload (S-1, plan study-viewer-port).
topic 메타만 신규 발행 viewer 주제 단위 퀴즈를 만들 최소 정보.
회차 목록은 발행 = viewer pub_content(study_question) exam_name/exam_round
파생(추가 발행 불요, plan S-1 결정). topic_id project_question topic_id(=study_topic_id)
동일 DS 식별자라 viewer 문항주제 상관에 사용(pub_id opaque 상관 아님).
"""
return {
"topic_id": t.id,
"name": t.name,
"exam_round_size": t.exam_round_size,
}
+1 -5
View File
@@ -40,7 +40,6 @@ from ai.client import (
)
from ai.envelope import EscalationEnvelope
from core.config import settings
from services.search.llm_gate import Priority, acquire_mlx_gate
from core.utils import setup_logger
from models.document import Document
from models.queue import StageDeferred, enqueue_stage
@@ -674,10 +673,7 @@ async def _run_tier_triage(
# 는 아래 generic except 에 먹히지 않게 먼저 전파.
raw_triage = await call_deep_or_defer(client, prompt, cfg=deep_triage_cfg)
else:
# consumer 경로 call_triage 는 PR #20 이후 primary 와 동일 Mac mini endpoint —
# evidence/classifier 처럼 gate 안에서 호출(영구 룰: 같은 endpoint 예외 없이 gate).
async with acquire_mlx_gate(Priority.BACKGROUND):
raw_triage = await client.call_triage(prompt)
raw_triage = await client.call_triage(prompt)
except StageDeferred:
raise # drain 이 attempts 미소모 + 백오프로 처리 (sleep-안전)
except Exception as exc:
+5 -11
View File
@@ -374,17 +374,11 @@ async def run(bulk: bool = False, limit: int = 0) -> None:
totals = {"page": 0, "pdf": 0, "skip": 0}
for i, (url, lastmod) in enumerate(todo, 1):
# 2026-06-20 C2: URL 1건 실패가 주간 run 전체를 중단(이후 URL 스킵·watermark 정지)하던 것 차단.
# 각 iteration 은 자체 session(async with) 이라 실패 격리 — 건너뛰고 계속.
try:
async with async_session() as session:
src = await session.get(NewsSource, source_id)
counts = await _ingest_url(session, src, url, lastmod)
_set_watermark(src, lastmod)
await session.commit()
except Exception as e:
logger.error(f"[csb] URL 처리 실패 (건너뜀): {url}{str(e) or repr(e)}")
continue
async with async_session() as session:
src = await session.get(NewsSource, source_id)
counts = await _ingest_url(session, src, url, lastmod)
_set_watermark(src, lastmod)
await session.commit()
for k in totals:
totals[k] += counts[k]
if i % 10 == 0:
+367
View File
@@ -0,0 +1,367 @@
"""법령 모니터 워커 — 국가법령정보센터 API 연동
26 법령 모니터링, / 단위 분할 저장, 변경 이력 추적.
매일 07:00 실행 (APScheduler).
"""
import os
import re
from datetime import date, datetime, timezone
from pathlib import Path
from xml.etree import ElementTree as ET
import httpx
from sqlalchemy import select
from core.config import settings
from core.database import async_session
from core.utils import create_caldav_todo, file_hash, setup_logger
from models.automation import AutomationState
from models.document import Document
from models.queue import enqueue_stage
logger = setup_logger("law_monitor")
LAW_SEARCH_URL = "https://www.law.go.kr/DRF/lawSearch.do"
LAW_SERVICE_URL = "https://www.law.go.kr/DRF/lawService.do"
# 모니터링 대상 법령 (26개)
MONITORED_LAWS = [
# 산업안전보건 핵심
"산업안전보건법",
"산업안전보건법 시행령",
"산업안전보건법 시행규칙",
"산업안전보건기준에 관한 규칙",
"유해위험작업의 취업 제한에 관한 규칙",
"중대재해 처벌 등에 관한 법률",
"중대재해 처벌 등에 관한 법률 시행령",
# 건설안전
"건설기술 진흥법",
"건설기술 진흥법 시행령",
"건설기술 진흥법 시행규칙",
"시설물의 안전 및 유지관리에 관한 특별법",
# 위험물/화학
"위험물안전관리법",
"위험물안전관리법 시행령",
"위험물안전관리법 시행규칙",
"화학물질관리법",
"화학물질관리법 시행령",
"화학물질의 등록 및 평가 등에 관한 법률",
# 소방/전기/가스
"소방시설 설치 및 관리에 관한 법률",
"소방시설 설치 및 관리에 관한 법률 시행령",
"전기사업법",
"전기안전관리법",
"고압가스 안전관리법",
"고압가스 안전관리법 시행령",
"액화석유가스의 안전관리 및 사업법",
# 근로/환경
"근로기준법",
"환경영향평가법",
]
async def run():
"""법령 변경 모니터링 실행"""
law_oc = os.getenv("LAW_OC", "")
if not law_oc:
logger.warning("LAW_OC 미설정 — 법령 API 승인 대기 중")
return
async with async_session() as session:
state = await session.execute(
select(AutomationState).where(AutomationState.job_name == "law_monitor")
)
state_row = state.scalar_one_or_none()
last_check = state_row.last_check_value if state_row else None
today = datetime.now(timezone.utc).strftime("%Y%m%d")
if last_check == today:
logger.info("오늘 이미 체크 완료")
return
new_count = 0
async with httpx.AsyncClient(timeout=30) as client:
for law_name in MONITORED_LAWS:
try:
count = await _check_law(client, law_oc, law_name, session)
new_count += count
except Exception as e:
logger.error(f"[{law_name}] 체크 실패: {e}")
# 상태 업데이트
if state_row:
state_row.last_check_value = today
state_row.last_run_at = datetime.now(timezone.utc)
else:
session.add(AutomationState(
job_name="law_monitor",
last_check_value=today,
last_run_at=datetime.now(timezone.utc),
))
await session.commit()
logger.info(f"법령 모니터 완료: {new_count}건 신규/변경 감지")
async def _check_law(
client: httpx.AsyncClient,
law_oc: str,
law_name: str,
session,
) -> int:
"""단일 법령 검색 → 변경 감지 → 분할 저장"""
# 법령 검색 (lawSearch.do)
resp = await client.get(
LAW_SEARCH_URL,
params={"OC": law_oc, "target": "law", "type": "XML", "query": law_name},
)
resp.raise_for_status()
root = ET.fromstring(resp.text)
total = root.findtext(".//totalCnt", "0")
if total == "0":
logger.debug(f"[{law_name}] 검색 결과 없음")
return 0
# 정확히 일치하는 법령 찾기
for law_elem in root.findall(".//law"):
found_name = law_elem.findtext("법령명한글", "").strip()
if found_name != law_name:
continue
mst = law_elem.findtext("법령일련번호", "")
proclamation_date = law_elem.findtext("공포일자", "")
revision_type = law_elem.findtext("제개정구분명", "")
if not mst:
continue
# 이미 등록된 법령인지 확인 (같은 법령명 + 공포일자)
existing = await session.execute(
select(Document).where(
Document.title.like(f"{law_name}%"),
Document.source_channel == "law_monitor",
)
)
existing_docs = existing.scalars().all()
# 같은 공포일자 이미 있으면 skip
for doc in existing_docs:
if proclamation_date in (doc.title or ""):
return 0
# 이전 공포일 찾기 (변경 이력용)
prev_date = ""
if existing_docs:
prev_date = max(
(re.search(r'\d{8}', doc.title or "").group() for doc in existing_docs
if re.search(r'\d{8}', doc.title or "")),
default=""
)
# 본문 조회 (lawService.do)
text_resp = await client.get(
LAW_SERVICE_URL,
params={"OC": law_oc, "target": "law", "MST": mst, "type": "XML"},
)
text_resp.raise_for_status()
# 분할 저장
count = await _save_law_split(
session, text_resp.text, law_name, proclamation_date,
revision_type, prev_date,
)
# DB 먼저 커밋 (알림 실패가 저장을 막지 않도록)
await session.commit()
# CalDAV + SMTP 알림 (실패해도 무시)
try:
_send_notifications(law_name, proclamation_date, revision_type)
except Exception as e:
logger.warning(f"[{law_name}] 알림 발송 실패 (무시): {e}")
return count
return 0
async def _save_law_split(
session, xml_text: str, law_name: str, proclamation_date: str,
revision_type: str, prev_date: str,
) -> int:
"""법령 XML → 장(章) 단위 Markdown 분할 저장"""
root = ET.fromstring(xml_text)
# 조문단위에서 장 구분자 찾기 (조문키가 000으로 끝나는 조문)
units = root.findall(".//조문단위")
chapters = [] # [(장제목, [조문들])]
current_chapter = None
current_articles = []
for unit in units:
key = unit.attrib.get("조문키", "")
content = (unit.findtext("조문내용", "") or "").strip()
# 장 구분자: 키가 000으로 끝나고 내용에 "제X장" 포함
if key.endswith("000") and re.search(r"\d+장", content):
# 이전 장/서문 저장
if current_articles:
chapter_name = current_chapter or "서문"
chapters.append((chapter_name, current_articles))
chapter_match = re.search(r"(제\d+장\s*.+)", content)
current_chapter = chapter_match.group(1).strip() if chapter_match else content.strip()
current_articles = []
else:
current_articles.append(unit)
# 마지막 장 저장
if current_articles:
chapter_name = current_chapter or "서문"
chapters.append((chapter_name, current_articles))
# 장 분할 성공
sections = []
if chapters:
for chapter_title, articles in chapters:
md_lines = [f"# {law_name}\n", f"## {chapter_title}\n"]
for article in articles:
title = article.findtext("조문제목", "")
content = article.findtext("조문내용", "")
if title:
md_lines.append(f"\n### {title}\n")
if content:
md_lines.append(content.strip())
section_name = _safe_name(chapter_title)
sections.append((section_name, "\n".join(md_lines)))
else:
# 장 분할 실패 → 전체 1파일
full_md = _law_xml_to_markdown(xml_text, law_name)
sections.append(("전문", full_md))
# 각 섹션 저장
inbox_dir = Path(settings.nas_mount_path) / "PKM" / "Inbox"
inbox_dir.mkdir(parents=True, exist_ok=True)
count = 0
for section_name, content in sections:
filename = f"{law_name}_{proclamation_date}_{section_name}.md"
file_path = inbox_dir / filename
file_path.write_text(content, encoding="utf-8")
rel_path = str(file_path.relative_to(Path(settings.nas_mount_path)))
# 변경 이력 메모
note = ""
if prev_date:
note = (
f"[자동] 법령 개정 감지\n"
f"이전 공포일: {prev_date}\n"
f"현재 공포일: {proclamation_date}\n"
f"개정구분: {revision_type}"
)
# 안전 자료실 A-2 — 공포일 파싱 (law published_date = COALESCE(시행일, 공포일) 계약,
# 본 레거시 워커는 공포일만 보유 — 시행일 기반 버전 체인은 B-1 statute_collector 소관)
_digits = re.sub(r"\D", "", str(proclamation_date or ""))
pub_date = None
if len(_digits) == 8:
try:
pub_date = date(int(_digits[:4]), int(_digits[4:6]), int(_digits[6:8]))
except ValueError:
pub_date = None
doc = Document(
file_path=rel_path,
file_hash=file_hash(file_path),
file_format="md",
file_size=len(content.encode()),
file_type="immutable",
title=f"{law_name} ({proclamation_date}) {section_name}",
source_channel="law_monitor",
data_origin="work",
category="law",
# 안전 자료실 A-2 — ingest 시점 deterministic. 법령 텍스트 = 저작권법 제7조
# 비보호 저작물 (public domain). 본 워커는 휴면(LAW_OC 미설정)이나 코드 경로 유지.
material_type="law",
jurisdiction="KR",
published_date=pub_date,
extract_meta={"license": {"scheme": "public_domain", "redistribute": True,
"attribution": "국가법령정보센터"}},
user_note=note or None,
)
session.add(doc)
await session.flush()
await enqueue_stage(session, doc.id, "extract")
count += 1
logger.info(f"[법령] {law_name} ({proclamation_date}) → {count}개 섹션 저장")
return count
def _xml_section_to_markdown(elem) -> str:
"""XML 섹션(편/장)을 Markdown으로 변환"""
lines = []
for article in elem.iter():
tag = article.tag
text = (article.text or "").strip()
if not text:
continue
if "" in tag:
lines.append(f"\n### {text}\n")
elif "" in tag:
lines.append(f"\n{text}\n")
elif "" in tag:
lines.append(f"- {text}")
elif "" in tag:
lines.append(f" - {text}")
else:
lines.append(text)
return "\n".join(lines)
def _law_xml_to_markdown(xml_text: str, law_name: str) -> str:
"""법령 XML 전체를 Markdown으로 변환"""
root = ET.fromstring(xml_text)
lines = [f"# {law_name}\n"]
for elem in root.iter():
tag = elem.tag
text = (elem.text or "").strip()
if not text:
continue
if "" in tag and "제목" not in tag:
lines.append(f"\n## {text}\n")
elif "" in tag and "제목" not in tag:
lines.append(f"\n## {text}\n")
elif "" in tag:
lines.append(f"\n### {text}\n")
elif "" in tag:
lines.append(f"\n{text}\n")
elif "" in tag:
lines.append(f"- {text}")
elif "" in tag:
lines.append(f" - {text}")
return "\n".join(lines)
def _safe_name(name: str) -> str:
"""파일명 안전 변환"""
return re.sub(r'[^\w가-힣-]', '_', name).strip("_")
def _send_notifications(law_name: str, proclamation_date: str, revision_type: str):
"""CalDAV 할일 알림 (SMTP 발송은 2026-06-10 폐기 — CalDAV 가 단일 알림 채널)"""
caldav_url = os.getenv("CALDAV_URL", "")
caldav_user = os.getenv("CALDAV_USER", "")
caldav_pass = os.getenv("CALDAV_PASS", "")
if caldav_url and caldav_user:
create_caldav_todo(
caldav_url, caldav_user, caldav_pass,
title=f"법령 검토: {law_name}",
description=f"공포일자: {proclamation_date}, 개정구분: {revision_type}",
due_days=7,
)
-6
View File
@@ -307,10 +307,6 @@ async def _process_single(
# ---- (7) image persist + md_content rewrite (Phase 1B.5) ----
md_content_raw = data["md_content"]
# 2026-06-20 H1: 빈 추출(스캔/이미지 PDF)을 md_status=success + 빈 md 로 박제 X
# (계약: md_status in {success,partial} => md 非공백). office arm 동형 raise → queue 재시도 후 failed.
if not md_content_raw.strip():
raise ValueError("empty md_content (blank extraction) — success 박제 차단")
images_resp = data.get("images") if MARKDOWN_IMAGE_PERSIST else None
saved_images: list[dict[str, Any]] = []
@@ -657,8 +653,6 @@ async def _process_split(
md_status = "success" if not failed else "partial"
stitched = "\n\n".join(b["md"] for b in succeeded)
if not stitched.strip():
raise ValueError("empty stitched md_content (all batches blank) — success 박제 차단")
md_content = _build_large_md_content(stitched[:LARGE_DOC_MD_CONTENT_HEAD_CHARS], manifest)
quality = _compute_quality(stitched, doc.extracted_text or "", {"page_count": page_count})
+13 -25
View File
@@ -213,25 +213,17 @@ async def _run_locked():
result = await session.execute(
select(NewsSource).where(NewsSource.enabled == True)
)
source_ids = [s.id for s in result.scalars().all()]
sources = result.scalars().all()
if not source_ids:
logger.info("활성화된 뉴스 소스 없음")
return
if not sources:
logger.info("활성화된 뉴스 소스 없음")
return
# 2026-06-20 H3: 소스마다 독립 세션 — 한 소스의 DB 오류가 종단 단일 commit 을 깨뜨려
# 전 소스 insert 를 잃던 것 차단. 실패 시 rollback 후 깨끗한 상태에서 failure 기록.
# (csb_collector 의 per-iteration 세션 패턴과 동형.)
total = 0
for sid in source_ids:
async with async_session() as session:
source = await session.get(NewsSource, sid)
if source is None:
continue
sname = source.name
health = await _get_or_create_health(session, sid)
total = 0
for source in sources:
health = await _get_or_create_health(session, source.id)
if not _should_attempt(health, now):
logger.info(f"[{sname}] circuit {health.circuit_state} — 이번 사이클 skip")
logger.info(f"[{source.name}] circuit {health.circuit_state} — 이번 사이클 skip")
continue
try:
if source.feed_type == "api":
@@ -242,18 +234,14 @@ async def _run_locked():
source.last_fetched_at = datetime.now(timezone.utc)
_record_success(health, count, status == "not_modified", now)
total += count
await session.commit()
except Exception as e:
# str 이 빈 예외(httpx.ConnectError('')) 대비 — health 기록과 동일 규칙
await session.rollback()
logger.error(f"[{sname}] 수집 실패: {str(e) or repr(e)}")
health = await _get_or_create_health(session, sid)
src = await session.get(NewsSource, sid)
if src is not None:
src.last_fetched_at = datetime.now(timezone.utc)
logger.error(f"[{source.name}] 수집 실패: {str(e) or repr(e)}")
source.last_fetched_at = datetime.now(timezone.utc)
_record_failure(health, str(e) or repr(e), now)
await session.commit()
logger.info(f"뉴스 수집 완료: {total}건 신규")
await session.commit()
logger.info(f"뉴스 수집 완료: {total}건 신규")
MAX_RESPONSE_SIZE = 5 * 1024 * 1024 # 5MB
+2 -8
View File
@@ -25,7 +25,6 @@ import httpx
from sqlalchemy.ext.asyncio import AsyncSession
from ai.client import AIClient, parse_json_response
from core.config import settings
from models.study_question import StudyQuestion
from models.study_question_job import StudyQuestionJob
from services.search.llm_gate import Priority, acquire_mlx_gate
@@ -33,12 +32,11 @@ from services.study.explanation_rag import (
gather_explanation_context,
render_evidence_block,
)
from services.study.publish_enqueue import enqueue_question_publish
logger = logging.getLogger(__name__)
# 2026-06-20: config 단일소스 (구 하드코딩 30s = 빠른 Gemma 기준, Qwen 27B 교체 sweep 누락).
LLM_TIMEOUT_S = settings.llm_call_timeout_s
# PR-3 LLM_TIMEOUT_S 와 동일 안전 마진 (26B 평균 ~10s, gate 직렬화 고려)
LLM_TIMEOUT_S = 30.0
# explanation_md hard cap — 운영 데이터 793/838/866자 사례에서 1200 으로 시작
# (800 은 공식·오답·핵심개념 묶이는 기사시험 풀이에 빡빡함). 1차 운영 후 조정.
@@ -228,10 +226,6 @@ async def run_explanation_job(session: AsyncSession, job: StudyQuestionJob) -> N
question.ai_explanation_model = f"mlx:{primary_name}"
question.updated_at = question.ai_explanation_generated_at
# 발행 재투영(같은 tx, caller commit) — 4-A 해설 ready → 문항+해설 발행. P0-1b.
if settings.study_publish_enabled:
await enqueue_question_publish(session, question)
job.status = "completed"
job.completed_at = now()
return
+3 -10
View File
@@ -24,7 +24,6 @@ import httpx
from sqlalchemy.ext.asyncio import AsyncSession
from ai.client import AIClient, parse_json_response
from core.config import settings
from models.study_memo_card import (
append_card,
append_card_evidence,
@@ -34,8 +33,6 @@ from models.study_memo_card_job import StudyMemoCardJob
from models.study_question import StudyQuestion
from models.user import User # noqa: F401 (mapper 초기화 defensive)
from services.search.llm_gate import Priority, acquire_mlx_gate
from services.study.publish_enqueue import enqueue_publish
from services.study.publish_projection import KIND_CARD
from services.study.explanation_rag import (
gather_explanation_context,
render_evidence_block,
@@ -44,8 +41,8 @@ from services.study.study_memo_card_guards import guard_cards
logger = logging.getLogger("study_memo_card_worker")
# 2026-06-20: config 단일소스 (구 하드코딩 45s = 빠른 Gemma 기준).
CARD_LLM_TIMEOUT_S = settings.llm_call_timeout_s
# 다카드 출력이라 explanation(30s)보다 여유. config primary.timeout(180, soft-lock)은 미변경.
CARD_LLM_TIMEOUT_S = 45.0
SOURCE_KIND_QUESTION = "question"
_ENVELOPE_PROMPT_FILE = "study_card_envelope.txt"
@@ -186,13 +183,9 @@ async def run_card_extract_job(session: AsyncSession, job: StudyMemoCardJob) ->
return
# 5. 성공 — 구버전 카드 retire 후 append (dedup partial unique 충돌 회피).
retired_published_ids = await supersede_old_cards(
await supersede_old_cards(
session, source_question_id=question.id, keep_generated_at=source_version
)
# 발행 중이던 구버전 카드 tombstone(같은 tx) — 재추출 retire 후 viewer stale 잔류 0. S-2.
if settings.study_publish_enabled:
for cid in retired_published_ids:
await enqueue_publish(session, kind=KIND_CARD, source_id=cid, payload=None, deleted=True)
model_name = f"mlx:{primary_name}"
inserted = 0
for g in guarded:
-120
View File
@@ -1,120 +0,0 @@
"""발행 워커 — publish_outbox drain → published 에 rev 부여 (docsrv-viewer-publish).
APScheduler 1(max_instances=1). pg_advisory_xact_lock 단일 라이터 rev 커밋순 gapless
(인플라이트 차단: bigserial seq 폴링이 아니라 outbox id + 단일 라이터 rev 부여).
outbox id(커밋순) 순으로 처리, (kind, source_id) published upsert:
- 기존 행과 (payload_hash, deleted) 동일 no-op(디둡, rev 올림) + processed 마킹
- pub_id 재사용(기존)|신규 uuid, rev = MAX(rev)+1, payload/hash/deleted 갱신
tombstone(deleted=True) 디둡 복합키라 삼켜짐. 배치 단일 트랜잭션.
배치 같은 (kind, source_id) 오면 flush 직전 반영을 다음 select 보게 (최신 ).
study_publish_enabled=False(기본) no-op 저자/4-A enqueue 결선(P0-1b) 전까지 inert.
"""
from __future__ import annotations
import uuid
from datetime import datetime, timezone
from sqlalchemy import func, select, text
from core.config import settings
from core.database import async_session
from core.utils import setup_logger
from models.published import Published, PublishOutbox
logger = setup_logger("study_publish_worker")
BATCH_SIZE = 500
# pg_advisory_xact_lock 전역 단일 라이터 키(발행 워커 전용 임의 상수, 타 advisory 락과 비충돌).
ADVISORY_LOCK_KEY = 838201
async def consume_publish_outbox() -> None:
"""APScheduler 진입점. 미처리 outbox 를 rev 부여하며 published 로 반영."""
if not settings.study_publish_enabled:
logger.debug("study_publish 비활성 (study_publish_enabled=false)")
return
async with async_session() as session:
try:
# 1) 전역 단일 라이터 락(트랜잭션 스코프 — commit/rollback 시 자동 해제).
await session.execute(
text("SELECT pg_advisory_xact_lock(:k)").bindparams(k=ADVISORY_LOCK_KEY)
)
# 2) 현재 최대 rev.
max_rev = int(
(await session.execute(select(func.coalesce(func.max(Published.rev), 0)))).scalar() or 0
)
# 3) 미처리 outbox 를 커밋순(id)으로.
rows = (
await session.execute(
select(PublishOutbox)
.where(PublishOutbox.processed_at.is_(None))
.order_by(PublishOutbox.id.asc())
.limit(BATCH_SIZE)
)
).scalars().all()
if not rows:
return
now = datetime.now(timezone.utc)
published_count = 0
for ob in rows:
existing = (
await session.execute(
select(Published).where(
Published.kind == ob.kind,
Published.source_id == ob.source_id,
)
)
).scalar_one_or_none()
# (payload_hash, deleted) 디둡 — no-op 재투영은 rev 안 올림.
if (
existing is not None
and existing.payload_hash == ob.payload_hash
and existing.deleted == ob.deleted
):
ob.processed_at = now
continue
max_rev += 1
if existing is None:
session.add(
Published(
kind=ob.kind,
source_id=ob.source_id,
pub_id=uuid.uuid4().hex,
payload=ob.payload,
payload_hash=ob.payload_hash,
schema_version=ob.schema_version,
rev=max_rev,
deleted=ob.deleted,
created_at=now,
updated_at=now,
)
)
else:
existing.payload = ob.payload
existing.payload_hash = ob.payload_hash
existing.schema_version = ob.schema_version
existing.deleted = ob.deleted
existing.rev = max_rev
existing.updated_at = now
ob.processed_at = now
# 배치 내 동일 (kind, source_id) 후속 행이 직전 반영을 보도록 flush(최신 승).
await session.flush()
published_count += 1
await session.commit()
logger.info(
"publish_outbox_drained scanned=%s published=%s max_rev=%s",
len(rows),
published_count,
max_rev,
)
except Exception as e:
await session.rollback()
logger.exception("publish_outbox_drain_failed: %s", e)
+2 -3
View File
@@ -28,7 +28,6 @@ from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlalchemy.ext.asyncio import AsyncSession
from ai.client import AIClient, parse_json_response
from core.config import settings
from models.study_question import StudyQuestion, StudyQuestionAttempt
from models.study_quiz_session import StudyQuizSession
from models.study_quiz_session_analysis import StudyQuizSessionAnalysis
@@ -43,8 +42,8 @@ from services.study.session_summary_rag import gather_session_summary_context
logger = logging.getLogger(__name__)
# 2026-06-20: config 단일소스 (구 하드코딩 30s = 빠른 Gemma 기준).
LLM_TIMEOUT_S = settings.llm_call_timeout_s
# 4-A 와 동일 안전 마진 (26B 평균 ~10s, gate 직렬화 고려)
LLM_TIMEOUT_S = 30.0
# wrong/unsure 5 미만은 분석 의미 X — insufficient_attempts skip
MIN_ATTEMPTS_FOR_ANALYSIS = 5
# 큰 세션 (84건 등) 에서 prompt 과대 + LLM timeout 방어. 가장 최근 attempt 기준 cap.
+1 -6
View File
@@ -91,12 +91,7 @@ async def process(document_id: int, session: AsyncSession, *, use_deep: bool = F
# sleep-안전 불변식: 쓰기는 전체 완주 후에만 — 중간 절단은 StageDeferred 로 빠져
# 이 지점에 도달하지 않는다 (carry 는 로컬 변수, doc 무변경).
final_summary = strip_thinking(summary)
# 2026-06-20 H2: 빈/think-only 요약을 ai_summary 빈문자열로 박제 → completed 마크 → briefing/digest 누출.
# raise → queue 재시도 후 failed(가시화). 기존 raise 계약(not-found·empty-text)과 동형.
if not final_summary.strip():
raise ValueError(f"empty ai_summary after strip (document_id={document_id})")
doc.ai_summary = final_summary
doc.ai_summary = strip_thinking(summary)
doc.ai_model_version = used_cfg.model
doc.ai_processed_at = datetime.now(timezone.utc)
logger.info(
-7
View File
@@ -27,8 +27,6 @@ ai:
context_char_limit: 260000
temperature: 0.3
top_p: 0.9
repetition_penalty: 1.05 # 한국어 장문 반복/코드스위칭(CJK·라틴 누수) 억제 (보수적 시작값)
top_k: 20 # Qwen3 권장
# deep: 야간 night-drain 전용 — 맥북 M5 Max Qwen3.6-27B-6bit (llm-router :8890 경유,
# model=qwen-macbook alias). 2026-06-11 재도입 (사용자: 자기 전 night-drain 으로 백로그 분담).
@@ -43,8 +41,6 @@ ai:
context_char_limit: 260000
temperature: 0.3
top_p: 0.9
repetition_penalty: 1.05 # 한국어 장문 반복/코드스위칭 억제 (보수적 시작값)
top_k: 20
# fallback: primary 장애 시 최후 방어선. Claude Sonnet 4 API (소액 한도, 자동 trigger).
# 호출 빈도 낮음 가정 (Mac mini 가 거의 항상 up) → premium 과 budget 공유 OK.
@@ -212,6 +208,3 @@ pipeline:
digest_llm_timeout_s: 300
digest_llm_attempts: 2
digest_pipeline_hard_cap_s: 5400
# 2026-06-20: study/analyze 단일 primary-call 타임아웃 (구 하드코딩 30~60s = 빠른 Gemma 기준).
# Qwen 27B(콜당 ~40~150s)에 맞춰 단일소스화 — 구 30s 즉사 = 사용자 504 + 워커 영구 재시도.
llm_call_timeout_s: 300
+135
View File
@@ -0,0 +1,135 @@
# Phase 2A — Embedding candidate compose override (Diagnose only)
#
# Profile-isolated: `--profile embed-cand` 명시 opt-in. default up 시 미기동.
# production fastapi/postgres/reranker 에 영향 0.
# 본 PR 종료 시 별 chore (PR-2A-Chunks-Cand-Cleanup-1) 에서 제거.
#
# 후보 상태 (2026-05-23):
# - me5_large_inst : ✅ smoke PASS (dim 1024)
# - bge_mgemma2 : ❌ Phase 2A-Extended 별 PR 이관 (9B FP16 → VRAM OOM risk + 다운로드 cost)
# - me5_ko : ❌ 폐기 (401 Unauthorized, gated/모델명 부정확)
# - snowflake_l_v2 : 신규 추가 (Snowflake/snowflake-arctic-embed-l-v2.0, 2024-12, multilingual 강화)
#
# 사용:
# docker compose -f docker-compose.yml -f docker-compose.override.cand.yml \
# --profile embed-cand up -d embedding-cand-me5-inst
#
# 호출 (DS network 내부):
# http://embedding-cand-me5-inst:80/embed
# http://embedding-cand-snowflake-l-v2:80/embed
services:
embedding-cand-me5-inst:
image: ghcr.io/huggingface/text-embeddings-inference:1.7
restart: unless-stopped
container_name: hyungi_document_server-embedding-cand-me5-inst-1
expose:
- "80"
environment:
- MODEL_ID=intfloat/multilingual-e5-large-instruct
- MAX_BATCH_TOKENS=8192
- MAX_CONCURRENT_REQUESTS=4
volumes:
- embedding_cand_me5_inst_cache:/data
deploy:
resources:
reservations:
devices:
- driver: nvidia
count: 1
capabilities: [gpu]
healthcheck:
test: ["CMD", "curl", "-fsS", "http://localhost/health"]
interval: 30s
timeout: 10s
retries: 5
start_period: 60s
profiles: ["embed-cand"]
embedding-cand-snowflake-l-v2:
image: ghcr.io/huggingface/text-embeddings-inference:1.7
restart: unless-stopped
container_name: hyungi_document_server-embedding-cand-snowflake-l-v2-1
expose:
- "80"
environment:
- MODEL_ID=Snowflake/snowflake-arctic-embed-l-v2.0
- MAX_BATCH_TOKENS=8192
- MAX_CONCURRENT_REQUESTS=4
volumes:
- embedding_cand_snowflake_l_v2_cache:/data
deploy:
resources:
reservations:
devices:
- driver: nvidia
count: 1
capabilities: [gpu]
healthcheck:
test: ["CMD", "curl", "-fsS", "http://localhost/health"]
interval: 30s
timeout: 10s
retries: 5
start_period: 60s
profiles: ["embed-cand"]
# ===== 비활성 후보 (Phase 2A-Extended 별 PR 이관 또는 폐기) =====
# 진단 박제만 보존. 본 PR scope 외.
embedding-cand-bge-mgemma2:
image: ghcr.io/huggingface/text-embeddings-inference:1.7
container_name: hyungi_document_server-embedding-cand-bge-mgemma2-1
expose:
- "80"
environment:
- MODEL_ID=BAAI/bge-multilingual-gemma2
- MAX_BATCH_TOKENS=8192
- MAX_CONCURRENT_REQUESTS=4
volumes:
- embedding_cand_bge_mgemma2_cache:/data
deploy:
resources:
reservations:
devices:
- driver: nvidia
count: 1
capabilities: [gpu]
healthcheck:
test: ["CMD", "curl", "-fsS", "http://localhost/health"]
interval: 30s
timeout: 10s
retries: 5
start_period: 300s
profiles: ["embed-cand-extended"] # 본 PR 미사용. extended 별 profile.
embedding-cand-me5-ko:
image: ghcr.io/huggingface/text-embeddings-inference:1.7
container_name: hyungi_document_server-embedding-cand-me5-ko-1
expose:
- "80"
environment:
- MODEL_ID=dragonkue/multilingual-e5-large-ko
- MAX_BATCH_TOKENS=8192
- MAX_CONCURRENT_REQUESTS=4
volumes:
- embedding_cand_me5_ko_cache:/data
deploy:
resources:
reservations:
devices:
- driver: nvidia
count: 1
capabilities: [gpu]
healthcheck:
test: ["CMD", "curl", "-fsS", "http://localhost/health"]
interval: 30s
timeout: 10s
retries: 5
start_period: 60s
profiles: ["embed-cand-disabled"] # 401 fail. 사용 X.
volumes:
embedding_cand_me5_inst_cache:
embedding_cand_snowflake_l_v2_cache:
embedding_cand_bge_mgemma2_cache:
embedding_cand_me5_ko_cache:
+101
View File
@@ -0,0 +1,101 @@
# Phase 2B — Reranker candidate compose override (Diagnose only)
#
# Profile-isolated: `--profile rerank-cand` 명시 opt-in. default up 시 미기동.
# production fastapi/postgres/reranker(bge-reranker-v2-m3) 에 영향 0.
# 본 PR 종료 후 별 chore (PR-2B-Rerank-Cand-Cleanup-1) 에서 제거.
#
# 후보 상태 (2026-05-23):
# - gte_ml_base : Apache 2.0, 305M, smoke 대기
# - mxbai_large : Apache 2.0, ~435M, safetensors 부재 — TEI smoke risk
# - bge_v2_gemma_2b : Gemma 라이센스, 2.5B FP16 ~5GB, smoke 대기
#
# 사용:
# docker compose -f docker-compose.yml -f docker-compose.override.rerank-cand.yml \
# --profile rerank-cand up -d rerank-cand-gte-ml-base
services:
rerank-cand-gte-ml-base:
image: ghcr.io/huggingface/text-embeddings-inference:1.7
restart: unless-stopped
container_name: hyungi_document_server-rerank-cand-gte-ml-base-1
expose:
- "80"
environment:
- MODEL_ID=Alibaba-NLP/gte-multilingual-reranker-base
- MAX_BATCH_TOKENS=8192
- MAX_CONCURRENT_REQUESTS=4
volumes:
- rerank_cand_gte_ml_base_cache:/data
deploy:
resources:
reservations:
devices:
- driver: nvidia
count: 1
capabilities: [gpu]
healthcheck:
test: ["CMD", "curl", "-fsS", "http://localhost/health"]
interval: 30s
timeout: 10s
retries: 5
start_period: 60s
profiles: ["rerank-cand"]
rerank-cand-mxbai-large:
image: ghcr.io/huggingface/text-embeddings-inference:1.7
restart: unless-stopped
container_name: hyungi_document_server-rerank-cand-mxbai-large-1
expose:
- "80"
environment:
- MODEL_ID=mixedbread-ai/mxbai-rerank-large-v1
- MAX_BATCH_TOKENS=8192
- MAX_CONCURRENT_REQUESTS=4
volumes:
- rerank_cand_mxbai_large_cache:/data
deploy:
resources:
reservations:
devices:
- driver: nvidia
count: 1
capabilities: [gpu]
healthcheck:
test: ["CMD", "curl", "-fsS", "http://localhost/health"]
interval: 30s
timeout: 10s
retries: 5
start_period: 60s
profiles: ["rerank-cand"]
rerank-cand-bge-v2-gemma-2b:
image: ghcr.io/huggingface/text-embeddings-inference:1.7
restart: unless-stopped
container_name: hyungi_document_server-rerank-cand-bge-v2-gemma-2b-1
expose:
- "80"
environment:
- MODEL_ID=BAAI/bge-reranker-v2-gemma
- MAX_BATCH_TOKENS=8192
- MAX_CONCURRENT_REQUESTS=2
volumes:
- rerank_cand_bge_v2_gemma_2b_cache:/data
deploy:
resources:
reservations:
devices:
- driver: nvidia
count: 1
capabilities: [gpu]
healthcheck:
test: ["CMD", "curl", "-fsS", "http://localhost/health"]
interval: 30s
timeout: 10s
retries: 5
start_period: 120s
profiles: ["rerank-cand"]
volumes:
rerank_cand_gte_ml_base_cache:
rerank_cand_mxbai_large_cache:
rerank_cand_bge_v2_gemma_2b_cache:
+1 -13
View File
@@ -16,8 +16,6 @@ services:
timeout: 5s
retries: 5
restart: unless-stopped
# 2026-06-20 tier-0 무장: 글로벌 OOM 시 커널이 postgres(prod DB)를 reap 하지 않도록.
oom_score_adj: -900
kordoc-service:
build: ./services/kordoc
@@ -61,7 +59,6 @@ services:
# 동기 do_parse 버그 회피 위해 server.py 는 async aio_do_parse 사용. 포트 3301.
mineru-service:
build: ./services/mineru
mem_limit: 16g # 2026-06-20: VLM 스파이크 봉쇄 (steady ~12GB) — 호스트 30GB 글로벌 OOM 차단
ports:
- "127.0.0.1:3301:3301"
expose:
@@ -176,7 +173,6 @@ services:
fastapi:
build: ./app
oom_score_adj: -900 # 2026-06-20 tier-0 무장 (앱+스케줄러 SPOF 보호)
ports:
- "100.110.63.63:8000:8000"
volumes:
@@ -210,14 +206,6 @@ services:
# PR-MacMini-Derived-Worker-1
- STUDY_EXPLANATION_ENABLED=${STUDY_EXPLANATION_ENABLED:-true}
- INTERNAL_WORKER_TOKEN=${INTERNAL_WORKER_TOKEN}
# docsrv-viewer-publish: 발행 워커/저작 enqueue 게이트(기본 false=inert) + 뷰어↔DS feed Bearer.
- STUDY_PUBLISH_ENABLED=${STUDY_PUBLISH_ENABLED:-false}
- DIGEST_PUBLISH_ENABLED=${DIGEST_PUBLISH_ENABLED:-false}
- MAINTENANCE_MODE=${MAINTENANCE_MODE:-false}
- MAINTENANCE_NOTE=${MAINTENANCE_NOTE:-}
- VIEWER_SYNC_TOKEN=${VIEWER_SYNC_TOKEN:-}
# study-to-viewer P2: 뷰어 write-back ingest 게이트(기본 false=inert, 검증 후 점등).
- STUDY_INGEST_ENABLED=${STUDY_INGEST_ENABLED:-false}
# Voice Memo PoC v1 — bot 계정 한정 long-expiry access token. default false → 일반 운영 영향 0.
# 활성화: host .env 에 VOICE_MEMO_BOT_TOKEN_ENABLED=true. plan: rosy-launching-otter.md
- VOICE_MEMO_BOT_TOKEN_ENABLED=${VOICE_MEMO_BOT_TOKEN_ENABLED:-false}
@@ -271,7 +259,7 @@ services:
caddy:
image: caddy:2
ports:
- "127.0.0.1:8080:80" # 2026-06-20: LAN 우회 차단 (실 ingress=home-caddy→caddy:80 도커망)
- "8080:80"
volumes:
- ./Caddyfile:/etc/caddy/Caddyfile
- caddy_data:/data
+1 -8
View File
@@ -2,7 +2,7 @@
import { page } from '$app/stores';
import { goto } from '$app/navigation';
import { api } from '$lib/api';
import { ChevronRight, ChevronDown, FolderOpen, FolderTree, Inbox, Clock, Mail, Scale, StickyNote, GraduationCap, CalendarCheck, MessageCircle, Hash } from 'lucide-svelte';
import { ChevronRight, ChevronDown, FolderOpen, FolderTree, Inbox, Clock, Mail, Scale, StickyNote, GraduationCap, CalendarCheck, MessageCircle } from 'lucide-svelte';
let tree = $state([]);
let loading = $state(true);
@@ -195,13 +195,6 @@
>
<FolderTree size={14} /> 자료실
</a>
<a
href="/clause"
class="w-full flex items-center gap-2 px-3 py-1.5 rounded-md text-sm transition-colors
{$page.url.pathname === '/clause' ? 'bg-accent/15 text-accent' : 'text-dim hover:bg-surface hover:text-text'}"
>
<Hash size={14} /> 절 바로가기
</a>
</div>
<!-- 메모 & Inbox -->
-73
View File
@@ -1,73 +0,0 @@
<script>
// 절(clause) 바로가기 — ASME 절 식별자(예: UG-79)로 크로스-doc 위치를 조회해 읽기뷰로 이동 (U-1).
// 절은 in_corpus=false(의미검색 비활성)라 일반 검색으론 안 잡히므로 라벨 정확지목 전용 진입점.
import { api } from '$lib/api';
import { goto } from '$app/navigation';
let label = $state('');
let hits = $state([]);
let loading = $state(false);
let searched = $state(false);
let error = $state('');
async function lookup() {
const q = label.trim();
if (!q) return;
loading = true;
error = '';
try {
const res = await api(`/documents/clause-lookup?label=${encodeURIComponent(q)}`);
hits = res?.hits ?? [];
searched = true;
} catch (e) {
error = '조회에 실패했습니다.';
hits = [];
} finally {
loading = false;
}
}
</script>
<div class="mx-auto max-w-3xl px-6 py-10">
<h1 class="mb-1 text-2xl font-bold text-base">절 바로가기</h1>
<p class="mb-6 text-sm text-dim">
ASME 절 식별자(예: <code class="text-accent">UG-79</code>, <code class="text-accent">PG-5</code>)로 문서·위치를 찾아 이동합니다.
</p>
<form onsubmit={(e) => { e.preventDefault(); lookup(); }} class="mb-6 flex gap-2">
<input
bind:value={label}
placeholder="절 식별자 (UG-79, PG-5.6, A-1 …)"
autocomplete="off"
class="flex-1 rounded-lg border border-default bg-surface px-4 py-2.5 text-base outline-none focus:border-accent"
/>
<button
type="submit"
disabled={loading || !label.trim()}
class="rounded-lg bg-accent px-5 py-2.5 font-medium text-white hover:bg-accent-hover disabled:opacity-50"
>
{loading ? '조회 중…' : '찾기'}
</button>
</form>
{#if error}
<p class="text-sm text-accent">{error}</p>
{:else if searched && hits.length === 0}
<p class="text-sm text-dim">'{label}' 에 해당하는 절을 찾지 못했습니다. (절은 분해된 코드 문서에만 존재합니다)</p>
{:else if hits.length > 0}
<div class="space-y-2">
{#if hits.length > 1}
<p class="text-xs text-dim">{hits.length}개 문서에 존재 — 에디션/부록을 선택하세요.</p>
{/if}
{#each hits as hit (hit.chunk_id)}
<button
onclick={() => goto(`/documents/${hit.doc_id}?section=${hit.chunk_id}`)}
class="block w-full rounded-lg border border-default bg-surface px-4 py-3 text-left transition hover:border-accent hover:bg-surface-hover"
>
<div class="font-medium text-base">{hit.section_title}</div>
<div class="mt-0.5 text-xs text-dim">{hit.doc_title}</div>
</button>
{/each}
</div>
{/if}
</div>
@@ -130,9 +130,6 @@
let manageOpen = $state(false);
// 기본 선택 = 첫 본문 Part 의 첫 절(front-matter TOC 가 아니라 실제 내용으로 진입, front-matter 접힘 유지).
let defaultSelId = $derived.by(() => {
// 딥링크 진입: ?section=<chunk_id> 가 outline 에 있으면 그 절로 (/clause 절 바로가기 → 해당 절 표시).
const deep = Number($page.url.searchParams.get('section'));
if (deep && outline.some((it) => it.section.chunk_id === deep)) return deep;
if (treeGroups) {
const body = treeGroups.find((g) => !g.isFrontMatter);
if (body && body.items.length) return body.items[0].section.chunk_id;
-56
View File
@@ -1,56 +0,0 @@
-- 스캔 기능: 잡 모델 + 배치 + 에이전트 생존 (plan: scan-feature-build r3)
-- 웹(fastapi)=intent/명령, 호스트 스캔 에이전트=결과. 싱글톤 스캐너 직렬화.
-- 주: 러너 규약상 이 파일은 schema_migrations 를 건드리지 않음(스탬프는 외부). BEGIN/COMMIT 없음.
-- 순서: 테이블 먼저 → 시드 → 인덱스 (인덱스 실패가 테이블 생성 막지 않게).
-- 잡: 한 스캔 세션 = 한 논리 문서 (배치 N개 → 합본 1 PDF → Inbox)
CREATE TABLE IF NOT EXISTS scan_jobs (
id BIGSERIAL PRIMARY KEY,
title TEXT NOT NULL, -- 사람 입력 제목 (commit 시 documents.title 로 전파)
settings JSONB NOT NULL DEFAULT '{}'::jsonb, -- mode/resolution/source(ADF Duplex) 등 스캔 프로파일
status TEXT NOT NULL DEFAULT 'draft', -- draft|queued|ready|scanning|assembling|preview|committing|committed|failed|canceled
batch_count INTEGER NOT NULL DEFAULT 0, -- 스캔 완료 배치 수
page_count INTEGER, -- 최종 합본 페이지 수 (assembling 후)
last_activity_at TIMESTAMPTZ, -- ready 휴지 벽시계 idle 타임아웃 기준 (방치 데드락 방지)
last_progress_at TIMESTAMPTZ, -- 잡 진행 갱신 (에이전트 생존과 분리)
staging_path TEXT, -- 호스트 로컬 잡 스테이징 디렉토리
nas_staging_path TEXT, -- NAS .scan-staging 합본 경로 (B안 미리보기/commit 소스)
inbox_path TEXT, -- 최종 PKM/Inbox 경로 (commit 후)
file_hash CHAR(64), -- 합본 sha256 = 정체성/멱등 커밋 키 (commit 시 채움)
doc_id BIGINT REFERENCES documents(id) ON DELETE SET NULL, -- commit 후 연결 (title 전파)
error TEXT, -- failed 사유 (no-silent)
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- 배치: 스캔 1회(ADF 한 묶음) 단위. batch_seq = 결합 순서(글롭 정렬 아님).
CREATE TABLE IF NOT EXISTS scan_job_batches (
id BIGSERIAL PRIMARY KEY,
job_id BIGINT REFERENCES scan_jobs(id) ON DELETE CASCADE NOT NULL,
batch_seq INTEGER NOT NULL, -- 1-based 결합 순서
staging_path TEXT, -- 이 배치 PDF (호스트 로컬)
page_count INTEGER,
status TEXT NOT NULL DEFAULT 'scanned', -- scanned | discarded (잼 폐기 후 재스캔)
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
UNIQUE (job_id, batch_seq)
);
-- 에이전트 생존: 싱글톤 1행. 잡 진행(last_progress_at)과 분리 — queued 잡 stale 오탐 방지.
CREATE TABLE IF NOT EXISTS scan_agent_status (
id INTEGER PRIMARY KEY DEFAULT 1 CHECK (id = 1), -- 단일 행 강제
last_heartbeat TIMESTAMPTZ,
agent_version TEXT,
current_job_id BIGINT REFERENCES scan_jobs(id) ON DELETE SET NULL,
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
INSERT INTO scan_agent_status (id) VALUES (1) ON CONFLICT (id) DO NOTHING; -- 시드 1행
-- 활성 잡 락: 스캐너 싱글톤 → in-progress 잡은 전체에서 1개만(나머지 queued).
-- 상수 TRUE 에 unique + in-progress 필터 = 그 상태 행 최대 1개 강제.
CREATE UNIQUE INDEX IF NOT EXISTS uq_scan_jobs_single_active
ON scan_jobs ((TRUE))
WHERE status IN ('ready','scanning','assembling','preview','committing');
CREATE INDEX IF NOT EXISTS idx_scan_jobs_queued ON scan_jobs (created_at) WHERE status = 'queued';
CREATE INDEX IF NOT EXISTS idx_scan_jobs_file_hash ON scan_jobs (file_hash) WHERE file_hash IS NOT NULL;
CREATE INDEX IF NOT EXISTS idx_scan_job_batches_job ON scan_job_batches (job_id, batch_seq);
-4
View File
@@ -1,4 +0,0 @@
-- 스캔 잡 명령 채널 (이중 라이터: API=intent/명령, 에이전트=result) — plan scan-feature-build r3
-- API/수동이 pending_command 설정 → 에이전트가 조건부 claim(WHERE pending_command=X AND status=기대값) → 실행 → 결과 status write.
ALTER TABLE scan_jobs ADD COLUMN IF NOT EXISTS pending_command TEXT; -- scan_batch | finish | commit | cancel
ALTER TABLE scan_jobs ADD COLUMN IF NOT EXISTS command_requested_at TIMESTAMPTZ; -- 명령 요청 시각(staleness/디버그)
-21
View File
@@ -1,21 +0,0 @@
-- 367_published.sql
-- 발행 레이어(docsrv-viewer-publish) projection 테이블. 뷰어가 read API로 당겨 자기 SQLite로 복제.
-- kind-discriminated 단일 테이블(study_question | study_explanation | ... 후속 news/document).
-- pub_id = opaque+stable(워커가 (kind,source_id)당 1회 부여, republish=rev bump에도 불변) = 뷰어 dedup키=progress키.
-- source_id = 내부 소스 행 id (pub_id→내부 역매핑, ingest write-back 해소용).
-- rev = 발행 워커 커밋순 gapless 커서(pg_advisory_lock 단일 라이터). 뷰어 feed = WHERE rev>since.
-- payload_hash = sha256(정렬 JSON). (payload_hash, deleted) 디둡 — no-op 재투영 억제, tombstone 보존.
-- deleted = tombstone(삭제/만료도 feed 1급 이벤트). schema_version = 엔벨로프 버전(미지원 가시거부).
CREATE TABLE IF NOT EXISTS published (
id BIGSERIAL PRIMARY KEY,
kind VARCHAR(40) NOT NULL,
source_id BIGINT NOT NULL,
pub_id TEXT NOT NULL,
payload JSONB NOT NULL,
payload_hash TEXT NOT NULL,
schema_version SMALLINT NOT NULL DEFAULT 1,
rev BIGINT NOT NULL,
deleted BOOLEAN NOT NULL DEFAULT false,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
@@ -1,3 +0,0 @@
-- 368_published_kind_pubid_uq.sql
-- pub_id 는 kind 내 유일(뷰어 dedup/progress 키 무결성, pub_id→내부 역해소 유일성 보장).
CREATE UNIQUE INDEX IF NOT EXISTS published_kind_pubid_uq ON published (kind, pub_id);
@@ -1,3 +0,0 @@
-- 369_published_kind_source_uq.sql
-- (kind, source_id) 당 발행 행 1개 — 발행 워커 upsert 타깃 + pub_id 재사용(같은 source=같은 pub_id) 키.
CREATE UNIQUE INDEX IF NOT EXISTS published_kind_source_uq ON published (kind, source_id);
-3
View File
@@ -1,3 +0,0 @@
-- 370_published_rev_idx.sql
-- 뷰어 pull-sync feed: SELECT ... WHERE rev > :since ORDER BY rev LIMIT :page (P0-2).
CREATE INDEX IF NOT EXISTS published_rev_idx ON published (rev);
-15
View File
@@ -1,15 +0,0 @@
-- 371_publish_outbox.sql
-- transactional outbox — 저작/4-A 트랜잭션이 같은 tx에서 여기 INSERT(P0-1 규율),
-- 단일 발행 워커가 id(커밋순) 순으로 drain 하며 published 에 rev 부여(소스 updated_at 폴링 금지=갭 재발).
-- processed_at = 워커 drain 시 스탬프(NULL=미처리). payload/hash 는 enqueue 시점 스냅샷.
CREATE TABLE IF NOT EXISTS publish_outbox (
id BIGSERIAL PRIMARY KEY,
kind VARCHAR(40) NOT NULL,
source_id BIGINT NOT NULL,
payload JSONB NOT NULL,
payload_hash TEXT NOT NULL,
schema_version SMALLINT NOT NULL DEFAULT 1,
deleted BOOLEAN NOT NULL DEFAULT false,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
processed_at TIMESTAMPTZ
);
@@ -1,3 +0,0 @@
-- 372_publish_outbox_unprocessed_idx.sql
-- 워커 drain 쿼리: WHERE processed_at IS NULL ORDER BY id (커밋순). 부분 인덱스로 미처리분만.
CREATE INDEX IF NOT EXISTS publish_outbox_unprocessed_idx ON publish_outbox (id) WHERE processed_at IS NULL;
@@ -1,4 +0,0 @@
-- 373_quiz_session_finalized_at.sql
-- 발행 ingest(study-to-viewer P2) finalize 멱등 마커. finalize 성공 후 스탬프 →
-- 같은 세션 재전송(at-least-once outbox) 시 SR 이중 advance 차단. 라이브 세션은 NULL 유지(무영향).
ALTER TABLE study_quiz_sessions ADD COLUMN IF NOT EXISTS finalized_at TIMESTAMPTZ;
@@ -1,3 +0,0 @@
-- 374_quiz_session_client_uuid.sql
-- 뷰어 로컬 세션 UUID. ingest 가 (uuid, topic) 로 DS 세션 find-or-create = 멱등 키. 라이브=NULL.
ALTER TABLE study_quiz_sessions ADD COLUMN IF NOT EXISTS client_session_uuid TEXT;
-3
View File
@@ -1,3 +0,0 @@
-- 375_quiz_session_source.sql
-- 세션 출처 구분(live | viewer). 감사/필터용. 기존 행=live.
ALTER TABLE study_quiz_sessions ADD COLUMN IF NOT EXISTS source VARCHAR(20) NOT NULL DEFAULT 'live';
@@ -1,3 +0,0 @@
-- 376_quiz_session_client_uuid_uq.sql
-- (client_session_uuid, study_topic_id) 유일 — 뷰어 1세션이 topic 별 1 DS세션. partial(uuid 있는 viewer 행만).
CREATE UNIQUE INDEX IF NOT EXISTS study_quiz_sessions_client_uuid_topic_uq ON study_quiz_sessions (client_session_uuid, study_topic_id) WHERE client_session_uuid IS NOT NULL;
+1 -1
View File
@@ -289,7 +289,7 @@ async def run(topic_id: int, exam_round: str, apply: bool, abort_threshold: int)
host="postgres",
port=5432,
user="pkm",
password=os.environ["POSTGRES_PASSWORD"],
password="uW38friypljVS0X2ULoMnw",
database="pkm",
)
try:
-70
View File
@@ -1,70 +0,0 @@
"""S-4 초기 백필 — 모든 study_memo_card_progress row 를 발행 outbox 에 적재.
ALL row(필터 없음) due_at NULL sentinel(-on-new)·terminal(졸업) 포함. due-only 백필은
sentinel 누락 viewer 미확인 오분류. 멱등(워커 (payload_hash, deleted) 디둡). flag on 워커 drain.
실행 (GPU 서버):
docker exec hyungi_document_server-fastapi-1 python /app/scripts/backfill_publish_card_progress.py
docker exec hyungi_document_server-fastapi-1 python /app/scripts/backfill_publish_card_progress.py --dry-run
"""
import argparse
import asyncio
import os
import sys
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
# standalone-model-registry-fix: app(라우터 경유 전 모델 import)과 달리 script 는 부분 모델만
# import → SQLAlchemy mapper string 관계(StudyTopic.sessions->StudySession 등) 해소 실패.
# 전 모델 모듈 import 로 레지스트리 완성(전부 컨테이너서 import 가능 = app 이 기동 시 로드).
import importlib as _il, pkgutil as _pu
import models as _mp
for _m in _pu.iter_modules(_mp.__path__):
_il.import_module("models." + _m.name)
from sqlalchemy import func, select
from core.config import settings
from core.database import async_session
from models.study_memo_card_progress import StudyMemoCardProgress
from services.study.publish_enqueue import backfill_publish_card_progress
# 개인 학습툴 progress row 대비 넉넉. 도달 시 가드 경보.
PAGE = 100000
async def run(dry_run: bool) -> None:
async with async_session() as session:
total = (
await session.execute(
select(func.count()).select_from(StudyMemoCardProgress)
)
).scalar() or 0
print(f"[info] study_publish_enabled={settings.study_publish_enabled} "
f"(False 면 적재는 되나 워커가 drain 안 함)")
print(f"[info] card progress row {total}건 (ALL row 발행)")
if dry_run:
print("[dry-run] 적재 안 함. 실제 실행은 --dry-run 제거.")
return
async with async_session() as session:
n = await backfill_publish_card_progress(session, after_id=0, limit=PAGE)
await session.commit()
print(f"\n[ok] outbox 적재 {n}건 — 발행 워커가 drain(flag on 시) 하며 rev 부여.")
if n >= PAGE:
print(f"[warn] PAGE({PAGE}) 도달 — progress 가 더 있을 수 있음. after_id 페이징 추가 필요.")
def main() -> None:
parser = argparse.ArgumentParser(description="S-4 pub_card_progress 초기 백필")
parser.add_argument("--dry-run", action="store_true", default=False)
args = parser.parse_args()
asyncio.run(run(args.dry_run))
if __name__ == "__main__":
main()
-75
View File
@@ -1,75 +0,0 @@
"""S-2 초기 백필 — 검수완료(needs_review=False)·미삭제 study_memo_cards 를 발행 outbox 에 적재.
publish_outbox 에만 적재(멱등: 워커 (payload_hash, deleted) 디둡). study_publish_enabled=True
발행 워커가 drain published(kind=study_card) rev 부여 viewer pull-sync.
실행 (GPU 서버):
docker exec hyungi_document_server-fastapi-1 python /app/scripts/backfill_publish_cards.py
docker exec hyungi_document_server-fastapi-1 python /app/scripts/backfill_publish_cards.py --dry-run
"""
import argparse
import asyncio
import os
import sys
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
# standalone-model-registry-fix: app(라우터 경유 전 모델 import)과 달리 script 는 부분 모델만
# import → SQLAlchemy mapper string 관계(StudyTopic.sessions->StudySession 등) 해소 실패.
# 전 모델 모듈 import 로 레지스트리 완성(전부 컨테이너서 import 가능 = app 이 기동 시 로드).
import importlib as _il, pkgutil as _pu
import models as _mp
for _m in _pu.iter_modules(_mp.__path__):
_il.import_module("models." + _m.name)
from sqlalchemy import func, select
from core.config import settings
from core.database import async_session
from models.study_memo_card import StudyMemoCard
from services.study.publish_enqueue import backfill_publish_cards
# 개인 학습툴 카드 수 대비 넉넉(단일 outbox 적재 tx, 워커는 BATCH_SIZE 로 drain). 도달 시 가드 경보.
PAGE = 100000
async def run(dry_run: bool) -> None:
async with async_session() as session:
active = (
await session.execute(
select(func.count())
.select_from(StudyMemoCard)
.where(
StudyMemoCard.deleted_at.is_(None),
StudyMemoCard.needs_review.is_(False),
)
)
).scalar() or 0
print(f"[info] study_publish_enabled={settings.study_publish_enabled} "
f"(False 면 적재는 되나 워커가 drain 안 함)")
print(f"[info] 검수완료·미삭제 카드 {active}")
if dry_run:
print("[dry-run] 적재 안 함. 실제 실행은 --dry-run 제거.")
return
async with async_session() as session:
n = await backfill_publish_cards(session, after_id=0, limit=PAGE)
await session.commit()
print(f"\n[ok] outbox 적재 {n}건 — 발행 워커가 drain(flag on 시) 하며 rev 부여.")
if n >= PAGE:
print(f"[warn] PAGE({PAGE}) 도달 — 카드가 더 있을 수 있음. after_id 페이징 추가 필요.")
def main() -> None:
parser = argparse.ArgumentParser(description="S-2 pub_card 초기 백필")
parser.add_argument("--dry-run", action="store_true", default=False)
args = parser.parse_args()
asyncio.run(run(args.dry_run))
if __name__ == "__main__":
main()
-78
View File
@@ -1,78 +0,0 @@
"""S-1 초기 백필 — 기존 active study_topics 를 발행 outbox 에 1회 적재.
publish_outbox 에만 적재한다(멱등: 발행 워커의 (payload_hash, deleted) 디둡이
중복 enqueue no-op 으로 흡수). study_publish_enabled=True 발행 워커가
1 주기로 drain published rev 부여 viewer pull-sync.
주제 수는 개인 학습툴이라 소량 bounded page 사실상 1페이지지만 PAGE 도달
overflow 가드로 페이징 누락을 경보(silent truncation 금지).
실행 (GPU 서버):
docker exec hyungi_document_server-fastapi-1 python /app/scripts/backfill_publish_topics.py
# dry-run(적재 없이 카운트만):
docker exec hyungi_document_server-fastapi-1 python /app/scripts/backfill_publish_topics.py --dry-run
"""
import argparse
import asyncio
import os
import sys
# fastapi 컨테이너 WORKDIR=/app — `from models...` import 가능하게 path 추가.
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
# standalone-model-registry-fix: app(라우터 경유 전 모델 import)과 달리 script 는 부분 모델만
# import → SQLAlchemy mapper string 관계(StudyTopic.sessions->StudySession 등) 해소 실패.
# 전 모델 모듈 import 로 레지스트리 완성(전부 컨테이너서 import 가능 = app 이 기동 시 로드).
import importlib as _il, pkgutil as _pu
import models as _mp
for _m in _pu.iter_modules(_mp.__path__):
_il.import_module("models." + _m.name)
from sqlalchemy import func, select
from core.config import settings
from core.database import async_session
from models.study_topic import StudyTopic
from services.study.publish_enqueue import backfill_publish_topics
# 개인 학습툴 주제 수 대비 넉넉. 도달 시 overflow 가드가 경보.
PAGE = 5000
async def run(dry_run: bool) -> None:
async with async_session() as session:
active = (
await session.execute(
select(func.count())
.select_from(StudyTopic)
.where(StudyTopic.deleted_at.is_(None))
)
).scalar() or 0
print(f"[info] study_publish_enabled={settings.study_publish_enabled} "
f"(False 면 적재는 되나 워커가 drain 안 함)")
print(f"[info] active 주제 {active}")
if dry_run:
print("[dry-run] 적재 안 함. 실제 실행은 --dry-run 제거.")
return
async with async_session() as session:
n = await backfill_publish_topics(session, after_id=0, limit=PAGE)
await session.commit()
print(f"\n[ok] outbox 적재 {n}건 — 발행 워커가 drain(flag on 시) 하며 rev 부여.")
if n >= PAGE:
print(f"[warn] PAGE({PAGE}) 도달 — 주제가 더 있을 수 있음. after_id 페이징 추가 필요.")
def main() -> None:
parser = argparse.ArgumentParser(description="S-1 pub_topics 초기 백필")
parser.add_argument("--dry-run", action="store_true", default=False)
args = parser.parse_args()
asyncio.run(run(args.dry_run))
if __name__ == "__main__":
main()
+1 -2
View File
@@ -16,7 +16,6 @@ dry-run 먼저 출력 (각 필드 N건). 그 다음 --apply 옵션으로 UPDATE.
from __future__ import annotations
import asyncio
import os
import re
import sys
@@ -100,7 +99,7 @@ async def main() -> None:
host="postgres",
port=5432,
user="pkm",
password=os.environ["POSTGRES_PASSWORD"],
password="uW38friypljVS0X2ULoMnw",
database="pkm",
)
try:
-107
View File
@@ -1,107 +0,0 @@
"""ASME 절(clause) 타이핑 + 라벨 정제 단위테스트 (A-1 / C-4, presegment-multigranularity).
핵심 불변식:
- (A-1) ATX heading 제목이 ASME 식별자(UG-79·PG-27.4.1·UW-11·A-69 ) node_type='clause'.
builder 과거 ATX 무조건 node_type=None 으로 반환해 ASME 절이 'clause' 잡히던 것을 고침.
- (C-4) marker LaTeX/markdown/페이지번호 아티팩트('$\\textbf{PG-20.1 …}', '(25) **A-69**') 절번호
매칭 전에 정제 정제 없으면 패턴이 노이즈에 막혀 매칭 0.
- (A-2) (>LEAF_HARD_MAX) 기존 window-split 로직으로 자동 'clause_split'
(char_start 보존 = 단일 점프 타깃). 추가 코드 없이 타이핑만으로 확보.
- (무회귀) 한국 법령 제N조(_KO_JO 경로)·일반 ATX 헤딩은 영향 없음(정제 inert, 타이핑 None 유지).
pytest + 단독 실행 양쪽 지원:
PYTHONPATH=. python3 tests/hier_decomp/test_asme_clause.py
"""
from __future__ import annotations
try: # pytest 경로 (앱 패키지)
from app.services.hier_decomp.builder import _detect_heading, _clean_label, build_hier_tree
except Exception: # 단독 실행 (앱 deps 없이 builder.py 직접 로드 — stdlib only)
import importlib.util
import pathlib
import sys
_bp = pathlib.Path(__file__).resolve().parents[2] / "app/services/hier_decomp/builder.py"
_spec = importlib.util.spec_from_file_location("_hier_builder_t", _bp)
_m = importlib.util.module_from_spec(_spec)
sys.modules[_spec.name] = _m # dataclass __module__ 해소
_spec.loader.exec_module(_m)
_detect_heading, _clean_label, build_hier_tree = _m._detect_heading, _m._clean_label, _m.build_hier_tree
# 5180/5210 실데이터에서 뽑은 noisy 라벨 (marker LaTeX/markdown/페이지번호 범벅).
ASME_NOISY = [
(r"# $\textbf{PG-20.1 Carbon and Carbon-Molybdenum Tube and} \hspace{0.2cm} \textbf{(25)}$", "PG-20.1"),
("# (25) **A-69**", "A-69"),
("# (25) PFT-14 GENERAL", "PFT-14"),
("## (25) PG-27.4.1", "PG-27.4.1"),
("### UG-79 Forming of Pressure Parts", "UG-79"),
("# UW-11 Radiographic Examination", "UW-11"),
("#### UCS-56 Requirements for Postweld Heat Treatment", "UCS-56"),
]
def test_asme_clause_typed_and_cleaned():
for line, head in ASME_NOISY:
r = _detect_heading(line)
assert r is not None, f"미탐지: {line!r}"
_lvl, title, nt = r
assert nt == "clause", f"{line!r} → node_type={nt} (clause 여야)"
assert title.startswith(head), f"{line!r} → 정제 라벨 {title!r}{head!r} 로 시작 안 함"
assert "\\textbf" not in title and "$" not in title and "**" not in title, f"라벨에 아티팩트 잔류: {title!r}"
def test_clean_label_strips_artifacts():
assert _clean_label(r"$\textbf{PG-20.1 Foo} \hspace{0.2cm} \textbf{(25)}$").startswith("PG-20.1 Foo")
assert _clean_label("(25) **A-69**") == "A-69"
assert _clean_label("(25) PFT-14 GENERAL") == "PFT-14 GENERAL"
def test_korean_jo_unaffected():
# 한국 법령 제N조 = _KO_JO 경로(ATX 아님) → clause 유지, _clean_label 미적용·inert.
r = _detect_heading("제3조(정의) 이 규칙에서 사용하는 용어의 뜻은")
assert r is not None and r[2] == "clause" and "제3조" in r[1], r
assert _clean_label("제3조(정의)") == "제3조(정의)" # 노이즈 없음 → inert(무회귀)
def test_plain_atx_not_clause():
# ASME 절 식별자가 아닌 일반 ATX 헤딩은 node_type None 유지 + 라벨 무변.
for line, want in [("# Introduction", "Introduction"), ("## Overview of Methods", "Overview of Methods")]:
r = _detect_heading(line)
assert r is not None and r[2] is None and r[1] == want, r
def test_large_clause_becomes_clause_split():
# A-2: 큰 절(>5000자) → 기존 window-split 이 'clause' 를 'clause_split' 로(char_start 보존=점프 타깃) + window 자식.
big = "# UG-22 Loadings\n\n" + ("This is a body paragraph describing loadings in detail. " * 30 + "\n\n") * 8
nodes = build_hier_tree(big)
splits = [n for n in nodes if n.node_type == "clause_split"]
assert splits, f"clause_split 없음: {[n.node_type for n in nodes]}"
assert all(n.char_start is not None for n in splits), "clause_split char_start(점프 타깃) 유실"
assert any(n.node_type == "window" for n in nodes), "window 자식 없음"
def test_typing_ratio_sample():
# V-1 스타일: 4 ASME 절 + 1 일반 → clause 4개만.
md = "\n\n".join(f"# {x}\n\nbody for {x} here.\n"
for x in ["UG-1 Scope", "UG-79 Forming", "PG-5 Service", "Introduction", "UW-11 RT"])
clauses = [n for n in build_hier_tree(md) if n.node_type in ("clause", "clause_split")]
assert len(clauses) == 4, [n.section_title for n in clauses]
if __name__ == "__main__":
import sys
import traceback
fns = [(k, v) for k, v in sorted(globals().items()) if k.startswith("test_") and callable(v)]
failed = 0
for name, fn in fns:
try:
fn()
print(f" PASS {name}")
except Exception:
failed += 1
print(f" FAIL {name}")
traceback.print_exc()
print(f"\n{len(fns) - failed}/{len(fns)} passed")
sys.exit(1 if failed else 0)
+1 -3
View File
@@ -67,9 +67,7 @@ def test_atx_part_and_item_still_detected():
assert lvl == 1 and nt is None, r # ATX = level(# 수), node_type None
assert title.startswith("PART PG")
r2 = _detect_heading("#### PG-1 SCOPE")
# A-1(asme-clause): ASME 절 식별자(PG-1) 는 이제 node_type='clause' 로 타이핑된다(과거 None).
# ATX 탐지·level(# 수) 보존은 그대로 — 변경은 타이핑 한정.
assert r2 is not None and r2[0] == 4 and r2[2] == "clause", r2
assert r2 is not None and r2[0] == 4 and r2[2] is None, r2
def test_build_hier_tree_drops_false_part_section():
+13 -39
View File
@@ -37,8 +37,7 @@ from services.search.freshness_decay import (
NOW = datetime(2026, 5, 3, 12, 0, 0, tzinfo=timezone.utc)
def _meta(channel: str | None, *, days_ago: float | None = 30.0, origin: str | None = None,
material_type: str | None = None) -> _DocMeta:
def _meta(channel: str | None, *, days_ago: float | None = 30.0, origin: str | None = None) -> _DocMeta:
if days_ago is None:
created = None
elif days_ago < 0:
@@ -46,8 +45,7 @@ def _meta(channel: str | None, *, days_ago: float | None = 30.0, origin: str | N
created = NOW + timedelta(days=-days_ago)
else:
created = NOW - timedelta(days=days_ago)
return _DocMeta(source_channel=channel, content_origin=origin, created_at=created,
material_type=material_type)
return _DocMeta(source_channel=channel, content_origin=origin, created_at=created)
# ─── policy dispatcher ────────────────────────────────────────────
@@ -57,15 +55,8 @@ def test_policy_news():
assert freshness_policy(_meta("news")) == "news_90d"
def test_policy_law_monitor_now_unaffected():
# C-1 후속: law_365d 폐기 → law_monitor 비적용 (현행성은 version_status 가 처리)
assert freshness_policy(_meta("law_monitor")) is None
def test_policy_incident():
# C-1 후속: 재해사례/사망사고(material_type='incident') → news_90d 흡수 (source 무관)
assert freshness_policy(_meta("crawl", material_type="incident")) == "news_90d"
assert freshness_policy(_meta("inbox_route", material_type="incident")) == "news_90d"
def test_policy_law_monitor():
assert freshness_policy(_meta("law_monitor")) == "law_365d"
def test_policy_manual_unaffected():
@@ -132,9 +123,8 @@ def test_decay_at_half_life_news():
assert compute_decay(90.0, "news_90d") == pytest.approx(0.5, rel=1e-6)
def test_decay_law_365d_removed_returns_one():
# C-1 후속: law_365d 폐기 → HALF_LIFE_DAYS 미등록 policy → decay 1.0 (no-op)
assert compute_decay(365.0, "law_365d") == 1.0
def test_decay_at_half_life_law():
assert compute_decay(365.0, "law_365d") == pytest.approx(0.5, rel=1e-6)
def test_decay_age_zero_full():
@@ -222,38 +212,22 @@ async def test_apply_news_recent_vs_old_recent_higher():
@pytest.mark.asyncio
async def test_apply_law_monitor_now_unaffected():
# C-1 후속: law_monitor freshness 폐기 → recent/old 동일 score (재정렬 없음)
async def test_apply_law_monitor_recent_vs_old_recent_higher():
# 가드 2: law_monitor recent 가 위
base = 0.50
rows = [
{"id": 1, "source_channel": "law_monitor", "content_origin": "extracted",
"material_type": "law", "created_at": NOW - timedelta(days=10)},
"created_at": NOW - timedelta(days=10)},
{"id": 2, "source_channel": "law_monitor", "content_origin": "extracted",
"material_type": "law", "created_at": NOW - timedelta(days=730)},
]
session = _MockSession(rows)
results = [_result(1, base), _result(2, base)]
out = await apply_freshness_decay(results, session, now=NOW)
assert out[0].score == base and out[1].score == base
assert out[0].freshness_debug["freshness_policy"] is None
@pytest.mark.asyncio
async def test_apply_incident_recent_vs_old_recent_higher():
# C-1 후속: 재해사례(incident) recent 가 위 (news_90d 흡수, source_channel='crawl')
base = 0.50
rows = [
{"id": 1, "source_channel": "crawl", "content_origin": "extracted",
"material_type": "incident", "created_at": NOW - timedelta(days=5)},
{"id": 2, "source_channel": "crawl", "content_origin": "extracted",
"material_type": "incident", "created_at": NOW - timedelta(days=400)},
"created_at": NOW - timedelta(days=730)}, # 2년
]
session = _MockSession(rows)
results = [_result(1, base), _result(2, base)]
out = await apply_freshness_decay(results, session, now=NOW)
assert out[0].id == 1
assert out[0].score > out[1].score
assert out[0].freshness_debug["freshness_policy"] == "news_90d"
assert out[0].freshness_debug["freshness_policy"] == "law_365d"
# 2년 → law_365d 반감기 1년 → decay ~0.25 → multiplier ~ 0.775
assert out[1].score < out[0].score
@pytest.mark.asyncio