♟️ Конвейер партий и машины состояний
Эта страница — про «партийную» сторону синхронизации: как одна-единственная партия проходит путь
от обнаружения до архива и какая локальная модель данных этим путём управляет. Соседняя страница
граф игроков отвечает за то, как партии вообще обнаруживаются
(seeds → BFS по оппонентам → POST /api/player/history); здесь мы берём уже найденный gameId
и доводим его до строки в аналитике и до записи в bronze-архиве.
Весь код живёт в dicechess-sync (src/catalog.ts, src/pipeline.ts, src/archive-push.ts,
src/crawl.ts). Состояние конвейера целиком хранится в локальном SQLite — это делает каждый прогон
возобновляемым: убили процесс на середине бэкфилла — следующий запуск подхватит с того же места,
потому что «где мы остановились» записано в таблицах, а не в памяти.
Сквозной путь одной партии
discovered→ FETCH (game-move-history) →fetched→ NORMALIZE + POST (POST /api/games) →posted→ ARCHIVE (зеркалирование сырья на dexus). Параллельные ветки:rejected(движок сказал 422),error(транзиентный сбой с таймером бэкоффа) иskipped(отменённая партия).
1. Локальная модель данных (bronze-кэш)
Локальная база — это bronze-слой: сырые ответы API хранятся дословно, до всякой нормализации.
Партийная сторона использует две таблицы (src/db/migrations/0001_init.sql, 0002_archive.sql):
games— лёгкий каталог: одна строка на партию, быстрый для очередей и дедупа. Здесь живёт состояние конвейера (status,error_stage,next_retry_at) и денормализованные метаданные партии (игроки, время старта, тайм-контроль, результат).raw_game_data— тяжёлые блобы (сырой JSON истории ходов). Читается только на стадиях normalize/assemble и archive. Связана сgamesпоgame_idчерезON DELETE CASCADE— удаление партии из каталога уносит и её сырьё.
erDiagram games ||--|| raw_game_data : "game_id (1:1, CASCADE)" games { TEXT game_id PK "dicechess UUID == id и external_id в analytics" TEXT status "discovered|fetched|posted|rejected|error|skipped" TEXT error_stage "fetch|normalize|post (когда status=error)" INTEGER white_user_id "мягкая связь -> players.user_id" INTEGER black_user_id INTEGER discovered_from "чья история вскрыла партию первой" INTEGER started_at_ms "старт партии (unix ms): порядок + инкремент" INTEGER allow_doubling "0/1 -> mode classic/x2" INTEGER result "перспектива БЕЛЫХ: 1 win | 0 draw | -1 loss" INTEGER time_initial_sec "timeLimit * 60" INTEGER time_increment_sec "timeBonus" INTEGER is_cancelled "0/1" TEXT post_result "created | exists (analytics 201/200)" TEXT reject_reason "тело 422" INTEGER attempts TEXT last_error TEXT next_retry_at "ISO, backoff" TEXT created_at TEXT updated_at } raw_game_data { TEXT game_id PK "FK -> games(game_id) ON DELETE CASCADE" TEXT history_meta_json "запись player/history: тайм-контроль, рейтинги, деньги, игроки" INTEGER meta_source_user_id "из чьей перспективы взяты метаданные" TEXT move_history_json "сырой ответ game-move-history; NULL до fetch" TEXT move_history_fetched_at "ISO" INTEGER byte_size "размер сырья в байтах" TEXT archived_at "ISO; NULL = ещё не зеркалировано на dexus" TEXT created_at }
Почему две таблицы, а не одна
Каталог games опрашивается на каждом тике конвейера (что фетчить, что постить, что архивировать),
а блоб move_history_json весит десятки килобайт. Держать их раздельно — значит, что очереди по
status бегут по узкому каталогу с индексами и не таскают за собой мегабайты сырья. Индексы каталога:
ix_games_status (status, next_retry_at) для очередей с бэкоффом, плюс ix_games_white/ix_games_black
для связи с игроками.
Частичный индекс под архивную очередь (v2)
Миграция 0002_archive.sql добавила колонку archived_at и частичный индекс именно под фронтир
зеркалирования — строки, чьё сырьё скачано, но ещё не уехало в архив:
ALTER TABLE raw_game_data ADD COLUMN archived_at TEXT;
CREATE INDEX ix_raw_archive_pending ON raw_game_data (game_id)
WHERE archived_at IS NULL AND move_history_json IS NOT NULL;Индекс индексирует только строки, ждущие пуша. Когда бэкфилл уже почти весь зеркалирован,
этот индекс остаётся крошечным, и nextArchiveBatch находит «что осталось» мгновенно, не сканируя
миллионы уже-архивированных строк.
Соединение БД (
src/db/db.ts)SQLite открывается в режиме WAL (
PRAGMA journal_mode = WAL) — много читателей при одном писателе (краулер). ДополнительноPRAGMA foreign_keys = ON(чтобыON DELETE CASCADEреально работал) иPRAGMA busy_timeout = 5000. Все мутации каталога обёрнуты в транзакциюtx()с откатом при исключении.
2. Машина состояний партии
Колонка games.status — это явный конечный автомат (GameStatus в src/types.ts). Все переходы
делаются одним SQL-стейтментом в Catalog (src/catalog.ts); никакого состояния «в воздухе» нет.
stateDiagram-v2 [*] --> discovered : upsertDiscovered (не отменённая) [*] --> skipped : upsertDiscovered (is_cancelled) discovered --> fetched : markFetched (сырьё сохранено) discovered --> error : markError (fetch) — таймер next_retry_at error --> fetched : ретрай fetch удался error --> fetched : replayNormalizeFailures (после фикса кода) fetched --> posted : markPosted (analytics 201/200) fetched --> rejected : markRejected (analytics 422) fetched --> error : markError (post 5xx/сеть) — таймер fetched --> error : markError (normalize) — БЕЗ таймера rejected --> fetched : replayRejected (после бампа движка) posted --> [*] skipped --> [*]
Расшифровка состояний и ветвлений:
discovered— партия занесена в каталог из чьей-то истории, сырьё ещё не скачано.raw_game_data.move_history_jsonтутNULL.fetched— сырьёgame-move-historyскачано и сохранено дословно; партия ждёт нормализации и POST.posted— аналитика приняла партию.post_resultфиксирует, как приняла:created(HTTP 201, новая строка) илиexists(HTTP 200, уже была).rejected— движок аналитики отверг реплей (HTTP 422). Тело ответа (до 500 символов) лежит вreject_reason. Это состояние терминально до бампа движка — см. реплей. Подробности контракта — в 07 Контракт ingest и валидация движком.error— сбой, который, возможно, рассосётся сам. Колонкаerror_stageговорит, где упало:fetch|normalize|post. Счётчикattemptsинкрементится;last_errorхранит сообщение.skipped— партия отменена на сайте (is_cancelled). В конвейер не попадает вообще.
Бэкофф через next_retry_at
markError пишет next_retry_at (ISO-время в будущем). Очередь фетча (nextToFetch) подбирает
error/fetch-партии только когда next_retry_at IS NULL OR next_retry_at <= :now — то есть таймер истёк.
Дефолтная задержка DEFAULT_RETRY_MS = 5 * 60_000 (5 минут).
normalize-ошибки паркуются БЕЗ таймераДля стадии
normalizemarkErrorвызывается сnextRetryAt = null. Это сознательно: ошибка нормализатора — это не транзиент, а баг в нашем коде. Никакой таймер её не починит — нужна правка нормализатора и затем массовый реплей (replayNormalizeFailures). Поэтому такие партии «лежат припаркованными» и не крутят бесполезные ретраи.
Предохранитель: 5 подряд ошибок (circuit breaker)
В src/crawl.ts каждая стадия (enumerate / fetch / post) считает подряд идущие ошибки и обрывает
фазу после MAX_CONSECUTIVE_ERRORS = 5:
const MAX_CONSECUTIVE_ERRORS = 5;
// ...
if ((consecutive += 1) >= MAX_CONSECUTIVE_ERRORS) break;Любой успех сбрасывает счётчик в ноль. Логика: пять подряд сбоев — это почти наверняка не «одна
кривая партия», а системная беда (протух JWT, Cloudflare выкатил challenge, аналитика лежит).
Молотить по сайту в такой ситуации бессмысленно и вредно. На post-стадии rejected (422) не считается
ошибкой предохранителя — аналитика ведь ответила, просто отвергла; счётчик при created/exists/rejected
сбрасывается, а normalize_error его не трогает (это наш баг, не сбой связи).
3. Стадии конвейера
FETCH — скачать сырьё
Очередь: Catalog.nextToFetch(now) возвращает один game_id — самую свежую партию (ORDER BY started_at_ms DESC), которая либо в discovered, либо в error/fetch с истёкшим таймером. По одной
за раз — это горячий путь через rate-limiter (single-flight).
fetchGame (src/pipeline.ts) делает запрос через инъектированный getMoveHistory (тот же
GET /api/game-move-history, что у observer) и сохраняет ответ дословно:
export async function fetchGame(deps: FetchDeps, gameId: string): Promise<void> {
const raw = await deps.getMoveHistory(gameId);
deps.catalog.markFetched(gameId, JSON.stringify(raw), deps.now());
}markFetched в одной транзакции записывает move_history_json + move_history_fetched_at + byte_size
в raw_game_data и переводит games.status в fetched, обнуляя error_stage/last_error/next_retry_at.
Сырьё хранится без изменений именно для того, чтобы потом переконвертировать его, не трогая сайт.
NORMALIZE — кратко
Нормализация (normalizeStateMap, src/normalize.ts) превращает «грязный» gameMoveHistoryStateMap
сайта в чистую модель аналитики: разбивает state-map на ходы (turn = бросок кубиков + до трёх
микро-ходов) и события (DOUBLE_OFFER, DRAW_ACCEPT и т.п.), выводит termination и initial/final FEN.
Это дешёвый локальный шаг внутри POST — он не имеет своего состояния каталога. Если нормализатор
бросает исключение (битый state-map, неизвестное значение кубика), партия паркуется как error/normalize
без таймера. А вот полная валидация реплеем движком — это уже не здесь: её делает аналитика на приёме,
и она отдельная тема — см. 07 Контракт ingest и валидация движком.
Результат — авторитетно из метаданных истории, не из доски
Нормализатор может оставить
result = null(например, при сдаче доска не доиграна). Поэтому авторитетный исход берётся изhistory_meta_json— записиplayer/history— в перспективе игрока и переводится в перспективу белых (1win |0draw |-1loss), как хранитgames.resultи ждёт аналитика.
POST — нормализовать из кэша и отправить
Очередь: Catalog.nextBatchToPost(limit) отдаёт пачку fetched-партий, сджойненных с их сырьём
(games g JOIN raw_game_data r USING (game_id)), снова свежайшие первыми. Каждую партию обрабатывает
postGame (src/pipeline.ts): парсит кэш → normalizeStateMap → buildPlayer для белых и чёрных →
assembleGameIngest → ingest(wire).
buildPlayer собирает лучшую известную идентичность игрока: external_id = String(userId), имя/тип из
фронтира (фолбэк player_type: userId < 0 ? 'bot' : 'human'),
рейтинг — из записи истории. Подробности конвенции идентичности — в
08 Идентичность, источники и дедупликация.
Маппинг HTTP-ответа аналитики на исход (PostOutcome) и состояние каталога:
| HTTP-ответ | PostOutcome | Состояние / запись |
|---|---|---|
| 201 Created | created | posted, post_result = created |
| 200 OK | exists | posted, post_result = exists (партия уже была) |
| 422 Unprocessable | rejected | rejected, reject_reason = тело (≤ 500 симв.) |
| 5xx / сеть | error | error/post, next_retry_at = +5 мин |
| наше исключение парсинга/нормализации | normalize_error | error/normalize, next_retry_at = null (припарковано) |
ARCHIVE — зеркалировать сырьё на dexus
Последняя стадия (pushRawToArchive, src/archive-push.ts) копирует кэшированное сырьё в центральный
bronze-архив на dexus (192.168.10.4). Очередь — Catalog.nextArchiveBatch(limit)
(дефолт 500), которая берёт строки с archived_at IS NULL AND move_history_json IS NOT NULL через
тот самый частичный индекс. Для каждой строки собирается bundle из двух артефактов:
const bundle: RawBundle = {
source: 'dicechess.com',
externalId: r.game_id,
whiteUserId: ..., blackUserId: ...,
startedAt: ..., fetchedAt: ..., metaFetchedAt: ...,
artifacts: {
history_meta: r.history_meta_json ? JSON.parse(...) : null,
move_history: JSON.parse(r.move_history_json),
},
};
await deps.archive.put(bundle); // first-writer-wins на стороне архива
deps.catalog.markArchived(r.game_id, now);Два артефакта — history_meta (метаданные обнаружения) и move_history (сама партия) — приходят
в разное время и хранятся вместе. После успешного archive.put строка помечается markArchived
(пишет archived_at). Локальный SQLite-кэш — durable источник истины: если dexus недоступен,
проход останавливается на первой ошибке, непушнутые строки остаются висеть pending и уедут следующим
проходом — ничего не теряется (pendingArchiveCount всегда покажет хвост).
4. Связки стадий и очередей
sequenceDiagram participant Sync as dicechess-sync participant Site as dicechess.com<br/>(за Cloudflare) participant DB as SQLite (bronze) participant An as analytics (aurora) participant Arc as raw-archive (dexus) Note over DB: партия в статусе discovered Sync->>DB: nextToFetch(now) DB-->>Sync: game_id (свежайший) Sync->>Site: GET /api/game-move-history (через limiter) Site-->>Sync: сырой state-map Sync->>DB: markFetched (raw verbatim + status=fetched) Sync->>DB: nextBatchToPost(limit) DB-->>Sync: fetched-партии JOIN сырьё Note over Sync: normalizeStateMap + buildPlayer + assemble (локально) Sync->>An: POST /api/games (GameIngestWire) An-->>Sync: 201 created / 200 exists / 422 rejected Sync->>DB: markPosted | markRejected | markError Sync->>DB: nextArchiveBatch(500) DB-->>Sync: pending-сырьё (archived_at IS NULL) Sync->>Arc: archive.put(bundle{history_meta, move_history}) Arc-->>Sync: ok (first-writer-wins) Sync->>DB: markArchived (archived_at)
Обрати внимание: запрос к сайту — единственный шаг, который трогает защищённый Cloudflare (через
[[06 Защита от блокировок и Cloudflare|curl-subprocess и limiter]]). POST в аналитику и пуш в архив
идут по LAN. Это и есть главный смысл разделения на стадии: дорогой и хрупкий fetch отделён от дешёвых
локальных шагов, и его результат закэширован.
5. Сырой кэш — суперсила реплея
Раз сырьё хранится дословно, нормализацию и POST можно переиграть без единого нового запроса к сайту.
Две операции Catalog просто перекидывают партии обратно в fetched, откуда POST-стадия подберёт их
снова и переконвертирует из кэша:
// re-queue все 422-отвергнутые партии (после бампа движка)
replayRejected(nowIso): number // UPDATE ... SET status='fetched', reject_reason=NULL, attempts=0
// WHERE status='rejected'
// re-queue всё, на чём подавился наш нормализатор (после фикса кода)
replayNormalizeFailures(nowIso): number // UPDATE ... SET status='fetched', error_stage=NULL, attempts=0
// WHERE status='error' AND error_stage='normalize'Зачем это нужно на практике:
- Бамп движка. Аналитика отвергла партию 422 (например, баг реплея en passant —
engine#351 — или хардкод рокировки в Chess960).
Выходит релиз движка, где это починено →
replayRejectedотправляет всеrejected-партии на повторный POST. Те, что теперь валидны, станутposted; остальные снова осядут вrejected. - Фикс нормализатора. Мы поправили эвристику разбора ходов / FEN →
replayNormalizeFailuresвозвращает припаркованные партии в очередь, и они переконвертируются по уже скачанному сырью.
Это и есть ценность bronze-слоя
game-move-history— это point-in-time снимок: после 422 партию уже не запросить заново (доска могла измениться, партия удалиться). Дословно сохранённое сырьё (raw_game_dataлокально + bronze-архив на dexus) делает каждый 422 и каждый баг парсера восстановимым — переконвертируем всю историю локально и перезальём, ни разу не обратившись к защищённому сайту.
6. Идемпотентность на каждом стыке
Конвейер устроен так, что повторный прогон безопасен на всех трёх границах:
-
Каталог:
INSERT OR IGNORE— первая перспектива побеждает. Одна и та же партия видна из истории обоих игроков (game-move-historyсимметрична).upsertDiscoveredделаетINSERT OR IGNORE INTO games— кто занёс первым, та строка и остаётся; вторая перспектива молча игнорируется. Колонкаdiscovered_fromзапоминает, чья история вскрыла партию. Так партия каталогизируется ровно один раз. -
Аналитика: 201 vs 200. На стороне аналитики
id(dicechess UUID) — первичный ключ. Первый POST создаёт строку (201 →created), повторный получает 200 (exists) и ничего не перезаписывает. Это и реализует first-writer-wins: один и тот жеgameIdмогут постить observer,dicechess-syncи extension — кто первый, тот и определяет строку и идентичности игроков; остальные получают 200 вхолостую. -
Архив: first-writer-wins +
archived_at. На стороне архива ключ —(source, externalId), иputтоже first-writer-wins (если observer уже записал ту же партию, нашputничего не портит). Локальноarchived_atгарантирует, что мы не пушим одно и то же дважды: пере-запускpushRawToArchiveпросто доливает то, что ещёNULL. Один и тот же механизм покрывает и бэкфилл, и текущую работу.
Что осталось за рамками этой страницы
- Как партии обнаруживаются (seeds, BFS по оппонентам,
player/history, курсорsynced_through_ms, таблицаplayers) — 04 Граф игроков и перечисление.- Rate-limiter и обход Cloudflare (single-flight, spacing+jitter, backoff, retry-on-throttle, рычаг
pageSize) — 06 Защита от блокировок и Cloudflare.- Внутренности контракта ingest и валидация реплеем (схема
GameIngest, что значит 422, правило частичного хода) — 07 Контракт ingest и валидация движком.- Центральный bronze-архив целиком (gzip BYTEA, provenance, ретеншн, топология) — Raw-архив — bronze-слой.
Как эксплуатировать сервис (CLI-команды, env, демон-цикл, Docker) — 09 Pipeline - dicechess-sync.