Compare commits

..

1 Commits

Author SHA1 Message Date
hyungi db7ede04b7 fix(markdown): 이미지 ref pre-render — 렌더러 미발화 시에도 placeholder 표시
docMarked image 렌더러가 런타임 미발화 시 ![](docimg:img_NNN) 가 기본 <img src=docimg:>
로 떨어지고 DOMPurify(미지원 프로토콜)가 제거 → placeholder·이미지 둘 다 사라지던 문제
(수식 토크나이저 미발화와 동형). marked 이전에 image ref 를 placeholder figure 로 직접
pre-render(슬롯 보호, 수식과 동일 우회). 이후 MarkdownDoc swap effect 가 실제 <img> 로 교체.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-15 16:34:34 +09:00
58 changed files with 912 additions and 7065 deletions
+7 -10
View File
@@ -289,16 +289,13 @@ class AIClient:
return response.json()
async def _call_chat(self, model_config, prompt: str) -> str:
"""OpenAI 호환 API 호출 (R6: 무동의 클라우드 폴백 제거).
이전엔 primary(맥미니) TimeoutException/ConnectError 시 동의·과금 통제 없이
self.ai.fallback(Claude API)로 자동 전환 → 개인 문서/쿼리/메모가 Anthropic 으로
silent egress. on-prem 추론 프라이버시 계약 위반이라 봉쇄한다. 실패는 그대로 전파:
배치 워커는 재시도/StageDeferred(R3·queue_consumer), interactive 호출자는 5xx 표면화
(documents.analyze 등 이미 502/504 변환). 클라우드는 premium explicit-trigger
(summarize force_premium) 또는 call_fallback 명시 호출로만 — 자동 진입 금지.
"""
return await self._request(model_config, prompt)
"""OpenAI 호환 API 호출 + 자동 폴백"""
try:
return await self._request(model_config, prompt)
except (httpx.TimeoutException, httpx.ConnectError):
if model_config == self.ai.primary:
return await self._request(self.ai.fallback, prompt)
raise
async def _request(self, model_config, prompt: str, system: str | None = None) -> str:
"""단일 모델 API 호출 (OpenAI 호환 + Anthropic Messages API).
-6
View File
@@ -195,14 +195,8 @@ async def regenerate(
date 미지정 시 오늘 KST. 같은 날 row 존재 시 transaction 안에서 삭제 후 신규 생성.
응답 status='success' | 'partial' | 'failed' | 'empty'.
"""
from core.config import settings
from workers.briefing_worker import run
# held(정책상 정상 보류)를 409 로 표면화 (R8) — digest.py 정본 대칭. 이전엔 briefing_worker.run()
# 이 held/timeout/exception 셋 다 None 반환 → API 가 셋 다 500 으로 오보(silent-state-conflation).
if "briefing" in settings.pipeline_held_stages:
raise HTTPException(status_code=409, detail="briefing 단계가 일시 보류(held) 상태입니다")
result = await run(target_date=date)
if result is None:
raise HTTPException(status_code=500, detail="briefing 워커 실행 실패 (로그 확인)")
+24 -34
View File
@@ -69,19 +69,6 @@ def _upload_error(status_code: int, error_code: str, message: str) -> HTTPExcept
)
async def get_live_document(session: AsyncSession, doc_id: int) -> Document:
"""soft-delete(deleted_at) 가드 포함 문서 조회 — 없거나 삭제됐으면 404 (R7).
조회/수정 경로는 deleted_at 을 일관 가드하나 파일/콘텐츠 서빙 엔드포인트가 누락 →
삭제 문서의 원본/preview/전문이 doc_id(+유효 토큰)만으로 노출되던 비대칭. '경로마다
deleted_at 기억'에 의존하지 않게 헬퍼로 구조 강제(추가될 서빙 경로도 자동 보호).
"""
doc = await session.get(Document, doc_id)
if not doc or doc.deleted_at is not None:
raise HTTPException(status_code=404, detail="문서를 찾을 수 없습니다")
return doc
async def _near_dup_scan_bg(doc_id: int) -> None:
"""B-3: post-upload near_duplicate 스캔 (BackgroundTask). 자체 세션, best-effort.
@@ -851,7 +838,9 @@ async def get_document_file(
# 일반 Bearer 헤더 인증 시도
raise HTTPException(status_code=401, detail="토큰이 필요합니다")
doc = await get_live_document(session, doc_id)
doc = await session.get(Document, doc_id)
if not doc:
raise HTTPException(status_code=404, detail="문서를 찾을 수 없습니다")
# note(메모)는 물리 파일이 없음
if not doc.file_path:
@@ -954,8 +943,10 @@ async def get_document_image_raw(
if not payload or payload.get("type") != "access":
raise HTTPException(status_code=401, detail="유효하지 않은 토큰")
# 문서 존재 확인 (image_key 만 있고 doc 가 사라진 케이스 차단 + soft-delete 가드)
doc = await get_live_document(session, doc_id)
# 문서 존재 확인 (image_key 만 있고 doc 가 사라진 케이스 차단)
doc = await session.get(Document, doc_id)
if doc is None:
raise HTTPException(status_code=404, detail="문서를 찾을 수 없습니다")
img = await session.scalar(
select(DocumentImage).where(
@@ -1366,8 +1357,9 @@ async def save_document_content(
body: dict = None,
):
"""Markdown 원본 파일 저장 + extracted_text 갱신"""
# soft-delete 문서엔 쓰기 차단 (R7 — 삭제 문서 resurrect / NAS 재기록 방지)
doc = await get_live_document(session, doc_id)
doc = await session.get(Document, doc_id)
if not doc:
raise HTTPException(status_code=404, detail="문서를 찾을 수 없습니다")
if doc.file_format not in ("md", "txt"):
raise HTTPException(status_code=400, detail="편집 가능한 포맷이 아닙니다 (md, txt만 가능)")
@@ -1407,7 +1399,9 @@ async def get_document_preview(
else:
raise HTTPException(status_code=401, detail="토큰이 필요합니다")
doc = await get_live_document(session, doc_id)
doc = await session.get(Document, doc_id)
if not doc:
raise HTTPException(status_code=404, detail="문서를 찾을 수 없습니다")
preview_path = Path(settings.nas_mount_path) / "PKM" / ".preview" / f"{doc_id}.pdf"
if not preview_path.exists():
@@ -1433,24 +1427,18 @@ async def delete_document(
doc_id: int,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
delete_file: bool = Query(False, description="NAS 원본도 삭제 (grace 후 retention sweep 이 물리삭제)"),
delete_file: bool = Query(False, description="NAS 파일도 함께 삭제"),
):
"""문서 삭제. 기본: soft-delete(숨김, 파일 보존). delete_file=true: purge 예약 (R7)."""
doc = await get_live_document(session, doc_id)
"""문서 삭제 (기본: DB만 삭제, 파일 유지)"""
doc = await session.get(Document, doc_id)
if not doc:
raise HTTPException(status_code=404, detail="문서를 찾을 수 없습니다")
# soft-delete(숨김). delete_file=true 면 purge_requested_at 마커를 추가로 set —
# retention sweep cron(document_purge_sweep)이 grace(30일) 경과 후 NAS 원본 물리삭제
# + audit-log. ★일반 숨김(delete_file=false)은 파일 보존 = undelete 가능. sweep 는
# deleted_at 이 아니라 purge_requested_at 기준이라 단순 숨김이 영구삭제되지 않는다.
now = datetime.now(timezone.utc)
doc.deleted_at = now
if delete_file:
doc.purge_requested_at = now
# soft-delete (물리 파일은 cleanup job에서 나중에 정리)
doc.deleted_at = datetime.now(timezone.utc)
await session.commit()
if delete_file:
return {"message": f"문서 {doc_id} 삭제 — NAS 원본은 30일 후 정리 예약"}
return {"message": f"문서 {doc_id} soft-delete 완료 (파일 보존)"}
return {"message": f"문서 {doc_id} soft-delete 완료"}
@router.get("/{doc_id}/content")
@@ -1460,7 +1448,9 @@ async def get_document_content(
session: Annotated[AsyncSession, Depends(get_session)],
):
"""문서 전문 텍스트 반환 (서비스 호출용)."""
doc = await get_live_document(session, doc_id)
doc = await session.get(Document, doc_id)
if not doc:
raise HTTPException(status_code=404, detail="문서를 찾을 수 없습니다")
raw_text = doc.extracted_text or ""
content = raw_text[:15000]
+5 -5
View File
@@ -21,7 +21,7 @@ from zoneinfo import ZoneInfo
from fastapi import APIRouter, Body, Depends, HTTPException, Query
from pydantic import BaseModel, Field
from sqlalchemy import and_, func, or_, select
from sqlalchemy import and_, or_, select
from sqlalchemy.ext.asyncio import AsyncSession
from core.auth import get_current_user
@@ -388,10 +388,10 @@ async def list_events(
)
base = select(Event).where(and_(*where))
# R10: 전체 ID 로딩 후 len() 대신 DB COUNT 푸시다운 (행 수 선형 메모리/전송 비용 제거).
total = (
await session.execute(select(func.count(Event.id)).where(and_(*where)))
).scalar() or 0
total_q = await session.execute(
select(Event.id).where(and_(*where))
)
total = len(total_q.scalars().all())
rows = await session.execute(
base.order_by(Event.created_at.desc())
+1 -5
View File
@@ -6,7 +6,6 @@ Bearer token 보호 (settings.internal_worker_token).
"""
from __future__ import annotations
import hmac
import logging
from fastapi import APIRouter, Depends, Header, HTTPException, Path, Response, status
@@ -29,10 +28,7 @@ def _verify_token(authorization: str | None = Header(default=None)) -> None:
if not authorization or not authorization.lower().startswith("bearer "):
raise HTTPException(status_code=401, detail="missing Bearer token")
token = authorization[7:].strip()
# 상수시간 비교 (R7) — 일반 != 는 첫 불일치에서 단락돼 prefix 길이로 바이트 추정 가능한
# timing side-channel. 이 토큰이 RAG 정답 포함 endpoint 를 보호하므로 compare_digest 로
# 통일(search.py 정본과 일치).
if not hmac.compare_digest(token, settings.internal_worker_token):
if token != settings.internal_worker_token:
raise HTTPException(status_code=403, detail="invalid token")
+62 -25
View File
@@ -473,35 +473,72 @@ async def get_facet_counts(
result = FacetCountsResponse(company=[], topic=[], year=[], doctype=[])
# R10: 4 facet 블록 중복 제거 — 적용된 facet 필터(값 있는 것만)를 모아 각 축 집계 시
# '자기 자신 축'만 제외하고 적용하는 헬퍼로. 쿼리/자기제외/order_by/value 매핑 모두 동일.
applied: dict = {}
if facet_company:
applied["company"] = Document.facet_company == facet_company
# company counts (다른 facet 필터 적용, 자기 자신 제외)
q_company = base_query()
if facet_topic:
applied["topic"] = Document.facet_topic == facet_topic
q_company = q_company.where(Document.facet_topic == facet_topic)
if facet_year:
applied["year"] = Document.facet_year == facet_year
q_company = q_company.where(Document.facet_year == facet_year)
if facet_doctype:
applied["doctype"] = Document.facet_doctype == facet_doctype
q_company = q_company.where(Document.facet_doctype == facet_doctype)
rows = await session.execute(
select(Document.facet_company, func.count())
.where(Document.facet_company != None) # noqa: E711
.where(Document.id.in_(q_company.with_only_columns(Document.id).subquery().select()))
.group_by(Document.facet_company)
.order_by(func.count().desc())
)
result.company = [FacetCountItem(value=r[0], count=r[1]) for r in rows]
async def _facet_count(name, facet_col, order_by, value_fn):
q = base_query()
for k, cond in applied.items():
if k != name: # 자기 자신 facet 필터는 제외 (다른 축만 적용)
q = q.where(cond)
rows = await session.execute(
select(facet_col, func.count())
.where(facet_col != None) # noqa: E711
.where(Document.id.in_(q.with_only_columns(Document.id).subquery().select()))
.group_by(facet_col)
.order_by(order_by)
)
return [FacetCountItem(value=value_fn(r[0]), count=r[1]) for r in rows]
# topic counts
q_topic = base_query()
if facet_company:
q_topic = q_topic.where(Document.facet_company == facet_company)
if facet_year:
q_topic = q_topic.where(Document.facet_year == facet_year)
if facet_doctype:
q_topic = q_topic.where(Document.facet_doctype == facet_doctype)
rows = await session.execute(
select(Document.facet_topic, func.count())
.where(Document.facet_topic != None) # noqa: E711
.where(Document.id.in_(q_topic.with_only_columns(Document.id).subquery().select()))
.group_by(Document.facet_topic)
.order_by(func.count().desc())
)
result.topic = [FacetCountItem(value=r[0], count=r[1]) for r in rows]
result.company = await _facet_count("company", Document.facet_company, func.count().desc(), lambda v: v)
result.topic = await _facet_count("topic", Document.facet_topic, func.count().desc(), lambda v: v)
result.year = await _facet_count("year", Document.facet_year, Document.facet_year.desc(), lambda v: str(v))
result.doctype = await _facet_count("doctype", Document.facet_doctype, func.count().desc(), lambda v: v)
# year counts
q_year = base_query()
if facet_company:
q_year = q_year.where(Document.facet_company == facet_company)
if facet_topic:
q_year = q_year.where(Document.facet_topic == facet_topic)
if facet_doctype:
q_year = q_year.where(Document.facet_doctype == facet_doctype)
rows = await session.execute(
select(Document.facet_year, func.count())
.where(Document.facet_year != None) # noqa: E711
.where(Document.id.in_(q_year.with_only_columns(Document.id).subquery().select()))
.group_by(Document.facet_year)
.order_by(Document.facet_year.desc())
)
result.year = [FacetCountItem(value=str(r[0]), count=r[1]) for r in rows]
# doctype counts
q_doctype = base_query()
if facet_company:
q_doctype = q_doctype.where(Document.facet_company == facet_company)
if facet_topic:
q_doctype = q_doctype.where(Document.facet_topic == facet_topic)
if facet_year:
q_doctype = q_doctype.where(Document.facet_year == facet_year)
rows = await session.execute(
select(Document.facet_doctype, func.count())
.where(Document.facet_doctype != None) # noqa: E711
.where(Document.id.in_(q_doctype.with_only_columns(Document.id).subquery().select()))
.group_by(Document.facet_doctype)
.order_by(func.count().desc())
)
result.doctype = [FacetCountItem(value=r[0], count=r[1]) for r in rows]
return result
+2 -6
View File
@@ -300,13 +300,9 @@ async def list_memos(
base = base.where(Document.pinned == pinned)
if tag:
# 파라미터 바인딩 (R7) — f-string 으로 사용자 tag 를 JSON 배열 리터럴에 직접 삽입하면
# tag 안 " 나 ] 가 JSON 을 깨 500 + 필터 의미 변형. jsonb_build_array 로 tag 를
# 바인드 파라미터로 전달(@> JSONB containment).
tag_arr = func.jsonb_build_array(tag)
base = base.where(
Document.user_tags.op("@>")(tag_arr)
| Document.ai_tags.op("@>")(tag_arr)
Document.user_tags.op("@>")(f'["{tag}"]')
| Document.ai_tags.op("@>")(f'["{tag}"]')
)
count_query = select(func.count()).select_from(base.subquery())
+2 -10
View File
@@ -65,8 +65,7 @@ async def create_source(
):
from core.url_validator import validate_feed_url
try:
# getaddrinfo(DNS) 는 blocking — 이벤트 루프 점유 방지 위해 off-thread (R5)
await asyncio.to_thread(validate_feed_url, body.feed_url)
validate_feed_url(body.feed_url)
except ValueError as e:
raise HTTPException(status_code=422, detail=f"feed_url 검증 실패: {e}")
source = NewsSource(**body.model_dump())
@@ -195,17 +194,10 @@ async def trigger_collect(
if _collect_lock.locked():
raise HTTPException(status_code=429, detail="수집이 이미 진행 중입니다")
# TOCTOU 제거 (R9) — 기존엔 locked() 체크 후 실제 acquire 가 별도 task 안에서 일어나, 그
# 사이 다른 요청이 끼어들어 이중 수집 task 가 생길 수 있었다. 핸들러에서 동기적으로(uncontended
# Lock.acquire 는 이벤트루프 양보 없이 즉시 완료) acquire 하고 task 의 finally 에서 release.
await _collect_lock.acquire()
async def _run_with_lock():
try:
async with _collect_lock:
from workers.news_collector import run
await run()
finally:
_collect_lock.release()
asyncio.create_task(_run_with_lock())
return {"message": "뉴스 수집 시작됨"}
+3 -7
View File
@@ -291,7 +291,7 @@ async def search(
content={
"error_reason": "unknown_embedding_backend",
"backend_requested": embedding_backend,
"allowed": ["baseline"],
"allowed": ["baseline", "cand_me5_large_inst", "cand_snowflake_l_v2"],
"detail": msg,
},
)
@@ -710,9 +710,7 @@ async def ask(
# 30s 로 align → classifier 동작 안정. ask 응답 latency 상한 ↑ 의도.
try:
classifier_result = await asyncio.wait_for(classifier_task, timeout=30.0)
except asyncio.CancelledError:
raise # 요청 취소는 전파 — broad except 가 삼키지 않게 명시 (R3)
except Exception:
except (asyncio.TimeoutError, Exception):
classifier_result = ClassifierResult("timeout", None, [], [], 0.0)
defense_log["classifier"] = {
@@ -874,9 +872,7 @@ async def ask(
# → classifier 와 동일 패턴 (search.py:522 가 6s→15s swap 했던 case). 10s 로 align.
try:
verifier_result = await asyncio.wait_for(verifier_task, timeout=10.0)
except asyncio.CancelledError:
raise # 요청 취소는 전파 — broad except 가 삼키지 않게 명시 (R3)
except Exception:
except (asyncio.TimeoutError, Exception):
verifier_result = VerifierResult("timeout", [], 0.0)
# Verifier contradictions → grounding flags 머지 (prefix 로 구분, severity 3단계)
+1 -10
View File
@@ -1009,16 +1009,7 @@ async def submit_attempt(
# PR-10: 세션 연동. 기본은 None.
quiz_session: StudyQuizSession | None = None
if body.quiz_session_id is not None:
# FOR UPDATE 로 행 잠금 (R9) — 모바일 더블탭/재시도로 같은 세션에 동시 제출이 들어오면
# 둘 다 cursor=N 을 읽고 둘 다 cursor+1·count 가산하는 race(이중 가산). 잠금으로 직렬화 →
# 두 번째 제출은 첫 commit 후 cursor=N+1 을 보고 cursor 불일치 409 로 거부된다.
quiz_session = (
await session.execute(
select(StudyQuizSession)
.where(StudyQuizSession.id == body.quiz_session_id)
.with_for_update()
)
).scalar_one_or_none()
quiz_session = await session.get(StudyQuizSession, body.quiz_session_id)
if quiz_session is None or quiz_session.user_id != user.id:
raise HTTPException(status_code=404, detail="quiz_session 을 찾을 수 없습니다")
if quiz_session.study_topic_id != q.study_topic_id:
+4 -58
View File
@@ -72,55 +72,6 @@ def _validate_sql_content(name: str, sql: str) -> None:
)
# R1: baseline 스냅샷이 대표하는 마지막 마이그레이션 버전 (이하 버전은 baseline 에 포함).
# 새 baseline 재생성 시 이 값을 갱신한다 (migrations/_baseline/<cutoff>_schema_baseline.sql).
_BASELINE_CUTOFF = 358
async def _load_baseline_if_fresh(conn, migrations_dir: Path) -> None:
"""fresh DB(documents 부재)면 baseline 스키마 스냅샷 적재 + schema_migrations 1..cutoff 스탬프.
기존 DB(documents 존재) 즉시 반환 baseline 미적재, 무영향. baseline 파일 부재 시도
기존 replay 경로 유지(하위호환).
"""
from sqlalchemy import text
baseline_dir = migrations_dir / "_baseline"
baseline_files = (
sorted(baseline_dir.glob("*_schema_baseline.sql")) if baseline_dir.is_dir() else []
)
if not baseline_files:
return
docs_exists = (
await conn.execute(text("SELECT to_regclass('public.documents') IS NOT NULL"))
).scalar()
if docs_exists:
return # 기존 DB — baseline skip
baseline_path = baseline_files[-1]
logger.info(f"[migration] fresh DB 감지 — baseline 적재: {baseline_path.name}")
# baseline 은 multi-statement 덤프 — exec_driver_sql(asyncpg prepared)은 multi-statement
# 불허("cannot insert multiple commands into a prepared statement"). raw asyncpg 의 simple
# 프로토콜 execute() 로 적재한다(같은 connection = 현재 트랜잭션 내). psql 스모크는 이 제약을
# 못 잡으므로 init_db 런타임 검증으로 확인됨.
raw = await conn.get_raw_connection()
await raw.driver_connection.execute(baseline_path.read_text(encoding="utf-8"))
# baseline = cutoff 까지의 스키마 → 실제 파일 버전 기준으로 schema_migrations 스탬프.
versions = [v for v, _, _ in _parse_migration_files(migrations_dir) if v <= _BASELINE_CUTOFF]
for v in versions:
await conn.execute(
text(
"INSERT INTO schema_migrations (version, name) "
"VALUES (:v, :n) ON CONFLICT DO NOTHING"
),
{"v": v, "n": f"baseline:{v}"},
)
logger.info(
f"[migration] baseline 적재 + schema_migrations {len(versions)}건 스탬프 (cutoff {_BASELINE_CUTOFF})"
)
async def _run_migrations(conn) -> None:
"""미적용 migration 실행 (호출자가 트랜잭션 관리)"""
from sqlalchemy import text
@@ -139,6 +90,10 @@ async def _run_migrations(conn) -> None:
f"SELECT pg_advisory_xact_lock({_MIGRATION_LOCK_KEY})"
))
# 적용 이력 조회
result = await conn.execute(text("SELECT version FROM schema_migrations"))
applied = {row[0] for row in result}
# migration 파일 스캔
# /app/core/database.py → parent.parent = /app → /app/migrations (volume mount 위치)
migrations_dir = Path(__file__).resolve().parent.parent / "migrations"
@@ -146,15 +101,6 @@ async def _run_migrations(conn) -> None:
logger.info("[migration] migrations/ 디렉토리 없음, 스킵")
return
# R1: fresh DB(documents 부재)면 baseline 스냅샷 먼저 적재 + schema_migrations 스탬프.
# migrations/ 전체 replay 는 누적 비-replayable(011 view 의존·326 enum-same-txn 등)로
# 깨지므로 신규/DR 환경은 prod 스키마 스냅샷에서 출발한다. 기존 DB 는 skip(무영향).
await _load_baseline_if_fresh(conn, migrations_dir)
# 적용 이력 조회 (baseline 스탬프 반영 — fresh DB 는 1..cutoff 가 이미 applied)
result = await conn.execute(text("SELECT version FROM schema_migrations"))
applied = {row[0] for row in result}
files = _parse_migration_files(migrations_dir)
pending = [(v, name, path) for v, name, path in files if v not in applied]
+3 -13
View File
@@ -51,7 +51,6 @@ async def lifespan(app: FastAPI):
from workers.briefing_worker import run as morning_briefing_run
from workers.daily_digest import run as daily_digest_run
from workers.dedup_reconcile import run as dedup_reconcile_run
from workers.document_purge_sweep import run as purge_sweep_run
from workers.digest_worker import run as global_digest_run
from workers.file_watcher import watch_inbox
from workers.mailplus_archive import run as mailplus_run
@@ -151,9 +150,6 @@ async def lifespan(app: FastAPI):
# plan ds-s1-backend-1 B-4: dedup 컬럼(duplicate_of/duplicate_count) 야간 절대 재계산.
# soft-delete 잔여 드리프트 정리(멱등, 드리프트 없으면 no-op). cron 03:30 (다른 잡과 비충돌).
scheduler.add_job(dedup_reconcile_run, CronTrigger(hour=3, minute=30, timezone=KST), id="dedup_reconcile")
# R7: delete_file=true purge 요청 문서의 NAS 원본 grace(30일) 후 물리삭제 + audit.
# purge_requested_at 마커 기준(단순 숨김은 보존). 03:20 = 다른 새벽 잡과 비충돌 슬롯.
scheduler.add_job(purge_sweep_run, CronTrigger(hour=3, minute=20, timezone=KST), id="purge_sweep")
# B-3 PR4: 레거시 paper 행 arXiv DataCite DOI 스탬프(재유입 차단). keyless·in-DB·enqueue 0.
# dedup_reconcile(03:30)·fulltext_reconcile(03:40) 와 별 worker·비충돌 슬롯.
scheduler.add_job(paper_doi_reconcile_run, CronTrigger(hour=3, minute=50, timezone=KST), id="paper_doi_reconcile")
@@ -240,27 +236,21 @@ SETUP_BYPASS_PREFIXES = (
"/api/setup", "/api/config", "/setup", "/health", "/docs", "/openapi.json", "/redoc",
)
# R10: 셋업 완료(user 존재)는 단조(monotonic) — 한 번 확인되면 영구. 매 요청 COUNT 쿼리
# 대신 캐시 플래그로 전환 (setup 후 모든 요청이 users COUNT 하던 per-request 비용 제거).
_setup_complete = False
@app.middleware("http")
async def setup_redirect_middleware(request: Request, call_next):
global _setup_complete # 함수 내 read+assign 둘 다 모듈 전역 참조 (UnboundLocalError 방지)
path = request.url.path
# 셋업 완료됐거나 바이패스 경로면 즉시 통과 (DB 쿼리 없음)
if _setup_complete or any(path.startswith(p) for p in SETUP_BYPASS_PREFIXES):
# 바이패스 경로는 항상 통과
if any(path.startswith(p) for p in SETUP_BYPASS_PREFIXES):
return await call_next(request)
# 유저 존재 여부 확인 (셋업 완료 전 1회성 — 완료 확인되면 플래그 set 후 영구 skip)
# 유저 존재 여부 확인
try:
async with async_session() as session:
result = await session.execute(select(func.count(User.id)))
user_count = result.scalar()
if user_count == 0:
return RedirectResponse(url="/setup")
_setup_complete = True
except Exception:
pass # DB 연결 실패 시 통과 (health에서 확인 가능)
+2 -6
View File
@@ -52,8 +52,7 @@ class Document(Base):
# 2계층: AI 가공
ai_summary: Mapped[str | None] = mapped_column(Text)
# R11a: 주석 dict→list 정정(실제 list 적재), 공유 가변 default=[] → callable default=list.
ai_tags: Mapped[list | None] = mapped_column(JSONB, default=list)
ai_tags: Mapped[dict | None] = mapped_column(JSONB, default=[])
ai_domain: Mapped[str | None] = mapped_column(String(100))
ai_sub_group: Mapped[str | None] = mapped_column(String(100))
ai_model_version: Mapped[str | None] = mapped_column(String(50))
@@ -80,7 +79,7 @@ class Document(Base):
user_note: Mapped[str | None] = mapped_column(Text)
# 사용자 태그 (ai_tags와 분리, #태그 파싱 결과 또는 수동 입력)
user_tags: Mapped[list | None] = mapped_column(JSONB, default=list) # R11a: 공유 가변 default 제거
user_tags: Mapped[list | None] = mapped_column(JSONB, default=[])
# 핀 고정
pinned: Mapped[bool] = mapped_column(Boolean, default=False)
@@ -106,9 +105,6 @@ class Document(Base):
# 승인/삭제
review_status: Mapped[str | None] = mapped_column(String(20), default="pending")
deleted_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
# delete_file=true 명시 삭제 요청 마커 (R7) — retention sweep(document_purge_sweep)이
# grace 후 NAS 원본 물리삭제. deleted_at(단순 숨김, 파일 보존)과 분리.
purge_requested_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
# 외부 편집 URL
edit_url: Mapped[str | None] = mapped_column(Text)
+2 -4
View File
@@ -7,7 +7,7 @@ PR-2 가드레일:
- correct_choice 변경 기존 attempt.is_correct 재계산 (기록은 시점의 사실).
"""
from datetime import datetime, timezone
from datetime import datetime
from pgvector.sqlalchemy import Vector
from sqlalchemy import BigInteger, Boolean, DateTime, ForeignKey, Integer, SmallInteger, String, Text
@@ -128,9 +128,7 @@ class StudyQuestionAttempt(Base):
# PR-9: outcome 권장값 (correct/wrong/unsure). 강한 enum 미사용.
outcome: Mapped[str] = mapped_column(String(20), nullable=False)
answered_at: Mapped[datetime] = mapped_column(
# TZ-aware 명시 (R8) — naive datetime.now() 는 컨테이너 TZ 의존. 현 컨테이너=UTC 라
# 값 동일(백필 불요)이나, 컨테이너 TZ 가 바뀌면 9시간 어긋나는 잠복 의존 제거.
DateTime(timezone=True), default=lambda: datetime.now(timezone.utc), nullable=False
DateTime(timezone=True), default=datetime.now, nullable=False
)
# PR-10: 어떤 quiz 세션의 attempt 인지 (NULL = 세션 외 직접 입력 또는 세션 삭제됨).
quiz_session_id: Mapped[int | None] = mapped_column(
+1 -10
View File
@@ -26,16 +26,7 @@ _ATX = re.compile(r'^(#{1,6})\s+(?P<title>\S.*?)\s*#*\s*$')
_KO_JANG = re.compile(r'^\s*(?P<title>제\s*\d+\s*장\b.*)$')
_KO_JEOL = re.compile(r'^\s*(?P<title>제\s*\d+\s*절\b.*)$')
_KO_JO = re.compile(r'^\s*(?P<title>제\s*\d+\s*조\b.*)$')
# _ENG: 영문 구조 헤딩(ATX 미사용 문서용). ASME 파트는 보통 ATX(`# PART PG`)로 잡혀 _ENG 의존 낮음.
# D1: 식별자 뒤가 소문자 문장연속이면("Part III to demonstrate to the satisfaction…") 본문이므로
# 미탐지 — 가짜 절 차단. 선택 제목은 대문자/괄호/숫자로 시작해야 헤딩 인정(소문자 시작=문장으로 봄).
# 식별자는 번호/PG/3.31/UHX/A-1 등 (.·- 소수·하이픈 확장 허용).
_ENG = re.compile(
r'^\s*(?P<title>(?:Chapter|Section|Article|Part|PART)\s+'
r'[\dIVXLA-Z]+(?:[.\-][\dA-Za-z]+)*'
r'(?:\s+[A-Z(\d][^\n]*)?'
r')\s*$'
)
_ENG = re.compile(r'^\s*(?P<title>(?:Chapter|Section|Article|Part|PART)\s+[\dIVXLA-Z]+\b.*)$')
# 코드펜스 경계 (FE outlineAnchors.ts:60 `/^\s{0,3}(```|~~~)/` 와 동일). 펜스 내부 라인은
# heading 미탐지 — 코드블록 안 '# foo' 가 가짜 절을 만들지 않게(O3).
+1 -2
View File
@@ -32,8 +32,7 @@ async def find_paper_holder(session, raw_or_normalized_doi):
return None
result = await session.execute(
select(Document)
.where(Document.material_type == "paper", _DOI_EXPR == doi,
Document.deleted_at.is_(None))
.where(Document.material_type == "paper", _DOI_EXPR == doi)
.limit(1)
)
return result.scalars().first()
+36 -4
View File
@@ -54,10 +54,42 @@ QUERY_EMBED_MAXSIZE = 500
# server-side allowlist map. query parameter 가 raw table name 받지 않음.
CANDIDATE_BACKEND_MAP: dict[str, dict[str, str] | None] = {
"baseline": None,
# Phase 2A 임베딩 후보(me5_large_inst·snowflake_l_v2·qwen06·qwen4·qwen4m) 전량 no-go
# 종결(2026-06-12, 후보 전부 -0.03~-0.04) → cand 슬러그·테이블 제거 (R13, 마이그 360
# DROP). read-path 슬러그를 먼저 빼야 embedding_backend=cand_X /search 가 dropped 테이블을
# 읽어 500 나지 않는다. baseline(production)만 잔존.
"cand_me5_large_inst": {
"docs_table": "documents_cand_me5_large_inst",
"chunks_table": "document_chunks_cand_me5_large_inst",
"embed_endpoint": "http://embedding-cand-me5-inst:80/embed",
},
"cand_snowflake_l_v2": {
"docs_table": "documents_cand_snowflake_l_v2",
"chunks_table": "document_chunks_cand_snowflake_l_v2",
"embed_endpoint": "http://embedding-cand-snowflake-l-v2:80/embed",
},
# ─── Phase 2A (embedding-phase2a-1, 2026-06-12): Qwen3-Embedding 후보 3종 ───
# embed_kind="ollama" = /api/embed 호출 + 쿼리측 instruct prefix (비대칭 사용,
# G-1 fixture 실측: prefix 가 관련쌍 cos +0.016). 문서측은 backfill 이 plain 으로 적재.
# qwen4m = 4B 의 MRL 1024d (dimensions 옵션 — Ollama 가 truncate+재정규화 수행, G-1 실측).
"cand_qwen06": {
"docs_table": "documents_cand_qwen06",
"chunks_table": "document_chunks_cand_qwen06",
"embed_endpoint": "http://ollama:11434/api/embed",
"embed_kind": "ollama",
"embed_model": "qwen3-embedding:0.6b",
},
"cand_qwen4": {
"docs_table": "documents_cand_qwen4",
"chunks_table": "document_chunks_cand_qwen4",
"embed_endpoint": "http://ollama:11434/api/embed",
"embed_kind": "ollama",
"embed_model": "qwen3-embedding:4b",
},
"cand_qwen4m": {
"docs_table": "documents_cand_qwen4m",
"chunks_table": "document_chunks_cand_qwen4m",
"embed_endpoint": "http://ollama:11434/api/embed",
"embed_kind": "ollama",
"embed_model": "qwen3-embedding:4b",
"embed_dimensions": 1024,
},
}
# G-1 핀 고정 instruct 문자열 (inventory 2026-06-12-c 기록과 동일해야 함 —
+7 -18
View File
@@ -32,8 +32,6 @@ from typing import TYPE_CHECKING, Literal
from sqlalchemy.ext.asyncio import AsyncSession
from core.database import async_session
from . import query_analyzer, query_rewriter
from .fusion_service import (
DEFAULT_FUSION,
@@ -190,7 +188,6 @@ async def run_search(
snapshot_chunk_id_max=snapshot_chunk_id_max,
reranker_backend=reranker_backend,
rewrite_backend=rewrite_backend,
axis=axis,
)
timing: dict[str, float] = {}
@@ -539,7 +536,6 @@ async def search_with_rewrite(
snapshot_chunk_id_max: int | None,
reranker_backend: str | None,
rewrite_backend: str,
axis: "AxisFilter | None" = None,
) -> PipelineResult:
"""Phase 2Q multi-query retrieval 합성 path (plan v6 §5.5).
@@ -583,20 +579,13 @@ async def search_with_rewrite(
async def _variant_retrieve(
v: str,
) -> "tuple[list[SearchResult], list[SearchResult], dict[int, list[SearchResult]]]":
# 변형별 독립 AsyncSession (fan-out). 공유 session 을 asyncio.gather 로 동시
# execute 에 넘기면 SQLAlchemy async 가 'another operation in progress' 로
# 부하 의존적 비결정 크래시 — variant 마다 독립 연결로 분리한다.
# axis(material_type/jurisdiction/year) 도 single-query path 와 동일하게 전달
# (rewrite 경로가 axis 필터를 조용히 누락하던 결함 수정).
async with async_session() as vsession:
text = await search_text(vsession, v, per_variant_k, axis=axis)
raw_chunks = await search_vector(
vsession, v, per_variant_k,
embedding_backend=embedding_backend,
snapshot_doc_id_max=snapshot_doc_id_max,
snapshot_chunk_id_max=snapshot_chunk_id_max,
axis=axis,
)
text = await search_text(session, v, per_variant_k)
raw_chunks = await search_vector(
session, v, per_variant_k,
embedding_backend=embedding_backend,
snapshot_doc_id_max=snapshot_doc_id_max,
snapshot_chunk_id_max=snapshot_chunk_id_max,
)
vector, chunks_by_doc = compress_chunks_to_docs(raw_chunks, per_variant_k)
return text, vector, chunks_by_doc
+8 -12
View File
@@ -95,10 +95,8 @@ except FileNotFoundError:
)
# ─── in-memory 캐시 (FIFO eviction + TTL, query_analyzer 패턴 복제) ─
# R10: (ts, result) 저장 — TTL 미적용으로 원문 수정돼도 CACHE_MAXSIZE 찰 때까지 stale answer
# 반환하던 결함 수정. query_rewriter 의 expire_at TTL enforce 정본 복제.
_CACHE: dict[str, tuple[float, SynthesisResult]] = {}
# ─── in-memory LRU (FIFO 근사, query_analyzer 패턴 복제) ─
_CACHE: dict[str, SynthesisResult] = {}
def _model_version() -> str:
@@ -124,11 +122,10 @@ def get_cached(query: str, chunk_ids: list[int], backend_name: str = "gemma-macm
entry = _CACHE.get(key)
if entry is None:
return None
ts, result = entry
if time.time() - ts > CACHE_TTL:
_CACHE.pop(key, None) # 만료 — 삭제 후 miss
return None
return result
# TTL 체크는 elapsed_ms 를 악용할 수 없으므로 별도 저장
# 여기서는 단순 policy 로 처리: entry 가 있으면 반환 (eviction 은 FIFO 시점)
# 정확한 TTL 이 필요하면 (ts, result) tuple 로 저장해야 함.
return entry
def _should_cache(result: SynthesisResult) -> bool:
@@ -146,9 +143,8 @@ def set_cached(query: str, chunk_ids: list[int], result: SynthesisResult, backen
if not _should_cache(result):
return
key = _cache_key(query, chunk_ids, backend_name)
now = time.time()
if key in _CACHE:
_CACHE[key] = (now, result)
_CACHE[key] = result
return
if len(_CACHE) >= CACHE_MAXSIZE:
try:
@@ -156,7 +152,7 @@ def set_cached(query: str, chunk_ids: list[int], result: SynthesisResult, backen
_CACHE.pop(oldest, None)
except StopIteration:
pass
_CACHE[key] = (now, result)
_CACHE[key] = result
def cache_stats() -> dict[str, int]:
+1 -2
View File
@@ -2,7 +2,6 @@
from __future__ import annotations
import asyncio
import os
from collections.abc import AsyncIterator
from pathlib import Path
@@ -43,7 +42,7 @@ class LocalBackend(StorageBackend):
to_read = _STREAM_CHUNK if remaining is None else min(_STREAM_CHUNK, remaining)
if to_read <= 0:
break
data = await asyncio.to_thread(f.read, to_read)
data = f.read(to_read)
if not data:
break
yield data
+6 -9
View File
@@ -252,15 +252,12 @@ async def gather_explanation_context(
client = AIClient()
query = _build_query(question)
try:
# 같은 AsyncSession 을 asyncio.gather 로 동시 execute 에 넘기면 SQLAlchemy async 가
# 'another operation in progress' 로 부하 의존적 비결정 크래시(이전 주석 'lock 충돌
# 없음' 은 rerank HTTP 만 보고 DB execute 동시성을 간과한 오인). 백그라운드 prefetch
# 라 순차 직렬화 — 사용자 대면 rewrite 경로(독립 세션 fan-out)와는 다른 처방.
docs = await _gather_document_evidence(
session, user_id, question.study_topic_id, query, client
)
questions = await _gather_question_evidence(
session, user_id, question.study_topic_id, question.id, query, client
# 두 조회 병렬화 (rerank 호출이 별개라 lock 충돌 없음)
docs, questions = await asyncio.gather(
_gather_document_evidence(session, user_id, question.study_topic_id, query, client),
_gather_question_evidence(
session, user_id, question.study_topic_id, question.id, query, client
),
)
return ExplanationContext(documents=docs, questions=questions)
finally:
+3 -7
View File
@@ -238,13 +238,9 @@ async def gather_subject_note_context(
client = AIClient()
query = _build_query(subject, scope)
try:
# 같은 AsyncSession 동시 execute 회피 — 순차 직렬화(백그라운드 prefetch).
# explanation_rag.gather_explanation_context 와 동형(R2 공유세션 동시성 수정).
docs = await _gather_document_evidence(
session, user_id, study_topic_id, query, client
)
questions = await _gather_question_evidence(
session, user_id, study_topic_id, subject, scope, query, client
docs, questions = await asyncio.gather(
_gather_document_evidence(session, user_id, study_topic_id, query, client),
_gather_question_evidence(session, user_id, study_topic_id, subject, scope, query, client),
)
return SubjectNoteContext(documents=docs, questions=questions)
finally:
+2 -10
View File
@@ -303,12 +303,10 @@ async def run(bulk: bool = False, limit: int = 0) -> None:
src = await session.get(NewsSource, source_id)
watermark = _watermark(src, category)
newest_seen: datetime | None = None
capped = False # 이번 run 이 cap 으로 카테고리 중도 절단됐는지 (R4)
max_pages = (10**6 if bulk else _MAX_PAGES_PER_CAT)
try:
for page in range(max_pages):
if inserted >= run_cap:
capped = True
break
xml_text = await _fetch(client, query, page * _PAGE_SIZE)
total, entries = parse_arxiv_feed(xml_text)
@@ -331,18 +329,12 @@ async def run(bulk: bool = False, limit: int = 0) -> None:
else:
await session.rollback()
if inserted >= run_cap:
capped = True
break
await asyncio.sleep(_REQ_SLEEP)
if stop or (page + 1) * _PAGE_SIZE >= total:
break
# 카테고리 워터마크 전진 — cap 으로 절단된 run 은 미전진 (R4).
# 절단 시 newest_seen 으로 전진하면 [oldest-ingested, 옛 watermark] 사이
# 미적재 항목이 다음 run 의 watermark 필터(entry.published <= watermark)에
# 영구 배제(silent data loss). 미전진하면 다음 run 이 최신부터 재스캔하며
# 적재분은 dedup-skip(_ingest_entry False, cap 미소모)하고 gap 까지 내려가
# 이어 적재 → 백로그가 run 당 cap 씩 소화(livelock 회피). bulk 은 cap 무관.
if newest_seen and not capped:
# 카테고리 워터마크 전진(이번 run 최신 발행일)
if newest_seen:
async with async_session() as session:
src = await session.get(NewsSource, source_id)
_set_watermark(src, category, newest_seen)
+9 -14
View File
@@ -272,20 +272,15 @@ async def _lookup_news_source(
if not source_name:
return None, None, None
# news_sources prefix 매칭 — R10: 전체 로드+Python 루프 대신 DB 필터 푸시다운.
# (name == source_name) OR (name 이 "source_name " 로 시작) = 기존 split[0]==source_name 동치
# (첫 토큰 일치 = 정확일치 또는 'source_name ' prefix). autoescape 로 %/_ 안전.
result = await session.execute(
select(NewsSource)
.where(
(NewsSource.name == source_name)
| NewsSource.name.startswith(source_name + " ", autoescape=True)
)
.limit(1)
)
src = result.scalars().first()
if src is not None:
return src.country, src.name, src.language
# news_sources에서 이름이 일치하는 레코드 찾기 (prefix match)
result = await session.execute(select(NewsSource))
sources = result.scalars().all()
for src in sources:
if source_name and (
src.name.split(" ")[0] == source_name
or src.name.startswith(source_name + " ")
):
return src.country, src.name, src.language
logger.warning(
f"[chunk] news_source 매핑 실패: doc_id={doc.id} ai_sub_group={source_name!r} "
+1 -3
View File
@@ -563,9 +563,7 @@ async def process(
doc.facet_doctype = ai_doctype
# ─── ai_suggestion 저장 (자료실 승인 대기함 제안, §1) ───
# R9: 기존 제안(material_type 제안 등) 우선 — doc.ai_suggestion is None 가드 추가
# (material 제안 블록과 대칭). 없으면 거래문서 제안이 기존 제안을 clobber('기존 제안 우선' 위반).
if ai_doctype in LIBRARY_SUGGESTION_DOCTYPES and doc.ai_suggestion is None:
if ai_doctype in LIBRARY_SUGGESTION_DOCTYPES:
year = doc.facet_year or datetime.now(timezone.utc).year
doc.ai_suggestion = {
"proposed_category": "library",
+19 -31
View File
@@ -5,8 +5,7 @@ DEVONthink/OmniFocus → PostgreSQL/CalDAV 쿼리로 전환.
SMTP 발송은 2026-06-10 제거 ( 번도 전달 성공한 없는 기능 폐기 결정).
"""
import asyncio
from datetime import datetime, time, timedelta, timezone
from datetime import datetime, timezone
from zoneinfo import ZoneInfo
from pathlib import Path
@@ -21,36 +20,17 @@ from models.queue import ProcessingQueue
logger = setup_logger("daily_digest")
def _write_and_rotate(digest_dir: Path, today: str, markdown: str) -> Path:
"""digest 파일 저장 + 90일 초과 아카이브 이동 (blocking — caller 가 to_thread, R8)."""
digest_dir.mkdir(parents=True, exist_ok=True)
digest_path = digest_dir / f"{today}_digest.md"
digest_path.write_text(markdown, encoding="utf-8")
archive_dir = digest_dir / "archive"
archive_dir.mkdir(exist_ok=True)
cutoff = datetime.now(timezone.utc).timestamp() - (90 * 86400)
for old in digest_dir.glob("*_digest.md"):
if old.stat().st_mtime < cutoff:
old.rename(archive_dir / old.name)
return digest_path
async def run():
"""일일 다이제스트 생성 + 저장 + 발송"""
# KST 기준 오늘 (cron 이 KST timezone fix 후 20:00 KST 에 fire).
kst = ZoneInfo("Asia/Seoul")
today = datetime.now(kst).date()
# KST 하루를 UTC 범위로 변환 (R8) — func.date(created_at)는 pg TimeZone(UTC) 기준 날짜라
# KST 0~9시 생성 문서(UTC 전날)가 누락되던 경계 버그. created_at(UTC저장) 범위 비교로.
start_utc = datetime.combine(today, time.min, tzinfo=kst).astimezone(timezone.utc)
end_utc = start_utc + timedelta(days=1)
# KST 기준 오늘 (cron 이 KST timezone fix 후 20:00 KST 에 fire). date 객체로 비교 — Document.created_at::date 와 직접 매칭.
today = datetime.now(ZoneInfo("Asia/Seoul")).date()
sections = []
async with async_session() as session:
# ─── 1. 오늘 추가된 문서 ───
added = await session.execute(
select(Document.ai_domain, func.count(Document.id))
.where(Document.created_at >= start_utc, Document.created_at < end_utc)
.where(func.date(Document.created_at) == today)
.group_by(Document.ai_domain)
)
added_rows = added.all()
@@ -69,8 +49,7 @@ async def run():
select(Document.title)
.where(
Document.source_channel == "law_monitor",
Document.created_at >= start_utc,
Document.created_at < end_utc,
func.date(Document.created_at) == today,
)
)
law_rows = law_docs.scalars().all()
@@ -87,8 +66,7 @@ async def run():
select(func.count(Document.id))
.where(
Document.source_channel == "email",
Document.created_at >= start_utc,
Document.created_at < end_utc,
func.date(Document.created_at) == today,
)
)
email_total = email_count.scalar() or 0
@@ -123,7 +101,7 @@ async def run():
)
failed_count = failed.scalar() or 0
if failed_count > 0:
section += f"\n**[주의] 실패 {failed_count}건** — 수동 확인 필요\n"
section += f"\n⚠️ **실패 {failed_count}건** — 수동 확인 필요\n"
sections.append(section)
# ─── 5. Inbox 미분류 ───
@@ -141,8 +119,18 @@ async def run():
markdown += "\n".join(sections)
markdown += f"\n---\n*생성: {datetime.now(timezone.utc).isoformat()}*\n"
# ─── NAS 저장 + 90일 아카이브 (blocking 파일 I/O off-thread, R8/R5 일관) ───
# ─── NAS 저장 ───
digest_dir = Path(settings.nas_mount_path) / "PKM" / "Archive" / "digests"
digest_path = await asyncio.to_thread(_write_and_rotate, digest_dir, str(today), markdown)
digest_dir.mkdir(parents=True, exist_ok=True)
digest_path = digest_dir / f"{today}_digest.md"
digest_path.write_text(markdown, encoding="utf-8")
# ─── 90일 초과 아카이브 ───
archive_dir = digest_dir / "archive"
archive_dir.mkdir(exist_ok=True)
cutoff = datetime.now(timezone.utc).timestamp() - (90 * 86400)
for old in digest_dir.glob("*_digest.md"):
if old.stat().st_mtime < cutoff:
old.rename(archive_dir / old.name)
logger.info(f"다이제스트 생성 완료: {digest_path}")
+2 -6
View File
@@ -144,13 +144,9 @@ async def process(
logger.info(f"[deep] id={document_id} 맥북 일시 불가 — 보류 (deferred)")
raise
except Exception as exc:
# 호출 실패(네트워크/API 5xx 등)는 삼키지 않고 전파 (R3) — queue_consumer 가
# attempts 소진까지 재시도 후 status=failed(dead-letter)로 가시화한다. 삼키면
# worker_fn 이 정상 반환 → 큐가 completed 로 확정 → ai_detail_summary 영구 누락 +
# tier 가 triage 에 고착(silent 영구 손실). extract/marker/fulltext/stt 정본과 일치.
# 완주 전 doc 쓰기(168~)는 일어나지 않으므로 부분 쓰기 0 (sleep-안전).
logger.warning(f"[deep] 호출 실패 id={document_id} model={used_cfg.model}: {exc}")
raise
parse_error = "call_failed"
raw = ""
finally:
await client.close()
-65
View File
@@ -1,65 +0,0 @@
"""delete_file=true 로 요청된 문서의 NAS 원본을 grace 후 물리삭제 (R7 retention sweep).
purge_requested_at 마커 기준(deleted_at 아님 일반 soft-delete/숨김은 파일 보존, undelete
가능). grace(30) 경과 + 파일 존재 unlink + AUDIT 로그. 파일 존재 체크로 멱등
(재실행 이미 삭제된 skip). 요청 경로(DELETE) 동기 비가역 op 0 모두 cron 으로.
"""
import asyncio
import logging
from datetime import datetime, timedelta, timezone
from pathlib import Path
from sqlalchemy import select
from core.config import settings
from core.database import async_session
from models.document import Document
logger = logging.getLogger("purge_sweep")
PURGE_GRACE_DAYS = 30
def _unlink_if_exists(p: Path) -> bool:
"""파일이 있으면 unlink (blocking — caller 가 to_thread). 존재 여부 반환(멱등)."""
if p.exists():
p.unlink()
return True
return False
async def run() -> int:
"""purge 요청 + grace 경과 문서의 NAS 원본 물리삭제. 삭제 건수 반환."""
cutoff = datetime.now(timezone.utc) - timedelta(days=PURGE_GRACE_DAYS)
async with async_session() as session:
rows = (
await session.execute(
select(Document.id, Document.file_path, Document.purge_requested_at).where(
Document.purge_requested_at.is_not(None),
Document.purge_requested_at < cutoff,
Document.file_path.is_not(None),
)
)
).all()
purged = 0
for doc_id, file_path, requested_at in rows:
nas_path = Path(settings.nas_mount_path) / file_path
try:
existed = await asyncio.to_thread(_unlink_if_exists, nas_path)
if existed:
purged += 1
# AUDIT — 물리삭제 기록 (가시화). doc_id / 경로 / 요청일 / grace.
logger.warning(
"PURGE doc_id=%s file=%s requested_at=%s grace_days=%s",
doc_id,
file_path,
requested_at.isoformat() if requested_at else None,
PURGE_GRACE_DAYS,
)
except OSError as e:
logger.error("PURGE 실패 doc_id=%s file=%s: %s", doc_id, file_path, e)
if purged:
logger.info("[purge_sweep] NAS 원본 %d건 물리삭제 (grace %d일)", purged, PURGE_GRACE_DAYS)
return purged
+3 -14
View File
@@ -17,7 +17,6 @@ Web/Blog ingest (devonagent 트랙, plan db-snuggly-petal.md):
- sidecar (.json) 누락 : skip 하고 ingest, web_meta.sidecar_missing=true
"""
import asyncio
import hashlib
import json
from pathlib import Path
@@ -137,10 +136,6 @@ def _canonicalize_url(url: str) -> str:
같은 글의 utm 변형 (`?utm_source=foo`) fragment 변형 (`#section`) 을
row 수렴시키기 위해 file_hash 산출 반드시 거친다.
R11c: news_collector._normalize_url(news 채널) 의도적으로 다르다 이쪽(web_clip)
query-sort/trailing-slash/소문자화로 공격적 정규화하지만, news 쪽은 query-식별 사이트의
별개 기사 붕괴 방지를 위해 보수적이다. 함수 통합 금지(채널별 dedup 의도가 다름).
"""
if not url:
return ""
@@ -251,8 +246,7 @@ async def watch_inbox():
async with async_session() as session:
# ─── Web/ 트랙 (devonagent) — DEVONthink Smart Rule 이 떨군 .html 만 진입 ───
if web_root.exists():
# rglob NFS 디렉토리 walk(blocking stat 다발)를 off-thread 로 수집 (R5).
for file_path in await asyncio.to_thread(lambda: list(web_root.rglob("*.html"))):
for file_path in web_root.rglob("*.html"):
if not file_path.is_file() or should_skip(file_path):
continue
rel_path = str(file_path.relative_to(nas_root))
@@ -270,8 +264,7 @@ async def watch_inbox():
Path(sub).name, (None, None, None)
)
# NFS 디렉토리 walk(blocking) off-thread 수집 (R5).
for file_path in await asyncio.to_thread(lambda: list(scan_root.rglob("*"))):
for file_path in scan_root.rglob("*"):
if not file_path.is_file() or should_skip(file_path):
continue
@@ -285,11 +278,7 @@ async def watch_inbox():
continue
rel_path = str(file_path.relative_to(nas_root))
# GB 파일 SHA-256 은 이벤트 루프를 점유 → 같은 루프의 모든 1분 주기 consumer
# + FastAPI 요청이 수십초~분 동시 정지. to_thread 오프로드. 스캔 루프가 이미
# 순차라 file_hash 는 한 번에 하나만 실행(직렬화) — 병렬 해싱 X = NFS 2.5GbE
# 대역폭·버퍼 메모리 blowup 방지 (R5).
fhash = await asyncio.to_thread(file_hash, file_path)
fhash = file_hash(file_path)
result = await session.execute(
select(Document).where(Document.file_path == rel_path)
-8
View File
@@ -297,10 +297,6 @@ async def collect_disaster_cases(session) -> int:
await _ingest_attachment(session, boardno, filenm, filepath)
except FeedError as e:
logger.warning(f"[kosha] 첨부 실패 skip ({boardno}/{filenm}): {e}")
# 케이스 단위 commit (R4) — 이후 페이지/케이스의 _api_get 실패가 앞서 적재한
# 케이스까지 전체 rollback 하지 않게 부분 적재 보존 (csb/api_standards idiom).
await session.commit()
if page_all_dup:
break # 등록일 역순 — 페이지 전체가 기존이면 이후 페이지도 기존
@@ -378,8 +374,6 @@ async def collect_fatal_accidents(session) -> int:
await enqueue_stage(session, doc.id, "embed")
await enqueue_stage(session, doc.id, "chunk")
new_count += 1
# 케이스 단위 commit (R4) — 이후 페이지 실패가 앞 케이스 전체 rollback 방지.
await session.commit()
if page_all_dup:
break # 등록일 역순 — 페이지 전체가 기존이면 이후 페이지도 기존
@@ -456,8 +450,6 @@ async def collect_kosha_guide(session, cap: int = _GUIDE_DAILY_CAP) -> int:
await session.flush()
await enqueue_stage(session, doc.id, "extract")
ingested += 1
# 항목 단위 commit (R4) — 다운로드 실패가 앞서 적재한 GUIDE 항목 전체 rollback 방지.
await session.commit()
# silent cap 금지 — 잔량 가시화 (자동 점진 백필: 내일 cap 만큼 또 소화)
logger.info(f"[kosha] GUIDE 신규/개정 {len(new_specs)}건 중 {ingested}건 ingest"
+1 -5
View File
@@ -39,11 +39,7 @@ from models.queue import ProcessingQueue
logger = logging.getLogger(__name__)
# 마크다운 추출 엔드포인트. compose env `MARKER_ENDPOINT`(base URL)에서 읽는다 —
# 기본=marker(무변), 컷오버=`http://mineru-service:3301` 로 env 플립만으로 전환.
# marker/mineru 가 동일 /convert 계약(file_path·start/end·md+base64 images)이라 워커 무변.
_MARKDOWN_BASE = os.getenv("MARKER_ENDPOINT", "http://marker-service:3300").rstrip("/")
MARKER_ENDPOINT = _MARKDOWN_BASE if _MARKDOWN_BASE.endswith("/convert") else _MARKDOWN_BASE + "/convert"
MARKER_ENDPOINT = "http://marker-service:3300/convert"
MARKER_TIMEOUT = 300 # 큰 PDF 5 분 한도
MAX_PAGES = 200 # 소형 1-shot 경로 /convert max_pages 안전장치
+101 -65
View File
@@ -83,10 +83,6 @@ def _normalize_url(url: str) -> str:
query 전체 제거 금지: hada.io/topic?id= · aitimes articleView.html?idxno= ·
HN item?id= query-식별 사이트에서 별개 기사가 같은 URL 붕괴된다.
저장(edit_url)·조회 양쪽이 함수를 공유해야 dedup 성립.
R11c: file_watcher._canonicalize_url(web_clip 채널) 의도적으로 다르다 이쪽은 콘텐츠
식별 query 보존(별개 기사 붕괴 방지) 핵심이라 query-sort/trailing-slash/소문자화를 한다.
함수 통합 금지(news dedup 깨짐). 채널별 normalization 의도된 설계.
"""
parsed = urlparse(url)
kept = [
@@ -401,55 +397,6 @@ def _doc_identity(source: NewsSource, source_short: str, category: str) -> dict:
}
async def _already_ingested(session, article_id: str, normalized_url: str, link: str) -> bool:
"""이미 적재된 기사인지 — file_hash 또는 정규화/raw edit_url 매칭 (3 fetch 공통, R11c).
레거시 raw URL + 교차 게시 다중 매칭 내성(first). _fetch_rss/_fetch_api_guardian/
_fetch_api_nyt 복제하던 동일 존재체크를 단일화.
"""
existing = await session.execute(
select(Document).where(
(Document.file_hash == article_id)
| (Document.edit_url.in_([normalized_url, link]))
).limit(1)
)
return existing.scalars().first() is not None
def _build_news_doc(source, ident, source_short, article_id, title, body,
extractor_version, normalized_url, pub_dt) -> Document:
"""3 fetch 공통 뉴스 Document 빌더 (R11c). 채널별 차이는 인자로만 — body(NYT=summary)·
extractor_version·ident(category 계산 차이 흡수) 다르고 22 필드 구조는 정적 동일.
edit_url 조회와 동일 정규화 저장(raw 저장 URL dedup 무력화)."""
return Document(
file_path=f"{ident['path_prefix']}/{source.name}/{article_id}",
file_hash=article_id,
file_format="article",
file_size=len(body.encode()),
file_type="note",
title=title,
extracted_text=f"{title}\n\n{body}",
extracted_at=datetime.now(timezone.utc),
extractor_version=extractor_version,
# article = 텍스트 네이티브 → 생성 시점 terminal 'skipped' 명시(markdown 변환 비대상,
# 미명시 시 'pending' 영구 비수렴 → backlog 지표 오염). page 정책은 fulltext_worker 승격.
md_status="skipped",
md_extraction_error="news article: 텍스트 네이티브, markdown 변환 비대상",
source_channel=source.source_channel,
data_origin="external",
edit_url=normalized_url,
review_status="approved",
ai_domain=ident["ai_domain"],
ai_sub_group=source_short,
ai_tags=ident["ai_tags"],
# 안전 자료실 A-2 — 레지스트리 deterministic (classify-skip 경로라 ingest 시점 필수)
material_type=ident["material_type"],
jurisdiction=ident["jurisdiction"],
published_date=pub_dt.date() if pub_dt else None,
extract_meta=_build_extract_meta(source, pub_dt),
)
async def _fetch_rss(session, source: NewsSource) -> tuple[int, str]:
"""RSS 피드 수집 — redirect 재검증 + 크기/content-type 제한 + 조건부 GET (A-1).
@@ -568,7 +515,13 @@ async def _fetch_rss(session, source: NewsSource) -> tuple[int, str]:
article_id = _article_hash(title, pub_dt.strftime("%Y%m%d"), source.name)
normalized_url = _normalize_url(link)
if await _already_ingested(session, article_id, normalized_url, link):
existing = await session.execute(
select(Document).where(
(Document.file_hash == article_id) |
(Document.edit_url.in_([normalized_url, link]))
).limit(1)
)
if existing.scalars().first():
continue
# A-6 2차: 포털 전재 dedup (first-wins — 먼저 적재된 쪽이 정본)
@@ -580,9 +533,35 @@ async def _fetch_rss(session, source: NewsSource) -> tuple[int, str]:
source_short = source.name.split(" ")[0] # "경향신문 문화" → "경향신문"
ident = _doc_identity(source, source_short, category)
doc = _build_news_doc(
source, ident, source_short, article_id, title, body,
extractor_version, normalized_url, pub_dt,
doc = Document(
file_path=f"{ident['path_prefix']}/{source.name}/{article_id}",
file_hash=article_id,
file_format="article",
file_size=len(body.encode()),
file_type="note",
title=title,
extracted_text=f"{title}\n\n{body}",
extracted_at=datetime.now(timezone.utc),
extractor_version=extractor_version,
# article = 텍스트 네이티브(본문=extracted_text). markdown 단계 미enqueue 라
# 기본값 'pending' 이면 영구 비수렴 → backlog 지표 오염 + md_status_pending partial
# 인덱스 비대. 생성 시점에 terminal 'skipped' 로 명시(변환 비대상).
# fulltext_policy='page' 소스는 fulltext_worker 가 승격 시 success 로 갱신.
md_status="skipped",
md_extraction_error="news article: 텍스트 네이티브, markdown 변환 비대상",
source_channel=source.source_channel,
data_origin="external",
# 조회와 동일하게 정규화해 저장 — raw(tracking param 포함) 저장 시 URL dedup 무력화
edit_url=normalized_url,
review_status="approved",
ai_domain=ident["ai_domain"],
ai_sub_group=source_short,
ai_tags=ident["ai_tags"],
# 안전 자료실 A-2 — 레지스트리 deterministic (classify-skip 경로라 ingest 시점 필수)
material_type=ident["material_type"],
jurisdiction=ident["jurisdiction"],
published_date=pub_dt.date() if pub_dt else None,
extract_meta=_build_extract_meta(source, pub_dt),
)
session.add(doc)
await session.flush()
@@ -679,7 +658,13 @@ async def _fetch_api_guardian(session, source: NewsSource) -> tuple[int, str]:
normalized_url = _normalize_url(link)
# RSS 수집부와 동일: 레거시 raw URL + 교차 게시 다중 매칭 내성 (first)
if await _already_ingested(session, article_id, normalized_url, link):
existing = await session.execute(
select(Document).where(
(Document.file_hash == article_id) |
(Document.edit_url.in_([normalized_url, link]))
).limit(1)
)
if existing.scalars().first():
continue
if await _is_portal_duplicate(session, title):
@@ -690,9 +675,30 @@ async def _fetch_api_guardian(session, source: NewsSource) -> tuple[int, str]:
source_short = source.name.split(" ")[0]
ident = _doc_identity(source, source_short, category)
doc = _build_news_doc(
source, ident, source_short, article_id, title, body,
"guardian_api_full" if is_full else "guardian_api", normalized_url, pub_dt,
doc = Document(
file_path=f"{ident['path_prefix']}/{source.name}/{article_id}",
file_hash=article_id,
file_format="article",
file_size=len(body.encode()),
file_type="note",
title=title,
extracted_text=f"{title}\n\n{body}",
extracted_at=datetime.now(timezone.utc),
extractor_version="guardian_api_full" if is_full else "guardian_api",
md_status="skipped",
md_extraction_error="news article: 텍스트 네이티브, markdown 변환 비대상",
source_channel=source.source_channel,
data_origin="external",
edit_url=normalized_url,
review_status="approved",
ai_domain=ident["ai_domain"],
ai_sub_group=source_short,
ai_tags=ident["ai_tags"],
# 안전 자료실 A-2 — 레지스트리 deterministic (classify-skip 경로라 ingest 시점 필수)
material_type=ident["material_type"],
jurisdiction=ident["jurisdiction"],
published_date=pub_dt.date() if pub_dt else None,
extract_meta=_build_extract_meta(source, pub_dt),
)
session.add(doc)
await session.flush()
@@ -749,7 +755,13 @@ async def _fetch_api_nyt(session, source: NewsSource) -> tuple[int, str]:
normalized_url = _normalize_url(link)
# RSS 수집부와 동일: 레거시 raw URL + 교차 게시 다중 매칭 내성 (first)
if await _already_ingested(session, article_id, normalized_url, link):
existing = await session.execute(
select(Document).where(
(Document.file_hash == article_id) |
(Document.edit_url.in_([normalized_url, link]))
).limit(1)
)
if existing.scalars().first():
continue
if await _is_portal_duplicate(session, title):
@@ -760,9 +772,33 @@ async def _fetch_api_nyt(session, source: NewsSource) -> tuple[int, str]:
source_short = source.name.split(" ")[0]
ident = _doc_identity(source, source_short, category)
doc = _build_news_doc(
source, ident, source_short, article_id, title, summary,
"nyt_api", normalized_url, pub_dt,
doc = Document(
file_path=f"{ident['path_prefix']}/{source.name}/{article_id}",
file_hash=article_id,
file_format="article",
file_size=len(summary.encode()),
file_type="note",
title=title,
extracted_text=f"{title}\n\n{summary}",
extracted_at=datetime.now(timezone.utc),
extractor_version="nyt_api",
# article = 텍스트 네이티브(본문=extracted_text). markdown 단계 미enqueue 라
# 기본값 'pending' 이면 영구 비수렴 → backlog 지표 오염 + md_status_pending partial
# 인덱스 비대. 생성 시점에 terminal 'skipped' 로 명시(변환 비대상).
md_status="skipped",
md_extraction_error="news article: 텍스트 네이티브, markdown 변환 비대상",
source_channel=source.source_channel,
data_origin="external",
edit_url=normalized_url,
review_status="approved",
ai_domain=ident["ai_domain"],
ai_sub_group=source_short,
ai_tags=ident["ai_tags"],
# 안전 자료실 A-2 — 레지스트리 deterministic (classify-skip 경로라 ingest 시점 필수)
material_type=ident["material_type"],
jurisdiction=ident["jurisdiction"],
published_date=pub_dt.date() if pub_dt else None,
extract_meta=_build_extract_meta(source, pub_dt),
)
session.add(doc)
await session.flush()
+1 -8
View File
@@ -331,13 +331,11 @@ async def run(bulk: bool = False, limit: int = 0) -> None:
filter_str = (build_issn_filter(wm_key, watermark) if kind == "issn"
else build_filter(wm_key, watermark))
newest: str | None = None
capped = False # 이번 run 이 cap 으로 시드 중도 절단됐는지 (R4)
cursor = "*"
max_pages = (10**6 if bulk else _MAX_PAGES_PER_KW)
try:
for _page in range(max_pages):
if inserted >= run_cap:
capped = True
break
text = await _fetch(client, key, filter_str, cursor)
_count, next_cursor, works = parse_openalex_works(text)
@@ -355,17 +353,12 @@ async def run(bulk: bool = False, limit: int = 0) -> None:
else:
await session.rollback()
if inserted >= run_cap:
capped = True
break
await asyncio.sleep(_REQ_SLEEP)
if not next_cursor:
break
cursor = next_cursor
# cap 절단 시 워터마크 미전진 — 미페치 works 가 다음 run 의 watermark 필터
# (publication_date > watermark)에 영구 배제되는 silent loss 방지. 미전진하면
# 다음 run 이 옛 watermark 부터 재페치하며 적재분 dedup-skip(cap 미소모) 후
# 이어 적재 → 백로그 run 당 cap 소화 (R4). bulk 은 cap 무관.
if newest and not capped:
if newest:
async with async_session() as session:
src = await session.get(NewsSource, source_id)
_set_watermark(src, wm_key, newest)
+142
View File
@@ -0,0 +1,142 @@
"""Phase 2A 후보 임베딩 백필 CLI (embedding-phase2a-1 E-1).
docker compose exec -T fastapi python -m workers.phase2a_cand_backfill \
--target qwen06 --doc-id-max 41944 --chunk-id-max 104140 [--batch 32]
설계 원칙 (plan r3):
- resumable/idempotent: 대상 = NOT EXISTS(후보 테이블) 중단/재실행 이어서.
배치 단위 커밋. C-1 백필 게이트 = "후보 카운트 == 동결셋 카운트".
- 동결셋: id <= *_id_max AND 베이스라인 embedding IS NOT NULL (AND docs.deleted_at IS NULL).
cand 테이블은 동결 범위로만 INSERT (retrieval cand path snapshot filter 타는 전제).
- 문서/청크 입력 = production 경로와 동일 구성(embed_worker._build_embed_input /
chunk_worker [제목][섹션][본문]) + plain (instruct prefix 쿼리 전용 G-1 불변식).
- 임베딩 = Ollama /api/embed 배치 호출 (G-1 fixture: 정규화 출력).
- qwen4m CLI 대상이 아님 qwen4 적재 SQL 파생(subvector+l2_normalize), plan E-1.
"""
import argparse
import asyncio
import hashlib
import time
import httpx
from sqlalchemy import text
from core.database import async_session
from core.utils import setup_logger
from models.document import Document
from workers.embed_worker import _build_embed_input
logger = setup_logger("phase2a_cand_backfill")
OLLAMA_EMBED = "http://ollama:11434/api/embed"
TARGETS = {
"qwen06": {
"model": "qwen3-embedding:0.6b", "dim": 1024,
"docs": "documents_cand_qwen06", "chunks": "document_chunks_cand_qwen06",
},
"qwen4": {
"model": "qwen3-embedding:4b", "dim": 2560,
"docs": "documents_cand_qwen4", "chunks": "document_chunks_cand_qwen4",
},
}
async def _embed_batch(client: httpx.AsyncClient, model: str, texts: list[str]) -> list[list[float]]:
r = await client.post(OLLAMA_EMBED, json={"model": model, "input": texts}, timeout=600)
r.raise_for_status()
embs = r.json()["embeddings"]
if len(embs) != len(texts):
raise RuntimeError(f"embed count mismatch: {len(embs)} != {len(texts)}")
return embs
async def backfill_docs(target: dict, doc_id_max: int, batch: int, http: httpx.AsyncClient) -> int:
total = 0
while True:
async with async_session() as session:
rows = (await session.execute(text(f"""
SELECT d.id FROM documents d
WHERE d.id <= :m AND d.embedding IS NOT NULL AND d.deleted_at IS NULL
AND NOT EXISTS (SELECT 1 FROM {target['docs']} c WHERE c.doc_id = d.id)
ORDER BY d.id LIMIT :b
"""), {"m": doc_id_max, "b": batch})).scalars().all()
if not rows:
break
docs = [(await session.get(Document, i)) for i in rows]
inputs = [_build_embed_input(d) for d in docs]
embs = await _embed_batch(http, target["model"], inputs)
for d, inp, e in zip(docs, inputs, embs):
await session.execute(text(f"""
INSERT INTO {target['docs']} (doc_id, embed_input_hash, embedding)
VALUES (:i, :h, cast(:e AS vector))
ON CONFLICT (doc_id) DO NOTHING
"""), {"i": d.id, "h": hashlib.sha256(inp.encode()).hexdigest()[:16], "e": str(e)})
await session.commit()
total += len(rows)
if total % (batch * 10) < batch:
logger.info(f"[{target['docs']}] +{total} (last id={rows[-1]})")
return total
async def backfill_chunks(target: dict, chunk_id_max: int, batch: int, http: httpx.AsyncClient) -> int:
total = 0
while True:
async with async_session() as session:
rows = (await session.execute(text(f"""
SELECT c.id, c.doc_id, c.chunk_index, c.section_title, c.text, d.title
FROM corpus_chunks c JOIN documents d ON d.id = c.doc_id
WHERE c.id <= :m AND c.embedding IS NOT NULL AND d.deleted_at IS NULL
AND NOT EXISTS (SELECT 1 FROM {target['chunks']} k WHERE k.id = c.id)
ORDER BY c.id LIMIT :b
"""), {"m": chunk_id_max, "b": batch})).all()
if not rows:
break
inputs = [
f"[제목] {r.title or ''}\n[섹션] {r.section_title or ''}\n[본문] {r.text}"
for r in rows
]
embs = await _embed_batch(http, target["model"], inputs)
for r, e in zip(rows, embs):
await session.execute(text(f"""
INSERT INTO {target['chunks']} (id, doc_id, chunk_index, section_title, text, embedding)
VALUES (:i, :d, :x, :s, :t, cast(:e AS vector))
ON CONFLICT (id) DO NOTHING
"""), {"i": r.id, "d": r.doc_id, "x": r.chunk_index,
"s": r.section_title, "t": r.text, "e": str(e)})
await session.commit()
total += len(rows)
if total % (batch * 10) < batch:
logger.info(f"[{target['chunks']}] +{total} (last id={rows[-1]})")
return total
async def run(target_key: str, doc_id_max: int, chunk_id_max: int, batch: int) -> None:
target = TARGETS[target_key]
start = time.monotonic()
async with httpx.AsyncClient() as http:
nd = await backfill_docs(target, doc_id_max, batch, http)
nc = await backfill_chunks(target, chunk_id_max, batch, http)
mins = (time.monotonic() - start) / 60
async with async_session() as session:
cd = (await session.execute(text(f"SELECT count(*) FROM {target['docs']}"))).scalar_one()
cc = (await session.execute(text(f"SELECT count(*) FROM {target['chunks']}"))).scalar_one()
logger.info(
f"[{target_key}] 완료 — 이번 run docs +{nd} chunks +{nc} ({mins:.1f}분) · "
f"누적 docs {cd} / chunks {cc} (동결 게이트 = 베이스라인 동결셋 카운트와 일치 확인)"
)
def main() -> None:
p = argparse.ArgumentParser(description="Phase 2A 후보 임베딩 백필 (resumable)")
p.add_argument("--target", required=True, choices=sorted(TARGETS))
p.add_argument("--doc-id-max", type=int, required=True)
p.add_argument("--chunk-id-max", type=int, required=True)
p.add_argument("--batch", type=int, default=32)
a = p.parse_args()
asyncio.run(run(a.target, a.doc_id_max, a.chunk_id_max, a.batch))
if __name__ == "__main__":
main()
+2 -18
View File
@@ -275,15 +275,7 @@ async def _process_stage(stage, worker_fn):
item.status = "completed"
item.completed_at = datetime.now(timezone.utc)
await skip_session.commit()
# 완료 커밋 후 enqueue — 실패가 outer except 로 전파돼 completed 재오픈
# 되지 않게 격리 (R3, 정상 완료 경로와 동일 처리).
try:
await enqueue_next_stage(document_id, stage)
except Exception as enq_err:
logger.error(
f"[{stage}] document_id={document_id} skip(note) 완료됐으나 "
f"다음 단계 enqueue 실패: {enq_err}"
)
await enqueue_next_stage(document_id, stage)
logger.info(f"[{stage}] document_id={document_id} skip (note)")
continue
@@ -301,15 +293,7 @@ async def _process_stage(stage, worker_fn):
item.completed_at = datetime.now(timezone.utc)
await session.commit()
# 완료는 이미 커밋됨. enqueue_next_stage 실패가 outer except 로 전파되면
# completed 항목을 재오픈(pending/failed)해 같은 단계를 재실행 = 비싼 작업 중복
# + 부분 재쓰기. 자체 try 로 격리하고 ERROR 로 가시화한다 (R3).
try:
await enqueue_next_stage(document_id, stage)
except Exception as enq_err:
logger.error(
f"[{stage}] document_id={document_id} 완료됐으나 다음 단계 enqueue 실패: {enq_err}"
)
await enqueue_next_stage(document_id, stage)
logger.info(f"[{stage}] document_id={document_id} 완료")
except StageDeferred as defer:
+1 -3
View File
@@ -102,9 +102,7 @@ async def _process_one(session: AsyncSession, qid: int, client: AIClient) -> boo
try:
async with asyncio.timeout(EMBED_TIMEOUT_S):
vec = await client.embed(text)
except asyncio.CancelledError:
raise # 취소는 전파 — broad except 가 삼키지 않게 명시 (R3)
except Exception as e:
except (asyncio.TimeoutError, Exception) as e:
logger.warning("study_q_embed_failed qid=%s err=%s: %s", qid, type(e).__name__, e)
# 실패 — status='failed'. 직전 embedding 보존.
q.embedding_status = "failed"
+1 -6
View File
@@ -121,12 +121,7 @@ async def process(document_id: int, session: AsyncSession) -> None:
ok = _extract_thumbnail(source, output, seek)
if not ok:
# 썸네일 추출 실패(ffmpeg)는 삼키지 않고 raise (R3) — queue_consumer 가 attempts
# 소진까지 재시도 후 status=failed 로 가시화. silent return 이면 큐가 completed 로
# 확정 + 썸네일 영구 누락 + 재시도/추적 0 (silent skip). 손상 영상이면 failed 로 안착.
raise RuntimeError(
f"thumbnail 추출 실패: document_id={document_id} source={source}"
)
return
doc.thumbnail_path = str(output)
doc.updated_at = datetime.now(timezone.utc)
-8
View File
@@ -52,11 +52,6 @@ DOMAIN_PRIORITY: list[tuple[str, str]] = [
("manual", "source_channel = 'manual'"),
]
# R12: filter_clause 는 SQL 에 직접 보간되므로 이 allowlist(DOMAIN_PRIORITY 출처) 통과분만
# 허용 — 현재 모듈 상수라 injection 경로 0 이나, 외부 입력화 시 즉시 차단하는 final gate
# (retrieval_service 의 _VALID_DOCS_TABLE allowlist 정본 대비 비대칭 해소).
_ALLOWED_FILTER_CLAUSES: frozenset[str] = frozenset(c for _, c in DOMAIN_PRIORITY)
async def _classify_pending(session: AsyncSession) -> int:
return int(await session.scalar(text("""
@@ -71,9 +66,6 @@ async def _enqueue_domain(session: AsyncSession, filter_clause: str, limit: int)
extracted_text 문자열 (LENGTH=0) 제외 classify_worker not doc.extracted_text
truthy 체크라 문자열에서 ValueError raise. 무한 retry 루프 방지.
"""
# R12: SQL 직접 보간 전 allowlist final gate.
if filter_clause not in _ALLOWED_FILTER_CLAUSES:
raise ValueError(f"비허용 filter_clause (allowlist 외): {filter_clause!r}")
sql = text(f"""
INSERT INTO processing_queue (document_id, stage, status, attempts, max_attempts)
SELECT id, 'classify', 'pending', 0, 3
+2
View File
@@ -1,6 +1,8 @@
# hyungi_Document_Server 설정
ai:
gateway:
endpoint: "http://ai-gateway:8080"
models:
# ─── 단일 generation 호스트 routing (2026-05-14 GPU LLM 제거) ───
+32 -24
View File
@@ -54,27 +54,24 @@ services:
start_period: 180s
restart: unless-stopped
# MinerU 2.5 VLM PDFmarkdown 추출 — ★ marker-service 대체(컷오버 2026-06-18, A/B 8/8 PASS).
# 단일카드 markdown VRAM ~10GB(marker)→~5.9GB 고정. fastapi 가 MARKER_ENDPOINT 로 호출.
# 동기 do_parse 버그 회피 위해 server.py 는 async aio_do_parse 사용. 포트 3301.
mineru-service:
build: ./services/mineru
# Phase 1B (2026-05-01): PDFmarkdown 변환. ocr-service 와 별도 컨테이너 (deps 충돌 회피).
marker-service:
build: ./services/marker
ports:
- "127.0.0.1:3301:3301"
- "127.0.0.1:3300:3300"
expose:
- "3301"
- "3300"
environment:
# vlm-engine = 순수 VLM 단일모델. 기본 hybrid-engine 은 다중모델 로드 = OOM(반드시 명시).
- MINERU_BACKEND=vlm-engine
- MINERU_LANG=${MINERU_LANG:-korean}
# 공유 16GB 카드 공존: 절대 VRAM 캡(GB, 공유카드 robust) + vLLM 분율 캡 병용.
- MINERU_VIRTUAL_VRAM_SIZE=${MINERU_VIRTUAL_VRAM_SIZE:-6}
- MINERU_GPU_MEMORY_UTILIZATION=${MINERU_GPU_MEMORY_UTILIZATION:-0.40}
- MINERU_PRELOAD=${MINERU_PRELOAD:-1}
- HF_HOME=/models/huggingface
- TORCH_HOME=/models/torch
# D-1 (crawl-24x7): idle-unload 전환 — 영구 점유(~3.5GB) 해제가 90% 봉투의 전제.
# /ready 는 idle 에서도 200 (fastapi depends_on service_healthy 유지).
# 롤백 = MARKER_PRELOAD=1 + MARKER_IDLE_UNLOAD_MINUTES=0.
- MARKER_PRELOAD=0
- MARKER_IDLE_UNLOAD_MINUTES=${MARKER_IDLE_UNLOAD_MINUTES:-30}
volumes:
- ${NAS_NFS_PATH:-/mnt/nas/Document_Server}:/documents:ro
- mineru_models:/root/.cache
ipc: host # vLLM 공유메모리 — 공식 run 의 --ipc=host 대응.
- marker_models:/models
deploy:
resources:
reservations:
@@ -83,11 +80,11 @@ services:
count: 1
capabilities: [gpu]
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:3301/ready"]
test: ["CMD", "curl", "-f", "http://localhost:3300/ready"]
interval: 30s
timeout: 10s
retries: 3
start_period: 900s # VLM 모델 lazy 다운로드(~2.4GB)+엔진 로드 여유.
start_period: 300s
restart: unless-stopped
stt-service:
@@ -152,7 +149,7 @@ services:
# → 32 한도 초과 → 413. 64 로 늘림.
# GPU VRAM free 6199MiB 충분. baseline path (MAX_RERANK_INPUT=200) 영향 0.
- MAX_BATCH_TOKENS=16384
- MAX_CLIENT_BATCH_SIZE=256 # 2026-06-18 fix: 64→256, MAX_RERANK_INPUT=200 커버 (batch>64 ERROR=RRF silent fallback 해소; MAX_BATCH_TOKENS가 VRAM 상한이라 entries 증가는 VRAM 무관)
- MAX_CLIENT_BATCH_SIZE=64
- MAX_CONCURRENT_REQUESTS=4
volumes:
- reranker_cache:/data
@@ -171,6 +168,19 @@ services:
start_period: 120s
restart: unless-stopped
ai-gateway:
build: ./gpu-server/services/ai-gateway
ports:
- "127.0.0.1:8081:8080"
environment:
- PRIMARY_ENDPOINT=http://100.76.254.116:8801/v1/chat/completions
- FALLBACK_ENDPOINT=http://ollama:11434/v1/chat/completions
- CLAUDE_API_KEY=${CLAUDE_API_KEY:-}
- DAILY_BUDGET_USD=${DAILY_BUDGET_USD:-5.00}
# depends_on: ollama 제거 (2026-06-08) — ollama 서비스가 standalone 으로 이관됨.
# FALLBACK_ENDPOINT 의 ollama:11434 는 standalone(동일 hostname, DS 망 부착)으로 해소.
restart: unless-stopped
fastapi:
build: ./app
ports:
@@ -187,8 +197,7 @@ services:
condition: service_healthy
kordoc-service:
condition: service_healthy
# 마크다운 엔진 = mineru-service (marker-service 제거 2026-06-18, 롤백=git history).
mineru-service:
marker-service:
condition: service_healthy
env_file:
- credentials.env
@@ -196,8 +205,7 @@ services:
- DATABASE_URL=postgresql+asyncpg://pkm:${POSTGRES_PASSWORD}@postgres:5432/pkm
- KORDOC_ENDPOINT=http://kordoc-service:3100
- OCR_ENDPOINT=http://ocr-service:3200
# ★ 컷오버 2026-06-18: marker-service:3300 → mineru-service:3301 (동일 /convert 계약).
- MARKER_ENDPOINT=http://mineru-service:3301
- MARKER_ENDPOINT=http://marker-service:3300
- MARKER_CONTAINER_PATH_PREFIX=/documents
# 2026-05-08 (D9 Track B revised): GPU stt-service 정식 승격, 내부 DNS 사용.
- STT_ENDPOINT=http://stt-service:3300
@@ -275,4 +283,4 @@ volumes:
reranker_cache:
ocr_models:
stt_models:
mineru_models:
marker_models:
@@ -1,18 +1,13 @@
<script lang="ts">
// 문서 상세 좌측 절(section) 목차 (PR-DocSrv-Hier-Section-UI-1).
// - ASME 등 구조화 코드(buildPartOutline.hasParts): front-matter 단일 접이그룹 + PART 접이
// (기본 접힘, 1030 flat → ~14 top-level). scroll-spy/딥링크 진입 시 조상 PART auto-expand. (D8)
// - 그 외(per-doc): groupOrFlat 폴백 — top-segment 1단 그룹 vs flat(5140/5186/비-ASME 무회귀).
// - groupOrFlat 로 per-doc 동적 (top-segment 1단 그룹 vs flat).
// - 항목 클릭 → 인라인 아코디언으로 요약/section_type/heading_path breadcrumb 표시.
import { untrack } from 'svelte';
// - 본문 스크롤 점프 없음(§Q2, deep-link 는 follow-up). summary=NULL 은 "요약 없음" 문구.
import Badge from '$lib/components/ui/Badge.svelte';
import {
cleanHeading,
pathSegments,
groupOrFlat,
buildPartOutline,
partGroupViews,
groupKeyByChunkId,
sectionTypeLabel,
type DocumentSection,
type OutlineItem,
@@ -22,38 +17,14 @@
sections: DocumentSection[];
/** 항목 클릭 시 본문 점프 콜백(부모가 #sec-{chunkId} scrollIntoView). 없으면 아코디언만. */
onJump?: (chunkId: number) => void;
/** scroll-spy 현재 절(chunk_id) — 강조 + Part auto-expand. */
/** scroll-spy 현재 절(chunk_id) — 강조. */
activeKey?: number | null;
}
let { sections, onJump, activeKey = null }: Props = $props();
let partOutline = $derived(buildPartOutline(sections));
// hasParts(ASME 등): Part 접이 모드. 아니면 partViews=null → groupOrFlat 폴백.
let partViews = $derived(partOutline.hasParts ? partGroupViews(partOutline) : null);
let layout = $derived.by(() => (partOutline.hasParts ? null : groupOrFlat(sections)));
let groupIndex = $derived(partViews ? groupKeyByChunkId(partViews) : null);
let layout = $derived(groupOrFlat(sections));
let total = $derived(sections.length);
let selectedId = $state<number | null>(null);
// Part 그룹 접이 상태: key 없으면 접힘(기본 전부 접힘). $state Record = Svelte5 deep-proxy 반응형.
let expanded = $state<Record<string, boolean>>({});
function toggleGroup(key: string) {
expanded[key] = !expanded[key];
}
// 문서 전환(DocumentViewer 가 sections prop 교체) 시 접이/선택 리셋 — 문서 간 PART 라벨/chunk_id 가
// 우연히 겹쳐 이전 펼침/선택이 이월되는 것 차단(기본 전부 접힘 불변식 보존). untrack=쓰기 자기재발화 차단.
$effect(() => {
void sections;
untrack(() => { expanded = {}; selectedId = null; });
});
// scroll-spy/딥링크 활성 절의 조상 Part 를 펼침(다른 그룹은 건드리지 않음). untrack=쓰기 자기재발화 차단.
$effect(() => {
const ak = activeKey;
const idx = groupIndex;
if (ak == null || !idx) return;
const gk = idx.get(ak);
if (gk) untrack(() => { expanded[gk] = true; });
});
function toggle(item: OutlineItem) {
const id = item.section.chunk_id;
@@ -124,37 +95,7 @@
<span class="text-faint font-normal">{total}</span>
</h3>
{#if partViews}
<!-- Part 접이 모드 (ASME 등): front-matter 단일 그룹 + PART 접이, 기본 접힘 -->
<div class="space-y-1">
{#each partViews as g (g.key)}
{@const isOpen = !!expanded[g.key]}
<div>
<button
type="button"
onclick={() => toggleGroup(g.key)}
aria-expanded={isOpen}
class={[
'w-full flex items-center gap-1.5 px-2 py-1.5 rounded-md text-[11px] font-semibold uppercase tracking-wide transition-colors',
g.isFrontMatter ? 'text-faint' : 'text-dim',
'hover:bg-surface hover:text-text',
].join(' ')}
>
<span class="shrink-0 transition-transform duration-150 {isOpen ? 'rotate-90' : ''}"></span>
<span class="flex-1 min-w-0 text-left truncate normal-case">{g.label}</span>
<span class="font-normal text-faint">{g.items.length}</span>
</button>
{#if isOpen}
<ul class="space-y-0.5 mt-0.5">
{#each g.items as item (item.section.chunk_id)}
{@render itemRow(item)}
{/each}
</ul>
{/if}
</div>
{/each}
</div>
{:else if layout?.mode === 'group'}
{#if layout.mode === 'group'}
<div class="space-y-3">
{#each layout.groups as g (g.key)}
<div>
@@ -177,7 +118,7 @@
</div>
{:else}
<ul class="space-y-0.5">
{#each layout?.items ?? [] as item (item.section.chunk_id)}
{#each layout.items as item (item.section.chunk_id)}
{@render itemRow(item)}
{/each}
</ul>
+39 -16
View File
@@ -65,19 +65,6 @@ docMarked.use({
`</figure>`
);
},
// 외부 링크(http/https) → 새 탭 + rel=noopener noreferrer (탭내빙 차단). 521건 실재.
// 내부/프래그먼트/상대 링크는 손대지 않음 — `#` anchor 는 gfmHeadingId/outline 경로 유지
// (클릭 인터셉터 없음 → 충돌 0), 상대 .md(코퍼스 0건)는 기본 동작(inert). marked 15 토큰객체 시그니처.
link(token: any): string {
const href = (token?.href ?? '') as string;
const text = this.parser.parseInline(token?.tokens ?? []);
const titleAttr = token?.title ? ` title="${escAttr(token.title as string)}"` : '';
const safeHref = escAttr(href);
if (/^https?:\/\//i.test(href)) {
return `<a href="${safeHref}"${titleAttr} target="_blank" rel="noopener noreferrer">${text}</a>`;
}
return `<a href="${safeHref}"${titleAttr}>${text}</a>`;
},
},
});
@@ -95,8 +82,6 @@ const SANITIZE_OPTS = {
'data-md-image-internal',
'data-md-image-alt',
'loading',
'target',
'rel',
],
ADD_TAGS: ['figure', 'figcaption'],
FORBID_TAGS: ['script', 'iframe', 'object', 'embed', 'link', 'meta'],
@@ -141,11 +126,49 @@ function _protectMath(text: string, slots: string[]): string {
});
}
// ── 이미지 pre-render ─────────────────────────────────────────────────────────
// docMarked 의 image 렌더러(.use renderer)가 런타임에 미발화하면 `![](docimg:img_NNN)` 가
// 기본 `<img src="docimg:..">` 로 떨어지고, DOMPurify(ALLOW_UNKNOWN_PROTOCOLS:false)가
// `docimg:` 를 미지원 프로토콜로 제거 → placeholder 도 이미지도 둘 다 사라진다(수식 토크나이저
// 미발화와 동형 증상). → marked 가 손대기 전에 image ref 를 placeholder figure 로 직접 변환해
// 슬롯 보호(렌더러 발화 여부와 무관). 슬롯/복원 메커니즘은 수식과 공유.
const _IMG_RE = /!\[([^\]]*)\]\(([^)\s]+)\)/g;
function _imagePlaceholder(alt: string, href: string): string {
const isInternal = href.startsWith('docimg:');
const basename = href.split('/').pop() ?? href;
const labelSrc = alt || basename || '이미지';
const safeHref = escAttr(href);
const safeAlt = escAttr(alt);
const safeLabel = escText(`[이미지: ${labelSrc} — 아직 표시되지 않음]`);
const internalFlag = isInternal ? '1' : '0';
return (
`<figure class="md-image-placeholder" data-md-img="1" data-md-image-src="${safeHref}" data-md-image-internal="${internalFlag}" data-md-image-alt="${safeAlt}">` +
`<div class="md-image-placeholder-card">` +
`<span class="md-image-placeholder-icon" aria-hidden="true">🖼️</span>` +
`<span class="md-image-placeholder-label">${safeLabel}</span>` +
`</div>` +
`</figure>`
);
}
function _protectImages(text: string, slots: string[]): string {
return text.replace(_IMG_RE, (m, alt, href) => {
try {
slots.push(_imagePlaceholder(String(alt ?? ''), String(href ?? '')));
return _MATH_SLOT(slots.length - 1);
} catch {
return m;
}
});
}
export function renderDocMarkdown(text: string | null | undefined): string {
if (!text) return '';
try {
const slots: string[] = [];
const protectedText = _protectMath(text, slots);
// 이미지 먼저 placeholder 로 pre-render(렌더러 우회) → 그 다음 수식. 슬롯 공유.
const protectedText = _protectMath(_protectImages(text, slots), slots);
let html = docMarked.parse(protectedText) as string;
if (slots.length) {
// 블록 수식이 단독 문단이면 marked 가 <p> 로 감싸므로 그 <p> 를 벗겨 블록 수식이 문단에
-214
View File
@@ -7,12 +7,6 @@ import {
pathSegments,
collapseWindows,
groupOrFlat,
buildPartOutline,
partitionOutlineItems,
partGroupViews,
groupKeyByChunkId,
FRONT_MATTER_KEY,
FRONT_MATTER_LABEL,
sectionTypeLabel,
type DocumentSection,
} from './headingPath.ts';
@@ -196,211 +190,3 @@ test('groupOrFlat: 빈 입력 → flat, 항목 0', () => {
assert.equal(layout.mode, 'flat');
assert.equal(layout.items.length, 0);
});
// ── D9: cleanHeading ASME 개정바 ðNÞ strip ──
test('cleanHeading: ASME 개정바 ðNÞ 통째 제거 (가운데 25 안 남김)', () => {
assert.equal(
cleanHeading('<sup>ð</sup>**25**<sup>Þ</sup> **PG-5.4 Size Limits**'),
'PG-5.4 Size Limits',
);
// 개정바 없는 일반 제목은 그대로 (회귀)
assert.equal(cleanHeading('#### **PG-2 SERVICE LIMITATIONS**'.replace(/^#+\s*/, '')), 'PG-2 SERVICE LIMITATIONS');
});
// ── D7: buildPartOutline — front-matter 분리 + PART 그룹 ──
test('buildPartOutline: front-matter 분리 + PART 그룹', () => {
const sections = [
sec({ heading_path: 'TABLE OF CONTENTS', section_title: 'TABLE OF CONTENTS' }),
sec({ heading_path: 'Honors and Awards Committee', section_title: 'Honors and Awards Committee' }),
sec({ heading_path: 'PART PG GENERAL > PG-1 SCOPE', section_title: 'PG-1 SCOPE' }),
sec({ heading_path: 'PART PG GENERAL > PG-2 SERVICE', section_title: 'PG-2 SERVICE' }),
sec({ heading_path: 'PART PW > PW-1 SCOPE', section_title: 'PW-1 SCOPE' }),
];
const o = buildPartOutline(sections);
assert.equal(o.hasParts, true);
assert.equal(o.frontMatter.length, 2); // TOC + Committee
assert.equal(o.groups.length, 2); // PART PG, PART PW
assert.equal(o.groups[0].key, 'PART PG GENERAL');
assert.equal(o.groups[0].items.length, 2); // PG-1, PG-2
assert.equal(o.groups[1].key, 'PART PW');
assert.equal(o.groups[1].items.length, 1);
});
test('buildPartOutline: split-parent + window 가 같은 PART 그룹에서 1항목으로 흡수', () => {
const sections = [
sec({ heading_path: 'PART PG GENERAL > PG-27 CYL', section_title: 'PG-27 CYL', node_type: 'section_split', chunk_id: 100, text: 'PG-27 CYL' }),
sec({ heading_path: 'PART PG GENERAL > PG-27 CYL', section_title: 'PG-27 CYL', node_type: 'window', parent_id: 100, text: 'body part 1' }),
sec({ heading_path: 'PART PG GENERAL > PG-27 CYL', section_title: 'PG-27 CYL', node_type: 'window', parent_id: 100, text: 'body part 2' }),
];
const o = buildPartOutline(sections);
assert.equal(o.hasParts, true);
assert.equal(o.groups.length, 1);
assert.equal(o.groups[0].items.length, 1); // split-parent + 2 window → 1 항목
assert.equal(o.groups[0].items[0].fragmentCount, 2);
});
test('buildPartOutline: content part 없으면 hasParts=false (폴백 신호)', () => {
const o = buildPartOutline([sec({ heading_path: 'Intro', section_title: 'Intro' })]);
assert.equal(o.hasParts, false);
assert.equal(o.groups.length, 0);
});
test('buildPartOutline: PART/SUBSECTION 마커 없으면(항목코드만) hasParts=false → 폴백', () => {
// 실 ASME 코드(5180/5210)는 PART/SUBSECTION 마커를 갖는다. PART 가 0 인 문서(항목코드만)는
// 접을 PART 가 없으므로 hasParts=false → 호출자가 groupOrFlat/flat 으로 폴백.
const o = buildPartOutline([
sec({ heading_path: 'FOREWORD', section_title: 'FOREWORD' }),
sec({ heading_path: null, section_title: 'U-1 적용범위' }),
]);
assert.equal(o.hasParts, false);
assert.equal(o.groups.length, 0);
});
test('buildPartOutline: (NON)MANDATORY APPENDIX 도 최상위 섹션 경계 — 마지막 PART 흡수 방지', () => {
// 5180 실측: 부록을 마커로 안 잡으면 마지막 PART(PHRSG)가 부록 289항목을 carry-forward 흡수(=300).
const o = buildPartOutline([
sec({ heading_path: 'PART PHRSG REQUIREMENTS > PHRSG-1', section_title: 'PHRSG-1' }),
sec({ heading_path: 'PHRSG-2 SCOPE', section_title: 'PHRSG-2' }), // PHRSG 로 carry
sec({ heading_path: 'MANDATORY APPENDIX IV LOCAL THIN AREAS', section_title: '...' }),
sec({ heading_path: 'IV-1 GENERAL', section_title: 'IV-1' }), // APPENDIX IV 로 carry
sec({ heading_path: 'NONMANDATORY APPENDIX A EXPLANATION', section_title: '...' }),
]);
assert.deepEqual(o.groups.map((g) => [g.key.slice(0, 24), g.items.length]), [
['PART PHRSG REQUIREMENTS', 2], // PHRSG-1 + PHRSG-2(carry), 부록 안 섞임
['MANDATORY APPENDIX IV LO', 2], // 부록 헤딩 + IV-1(carry)
['NONMANDATORY APPENDIX A ', 1],
]);
});
test('buildPartOutline: 본문 cross-ref/문장 false PART 차단 (5210 stale 패턴)', () => {
// 혼합대소문자 'Part D…' · 코드 뒤 비대문자(한글) 문장 'PART UW 규정은…' · 비대문자 코드 'PART 층이…'
// = 전부 본문이라 PART 아님. 깨끗한 PART 0 → hasParts=false → flat 폴백(가짜 그룹 0).
const o = buildPartOutline([
sec({ heading_path: 'Part D, Subpart 3의 해당 재료', section_title: 'Part D…' }),
sec({ heading_path: 'PART UW 규정은 용접에 의해 제작되는', section_title: 'PART UW 규정은…' }),
sec({ heading_path: 'PART 층이 진 구조로 조립되는', section_title: 'PART 층이…' }),
]);
assert.equal(o.hasParts, false);
});
test('buildPartOutline: SUBSECTION 마커도 PART 경계로 인식(Sec VIII)', () => {
const o = buildPartOutline([
sec({ heading_path: 'TOC', section_title: 'TOC' }),
sec({ heading_path: 'SUBSECTION A GENERAL > UG-1', section_title: 'UG-1' }),
sec({ heading_path: 'SUBSECTION B > UW-1', section_title: 'UW-1' }),
]);
assert.equal(o.hasParts, true);
assert.equal(o.frontMatter.length, 1);
assert.deepEqual(o.groups.map((g) => g.key), ['SUBSECTION A GENERAL', 'SUBSECTION B']);
});
// ── D8: partitionOutlineItems — 이미 collapse 된 OutlineItem 재배치(인스턴스 보존) ──
test('partitionOutlineItems: flat outline 의 인스턴스를 그대로 재배치(재-collapse 없음)', () => {
const sections = [
sec({ heading_path: 'TABLE OF CONTENTS', section_title: 'TABLE OF CONTENTS' }),
sec({ heading_path: 'PART PG GENERAL > PG-1 SCOPE', section_title: 'PG-1 SCOPE' }),
sec({ heading_path: 'PART PG GENERAL > PG-2 SERVICE', section_title: 'PG-2 SERVICE' }),
sec({ heading_path: 'PART PW > PW-1 SCOPE', section_title: 'PW-1 SCOPE' }),
];
const flat = collapseWindows(sections); // 컴포넌트의 outline 과 동일 경로
const o = partitionOutlineItems(flat);
assert.equal(o.hasParts, true);
assert.equal(o.frontMatter.length, 1);
assert.equal(o.groups.length, 2);
// ★ 인스턴스 동일성: 재배치된 item 이 flat outline 의 바로 그 객체여야 selectedSectionId 정합.
assert.ok(o.frontMatter[0] === flat[0], 'front-matter item = flat[0] 인스턴스');
assert.ok(o.groups[0].items[0] === flat[1], 'PART PG 첫 item = flat[1] 인스턴스');
assert.ok(o.groups[1].items[0] === flat[3], 'PART PW item = flat[3] 인스턴스');
// chunk_id 집합이 flat 과 정확히 일치(클릭→selectedSectionId 조회 실패 없음).
const flatIds = flat.map((it) => it.section.chunk_id).sort();
const partIds = [...o.frontMatter, ...o.groups.flatMap((g) => g.items)]
.map((it) => it.section.chunk_id).sort();
assert.deepEqual(partIds, flatIds);
});
test('partitionOutlineItems: 비-PART top-segment 항목은 직전 PART 로 carry-forward (marker 트리 불규칙 흡수)', () => {
// ★ 5180 실측 패턴: PART 아래 직접 중첩 안 된 항목('PG-28'·'GENERAL')의 top-segment 가 PART 가
// 아니다 → 단순 segs[0] 그룹핑이면 가짜 그룹 폭발. carry-forward 가 직전 PART 로 흡수해야 한다.
const items = collapseWindows([
sec({ heading_path: 'TOC', section_title: 'TOC' }),
sec({ heading_path: 'PART PG GENERAL > PG-1', section_title: 'PG-1' }),
sec({ heading_path: 'PG-28 EXTERNAL PRESSURE', section_title: 'PG-28' }), // top-seg ≠ PART → carry
sec({ heading_path: 'OPENINGS AND COMPENSATION', section_title: 'OPENINGS' }), // carry
sec({ heading_path: 'PART PW > PW-1', section_title: 'PW-1' }),
sec({ heading_path: 'GENERAL', section_title: 'GENERAL' }), // PART PW 로 carry
]);
const o = partitionOutlineItems(items);
assert.equal(o.hasParts, true);
assert.equal(o.frontMatter.length, 1);
assert.equal(o.groups.length, 2, 'PART PG / PART PW 단 2그룹(가짜 그룹 0)');
assert.equal(o.groups[0].key, 'PART PG GENERAL');
assert.equal(o.groups[0].items.length, 3, 'PG-1 + PG-28 + OPENINGS carry');
assert.equal(o.groups[1].key, 'PART PW');
assert.equal(o.groups[1].items.length, 2, 'PW-1 + GENERAL carry');
// carry 된 항목도 인스턴스 보존(클릭 정합)
assert.ok(o.groups[0].items[1].section.section_title === 'PG-28');
});
test('partitionOutlineItems: buildPartOutline 과 그룹 구조 동치(collapse→partition == partition∘collapse)', () => {
const sections = [
sec({ heading_path: 'PART PG > PG-27 CYL', section_title: 'PG-27 CYL', node_type: 'section_split', chunk_id: 100, text: 'PG-27 CYL' }),
sec({ heading_path: 'PART PG > PG-27 CYL', section_title: 'PG-27 CYL', node_type: 'window', parent_id: 100, text: 'b1' }),
sec({ heading_path: 'PART PG > PG-27 CYL', section_title: 'PG-27 CYL', node_type: 'window', parent_id: 100, text: 'b2' }),
sec({ heading_path: 'PART PW > PW-1', section_title: 'PW-1' }),
];
const viaBuild = buildPartOutline(sections);
const viaPartition = partitionOutlineItems(collapseWindows(sections));
assert.equal(viaBuild.hasParts, viaPartition.hasParts);
assert.deepEqual(viaBuild.groups.map((g) => [g.key, g.items.length]), viaPartition.groups.map((g) => [g.key, g.items.length]));
// window 흡수 후 PART PG 는 1 항목(fragmentCount 2).
assert.equal(viaPartition.groups[0].items.length, 1);
assert.equal(viaPartition.groups[0].items[0].fragmentCount, 2);
});
// ── D8: partGroupViews / groupKeyByChunkId — 렌더 그룹 평탄화 + auto-expand 역인덱스 ──
test('partGroupViews: front-matter 를 첫 그룹(sentinel key)으로, 이어 PART 그룹', () => {
const sections = [
sec({ heading_path: 'TOC', section_title: 'TOC' }),
sec({ heading_path: 'PART PG > PG-1', section_title: 'PG-1' }),
sec({ heading_path: 'PART PW > PW-1', section_title: 'PW-1' }),
];
const views = partGroupViews(buildPartOutline(sections));
assert.equal(views.length, 3);
assert.equal(views[0].key, FRONT_MATTER_KEY);
assert.equal(views[0].label, FRONT_MATTER_LABEL);
assert.equal(views[0].isFrontMatter, true);
assert.equal(views[1].key, 'PART PG');
assert.equal(views[1].label, 'PART PG');
assert.equal(views[1].isFrontMatter, false);
assert.equal(views[2].key, 'PART PW');
// 모든 key 유일(Svelte each key 안전)
const keys = views.map((v) => v.key);
assert.equal(new Set(keys).size, keys.length);
});
test('partGroupViews: front-matter 없으면 PART 그룹만(첫 그룹 sentinel 없음)', () => {
const sections = [
sec({ heading_path: 'PART PG > PG-1', section_title: 'PG-1' }),
sec({ heading_path: 'PART PW > PW-1', section_title: 'PW-1' }),
];
const views = partGroupViews(buildPartOutline(sections));
assert.equal(views.length, 2);
assert.ok(views.every((v) => !v.isFrontMatter));
assert.equal(views[0].key, 'PART PG');
});
test('groupKeyByChunkId: 대표 chunk_id → 소속 group key (auto-expand 역인덱스)', () => {
const sections = [
sec({ chunk_id: 1, heading_path: 'TOC', section_title: 'TOC' }),
sec({ chunk_id: 2, heading_path: 'PART PG > PG-1', section_title: 'PG-1' }),
sec({ chunk_id: 3, heading_path: 'PART PG > PG-2', section_title: 'PG-2' }),
sec({ chunk_id: 4, heading_path: 'PART PW > PW-1', section_title: 'PW-1' }),
];
const views = partGroupViews(buildPartOutline(sections));
const idx = groupKeyByChunkId(views);
assert.equal(idx.get(1), FRONT_MATTER_KEY);
assert.equal(idx.get(2), 'PART PG');
assert.equal(idx.get(3), 'PART PG');
assert.equal(idx.get(4), 'PART PW');
assert.equal(idx.get(999), undefined);
});
-129
View File
@@ -84,9 +84,6 @@ export function sectionTypeLabel(t: string | null | undefined): string | null {
export function cleanHeading(raw: string | null | undefined): string {
if (!raw) return '';
return raw
// D9(read-time): ASME 개정바 ðNÞ(`<sup>ð</sup>**25**<sup>Þ</sup>`) 통째 제거 — 개별 sup strip 전에.
// (일반 sup strip 이 먼저면 가운데 '25'(개정 연도)만 남아 'ð25Þ PG-5.4' → '25 PG-5.4' 오염)
.replace(/<sup>\s*ð\s*<\/sup>.*?<sup>\s*Þ\s*<\/sup>/gi, '')
.replace(/<sup>.*?<\/sup>/gi, '') // 각주 위첨자
.replace(/<sub>.*?<\/sub>/gi, '')
.replace(/<[^>]+>/g, '') // 잔여 HTML 태그
@@ -234,129 +231,3 @@ export function groupOrFlat(sections: DocumentSection[]): OutlineLayout {
}));
return { mode: 'group', items: [], groups };
}
// ── D7/D8 (asme-item-decomp read-time): front-matter 억제 + Part 계층 그룹 ──
// 긴 구조화 코드(ASME)의 절뷰가 flat 1030 으로 길어지는 문제(front-matter 240 + 다중 PART)를
// 표현 계층에서 해결. 빌더/재분해 무접촉 — sections 엔드포인트가 주는 heading_path 만으로 산출.
/**
* top-segment 패턴: 대문자 'PART'/'SUBSECTION'/'(MANDATORY|NONMANDATORY) APPENDIX'
* + (PG/UW/IV/A) + (// ).
* : 'PART PG GENERAL REQUIREMENTS…', 'SUBSECTION A GENERAL', 'NONMANDATORY APPENDIX A EXPLANATION…'.
* (APPENDIX) ASME ( ) PART
* carry-forward (5180 실측: PART PHRSG 11 289 = 300).
*
* case-sensitive + - = cross-ref/ false match (5210 ):
* 'Part D, Subpart 3의 …'() · 'PART UW 규정은 용접에 …'( ) · 'PART 층이 진 …'
* ( ) . D1 _ENG read-time ([[feedback_docstring_invariant_swap_audit]]).
* (D3 ): - () PART
* (: 'PART PG 일반 요건'). false-negative(flat ) false-positive( )
* (5180 ) 5210(D3 stale) flat . **5210 D3 PART
* (//) ** read-time 0. [[project_hierarchical_decomposition]] D3.
*/
const PART_MARKER_RE = /^((MANDATORY |NONMANDATORY )?APPENDIX|PART|SUBSECTION)\s+[A-Z][A-Z0-9.\-]*(\s+[A-Z0-9(].*)?$/;
/** top-segment 문자열이 PART/SUBSECTION/APPENDIX 헤딩인가 (마커 판정 단일 소스 — 경계·carry 공용). */
function isPartMarkerSeg(seg0: string): boolean {
return PART_MARKER_RE.test(seg0);
}
/** 절의 heading_path 첫 세그먼트가 PART/SUBSECTION/APPENDIX 헤딩 = 새 최상위 섹션 경계. */
function isPartMarker(s: DocumentSection): boolean {
const segs = pathSegments(s.heading_path);
return segs.length > 0 && isPartMarkerSeg(segs[0]);
}
export interface PartOutline {
/** PART PG / PART PW … 전(前) front-matter(TOC·위원회·인명) — 단일 접이 그룹용. */
frontMatter: OutlineItem[];
/** 본문 Part 그룹들(heading_path 첫 세그먼트 = PART 기준). 기본 접힘은 렌더(D8)에서. */
groups: OutlineGroup[];
/** content part 경계를 못 찾으면 false → 기존 groupOrFlat 폴백 권장. */
hasParts: boolean;
}
/**
* collapseWindows OutlineItem[] front-matter( PART ) + PART
* ** carry-forward** . (chunk_index) .
*
* carry-forward 핵심: ASME md marker 'PG-28'·'GENERAL'
* heading_path PART ( / ). segs[0]
* 250+ (5180 ). PART/SUBSECTION , -
* PART = ~13 PART .
* OutlineItem (-collapse ) flat outline
* chunk_id· 1:1 ( treeNav selectedSectionId/focusView ).
* PART 0 hasParts=false groupOrFlat/flat .
*/
export function partitionOutlineItems(items: OutlineItem[]): PartOutline {
let boundary = -1;
for (let i = 0; i < items.length; i++) {
if (isPartMarker(items[i].section)) { boundary = i; break; }
}
if (boundary < 0) {
return { frontMatter: [], groups: [], hasParts: false };
}
const frontMatter = items.slice(0, boundary);
const order: string[] = [];
const map = new Map<string, OutlineItem[]>();
let current = ''; // 현재 PART 키 — boundary 가 PART 마커라 첫 본문 항목에서 즉시 설정됨.
for (let i = boundary; i < items.length; i++) {
const it = items[i];
const segs = pathSegments(it.section.heading_path);
if (segs.length && isPartMarkerSeg(segs[0])) current = segs[0]; // 새 PART 경계(경계 루프와 동일 판정 = '' 누출 불가)
if (!map.has(current)) { map.set(current, []); order.push(current); }
map.get(current)!.push(it);
}
const groups: OutlineGroup[] = order.map((key) => ({ key, isOther: false, items: map.get(key)! }));
return { frontMatter, groups, hasParts: true };
}
/**
* front-matter ( content part) + PART(heading_path ) .
* = collapseWindows partitionOutlineItems ( rail/treeNav , sections ).
*/
export function buildPartOutline(sections: DocumentSection[]): PartOutline {
return partitionOutlineItems(collapseWindows(sections));
}
// ── D8: Part 접이 렌더용 — front-matter 를 첫 그룹으로 평탄화 + auto-expand 역인덱스 ──
/** front-matter 접이 그룹의 안정 key/라벨(실 PART 키와 충돌 불가능한 sentinel). */
export const FRONT_MATTER_KEY = '__front_matter__';
export const FRONT_MATTER_LABEL = '문서 정보·서문';
/** 접이 그룹 1개(front-matter 또는 PART) 의 렌더 뷰. */
export interface PartGroupView {
/** Svelte each key + 접이 상태 key. front-matter = FRONT_MATTER_KEY. */
key: string;
/** 헤더 표시 라벨. */
label: string;
isFrontMatter: boolean;
items: OutlineItem[];
}
/**
* PartOutline . front-matter() ,
* PART . /auto-expand key .
*/
export function partGroupViews(outline: PartOutline): PartGroupView[] {
const views: PartGroupView[] = [];
if (outline.frontMatter.length) {
views.push({ key: FRONT_MATTER_KEY, label: FRONT_MATTER_LABEL, isFrontMatter: true, items: outline.frontMatter });
}
for (const g of outline.groups) {
views.push({ key: g.key, label: g.key, isFrontMatter: false, items: g.items });
}
return views;
}
/**
* OutlineItem chunk_id group key (/
* auto-expand ). activeKey/selectedSectionId chunk_id .
*/
export function groupKeyByChunkId(views: PartGroupView[]): Map<number, string> {
const m = new Map<number, string>();
for (const v of views) for (const it of v.items) m.set(it.section.chunk_id, v.key);
return m;
}
+11 -75
View File
@@ -24,8 +24,7 @@
import AIClassificationEditor from '$lib/components/editors/AIClassificationEditor.svelte';
import LibraryPathEditor from '$lib/components/editors/LibraryPathEditor.svelte';
import DocumentDangerZone from '$lib/components/editors/DocumentDangerZone.svelte';
import { untrack } from 'svelte';
import { cleanHeading, pathSegments, sectionTypeLabel, collapseWindows, partitionOutlineItems, partGroupViews, groupKeyByChunkId } from '$lib/utils/headingPath';
import { cleanHeading, pathSegments, sectionTypeLabel, collapseWindows } from '$lib/utils/headingPath';
import { domainLabel } from '$lib/utils/domainSlug';
marked.use({ mangle: false, headerIds: false });
@@ -70,21 +69,10 @@
// 강등한다(예: 5180 = 27개 논리 절 → 562 window). raw sections 를 그대로 그리면 동일 제목 수백 행으로
// 파편화되므로, collapseWindows 로 논리 절 1개(대표=split-parent, bodyText=window 본문 합본)로 합친다.
let outline = $derived(collapseWindows(sections));
// Part 접이 트리(ASME 등 hasParts): 같은 outline 인스턴스를 front-matter/PART 로 재배치(재-collapse 없음
// → selectedSectionId/focusView 정합). flat 1030 → front-matter 단일그룹 + ~14 PART 접이. (D8)
let treePart = $derived(partitionOutlineItems(outline));
let treeGroups = $derived(treePart.hasParts ? partGroupViews(treePart) : null);
let treeGroupIndex = $derived(treeGroups ? groupKeyByChunkId(treeGroups) : null);
let treeExpanded = $state({}); // key 없으면 접힘(기본 전부 접힘). Svelte5 deep-proxy 반응형.
function toggleTreeGroup(key) { treeExpanded[key] = !treeExpanded[key]; }
// sections 로딩 완료 플래그 — 미완 동안 fallback 풀-문서 뷰어를 띄우면, 곧 절뷰로 교체되며
// 풀-문서 이미지가 '살짝 보였다 사라지는' 플래시가 난다(절 보유 문서). 로딩 중엔 skeleton.
let sectionsLoaded = $state(false);
async function loadSections() {
const reqId = docId;
try { const r = await api(`/documents/${reqId}/sections`); if (reqId === docId) sections = r?.sections ?? []; }
catch { if (reqId === docId) sections = []; }
finally { if (reqId === docId) sectionsLoaded = true; }
}
onMount(async () => {
@@ -128,34 +116,8 @@
let mTree = $state(false);
let mIns = $state(false);
let manageOpen = $state(false);
// 기본 선택 = 첫 본문 Part 의 첫 절(front-matter TOC 가 아니라 실제 내용으로 진입, front-matter 접힘 유지).
let defaultSelId = $derived.by(() => {
if (treeGroups) {
const body = treeGroups.find((g) => !g.isFrontMatter);
if (body && body.items.length) return body.items[0].section.chunk_id;
}
return outline[0]?.section.chunk_id ?? null;
});
$effect(() => { if (outline.length && !outline.some((it) => it.section.chunk_id === selectedSectionId)) selectedSectionId = defaultSelId; });
// 문서가 바뀌면(sections 교체) Part 접이·모바일 본문 펼침 상태 리셋 — 문서 간 PART 라벨/chunk_id 가
// 겹쳐 이전 상태가 이월되는 것 차단(기본 전부 접힘 보존). ※ 같은 컴포넌트 인스턴스로 client 네비 시
// sections 가 재로딩될 때만 발화 — 현재 [id] 페이지는 onMount 1회 로딩이라 SPA prev/next 미reload 는
// 선존 별도 이슈(D8 범위 밖, 사용자 보고 대상).
$effect(() => {
void sections;
untrack(() => { treeExpanded = {}; mBodyOpen = {}; });
});
// 선택 절의 조상 Part 를 펼침(prev/next·딥링크 진입 시 트리에서 자동 노출). untrack=쓰기 자기재발화 차단.
$effect(() => {
const sel = selectedSectionId;
const idx = treeGroupIndex;
if (sel == null || !idx) return;
const gk = idx.get(sel);
if (gk) untrack(() => { treeExpanded[gk] = true; });
});
// selectedSectionId 미설정(초기) 시 defaultSelId(첫 본문 Part)로 바로 해석 — outline[0](표지/front-matter)
// 를 잠깐 렌더했다 effect 가 defaultSelId 로 바꾸는 절뷰 내부 플래시 차단.
let selectedItem = $derived(outline.find((it) => it.section.chunk_id === (selectedSectionId ?? defaultSelId)) ?? outline[0] ?? null);
$effect(() => { if (outline.length && !outline.some((it) => it.section.chunk_id === selectedSectionId)) selectedSectionId = outline[0].section.chunk_id; });
let selectedItem = $derived(outline.find((it) => it.section.chunk_id === selectedSectionId) ?? outline[0] ?? null);
let selectedSection = $derived(selectedItem?.section ?? null);
let selIdx = $derived(outline.findIndex((it) => it.section.chunk_id === selectedItem?.section?.chunk_id));
// 절 본문 = 청크 원문(it.bodyText, window 조각 합본) 직접 렌더. 과거 char_start 로 md_content 를
@@ -206,14 +168,13 @@
<span style="display:inline-flex;align-items:center;gap:4px;font-size:10px;color:#697061;"><span style="width:8px;height:8px;border-radius:2px;background:#7a8b3f;"></span>절차</span>
<span style="display:inline-flex;align-items:center;gap:4px;font-size:10px;color:#697061;"><span style="width:8px;height:8px;border-radius:2px;background:#b5840a;"></span>요건</span>
</div>
{#snippet treeNode(it)}
{#each outline as it (it.section.chunk_id)}
{@const s = it.section}
{@const tm = typeMeta(it.sectionType)}
{@const active = !jumpMode && s.chunk_id === selectedSection?.chunk_id}
{@const child = secDepth(s) > 0}
{@const low = isMidLow(it.confidence)}
<svelte:element this={jumpMode ? 'a' : 'div'} href={jumpMode ? `#m-sec-${s.chunk_id}` : undefined}
role={jumpMode ? undefined : 'button'} tabindex={jumpMode ? undefined : 0}
<svelte:element this={jumpMode ? 'a' : 'div'} href={jumpMode ? `#m-sec-${s.chunk_id}` : undefined} role="button" tabindex="0"
onclick={() => !jumpMode && (selectedSectionId = s.chunk_id)}
onkeydown={(e) => { if (!jumpMode && (e.key === 'Enter' || e.key === ' ')) { e.preventDefault(); selectedSectionId = s.chunk_id; } }}
class="d3node {child ? 'd3child' : ''} {active ? 'd3active' : ''}"
@@ -228,25 +189,7 @@
{/if}
</div>
</svelte:element>
{/snippet}
{#if treeGroups}
<!-- Part 접이(ASME 등): front-matter 단일그룹 + PART 접이, 기본 접힘. 선택/딥링크 시 조상 Part auto-expand. -->
{#each treeGroups as g (g.key)}
{@const isOpen = !!treeExpanded[g.key]}
<button type="button" class="d3grp" aria-expanded={isOpen} onclick={() => toggleTreeGroup(g.key)}
style="display:flex;align-items:center;gap:7px;width:100%;text-align:left;background:none;border:none;cursor:pointer;border-radius:8px;padding:6px 8px;margin:4px 0 1px;">
<span style="transition:transform .16s;transform:rotate({isOpen ? 90 : 0}deg);color:#9aa090;font-weight:700;font-size:12px;flex-shrink:0;"></span>
<span style="flex:1;min-width:0;font-size:11px;font-weight:700;color:{g.isFrontMatter ? '#9aa090' : '#697061'};letter-spacing:.3px;text-transform:uppercase;overflow:hidden;text-overflow:ellipsis;white-space:nowrap;">{g.label}</span>
<span style="font-size:10px;color:#9aa090;font-variant-numeric:tabular-nums;flex-shrink:0;">{g.items.length}</span>
</button>
{#if isOpen}
{#each g.items as it (it.section.chunk_id)}{@render treeNode(it)}{/each}
{/if}
{/each}
{:else}
{#each outline as it (it.section.chunk_id)}{@render treeNode(it)}{/each}
{/if}
{/each}
{#if quality}
<div style="margin-top:12px;padding-top:10px;border-top:1px solid #dde3d6;">
<div style="font-size:10.5px;font-weight:700;color:#697061;margin-bottom:7px;letter-spacing:.3px;">추출 품질</div>
@@ -296,8 +239,8 @@
{/if}
</div>
{/if}
{#if selectedItem?.bodyText}
<MarkdownDoc documentId={doc.id} mdContent={selectedItem.bodyText} mdStatus={null} class="prose prose-base max-w-none text-text" />
{#if selectedBodyHtml}
<div class="prose prose-base max-w-none text-text">{@html selectedBodyHtml}</div>
{:else}
<p style="color:#9aa090;font-size:14px;font-style:italic;">이 절의 본문은 추출되지 않았습니다. 헤더의 '원본'에서 확인하세요.</p>
{/if}
@@ -396,7 +339,7 @@
{#if it.bodyText}
<details class="m-secbody" ontoggle={(e) => { if (e.currentTarget.open) mBodyOpen[s.chunk_id] = true; }}>
<summary style="cursor:pointer;list-style:none;font-size:12px;color:#697061;padding:5px 0;user-select:none;display:flex;align-items:center;gap:5px;">본문 보기 <span class="m-chev" style="transition:transform .16s;color:#9aa090;"></span></summary>
{#if mBodyOpen[s.chunk_id]}<div style="margin-top:6px;"><MarkdownDoc documentId={doc.id} mdContent={it.bodyText} mdStatus={null} class="prose prose-sm max-w-none text-text" /></div>{/if}
{#if mBodyOpen[s.chunk_id]}<div class="prose prose-sm max-w-none text-text" style="margin-top:6px;">{@html bodyHtml(it)}</div>{/if}
</details>
{/if}
</div>
@@ -441,13 +384,10 @@
</div>
</div>
{#if !sectionsLoaded}
<!-- sections 로딩 중: fallback 풀-문서(이미지)→절뷰 교체 플래시 방지용 skeleton -->
<Skeleton h="h-96" rounded="card" />
{:else if useSectionView}
{#if useSectionView}
<!-- 데스크탑(xl+): 3영역 -->
<div class="hidden xl:grid" style="grid-template-columns:252px minmax(0,1fr) 336px;gap:13px;align-items:start;">
<div style="background:#f4f7f1;border:1px solid #dde3d6;border-radius:14px;padding:13px 11px;position:sticky;top:14px;max-height:calc(100vh - 2rem);overflow-y:auto;">{@render treeNav(false)}</div>
<div style="background:#f4f7f1;border:1px solid #dde3d6;border-radius:14px;padding:13px 11px;position:sticky;top:14px;max-height:calc(100vh-2rem);overflow-y:auto;">{@render treeNav(false)}</div>
<div style="min-width:0;"><div style="background:#f4f7f1;border:1px solid #dde3d6;border-radius:14px;padding:20px 22px;">{@render focusView()}</div></div>
<div style="position:sticky;top:14px;">{@render rail()}</div>
</div>
@@ -460,9 +400,6 @@
</div>
{#if mTree}<div style="background:#f4f7f1;border:1px solid #dde3d6;border-radius:12px;padding:6px;margin-bottom:10px;">{@render treeNav(true)}</div>{/if}
{#if mIns}<div style="background:#f4f7f1;border:1px solid #dde3d6;border-radius:12px;padding:13px 14px;margin-bottom:10px;">{@render rail()}</div>{/if}
<!-- D8 스코프 한계(의도적): 모바일 본문은 전체 outline(~1030)을 연속 카드로 eager 마운트한다.
Part 접이는 위 treeNav(앵커 점프 네비)에만 적용 — 본문 롱스크롤은 줄이지 않는다. 데스크탑은
focusView 가 단일 절만 렌더하므로 무관. 모바일 본문 분할/가상화는 별 follow-up. -->
<div style="display:flex;flex-direction:column;gap:10px;">{#each outline as it (it.section.chunk_id)}{@render sectionCard(it)}{/each}</div>
</div>
{:else}
@@ -537,7 +474,6 @@
<style>
.d3node:hover { background: #ecf0e8; }
.d3active:hover { background: #e3ebdf; }
.d3grp:hover { background: #ecf0e8; }
.d3child { position: relative; }
.d3child::before { content: ""; position: absolute; left: 2px; top: -3px; bottom: 50%; width: 1px; background: #cdd6c4; }
.d3child::after { content: ""; position: absolute; left: 2px; top: 50%; width: 7px; height: 1px; background: #cdd6c4; }
@@ -1,6 +0,0 @@
-- 359: delete_file=true 명시 삭제 요청 마커 (R7 delete_file 큐드삭제).
-- retention sweep(document_purge_sweep) 이 이 컬럼 + grace(30일) 기준으로 NAS 원본을
-- 물리삭제한다. deleted_at(단순 숨김)과 분리 — 숨김(delete_file=false)은 파일 보존(undelete
-- 가능). sweep 가 deleted_at 기준이면 모든 숨김이 30일 후 물리삭제되는 데이터 손실이 되므로
-- 명시 purge 요청만 대상으로 한다.
ALTER TABLE documents ADD COLUMN IF NOT EXISTS purge_requested_at TIMESTAMPTZ;
@@ -1,11 +0,0 @@
-- 360: Phase 2A 임베딩 후보 cand 섀도 테이블 제거 (R13).
-- Phase 2A no-go 종결(2026-06-12, 후보 전부 -0.03~-0.04) + phase2a_cand_backfill 워커 dormant.
-- retrieval_service.CANDIDATE_BACKEND_MAP / api.search allowed 슬러그 선제거 후 DROP.
-- ★single statement(콤마 구분) — init_db 의 exec_driver_sql(asyncpg)은 multi-statement 불허.
-- IF EXISTS — me5/snowflake 는 ad-hoc 생성분이라 환경별 존재 여부 다를 수 있음(멱등).
DROP TABLE IF EXISTS
document_chunks_cand_me5_large_inst, documents_cand_me5_large_inst,
document_chunks_cand_snowflake_l_v2, documents_cand_snowflake_l_v2,
document_chunks_cand_qwen06, documents_cand_qwen06,
document_chunks_cand_qwen4, documents_cand_qwen4,
document_chunks_cand_qwen4m, documents_cand_qwen4m;
@@ -1,9 +0,0 @@
-- 361: quiz 세션 내 같은 문제 이중 attempt 방지 partial UNIQUE (R9).
-- submit_attempt 의 FOR UPDATE 행잠금이 1차 방어, 이 제약은 DB 레벨 belt-and-suspenders.
-- prod 실측 중복 0 (GROUP BY (quiz_session_id, study_question_id) HAVING count>1 = 0) + fresh DB
-- 빈 테이블이라 dedup DELETE 불요 → ★single statement(init_db exec_driver_sql 은 multi-statement
-- 불허). 혹시 중복이 생긴 환경이면 이 마이그가 실패하므로(IntegrityError) 수동 dedup 후 재적용.
-- quiz_session_id IS NULL(세션 외 직접 입력)은 비대상 → partial index.
CREATE UNIQUE INDEX IF NOT EXISTS uq_attempt_session_question
ON study_question_attempts (quiz_session_id, study_question_id)
WHERE quiz_session_id IS NOT NULL;
File diff suppressed because it is too large Load Diff
-122
View File
@@ -1,122 +0,0 @@
"""전체 app 부팅 런타임 스모크 (GPU 격리) — deploy-blocker 게이트.
init_db 자체는 initdb_runtime_test.py(R1)·migration_smoke.sh 검증한다.
스모크는 위에서 **실제 컨테이너 부팅 경로**(main:app + lifespan startup) 실행해
py_compile 잡는 deploy-blocker 클래스를 잡는다:
`import main` = router import + FastAPI app 빌드 (router 심볼누락·순환 검출)
lifespan startup = lifespan 안의 worker import(35) + init_db + add_job 실행
(worker import-time 오류· 등록 오류 검출, **drift 0** = 실제 경로)
/health (health_check 직접 호출) = DB connected
prod/AI/NAS 무접촉을 위해 부작용 3개만 외과적으로 중립화한다 (검증 대상 로직은 그대로):
- NAS 마운트 체크 임시 디렉토리(+PKM/) 통과 ( NAS 의존 제거)
- scheduler.start() no-op (잡은 등록되지만 실행 = 워커 폴링·외부 API 호출 0)
- scheduler.shutdown() no-op (start 했으니 __aexit__ shutdown raise 하도록)
- prewarm_analyzer() no-op (AI 라우터 :8890 미호출 = 검색실험 soft-lock 안전)
실행 (worktree 루트를 마운트한 prod fastapi 이미지 컨테이너 ):
docker run --rm --network <net> -v <worktree>:/work -w /work \
-e PYTHONPATH=/work/app -e BOOT_SMOKE=1 \
-e DATABASE_URL="postgresql+asyncpg://postgres@ds-bootsmoke-pg:5432/pkm" \
<fastapi_image> python scripts/ci/boot_smoke.py
기대: IMPORTS OK LIFESPAN startup OK (jobs=N, purge_sweep 포함) schema OK HEALTH ok PASS
"""
import asyncio
import os
import tempfile
from pathlib import Path
from sqlalchemy import text
async def main() -> None:
# ── 0) 안전 가드: prod DB 오접속 차단 ─────────────────────────────────
from core.config import settings
url = settings.database_url
print("DATABASE_URL:", url)
assert os.getenv("BOOT_SMOKE") == "1", "SAFETY ABORT: BOOT_SMOKE=1 미설정"
# prod = '...@postgres:5432/pkm' (user pkm). ephemeral = bootsmoke 호스트 / localhost / postgres user.
assert "@postgres:" not in url and "@postgres/" not in url, f"SAFETY ABORT: prod DB 로 보임: {url}"
assert ("bootsmoke" in url) or ("localhost" in url) or ("127.0.0.1" in url), \
f"SAFETY ABORT: ephemeral 마커(bootsmoke/localhost) 없음: {url}"
# ── 1) 부작용 3개 중립화 (검증 대상 로직 보존) ───────────────────────
# prewarm: AI 라우터 미호출
import services.search.query_analyzer as qa
async def _noop_prewarm(*a, **k):
return None
qa.prewarm_analyzer = _noop_prewarm
# scheduler.start/shutdown no-op + start 캡처로 잡 개수 집계
from apscheduler.schedulers.asyncio import AsyncIOScheduler
captured: dict = {}
_orig_init = AsyncIOScheduler.__init__
def _init(self, *a, **k):
_orig_init(self, *a, **k)
captured["sched"] = self
AsyncIOScheduler.__init__ = _init
AsyncIOScheduler.start = lambda self, *a, **k: None
AsyncIOScheduler.shutdown = lambda self, *a, **k: None
# NAS 체크 통과용 임시 마운트
tmp = tempfile.mkdtemp(prefix="bootsmoke-nas-")
(Path(tmp) / "PKM").mkdir(parents=True, exist_ok=True)
settings.nas_mount_path = tmp
print("nas_mount_path(override):", tmp)
# ── 2) import main = 전 router import + app 빌드 ──────────────────────
import main
route_count = len(main.app.routes)
print(f"IMPORTS OK — main 빌드, app.routes={route_count}")
assert route_count > 50, f"라우트 수 비정상({route_count}) — 라우터 누락 의심"
# ── 3) lifespan startup 실행 (init_db + 전 worker import + 전 add_job) ─
cm = main.lifespan(main.app)
await cm.__aenter__()
sched = captured.get("sched")
jobs = sched.get_jobs() if sched else []
job_ids = sorted(j.id for j in jobs)
print(f"LIFESPAN startup OK — 등록 잡 {len(jobs)}")
print(" job_ids:", ", ".join(job_ids))
assert len(jobs) >= 30, f"잡 등록 수 비정상({len(jobs)})"
for required in ("purge_sweep", "auto_review", "queue_consumer", "statute_collector"):
assert required in job_ids, f"필수 잡 누락: {required}"
# ── 4) 스키마 상태 (lifespan 의 실 init_db 가 359/360/361 적용했는지) ──
from core.database import async_session, engine
async with async_session() as s:
docs = (await s.execute(text("SELECT to_regclass('public.documents') IS NOT NULL"))).scalar()
purge = (await s.execute(text(
"SELECT count(*) FROM information_schema.columns "
"WHERE table_name='documents' AND column_name='purge_requested_at'"))).scalar()
cand = (await s.execute(text(
"SELECT count(*) FROM information_schema.tables "
"WHERE table_name LIKE 'documents_cand_qwen%'"))).scalar()
uq = (await s.execute(text(
"SELECT count(*) FROM pg_indexes WHERE indexname='uq_attempt_session_question'"))).scalar()
mx = (await s.execute(text("SELECT max(version) FROM schema_migrations"))).scalar()
print(f"SCHEMA OK — max_migration={mx} documents={docs} purge_col={purge} cand_qwen={cand} attempt_uq={uq}")
assert docs and purge == 1 and cand == 0 and uq == 1 and mx == 361, "FAIL: 기대 스키마 상태 불일치"
# ── 5) /health 직접 호출 ──────────────────────────────────────────────
health = await main.health_check()
print("HEALTH:", health)
assert health["status"] == "ok" and health["database"] == "connected", "FAIL: health degraded"
# ── 6) 정리 ───────────────────────────────────────────────────────────
await cm.__aexit__(None, None, None)
await engine.dispose()
print("RESULT: PASS — 전체 app 부팅(import·init_db·잡등록·health) 검증")
asyncio.run(main())
-51
View File
@@ -1,51 +0,0 @@
"""init_db() baseline 부팅 런타임 검증 (R1) — psql migration_smoke 가 못 잡는 asyncpg 경로 확인.
migration_smoke.sh(psql) SQL 유효성만 검증한다. init_db asyncpg exec_driver_sql(prepared)
경로라 multi-statement 불허 baseline raw asyncpg 적재 skip/stamp/멱등 이걸 실측한다.
실행 (worktree 루트):
python3.11 -m venv /tmp/v && /tmp/v/bin/pip install -q "sqlalchemy[asyncio]>=2" asyncpg pydantic pyyaml
docker run -d --name idb -p 55432:5432 -e POSTGRES_HOST_AUTH_METHOD=trust pgvector/pgvector:pg16
docker exec idb psql -U postgres -c "CREATE DATABASE pkm"
ln -sfn ../migrations app/migrations # Docker 의 /app/migrations 레이아웃 모사 (테스트 후 rm)
PYTHONPATH=app DATABASE_URL="postgresql+asyncpg://postgres@localhost:55432/pkm" /tmp/v/bin/python scripts/ci/initdb_runtime_test.py
rm -f app/migrations; docker rm -f idb
기대: 1st OK(documents=True·purge_col=1·cand_qwen=0·attempt_unique=1), 2nd 멱등동일=True.
"""
import asyncio
from sqlalchemy import text
async def main():
from core.config import settings
url = settings.database_url
print("effective DATABASE_URL:", url)
assert "localhost" in url or "127.0.0.1" in url, f"SAFETY ABORT non-local: {url}"
from core.database import init_db, async_session, engine
print("=== 1st init_db (fresh DB) ===")
await init_db()
async with async_session() as s:
cnt = (await s.execute(text("SELECT count(*) FROM schema_migrations"))).scalar()
mx = (await s.execute(text("SELECT max(version) FROM schema_migrations"))).scalar()
bl = (await s.execute(text("SELECT count(*) FROM schema_migrations WHERE name LIKE 'baseline:%'"))).scalar()
docs = (await s.execute(text("SELECT to_regclass('public.documents') IS NOT NULL"))).scalar()
purge = (await s.execute(text("SELECT count(*) FROM information_schema.columns WHERE table_name='documents' AND column_name='purge_requested_at'"))).scalar()
cand = (await s.execute(text("SELECT count(*) FROM information_schema.tables WHERE table_name LIKE 'documents_cand_qwen%'"))).scalar()
uq = (await s.execute(text("SELECT count(*) FROM pg_indexes WHERE indexname='uq_attempt_session_question'"))).scalar()
print(f" schema_migrations count={cnt} max={mx} baseline_stamped={bl}")
print(f" documents={docs} purge_col={purge} cand_qwen_tables={cand} attempt_unique={uq}")
assert docs and purge == 1 and cand == 0 and uq == 1, "FAIL: 기대 스키마 상태 불일치"
print("=== 2nd init_db (rerun = baseline skip + 멱등) ===")
await init_db()
async with async_session() as s:
cnt2 = (await s.execute(text("SELECT count(*) FROM schema_migrations"))).scalar()
assert cnt == cnt2, "FAIL: 멱등 아님 (재실행이 schema_migrations 변경)"
print(f" count={cnt2} 멱등동일={cnt == cnt2}")
print("RESULT: PASS — init_db baseline 부팅/멱등 검증")
await engine.dispose()
asyncio.run(main())
-138
View File
@@ -1,138 +0,0 @@
#!/usr/bin/env bash
# migration_smoke.sh — fresh-DB + DR enum-same-txn 게이트 (plan ds-backend-audit-1 R0)
#
# app/core/database.py 의 init_db() 는 모든 pending migration 을 단일 트랜잭션
# (`async with engine.begin()`) 으로 적용한다. 이 스크립트는 그 경로를 미러해
# migrations/ 전체가 빈 DB / DR 업그레이드에서 한 트랜잭션으로 적용 가능한지 검증한다.
#
# 시나리오:
# FRESH — 빈 DB 에 migrations/ 전체를 단일 트랜잭션으로 적용 (신규 환경 부팅 경로)
# DR — 001~319 를 커밋(과거 운영 DB 모사) 후 320~end 를 단일 트랜잭션으로 적용
# (pre-320 백업/지연 복제를 320 경계 너머로 catch-up 업그레이드하는 재해복구 경로)
#
# enum-same-txn 결함(ALTER TYPE ADD VALUE 한 값을 같은 트랜잭션에서 사용)이 있으면
# 두 시나리오 모두 'unsafe use of new value' 로 abort 한다.
# R1(enum-barrier) fix 후에는 두 시나리오 모두 PASS 해야 한다.
#
# prod 동일 이미지(pg16)로 핀. 의존: docker.
# 사용: scripts/ci/migration_smoke.sh (ephemeral 컨테이너 자동 기동/정리)
set -uo pipefail
IMAGE="pgvector/pgvector:pg16"
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
MIG_DIR="$(cd "$SCRIPT_DIR/../../migrations" && pwd)"
CNAME="ds-mig-smoke-$$"
DB="pkm" # 358 의 ALTER DATABASE pkm 가 이 이름을 요구
cleanup() { docker rm -f "$CNAME" >/dev/null 2>&1 || true; }
trap cleanup EXIT
# 버전순 마이그레이션 파일 목록 (NNN_ 3자리 zero-pad → lexical = numeric)
# bash 3.2(macOS) 호환 — mapfile 미사용
MIGS=()
while IFS= read -r _line; do MIGS+=("$_line"); done < <(ls "$MIG_DIR"/[0-9]*.sql | sort)
[ "${#MIGS[@]}" -gt 0 ] || { echo "FATAL: migrations 없음 ($MIG_DIR)"; exit 2; }
echo "migrations: ${#MIGS[@]}건 ($(basename "${MIGS[0]}") ~ $(basename "${MIGS[$((${#MIGS[@]}-1))]}"))"
psql_exec() { docker exec -i "$CNAME" psql -U postgres -v ON_ERROR_STOP=1 "$@"; }
# 주어진 파일 범위를 단일 트랜잭션 스트림으로 묶어 출력 (psql stdin 용)
# 각 파일 앞에 \echo 마커 — 실패 시 마지막 마커가 깨진 마이그레이션.
emit_single_txn() {
echo '\set ON_ERROR_STOP on'
echo 'BEGIN;'
for f in "$@"; do
echo "\\echo >>>APPLY $(basename "$f")"
cat "$f"; echo
done
echo 'COMMIT;'
}
# 자동커밋(파일별 즉시 커밋) 스트림 — DR phase1 (기존 운영 DB 모사)
emit_autocommit() {
echo '\set ON_ERROR_STOP on'
for f in "$@"; do
echo "\\echo >>>APPLY $(basename "$f")"
cat "$f"; echo
done
}
reset_db() {
psql_exec -d postgres -c "DROP DATABASE IF EXISTS $DB" >/dev/null 2>&1
psql_exec -d postgres -c "CREATE DATABASE $DB" >/dev/null
}
run_scenario() {
local name="$1"; shift
local out rc last_apply
out="$( "$@" 2>&1 )"; rc=$?
last_apply="$(printf '%s\n' "$out" | grep '>>>APPLY' | tail -1 | sed 's/>>>APPLY //')"
if [ "$rc" -eq 0 ]; then
echo " [$name] PASS — 전체 적용 성공"
return 0
else
echo " [$name] FAIL — 깨진 지점: ${last_apply:-?}"
printf '%s\n' "$out" | grep -iE 'ERROR|unsafe|HINT' | head -3 | sed 's/^/ /'
return 1
fi
}
BASELINE_CUTOFF=358
BASELINE_FILE="$MIG_DIR/_baseline/0358_schema_baseline.sql"
# post-baseline(버전 > cutoff) 마이그 파일만 출력
_post_baseline() {
local f base ver
for f in "${MIGS[@]}"; do
base="$(basename "$f")"; ver="${base%%_*}"; ver="$((10#$ver))"
[ "$ver" -gt "$BASELINE_CUTOFF" ] && printf '%s\n' "$f"
done
}
# FRESH — init_db fresh 경로 미러: baseline 적재 + post-baseline 을 단일 트랜잭션
scenario_fresh() {
reset_db
local post=(); while IFS= read -r f; do post+=("$f"); done < <(_post_baseline)
{
echo '\set ON_ERROR_STOP on'; echo 'BEGIN;'
echo "\\echo >>>APPLY _baseline"
cat "$BASELINE_FILE"; echo
for f in "${post[@]}"; do
echo "\\echo >>>APPLY $(basename "$f")"; cat "$f"; echo
done
echo 'COMMIT;'
} | psql_exec -d "$DB"
}
# INCREMENTAL — 기존 운영 DB(at cutoff) 모사: baseline 커밋 후 post-baseline 을 별 트랜잭션
scenario_dr() {
reset_db
if ! { echo '\set ON_ERROR_STOP on'; cat "$BASELINE_FILE"; } | psql_exec -d "$DB" >/dev/null 2>&1; then
printf '%s\n' ">>>APPLY _baseline"; echo "baseline 적재 실패"; return 1
fi
local post=(); while IFS= read -r f; do post+=("$f"); done < <(_post_baseline)
emit_single_txn "${post[@]}" 2>/dev/null | psql_exec -d "$DB"
}
# ── 컨테이너 기동 ──
echo "기동: $IMAGE ($CNAME)"
docker run -d --name "$CNAME" -e POSTGRES_PASSWORD=x -e POSTGRES_HOST_AUTH_METHOD=trust "$IMAGE" >/dev/null
for _ in $(seq 1 40); do docker exec "$CNAME" pg_isready -U postgres -q 2>/dev/null && break; sleep 0.5; done
echo "pg: $(docker exec "$CNAME" psql -U postgres -tAc 'show server_version' 2>/dev/null)"
echo
fail=0
echo "── FRESH (baseline 적재 + post-baseline 단일 트랜잭션 = init_db fresh 경로) ──"
run_scenario FRESH scenario_fresh || fail=1
echo
echo "── INCREMENTAL (baseline 커밋 후 post-baseline 별 트랜잭션 = 기존 DB 증분) ──"
run_scenario DR scenario_dr || fail=1
echo
if [ "$fail" -eq 0 ]; then
echo "RESULT: PASS — fresh/incremental 모두 baseline+post-baseline 적용 가능"
exit 0
else
echo "RESULT: FAIL — baseline/post-baseline 적용 불가 (위 지점)"
exit 1
fi
+22
View File
@@ -0,0 +1,22 @@
FROM python:3.12-slim
WORKDIR /app
RUN apt-get update && apt-get install -y --no-install-recommends \
libgl1 libglib2.0-0 curl \
&& apt-get clean && rm -rf /var/lib/apt/lists/*
COPY requirements.txt .
RUN pip install --no-cache-dir \
--extra-index-url https://download.pytorch.org/whl/cu126 \
-r requirements.txt
# 모델 미다운로드 (HF cache volume → 첫 호출/warmup 시 적재).
COPY server.py .
EXPOSE 3300
HEALTHCHECK --start-period=300s --interval=30s --timeout=10s --retries=3 \
CMD curl -f http://localhost:3300/ready || exit 1
CMD ["uvicorn", "server:app", "--host", "0.0.0.0", "--port", "3300"]
+9
View File
@@ -0,0 +1,9 @@
torch==2.11.0+cu126
torchvision==0.26.0+cu126
transformers==4.57.6
surya-ocr==0.17.1
marker-pdf==1.10.2
pymupdf>=1.24.0,<2.0.0
fastapi>=0.110.0,<1.0.0
uvicorn[standard]>=0.27.0,<1.0.0
pillow>=10.0.0,<12.0.0
+325
View File
@@ -0,0 +1,325 @@
"""marker-service — POST /convert: PDF → markdown + 추출 이미지 base64.
Phase 1B (2026-05-01) 텍스트만 응답, 이미지 폐기.
Phase 1B.5 `_images` 직렬화해서 base64 응답에 포함. NAS write 권한이
없는 stateless 변환기 유지 (fastapi NAS persist 담당).
D-1 (plan crawl-24x7-1, 2026-06-10) idle-unload 운영 전환:
MARKER_PRELOAD=0 : startup warmup ( /convert lazy load)
MARKER_IDLE_UNLOAD_MINUTES : N분 유휴 모델 해제 (0=비활성, 기존 동작)
/ready idle(미적재)에서도 200 fastapi depends_on service_healthy
lazy 모드에서 영구 미기동으로 굳는 방지. 503 warmup_failed 한정.
plan: ~/.claude/plans/piped-humming-crystal.md
"""
import base64
import gc
import hashlib
import io
import logging
import os
import threading
import time
from pathlib import Path
from fastapi import FastAPI, HTTPException, Response
from pydantic import BaseModel, Field
from marker.converters.pdf import PdfConverter
from marker.models import create_model_dict
from marker.output import text_from_rendered
import marker as marker_module
logger = logging.getLogger(__name__)
app = FastAPI()
os.environ.setdefault("HF_HOME", "/models/huggingface")
os.environ.setdefault("TORCH_HOME", "/models/torch")
_models = None
_converter = None
try:
import importlib.metadata
_engine_version = importlib.metadata.version("marker-pdf")
except Exception:
_engine_version = "unknown"
_warmup_done = False
_warmup_error: str | None = None
_warmup_lock = threading.Lock()
# D-1 idle-unload 상태 — 전이는 전부 _warmup_lock 아래
_PRELOAD = os.getenv("MARKER_PRELOAD", "1") != "0"
_IDLE_UNLOAD_MINUTES = int(os.getenv("MARKER_IDLE_UNLOAD_MINUTES", "0"))
_inflight = 0
_last_used = time.monotonic()
# 이미지 응답 cap. base64 응답 크기 폭주 방지. 사용자 PDF 풀 측정 (Phase 1D) 시
# 가장 이미지 많은 문서가 ~30건 수준 → 200 은 안전 마진. 초과 시 truncate flag 응답.
MAX_IMAGES_PER_DOC = int(os.getenv("MARKER_MAX_IMAGES_PER_DOC", "200"))
# per-image 최대 raw bytes (base64 전). 그래픽이 많은 풀페이지 스캔 회피.
MAX_BYTES_PER_IMAGE = int(os.getenv("MARKER_MAX_BYTES_PER_IMAGE", str(10 * 1024 * 1024)))
def _ensure_warmup() -> None:
"""첫 /convert 또는 startup hook 시 모델 로드. HF cache volume 활용."""
global _models, _converter, _warmup_done, _warmup_error
if _warmup_done:
return
with _warmup_lock:
if _warmup_done:
return
try:
logger.info("[marker-service] warmup start")
_models = create_model_dict()
_converter = PdfConverter(artifact_dict=_models)
_warmup_done = True
_warmup_error = None
logger.info(f"[marker-service] warmup done engine_version={_engine_version}")
except Exception as exc:
_warmup_error = f"{type(exc).__name__}: {exc}"
logger.exception("[marker-service] warmup failed")
raise
def _acquire_models():
"""warmup 보장 + inflight 진입을 원자적으로 — ensure 직후 reaper 가 해제하는 경합 차단."""
global _inflight
while True:
_ensure_warmup()
with _warmup_lock:
if _warmup_done:
_inflight += 1
return
# ensure 와 lock 재진입 사이에 unload 가 끼어든 희귀 경합 — 재시도
def _release_models():
global _inflight, _last_used
with _warmup_lock:
_inflight -= 1
_last_used = time.monotonic()
def _maybe_unload() -> None:
"""유휴 시 모델 해제. 변환 중(inflight>0)이면 절대 해제하지 않는다.
split 변환의 배치 사이 간격은 단위 N>=1 임계면 배치 사이 해제 없음.
"""
global _models, _converter, _warmup_done
with _warmup_lock:
if not _warmup_done or _inflight > 0:
return
if time.monotonic() - _last_used < _IDLE_UNLOAD_MINUTES * 60:
return
_models = None
_converter = None
_warmup_done = False
gc.collect()
try:
import torch
torch.cuda.empty_cache()
except Exception:
pass
logger.info(f"[marker-service] idle-unload: 모델 해제 (유휴 {_IDLE_UNLOAD_MINUTES}분 초과)")
async def _idle_reaper():
import asyncio
while True:
await asyncio.sleep(60)
try:
_maybe_unload()
except Exception:
logger.exception("[marker-service] idle reaper 오류")
@app.on_event("startup")
async def startup():
"""startup hook — warmup 은 MARKER_PRELOAD 게이트 (D-1: lazy 기본 전환은 compose 가)."""
import asyncio
if _PRELOAD:
asyncio.create_task(asyncio.to_thread(_ensure_warmup))
if _IDLE_UNLOAD_MINUTES > 0:
asyncio.create_task(_idle_reaper())
logger.info(f"[marker-service] idle-unload 활성: {_IDLE_UNLOAD_MINUTES}")
class ConvertRequest(BaseModel):
file_path: str
max_pages: int | None = None
# page range (1-based inclusive) — LargeDoc split 변환용. marker 내부 0-based 변환은
# convert() 에 격리 (page numbering invariant: DB/API=1-based, marker=0-based).
start_page: int | None = None
end_page: int | None = None
class ConvertImage(BaseModel):
"""marker 추출 이미지 1건. fastapi 가 NAS 에 쓰고 docimg:img_NNN 으로 ref 정규화."""
slug: str # marker 원본 slug (예: '_page_0_Picture_3.jpeg')
format: str # 'png' | 'jpeg' | 'webp' | 'gif'
width: int | None = None
height: int | None = None
bytes_b64: str # base64-encoded raw bytes
class ConvertResponse(BaseModel):
md_content: str
md_content_hash: str
engine: str
engine_version: str
elapsed_ms: int
raw_metrics: dict
images: list[ConvertImage] = Field(default_factory=list)
images_truncated: bool = False
@app.get("/health")
def health():
return {"status": "ok", "service": "marker-service"}
@app.get("/ready")
async def ready(response: Response):
"""Round 4 #1+#2: Response.status_code 명시 + warmup_error 노출.
D-1: idle(미적재) = 200. 503 warmup_failed 한정 lazy 모드에서 fastapi
depends_on service_healthy 영구 미기동으로 굳지 않게. 배포 검증에서
'status=ready' 단언하던 runbook 강제 warm 호출(/convert 1) 대체.
"""
if _warmup_error:
response.status_code = 503
return {
"status": "warmup_failed",
"engine": "marker",
"engine_version": _engine_version,
"error": _warmup_error,
}
if not _warmup_done:
return {
"status": "warming_up" if _PRELOAD else "idle",
"engine": "marker",
"engine_version": _engine_version,
"models_loaded": False,
"idle_unload_minutes": _IDLE_UNLOAD_MINUTES,
}
return {
"status": "ready",
"engine": "marker",
"engine_version": _engine_version,
"models_loaded": True,
"inflight": _inflight,
"idle_unload_minutes": _IDLE_UNLOAD_MINUTES,
}
@app.post("/convert", response_model=ConvertResponse)
async def convert(req: ConvertRequest):
p = Path(req.file_path)
if not p.is_file():
raise HTTPException(404, detail={"code": "file_not_found", "message": str(p)})
if req.start_page is not None and req.end_page is not None:
if req.start_page < 1 or req.end_page < req.start_page:
raise HTTPException(
422,
detail={
"code": "bad_page_range",
"message": f"start_page={req.start_page} end_page={req.end_page}",
},
)
# D-1: warmup 보장 + inflight 진입 원자화 — 변환 중 reaper 해제 차단. 해제는 finally.
_acquire_models()
try:
start = time.monotonic()
# page range 지정 시 per-request converter (모델 _models 재사용 → reload 없음).
# invariant: req.start_page/end_page = 1-based inclusive → marker 0-based 로 변환.
converter = _converter
if req.start_page is not None and req.end_page is not None:
page_range = list(range(req.start_page - 1, req.end_page)) # 0-based inclusive
converter = PdfConverter(artifact_dict=_models, config={"page_range": page_range})
try:
rendered = converter(str(p))
except Exception as exc:
logger.exception(f"[marker-service] conversion failed path={p}: {exc}")
raise HTTPException(
status_code=422,
detail={
"code": "conversion_failed",
"message": f"{type(exc).__name__}: {exc}",
},
) from exc
md_text, _meta, raw_images = text_from_rendered(rendered)
elapsed_ms = int((time.monotonic() - start) * 1000)
finally:
_release_models()
images_payload, truncated = _serialize_images(raw_images, str(p))
return ConvertResponse(
md_content=md_text,
md_content_hash=hashlib.sha256(md_text.encode("utf-8")).hexdigest(),
engine="marker",
engine_version=_engine_version,
elapsed_ms=elapsed_ms,
raw_metrics={
"page_count": getattr(rendered, "page_count", None),
"image_count_extracted": len(raw_images) if raw_images else 0,
"image_count_returned": len(images_payload),
},
images=images_payload,
images_truncated=truncated,
)
def _serialize_images(raw_images, src_path: str) -> tuple[list[ConvertImage], bool]:
"""marker 의 `_images` (dict[slug, PIL.Image]) → base64 ConvertImage 리스트.
가드:
- MAX_IMAGES_PER_DOC 초과 head 반환 + truncated=True
- per-image 직렬화 실패 해당 이미지만 skip + warn (전체 fail )
- per-image 결과 byte 크기가 MAX_BYTES_PER_IMAGE 초과 skip + warn
"""
if not raw_images:
return [], False
items = list(raw_images.items())
truncated = len(items) > MAX_IMAGES_PER_DOC
if truncated:
logger.warning(
f"[marker-service] images truncated path={src_path} "
f"total={len(items)} cap={MAX_IMAGES_PER_DOC}"
)
items = items[:MAX_IMAGES_PER_DOC]
out: list[ConvertImage] = []
for slug, pil_img in items:
try:
fmt_raw = (pil_img.format or "PNG").upper()
# WebP/GIF 도 marker 가 emit 가능하지만 본 1B.5 기준은 PNG/JPEG 우선.
# 알 수 없는 포맷이면 PNG 로 강제 (lossless re-encode).
fmt = fmt_raw if fmt_raw in {"PNG", "JPEG", "WEBP", "GIF"} else "PNG"
buf = io.BytesIO()
pil_img.save(buf, format=fmt)
raw_bytes = buf.getvalue()
if len(raw_bytes) > MAX_BYTES_PER_IMAGE:
logger.warning(
f"[marker-service] image too large skipped path={src_path} "
f"slug={slug} bytes={len(raw_bytes)} cap={MAX_BYTES_PER_IMAGE}"
)
continue
out.append(
ConvertImage(
slug=slug,
format=fmt.lower(),
width=pil_img.width,
height=pil_img.height,
bytes_b64=base64.b64encode(raw_bytes).decode("ascii"),
)
)
except Exception as exc:
logger.warning(
f"[marker-service] image serialize failed path={src_path} "
f"slug={slug}: {type(exc).__name__}: {exc}"
)
continue
return out, truncated
-45
View File
@@ -1,45 +0,0 @@
# mineru-service — MinerU 2.5 VLM 기반 PDF→markdown 추출기. marker-service 대체.
# 단일카드(RTX 4070 Ti S 16GB→PRO 4000 24GB) markdown VRAM ~10GB(marker)→~5GB(MinerU VLM).
#
# 공식 opendatalab/MinerU global Dockerfile 기반:
# FROM vllm/vllm-openai:v0.21.0 (CUDA 13.0). GPU 호스트 드라이버 595.71.05 / CUDA 13.2 가
# 13.0 런타임 지원 → cu129 폴백 불필요. vLLM 은 base 이미지가 제공하므로 mineru 는 [core] 만.
#
# 모델은 이미지에 굽지 않고 런타임 warmup 시 HF cache 볼륨으로 lazy 다운로드 (marker/ocr 선례 =
# 서버 .cache 볼륨). 이미지 슬림 유지 + server.py 반복 빌드 빠름 + 모델 볼륨 영속.
FROM vllm/vllm-openai:v0.21.0
# base 이미지의 ENTRYPOINT(vLLM OpenAI 서버)를 제거 — 우리는 uvicorn 으로 자체 FastAPI 기동.
ENTRYPOINT []
# opencv(libgl) + CJK 폰트(레이아웃/렌더 안전) + curl(healthcheck). 공식 Dockerfile 동일.
RUN apt-get update && apt-get install -y --no-install-recommends \
fonts-noto-core fonts-noto-cjk fontconfig libgl1 curl \
&& fc-cache -fv \
&& apt-get clean && rm -rf /var/lib/apt/lists/*
# mineru[core] — 공식 설치 라인. vLLM(vlm-engine 백엔드)은 base 가 이미 제공.
RUN python3 -m pip install -U 'mineru[core]>=3.2.1' --break-system-packages \
&& python3 -m pip cache purge
# 서비스 wrapper 의존성. base(vllm-openai)+mineru 가 fastapi/uvicorn/pillow 를 이미 제공 →
# pymupdf 만 추가(나머지 명시 핀은 base 의 pillow 12.x 를 불필요하게 다운그레이드해서 제거).
RUN python3 -m pip install --no-cache-dir --break-system-packages \
'pymupdf>=1.24.0,<2.0.0'
# MINERU_MODEL_SOURCE=huggingface = warmup 시 lazy 다운로드 (HF cache 볼륨에 영속).
# PYTORCH_CUDA_ALLOC_CONF=expandable_segments:True = 단편화 완화(연구 권고, 거대 입력 OOM 완충).
ENV MINERU_MODEL_SOURCE=huggingface \
HF_HOME=/root/.cache/huggingface \
PYTORCH_CUDA_ALLOC_CONF=expandable_segments:True
WORKDIR /app
# server.py = 무거운 pip 레이어 뒤에 COPY → 반복 빌드 시 캐시 적중(빠른 재빌드).
COPY server.py /app/server.py
EXPOSE 3301
# VLM 모델 lazy 다운로드(~2.4GB)+엔진 로드 여유로 start-period 길게.
HEALTHCHECK --start-period=900s --interval=30s --timeout=10s --retries=3 \
CMD curl -f http://localhost:3301/ready || exit 1
CMD ["uvicorn", "server:app", "--host", "0.0.0.0", "--port", "3301"]
-315
View File
@@ -1,315 +0,0 @@
"""mineru-service — POST /convert: PDF → markdown + 추출 이미지 base64.
marker-service 대체(MinerU 2.5 VLM). **marker /convert 계약을 그대로 복제**해서
marker_worker 엔드포인트만 바꾸면 되도록 한다(요청/응답 동일 shape):
요청: {file_path, max_pages?, start_page?, end_page?} (page = 1-based inclusive)
응답: {md_content, md_content_hash, engine, engine_version, elapsed_ms,
raw_metrics, images:[{slug, format, width, height, bytes_b64}], images_truncated}
설계 노트:
- **page range PyMuPDF 직접 슬라이스**해서 MinerU 넘긴다(start_page..end_page
0-based [a,b] 페이지만 담은 PDF bytes). MinerU `end_page_id=0 falsy 무시` 버그 회피.
40p 윈도우 분할은 marker_worker 그대로 담당. (검증: fitz 슬라이스 렌더 = 원본과 동일 품질.)
- ** 반드시 async 엔진(`aio_do_parse`) 사용.** 동기 `do_parse`(vllm-engine sync) 모델
(MinerU2.5-Pro-2605-1.2B)에서 layout 토큰 malformed md 산출(실측 G1-2). async
(`aio_do_parse` = vllm-async-engine, mineru CLI 쓰는 정상 경로) = 정상 출력.
- **이미지 = stateless**: marker 처럼 NAS write . MinerU md 박는 `![](images/<sha>.jpg)`
href 그대로 slug 으로 반환 fastapi(marker_worker) `_rewrite_image_refs` basename
매칭으로 `docimg:img_NNN` 정규화 + NAS persist. (계약 무변)
- **VRAM **: `MINERU_GPU_MEMORY_UTILIZATION`(vLLM 분율, 0.40~6GB 실측). compose
`MINERU_VIRTUAL_VRAM_SIZE` 무해(실측 정상)하나 출력엔 무관 캡은 분율로 충분.
backend=`vlm-engine`(기본 hybrid-engine 다중모델 로드 OOM, 반드시 명시).
엔진은 변환(또는 startup warmup) 1 로드 MinerU ModelSingleton 캐시. 단일 GPU
변환은 _engine_lock 으로 직렬화.
"""
import asyncio
import base64
import hashlib
import inspect
import io
import logging
import os
import time
import unicodedata
from pathlib import Path
import fitz # PyMuPDF — page 슬라이스 + 페이지수
from fastapi import FastAPI, HTTPException, Response
from PIL import Image
from pydantic import BaseModel, Field
logger = logging.getLogger("mineru-service")
logging.basicConfig(level=logging.INFO)
app = FastAPI()
try:
import importlib.metadata
_engine_version = importlib.metadata.version("mineru")
except Exception:
_engine_version = "unknown"
# ---- 설정 (compose env 로 override) -----------------------------------------
MINERU_BACKEND = os.getenv("MINERU_BACKEND", "vlm-engine")
MINERU_LANG = os.getenv("MINERU_LANG", "korean")
GPU_MEM_UTIL = float(os.getenv("MINERU_GPU_MEMORY_UTILIZATION", "0.40"))
MAX_IMAGES_PER_DOC = int(os.getenv("MINERU_MAX_IMAGES_PER_DOC", "200"))
MAX_BYTES_PER_IMAGE = int(os.getenv("MINERU_MAX_BYTES_PER_IMAGE", str(10 * 1024 * 1024)))
MAX_PAGES_HARD = int(os.getenv("MINERU_MAX_PAGES_HARD", "200")) # 1-shot max_pages 안전장치
_PRELOAD = os.getenv("MINERU_PRELOAD", "1") != "0"
# ---- 엔진 상태 ---------------------------------------------------------------
_warmup_done = False
_warmup_error: str | None = None
# 단일 GPU async 엔진 — warmup + convert 직렬화(엔진 1개, 임시디렉토리/싱글톤 경합 차단).
_engine_lock = asyncio.Lock()
async def _run_mineru(pdf_bytes: bytes, lang: str) -> tuple[str, list[dict]]:
"""슬라이스된 PDF bytes → (markdown, 이미지 dict 리스트). **async 엔진 경로.**
호출자(_ensure_warmup / convert) _engine_lock 잡은 상태로 호출한다.
이미지 dict: {slug, format, width, height, raw_bytes}. slug = md href 그대로.
"""
import glob
import tempfile
from mineru.cli.common import aio_do_parse
with tempfile.TemporaryDirectory(prefix="mineru_") as td:
candidate = {
"output_dir": td,
"pdf_file_names": ["doc"],
"pdf_bytes_list": [pdf_bytes],
"p_lang_list": [lang],
"backend": MINERU_BACKEND,
"formula_enable": True,
"table_enable": True,
"f_dump_md": True,
"f_dump_content_list": True,
"f_dump_middle_json": False,
"f_dump_model_output": False,
"f_dump_orig_pdf": False,
"f_draw_layout_bbox": False,
"f_draw_span_bbox": False,
"gpu_memory_utilization": GPU_MEM_UTIL,
}
sig = inspect.signature(aio_do_parse)
has_var_kw = any(
p.kind == inspect.Parameter.VAR_KEYWORD for p in sig.parameters.values()
)
kwargs = candidate if has_var_kw else {
k: v for k, v in candidate.items() if k in sig.parameters
}
await aio_do_parse(**kwargs)
md_files = sorted(glob.glob(f"{td}/**/*.md", recursive=True))
if not md_files:
raise RuntimeError("mineru produced no markdown output")
md_path = Path(md_files[0])
md_text = md_path.read_text(encoding="utf-8", errors="replace")
images: list[dict] = []
img_dir = md_path.parent / "images"
if img_dir.is_dir():
for img_file in sorted(img_dir.iterdir()):
if not img_file.is_file():
continue
raw = img_file.read_bytes()
slug = f"images/{img_file.name}" # md href 와 정확히 일치
w = h = None
try:
with Image.open(io.BytesIO(raw)) as im:
w, h = im.width, im.height
fmt = (im.format or "JPEG").lower()
except Exception:
fmt = img_file.suffix.lstrip(".").lower() or "jpeg"
images.append(
{"slug": slug, "format": fmt, "width": w, "height": h, "raw_bytes": raw}
)
return md_text, images
async def _ensure_warmup() -> None:
"""첫 /convert 또는 startup hook 시 1-page 합성 PDF 로 엔진+모델 적재."""
global _warmup_done, _warmup_error
if _warmup_done:
return
async with _engine_lock:
if _warmup_done:
return
try:
logger.info("[mineru-service] warmup start (async engine load + model fetch)")
doc = fitz.open()
page = doc.new_page()
page.insert_text((72, 72), "MinerU warmup.")
warmup_bytes = doc.tobytes()
doc.close()
await _run_mineru(warmup_bytes, MINERU_LANG)
_warmup_done = True
_warmup_error = None
logger.info(f"[mineru-service] warmup done engine_version={_engine_version}")
except Exception as exc:
_warmup_error = f"{type(exc).__name__}: {exc}"
logger.exception("[mineru-service] warmup failed")
raise
@app.on_event("startup")
async def startup():
if _PRELOAD:
asyncio.create_task(_ensure_warmup())
# ---- 계약 모델 (marker 와 동일 shape) ----------------------------------------
class ConvertRequest(BaseModel):
file_path: str
max_pages: int | None = None
start_page: int | None = None # 1-based inclusive
end_page: int | None = None # 1-based inclusive
class ConvertImage(BaseModel):
slug: str
format: str
width: int | None = None
height: int | None = None
bytes_b64: str
class ConvertResponse(BaseModel):
md_content: str
md_content_hash: str
engine: str
engine_version: str
elapsed_ms: int
raw_metrics: dict
images: list[ConvertImage] = Field(default_factory=list)
images_truncated: bool = False
@app.get("/health")
def health():
return {"status": "ok", "service": "mineru-service"}
@app.get("/ready")
async def ready(response: Response):
"""marker /ready 의미 복제: warmup_failed 만 503, idle/warming=200(depends_on 굳음 방지)."""
if _warmup_error:
response.status_code = 503
return {"status": "warmup_failed", "engine": "mineru",
"engine_version": _engine_version, "error": _warmup_error}
if not _warmup_done:
return {"status": "warming_up" if _PRELOAD else "idle", "engine": "mineru",
"engine_version": _engine_version, "models_loaded": False}
return {"status": "ready", "engine": "mineru",
"engine_version": _engine_version, "models_loaded": True}
def _resolve_path(file_path: str) -> Path | None:
"""NFC(DB) vs NFD(NFS) 한글 경로 정규화 차이 흡수. ocr/server.py 와 동일 패턴
(필수 한글명 파일은 NFS=NFD 저장이라 DB NFC 경로로는 is_file=False)."""
for c in (file_path,
unicodedata.normalize("NFD", file_path),
unicodedata.normalize("NFC", file_path)):
p = Path(c)
if p.exists():
return p
parent = Path(file_path).parent
if parent.exists():
target = unicodedata.normalize("NFC", Path(file_path).name)
for child in parent.iterdir():
if unicodedata.normalize("NFC", child.name) == target:
return child
return None
def _slice_pdf(src_path: Path, start_page: int | None, end_page: int | None,
max_pages: int | None) -> tuple[bytes, int]:
"""요청 page 범위(1-based inclusive)만 담은 새 PDF bytes + 변환 페이지수 반환."""
with fitz.open(src_path) as src:
n = src.page_count
if start_page is not None and end_page is not None:
a = max(0, start_page - 1)
b = min(n - 1, end_page - 1)
else:
a = 0
cap = max_pages if max_pages is not None else MAX_PAGES_HARD
b = min(n - 1, cap - 1)
if b < a:
raise HTTPException(422, detail={"code": "bad_page_range",
"message": f"a={a} b={b} n={n}"})
out = fitz.open()
out.insert_pdf(src, from_page=a, to_page=b)
pdf_bytes = out.tobytes()
out.close()
return pdf_bytes, (b - a + 1)
def _serialize_images(images: list[dict], src_path: str) -> tuple[list[ConvertImage], bool]:
"""이미지 dict 리스트 → base64 ConvertImage 리스트 (marker 가드 동일)."""
truncated = len(images) > MAX_IMAGES_PER_DOC
if truncated:
logger.warning(f"[mineru-service] images truncated path={src_path} "
f"total={len(images)} cap={MAX_IMAGES_PER_DOC}")
images = images[:MAX_IMAGES_PER_DOC]
out: list[ConvertImage] = []
for img in images:
raw = img["raw_bytes"]
if len(raw) > MAX_BYTES_PER_IMAGE:
logger.warning(f"[mineru-service] image too large skipped path={src_path} "
f"slug={img['slug']} bytes={len(raw)} cap={MAX_BYTES_PER_IMAGE}")
continue
out.append(ConvertImage(
slug=img["slug"], format=img["format"],
width=img.get("width"), height=img.get("height"),
bytes_b64=base64.b64encode(raw).decode("ascii"),
))
return out, truncated
@app.post("/convert", response_model=ConvertResponse)
async def convert(req: ConvertRequest):
p = _resolve_path(req.file_path)
if p is None or not p.is_file():
raise HTTPException(404, detail={"code": "file_not_found", "message": req.file_path})
if req.start_page is not None and req.end_page is not None:
if req.start_page < 1 or req.end_page < req.start_page:
raise HTTPException(422, detail={"code": "bad_page_range",
"message": f"start_page={req.start_page} end_page={req.end_page}"})
pdf_bytes, page_count = _slice_pdf(p, req.start_page, req.end_page, req.max_pages)
await _ensure_warmup() # 엔진 로드 보장(내부에서 _engine_lock 잡았다 놓음)
async with _engine_lock: # 실제 변환 직렬화(단일 GPU)
start = time.monotonic()
try:
md_text, raw_images = await _run_mineru(pdf_bytes, MINERU_LANG)
except HTTPException:
raise
except Exception as exc:
logger.exception(f"[mineru-service] conversion failed path={p}: {exc}")
raise HTTPException(422, detail={"code": "conversion_failed",
"message": f"{type(exc).__name__}: {exc}"}) from exc
elapsed_ms = int((time.monotonic() - start) * 1000)
images_payload, truncated = _serialize_images(raw_images, str(p))
return ConvertResponse(
md_content=md_text,
md_content_hash=hashlib.sha256(md_text.encode("utf-8")).hexdigest(),
engine="mineru",
engine_version=_engine_version,
elapsed_ms=elapsed_ms,
raw_metrics={
"page_count": page_count,
"image_count_extracted": len(raw_images),
"image_count_returned": len(images_payload),
},
images=images_payload,
images_truncated=truncated,
)
-106
View File
@@ -1,106 +0,0 @@
"""_ENG 매처 노이즈 차단 단위테스트 (asme-item-decomp-1 D1).
핵심 불변식: 영문 구조 헤딩 매처(_ENG)
- (음성) 본문 중간 'Part III to demonstrate…' 같은 소문자 문장연속을 가짜 절로 잡지 않고,
- (양성) 진짜 영문 구조 헤딩(PART PG / Part 1 / Section 3.31 / Part UHX ) 탐지하며,
- (ATX 보존) _ENG 축소가 ATX 파트(`# PART PG`)·항목(`#### PG-1`)을 떨구지 않는다(ATX 우선).
pytest + 단독 실행 양쪽 지원:
PYTHONPATH=. python3 tests/hier_decomp/test_eng_matcher.py
"""
from __future__ import annotations
try: # pytest 경로 (앱 패키지)
from app.services.hier_decomp.builder import _detect_heading, build_hier_tree
except Exception: # 단독 실행 (앱 deps 없이 builder.py 직접 로드 — stdlib only)
import importlib.util
import pathlib
import sys
_bp = pathlib.Path(__file__).resolve().parents[2] / "app/services/hier_decomp/builder.py"
_spec = importlib.util.spec_from_file_location("_hier_builder_t", _bp)
_m = importlib.util.module_from_spec(_spec)
sys.modules[_spec.name] = _m # dataclass __module__ 해소
_spec.loader.exec_module(_m)
_detect_heading, build_hier_tree = _m._detect_heading, _m.build_hier_tree
# ── 음성: 본문 문장은 헤딩 아님 (가짜 절 차단 — D1 회귀의 핵심) ──
NEG = [
"Part III to demonstrate to the satisfaction of the represen-",
"Section V of the agreement applies to all parties",
"Part IV is hereby amended as follows",
"Article II shall be interpreted broadly",
"Chapter 3 describes the general method used here",
]
# ── 양성: 진짜 영문 구조 헤딩 ──
POS = [
"PART PG GENERAL REQUIREMENTS FOR ALL METHODS OF CONSTRUCTION",
"Part 1",
"Part PFH",
"Part UHX (TUBESHEET CALCULATION)",
"Section 3.31",
"Chapter 1 Introduction",
"Article 5 Definitions",
]
def test_eng_negatives_not_detected():
for line in NEG:
assert _detect_heading(line) is None, f"가짜 절로 잡힘: {line!r}"
def test_eng_positives_detected_as_chapter():
for line in POS:
r = _detect_heading(line)
assert r is not None, f"진짜 헤딩 미탐지: {line!r}"
_lvl, _title, nt = r
assert nt == "chapter", f"{line!r} node_type={nt}"
def test_atx_part_and_item_still_detected():
# _ENG 축소가 진짜 ATX 파트/항목을 떨구지 않음 (ATX 우선 탐지)
r = _detect_heading("# PART PG GENERAL REQUIREMENTS FOR ALL METHODS OF CONSTRUCTION")
assert r is not None
lvl, title, nt = r
assert lvl == 1 and nt is None, r # ATX = level(# 수), node_type None
assert title.startswith("PART PG")
r2 = _detect_heading("#### PG-1 SCOPE")
assert r2 is not None and r2[0] == 4 and r2[2] is None, r2
def test_build_hier_tree_drops_false_part_section():
# 본문에 'Part III to demonstrate…' 가 섞여도 가짜 절이 생기지 않음
md = (
"# PART PG GENERAL REQUIREMENTS\n"
"#### PG-1 SCOPE\n"
"The rules cover power boilers.\n"
"Part III to demonstrate to the satisfaction of the representative\n"
"that the requirements are met, the manufacturer shall proceed...\n"
"#### PG-2 SERVICE LIMITATIONS\n"
"body of pg-2 here.\n"
)
titles = [n.section_title for n in build_hier_tree(md) if n.section_title]
assert any(t.startswith("PART PG") for t in titles), titles
assert any(t.startswith("PG-1") for t in titles), titles
assert any(t.startswith("PG-2") for t in titles), titles
assert not any("demonstrate" in (t or "") for t in titles), f"가짜 절 누출: {titles}"
if __name__ == "__main__":
import sys
import traceback
fns = [(k, v) for k, v in sorted(globals().items()) if k.startswith("test_") and callable(v)]
failed = 0
for name, fn in fns:
try:
fn()
print(f"PASS {name}")
except Exception as e:
failed += 1
print(f"FAIL {name}: {e}")
traceback.print_exc()
print(f"\n{len(fns) - failed}/{len(fns)} passed")
sys.exit(1 if failed else 0)