♟️ Конвейер партий и машины состояний

Эта страница — про «партийную» сторону синхронизации: как одна-единственная партия проходит путь от обнаружения до архива и какая локальная модель данных этим путём управляет. Соседняя страница граф игроков отвечает за то, как партии вообще обнаруживаются (seeds → BFS по оппонентам → POST /api/player/history); здесь мы берём уже найденный gameId и доводим его до строки в аналитике и до записи в bronze-архиве.

Весь код живёт в dicechess-sync (src/catalog.ts, src/pipeline.ts, src/archive-push.ts, src/crawl.ts). Состояние конвейера целиком хранится в локальном SQLite — это делает каждый прогон возобновляемым: убили процесс на середине бэкфилла — следующий запуск подхватит с того же места, потому что «где мы остановились» записано в таблицах, а не в памяти.

Сквозной путь одной партии

discoveredFETCH (game-move-history) → fetchedNORMALIZE + POST (POST /api/games) → postedARCHIVE (зеркалирование сырья на dexus). Параллельные ветки: rejected (движок сказал 422), error (транзиентный сбой с таймером бэкоффа) и skipped (отменённая партия).


1. Локальная модель данных (bronze-кэш)

Локальная база — это bronze-слой: сырые ответы API хранятся дословно, до всякой нормализации. Партийная сторона использует две таблицы (src/db/migrations/0001_init.sql, 0002_archive.sql):

  • games — лёгкий каталог: одна строка на партию, быстрый для очередей и дедупа. Здесь живёт состояние конвейера (status, error_stage, next_retry_at) и денормализованные метаданные партии (игроки, время старта, тайм-контроль, результат).
  • raw_game_data — тяжёлые блобы (сырой JSON истории ходов). Читается только на стадиях normalize/assemble и archive. Связана с games по game_id через ON DELETE CASCADE — удаление партии из каталога уносит и её сырьё.
erDiagram
    games ||--|| raw_game_data : "game_id (1:1, CASCADE)"

    games {
        TEXT    game_id PK "dicechess UUID == id и external_id в analytics"
        TEXT    status "discovered|fetched|posted|rejected|error|skipped"
        TEXT    error_stage "fetch|normalize|post (когда status=error)"
        INTEGER white_user_id "мягкая связь -> players.user_id"
        INTEGER black_user_id
        INTEGER discovered_from "чья история вскрыла партию первой"
        INTEGER started_at_ms "старт партии (unix ms): порядок + инкремент"
        INTEGER allow_doubling "0/1 -> mode classic/x2"
        INTEGER result "перспектива БЕЛЫХ: 1 win | 0 draw | -1 loss"
        INTEGER time_initial_sec "timeLimit * 60"
        INTEGER time_increment_sec "timeBonus"
        INTEGER is_cancelled "0/1"
        TEXT    post_result "created | exists (analytics 201/200)"
        TEXT    reject_reason "тело 422"
        INTEGER attempts
        TEXT    last_error
        TEXT    next_retry_at "ISO, backoff"
        TEXT    created_at
        TEXT    updated_at
    }

    raw_game_data {
        TEXT    game_id PK "FK -> games(game_id) ON DELETE CASCADE"
        TEXT    history_meta_json "запись player/history: тайм-контроль, рейтинги, деньги, игроки"
        INTEGER meta_source_user_id "из чьей перспективы взяты метаданные"
        TEXT    move_history_json "сырой ответ game-move-history; NULL до fetch"
        TEXT    move_history_fetched_at "ISO"
        INTEGER byte_size "размер сырья в байтах"
        TEXT    archived_at "ISO; NULL = ещё не зеркалировано на dexus"
        TEXT    created_at
    }

Почему две таблицы, а не одна

Каталог games опрашивается на каждом тике конвейера (что фетчить, что постить, что архивировать), а блоб move_history_json весит десятки килобайт. Держать их раздельно — значит, что очереди по status бегут по узкому каталогу с индексами и не таскают за собой мегабайты сырья. Индексы каталога: ix_games_status (status, next_retry_at) для очередей с бэкоффом, плюс ix_games_white/ix_games_black для связи с игроками.

Частичный индекс под архивную очередь (v2)

Миграция 0002_archive.sql добавила колонку archived_at и частичный индекс именно под фронтир зеркалирования — строки, чьё сырьё скачано, но ещё не уехало в архив:

ALTER TABLE raw_game_data ADD COLUMN archived_at TEXT;
 
CREATE INDEX ix_raw_archive_pending ON raw_game_data (game_id)
  WHERE archived_at IS NULL AND move_history_json IS NOT NULL;

Индекс индексирует только строки, ждущие пуша. Когда бэкфилл уже почти весь зеркалирован, этот индекс остаётся крошечным, и nextArchiveBatch находит «что осталось» мгновенно, не сканируя миллионы уже-архивированных строк.

Соединение БД ( src/db/db.ts)

SQLite открывается в режиме WAL (PRAGMA journal_mode = WAL) — много читателей при одном писателе (краулер). Дополнительно PRAGMA foreign_keys = ON (чтобы ON DELETE CASCADE реально работал) и PRAGMA busy_timeout = 5000. Все мутации каталога обёрнуты в транзакцию tx() с откатом при исключении.


2. Машина состояний партии

Колонка games.status — это явный конечный автомат (GameStatus в src/types.ts). Все переходы делаются одним SQL-стейтментом в Catalog (src/catalog.ts); никакого состояния «в воздухе» нет.

stateDiagram-v2
    [*] --> discovered : upsertDiscovered (не отменённая)
    [*] --> skipped : upsertDiscovered (is_cancelled)

    discovered --> fetched : markFetched (сырьё сохранено)
    discovered --> error : markError (fetch) — таймер next_retry_at

    error --> fetched : ретрай fetch удался
    error --> fetched : replayNormalizeFailures (после фикса кода)

    fetched --> posted : markPosted (analytics 201/200)
    fetched --> rejected : markRejected (analytics 422)
    fetched --> error : markError (post 5xx/сеть) — таймер
    fetched --> error : markError (normalize) — БЕЗ таймера

    rejected --> fetched : replayRejected (после бампа движка)

    posted --> [*]
    skipped --> [*]

Расшифровка состояний и ветвлений:

  • discovered — партия занесена в каталог из чьей-то истории, сырьё ещё не скачано. raw_game_data.move_history_json тут NULL.
  • fetched — сырьё game-move-history скачано и сохранено дословно; партия ждёт нормализации и POST.
  • posted — аналитика приняла партию. post_result фиксирует, как приняла: created (HTTP 201, новая строка) или exists (HTTP 200, уже была).
  • rejected — движок аналитики отверг реплей (HTTP 422). Тело ответа (до 500 символов) лежит в reject_reason. Это состояние терминально до бампа движка — см. реплей. Подробности контракта — в 07 Контракт ingest и валидация движком.
  • error — сбой, который, возможно, рассосётся сам. Колонка error_stage говорит, где упало: fetch | normalize | post. Счётчик attempts инкрементится; last_error хранит сообщение.
  • skipped — партия отменена на сайте (is_cancelled). В конвейер не попадает вообще.

Бэкофф через next_retry_at

markError пишет next_retry_at (ISO-время в будущем). Очередь фетча (nextToFetch) подбирает error/fetch-партии только когда next_retry_at IS NULL OR next_retry_at <= :now — то есть таймер истёк. Дефолтная задержка DEFAULT_RETRY_MS = 5 * 60_000 (5 минут).

normalize-ошибки паркуются БЕЗ таймера

Для стадии normalize markError вызывается с nextRetryAt = null. Это сознательно: ошибка нормализатора — это не транзиент, а баг в нашем коде. Никакой таймер её не починит — нужна правка нормализатора и затем массовый реплей (replayNormalizeFailures). Поэтому такие партии «лежат припаркованными» и не крутят бесполезные ретраи.

Предохранитель: 5 подряд ошибок (circuit breaker)

В src/crawl.ts каждая стадия (enumerate / fetch / post) считает подряд идущие ошибки и обрывает фазу после MAX_CONSECUTIVE_ERRORS = 5:

const MAX_CONSECUTIVE_ERRORS = 5;
// ...
if ((consecutive += 1) >= MAX_CONSECUTIVE_ERRORS) break;

Любой успех сбрасывает счётчик в ноль. Логика: пять подряд сбоев — это почти наверняка не «одна кривая партия», а системная беда (протух JWT, Cloudflare выкатил challenge, аналитика лежит). Молотить по сайту в такой ситуации бессмысленно и вредно. На post-стадии rejected (422) не считается ошибкой предохранителя — аналитика ведь ответила, просто отвергла; счётчик при created/exists/rejected сбрасывается, а normalize_error его не трогает (это наш баг, не сбой связи).


3. Стадии конвейера

FETCH — скачать сырьё

Очередь: Catalog.nextToFetch(now) возвращает один game_id — самую свежую партию (ORDER BY started_at_ms DESC), которая либо в discovered, либо в error/fetch с истёкшим таймером. По одной за раз — это горячий путь через rate-limiter (single-flight).

fetchGame (src/pipeline.ts) делает запрос через инъектированный getMoveHistory (тот же GET /api/game-move-history, что у observer) и сохраняет ответ дословно:

export async function fetchGame(deps: FetchDeps, gameId: string): Promise<void> {
  const raw = await deps.getMoveHistory(gameId);
  deps.catalog.markFetched(gameId, JSON.stringify(raw), deps.now());
}

markFetched в одной транзакции записывает move_history_json + move_history_fetched_at + byte_size в raw_game_data и переводит games.status в fetched, обнуляя error_stage/last_error/next_retry_at. Сырьё хранится без изменений именно для того, чтобы потом переконвертировать его, не трогая сайт.

NORMALIZE — кратко

Нормализация (normalizeStateMap, src/normalize.ts) превращает «грязный» gameMoveHistoryStateMap сайта в чистую модель аналитики: разбивает state-map на ходы (turn = бросок кубиков + до трёх микро-ходов) и события (DOUBLE_OFFER, DRAW_ACCEPT и т.п.), выводит termination и initial/final FEN.

Это дешёвый локальный шаг внутри POST — он не имеет своего состояния каталога. Если нормализатор бросает исключение (битый state-map, неизвестное значение кубика), партия паркуется как error/normalize без таймера. А вот полная валидация реплеем движком — это уже не здесь: её делает аналитика на приёме, и она отдельная тема — см. 07 Контракт ingest и валидация движком.

Результат — авторитетно из метаданных истории, не из доски

Нормализатор может оставить result = null (например, при сдаче доска не доиграна). Поэтому авторитетный исход берётся из history_meta_json — записи player/history — в перспективе игрока и переводится в перспективу белых (1 win | 0 draw | -1 loss), как хранит games.result и ждёт аналитика.

POST — нормализовать из кэша и отправить

Очередь: Catalog.nextBatchToPost(limit) отдаёт пачку fetched-партий, сджойненных с их сырьём (games g JOIN raw_game_data r USING (game_id)), снова свежайшие первыми. Каждую партию обрабатывает postGame (src/pipeline.ts): парсит кэш → normalizeStateMapbuildPlayer для белых и чёрных → assembleGameIngestingest(wire).

buildPlayer собирает лучшую известную идентичность игрока: external_id = String(userId), имя/тип из фронтира (фолбэк player_type: userId < 0 ? 'bot' : 'human'), рейтинг — из записи истории. Подробности конвенции идентичности — в 08 Идентичность, источники и дедупликация.

Маппинг HTTP-ответа аналитики на исход (PostOutcome) и состояние каталога:

HTTP-ответPostOutcomeСостояние / запись
201 Createdcreatedposted, post_result = created
200 OKexistsposted, post_result = exists (партия уже была)
422 Unprocessablerejectedrejected, reject_reason = тело (≤ 500 симв.)
5xx / сетьerrorerror/post, next_retry_at = +5 мин
наше исключение парсинга/нормализацииnormalize_errorerror/normalize, next_retry_at = null (припарковано)

ARCHIVE — зеркалировать сырьё на dexus

Последняя стадия (pushRawToArchive, src/archive-push.ts) копирует кэшированное сырьё в центральный bronze-архив на dexus (192.168.10.4). Очередь — Catalog.nextArchiveBatch(limit) (дефолт 500), которая берёт строки с archived_at IS NULL AND move_history_json IS NOT NULL через тот самый частичный индекс. Для каждой строки собирается bundle из двух артефактов:

const bundle: RawBundle = {
  source: 'dicechess.com',
  externalId: r.game_id,
  whiteUserId: ..., blackUserId: ...,
  startedAt: ..., fetchedAt: ..., metaFetchedAt: ...,
  artifacts: {
    history_meta: r.history_meta_json ? JSON.parse(...) : null,
    move_history: JSON.parse(r.move_history_json),
  },
};
await deps.archive.put(bundle);     // first-writer-wins на стороне архива
deps.catalog.markArchived(r.game_id, now);

Два артефакта — history_meta (метаданные обнаружения) и move_history (сама партия) — приходят в разное время и хранятся вместе. После успешного archive.put строка помечается markArchived (пишет archived_at). Локальный SQLite-кэш — durable источник истины: если dexus недоступен, проход останавливается на первой ошибке, непушнутые строки остаются висеть pending и уедут следующим проходом — ничего не теряется (pendingArchiveCount всегда покажет хвост).


4. Связки стадий и очередей

sequenceDiagram
    participant Sync as dicechess-sync
    participant Site as dicechess.com<br/>(за Cloudflare)
    participant DB as SQLite (bronze)
    participant An as analytics (aurora)
    participant Arc as raw-archive (dexus)

    Note over DB: партия в статусе discovered

    Sync->>DB: nextToFetch(now)
    DB-->>Sync: game_id (свежайший)
    Sync->>Site: GET /api/game-move-history (через limiter)
    Site-->>Sync: сырой state-map
    Sync->>DB: markFetched (raw verbatim + status=fetched)

    Sync->>DB: nextBatchToPost(limit)
    DB-->>Sync: fetched-партии JOIN сырьё
    Note over Sync: normalizeStateMap + buildPlayer + assemble (локально)
    Sync->>An: POST /api/games (GameIngestWire)
    An-->>Sync: 201 created / 200 exists / 422 rejected
    Sync->>DB: markPosted | markRejected | markError

    Sync->>DB: nextArchiveBatch(500)
    DB-->>Sync: pending-сырьё (archived_at IS NULL)
    Sync->>Arc: archive.put(bundle{history_meta, move_history})
    Arc-->>Sync: ok (first-writer-wins)
    Sync->>DB: markArchived (archived_at)

Обрати внимание: запрос к сайту — единственный шаг, который трогает защищённый Cloudflare (через [[06 Защита от блокировок и Cloudflare|curl-subprocess и limiter]]). POST в аналитику и пуш в архив идут по LAN. Это и есть главный смысл разделения на стадии: дорогой и хрупкий fetch отделён от дешёвых локальных шагов, и его результат закэширован.


5. Сырой кэш — суперсила реплея

Раз сырьё хранится дословно, нормализацию и POST можно переиграть без единого нового запроса к сайту. Две операции Catalog просто перекидывают партии обратно в fetched, откуда POST-стадия подберёт их снова и переконвертирует из кэша:

// re-queue все 422-отвергнутые партии (после бампа движка)
replayRejected(nowIso): number   // UPDATE ... SET status='fetched', reject_reason=NULL, attempts=0
                                  //   WHERE status='rejected'
 
// re-queue всё, на чём подавился наш нормализатор (после фикса кода)
replayNormalizeFailures(nowIso): number  // UPDATE ... SET status='fetched', error_stage=NULL, attempts=0
                                         //   WHERE status='error' AND error_stage='normalize'

Зачем это нужно на практике:

  • Бамп движка. Аналитика отвергла партию 422 (например, баг реплея en passant — engine#351 — или хардкод рокировки в Chess960). Выходит релиз движка, где это починено → replayRejected отправляет все rejected-партии на повторный POST. Те, что теперь валидны, станут posted; остальные снова осядут в rejected.
  • Фикс нормализатора. Мы поправили эвристику разбора ходов / FEN → replayNormalizeFailures возвращает припаркованные партии в очередь, и они переконвертируются по уже скачанному сырью.

Это и есть ценность bronze-слоя

game-move-history — это point-in-time снимок: после 422 партию уже не запросить заново (доска могла измениться, партия удалиться). Дословно сохранённое сырьё (raw_game_data локально + bronze-архив на dexus) делает каждый 422 и каждый баг парсера восстановимым — переконвертируем всю историю локально и перезальём, ни разу не обратившись к защищённому сайту.


6. Идемпотентность на каждом стыке

Конвейер устроен так, что повторный прогон безопасен на всех трёх границах:

  1. Каталог: INSERT OR IGNORE — первая перспектива побеждает. Одна и та же партия видна из истории обоих игроков (game-move-history симметрична). upsertDiscovered делает INSERT OR IGNORE INTO games — кто занёс первым, та строка и остаётся; вторая перспектива молча игнорируется. Колонка discovered_from запоминает, чья история вскрыла партию. Так партия каталогизируется ровно один раз.

  2. Аналитика: 201 vs 200. На стороне аналитики id (dicechess UUID) — первичный ключ. Первый POST создаёт строку (201 → created), повторный получает 200 (exists) и ничего не перезаписывает. Это и реализует first-writer-wins: один и тот же gameId могут постить observer, dicechess-sync и extension — кто первый, тот и определяет строку и идентичности игроков; остальные получают 200 вхолостую.

  3. Архив: first-writer-wins + archived_at. На стороне архива ключ — (source, externalId), и put тоже first-writer-wins (если observer уже записал ту же партию, наш put ничего не портит). Локально archived_at гарантирует, что мы не пушим одно и то же дважды: пере-запуск pushRawToArchive просто доливает то, что ещё NULL. Один и тот же механизм покрывает и бэкфилл, и текущую работу.


Что осталось за рамками этой страницы

Как эксплуатировать сервис (CLI-команды, env, демон-цикл, Docker) — 09 Pipeline - dicechess-sync.