Files
hyungi_document_server/app/workers/fulltext_worker.py
T
hyungi 1842f27d89 feat(news): crawl-24x7 사이클 2 — B-2/B-3/C-1/C-2/C-3/C-5 (마이그 324-326)
- 채널 인지화: news_sources.source_channel(324, documents enum 재사용) →
  문서 생성 정체성(_doc_identity)·embed/chunk 30일 게이트(crawl=전량 색인)·
  extract 후속 override(crawl→classify, preview 스킵) 분기.
- B-2 Guardian Open Platform: API 디스패치(호스트 분기, 미지 호스트=명시 실패)
  + show-fields=bodyText 전문 어댑터. fixture live 박제 + call-shape 테스트.
- B-3 구독지: playwright-fetcher 격리 컨테이너(동시 1·요청당 브라우저·storage_state
  ro mount) + politeness 사람속도(30-60s) 브라우저 경로 + fulltext 인증 라우팅
  (내용 기반 probe 게이트·relogin_requested 소비=open-스킵보다 앞·본문 페이월 마커
  게이트) + source_health probe 컬럼(325) + 세션 박제 스크립트(맥북용).
- C-2 KOSHA: 3 API live 검증·fixture 박제(board/attach/guide) — 재해사례 daily diff
  +첨부 PDF/HWP→extract 파이프라인, GUIDE 일일 cap 점진 백필(silent cap 금지 로그).
  키는 URL 직결합(재인코딩 함정 회피). daily 06:40 KST.
- C-3 정적 코퍼스: National Board 86 + TWI job-knowledge 153 일괄 CLI(멱등·politeness
  ·crawl_raw 보존·fulltext_worker 승격 필드 규약 동일).
- C-1/C-5 시드(326): 전 URL live 검증 — UK HSE(feed-full)/안전신문/고용노동부 3종
  (rss/*.do)/OSHA/EU-OSHA(후보)/SEP/1000-Word(feed-full)/Doing Philosophy/Aeon/Psyche
  (skip-video quirk).

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-10 15:08:18 +09:00

321 lines
13 KiB
Python

"""fulltext 승격 워커 (A-2 + A-7, plan crawl-24x7-1)
news_collector 가 fulltext_policy='page' 소스의 기사에 enqueue 한 'fulltext' stage 를 소비:
기사 페이지 politeness fetch (A-4) → 원본 HTML NAS gzip 보존 (A-7)
→ extract_worker 4-tier 재사용 (tier 2 sibling .md 는 디스크 원본이 없어 비적용)
→ extracted_text/md_content 승격 → summarize + (30일 게이트) embed/chunk enqueue.
실패 처리 (큐 어휘 = DB enum, 분기만 워커):
- 일시 오류 (5xx/timeout) : raise → 큐 재시도 (max_attempts 3)
- 차단/비대상 (403/429/robots/비HTML/추출부족): RSS 요약으로 격하(degrade) 후 완료
→ summarize/embed/chunk enqueue 보장 (기사 유실 0). 격하 사유는 extract_meta.fulltext 에 기록.
- 영구 실패 (3회 소진) : 야간 reconcile_unresolved() 가 summarize 안전망 enqueue
([[feedback_silent_skip_accumulation]] — 조건부 skip 이 영구 침묵으로 누적되지 않게).
승격 게이트: 전 tier 공통 본문 >= 200자 (devonagent 와 달리 tier 4 도 게이트 적용 —
페이월/오류 페이지의 nav 찌꺼기를 본문으로 승격하느니 RSS 요약 격하가 낫다).
"""
import gzip
import hashlib
import re
from datetime import datetime, timezone
from pathlib import Path
from sqlalchemy import exists, select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import aliased
from core.config import settings
from core.crawl_politeness import (
CrawlBlocked,
CrawlFetchError,
CrawlSkip,
fetch_page,
fetch_page_via_browser,
probe_session,
)
from core.database import async_session
from core.utils import setup_logger
from models.document import Document
from models.news_source import NewsSource
from models.queue import ProcessingQueue, enqueue_stage
from workers.extract_worker import (
_WEB_MIN_BODY_LEN,
_extract_web_with_bs4,
_extract_web_with_readability,
_extract_web_with_trafilatura,
)
logger = setup_logger("fulltext_worker")
# 한국 기사 푸터 1층 후처리 (A-2) — 보수적으로 라인 단위만 제거
_FOOTER_PATTERNS = [
re.compile(r"^.{0,120}(무단\s*전재|무단\s*복제|재배포\s*금지|저작권자\s*[ⓒ©(]).*$", re.M),
re.compile(r"^[\w.+-]+@[\w.-]+\.[A-Za-z]{2,}\s*$", re.M), # 단독 이메일 라인
re.compile(r"^\s*\S{2,4}\s*기자\s*$", re.M), # 단독 '◯◯◯ 기자' 라인
]
def _strip_article_footer(body: str) -> str:
for pat in _FOOTER_PATTERNS:
body = pat.sub("", body)
return re.sub(r"\n{3,}", "\n\n", body).strip()
def _extract_body(html_text: str) -> tuple[str, str | None, str | None]:
"""(body, engine, engine_version). 전 tier >= 200자 게이트, 미달이면 ("", None, None)."""
body, ver = _extract_web_with_trafilatura(html_text)
if body and len(body) >= _WEB_MIN_BODY_LEN:
return body, "trafilatura", ver
body, ver = _extract_web_with_readability(html_text)
if body and len(body) >= _WEB_MIN_BODY_LEN:
return body, "readability", ver
body, ver = _extract_web_with_bs4(html_text)
if body and len(body) >= _WEB_MIN_BODY_LEN:
return body, "bs4_text", ver
return "", None, None
def _raw_html_path(source_id: int | None, file_hash: str, now: datetime) -> Path:
"""A-7 원본 보존 경로 — NAS 본진. 한글 디렉토리의 NFC/NFD 비대칭을 피해 source_id 사용.
file_hash 는 DB 컬럼이 character(64) 라 32자 해시가 공백 패딩되어 돌아옴 — strip 필수
(미적용 시 NAS 파일명에 공백 32개 = 쉘/rsync 함정).
"""
src_dir = f"src_{source_id}" if source_id is not None else "src_unknown"
return (
Path(settings.nas_mount_path) / "crawl_raw" / src_dir
/ now.strftime("%Y-%m") / f"{file_hash.strip()}.html.gz"
)
def _save_raw_html(path: Path, html_text: str) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
with gzip.open(path, "wb") as f:
f.write(html_text.encode("utf-8", errors="replace"))
async def _enqueue_downstream(session: AsyncSession, doc: Document) -> None:
"""승격/격하 공통 후속 — summarize 무조건 + 30일 게이트 통과 시 embed/chunk."""
await enqueue_stage(session, doc.id, "summarize")
published_raw = (doc.extract_meta or {}).get("published_at")
if doc.source_channel == "crawl":
# 도메인 재료 코퍼스 — 발행일 무관 전량 색인 (30일 게이트는 뉴스 전용)
await enqueue_stage(session, doc.id, "embed")
await enqueue_stage(session, doc.id, "chunk")
return
days_old = 0
if published_raw:
try:
pub_dt = datetime.fromisoformat(published_raw)
days_old = (datetime.now(timezone.utc) - pub_dt).days
except ValueError:
days_old = 0 # 파싱 불가 = 신규 취급 (수집 시점 기본과 동일)
if days_old <= 30:
await enqueue_stage(session, doc.id, "embed")
await enqueue_stage(session, doc.id, "chunk")
def _set_fulltext_meta(doc: Document, **fields) -> None:
"""extract_meta.fulltext 갱신 — JSONB 변경 감지를 위해 dict 재할당."""
meta = dict(doc.extract_meta or {})
meta["fulltext"] = {**meta.get("fulltext", {}), **fields}
doc.extract_meta = meta
_PROBE_TTL_SECONDS = 6 * 3600 # probe 유효 시간 — 만료 시 배치 경계에서 재검증
async def _auth_session_ready(session: AsyncSession, source: NewsSource) -> tuple[bool, str]:
"""B-3 ② 내용 기반 probe 게이트 + relogin_requested 소비 (수동 half-open).
플래그 소비는 '불가용 스킵' 분기보다 앞 — 어댑터 틱마다 도달 (r5 데드 버튼 함정 고정).
probe 실패 상태에서는 auth fetch 0회 (자동 재시도 루프 = 계정 잠금 직행 — B-3 ③).
복구 경로 = storage_state 갱신 후 relogin_requested 플래그 set (수동).
probe 설정은 source.selector_override JSONB: probe_url / min_body_chars / paywall_markers.
"""
from workers.news_collector import _get_or_create_health
health = await _get_or_create_health(session, source.id)
now = datetime.now(timezone.utc)
cfg = source.selector_override or {}
probe_url = cfg.get("probe_url")
force = False
if health.relogin_requested:
health.relogin_requested = False # 소비 = 1회 half-open 시도
health.updated_at = now
force = True
logger.info(f"[fulltext/auth] {source.name} relogin_requested 소비 — half-open probe")
if not force:
if health.last_probe_ok is False:
return False, "probe 실패 상태 (storage_state 갱신 + relogin_requested 대기)"
if (
health.last_probe_ok
and health.last_probe_at
and (now - health.last_probe_at).total_seconds() < _PROBE_TTL_SECONDS
):
return True, ""
if not probe_url:
return False, "selector_override.probe_url 미설정"
result = await probe_session(
source.auth_profile,
probe_url,
int(cfg.get("min_body_chars", 800)),
list(cfg.get("paywall_markers", [])),
)
health.last_probe_at = now
health.last_probe_ok = bool(result.get("ok"))
health.updated_at = now
if not health.last_probe_ok:
logger.warning(f"[fulltext/auth] {source.name} probe 실패: {result.get('reason')}")
return False, str(result.get("reason"))
logger.info(f"[fulltext/auth] {source.name} probe OK ({result.get('body_chars')}자)")
return True, ""
async def _degrade(session: AsyncSession, doc: Document, reason: str) -> None:
"""본문 승격 실패 — RSS 요약 그대로 후속 단계 진행 (기사 유실 0)."""
_set_fulltext_meta(
doc, status="degraded", reason=reason[:300],
resolved_at=datetime.now(timezone.utc).isoformat(),
)
await _enqueue_downstream(session, doc)
logger.warning(f"[fulltext] doc={doc.id} 격하(RSS 요약 유지): {reason}")
async def process(document_id: int, session: AsyncSession) -> None:
"""기사 1건 풀텍스트 승격. queue_consumer 컨벤션 시그니처 (커밋은 consumer 가)."""
doc = await session.get(Document, document_id)
if not doc:
raise ValueError(f"문서 ID {document_id}를 찾을 수 없음")
if not doc.edit_url:
await _degrade(session, doc, "edit_url 없음")
return
meta = doc.extract_meta or {}
source_id = meta.get("source_id")
# B-3: 구독 소스(auth_profile)는 Playwright 세션 fetch — probe 게이트 선행
source = await session.get(NewsSource, source_id) if source_id else None
auth_profile = source.auth_profile if source is not None else None
if auth_profile:
ready, why = await _auth_session_ready(session, source)
if not ready:
await _degrade(session, doc, f"구독 세션 불가용: {why}")
return
try:
if auth_profile:
html_text, final_url = await fetch_page_via_browser(doc.edit_url, auth_profile)
else:
html_text, final_url = await fetch_page(doc.edit_url)
except (CrawlBlocked, CrawlSkip) as e:
await _degrade(session, doc, f"{type(e).__name__}: {e}")
return
except CrawlFetchError:
raise # 일시 오류 — 큐 재시도
now = datetime.now(timezone.utc)
# A-7: 원본 HTML 보존 (추출기 교체 시 전체 재추출 가능 상태 유지)
raw_path = _raw_html_path(source_id, doc.file_hash, now)
try:
_save_raw_html(raw_path, html_text)
raw_saved = True
except OSError as e:
# NAS 일시 장애 시 보존만 누락하고 승격은 진행 — 사유 기록 (silent 누락 회피)
raw_saved = False
logger.error(f"[fulltext] doc={doc.id} 원본 보존 실패 (승격은 진행): {e}")
body, engine, engine_ver = _extract_body(html_text)
if not engine:
await _degrade(session, doc, f"추출 실패 (전 tier < {_WEB_MIN_BODY_LEN}자)")
return
clean_body = _strip_article_footer(body.replace("\x00", ""))
if len(clean_body) < _WEB_MIN_BODY_LEN:
await _degrade(session, doc, "푸터 제거 후 본문 부족")
return
# B-3: 추출 결과도 페이월 마커로 게이트 — probe 통과 후 만료된 세션의
# '페이월 안내문' 본문 승격(silent corruption) 차단 + 즉시 probe 상태 강등
if auth_profile:
from workers.news_collector import _get_or_create_health
markers = (source.selector_override or {}).get("paywall_markers", [])
hit = next((m for m in markers if m and m.lower() in clean_body.lower()), None)
if hit:
health = await _get_or_create_health(session, source.id)
health.last_probe_ok = False
health.updated_at = datetime.now(timezone.utc)
await _degrade(session, doc, f"본문 페이월 마커 검출({hit}) — 세션 손상 의심")
return
title = doc.title or ""
doc.extracted_text = f"{title}\n\n{clean_body}" if title else clean_body
doc.extracted_at = now
doc.extractor_version = f"rss+page@{engine}"
doc.md_content = clean_body
doc.md_status = "success"
doc.md_extraction_engine = engine
doc.md_extraction_engine_version = engine_ver
doc.md_format_version = "1.0"
doc.md_generated_at = now
doc.md_source_hash = hashlib.sha256(html_text.encode("utf-8", errors="replace")).hexdigest()
doc.md_content_hash = hashlib.sha256(clean_body.encode("utf-8")).hexdigest()
doc.md_extraction_error = None # 수집 시점의 '변환 비대상' 마커 해제
doc.content_origin = "extracted"
doc.file_size = len(doc.extracted_text.encode())
_set_fulltext_meta(
doc, status="promoted", engine=engine,
raw_html_path=str(raw_path) if raw_saved else None,
final_url=final_url, body_chars=len(clean_body),
resolved_at=now.isoformat(),
)
await _enqueue_downstream(session, doc)
logger.info(
f"[fulltext/{engine}] doc={doc.id} {len(clean_body)}자 승격 "
f"(raw={'saved' if raw_saved else 'MISSING'})"
)
async def reconcile_unresolved() -> None:
"""안전망 (야간 1회): fulltext 영구 실패(3회 소진)로 summarize 가 영영 안 잡힌
뉴스 문서에 RSS 요약 기준 후속 단계를 enqueue. 멱등 — enqueue 후엔 조건 불일치."""
async with async_session() as session:
# 외부 쿼리 FROM 에 ProcessingQueue 가 이미 있어 alias 없이는 auto-correlation 이
# 서브쿼리 FROM 을 전부 제거 → InvalidRequestError (queue_consumer.reset_stale_items 패턴)
pq = aliased(ProcessingQueue)
summarize_q = (
select(pq.id)
.where(
pq.document_id == Document.id,
pq.stage == "summarize",
)
)
result = await session.execute(
select(Document)
.join(ProcessingQueue, ProcessingQueue.document_id == Document.id)
.where(
ProcessingQueue.stage == "fulltext",
ProcessingQueue.status == "failed",
Document.source_channel == "news",
~exists(summarize_q),
)
.limit(200)
)
docs = result.scalars().unique().all()
for doc in docs:
_set_fulltext_meta(doc, status="failed_reconciled")
await _enqueue_downstream(session, doc)
if docs:
await session.commit()
logger.warning(f"[fulltext] reconcile: 영구 실패 {len(docs)}건 RSS 요약으로 후속 enqueue")