Compare commits

...

5 Commits

Author SHA1 Message Date
hyungi 53a30449e2 fix(news): crawl_politeness logger 를 setup_logger 로 정합화 — INFO 대기 로그 가시화
Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-10 13:47:18 +09:00
hyungi ab668d7990 fix(news): crawl_raw 파일명 CHAR(64) 패딩 strip + politeness 대기 로그
- documents.file_hash 실 컬럼이 character(64) — 32자 해시가 공백 패딩되어
  gz 파일명에 공백 32개 포함 (실배포 1건 실측). _raw_html_path 에서 strip.
- _respect_domain_rate silent sleep 에 대기 로그 1줄 (검증 게이트·운영 가시성).

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-10 13:43:29 +09:00
hyungi dcf99b377e fix(news): 적대 리뷰 반영 — reconcile auto-correlation·워터마크 검증 후 영속·수집 락
- fulltext_worker.reconcile_unresolved: EXISTS 서브쿼리 aliased(ProcessingQueue) —
  auto-correlation 이 FROM 전부 제거해 매 실행 InvalidRequestError (안전망 dead code).
  SQLAlchemy 2.0.50 컴파일 재현·수정 확인.
- news_collector._fetch_rss: ETag/Last-Modified/content-hash 영속을 bozo 파싱 검증
  뒤로 이동 — 부패 응답 워터마크 저장 시 영구 304-skip 차단.
- news_collector.run: 모듈 락으로 수동 collect vs 6h 스케줄 동시 실행 차단 —
  _get_or_create_health 동시 INSERT 의 uq_source_health_source_id 위반이
  사이클 전체를 죽이는 경합 봉쇄.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-10 13:34:46 +09:00
hyungi 3df0ca53ab feat(services): crawl-24x7 A-8 헬스 패널 + D-1 stt/marker idle-unload
A-8 1차: crawl-health 컨테이너(100.110.63.63:8765 Tailscale 바인딩 전용, 읽기 전용 SELECT, caddy 라우트 금지).
D-1 전제 작업: STT_PRELOAD=0+30분 유휴 해제(lock+inflight+reaper), marker MARKER_PRELOAD=0+idle-unload,
/ready idle=200(503=warmup_failed 한정 — fastapi depends_on 정합), healthcheck cuda 기준 전환.
2026-06-10 13:03:31 +09:00
hyungi 7cd8cfde0a feat(news): crawl-24x7 A그룹 — 레지스트리 증축·조건부 GET·fulltext 승격·politeness·source_health
A-3 migrations 319-323 (news_sources 9컬럼 + source_channel 'crawl' + process_stage 'fulltext' + source_health)
A-1 조건부 GET(ETag/Last-Modified 그대로 재전송)+콘텐츠 해시 변경감지, A-4 politeness 코어(per-domain 직렬+robots+정직UA),
A-2+A-7 fulltext_worker(4-tier 재사용·NAS crawl_raw gzip 보존·격하 경로·03:40 reconcile 안전망),
A-5 circuit breaker(3/10 임계, enabled 미터치), A-6 포털 전재 2차 dedup(제목+3일, 12자 게이트).
기존 소스 fulltext_policy='none' 기본 = 무회귀. plan crawl-24x7-1, 예외 박제 crawl-24x7-exec1-20260610.md
2026-06-10 13:03:31 +09:00
21 changed files with 1215 additions and 112 deletions
+177
View File
@@ -0,0 +1,177 @@
"""크롤링 politeness 코어 (A-4, plan crawl-24x7-1)
개인 아카이빙 권장치를 그대로 박은 공용 fetch 계층:
- per-domain 동시성 1 (asyncio.Lock) + 같은 도메인 연속 요청 515초 지연 + jitter
- robots.txt 존중 (urllib.robotparser, 24h 캐시) — 비로그인 공개 크롤링 한정.
로그인 세션 fetch (B-3) 는 사용자 행위 성격이라 robots 대신 사람 속도가 기준.
- 정직 식별 UA + 연락처 (익명 크롤링 트랙. 로그인 세션은 브라우저 UA 유지 — B-3)
- 429 = Retry-After 존중 / 5xx = 재시도 가능 / 403 = 차단 신호 (호출측 circuit 연동)
도메인별 마지막 요청 시각 등 rate 상태는 in-process (영속 워터마크는 DB — news_sources).
SSRF 차단은 core.url_validator.validate_feed_url 재사용 (redirect target 재검증 포함).
"""
import asyncio
import random
import time
import urllib.robotparser
from urllib.parse import urljoin, urlparse
import httpx
from core.url_validator import validate_feed_url
from core.utils import setup_logger
# bare getLogger 는 root(WARNING) 상속이라 INFO 대기/차단 로그가 드랍됨 — 타 워커와 동일 설정
logger = setup_logger("crawl_politeness")
# 정직 식별 UA + 연락처 — 차단 전 연락 통로 (A-4)
CRAWL_UA = "HyungiPKM-Archiver/1.0 (personal archive; +mailto:hyun49196@gmail.com)"
# 같은 도메인 연속 요청 간격 (초) — 권장치 515s + jitter
_DOMAIN_DELAY_MIN = 5.0
_DOMAIN_DELAY_MAX = 15.0
_ROBOTS_CACHE_TTL = 24 * 3600 # 24h
_MAX_PAGE_BYTES = 5 * 1024 * 1024 # 피드 fetch 와 동일 5MB cap
_PAGE_TIMEOUT = 20.0
_MAX_REDIRECTS = 3
_HTML_CONTENT_TYPES = ("text/html", "application/xhtml+xml")
class CrawlFetchError(Exception):
"""일시 오류 (5xx / timeout / 네트워크) — 큐 재시도 대상."""
class CrawlBlocked(Exception):
"""차단 신호 (403 / 429 / robots disallow) — 재시도보다 backoff/circuit 대상."""
class CrawlSkip(Exception):
"""영구 비대상 (비-HTML / 크기 초과 / SSRF 차단 / 4xx) — 격하 처리 대상."""
# 도메인별 직렬화 상태 (in-process)
_domain_locks: dict[str, asyncio.Lock] = {}
_domain_last_request: dict[str, float] = {}
# host → (cached_at, RobotFileParser | None). None = robots 없음/4xx (전부 허용)
_robots_cache: dict[str, tuple[float, urllib.robotparser.RobotFileParser | None]] = {}
def _domain_of(url: str) -> str:
return (urlparse(url).hostname or "").lower()
def _get_lock(domain: str) -> asyncio.Lock:
if domain not in _domain_locks:
_domain_locks[domain] = asyncio.Lock()
return _domain_locks[domain]
async def _respect_domain_rate(domain: str) -> None:
"""같은 도메인 직전 요청에서 5–15초(jitter) 경과할 때까지 대기."""
last = _domain_last_request.get(domain)
if last is not None:
delay = random.uniform(_DOMAIN_DELAY_MIN, _DOMAIN_DELAY_MAX)
wait = last + delay - time.monotonic()
if wait > 0:
# silent sleep 금지 — politeness 동작 검증·운영 관찰 가시성
logger.info("[politeness] %s %.1fs 대기", domain, wait)
await asyncio.sleep(wait)
async def _fetch_robots(client: httpx.AsyncClient, scheme: str, host: str):
"""robots.txt 조회. 4xx/부재 = 전부 허용(None), 5xx/오류 = 보수적으로 이번 사이클 차단."""
robots_url = f"{scheme}://{host}/robots.txt"
try:
resp = await client.get(robots_url, headers={"User-Agent": CRAWL_UA})
except httpx.HTTPError as e:
raise CrawlFetchError(f"robots.txt 조회 실패: {host}: {e}") from e
if resp.status_code >= 500:
# 5xx 는 의도 불명 — 표준 관행대로 이번 사이클은 차단 취급
raise CrawlFetchError(f"robots.txt 5xx: {host}: {resp.status_code}")
if resp.status_code >= 400:
return None # robots 없음 = 전부 허용
rp = urllib.robotparser.RobotFileParser()
rp.parse(resp.text.splitlines())
return rp
async def _robots_allows(client: httpx.AsyncClient, url: str) -> bool:
parsed = urlparse(url)
host = (parsed.hostname or "").lower()
cached = _robots_cache.get(host)
if cached is None or time.monotonic() - cached[0] > _ROBOTS_CACHE_TTL:
rp = await _fetch_robots(client, parsed.scheme or "https", host)
_robots_cache[host] = (time.monotonic(), rp)
cached = _robots_cache[host]
rp = cached[1]
if rp is None:
return True
return rp.can_fetch(CRAWL_UA, url)
async def fetch_page(url: str, *, check_robots: bool = True) -> tuple[str, str]:
"""공개 페이지 1건 politeness fetch. (html_text, final_url) 반환.
- SSRF 검증 (redirect target 포함, news_collector 피드 fetch 와 동일 이중 검증)
- per-domain 동시성 1 + 515s jitter 지연
- 429: Retry-After 로그 후 CrawlBlocked / 403: CrawlBlocked / 그 외 4xx: CrawlSkip
- 5xx/timeout: CrawlFetchError (큐 재시도)
- 비-HTML content-type / 5MB 초과: CrawlSkip
"""
try:
validate_feed_url(url)
except ValueError as e:
raise CrawlSkip(f"URL 검증 실패: {e}") from e
domain = _domain_of(url)
async with _get_lock(domain):
await _respect_domain_rate(domain)
try:
async with httpx.AsyncClient(
timeout=_PAGE_TIMEOUT, follow_redirects=False,
headers={"User-Agent": CRAWL_UA},
) as client:
if check_robots and not await _robots_allows(client, url):
raise CrawlBlocked(f"robots.txt disallow: {url}")
resp = await client.get(url)
redirects = 0
while resp.is_redirect and redirects < _MAX_REDIRECTS:
location = urljoin(str(resp.request.url), resp.headers.get("location", ""))
try:
validate_feed_url(location)
except ValueError as e:
raise CrawlSkip(f"redirect target 차단: {e}") from e
# redirect 도 같은 도메인 연속 요청 — 간격은 lock 보유로 충분 (즉시 1회)
resp = await client.get(location)
redirects += 1
if resp.is_redirect:
raise CrawlSkip(f"redirect {_MAX_REDIRECTS}회 초과: {url}")
except httpx.TimeoutException as e:
raise CrawlFetchError(f"timeout: {url}") from e
except httpx.HTTPError as e:
raise CrawlFetchError(f"네트워크 오류: {url}: {e}") from e
finally:
_domain_last_request[domain] = time.monotonic()
if resp.status_code == 429:
retry_after = resp.headers.get("retry-after", "")
logger.warning("[politeness] 429 %s (Retry-After=%s)", domain, retry_after or "-")
raise CrawlBlocked(f"429 rate limited: {url} (Retry-After={retry_after or '-'})")
if resp.status_code == 403:
raise CrawlBlocked(f"403 forbidden: {url}")
if resp.status_code >= 500:
raise CrawlFetchError(f"{resp.status_code}: {url}")
if resp.status_code >= 400:
raise CrawlSkip(f"{resp.status_code}: {url}")
ct = resp.headers.get("content-type", "").lower()
if ct and not any(t in ct for t in _HTML_CONTENT_TYPES):
raise CrawlSkip(f"비-HTML content-type: {ct}: {url}")
if len(resp.content) > _MAX_PAGE_BYTES:
raise CrawlSkip(f"크기 초과: {len(resp.content)} bytes: {url}")
return resp.text, str(resp.request.url)
+4
View File
@@ -54,6 +54,7 @@ async def lifespan(app: FastAPI):
from workers.law_monitor import run as law_monitor_run
from workers.mailplus_archive import run as mailplus_run
from workers.news_collector import run as news_collector_run
from workers.fulltext_worker import reconcile_unresolved as fulltext_reconcile_run
from workers.queue_consumer import consume_queue, consume_markdown_queue
from workers.study_queue_consumer import consume_study_queue
from workers.study_session_queue_consumer import consume_study_session_queue
@@ -121,6 +122,9 @@ async def lifespan(app: FastAPI):
# 이드 W3-2: 공부중 토픽 약점 derived 스냅샷 (nightly 04:30 KST, LLM 0). study_diagnosis 표면 source.
scheduler.add_job(study_weakness_run, CronTrigger(hour=4, minute=30, timezone=KST), id="study_weakness")
scheduler.add_job(news_collector_run, "interval", hours=6, id="news_collector")
# crawl-24x7 A-2 안전망: fulltext 영구 실패(3회 소진) 문서를 RSS 요약 기준으로
# 후속 enqueue (silent skip 누적 방지). 03:40 = dedup_reconcile(03:30) 직후 비충돌 슬롯.
scheduler.add_job(fulltext_reconcile_run, CronTrigger(hour=3, minute=40, timezone=KST), id="fulltext_reconcile")
# plan ds-s1-backend-1 B-4: dedup 컬럼(duplicate_of/duplicate_count) 야간 절대 재계산.
# soft-delete 잔여 드리프트 정리(멱등, 드리프트 없으면 no-op). cron 03:30 (다른 잡과 비충돌).
scheduler.add_job(dedup_reconcile_run, CronTrigger(hour=3, minute=30, timezone=KST), id="dedup_reconcile")
+1 -1
View File
@@ -118,7 +118,7 @@ class Document(Base):
source_channel: Mapped[str | None] = mapped_column(
Enum("law_monitor", "devonagent", "email", "web_clip",
"tksafety", "inbox_route", "manual", "drive_sync", "news", "memo",
"voice", "hermes",
"voice", "hermes", "crawl",
name="source_channel")
)
# 외부 채널 (Hermes Discord 등) 의 channel/user/message_id/timestamp 메타.
+21 -1
View File
@@ -2,7 +2,8 @@
from datetime import datetime
from sqlalchemy import Boolean, DateTime, String, Text
from sqlalchemy import Boolean, DateTime, Integer, String, Text
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.orm import Mapped, mapped_column
from core.database import Base
@@ -23,3 +24,22 @@ class NewsSource(Base):
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), default=datetime.now
)
# ── A-3 (plan crawl-24x7-1) 레지스트리 증축 — migration 319 ──
# fetch_method: rss / rss+page / sitemap+page / page / api / signal-only
fetch_method: Mapped[str] = mapped_column(String(20), default="rss")
# fulltext_policy: none(현행) / page(기사 페이지 fetch 후 4-tier 승격) / feed-full(피드 본문이 전문)
fulltext_policy: Mapped[str] = mapped_column(String(20), default="none")
# NULL=공개, 값=구독 세션 키 (B-3 Playwright 어댑터 슬롯)
auth_profile: Mapped[str | None] = mapped_column(String(50))
# 소스별 차등 폴링 (NULL=전역 6h 사이클)
poll_interval_minutes: Mapped[int | None] = mapped_column(Integer)
# 조건부 GET 워터마크 — 서버가 준 값 그대로 저장·재전송 (A-1)
etag: Mapped[str | None] = mapped_column(Text)
last_modified: Mapped[str | None] = mapped_column(Text)
# CDN ETag 회전 대비 콘텐츠 해시 변경감지 병행 (A-1)
feed_content_hash: Mapped[str | None] = mapped_column(String(64))
# 추출 실패 잦은 소스의 site-specific CSS selector (A-2)
selector_override: Mapped[dict | None] = mapped_column(JSONB)
# rdf / table-strip / gn-redirect 등 파서 특이 케이스 (B-5)
parser_quirk: Mapped[str | None] = mapped_column(String(30))
+2 -1
View File
@@ -18,10 +18,11 @@ class ProcessingQueue(Base):
stage: Mapped[str] = mapped_column(
# 'stt' (audio): migration 150 / 'thumbnail' (video): queue_consumer 가 enqueue.
# 'deep_summary' (PR-B B-1): classify_worker 가 에스컬레이션 시 enqueue.
# 'fulltext' (crawl-24x7 A-2): migration 321 — 기사 페이지 fetch 후 본문 승격.
# DB enum 변경은 마이그레이션이 처리하므로 create_type=False.
Enum(
"extract", "classify", "summarize", "embed", "chunk", "preview",
"stt", "thumbnail", "deep_summary", "markdown",
"stt", "thumbnail", "deep_summary", "markdown", "fulltext",
name="process_stage",
create_type=False,
),
+36
View File
@@ -0,0 +1,36 @@
"""source_health 테이블 ORM (A-5, plan crawl-24x7-1)
news_sources 와 1:1. 소스별 fetch 성공/실패 기록 + circuit breaker 상태.
silent skip 누적 방지의 가시성 기반 — A-8 헬스 패널이 읽는다.
"""
from datetime import datetime
from sqlalchemy import BigInteger, DateTime, ForeignKey, Integer, String, Text
from sqlalchemy.orm import Mapped, mapped_column
from core.database import Base
class SourceHealth(Base):
__tablename__ = "source_health"
id: Mapped[int] = mapped_column(primary_key=True)
source_id: Mapped[int] = mapped_column(
Integer, ForeignKey("news_sources.id", ondelete="CASCADE"), nullable=False
)
consecutive_failures: Mapped[int] = mapped_column(Integer, default=0)
total_fetches: Mapped[int] = mapped_column(BigInteger, default=0)
total_failures: Mapped[int] = mapped_column(BigInteger, default=0)
last_success_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
last_error: Mapped[str | None] = mapped_column(Text)
last_error_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
last_fetch_items: Mapped[int | None] = mapped_column(Integer)
# 200 인데 entries 0 인 연속 fetch 횟수 (304/해시동일은 미집계 — 피드 부패 신호 전용)
empty_streak: Mapped[int] = mapped_column(Integer, default=0)
# closed(정상) / open(연속 실패 → 지수 backoff) / disabled(임계 초과, 수동 복구 대상)
circuit_state: Mapped[str] = mapped_column(String(10), default="closed")
circuit_opened_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
updated_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), default=datetime.now
)
+5 -2
View File
@@ -17,10 +17,13 @@ python-multipart>=0.0.9
jinja2>=3.1.0
feedparser>=6.0.0
pymupdf>=1.24.0
# Web/Blog ingest (devonagent 트랙) — HTML 본문 정화 4-tier fallback
trafilatura>=1.12.0
# Web/Blog ingest (devonagent 트랙) + 뉴스 fulltext 승격 (crawl-24x7 A-2) — 4-tier fallback.
# trafilatura 는 단일 메인테이너 리스크로 exact pin (A-2 결정).
trafilatura==2.1.0
readability-lxml>=0.8.1
markdownify>=0.13.1
# tier-4 (bs4) 가 직접 import — 전이 의존 가정 제거 (crawl-24x7 A-2)
beautifulsoup4>=4.12.0
# office OOXML(docx/xlsx/pptx) → md (plan ds-s1-backend-1 C-1).
# 정확한 핀은 E-1 markitdown OOXML PoC(devsbx/버전핀 컨텍스트)에서 확정.
markitdown[docx,xlsx,pptx]>=0.1.0
+226
View File
@@ -0,0 +1,226 @@
"""fulltext 승격 워커 (A-2 + A-7, plan crawl-24x7-1)
news_collector 가 fulltext_policy='page' 소스의 기사에 enqueue 한 'fulltext' stage 를 소비:
기사 페이지 politeness fetch (A-4) → 원본 HTML NAS gzip 보존 (A-7)
→ extract_worker 4-tier 재사용 (tier 2 sibling .md 는 디스크 원본이 없어 비적용)
→ extracted_text/md_content 승격 → summarize + (30일 게이트) embed/chunk enqueue.
실패 처리 (큐 어휘 = DB enum, 분기만 워커):
- 일시 오류 (5xx/timeout) : raise → 큐 재시도 (max_attempts 3)
- 차단/비대상 (403/429/robots/비HTML/추출부족): RSS 요약으로 격하(degrade) 후 완료
→ summarize/embed/chunk enqueue 보장 (기사 유실 0). 격하 사유는 extract_meta.fulltext 에 기록.
- 영구 실패 (3회 소진) : 야간 reconcile_unresolved() 가 summarize 안전망 enqueue
([[feedback_silent_skip_accumulation]] — 조건부 skip 이 영구 침묵으로 누적되지 않게).
승격 게이트: 전 tier 공통 본문 >= 200자 (devonagent 와 달리 tier 4 도 게이트 적용 —
페이월/오류 페이지의 nav 찌꺼기를 본문으로 승격하느니 RSS 요약 격하가 낫다).
"""
import gzip
import hashlib
import re
from datetime import datetime, timezone
from pathlib import Path
from sqlalchemy import exists, select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import aliased
from core.config import settings
from core.crawl_politeness import CrawlBlocked, CrawlFetchError, CrawlSkip, fetch_page
from core.database import async_session
from core.utils import setup_logger
from models.document import Document
from models.queue import ProcessingQueue, enqueue_stage
from workers.extract_worker import (
_WEB_MIN_BODY_LEN,
_extract_web_with_bs4,
_extract_web_with_readability,
_extract_web_with_trafilatura,
)
logger = setup_logger("fulltext_worker")
# 한국 기사 푸터 1층 후처리 (A-2) — 보수적으로 라인 단위만 제거
_FOOTER_PATTERNS = [
re.compile(r"^.{0,120}(무단\s*전재|무단\s*복제|재배포\s*금지|저작권자\s*[ⓒ©(]).*$", re.M),
re.compile(r"^[\w.+-]+@[\w.-]+\.[A-Za-z]{2,}\s*$", re.M), # 단독 이메일 라인
re.compile(r"^\s*\S{2,4}\s*기자\s*$", re.M), # 단독 '◯◯◯ 기자' 라인
]
def _strip_article_footer(body: str) -> str:
for pat in _FOOTER_PATTERNS:
body = pat.sub("", body)
return re.sub(r"\n{3,}", "\n\n", body).strip()
def _extract_body(html_text: str) -> tuple[str, str | None, str | None]:
"""(body, engine, engine_version). 전 tier >= 200자 게이트, 미달이면 ("", None, None)."""
body, ver = _extract_web_with_trafilatura(html_text)
if body and len(body) >= _WEB_MIN_BODY_LEN:
return body, "trafilatura", ver
body, ver = _extract_web_with_readability(html_text)
if body and len(body) >= _WEB_MIN_BODY_LEN:
return body, "readability", ver
body, ver = _extract_web_with_bs4(html_text)
if body and len(body) >= _WEB_MIN_BODY_LEN:
return body, "bs4_text", ver
return "", None, None
def _raw_html_path(source_id: int | None, file_hash: str, now: datetime) -> Path:
"""A-7 원본 보존 경로 — NAS 본진. 한글 디렉토리의 NFC/NFD 비대칭을 피해 source_id 사용.
file_hash 는 DB 컬럼이 character(64) 라 32자 해시가 공백 패딩되어 돌아옴 — strip 필수
(미적용 시 NAS 파일명에 공백 32개 = 쉘/rsync 함정).
"""
src_dir = f"src_{source_id}" if source_id is not None else "src_unknown"
return (
Path(settings.nas_mount_path) / "crawl_raw" / src_dir
/ now.strftime("%Y-%m") / f"{file_hash.strip()}.html.gz"
)
def _save_raw_html(path: Path, html_text: str) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
with gzip.open(path, "wb") as f:
f.write(html_text.encode("utf-8", errors="replace"))
async def _enqueue_downstream(session: AsyncSession, doc: Document) -> None:
"""승격/격하 공통 후속 — summarize 무조건 + 30일 게이트 통과 시 embed/chunk."""
await enqueue_stage(session, doc.id, "summarize")
published_raw = (doc.extract_meta or {}).get("published_at")
days_old = 0
if published_raw:
try:
pub_dt = datetime.fromisoformat(published_raw)
days_old = (datetime.now(timezone.utc) - pub_dt).days
except ValueError:
days_old = 0 # 파싱 불가 = 신규 취급 (수집 시점 기본과 동일)
if days_old <= 30:
await enqueue_stage(session, doc.id, "embed")
await enqueue_stage(session, doc.id, "chunk")
def _set_fulltext_meta(doc: Document, **fields) -> None:
"""extract_meta.fulltext 갱신 — JSONB 변경 감지를 위해 dict 재할당."""
meta = dict(doc.extract_meta or {})
meta["fulltext"] = {**meta.get("fulltext", {}), **fields}
doc.extract_meta = meta
async def _degrade(session: AsyncSession, doc: Document, reason: str) -> None:
"""본문 승격 실패 — RSS 요약 그대로 후속 단계 진행 (기사 유실 0)."""
_set_fulltext_meta(
doc, status="degraded", reason=reason[:300],
resolved_at=datetime.now(timezone.utc).isoformat(),
)
await _enqueue_downstream(session, doc)
logger.warning(f"[fulltext] doc={doc.id} 격하(RSS 요약 유지): {reason}")
async def process(document_id: int, session: AsyncSession) -> None:
"""기사 1건 풀텍스트 승격. queue_consumer 컨벤션 시그니처 (커밋은 consumer 가)."""
doc = await session.get(Document, document_id)
if not doc:
raise ValueError(f"문서 ID {document_id}를 찾을 수 없음")
if not doc.edit_url:
await _degrade(session, doc, "edit_url 없음")
return
meta = doc.extract_meta or {}
source_id = meta.get("source_id")
try:
html_text, final_url = await fetch_page(doc.edit_url)
except (CrawlBlocked, CrawlSkip) as e:
await _degrade(session, doc, f"{type(e).__name__}: {e}")
return
except CrawlFetchError:
raise # 일시 오류 — 큐 재시도
now = datetime.now(timezone.utc)
# A-7: 원본 HTML 보존 (추출기 교체 시 전체 재추출 가능 상태 유지)
raw_path = _raw_html_path(source_id, doc.file_hash, now)
try:
_save_raw_html(raw_path, html_text)
raw_saved = True
except OSError as e:
# NAS 일시 장애 시 보존만 누락하고 승격은 진행 — 사유 기록 (silent 누락 회피)
raw_saved = False
logger.error(f"[fulltext] doc={doc.id} 원본 보존 실패 (승격은 진행): {e}")
body, engine, engine_ver = _extract_body(html_text)
if not engine:
await _degrade(session, doc, f"추출 실패 (전 tier < {_WEB_MIN_BODY_LEN}자)")
return
clean_body = _strip_article_footer(body.replace("\x00", ""))
if len(clean_body) < _WEB_MIN_BODY_LEN:
await _degrade(session, doc, "푸터 제거 후 본문 부족")
return
title = doc.title or ""
doc.extracted_text = f"{title}\n\n{clean_body}" if title else clean_body
doc.extracted_at = now
doc.extractor_version = f"rss+page@{engine}"
doc.md_content = clean_body
doc.md_status = "success"
doc.md_extraction_engine = engine
doc.md_extraction_engine_version = engine_ver
doc.md_format_version = "1.0"
doc.md_generated_at = now
doc.md_source_hash = hashlib.sha256(html_text.encode("utf-8", errors="replace")).hexdigest()
doc.md_content_hash = hashlib.sha256(clean_body.encode("utf-8")).hexdigest()
doc.md_extraction_error = None # 수집 시점의 '변환 비대상' 마커 해제
doc.content_origin = "extracted"
doc.file_size = len(doc.extracted_text.encode())
_set_fulltext_meta(
doc, status="promoted", engine=engine,
raw_html_path=str(raw_path) if raw_saved else None,
final_url=final_url, body_chars=len(clean_body),
resolved_at=now.isoformat(),
)
await _enqueue_downstream(session, doc)
logger.info(
f"[fulltext/{engine}] doc={doc.id} {len(clean_body)}자 승격 "
f"(raw={'saved' if raw_saved else 'MISSING'})"
)
async def reconcile_unresolved() -> None:
"""안전망 (야간 1회): fulltext 영구 실패(3회 소진)로 summarize 가 영영 안 잡힌
뉴스 문서에 RSS 요약 기준 후속 단계를 enqueue. 멱등 — enqueue 후엔 조건 불일치."""
async with async_session() as session:
# 외부 쿼리 FROM 에 ProcessingQueue 가 이미 있어 alias 없이는 auto-correlation 이
# 서브쿼리 FROM 을 전부 제거 → InvalidRequestError (queue_consumer.reset_stale_items 패턴)
pq = aliased(ProcessingQueue)
summarize_q = (
select(pq.id)
.where(
pq.document_id == Document.id,
pq.stage == "summarize",
)
)
result = await session.execute(
select(Document)
.join(ProcessingQueue, ProcessingQueue.document_id == Document.id)
.where(
ProcessingQueue.stage == "fulltext",
ProcessingQueue.status == "failed",
Document.source_channel == "news",
~exists(summarize_q),
)
.limit(200)
)
docs = result.scalars().unique().all()
for doc in docs:
_set_fulltext_meta(doc, status="failed_reconciled")
await _enqueue_downstream(session, doc)
if docs:
await session.commit()
logger.warning(f"[fulltext] reconcile: 영구 실패 {len(docs)}건 RSS 요약으로 후속 enqueue")
+251 -47
View File
@@ -1,8 +1,16 @@
"""뉴스 수집 워커 — RSS/API에서 기사 수집, documents에 저장"""
"""뉴스 수집 워커 — RSS/API에서 기사 수집, documents에 저장
plan crawl-24x7-1 A그룹 (2026-06-10):
A-1 조건부 GET(ETag/Last-Modified 그대로 재전송) + 콘텐츠 해시 변경감지
A-2 fulltext_policy='page' 소스는 'fulltext' stage 로 본문 승격 위임
A-5 source_health 기록 + circuit breaker (소스별 실패 격리)
A-6 first-wins + 포털 전재 2차 dedup (제목+최근 3일, 12자 이상 제목 한정)
"""
import asyncio
import hashlib
import re
from datetime import datetime, timezone
from datetime import datetime, timedelta, timezone
from html import unescape
from urllib.parse import parse_qsl, urlencode, urlparse, urlunparse
@@ -10,11 +18,13 @@ import feedparser
import httpx
from sqlalchemy import select
from core.crawl_politeness import CRAWL_UA
from core.database import async_session
from core.utils import setup_logger
from models.document import Document
from models.news_source import NewsSource
from models.queue import enqueue_stage
from models.source_health import SourceHealth
logger = setup_logger("news_collector")
@@ -38,18 +48,23 @@ CATEGORY_MAP = {
}
class FeedError(Exception):
"""소스 단위 fetch/parse 실패 — run() 이 source_health 실패로 기록."""
def _normalize_category(raw: str) -> str:
"""카테고리 표준화"""
return CATEGORY_MAP.get(raw, CATEGORY_MAP.get(raw.strip(), "Other"))
def _clean_html(text: str) -> str:
"""HTML 태그 제거 + 정제"""
def _clean_html(text: str, max_len: int | None = 1000) -> str:
"""HTML 태그 제거 + 정제. max_len=None 이면 절단 없음 (feed-full 전문용)."""
if not text:
return ""
text = re.sub(r"<[^>]+>", "", text)
text = unescape(text)
return text.strip()[:1000]
text = text.strip()
return text if max_len is None else text[:max_len]
# tracking 파라미터 판별 — prefix(utm_/at_=BBC/ns_=BBC/mc_=mailchimp) + 단독 키
@@ -87,8 +102,104 @@ def _normalize_to_utc(dt) -> datetime:
return datetime.now(timezone.utc)
# ── A-5: circuit breaker 정책 ──
# 연속 실패 >= OPEN 임계 → open (재시도 간격 지수 확대, 6h × 2^n, cap 48h)
# 연속 실패 > DISABLE 임계 → disabled (수집 제외 + 가시 로그, 수동 복구 대상)
# news_sources.enabled 는 건드리지 않는다 — 사용자 의도(enabled)와 자동 상태(circuit) 분리.
_CIRCUIT_OPEN_AFTER = 3
_CIRCUIT_DISABLE_AFTER = 10
_BACKOFF_BASE_HOURS = 6
_BACKOFF_CAP_HOURS = 48
_EMPTY_STREAK_ALERT = 8 # 6h 사이클 × 8 = 약 2일 연속 빈 피드 → 가시 경고
def _should_attempt(health: SourceHealth, now: datetime) -> bool:
"""circuit 상태에 따라 이번 사이클 fetch 여부 결정.
주의 (B-3 계약 ②, r5): 추후 relogin_requested 플래그 소비는 반드시 이
open-스킵 분기보다 *앞*에 두어야 한다 — open 이 스케줄 제외 형태가 되면
배치 경계가 안 와 플래그가 영원히 미소비(half-open 데드 버튼)가 된다.
"""
if health.circuit_state == "disabled":
return False
if health.circuit_state == "open" and health.last_error_at is not None:
over = max(health.consecutive_failures - _CIRCUIT_OPEN_AFTER, 0)
backoff_h = min(_BACKOFF_BASE_HOURS * (2 ** over), _BACKOFF_CAP_HOURS)
if now - health.last_error_at < timedelta(hours=backoff_h):
return False
return True
def _record_success(health: SourceHealth, items: int, not_modified: bool, now: datetime) -> None:
health.consecutive_failures = 0
health.total_fetches += 1
health.last_success_at = now
health.last_fetch_items = items
if health.circuit_state != "closed":
logger.info(f"[health] source={health.source_id} circuit {health.circuit_state}→closed")
health.circuit_state = "closed"
health.circuit_opened_at = None
# 빈 피드 streak: 304/해시동일은 정상 신호라 미집계, 200+entries 0 만 집계 (피드 부패 감시)
if not_modified:
pass
elif items == 0:
health.empty_streak += 1
if health.empty_streak >= _EMPTY_STREAK_ALERT:
logger.error(
f"[health] source={health.source_id} 빈 피드 {health.empty_streak}회 연속 "
f"— 피드 부패 의심 (RSSHub 류 라우트 깨짐 패턴)"
)
else:
health.empty_streak = 0
health.updated_at = now
def _record_failure(health: SourceHealth, error: str, now: datetime) -> None:
health.consecutive_failures += 1
health.total_fetches += 1
health.total_failures += 1
health.last_error = error[:500]
health.last_error_at = now
health.updated_at = now
cf = health.consecutive_failures
if cf > _CIRCUIT_DISABLE_AFTER and health.circuit_state != "disabled":
health.circuit_state = "disabled"
logger.error(
f"[health] source={health.source_id} 연속 실패 {cf}회 — circuit DISABLED "
f"(수집 제외, A-8 패널에서 수동 복구 필요)"
)
elif cf >= _CIRCUIT_OPEN_AFTER and health.circuit_state == "closed":
health.circuit_state = "open"
health.circuit_opened_at = now
logger.warning(f"[health] source={health.source_id} 연속 실패 {cf}회 — circuit open")
async def _get_or_create_health(session, source_id: int) -> SourceHealth:
result = await session.execute(
select(SourceHealth).where(SourceHealth.source_id == source_id)
)
health = result.scalars().first()
if health is None:
health = SourceHealth(source_id=source_id)
session.add(health)
await session.flush()
return health
# 수동 POST /api/news/collect 와 6h 스케줄 사이클의 동시 실행 차단 (단일 프로세스·단일
# 이벤트루프). 동시 진입 시 _get_or_create_health 가 같은 source_id 를 양쪽에서 INSERT
# → uq_source_health_source_id 위반 IntegrityError 로 사이클 전체가 죽는 경합의 원천 봉쇄.
_run_lock = asyncio.Lock()
async def run():
"""뉴스 수집 실행"""
async with _run_lock:
await _run_locked()
async def _run_locked():
now = datetime.now(timezone.utc)
async with async_session() as session:
result = await session.execute(
select(NewsSource).where(NewsSource.enabled == True)
@@ -101,17 +212,23 @@ async def run():
total = 0
for source in sources:
health = await _get_or_create_health(session, source.id)
if not _should_attempt(health, now):
logger.info(f"[{source.name}] circuit {health.circuit_state} — 이번 사이클 skip")
continue
try:
if source.feed_type == "api":
count = await _fetch_api(session, source)
count, status = await _fetch_api(session, source)
else:
count = await _fetch_rss(session, source)
count, status = await _fetch_rss(session, source)
source.last_fetched_at = datetime.now(timezone.utc)
_record_success(health, count, status == "not_modified", now)
total += count
except Exception as e:
logger.error(f"[{source.name}] 수집 실패: {e}")
source.last_fetched_at = datetime.now(timezone.utc)
_record_failure(health, str(e) or repr(e), now)
await session.commit()
logger.info(f"뉴스 수집 완료: {total}건 신규")
@@ -122,8 +239,58 @@ ALLOWED_CONTENT_TYPES = ("application/rss+xml", "application/atom+xml",
"application/xml", "text/xml")
async def _fetch_rss(session, source: NewsSource) -> int:
"""RSS 피드 수집 — redirect 재검증 + 크기/content-type 제한"""
async def _is_portal_duplicate(session, title: str) -> bool:
"""A-6 2차 dedup: 포털 전재본 vs 원본이 다른 URL 로 이중 적재되는 케이스.
보조 키 = 제목 + 최근 3일 (다른 소스/다른 URL 이므로 1차 키로 안 잡힘).
범용 제목 오탐 방지: 12자 미만 제목은 비적용. skip 은 전부 로그 (silent 누락 회피).
"""
if len(title) < 12:
return False
cutoff = datetime.now(timezone.utc) - timedelta(days=3)
dup = await session.execute(
select(Document.id).where(
Document.title == title,
Document.source_channel == "news",
Document.file_format == "article",
Document.extracted_at >= cutoff,
).limit(1)
)
return dup.scalars().first() is not None
async def _enqueue_processing(session, doc: Document, source: NewsSource, pub_dt: datetime) -> None:
"""후속 단계 enqueue.
fulltext_policy='page' 소스는 'fulltext' stage 만 — summarize/embed/chunk 는
fulltext_worker 가 승격(또는 격하) 확정 후 enqueue (RSS 요약 선요약 → 풀텍스트
도착 시 summarize_worker 의 '이미 요약 있음 skip' 에 막히는 순서 함정 회피).
"""
if source.fulltext_policy == "page" and doc.edit_url:
await enqueue_stage(session, doc.id, "fulltext")
return
await enqueue_stage(session, doc.id, "summarize")
days_old = (datetime.now(timezone.utc) - pub_dt).days
if days_old <= 30:
await enqueue_stage(session, doc.id, "embed")
await enqueue_stage(session, doc.id, "chunk")
def _build_extract_meta(source: NewsSource, pub_dt: datetime) -> dict:
"""fulltext_worker / 패널이 쓰는 출처 메타 (documents 에 source FK 가 없어 여기 기록)."""
return {
"source_id": source.id,
"source_name": source.name,
"published_at": pub_dt.isoformat(),
}
async def _fetch_rss(session, source: NewsSource) -> tuple[int, str]:
"""RSS 피드 수집 — redirect 재검증 + 크기/content-type 제한 + 조건부 GET (A-1).
반환 (신규 건수, 상태). 상태 'not_modified' = 304 또는 콘텐츠 해시 동일.
소스 단위 실패는 FeedError raise — run() 이 health 실패로 기록.
"""
from urllib.parse import urljoin
from core.url_validator import validate_feed_url, HTTP_EXCEPTION_DOMAINS
@@ -134,17 +301,24 @@ async def _fetch_rss(session, source: NewsSource) -> int:
# 순수 HTTP 소스인데 allowlist에 없으면 차단
if source.feed_url.startswith("http://") and not http_allowed:
logger.error(f"[{source.name}] HTTP 차단 (allowlist 미등록): {source_hostname}")
return 0
raise FeedError(f"HTTP 차단 (allowlist 미등록): {source_hostname}")
# fetch 전 URL 재검증 (등록 이후 DNS 변경 대비)
try:
validate_feed_url(source.feed_url, allow_http=http_allowed)
except ValueError as e:
logger.error(f"[{source.name}] URL 검증 실패: {e}")
return 0
raise FeedError(f"URL 검증 실패: {e}") from e
async with httpx.AsyncClient(timeout=10, follow_redirects=False) as client:
# A-1: 정직 UA + 조건부 GET — 서버가 준 워터마크를 받은 그대로 재전송
headers = {"User-Agent": CRAWL_UA}
if source.etag:
headers["If-None-Match"] = source.etag
if source.last_modified:
headers["If-Modified-Since"] = source.last_modified
async with httpx.AsyncClient(
timeout=10, follow_redirects=False, headers=headers
) as client:
resp = await client.get(source.feed_url)
# redirect 수동 처리 (최대 3회, 각 target 재검증)
@@ -156,29 +330,45 @@ async def _fetch_rss(session, source: NewsSource) -> int:
try:
validate_feed_url(location, allow_http=http_allowed)
except ValueError as e:
logger.error(f"[{source.name}] redirect target 차단: {e}")
return 0
raise FeedError(f"redirect target 차단: {e}") from e
resp = await client.get(location)
redirects += 1
if resp.is_redirect:
logger.error(f"[{source.name}] redirect 3회 초과")
return 0
raise FeedError("redirect 3회 초과")
if resp.status_code == 304:
logger.info(f"[{source.name}] 304 Not Modified — 본문 미전송")
return 0, "not_modified"
resp.raise_for_status()
if len(resp.content) > MAX_RESPONSE_SIZE:
logger.warning(f"[{source.name}] 응답 크기 초과: {len(resp.content)} bytes")
return 0
raise FeedError(f"응답 크기 초과: {len(resp.content)} bytes")
ct = resp.headers.get("content-type", "").lower()
if not any(t in ct for t in ALLOWED_CONTENT_TYPES):
logger.warning(f"[{source.name}] 비정상 content-type: {ct}")
return 0
raise FeedError(f"비정상 content-type: {ct}")
# A-1: 콘텐츠 해시 변경감지 (CDN 의 ETag 회전 대비 병행) — 저장된 해시는 항상
# 파싱 검증을 통과한 응답의 것이므로 동일성 비교는 파싱 전에 안전
new_etag = resp.headers.get("etag")
new_last_modified = resp.headers.get("last-modified")
content_hash = hashlib.sha256(resp.content).hexdigest()
if source.feed_content_hash == content_hash:
logger.info(f"[{source.name}] 콘텐츠 해시 동일 — 파싱 skip")
return 0, "not_modified"
feed = feedparser.parse(resp.text)
if feed.bozo and not feed.entries:
logger.warning(f"[{source.name}] RSS 파싱 실패: {feed.bozo_exception}")
return 0
raise FeedError(f"RSS 파싱 실패: {feed.bozo_exception}")
# A-1: 워터마크 영속은 파싱 검증 통과 후에만 — 부패(bozo) 응답의 ETag 를 저장하면
# 이후 304 로 영구 skip 되는 silent corruption 차단
if new_etag:
source.etag = new_etag
if new_last_modified:
source.last_modified = new_last_modified
source.feed_content_hash = content_hash
count = 0
for entry in feed.entries:
@@ -190,6 +380,18 @@ async def _fetch_rss(session, source: NewsSource) -> int:
if not summary:
summary = title
# A-6: feed-full 소스만 피드 본문을 전문으로 신뢰 (truncate·광고 삽입이 흔해
# 일반 소스의 summary/content:encoded 를 전문으로 오인 저장 금지)
body = summary
is_feed_full = False
if source.fulltext_policy == "feed-full":
content_list = entry.get("content") or []
raw_body = content_list[0].get("value", "") if content_list else ""
full_body = _clean_html(raw_body or entry.get("summary", ""), max_len=None)
if len(full_body) > len(summary):
body = full_body
is_feed_full = True
link = entry.get("link", "")
published = entry.get("published_parsed") or entry.get("updated_parsed")
pub_dt = datetime(*published[:6], tzinfo=timezone.utc) if published else datetime.now(timezone.utc)
@@ -209,6 +411,11 @@ async def _fetch_rss(session, source: NewsSource) -> int:
if existing.scalars().first():
continue
# A-6 2차: 포털 전재 dedup (first-wins — 먼저 적재된 쪽이 정본)
if await _is_portal_duplicate(session, title):
logger.info(f"[{source.name}] portal-dup skip: {title[:60]}")
continue
category = _normalize_category(source.category or "")
source_short = source.name.split(" ")[0] # "경향신문 문화" → "경향신문"
@@ -216,15 +423,16 @@ async def _fetch_rss(session, source: NewsSource) -> int:
file_path=f"news/{source.name}/{article_id}",
file_hash=article_id,
file_format="article",
file_size=len(summary.encode()),
file_size=len(body.encode()),
file_type="note",
title=title,
extracted_text=f"{title}\n\n{summary}",
extracted_text=f"{title}\n\n{body}",
extracted_at=datetime.now(timezone.utc),
extractor_version="rss",
extractor_version="rss-feed-full" if is_feed_full else "rss",
# article = 텍스트 네이티브(본문=extracted_text). markdown 단계 미enqueue 라
# 기본값 'pending' 이면 영구 비수렴 → backlog 지표 오염 + md_status_pending partial
# 인덱스 비대. 생성 시점에 terminal 'skipped' 로 명시(변환 비대상).
# fulltext_policy='page' 소스는 fulltext_worker 가 승격 시 success 로 갱신.
md_status="skipped",
md_extraction_error="news article: 텍스트 네이티브, markdown 변환 비대상",
source_channel="news",
@@ -235,30 +443,27 @@ async def _fetch_rss(session, source: NewsSource) -> int:
ai_domain="News",
ai_sub_group=source_short,
ai_tags=[f"News/{source_short}/{category}"],
extract_meta=_build_extract_meta(source, pub_dt),
)
session.add(doc)
await session.flush()
# summarize + embed + chunk 등록 (classify 불필요)
await enqueue_stage(session, doc.id, "summarize")
days_old = (datetime.now(timezone.utc) - pub_dt).days
if days_old <= 30:
await enqueue_stage(session, doc.id, "embed")
await enqueue_stage(session, doc.id, "chunk")
# summarize + embed + chunk 등록 (classify 불필요).
# page 정책 소스는 fulltext 만 — 후속은 fulltext_worker 가 확정 후 enqueue.
await _enqueue_processing(session, doc, source, pub_dt)
count += 1
logger.info(f"[{source.name}] RSS → {count}건 수집")
return count
return count, "ok"
async def _fetch_api(session, source: NewsSource) -> int:
async def _fetch_api(session, source: NewsSource) -> tuple[int, str]:
"""NYT API 수집 — 키 마스킹 + health degradation"""
import os
nyt_key = os.getenv("NYT_API_KEY", "")
if not nyt_key:
logger.error("NYT_API_KEY 미설정 — US 뉴스 수집 불가")
return 0
raise FeedError("NYT_API_KEY 미설정 — US 뉴스 수집 불가")
try:
async with httpx.AsyncClient(timeout=10) as client:
@@ -270,12 +475,10 @@ async def _fetch_api(session, source: NewsSource) -> int:
except httpx.HTTPStatusError as e:
# 쿼리스트링(api-key 포함) 제거 — path까지만 로깅
safe_url = str(e.request.url).split("?")[0]
logger.error(f"NYT API 실패: {e.response.status_code} @ {safe_url}")
return 0
raise FeedError(f"NYT API 실패: {e.response.status_code} @ {safe_url}") from e
except httpx.RequestError as e:
safe_url = str(e.request.url).split("?")[0] if e.request else "unknown"
logger.error(f"NYT API 연결 실패: {safe_url}")
return 0
raise FeedError(f"NYT API 연결 실패: {safe_url}") from e
data = resp.json()
count = 0
@@ -309,6 +512,10 @@ async def _fetch_api(session, source: NewsSource) -> int:
if existing.scalars().first():
continue
if await _is_portal_duplicate(session, title):
logger.info(f"[{source.name}] portal-dup skip: {title[:60]}")
continue
category = _normalize_category(article.get("section", source.category or ""))
source_short = source.name.split(" ")[0]
@@ -334,17 +541,14 @@ async def _fetch_api(session, source: NewsSource) -> int:
ai_domain="News",
ai_sub_group=source_short,
ai_tags=[f"News/{source_short}/{category}"],
extract_meta=_build_extract_meta(source, pub_dt),
)
session.add(doc)
await session.flush()
await enqueue_stage(session, doc.id, "summarize")
days_old = (datetime.now(timezone.utc) - pub_dt).days
if days_old <= 30:
await enqueue_stage(session, doc.id, "embed")
await enqueue_stage(session, doc.id, "chunk")
await _enqueue_processing(session, doc, source, pub_dt)
count += 1
logger.info(f"[{source.name}] API → {count}건 수집")
return count
return count, "ok"
+9 -2
View File
@@ -22,8 +22,11 @@ logger = setup_logger("queue_consumer")
# stage별 배치 크기
# stt 는 GPU 단일 점유 + 회의 30분짜리도 가능 → 배치 1. thumbnail 은 ffmpeg subprocess 로 가벼움.
# deep_summary (PR-B B-1) 는 MLX 26B 단일 Semaphore(1) 경유 → 배치 1.
# fulltext 는 politeness 지연(같은 도메인 5–15s)이 배치 내 직렬로 걸린다 — 배치 3 이면
# 같은 도메인 최악 ~45s/사이클, 메인 큐 1m 간격(max_instances=1, coalesce)이 흡수.
BATCH_SIZE = {"extract": 5, "classify": 3, "summarize": 3, "embed": 1, "chunk": 1,
"preview": 2, "stt": 1, "thumbnail": 3, "deep_summary": 1, "markdown": 1}
"preview": 2, "stt": 1, "thumbnail": 3, "deep_summary": 1, "markdown": 1,
"fulltext": 3}
STALE_THRESHOLD_MINUTES = 10
# markdown 대형 split 변환은 한 doc 이 수십 분(5210 ≈ 40분) 동안 processing 상태로 머문다.
# marker_worker 는 queue 행에 heartbeat 를 찍지 않으므로(started_at 고정), main 의 10분
@@ -35,7 +38,7 @@ MARKDOWN_STALE_THRESHOLD_MINUTES = int(os.getenv("MARKDOWN_STALE_MINUTES", "120"
# STT 도 장기 작업 가능성이 있으나 본 PR 범위 밖 — main 에 유지(follow-up).
MAIN_QUEUE_STAGES = [
"extract", "classify", "summarize", "embed", "chunk",
"preview", "stt", "thumbnail", "deep_summary",
"preview", "stt", "thumbnail", "deep_summary", "fulltext",
]
MARKDOWN_QUEUE_STAGES = ["markdown"]
@@ -179,6 +182,7 @@ def _load_workers():
from workers.summarize_worker import process as summarize_process
from workers.thumbnail_worker import process as thumbnail_process
from workers.marker_worker import process as marker_process
from workers.fulltext_worker import process as fulltext_process
return {
"extract": extract_process,
@@ -195,6 +199,9 @@ def _load_workers():
# Phase 1B: classify 완료 후 enqueue. PDF→markdown 변환 (leaf, embed/chunk 와 독립).
# consume_markdown_queue 가 전담 (대형 split 변환이 메인 파이프라인을 막지 않도록).
"markdown": marker_process,
# crawl-24x7 A-2: 기사 페이지 fetch → 4-tier 본문 승격. 후속(summarize/embed/chunk)은
# 워커가 직접 enqueue — next_stages dict 미등록 (enqueue_next_stage no-op).
"fulltext": fulltext_process,
}
+28 -3
View File
@@ -64,6 +64,11 @@ services:
environment:
- HF_HOME=/models/huggingface
- TORCH_HOME=/models/torch
# D-1 (crawl-24x7): idle-unload 전환 — 영구 점유(~3.5GB) 해제가 90% 봉투의 전제.
# /ready 는 idle 에서도 200 (fastapi depends_on service_healthy 유지).
# 롤백 = MARKER_PRELOAD=1 + MARKER_IDLE_UNLOAD_MINUTES=0.
- MARKER_PRELOAD=0
- MARKER_IDLE_UNLOAD_MINUTES=${MARKER_IDLE_UNLOAD_MINUTES:-30}
volumes:
- ${NAS_NFS_PATH:-/mnt/nas/Document_Server}:/documents:ro
- marker_models:/models
@@ -97,6 +102,11 @@ services:
- WHISPER_MODEL=${WHISPER_MODEL:-large-v3}
- WHISPER_DEVICE=${WHISPER_DEVICE:-cuda}
- WHISPER_COMPUTE_TYPE=${WHISPER_COMPUTE_TYPE:-float16}
# D-1 (crawl-24x7): idle-unload 전환 — 영구 점유(~4GB) 해제가 90% 봉투의 전제.
# 콜드로드 수초~수십 초는 배치 작업이라 무방 (stt_worker read=1800s 가 흡수).
# 롤백 = STT_PRELOAD=1 + STT_IDLE_UNLOAD_MINUTES=0.
- STT_PRELOAD=0
- STT_IDLE_UNLOAD_MINUTES=${STT_IDLE_UNLOAD_MINUTES:-30}
deploy:
resources:
reservations:
@@ -105,9 +115,9 @@ services:
count: 1
capabilities: [gpu]
healthcheck:
# /ready: CUDA 디바이스 + 모델 적재 둘 다 확인. ready=true 만 healthy 처리.
# /health 는 단순 liveness 라 모델 적재 상태도 healthy 로 잡혀 운영 신호로 부적합.
test: ["CMD", "python3", "-c", "import json,urllib.request,sys; r=urllib.request.urlopen('http://localhost:3300/ready'); sys.exit(0 if json.load(r).get('ready') else 1)"]
# D-1: idle-unload 도입으로 '모델 적재' 는 더 이상 상시 상태가 아님 — cuda 가용성만
# healthy 기준. 모델 적재 여부는 /ready 의 models_loaded 필드로 관측(정보성).
test: ["CMD", "python3", "-c", "import json,urllib.request,sys; r=urllib.request.urlopen('http://localhost:3300/ready'); sys.exit(0 if json.load(r).get('cuda') else 1)"]
interval: 30s
timeout: 10s
retries: 3
@@ -229,6 +239,21 @@ services:
- fastapi
restart: unless-stopped
# crawl-24x7 A-8 1차: 전 소스 헬스 패널 — 내부 전용 (읽기 전용 SELECT 만).
# '내부 전용' 성립 구현 = 별도 바인딩뿐 (r4 결정): Tailscale 인터페이스에만 publish.
# 기존 SvelteKit 라우트(vhost=Host 헤더 검사=앱 가드 환원)나 프록시 경로 차단(경로 가드
# 회귀)으로 옮기지 말 것. caddy/home-caddy 라우트 추가 금지. fastapi/postgres 바인딩 선례.
crawl-health:
build: ./services/crawl-health
ports:
- "100.110.63.63:8765:8765"
environment:
- CRAWL_HEALTH_DSN=postgresql://pkm:${POSTGRES_PASSWORD}@postgres:5432/pkm
depends_on:
postgres:
condition: service_healthy
restart: unless-stopped
caddy:
image: caddy:2
ports:
@@ -0,0 +1,19 @@
-- A-3 (plan crawl-24x7-1): 소스 레지스트리 증축 — additive only.
-- fetch_method : rss / rss+page / sitemap+page / page / api / signal-only
-- fulltext_policy : none(현행 유지) / page(기사 페이지 fetch 후 4-tier 승격) / feed-full(피드 본문이 전문)
-- auth_profile : NULL=공개, 값=구독 세션 키 (B-3 Playwright 어댑터용 슬롯)
-- poll_interval_minutes : 소스별 차등 폴링 (NULL=전역 6h 사이클)
-- etag / last_modified : 조건부 GET 워터마크 — 받은 그대로 저장·재전송 (상태는 전부 DB, APScheduler in-process)
-- feed_content_hash : CDN ETag 회전 대비 콘텐츠 해시 변경감지 병행
-- selector_override : 추출 실패 잦은 소스의 site-specific CSS selector (JSONB)
-- parser_quirk : rdf / table-strip / gn-redirect 등 파서 특이 케이스
ALTER TABLE news_sources
ADD COLUMN IF NOT EXISTS fetch_method VARCHAR(20) NOT NULL DEFAULT 'rss',
ADD COLUMN IF NOT EXISTS fulltext_policy VARCHAR(20) NOT NULL DEFAULT 'none',
ADD COLUMN IF NOT EXISTS auth_profile VARCHAR(50),
ADD COLUMN IF NOT EXISTS poll_interval_minutes INTEGER,
ADD COLUMN IF NOT EXISTS etag TEXT,
ADD COLUMN IF NOT EXISTS last_modified TEXT,
ADD COLUMN IF NOT EXISTS feed_content_hash VARCHAR(64),
ADD COLUMN IF NOT EXISTS selector_override JSONB,
ADD COLUMN IF NOT EXISTS parser_quirk VARCHAR(30);
@@ -0,0 +1,3 @@
-- 0-5 (a) 확정 (plan crawl-24x7-1): 도메인 자료(안전/공학/철학) 채널 신설 — news 와 분리.
-- 신규 값은 같은 트랜잭션 내 사용 금지 (PG 제약) — 본 배치의 다른 마이그레이션은 'crawl' 미사용.
ALTER TYPE source_channel ADD VALUE IF NOT EXISTS 'crawl';
@@ -0,0 +1,3 @@
-- A-2 (plan crawl-24x7-1): RSS 요약 → 기사 페이지 fetch → 4-tier 본문 승격 stage.
-- fulltext_policy='page' 소스의 기사에만 news_collector 가 enqueue.
ALTER TYPE process_stage ADD VALUE IF NOT EXISTS 'fulltext';
+19
View File
@@ -0,0 +1,19 @@
-- A-5 (plan crawl-24x7-1): 소스 건강 — 소스별 실패 격리 기록 + circuit breaker.
-- 한 소스가 죽어도 나머지 영향 0. silent skip 누적 방지의 가시성 기반 (A-8 패널이 읽음).
-- circuit_state: closed(정상) / open(연속 실패로 지수 backoff 중) / disabled(M회 초과, 수동 복구 대상)
-- empty_streak : 200 인데 entries 0 인 연속 fetch 횟수 (피드 부패 감시 — 304/해시동일은 미집계)
CREATE TABLE IF NOT EXISTS source_health (
id SERIAL PRIMARY KEY,
source_id INTEGER NOT NULL REFERENCES news_sources(id) ON DELETE CASCADE,
consecutive_failures INTEGER NOT NULL DEFAULT 0,
total_fetches BIGINT NOT NULL DEFAULT 0,
total_failures BIGINT NOT NULL DEFAULT 0,
last_success_at TIMESTAMPTZ,
last_error TEXT,
last_error_at TIMESTAMPTZ,
last_fetch_items INTEGER,
empty_streak INTEGER NOT NULL DEFAULT 0,
circuit_state VARCHAR(10) NOT NULL DEFAULT 'closed',
circuit_opened_at TIMESTAMPTZ,
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
@@ -0,0 +1,2 @@
-- A-5: source_health 는 news_sources 와 1:1 — upsert 기준 키.
CREATE UNIQUE INDEX IF NOT EXISTS uq_source_health_source_id ON source_health (source_id);
+12
View File
@@ -0,0 +1,12 @@
FROM python:3.12-slim
WORKDIR /srv
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY server.py .
EXPOSE 8765
HEALTHCHECK --interval=30s --timeout=5s --retries=3 \
CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:8765/health')"
CMD ["uvicorn", "server:app", "--host", "0.0.0.0", "--port", "8765"]
+3
View File
@@ -0,0 +1,3 @@
fastapi>=0.111.0
uvicorn>=0.30.0
asyncpg>=0.29.0
+202
View File
@@ -0,0 +1,202 @@
"""crawl-health — 전 소스 헬스 패널 1차 (A-8, plan crawl-24x7-1)
읽기 전용 내부 운영 패널. 의존 = 기존 수집 상태(news_sources/source_health/documents/
processing_queue SELECT 만) — 쓰기 0.
[1차] 소스별 last success / 수집 건수 추이(24h/7d) / 연속 실패 / circuit 상태 /
빈 피드 streak + fulltext 승격/격하 통계 + 큐 백로그. 비-RSS 소스(C-2 sitemap 등)도
같은 표면이 수용 (fetch_method 컬럼 표시 — '구독 소스 패널' 로 좁히지 않는 전 소스 일반화).
[2차 범위 외] B-3 상태 계약 도착 시 세션 열 + [재로그인 시도] 버튼(enqueue 방식).
노출: 별도 바인딩만 — compose 가 Tailscale 인터페이스(100.110.63.63)에만 publish.
vhost/경로 가드 방식 금지 (r4: 둘 다 '덜 깨짐' 속성 상실). 앱 레벨 인증 없음 =
Tailscale 도달성만이 경계 (fab-server 선례).
"""
import html
import logging
import os
from contextlib import asynccontextmanager
import asyncpg
from fastapi import FastAPI
from fastapi.responses import HTMLResponse, JSONResponse
logger = logging.getLogger("crawl_health")
DSN = os.environ.get("CRAWL_HEALTH_DSN", "")
_pool: asyncpg.Pool | None = None
@asynccontextmanager
async def lifespan(_app: FastAPI):
global _pool
_pool = await asyncpg.create_pool(DSN, min_size=1, max_size=3)
yield
await _pool.close()
app = FastAPI(lifespan=lifespan)
async def _collect_data() -> dict:
async with _pool.acquire() as conn:
sources = await conn.fetch(
"""
SELECT s.id, s.name, s.country, s.enabled, s.feed_type, s.fetch_method,
s.fulltext_policy, s.last_fetched_at,
h.circuit_state, h.consecutive_failures, h.last_success_at,
h.last_error, h.last_error_at, h.last_fetch_items, h.empty_streak,
h.total_fetches, h.total_failures
FROM news_sources s
LEFT JOIN source_health h ON h.source_id = s.id
ORDER BY s.enabled DESC, s.name
"""
)
counts = await conn.fetch(
"""
SELECT s.id,
count(d.id) FILTER (WHERE d.extracted_at > now() - interval '24 hours') AS items_24h,
count(d.id) AS items_7d
FROM news_sources s
LEFT JOIN documents d
ON d.source_channel = 'news'
AND d.extracted_at > now() - interval '7 days'
AND d.file_path LIKE 'news/' || s.name || '/%'
GROUP BY s.id
"""
)
queue = await conn.fetch(
"""
SELECT stage::text AS stage, status::text AS status, count(*) AS n,
min(created_at) FILTER (WHERE status = 'pending') AS oldest_pending
FROM processing_queue
WHERE stage IN ('fulltext', 'summarize', 'embed', 'chunk')
AND status IN ('pending', 'processing', 'failed')
GROUP BY 1, 2
ORDER BY 1, 2
"""
)
fulltext = await conn.fetch(
"""
SELECT extract_meta -> 'fulltext' ->> 'status' AS status, count(*) AS n
FROM documents
WHERE source_channel = 'news' AND extract_meta ? 'fulltext'
GROUP BY 1
"""
)
count_map = {r["id"]: r for r in counts}
return {
"sources": [
{**dict(r),
"items_24h": count_map.get(r["id"], {}).get("items_24h", 0),
"items_7d": count_map.get(r["id"], {}).get("items_7d", 0)}
for r in sources
],
"queue": [dict(r) for r in queue],
"fulltext": [dict(r) for r in fulltext],
}
@app.get("/health")
async def health():
"""Liveness — Docker healthcheck 용 (DB 미접근, 프로세스 생존만)."""
return {"status": "ok", "service": "crawl-health"}
@app.get("/api/health.json")
async def api_health():
data = await _collect_data()
# asyncpg Record 의 datetime → isoformat 직렬화
def _ser(v):
return v.isoformat() if hasattr(v, "isoformat") else v
return JSONResponse({
k: [{kk: _ser(vv) for kk, vv in row.items()} for row in v]
for k, v in data.items()
})
def _chip(state: str | None, enabled: bool) -> str:
if not enabled:
return '<span class="chip off">OFF</span>'
if state == "disabled":
return '<span class="chip err">DISABLED</span>'
if state == "open":
return '<span class="chip warn">OPEN</span>'
return '<span class="chip ok">OK</span>'
def _fmt_ts(v) -> str:
return v.strftime("%m-%d %H:%M") if v else "-"
@app.get("/", response_class=HTMLResponse)
async def index():
data = await _collect_data()
rows = []
for s in data["sources"]:
err = html.escape((s.get("last_error") or "")[:80])
warn_cls = ""
if s["enabled"] and (s.get("consecutive_failures") or 0) >= 3:
warn_cls = ' class="row-warn"'
elif s["enabled"] and (s.get("empty_streak") or 0) >= 8:
warn_cls = ' class="row-warn"'
rows.append(
f"<tr{warn_cls}>"
f"<td>{html.escape(s['name'])}</td>"
f"<td>{_chip(s.get('circuit_state'), s['enabled'])}</td>"
f"<td>{html.escape(s.get('fetch_method') or 'rss')}</td>"
f"<td>{html.escape(s.get('fulltext_policy') or 'none')}</td>"
f"<td class='num'>{s['items_24h']}</td>"
f"<td class='num'>{s['items_7d']}</td>"
f"<td class='num'>{s.get('consecutive_failures') or 0}</td>"
f"<td class='num'>{s.get('empty_streak') or 0}</td>"
f"<td>{_fmt_ts(s.get('last_success_at'))}</td>"
f"<td>{_fmt_ts(s.get('last_fetched_at'))}</td>"
f"<td class='err-text'>{err}</td>"
f"</tr>"
)
qrows = [
f"<tr><td>{html.escape(q['stage'])}</td><td>{html.escape(q['status'])}</td>"
f"<td class='num'>{q['n']}</td><td>{_fmt_ts(q.get('oldest_pending'))}</td></tr>"
for q in data["queue"]
]
frows = [
f"<tr><td>{html.escape(f['status'] or '-')}</td><td class='num'>{f['n']}</td></tr>"
for f in data["fulltext"]
]
body = f"""<!DOCTYPE html>
<html lang="ko"><head><meta charset="utf-8">
<title>crawl-health — 전 소스 헬스 패널</title>
<style>
body {{ font-family: -apple-system, 'Apple SD Gothic Neo', sans-serif; background: #f5f1e8;
color: #3d3a33; margin: 0; padding: 28px; }}
h1 {{ font-size: 19px; margin: 0 0 4px; }} h2 {{ font-size: 14px; margin: 26px 0 8px; }}
.sub {{ color: #8a8474; font-size: 12px; margin-bottom: 18px; }}
table {{ border-collapse: collapse; width: 100%; background: #fffdf8; font-size: 12.5px; }}
th, td {{ border: 1px solid #e3ddcd; padding: 5px 9px; text-align: left; }}
th {{ background: #ece6d6; font-weight: 600; white-space: nowrap; }}
td.num {{ text-align: right; font-variant-numeric: tabular-nums; }}
td.err-text {{ color: #9a4a3a; font-size: 11.5px; max-width: 320px; }}
tr.row-warn td {{ background: #fbf0e4; }}
.chip {{ display: inline-block; padding: 1px 8px; border-radius: 9px; font-size: 11px; font-weight: 600; }}
.chip.ok {{ background: #dce8d4; color: #3c5a2e; }}
.chip.warn {{ background: #f3e0b8; color: #7a5a14; }}
.chip.err {{ background: #eecfc6; color: #8a2f1d; }}
.chip.off {{ background: #e3ddcd; color: #6e6859; }}
</style></head><body>
<h1>crawl-health — 전 소스 헬스 패널</h1>
<div class="sub">A-8 1차 (피드 수집 헬스) · 내부 전용 (Tailscale 바인딩) · 새로고침 = 실시간 조회</div>
<h2>소스 ({len(rows)})</h2>
<table><tr><th>소스</th><th>circuit</th><th>fetch</th><th>fulltext</th><th>24h</th><th>7d</th>
<th>연속실패</th><th>빈피드</th><th>last success</th><th>last fetch</th><th>last error</th></tr>
{''.join(rows)}</table>
<h2>처리 큐 (fulltext / summarize / embed / chunk)</h2>
<table><tr><th>stage</th><th>status</th><th>건수</th><th>oldest pending</th></tr>
{''.join(qrows) or '<tr><td colspan="4">백로그 없음</td></tr>'}</table>
<h2>fulltext 승격 누적</h2>
<table><tr><th>status</th><th>건수</th></tr>
{''.join(frows) or '<tr><td colspan="2">기록 없음 (파일럿 전환 전)</td></tr>'}</table>
</body></html>"""
return HTMLResponse(body)
+109 -28
View File
@@ -1,12 +1,18 @@
"""marker-service — POST /convert: PDF → markdown + 추출 이미지 base64.
Phase 1B (2026-05-01) — 텍스트만 응답, 이미지 폐기.
Phase 1B.5 (본 변경) — `_images` 직렬화해서 base64 응답에 포함. NAS write 권한이
Phase 1B.5 — `_images` 직렬화해서 base64 응답에 포함. NAS write 권한이
없는 stateless 변환기 유지 (fastapi 가 NAS persist 담당).
D-1 (plan crawl-24x7-1, 2026-06-10) — idle-unload 운영 전환:
MARKER_PRELOAD=0 : startup warmup 끔 (첫 /convert 시 lazy load)
MARKER_IDLE_UNLOAD_MINUTES : N분 유휴 시 모델 해제 (0=비활성, 기존 동작)
/ready 는 idle(미적재)에서도 200 — fastapi 의 depends_on service_healthy 가
lazy 모드에서 영구 미기동으로 굳는 것 방지. 503 은 warmup_failed 한정.
plan: ~/.claude/plans/piped-humming-crystal.md
"""
import base64
import gc
import hashlib
import io
import logging
@@ -40,6 +46,12 @@ _warmup_done = False
_warmup_error: str | None = None
_warmup_lock = threading.Lock()
# D-1 idle-unload 상태 — 전이는 전부 _warmup_lock 아래
_PRELOAD = os.getenv("MARKER_PRELOAD", "1") != "0"
_IDLE_UNLOAD_MINUTES = int(os.getenv("MARKER_IDLE_UNLOAD_MINUTES", "0"))
_inflight = 0
_last_used = time.monotonic()
# 이미지 응답 cap. base64 응답 크기 폭주 방지. 사용자 PDF 풀 측정 (Phase 1D) 시
# 가장 이미지 많은 문서가 ~30건 수준 → 200 은 안전 마진. 초과 시 truncate flag 응답.
MAX_IMAGES_PER_DOC = int(os.getenv("MARKER_MAX_IMAGES_PER_DOC", "200"))
@@ -68,11 +80,67 @@ def _ensure_warmup() -> None:
raise
def _acquire_models():
"""warmup 보장 + inflight 진입을 원자적으로 — ensure 직후 reaper 가 해제하는 경합 차단."""
global _inflight
while True:
_ensure_warmup()
with _warmup_lock:
if _warmup_done:
_inflight += 1
return
# ensure 와 lock 재진입 사이에 unload 가 끼어든 희귀 경합 — 재시도
def _release_models():
global _inflight, _last_used
with _warmup_lock:
_inflight -= 1
_last_used = time.monotonic()
def _maybe_unload() -> None:
"""유휴 시 모델 해제. 변환 중(inflight>0)이면 절대 해제하지 않는다.
split 변환의 배치 사이 간격은 초 단위 — N>=1분 임계면 배치 사이 해제 없음.
"""
global _models, _converter, _warmup_done
with _warmup_lock:
if not _warmup_done or _inflight > 0:
return
if time.monotonic() - _last_used < _IDLE_UNLOAD_MINUTES * 60:
return
_models = None
_converter = None
_warmup_done = False
gc.collect()
try:
import torch
torch.cuda.empty_cache()
except Exception:
pass
logger.info(f"[marker-service] idle-unload: 모델 해제 (유휴 {_IDLE_UNLOAD_MINUTES}분 초과)")
async def _idle_reaper():
import asyncio
while True:
await asyncio.sleep(60)
try:
_maybe_unload()
except Exception:
logger.exception("[marker-service] idle reaper 오류")
@app.on_event("startup")
async def startup():
"""startup hook — async warmup 백그라운드. /ready 가 완료 여부 노출."""
"""startup hook — warmup 은 MARKER_PRELOAD 게이트 (D-1: lazy 기본 전환은 compose 가)."""
import asyncio
asyncio.create_task(asyncio.to_thread(_ensure_warmup))
if _PRELOAD:
asyncio.create_task(asyncio.to_thread(_ensure_warmup))
if _IDLE_UNLOAD_MINUTES > 0:
asyncio.create_task(_idle_reaper())
logger.info(f"[marker-service] idle-unload 활성: {_IDLE_UNLOAD_MINUTES}")
class ConvertRequest(BaseModel):
@@ -111,7 +179,12 @@ def health():
@app.get("/ready")
async def ready(response: Response):
"""Round 4 #1+#2: Response.status_code 명시 + warmup_error 노출."""
"""Round 4 #1+#2: Response.status_code 명시 + warmup_error 노출.
D-1: idle(미적재) = 200. 503 은 warmup_failed 한정 — lazy 모드에서 fastapi
depends_on service_healthy 가 영구 미기동으로 굳지 않게. 배포 검증에서
'status=ready' 단언하던 runbook 은 강제 warm 호출(/convert 1건)로 대체.
"""
if _warmup_error:
response.status_code = 503
return {
@@ -121,31 +194,28 @@ async def ready(response: Response):
"error": _warmup_error,
}
if not _warmup_done:
response.status_code = 503
return {
"status": "warming_up",
"status": "warming_up" if _PRELOAD else "idle",
"engine": "marker",
"engine_version": _engine_version,
"models_loaded": False,
"idle_unload_minutes": _IDLE_UNLOAD_MINUTES,
}
return {
"status": "ready",
"engine": "marker",
"engine_version": _engine_version,
"models_loaded": True,
"inflight": _inflight,
"idle_unload_minutes": _IDLE_UNLOAD_MINUTES,
}
@app.post("/convert", response_model=ConvertResponse)
async def convert(req: ConvertRequest):
_ensure_warmup()
p = Path(req.file_path)
if not p.is_file():
raise HTTPException(404, detail={"code": "file_not_found", "message": str(p)})
start = time.monotonic()
# page range 지정 시 per-request converter (모델 _models 재사용 → reload 없음).
# invariant: req.start_page/end_page = 1-based inclusive → marker 0-based 로 변환.
converter = _converter
if req.start_page is not None and req.end_page is not None:
if req.start_page < 1 or req.end_page < req.start_page:
raise HTTPException(
@@ -155,22 +225,33 @@ async def convert(req: ConvertRequest):
"message": f"start_page={req.start_page} end_page={req.end_page}",
},
)
page_range = list(range(req.start_page - 1, req.end_page)) # 0-based inclusive
converter = PdfConverter(artifact_dict=_models, config={"page_range": page_range})
try:
rendered = converter(str(p))
except Exception as exc:
logger.exception(f"[marker-service] conversion failed path={p}: {exc}")
raise HTTPException(
status_code=422,
detail={
"code": "conversion_failed",
"message": f"{type(exc).__name__}: {exc}",
},
) from exc
md_text, _meta, raw_images = text_from_rendered(rendered)
elapsed_ms = int((time.monotonic() - start) * 1000)
# D-1: warmup 보장 + inflight 진입 원자화 — 변환 중 reaper 해제 차단. 해제는 finally.
_acquire_models()
try:
start = time.monotonic()
# page range 지정 시 per-request converter (모델 _models 재사용 → reload 없음).
# invariant: req.start_page/end_page = 1-based inclusive → marker 0-based 로 변환.
converter = _converter
if req.start_page is not None and req.end_page is not None:
page_range = list(range(req.start_page - 1, req.end_page)) # 0-based inclusive
converter = PdfConverter(artifact_dict=_models, config={"page_range": page_range})
try:
rendered = converter(str(p))
except Exception as exc:
logger.exception(f"[marker-service] conversion failed path={p}: {exc}")
raise HTTPException(
status_code=422,
detail={
"code": "conversion_failed",
"message": f"{type(exc).__name__}: {exc}",
},
) from exc
md_text, _meta, raw_images = text_from_rendered(rendered)
elapsed_ms = int((time.monotonic() - start) * 1000)
finally:
_release_models()
images_payload, truncated = _serialize_images(raw_images, str(p))
+83 -27
View File
@@ -1,14 +1,23 @@
"""STT 마이크로서비스 — faster-whisper (GPU) 기반 음성 전사.
filePath → {text, segments:[{start,end,text}]}.
모델은 startup 에서 eager preload (Docker /ready healthcheck 가 모델 적재까지 검증).
기본 모델 large-v3 (VRAM ~3GB, float16). 환경변수로 교체 가능.
환경변수 `STT_PRELOAD=0` 으로 lazy 로 강제 가능 (개발/테스트용).
D-1 (plan crawl-24x7-1, 2026-06-10) — idle-unload 운영 전환:
STT_PRELOAD=0 : startup eager preload 끔 (첫 요청 시 lazy load)
STT_IDLE_UNLOAD_MINUTES: N분 유휴 시 모델 해제 (0=비활성, 기존 동작).
faster-whisper=CTranslate2 라 torch 미설치 — 해제는
참조 제거 + gc (CTranslate2 가 소멸 시 VRAM 반환).
콜드로드 수초~수십 초는 호출측(stt_worker read=1800s)이 흡수. healthcheck 는
cuda 가용성 기준 (compose) — 모델 적재는 더 이상 상시 상태가 아니다.
"""
import asyncio
import gc
import logging
import os
import threading
import time
import unicodedata
from contextlib import asynccontextmanager
from pathlib import Path
@@ -17,18 +26,26 @@ from fastapi import FastAPI
logger = logging.getLogger("stt")
_IDLE_UNLOAD_MINUTES = int(os.getenv("STT_IDLE_UNLOAD_MINUTES", "0"))
@asynccontextmanager
async def lifespan(_app: FastAPI):
# startup: 모델 eager preload 시도. 실패해도 프로세스는 살아 있고
# /ready 가 false 로 남아 healthcheck 가 unhealthy 처리.
# /ready 의 models_loaded 가 false 로 남는다.
if os.getenv("STT_PRELOAD", "1") != "0":
try:
_load_model()
logger.info("stt model preloaded: %s (%s, %s)", _MODEL_NAME, _DEVICE, _COMPUTE_TYPE)
except Exception as e:
logger.exception("stt model preload failed: %s", e)
reaper = None
if _IDLE_UNLOAD_MINUTES > 0:
reaper = asyncio.create_task(_idle_reaper())
logger.info("stt idle-unload 활성: %d", _IDLE_UNLOAD_MINUTES)
yield
if reaper:
reaper.cancel()
app = FastAPI(lifespan=lifespan)
@@ -38,6 +55,11 @@ _MODEL_NAME = os.getenv("WHISPER_MODEL", "large-v3")
_DEVICE = os.getenv("WHISPER_DEVICE", "cuda")
_COMPUTE_TYPE = os.getenv("WHISPER_COMPUTE_TYPE", "float16")
# load/unload/inflight 상태 전이는 전부 이 lock 아래 (cold 동시 요청 이중 로드 방지 포함)
_model_lock = threading.Lock()
_inflight = 0
_last_used = time.monotonic()
def _resolve_path(file_path: str) -> Path | None:
"""NFC(DB) vs NFD(NFS) 한글 경로 정규화 차이 흡수. OCR 서비스와 동일 패턴."""
@@ -61,14 +83,38 @@ def _resolve_path(file_path: str) -> Path | None:
def _load_model():
"""faster-whisper lazy loading — 첫 호출 시만 VRAM 점유."""
"""faster-whisper lazy loading — 첫 호출 시만 VRAM 점유. lock 으로 이중 로드 방지."""
global _model
if _model is not None:
return _model
from faster_whisper import WhisperModel
with _model_lock:
if _model is None:
from faster_whisper import WhisperModel
logger.info("stt model loading: %s (%s, %s)", _MODEL_NAME, _DEVICE, _COMPUTE_TYPE)
_model = WhisperModel(_MODEL_NAME, device=_DEVICE, compute_type=_COMPUTE_TYPE)
return _model
_model = WhisperModel(_MODEL_NAME, device=_DEVICE, compute_type=_COMPUTE_TYPE)
return _model
def _maybe_unload() -> None:
"""유휴 시 모델 해제. 처리 중(inflight>0)이면 절대 해제하지 않는다."""
global _model
with _model_lock:
if _model is None or _inflight > 0:
return
if time.monotonic() - _last_used < _IDLE_UNLOAD_MINUTES * 60:
return
_model = None
gc.collect()
logger.info("stt idle-unload: whisper 모델 해제 (유휴 %d분 초과)", _IDLE_UNLOAD_MINUTES)
async def _idle_reaper():
while True:
await asyncio.sleep(60)
try:
_maybe_unload()
except Exception:
logger.exception("stt idle reaper 오류")
def _cuda_device_count() -> int:
@@ -87,7 +133,7 @@ def health():
@app.get("/ready")
def ready():
"""Readiness — CUDA + 모델 상태. 배포 검증용."""
"""Readiness — CUDA + 모델 상태. healthcheck 는 cuda 만 본다 (D-1 idle-unload)."""
count = _cuda_device_count()
cuda_ok = count > 0
models_loaded = _model is not None
@@ -98,6 +144,8 @@ def ready():
"models_loaded": models_loaded,
"model": _MODEL_NAME,
"compute_type": _COMPUTE_TYPE,
"idle_unload_minutes": _IDLE_UNLOAD_MINUTES,
"inflight": _inflight,
}
@@ -121,6 +169,7 @@ async def transcribe(body: dict):
"duration": 1832.5
}
"""
global _inflight, _last_used
raw_path = body["filePath"]
langs = body.get("langs")
beam_size = int(body.get("beamSize", 5))
@@ -129,28 +178,35 @@ async def transcribe(body: dict):
if resolved is None:
return {"error": f"파일 없음: {raw_path}", "text": "", "segments": []}
model = _load_model()
with _model_lock:
_inflight += 1
try:
model = _load_model()
language = None
if isinstance(langs, list) and len(langs) == 1:
language = langs[0]
language = None
if isinstance(langs, list) and len(langs) == 1:
language = langs[0]
segments_iter, info = model.transcribe(
str(resolved),
beam_size=beam_size,
language=language,
vad_filter=True,
)
segments_iter, info = model.transcribe(
str(resolved),
beam_size=beam_size,
language=language,
vad_filter=True,
)
segments = []
parts = []
for seg in segments_iter:
segments.append({
"start": round(float(seg.start), 2),
"end": round(float(seg.end), 2),
"text": seg.text.strip(),
})
parts.append(seg.text)
segments = []
parts = []
for seg in segments_iter:
segments.append({
"start": round(float(seg.start), 2),
"end": round(float(seg.end), 2),
"text": seg.text.strip(),
})
parts.append(seg.text)
finally:
with _model_lock:
_inflight -= 1
_last_used = time.monotonic()
return {
"text": " ".join(p.strip() for p in parts).strip(),