Apache Kafka – это распределенная потоковая система, которая обеспечивает доступ к данным в режиме реального времени. Эта система позволяет нам публиковать и подписываться на потоки данных, хранить их и обрабатывать.
Сообщение
Единица данных в Kafka называется сообщением. Сообщение – это просто массив байтов. Сообщение может иметь дополнительный бит метаданных, который называется ключом.
Для повышения эффективности сообщения записываются в Kafka партиями. Пакет – это просто набор сообщений, все из которых создаются для одной и той же темы и раздела.
Темы
Сообщения в Kafka делятся на темы. Наиболее близким аналогом темы является таблица базы данных или папка в файловой системе. Темы дополнительно разбиваются на несколько разделов. Обратите внимание, что поскольку тема обычно имеет несколько разделов, нет гарантии упорядочивания сообщений по времени во всей теме, только в пределах одного раздела.
Производители и потребители
Производители в Kafka – это те, кто производит и отправляет сообщения в темы. В некоторых случаях производитель направляет сообщения в определенные разделы. Обычно это делается с помощью ключа сообщения и разделителя, который генерирует хэш ключа и сопоставляет его с определенным разделом.
Потребитель подписывается на одну или несколько тем и читает сообщения в том порядке, в котором они были созданы. Потребитель следит за тем, какие сообщения он уже использовал, отслеживая смещение сообщений. Смещение – это еще один бит метаданных, представляющий собой целочисленное значение, которое постоянно увеличивается и которое Kafka добавляет к каждому сообщению по мере его создания. Каждое сообщение в данном разделе имеет уникальное смещение.
Брокеры и кластеры
Одиночный сервер Kafka называется брокером. В зависимости от конкретного оборудования и его характеристик производительности, один брокер может легко обрабатывать тысячи разделов и миллионы сообщений в секунду. Брокеры Kafka разработаны для работы в составе кластера. В кластере брокеров один брокер также будет выполнять функции контроллера кластера.
Сохранение
Ключевой особенностью Apache Kafka является сохранение, то есть долговременное хранение сообщений в течение определенного периода времени. Брокеры Kafka настроены с параметрами хранения тем по умолчанию: сообщения сохраняются в течение определенного периода времени (например, 7 дней) или пока тема не достигнет определенного размера в байтах (например, 1 ГБ). По достижении этих пределов срок хранения сообщений истекает, и они удаляются, так что конфигурация хранения представляет собой минимальный объем данных, доступных в любое время. Отдельные темы также могут быть сконфигурированы с собственными настройками хранения, чтобы сообщения хранились только до тех пор, пока они полезны.
Теперь, когда у нас есть общее представление об Apache Kafka, давайте установим его.
Установка Kafka
Я установлю Apache Kafka на mac с помощью homebrew. Для этого мне просто нужно набрать в терминале:
$ brew install kafka
Apache Kafka использует Zookeeper для хранения метаданных о кластере Kafka, а также данных о клиентах-потребителях. Поэтому во время установки будет установлен и Apache Zookeeper. На нашей машине уже должна быть установлена Java.
После установки Kafka мы можем увидеть что-то вроде этого:
Перейдите в эту директорию в отдельных терминальных сессиях, чтобы запустить Zookeeper и Kafka. Это может быть другой путь в зависимости от вашей машины и ОС.
$ cd /usr/local/opt/kafka/bin
Сначала запустим сервер Apache Zookeeper.
$ zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties
Теперь в другом терминальном сеансе выполните следующую команду
$ kafka-server-start /usr/local/etc/kafka/server.properties
Итак, у нас запущены Apache Zookeeper и Apache Kafka, что нам теперь делать? Давайте создадим тему Kafka.
Создание темы Kafka
Теперь давайте создадим тему под названием: first-topic в новой терминальной сессии.
$ kafka-topics --create --topic first-topic
--bootstrap-server localhost:9092
--replication-factor 1 --partitions 1
Created topic first-topic.
Producer
Создать сообщения для первой темы
$ kafka-console-producer --broker-list localhost:9092
--topic first-topic
>Sunday 1st May 2022
>Data Engineering
>
Потребитель
Потреблять сообщения из первой темы
$ kafka-console-consumer --bootstrap-server localhost:9092
--topic first-topic --from-beginning
Sunday 1st May 2022
Data Engineering
Список тем
Вывод списка всех тем Kafka в кластере
$ kafka-topics --list --bootstrap-server localhost:9092
Удалить тему
Мы можем захотеть удалить определенную тему
$ kafka-topics --bootstrap-server localhost:9092
--delete --topic first-topic
Производитель и потребитель с помощью Python
Давайте создадим производителя и потребителя с помощью Python. Сначала нам нужно создать виртуальную среду.
$ python3 -m venv env
или
$ python -m venv env
Активируйте виртуальную среду, чтобы установить библиотеки
$ source env/bin/activate
Установим Python-клиент для Apache Kafka и библиотеки Request
$ pip install kafka-python
$ pip install requests
Python Producer
Теперь давайте погрузимся в наш producer.py
#!/usr/local/bin/python
import sys
import json
import logging
import requests
from kafka import KafkaProducer
from datetime import datetime, timedelta
logging.basicConfig(stream=sys.stdout,
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)
def producer():
producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
# get data from public API
date = datetime.today() - timedelta(days=2)
previous_date = f"{date.year}-{date.month}-{date.day}"
url = 'https://indicadores.integrasus.saude.ce.gov.br/api/casos-coronavirus?dataInicio='+ previous_date +'&dataFim=' + previous_date
req = requests.get(url)
covid_data = req.json()
for data in covid_data:
producer.send('covid-topic', json.dumps(data).encode('utf-8'))
producer.flush()
if __name__ == "__main__":
producer()
Python Consumer
Итак, давайте посмотрим на наш consumer.py
#!/usr/local/bin/python
from kafka import KafkaConsumer
if __name__ == "__main__":
consumer = KafkaConsumer('covid-topic')
for data in consumer:
print(data)
У вас должно быть два терминальных сеанса для запуска producer.py и consumer.py
Заключение
Мы изучили основные понятия Apache Kafka: сообщение/запись, производители, потребители, темы, брокеры и удержание. Событие фиксирует факт того, что “что-то произошло”. Его также называют записью или сообщением. Производители – это клиентские приложения, которые публикуют (записывают) события в Kafka, а потребители – те, которые подписываются на эти события (читают и обрабатывают их). События организуются и долговременно хранятся в темах. В упрощенном виде тема похожа на папку в файловой системе, а события – это файлы в этой папке.