🛰️ Live-наблюдатель (dicechess-observer)

Наблюдатель (observer) — это реактивная половина конвейера добычи: он смотрит за партиями, которые идут на dicechess.com прямо сейчас, ловит момент их завершения и тут же нормализует и заливает результат в аналитику. Это зеркальная противоположность бэкфилл-краулера синхронизации: тот перечисляет всю историю задним числом, observer же видит только то, что произошло, пока он был запущен.

Репозиторий: dicechess-observer (Node/TypeScript, исполняется на ESM-нативном Node, файлы импортируются с расширением .ts). Деплой — долгоживущий сервис на rpi4 (192.168.10.9, residential IP, egress через Cloudflare).


1. Что такое observer и чем он не sync

Одно ядро, две стратегии

И observer, и sync скачивают канонический game-move-history, прогоняют его через одну и ту же идею нормализации и постят в один контракт ingest. Различается только то, как они узнают, какие партии забирать.

🛰️ Observer🕸️ Sync (бэкфилл)
Модельреактивная (live)проактивная (исторический обход)
Источник id партийPOST /api/games/active (что идёт сейчас)граф игроков + POST /api/player/history
Перечисление / спайдернетда (BFS по оппонентам от лидербордов)
Память о «виденных»in-memory Map, без долговечного курсораSQLite-состояние, резюмируемые курсоры
Видимостьтолько партии, идущие во время работывся история сайта
Узкое горлышко rate-limitпрактически нет (game-move-history свободен)player/history (см. 06 Защита от блокировок и Cloudflare)
Запускдолгоживущий сервис с панельюCLI-команды + демон

Ключевое следствие реактивности: у observer нет долговечного состояния. Множество «известных» партий (known) живёт только в оперативной памяти (Map<string, ActiveGame>, src/observer.ts:76). Перезапустили процесс — забыли всё, что было в работе; партии, завершившиеся в окно простоя, observer не увидит (их потом подберёт бэкфилл-sync). Это сознательный компромисс: observer оптимизирован под низкую задержку «партия закончилась → строка в аналитике», а полнота — забота краулера.


2. Цикл опроса (poll loop)

Сердце сервиса — метод Observer.loop() (src/observer.ts:122). Каждую итерацию:

  1. Опрашиваем активные партии. getActiveGames(jwt)POST /api/games/active. Тело запроса фильтрует по диапазону ставок (startBets) и допустимым контролям времени (allowedTimes), сортирует по BET_AMOUNT (src/dicechess.ts:12).
  2. Сливаем в known. Каждую живую партию кладём/обновляем в Map по id: for (const g of games) this.known.set(g.id, g).
  3. Считаем разницу множеств (set-difference). finishedIds(known.keys(), liveIds) (src/observer.ts:59) возвращает те id, что есть в known, но больше нет в живом списке — это и есть «партии, завершившиеся с прошлого опроса».
  4. Обрабатываем каждую завершённую через processFinished(game), попутно удаляя её из known (this.known.delete(id)).
  5. Сливаем спул-архив. В конце каждой итерации — archive.drain(): дозаливаем в bronze-архив бандлы, которые ранее не доехали (см. §4).
  6. Спим POLL_INTERVAL_MS (по умолчанию 15 000 мс) и повторяем.
flowchart TD
    A["POST /api/games/active"] --> B["liveIds = Set(g.id)"]
    B --> C["known.set(id, game)<br/>для каждой живой"]
    C --> D{"finishedIds:<br/>id ∈ known, id ∉ liveIds"}
    D -->|"для каждой завершённой"| E["known.delete(id)<br/>processFinished(game)"]
    D -->|"нет завершённых"| F
    E --> F["archive.drain()<br/>дозалив спула"]
    F --> G["sleep POLL_INTERVAL_MS (15s)"]
    G --> A

Почему set-difference, а не флаг состояния

Сайт не уведомляет о завершении — он просто перестаёт возвращать партию в /api/games/active. «Исчезла из живого списка» = «закончилась». Поэтому observer обязан помнить, что он уже видел живым: id, который был в known и пропал, и есть сигнал завершения. Если процесс не работал в момент исчезновения — сигнала не будет.

Ошибка опроса (например, сетевой сбой или 403 от Cloudflare) не роняет цикл: она ловится, инкрементит stats.errors и логируется, цикл продолжается со следующей итерации.


3. Обработка завершённой партии (processFinished)

processFinished(game) (src/observer.ts:149) — строго упорядоченная цепочка. Порядок шагов важен: сырьё архивируется раньше нормализации, чтобы баг нормализатора или недоступность аналитики не привели к потере данных.

sequenceDiagram
    participant L as loop
    participant S as dicechess.com
    participant A as ArchiveWriter (dexus)
    participant N as normalizeStateMap
    participant As as assembleGameIngest
    participant An as analytics (aurora)

    L->>S: GET /api/game-move-history?gameId
    S-->>L: gameMoveHistoryStateMap
    Note over L,A: 1. СНАЧАЛА сырьё
    L->>A: write(buildRawBundle(game, hist))
    Note right of A: write() НИКОГДА не бросает;<br/>при сбое — спул на диск
    L->>L: пустой stateMap? → skipped, выход
    L->>N: normalizeStateMap(sm, {allowDoubling})
    N-->>L: turns, events, result, termination, цвета
    L->>L: turns.length == 0? → skipped, выход
    L->>As: assembleGameIngest(game, norm)
    As-->>L: GameIngestWire (snake_case)
    alt dryRun
        L->>L: запись recent (outcome=dry), выход
    else live
        L->>An: POST /api/games (Bearer)
        An-->>L: 201 / 200 / 422 / прочее
        L->>L: outcomeForStatus → ingested/existed/rejected/error
    end

Пошагово:

  1. getMoveHistory(jwt, id)GET /api/game-move-history?gameId=…, канонический gameMoveHistoryStateMap (src/dicechess.ts:58).
  2. Архивируем сырой бандл ПЕРВЫМ. archive.write(buildRawBundle(game, hist)). Комментарий в коде: «Persist the raw bundle BEFORE any normalization: robust to normalizer bugs and to the archive being unreachable (spools locally). Never throws.» Бандл — это два артефакта as-is: discovery_meta (ActiveGame из /active) и move_history (MoveHistoryResponse), без нормализации (src/bundle.ts:14). Подробнее про bronze-слой — Raw-архив — bronze-слой.
  3. Отсев пустых. Если stateMap отсутствует или пуст → skipped, выход.
  4. normalizeStateMap(sm, { allowDoubling }) (src/normalize.ts:157). Превращает «грязную» state-машину сайта в turns/events/result/termination + выведенные цвета. Подробности — §5.
  5. Отсев фантомных партий. Если turns.length === 0 (партия оборвалась так, что не сложилось ни одного полноценного хода) → skipped с уже определённым termination.
  6. assembleGameIngest(game, norm) (src/assemble.ts:21) — сшивает discovery-метаданные с нормализованной партией в проводной GameIngestWire (snake_case).
  7. Ветка dry-run. Если DRY_RUN включён — нормализовали, собрали, записали в recent с outcome=dry, но не постим.
  8. ingestGame(payload)POST /api/games на аналитику (src/ingest.ts). Это обычный Node fetch: аналитика в LAN, без Cloudflare. Маппинг ответа outcomeForStatus (src/observer.ts:64):
HTTPoutcomeсчётчиксмысл
201createdingestedновая строка создана
200existsexistedпартия уже была (first-writer-wins)
422rejectedrejectedреплей-гейт движка отверг (см. 07 Контракт ingest и валидация движком)
прочееerrorerrorsсетевой/серверный сбой

При rejected/error observer вытаскивает detail из JSON-тела ответа аналитики и логирует его — это диагностический хвост для разбора 422.


4. Нормализация (кратко) — что делает normalizeStateMap

Полная теория конвейера — на отдельной странице

Идея нормализации и машина состояний партии общие у observer и sync, разобраны в 05 Конвейер партий и состояния. Здесь — только специфика наблюдателя.

gameMoveHistoryStateMap — это пронумерованная карта снимков (FEN + кости + часы + дельта-ход). Индексы могут идти с пропусками, поэтому весь код ходит по отсортированным ключам, а не по сырым целочисленным индексам.

  • Ходы и микро-ходы. Один ход (turn) = состояние «новый бросок костей» (есть кости, нет хода, регистр костей совпадает с активным цветом, кости изменились) + до трёх микро-ходов. Кость кодируется буквой фигуры, регистр = цвет (P/N/B/R/Q/K белые, строчные — чёрные); число: p=1 … k=6. Микро-ход собирается в UCI (b1c3, промоушн → e7e8q).
  • Вывод цвета (inferColors). Сайт не говорит, кто белыми. Цвет выводится корреляцией убывания часов (leftTime) с активным цветом позиции: чьи часы тикали на ходу белых — тот и белые. Это даёт whitePlayerId/blackPlayerId (id игроков). Если корреляция ни с кем не совпала — assemble откатывается на «creator = white» (src/assemble.ts:28).
  • Termination и result. Это то, что observer добавил поверх старого Python-ETL (тот оставлял unknown). Дерево решений (src/normalize.ts:244):
    • bK == 0king_captured, result = 1 (белые выиграли); wK == 0result = -1;
    • чей-то leftTime <= 0timeout, проигрывает таймаут-игрок;
    • хвост «нет хода, все кости заблокированы, кости не менялись» → double_declined (отказ от удвоения);
    • хвост «нет хода, тот же FEN, те же кости, часы тикают» → draw_agreement (result = 0);
    • иначе оба короля живы, сигналов нет → resign, result = null.
  • События удвоения и ничьи. Принятое удвоение детектируется по скачку bank (общий пот) → DOUBLE_OFFER + DOUBLE_ACCEPT(payload.bank). Отказ → DOUBLE_OFFER + DOUBLE_DECLINE. Согласие на ничью → DRAW_OFFER + DRAW_ACCEPT.

result у observer выводится из доски, а не из метаданных

В отличие от бэкфилла, observer не имеет авторитетного результата из player/history — он выводит исход из последней позиции. Поэтому resign остаётся с result = null (по доске победителя не видно). Аналитика заполнит это позже, когда тот же gameId дойдёт через sync (POV игрока → POV белых). См. урок про авторитетный результат в 03 Синхронизация — обзор и архитектура.


5. Архив-сток (raw archive sink)

Перед нормализацией каждый завершённый бандл уходит в bronze-архив. За это отвечает ArchiveWriter (src/archiveSink.ts), и он построен так, чтобы никогда не уронить цикл наблюдения из-за архивации.

  • write(bundle) пытается archive.put(bundle) (вставка в Postgres raw-архива на dexus). При любой ошибке — бандл спулится на диск (writeFile в ARCHIVE_SPOOL_DIR, имя source__externalId.json), а сам write() ничего не бросает наверх. Даже если и спул не удался — только лог, цикл живёт.
  • drain() вызывается в конце каждой итерации цикла: читает .json-файлы из спула, по каждому putunlink. Лимит maxPerDrain = 500 файлов за проход — большой backlog после длинного простоя архива не блокирует poll-loop, остаток дольётся за следующие проходы.
  • Карантин битых файлов. Нечитаемый/повреждённый спул-файл переименовывается в *.json.bad (rename), чтобы не перечитывать его каждый проход. Если архив всё ещё недоступен — drain прерывается (break) и повторит на следующей итерации.
  • noopSink — когда ARCHIVE_DB_URL не задан, createArchiveSink() возвращает no-op: observer ведёт себя ровно как раньше, без архивации.

Сейчас архивация в live-деплое выключена

В текущем деплое observer ARCHIVE_DB_URL не выставлен → работает noopSink, raw-архивация отключена. Это известная дыра: live-партии существуют только в памяти процесса до залива в аналитику; сырьё для них пока нигде не оседает. Включение bronze-слоя для observer — отдельный план, см. Raw-архив — bronze-слой.

flowchart TD
    W["write(bundle)"] --> P{"archive.put OK?"}
    P -->|"да"| OK["в Postgres bronze (dexus)"]
    P -->|"ошибка"| SP["spool → ARCHIVE_SPOOL_DIR/<br/>source__externalId.json"]
    D["drain() каждую итерацию"] --> R["readdir *.json<br/>(до maxPerDrain=500)"]
    R --> J{"JSON парсится?"}
    J -->|"нет"| Q["rename → *.json.bad (карантин)"]
    J -->|"да"| PUT{"put OK?"}
    PUT -->|"да"| U["unlink (залито)"]
    PUT -->|"нет"| BR["break — повтор на след. проходе"]

6. Идемпотентность и идентичность

Дедупликацию и идентичность делает аналитика, не observer

observer постит «оптимистично» — он не проверяет заранее, есть ли партия. Идемпотентность обеспечивает аналитика по правилу first-writer-wins: один и тот же gameId (UUID) могут писать observer, sync и extension; кто первый создал строку — тот и определил её (и идентичности игроков), повторный POST получает 200 и не перезаписывает. Это и есть гонка observer ↔ extension вокруг bot:<algorithm> против хост-аккаунта. Полностью разобрано в 08 Идентичность, источники и дедупликация.

Идентичность игроков observer формирует в makePlayer (src/assemble.ts:11):

  • external_id = нативный userId сайта (строкой);
  • player_type = "bot", если userId < 0, иначе "human"по знаку id (сайт-боты имеют отрицательный id);
  • плюс username и rating игрока.

source = "dicechess.com" — это поле уровня партии, его ставит не makePlayer, а assembleGameIngest (src/assemble.ts), а не на каждого игрока.

Сайт-ботов НЕ переименовывать в bot:<algorithm>

Нативный отрицательный id сам по себе является канонической идентичностью сайт-бота. Синтетическую схему bot:<algorithm> использует только наш собственный движок, заливаемый через dicechess-extension. observer обязан сохранять нативный id как есть — см. 08 Идентичность, источники и дедупликация.


7. Конфигурация

Все переменные читаются в src/config.ts (через dotenv).

ПеременнаяДефолтНазначение
DICECHESS_JWT— (обязательно)Bearer-JWT из залогиненного браузера (живёт ~месяцы). Без него сервис не стартует.
ANALYTICS_BASE_URLhttp://192.168.10.3:8020База аналитики (aurora) для POST /api/games.
ANALYTICS_INGEST_TOKEN""Bearer-токен ingest-эндпоинта аналитики.
POLL_INTERVAL_MS15000Период опроса /api/games/active.
DRY_RUNfalse1/true → нормализуем и логируем, но не постим в аналитику.
MAX_RUNTIME_MS0Остановить цикл через N мс (0 = бесконечно); для тестов/CLI.
PORT8040Порт HTTP-панели управления.
AUTO_STARTfalse1/true → начать наблюдение сразу на старте сервиса.
ARCHIVE_DB_URL""Postgres bronze-архива (dexus). Пусто → архивация = no-op. Обобщённо: postgres://rawarchive:…@192.168.10.4:5433/….
ARCHIVE_SPOOL_DIR./.archive-spoolЛокальный спул-каталог для бандлов, не доехавших до архива.

Транспорт

Запросы к dicechess.com идут через curl-subprocess (execFile), а не через Node fetch: Cloudflare фингерпринтит TLS, и undici-fetch ловит 403 «managed challenge» даже с чистого residential IP. Запросы к аналитике (LAN) — обычный fetch. Общая транспортная механика — 06 Защита от блокировок и Cloudflare.


8. Панель управления (control panel)

Сервис (src/service.ts) поднимает HTTP-сервер (src/server.ts) на порту 8040 и держит единственный экземпляр Observer. Сервис стартует в простое (idle) — наблюдение запускается кнопкой на панели (или сразу, если AUTO_START). На SIGINT/SIGTERM — graceful stop (даёт текущей итерации доиграть) + dispose() (закрытие пула архива).

Сама панель (GET /) — самодостаточная HTML/CSS/JS-страница, опрашивающая API каждые 2.5 с: показывает статус (Running/Idle), бейдж режима (LIVE/DRY-RUN), счётчики (live now, finished, created/exists/rejected/errors/skipped, polls), разбивку by termination и таблицу последних 30 партий (RECENT_LIMIT).

Метод/путьНазначение
GET /Панель управления (HTML).
GET /api/healthHealth-check ({ ok: true }).
GET /api/statusСостояние: running, dryRun, pollIntervalMs, analyticsBaseUrl, jwtConfigured, startedAt, lastPollAt, liveGames, uptimeMs.
GET /api/statsПолная статистика Stats (счётчики + byTermination + recent).
POST /api/startЗапустить цикл. Тело { "dryRun": bool } переопределяет режим. Идемпотентно.
POST /api/stopGraceful stop.

Stats (src/observer.ts:24) — целиком in-memory: при перезапуске сервиса счётчики обнуляются (ещё одно проявление отсутствия долговечного состояния).


Связанные страницы