Compare commits

...

35 Commits

Author SHA1 Message Date
hyungi 381fcfc675 ops(ci): 전체 app 부팅 스모크 (boot_smoke.py) — GPU 격리 deploy-blocker 게이트
lifespan 실 경로(init_db + 전 worker import + 전 add_job)를 prod 이미지 컨테이너 +
ephemeral PG 로 실행해 router/worker import 오류·잡 등록 오류를 검출. NAS/scheduler.start/
prewarm 3개 부작용만 중립화(prod/AI 무접촉). GPU 실측 PASS: routes=173·jobs=34·schema 361·health ok.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-16 15:49:28 +09:00
hyungi 3ff1d7c65d fix(migrations): R1 baseline 런타임 버그 3건 — init_db asyncpg 경로 (R1 fix)
★실제 init_db() 런타임 검증(psql migration_smoke 가 못 잡는 asyncpg 경로)에서 발견·수정:
1. baseline 덤프에 CREATE TABLE schema_migrations 포함 → init_db 가 IF NOT EXISTS 로 선-CREATE
   후 baseline 이 재-CREATE 충돌. --exclude-table=schema_migrations 재덤프(init_db 가 소유).
2. baseline 은 multi-statement 인데 exec_driver_sql(asyncpg prepared)은 multi-statement 불허
   ('cannot insert multiple commands into a prepared statement'). raw asyncpg simple 프로토콜
   execute() 로 적재(같은 connection = 트랜잭션 내).
3. 마이그 360(10 DROP)·361(DELETE+CREATE)이 multi-statement → init_db 적용 실패. 360=콤마구분
   단일 DROP, 361=단일 CREATE UNIQUE INDEX(prod 중복0·fresh 빈테이블이라 dedup DELETE 불요).

★검증: scripts/ci/initdb_runtime_test.py 로 실제 init_db 2회 — 1st(fresh: baseline 262 스탬프 +
359/360/361 적용, documents·purge_col·cand_drop·attempt_unique 전부 확인), 2nd(멱등 skip) PASS.
psql migration_smoke 도 PASS 유지.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-16 14:59:47 +09:00
hyungi 884ea1e669 docs(news): url normalizer 채널별 의도적 divergence 명시 — 통합 금지 박제 (R11b)
audit 의 dup-url-normalizer-divergent 는 design intent 오탐: news._normalize_url 은 query-식별
사이트(hada.io?id=·HN item?id=) 별개 기사 붕괴 방지 위해 보수적(query 보존·sort/trailing-slash/
소문자화 안 함), file_watcher._canonicalize_url 은 web_clip dedup 위해 공격적 정규화 — 채널별
의도된 차이. 통합하면 news dedup 가 깨진다(docstring 경고). 두 함수 docstring 에 상호 cross-ref +
'통합 금지' 명시해 미래 잘못된 통합 차단. 동작 변경 0(주석만).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-16 14:40:56 +09:00
hyungi 523c509954 refactor(news): 3 fetch Document 빌드 _build_news_doc 헬퍼 통합 (R11c)
_fetch_rss/_fetch_api_guardian/_fetch_api_nyt 의 22필드 Document 빌드가 정적 동일
(필드키 22개 동순서 실측) — 채널별 차이는 body(NYT=summary)·extractor_version·ident(category
계산)뿐이라 인자화. _build_news_doc 헬퍼로 통합 = 동작 보존(정적 검증). news_collector
god-file 중복 30줄×3 → 1 헬퍼.

검증: py_compile 통과, doc=Document( 직접빌드 0건. ★채널별 ingest smoke(staging)로 3 경로
동등 확인 권장.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-16 14:39:27 +09:00
hyungi 205a7bf3d5 fix(study): attempt (quiz_session_id, study_question_id) partial UNIQUE (R9)
submit_attempt FOR UPDATE(3ba9537) 1차 방어에 더해 DB 레벨 belt-and-suspenders — 모바일
더블탭/재시도가 어떤 경로로든 이중 attempt INSERT 에 도달해도 차단. prod 실측 중복 0
(GROUP BY HAVING count>1 = 0)이라 안전 — dedup DELETE 멱등 precaution + partial UNIQUE
(quiz_session_id IS NOT NULL). 세션 외 직접입력(NULL)은 비대상.

검증: migration_smoke PASS(post-baseline 361 적용). ★FOR UPDATE 가 정상경로선 막으므로
이 제약은 거의 트리거 안 됨 — 트리거 시 IntegrityError→500(should-never-happen 가시화);
graceful 409 변환이 필요하면 submit_attempt 에 try/except 추가 가능(별도).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-16 14:36:24 +09:00
hyungi 4d5f35b26e refactor(news): 3 fetch 공통 존재체크 _already_ingested 헬퍼 추출 (R11c)
_fetch_rss/_fetch_api_guardian/_fetch_api_nyt 가 복제하던 동일 존재체크
(file_hash 또는 edit_url.in_([normalized,link]) 매칭) 를 단일 헬퍼로 — byte-identical
블록이라 동작 100% 보존. news_collector god-file 중복 일부 감소.

(채널별 Document 빌드 30줄 3중복 통합은 채널별 필드 차이 검증 필요 → staging/별도.)
검증: py_compile 통과.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-16 14:30:47 +09:00
hyungi df4b07d29c refactor(library): facet-counts 4블록 헬퍼 추출 — 중복 제거 (R10)
company/topic/year/doctype 4 facet 집계 블록이 거의 동일 복붙(각 base_query 재구성 +
다른 3축 필터 적용). 적용된 facet 필터를 applied dict 로 모으고 '자기 자신 축 제외' 헬퍼
_facet_count(name, col, order_by, value_fn)로 추출 — 쿼리/자기제외/order_by(year=desc·others=count)/
value 매핑(year=str) 모두 동일 보존. 동작 무변경(staging 에서 facet 카운트 동등성 확인 권장).

검증: py_compile 통과.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-16 14:28:33 +09:00
hyungi 3729083dc0 perf(search): synthesis 캐시 TTL enforce + chunk news_source DB 필터 (R10)
- synthesis_service: _CACHE 가 ts 없이 result 만 저장해 CACHE_TTL(1h) 미적용 → 원문 수정돼도
  CACHE_MAXSIZE 찰 때까지 stale answer 반환. (ts, result) tuple + get_cached 에서 만료 pop
  (query_rewriter expire_at 정본 복제).
- chunk_worker: 문서마다 news_sources 전량 로드 후 Python prefix 루프 → DB 필터 푸시다운
  ((name==source_name) | startswith(source_name+' ')). split[0]==source_name 과 동치, autoescape.

검증: py_compile 통과.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-16 14:24:03 +09:00
hyungi 455a5a66ff fix(classify): ai_suggestion library 블록 is None 가드 — material 제안 clobber 방지 (R9)
거래문서(LIBRARY_SUGGESTION_DOCTYPES) 제안이 doc.ai_suggestion is None 체크 없이 덮어써,
material 제안 블록(material_type 제안)이 이미 점유한 ai_suggestion 을 clobber 하던 비대칭.
material 블록과 동일하게 is None 가드 추가 — 주석의 '기존 제안 우선' 사상 일치.

검증: py_compile 통과.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-16 14:19:20 +09:00
hyungi 124b50af53 perf(events): list_events total 을 DB COUNT 푸시다운 (R10)
전체 Event.id 를 메모리 로딩 후 len() 하던 것을 select(func.count(Event.id)) 로 전환 —
행 수에 선형이던 메모리/전송 비용 제거. 결과 동등(단순 카운트라 golden-diff 불요).

검증: py_compile 통과.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-16 14:18:12 +09:00
hyungi 0d3c841577 feat(migrations): 스키마 baseline 스냅샷 — fresh-DB/DR 부팅 fix (R1)
R0 가 입증했듯 migrations/ 전체 replay 는 011(view active_documents 가 documents.embedding
의존, DROP COLUMN CASCADE 부재)·326(enum-same-txn) 등 누적 비-replayable 로 깨져 신규/DR
환경 init_db 부팅이 불가능했다. 표준 squash baseline 로 해소:
- migrations/_baseline/0358_schema_baseline.sql: prod 스키마 스냅샷(pg_dump --schema-only
  --no-owner --no-privileges, psql 메타·search_path='' 정리 = asyncpg exec_driver_sql 호환).
- init_db._load_baseline_if_fresh: documents 테이블 부재(fresh) 시 baseline 적재 +
  schema_migrations 1..358 스탬프 → 이후 post-baseline(359/360)만 적용. ★기존 DB(documents
  존재)는 skip = prod 무영향(additive). baseline 부재 시 기존 replay 경로(하위호환).
- migration_smoke: baseline 경로 검증. ★실측 — 이전 FAIL(011 abort) → 이제 FRESH/INCREMENTAL
  모두 PASS (pg16.14). cutoff(_BASELINE_CUTOFF=358) 갱신 시 baseline 재생성.

검증: py_compile + migration_smoke PASS. ★boot-path 변경이라 deploy 전 staging 부팅 검증 필수.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-16 14:16:21 +09:00
hyungi 690b22fe58 fix(hardening): collect-lock TOCTOU 제거 (R9) + tier_backfill fstring allowlist (R12)
- news.collect: locked() 체크 후 실제 acquire 가 별도 task 안에서 일어나 그 사이 다른 요청이
  끼어들어 이중 수집 task 가 생기던 TOCTOU. 핸들러에서 동기 acquire + task finally release 로 원자화.
- tier_backfill._enqueue_domain: filter_clause 가 SQL 에 직접 보간되나 allowlist 가드 부재
  (retrieval_service _VALID_DOCS_TABLE 정본 대비 비대칭). DOMAIN_PRIORITY 출처 allowlist final
  gate 추가 — 현재 모듈 상수라 injection 0 이나 외부 입력화 시 즉시 차단.

검증: py_compile 통과.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-16 14:07:07 +09:00
hyungi 3565ef9ac4 fix(digest): daily_digest 산출물 이모지 제거 — no-emoji 규칙 (R11a)
실패 강조 라인의 ⚠️ → **[주의]** 텍스트 마커. 산출물(다이제스트 markdown) no-emoji 준수.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-16 14:03:31 +09:00
hyungi 719c35afbc refactor(models): ai_tags/user_tags 공유 가변 default 제거 + 주석 정정 (R11a)
- ai_tags: 주석/Mapped 타입이 dict 인데 실제 list 적재 → list 로 정정.
- ai_tags/user_tags: default=[] (정의 시점 1회 평가되는 공유 가변 인스턴스) → default=list
  (callable, 행마다 새 리스트). SQLAlchemy column default 관용 idiom.

검증: py_compile 통과.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-16 14:02:56 +09:00
hyungi e664d7b187 perf(setup): setup 미들웨어 user COUNT 캐시 — per-request 쿼리 제거 (R10)
setup 완료 후에도 모든 비-bypass 요청이 select count(User.id) 를 실행하던 per-request
비용. 셋업 완료(user 존재)는 monotonic 이라 1회 확인 후 _setup_complete 플래그로 영구
skip(이후 요청 DB 쿼리 0). global 선언은 함수 첫 줄(read+assign 혼용 UnboundLocalError 방지).

R10 잔여(library-tree jsonb 집계 golden-diff·facet-counts·events-count·synthesis cache TTL)는
결과 동등성 검증 동반이라 후속. 검증: py_compile 통과.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-16 14:01:25 +09:00
hyungi 3ba9537515 fix(study): submit_attempt FOR UPDATE 행 잠금 — 동시 이중제출 race 차단 (R9)
quiz_session 을 session.get(잠금 없음)으로 읽어 모바일 더블탭/재시도 시 동시 제출 둘 다
cursor=N 을 보고 cursor+1·correct/wrong/unsure count 를 이중 가산하던 race. select +
with_for_update() 로 행 잠금 → 직렬화. 두 번째 제출은 첫 commit 후 cursor=N+1 을 읽고
cursor 위치 불일치 409 로 거부된다.

belt-and-suspenders 인 attempt UNIQUE 제약은 기존 중복 dup-backfill 마이그가 선행조건이라
별도(R9 후속). 검증: py_compile 통과.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-16 13:59:35 +09:00
hyungi d58565ef38 refactor(search): Phase 2A cand 슬러그·테이블 제거 (R13)
Phase 2A 임베딩 후보(me5_large_inst·snowflake_l_v2·qwen06·qwen4·qwen4m) no-go 종결
(2026-06-12, 후보 전부 -0.03~-0.04) + phase2a_cand_backfill 워커 dormant(미스케줄·미import).
- retrieval_service.CANDIDATE_BACKEND_MAP: 5 cand 엔트리 제거(baseline 만 잔존) — read-path
  슬러그를 먼저 빼야 embedding_backend=cand_X /search 가 dropped 테이블 읽어 500 안 남.
- api.search allowed 하드코딩 리스트 → ["baseline"] (R12 search-error-allowed dangling 동반 제거).
- phase2a_cand_backfill.py 삭제(dead code, 드롭될 테이블 참조 — R12 config-bypass 동반 해소).
- 마이그 360: cand 10테이블 DROP TABLE IF EXISTS(멱등, 환경별 존재차 흡수).

검증: py_compile 통과, 슬러그 잔존 참조 0. migration txn 제어문 없음.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-16 13:56:42 +09:00
hyungi 70f90bc914 fix(digest): daily_digest KST↔UTC 경계 정렬 + blocking I/O off-thread (R8)
- func.date(created_at) == today(KST) 비교는 pg TimeZone(UTC) 기준 날짜라 KST 0~9시
  생성 문서(UTC 전날)가 오늘 다이제스트에서 누락되던 경계 버그. KST 하루를 UTC 범위
  (start_utc~end_utc)로 변환해 created_at(UTC저장) 범위 비교로 전환(3곳).
- NAS 저장/glob/stat/rename blocking 파일 I/O 를 _write_and_rotate 헬퍼 + asyncio.to_thread
  오프로드(이벤트 루프 점유 방지, R5 일관).

검증: py_compile 통과.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-16 13:53:08 +09:00
hyungi 688532b1fa fix(briefing): held→409 표면화 + study attempt naive datetime→UTC (R8)
- briefing.regenerate: held(정책상 정상 보류)를 digest.py 정본처럼 409 로 표면화. 이전엔
  briefing_worker.run() 이 held/timeout/exception 셋 다 None 반환 → API 가 셋 다 500 으로
  오보(silent-state-conflation). 진입부 'briefing' in pipeline_held_stages 가드.
- study_question.answered_at: naive default datetime.now → lambda datetime.now(timezone.utc).
  컨테이너=UTC 실측이라 값 동일·백필 불요, 컨테이너 TZ 바뀌면 9h 어긋나던 잠복 의존 제거.

검증: py_compile 통과.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-16 13:51:42 +09:00
hyungi 3a22d225a0 feat(documents): delete_file=true 큐드-감사삭제 — purge 마커 + retention sweep (R7)
delete_file 파라미터가 광고만 하고 본문에서 0회 참조(soft-delete만, 파일 영구 잔존 +
프론트가 실제 호출)되던 거짓 계약 구현. (c) 큐드삭제:
- 마이그 359: documents.purge_requested_at 컬럼(ADD COLUMN IF NOT EXISTS, replayable).
- delete_document: delete_file=true 시 purge_requested_at 마커 set(deleted_at 과 별도).
- document_purge_sweep cron(03:20 KST): purge_requested_at + grace(30일) 경과 + 파일 존재
  시 NAS 원본 unlink + AUDIT 로그. ★sweep 는 deleted_at 아니라 purge_requested_at 기준 —
  일반 숨김(delete_file=false)은 파일 보존(undelete 가능), 명시 purge 만 물리삭제(데이터 안전).
- DELETE 요청 경로엔 동기 비가역 op 0. 파일 존재 체크로 멱등. unlink 는 to_thread(R5 일관).

검증: py_compile 통과. migration txn 제어문 없음.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-16 13:48:25 +09:00
hyungi 8a625bfb27 fix(security): soft-delete 가드 구조화 — get_live_document 헬퍼 + paper-holder (R7)
조회/수정 경로는 deleted_at 을 일관 가드하나 파일/콘텐츠 서빙 5엔드포인트
(get_document_file·image_raw·save_content·preview·content)가 'if not doc' 만 검사 →
삭제 문서 원본/preview/전문/마커이미지가 doc_id(+토큰)만으로 노출·삭제 문서 NAS 재기록.
get_live_document(session, doc_id) 헬퍼(없거나 deleted_at 이면 404)로 통일 — '경로마다
deleted_at 기억' 대신 구조 강제(추가될 서빙 경로 자동 보호). save_content 는 삭제 문서
쓰기 차단까지. find_paper_holder 도 deleted_at IS NULL 필터 추가(dedup.find_canonical 대칭).

검증: py_compile 통과.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-16 13:45:33 +09:00
hyungi 844a5e0204 fix(security): internal 토큰 상수시간 비교 + memo tag 파라미터 바인딩 (R7)
- internal_study._verify_token: != 비교는 첫 불일치 단락으로 prefix 길이 timing
  side-channel(RAG 정답 endpoint 보호 토큰) → hmac.compare_digest(search.py 정본 일치).
- memos tag 필터: f-string 으로 사용자 tag 를 JSON 배열 리터럴에 직접 삽입 → tag 안
  "/] 가 JSON 깨 500 + 필터 변형. func.jsonb_build_array(tag) 바인드 파라미터로.

검증: py_compile 통과. R7 나머지(get_live_document·paper-holder deleted_at·delete_file
purge 마커+retention sweep·fetch-page·save-content)는 이어서.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-16 13:40:35 +09:00
hyungi 456dfaa9f2 fix(ai): _call_chat 무동의 Claude egress 자동폴백 제거 (R6)
primary(맥미니) Timeout/ConnectError 시 동의·과금 통제 없이 ai.fallback(Claude API)으로
자동 전환 → 개인 문서/쿼리/메모가 Anthropic 으로 silent egress 되던 프라이버시 결함 봉쇄.
실패는 전파 — 배치 워커는 재시도/StageDeferred(R3), interactive 는 호출자 5xx 표면화
(documents.analyze 이미 502/504). 클라우드는 premium explicit-trigger / call_fallback
명시 호출로만 (자동 진입 금지).

참고: uncoordinated-mlx-semaphores 는 gitea/main 최신에서 digest/briefing 이 이미
acquire_mlx_gate 사용(감사 20커밋 stale 탓 오탐) — 변경 불요. rerank silent-identity 의
rerank_skipped notes 플래그는 시그니처 변경 동반이라 별도 후속(Low).

검증: py_compile 통과.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-16 13:38:46 +09:00
hyungi cb7c0fdc4f fix(workers): blocking I/O off-thread — watch_inbox·getaddrinfo·file stream (R5)
AsyncIOScheduler 가 FastAPI lifespan 과 같은 이벤트 루프를 공유하는데 동기 blocking
I/O 가 루프를 점유 → 같은 루프의 모든 1분 주기 consumer + FastAPI 요청 동시 정지.
- watch_inbox: NFS rglob walk + GB 파일 SHA-256(file_hash)을 asyncio.to_thread 오프로드.
  스캔 루프가 순차라 file_hash 직렬화 유지(병렬 해싱 X = NFS 2.5GbE 대역폭·메모리 blowup 방지).
- news create_source: validate_feed_url 의 getaddrinfo(blocking DNS) off-thread.
- storage/local stream: 청크 f.read off-thread.
marker_worker/mailplus to_thread 컨벤션 재사용. daily_digest blocking 은 R8(TZ)과 한 패스.

검증: py_compile 통과.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-16 13:35:44 +09:00
hyungi 2e19dc3d37 fix(collectors): kosha 부분실패 per-case commit — 전체 rollback 방지 (R4)
kosha run() 이 소스별 단일 세션으로 collector 전체를 돌리고 예외 시 rollback →
페이지 _api_get 실패가 앞서 적재한 케이스/항목을 전부 폐기(부분 적재 손실 + 매번
같은 지점 실패 시 영구 미적재). disaster_cases/fatal_accidents/guide 의 케이스·항목
단위로 session.commit() 경계 추가(csb/api_standards idiom) — 실패 이전 적재분 보존,
dedup 으로 다음 run 이 이어받음. 첨부 실패는 기존대로 격리(변경 없음).

검증: py_compile 통과.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-16 13:32:07 +09:00
hyungi 2ad32c5c84 fix(collectors): 워터마크 cap 절단 시 미전진 — silent backlog loss 차단 (R4)
arxiv/openalex 수집기가 run_cap 도달로 카테고리/시드 중도 절단돼도 워터마크를
newest 로 전진시켜, [oldest-ingested, 옛 watermark] 사이 미적재 항목이 다음 run 의
watermark 필터에 영구 배제되던 silent data loss 수정.

capped 플래그: cap 으로 루프 절단 시 set → 워터마크 미전진. 미전진하면 다음 run 이
최신부터 재스캔하며 적재분은 dedup-skip(cap 미소모)하고 gap 까지 내려가 이어 적재
→ 백로그 run 당 cap 소화(livelock 회피). 정상 완주(watermark 도달/cursor 소진) 시에만
전진. bulk(CLI)은 cap 무관. docstring 의 '다음 run 이월' 약속을 실제 동작과 일치.

검증: py_compile 통과. kosha 부분실패 per-case commit 은 R4 후속.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-16 13:28:04 +09:00
hyungi c11f113cf1 fix(workers): silent completion 차단 — transient re-raise + enqueue 격리 (R3)
worker_fn 이 transient 실패를 삼켜 정상 반환하면 queue_consumer 가 status=completed
로 확정 → 영구 데이터 손실 + 재시도/추적 0. 정본(extract/marker/fulltext/stt 는
re-raise)과 어긋난 곳을 통일:
- deep_summary: 호출 실패(call_failed)를 삼키지 않고 raise → 재시도→failed dead-letter
  (이전엔 ai_detail_summary 영구 누락 + tier triage 고착).
- thumbnail: _extract_thumbnail 실패를 silent return → raise (썸네일 영구 누락 방지).
- queue_consumer: 완료 커밋 후 enqueue_next_stage(정상·skip-note 2곳)를 자체 try 로
  격리 — enqueue 실패가 outer except 로 전파돼 completed 항목을 재오픈(stage 재실행)
  하던 결함 차단. 실패는 ERROR 로 가시화.
- broad except 에 asyncio.CancelledError 명시 통과(embed worker / ask classifier·verifier).

dead-letter = ProcessingQueue.status='failed'(기존 attempts/max_attempts 머신 재사용,
신규 컬럼 불필요). 검증: py_compile 통과. 큐 재시도 의미 synthetic smoke(staging) 예정.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-16 13:24:25 +09:00
hyungi 9c22337647 fix(search): 공유 AsyncSession 동시 쿼리 직렬화/세션 분리 + rewrite axis 누락 (R2)
asyncio.gather 가 단일 AsyncSession 에 동시 execute 를 진입시켜 부하 의존적
'another operation in progress' 비결정 크래시 (정상 순차 경로에서만 검증돼 잠복).
사이트별 처방(균일 처방 회피):
- search_with_rewrite._variant_retrieve: variant 마다 독립 async_session() fan-out
  (사용자 대면 — N variant 병렬 유지)
- study explanation_rag / subject_note_rag: 백그라운드 prefetch 라 순차 직렬화
  (rerank 도 순차 — DB 순차+rerank gather 분할은 _gather_* 4곳 침습이라 보류,
   배경 작업의 rerank 병렬 이득 미미)

추가: rewrite(multi-query) 경로가 axis 필터(material_type/jurisdiction/year)를
single-query path 와 달리 조용히 누락 — search_with_rewrite 에 axis 인자 + _variant_retrieve
가 search_text/search_vector 에 전달.

검증: py_compile 통과. 동시 N variant 부하 테스트(staging)로 크래시 소거 확인 예정.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-16 13:18:17 +09:00
hyungi d8ad097a3a ops(migrations): fresh-DB/DR replay·enum 스모크 게이트 (R0)
init_db 의 단일 트랜잭션 적용 경로(engine.begin)를 미러해 migrations/ 전체가
빈 DB / DR(pre-320 → catch-up) 업그레이드에서 한 트랜잭션으로 적용 가능한지 검증.
pg16(pgvector/pgvector:pg16) 핀, ephemeral 컨테이너 자동 기동/정리.

현재 두 시나리오 모두 011_embedding_1024 에서 FAIL — view active_documents 가
documents.embedding 의존(DROP COLUMN CASCADE 부재). enum(326) 이전 지점.
fresh replay 가 한 번도 검증된 적 없어 누적 비-replayable cruft 다수 확인.
R1(스키마 baseline 스냅샷)으로 fix 후 PASS 가 게이트 기준.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-16 13:11:55 +09:00
hyungi 3a780c0d06 Merge pull request 'feat(review): 검토 대기 자동검토 워커 — 고신뢰 자동승인 + 저신뢰 잔류' (#42) from feat/auto-review-pending into main
Reviewed-on: #42
2026-06-15 16:33:25 +09:00
hyungi 35d7c7eab7 Merge pull request 'Feat/memo to document' (#41) from feat/memo-to-document into main
Reviewed-on: #41
2026-06-15 15:21:57 +09:00
hyungi ffe4c776e9 Revert "feat(viewer): md 본문 외부 링크 새 탭 + rel 보안"
This reverts commit 60f3b25.

병렬 세션이 동일 P0(외부 링크 새 탭+rel)를 feat/memo-to-document 브랜치에
docMarkdown.ts link 렌더러 + ADD_ATTR 방식으로 이미 구현(SSR 적용·memos 번들).
중복 회피 위해 본 $effect 구현(redundant)을 canonical 에서 되돌리고 그쪽에 양보.
분석 산출물/측정 결과는 PKM learning 문서로 기록 보존.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-15 15:17:18 +09:00
hyungi 60f3b259df feat(viewer): md 본문 외부 링크 새 탭 + rel 보안
문서 본문 markdown 의 외부 http(s) 링크(코퍼스 실측 521문서/9,496개)가
target/rel 없이 같은 탭으로 열려 SPA 를 이탈하던 문제 수정.

MarkdownDoc 에 heading-anchor 와 동일한 DOM 후처리 $effect 추가 —
sanitize 후 라이브 DOM 의 a[href^=http(s)] 에 target="_blank" +
rel="noopener noreferrer" 부여. marked 렌더러/DOMPurify(전역 hook)·
ADD_ATTR 무수정. 앵커(#)·상대경로·mailto 는 미변경(SPA 내부 항법 보존).

내부 위키링크([[...]])·백링크 그래프는 코퍼스 실측상 실신호 ~8개로
데이터 미지원이라 본 PR 범위에서 제외(보류, 내부 링크 증가가 트리거).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-15 15:07:39 +09:00
hyungi fabbca64e9 feat(markdown): 외부 링크 새 탭 + rel=noopener noreferrer (P0)
docMarked link 렌더러: http/https 링크에 target=_blank rel=noopener noreferrer
(탭내빙 차단, 코퍼스 521건). 내부/'#'프래그먼트/상대/mailto 는 무손 — outline
gfmHeadingId 경로 유지(클릭 인터셉터 없음=충돌 0). marked15 토큰객체 시그니처.
SANITIZE_OPTS ADD_ATTR 에 target/rel.

load-bearing 게이트: 상대 .md=코퍼스 0건·doc_key 부재 → path→id prop/document_links
미구현(dead). [[..]]=13건 대부분 인용 노이즈([[3\]]) → resolution/스트립 미구현.
외부 링크 하드닝만 정당화됨.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-15 15:06:58 +09:00
hyungi 4927c585c7 Merge pull request 'Fix/md render katex tldr' (#40) from fix/md-render-katex-tldr into main
Reviewed-on: #40
2026-06-15 14:17:44 +09:00
44 changed files with 6078 additions and 466 deletions
+10 -7
View File
@@ -289,13 +289,16 @@ class AIClient:
return response.json()
async def _call_chat(self, model_config, prompt: str) -> str:
"""OpenAI 호환 API 호출 + 자동 폴백"""
try:
return await self._request(model_config, prompt)
except (httpx.TimeoutException, httpx.ConnectError):
if model_config == self.ai.primary:
return await self._request(self.ai.fallback, prompt)
raise
"""OpenAI 호환 API 호출 (R6: 무동의 클라우드 폴백 제거).
이전엔 primary(맥미니) TimeoutException/ConnectError 시 동의·과금 통제 없이
self.ai.fallback(Claude API)로 자동 전환 → 개인 문서/쿼리/메모가 Anthropic 으로
silent egress. on-prem 추론 프라이버시 계약 위반이라 봉쇄한다. 실패는 그대로 전파:
배치 워커는 재시도/StageDeferred(R3·queue_consumer), interactive 호출자는 5xx 표면화
(documents.analyze 등 이미 502/504 변환). 클라우드는 premium explicit-trigger
(summarize force_premium) 또는 call_fallback 명시 호출로만 — 자동 진입 금지.
"""
return await self._request(model_config, prompt)
async def _request(self, model_config, prompt: str, system: str | None = None) -> str:
"""단일 모델 API 호출 (OpenAI 호환 + Anthropic Messages API).
+6
View File
@@ -195,8 +195,14 @@ async def regenerate(
date 미지정 시 오늘 KST. 같은 날 row 존재 시 transaction 안에서 삭제 후 신규 생성.
응답 status='success' | 'partial' | 'failed' | 'empty'.
"""
from core.config import settings
from workers.briefing_worker import run
# held(정책상 정상 보류)를 409 로 표면화 (R8) — digest.py 정본 대칭. 이전엔 briefing_worker.run()
# 이 held/timeout/exception 셋 다 None 반환 → API 가 셋 다 500 으로 오보(silent-state-conflation).
if "briefing" in settings.pipeline_held_stages:
raise HTTPException(status_code=409, detail="briefing 단계가 일시 보류(held) 상태입니다")
result = await run(target_date=date)
if result is None:
raise HTTPException(status_code=500, detail="briefing 워커 실행 실패 (로그 확인)")
+34 -24
View File
@@ -69,6 +69,19 @@ def _upload_error(status_code: int, error_code: str, message: str) -> HTTPExcept
)
async def get_live_document(session: AsyncSession, doc_id: int) -> Document:
"""soft-delete(deleted_at) 가드 포함 문서 조회 — 없거나 삭제됐으면 404 (R7).
조회/수정 경로는 deleted_at 을 일관 가드하나 파일/콘텐츠 서빙 엔드포인트가 누락 →
삭제 문서의 원본/preview/전문이 doc_id(+유효 토큰)만으로 노출되던 비대칭. '경로마다
deleted_at 기억'에 의존하지 않게 헬퍼로 구조 강제(추가될 서빙 경로도 자동 보호).
"""
doc = await session.get(Document, doc_id)
if not doc or doc.deleted_at is not None:
raise HTTPException(status_code=404, detail="문서를 찾을 수 없습니다")
return doc
async def _near_dup_scan_bg(doc_id: int) -> None:
"""B-3: post-upload near_duplicate 스캔 (BackgroundTask). 자체 세션, best-effort.
@@ -838,9 +851,7 @@ async def get_document_file(
# 일반 Bearer 헤더 인증 시도
raise HTTPException(status_code=401, detail="토큰이 필요합니다")
doc = await session.get(Document, doc_id)
if not doc:
raise HTTPException(status_code=404, detail="문서를 찾을 수 없습니다")
doc = await get_live_document(session, doc_id)
# note(메모)는 물리 파일이 없음
if not doc.file_path:
@@ -943,10 +954,8 @@ async def get_document_image_raw(
if not payload or payload.get("type") != "access":
raise HTTPException(status_code=401, detail="유효하지 않은 토큰")
# 문서 존재 확인 (image_key 만 있고 doc 가 사라진 케이스 차단)
doc = await session.get(Document, doc_id)
if doc is None:
raise HTTPException(status_code=404, detail="문서를 찾을 수 없습니다")
# 문서 존재 확인 (image_key 만 있고 doc 가 사라진 케이스 차단 + soft-delete 가드)
doc = await get_live_document(session, doc_id)
img = await session.scalar(
select(DocumentImage).where(
@@ -1357,9 +1366,8 @@ async def save_document_content(
body: dict = None,
):
"""Markdown 원본 파일 저장 + extracted_text 갱신"""
doc = await session.get(Document, doc_id)
if not doc:
raise HTTPException(status_code=404, detail="문서를 찾을 수 없습니다")
# soft-delete 문서엔 쓰기 차단 (R7 — 삭제 문서 resurrect / NAS 재기록 방지)
doc = await get_live_document(session, doc_id)
if doc.file_format not in ("md", "txt"):
raise HTTPException(status_code=400, detail="편집 가능한 포맷이 아닙니다 (md, txt만 가능)")
@@ -1399,9 +1407,7 @@ async def get_document_preview(
else:
raise HTTPException(status_code=401, detail="토큰이 필요합니다")
doc = await session.get(Document, doc_id)
if not doc:
raise HTTPException(status_code=404, detail="문서를 찾을 수 없습니다")
doc = await get_live_document(session, doc_id)
preview_path = Path(settings.nas_mount_path) / "PKM" / ".preview" / f"{doc_id}.pdf"
if not preview_path.exists():
@@ -1427,18 +1433,24 @@ async def delete_document(
doc_id: int,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
delete_file: bool = Query(False, description="NAS 파일도 함께 삭제"),
delete_file: bool = Query(False, description="NAS 원본도 삭제 (grace 후 retention sweep 이 물리삭제)"),
):
"""문서 삭제 (기본: DB만 삭제, 파일 유지)"""
doc = await session.get(Document, doc_id)
if not doc:
raise HTTPException(status_code=404, detail="문서를 찾을 수 없습니다")
"""문서 삭제. 기본: soft-delete(숨김, 파일 보존). delete_file=true: purge 예약 (R7)."""
doc = await get_live_document(session, doc_id)
# soft-delete (물리 파일은 cleanup job에서 나중에 정리)
doc.deleted_at = datetime.now(timezone.utc)
# soft-delete(숨김). delete_file=true 면 purge_requested_at 마커를 추가로 set —
# retention sweep cron(document_purge_sweep)이 grace(30일) 경과 후 NAS 원본 물리삭제
# + audit-log. ★일반 숨김(delete_file=false)은 파일 보존 = undelete 가능. sweep 는
# deleted_at 이 아니라 purge_requested_at 기준이라 단순 숨김이 영구삭제되지 않는다.
now = datetime.now(timezone.utc)
doc.deleted_at = now
if delete_file:
doc.purge_requested_at = now
await session.commit()
return {"message": f"문서 {doc_id} soft-delete 완료"}
if delete_file:
return {"message": f"문서 {doc_id} 삭제 — NAS 원본은 30일 후 정리 예약"}
return {"message": f"문서 {doc_id} soft-delete 완료 (파일 보존)"}
@router.get("/{doc_id}/content")
@@ -1448,9 +1460,7 @@ async def get_document_content(
session: Annotated[AsyncSession, Depends(get_session)],
):
"""문서 전문 텍스트 반환 (서비스 호출용)."""
doc = await session.get(Document, doc_id)
if not doc:
raise HTTPException(status_code=404, detail="문서를 찾을 수 없습니다")
doc = await get_live_document(session, doc_id)
raw_text = doc.extracted_text or ""
content = raw_text[:15000]
+5 -5
View File
@@ -21,7 +21,7 @@ from zoneinfo import ZoneInfo
from fastapi import APIRouter, Body, Depends, HTTPException, Query
from pydantic import BaseModel, Field
from sqlalchemy import and_, or_, select
from sqlalchemy import and_, func, or_, select
from sqlalchemy.ext.asyncio import AsyncSession
from core.auth import get_current_user
@@ -388,10 +388,10 @@ async def list_events(
)
base = select(Event).where(and_(*where))
total_q = await session.execute(
select(Event.id).where(and_(*where))
)
total = len(total_q.scalars().all())
# R10: 전체 ID 로딩 후 len() 대신 DB COUNT 푸시다운 (행 수 선형 메모리/전송 비용 제거).
total = (
await session.execute(select(func.count(Event.id)).where(and_(*where)))
).scalar() or 0
rows = await session.execute(
base.order_by(Event.created_at.desc())
+5 -1
View File
@@ -6,6 +6,7 @@ Bearer token 보호 (settings.internal_worker_token).
"""
from __future__ import annotations
import hmac
import logging
from fastapi import APIRouter, Depends, Header, HTTPException, Path, Response, status
@@ -28,7 +29,10 @@ def _verify_token(authorization: str | None = Header(default=None)) -> None:
if not authorization or not authorization.lower().startswith("bearer "):
raise HTTPException(status_code=401, detail="missing Bearer token")
token = authorization[7:].strip()
if token != settings.internal_worker_token:
# 상수시간 비교 (R7) — 일반 != 는 첫 불일치에서 단락돼 prefix 길이로 바이트 추정 가능한
# timing side-channel. 이 토큰이 RAG 정답 포함 endpoint 를 보호하므로 compare_digest 로
# 통일(search.py 정본과 일치).
if not hmac.compare_digest(token, settings.internal_worker_token):
raise HTTPException(status_code=403, detail="invalid token")
+27 -64
View File
@@ -473,72 +473,35 @@ async def get_facet_counts(
result = FacetCountsResponse(company=[], topic=[], year=[], doctype=[])
# company counts (다른 facet 필터 적용, 자기 자신 제외)
q_company = base_query()
if facet_topic:
q_company = q_company.where(Document.facet_topic == facet_topic)
if facet_year:
q_company = q_company.where(Document.facet_year == facet_year)
if facet_doctype:
q_company = q_company.where(Document.facet_doctype == facet_doctype)
rows = await session.execute(
select(Document.facet_company, func.count())
.where(Document.facet_company != None) # noqa: E711
.where(Document.id.in_(q_company.with_only_columns(Document.id).subquery().select()))
.group_by(Document.facet_company)
.order_by(func.count().desc())
)
result.company = [FacetCountItem(value=r[0], count=r[1]) for r in rows]
# topic counts
q_topic = base_query()
# R10: 4 facet 블록 중복 제거 — 적용된 facet 필터(값 있는 것만)를 모아 각 축 집계 시
# '자기 자신 축'만 제외하고 적용하는 헬퍼로. 쿼리/자기제외/order_by/value 매핑 모두 동일.
applied: dict = {}
if facet_company:
q_topic = q_topic.where(Document.facet_company == facet_company)
if facet_year:
q_topic = q_topic.where(Document.facet_year == facet_year)
if facet_doctype:
q_topic = q_topic.where(Document.facet_doctype == facet_doctype)
rows = await session.execute(
select(Document.facet_topic, func.count())
.where(Document.facet_topic != None) # noqa: E711
.where(Document.id.in_(q_topic.with_only_columns(Document.id).subquery().select()))
.group_by(Document.facet_topic)
.order_by(func.count().desc())
)
result.topic = [FacetCountItem(value=r[0], count=r[1]) for r in rows]
# year counts
q_year = base_query()
if facet_company:
q_year = q_year.where(Document.facet_company == facet_company)
applied["company"] = Document.facet_company == facet_company
if facet_topic:
q_year = q_year.where(Document.facet_topic == facet_topic)
if facet_doctype:
q_year = q_year.where(Document.facet_doctype == facet_doctype)
rows = await session.execute(
select(Document.facet_year, func.count())
.where(Document.facet_year != None) # noqa: E711
.where(Document.id.in_(q_year.with_only_columns(Document.id).subquery().select()))
.group_by(Document.facet_year)
.order_by(Document.facet_year.desc())
)
result.year = [FacetCountItem(value=str(r[0]), count=r[1]) for r in rows]
# doctype counts
q_doctype = base_query()
if facet_company:
q_doctype = q_doctype.where(Document.facet_company == facet_company)
if facet_topic:
q_doctype = q_doctype.where(Document.facet_topic == facet_topic)
applied["topic"] = Document.facet_topic == facet_topic
if facet_year:
q_doctype = q_doctype.where(Document.facet_year == facet_year)
rows = await session.execute(
select(Document.facet_doctype, func.count())
.where(Document.facet_doctype != None) # noqa: E711
.where(Document.id.in_(q_doctype.with_only_columns(Document.id).subquery().select()))
.group_by(Document.facet_doctype)
.order_by(func.count().desc())
)
result.doctype = [FacetCountItem(value=r[0], count=r[1]) for r in rows]
applied["year"] = Document.facet_year == facet_year
if facet_doctype:
applied["doctype"] = Document.facet_doctype == facet_doctype
async def _facet_count(name, facet_col, order_by, value_fn):
q = base_query()
for k, cond in applied.items():
if k != name: # 자기 자신 facet 필터는 제외 (다른 축만 적용)
q = q.where(cond)
rows = await session.execute(
select(facet_col, func.count())
.where(facet_col != None) # noqa: E711
.where(Document.id.in_(q.with_only_columns(Document.id).subquery().select()))
.group_by(facet_col)
.order_by(order_by)
)
return [FacetCountItem(value=value_fn(r[0]), count=r[1]) for r in rows]
result.company = await _facet_count("company", Document.facet_company, func.count().desc(), lambda v: v)
result.topic = await _facet_count("topic", Document.facet_topic, func.count().desc(), lambda v: v)
result.year = await _facet_count("year", Document.facet_year, Document.facet_year.desc(), lambda v: str(v))
result.doctype = await _facet_count("doctype", Document.facet_doctype, func.count().desc(), lambda v: v)
return result
+6 -2
View File
@@ -300,9 +300,13 @@ async def list_memos(
base = base.where(Document.pinned == pinned)
if tag:
# 파라미터 바인딩 (R7) — f-string 으로 사용자 tag 를 JSON 배열 리터럴에 직접 삽입하면
# tag 안 " 나 ] 가 JSON 을 깨 500 + 필터 의미 변형. jsonb_build_array 로 tag 를
# 바인드 파라미터로 전달(@> JSONB containment).
tag_arr = func.jsonb_build_array(tag)
base = base.where(
Document.user_tags.op("@>")(f'["{tag}"]')
| Document.ai_tags.op("@>")(f'["{tag}"]')
Document.user_tags.op("@>")(tag_arr)
| Document.ai_tags.op("@>")(tag_arr)
)
count_query = select(func.count()).select_from(base.subquery())
+10 -2
View File
@@ -65,7 +65,8 @@ async def create_source(
):
from core.url_validator import validate_feed_url
try:
validate_feed_url(body.feed_url)
# getaddrinfo(DNS) 는 blocking — 이벤트 루프 점유 방지 위해 off-thread (R5)
await asyncio.to_thread(validate_feed_url, body.feed_url)
except ValueError as e:
raise HTTPException(status_code=422, detail=f"feed_url 검증 실패: {e}")
source = NewsSource(**body.model_dump())
@@ -194,10 +195,17 @@ async def trigger_collect(
if _collect_lock.locked():
raise HTTPException(status_code=429, detail="수집이 이미 진행 중입니다")
# TOCTOU 제거 (R9) — 기존엔 locked() 체크 후 실제 acquire 가 별도 task 안에서 일어나, 그
# 사이 다른 요청이 끼어들어 이중 수집 task 가 생길 수 있었다. 핸들러에서 동기적으로(uncontended
# Lock.acquire 는 이벤트루프 양보 없이 즉시 완료) acquire 하고 task 의 finally 에서 release.
await _collect_lock.acquire()
async def _run_with_lock():
async with _collect_lock:
try:
from workers.news_collector import run
await run()
finally:
_collect_lock.release()
asyncio.create_task(_run_with_lock())
return {"message": "뉴스 수집 시작됨"}
+7 -3
View File
@@ -291,7 +291,7 @@ async def search(
content={
"error_reason": "unknown_embedding_backend",
"backend_requested": embedding_backend,
"allowed": ["baseline", "cand_me5_large_inst", "cand_snowflake_l_v2"],
"allowed": ["baseline"],
"detail": msg,
},
)
@@ -710,7 +710,9 @@ async def ask(
# 30s 로 align → classifier 동작 안정. ask 응답 latency 상한 ↑ 의도.
try:
classifier_result = await asyncio.wait_for(classifier_task, timeout=30.0)
except (asyncio.TimeoutError, Exception):
except asyncio.CancelledError:
raise # 요청 취소는 전파 — broad except 가 삼키지 않게 명시 (R3)
except Exception:
classifier_result = ClassifierResult("timeout", None, [], [], 0.0)
defense_log["classifier"] = {
@@ -872,7 +874,9 @@ async def ask(
# → classifier 와 동일 패턴 (search.py:522 가 6s→15s swap 했던 case). 10s 로 align.
try:
verifier_result = await asyncio.wait_for(verifier_task, timeout=10.0)
except (asyncio.TimeoutError, Exception):
except asyncio.CancelledError:
raise # 요청 취소는 전파 — broad except 가 삼키지 않게 명시 (R3)
except Exception:
verifier_result = VerifierResult("timeout", [], 0.0)
# Verifier contradictions → grounding flags 머지 (prefix 로 구분, severity 3단계)
+10 -1
View File
@@ -1009,7 +1009,16 @@ async def submit_attempt(
# PR-10: 세션 연동. 기본은 None.
quiz_session: StudyQuizSession | None = None
if body.quiz_session_id is not None:
quiz_session = await session.get(StudyQuizSession, body.quiz_session_id)
# FOR UPDATE 로 행 잠금 (R9) — 모바일 더블탭/재시도로 같은 세션에 동시 제출이 들어오면
# 둘 다 cursor=N 을 읽고 둘 다 cursor+1·count 가산하는 race(이중 가산). 잠금으로 직렬화 →
# 두 번째 제출은 첫 commit 후 cursor=N+1 을 보고 cursor 불일치 409 로 거부된다.
quiz_session = (
await session.execute(
select(StudyQuizSession)
.where(StudyQuizSession.id == body.quiz_session_id)
.with_for_update()
)
).scalar_one_or_none()
if quiz_session is None or quiz_session.user_id != user.id:
raise HTTPException(status_code=404, detail="quiz_session 을 찾을 수 없습니다")
if quiz_session.study_topic_id != q.study_topic_id:
+58 -4
View File
@@ -72,6 +72,55 @@ def _validate_sql_content(name: str, sql: str) -> None:
)
# R1: baseline 스냅샷이 대표하는 마지막 마이그레이션 버전 (이하 버전은 baseline 에 포함).
# 새 baseline 재생성 시 이 값을 갱신한다 (migrations/_baseline/<cutoff>_schema_baseline.sql).
_BASELINE_CUTOFF = 358
async def _load_baseline_if_fresh(conn, migrations_dir: Path) -> None:
"""fresh DB(documents 부재)면 baseline 스키마 스냅샷 적재 + schema_migrations 1..cutoff 스탬프.
기존 DB(documents 존재) 즉시 반환 baseline 미적재, 무영향. baseline 파일 부재 시도
기존 replay 경로 유지(하위호환).
"""
from sqlalchemy import text
baseline_dir = migrations_dir / "_baseline"
baseline_files = (
sorted(baseline_dir.glob("*_schema_baseline.sql")) if baseline_dir.is_dir() else []
)
if not baseline_files:
return
docs_exists = (
await conn.execute(text("SELECT to_regclass('public.documents') IS NOT NULL"))
).scalar()
if docs_exists:
return # 기존 DB — baseline skip
baseline_path = baseline_files[-1]
logger.info(f"[migration] fresh DB 감지 — baseline 적재: {baseline_path.name}")
# baseline 은 multi-statement 덤프 — exec_driver_sql(asyncpg prepared)은 multi-statement
# 불허("cannot insert multiple commands into a prepared statement"). raw asyncpg 의 simple
# 프로토콜 execute() 로 적재한다(같은 connection = 현재 트랜잭션 내). psql 스모크는 이 제약을
# 못 잡으므로 init_db 런타임 검증으로 확인됨.
raw = await conn.get_raw_connection()
await raw.driver_connection.execute(baseline_path.read_text(encoding="utf-8"))
# baseline = cutoff 까지의 스키마 → 실제 파일 버전 기준으로 schema_migrations 스탬프.
versions = [v for v, _, _ in _parse_migration_files(migrations_dir) if v <= _BASELINE_CUTOFF]
for v in versions:
await conn.execute(
text(
"INSERT INTO schema_migrations (version, name) "
"VALUES (:v, :n) ON CONFLICT DO NOTHING"
),
{"v": v, "n": f"baseline:{v}"},
)
logger.info(
f"[migration] baseline 적재 + schema_migrations {len(versions)}건 스탬프 (cutoff {_BASELINE_CUTOFF})"
)
async def _run_migrations(conn) -> None:
"""미적용 migration 실행 (호출자가 트랜잭션 관리)"""
from sqlalchemy import text
@@ -90,10 +139,6 @@ async def _run_migrations(conn) -> None:
f"SELECT pg_advisory_xact_lock({_MIGRATION_LOCK_KEY})"
))
# 적용 이력 조회
result = await conn.execute(text("SELECT version FROM schema_migrations"))
applied = {row[0] for row in result}
# migration 파일 스캔
# /app/core/database.py → parent.parent = /app → /app/migrations (volume mount 위치)
migrations_dir = Path(__file__).resolve().parent.parent / "migrations"
@@ -101,6 +146,15 @@ async def _run_migrations(conn) -> None:
logger.info("[migration] migrations/ 디렉토리 없음, 스킵")
return
# R1: fresh DB(documents 부재)면 baseline 스냅샷 먼저 적재 + schema_migrations 스탬프.
# migrations/ 전체 replay 는 누적 비-replayable(011 view 의존·326 enum-same-txn 등)로
# 깨지므로 신규/DR 환경은 prod 스키마 스냅샷에서 출발한다. 기존 DB 는 skip(무영향).
await _load_baseline_if_fresh(conn, migrations_dir)
# 적용 이력 조회 (baseline 스탬프 반영 — fresh DB 는 1..cutoff 가 이미 applied)
result = await conn.execute(text("SELECT version FROM schema_migrations"))
applied = {row[0] for row in result}
files = _parse_migration_files(migrations_dir)
pending = [(v, name, path) for v, name, path in files if v not in applied]
+13 -3
View File
@@ -51,6 +51,7 @@ async def lifespan(app: FastAPI):
from workers.briefing_worker import run as morning_briefing_run
from workers.daily_digest import run as daily_digest_run
from workers.dedup_reconcile import run as dedup_reconcile_run
from workers.document_purge_sweep import run as purge_sweep_run
from workers.digest_worker import run as global_digest_run
from workers.file_watcher import watch_inbox
from workers.mailplus_archive import run as mailplus_run
@@ -150,6 +151,9 @@ async def lifespan(app: FastAPI):
# plan ds-s1-backend-1 B-4: dedup 컬럼(duplicate_of/duplicate_count) 야간 절대 재계산.
# soft-delete 잔여 드리프트 정리(멱등, 드리프트 없으면 no-op). cron 03:30 (다른 잡과 비충돌).
scheduler.add_job(dedup_reconcile_run, CronTrigger(hour=3, minute=30, timezone=KST), id="dedup_reconcile")
# R7: delete_file=true purge 요청 문서의 NAS 원본 grace(30일) 후 물리삭제 + audit.
# purge_requested_at 마커 기준(단순 숨김은 보존). 03:20 = 다른 새벽 잡과 비충돌 슬롯.
scheduler.add_job(purge_sweep_run, CronTrigger(hour=3, minute=20, timezone=KST), id="purge_sweep")
# B-3 PR4: 레거시 paper 행 arXiv DataCite DOI 스탬프(재유입 차단). keyless·in-DB·enqueue 0.
# dedup_reconcile(03:30)·fulltext_reconcile(03:40) 와 별 worker·비충돌 슬롯.
scheduler.add_job(paper_doi_reconcile_run, CronTrigger(hour=3, minute=50, timezone=KST), id="paper_doi_reconcile")
@@ -236,21 +240,27 @@ SETUP_BYPASS_PREFIXES = (
"/api/setup", "/api/config", "/setup", "/health", "/docs", "/openapi.json", "/redoc",
)
# R10: 셋업 완료(user 존재)는 단조(monotonic) — 한 번 확인되면 영구. 매 요청 COUNT 쿼리
# 대신 캐시 플래그로 전환 (setup 후 모든 요청이 users COUNT 하던 per-request 비용 제거).
_setup_complete = False
@app.middleware("http")
async def setup_redirect_middleware(request: Request, call_next):
global _setup_complete # 함수 내 read+assign 둘 다 모듈 전역 참조 (UnboundLocalError 방지)
path = request.url.path
# 바이패스 경로는 항상 통과
if any(path.startswith(p) for p in SETUP_BYPASS_PREFIXES):
# 셋업 완료됐거나 바이패스 경로면 즉시 통과 (DB 쿼리 없음)
if _setup_complete or any(path.startswith(p) for p in SETUP_BYPASS_PREFIXES):
return await call_next(request)
# 유저 존재 여부 확인
# 유저 존재 여부 확인 (셋업 완료 전 1회성 — 완료 확인되면 플래그 set 후 영구 skip)
try:
async with async_session() as session:
result = await session.execute(select(func.count(User.id)))
user_count = result.scalar()
if user_count == 0:
return RedirectResponse(url="/setup")
_setup_complete = True
except Exception:
pass # DB 연결 실패 시 통과 (health에서 확인 가능)
+6 -2
View File
@@ -52,7 +52,8 @@ class Document(Base):
# 2계층: AI 가공
ai_summary: Mapped[str | None] = mapped_column(Text)
ai_tags: Mapped[dict | None] = mapped_column(JSONB, default=[])
# R11a: 주석 dict→list 정정(실제 list 적재), 공유 가변 default=[] → callable default=list.
ai_tags: Mapped[list | None] = mapped_column(JSONB, default=list)
ai_domain: Mapped[str | None] = mapped_column(String(100))
ai_sub_group: Mapped[str | None] = mapped_column(String(100))
ai_model_version: Mapped[str | None] = mapped_column(String(50))
@@ -79,7 +80,7 @@ class Document(Base):
user_note: Mapped[str | None] = mapped_column(Text)
# 사용자 태그 (ai_tags와 분리, #태그 파싱 결과 또는 수동 입력)
user_tags: Mapped[list | None] = mapped_column(JSONB, default=[])
user_tags: Mapped[list | None] = mapped_column(JSONB, default=list) # R11a: 공유 가변 default 제거
# 핀 고정
pinned: Mapped[bool] = mapped_column(Boolean, default=False)
@@ -105,6 +106,9 @@ class Document(Base):
# 승인/삭제
review_status: Mapped[str | None] = mapped_column(String(20), default="pending")
deleted_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
# delete_file=true 명시 삭제 요청 마커 (R7) — retention sweep(document_purge_sweep)이
# grace 후 NAS 원본 물리삭제. deleted_at(단순 숨김, 파일 보존)과 분리.
purge_requested_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
# 외부 편집 URL
edit_url: Mapped[str | None] = mapped_column(Text)
+4 -2
View File
@@ -7,7 +7,7 @@ PR-2 가드레일:
- correct_choice 변경 기존 attempt.is_correct 재계산 (기록은 시점의 사실).
"""
from datetime import datetime
from datetime import datetime, timezone
from pgvector.sqlalchemy import Vector
from sqlalchemy import BigInteger, Boolean, DateTime, ForeignKey, Integer, SmallInteger, String, Text
@@ -128,7 +128,9 @@ class StudyQuestionAttempt(Base):
# PR-9: outcome 권장값 (correct/wrong/unsure). 강한 enum 미사용.
outcome: Mapped[str] = mapped_column(String(20), nullable=False)
answered_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), default=datetime.now, nullable=False
# TZ-aware 명시 (R8) — naive datetime.now() 는 컨테이너 TZ 의존. 현 컨테이너=UTC 라
# 값 동일(백필 불요)이나, 컨테이너 TZ 가 바뀌면 9시간 어긋나는 잠복 의존 제거.
DateTime(timezone=True), default=lambda: datetime.now(timezone.utc), nullable=False
)
# PR-10: 어떤 quiz 세션의 attempt 인지 (NULL = 세션 외 직접 입력 또는 세션 삭제됨).
quiz_session_id: Mapped[int | None] = mapped_column(
+2 -1
View File
@@ -32,7 +32,8 @@ async def find_paper_holder(session, raw_or_normalized_doi):
return None
result = await session.execute(
select(Document)
.where(Document.material_type == "paper", _DOI_EXPR == doi)
.where(Document.material_type == "paper", _DOI_EXPR == doi,
Document.deleted_at.is_(None))
.limit(1)
)
return result.scalars().first()
+4 -36
View File
@@ -54,42 +54,10 @@ QUERY_EMBED_MAXSIZE = 500
# server-side allowlist map. query parameter 가 raw table name 받지 않음.
CANDIDATE_BACKEND_MAP: dict[str, dict[str, str] | None] = {
"baseline": None,
"cand_me5_large_inst": {
"docs_table": "documents_cand_me5_large_inst",
"chunks_table": "document_chunks_cand_me5_large_inst",
"embed_endpoint": "http://embedding-cand-me5-inst:80/embed",
},
"cand_snowflake_l_v2": {
"docs_table": "documents_cand_snowflake_l_v2",
"chunks_table": "document_chunks_cand_snowflake_l_v2",
"embed_endpoint": "http://embedding-cand-snowflake-l-v2:80/embed",
},
# ─── Phase 2A (embedding-phase2a-1, 2026-06-12): Qwen3-Embedding 후보 3종 ───
# embed_kind="ollama" = /api/embed 호출 + 쿼리측 instruct prefix (비대칭 사용,
# G-1 fixture 실측: prefix 가 관련쌍 cos +0.016). 문서측은 backfill 이 plain 으로 적재.
# qwen4m = 4B 의 MRL 1024d (dimensions 옵션 — Ollama 가 truncate+재정규화 수행, G-1 실측).
"cand_qwen06": {
"docs_table": "documents_cand_qwen06",
"chunks_table": "document_chunks_cand_qwen06",
"embed_endpoint": "http://ollama:11434/api/embed",
"embed_kind": "ollama",
"embed_model": "qwen3-embedding:0.6b",
},
"cand_qwen4": {
"docs_table": "documents_cand_qwen4",
"chunks_table": "document_chunks_cand_qwen4",
"embed_endpoint": "http://ollama:11434/api/embed",
"embed_kind": "ollama",
"embed_model": "qwen3-embedding:4b",
},
"cand_qwen4m": {
"docs_table": "documents_cand_qwen4m",
"chunks_table": "document_chunks_cand_qwen4m",
"embed_endpoint": "http://ollama:11434/api/embed",
"embed_kind": "ollama",
"embed_model": "qwen3-embedding:4b",
"embed_dimensions": 1024,
},
# Phase 2A 임베딩 후보(me5_large_inst·snowflake_l_v2·qwen06·qwen4·qwen4m) 전량 no-go
# 종결(2026-06-12, 후보 전부 -0.03~-0.04) → cand 슬러그·테이블 제거 (R13, 마이그 360
# DROP). read-path 슬러그를 먼저 빼야 embedding_backend=cand_X /search 가 dropped 테이블을
# 읽어 500 나지 않는다. baseline(production)만 잔존.
}
# G-1 핀 고정 instruct 문자열 (inventory 2026-06-12-c 기록과 동일해야 함 —
+18 -7
View File
@@ -32,6 +32,8 @@ from typing import TYPE_CHECKING, Literal
from sqlalchemy.ext.asyncio import AsyncSession
from core.database import async_session
from . import query_analyzer, query_rewriter
from .fusion_service import (
DEFAULT_FUSION,
@@ -188,6 +190,7 @@ async def run_search(
snapshot_chunk_id_max=snapshot_chunk_id_max,
reranker_backend=reranker_backend,
rewrite_backend=rewrite_backend,
axis=axis,
)
timing: dict[str, float] = {}
@@ -536,6 +539,7 @@ async def search_with_rewrite(
snapshot_chunk_id_max: int | None,
reranker_backend: str | None,
rewrite_backend: str,
axis: "AxisFilter | None" = None,
) -> PipelineResult:
"""Phase 2Q multi-query retrieval 합성 path (plan v6 §5.5).
@@ -579,13 +583,20 @@ async def search_with_rewrite(
async def _variant_retrieve(
v: str,
) -> "tuple[list[SearchResult], list[SearchResult], dict[int, list[SearchResult]]]":
text = await search_text(session, v, per_variant_k)
raw_chunks = await search_vector(
session, v, per_variant_k,
embedding_backend=embedding_backend,
snapshot_doc_id_max=snapshot_doc_id_max,
snapshot_chunk_id_max=snapshot_chunk_id_max,
)
# 변형별 독립 AsyncSession (fan-out). 공유 session 을 asyncio.gather 로 동시
# execute 에 넘기면 SQLAlchemy async 가 'another operation in progress' 로
# 부하 의존적 비결정 크래시 — variant 마다 독립 연결로 분리한다.
# axis(material_type/jurisdiction/year) 도 single-query path 와 동일하게 전달
# (rewrite 경로가 axis 필터를 조용히 누락하던 결함 수정).
async with async_session() as vsession:
text = await search_text(vsession, v, per_variant_k, axis=axis)
raw_chunks = await search_vector(
vsession, v, per_variant_k,
embedding_backend=embedding_backend,
snapshot_doc_id_max=snapshot_doc_id_max,
snapshot_chunk_id_max=snapshot_chunk_id_max,
axis=axis,
)
vector, chunks_by_doc = compress_chunks_to_docs(raw_chunks, per_variant_k)
return text, vector, chunks_by_doc
+12 -8
View File
@@ -95,8 +95,10 @@ except FileNotFoundError:
)
# ─── in-memory LRU (FIFO 근사, query_analyzer 패턴 복제) ─
_CACHE: dict[str, SynthesisResult] = {}
# ─── in-memory 캐시 (FIFO eviction + TTL, query_analyzer 패턴 복제) ─
# R10: (ts, result) 저장 — TTL 미적용으로 원문 수정돼도 CACHE_MAXSIZE 찰 때까지 stale answer
# 반환하던 결함 수정. query_rewriter 의 expire_at TTL enforce 정본 복제.
_CACHE: dict[str, tuple[float, SynthesisResult]] = {}
def _model_version() -> str:
@@ -122,10 +124,11 @@ def get_cached(query: str, chunk_ids: list[int], backend_name: str = "gemma-macm
entry = _CACHE.get(key)
if entry is None:
return None
# TTL 체크는 elapsed_ms 를 악용할 수 없으므로 별도 저장
# 여기서는 단순 policy 로 처리: entry 가 있으면 반환 (eviction 은 FIFO 시점)
# 정확한 TTL 이 필요하면 (ts, result) tuple 로 저장해야 함.
return entry
ts, result = entry
if time.time() - ts > CACHE_TTL:
_CACHE.pop(key, None) # 만료 — 삭제 후 miss
return None
return result
def _should_cache(result: SynthesisResult) -> bool:
@@ -143,8 +146,9 @@ def set_cached(query: str, chunk_ids: list[int], result: SynthesisResult, backen
if not _should_cache(result):
return
key = _cache_key(query, chunk_ids, backend_name)
now = time.time()
if key in _CACHE:
_CACHE[key] = result
_CACHE[key] = (now, result)
return
if len(_CACHE) >= CACHE_MAXSIZE:
try:
@@ -152,7 +156,7 @@ def set_cached(query: str, chunk_ids: list[int], result: SynthesisResult, backen
_CACHE.pop(oldest, None)
except StopIteration:
pass
_CACHE[key] = result
_CACHE[key] = (now, result)
def cache_stats() -> dict[str, int]:
+2 -1
View File
@@ -2,6 +2,7 @@
from __future__ import annotations
import asyncio
import os
from collections.abc import AsyncIterator
from pathlib import Path
@@ -42,7 +43,7 @@ class LocalBackend(StorageBackend):
to_read = _STREAM_CHUNK if remaining is None else min(_STREAM_CHUNK, remaining)
if to_read <= 0:
break
data = f.read(to_read)
data = await asyncio.to_thread(f.read, to_read)
if not data:
break
yield data
+9 -6
View File
@@ -252,12 +252,15 @@ async def gather_explanation_context(
client = AIClient()
query = _build_query(question)
try:
# 두 조회 병렬화 (rerank 호출이 별개라 lock 충돌 없음)
docs, questions = await asyncio.gather(
_gather_document_evidence(session, user_id, question.study_topic_id, query, client),
_gather_question_evidence(
session, user_id, question.study_topic_id, question.id, query, client
),
# 같은 AsyncSession 을 asyncio.gather 로 동시 execute 에 넘기면 SQLAlchemy async 가
# 'another operation in progress' 로 부하 의존적 비결정 크래시(이전 주석 'lock 충돌
# 없음' 은 rerank HTTP 만 보고 DB execute 동시성을 간과한 오인). 백그라운드 prefetch
# 라 순차 직렬화 — 사용자 대면 rewrite 경로(독립 세션 fan-out)와는 다른 처방.
docs = await _gather_document_evidence(
session, user_id, question.study_topic_id, query, client
)
questions = await _gather_question_evidence(
session, user_id, question.study_topic_id, question.id, query, client
)
return ExplanationContext(documents=docs, questions=questions)
finally:
+7 -3
View File
@@ -238,9 +238,13 @@ async def gather_subject_note_context(
client = AIClient()
query = _build_query(subject, scope)
try:
docs, questions = await asyncio.gather(
_gather_document_evidence(session, user_id, study_topic_id, query, client),
_gather_question_evidence(session, user_id, study_topic_id, subject, scope, query, client),
# 같은 AsyncSession 동시 execute 회피 — 순차 직렬화(백그라운드 prefetch).
# explanation_rag.gather_explanation_context 와 동형(R2 공유세션 동시성 수정).
docs = await _gather_document_evidence(
session, user_id, study_topic_id, query, client
)
questions = await _gather_question_evidence(
session, user_id, study_topic_id, subject, scope, query, client
)
return SubjectNoteContext(documents=docs, questions=questions)
finally:
+10 -2
View File
@@ -303,10 +303,12 @@ async def run(bulk: bool = False, limit: int = 0) -> None:
src = await session.get(NewsSource, source_id)
watermark = _watermark(src, category)
newest_seen: datetime | None = None
capped = False # 이번 run 이 cap 으로 카테고리 중도 절단됐는지 (R4)
max_pages = (10**6 if bulk else _MAX_PAGES_PER_CAT)
try:
for page in range(max_pages):
if inserted >= run_cap:
capped = True
break
xml_text = await _fetch(client, query, page * _PAGE_SIZE)
total, entries = parse_arxiv_feed(xml_text)
@@ -329,12 +331,18 @@ async def run(bulk: bool = False, limit: int = 0) -> None:
else:
await session.rollback()
if inserted >= run_cap:
capped = True
break
await asyncio.sleep(_REQ_SLEEP)
if stop or (page + 1) * _PAGE_SIZE >= total:
break
# 카테고리 워터마크 전진(이번 run 최신 발행일)
if newest_seen:
# 카테고리 워터마크 전진 — cap 으로 절단된 run 은 미전진 (R4).
# 절단 시 newest_seen 으로 전진하면 [oldest-ingested, 옛 watermark] 사이
# 미적재 항목이 다음 run 의 watermark 필터(entry.published <= watermark)에
# 영구 배제(silent data loss). 미전진하면 다음 run 이 최신부터 재스캔하며
# 적재분은 dedup-skip(_ingest_entry False, cap 미소모)하고 gap 까지 내려가
# 이어 적재 → 백로그가 run 당 cap 씩 소화(livelock 회피). bulk 은 cap 무관.
if newest_seen and not capped:
async with async_session() as session:
src = await session.get(NewsSource, source_id)
_set_watermark(src, category, newest_seen)
+14 -9
View File
@@ -272,15 +272,20 @@ async def _lookup_news_source(
if not source_name:
return None, None, None
# news_sources에서 이름이 일치하는 레코드 찾기 (prefix match)
result = await session.execute(select(NewsSource))
sources = result.scalars().all()
for src in sources:
if source_name and (
src.name.split(" ")[0] == source_name
or src.name.startswith(source_name + " ")
):
return src.country, src.name, src.language
# news_sources prefix 매칭 — R10: 전체 로드+Python 루프 대신 DB 필터 푸시다운.
# (name == source_name) OR (name 이 "source_name " 로 시작) = 기존 split[0]==source_name 동치
# (첫 토큰 일치 = 정확일치 또는 'source_name ' prefix). autoescape 로 %/_ 안전.
result = await session.execute(
select(NewsSource)
.where(
(NewsSource.name == source_name)
| NewsSource.name.startswith(source_name + " ", autoescape=True)
)
.limit(1)
)
src = result.scalars().first()
if src is not None:
return src.country, src.name, src.language
logger.warning(
f"[chunk] news_source 매핑 실패: doc_id={doc.id} ai_sub_group={source_name!r} "
+3 -1
View File
@@ -563,7 +563,9 @@ async def process(
doc.facet_doctype = ai_doctype
# ─── ai_suggestion 저장 (자료실 승인 대기함 제안, §1) ───
if ai_doctype in LIBRARY_SUGGESTION_DOCTYPES:
# R9: 기존 제안(material_type 제안 등) 우선 — doc.ai_suggestion is None 가드 추가
# (material 제안 블록과 대칭). 없으면 거래문서 제안이 기존 제안을 clobber('기존 제안 우선' 위반).
if ai_doctype in LIBRARY_SUGGESTION_DOCTYPES and doc.ai_suggestion is None:
year = doc.facet_year or datetime.now(timezone.utc).year
doc.ai_suggestion = {
"proposed_category": "library",
+31 -19
View File
@@ -5,7 +5,8 @@ DEVONthink/OmniFocus → PostgreSQL/CalDAV 쿼리로 전환.
SMTP 발송은 2026-06-10 제거 ( 번도 전달 성공한 없는 기능 폐기 결정).
"""
from datetime import datetime, timezone
import asyncio
from datetime import datetime, time, timedelta, timezone
from zoneinfo import ZoneInfo
from pathlib import Path
@@ -20,17 +21,36 @@ from models.queue import ProcessingQueue
logger = setup_logger("daily_digest")
def _write_and_rotate(digest_dir: Path, today: str, markdown: str) -> Path:
"""digest 파일 저장 + 90일 초과 아카이브 이동 (blocking — caller 가 to_thread, R8)."""
digest_dir.mkdir(parents=True, exist_ok=True)
digest_path = digest_dir / f"{today}_digest.md"
digest_path.write_text(markdown, encoding="utf-8")
archive_dir = digest_dir / "archive"
archive_dir.mkdir(exist_ok=True)
cutoff = datetime.now(timezone.utc).timestamp() - (90 * 86400)
for old in digest_dir.glob("*_digest.md"):
if old.stat().st_mtime < cutoff:
old.rename(archive_dir / old.name)
return digest_path
async def run():
"""일일 다이제스트 생성 + 저장 + 발송"""
# KST 기준 오늘 (cron 이 KST timezone fix 후 20:00 KST 에 fire). date 객체로 비교 — Document.created_at::date 와 직접 매칭.
today = datetime.now(ZoneInfo("Asia/Seoul")).date()
# KST 기준 오늘 (cron 이 KST timezone fix 후 20:00 KST 에 fire).
kst = ZoneInfo("Asia/Seoul")
today = datetime.now(kst).date()
# KST 하루를 UTC 범위로 변환 (R8) — func.date(created_at)는 pg TimeZone(UTC) 기준 날짜라
# KST 0~9시 생성 문서(UTC 전날)가 누락되던 경계 버그. created_at(UTC저장) 범위 비교로.
start_utc = datetime.combine(today, time.min, tzinfo=kst).astimezone(timezone.utc)
end_utc = start_utc + timedelta(days=1)
sections = []
async with async_session() as session:
# ─── 1. 오늘 추가된 문서 ───
added = await session.execute(
select(Document.ai_domain, func.count(Document.id))
.where(func.date(Document.created_at) == today)
.where(Document.created_at >= start_utc, Document.created_at < end_utc)
.group_by(Document.ai_domain)
)
added_rows = added.all()
@@ -49,7 +69,8 @@ async def run():
select(Document.title)
.where(
Document.source_channel == "law_monitor",
func.date(Document.created_at) == today,
Document.created_at >= start_utc,
Document.created_at < end_utc,
)
)
law_rows = law_docs.scalars().all()
@@ -66,7 +87,8 @@ async def run():
select(func.count(Document.id))
.where(
Document.source_channel == "email",
func.date(Document.created_at) == today,
Document.created_at >= start_utc,
Document.created_at < end_utc,
)
)
email_total = email_count.scalar() or 0
@@ -101,7 +123,7 @@ async def run():
)
failed_count = failed.scalar() or 0
if failed_count > 0:
section += f"\n⚠️ **실패 {failed_count}건** — 수동 확인 필요\n"
section += f"\n**[주의] 실패 {failed_count}건** — 수동 확인 필요\n"
sections.append(section)
# ─── 5. Inbox 미분류 ───
@@ -119,18 +141,8 @@ async def run():
markdown += "\n".join(sections)
markdown += f"\n---\n*생성: {datetime.now(timezone.utc).isoformat()}*\n"
# ─── NAS 저장 ───
# ─── NAS 저장 + 90일 아카이브 (blocking 파일 I/O off-thread, R8/R5 일관) ───
digest_dir = Path(settings.nas_mount_path) / "PKM" / "Archive" / "digests"
digest_dir.mkdir(parents=True, exist_ok=True)
digest_path = digest_dir / f"{today}_digest.md"
digest_path.write_text(markdown, encoding="utf-8")
# ─── 90일 초과 아카이브 ───
archive_dir = digest_dir / "archive"
archive_dir.mkdir(exist_ok=True)
cutoff = datetime.now(timezone.utc).timestamp() - (90 * 86400)
for old in digest_dir.glob("*_digest.md"):
if old.stat().st_mtime < cutoff:
old.rename(archive_dir / old.name)
digest_path = await asyncio.to_thread(_write_and_rotate, digest_dir, str(today), markdown)
logger.info(f"다이제스트 생성 완료: {digest_path}")
+6 -2
View File
@@ -144,9 +144,13 @@ async def process(
logger.info(f"[deep] id={document_id} 맥북 일시 불가 — 보류 (deferred)")
raise
except Exception as exc:
# 호출 실패(네트워크/API 5xx 등)는 삼키지 않고 전파 (R3) — queue_consumer 가
# attempts 소진까지 재시도 후 status=failed(dead-letter)로 가시화한다. 삼키면
# worker_fn 이 정상 반환 → 큐가 completed 로 확정 → ai_detail_summary 영구 누락 +
# tier 가 triage 에 고착(silent 영구 손실). extract/marker/fulltext/stt 정본과 일치.
# 완주 전 doc 쓰기(168~)는 일어나지 않으므로 부분 쓰기 0 (sleep-안전).
logger.warning(f"[deep] 호출 실패 id={document_id} model={used_cfg.model}: {exc}")
parse_error = "call_failed"
raw = ""
raise
finally:
await client.close()
+65
View File
@@ -0,0 +1,65 @@
"""delete_file=true 로 요청된 문서의 NAS 원본을 grace 후 물리삭제 (R7 retention sweep).
purge_requested_at 마커 기준(deleted_at 아님 일반 soft-delete/숨김은 파일 보존, undelete
가능). grace(30) 경과 + 파일 존재 unlink + AUDIT 로그. 파일 존재 체크로 멱등
(재실행 이미 삭제된 skip). 요청 경로(DELETE) 동기 비가역 op 0 모두 cron 으로.
"""
import asyncio
import logging
from datetime import datetime, timedelta, timezone
from pathlib import Path
from sqlalchemy import select
from core.config import settings
from core.database import async_session
from models.document import Document
logger = logging.getLogger("purge_sweep")
PURGE_GRACE_DAYS = 30
def _unlink_if_exists(p: Path) -> bool:
"""파일이 있으면 unlink (blocking — caller 가 to_thread). 존재 여부 반환(멱등)."""
if p.exists():
p.unlink()
return True
return False
async def run() -> int:
"""purge 요청 + grace 경과 문서의 NAS 원본 물리삭제. 삭제 건수 반환."""
cutoff = datetime.now(timezone.utc) - timedelta(days=PURGE_GRACE_DAYS)
async with async_session() as session:
rows = (
await session.execute(
select(Document.id, Document.file_path, Document.purge_requested_at).where(
Document.purge_requested_at.is_not(None),
Document.purge_requested_at < cutoff,
Document.file_path.is_not(None),
)
)
).all()
purged = 0
for doc_id, file_path, requested_at in rows:
nas_path = Path(settings.nas_mount_path) / file_path
try:
existed = await asyncio.to_thread(_unlink_if_exists, nas_path)
if existed:
purged += 1
# AUDIT — 물리삭제 기록 (가시화). doc_id / 경로 / 요청일 / grace.
logger.warning(
"PURGE doc_id=%s file=%s requested_at=%s grace_days=%s",
doc_id,
file_path,
requested_at.isoformat() if requested_at else None,
PURGE_GRACE_DAYS,
)
except OSError as e:
logger.error("PURGE 실패 doc_id=%s file=%s: %s", doc_id, file_path, e)
if purged:
logger.info("[purge_sweep] NAS 원본 %d건 물리삭제 (grace %d일)", purged, PURGE_GRACE_DAYS)
return purged
+14 -3
View File
@@ -17,6 +17,7 @@ Web/Blog ingest (devonagent 트랙, plan db-snuggly-petal.md):
- sidecar (.json) 누락 : skip 하고 ingest, web_meta.sidecar_missing=true
"""
import asyncio
import hashlib
import json
from pathlib import Path
@@ -136,6 +137,10 @@ def _canonicalize_url(url: str) -> str:
같은 글의 utm 변형 (`?utm_source=foo`) fragment 변형 (`#section`) 을
row 수렴시키기 위해 file_hash 산출 반드시 거친다.
R11c: news_collector._normalize_url(news 채널) 의도적으로 다르다 이쪽(web_clip)
query-sort/trailing-slash/소문자화로 공격적 정규화하지만, news 쪽은 query-식별 사이트의
별개 기사 붕괴 방지를 위해 보수적이다. 함수 통합 금지(채널별 dedup 의도가 다름).
"""
if not url:
return ""
@@ -246,7 +251,8 @@ async def watch_inbox():
async with async_session() as session:
# ─── Web/ 트랙 (devonagent) — DEVONthink Smart Rule 이 떨군 .html 만 진입 ───
if web_root.exists():
for file_path in web_root.rglob("*.html"):
# rglob NFS 디렉토리 walk(blocking stat 다발)를 off-thread 로 수집 (R5).
for file_path in await asyncio.to_thread(lambda: list(web_root.rglob("*.html"))):
if not file_path.is_file() or should_skip(file_path):
continue
rel_path = str(file_path.relative_to(nas_root))
@@ -264,7 +270,8 @@ async def watch_inbox():
Path(sub).name, (None, None, None)
)
for file_path in scan_root.rglob("*"):
# NFS 디렉토리 walk(blocking) off-thread 수집 (R5).
for file_path in await asyncio.to_thread(lambda: list(scan_root.rglob("*"))):
if not file_path.is_file() or should_skip(file_path):
continue
@@ -278,7 +285,11 @@ async def watch_inbox():
continue
rel_path = str(file_path.relative_to(nas_root))
fhash = file_hash(file_path)
# GB 파일 SHA-256 은 이벤트 루프를 점유 → 같은 루프의 모든 1분 주기 consumer
# + FastAPI 요청이 수십초~분 동시 정지. to_thread 오프로드. 스캔 루프가 이미
# 순차라 file_hash 는 한 번에 하나만 실행(직렬화) — 병렬 해싱 X = NFS 2.5GbE
# 대역폭·버퍼 메모리 blowup 방지 (R5).
fhash = await asyncio.to_thread(file_hash, file_path)
result = await session.execute(
select(Document).where(Document.file_path == rel_path)
+8
View File
@@ -297,6 +297,10 @@ async def collect_disaster_cases(session) -> int:
await _ingest_attachment(session, boardno, filenm, filepath)
except FeedError as e:
logger.warning(f"[kosha] 첨부 실패 skip ({boardno}/{filenm}): {e}")
# 케이스 단위 commit (R4) — 이후 페이지/케이스의 _api_get 실패가 앞서 적재한
# 케이스까지 전체 rollback 하지 않게 부분 적재 보존 (csb/api_standards idiom).
await session.commit()
if page_all_dup:
break # 등록일 역순 — 페이지 전체가 기존이면 이후 페이지도 기존
@@ -374,6 +378,8 @@ async def collect_fatal_accidents(session) -> int:
await enqueue_stage(session, doc.id, "embed")
await enqueue_stage(session, doc.id, "chunk")
new_count += 1
# 케이스 단위 commit (R4) — 이후 페이지 실패가 앞 케이스 전체 rollback 방지.
await session.commit()
if page_all_dup:
break # 등록일 역순 — 페이지 전체가 기존이면 이후 페이지도 기존
@@ -450,6 +456,8 @@ async def collect_kosha_guide(session, cap: int = _GUIDE_DAILY_CAP) -> int:
await session.flush()
await enqueue_stage(session, doc.id, "extract")
ingested += 1
# 항목 단위 commit (R4) — 다운로드 실패가 앞서 적재한 GUIDE 항목 전체 rollback 방지.
await session.commit()
# silent cap 금지 — 잔량 가시화 (자동 점진 백필: 내일 cap 만큼 또 소화)
logger.info(f"[kosha] GUIDE 신규/개정 {len(new_specs)}건 중 {ingested}건 ingest"
+65 -101
View File
@@ -83,6 +83,10 @@ def _normalize_url(url: str) -> str:
query 전체 제거 금지: hada.io/topic?id= · aitimes articleView.html?idxno= ·
HN item?id= query-식별 사이트에서 별개 기사가 같은 URL 붕괴된다.
저장(edit_url)·조회 양쪽이 함수를 공유해야 dedup 성립.
R11c: file_watcher._canonicalize_url(web_clip 채널) 의도적으로 다르다 이쪽은 콘텐츠
식별 query 보존(별개 기사 붕괴 방지) 핵심이라 query-sort/trailing-slash/소문자화를 한다.
함수 통합 금지(news dedup 깨짐). 채널별 normalization 의도된 설계.
"""
parsed = urlparse(url)
kept = [
@@ -397,6 +401,55 @@ def _doc_identity(source: NewsSource, source_short: str, category: str) -> dict:
}
async def _already_ingested(session, article_id: str, normalized_url: str, link: str) -> bool:
"""이미 적재된 기사인지 — file_hash 또는 정규화/raw edit_url 매칭 (3 fetch 공통, R11c).
레거시 raw URL + 교차 게시 다중 매칭 내성(first). _fetch_rss/_fetch_api_guardian/
_fetch_api_nyt 복제하던 동일 존재체크를 단일화.
"""
existing = await session.execute(
select(Document).where(
(Document.file_hash == article_id)
| (Document.edit_url.in_([normalized_url, link]))
).limit(1)
)
return existing.scalars().first() is not None
def _build_news_doc(source, ident, source_short, article_id, title, body,
extractor_version, normalized_url, pub_dt) -> Document:
"""3 fetch 공통 뉴스 Document 빌더 (R11c). 채널별 차이는 인자로만 — body(NYT=summary)·
extractor_version·ident(category 계산 차이 흡수) 다르고 22 필드 구조는 정적 동일.
edit_url 조회와 동일 정규화 저장(raw 저장 URL dedup 무력화)."""
return Document(
file_path=f"{ident['path_prefix']}/{source.name}/{article_id}",
file_hash=article_id,
file_format="article",
file_size=len(body.encode()),
file_type="note",
title=title,
extracted_text=f"{title}\n\n{body}",
extracted_at=datetime.now(timezone.utc),
extractor_version=extractor_version,
# article = 텍스트 네이티브 → 생성 시점 terminal 'skipped' 명시(markdown 변환 비대상,
# 미명시 시 'pending' 영구 비수렴 → backlog 지표 오염). page 정책은 fulltext_worker 승격.
md_status="skipped",
md_extraction_error="news article: 텍스트 네이티브, markdown 변환 비대상",
source_channel=source.source_channel,
data_origin="external",
edit_url=normalized_url,
review_status="approved",
ai_domain=ident["ai_domain"],
ai_sub_group=source_short,
ai_tags=ident["ai_tags"],
# 안전 자료실 A-2 — 레지스트리 deterministic (classify-skip 경로라 ingest 시점 필수)
material_type=ident["material_type"],
jurisdiction=ident["jurisdiction"],
published_date=pub_dt.date() if pub_dt else None,
extract_meta=_build_extract_meta(source, pub_dt),
)
async def _fetch_rss(session, source: NewsSource) -> tuple[int, str]:
"""RSS 피드 수집 — redirect 재검증 + 크기/content-type 제한 + 조건부 GET (A-1).
@@ -515,13 +568,7 @@ async def _fetch_rss(session, source: NewsSource) -> tuple[int, str]:
article_id = _article_hash(title, pub_dt.strftime("%Y%m%d"), source.name)
normalized_url = _normalize_url(link)
existing = await session.execute(
select(Document).where(
(Document.file_hash == article_id) |
(Document.edit_url.in_([normalized_url, link]))
).limit(1)
)
if existing.scalars().first():
if await _already_ingested(session, article_id, normalized_url, link):
continue
# A-6 2차: 포털 전재 dedup (first-wins — 먼저 적재된 쪽이 정본)
@@ -533,35 +580,9 @@ async def _fetch_rss(session, source: NewsSource) -> tuple[int, str]:
source_short = source.name.split(" ")[0] # "경향신문 문화" → "경향신문"
ident = _doc_identity(source, source_short, category)
doc = Document(
file_path=f"{ident['path_prefix']}/{source.name}/{article_id}",
file_hash=article_id,
file_format="article",
file_size=len(body.encode()),
file_type="note",
title=title,
extracted_text=f"{title}\n\n{body}",
extracted_at=datetime.now(timezone.utc),
extractor_version=extractor_version,
# article = 텍스트 네이티브(본문=extracted_text). markdown 단계 미enqueue 라
# 기본값 'pending' 이면 영구 비수렴 → backlog 지표 오염 + md_status_pending partial
# 인덱스 비대. 생성 시점에 terminal 'skipped' 로 명시(변환 비대상).
# fulltext_policy='page' 소스는 fulltext_worker 가 승격 시 success 로 갱신.
md_status="skipped",
md_extraction_error="news article: 텍스트 네이티브, markdown 변환 비대상",
source_channel=source.source_channel,
data_origin="external",
# 조회와 동일하게 정규화해 저장 — raw(tracking param 포함) 저장 시 URL dedup 무력화
edit_url=normalized_url,
review_status="approved",
ai_domain=ident["ai_domain"],
ai_sub_group=source_short,
ai_tags=ident["ai_tags"],
# 안전 자료실 A-2 — 레지스트리 deterministic (classify-skip 경로라 ingest 시점 필수)
material_type=ident["material_type"],
jurisdiction=ident["jurisdiction"],
published_date=pub_dt.date() if pub_dt else None,
extract_meta=_build_extract_meta(source, pub_dt),
doc = _build_news_doc(
source, ident, source_short, article_id, title, body,
extractor_version, normalized_url, pub_dt,
)
session.add(doc)
await session.flush()
@@ -658,13 +679,7 @@ async def _fetch_api_guardian(session, source: NewsSource) -> tuple[int, str]:
normalized_url = _normalize_url(link)
# RSS 수집부와 동일: 레거시 raw URL + 교차 게시 다중 매칭 내성 (first)
existing = await session.execute(
select(Document).where(
(Document.file_hash == article_id) |
(Document.edit_url.in_([normalized_url, link]))
).limit(1)
)
if existing.scalars().first():
if await _already_ingested(session, article_id, normalized_url, link):
continue
if await _is_portal_duplicate(session, title):
@@ -675,30 +690,9 @@ async def _fetch_api_guardian(session, source: NewsSource) -> tuple[int, str]:
source_short = source.name.split(" ")[0]
ident = _doc_identity(source, source_short, category)
doc = Document(
file_path=f"{ident['path_prefix']}/{source.name}/{article_id}",
file_hash=article_id,
file_format="article",
file_size=len(body.encode()),
file_type="note",
title=title,
extracted_text=f"{title}\n\n{body}",
extracted_at=datetime.now(timezone.utc),
extractor_version="guardian_api_full" if is_full else "guardian_api",
md_status="skipped",
md_extraction_error="news article: 텍스트 네이티브, markdown 변환 비대상",
source_channel=source.source_channel,
data_origin="external",
edit_url=normalized_url,
review_status="approved",
ai_domain=ident["ai_domain"],
ai_sub_group=source_short,
ai_tags=ident["ai_tags"],
# 안전 자료실 A-2 — 레지스트리 deterministic (classify-skip 경로라 ingest 시점 필수)
material_type=ident["material_type"],
jurisdiction=ident["jurisdiction"],
published_date=pub_dt.date() if pub_dt else None,
extract_meta=_build_extract_meta(source, pub_dt),
doc = _build_news_doc(
source, ident, source_short, article_id, title, body,
"guardian_api_full" if is_full else "guardian_api", normalized_url, pub_dt,
)
session.add(doc)
await session.flush()
@@ -755,13 +749,7 @@ async def _fetch_api_nyt(session, source: NewsSource) -> tuple[int, str]:
normalized_url = _normalize_url(link)
# RSS 수집부와 동일: 레거시 raw URL + 교차 게시 다중 매칭 내성 (first)
existing = await session.execute(
select(Document).where(
(Document.file_hash == article_id) |
(Document.edit_url.in_([normalized_url, link]))
).limit(1)
)
if existing.scalars().first():
if await _already_ingested(session, article_id, normalized_url, link):
continue
if await _is_portal_duplicate(session, title):
@@ -772,33 +760,9 @@ async def _fetch_api_nyt(session, source: NewsSource) -> tuple[int, str]:
source_short = source.name.split(" ")[0]
ident = _doc_identity(source, source_short, category)
doc = Document(
file_path=f"{ident['path_prefix']}/{source.name}/{article_id}",
file_hash=article_id,
file_format="article",
file_size=len(summary.encode()),
file_type="note",
title=title,
extracted_text=f"{title}\n\n{summary}",
extracted_at=datetime.now(timezone.utc),
extractor_version="nyt_api",
# article = 텍스트 네이티브(본문=extracted_text). markdown 단계 미enqueue 라
# 기본값 'pending' 이면 영구 비수렴 → backlog 지표 오염 + md_status_pending partial
# 인덱스 비대. 생성 시점에 terminal 'skipped' 로 명시(변환 비대상).
md_status="skipped",
md_extraction_error="news article: 텍스트 네이티브, markdown 변환 비대상",
source_channel=source.source_channel,
data_origin="external",
edit_url=normalized_url,
review_status="approved",
ai_domain=ident["ai_domain"],
ai_sub_group=source_short,
ai_tags=ident["ai_tags"],
# 안전 자료실 A-2 — 레지스트리 deterministic (classify-skip 경로라 ingest 시점 필수)
material_type=ident["material_type"],
jurisdiction=ident["jurisdiction"],
published_date=pub_dt.date() if pub_dt else None,
extract_meta=_build_extract_meta(source, pub_dt),
doc = _build_news_doc(
source, ident, source_short, article_id, title, summary,
"nyt_api", normalized_url, pub_dt,
)
session.add(doc)
await session.flush()
+8 -1
View File
@@ -331,11 +331,13 @@ async def run(bulk: bool = False, limit: int = 0) -> None:
filter_str = (build_issn_filter(wm_key, watermark) if kind == "issn"
else build_filter(wm_key, watermark))
newest: str | None = None
capped = False # 이번 run 이 cap 으로 시드 중도 절단됐는지 (R4)
cursor = "*"
max_pages = (10**6 if bulk else _MAX_PAGES_PER_KW)
try:
for _page in range(max_pages):
if inserted >= run_cap:
capped = True
break
text = await _fetch(client, key, filter_str, cursor)
_count, next_cursor, works = parse_openalex_works(text)
@@ -353,12 +355,17 @@ async def run(bulk: bool = False, limit: int = 0) -> None:
else:
await session.rollback()
if inserted >= run_cap:
capped = True
break
await asyncio.sleep(_REQ_SLEEP)
if not next_cursor:
break
cursor = next_cursor
if newest:
# cap 절단 시 워터마크 미전진 — 미페치 works 가 다음 run 의 watermark 필터
# (publication_date > watermark)에 영구 배제되는 silent loss 방지. 미전진하면
# 다음 run 이 옛 watermark 부터 재페치하며 적재분 dedup-skip(cap 미소모) 후
# 이어 적재 → 백로그 run 당 cap 소화 (R4). bulk 은 cap 무관.
if newest and not capped:
async with async_session() as session:
src = await session.get(NewsSource, source_id)
_set_watermark(src, wm_key, newest)
-142
View File
@@ -1,142 +0,0 @@
"""Phase 2A 후보 임베딩 백필 CLI (embedding-phase2a-1 E-1).
docker compose exec -T fastapi python -m workers.phase2a_cand_backfill \
--target qwen06 --doc-id-max 41944 --chunk-id-max 104140 [--batch 32]
설계 원칙 (plan r3):
- resumable/idempotent: 대상 = NOT EXISTS(후보 테이블) 중단/재실행 이어서.
배치 단위 커밋. C-1 백필 게이트 = "후보 카운트 == 동결셋 카운트".
- 동결셋: id <= *_id_max AND 베이스라인 embedding IS NOT NULL (AND docs.deleted_at IS NULL).
cand 테이블은 동결 범위로만 INSERT (retrieval cand path snapshot filter 타는 전제).
- 문서/청크 입력 = production 경로와 동일 구성(embed_worker._build_embed_input /
chunk_worker [제목][섹션][본문]) + plain (instruct prefix 쿼리 전용 G-1 불변식).
- 임베딩 = Ollama /api/embed 배치 호출 (G-1 fixture: 정규화 출력).
- qwen4m CLI 대상이 아님 qwen4 적재 SQL 파생(subvector+l2_normalize), plan E-1.
"""
import argparse
import asyncio
import hashlib
import time
import httpx
from sqlalchemy import text
from core.database import async_session
from core.utils import setup_logger
from models.document import Document
from workers.embed_worker import _build_embed_input
logger = setup_logger("phase2a_cand_backfill")
OLLAMA_EMBED = "http://ollama:11434/api/embed"
TARGETS = {
"qwen06": {
"model": "qwen3-embedding:0.6b", "dim": 1024,
"docs": "documents_cand_qwen06", "chunks": "document_chunks_cand_qwen06",
},
"qwen4": {
"model": "qwen3-embedding:4b", "dim": 2560,
"docs": "documents_cand_qwen4", "chunks": "document_chunks_cand_qwen4",
},
}
async def _embed_batch(client: httpx.AsyncClient, model: str, texts: list[str]) -> list[list[float]]:
r = await client.post(OLLAMA_EMBED, json={"model": model, "input": texts}, timeout=600)
r.raise_for_status()
embs = r.json()["embeddings"]
if len(embs) != len(texts):
raise RuntimeError(f"embed count mismatch: {len(embs)} != {len(texts)}")
return embs
async def backfill_docs(target: dict, doc_id_max: int, batch: int, http: httpx.AsyncClient) -> int:
total = 0
while True:
async with async_session() as session:
rows = (await session.execute(text(f"""
SELECT d.id FROM documents d
WHERE d.id <= :m AND d.embedding IS NOT NULL AND d.deleted_at IS NULL
AND NOT EXISTS (SELECT 1 FROM {target['docs']} c WHERE c.doc_id = d.id)
ORDER BY d.id LIMIT :b
"""), {"m": doc_id_max, "b": batch})).scalars().all()
if not rows:
break
docs = [(await session.get(Document, i)) for i in rows]
inputs = [_build_embed_input(d) for d in docs]
embs = await _embed_batch(http, target["model"], inputs)
for d, inp, e in zip(docs, inputs, embs):
await session.execute(text(f"""
INSERT INTO {target['docs']} (doc_id, embed_input_hash, embedding)
VALUES (:i, :h, cast(:e AS vector))
ON CONFLICT (doc_id) DO NOTHING
"""), {"i": d.id, "h": hashlib.sha256(inp.encode()).hexdigest()[:16], "e": str(e)})
await session.commit()
total += len(rows)
if total % (batch * 10) < batch:
logger.info(f"[{target['docs']}] +{total} (last id={rows[-1]})")
return total
async def backfill_chunks(target: dict, chunk_id_max: int, batch: int, http: httpx.AsyncClient) -> int:
total = 0
while True:
async with async_session() as session:
rows = (await session.execute(text(f"""
SELECT c.id, c.doc_id, c.chunk_index, c.section_title, c.text, d.title
FROM corpus_chunks c JOIN documents d ON d.id = c.doc_id
WHERE c.id <= :m AND c.embedding IS NOT NULL AND d.deleted_at IS NULL
AND NOT EXISTS (SELECT 1 FROM {target['chunks']} k WHERE k.id = c.id)
ORDER BY c.id LIMIT :b
"""), {"m": chunk_id_max, "b": batch})).all()
if not rows:
break
inputs = [
f"[제목] {r.title or ''}\n[섹션] {r.section_title or ''}\n[본문] {r.text}"
for r in rows
]
embs = await _embed_batch(http, target["model"], inputs)
for r, e in zip(rows, embs):
await session.execute(text(f"""
INSERT INTO {target['chunks']} (id, doc_id, chunk_index, section_title, text, embedding)
VALUES (:i, :d, :x, :s, :t, cast(:e AS vector))
ON CONFLICT (id) DO NOTHING
"""), {"i": r.id, "d": r.doc_id, "x": r.chunk_index,
"s": r.section_title, "t": r.text, "e": str(e)})
await session.commit()
total += len(rows)
if total % (batch * 10) < batch:
logger.info(f"[{target['chunks']}] +{total} (last id={rows[-1]})")
return total
async def run(target_key: str, doc_id_max: int, chunk_id_max: int, batch: int) -> None:
target = TARGETS[target_key]
start = time.monotonic()
async with httpx.AsyncClient() as http:
nd = await backfill_docs(target, doc_id_max, batch, http)
nc = await backfill_chunks(target, chunk_id_max, batch, http)
mins = (time.monotonic() - start) / 60
async with async_session() as session:
cd = (await session.execute(text(f"SELECT count(*) FROM {target['docs']}"))).scalar_one()
cc = (await session.execute(text(f"SELECT count(*) FROM {target['chunks']}"))).scalar_one()
logger.info(
f"[{target_key}] 완료 — 이번 run docs +{nd} chunks +{nc} ({mins:.1f}분) · "
f"누적 docs {cd} / chunks {cc} (동결 게이트 = 베이스라인 동결셋 카운트와 일치 확인)"
)
def main() -> None:
p = argparse.ArgumentParser(description="Phase 2A 후보 임베딩 백필 (resumable)")
p.add_argument("--target", required=True, choices=sorted(TARGETS))
p.add_argument("--doc-id-max", type=int, required=True)
p.add_argument("--chunk-id-max", type=int, required=True)
p.add_argument("--batch", type=int, default=32)
a = p.parse_args()
asyncio.run(run(a.target, a.doc_id_max, a.chunk_id_max, a.batch))
if __name__ == "__main__":
main()
+18 -2
View File
@@ -275,7 +275,15 @@ async def _process_stage(stage, worker_fn):
item.status = "completed"
item.completed_at = datetime.now(timezone.utc)
await skip_session.commit()
await enqueue_next_stage(document_id, stage)
# 완료 커밋 후 enqueue — 실패가 outer except 로 전파돼 completed 재오픈
# 되지 않게 격리 (R3, 정상 완료 경로와 동일 처리).
try:
await enqueue_next_stage(document_id, stage)
except Exception as enq_err:
logger.error(
f"[{stage}] document_id={document_id} skip(note) 완료됐으나 "
f"다음 단계 enqueue 실패: {enq_err}"
)
logger.info(f"[{stage}] document_id={document_id} skip (note)")
continue
@@ -293,7 +301,15 @@ async def _process_stage(stage, worker_fn):
item.completed_at = datetime.now(timezone.utc)
await session.commit()
await enqueue_next_stage(document_id, stage)
# 완료는 이미 커밋됨. enqueue_next_stage 실패가 outer except 로 전파되면
# completed 항목을 재오픈(pending/failed)해 같은 단계를 재실행 = 비싼 작업 중복
# + 부분 재쓰기. 자체 try 로 격리하고 ERROR 로 가시화한다 (R3).
try:
await enqueue_next_stage(document_id, stage)
except Exception as enq_err:
logger.error(
f"[{stage}] document_id={document_id} 완료됐으나 다음 단계 enqueue 실패: {enq_err}"
)
logger.info(f"[{stage}] document_id={document_id} 완료")
except StageDeferred as defer:
+3 -1
View File
@@ -102,7 +102,9 @@ async def _process_one(session: AsyncSession, qid: int, client: AIClient) -> boo
try:
async with asyncio.timeout(EMBED_TIMEOUT_S):
vec = await client.embed(text)
except (asyncio.TimeoutError, Exception) as e:
except asyncio.CancelledError:
raise # 취소는 전파 — broad except 가 삼키지 않게 명시 (R3)
except Exception as e:
logger.warning("study_q_embed_failed qid=%s err=%s: %s", qid, type(e).__name__, e)
# 실패 — status='failed'. 직전 embedding 보존.
q.embedding_status = "failed"
+6 -1
View File
@@ -121,7 +121,12 @@ async def process(document_id: int, session: AsyncSession) -> None:
ok = _extract_thumbnail(source, output, seek)
if not ok:
return
# 썸네일 추출 실패(ffmpeg)는 삼키지 않고 raise (R3) — queue_consumer 가 attempts
# 소진까지 재시도 후 status=failed 로 가시화. silent return 이면 큐가 completed 로
# 확정 + 썸네일 영구 누락 + 재시도/추적 0 (silent skip). 손상 영상이면 failed 로 안착.
raise RuntimeError(
f"thumbnail 추출 실패: document_id={document_id} source={source}"
)
doc.thumbnail_path = str(output)
doc.updated_at = datetime.now(timezone.utc)
+8
View File
@@ -52,6 +52,11 @@ DOMAIN_PRIORITY: list[tuple[str, str]] = [
("manual", "source_channel = 'manual'"),
]
# R12: filter_clause 는 SQL 에 직접 보간되므로 이 allowlist(DOMAIN_PRIORITY 출처) 통과분만
# 허용 — 현재 모듈 상수라 injection 경로 0 이나, 외부 입력화 시 즉시 차단하는 final gate
# (retrieval_service 의 _VALID_DOCS_TABLE allowlist 정본 대비 비대칭 해소).
_ALLOWED_FILTER_CLAUSES: frozenset[str] = frozenset(c for _, c in DOMAIN_PRIORITY)
async def _classify_pending(session: AsyncSession) -> int:
return int(await session.scalar(text("""
@@ -66,6 +71,9 @@ async def _enqueue_domain(session: AsyncSession, filter_clause: str, limit: int)
extracted_text 문자열 (LENGTH=0) 제외 classify_worker not doc.extracted_text
truthy 체크라 문자열에서 ValueError raise. 무한 retry 루프 방지.
"""
# R12: SQL 직접 보간 전 allowlist final gate.
if filter_clause not in _ALLOWED_FILTER_CLAUSES:
raise ValueError(f"비허용 filter_clause (allowlist 외): {filter_clause!r}")
sql = text(f"""
INSERT INTO processing_queue (document_id, stage, status, attempts, max_attempts)
SELECT id, 'classify', 'pending', 0, 3
+15
View File
@@ -65,6 +65,19 @@ docMarked.use({
`</figure>`
);
},
// 외부 링크(http/https) → 새 탭 + rel=noopener noreferrer (탭내빙 차단). 521건 실재.
// 내부/프래그먼트/상대 링크는 손대지 않음 — `#` anchor 는 gfmHeadingId/outline 경로 유지
// (클릭 인터셉터 없음 → 충돌 0), 상대 .md(코퍼스 0건)는 기본 동작(inert). marked 15 토큰객체 시그니처.
link(token: any): string {
const href = (token?.href ?? '') as string;
const text = this.parser.parseInline(token?.tokens ?? []);
const titleAttr = token?.title ? ` title="${escAttr(token.title as string)}"` : '';
const safeHref = escAttr(href);
if (/^https?:\/\//i.test(href)) {
return `<a href="${safeHref}"${titleAttr} target="_blank" rel="noopener noreferrer">${text}</a>`;
}
return `<a href="${safeHref}"${titleAttr}>${text}</a>`;
},
},
});
@@ -82,6 +95,8 @@ const SANITIZE_OPTS = {
'data-md-image-internal',
'data-md-image-alt',
'loading',
'target',
'rel',
],
ADD_TAGS: ['figure', 'figcaption'],
FORBID_TAGS: ['script', 'iframe', 'object', 'embed', 'link', 'meta'],
@@ -0,0 +1,6 @@
-- 359: delete_file=true 명시 삭제 요청 마커 (R7 delete_file 큐드삭제).
-- retention sweep(document_purge_sweep) 이 이 컬럼 + grace(30일) 기준으로 NAS 원본을
-- 물리삭제한다. deleted_at(단순 숨김)과 분리 — 숨김(delete_file=false)은 파일 보존(undelete
-- 가능). sweep 가 deleted_at 기준이면 모든 숨김이 30일 후 물리삭제되는 데이터 손실이 되므로
-- 명시 purge 요청만 대상으로 한다.
ALTER TABLE documents ADD COLUMN IF NOT EXISTS purge_requested_at TIMESTAMPTZ;
@@ -0,0 +1,11 @@
-- 360: Phase 2A 임베딩 후보 cand 섀도 테이블 제거 (R13).
-- Phase 2A no-go 종결(2026-06-12, 후보 전부 -0.03~-0.04) + phase2a_cand_backfill 워커 dormant.
-- retrieval_service.CANDIDATE_BACKEND_MAP / api.search allowed 슬러그 선제거 후 DROP.
-- ★single statement(콤마 구분) — init_db 의 exec_driver_sql(asyncpg)은 multi-statement 불허.
-- IF EXISTS — me5/snowflake 는 ad-hoc 생성분이라 환경별 존재 여부 다를 수 있음(멱등).
DROP TABLE IF EXISTS
document_chunks_cand_me5_large_inst, documents_cand_me5_large_inst,
document_chunks_cand_snowflake_l_v2, documents_cand_snowflake_l_v2,
document_chunks_cand_qwen06, documents_cand_qwen06,
document_chunks_cand_qwen4, documents_cand_qwen4,
document_chunks_cand_qwen4m, documents_cand_qwen4m;
@@ -0,0 +1,9 @@
-- 361: quiz 세션 내 같은 문제 이중 attempt 방지 partial UNIQUE (R9).
-- submit_attempt 의 FOR UPDATE 행잠금이 1차 방어, 이 제약은 DB 레벨 belt-and-suspenders.
-- prod 실측 중복 0 (GROUP BY (quiz_session_id, study_question_id) HAVING count>1 = 0) + fresh DB
-- 빈 테이블이라 dedup DELETE 불요 → ★single statement(init_db exec_driver_sql 은 multi-statement
-- 불허). 혹시 중복이 생긴 환경이면 이 마이그가 실패하므로(IntegrityError) 수동 dedup 후 재적용.
-- quiz_session_id IS NULL(세션 외 직접 입력)은 비대상 → partial index.
CREATE UNIQUE INDEX IF NOT EXISTS uq_attempt_session_question
ON study_question_attempts (quiz_session_id, study_question_id)
WHERE quiz_session_id IS NOT NULL;
File diff suppressed because it is too large Load Diff
+122
View File
@@ -0,0 +1,122 @@
"""전체 app 부팅 런타임 스모크 (GPU 격리) — deploy-blocker 게이트.
init_db 자체는 initdb_runtime_test.py(R1)·migration_smoke.sh 검증한다.
스모크는 위에서 **실제 컨테이너 부팅 경로**(main:app + lifespan startup) 실행해
py_compile 잡는 deploy-blocker 클래스를 잡는다:
`import main` = router import + FastAPI app 빌드 (router 심볼누락·순환 검출)
lifespan startup = lifespan 안의 worker import(35) + init_db + add_job 실행
(worker import-time 오류· 등록 오류 검출, **drift 0** = 실제 경로)
/health (health_check 직접 호출) = DB connected
prod/AI/NAS 무접촉을 위해 부작용 3개만 외과적으로 중립화한다 (검증 대상 로직은 그대로):
- NAS 마운트 체크 임시 디렉토리(+PKM/) 통과 ( NAS 의존 제거)
- scheduler.start() no-op (잡은 등록되지만 실행 = 워커 폴링·외부 API 호출 0)
- scheduler.shutdown() no-op (start 했으니 __aexit__ shutdown raise 하도록)
- prewarm_analyzer() no-op (AI 라우터 :8890 미호출 = 검색실험 soft-lock 안전)
실행 (worktree 루트를 마운트한 prod fastapi 이미지 컨테이너 ):
docker run --rm --network <net> -v <worktree>:/work -w /work \
-e PYTHONPATH=/work/app -e BOOT_SMOKE=1 \
-e DATABASE_URL="postgresql+asyncpg://postgres@ds-bootsmoke-pg:5432/pkm" \
<fastapi_image> python scripts/ci/boot_smoke.py
기대: IMPORTS OK LIFESPAN startup OK (jobs=N, purge_sweep 포함) schema OK HEALTH ok PASS
"""
import asyncio
import os
import tempfile
from pathlib import Path
from sqlalchemy import text
async def main() -> None:
# ── 0) 안전 가드: prod DB 오접속 차단 ─────────────────────────────────
from core.config import settings
url = settings.database_url
print("DATABASE_URL:", url)
assert os.getenv("BOOT_SMOKE") == "1", "SAFETY ABORT: BOOT_SMOKE=1 미설정"
# prod = '...@postgres:5432/pkm' (user pkm). ephemeral = bootsmoke 호스트 / localhost / postgres user.
assert "@postgres:" not in url and "@postgres/" not in url, f"SAFETY ABORT: prod DB 로 보임: {url}"
assert ("bootsmoke" in url) or ("localhost" in url) or ("127.0.0.1" in url), \
f"SAFETY ABORT: ephemeral 마커(bootsmoke/localhost) 없음: {url}"
# ── 1) 부작용 3개 중립화 (검증 대상 로직 보존) ───────────────────────
# prewarm: AI 라우터 미호출
import services.search.query_analyzer as qa
async def _noop_prewarm(*a, **k):
return None
qa.prewarm_analyzer = _noop_prewarm
# scheduler.start/shutdown no-op + start 캡처로 잡 개수 집계
from apscheduler.schedulers.asyncio import AsyncIOScheduler
captured: dict = {}
_orig_init = AsyncIOScheduler.__init__
def _init(self, *a, **k):
_orig_init(self, *a, **k)
captured["sched"] = self
AsyncIOScheduler.__init__ = _init
AsyncIOScheduler.start = lambda self, *a, **k: None
AsyncIOScheduler.shutdown = lambda self, *a, **k: None
# NAS 체크 통과용 임시 마운트
tmp = tempfile.mkdtemp(prefix="bootsmoke-nas-")
(Path(tmp) / "PKM").mkdir(parents=True, exist_ok=True)
settings.nas_mount_path = tmp
print("nas_mount_path(override):", tmp)
# ── 2) import main = 전 router import + app 빌드 ──────────────────────
import main
route_count = len(main.app.routes)
print(f"IMPORTS OK — main 빌드, app.routes={route_count}")
assert route_count > 50, f"라우트 수 비정상({route_count}) — 라우터 누락 의심"
# ── 3) lifespan startup 실행 (init_db + 전 worker import + 전 add_job) ─
cm = main.lifespan(main.app)
await cm.__aenter__()
sched = captured.get("sched")
jobs = sched.get_jobs() if sched else []
job_ids = sorted(j.id for j in jobs)
print(f"LIFESPAN startup OK — 등록 잡 {len(jobs)}")
print(" job_ids:", ", ".join(job_ids))
assert len(jobs) >= 30, f"잡 등록 수 비정상({len(jobs)})"
for required in ("purge_sweep", "auto_review", "queue_consumer", "statute_collector"):
assert required in job_ids, f"필수 잡 누락: {required}"
# ── 4) 스키마 상태 (lifespan 의 실 init_db 가 359/360/361 적용했는지) ──
from core.database import async_session, engine
async with async_session() as s:
docs = (await s.execute(text("SELECT to_regclass('public.documents') IS NOT NULL"))).scalar()
purge = (await s.execute(text(
"SELECT count(*) FROM information_schema.columns "
"WHERE table_name='documents' AND column_name='purge_requested_at'"))).scalar()
cand = (await s.execute(text(
"SELECT count(*) FROM information_schema.tables "
"WHERE table_name LIKE 'documents_cand_qwen%'"))).scalar()
uq = (await s.execute(text(
"SELECT count(*) FROM pg_indexes WHERE indexname='uq_attempt_session_question'"))).scalar()
mx = (await s.execute(text("SELECT max(version) FROM schema_migrations"))).scalar()
print(f"SCHEMA OK — max_migration={mx} documents={docs} purge_col={purge} cand_qwen={cand} attempt_uq={uq}")
assert docs and purge == 1 and cand == 0 and uq == 1 and mx == 361, "FAIL: 기대 스키마 상태 불일치"
# ── 5) /health 직접 호출 ──────────────────────────────────────────────
health = await main.health_check()
print("HEALTH:", health)
assert health["status"] == "ok" and health["database"] == "connected", "FAIL: health degraded"
# ── 6) 정리 ───────────────────────────────────────────────────────────
await cm.__aexit__(None, None, None)
await engine.dispose()
print("RESULT: PASS — 전체 app 부팅(import·init_db·잡등록·health) 검증")
asyncio.run(main())
+51
View File
@@ -0,0 +1,51 @@
"""init_db() baseline 부팅 런타임 검증 (R1) — psql migration_smoke 가 못 잡는 asyncpg 경로 확인.
migration_smoke.sh(psql) SQL 유효성만 검증한다. init_db asyncpg exec_driver_sql(prepared)
경로라 multi-statement 불허 baseline raw asyncpg 적재 skip/stamp/멱등 이걸 실측한다.
실행 (worktree 루트):
python3.11 -m venv /tmp/v && /tmp/v/bin/pip install -q "sqlalchemy[asyncio]>=2" asyncpg pydantic pyyaml
docker run -d --name idb -p 55432:5432 -e POSTGRES_HOST_AUTH_METHOD=trust pgvector/pgvector:pg16
docker exec idb psql -U postgres -c "CREATE DATABASE pkm"
ln -sfn ../migrations app/migrations # Docker 의 /app/migrations 레이아웃 모사 (테스트 후 rm)
PYTHONPATH=app DATABASE_URL="postgresql+asyncpg://postgres@localhost:55432/pkm" /tmp/v/bin/python scripts/ci/initdb_runtime_test.py
rm -f app/migrations; docker rm -f idb
기대: 1st OK(documents=True·purge_col=1·cand_qwen=0·attempt_unique=1), 2nd 멱등동일=True.
"""
import asyncio
from sqlalchemy import text
async def main():
from core.config import settings
url = settings.database_url
print("effective DATABASE_URL:", url)
assert "localhost" in url or "127.0.0.1" in url, f"SAFETY ABORT non-local: {url}"
from core.database import init_db, async_session, engine
print("=== 1st init_db (fresh DB) ===")
await init_db()
async with async_session() as s:
cnt = (await s.execute(text("SELECT count(*) FROM schema_migrations"))).scalar()
mx = (await s.execute(text("SELECT max(version) FROM schema_migrations"))).scalar()
bl = (await s.execute(text("SELECT count(*) FROM schema_migrations WHERE name LIKE 'baseline:%'"))).scalar()
docs = (await s.execute(text("SELECT to_regclass('public.documents') IS NOT NULL"))).scalar()
purge = (await s.execute(text("SELECT count(*) FROM information_schema.columns WHERE table_name='documents' AND column_name='purge_requested_at'"))).scalar()
cand = (await s.execute(text("SELECT count(*) FROM information_schema.tables WHERE table_name LIKE 'documents_cand_qwen%'"))).scalar()
uq = (await s.execute(text("SELECT count(*) FROM pg_indexes WHERE indexname='uq_attempt_session_question'"))).scalar()
print(f" schema_migrations count={cnt} max={mx} baseline_stamped={bl}")
print(f" documents={docs} purge_col={purge} cand_qwen_tables={cand} attempt_unique={uq}")
assert docs and purge == 1 and cand == 0 and uq == 1, "FAIL: 기대 스키마 상태 불일치"
print("=== 2nd init_db (rerun = baseline skip + 멱등) ===")
await init_db()
async with async_session() as s:
cnt2 = (await s.execute(text("SELECT count(*) FROM schema_migrations"))).scalar()
assert cnt == cnt2, "FAIL: 멱등 아님 (재실행이 schema_migrations 변경)"
print(f" count={cnt2} 멱등동일={cnt == cnt2}")
print("RESULT: PASS — init_db baseline 부팅/멱등 검증")
await engine.dispose()
asyncio.run(main())
+138
View File
@@ -0,0 +1,138 @@
#!/usr/bin/env bash
# migration_smoke.sh — fresh-DB + DR enum-same-txn 게이트 (plan ds-backend-audit-1 R0)
#
# app/core/database.py 의 init_db() 는 모든 pending migration 을 단일 트랜잭션
# (`async with engine.begin()`) 으로 적용한다. 이 스크립트는 그 경로를 미러해
# migrations/ 전체가 빈 DB / DR 업그레이드에서 한 트랜잭션으로 적용 가능한지 검증한다.
#
# 시나리오:
# FRESH — 빈 DB 에 migrations/ 전체를 단일 트랜잭션으로 적용 (신규 환경 부팅 경로)
# DR — 001~319 를 커밋(과거 운영 DB 모사) 후 320~end 를 단일 트랜잭션으로 적용
# (pre-320 백업/지연 복제를 320 경계 너머로 catch-up 업그레이드하는 재해복구 경로)
#
# enum-same-txn 결함(ALTER TYPE ADD VALUE 한 값을 같은 트랜잭션에서 사용)이 있으면
# 두 시나리오 모두 'unsafe use of new value' 로 abort 한다.
# R1(enum-barrier) fix 후에는 두 시나리오 모두 PASS 해야 한다.
#
# prod 동일 이미지(pg16)로 핀. 의존: docker.
# 사용: scripts/ci/migration_smoke.sh (ephemeral 컨테이너 자동 기동/정리)
set -uo pipefail
IMAGE="pgvector/pgvector:pg16"
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
MIG_DIR="$(cd "$SCRIPT_DIR/../../migrations" && pwd)"
CNAME="ds-mig-smoke-$$"
DB="pkm" # 358 의 ALTER DATABASE pkm 가 이 이름을 요구
cleanup() { docker rm -f "$CNAME" >/dev/null 2>&1 || true; }
trap cleanup EXIT
# 버전순 마이그레이션 파일 목록 (NNN_ 3자리 zero-pad → lexical = numeric)
# bash 3.2(macOS) 호환 — mapfile 미사용
MIGS=()
while IFS= read -r _line; do MIGS+=("$_line"); done < <(ls "$MIG_DIR"/[0-9]*.sql | sort)
[ "${#MIGS[@]}" -gt 0 ] || { echo "FATAL: migrations 없음 ($MIG_DIR)"; exit 2; }
echo "migrations: ${#MIGS[@]}건 ($(basename "${MIGS[0]}") ~ $(basename "${MIGS[$((${#MIGS[@]}-1))]}"))"
psql_exec() { docker exec -i "$CNAME" psql -U postgres -v ON_ERROR_STOP=1 "$@"; }
# 주어진 파일 범위를 단일 트랜잭션 스트림으로 묶어 출력 (psql stdin 용)
# 각 파일 앞에 \echo 마커 — 실패 시 마지막 마커가 깨진 마이그레이션.
emit_single_txn() {
echo '\set ON_ERROR_STOP on'
echo 'BEGIN;'
for f in "$@"; do
echo "\\echo >>>APPLY $(basename "$f")"
cat "$f"; echo
done
echo 'COMMIT;'
}
# 자동커밋(파일별 즉시 커밋) 스트림 — DR phase1 (기존 운영 DB 모사)
emit_autocommit() {
echo '\set ON_ERROR_STOP on'
for f in "$@"; do
echo "\\echo >>>APPLY $(basename "$f")"
cat "$f"; echo
done
}
reset_db() {
psql_exec -d postgres -c "DROP DATABASE IF EXISTS $DB" >/dev/null 2>&1
psql_exec -d postgres -c "CREATE DATABASE $DB" >/dev/null
}
run_scenario() {
local name="$1"; shift
local out rc last_apply
out="$( "$@" 2>&1 )"; rc=$?
last_apply="$(printf '%s\n' "$out" | grep '>>>APPLY' | tail -1 | sed 's/>>>APPLY //')"
if [ "$rc" -eq 0 ]; then
echo " [$name] PASS — 전체 적용 성공"
return 0
else
echo " [$name] FAIL — 깨진 지점: ${last_apply:-?}"
printf '%s\n' "$out" | grep -iE 'ERROR|unsafe|HINT' | head -3 | sed 's/^/ /'
return 1
fi
}
BASELINE_CUTOFF=358
BASELINE_FILE="$MIG_DIR/_baseline/0358_schema_baseline.sql"
# post-baseline(버전 > cutoff) 마이그 파일만 출력
_post_baseline() {
local f base ver
for f in "${MIGS[@]}"; do
base="$(basename "$f")"; ver="${base%%_*}"; ver="$((10#$ver))"
[ "$ver" -gt "$BASELINE_CUTOFF" ] && printf '%s\n' "$f"
done
}
# FRESH — init_db fresh 경로 미러: baseline 적재 + post-baseline 을 단일 트랜잭션
scenario_fresh() {
reset_db
local post=(); while IFS= read -r f; do post+=("$f"); done < <(_post_baseline)
{
echo '\set ON_ERROR_STOP on'; echo 'BEGIN;'
echo "\\echo >>>APPLY _baseline"
cat "$BASELINE_FILE"; echo
for f in "${post[@]}"; do
echo "\\echo >>>APPLY $(basename "$f")"; cat "$f"; echo
done
echo 'COMMIT;'
} | psql_exec -d "$DB"
}
# INCREMENTAL — 기존 운영 DB(at cutoff) 모사: baseline 커밋 후 post-baseline 을 별 트랜잭션
scenario_dr() {
reset_db
if ! { echo '\set ON_ERROR_STOP on'; cat "$BASELINE_FILE"; } | psql_exec -d "$DB" >/dev/null 2>&1; then
printf '%s\n' ">>>APPLY _baseline"; echo "baseline 적재 실패"; return 1
fi
local post=(); while IFS= read -r f; do post+=("$f"); done < <(_post_baseline)
emit_single_txn "${post[@]}" 2>/dev/null | psql_exec -d "$DB"
}
# ── 컨테이너 기동 ──
echo "기동: $IMAGE ($CNAME)"
docker run -d --name "$CNAME" -e POSTGRES_PASSWORD=x -e POSTGRES_HOST_AUTH_METHOD=trust "$IMAGE" >/dev/null
for _ in $(seq 1 40); do docker exec "$CNAME" pg_isready -U postgres -q 2>/dev/null && break; sleep 0.5; done
echo "pg: $(docker exec "$CNAME" psql -U postgres -tAc 'show server_version' 2>/dev/null)"
echo
fail=0
echo "── FRESH (baseline 적재 + post-baseline 단일 트랜잭션 = init_db fresh 경로) ──"
run_scenario FRESH scenario_fresh || fail=1
echo
echo "── INCREMENTAL (baseline 커밋 후 post-baseline 별 트랜잭션 = 기존 DB 증분) ──"
run_scenario DR scenario_dr || fail=1
echo
if [ "$fail" -eq 0 ]; then
echo "RESULT: PASS — fresh/incremental 모두 baseline+post-baseline 적용 가능"
exit 0
else
echo "RESULT: FAIL — baseline/post-baseline 적용 불가 (위 지점)"
exit 1
fi