1С KAFKA. НЕ ПИШИТЕ СВОЙ REST СЕРВИС ДЛЯ КАФКИ НА ПИТОНЕ.
- Опубликовано Илья Низамов
- Разделы Blog
- Дата 07.01.2021
- Комментарии 4 комментария

В телеграмм чатах, на профильных конференциях я часто сталкиваюсь с кейсом когда 1С программисты в проектах внедрения «Kafka для 1С» принимают неоптимальное решение. Обычно оно звучит так – «МЫ написали свой REST сервис». Часто в качестве платформы для написания выбирается Django. Кстати – такую ошибку делают не только 1С-ники, но и соседи (java, Python, .NET, etc разработчики).
Почему данное решение НЕ оптимально? Помимо того, что Django для написания простого Web сервиса – это примерно, как забивать гвозди промышленным плиткорезом, что конечно можно, но не профильно. Так вот, помимо этого, если Вам прилетела задача интегрироваться с Apache Kafka из 1С, а денег на покупку штатной librdkafka не выделили, есть специализированный сервис от компании Confluent – Kafka Rest Proxy.
На нашем вебинаре мы же разбирали этот момент, однако в рамках данной статьи давайте попробуем разобраться как использовать штатный REST Proxy для Apache Kafka из 1С.
Запишись на тренинг
Маленькое дополнение – все примеры кода будут даны для использования с подсистемой Connector от Владимира Бондаревского. Итак, начнем со ссылок
- Репозиторий сервиса https://github.com/confluentinc/kafka-rest
- Репозиторий подсистемы 1С – https://github.com/vbondarevsky/Connector
- Докер образ REST сервиса для быстрой локальной отладки алгоритмов https://github.com/confluentinc/cp-all-in-one/tree/6.0.1-post/cp-all-in-one-community
Опять же напомню глоссарий – когда мы работаем с Apache Kafka мы должны помнить о семантике:
- Мы отправляем сообщения в потоковом режиме в виде записей разделов, которые сгруппированы в тему.
- После чего мы получаем записи из разделов темы по подписке – а в рамках подписки мы или сервер фиксируем/фиксирует смещение относительно начала журнала записей которую получил клиент-подписчик.
Указанный выше глоссарий нам понадобится, поэтому стоит перечитать его дважды – данная семантика нас будет преследовать почти на каждом шагу использования платформы Apache Kafka.
Перед стартом не забудьте выполнить команды подготовки кодирования
mkdir fund-kafka-experiments
cd fund-kafka-experiments
git clone https://github.com/confluentinc/cp-all-in-one
Для отладки нам понадобится запустить комплект платформы Kafka
cd cp-all-in-one/cp-all-in-one-communitydocker-compose up -d
Здесь я предполагаю, что у вас установлен Docker Engine – https://docs.docker.com/engine/install/
Обратите внимание – подключаться мы будем по порту 8082 на локальной машине… Для отладки работы нам этого будет достаточно.
1с kafka. Самый простой отправитель
Итак, я предполагаю, что вы скачали подсистему для вызова HTTP сервисов (на сегодня актуальный релиз 2.1.3) и встроили в свою Конфигурацию получив общий модуль для вызова HTTP сервисов.
Как же в этом случае выглядит простой отправитель событий? Необходимо выполнить следующее:
Инициализировать массив записей
Инициализировать само событие
Добавить ЗАПИСЬ в массив ЗАПИСЕЙ
Преобразовать наши объекты в данные – в моем случае я использую JSON
Итак, данные готовы – остается только отправить, для этого мы указываем заголовок – чтобы REST Proxy Kafka понял какой формат данных мы стараемся передать.
И передаем:
Соответственно – мы вольны сами выбирать имя темы, объем передаваемых событий за один раз (размерность массива) и формат – JSON, XML, BASE64 и т.д.
Отдельно стоит отметить, что в возвращаемом значении хранится либо исключение/ошибка – либо смещение в потоке которое было назначено нашим записям, которые мы отправили. Собственно, это массив смещений.
И вот у вас теперь есть адаптер для 1С к серверу Кафка – без регистрации и смс. Фактически ваша задача превращается в творческий копипаст кода отсюда https://docs.confluent.io/platform/current/kafka-rest/quickstart.html
1с kafka. Группа получателей, получатель с автоматической фиксацией транзакцией
C получателем в экосистеме Kafka всегда сложней, здесь нам понадобится понимание того что Kafka это не RabbitMQ 😉 – имеется ввиду что у нас всегда есть не подписчик, а группа подписчиков. Предполагается, что «забиратели данных» будут делать это в многопоточном режиме. Поэтому поведение чуть сложней:
- Для начала нам понадобится функция, которая формирует правильные данные запроса
- Состав функции простой: мы явно указываем наш уникальный идентификатор клиента (потока/фонового задания), формат сообщения в котором мы его хотим принять и ключевое слово «самые ранние еще нами не полученные сообщения»
- Теперь полученную структуру вместе с уже привычными заголовками мы отправляем в REST Proxy
Дальше есть особенность – уже связанная с отладочным запуском – дело в том что в объекте ВыданнаяПодписка нам выдан адрес внутри Docker DNS – и извне он по умолчанию не доступен, поэтому придется подменить в отладочный целях базовый URL
Итак – мы получили выданную подписку – то есть адрес ОТКУДА мы будем получать в нашем потоке сообщения из потока. Теперь надо указать на какой комплект ТЕМ мы подписываемся…
Внимательный читатель уже понял, что этот вызов также проходит через функции отправки управляющих параметров. А именно КаналПодписки()
- Функция простая до невозможности – она формирует «список ТЕМ» на которые мы хотим подписаться.
Финальное мы должны начать получать записи – здесь важно помнить про 3 ключевых момента:
- Стоит явно различать структуры «управление подпиской» и структуру управления «сериализацией» – те самые заголовки вызова
- ЗаголовкиСериализацииЗаписей.Вставить(“Accept”, “application/vnd.kafka.json.v2+json”);
- ЗаголовкиПодписки.Вставить(“Content-Type”, “application/vnd.kafka.json.v2+json”);
- По умолчанию вы получите ВСЕ записи с предыдущего подключения, поэтому вызывать получателя нужно как можно чаще. Это вам не librdkafka и NativeAPi компонента.
- Чтобы вы не делали – нужно явно удалить поток из группы подписчиков – чтобы не было проблем с перебалансировкой – поэтому и используется попытка.
И в итоге у вас теперь есть и подписчик и получатель. Как вы видите из примера кода – реальное сообщение хранится в значении VALUE – советую посмотреть на остальные поля: это метаданные, которые вам назначил потоковый сервер Kafka. Лучше всего это делать при помощи комбинации клавиш Shift+F9 (имеется ввиду отладчик, точка остановки и просмотр значения)
В порядке завершения
Я сознательно сделал скриншоты кода – а не написал код для удобного копипаста. Я искренне надеюсь что вы сами как и я просто напишите этот код руками и проверите – насколько удобно и быстро можно вызывать любой REST сервис… Плюс – исходный код получают покупатели нашего вебинара – https://nizamov.school/courses/integration1s/1s-kafka/ , где мы намного больше рассмотрели кейсов и подходов к Apache Kafka
Ну а данная статья написана в качестве ответа тем, кто потратил время и деньги на написание своего велосипеда, что применительно к платформе Apache Kafka совершенно не нужно – так как основные затраты там на стороне поиска бизнес-кейсов, где бы она была применима. Писать что-то низкоуровневое для прикладного программиста – это совсем непрофильно. Тем более использование, как Вы видите, достаточно простое.
1с kafka. Первый блок из платного курса.
Вам также может понравиться

GIGACHAT ИЛИ CHATGPT ИИ МЕНЕДЖЕР ДЛЯ 1С

ИИ МЕНЕДЖЕР НА БАЗЕ GIGACHAT

4 Комментария
В новой версии коннектораHttp можно заранее не создавать строку json, а передавать структуру в параметрах и сам коннектор ее превратит в json.
примерно так:
ДополнительныеПараметры = Новый Структура;
ДополнительныеПараметры.Вставить(“Заголовки”, Заголовки);
ДополнительныеПараметры.Вставить(“Json”, Структура);
ДополнительныеПараметры.Вставить(“Таймаут”, 10);
Ответ = КоннекторHTTP.Post(Адрес,,ДополнительныеПараметры);
сделано избыточно чтобы код читался и не казался магией (во первых), а во вторых чтобы люди хотя бы встроили Коннектор в Конфу – народ в массе своей почему-то опасается.
а так да – в документации так и написано что можно сразу объект передавать.
И еще одно дополнение по поводу “невозможности” установить параметры с точкой для отправки в качестве параметров функций (“auto.offset.reset”). Надо просто создавать не Структуру параметров, а Соответствие. Там с точкой имена свойств разрешены, а Коннектор её примет как родную и нормально передаст.