Paykeeper Adapter — Kafka события
Топики — сводка
| Topic | Role | Retention |
|---|---|---|
paykeeper.invoice.created | Producer | 7 дней |
paykeeper.payment.received | Producer | 30 дней |
paykeeper.payment.refunded | Producer | 30 дней |
paykeeper.receipt.fiscalized | Producer | 30 дней |
paykeeper.receipt.failed | Producer | 30 дней |
paykeeper.refund.failed | Producer | 30 дней |
paykeeper.account.provisioned | Producer | 7 дней |
order.payment_requested | Consumer | (от Order Service) |
order.refund_requested | Consumer | (от Order Service) |
catalog.product.upserted | Consumer | (от Catalog Service, BR 3.4) |
catalog.product.deleted | Consumer | (от Catalog Service, BR 3.4) |
catalog.modifier_group.upserted | Consumer | (от Catalog Service, BR 3.4) |
catalog.modifier_group.deleted | Consumer | (от Catalog Service, BR 3.4) |
Публикуем
paykeeper.invoice.created
Публикуется после успешного POST /change/invoice/preview/ в PK.
Topic: paykeeper.invoice.created
{
"event_id": "uuid",
"timestamp": "datetime",
"version": 1,
"payload": {
"order_id": "uuid",
"account_id": "uuid",
"pk_invoice_id": "string",
"pk_invoice_url": "string",
"pay_amount": "decimal",
"expiry_at": "datetime | null"
}
}Консьюмеры:
- Order Service — сохраняет
orders.pk_invoice_id,orders.pk_invoice_url, отдаёт URL клиенту / POS BFF.
paykeeper.payment.received
Публикуется при получении informer success от PK или при догрузке через reconciliation.
Topic: paykeeper.payment.received
{
"event_id": "uuid",
"timestamp": "datetime",
"version": 1,
"payload": {
"order_id": "uuid",
"account_id": "uuid",
"pk_payment_id": "string",
"pk_unique_id": "string | null",
"amount": "decimal",
"payment_method": "cash | card | qr | mixed",
"paid_at": "datetime",
"bank_id": "string | null",
"card_last4": "string | null",
"reconciled": "boolean"
}
}Консьюмеры:
- Order Service —
orders.status=paid,orders.paid_at,orders.payment_method,orders.pk_payment_id, эмититorder.paid. - Warehouse Service — списание остатков если
order.statusужеready(см. BR 2.5 §10.1). - Customer Service — пересчёт dynamic групп если
customer_id != null.
paykeeper.payment.refunded
Публикуется при получении refund webhook от PK.
Topic: paykeeper.payment.refunded
{
"event_id": "uuid",
"timestamp": "datetime",
"version": 1,
"payload": {
"order_id": "uuid",
"account_id": "uuid",
"pk_payment_id": "string",
"pk_refund_id": "string | null",
"amount": "decimal",
"refund_total_so_far": "decimal",
"is_full_refund": "boolean",
"initiated_by": "admin | pos | pk_external",
"datetime": "datetime"
}
}Консьюмеры:
- Order Service — обновляет
RefundRecord.status=done, эмититorder.refundedдля остальных.
paykeeper.receipt.fiscalized
Публикуется когда чек 54-ФЗ достиг status=success (либо догружен через GET /info/receipts/bypaymentid/, либо callback §8.13).
Topic: paykeeper.receipt.fiscalized
{
"event_id": "uuid",
"timestamp": "datetime",
"version": 1,
"payload": {
"order_id": "uuid",
"pk_receipt_id": "string",
"type": "sale | refund | expense | expense-refund",
"is_post_sale": "boolean",
"is_correction": "boolean",
"fpd": "string",
"fnd": "string",
"fn": "string",
"rnkkt": "string",
"shift_number": "integer",
"receipt_number": "integer",
"fop_receipt_key": "string",
"fop_url": "string",
"ts": "string"
}
}Консьюмеры:
- Order Service — записывает
order_payments.fiscal_dataJSONB +order_payments.pk_fop_receipt_keyдля отображения в карточке заказа и отчётах.
paykeeper.receipt.failed
Публикуется при финальной ошибке формирования чека (rejected / failed / timeout).
Topic: paykeeper.receipt.failed
{
"event_id": "uuid",
"timestamp": "datetime",
"version": 1,
"payload": {
"order_id": "uuid",
"pk_receipt_id": "string",
"status": "rejected | failed | timeout",
"error_type": "syntax_error | ffd_error | item_code_error | print_error | queue_error | null",
"error_message": "string | null"
}
}Консьюмеры:
- Admin BFF — уведомление в админку для оператора.
- Order Service — ставит флаг
order_payments.fiscal_failedдля визуальной метки в UI.
paykeeper.refund.failed
Публикуется когда PK отказал в возврате на этапе инициации ({"result":"fail"}).
Topic: paykeeper.refund.failed
{
"event_id": "uuid",
"timestamp": "datetime",
"version": 1,
"payload": {
"order_id": "uuid",
"pk_payment_id": "string",
"amount": "decimal",
"error_message": "string"
}
}Консьюмеры:
- Order Service — ставит
RefundRecord.status=failed+ сохраняетerror_message.
paykeeper.account.provisioned
Публикуется после успешного создания PK-аккаунта (в P3 — после autoinstaller; в P0 — после ручного ввода креденшелов и успешной проверки соединения).
Topic: paykeeper.account.provisioned
{
"event_id": "uuid",
"timestamp": "datetime",
"version": 1,
"payload": {
"account_id": "uuid",
"legal_entity_id": "uuid",
"pk_server_host": "string",
"paykeeper_id": "string | null"
}
}Консьюмеры:
- Admin BFF — обновление UI индикатора в карточке ЮЛ.
Потребляем
| Event | Topic | Consumer group | Зачем |
|---|---|---|---|
order.payment_requested | order.payment_requested | paykeeper-adapter-invoice | Создание инвойса в PK: POST /change/invoice/preview/ (R3 BR 3.3) |
order.refund_requested | order.refund_requested | paykeeper-adapter-refund | Инициация возврата: POST /change/payment/reverse/ (R7 BR 3.3) |
catalog.product.upserted | catalog.product.upserted | paykeeper-adapter-catalog | Развернуть товар в виртуальные PK-продукты и синкать (BR 3.4) |
catalog.product.deleted | catalog.product.deleted | paykeeper-adapter-catalog | Удалить все PK-продукты этого ERP-товара (BR 3.4) |
catalog.modifier_group.upserted | catalog.modifier_group.upserted | paykeeper-adapter-catalog | Переразвернуть все товары, использующие эту группу (BR 3.4) |
catalog.modifier_group.deleted | catalog.modifier_group.deleted | paykeeper-adapter-catalog | Переразвернуть затронутые товары (у них станет меньше variants) (BR 3.4) |
Payload order-событий публикует Order Service — см. Order Service · Events.
Payload catalog-событий публикует Catalog Service — см. Catalog Service · Events.
Обработка order.payment_requested
- Найти
account_idпоstore_id → paykeeper_terminals.account_id. - Если не найдено — запись в
webhook_logс ошибкой, publishpaykeeper.refund.failedдля видимости (или отдельныйpaykeeper.invoice.failed— TBD). - Упаковать
orderid_bridge = base64url(store_id_short:order_number). - Вставить запись в
pk_outboxсop_type=create_invoice. - Worker вытаскивает → вызов PK → сохранение
paykeeper_invoices→ publishpaykeeper.invoice.created.
Обработка order.refund_requested
- Найти
pk_payment_idчерез своиpaykeeper_paymentsпоorder_id. - Вставить запись в
pk_outboxсop_type=reverse_payment, payload{id, amount, partial, refund_cart}. - Worker вытаскивает → вызов PK → сохранение
paykeeper_refundsstatus=started. - При
result=failот PK — publishpaykeeper.refund.failed. - Успех фиксируется позже через refund webhook — publish
paykeeper.payment.refunded.
Обработка catalog.* событий (BR 3.4)
Единый воркер CatalogEventConsumer с consumer_group paykeeper-adapter-catalog, подписанный на 4 catalog-топика.
Общие шаги (для всех 4 событий)
- Извлечь
franchise_idизpayload. - Резолвить список активных PK-аккаунтов:
paykeeper_accounts WHERE franchise_id = X AND status = 'active'. Резолвlegal_entity_id → franchise_idчерез User ServiceGET /internal/legal-entities/{id}, кэш в Redis с TTL 5 мин. - Собрать список затронутых
erp_product_id:catalog.product.*→ один id из payloadcatalog.modifier_group.*→referenced_by_product_idsиз payload (Catalog Service отдаёт готовый список)
- Для каждого
account_id× каждогоerp_product_id— запросить у Catalog ServiceGET /internal/catalog/products/{id}/expand→ получить актуальный набор виртуальных продуктов. - Диффить против
paykeeper_products WHERE account_id = X AND erp_product_id = Y:- Новые варианты (нет в mapping’е) → enqueue
upsert_productвpk_outbox - Исчезнувшие варианты (есть в mapping, нет в expand-ответе) → enqueue
delete_product - Изменившиеся (разный
hash) → enqueueupsert_product - Совпадающие (тот же
hash) → skip, обновляем толькоlast_synced_at
- Новые варианты (нет в mapping’е) → enqueue
payload_jsonoutbox-записи содержит:account_id,event_id(для дедупа), полный payload виртуального продукта готовый к отправке в ims-api (name, price, tax, sku, variant_kind, erp-ids для reverse lookup).- Worker (
PkOutboxWorker) обрабатывает: вызов ims-api (POST /products/PATCH /product/{pk_id}/DELETE /product/{pk_id}) → upsertpaykeeper_productsс новымhash.
При ошибке в worker’е — стандартный outbox backoff (10s / 30s / 2m / 10m / 1h / 6h / 24h), после 10 попыток → dead_letter + алерт.
Дедуп — worker проверяет event_id в in-memory LRU (окно 24 ч) против повторной доставки Kafka.
Особенности по типам событий
catalog.product.deleted— expand-запрос не нужен. Сразу enqueuedelete_productдля ВСЕХpaykeeper_products WHERE account_id=X AND erp_product_id=Y AND status=active.catalog.modifier_group.upserted/.deleted— для каждогоreferenced_by_product_idsпрогоняем expand (у товара поменялся набор variants/addons) → diff → соответствующие upsert/delete. Это значит одно событие группы может породить десятки outbox-записей (по одной на каждый affected product × каждый PK-account × каждый variant).
Обработка full re-sync (op_type=sync_catalog_snapshot)
Инициируется тремя триггерами:
- Ночной cron
@Scheduled(cron = "0 0 3 * * ?")— для каждогоactiveaccount. - Manual через admin endpoint
POST /internal/paykeeper/accounts/{id}/resync-catalog. - Recovery: если обнаружены
paykeeper_productsсlast_synced_at < now - 7 days(cron-проверка раз в час).
Flow:
- Создать
pk_catalog_sync_runsсоstatus=runningи соответствующимtrigger. - Запрос в Catalog Service:
GET /internal/catalog/full-snapshot?franchise_id=Xчерезlegal_entity → franchise_id. - Для каждого товара в snapshot’е — локально развернуть его в виртуальные продукты (точно так же как делал бы
GET /.../expand, логика переиспользуется) — получаем ожидаемый набор записей. - Для каждого PK-аккаунта сравнить с
paykeeper_products:- Ожидаемый, отсутствует в mapping →
upsert_product - Есть в mapping, отсутствует в ожидаемом →
delete_product(был soft-delete или убран модификатор) - Есть в обоих, разный hash →
upsert_product
- Ожидаемый, отсутствует в mapping →
- По завершении batch’а — обновить
pk_catalog_sync_runsсчётчиками и статусом. - Если
errors_count > 0→status=partial, вerrors_json— список{erp_product_id, variant_kind, variant_sku, message}.
Сценарий полного цикла
sequenceDiagram participant OS as Order Service participant KF as Kafka participant AD as Paykeeper Adapter participant PK as PayKeeper OS->>KF: order.payment_requested KF->>AD: consume AD->>AD: pk_outbox insert AD->>PK: POST /change/invoice/preview/ PK-->>AD: invoice_id + url AD->>KF: paykeeper.invoice.created KF->>OS: consume, save invoice_url Note over PK: клиент оплачивает PK->>AD: POST /pk-webhooks/informer/{account_id} (MD5) AD->>AD: validate signature, dedup AD-->>PK: "OK <md5(id+secret)>" AD->>KF: paykeeper.payment.received KF->>OS: consume → orders.status=paid Note over AD: async fiscal fetch AD->>PK: GET /info/receipts/bypaymentid/ PK-->>AD: receipt with FN/FD/FP AD->>KF: paykeeper.receipt.fiscalized KF->>OS: consume → order_payments.fiscal_data