Compare commits

..

4 Commits

Author SHA1 Message Date
hyungi d89f046121 fix(ds-watch): 실기기 설치용 서명 허용 — 프로젝트 기본 CODE_SIGNING 차단 제거
헤드리스 시뮬 빌드용으로 박아둔 CODE_SIGNING_ALLOWED/REQUIRED=NO 가 Xcode 실기기
설치를 막아, 프로젝트 기본에서 제거(자동 서명 유지). 시뮬 빌드는 CLI 플래그로 계속 처리.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-15 15:11:47 +09:00
hyungi 91a540d533 feat(ds-shell): 맥·iOS 웹 래퍼 앱 — document.hyungi.net WKWebView + DS 아이콘
- 맥·iOS 2타깃, WKWebView 로 웹 UI 100% 재사용(2026-06-15 결정: 맥/아이폰=웹 래퍼)
- 영속 쿠키(로그인 유지), 첨부 응답 다운로드 처리, 업로드는 네이티브 피커 자동
- 맥 창 off-screen 가드(분리 모니터 좌표 저장 시 중앙 복귀)
- DS 초록 라운드 아이콘(맥=라운드/iOS=풀블리드, 1024px 생성)

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-15 15:05:14 +09:00
hyungi c79bf41a76 feat(ds-watch): Apple Watch 앱 신규 — 4기능 셸 + 공부/할일/브리핑/이드 라이브 결선 + DS 아이콘
- standalone watchOS(WKApplication + WKWatchOnly), 다크 OLED, xcodegen 단일 타깃
- 4기능 = 이드(AI채팅)·공부(암기카드)·할일·브리핑
- 라이브: 공부 /study-cards(due·rate·flag) · 할일 /events(today·complete)
  · 브리핑 /briefing/latest · 이드 /eid/chat(SSE 누적, unavailable 처리)
- 1회 로그인(access 메모리 + refresh 쿠키 7일 영속) + 401 자동 refresh+재시도
- 햅틱 피드백 + 정직한 로딩/빈/오류 상태 + DS 초록 아이콘(원형 마스킹)
- 맥·아이폰은 웹 래퍼로(2026-06-15 결정), 순수 네이티브는 워치 전용

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-15 15:05:14 +09:00
hyungi f527c63232 feat(ds-app): macOS 앱 마무리 — 업로드·다운로드·로그아웃 + 4섹션 페이지
- FU-C 멀티파트 업로드(DSClient.uploadDocument + LiveDSClient 401 재시도 공유 + 툴바/상태바)
- FU-D 네이티브 다운로드(NSSavePanel + URLSession, ?token= 미노출, 임시파일 정리)
- 로그아웃(AppModel.logout 세션 전체 초기화 + 계정 메뉴)
- 셸 2-column 재구성: 질문/이드 제거, 홈 코크핏 + 문서 3-pane 컬럼 브라우저
  (인스펙터 TL;DR/핵심점/심층/불일치) + 도메인 필터 전체 load-all
- 적대 리뷰 반영(stale 401 데모션·다운로드 임시파일 정리·메모 저장 saveMemo 경유·도메인 필터 선택 정합)

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-15 14:52:29 +09:00
129 changed files with 3478 additions and 9145 deletions
+7 -10
View File
@@ -289,16 +289,13 @@ class AIClient:
return response.json()
async def _call_chat(self, model_config, prompt: str) -> str:
"""OpenAI 호환 API 호출 (R6: 무동의 클라우드 폴백 제거).
이전엔 primary(맥미니) TimeoutException/ConnectError 시 동의·과금 통제 없이
self.ai.fallback(Claude API)로 자동 전환 → 개인 문서/쿼리/메모가 Anthropic 으로
silent egress. on-prem 추론 프라이버시 계약 위반이라 봉쇄한다. 실패는 그대로 전파:
배치 워커는 재시도/StageDeferred(R3·queue_consumer), interactive 호출자는 5xx 표면화
(documents.analyze 등 이미 502/504 변환). 클라우드는 premium explicit-trigger
(summarize force_premium) 또는 call_fallback 명시 호출로만 — 자동 진입 금지.
"""
return await self._request(model_config, prompt)
"""OpenAI 호환 API 호출 + 자동 폴백"""
try:
return await self._request(model_config, prompt)
except (httpx.TimeoutException, httpx.ConnectError):
if model_config == self.ai.primary:
return await self._request(self.ai.fallback, prompt)
raise
async def _request(self, model_config, prompt: str, system: str | None = None) -> str:
"""단일 모델 API 호출 (OpenAI 호환 + Anthropic Messages API).
-6
View File
@@ -195,14 +195,8 @@ async def regenerate(
date 미지정 시 오늘 KST. 같은 날 row 존재 시 transaction 안에서 삭제 후 신규 생성.
응답 status='success' | 'partial' | 'failed' | 'empty'.
"""
from core.config import settings
from workers.briefing_worker import run
# held(정책상 정상 보류)를 409 로 표면화 (R8) — digest.py 정본 대칭. 이전엔 briefing_worker.run()
# 이 held/timeout/exception 셋 다 None 반환 → API 가 셋 다 500 으로 오보(silent-state-conflation).
if "briefing" in settings.pipeline_held_stages:
raise HTTPException(status_code=409, detail="briefing 단계가 일시 보류(held) 상태입니다")
result = await run(target_date=date)
if result is None:
raise HTTPException(status_code=500, detail="briefing 워커 실행 실패 (로그 확인)")
+26 -38
View File
@@ -69,19 +69,6 @@ def _upload_error(status_code: int, error_code: str, message: str) -> HTTPExcept
)
async def get_live_document(session: AsyncSession, doc_id: int) -> Document:
"""soft-delete(deleted_at) 가드 포함 문서 조회 — 없거나 삭제됐으면 404 (R7).
조회/수정 경로는 deleted_at 을 일관 가드하나 파일/콘텐츠 서빙 엔드포인트가 누락 →
삭제 문서의 원본/preview/전문이 doc_id(+유효 토큰)만으로 노출되던 비대칭. '경로마다
deleted_at 기억'에 의존하지 않게 헬퍼로 구조 강제(추가될 서빙 경로도 자동 보호).
"""
doc = await session.get(Document, doc_id)
if not doc or doc.deleted_at is not None:
raise HTTPException(status_code=404, detail="문서를 찾을 수 없습니다")
return doc
async def _near_dup_scan_bg(doc_id: int) -> None:
"""B-3: post-upload near_duplicate 스캔 (BackgroundTask). 자체 세션, best-effort.
@@ -851,7 +838,9 @@ async def get_document_file(
# 일반 Bearer 헤더 인증 시도
raise HTTPException(status_code=401, detail="토큰이 필요합니다")
doc = await get_live_document(session, doc_id)
doc = await session.get(Document, doc_id)
if not doc:
raise HTTPException(status_code=404, detail="문서를 찾을 수 없습니다")
# note(메모)는 물리 파일이 없음
if not doc.file_path:
@@ -954,8 +943,10 @@ async def get_document_image_raw(
if not payload or payload.get("type") != "access":
raise HTTPException(status_code=401, detail="유효하지 않은 토큰")
# 문서 존재 확인 (image_key 만 있고 doc 가 사라진 케이스 차단 + soft-delete 가드)
doc = await get_live_document(session, doc_id)
# 문서 존재 확인 (image_key 만 있고 doc 가 사라진 케이스 차단)
doc = await session.get(Document, doc_id)
if doc is None:
raise HTTPException(status_code=404, detail="문서를 찾을 수 없습니다")
img = await session.scalar(
select(DocumentImage).where(
@@ -1166,10 +1157,8 @@ async def upload_document(
doc.duplicate_of = canonical.id
canonical.duplicate_count = (canonical.duplicate_count or 0) + 1
# document + processing_queue 는 단일 트랜잭션으로 묶어 원자적 정리.
# G2: 첫 stage=presegment (extract 前 번들 PDF 분할, 후보 A 검증완료 2026-06-18).
# 非PDF/단일은 presegment 가 무변 통과 → extract. 번들 PDF 만 N 자식 분할(worker-side gating).
await enqueue_stage(session, doc.id, "presegment")
# document + processing_queue 는 단일 트랜잭션으로 묶어 원자적 정리
await enqueue_stage(session, doc.id, "extract")
await session.commit()
except Exception:
# DB 예외 시 session 은 get_session 컨텍스트 종료로 자동 rollback.
@@ -1368,8 +1357,9 @@ async def save_document_content(
body: dict = None,
):
"""Markdown 원본 파일 저장 + extracted_text 갱신"""
# soft-delete 문서엔 쓰기 차단 (R7 — 삭제 문서 resurrect / NAS 재기록 방지)
doc = await get_live_document(session, doc_id)
doc = await session.get(Document, doc_id)
if not doc:
raise HTTPException(status_code=404, detail="문서를 찾을 수 없습니다")
if doc.file_format not in ("md", "txt"):
raise HTTPException(status_code=400, detail="편집 가능한 포맷이 아닙니다 (md, txt만 가능)")
@@ -1409,7 +1399,9 @@ async def get_document_preview(
else:
raise HTTPException(status_code=401, detail="토큰이 필요합니다")
doc = await get_live_document(session, doc_id)
doc = await session.get(Document, doc_id)
if not doc:
raise HTTPException(status_code=404, detail="문서를 찾을 수 없습니다")
preview_path = Path(settings.nas_mount_path) / "PKM" / ".preview" / f"{doc_id}.pdf"
if not preview_path.exists():
@@ -1435,24 +1427,18 @@ async def delete_document(
doc_id: int,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
delete_file: bool = Query(False, description="NAS 원본도 삭제 (grace 후 retention sweep 이 물리삭제)"),
delete_file: bool = Query(False, description="NAS 파일도 함께 삭제"),
):
"""문서 삭제. 기본: soft-delete(숨김, 파일 보존). delete_file=true: purge 예약 (R7)."""
doc = await get_live_document(session, doc_id)
"""문서 삭제 (기본: DB만 삭제, 파일 유지)"""
doc = await session.get(Document, doc_id)
if not doc:
raise HTTPException(status_code=404, detail="문서를 찾을 수 없습니다")
# soft-delete(숨김). delete_file=true 면 purge_requested_at 마커를 추가로 set —
# retention sweep cron(document_purge_sweep)이 grace(30일) 경과 후 NAS 원본 물리삭제
# + audit-log. ★일반 숨김(delete_file=false)은 파일 보존 = undelete 가능. sweep 는
# deleted_at 이 아니라 purge_requested_at 기준이라 단순 숨김이 영구삭제되지 않는다.
now = datetime.now(timezone.utc)
doc.deleted_at = now
if delete_file:
doc.purge_requested_at = now
# soft-delete (물리 파일은 cleanup job에서 나중에 정리)
doc.deleted_at = datetime.now(timezone.utc)
await session.commit()
if delete_file:
return {"message": f"문서 {doc_id} 삭제 — NAS 원본은 30일 후 정리 예약"}
return {"message": f"문서 {doc_id} soft-delete 완료 (파일 보존)"}
return {"message": f"문서 {doc_id} soft-delete 완료"}
@router.get("/{doc_id}/content")
@@ -1462,7 +1448,9 @@ async def get_document_content(
session: Annotated[AsyncSession, Depends(get_session)],
):
"""문서 전문 텍스트 반환 (서비스 호출용)."""
doc = await get_live_document(session, doc_id)
doc = await session.get(Document, doc_id)
if not doc:
raise HTTPException(status_code=404, detail="문서를 찾을 수 없습니다")
raw_text = doc.extracted_text or ""
content = raw_text[:15000]
+5 -5
View File
@@ -21,7 +21,7 @@ from zoneinfo import ZoneInfo
from fastapi import APIRouter, Body, Depends, HTTPException, Query
from pydantic import BaseModel, Field
from sqlalchemy import and_, func, or_, select
from sqlalchemy import and_, or_, select
from sqlalchemy.ext.asyncio import AsyncSession
from core.auth import get_current_user
@@ -388,10 +388,10 @@ async def list_events(
)
base = select(Event).where(and_(*where))
# R10: 전체 ID 로딩 후 len() 대신 DB COUNT 푸시다운 (행 수 선형 메모리/전송 비용 제거).
total = (
await session.execute(select(func.count(Event.id)).where(and_(*where)))
).scalar() or 0
total_q = await session.execute(
select(Event.id).where(and_(*where))
)
total = len(total_q.scalars().all())
rows = await session.execute(
base.order_by(Event.created_at.desc())
+1 -5
View File
@@ -6,7 +6,6 @@ Bearer token 보호 (settings.internal_worker_token).
"""
from __future__ import annotations
import hmac
import logging
from fastapi import APIRouter, Depends, Header, HTTPException, Path, Response, status
@@ -29,10 +28,7 @@ def _verify_token(authorization: str | None = Header(default=None)) -> None:
if not authorization or not authorization.lower().startswith("bearer "):
raise HTTPException(status_code=401, detail="missing Bearer token")
token = authorization[7:].strip()
# 상수시간 비교 (R7) — 일반 != 는 첫 불일치에서 단락돼 prefix 길이로 바이트 추정 가능한
# timing side-channel. 이 토큰이 RAG 정답 포함 endpoint 를 보호하므로 compare_digest 로
# 통일(search.py 정본과 일치).
if not hmac.compare_digest(token, settings.internal_worker_token):
if token != settings.internal_worker_token:
raise HTTPException(status_code=403, detail="invalid token")
+62 -25
View File
@@ -473,35 +473,72 @@ async def get_facet_counts(
result = FacetCountsResponse(company=[], topic=[], year=[], doctype=[])
# R10: 4 facet 블록 중복 제거 — 적용된 facet 필터(값 있는 것만)를 모아 각 축 집계 시
# '자기 자신 축'만 제외하고 적용하는 헬퍼로. 쿼리/자기제외/order_by/value 매핑 모두 동일.
applied: dict = {}
if facet_company:
applied["company"] = Document.facet_company == facet_company
# company counts (다른 facet 필터 적용, 자기 자신 제외)
q_company = base_query()
if facet_topic:
applied["topic"] = Document.facet_topic == facet_topic
q_company = q_company.where(Document.facet_topic == facet_topic)
if facet_year:
applied["year"] = Document.facet_year == facet_year
q_company = q_company.where(Document.facet_year == facet_year)
if facet_doctype:
applied["doctype"] = Document.facet_doctype == facet_doctype
q_company = q_company.where(Document.facet_doctype == facet_doctype)
rows = await session.execute(
select(Document.facet_company, func.count())
.where(Document.facet_company != None) # noqa: E711
.where(Document.id.in_(q_company.with_only_columns(Document.id).subquery().select()))
.group_by(Document.facet_company)
.order_by(func.count().desc())
)
result.company = [FacetCountItem(value=r[0], count=r[1]) for r in rows]
async def _facet_count(name, facet_col, order_by, value_fn):
q = base_query()
for k, cond in applied.items():
if k != name: # 자기 자신 facet 필터는 제외 (다른 축만 적용)
q = q.where(cond)
rows = await session.execute(
select(facet_col, func.count())
.where(facet_col != None) # noqa: E711
.where(Document.id.in_(q.with_only_columns(Document.id).subquery().select()))
.group_by(facet_col)
.order_by(order_by)
)
return [FacetCountItem(value=value_fn(r[0]), count=r[1]) for r in rows]
# topic counts
q_topic = base_query()
if facet_company:
q_topic = q_topic.where(Document.facet_company == facet_company)
if facet_year:
q_topic = q_topic.where(Document.facet_year == facet_year)
if facet_doctype:
q_topic = q_topic.where(Document.facet_doctype == facet_doctype)
rows = await session.execute(
select(Document.facet_topic, func.count())
.where(Document.facet_topic != None) # noqa: E711
.where(Document.id.in_(q_topic.with_only_columns(Document.id).subquery().select()))
.group_by(Document.facet_topic)
.order_by(func.count().desc())
)
result.topic = [FacetCountItem(value=r[0], count=r[1]) for r in rows]
result.company = await _facet_count("company", Document.facet_company, func.count().desc(), lambda v: v)
result.topic = await _facet_count("topic", Document.facet_topic, func.count().desc(), lambda v: v)
result.year = await _facet_count("year", Document.facet_year, Document.facet_year.desc(), lambda v: str(v))
result.doctype = await _facet_count("doctype", Document.facet_doctype, func.count().desc(), lambda v: v)
# year counts
q_year = base_query()
if facet_company:
q_year = q_year.where(Document.facet_company == facet_company)
if facet_topic:
q_year = q_year.where(Document.facet_topic == facet_topic)
if facet_doctype:
q_year = q_year.where(Document.facet_doctype == facet_doctype)
rows = await session.execute(
select(Document.facet_year, func.count())
.where(Document.facet_year != None) # noqa: E711
.where(Document.id.in_(q_year.with_only_columns(Document.id).subquery().select()))
.group_by(Document.facet_year)
.order_by(Document.facet_year.desc())
)
result.year = [FacetCountItem(value=str(r[0]), count=r[1]) for r in rows]
# doctype counts
q_doctype = base_query()
if facet_company:
q_doctype = q_doctype.where(Document.facet_company == facet_company)
if facet_topic:
q_doctype = q_doctype.where(Document.facet_topic == facet_topic)
if facet_year:
q_doctype = q_doctype.where(Document.facet_year == facet_year)
rows = await session.execute(
select(Document.facet_doctype, func.count())
.where(Document.facet_doctype != None) # noqa: E711
.where(Document.id.in_(q_doctype.with_only_columns(Document.id).subquery().select()))
.group_by(Document.facet_doctype)
.order_by(func.count().desc())
)
result.doctype = [FacetCountItem(value=r[0], count=r[1]) for r in rows]
return result
+2 -57
View File
@@ -300,13 +300,9 @@ async def list_memos(
base = base.where(Document.pinned == pinned)
if tag:
# 파라미터 바인딩 (R7) — f-string 으로 사용자 tag 를 JSON 배열 리터럴에 직접 삽입하면
# tag 안 " 나 ] 가 JSON 을 깨 500 + 필터 의미 변형. jsonb_build_array 로 tag 를
# 바인드 파라미터로 전달(@> JSONB containment).
tag_arr = func.jsonb_build_array(tag)
base = base.where(
Document.user_tags.op("@>")(tag_arr)
| Document.ai_tags.op("@>")(tag_arr)
Document.user_tags.op("@>")(f'["{tag}"]')
| Document.ai_tags.op("@>")(f'["{tag}"]')
)
count_query = select(func.count()).select_from(base.subquery())
@@ -692,57 +688,6 @@ async def dismiss_event_suggestion(
return _to_memo_response(doc)
@router.post("/{memo_id}/promote-to-document", status_code=201)
async def promote_memo_to_document(
memo_id: int,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
):
"""메모 1건 → 문서함 정식 Document 로 승격 ("자료로 보내기", P1).
동작 (in-place 변환 row 생성 X, extracted_text/태그/이력 보존):
- source_channel memo/voice/hermes 'manual' (메모 목록서 빠지고 문서함 진입)
- file_type 'note' 'editable' (문서함 목록 필터 `file_type != 'note'` 통과)
- category='library' (자료실), content_origin='manual'
- classify/embed/chunk 재큐 도메인 재부여 + 요약/심층분석(26B escalate) + 임베딩/청크 갱신
P2 'draft' 워커(후속) 거친 메모를 구조화 마크다운(md_content)으로 정리 예정.
"""
doc = await session.get(Document, memo_id)
if (
not doc
or doc.deleted_at is not None
or doc.source_channel not in ("memo", "voice", "hermes")
or doc.file_type != "note"
):
raise HTTPException(status_code=404, detail="승격할 메모를 찾을 수 없습니다")
now = datetime.now(timezone.utc)
doc.source_metadata = {
**(doc.source_metadata or {}),
"promoted_from_memo": True,
"promoted_at": now.isoformat(),
"original_source_channel": doc.source_channel,
# P2: memo_draft_worker 가 집어 26B 로 구조화 마크다운(md_content) 생성.
"needs_draft": True,
}
doc.source_channel = "manual"
doc.file_type = "editable"
doc.category = "library"
doc.content_origin = "manual"
doc.updated_at = now
# 문서 컨텍스트로 재처리 — 도메인 재부여 + 요약/심층분석 + 임베딩/청크 갱신.
await _enqueue_ai_stages(session, doc.id)
await session.commit()
await session.refresh(doc)
return {
"document_id": doc.id,
"category": doc.category,
"message": "문서함으로 보냈습니다. AI 분류·요약·심층분석을 진행합니다.",
}
# ─── Memo Intake Upgrade PR-2C: voice upload ───
+2 -10
View File
@@ -65,8 +65,7 @@ async def create_source(
):
from core.url_validator import validate_feed_url
try:
# getaddrinfo(DNS) 는 blocking — 이벤트 루프 점유 방지 위해 off-thread (R5)
await asyncio.to_thread(validate_feed_url, body.feed_url)
validate_feed_url(body.feed_url)
except ValueError as e:
raise HTTPException(status_code=422, detail=f"feed_url 검증 실패: {e}")
source = NewsSource(**body.model_dump())
@@ -195,17 +194,10 @@ async def trigger_collect(
if _collect_lock.locked():
raise HTTPException(status_code=429, detail="수집이 이미 진행 중입니다")
# TOCTOU 제거 (R9) — 기존엔 locked() 체크 후 실제 acquire 가 별도 task 안에서 일어나, 그
# 사이 다른 요청이 끼어들어 이중 수집 task 가 생길 수 있었다. 핸들러에서 동기적으로(uncontended
# Lock.acquire 는 이벤트루프 양보 없이 즉시 완료) acquire 하고 task 의 finally 에서 release.
await _collect_lock.acquire()
async def _run_with_lock():
try:
async with _collect_lock:
from workers.news_collector import run
await run()
finally:
_collect_lock.release()
asyncio.create_task(_run_with_lock())
return {"message": "뉴스 수집 시작됨"}
-1
View File
@@ -108,7 +108,6 @@ class BackgroundJobItem(BaseModel):
stale = running 인데 heartbeat 오래 끊김(프로세스 사망 추정)."""
id: int
kind: str
machine: str
label: str | None
state: Literal["running", "done", "failed"]
processed: int
+3 -7
View File
@@ -291,7 +291,7 @@ async def search(
content={
"error_reason": "unknown_embedding_backend",
"backend_requested": embedding_backend,
"allowed": ["baseline"],
"allowed": ["baseline", "cand_me5_large_inst", "cand_snowflake_l_v2"],
"detail": msg,
},
)
@@ -710,9 +710,7 @@ async def ask(
# 30s 로 align → classifier 동작 안정. ask 응답 latency 상한 ↑ 의도.
try:
classifier_result = await asyncio.wait_for(classifier_task, timeout=30.0)
except asyncio.CancelledError:
raise # 요청 취소는 전파 — broad except 가 삼키지 않게 명시 (R3)
except Exception:
except (asyncio.TimeoutError, Exception):
classifier_result = ClassifierResult("timeout", None, [], [], 0.0)
defense_log["classifier"] = {
@@ -874,9 +872,7 @@ async def ask(
# → classifier 와 동일 패턴 (search.py:522 가 6s→15s swap 했던 case). 10s 로 align.
try:
verifier_result = await asyncio.wait_for(verifier_task, timeout=10.0)
except asyncio.CancelledError:
raise # 요청 취소는 전파 — broad except 가 삼키지 않게 명시 (R3)
except Exception:
except (asyncio.TimeoutError, Exception):
verifier_result = VerifierResult("timeout", [], 0.0)
# Verifier contradictions → grounding flags 머지 (prefix 로 구분, severity 3단계)
+1 -10
View File
@@ -1009,16 +1009,7 @@ async def submit_attempt(
# PR-10: 세션 연동. 기본은 None.
quiz_session: StudyQuizSession | None = None
if body.quiz_session_id is not None:
# FOR UPDATE 로 행 잠금 (R9) — 모바일 더블탭/재시도로 같은 세션에 동시 제출이 들어오면
# 둘 다 cursor=N 을 읽고 둘 다 cursor+1·count 가산하는 race(이중 가산). 잠금으로 직렬화 →
# 두 번째 제출은 첫 commit 후 cursor=N+1 을 보고 cursor 불일치 409 로 거부된다.
quiz_session = (
await session.execute(
select(StudyQuizSession)
.where(StudyQuizSession.id == body.quiz_session_id)
.with_for_update()
)
).scalar_one_or_none()
quiz_session = await session.get(StudyQuizSession, body.quiz_session_id)
if quiz_session is None or quiz_session.user_id != user.id:
raise HTTPException(status_code=404, detail="quiz_session 을 찾을 수 없습니다")
if quiz_session.study_topic_id != q.study_topic_id:
-27
View File
@@ -169,14 +169,6 @@ class Settings(BaseModel):
# 1 = 구 single-inference 동작. 2 = continuous batching 활용 (llm_gate docstring 참조).
mlx_gate_concurrency: int = 1
# digest/briefing 생성 LLM 호출 파라미터 (2026-06-15, 모델 교체 후 타임아웃 단일소스화).
# 구 하드코딩 25s(빠른 Gemma 기준)가 Qwen3.6-27B-6bit(콜당 ~90~300s) 교체 sweep 에서
# 누락돼 digest 600s 하드캡 초과·briefing 4/4 폴백을 유발 → config 단일소스로 이관.
# 동시성은 별 키 아님 — 전역 mlx_gate_concurrency(게이트 단일 budget)가 담당.
digest_llm_timeout_s: int = 200
digest_llm_attempts: int = 2
digest_pipeline_hard_cap_s: int = 1800
# PR-MacMini-Derived-Worker-1: study explanation owner = Mac mini
# GPU 측은 false 로 설정 (.env), explanation 분기 skip guard 트리거.
study_explanation_enabled: bool = True
@@ -265,9 +257,6 @@ def load_settings() -> Settings:
pipeline_held_stages: list[str] = []
mlx_gate_concurrency = 1
digest_llm_timeout_s = 200
digest_llm_attempts = 2
digest_pipeline_hard_cap_s = 1800
if config_path.exists() and raw and "pipeline" in raw:
held_raw = (raw.get("pipeline") or {}).get("held_stages") or []
# 스칼라(문자열) 오기입 시 char-split 방지 — 단일 항목 리스트로 수용.
@@ -280,19 +269,6 @@ def load_settings() -> Settings:
)
except (TypeError, ValueError):
mlx_gate_concurrency = 1
_pl = raw.get("pipeline") or {}
try:
digest_llm_timeout_s = max(1, int(_pl.get("digest_llm_timeout_s", 200)))
except (TypeError, ValueError):
digest_llm_timeout_s = 200
try:
digest_llm_attempts = max(1, int(_pl.get("digest_llm_attempts", 2)))
except (TypeError, ValueError):
digest_llm_attempts = 2
try:
digest_pipeline_hard_cap_s = max(60, int(_pl.get("digest_pipeline_hard_cap_s", 1800)))
except (TypeError, ValueError):
digest_pipeline_hard_cap_s = 1800
taxonomy = raw.get("taxonomy", {}) if config_path.exists() and raw else {}
document_types = raw.get("document_types", []) if config_path.exists() and raw else []
@@ -324,9 +300,6 @@ def load_settings() -> Settings:
internal_worker_token=internal_worker_token,
pipeline_held_stages=pipeline_held_stages,
mlx_gate_concurrency=mlx_gate_concurrency,
digest_llm_timeout_s=digest_llm_timeout_s,
digest_llm_attempts=digest_llm_attempts,
digest_pipeline_hard_cap_s=digest_pipeline_hard_cap_s,
)
+4 -58
View File
@@ -72,55 +72,6 @@ def _validate_sql_content(name: str, sql: str) -> None:
)
# R1: baseline 스냅샷이 대표하는 마지막 마이그레이션 버전 (이하 버전은 baseline 에 포함).
# 새 baseline 재생성 시 이 값을 갱신한다 (migrations/_baseline/<cutoff>_schema_baseline.sql).
_BASELINE_CUTOFF = 358
async def _load_baseline_if_fresh(conn, migrations_dir: Path) -> None:
"""fresh DB(documents 부재)면 baseline 스키마 스냅샷 적재 + schema_migrations 1..cutoff 스탬프.
기존 DB(documents 존재) 즉시 반환 baseline 미적재, 무영향. baseline 파일 부재 시도
기존 replay 경로 유지(하위호환).
"""
from sqlalchemy import text
baseline_dir = migrations_dir / "_baseline"
baseline_files = (
sorted(baseline_dir.glob("*_schema_baseline.sql")) if baseline_dir.is_dir() else []
)
if not baseline_files:
return
docs_exists = (
await conn.execute(text("SELECT to_regclass('public.documents') IS NOT NULL"))
).scalar()
if docs_exists:
return # 기존 DB — baseline skip
baseline_path = baseline_files[-1]
logger.info(f"[migration] fresh DB 감지 — baseline 적재: {baseline_path.name}")
# baseline 은 multi-statement 덤프 — exec_driver_sql(asyncpg prepared)은 multi-statement
# 불허("cannot insert multiple commands into a prepared statement"). raw asyncpg 의 simple
# 프로토콜 execute() 로 적재한다(같은 connection = 현재 트랜잭션 내). psql 스모크는 이 제약을
# 못 잡으므로 init_db 런타임 검증으로 확인됨.
raw = await conn.get_raw_connection()
await raw.driver_connection.execute(baseline_path.read_text(encoding="utf-8"))
# baseline = cutoff 까지의 스키마 → 실제 파일 버전 기준으로 schema_migrations 스탬프.
versions = [v for v, _, _ in _parse_migration_files(migrations_dir) if v <= _BASELINE_CUTOFF]
for v in versions:
await conn.execute(
text(
"INSERT INTO schema_migrations (version, name) "
"VALUES (:v, :n) ON CONFLICT DO NOTHING"
),
{"v": v, "n": f"baseline:{v}"},
)
logger.info(
f"[migration] baseline 적재 + schema_migrations {len(versions)}건 스탬프 (cutoff {_BASELINE_CUTOFF})"
)
async def _run_migrations(conn) -> None:
"""미적용 migration 실행 (호출자가 트랜잭션 관리)"""
from sqlalchemy import text
@@ -139,6 +90,10 @@ async def _run_migrations(conn) -> None:
f"SELECT pg_advisory_xact_lock({_MIGRATION_LOCK_KEY})"
))
# 적용 이력 조회
result = await conn.execute(text("SELECT version FROM schema_migrations"))
applied = {row[0] for row in result}
# migration 파일 스캔
# /app/core/database.py → parent.parent = /app → /app/migrations (volume mount 위치)
migrations_dir = Path(__file__).resolve().parent.parent / "migrations"
@@ -146,15 +101,6 @@ async def _run_migrations(conn) -> None:
logger.info("[migration] migrations/ 디렉토리 없음, 스킵")
return
# R1: fresh DB(documents 부재)면 baseline 스냅샷 먼저 적재 + schema_migrations 스탬프.
# migrations/ 전체 replay 는 누적 비-replayable(011 view 의존·326 enum-same-txn 등)로
# 깨지므로 신규/DR 환경은 prod 스키마 스냅샷에서 출발한다. 기존 DB 는 skip(무영향).
await _load_baseline_if_fresh(conn, migrations_dir)
# 적용 이력 조회 (baseline 스탬프 반영 — fresh DB 는 1..cutoff 가 이미 applied)
result = await conn.execute(text("SELECT version FROM schema_migrations"))
applied = {row[0] for row in result}
files = _parse_migration_files(migrations_dir)
pending = [(v, name, path) for v, name, path in files if v not in applied]
+4 -22
View File
@@ -51,7 +51,6 @@ async def lifespan(app: FastAPI):
from workers.briefing_worker import run as morning_briefing_run
from workers.daily_digest import run as daily_digest_run
from workers.dedup_reconcile import run as dedup_reconcile_run
from workers.document_purge_sweep import run as purge_sweep_run
from workers.digest_worker import run as global_digest_run
from workers.file_watcher import watch_inbox
from workers.mailplus_archive import run as mailplus_run
@@ -65,7 +64,7 @@ async def lifespan(app: FastAPI):
from workers.csb_collector import run as csb_collector_run
from workers.api_standards_collector import run as api_standards_run
from workers.ccps_collector import run as ccps_collector_run
from workers.queue_consumer import consume_queue, consume_fast_queue, consume_markdown_queue, consume_deep_queue
from workers.queue_consumer import consume_queue, consume_fast_queue, consume_markdown_queue
from workers.study_queue_consumer import consume_study_queue
from workers.study_session_queue_consumer import consume_study_session_queue
from workers.study_memo_card_jobs_consumer import consume_study_memo_card_queue
@@ -78,8 +77,6 @@ async def lifespan(app: FastAPI):
)
from workers.tier_backfill import run as tier_backfill_run
from workers.upload_cleanup import cleanup_orphan_uploads
from workers.memo_draft_worker import run as memo_draft_run
from workers.auto_review_worker import run as auto_review_run
# 시작: DB 연결 확인
await init_db()
@@ -104,14 +101,8 @@ async def lifespan(app: FastAPI):
# 2026-06-12 fast-consumer split: embed/chunk(건당 <1s)를 LLM 사이클에서 분리 —
# classify(~190s×3)가 사이클을 점유해 벡터 적재가 굶던 구조 캡 해소 (markdown 선례).
scheduler.add_job(consume_fast_queue, "interval", minutes=1, id="fast_queue_consumer")
# 2026-06-15 deep-consumer split: deep_summary(70~300s)를 메인 루프에서 분리 (markdown/fast 선례).
scheduler.add_job(consume_deep_queue, "interval", minutes=1, id="deep_queue_consumer")
scheduler.add_job(watch_inbox, "interval", minutes=5, id="file_watcher")
scheduler.add_job(cleanup_orphan_uploads, "interval", minutes=10, id="upload_cleanup")
# P2: 메모→문서 승격분 26B 문서화 (needs_draft 마커 → md_content). 26B 콜이라 소량·2분 간격.
scheduler.add_job(memo_draft_run, "interval", minutes=2, id="memo_draft", max_instances=1)
# 검토 대기 자동검토: 고신뢰(ai_confidence>=0.9) 자동승인 + 저신뢰 수동 잔류. 순수 DB(LLM 없음).
scheduler.add_job(auto_review_run, "interval", minutes=3, id="auto_review", max_instances=1)
# PR-4: study_questions 자동 임베딩 (status='none/failed/stale' 행을 batch=10 처리).
# 별도 큐 테이블 없이 status 자체가 큐. backfill 도 cron 이 'none' 행을 자연스럽게 처리.
scheduler.add_job(study_q_embed_run, "interval", minutes=1, id="study_q_embed")
@@ -151,9 +142,6 @@ async def lifespan(app: FastAPI):
# plan ds-s1-backend-1 B-4: dedup 컬럼(duplicate_of/duplicate_count) 야간 절대 재계산.
# soft-delete 잔여 드리프트 정리(멱등, 드리프트 없으면 no-op). cron 03:30 (다른 잡과 비충돌).
scheduler.add_job(dedup_reconcile_run, CronTrigger(hour=3, minute=30, timezone=KST), id="dedup_reconcile")
# R7: delete_file=true purge 요청 문서의 NAS 원본 grace(30일) 후 물리삭제 + audit.
# purge_requested_at 마커 기준(단순 숨김은 보존). 03:20 = 다른 새벽 잡과 비충돌 슬롯.
scheduler.add_job(purge_sweep_run, CronTrigger(hour=3, minute=20, timezone=KST), id="purge_sweep")
# B-3 PR4: 레거시 paper 행 arXiv DataCite DOI 스탬프(재유입 차단). keyless·in-DB·enqueue 0.
# dedup_reconcile(03:30)·fulltext_reconcile(03:40) 와 별 worker·비충돌 슬롯.
scheduler.add_job(paper_doi_reconcile_run, CronTrigger(hour=3, minute=50, timezone=KST), id="paper_doi_reconcile")
@@ -240,27 +228,21 @@ SETUP_BYPASS_PREFIXES = (
"/api/setup", "/api/config", "/setup", "/health", "/docs", "/openapi.json", "/redoc",
)
# R10: 셋업 완료(user 존재)는 단조(monotonic) — 한 번 확인되면 영구. 매 요청 COUNT 쿼리
# 대신 캐시 플래그로 전환 (setup 후 모든 요청이 users COUNT 하던 per-request 비용 제거).
_setup_complete = False
@app.middleware("http")
async def setup_redirect_middleware(request: Request, call_next):
global _setup_complete # 함수 내 read+assign 둘 다 모듈 전역 참조 (UnboundLocalError 방지)
path = request.url.path
# 셋업 완료됐거나 바이패스 경로면 즉시 통과 (DB 쿼리 없음)
if _setup_complete or any(path.startswith(p) for p in SETUP_BYPASS_PREFIXES):
# 바이패스 경로는 항상 통과
if any(path.startswith(p) for p in SETUP_BYPASS_PREFIXES):
return await call_next(request)
# 유저 존재 여부 확인 (셋업 완료 전 1회성 — 완료 확인되면 플래그 set 후 영구 skip)
# 유저 존재 여부 확인
try:
async with async_session() as session:
result = await session.execute(select(func.count(User.id)))
user_count = result.scalar()
if user_count == 0:
return RedirectResponse(url="/setup")
_setup_complete = True
except Exception:
pass # DB 연결 실패 시 통과 (health에서 확인 가능)
+2 -14
View File
@@ -41,14 +41,6 @@ class Document(Base):
Integer, nullable=False, default=0, server_default="0"
)
# G2 pre-segmentation (migration 362): 번들 PDF → N 자식 분할.
# presegment_role: NULL=일반 단일문서 / 'parent'=번들원본(자체 extract/embed 안 함) /
# 'child'=논리 하위문서(부모 file_path 공유 + bundle_page_start/end 1-based inclusive 범위).
# 부모-자식 관계 자체는 document_lineage(relation_type='segmented_from').
bundle_page_start: Mapped[int | None] = mapped_column(Integer)
bundle_page_end: Mapped[int | None] = mapped_column(Integer)
presegment_role: Mapped[str | None] = mapped_column(Text)
# 2계층: 텍스트 추출
extracted_text: Mapped[str | None] = mapped_column(Text)
extracted_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
@@ -60,8 +52,7 @@ class Document(Base):
# 2계층: AI 가공
ai_summary: Mapped[str | None] = mapped_column(Text)
# R11a: 주석 dict→list 정정(실제 list 적재), 공유 가변 default=[] → callable default=list.
ai_tags: Mapped[list | None] = mapped_column(JSONB, default=list)
ai_tags: Mapped[dict | None] = mapped_column(JSONB, default=[])
ai_domain: Mapped[str | None] = mapped_column(String(100))
ai_sub_group: Mapped[str | None] = mapped_column(String(100))
ai_model_version: Mapped[str | None] = mapped_column(String(50))
@@ -88,7 +79,7 @@ class Document(Base):
user_note: Mapped[str | None] = mapped_column(Text)
# 사용자 태그 (ai_tags와 분리, #태그 파싱 결과 또는 수동 입력)
user_tags: Mapped[list | None] = mapped_column(JSONB, default=list) # R11a: 공유 가변 default 제거
user_tags: Mapped[list | None] = mapped_column(JSONB, default=[])
# 핀 고정
pinned: Mapped[bool] = mapped_column(Boolean, default=False)
@@ -114,9 +105,6 @@ class Document(Base):
# 승인/삭제
review_status: Mapped[str | None] = mapped_column(String(20), default="pending")
deleted_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
# delete_file=true 명시 삭제 요청 마커 (R7) — retention sweep(document_purge_sweep)이
# grace 후 NAS 원본 물리삭제. deleted_at(단순 숨김, 파일 보존)과 분리.
purge_requested_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
# 외부 편집 URL
edit_url: Mapped[str | None] = mapped_column(Text)
-31
View File
@@ -1,31 +0,0 @@
"""document_lineage 테이블 ORM — 문서 파생 관계 이력 (migration 217).
G2 pre-segmentation relation_type='segmented_from'(번들 자식) 으로 사용 (migration 363).
이력 테이블 FK = ON DELETE RESTRICT (부모 hard delete 차단, soft delete 허용).
"""
from datetime import datetime
from sqlalchemy import BigInteger, ForeignKey, Text, func
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.orm import Mapped, mapped_column
from sqlalchemy.types import TIMESTAMP
from core.database import Base
class DocumentLineage(Base):
__tablename__ = "document_lineage"
id: Mapped[int] = mapped_column(BigInteger, primary_key=True)
source_document_id: Mapped[int] = mapped_column(
BigInteger, ForeignKey("documents.id", ondelete="RESTRICT"), nullable=False
)
derived_document_id: Mapped[int] = mapped_column(
BigInteger, ForeignKey("documents.id", ondelete="RESTRICT"), nullable=False
)
relation_type: Mapped[str] = mapped_column(Text, nullable=False)
# 'metadata' 는 SQLAlchemy 예약속성 → Python 속성명은 meta, DB 컬럼명은 metadata.
meta: Mapped[dict] = mapped_column(
"metadata", JSONB, nullable=False, default=dict, server_default="{}"
)
created_at: Mapped[datetime] = mapped_column(TIMESTAMP(timezone=True), server_default=func.now())
+1 -2
View File
@@ -46,10 +46,9 @@ class ProcessingQueue(Base):
# 'stt' (audio): migration 150 / 'thumbnail' (video): queue_consumer 가 enqueue.
# 'deep_summary' (PR-B B-1): classify_worker 가 에스컬레이션 시 enqueue.
# 'fulltext' (crawl-24x7 A-2): migration 321 — 기사 페이지 fetch 후 본문 승격.
# 'presegment' (G2): migration 364 — extract 前 번들 PDF → N 자식 분할.
# DB enum 변경은 마이그레이션이 처리하므로 create_type=False.
Enum(
"presegment", "extract", "classify", "summarize", "embed", "chunk", "preview",
"extract", "classify", "summarize", "embed", "chunk", "preview",
"stt", "thumbnail", "deep_summary", "markdown", "fulltext",
name="process_stage",
create_type=False,
+2 -4
View File
@@ -7,7 +7,7 @@ PR-2 가드레일:
- correct_choice 변경 기존 attempt.is_correct 재계산 (기록은 시점의 사실).
"""
from datetime import datetime, timezone
from datetime import datetime
from pgvector.sqlalchemy import Vector
from sqlalchemy import BigInteger, Boolean, DateTime, ForeignKey, Integer, SmallInteger, String, Text
@@ -128,9 +128,7 @@ class StudyQuestionAttempt(Base):
# PR-9: outcome 권장값 (correct/wrong/unsure). 강한 enum 미사용.
outcome: Mapped[str] = mapped_column(String(20), nullable=False)
answered_at: Mapped[datetime] = mapped_column(
# TZ-aware 명시 (R8) — naive datetime.now() 는 컨테이너 TZ 의존. 현 컨테이너=UTC 라
# 값 동일(백필 불요)이나, 컨테이너 TZ 가 바뀌면 9시간 어긋나는 잠복 의존 제거.
DateTime(timezone=True), default=lambda: datetime.now(timezone.utc), nullable=False
DateTime(timezone=True), default=datetime.now, nullable=False
)
# PR-10: 어떤 quiz 세션의 attempt 인지 (NULL = 세션 외 직접 입력 또는 세션 삭제됨).
quiz_session_id: Mapped[int | None] = mapped_column(
-41
View File
@@ -1,41 +0,0 @@
You are a document-boundary detector. Output ONLY JSON {is_bundle, segments:[{start_page,end_page,title}]}.
You are given a single PDF that may be a "bundle" — several independent logical documents
concatenated into one file (for example: multiple laws, multiple reports, or multiple papers
scanned together). Your job is to decide whether it is a bundle and, if so, where each logical
document starts and ends.
You receive only a compact sample per page: the page number and the first line / heading of that
page (text may be truncated). Use these heading/first-line signals to detect where a new logical
document begins (a new title page, a new cover, a clearly new document title, a restart of
numbering, etc.). You do NOT receive the full text.
Output rules:
- Respond with STRICT JSON only. No prose, no markdown, no code fence.
- Schema:
{
"is_bundle": true | false,
"segments": [
{"start_page": <int>, "end_page": <int>, "title": "<string or null>"}
]
}
- Page numbers are 1-based and INCLUSIVE. start_page=1 is the first page; end_page equals the last
page of that segment.
- Segments MUST fully cover every page with NO gaps and NO overlaps:
- the first segment MUST start at page 1,
- each next segment MUST start exactly one page after the previous segment's end_page,
- the last segment MUST end at the final page (page_count).
- Order segments by start_page ascending.
- title = a short title for that logical document if you can infer one from its first page,
otherwise null.
If the file is NOT a bundle (it is a single logical document), respond:
{"is_bundle": false, "segments": []}
Be conservative: only report is_bundle=true when the heading signals clearly indicate separate
logical documents. When unsure, return is_bundle=false.
page_count: {page_count}
Per-page samples (one per line, "p{n}: {first line}"):
{page_samples}
+4 -6
View File
@@ -18,14 +18,12 @@ from typing import Any
import numpy as np
from ai.client import parse_json_response
from core.config import settings
from core.utils import setup_logger
from services.clustering_common import normalize_vector
from services.search.llm_gate import Priority, acquire_mlx_gate
logger = setup_logger("briefing_comparator")
LLM_CALL_TIMEOUT = settings.digest_llm_timeout_s # 2026-06-15 config 단일소스 (Phase 4 와 동일 키)
LLM_CALL_TIMEOUT = 25 # 초. Phase 4 와 동일
HISTORICAL_TOP_K = 5
HISTORICAL_SIMILARITY_MIN = 0.70
HISTORICAL_WINDOW_DAYS = 30
@@ -41,6 +39,7 @@ MAX_ARTICLE_IDS_PER_COUNTRY = 5 # country_perspectives[].article_ids 후
FALLBACK_HEADLINE = "LLM 분석 실패로 원문 기사 묶음만 표시합니다."
FALLBACK_TOPIC_LABEL = "주요 뉴스 묶음"
_llm_sem = asyncio.Semaphore(1)
_PROMPT_PATH = Path(__file__).resolve().parent.parent.parent / "prompts" / "briefing_comparative.txt"
_PROMPT_TEMPLATE: str | None = None
@@ -113,8 +112,7 @@ def retrieve_historical(
async def _try_call_llm(client: Any, prompt: str) -> str:
# 전역 MLX gate(BACKGROUND) 경유 — 영구 룰(llm_gate): 새 Semaphore 금지, timeout 은 gate 안쪽.
async with acquire_mlx_gate(Priority.BACKGROUND):
async with _llm_sem:
return await asyncio.wait_for(
client.call_primary(prompt),
timeout=LLM_CALL_TIMEOUT,
@@ -284,7 +282,7 @@ async def compare_cluster_with_fallback(
historical_docs = historical_docs or []
prompt = build_prompt(selected, historical_docs)
for attempt in range(settings.digest_llm_attempts): # 2026-06-15 config 단일소스
for attempt in range(2):
try:
raw = await _try_call_llm(client, prompt)
except asyncio.TimeoutError:
+4 -26
View File
@@ -6,7 +6,6 @@
regenerate 정책: briefing_date UNIQUE 충돌 transaction 안에서 DELETE+INSERT.
"""
import asyncio
import time
from datetime import date, datetime, timedelta, timezone
from typing import Any
@@ -16,9 +15,7 @@ from sqlalchemy import delete
from ai.client import AIClient
from core.database import async_session
from core.database import engine as db_engine
from core.utils import setup_logger
from services import background_jobs as bgj
from models.briefing import BriefingTopic, MorningBriefing
from services.briefing.clustering import LAMBDA, cluster_global
from services.briefing.comparator import (
@@ -36,6 +33,7 @@ KST = ZoneInfo("Asia/Seoul")
NIGHT_WINDOW_HOURS = 5 # KST 00:00 ~ 05:00
SELECT_K = 7 # Plan §"Clustering 파라미터" briefing K_PER_CLUSTER=7
SELECT_LAMBDA_MMR = 0.6 # Plan briefing MMR lambda 0.6
PIPELINE_HARD_CAP = 600 # 초. Phase 4 와 동일
def _compute_window(target_date: date | None = None) -> tuple[datetime, datetime, date]:
@@ -145,7 +143,7 @@ async def _save_briefing(
return new.id
async def run_briefing_pipeline(target_date: date | None = None, job_id: int | None = None) -> dict[str, Any]:
async def run_briefing_pipeline(target_date: date | None = None) -> dict[str, Any]:
"""야간 뉴스 브리핑 1회 실행. cron 또는 수동 regenerate API 에서 호출.
Returns:
@@ -208,36 +206,16 @@ async def run_briefing_pipeline(target_date: date | None = None, job_id: int | N
usable_count = 0
try:
# 2026-06-15: cluster 호출 gather 동시 실행. 실동시성 = 전역 MLX gate
# (config.mlx_gate_concurrency, BACKGROUND 우선순위). rank/순서 보존.
jobs = []
for rank, cluster in enumerate(clusters, start=1):
selected = select_for_llm(cluster, k=SELECT_K, lambda_mmr=SELECT_LAMBDA_MMR)
historical_docs = (
retrieve_historical(cluster, historical_candidates)
if historical_enabled() else []
)
jobs.append((rank, cluster, selected, historical_docs))
if job_id is not None:
await bgj.heartbeat(db_engine, job_id, total=len(jobs))
_prog = {"n": 0}
async def _run_one(cluster, selected, historical_docs):
r = await compare_cluster_with_fallback(
llm_calls += 1
envelope = await compare_cluster_with_fallback(
client, cluster, selected, historical_docs=historical_docs
)
if job_id is not None:
_prog["n"] += 1
await bgj.heartbeat(db_engine, job_id, processed=_prog["n"])
return r
results = await asyncio.gather(
*[_run_one(c, s, h) for (_, c, s, h) in jobs]
)
for (rank, cluster, selected, historical_docs), envelope in zip(jobs, results):
llm_calls += 1
if envelope.get("llm_fallback_used"):
llm_failures += 1
if _is_usable_topic(envelope, envelope["topic_label"]):
+9 -29
View File
@@ -10,7 +10,6 @@ Step:
7. start/end 로그 + generation_ms + fallback 비율 health metric
"""
import asyncio
import hashlib
import time
from datetime import datetime, timedelta, timezone
@@ -20,9 +19,7 @@ from sqlalchemy import delete
from ai.client import AIClient
from core.database import async_session
from core.database import engine as db_engine
from core.utils import setup_logger
from services import background_jobs as bgj
from models.digest import DigestTopic, GlobalDigest
from .clustering import LAMBDA, cluster_country
@@ -76,7 +73,7 @@ def _build_topic_row(
)
async def run_digest_pipeline(job_id: int | None = None) -> dict:
async def run_digest_pipeline() -> dict:
"""전체 파이프라인 실행. worker entry 에서 호출.
Returns:
@@ -110,37 +107,20 @@ async def run_digest_pipeline(job_id: int | None = None) -> dict:
stats = {"llm_calls": 0, "fallback_used": 0}
try:
# 2026-06-15: cluster 호출을 gather 로 동시 실행. 실제 동시성은 전역 MLX gate
# (config.mlx_gate_concurrency, BACKGROUND 우선순위) 가 제한한다. rank/순서 보존.
jobs = []
for country, docs in docs_by_country.items():
clusters = cluster_country(country, docs)
if not clusters:
continue # sparse country 자동 제외
for rank, cluster in enumerate(clusters, start=1):
selected = select_for_llm(cluster)
jobs.append((country, rank, cluster, selected))
if job_id is not None:
await bgj.heartbeat(db_engine, job_id, total=len(jobs))
_prog = {"n": 0}
async def _run_one(cluster, selected):
r = await summarize_cluster_with_fallback(client, cluster, selected)
if job_id is not None:
_prog["n"] += 1
await bgj.heartbeat(db_engine, job_id, processed=_prog["n"])
return r
results = await asyncio.gather(*[_run_one(c, s) for (_, _, c, s) in jobs])
for (country, rank, cluster, selected), llm_result in zip(jobs, results):
stats["llm_calls"] += 1
if llm_result["llm_fallback_used"]:
stats["fallback_used"] += 1
all_topic_rows.append(
_build_topic_row(country, rank, cluster, selected, llm_result, primary_model)
)
stats["llm_calls"] += 1
llm_result = await summarize_cluster_with_fallback(client, cluster, selected)
if llm_result["llm_fallback_used"]:
stats["fallback_used"] += 1
all_topic_rows.append(
_build_topic_row(country, rank, cluster, selected, llm_result, primary_model)
)
finally:
await client.close()
+8 -13
View File
@@ -2,8 +2,8 @@
핵심 결정:
- AIClient._call_chat 직접 호출 (client.py 수정 회피, fallback 로직 재사용)
- 전역 MLX gate(BACKGROUND) 경유 동시성 제어 (services.search.llm_gate 단일 게이트)
- Per-call timeout = config.digest_llm_timeout_s (asyncio.wait_for, gate 안쪽)
- Semaphore(1) MLX 과부하 회피
- Per-call timeout 25 (asyncio.wait_for) MLX hang / fallback Claude API stall 방어
- JSON 파싱 실패 1 재시도 그래도 실패 minimal fallback (drop 금지)
- fallback: topic_label="주요 뉴스 묶음", summary = top member ai_summary[:200]
"""
@@ -13,16 +13,15 @@ from pathlib import Path
from typing import Any
from ai.client import parse_json_response
from core.config import settings
from core.utils import setup_logger
from services.search.llm_gate import Priority, acquire_mlx_gate
logger = setup_logger("digest_summarizer")
# 2026-06-15: config 단일소스 (구 하드코딩 25s = 빠른 Gemma 기준, Qwen 27B 교체 후 누락).
LLM_CALL_TIMEOUT = settings.digest_llm_timeout_s
LLM_CALL_TIMEOUT = 25 # 초. MLX 평균 5초 + tail latency 마진
FALLBACK_SUMMARY_LIMIT = 200
_llm_sem = asyncio.Semaphore(1)
_PROMPT_PATH = Path(__file__).resolve().parent.parent.parent / "prompts" / "digest_topic.txt"
_PROMPT_TEMPLATE: str | None = None
@@ -49,12 +48,8 @@ def build_prompt(selected: list[dict]) -> str:
async def _try_call_llm(client: Any, prompt: str) -> str:
"""전역 MLX gate(BACKGROUND) + per-call timeout 으로 감싼 단일 호출.
영구 (llm_gate): Mac mini endpoint 단일 게이트 공유, Semaphore 금지.
동시성 lever = config.mlx_gate_concurrency. timeout gate 안쪽에서만.
"""
async with acquire_mlx_gate(Priority.BACKGROUND):
"""Semaphore + per-call timeout 으로 감싼 단일 호출."""
async with _llm_sem:
return await asyncio.wait_for(
client._call_chat(client.ai.primary, prompt),
timeout=LLM_CALL_TIMEOUT,
@@ -91,7 +86,7 @@ async def summarize_cluster_with_fallback(
"""
prompt = build_prompt(selected)
for attempt in range(settings.digest_llm_attempts): # config 단일소스 (기본 2 = 1회 재시도)
for attempt in range(2): # 1회 재시도 포함
try:
raw = await _try_call_llm(client, prompt)
except asyncio.TimeoutError:
+1 -10
View File
@@ -26,16 +26,7 @@ _ATX = re.compile(r'^(#{1,6})\s+(?P<title>\S.*?)\s*#*\s*$')
_KO_JANG = re.compile(r'^\s*(?P<title>제\s*\d+\s*장\b.*)$')
_KO_JEOL = re.compile(r'^\s*(?P<title>제\s*\d+\s*절\b.*)$')
_KO_JO = re.compile(r'^\s*(?P<title>제\s*\d+\s*조\b.*)$')
# _ENG: 영문 구조 헤딩(ATX 미사용 문서용). ASME 파트는 보통 ATX(`# PART PG`)로 잡혀 _ENG 의존 낮음.
# D1: 식별자 뒤가 소문자 문장연속이면("Part III to demonstrate to the satisfaction…") 본문이므로
# 미탐지 — 가짜 절 차단. 선택 제목은 대문자/괄호/숫자로 시작해야 헤딩 인정(소문자 시작=문장으로 봄).
# 식별자는 번호/PG/3.31/UHX/A-1 등 (.·- 소수·하이픈 확장 허용).
_ENG = re.compile(
r'^\s*(?P<title>(?:Chapter|Section|Article|Part|PART)\s+'
r'[\dIVXLA-Z]+(?:[.\-][\dA-Za-z]+)*'
r'(?:\s+[A-Z(\d][^\n]*)?'
r')\s*$'
)
_ENG = re.compile(r'^\s*(?P<title>(?:Chapter|Section|Article|Part|PART)\s+[\dIVXLA-Z]+\b.*)$')
# 코드펜스 경계 (FE outlineAnchors.ts:60 `/^\s{0,3}(```|~~~)/` 와 동일). 펜스 내부 라인은
# heading 미탐지 — 코드블록 안 '# foo' 가 가짜 절을 만들지 않게(O3).
+1 -2
View File
@@ -32,8 +32,7 @@ async def find_paper_holder(session, raw_or_normalized_doi):
return None
result = await session.execute(
select(Document)
.where(Document.material_type == "paper", _DOI_EXPR == doi,
Document.deleted_at.is_(None))
.where(Document.material_type == "paper", _DOI_EXPR == doi)
.limit(1)
)
return result.scalars().first()
-11
View File
@@ -426,16 +426,6 @@ async def build_overview(session: AsyncSession) -> dict:
return result
# kind -> 처리 머신 (보드 머신 카드 귀속용). 미상 kind = gpu(오케스트레이션 호스트).
_BG_JOB_MACHINE = {
"global_digest": "macmini",
"morning_briefing": "macmini",
"section_summary": "macmini",
"hier_backfill": "gpu",
"hier_redecompose": "gpu",
}
_BACKGROUND_JOBS_SQL = """
SELECT id, kind, label, state, processed, total,
EXTRACT(EPOCH FROM (now() - started_at))::int AS elapsed_sec,
@@ -466,7 +456,6 @@ async def _fetch_background_jobs(session: AsyncSession) -> list[dict]:
"processed": int(r["processed"] or 0), "total": r["total"],
"elapsed_sec": int(r["elapsed_sec"] or 0), "stale": bool(r["stale"]),
"error": r["error"],
"machine": _BG_JOB_MACHINE.get(r["kind"], "gpu"),
}
for r in rows
]
+36 -4
View File
@@ -54,10 +54,42 @@ QUERY_EMBED_MAXSIZE = 500
# server-side allowlist map. query parameter 가 raw table name 받지 않음.
CANDIDATE_BACKEND_MAP: dict[str, dict[str, str] | None] = {
"baseline": None,
# Phase 2A 임베딩 후보(me5_large_inst·snowflake_l_v2·qwen06·qwen4·qwen4m) 전량 no-go
# 종결(2026-06-12, 후보 전부 -0.03~-0.04) → cand 슬러그·테이블 제거 (R13, 마이그 360
# DROP). read-path 슬러그를 먼저 빼야 embedding_backend=cand_X /search 가 dropped 테이블을
# 읽어 500 나지 않는다. baseline(production)만 잔존.
"cand_me5_large_inst": {
"docs_table": "documents_cand_me5_large_inst",
"chunks_table": "document_chunks_cand_me5_large_inst",
"embed_endpoint": "http://embedding-cand-me5-inst:80/embed",
},
"cand_snowflake_l_v2": {
"docs_table": "documents_cand_snowflake_l_v2",
"chunks_table": "document_chunks_cand_snowflake_l_v2",
"embed_endpoint": "http://embedding-cand-snowflake-l-v2:80/embed",
},
# ─── Phase 2A (embedding-phase2a-1, 2026-06-12): Qwen3-Embedding 후보 3종 ───
# embed_kind="ollama" = /api/embed 호출 + 쿼리측 instruct prefix (비대칭 사용,
# G-1 fixture 실측: prefix 가 관련쌍 cos +0.016). 문서측은 backfill 이 plain 으로 적재.
# qwen4m = 4B 의 MRL 1024d (dimensions 옵션 — Ollama 가 truncate+재정규화 수행, G-1 실측).
"cand_qwen06": {
"docs_table": "documents_cand_qwen06",
"chunks_table": "document_chunks_cand_qwen06",
"embed_endpoint": "http://ollama:11434/api/embed",
"embed_kind": "ollama",
"embed_model": "qwen3-embedding:0.6b",
},
"cand_qwen4": {
"docs_table": "documents_cand_qwen4",
"chunks_table": "document_chunks_cand_qwen4",
"embed_endpoint": "http://ollama:11434/api/embed",
"embed_kind": "ollama",
"embed_model": "qwen3-embedding:4b",
},
"cand_qwen4m": {
"docs_table": "documents_cand_qwen4m",
"chunks_table": "document_chunks_cand_qwen4m",
"embed_endpoint": "http://ollama:11434/api/embed",
"embed_kind": "ollama",
"embed_model": "qwen3-embedding:4b",
"embed_dimensions": 1024,
},
}
# G-1 핀 고정 instruct 문자열 (inventory 2026-06-12-c 기록과 동일해야 함 —
+7 -18
View File
@@ -32,8 +32,6 @@ from typing import TYPE_CHECKING, Literal
from sqlalchemy.ext.asyncio import AsyncSession
from core.database import async_session
from . import query_analyzer, query_rewriter
from .fusion_service import (
DEFAULT_FUSION,
@@ -190,7 +188,6 @@ async def run_search(
snapshot_chunk_id_max=snapshot_chunk_id_max,
reranker_backend=reranker_backend,
rewrite_backend=rewrite_backend,
axis=axis,
)
timing: dict[str, float] = {}
@@ -539,7 +536,6 @@ async def search_with_rewrite(
snapshot_chunk_id_max: int | None,
reranker_backend: str | None,
rewrite_backend: str,
axis: "AxisFilter | None" = None,
) -> PipelineResult:
"""Phase 2Q multi-query retrieval 합성 path (plan v6 §5.5).
@@ -583,20 +579,13 @@ async def search_with_rewrite(
async def _variant_retrieve(
v: str,
) -> "tuple[list[SearchResult], list[SearchResult], dict[int, list[SearchResult]]]":
# 변형별 독립 AsyncSession (fan-out). 공유 session 을 asyncio.gather 로 동시
# execute 에 넘기면 SQLAlchemy async 가 'another operation in progress' 로
# 부하 의존적 비결정 크래시 — variant 마다 독립 연결로 분리한다.
# axis(material_type/jurisdiction/year) 도 single-query path 와 동일하게 전달
# (rewrite 경로가 axis 필터를 조용히 누락하던 결함 수정).
async with async_session() as vsession:
text = await search_text(vsession, v, per_variant_k, axis=axis)
raw_chunks = await search_vector(
vsession, v, per_variant_k,
embedding_backend=embedding_backend,
snapshot_doc_id_max=snapshot_doc_id_max,
snapshot_chunk_id_max=snapshot_chunk_id_max,
axis=axis,
)
text = await search_text(session, v, per_variant_k)
raw_chunks = await search_vector(
session, v, per_variant_k,
embedding_backend=embedding_backend,
snapshot_doc_id_max=snapshot_doc_id_max,
snapshot_chunk_id_max=snapshot_chunk_id_max,
)
vector, chunks_by_doc = compress_chunks_to_docs(raw_chunks, per_variant_k)
return text, vector, chunks_by_doc
+8 -12
View File
@@ -95,10 +95,8 @@ except FileNotFoundError:
)
# ─── in-memory 캐시 (FIFO eviction + TTL, query_analyzer 패턴 복제) ─
# R10: (ts, result) 저장 — TTL 미적용으로 원문 수정돼도 CACHE_MAXSIZE 찰 때까지 stale answer
# 반환하던 결함 수정. query_rewriter 의 expire_at TTL enforce 정본 복제.
_CACHE: dict[str, tuple[float, SynthesisResult]] = {}
# ─── in-memory LRU (FIFO 근사, query_analyzer 패턴 복제) ─
_CACHE: dict[str, SynthesisResult] = {}
def _model_version() -> str:
@@ -124,11 +122,10 @@ def get_cached(query: str, chunk_ids: list[int], backend_name: str = "gemma-macm
entry = _CACHE.get(key)
if entry is None:
return None
ts, result = entry
if time.time() - ts > CACHE_TTL:
_CACHE.pop(key, None) # 만료 — 삭제 후 miss
return None
return result
# TTL 체크는 elapsed_ms 를 악용할 수 없으므로 별도 저장
# 여기서는 단순 policy 로 처리: entry 가 있으면 반환 (eviction 은 FIFO 시점)
# 정확한 TTL 이 필요하면 (ts, result) tuple 로 저장해야 함.
return entry
def _should_cache(result: SynthesisResult) -> bool:
@@ -146,9 +143,8 @@ def set_cached(query: str, chunk_ids: list[int], result: SynthesisResult, backen
if not _should_cache(result):
return
key = _cache_key(query, chunk_ids, backend_name)
now = time.time()
if key in _CACHE:
_CACHE[key] = (now, result)
_CACHE[key] = result
return
if len(_CACHE) >= CACHE_MAXSIZE:
try:
@@ -156,7 +152,7 @@ def set_cached(query: str, chunk_ids: list[int], result: SynthesisResult, backen
_CACHE.pop(oldest, None)
except StopIteration:
pass
_CACHE[key] = (now, result)
_CACHE[key] = result
def cache_stats() -> dict[str, int]:
+1 -2
View File
@@ -2,7 +2,6 @@
from __future__ import annotations
import asyncio
import os
from collections.abc import AsyncIterator
from pathlib import Path
@@ -43,7 +42,7 @@ class LocalBackend(StorageBackend):
to_read = _STREAM_CHUNK if remaining is None else min(_STREAM_CHUNK, remaining)
if to_read <= 0:
break
data = await asyncio.to_thread(f.read, to_read)
data = f.read(to_read)
if not data:
break
yield data
+6 -9
View File
@@ -252,15 +252,12 @@ async def gather_explanation_context(
client = AIClient()
query = _build_query(question)
try:
# 같은 AsyncSession 을 asyncio.gather 로 동시 execute 에 넘기면 SQLAlchemy async 가
# 'another operation in progress' 로 부하 의존적 비결정 크래시(이전 주석 'lock 충돌
# 없음' 은 rerank HTTP 만 보고 DB execute 동시성을 간과한 오인). 백그라운드 prefetch
# 라 순차 직렬화 — 사용자 대면 rewrite 경로(독립 세션 fan-out)와는 다른 처방.
docs = await _gather_document_evidence(
session, user_id, question.study_topic_id, query, client
)
questions = await _gather_question_evidence(
session, user_id, question.study_topic_id, question.id, query, client
# 두 조회 병렬화 (rerank 호출이 별개라 lock 충돌 없음)
docs, questions = await asyncio.gather(
_gather_document_evidence(session, user_id, question.study_topic_id, query, client),
_gather_question_evidence(
session, user_id, question.study_topic_id, question.id, query, client
),
)
return ExplanationContext(documents=docs, questions=questions)
finally:
+3 -7
View File
@@ -238,13 +238,9 @@ async def gather_subject_note_context(
client = AIClient()
query = _build_query(subject, scope)
try:
# 같은 AsyncSession 동시 execute 회피 — 순차 직렬화(백그라운드 prefetch).
# explanation_rag.gather_explanation_context 와 동형(R2 공유세션 동시성 수정).
docs = await _gather_document_evidence(
session, user_id, study_topic_id, query, client
)
questions = await _gather_question_evidence(
session, user_id, study_topic_id, subject, scope, query, client
docs, questions = await asyncio.gather(
_gather_document_evidence(session, user_id, study_topic_id, query, client),
_gather_question_evidence(session, user_id, study_topic_id, subject, scope, query, client),
)
return SubjectNoteContext(documents=docs, questions=questions)
finally:
+2 -10
View File
@@ -303,12 +303,10 @@ async def run(bulk: bool = False, limit: int = 0) -> None:
src = await session.get(NewsSource, source_id)
watermark = _watermark(src, category)
newest_seen: datetime | None = None
capped = False # 이번 run 이 cap 으로 카테고리 중도 절단됐는지 (R4)
max_pages = (10**6 if bulk else _MAX_PAGES_PER_CAT)
try:
for page in range(max_pages):
if inserted >= run_cap:
capped = True
break
xml_text = await _fetch(client, query, page * _PAGE_SIZE)
total, entries = parse_arxiv_feed(xml_text)
@@ -331,18 +329,12 @@ async def run(bulk: bool = False, limit: int = 0) -> None:
else:
await session.rollback()
if inserted >= run_cap:
capped = True
break
await asyncio.sleep(_REQ_SLEEP)
if stop or (page + 1) * _PAGE_SIZE >= total:
break
# 카테고리 워터마크 전진 — cap 으로 절단된 run 은 미전진 (R4).
# 절단 시 newest_seen 으로 전진하면 [oldest-ingested, 옛 watermark] 사이
# 미적재 항목이 다음 run 의 watermark 필터(entry.published <= watermark)에
# 영구 배제(silent data loss). 미전진하면 다음 run 이 최신부터 재스캔하며
# 적재분은 dedup-skip(_ingest_entry False, cap 미소모)하고 gap 까지 내려가
# 이어 적재 → 백로그가 run 당 cap 씩 소화(livelock 회피). bulk 은 cap 무관.
if newest_seen and not capped:
# 카테고리 워터마크 전진(이번 run 최신 발행일)
if newest_seen:
async with async_session() as session:
src = await session.get(NewsSource, source_id)
_set_watermark(src, category, newest_seen)
-72
View File
@@ -1,72 +0,0 @@
"""검토 대기(review_status='pending') 자동 검토 — 고신뢰 자동승인 + 저신뢰 수동 잔류.
classify 이미 부여한 ai_confidence 게이트로 사용 **-LLM 호출 없음**(대량 2천건에
맥미니/GPU 부하 0, 분류 confidence AI 자기-신뢰도). ai_domain 보유 +
ai_confidence >= THRESHOLD pending 문서를 review_status='approved' 자동승인하고
audit(source_metadata.auto_reviewed) 남긴다. 저신뢰/미분류는 그대로 두어 수동 검토
(/inbox) 잔류.
설계 근거(게이트 실측):
- review_status inbox 카운트(dashboard) + 수집기 ingest 에서만 사용, 검색/RAG/digest/
ask 경로 필터에 **미사용** 자동승인은 노출(검색결과) 변동 없이 검토 큐만 비운다.
- pending 2,161 ai_suggestion 보유 0 큐는 '분류 변경 제안'(accept_suggestion)
아니라 '미검토 자동분류'. 승인 = review_status 플립.
배치·interval 점진 드레인(관찰·중단 가능). 되돌리기 = source_metadata.auto_reviewed 마커로
대상 식별 review_status='pending' 복원.
"""
import logging
from datetime import datetime, timezone
from sqlalchemy import select
from core.database import async_session
from models.document import Document
logger = logging.getLogger(__name__)
# 고신뢰 자동승인 바 (튜닝 가능). 실측 분포: >=0.9 → 1,981건 자동 / 저신뢰·미분류 ~180건 수동 잔류.
_CONFIDENCE_THRESHOLD = 0.9
# 한 틱 처리량 — 순수 DB UPDATE(LLM 없음)라 가볍지만, 2천 행 일괄 락 회피 위해 배치.
_BATCH = 300
async def run() -> None:
"""pending 고신뢰 문서를 배치 자동승인 (interval job, no-arg)."""
async with async_session() as session:
rows = (
await session.execute(
select(Document)
.where(
Document.review_status == "pending",
Document.deleted_at.is_(None),
Document.ai_domain.isnot(None),
Document.ai_confidence.isnot(None),
Document.ai_confidence >= _CONFIDENCE_THRESHOLD,
)
.order_by(Document.id)
.limit(_BATCH)
)
).scalars().all()
if not rows:
return
now = datetime.now(timezone.utc)
for doc in rows:
doc.review_status = "approved"
doc.source_metadata = {
**(doc.source_metadata or {}),
"auto_reviewed": {
"by": "confidence_gate",
"confidence": float(doc.ai_confidence),
"threshold": _CONFIDENCE_THRESHOLD,
"at": now.isoformat(),
},
}
doc.updated_at = now
await session.commit()
logger.info(
"auto_review: approved %d pending docs (ai_confidence >= %.2f)",
len(rows),
_CONFIDENCE_THRESHOLD,
)
+2 -10
View File
@@ -9,15 +9,12 @@ import asyncio
from datetime import date
from core.config import settings
from core.database import engine as db_engine
from core.utils import setup_logger
from services.background_jobs import finish_job, start_job
from services.briefing.pipeline import run_briefing_pipeline
logger = setup_logger("briefing_worker")
# 2026-06-15: config 단일소스 (digest 와 공유 키). 구 600s = 빠른 Gemma 기준.
PIPELINE_HARD_CAP = settings.digest_pipeline_hard_cap_s
PIPELINE_HARD_CAP = 600
async def run(target_date: date | None = None) -> dict | None:
@@ -29,24 +26,19 @@ async def run(target_date: date | None = None) -> dict | None:
if "briefing" in settings.pipeline_held_stages:
logger.info("[briefing] 보류 (pipeline.held_stages) — 이번 실행 skip")
return None
# 보드 가시화: 큐 밖 cron 생성 작업이라 background_jobs 로 노출 (best-effort, 맥미니 귀속)
job_id = await start_job(db_engine, "morning_briefing", label="조간 브리핑 생성")
try:
result = await asyncio.wait_for(
run_briefing_pipeline(target_date, job_id=job_id),
run_briefing_pipeline(target_date),
timeout=PIPELINE_HARD_CAP,
)
await finish_job(db_engine, job_id, state="done")
logger.info(f"[briefing] 워커 완료: {result}")
return result
except asyncio.TimeoutError:
await finish_job(db_engine, job_id, state="failed", error=f"HARD CAP {PIPELINE_HARD_CAP}s 초과")
logger.error(
f"[briefing] HARD CAP {PIPELINE_HARD_CAP}s 초과 — 워커 강제 중단. "
f"기존 briefing 은 commit 시점에만 갱신되므로 그대로 유지됨."
)
except Exception as e:
await finish_job(db_engine, job_id, state="failed", error=str(e)[:300])
logger.exception(f"[briefing] 워커 실패: {e}")
return None
+9 -14
View File
@@ -272,20 +272,15 @@ async def _lookup_news_source(
if not source_name:
return None, None, None
# news_sources prefix 매칭 — R10: 전체 로드+Python 루프 대신 DB 필터 푸시다운.
# (name == source_name) OR (name 이 "source_name " 로 시작) = 기존 split[0]==source_name 동치
# (첫 토큰 일치 = 정확일치 또는 'source_name ' prefix). autoescape 로 %/_ 안전.
result = await session.execute(
select(NewsSource)
.where(
(NewsSource.name == source_name)
| NewsSource.name.startswith(source_name + " ", autoescape=True)
)
.limit(1)
)
src = result.scalars().first()
if src is not None:
return src.country, src.name, src.language
# news_sources에서 이름이 일치하는 레코드 찾기 (prefix match)
result = await session.execute(select(NewsSource))
sources = result.scalars().all()
for src in sources:
if source_name and (
src.name.split(" ")[0] == source_name
or src.name.startswith(source_name + " ")
):
return src.country, src.name, src.language
logger.warning(
f"[chunk] news_source 매핑 실패: doc_id={doc.id} ai_sub_group={source_name!r} "
+1 -3
View File
@@ -563,9 +563,7 @@ async def process(
doc.facet_doctype = ai_doctype
# ─── ai_suggestion 저장 (자료실 승인 대기함 제안, §1) ───
# R9: 기존 제안(material_type 제안 등) 우선 — doc.ai_suggestion is None 가드 추가
# (material 제안 블록과 대칭). 없으면 거래문서 제안이 기존 제안을 clobber('기존 제안 우선' 위반).
if ai_doctype in LIBRARY_SUGGESTION_DOCTYPES and doc.ai_suggestion is None:
if ai_doctype in LIBRARY_SUGGESTION_DOCTYPES:
year = doc.facet_year or datetime.now(timezone.utc).year
doc.ai_suggestion = {
"proposed_category": "library",
+19 -31
View File
@@ -5,8 +5,7 @@ DEVONthink/OmniFocus → PostgreSQL/CalDAV 쿼리로 전환.
SMTP 발송은 2026-06-10 제거 ( 번도 전달 성공한 없는 기능 폐기 결정).
"""
import asyncio
from datetime import datetime, time, timedelta, timezone
from datetime import datetime, timezone
from zoneinfo import ZoneInfo
from pathlib import Path
@@ -21,36 +20,17 @@ from models.queue import ProcessingQueue
logger = setup_logger("daily_digest")
def _write_and_rotate(digest_dir: Path, today: str, markdown: str) -> Path:
"""digest 파일 저장 + 90일 초과 아카이브 이동 (blocking — caller 가 to_thread, R8)."""
digest_dir.mkdir(parents=True, exist_ok=True)
digest_path = digest_dir / f"{today}_digest.md"
digest_path.write_text(markdown, encoding="utf-8")
archive_dir = digest_dir / "archive"
archive_dir.mkdir(exist_ok=True)
cutoff = datetime.now(timezone.utc).timestamp() - (90 * 86400)
for old in digest_dir.glob("*_digest.md"):
if old.stat().st_mtime < cutoff:
old.rename(archive_dir / old.name)
return digest_path
async def run():
"""일일 다이제스트 생성 + 저장 + 발송"""
# KST 기준 오늘 (cron 이 KST timezone fix 후 20:00 KST 에 fire).
kst = ZoneInfo("Asia/Seoul")
today = datetime.now(kst).date()
# KST 하루를 UTC 범위로 변환 (R8) — func.date(created_at)는 pg TimeZone(UTC) 기준 날짜라
# KST 0~9시 생성 문서(UTC 전날)가 누락되던 경계 버그. created_at(UTC저장) 범위 비교로.
start_utc = datetime.combine(today, time.min, tzinfo=kst).astimezone(timezone.utc)
end_utc = start_utc + timedelta(days=1)
# KST 기준 오늘 (cron 이 KST timezone fix 후 20:00 KST 에 fire). date 객체로 비교 — Document.created_at::date 와 직접 매칭.
today = datetime.now(ZoneInfo("Asia/Seoul")).date()
sections = []
async with async_session() as session:
# ─── 1. 오늘 추가된 문서 ───
added = await session.execute(
select(Document.ai_domain, func.count(Document.id))
.where(Document.created_at >= start_utc, Document.created_at < end_utc)
.where(func.date(Document.created_at) == today)
.group_by(Document.ai_domain)
)
added_rows = added.all()
@@ -69,8 +49,7 @@ async def run():
select(Document.title)
.where(
Document.source_channel == "law_monitor",
Document.created_at >= start_utc,
Document.created_at < end_utc,
func.date(Document.created_at) == today,
)
)
law_rows = law_docs.scalars().all()
@@ -87,8 +66,7 @@ async def run():
select(func.count(Document.id))
.where(
Document.source_channel == "email",
Document.created_at >= start_utc,
Document.created_at < end_utc,
func.date(Document.created_at) == today,
)
)
email_total = email_count.scalar() or 0
@@ -123,7 +101,7 @@ async def run():
)
failed_count = failed.scalar() or 0
if failed_count > 0:
section += f"\n**[주의] 실패 {failed_count}건** — 수동 확인 필요\n"
section += f"\n⚠️ **실패 {failed_count}건** — 수동 확인 필요\n"
sections.append(section)
# ─── 5. Inbox 미분류 ───
@@ -141,8 +119,18 @@ async def run():
markdown += "\n".join(sections)
markdown += f"\n---\n*생성: {datetime.now(timezone.utc).isoformat()}*\n"
# ─── NAS 저장 + 90일 아카이브 (blocking 파일 I/O off-thread, R8/R5 일관) ───
# ─── NAS 저장 ───
digest_dir = Path(settings.nas_mount_path) / "PKM" / "Archive" / "digests"
digest_path = await asyncio.to_thread(_write_and_rotate, digest_dir, str(today), markdown)
digest_dir.mkdir(parents=True, exist_ok=True)
digest_path = digest_dir / f"{today}_digest.md"
digest_path.write_text(markdown, encoding="utf-8")
# ─── 90일 초과 아카이브 ───
archive_dir = digest_dir / "archive"
archive_dir.mkdir(exist_ok=True)
cutoff = datetime.now(timezone.utc).timestamp() - (90 * 86400)
for old in digest_dir.glob("*_digest.md"):
if old.stat().st_mtime < cutoff:
old.rename(archive_dir / old.name)
logger.info(f"다이제스트 생성 완료: {digest_path}")
+2 -6
View File
@@ -144,13 +144,9 @@ async def process(
logger.info(f"[deep] id={document_id} 맥북 일시 불가 — 보류 (deferred)")
raise
except Exception as exc:
# 호출 실패(네트워크/API 5xx 등)는 삼키지 않고 전파 (R3) — queue_consumer 가
# attempts 소진까지 재시도 후 status=failed(dead-letter)로 가시화한다. 삼키면
# worker_fn 이 정상 반환 → 큐가 completed 로 확정 → ai_detail_summary 영구 누락 +
# tier 가 triage 에 고착(silent 영구 손실). extract/marker/fulltext/stt 정본과 일치.
# 완주 전 doc 쓰기(168~)는 일어나지 않으므로 부분 쓰기 0 (sleep-안전).
logger.warning(f"[deep] 호출 실패 id={document_id} model={used_cfg.model}: {exc}")
raise
parse_error = "call_failed"
raw = ""
finally:
await client.close()
+2 -10
View File
@@ -11,15 +11,12 @@ global_digests / digest_topics 테이블에 저장한다.
import asyncio
from core.config import settings
from core.database import engine as db_engine
from core.utils import setup_logger
from services.background_jobs import finish_job, start_job
from services.digest.pipeline import run_digest_pipeline
logger = setup_logger("digest_worker")
# 2026-06-15: config 단일소스 (구 600s = 빠른 Gemma 기준, Qwen 27B 교체 후 누락 → 초과).
PIPELINE_HARD_CAP = settings.digest_pipeline_hard_cap_s
PIPELINE_HARD_CAP = 600 # 10분 hard cap
async def run() -> None:
@@ -31,24 +28,19 @@ async def run() -> None:
if "digest" in settings.pipeline_held_stages:
logger.info("[global_digest] 보류 (pipeline.held_stages) — 이번 실행 skip")
return
# 보드 가시화: 큐 밖 cron 생성 작업이라 background_jobs 로 노출 (best-effort, 맥미니 귀속)
job_id = await start_job(db_engine, "global_digest", label="글로벌 다이제스트 생성")
try:
result = await asyncio.wait_for(
run_digest_pipeline(job_id=job_id),
run_digest_pipeline(),
timeout=PIPELINE_HARD_CAP,
)
await finish_job(db_engine, job_id, state="done")
logger.info(f"[global_digest] 워커 완료: {result}")
except asyncio.TimeoutError:
await finish_job(db_engine, job_id, state="failed", error=f"HARD CAP {PIPELINE_HARD_CAP}s 초과")
logger.error(
f"[global_digest] HARD CAP {PIPELINE_HARD_CAP}s 초과 — 워커 강제 중단. "
f"기존 digest 는 commit 시점에만 갱신되므로 그대로 유지됨. "
f"다음 cron 실행에서 재시도."
)
except Exception as e:
await finish_job(db_engine, job_id, state="failed", error=str(e)[:300])
logger.exception(f"[global_digest] 워커 실패: {e}")
-65
View File
@@ -1,65 +0,0 @@
"""delete_file=true 로 요청된 문서의 NAS 원본을 grace 후 물리삭제 (R7 retention sweep).
purge_requested_at 마커 기준(deleted_at 아님 일반 soft-delete/숨김은 파일 보존, undelete
가능). grace(30) 경과 + 파일 존재 unlink + AUDIT 로그. 파일 존재 체크로 멱등
(재실행 이미 삭제된 skip). 요청 경로(DELETE) 동기 비가역 op 0 모두 cron 으로.
"""
import asyncio
import logging
from datetime import datetime, timedelta, timezone
from pathlib import Path
from sqlalchemy import select
from core.config import settings
from core.database import async_session
from models.document import Document
logger = logging.getLogger("purge_sweep")
PURGE_GRACE_DAYS = 30
def _unlink_if_exists(p: Path) -> bool:
"""파일이 있으면 unlink (blocking — caller 가 to_thread). 존재 여부 반환(멱등)."""
if p.exists():
p.unlink()
return True
return False
async def run() -> int:
"""purge 요청 + grace 경과 문서의 NAS 원본 물리삭제. 삭제 건수 반환."""
cutoff = datetime.now(timezone.utc) - timedelta(days=PURGE_GRACE_DAYS)
async with async_session() as session:
rows = (
await session.execute(
select(Document.id, Document.file_path, Document.purge_requested_at).where(
Document.purge_requested_at.is_not(None),
Document.purge_requested_at < cutoff,
Document.file_path.is_not(None),
)
)
).all()
purged = 0
for doc_id, file_path, requested_at in rows:
nas_path = Path(settings.nas_mount_path) / file_path
try:
existed = await asyncio.to_thread(_unlink_if_exists, nas_path)
if existed:
purged += 1
# AUDIT — 물리삭제 기록 (가시화). doc_id / 경로 / 요청일 / grace.
logger.warning(
"PURGE doc_id=%s file=%s requested_at=%s grace_days=%s",
doc_id,
file_path,
requested_at.isoformat() if requested_at else None,
PURGE_GRACE_DAYS,
)
except OSError as e:
logger.error("PURGE 실패 doc_id=%s file=%s: %s", doc_id, file_path, e)
if purged:
logger.info("[purge_sweep] NAS 원본 %d건 물리삭제 (grace %d일)", purged, PURGE_GRACE_DAYS)
return purged
+7 -74
View File
@@ -67,45 +67,21 @@ def _postprocess_ocr(text: str) -> str:
return text.strip()
def _extract_pdf_pymupdf(
file_path: Path, start_page: int | None = None, end_page: int | None = None
) -> str:
"""PyMuPDF fallback — 페이지 단위 스트리밍으로 대형 PDF도 저메모리 처리.
G2 (PR-G2-2): start_page/end_page(1-based inclusive) 주어지면 범위만 추출
(번들 자식 doc = 부모 파일 공유 + 자기 page 범위). None = 전체(기존 동작 동일).
"""
def _extract_pdf_pymupdf(file_path: Path) -> str:
"""PyMuPDF fallback — 페이지 단위 스트리밍으로 대형 PDF도 저메모리 처리"""
import fitz
text_parts = []
with fitz.open(str(file_path)) as doc:
if start_page is None and end_page is None:
for page in doc:
text_parts.append(page.get_text())
else:
# 1-based inclusive → 0-based range. 범위는 [0, page_count] 로 클램프(방어).
total = doc.page_count
lo = max(1, start_page or 1) - 1
hi = min(total, end_page or total) # inclusive 끝 (0-based 마지막 인덱스 = hi-1)
for i in range(lo, hi):
text_parts.append(doc.load_page(i).get_text())
for page in doc:
text_parts.append(page.get_text())
return "\n".join(text_parts)
def _get_pdf_page_count(
file_path: Path, start_page: int | None = None, end_page: int | None = None
) -> int:
"""PDF 페이지 수 확인. G2: 범위가 주어지면 그 범위의 페이지 수(자식 doc 밀도 계산용).
None = 전체 페이지 (기존 동작 동일).
"""
def _get_pdf_page_count(file_path: Path) -> int:
"""PDF 페이지 수 확인"""
import fitz
with fitz.open(str(file_path)) as doc:
total = len(doc)
if start_page is None and end_page is None:
return total
lo = max(1, start_page or 1)
hi = min(total, end_page or total)
return max(0, hi - lo + 1)
return len(doc)
async def _call_ocr(file_path: Path, is_image: bool, max_pages: int = 200) -> str | None:
@@ -334,49 +310,6 @@ async def process(document_id: int, session: AsyncSession) -> None:
doc.extracted_at = datetime.now(timezone.utc)
return
# ─── G2 (PR-G2-2): 번들 자식 PDF — 부모 파일 공유 + 자기 page 범위만 추출 ───
# kordoc 서비스는 page-range 파라미터가 없어 전체 파일을 파싱한다(자식엔 부적합) → kordoc
# 우회, PyMuPDF 로 [bundle_page_start, bundle_page_end] 범위만 추출. range OCR 은 본 PR 범위
# 밖(자식은 ToC 존재 = digital text layer 전제 → 대개 OCR 불필요). PyMuPDF 텍스트가 빈약해도
# 그대로 보존하고 사유를 남긴다.
if fmt == "pdf" and doc.bundle_page_start is not None and doc.bundle_page_end is not None:
# 후보 A: 자식 file_path 는 합성값(`{부모}#p{s}-{e}`) → 실파일 = bundle_source_path 로 부모경로
# 복원 + NFC/NFD resolve. (자식 file_path 는 디스크에 없음.)
from workers.presegment_worker import _resolve_path as _resolve_bundle_path
from workers.presegment_worker import bundle_source_path
real_rel = bundle_source_path(doc.file_path)
src = _resolve_bundle_path(str(Path(settings.nas_mount_path) / real_rel))
if src is None:
raise FileNotFoundError(f"번들 원본 파일 없음: {real_rel}")
start, end = doc.bundle_page_start, doc.bundle_page_end
try:
pymupdf_text = _extract_pdf_pymupdf(src, start, end)
page_count = _get_pdf_page_count(src, start, end)
except Exception as e:
logger.error(f"[pymupdf:child] {doc.file_path} pages={start}-{end} 실패: {e}")
raise
meta = doc.extract_meta or {}
meta["presegment_child_range"] = {"start_page": start, "end_page": end}
meta["pymupdf_chars"] = len(pymupdf_text.strip())
should, reason = _should_ocr(pymupdf_text, page_count)
if should:
# range OCR 미지원(후속 PR) — PyMuPDF 결과 유지 + 사유 기록(silent skip 아님).
meta["ocr_skip_reason"] = "presegment_child_range_ocr_unsupported"
meta["ocr_reason"] = reason
logger.warning(
f"[pymupdf:child] {doc.file_path} pages={start}-{end} "
f"OCR 필요({reason})하나 range OCR 미지원 → PyMuPDF 결과 유지"
)
doc.extracted_text = pymupdf_text.replace("\x00", "")
doc.extracted_at = datetime.now(timezone.utc)
doc.extractor_version = PYMUPDF_VERSION if pymupdf_text.strip() else None
doc.extract_meta = meta
logger.info(
f"[pymupdf:child] {doc.file_path} pages={start}-{end} ({len(pymupdf_text)}자)"
)
return
# ─── kordoc 파싱 (HWP/HWPX/PDF) + PyMuPDF fallback + OCR ───
if fmt in KORDOC_FORMATS:
container_path = f"/documents/{doc.file_path}"
+6 -20
View File
@@ -17,7 +17,6 @@ Web/Blog ingest (devonagent 트랙, plan db-snuggly-petal.md):
- sidecar (.json) 누락 : skip 하고 ingest, web_meta.sidecar_missing=true
"""
import asyncio
import hashlib
import json
from pathlib import Path
@@ -118,18 +117,16 @@ def _route_media(path: Path, expected_category: str | None) -> tuple[str | None,
if expected_category == "library":
# 외부 작성 학습 자료 (KGS Code, 시행규칙 등). 문서 확장자만 수락.
# frontmatter 해석은 classify_worker (옵션 C) 가 담당. file_watcher 는 라우팅만.
# G2: 첫 stage=presegment (후보 A 검증완료). 非PDF/단일 통과, 번들 PDF 만 분할.
if ext in LIBRARY_DOC_EXTS:
return ("library", False, "presegment")
return ("library", False, "extract")
if ext in AUDIO_EXTS or ext in VIDEO_DIRECT_EXTS or ext in VIDEO_QUARANTINE_EXTS:
return (None, False, None) # audio/video 잘못 들어오면 skip
return (None, False, None) # 기타 알 수 없는 확장자 skip
# Inbox: 문서 파이프 (기존). audio/video 확장자가 실수로 여기 들어오면 skip.
# G2: 첫 stage=presegment (후보 A 검증완료). 非PDF/단일 통과, 번들 PDF 만 분할.
if ext in AUDIO_EXTS or ext in VIDEO_DIRECT_EXTS or ext in VIDEO_QUARANTINE_EXTS:
return (None, False, None)
return (None, False, "presegment")
return (None, False, "extract")
# ─── Web/Blog ingest (devonagent 트랙) 헬퍼 ──────────────────────────────────
@@ -139,10 +136,6 @@ def _canonicalize_url(url: str) -> str:
같은 글의 utm 변형 (`?utm_source=foo`) fragment 변형 (`#section`) 을
row 수렴시키기 위해 file_hash 산출 반드시 거친다.
R11c: news_collector._normalize_url(news 채널) 의도적으로 다르다 이쪽(web_clip)
query-sort/trailing-slash/소문자화로 공격적 정규화하지만, news 쪽은 query-식별 사이트의
별개 기사 붕괴 방지를 위해 보수적이다. 함수 통합 금지(채널별 dedup 의도가 다름).
"""
if not url:
return ""
@@ -228,8 +221,7 @@ async def _ingest_web_file(session, file_path: Path, rel_path: str) -> tuple[int
)
session.add(doc)
await session.flush()
# G2: 첫 stage=presegment (후보 A 검증완료). HTML(非PDF)은 presegment 가 무변 통과 → extract.
await enqueue_stage(session, doc.id, "presegment")
await enqueue_stage(session, doc.id, "extract")
return (1, 0)
@@ -254,8 +246,7 @@ async def watch_inbox():
async with async_session() as session:
# ─── Web/ 트랙 (devonagent) — DEVONthink Smart Rule 이 떨군 .html 만 진입 ───
if web_root.exists():
# rglob NFS 디렉토리 walk(blocking stat 다발)를 off-thread 로 수집 (R5).
for file_path in await asyncio.to_thread(lambda: list(web_root.rglob("*.html"))):
for file_path in web_root.rglob("*.html"):
if not file_path.is_file() or should_skip(file_path):
continue
rel_path = str(file_path.relative_to(nas_root))
@@ -273,8 +264,7 @@ async def watch_inbox():
Path(sub).name, (None, None, None)
)
# NFS 디렉토리 walk(blocking) off-thread 수집 (R5).
for file_path in await asyncio.to_thread(lambda: list(scan_root.rglob("*"))):
for file_path in scan_root.rglob("*"):
if not file_path.is_file() or should_skip(file_path):
continue
@@ -288,11 +278,7 @@ async def watch_inbox():
continue
rel_path = str(file_path.relative_to(nas_root))
# GB 파일 SHA-256 은 이벤트 루프를 점유 → 같은 루프의 모든 1분 주기 consumer
# + FastAPI 요청이 수십초~분 동시 정지. to_thread 오프로드. 스캔 루프가 이미
# 순차라 file_hash 는 한 번에 하나만 실행(직렬화) — 병렬 해싱 X = NFS 2.5GbE
# 대역폭·버퍼 메모리 blowup 방지 (R5).
fhash = await asyncio.to_thread(file_hash, file_path)
fhash = file_hash(file_path)
result = await session.execute(
select(Document).where(Document.file_path == rel_path)
-8
View File
@@ -297,10 +297,6 @@ async def collect_disaster_cases(session) -> int:
await _ingest_attachment(session, boardno, filenm, filepath)
except FeedError as e:
logger.warning(f"[kosha] 첨부 실패 skip ({boardno}/{filenm}): {e}")
# 케이스 단위 commit (R4) — 이후 페이지/케이스의 _api_get 실패가 앞서 적재한
# 케이스까지 전체 rollback 하지 않게 부분 적재 보존 (csb/api_standards idiom).
await session.commit()
if page_all_dup:
break # 등록일 역순 — 페이지 전체가 기존이면 이후 페이지도 기존
@@ -378,8 +374,6 @@ async def collect_fatal_accidents(session) -> int:
await enqueue_stage(session, doc.id, "embed")
await enqueue_stage(session, doc.id, "chunk")
new_count += 1
# 케이스 단위 commit (R4) — 이후 페이지 실패가 앞 케이스 전체 rollback 방지.
await session.commit()
if page_all_dup:
break # 등록일 역순 — 페이지 전체가 기존이면 이후 페이지도 기존
@@ -456,8 +450,6 @@ async def collect_kosha_guide(session, cap: int = _GUIDE_DAILY_CAP) -> int:
await session.flush()
await enqueue_stage(session, doc.id, "extract")
ingested += 1
# 항목 단위 commit (R4) — 다운로드 실패가 앞서 적재한 GUIDE 항목 전체 rollback 방지.
await session.commit()
# silent cap 금지 — 잔량 가시화 (자동 점진 백필: 내일 cap 만큼 또 소화)
logger.info(f"[kosha] GUIDE 신규/개정 {len(new_specs)}건 중 {ingested}건 ingest"
+11 -54
View File
@@ -39,11 +39,7 @@ from models.queue import ProcessingQueue
logger = logging.getLogger(__name__)
# 마크다운 추출 엔드포인트. compose env `MARKER_ENDPOINT`(base URL)에서 읽는다 —
# 기본=marker(무변), 컷오버=`http://mineru-service:3301` 로 env 플립만으로 전환.
# marker/mineru 가 동일 /convert 계약(file_path·start/end·md+base64 images)이라 워커 무변.
_MARKDOWN_BASE = os.getenv("MARKER_ENDPOINT", "http://marker-service:3300").rstrip("/")
MARKER_ENDPOINT = _MARKDOWN_BASE if _MARKDOWN_BASE.endswith("/convert") else _MARKDOWN_BASE + "/convert"
MARKER_ENDPOINT = "http://marker-service:3300/convert"
MARKER_TIMEOUT = 300 # 큰 PDF 5 분 한도
MAX_PAGES = 200 # 소형 1-shot 경로 /convert max_pages 안전장치
@@ -185,10 +181,7 @@ async def process(document_id: int, session: AsyncSession) -> None:
await _fail(session, document_id, "no file_path")
return
# 후보 A: 자식(bundle cols)은 합성 file_path(`{부모}#p{s}-{e}`) → 실파일 = bundle_source_path
# 로 부모경로 복원. 일반 doc 은 그대로(접미사 없음). marker/mineru 는 실파일 + page 범위로 변환.
from workers.presegment_worker import bundle_source_path
container_path = _to_marker_path(bundle_source_path(doc.file_path))
container_path = _to_marker_path(doc.file_path)
suffix = Path(container_path).suffix.lower()
# ---- (3) office/hwp → md (C-2): PDF 외 지원 포맷은 office_md 하이브리드 변환 ----
@@ -210,21 +203,7 @@ async def process(document_id: int, session: AsyncSession) -> None:
return
# ---- (4) page_count gauge + 분기 (LargeDoc split) ----
# G2 (PR-G2-2): 번들 자식 doc 은 부모 파일 공유 + 자기 page 범위([bundle_page_start, end],
# 1-based inclusive)만 변환해야 한다. page_offset = 절대 시작페이지(부모 파일 기준), page_count =
# 자식 범위의 페이지 수. cols 가 NULL(일반 doc)이면 page_offset=1 + 전체 page_count = 기존 동작 동일.
file_page_count = _get_page_count(container_path)
is_child = doc.bundle_page_start is not None and doc.bundle_page_end is not None
if is_child:
page_offset = doc.bundle_page_start
if file_page_count is not None:
child_end = min(doc.bundle_page_end, file_page_count)
page_count = max(0, child_end - doc.bundle_page_start + 1)
else:
page_count = doc.bundle_page_end - doc.bundle_page_start + 1
else:
page_offset = 1
page_count = file_page_count
page_count = _get_page_count(container_path)
# >MAX_SPLIT_PAGES = 변환 안전상태(manual_review). silently skip 아님.
if page_count is not None and page_count > MAX_SPLIT_PAGES:
@@ -243,35 +222,20 @@ async def process(document_id: int, session: AsyncSession) -> None:
# ---- (6) 변환 분기: 소형 1-shot / 대형(>SPLIT_THRESHOLD) page-range 분할 ----
if page_count is not None and page_count > SPLIT_THRESHOLD_PAGES:
await _process_split(doc, document_id, container_path, page_count, session, page_offset)
await _process_split(doc, document_id, container_path, page_count, session)
else:
await _process_single(doc, document_id, container_path, session, page_count, page_offset)
await _process_single(doc, document_id, container_path, session)
async def _process_single(
doc: Document, document_id: int, container_path: str, session: AsyncSession,
page_count: int | None = None, page_offset: int = 1,
doc: Document, document_id: int, container_path: str, session: AsyncSession
) -> None:
"""소형 PDF(≤ SPLIT_THRESHOLD_PAGES) 통째 1-shot 변환 (Phase 1B/1B.5 기존 경로).
G2 (PR-G2-2): 번들 자식(page_offset>1) [page_offset, page_offset+page_count-1] 범위만
변환하도록 marker start_page/end_page 명시한다. 일반 doc(page_offset=1) 기존과
동일하게 max_pages 보낸다(payload byte-identical).
"""
# 일반 doc = 기존 payload 유지. 자식만 절대 page 범위를 명시(부모 파일 기준 1-based inclusive).
if page_offset > 1 and page_count is not None:
req_json = {
"file_path": container_path,
"start_page": page_offset,
"end_page": page_offset + page_count - 1,
}
else:
req_json = {"file_path": container_path, "max_pages": MAX_PAGES}
"""소형 PDF(≤ SPLIT_THRESHOLD_PAGES) 통째 1-shot 변환 (Phase 1B/1B.5 기존 경로)."""
try:
async with httpx.AsyncClient(timeout=MARKER_TIMEOUT) as client:
resp = await client.post(
MARKER_ENDPOINT,
json=req_json,
json={"file_path": container_path, "max_pages": MAX_PAGES},
)
resp.raise_for_status()
data = resp.json()
@@ -545,7 +509,6 @@ async def _process_split(
container_path: str,
page_count: int,
session: AsyncSession,
page_offset: int = 1,
) -> None:
"""대형 PDF page-range 분할 변환.
@@ -556,10 +519,6 @@ async def _process_split(
invariant: page numbering = 1-based inclusive (batch1: 1..BATCH_PAGES, ...).
marker slug(`_page_0_*`) batch 마다 재시작 batch rewrite stitch (충돌 회피).
G2 (PR-G2-2): page_offset = 부모 파일 기준 절대 시작페이지(번들 자식). marker 보내는
page 절대값(page_offset 가산), manifest/기록은 자식 상대값(1-based) 유지 일반 doc
(page_offset=1) abs==rel 이라 기존 동작과 동일.
"""
n_batches = (page_count + BATCH_PAGES - 1) // BATCH_PAGES
succeeded: list[dict[str, Any]] = [] # {start_page, end_page, md}
@@ -571,17 +530,15 @@ async def _process_split(
async with httpx.AsyncClient(timeout=MARKER_TIMEOUT) as client:
for b in range(n_batches):
start_page = b * BATCH_PAGES + 1 # 자식 상대 1-based (manifest/기록용)
start_page = b * BATCH_PAGES + 1
end_page = min((b + 1) * BATCH_PAGES, page_count)
abs_start = start_page + (page_offset - 1) # 부모 파일 절대 page (marker 요청용)
abs_end = end_page + (page_offset - 1)
try:
resp = await client.post(
MARKER_ENDPOINT,
json={
"file_path": container_path,
"start_page": abs_start,
"end_page": abs_end,
"start_page": start_page,
"end_page": end_page,
},
)
resp.raise_for_status()
-110
View File
@@ -1,110 +0,0 @@
"""메모 → 문서 승격 시 거친 메모를 구조화된 마크다운 문서로 정리 (26B, P2).
`POST /memos/{id}/promote-to-document` `source_metadata.needs_draft=true` 마커를
찍으면 스케줄 워커가 집어 AIClient.call_primary(26B Mac mini = 로컬, 과금규칙 부합)
md_content 생성한다. markdown canonical Phase 1A 스키마 재사용:
- content_origin='ai_drafted' + md_draft_status='draft'
(migration 212 제약: md_draft_status NOT NULL content_origin='ai_drafted' 필수)
- md_status='success', md_extraction_engine='ai_draft'
원본 메모는 extracted_text 보존(검색/청크는 원문 사용). "필요시" = 이미 정돈된 메모는
프롬프트가 형식만 다듬고, 거친 메모는 구조화하도록 지시(사실 추가 금지).
"""
import logging
from datetime import datetime, timezone
from sqlalchemy import select
from ai.client import AIClient, strip_thinking
from core.database import async_session
from models.document import Document
from services.search.llm_gate import Priority, acquire_mlx_gate
logger = logging.getLogger(__name__)
# 한 번에 처리할 승격 문서 수 (26B 콜 = 무겁다 → 소량 순차). interval 잡이라 다음 틱에 이어 처리.
_BATCH = 2
# 너무 짧은 메모는 문서화 의미 없음 — 마커만 정리하고 md 생성 스킵.
_MIN_CHARS = 20
_DRAFT_SYSTEM = (
"당신은 사용자의 거친 메모를 사실 추가 없이 깔끔한 마크다운 문서로 정리하는 도우미입니다."
)
_DRAFT_PROMPT = """다음은 사용자가 빠르게 적은 메모입니다. 이를 정식 자료 문서로 정리하세요.
규칙:
- 메모에 있는 정보만 사용하고, 내용·사실을 추가하거나 추측하지 마세요.
- 이미 정돈돼 있으면 형식만 다듬고, 거친 메모면 제목·소제목·목록으로 구조화하세요.
- 원문 언어를 유지하세요(한국어는 한국어, 영어는 영어).
- 출력은 마크다운 본문만. 인사말·메타 설명 없이 문서 내용만 출력하세요.
--- 메모 ---
{content}
--- ---"""
async def _ids_needing_draft() -> list[int]:
async with async_session() as session:
rows = (
await session.execute(
select(Document.id)
.where(
Document.deleted_at.is_(None),
# JSONB 마커 (json/jsonb 공통 ->> 연산자). promote 가 needs_draft=true 세팅.
Document.source_metadata.op("->>")("needs_draft") == "true",
)
.order_by(Document.id)
.limit(_BATCH)
)
).scalars().all()
return list(rows)
async def run() -> None:
"""needs_draft 마커가 찍힌 승격 문서를 26B로 문서화 (interval job, no-arg)."""
ids = await _ids_needing_draft()
if not ids:
return
client = AIClient()
for doc_id in ids:
# 문서별 독립 세션·트랜잭션 — 1건 실패가 나머지를 막지 않게.
async with async_session() as session:
try:
doc = await session.get(Document, doc_id)
if doc is None or not (doc.source_metadata or {}).get("needs_draft"):
continue # 경합/이미 처리됨
source = (doc.extracted_text or "").strip()
now = datetime.now(timezone.utc)
meta = dict(doc.source_metadata or {})
md = ""
if len(source) >= _MIN_CHARS:
# 26B 호출은 반드시 mlx gate(Semaphore 1) 안에서 — 동시 호출 pile-up 방지
# ([[feedback_llm_verification_load_pileup]]). BACKGROUND = 사용자 대면보다 양보.
async with acquire_mlx_gate(Priority.BACKGROUND):
raw = await client.call_primary(
_DRAFT_PROMPT.format(content=source), system=_DRAFT_SYSTEM
)
md = strip_thinking(raw or "").strip()
if md:
doc.md_content = md
# 제약(212): md_draft_status NOT NULL 이면 content_origin='ai_drafted' 여야 함.
doc.content_origin = "ai_drafted"
doc.md_draft_status = "draft"
doc.md_status = "success"
doc.md_extraction_engine = "ai_draft"
doc.md_generated_at = now
meta["drafted_at"] = now.isoformat()
# 성공/스킵 모두 마커 해제(무한 재시도 방지). 26B 호출 자체가 예외면 except 로 빠져 마커 유지.
meta["needs_draft"] = False
doc.source_metadata = meta
doc.updated_at = now
await session.commit()
logger.info("memo_draft doc=%s md_len=%d", doc_id, len(md))
except Exception:
logger.exception("memo_draft 실패 doc=%s (다음 틱 재시도)", doc_id)
await session.rollback()
+101 -65
View File
@@ -83,10 +83,6 @@ def _normalize_url(url: str) -> str:
query 전체 제거 금지: hada.io/topic?id= · aitimes articleView.html?idxno= ·
HN item?id= query-식별 사이트에서 별개 기사가 같은 URL 붕괴된다.
저장(edit_url)·조회 양쪽이 함수를 공유해야 dedup 성립.
R11c: file_watcher._canonicalize_url(web_clip 채널) 의도적으로 다르다 이쪽은 콘텐츠
식별 query 보존(별개 기사 붕괴 방지) 핵심이라 query-sort/trailing-slash/소문자화를 한다.
함수 통합 금지(news dedup 깨짐). 채널별 normalization 의도된 설계.
"""
parsed = urlparse(url)
kept = [
@@ -401,55 +397,6 @@ def _doc_identity(source: NewsSource, source_short: str, category: str) -> dict:
}
async def _already_ingested(session, article_id: str, normalized_url: str, link: str) -> bool:
"""이미 적재된 기사인지 — file_hash 또는 정규화/raw edit_url 매칭 (3 fetch 공통, R11c).
레거시 raw URL + 교차 게시 다중 매칭 내성(first). _fetch_rss/_fetch_api_guardian/
_fetch_api_nyt 복제하던 동일 존재체크를 단일화.
"""
existing = await session.execute(
select(Document).where(
(Document.file_hash == article_id)
| (Document.edit_url.in_([normalized_url, link]))
).limit(1)
)
return existing.scalars().first() is not None
def _build_news_doc(source, ident, source_short, article_id, title, body,
extractor_version, normalized_url, pub_dt) -> Document:
"""3 fetch 공통 뉴스 Document 빌더 (R11c). 채널별 차이는 인자로만 — body(NYT=summary)·
extractor_version·ident(category 계산 차이 흡수) 다르고 22 필드 구조는 정적 동일.
edit_url 조회와 동일 정규화 저장(raw 저장 URL dedup 무력화)."""
return Document(
file_path=f"{ident['path_prefix']}/{source.name}/{article_id}",
file_hash=article_id,
file_format="article",
file_size=len(body.encode()),
file_type="note",
title=title,
extracted_text=f"{title}\n\n{body}",
extracted_at=datetime.now(timezone.utc),
extractor_version=extractor_version,
# article = 텍스트 네이티브 → 생성 시점 terminal 'skipped' 명시(markdown 변환 비대상,
# 미명시 시 'pending' 영구 비수렴 → backlog 지표 오염). page 정책은 fulltext_worker 승격.
md_status="skipped",
md_extraction_error="news article: 텍스트 네이티브, markdown 변환 비대상",
source_channel=source.source_channel,
data_origin="external",
edit_url=normalized_url,
review_status="approved",
ai_domain=ident["ai_domain"],
ai_sub_group=source_short,
ai_tags=ident["ai_tags"],
# 안전 자료실 A-2 — 레지스트리 deterministic (classify-skip 경로라 ingest 시점 필수)
material_type=ident["material_type"],
jurisdiction=ident["jurisdiction"],
published_date=pub_dt.date() if pub_dt else None,
extract_meta=_build_extract_meta(source, pub_dt),
)
async def _fetch_rss(session, source: NewsSource) -> tuple[int, str]:
"""RSS 피드 수집 — redirect 재검증 + 크기/content-type 제한 + 조건부 GET (A-1).
@@ -568,7 +515,13 @@ async def _fetch_rss(session, source: NewsSource) -> tuple[int, str]:
article_id = _article_hash(title, pub_dt.strftime("%Y%m%d"), source.name)
normalized_url = _normalize_url(link)
if await _already_ingested(session, article_id, normalized_url, link):
existing = await session.execute(
select(Document).where(
(Document.file_hash == article_id) |
(Document.edit_url.in_([normalized_url, link]))
).limit(1)
)
if existing.scalars().first():
continue
# A-6 2차: 포털 전재 dedup (first-wins — 먼저 적재된 쪽이 정본)
@@ -580,9 +533,35 @@ async def _fetch_rss(session, source: NewsSource) -> tuple[int, str]:
source_short = source.name.split(" ")[0] # "경향신문 문화" → "경향신문"
ident = _doc_identity(source, source_short, category)
doc = _build_news_doc(
source, ident, source_short, article_id, title, body,
extractor_version, normalized_url, pub_dt,
doc = Document(
file_path=f"{ident['path_prefix']}/{source.name}/{article_id}",
file_hash=article_id,
file_format="article",
file_size=len(body.encode()),
file_type="note",
title=title,
extracted_text=f"{title}\n\n{body}",
extracted_at=datetime.now(timezone.utc),
extractor_version=extractor_version,
# article = 텍스트 네이티브(본문=extracted_text). markdown 단계 미enqueue 라
# 기본값 'pending' 이면 영구 비수렴 → backlog 지표 오염 + md_status_pending partial
# 인덱스 비대. 생성 시점에 terminal 'skipped' 로 명시(변환 비대상).
# fulltext_policy='page' 소스는 fulltext_worker 가 승격 시 success 로 갱신.
md_status="skipped",
md_extraction_error="news article: 텍스트 네이티브, markdown 변환 비대상",
source_channel=source.source_channel,
data_origin="external",
# 조회와 동일하게 정규화해 저장 — raw(tracking param 포함) 저장 시 URL dedup 무력화
edit_url=normalized_url,
review_status="approved",
ai_domain=ident["ai_domain"],
ai_sub_group=source_short,
ai_tags=ident["ai_tags"],
# 안전 자료실 A-2 — 레지스트리 deterministic (classify-skip 경로라 ingest 시점 필수)
material_type=ident["material_type"],
jurisdiction=ident["jurisdiction"],
published_date=pub_dt.date() if pub_dt else None,
extract_meta=_build_extract_meta(source, pub_dt),
)
session.add(doc)
await session.flush()
@@ -679,7 +658,13 @@ async def _fetch_api_guardian(session, source: NewsSource) -> tuple[int, str]:
normalized_url = _normalize_url(link)
# RSS 수집부와 동일: 레거시 raw URL + 교차 게시 다중 매칭 내성 (first)
if await _already_ingested(session, article_id, normalized_url, link):
existing = await session.execute(
select(Document).where(
(Document.file_hash == article_id) |
(Document.edit_url.in_([normalized_url, link]))
).limit(1)
)
if existing.scalars().first():
continue
if await _is_portal_duplicate(session, title):
@@ -690,9 +675,30 @@ async def _fetch_api_guardian(session, source: NewsSource) -> tuple[int, str]:
source_short = source.name.split(" ")[0]
ident = _doc_identity(source, source_short, category)
doc = _build_news_doc(
source, ident, source_short, article_id, title, body,
"guardian_api_full" if is_full else "guardian_api", normalized_url, pub_dt,
doc = Document(
file_path=f"{ident['path_prefix']}/{source.name}/{article_id}",
file_hash=article_id,
file_format="article",
file_size=len(body.encode()),
file_type="note",
title=title,
extracted_text=f"{title}\n\n{body}",
extracted_at=datetime.now(timezone.utc),
extractor_version="guardian_api_full" if is_full else "guardian_api",
md_status="skipped",
md_extraction_error="news article: 텍스트 네이티브, markdown 변환 비대상",
source_channel=source.source_channel,
data_origin="external",
edit_url=normalized_url,
review_status="approved",
ai_domain=ident["ai_domain"],
ai_sub_group=source_short,
ai_tags=ident["ai_tags"],
# 안전 자료실 A-2 — 레지스트리 deterministic (classify-skip 경로라 ingest 시점 필수)
material_type=ident["material_type"],
jurisdiction=ident["jurisdiction"],
published_date=pub_dt.date() if pub_dt else None,
extract_meta=_build_extract_meta(source, pub_dt),
)
session.add(doc)
await session.flush()
@@ -749,7 +755,13 @@ async def _fetch_api_nyt(session, source: NewsSource) -> tuple[int, str]:
normalized_url = _normalize_url(link)
# RSS 수집부와 동일: 레거시 raw URL + 교차 게시 다중 매칭 내성 (first)
if await _already_ingested(session, article_id, normalized_url, link):
existing = await session.execute(
select(Document).where(
(Document.file_hash == article_id) |
(Document.edit_url.in_([normalized_url, link]))
).limit(1)
)
if existing.scalars().first():
continue
if await _is_portal_duplicate(session, title):
@@ -760,9 +772,33 @@ async def _fetch_api_nyt(session, source: NewsSource) -> tuple[int, str]:
source_short = source.name.split(" ")[0]
ident = _doc_identity(source, source_short, category)
doc = _build_news_doc(
source, ident, source_short, article_id, title, summary,
"nyt_api", normalized_url, pub_dt,
doc = Document(
file_path=f"{ident['path_prefix']}/{source.name}/{article_id}",
file_hash=article_id,
file_format="article",
file_size=len(summary.encode()),
file_type="note",
title=title,
extracted_text=f"{title}\n\n{summary}",
extracted_at=datetime.now(timezone.utc),
extractor_version="nyt_api",
# article = 텍스트 네이티브(본문=extracted_text). markdown 단계 미enqueue 라
# 기본값 'pending' 이면 영구 비수렴 → backlog 지표 오염 + md_status_pending partial
# 인덱스 비대. 생성 시점에 terminal 'skipped' 로 명시(변환 비대상).
md_status="skipped",
md_extraction_error="news article: 텍스트 네이티브, markdown 변환 비대상",
source_channel=source.source_channel,
data_origin="external",
edit_url=normalized_url,
review_status="approved",
ai_domain=ident["ai_domain"],
ai_sub_group=source_short,
ai_tags=ident["ai_tags"],
# 안전 자료실 A-2 — 레지스트리 deterministic (classify-skip 경로라 ingest 시점 필수)
material_type=ident["material_type"],
jurisdiction=ident["jurisdiction"],
published_date=pub_dt.date() if pub_dt else None,
extract_meta=_build_extract_meta(source, pub_dt),
)
session.add(doc)
await session.flush()
+1 -8
View File
@@ -331,13 +331,11 @@ async def run(bulk: bool = False, limit: int = 0) -> None:
filter_str = (build_issn_filter(wm_key, watermark) if kind == "issn"
else build_filter(wm_key, watermark))
newest: str | None = None
capped = False # 이번 run 이 cap 으로 시드 중도 절단됐는지 (R4)
cursor = "*"
max_pages = (10**6 if bulk else _MAX_PAGES_PER_KW)
try:
for _page in range(max_pages):
if inserted >= run_cap:
capped = True
break
text = await _fetch(client, key, filter_str, cursor)
_count, next_cursor, works = parse_openalex_works(text)
@@ -355,17 +353,12 @@ async def run(bulk: bool = False, limit: int = 0) -> None:
else:
await session.rollback()
if inserted >= run_cap:
capped = True
break
await asyncio.sleep(_REQ_SLEEP)
if not next_cursor:
break
cursor = next_cursor
# cap 절단 시 워터마크 미전진 — 미페치 works 가 다음 run 의 watermark 필터
# (publication_date > watermark)에 영구 배제되는 silent loss 방지. 미전진하면
# 다음 run 이 옛 watermark 부터 재페치하며 적재분 dedup-skip(cap 미소모) 후
# 이어 적재 → 백로그 run 당 cap 소화 (R4). bulk 은 cap 무관.
if newest and not capped:
if newest:
async with async_session() as session:
src = await session.get(NewsSource, source_id)
_set_watermark(src, wm_key, newest)
+142
View File
@@ -0,0 +1,142 @@
"""Phase 2A 후보 임베딩 백필 CLI (embedding-phase2a-1 E-1).
docker compose exec -T fastapi python -m workers.phase2a_cand_backfill \
--target qwen06 --doc-id-max 41944 --chunk-id-max 104140 [--batch 32]
설계 원칙 (plan r3):
- resumable/idempotent: 대상 = NOT EXISTS(후보 테이블) 중단/재실행 이어서.
배치 단위 커밋. C-1 백필 게이트 = "후보 카운트 == 동결셋 카운트".
- 동결셋: id <= *_id_max AND 베이스라인 embedding IS NOT NULL (AND docs.deleted_at IS NULL).
cand 테이블은 동결 범위로만 INSERT (retrieval cand path snapshot filter 타는 전제).
- 문서/청크 입력 = production 경로와 동일 구성(embed_worker._build_embed_input /
chunk_worker [제목][섹션][본문]) + plain (instruct prefix 쿼리 전용 G-1 불변식).
- 임베딩 = Ollama /api/embed 배치 호출 (G-1 fixture: 정규화 출력).
- qwen4m CLI 대상이 아님 qwen4 적재 SQL 파생(subvector+l2_normalize), plan E-1.
"""
import argparse
import asyncio
import hashlib
import time
import httpx
from sqlalchemy import text
from core.database import async_session
from core.utils import setup_logger
from models.document import Document
from workers.embed_worker import _build_embed_input
logger = setup_logger("phase2a_cand_backfill")
OLLAMA_EMBED = "http://ollama:11434/api/embed"
TARGETS = {
"qwen06": {
"model": "qwen3-embedding:0.6b", "dim": 1024,
"docs": "documents_cand_qwen06", "chunks": "document_chunks_cand_qwen06",
},
"qwen4": {
"model": "qwen3-embedding:4b", "dim": 2560,
"docs": "documents_cand_qwen4", "chunks": "document_chunks_cand_qwen4",
},
}
async def _embed_batch(client: httpx.AsyncClient, model: str, texts: list[str]) -> list[list[float]]:
r = await client.post(OLLAMA_EMBED, json={"model": model, "input": texts}, timeout=600)
r.raise_for_status()
embs = r.json()["embeddings"]
if len(embs) != len(texts):
raise RuntimeError(f"embed count mismatch: {len(embs)} != {len(texts)}")
return embs
async def backfill_docs(target: dict, doc_id_max: int, batch: int, http: httpx.AsyncClient) -> int:
total = 0
while True:
async with async_session() as session:
rows = (await session.execute(text(f"""
SELECT d.id FROM documents d
WHERE d.id <= :m AND d.embedding IS NOT NULL AND d.deleted_at IS NULL
AND NOT EXISTS (SELECT 1 FROM {target['docs']} c WHERE c.doc_id = d.id)
ORDER BY d.id LIMIT :b
"""), {"m": doc_id_max, "b": batch})).scalars().all()
if not rows:
break
docs = [(await session.get(Document, i)) for i in rows]
inputs = [_build_embed_input(d) for d in docs]
embs = await _embed_batch(http, target["model"], inputs)
for d, inp, e in zip(docs, inputs, embs):
await session.execute(text(f"""
INSERT INTO {target['docs']} (doc_id, embed_input_hash, embedding)
VALUES (:i, :h, cast(:e AS vector))
ON CONFLICT (doc_id) DO NOTHING
"""), {"i": d.id, "h": hashlib.sha256(inp.encode()).hexdigest()[:16], "e": str(e)})
await session.commit()
total += len(rows)
if total % (batch * 10) < batch:
logger.info(f"[{target['docs']}] +{total} (last id={rows[-1]})")
return total
async def backfill_chunks(target: dict, chunk_id_max: int, batch: int, http: httpx.AsyncClient) -> int:
total = 0
while True:
async with async_session() as session:
rows = (await session.execute(text(f"""
SELECT c.id, c.doc_id, c.chunk_index, c.section_title, c.text, d.title
FROM corpus_chunks c JOIN documents d ON d.id = c.doc_id
WHERE c.id <= :m AND c.embedding IS NOT NULL AND d.deleted_at IS NULL
AND NOT EXISTS (SELECT 1 FROM {target['chunks']} k WHERE k.id = c.id)
ORDER BY c.id LIMIT :b
"""), {"m": chunk_id_max, "b": batch})).all()
if not rows:
break
inputs = [
f"[제목] {r.title or ''}\n[섹션] {r.section_title or ''}\n[본문] {r.text}"
for r in rows
]
embs = await _embed_batch(http, target["model"], inputs)
for r, e in zip(rows, embs):
await session.execute(text(f"""
INSERT INTO {target['chunks']} (id, doc_id, chunk_index, section_title, text, embedding)
VALUES (:i, :d, :x, :s, :t, cast(:e AS vector))
ON CONFLICT (id) DO NOTHING
"""), {"i": r.id, "d": r.doc_id, "x": r.chunk_index,
"s": r.section_title, "t": r.text, "e": str(e)})
await session.commit()
total += len(rows)
if total % (batch * 10) < batch:
logger.info(f"[{target['chunks']}] +{total} (last id={rows[-1]})")
return total
async def run(target_key: str, doc_id_max: int, chunk_id_max: int, batch: int) -> None:
target = TARGETS[target_key]
start = time.monotonic()
async with httpx.AsyncClient() as http:
nd = await backfill_docs(target, doc_id_max, batch, http)
nc = await backfill_chunks(target, chunk_id_max, batch, http)
mins = (time.monotonic() - start) / 60
async with async_session() as session:
cd = (await session.execute(text(f"SELECT count(*) FROM {target['docs']}"))).scalar_one()
cc = (await session.execute(text(f"SELECT count(*) FROM {target['chunks']}"))).scalar_one()
logger.info(
f"[{target_key}] 완료 — 이번 run docs +{nd} chunks +{nc} ({mins:.1f}분) · "
f"누적 docs {cd} / chunks {cc} (동결 게이트 = 베이스라인 동결셋 카운트와 일치 확인)"
)
def main() -> None:
p = argparse.ArgumentParser(description="Phase 2A 후보 임베딩 백필 (resumable)")
p.add_argument("--target", required=True, choices=sorted(TARGETS))
p.add_argument("--doc-id-max", type=int, required=True)
p.add_argument("--chunk-id-max", type=int, required=True)
p.add_argument("--batch", type=int, default=32)
a = p.parse_args()
asyncio.run(run(a.target, a.doc_id_max, a.chunk_id_max, a.batch))
if __name__ == "__main__":
main()
-562
View File
@@ -1,562 +0,0 @@
"""presegment_worker — extract 前 번들 PDF(여러 논리문서 한 파일) → N 자식 분할 (G2 / PR-G2-2).
문서가 presegment stage 진입한다(worker-side gating):
- 非PDF(file_format != pdf · suffix != .pdf) = 즉시 fast-exit enqueue_next_stage extract 흘림.
- PDF = PyMuPDF ToC(level-1) deterministic 분석. '명확한 번들' 자식 분할, 나머지는 단일문서로 extract.
deterministic 경로(PR-G2-2): 판정이 애매하면 보수적으로 분할하지 않고 단일문서로 둔다
(bias to NOT splitting). 분할 = '확실한 번들' :
- page_count >= MIN_BUNDLE_PAGES AND level-1 ToC 항목 >= 2 AND 모든 자식 >= MIN_CHILD_PAGES
AND 단조 증가·비중첩 AND [1, page_count] 범위 커버 AND 2 <= N <= MAX_CHILDREN.
LLM 경계 폴백(PR-G2-3, env PRESEGMENT_LLM_FALLBACK, 기본 OFF scaffold-first): deterministic
'명확한 번들' 만든 대형 PDF(ToC 없음/level-1 없음/게이트 미달) 한해, OFF 오늘과
동일(단일문서)이고 ON 이면 off-card Qwen(맥북, 라우터 :8890, model=qwen-macbook)에게 경계를
제안받는다. compact per-page heading 샘플만 전송(본문 미전송). LLM 출력은 **동일 검증 게이트
(_is_clear_bundle)** 통과 시에만 deterministic 같은 _create_children 경로로 분할
is_bundle=false / 파싱·검증 실패 = 단일문서(오늘과 동일) + presegment_llm_rejected 로깅.
맥북 불가(503/연결/절단) StageDeferred 재시도(백오프, no silent fallback).
분할 후보 A(물리분할 없음, uq_documents_file_path 해소): 자식 file_path = unique 합성값
`{부모경로}#p{start}-{end}` (UNIQUE 제약 통과), 실파일은 `bundle_source_path()` 로 부모 경로 복원.
자식은 bundle_page_start/end(1-based inclusive) 부모 파일의 자기 page 범위만 가리킨다.
부모-자식 관계 정본 = document_lineage(relation_type='segmented_from'). 부모(presegment_role='parent')
파일 홀더라 자체 extract/embed enqueue_next_stage presegmentextract 전이가 'parent'
억제된다(queue_consumer 참조). 자식의 extract 워커가 직접 enqueue. extract_worker/marker_worker
자식 처리 bundle_source_path() 실파일 접근.
멱등: 재실행 같은 부모로 이미 자식이 있으면(document_lineage segmented_from) 재생성하지 않고
수렴( 자식이 extract 활성/완료 상태인지만 보장)한다.
해결 이력 (2026-06-18): 최초 Option A(자식이 부모 file_path 그대로 공유) uq_documents_file_path
UNIQUE 위반(실번들 검증서 발견) 합성 file_path(후보 A) 해소. 인제스트 재활성 = 합성번들 재검증 PASS .
plan: G2 pre-segmentation (PR-G2-2 deterministic ToC segmentation)
"""
import hashlib
import os
import re
import unicodedata
from pathlib import Path
from pydantic import BaseModel, ValidationError
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from ai.client import AIClient, call_deep_or_defer, parse_json_response
from core.config import settings
from core.utils import setup_logger
from models.document import Document
from models.document_lineage import DocumentLineage
from models.queue import enqueue_stage
logger = setup_logger("presegment_worker")
# ─── 임계값 (모듈 상수, env-override 가능, 보수적 = 분할 안 하는 쪽으로 bias) ───
# MIN_BUNDLE_PAGES: 이 미만이면 번들로 보지 않음(단일문서). 짧은 문서의 우연한 level-1 ToC 보호.
MIN_BUNDLE_PAGES = int(os.getenv("PRESEGMENT_MIN_BUNDLE_PAGES", "60"))
# MIN_CHILD_PAGES: 자식 하나라도 이 미만이면 분할 거부(표지/목차만 떼지는 over-split 방지).
MIN_CHILD_PAGES = int(os.getenv("PRESEGMENT_MIN_CHILD_PAGES", "5"))
# MAX_CHILDREN: 자식 수 상한. 초과 = ToC 가 챕터/소제목 수준이라 논리문서 경계가 아님 → 분할 거부.
MAX_CHILDREN = int(os.getenv("PRESEGMENT_MAX_CHILDREN", "50"))
# marker_worker._to_marker_path 와 동일 — NAS 상대경로 → 컨테이너 절대경로 prefix.
CONTAINER_PATH_PREFIX = os.getenv("MARKER_CONTAINER_PATH_PREFIX", "/documents")
# ─── PR-G2-3 LLM 경계 폴백 (scaffold-first, 기본 OFF) ───
# PRESEGMENT_LLM_FALLBACK: 기본 "false". OFF 면 deterministic 경로만(=오늘과 동일 — 애매하면
# 단일문서). ON 이면 deterministic 이 '명확한 번들' 을 못 만든 대형 PDF(page_count >=
# MIN_BUNDLE_PAGES) 에 한해 off-card Qwen(맥북, 라우터 :8890 경유)에게 경계를 제안받아
# **동일 검증 게이트(_is_clear_bundle)** 통과 시에만 deterministic 과 같은 자식 생성 경로로 분할.
# 검증 실패/파싱 실패/is_bundle=false = 단일문서(오늘과 동일) + presegment_llm_rejected 로깅.
PRESEGMENT_LLM_FALLBACK = os.getenv("PRESEGMENT_LLM_FALLBACK", "false").lower() in (
"1", "true", "yes", "on",
)
# LLM 에 보내는 per-page 샘플의 page 당 char 상한 (heading/첫줄만 — 본문 미전송).
PRESEGMENT_LLM_PAGE_CHARS = int(os.getenv("PRESEGMENT_LLM_PAGE_CHARS", "80"))
# 전체 page-sample 블록의 char 상한 (수 KB 가드 — 초과 시 잘라냄, 본문 누출/페이로드 폭발 방지).
PRESEGMENT_LLM_SAMPLE_CHARS = int(os.getenv("PRESEGMENT_LLM_SAMPLE_CHARS", "12000"))
# 경계 폴백 프롬프트 (app/prompts/presegment_boundaries.txt). system 지시 + 1-based inclusive·
# 전범위 커버·무중첩 규칙. {page_count}/{page_samples} 를 str.replace 로 주입.
_PRESEGMENT_PROMPT_PATH = Path(__file__).parent.parent / "prompts" / "presegment_boundaries.txt"
class Segment(BaseModel):
"""LLM 이 제안하는 1-based inclusive page 범위 한 조각."""
start_page: int
end_page: int
title: str | None = None
class SegmentationOutput(BaseModel):
"""presegment_boundaries 응답 스키마. parse_json_response → model_validate."""
is_bundle: bool = False
segments: list[Segment] = []
confidence: float | None = None
def _resolve_path(file_path: str) -> Path | None:
"""NFC(DB) vs NFD(NFS) 한글 경로 차이 흡수. thumbnail_worker._resolve_path 와 동일 패턴."""
candidates = [
file_path,
unicodedata.normalize("NFD", file_path),
unicodedata.normalize("NFC", file_path),
]
for c in candidates:
p = Path(c)
if p.exists():
return p
parent = Path(file_path).parent
if parent.exists():
target = unicodedata.normalize("NFC", Path(file_path).name)
for child in parent.iterdir():
if unicodedata.normalize("NFC", child.name) == target:
return child
return None
def _to_container_path(file_path: str) -> str:
"""file_path 를 컨테이너 내부 절대경로로 변환 (marker_worker._to_marker_path 와 동일)."""
if file_path.startswith("/"):
return file_path
return f"{CONTAINER_PATH_PREFIX}/{file_path}"
# 후보 A: 자식 합성 file_path 패턴 `{부모경로}#p{start}-{end}` (uq_documents_file_path 유일성).
_BUNDLE_SUFFIX_RE = re.compile(r"#p\d+-\d+$")
def bundle_source_path(file_path: str | None) -> str | None:
"""자식 합성 file_path → 부모 실파일 경로 복원. 일반 doc(접미사 없음)은 그대로 반환.
extract_worker/marker_worker 자식 처리 실제 파일 접근에 사용 (자식 file_path
합성값이라 디스크에 없음). 결정적·세션 불필요. lineage 부모-자식 관계의 정본 기록.
"""
if not file_path:
return file_path
return _BUNDLE_SUFFIX_RE.sub("", file_path)
def _is_pdf(doc: Document) -> bool:
"""PDF 판정 — file_format=pdf 또는 .pdf 확장자."""
fmt = (doc.file_format or "").lower()
if fmt == "pdf":
return True
if doc.file_path:
return Path(doc.file_path).suffix.lower() == ".pdf"
return False
def _level1_segments(toc: list, page_count: int) -> list[dict]:
"""get_toc(simple=True) 결과에서 level-1 항목만 골라 자식 후보 segment 리스트 생성.
toc 항목 = [level, title, page] (page 1-based). level==1 채택.
end_page = 다음 level-1 항목의 page - 1, 마지막 = page_count.
동일 page 에서 시작하는 level-1 여럿이면 정렬 인접 항목으로 경계 계산되며,
경우 0-페이지 segment 생겨 후속 검증(MIN_CHILD_PAGES·단조)에서 거부된다.
"""
starts = []
for entry in toc:
# simple=True 는 [level, title, page]. 방어적으로 길이 체크.
if not entry or len(entry) < 3:
continue
level, title, page = entry[0], entry[1], entry[2]
if level != 1:
continue
# ToC page 가 범위 밖(0/음수/page_count 초과)이면 깨진 ToC → 후속 검증에서 거부됨.
starts.append((int(page), (title or "").strip()))
# ToC 가 정렬돼 있지 않을 수 있으므로 page 기준 정렬(원본 순서 보존 위해 안정 정렬).
starts.sort(key=lambda x: x[0])
segments: list[dict] = []
for i, (start_page, title) in enumerate(starts):
if i + 1 < len(starts):
end_page = starts[i + 1][0] - 1
else:
end_page = page_count
segments.append({"start_page": start_page, "end_page": end_page, "title": title})
return segments
def _is_clear_bundle(segments: list[dict], page_count: int) -> tuple[bool, str]:
"""deterministic '명확한 번들' 판정. (clear, reason) 반환.
clear=True reason="" / clear=False reason 거부 사유(로깅용).
모든 조건은 보수적 하나라도 어긋나면 단일문서로 처리(분할 ).
"""
n = len(segments)
if n < 2:
return False, f"too_few_level1_entries(n={n})"
if n > MAX_CHILDREN:
return False, f"too_many_children(n={n}>{MAX_CHILDREN})"
# 첫 segment 가 1페이지에서 시작 + 마지막이 page_count 에서 끝 = 전 범위 커버.
if segments[0]["start_page"] != 1:
return False, f"first_start_not_1(start={segments[0]['start_page']})"
if segments[-1]["end_page"] != page_count:
return False, f"last_end_not_page_count(end={segments[-1]['end_page']},pc={page_count})"
prev_end = 0
for seg in segments:
start, end = seg["start_page"], seg["end_page"]
# 단조 증가 · 비중첩: 각 start 는 직전 end + 1 이어야 빈틈/겹침 없이 [1,pc] 정확 분할.
if start != prev_end + 1:
return False, f"non_contiguous(start={start},prev_end={prev_end})"
if end < start:
return False, f"non_monotonic(start={start},end={end})"
if (end - start + 1) < MIN_CHILD_PAGES:
return False, f"child_too_small(pages={end - start + 1}<{MIN_CHILD_PAGES})"
prev_end = end
if prev_end != page_count:
return False, f"coverage_gap(covered={prev_end},pc={page_count})"
return True, ""
def _child_title(parent: Document, seg: dict) -> str:
"""자식 제목 = 부모 제목 + '' + (segment 제목 또는 page 범위)."""
base = (parent.title or "").strip() or (parent.original_filename or "") or "문서"
seg_title = (seg.get("title") or "").strip()
suffix = seg_title if seg_title else f"p.{seg['start_page']}-{seg['end_page']}"
return f"{base}{suffix}"
def _child_file_hash(parent_hash: str, start: int, end: int) -> str:
"""자식 file_hash = sha256(f"{parent.file_hash}:{start}-{end}"). 결정적 → 재실행 멱등.
부모 file_hash NULL 수는 없으나(NOT NULL) 방어적으로 문자열 처리.
"""
return hashlib.sha256(f"{parent_hash or ''}:{start}-{end}".encode("utf-8")).hexdigest()
async def _ensure_child_extract(session: AsyncSession, child_id: int) -> None:
"""자식이 아직 extract 안 됐으면 extract enqueue (멱등 수렴 경로).
이미 extracted_text 채워졌거나 활성 행이 있으면 enqueue_stage no-op/skip.
"""
child = await session.get(Document, child_id)
if child is None:
return
# 이미 추출 완료면 재enqueue 불필요 (큐 중복은 enqueue_stage 가 막지만 의미상으로도 skip).
if child.extracted_at is not None and child.extracted_text is not None:
return
await enqueue_stage(session, child_id, "extract")
async def _create_children(
doc: Document, segments: list[dict], session: AsyncSession
) -> int:
"""검증된 segments 로 자식 N개 생성 + lineage + extract enqueue + 부모 표식 (멱등).
deterministic '명확한 번들' 경로와 LLM 폴백 경로가 공유하는 단일 자식 생성 경로.
호출 segments 반드시 _is_clear_bundle 검증을 통과해야 한다(여기선 재검증 X).
commit 까지 수행. 반환값 = 실제 생성한 자식 (이미 존재해 수렴만 경우 0).
"""
# ─── 멱등 체크: 이미 자식이 있으면 수렴만 (재생성 금지) ───
existing_children = (
await session.execute(
select(DocumentLineage.derived_document_id).where(
DocumentLineage.source_document_id == doc.id,
DocumentLineage.relation_type == "segmented_from",
)
)
).scalars().all()
if existing_children:
# 부모 표식이 누락된 경우 보정(이전 부분실패 복구).
if doc.presegment_role != "parent":
doc.presegment_role = "parent"
for child_id in existing_children:
await _ensure_child_extract(session, child_id)
await session.commit()
logger.info(
f"[presegment] id={doc.id} children already exist "
f"(n={len(existing_children)}) → converge(ensure extract), no re-create"
)
return 0
# ─── 자식 N개 생성 + lineage + extract enqueue ───
created_ids: list[int] = []
for seg in segments:
start, end = seg["start_page"], seg["end_page"]
child = Document(
# 후보 A: 자식 file_path = unique 합성값 `{부모경로}#p{s}-{e}` (uq_documents_file_path
# 충돌 회피). 실파일은 bundle_source_path() 로 복원(부모 경로). 물리 분할 없음 —
# 자식은 bundle_page_start/end 로 부모 파일을 슬라이스.
file_path=f"{doc.file_path}#p{start}-{end}",
file_hash=_child_file_hash(doc.file_hash, start, end),
file_format=doc.file_format,
file_size=doc.file_size,
file_type=doc.file_type,
import_source=doc.import_source,
original_filename=doc.original_filename,
source_channel=doc.source_channel,
category=doc.category,
data_origin=doc.data_origin,
doc_purpose=doc.doc_purpose,
# 안전 자료실 축은 부모에서 상속(분할이 자료유형/관할을 바꾸지 않음).
material_type=doc.material_type,
jurisdiction=doc.jurisdiction,
title=_child_title(doc, seg),
bundle_page_start=start,
bundle_page_end=end,
presegment_role="child",
)
session.add(child)
await session.flush() # child.id 확보
created_ids.append(child.id)
session.add(
DocumentLineage(
source_document_id=doc.id,
derived_document_id=child.id,
relation_type="segmented_from",
meta={"start_page": start, "end_page": end},
)
)
# 자식 extract 는 워커가 직접 enqueue (부모는 'parent' 라 extract 로 흐르지 않음).
await enqueue_stage(session, child.id, "extract")
# 부모 = 파일 홀더. presegment→extract 전이는 enqueue_next_stage 가 'parent' 면 억제.
doc.presegment_role = "parent"
await session.commit()
logger.info(
f"[presegment] id={doc.id} SPLIT into {len(created_ids)} children "
f"child_ids={created_ids}"
)
return len(created_ids)
def _segments_from_output(out: "SegmentationOutput") -> list[dict]:
"""SegmentationOutput.segments(Pydantic) → _is_clear_bundle / _create_children 가 쓰는 dict 형태."""
return [
{"start_page": s.start_page, "end_page": s.end_page, "title": (s.title or "")}
for s in out.segments
]
def _page_samples(pdf, page_count: int) -> str:
"""LLM 입력용 compact per-page 샘플 — page 당 heading/첫줄만(`p{n}: {firstline}`).
PyMuPDF page.get_text() page 텍스트를 스트리밍하되 page 비공백 줄만,
PRESEGMENT_LLM_PAGE_CHARS 잘라 본문 누출 차단. 전체 블록은 PRESEGMENT_LLM_SAMPLE_CHARS
가드로 상한( KB) 초과 지점에서 중단(앞쪽 페이지 우선 보존).
"""
lines: list[str] = []
total = 0
for i in range(page_count):
try:
text = pdf[i].get_text() or ""
except Exception:
text = ""
first = ""
for ln in text.splitlines():
ln = ln.strip()
if ln:
first = ln
break
first = first[:PRESEGMENT_LLM_PAGE_CHARS]
entry = f"p{i + 1}: {first}"
if total + len(entry) + 1 > PRESEGMENT_LLM_SAMPLE_CHARS:
break
lines.append(entry)
total += len(entry) + 1
return "\n".join(lines)
async def _llm_boundary_fallback(
doc: Document, source: Path, page_count: int, session: AsyncSession
) -> bool:
"""애매 + 대형(ToC-less 등) PDF 에 대해 off-card Qwen 으로 경계 제안 → 검증 → 분할.
반환 True = LLM 경로가 분할을 수행(또는 멱등 수렴)했으므로 호출자는 추가 처리 없이 return.
반환 False = is_bundle=false / 파싱 실패 / 검증 실패 호출자는 단일문서(오늘과 동일) 처리.
맥북 불가(503/연결/절단) call_deep_or_defer StageDeferred raise 재시도(백오프).
silent fallback 금지 deep 슬롯 다른 backend 자동 호출 .
"""
import fitz # PyMuPDF — deterministic 경로와 동일 의존
# per-page 샘플은 파일을 다시 열어 스트리밍(deterministic with 블록과 분리해 그 경로 무회귀).
try:
with fitz.open(str(source)) as pdf:
samples = _page_samples(pdf, page_count)
except Exception as exc:
logger.warning(
f"[presegment] id={doc.id} llm fallback sample 실패 "
f"({type(exc).__name__}: {exc}) → single doc(extract)"
)
return False
try:
template = _PRESEGMENT_PROMPT_PATH.read_text(encoding="utf-8")
except Exception as exc:
logger.warning(
f"[presegment] id={doc.id} prompt 로드 실패 ({type(exc).__name__}: {exc}) "
f"→ single doc(extract)"
)
return False
prompt = template.replace("{page_count}", str(page_count)).replace(
"{page_samples}", samples
)
# off-card 호출 — call_deep_or_defer 가 deep 슬롯(맥북, 라우터 :8890, model=qwen-macbook)
# 으로 라우팅. 맥북 불가는 StageDeferred 로 전파(여기서 잡지 않음 → 큐가 보류/백오프).
# classify_worker 와 동일하게 AIClient() 인스턴스화.
client = AIClient()
try:
raw = await call_deep_or_defer(client, prompt)
finally:
await client.close()
parsed = parse_json_response(raw)
if not parsed:
logger.info(
f"[presegment] presegment_llm_rejected id={doc.id} "
f"reason=parse_failed raw={raw[:160]!r} → single doc(extract)"
)
return False
try:
out = SegmentationOutput.model_validate(parsed)
except (ValidationError, ValueError, TypeError) as exc:
logger.info(
f"[presegment] presegment_llm_rejected id={doc.id} "
f"reason=schema_invalid({type(exc).__name__}) → single doc(extract)"
)
return False
if not out.is_bundle:
logger.info(
f"[presegment] presegment_llm_rejected id={doc.id} "
f"reason=is_bundle_false → single doc(extract)"
)
return False
segments = _segments_from_output(out)
clear, reason = _is_clear_bundle(segments, page_count)
if not clear:
# LLM 출력을 그대로 믿지 않음 — deterministic 과 동일 게이트 미달이면 단일문서.
logger.info(
f"[presegment] presegment_llm_rejected id={doc.id} "
f"reason={reason} n={len(segments)} pages={page_count} → single doc(extract)"
)
return False
n = await _create_children(doc, segments, session)
logger.info(
f"[presegment] id={doc.id} LLM-SPLIT accepted "
f"(pages={page_count} n={len(segments)} created={n} "
f"confidence={out.confidence})"
)
return True
async def process(document_id: int, session: AsyncSession) -> None:
"""presegment stage 워커 진입점. queue_consumer 가 호출.
문서가 진입하며, 非PDF·단일문서는 변경 없이 통과(presegment_role 그대로 NULL) extract 흐른다.
'명확한 번들' PDF 자식 분할 + 부모를 'parent' 표식( 경우 부모는 extract 흐르지 않음).
"""
doc = await session.get(Document, document_id)
if doc is None:
logger.warning(f"[presegment] document {document_id} not found")
return
# ─── (0) 非PDF — fast-exit. presegment_role 그대로 NULL → enqueue_next_stage 가 extract 로 흘림 ───
if not _is_pdf(doc):
logger.info(f"[presegment] id={document_id} non-pdf (fmt={doc.file_format}) → extract")
return
# ─── (0.5) file_path 없음(예: note) — 분할 불가, 단일문서로 통과 ───
if not doc.file_path:
logger.info(f"[presegment] id={document_id} no file_path → extract")
return
# ─── (1) 이미 분할된 자식 자신이 presegment 로 다시 들어온 경우 — 재분할 금지 ───
# (정상 흐름에선 자식은 곧장 extract 로 enqueue 되지만, 재처리 스크립트 등으로 들어올 수 있음.)
if doc.presegment_role in ("child", "parent"):
logger.info(
f"[presegment] id={document_id} already presegment_role={doc.presegment_role} → skip"
)
return
# ─── (2) 파일 열기 + page_count ───
raw = str(Path(settings.nas_mount_path) / doc.file_path)
source = _resolve_path(raw)
if source is None:
# 파일 부재 = extract 가 동일 상황에서 FileNotFoundError 로 처리할 사안.
# presegment 는 분할 불가일 뿐이므로 단일문서로 통과시켜 extract 가 일관되게 처리하게 둔다.
logger.warning(f"[presegment] id={document_id} file not found ({raw}) → extract")
return
import fitz # PyMuPDF — extract_worker/marker_worker 와 동일 의존
try:
with fitz.open(str(source)) as pdf:
page_count = pdf.page_count
toc = pdf.get_toc(simple=True) or []
except Exception as exc:
# PDF 손상 등 — 분할 불가. 단일문서로 통과(extract 가 PyMuPDF/OCR 로 재시도하며 가시화).
logger.warning(
f"[presegment] id={document_id} fitz open/toc failed "
f"({type(exc).__name__}: {exc}) → extract"
)
return
# ─── (3) page_count 가 임계 미만 = 단일문서 (대다수 경로) ───
if page_count < MIN_BUNDLE_PAGES:
logger.info(
f"[presegment] id={document_id} single doc "
f"(pages={page_count}<{MIN_BUNDLE_PAGES}) → extract"
)
return
# ─── (4) level-1 ToC → 자식 후보 segment ───
segments = _level1_segments(toc, page_count)
if not segments:
# 큰 PDF 인데 ToC 없음/level-1 없음 = 애매. flag ON 이면 LLM 경계 폴백(PR-G2-3),
# OFF(기본) 이면 오늘과 동일 — 단일문서로 처리하고 사유를 남긴다.
if PRESEGMENT_LLM_FALLBACK:
logger.info(
f"[presegment] presegment_ambiguous id={document_id} "
f"reason=no_level1_toc pages={page_count} → LLM fallback"
)
if await _llm_boundary_fallback(doc, source, page_count, session):
return
# LLM 이 분할하지 않음(is_bundle=false / 검증·파싱 실패) — 단일문서.
return
logger.info(
f"[presegment] presegment_ambiguous id={document_id} "
f"reason=no_level1_toc pages={page_count} → single doc(extract)"
)
return
clear, reason = _is_clear_bundle(segments, page_count)
if not clear:
# 큰 PDF + ToC 는 있으나 '명확한 번들' 기준 미달 = 애매. flag ON 이면 LLM 경계 폴백,
# OFF(기본) 이면 오늘과 동일 — 단일문서(분할 안 함).
if PRESEGMENT_LLM_FALLBACK:
logger.info(
f"[presegment] presegment_ambiguous id={document_id} "
f"reason={reason} pages={page_count} level1={len(segments)} → LLM fallback"
)
if await _llm_boundary_fallback(doc, source, page_count, session):
return
return
logger.info(
f"[presegment] presegment_ambiguous id={document_id} "
f"reason={reason} pages={page_count} level1={len(segments)} → single doc(extract)"
)
return
# ─── (5) 명확한 번들 (deterministic) — 공유 자식 생성 경로 (멱등 수렴 포함) ───
await _create_children(doc, segments, session)
+7 -68
View File
@@ -31,9 +31,9 @@ _hold_logged = False
# embed/chunk 1→10 (2026-06-12 fast-consumer): 건당 <1s 실측 — Phase 0.1 초기 보수값이
# LLM 사이클에 인질로 잡혀 실효 ~580/일 vs 수요 최대 2,700/일 → 적체 원인이었음.
# 10 = TEI/marker 와 GPU 공유 고려한 보수 상향(전용 1분 잡 기준 캡 ~14,400/일).
BATCH_SIZE = {"presegment": 3, "extract": 5, "classify": 3, "summarize": 3, "embed": 10,
"chunk": 10, "preview": 2, "stt": 1, "thumbnail": 3, "deep_summary": 1,
"markdown": 1, "fulltext": 3}
BATCH_SIZE = {"extract": 5, "classify": 3, "summarize": 3, "embed": 10, "chunk": 10,
"preview": 2, "stt": 1, "thumbnail": 3, "deep_summary": 1, "markdown": 1,
"fulltext": 3}
STALE_THRESHOLD_MINUTES = 10
# markdown 대형 split 변환은 한 doc 이 수십 분(5210 ≈ 40분) 동안 processing 상태로 머문다.
# marker_worker 는 queue 행에 heartbeat 를 찍지 않으므로(started_at 고정), main 의 10분
@@ -46,16 +46,11 @@ MARKDOWN_STALE_THRESHOLD_MINUTES = int(os.getenv("MARKDOWN_STALE_MINUTES", "120"
# (reset_stale_items 가 자기 집합만 reset, 교차 시 이중 복구 위험).
# STT 도 장기 작업 가능성이 있으나 본 PR 범위 밖 — main 에 유지(follow-up).
MAIN_QUEUE_STAGES = [
"presegment", "extract", "classify", "summarize",
"preview", "stt", "thumbnail", "fulltext",
"extract", "classify", "summarize",
"preview", "stt", "thumbnail", "deep_summary", "fulltext",
]
MARKDOWN_QUEUE_STAGES = ["markdown"]
# 2026-06-15: deep_summary(26B, 콜당 70~300s)를 메인 루프에서 분리 (markdown/fast 선례).
# 단일 deep 호출이 1분 틱을 초과해 메인 consume_queue 가 영구 coalesce 되고 extract/
# classify 등 경량 stage 까지 굶던 문제 제거. 집합 disjoint(자기 집합만 stale reset).
DEEP_QUEUE_STAGES = ["deep_summary"]
# 고속(비-LLM·경량 GPU) stage — LLM 사이클(분 단위)에서 분리해 1분 잡 전용 소비.
# embed/chunk 는 건당 <1s 라 main 루프에 두면 classify(~190s×3) 뒤에서 굶는다
# (2026-06-12 실측: 적체 3,570 · 4070 가동률 0%). markdown 분리(05-01)와 동일 패턴.
@@ -165,10 +160,6 @@ async def enqueue_next_stage(document_id: int, current_stage: str):
}
next_stages = {
# G2 (PR-G2-2): 전 문서가 presegment → extract. 단, 번들 분할로 'parent' 가 된 문서는
# 파일 홀더라 자체 extract 안 함 — 아래 suppression 으로 이 전이를 건너뛴다(자식 extract 는
# presegment_worker 가 직접 enqueue). 단일/非PDF 문서(role NULL)는 정상적으로 extract 로 흐름.
"presegment": ["extract"],
"extract": ["classify", "preview"],
"classify": ["embed", "chunk", "markdown"],
"stt": ["classify"],
@@ -184,18 +175,6 @@ async def enqueue_next_stage(document_id: int, current_stage: str):
stages = extract_override_by_channel[sc]
else:
stages = next_stages.get(current_stage, [])
elif current_stage == "presegment":
# 번들 분할 parent 는 extract 로 흐르지 않게 억제 (자식이 부모 extract 에 가려지는 것 방지).
# role NULL(단일/非PDF) / 'child' 는 정상 전이. presegment_worker 가 자식 extract 를 직접
# enqueue 하므로 'parent' 만 여기서 no-op.
from models.document import Document
async with async_session() as lookup_session:
doc = await lookup_session.get(Document, document_id)
role = doc.presegment_role if doc else None
if role == "parent":
stages = []
else:
stages = next_stages.get(current_stage, [])
else:
stages = next_stages.get(current_stage, [])
@@ -215,7 +194,6 @@ def _load_workers():
from workers.deep_summary_worker import process as deep_summary_process
from workers.embed_worker import process as embed_process
from workers.extract_worker import process as extract_process
from workers.presegment_worker import process as presegment_process
from workers.preview_worker import process as preview_process
from workers.stt_worker import process as stt_process
from workers.summarize_worker import process as summarize_process
@@ -224,8 +202,6 @@ def _load_workers():
from workers.fulltext_worker import process as fulltext_process
return {
# G2 (PR-G2-2): extract 前 번들 PDF → N 자식 분할 (deterministic ToC). 非PDF/단일은 통과.
"presegment": presegment_process,
"extract": extract_process,
"classify": classify_process,
"summarize": summarize_process,
@@ -294,15 +270,7 @@ async def _process_stage(stage, worker_fn):
item.status = "completed"
item.completed_at = datetime.now(timezone.utc)
await skip_session.commit()
# 완료 커밋 후 enqueue — 실패가 outer except 로 전파돼 completed 재오픈
# 되지 않게 격리 (R3, 정상 완료 경로와 동일 처리).
try:
await enqueue_next_stage(document_id, stage)
except Exception as enq_err:
logger.error(
f"[{stage}] document_id={document_id} skip(note) 완료됐으나 "
f"다음 단계 enqueue 실패: {enq_err}"
)
await enqueue_next_stage(document_id, stage)
logger.info(f"[{stage}] document_id={document_id} skip (note)")
continue
@@ -320,15 +288,7 @@ async def _process_stage(stage, worker_fn):
item.completed_at = datetime.now(timezone.utc)
await session.commit()
# 완료는 이미 커밋됨. enqueue_next_stage 실패가 outer except 로 전파되면
# completed 항목을 재오픈(pending/failed)해 같은 단계를 재실행 = 비싼 작업 중복
# + 부분 재쓰기. 자체 try 로 격리하고 ERROR 로 가시화한다 (R3).
try:
await enqueue_next_stage(document_id, stage)
except Exception as enq_err:
logger.error(
f"[{stage}] document_id={document_id} 완료됐으나 다음 단계 enqueue 실패: {enq_err}"
)
await enqueue_next_stage(document_id, stage)
logger.info(f"[{stage}] document_id={document_id} 완료")
except StageDeferred as defer:
@@ -445,24 +405,3 @@ async def consume_markdown_queue():
for stage in MARKDOWN_QUEUE_STAGES:
await _process_stage(stage, workers[stage])
async def consume_deep_queue():
"""deep_summary 전용 큐 소비자 (2026-06-15) — 26B 심층요약을 메인 파이프라인과 분리.
deep_summary 1콜이 70~300s(맥미니 Qwen 27B 폴백) 메인 consume_queue(1 ) 안에
있으면 틱이 interval 초과해 영구 "maximum running instances" coalesce 되고
extract/classify 경량 stage 까지 함께 굶었다. 분리 = deep 자기 1 잡에서
coalesce, 나머지 메인 루프는 완료. max_instances=1 동시 deep 2건은 방지.
"""
workers = _load_workers()
try:
await reset_stale_items(DEEP_QUEUE_STAGES, STALE_THRESHOLD_MINUTES)
except Exception:
logger.exception("deep stale reset failed, but continuing queue consumption")
for stage in DEEP_QUEUE_STAGES:
if stage in settings.pipeline_held_stages:
continue
await _process_stage(stage, workers[stage])
+1 -3
View File
@@ -102,9 +102,7 @@ async def _process_one(session: AsyncSession, qid: int, client: AIClient) -> boo
try:
async with asyncio.timeout(EMBED_TIMEOUT_S):
vec = await client.embed(text)
except asyncio.CancelledError:
raise # 취소는 전파 — broad except 가 삼키지 않게 명시 (R3)
except Exception as e:
except (asyncio.TimeoutError, Exception) as e:
logger.warning("study_q_embed_failed qid=%s err=%s: %s", qid, type(e).__name__, e)
# 실패 — status='failed'. 직전 embedding 보존.
q.embedding_status = "failed"
+1 -6
View File
@@ -121,12 +121,7 @@ async def process(document_id: int, session: AsyncSession) -> None:
ok = _extract_thumbnail(source, output, seek)
if not ok:
# 썸네일 추출 실패(ffmpeg)는 삼키지 않고 raise (R3) — queue_consumer 가 attempts
# 소진까지 재시도 후 status=failed 로 가시화. silent return 이면 큐가 completed 로
# 확정 + 썸네일 영구 누락 + 재시도/추적 0 (silent skip). 손상 영상이면 failed 로 안착.
raise RuntimeError(
f"thumbnail 추출 실패: document_id={document_id} source={source}"
)
return
doc.thumbnail_path = str(output)
doc.updated_at = datetime.now(timezone.utc)
-8
View File
@@ -52,11 +52,6 @@ DOMAIN_PRIORITY: list[tuple[str, str]] = [
("manual", "source_channel = 'manual'"),
]
# R12: filter_clause 는 SQL 에 직접 보간되므로 이 allowlist(DOMAIN_PRIORITY 출처) 통과분만
# 허용 — 현재 모듈 상수라 injection 경로 0 이나, 외부 입력화 시 즉시 차단하는 final gate
# (retrieval_service 의 _VALID_DOCS_TABLE allowlist 정본 대비 비대칭 해소).
_ALLOWED_FILTER_CLAUSES: frozenset[str] = frozenset(c for _, c in DOMAIN_PRIORITY)
async def _classify_pending(session: AsyncSession) -> int:
return int(await session.scalar(text("""
@@ -71,9 +66,6 @@ async def _enqueue_domain(session: AsyncSession, filter_clause: str, limit: int)
extracted_text 문자열 (LENGTH=0) 제외 classify_worker not doc.extracted_text
truthy 체크라 문자열에서 ValueError raise. 무한 retry 루프 방지.
"""
# R12: SQL 직접 보간 전 allowlist final gate.
if filter_clause not in _ALLOWED_FILTER_CLAUSES:
raise ValueError(f"비허용 filter_clause (allowlist 외): {filter_clause!r}")
sql = text(f"""
INSERT INTO processing_queue (document_id, stage, status, attempts, max_attempts)
SELECT id, 'classify', 'pending', 0, 3
@@ -0,0 +1,63 @@
import AppKit
import Foundation
/// macOS + . AppKit(NSOpenPanel/NSSavePanel) AppFeature
/// (OS UI ) DSKit ( iOS/watchOS). @MainActor.
@MainActor
enum FilePanels {
/// 1 . nil.
static func pickFileToUpload() -> URL? {
let panel = NSOpenPanel()
panel.allowsMultipleSelection = false
panel.canChooseDirectories = false
panel.canChooseFiles = true
panel.message = "업로드할 문서를 선택하세요"
panel.prompt = "업로드"
return panel.runModal() == .OK ? panel.url : nil
}
/// . nil. = (files.user-selected).
static func pickSaveDestination(suggestedName: String) -> URL? {
let panel = NSSavePanel()
panel.nameFieldStringValue = suggestedName
panel.message = "원본 파일을 저장할 위치"
panel.prompt = "저장"
return panel.runModal() == .OK ? panel.url : nil
}
}
/// . URL ?token= ( ),
/// URL / . NSSavePanel .
@MainActor
enum FileDownloader {
enum Outcome: Equatable {
case saved(URL)
case cancelled
case failed(String)
}
/// `url` = DSDownload.fileURL ?token= URL. `suggestedName` = .
static func download(from url: URL, suggestedName: String) async -> Outcome {
guard let dest = FilePanels.pickSaveDestination(suggestedName: suggestedName) else {
return .cancelled
}
do {
let (temp, response) = try await URLSession.shared.download(from: url)
// (async download )
// . move temp removeItem no-op.
defer { try? FileManager.default.removeItem(at: temp) }
if let http = response as? HTTPURLResponse, !(200..<300).contains(http.statusCode) {
// URL/ .
return .failed("다운로드 실패 (HTTP \(http.statusCode))")
}
if FileManager.default.fileExists(atPath: dest.path) {
try FileManager.default.removeItem(at: dest)
}
try FileManager.default.moveItem(at: temp, to: dest)
return .saved(dest)
} catch {
// URLError/ localizedDescription URL .
return .failed("저장 실패: \((error as NSError).localizedDescription)")
}
}
}
@@ -1,85 +0,0 @@
import SwiftUI
import AIFabric
/// RAG proof page: routes corpusAsk through AIService (-> AIRouter -> MockAIProvider). Explicit backend
/// pick sets explicitProvider; an explicit-unavailable result renders a visible, non-retrying error.
struct AskView: View {
@Environment(AppModel.self) private var model
@State private var backend: BackendChoice = .auto
var body: some View {
@Bindable var model = model
ScrollView {
VStack(alignment: .leading, spacing: 14) {
Picker("백엔드", selection: $backend) {
ForEach(BackendChoice.allCases) { Text($0.label).tag($0) }
}
.pickerStyle(.segmented)
HStack(spacing: 8) {
TextField("코퍼스 전체에 질문", text: $model.askQuery)
.textFieldStyle(.roundedBorder)
.onSubmit { Task { await model.runAsk(backend: backend.provider) } }
Button("질문") { Task { await model.runAsk(backend: backend.provider) } }
.buttonStyle(.borderedProminent)
}
if let result = model.askResult {
switch result {
case .success(let response):
AICompletionView(response: response) { docID in
model.section = .documents
Task { await model.openDocument(docID) }
}
if let meta = model.askMeta {
HStack(spacing: 6) {
Chip("완성도 \(meta.completeness)", Sage.muted)
if let aspects = meta.coveredAspects {
ForEach(aspects, id: \.self) { Chip($0, Sage.brand) }
}
}
}
case .failure(let err):
ErrorBanner(text: message(for: err))
}
} else {
EmptyState(text: "질문을 입력하세요").frame(minHeight: 160)
}
}
.padding(16)
}
.background(Sage.surface)
}
private func message(for error: AIServiceError) -> String {
switch error {
case .explicitUnavailable(let id):
return "\(id.displayName) 백엔드를 쓸 수 없습니다 — 다른 백엔드로 자동 전환하지 않았습니다. 다른 백엔드를 고르세요."
case .notConfigured(let id): return "\(id.displayName) 백엔드 미구성"
case .noneAvailable: return "응답 가능한 백엔드가 없습니다."
case .providerFailed(let s): return "응답 실패: \(s)"
case .unknown(let s): return "오류: \(s)"
}
}
}
enum BackendChoice: String, CaseIterable, Identifiable {
case auto, onDevice, localMLX, remoteDS
var id: String { rawValue }
var label: String {
switch self {
case .auto: return "자동"
case .onDevice: return "온디바이스"
case .localMLX: return "맥미니"
case .remoteDS: return "원격 DS"
}
}
var provider: AIProviderID? {
switch self {
case .auto: return nil
case .onDevice: return .onDevice
case .localMLX: return .localMLX
case .remoteDS: return .remoteDS
}
}
}
@@ -1,51 +1,386 @@
import SwiftUI
import DSKit
/// Corpus-health overview (not a dumped table). Stat hero + domain distribution bars; tapping a
/// domain jumps to Documents (cross-page nav proof).
/// = ( 1). detail 1000pt , 2.
/// ( + + ) (·)/(·).
struct DashboardView: View {
@Environment(AppModel.self) private var model
var body: some View {
ScrollView {
ScrollView(.vertical) {
VStack(alignment: .leading, spacing: 18) {
if let s = model.stats {
LazyVGrid(columns: [GridItem(.adaptive(minimum: 150), spacing: 12)], spacing: 12) {
StatCard(title: "전체", value: s.total, color: Sage.brand)
StatCard(title: "문서", value: s.counts["document"] ?? 0, color: Sage.brand)
StatCard(title: "승인 대기", value: s.libraryPendingSuggestions, color: Sage.amber)
}
VStack(alignment: .leading, spacing: 10) {
Text("카테고리 분포").font(.headline).foregroundStyle(Sage.ink)
ForEach(s.counts.sorted { $0.value > $1.value }, id: \.key) { key, value in
DomainBar(name: Self.categoryLabel(key), count: value, max: s.counts.values.max() ?? 1)
.contentShape(Rectangle())
.onTapGesture { model.section = .documents }
}
}
.padding(16)
.background(Sage.card, in: RoundedRectangle(cornerRadius: 14))
.overlay(RoundedRectangle(cornerRadius: 14).stroke(Sage.line))
} else {
GreetingHeader()
if model.stats == nil && model.tree.isEmpty {
ProgressView().frame(maxWidth: .infinity, minHeight: 200)
} else {
TodayStrip()
HStack(alignment: .top, spacing: 18) {
VStack(alignment: .leading, spacing: 18) {
CaptureCard()
ActivityTimeline()
}
.frame(maxWidth: .infinity)
VStack(alignment: .leading, spacing: 18) {
DomainDistribution()
PinnedItems()
}
.frame(width: 312)
}
}
}
.padding(20)
.frame(maxWidth: 1000, alignment: .leading)
.padding(.horizontal, 30)
.padding(.vertical, 26)
}
.frame(maxWidth: .infinity, alignment: .topLeading)
.background(Sage.surface)
}
}
/// category enum ( raw ).
static func categoryLabel(_ key: String) -> String {
switch key {
case "document": return "문서"
case "library": return "자료실"
case "news": return "뉴스"
case "law": return "법령"
case "memo": return "메모"
case "audio": return "오디오"
case "video": return "비디오"
default: return key
// MARK: - Greeting
private struct GreetingHeader: View {
@Environment(AppModel.self) private var model
var body: some View {
VStack(alignment: .leading, spacing: 3) {
HStack(alignment: .firstTextBaseline, spacing: 10) {
Text("안녕하세요, \(model.currentUser?.username ?? "사용자")")
.font(.system(size: 22, weight: .bold)).kerning(-0.4).foregroundStyle(Sage.ink)
Text("오늘도 지식 쌓는 날.").font(.callout).foregroundStyle(Sage.muted)
}
Text(Self.today).font(.caption).foregroundStyle(Sage.muted.opacity(0.8))
}
.padding(.bottom, 4)
}
static var today: String {
let f = DateFormatter()
f.locale = Locale(identifier: "ko_KR")
f.dateFormat = "y년 M월 d일 EEEE"
return f.string(from: Date())
}
}
// MARK: - Today strip (hero)
private struct TodayStrip: View {
@Environment(AppModel.self) private var model
var body: some View {
VStack(spacing: 14) {
HStack(alignment: .top, spacing: 0) {
reviewQueue
.frame(minWidth: 150, alignment: .leading)
Rectangle().fill(Sage.line).frame(width: 1).padding(.horizontal, 22)
digestTeaser
.frame(maxWidth: .infinity, alignment: .leading)
}
Divider().overlay(Sage.line)
statRow
}
.dashCard(padding: 20)
}
private var reviewQueue: some View {
VStack(alignment: .leading, spacing: 4) {
Text(model.reviewPendingCount.map(String.init) ?? "")
.font(.system(size: 38, weight: .bold)).kerning(-1.5).monospacedDigit()
.foregroundStyle(Sage.amber)
Text("검토 대기 문서").font(.caption).foregroundStyle(Sage.muted)
Button { model.section = .documents } label: {
Text("검토 시작 →").font(.caption.weight(.semibold)).foregroundStyle(Sage.brand)
}
.buttonStyle(.plain)
}
}
@ViewBuilder private var digestTeaser: some View {
if let t = topTopic {
Button { model.section = .digest } label: {
VStack(alignment: .leading, spacing: 6) {
HStack(spacing: 8) {
Chip("속보", Sage.danger)
Text("\(model.digest?.digestDateDisplay ?? "") 브리핑")
.font(.caption2).foregroundStyle(Sage.muted)
}
Text(t.label).font(.system(size: 15)).foregroundStyle(Sage.ink)
.lineLimit(2).fixedSize(horizontal: false, vertical: true)
.multilineTextAlignment(.leading)
Text(t.meta).font(.caption2).foregroundStyle(Sage.muted)
}
.frame(maxWidth: .infinity, alignment: .leading)
}
.buttonStyle(.plain)
} else {
Text("오늘 브리핑이 아직 없습니다").font(.callout).foregroundStyle(Sage.muted)
.frame(maxWidth: .infinity, alignment: .leading)
}
}
private var statRow: some View {
HStack(spacing: 0) {
StatCell(value: model.stats?.total ?? 0, label: "전체", color: Sage.brand)
StatCell(value: model.stats?.counts["document"] ?? 0, label: "문서")
StatCell(value: domainCount("Industrial_Safety"), label: "산업안전",
color: Sage.domainColor("Industrial_Safety"))
StatCell(value: domainCount("Engineering"), label: "엔지니어링",
color: Sage.domainColor("Engineering"))
StatCell(value: domainCount("General"), label: "자료실", color: Sage.domainColor("General"))
StatCell(value: model.stats?.counts["memo"] ?? model.memoList.count, label: "메모")
}
}
private func domainCount(_ name: String) -> Int {
model.tree.first { $0.name == name }?.count ?? 0
}
private var topTopic: (label: String, meta: String)? {
guard let digest = model.digest else { return nil }
var best: (TopicResponse, String)?
for c in digest.countries {
for t in c.topics where best == nil || (t.importanceScore ?? 0) > (best!.0.importanceScore ?? 0) {
best = (t, c.country)
}
}
guard let (t, country) = best else { return nil }
let arts = t.articleCount ?? t.articles.count
var meta = "관련 기사 \(arts)"
if let imp = t.importanceScore { meta += " · 중요도 \(String(format: "%.0f", imp))" }
if !country.isEmpty { meta += " · \(country)" }
return (t.topicLabel, meta)
}
}
// MARK: - Left column
private struct CaptureCard: View {
@Environment(AppModel.self) private var model
var body: some View {
@Bindable var m = model
VStack(alignment: .leading, spacing: 12) {
SectionLabel("빠른 캡처")
HStack(spacing: 8) {
TextField("메모 한 줄 남기기…", text: $m.captureText)
.textFieldStyle(.plain)
.padding(.horizontal, 14).frame(height: 38)
.background(Sage.surface, in: RoundedRectangle(cornerRadius: 8))
.overlay(RoundedRectangle(cornerRadius: 8).stroke(Sage.line))
.onSubmit { Task { await model.saveMemo() } }
Button { Task { await model.saveMemo() } } label: {
Text("저장").font(.callout.weight(.semibold)).foregroundStyle(.white)
.padding(.horizontal, 18).frame(height: 38)
.background(Sage.brand, in: RoundedRectangle(cornerRadius: 8))
}
.buttonStyle(.plain)
.disabled(model.captureText.trimmingCharacters(in: .whitespacesAndNewlines).isEmpty)
}
Button {
guard let url = FilePanels.pickFileToUpload() else { return }
Task { await model.uploadPicked(url) }
} label: {
Text(" 파일 업로드").font(.caption.weight(.semibold)).foregroundStyle(Sage.brand)
.padding(.horizontal, 10).padding(.vertical, 5)
.background(Sage.brand.opacity(0.12), in: Capsule())
}
.buttonStyle(.plain)
}
.frame(maxWidth: .infinity, alignment: .leading)
.dashCard()
}
}
private struct ActivityTimeline: View {
@Environment(AppModel.self) private var model
private var recent: [DocumentResponse] {
model.documentList
.sorted { ($0.updatedAt ?? .distantPast) > ($1.updatedAt ?? .distantPast) }
.prefix(5).map { $0 }
}
var body: some View {
VStack(alignment: .leading, spacing: 12) {
HStack(alignment: .firstTextBaseline) {
SectionLabel("최근 활동")
Spacer()
Button { model.section = .documents } label: {
Text("전체 보기 →").font(.caption.weight(.semibold)).foregroundStyle(Sage.brand)
}
.buttonStyle(.plain)
}
if recent.isEmpty {
Text("최근 활동이 없습니다").font(.caption).foregroundStyle(Sage.muted)
} else {
VStack(spacing: 0) {
ForEach(Array(recent.enumerated()), id: \.element.id) { idx, doc in
ActivityRow(doc: doc, isLast: idx == recent.count - 1)
if idx != recent.count - 1 { Divider().overlay(Sage.line) }
}
}
}
}
.frame(maxWidth: .infinity, alignment: .leading)
.dashCard()
}
}
private struct ActivityRow: View {
@Environment(AppModel.self) private var model
let doc: DocumentResponse
let isLast: Bool
var body: some View {
HStack(alignment: .top, spacing: 12) {
Text(Self.relative(doc.updatedAt))
.font(.caption2).foregroundStyle(Sage.muted)
.frame(width: 54, alignment: .trailing)
VStack(spacing: 0) {
Circle().fill(Sage.domainColor(doc.aiDomain)).frame(width: 8, height: 8).padding(.top, 4)
if !isLast { Rectangle().fill(Sage.line).frame(width: 1).frame(maxHeight: .infinity) }
}
.frame(width: 14)
VStack(alignment: .leading, spacing: 3) {
Text("\(localizedDomain(doc.aiDomain)) · \(doc.displayFormat.uppercased())")
.font(.caption2.weight(.bold)).foregroundStyle(Sage.domainColor(doc.aiDomain))
Text(doc.title ?? doc.downloadLabel).font(.callout).foregroundStyle(Sage.ink).lineLimit(2)
}
.frame(maxWidth: .infinity, alignment: .leading)
.padding(.bottom, isLast ? 0 : 10)
}
.contentShape(Rectangle())
.onTapGesture { model.section = .documents; Task { await model.openDocument(doc.id) } }
}
static func relative(_ date: Date?) -> String {
guard let date else { return "" }
let f = RelativeDateTimeFormatter()
f.locale = Locale(identifier: "ko_KR")
f.unitsStyle = .short
return f.localizedString(for: date, relativeTo: Date())
}
}
// MARK: - Right column
private struct DomainDistribution: View {
@Environment(AppModel.self) private var model
private var domains: [DomainTreeNode] { model.tree.sorted { $0.count > $1.count } }
private var domainTotal: Int { domains.reduce(0) { $0 + $1.count } }
private var sum: Int { max(1, domainTotal) } // 0- ( )
var body: some View {
VStack(alignment: .leading, spacing: 12) {
SectionLabel("도메인 분포")
// = / ( ) .
HStack(alignment: .firstTextBaseline, spacing: 3) {
Text("분류").font(.caption).foregroundStyle(Sage.muted)
Text("\(domainTotal)").font(.system(size: 18, weight: .semibold))
.monospacedDigit().foregroundStyle(Sage.ink)
Text("").font(.caption).foregroundStyle(Sage.muted)
}
GeometryReader { geo in
HStack(spacing: 2) {
ForEach(domains) { d in
Rectangle().fill(Sage.domainColor(d.name))
.frame(width: max(2, geo.size.width * CGFloat(d.count) / CGFloat(sum)))
}
}
}
.frame(height: 8)
.clipShape(RoundedRectangle(cornerRadius: 4))
VStack(spacing: 7) {
ForEach(domains) { d in
Button {
model.section = .documents
Task { await model.loadDocuments(domain: d.path) }
} label: {
HStack(spacing: 8) {
RoundedRectangle(cornerRadius: 2).fill(Sage.domainColor(d.name)).frame(width: 10, height: 10)
Text(localizedDomain(d.name)).font(.caption).foregroundStyle(Sage.ink)
.lineLimit(1).frame(maxWidth: .infinity, alignment: .leading)
Text("\(d.count)").font(.caption.monospacedDigit()).foregroundStyle(Sage.muted)
}
}
.buttonStyle(.plain)
}
}
}
.frame(maxWidth: .infinity, alignment: .leading)
.dashCard()
}
}
private struct PinnedItems: View {
@Environment(AppModel.self) private var model
private var docs: [DocumentResponse] { model.documentList.filter { $0.pinned == true } }
private var memos: [MemoResponse] { model.memoList.filter { $0.isPinned } }
var body: some View {
VStack(alignment: .leading, spacing: 12) {
HStack {
SectionLabel("고정 항목")
Spacer()
Button { model.section = .documents } label: {
Text("관리 →").font(.caption.weight(.semibold)).foregroundStyle(Sage.brand)
}
.buttonStyle(.plain)
}
if docs.isEmpty && memos.isEmpty {
Text("고정된 항목이 없습니다").font(.caption).foregroundStyle(Sage.muted)
} else {
VStack(spacing: 8) {
ForEach(docs) { d in
PinRow(kind: "문서", kindColor: Sage.domainColor("Engineering"),
title: d.title ?? d.downloadLabel, date: d.updatedAtRaw) {
model.section = .documents; Task { await model.openDocument(d.id) }
}
}
ForEach(memos) { m in
PinRow(kind: "메모", kindColor: Sage.brand,
title: m.title ?? (m.content ?? "메모"), date: m.updatedAtRaw ?? "") {
model.section = .memos; Task { await model.openMemo(m.id) }
}
}
}
}
}
.frame(maxWidth: .infinity, alignment: .leading)
.dashCard()
}
}
private struct PinRow: View {
let kind: String
let kindColor: Color
let title: String
let date: String
let action: () -> Void
var body: some View {
Button(action: action) {
HStack(alignment: .top, spacing: 10) {
Chip(kind, kindColor)
Text(title).font(.caption).foregroundStyle(Sage.ink).lineLimit(2)
.frame(maxWidth: .infinity, alignment: .leading)
Text(date.prefix(10)).font(.caption2.monospacedDigit()).foregroundStyle(Sage.muted)
}
.padding(10)
.background(Sage.surface, in: RoundedRectangle(cornerRadius: 8))
}
.buttonStyle(.plain)
}
}
#if DEBUG
#Preview("Dashboard") {
@Previewable @State var model = AppModel.preview
DashboardView()
.environment(model)
.frame(width: 1100, height: 760)
.task { await model.bootstrap() }
}
#endif
@@ -1,91 +1,367 @@
import SwiftUI
import DSKit
struct DocumentListView: View {
/// = DEVONthink . () , detail
/// HSplitView 3-pane = | MD | ().
/// model.loadDocuments(domain:) .
struct DocumentsBrowser: View {
@Environment(AppModel.self) private var model
@State private var showInspector = true
@State private var sortOrder = [KeyPathComparator(\DocumentResponse.sortUpdated, order: .reverse)]
var body: some View {
HSplitView {
DocumentListTable(sortOrder: $sortOrder)
.frame(minWidth: 300, idealWidth: 360, maxWidth: 460)
DocumentReader(showInspector: $showInspector)
.frame(minWidth: 420, maxWidth: .infinity)
if showInspector, let d = model.documentDetail {
DocumentInspector(detail: d)
.frame(minWidth: 280, idealWidth: 320, maxWidth: 360)
}
}
.task { await model.ensureDocumentsLoaded() } // load-all
}
}
// MARK: - Column list (sortable Table)
private extension DocumentResponse {
var sortTitle: String { title ?? downloadLabel }
var sortFormat: String { (originalFormat ?? fileFormat ?? "").lowercased() }
var sortUpdated: String { updatedAtRaw }
/// "PDFMD" / "MD" .
var formatBadge: String {
if let orig = originalFormat, orig.lowercased() != (fileFormat ?? "").lowercased() {
return "\(orig.uppercased())→MD"
}
return displayFormat.uppercased()
}
}
struct DocumentListTable: View {
@Environment(AppModel.self) private var model
@Binding var sortOrder: [KeyPathComparator<DocumentResponse>]
private var documents: [DocumentResponse] { model.documentList.sorted(using: sortOrder) }
var body: some View {
let selection = Binding<Int?>(
get: { model.selectedDocumentID },
set: { if let id = $0 { Task { await model.openDocument(id) } } }
)
List(model.documentList, selection: selection) { doc in
DocumentRow(doc: doc)
}
.listStyle(.inset)
.background(Sage.surface)
}
}
struct DocumentRow: View {
let doc: DocumentResponse
var body: some View {
VStack(alignment: .leading, spacing: 4) {
HStack(spacing: 6) {
Chip(doc.displayFormat.uppercased(), Sage.formatColor(doc.displayFormat))
Text(doc.title ?? doc.downloadLabel)
.font(.callout.weight(.medium)).foregroundStyle(Sage.ink).lineLimit(1)
Spacer()
if doc.pinned == true { Text("고정").font(.caption2).foregroundStyle(Sage.amber) }
}
HStack(spacing: 6) {
if let d = doc.aiDomain { Chip(d, Sage.domainColor(d)) }
if let r = doc.reviewStatus {
Text(r).font(.caption2).foregroundStyle(Sage.reviewStatusColor(r))
Group {
if model.documentList.isEmpty {
EmptyState(text: "문서가 없습니다")
} else {
Table(documents, selection: selection, sortOrder: $sortOrder) {
TableColumn("제목", value: \.sortTitle) { doc in
VStack(alignment: .leading, spacing: 2) {
Text(doc.title ?? doc.downloadLabel)
.font(.system(size: 12.5, weight: .semibold)).foregroundStyle(Sage.ink).lineLimit(1)
Text(localizedDomain(doc.aiDomain))
.font(.system(size: 11)).foregroundStyle(Sage.muted).lineLimit(1)
}
.padding(.vertical, 2)
}
TableColumn("종류", value: \.sortFormat) { doc in
Chip(doc.formatBadge, Sage.formatColor(doc.originalFormat ?? doc.displayFormat))
}
.width(min: 66, ideal: 74, max: 96)
TableColumn("수정", value: \.sortUpdated) { doc in
Text(doc.updatedAtRaw.prefix(10))
.font(.caption2.monospacedDigit()).foregroundStyle(Sage.muted)
}
.width(min: 78, ideal: 86, max: 110)
}
Spacer()
Text(doc.updatedAtRaw.prefix(10)).font(.caption2.monospacedDigit()).foregroundStyle(Sage.muted)
.tint(Sage.brand)
}
}
.padding(.vertical, 4)
.background(Sage.card)
}
}
/// MD-first detail: render md_content when renderable, else extracted_text fallback + 'MD '
/// badge + emphasized original-download button. (Download builds a real-shaped ?token= URL.)
struct DocumentDetailView: View {
// MARK: - Reader
struct DocumentReader: View {
@Environment(AppModel.self) private var model
@Binding var showInspector: Bool
var body: some View {
Group {
if let detail = model.documentDetail {
VStack(spacing: 0) {
ReaderHeader(detail: detail, showInspector: $showInspector)
ReaderBody(detail: detail)
}
} else {
EmptyState(text: "문서를 선택하세요")
}
}
.background(Sage.card)
}
}
private struct ReaderHeader: View {
let detail: DocumentDetailResponse
@Binding var showInspector: Bool
var body: some View {
VStack(alignment: .leading, spacing: 6) {
Text(crumb).font(.system(size: 11)).foregroundStyle(Sage.muted).lineLimit(1)
HStack(alignment: .firstTextBaseline, spacing: 10) {
Text(detail.base.title ?? detail.base.downloadLabel)
.font(.system(size: 18, weight: .heavy)).foregroundStyle(Sage.ink).lineLimit(2)
Spacer()
DownloadButton(doc: detail.base, compact: true)
inspectorToggle
}
metaBadges
tagRow
}
.padding(.horizontal, 26).padding(.vertical, 14)
.frame(maxWidth: .infinity, alignment: .leading)
.background(Sage.card)
.overlay(alignment: .bottom) { Rectangle().fill(Sage.line).frame(height: 1) }
}
private var crumb: String {
let dom = localizedDomain(detail.base.aiDomain)
if let sub = detail.base.aiSubGroup, !sub.isEmpty { return "\(dom) \(sub)" }
return dom
}
/// : · · tier DEEP · · PDFMD success.
@ViewBuilder private var metaBadges: some View {
let b = detail.base
ScrollView(.horizontal, showsIndicators: false) {
HStack(spacing: 6) {
if let d = b.aiDomain { Chip(localizedDomain(d), Sage.domainColor(d)) }
if let t = b.documentType, !t.isEmpty { Chip(t, Sage.muted) }
if b.aiAnalysisTier == "deep" { Chip("tier DEEP", Sage.brand) }
if let c = b.aiConfidence { Chip("신뢰도 \(String(format: "%.2f", c))", Sage.brandDark) }
if detail.mdIsRenderable { Chip("PDF→MD success", Sage.mdStatusColor("completed")) }
}
}
}
private var inspectorToggle: some View {
Button { withAnimation(.easeInOut(duration: 0.2)) { showInspector.toggle() } } label: {
Image(systemName: "info.circle").font(.system(size: 15))
.foregroundStyle(showInspector ? Sage.brandDark : Sage.muted)
.frame(width: 30, height: 30)
.background(showInspector ? Sage.brand.opacity(0.14) : Sage.card, in: RoundedRectangle(cornerRadius: 8))
.overlay(RoundedRectangle(cornerRadius: 8).stroke(showInspector ? Sage.brand : Sage.line))
}
.buttonStyle(.plain)
.help("인스펙터")
}
@ViewBuilder private var tagRow: some View {
let tags = detail.base.aiTags ?? []
if detail.mdStatus != nil || !tags.isEmpty {
ScrollView(.horizontal, showsIndicators: false) {
HStack(spacing: 6) {
if let st = detail.mdStatus { Chip("MD \(st)", Sage.mdStatusColor(st)) }
ForEach(tags, id: \.self) { Chip($0, Sage.brand) }
}
}
}
}
}
private struct ReaderBody: View {
let detail: DocumentDetailResponse
var body: some View {
ScrollView {
VStack(alignment: .leading, spacing: 14) {
Text(detail.base.title ?? detail.base.downloadLabel)
.font(.title2.weight(.bold)).foregroundStyle(Sage.ink)
HStack(spacing: 8) {
if let d = detail.base.aiDomain { Chip(d, Sage.domainColor(d)) }
Chip(detail.base.displayFormat.uppercased(), Sage.formatColor(detail.base.displayFormat))
if let conf = detail.base.aiConfidence {
Chip("AI \(String(format: "%.0f%%", conf * 100))", Sage.muted)
}
Spacer()
if let url = model.downloadURL(for: detail.base) {
Link(detail.base.downloadLabel, destination: url).font(.callout.weight(.semibold))
}
}
if let tags = detail.base.aiTags, !tags.isEmpty {
HStack(spacing: 6) { ForEach(tags, id: \.self) { Chip($0, Sage.brand) } }
}
Divider()
if detail.mdIsRenderable, let md = detail.mdContent {
MarkdownView(md)
} else {
HStack { Chip("MD 변환 대기", Sage.amber); Spacer() }
Text(detail.extractedText ?? "본문 없음")
.font(.body).foregroundStyle(Sage.muted)
.frame(maxWidth: .infinity, alignment: .leading)
if let url = model.downloadURL(for: detail.base) {
Link("원본 다운로드 — \(detail.base.downloadLabel)", destination: url)
.font(.callout.weight(.semibold))
HStack(spacing: 0) {
Spacer(minLength: 0)
VStack(alignment: .leading, spacing: 14) {
if detail.mdIsRenderable, let md = detail.mdContent {
MarkdownView(md)
} else {
HStack { Chip("MD 변환 대기", Sage.amber); Spacer() }
Text(detail.extractedText ?? "본문 없음")
.font(.body).foregroundStyle(Sage.muted)
.frame(maxWidth: .infinity, alignment: .leading)
DownloadButton(doc: detail.base, compact: false)
}
}
.frame(maxWidth: 700, alignment: .leading)
Spacer(minLength: 0)
}
.padding(.horizontal, 28).padding(.top, 22).padding(.bottom, 44)
}
.background(Sage.card)
}
}
// MARK: - Inspector
struct DocumentInspector: View {
let detail: DocumentDetailResponse
private var base: DocumentResponse { detail.base }
var body: some View {
ScrollView {
VStack(alignment: .leading, spacing: 18) {
// ( : TL;DR · · · )
if let tldr = (base.aiTldr ?? base.aiSummary), !tldr.isEmpty {
InspectorSection("TL;DR") {
Text(tldr).font(.system(size: 12)).foregroundStyle(Sage.ink).lineSpacing(2)
.frame(maxWidth: .infinity, alignment: .leading)
}
}
if let bullets = base.aiBullets, !bullets.isEmpty {
InspectorSection("핵심점") {
VStack(alignment: .leading, spacing: 6) {
ForEach(bullets, id: \.self) { b in
HStack(alignment: .top, spacing: 6) {
Text("·").font(.system(size: 12, weight: .bold)).foregroundStyle(Sage.amber)
Text(b).font(.system(size: 12)).foregroundStyle(Sage.ink)
.frame(maxWidth: .infinity, alignment: .leading)
}
}
}
}
}
if let deep = base.aiDetailSummary, !deep.isEmpty {
InspectorSection("심층") {
VStack(alignment: .leading, spacing: 6) {
if base.aiAnalysisTier == "deep" { Chip("DEEP", Sage.brand) }
Text(deep).font(.system(size: 11.5)).foregroundStyle(Sage.ink).lineSpacing(2)
.frame(maxWidth: .infinity, alignment: .leading)
}
}
}
if let inc = base.aiInconsistencies, !inc.isEmpty {
InspectorSection("불일치 \(inc.count)") {
VStack(alignment: .leading, spacing: 5) {
ForEach(inc, id: \.self) { x in
Text("· \(x)").font(.system(size: 11.5)).foregroundStyle(Sage.ink)
.frame(maxWidth: .infinity, alignment: .leading)
}
}
}
}
//
InspectorSection("정보") {
VStack(spacing: 0) {
KV("종류", base.formatBadge)
KV("도메인", localizedDomain(base.aiDomain))
KV("하위", base.aiSubGroup ?? "")
KV("수정", String(base.updatedAtRaw.prefix(10)))
if let size = base.fileSize {
KV("원본", ByteCountFormatter.string(fromByteCount: Int64(size), countStyle: .file))
}
if let st = detail.mdStatus { KV("md 상태", st, color: Sage.mdStatusColor(st)) }
if let tier = base.aiAnalysisTier { KV("tier", tier, color: Sage.brandDark) }
if let c = base.aiConfidence { KV("신뢰도", String(format: "%.2f", c), color: Sage.brand) }
KV("읽음", "\(base.reads)")
}
}
if let tags = base.aiTags, !tags.isEmpty {
InspectorSection("태그") { TagWrap(tags: tags) }
}
}
.padding(.horizontal, 16).padding(.vertical, 18)
}
.frame(maxWidth: .infinity, alignment: .leading)
.background(Sage.sidebar)
.overlay(alignment: .leading) { Rectangle().fill(Sage.line).frame(width: 1) }
}
}
private struct InspectorSection<Content: View>: View {
let title: String
@ViewBuilder let content: Content
init(_ title: String, @ViewBuilder content: () -> Content) { self.title = title; self.content = content() }
var body: some View {
VStack(alignment: .leading, spacing: 8) {
Text(title).font(.system(size: 10, weight: .heavy)).tracking(0.8)
.textCase(.uppercase).foregroundStyle(Sage.muted.opacity(0.8))
content
}
.frame(maxWidth: .infinity, alignment: .leading)
}
}
private struct KV: View {
let k: String
let v: String
var color: Color = Sage.ink
init(_ k: String, _ v: String, color: Color = Sage.ink) { self.k = k; self.v = v; self.color = color }
var body: some View {
HStack {
Text(k).font(.system(size: 12)).foregroundStyle(Sage.muted)
Spacer()
Text(v).font(.system(size: 12, weight: .semibold)).foregroundStyle(color)
.multilineTextAlignment(.trailing)
}
.padding(.vertical, 3)
}
}
/// (2 Layout ).
private struct TagWrap: View {
let tags: [String]
var body: some View {
VStack(alignment: .leading, spacing: 6) {
ForEach(Array(stride(from: 0, to: tags.count, by: 2)), id: \.self) { i in
HStack(spacing: 6) {
Chip(tags[i], Sage.brand)
if i + 1 < tags.count { Chip(tags[i + 1], Sage.brand) }
Spacer(minLength: 0)
}
}
}
}
}
// MARK: - Native download button (preserved)
/// . ?token= URL NSSavePanel (
/// ). + / . note .
struct DownloadButton: View {
@Environment(AppModel.self) private var model
let doc: DocumentResponse
/// compact = () / false = .
var compact: Bool
@State private var busy = false
@State private var status: String?
@State private var isError = false
var body: some View {
if let url = model.downloadURL(for: doc) {
HStack(spacing: 8) {
Button {
Task {
busy = true; status = nil; isError = false
let outcome = await FileDownloader.download(from: url, suggestedName: doc.downloadLabel)
busy = false
switch outcome {
case .saved(let dest): status = "저장됨: \(dest.lastPathComponent)"; isError = false
case .cancelled: status = nil
case .failed(let msg): status = msg; isError = true
}
}
} label: {
Label(compact ? doc.downloadLabel : "원본 다운로드 — \(doc.downloadLabel)",
systemImage: "arrow.down.circle")
.font(.callout.weight(.semibold))
}
.buttonStyle(.borderless)
.disabled(busy)
if busy { ProgressView().controlSize(.small) }
if let s = status {
Text(s).font(.caption)
.foregroundStyle(isError ? Sage.danger : Sage.muted)
.lineLimit(1)
}
}
.padding(20)
}
.background(Sage.surface)
}
}
@@ -13,11 +13,10 @@ struct MemoListView: View {
.textFieldStyle(.roundedBorder)
Button("저장") {
let content = draft
draft = ""
Task { _ = try? await model.client.createMemo(MemoCreate(content: content)) }
Task { if await model.saveMemo(content) { draft = "" } }
}
.buttonStyle(.bordered)
.disabled(draft.isEmpty)
.disabled(draft.trimmingCharacters(in: .whitespacesAndNewlines).isEmpty)
}
.padding(12)
@@ -1,50 +0,0 @@
import SwiftUI
import DSKit
/// Distinct from the Documents table: relevance-forward result cards (score bar + match_reason).
struct SearchView: View {
@Environment(AppModel.self) private var model
var body: some View {
@Bindable var model = model
VStack(alignment: .leading, spacing: 0) {
HStack(spacing: 8) {
TextField("검색어를 입력하세요", text: $model.searchQuery)
.textFieldStyle(.roundedBorder)
.onSubmit { Task { await model.runSearch() } }
Button("검색") { Task { await model.runSearch() } }
.buttonStyle(.borderedProminent)
}
.padding(12)
if let response = model.searchResponse {
List(response.results) { result in
VStack(alignment: .leading, spacing: 5) {
HStack(spacing: 6) {
if let d = result.aiDomain { Chip(d, Sage.domainColor(d)) }
Text(result.title ?? "문서 \(result.id)")
.font(.callout.weight(.medium)).foregroundStyle(Sage.ink).lineLimit(1)
Spacer()
if let m = result.matchReason {
Text(m).font(.caption2).foregroundStyle(Sage.muted)
}
}
Text(result.snippet ?? result.aiSummary ?? "")
.font(.caption).foregroundStyle(Sage.muted).lineLimit(2)
if let score = result.score { ScoreBar(score: score) }
}
.padding(.vertical, 4)
.contentShape(Rectangle())
.onTapGesture {
model.section = .documents
Task { await model.openDocument(result.id) }
}
}
.listStyle(.inset)
} else {
EmptyState(text: "검색어를 입력하세요")
}
}
.background(Sage.surface)
}
}
@@ -1,5 +1,58 @@
import SwiftUI
/// raw (/ enum ) . Sage.domainColor(raw) raw
/// raw, . .
func localizedDomain(_ raw: String?) -> String {
guard let raw, !raw.isEmpty else { return "미분류" }
// (Philosophy/Aesthetics) leaf , leaf
let leaf = raw.split(separator: "/").last.map(String.init) ?? raw
let map: [String: String] = [
"Engineering": "엔지니어링", "Industrial_Safety": "산업안전", "General": "자료실",
"Programming": "프로그래밍", "법령": "법령", "Philosophy": "철학",
]
return map[raw] ?? map[leaf] ?? leaf
}
/// / (·heavy·muted) / .
struct SectionLabel: View {
let text: String
init(_ text: String) { self.text = text }
var body: some View {
Text(text)
.font(.caption.weight(.heavy))
.textCase(.uppercase)
.kerning(0.7)
.foregroundStyle(Sage.muted)
}
}
/// (Sage.card + corner 12 + Sage.line stroke + ).
struct DashCard: ViewModifier {
var padding: CGFloat = 18
func body(content: Content) -> some View {
content
.padding(padding)
.background(Sage.card, in: RoundedRectangle(cornerRadius: 12))
.overlay(RoundedRectangle(cornerRadius: 12).stroke(Sage.line))
}
}
extension View { func dashCard(padding: CGFloat = 18) -> some View { modifier(DashCard(padding: padding)) } }
/// ( ). StatCard .
struct StatCell: View {
let value: Int
let label: String
var color: Color = Sage.ink
var body: some View {
VStack(alignment: .leading, spacing: 3) {
Text("\(value)").font(.system(size: 20, weight: .semibold)).kerning(-0.6)
.monospacedDigit().foregroundStyle(color)
Text(label).font(.caption2).foregroundStyle(Sage.muted)
}
.frame(maxWidth: .infinity, alignment: .leading)
}
}
struct StatCard: View {
let title: String
let value: Int
@@ -1,9 +1,10 @@
import SwiftUI
import DSKit
/// DEVONthink-style 3-column shell. RootView only ROUTES; each page owns its own interior treatment
/// (no shell-level auto-inherit). macOS-only target.
/// : checking( refresh ) loggedOut(LoginView) ready(3-pane ).
/// 2-column ( + detail). detail
/// (= / = HSplitView 3-pane / =+). 3-column
/// . macOS-only.
/// : checking(refresh ) loggedOut(LoginView) ready().
public struct RootView: View {
@Environment(AppModel.self) private var model
@State private var columnVisibility: NavigationSplitViewVisibility = .all
@@ -29,38 +30,45 @@ public struct RootView: View {
private var shell: some View {
NavigationSplitView(columnVisibility: $columnVisibility) {
Sidebar()
.navigationSplitViewColumnWidth(min: 220, ideal: 250)
} content: {
ContentColumn()
.navigationSplitViewColumnWidth(min: 300, ideal: 380)
.navigationSplitViewColumnWidth(min: 200, ideal: 215, max: 270)
} detail: {
DetailColumn()
SectionDetail()
}
.navigationSplitViewStyle(.balanced)
.tint(Sage.brand)
.toolbar {
ToolbarItem(placement: .primaryAction) { UploadToolbarButton() }
ToolbarItem(placement: .primaryAction) { AccountMenu() }
}
.safeAreaInset(edge: .bottom) {
// (no-silent-fallback) .
if let err = model.errorText {
HStack(spacing: 10) {
Text(err)
.font(.callout)
.foregroundStyle(.white)
.lineLimit(2)
Spacer()
Button("닫기") { model.errorText = nil }
.buttonStyle(.plain)
.foregroundStyle(.white.opacity(0.85))
VStack(spacing: 0) {
UploadStatusBar()
// (no-silent-fallback) .
if let err = model.errorText {
HStack(spacing: 10) {
Text(err)
.font(.callout)
.foregroundStyle(.white)
.lineLimit(2)
Spacer()
Button("닫기") { model.errorText = nil }
.buttonStyle(.plain)
.foregroundStyle(.white.opacity(0.85))
}
.padding(.horizontal, 14)
.padding(.vertical, 8)
.background(Sage.danger)
}
.padding(.horizontal, 14)
.padding(.vertical, 8)
.background(Sage.danger)
}
}
}
}
// MARK: - Sidebar
struct Sidebar: View {
@Environment(AppModel.self) private var model
private let navSections: [AppModel.Section] = [.dashboard, .documents, .digest, .memos]
var body: some View {
let selection = Binding<AppModel.Section?>(
@@ -68,73 +76,132 @@ struct Sidebar: View {
set: { if let v = $0 { model.section = v } }
)
List(selection: selection) {
BrandRow().selectionDisabled()
Section {
ForEach(AppModel.Section.allCases) { s in
Text(s.title).tag(s)
ForEach(navSections) { s in
Label(s.title, systemImage: Self.icon(s)).tag(s)
}
}
if model.section == .documents, !model.tree.isEmpty {
Section("도메인") {
ForEach(model.tree) { node in
DomainRow(node: node)
}
}
// ( 4- ).
if model.section == .documents {
DocumentsSourceSidebar()
}
}
.listStyle(.sidebar)
.background(Sage.sidebar)
}
}
struct DomainRow: View {
@Environment(AppModel.self) private var model
let node: DomainTreeNode
var body: some View {
HStack(spacing: 8) {
Circle().fill(Sage.domainColor(node.name)).frame(width: 8, height: 8)
Text(node.name).font(.callout).foregroundStyle(Sage.ink)
Spacer()
Text("\(node.count)").font(.caption).foregroundStyle(Sage.muted)
static func icon(_ s: AppModel.Section) -> String {
switch s {
case .dashboard: return "house"
case .documents: return "folder"
case .digest: return "newspaper"
case .memos: return "note.text"
}
.contentShape(Rectangle())
.onTapGesture { model.section = .documents }
}
}
struct ContentColumn: View {
struct BrandRow: View {
var body: some View {
HStack(spacing: 8) {
RoundedRectangle(cornerRadius: 7).fill(Sage.brand).frame(width: 26, height: 26)
.overlay(Text("DS").font(.system(size: 10, weight: .heavy)).foregroundStyle(.white))
Text("Document Server").font(.system(size: 13.5, weight: .heavy)).foregroundStyle(Sage.ink)
}
.padding(.vertical, 4)
}
}
/// : ( = ) + /( placeholder).
struct DocumentsSourceSidebar: View {
@Environment(AppModel.self) private var model
var body: some View {
Section("분류") {
SourceRow(label: "전체 문서", color: nil, count: model.stats?.total,
selected: model.documentDomainFilter == nil) {
Task { await model.loadDocuments(domain: nil) }
}
ForEach(model.tree) { node in
SourceRow(label: localizedDomain(node.name), color: Sage.domainColor(node.name),
count: node.count, selected: model.documentDomainFilter == node.path) {
Task { await model.loadDocuments(domain: node.path) }
}
}
}
// IA ( ).
Section("스마트 그룹") {
ForEach(["최근 7일", "검토 대기", "법령 알림"], id: \.self) { t in
Text(t).font(.callout).foregroundStyle(Sage.muted).opacity(0.5)
}
}
Section("태그") {
ForEach(["압력용기", "ASME", "받은편지함"], id: \.self) { t in
Text("#\(t)").font(.callout).foregroundStyle(Sage.muted).opacity(0.5)
}
}
}
}
/// (). brand-soft List ( ).
struct SourceRow: View {
let label: String
let color: Color?
let count: Int?
let selected: Bool
let action: () -> Void
var body: some View {
HStack(spacing: 8) {
if let color { RoundedRectangle(cornerRadius: 3).fill(color).frame(width: 8, height: 8) }
Text(label).font(.callout)
.foregroundStyle(selected ? Sage.brandDark : Sage.ink)
.fontWeight(selected ? .bold : .regular)
.lineLimit(1)
Spacer()
if let count { Text("\(count)").font(.caption.monospacedDigit()).foregroundStyle(Sage.muted) }
}
.padding(.vertical, 2)
.contentShape(Rectangle())
.onTapGesture(perform: action)
.listRowBackground(selected ? Sage.brand.opacity(0.14) : Color.clear)
}
}
// MARK: - Section router
/// detail . inspector/list .
struct SectionDetail: View {
@Environment(AppModel.self) private var model
var body: some View {
Group {
switch model.section {
case .dashboard: DashboardView()
case .documents: DocumentListView()
case .search: SearchView()
case .ask: AskView()
case .memos: MemoListView()
case .digest: DigestView()
case .dashboard: DashboardView() //
case .documents: DocumentsBrowser() // HSplitView 3-pane
case .digest: DigestView() // ( )
case .memos: MemosBoard() // + ( )
}
}
.frame(maxWidth: .infinity, maxHeight: .infinity)
.background(Sage.surface)
.navigationTitle(model.section.title)
}
}
struct DetailColumn: View {
/// v1 + split ( ).
struct MemosBoard: View {
@Environment(AppModel.self) private var model
var body: some View {
Group {
switch model.section {
case .documents:
if let d = model.documentDetail { DocumentDetailView(detail: d) }
else { EmptyState(text: "문서를 선택하세요") }
case .memos:
HSplitView {
MemoListView()
.frame(minWidth: 300, idealWidth: 360, maxWidth: 460)
Group {
if let m = model.memoDetail { MemoDetailView(memo: m) }
else { EmptyState(text: "메모를 선택하세요") }
default:
EmptyState(text: model.section.title)
}
.frame(minWidth: 360, maxWidth: .infinity)
}
}
}
@@ -149,11 +216,96 @@ struct EmptyState: View {
}
}
// MARK: - Toolbar items
/// NSOpenPanel . .
struct UploadToolbarButton: View {
@Environment(AppModel.self) private var model
var body: some View {
Button {
guard let fileURL = FilePanels.pickFileToUpload() else { return }
Task { await model.uploadPicked(fileURL) }
} label: {
Label("업로드", systemImage: "square.and.arrow.up")
}
.help("문서 업로드")
.disabled(isUploading)
}
private var isUploading: Bool {
if case .uploading = model.uploadState { return true }
return false
}
}
/// + ( ).
struct AccountMenu: View {
@Environment(AppModel.self) private var model
@State private var confirmLogout = false
var body: some View {
Menu {
Button("로그아웃", role: .destructive) { confirmLogout = true }
} label: {
Label(model.currentUser?.username ?? "계정", systemImage: "person.crop.circle")
}
.help("계정")
.confirmationDialog("로그아웃하시겠습니까?", isPresented: $confirmLogout, titleVisibility: .visible) {
Button("로그아웃", role: .destructive) { Task { await model.logout() } }
Button("취소", role: .cancel) {}
}
}
}
/// / . uploading=( ) / done=( )+ / failed=+.
struct UploadStatusBar: View {
@Environment(AppModel.self) private var model
var body: some View {
switch model.uploadState {
case .idle:
EmptyView()
case .uploading(let name):
row(bg: Sage.brand) {
ProgressView().controlSize(.small).tint(.white)
Text("업로드 중 — \(name)").font(.callout).foregroundStyle(.white).lineLimit(1)
Spacer()
}
case .done(let title):
row(bg: Sage.brand) {
Text("업로드 완료 — \(title) (처리 대기 중)").font(.callout).foregroundStyle(.white).lineLimit(1)
Spacer()
closeButton
}
case .failed(let msg):
row(bg: Sage.danger) {
Text("업로드 실패 — \(msg)").font(.callout).foregroundStyle(.white).lineLimit(2)
Spacer()
closeButton
}
}
}
private var closeButton: some View {
Button("닫기") { model.dismissUploadStatus() }
.buttonStyle(.plain)
.foregroundStyle(.white.opacity(0.85))
}
private func row<Content: View>(bg: Color, @ViewBuilder _ content: () -> Content) -> some View {
HStack(spacing: 10) { content() }
.padding(.horizontal, 14)
.padding(.vertical, 8)
.background(bg)
}
}
#if DEBUG
#Preview("DS App — full shell") {
@Previewable @State var model = AppModel.preview
RootView()
.environment(model)
.frame(minWidth: 1000, minHeight: 660)
.frame(minWidth: 1100, minHeight: 700)
}
#endif
@@ -2,23 +2,24 @@ import SwiftUI
import Observation
import DSKit
import AIFabric
import UniformTypeIdentifiers
/// The single app-state store driving the 3-pane shell. @MainActor @Observable: mutations are
/// main-isolated; the DSClient returns Sendable models; AIService is an actor.
@MainActor
@Observable
public final class AppModel {
/// = ···. (ask)·(AI chat) v1 macOS (2026-06-15)
/// AIFabric(S2) iPhone/Watch , UI .
public enum Section: String, CaseIterable, Identifiable, Hashable {
case dashboard, documents, search, ask, memos, digest
case dashboard, documents, digest, memos
public var id: String { rawValue }
public var title: String {
switch self {
case .dashboard: return "대시보드"
case .dashboard: return ""
case .documents: return "문서"
case .search: return "검색"
case .ask: return "질문"
case .memos: return "메모"
case .digest: return "뉴스"
case .memos: return "메모"
}
}
}
@@ -27,19 +28,33 @@ public final class AppModel {
/// (ready). Fixture refresh fixture ready.
public enum AuthPhase: Equatable { case checking, loggedOut, ready }
/// / + . done/failed .
public enum UploadState: Equatable, Sendable {
case idle
case uploading(name: String)
case done(title: String)
case failed(String)
}
public var section: Section = .dashboard
public var selectedDocumentID: Int?
public var selectedMemoID: Int?
public var tree: [DomainTreeNode] = []
public var stats: CategoryCounts?
/// ( ). loadInitial count . nil=.
public var reviewPendingCount: Int?
/// ( ). loadInitial me() .
public var currentUser: UserResponse?
public private(set) var uploadState: UploadState = .idle
/// (CaptureCard , saveMemo ).
public var captureText: String = ""
public var documentList: [DocumentResponse] = []
public var documentDetail: DocumentDetailResponse?
public var searchQuery: String = ""
public var searchResponse: SearchResponse?
public var askQuery: String = ""
public var askResult: AIResult?
public var askMeta: DSKit.AskResponse? // qualified: AIFabric also defines an AskResponse
/// ( path, nil = ).
public var documentDomainFilter: String?
/// ( load-all ). .
public private(set) var documentsFullyLoaded = false
public var memoList: [MemoResponse] = []
public var memoDetail: MemoResponse?
public var digest: DigestResponse?
@@ -129,11 +144,16 @@ public final class AppModel {
}
public func loadInitial() async {
await guarded { self.currentUser = try await self.client.me() }
await guarded { self.tree = try await self.client.documentTree() }
await guarded { self.stats = try await self.client.categoryCounts() }
await guarded { self.documentList = try await self.client.documents(DocumentListQuery()).items }
await guarded { self.memoList = try await self.client.memos(MemoListQuery()).items }
await guarded { self.digest = try await self.client.digest(date: nil, country: nil) }
await guarded {
var q = DocumentListQuery(); q.reviewStatus = "pending"; q.pageSize = 1
self.reviewPendingCount = try await self.client.documents(q).total
}
}
public func openDocument(_ id: Int) async {
@@ -141,15 +161,60 @@ public final class AppModel {
await guarded { self.documentDetail = try await self.client.document(id: id) }
}
public func runSearch() async {
guard !searchQuery.isEmpty else { return }
await guarded { self.searchResponse = try await self.client.search(q: self.searchQuery, mode: .hybrid, page: 1, debug: false) }
/// ( ). load-all.
public func ensureDocumentsLoaded() async {
if !documentsFullyLoaded { await loadDocuments(domain: documentDomainFilter) }
}
public func runAsk(backend: AIProviderID?) async {
guard !askQuery.isEmpty else { return }
askResult = await ai.corpusAsk(question: askQuery, explicit: backend)
await guarded { self.askMeta = try await self.client.ask(q: self.askQuery, limit: nil, backend: nil, debug: false) }
/// **** load-all ( page_size 100
/// 1582 ). append .
/// / 3-pane .
public func loadDocuments(domain: String?) async {
documentDomainFilter = domain
documentsFullyLoaded = false
documentList = []
let pageSize = 100
var page = 1
do {
while page <= 80 { // ~8000
var q = DocumentListQuery(); q.domain = domain; q.page = page; q.pageSize = pageSize
let resp = try await client.documents(q)
documentList.append(contentsOf: resp.items)
if resp.items.count < pageSize || documentList.count >= resp.total { break }
page += 1
}
documentsFullyLoaded = true
} catch let e as DSError where e.isAuthExpired {
authPhase = .loggedOut
loginError = "세션이 만료되었습니다. 다시 로그인하세요."
} catch {
errorText = (error as? LocalizedError)?.errorDescription ?? "\(error)"
}
await syncAccessToken()
if let sel = selectedDocumentID, !documentList.contains(where: { $0.id == sel }) {
selectedDocumentID = nil
documentDetail = nil
}
}
/// . true. / (false).
/// guarded errorText ( ).
@discardableResult
public func saveMemo(_ text: String) async -> Bool {
let t = text.trimmingCharacters(in: .whitespacesAndNewlines)
guard !t.isEmpty else { return false }
var ok = false
await guarded {
let memo = try await self.client.createMemo(MemoCreate(content: t))
self.memoList.insert(memo, at: 0)
ok = true
}
return ok
}
/// captureText , .
public func saveMemo() async {
if await saveMemo(captureText) { captureText = "" }
}
public func openMemo(_ id: Int) async {
@@ -162,6 +227,67 @@ public final class AppModel {
return DSDownload.fileURL(base: base, documentID: doc.id, accessToken: accessToken)
}
/// : / (best-effort) loggedOut.
/// stale . .
public func logout() async {
try? await client.logout()
accessToken = ""
currentUser = nil
tree = []
stats = nil
reviewPendingCount = nil
captureText = ""
documentList = []
documentDetail = nil
documentDomainFilter = nil
documentsFullyLoaded = false
memoList = []
memoDetail = nil
digest = nil
selectedDocumentID = nil
selectedMemoID = nil
section = .dashboard // ( LOW: )
errorText = nil
uploadState = .idle
authPhase = .loggedOut
}
/// (NSOpenPanel URL) . IO uploadState .
public func uploadPicked(_ fileURL: URL) async {
let accessed = fileURL.startAccessingSecurityScopedResource()
defer { if accessed { fileURL.stopAccessingSecurityScopedResource() } }
let filename = fileURL.lastPathComponent
let data: Data
do {
data = try Data(contentsOf: fileURL)
} catch {
uploadState = .failed("파일을 읽을 수 없습니다: \((error as NSError).localizedDescription)")
return
}
let mime = UTType(filenameExtension: fileURL.pathExtension)?.preferredMIMEType
await upload(DocumentUpload(filename: filename, data: data, mimeType: mime))
}
/// + . ( = ).
public func upload(_ payload: DocumentUpload) async {
uploadState = .uploading(name: payload.filename)
do {
let doc = try await client.uploadDocument(payload)
uploadState = .done(title: doc.title ?? doc.downloadLabel)
await guarded { self.documentList = try await self.client.documents(DocumentListQuery()).items }
} catch let e as DSError where e.isAuthExpired {
authPhase = .loggedOut
loginError = "세션이 만료되었습니다. 다시 로그인하세요."
uploadState = .failed("세션이 만료되었습니다.")
} catch {
uploadState = .failed((error as? LocalizedError)?.errorDescription ?? "\(error)")
}
await syncAccessToken()
}
/// (done/failed ).
public func dismissUploadStatus() { uploadState = .idle }
private func guarded(_ work: () async throws -> Void) async {
do {
try await work()
@@ -23,6 +23,8 @@ public protocol DSClient: Sendable {
func patchDocument(id: Int, _ update: DocumentUpdate) async throws -> DocumentResponse
func putContent(id: Int, content: String) async throws
func deleteDocument(id: Int) async throws
/// (POST /documents/) Inbox + . 201 DocumentResponse.
func uploadDocument(_ upload: DocumentUpload) async throws -> DocumentResponse
// Search / Ask
func search(q: String, mode: SearchMode?, page: Int?, debug: Bool?) async throws -> SearchResponse
@@ -53,6 +53,9 @@ public struct FixtureDSClient: DSClient {
}
public func putContent(id: Int, content: String) async throws {}
public func deleteDocument(id: Int) async throws {}
public func uploadDocument(_ upload: DocumentUpload) async throws -> DocumentResponse {
try load("document_detail", as: DocumentDetailResponse.self).base
}
// Search / Ask
public func search(q: String, mode: SearchMode?, page: Int?, debug: Bool?) async throws -> SearchResponse {
@@ -64,15 +64,26 @@ public final class LiveDSClient: DSClient, @unchecked Sendable {
}
private func perform(_ endpoint: DSEndpoint) async throws -> Data {
let request = try makeRequest(endpoint, token: await tokens.current())
try await performWithRetry(requiresBearer: endpoint.requiresBearer) { token in
try self.makeRequest(endpoint, token: token)
}
}
/// 401 - refresh + 1 . `build` ( )URLRequest ,
/// 401 . JSON (perform) .
private func performWithRetry(
requiresBearer: Bool,
_ build: (_ token: String?) throws -> URLRequest
) async throws -> Data {
let request = try build(await tokens.current())
let (data, response) = try await dataOrTransport(request)
guard let http = response as? HTTPURLResponse else {
throw DSError.transport(underlying: "no HTTP response")
}
if http.statusCode == 401, endpoint.requiresBearer {
if http.statusCode == 401, requiresBearer {
// Single-flight refresh + one retry.
let newToken = try await tokens.refreshOnce()
let retry = try makeRequest(endpoint, token: newToken)
let retry = try build(newToken)
let (data2, response2) = try await dataOrTransport(retry)
guard let http2 = response2 as? HTTPURLResponse else {
throw DSError.transport(underlying: "no HTTP response")
@@ -122,6 +133,44 @@ public final class LiveDSClient: DSClient, @unchecked Sendable {
public func putContent(id: Int, content: String) async throws { try await sendVoid(.putContent(id, content)) }
public func deleteDocument(id: Int) async throws { try await sendVoid(.deleteDocument(id)) }
public func uploadDocument(_ upload: DocumentUpload) async throws -> DocumentResponse {
let boundary = "DSBoundary-\(UUID().uuidString)"
let body = LiveDSClient.multipartBody(for: upload, boundary: boundary)
// (POST /documents/) base (appendingPathComponent strip).
let raw = base.url.absoluteString + "/documents/"
guard let url = URL(string: raw) else { throw DSError.transport(underlying: "bad URL \(raw)") }
let data = try await performWithRetry(requiresBearer: true) { token in
var request = URLRequest(url: url)
request.httpMethod = "POST"
if let token { request.setValue("Bearer \(token)", forHTTPHeaderField: "Authorization") }
request.setValue("multipart/form-data; boundary=\(boundary)", forHTTPHeaderField: "Content-Type")
request.httpBody = body
return request
}
do { return try decoder.decode(DocumentResponse.self, from: data) }
catch { throw DSError.decoding("documents/ upload: \(error)") }
}
/// multipart/form-data . file + form (doc_purpose/library_path).
/// internal( ) UTF-8 (Starlette ).
static func multipartBody(for upload: DocumentUpload, boundary: String) -> Data {
var body = Data()
func appendField(_ name: String, _ value: String) {
body.append(Data("--\(boundary)\r\n".utf8))
body.append(Data("Content-Disposition: form-data; name=\"\(name)\"\r\n\r\n".utf8))
body.append(Data("\(value)\r\n".utf8))
}
if let p = upload.docPurpose { appendField("doc_purpose", p) }
if let lp = upload.libraryPath { appendField("library_path", lp) }
body.append(Data("--\(boundary)\r\n".utf8))
body.append(Data("Content-Disposition: form-data; name=\"file\"; filename=\"\(upload.filename)\"\r\n".utf8))
body.append(Data("Content-Type: \(upload.mimeType ?? "application/octet-stream")\r\n\r\n".utf8))
body.append(upload.data)
body.append(Data("\r\n".utf8))
body.append(Data("--\(boundary)--\r\n".utf8))
return body
}
public func search(q: String, mode: SearchMode?, page: Int?, debug: Bool?) async throws -> SearchResponse { try await send(.search(q, mode, page, debug), as: SearchResponse.self) }
public func ask(q: String, limit: Int?, backend: String?, debug: Bool?) async throws -> AskResponse { try await send(.ask(q, limit, backend, debug), as: AskResponse.self) }
@@ -24,6 +24,25 @@ public struct MemoListQuery: Sendable {
public init() {}
}
/// (POST /documents/). `file` + form .
/// `data` ( ) .
public struct DocumentUpload: Sendable {
public var filename: String
public var data: Data
public var mimeType: String?
/// "business" | "knowledge" | nil. business @library .
public var docPurpose: String?
public var libraryPath: String?
public init(filename: String, data: Data, mimeType: String? = nil,
docPurpose: String? = nil, libraryPath: String? = nil) {
self.filename = filename
self.data = data
self.mimeType = mimeType
self.docPurpose = docPurpose
self.libraryPath = libraryPath
}
}
public struct DocumentUpdate: Codable, Sendable {
public var title: String?
public var userNote: String?
@@ -0,0 +1,50 @@
import XCTest
@testable import AppFeature
import DSKit
/// + 0 (Fixture).
final class AppModelActionsTests: XCTestCase {
// ready loggedOut + //
@MainActor
func testLogoutResetsStateAndLogsOut() async {
let model = AppModel.preview
await model.bootstrap()
XCTAssertEqual(model.authPhase, .ready)
XCTAssertFalse(model.documentList.isEmpty)
XCTAssertNotNil(model.currentUser, "loadInitial 이 me() 로 사용자 채움")
await model.logout()
XCTAssertEqual(model.authPhase, .loggedOut)
XCTAssertTrue(model.accessToken.isEmpty)
XCTAssertNil(model.currentUser)
XCTAssertTrue(model.documentList.isEmpty)
XCTAssertNil(model.documentDetail)
XCTAssertTrue(model.tree.isEmpty)
XCTAssertEqual(model.uploadState, .idle)
}
// uploadState=.done +
@MainActor
func testUploadSuccessSetsDoneAndReloads() async {
let model = AppModel.preview
await model.bootstrap()
await model.upload(DocumentUpload(filename: "x.pdf", data: Data("x".utf8), mimeType: "application/pdf"))
if case .done = model.uploadState {} else {
XCTFail("기대 .done, 실제 \(model.uploadState)")
}
XCTAssertFalse(model.documentList.isEmpty)
}
// (Equatable )
@MainActor
func testDismissUploadStatusReturnsToIdle() async {
let model = AppModel.preview
await model.bootstrap()
await model.upload(DocumentUpload(filename: "x.pdf", data: Data("x".utf8)))
model.dismissUploadStatus()
XCTAssertEqual(model.uploadState, .idle)
}
}
@@ -168,6 +168,7 @@ final class AuthStubClient: DSClient, @unchecked Sendable {
func patchDocument(id: Int, _ update: DocumentUpdate) async throws -> DocumentResponse { try await inner.patchDocument(id: id, update) }
func putContent(id: Int, content: String) async throws { try await inner.putContent(id: id, content: content) }
func deleteDocument(id: Int) async throws { try await inner.deleteDocument(id: id) }
func uploadDocument(_ upload: DocumentUpload) async throws -> DocumentResponse { try await inner.uploadDocument(upload) }
func search(q: String, mode: SearchMode?, page: Int?, debug: Bool?) async throws -> SearchResponse { try await inner.search(q: q, mode: mode, page: page, debug: debug) }
func ask(q: String, limit: Int?, backend: String?, debug: Bool?) async throws -> AskResponse { try await inner.ask(q: q, limit: limit, backend: backend, debug: debug) }
func memos(_ query: MemoListQuery) async throws -> MemoListResponse { try await inner.memos(query) }
@@ -0,0 +1,42 @@
import XCTest
@testable import DSKit
/// Fixture + multipart (// // ).
final class UploadTests: XCTestCase {
func testFixtureUploadReturnsDocument() async throws {
let doc = try await FixtureDSClient().uploadDocument(
DocumentUpload(filename: "a.pdf", data: Data("x".utf8), mimeType: "application/pdf"))
XCTAssertGreaterThan(doc.id, 0)
}
func testMultipartBodyShape() throws {
let upload = DocumentUpload(
filename: "보고서.pdf",
data: Data("PDFDATA".utf8),
mimeType: "application/pdf",
docPurpose: "knowledge"
)
let boundary = "TESTBOUNDARY"
let body = LiveDSClient.multipartBody(for: upload, boundary: boundary)
let s = try XCTUnwrap(String(data: body, encoding: .utf8))
XCTAssertTrue(s.contains("--TESTBOUNDARY\r\n"), "경계 마커")
XCTAssertTrue(s.contains(#"Content-Disposition: form-data; name="file"; filename=".pdf""#),
"file 파트 + 한글 파일명")
XCTAssertTrue(s.contains("Content-Type: application/pdf"), "파일 mime")
XCTAssertTrue(s.contains(#"Content-Disposition: form-data; name="doc_purpose""#), "선택 form 필드")
XCTAssertTrue(s.contains("knowledge"))
XCTAssertTrue(s.contains("PDFDATA"), "파일 데이터")
XCTAssertTrue(s.hasSuffix("--TESTBOUNDARY--\r\n"), "종료 경계")
}
func testMultipartOmitsAbsentOptionalFields() throws {
let upload = DocumentUpload(filename: "x.txt", data: Data("a".utf8))
let body = LiveDSClient.multipartBody(for: upload, boundary: "B")
let s = try XCTUnwrap(String(data: body, encoding: .utf8))
XCTAssertFalse(s.contains("doc_purpose"), "미지정 doc_purpose 는 본문에 없어야 함")
XCTAssertFalse(s.contains("library_path"), "미지정 library_path 는 본문에 없어야 함")
XCTAssertTrue(s.contains("Content-Type: application/octet-stream"), "mime 미지정 = octet-stream 폴백")
}
}
+1 -1
View File
@@ -54,7 +54,7 @@ UserResponse { id: Int, username: String, is_active: Bool, totp_enabled: Bool, l
| GET | `/documents/{id}/content` | — | 경량 텍스트(`content` 15k cap) | `document_content.json` |
| GET | `/documents/tree` | — | 도메인 트리(사이드바) | `documents_tree.json` |
| GET | `/documents/stats/category-counts` | — | `{counts: {category: n}, library_pending_suggestions}`**raw dict 반환(Pydantic 모델 없음), 2026-06-07 라이브 재캡처로 정정**(초기 추출이 shape 합성 오류) | `documents_stats.json` |
| POST | `/documents/` (multipart) | 파일 업로드 | `DocumentResponse` (201) | `document_detail.json` |
| POST | `/documents/` (multipart/form-data) | `file`(필수) + `doc_purpose?`(business\|knowledge) `library_path?` `facet_*?` | `DocumentResponse` (201) | `document_detail.json` |
| PATCH | `/documents/{id}` | `DocumentUpdate` | `DocumentResponse` | — |
| PUT | `/documents/{id}/content` | `{content}` (md 편집 저장) | `{}` | — |
| POST | `/documents/{id}/accept-suggestion` | `{expected_source_updated_at}` | `DocumentResponse` | — |
+4
View File
@@ -0,0 +1,4 @@
DSShell.xcodeproj/
Support/
.build/
*.xcuserstate
@@ -0,0 +1,74 @@
{
"info" : {
"version" : 1,
"author" : "xcode"
},
"images" : [
{
"scale" : "1x",
"filename" : "mac_16.png",
"idiom" : "mac",
"size" : "16x16"
},
{
"idiom" : "mac",
"size" : "16x16",
"scale" : "2x",
"filename" : "mac_32.png"
},
{
"filename" : "mac_32.png",
"size" : "32x32",
"scale" : "1x",
"idiom" : "mac"
},
{
"scale" : "2x",
"idiom" : "mac",
"size" : "32x32",
"filename" : "mac_64.png"
},
{
"idiom" : "mac",
"size" : "128x128",
"filename" : "mac_128.png",
"scale" : "1x"
},
{
"size" : "128x128",
"idiom" : "mac",
"scale" : "2x",
"filename" : "mac_256.png"
},
{
"filename" : "mac_256.png",
"scale" : "1x",
"idiom" : "mac",
"size" : "256x256"
},
{
"filename" : "mac_512.png",
"scale" : "2x",
"size" : "256x256",
"idiom" : "mac"
},
{
"filename" : "mac_512.png",
"size" : "512x512",
"idiom" : "mac",
"scale" : "1x"
},
{
"filename" : "mac_1024.png",
"size" : "512x512",
"scale" : "2x",
"idiom" : "mac"
},
{
"idiom" : "universal",
"filename" : "ios_1024.png",
"size" : "1024x1024",
"platform" : "ios"
}
]
}
Binary file not shown.

After

Width:  |  Height:  |  Size: 38 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 49 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 4.0 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 569 B

Binary file not shown.

After

Width:  |  Height:  |  Size: 8.3 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.0 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 19 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.9 KiB

@@ -0,0 +1,3 @@
{
"info" : { "author" : "xcode", "version" : 1 }
}
+29
View File
@@ -0,0 +1,29 @@
import SwiftUI
/// DS document.hyungi.net . WKWebsiteDataStore.default()
/// (). ·iOS @main.
@main
struct DSShellApp: App {
private let url = URL(string: "https://document.hyungi.net")!
var body: some Scene {
WindowGroup {
RootWeb(url: url)
}
#if os(macOS)
.windowStyle(.automatic)
#endif
}
}
struct RootWeb: View {
let url: URL
var body: some View {
WebView(url: url)
.ignoresSafeArea()
#if os(macOS)
.frame(minWidth: 900, minHeight: 600)
.background(WindowOnScreenGuard()) //
#endif
}
}
+96
View File
@@ -0,0 +1,96 @@
import SwiftUI
import WebKit
#if os(macOS)
import AppKit
#else
import UIKit
#endif
/// document.hyungi.net WKWebView (=NSViewRepresentable / iOS=UIViewRepresentable).
/// = . (Content-Disposition: attachment) .
/// (file input) WKWebView .
struct WebView {
let url: URL
func makeCoordinator() -> Coordinator { Coordinator() }
@MainActor
fileprivate func makeWebView(coordinator: Coordinator) -> WKWebView {
let cfg = WKWebViewConfiguration()
cfg.websiteDataStore = .default() // ()
let wv = WKWebView(frame: .zero, configuration: cfg)
wv.navigationDelegate = coordinator
wv.allowsBackForwardNavigationGestures = true
wv.load(URLRequest(url: url))
return wv
}
final class Coordinator: NSObject, WKNavigationDelegate, WKDownloadDelegate {
// , (PDF ).
func webView(_ webView: WKWebView,
decidePolicyFor navigationResponse: WKNavigationResponse,
decisionHandler: @escaping (WKNavigationResponsePolicy) -> Void) {
if let http = navigationResponse.response as? HTTPURLResponse,
let cd = http.value(forHTTPHeaderField: "Content-Disposition"),
cd.lowercased().contains("attachment") {
decisionHandler(.download)
} else {
decisionHandler(.allow)
}
}
func webView(_ webView: WKWebView, navigationResponse: WKNavigationResponse, didBecome download: WKDownload) {
download.delegate = self
}
func webView(_ webView: WKWebView, navigationAction: WKNavigationAction, didBecome download: WKDownload) {
download.delegate = self
}
func download(_ download: WKDownload,
decideDestinationUsing response: URLResponse,
suggestedFilename: String) async -> URL? {
#if os(macOS)
let folder = FileManager.default.urls(for: .downloadsDirectory, in: .userDomainMask).first
#else
let folder = FileManager.default.urls(for: .documentDirectory, in: .userDomainMask).first
#endif
let dir = folder ?? FileManager.default.temporaryDirectory
var dest = dir.appendingPathComponent(suggestedFilename.isEmpty ? "download" : suggestedFilename)
// (name_1.ext )
let base = dest.deletingPathExtension().lastPathComponent
let ext = dest.pathExtension
var n = 1
while FileManager.default.fileExists(atPath: dest.path) {
let name = ext.isEmpty ? "\(base)_\(n)" : "\(base)_\(n).\(ext)"
dest = dir.appendingPathComponent(name); n += 1
}
return dest
}
}
}
#if os(macOS)
extension WebView: NSViewRepresentable {
func makeNSView(context: Context) -> WKWebView { makeWebView(coordinator: context.coordinator) }
func updateNSView(_ nsView: WKWebView, context: Context) {}
}
/// ( ) " " .
struct WindowOnScreenGuard: NSViewRepresentable {
func makeNSView(context: Context) -> NSView { OnScreenView() }
func updateNSView(_ nsView: NSView, context: Context) {}
final class OnScreenView: NSView {
override func viewDidMoveToWindow() {
super.viewDidMoveToWindow()
guard let win = window else { return }
if !NSScreen.screens.contains(where: { $0.visibleFrame.intersects(win.frame) }) { win.center() }
}
}
}
#else
extension WebView: UIViewRepresentable {
func makeUIView(context: Context) -> WKWebView { makeWebView(coordinator: context.coordinator) }
func updateUIView(_ uiView: WKWebView, context: Context) {}
}
#endif
+88
View File
@@ -0,0 +1,88 @@
# DS 웹 래퍼 — document.hyungi.net 을 WKWebView 로 감싼 네이티브 앱(맥 + iOS).
# 웹 UI 100% 재사용·항상 최신·코드 1벌(2026-06-15 결정). 순수 네이티브는 워치(clients/ds-watch)만.
# project.yml = source of truth, *.xcodeproj/Support = 생성물(gitignore).
name: DSShell
options:
bundleIdPrefix: net.hyungi
deploymentTarget:
macOS: "14.0"
iOS: "17.0"
createIntermediateGroups: true
minimumXcodeGenVersion: "2.40.0"
settings:
base:
SWIFT_VERSION: "6.0"
CODE_SIGN_STYLE: Automatic
CODE_SIGNING_ALLOWED: "NO"
CODE_SIGNING_REQUIRED: "NO"
GENERATE_INFOPLIST_FILE: "NO"
targets:
DSShellMac:
type: application
platform: macOS
deploymentTarget: "14.0"
sources:
- path: Sources
settings:
base:
PRODUCT_BUNDLE_IDENTIFIER: net.hyungi.dsshell
PRODUCT_NAME: DS
MARKETING_VERSION: "0.1"
CURRENT_PROJECT_VERSION: "1"
ASSETCATALOG_COMPILER_APPICON_NAME: AppIcon
info:
path: Support/Mac-Info.plist
properties:
CFBundleName: DS
CFBundleDisplayName: DS
CFBundleShortVersionString: "0.1"
CFBundleVersion: "1"
CFBundlePackageType: APPL
LSMinimumSystemVersion: "14.0"
LSApplicationCategoryType: public.app-category.productivity
entitlements:
path: Support/Mac.entitlements
properties:
com.apple.security.app-sandbox: true
com.apple.security.network.client: true
com.apple.security.files.downloads.read-write: true # 원본 다운로드 저장
com.apple.security.files.user-selected.read-write: true # 업로드 파일 선택
DSShelliOS:
type: application
platform: iOS
deploymentTarget: "17.0"
sources:
- path: Sources
settings:
base:
PRODUCT_BUNDLE_IDENTIFIER: net.hyungi.dsshell
PRODUCT_NAME: DS
MARKETING_VERSION: "0.1"
CURRENT_PROJECT_VERSION: "1"
TARGETED_DEVICE_FAMILY: "1,2"
ASSETCATALOG_COMPILER_APPICON_NAME: AppIcon
info:
path: Support/iOS-Info.plist
properties:
CFBundleName: DS
CFBundleDisplayName: DS
CFBundleShortVersionString: "0.1"
CFBundleVersion: "1"
UILaunchScreen: {}
UISupportedInterfaceOrientations:
- UIInterfaceOrientationPortrait
- UIInterfaceOrientationLandscapeLeft
- UIInterfaceOrientationLandscapeRight
schemes:
DSShellMac:
build:
targets: { DSShellMac: all }
run: { config: Debug }
DSShelliOS:
build:
targets: { DSShelliOS: all }
run: { config: Debug }
+5
View File
@@ -0,0 +1,5 @@
# xcodegen 생성물 (project.yml 이 source of truth)
DSWatch.xcodeproj/
Support/
.build/
*.xcuserstate
@@ -0,0 +1,6 @@
{
"images" : [
{ "idiom" : "universal", "platform" : "watchos", "size" : "1024x1024", "filename" : "watch_1024.png" }
],
"info" : { "author" : "xcode", "version" : 1 }
}
Binary file not shown.

After

Width:  |  Height:  |  Size: 38 KiB

@@ -0,0 +1,3 @@
{
"info" : { "author" : "xcode", "version" : 1 }
}
+28
View File
@@ -0,0 +1,28 @@
import SwiftUI
/// DS (standalone). 4 = (AI)·()· ·.
/// = (/study-cards/due·rate) / = . OLED.
@main
struct DSWatchApp: App {
@State private var model = WatchModel()
var body: some Scene {
WindowGroup {
RootGate()
.environment(model)
.task { await model.bootstrap() }
}
}
}
/// : checking( ) loggedOut() ready().
struct RootGate: View {
@Environment(WatchModel.self) private var model
var body: some View {
switch model.phase {
case .checking: ProgressView()
case .loggedOut: LoginView()
case .ready: RootMenu()
}
}
}
+9
View File
@@ -0,0 +1,9 @@
import WatchKit
/// / ( ).
@MainActor
enum Haptics {
static func success() { WKInterfaceDevice.current().play(.success) }
static func retry() { WKInterfaceDevice.current().play(.retry) }
static func click() { WKInterfaceDevice.current().play(.click) }
}
+262
View File
@@ -0,0 +1,262 @@
import Foundation
/// API . DS TLS(document.hyungi.net) Tailscale .
/// access = / refresh =HTTPCookieStorage(7 ) 1 .
/// Pydantic (study_cards.py CardItem/RateBody) .
enum WatchAPI {
static let baseString = "https://document.hyungi.net/api"
}
/// GET /study-cards/due CardItem ( ).
struct WCard: Decodable, Identifiable, Sendable {
let id: Int
let format: String
let cue: String
let fact: String
let clozeText: String?
let needsReview: Bool
let reviewStage: Int?
enum CodingKeys: String, CodingKey {
case id, format, cue, fact
case clozeText = "cloze_text"
case needsReview = "needs_review"
case reviewStage = "review_stage"
}
}
/// GET /events/today EventResponse ( ).
struct WEvent: Decodable, Identifiable, Sendable {
let id: Int
let title: String
let status: String
let dueAt: String?
let completedAt: String?
enum CodingKeys: String, CodingKey {
case id, title, status
case dueAt = "due_at"
case completedAt = "completed_at"
}
var isDone: Bool { status == "completed" || completedAt != nil }
}
private struct WEventList: Decodable { let items: [WEvent] }
/// GET /briefing/latest / ( ).
struct WPerspective: Decodable, Identifiable, Sendable {
let country: String
let summary: String
var id: String { country }
}
struct WTopic: Decodable, Identifiable, Sendable {
let id: Int
let topicLabel: String
let headline: String
let countryPerspectives: [WPerspective]
enum CodingKeys: String, CodingKey {
case id, headline
case topicLabel = "topic_label"
case countryPerspectives = "country_perspectives"
}
}
struct WBriefing: Decodable, Sendable {
let status: String
let headlineOneliner: String?
let topics: [WTopic]
enum CodingKeys: String, CodingKey {
case status, topics
case headlineOneliner = "headline_oneliner"
}
}
/// SSE unavailable( /).
struct ChatResult: Sendable {
let answer: String
let unavailable: Bool
let reason: String?
}
private struct AccessTokenBody: Decodable { let accessToken: String
enum CodingKeys: String, CodingKey { case accessToken = "access_token" } }
enum WCError: Error, LocalizedError {
case transport(String)
case http(Int, String?)
case decoding(String)
var errorDescription: String? {
switch self {
case .transport(let m): return "네트워크 오류: \(m)"
case .http(let s, let m): return m ?? "서버 오류 (\(s))"
case .decoding(let m): return "응답 해석 실패: \(m)"
}
}
var isUnauthorized: Bool { if case .http(401, _) = self { return true }; return false }
}
actor WatchClient {
private let session: URLSession
private var accessToken: String?
init() {
let cfg = URLSessionConfiguration.default
cfg.httpCookieStorage = .shared
cfg.httpShouldSetCookies = true
cfg.waitsForConnectivity = true
session = URLSession(configuration: cfg)
}
private func url(_ path: String) -> URL { URL(string: WatchAPI.baseString + "/" + path)! }
private func send(_ req: URLRequest) async throws -> (Data, HTTPURLResponse) {
do {
let (d, r) = try await session.data(for: req)
guard let h = r as? HTTPURLResponse else { throw WCError.transport("no HTTP response") }
return (d, h)
} catch let e as WCError { throw e }
catch { throw WCError.transport("\(error.localizedDescription)") }
}
private static func decodeMessage(_ data: Data) -> String? {
guard let o = try? JSONSerialization.jsonObject(with: data) as? [String: Any] else { return nil }
if let s = o["detail"] as? String { return s }
if let d = o["detail"] as? [String: Any] { return d["message"] as? String }
return nil
}
// MARK: auth
func login(username: String, password: String, totp: String?) async throws {
var req = URLRequest(url: url("auth/login"))
req.httpMethod = "POST"
req.setValue("application/json", forHTTPHeaderField: "Content-Type")
var body: [String: Any] = ["username": username, "password": password]
if let totp, !totp.isEmpty { body["totp_code"] = totp }
req.httpBody = try JSONSerialization.data(withJSONObject: body)
let (data, http) = try await send(req)
guard (200..<300).contains(http.statusCode) else { throw WCError.http(http.statusCode, Self.decodeMessage(data)) }
accessToken = try decodeToken(data)
}
@discardableResult
func refresh() async throws -> String {
var req = URLRequest(url: url("auth/refresh"))
req.httpMethod = "POST"
let (data, http) = try await send(req)
guard (200..<300).contains(http.statusCode) else { throw WCError.http(http.statusCode, Self.decodeMessage(data)) }
let t = try decodeToken(data)
accessToken = t
return t
}
func logout() async {
accessToken = nil
var req = URLRequest(url: url("auth/logout")); req.httpMethod = "POST"
_ = try? await send(req)
}
private func decodeToken(_ data: Data) throws -> String {
do { return try JSONDecoder().decode(AccessTokenBody.self, from: data).accessToken }
catch { throw WCError.decoding("token: \(error)") }
}
// MARK: authed request (401 single refresh + retry)
private func authed(_ path: String, method: String = "GET", json: [String: Any]? = nil) async throws -> Data {
func make(_ token: String?) throws -> URLRequest {
var r = URLRequest(url: url(path))
r.httpMethod = method
if let token { r.setValue("Bearer \(token)", forHTTPHeaderField: "Authorization") }
if let json { r.httpBody = try JSONSerialization.data(withJSONObject: json); r.setValue("application/json", forHTTPHeaderField: "Content-Type") }
return r
}
let (data, http) = try await send(make(accessToken))
if http.statusCode == 401 {
let newToken = try await refresh()
let (d2, h2) = try await send(make(newToken))
guard (200..<300).contains(h2.statusCode) else { throw WCError.http(h2.statusCode, Self.decodeMessage(d2)) }
return d2
}
guard (200..<300).contains(http.statusCode) else { throw WCError.http(http.statusCode, Self.decodeMessage(data)) }
return data
}
// MARK: study cards
func dueCards() async throws -> [WCard] {
let data = try await authed("study-cards/due")
do { return try JSONDecoder().decode([WCard].self, from: data) }
catch { throw WCError.decoding("due: \(error)") }
}
func rate(cardId: Int, outcome: String) async throws {
_ = try await authed("study-cards/\(cardId)/rate", method: "POST", json: ["outcome": outcome])
}
func flag(cardId: Int) async throws {
_ = try await authed("study-cards/\(cardId)", method: "PATCH", json: ["needs_review": true])
}
// MARK: events ()
func events() async throws -> [WEvent] {
let data = try await authed("events/today")
do { return try JSONDecoder().decode(WEventList.self, from: data).items }
catch { throw WCError.decoding("events: \(error)") }
}
func completeEvent(id: Int) async throws {
_ = try await authed("events/\(id)/complete", method: "POST")
}
// MARK: briefing ( )
func briefing() async throws -> WBriefing {
let data = try await authed("briefing/latest")
do { return try JSONDecoder().decode(WBriefing.self, from: data) }
catch { throw WCError.decoding("briefing: \(error)") }
}
// MARK: eid chat (SSE 26B via DS )
func chat(_ text: String) async throws -> ChatResult {
let payload: [String: Any] = ["mode": "daily", "messages": [["role": "user", "content": text]]]
func make(_ token: String?) throws -> URLRequest {
var r = URLRequest(url: url("eid/chat"))
r.httpMethod = "POST"
r.timeoutInterval = 120
if let token { r.setValue("Bearer \(token)", forHTTPHeaderField: "Authorization") }
r.setValue("application/json", forHTTPHeaderField: "Content-Type")
r.setValue("text/event-stream", forHTTPHeaderField: "Accept")
r.httpBody = try JSONSerialization.data(withJSONObject: payload)
return r
}
var (stream, resp) = try await session.bytes(for: make(accessToken))
if (resp as? HTTPURLResponse)?.statusCode == 401 {
let t = try await refresh()
(stream, resp) = try await session.bytes(for: make(t))
}
guard let http = resp as? HTTPURLResponse else { throw WCError.transport("no HTTP response") }
let ctype = http.value(forHTTPHeaderField: "Content-Type") ?? ""
if ctype.contains("text/event-stream") {
var answer = ""
for try await line in stream.lines {
guard line.hasPrefix("data:") else { continue }
let body = line.dropFirst(5).trimmingCharacters(in: .whitespaces)
if body == "[DONE]" || body.isEmpty { continue }
if let d = body.data(using: .utf8),
let obj = try? JSONSerialization.jsonObject(with: d) as? [String: Any],
let choices = obj["choices"] as? [[String: Any]],
let delta = choices.first?["delta"] as? [String: Any],
let content = delta["content"] as? String {
answer += content
}
}
return ChatResult(answer: answer, unavailable: answer.isEmpty,
reason: answer.isEmpty ? "빈 응답" : nil)
}
// - = unavailable JSONResponse ( /) .
var raw = Data()
for try await b in stream { raw.append(b) }
return ChatResult(answer: "", unavailable: true, reason: Self.decodeMessage(raw) ?? "이드 연결 불가")
}
}
+46
View File
@@ -0,0 +1,46 @@
import SwiftUI
/// = 4 . .
struct RootMenu: View {
var body: some View {
NavigationStack {
List {
NavigationLink { EidView() } label: {
MenuRow(symbol: "bubble.left.and.bubble.right.fill", title: "이드", sub: "AI 채팅")
}
NavigationLink { StudyView() } label: {
MenuRow(symbol: "rectangle.on.rectangle.angled.fill", title: "공부", sub: "암기 카드")
}
NavigationLink { TodoView() } label: {
MenuRow(symbol: "checklist", title: "할 일", sub: "오늘")
}
NavigationLink { BriefingView() } label: {
MenuRow(symbol: "newspaper.fill", title: "브리핑", sub: "모닝")
}
}
.navigationTitle("DS")
}
.tint(WT.accent)
}
}
struct MenuRow: View {
let symbol: String
let title: String
let sub: String
var body: some View {
HStack(spacing: 10) {
Image(systemName: symbol)
.font(.system(size: 16))
.foregroundStyle(WT.accent)
.frame(width: 24)
VStack(alignment: .leading, spacing: 1) {
Text(title).font(.system(size: 16, weight: .semibold)).foregroundStyle(WT.ink)
Text(sub).font(.system(size: 11)).foregroundStyle(WT.muted)
}
}
.padding(.vertical, 3)
}
}
#Preview { RootMenu() }
+160
View File
@@ -0,0 +1,160 @@
import SwiftUI
// MARK: - (Todo) GET /events/today + POST /complete
struct TodoView: View {
@Environment(WatchModel.self) private var model
@State private var loaded = false
var body: some View {
Group {
if model.eventsLoading && model.events.isEmpty {
ProgressView()
} else if let e = model.eventsError, model.events.isEmpty {
retry("불러오기 실패\n\(e)") { await model.loadEvents() }
} else if model.events.isEmpty {
retry("오늘 할 일이 없어요", color: WT.muted) { await model.loadEvents() }
} else {
List(model.events) { ev in
Button {
if !ev.isDone { Haptics.success() }
Task { await model.completeEvent(ev.id) }
} label: {
HStack(spacing: 10) {
Image(systemName: ev.isDone ? "checkmark.circle.fill" : "circle")
.font(.system(size: 17))
.foregroundStyle(ev.isDone ? WT.accent : WT.muted)
Text(ev.title)
.font(.system(size: 14))
.foregroundStyle(ev.isDone ? WT.muted : WT.ink)
.strikethrough(ev.isDone, color: WT.muted)
Spacer()
}
.padding(.vertical, 2)
}
.buttonStyle(.plain)
}
}
}
.navigationTitle("할 일")
.task { if !loaded { loaded = true; await model.loadEvents() } }
}
}
// MARK: - () GET /briefing/latest,
struct BriefingView: View {
@Environment(WatchModel.self) private var model
@State private var loaded = false
var body: some View {
ScrollView {
if model.briefingLoading && model.briefing == nil {
ProgressView().padding(.top, 20)
} else if let e = model.briefingError, model.briefing == nil {
retry("불러오기 실패\n\(e)") { await model.loadBriefing() }
} else if let b = model.briefing, !b.topics.isEmpty {
VStack(alignment: .leading, spacing: 10) {
if let one = b.headlineOneliner, !one.isEmpty {
Text(one).font(.system(size: 15, weight: .semibold)).foregroundStyle(WT.ink)
}
ForEach(b.topics) { t in
VStack(alignment: .leading, spacing: 5) {
Text(t.headline).font(.system(size: 13, weight: .semibold)).foregroundStyle(WT.ink)
ForEach(t.countryPerspectives) { p in
HStack(alignment: .top, spacing: 5) {
Text(p.country.uppercased())
.font(.system(size: 9, weight: .bold)).foregroundStyle(WT.accent)
.frame(minWidth: 22, alignment: .leading)
Text(p.summary).font(.system(size: 11)).foregroundStyle(WT.muted).lineLimit(4)
}
}
}
.frame(maxWidth: .infinity, alignment: .leading)
.padding(8)
.background(WT.card, in: RoundedRectangle(cornerRadius: 12))
}
}
} else {
retry("오늘 브리핑이 아직 없어요", color: WT.muted) { await model.loadBriefing() }
}
}
.navigationTitle("브리핑")
.task { if !loaded { loaded = true; await model.loadBriefing() } }
}
}
// MARK: - (AI ) POST /eid/chat ( 26B via DS )
struct EidView: View {
@Environment(WatchModel.self) private var model
@State private var draft = ""
var body: some View {
ScrollView {
VStack(spacing: 8) {
HStack(spacing: 6) {
TextField("물어보기…", text: $draft)
.textFieldStyle(.plain)
.padding(8)
.background(WT.card, in: RoundedRectangle(cornerRadius: 10))
Button {
let t = draft; draft = ""
Task { await model.sendChat(t) }
} label: {
Image(systemName: "arrow.up.circle.fill").font(.system(size: 22))
}
.buttonStyle(.plain)
.foregroundStyle(WT.accent)
.disabled(draft.trimmingCharacters(in: .whitespacesAndNewlines).isEmpty || model.chatSending)
}
if model.chatSending {
HStack(spacing: 6) {
ProgressView().controlSize(.small)
Text("이드 생각 중…").font(.system(size: 11)).foregroundStyle(WT.muted)
}
}
ForEach(model.chatTurns.reversed()) { turn in
ChatBubble(turn: turn)
}
if model.chatTurns.isEmpty && !model.chatSending {
Text("음성·키보드로 묻고\n맥미니 26B 가 답합니다")
.font(.system(size: 11)).foregroundStyle(WT.muted)
.multilineTextAlignment(.center).padding(.top, 8)
}
}
}
.navigationTitle("이드")
}
}
private struct ChatBubble: View {
let turn: WatchModel.ChatTurn
var body: some View {
let isUser = turn.role == "user"
let isError = turn.role == "error"
HStack {
if isUser { Spacer(minLength: 24) }
Text(turn.text)
.font(.system(size: 12))
.foregroundStyle(isUser ? .black : (isError ? WT.danger : WT.ink))
.frame(maxWidth: .infinity, alignment: isUser ? .trailing : .leading)
.padding(8)
.background(isUser ? WT.accent : (isError ? WT.danger.opacity(0.15) : WT.card),
in: RoundedRectangle(cornerRadius: 10))
if !isUser { Spacer(minLength: 24) }
}
}
}
// MARK: - /
@MainActor
private func retry(_ text: String, color: Color = WT.danger, _ action: @escaping () async -> Void) -> some View {
VStack(spacing: 10) {
Text(text).font(.system(size: 13)).foregroundStyle(color).multilineTextAlignment(.center)
Button("다시 불러오기") { Task { await action() } }.tint(WT.accent)
}
.frame(maxWidth: .infinity)
.padding(.horizontal, 6).padding(.top, 16)
}
+132
View File
@@ -0,0 +1,132 @@
import SwiftUI
/// () ( cue fact) + 2 (/).
/// (B5): 2 ( ), ' ' ( /), OLED.
/// = GET /study-cards/due, = POST /{id}/rate (correct/wrong), = PATCH needs_review.
struct StudyView: View {
@Environment(WatchModel.self) private var model
@State private var index = 0
@State private var revealed = false
@State private var correctCount = 0
@State private var flagged = false
@State private var loaded = false
var body: some View {
Group {
if model.studyLoading && model.cards.isEmpty {
ProgressView()
} else if let err = model.studyError, model.cards.isEmpty {
stateText("불러오기 실패\n\(err)", color: WT.danger, retry: true)
} else if model.cards.isEmpty {
stateText("복습할 카드가 없어요", color: WT.muted, retry: true)
} else if index >= model.cards.count {
ResultView(total: model.cards.count, correct: correctCount) { Task { await reload() } }
} else {
cardScreen(model.cards[index])
}
}
.navigationTitle("공부")
.task { if !loaded { loaded = true; await model.loadDue(); reset() } }
}
private func cardScreen(_ c: WCard) -> some View {
VStack(spacing: 8) {
HStack {
Text("\(index + 1) / \(model.cards.count)").font(.system(size: 11)).foregroundStyle(WT.muted)
Spacer()
Button {
flagged = true
Haptics.click()
Task { await model.flag(cardId: c.id) }
} label: {
Image(systemName: flagged ? "flag.fill" : "flag")
.font(.system(size: 11)).foregroundStyle(flagged ? WT.amber : WT.muted)
}
.buttonStyle(.plain)
}
ScrollView {
VStack(spacing: 10) {
Text(c.cue)
.font(.system(size: 17, weight: .semibold)).foregroundStyle(WT.ink)
.multilineTextAlignment(.center)
if revealed {
Divider().overlay(WT.muted.opacity(0.4))
Text(c.fact)
.font(.system(size: 15)).foregroundStyle(WT.accent)
.multilineTextAlignment(.center)
}
}
.frame(maxWidth: .infinity)
.padding(12)
.background(WT.card, in: RoundedRectangle(cornerRadius: 14))
}
if revealed {
HStack(spacing: 8) {
rateButton("다시", sub: "내일", color: WT.danger) { advance(c, correct: false) }
rateButton("알아요", sub: nil, color: WT.accent) { advance(c, correct: true) }
}
} else {
Button { withAnimation(.easeOut(duration: 0.15)) { revealed = true } } label: {
Text("답 보기").frame(maxWidth: .infinity)
}
.tint(WT.accent)
}
}
.padding(.horizontal, 4)
}
private func rateButton(_ title: String, sub: String?, color: Color, _ action: @escaping () -> Void) -> some View {
Button(action: action) {
VStack(spacing: 1) {
Text(title).font(.system(size: 14, weight: .semibold))
if let sub { Text(sub).font(.system(size: 9)).opacity(0.8) }
}
.frame(maxWidth: .infinity).padding(.vertical, 2)
}
.tint(color)
}
private func stateText(_ text: String, color: Color, retry: Bool) -> some View {
VStack(spacing: 10) {
Text(text).font(.system(size: 13)).foregroundStyle(color).multilineTextAlignment(.center)
if retry { Button("다시 불러오기") { Task { await reload() } }.tint(WT.accent) }
}
.padding(.horizontal, 6)
}
private func advance(_ c: WCard, correct: Bool) {
if correct { correctCount += 1 }
Haptics.success() // (/ )
Task { await model.rate(cardId: c.id, outcome: correct ? "correct" : "wrong") }
flagged = false
revealed = false
index += 1
}
private func reload() async { await model.loadDue(); reset() }
private func reset() { index = 0; revealed = false; correctCount = 0; flagged = false }
}
/// tally( streak X).
struct ResultView: View {
let total: Int
let correct: Int
let onRestart: () -> Void
var body: some View {
ScrollView {
VStack(spacing: 10) {
Image(systemName: "checkmark.seal.fill").font(.system(size: 30)).foregroundStyle(WT.accent)
Text("오늘 복습 완료").font(.system(size: 16, weight: .semibold)).foregroundStyle(WT.ink)
Text("\(correct) / \(total) 알아요").font(.system(size: 13)).foregroundStyle(WT.muted)
Text("애매하거나 몰랐던 카드는 내일 다시 만나요")
.font(.system(size: 11)).foregroundStyle(WT.muted).multilineTextAlignment(.center)
Button("다시 불러오기", action: onRestart).tint(WT.accent).padding(.top, 4)
}
.frame(maxWidth: .infinity).padding(.vertical, 6)
}
.navigationTitle("결과")
}
}
+162
View File
@@ -0,0 +1,162 @@
import SwiftUI
import Observation
/// . refresh . .
@MainActor
@Observable
final class WatchModel {
enum Phase: Equatable { case checking, loggedOut, ready }
var phase: Phase = .checking
var loginError: String?
// (study)
var cards: [WCard] = []
var studyLoading = false
var studyError: String?
// (events)
var events: [WEvent] = []
var eventsLoading = false
var eventsError: String?
//
var briefing: WBriefing?
var briefingLoading = false
var briefingError: String?
// (chat)
struct ChatTurn: Identifiable, Sendable { let id: Int; let role: String; let text: String }
var chatTurns: [ChatTurn] = []
var chatSending = false
private var chatSeq = 0
private let client = WatchClient()
func bootstrap() async {
do { _ = try await client.refresh(); phase = .ready }
catch { phase = .loggedOut } // / =
}
func login(username: String, password: String, totp: String?) async {
loginError = nil
let code = totp?.trimmingCharacters(in: .whitespacesAndNewlines)
do {
try await client.login(username: username, password: password,
totp: (code?.isEmpty ?? true) ? nil : code)
phase = .ready
} catch {
loginError = (error as? LocalizedError)?.errorDescription ?? "\(error)"
}
}
func logout() async {
await client.logout()
cards = []; studyError = nil
phase = .loggedOut
}
func loadDue() async {
studyLoading = true; studyError = nil
do { cards = try await client.dueCards() }
catch let e as WCError where e.isUnauthorized { phase = .loggedOut }
catch { studyError = (error as? LocalizedError)?.errorDescription ?? "\(error)" }
studyLoading = false
}
/// (correct/wrong). ( ) .
func rate(cardId: Int, outcome: String) async {
do { try await client.rate(cardId: cardId, outcome: outcome) }
catch let e as WCError where e.isUnauthorized { phase = .loggedOut }
catch { studyError = (error as? LocalizedError)?.errorDescription ?? "\(error)" }
}
func flag(cardId: Int) async {
do { try await client.flag(cardId: cardId) }
catch { studyError = (error as? LocalizedError)?.errorDescription ?? "\(error)" }
}
// MARK: (events)
func loadEvents() async {
eventsLoading = true; eventsError = nil
do { events = try await client.events() }
catch let e as WCError where e.isUnauthorized { phase = .loggedOut }
catch { eventsError = (error as? LocalizedError)?.errorDescription ?? "\(error)" }
eventsLoading = false
}
func completeEvent(_ id: Int) async {
do { try await client.completeEvent(id: id); await loadEvents() }
catch let e as WCError where e.isUnauthorized { phase = .loggedOut }
catch { eventsError = (error as? LocalizedError)?.errorDescription ?? "\(error)" }
}
// MARK:
func loadBriefing() async {
briefingLoading = true; briefingError = nil
do { briefing = try await client.briefing() }
catch let e as WCError where e.isUnauthorized { phase = .loggedOut }
catch { briefingError = (error as? LocalizedError)?.errorDescription ?? "\(error)" }
briefingLoading = false
}
// MARK: (chat)
func sendChat(_ text: String) async {
let t = text.trimmingCharacters(in: .whitespacesAndNewlines)
guard !t.isEmpty, !chatSending else { return }
chatSeq += 1; chatTurns.append(.init(id: chatSeq, role: "user", text: t))
chatSending = true
do {
let result = try await client.chat(t)
chatSeq += 1
if result.unavailable {
chatTurns.append(.init(id: chatSeq, role: "error", text: result.reason ?? "이드 연결 불가"))
} else {
chatTurns.append(.init(id: chatSeq, role: "assistant", text: result.answer))
}
} catch let e as WCError where e.isUnauthorized {
phase = .loggedOut
} catch {
chatSeq += 1
chatTurns.append(.init(id: chatSeq, role: "error",
text: (error as? LocalizedError)?.errorDescription ?? "\(error)"))
}
chatSending = false
}
}
/// 1 (refresh 7 1). TOTP 6 .
struct LoginView: View {
@Environment(WatchModel.self) private var model
@State private var username = ""
@State private var password = ""
@State private var totp = ""
@State private var busy = false
var body: some View {
ScrollView {
VStack(spacing: 8) {
Text("DS 로그인").font(.system(size: 16, weight: .semibold)).foregroundStyle(WT.ink)
TextField("아이디", text: $username)
.textContentType(.username)
SecureField("비밀번호", text: $password)
TextField("OTP 6자리", text: $totp)
if let err = model.loginError {
Text(err).font(.system(size: 11)).foregroundStyle(WT.danger).multilineTextAlignment(.center)
}
Button {
busy = true
Task { await model.login(username: username, password: password, totp: totp); busy = false }
} label: {
if busy { ProgressView() } else { Text("로그인").frame(maxWidth: .infinity) }
}
.tint(WT.accent)
.disabled(busy || username.isEmpty || password.isEmpty)
}
.padding(.horizontal, 4)
}
}
}
+12
View File
@@ -0,0 +1,12 @@
import SwiftUI
/// OLED ( watch-app: --wgreen #37d67a). = OLED ·.
enum WT {
static let bg = Color.black
static let card = Color(white: 0.12)
static let accent = Color(red: 0x37 / 255, green: 0xd6 / 255, blue: 0x7a / 255) // #37d67a
static let ink = Color.white
static let muted = Color(white: 0.62)
static let amber = Color(red: 0xf2 / 255, green: 0xb6 / 255, blue: 0x3c / 255)
static let danger = Color(red: 0xe5 / 255, green: 0x6a / 255, blue: 0x5a / 255)
}
+55
View File
@@ -0,0 +1,55 @@
# DS Apple Watch 앱 (단일 타깃 standalone watchOS, WKApplication). 맥/아이폰은 웹 래퍼로 가고
# 순수 네이티브는 워치 전용(2026-06-15 사용자 결정). 시뮬레이터 빌드·스크린샷으로 검증, 실기기
# 설치는 사용자 Xcode 서명. project.yml = source of truth, *.xcodeproj/Support 는 생성물(gitignore).
name: DSWatch
options:
bundleIdPrefix: net.hyungi
deploymentTarget:
watchOS: "11.0"
createIntermediateGroups: true
minimumXcodeGenVersion: "2.40.0"
settings:
base:
SWIFT_VERSION: "6.0"
SWIFT_STRICT_CONCURRENCY: complete
WATCHOS_DEPLOYMENT_TARGET: "11.0"
CODE_SIGN_STYLE: Automatic
# 실기기 설치 시 Xcode 에서 Signing → 본인 Apple ID 팀 선택하면 자동 서명.
# (헤드리스 시뮬 빌드는 xcodebuild 에 CODE_SIGNING_ALLOWED=NO 를 CLI 로 전달)
targets:
DSWatch:
type: application
platform: watchOS
deploymentTarget: "11.0"
sources:
- path: Sources
settings:
base:
PRODUCT_BUNDLE_IDENTIFIER: net.hyungi.dswatch
PRODUCT_NAME: DS
GENERATE_INFOPLIST_FILE: "NO"
MARKETING_VERSION: "0.1"
CURRENT_PROJECT_VERSION: "1"
TARGETED_DEVICE_FAMILY: "4" # Apple Watch
ASSETCATALOG_COMPILER_APPICON_NAME: AppIcon
info:
path: Support/Info.plist
properties:
CFBundleDisplayName: DS
CFBundleName: DS
CFBundleVersion: "1"
CFBundleShortVersionString: "0.1"
WKApplication: true # 단일 타깃 standalone 워치 앱 (컴패니언 불요)
WKWatchOnly: true # 컴패니언 iOS 앱 없는 watch-only (설치 필수 키)
UISupportedInterfaceOrientations:
- UIInterfaceOrientationPortrait
schemes:
DSWatch:
build:
targets:
DSWatch: all
run:
config: Debug
+9 -13
View File
@@ -1,6 +1,8 @@
# hyungi_Document_Server 설정
ai:
gateway:
endpoint: "http://ai-gateway:8080"
models:
# ─── 단일 generation 호스트 routing (2026-05-14 GPU LLM 제거) ───
@@ -11,7 +13,7 @@ ai:
# triage: 상시 분류·요약·근거 선별. Mac mini Qwen 27B (primary 와 동일 endpoint, 짧은 max_tokens).
triage:
endpoint: "http://100.76.254.116:8890/v1/chat/completions"
endpoint: "http://100.76.254.116:8801/v1/chat/completions"
model: "mlx-community/Qwen3.6-27B-6bit"
max_tokens: 4096
timeout: 480 # 프리필 실측 ~112 tok/s — 120K자 장문 커버 (2026-06-11)
@@ -20,7 +22,7 @@ ai:
# primary: 에스컬레이션 전용. Qwen 27B MLX (맥미니 Semaphore(1) 보호 대상).
primary:
endpoint: "http://100.76.254.116:8890/v1/chat/completions"
endpoint: "http://100.76.254.116:8801/v1/chat/completions"
model: "mlx-community/Qwen3.6-27B-6bit"
max_tokens: 8192
timeout: 900 # 프리필 실측 ~112 tok/s — 260K자 상한 장문 커버 (2026-06-11)
@@ -70,7 +72,7 @@ ai:
# Phase 3.5a answerability classifier. 2026-05-14 GPU LLM 제거 후 Mac mini 26B 로 swap.
# classifier_service 가 hasattr 체크로 optional 이므로 이 섹션 제거 시 classifier gate 는 자동 skip (score-only).
classifier:
endpoint: "http://100.76.254.116:8890/v1/chat/completions"
endpoint: "http://100.76.254.116:8801/v1/chat/completions"
model: "mlx-community/Qwen3.6-27B-6bit" # 2026-06-11 B안 동승 — gemma id 잔존 시 mlx 서버가 Gemma 를 재로드(이중 적재) 위험
max_tokens: 512
timeout: 30 # 2026-05-17: 15s 도 동시 부하 시 elapsed 14.4s 직전이라 tight — 30s 로 2x 마진. classifier_service.LLM_TIMEOUT_MS=30000 와 align (초과 = score-only skip, graceful)
@@ -197,14 +199,8 @@ schedule:
# 이력: 2026-06-11 맥미니 모델 확정까지 8키 홀드 → 同日 Qwen3.6-27B-6bit 전환과 함께 해제([]).
pipeline:
held_stages: []
# mlx gate 동시 실행 상한 (config.mlx_gate_concurrency). 현 mlx_vlm = continuous batching
# (2026-06-11 밤 6~8 concurrent 실측 정상). 2026-06-15: 2→4 — digest/briefing 합성을
# 이 단일 게이트(BACKGROUND 우선순위)로 라우팅하며 digest(클러스터 44~68)가 하드캡 내
# 완료되도록 동시성 확보. ask/eid(FOREGROUND)는 큐 점프라 영향 최소. 되돌리면 구 동작.
# mlx gate 동시 실행 상한 (2026-06-12 fair-share): 구 "1 고정" 룰의 전제(single-inference
# 서버)가 소멸 — 현 mlx_vlm 은 continuous batching (2026-06-11 밤 6~8 concurrent 실측 정상).
# 2 = 워커 LLM 호출과 인터랙티브(ask/eid)가 서로 안 막힘 + 집계 throughput ~1.8배.
# 게이트(상한+우선순위)는 유지 — thundering herd 방지. 1 로 되돌리면 구 동작.
mlx_gate_concurrency: 2
# 2026-06-15: digest/briefing 생성 LLM 파라미터 (모델 교체 후 단일소스, 상세 = config.py).
# 구 하드코딩 25s(빠른 Gemma)가 Qwen 27B(콜당 ~90~300s) 교체 sweep 누락 → digest 600s
# 초과·briefing 4/4 폴백. 동시성은 위 mlx_gate_concurrency 가 담당(별 키 없음).
digest_llm_timeout_s: 300
digest_llm_attempts: 2
digest_pipeline_hard_cap_s: 5400
+32 -24
View File
@@ -54,27 +54,24 @@ services:
start_period: 180s
restart: unless-stopped
# MinerU 2.5 VLM PDFmarkdown 추출 — ★ marker-service 대체(컷오버 2026-06-18, A/B 8/8 PASS).
# 단일카드 markdown VRAM ~10GB(marker)→~5.9GB 고정. fastapi 가 MARKER_ENDPOINT 로 호출.
# 동기 do_parse 버그 회피 위해 server.py 는 async aio_do_parse 사용. 포트 3301.
mineru-service:
build: ./services/mineru
# Phase 1B (2026-05-01): PDFmarkdown 변환. ocr-service 와 별도 컨테이너 (deps 충돌 회피).
marker-service:
build: ./services/marker
ports:
- "127.0.0.1:3301:3301"
- "127.0.0.1:3300:3300"
expose:
- "3301"
- "3300"
environment:
# vlm-engine = 순수 VLM 단일모델. 기본 hybrid-engine 은 다중모델 로드 = OOM(반드시 명시).
- MINERU_BACKEND=vlm-engine
- MINERU_LANG=${MINERU_LANG:-korean}
# 공유 16GB 카드 공존: 절대 VRAM 캡(GB, 공유카드 robust) + vLLM 분율 캡 병용.
- MINERU_VIRTUAL_VRAM_SIZE=${MINERU_VIRTUAL_VRAM_SIZE:-6}
- MINERU_GPU_MEMORY_UTILIZATION=${MINERU_GPU_MEMORY_UTILIZATION:-0.40}
- MINERU_PRELOAD=${MINERU_PRELOAD:-1}
- HF_HOME=/models/huggingface
- TORCH_HOME=/models/torch
# D-1 (crawl-24x7): idle-unload 전환 — 영구 점유(~3.5GB) 해제가 90% 봉투의 전제.
# /ready 는 idle 에서도 200 (fastapi depends_on service_healthy 유지).
# 롤백 = MARKER_PRELOAD=1 + MARKER_IDLE_UNLOAD_MINUTES=0.
- MARKER_PRELOAD=0
- MARKER_IDLE_UNLOAD_MINUTES=${MARKER_IDLE_UNLOAD_MINUTES:-30}
volumes:
- ${NAS_NFS_PATH:-/mnt/nas/Document_Server}:/documents:ro
- mineru_models:/root/.cache
ipc: host # vLLM 공유메모리 — 공식 run 의 --ipc=host 대응.
- marker_models:/models
deploy:
resources:
reservations:
@@ -83,11 +80,11 @@ services:
count: 1
capabilities: [gpu]
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:3301/ready"]
test: ["CMD", "curl", "-f", "http://localhost:3300/ready"]
interval: 30s
timeout: 10s
retries: 3
start_period: 900s # VLM 모델 lazy 다운로드(~2.4GB)+엔진 로드 여유.
start_period: 300s
restart: unless-stopped
stt-service:
@@ -152,7 +149,7 @@ services:
# → 32 한도 초과 → 413. 64 로 늘림.
# GPU VRAM free 6199MiB 충분. baseline path (MAX_RERANK_INPUT=200) 영향 0.
- MAX_BATCH_TOKENS=16384
- MAX_CLIENT_BATCH_SIZE=256 # 2026-06-18 fix: 64→256, MAX_RERANK_INPUT=200 커버 (batch>64 ERROR=RRF silent fallback 해소; MAX_BATCH_TOKENS가 VRAM 상한이라 entries 증가는 VRAM 무관)
- MAX_CLIENT_BATCH_SIZE=64
- MAX_CONCURRENT_REQUESTS=4
volumes:
- reranker_cache:/data
@@ -171,6 +168,19 @@ services:
start_period: 120s
restart: unless-stopped
ai-gateway:
build: ./gpu-server/services/ai-gateway
ports:
- "127.0.0.1:8081:8080"
environment:
- PRIMARY_ENDPOINT=http://100.76.254.116:8801/v1/chat/completions
- FALLBACK_ENDPOINT=http://ollama:11434/v1/chat/completions
- CLAUDE_API_KEY=${CLAUDE_API_KEY:-}
- DAILY_BUDGET_USD=${DAILY_BUDGET_USD:-5.00}
# depends_on: ollama 제거 (2026-06-08) — ollama 서비스가 standalone 으로 이관됨.
# FALLBACK_ENDPOINT 의 ollama:11434 는 standalone(동일 hostname, DS 망 부착)으로 해소.
restart: unless-stopped
fastapi:
build: ./app
ports:
@@ -187,8 +197,7 @@ services:
condition: service_healthy
kordoc-service:
condition: service_healthy
# 마크다운 엔진 = mineru-service (marker-service 제거 2026-06-18, 롤백=git history).
mineru-service:
marker-service:
condition: service_healthy
env_file:
- credentials.env
@@ -196,8 +205,7 @@ services:
- DATABASE_URL=postgresql+asyncpg://pkm:${POSTGRES_PASSWORD}@postgres:5432/pkm
- KORDOC_ENDPOINT=http://kordoc-service:3100
- OCR_ENDPOINT=http://ocr-service:3200
# ★ 컷오버 2026-06-18: marker-service:3300 → mineru-service:3301 (동일 /convert 계약).
- MARKER_ENDPOINT=http://mineru-service:3301
- MARKER_ENDPOINT=http://marker-service:3300
- MARKER_CONTAINER_PATH_PREFIX=/documents
# 2026-05-08 (D9 Track B revised): GPU stt-service 정식 승격, 내부 DNS 사용.
- STT_ENDPOINT=http://stt-service:3300
@@ -275,4 +283,4 @@ volumes:
reranker_cache:
ocr_models:
stt_models:
mineru_models:
marker_models:

Some files were not shown because too many files have changed in this diff Show More