Kafka Topics Registry — реестр для деплоя в прод

Контекст

На тестовом VPS (erp-test.nirbi.ru, docker compose) Kafka работает 8+ дней. На production k8s кластере Kafka не развёрнута — все publisher’ы упираются в дефолтный localhost:9092 и блокируют HTTP-запросы (см. инцидент 500 на POST /admin/employees 2026-05-12). Этот документ — полный пул того, что нужно поднять в проде, чтобы Kafka-интеграции заработали.

Что есть в тесте (source of truth)

  • Образ: apache/kafka:3.9.0
  • Режим: KRaft single-node (broker+controller, без ZooKeeper)
  • Сеть: kafka:9092 внутри erp-network
  • Объёмы: heap -Xmx1024m -Xms512m

Broker-level config

ПараметрTestПрод (рекомендация)
default.replication.factor1≥ 2 (кластер из 3+ брокеров — 3)
min.insync.replicas12
num.partitions33
auto.create.topics.enablefalsefalse (Spring создаёт через NewTopic beans или DLT recoverer)
log.retention.hours168 (7 дней)168
log.retention.bytes2 GB / partition2 GB
message.max.bytes1 048 5881 048 588
compression.typeproducer-sideproducer-side
offsets.topic.replication.factor13
transaction.state.log.replication.factor13
transaction.state.log.min.isr12

Replication

На тесте всё replicas=1 — допустимо, потому что брокер один. На проде минимум 3 брокера + replication=3, min.insync=2 иначе любой рестарт брокера даст потерю сообщений или producer acks=all будет вешаться.

Топики (39 main + 21 DLT)

Все main-топики — partitions=3, retention=7 дней (DLT — 30 дней). Где default — наследуют broker log.retention.hours=168 = 7 дней.

user-service (4 main)

TopicProducerConsumer (group)Назначение
user.employee.createduser-service(нет в коде)BR 3.2 — auto-sync с Нетмонет
user.employee.updateduser-serviceто же
user.employee.deactivateduser-serviceто же
user.employee.reactivateduser-serviceто же

catalog-service (8 main + 8 DLT)

TopicProducerConsumer (group)
catalog.product.upsertedcatalog-servicepaykeeper-adapter (paykeeper-adapter-catalog-v2), pos-bff (pos-bff-sse-*)
catalog.product.deletedcatalog-servicepaykeeper-adapter, pos-bff
catalog.category.upsertedcatalog-servicepos-bff
catalog.category.deletedcatalog-servicepos-bff
catalog.modifier_group.upsertedcatalog-servicepaykeeper-adapter, pos-bff
catalog.modifier_group.deletedcatalog-servicepaykeeper-adapter, pos-bff
catalog.stoplist.updatedcatalog-servicepos-bff (SSE для стоп-листов на кассе)
catalog.external_menu.updatedcatalog-servicecatalog-service-self-external-menu (self-loop для агрегаторов)

Заявлены в KafkaConfig.java через TopicBuilder.name(...).partitions(3).replicas(1).config("retention.ms", "604800000"). При replicas=3 в проде — обновить bean’ы или создавать топики из CLI.

order-service (15 main)

TopicProducerConsumer (group)
order.createdorder-service
order.paidorder-service
order.completedorder-servicecustomer-service (customer-service-group), warehouse-service (warehouse-service-order-completed)
order.closedorder-service(BR 2.5 alias)
order.cancelledorder-service
order.refundedorder-servicewarehouse-service (warehouse-service-order-refunded)
order.status.changedorder-servicestore-service (store-service-tables), aggregator-service (aggregator-service-status-sync)
order.payment_requestedorder-servicepaykeeper-adapter (paykeeper-adapter-invoice)
order.refund_requestedorder-servicepaykeeper-adapter (paykeeper-adapter-refund)
order.cooking_startedorder-service(BR 5.1 KDS)
order.readyorder-service
order.handed_overorder-service
order.in_deliveryorder-service
order.deliveredorder-service
order.item.kitchen_status_changedorder-service(per-item kitchen)

paykeeper-adapter (7 main)

TopicProducerConsumer (group)
paykeeper.invoice.createdpaykeeper-adapterorder-service (order-service-pk-invoice)
paykeeper.payment.receivedpaykeeper-adapterorder-service (order-service-pk-payment)
paykeeper.payment.refundedpaykeeper-adapterorder-service (order-service-pk-refund)
paykeeper.receipt.fiscalizedpaykeeper-adapterorder-service (order-service-pk-receipt)
paykeeper.receipt.failedpaykeeper-adapterorder-service (order-service-pk-receipt-fail)
paykeeper.refund.failedpaykeeper-adapterorder-service (order-service-pk-refund-fail)
paykeeper.account.provisionedpaykeeper-adapter

customer-service (4 main)

TopicProducerConsumer (group)
customer.createdcustomer-service
customer.updatedcustomer-service
customer.deletedcustomer-service
customer-group.member-changedcustomer-service

store-service (1 main)

TopicProducerConsumer (group)
store.table.upsertedstore-servicepos-bff (pos-bff-sse-*)

aggregator-service (1 main)

TopicProducerConsumer (group)
aggregator.order.receivedaggregator-serviceorder-service (order-service-aggregator), pos-bff (pos-bff-sse-*)

pos.shift (2 main)

TopicProducerConsumer (group)
pos.shift.openedpos-bffuser-service (user-service-group), admin-bff (admin-bff-shift-monitor)
pos.shift.closedpos-bffuser-service, admin-bff

DLT (Dead Letter Topics)

Создаются автоматически DeadLetterPublishingRecoverer при первом failed consume. Имя — <topic>.DLT. Конфиг — partitions=3, retention=30 дней. Полный список в тесте (21 шт):

aggregator.order.received.DLT
catalog.category.{upserted,deleted}.DLT
catalog.modifier_group.{upserted,deleted}.DLT
catalog.product.{upserted,deleted}.DLT
catalog.stoplist.updated.DLT
order.completed.DLT
order.payment_requested.DLT
order.refund_requested.DLT
order.status.changed.DLT
paykeeper.payment.{received,refunded}.DLT
paykeeper.receipt.fiscalized.DLT
pos.shift.{opened,closed}.DLT
store.table.upserted.DLT

Note

При auto.create.topics.enable=false DLT надо либо создавать вручную, либо разрешить DeadLetterPublishingRecoverer их создавать (он работает через AdminClient — нужна topic:Create ACL).

Consumer groups (24 в тесте)

aggregator-service-status-sync
aggregator-webhook-dispatcher
catalog-service-self-external-menu
customer-service-group
order-service-aggregator
order-service-pk-invoice
order-service-pk-payment
order-service-pk-receipt
order-service-pk-receipt-fail
order-service-pk-refund
order-service-pk-refund-fail
paykeeper-adapter-catalog-v2
paykeeper-adapter-invoice
paykeeper-adapter-refund
pos-bff-sse-<hostname-hash> (динамические, по одному на pod pos-bff)
store-service-tables
user-service-group
warehouse-service-order-completed
warehouse-service-order-refunded

SSE consumer groups

pos-bff создаёт group ID вида pos-bff-sse-<hostname> чтобы каждый pod был отдельным consumer и каждый клиент SSE получал все события. При autoscaling pos-bff будет много групп — это нормально, но нужно следить за group.metadata.expiration (дефолт 7 дней — после увольнения pod’а group самоудаляется).

Что нужно для прода (чек-лист девопсу)

1. Развернуть Kafka в k8s

Варианты:

  • Strimzi operator (quay.io/strimzi/operator:latest) — рекомендация. Декларативно через Kafka CR, KRaft, monitoring, TLS, ACL out-of-the-box. Поднимается в отдельном namespace kafka.
  • Bitnami Helm chart — проще, но менее гибко (StatefulSet с фиксированной schema).
  • Redpanda — Kafka API-compatible, single-binary, без ZK. Минус — отдельная экосистема инструментов.

2. Конфигурация брокеров

Минимум 3 брокера, RF=3, min.insync=2 (см. таблицу выше).

3. Создать топики

Скрипт для девопса (под Strimzi — через KafkaTopic CR, иначе через kafka-topics.sh):

# Main topics (39 шт) — все одинаковые
TOPICS=(
  user.employee.created user.employee.updated
  user.employee.deactivated user.employee.reactivated
  catalog.product.upserted catalog.product.deleted
  catalog.category.upserted catalog.category.deleted
  catalog.modifier_group.upserted catalog.modifier_group.deleted
  catalog.stoplist.updated catalog.external_menu.updated
  order.created order.paid order.completed order.closed
  order.cancelled order.refunded order.status.changed
  order.payment_requested order.refund_requested
  order.cooking_started order.ready order.handed_over
  order.in_delivery order.delivered
  order.item.kitchen_status_changed
  paykeeper.invoice.created paykeeper.payment.received
  paykeeper.payment.refunded paykeeper.receipt.fiscalized
  paykeeper.receipt.failed paykeeper.refund.failed
  paykeeper.account.provisioned
  customer.created customer.updated customer.deleted
  customer-group.member-changed
  store.table.upserted aggregator.order.received
  pos.shift.opened pos.shift.closed
)
for t in "${TOPICS[@]}"; do
  kafka-topics.sh --bootstrap-server kafka:9092 --create \
    --topic "$t" --partitions 3 --replication-factor 3 \
    --config retention.ms=604800000 --config min.insync.replicas=2
done
 
# DLT topics (21 шт) — retention 30 дней
DLT_TOPICS=(
  aggregator.order.received.DLT
  catalog.category.upserted.DLT catalog.category.deleted.DLT
  catalog.modifier_group.upserted.DLT catalog.modifier_group.deleted.DLT
  catalog.product.upserted.DLT catalog.product.deleted.DLT
  catalog.stoplist.updated.DLT
  order.completed.DLT order.payment_requested.DLT
  order.refund_requested.DLT order.status.changed.DLT
  paykeeper.payment.received.DLT paykeeper.payment.refunded.DLT
  paykeeper.receipt.fiscalized.DLT
  pos.shift.opened.DLT pos.shift.closed.DLT
  store.table.upserted.DLT
)
for t in "${DLT_TOPICS[@]}"; do
  kafka-topics.sh --bootstrap-server kafka:9092 --create \
    --topic "$t" --partitions 3 --replication-factor 3 \
    --config retention.ms=2592000000 --config min.insync.replicas=2
done

4. ENV переменные в Deployment каждого сервиса

Добавить в k8s Deployment всех сервисов из колонки Producer/Consumer выше + pos-bff, admin-bff (Node.js KafkaJS):

env:
  - name: SPRING_KAFKA_BOOTSTRAP_SERVERS    # для Spring Boot (Java)
    value: "kafka-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092"
  - name: KAFKA_BROKERS                      # для KafkaJS (Node)
    value: "kafka-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092"

Сервисы которым нужен KAFKA env: user-service, catalog-service, order-service, warehouse-service, aggregator-service, customer-service, paykeeper-adapter, store-service, pos-bff, admin-bff.

Сервисы которым НЕ нужен (нет Kafka в коде): auth-service, admin-frontend.

5. Сетевая доступность

  • Если Kafka в namespace kafka, ERP-сервисы в erp — нужны NetworkPolicy для cross-namespace egress на 9092.
  • Если все в erp — DNS работает напрямую.

6. Monitoring

В Prometheus сейчас 0 метрик kafka.*. Добавить:

  • JMX exporter на каждом брокере (Strimzi даёт из коробки)
  • kafka-exporter (Bitnami chart) для consumer lag и broker stats
  • Дашборд в Grafana — kafka-overview + kafka-consumer-lag

Spring Kafka — известные настройки

Из кода (catalog-service KafkaConfig.java):

  • Consumer error handling: 3 ретрая с exponential backoff (1s → 10s, max 30s elapsed) → DeadLetterPublishingRecoverer.DLT topic
  • Producer compression: gzip (фикс eddcfb1 — переключали с lz4)
  • acks: дефолт allmin.insync.replicas=2 гарантирует durability)

Ссылки