Apache Kafka® представляет собой центральную магистраль данных для все большего числа компаний, а богатая экосистема Apache Kafka® Connect предлагает различные плагины, позволяющие легко интегрировать Kafka с огромным количеством технологий.
При самостоятельном хостинге Apache Kafka можно использовать любой плагин коннектора с открытым исходным кодом, подобрав подходящие банки и параметры конфигурации. Ситуация меняется при использовании управляемого сервиса: даже если список поддерживаемых плагинов может быть потрясающим (посмотрите все, что предоставляет Aiven), вы можете не найти тот самый sink-to-obscure-datatech
, который вы искали. Тем не менее, Aiven принимает предложения и постоянно оценивает новые плагины коннекторов для включения в список, поэтому, если вы обнаружите важный промах, не стесняйтесь предлагать!
Еще один вариант — создать самоуправляемый кластер Apache Kafka® Connect и подключить его к Aiven for Apache Kafka. Таким образом, вы можете выбрать любой коннектор с открытым исходным кодом и при этом воспользоваться управляемым сервисом Kafka, который предлагает Aiven. Вы можете ознакомиться с примером этого процесса на нашем портале для разработчиков, а в этой статье блога мы рассмотрим шаги, необходимые для начала использования одного из лучших коннекторов в городе, который, вероятно, используется 90% новичков: коннектор источника Twitter.
- Создание кластера Apache Kafka
- Создание самоуправляемого кластера Apache Kafka Connect
- Добавьте зависимости коннектора источников Twitter
- Определение конфигурационного файла Apache Kafka Connect
- Запустите локальный кластер Connect
- Настройка доступа к Twitter
- Создайте файл конфигурации исходного коннектора
- Запустите коннектор источника Twitter
- Проверьте выходные данные в Apache Kafka
- Управляемые сервисы Apache Kafka и свобода выбора плагинов коннекторов
Создание кластера Apache Kafka
Давайте быстро рассмотрим эту часть, используя Aiven CLI и специальную функцию service create
. Вы можете ознакомиться со всеми доступными параметрами на специальной странице, а также найти список всех дополнительных параметров настройки, которые предлагает Aiven. Для целей данной статьи в блоге мы будем использовать следующие параметры:
avn service create demo-kafka
--service-type kafka
--cloud google-europe-west3
--plan business-4
-c kafka.auto_create_topics_enable=true
-c kafka_rest=true
Приведенная выше команда создает экземпляр Aiven для Apache Kafka с именем demo-kafka
с сочным планом business-4
над регионом google-europe-west3
. Мы также включаем автоматическое создание тем и REST API, которые мы будем использовать в конце для проверки данных, размещенных в теме.
Пока служба запускается, мы уже можем создать Java keystore и truststore, которые будут использоваться для интеграции локального кластера Apache Kafka Connect в службу demo-kafka
.
Мы можем создать оба хранилища с помощью следующей команды Aiven CLI:
avn service user-kafka-java-creds demo-kafka
-d certsfolder
-p STOREPASSWORD123
--username avnadmin
Вышеприведенная команда загрузит необходимые сертификаты в папку certsfolder
и создаст в той же папке keystore файл client.keystore.p12
и truststore client.truststore.jks
, все защищенные (не очень надежным) паролем STOREPASSWORD123
. Если вы хотите установить различные секреты для защиты хранилищ и ключей, вам стоит просмотреть специальный документ на портале для разработчиков.
Создание самоуправляемого кластера Apache Kafka Connect
Теперь пришло время использовать наши навыки работы с оболочкой, единственное условие — наличие установленного JDK. Давайте начнем с получения двоичных файлов Apache Kafka, мы загрузим версию 3.1.0
.
wget https://dlcdn.apache.org/kafka/3.1.0/kafka_2.13-3.1.0.tgz
Затем мы можем распаковать его
tar -xzf kafka_2.13-3.1.0.tgz
В результате будет создана папка kafka_2.13-3.1.0
, содержащая все полезности Apache Kafka.
Добавьте зависимости коннектора источников Twitter
Чтобы начать получать данные twitter, мы можем использовать специальный коннектор с открытым исходным кодом. Мы можем получить соответствующий код с помощью:
wget https://github.com/jcustenborder/kafka-connect-twitter/releases/download/0.2.26/kafka-connect-twitter-0.2.26.tar.gz
Распакуйте tar-файл
mkdir twitter-connector
tar -xvf kafka-connect-twitter-0.2.26.tar.gz -C twitter-connector
Приведенная выше команда распакует tar-файл в папку twitter-connector
, которая содержит подпапку usr/share/kafka-connect/kafka-connect-twitter
, содержащую все файлы, необходимые для загрузки кластера Apache Kafka Connect. Мы можем переместить их в подпапку plugin
в папке kafka_2.13-3.1.0
.
mkdir kafka_2.13-3.1.0/plugins
mv twitter-connector/usr/share/kafka-connect/kafka-connect-twitter kafka_2.13-3.1.0/plugins/lib
Определение конфигурационного файла Apache Kafka Connect
Теперь пришло время определить конфигурационный файл, чтобы локальный кластер Kafka Connect указывал на Aiven для Apache Kafka. Мы можем использовать шаблон портала разработчика для создания файла с именем my-connect-distributed.properties
и подставить его:
avn service get demo-kafka --format '{service_uri}'
Запустите локальный кластер Connect
С конфигурацией и всеми необходимыми файлами на месте мы запускаем локальный кластер Apache Kafka Connect с помощью:
./kafka_2.13-3.1.0/bin/connect-distributed.sh ./my-connect-distributed.properties
Настройка доступа к Twitter
Теперь, когда кластер Apache Kafka Connect запущен, мы можем перейти на портал разработчиков на странице портала разработчиков Twitter, чтобы создать новое приложение, которое предоставит нам учетные данные, необходимые коннектору для начала поиска твитов.
- На главной странице приборной панели мы можем создать новый проект, используя конечные точки v2. Для проекта нам нужно указать:
- название проекта
- сценарий использования из множества вариантов, включая Изучение API, Создание бота или Создание потребительского инструмента. Мы можем выбрать один из доступных вариантов, который соответствует нашей цели
- описание проекта, мы можем дать краткое описание нашей цели
- определить, будем ли мы использовать существующее приложение или создадим новое. Поскольку мы новичок в API Twitter, мы создадим новое приложение.
- В разделе App Setup мы можем определить настройки нашего нового приложения:
- среда приложения, для целей статьи в блоге мы можем выбрать Development
- имя приложения, дав приложению запоминающееся имя, мы сможем отследить, для чего используется приложение, имя вроде
app123
вряд ли будет узнаваемым. Мы можем назвать егоtwitter-kafka-connect-<SUFFIX>
, где<SUFFIX>
должен быть уникальным идентификатором (каждое имя приложения должно быть уникальным). - ключи приложений и токены, мы можем генерировать и извлекать необходимые ключи. Нам нужно скопировать из этого раздела API Key и API Key Secret, которые мы позже будем использовать при настройке коннектора Apache Kafka (мы будем ссылаться на них как
TWITTER_API_KEY
иTWITTER_API_SECRET
).
Когда все настроено, выберите App Settings, и мы должны увидеть экран, похожий на следующий, который сообщает нам, что приложение и проект были успешно созданы.
-
Теперь мы можем перейти на вкладку Keys and tokens, где мы можем сгенерировать (или регенерировать) дополнительные секреты, необходимые для работы коннектора.
-
Сгенерируйте токен доступа и секрет, которые мы будем использовать далее в блоге
- Токен доступа:
TWITTER_ACCESS_TOKEN
. - Токен доступа Secret:
TWITTER_ACCESS_TOKEN_SECRET
.
- Последняя настройка на портале разработчика Twitter — это запрос повышенного доступа для нашего проекта. Поскольку Twitter API v2 был выпущен, доступ по умолчанию (Essential) позволяет взаимодействовать только с v2, а коннектор Apache Kafka, который мы будем использовать, по-прежнему использует v1. Мы можем запросить доступ Elevated, нажав на название основного проекта и на кнопку «Apply for Elevated». Нам нужно будет заполнить некоторую информацию, включая уровень навыков кодирования, описание проекта и согласиться с Соглашением разработчика. Запрос официально проверяется Twitter, и если все идет хорошо, то вскоре мы должны получить готовый elevated-аккаунт, хотя иногда это может занять пару часов.
Создайте файл конфигурации исходного коннектора
Мы можем записать полученные выше секреты twitter в конфигурационный файл с именем twitter-source.json
со следующим содержанием
{
"name":"twitter_connector",
"config":
{
"tasks.max":"1",
"connector.class":"com.github.jcustenborder.kafka.connect.twitter.TwitterSourceConnector",
"process.deletes":"false",
"filter.keywords":"database",
"kafka.status.topic":"twitter-topic",
"twitter.oauth.consumerKey":"TWITTER_API_KEY",
"twitter.oauth.consumerSecret":"TWITTER_API_SECRET",
"twitter.oauth.accessToken":"TWITTER_ACCESS_TOKEN",
"twitter.oauth.accessTokenSecret":"TWITTER_ACCESS_TOKEN_SECRET"
}
}
Ниже перечислены настраиваемые параметры:
- Значения параметров
twitter.oauth
должны быть изменены с помощью секретов twitter, найденных на предыдущем шаге.
Запустите коннектор источника Twitter
Теперь все детали на месте, поэтому пришло время запустить коннектор, используя REST API Apache Kafka:
curl -s -H "Content-Type: application/json" -X POST
-d @twitter-source.json
http://localhost:8083/connectors/
Проверьте выходные данные в Apache Kafka
Приведенная выше команда curl
использует конечную точку connectors
REST, передавая файл конфигурации коннектора twitter-source.json
. Теперь мы должны увидеть твиты, содержащие apachekafka
, поступающие в тему twitter-topic
Apache Kafka. Поскольку мы включили REST API Karapace, мы можем просмотреть данные темы, зайдя в Aiven Console, нажав на имя сервиса demo-kafka
во вкладке Topic.
Помните, что это потоковое решение, поэтому вы увидите данные в теме только тогда, когда будут записаны твиты, содержащие определенное вами ключевое слово. Если вы хотите протестировать коннектор, напишите твит, содержащий ключевое слово!
Большая новость: теперь мы используем плагин Apache Kafka® Connector, не поддерживаемый Aiven, с Aiven for Apache Kafka! На нас ляжет бремя управления кластером Apache Kafka Connect, но это может быть оптимальным вариантом в случае, если наша технология источника/цели не поддерживается ни одним из доступных коннекторов.
Быстрый совет: Если вам нужно изменить конфигурацию коннектора, возможно, вам придется сначала удалить коннектор с помощью команды
curl -s -X DELETE http://localhost:8083/connectors/twitter_connector
А затем отправить обновленный файл конфигурации с помощью команды curl
с опцией PUSH
, указанной ранее.
Управляемые сервисы Apache Kafka и свобода выбора плагинов коннекторов
Управление полными кластерами Apache Kafka может быть утомительной работой, поэтому использование управляемого сервиса, такого как Aiven для Apache Kafka, обычно является разумной идеей. В особых случаях использования вы можете обнаружить, что нужная вам конфигурация или коннектор не поддерживаются Aiven. Но не отчаивайтесь: вы можете легко интегрировать локальный кластер Apache Kafka Connect, решающий вашу конкретную интеграционную задачу, с Aiven for Apache Kafka и получить преимущества как от общего управляемого решения Apache Kafka, так и от широкого выбора коннекторов с открытым исходным кодом.
Ознакомьтесь со следующими ресурсами, чтобы узнать больше:
- Список поддерживаемых плагинов Apache Kafka Connect
- Список параметров расширенной конфигурации Aiven для Apache Kafka
- Документация по исходному коннектору Twitter
- Подробная статья о том, как настроить локальный коннектор JDBC sink