1С KAFKA. НЕ ПИШИТЕ СВОЙ REST СЕРВИС ДЛЯ КАФКИ

1С KAFKA. НЕ ПИШИТЕ СВОЙ REST СЕРВИС ДЛЯ КАФКИ НА ПИТОНЕ.

В телеграмм чатах, на профильных конференциях я часто сталкиваюсь с кейсом когда 1С программисты в проектах внедрения «Kafka для 1С» принимают неоптимальное решение. Обычно оно звучит так – «МЫ написали свой REST сервис». Часто в качестве платформы для написания выбирается Django. Кстати – такую ошибку делают не только 1С-ники, но и соседи (java, Python, .NET, etc разработчики).

Почему данное решение НЕ оптимально? Помимо того, что Django для написания простого Web сервиса – это примерно, как забивать гвозди промышленным плиткорезом, что конечно можно, но не профильно. Так вот, помимо этого, если Вам прилетела задача интегрироваться с Apache Kafka из 1С, а денег на покупку штатной librdkafka не выделили, есть специализированный сервис от компании Confluent – Kafka Rest Proxy.

На нашем вебинаре мы же разбирали этот момент, однако в рамках данной статьи давайте попробуем разобраться как использовать штатный REST Proxy для Apache Kafka из 1С.

 

Маленькое дополнение – все примеры кода будут даны для использования с подсистемой Connector от Владимира Бондаревского. Итак, начнем со ссылок

  • Репозиторий сервиса https://github.com/confluentinc/kafka-rest
  • Репозиторий подсистемы 1С – https://github.com/vbondarevsky/Connector
  • Докер образ REST сервиса для быстрой локальной отладки алгоритмов https://github.com/confluentinc/cp-all-in-one/tree/6.0.1-post/cp-all-in-one-community

Опять же напомню глоссарий – когда мы работаем с Apache Kafka мы должны помнить о семантике:

  • Мы отправляем сообщения в потоковом режиме в виде записей разделов, которые сгруппированы в тему.
  • После чего мы получаем записи из разделов темы по подписке – а в рамках подписки мы или сервер фиксируем/фиксирует смещение относительно начала журнала записей которую получил клиент-подписчик.

Указанный выше глоссарий нам понадобится, поэтому стоит перечитать его дважды – данная семантика нас будет преследовать почти на каждом шагу использования платформы Apache Kafka.

Перед стартом не забудьте выполнить команды подготовки кодирования

mkdir fund-kafka-experiments 

cd fund-kafka-experiments 

git clone https://github.com/confluentinc/cp-all-in-one

Для отладки нам понадобится запустить комплект платформы Kafka

cd cp-all-in-one/cp-all-in-one-communitydocker-compose up -d

Здесь я предполагаю, что у вас установлен Docker Engine – https://docs.docker.com/engine/install/

Обратите внимание – подключаться мы будем по порту 8082 на локальной машине… Для отладки работы нам этого будет достаточно.

1с kafka. Самый простой отправитель

Итак, я предполагаю, что вы скачали подсистему для вызова HTTP сервисов (на сегодня актуальный релиз 2.1.3) и встроили в свою Конфигурацию получив общий модуль для вызова HTTP сервисов.

Как же в этом случае выглядит простой отправитель событий? Необходимо выполнить следующее:

Инициализировать массив записей

Инициализировать само событие

Добавить ЗАПИСЬ в массив ЗАПИСЕЙ

Преобразовать наши объекты в данные – в моем случае я использую JSON

Итак, данные готовы – остается только отправить, для этого мы указываем заголовок – чтобы REST Proxy Kafka понял какой формат данных мы стараемся передать.

И передаем:

Соответственно – мы вольны сами выбирать имя темы, объем передаваемых событий за один раз (размерность массива) и формат – JSON, XML, BASE64 и т.д.

Отдельно стоит отметить, что в возвращаемом значении хранится либо исключение/ошибка – либо смещение в потоке которое было назначено нашим записям, которые мы отправили. Собственно, это массив смещений.

И вот у вас теперь есть адаптер для 1С к серверу Кафка – без регистрации и смс. Фактически ваша задача превращается в творческий копипаст кода отсюда https://docs.confluent.io/platform/current/kafka-rest/quickstart.html

1с kafka. Группа получателей, получатель с автоматической фиксацией транзакцией

C получателем в экосистеме Kafka всегда сложней, здесь нам понадобится понимание того что Kafka это не RabbitMQ 😉 – имеется ввиду что у нас всегда есть не подписчик, а группа подписчиков. Предполагается, что «забиратели данных» будут делать это в многопоточном режиме. Поэтому поведение чуть сложней:

  • Для начала нам понадобится функция, которая формирует правильные данные запроса

  • Состав функции простой: мы явно указываем наш уникальный идентификатор клиента (потока/фонового задания), формат сообщения в котором мы его хотим принять и ключевое слово «самые ранние еще нами не полученные сообщения» 

  • Теперь полученную структуру вместе с уже привычными заголовками мы отправляем в REST Proxy

Дальше есть особенность – уже связанная с отладочным запуском – дело в том что в объекте ВыданнаяПодписка нам выдан адрес внутри Docker DNS – и извне он по умолчанию не доступен, поэтому придется подменить в отладочный целях базовый URL

Итак – мы получили выданную подписку – то есть адрес ОТКУДА мы будем получать в нашем потоке сообщения из потока. Теперь надо указать на какой комплект ТЕМ мы подписываемся…

Внимательный читатель уже понял, что этот вызов также проходит через функции отправки управляющих параметров. А именно КаналПодписки() 

  • Функция простая до невозможности – она формирует «список ТЕМ» на которые мы хотим подписаться.

Финальное мы должны начать получать записи  – здесь важно помнить про 3 ключевых момента:

  • Стоит явно различать структуры «управление подпиской» и структуру управления «сериализацией» – те самые заголовки вызова
    • ЗаголовкиСериализацииЗаписей.Вставить(“Accept”, “application/vnd.kafka.json.v2+json”);
    • ЗаголовкиПодписки.Вставить(“Content-Type”, “application/vnd.kafka.json.v2+json”);
  • По умолчанию вы получите ВСЕ записи с предыдущего подключения, поэтому вызывать получателя нужно как можно чаще. Это вам не librdkafka и NativeAPi компонента.
  • Чтобы вы не делали – нужно явно удалить поток из группы подписчиков – чтобы не было проблем с перебалансировкой – поэтому и используется попытка.

И в итоге у вас теперь есть и подписчик и получатель. Как вы видите из примера кода – реальное сообщение хранится в значении VALUE – советую посмотреть на остальные поля: это метаданные, которые вам назначил потоковый сервер Kafka. Лучше всего это делать при помощи комбинации клавиш Shift+ F9 (имеется ввиду отладчик, точка остановки и просмотр  значения) 

В порядке завершения

Я сознательно сделал скриншоты кода – а не написал код для удобного копипаста. Я искренне надеюсь что вы сами как и я просто напишите этот код руками и проверите – насколько удобно и быстро можно вызывать любой REST сервис… Плюс – исходный код получают покупатели нашего вебинара – https://nizamov.school/courses/integration1s/1s-kafka/ , где мы намного больше рассмотрели кейсов и подходов к Apache Kafka

Ну а данная статья написана в качестве ответа тем, кто потратил время и деньги на написание своего велосипеда, что применительно к платформе Apache Kafka совершенно не нужно – так как основные затраты там на стороне поиска бизнес-кейсов, где бы она была применима. Писать что-то низкоуровневое для прикладного программиста – это совсем непрофильно. Тем более использование, как Вы видите, достаточно простое.