Catalog Service — Events

Топики — сводка

TopicRoleRetentionКонсьюмеры
catalog.product.upsertedProducer7 днейPaykeeper Adapter (BR 3.4)
catalog.product.deletedProducer7 днейPaykeeper Adapter (BR 3.4), self (orphan-каскад в external_menu_items, BR 4.1)
catalog.modifier_group.upsertedProducer7 днейPaykeeper Adapter (BR 3.4)
catalog.modifier_group.deletedProducer7 днейPaykeeper Adapter (BR 3.4)
external_menu.updatedProducer3 дняmenu-renderer / WebSocket gateway (BR 4.1), Aggregator Service для invalidate snapshot (BR 4.2)
catalog.kds_settings.updatedProducer3 дня(P1) pos-bff — broadcast в WebSocket KDS-подписчикам для re-pull настроек (BR 5.1)
catalog.stoplist.updatedProducer7 днейpos-bff — broadcast SSE menu.invalidate в POS Desktop (SSE-bridge)

Почему нет catalog.category.*

PayKeeper ims-api не имеет сущности «категория» — каталог там flat-список. Категории в ERP используются только для формирования префикса имени товара на стороне консьюмера ("Кофе / Капучино"). При изменении категории мы не отдельно уведомляем — консьюмер получает свежее название категории через catalog.product.upserted для каждого затронутого товара (Catalog Service каскадно переопубликует события товаров этой категории).

Naming convention

.upserted — единое событие для create и update (консьюмер делает upsert по entity_id). Отдельного .created/.updated не вводим.


Публикуем

catalog.product.upserted

Публикуется при создании товара, любом изменении его полей, изменении его категории (если меняется prefix), или каскадно при изменении категории этого товара / переименовании группы модификаторов, на которую он ссылается.

Topic: catalog.product.upserted

Ключ Kafka: product_id.

{
  "event_id": "uuid",
  "timestamp": "datetime",
  "version": 1,
  "payload": {
    "franchise_id": "uuid",
    "product_id": "uuid",
    "name": "string",
    "category_path": "string | null",
    "base_price": "decimal",
    "vat_rate": "string",
    "unit_of_measure": "string",
    "is_marked": "boolean",
    "is_open_price": "boolean",
    "is_by_weight": "boolean",
    "is_alcohol": "boolean",
    "modifier_group_ids": ["uuid"],
    "deleted_at": null
  }
}

category_path — готовый префикс имени от корня через /, например "Кофе / Холодное". Формирует Catalog Service из дерева категорий — консьюмер не должен резолвить иерархию сам.

Консьюмеры:

  • Paykeeper Adapter — вычисляет полный список виртуальных PK-продуктов по правилу развёртывания (спека) и делает upsert каждого через ims-api.

catalog.product.deleted

Публикуется при soft-delete товара (проставлен deleted_at).

Topic: catalog.product.deleted

{
  "event_id": "uuid",
  "timestamp": "datetime",
  "version": 1,
  "payload": {
    "franchise_id": "uuid",
    "product_id": "uuid",
    "deleted_at": "datetime"
  }
}

Консьюмеры:

  • Paykeeper Adapter — помечает ВСЕ paykeeper_products этого erp_product_id как status=deleted и вызывает DELETE /product/{pk_id} в PK по каждому.

catalog.modifier_group.upserted

Публикуется при создании/правке группы модификаторов или её опций (атомарное событие на уровне всей группы).

Topic: catalog.modifier_group.upserted

Ключ Kafka: modifier_group_id.

{
  "event_id": "uuid",
  "timestamp": "datetime",
  "version": 1,
  "payload": {
    "franchise_id": "uuid",
    "modifier_group_id": "uuid",
    "group_name": "string",
    "binding_type": "structural | free",
    "min": "integer",
    "max": "integer",
    "options": [
      {
        "id": "uuid",
        "name": "string",
        "price": "decimal | null",
        "sku_1c": "string | null"
      }
    ],
    "referenced_by_product_ids": ["uuid"]
  }
}

referenced_by_product_ids — список товаров этой франшизы которые ссылаются на эту группу. Нужен консьюмеру чтобы понять какие товары переразвернуть в PK (у каждого изменится набор variants/addons).

(sku_1c добавлено в BR 1.17) — код номенклатуры 1С для опции. Заполнен только для опций structural-мода. Консьюмеры могут использовать для отдельной фискальной/учётной логики (выгрузка в 1С Общепит — отдельной BR).

Консьюмеры:

  • Paykeeper Adapter — для каждого product_id из списка: собирает актуальный набор виртуальных PK-продуктов, диффит против paykeeper_products этого account’а × erp_product_id, кладёт upsert/delete в outbox.

catalog.modifier_group.deleted

Публикуется при удалении группы модификаторов (или открепления от всех товаров).

Topic: catalog.modifier_group.deleted

{
  "event_id": "uuid",
  "timestamp": "datetime",
  "version": 1,
  "payload": {
    "franchise_id": "uuid",
    "modifier_group_id": "uuid",
    "deleted_at": "datetime",
    "referenced_by_product_ids": ["uuid"]
  }
}

Консьюмеры: Paykeeper Adapter — удаляет соответствующие variant/addon записи в PK и переразворачивает затронутые товары.

external_menu.updated (BR 4.1)

Публикуется при любом изменении external_menu, его категорий или items, включая публикацию/снятие/удаление/восстановление. Используется для live-обновления рекламных мониторов и (в BR 4.2) для инвалидации кэша в Aggregator Service.

Topic: external_menu.updated

Ключ Kafka: external_menu_id

{
  "event_id": "uuid",
  "timestamp": "datetime",
  "version": 1,
  "payload": {
    "franchise_id": "uuid",
    "external_menu_id": "uuid",
    "channel": "tv_screen | json | yandex_eda | koala",
    "store_id": "uuid | null",
    "slug": "string | null",
    "status": "draft | published | archived",
    "change_type": "content_changed | published | unpublished | archived | restored | settings_changed | item_orphaned",
    "affected_item_ids": ["uuid"]
  }
}

Поля:

  • change_type — тип изменения, помогает консьюмеру выбрать стратегию (например, при archived сразу разорвать WebSocket-соединения; при content_changed отправить delta или попросить refetch)
  • affected_item_ids — IDs items затронутых данным изменением (опционально, для частичного refresh; при крупных изменениях вроде публикации может быть пустым)

Консьюмеры:

  • menu-renderer / WebSocket gateway (BR 4.1) — пушит сигнал «refetch» подключённым клиентам live URL
  • Aggregator Service (BR 4.2) — инвалидирует menu_snapshots для bindings с external_menu_id = X

Когда публикуется:

Действиеchange_type
Создание меню(не публикуется до первого опубликования)
Публикация (draft → published)published
Снятие с публикацииunpublished
Soft delete (→ archived)archived
Восстановление из архиваrestored
Изменение настроек меню (имя, шаблон, slug)settings_changed
Добавление / удаление / правка item, overridecontent_changed (только для published-меню)
Каскад orphan при удалении товара в каталогеitem_orphaned

Не публикуется для draft

События content_changed для меню в статусе draft НЕ публикуются — нет смысла обновлять никого, меню никому не отдаётся. Исключение: published — публикуем сразу при переходе.

catalog.stoplist.updated

Публикуется при добавлении/снятии товара или категории в стоп-лист на конкретной ТТ. Используется pos-bff для realtime-инвалидации меню на POS-терминалах (Kafka → SSE → EventSource в pos-desktop).

Topic: catalog.stoplist.updated

Ключ Kafka: store_id

{
  "event_id": "uuid",
  "timestamp": "datetime",
  "version": 1,
  "payload": {
    "franchise_id": "uuid",
    "store_id": "uuid",
    "entity_type": "product | category",
    "entity_id": "uuid",
    "is_stopped": "boolean"
  }
}

Поля:

  • entity_typeproduct или category
  • is_stoppedtrue при добавлении в стоп-лист, false при снятии

Консьюмеры:

  • pos-bff (group pos-bff-sse-${HOSTNAME}) — broadcastToStore(store_id, menu.invalidate) → POS Desktop вызывает menuStore.checkForUpdates() → snapshot diff → мигалка «Меню изменилось»

Цены товаров — без отдельного события

Изменение цены через batch-update прейскуранта триггерит productService.republishProducts(affectedProductIds), который публикует catalog.product.upserted для каждого затронутого товара. pos-bff подписан и на этот топик — отдельный catalog.pricelist.updated не нужен.

catalog.kds_settings.updated (BR 5.1)

Публикуется при изменении настроек KDS франшизы — kds_franchise_settings (звуки, авто-логаут) или порогов цвета на kitchen_stations. В P0 consumer не реализован (KDS делает pull при перезапуске/«Применить»). В P1 consumer = pos-bff для live-push в WebSocket KDS-подписчикам.

Topic: catalog.kds_settings.updated

Ключ Kafka: franchise_id

{
  "event_id": "uuid",
  "event_type": "catalog.kds_settings.updated",
  "timestamp": "datetime",
  "version": 1,
  "source": "catalog-service",
  "payload": {
    "franchise_id": "uuid",
    "kind": "settings | station_thresholds",
    "kitchen_station_id": "uuid | null",
    "updated_at": "datetime"
  }
}

Поля:

  • kind=settings — изменились franchise-level настройки (звуки/громкость/auto_logout) → KDS-устройства должны re-pull GET /admin/kds/settings
  • kind=station_thresholds — изменились yellow/red_threshold_minutes конкретной станции → re-pull GET /kitchen-stations
  • kitchen_station_id — заполнен только для kind=station_thresholds

Консьюмеры:

  • (P1) pos-bff — broadcast WS на устройства этой franchise_id с типом события «settings_changed», KDS делает re-pull
  • (P0) никто — событие публикуется в топик, но не консьюмится. Достаточно для аудита и будущего live-push.

Каскадная публикация

Чтобы консьюмеры не резолвили зависимости самостоятельно, Catalog Service каскадно перепубликует catalog.product.upserted для всех затронутых товаров в следующих случаях:

  • Переименование категории → перепубликация всех товаров этой категории + всех товаров дочерних категорий (меняется category_path).
  • Перемещение товара в другую категориюcatalog.product.upserted с новым category_path.
  • Удаление категории (если бизнес-правила позволяют) → товары переназначаются на родительскую категорию и перепубликуются.

Это даёт консьюмерам единообразный API: «я получил событие товара — у меня полная актуальная картина для этого товара».


Потребляем

catalog.product.deleted (self-consume) (BR 4.1)

Catalog Service сам потребляет собственное событие удаления товара чтобы каскадно обработать orphan-логику в external_menu_items:

  • Найти все external_menu_items WHERE product_id = X
  • Установить status = 'orphan'
  • Опубликовать external_menu.updated с change_type = 'item_orphaned' для каждого затронутого external_menu

Альтернатива — обработка inline в ProductService.deleteProduct() через app-логику. Self-consume через Kafka выбран для:

  1. Единообразия — соответствует тому как Paykeeper Adapter обрабатывает удаления
  2. Гарантии — если ProductService падает после Kafka publish, retry достанется

Потенциальные будущие события (не в скоупе)

Если PK реализует reverse webhook (изменение цены в ЛК PK) — потребуется consume catalog.pk_upstream_changed от Paykeeper Adapter. См. PK-summary-answers §6.


Обёртка события

Все события следуют единому формату:

{
  "event_id": "uuid",
  "timestamp": "ISO-8601 UTC",
  "version": 1,
  "payload": { ... }
}

payload.franchise_idобязательное поле. Используется консьюмерами для scope resolution.


Надёжность доставки

Publisher-side: transactional outbox (таблица catalog_outbox). Сервис атомарно коммитит UPDATE products + INSERT INTO catalog_outbox, отдельный воркер публикует в Kafka с retry-backoff.

Consumer-side: дедуп по event_id (Paykeeper Adapter сохраняет event_id в LRU-кэше для идемпотентности).

Ссылки