Paykeeper Adapter — BR 3.4
Контракты
- Data Model — 2 новые таблицы (
paykeeper_productsрасширенная +pk_catalog_sync_runs)- Events — 4 consume-топика (
catalog.product.*+catalog.modifier_group.*)- API — 4 admin-эндпоинта для catalog sync
Что делаем
Миграции Liquibase
-
src/main/resources/db/changelog/005_catalog_sync.xml:- Таблица
paykeeper_productsс полями:erp_product_id,erp_structural_option_id(nullable),erp_free_option_id(nullable),variant_kind(base/structural_variant/free_addon),sku,pk_product_id,hash,status,last_synced_at,last_error. - UNIQUE
(account_id, erp_product_id, COALESCE(erp_structural_option_id, '00000000-0000-0000-0000-000000000000'), COALESCE(erp_free_option_id, '00000000-0000-0000-0000-000000000000'))— одна запись на комбинацию. - UNIQUE
(account_id, pk_product_id)— для reverse lookup при обратном webhook’е. - Индексы:
(account_id, status),(account_id, erp_product_id)(все варианты товара),last_synced_at. - Таблица
pk_catalog_sync_runsс упрощёнными счётчиками:products_upserted,products_deleted,errors_count,last_error,errors_json(массив{erp_product_id, variant_kind, variant_sku, message}). - Комментарий / CHECK для
pk_outbox.op_type— расширить двумя значениями:upsert_product,delete_product. (Значений для категорий и групп модификаторов НЕ добавляем — они разворачиваются в product-операции.) - Регистрация в
db.changelog-master.xml.
- Таблица
Таблицы paykeeper_categories и paykeeper_modifier_groups не создаются — у PK ims-api нет таких сущностей.
Entities + Repositories
-
com.erp.paykeeper.entity.PaykeeperProduct+PaykeeperProductRepository:findByAccountIdAndErpProductIdAndStructOptAndFreeOpt(...)— точное совпадение вариантаfindAllByAccountIdAndErpProductId(...)— все варианты одного ERP-товараfindByAccountIdAndPkProductId(...)— для обратного webhook lookup (готовность к Phase D)findStale(LocalDateTime),findAllByAccountIdAndStatus(...)как обычно
-
PkCatalogSyncRun+ repository (findTopByAccountIdOrderByStartedAtDesc,existsByAccountIdAndStatus('running')для блокировки параллельных запусков)
PayKeeper ims-api client
-
com.erp.paykeeper.pk.PayKeeperImsClient— HTTP-клиент кhttps://ims-api.paykeeper.ru/api/v1(из envPK_IMS_API_BASE).- Auth: отдельный JWT через
GET /info/settings/service-token?service={PK_IMS_SERVICE_ID}&user={login}на{tsp}.server.paykeeper.ru— базовая auth по логину/паролю → JWT Bearer. Кэш JWT на 23 ч (отдельно от существующегоpk_token_cache, либо расширяем его новым полем). - Методы:
Map<String,Object> createProduct(PaykeeperAccount, Map<String,Object> productPayload)→POST /productsс body{"product": {...}}Map<String,Object> updateProduct(PaykeeperAccount, String pkProductId, Map<String,Object> partialUpdate)→PATCH /product/{id}с body{"product": {...}}void deleteProduct(PaykeeperAccount, String pkProductId)→DELETE /product/{id}Map<String,Object> getProducts(PaykeeperAccount, int startFrom, int limit)→GET /products?limit&start_from(для первичной сверки)
- Retry-логика переиспользует общий паттерн из
PayKeeperClient— выносим вAbstractPkHttpClient. - Throttle:
Thread.sleep(500)μs между write-запросами (изCATALOG_SYNC_THROTTLE_USenv), 500 мс между страницами list-запросов.
- Auth: отдельный JWT через
Catalog event consumer
-
com.erp.paykeeper.kafka.CatalogEventConsumer—@KafkaListener(topics = {"catalog.product.upserted", "catalog.product.deleted", "catalog.modifier_group.upserted", "catalog.modifier_group.deleted"}, groupId = "paykeeper-adapter-catalog"):- Парсит
event_id,payload(Jackson map). - Извлекает
franchise_id. - Собирает список
affected_product_ids:catalog.product.*→ один id из payloadcatalog.modifier_group.*→referenced_by_product_idsиз payload
- Резолвит активные accounts:
paykeeper_accounts WHERE franchise_id=X AND status='active'(черезUserServiceClient.getFranchiseIdByLegalEntity+ кэш Redis 5 мин). - Для product.deleted: для каждого account → enqueue
delete_productдля всехpaykeeper_products WHERE account_id=X AND erp_product_id=Y AND status='active'. Expand-запрос НЕ нужен. - Для product.upserted / modifier_group.*: для каждого account × каждого product_id:
catalogServiceClient.expandProduct(productId)→ актуальный набор виртуальных продуктов- Diff против
paykeeper_products WHERE account_id=X AND erp_product_id=Y:- В expand, нет в mapping →
upsert_product(create-intent) - В mapping, нет в expand →
delete_product - В обоих, разный hash →
upsert_product(update-intent)
- В expand, нет в mapping →
- Enqueue каждую операцию в
pk_outbox
- Парсит
- Dedup по
event_idчерез Caffeine LRU (ёмкость 10K, TTL 24ч).
Расширение PkOutboxWorker
- Новые ветки
switch (entry.getOpType()):upsert_product→executeUpsertProduct(entry):- Lookup
paykeeper_productsпо (account_id, erp_product_id, struct_opt, free_opt). - Если есть —
imsClient.updateProduct(account, existing.pk_product_id, payload). ИначеimsClient.createProduct(account, payload)→ получитьpk_product_idиз ответа. - Upsert
paykeeper_productsс новымhash,last_synced_at=now.
- Lookup
delete_product→executeDeleteProduct(entry):- Lookup
paykeeper_products→ если есть,imsClient.deleteProduct(pk_id)+status='deleted'. - Если нет (уже удалён) — просто markDone.
- Lookup
sync_catalog_snapshot→executeSnapshotSync(entry)— см. ниже cron job.
CatalogReconcileJob (cron)
-
com.erp.paykeeper.worker.CatalogReconcileJob:@Scheduled(cron = "${paykeeper.catalog-sync-cron:0 0 3 * * ?}")- Для каждого
active paykeeper_accounts:- Guard: если уже есть
pk_catalog_sync_runs.status='running'для этого account — пропустить. - Создать
pk_catalog_sync_runs(status=running, trigger=cron, started_at=now). - Резолвить
franchise_idчерезUserServiceClient. catalogServiceClient.getFullSnapshot(franchise_id)→ получить все products + modifier_groups.- Локальное развёртывание: для каждого product локально вычислить виртуальные варианты (логика дублирует
GET /expandна стороне adapter’а — используем общий хелперProductExpansion). Возможно лучше дернутьexpandдля каждого продукта; выбор трейдоф — решить на реализации. - Diff против всех
paykeeper_products WHERE account_id=X:- Ожидаемый, отсутствует в mapping →
upsert_product - Есть в mapping, отсутствует в ожидаемом →
delete_product - Совпадает по
(product_id, struct_opt, free_opt), разныйhash→upsert_product
- Ожидаемый, отсутствует в mapping →
- Enqueue всё это в
pk_outbox. - После прохождения всего outbox (ждём или проверяем в конце) — обновить
pk_catalog_sync_runsсчётчиками иstatus.
- Guard: если уже есть
- Recovery-режим:
@Scheduled(fixedRate = 3600000)— раз в час ищетpaykeeper_products.last_synced_at < now - 7dи запускает full re-sync сtrigger='webhook_missed'.
CatalogSyncController + Service
-
com.erp.paykeeper.service.CatalogSyncService:initiateManualResync(accountId)→ создаёт run сtrigger='manual', запускает ту же логику что cron.getStatus(accountId)→ aggregatedPkCatalogSyncStatusDto— счётчикиpk_products_synced(COUNT active),erp_products_total(из catalog snapshot, кэш 60с),diverged_count(records с устаревшим hash).listRuns(accountId, limit, since)→ page of runs.getRunDetail(accountId, runId)→ полный run сerrors_json.
-
com.erp.paykeeper.controller.CatalogSyncController— 4 эндпоинта по API.md:POST /internal/paykeeper/accounts/{id}/resync-catalogGET /internal/paykeeper/accounts/{id}/catalog-sync-statusGET /internal/paykeeper/accounts/{id}/catalog-sync-runsGET /internal/paykeeper/accounts/{id}/catalog-sync-runs/{run_id}
- Security —
@PreAuthorizepermissions соответствующие (read/manage). - DTO классы:
PkCatalogSyncStatusDto(сpk_products_synced,erp_products_total,diverged_count),PkCatalogSyncRunDto,PkCatalogSyncRunDetailDto(сerrors_json).
HTTP-клиент Catalog Service
-
com.erp.paykeeper.client.CatalogServiceClient:FullSnapshot getFullSnapshot(UUID franchiseId)→GET /internal/catalog/full-snapshot.ProductExpansion expandProduct(UUID productId)→GET /internal/catalog/products/{id}/expand.
- Использует
X-Service-Token(существующий конфиг). - Retry 3 раза с backoff при 5xx / timeout.
- При
429 RATE_LIMITED— ждёмRetry-Afterи повторяем.
Конфигурация (application.yml + env)
-
CATALOG_SERVICE_URL(defaulthttp://catalog-service:3004) -
CATALOG_SYNC_CRON(default0 0 3 * * ?) -
PK_IMS_API_BASE(defaulthttps://ims-api.paykeeper.ru/api/v1) -
PK_IMS_SERVICE_ID(defaultims-api.paykeeper.ru) -
CATALOG_SYNC_BATCH_SIZE(default 50) -
CATALOG_SYNC_THROTTLE_US(default 500)
Retention + алерты
- Обновить
RetentionJob— удалятьpk_catalog_sync_runsстарше 90 дней. - Slack-webhook алерт при
pk_catalog_sync_runs.status=failed. - Prometheus метрики (через Micrometer):
paykeeper_catalog_sync_last_success_tsgauge peraccount_idpaykeeper_catalog_sync_errors_totalcounter perop_type(толькоupsert_product/delete_product)paykeeper_catalog_products_divergedgauge peraccount_id
Тесты
- Unit
CatalogEventConsumerTest— routing на список accounts, expand-flow для product.upserted, delete-flow для product.deleted, re-expand для modifier_group.* - Unit
ProductExpansionTest(если логика локальная в adapter) — развёртывание для 4 комбинаций (без мод / только free / только struct / оба). Сверка с примерами из бизнес-спеки. - Unit
CatalogSyncServiceTest— guard против параллельного running, status aggregation. - Unit
PkOutboxWorkerTest— новые ветки upsert/delete product (mock ims-client). - Integration
CatalogReconcileJobIT— полный прогон с mock Catalog Service + mock ims-api через WireMock. - Integration
CatalogSyncControllerIT— 4 эндпоинта, валидация permissions. - Integration
PayKeeperImsClientIT— JWT через service-token, retry, throttle.
Deploy
- Обновить envs для
paykeeper-adapter:CATALOG_SERVICE_URL,CATALOG_SYNC_CRON,PK_IMS_API_BASE,PK_IMS_SERVICE_ID,CATALOG_SYNC_BATCH_SIZE,CATALOG_SYNC_THROTTLE_US
- Передеплой
paykeeper-adapterчерез/deploy-all paykeeper-adapter. - Sanity: логи показывают «CatalogReconcileJob initialized» при старте, первый manual resync проходит без ошибок на тестовом аккаунте.
Зависимости
- Catalog Service — должен быть раскатан раньше (event publisher + snapshot + expand endpoints). Без этого consumer не работает.
- User Service — существующий
GET /internal/legal-entities/{id}возвращаетfranchise_id, изменений не требует.