Paykeeper Adapter — Kafka события

Топики — сводка

TopicRoleRetention
paykeeper.invoice.createdProducer7 дней
paykeeper.payment.receivedProducer30 дней
paykeeper.payment.refundedProducer30 дней
paykeeper.receipt.fiscalizedProducer30 дней
paykeeper.receipt.failedProducer30 дней
paykeeper.refund.failedProducer30 дней
paykeeper.account.provisionedProducer7 дней
order.payment_requestedConsumer(от Order Service)
order.refund_requestedConsumer(от Order Service)
catalog.product.upsertedConsumer(от Catalog Service, BR 3.4)
catalog.product.deletedConsumer(от Catalog Service, BR 3.4)
catalog.modifier_group.upsertedConsumer(от Catalog Service, BR 3.4)
catalog.modifier_group.deletedConsumer(от Catalog Service, BR 3.4)

Публикуем

paykeeper.invoice.created

Публикуется после успешного POST /change/invoice/preview/ в PK.

Topic: paykeeper.invoice.created

{
  "event_id": "uuid",
  "timestamp": "datetime",
  "version": 1,
  "payload": {
    "order_id": "uuid",
    "account_id": "uuid",
    "pk_invoice_id": "string",
    "pk_invoice_url": "string",
    "pay_amount": "decimal",
    "expiry_at": "datetime | null"
  }
}

Консьюмеры:

  • Order Service — сохраняет orders.pk_invoice_id, orders.pk_invoice_url, отдаёт URL клиенту / POS BFF.

paykeeper.payment.received

Публикуется при получении informer success от PK или при догрузке через reconciliation.

Topic: paykeeper.payment.received

{
  "event_id": "uuid",
  "timestamp": "datetime",
  "version": 1,
  "payload": {
    "order_id": "uuid",
    "account_id": "uuid",
    "pk_payment_id": "string",
    "pk_unique_id": "string | null",
    "amount": "decimal",
    "payment_method": "cash | card | qr | mixed",
    "paid_at": "datetime",
    "bank_id": "string | null",
    "card_last4": "string | null",
    "reconciled": "boolean"
  }
}

Консьюмеры:

  • Order Serviceorders.status=paid, orders.paid_at, orders.payment_method, orders.pk_payment_id, эмитит order.paid.
  • Warehouse Service — списание остатков если order.status уже ready (см. BR 2.5 §10.1).
  • Customer Service — пересчёт dynamic групп если customer_id != null.

paykeeper.payment.refunded

Публикуется при получении refund webhook от PK.

Topic: paykeeper.payment.refunded

{
  "event_id": "uuid",
  "timestamp": "datetime",
  "version": 1,
  "payload": {
    "order_id": "uuid",
    "account_id": "uuid",
    "pk_payment_id": "string",
    "pk_refund_id": "string | null",
    "amount": "decimal",
    "refund_total_so_far": "decimal",
    "is_full_refund": "boolean",
    "initiated_by": "admin | pos | pk_external",
    "datetime": "datetime"
  }
}

Консьюмеры:

  • Order Service — обновляет RefundRecord.status=done, эмитит order.refunded для остальных.

paykeeper.receipt.fiscalized

Публикуется когда чек 54-ФЗ достиг status=success (либо догружен через GET /info/receipts/bypaymentid/, либо callback §8.13).

Topic: paykeeper.receipt.fiscalized

{
  "event_id": "uuid",
  "timestamp": "datetime",
  "version": 1,
  "payload": {
    "order_id": "uuid",
    "pk_receipt_id": "string",
    "type": "sale | refund | expense | expense-refund",
    "is_post_sale": "boolean",
    "is_correction": "boolean",
    "fpd": "string",
    "fnd": "string",
    "fn": "string",
    "rnkkt": "string",
    "shift_number": "integer",
    "receipt_number": "integer",
    "fop_receipt_key": "string",
    "fop_url": "string",
    "ts": "string"
  }
}

Консьюмеры:

  • Order Service — записывает order_payments.fiscal_data JSONB + order_payments.pk_fop_receipt_key для отображения в карточке заказа и отчётах.

paykeeper.receipt.failed

Публикуется при финальной ошибке формирования чека (rejected / failed / timeout).

Topic: paykeeper.receipt.failed

{
  "event_id": "uuid",
  "timestamp": "datetime",
  "version": 1,
  "payload": {
    "order_id": "uuid",
    "pk_receipt_id": "string",
    "status": "rejected | failed | timeout",
    "error_type": "syntax_error | ffd_error | item_code_error | print_error | queue_error | null",
    "error_message": "string | null"
  }
}

Консьюмеры:

  • Admin BFF — уведомление в админку для оператора.
  • Order Service — ставит флаг order_payments.fiscal_failed для визуальной метки в UI.

paykeeper.refund.failed

Публикуется когда PK отказал в возврате на этапе инициации ({"result":"fail"}).

Topic: paykeeper.refund.failed

{
  "event_id": "uuid",
  "timestamp": "datetime",
  "version": 1,
  "payload": {
    "order_id": "uuid",
    "pk_payment_id": "string",
    "amount": "decimal",
    "error_message": "string"
  }
}

Консьюмеры:

  • Order Service — ставит RefundRecord.status=failed + сохраняет error_message.

paykeeper.account.provisioned

Публикуется после успешного создания PK-аккаунта (в P3 — после autoinstaller; в P0 — после ручного ввода креденшелов и успешной проверки соединения).

Topic: paykeeper.account.provisioned

{
  "event_id": "uuid",
  "timestamp": "datetime",
  "version": 1,
  "payload": {
    "account_id": "uuid",
    "legal_entity_id": "uuid",
    "pk_server_host": "string",
    "paykeeper_id": "string | null"
  }
}

Консьюмеры:

  • Admin BFF — обновление UI индикатора в карточке ЮЛ.

Потребляем

EventTopicConsumer groupЗачем
order.payment_requestedorder.payment_requestedpaykeeper-adapter-invoiceСоздание инвойса в PK: POST /change/invoice/preview/ (R3 BR 3.3)
order.refund_requestedorder.refund_requestedpaykeeper-adapter-refundИнициация возврата: POST /change/payment/reverse/ (R7 BR 3.3)
catalog.product.upsertedcatalog.product.upsertedpaykeeper-adapter-catalogРазвернуть товар в виртуальные PK-продукты и синкать (BR 3.4)
catalog.product.deletedcatalog.product.deletedpaykeeper-adapter-catalogУдалить все PK-продукты этого ERP-товара (BR 3.4)
catalog.modifier_group.upsertedcatalog.modifier_group.upsertedpaykeeper-adapter-catalogПереразвернуть все товары, использующие эту группу (BR 3.4)
catalog.modifier_group.deletedcatalog.modifier_group.deletedpaykeeper-adapter-catalogПереразвернуть затронутые товары (у них станет меньше variants) (BR 3.4)

Payload order-событий публикует Order Service — см. Order Service · Events.

Payload catalog-событий публикует Catalog Service — см. Catalog Service · Events.

Обработка order.payment_requested

  1. Найти account_id по store_id → paykeeper_terminals.account_id.
  2. Если не найдено — запись в webhook_log с ошибкой, publish paykeeper.refund.failed для видимости (или отдельный paykeeper.invoice.failed — TBD).
  3. Упаковать orderid_bridge = base64url(store_id_short:order_number).
  4. Вставить запись в pk_outbox с op_type=create_invoice.
  5. Worker вытаскивает → вызов PK → сохранение paykeeper_invoices → publish paykeeper.invoice.created.

Обработка order.refund_requested

  1. Найти pk_payment_id через свои paykeeper_payments по order_id.
  2. Вставить запись в pk_outbox с op_type=reverse_payment, payload {id, amount, partial, refund_cart}.
  3. Worker вытаскивает → вызов PK → сохранение paykeeper_refunds status=started.
  4. При result=fail от PK — publish paykeeper.refund.failed.
  5. Успех фиксируется позже через refund webhook — publish paykeeper.payment.refunded.

Обработка catalog.* событий (BR 3.4)

Единый воркер CatalogEventConsumer с consumer_group paykeeper-adapter-catalog, подписанный на 4 catalog-топика.

Общие шаги (для всех 4 событий)

  1. Извлечь franchise_id из payload.
  2. Резолвить список активных PK-аккаунтов: paykeeper_accounts WHERE franchise_id = X AND status = 'active'. Резолв legal_entity_id → franchise_id через User Service GET /internal/legal-entities/{id}, кэш в Redis с TTL 5 мин.
  3. Собрать список затронутых erp_product_id:
    • catalog.product.* → один id из payload
    • catalog.modifier_group.*referenced_by_product_ids из payload (Catalog Service отдаёт готовый список)
  4. Для каждого account_id × каждого erp_product_id — запросить у Catalog Service GET /internal/catalog/products/{id}/expand → получить актуальный набор виртуальных продуктов.
  5. Диффить против paykeeper_products WHERE account_id = X AND erp_product_id = Y:
    • Новые варианты (нет в mapping’е) → enqueue upsert_product в pk_outbox
    • Исчезнувшие варианты (есть в mapping, нет в expand-ответе) → enqueue delete_product
    • Изменившиеся (разный hash) → enqueue upsert_product
    • Совпадающие (тот же hash) → skip, обновляем только last_synced_at
  6. payload_json outbox-записи содержит: account_id, event_id (для дедупа), полный payload виртуального продукта готовый к отправке в ims-api (name, price, tax, sku, variant_kind, erp-ids для reverse lookup).
  7. Worker (PkOutboxWorker) обрабатывает: вызов ims-api (POST /products / PATCH /product/{pk_id} / DELETE /product/{pk_id}) → upsert paykeeper_products с новым hash.

При ошибке в worker’е — стандартный outbox backoff (10s / 30s / 2m / 10m / 1h / 6h / 24h), после 10 попыток → dead_letter + алерт.

Дедуп — worker проверяет event_id в in-memory LRU (окно 24 ч) против повторной доставки Kafka.

Особенности по типам событий

  • catalog.product.deleted — expand-запрос не нужен. Сразу enqueue delete_product для ВСЕХ paykeeper_products WHERE account_id=X AND erp_product_id=Y AND status=active.
  • catalog.modifier_group.upserted / .deleted — для каждого referenced_by_product_ids прогоняем expand (у товара поменялся набор variants/addons) → diff → соответствующие upsert/delete. Это значит одно событие группы может породить десятки outbox-записей (по одной на каждый affected product × каждый PK-account × каждый variant).

Обработка full re-sync (op_type=sync_catalog_snapshot)

Инициируется тремя триггерами:

  • Ночной cron @Scheduled(cron = "0 0 3 * * ?") — для каждого active account.
  • Manual через admin endpoint POST /internal/paykeeper/accounts/{id}/resync-catalog.
  • Recovery: если обнаружены paykeeper_products с last_synced_at < now - 7 days (cron-проверка раз в час).

Flow:

  1. Создать pk_catalog_sync_runs со status=running и соответствующим trigger.
  2. Запрос в Catalog Service: GET /internal/catalog/full-snapshot?franchise_id=X через legal_entity → franchise_id.
  3. Для каждого товара в snapshot’е — локально развернуть его в виртуальные продукты (точно так же как делал бы GET /.../expand, логика переиспользуется) — получаем ожидаемый набор записей.
  4. Для каждого PK-аккаунта сравнить с paykeeper_products:
    • Ожидаемый, отсутствует в mapping → upsert_product
    • Есть в mapping, отсутствует в ожидаемом → delete_product (был soft-delete или убран модификатор)
    • Есть в обоих, разный hash → upsert_product
  5. По завершении batch’а — обновить pk_catalog_sync_runs счётчиками и статусом.
  6. Если errors_count > 0status=partial, в errors_json — список {erp_product_id, variant_kind, variant_sku, message}.

Сценарий полного цикла

sequenceDiagram
  participant OS as Order Service
  participant KF as Kafka
  participant AD as Paykeeper Adapter
  participant PK as PayKeeper

  OS->>KF: order.payment_requested
  KF->>AD: consume
  AD->>AD: pk_outbox insert
  AD->>PK: POST /change/invoice/preview/
  PK-->>AD: invoice_id + url
  AD->>KF: paykeeper.invoice.created
  KF->>OS: consume, save invoice_url

  Note over PK: клиент оплачивает

  PK->>AD: POST /pk-webhooks/informer/{account_id} (MD5)
  AD->>AD: validate signature, dedup
  AD-->>PK: "OK <md5(id+secret)>"
  AD->>KF: paykeeper.payment.received
  KF->>OS: consume → orders.status=paid

  Note over AD: async fiscal fetch

  AD->>PK: GET /info/receipts/bypaymentid/
  PK-->>AD: receipt with FN/FD/FP
  AD->>KF: paykeeper.receipt.fiscalized
  KF->>OS: consume → order_payments.fiscal_data