1С KAFKA. НЕ ПИШИТЕ СВОЙ REST СЕРВИС ДЛЯ КАФКИ НА ПИТОНЕ.
- Опубликовано Илья Низамов
- Разделы Blog
- Дата 07.01.2021
- Комментарии 4 комментария
В телеграмм чатах, на профильных конференциях я часто сталкиваюсь с кейсом когда 1С программисты в проектах внедрения «Kafka для 1С» принимают неоптимальное решение. Обычно оно звучит так – «МЫ написали свой REST сервис». Часто в качестве платформы для написания выбирается Django. Кстати – такую ошибку делают не только 1С-ники, но и соседи (java, Python, .NET, etc разработчики).
Почему данное решение НЕ оптимально? Помимо того, что Django для написания простого Web сервиса – это примерно, как забивать гвозди промышленным плиткорезом, что конечно можно, но не профильно. Так вот, помимо этого, если Вам прилетела задача интегрироваться с Apache Kafka из 1С, а денег на покупку штатной librdkafka не выделили, есть специализированный сервис от компании Confluent – Kafka Rest Proxy.
На нашем вебинаре мы же разбирали этот момент, однако в рамках данной статьи давайте попробуем разобраться как использовать штатный REST Proxy для Apache Kafka из 1С.
Запишись на тренинг
Маленькое дополнение – все примеры кода будут даны для использования с подсистемой Connector от Владимира Бондаревского. Итак, начнем со ссылок
- Репозиторий сервиса https://github.com/confluentinc/kafka-rest
- Репозиторий подсистемы 1С – https://github.com/vbondarevsky/Connector
- Докер образ REST сервиса для быстрой локальной отладки алгоритмов https://github.com/confluentinc/cp-all-in-one/tree/6.0.1-post/cp-all-in-one-community
Опять же напомню глоссарий – когда мы работаем с Apache Kafka мы должны помнить о семантике:
- Мы отправляем сообщения в потоковом режиме в виде записей разделов, которые сгруппированы в тему.
- После чего мы получаем записи из разделов темы по подписке – а в рамках подписки мы или сервер фиксируем/фиксирует смещение относительно начала журнала записей которую получил клиент-подписчик.
Указанный выше глоссарий нам понадобится, поэтому стоит перечитать его дважды – данная семантика нас будет преследовать почти на каждом шагу использования платформы Apache Kafka.
Перед стартом не забудьте выполнить команды подготовки кодирования
mkdir fund-kafka-experiments
cd fund-kafka-experiments
git clone https://github.com/confluentinc/cp-all-in-one
Для отладки нам понадобится запустить комплект платформы Kafka
cd cp-all-in-one/cp-all-in-one-communitydocker-compose up -d
Здесь я предполагаю, что у вас установлен Docker Engine – https://docs.docker.com/engine/install/
Обратите внимание – подключаться мы будем по порту 8082 на локальной машине… Для отладки работы нам этого будет достаточно.
1с kafka. Самый простой отправитель
Итак, я предполагаю, что вы скачали подсистему для вызова HTTP сервисов (на сегодня актуальный релиз 2.1.3) и встроили в свою Конфигурацию получив общий модуль для вызова HTTP сервисов.
Как же в этом случае выглядит простой отправитель событий? Необходимо выполнить следующее:
Инициализировать массив записей
Инициализировать само событие
Добавить ЗАПИСЬ в массив ЗАПИСЕЙ
Преобразовать наши объекты в данные – в моем случае я использую JSON
Итак, данные готовы – остается только отправить, для этого мы указываем заголовок – чтобы REST Proxy Kafka понял какой формат данных мы стараемся передать.
И передаем:
Соответственно – мы вольны сами выбирать имя темы, объем передаваемых событий за один раз (размерность массива) и формат – JSON, XML, BASE64 и т.д.
Отдельно стоит отметить, что в возвращаемом значении хранится либо исключение/ошибка – либо смещение в потоке которое было назначено нашим записям, которые мы отправили. Собственно, это массив смещений.
И вот у вас теперь есть адаптер для 1С к серверу Кафка – без регистрации и смс. Фактически ваша задача превращается в творческий копипаст кода отсюда https://docs.confluent.io/platform/current/kafka-rest/quickstart.html
1с kafka. Группа получателей, получатель с автоматической фиксацией транзакцией
C получателем в экосистеме Kafka всегда сложней, здесь нам понадобится понимание того что Kafka это не RabbitMQ 😉 – имеется ввиду что у нас всегда есть не подписчик, а группа подписчиков. Предполагается, что «забиратели данных» будут делать это в многопоточном режиме. Поэтому поведение чуть сложней:
- Для начала нам понадобится функция, которая формирует правильные данные запроса
- Состав функции простой: мы явно указываем наш уникальный идентификатор клиента (потока/фонового задания), формат сообщения в котором мы его хотим принять и ключевое слово «самые ранние еще нами не полученные сообщения»
- Теперь полученную структуру вместе с уже привычными заголовками мы отправляем в REST Proxy
Дальше есть особенность – уже связанная с отладочным запуском – дело в том что в объекте ВыданнаяПодписка нам выдан адрес внутри Docker DNS – и извне он по умолчанию не доступен, поэтому придется подменить в отладочный целях базовый URL
Итак – мы получили выданную подписку – то есть адрес ОТКУДА мы будем получать в нашем потоке сообщения из потока. Теперь надо указать на какой комплект ТЕМ мы подписываемся…
Внимательный читатель уже понял, что этот вызов также проходит через функции отправки управляющих параметров. А именно КаналПодписки()
- Функция простая до невозможности – она формирует «список ТЕМ» на которые мы хотим подписаться.
Финальное мы должны начать получать записи – здесь важно помнить про 3 ключевых момента:
- Стоит явно различать структуры «управление подпиской» и структуру управления «сериализацией» – те самые заголовки вызова
- ЗаголовкиСериализацииЗаписей.Вставить(“Accept”, “application/vnd.kafka.json.v2+json”);
- ЗаголовкиПодписки.Вставить(“Content-Type”, “application/vnd.kafka.json.v2+json”);
- По умолчанию вы получите ВСЕ записи с предыдущего подключения, поэтому вызывать получателя нужно как можно чаще. Это вам не librdkafka и NativeAPi компонента.
- Чтобы вы не делали – нужно явно удалить поток из группы подписчиков – чтобы не было проблем с перебалансировкой – поэтому и используется попытка.
И в итоге у вас теперь есть и подписчик и получатель. Как вы видите из примера кода – реальное сообщение хранится в значении VALUE – советую посмотреть на остальные поля: это метаданные, которые вам назначил потоковый сервер Kafka. Лучше всего это делать при помощи комбинации клавиш Shift+F9 (имеется ввиду отладчик, точка остановки и просмотр значения)
В порядке завершения
Я сознательно сделал скриншоты кода – а не написал код для удобного копипаста. Я искренне надеюсь что вы сами как и я просто напишите этот код руками и проверите – насколько удобно и быстро можно вызывать любой REST сервис… Плюс – исходный код получают покупатели нашего вебинара – https://nizamov.school/courses/integration1s/1s-kafka/ , где мы намного больше рассмотрели кейсов и подходов к Apache Kafka
Ну а данная статья написана в качестве ответа тем, кто потратил время и деньги на написание своего велосипеда, что применительно к платформе Apache Kafka совершенно не нужно – так как основные затраты там на стороне поиска бизнес-кейсов, где бы она была применима. Писать что-то низкоуровневое для прикладного программиста – это совсем непрофильно. Тем более использование, как Вы видите, достаточно простое.
4 Комментария
В новой версии коннектораHttp можно заранее не создавать строку json, а передавать структуру в параметрах и сам коннектор ее превратит в json.
примерно так:
ДополнительныеПараметры = Новый Структура;
ДополнительныеПараметры.Вставить(“Заголовки”, Заголовки);
ДополнительныеПараметры.Вставить(“Json”, Структура);
ДополнительныеПараметры.Вставить(“Таймаут”, 10);
Ответ = КоннекторHTTP.Post(Адрес,,ДополнительныеПараметры);
сделано избыточно чтобы код читался и не казался магией (во первых), а во вторых чтобы люди хотя бы встроили Коннектор в Конфу – народ в массе своей почему-то опасается.
а так да – в документации так и написано что можно сразу объект передавать.
И еще одно дополнение по поводу “невозможности” установить параметры с точкой для отправки в качестве параметров функций (“auto.offset.reset”). Надо просто создавать не Структуру параметров, а Соответствие. Там с точкой имена свойств разрешены, а Коннектор её примет как родную и нормально передаст.