Paykeeper Adapter — BR 3.4

Контракты

  • Data Model — 2 новые таблицы (paykeeper_products расширенная + pk_catalog_sync_runs)
  • Events — 4 consume-топика (catalog.product.* + catalog.modifier_group.*)
  • API — 4 admin-эндпоинта для catalog sync

Что делаем

Миграции Liquibase

  • src/main/resources/db/changelog/005_catalog_sync.xml:
    • Таблица paykeeper_products с полями: erp_product_id, erp_structural_option_id (nullable), erp_free_option_id (nullable), variant_kind (base/structural_variant/free_addon), sku, pk_product_id, hash, status, last_synced_at, last_error.
    • UNIQUE (account_id, erp_product_id, COALESCE(erp_structural_option_id, '00000000-0000-0000-0000-000000000000'), COALESCE(erp_free_option_id, '00000000-0000-0000-0000-000000000000')) — одна запись на комбинацию.
    • UNIQUE (account_id, pk_product_id) — для reverse lookup при обратном webhook’е.
    • Индексы: (account_id, status), (account_id, erp_product_id) (все варианты товара), last_synced_at.
    • Таблица pk_catalog_sync_runs с упрощёнными счётчиками: products_upserted, products_deleted, errors_count, last_error, errors_json (массив {erp_product_id, variant_kind, variant_sku, message}).
    • Комментарий / CHECK для pk_outbox.op_type — расширить двумя значениями: upsert_product, delete_product. (Значений для категорий и групп модификаторов НЕ добавляем — они разворачиваются в product-операции.)
    • Регистрация в db.changelog-master.xml.

Таблицы paykeeper_categories и paykeeper_modifier_groups не создаются — у PK ims-api нет таких сущностей.

Entities + Repositories

  • com.erp.paykeeper.entity.PaykeeperProduct + PaykeeperProductRepository:
    • findByAccountIdAndErpProductIdAndStructOptAndFreeOpt(...) — точное совпадение варианта
    • findAllByAccountIdAndErpProductId(...) — все варианты одного ERP-товара
    • findByAccountIdAndPkProductId(...) — для обратного webhook lookup (готовность к Phase D)
    • findStale(LocalDateTime), findAllByAccountIdAndStatus(...) как обычно
  • PkCatalogSyncRun + repository (findTopByAccountIdOrderByStartedAtDesc, existsByAccountIdAndStatus('running') для блокировки параллельных запусков)

PayKeeper ims-api client

  • com.erp.paykeeper.pk.PayKeeperImsClient — HTTP-клиент к https://ims-api.paykeeper.ru/api/v1 (из env PK_IMS_API_BASE).
    • Auth: отдельный JWT через GET /info/settings/service-token?service={PK_IMS_SERVICE_ID}&user={login} на {tsp}.server.paykeeper.ru — базовая auth по логину/паролю → JWT Bearer. Кэш JWT на 23 ч (отдельно от существующего pk_token_cache, либо расширяем его новым полем).
    • Методы:
      • Map<String,Object> createProduct(PaykeeperAccount, Map<String,Object> productPayload)POST /products с body {"product": {...}}
      • Map<String,Object> updateProduct(PaykeeperAccount, String pkProductId, Map<String,Object> partialUpdate)PATCH /product/{id} с body {"product": {...}}
      • void deleteProduct(PaykeeperAccount, String pkProductId)DELETE /product/{id}
      • Map<String,Object> getProducts(PaykeeperAccount, int startFrom, int limit)GET /products?limit&start_from (для первичной сверки)
    • Retry-логика переиспользует общий паттерн из PayKeeperClient — выносим в AbstractPkHttpClient.
    • Throttle: Thread.sleep(500) μs между write-запросами (из CATALOG_SYNC_THROTTLE_US env), 500 мс между страницами list-запросов.

Catalog event consumer

  • com.erp.paykeeper.kafka.CatalogEventConsumer@KafkaListener(topics = {"catalog.product.upserted", "catalog.product.deleted", "catalog.modifier_group.upserted", "catalog.modifier_group.deleted"}, groupId = "paykeeper-adapter-catalog"):
    • Парсит event_id, payload (Jackson map).
    • Извлекает franchise_id.
    • Собирает список affected_product_ids:
      • catalog.product.* → один id из payload
      • catalog.modifier_group.*referenced_by_product_ids из payload
    • Резолвит активные accounts: paykeeper_accounts WHERE franchise_id=X AND status='active' (через UserServiceClient.getFranchiseIdByLegalEntity + кэш Redis 5 мин).
    • Для product.deleted: для каждого account → enqueue delete_product для всех paykeeper_products WHERE account_id=X AND erp_product_id=Y AND status='active'. Expand-запрос НЕ нужен.
    • Для product.upserted / modifier_group.*: для каждого account × каждого product_id:
      1. catalogServiceClient.expandProduct(productId) → актуальный набор виртуальных продуктов
      2. Diff против paykeeper_products WHERE account_id=X AND erp_product_id=Y:
        • В expand, нет в mapping → upsert_product (create-intent)
        • В mapping, нет в expand → delete_product
        • В обоих, разный hash → upsert_product (update-intent)
      3. Enqueue каждую операцию в pk_outbox
  • Dedup по event_id через Caffeine LRU (ёмкость 10K, TTL 24ч).

Расширение PkOutboxWorker

  • Новые ветки switch (entry.getOpType()):
    • upsert_productexecuteUpsertProduct(entry):
      • Lookup paykeeper_products по (account_id, erp_product_id, struct_opt, free_opt).
      • Если есть — imsClient.updateProduct(account, existing.pk_product_id, payload). Иначе imsClient.createProduct(account, payload) → получить pk_product_id из ответа.
      • Upsert paykeeper_products с новым hash, last_synced_at=now.
    • delete_productexecuteDeleteProduct(entry):
      • Lookup paykeeper_products → если есть, imsClient.deleteProduct(pk_id) + status='deleted'.
      • Если нет (уже удалён) — просто markDone.
    • sync_catalog_snapshotexecuteSnapshotSync(entry) — см. ниже cron job.

CatalogReconcileJob (cron)

  • com.erp.paykeeper.worker.CatalogReconcileJob:
    • @Scheduled(cron = "${paykeeper.catalog-sync-cron:0 0 3 * * ?}")
    • Для каждого active paykeeper_accounts:
      1. Guard: если уже есть pk_catalog_sync_runs.status='running' для этого account — пропустить.
      2. Создать pk_catalog_sync_runs(status=running, trigger=cron, started_at=now).
      3. Резолвить franchise_id через UserServiceClient.
      4. catalogServiceClient.getFullSnapshot(franchise_id) → получить все products + modifier_groups.
      5. Локальное развёртывание: для каждого product локально вычислить виртуальные варианты (логика дублирует GET /expand на стороне adapter’а — используем общий хелпер ProductExpansion). Возможно лучше дернуть expand для каждого продукта; выбор трейдоф — решить на реализации.
      6. Diff против всех paykeeper_products WHERE account_id=X:
        • Ожидаемый, отсутствует в mapping → upsert_product
        • Есть в mapping, отсутствует в ожидаемом → delete_product
        • Совпадает по (product_id, struct_opt, free_opt), разный hashupsert_product
      7. Enqueue всё это в pk_outbox.
      8. После прохождения всего outbox (ждём или проверяем в конце) — обновить pk_catalog_sync_runs счётчиками и status.
  • Recovery-режим: @Scheduled(fixedRate = 3600000) — раз в час ищет paykeeper_products.last_synced_at < now - 7d и запускает full re-sync с trigger='webhook_missed'.

CatalogSyncController + Service

  • com.erp.paykeeper.service.CatalogSyncService:
    • initiateManualResync(accountId) → создаёт run с trigger='manual', запускает ту же логику что cron.
    • getStatus(accountId) → aggregated PkCatalogSyncStatusDto — счётчики pk_products_synced (COUNT active), erp_products_total (из catalog snapshot, кэш 60с), diverged_count (records с устаревшим hash).
    • listRuns(accountId, limit, since) → page of runs.
    • getRunDetail(accountId, runId) → полный run с errors_json.
  • com.erp.paykeeper.controller.CatalogSyncController — 4 эндпоинта по API.md:
    • POST /internal/paykeeper/accounts/{id}/resync-catalog
    • GET /internal/paykeeper/accounts/{id}/catalog-sync-status
    • GET /internal/paykeeper/accounts/{id}/catalog-sync-runs
    • GET /internal/paykeeper/accounts/{id}/catalog-sync-runs/{run_id}
  • Security — @PreAuthorize permissions соответствующие (read/manage).
  • DTO классы: PkCatalogSyncStatusDtopk_products_synced, erp_products_total, diverged_count), PkCatalogSyncRunDto, PkCatalogSyncRunDetailDtoerrors_json).

HTTP-клиент Catalog Service

  • com.erp.paykeeper.client.CatalogServiceClient:
    • FullSnapshot getFullSnapshot(UUID franchiseId)GET /internal/catalog/full-snapshot.
    • ProductExpansion expandProduct(UUID productId)GET /internal/catalog/products/{id}/expand.
  • Использует X-Service-Token (существующий конфиг).
  • Retry 3 раза с backoff при 5xx / timeout.
  • При 429 RATE_LIMITED — ждём Retry-After и повторяем.

Конфигурация (application.yml + env)

  • CATALOG_SERVICE_URL (default http://catalog-service:3004)
  • CATALOG_SYNC_CRON (default 0 0 3 * * ?)
  • PK_IMS_API_BASE (default https://ims-api.paykeeper.ru/api/v1)
  • PK_IMS_SERVICE_ID (default ims-api.paykeeper.ru)
  • CATALOG_SYNC_BATCH_SIZE (default 50)
  • CATALOG_SYNC_THROTTLE_US (default 500)

Retention + алерты

  • Обновить RetentionJob — удалять pk_catalog_sync_runs старше 90 дней.
  • Slack-webhook алерт при pk_catalog_sync_runs.status=failed.
  • Prometheus метрики (через Micrometer):
    • paykeeper_catalog_sync_last_success_ts gauge per account_id
    • paykeeper_catalog_sync_errors_total counter per op_type (только upsert_product / delete_product)
    • paykeeper_catalog_products_diverged gauge per account_id

Тесты

  • Unit CatalogEventConsumerTest — routing на список accounts, expand-flow для product.upserted, delete-flow для product.deleted, re-expand для modifier_group.*
  • Unit ProductExpansionTest (если логика локальная в adapter) — развёртывание для 4 комбинаций (без мод / только free / только struct / оба). Сверка с примерами из бизнес-спеки.
  • Unit CatalogSyncServiceTest — guard против параллельного running, status aggregation.
  • Unit PkOutboxWorkerTest — новые ветки upsert/delete product (mock ims-client).
  • Integration CatalogReconcileJobIT — полный прогон с mock Catalog Service + mock ims-api через WireMock.
  • Integration CatalogSyncControllerIT — 4 эндпоинта, валидация permissions.
  • Integration PayKeeperImsClientIT — JWT через service-token, retry, throttle.

Deploy

  • Обновить envs для paykeeper-adapter:
    • CATALOG_SERVICE_URL, CATALOG_SYNC_CRON, PK_IMS_API_BASE, PK_IMS_SERVICE_ID, CATALOG_SYNC_BATCH_SIZE, CATALOG_SYNC_THROTTLE_US
  • Передеплой paykeeper-adapter через /deploy-all paykeeper-adapter.
  • Sanity: логи показывают «CatalogReconcileJob initialized» при старте, первый manual resync проходит без ошибок на тестовом аккаунте.

Зависимости

  • Catalog Service — должен быть раскатан раньше (event publisher + snapshot + expand endpoints). Без этого consumer не работает.
  • User Service — существующий GET /internal/legal-entities/{id} возвращает franchise_id, изменений не требует.

Ссылки