🛰️ Live-наблюдатель (dicechess-observer)
Наблюдатель (observer) — это реактивная половина конвейера добычи: он смотрит за партиями, которые идут на dicechess.com прямо сейчас, ловит момент их завершения и тут же нормализует и заливает результат в аналитику. Это зеркальная противоположность бэкфилл-краулера синхронизации: тот перечисляет всю историю задним числом, observer же видит только то, что произошло, пока он был запущен.
Репозиторий: dicechess-observer (Node/TypeScript, исполняется на ESM-нативном Node, файлы импортируются с расширением .ts). Деплой — долгоживущий сервис на rpi4 (192.168.10.9, residential IP, egress через Cloudflare).
1. Что такое observer и чем он не sync
Одно ядро, две стратегии
И observer, и sync скачивают канонический
game-move-history, прогоняют его через одну и ту же идею нормализации и постят в один контракт ingest. Различается только то, как они узнают, какие партии забирать.
| 🛰️ Observer | 🕸️ Sync (бэкфилл) | |
|---|---|---|
| Модель | реактивная (live) | проактивная (исторический обход) |
| Источник id партий | POST /api/games/active (что идёт сейчас) | граф игроков + POST /api/player/history |
| Перечисление / спайдер | нет | да (BFS по оппонентам от лидербордов) |
| Память о «виденных» | in-memory Map, без долговечного курсора | SQLite-состояние, резюмируемые курсоры |
| Видимость | только партии, идущие во время работы | вся история сайта |
| Узкое горлышко rate-limit | практически нет (game-move-history свободен) | player/history (см. 06 Защита от блокировок и Cloudflare) |
| Запуск | долгоживущий сервис с панелью | CLI-команды + демон |
Ключевое следствие реактивности: у observer нет долговечного состояния. Множество «известных» партий (known) живёт только в оперативной памяти (Map<string, ActiveGame>, src/observer.ts:76). Перезапустили процесс — забыли всё, что было в работе; партии, завершившиеся в окно простоя, observer не увидит (их потом подберёт бэкфилл-sync). Это сознательный компромисс: observer оптимизирован под низкую задержку «партия закончилась → строка в аналитике», а полнота — забота краулера.
2. Цикл опроса (poll loop)
Сердце сервиса — метод Observer.loop() (src/observer.ts:122). Каждую итерацию:
- Опрашиваем активные партии.
getActiveGames(jwt)→POST /api/games/active. Тело запроса фильтрует по диапазону ставок (startBets) и допустимым контролям времени (allowedTimes), сортирует поBET_AMOUNT(src/dicechess.ts:12). - Сливаем в
known. Каждую живую партию кладём/обновляем вMapпоid:for (const g of games) this.known.set(g.id, g). - Считаем разницу множеств (set-difference).
finishedIds(known.keys(), liveIds)(src/observer.ts:59) возвращает те id, что есть вknown, но больше нет в живом списке — это и есть «партии, завершившиеся с прошлого опроса». - Обрабатываем каждую завершённую через
processFinished(game), попутно удаляя её изknown(this.known.delete(id)). - Сливаем спул-архив. В конце каждой итерации —
archive.drain(): дозаливаем в bronze-архив бандлы, которые ранее не доехали (см. §4). - Спим
POLL_INTERVAL_MS(по умолчанию 15 000 мс) и повторяем.
flowchart TD A["POST /api/games/active"] --> B["liveIds = Set(g.id)"] B --> C["known.set(id, game)<br/>для каждой живой"] C --> D{"finishedIds:<br/>id ∈ known, id ∉ liveIds"} D -->|"для каждой завершённой"| E["known.delete(id)<br/>processFinished(game)"] D -->|"нет завершённых"| F E --> F["archive.drain()<br/>дозалив спула"] F --> G["sleep POLL_INTERVAL_MS (15s)"] G --> A
Почему set-difference, а не флаг состояния
Сайт не уведомляет о завершении — он просто перестаёт возвращать партию в
/api/games/active. «Исчезла из живого списка» = «закончилась». Поэтому observer обязан помнить, что он уже видел живым: id, который был вknownи пропал, и есть сигнал завершения. Если процесс не работал в момент исчезновения — сигнала не будет.
Ошибка опроса (например, сетевой сбой или 403 от Cloudflare) не роняет цикл: она ловится, инкрементит stats.errors и логируется, цикл продолжается со следующей итерации.
3. Обработка завершённой партии (processFinished)
processFinished(game) (src/observer.ts:149) — строго упорядоченная цепочка. Порядок шагов важен: сырьё архивируется раньше нормализации, чтобы баг нормализатора или недоступность аналитики не привели к потере данных.
sequenceDiagram participant L as loop participant S as dicechess.com participant A as ArchiveWriter (dexus) participant N as normalizeStateMap participant As as assembleGameIngest participant An as analytics (aurora) L->>S: GET /api/game-move-history?gameId S-->>L: gameMoveHistoryStateMap Note over L,A: 1. СНАЧАЛА сырьё L->>A: write(buildRawBundle(game, hist)) Note right of A: write() НИКОГДА не бросает;<br/>при сбое — спул на диск L->>L: пустой stateMap? → skipped, выход L->>N: normalizeStateMap(sm, {allowDoubling}) N-->>L: turns, events, result, termination, цвета L->>L: turns.length == 0? → skipped, выход L->>As: assembleGameIngest(game, norm) As-->>L: GameIngestWire (snake_case) alt dryRun L->>L: запись recent (outcome=dry), выход else live L->>An: POST /api/games (Bearer) An-->>L: 201 / 200 / 422 / прочее L->>L: outcomeForStatus → ingested/existed/rejected/error end
Пошагово:
getMoveHistory(jwt, id)→GET /api/game-move-history?gameId=…, каноническийgameMoveHistoryStateMap(src/dicechess.ts:58).- Архивируем сырой бандл ПЕРВЫМ.
archive.write(buildRawBundle(game, hist)). Комментарий в коде: «Persist the raw bundle BEFORE any normalization: robust to normalizer bugs and to the archive being unreachable (spools locally). Never throws.» Бандл — это два артефакта as-is:discovery_meta(ActiveGameиз/active) иmove_history(MoveHistoryResponse), без нормализации (src/bundle.ts:14). Подробнее про bronze-слой — Raw-архив — bronze-слой. - Отсев пустых. Если
stateMapотсутствует или пуст →skipped, выход. normalizeStateMap(sm, { allowDoubling })(src/normalize.ts:157). Превращает «грязную» state-машину сайта вturns/events/result/termination+ выведенные цвета. Подробности — §5.- Отсев фантомных партий. Если
turns.length === 0(партия оборвалась так, что не сложилось ни одного полноценного хода) →skippedс уже определённымtermination. assembleGameIngest(game, norm)(src/assemble.ts:21) — сшивает discovery-метаданные с нормализованной партией в проводнойGameIngestWire(snake_case).- Ветка dry-run. Если
DRY_RUNвключён — нормализовали, собрали, записали вrecentсoutcome=dry, но не постим. ingestGame(payload)→POST /api/gamesна аналитику (src/ingest.ts). Это обычный Nodefetch: аналитика в LAN, без Cloudflare. Маппинг ответаoutcomeForStatus(src/observer.ts:64):
| HTTP | outcome | счётчик | смысл |
|---|---|---|---|
201 | created | ingested | новая строка создана |
200 | exists | existed | партия уже была (first-writer-wins) |
422 | rejected | rejected | реплей-гейт движка отверг (см. 07 Контракт ingest и валидация движком) |
| прочее | error | errors | сетевой/серверный сбой |
При rejected/error observer вытаскивает detail из JSON-тела ответа аналитики и логирует его — это диагностический хвост для разбора 422.
4. Нормализация (кратко) — что делает normalizeStateMap
Полная теория конвейера — на отдельной странице
Идея нормализации и машина состояний партии общие у observer и sync, разобраны в 05 Конвейер партий и состояния. Здесь — только специфика наблюдателя.
gameMoveHistoryStateMap — это пронумерованная карта снимков (FEN + кости + часы + дельта-ход). Индексы могут идти с пропусками, поэтому весь код ходит по отсортированным ключам, а не по сырым целочисленным индексам.
- Ходы и микро-ходы. Один ход (turn) = состояние «новый бросок костей» (есть кости, нет хода, регистр костей совпадает с активным цветом, кости изменились) + до трёх микро-ходов. Кость кодируется буквой фигуры, регистр = цвет (
P/N/B/R/Q/Kбелые, строчные — чёрные); число:p=1 … k=6. Микро-ход собирается в UCI (b1c3, промоушн →e7e8q). - Вывод цвета (
inferColors). Сайт не говорит, кто белыми. Цвет выводится корреляцией убывания часов (leftTime) с активным цветом позиции: чьи часы тикали на ходу белых — тот и белые. Это даётwhitePlayerId/blackPlayerId(id игроков). Если корреляция ни с кем не совпала —assembleоткатывается на «creator = white» (src/assemble.ts:28). - Termination и result. Это то, что observer добавил поверх старого Python-ETL (тот оставлял
unknown). Дерево решений (src/normalize.ts:244):bK == 0→king_captured,result = 1(белые выиграли);wK == 0→result = -1;- чей-то
leftTime <= 0→timeout, проигрывает таймаут-игрок; - хвост «нет хода, все кости заблокированы, кости не менялись» →
double_declined(отказ от удвоения); - хвост «нет хода, тот же FEN, те же кости, часы тикают» →
draw_agreement(result = 0); - иначе оба короля живы, сигналов нет →
resign,result = null.
- События удвоения и ничьи. Принятое удвоение детектируется по скачку
bank(общий пот) →DOUBLE_OFFER+DOUBLE_ACCEPT(payload.bank). Отказ →DOUBLE_OFFER+DOUBLE_DECLINE. Согласие на ничью →DRAW_OFFER+DRAW_ACCEPT.
result у observer выводится из доски, а не из метаданных
В отличие от бэкфилла, observer не имеет авторитетного результата из
player/history— он выводит исход из последней позиции. Поэтомуresignостаётся сresult = null(по доске победителя не видно). Аналитика заполнит это позже, когда тот жеgameIdдойдёт через sync (POV игрока → POV белых). См. урок про авторитетный результат в 03 Синхронизация — обзор и архитектура.
5. Архив-сток (raw archive sink)
Перед нормализацией каждый завершённый бандл уходит в bronze-архив. За это отвечает ArchiveWriter (src/archiveSink.ts), и он построен так, чтобы никогда не уронить цикл наблюдения из-за архивации.
write(bundle)пытаетсяarchive.put(bundle)(вставка в Postgres raw-архива на dexus). При любой ошибке — бандл спулится на диск (writeFileвARCHIVE_SPOOL_DIR, имяsource__externalId.json), а самwrite()ничего не бросает наверх. Даже если и спул не удался — только лог, цикл живёт.drain()вызывается в конце каждой итерации цикла: читает.json-файлы из спула, по каждомуput→unlink. ЛимитmaxPerDrain = 500файлов за проход — большой backlog после длинного простоя архива не блокирует poll-loop, остаток дольётся за следующие проходы.- Карантин битых файлов. Нечитаемый/повреждённый спул-файл переименовывается в
*.json.bad(rename), чтобы не перечитывать его каждый проход. Если архив всё ещё недоступен —drainпрерывается (break) и повторит на следующей итерации. noopSink— когдаARCHIVE_DB_URLне задан,createArchiveSink()возвращает no-op: observer ведёт себя ровно как раньше, без архивации.
Сейчас архивация в live-деплое выключена
В текущем деплое observer
ARCHIVE_DB_URLне выставлен → работаетnoopSink, raw-архивация отключена. Это известная дыра: live-партии существуют только в памяти процесса до залива в аналитику; сырьё для них пока нигде не оседает. Включение bronze-слоя для observer — отдельный план, см. Raw-архив — bronze-слой.
flowchart TD W["write(bundle)"] --> P{"archive.put OK?"} P -->|"да"| OK["в Postgres bronze (dexus)"] P -->|"ошибка"| SP["spool → ARCHIVE_SPOOL_DIR/<br/>source__externalId.json"] D["drain() каждую итерацию"] --> R["readdir *.json<br/>(до maxPerDrain=500)"] R --> J{"JSON парсится?"} J -->|"нет"| Q["rename → *.json.bad (карантин)"] J -->|"да"| PUT{"put OK?"} PUT -->|"да"| U["unlink (залито)"] PUT -->|"нет"| BR["break — повтор на след. проходе"]
6. Идемпотентность и идентичность
Дедупликацию и идентичность делает аналитика, не observer
observer постит «оптимистично» — он не проверяет заранее, есть ли партия. Идемпотентность обеспечивает аналитика по правилу first-writer-wins: один и тот же
gameId(UUID) могут писать observer, sync и extension; кто первый создал строку — тот и определил её (и идентичности игроков), повторныйPOSTполучает200и не перезаписывает. Это и есть гонка observer ↔ extension вокругbot:<algorithm>против хост-аккаунта. Полностью разобрано в 08 Идентичность, источники и дедупликация.
Идентичность игроков observer формирует в makePlayer (src/assemble.ts:11):
external_id= нативныйuserIdсайта (строкой);player_type="bot", еслиuserId < 0, иначе"human"— по знаку id (сайт-боты имеют отрицательный id);- плюс
usernameиratingигрока.
source = "dicechess.com" — это поле уровня партии, его ставит не makePlayer, а assembleGameIngest (src/assemble.ts), а не на каждого игрока.
Сайт-ботов НЕ переименовывать в bot:<algorithm>
Нативный отрицательный id сам по себе является канонической идентичностью сайт-бота. Синтетическую схему
bot:<algorithm>использует только наш собственный движок, заливаемый черезdicechess-extension. observer обязан сохранять нативный id как есть — см. 08 Идентичность, источники и дедупликация.
7. Конфигурация
Все переменные читаются в src/config.ts (через dotenv).
| Переменная | Дефолт | Назначение |
|---|---|---|
DICECHESS_JWT | — (обязательно) | Bearer-JWT из залогиненного браузера (живёт ~месяцы). Без него сервис не стартует. |
ANALYTICS_BASE_URL | http://192.168.10.3:8020 | База аналитики (aurora) для POST /api/games. |
ANALYTICS_INGEST_TOKEN | "" | Bearer-токен ingest-эндпоинта аналитики. |
POLL_INTERVAL_MS | 15000 | Период опроса /api/games/active. |
DRY_RUN | false | 1/true → нормализуем и логируем, но не постим в аналитику. |
MAX_RUNTIME_MS | 0 | Остановить цикл через N мс (0 = бесконечно); для тестов/CLI. |
PORT | 8040 | Порт HTTP-панели управления. |
AUTO_START | false | 1/true → начать наблюдение сразу на старте сервиса. |
ARCHIVE_DB_URL | "" | Postgres bronze-архива (dexus). Пусто → архивация = no-op. Обобщённо: postgres://rawarchive:…@192.168.10.4:5433/…. |
ARCHIVE_SPOOL_DIR | ./.archive-spool | Локальный спул-каталог для бандлов, не доехавших до архива. |
Транспорт
Запросы к dicechess.com идут через
curl-subprocess (execFile), а не через Nodefetch: Cloudflare фингерпринтит TLS, и undici-fetchловит 403 «managed challenge» даже с чистого residential IP. Запросы к аналитике (LAN) — обычныйfetch. Общая транспортная механика — 06 Защита от блокировок и Cloudflare.
8. Панель управления (control panel)
Сервис (src/service.ts) поднимает HTTP-сервер (src/server.ts) на порту 8040 и держит единственный экземпляр Observer. Сервис стартует в простое (idle) — наблюдение запускается кнопкой на панели (или сразу, если AUTO_START). На SIGINT/SIGTERM — graceful stop (даёт текущей итерации доиграть) + dispose() (закрытие пула архива).
Сама панель (GET /) — самодостаточная HTML/CSS/JS-страница, опрашивающая API каждые 2.5 с: показывает статус (Running/Idle), бейдж режима (LIVE/DRY-RUN), счётчики (live now, finished, created/exists/rejected/errors/skipped, polls), разбивку by termination и таблицу последних 30 партий (RECENT_LIMIT).
| Метод/путь | Назначение |
|---|---|
GET / | Панель управления (HTML). |
GET /api/health | Health-check ({ ok: true }). |
GET /api/status | Состояние: running, dryRun, pollIntervalMs, analyticsBaseUrl, jwtConfigured, startedAt, lastPollAt, liveGames, uptimeMs. |
GET /api/stats | Полная статистика Stats (счётчики + byTermination + recent). |
POST /api/start | Запустить цикл. Тело { "dryRun": bool } переопределяет режим. Идемпотентно. |
POST /api/stop | Graceful stop. |
Stats (src/observer.ts:24) — целиком in-memory: при перезапуске сервиса счётчики обнуляются (ещё одно проявление отсутствия долговечного состояния).
Связанные страницы
- 03 Синхронизация — обзор и архитектура — два режима добычи, medallion, сквозной путь партии.
- 05 Конвейер партий и состояния — общая идея нормализации и машина состояний партии.
- 06 Защита от блокировок и Cloudflare — curl-транспорт, Cloudflare, rate-limit (общее с sync).
- 07 Контракт ingest и валидация движком — контракт
POST /api/games, смысл 422. - 08 Идентичность, источники и дедупликация —
external_id/source/player_type, first-writer-wins,bot:<algorithm>. - Raw-архив — bronze-слой — bronze-архив сырья на dexus.
- Beturanga — второй источник — второй сайт (Socket.IO), отличия.