Compare commits

...

20 Commits

Author SHA1 Message Date
hyungi f4e5db9723 fix(news): 304 를 redirect 로 오인하던 버그 — is_redirect → has_redirect_location
httpx 의 Response.is_redirect 는 3xx 전체(304 Not Modified 포함)에 True 라,
조건부 GET 으로 304 를 받으면 location 없는 같은 URL 을 3회 재요청 후
'redirect 3회 초과'로 오류 처리 → ETag/Last-Modified 받는 안정 피드(SEP/HSE/OSHA
/철학 RSS 등)가 2번째 사이클부터 전멸하던 systematic 버그.

- 304 처리를 redirect 루프보다 앞으로 이동.
- redirect 판별을 has_redirect_location(=location 헤더 있는 진짜 redirect)으로 교체.
  news_collector._fetch_rss + crawl_politeness.fetch_page 동일 함정 양쪽 수정.
- 사이클 1 파일럿(경향)은 304 를 받은 적 없어 잠복했고, 안정 피드 첫 304 에서 발현.
- 회귀 테스트 3건(304 비-redirect / 진짜 redirect / 코드 패턴 audit).

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-11 06:32:15 +09:00
hyungi 69db9bcb94 fix(news): 안티봇 챌린지 페이지 식별 게이트 — DataDome corruption 차단 (B-3 실측)
르몽드 기사 = DataDome Client Challenge(316자)가 200자 본문 floor 통과 → 챌린지
HTML 이 기사 본문으로 승격되는 silent corruption 위험. fetch_page_via_browser 에
챌린지 마커 게이트 추가 → CrawlBlocked(degrade=RSS 요약 유지). 헤드리스 탐지라 재시도 무의미.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-10 17:04:11 +09:00
hyungi 61e5a416d0 fix(news): fetch_page content-type 허용 파라미터 — TWI sitemap(text/xml) 수집 (검증 게이트 발견)
Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-10 16:41:30 +09:00
hyungi cdf4ee0ef6 fix(news): Guardian sectionName 'World news' 카테고리 매핑 (셀프 리뷰)
Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-10 16:37:22 +09:00
hyungi 251a5392ef fix(services): playwright-fetcher pwuser 실행 — root Chromium sandbox 함정 회피
Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-10 15:11:03 +09:00
hyungi 1842f27d89 feat(news): crawl-24x7 사이클 2 — B-2/B-3/C-1/C-2/C-3/C-5 (마이그 324-326)
- 채널 인지화: news_sources.source_channel(324, documents enum 재사용) →
  문서 생성 정체성(_doc_identity)·embed/chunk 30일 게이트(crawl=전량 색인)·
  extract 후속 override(crawl→classify, preview 스킵) 분기.
- B-2 Guardian Open Platform: API 디스패치(호스트 분기, 미지 호스트=명시 실패)
  + show-fields=bodyText 전문 어댑터. fixture live 박제 + call-shape 테스트.
- B-3 구독지: playwright-fetcher 격리 컨테이너(동시 1·요청당 브라우저·storage_state
  ro mount) + politeness 사람속도(30-60s) 브라우저 경로 + fulltext 인증 라우팅
  (내용 기반 probe 게이트·relogin_requested 소비=open-스킵보다 앞·본문 페이월 마커
  게이트) + source_health probe 컬럼(325) + 세션 박제 스크립트(맥북용).
- C-2 KOSHA: 3 API live 검증·fixture 박제(board/attach/guide) — 재해사례 daily diff
  +첨부 PDF/HWP→extract 파이프라인, GUIDE 일일 cap 점진 백필(silent cap 금지 로그).
  키는 URL 직결합(재인코딩 함정 회피). daily 06:40 KST.
- C-3 정적 코퍼스: National Board 86 + TWI job-knowledge 153 일괄 CLI(멱등·politeness
  ·crawl_raw 보존·fulltext_worker 승격 필드 규약 동일).
- C-1/C-5 시드(326): 전 URL live 검증 — UK HSE(feed-full)/안전신문/고용노동부 3종
  (rss/*.do)/OSHA/EU-OSHA(후보)/SEP/1000-Word(feed-full)/Doing Philosophy/Aeon/Psyche
  (skip-video quirk).

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-10 15:08:18 +09:00
hyungi 53a30449e2 fix(news): crawl_politeness logger 를 setup_logger 로 정합화 — INFO 대기 로그 가시화
Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-10 13:47:18 +09:00
hyungi ab668d7990 fix(news): crawl_raw 파일명 CHAR(64) 패딩 strip + politeness 대기 로그
- documents.file_hash 실 컬럼이 character(64) — 32자 해시가 공백 패딩되어
  gz 파일명에 공백 32개 포함 (실배포 1건 실측). _raw_html_path 에서 strip.
- _respect_domain_rate silent sleep 에 대기 로그 1줄 (검증 게이트·운영 가시성).

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-10 13:43:29 +09:00
hyungi dcf99b377e fix(news): 적대 리뷰 반영 — reconcile auto-correlation·워터마크 검증 후 영속·수집 락
- fulltext_worker.reconcile_unresolved: EXISTS 서브쿼리 aliased(ProcessingQueue) —
  auto-correlation 이 FROM 전부 제거해 매 실행 InvalidRequestError (안전망 dead code).
  SQLAlchemy 2.0.50 컴파일 재현·수정 확인.
- news_collector._fetch_rss: ETag/Last-Modified/content-hash 영속을 bozo 파싱 검증
  뒤로 이동 — 부패 응답 워터마크 저장 시 영구 304-skip 차단.
- news_collector.run: 모듈 락으로 수동 collect vs 6h 스케줄 동시 실행 차단 —
  _get_or_create_health 동시 INSERT 의 uq_source_health_source_id 위반이
  사이클 전체를 죽이는 경합 봉쇄.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-10 13:34:46 +09:00
hyungi 3df0ca53ab feat(services): crawl-24x7 A-8 헬스 패널 + D-1 stt/marker idle-unload
A-8 1차: crawl-health 컨테이너(100.110.63.63:8765 Tailscale 바인딩 전용, 읽기 전용 SELECT, caddy 라우트 금지).
D-1 전제 작업: STT_PRELOAD=0+30분 유휴 해제(lock+inflight+reaper), marker MARKER_PRELOAD=0+idle-unload,
/ready idle=200(503=warmup_failed 한정 — fastapi depends_on 정합), healthcheck cuda 기준 전환.
2026-06-10 13:03:31 +09:00
hyungi 7cd8cfde0a feat(news): crawl-24x7 A그룹 — 레지스트리 증축·조건부 GET·fulltext 승격·politeness·source_health
A-3 migrations 319-323 (news_sources 9컬럼 + source_channel 'crawl' + process_stage 'fulltext' + source_health)
A-1 조건부 GET(ETag/Last-Modified 그대로 재전송)+콘텐츠 해시 변경감지, A-4 politeness 코어(per-domain 직렬+robots+정직UA),
A-2+A-7 fulltext_worker(4-tier 재사용·NAS crawl_raw gzip 보존·격하 경로·03:40 reconcile 안전망),
A-5 circuit breaker(3/10 임계, enabled 미터치), A-6 포털 전재 2차 dedup(제목+3일, 12자 게이트).
기존 소스 fulltext_policy='none' 기본 = 무회귀. plan crawl-24x7-1, 예외 박제 crawl-24x7-exec1-20260610.md
2026-06-10 13:03:31 +09:00
hyungi acd595244a fix(news): URL dedup 정규화 저장·조회 통일 + 다중매칭 내성
BBC Technology 매 사이클 MultipleResultsFound (06-04~) 해소.
- 저장 edit_url=raw vs 조회 normalized 비대칭으로 URL dedup 무력화돼
  교차게시(HN x BBC) 시 2행 동시매칭 -> scalar_one_or_none raise.
- _normalize_url: query 전체 제거 -> tracking 파라미터만 제거로 교정
  (hada.io/topic?id= 등 query-식별 사이트 870건 붕괴 방지, 리뷰 게이트).
- 조회 .first() + edit_url IN (normalized, raw) 레거시 행 내성. RSS/NYT 양쪽.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-09 22:26:22 +00:00
hyungi 34eb5c9411 refactor(workers)!: SMTP 메일 발송 기능 전면 제거
다이제스트/이메일수집알림/법령알림 메일 발송 폐기 (사용자 결정 2026-06-10).
근거: 게이트(if smtp_host and smtp_user)가 06-07 전엔 항상 false(silent skip),
자격증명 활성 후엔 100% 553 Sender rejected — 한 통도 전달 성공 이력 없음.
law_monitor 는 CalDAV VTODO 가 단일 알림 채널로 유지. 다이제스트 .md 생성/
90일 아카이브, 이메일 IMAP 수집은 무변경. eid dispatch 의 send_smtp_email
문자열 블랙리스트는 의도적 잔존(코드층 박탈 강화와 정합).

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-09 22:26:22 +00:00
hyungi 8e1645dfc9 fix(markdown): news article md_status pending→skipped 정합화
news article 은 텍스트 네이티브(본문=extracted_text)라 markdown 단계를 미enqueue
하는데(summarize/embed/chunk 만), md_status 기본값 pending 이 영구 고착돼 30,903 건이
비수렴 → (1) backlog 지표 오염(실 미변환≈0인데 pending 30,930) (2) md_status_pending
partial 인덱스 비대. terminal skipped(변환 비대상)로 정합화.

- news_collector.py: RSS/API 양쪽 Document 생성 시 md_status=skipped +
  md_extraction_error 사유 명시(생성 시점부터 정합).
- documents/[id]/+page.svelte: article 뷰의 MarkdownDoc 에 mdStatus 미전달(null).
  badge 는 mdStatus 로만 구동 → skipped 라도 "Markdown 제외" 칩이 3만 기사에
  뜨지 않게(article 은 markdown 변환 비대상이라 badge 자체가 무의미).
- 기존 30,903 건 backfill UPDATE(별도 실행): pending 30,930→27, partial 인덱스 동일 축소.

검증: pending 잔여 27(eml/doc/xls/이미지/미디어 long-tail) / 검색 무영향(article
extracted_text·chunks 그대로) / md_status 만 변경.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-09 06:22:04 +00:00
hyungi 55216271a6 feat(markdown): hwp raster 이미지 NAS 영속 + library backfill 스크립트
pyhwp(hwp5html) 가 bindata/ 로 추출하는 raster 이미지를 NAS 에 영속한다. 기존엔
변환 tempdir 와 함께 폐기돼 경고 없이 silent 유실(도식·수식)이었다(적대 리뷰 MEDIUM).

- office_md.py: _run_hwp5html 으로 hwp5html 1회 실행 → (markdown, raster_images).
  convert_hwp_to_md_and_images() 신규 = marker_worker 이미지 경로용. hwp5html 은 이미지를
  본문 xhtml 에 <img> 앵커하지 않아(--css/--html 동일) 인라인 위치 복원 불가 → 호출부가
  말미 갤러리로 부착. OLE 수식/도형은 앵커도 raster 도 아니라 영속 제외.
- marker_worker._process_office: .hwp raster 를 marker(PDF)의 _persist_images_to_nas 로
  NAS 영속 + document_images UPSERT(_sync_document_images, 재변환 orphan 정리) + md 말미
  ## 첨부 이미지 docimg: 갤러리 + quality.warnings hwp_images_appended. docx/xlsx/pptx/
  hwpx 는 이미지 미처리(기존 동작 유지).
- scripts/backfill_hwp_library.py: 지정 PKM 폴더 .hwp 를 content-hash dedup(Inbox 중복 +
  _1/카피본 사본 흡수) 후 category=library 일회성 ingest.

검증(E2E): Knowledge/Engineering 18개 → dedup 후 신규 5개(산업안전기사 3~7과목) ingest,
5/5 success. 제4과목 raster 3장 → NAS extracted_images/35778/img_001~003.jpeg 실재 +
document_images 3 row(engine=pyhwp) + md 갤러리 docimg ref. 이미지 없는 문서는 갤러리
미생성. 텍스트/표 경로 회귀 0(기존 4건 재변환 success).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-09 05:10:45 +00:00
hyungi d0994a1bce fix(markdown): hwp 변환 libhwplo→pyhwp 교체 + xml 프롤로그 strip
LibreOffice 번들 libhwplo 필터가 실제 한컴 HWP5 binary 를 못 읽어(rc=0 +
"source file could not be loaded") HWP 전건 실패(0/4). 순수 Python HWP5 전용
변환기 pyhwp(CLI hwp5html)로 교체.

- office_md.py: .hwp → _via_pyhwp_html(hwp5html→index.xhtml→markdownify).
  hwp5html xhtml 의 <?xml?> 선언이 markdownify PI 파싱으로 md 본문에 새고,
  ~34자가 _MIN_BODY_CHARS(16) 빈출력 게이트를 무력화(빈 변환 false-success,
  모듈 불변식 위반) → markdownify 전 프롤로그 re.sub strip.
- .hwpx 는 pyhwp 미지원 → LibreOffice 폴백 유지.
- marker_worker.py: 엔진 라벨 .hwp→pyhwp / .hwpx→libreoffice_hwp / else→markitdown.
- requirements.txt: pyhwp + six(pyhwp 미선언 런타임 의존성).

검증: HWP5 4건(용접 WPS/PQR·산업안전기사 1·2과목·원칙요약) 4/4 success,
한글 무결·표 GFM 보존·xml 아티팩트 0. 기존 포맷 경로(docx/xlsx/pptx·pdf·
passthrough·hwpx) 회귀 없음(적대 리뷰 2렌즈 확인).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-09 04:19:37 +00:00
hyungi 448195637b fix(documents): g-measure verdict 를 jump-target 대 jump-target 비교로 정정
hier_outline_quality_gate 의 keep-better verdict 가 build jump-target(n_b, window-child 제외)을
stored leaf 전수(n_a, window-child 포함)와 비교 → windowed doc 이 n_a≫n_b 로 거짓 A_better 강등되던 bias 제거.
stored 도 jump-target((비-window leaf OR %_split)+제목)만 카운트. 정정 후 hash_stable 31(≈MEASURE2 32,
fence-flip 1)·dup_title 8·in_corpus 3(5140/5186/5225) 전부 UPDATE-only = MEASURE2 와 정합.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-09 11:54:01 +09:00
hyungi aeb9290cbd feat(documents): hier 절 char_start offset (Path B) — md_content 점프 builder offset
플랜 ds-outline-anchor-b5 (g1~g6 코드). 핵심 ASME/법령 windowed 절의 0% 점프를
서버계산 char_start(builder offset)로 100% deterministic 점프로 전환.

- g1 migration 318: document_chunks.char_start INTEGER NULL (단일 statement, 멱등)
- g2 builder: char_start emit = FE 라인/offset 모델 미러(split('\n')+UTF-16 code unit+코드펜스 skip).
  window-child=NULL, split-parent=heading offset, preamble=NULL, CR 미strip, NFC=telemetry.
  node.text 보존(라인모델 hash-neutral) → hash_stable doc 보존. 단위테스트 7건.
- g3 persist+backfill 하이브리드:
  * persist INSERT char_start
  * update-char-start (g3-tU): hash_stable doc 비파괴 — 100% jump-target VERIFY(NEW-1) +
    position-aligned PK UPDATE(NEW-2), 미달 doc DEMOTE → re-decompose 합류(NEW-4)
  * --reprocess (g3-t2): md_content 출처(g0-t1) + jump-target-set 완료마커(B1) + B_jumptarget>=1(B3),
    --doc 필수 else REFUSE. self-heal sweep(g3-t3).
- g4 /sections: char_start inner+outer SELECT + split-parent 노출(is_leaf OR %_split)
- g5 FE: resolveAnchorMap(BE-first, NEW-5 jump-target-candidate-scoped 폴백, C1 OR-exclude),
  per-render-site basis guard(C3), endsWith('_split') 정정 + collapseWindows split-parent 흡수(C2).
  단위테스트 25건(NEW-5/B4/C1/C2 포함).
- g6 hier_outline_quality_gate.py: read-only g-measure(verdict/B_jumptarget/hash_stable/dup/fence)

배포(g7: --no-deps, 스냅샷, UPDATE-only 32 + re-decompose 230∪demote, 정확도 게이트)는 별 ops 단계.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-09 10:12:26 +09:00
hyungi 9bf41d1dfc Merge pull request 'feat(documents): 3-pane 중앙 리더에 절 목차 rail + 점프 + scroll-spy' (#32) from feat/documents-outline-rail into main
Reviewed-on: #32
2026-06-08 21:27:51 +09:00
hyungi 6c6b350aca Merge pull request 'Feat/outline anchor' (#31) from feat/outline-anchor into main
Reviewed-on: #31
2026-06-08 21:16:45 +09:00
57 changed files with 3792 additions and 296 deletions
+5 -4
View File
@@ -663,8 +663,9 @@ class SectionItem(BaseModel):
section_title: str | None = None # raw 마크다운 포함 — 정제는 프런트(headingPath.ts)
heading_path: str | None = None # raw
level: int | None = None
node_type: str | None = None # window | section_split | null
node_type: str | None = None # window | chapter_split | clause_split | section_split | null
is_leaf: bool
char_start: int | None = None # md_content 내 heading offset(UTF-16). jump-target 만 값, 그 외 None (Path B)
section_type: str | None = None
summary: str | None = None # status='summarized' 인 분석행에만, 그 외 None
confidence: float | None = None
@@ -703,12 +704,12 @@ async def get_document_sections(
await session.execute(
sql_text(
"""
SELECT chunk_id, section_title, heading_path, level, node_type, is_leaf,
SELECT chunk_id, section_title, heading_path, level, node_type, is_leaf, char_start,
section_type, summary, confidence
FROM (
SELECT DISTINCT ON (c.id)
c.id AS chunk_id, c.chunk_index, c.section_title, c.heading_path,
c.level, c.node_type, c.is_leaf,
c.level, c.node_type, c.is_leaf, c.char_start,
a.section_type,
CASE WHEN a.status = 'summarized' THEN a.summary ELSE NULL END AS summary,
a.confidence
@@ -717,7 +718,7 @@ async def get_document_sections(
ON a.chunk_id = c.id AND a.status = 'summarized'
WHERE c.doc_id = :doc_id
AND c.source_type = 'hier_section'
AND c.is_leaf = true
AND (c.is_leaf = true OR c.node_type LIKE '%\\_split' ESCAPE '\\')
ORDER BY c.id, a.created_at DESC, a.id DESC
) t
ORDER BY t.chunk_index
+284
View File
@@ -0,0 +1,284 @@
"""크롤링 politeness 코어 (A-4, plan crawl-24x7-1)
개인 아카이빙 권장치를 그대로 박은 공용 fetch 계층:
- per-domain 동시성 1 (asyncio.Lock) + 같은 도메인 연속 요청 515초 지연 + jitter
- robots.txt 존중 (urllib.robotparser, 24h 캐시) — 비로그인 공개 크롤링 한정.
로그인 세션 fetch (B-3) 는 사용자 행위 성격이라 robots 대신 사람 속도가 기준.
- 정직 식별 UA + 연락처 (익명 크롤링 트랙. 로그인 세션은 브라우저 UA 유지 — B-3)
- 429 = Retry-After 존중 / 5xx = 재시도 가능 / 403 = 차단 신호 (호출측 circuit 연동)
도메인별 마지막 요청 시각 등 rate 상태는 in-process (영속 워터마크는 DB — news_sources).
SSRF 차단은 core.url_validator.validate_feed_url 재사용 (redirect target 재검증 포함).
"""
import asyncio
import random
import time
import urllib.robotparser
from urllib.parse import urljoin, urlparse
import httpx
from core.url_validator import validate_feed_url
from core.utils import setup_logger
# bare getLogger 는 root(WARNING) 상속이라 INFO 대기/차단 로그가 드랍됨 — 타 워커와 동일 설정
logger = setup_logger("crawl_politeness")
# 정직 식별 UA + 연락처 — 차단 전 연락 통로 (A-4)
CRAWL_UA = "HyungiPKM-Archiver/1.0 (personal archive; +mailto:hyun49196@gmail.com)"
# 같은 도메인 연속 요청 간격 (초) — 권장치 515s + jitter
_DOMAIN_DELAY_MIN = 5.0
_DOMAIN_DELAY_MAX = 15.0
# 구독 세션(브라우저) fetch 간격 — 사람 속도 (B-3 ④: 기사 간 수십 초)
_AUTH_DELAY_MIN = 30.0
_AUTH_DELAY_MAX = 60.0
# B-3 Playwright 격리 컨테이너 (internal-only, compose DNS)
_FETCHER_URL = "http://playwright-fetcher:3400"
_FETCHER_TIMEOUT = 120.0 # 브라우저 기동 + 네비게이션 + settle 포함
# 안티봇 챌린지 페이지 식별 마커 (DataDome/Cloudflare 등) — 좁게 유지(오탐 회피).
# 실측: 르몽드 기사 = DataDome "Client Challenge" + "Entrez les caractères" CAPTCHA.
_CHALLENGE_MARKERS = (
"Client Challenge",
"Entrez les caractères affichés",
"Checking your browser before",
"captcha-delivery.com",
"geo.captcha-delivery",
)
_ROBOTS_CACHE_TTL = 24 * 3600 # 24h
_MAX_PAGE_BYTES = 5 * 1024 * 1024 # 피드 fetch 와 동일 5MB cap
_PAGE_TIMEOUT = 20.0
_MAX_REDIRECTS = 3
_HTML_CONTENT_TYPES = ("text/html", "application/xhtml+xml")
class CrawlFetchError(Exception):
"""일시 오류 (5xx / timeout / 네트워크) — 큐 재시도 대상."""
class CrawlBlocked(Exception):
"""차단 신호 (403 / 429 / robots disallow) — 재시도보다 backoff/circuit 대상."""
class CrawlSkip(Exception):
"""영구 비대상 (비-HTML / 크기 초과 / SSRF 차단 / 4xx) — 격하 처리 대상."""
# 도메인별 직렬화 상태 (in-process)
_domain_locks: dict[str, asyncio.Lock] = {}
_domain_last_request: dict[str, float] = {}
# host → (cached_at, RobotFileParser | None). None = robots 없음/4xx (전부 허용)
_robots_cache: dict[str, tuple[float, urllib.robotparser.RobotFileParser | None]] = {}
def _domain_of(url: str) -> str:
return (urlparse(url).hostname or "").lower()
def _get_lock(domain: str) -> asyncio.Lock:
if domain not in _domain_locks:
_domain_locks[domain] = asyncio.Lock()
return _domain_locks[domain]
async def _respect_domain_rate(
domain: str,
delay_min: float = _DOMAIN_DELAY_MIN,
delay_max: float = _DOMAIN_DELAY_MAX,
) -> None:
"""같은 도메인 직전 요청에서 delay(jitter) 경과할 때까지 대기."""
last = _domain_last_request.get(domain)
if last is not None:
delay = random.uniform(delay_min, delay_max)
wait = last + delay - time.monotonic()
if wait > 0:
# silent sleep 금지 — politeness 동작 검증·운영 관찰 가시성
logger.info("[politeness] %s %.1fs 대기", domain, wait)
await asyncio.sleep(wait)
async def _fetch_robots(client: httpx.AsyncClient, scheme: str, host: str):
"""robots.txt 조회. 4xx/부재 = 전부 허용(None), 5xx/오류 = 보수적으로 이번 사이클 차단."""
robots_url = f"{scheme}://{host}/robots.txt"
try:
resp = await client.get(robots_url, headers={"User-Agent": CRAWL_UA})
except httpx.HTTPError as e:
raise CrawlFetchError(f"robots.txt 조회 실패: {host}: {e}") from e
if resp.status_code >= 500:
# 5xx 는 의도 불명 — 표준 관행대로 이번 사이클은 차단 취급
raise CrawlFetchError(f"robots.txt 5xx: {host}: {resp.status_code}")
if resp.status_code >= 400:
return None # robots 없음 = 전부 허용
rp = urllib.robotparser.RobotFileParser()
rp.parse(resp.text.splitlines())
return rp
async def _robots_allows(client: httpx.AsyncClient, url: str) -> bool:
parsed = urlparse(url)
host = (parsed.hostname or "").lower()
cached = _robots_cache.get(host)
if cached is None or time.monotonic() - cached[0] > _ROBOTS_CACHE_TTL:
rp = await _fetch_robots(client, parsed.scheme or "https", host)
_robots_cache[host] = (time.monotonic(), rp)
cached = _robots_cache[host]
rp = cached[1]
if rp is None:
return True
return rp.can_fetch(CRAWL_UA, url)
async def fetch_page(
url: str, *, check_robots: bool = True,
content_types: tuple[str, ...] = _HTML_CONTENT_TYPES,
) -> tuple[str, str]:
"""공개 페이지 1건 politeness fetch. (html_text, final_url) 반환.
- SSRF 검증 (redirect target 포함, news_collector 피드 fetch 와 동일 이중 검증)
- per-domain 동시성 1 + 515s jitter 지연
- 429: Retry-After 로그 후 CrawlBlocked / 403: CrawlBlocked / 그 외 4xx: CrawlSkip
- 5xx/timeout: CrawlFetchError (큐 재시도)
- 비-HTML content-type / 5MB 초과: CrawlSkip
"""
try:
validate_feed_url(url)
except ValueError as e:
raise CrawlSkip(f"URL 검증 실패: {e}") from e
domain = _domain_of(url)
async with _get_lock(domain):
await _respect_domain_rate(domain)
try:
async with httpx.AsyncClient(
timeout=_PAGE_TIMEOUT, follow_redirects=False,
headers={"User-Agent": CRAWL_UA},
) as client:
if check_robots and not await _robots_allows(client, url):
raise CrawlBlocked(f"robots.txt disallow: {url}")
resp = await client.get(url)
redirects = 0
# has_redirect_location = location 헤더 있는 진짜 redirect 만 (httpx 의
# is_redirect 는 3xx 전체라 304 등을 redirect 로 오인 — news_collector 동일 함정)
while resp.has_redirect_location and redirects < _MAX_REDIRECTS:
location = urljoin(str(resp.request.url), resp.headers["location"])
try:
validate_feed_url(location)
except ValueError as e:
raise CrawlSkip(f"redirect target 차단: {e}") from e
# redirect 도 같은 도메인 연속 요청 — 간격은 lock 보유로 충분 (즉시 1회)
resp = await client.get(location)
redirects += 1
if resp.has_redirect_location:
raise CrawlSkip(f"redirect {_MAX_REDIRECTS}회 초과: {url}")
except httpx.TimeoutException as e:
raise CrawlFetchError(f"timeout: {url}") from e
except httpx.HTTPError as e:
raise CrawlFetchError(f"네트워크 오류: {url}: {e}") from e
finally:
_domain_last_request[domain] = time.monotonic()
if resp.status_code == 429:
retry_after = resp.headers.get("retry-after", "")
logger.warning("[politeness] 429 %s (Retry-After=%s)", domain, retry_after or "-")
raise CrawlBlocked(f"429 rate limited: {url} (Retry-After={retry_after or '-'})")
if resp.status_code == 403:
raise CrawlBlocked(f"403 forbidden: {url}")
if resp.status_code >= 500:
raise CrawlFetchError(f"{resp.status_code}: {url}")
if resp.status_code >= 400:
raise CrawlSkip(f"{resp.status_code}: {url}")
ct = resp.headers.get("content-type", "").lower()
if ct and not any(t in ct for t in content_types):
raise CrawlSkip(f"비허용 content-type: {ct}: {url}")
if len(resp.content) > _MAX_PAGE_BYTES:
raise CrawlSkip(f"크기 초과: {len(resp.content)} bytes: {url}")
return resp.text, str(resp.request.url)
# ── B-3 구독 세션 fetch (Playwright 격리 컨테이너 경유) ──────────────────────
async def fetch_page_via_browser(url: str, profile: str) -> tuple[str, str]:
"""인증 페이지 1건 — playwright-fetcher 에 위임, politeness 는 사람 속도(30~60s).
(html_text, final_url) 반환. robots 미적용 — 구독 계약 기반 개인 보관 fetch 로
공개 크롤러 규약 대상이 아님 (대신 사람 속도 + 동시 1 + 야간 저빈도가 보호 장치).
예외 어휘는 fetch_page 와 동일 (호출측 분기 재사용).
"""
try:
validate_feed_url(url)
except ValueError as e:
raise CrawlSkip(f"URL 검증 실패: {e}") from e
domain = _domain_of(url)
async with _get_lock(domain):
await _respect_domain_rate(domain, _AUTH_DELAY_MIN, _AUTH_DELAY_MAX)
try:
async with httpx.AsyncClient(timeout=_FETCHER_TIMEOUT) as client:
resp = await client.post(
f"{_FETCHER_URL}/fetch", json={"url": url, "profile": profile}
)
except httpx.TimeoutException as e:
raise CrawlFetchError(f"browser fetch timeout: {url}") from e
except httpx.HTTPError as e:
raise CrawlFetchError(f"playwright-fetcher 연결 오류: {e}") from e
finally:
_domain_last_request[domain] = time.monotonic()
if resp.status_code == 503:
# storage_state 부재 — 수동 세션 박제 대기 (호출측 degrade, 재시도 루프 금지)
raise CrawlBlocked(f"세션 프로필 부재: {profile}")
if resp.status_code != 200:
raise CrawlFetchError(f"playwright-fetcher {resp.status_code}: {url}")
data = resp.json()
html_text = data.get("html", "")
if len(html_text.encode("utf-8", errors="replace")) > _MAX_PAGE_BYTES:
raise CrawlSkip(f"크기 초과 (browser): {url}")
# 안티봇 챌린지 페이지(DataDome 등) 식별 — 본문 길이 게이트(200자)를 통과하는
# 짧은 챌린지 HTML 이 기사 본문으로 승격되는 silent corruption 차단. 헤드리스 탐지라
# 재시도 무의미 → CrawlBlocked(=degrade, RSS 요약 유지). 마커는 보수적으로 좁게.
if any(m in html_text for m in _CHALLENGE_MARKERS):
raise CrawlBlocked(f"안티봇 챌린지 페이지(headless 차단): {url}")
return html_text, data.get("final_url", url)
async def probe_session(
profile: str, probe_url: str, min_body_chars: int, paywall_markers: list[str]
) -> dict:
"""내용 기반 세션 probe (B-3 ②) — {'ok': bool, 'reason': str|None, 'body_chars': int}.
실패를 예외가 아닌 값으로 반환 — 호출측이 source_health 에 기록하고 degrade 분기.
probe 도 실제 publisher fetch 라 동일 도메인 lock + 사람 속도 적용.
"""
domain = _domain_of(probe_url)
async with _get_lock(domain):
await _respect_domain_rate(domain, _AUTH_DELAY_MIN, _AUTH_DELAY_MAX)
try:
async with httpx.AsyncClient(timeout=_FETCHER_TIMEOUT) as client:
resp = await client.post(
f"{_FETCHER_URL}/probe",
json={
"profile": profile,
"probe_url": probe_url,
"min_body_chars": min_body_chars,
"paywall_markers": paywall_markers,
},
)
except httpx.HTTPError as e:
return {"ok": False, "reason": f"fetcher 연결 오류: {e}", "body_chars": 0}
finally:
_domain_last_request[domain] = time.monotonic()
if resp.status_code == 503:
return {"ok": False, "reason": f"세션 프로필 부재: {profile}", "body_chars": 0}
if resp.status_code != 200:
return {"ok": False, "reason": f"fetcher {resp.status_code}", "body_chars": 0}
return resp.json()
-30
View File
@@ -106,33 +106,3 @@ END:VCALENDAR"""
except Exception as e:
logging.getLogger("caldav").error(f"CalDAV VTODO 생성 실패: {e}")
return None
# ─── SMTP 헬퍼 ───
def send_smtp_email(
host: str,
port: int,
username: str,
password: str,
subject: str,
body: str,
to_addr: str | None = None,
):
"""Synology MailPlus SMTP로 이메일 발송"""
import smtplib
from email.mime.text import MIMEText
to_addr = to_addr or username
msg = MIMEText(body, "plain", "utf-8")
msg["Subject"] = subject
msg["From"] = username
msg["To"] = to_addr
try:
with smtplib.SMTP_SSL(host, port, timeout=30) as server:
server.login(username, password)
server.send_message(msg)
except Exception as e:
logging.getLogger("smtp").error(f"SMTP 발송 실패: {e}")
+7
View File
@@ -54,6 +54,8 @@ async def lifespan(app: FastAPI):
from workers.law_monitor import run as law_monitor_run
from workers.mailplus_archive import run as mailplus_run
from workers.news_collector import run as news_collector_run
from workers.fulltext_worker import reconcile_unresolved as fulltext_reconcile_run
from workers.kosha_collector import run as kosha_collector_run
from workers.queue_consumer import consume_queue, consume_markdown_queue
from workers.study_queue_consumer import consume_study_queue
from workers.study_session_queue_consumer import consume_study_session_queue
@@ -121,9 +123,14 @@ async def lifespan(app: FastAPI):
# 이드 W3-2: 공부중 토픽 약점 derived 스냅샷 (nightly 04:30 KST, LLM 0). study_diagnosis 표면 source.
scheduler.add_job(study_weakness_run, CronTrigger(hour=4, minute=30, timezone=KST), id="study_weakness")
scheduler.add_job(news_collector_run, "interval", hours=6, id="news_collector")
# crawl-24x7 A-2 안전망: fulltext 영구 실패(3회 소진) 문서를 RSS 요약 기준으로
# 후속 enqueue (silent skip 누적 방지). 03:40 = dedup_reconcile(03:30) 직후 비충돌 슬롯.
scheduler.add_job(fulltext_reconcile_run, CronTrigger(hour=3, minute=40, timezone=KST), id="fulltext_reconcile")
# plan ds-s1-backend-1 B-4: dedup 컬럼(duplicate_of/duplicate_count) 야간 절대 재계산.
# soft-delete 잔여 드리프트 정리(멱등, 드리프트 없으면 no-op). cron 03:30 (다른 잡과 비충돌).
scheduler.add_job(dedup_reconcile_run, CronTrigger(hour=3, minute=30, timezone=KST), id="dedup_reconcile")
# crawl-24x7 C-2: KOSHA 재해사례 diff + GUIDE 점진 백필 (daily, 새벽 잡들과 비충돌 슬롯).
scheduler.add_job(kosha_collector_run, CronTrigger(hour=6, minute=40, timezone=KST), id="kosha_collector")
scheduler.start()
# Phase 2.1 (async 구조): QueryAnalyzer prewarm.
+1 -1
View File
@@ -118,7 +118,7 @@ class Document(Base):
source_channel: Mapped[str | None] = mapped_column(
Enum("law_monitor", "devonagent", "email", "web_clip",
"tksafety", "inbox_route", "manual", "drive_sync", "news", "memo",
"voice", "hermes",
"voice", "hermes", "crawl",
name="source_channel")
)
# 외부 채널 (Hermes Discord 등) 의 channel/user/message_id/timestamp 메타.
+31 -1
View File
@@ -2,7 +2,8 @@
from datetime import datetime
from sqlalchemy import Boolean, DateTime, String, Text
from sqlalchemy import Boolean, DateTime, Enum, Integer, String, Text
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.orm import Mapped, mapped_column
from core.database import Base
@@ -23,3 +24,32 @@ class NewsSource(Base):
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), default=datetime.now
)
# ── A-3 (plan crawl-24x7-1) 레지스트리 증축 — migration 319 ──
# fetch_method: rss / rss+page / sitemap+page / page / api / signal-only
fetch_method: Mapped[str] = mapped_column(String(20), default="rss")
# fulltext_policy: none(현행) / page(기사 페이지 fetch 후 4-tier 승격) / feed-full(피드 본문이 전문)
fulltext_policy: Mapped[str] = mapped_column(String(20), default="none")
# NULL=공개, 값=구독 세션 키 (B-3 Playwright 어댑터 슬롯)
auth_profile: Mapped[str | None] = mapped_column(String(50))
# 소스별 차등 폴링 (NULL=전역 6h 사이클)
poll_interval_minutes: Mapped[int | None] = mapped_column(Integer)
# 조건부 GET 워터마크 — 서버가 준 값 그대로 저장·재전송 (A-1)
etag: Mapped[str | None] = mapped_column(Text)
last_modified: Mapped[str | None] = mapped_column(Text)
# CDN ETag 회전 대비 콘텐츠 해시 변경감지 병행 (A-1)
feed_content_hash: Mapped[str | None] = mapped_column(String(64))
# 추출 실패 잦은 소스의 site-specific CSS selector (A-2)
selector_override: Mapped[dict | None] = mapped_column(JSONB)
# rdf / table-strip / gn-redirect / skip-video 등 파서 특이 케이스 (B-5)
parser_quirk: Mapped[str | None] = mapped_column(String(30))
# 채널 — 'news'(다이제스트/브리핑 대상) / 'crawl'(도메인 재료, 0-5 (a)) — migration 324.
# documents.source_channel 로 전파, crawl 채널은 embed/chunk 30일 게이트 미적용.
# documents 와 동일 PG enum 재사용 (Document 모델과 값 목록 동기 유지).
source_channel: Mapped[str] = mapped_column(
Enum("law_monitor", "devonagent", "email", "web_clip",
"tksafety", "inbox_route", "manual", "drive_sync", "news", "memo",
"voice", "hermes", "crawl",
name="source_channel"),
default="news",
)
+2 -1
View File
@@ -18,10 +18,11 @@ class ProcessingQueue(Base):
stage: Mapped[str] = mapped_column(
# 'stt' (audio): migration 150 / 'thumbnail' (video): queue_consumer 가 enqueue.
# 'deep_summary' (PR-B B-1): classify_worker 가 에스컬레이션 시 enqueue.
# 'fulltext' (crawl-24x7 A-2): migration 321 — 기사 페이지 fetch 후 본문 승격.
# DB enum 변경은 마이그레이션이 처리하므로 create_type=False.
Enum(
"extract", "classify", "summarize", "embed", "chunk", "preview",
"stt", "thumbnail", "deep_summary", "markdown",
"stt", "thumbnail", "deep_summary", "markdown", "fulltext",
name="process_stage",
create_type=False,
),
+44
View File
@@ -0,0 +1,44 @@
"""source_health 테이블 ORM (A-5, plan crawl-24x7-1)
news_sources 1:1. 소스별 fetch 성공/실패 기록 + circuit breaker 상태.
silent skip 누적 방지의 가시성 기반 A-8 헬스 패널이 읽는다.
"""
from datetime import datetime
from sqlalchemy import BigInteger, Boolean, DateTime, ForeignKey, Integer, String, Text
from sqlalchemy.orm import Mapped, mapped_column
from core.database import Base
class SourceHealth(Base):
__tablename__ = "source_health"
id: Mapped[int] = mapped_column(primary_key=True)
source_id: Mapped[int] = mapped_column(
Integer, ForeignKey("news_sources.id", ondelete="CASCADE"), nullable=False
)
consecutive_failures: Mapped[int] = mapped_column(Integer, default=0)
total_fetches: Mapped[int] = mapped_column(BigInteger, default=0)
total_failures: Mapped[int] = mapped_column(BigInteger, default=0)
last_success_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
last_error: Mapped[str | None] = mapped_column(Text)
last_error_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
last_fetch_items: Mapped[int | None] = mapped_column(Integer)
# 200 인데 entries 0 인 연속 fetch 횟수 (304/해시동일은 미집계 — 피드 부패 신호 전용)
empty_streak: Mapped[int] = mapped_column(Integer, default=0)
# closed(정상) / open(연속 실패 → 지수 backoff) / disabled(임계 초과, 수동 복구 대상)
circuit_state: Mapped[str] = mapped_column(String(10), default="closed")
circuit_opened_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
updated_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), default=datetime.now
)
# ── B-3 구독 세션 상태 계약 — migration 325 ──
# 쓰기 1종 플래그: A-8 버튼이 기록만, 어댑터가 소비(수동 half-open).
# 소비 위치 = open-스킵 분기보다 앞 (r5 함정 고정 — 데드 버튼 방지).
relogin_requested: Mapped[bool] = mapped_column(Boolean, default=False)
# 내용 기반 probe 결과 (시간 기반 만료 판정 금지 — 페이월 안내문 silent corruption 차단)
last_probe_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
last_probe_ok: Mapped[bool | None] = mapped_column(Boolean)
+10 -3
View File
@@ -17,10 +17,17 @@ python-multipart>=0.0.9
jinja2>=3.1.0
feedparser>=6.0.0
pymupdf>=1.24.0
# Web/Blog ingest (devonagent 트랙) — HTML 본문 정화 4-tier fallback
trafilatura>=1.12.0
# Web/Blog ingest (devonagent 트랙) + 뉴스 fulltext 승격 (crawl-24x7 A-2) — 4-tier fallback.
# trafilatura 는 단일 메인테이너 리스크로 exact pin (A-2 결정).
trafilatura==2.1.0
readability-lxml>=0.8.1
markdownify>=0.13.1
# office OOXML(docx/xlsx/pptx) → md (plan ds-s1-backend-1 C-1). hwp 는 LibreOffice+markdownify 경로.
# tier-4 (bs4) 가 직접 import — 전이 의존 가정 제거 (crawl-24x7 A-2)
beautifulsoup4>=4.12.0
# office OOXML(docx/xlsx/pptx) → md (plan ds-s1-backend-1 C-1).
# 정확한 핀은 E-1 markitdown OOXML PoC(devsbx/버전핀 컨텍스트)에서 확정.
markitdown[docx,xlsx,pptx]>=0.1.0
# .hwp(HWP5 binary) → md: 순수 Python HWP5 전용 변환기(CLI hwp5html). LibreOffice 번들 libhwplo
# 필터가 실제 한컴 HWP5 를 못 읽어 전건 실패 → pyhwp 로 교체(2026-06-09). six = pyhwp 의 미선언 런타임 의존성.
pyhwp>=0.1b15
six>=1.16.0
+100 -28
View File
@@ -13,6 +13,7 @@
from __future__ import annotations
import re
import hashlib
import unicodedata
from dataclasses import dataclass, field
STRUCTURE_SPLIT_THRESHOLD = 4000
@@ -27,6 +28,17 @@ _KO_JEOL = re.compile(r'^\s*(?P<title>제\s*\d+\s*절\b.*)$')
_KO_JO = re.compile(r'^\s*(?P<title>제\s*\d+\s*조\b.*)$')
_ENG = re.compile(r'^\s*(?P<title>(?:Chapter|Section|Article|Part|PART)\s+[\dIVXLA-Z]+\b.*)$')
# 코드펜스 경계 (FE outlineAnchors.ts:60 `/^\s{0,3}(```|~~~)/` 와 동일). 펜스 내부 라인은
# heading 미탐지 — 코드블록 안 '# foo' 가 가짜 절을 만들지 않게(O3).
_FENCE = re.compile(r'^\s{0,3}(```|~~~)')
def _utf16_units(s: str) -> int:
"""JS 문자열 .length(= UTF-16 code unit 수) 와 동일. astral(BMP 밖)=surrogate pair=2 units.
FE `raw.length` / `out.slice(off)` UTF-16 code unit 단위라 char_start 같은 단위여야 .
len(s.encode('utf-16-le'))//2 = code unit (utf-16-le BOM 미부착)."""
return len(s.encode("utf-16-le")) // 2
@dataclass
class HierNode:
@@ -39,6 +51,9 @@ class HierNode:
text: str
is_leaf: bool = True
chunk_content_hash: str = field(default="")
# md_content 내 heading 라인 시작 offset(UTF-16 code unit). jump-target(비-window leaf / %_split parent)만
# 값 보유; window-child / preamble(title None) = None(점프 타깃 아님, g0-t2/g2-t3).
char_start: int | None = None
def finalize_hash(self):
self.chunk_content_hash = hashlib.sha256(self.text.encode("utf-8")).hexdigest()
@@ -57,33 +72,64 @@ def _detect_heading(line: str) -> tuple[int, str, str] | None:
return None
def _segment(text: str) -> list[tuple[int, str | None, str | None, str]]:
"""heading 경계로 분할 → [(level, title, node_type, segment_text), ...].
def _segment(text: str) -> list[tuple[int, str | None, str | None, str, int | None]]:
"""heading 경계로 분할 → [(level, title, node_type, segment_text, char_start), ...].
preamble( heading 이전 본문) = (0, None, None, text).
라인 모델 = FE outlineAnchors.ts:55-65 동일: `text.split('\n')` + UTF-16 code-unit offset +
코드펜스 추적(splitlines(keepends=True) 폐기 JS 라인경계 \v\f\x1c 7종을 다르게 쪼개는 문제 제거).
char_start = segment 라인(=heading 라인) UTF-16 offset. preamble = None(점프 타깃 아님).
node.text 보존(라인모델 변경에 hash-neutral): 그룹을 '\n'.join 하되 마지막 그룹이 아니면 분리용 '\n'
그룹 끝에 되돌려 붙여(= splitlines(keepends) 마지막 라인에 \n 남기던 동작) 원문과 동일.
CR 미strip(CRLF '\r' 잔류 FE raw.length 동일), NFC 무변환.
"""
lines = text.splitlines(keepends=True)
segs: list[tuple[int, str | None, str | None, list[str]]] = []
cur: tuple[int, str | None, str | None, list[str]] | None = None
preamble: list[str] = []
for ln in lines:
h = _detect_heading(ln.rstrip("\n"))
if h:
if cur is not None:
segs.append(cur)
elif preamble and "".join(preamble).strip():
segs.append((0, None, None, preamble))
cur = (h[0], h[1], h[2], [ln])
raw_lines = text.split("\n")
n = len(raw_lines)
# 라인별 (offset, heading) 선계산 — 펜스 내부/경계 라인은 heading 미탐지.
offs: list[int] = []
headings: list[tuple[int, str, str | None] | None] = []
off = 0
in_fence = False
for raw in raw_lines:
fence_toggle = bool(_FENCE.match(raw))
fenced_here = in_fence or fence_toggle
offs.append(off)
headings.append(None if fenced_here else _detect_heading(raw))
if fence_toggle:
in_fence = not in_fence
off += _utf16_units(raw) + 1 # '\n'
# 그룹 경계 = 첫 heading 이전(preamble) + 각 heading 라인. (start_idx, meta) 리스트.
first_heading = next((i for i in range(n) if headings[i] is not None), None)
starts: list[int] = []
metas: list[tuple[int, str | None, str | None] | None] = []
if first_heading is None:
starts.append(0)
metas.append(None) # 전체 = preamble
else:
if first_heading > 0:
starts.append(0)
metas.append(None)
for i in range(first_heading, n):
h = headings[i]
if h is not None:
starts.append(i)
metas.append((h[0], h[1], h[2]))
segs: list[tuple[int, str | None, str | None, str, int | None]] = []
for gi, s_idx in enumerate(starts):
e_idx = starts[gi + 1] if gi + 1 < len(starts) else n
seg_text = "\n".join(raw_lines[s_idx:e_idx])
if e_idx < n:
seg_text += "\n" # 분리용 '\n' 을 앞 그룹에 귀속(splitlines keepends 동치)
meta = metas[gi]
if meta is None:
if not seg_text.strip(): # 빈 preamble 폐기(기존 동작)
continue
segs.append((0, None, None, seg_text, None))
else:
if cur is None:
preamble.append(ln)
else:
cur[3].append(ln)
if cur is not None:
segs.append(cur)
elif preamble and "".join(preamble).strip():
segs.append((0, None, None, preamble))
return [(lvl, title, nt, "".join(body)) for (lvl, title, nt, body) in segs]
lvl, title, nt = meta
segs.append((lvl, title, nt, seg_text, offs[s_idx]))
return segs
def _window_split(body: str, target: int) -> list[str]:
@@ -139,7 +185,7 @@ def build_hier_tree(
chain.append(title)
return " > ".join(chain) if chain else None
for lvl, title, nt, body in segs:
for lvl, title, nt, body, cstart in segs:
norm = 0 if lvl == 0 else min(level_map[lvl], max_depth)
# 부모 = 스택에서 norm 보다 작은 가장 가까운 노드
while stack and stack[-1][0] >= norm:
@@ -147,8 +193,11 @@ def build_hier_tree(
parent_idx = stack[-1][1] if stack else None
idx = len(nodes)
hp = _heading_path(parent_idx, title)
# char_start = 생성 시점 할당(window-split 가 n.text 를 heading 라인으로 truncate 하기 전에 박제).
# split-parent 가 돼도 이 값(heading 라인 offset)이 windowed section 단일 jump target 으로 보존된다.
node = HierNode(idx=idx, parent_idx=parent_idx, level=norm, node_type=nt,
section_title=title, heading_path=hp, text=body, is_leaf=True)
section_title=title, heading_path=hp, text=body, is_leaf=True,
char_start=cstart)
nodes.append(node)
if norm > 0:
stack.append((norm, idx))
@@ -178,14 +227,17 @@ def build_hier_tree(
n.is_leaf = False
heading_line = (n.text.splitlines() or [""])[0]
n.text = heading_line # 중복 저장 회피 (full body 는 window child 가 보유)
n.node_type = (n.node_type or "section") + "_split"
n.node_type = (n.node_type or "section") + "_split" # chapter_split/clause_split/section_split
# n.char_start 보존 = windowed section 의 단일 jump target(생성시점 heading offset).
base_level = min(n.level + 1, max_depth)
for wtext in wins:
ci = len(final)
# window child = char_start None(_window_split 가 whitespace buf 를 drop 해
# char-preserving 이 아니므로 합산 offset 이 거짓; 점프 타깃도 아님, B1/#1).
final.append(HierNode(
idx=ci, parent_idx=n.idx, level=base_level, node_type="window",
section_title=n.section_title, heading_path=n.heading_path,
text=wtext, is_leaf=True))
text=wtext, is_leaf=True, char_start=None))
for n in final:
n.finalize_hash()
return final
@@ -209,6 +261,24 @@ def coverage_stats(text: str, nodes: list[HierNode]) -> dict:
# 일반 네비: 자식 level > 부모 level 만 보장
if n.level <= nodes[n.parent_idx].level and nodes[n.parent_idx].level > 0:
bad_level += 1
# char_start O5 검증 (UTF-16 슬라이스 == heading 라인) + NFC telemetry (g2-t4).
# 검증은 FE 가 실제 쓰는 방식과 동일: md.encode('utf-16-le')[2*cs:2*(cs+n)].decode == heading_line
# (Python code-point 슬라이스 md[cs:cs+n] 가 아님 — astral 시 어긋남).
md_u16 = text.encode("utf-16-le")
cs_total = cs_verified = 0
for n in nodes:
if n.char_start is None:
continue
cs_total += 1
first_line = n.text.split("\n", 1)[0]
nu = _utf16_units(first_line)
seg = md_u16[2 * n.char_start: 2 * (n.char_start + nu)]
try:
if seg.decode("utf-16-le") == first_line:
cs_verified += 1
except UnicodeDecodeError:
pass
non_nfc = 1 if unicodedata.normalize("NFC", text) != text else 0
return {
"nodes": len(nodes), "leaves": len(leaves),
"coverage_ratio": round(leaf_chars / base, 4) if base else 0,
@@ -217,4 +287,6 @@ def coverage_stats(text: str, nodes: list[HierNode]) -> dict:
"level_dist": {l: sum(1 for n in nodes if n.level == l) for l in sorted({n.level for n in nodes})},
"leaf_len_min": min((len(n.text) for n in leaves), default=0),
"leaf_len_max": max((len(n.text) for n in leaves), default=0),
"char_start_total": cs_total, "char_start_verified": cs_verified,
"non_nfc": non_nfc,
}
+3 -3
View File
@@ -58,16 +58,16 @@ async def persist_hier_tree(
INSERT INTO document_chunks
(doc_id, chunk_index, chunk_type, section_title, heading_path, domain_category,
text, embedding, source_type, chunker_version, chunk_content_hash,
parent_id, level, node_type, is_leaf, in_corpus)
parent_id, level, node_type, is_leaf, in_corpus, char_start)
VALUES (:d, :ci, :ct, :stt, :hp, :dc, :tx,
cast(cast(:emb AS text) AS vector),
:src, :cv, :hash, :pid, :lvl, :nt, :leaf, false)
:src, :cv, :hash, :pid, :lvl, :nt, :leaf, false, :cs)
RETURNING id"""), {
"d": doc_id, "ci": base + n.idx, "ct": chunk_type,
"stt": n.section_title, "hp": n.heading_path, "dc": domain_category,
"tx": n.text, "emb": emb_str, "src": SOURCE_TYPE, "cv": CHUNKER_VERSION,
"hash": n.chunk_content_hash, "pid": parent_db, "lvl": n.level,
"nt": n.node_type, "leaf": n.is_leaf})
"nt": n.node_type, "leaf": n.is_leaf, "cs": n.char_start})
idx_to_dbid[n.idx] = db_id
await session.commit()
+3 -15
View File
@@ -1,10 +1,10 @@
"""일일 다이제스트 워커 — PostgreSQL/CalDAV 쿼리 → Markdown + SMTP
"""일일 다이제스트 워커 — PostgreSQL/CalDAV 쿼리 → Markdown 생성
v1 scripts/pkm_daily_digest.py에서 포팅.
DEVONthink/OmniFocus PostgreSQL/CalDAV 쿼리로 전환.
SMTP 발송은 2026-06-10 제거 ( 번도 전달 성공한 없는 기능 폐기 결정).
"""
import os
from datetime import datetime, timezone
from zoneinfo import ZoneInfo
from pathlib import Path
@@ -13,7 +13,7 @@ from sqlalchemy import func, select, text
from core.config import settings
from core.database import async_session
from core.utils import send_smtp_email, setup_logger
from core.utils import setup_logger
from models.document import Document
from models.queue import ProcessingQueue
@@ -133,16 +133,4 @@ async def run():
if old.stat().st_mtime < cutoff:
old.rename(archive_dir / old.name)
# ─── SMTP 발송 ───
smtp_host = os.getenv("MAILPLUS_HOST", "")
smtp_port = int(os.getenv("MAILPLUS_SMTP_PORT", "465"))
smtp_user = os.getenv("MAILPLUS_USER", "")
smtp_pass = os.getenv("MAILPLUS_PASS", "")
if smtp_host and smtp_user:
send_smtp_email(
smtp_host, smtp_port, smtp_user, smtp_pass,
f"PKM 다이제스트 — {date_display}",
markdown,
)
logger.info(f"다이제스트 생성 완료: {digest_path}")
+320
View File
@@ -0,0 +1,320 @@
"""fulltext 승격 워커 (A-2 + A-7, plan crawl-24x7-1)
news_collector fulltext_policy='page' 소스의 기사에 enqueue 'fulltext' stage 소비:
기사 페이지 politeness fetch (A-4) 원본 HTML NAS gzip 보존 (A-7)
extract_worker 4-tier 재사용 (tier 2 sibling .md 디스크 원본이 없어 비적용)
extracted_text/md_content 승격 summarize + (30 게이트) embed/chunk enqueue.
실패 처리 ( 어휘 = DB enum, 분기만 워커):
- 일시 오류 (5xx/timeout) : raise 재시도 (max_attempts 3)
- 차단/비대상 (403/429/robots/비HTML/추출부족): RSS 요약으로 격하(degrade) 완료
summarize/embed/chunk enqueue 보장 (기사 유실 0). 격하 사유는 extract_meta.fulltext 기록.
- 영구 실패 (3 소진) : 야간 reconcile_unresolved() summarize 안전망 enqueue
([[feedback_silent_skip_accumulation]] 조건부 skip 영구 침묵으로 누적되지 않게).
승격 게이트: tier 공통 본문 >= 200 (devonagent 달리 tier 4 게이트 적용
페이월/오류 페이지의 nav 찌꺼기를 본문으로 승격하느니 RSS 요약 격하가 낫다).
"""
import gzip
import hashlib
import re
from datetime import datetime, timezone
from pathlib import Path
from sqlalchemy import exists, select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import aliased
from core.config import settings
from core.crawl_politeness import (
CrawlBlocked,
CrawlFetchError,
CrawlSkip,
fetch_page,
fetch_page_via_browser,
probe_session,
)
from core.database import async_session
from core.utils import setup_logger
from models.document import Document
from models.news_source import NewsSource
from models.queue import ProcessingQueue, enqueue_stage
from workers.extract_worker import (
_WEB_MIN_BODY_LEN,
_extract_web_with_bs4,
_extract_web_with_readability,
_extract_web_with_trafilatura,
)
logger = setup_logger("fulltext_worker")
# 한국 기사 푸터 1층 후처리 (A-2) — 보수적으로 라인 단위만 제거
_FOOTER_PATTERNS = [
re.compile(r"^.{0,120}(무단\s*전재|무단\s*복제|재배포\s*금지|저작권자\s*[ⓒ©(]).*$", re.M),
re.compile(r"^[\w.+-]+@[\w.-]+\.[A-Za-z]{2,}\s*$", re.M), # 단독 이메일 라인
re.compile(r"^\s*\S{2,4}\s*기자\s*$", re.M), # 단독 '◯◯◯ 기자' 라인
]
def _strip_article_footer(body: str) -> str:
for pat in _FOOTER_PATTERNS:
body = pat.sub("", body)
return re.sub(r"\n{3,}", "\n\n", body).strip()
def _extract_body(html_text: str) -> tuple[str, str | None, str | None]:
"""(body, engine, engine_version). 전 tier >= 200자 게이트, 미달이면 ("", None, None)."""
body, ver = _extract_web_with_trafilatura(html_text)
if body and len(body) >= _WEB_MIN_BODY_LEN:
return body, "trafilatura", ver
body, ver = _extract_web_with_readability(html_text)
if body and len(body) >= _WEB_MIN_BODY_LEN:
return body, "readability", ver
body, ver = _extract_web_with_bs4(html_text)
if body and len(body) >= _WEB_MIN_BODY_LEN:
return body, "bs4_text", ver
return "", None, None
def _raw_html_path(source_id: int | None, file_hash: str, now: datetime) -> Path:
"""A-7 원본 보존 경로 — NAS 본진. 한글 디렉토리의 NFC/NFD 비대칭을 피해 source_id 사용.
file_hash DB 컬럼이 character(64) 32 해시가 공백 패딩되어 돌아옴 strip 필수
(미적용 NAS 파일명에 공백 32 = /rsync 함정).
"""
src_dir = f"src_{source_id}" if source_id is not None else "src_unknown"
return (
Path(settings.nas_mount_path) / "crawl_raw" / src_dir
/ now.strftime("%Y-%m") / f"{file_hash.strip()}.html.gz"
)
def _save_raw_html(path: Path, html_text: str) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
with gzip.open(path, "wb") as f:
f.write(html_text.encode("utf-8", errors="replace"))
async def _enqueue_downstream(session: AsyncSession, doc: Document) -> None:
"""승격/격하 공통 후속 — summarize 무조건 + 30일 게이트 통과 시 embed/chunk."""
await enqueue_stage(session, doc.id, "summarize")
published_raw = (doc.extract_meta or {}).get("published_at")
if doc.source_channel == "crawl":
# 도메인 재료 코퍼스 — 발행일 무관 전량 색인 (30일 게이트는 뉴스 전용)
await enqueue_stage(session, doc.id, "embed")
await enqueue_stage(session, doc.id, "chunk")
return
days_old = 0
if published_raw:
try:
pub_dt = datetime.fromisoformat(published_raw)
days_old = (datetime.now(timezone.utc) - pub_dt).days
except ValueError:
days_old = 0 # 파싱 불가 = 신규 취급 (수집 시점 기본과 동일)
if days_old <= 30:
await enqueue_stage(session, doc.id, "embed")
await enqueue_stage(session, doc.id, "chunk")
def _set_fulltext_meta(doc: Document, **fields) -> None:
"""extract_meta.fulltext 갱신 — JSONB 변경 감지를 위해 dict 재할당."""
meta = dict(doc.extract_meta or {})
meta["fulltext"] = {**meta.get("fulltext", {}), **fields}
doc.extract_meta = meta
_PROBE_TTL_SECONDS = 6 * 3600 # probe 유효 시간 — 만료 시 배치 경계에서 재검증
async def _auth_session_ready(session: AsyncSession, source: NewsSource) -> tuple[bool, str]:
"""B-3 ② 내용 기반 probe 게이트 + relogin_requested 소비 (수동 half-open).
플래그 소비는 '불가용 스킵' 분기보다 어댑터 틱마다 도달 (r5 데드 버튼 함정 고정).
probe 실패 상태에서는 auth fetch 0 (자동 재시도 루프 = 계정 잠금 직행 B-3 ).
복구 경로 = storage_state 갱신 relogin_requested 플래그 set (수동).
probe 설정은 source.selector_override JSONB: probe_url / min_body_chars / paywall_markers.
"""
from workers.news_collector import _get_or_create_health
health = await _get_or_create_health(session, source.id)
now = datetime.now(timezone.utc)
cfg = source.selector_override or {}
probe_url = cfg.get("probe_url")
force = False
if health.relogin_requested:
health.relogin_requested = False # 소비 = 1회 half-open 시도
health.updated_at = now
force = True
logger.info(f"[fulltext/auth] {source.name} relogin_requested 소비 — half-open probe")
if not force:
if health.last_probe_ok is False:
return False, "probe 실패 상태 (storage_state 갱신 + relogin_requested 대기)"
if (
health.last_probe_ok
and health.last_probe_at
and (now - health.last_probe_at).total_seconds() < _PROBE_TTL_SECONDS
):
return True, ""
if not probe_url:
return False, "selector_override.probe_url 미설정"
result = await probe_session(
source.auth_profile,
probe_url,
int(cfg.get("min_body_chars", 800)),
list(cfg.get("paywall_markers", [])),
)
health.last_probe_at = now
health.last_probe_ok = bool(result.get("ok"))
health.updated_at = now
if not health.last_probe_ok:
logger.warning(f"[fulltext/auth] {source.name} probe 실패: {result.get('reason')}")
return False, str(result.get("reason"))
logger.info(f"[fulltext/auth] {source.name} probe OK ({result.get('body_chars')}자)")
return True, ""
async def _degrade(session: AsyncSession, doc: Document, reason: str) -> None:
"""본문 승격 실패 — RSS 요약 그대로 후속 단계 진행 (기사 유실 0)."""
_set_fulltext_meta(
doc, status="degraded", reason=reason[:300],
resolved_at=datetime.now(timezone.utc).isoformat(),
)
await _enqueue_downstream(session, doc)
logger.warning(f"[fulltext] doc={doc.id} 격하(RSS 요약 유지): {reason}")
async def process(document_id: int, session: AsyncSession) -> None:
"""기사 1건 풀텍스트 승격. queue_consumer 컨벤션 시그니처 (커밋은 consumer 가)."""
doc = await session.get(Document, document_id)
if not doc:
raise ValueError(f"문서 ID {document_id}를 찾을 수 없음")
if not doc.edit_url:
await _degrade(session, doc, "edit_url 없음")
return
meta = doc.extract_meta or {}
source_id = meta.get("source_id")
# B-3: 구독 소스(auth_profile)는 Playwright 세션 fetch — probe 게이트 선행
source = await session.get(NewsSource, source_id) if source_id else None
auth_profile = source.auth_profile if source is not None else None
if auth_profile:
ready, why = await _auth_session_ready(session, source)
if not ready:
await _degrade(session, doc, f"구독 세션 불가용: {why}")
return
try:
if auth_profile:
html_text, final_url = await fetch_page_via_browser(doc.edit_url, auth_profile)
else:
html_text, final_url = await fetch_page(doc.edit_url)
except (CrawlBlocked, CrawlSkip) as e:
await _degrade(session, doc, f"{type(e).__name__}: {e}")
return
except CrawlFetchError:
raise # 일시 오류 — 큐 재시도
now = datetime.now(timezone.utc)
# A-7: 원본 HTML 보존 (추출기 교체 시 전체 재추출 가능 상태 유지)
raw_path = _raw_html_path(source_id, doc.file_hash, now)
try:
_save_raw_html(raw_path, html_text)
raw_saved = True
except OSError as e:
# NAS 일시 장애 시 보존만 누락하고 승격은 진행 — 사유 기록 (silent 누락 회피)
raw_saved = False
logger.error(f"[fulltext] doc={doc.id} 원본 보존 실패 (승격은 진행): {e}")
body, engine, engine_ver = _extract_body(html_text)
if not engine:
await _degrade(session, doc, f"추출 실패 (전 tier < {_WEB_MIN_BODY_LEN}자)")
return
clean_body = _strip_article_footer(body.replace("\x00", ""))
if len(clean_body) < _WEB_MIN_BODY_LEN:
await _degrade(session, doc, "푸터 제거 후 본문 부족")
return
# B-3: 추출 결과도 페이월 마커로 게이트 — probe 통과 후 만료된 세션의
# '페이월 안내문' 본문 승격(silent corruption) 차단 + 즉시 probe 상태 강등
if auth_profile:
from workers.news_collector import _get_or_create_health
markers = (source.selector_override or {}).get("paywall_markers", [])
hit = next((m for m in markers if m and m.lower() in clean_body.lower()), None)
if hit:
health = await _get_or_create_health(session, source.id)
health.last_probe_ok = False
health.updated_at = datetime.now(timezone.utc)
await _degrade(session, doc, f"본문 페이월 마커 검출({hit}) — 세션 손상 의심")
return
title = doc.title or ""
doc.extracted_text = f"{title}\n\n{clean_body}" if title else clean_body
doc.extracted_at = now
doc.extractor_version = f"rss+page@{engine}"
doc.md_content = clean_body
doc.md_status = "success"
doc.md_extraction_engine = engine
doc.md_extraction_engine_version = engine_ver
doc.md_format_version = "1.0"
doc.md_generated_at = now
doc.md_source_hash = hashlib.sha256(html_text.encode("utf-8", errors="replace")).hexdigest()
doc.md_content_hash = hashlib.sha256(clean_body.encode("utf-8")).hexdigest()
doc.md_extraction_error = None # 수집 시점의 '변환 비대상' 마커 해제
doc.content_origin = "extracted"
doc.file_size = len(doc.extracted_text.encode())
_set_fulltext_meta(
doc, status="promoted", engine=engine,
raw_html_path=str(raw_path) if raw_saved else None,
final_url=final_url, body_chars=len(clean_body),
resolved_at=now.isoformat(),
)
await _enqueue_downstream(session, doc)
logger.info(
f"[fulltext/{engine}] doc={doc.id} {len(clean_body)}자 승격 "
f"(raw={'saved' if raw_saved else 'MISSING'})"
)
async def reconcile_unresolved() -> None:
"""안전망 (야간 1회): fulltext 영구 실패(3회 소진)로 summarize 가 영영 안 잡힌
뉴스 문서에 RSS 요약 기준 후속 단계를 enqueue. 멱등 enqueue 후엔 조건 불일치."""
async with async_session() as session:
# 외부 쿼리 FROM 에 ProcessingQueue 가 이미 있어 alias 없이는 auto-correlation 이
# 서브쿼리 FROM 을 전부 제거 → InvalidRequestError (queue_consumer.reset_stale_items 패턴)
pq = aliased(ProcessingQueue)
summarize_q = (
select(pq.id)
.where(
pq.document_id == Document.id,
pq.stage == "summarize",
)
)
result = await session.execute(
select(Document)
.join(ProcessingQueue, ProcessingQueue.document_id == Document.id)
.where(
ProcessingQueue.stage == "fulltext",
ProcessingQueue.status == "failed",
Document.source_channel == "news",
~exists(summarize_q),
)
.limit(200)
)
docs = result.scalars().unique().all()
for doc in docs:
_set_fulltext_meta(doc, status="failed_reconciled")
await _enqueue_downstream(session, doc)
if docs:
await session.commit()
logger.warning(f"[fulltext] reconcile: 영구 실패 {len(docs)}건 RSS 요약으로 후속 enqueue")
+351
View File
@@ -0,0 +1,351 @@
"""C-2 KOSHA Open API 수집 워커 (plan crawl-24x7-1).
3 API (2026-06-10 실키 live 검증 + fixture 박제 tests/fixtures/kosha_*_response.json):
재해사례 게시판: GET /B552468/disaster_api02/getdisaster_api02 callApiId=1060
재해사례 첨부: GET /B552468/disaster_attach_api02/Disaster_attach_api02 callApiId=1070
KOSHA GUIDE: GET /B552468/koshaguide/getKoshaGuide callApiId=1050
daily 스케줄 1 (main.py):
재해사례 = 최근 페이지만 diff (boardno dedup) 사례 본문 Document(텍스트 네이티브)
+ 첨부 PDF/HWP 다운로드 /documents/crawl_raw/kosha/{boardno}/ 저장
파일 Document + extract enqueue (kordoc HWP/PDF 기존 파이프라인 재사용).
GUIDE = 전체 레지스트리 메타 diff (1039, 100/page = 11 call) 신규/개정만,
일일 ingest cap(기본 25) = backlog 자동 점진 백필(~6) + 부하 평탄화.
cap 으로 미처리 잔량은 매회 로그 (silent cap 금지).
: KOSHA_API_KEY (credentials.env) 공공데이터포털 '인코딩' 키를 그대로 저장.
httpx params= 넘기면 % 재인코딩되므로 반드시 URL 문자열에 직접 결합.
개정 감지: GUIDE dedup = 규정번호+공표일자 같은 번호의 공표일자 = 신규 문서로 적재.
"""
import asyncio
import hashlib
import os
import random
import re
from datetime import datetime, timezone
from pathlib import Path
import httpx
from sqlalchemy import select
from core.config import settings
from core.crawl_politeness import CRAWL_UA
from core.database import async_session
from core.utils import setup_logger
from models.document import Document
from models.news_source import NewsSource
from models.queue import enqueue_stage
from workers.news_collector import (
FeedError,
_get_or_create_health,
_record_failure,
_record_success,
)
logger = setup_logger("kosha_collector")
_BASE = "https://apis.data.go.kr/B552468"
_BOARD_EP = f"{_BASE}/disaster_api02/getdisaster_api02"
_ATTACH_EP = f"{_BASE}/disaster_attach_api02/Disaster_attach_api02"
_GUIDE_EP = f"{_BASE}/koshaguide/getKoshaGuide"
_CASE_SOURCE = "KOSHA 재해사례"
_GUIDE_SOURCE = "KOSHA GUIDE"
_CASE_PAGES = 2 # daily diff 범위 (30×2 = 최근 60건 — 등록일 역순 API)
_CASE_ROWS = 30
_GUIDE_ROWS = 100
_GUIDE_DAILY_CAP = int(os.getenv("KOSHA_GUIDE_DAILY_CAP", "25"))
_MAX_FILE_BYTES = 50 * 1024 * 1024
_DOWNLOAD_DELAY = (2.0, 5.0) # portal.kosha.or.kr 파일서버 — 연속 다운로드 간격
def _api_key() -> str:
key = os.getenv("KOSHA_API_KEY", "")
if not key:
raise FeedError("KOSHA_API_KEY 미설정 — KOSHA 수집 불가")
return key
async def _api_get(url: str) -> dict:
"""공통 GET — 게이트웨이/제공자 이중 에러 체계 검사."""
async with httpx.AsyncClient(timeout=25) as client:
resp = await client.get(url, headers={"User-Agent": CRAWL_UA})
if resp.status_code != 200:
raise FeedError(f"KOSHA API {resp.status_code} @ {url.split('?')[0]}")
try:
payload = resp.json()
except ValueError as e:
# 게이트웨이 에러는 XML/plain 으로 옴 (SERVICE_KEY_IS_NOT_REGISTERED 등)
raise FeedError(f"KOSHA API 비-JSON 응답: {resp.text[:120]}") from e
code = (payload.get("header") or {}).get("resultCode")
if code != "00":
raise FeedError(f"KOSHA API resultCode={code}: {(payload.get('header') or {}).get('resultMsg')}")
return payload
def _items(payload: dict) -> list[dict]:
"""body.items.item — 단건이면 dict 로 오는 data.go.kr 관행 방어."""
item = ((payload.get("body") or {}).get("items") or {}).get("item")
if item is None:
return []
return [item] if isinstance(item, dict) else list(item)
def _safe_filename(name: str) -> str:
"""NAS 파일명 정화 — 경로분리자/제어문자/공백연쇄 제거 (쉘 함정 회피)."""
name = re.sub(r"[/\\\x00-\x1f]", "_", name).strip()
name = re.sub(r"\s+", " ", name)
return name[:140] or "unnamed"
async def _download(url: str, dest: Path) -> int:
"""첨부/규정 파일 다운로드 — 크기 cap + 디렉토리 생성 + 연속 간격."""
await asyncio.sleep(random.uniform(*_DOWNLOAD_DELAY))
async with httpx.AsyncClient(timeout=60, follow_redirects=True) as client:
resp = await client.get(url, headers={"User-Agent": CRAWL_UA})
if resp.status_code != 200:
raise FeedError(f"파일 다운로드 {resp.status_code}: {url}")
if len(resp.content) > _MAX_FILE_BYTES:
raise FeedError(f"파일 크기 초과 ({len(resp.content)} bytes): {url}")
dest.parent.mkdir(parents=True, exist_ok=True)
dest.write_bytes(resp.content)
return len(resp.content)
async def _get_or_create_source(session, name: str, feed_url: str) -> NewsSource:
result = await session.execute(select(NewsSource).where(NewsSource.name == name))
source = result.scalars().first()
if source is None:
source = NewsSource(
name=name, feed_url=feed_url, feed_type="rss", fetch_method="api",
fulltext_policy="none", source_channel="crawl", category="Safety",
language="ko", country="KR",
enabled=False, # 6h 뉴스 사이클 비대상 — 본 워커가 daily 폴링
)
session.add(source)
await session.flush()
return source
async def _ingest_attachment(session, boardno: str, filenm: str, filepath: str) -> bool:
"""첨부 1건 → NAS 저장 + 파일 Document + extract enqueue. 반환 = 신규 여부."""
safe = _safe_filename(filenm)
rel_path = f"crawl_raw/kosha/{boardno}/{safe}"
existing = await session.execute(
select(Document).where(Document.file_path == rel_path).limit(1)
)
if existing.scalars().first():
return False
dest = Path(settings.nas_mount_path) / rel_path
size = await _download(filepath, dest)
ext = (safe.rsplit(".", 1)[-1].lower() if "." in safe else "bin")[:10]
doc = Document(
file_path=rel_path,
file_hash=hashlib.sha256(dest.read_bytes()).hexdigest(),
file_format=ext,
file_size=size,
file_type="immutable",
title=safe.rsplit(".", 1)[0],
source_channel="crawl",
data_origin="external",
import_source="kosha_api",
edit_url=filepath,
ai_tags=["Safety/KOSHA재해사례/첨부"],
extract_meta={"kosha": {"boardno": boardno, "kind": "case_attachment"}},
)
session.add(doc)
await session.flush()
# extract → (crawl override) classify → embed/chunk — 기존 파일 파이프라인 재사용
await enqueue_stage(session, doc.id, "extract")
logger.info(f"[kosha] 첨부 ingest: {rel_path} ({size} bytes)")
return True
async def collect_disaster_cases(session) -> int:
"""재해사례 daily diff — 최근 _CASE_PAGES 페이지, boardno dedup."""
key = _api_key()
source = await _get_or_create_source(session, _CASE_SOURCE, _BOARD_EP)
new_count = 0
for page in range(1, _CASE_PAGES + 1):
payload = await _api_get(
f"{_BOARD_EP}?serviceKey={key}&callApiId=1060&pageNo={page}&numOfRows={_CASE_ROWS}"
)
items = _items(payload)
if not items:
break
page_all_dup = True
for item in items:
boardno = str(item.get("boardno") or "").strip()
title = (item.get("keyword") or "").strip()
if not boardno or not title:
continue
fhash = hashlib.sha256(f"kosha-case|{boardno}".encode()).hexdigest()[:32]
existing = await session.execute(
select(Document).where(Document.file_hash == fhash).limit(1)
)
if existing.scalars().first():
continue
page_all_dup = False
contents = (item.get("contents") or "").strip()
business = (item.get("business") or "").strip()
now = datetime.now(timezone.utc)
doc = Document(
file_path=f"crawl/{_CASE_SOURCE}/{boardno}",
file_hash=fhash,
file_format="article",
file_size=len(contents.encode()),
file_type="note",
title=title,
extracted_text=f"{title}\n\n[{business}]\n{contents}",
extracted_at=now,
extractor_version="kosha_api",
md_status="skipped",
md_extraction_error="kosha case: 텍스트 네이티브, markdown 변환 비대상",
source_channel="crawl",
data_origin="external",
review_status="approved",
ai_domain="Safety",
ai_sub_group=_CASE_SOURCE,
ai_tags=[f"Safety/KOSHA재해사례/{business or '기타'}"],
extract_meta={
"source_id": source.id,
"source_name": _CASE_SOURCE,
"published_at": None,
"kosha": {"boardno": boardno, "business": business,
"atcflcnt": item.get("atcflcnt")},
},
)
session.add(doc)
await session.flush()
await enqueue_stage(session, doc.id, "summarize")
await enqueue_stage(session, doc.id, "embed")
await enqueue_stage(session, doc.id, "chunk")
new_count += 1
# 첨부 (PDF/HWP) — 본문보다 정보량 큰 정식 사례 보고서
if int(item.get("atcflcnt") or 0) > 0:
attach = await _api_get(
f"{_ATTACH_EP}?serviceKey={key}&callApiId=1070"
f"&pageNo=1&numOfRows=10&boardno={boardno}"
)
for att in _items(attach):
filenm = (att.get("filenm") or "").strip()
filepath = (att.get("filepath") or "").strip()
if not filenm or not filepath.startswith("https://"):
continue
try:
await _ingest_attachment(session, boardno, filenm, filepath)
except FeedError as e:
logger.warning(f"[kosha] 첨부 실패 skip ({boardno}/{filenm}): {e}")
if page_all_dup:
break # 등록일 역순 — 페이지 전체가 기존이면 이후 페이지도 기존
logger.info(f"[kosha] 재해사례 신규 {new_count}")
return new_count
async def collect_kosha_guide(session, cap: int = _GUIDE_DAILY_CAP) -> int:
"""GUIDE 레지스트리 전체 메타 diff → 신규/개정만 다운로드 (일일 cap 점진 백필)."""
key = _api_key()
await _get_or_create_source(session, _GUIDE_SOURCE, _GUIDE_EP)
new_specs: list[dict] = []
page, total = 1, None
while True:
payload = await _api_get(
f"{_GUIDE_EP}?serviceKey={key}&callApiId=1050&pageNo={page}&numOfRows={_GUIDE_ROWS}"
)
if total is None:
total = int((payload.get("body") or {}).get("totalCount") or 0)
items = _items(payload)
if not items:
break
for item in items:
no = (item.get("techGdlnNo") or "").strip()
ymd = (item.get("techGdlnOfancYmd") or "").strip()
url = (item.get("fileDownloadUrl") or "").strip()
if not no or not url.startswith("https://"):
continue
fhash = hashlib.sha256(f"kosha-guide|{no}|{ymd}".encode()).hexdigest()[:32]
existing = await session.execute(
select(Document).where(Document.file_hash == fhash).limit(1)
)
if not existing.scalars().first():
new_specs.append({"no": no, "ymd": ymd, "url": url,
"name": (item.get("techGdlnNm") or no).strip(),
"fhash": fhash})
if page * _GUIDE_ROWS >= total:
break
page += 1
todo, deferred = new_specs[:cap], len(new_specs) - min(len(new_specs), cap)
ingested = 0
for spec in todo:
safe_no = _safe_filename(spec["no"])
rel_path = f"crawl_raw/kosha_guide/{safe_no}-{spec['ymd'] or 'nodate'}.pdf"
dest = Path(settings.nas_mount_path) / rel_path
try:
size = await _download(spec["url"], dest)
except FeedError as e:
logger.warning(f"[kosha] GUIDE 다운로드 실패 skip ({spec['no']}): {e}")
continue
doc = Document(
file_path=rel_path,
file_hash=spec["fhash"],
file_format="pdf",
file_size=size,
file_type="immutable",
title=f"{spec['name']} ({spec['no']})",
source_channel="crawl",
data_origin="external",
import_source="kosha_api",
edit_url=spec["url"],
ai_tags=["Safety/KOSHA GUIDE"],
extract_meta={"kosha": {"kind": "guide", "techGdlnNo": spec["no"],
"ofancYmd": spec["ymd"]}},
)
session.add(doc)
await session.flush()
await enqueue_stage(session, doc.id, "extract")
ingested += 1
# silent cap 금지 — 잔량 가시화 (자동 점진 백필: 내일 cap 만큼 또 소화)
logger.info(f"[kosha] GUIDE 신규/개정 {len(new_specs)}건 중 {ingested}건 ingest"
+ (f" (cap {cap}, 잔여 {deferred}건 — 일일 점진 백필)" if deferred > 0 else ""))
return ingested
async def run() -> None:
"""daily 1회 — 소스별 실패 격리 (재해사례 실패가 GUIDE 를 막지 않게)."""
now = datetime.now(timezone.utc)
for name, collector in ((_CASE_SOURCE, collect_disaster_cases),
(_GUIDE_SOURCE, collect_kosha_guide)):
async with async_session() as session:
result = await session.execute(select(NewsSource).where(NewsSource.name == name))
source = result.scalars().first()
try:
count = await collector(session)
if source is None: # 첫 실행에서 collector 가 생성
result = await session.execute(
select(NewsSource).where(NewsSource.name == name))
source = result.scalars().first()
health = await _get_or_create_health(session, source.id)
_record_success(health, count, False, now)
await session.commit()
except Exception as e:
logger.error(f"[kosha] {name} 수집 실패: {e}")
await session.rollback() # 부분 적재 폐기 후 health 만 기록
if source is not None:
health = await _get_or_create_health(session, source.id)
_record_failure(health, str(e) or repr(e), now)
await session.commit()
if __name__ == "__main__":
asyncio.run(run())
+2 -15
View File
@@ -15,7 +15,7 @@ from sqlalchemy import select
from core.config import settings
from core.database import async_session
from core.utils import create_caldav_todo, escape_ical_text, file_hash, send_smtp_email, setup_logger
from core.utils import create_caldav_todo, file_hash, setup_logger
from models.automation import AutomationState
from models.document import Document
from models.queue import enqueue_stage
@@ -337,8 +337,7 @@ def _safe_name(name: str) -> str:
def _send_notifications(law_name: str, proclamation_date: str, revision_type: str):
"""CalDAV + SMTP 알림"""
# CalDAV
"""CalDAV 할일 알림 (SMTP 발송은 2026-06-10 폐기 — CalDAV 가 단일 알림 채널)"""
caldav_url = os.getenv("CALDAV_URL", "")
caldav_user = os.getenv("CALDAV_USER", "")
caldav_pass = os.getenv("CALDAV_PASS", "")
@@ -349,15 +348,3 @@ def _send_notifications(law_name: str, proclamation_date: str, revision_type: st
description=f"공포일자: {proclamation_date}, 개정구분: {revision_type}",
due_days=7,
)
# SMTP
smtp_host = os.getenv("MAILPLUS_HOST", "")
smtp_port = int(os.getenv("MAILPLUS_SMTP_PORT", "465"))
smtp_user = os.getenv("MAILPLUS_USER", "")
smtp_pass = os.getenv("MAILPLUS_PASS", "")
if smtp_host and smtp_user:
send_smtp_email(
smtp_host, smtp_port, smtp_user, smtp_pass,
subject=f"[법령 변경] {law_name} ({revision_type})",
body=f"법령명: {law_name}\n공포일자: {proclamation_date}\n개정구분: {revision_type}",
)
+1 -8
View File
@@ -17,7 +17,7 @@ from sqlalchemy import select
from core.config import settings
from core.database import async_session
from core.utils import file_hash, send_smtp_email, setup_logger
from core.utils import file_hash, setup_logger
from models.automation import AutomationState
from models.document import Document
from models.queue import enqueue_stage
@@ -201,11 +201,4 @@ async def run():
await session.commit()
# SMTP 알림
smtp_host = os.getenv("MAILPLUS_HOST", "")
smtp_port = int(os.getenv("MAILPLUS_SMTP_PORT", "465"))
if archived and smtp_host:
body = f"이메일 {len(archived)}건 수집 완료:\n\n" + "\n".join(f"- {s}" for s in archived)
send_smtp_email(smtp_host, smtp_port, user, password, "PKM 이메일 수집 알림", body)
logger.info(f"이메일 {len(archived)}건 수집 완료 (max_uid={max_uid})")
+77 -6
View File
@@ -394,13 +394,29 @@ async def _process_office(
partial arm PDF split 전용 office 이진이라 여기 없음. 'completed' A-3 직렬화 전용(워커 미사용).
quality content-type-aware: office=scored(_compute_quality). 동기 변환은 to_thread event loop 비차단.
"""
from workers.office_md import OfficeMdError, convert_office_to_md
from workers.office_md import (
OfficeMdError,
convert_hwp_to_md_and_images,
convert_office_to_md,
)
is_hwp = Path(container_path).suffix.lower() in (".hwp", ".hwpx")
engine = "libreoffice_hwp" if is_hwp else "markitdown"
suffix = Path(container_path).suffix.lower()
if suffix == ".hwp":
engine = "pyhwp" # HWP5 binary: libhwplo 못 읽어 pyhwp 로 교체(2026-06-09)
elif suffix == ".hwpx":
engine = "libreoffice_hwp" # HWPX 는 pyhwp 미지원 → LibreOffice 폴백
else:
engine = "markitdown"
hwp_images: list[dict[str, Any]] = []
try:
# 동기 subprocess(LibreOffice)/markitdown — 스레드로 빼서 이벤트 루프 비차단.
md_content = await asyncio.to_thread(convert_office_to_md, container_path)
# 동기 subprocess/markitdown — 스레드로 빼서 이벤트 루프 비차단.
if suffix == ".hwp":
md_content, hwp_images = await asyncio.to_thread(
convert_hwp_to_md_and_images, container_path
)
else:
md_content = await asyncio.to_thread(convert_office_to_md, container_path)
except OfficeMdError as exc:
logger.warning(f"[marker] office md 변환 실패 id={document_id} engine={engine}: {exc}")
await _fail(session, document_id, f"office_md: {str(exc)[:990]}", engine=engine)
@@ -410,8 +426,49 @@ async def _process_office(
await _fail(session, document_id, f"office_md_unexpected: {str(exc)[:980]}", engine=engine)
return
# ---- 이미지 NAS persist (.hwp 전용) ----
# hwp5html 은 bindata raster 를 추출하나 본문 xhtml 에 <img> 앵커가 없어(orphan, --css/--html
# 동일) 인라인 위치 복원 불가 → marker(PDF) 의 _persist_images_to_nas 로 NAS 영속 후 md 말미
# 갤러리로 부착(docimg: ref = 뷰어 해석). OLE 수식/도형은 앵커도 raster 도 아니라 제외.
# docx/xlsx/pptx/hwpx 는 이미지 미처리(기존 동작 유지).
saved_images: list[dict[str, Any]] = []
orphan_paths: list[str] = []
if suffix == ".hwp" and MARKDOWN_IMAGE_PERSIST:
if hwp_images:
images_resp = [
{
"bytes_b64": base64.b64encode(im["data"]).decode("ascii"),
"format": im.get("format") or "png",
"slug": "",
"width": None,
"height": None,
}
for im in hwp_images
]
try:
saved_images = _persist_images_to_nas(document_id, images_resp)
except OSError as exc:
# NAS 일시 끊김 등 — transient. queue retry 로 복구.
logger.warning(
f"[marker] hwp image persist NAS write failed id={document_id}: "
f"{type(exc).__name__}: {exc}"
)
raise
if saved_images:
gallery = "\n\n## 첨부 이미지\n\n" + "\n\n".join(
f"![](docimg:{img['image_key']})" for img in saved_images
)
md_content = md_content + gallery
# 재변환 시 현재 saved_images 기준으로 과거 document_images row/NAS 파일 정리.
orphan_paths = await _sync_document_images(
session, document_id, saved_images, {"engine": engine}
)
# 성공 — 계약상 md_content 는 비공백(빈출력은 raise). quality scored.
quality = _compute_quality(md_content, doc.extracted_text or "", {"page_count": None})
if saved_images:
quality.setdefault("warnings", []).append(f"hwp_images_appended:{len(saved_images)}")
await session.execute(
update(Document).where(Document.id == document_id).values(
md_content=md_content,
@@ -429,7 +486,21 @@ async def _process_office(
)
)
await session.commit()
logger.info(f"[marker] office success id={document_id} engine={engine} len={len(md_content)}")
# commit 후 고아 NAS 파일 unlink (best-effort, 실패해도 DB 정합 유지).
for orphan_path in orphan_paths:
try:
Path(orphan_path).unlink(missing_ok=True)
except Exception as exc:
logger.warning(
f"[marker] orphan image unlink failed id={document_id} path={orphan_path}: "
f"{type(exc).__name__}: {exc}"
)
logger.info(
f"[marker] office success id={document_id} engine={engine} "
f"len={len(md_content)} images={len(saved_images)}"
)
async def _process_split(
+476 -72
View File
@@ -1,20 +1,30 @@
"""뉴스 수집 워커 — RSS/API에서 기사 수집, documents에 저장"""
"""뉴스 수집 워커 — RSS/API에서 기사 수집, documents에 저장
plan crawl-24x7-1 A그룹 (2026-06-10):
A-1 조건부 GET(ETag/Last-Modified 그대로 재전송) + 콘텐츠 해시 변경감지
A-2 fulltext_policy='page' 소스는 'fulltext' stage 본문 승격 위임
A-5 source_health 기록 + circuit breaker (소스별 실패 격리)
A-6 first-wins + 포털 전재 2 dedup (제목+최근 3, 12 이상 제목 한정)
"""
import asyncio
import hashlib
import re
from datetime import datetime, timezone
from datetime import datetime, timedelta, timezone
from html import unescape
from urllib.parse import urlparse, urlunparse
from urllib.parse import parse_qsl, urlencode, urlparse, urlunparse
import feedparser
import httpx
from sqlalchemy import select
from core.crawl_politeness import CRAWL_UA
from core.database import async_session
from core.utils import setup_logger
from models.document import Document
from models.news_source import NewsSource
from models.queue import enqueue_stage
from models.source_health import SourceHealth
logger = setup_logger("news_collector")
@@ -26,6 +36,7 @@ CATEGORY_MAP = {
"환경": "Environment", "기술": "Technology",
# 영어
"World": "International", "International": "International",
"World news": "International", # Guardian sectionName (B-2)
"Technology": "Technology", "Tech": "Technology", "Sci-Tech": "Technology",
"Arts": "Culture", "Culture": "Culture",
"Climate": "Environment", "Environment": "Environment",
@@ -35,27 +46,50 @@ CATEGORY_MAP = {
"Kultur": "Culture", "Wissenschaft": "Technology",
# 프랑스어
"Environnement": "Environment",
# 도메인 채널 (source_channel='crawl', 0-5 (a)) — 양쪽 공통 맵
"안전": "Safety", "Safety": "Safety",
"공학": "Engineering", "Engineering": "Engineering",
"철학": "Philosophy", "Philosophy": "Philosophy",
}
class FeedError(Exception):
"""소스 단위 fetch/parse 실패 — run() 이 source_health 실패로 기록."""
def _normalize_category(raw: str) -> str:
"""카테고리 표준화"""
return CATEGORY_MAP.get(raw, CATEGORY_MAP.get(raw.strip(), "Other"))
def _clean_html(text: str) -> str:
"""HTML 태그 제거 + 정제"""
def _clean_html(text: str, max_len: int | None = 1000) -> str:
"""HTML 태그 제거 + 정제. max_len=None 이면 절단 없음 (feed-full 전문용)."""
if not text:
return ""
text = re.sub(r"<[^>]+>", "", text)
text = unescape(text)
return text.strip()[:1000]
text = text.strip()
return text if max_len is None else text[:max_len]
# tracking 파라미터 판별 — prefix(utm_/at_=BBC/ns_=BBC/mc_=mailchimp) + 단독 키
_TRACKING_PREFIXES = ("utm_", "at_", "ns_", "mc_")
_TRACKING_PARAMS = {"fbclid", "gclid", "igshid", "ref", "smid", "partner", "cmp", "ocid", "ftag"}
def _normalize_url(url: str) -> str:
"""URL 정규화 (tracking params 제거)"""
"""URL 정규화 tracking 파라미터만 제거, 콘텐츠 식별 파라미터는 보존.
query 전체 제거 금지: hada.io/topic?id= · aitimes articleView.html?idxno= ·
HN item?id= query-식별 사이트에서 별개 기사가 같은 URL 붕괴된다.
저장(edit_url)·조회 양쪽이 함수를 공유해야 dedup 성립.
"""
parsed = urlparse(url)
return urlunparse((parsed.scheme, parsed.netloc, parsed.path, "", "", ""))
kept = [
(k, v) for k, v in parse_qsl(parsed.query, keep_blank_values=True)
if not (k.lower().startswith(_TRACKING_PREFIXES) or k.lower() in _TRACKING_PARAMS)
]
return urlunparse((parsed.scheme, parsed.netloc, parsed.path, "", urlencode(kept), ""))
def _article_hash(title: str, published: str, source_name: str) -> str:
@@ -73,8 +107,104 @@ def _normalize_to_utc(dt) -> datetime:
return datetime.now(timezone.utc)
# ── A-5: circuit breaker 정책 ──
# 연속 실패 >= OPEN 임계 → open (재시도 간격 지수 확대, 6h × 2^n, cap 48h)
# 연속 실패 > DISABLE 임계 → disabled (수집 제외 + 가시 로그, 수동 복구 대상)
# news_sources.enabled 는 건드리지 않는다 — 사용자 의도(enabled)와 자동 상태(circuit) 분리.
_CIRCUIT_OPEN_AFTER = 3
_CIRCUIT_DISABLE_AFTER = 10
_BACKOFF_BASE_HOURS = 6
_BACKOFF_CAP_HOURS = 48
_EMPTY_STREAK_ALERT = 8 # 6h 사이클 × 8 = 약 2일 연속 빈 피드 → 가시 경고
def _should_attempt(health: SourceHealth, now: datetime) -> bool:
"""circuit 상태에 따라 이번 사이클 fetch 여부 결정.
주의 (B-3 계약 , r5): 추후 relogin_requested 플래그 소비는 반드시
open-스킵 분기보다 ** 두어야 한다 open 스케줄 제외 형태가 되면
배치 경계가 플래그가 영원히 미소비(half-open 데드 버튼) 된다.
"""
if health.circuit_state == "disabled":
return False
if health.circuit_state == "open" and health.last_error_at is not None:
over = max(health.consecutive_failures - _CIRCUIT_OPEN_AFTER, 0)
backoff_h = min(_BACKOFF_BASE_HOURS * (2 ** over), _BACKOFF_CAP_HOURS)
if now - health.last_error_at < timedelta(hours=backoff_h):
return False
return True
def _record_success(health: SourceHealth, items: int, not_modified: bool, now: datetime) -> None:
health.consecutive_failures = 0
health.total_fetches += 1
health.last_success_at = now
health.last_fetch_items = items
if health.circuit_state != "closed":
logger.info(f"[health] source={health.source_id} circuit {health.circuit_state}→closed")
health.circuit_state = "closed"
health.circuit_opened_at = None
# 빈 피드 streak: 304/해시동일은 정상 신호라 미집계, 200+entries 0 만 집계 (피드 부패 감시)
if not_modified:
pass
elif items == 0:
health.empty_streak += 1
if health.empty_streak >= _EMPTY_STREAK_ALERT:
logger.error(
f"[health] source={health.source_id} 빈 피드 {health.empty_streak}회 연속 "
f"— 피드 부패 의심 (RSSHub 류 라우트 깨짐 패턴)"
)
else:
health.empty_streak = 0
health.updated_at = now
def _record_failure(health: SourceHealth, error: str, now: datetime) -> None:
health.consecutive_failures += 1
health.total_fetches += 1
health.total_failures += 1
health.last_error = error[:500]
health.last_error_at = now
health.updated_at = now
cf = health.consecutive_failures
if cf > _CIRCUIT_DISABLE_AFTER and health.circuit_state != "disabled":
health.circuit_state = "disabled"
logger.error(
f"[health] source={health.source_id} 연속 실패 {cf}회 — circuit DISABLED "
f"(수집 제외, A-8 패널에서 수동 복구 필요)"
)
elif cf >= _CIRCUIT_OPEN_AFTER and health.circuit_state == "closed":
health.circuit_state = "open"
health.circuit_opened_at = now
logger.warning(f"[health] source={health.source_id} 연속 실패 {cf}회 — circuit open")
async def _get_or_create_health(session, source_id: int) -> SourceHealth:
result = await session.execute(
select(SourceHealth).where(SourceHealth.source_id == source_id)
)
health = result.scalars().first()
if health is None:
health = SourceHealth(source_id=source_id)
session.add(health)
await session.flush()
return health
# 수동 POST /api/news/collect 와 6h 스케줄 사이클의 동시 실행 차단 (단일 프로세스·단일
# 이벤트루프). 동시 진입 시 _get_or_create_health 가 같은 source_id 를 양쪽에서 INSERT
# → uq_source_health_source_id 위반 IntegrityError 로 사이클 전체가 죽는 경합의 원천 봉쇄.
_run_lock = asyncio.Lock()
async def run():
"""뉴스 수집 실행"""
async with _run_lock:
await _run_locked()
async def _run_locked():
now = datetime.now(timezone.utc)
async with async_session() as session:
result = await session.execute(
select(NewsSource).where(NewsSource.enabled == True)
@@ -87,17 +217,23 @@ async def run():
total = 0
for source in sources:
health = await _get_or_create_health(session, source.id)
if not _should_attempt(health, now):
logger.info(f"[{source.name}] circuit {health.circuit_state} — 이번 사이클 skip")
continue
try:
if source.feed_type == "api":
count = await _fetch_api(session, source)
count, status = await _fetch_api(session, source)
else:
count = await _fetch_rss(session, source)
count, status = await _fetch_rss(session, source)
source.last_fetched_at = datetime.now(timezone.utc)
_record_success(health, count, status == "not_modified", now)
total += count
except Exception as e:
logger.error(f"[{source.name}] 수집 실패: {e}")
source.last_fetched_at = datetime.now(timezone.utc)
_record_failure(health, str(e) or repr(e), now)
await session.commit()
logger.info(f"뉴스 수집 완료: {total}건 신규")
@@ -108,8 +244,83 @@ ALLOWED_CONTENT_TYPES = ("application/rss+xml", "application/atom+xml",
"application/xml", "text/xml")
async def _fetch_rss(session, source: NewsSource) -> int:
"""RSS 피드 수집 — redirect 재검증 + 크기/content-type 제한"""
async def _is_portal_duplicate(session, title: str) -> bool:
"""A-6 2차 dedup: 포털 전재본 vs 원본이 다른 URL 로 이중 적재되는 케이스.
보조 = 제목 + 최근 3 (다른 소스/다른 URL 이므로 1 키로 잡힘).
범용 제목 오탐 방지: 12 미만 제목은 비적용. skip 전부 로그 (silent 누락 회피).
"""
if len(title) < 12:
return False
cutoff = datetime.now(timezone.utc) - timedelta(days=3)
dup = await session.execute(
select(Document.id).where(
Document.title == title,
Document.source_channel == "news",
Document.file_format == "article",
Document.extracted_at >= cutoff,
).limit(1)
)
return dup.scalars().first() is not None
async def _enqueue_processing(session, doc: Document, source: NewsSource, pub_dt: datetime) -> None:
"""후속 단계 enqueue.
fulltext_policy='page' 소스는 'fulltext' stage summarize/embed/chunk
fulltext_worker 승격(또는 격하) 확정 enqueue (RSS 요약 선요약 풀텍스트
도착 summarize_worker '이미 요약 있음 skip' 막히는 순서 함정 회피).
"""
if source.fulltext_policy == "page" and doc.edit_url:
await enqueue_stage(session, doc.id, "fulltext")
return
await enqueue_stage(session, doc.id, "summarize")
if source.source_channel == "crawl":
# 도메인 재료 코퍼스 — 발행일 무관 전량 색인 (30일 게이트는 뉴스 전용)
await enqueue_stage(session, doc.id, "embed")
await enqueue_stage(session, doc.id, "chunk")
return
days_old = (datetime.now(timezone.utc) - pub_dt).days
if days_old <= 30:
await enqueue_stage(session, doc.id, "embed")
await enqueue_stage(session, doc.id, "chunk")
def _build_extract_meta(source: NewsSource, pub_dt: datetime) -> dict:
"""fulltext_worker / 패널이 쓰는 출처 메타 (documents 에 source FK 가 없어 여기 기록)."""
return {
"source_id": source.id,
"source_name": source.name,
"published_at": pub_dt.isoformat(),
}
def _doc_identity(source: NewsSource, source_short: str, category: str) -> dict:
"""채널별 문서 정체성 — news 채널은 기존 값 그대로(무회귀), crawl 채널은 도메인 정체성.
file_path 접두사가 채널 디렉토리. ai_domain 다이제스트/검색 필터의 분기 축이라
crawl 채널이 'News' 오염시키지 않게 분리 (0-5 채널 레벨 분리 사상).
"""
if source.source_channel == "crawl":
domain = category if category and category != "Other" else "Domain"
return {
"path_prefix": "crawl",
"ai_domain": domain,
"ai_tags": [f"{domain}/{source_short}"],
}
return {
"path_prefix": "news",
"ai_domain": "News",
"ai_tags": [f"News/{source_short}/{category}"],
}
async def _fetch_rss(session, source: NewsSource) -> tuple[int, str]:
"""RSS 피드 수집 — redirect 재검증 + 크기/content-type 제한 + 조건부 GET (A-1).
반환 (신규 건수, 상태). 상태 'not_modified' = 304 또는 콘텐츠 해시 동일.
소스 단위 실패는 FeedError raise run() health 실패로 기록.
"""
from urllib.parse import urljoin
from core.url_validator import validate_feed_url, HTTP_EXCEPTION_DOMAINS
@@ -120,51 +331,79 @@ async def _fetch_rss(session, source: NewsSource) -> int:
# 순수 HTTP 소스인데 allowlist에 없으면 차단
if source.feed_url.startswith("http://") and not http_allowed:
logger.error(f"[{source.name}] HTTP 차단 (allowlist 미등록): {source_hostname}")
return 0
raise FeedError(f"HTTP 차단 (allowlist 미등록): {source_hostname}")
# fetch 전 URL 재검증 (등록 이후 DNS 변경 대비)
try:
validate_feed_url(source.feed_url, allow_http=http_allowed)
except ValueError as e:
logger.error(f"[{source.name}] URL 검증 실패: {e}")
return 0
raise FeedError(f"URL 검증 실패: {e}") from e
async with httpx.AsyncClient(timeout=10, follow_redirects=False) as client:
# A-1: 정직 UA + 조건부 GET — 서버가 준 워터마크를 받은 그대로 재전송
headers = {"User-Agent": CRAWL_UA}
if source.etag:
headers["If-None-Match"] = source.etag
if source.last_modified:
headers["If-Modified-Since"] = source.last_modified
async with httpx.AsyncClient(
timeout=10, follow_redirects=False, headers=headers
) as client:
resp = await client.get(source.feed_url)
# redirect 수동 처리 (최대 3회, 각 target 재검증)
# 304 는 redirect 처리보다 먼저 — httpx 의 is_redirect 는 3xx 전체(304 포함)에
# True 라, 304 를 redirect 로 오인하면 location 없는 같은 URL 을 재요청해
# "redirect 3회 초과" 로 오류 처리됨(조건부 GET 안정 피드 전멸 버그).
if resp.status_code == 304:
logger.info(f"[{source.name}] 304 Not Modified — 본문 미전송")
return 0, "not_modified"
# redirect 수동 처리 (최대 3회, 각 target 재검증) — location 있는 진짜 redirect 만.
# allowlist 도메인이면 redirect target의 HTTP도 허용
redirects = 0
while resp.is_redirect and redirects < 3:
location = resp.headers.get("location", "")
location = urljoin(str(resp.request.url), location)
while resp.has_redirect_location and redirects < 3:
location = urljoin(str(resp.request.url), resp.headers["location"])
try:
validate_feed_url(location, allow_http=http_allowed)
except ValueError as e:
logger.error(f"[{source.name}] redirect target 차단: {e}")
return 0
raise FeedError(f"redirect target 차단: {e}") from e
resp = await client.get(location)
if resp.status_code == 304:
logger.info(f"[{source.name}] 304 Not Modified (redirect 후) — 본문 미전송")
return 0, "not_modified"
redirects += 1
if resp.is_redirect:
logger.error(f"[{source.name}] redirect 3회 초과")
return 0
if resp.has_redirect_location:
raise FeedError("redirect 3회 초과")
resp.raise_for_status()
if len(resp.content) > MAX_RESPONSE_SIZE:
logger.warning(f"[{source.name}] 응답 크기 초과: {len(resp.content)} bytes")
return 0
raise FeedError(f"응답 크기 초과: {len(resp.content)} bytes")
ct = resp.headers.get("content-type", "").lower()
if not any(t in ct for t in ALLOWED_CONTENT_TYPES):
logger.warning(f"[{source.name}] 비정상 content-type: {ct}")
return 0
raise FeedError(f"비정상 content-type: {ct}")
# A-1: 콘텐츠 해시 변경감지 (CDN 의 ETag 회전 대비 병행) — 저장된 해시는 항상
# 파싱 검증을 통과한 응답의 것이므로 동일성 비교는 파싱 전에 안전
new_etag = resp.headers.get("etag")
new_last_modified = resp.headers.get("last-modified")
content_hash = hashlib.sha256(resp.content).hexdigest()
if source.feed_content_hash == content_hash:
logger.info(f"[{source.name}] 콘텐츠 해시 동일 — 파싱 skip")
return 0, "not_modified"
feed = feedparser.parse(resp.text)
if feed.bozo and not feed.entries:
logger.warning(f"[{source.name}] RSS 파싱 실패: {feed.bozo_exception}")
return 0
raise FeedError(f"RSS 파싱 실패: {feed.bozo_exception}")
# A-1: 워터마크 영속은 파싱 검증 통과 후에만 — 부패(bozo) 응답의 ETag 를 저장하면
# 이후 304 로 영구 skip 되는 silent corruption 차단
if new_etag:
source.etag = new_etag
if new_last_modified:
source.last_modified = new_last_modified
source.feed_content_hash = content_hash
count = 0
for entry in feed.entries:
@@ -176,67 +415,226 @@ async def _fetch_rss(session, source: NewsSource) -> int:
if not summary:
summary = title
# A-6: feed-full 소스만 피드 본문을 전문으로 신뢰 (truncate·광고 삽입이 흔해
# 일반 소스의 summary/content:encoded 를 전문으로 오인 저장 금지)
body = summary
is_feed_full = False
if source.fulltext_policy == "feed-full":
content_list = entry.get("content") or []
raw_body = content_list[0].get("value", "") if content_list else ""
full_body = _clean_html(raw_body or entry.get("summary", ""), max_len=None)
if len(full_body) > len(summary):
body = full_body
is_feed_full = True
link = entry.get("link", "")
# B-5 quirk: 비디오 항목 필터 (Aeon/Psyche — 텍스트 코퍼스에 비디오 페이지 무가치)
if source.parser_quirk == "skip-video" and re.search(r"/videos?/", link):
continue
published = entry.get("published_parsed") or entry.get("updated_parsed")
pub_dt = datetime(*published[:6], tzinfo=timezone.utc) if published else datetime.now(timezone.utc)
# 중복 체크
# 중복 체크 — 레거시 행은 raw URL 로 저장돼 있어 normalized/raw 양쪽 매칭.
# 교차 게시(같은 기사가 두 피드에 존재)로 2행 이상 매칭될 수 있어 first() 사용
# (scalar_one_or_none 은 MultipleResultsFound raise — 2026-06 BBC 수집 중단 원인).
article_id = _article_hash(title, pub_dt.strftime("%Y%m%d"), source.name)
normalized_url = _normalize_url(link)
existing = await session.execute(
select(Document).where(
(Document.file_hash == article_id) |
(Document.edit_url == normalized_url)
)
(Document.edit_url.in_([normalized_url, link]))
).limit(1)
)
if existing.scalar_one_or_none():
if existing.scalars().first():
continue
# A-6 2차: 포털 전재 dedup (first-wins — 먼저 적재된 쪽이 정본)
if await _is_portal_duplicate(session, title):
logger.info(f"[{source.name}] portal-dup skip: {title[:60]}")
continue
category = _normalize_category(source.category or "")
source_short = source.name.split(" ")[0] # "경향신문 문화" → "경향신문"
ident = _doc_identity(source, source_short, category)
doc = Document(
file_path=f"news/{source.name}/{article_id}",
file_path=f"{ident['path_prefix']}/{source.name}/{article_id}",
file_hash=article_id,
file_format="article",
file_size=len(summary.encode()),
file_size=len(body.encode()),
file_type="note",
title=title,
extracted_text=f"{title}\n\n{summary}",
extracted_text=f"{title}\n\n{body}",
extracted_at=datetime.now(timezone.utc),
extractor_version="rss",
source_channel="news",
extractor_version="rss-feed-full" if is_feed_full else "rss",
# article = 텍스트 네이티브(본문=extracted_text). markdown 단계 미enqueue 라
# 기본값 'pending' 이면 영구 비수렴 → backlog 지표 오염 + md_status_pending partial
# 인덱스 비대. 생성 시점에 terminal 'skipped' 로 명시(변환 비대상).
# fulltext_policy='page' 소스는 fulltext_worker 가 승격 시 success 로 갱신.
md_status="skipped",
md_extraction_error="news article: 텍스트 네이티브, markdown 변환 비대상",
source_channel=source.source_channel,
data_origin="external",
edit_url=link,
# 조회와 동일하게 정규화해 저장 — raw(tracking param 포함) 저장 시 URL dedup 무력화
edit_url=normalized_url,
review_status="approved",
ai_domain="News",
ai_domain=ident["ai_domain"],
ai_sub_group=source_short,
ai_tags=[f"News/{source_short}/{category}"],
ai_tags=ident["ai_tags"],
extract_meta=_build_extract_meta(source, pub_dt),
)
session.add(doc)
await session.flush()
# summarize + embed + chunk 등록 (classify 불필요)
await enqueue_stage(session, doc.id, "summarize")
days_old = (datetime.now(timezone.utc) - pub_dt).days
if days_old <= 30:
await enqueue_stage(session, doc.id, "embed")
await enqueue_stage(session, doc.id, "chunk")
# summarize + embed + chunk 등록 (classify 불필요).
# page 정책 소스는 fulltext 만 — 후속은 fulltext_worker 가 확정 후 enqueue.
await _enqueue_processing(session, doc, source, pub_dt)
count += 1
logger.info(f"[{source.name}] RSS → {count}건 수집")
return count
return count, "ok"
async def _fetch_api(session, source: NewsSource) -> int:
async def _fetch_api(session, source: NewsSource) -> tuple[int, str]:
"""API 소스 디스패치 — feed_url 호스트로 제공자 판별 (B-2).
레거시 NYT (feed_url=api.nytimes.com) 무변경 경로. 신규 제공자는 호스트 분기 추가.
미지의 호스트 = NYT 경로로 넘기지 않고 명시 실패 (silent fallback 금지).
"""
host = (urlparse(source.feed_url).hostname or "").lower()
if host.endswith("guardianapis.com"):
return await _fetch_api_guardian(session, source)
if host.endswith("nytimes.com"):
return await _fetch_api_nyt(session, source)
raise FeedError(f"API 제공자 미등록 호스트: {host} — 디스패치 분기 추가 필요")
def _guardian_request(feed_url: str, api_key: str) -> tuple[str, dict]:
"""Guardian 호출 형태 단일 source-of-truth — fixture 회귀 테스트 대상
(tests/fixtures/guardian_open_platform_search_response.json 박제 호출과 동일해야 )."""
parsed = urlparse(feed_url)
params = {
**dict(parse_qsl(parsed.query)),
"show-fields": "bodyText,trailText",
"page-size": "20",
"order-by": "newest",
"api-key": api_key,
}
return f"{parsed.scheme}://{parsed.netloc}{parsed.path}", params
async def _fetch_api_guardian(session, source: NewsSource) -> tuple[int, str]:
"""Guardian Open Platform 수집 (B-2) — show-fields=bodyText 로 정식 전문 JSON.
feed_url section 쿼리를 박아 등록 (: https://content.guardianapis.com/search?section=world).
전문이 API 오므로 fulltext stage 불요. 미설정 = FeedError (health 실패 기록,
silent fallback 없음 [[feedback_no_silent_fallback_explicit_opt_in]]).
"""
import os
api_key = os.getenv("GUARDIAN_API_KEY", "")
if not api_key:
raise FeedError("GUARDIAN_API_KEY 미설정 — Guardian 수집 불가")
endpoint, params = _guardian_request(source.feed_url, api_key)
try:
async with httpx.AsyncClient(timeout=15) as client:
resp = await client.get(endpoint, params=params)
resp.raise_for_status()
except httpx.HTTPStatusError as e:
# 쿼리스트링(api-key 포함) 제거 — path 까지만 로깅 (NYT 와 동일 규율)
safe_url = str(e.request.url).split("?")[0]
raise FeedError(f"Guardian API 실패: {e.response.status_code} @ {safe_url}") from e
except httpx.RequestError as e:
safe_url = str(e.request.url).split("?")[0] if e.request else "unknown"
raise FeedError(f"Guardian API 연결 실패: {safe_url}") from e
payload = resp.json().get("response", {})
if payload.get("status") != "ok":
raise FeedError(f"Guardian API status={payload.get('status')}")
count = 0
for item in payload.get("results", []):
title = (item.get("webTitle") or "").strip()
if not title:
continue
fields = item.get("fields") or {}
body_text = (fields.get("bodyText") or "").strip()
trail = _clean_html(fields.get("trailText") or "")
# bodyText = plain text 전문 (HTML 정화 불요). 짧으면(라이브 블로그 잔재 등) trail 격하.
is_full = len(body_text) >= 200
body = body_text if is_full else (trail or title)
link = item.get("webUrl", "")
pub_str = item.get("webPublicationDate", "")
try:
pub_dt = datetime.fromisoformat(pub_str.replace("Z", "+00:00"))
except (ValueError, AttributeError):
pub_dt = datetime.now(timezone.utc)
article_id = _article_hash(title, pub_dt.strftime("%Y%m%d"), source.name)
normalized_url = _normalize_url(link)
# RSS 수집부와 동일: 레거시 raw URL + 교차 게시 다중 매칭 내성 (first)
existing = await session.execute(
select(Document).where(
(Document.file_hash == article_id) |
(Document.edit_url.in_([normalized_url, link]))
).limit(1)
)
if existing.scalars().first():
continue
if await _is_portal_duplicate(session, title):
logger.info(f"[{source.name}] portal-dup skip: {title[:60]}")
continue
category = _normalize_category(item.get("sectionName", source.category or ""))
source_short = source.name.split(" ")[0]
ident = _doc_identity(source, source_short, category)
doc = Document(
file_path=f"{ident['path_prefix']}/{source.name}/{article_id}",
file_hash=article_id,
file_format="article",
file_size=len(body.encode()),
file_type="note",
title=title,
extracted_text=f"{title}\n\n{body}",
extracted_at=datetime.now(timezone.utc),
extractor_version="guardian_api_full" if is_full else "guardian_api",
md_status="skipped",
md_extraction_error="news article: 텍스트 네이티브, markdown 변환 비대상",
source_channel=source.source_channel,
data_origin="external",
edit_url=normalized_url,
review_status="approved",
ai_domain=ident["ai_domain"],
ai_sub_group=source_short,
ai_tags=ident["ai_tags"],
extract_meta=_build_extract_meta(source, pub_dt),
)
session.add(doc)
await session.flush()
await _enqueue_processing(session, doc, source, pub_dt)
count += 1
logger.info(f"[{source.name}] API → {count}건 수집")
return count, "ok"
async def _fetch_api_nyt(session, source: NewsSource) -> tuple[int, str]:
"""NYT API 수집 — 키 마스킹 + health degradation"""
import os
nyt_key = os.getenv("NYT_API_KEY", "")
if not nyt_key:
logger.error("NYT_API_KEY 미설정 — US 뉴스 수집 불가")
return 0
raise FeedError("NYT_API_KEY 미설정 — US 뉴스 수집 불가")
try:
async with httpx.AsyncClient(timeout=10) as client:
@@ -248,12 +646,10 @@ async def _fetch_api(session, source: NewsSource) -> int:
except httpx.HTTPStatusError as e:
# 쿼리스트링(api-key 포함) 제거 — path까지만 로깅
safe_url = str(e.request.url).split("?")[0]
logger.error(f"NYT API 실패: {e.response.status_code} @ {safe_url}")
return 0
raise FeedError(f"NYT API 실패: {e.response.status_code} @ {safe_url}") from e
except httpx.RequestError as e:
safe_url = str(e.request.url).split("?")[0] if e.request else "unknown"
logger.error(f"NYT API 연결 실패: {safe_url}")
return 0
raise FeedError(f"NYT API 연결 실패: {safe_url}") from e
data = resp.json()
count = 0
@@ -277,20 +673,26 @@ async def _fetch_api(session, source: NewsSource) -> int:
article_id = _article_hash(title, pub_dt.strftime("%Y%m%d"), source.name)
normalized_url = _normalize_url(link)
# RSS 수집부와 동일: 레거시 raw URL + 교차 게시 다중 매칭 내성 (first)
existing = await session.execute(
select(Document).where(
(Document.file_hash == article_id) |
(Document.edit_url == normalized_url)
)
(Document.edit_url.in_([normalized_url, link]))
).limit(1)
)
if existing.scalar_one_or_none():
if existing.scalars().first():
continue
if await _is_portal_duplicate(session, title):
logger.info(f"[{source.name}] portal-dup skip: {title[:60]}")
continue
category = _normalize_category(article.get("section", source.category or ""))
source_short = source.name.split(" ")[0]
ident = _doc_identity(source, source_short, category)
doc = Document(
file_path=f"news/{source.name}/{article_id}",
file_path=f"{ident['path_prefix']}/{source.name}/{article_id}",
file_hash=article_id,
file_format="article",
file_size=len(summary.encode()),
@@ -299,24 +701,26 @@ async def _fetch_api(session, source: NewsSource) -> int:
extracted_text=f"{title}\n\n{summary}",
extracted_at=datetime.now(timezone.utc),
extractor_version="nyt_api",
source_channel="news",
# article = 텍스트 네이티브(본문=extracted_text). markdown 단계 미enqueue 라
# 기본값 'pending' 이면 영구 비수렴 → backlog 지표 오염 + md_status_pending partial
# 인덱스 비대. 생성 시점에 terminal 'skipped' 로 명시(변환 비대상).
md_status="skipped",
md_extraction_error="news article: 텍스트 네이티브, markdown 변환 비대상",
source_channel=source.source_channel,
data_origin="external",
edit_url=link,
edit_url=normalized_url,
review_status="approved",
ai_domain="News",
ai_domain=ident["ai_domain"],
ai_sub_group=source_short,
ai_tags=[f"News/{source_short}/{category}"],
ai_tags=ident["ai_tags"],
extract_meta=_build_extract_meta(source, pub_dt),
)
session.add(doc)
await session.flush()
await enqueue_stage(session, doc.id, "summarize")
days_old = (datetime.now(timezone.utc) - pub_dt).days
if days_old <= 30:
await enqueue_stage(session, doc.id, "embed")
await enqueue_stage(session, doc.id, "chunk")
await _enqueue_processing(session, doc, source, pub_dt)
count += 1
logger.info(f"[{source.name}] API → {count}건 수집")
return count
return count, "ok"
+102 -5
View File
@@ -5,9 +5,11 @@
전략 (하이브리드):
- OOXML(.docx/.xlsx/.pptx) markitdown 신규 의존성(pip install markitdown). lazy import.
- .hwp/.hwpx LibreOffice(headless) HTML markdownify markdownify 기존 의존성.
(LibreOffice hwp import 필터 보유. .hwpx .hwp 다른 필터·버전 의존 E-1: prod LibreOffice
버전핀 안전컨텍스트에서 PoC 실행. fidelity 진짜 리스크 하니스가 측정.)
- .hwp(HWP5 binary) pyhwp hwp5html HTML markdownify pyhwp+six 의존성.
(2026-06-09: LibreOffice 번들 libhwplo 필터가 실제 한컴 HWP5 파일을 읽어 rc=0 + 'source file
could not be loaded' 로 전건 실패 → 순수 Python HWP5 전용 변환기 pyhwp 로 교체.)
- .hwpx LibreOffice(headless) HTML markdownify markdownify 기존 의존성.
(HWPX(zip) pyhwp 미지원 LibreOffice 폴백 유지. 현재 코퍼스는 전부 HWP5 binary.)
실패 계약 (C-5 postcondition backend 절반):
변환 실패· 출력·타임아웃·의존성 부재 OfficeMdError raise 한다.
@@ -18,6 +20,7 @@
from __future__ import annotations
import os
import re
import shutil
import subprocess
import tempfile
@@ -34,6 +37,13 @@ _MIN_BODY_CHARS = 16
# 이름) → 기본값 정합. soffice 만 있는 환경은 LIBREOFFICE_BIN 으로 override.
_SOFFICE_BIN = os.environ.get("LIBREOFFICE_BIN", "libreoffice")
# pyhwp 콘솔 스크립트(pip install pyhwp 시 PATH 등록). HWP5 binary(.hwp) 전용.
_HWP5HTML_BIN = os.environ.get("HWP5HTML_BIN", "hwp5html")
# hwp5html 이 bindata/ 로 추출하는 첨부물 중 NAS 영속 대상 raster 확장자.
# (OLE 수식/도형은 index.xhtml 에 앵커가 없어 위치 복원 불가 → 영속 제외.)
_RASTER_EXTS = {"jpg", "jpeg", "png", "gif", "bmp"}
class OfficeMdError(Exception):
"""office/hwp → md 변환 실패 신호. 호출부는 md_status='failed' 로 라우팅."""
@@ -50,7 +60,9 @@ def convert_office_to_md(path: str | Path, *, timeout: int = 90) -> str:
if suffix in OOXML_FORMATS:
md = _via_markitdown(p)
else: # .hwp / .hwpx
elif suffix == ".hwp":
md = _via_pyhwp_html(p, timeout=timeout)
else: # .hwpx (pyhwp 미지원 → LibreOffice 폴백)
md = _via_libreoffice_html(p, timeout=timeout)
md = (md or "").strip()
@@ -74,8 +86,93 @@ def _via_markitdown(path: Path) -> str:
return getattr(result, "text_content", "") or ""
def _run_hwp5html(path: Path, *, timeout: int) -> tuple[str, list[dict]]:
"""HWP5 binary(.hwp) → (markdown, raster_images). hwp5html 1회 실행 = md + 이미지 동시 추출.
LibreOffice 번들 libhwplo 필터가 실제 한컴 HWP5 파일을 읽어(rc=0 + 'source file could
not be loaded') 전건 실패 → 순수 Python HWP5 전용 변환기 pyhwp(CLI hwp5html)로 교체.
`_via_libreoffice_html` 동일한 실패 계약(rc0 또는 출력 부재 OfficeMdError raise).
raster_images = [{'data': bytes, 'format': 'jpeg'|'png'|...}] bindata/ 래스터만.
hwp5html 이미지를 본문 xhtml <img> 앵커하지 않으므로(bindata orphan, --css/--html 동일)
인라인 위치는 복원 불가 호출부가 NAS 영속 말미 갤러리로 부착한다.
"""
try:
from markdownify import markdownify # 기존 의존성
except ImportError as e: # noqa: BLE001
raise OfficeMdError("markdownify 미설치(기존 의존성이어야 함)") from e
with tempfile.TemporaryDirectory(prefix="office_md_hwp_") as tmp:
outdir = Path(tmp)
# hwp5html --output <dir> <file.hwp> → <dir>/index.xhtml + styles.css + bindata/
cmd = [_HWP5HTML_BIN, "--output", str(outdir), str(path)]
try:
proc = subprocess.run(
cmd, capture_output=True, text=True, timeout=timeout, check=False
)
except FileNotFoundError as e:
raise OfficeMdError(
f"pyhwp(hwp5html) 바이너리 부재({_HWP5HTML_BIN}) — `pip install pyhwp six` 필요"
) from e
except subprocess.TimeoutExpired as e:
raise OfficeMdError(f"pyhwp 변환 타임아웃({timeout}s): {path.name}") from e
index_path = outdir / "index.xhtml"
if proc.returncode != 0 or not index_path.exists():
raise OfficeMdError(
f"pyhwp html 변환 실패: {path.name} (rc={proc.returncode}): "
f"{(proc.stderr or proc.stdout or '').strip()[:300]}"
)
html = index_path.read_text(encoding="utf-8", errors="replace")
# hwp5html 의 xhtml 은 최상단 <?xml ...?> 선언을 가짐(LibreOffice 의 .html 경로엔 없음).
# markdownify 의 html.parser 가 이를 PI 텍스트('xml version="1.0" encoding="utf-8"?')로
# 본문에 흘려 (1) md 최상단 잡음·검색/청크 오염, (2) 빈 body 셸일 때 그 ~34자가
# _MIN_BODY_CHARS(16) 빈출력 게이트를 무력화(빈 변환의 false-success) → markdownify 전에 제거.
html = re.sub(r"^\s*<\?xml[^>]*\?>\s*", "", html)
# 표 보존 위해 markdownify 가 table 을 GFM 으로 — heading_style ATX (libreoffice 경로와 동일).
md = markdownify(html, heading_style="ATX", strip=["span", "font"])
images: list[dict] = []
bindata = outdir / "bindata"
if bindata.is_dir():
for f in sorted(bindata.iterdir()):
ext = f.suffix.lower().lstrip(".")
if ext in _RASTER_EXTS:
images.append({
"data": f.read_bytes(),
"format": "jpeg" if ext == "jpg" else ext,
})
return md, images
def _via_pyhwp_html(path: Path, *, timeout: int) -> str:
"""HWP5 binary(.hwp) → markdown (이미지 제외). convert_office_to_md 단일 텍스트 경로용."""
md, _images = _run_hwp5html(path, timeout=timeout)
return md
def convert_hwp_to_md_and_images(
path: str | Path, *, timeout: int = 90
) -> tuple[str, list[dict]]:
"""HWP5(.hwp) → (markdown, raster_images). marker_worker 이미지 영속 경로 전용.
실패/빈출력 계약은 convert_office_to_md 동일(OfficeMdError raise / md 절대 반환 금지).
raster_images 원소 = {'data': bytes, 'format': str}; 비어있을 있음(이미지 없는 문서).
"""
p = Path(path)
if p.suffix.lower() != ".hwp":
raise OfficeMdError(f"convert_hwp_to_md_and_images: .hwp 전용, got {p.suffix!r}")
if not p.exists():
raise OfficeMdError(f"file not found: {p}")
md, images = _run_hwp5html(p, timeout=timeout)
md = (md or "").strip()
if len(md) < _MIN_BODY_CHARS:
raise OfficeMdError(f"empty/too-short conversion ({len(md)} chars) for {p.name}")
return md, images
def _via_libreoffice_html(path: Path, *, timeout: int) -> str:
"""LibreOffice headless 로 HTML 변환 후 markdownify. hwp/hwpx 용."""
"""LibreOffice headless 로 HTML 변환 후 markdownify. hwpx 용(.hwp 는 pyhwp)."""
try:
from markdownify import markdownify # 기존 의존성
except ImportError as e: # noqa: BLE001
+12 -2
View File
@@ -22,8 +22,11 @@ logger = setup_logger("queue_consumer")
# stage별 배치 크기
# stt 는 GPU 단일 점유 + 회의 30분짜리도 가능 → 배치 1. thumbnail 은 ffmpeg subprocess 로 가벼움.
# deep_summary (PR-B B-1) 는 MLX 26B 단일 Semaphore(1) 경유 → 배치 1.
# fulltext 는 politeness 지연(같은 도메인 5–15s)이 배치 내 직렬로 걸린다 — 배치 3 이면
# 같은 도메인 최악 ~45s/사이클, 메인 큐 1m 간격(max_instances=1, coalesce)이 흡수.
BATCH_SIZE = {"extract": 5, "classify": 3, "summarize": 3, "embed": 1, "chunk": 1,
"preview": 2, "stt": 1, "thumbnail": 3, "deep_summary": 1, "markdown": 1}
"preview": 2, "stt": 1, "thumbnail": 3, "deep_summary": 1, "markdown": 1,
"fulltext": 3}
STALE_THRESHOLD_MINUTES = 10
# markdown 대형 split 변환은 한 doc 이 수십 분(5210 ≈ 40분) 동안 processing 상태로 머문다.
# marker_worker 는 queue 행에 heartbeat 를 찍지 않으므로(started_at 고정), main 의 10분
@@ -35,7 +38,7 @@ MARKDOWN_STALE_THRESHOLD_MINUTES = int(os.getenv("MARKDOWN_STALE_MINUTES", "120"
# STT 도 장기 작업 가능성이 있으나 본 PR 범위 밖 — main 에 유지(follow-up).
MAIN_QUEUE_STAGES = [
"extract", "classify", "summarize", "embed", "chunk",
"preview", "stt", "thumbnail", "deep_summary",
"preview", "stt", "thumbnail", "deep_summary", "fulltext",
]
MARKDOWN_QUEUE_STAGES = ["markdown"]
@@ -137,6 +140,9 @@ async def enqueue_next_stage(document_id: int, current_stage: str):
# source_channel-aware override (extract stage 만). source_channel 누락 시 _default.
extract_override_by_channel = {
"devonagent": ["embed", "chunk"],
# crawl 채널 파일형 (KOSHA 첨부/GUIDE PDF 등): preview 사전 캐시 스킵 —
# 재료 코퍼스 대량 백필이 preview 큐를 점령하지 않게. classify → embed/chunk/markdown 유지.
"crawl": ["classify"],
}
next_stages = {
@@ -179,6 +185,7 @@ def _load_workers():
from workers.summarize_worker import process as summarize_process
from workers.thumbnail_worker import process as thumbnail_process
from workers.marker_worker import process as marker_process
from workers.fulltext_worker import process as fulltext_process
return {
"extract": extract_process,
@@ -195,6 +202,9 @@ def _load_workers():
# Phase 1B: classify 완료 후 enqueue. PDF→markdown 변환 (leaf, embed/chunk 와 독립).
# consume_markdown_queue 가 전담 (대형 split 변환이 메인 파이프라인을 막지 않도록).
"markdown": marker_process,
# crawl-24x7 A-2: 기사 페이지 fetch → 4-tier 본문 승격. 후속(summarize/embed/chunk)은
# 워커가 직접 enqueue — next_stages dict 미등록 (enqueue_next_stage no-op).
"fulltext": fulltext_process,
}
+265
View File
@@ -0,0 +1,265 @@
"""C-3 공학 정적 코퍼스 1회 일괄 ingest (plan crawl-24x7-1).
National Board 기술 아티클(~86, ASP.NET 구식 기사 앵커가 싱글쿼트 href) +
TWI Job Knowledge(~153, sitemap 기반). 지속 크롤링이 아니라 아카이브 일괄 +
저빈도 증분 유형 스케줄러 미등록, 수동 CLI:
docker exec hyungi_document_server-fastapi-1 \
python -m workers.static_corpus_ingest --corpus all --limit 3 # 검증용
docker exec -d hyungi_document_server-fastapi-1 \
python -m workers.static_corpus_ingest --corpus all # 전체 (~45분)
-d 백그라운드 실행 중단은 host pkill 아니라 컨테이너 내부 PID kill
([[feedback_docker_exec_orphan_kill]]).
멱등: edit_url(정규화)+file_hash dedup 재실행 = 신규분만 (그대로 monthly 증분 절차).
politeness: fetch_page 재사용 (per-domain 1 + 5~15s jitter + robots).
원본 보존·승격 필드: fulltext_worker 동일 규약 (재추출 가능 상태 유지).
실패는 degrade 없이 skip + 말미 목록 출력 (정적 코퍼스 RSS 요약 같은 격하 대상 부재).
"""
import argparse
import asyncio
import hashlib
import re
from datetime import datetime, timezone
from html import unescape
from sqlalchemy import select
from core.crawl_politeness import CrawlBlocked, CrawlFetchError, CrawlSkip, fetch_page
from core.database import async_session
from core.utils import setup_logger
from models.document import Document
from models.news_source import NewsSource
from models.queue import enqueue_stage
from workers.fulltext_worker import (
_WEB_MIN_BODY_LEN,
_extract_body,
_raw_html_path,
_save_raw_html,
_strip_article_footer,
)
from workers.news_collector import _article_hash, _normalize_url
logger = setup_logger("static_corpus")
_NB_LISTING = "https://www.nationalboard.org/Index.aspx?pageID=164"
_TWI_SITEMAP = "https://www.twi-global.com/sitemap.xml"
async def _discover_national_board() -> list[str]:
"""목록 페이지의 기사 앵커 — 싱글쿼트 href 가 기본형이라 양쪽 인용부호 매칭."""
html_text, _ = await fetch_page(_NB_LISTING)
ids = sorted(
{int(i) for i in re.findall(
r"href=['\"]/?Index\.aspx\?pageID=164&(?:amp;)?ID=(\d+)['\"]", html_text)}
)
return [f"https://www.nationalboard.org/Index.aspx?pageID=164&ID={i}" for i in ids]
async def _discover_twi() -> list[str]:
"""sitemap 에서 job-knowledge 시리즈만 (faqs/published-papers 는 향후 증분 후보)."""
xml_text, _ = await fetch_page(
_TWI_SITEMAP,
content_types=("text/xml", "application/xml", "text/html"),
)
urls = re.findall(
r"<loc>(https://www\.twi-global\.com/technical-knowledge/job-knowledge/[^<]+)</loc>",
xml_text,
)
return sorted({u for u in urls if not u.rstrip("/").endswith("job-knowledge")})
CORPORA = {
"national-board": {
"source_name": "National Board 기술 아티클",
"listing_url": _NB_LISTING,
"discover": _discover_national_board,
"fetch_method": "page",
},
"twi": {
"source_name": "TWI Job Knowledge",
"listing_url": _TWI_SITEMAP,
"discover": _discover_twi,
"fetch_method": "sitemap+page",
},
}
async def _get_or_create_source(session, spec: dict) -> NewsSource:
"""레지스트리 행 — 출처 추적 + crawl_raw src_{id} 경로 + A-8 패널 가시성.
enabled=False: 6h 뉴스 사이클 비대상 (피드가 없는 정적 코퍼스 증분은 CLI 재실행).
"""
result = await session.execute(
select(NewsSource).where(NewsSource.name == spec["source_name"])
)
source = result.scalars().first()
if source is None:
source = NewsSource(
name=spec["source_name"],
feed_url=spec["listing_url"],
feed_type="rss",
fetch_method=spec["fetch_method"],
fulltext_policy="none",
source_channel="crawl",
category="Engineering",
language="en",
country="US" if "national" in spec["source_name"].lower() else "GB",
enabled=False,
)
session.add(source)
await session.flush()
return source
def _page_title(html_text: str, fallback: str) -> str:
m = re.search(r'<meta\s+property="og:title"\s+content="([^"]+)"', html_text)
if not m:
m = re.search(r"<title[^>]*>([^<]+)</title>", html_text, re.I)
title = unescape(m.group(1)).strip() if m else ""
# 사이트 접미 잡음 제거 (TWI 는 ' - TWI', NB 는 'National Board ...' 꼬리표)
title = re.sub(r"\s*[-|·]\s*(TWI|National Board[^-|]*)\s*$", "", title).strip()
return title or fallback
async def _ingest_one(session, source: NewsSource, url: str) -> str:
"""기사 1건. 반환: 'ok' / 'dup' / 'skip'(추출부족·차단)."""
normalized_url = _normalize_url(url)
existing = await session.execute(
select(Document).where(Document.edit_url.in_([normalized_url, url])).limit(1)
)
if existing.scalars().first():
return "dup"
try:
html_text, final_url = await fetch_page(url)
except (CrawlBlocked, CrawlSkip, CrawlFetchError) as e:
logger.warning(f"[{source.name}] fetch 실패 skip: {url}{type(e).__name__}: {e}")
return "skip"
body, engine, engine_ver = _extract_body(html_text)
if not engine:
logger.warning(f"[{source.name}] 추출 실패 skip (< {_WEB_MIN_BODY_LEN}자): {url}")
return "skip"
clean_body = _strip_article_footer(body.replace("\x00", ""))
if len(clean_body) < _WEB_MIN_BODY_LEN:
logger.warning(f"[{source.name}] 푸터 제거 후 본문 부족 skip: {url}")
return "skip"
title = _page_title(html_text, fallback=url.rsplit("/", 1)[-1][:90])
article_id = _article_hash(title, "static", source.name)
dup2 = await session.execute(
select(Document).where(Document.file_hash == article_id).limit(1)
)
if dup2.scalars().first():
return "dup"
now = datetime.now(timezone.utc)
raw_path = _raw_html_path(source.id, article_id, now)
raw_saved = True
try:
_save_raw_html(raw_path, html_text)
except OSError as e:
raw_saved = False
logger.error(f"[{source.name}] 원본 보존 실패 (ingest 는 진행): {e}")
doc = Document(
file_path=f"crawl/{source.name}/{article_id}",
file_hash=article_id,
file_format="article",
file_size=0, # 아래 extracted_text 확정 후 재계산
file_type="note",
title=title,
extracted_text=f"{title}\n\n{clean_body}",
extracted_at=now,
extractor_version=f"static+page@{engine}",
md_content=clean_body,
md_status="success",
md_extraction_engine=engine,
md_extraction_engine_version=engine_ver,
md_format_version="1.0",
md_generated_at=now,
md_source_hash=hashlib.sha256(html_text.encode("utf-8", errors="replace")).hexdigest(),
md_content_hash=hashlib.sha256(clean_body.encode("utf-8")).hexdigest(),
content_origin="extracted",
source_channel="crawl",
data_origin="external",
edit_url=normalized_url,
review_status="approved",
ai_domain="Engineering",
ai_sub_group=source.name,
ai_tags=[f"Engineering/{source.name}"],
extract_meta={
"source_id": source.id,
"source_name": source.name,
"published_at": None, # 정적 코퍼스 — 페이지 발행일 비신뢰, 색인은 채널 게이트로 무조건
"fulltext": {
"status": "static_corpus",
"engine": engine,
"final_url": final_url,
"raw_html_path": str(raw_path) if raw_saved else None,
"body_chars": len(clean_body),
"resolved_at": now.isoformat(),
},
},
)
doc.file_size = len(doc.extracted_text.encode())
session.add(doc)
await session.flush()
# crawl 채널 = 발행일 무관 전량 색인 (summarize 는 맥미니 큐 — D-4 lag 관찰 대상)
await enqueue_stage(session, doc.id, "summarize")
await enqueue_stage(session, doc.id, "embed")
await enqueue_stage(session, doc.id, "chunk")
logger.info(f"[{source.name}] ingest {len(clean_body)}자 ({engine}): {title[:60]}")
return "ok"
async def run(corpus: str = "all", limit: int = 0) -> None:
targets = list(CORPORA) if corpus == "all" else [corpus]
for key in targets:
spec = CORPORA[key]
async with async_session() as session:
source = await _get_or_create_source(session, spec)
await session.commit()
source_id = source.id
try:
urls = await spec["discover"]()
except (CrawlBlocked, CrawlSkip, CrawlFetchError) as e:
logger.error(f"[{spec['source_name']}] 목록 수집 실패 — corpus 건너뜀: {e}")
continue
if limit:
urls = urls[:limit]
logger.info(f"[{spec['source_name']}] 대상 {len(urls)}건 (limit={limit or '없음'})")
counts = {"ok": 0, "dup": 0, "skip": 0}
failed: list[str] = []
for i, url in enumerate(urls, 1):
# 커밋 10건 단위 — 장시간 배치 중단 시 진행분 보존
async with async_session() as session:
src = await session.get(NewsSource, source_id)
status = await _ingest_one(session, src, url)
await session.commit()
counts[status] += 1
if status == "skip":
failed.append(url)
if i % 10 == 0:
logger.info(f"[{spec['source_name']}] 진행 {i}/{len(urls)} {counts}")
logger.info(f"[{spec['source_name']}] 완료: {counts}")
if failed:
logger.warning(
f"[{spec['source_name']}] skip {len(failed)}건 — 재시도는 CLI 재실행(멱등):\n "
+ "\n ".join(failed)
)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="C-3 정적 코퍼스 일괄 ingest")
parser.add_argument("--corpus", choices=[*CORPORA, "all"], default="all")
parser.add_argument("--limit", type=int, default=0, help="corpus 당 상한 (0=전체)")
args = parser.parse_args()
asyncio.run(run(args.corpus, args.limit))
+38 -3
View File
@@ -64,6 +64,11 @@ services:
environment:
- HF_HOME=/models/huggingface
- TORCH_HOME=/models/torch
# D-1 (crawl-24x7): idle-unload 전환 — 영구 점유(~3.5GB) 해제가 90% 봉투의 전제.
# /ready 는 idle 에서도 200 (fastapi depends_on service_healthy 유지).
# 롤백 = MARKER_PRELOAD=1 + MARKER_IDLE_UNLOAD_MINUTES=0.
- MARKER_PRELOAD=0
- MARKER_IDLE_UNLOAD_MINUTES=${MARKER_IDLE_UNLOAD_MINUTES:-30}
volumes:
- ${NAS_NFS_PATH:-/mnt/nas/Document_Server}:/documents:ro
- marker_models:/models
@@ -97,6 +102,11 @@ services:
- WHISPER_MODEL=${WHISPER_MODEL:-large-v3}
- WHISPER_DEVICE=${WHISPER_DEVICE:-cuda}
- WHISPER_COMPUTE_TYPE=${WHISPER_COMPUTE_TYPE:-float16}
# D-1 (crawl-24x7): idle-unload 전환 — 영구 점유(~4GB) 해제가 90% 봉투의 전제.
# 콜드로드 수초~수십 초는 배치 작업이라 무방 (stt_worker read=1800s 가 흡수).
# 롤백 = STT_PRELOAD=1 + STT_IDLE_UNLOAD_MINUTES=0.
- STT_PRELOAD=0
- STT_IDLE_UNLOAD_MINUTES=${STT_IDLE_UNLOAD_MINUTES:-30}
deploy:
resources:
reservations:
@@ -105,9 +115,9 @@ services:
count: 1
capabilities: [gpu]
healthcheck:
# /ready: CUDA 디바이스 + 모델 적재 둘 다 확인. ready=true 만 healthy 처리.
# /health 는 단순 liveness 라 모델 적재 상태도 healthy 로 잡혀 운영 신호로 부적합.
test: ["CMD", "python3", "-c", "import json,urllib.request,sys; r=urllib.request.urlopen('http://localhost:3300/ready'); sys.exit(0 if json.load(r).get('ready') else 1)"]
# D-1: idle-unload 도입으로 '모델 적재' 는 더 이상 상시 상태가 아님 — cuda 가용성만
# healthy 기준. 모델 적재 여부는 /ready 의 models_loaded 필드로 관측(정보성).
test: ["CMD", "python3", "-c", "import json,urllib.request,sys; r=urllib.request.urlopen('http://localhost:3300/ready'); sys.exit(0 if json.load(r).get('cuda') else 1)"]
interval: 30s
timeout: 10s
retries: 3
@@ -229,6 +239,31 @@ services:
- fastapi
restart: unless-stopped
# crawl-24x7 A-8 1차: 전 소스 헬스 패널 — 내부 전용 (읽기 전용 SELECT 만).
# '내부 전용' 성립 구현 = 별도 바인딩뿐 (r4 결정): Tailscale 인터페이스에만 publish.
# 기존 SvelteKit 라우트(vhost=Host 헤더 검사=앱 가드 환원)나 프록시 경로 차단(경로 가드
# 회귀)으로 옮기지 말 것. caddy/home-caddy 라우트 추가 금지. fastapi/postgres 바인딩 선례.
crawl-health:
build: ./services/crawl-health
ports:
- "100.110.63.63:8765:8765"
environment:
- CRAWL_HEALTH_DSN=postgresql://pkm:${POSTGRES_PASSWORD}@postgres:5432/pkm
depends_on:
postgres:
condition: service_healthy
restart: unless-stopped
# crawl-24x7 B-3: 구독 세션 Playwright fetch 격리 — internal-only (host 포트·caddy 라우트 금지).
# 브라우저 hang/크래시가 fastapi APScheduler 를 잠식하지 않게 별도 컨테이너 + mem cap.
# 세션 파일(쿠키=credential 등가물)은 repo 밖 호스트 경로 ro mount (600, gitignore 무관 영역).
playwright-fetcher:
build: ./services/playwright-fetcher
volumes:
- /home/hyungi/.local/share/crawl-auth:/auth:ro
mem_limit: 2g
restart: unless-stopped
caddy:
image: caddy:2
ports:
@@ -10,7 +10,7 @@
import SectionOutline from '$lib/components/SectionOutline.svelte';
import { getViewerType } from '$lib/utils/viewerType';
import { isMdSuccess } from '$lib/utils/mdStatus';
import { buildAnchorMap } from '$lib/utils/outlineAnchors';
import { resolveAnchorMap } from '$lib/utils/resolveAnchorMap';
import { cleanHeading } from '$lib/utils/headingPath';
// 편집 미리보기 전용 plain marked (본문 렌더는 MarkdownDoc 가 담당).
@@ -109,7 +109,7 @@
(s) => !!(cleanHeading(s.section_title) || cleanHeading((s.heading_path || '').split('>').pop() || '')),
),
);
// MarkdownDoc 가 실제 렌더하는 텍스트(anchor offset 기준과 일치해야 함).
// MarkdownDoc 가 실제 렌더하는 텍스트(rail 표시 게이트용).
let mdRenderText = $derived.by(() => {
if (!fullDoc) return '';
if (viewerType === 'pdf') return pdfViewMode === 'markdown' && canShowMarkdown ? (fullDoc.md_content || '') : '';
@@ -117,7 +117,26 @@
if (viewerType === 'hwp-markdown' || viewerType === 'article') return fullDoc.md_content || fullDoc.extracted_text || '';
return '';
});
let anchorMap = $derived(sections.length && mdRenderText ? buildAnchorMap(mdRenderText, sections).anchors : {});
// [g5-t3] basis 는 RENDER SITE 별. anchorMap 을 basis 별로 분리 — 같은 component 가 두 basis 를
// 공유하면(md_content vs extracted_text) trustBE 가 어긋난다.
// - md_content site(pdf-markdown): trustBE=true (BE char_start 1순위, 비면 내부 string-match 폴백).
// - extracted_text site(3-pane markdown): trustBE=false (char_start 는 md_content offset 이라 무효 → 무조건 폴백).
let mdBasisText = $derived.by(() => {
if (!fullDoc) return '';
if (viewerType === 'pdf') return pdfViewMode === 'markdown' && canShowMarkdown ? (fullDoc.md_content || '') : '';
return '';
});
let extractedBasisText = $derived.by(() => {
if (!fullDoc) return '';
if (viewerType === 'markdown') return fullDoc.extracted_text || rawMarkdown || '';
return '';
});
let anchorMapMd = $derived(
sections.length && mdBasisText ? resolveAnchorMap(mdBasisText, sections, { trustBE: true }).anchors : {},
);
let anchorMapExtracted = $derived(
sections.length && extractedBasisText ? resolveAnchorMap(extractedBasisText, sections, { trustBE: false }).anchors : {},
);
let showRail = $derived(outlineSections.length > 0 && !!mdRenderText);
let scrollEl = $state();
@@ -128,7 +147,8 @@
}
// scroll-spy: scrollEl 내 .md-anchor 중 컨테이너 상단(+120) 지난 마지막 = 현재 절.
$effect(() => {
void anchorMap;
void anchorMapMd;
void anchorMapExtracted;
const el = scrollEl;
if (!el) return;
let raf = 0;
@@ -255,7 +275,7 @@
mdStatus={fullDoc.md_status}
mdExtractionError={fullDoc.md_extraction_error}
mdExtractionQuality={fullDoc.md_extraction_quality}
anchorMap={anchorMap}
anchorMap={anchorMapExtracted}
extractedText={fullDoc.extracted_text || rawMarkdown}
class={PROSE}
/>
@@ -280,7 +300,7 @@
mdStatus={fullDoc.md_status}
mdExtractionError={fullDoc.md_extraction_error}
mdExtractionQuality={fullDoc.md_extraction_quality}
anchorMap={anchorMap}
anchorMap={anchorMapMd}
extractedText={fullDoc.extracted_text}
class={PROSE}
/>
@@ -50,7 +50,9 @@
}: Props = $props();
// 개요 anchor 주입: body 의 각 offset(내림차순)에 빈 <span id="sec-N"> 삽입(점프 타깃).
// offset 은 buildAnchorMap 이 body 와 동일 문자열 기준으로 산출했어야 함(호출측 책임).
// [C3 불변식] char_start(BE) 는 호출측이 넘긴 md_content(raw, untransformed)에 대한 UTF-16 offset 이다.
// 이 함수는 그 동일 문자열을 'out' 으로 받아 trim/CRLF-normalize/replace 없이 slice 해야 한다 —
// prop→out 사이 어떤 변환도 char_start 를 drift 시킨다. (현재 out = text(=body=mdContent prop) 무변환.)
function spliceAnchors(text: string, map: Record<number, number> | null): string {
if (!map) return text;
const ents = Object.entries(map)
@@ -69,6 +69,20 @@ test('collapseWindows: 연속 동일 heading window 만 dedupe, 순서 유지',
);
});
test('[C2] collapseWindows: split-parent + window 들 → rail 1행, 대표=split-parent(char_start 보유)', () => {
const input = [
sec({ section_title: 'Article 5', heading_path: 'Article 5', node_type: 'chapter_split', is_leaf: false, char_start: 120 }),
sec({ section_title: 'Article 5', heading_path: 'Article 5', node_type: 'window', is_leaf: true, char_start: null }),
sec({ section_title: 'Article 5', heading_path: 'Article 5', node_type: 'window', is_leaf: true, char_start: null }),
];
const out = collapseWindows(input);
assert.equal(out.length, 1, 'split-parent + 2 window → rail 1행');
// 대표 = split-parent (char_start 보유) → jump 성립
assert.equal(out[0].section.node_type, 'chapter_split');
assert.equal(out[0].section.char_start, 120);
assert.equal(out[0].fragmentCount, 2, 'window 조각 수 = 2 (split-parent 자신 제외)');
});
test('groupOrFlat: 적은 그룹 + 낮은 기타% → group (5140-류)', () => {
// 3 top segment × 4 = 12절, window 없음 → group_count 3, 기타 0%
const sections: DocumentSection[] = [];
+19 -11
View File
@@ -12,8 +12,10 @@ export interface DocumentSection {
section_title: string | null;
heading_path: string | null;
level: number | null;
node_type: string | null; // 'window' | 'section_split' | null
node_type: string | null; // 'window' | 'chapter_split' | 'clause_split' | 'section_split' | null
is_leaf: boolean;
/** md_content 내 heading offset(UTF-16). jump-target 만 값, window-child/preamble/Path A = null (Path B). */
char_start?: number | null;
section_type: string | null;
summary: string | null;
confidence: number | null;
@@ -87,32 +89,38 @@ export function pathSegments(hp: string | null | undefined): string[] {
.filter(Boolean);
}
/** 그룹 키: window/section_split(인공 조각) 또는 path 없음/깨짐 → OTHER. */
/** 그룹 키: window/%_split(인공 조각·windowed split-parent) 또는 path 없음/깨짐 → OTHER. */
function topSegment(s: DocumentSection): string {
if (s.node_type === 'window' || s.node_type === 'section_split') return OTHER;
if (s.node_type === 'window' || !!s.node_type?.endsWith('_split')) return OTHER;
const segs = pathSegments(s.heading_path);
return segs.length === 0 ? OTHER : segs[0];
}
/**
* chunk_index ( ), cleaned heading_path
* node_type='window' 1 dedupe. = ( ), fragmentCount .
* node_type='window' 1 dedupe. fragmentCount = window .
*
* [C2] g4-t2 split-parent(%_split, char_start ) window child ( chunk_index)
* , window child split-parent( legacy window ) rail 1 .
* merged row section = split-parent jump(anchorMap[split-parent char_start])
* window-child(char_start NULL, anchorMap ) windowed section .
* fragmentCount: split-parent 0 ( ) + child = ;
* legacy window 1 ( ).
*/
export function collapseWindows(sections: DocumentSection[]): OutlineItem[] {
const out: OutlineItem[] = [];
for (const s of sections) {
const prev = out[out.length - 1];
const h = cleanHeading(s.heading_path);
if (
s.node_type === 'window' &&
const prevAbsorbs =
prev &&
prev.section.node_type === 'window' &&
(prev.section.node_type === 'window' || !!prev.section.node_type?.endsWith('_split')) &&
h !== '' &&
cleanHeading(prev.section.heading_path) === h
) {
prev.fragmentCount += 1;
cleanHeading(prev.section.heading_path) === h;
if (s.node_type === 'window' && prevAbsorbs) {
prev!.fragmentCount += 1; // window child 흡수 — 대표(split-parent 우선)는 그대로 유지
} else {
out.push({ section: s, fragmentCount: 1 });
out.push({ section: s, fragmentCount: s.node_type?.endsWith('_split') ? 0 : 1 });
}
}
return out;
+3 -2
View File
@@ -69,8 +69,9 @@ export function buildAnchorMap(
let matched = 0;
for (const s of sections) {
// window/section_split 조각은 자체 heading 없음(부모 제목 상속) → 건너뜀.
if (s.node_type === 'window' || s.node_type === 'section_split') continue;
// window 조각 + %_split parent(chapter_split/clause_split/section_split)는 string-match 대상 아님 →
// 건너뜀. (split-parent jump 은 Path B 의 BE char_start 로만 성립; Path A 폴백선 windowed 절 무점프=무회귀.)
if (s.node_type === 'window' || s.node_type?.endsWith('_split')) continue;
let nt = norm(s.section_title);
if (!nt && s.heading_path) {
const last = s.heading_path.split('>').pop();
@@ -0,0 +1,95 @@
// resolveAnchorMap 회귀 테스트 (플랜 ds-outline-anchor-b5 g5-t1 / NEW-5 / B4 / C1).
// 실행: node --test src/lib/utils/resolveAnchorMap.test.ts
import { test } from 'node:test';
import assert from 'node:assert/strict';
import { resolveAnchorMap, isJumpTargetCandidate } from './resolveAnchorMap.ts';
import { type DocumentSection } from './headingPath.ts';
let _id = 0;
function sec(p: Partial<DocumentSection>): DocumentSection {
return {
chunk_id: ++_id,
section_title: null,
heading_path: null,
level: null,
node_type: null,
is_leaf: true,
char_start: null,
section_type: null,
summary: null,
confidence: null,
...p,
};
}
const LONG = 'x'.repeat(500);
test('trustBE=false → 무조건 string-match 폴백(fellBack=true)', () => {
const md = '# Alpha\nbody\n# Beta\nx';
const secs = [sec({ section_title: 'Alpha', char_start: 999 }), sec({ section_title: 'Beta', char_start: 999 })];
const r = resolveAnchorMap(md, secs, { trustBE: false });
assert.equal(r.fellBack, true);
// char_start(999) 무시하고 string-match offset 사용
assert.ok(Object.values(r.anchors).every((o) => o < 50));
});
test('trustBE=true + 모든 jump-target candidate char_start 보유 → BE 채택(fellBack=false)', () => {
const secs = [
sec({ section_title: 'A', char_start: 5, is_leaf: true }),
sec({ section_title: 'B', char_start: 42, is_leaf: true }),
];
const r = resolveAnchorMap(LONG, secs, { trustBE: true });
assert.equal(r.fellBack, false);
assert.equal(r.anchors[secs[0].chunk_id], 5);
assert.equal(r.anchors[secs[1].chunk_id], 42);
assert.equal(r.matched, 2);
});
test('[NEW-5] windowed doc — window-child char_start NULL 이 폴백을 유발하지 않음(split-parent BE 사용)', () => {
const secs = [
sec({ section_title: 'Big', heading_path: 'Big', node_type: 'chapter_split', is_leaf: false, char_start: 10 }),
sec({ section_title: 'Big', heading_path: 'Big', node_type: 'window', is_leaf: true, char_start: null }),
sec({ section_title: 'Big', heading_path: 'Big', node_type: 'window', is_leaf: true, char_start: null }),
];
const r = resolveAnchorMap(LONG, secs, { trustBE: true });
// window-child NULL 은 candidate 가 아니므로 트리거 안 됨 → BE 사용, split-parent 점프 보존
assert.equal(r.fellBack, false, 'window-child NULL 이 whole-doc 폴백을 유발하면 안 됨(NEW-5)');
assert.equal(r.anchors[secs[0].chunk_id], 10, 'split-parent char_start 가 BE 맵에 있어야 함');
// window-child 는 anchor 없음
assert.equal(r.anchors[secs[1].chunk_id], undefined);
});
test('[B4] non-PASS doc — jump-target candidate char_start NULL → string-match 폴백', () => {
const md = '# Gamma\nbody text here\n# Delta\nmore';
const secs = [
sec({ section_title: 'Gamma', is_leaf: true, char_start: null }),
sec({ section_title: 'Delta', is_leaf: true, char_start: null }),
];
const r = resolveAnchorMap(md, secs, { trustBE: true });
assert.equal(r.fellBack, true, 'candidate char_start NULL 이면 폴백해야 함(BE-first not BE-only)');
// string-match 로 실제 jump 산출(0 아님)
assert.ok(r.matched >= 1, 'md-aligned doc 는 폴백 string-match 로 jump 비-0');
});
test('char_start > splicedText.length → 그 anchor 만 비활성, 폴백 안 함', () => {
const secs = [
sec({ section_title: 'A', char_start: 3, is_leaf: true }),
sec({ section_title: 'B', char_start: 100000, is_leaf: true }), // 범위 초과(truncated tail)
];
const short = 'hello world';
const r = resolveAnchorMap(short, secs, { trustBE: true });
assert.equal(r.fellBack, false, '범위 초과는 폴백 트리거 아님(candidate char_start NOT NULL)');
assert.equal(r.anchors[secs[0].chunk_id], 3);
assert.equal(r.anchors[secs[1].chunk_id], undefined, '초과 anchor 는 비활성');
});
test('preamble(title 없음, is_leaf) char_start NULL 은 candidate 아님 → 폴백 유발 X', () => {
const secs = [
sec({ section_title: null, heading_path: null, is_leaf: true, char_start: null }), // preamble
sec({ section_title: 'Real', is_leaf: true, char_start: 7 }),
];
const r = resolveAnchorMap(LONG, secs, { trustBE: true });
assert.equal(isJumpTargetCandidate(secs[0]), false, 'preamble 은 candidate 아님');
assert.equal(r.fellBack, false);
assert.equal(r.anchors[secs[1].chunk_id], 7);
});
@@ -0,0 +1,82 @@
// 개요(절 목차) → 본문 점프 anchor 산출 공유 헬퍼 (경로 B: BE char_start primary + string-match 폴백).
//
// render-site 가 md_content 를 splice 할 때(trustBE=true)는 BE 가 builder 단계에서 박은 char_start 를
// 1순위로 쓰고, 비-md basis(3-pane extracted_text 등, trustBE=false)는 무조건 string-match(buildAnchorMap)로
// 폴백한다. char_start 가 비어 있으면(non-PASS doc, 또는 multi-night 재처리 중 아직 미백필 PASS doc) BE-only
// 가 아니라 string-match 로 graceful degrade 한다(B4: BE-first, NOT BE-only).
//
// ★ NEW-5 (must-not-miss): 폴백 트리거는 JUMP-TARGET-CANDIDATE 한정이다.
// window-child(node_type='window')와 preamble(title 없음)은 char_start=NULL **BY DESIGN**(g2).
// 트리거가 'NULL char_start 가 하나라도 있으면 whole-doc 폴백' 이면, window-child 를 항상 보유한 windowed
// doc 은 매번 폴백 → split-parent char_start(windowed 절의 단일 jump target)를 영영 안 쓰고 →
// buildAnchorMap 은 split-parent 를 skip → windowed 코어 절이 영원히 점프 안 됨 = 이 플랜이 겨냥한
// 바로 그 절에서 Path A 0% 회귀. 따라서 트리거 분모 = jump-target-candidate 뿐.
import { buildAnchorMap } from './outlineAnchors.ts';
import { cleanHeading, type DocumentSection } from './headingPath.ts';
export interface ResolveResult {
/** chunk_id → splicedText 내 char offset (UTF-16). */
anchors: Record<number, number>;
/** jump-target candidate 수(BE 경로) 또는 buildAnchorMap.total(폴백). */
total: number;
/** 실제 anchor 부여 수. */
matched: number;
/** string-match(buildAnchorMap) 로 폴백했는지 — V-rail/검증용. */
fellBack: boolean;
}
/** 표시 가능한 제목(또는 heading_path 말단)이 있는가. */
function hasTitle(s: DocumentSection): boolean {
if (cleanHeading(s.section_title)) return true;
const last = (s.heading_path || '').split('>').pop() || '';
return !!cleanHeading(last);
}
/**
* jump-target candidate = char_start .
* = (-window leaf) OR (%_split parent), .
* window-child(node_type='window')·preamble( ) char_start NULL candidate (NEW-5).
*/
export function isJumpTargetCandidate(s: DocumentSection): boolean {
const structural = (s.is_leaf && s.node_type !== 'window') || !!s.node_type?.endsWith('_split');
return structural && hasTitle(s);
}
export function resolveAnchorMap(
splicedText: string | null | undefined,
sections: DocumentSection[] | null | undefined,
opts: { trustBE: boolean },
): ResolveResult {
const secs = sections ?? [];
// basis 불일치(extracted_text 3-pane 등) → 무조건 string-match.
if (!opts.trustBE) {
const r = buildAnchorMap(splicedText, secs);
return { ...r, fellBack: true };
}
// [B4 + NEW-5] BE-first: jump-target candidate 가 비었거나, candidate 중 char_start NULL 이 있으면 폴백.
// window-child/preamble NULL 은 candidate 가 아니라 트리거에 안 들어간다.
const candidates = secs.filter(isJumpTargetCandidate);
const beUnusable = candidates.length === 0 || candidates.some((s) => s.char_start == null);
if (beUnusable) {
const r = buildAnchorMap(splicedText, secs);
return { ...r, fellBack: true };
}
// BE char_start 채택 (C1: window/null/no-title 제외 = candidate 집합과 동일).
const anchors: Record<number, number> = {};
const limit = (splicedText ?? '').length;
let matched = 0;
for (const s of candidates) {
const cs = s.char_start as number;
// char_start<=splicedText.length 가드(MarkdownDoc.svelte:58). 초과 = FE serve-truncate tail →
// 그 anchor 만 비활성(폴백 안 함 — string-match 도 truncated tail 은 못 찾음).
if (Number.isFinite(cs) && cs >= 0 && cs <= limit) {
anchors[s.chunk_id] = cs;
matched++;
}
}
return { anchors, total: candidates.length, matched, fellBack: false };
}
@@ -7,7 +7,7 @@
import { goto } from '$app/navigation';
import { api, getAccessToken } from '$lib/api';
import { isMdSuccess } from '$lib/utils/mdStatus';
import { buildAnchorMap } from '$lib/utils/outlineAnchors';
import { resolveAnchorMap } from '$lib/utils/resolveAnchorMap';
import { addToast } from '$lib/stores/toast';
import { marked } from 'marked';
import DOMPurify from 'dompurify';
@@ -164,11 +164,12 @@
}
});
// ── 개요 점프 (outlineAnchors, 경로 A) ──
// anchorMap = md_content 의 각 절 heading offset. MarkdownDoc 가 <span id="sec-N"> 주입.
// ── 개요 점프 (경로 B: BE char_start primary + string-match 폴백) ──
// 이 사이트는 항상 md_content basis(canShowMarkdown && doc.md_content) → trustBE=true.
// BE char_start 가 있으면 채택, 비면(non-PASS/미백필) resolveAnchorMap 내부에서 buildAnchorMap 로 폴백.
let anchorMap = $derived(
hasSections && canShowMarkdown && doc?.md_content
? buildAnchorMap(doc.md_content, sections).anchors
? resolveAnchorMap(doc.md_content, sections, { trustBE: true }).anchors
: {}
);
let activeKey = $state(null);
@@ -411,11 +412,13 @@
</span>
</div>
{#if doc.md_content || doc.extracted_text}
<!-- article = 텍스트 네이티브(markdown 변환 비대상). md_status='skipped' 라도
"Markdown 제외" badge 를 띄우지 않도록 mdStatus 미전달(badge 는 mdStatus 로만 구동). -->
<MarkdownDoc
documentId={doc.id}
mdContent={doc.md_content}
mdFrontmatter={doc.md_frontmatter}
mdStatus={doc.md_status}
mdStatus={null}
mdExtractionError={doc.md_extraction_error}
mdExtractionQuality={doc.md_extraction_quality}
extractedText={doc.extracted_text}
@@ -0,0 +1,15 @@
-- 318_document_chunks_char_start.sql
-- 플랜 ds-outline-anchor-b5 (Path B, g1-t1): hier 절 → md_content 본문 점프용 offset 컬럼.
--
-- char_start = md_content 내 heading 라인 시작 offset, **UTF-16 code unit** 기준
-- (FE outlineAnchors.ts:64 `off += raw.length + 1` / MarkdownDoc.svelte:63 `out.slice(off)` 와 동일 단위).
-- NULL 허용 = (a) md_content 없음(legacy/news/Path A) (b) window-child(node_type='window') (c) preamble(title NULL).
-- → jump-target(비-window leaf OR %_split parent)만 NOT NULL 을 받는다(BY DESIGN, B1/B3 완료마커 기준).
--
-- 두 backfill 경로 공통 prereq:
-- - UPDATE-only path(g3-tU, hash_stable): 저장된 hier 행에 char_start 만 UPDATE (DELETE/CASCADE/재임베딩 0).
-- - re-decompose path(g3-t2, hash_changed): persist INSERT 시 char_start 동봉.
--
-- 멱등: ADD COLUMN IF NOT EXISTS + init_db version-skip + pg_advisory_xact_lock. BEGIN/COMMIT 금지(단일 statement).
ALTER TABLE document_chunks ADD COLUMN IF NOT EXISTS char_start INTEGER NULL;
@@ -0,0 +1,19 @@
-- A-3 (plan crawl-24x7-1): 소스 레지스트리 증축 — additive only.
-- fetch_method : rss / rss+page / sitemap+page / page / api / signal-only
-- fulltext_policy : none(현행 유지) / page(기사 페이지 fetch 후 4-tier 승격) / feed-full(피드 본문이 전문)
-- auth_profile : NULL=공개, 값=구독 세션 키 (B-3 Playwright 어댑터용 슬롯)
-- poll_interval_minutes : 소스별 차등 폴링 (NULL=전역 6h 사이클)
-- etag / last_modified : 조건부 GET 워터마크 — 받은 그대로 저장·재전송 (상태는 전부 DB, APScheduler in-process)
-- feed_content_hash : CDN ETag 회전 대비 콘텐츠 해시 변경감지 병행
-- selector_override : 추출 실패 잦은 소스의 site-specific CSS selector (JSONB)
-- parser_quirk : rdf / table-strip / gn-redirect 등 파서 특이 케이스
ALTER TABLE news_sources
ADD COLUMN IF NOT EXISTS fetch_method VARCHAR(20) NOT NULL DEFAULT 'rss',
ADD COLUMN IF NOT EXISTS fulltext_policy VARCHAR(20) NOT NULL DEFAULT 'none',
ADD COLUMN IF NOT EXISTS auth_profile VARCHAR(50),
ADD COLUMN IF NOT EXISTS poll_interval_minutes INTEGER,
ADD COLUMN IF NOT EXISTS etag TEXT,
ADD COLUMN IF NOT EXISTS last_modified TEXT,
ADD COLUMN IF NOT EXISTS feed_content_hash VARCHAR(64),
ADD COLUMN IF NOT EXISTS selector_override JSONB,
ADD COLUMN IF NOT EXISTS parser_quirk VARCHAR(30);
@@ -0,0 +1,3 @@
-- 0-5 (a) 확정 (plan crawl-24x7-1): 도메인 자료(안전/공학/철학) 채널 신설 — news 와 분리.
-- 신규 값은 같은 트랜잭션 내 사용 금지 (PG 제약) — 본 배치의 다른 마이그레이션은 'crawl' 미사용.
ALTER TYPE source_channel ADD VALUE IF NOT EXISTS 'crawl';
@@ -0,0 +1,3 @@
-- A-2 (plan crawl-24x7-1): RSS 요약 → 기사 페이지 fetch → 4-tier 본문 승격 stage.
-- fulltext_policy='page' 소스의 기사에만 news_collector 가 enqueue.
ALTER TYPE process_stage ADD VALUE IF NOT EXISTS 'fulltext';
+19
View File
@@ -0,0 +1,19 @@
-- A-5 (plan crawl-24x7-1): 소스 건강 — 소스별 실패 격리 기록 + circuit breaker.
-- 한 소스가 죽어도 나머지 영향 0. silent skip 누적 방지의 가시성 기반 (A-8 패널이 읽음).
-- circuit_state: closed(정상) / open(연속 실패로 지수 backoff 중) / disabled(M회 초과, 수동 복구 대상)
-- empty_streak : 200 인데 entries 0 인 연속 fetch 횟수 (피드 부패 감시 — 304/해시동일은 미집계)
CREATE TABLE IF NOT EXISTS source_health (
id SERIAL PRIMARY KEY,
source_id INTEGER NOT NULL REFERENCES news_sources(id) ON DELETE CASCADE,
consecutive_failures INTEGER NOT NULL DEFAULT 0,
total_fetches BIGINT NOT NULL DEFAULT 0,
total_failures BIGINT NOT NULL DEFAULT 0,
last_success_at TIMESTAMPTZ,
last_error TEXT,
last_error_at TIMESTAMPTZ,
last_fetch_items INTEGER,
empty_streak INTEGER NOT NULL DEFAULT 0,
circuit_state VARCHAR(10) NOT NULL DEFAULT 'closed',
circuit_opened_at TIMESTAMPTZ,
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
@@ -0,0 +1,2 @@
-- A-5: source_health 는 news_sources 와 1:1 — upsert 기준 키.
CREATE UNIQUE INDEX IF NOT EXISTS uq_source_health_source_id ON source_health (source_id);
@@ -0,0 +1,5 @@
-- B/C 그룹 (plan crawl-24x7-1, 0-5 확정): 레지스트리에 채널 컬럼 — additive only.
-- documents.source_channel 과 동일 enum 재사용 ('crawl' 값은 320 에서 별도 트랜잭션으로 추가 완료).
-- 기존 행 전부 'news' 기본값 = 무회귀. crawl 채널 소스의 문서 생성/색인 게이트 분기 기준.
ALTER TABLE news_sources
ADD COLUMN IF NOT EXISTS source_channel source_channel NOT NULL DEFAULT 'news';
@@ -0,0 +1,8 @@
-- B-3 (plan crawl-24x7-1): 구독 세션 상태 노출 계약 — additive only.
-- relogin_requested: 쓰기 1종 플래그 (A-8 버튼이 기록, 어댑터가 소비 = 수동 half-open).
-- 소비 위치 함정(r5 고정): open-스킵 분기보다 앞 — 어댑터 틱마다 확인.
-- last_probe_at/ok: 내용 기반 probe 결과 (시간 기반 만료 판정 금지 — silent corruption 차단).
ALTER TABLE source_health
ADD COLUMN IF NOT EXISTS relogin_requested BOOLEAN NOT NULL DEFAULT FALSE,
ADD COLUMN IF NOT EXISTS last_probe_at TIMESTAMPTZ,
ADD COLUMN IF NOT EXISTS last_probe_ok BOOLEAN;
@@ -0,0 +1,33 @@
-- crawl-24x7 사이클 2 소스 seed (B-2 + C-1 안전 + C-5 철학) — 2026-06-10 전 URL live 검증.
-- 262 선례: WHERE NOT EXISTS idempotent, 기존 행 보존, 신규만 insert (단일 statement).
-- 채널: news = 다이제스트/브리핑 대상 / crawl = 도메인 재료 (0-5 분리).
-- 정책: feed-full = 피드 본문이 전문 (UK HSE content:encoded 실측) / page = 기사 페이지 4-tier 승격.
-- EU-OSHA 는 후보 등재만 (enabled=false — 카드 C-1 '우선순위 낮음').
-- 르몽드 B-3 활성화는 seed 아님 — 세션 박제 후 runtime UPDATE (auth_profile/selector_override).
INSERT INTO news_sources
(name, country, language, feed_type, feed_url, category, enabled,
fetch_method, fulltext_policy, source_channel, parser_quirk)
SELECT v.name, v.country, v.language, v.feed_type, v.feed_url, v.category, v.enabled,
v.fetch_method, v.fulltext_policy, v.source_channel::source_channel, v.parser_quirk
FROM (VALUES
-- B-2: Guardian Open Platform (전문 JSON — 스크래핑 불요, GUARDIAN_API_KEY 필요)
('Guardian World', 'GB', 'en', 'api', 'https://content.guardianapis.com/search?section=world', 'International', true, 'api', 'none', 'news', NULL),
-- C-1 안전 (Safety)
('UK HSE Press', 'GB', 'en', 'rss', 'https://press.hse.gov.uk/feed/', 'Safety', true, 'rss', 'feed-full', 'crawl', NULL),
('안전신문', 'KR', 'ko', 'rss', 'https://www.safetynews.co.kr/rss/allArticle.xml', 'Safety', true, 'rss', 'page', 'crawl', NULL),
('고용노동부 공지', 'KR', 'ko', 'rss', 'https://www.moel.go.kr/rss/notice.do', 'Safety', true, 'rss', 'page', 'crawl', NULL),
('고용노동부 정책', 'KR', 'ko', 'rss', 'https://www.moel.go.kr/rss/policy.do', 'Safety', true, 'rss', 'page', 'crawl', NULL),
('고용노동부 입법행정예고', 'KR', 'ko', 'rss', 'https://www.moel.go.kr/rss/lawinfo.do', 'Safety', true, 'rss', 'page', 'crawl', NULL),
('OSHA QuickTakes', 'US', 'en', 'rss', 'https://www.osha.gov/sites/default/files/quicktakes.xml', 'Safety', true, 'rss', 'page', 'crawl', NULL),
('EU-OSHA News', 'EU', 'en', 'rss', 'https://osha.europa.eu/en/rss-feeds/latest/news.xml', 'Safety', false, 'rss', 'page', 'crawl', NULL),
-- C-5 철학 (Philosophy)
('SEP 신규·개정', 'US', 'en', 'rss', 'https://plato.stanford.edu/rss/sep.xml', 'Philosophy', true, 'rss', 'page', 'crawl', NULL),
('1000-Word Philosophy', 'US', 'en', 'rss', 'https://1000wordphilosophy.com/feed/', 'Philosophy', true, 'rss', 'feed-full', 'crawl', NULL),
('Doing Philosophy', 'KR', 'ko', 'rss', 'https://doingphilosophy.kr/feed', 'Philosophy', true, 'rss', 'page', 'crawl', NULL),
('Aeon', 'GB', 'en', 'rss', 'https://aeon.co/feed.rss', 'Philosophy', true, 'rss', 'page', 'crawl', 'skip-video'),
('Psyche', 'GB', 'en', 'rss', 'https://psyche.co/feed.rss', 'Philosophy', true, 'rss', 'page', 'crawl', 'skip-video')
) AS v(name, country, language, feed_type, feed_url, category, enabled,
fetch_method, fulltext_policy, source_channel, parser_quirk)
WHERE NOT EXISTS (
SELECT 1 FROM news_sources ns WHERE ns.name = v.name
);
+110
View File
@@ -0,0 +1,110 @@
"""HWP(library) 백필 — 지정 PKM 폴더의 .hwp 를 content-hash dedup 후 일회성 ingest.
산업안전기사 외부 학습자료(category='library') 코퍼스에 편입한다. file_watcher
PKM 트랙 로직을 재사용하되 dedup file_path 아닌 **file_hash** 기준으로 해서
(a) Inbox 중복 (b) `_1`/`카피본` 사본을 1건으로 수렴시킨다(file_watcher path dedup 이라
동일내용 다른경로를 중복 ingest ). 이후 파이프라인:
extract(텍스트) classify embed/chunk(검색) markdown(.hwp=pyhwp hwp5html + raster NAS 영속)
실행 (GPU 서버):
# dry-run (기본) — 무엇이 ingest/skip 될지만 출력
docker exec hyungi_document_server-fastapi-1 \
python /app/scripts/backfill_hwp_library.py --subdir Knowledge/Engineering
# 실제 ingest
docker exec hyungi_document_server-fastapi-1 \
python /app/scripts/backfill_hwp_library.py --subdir Knowledge/Engineering --commit
"""
import argparse
import asyncio
import sys
from pathlib import Path
from sqlalchemy import select
from core.config import settings
from core.database import async_session
from core.utils import file_hash
from models.document import Document
from models.queue import enqueue_stage
async def run(subdir: str, commit: bool) -> int:
nas_root = Path(settings.nas_mount_path)
scan_root = nas_root / "PKM" / subdir
if not scan_root.exists():
print(f"[backfill] scan_root 부재: {scan_root}", file=sys.stderr)
return 2
files = sorted(
p for p in scan_root.rglob("*") if p.is_file() and p.suffix.lower() == ".hwp"
)
print(f"[backfill] {scan_root} 하위 .hwp {len(files)}개 발견 / mode={'COMMIT' if commit else 'DRY-RUN'}")
ingested = skipped_existing = skipped_batchdup = 0
seen_hashes: set[str] = set()
async with async_session() as session:
for fp in files:
rel_path = str(fp.relative_to(nas_root))
fhash = file_hash(fp)
if fhash in seen_hashes:
print(f" SKIP(batch-dup) {rel_path}")
skipped_batchdup += 1
continue
seen_hashes.add(fhash)
# content-hash dedup (path 무관) — Inbox 중복 + _1/카피본 사본 흡수
existing = (
await session.execute(
select(Document.id).where(Document.file_hash == fhash).limit(1)
)
).first()
if existing:
print(f" SKIP(exists id={existing[0]}) {rel_path}")
skipped_existing += 1
continue
ingested += 1
if not commit:
print(f" INGEST(dry) {rel_path}")
continue
doc = Document(
file_path=rel_path,
file_hash=fhash,
file_format="hwp",
file_size=fp.stat().st_size,
file_type="immutable",
title=fp.stem,
source_channel="drive_sync",
category="library",
needs_conversion=False,
)
session.add(doc)
await session.flush()
await enqueue_stage(session, doc.id, "extract")
print(f" INGEST id={doc.id} {rel_path}")
if commit:
await session.commit()
print(
f"[backfill] done — ingest={ingested} "
f"skip_existing={skipped_existing} skip_batchdup={skipped_batchdup}"
)
return 0
def main() -> int:
ap = argparse.ArgumentParser(description=__doc__)
ap.add_argument("--subdir", default="Knowledge/Engineering", help="PKM 하위 스캔 폴더")
ap.add_argument("--commit", action="store_true", help="실제 ingest (없으면 dry-run)")
args = ap.parse_args()
return asyncio.run(run(args.subdir, args.commit))
if __name__ == "__main__":
sys.exit(main())
+59
View File
@@ -0,0 +1,59 @@
"""B-3 구독 세션 1회 수동 박제 (MacBook 등 GUI 머신에서 실행).
르몽드 = Google OAuth 자동화 브라우저 로그인은 Google 차단하므로
로그인 자체는 항상 사람이 headed 브라우저에서 수행하고, 스크립트는
결과(쿠키+localStorage = storage_state JSON) 박제한다.
사용 (MacBook):
pip install playwright && playwright install chromium
python scripts/capture_subscription_session.py --profile lemonde --url https://www.lemonde.fr
1) 떠오른 브라우저에서 직접 로그인 (Google OAuth 포함)
2) 로그인 완료 확인 터미널에서 Enter
3) ~/.local/share/crawl-auth/lemonde.json 저장 (600)
GPU 반영:
ssh gpu 'mkdir -p ~/.local/share/crawl-auth && chmod 700 ~/.local/share/crawl-auth'
scp ~/.local/share/crawl-auth/lemonde.json gpu:.local/share/crawl-auth/
ssh gpu 'chmod 600 ~/.local/share/crawl-auth/lemonde.json'
세션 만료 재로그인도 동일 절차 + source_health.relogin_requested 플래그 set
(어댑터가 다음 틱에 half-open probe 소비).
주의: storage_state = credential 등가물. repo ·백업 대상 경로에 두지 .
"""
import argparse
from pathlib import Path
from playwright.sync_api import sync_playwright
AUTH_DIR = Path.home() / ".local" / "share" / "crawl-auth"
def main() -> None:
parser = argparse.ArgumentParser(description="B-3 구독 세션 storage_state 박제")
parser.add_argument("--profile", required=True, help="예: lemonde")
parser.add_argument("--url", required=True, help="로그인 시작 페이지")
args = parser.parse_args()
AUTH_DIR.mkdir(parents=True, exist_ok=True)
AUTH_DIR.chmod(0o700)
out = AUTH_DIR / f"{args.profile}.json"
with sync_playwright() as pw:
browser = pw.chromium.launch(headless=False)
context = browser.new_context(viewport={"width": 1366, "height": 900})
page = context.new_page()
page.goto(args.url)
print(f"\n브라우저에서 로그인을 완료한 뒤 이 터미널에서 Enter 를 누르세요.")
input("로그인 완료 후 Enter > ")
context.storage_state(path=str(out))
browser.close()
out.chmod(0o600)
print(f"저장: {out} (600)")
print("다음: scp 로 GPU ~/.local/share/crawl-auth/ 반영 + chmod 600")
if __name__ == "__main__":
main()
+203
View File
@@ -0,0 +1,203 @@
"""hier 개요 keep-better 게이트 + g-measure 엔진 (플랜 ds-outline-anchor-b5 g6-t1 / gm-t1).
READ-ONLY dry-run. doc 별로:
(A) 저장 hier 절제목 (source_type='hier_section', char_start IS NULL = extracted_text )
(B) build_hier_tree(md_content) 절제목 (= g2 builder: split('\n')+UTF-16+fence skip)
비교해 산출:
- verdict {B_better, A_better, equivalent} (+ junk-heading 검출 A_better 보호)
- B_jumptarget_count (build jump-target node ) B3 게이트 입력
- hash_stable 판정 UPDATE-only(g3-tU) vs re-decompose(g3-t2) 라우팅:
* hash_stable_strict = build(md) 저장 hier hash position-by-position 100% 재현
(= 런타임 g3-tU UPDATE-only 처리할 정확한 집합; demote )
* hash_stable_99 = >=99% 재현 ( MEASURE2 분류 기준 비교용)
- dup_title_count / has_fence (measure3 budget note: fence 보유 doc builder 에서 hash_changed flip 가능)
- REFINED PASS = (verdict B>=A) AND (B_jumptarget>=1)
gm-t1 재확인( 빌드의 유일 잔여 측정): g2 builder 코딩 1 실행 REFINED PASS
hash_changed(=re-decompose) count ~230 인지 확인(코드펜스-skip 으로 32 2 flip 최대 ~232 수용).
실행 (GPU 서버, 컨테이너):
docker compose exec -T fastapi python /app/scripts/hier_outline_quality_gate.py run
docker compose exec -T fastapi python /app/scripts/hier_outline_quality_gate.py run --json /tmp/measure.json
docker compose exec -T fastapi python /app/scripts/hier_outline_quality_gate.py run --doc 5140,5209,5165 # 코어 spot-check
"""
import argparse
import asyncio
import json
import os
import re
import sys
from collections import Counter
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from sqlalchemy import text
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
from services.hier_decomp.builder import build_hier_tree
def _is_jump_target(node) -> bool:
"""jump-target = 비-window leaf OR %_split parent + 제목 보유 (resolveAnchorMap / _JUMP_TARGET_PRED 일치)."""
structural = (node.is_leaf and node.node_type != "window") or bool(
node.node_type and node.node_type.endswith("_split"))
return structural and bool(node.section_title)
# cover/TOC org-이름 junk 검출 (g6-t1 high-recall): 회사명 접미사 + 거의-전부-대문자.
_JUNK_ORG = re.compile(r"\b(INC\.?|LLC|L\.L\.C|CORP\.?|CO\.,?\s*LTD|CONSULTING|COMPANY|LIMITED|LTD\.?)\b", re.I)
_FENCE_ANY = re.compile(r"(?m)^\s{0,3}(```|~~~)")
def _looks_junk(title: str | None) -> bool:
if not title:
return False
if _JUNK_ORG.search(title):
return True
letters = [c for c in title if c.isalpha()]
if len(letters) >= 6 and sum(1 for c in letters if c.isupper()) / len(letters) >= 0.85:
return True
return False
def _make_engine():
return create_async_engine(os.environ["DATABASE_URL"], pool_pre_ping=True)
async def _measure_doc(session, doc_id):
md = await session.scalar(text("SELECT md_content FROM documents WHERE id=:d"), {"d": doc_id})
stored = (await session.execute(text("""
SELECT chunk_index, chunk_content_hash, node_type, is_leaf, section_title, char_start
FROM document_chunks WHERE doc_id=:d AND source_type='hier_section'
ORDER BY chunk_index"""), {"d": doc_id})).mappings().all()
if not stored:
return None
res = {"doc_id": doc_id, "n_stored": len(stored)}
if not md or not md.strip():
res.update({"md_null": True, "verdict": "A_better", "b_jumptarget": 0,
"hash_stable_strict": False, "refined_pass": False})
return res
nodes = build_hier_tree(md)
jt = [n for n in nodes if _is_jump_target(n)]
titles = [n.section_title for n in jt]
res["n_build"] = len(nodes)
res["b_jumptarget"] = len(jt)
res["dup_title"] = len(titles) - len(set(titles))
res["has_fence"] = bool(_FENCE_ANY.search(md))
res["len_md"] = len(md)
# hash 비교 (position-aligned, runtime g3-tU 기준).
if len(nodes) == len(stored):
mism = sum(1 for n, s in zip(nodes, stored)
if n.chunk_content_hash != s["chunk_content_hash"])
frac = (len(stored) - mism) / len(stored)
res["hash_match_frac"] = round(frac, 4)
res["hash_stable_strict"] = (mism == 0)
res["hash_stable_99"] = (frac >= 0.99)
else:
res["hash_match_frac"] = 0.0
res["hash_stable_strict"] = False
res["hash_stable_99"] = False
stored_titles = {s["section_title"] for s in stored if s["section_title"]}
res["junk_b"] = any(_looks_junk(n.section_title) and n.section_title not in stored_titles for n in nodes)
# verdict 휴리스틱 (high-recall junk 보호 + absent-structure → A_better).
# MEASURE2 가 canonical 분포를 이미 박제 — 이 verdict 는 재현/감사용. 애매(notes:ambiguous)는 PASS 미차단.
# ★ apples-to-apples: 양쪽 모두 JUMP-TARGET 수로 비교(stored leaf 전수 X — window-child 가 n_a 를 부풀려
# windowed doc 을 거짓 A_better 로 떨구는 bias 제거). stored jump-target = (비-window leaf OR %_split) + 제목.
def _stored_is_jt(s):
st = (s["is_leaf"] and s["node_type"] != "window") or bool(
s["node_type"] and s["node_type"].endswith("_split"))
return st and bool(s["section_title"])
n_a = sum(1 for s in stored if _stored_is_jt(s))
res["a_jumptarget"] = n_a
n_b = res["b_jumptarget"]
if n_b == 0:
res["verdict"] = "A_better" # B 개요 없음(빈 jump-target)
elif res["junk_b"]:
res["verdict"] = "A_better" # B 가 cover junk 도입
elif n_b >= max(1, n_a * 0.7):
res["verdict"] = "B_better" if n_b > n_a else "equivalent"
else:
res["verdict"] = "A_better" # B 가 구조 상실(5209 absent-class)
res["notes"] = "absent_or_degraded"
res["refined_pass"] = res["verdict"] in ("B_better", "equivalent") and n_b >= 1
return res
async def cmd_run(args):
doc_ids = [int(x) for x in args.doc.split(",") if x.strip()] if args.doc else None
engine = _make_engine()
sm = async_sessionmaker(engine, expire_on_commit=False)
try:
async with sm() as session:
if doc_ids is None:
doc_ids = [r[0] for r in (await session.execute(text(
"SELECT DISTINCT doc_id FROM document_chunks WHERE source_type='hier_section' ORDER BY doc_id"))).all()]
results = []
for d in doc_ids:
r = await _measure_doc(session, d)
if r is not None:
results.append(r)
finally:
await engine.dispose()
total = len(results)
md_null = [r for r in results if r.get("md_null")]
measured = [r for r in results if not r.get("md_null")]
passes = [r for r in measured if r.get("refined_pass")]
pass_jt0 = [r for r in measured if r["verdict"] in ("B_better", "equivalent") and r["b_jumptarget"] == 0]
hash_stable = [r for r in passes if r.get("hash_stable_strict")]
hash_stable_99 = [r for r in passes if r.get("hash_stable_99")]
hash_changed = [r for r in passes if not r.get("hash_stable_strict")]
verdict_dist = Counter(r["verdict"] for r in measured)
dup_among_stable = [r for r in hash_stable if r.get("dup_title", 0) > 0]
fence_among_stable = [r for r in hash_stable if r.get("has_fence")]
print("=" * 64)
print(f"hier doc 측정: {total} (md_null {len(md_null)}, measured {len(measured)})")
print(f"verdict 분포: {dict(verdict_dist)}")
print(f"B_jumptarget==0 (PASS-verdict 이나 빈 jump-target, B3 HOLD): {len(pass_jt0)}")
print("-" * 64)
print(f"REFINED PASS = (verdict B>=A) AND (B_jumptarget>=1): {len(passes)}")
print(f" ├─ hash_stable (strict 100% position 재현 = g3-tU UPDATE-only): {len(hash_stable)}")
print(f" │ dup_title>0: {len(dup_among_stable)} / has_fence: {len(fence_among_stable)}")
print(f" │ (참고) hash_stable_99(원 MEASURE2 기준): {len(hash_stable_99)}")
print(f" └─ hash_changed (re-decompose 대상, g3-t2 --reprocess): {len(hash_changed)} ← ★ '230' 재확인 수치")
print("-" * 64)
print(f" re-decompose --doc(B_jumptarget>=1) = {','.join(str(r['doc_id']) for r in hash_changed) or '(없음)'}")
print(f" UPDATE-only --doc(hash_stable) = {','.join(str(r['doc_id']) for r in hash_stable) or '(없음)'}")
if md_null:
print(f" md_null(suspect, V4): {[r['doc_id'] for r in md_null]}")
print("=" * 64)
print("NOTE: '230' 은 hash_changed PASS 수치. 코드펜스-skip 으로 hash_stable 32 중 fence 보유분(measure3=2)이 "
"hash_changed 로 flip 가능 → 230~232 수용(NEW-3 budget-only, 정확성은 g3-tU 런타임 100% VERIFY 가 보증).")
if args.json:
with open(args.json, "w") as f:
json.dump({"summary": {
"total": total, "measured": len(measured), "refined_pass": len(passes),
"hash_stable": len(hash_stable), "hash_changed": len(hash_changed),
"b_jumptarget_0": len(pass_jt0), "md_null": [r["doc_id"] for r in md_null],
"hash_changed_doc_ids": [r["doc_id"] for r in hash_changed],
"hash_stable_doc_ids": [r["doc_id"] for r in hash_stable],
}, "docs": results}, f, ensure_ascii=False, indent=2)
print(f"[json] {args.json} 기록 ({len(results)} doc)")
def main():
ap = argparse.ArgumentParser(description="hier 개요 keep-better 게이트 + g-measure (read-only)")
sub = ap.add_subparsers(dest="cmd", required=True)
p = sub.add_parser("run", help="전체(또는 --doc) 측정 + 분포 출력")
p.add_argument("--doc", default=None, help="comma-sep doc id (미지정=전 hier doc)")
p.add_argument("--json", default=None, help="per-doc 결과 JSON 덤프 경로")
args = ap.parse_args()
asyncio.run({"run": cmd_run}[args.cmd](args))
if __name__ == "__main__":
main()
+154 -19
View File
@@ -29,6 +29,7 @@ from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
from ai.client import AIClient, parse_json_response, strip_thinking
from core.config import settings
from services.hier_decomp.builder import build_hier_tree
from services.hier_decomp.persist import persist_hier_tree
from services.search.llm_gate import Priority, acquire_mlx_gate
@@ -42,27 +43,48 @@ DOC_MIN_CHARS = 4000 # hier 분해가 의미 있는 doc 크기 하한(STRUCTUR
BUFFER_MIN = 10 # deadline 이 만큼 전 안전 중단
def _candidate_sql(allowlist, doc_ids=None):
"""allowlist 있으면 그 domain 만, 없으면 EXCLUDE_DOMAINS(news) 제외 전부.
doc_ids 명시 = doc (크기 게이트 DOC_MIN_CHARS + domain 필터 우회
구조화 소형 문서(법령 ) eval coverage 보정용. NOT EXISTS hier 멱등 가드는 유지).
작은 doc 먼저 = 완료 doc 최대화 + 단일 mega-doc 예산 독식 방지."""
# jump-target = 비-window leaf OR %_split parent (B1/B3 완료마커 + B_jumptarget 분모, 플랜 g3-t2).
# 이 집합만 char_start 를 받는다(window-child/preamble 은 설계상 NULL).
_JUMP_TARGET_PRED = r"((c.is_leaf AND c.node_type IS DISTINCT FROM 'window') OR c.node_type LIKE '%\_split' ESCAPE '\')"
def _candidate_sql(allowlist, doc_ids=None, reprocess=False):
"""body = d.md_content (g0-t1: hier 출처 md_content 영구확정 — extracted_text 폐기. char_start 가
md_content offset 이라 FE splice basis 일치해야 하므로 분해 source md_content 여야 [F1]).
reprocess=False (additive): 아직 hier 없는 doc 신규 분해 (NOT EXISTS hier_section 멱등).
reprocess=True (re-decompose): hier 있으나 jump-target char_start 아직 채워진 doc 재분해.
[B1] 완료마커 = jump-target char_start NOT NULL 행이 존재(= 재분해되면 atomic 하게 전부 채워짐);
window-child/preamble 설계상 NULL 이라 'all-leaf NOT NULL' 마커의 무한 trap 피한다.
[B3] jump-target doc(B_jumptarget==0) NOT EXISTS vacuous TRUE 영구 재선택 trap
호출측이 --doc REFINED PASS(B_jumptarget>=1) 제한해 차단(--reprocess --doc 필수, REFUSE).
doc_ids 명시 크기 게이트 우회. 작은 doc 먼저 = 완료 doc 최대화."""
if doc_ids:
cond, gate = "d.id = ANY(:doc_ids)", "" # 명시 doc = 크기 게이트 우회
else:
cond = ("lower(split_part(coalesce(d.ai_domain,''), '/', 1)) = ANY(:domains)"
if allowlist else
"lower(split_part(coalesce(d.ai_domain,''), '/', 1)) <> ALL(:exclude)")
gate = "AND length(d.extracted_text) > :minchars"
gate = "AND length(d.md_content) > :minchars"
if reprocess:
marker = f"""
AND EXISTS (SELECT 1 FROM document_chunks dc
WHERE dc.doc_id = d.id AND dc.source_type = 'hier_section')
AND NOT EXISTS (SELECT 1 FROM document_chunks c
WHERE c.doc_id = d.id AND c.source_type = 'hier_section'
AND c.char_start IS NOT NULL AND {_JUMP_TARGET_PRED})"""
else:
marker = """
AND NOT EXISTS (SELECT 1 FROM document_chunks dc
WHERE dc.doc_id = d.id AND dc.source_type = 'hier_section')"""
return text(f"""
SELECT d.id AS doc_id, d.extracted_text AS body, d.ai_domain AS ai_domain
SELECT d.id AS doc_id, d.md_content AS body, d.ai_domain AS ai_domain
FROM documents d
WHERE d.extracted_text IS NOT NULL
WHERE d.md_content IS NOT NULL AND length(d.md_content) > 0
{gate}
AND {cond}
AND NOT EXISTS (SELECT 1 FROM document_chunks dc
WHERE dc.doc_id = d.id AND dc.source_type = 'hier_section')
ORDER BY length(d.extracted_text) ASC
{marker}
ORDER BY length(d.md_content) ASC
""")
@@ -77,10 +99,11 @@ def _candidate_params(allowlist, doc_ids=None):
return p
def _scope_label(allowlist, doc_ids=None):
def _scope_label(allowlist, doc_ids=None, reprocess=False):
tag = "RE-DECOMPOSE" if reprocess else "additive"
if doc_ids:
return f"doc-list={len(doc_ids)}건(크기게이트 우회)"
return f"allowlist={allowlist}" if allowlist else f"all-except={EXCLUDE_DOMAINS}"
return f"doc-list={len(doc_ids)}건(크기게이트 우회, {tag})"
return (f"allowlist={allowlist}" if allowlist else f"all-except={EXCLUDE_DOMAINS}") + f" ({tag})"
# 멱등 leaf 선별 (재실행 시 이미 분석된 leaf 제외)
LEAF_SQL = text("""
@@ -177,14 +200,19 @@ def _parse_doc_ids(args):
async def cmd_dry_run(args):
allowlist = args.domains.split(",") if args.domains else None
doc_ids = _parse_doc_ids(args)
reprocess = getattr(args, "reprocess", False)
if reprocess and not doc_ids:
print("REFUSE: --reprocess 는 --doc <list> 필수 (B3 빈 jump-target trap 차단 — REFINED PASS 리스트만)")
sys.exit(2)
engine = _make_engine()
sm = async_sessionmaker(engine, expire_on_commit=False)
async with sm() as session:
rows = (await session.execute(_candidate_sql(allowlist, doc_ids),
rows = (await session.execute(_candidate_sql(allowlist, doc_ids, reprocess),
_candidate_params(allowlist, doc_ids))).mappings().all()
await engine.dispose()
gate_lbl = "doc-list" if doc_ids else f">{DOC_MIN_CHARS}"
print(f"[dry-run] 후보 doc {len(rows)} ({_scope_label(allowlist, doc_ids)}, {gate_lbl}, 미분해)")
state_lbl = "재분해 미완료(jump-target char_start 부재)" if reprocess else "미분해"
print(f"[dry-run] 후보 doc {len(rows)} ({_scope_label(allowlist, doc_ids, reprocess)}, {gate_lbl}, {state_lbl})")
if rows:
lens = [len(r["body"]) for r in rows]
print(f" 본문길이: min={min(lens)} p50={int(statistics.median(lens))} max={max(lens)} 합={sum(lens):,}")
@@ -196,11 +224,16 @@ async def cmd_dry_run(args):
async def cmd_run(args):
allowlist = args.domains.split(",") if args.domains else None
doc_ids = _parse_doc_ids(args)
reprocess = getattr(args, "reprocess", False)
if reprocess and not doc_ids:
_log("REFUSE: --reprocess 는 --doc <list> 필수 (B3 빈 jump-target trap 차단 — REFINED PASS 리스트만)")
sys.exit(2)
skip_analysis = getattr(args, "skip_analysis", False)
deadline = _compute_deadline(args.deadline)
stop_at = (deadline - timedelta(minutes=BUFFER_MIN)).timestamp()
_log(f"deadline={deadline:%m-%d %H:%M} (buffer {BUFFER_MIN}m → stop_at={datetime.fromtimestamp(stop_at):%H:%M}) "
f"{_scope_label(allowlist, doc_ids)}{' [SKIP-ANALYSIS: 분해+임베딩만]' if skip_analysis else ''}")
f"{_scope_label(allowlist, doc_ids, reprocess)}{' [SKIP-ANALYSIS: 분해+임베딩만]' if skip_analysis else ''}"
f"{' [RE-DECOMPOSE: 기존 hier DELETE→CASCADE chunk_section_analysis→재INSERT; 스냅샷 선행 필수]' if reprocess else ''}")
engine = _make_engine()
sm = async_sessionmaker(engine, expire_on_commit=False)
@@ -219,7 +252,7 @@ async def cmd_run(args):
run_start = time.time()
try:
async with sm() as session:
cands = (await session.execute(_candidate_sql(allowlist, doc_ids),
cands = (await session.execute(_candidate_sql(allowlist, doc_ids, reprocess),
_candidate_params(allowlist, doc_ids))).mappings().all()
_log(f"후보 doc {len(cands)} 선별. 시작.")
@@ -268,6 +301,101 @@ async def cmd_run(args):
d = Counter(all_types)
_log(f" section_type: {dict(d.most_common())} other={d.get('other',0)/len(all_types):.1%}")
# [g3-t3/g3-t4] post-run sweep: 처리한 doc 중 미분석 leaf 잔여 집계(반쪽상태/stall 검출).
# GOAL(jump=char_start)/rail-summary(re-analyze) DECOUPLE — 잔여는 다음 실행이 LEAF_SQL 멱등으로 흡수.
if doc_ids:
try:
async with sm() as session:
pending = (await session.execute(text(f"""
SELECT dc.doc_id, count(*) AS unanalyzed
FROM document_chunks dc
WHERE dc.doc_id = ANY(:ids) AND dc.source_type='hier_section' AND dc.is_leaf=true
AND NOT EXISTS (SELECT 1 FROM chunk_section_analysis a
WHERE a.chunk_id = dc.id AND a.prompt_version = :pv
AND a.source_content_hash = dc.chunk_content_hash)
GROUP BY dc.doc_id ORDER BY unanalyzed DESC"""),
{"ids": doc_ids, "pv": PROMPT_VERSION})).mappings().all()
if pending:
tot = sum(r["unanalyzed"] for r in pending)
_log(f" [sweep] 미분석 leaf 잔여: {tot} (doc {len(pending)}) — 다음 실행이 이어서 분석(멱등). "
f"상위: {[(r['doc_id'], r['unanalyzed']) for r in pending[:5]]}")
else:
_log(" [sweep] 미분석 leaf 잔여 0 — 분석 수렴.")
except Exception as exc:
_log(f" [sweep] 잔여 집계 실패(무해): {type(exc).__name__}")
def _is_jump_target(node) -> bool:
"""jump-target = 비-window leaf OR %_split parent (builder HierNode 판정, _JUMP_TARGET_PRED 와 일치)."""
return ((node.is_leaf and node.node_type != "window")
or bool(node.node_type and node.node_type.endswith("_split")))
async def cmd_update_char_start(args):
"""[g3-tU] hash_stable doc 전용 비파괴 char_start UPDATE.
doc: build(md_content) stored hier 행과 position-by-position(chunk_index ) 정렬
[NEW-1] jump-target 전수 100% hash 일치(ALL-OR-NOTHING) VERIFY. 자리라도 불일치 DEMOTE.
[NEW-2] hash WHERE 하지 않음(동일-body 충돌 회피) position stored row PK(id) UPDATE.
통과 doc: UPDATE document_chunks SET char_start (DELETE/CASCADE/embed/analyze 0, 가역).
미달 doc: DEMOTE-LIST emit re-decompose 배치에 UNION(NEW-4). stdout 마지막에 DEMOTE_DOC_IDS= 출력.
"""
doc_ids = _parse_doc_ids(args)
if not doc_ids:
_log("REFUSE: update-char-start 는 --doc <list> 필수 (hash_stable 32 = gm-t1 산출)")
sys.exit(2)
engine = _make_engine()
sm = async_sessionmaker(engine, expire_on_commit=False)
updated, demoted, noop = [], [], []
try:
for doc_id in doc_ids:
async with sm() as session:
md = await session.scalar(text("SELECT md_content FROM documents WHERE id=:d"), {"d": doc_id})
if not md or not md.strip():
noop.append(doc_id)
_log(f" doc={doc_id} md_content 없음 → no-op(suspect, V4)")
continue
nodes = build_hier_tree(md)
stored = (await session.execute(text("""
SELECT id, chunk_index, chunk_content_hash, node_type, is_leaf
FROM document_chunks
WHERE doc_id=:d AND source_type='hier_section'
ORDER BY chunk_index"""), {"d": doc_id})).mappings().all()
# [NEW-2] position 정렬: build node[i] ↔ stored[i] (chunk_index = base + idx 라 동일 순서).
# 노드 수가 다르면 구조 변경 = hash_changed → DEMOTE.
if len(nodes) != len(stored):
demoted.append(doc_id)
_log(f" doc={doc_id} 노드수 build {len(nodes)} ≠ stored {len(stored)} → DEMOTE(re-decompose)")
continue
# [NEW-1] 전 position hash 일치 VERIFY (position-alignment 가 ordering 도 검증).
# 임의 position 불일치 → DEMOTE (jump-target 1% miss 도 whole-doc 폴백 회귀를 부르므로 100%).
mismatch = next((i for i, (nd, sr) in enumerate(zip(nodes, stored))
if nd.chunk_content_hash != sr["chunk_content_hash"]), None)
if mismatch is not None:
demoted.append(doc_id)
_log(f" doc={doc_id} position {mismatch} hash 불일치 → DEMOTE(re-decompose, NEW-1)")
continue
# 통과 → jump-target 의 char_start 를 stored row PK 로 UPDATE.
n_upd = 0
for nd, sr in zip(nodes, stored):
if _is_jump_target(nd) and nd.char_start is not None:
await session.execute(
text("UPDATE document_chunks SET char_start=:cs WHERE id=:id"),
{"cs": nd.char_start, "id": sr["id"]})
n_upd += 1
await session.commit()
updated.append(doc_id)
_log(f" ✓ doc={doc_id} char_start UPDATE {n_upd} jump-target (VERIFY 100%, 비파괴)")
finally:
await engine.dispose()
_log(f"=== update-char-start: updated={len(updated)} demoted={len(demoted)} noop={len(noop)} ===")
if demoted:
_log(f" DEMOTE(re-decompose 배치 합류, NEW-4): {demoted}")
if noop:
_log(f" NO-OP(md_content NULL suspect, V4): {noop}")
# 기계가독: re-decompose --doc = (gm-t1 hash_changed 230) UNION (이 리스트)
print("DEMOTE_DOC_IDS=" + ",".join(str(x) for x in demoted), flush=True)
def main():
ap = argparse.ArgumentParser(description="오버나이트 hier 분해+절 분석 backfill (additive)")
@@ -275,13 +403,20 @@ def main():
p_dry = sub.add_parser("dry-run", help="후보 doc 집계 (작업 0)")
p_dry.add_argument("--domains", default=None, help="comma-sep allowlist (미지정=뉴스 제외 전부)")
p_dry.add_argument("--doc", default=None, help="comma-sep doc id (크기 게이트 우회 — 구조화 소형 문서 coverage 보정)")
p_dry.add_argument("--reprocess", action="store_true", help="재분해 후보(기존 hier+jump-target char_start 부재) — --doc 필수")
p_run = sub.add_parser("run", help="분해+분석 실행 (deadline time-box)")
p_run.add_argument("--deadline", default="07:00", help="HH:MM (기본 07:00 — 컨테이너 UTC 주의, 07:00 KST=22:00 UTC)")
p_run.add_argument("--domains", default=None, help="comma-sep allowlist (미지정=뉴스 제외 전부)")
p_run.add_argument("--doc", default=None, help="comma-sep doc id (크기 게이트 우회 — 구조화 소형 문서 coverage 보정)")
p_run.add_argument("--skip-analysis", action="store_true", help="절 분석(Mac mini) 생략, 분해+임베딩만 (retrieval go/no-go 측정 준비용)")
p_run.add_argument("--reprocess", action="store_true",
help="[g3-t2] RE-DECOMPOSE: 기존 hier DELETE→CASCADE→재INSERT (md_content 출처, char_start). "
"--doc(REFINED PASS hash_changeddemote) 필수 / 스냅샷 선행 필수")
p_upd = sub.add_parser("update-char-start",
help="[g3-tU] hash_stable doc 비파괴 char_start UPDATE (100% VERIFY, --doc 필수)")
p_upd.add_argument("--doc", default=None, help="comma-sep doc id (gm-t1 hash_stable 32)")
args = ap.parse_args()
fn = {"dry-run": cmd_dry_run, "run": cmd_run}[args.cmd]
fn = {"dry-run": cmd_dry_run, "run": cmd_run, "update-char-start": cmd_update_char_start}[args.cmd]
asyncio.run(fn(args))
+12
View File
@@ -0,0 +1,12 @@
FROM python:3.12-slim
WORKDIR /srv
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY server.py .
EXPOSE 8765
HEALTHCHECK --interval=30s --timeout=5s --retries=3 \
CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:8765/health')"
CMD ["uvicorn", "server:app", "--host", "0.0.0.0", "--port", "8765"]
+3
View File
@@ -0,0 +1,3 @@
fastapi>=0.111.0
uvicorn>=0.30.0
asyncpg>=0.29.0
+202
View File
@@ -0,0 +1,202 @@
"""crawl-health — 전 소스 헬스 패널 1차 (A-8, plan crawl-24x7-1)
읽기 전용 내부 운영 패널. 의존 = 기존 수집 상태(news_sources/source_health/documents/
processing_queue SELECT ) 쓰기 0.
[1] 소스별 last success / 수집 건수 추이(24h/7d) / 연속 실패 / circuit 상태 /
피드 streak + fulltext 승격/격하 통계 + 백로그. -RSS 소스(C-2 sitemap )
같은 표면이 수용 (fetch_method 컬럼 표시 '구독 소스 패널' 좁히지 않는 소스 일반화).
[2 범위 ] B-3 상태 계약 도착 세션 + [재로그인 시도] 버튼(enqueue 방식).
노출: 별도 바인딩만 compose Tailscale 인터페이스(100.110.63.63)에만 publish.
vhost/경로 가드 방식 금지 (r4: '덜 깨짐' 속성 상실). 레벨 인증 없음 =
Tailscale 도달성만이 경계 (fab-server 선례).
"""
import html
import logging
import os
from contextlib import asynccontextmanager
import asyncpg
from fastapi import FastAPI
from fastapi.responses import HTMLResponse, JSONResponse
logger = logging.getLogger("crawl_health")
DSN = os.environ.get("CRAWL_HEALTH_DSN", "")
_pool: asyncpg.Pool | None = None
@asynccontextmanager
async def lifespan(_app: FastAPI):
global _pool
_pool = await asyncpg.create_pool(DSN, min_size=1, max_size=3)
yield
await _pool.close()
app = FastAPI(lifespan=lifespan)
async def _collect_data() -> dict:
async with _pool.acquire() as conn:
sources = await conn.fetch(
"""
SELECT s.id, s.name, s.country, s.enabled, s.feed_type, s.fetch_method,
s.fulltext_policy, s.last_fetched_at,
h.circuit_state, h.consecutive_failures, h.last_success_at,
h.last_error, h.last_error_at, h.last_fetch_items, h.empty_streak,
h.total_fetches, h.total_failures
FROM news_sources s
LEFT JOIN source_health h ON h.source_id = s.id
ORDER BY s.enabled DESC, s.name
"""
)
counts = await conn.fetch(
"""
SELECT s.id,
count(d.id) FILTER (WHERE d.extracted_at > now() - interval '24 hours') AS items_24h,
count(d.id) AS items_7d
FROM news_sources s
LEFT JOIN documents d
ON d.source_channel = 'news'
AND d.extracted_at > now() - interval '7 days'
AND d.file_path LIKE 'news/' || s.name || '/%'
GROUP BY s.id
"""
)
queue = await conn.fetch(
"""
SELECT stage::text AS stage, status::text AS status, count(*) AS n,
min(created_at) FILTER (WHERE status = 'pending') AS oldest_pending
FROM processing_queue
WHERE stage IN ('fulltext', 'summarize', 'embed', 'chunk')
AND status IN ('pending', 'processing', 'failed')
GROUP BY 1, 2
ORDER BY 1, 2
"""
)
fulltext = await conn.fetch(
"""
SELECT extract_meta -> 'fulltext' ->> 'status' AS status, count(*) AS n
FROM documents
WHERE source_channel = 'news' AND extract_meta ? 'fulltext'
GROUP BY 1
"""
)
count_map = {r["id"]: r for r in counts}
return {
"sources": [
{**dict(r),
"items_24h": count_map.get(r["id"], {}).get("items_24h", 0),
"items_7d": count_map.get(r["id"], {}).get("items_7d", 0)}
for r in sources
],
"queue": [dict(r) for r in queue],
"fulltext": [dict(r) for r in fulltext],
}
@app.get("/health")
async def health():
"""Liveness — Docker healthcheck 용 (DB 미접근, 프로세스 생존만)."""
return {"status": "ok", "service": "crawl-health"}
@app.get("/api/health.json")
async def api_health():
data = await _collect_data()
# asyncpg Record 의 datetime → isoformat 직렬화
def _ser(v):
return v.isoformat() if hasattr(v, "isoformat") else v
return JSONResponse({
k: [{kk: _ser(vv) for kk, vv in row.items()} for row in v]
for k, v in data.items()
})
def _chip(state: str | None, enabled: bool) -> str:
if not enabled:
return '<span class="chip off">OFF</span>'
if state == "disabled":
return '<span class="chip err">DISABLED</span>'
if state == "open":
return '<span class="chip warn">OPEN</span>'
return '<span class="chip ok">OK</span>'
def _fmt_ts(v) -> str:
return v.strftime("%m-%d %H:%M") if v else "-"
@app.get("/", response_class=HTMLResponse)
async def index():
data = await _collect_data()
rows = []
for s in data["sources"]:
err = html.escape((s.get("last_error") or "")[:80])
warn_cls = ""
if s["enabled"] and (s.get("consecutive_failures") or 0) >= 3:
warn_cls = ' class="row-warn"'
elif s["enabled"] and (s.get("empty_streak") or 0) >= 8:
warn_cls = ' class="row-warn"'
rows.append(
f"<tr{warn_cls}>"
f"<td>{html.escape(s['name'])}</td>"
f"<td>{_chip(s.get('circuit_state'), s['enabled'])}</td>"
f"<td>{html.escape(s.get('fetch_method') or 'rss')}</td>"
f"<td>{html.escape(s.get('fulltext_policy') or 'none')}</td>"
f"<td class='num'>{s['items_24h']}</td>"
f"<td class='num'>{s['items_7d']}</td>"
f"<td class='num'>{s.get('consecutive_failures') or 0}</td>"
f"<td class='num'>{s.get('empty_streak') or 0}</td>"
f"<td>{_fmt_ts(s.get('last_success_at'))}</td>"
f"<td>{_fmt_ts(s.get('last_fetched_at'))}</td>"
f"<td class='err-text'>{err}</td>"
f"</tr>"
)
qrows = [
f"<tr><td>{html.escape(q['stage'])}</td><td>{html.escape(q['status'])}</td>"
f"<td class='num'>{q['n']}</td><td>{_fmt_ts(q.get('oldest_pending'))}</td></tr>"
for q in data["queue"]
]
frows = [
f"<tr><td>{html.escape(f['status'] or '-')}</td><td class='num'>{f['n']}</td></tr>"
for f in data["fulltext"]
]
body = f"""<!DOCTYPE html>
<html lang="ko"><head><meta charset="utf-8">
<title>crawl-health 소스 헬스 패널</title>
<style>
body {{ font-family: -apple-system, 'Apple SD Gothic Neo', sans-serif; background: #f5f1e8;
color: #3d3a33; margin: 0; padding: 28px; }}
h1 {{ font-size: 19px; margin: 0 0 4px; }} h2 {{ font-size: 14px; margin: 26px 0 8px; }}
.sub {{ color: #8a8474; font-size: 12px; margin-bottom: 18px; }}
table {{ border-collapse: collapse; width: 100%; background: #fffdf8; font-size: 12.5px; }}
th, td {{ border: 1px solid #e3ddcd; padding: 5px 9px; text-align: left; }}
th {{ background: #ece6d6; font-weight: 600; white-space: nowrap; }}
td.num {{ text-align: right; font-variant-numeric: tabular-nums; }}
td.err-text {{ color: #9a4a3a; font-size: 11.5px; max-width: 320px; }}
tr.row-warn td {{ background: #fbf0e4; }}
.chip {{ display: inline-block; padding: 1px 8px; border-radius: 9px; font-size: 11px; font-weight: 600; }}
.chip.ok {{ background: #dce8d4; color: #3c5a2e; }}
.chip.warn {{ background: #f3e0b8; color: #7a5a14; }}
.chip.err {{ background: #eecfc6; color: #8a2f1d; }}
.chip.off {{ background: #e3ddcd; color: #6e6859; }}
</style></head><body>
<h1>crawl-health 소스 헬스 패널</h1>
<div class="sub">A-8 1 (피드 수집 헬스) · 내부 전용 (Tailscale 바인딩) · 새로고침 = 실시간 조회</div>
<h2>소스 ({len(rows)})</h2>
<table><tr><th>소스</th><th>circuit</th><th>fetch</th><th>fulltext</th><th>24h</th><th>7d</th>
<th>연속실패</th><th>빈피드</th><th>last success</th><th>last fetch</th><th>last error</th></tr>
{''.join(rows)}</table>
<h2>처리 (fulltext / summarize / embed / chunk)</h2>
<table><tr><th>stage</th><th>status</th><th>건수</th><th>oldest pending</th></tr>
{''.join(qrows) or '<tr><td colspan="4">백로그 없음</td></tr>'}</table>
<h2>fulltext 승격 누적</h2>
<table><tr><th>status</th><th>건수</th></tr>
{''.join(frows) or '<tr><td colspan="2">기록 없음 (파일럿 전환 전)</td></tr>'}</table>
</body></html>"""
return HTMLResponse(body)
+109 -28
View File
@@ -1,12 +1,18 @@
"""marker-service — POST /convert: PDF → markdown + 추출 이미지 base64.
Phase 1B (2026-05-01) 텍스트만 응답, 이미지 폐기.
Phase 1B.5 ( 변경) `_images` 직렬화해서 base64 응답에 포함. NAS write 권한이
Phase 1B.5 `_images` 직렬화해서 base64 응답에 포함. NAS write 권한이
없는 stateless 변환기 유지 (fastapi NAS persist 담당).
D-1 (plan crawl-24x7-1, 2026-06-10) idle-unload 운영 전환:
MARKER_PRELOAD=0 : startup warmup ( /convert lazy load)
MARKER_IDLE_UNLOAD_MINUTES : N분 유휴 모델 해제 (0=비활성, 기존 동작)
/ready idle(미적재)에서도 200 fastapi depends_on service_healthy
lazy 모드에서 영구 미기동으로 굳는 방지. 503 warmup_failed 한정.
plan: ~/.claude/plans/piped-humming-crystal.md
"""
import base64
import gc
import hashlib
import io
import logging
@@ -40,6 +46,12 @@ _warmup_done = False
_warmup_error: str | None = None
_warmup_lock = threading.Lock()
# D-1 idle-unload 상태 — 전이는 전부 _warmup_lock 아래
_PRELOAD = os.getenv("MARKER_PRELOAD", "1") != "0"
_IDLE_UNLOAD_MINUTES = int(os.getenv("MARKER_IDLE_UNLOAD_MINUTES", "0"))
_inflight = 0
_last_used = time.monotonic()
# 이미지 응답 cap. base64 응답 크기 폭주 방지. 사용자 PDF 풀 측정 (Phase 1D) 시
# 가장 이미지 많은 문서가 ~30건 수준 → 200 은 안전 마진. 초과 시 truncate flag 응답.
MAX_IMAGES_PER_DOC = int(os.getenv("MARKER_MAX_IMAGES_PER_DOC", "200"))
@@ -68,11 +80,67 @@ def _ensure_warmup() -> None:
raise
def _acquire_models():
"""warmup 보장 + inflight 진입을 원자적으로 — ensure 직후 reaper 가 해제하는 경합 차단."""
global _inflight
while True:
_ensure_warmup()
with _warmup_lock:
if _warmup_done:
_inflight += 1
return
# ensure 와 lock 재진입 사이에 unload 가 끼어든 희귀 경합 — 재시도
def _release_models():
global _inflight, _last_used
with _warmup_lock:
_inflight -= 1
_last_used = time.monotonic()
def _maybe_unload() -> None:
"""유휴 시 모델 해제. 변환 중(inflight>0)이면 절대 해제하지 않는다.
split 변환의 배치 사이 간격은 단위 N>=1 임계면 배치 사이 해제 없음.
"""
global _models, _converter, _warmup_done
with _warmup_lock:
if not _warmup_done or _inflight > 0:
return
if time.monotonic() - _last_used < _IDLE_UNLOAD_MINUTES * 60:
return
_models = None
_converter = None
_warmup_done = False
gc.collect()
try:
import torch
torch.cuda.empty_cache()
except Exception:
pass
logger.info(f"[marker-service] idle-unload: 모델 해제 (유휴 {_IDLE_UNLOAD_MINUTES}분 초과)")
async def _idle_reaper():
import asyncio
while True:
await asyncio.sleep(60)
try:
_maybe_unload()
except Exception:
logger.exception("[marker-service] idle reaper 오류")
@app.on_event("startup")
async def startup():
"""startup hook — async warmup 백그라운드. /ready 가 완료 여부 노출."""
"""startup hook — warmup 은 MARKER_PRELOAD 게이트 (D-1: lazy 기본 전환은 compose 가)."""
import asyncio
asyncio.create_task(asyncio.to_thread(_ensure_warmup))
if _PRELOAD:
asyncio.create_task(asyncio.to_thread(_ensure_warmup))
if _IDLE_UNLOAD_MINUTES > 0:
asyncio.create_task(_idle_reaper())
logger.info(f"[marker-service] idle-unload 활성: {_IDLE_UNLOAD_MINUTES}")
class ConvertRequest(BaseModel):
@@ -111,7 +179,12 @@ def health():
@app.get("/ready")
async def ready(response: Response):
"""Round 4 #1+#2: Response.status_code 명시 + warmup_error 노출."""
"""Round 4 #1+#2: Response.status_code 명시 + warmup_error 노출.
D-1: idle(미적재) = 200. 503 warmup_failed 한정 lazy 모드에서 fastapi
depends_on service_healthy 영구 미기동으로 굳지 않게. 배포 검증에서
'status=ready' 단언하던 runbook 강제 warm 호출(/convert 1) 대체.
"""
if _warmup_error:
response.status_code = 503
return {
@@ -121,31 +194,28 @@ async def ready(response: Response):
"error": _warmup_error,
}
if not _warmup_done:
response.status_code = 503
return {
"status": "warming_up",
"status": "warming_up" if _PRELOAD else "idle",
"engine": "marker",
"engine_version": _engine_version,
"models_loaded": False,
"idle_unload_minutes": _IDLE_UNLOAD_MINUTES,
}
return {
"status": "ready",
"engine": "marker",
"engine_version": _engine_version,
"models_loaded": True,
"inflight": _inflight,
"idle_unload_minutes": _IDLE_UNLOAD_MINUTES,
}
@app.post("/convert", response_model=ConvertResponse)
async def convert(req: ConvertRequest):
_ensure_warmup()
p = Path(req.file_path)
if not p.is_file():
raise HTTPException(404, detail={"code": "file_not_found", "message": str(p)})
start = time.monotonic()
# page range 지정 시 per-request converter (모델 _models 재사용 → reload 없음).
# invariant: req.start_page/end_page = 1-based inclusive → marker 0-based 로 변환.
converter = _converter
if req.start_page is not None and req.end_page is not None:
if req.start_page < 1 or req.end_page < req.start_page:
raise HTTPException(
@@ -155,22 +225,33 @@ async def convert(req: ConvertRequest):
"message": f"start_page={req.start_page} end_page={req.end_page}",
},
)
page_range = list(range(req.start_page - 1, req.end_page)) # 0-based inclusive
converter = PdfConverter(artifact_dict=_models, config={"page_range": page_range})
try:
rendered = converter(str(p))
except Exception as exc:
logger.exception(f"[marker-service] conversion failed path={p}: {exc}")
raise HTTPException(
status_code=422,
detail={
"code": "conversion_failed",
"message": f"{type(exc).__name__}: {exc}",
},
) from exc
md_text, _meta, raw_images = text_from_rendered(rendered)
elapsed_ms = int((time.monotonic() - start) * 1000)
# D-1: warmup 보장 + inflight 진입 원자화 — 변환 중 reaper 해제 차단. 해제는 finally.
_acquire_models()
try:
start = time.monotonic()
# page range 지정 시 per-request converter (모델 _models 재사용 → reload 없음).
# invariant: req.start_page/end_page = 1-based inclusive → marker 0-based 로 변환.
converter = _converter
if req.start_page is not None and req.end_page is not None:
page_range = list(range(req.start_page - 1, req.end_page)) # 0-based inclusive
converter = PdfConverter(artifact_dict=_models, config={"page_range": page_range})
try:
rendered = converter(str(p))
except Exception as exc:
logger.exception(f"[marker-service] conversion failed path={p}: {exc}")
raise HTTPException(
status_code=422,
detail={
"code": "conversion_failed",
"message": f"{type(exc).__name__}: {exc}",
},
) from exc
md_text, _meta, raw_images = text_from_rendered(rendered)
elapsed_ms = int((time.monotonic() - start) * 1000)
finally:
_release_models()
images_payload, truncated = _serialize_images(raw_images, str(p))
+18
View File
@@ -0,0 +1,18 @@
# B-3 / A-1 Tier 2 (plan crawl-24x7-1) — Playwright 격리 컨테이너.
# 브라우저 hang/크래시가 fastapi APScheduler 를 잠식하지 않게 별도 서비스로 격리,
# 타임아웃 있는 HTTP 호출로만 사용. 요청당 브라우저 기동 = 컨텍스트 누적 메모리 차단.
FROM mcr.microsoft.com/playwright/python:v1.47.0-jammy
WORKDIR /srv
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY server.py .
# root 로 Chromium 실행 시 sandbox 비활성 강제됨 — 이미지 내장 pwuser(uid 1000)로 실행.
# /auth ro mount(호스트 hyungi uid 1000, mode 600)도 동일 uid 라 판독 가능.
USER pwuser
# internal-only — compose 네트워크 전용, host 포트 미매핑 (caddy 라우트 금지)
EXPOSE 3400
CMD ["uvicorn", "server:app", "--host", "0.0.0.0", "--port", "3400"]
@@ -0,0 +1,3 @@
fastapi==0.115.*
uvicorn==0.32.*
playwright==1.47.0
+107
View File
@@ -0,0 +1,107 @@
"""B-3 구독 세션 Playwright fetcher (plan crawl-24x7-1).
storage_state JSON(쿠키+localStorage 스냅샷) 기반 인증 페이지 fetch + 내용 기반 probe.
- 동시 1 인스턴스 (글로벌 세마포어) 계정 보호 + 사람 속도는 호출측 politeness 담당.
- 요청당 브라우저 기동/종료 컨텍스트 메모리 누적·hang 잔존 차단 (저빈도라 기동비용 무관).
- 세션 파일: /auth/{profile}.json (호스트 ~/.local/share/crawl-auth/, ro mount, 600).
부재 = 503 profile_missing (silent fallback 없음 호출측이 degrade).
- 시간 기반 만료 판정 금지 probe 알려진 유료 기사에서 본문 길이 + 페이월 마커 부재 검증
(만료 200 '페이월 안내문' 본문으로 저장되는 silent corruption 차단).
"""
import asyncio
import logging
from pathlib import Path
from fastapi import FastAPI, HTTPException
from playwright.async_api import async_playwright, Error as PlaywrightError
from pydantic import BaseModel, Field
logging.basicConfig(level=logging.INFO, format="[%(levelname)s] %(message)s")
logger = logging.getLogger("playwright-fetcher")
AUTH_DIR = Path("/auth")
NAV_TIMEOUT_MS = 45_000
SETTLE_MS = 1_500 # domcontentloaded 후 lazy 본문 settle 대기
app = FastAPI(title="playwright-fetcher")
_browser_slot = asyncio.Semaphore(1) # 동시 1 인스턴스 (B-3 ① persistent 제약과 동일 규율)
class FetchReq(BaseModel):
url: str
profile: str = Field(pattern=r"^[a-z0-9_-]{1,50}$")
class ProbeReq(BaseModel):
profile: str = Field(pattern=r"^[a-z0-9_-]{1,50}$")
probe_url: str
min_body_chars: int = 800
paywall_markers: list[str] = []
def _state_path(profile: str) -> Path:
p = AUTH_DIR / f"{profile}.json"
if not p.is_file():
raise HTTPException(503, detail={"error_reason": "profile_missing", "profile": profile})
return p
async def _browse(url: str, state: Path) -> tuple[str, str, str]:
"""(html, final_url, visible_text). 요청당 브라우저 — 종료를 finally 로 보장."""
async with async_playwright() as pw:
browser = await pw.chromium.launch(headless=True)
try:
context = await browser.new_context(
storage_state=str(state),
viewport={"width": 1366, "height": 900},
locale="fr-FR",
)
page = await context.new_page()
await page.goto(url, wait_until="domcontentloaded", timeout=NAV_TIMEOUT_MS)
await page.wait_for_timeout(SETTLE_MS)
html = await page.content()
final_url = page.url
text = await page.evaluate("document.body ? document.body.innerText : ''")
return html, final_url, text
finally:
await browser.close()
@app.get("/health")
def health():
profiles = sorted(p.stem for p in AUTH_DIR.glob("*.json")) if AUTH_DIR.is_dir() else []
return {"status": "ok", "profiles": profiles}
@app.post("/fetch")
async def fetch(req: FetchReq):
state = _state_path(req.profile)
async with _browser_slot:
try:
html, final_url, _ = await _browse(req.url, state)
except PlaywrightError as e:
logger.warning("fetch 실패 %s: %s", req.url, e)
raise HTTPException(502, detail={"error_reason": "browse_failed", "message": str(e)[:300]})
logger.info("fetch ok profile=%s %s (%d bytes)", req.profile, req.url, len(html))
return {"html": html, "final_url": final_url}
@app.post("/probe")
async def probe(req: ProbeReq):
"""내용 기반 세션 probe — ok=False 사유를 명시 반환 (호출측이 health 에 기록)."""
state = _state_path(req.profile)
async with _browser_slot:
try:
_, final_url, text = await _browse(req.probe_url, state)
except PlaywrightError as e:
return {"ok": False, "reason": f"browse_failed: {str(e)[:200]}", "body_chars": 0}
body_chars = len(text.strip())
hit = next((m for m in req.paywall_markers if m and m.lower() in text.lower()), None)
if hit:
return {"ok": False, "reason": f"paywall_marker: {hit}", "body_chars": body_chars}
if body_chars < req.min_body_chars:
return {"ok": False, "reason": f"body_too_short: {body_chars} < {req.min_body_chars}",
"body_chars": body_chars}
logger.info("probe ok profile=%s (%d chars, final=%s)", req.profile, body_chars, final_url)
return {"ok": True, "reason": None, "body_chars": body_chars}
+83 -27
View File
@@ -1,14 +1,23 @@
"""STT 마이크로서비스 — faster-whisper (GPU) 기반 음성 전사.
filePath {text, segments:[{start,end,text}]}.
모델은 startup 에서 eager preload (Docker /ready healthcheck 모델 적재까지 검증).
기본 모델 large-v3 (VRAM ~3GB, float16). 환경변수로 교체 가능.
환경변수 `STT_PRELOAD=0` 으로 lazy 강제 가능 (개발/테스트용).
D-1 (plan crawl-24x7-1, 2026-06-10) idle-unload 운영 전환:
STT_PRELOAD=0 : startup eager preload ( 요청 lazy load)
STT_IDLE_UNLOAD_MINUTES: N분 유휴 모델 해제 (0=비활성, 기존 동작).
faster-whisper=CTranslate2 torch 미설치 해제는
참조 제거 + gc (CTranslate2 소멸 VRAM 반환).
콜드로드 수초~수십 초는 호출측(stt_worker read=1800s) 흡수. healthcheck
cuda 가용성 기준 (compose) 모델 적재는 이상 상시 상태가 아니다.
"""
import asyncio
import gc
import logging
import os
import threading
import time
import unicodedata
from contextlib import asynccontextmanager
from pathlib import Path
@@ -17,18 +26,26 @@ from fastapi import FastAPI
logger = logging.getLogger("stt")
_IDLE_UNLOAD_MINUTES = int(os.getenv("STT_IDLE_UNLOAD_MINUTES", "0"))
@asynccontextmanager
async def lifespan(_app: FastAPI):
# startup: 모델 eager preload 시도. 실패해도 프로세스는 살아 있고
# /ready 가 false 로 남아 healthcheck 가 unhealthy 처리.
# /ready 의 models_loaded 가 false 로 남는다.
if os.getenv("STT_PRELOAD", "1") != "0":
try:
_load_model()
logger.info("stt model preloaded: %s (%s, %s)", _MODEL_NAME, _DEVICE, _COMPUTE_TYPE)
except Exception as e:
logger.exception("stt model preload failed: %s", e)
reaper = None
if _IDLE_UNLOAD_MINUTES > 0:
reaper = asyncio.create_task(_idle_reaper())
logger.info("stt idle-unload 활성: %d", _IDLE_UNLOAD_MINUTES)
yield
if reaper:
reaper.cancel()
app = FastAPI(lifespan=lifespan)
@@ -38,6 +55,11 @@ _MODEL_NAME = os.getenv("WHISPER_MODEL", "large-v3")
_DEVICE = os.getenv("WHISPER_DEVICE", "cuda")
_COMPUTE_TYPE = os.getenv("WHISPER_COMPUTE_TYPE", "float16")
# load/unload/inflight 상태 전이는 전부 이 lock 아래 (cold 동시 요청 이중 로드 방지 포함)
_model_lock = threading.Lock()
_inflight = 0
_last_used = time.monotonic()
def _resolve_path(file_path: str) -> Path | None:
"""NFC(DB) vs NFD(NFS) 한글 경로 정규화 차이 흡수. OCR 서비스와 동일 패턴."""
@@ -61,14 +83,38 @@ def _resolve_path(file_path: str) -> Path | None:
def _load_model():
"""faster-whisper lazy loading — 첫 호출 시만 VRAM 점유."""
"""faster-whisper lazy loading — 첫 호출 시만 VRAM 점유. lock 으로 이중 로드 방지."""
global _model
if _model is not None:
return _model
from faster_whisper import WhisperModel
with _model_lock:
if _model is None:
from faster_whisper import WhisperModel
logger.info("stt model loading: %s (%s, %s)", _MODEL_NAME, _DEVICE, _COMPUTE_TYPE)
_model = WhisperModel(_MODEL_NAME, device=_DEVICE, compute_type=_COMPUTE_TYPE)
return _model
_model = WhisperModel(_MODEL_NAME, device=_DEVICE, compute_type=_COMPUTE_TYPE)
return _model
def _maybe_unload() -> None:
"""유휴 시 모델 해제. 처리 중(inflight>0)이면 절대 해제하지 않는다."""
global _model
with _model_lock:
if _model is None or _inflight > 0:
return
if time.monotonic() - _last_used < _IDLE_UNLOAD_MINUTES * 60:
return
_model = None
gc.collect()
logger.info("stt idle-unload: whisper 모델 해제 (유휴 %d분 초과)", _IDLE_UNLOAD_MINUTES)
async def _idle_reaper():
while True:
await asyncio.sleep(60)
try:
_maybe_unload()
except Exception:
logger.exception("stt idle reaper 오류")
def _cuda_device_count() -> int:
@@ -87,7 +133,7 @@ def health():
@app.get("/ready")
def ready():
"""Readiness — CUDA + 모델 상태. 배포 검증용."""
"""Readiness — CUDA + 모델 상태. healthcheck 는 cuda 만 본다 (D-1 idle-unload)."""
count = _cuda_device_count()
cuda_ok = count > 0
models_loaded = _model is not None
@@ -98,6 +144,8 @@ def ready():
"models_loaded": models_loaded,
"model": _MODEL_NAME,
"compute_type": _COMPUTE_TYPE,
"idle_unload_minutes": _IDLE_UNLOAD_MINUTES,
"inflight": _inflight,
}
@@ -121,6 +169,7 @@ async def transcribe(body: dict):
"duration": 1832.5
}
"""
global _inflight, _last_used
raw_path = body["filePath"]
langs = body.get("langs")
beam_size = int(body.get("beamSize", 5))
@@ -129,28 +178,35 @@ async def transcribe(body: dict):
if resolved is None:
return {"error": f"파일 없음: {raw_path}", "text": "", "segments": []}
model = _load_model()
with _model_lock:
_inflight += 1
try:
model = _load_model()
language = None
if isinstance(langs, list) and len(langs) == 1:
language = langs[0]
language = None
if isinstance(langs, list) and len(langs) == 1:
language = langs[0]
segments_iter, info = model.transcribe(
str(resolved),
beam_size=beam_size,
language=language,
vad_filter=True,
)
segments_iter, info = model.transcribe(
str(resolved),
beam_size=beam_size,
language=language,
vad_filter=True,
)
segments = []
parts = []
for seg in segments_iter:
segments.append({
"start": round(float(seg.start), 2),
"end": round(float(seg.end), 2),
"text": seg.text.strip(),
})
parts.append(seg.text)
segments = []
parts = []
for seg in segments_iter:
segments.append({
"start": round(float(seg.start), 2),
"end": round(float(seg.end), 2),
"text": seg.text.strip(),
})
parts.append(seg.text)
finally:
with _model_lock:
_inflight -= 1
_last_used = time.monotonic()
return {
"text": " ".join(p.strip() for p in parts).strip(),
File diff suppressed because one or more lines are too long
+1
View File
@@ -0,0 +1 @@
{"body":{"pageNo":1,"totalCount":1,"numOfRows":5,"items":{"item":[{"filenm":"컨베이어에 끼임.pdf","filepath":"https://portal.kosha.or.kr/openapi/v1/file/down/stdboard/B2025022104002/202605281621537G75H2/D0801000010001","boardno":"202605281621537G75H2"}]}},"header":{"resultCode":"00","resultMsg":"NORMAL_CODE"}}
+1
View File
@@ -0,0 +1 @@
{"body":{"pageNo":1,"totalCount":6334,"numOfRows":3,"items":{"item":[{"business":"제조업","contents":"2026.01.00(월) 07:30경, 경기도 소재 OOOO(주)에서 재해자가 골재 이송 컨베이어 상부의 이물질을 제거하던 중,다리가 컨베이어 벨트와 테일 풀리 (Tail Pulley)* 사이에 끼임 *컨베이어의 아래쪽 끝단에서 회전하며 벨트를 순환시키는 원통형 기계장치","atcflcnt":1,"keyword":"컨베이어에 끼임","boardno":"202605281621537G75H2"},{"business":"건설업","contents":"2025. 8. 00. (금) 11:12 경 경기도 소재 OOO 신축공 사현장에서 데크플레이트 설치 중 밟고 있던 미고정 데크플레이트가 탈락하며 약 7m 높이에서 추락함","atcflcnt":1,"keyword":"데크플레이트 설치 작업 중 추락","boardno":"20260528162031VZLE93"},{"business":"건설업","contents":"2025. 06. 00.(금) 12:35경, 경북 봉화군 소재 (주)OOOO 침전저류지 현장에서 타워크레인 전도 후 매립된 케이크*(오염토)를 굴착 및 운반 작업 중, 사면의 토사와 타워크레인 기초구조물이 무너지며 하단에서 작업 중이던 굴착기가 매몰됨 * 분말 상태의 원료에서 아연을 채취한 후 남은 중금속 부산물(산화칼슘, 납, 산화철, 황산 등)을 장기간 매립하여 만들어지는 고체 형태의 오염 토양 덩어리","atcflcnt":1,"keyword":"사면 굴착 작업 중 매몰","boardno":"20260527153100O7QX25"}]}},"header":{"resultCode":"00","resultMsg":"NORMAL_CODE"}}
+1
View File
@@ -0,0 +1 @@
{"body":{"pageNo":1,"totalCount":1039,"numOfRows":3,"items":{"item":[{"techGdlnNm":"구리에 대한 작업환경측정,분석 기술지침","techGdlnNo":"A-1-2018","techGdlnOfancYmd":"2018-11-27","fileDownloadUrl":"https://portal.kosha.or.kr/openapi/v1/file/down/FL00015883045/7"},{"techGdlnNm":"마그네슘에 대한 작업환경측정,분석 기술지침","techGdlnNo":"A-4-2018","techGdlnOfancYmd":"2018-11-27","fileDownloadUrl":"https://portal.kosha.or.kr/openapi/v1/file/down/FL00015883165/3"},{"techGdlnNm":"백금에 대한 작업환경측정,분석 기술지침","techGdlnNo":"A-6-2018","techGdlnOfancYmd":"2018-11-27","fileDownloadUrl":"https://portal.kosha.or.kr/openapi/v1/file/down/FL00015883187/3"}]}},"header":{"resultCode":"00","resultMsg":"NORMAL_CODE"}}
@@ -0,0 +1,95 @@
"""builder.py char_start emit 단위테스트 (플랜 ds-outline-anchor-b5 g2 / g0-t2).
핵심 불변식:
- char_start = FE outlineAnchors.ts 라인/offset 모델(split('\n') + UTF-16 code unit + 코드펜스) 동일.
- astral(BMP ) prefix 있어도 UTF-16 code unit offset 이어야 (#2 SILENT 단위버그 게이트).
- window-child char_start=None, split-parent char_start=heading offset (B1/#1).
- 코드펜스 내부 heading 미탐지 (O3).
- 라인모델 변경이 node.text 바꾸지 않음(hash-neutral) hash_stable doc 보존.
"""
from __future__ import annotations
import hashlib
from app.services.hier_decomp.builder import build_hier_tree, coverage_stats, _utf16_units
def _fe_offset_of_line(md: str, target_line: str) -> int | None:
"""FE outlineAnchors.ts:55-65 재현 — char_start 가 이 값과 같아야 함."""
off = 0
for raw in md.split("\n"):
if raw == target_line:
return off
off += len(raw.encode("utf-16-le")) // 2 + 1
return None
def _u16_slice(md: str, cs: int, n: int) -> str:
return md.encode("utf-16-le")[2 * cs: 2 * (cs + n)].decode("utf-16-le")
def test_char_start_matches_fe_offset_and_slices():
md = "# Alpha\nbody alpha here\n\n## Beta\nbody beta\n# Gamma\nlast line"
nodes = build_hier_tree(md, leaf_hard_max=100000)
seen = 0
for n in nodes:
if n.char_start is None:
continue
seen += 1
head = n.text.split("\n", 1)[0]
assert n.char_start == _fe_offset_of_line(md, head), n.section_title
assert _u16_slice(md, n.char_start, _utf16_units(head)) == head
assert seen >= 2
def test_astral_prefix_offset_is_utf16_not_codepoint():
# 📄 = U+1F4C4 = 1 code point 이나 UTF-16 surrogate pair(2 code unit).
md = "\U0001F4C4 manifest\n\n# Section One\nbody"
nodes = build_hier_tree(md)
sec = next(n for n in nodes if n.section_title == "Section One")
fe = _fe_offset_of_line(md, "# Section One")
assert sec.char_start == fe
# UTF-16 슬라이스는 정확
assert _u16_slice(md, sec.char_start, _utf16_units("# Section One")) == "# Section One"
# code-point 슬라이스는 어긋나야 함(astral 때문에) — 단위버그가 있었다면 이게 통과했을 것
assert md[sec.char_start: sec.char_start + len("# Section One")] != "# Section One"
def test_fenced_heading_not_detected():
md = "# Real\nintro\n```\n# Fake In Fence\n```\n# Real Two\nx"
titles = [n.section_title for n in build_hier_tree(md) if n.section_title]
assert "Fake In Fence" not in titles
assert "Real" in titles and "Real Two" in titles
def test_window_child_null_split_parent_has_offset():
md = "# BigSection\n" + ("paragraph text here. " * 20 + "\n\n") * 60
nodes = build_hier_tree(md, leaf_hard_max=5000, leaf_target_max=3000)
sp = [n for n in nodes if n.node_type and n.node_type.endswith("_split")]
wc = [n for n in nodes if n.node_type == "window"]
assert sp and sp[0].char_start is not None
assert wc and all(w.char_start is None for w in wc)
def test_node_text_preserved_hash_neutral():
# 라인모델(split vs splitlines) 변경에도 leaf 이어붙이면 원문 재구성 → hash 불변.
md = "# A\nl1\nl2\n# B\nl3\n# C\nl4\n"
nodes = build_hier_tree(md, leaf_hard_max=100000)
recon = "".join(n.text for n in nodes if n.is_leaf or (n.node_type and n.node_type.endswith("_split")))
assert recon == md
def test_preamble_char_start_none():
md = "intro paragraph with no heading\nmore intro\n# First\nbody"
nodes = build_hier_tree(md, leaf_hard_max=100000)
preamble = [n for n in nodes if n.section_title is None and n.level == 0]
assert preamble and preamble[0].char_start is None
def test_coverage_stats_char_start_telemetry():
md = "# Alpha\nbody\n# Beta\nbody2"
nodes = build_hier_tree(md, leaf_hard_max=100000)
st = coverage_stats(md, nodes)
assert st["char_start_total"] >= 2
assert st["char_start_verified"] == st["char_start_total"] # 모두 O5 통과
assert st["non_nfc"] == 0
+139
View File
@@ -0,0 +1,139 @@
"""crawl-24x7 사이클 2 — 순수 함수/형태 회귀 테스트 (DB 불요).
Guardian 호출 형태 + fixture 응답 파싱 + 채널 정체성 + B-5 quirk.
fixture = tests/fixtures/guardian_open_platform_search_response.json
(2026-06-10 실키 live 박제, api-key 응답 본문 미포함 확인 [[feedback_external_api_fixture_first]]).
"""
import json
import re
from pathlib import Path
from workers.news_collector import (
_article_hash,
_doc_identity,
_guardian_request,
_normalize_category,
)
FIXTURE = Path(__file__).parent / "fixtures" / "guardian_open_platform_search_response.json"
def _make_source(**kw):
"""ORM 인스턴스 없이 속성만 흉내 (식별성 함수는 속성 접근만 사용)."""
class S:
pass
s = S()
s.source_channel = kw.get("source_channel", "news")
s.parser_quirk = kw.get("parser_quirk")
return s
class TestGuardianCallShape:
def test_request_shape_matches_fixture_recipe(self):
"""fixture 박제 시 사용한 호출과 단일 source-of-truth 정합
([[feedback_fixture_first_call_shape]])."""
endpoint, params = _guardian_request(
"https://content.guardianapis.com/search?section=world", "KEY"
)
assert endpoint == "https://content.guardianapis.com/search"
assert params["section"] == "world"
assert params["show-fields"] == "bodyText,trailText"
assert params["order-by"] == "newest"
assert params["api-key"] == "KEY"
def test_feed_url_query_overridden_by_fixed_fields(self):
# feed_url 에 show-fields 가 잘못 박혀 있어도 고정 필드가 이긴다 (dict merge 순서)
_, params = _guardian_request(
"https://content.guardianapis.com/search?section=world&show-fields=headline", "K"
)
assert params["show-fields"] == "bodyText,trailText"
class TestGuardianFixtureParsing:
def test_fixture_response_shape(self):
payload = json.loads(FIXTURE.read_text())["response"]
assert payload["status"] == "ok"
assert payload["results"], "fixture 에 결과 0건"
for item in payload["results"]:
assert item["webTitle"].strip()
assert item["webUrl"].startswith("https://")
assert "webPublicationDate" in item
assert "sectionName" in item
fields = item.get("fields") or {}
assert "bodyText" in fields and "trailText" in fields
def test_fixture_bodytext_is_fulltext_grade(self):
payload = json.loads(FIXTURE.read_text())["response"]
# 전문 게이트(200자)를 fixture 가 통과해야 어댑터 is_full 경로가 산다
assert any(len(i["fields"]["bodyText"]) >= 200 for i in payload["results"])
def test_fixture_contains_no_api_key(self):
assert "api-key" not in FIXTURE.read_text()
class TestChannelIdentity:
def test_news_channel_unchanged(self):
ident = _doc_identity(_make_source(source_channel="news"), "경향신문", "Society")
assert ident == {
"path_prefix": "news",
"ai_domain": "News",
"ai_tags": ["News/경향신문/Society"],
}
def test_crawl_channel_domain_identity(self):
ident = _doc_identity(_make_source(source_channel="crawl"), "TWI", "Engineering")
assert ident["path_prefix"] == "crawl"
assert ident["ai_domain"] == "Engineering"
assert ident["ai_tags"] == ["Engineering/TWI"]
def test_crawl_channel_unknown_category_falls_back(self):
ident = _doc_identity(_make_source(source_channel="crawl"), "X", "Other")
assert ident["ai_domain"] == "Domain"
def test_category_map_has_domain_axes(self):
assert _normalize_category("안전") == "Safety"
assert _normalize_category("Engineering") == "Engineering"
assert _normalize_category("철학") == "Philosophy"
class TestSkipVideoQuirk:
PATTERN = re.compile(r"/videos?/")
def test_video_urls_match(self):
assert self.PATTERN.search("https://psyche.co/videos/some-film")
assert self.PATTERN.search("https://aeon.co/video/another")
def test_article_urls_pass(self):
assert not self.PATTERN.search("https://psyche.co/ideas/how-to-think")
class TestRedirect304Distinction:
"""httpx is_redirect 가 304(3xx 전체)에 True 라 redirect 로 오인 → 조건부 GET
안정 피드가 'redirect 3회 초과' 전멸하던 버그. has_redirect_location 으로 구분."""
def test_304_is_not_a_redirect_location(self):
import httpx
r = httpx.Response(304, request=httpx.Request("GET", "https://x/"))
assert r.is_redirect is True # httpx 함정: 304 도 is_redirect
assert r.has_redirect_location is False # 우리가 써야 하는 정확한 판별
def test_real_redirect_has_location(self):
import httpx
r = httpx.Response(301, headers={"location": "https://y/"},
request=httpx.Request("GET", "https://x/"))
assert r.has_redirect_location is True
def test_collector_uses_has_redirect_location(self):
import inspect
from workers import news_collector
src = inspect.getsource(news_collector._fetch_rss)
assert "has_redirect_location" in src
assert "while resp.is_redirect" not in src # 옛 버그 패턴 부재
class TestArticleHashStability:
def test_static_corpus_hash_deterministic(self):
a = _article_hash("Creep and Creep Failures", "static", "National Board 기술 아티클")
b = _article_hash("Creep and Creep Failures", "static", "National Board 기술 아티클")
assert a == b and len(a) == 32