Compare commits
15 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 53a30449e2 | |||
| ab668d7990 | |||
| dcf99b377e | |||
| 3df0ca53ab | |||
| 7cd8cfde0a | |||
| acd595244a | |||
| 34eb5c9411 | |||
| 8e1645dfc9 | |||
| 55216271a6 | |||
| d0994a1bce | |||
| 448195637b | |||
| aeb9290cbd | |||
| 9bf41d1dfc | |||
| 988631fdb6 | |||
| 6c6b350aca |
@@ -663,8 +663,9 @@ class SectionItem(BaseModel):
|
||||
section_title: str | None = None # raw 마크다운 포함 — 정제는 프런트(headingPath.ts)
|
||||
heading_path: str | None = None # raw
|
||||
level: int | None = None
|
||||
node_type: str | None = None # window | section_split | null
|
||||
node_type: str | None = None # window | chapter_split | clause_split | section_split | null
|
||||
is_leaf: bool
|
||||
char_start: int | None = None # md_content 내 heading offset(UTF-16). jump-target 만 값, 그 외 None (Path B)
|
||||
section_type: str | None = None
|
||||
summary: str | None = None # status='summarized' 인 분석행에만, 그 외 None
|
||||
confidence: float | None = None
|
||||
@@ -703,12 +704,12 @@ async def get_document_sections(
|
||||
await session.execute(
|
||||
sql_text(
|
||||
"""
|
||||
SELECT chunk_id, section_title, heading_path, level, node_type, is_leaf,
|
||||
SELECT chunk_id, section_title, heading_path, level, node_type, is_leaf, char_start,
|
||||
section_type, summary, confidence
|
||||
FROM (
|
||||
SELECT DISTINCT ON (c.id)
|
||||
c.id AS chunk_id, c.chunk_index, c.section_title, c.heading_path,
|
||||
c.level, c.node_type, c.is_leaf,
|
||||
c.level, c.node_type, c.is_leaf, c.char_start,
|
||||
a.section_type,
|
||||
CASE WHEN a.status = 'summarized' THEN a.summary ELSE NULL END AS summary,
|
||||
a.confidence
|
||||
@@ -717,7 +718,7 @@ async def get_document_sections(
|
||||
ON a.chunk_id = c.id AND a.status = 'summarized'
|
||||
WHERE c.doc_id = :doc_id
|
||||
AND c.source_type = 'hier_section'
|
||||
AND c.is_leaf = true
|
||||
AND (c.is_leaf = true OR c.node_type LIKE '%\\_split' ESCAPE '\\')
|
||||
ORDER BY c.id, a.created_at DESC, a.id DESC
|
||||
) t
|
||||
ORDER BY t.chunk_index
|
||||
|
||||
@@ -0,0 +1,177 @@
|
||||
"""크롤링 politeness 코어 (A-4, plan crawl-24x7-1)
|
||||
|
||||
개인 아카이빙 권장치를 그대로 박은 공용 fetch 계층:
|
||||
- per-domain 동시성 1 (asyncio.Lock) + 같은 도메인 연속 요청 5–15초 지연 + jitter
|
||||
- robots.txt 존중 (urllib.robotparser, 24h 캐시) — 비로그인 공개 크롤링 한정.
|
||||
로그인 세션 fetch (B-3) 는 사용자 행위 성격이라 robots 대신 사람 속도가 기준.
|
||||
- 정직 식별 UA + 연락처 (익명 크롤링 트랙. 로그인 세션은 브라우저 UA 유지 — B-3)
|
||||
- 429 = Retry-After 존중 / 5xx = 재시도 가능 / 403 = 차단 신호 (호출측 circuit 연동)
|
||||
|
||||
도메인별 마지막 요청 시각 등 rate 상태는 in-process (영속 워터마크는 DB — news_sources).
|
||||
SSRF 차단은 core.url_validator.validate_feed_url 재사용 (redirect target 재검증 포함).
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import random
|
||||
import time
|
||||
import urllib.robotparser
|
||||
from urllib.parse import urljoin, urlparse
|
||||
|
||||
import httpx
|
||||
|
||||
from core.url_validator import validate_feed_url
|
||||
from core.utils import setup_logger
|
||||
|
||||
# bare getLogger 는 root(WARNING) 상속이라 INFO 대기/차단 로그가 드랍됨 — 타 워커와 동일 설정
|
||||
logger = setup_logger("crawl_politeness")
|
||||
|
||||
# 정직 식별 UA + 연락처 — 차단 전 연락 통로 (A-4)
|
||||
CRAWL_UA = "HyungiPKM-Archiver/1.0 (personal archive; +mailto:hyun49196@gmail.com)"
|
||||
|
||||
# 같은 도메인 연속 요청 간격 (초) — 권장치 5–15s + jitter
|
||||
_DOMAIN_DELAY_MIN = 5.0
|
||||
_DOMAIN_DELAY_MAX = 15.0
|
||||
|
||||
_ROBOTS_CACHE_TTL = 24 * 3600 # 24h
|
||||
_MAX_PAGE_BYTES = 5 * 1024 * 1024 # 피드 fetch 와 동일 5MB cap
|
||||
_PAGE_TIMEOUT = 20.0
|
||||
_MAX_REDIRECTS = 3
|
||||
|
||||
_HTML_CONTENT_TYPES = ("text/html", "application/xhtml+xml")
|
||||
|
||||
|
||||
class CrawlFetchError(Exception):
|
||||
"""일시 오류 (5xx / timeout / 네트워크) — 큐 재시도 대상."""
|
||||
|
||||
|
||||
class CrawlBlocked(Exception):
|
||||
"""차단 신호 (403 / 429 / robots disallow) — 재시도보다 backoff/circuit 대상."""
|
||||
|
||||
|
||||
class CrawlSkip(Exception):
|
||||
"""영구 비대상 (비-HTML / 크기 초과 / SSRF 차단 / 4xx) — 격하 처리 대상."""
|
||||
|
||||
|
||||
# 도메인별 직렬화 상태 (in-process)
|
||||
_domain_locks: dict[str, asyncio.Lock] = {}
|
||||
_domain_last_request: dict[str, float] = {}
|
||||
# host → (cached_at, RobotFileParser | None). None = robots 없음/4xx (전부 허용)
|
||||
_robots_cache: dict[str, tuple[float, urllib.robotparser.RobotFileParser | None]] = {}
|
||||
|
||||
|
||||
def _domain_of(url: str) -> str:
|
||||
return (urlparse(url).hostname or "").lower()
|
||||
|
||||
|
||||
def _get_lock(domain: str) -> asyncio.Lock:
|
||||
if domain not in _domain_locks:
|
||||
_domain_locks[domain] = asyncio.Lock()
|
||||
return _domain_locks[domain]
|
||||
|
||||
|
||||
async def _respect_domain_rate(domain: str) -> None:
|
||||
"""같은 도메인 직전 요청에서 5–15초(jitter) 경과할 때까지 대기."""
|
||||
last = _domain_last_request.get(domain)
|
||||
if last is not None:
|
||||
delay = random.uniform(_DOMAIN_DELAY_MIN, _DOMAIN_DELAY_MAX)
|
||||
wait = last + delay - time.monotonic()
|
||||
if wait > 0:
|
||||
# silent sleep 금지 — politeness 동작 검증·운영 관찰 가시성
|
||||
logger.info("[politeness] %s %.1fs 대기", domain, wait)
|
||||
await asyncio.sleep(wait)
|
||||
|
||||
|
||||
async def _fetch_robots(client: httpx.AsyncClient, scheme: str, host: str):
|
||||
"""robots.txt 조회. 4xx/부재 = 전부 허용(None), 5xx/오류 = 보수적으로 이번 사이클 차단."""
|
||||
robots_url = f"{scheme}://{host}/robots.txt"
|
||||
try:
|
||||
resp = await client.get(robots_url, headers={"User-Agent": CRAWL_UA})
|
||||
except httpx.HTTPError as e:
|
||||
raise CrawlFetchError(f"robots.txt 조회 실패: {host}: {e}") from e
|
||||
if resp.status_code >= 500:
|
||||
# 5xx 는 의도 불명 — 표준 관행대로 이번 사이클은 차단 취급
|
||||
raise CrawlFetchError(f"robots.txt 5xx: {host}: {resp.status_code}")
|
||||
if resp.status_code >= 400:
|
||||
return None # robots 없음 = 전부 허용
|
||||
rp = urllib.robotparser.RobotFileParser()
|
||||
rp.parse(resp.text.splitlines())
|
||||
return rp
|
||||
|
||||
|
||||
async def _robots_allows(client: httpx.AsyncClient, url: str) -> bool:
|
||||
parsed = urlparse(url)
|
||||
host = (parsed.hostname or "").lower()
|
||||
cached = _robots_cache.get(host)
|
||||
if cached is None or time.monotonic() - cached[0] > _ROBOTS_CACHE_TTL:
|
||||
rp = await _fetch_robots(client, parsed.scheme or "https", host)
|
||||
_robots_cache[host] = (time.monotonic(), rp)
|
||||
cached = _robots_cache[host]
|
||||
rp = cached[1]
|
||||
if rp is None:
|
||||
return True
|
||||
return rp.can_fetch(CRAWL_UA, url)
|
||||
|
||||
|
||||
async def fetch_page(url: str, *, check_robots: bool = True) -> tuple[str, str]:
|
||||
"""공개 페이지 1건 politeness fetch. (html_text, final_url) 반환.
|
||||
|
||||
- SSRF 검증 (redirect target 포함, news_collector 피드 fetch 와 동일 이중 검증)
|
||||
- per-domain 동시성 1 + 5–15s jitter 지연
|
||||
- 429: Retry-After 로그 후 CrawlBlocked / 403: CrawlBlocked / 그 외 4xx: CrawlSkip
|
||||
- 5xx/timeout: CrawlFetchError (큐 재시도)
|
||||
- 비-HTML content-type / 5MB 초과: CrawlSkip
|
||||
"""
|
||||
try:
|
||||
validate_feed_url(url)
|
||||
except ValueError as e:
|
||||
raise CrawlSkip(f"URL 검증 실패: {e}") from e
|
||||
|
||||
domain = _domain_of(url)
|
||||
async with _get_lock(domain):
|
||||
await _respect_domain_rate(domain)
|
||||
try:
|
||||
async with httpx.AsyncClient(
|
||||
timeout=_PAGE_TIMEOUT, follow_redirects=False,
|
||||
headers={"User-Agent": CRAWL_UA},
|
||||
) as client:
|
||||
if check_robots and not await _robots_allows(client, url):
|
||||
raise CrawlBlocked(f"robots.txt disallow: {url}")
|
||||
|
||||
resp = await client.get(url)
|
||||
redirects = 0
|
||||
while resp.is_redirect and redirects < _MAX_REDIRECTS:
|
||||
location = urljoin(str(resp.request.url), resp.headers.get("location", ""))
|
||||
try:
|
||||
validate_feed_url(location)
|
||||
except ValueError as e:
|
||||
raise CrawlSkip(f"redirect target 차단: {e}") from e
|
||||
# redirect 도 같은 도메인 연속 요청 — 간격은 lock 보유로 충분 (즉시 1회)
|
||||
resp = await client.get(location)
|
||||
redirects += 1
|
||||
if resp.is_redirect:
|
||||
raise CrawlSkip(f"redirect {_MAX_REDIRECTS}회 초과: {url}")
|
||||
except httpx.TimeoutException as e:
|
||||
raise CrawlFetchError(f"timeout: {url}") from e
|
||||
except httpx.HTTPError as e:
|
||||
raise CrawlFetchError(f"네트워크 오류: {url}: {e}") from e
|
||||
finally:
|
||||
_domain_last_request[domain] = time.monotonic()
|
||||
|
||||
if resp.status_code == 429:
|
||||
retry_after = resp.headers.get("retry-after", "")
|
||||
logger.warning("[politeness] 429 %s (Retry-After=%s)", domain, retry_after or "-")
|
||||
raise CrawlBlocked(f"429 rate limited: {url} (Retry-After={retry_after or '-'})")
|
||||
if resp.status_code == 403:
|
||||
raise CrawlBlocked(f"403 forbidden: {url}")
|
||||
if resp.status_code >= 500:
|
||||
raise CrawlFetchError(f"{resp.status_code}: {url}")
|
||||
if resp.status_code >= 400:
|
||||
raise CrawlSkip(f"{resp.status_code}: {url}")
|
||||
|
||||
ct = resp.headers.get("content-type", "").lower()
|
||||
if ct and not any(t in ct for t in _HTML_CONTENT_TYPES):
|
||||
raise CrawlSkip(f"비-HTML content-type: {ct}: {url}")
|
||||
if len(resp.content) > _MAX_PAGE_BYTES:
|
||||
raise CrawlSkip(f"크기 초과: {len(resp.content)} bytes: {url}")
|
||||
|
||||
return resp.text, str(resp.request.url)
|
||||
@@ -106,33 +106,3 @@ END:VCALENDAR"""
|
||||
except Exception as e:
|
||||
logging.getLogger("caldav").error(f"CalDAV VTODO 생성 실패: {e}")
|
||||
return None
|
||||
|
||||
|
||||
# ─── SMTP 헬퍼 ───
|
||||
|
||||
|
||||
def send_smtp_email(
|
||||
host: str,
|
||||
port: int,
|
||||
username: str,
|
||||
password: str,
|
||||
subject: str,
|
||||
body: str,
|
||||
to_addr: str | None = None,
|
||||
):
|
||||
"""Synology MailPlus SMTP로 이메일 발송"""
|
||||
import smtplib
|
||||
from email.mime.text import MIMEText
|
||||
|
||||
to_addr = to_addr or username
|
||||
msg = MIMEText(body, "plain", "utf-8")
|
||||
msg["Subject"] = subject
|
||||
msg["From"] = username
|
||||
msg["To"] = to_addr
|
||||
|
||||
try:
|
||||
with smtplib.SMTP_SSL(host, port, timeout=30) as server:
|
||||
server.login(username, password)
|
||||
server.send_message(msg)
|
||||
except Exception as e:
|
||||
logging.getLogger("smtp").error(f"SMTP 발송 실패: {e}")
|
||||
|
||||
@@ -54,6 +54,7 @@ async def lifespan(app: FastAPI):
|
||||
from workers.law_monitor import run as law_monitor_run
|
||||
from workers.mailplus_archive import run as mailplus_run
|
||||
from workers.news_collector import run as news_collector_run
|
||||
from workers.fulltext_worker import reconcile_unresolved as fulltext_reconcile_run
|
||||
from workers.queue_consumer import consume_queue, consume_markdown_queue
|
||||
from workers.study_queue_consumer import consume_study_queue
|
||||
from workers.study_session_queue_consumer import consume_study_session_queue
|
||||
@@ -121,6 +122,9 @@ async def lifespan(app: FastAPI):
|
||||
# 이드 W3-2: 공부중 토픽 약점 derived 스냅샷 (nightly 04:30 KST, LLM 0). study_diagnosis 표면 source.
|
||||
scheduler.add_job(study_weakness_run, CronTrigger(hour=4, minute=30, timezone=KST), id="study_weakness")
|
||||
scheduler.add_job(news_collector_run, "interval", hours=6, id="news_collector")
|
||||
# crawl-24x7 A-2 안전망: fulltext 영구 실패(3회 소진) 문서를 RSS 요약 기준으로
|
||||
# 후속 enqueue (silent skip 누적 방지). 03:40 = dedup_reconcile(03:30) 직후 비충돌 슬롯.
|
||||
scheduler.add_job(fulltext_reconcile_run, CronTrigger(hour=3, minute=40, timezone=KST), id="fulltext_reconcile")
|
||||
# plan ds-s1-backend-1 B-4: dedup 컬럼(duplicate_of/duplicate_count) 야간 절대 재계산.
|
||||
# soft-delete 잔여 드리프트 정리(멱등, 드리프트 없으면 no-op). cron 03:30 (다른 잡과 비충돌).
|
||||
scheduler.add_job(dedup_reconcile_run, CronTrigger(hour=3, minute=30, timezone=KST), id="dedup_reconcile")
|
||||
|
||||
@@ -118,7 +118,7 @@ class Document(Base):
|
||||
source_channel: Mapped[str | None] = mapped_column(
|
||||
Enum("law_monitor", "devonagent", "email", "web_clip",
|
||||
"tksafety", "inbox_route", "manual", "drive_sync", "news", "memo",
|
||||
"voice", "hermes",
|
||||
"voice", "hermes", "crawl",
|
||||
name="source_channel")
|
||||
)
|
||||
# 외부 채널 (Hermes Discord 등) 의 channel/user/message_id/timestamp 메타.
|
||||
|
||||
@@ -2,7 +2,8 @@
|
||||
|
||||
from datetime import datetime
|
||||
|
||||
from sqlalchemy import Boolean, DateTime, String, Text
|
||||
from sqlalchemy import Boolean, DateTime, Integer, String, Text
|
||||
from sqlalchemy.dialects.postgresql import JSONB
|
||||
from sqlalchemy.orm import Mapped, mapped_column
|
||||
|
||||
from core.database import Base
|
||||
@@ -23,3 +24,22 @@ class NewsSource(Base):
|
||||
created_at: Mapped[datetime] = mapped_column(
|
||||
DateTime(timezone=True), default=datetime.now
|
||||
)
|
||||
|
||||
# ── A-3 (plan crawl-24x7-1) 레지스트리 증축 — migration 319 ──
|
||||
# fetch_method: rss / rss+page / sitemap+page / page / api / signal-only
|
||||
fetch_method: Mapped[str] = mapped_column(String(20), default="rss")
|
||||
# fulltext_policy: none(현행) / page(기사 페이지 fetch 후 4-tier 승격) / feed-full(피드 본문이 전문)
|
||||
fulltext_policy: Mapped[str] = mapped_column(String(20), default="none")
|
||||
# NULL=공개, 값=구독 세션 키 (B-3 Playwright 어댑터 슬롯)
|
||||
auth_profile: Mapped[str | None] = mapped_column(String(50))
|
||||
# 소스별 차등 폴링 (NULL=전역 6h 사이클)
|
||||
poll_interval_minutes: Mapped[int | None] = mapped_column(Integer)
|
||||
# 조건부 GET 워터마크 — 서버가 준 값 그대로 저장·재전송 (A-1)
|
||||
etag: Mapped[str | None] = mapped_column(Text)
|
||||
last_modified: Mapped[str | None] = mapped_column(Text)
|
||||
# CDN ETag 회전 대비 콘텐츠 해시 변경감지 병행 (A-1)
|
||||
feed_content_hash: Mapped[str | None] = mapped_column(String(64))
|
||||
# 추출 실패 잦은 소스의 site-specific CSS selector (A-2)
|
||||
selector_override: Mapped[dict | None] = mapped_column(JSONB)
|
||||
# rdf / table-strip / gn-redirect 등 파서 특이 케이스 (B-5)
|
||||
parser_quirk: Mapped[str | None] = mapped_column(String(30))
|
||||
|
||||
+2
-1
@@ -18,10 +18,11 @@ class ProcessingQueue(Base):
|
||||
stage: Mapped[str] = mapped_column(
|
||||
# 'stt' (audio): migration 150 / 'thumbnail' (video): queue_consumer 가 enqueue.
|
||||
# 'deep_summary' (PR-B B-1): classify_worker 가 에스컬레이션 시 enqueue.
|
||||
# 'fulltext' (crawl-24x7 A-2): migration 321 — 기사 페이지 fetch 후 본문 승격.
|
||||
# DB enum 변경은 마이그레이션이 처리하므로 create_type=False.
|
||||
Enum(
|
||||
"extract", "classify", "summarize", "embed", "chunk", "preview",
|
||||
"stt", "thumbnail", "deep_summary", "markdown",
|
||||
"stt", "thumbnail", "deep_summary", "markdown", "fulltext",
|
||||
name="process_stage",
|
||||
create_type=False,
|
||||
),
|
||||
|
||||
@@ -0,0 +1,36 @@
|
||||
"""source_health 테이블 ORM (A-5, plan crawl-24x7-1)
|
||||
|
||||
news_sources 와 1:1. 소스별 fetch 성공/실패 기록 + circuit breaker 상태.
|
||||
silent skip 누적 방지의 가시성 기반 — A-8 헬스 패널이 읽는다.
|
||||
"""
|
||||
|
||||
from datetime import datetime
|
||||
|
||||
from sqlalchemy import BigInteger, DateTime, ForeignKey, Integer, String, Text
|
||||
from sqlalchemy.orm import Mapped, mapped_column
|
||||
|
||||
from core.database import Base
|
||||
|
||||
|
||||
class SourceHealth(Base):
|
||||
__tablename__ = "source_health"
|
||||
|
||||
id: Mapped[int] = mapped_column(primary_key=True)
|
||||
source_id: Mapped[int] = mapped_column(
|
||||
Integer, ForeignKey("news_sources.id", ondelete="CASCADE"), nullable=False
|
||||
)
|
||||
consecutive_failures: Mapped[int] = mapped_column(Integer, default=0)
|
||||
total_fetches: Mapped[int] = mapped_column(BigInteger, default=0)
|
||||
total_failures: Mapped[int] = mapped_column(BigInteger, default=0)
|
||||
last_success_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
|
||||
last_error: Mapped[str | None] = mapped_column(Text)
|
||||
last_error_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
|
||||
last_fetch_items: Mapped[int | None] = mapped_column(Integer)
|
||||
# 200 인데 entries 0 인 연속 fetch 횟수 (304/해시동일은 미집계 — 피드 부패 신호 전용)
|
||||
empty_streak: Mapped[int] = mapped_column(Integer, default=0)
|
||||
# closed(정상) / open(연속 실패 → 지수 backoff) / disabled(임계 초과, 수동 복구 대상)
|
||||
circuit_state: Mapped[str] = mapped_column(String(10), default="closed")
|
||||
circuit_opened_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
|
||||
updated_at: Mapped[datetime] = mapped_column(
|
||||
DateTime(timezone=True), default=datetime.now
|
||||
)
|
||||
+10
-3
@@ -17,10 +17,17 @@ python-multipart>=0.0.9
|
||||
jinja2>=3.1.0
|
||||
feedparser>=6.0.0
|
||||
pymupdf>=1.24.0
|
||||
# Web/Blog ingest (devonagent 트랙) — HTML 본문 정화 4-tier fallback
|
||||
trafilatura>=1.12.0
|
||||
# Web/Blog ingest (devonagent 트랙) + 뉴스 fulltext 승격 (crawl-24x7 A-2) — 4-tier fallback.
|
||||
# trafilatura 는 단일 메인테이너 리스크로 exact pin (A-2 결정).
|
||||
trafilatura==2.1.0
|
||||
readability-lxml>=0.8.1
|
||||
markdownify>=0.13.1
|
||||
# office OOXML(docx/xlsx/pptx) → md (plan ds-s1-backend-1 C-1). hwp 는 LibreOffice+markdownify 경로.
|
||||
# tier-4 (bs4) 가 직접 import — 전이 의존 가정 제거 (crawl-24x7 A-2)
|
||||
beautifulsoup4>=4.12.0
|
||||
# office OOXML(docx/xlsx/pptx) → md (plan ds-s1-backend-1 C-1).
|
||||
# 정확한 핀은 E-1 markitdown OOXML PoC(devsbx/버전핀 컨텍스트)에서 확정.
|
||||
markitdown[docx,xlsx,pptx]>=0.1.0
|
||||
# .hwp(HWP5 binary) → md: 순수 Python HWP5 전용 변환기(CLI hwp5html). LibreOffice 번들 libhwplo
|
||||
# 필터가 실제 한컴 HWP5 를 못 읽어 전건 실패 → pyhwp 로 교체(2026-06-09). six = pyhwp 의 미선언 런타임 의존성.
|
||||
pyhwp>=0.1b15
|
||||
six>=1.16.0
|
||||
|
||||
@@ -13,6 +13,7 @@
|
||||
from __future__ import annotations
|
||||
import re
|
||||
import hashlib
|
||||
import unicodedata
|
||||
from dataclasses import dataclass, field
|
||||
|
||||
STRUCTURE_SPLIT_THRESHOLD = 4000
|
||||
@@ -27,6 +28,17 @@ _KO_JEOL = re.compile(r'^\s*(?P<title>제\s*\d+\s*절\b.*)$')
|
||||
_KO_JO = re.compile(r'^\s*(?P<title>제\s*\d+\s*조\b.*)$')
|
||||
_ENG = re.compile(r'^\s*(?P<title>(?:Chapter|Section|Article|Part|PART)\s+[\dIVXLA-Z]+\b.*)$')
|
||||
|
||||
# 코드펜스 경계 (FE outlineAnchors.ts:60 `/^\s{0,3}(```|~~~)/` 와 동일). 펜스 내부 라인은
|
||||
# heading 미탐지 — 코드블록 안 '# foo' 가 가짜 절을 만들지 않게(O3).
|
||||
_FENCE = re.compile(r'^\s{0,3}(```|~~~)')
|
||||
|
||||
|
||||
def _utf16_units(s: str) -> int:
|
||||
"""JS 문자열 .length(= UTF-16 code unit 수) 와 동일. astral(BMP 밖)=surrogate pair=2 units.
|
||||
FE 의 `raw.length` / `out.slice(off)` 가 UTF-16 code unit 단위라 char_start 도 같은 단위여야 함.
|
||||
len(s.encode('utf-16-le'))//2 = code unit 수 (utf-16-le 는 BOM 미부착)."""
|
||||
return len(s.encode("utf-16-le")) // 2
|
||||
|
||||
|
||||
@dataclass
|
||||
class HierNode:
|
||||
@@ -39,6 +51,9 @@ class HierNode:
|
||||
text: str
|
||||
is_leaf: bool = True
|
||||
chunk_content_hash: str = field(default="")
|
||||
# md_content 내 heading 라인 시작 offset(UTF-16 code unit). jump-target(비-window leaf / %_split parent)만
|
||||
# 값 보유; window-child / preamble(title None) = None(점프 타깃 아님, g0-t2/g2-t3).
|
||||
char_start: int | None = None
|
||||
|
||||
def finalize_hash(self):
|
||||
self.chunk_content_hash = hashlib.sha256(self.text.encode("utf-8")).hexdigest()
|
||||
@@ -57,33 +72,64 @@ def _detect_heading(line: str) -> tuple[int, str, str] | None:
|
||||
return None
|
||||
|
||||
|
||||
def _segment(text: str) -> list[tuple[int, str | None, str | None, str]]:
|
||||
"""heading 경계로 분할 → [(level, title, node_type, segment_text), ...].
|
||||
def _segment(text: str) -> list[tuple[int, str | None, str | None, str, int | None]]:
|
||||
"""heading 경계로 분할 → [(level, title, node_type, segment_text, char_start), ...].
|
||||
|
||||
preamble(첫 heading 이전 본문) = (0, None, None, text).
|
||||
라인 모델 = FE outlineAnchors.ts:55-65 와 동일: `text.split('\n')` + UTF-16 code-unit offset +
|
||||
코드펜스 추적(splitlines(keepends=True) 폐기 — JS 와 라인경계 \v\f\x1c… 7종을 다르게 쪼개는 문제 제거).
|
||||
char_start = 그 segment 첫 라인(=heading 라인)의 UTF-16 offset. preamble = None(점프 타깃 아님).
|
||||
node.text 보존(라인모델 변경에 hash-neutral): 그룹을 '\n'.join 하되 마지막 그룹이 아니면 분리용 '\n'
|
||||
을 그 그룹 끝에 되돌려 붙여(= splitlines(keepends) 가 마지막 라인에 \n 을 남기던 동작) 원문과 동일.
|
||||
CR 미strip(CRLF 면 '\r' 잔류 → FE raw.length 와 동일), NFC 무변환.
|
||||
"""
|
||||
lines = text.splitlines(keepends=True)
|
||||
segs: list[tuple[int, str | None, str | None, list[str]]] = []
|
||||
cur: tuple[int, str | None, str | None, list[str]] | None = None
|
||||
preamble: list[str] = []
|
||||
for ln in lines:
|
||||
h = _detect_heading(ln.rstrip("\n"))
|
||||
if h:
|
||||
if cur is not None:
|
||||
segs.append(cur)
|
||||
elif preamble and "".join(preamble).strip():
|
||||
segs.append((0, None, None, preamble))
|
||||
cur = (h[0], h[1], h[2], [ln])
|
||||
raw_lines = text.split("\n")
|
||||
n = len(raw_lines)
|
||||
# 라인별 (offset, heading) 선계산 — 펜스 내부/경계 라인은 heading 미탐지.
|
||||
offs: list[int] = []
|
||||
headings: list[tuple[int, str, str | None] | None] = []
|
||||
off = 0
|
||||
in_fence = False
|
||||
for raw in raw_lines:
|
||||
fence_toggle = bool(_FENCE.match(raw))
|
||||
fenced_here = in_fence or fence_toggle
|
||||
offs.append(off)
|
||||
headings.append(None if fenced_here else _detect_heading(raw))
|
||||
if fence_toggle:
|
||||
in_fence = not in_fence
|
||||
off += _utf16_units(raw) + 1 # '\n'
|
||||
|
||||
# 그룹 경계 = 첫 heading 이전(preamble) + 각 heading 라인. (start_idx, meta) 리스트.
|
||||
first_heading = next((i for i in range(n) if headings[i] is not None), None)
|
||||
starts: list[int] = []
|
||||
metas: list[tuple[int, str | None, str | None] | None] = []
|
||||
if first_heading is None:
|
||||
starts.append(0)
|
||||
metas.append(None) # 전체 = preamble
|
||||
else:
|
||||
if first_heading > 0:
|
||||
starts.append(0)
|
||||
metas.append(None)
|
||||
for i in range(first_heading, n):
|
||||
h = headings[i]
|
||||
if h is not None:
|
||||
starts.append(i)
|
||||
metas.append((h[0], h[1], h[2]))
|
||||
|
||||
segs: list[tuple[int, str | None, str | None, str, int | None]] = []
|
||||
for gi, s_idx in enumerate(starts):
|
||||
e_idx = starts[gi + 1] if gi + 1 < len(starts) else n
|
||||
seg_text = "\n".join(raw_lines[s_idx:e_idx])
|
||||
if e_idx < n:
|
||||
seg_text += "\n" # 분리용 '\n' 을 앞 그룹에 귀속(splitlines keepends 동치)
|
||||
meta = metas[gi]
|
||||
if meta is None:
|
||||
if not seg_text.strip(): # 빈 preamble 폐기(기존 동작)
|
||||
continue
|
||||
segs.append((0, None, None, seg_text, None))
|
||||
else:
|
||||
if cur is None:
|
||||
preamble.append(ln)
|
||||
else:
|
||||
cur[3].append(ln)
|
||||
if cur is not None:
|
||||
segs.append(cur)
|
||||
elif preamble and "".join(preamble).strip():
|
||||
segs.append((0, None, None, preamble))
|
||||
return [(lvl, title, nt, "".join(body)) for (lvl, title, nt, body) in segs]
|
||||
lvl, title, nt = meta
|
||||
segs.append((lvl, title, nt, seg_text, offs[s_idx]))
|
||||
return segs
|
||||
|
||||
|
||||
def _window_split(body: str, target: int) -> list[str]:
|
||||
@@ -139,7 +185,7 @@ def build_hier_tree(
|
||||
chain.append(title)
|
||||
return " > ".join(chain) if chain else None
|
||||
|
||||
for lvl, title, nt, body in segs:
|
||||
for lvl, title, nt, body, cstart in segs:
|
||||
norm = 0 if lvl == 0 else min(level_map[lvl], max_depth)
|
||||
# 부모 = 스택에서 norm 보다 작은 가장 가까운 노드
|
||||
while stack and stack[-1][0] >= norm:
|
||||
@@ -147,8 +193,11 @@ def build_hier_tree(
|
||||
parent_idx = stack[-1][1] if stack else None
|
||||
idx = len(nodes)
|
||||
hp = _heading_path(parent_idx, title)
|
||||
# char_start = 생성 시점 할당(window-split 가 n.text 를 heading 라인으로 truncate 하기 전에 박제).
|
||||
# split-parent 가 돼도 이 값(heading 라인 offset)이 windowed section 단일 jump target 으로 보존된다.
|
||||
node = HierNode(idx=idx, parent_idx=parent_idx, level=norm, node_type=nt,
|
||||
section_title=title, heading_path=hp, text=body, is_leaf=True)
|
||||
section_title=title, heading_path=hp, text=body, is_leaf=True,
|
||||
char_start=cstart)
|
||||
nodes.append(node)
|
||||
if norm > 0:
|
||||
stack.append((norm, idx))
|
||||
@@ -178,14 +227,17 @@ def build_hier_tree(
|
||||
n.is_leaf = False
|
||||
heading_line = (n.text.splitlines() or [""])[0]
|
||||
n.text = heading_line # 중복 저장 회피 (full body 는 window child 가 보유)
|
||||
n.node_type = (n.node_type or "section") + "_split"
|
||||
n.node_type = (n.node_type or "section") + "_split" # chapter_split/clause_split/section_split
|
||||
# n.char_start 보존 = windowed section 의 단일 jump target(생성시점 heading offset).
|
||||
base_level = min(n.level + 1, max_depth)
|
||||
for wtext in wins:
|
||||
ci = len(final)
|
||||
# window child = char_start None(_window_split 가 whitespace buf 를 drop 해
|
||||
# char-preserving 이 아니므로 합산 offset 이 거짓; 점프 타깃도 아님, B1/#1).
|
||||
final.append(HierNode(
|
||||
idx=ci, parent_idx=n.idx, level=base_level, node_type="window",
|
||||
section_title=n.section_title, heading_path=n.heading_path,
|
||||
text=wtext, is_leaf=True))
|
||||
text=wtext, is_leaf=True, char_start=None))
|
||||
for n in final:
|
||||
n.finalize_hash()
|
||||
return final
|
||||
@@ -209,6 +261,24 @@ def coverage_stats(text: str, nodes: list[HierNode]) -> dict:
|
||||
# 일반 네비: 자식 level > 부모 level 만 보장
|
||||
if n.level <= nodes[n.parent_idx].level and nodes[n.parent_idx].level > 0:
|
||||
bad_level += 1
|
||||
# char_start O5 검증 (UTF-16 슬라이스 == heading 라인) + NFC telemetry (g2-t4).
|
||||
# 검증은 FE 가 실제 쓰는 방식과 동일: md.encode('utf-16-le')[2*cs:2*(cs+n)].decode == heading_line
|
||||
# (Python code-point 슬라이스 md[cs:cs+n] 가 아님 — astral 시 어긋남).
|
||||
md_u16 = text.encode("utf-16-le")
|
||||
cs_total = cs_verified = 0
|
||||
for n in nodes:
|
||||
if n.char_start is None:
|
||||
continue
|
||||
cs_total += 1
|
||||
first_line = n.text.split("\n", 1)[0]
|
||||
nu = _utf16_units(first_line)
|
||||
seg = md_u16[2 * n.char_start: 2 * (n.char_start + nu)]
|
||||
try:
|
||||
if seg.decode("utf-16-le") == first_line:
|
||||
cs_verified += 1
|
||||
except UnicodeDecodeError:
|
||||
pass
|
||||
non_nfc = 1 if unicodedata.normalize("NFC", text) != text else 0
|
||||
return {
|
||||
"nodes": len(nodes), "leaves": len(leaves),
|
||||
"coverage_ratio": round(leaf_chars / base, 4) if base else 0,
|
||||
@@ -217,4 +287,6 @@ def coverage_stats(text: str, nodes: list[HierNode]) -> dict:
|
||||
"level_dist": {l: sum(1 for n in nodes if n.level == l) for l in sorted({n.level for n in nodes})},
|
||||
"leaf_len_min": min((len(n.text) for n in leaves), default=0),
|
||||
"leaf_len_max": max((len(n.text) for n in leaves), default=0),
|
||||
"char_start_total": cs_total, "char_start_verified": cs_verified,
|
||||
"non_nfc": non_nfc,
|
||||
}
|
||||
|
||||
@@ -58,16 +58,16 @@ async def persist_hier_tree(
|
||||
INSERT INTO document_chunks
|
||||
(doc_id, chunk_index, chunk_type, section_title, heading_path, domain_category,
|
||||
text, embedding, source_type, chunker_version, chunk_content_hash,
|
||||
parent_id, level, node_type, is_leaf, in_corpus)
|
||||
parent_id, level, node_type, is_leaf, in_corpus, char_start)
|
||||
VALUES (:d, :ci, :ct, :stt, :hp, :dc, :tx,
|
||||
cast(cast(:emb AS text) AS vector),
|
||||
:src, :cv, :hash, :pid, :lvl, :nt, :leaf, false)
|
||||
:src, :cv, :hash, :pid, :lvl, :nt, :leaf, false, :cs)
|
||||
RETURNING id"""), {
|
||||
"d": doc_id, "ci": base + n.idx, "ct": chunk_type,
|
||||
"stt": n.section_title, "hp": n.heading_path, "dc": domain_category,
|
||||
"tx": n.text, "emb": emb_str, "src": SOURCE_TYPE, "cv": CHUNKER_VERSION,
|
||||
"hash": n.chunk_content_hash, "pid": parent_db, "lvl": n.level,
|
||||
"nt": n.node_type, "leaf": n.is_leaf})
|
||||
"nt": n.node_type, "leaf": n.is_leaf, "cs": n.char_start})
|
||||
idx_to_dbid[n.idx] = db_id
|
||||
await session.commit()
|
||||
|
||||
|
||||
@@ -1,10 +1,10 @@
|
||||
"""일일 다이제스트 워커 — PostgreSQL/CalDAV 쿼리 → Markdown + SMTP
|
||||
"""일일 다이제스트 워커 — PostgreSQL/CalDAV 쿼리 → Markdown 생성
|
||||
|
||||
v1 scripts/pkm_daily_digest.py에서 포팅.
|
||||
DEVONthink/OmniFocus → PostgreSQL/CalDAV 쿼리로 전환.
|
||||
SMTP 발송은 2026-06-10 제거 (한 번도 전달 성공한 적 없는 기능 — 폐기 결정).
|
||||
"""
|
||||
|
||||
import os
|
||||
from datetime import datetime, timezone
|
||||
from zoneinfo import ZoneInfo
|
||||
from pathlib import Path
|
||||
@@ -13,7 +13,7 @@ from sqlalchemy import func, select, text
|
||||
|
||||
from core.config import settings
|
||||
from core.database import async_session
|
||||
from core.utils import send_smtp_email, setup_logger
|
||||
from core.utils import setup_logger
|
||||
from models.document import Document
|
||||
from models.queue import ProcessingQueue
|
||||
|
||||
@@ -133,16 +133,4 @@ async def run():
|
||||
if old.stat().st_mtime < cutoff:
|
||||
old.rename(archive_dir / old.name)
|
||||
|
||||
# ─── SMTP 발송 ───
|
||||
smtp_host = os.getenv("MAILPLUS_HOST", "")
|
||||
smtp_port = int(os.getenv("MAILPLUS_SMTP_PORT", "465"))
|
||||
smtp_user = os.getenv("MAILPLUS_USER", "")
|
||||
smtp_pass = os.getenv("MAILPLUS_PASS", "")
|
||||
if smtp_host and smtp_user:
|
||||
send_smtp_email(
|
||||
smtp_host, smtp_port, smtp_user, smtp_pass,
|
||||
f"PKM 다이제스트 — {date_display}",
|
||||
markdown,
|
||||
)
|
||||
|
||||
logger.info(f"다이제스트 생성 완료: {digest_path}")
|
||||
|
||||
@@ -0,0 +1,226 @@
|
||||
"""fulltext 승격 워커 (A-2 + A-7, plan crawl-24x7-1)
|
||||
|
||||
news_collector 가 fulltext_policy='page' 소스의 기사에 enqueue 한 'fulltext' stage 를 소비:
|
||||
기사 페이지 politeness fetch (A-4) → 원본 HTML NAS gzip 보존 (A-7)
|
||||
→ extract_worker 4-tier 재사용 (tier 2 sibling .md 는 디스크 원본이 없어 비적용)
|
||||
→ extracted_text/md_content 승격 → summarize + (30일 게이트) embed/chunk enqueue.
|
||||
|
||||
실패 처리 (큐 어휘 = DB enum, 분기만 워커):
|
||||
- 일시 오류 (5xx/timeout) : raise → 큐 재시도 (max_attempts 3)
|
||||
- 차단/비대상 (403/429/robots/비HTML/추출부족): RSS 요약으로 격하(degrade) 후 완료
|
||||
→ summarize/embed/chunk enqueue 보장 (기사 유실 0). 격하 사유는 extract_meta.fulltext 에 기록.
|
||||
- 영구 실패 (3회 소진) : 야간 reconcile_unresolved() 가 summarize 안전망 enqueue
|
||||
([[feedback_silent_skip_accumulation]] — 조건부 skip 이 영구 침묵으로 누적되지 않게).
|
||||
|
||||
승격 게이트: 전 tier 공통 본문 >= 200자 (devonagent 와 달리 tier 4 도 게이트 적용 —
|
||||
페이월/오류 페이지의 nav 찌꺼기를 본문으로 승격하느니 RSS 요약 격하가 낫다).
|
||||
"""
|
||||
|
||||
import gzip
|
||||
import hashlib
|
||||
import re
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
|
||||
from sqlalchemy import exists, select
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
from sqlalchemy.orm import aliased
|
||||
|
||||
from core.config import settings
|
||||
from core.crawl_politeness import CrawlBlocked, CrawlFetchError, CrawlSkip, fetch_page
|
||||
from core.database import async_session
|
||||
from core.utils import setup_logger
|
||||
from models.document import Document
|
||||
from models.queue import ProcessingQueue, enqueue_stage
|
||||
from workers.extract_worker import (
|
||||
_WEB_MIN_BODY_LEN,
|
||||
_extract_web_with_bs4,
|
||||
_extract_web_with_readability,
|
||||
_extract_web_with_trafilatura,
|
||||
)
|
||||
|
||||
logger = setup_logger("fulltext_worker")
|
||||
|
||||
# 한국 기사 푸터 1층 후처리 (A-2) — 보수적으로 라인 단위만 제거
|
||||
_FOOTER_PATTERNS = [
|
||||
re.compile(r"^.{0,120}(무단\s*전재|무단\s*복제|재배포\s*금지|저작권자\s*[ⓒ©(]).*$", re.M),
|
||||
re.compile(r"^[\w.+-]+@[\w.-]+\.[A-Za-z]{2,}\s*$", re.M), # 단독 이메일 라인
|
||||
re.compile(r"^\s*\S{2,4}\s*기자\s*$", re.M), # 단독 '◯◯◯ 기자' 라인
|
||||
]
|
||||
|
||||
|
||||
def _strip_article_footer(body: str) -> str:
|
||||
for pat in _FOOTER_PATTERNS:
|
||||
body = pat.sub("", body)
|
||||
return re.sub(r"\n{3,}", "\n\n", body).strip()
|
||||
|
||||
|
||||
def _extract_body(html_text: str) -> tuple[str, str | None, str | None]:
|
||||
"""(body, engine, engine_version). 전 tier >= 200자 게이트, 미달이면 ("", None, None)."""
|
||||
body, ver = _extract_web_with_trafilatura(html_text)
|
||||
if body and len(body) >= _WEB_MIN_BODY_LEN:
|
||||
return body, "trafilatura", ver
|
||||
body, ver = _extract_web_with_readability(html_text)
|
||||
if body and len(body) >= _WEB_MIN_BODY_LEN:
|
||||
return body, "readability", ver
|
||||
body, ver = _extract_web_with_bs4(html_text)
|
||||
if body and len(body) >= _WEB_MIN_BODY_LEN:
|
||||
return body, "bs4_text", ver
|
||||
return "", None, None
|
||||
|
||||
|
||||
def _raw_html_path(source_id: int | None, file_hash: str, now: datetime) -> Path:
|
||||
"""A-7 원본 보존 경로 — NAS 본진. 한글 디렉토리의 NFC/NFD 비대칭을 피해 source_id 사용.
|
||||
|
||||
file_hash 는 DB 컬럼이 character(64) 라 32자 해시가 공백 패딩되어 돌아옴 — strip 필수
|
||||
(미적용 시 NAS 파일명에 공백 32개 = 쉘/rsync 함정).
|
||||
"""
|
||||
src_dir = f"src_{source_id}" if source_id is not None else "src_unknown"
|
||||
return (
|
||||
Path(settings.nas_mount_path) / "crawl_raw" / src_dir
|
||||
/ now.strftime("%Y-%m") / f"{file_hash.strip()}.html.gz"
|
||||
)
|
||||
|
||||
|
||||
def _save_raw_html(path: Path, html_text: str) -> None:
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
with gzip.open(path, "wb") as f:
|
||||
f.write(html_text.encode("utf-8", errors="replace"))
|
||||
|
||||
|
||||
async def _enqueue_downstream(session: AsyncSession, doc: Document) -> None:
|
||||
"""승격/격하 공통 후속 — summarize 무조건 + 30일 게이트 통과 시 embed/chunk."""
|
||||
await enqueue_stage(session, doc.id, "summarize")
|
||||
published_raw = (doc.extract_meta or {}).get("published_at")
|
||||
days_old = 0
|
||||
if published_raw:
|
||||
try:
|
||||
pub_dt = datetime.fromisoformat(published_raw)
|
||||
days_old = (datetime.now(timezone.utc) - pub_dt).days
|
||||
except ValueError:
|
||||
days_old = 0 # 파싱 불가 = 신규 취급 (수집 시점 기본과 동일)
|
||||
if days_old <= 30:
|
||||
await enqueue_stage(session, doc.id, "embed")
|
||||
await enqueue_stage(session, doc.id, "chunk")
|
||||
|
||||
|
||||
def _set_fulltext_meta(doc: Document, **fields) -> None:
|
||||
"""extract_meta.fulltext 갱신 — JSONB 변경 감지를 위해 dict 재할당."""
|
||||
meta = dict(doc.extract_meta or {})
|
||||
meta["fulltext"] = {**meta.get("fulltext", {}), **fields}
|
||||
doc.extract_meta = meta
|
||||
|
||||
|
||||
async def _degrade(session: AsyncSession, doc: Document, reason: str) -> None:
|
||||
"""본문 승격 실패 — RSS 요약 그대로 후속 단계 진행 (기사 유실 0)."""
|
||||
_set_fulltext_meta(
|
||||
doc, status="degraded", reason=reason[:300],
|
||||
resolved_at=datetime.now(timezone.utc).isoformat(),
|
||||
)
|
||||
await _enqueue_downstream(session, doc)
|
||||
logger.warning(f"[fulltext] doc={doc.id} 격하(RSS 요약 유지): {reason}")
|
||||
|
||||
|
||||
async def process(document_id: int, session: AsyncSession) -> None:
|
||||
"""기사 1건 풀텍스트 승격. queue_consumer 컨벤션 시그니처 (커밋은 consumer 가)."""
|
||||
doc = await session.get(Document, document_id)
|
||||
if not doc:
|
||||
raise ValueError(f"문서 ID {document_id}를 찾을 수 없음")
|
||||
if not doc.edit_url:
|
||||
await _degrade(session, doc, "edit_url 없음")
|
||||
return
|
||||
|
||||
meta = doc.extract_meta or {}
|
||||
source_id = meta.get("source_id")
|
||||
|
||||
try:
|
||||
html_text, final_url = await fetch_page(doc.edit_url)
|
||||
except (CrawlBlocked, CrawlSkip) as e:
|
||||
await _degrade(session, doc, f"{type(e).__name__}: {e}")
|
||||
return
|
||||
except CrawlFetchError:
|
||||
raise # 일시 오류 — 큐 재시도
|
||||
|
||||
now = datetime.now(timezone.utc)
|
||||
|
||||
# A-7: 원본 HTML 보존 (추출기 교체 시 전체 재추출 가능 상태 유지)
|
||||
raw_path = _raw_html_path(source_id, doc.file_hash, now)
|
||||
try:
|
||||
_save_raw_html(raw_path, html_text)
|
||||
raw_saved = True
|
||||
except OSError as e:
|
||||
# NAS 일시 장애 시 보존만 누락하고 승격은 진행 — 사유 기록 (silent 누락 회피)
|
||||
raw_saved = False
|
||||
logger.error(f"[fulltext] doc={doc.id} 원본 보존 실패 (승격은 진행): {e}")
|
||||
|
||||
body, engine, engine_ver = _extract_body(html_text)
|
||||
if not engine:
|
||||
await _degrade(session, doc, f"추출 실패 (전 tier < {_WEB_MIN_BODY_LEN}자)")
|
||||
return
|
||||
|
||||
clean_body = _strip_article_footer(body.replace("\x00", ""))
|
||||
if len(clean_body) < _WEB_MIN_BODY_LEN:
|
||||
await _degrade(session, doc, "푸터 제거 후 본문 부족")
|
||||
return
|
||||
|
||||
title = doc.title or ""
|
||||
doc.extracted_text = f"{title}\n\n{clean_body}" if title else clean_body
|
||||
doc.extracted_at = now
|
||||
doc.extractor_version = f"rss+page@{engine}"
|
||||
doc.md_content = clean_body
|
||||
doc.md_status = "success"
|
||||
doc.md_extraction_engine = engine
|
||||
doc.md_extraction_engine_version = engine_ver
|
||||
doc.md_format_version = "1.0"
|
||||
doc.md_generated_at = now
|
||||
doc.md_source_hash = hashlib.sha256(html_text.encode("utf-8", errors="replace")).hexdigest()
|
||||
doc.md_content_hash = hashlib.sha256(clean_body.encode("utf-8")).hexdigest()
|
||||
doc.md_extraction_error = None # 수집 시점의 '변환 비대상' 마커 해제
|
||||
doc.content_origin = "extracted"
|
||||
doc.file_size = len(doc.extracted_text.encode())
|
||||
_set_fulltext_meta(
|
||||
doc, status="promoted", engine=engine,
|
||||
raw_html_path=str(raw_path) if raw_saved else None,
|
||||
final_url=final_url, body_chars=len(clean_body),
|
||||
resolved_at=now.isoformat(),
|
||||
)
|
||||
|
||||
await _enqueue_downstream(session, doc)
|
||||
logger.info(
|
||||
f"[fulltext/{engine}] doc={doc.id} {len(clean_body)}자 승격 "
|
||||
f"(raw={'saved' if raw_saved else 'MISSING'})"
|
||||
)
|
||||
|
||||
|
||||
async def reconcile_unresolved() -> None:
|
||||
"""안전망 (야간 1회): fulltext 영구 실패(3회 소진)로 summarize 가 영영 안 잡힌
|
||||
뉴스 문서에 RSS 요약 기준 후속 단계를 enqueue. 멱등 — enqueue 후엔 조건 불일치."""
|
||||
async with async_session() as session:
|
||||
# 외부 쿼리 FROM 에 ProcessingQueue 가 이미 있어 alias 없이는 auto-correlation 이
|
||||
# 서브쿼리 FROM 을 전부 제거 → InvalidRequestError (queue_consumer.reset_stale_items 패턴)
|
||||
pq = aliased(ProcessingQueue)
|
||||
summarize_q = (
|
||||
select(pq.id)
|
||||
.where(
|
||||
pq.document_id == Document.id,
|
||||
pq.stage == "summarize",
|
||||
)
|
||||
)
|
||||
result = await session.execute(
|
||||
select(Document)
|
||||
.join(ProcessingQueue, ProcessingQueue.document_id == Document.id)
|
||||
.where(
|
||||
ProcessingQueue.stage == "fulltext",
|
||||
ProcessingQueue.status == "failed",
|
||||
Document.source_channel == "news",
|
||||
~exists(summarize_q),
|
||||
)
|
||||
.limit(200)
|
||||
)
|
||||
docs = result.scalars().unique().all()
|
||||
for doc in docs:
|
||||
_set_fulltext_meta(doc, status="failed_reconciled")
|
||||
await _enqueue_downstream(session, doc)
|
||||
if docs:
|
||||
await session.commit()
|
||||
logger.warning(f"[fulltext] reconcile: 영구 실패 {len(docs)}건 RSS 요약으로 후속 enqueue")
|
||||
@@ -15,7 +15,7 @@ from sqlalchemy import select
|
||||
|
||||
from core.config import settings
|
||||
from core.database import async_session
|
||||
from core.utils import create_caldav_todo, escape_ical_text, file_hash, send_smtp_email, setup_logger
|
||||
from core.utils import create_caldav_todo, file_hash, setup_logger
|
||||
from models.automation import AutomationState
|
||||
from models.document import Document
|
||||
from models.queue import enqueue_stage
|
||||
@@ -337,8 +337,7 @@ def _safe_name(name: str) -> str:
|
||||
|
||||
|
||||
def _send_notifications(law_name: str, proclamation_date: str, revision_type: str):
|
||||
"""CalDAV + SMTP 알림"""
|
||||
# CalDAV
|
||||
"""CalDAV 할일 알림 (SMTP 발송은 2026-06-10 폐기 — CalDAV 가 단일 알림 채널)"""
|
||||
caldav_url = os.getenv("CALDAV_URL", "")
|
||||
caldav_user = os.getenv("CALDAV_USER", "")
|
||||
caldav_pass = os.getenv("CALDAV_PASS", "")
|
||||
@@ -349,15 +348,3 @@ def _send_notifications(law_name: str, proclamation_date: str, revision_type: st
|
||||
description=f"공포일자: {proclamation_date}, 개정구분: {revision_type}",
|
||||
due_days=7,
|
||||
)
|
||||
|
||||
# SMTP
|
||||
smtp_host = os.getenv("MAILPLUS_HOST", "")
|
||||
smtp_port = int(os.getenv("MAILPLUS_SMTP_PORT", "465"))
|
||||
smtp_user = os.getenv("MAILPLUS_USER", "")
|
||||
smtp_pass = os.getenv("MAILPLUS_PASS", "")
|
||||
if smtp_host and smtp_user:
|
||||
send_smtp_email(
|
||||
smtp_host, smtp_port, smtp_user, smtp_pass,
|
||||
subject=f"[법령 변경] {law_name} ({revision_type})",
|
||||
body=f"법령명: {law_name}\n공포일자: {proclamation_date}\n개정구분: {revision_type}",
|
||||
)
|
||||
|
||||
@@ -17,7 +17,7 @@ from sqlalchemy import select
|
||||
|
||||
from core.config import settings
|
||||
from core.database import async_session
|
||||
from core.utils import file_hash, send_smtp_email, setup_logger
|
||||
from core.utils import file_hash, setup_logger
|
||||
from models.automation import AutomationState
|
||||
from models.document import Document
|
||||
from models.queue import enqueue_stage
|
||||
@@ -201,11 +201,4 @@ async def run():
|
||||
|
||||
await session.commit()
|
||||
|
||||
# SMTP 알림
|
||||
smtp_host = os.getenv("MAILPLUS_HOST", "")
|
||||
smtp_port = int(os.getenv("MAILPLUS_SMTP_PORT", "465"))
|
||||
if archived and smtp_host:
|
||||
body = f"이메일 {len(archived)}건 수집 완료:\n\n" + "\n".join(f"- {s}" for s in archived)
|
||||
send_smtp_email(smtp_host, smtp_port, user, password, "PKM 이메일 수집 알림", body)
|
||||
|
||||
logger.info(f"이메일 {len(archived)}건 수집 완료 (max_uid={max_uid})")
|
||||
|
||||
@@ -394,13 +394,29 @@ async def _process_office(
|
||||
partial arm 은 PDF split 전용 — office 는 이진이라 여기 없음. 'completed' 는 A-3 직렬화 전용(워커 미사용).
|
||||
quality 는 content-type-aware: office=scored(_compute_quality). 동기 변환은 to_thread 로 event loop 비차단.
|
||||
"""
|
||||
from workers.office_md import OfficeMdError, convert_office_to_md
|
||||
from workers.office_md import (
|
||||
OfficeMdError,
|
||||
convert_hwp_to_md_and_images,
|
||||
convert_office_to_md,
|
||||
)
|
||||
|
||||
is_hwp = Path(container_path).suffix.lower() in (".hwp", ".hwpx")
|
||||
engine = "libreoffice_hwp" if is_hwp else "markitdown"
|
||||
suffix = Path(container_path).suffix.lower()
|
||||
if suffix == ".hwp":
|
||||
engine = "pyhwp" # HWP5 binary: libhwplo 못 읽어 pyhwp 로 교체(2026-06-09)
|
||||
elif suffix == ".hwpx":
|
||||
engine = "libreoffice_hwp" # HWPX 는 pyhwp 미지원 → LibreOffice 폴백
|
||||
else:
|
||||
engine = "markitdown"
|
||||
|
||||
hwp_images: list[dict[str, Any]] = []
|
||||
try:
|
||||
# 동기 subprocess(LibreOffice)/markitdown — 스레드로 빼서 이벤트 루프 비차단.
|
||||
md_content = await asyncio.to_thread(convert_office_to_md, container_path)
|
||||
# 동기 subprocess/markitdown — 스레드로 빼서 이벤트 루프 비차단.
|
||||
if suffix == ".hwp":
|
||||
md_content, hwp_images = await asyncio.to_thread(
|
||||
convert_hwp_to_md_and_images, container_path
|
||||
)
|
||||
else:
|
||||
md_content = await asyncio.to_thread(convert_office_to_md, container_path)
|
||||
except OfficeMdError as exc:
|
||||
logger.warning(f"[marker] office md 변환 실패 id={document_id} engine={engine}: {exc}")
|
||||
await _fail(session, document_id, f"office_md: {str(exc)[:990]}", engine=engine)
|
||||
@@ -410,8 +426,49 @@ async def _process_office(
|
||||
await _fail(session, document_id, f"office_md_unexpected: {str(exc)[:980]}", engine=engine)
|
||||
return
|
||||
|
||||
# ---- 이미지 NAS persist (.hwp 전용) ----
|
||||
# hwp5html 은 bindata raster 를 추출하나 본문 xhtml 에 <img> 앵커가 없어(orphan, --css/--html
|
||||
# 동일) 인라인 위치 복원 불가 → marker(PDF) 의 _persist_images_to_nas 로 NAS 영속 후 md 말미
|
||||
# 갤러리로 부착(docimg: ref = 뷰어 해석). OLE 수식/도형은 앵커도 raster 도 아니라 제외.
|
||||
# docx/xlsx/pptx/hwpx 는 이미지 미처리(기존 동작 유지).
|
||||
saved_images: list[dict[str, Any]] = []
|
||||
orphan_paths: list[str] = []
|
||||
if suffix == ".hwp" and MARKDOWN_IMAGE_PERSIST:
|
||||
if hwp_images:
|
||||
images_resp = [
|
||||
{
|
||||
"bytes_b64": base64.b64encode(im["data"]).decode("ascii"),
|
||||
"format": im.get("format") or "png",
|
||||
"slug": "",
|
||||
"width": None,
|
||||
"height": None,
|
||||
}
|
||||
for im in hwp_images
|
||||
]
|
||||
try:
|
||||
saved_images = _persist_images_to_nas(document_id, images_resp)
|
||||
except OSError as exc:
|
||||
# NAS 일시 끊김 등 — transient. queue retry 로 복구.
|
||||
logger.warning(
|
||||
f"[marker] hwp image persist NAS write failed id={document_id}: "
|
||||
f"{type(exc).__name__}: {exc}"
|
||||
)
|
||||
raise
|
||||
if saved_images:
|
||||
gallery = "\n\n## 첨부 이미지\n\n" + "\n\n".join(
|
||||
f"" for img in saved_images
|
||||
)
|
||||
md_content = md_content + gallery
|
||||
# 재변환 시 현재 saved_images 기준으로 과거 document_images row/NAS 파일 정리.
|
||||
orphan_paths = await _sync_document_images(
|
||||
session, document_id, saved_images, {"engine": engine}
|
||||
)
|
||||
|
||||
# 성공 — 계약상 md_content 는 비공백(빈출력은 raise). quality scored.
|
||||
quality = _compute_quality(md_content, doc.extracted_text or "", {"page_count": None})
|
||||
if saved_images:
|
||||
quality.setdefault("warnings", []).append(f"hwp_images_appended:{len(saved_images)}")
|
||||
|
||||
await session.execute(
|
||||
update(Document).where(Document.id == document_id).values(
|
||||
md_content=md_content,
|
||||
@@ -429,7 +486,21 @@ async def _process_office(
|
||||
)
|
||||
)
|
||||
await session.commit()
|
||||
logger.info(f"[marker] office success id={document_id} engine={engine} len={len(md_content)}")
|
||||
|
||||
# commit 후 고아 NAS 파일 unlink (best-effort, 실패해도 DB 정합 유지).
|
||||
for orphan_path in orphan_paths:
|
||||
try:
|
||||
Path(orphan_path).unlink(missing_ok=True)
|
||||
except Exception as exc:
|
||||
logger.warning(
|
||||
f"[marker] orphan image unlink failed id={document_id} path={orphan_path}: "
|
||||
f"{type(exc).__name__}: {exc}"
|
||||
)
|
||||
|
||||
logger.info(
|
||||
f"[marker] office success id={document_id} engine={engine} "
|
||||
f"len={len(md_content)} images={len(saved_images)}"
|
||||
)
|
||||
|
||||
|
||||
async def _process_split(
|
||||
|
||||
+291
-59
@@ -1,20 +1,30 @@
|
||||
"""뉴스 수집 워커 — RSS/API에서 기사 수집, documents에 저장"""
|
||||
"""뉴스 수집 워커 — RSS/API에서 기사 수집, documents에 저장
|
||||
|
||||
plan crawl-24x7-1 A그룹 (2026-06-10):
|
||||
A-1 조건부 GET(ETag/Last-Modified 그대로 재전송) + 콘텐츠 해시 변경감지
|
||||
A-2 fulltext_policy='page' 소스는 'fulltext' stage 로 본문 승격 위임
|
||||
A-5 source_health 기록 + circuit breaker (소스별 실패 격리)
|
||||
A-6 first-wins + 포털 전재 2차 dedup (제목+최근 3일, 12자 이상 제목 한정)
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import hashlib
|
||||
import re
|
||||
from datetime import datetime, timezone
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from html import unescape
|
||||
from urllib.parse import urlparse, urlunparse
|
||||
from urllib.parse import parse_qsl, urlencode, urlparse, urlunparse
|
||||
|
||||
import feedparser
|
||||
import httpx
|
||||
from sqlalchemy import select
|
||||
|
||||
from core.crawl_politeness import CRAWL_UA
|
||||
from core.database import async_session
|
||||
from core.utils import setup_logger
|
||||
from models.document import Document
|
||||
from models.news_source import NewsSource
|
||||
from models.queue import enqueue_stage
|
||||
from models.source_health import SourceHealth
|
||||
|
||||
logger = setup_logger("news_collector")
|
||||
|
||||
@@ -38,24 +48,43 @@ CATEGORY_MAP = {
|
||||
}
|
||||
|
||||
|
||||
class FeedError(Exception):
|
||||
"""소스 단위 fetch/parse 실패 — run() 이 source_health 실패로 기록."""
|
||||
|
||||
|
||||
def _normalize_category(raw: str) -> str:
|
||||
"""카테고리 표준화"""
|
||||
return CATEGORY_MAP.get(raw, CATEGORY_MAP.get(raw.strip(), "Other"))
|
||||
|
||||
|
||||
def _clean_html(text: str) -> str:
|
||||
"""HTML 태그 제거 + 정제"""
|
||||
def _clean_html(text: str, max_len: int | None = 1000) -> str:
|
||||
"""HTML 태그 제거 + 정제. max_len=None 이면 절단 없음 (feed-full 전문용)."""
|
||||
if not text:
|
||||
return ""
|
||||
text = re.sub(r"<[^>]+>", "", text)
|
||||
text = unescape(text)
|
||||
return text.strip()[:1000]
|
||||
text = text.strip()
|
||||
return text if max_len is None else text[:max_len]
|
||||
|
||||
|
||||
# tracking 파라미터 판별 — prefix(utm_/at_=BBC/ns_=BBC/mc_=mailchimp) + 단독 키
|
||||
_TRACKING_PREFIXES = ("utm_", "at_", "ns_", "mc_")
|
||||
_TRACKING_PARAMS = {"fbclid", "gclid", "igshid", "ref", "smid", "partner", "cmp", "ocid", "ftag"}
|
||||
|
||||
|
||||
def _normalize_url(url: str) -> str:
|
||||
"""URL 정규화 (tracking params 제거)"""
|
||||
"""URL 정규화 — tracking 파라미터만 제거, 콘텐츠 식별 파라미터는 보존.
|
||||
|
||||
query 전체 제거 금지: hada.io/topic?id= · aitimes articleView.html?idxno= ·
|
||||
HN item?id= 등 query-식별 사이트에서 별개 기사가 같은 URL 로 붕괴된다.
|
||||
저장(edit_url)·조회 양쪽이 이 함수를 공유해야 dedup 이 성립.
|
||||
"""
|
||||
parsed = urlparse(url)
|
||||
return urlunparse((parsed.scheme, parsed.netloc, parsed.path, "", "", ""))
|
||||
kept = [
|
||||
(k, v) for k, v in parse_qsl(parsed.query, keep_blank_values=True)
|
||||
if not (k.lower().startswith(_TRACKING_PREFIXES) or k.lower() in _TRACKING_PARAMS)
|
||||
]
|
||||
return urlunparse((parsed.scheme, parsed.netloc, parsed.path, "", urlencode(kept), ""))
|
||||
|
||||
|
||||
def _article_hash(title: str, published: str, source_name: str) -> str:
|
||||
@@ -73,8 +102,104 @@ def _normalize_to_utc(dt) -> datetime:
|
||||
return datetime.now(timezone.utc)
|
||||
|
||||
|
||||
# ── A-5: circuit breaker 정책 ──
|
||||
# 연속 실패 >= OPEN 임계 → open (재시도 간격 지수 확대, 6h × 2^n, cap 48h)
|
||||
# 연속 실패 > DISABLE 임계 → disabled (수집 제외 + 가시 로그, 수동 복구 대상)
|
||||
# news_sources.enabled 는 건드리지 않는다 — 사용자 의도(enabled)와 자동 상태(circuit) 분리.
|
||||
_CIRCUIT_OPEN_AFTER = 3
|
||||
_CIRCUIT_DISABLE_AFTER = 10
|
||||
_BACKOFF_BASE_HOURS = 6
|
||||
_BACKOFF_CAP_HOURS = 48
|
||||
_EMPTY_STREAK_ALERT = 8 # 6h 사이클 × 8 = 약 2일 연속 빈 피드 → 가시 경고
|
||||
|
||||
|
||||
def _should_attempt(health: SourceHealth, now: datetime) -> bool:
|
||||
"""circuit 상태에 따라 이번 사이클 fetch 여부 결정.
|
||||
|
||||
주의 (B-3 계약 ②, r5): 추후 relogin_requested 플래그 소비는 반드시 이
|
||||
open-스킵 분기보다 *앞*에 두어야 한다 — open 이 스케줄 제외 형태가 되면
|
||||
배치 경계가 안 와 플래그가 영원히 미소비(half-open 데드 버튼)가 된다.
|
||||
"""
|
||||
if health.circuit_state == "disabled":
|
||||
return False
|
||||
if health.circuit_state == "open" and health.last_error_at is not None:
|
||||
over = max(health.consecutive_failures - _CIRCUIT_OPEN_AFTER, 0)
|
||||
backoff_h = min(_BACKOFF_BASE_HOURS * (2 ** over), _BACKOFF_CAP_HOURS)
|
||||
if now - health.last_error_at < timedelta(hours=backoff_h):
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
def _record_success(health: SourceHealth, items: int, not_modified: bool, now: datetime) -> None:
|
||||
health.consecutive_failures = 0
|
||||
health.total_fetches += 1
|
||||
health.last_success_at = now
|
||||
health.last_fetch_items = items
|
||||
if health.circuit_state != "closed":
|
||||
logger.info(f"[health] source={health.source_id} circuit {health.circuit_state}→closed")
|
||||
health.circuit_state = "closed"
|
||||
health.circuit_opened_at = None
|
||||
# 빈 피드 streak: 304/해시동일은 정상 신호라 미집계, 200+entries 0 만 집계 (피드 부패 감시)
|
||||
if not_modified:
|
||||
pass
|
||||
elif items == 0:
|
||||
health.empty_streak += 1
|
||||
if health.empty_streak >= _EMPTY_STREAK_ALERT:
|
||||
logger.error(
|
||||
f"[health] source={health.source_id} 빈 피드 {health.empty_streak}회 연속 "
|
||||
f"— 피드 부패 의심 (RSSHub 류 라우트 깨짐 패턴)"
|
||||
)
|
||||
else:
|
||||
health.empty_streak = 0
|
||||
health.updated_at = now
|
||||
|
||||
|
||||
def _record_failure(health: SourceHealth, error: str, now: datetime) -> None:
|
||||
health.consecutive_failures += 1
|
||||
health.total_fetches += 1
|
||||
health.total_failures += 1
|
||||
health.last_error = error[:500]
|
||||
health.last_error_at = now
|
||||
health.updated_at = now
|
||||
cf = health.consecutive_failures
|
||||
if cf > _CIRCUIT_DISABLE_AFTER and health.circuit_state != "disabled":
|
||||
health.circuit_state = "disabled"
|
||||
logger.error(
|
||||
f"[health] source={health.source_id} 연속 실패 {cf}회 — circuit DISABLED "
|
||||
f"(수집 제외, A-8 패널에서 수동 복구 필요)"
|
||||
)
|
||||
elif cf >= _CIRCUIT_OPEN_AFTER and health.circuit_state == "closed":
|
||||
health.circuit_state = "open"
|
||||
health.circuit_opened_at = now
|
||||
logger.warning(f"[health] source={health.source_id} 연속 실패 {cf}회 — circuit open")
|
||||
|
||||
|
||||
async def _get_or_create_health(session, source_id: int) -> SourceHealth:
|
||||
result = await session.execute(
|
||||
select(SourceHealth).where(SourceHealth.source_id == source_id)
|
||||
)
|
||||
health = result.scalars().first()
|
||||
if health is None:
|
||||
health = SourceHealth(source_id=source_id)
|
||||
session.add(health)
|
||||
await session.flush()
|
||||
return health
|
||||
|
||||
|
||||
# 수동 POST /api/news/collect 와 6h 스케줄 사이클의 동시 실행 차단 (단일 프로세스·단일
|
||||
# 이벤트루프). 동시 진입 시 _get_or_create_health 가 같은 source_id 를 양쪽에서 INSERT
|
||||
# → uq_source_health_source_id 위반 IntegrityError 로 사이클 전체가 죽는 경합의 원천 봉쇄.
|
||||
_run_lock = asyncio.Lock()
|
||||
|
||||
|
||||
async def run():
|
||||
"""뉴스 수집 실행"""
|
||||
async with _run_lock:
|
||||
await _run_locked()
|
||||
|
||||
|
||||
async def _run_locked():
|
||||
now = datetime.now(timezone.utc)
|
||||
async with async_session() as session:
|
||||
result = await session.execute(
|
||||
select(NewsSource).where(NewsSource.enabled == True)
|
||||
@@ -87,17 +212,23 @@ async def run():
|
||||
|
||||
total = 0
|
||||
for source in sources:
|
||||
health = await _get_or_create_health(session, source.id)
|
||||
if not _should_attempt(health, now):
|
||||
logger.info(f"[{source.name}] circuit {health.circuit_state} — 이번 사이클 skip")
|
||||
continue
|
||||
try:
|
||||
if source.feed_type == "api":
|
||||
count = await _fetch_api(session, source)
|
||||
count, status = await _fetch_api(session, source)
|
||||
else:
|
||||
count = await _fetch_rss(session, source)
|
||||
count, status = await _fetch_rss(session, source)
|
||||
|
||||
source.last_fetched_at = datetime.now(timezone.utc)
|
||||
_record_success(health, count, status == "not_modified", now)
|
||||
total += count
|
||||
except Exception as e:
|
||||
logger.error(f"[{source.name}] 수집 실패: {e}")
|
||||
source.last_fetched_at = datetime.now(timezone.utc)
|
||||
_record_failure(health, str(e) or repr(e), now)
|
||||
|
||||
await session.commit()
|
||||
logger.info(f"뉴스 수집 완료: {total}건 신규")
|
||||
@@ -108,8 +239,58 @@ ALLOWED_CONTENT_TYPES = ("application/rss+xml", "application/atom+xml",
|
||||
"application/xml", "text/xml")
|
||||
|
||||
|
||||
async def _fetch_rss(session, source: NewsSource) -> int:
|
||||
"""RSS 피드 수집 — redirect 재검증 + 크기/content-type 제한"""
|
||||
async def _is_portal_duplicate(session, title: str) -> bool:
|
||||
"""A-6 2차 dedup: 포털 전재본 vs 원본이 다른 URL 로 이중 적재되는 케이스.
|
||||
|
||||
보조 키 = 제목 + 최근 3일 (다른 소스/다른 URL 이므로 1차 키로 안 잡힘).
|
||||
범용 제목 오탐 방지: 12자 미만 제목은 비적용. skip 은 전부 로그 (silent 누락 회피).
|
||||
"""
|
||||
if len(title) < 12:
|
||||
return False
|
||||
cutoff = datetime.now(timezone.utc) - timedelta(days=3)
|
||||
dup = await session.execute(
|
||||
select(Document.id).where(
|
||||
Document.title == title,
|
||||
Document.source_channel == "news",
|
||||
Document.file_format == "article",
|
||||
Document.extracted_at >= cutoff,
|
||||
).limit(1)
|
||||
)
|
||||
return dup.scalars().first() is not None
|
||||
|
||||
|
||||
async def _enqueue_processing(session, doc: Document, source: NewsSource, pub_dt: datetime) -> None:
|
||||
"""후속 단계 enqueue.
|
||||
|
||||
fulltext_policy='page' 소스는 'fulltext' stage 만 — summarize/embed/chunk 는
|
||||
fulltext_worker 가 승격(또는 격하) 확정 후 enqueue (RSS 요약 선요약 → 풀텍스트
|
||||
도착 시 summarize_worker 의 '이미 요약 있음 skip' 에 막히는 순서 함정 회피).
|
||||
"""
|
||||
if source.fulltext_policy == "page" and doc.edit_url:
|
||||
await enqueue_stage(session, doc.id, "fulltext")
|
||||
return
|
||||
await enqueue_stage(session, doc.id, "summarize")
|
||||
days_old = (datetime.now(timezone.utc) - pub_dt).days
|
||||
if days_old <= 30:
|
||||
await enqueue_stage(session, doc.id, "embed")
|
||||
await enqueue_stage(session, doc.id, "chunk")
|
||||
|
||||
|
||||
def _build_extract_meta(source: NewsSource, pub_dt: datetime) -> dict:
|
||||
"""fulltext_worker / 패널이 쓰는 출처 메타 (documents 에 source FK 가 없어 여기 기록)."""
|
||||
return {
|
||||
"source_id": source.id,
|
||||
"source_name": source.name,
|
||||
"published_at": pub_dt.isoformat(),
|
||||
}
|
||||
|
||||
|
||||
async def _fetch_rss(session, source: NewsSource) -> tuple[int, str]:
|
||||
"""RSS 피드 수집 — redirect 재검증 + 크기/content-type 제한 + 조건부 GET (A-1).
|
||||
|
||||
반환 (신규 건수, 상태). 상태 'not_modified' = 304 또는 콘텐츠 해시 동일.
|
||||
소스 단위 실패는 FeedError raise — run() 이 health 실패로 기록.
|
||||
"""
|
||||
from urllib.parse import urljoin
|
||||
from core.url_validator import validate_feed_url, HTTP_EXCEPTION_DOMAINS
|
||||
|
||||
@@ -120,17 +301,24 @@ async def _fetch_rss(session, source: NewsSource) -> int:
|
||||
|
||||
# 순수 HTTP 소스인데 allowlist에 없으면 차단
|
||||
if source.feed_url.startswith("http://") and not http_allowed:
|
||||
logger.error(f"[{source.name}] HTTP 차단 (allowlist 미등록): {source_hostname}")
|
||||
return 0
|
||||
raise FeedError(f"HTTP 차단 (allowlist 미등록): {source_hostname}")
|
||||
|
||||
# fetch 전 URL 재검증 (등록 이후 DNS 변경 대비)
|
||||
try:
|
||||
validate_feed_url(source.feed_url, allow_http=http_allowed)
|
||||
except ValueError as e:
|
||||
logger.error(f"[{source.name}] URL 검증 실패: {e}")
|
||||
return 0
|
||||
raise FeedError(f"URL 검증 실패: {e}") from e
|
||||
|
||||
async with httpx.AsyncClient(timeout=10, follow_redirects=False) as client:
|
||||
# A-1: 정직 UA + 조건부 GET — 서버가 준 워터마크를 받은 그대로 재전송
|
||||
headers = {"User-Agent": CRAWL_UA}
|
||||
if source.etag:
|
||||
headers["If-None-Match"] = source.etag
|
||||
if source.last_modified:
|
||||
headers["If-Modified-Since"] = source.last_modified
|
||||
|
||||
async with httpx.AsyncClient(
|
||||
timeout=10, follow_redirects=False, headers=headers
|
||||
) as client:
|
||||
resp = await client.get(source.feed_url)
|
||||
|
||||
# redirect 수동 처리 (최대 3회, 각 target 재검증)
|
||||
@@ -142,29 +330,45 @@ async def _fetch_rss(session, source: NewsSource) -> int:
|
||||
try:
|
||||
validate_feed_url(location, allow_http=http_allowed)
|
||||
except ValueError as e:
|
||||
logger.error(f"[{source.name}] redirect target 차단: {e}")
|
||||
return 0
|
||||
raise FeedError(f"redirect target 차단: {e}") from e
|
||||
resp = await client.get(location)
|
||||
redirects += 1
|
||||
if resp.is_redirect:
|
||||
logger.error(f"[{source.name}] redirect 3회 초과")
|
||||
return 0
|
||||
raise FeedError("redirect 3회 초과")
|
||||
|
||||
if resp.status_code == 304:
|
||||
logger.info(f"[{source.name}] 304 Not Modified — 본문 미전송")
|
||||
return 0, "not_modified"
|
||||
|
||||
resp.raise_for_status()
|
||||
|
||||
if len(resp.content) > MAX_RESPONSE_SIZE:
|
||||
logger.warning(f"[{source.name}] 응답 크기 초과: {len(resp.content)} bytes")
|
||||
return 0
|
||||
raise FeedError(f"응답 크기 초과: {len(resp.content)} bytes")
|
||||
|
||||
ct = resp.headers.get("content-type", "").lower()
|
||||
if not any(t in ct for t in ALLOWED_CONTENT_TYPES):
|
||||
logger.warning(f"[{source.name}] 비정상 content-type: {ct}")
|
||||
return 0
|
||||
raise FeedError(f"비정상 content-type: {ct}")
|
||||
|
||||
# A-1: 콘텐츠 해시 변경감지 (CDN 의 ETag 회전 대비 병행) — 저장된 해시는 항상
|
||||
# 파싱 검증을 통과한 응답의 것이므로 동일성 비교는 파싱 전에 안전
|
||||
new_etag = resp.headers.get("etag")
|
||||
new_last_modified = resp.headers.get("last-modified")
|
||||
content_hash = hashlib.sha256(resp.content).hexdigest()
|
||||
if source.feed_content_hash == content_hash:
|
||||
logger.info(f"[{source.name}] 콘텐츠 해시 동일 — 파싱 skip")
|
||||
return 0, "not_modified"
|
||||
|
||||
feed = feedparser.parse(resp.text)
|
||||
if feed.bozo and not feed.entries:
|
||||
logger.warning(f"[{source.name}] RSS 파싱 실패: {feed.bozo_exception}")
|
||||
return 0
|
||||
raise FeedError(f"RSS 파싱 실패: {feed.bozo_exception}")
|
||||
|
||||
# A-1: 워터마크 영속은 파싱 검증 통과 후에만 — 부패(bozo) 응답의 ETag 를 저장하면
|
||||
# 이후 304 로 영구 skip 되는 silent corruption 차단
|
||||
if new_etag:
|
||||
source.etag = new_etag
|
||||
if new_last_modified:
|
||||
source.last_modified = new_last_modified
|
||||
source.feed_content_hash = content_hash
|
||||
count = 0
|
||||
|
||||
for entry in feed.entries:
|
||||
@@ -176,21 +380,40 @@ async def _fetch_rss(session, source: NewsSource) -> int:
|
||||
if not summary:
|
||||
summary = title
|
||||
|
||||
# A-6: feed-full 소스만 피드 본문을 전문으로 신뢰 (truncate·광고 삽입이 흔해
|
||||
# 일반 소스의 summary/content:encoded 를 전문으로 오인 저장 금지)
|
||||
body = summary
|
||||
is_feed_full = False
|
||||
if source.fulltext_policy == "feed-full":
|
||||
content_list = entry.get("content") or []
|
||||
raw_body = content_list[0].get("value", "") if content_list else ""
|
||||
full_body = _clean_html(raw_body or entry.get("summary", ""), max_len=None)
|
||||
if len(full_body) > len(summary):
|
||||
body = full_body
|
||||
is_feed_full = True
|
||||
|
||||
link = entry.get("link", "")
|
||||
published = entry.get("published_parsed") or entry.get("updated_parsed")
|
||||
pub_dt = datetime(*published[:6], tzinfo=timezone.utc) if published else datetime.now(timezone.utc)
|
||||
|
||||
# 중복 체크
|
||||
# 중복 체크 — 레거시 행은 raw URL 로 저장돼 있어 normalized/raw 양쪽 매칭.
|
||||
# 교차 게시(같은 기사가 두 피드에 존재)로 2행 이상 매칭될 수 있어 first() 사용
|
||||
# (scalar_one_or_none 은 MultipleResultsFound raise — 2026-06 BBC 수집 중단 원인).
|
||||
article_id = _article_hash(title, pub_dt.strftime("%Y%m%d"), source.name)
|
||||
normalized_url = _normalize_url(link)
|
||||
|
||||
existing = await session.execute(
|
||||
select(Document).where(
|
||||
(Document.file_hash == article_id) |
|
||||
(Document.edit_url == normalized_url)
|
||||
)
|
||||
(Document.edit_url.in_([normalized_url, link]))
|
||||
).limit(1)
|
||||
)
|
||||
if existing.scalar_one_or_none():
|
||||
if existing.scalars().first():
|
||||
continue
|
||||
|
||||
# A-6 2차: 포털 전재 dedup (first-wins — 먼저 적재된 쪽이 정본)
|
||||
if await _is_portal_duplicate(session, title):
|
||||
logger.info(f"[{source.name}] portal-dup skip: {title[:60]}")
|
||||
continue
|
||||
|
||||
category = _normalize_category(source.category or "")
|
||||
@@ -200,43 +423,47 @@ async def _fetch_rss(session, source: NewsSource) -> int:
|
||||
file_path=f"news/{source.name}/{article_id}",
|
||||
file_hash=article_id,
|
||||
file_format="article",
|
||||
file_size=len(summary.encode()),
|
||||
file_size=len(body.encode()),
|
||||
file_type="note",
|
||||
title=title,
|
||||
extracted_text=f"{title}\n\n{summary}",
|
||||
extracted_text=f"{title}\n\n{body}",
|
||||
extracted_at=datetime.now(timezone.utc),
|
||||
extractor_version="rss",
|
||||
extractor_version="rss-feed-full" if is_feed_full else "rss",
|
||||
# article = 텍스트 네이티브(본문=extracted_text). markdown 단계 미enqueue 라
|
||||
# 기본값 'pending' 이면 영구 비수렴 → backlog 지표 오염 + md_status_pending partial
|
||||
# 인덱스 비대. 생성 시점에 terminal 'skipped' 로 명시(변환 비대상).
|
||||
# fulltext_policy='page' 소스는 fulltext_worker 가 승격 시 success 로 갱신.
|
||||
md_status="skipped",
|
||||
md_extraction_error="news article: 텍스트 네이티브, markdown 변환 비대상",
|
||||
source_channel="news",
|
||||
data_origin="external",
|
||||
edit_url=link,
|
||||
# 조회와 동일하게 정규화해 저장 — raw(tracking param 포함) 저장 시 URL dedup 무력화
|
||||
edit_url=normalized_url,
|
||||
review_status="approved",
|
||||
ai_domain="News",
|
||||
ai_sub_group=source_short,
|
||||
ai_tags=[f"News/{source_short}/{category}"],
|
||||
extract_meta=_build_extract_meta(source, pub_dt),
|
||||
)
|
||||
session.add(doc)
|
||||
await session.flush()
|
||||
|
||||
# summarize + embed + chunk 등록 (classify 불필요)
|
||||
await enqueue_stage(session, doc.id, "summarize")
|
||||
days_old = (datetime.now(timezone.utc) - pub_dt).days
|
||||
if days_old <= 30:
|
||||
await enqueue_stage(session, doc.id, "embed")
|
||||
await enqueue_stage(session, doc.id, "chunk")
|
||||
# summarize + embed + chunk 등록 (classify 불필요).
|
||||
# page 정책 소스는 fulltext 만 — 후속은 fulltext_worker 가 확정 후 enqueue.
|
||||
await _enqueue_processing(session, doc, source, pub_dt)
|
||||
|
||||
count += 1
|
||||
|
||||
logger.info(f"[{source.name}] RSS → {count}건 수집")
|
||||
return count
|
||||
return count, "ok"
|
||||
|
||||
|
||||
async def _fetch_api(session, source: NewsSource) -> int:
|
||||
async def _fetch_api(session, source: NewsSource) -> tuple[int, str]:
|
||||
"""NYT API 수집 — 키 마스킹 + health degradation"""
|
||||
import os
|
||||
nyt_key = os.getenv("NYT_API_KEY", "")
|
||||
if not nyt_key:
|
||||
logger.error("NYT_API_KEY 미설정 — US 뉴스 수집 불가")
|
||||
return 0
|
||||
raise FeedError("NYT_API_KEY 미설정 — US 뉴스 수집 불가")
|
||||
|
||||
try:
|
||||
async with httpx.AsyncClient(timeout=10) as client:
|
||||
@@ -248,12 +475,10 @@ async def _fetch_api(session, source: NewsSource) -> int:
|
||||
except httpx.HTTPStatusError as e:
|
||||
# 쿼리스트링(api-key 포함) 제거 — path까지만 로깅
|
||||
safe_url = str(e.request.url).split("?")[0]
|
||||
logger.error(f"NYT API 실패: {e.response.status_code} @ {safe_url}")
|
||||
return 0
|
||||
raise FeedError(f"NYT API 실패: {e.response.status_code} @ {safe_url}") from e
|
||||
except httpx.RequestError as e:
|
||||
safe_url = str(e.request.url).split("?")[0] if e.request else "unknown"
|
||||
logger.error(f"NYT API 연결 실패: {safe_url}")
|
||||
return 0
|
||||
raise FeedError(f"NYT API 연결 실패: {safe_url}") from e
|
||||
|
||||
data = resp.json()
|
||||
count = 0
|
||||
@@ -277,13 +502,18 @@ async def _fetch_api(session, source: NewsSource) -> int:
|
||||
article_id = _article_hash(title, pub_dt.strftime("%Y%m%d"), source.name)
|
||||
normalized_url = _normalize_url(link)
|
||||
|
||||
# RSS 수집부와 동일: 레거시 raw URL + 교차 게시 다중 매칭 내성 (first)
|
||||
existing = await session.execute(
|
||||
select(Document).where(
|
||||
(Document.file_hash == article_id) |
|
||||
(Document.edit_url == normalized_url)
|
||||
)
|
||||
(Document.edit_url.in_([normalized_url, link]))
|
||||
).limit(1)
|
||||
)
|
||||
if existing.scalar_one_or_none():
|
||||
if existing.scalars().first():
|
||||
continue
|
||||
|
||||
if await _is_portal_duplicate(session, title):
|
||||
logger.info(f"[{source.name}] portal-dup skip: {title[:60]}")
|
||||
continue
|
||||
|
||||
category = _normalize_category(article.get("section", source.category or ""))
|
||||
@@ -299,24 +529,26 @@ async def _fetch_api(session, source: NewsSource) -> int:
|
||||
extracted_text=f"{title}\n\n{summary}",
|
||||
extracted_at=datetime.now(timezone.utc),
|
||||
extractor_version="nyt_api",
|
||||
# article = 텍스트 네이티브(본문=extracted_text). markdown 단계 미enqueue 라
|
||||
# 기본값 'pending' 이면 영구 비수렴 → backlog 지표 오염 + md_status_pending partial
|
||||
# 인덱스 비대. 생성 시점에 terminal 'skipped' 로 명시(변환 비대상).
|
||||
md_status="skipped",
|
||||
md_extraction_error="news article: 텍스트 네이티브, markdown 변환 비대상",
|
||||
source_channel="news",
|
||||
data_origin="external",
|
||||
edit_url=link,
|
||||
edit_url=normalized_url,
|
||||
review_status="approved",
|
||||
ai_domain="News",
|
||||
ai_sub_group=source_short,
|
||||
ai_tags=[f"News/{source_short}/{category}"],
|
||||
extract_meta=_build_extract_meta(source, pub_dt),
|
||||
)
|
||||
session.add(doc)
|
||||
await session.flush()
|
||||
|
||||
await enqueue_stage(session, doc.id, "summarize")
|
||||
days_old = (datetime.now(timezone.utc) - pub_dt).days
|
||||
if days_old <= 30:
|
||||
await enqueue_stage(session, doc.id, "embed")
|
||||
await enqueue_stage(session, doc.id, "chunk")
|
||||
await _enqueue_processing(session, doc, source, pub_dt)
|
||||
|
||||
count += 1
|
||||
|
||||
logger.info(f"[{source.name}] API → {count}건 수집")
|
||||
return count
|
||||
return count, "ok"
|
||||
|
||||
+102
-5
@@ -5,9 +5,11 @@
|
||||
|
||||
전략 (하이브리드):
|
||||
- OOXML(.docx/.xlsx/.pptx) → markitdown ← 신규 의존성(pip install markitdown). lazy import.
|
||||
- .hwp/.hwpx → LibreOffice(headless) → HTML → markdownify ← markdownify 기존 의존성.
|
||||
(LibreOffice 가 hwp import 필터 보유. .hwpx 는 .hwp 와 다른 필터·버전 의존 → E-1: prod LibreOffice
|
||||
버전핀 안전컨텍스트에서 PoC 실행. 표 fidelity 가 진짜 리스크 — 하니스가 측정.)
|
||||
- .hwp(HWP5 binary) → pyhwp hwp5html → HTML → markdownify ← pyhwp+six 의존성.
|
||||
(2026-06-09: LibreOffice 번들 libhwplo 필터가 실제 한컴 HWP5 파일을 못 읽어 rc=0 + 'source file
|
||||
could not be loaded' 로 전건 실패 → 순수 Python HWP5 전용 변환기 pyhwp 로 교체.)
|
||||
- .hwpx → LibreOffice(headless) → HTML → markdownify ← markdownify 기존 의존성.
|
||||
(HWPX(zip)는 pyhwp 미지원 → LibreOffice 폴백 유지. 현재 코퍼스는 전부 HWP5 binary.)
|
||||
|
||||
실패 계약 (C-5 postcondition 의 backend 절반):
|
||||
변환 실패·빈 출력·타임아웃·의존성 부재 → OfficeMdError 를 raise 한다.
|
||||
@@ -18,6 +20,7 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
import re
|
||||
import shutil
|
||||
import subprocess
|
||||
import tempfile
|
||||
@@ -34,6 +37,13 @@ _MIN_BODY_CHARS = 16
|
||||
# 이름) → 기본값 정합. soffice 만 있는 환경은 LIBREOFFICE_BIN 으로 override.
|
||||
_SOFFICE_BIN = os.environ.get("LIBREOFFICE_BIN", "libreoffice")
|
||||
|
||||
# pyhwp 콘솔 스크립트(pip install pyhwp 시 PATH 등록). HWP5 binary(.hwp) 전용.
|
||||
_HWP5HTML_BIN = os.environ.get("HWP5HTML_BIN", "hwp5html")
|
||||
|
||||
# hwp5html 이 bindata/ 로 추출하는 첨부물 중 NAS 영속 대상 raster 확장자.
|
||||
# (OLE 수식/도형은 index.xhtml 에 앵커가 없어 위치 복원 불가 → 영속 제외.)
|
||||
_RASTER_EXTS = {"jpg", "jpeg", "png", "gif", "bmp"}
|
||||
|
||||
|
||||
class OfficeMdError(Exception):
|
||||
"""office/hwp → md 변환 실패 신호. 호출부는 md_status='failed' 로 라우팅."""
|
||||
@@ -50,7 +60,9 @@ def convert_office_to_md(path: str | Path, *, timeout: int = 90) -> str:
|
||||
|
||||
if suffix in OOXML_FORMATS:
|
||||
md = _via_markitdown(p)
|
||||
else: # .hwp / .hwpx
|
||||
elif suffix == ".hwp":
|
||||
md = _via_pyhwp_html(p, timeout=timeout)
|
||||
else: # .hwpx (pyhwp 미지원 → LibreOffice 폴백)
|
||||
md = _via_libreoffice_html(p, timeout=timeout)
|
||||
|
||||
md = (md or "").strip()
|
||||
@@ -74,8 +86,93 @@ def _via_markitdown(path: Path) -> str:
|
||||
return getattr(result, "text_content", "") or ""
|
||||
|
||||
|
||||
def _run_hwp5html(path: Path, *, timeout: int) -> tuple[str, list[dict]]:
|
||||
"""HWP5 binary(.hwp) → (markdown, raster_images). hwp5html 1회 실행 = md + 이미지 동시 추출.
|
||||
|
||||
LibreOffice 번들 libhwplo 필터가 실제 한컴 HWP5 파일을 못 읽어(rc=0 + 'source file could
|
||||
not be loaded') 전건 실패 → 순수 Python HWP5 전용 변환기 pyhwp(CLI hwp5html)로 교체.
|
||||
`_via_libreoffice_html` 와 동일한 실패 계약(rc≠0 또는 출력 부재 → OfficeMdError raise).
|
||||
|
||||
raster_images = [{'data': bytes, 'format': 'jpeg'|'png'|...}] — bindata/ 의 래스터만.
|
||||
hwp5html 은 이미지를 본문 xhtml 에 <img> 로 앵커하지 않으므로(bindata orphan, --css/--html 동일)
|
||||
인라인 위치는 복원 불가 → 호출부가 NAS 영속 후 말미 갤러리로 부착한다.
|
||||
"""
|
||||
try:
|
||||
from markdownify import markdownify # 기존 의존성
|
||||
except ImportError as e: # noqa: BLE001
|
||||
raise OfficeMdError("markdownify 미설치(기존 의존성이어야 함)") from e
|
||||
|
||||
with tempfile.TemporaryDirectory(prefix="office_md_hwp_") as tmp:
|
||||
outdir = Path(tmp)
|
||||
# hwp5html --output <dir> <file.hwp> → <dir>/index.xhtml + styles.css + bindata/
|
||||
cmd = [_HWP5HTML_BIN, "--output", str(outdir), str(path)]
|
||||
try:
|
||||
proc = subprocess.run(
|
||||
cmd, capture_output=True, text=True, timeout=timeout, check=False
|
||||
)
|
||||
except FileNotFoundError as e:
|
||||
raise OfficeMdError(
|
||||
f"pyhwp(hwp5html) 바이너리 부재({_HWP5HTML_BIN}) — `pip install pyhwp six` 필요"
|
||||
) from e
|
||||
except subprocess.TimeoutExpired as e:
|
||||
raise OfficeMdError(f"pyhwp 변환 타임아웃({timeout}s): {path.name}") from e
|
||||
|
||||
index_path = outdir / "index.xhtml"
|
||||
if proc.returncode != 0 or not index_path.exists():
|
||||
raise OfficeMdError(
|
||||
f"pyhwp html 변환 실패: {path.name} (rc={proc.returncode}): "
|
||||
f"{(proc.stderr or proc.stdout or '').strip()[:300]}"
|
||||
)
|
||||
html = index_path.read_text(encoding="utf-8", errors="replace")
|
||||
# hwp5html 의 xhtml 은 최상단 <?xml ...?> 선언을 가짐(LibreOffice 의 .html 경로엔 없음).
|
||||
# markdownify 의 html.parser 가 이를 PI 텍스트('xml version="1.0" encoding="utf-8"?')로
|
||||
# 본문에 흘려 (1) md 최상단 잡음·검색/청크 오염, (2) 빈 body 셸일 때 그 ~34자가
|
||||
# _MIN_BODY_CHARS(16) 빈출력 게이트를 무력화(빈 변환의 false-success) → markdownify 전에 제거.
|
||||
html = re.sub(r"^\s*<\?xml[^>]*\?>\s*", "", html)
|
||||
# 표 보존 위해 markdownify 가 table 을 GFM 으로 — heading_style ATX (libreoffice 경로와 동일).
|
||||
md = markdownify(html, heading_style="ATX", strip=["span", "font"])
|
||||
|
||||
images: list[dict] = []
|
||||
bindata = outdir / "bindata"
|
||||
if bindata.is_dir():
|
||||
for f in sorted(bindata.iterdir()):
|
||||
ext = f.suffix.lower().lstrip(".")
|
||||
if ext in _RASTER_EXTS:
|
||||
images.append({
|
||||
"data": f.read_bytes(),
|
||||
"format": "jpeg" if ext == "jpg" else ext,
|
||||
})
|
||||
return md, images
|
||||
|
||||
|
||||
def _via_pyhwp_html(path: Path, *, timeout: int) -> str:
|
||||
"""HWP5 binary(.hwp) → markdown (이미지 제외). convert_office_to_md 단일 텍스트 경로용."""
|
||||
md, _images = _run_hwp5html(path, timeout=timeout)
|
||||
return md
|
||||
|
||||
|
||||
def convert_hwp_to_md_and_images(
|
||||
path: str | Path, *, timeout: int = 90
|
||||
) -> tuple[str, list[dict]]:
|
||||
"""HWP5(.hwp) → (markdown, raster_images). marker_worker 이미지 영속 경로 전용.
|
||||
|
||||
실패/빈출력 계약은 convert_office_to_md 와 동일(OfficeMdError raise / 빈 md 절대 반환 금지).
|
||||
raster_images 원소 = {'data': bytes, 'format': str}; 비어있을 수 있음(이미지 없는 문서).
|
||||
"""
|
||||
p = Path(path)
|
||||
if p.suffix.lower() != ".hwp":
|
||||
raise OfficeMdError(f"convert_hwp_to_md_and_images: .hwp 전용, got {p.suffix!r}")
|
||||
if not p.exists():
|
||||
raise OfficeMdError(f"file not found: {p}")
|
||||
md, images = _run_hwp5html(p, timeout=timeout)
|
||||
md = (md or "").strip()
|
||||
if len(md) < _MIN_BODY_CHARS:
|
||||
raise OfficeMdError(f"empty/too-short conversion ({len(md)} chars) for {p.name}")
|
||||
return md, images
|
||||
|
||||
|
||||
def _via_libreoffice_html(path: Path, *, timeout: int) -> str:
|
||||
"""LibreOffice headless 로 HTML 변환 후 markdownify. hwp/hwpx 용."""
|
||||
"""LibreOffice headless 로 HTML 변환 후 markdownify. hwpx 용(.hwp 는 pyhwp)."""
|
||||
try:
|
||||
from markdownify import markdownify # 기존 의존성
|
||||
except ImportError as e: # noqa: BLE001
|
||||
|
||||
@@ -22,8 +22,11 @@ logger = setup_logger("queue_consumer")
|
||||
# stage별 배치 크기
|
||||
# stt 는 GPU 단일 점유 + 회의 30분짜리도 가능 → 배치 1. thumbnail 은 ffmpeg subprocess 로 가벼움.
|
||||
# deep_summary (PR-B B-1) 는 MLX 26B 단일 Semaphore(1) 경유 → 배치 1.
|
||||
# fulltext 는 politeness 지연(같은 도메인 5–15s)이 배치 내 직렬로 걸린다 — 배치 3 이면
|
||||
# 같은 도메인 최악 ~45s/사이클, 메인 큐 1m 간격(max_instances=1, coalesce)이 흡수.
|
||||
BATCH_SIZE = {"extract": 5, "classify": 3, "summarize": 3, "embed": 1, "chunk": 1,
|
||||
"preview": 2, "stt": 1, "thumbnail": 3, "deep_summary": 1, "markdown": 1}
|
||||
"preview": 2, "stt": 1, "thumbnail": 3, "deep_summary": 1, "markdown": 1,
|
||||
"fulltext": 3}
|
||||
STALE_THRESHOLD_MINUTES = 10
|
||||
# markdown 대형 split 변환은 한 doc 이 수십 분(5210 ≈ 40분) 동안 processing 상태로 머문다.
|
||||
# marker_worker 는 queue 행에 heartbeat 를 찍지 않으므로(started_at 고정), main 의 10분
|
||||
@@ -35,7 +38,7 @@ MARKDOWN_STALE_THRESHOLD_MINUTES = int(os.getenv("MARKDOWN_STALE_MINUTES", "120"
|
||||
# STT 도 장기 작업 가능성이 있으나 본 PR 범위 밖 — main 에 유지(follow-up).
|
||||
MAIN_QUEUE_STAGES = [
|
||||
"extract", "classify", "summarize", "embed", "chunk",
|
||||
"preview", "stt", "thumbnail", "deep_summary",
|
||||
"preview", "stt", "thumbnail", "deep_summary", "fulltext",
|
||||
]
|
||||
MARKDOWN_QUEUE_STAGES = ["markdown"]
|
||||
|
||||
@@ -179,6 +182,7 @@ def _load_workers():
|
||||
from workers.summarize_worker import process as summarize_process
|
||||
from workers.thumbnail_worker import process as thumbnail_process
|
||||
from workers.marker_worker import process as marker_process
|
||||
from workers.fulltext_worker import process as fulltext_process
|
||||
|
||||
return {
|
||||
"extract": extract_process,
|
||||
@@ -195,6 +199,9 @@ def _load_workers():
|
||||
# Phase 1B: classify 완료 후 enqueue. PDF→markdown 변환 (leaf, embed/chunk 와 독립).
|
||||
# consume_markdown_queue 가 전담 (대형 split 변환이 메인 파이프라인을 막지 않도록).
|
||||
"markdown": marker_process,
|
||||
# crawl-24x7 A-2: 기사 페이지 fetch → 4-tier 본문 승격. 후속(summarize/embed/chunk)은
|
||||
# 워커가 직접 enqueue — next_stages dict 미등록 (enqueue_next_stage no-op).
|
||||
"fulltext": fulltext_process,
|
||||
}
|
||||
|
||||
|
||||
|
||||
+28
-3
@@ -64,6 +64,11 @@ services:
|
||||
environment:
|
||||
- HF_HOME=/models/huggingface
|
||||
- TORCH_HOME=/models/torch
|
||||
# D-1 (crawl-24x7): idle-unload 전환 — 영구 점유(~3.5GB) 해제가 90% 봉투의 전제.
|
||||
# /ready 는 idle 에서도 200 (fastapi depends_on service_healthy 유지).
|
||||
# 롤백 = MARKER_PRELOAD=1 + MARKER_IDLE_UNLOAD_MINUTES=0.
|
||||
- MARKER_PRELOAD=0
|
||||
- MARKER_IDLE_UNLOAD_MINUTES=${MARKER_IDLE_UNLOAD_MINUTES:-30}
|
||||
volumes:
|
||||
- ${NAS_NFS_PATH:-/mnt/nas/Document_Server}:/documents:ro
|
||||
- marker_models:/models
|
||||
@@ -97,6 +102,11 @@ services:
|
||||
- WHISPER_MODEL=${WHISPER_MODEL:-large-v3}
|
||||
- WHISPER_DEVICE=${WHISPER_DEVICE:-cuda}
|
||||
- WHISPER_COMPUTE_TYPE=${WHISPER_COMPUTE_TYPE:-float16}
|
||||
# D-1 (crawl-24x7): idle-unload 전환 — 영구 점유(~4GB) 해제가 90% 봉투의 전제.
|
||||
# 콜드로드 수초~수십 초는 배치 작업이라 무방 (stt_worker read=1800s 가 흡수).
|
||||
# 롤백 = STT_PRELOAD=1 + STT_IDLE_UNLOAD_MINUTES=0.
|
||||
- STT_PRELOAD=0
|
||||
- STT_IDLE_UNLOAD_MINUTES=${STT_IDLE_UNLOAD_MINUTES:-30}
|
||||
deploy:
|
||||
resources:
|
||||
reservations:
|
||||
@@ -105,9 +115,9 @@ services:
|
||||
count: 1
|
||||
capabilities: [gpu]
|
||||
healthcheck:
|
||||
# /ready: CUDA 디바이스 + 모델 적재 둘 다 확인. ready=true 만 healthy 처리.
|
||||
# /health 는 단순 liveness 라 모델 미적재 상태도 healthy 로 잡혀 운영 신호로 부적합.
|
||||
test: ["CMD", "python3", "-c", "import json,urllib.request,sys; r=urllib.request.urlopen('http://localhost:3300/ready'); sys.exit(0 if json.load(r).get('ready') else 1)"]
|
||||
# D-1: idle-unload 도입으로 '모델 적재' 는 더 이상 상시 상태가 아님 — cuda 가용성만
|
||||
# healthy 기준. 모델 적재 여부는 /ready 의 models_loaded 필드로 관측(정보성).
|
||||
test: ["CMD", "python3", "-c", "import json,urllib.request,sys; r=urllib.request.urlopen('http://localhost:3300/ready'); sys.exit(0 if json.load(r).get('cuda') else 1)"]
|
||||
interval: 30s
|
||||
timeout: 10s
|
||||
retries: 3
|
||||
@@ -229,6 +239,21 @@ services:
|
||||
- fastapi
|
||||
restart: unless-stopped
|
||||
|
||||
# crawl-24x7 A-8 1차: 전 소스 헬스 패널 — 내부 전용 (읽기 전용 SELECT 만).
|
||||
# '내부 전용' 성립 구현 = 별도 바인딩뿐 (r4 결정): Tailscale 인터페이스에만 publish.
|
||||
# 기존 SvelteKit 라우트(vhost=Host 헤더 검사=앱 가드 환원)나 프록시 경로 차단(경로 가드
|
||||
# 회귀)으로 옮기지 말 것. caddy/home-caddy 라우트 추가 금지. fastapi/postgres 바인딩 선례.
|
||||
crawl-health:
|
||||
build: ./services/crawl-health
|
||||
ports:
|
||||
- "100.110.63.63:8765:8765"
|
||||
environment:
|
||||
- CRAWL_HEALTH_DSN=postgresql://pkm:${POSTGRES_PASSWORD}@postgres:5432/pkm
|
||||
depends_on:
|
||||
postgres:
|
||||
condition: service_healthy
|
||||
restart: unless-stopped
|
||||
|
||||
caddy:
|
||||
image: caddy:2
|
||||
ports:
|
||||
|
||||
@@ -7,8 +7,11 @@
|
||||
import Tabs from '$lib/components/ui/Tabs.svelte';
|
||||
import MarkdownDoc from '$lib/components/MarkdownDoc.svelte';
|
||||
import MarkdownStatusBadge from '$lib/components/MarkdownStatusBadge.svelte';
|
||||
import SectionOutline from '$lib/components/SectionOutline.svelte';
|
||||
import { getViewerType } from '$lib/utils/viewerType';
|
||||
import { isMdSuccess } from '$lib/utils/mdStatus';
|
||||
import { resolveAnchorMap } from '$lib/utils/resolveAnchorMap';
|
||||
import { cleanHeading } from '$lib/utils/headingPath';
|
||||
|
||||
// 편집 미리보기 전용 plain marked (본문 렌더는 MarkdownDoc 가 담당).
|
||||
marked.use({ mangle: false, headerIds: false });
|
||||
@@ -52,9 +55,11 @@
|
||||
async function loadFullDoc(id) {
|
||||
loading = true;
|
||||
rawMarkdown = '';
|
||||
sections = [];
|
||||
try {
|
||||
fullDoc = await api(`/documents/${id}`);
|
||||
viewerType = getViewerType(fullDoc.file_format, fullDoc.source_channel);
|
||||
loadSections(id);
|
||||
|
||||
// 본문 markdown(md/txt) 인데 extracted_text 가 비면 원본 파일 직접 로드.
|
||||
if (viewerType === 'markdown' && !fullDoc.extracted_text) {
|
||||
@@ -88,6 +93,89 @@
|
||||
if (!canShowMarkdown && pdfViewMode === 'markdown') pdfViewMode = 'pdf';
|
||||
});
|
||||
|
||||
// ── 절 목차(개요) rail + 점프 + scroll-spy (outlineAnchors, 경로 A) ──
|
||||
let sections = $state([]);
|
||||
async function loadSections(id) {
|
||||
try {
|
||||
const r = await api(`/documents/${id}/sections`);
|
||||
if (id === doc?.id) sections = r?.sections ?? [];
|
||||
} catch {
|
||||
if (id === doc?.id) sections = [];
|
||||
}
|
||||
}
|
||||
// window 빈제목(31% 노이즈) 등 표시 가능한 제목 없는 항목은 rail 에서 제외(클린업).
|
||||
let outlineSections = $derived(
|
||||
sections.filter(
|
||||
(s) => !!(cleanHeading(s.section_title) || cleanHeading((s.heading_path || '').split('>').pop() || '')),
|
||||
),
|
||||
);
|
||||
// MarkdownDoc 가 실제 렌더하는 텍스트(rail 표시 게이트용).
|
||||
let mdRenderText = $derived.by(() => {
|
||||
if (!fullDoc) return '';
|
||||
if (viewerType === 'pdf') return pdfViewMode === 'markdown' && canShowMarkdown ? (fullDoc.md_content || '') : '';
|
||||
if (viewerType === 'markdown') return fullDoc.extracted_text || rawMarkdown || '';
|
||||
if (viewerType === 'hwp-markdown' || viewerType === 'article') return fullDoc.md_content || fullDoc.extracted_text || '';
|
||||
return '';
|
||||
});
|
||||
// [g5-t3] basis 는 RENDER SITE 별. anchorMap 을 basis 별로 분리 — 같은 component 가 두 basis 를
|
||||
// 공유하면(md_content vs extracted_text) trustBE 가 어긋난다.
|
||||
// - md_content site(pdf-markdown): trustBE=true (BE char_start 1순위, 비면 내부 string-match 폴백).
|
||||
// - extracted_text site(3-pane markdown): trustBE=false (char_start 는 md_content offset 이라 무효 → 무조건 폴백).
|
||||
let mdBasisText = $derived.by(() => {
|
||||
if (!fullDoc) return '';
|
||||
if (viewerType === 'pdf') return pdfViewMode === 'markdown' && canShowMarkdown ? (fullDoc.md_content || '') : '';
|
||||
return '';
|
||||
});
|
||||
let extractedBasisText = $derived.by(() => {
|
||||
if (!fullDoc) return '';
|
||||
if (viewerType === 'markdown') return fullDoc.extracted_text || rawMarkdown || '';
|
||||
return '';
|
||||
});
|
||||
let anchorMapMd = $derived(
|
||||
sections.length && mdBasisText ? resolveAnchorMap(mdBasisText, sections, { trustBE: true }).anchors : {},
|
||||
);
|
||||
let anchorMapExtracted = $derived(
|
||||
sections.length && extractedBasisText ? resolveAnchorMap(extractedBasisText, sections, { trustBE: false }).anchors : {},
|
||||
);
|
||||
let showRail = $derived(outlineSections.length > 0 && !!mdRenderText);
|
||||
|
||||
let scrollEl = $state();
|
||||
let activeKey = $state(null);
|
||||
function jumpTo(chunkId) {
|
||||
const el = scrollEl?.querySelector(`#sec-${chunkId}`);
|
||||
if (el) el.scrollIntoView({ block: 'start', behavior: 'smooth' });
|
||||
}
|
||||
// scroll-spy: scrollEl 내 .md-anchor 중 컨테이너 상단(+120) 지난 마지막 = 현재 절.
|
||||
$effect(() => {
|
||||
void anchorMapMd;
|
||||
void anchorMapExtracted;
|
||||
const el = scrollEl;
|
||||
if (!el) return;
|
||||
let raf = 0;
|
||||
const onScroll = () => {
|
||||
if (raf) return;
|
||||
raf = requestAnimationFrame(() => {
|
||||
raf = 0;
|
||||
const threshold = el.getBoundingClientRect().top + 120;
|
||||
let cur = null;
|
||||
el.querySelectorAll('.md-anchor').forEach((a) => {
|
||||
if (a.getBoundingClientRect().top <= threshold) cur = a;
|
||||
});
|
||||
if (cur) {
|
||||
const m = cur.id.match(/^sec-(\d+)$/);
|
||||
if (m) activeKey = Number(m[1]);
|
||||
}
|
||||
});
|
||||
};
|
||||
el.addEventListener('scroll', onScroll, { passive: true });
|
||||
const t = setTimeout(onScroll, 0);
|
||||
return () => {
|
||||
el.removeEventListener('scroll', onScroll);
|
||||
clearTimeout(t);
|
||||
if (raf) cancelAnimationFrame(raf);
|
||||
};
|
||||
});
|
||||
|
||||
function startEdit() {
|
||||
editContent = fullDoc?.extracted_text || rawMarkdown || '';
|
||||
editMode = true;
|
||||
@@ -152,8 +240,14 @@
|
||||
</div>
|
||||
{/if}
|
||||
|
||||
<!-- 뷰어 본문 -->
|
||||
<div class="flex-1 overflow-auto min-h-0">
|
||||
<!-- 뷰어 본문 (+ 절 목차 rail) -->
|
||||
<div class="flex-1 flex min-h-0">
|
||||
{#if showRail}
|
||||
<aside class="hidden lg:block w-[176px] shrink-0 overflow-y-auto border-r border-default p-2 bg-sidebar">
|
||||
<SectionOutline sections={outlineSections} onJump={jumpTo} {activeKey} />
|
||||
</aside>
|
||||
{/if}
|
||||
<div class="flex-1 overflow-auto min-h-0" bind:this={scrollEl}>
|
||||
{#if loading}
|
||||
<div class="flex items-center justify-center h-full"><p class="text-sm text-dim">로딩 중...</p></div>
|
||||
{:else if fullDoc}
|
||||
@@ -181,14 +275,15 @@
|
||||
mdStatus={fullDoc.md_status}
|
||||
mdExtractionError={fullDoc.md_extraction_error}
|
||||
mdExtractionQuality={fullDoc.md_extraction_quality}
|
||||
anchorMap={anchorMapExtracted}
|
||||
extractedText={fullDoc.extracted_text || rawMarkdown}
|
||||
class={PROSE}
|
||||
/>
|
||||
</div>
|
||||
{/if}
|
||||
{:else if viewerType === 'pdf'}
|
||||
<div class="p-4 flex flex-col h-full">
|
||||
<div class="mb-2 flex items-center gap-2 shrink-0">
|
||||
<div class="p-4">
|
||||
<div class="mb-2 flex items-center gap-2">
|
||||
<MarkdownStatusBadge mdStatus={fullDoc.md_status} mdExtractionError={fullDoc.md_extraction_error} mdExtractionQuality={fullDoc.md_extraction_quality} />
|
||||
{#if canShowMarkdown}
|
||||
<button onclick={() => (pdfViewMode = 'markdown')}
|
||||
@@ -198,20 +293,19 @@
|
||||
{/if}
|
||||
</div>
|
||||
{#if pdfViewMode === 'markdown' && canShowMarkdown}
|
||||
<div class="flex-1 overflow-auto">
|
||||
<MarkdownDoc
|
||||
documentId={fullDoc.id}
|
||||
mdContent={fullDoc.md_content}
|
||||
mdFrontmatter={fullDoc.md_frontmatter}
|
||||
mdStatus={fullDoc.md_status}
|
||||
mdExtractionError={fullDoc.md_extraction_error}
|
||||
mdExtractionQuality={fullDoc.md_extraction_quality}
|
||||
extractedText={fullDoc.extracted_text}
|
||||
class={PROSE}
|
||||
/>
|
||||
</div>
|
||||
<MarkdownDoc
|
||||
documentId={fullDoc.id}
|
||||
mdContent={fullDoc.md_content}
|
||||
mdFrontmatter={fullDoc.md_frontmatter}
|
||||
mdStatus={fullDoc.md_status}
|
||||
mdExtractionError={fullDoc.md_extraction_error}
|
||||
mdExtractionQuality={fullDoc.md_extraction_quality}
|
||||
anchorMap={anchorMapMd}
|
||||
extractedText={fullDoc.extracted_text}
|
||||
class={PROSE}
|
||||
/>
|
||||
{:else}
|
||||
<iframe src="/api/documents/{fullDoc.id}/file?token={getAccessToken()}" class="flex-1 w-full border-0 rounded" title={fullDoc.title}></iframe>
|
||||
<iframe src="/api/documents/{fullDoc.id}/file?token={getAccessToken()}" class="w-full h-[80vh] border-0 rounded" title={fullDoc.title}></iframe>
|
||||
{/if}
|
||||
</div>
|
||||
{:else if viewerType === 'hwp-markdown'}
|
||||
@@ -281,5 +375,6 @@
|
||||
<div class="flex items-center justify-center h-full"><p class="text-sm text-dim">미리보기를 지원하지 않는 형식입니다 ({fullDoc.file_format})</p></div>
|
||||
{/if}
|
||||
{/if}
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
@@ -50,7 +50,9 @@
|
||||
}: Props = $props();
|
||||
|
||||
// 개요 anchor 주입: body 의 각 offset(내림차순)에 빈 <span id="sec-N"> 삽입(점프 타깃).
|
||||
// offset 은 buildAnchorMap 이 body 와 동일 문자열 기준으로 산출했어야 함(호출측 책임).
|
||||
// [C3 불변식] char_start(BE) 는 호출측이 넘긴 md_content(raw, untransformed)에 대한 UTF-16 offset 이다.
|
||||
// 이 함수는 그 동일 문자열을 'out' 으로 받아 trim/CRLF-normalize/replace 없이 slice 해야 한다 —
|
||||
// prop→out 사이 어떤 변환도 char_start 를 drift 시킨다. (현재 out = text(=body=mdContent prop) 무변환.)
|
||||
function spliceAnchors(text: string, map: Record<number, number> | null): string {
|
||||
if (!map) return text;
|
||||
const ents = Object.entries(map)
|
||||
|
||||
@@ -69,6 +69,20 @@ test('collapseWindows: 연속 동일 heading window 만 dedupe, 순서 유지',
|
||||
);
|
||||
});
|
||||
|
||||
test('[C2] collapseWindows: split-parent + window 들 → rail 1행, 대표=split-parent(char_start 보유)', () => {
|
||||
const input = [
|
||||
sec({ section_title: 'Article 5', heading_path: 'Article 5', node_type: 'chapter_split', is_leaf: false, char_start: 120 }),
|
||||
sec({ section_title: 'Article 5', heading_path: 'Article 5', node_type: 'window', is_leaf: true, char_start: null }),
|
||||
sec({ section_title: 'Article 5', heading_path: 'Article 5', node_type: 'window', is_leaf: true, char_start: null }),
|
||||
];
|
||||
const out = collapseWindows(input);
|
||||
assert.equal(out.length, 1, 'split-parent + 2 window → rail 1행');
|
||||
// 대표 = split-parent (char_start 보유) → jump 성립
|
||||
assert.equal(out[0].section.node_type, 'chapter_split');
|
||||
assert.equal(out[0].section.char_start, 120);
|
||||
assert.equal(out[0].fragmentCount, 2, 'window 조각 수 = 2 (split-parent 자신 제외)');
|
||||
});
|
||||
|
||||
test('groupOrFlat: 적은 그룹 + 낮은 기타% → group (5140-류)', () => {
|
||||
// 3 top segment × 4 = 12절, window 없음 → group_count 3, 기타 0%
|
||||
const sections: DocumentSection[] = [];
|
||||
|
||||
@@ -12,8 +12,10 @@ export interface DocumentSection {
|
||||
section_title: string | null;
|
||||
heading_path: string | null;
|
||||
level: number | null;
|
||||
node_type: string | null; // 'window' | 'section_split' | null
|
||||
node_type: string | null; // 'window' | 'chapter_split' | 'clause_split' | 'section_split' | null
|
||||
is_leaf: boolean;
|
||||
/** md_content 내 heading offset(UTF-16). jump-target 만 값, window-child/preamble/Path A = null (Path B). */
|
||||
char_start?: number | null;
|
||||
section_type: string | null;
|
||||
summary: string | null;
|
||||
confidence: number | null;
|
||||
@@ -87,32 +89,38 @@ export function pathSegments(hp: string | null | undefined): string[] {
|
||||
.filter(Boolean);
|
||||
}
|
||||
|
||||
/** 그룹 키: window/section_split(인공 조각) 또는 path 없음/깨짐 → OTHER. */
|
||||
/** 그룹 키: window/%_split(인공 조각·windowed split-parent) 또는 path 없음/깨짐 → OTHER. */
|
||||
function topSegment(s: DocumentSection): string {
|
||||
if (s.node_type === 'window' || s.node_type === 'section_split') return OTHER;
|
||||
if (s.node_type === 'window' || !!s.node_type?.endsWith('_split')) return OTHER;
|
||||
const segs = pathSegments(s.heading_path);
|
||||
return segs.length === 0 ? OTHER : segs[0];
|
||||
}
|
||||
|
||||
/**
|
||||
* 서버 chunk_index 순서를 유지한 채(정렬 변경 금지), 연속된 동일 cleaned heading_path 의
|
||||
* node_type='window' 절을 1 항목으로 dedupe. 대표 = 첫 조각(요약 사용), fragmentCount 누적.
|
||||
* node_type='window' 절을 1 항목으로 dedupe. fragmentCount = window 조각 수.
|
||||
*
|
||||
* [C2] g4-t2 가 split-parent(%_split, char_start 보유)를 그 window child 들보다 먼저(낮은 chunk_index)
|
||||
* 노출하므로, 후속 window child 를 직전 split-parent(또는 legacy window 대표)에 흡수해 rail 1행으로 만든다.
|
||||
* merged row 의 대표 section = split-parent 여야 jump(anchorMap[split-parent char_start])가 성립한다 —
|
||||
* window-child(char_start NULL, anchorMap 부재)가 대표면 windowed section 이 점프 안 됨.
|
||||
* fragmentCount: split-parent 대표는 0 에서 시작(자신은 조각 아님) + 흡수 child 수 = 실제 조각 수;
|
||||
* legacy window 대표는 1 에서 시작(자신이 첫 조각).
|
||||
*/
|
||||
export function collapseWindows(sections: DocumentSection[]): OutlineItem[] {
|
||||
const out: OutlineItem[] = [];
|
||||
for (const s of sections) {
|
||||
const prev = out[out.length - 1];
|
||||
const h = cleanHeading(s.heading_path);
|
||||
if (
|
||||
s.node_type === 'window' &&
|
||||
const prevAbsorbs =
|
||||
prev &&
|
||||
prev.section.node_type === 'window' &&
|
||||
(prev.section.node_type === 'window' || !!prev.section.node_type?.endsWith('_split')) &&
|
||||
h !== '' &&
|
||||
cleanHeading(prev.section.heading_path) === h
|
||||
) {
|
||||
prev.fragmentCount += 1;
|
||||
cleanHeading(prev.section.heading_path) === h;
|
||||
if (s.node_type === 'window' && prevAbsorbs) {
|
||||
prev!.fragmentCount += 1; // window child 흡수 — 대표(split-parent 우선)는 그대로 유지
|
||||
} else {
|
||||
out.push({ section: s, fragmentCount: 1 });
|
||||
out.push({ section: s, fragmentCount: s.node_type?.endsWith('_split') ? 0 : 1 });
|
||||
}
|
||||
}
|
||||
return out;
|
||||
|
||||
@@ -69,8 +69,9 @@ export function buildAnchorMap(
|
||||
let matched = 0;
|
||||
|
||||
for (const s of sections) {
|
||||
// window/section_split 조각은 자체 heading 없음(부모 제목 상속) → 건너뜀.
|
||||
if (s.node_type === 'window' || s.node_type === 'section_split') continue;
|
||||
// window 조각 + %_split parent(chapter_split/clause_split/section_split)는 string-match 대상 아님 →
|
||||
// 건너뜀. (split-parent jump 은 Path B 의 BE char_start 로만 성립; Path A 폴백선 windowed 절 무점프=무회귀.)
|
||||
if (s.node_type === 'window' || s.node_type?.endsWith('_split')) continue;
|
||||
let nt = norm(s.section_title);
|
||||
if (!nt && s.heading_path) {
|
||||
const last = s.heading_path.split('>').pop();
|
||||
|
||||
@@ -0,0 +1,95 @@
|
||||
// resolveAnchorMap 회귀 테스트 (플랜 ds-outline-anchor-b5 g5-t1 / NEW-5 / B4 / C1).
|
||||
// 실행: node --test src/lib/utils/resolveAnchorMap.test.ts
|
||||
import { test } from 'node:test';
|
||||
import assert from 'node:assert/strict';
|
||||
import { resolveAnchorMap, isJumpTargetCandidate } from './resolveAnchorMap.ts';
|
||||
import { type DocumentSection } from './headingPath.ts';
|
||||
|
||||
let _id = 0;
|
||||
function sec(p: Partial<DocumentSection>): DocumentSection {
|
||||
return {
|
||||
chunk_id: ++_id,
|
||||
section_title: null,
|
||||
heading_path: null,
|
||||
level: null,
|
||||
node_type: null,
|
||||
is_leaf: true,
|
||||
char_start: null,
|
||||
section_type: null,
|
||||
summary: null,
|
||||
confidence: null,
|
||||
...p,
|
||||
};
|
||||
}
|
||||
|
||||
const LONG = 'x'.repeat(500);
|
||||
|
||||
test('trustBE=false → 무조건 string-match 폴백(fellBack=true)', () => {
|
||||
const md = '# Alpha\nbody\n# Beta\nx';
|
||||
const secs = [sec({ section_title: 'Alpha', char_start: 999 }), sec({ section_title: 'Beta', char_start: 999 })];
|
||||
const r = resolveAnchorMap(md, secs, { trustBE: false });
|
||||
assert.equal(r.fellBack, true);
|
||||
// char_start(999) 무시하고 string-match offset 사용
|
||||
assert.ok(Object.values(r.anchors).every((o) => o < 50));
|
||||
});
|
||||
|
||||
test('trustBE=true + 모든 jump-target candidate char_start 보유 → BE 채택(fellBack=false)', () => {
|
||||
const secs = [
|
||||
sec({ section_title: 'A', char_start: 5, is_leaf: true }),
|
||||
sec({ section_title: 'B', char_start: 42, is_leaf: true }),
|
||||
];
|
||||
const r = resolveAnchorMap(LONG, secs, { trustBE: true });
|
||||
assert.equal(r.fellBack, false);
|
||||
assert.equal(r.anchors[secs[0].chunk_id], 5);
|
||||
assert.equal(r.anchors[secs[1].chunk_id], 42);
|
||||
assert.equal(r.matched, 2);
|
||||
});
|
||||
|
||||
test('[NEW-5] windowed doc — window-child char_start NULL 이 폴백을 유발하지 않음(split-parent BE 사용)', () => {
|
||||
const secs = [
|
||||
sec({ section_title: 'Big', heading_path: 'Big', node_type: 'chapter_split', is_leaf: false, char_start: 10 }),
|
||||
sec({ section_title: 'Big', heading_path: 'Big', node_type: 'window', is_leaf: true, char_start: null }),
|
||||
sec({ section_title: 'Big', heading_path: 'Big', node_type: 'window', is_leaf: true, char_start: null }),
|
||||
];
|
||||
const r = resolveAnchorMap(LONG, secs, { trustBE: true });
|
||||
// window-child NULL 은 candidate 가 아니므로 트리거 안 됨 → BE 사용, split-parent 점프 보존
|
||||
assert.equal(r.fellBack, false, 'window-child NULL 이 whole-doc 폴백을 유발하면 안 됨(NEW-5)');
|
||||
assert.equal(r.anchors[secs[0].chunk_id], 10, 'split-parent char_start 가 BE 맵에 있어야 함');
|
||||
// window-child 는 anchor 없음
|
||||
assert.equal(r.anchors[secs[1].chunk_id], undefined);
|
||||
});
|
||||
|
||||
test('[B4] non-PASS doc — jump-target candidate char_start NULL → string-match 폴백', () => {
|
||||
const md = '# Gamma\nbody text here\n# Delta\nmore';
|
||||
const secs = [
|
||||
sec({ section_title: 'Gamma', is_leaf: true, char_start: null }),
|
||||
sec({ section_title: 'Delta', is_leaf: true, char_start: null }),
|
||||
];
|
||||
const r = resolveAnchorMap(md, secs, { trustBE: true });
|
||||
assert.equal(r.fellBack, true, 'candidate char_start NULL 이면 폴백해야 함(BE-first not BE-only)');
|
||||
// string-match 로 실제 jump 산출(0 아님)
|
||||
assert.ok(r.matched >= 1, 'md-aligned doc 는 폴백 string-match 로 jump 비-0');
|
||||
});
|
||||
|
||||
test('char_start > splicedText.length → 그 anchor 만 비활성, 폴백 안 함', () => {
|
||||
const secs = [
|
||||
sec({ section_title: 'A', char_start: 3, is_leaf: true }),
|
||||
sec({ section_title: 'B', char_start: 100000, is_leaf: true }), // 범위 초과(truncated tail)
|
||||
];
|
||||
const short = 'hello world';
|
||||
const r = resolveAnchorMap(short, secs, { trustBE: true });
|
||||
assert.equal(r.fellBack, false, '범위 초과는 폴백 트리거 아님(candidate char_start NOT NULL)');
|
||||
assert.equal(r.anchors[secs[0].chunk_id], 3);
|
||||
assert.equal(r.anchors[secs[1].chunk_id], undefined, '초과 anchor 는 비활성');
|
||||
});
|
||||
|
||||
test('preamble(title 없음, is_leaf) char_start NULL 은 candidate 아님 → 폴백 유발 X', () => {
|
||||
const secs = [
|
||||
sec({ section_title: null, heading_path: null, is_leaf: true, char_start: null }), // preamble
|
||||
sec({ section_title: 'Real', is_leaf: true, char_start: 7 }),
|
||||
];
|
||||
const r = resolveAnchorMap(LONG, secs, { trustBE: true });
|
||||
assert.equal(isJumpTargetCandidate(secs[0]), false, 'preamble 은 candidate 아님');
|
||||
assert.equal(r.fellBack, false);
|
||||
assert.equal(r.anchors[secs[1].chunk_id], 7);
|
||||
});
|
||||
@@ -0,0 +1,82 @@
|
||||
// 개요(절 목차) → 본문 점프 anchor 산출 공유 헬퍼 (경로 B: BE char_start primary + string-match 폴백).
|
||||
//
|
||||
// render-site 가 md_content 를 splice 할 때(trustBE=true)는 BE 가 builder 단계에서 박은 char_start 를
|
||||
// 1순위로 쓰고, 비-md basis(3-pane extracted_text 등, trustBE=false)는 무조건 string-match(buildAnchorMap)로
|
||||
// 폴백한다. char_start 가 비어 있으면(non-PASS doc, 또는 multi-night 재처리 중 아직 미백필 PASS doc) BE-only
|
||||
// 가 아니라 string-match 로 graceful degrade 한다(B4: BE-first, NOT BE-only).
|
||||
//
|
||||
// ★ NEW-5 (must-not-miss): 폴백 트리거는 JUMP-TARGET-CANDIDATE 한정이다.
|
||||
// window-child(node_type='window')와 preamble(title 없음)은 char_start=NULL **BY DESIGN**(g2).
|
||||
// 트리거가 'NULL char_start 가 하나라도 있으면 whole-doc 폴백' 이면, window-child 를 항상 보유한 windowed
|
||||
// doc 은 매번 폴백 → split-parent char_start(windowed 절의 단일 jump target)를 영영 안 쓰고 →
|
||||
// buildAnchorMap 은 split-parent 를 skip → windowed 코어 절이 영원히 점프 안 됨 = 이 플랜이 겨냥한
|
||||
// 바로 그 절에서 Path A 0% 회귀. 따라서 트리거 분모 = jump-target-candidate 뿐.
|
||||
|
||||
import { buildAnchorMap } from './outlineAnchors.ts';
|
||||
import { cleanHeading, type DocumentSection } from './headingPath.ts';
|
||||
|
||||
export interface ResolveResult {
|
||||
/** chunk_id → splicedText 내 char offset (UTF-16). */
|
||||
anchors: Record<number, number>;
|
||||
/** jump-target candidate 수(BE 경로) 또는 buildAnchorMap.total(폴백). */
|
||||
total: number;
|
||||
/** 실제 anchor 부여 수. */
|
||||
matched: number;
|
||||
/** string-match(buildAnchorMap) 로 폴백했는지 — V-rail/검증용. */
|
||||
fellBack: boolean;
|
||||
}
|
||||
|
||||
/** 표시 가능한 제목(또는 heading_path 말단)이 있는가. */
|
||||
function hasTitle(s: DocumentSection): boolean {
|
||||
if (cleanHeading(s.section_title)) return true;
|
||||
const last = (s.heading_path || '').split('>').pop() || '';
|
||||
return !!cleanHeading(last);
|
||||
}
|
||||
|
||||
/**
|
||||
* jump-target candidate = char_start 를 받아야 하는 절.
|
||||
* = (비-window leaf) OR (%_split parent), 그리고 제목 보유.
|
||||
* window-child(node_type='window')·preamble(제목 없음)은 설계상 char_start NULL → candidate 아님(NEW-5).
|
||||
*/
|
||||
export function isJumpTargetCandidate(s: DocumentSection): boolean {
|
||||
const structural = (s.is_leaf && s.node_type !== 'window') || !!s.node_type?.endsWith('_split');
|
||||
return structural && hasTitle(s);
|
||||
}
|
||||
|
||||
export function resolveAnchorMap(
|
||||
splicedText: string | null | undefined,
|
||||
sections: DocumentSection[] | null | undefined,
|
||||
opts: { trustBE: boolean },
|
||||
): ResolveResult {
|
||||
const secs = sections ?? [];
|
||||
|
||||
// basis 불일치(extracted_text 3-pane 등) → 무조건 string-match.
|
||||
if (!opts.trustBE) {
|
||||
const r = buildAnchorMap(splicedText, secs);
|
||||
return { ...r, fellBack: true };
|
||||
}
|
||||
|
||||
// [B4 + NEW-5] BE-first: jump-target candidate 가 비었거나, candidate 중 char_start NULL 이 있으면 폴백.
|
||||
// window-child/preamble NULL 은 candidate 가 아니라 트리거에 안 들어간다.
|
||||
const candidates = secs.filter(isJumpTargetCandidate);
|
||||
const beUnusable = candidates.length === 0 || candidates.some((s) => s.char_start == null);
|
||||
if (beUnusable) {
|
||||
const r = buildAnchorMap(splicedText, secs);
|
||||
return { ...r, fellBack: true };
|
||||
}
|
||||
|
||||
// BE char_start 채택 (C1: window/null/no-title 제외 = candidate 집합과 동일).
|
||||
const anchors: Record<number, number> = {};
|
||||
const limit = (splicedText ?? '').length;
|
||||
let matched = 0;
|
||||
for (const s of candidates) {
|
||||
const cs = s.char_start as number;
|
||||
// char_start<=splicedText.length 가드(MarkdownDoc.svelte:58). 초과 = FE serve-truncate tail →
|
||||
// 그 anchor 만 비활성(폴백 안 함 — string-match 도 truncated tail 은 못 찾음).
|
||||
if (Number.isFinite(cs) && cs >= 0 && cs <= limit) {
|
||||
anchors[s.chunk_id] = cs;
|
||||
matched++;
|
||||
}
|
||||
}
|
||||
return { anchors, total: candidates.length, matched, fellBack: false };
|
||||
}
|
||||
@@ -7,7 +7,7 @@
|
||||
import { goto } from '$app/navigation';
|
||||
import { api, getAccessToken } from '$lib/api';
|
||||
import { isMdSuccess } from '$lib/utils/mdStatus';
|
||||
import { buildAnchorMap } from '$lib/utils/outlineAnchors';
|
||||
import { resolveAnchorMap } from '$lib/utils/resolveAnchorMap';
|
||||
import { addToast } from '$lib/stores/toast';
|
||||
import { marked } from 'marked';
|
||||
import DOMPurify from 'dompurify';
|
||||
@@ -164,11 +164,12 @@
|
||||
}
|
||||
});
|
||||
|
||||
// ── 개요 점프 (outlineAnchors, 경로 A) ──
|
||||
// anchorMap = md_content 의 각 절 heading offset. MarkdownDoc 가 <span id="sec-N"> 주입.
|
||||
// ── 개요 점프 (경로 B: BE char_start primary + string-match 폴백) ──
|
||||
// 이 사이트는 항상 md_content basis(canShowMarkdown && doc.md_content) → trustBE=true.
|
||||
// BE char_start 가 있으면 채택, 비면(non-PASS/미백필) resolveAnchorMap 내부에서 buildAnchorMap 로 폴백.
|
||||
let anchorMap = $derived(
|
||||
hasSections && canShowMarkdown && doc?.md_content
|
||||
? buildAnchorMap(doc.md_content, sections).anchors
|
||||
? resolveAnchorMap(doc.md_content, sections, { trustBE: true }).anchors
|
||||
: {}
|
||||
);
|
||||
let activeKey = $state(null);
|
||||
@@ -411,11 +412,13 @@
|
||||
</span>
|
||||
</div>
|
||||
{#if doc.md_content || doc.extracted_text}
|
||||
<!-- article = 텍스트 네이티브(markdown 변환 비대상). md_status='skipped' 라도
|
||||
"Markdown 제외" badge 를 띄우지 않도록 mdStatus 미전달(badge 는 mdStatus 로만 구동). -->
|
||||
<MarkdownDoc
|
||||
documentId={doc.id}
|
||||
mdContent={doc.md_content}
|
||||
mdFrontmatter={doc.md_frontmatter}
|
||||
mdStatus={doc.md_status}
|
||||
mdStatus={null}
|
||||
mdExtractionError={doc.md_extraction_error}
|
||||
mdExtractionQuality={doc.md_extraction_quality}
|
||||
extractedText={doc.extracted_text}
|
||||
|
||||
@@ -0,0 +1,15 @@
|
||||
-- 318_document_chunks_char_start.sql
|
||||
-- 플랜 ds-outline-anchor-b5 (Path B, g1-t1): hier 절 → md_content 본문 점프용 offset 컬럼.
|
||||
--
|
||||
-- char_start = md_content 내 heading 라인 시작 offset, **UTF-16 code unit** 기준
|
||||
-- (FE outlineAnchors.ts:64 `off += raw.length + 1` / MarkdownDoc.svelte:63 `out.slice(off)` 와 동일 단위).
|
||||
-- NULL 허용 = (a) md_content 없음(legacy/news/Path A) (b) window-child(node_type='window') (c) preamble(title NULL).
|
||||
-- → jump-target(비-window leaf OR %_split parent)만 NOT NULL 을 받는다(BY DESIGN, B1/B3 완료마커 기준).
|
||||
--
|
||||
-- 두 backfill 경로 공통 prereq:
|
||||
-- - UPDATE-only path(g3-tU, hash_stable): 저장된 hier 행에 char_start 만 UPDATE (DELETE/CASCADE/재임베딩 0).
|
||||
-- - re-decompose path(g3-t2, hash_changed): persist INSERT 시 char_start 동봉.
|
||||
--
|
||||
-- 멱등: ADD COLUMN IF NOT EXISTS + init_db version-skip + pg_advisory_xact_lock. BEGIN/COMMIT 금지(단일 statement).
|
||||
|
||||
ALTER TABLE document_chunks ADD COLUMN IF NOT EXISTS char_start INTEGER NULL;
|
||||
@@ -0,0 +1,19 @@
|
||||
-- A-3 (plan crawl-24x7-1): 소스 레지스트리 증축 — additive only.
|
||||
-- fetch_method : rss / rss+page / sitemap+page / page / api / signal-only
|
||||
-- fulltext_policy : none(현행 유지) / page(기사 페이지 fetch 후 4-tier 승격) / feed-full(피드 본문이 전문)
|
||||
-- auth_profile : NULL=공개, 값=구독 세션 키 (B-3 Playwright 어댑터용 슬롯)
|
||||
-- poll_interval_minutes : 소스별 차등 폴링 (NULL=전역 6h 사이클)
|
||||
-- etag / last_modified : 조건부 GET 워터마크 — 받은 그대로 저장·재전송 (상태는 전부 DB, APScheduler in-process)
|
||||
-- feed_content_hash : CDN ETag 회전 대비 콘텐츠 해시 변경감지 병행
|
||||
-- selector_override : 추출 실패 잦은 소스의 site-specific CSS selector (JSONB)
|
||||
-- parser_quirk : rdf / table-strip / gn-redirect 등 파서 특이 케이스
|
||||
ALTER TABLE news_sources
|
||||
ADD COLUMN IF NOT EXISTS fetch_method VARCHAR(20) NOT NULL DEFAULT 'rss',
|
||||
ADD COLUMN IF NOT EXISTS fulltext_policy VARCHAR(20) NOT NULL DEFAULT 'none',
|
||||
ADD COLUMN IF NOT EXISTS auth_profile VARCHAR(50),
|
||||
ADD COLUMN IF NOT EXISTS poll_interval_minutes INTEGER,
|
||||
ADD COLUMN IF NOT EXISTS etag TEXT,
|
||||
ADD COLUMN IF NOT EXISTS last_modified TEXT,
|
||||
ADD COLUMN IF NOT EXISTS feed_content_hash VARCHAR(64),
|
||||
ADD COLUMN IF NOT EXISTS selector_override JSONB,
|
||||
ADD COLUMN IF NOT EXISTS parser_quirk VARCHAR(30);
|
||||
@@ -0,0 +1,3 @@
|
||||
-- 0-5 (a) 확정 (plan crawl-24x7-1): 도메인 자료(안전/공학/철학) 채널 신설 — news 와 분리.
|
||||
-- 신규 값은 같은 트랜잭션 내 사용 금지 (PG 제약) — 본 배치의 다른 마이그레이션은 'crawl' 미사용.
|
||||
ALTER TYPE source_channel ADD VALUE IF NOT EXISTS 'crawl';
|
||||
@@ -0,0 +1,3 @@
|
||||
-- A-2 (plan crawl-24x7-1): RSS 요약 → 기사 페이지 fetch → 4-tier 본문 승격 stage.
|
||||
-- fulltext_policy='page' 소스의 기사에만 news_collector 가 enqueue.
|
||||
ALTER TYPE process_stage ADD VALUE IF NOT EXISTS 'fulltext';
|
||||
@@ -0,0 +1,19 @@
|
||||
-- A-5 (plan crawl-24x7-1): 소스 건강 — 소스별 실패 격리 기록 + circuit breaker.
|
||||
-- 한 소스가 죽어도 나머지 영향 0. silent skip 누적 방지의 가시성 기반 (A-8 패널이 읽음).
|
||||
-- circuit_state: closed(정상) / open(연속 실패로 지수 backoff 중) / disabled(M회 초과, 수동 복구 대상)
|
||||
-- empty_streak : 200 인데 entries 0 인 연속 fetch 횟수 (피드 부패 감시 — 304/해시동일은 미집계)
|
||||
CREATE TABLE IF NOT EXISTS source_health (
|
||||
id SERIAL PRIMARY KEY,
|
||||
source_id INTEGER NOT NULL REFERENCES news_sources(id) ON DELETE CASCADE,
|
||||
consecutive_failures INTEGER NOT NULL DEFAULT 0,
|
||||
total_fetches BIGINT NOT NULL DEFAULT 0,
|
||||
total_failures BIGINT NOT NULL DEFAULT 0,
|
||||
last_success_at TIMESTAMPTZ,
|
||||
last_error TEXT,
|
||||
last_error_at TIMESTAMPTZ,
|
||||
last_fetch_items INTEGER,
|
||||
empty_streak INTEGER NOT NULL DEFAULT 0,
|
||||
circuit_state VARCHAR(10) NOT NULL DEFAULT 'closed',
|
||||
circuit_opened_at TIMESTAMPTZ,
|
||||
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
|
||||
);
|
||||
@@ -0,0 +1,2 @@
|
||||
-- A-5: source_health 는 news_sources 와 1:1 — upsert 기준 키.
|
||||
CREATE UNIQUE INDEX IF NOT EXISTS uq_source_health_source_id ON source_health (source_id);
|
||||
@@ -0,0 +1,110 @@
|
||||
"""HWP(library) 백필 — 지정 PKM 폴더의 .hwp 를 content-hash dedup 후 일회성 ingest.
|
||||
|
||||
산업안전기사 등 외부 학습자료(category='library')를 코퍼스에 편입한다. file_watcher 의
|
||||
PKM 트랙 로직을 재사용하되 dedup 을 file_path 가 아닌 **file_hash** 기준으로 해서
|
||||
(a) Inbox 중복 (b) `_1`/`카피본` 사본을 1건으로 수렴시킨다(file_watcher 는 path dedup 이라
|
||||
동일내용 다른경로를 중복 ingest 함). 이후 파이프라인:
|
||||
extract(텍스트) → classify → embed/chunk(검색) → markdown(.hwp=pyhwp hwp5html + raster NAS 영속)
|
||||
|
||||
실행 (GPU 서버):
|
||||
# dry-run (기본) — 무엇이 ingest/skip 될지만 출력
|
||||
docker exec hyungi_document_server-fastapi-1 \
|
||||
python /app/scripts/backfill_hwp_library.py --subdir Knowledge/Engineering
|
||||
|
||||
# 실제 ingest
|
||||
docker exec hyungi_document_server-fastapi-1 \
|
||||
python /app/scripts/backfill_hwp_library.py --subdir Knowledge/Engineering --commit
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import asyncio
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
from sqlalchemy import select
|
||||
|
||||
from core.config import settings
|
||||
from core.database import async_session
|
||||
from core.utils import file_hash
|
||||
from models.document import Document
|
||||
from models.queue import enqueue_stage
|
||||
|
||||
|
||||
async def run(subdir: str, commit: bool) -> int:
|
||||
nas_root = Path(settings.nas_mount_path)
|
||||
scan_root = nas_root / "PKM" / subdir
|
||||
if not scan_root.exists():
|
||||
print(f"[backfill] scan_root 부재: {scan_root}", file=sys.stderr)
|
||||
return 2
|
||||
|
||||
files = sorted(
|
||||
p for p in scan_root.rglob("*") if p.is_file() and p.suffix.lower() == ".hwp"
|
||||
)
|
||||
print(f"[backfill] {scan_root} 하위 .hwp {len(files)}개 발견 / mode={'COMMIT' if commit else 'DRY-RUN'}")
|
||||
|
||||
ingested = skipped_existing = skipped_batchdup = 0
|
||||
seen_hashes: set[str] = set()
|
||||
|
||||
async with async_session() as session:
|
||||
for fp in files:
|
||||
rel_path = str(fp.relative_to(nas_root))
|
||||
fhash = file_hash(fp)
|
||||
|
||||
if fhash in seen_hashes:
|
||||
print(f" SKIP(batch-dup) {rel_path}")
|
||||
skipped_batchdup += 1
|
||||
continue
|
||||
seen_hashes.add(fhash)
|
||||
|
||||
# content-hash dedup (path 무관) — Inbox 중복 + _1/카피본 사본 흡수
|
||||
existing = (
|
||||
await session.execute(
|
||||
select(Document.id).where(Document.file_hash == fhash).limit(1)
|
||||
)
|
||||
).first()
|
||||
if existing:
|
||||
print(f" SKIP(exists id={existing[0]}) {rel_path}")
|
||||
skipped_existing += 1
|
||||
continue
|
||||
|
||||
ingested += 1
|
||||
if not commit:
|
||||
print(f" INGEST(dry) {rel_path}")
|
||||
continue
|
||||
|
||||
doc = Document(
|
||||
file_path=rel_path,
|
||||
file_hash=fhash,
|
||||
file_format="hwp",
|
||||
file_size=fp.stat().st_size,
|
||||
file_type="immutable",
|
||||
title=fp.stem,
|
||||
source_channel="drive_sync",
|
||||
category="library",
|
||||
needs_conversion=False,
|
||||
)
|
||||
session.add(doc)
|
||||
await session.flush()
|
||||
await enqueue_stage(session, doc.id, "extract")
|
||||
print(f" INGEST id={doc.id} {rel_path}")
|
||||
|
||||
if commit:
|
||||
await session.commit()
|
||||
|
||||
print(
|
||||
f"[backfill] done — ingest={ingested} "
|
||||
f"skip_existing={skipped_existing} skip_batchdup={skipped_batchdup}"
|
||||
)
|
||||
return 0
|
||||
|
||||
|
||||
def main() -> int:
|
||||
ap = argparse.ArgumentParser(description=__doc__)
|
||||
ap.add_argument("--subdir", default="Knowledge/Engineering", help="PKM 하위 스캔 폴더")
|
||||
ap.add_argument("--commit", action="store_true", help="실제 ingest (없으면 dry-run)")
|
||||
args = ap.parse_args()
|
||||
return asyncio.run(run(args.subdir, args.commit))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
||||
@@ -0,0 +1,203 @@
|
||||
"""hier 개요 keep-better 게이트 + g-measure 엔진 (플랜 ds-outline-anchor-b5 g6-t1 / gm-t1).
|
||||
|
||||
READ-ONLY dry-run. doc 별로:
|
||||
(A) 현 저장 hier 절제목 (source_type='hier_section', char_start IS NULL = extracted_text 산)
|
||||
(B) build_hier_tree(md_content) 절제목 (= 새 g2 builder: split('\n')+UTF-16+fence skip)
|
||||
를 비교해 산출:
|
||||
- verdict {B_better, A_better, equivalent} (+ junk-heading 검출 → A_better 보호)
|
||||
- B_jumptarget_count (build 후 jump-target node 수) — B3 게이트 입력
|
||||
- hash_stable 판정 — UPDATE-only(g3-tU) vs re-decompose(g3-t2) 라우팅:
|
||||
* hash_stable_strict = build(md) 가 저장 hier hash 를 position-by-position 100% 재현
|
||||
(= 런타임 g3-tU 가 UPDATE-only 로 처리할 정확한 집합; demote 안 함)
|
||||
* hash_stable_99 = >=99% 재현 (원 MEASURE2 분류 기준 — 비교용)
|
||||
- dup_title_count / has_fence (measure3 budget note: fence 보유 doc 은 새 builder 에서 hash_changed flip 가능)
|
||||
- REFINED PASS = (verdict B>=A) AND (B_jumptarget>=1)
|
||||
|
||||
★ gm-t1 재확인(이 빌드의 유일 잔여 측정): g2 builder 코딩 후 1회 실행 → REFINED PASS 중
|
||||
hash_changed(=re-decompose) count 가 ~230 인지 확인(코드펜스-skip 으로 32 중 ≤2 flip → 최대 ~232 수용).
|
||||
|
||||
실행 (GPU 서버, 컨테이너):
|
||||
docker compose exec -T fastapi python /app/scripts/hier_outline_quality_gate.py run
|
||||
docker compose exec -T fastapi python /app/scripts/hier_outline_quality_gate.py run --json /tmp/measure.json
|
||||
docker compose exec -T fastapi python /app/scripts/hier_outline_quality_gate.py run --doc 5140,5209,5165 # 코어 spot-check
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import asyncio
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
import sys
|
||||
from collections import Counter
|
||||
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
|
||||
|
||||
from sqlalchemy import text
|
||||
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
|
||||
|
||||
from services.hier_decomp.builder import build_hier_tree
|
||||
|
||||
|
||||
def _is_jump_target(node) -> bool:
|
||||
"""jump-target = 비-window leaf OR %_split parent + 제목 보유 (resolveAnchorMap / _JUMP_TARGET_PRED 일치)."""
|
||||
structural = (node.is_leaf and node.node_type != "window") or bool(
|
||||
node.node_type and node.node_type.endswith("_split"))
|
||||
return structural and bool(node.section_title)
|
||||
|
||||
|
||||
# cover/TOC org-이름 junk 검출 (g6-t1 high-recall): 회사명 접미사 + 거의-전부-대문자.
|
||||
_JUNK_ORG = re.compile(r"\b(INC\.?|LLC|L\.L\.C|CORP\.?|CO\.,?\s*LTD|CONSULTING|COMPANY|LIMITED|LTD\.?)\b", re.I)
|
||||
_FENCE_ANY = re.compile(r"(?m)^\s{0,3}(```|~~~)")
|
||||
|
||||
|
||||
def _looks_junk(title: str | None) -> bool:
|
||||
if not title:
|
||||
return False
|
||||
if _JUNK_ORG.search(title):
|
||||
return True
|
||||
letters = [c for c in title if c.isalpha()]
|
||||
if len(letters) >= 6 and sum(1 for c in letters if c.isupper()) / len(letters) >= 0.85:
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def _make_engine():
|
||||
return create_async_engine(os.environ["DATABASE_URL"], pool_pre_ping=True)
|
||||
|
||||
|
||||
async def _measure_doc(session, doc_id):
|
||||
md = await session.scalar(text("SELECT md_content FROM documents WHERE id=:d"), {"d": doc_id})
|
||||
stored = (await session.execute(text("""
|
||||
SELECT chunk_index, chunk_content_hash, node_type, is_leaf, section_title, char_start
|
||||
FROM document_chunks WHERE doc_id=:d AND source_type='hier_section'
|
||||
ORDER BY chunk_index"""), {"d": doc_id})).mappings().all()
|
||||
if not stored:
|
||||
return None
|
||||
res = {"doc_id": doc_id, "n_stored": len(stored)}
|
||||
if not md or not md.strip():
|
||||
res.update({"md_null": True, "verdict": "A_better", "b_jumptarget": 0,
|
||||
"hash_stable_strict": False, "refined_pass": False})
|
||||
return res
|
||||
|
||||
nodes = build_hier_tree(md)
|
||||
jt = [n for n in nodes if _is_jump_target(n)]
|
||||
titles = [n.section_title for n in jt]
|
||||
res["n_build"] = len(nodes)
|
||||
res["b_jumptarget"] = len(jt)
|
||||
res["dup_title"] = len(titles) - len(set(titles))
|
||||
res["has_fence"] = bool(_FENCE_ANY.search(md))
|
||||
res["len_md"] = len(md)
|
||||
|
||||
# hash 비교 (position-aligned, runtime g3-tU 기준).
|
||||
if len(nodes) == len(stored):
|
||||
mism = sum(1 for n, s in zip(nodes, stored)
|
||||
if n.chunk_content_hash != s["chunk_content_hash"])
|
||||
frac = (len(stored) - mism) / len(stored)
|
||||
res["hash_match_frac"] = round(frac, 4)
|
||||
res["hash_stable_strict"] = (mism == 0)
|
||||
res["hash_stable_99"] = (frac >= 0.99)
|
||||
else:
|
||||
res["hash_match_frac"] = 0.0
|
||||
res["hash_stable_strict"] = False
|
||||
res["hash_stable_99"] = False
|
||||
|
||||
stored_titles = {s["section_title"] for s in stored if s["section_title"]}
|
||||
res["junk_b"] = any(_looks_junk(n.section_title) and n.section_title not in stored_titles for n in nodes)
|
||||
|
||||
# verdict 휴리스틱 (high-recall junk 보호 + absent-structure → A_better).
|
||||
# MEASURE2 가 canonical 분포를 이미 박제 — 이 verdict 는 재현/감사용. 애매(notes:ambiguous)는 PASS 미차단.
|
||||
# ★ apples-to-apples: 양쪽 모두 JUMP-TARGET 수로 비교(stored leaf 전수 X — window-child 가 n_a 를 부풀려
|
||||
# windowed doc 을 거짓 A_better 로 떨구는 bias 제거). stored jump-target = (비-window leaf OR %_split) + 제목.
|
||||
def _stored_is_jt(s):
|
||||
st = (s["is_leaf"] and s["node_type"] != "window") or bool(
|
||||
s["node_type"] and s["node_type"].endswith("_split"))
|
||||
return st and bool(s["section_title"])
|
||||
n_a = sum(1 for s in stored if _stored_is_jt(s))
|
||||
res["a_jumptarget"] = n_a
|
||||
n_b = res["b_jumptarget"]
|
||||
if n_b == 0:
|
||||
res["verdict"] = "A_better" # B 개요 없음(빈 jump-target)
|
||||
elif res["junk_b"]:
|
||||
res["verdict"] = "A_better" # B 가 cover junk 도입
|
||||
elif n_b >= max(1, n_a * 0.7):
|
||||
res["verdict"] = "B_better" if n_b > n_a else "equivalent"
|
||||
else:
|
||||
res["verdict"] = "A_better" # B 가 구조 상실(5209 absent-class)
|
||||
res["notes"] = "absent_or_degraded"
|
||||
|
||||
res["refined_pass"] = res["verdict"] in ("B_better", "equivalent") and n_b >= 1
|
||||
return res
|
||||
|
||||
|
||||
async def cmd_run(args):
|
||||
doc_ids = [int(x) for x in args.doc.split(",") if x.strip()] if args.doc else None
|
||||
engine = _make_engine()
|
||||
sm = async_sessionmaker(engine, expire_on_commit=False)
|
||||
try:
|
||||
async with sm() as session:
|
||||
if doc_ids is None:
|
||||
doc_ids = [r[0] for r in (await session.execute(text(
|
||||
"SELECT DISTINCT doc_id FROM document_chunks WHERE source_type='hier_section' ORDER BY doc_id"))).all()]
|
||||
results = []
|
||||
for d in doc_ids:
|
||||
r = await _measure_doc(session, d)
|
||||
if r is not None:
|
||||
results.append(r)
|
||||
finally:
|
||||
await engine.dispose()
|
||||
|
||||
total = len(results)
|
||||
md_null = [r for r in results if r.get("md_null")]
|
||||
measured = [r for r in results if not r.get("md_null")]
|
||||
passes = [r for r in measured if r.get("refined_pass")]
|
||||
pass_jt0 = [r for r in measured if r["verdict"] in ("B_better", "equivalent") and r["b_jumptarget"] == 0]
|
||||
hash_stable = [r for r in passes if r.get("hash_stable_strict")]
|
||||
hash_stable_99 = [r for r in passes if r.get("hash_stable_99")]
|
||||
hash_changed = [r for r in passes if not r.get("hash_stable_strict")]
|
||||
verdict_dist = Counter(r["verdict"] for r in measured)
|
||||
dup_among_stable = [r for r in hash_stable if r.get("dup_title", 0) > 0]
|
||||
fence_among_stable = [r for r in hash_stable if r.get("has_fence")]
|
||||
|
||||
print("=" * 64)
|
||||
print(f"hier doc 측정: {total} (md_null {len(md_null)}, measured {len(measured)})")
|
||||
print(f"verdict 분포: {dict(verdict_dist)}")
|
||||
print(f"B_jumptarget==0 (PASS-verdict 이나 빈 jump-target, B3 HOLD): {len(pass_jt0)}")
|
||||
print("-" * 64)
|
||||
print(f"REFINED PASS = (verdict B>=A) AND (B_jumptarget>=1): {len(passes)}")
|
||||
print(f" ├─ hash_stable (strict 100% position 재현 = g3-tU UPDATE-only): {len(hash_stable)}")
|
||||
print(f" │ dup_title>0: {len(dup_among_stable)} / has_fence: {len(fence_among_stable)}")
|
||||
print(f" │ (참고) hash_stable_99(원 MEASURE2 기준): {len(hash_stable_99)}")
|
||||
print(f" └─ hash_changed (re-decompose 대상, g3-t2 --reprocess): {len(hash_changed)} ← ★ '230' 재확인 수치")
|
||||
print("-" * 64)
|
||||
print(f" re-decompose --doc(B_jumptarget>=1) = {','.join(str(r['doc_id']) for r in hash_changed) or '(없음)'}")
|
||||
print(f" UPDATE-only --doc(hash_stable) = {','.join(str(r['doc_id']) for r in hash_stable) or '(없음)'}")
|
||||
if md_null:
|
||||
print(f" md_null(suspect, V4): {[r['doc_id'] for r in md_null]}")
|
||||
print("=" * 64)
|
||||
print("NOTE: '230' 은 hash_changed PASS 수치. 코드펜스-skip 으로 hash_stable 32 중 fence 보유분(measure3=2)이 "
|
||||
"hash_changed 로 flip 가능 → 230~232 수용(NEW-3 budget-only, 정확성은 g3-tU 런타임 100% VERIFY 가 보증).")
|
||||
|
||||
if args.json:
|
||||
with open(args.json, "w") as f:
|
||||
json.dump({"summary": {
|
||||
"total": total, "measured": len(measured), "refined_pass": len(passes),
|
||||
"hash_stable": len(hash_stable), "hash_changed": len(hash_changed),
|
||||
"b_jumptarget_0": len(pass_jt0), "md_null": [r["doc_id"] for r in md_null],
|
||||
"hash_changed_doc_ids": [r["doc_id"] for r in hash_changed],
|
||||
"hash_stable_doc_ids": [r["doc_id"] for r in hash_stable],
|
||||
}, "docs": results}, f, ensure_ascii=False, indent=2)
|
||||
print(f"[json] {args.json} 기록 ({len(results)} doc)")
|
||||
|
||||
|
||||
def main():
|
||||
ap = argparse.ArgumentParser(description="hier 개요 keep-better 게이트 + g-measure (read-only)")
|
||||
sub = ap.add_subparsers(dest="cmd", required=True)
|
||||
p = sub.add_parser("run", help="전체(또는 --doc) 측정 + 분포 출력")
|
||||
p.add_argument("--doc", default=None, help="comma-sep doc id (미지정=전 hier doc)")
|
||||
p.add_argument("--json", default=None, help="per-doc 결과 JSON 덤프 경로")
|
||||
args = ap.parse_args()
|
||||
asyncio.run({"run": cmd_run}[args.cmd](args))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -29,6 +29,7 @@ from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
|
||||
|
||||
from ai.client import AIClient, parse_json_response, strip_thinking
|
||||
from core.config import settings
|
||||
from services.hier_decomp.builder import build_hier_tree
|
||||
from services.hier_decomp.persist import persist_hier_tree
|
||||
from services.search.llm_gate import Priority, acquire_mlx_gate
|
||||
|
||||
@@ -42,27 +43,48 @@ DOC_MIN_CHARS = 4000 # hier 분해가 의미 있는 doc 크기 하한(STRUCTUR
|
||||
BUFFER_MIN = 10 # deadline 이 만큼 전 안전 중단
|
||||
|
||||
|
||||
def _candidate_sql(allowlist, doc_ids=None):
|
||||
"""allowlist 있으면 그 domain 만, 없으면 EXCLUDE_DOMAINS(news) 제외 전부.
|
||||
doc_ids 명시 시 = 그 doc 만(크기 게이트 DOC_MIN_CHARS + domain 필터 우회 —
|
||||
구조화 소형 문서(법령 등) eval coverage 보정용. NOT EXISTS hier 멱등 가드는 유지).
|
||||
작은 doc 먼저 = 완료 doc 수 최대화 + 단일 mega-doc 예산 독식 방지."""
|
||||
# jump-target = 비-window leaf OR %_split parent (B1/B3 완료마커 + B_jumptarget 분모, 플랜 g3-t2).
|
||||
# 이 집합만 char_start 를 받는다(window-child/preamble 은 설계상 NULL).
|
||||
_JUMP_TARGET_PRED = r"((c.is_leaf AND c.node_type IS DISTINCT FROM 'window') OR c.node_type LIKE '%\_split' ESCAPE '\')"
|
||||
|
||||
|
||||
def _candidate_sql(allowlist, doc_ids=None, reprocess=False):
|
||||
"""body = d.md_content (g0-t1: hier 출처 md_content 영구확정 — extracted_text 폐기. char_start 가
|
||||
md_content offset 이라 FE splice basis 와 일치해야 하므로 분해 source 도 md_content 여야 함[F1]).
|
||||
|
||||
reprocess=False (additive): 아직 hier 없는 doc 만 신규 분해 (NOT EXISTS hier_section 멱등).
|
||||
reprocess=True (re-decompose): hier 는 있으나 jump-target char_start 가 아직 안 채워진 doc 재분해.
|
||||
[B1] 완료마커 = jump-target 중 char_start NOT NULL 행이 존재(=한 번 재분해되면 atomic 하게 전부 채워짐);
|
||||
window-child/preamble 은 설계상 NULL 이라 'all-leaf NOT NULL' 마커의 무한 trap 을 피한다.
|
||||
[B3] 빈 jump-target doc(B_jumptarget==0)은 NOT EXISTS 가 vacuous TRUE → 영구 재선택 trap →
|
||||
호출측이 --doc 을 REFINED PASS(B_jumptarget>=1) 로 제한해 차단(--reprocess 는 --doc 필수, REFUSE).
|
||||
doc_ids 명시 시 크기 게이트 우회. 작은 doc 먼저 = 완료 doc 수 최대화."""
|
||||
if doc_ids:
|
||||
cond, gate = "d.id = ANY(:doc_ids)", "" # 명시 doc = 크기 게이트 우회
|
||||
else:
|
||||
cond = ("lower(split_part(coalesce(d.ai_domain,''), '/', 1)) = ANY(:domains)"
|
||||
if allowlist else
|
||||
"lower(split_part(coalesce(d.ai_domain,''), '/', 1)) <> ALL(:exclude)")
|
||||
gate = "AND length(d.extracted_text) > :minchars"
|
||||
gate = "AND length(d.md_content) > :minchars"
|
||||
if reprocess:
|
||||
marker = f"""
|
||||
AND EXISTS (SELECT 1 FROM document_chunks dc
|
||||
WHERE dc.doc_id = d.id AND dc.source_type = 'hier_section')
|
||||
AND NOT EXISTS (SELECT 1 FROM document_chunks c
|
||||
WHERE c.doc_id = d.id AND c.source_type = 'hier_section'
|
||||
AND c.char_start IS NOT NULL AND {_JUMP_TARGET_PRED})"""
|
||||
else:
|
||||
marker = """
|
||||
AND NOT EXISTS (SELECT 1 FROM document_chunks dc
|
||||
WHERE dc.doc_id = d.id AND dc.source_type = 'hier_section')"""
|
||||
return text(f"""
|
||||
SELECT d.id AS doc_id, d.extracted_text AS body, d.ai_domain AS ai_domain
|
||||
SELECT d.id AS doc_id, d.md_content AS body, d.ai_domain AS ai_domain
|
||||
FROM documents d
|
||||
WHERE d.extracted_text IS NOT NULL
|
||||
WHERE d.md_content IS NOT NULL AND length(d.md_content) > 0
|
||||
{gate}
|
||||
AND {cond}
|
||||
AND NOT EXISTS (SELECT 1 FROM document_chunks dc
|
||||
WHERE dc.doc_id = d.id AND dc.source_type = 'hier_section')
|
||||
ORDER BY length(d.extracted_text) ASC
|
||||
{marker}
|
||||
ORDER BY length(d.md_content) ASC
|
||||
""")
|
||||
|
||||
|
||||
@@ -77,10 +99,11 @@ def _candidate_params(allowlist, doc_ids=None):
|
||||
return p
|
||||
|
||||
|
||||
def _scope_label(allowlist, doc_ids=None):
|
||||
def _scope_label(allowlist, doc_ids=None, reprocess=False):
|
||||
tag = "RE-DECOMPOSE" if reprocess else "additive"
|
||||
if doc_ids:
|
||||
return f"doc-list={len(doc_ids)}건(크기게이트 우회)"
|
||||
return f"allowlist={allowlist}" if allowlist else f"all-except={EXCLUDE_DOMAINS}"
|
||||
return f"doc-list={len(doc_ids)}건(크기게이트 우회, {tag})"
|
||||
return (f"allowlist={allowlist}" if allowlist else f"all-except={EXCLUDE_DOMAINS}") + f" ({tag})"
|
||||
|
||||
# 멱등 leaf 선별 (재실행 시 이미 분석된 leaf 제외)
|
||||
LEAF_SQL = text("""
|
||||
@@ -177,14 +200,19 @@ def _parse_doc_ids(args):
|
||||
async def cmd_dry_run(args):
|
||||
allowlist = args.domains.split(",") if args.domains else None
|
||||
doc_ids = _parse_doc_ids(args)
|
||||
reprocess = getattr(args, "reprocess", False)
|
||||
if reprocess and not doc_ids:
|
||||
print("REFUSE: --reprocess 는 --doc <list> 필수 (B3 빈 jump-target trap 차단 — REFINED PASS 리스트만)")
|
||||
sys.exit(2)
|
||||
engine = _make_engine()
|
||||
sm = async_sessionmaker(engine, expire_on_commit=False)
|
||||
async with sm() as session:
|
||||
rows = (await session.execute(_candidate_sql(allowlist, doc_ids),
|
||||
rows = (await session.execute(_candidate_sql(allowlist, doc_ids, reprocess),
|
||||
_candidate_params(allowlist, doc_ids))).mappings().all()
|
||||
await engine.dispose()
|
||||
gate_lbl = "doc-list" if doc_ids else f">{DOC_MIN_CHARS}자"
|
||||
print(f"[dry-run] 후보 doc {len(rows)} ({_scope_label(allowlist, doc_ids)}, {gate_lbl}, 미분해)")
|
||||
state_lbl = "재분해 미완료(jump-target char_start 부재)" if reprocess else "미분해"
|
||||
print(f"[dry-run] 후보 doc {len(rows)} ({_scope_label(allowlist, doc_ids, reprocess)}, {gate_lbl}, {state_lbl})")
|
||||
if rows:
|
||||
lens = [len(r["body"]) for r in rows]
|
||||
print(f" 본문길이: min={min(lens)} p50={int(statistics.median(lens))} max={max(lens)} 합={sum(lens):,}")
|
||||
@@ -196,11 +224,16 @@ async def cmd_dry_run(args):
|
||||
async def cmd_run(args):
|
||||
allowlist = args.domains.split(",") if args.domains else None
|
||||
doc_ids = _parse_doc_ids(args)
|
||||
reprocess = getattr(args, "reprocess", False)
|
||||
if reprocess and not doc_ids:
|
||||
_log("REFUSE: --reprocess 는 --doc <list> 필수 (B3 빈 jump-target trap 차단 — REFINED PASS 리스트만)")
|
||||
sys.exit(2)
|
||||
skip_analysis = getattr(args, "skip_analysis", False)
|
||||
deadline = _compute_deadline(args.deadline)
|
||||
stop_at = (deadline - timedelta(minutes=BUFFER_MIN)).timestamp()
|
||||
_log(f"deadline={deadline:%m-%d %H:%M} (buffer {BUFFER_MIN}m → stop_at={datetime.fromtimestamp(stop_at):%H:%M}) "
|
||||
f"{_scope_label(allowlist, doc_ids)}{' [SKIP-ANALYSIS: 분해+임베딩만]' if skip_analysis else ''}")
|
||||
f"{_scope_label(allowlist, doc_ids, reprocess)}{' [SKIP-ANALYSIS: 분해+임베딩만]' if skip_analysis else ''}"
|
||||
f"{' [RE-DECOMPOSE: 기존 hier DELETE→CASCADE chunk_section_analysis→재INSERT; 스냅샷 선행 필수]' if reprocess else ''}")
|
||||
|
||||
engine = _make_engine()
|
||||
sm = async_sessionmaker(engine, expire_on_commit=False)
|
||||
@@ -219,7 +252,7 @@ async def cmd_run(args):
|
||||
run_start = time.time()
|
||||
try:
|
||||
async with sm() as session:
|
||||
cands = (await session.execute(_candidate_sql(allowlist, doc_ids),
|
||||
cands = (await session.execute(_candidate_sql(allowlist, doc_ids, reprocess),
|
||||
_candidate_params(allowlist, doc_ids))).mappings().all()
|
||||
_log(f"후보 doc {len(cands)} 선별. 시작.")
|
||||
|
||||
@@ -268,6 +301,101 @@ async def cmd_run(args):
|
||||
d = Counter(all_types)
|
||||
_log(f" section_type: {dict(d.most_common())} other={d.get('other',0)/len(all_types):.1%}")
|
||||
|
||||
# [g3-t3/g3-t4] post-run sweep: 처리한 doc 중 미분석 leaf 잔여 집계(반쪽상태/stall 검출).
|
||||
# GOAL(jump=char_start)/rail-summary(re-analyze) DECOUPLE — 잔여는 다음 실행이 LEAF_SQL 멱등으로 흡수.
|
||||
if doc_ids:
|
||||
try:
|
||||
async with sm() as session:
|
||||
pending = (await session.execute(text(f"""
|
||||
SELECT dc.doc_id, count(*) AS unanalyzed
|
||||
FROM document_chunks dc
|
||||
WHERE dc.doc_id = ANY(:ids) AND dc.source_type='hier_section' AND dc.is_leaf=true
|
||||
AND NOT EXISTS (SELECT 1 FROM chunk_section_analysis a
|
||||
WHERE a.chunk_id = dc.id AND a.prompt_version = :pv
|
||||
AND a.source_content_hash = dc.chunk_content_hash)
|
||||
GROUP BY dc.doc_id ORDER BY unanalyzed DESC"""),
|
||||
{"ids": doc_ids, "pv": PROMPT_VERSION})).mappings().all()
|
||||
if pending:
|
||||
tot = sum(r["unanalyzed"] for r in pending)
|
||||
_log(f" [sweep] 미분석 leaf 잔여: {tot} (doc {len(pending)}) — 다음 실행이 이어서 분석(멱등). "
|
||||
f"상위: {[(r['doc_id'], r['unanalyzed']) for r in pending[:5]]}")
|
||||
else:
|
||||
_log(" [sweep] 미분석 leaf 잔여 0 — 분석 수렴.")
|
||||
except Exception as exc:
|
||||
_log(f" [sweep] 잔여 집계 실패(무해): {type(exc).__name__}")
|
||||
|
||||
|
||||
def _is_jump_target(node) -> bool:
|
||||
"""jump-target = 비-window leaf OR %_split parent (builder HierNode 판정, _JUMP_TARGET_PRED 와 일치)."""
|
||||
return ((node.is_leaf and node.node_type != "window")
|
||||
or bool(node.node_type and node.node_type.endswith("_split")))
|
||||
|
||||
|
||||
async def cmd_update_char_start(args):
|
||||
"""[g3-tU] hash_stable doc 전용 비파괴 char_start UPDATE.
|
||||
|
||||
각 doc: build(md_content) → stored hier 행과 position-by-position(chunk_index 순) 정렬 →
|
||||
[NEW-1] jump-target 전수 100% hash 일치(ALL-OR-NOTHING) VERIFY. 단 한 자리라도 불일치 → DEMOTE.
|
||||
[NEW-2] hash 로 WHERE 하지 않음(동일-body 절 충돌 회피) — position 의 stored row PK(id)로 UPDATE.
|
||||
통과 doc: UPDATE document_chunks SET char_start (DELETE/CASCADE/embed/analyze 0, 가역).
|
||||
미달 doc: DEMOTE-LIST 로 emit → re-decompose 배치에 UNION(NEW-4). stdout 마지막에 DEMOTE_DOC_IDS= 출력.
|
||||
"""
|
||||
doc_ids = _parse_doc_ids(args)
|
||||
if not doc_ids:
|
||||
_log("REFUSE: update-char-start 는 --doc <list> 필수 (hash_stable 32 = gm-t1 산출)")
|
||||
sys.exit(2)
|
||||
engine = _make_engine()
|
||||
sm = async_sessionmaker(engine, expire_on_commit=False)
|
||||
updated, demoted, noop = [], [], []
|
||||
try:
|
||||
for doc_id in doc_ids:
|
||||
async with sm() as session:
|
||||
md = await session.scalar(text("SELECT md_content FROM documents WHERE id=:d"), {"d": doc_id})
|
||||
if not md or not md.strip():
|
||||
noop.append(doc_id)
|
||||
_log(f" doc={doc_id} md_content 없음 → no-op(suspect, V4)")
|
||||
continue
|
||||
nodes = build_hier_tree(md)
|
||||
stored = (await session.execute(text("""
|
||||
SELECT id, chunk_index, chunk_content_hash, node_type, is_leaf
|
||||
FROM document_chunks
|
||||
WHERE doc_id=:d AND source_type='hier_section'
|
||||
ORDER BY chunk_index"""), {"d": doc_id})).mappings().all()
|
||||
# [NEW-2] position 정렬: build node[i] ↔ stored[i] (chunk_index = base + idx 라 동일 순서).
|
||||
# 노드 수가 다르면 구조 변경 = hash_changed → DEMOTE.
|
||||
if len(nodes) != len(stored):
|
||||
demoted.append(doc_id)
|
||||
_log(f" doc={doc_id} 노드수 build {len(nodes)} ≠ stored {len(stored)} → DEMOTE(re-decompose)")
|
||||
continue
|
||||
# [NEW-1] 전 position hash 일치 VERIFY (position-alignment 가 ordering 도 검증).
|
||||
# 임의 position 불일치 → DEMOTE (jump-target 1% miss 도 whole-doc 폴백 회귀를 부르므로 100%).
|
||||
mismatch = next((i for i, (nd, sr) in enumerate(zip(nodes, stored))
|
||||
if nd.chunk_content_hash != sr["chunk_content_hash"]), None)
|
||||
if mismatch is not None:
|
||||
demoted.append(doc_id)
|
||||
_log(f" doc={doc_id} position {mismatch} hash 불일치 → DEMOTE(re-decompose, NEW-1)")
|
||||
continue
|
||||
# 통과 → jump-target 의 char_start 를 stored row PK 로 UPDATE.
|
||||
n_upd = 0
|
||||
for nd, sr in zip(nodes, stored):
|
||||
if _is_jump_target(nd) and nd.char_start is not None:
|
||||
await session.execute(
|
||||
text("UPDATE document_chunks SET char_start=:cs WHERE id=:id"),
|
||||
{"cs": nd.char_start, "id": sr["id"]})
|
||||
n_upd += 1
|
||||
await session.commit()
|
||||
updated.append(doc_id)
|
||||
_log(f" ✓ doc={doc_id} char_start UPDATE {n_upd} jump-target (VERIFY 100%, 비파괴)")
|
||||
finally:
|
||||
await engine.dispose()
|
||||
_log(f"=== update-char-start: updated={len(updated)} demoted={len(demoted)} noop={len(noop)} ===")
|
||||
if demoted:
|
||||
_log(f" DEMOTE(re-decompose 배치 합류, NEW-4): {demoted}")
|
||||
if noop:
|
||||
_log(f" NO-OP(md_content NULL suspect, V4): {noop}")
|
||||
# 기계가독: re-decompose --doc = (gm-t1 hash_changed 230) UNION (이 리스트)
|
||||
print("DEMOTE_DOC_IDS=" + ",".join(str(x) for x in demoted), flush=True)
|
||||
|
||||
|
||||
def main():
|
||||
ap = argparse.ArgumentParser(description="오버나이트 hier 분해+절 분석 backfill (additive)")
|
||||
@@ -275,13 +403,20 @@ def main():
|
||||
p_dry = sub.add_parser("dry-run", help="후보 doc 집계 (작업 0)")
|
||||
p_dry.add_argument("--domains", default=None, help="comma-sep allowlist (미지정=뉴스 제외 전부)")
|
||||
p_dry.add_argument("--doc", default=None, help="comma-sep doc id (크기 게이트 우회 — 구조화 소형 문서 coverage 보정)")
|
||||
p_dry.add_argument("--reprocess", action="store_true", help="재분해 후보(기존 hier+jump-target char_start 부재) — --doc 필수")
|
||||
p_run = sub.add_parser("run", help="분해+분석 실행 (deadline time-box)")
|
||||
p_run.add_argument("--deadline", default="07:00", help="HH:MM (기본 07:00 — 컨테이너 UTC 주의, 07:00 KST=22:00 UTC)")
|
||||
p_run.add_argument("--domains", default=None, help="comma-sep allowlist (미지정=뉴스 제외 전부)")
|
||||
p_run.add_argument("--doc", default=None, help="comma-sep doc id (크기 게이트 우회 — 구조화 소형 문서 coverage 보정)")
|
||||
p_run.add_argument("--skip-analysis", action="store_true", help="절 분석(Mac mini) 생략, 분해+임베딩만 (retrieval go/no-go 측정 준비용)")
|
||||
p_run.add_argument("--reprocess", action="store_true",
|
||||
help="[g3-t2] RE-DECOMPOSE: 기존 hier DELETE→CASCADE→재INSERT (md_content 출처, char_start). "
|
||||
"--doc(REFINED PASS hash_changed∪demote) 필수 / 스냅샷 선행 필수")
|
||||
p_upd = sub.add_parser("update-char-start",
|
||||
help="[g3-tU] hash_stable doc 비파괴 char_start UPDATE (100% VERIFY, --doc 필수)")
|
||||
p_upd.add_argument("--doc", default=None, help="comma-sep doc id (gm-t1 hash_stable 32)")
|
||||
args = ap.parse_args()
|
||||
fn = {"dry-run": cmd_dry_run, "run": cmd_run}[args.cmd]
|
||||
fn = {"dry-run": cmd_dry_run, "run": cmd_run, "update-char-start": cmd_update_char_start}[args.cmd]
|
||||
asyncio.run(fn(args))
|
||||
|
||||
|
||||
|
||||
@@ -0,0 +1,12 @@
|
||||
FROM python:3.12-slim
|
||||
|
||||
WORKDIR /srv
|
||||
COPY requirements.txt .
|
||||
RUN pip install --no-cache-dir -r requirements.txt
|
||||
COPY server.py .
|
||||
|
||||
EXPOSE 8765
|
||||
HEALTHCHECK --interval=30s --timeout=5s --retries=3 \
|
||||
CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:8765/health')"
|
||||
|
||||
CMD ["uvicorn", "server:app", "--host", "0.0.0.0", "--port", "8765"]
|
||||
@@ -0,0 +1,3 @@
|
||||
fastapi>=0.111.0
|
||||
uvicorn>=0.30.0
|
||||
asyncpg>=0.29.0
|
||||
@@ -0,0 +1,202 @@
|
||||
"""crawl-health — 전 소스 헬스 패널 1차 (A-8, plan crawl-24x7-1)
|
||||
|
||||
읽기 전용 내부 운영 패널. 의존 = 기존 수집 상태(news_sources/source_health/documents/
|
||||
processing_queue SELECT 만) — 쓰기 0.
|
||||
|
||||
[1차] 소스별 last success / 수집 건수 추이(24h/7d) / 연속 실패 / circuit 상태 /
|
||||
빈 피드 streak + fulltext 승격/격하 통계 + 큐 백로그. 비-RSS 소스(C-2 sitemap 등)도
|
||||
같은 표면이 수용 (fetch_method 컬럼 표시 — '구독 소스 패널' 로 좁히지 않는 전 소스 일반화).
|
||||
[2차 범위 외] B-3 상태 계약 도착 시 세션 열 + [재로그인 시도] 버튼(enqueue 방식).
|
||||
|
||||
노출: 별도 바인딩만 — compose 가 Tailscale 인터페이스(100.110.63.63)에만 publish.
|
||||
vhost/경로 가드 방식 금지 (r4: 둘 다 '덜 깨짐' 속성 상실). 앱 레벨 인증 없음 =
|
||||
Tailscale 도달성만이 경계 (fab-server 선례).
|
||||
"""
|
||||
|
||||
import html
|
||||
import logging
|
||||
import os
|
||||
from contextlib import asynccontextmanager
|
||||
|
||||
import asyncpg
|
||||
from fastapi import FastAPI
|
||||
from fastapi.responses import HTMLResponse, JSONResponse
|
||||
|
||||
logger = logging.getLogger("crawl_health")
|
||||
|
||||
DSN = os.environ.get("CRAWL_HEALTH_DSN", "")
|
||||
|
||||
_pool: asyncpg.Pool | None = None
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
async def lifespan(_app: FastAPI):
|
||||
global _pool
|
||||
_pool = await asyncpg.create_pool(DSN, min_size=1, max_size=3)
|
||||
yield
|
||||
await _pool.close()
|
||||
|
||||
|
||||
app = FastAPI(lifespan=lifespan)
|
||||
|
||||
|
||||
async def _collect_data() -> dict:
|
||||
async with _pool.acquire() as conn:
|
||||
sources = await conn.fetch(
|
||||
"""
|
||||
SELECT s.id, s.name, s.country, s.enabled, s.feed_type, s.fetch_method,
|
||||
s.fulltext_policy, s.last_fetched_at,
|
||||
h.circuit_state, h.consecutive_failures, h.last_success_at,
|
||||
h.last_error, h.last_error_at, h.last_fetch_items, h.empty_streak,
|
||||
h.total_fetches, h.total_failures
|
||||
FROM news_sources s
|
||||
LEFT JOIN source_health h ON h.source_id = s.id
|
||||
ORDER BY s.enabled DESC, s.name
|
||||
"""
|
||||
)
|
||||
counts = await conn.fetch(
|
||||
"""
|
||||
SELECT s.id,
|
||||
count(d.id) FILTER (WHERE d.extracted_at > now() - interval '24 hours') AS items_24h,
|
||||
count(d.id) AS items_7d
|
||||
FROM news_sources s
|
||||
LEFT JOIN documents d
|
||||
ON d.source_channel = 'news'
|
||||
AND d.extracted_at > now() - interval '7 days'
|
||||
AND d.file_path LIKE 'news/' || s.name || '/%'
|
||||
GROUP BY s.id
|
||||
"""
|
||||
)
|
||||
queue = await conn.fetch(
|
||||
"""
|
||||
SELECT stage::text AS stage, status::text AS status, count(*) AS n,
|
||||
min(created_at) FILTER (WHERE status = 'pending') AS oldest_pending
|
||||
FROM processing_queue
|
||||
WHERE stage IN ('fulltext', 'summarize', 'embed', 'chunk')
|
||||
AND status IN ('pending', 'processing', 'failed')
|
||||
GROUP BY 1, 2
|
||||
ORDER BY 1, 2
|
||||
"""
|
||||
)
|
||||
fulltext = await conn.fetch(
|
||||
"""
|
||||
SELECT extract_meta -> 'fulltext' ->> 'status' AS status, count(*) AS n
|
||||
FROM documents
|
||||
WHERE source_channel = 'news' AND extract_meta ? 'fulltext'
|
||||
GROUP BY 1
|
||||
"""
|
||||
)
|
||||
count_map = {r["id"]: r for r in counts}
|
||||
return {
|
||||
"sources": [
|
||||
{**dict(r),
|
||||
"items_24h": count_map.get(r["id"], {}).get("items_24h", 0),
|
||||
"items_7d": count_map.get(r["id"], {}).get("items_7d", 0)}
|
||||
for r in sources
|
||||
],
|
||||
"queue": [dict(r) for r in queue],
|
||||
"fulltext": [dict(r) for r in fulltext],
|
||||
}
|
||||
|
||||
|
||||
@app.get("/health")
|
||||
async def health():
|
||||
"""Liveness — Docker healthcheck 용 (DB 미접근, 프로세스 생존만)."""
|
||||
return {"status": "ok", "service": "crawl-health"}
|
||||
|
||||
|
||||
@app.get("/api/health.json")
|
||||
async def api_health():
|
||||
data = await _collect_data()
|
||||
# asyncpg Record 의 datetime → isoformat 직렬화
|
||||
def _ser(v):
|
||||
return v.isoformat() if hasattr(v, "isoformat") else v
|
||||
return JSONResponse({
|
||||
k: [{kk: _ser(vv) for kk, vv in row.items()} for row in v]
|
||||
for k, v in data.items()
|
||||
})
|
||||
|
||||
|
||||
def _chip(state: str | None, enabled: bool) -> str:
|
||||
if not enabled:
|
||||
return '<span class="chip off">OFF</span>'
|
||||
if state == "disabled":
|
||||
return '<span class="chip err">DISABLED</span>'
|
||||
if state == "open":
|
||||
return '<span class="chip warn">OPEN</span>'
|
||||
return '<span class="chip ok">OK</span>'
|
||||
|
||||
|
||||
def _fmt_ts(v) -> str:
|
||||
return v.strftime("%m-%d %H:%M") if v else "-"
|
||||
|
||||
|
||||
@app.get("/", response_class=HTMLResponse)
|
||||
async def index():
|
||||
data = await _collect_data()
|
||||
rows = []
|
||||
for s in data["sources"]:
|
||||
err = html.escape((s.get("last_error") or "")[:80])
|
||||
warn_cls = ""
|
||||
if s["enabled"] and (s.get("consecutive_failures") or 0) >= 3:
|
||||
warn_cls = ' class="row-warn"'
|
||||
elif s["enabled"] and (s.get("empty_streak") or 0) >= 8:
|
||||
warn_cls = ' class="row-warn"'
|
||||
rows.append(
|
||||
f"<tr{warn_cls}>"
|
||||
f"<td>{html.escape(s['name'])}</td>"
|
||||
f"<td>{_chip(s.get('circuit_state'), s['enabled'])}</td>"
|
||||
f"<td>{html.escape(s.get('fetch_method') or 'rss')}</td>"
|
||||
f"<td>{html.escape(s.get('fulltext_policy') or 'none')}</td>"
|
||||
f"<td class='num'>{s['items_24h']}</td>"
|
||||
f"<td class='num'>{s['items_7d']}</td>"
|
||||
f"<td class='num'>{s.get('consecutive_failures') or 0}</td>"
|
||||
f"<td class='num'>{s.get('empty_streak') or 0}</td>"
|
||||
f"<td>{_fmt_ts(s.get('last_success_at'))}</td>"
|
||||
f"<td>{_fmt_ts(s.get('last_fetched_at'))}</td>"
|
||||
f"<td class='err-text'>{err}</td>"
|
||||
f"</tr>"
|
||||
)
|
||||
qrows = [
|
||||
f"<tr><td>{html.escape(q['stage'])}</td><td>{html.escape(q['status'])}</td>"
|
||||
f"<td class='num'>{q['n']}</td><td>{_fmt_ts(q.get('oldest_pending'))}</td></tr>"
|
||||
for q in data["queue"]
|
||||
]
|
||||
frows = [
|
||||
f"<tr><td>{html.escape(f['status'] or '-')}</td><td class='num'>{f['n']}</td></tr>"
|
||||
for f in data["fulltext"]
|
||||
]
|
||||
body = f"""<!DOCTYPE html>
|
||||
<html lang="ko"><head><meta charset="utf-8">
|
||||
<title>crawl-health — 전 소스 헬스 패널</title>
|
||||
<style>
|
||||
body {{ font-family: -apple-system, 'Apple SD Gothic Neo', sans-serif; background: #f5f1e8;
|
||||
color: #3d3a33; margin: 0; padding: 28px; }}
|
||||
h1 {{ font-size: 19px; margin: 0 0 4px; }} h2 {{ font-size: 14px; margin: 26px 0 8px; }}
|
||||
.sub {{ color: #8a8474; font-size: 12px; margin-bottom: 18px; }}
|
||||
table {{ border-collapse: collapse; width: 100%; background: #fffdf8; font-size: 12.5px; }}
|
||||
th, td {{ border: 1px solid #e3ddcd; padding: 5px 9px; text-align: left; }}
|
||||
th {{ background: #ece6d6; font-weight: 600; white-space: nowrap; }}
|
||||
td.num {{ text-align: right; font-variant-numeric: tabular-nums; }}
|
||||
td.err-text {{ color: #9a4a3a; font-size: 11.5px; max-width: 320px; }}
|
||||
tr.row-warn td {{ background: #fbf0e4; }}
|
||||
.chip {{ display: inline-block; padding: 1px 8px; border-radius: 9px; font-size: 11px; font-weight: 600; }}
|
||||
.chip.ok {{ background: #dce8d4; color: #3c5a2e; }}
|
||||
.chip.warn {{ background: #f3e0b8; color: #7a5a14; }}
|
||||
.chip.err {{ background: #eecfc6; color: #8a2f1d; }}
|
||||
.chip.off {{ background: #e3ddcd; color: #6e6859; }}
|
||||
</style></head><body>
|
||||
<h1>crawl-health — 전 소스 헬스 패널</h1>
|
||||
<div class="sub">A-8 1차 (피드 수집 헬스) · 내부 전용 (Tailscale 바인딩) · 새로고침 = 실시간 조회</div>
|
||||
<h2>소스 ({len(rows)})</h2>
|
||||
<table><tr><th>소스</th><th>circuit</th><th>fetch</th><th>fulltext</th><th>24h</th><th>7d</th>
|
||||
<th>연속실패</th><th>빈피드</th><th>last success</th><th>last fetch</th><th>last error</th></tr>
|
||||
{''.join(rows)}</table>
|
||||
<h2>처리 큐 (fulltext / summarize / embed / chunk)</h2>
|
||||
<table><tr><th>stage</th><th>status</th><th>건수</th><th>oldest pending</th></tr>
|
||||
{''.join(qrows) or '<tr><td colspan="4">백로그 없음</td></tr>'}</table>
|
||||
<h2>fulltext 승격 누적</h2>
|
||||
<table><tr><th>status</th><th>건수</th></tr>
|
||||
{''.join(frows) or '<tr><td colspan="2">기록 없음 (파일럿 전환 전)</td></tr>'}</table>
|
||||
</body></html>"""
|
||||
return HTMLResponse(body)
|
||||
+109
-28
@@ -1,12 +1,18 @@
|
||||
"""marker-service — POST /convert: PDF → markdown + 추출 이미지 base64.
|
||||
|
||||
Phase 1B (2026-05-01) — 텍스트만 응답, 이미지 폐기.
|
||||
Phase 1B.5 (본 변경) — `_images` 직렬화해서 base64 응답에 포함. NAS write 권한이
|
||||
Phase 1B.5 — `_images` 직렬화해서 base64 응답에 포함. NAS write 권한이
|
||||
없는 stateless 변환기 유지 (fastapi 가 NAS persist 담당).
|
||||
D-1 (plan crawl-24x7-1, 2026-06-10) — idle-unload 운영 전환:
|
||||
MARKER_PRELOAD=0 : startup warmup 끔 (첫 /convert 시 lazy load)
|
||||
MARKER_IDLE_UNLOAD_MINUTES : N분 유휴 시 모델 해제 (0=비활성, 기존 동작)
|
||||
/ready 는 idle(미적재)에서도 200 — fastapi 의 depends_on service_healthy 가
|
||||
lazy 모드에서 영구 미기동으로 굳는 것 방지. 503 은 warmup_failed 한정.
|
||||
|
||||
plan: ~/.claude/plans/piped-humming-crystal.md
|
||||
"""
|
||||
import base64
|
||||
import gc
|
||||
import hashlib
|
||||
import io
|
||||
import logging
|
||||
@@ -40,6 +46,12 @@ _warmup_done = False
|
||||
_warmup_error: str | None = None
|
||||
_warmup_lock = threading.Lock()
|
||||
|
||||
# D-1 idle-unload 상태 — 전이는 전부 _warmup_lock 아래
|
||||
_PRELOAD = os.getenv("MARKER_PRELOAD", "1") != "0"
|
||||
_IDLE_UNLOAD_MINUTES = int(os.getenv("MARKER_IDLE_UNLOAD_MINUTES", "0"))
|
||||
_inflight = 0
|
||||
_last_used = time.monotonic()
|
||||
|
||||
# 이미지 응답 cap. base64 응답 크기 폭주 방지. 사용자 PDF 풀 측정 (Phase 1D) 시
|
||||
# 가장 이미지 많은 문서가 ~30건 수준 → 200 은 안전 마진. 초과 시 truncate flag 응답.
|
||||
MAX_IMAGES_PER_DOC = int(os.getenv("MARKER_MAX_IMAGES_PER_DOC", "200"))
|
||||
@@ -68,11 +80,67 @@ def _ensure_warmup() -> None:
|
||||
raise
|
||||
|
||||
|
||||
def _acquire_models():
|
||||
"""warmup 보장 + inflight 진입을 원자적으로 — ensure 직후 reaper 가 해제하는 경합 차단."""
|
||||
global _inflight
|
||||
while True:
|
||||
_ensure_warmup()
|
||||
with _warmup_lock:
|
||||
if _warmup_done:
|
||||
_inflight += 1
|
||||
return
|
||||
# ensure 와 lock 재진입 사이에 unload 가 끼어든 희귀 경합 — 재시도
|
||||
|
||||
|
||||
def _release_models():
|
||||
global _inflight, _last_used
|
||||
with _warmup_lock:
|
||||
_inflight -= 1
|
||||
_last_used = time.monotonic()
|
||||
|
||||
|
||||
def _maybe_unload() -> None:
|
||||
"""유휴 시 모델 해제. 변환 중(inflight>0)이면 절대 해제하지 않는다.
|
||||
|
||||
split 변환의 배치 사이 간격은 초 단위 — N>=1분 임계면 배치 사이 해제 없음.
|
||||
"""
|
||||
global _models, _converter, _warmup_done
|
||||
with _warmup_lock:
|
||||
if not _warmup_done or _inflight > 0:
|
||||
return
|
||||
if time.monotonic() - _last_used < _IDLE_UNLOAD_MINUTES * 60:
|
||||
return
|
||||
_models = None
|
||||
_converter = None
|
||||
_warmup_done = False
|
||||
gc.collect()
|
||||
try:
|
||||
import torch
|
||||
torch.cuda.empty_cache()
|
||||
except Exception:
|
||||
pass
|
||||
logger.info(f"[marker-service] idle-unload: 모델 해제 (유휴 {_IDLE_UNLOAD_MINUTES}분 초과)")
|
||||
|
||||
|
||||
async def _idle_reaper():
|
||||
import asyncio
|
||||
while True:
|
||||
await asyncio.sleep(60)
|
||||
try:
|
||||
_maybe_unload()
|
||||
except Exception:
|
||||
logger.exception("[marker-service] idle reaper 오류")
|
||||
|
||||
|
||||
@app.on_event("startup")
|
||||
async def startup():
|
||||
"""startup hook — async warmup 백그라운드. /ready 가 완료 여부 노출."""
|
||||
"""startup hook — warmup 은 MARKER_PRELOAD 게이트 (D-1: lazy 기본 전환은 compose 가)."""
|
||||
import asyncio
|
||||
asyncio.create_task(asyncio.to_thread(_ensure_warmup))
|
||||
if _PRELOAD:
|
||||
asyncio.create_task(asyncio.to_thread(_ensure_warmup))
|
||||
if _IDLE_UNLOAD_MINUTES > 0:
|
||||
asyncio.create_task(_idle_reaper())
|
||||
logger.info(f"[marker-service] idle-unload 활성: {_IDLE_UNLOAD_MINUTES}분")
|
||||
|
||||
|
||||
class ConvertRequest(BaseModel):
|
||||
@@ -111,7 +179,12 @@ def health():
|
||||
|
||||
@app.get("/ready")
|
||||
async def ready(response: Response):
|
||||
"""Round 4 #1+#2: Response.status_code 명시 + warmup_error 노출."""
|
||||
"""Round 4 #1+#2: Response.status_code 명시 + warmup_error 노출.
|
||||
|
||||
D-1: idle(미적재) = 200. 503 은 warmup_failed 한정 — lazy 모드에서 fastapi
|
||||
depends_on service_healthy 가 영구 미기동으로 굳지 않게. 배포 검증에서
|
||||
'status=ready' 단언하던 runbook 은 강제 warm 호출(/convert 1건)로 대체.
|
||||
"""
|
||||
if _warmup_error:
|
||||
response.status_code = 503
|
||||
return {
|
||||
@@ -121,31 +194,28 @@ async def ready(response: Response):
|
||||
"error": _warmup_error,
|
||||
}
|
||||
if not _warmup_done:
|
||||
response.status_code = 503
|
||||
return {
|
||||
"status": "warming_up",
|
||||
"status": "warming_up" if _PRELOAD else "idle",
|
||||
"engine": "marker",
|
||||
"engine_version": _engine_version,
|
||||
"models_loaded": False,
|
||||
"idle_unload_minutes": _IDLE_UNLOAD_MINUTES,
|
||||
}
|
||||
return {
|
||||
"status": "ready",
|
||||
"engine": "marker",
|
||||
"engine_version": _engine_version,
|
||||
"models_loaded": True,
|
||||
"inflight": _inflight,
|
||||
"idle_unload_minutes": _IDLE_UNLOAD_MINUTES,
|
||||
}
|
||||
|
||||
|
||||
@app.post("/convert", response_model=ConvertResponse)
|
||||
async def convert(req: ConvertRequest):
|
||||
_ensure_warmup()
|
||||
|
||||
p = Path(req.file_path)
|
||||
if not p.is_file():
|
||||
raise HTTPException(404, detail={"code": "file_not_found", "message": str(p)})
|
||||
|
||||
start = time.monotonic()
|
||||
# page range 지정 시 per-request converter (모델 _models 재사용 → reload 없음).
|
||||
# invariant: req.start_page/end_page = 1-based inclusive → marker 0-based 로 변환.
|
||||
converter = _converter
|
||||
if req.start_page is not None and req.end_page is not None:
|
||||
if req.start_page < 1 or req.end_page < req.start_page:
|
||||
raise HTTPException(
|
||||
@@ -155,22 +225,33 @@ async def convert(req: ConvertRequest):
|
||||
"message": f"start_page={req.start_page} end_page={req.end_page}",
|
||||
},
|
||||
)
|
||||
page_range = list(range(req.start_page - 1, req.end_page)) # 0-based inclusive
|
||||
converter = PdfConverter(artifact_dict=_models, config={"page_range": page_range})
|
||||
try:
|
||||
rendered = converter(str(p))
|
||||
except Exception as exc:
|
||||
logger.exception(f"[marker-service] conversion failed path={p}: {exc}")
|
||||
raise HTTPException(
|
||||
status_code=422,
|
||||
detail={
|
||||
"code": "conversion_failed",
|
||||
"message": f"{type(exc).__name__}: {exc}",
|
||||
},
|
||||
) from exc
|
||||
|
||||
md_text, _meta, raw_images = text_from_rendered(rendered)
|
||||
elapsed_ms = int((time.monotonic() - start) * 1000)
|
||||
# D-1: warmup 보장 + inflight 진입 원자화 — 변환 중 reaper 해제 차단. 해제는 finally.
|
||||
_acquire_models()
|
||||
try:
|
||||
start = time.monotonic()
|
||||
# page range 지정 시 per-request converter (모델 _models 재사용 → reload 없음).
|
||||
# invariant: req.start_page/end_page = 1-based inclusive → marker 0-based 로 변환.
|
||||
converter = _converter
|
||||
if req.start_page is not None and req.end_page is not None:
|
||||
page_range = list(range(req.start_page - 1, req.end_page)) # 0-based inclusive
|
||||
converter = PdfConverter(artifact_dict=_models, config={"page_range": page_range})
|
||||
try:
|
||||
rendered = converter(str(p))
|
||||
except Exception as exc:
|
||||
logger.exception(f"[marker-service] conversion failed path={p}: {exc}")
|
||||
raise HTTPException(
|
||||
status_code=422,
|
||||
detail={
|
||||
"code": "conversion_failed",
|
||||
"message": f"{type(exc).__name__}: {exc}",
|
||||
},
|
||||
) from exc
|
||||
|
||||
md_text, _meta, raw_images = text_from_rendered(rendered)
|
||||
elapsed_ms = int((time.monotonic() - start) * 1000)
|
||||
finally:
|
||||
_release_models()
|
||||
|
||||
images_payload, truncated = _serialize_images(raw_images, str(p))
|
||||
|
||||
|
||||
+83
-27
@@ -1,14 +1,23 @@
|
||||
"""STT 마이크로서비스 — faster-whisper (GPU) 기반 음성 전사.
|
||||
|
||||
filePath → {text, segments:[{start,end,text}]}.
|
||||
모델은 startup 에서 eager preload (Docker /ready healthcheck 가 모델 적재까지 검증).
|
||||
기본 모델 large-v3 (VRAM ~3GB, float16). 환경변수로 교체 가능.
|
||||
|
||||
환경변수 `STT_PRELOAD=0` 으로 lazy 로 강제 가능 (개발/테스트용).
|
||||
D-1 (plan crawl-24x7-1, 2026-06-10) — idle-unload 운영 전환:
|
||||
STT_PRELOAD=0 : startup eager preload 끔 (첫 요청 시 lazy load)
|
||||
STT_IDLE_UNLOAD_MINUTES: N분 유휴 시 모델 해제 (0=비활성, 기존 동작).
|
||||
faster-whisper=CTranslate2 라 torch 미설치 — 해제는
|
||||
참조 제거 + gc (CTranslate2 가 소멸 시 VRAM 반환).
|
||||
콜드로드 수초~수십 초는 호출측(stt_worker read=1800s)이 흡수. healthcheck 는
|
||||
cuda 가용성 기준 (compose) — 모델 적재는 더 이상 상시 상태가 아니다.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import gc
|
||||
import logging
|
||||
import os
|
||||
import threading
|
||||
import time
|
||||
import unicodedata
|
||||
from contextlib import asynccontextmanager
|
||||
from pathlib import Path
|
||||
@@ -17,18 +26,26 @@ from fastapi import FastAPI
|
||||
|
||||
logger = logging.getLogger("stt")
|
||||
|
||||
_IDLE_UNLOAD_MINUTES = int(os.getenv("STT_IDLE_UNLOAD_MINUTES", "0"))
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
async def lifespan(_app: FastAPI):
|
||||
# startup: 모델 eager preload 시도. 실패해도 프로세스는 살아 있고
|
||||
# /ready 가 false 로 남아 healthcheck 가 unhealthy 처리.
|
||||
# /ready 의 models_loaded 가 false 로 남는다.
|
||||
if os.getenv("STT_PRELOAD", "1") != "0":
|
||||
try:
|
||||
_load_model()
|
||||
logger.info("stt model preloaded: %s (%s, %s)", _MODEL_NAME, _DEVICE, _COMPUTE_TYPE)
|
||||
except Exception as e:
|
||||
logger.exception("stt model preload failed: %s", e)
|
||||
reaper = None
|
||||
if _IDLE_UNLOAD_MINUTES > 0:
|
||||
reaper = asyncio.create_task(_idle_reaper())
|
||||
logger.info("stt idle-unload 활성: %d분", _IDLE_UNLOAD_MINUTES)
|
||||
yield
|
||||
if reaper:
|
||||
reaper.cancel()
|
||||
|
||||
|
||||
app = FastAPI(lifespan=lifespan)
|
||||
@@ -38,6 +55,11 @@ _MODEL_NAME = os.getenv("WHISPER_MODEL", "large-v3")
|
||||
_DEVICE = os.getenv("WHISPER_DEVICE", "cuda")
|
||||
_COMPUTE_TYPE = os.getenv("WHISPER_COMPUTE_TYPE", "float16")
|
||||
|
||||
# load/unload/inflight 상태 전이는 전부 이 lock 아래 (cold 동시 요청 이중 로드 방지 포함)
|
||||
_model_lock = threading.Lock()
|
||||
_inflight = 0
|
||||
_last_used = time.monotonic()
|
||||
|
||||
|
||||
def _resolve_path(file_path: str) -> Path | None:
|
||||
"""NFC(DB) vs NFD(NFS) 한글 경로 정규화 차이 흡수. OCR 서비스와 동일 패턴."""
|
||||
@@ -61,14 +83,38 @@ def _resolve_path(file_path: str) -> Path | None:
|
||||
|
||||
|
||||
def _load_model():
|
||||
"""faster-whisper lazy loading — 첫 호출 시만 VRAM 점유."""
|
||||
"""faster-whisper lazy loading — 첫 호출 시만 VRAM 점유. lock 으로 이중 로드 방지."""
|
||||
global _model
|
||||
if _model is not None:
|
||||
return _model
|
||||
from faster_whisper import WhisperModel
|
||||
with _model_lock:
|
||||
if _model is None:
|
||||
from faster_whisper import WhisperModel
|
||||
logger.info("stt model loading: %s (%s, %s)", _MODEL_NAME, _DEVICE, _COMPUTE_TYPE)
|
||||
_model = WhisperModel(_MODEL_NAME, device=_DEVICE, compute_type=_COMPUTE_TYPE)
|
||||
return _model
|
||||
|
||||
_model = WhisperModel(_MODEL_NAME, device=_DEVICE, compute_type=_COMPUTE_TYPE)
|
||||
return _model
|
||||
|
||||
def _maybe_unload() -> None:
|
||||
"""유휴 시 모델 해제. 처리 중(inflight>0)이면 절대 해제하지 않는다."""
|
||||
global _model
|
||||
with _model_lock:
|
||||
if _model is None or _inflight > 0:
|
||||
return
|
||||
if time.monotonic() - _last_used < _IDLE_UNLOAD_MINUTES * 60:
|
||||
return
|
||||
_model = None
|
||||
gc.collect()
|
||||
logger.info("stt idle-unload: whisper 모델 해제 (유휴 %d분 초과)", _IDLE_UNLOAD_MINUTES)
|
||||
|
||||
|
||||
async def _idle_reaper():
|
||||
while True:
|
||||
await asyncio.sleep(60)
|
||||
try:
|
||||
_maybe_unload()
|
||||
except Exception:
|
||||
logger.exception("stt idle reaper 오류")
|
||||
|
||||
|
||||
def _cuda_device_count() -> int:
|
||||
@@ -87,7 +133,7 @@ def health():
|
||||
|
||||
@app.get("/ready")
|
||||
def ready():
|
||||
"""Readiness — CUDA + 모델 상태. 배포 검증용."""
|
||||
"""Readiness — CUDA + 모델 상태. healthcheck 는 cuda 만 본다 (D-1 idle-unload)."""
|
||||
count = _cuda_device_count()
|
||||
cuda_ok = count > 0
|
||||
models_loaded = _model is not None
|
||||
@@ -98,6 +144,8 @@ def ready():
|
||||
"models_loaded": models_loaded,
|
||||
"model": _MODEL_NAME,
|
||||
"compute_type": _COMPUTE_TYPE,
|
||||
"idle_unload_minutes": _IDLE_UNLOAD_MINUTES,
|
||||
"inflight": _inflight,
|
||||
}
|
||||
|
||||
|
||||
@@ -121,6 +169,7 @@ async def transcribe(body: dict):
|
||||
"duration": 1832.5
|
||||
}
|
||||
"""
|
||||
global _inflight, _last_used
|
||||
raw_path = body["filePath"]
|
||||
langs = body.get("langs")
|
||||
beam_size = int(body.get("beamSize", 5))
|
||||
@@ -129,28 +178,35 @@ async def transcribe(body: dict):
|
||||
if resolved is None:
|
||||
return {"error": f"파일 없음: {raw_path}", "text": "", "segments": []}
|
||||
|
||||
model = _load_model()
|
||||
with _model_lock:
|
||||
_inflight += 1
|
||||
try:
|
||||
model = _load_model()
|
||||
|
||||
language = None
|
||||
if isinstance(langs, list) and len(langs) == 1:
|
||||
language = langs[0]
|
||||
language = None
|
||||
if isinstance(langs, list) and len(langs) == 1:
|
||||
language = langs[0]
|
||||
|
||||
segments_iter, info = model.transcribe(
|
||||
str(resolved),
|
||||
beam_size=beam_size,
|
||||
language=language,
|
||||
vad_filter=True,
|
||||
)
|
||||
segments_iter, info = model.transcribe(
|
||||
str(resolved),
|
||||
beam_size=beam_size,
|
||||
language=language,
|
||||
vad_filter=True,
|
||||
)
|
||||
|
||||
segments = []
|
||||
parts = []
|
||||
for seg in segments_iter:
|
||||
segments.append({
|
||||
"start": round(float(seg.start), 2),
|
||||
"end": round(float(seg.end), 2),
|
||||
"text": seg.text.strip(),
|
||||
})
|
||||
parts.append(seg.text)
|
||||
segments = []
|
||||
parts = []
|
||||
for seg in segments_iter:
|
||||
segments.append({
|
||||
"start": round(float(seg.start), 2),
|
||||
"end": round(float(seg.end), 2),
|
||||
"text": seg.text.strip(),
|
||||
})
|
||||
parts.append(seg.text)
|
||||
finally:
|
||||
with _model_lock:
|
||||
_inflight -= 1
|
||||
_last_used = time.monotonic()
|
||||
|
||||
return {
|
||||
"text": " ".join(p.strip() for p in parts).strip(),
|
||||
|
||||
@@ -0,0 +1,95 @@
|
||||
"""builder.py char_start emit 단위테스트 (플랜 ds-outline-anchor-b5 g2 / g0-t2).
|
||||
|
||||
핵심 불변식:
|
||||
- char_start = FE outlineAnchors.ts 라인/offset 모델(split('\n') + UTF-16 code unit + 코드펜스)과 동일.
|
||||
- astral(BMP 밖) prefix 가 있어도 UTF-16 code unit offset 이어야 함 (#2 SILENT 단위버그 게이트).
|
||||
- window-child char_start=None, split-parent char_start=heading offset (B1/#1).
|
||||
- 코드펜스 내부 heading 미탐지 (O3).
|
||||
- 라인모델 변경이 node.text 를 바꾸지 않음(hash-neutral) — hash_stable doc 보존.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import hashlib
|
||||
|
||||
from app.services.hier_decomp.builder import build_hier_tree, coverage_stats, _utf16_units
|
||||
|
||||
|
||||
def _fe_offset_of_line(md: str, target_line: str) -> int | None:
|
||||
"""FE outlineAnchors.ts:55-65 재현 — char_start 가 이 값과 같아야 함."""
|
||||
off = 0
|
||||
for raw in md.split("\n"):
|
||||
if raw == target_line:
|
||||
return off
|
||||
off += len(raw.encode("utf-16-le")) // 2 + 1
|
||||
return None
|
||||
|
||||
|
||||
def _u16_slice(md: str, cs: int, n: int) -> str:
|
||||
return md.encode("utf-16-le")[2 * cs: 2 * (cs + n)].decode("utf-16-le")
|
||||
|
||||
|
||||
def test_char_start_matches_fe_offset_and_slices():
|
||||
md = "# Alpha\nbody alpha here\n\n## Beta\nbody beta\n# Gamma\nlast line"
|
||||
nodes = build_hier_tree(md, leaf_hard_max=100000)
|
||||
seen = 0
|
||||
for n in nodes:
|
||||
if n.char_start is None:
|
||||
continue
|
||||
seen += 1
|
||||
head = n.text.split("\n", 1)[0]
|
||||
assert n.char_start == _fe_offset_of_line(md, head), n.section_title
|
||||
assert _u16_slice(md, n.char_start, _utf16_units(head)) == head
|
||||
assert seen >= 2
|
||||
|
||||
|
||||
def test_astral_prefix_offset_is_utf16_not_codepoint():
|
||||
# 📄 = U+1F4C4 = 1 code point 이나 UTF-16 surrogate pair(2 code unit).
|
||||
md = "\U0001F4C4 manifest\n\n# Section One\nbody"
|
||||
nodes = build_hier_tree(md)
|
||||
sec = next(n for n in nodes if n.section_title == "Section One")
|
||||
fe = _fe_offset_of_line(md, "# Section One")
|
||||
assert sec.char_start == fe
|
||||
# UTF-16 슬라이스는 정확
|
||||
assert _u16_slice(md, sec.char_start, _utf16_units("# Section One")) == "# Section One"
|
||||
# code-point 슬라이스는 어긋나야 함(astral 때문에) — 단위버그가 있었다면 이게 통과했을 것
|
||||
assert md[sec.char_start: sec.char_start + len("# Section One")] != "# Section One"
|
||||
|
||||
|
||||
def test_fenced_heading_not_detected():
|
||||
md = "# Real\nintro\n```\n# Fake In Fence\n```\n# Real Two\nx"
|
||||
titles = [n.section_title for n in build_hier_tree(md) if n.section_title]
|
||||
assert "Fake In Fence" not in titles
|
||||
assert "Real" in titles and "Real Two" in titles
|
||||
|
||||
|
||||
def test_window_child_null_split_parent_has_offset():
|
||||
md = "# BigSection\n" + ("paragraph text here. " * 20 + "\n\n") * 60
|
||||
nodes = build_hier_tree(md, leaf_hard_max=5000, leaf_target_max=3000)
|
||||
sp = [n for n in nodes if n.node_type and n.node_type.endswith("_split")]
|
||||
wc = [n for n in nodes if n.node_type == "window"]
|
||||
assert sp and sp[0].char_start is not None
|
||||
assert wc and all(w.char_start is None for w in wc)
|
||||
|
||||
|
||||
def test_node_text_preserved_hash_neutral():
|
||||
# 라인모델(split vs splitlines) 변경에도 leaf 이어붙이면 원문 재구성 → hash 불변.
|
||||
md = "# A\nl1\nl2\n# B\nl3\n# C\nl4\n"
|
||||
nodes = build_hier_tree(md, leaf_hard_max=100000)
|
||||
recon = "".join(n.text for n in nodes if n.is_leaf or (n.node_type and n.node_type.endswith("_split")))
|
||||
assert recon == md
|
||||
|
||||
|
||||
def test_preamble_char_start_none():
|
||||
md = "intro paragraph with no heading\nmore intro\n# First\nbody"
|
||||
nodes = build_hier_tree(md, leaf_hard_max=100000)
|
||||
preamble = [n for n in nodes if n.section_title is None and n.level == 0]
|
||||
assert preamble and preamble[0].char_start is None
|
||||
|
||||
|
||||
def test_coverage_stats_char_start_telemetry():
|
||||
md = "# Alpha\nbody\n# Beta\nbody2"
|
||||
nodes = build_hier_tree(md, leaf_hard_max=100000)
|
||||
st = coverage_stats(md, nodes)
|
||||
assert st["char_start_total"] >= 2
|
||||
assert st["char_start_verified"] == st["char_start_total"] # 모두 O5 통과
|
||||
assert st["non_nfc"] == 0
|
||||
Reference in New Issue
Block a user