Начало работы с Apache Kafka с помощью Python

Apache Kafka – это распределенная потоковая система, которая обеспечивает доступ к данным в режиме реального времени. Эта система позволяет нам публиковать и подписываться на потоки данных, хранить их и обрабатывать.

Сообщение

Единица данных в Kafka называется сообщением. Сообщение – это просто массив байтов. Сообщение может иметь дополнительный бит метаданных, который называется ключом.

Для повышения эффективности сообщения записываются в Kafka партиями. Пакет – это просто набор сообщений, все из которых создаются для одной и той же темы и раздела.

Темы

Сообщения в Kafka делятся на темы. Наиболее близким аналогом темы является таблица базы данных или папка в файловой системе. Темы дополнительно разбиваются на несколько разделов. Обратите внимание, что поскольку тема обычно имеет несколько разделов, нет гарантии упорядочивания сообщений по времени во всей теме, только в пределах одного раздела.

Производители и потребители

Производители в Kafka – это те, кто производит и отправляет сообщения в темы. В некоторых случаях производитель направляет сообщения в определенные разделы. Обычно это делается с помощью ключа сообщения и разделителя, который генерирует хэш ключа и сопоставляет его с определенным разделом.

Потребитель подписывается на одну или несколько тем и читает сообщения в том порядке, в котором они были созданы. Потребитель следит за тем, какие сообщения он уже использовал, отслеживая смещение сообщений. Смещение – это еще один бит метаданных, представляющий собой целочисленное значение, которое постоянно увеличивается и которое Kafka добавляет к каждому сообщению по мере его создания. Каждое сообщение в данном разделе имеет уникальное смещение.

Брокеры и кластеры

Одиночный сервер Kafka называется брокером. В зависимости от конкретного оборудования и его характеристик производительности, один брокер может легко обрабатывать тысячи разделов и миллионы сообщений в секунду. Брокеры Kafka разработаны для работы в составе кластера. В кластере брокеров один брокер также будет выполнять функции контроллера кластера.

Сохранение

Ключевой особенностью Apache Kafka является сохранение, то есть долговременное хранение сообщений в течение определенного периода времени. Брокеры Kafka настроены с параметрами хранения тем по умолчанию: сообщения сохраняются в течение определенного периода времени (например, 7 дней) или пока тема не достигнет определенного размера в байтах (например, 1 ГБ). По достижении этих пределов срок хранения сообщений истекает, и они удаляются, так что конфигурация хранения представляет собой минимальный объем данных, доступных в любое время. Отдельные темы также могут быть сконфигурированы с собственными настройками хранения, чтобы сообщения хранились только до тех пор, пока они полезны.

Теперь, когда у нас есть общее представление об Apache Kafka, давайте установим его.

Установка Kafka

Я установлю Apache Kafka на mac с помощью homebrew. Для этого мне просто нужно набрать в терминале:

$ brew install kafka
Войти в полноэкранный режим Выйти из полноэкранного режима

Apache Kafka использует Zookeeper для хранения метаданных о кластере Kafka, а также данных о клиентах-потребителях. Поэтому во время установки будет установлен и Apache Zookeeper. На нашей машине уже должна быть установлена Java.

После установки Kafka мы можем увидеть что-то вроде этого:

Перейдите в эту директорию в отдельных терминальных сессиях, чтобы запустить Zookeeper и Kafka. Это может быть другой путь в зависимости от вашей машины и ОС.

$ cd /usr/local/opt/kafka/bin
Войдите в полноэкранный режим Выйти из полноэкранного режима

Сначала запустим сервер Apache Zookeeper.

$ zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties
Войти в полноэкранный режим Выйти из полноэкранного режима

Теперь в другом терминальном сеансе выполните следующую команду

$ kafka-server-start /usr/local/etc/kafka/server.properties
Войти в полноэкранный режим Выйти из полноэкранного режима

Итак, у нас запущены Apache Zookeeper и Apache Kafka, что нам теперь делать? Давайте создадим тему Kafka.

Создание темы Kafka

Теперь давайте создадим тему под названием: first-topic в новой терминальной сессии.

$ kafka-topics --create --topic first-topic 
--bootstrap-server localhost:9092 
--replication-factor 1 --partitions 1

Created topic first-topic.
Войти в полноэкранный режим Выход из полноэкранного режима

Producer

Создать сообщения для первой темы

$ kafka-console-producer --broker-list localhost:9092 
--topic first-topic

>Sunday 1st May 2022
>Data Engineering    
>
Войти в полноэкранный режим Выйти из полноэкранного режима

Потребитель

Потреблять сообщения из первой темы

$ kafka-console-consumer --bootstrap-server localhost:9092 
--topic first-topic --from-beginning

Sunday 1st May 2022
Data Engineering 
Войти в полноэкранный режим Выйти из полноэкранного режима

Список тем

Вывод списка всех тем Kafka в кластере

$ kafka-topics --list --bootstrap-server localhost:9092
Войти в полноэкранный режим Выход из полноэкранного режима

Удалить тему

Мы можем захотеть удалить определенную тему

$ kafka-topics --bootstrap-server localhost:9092 
--delete --topic first-topic
Войти в полноэкранный режим Выйти из полноэкранного режима

Производитель и потребитель с помощью Python

Давайте создадим производителя и потребителя с помощью Python. Сначала нам нужно создать виртуальную среду.

$ python3 -m venv env
Войти в полноэкранный режим Выйти из полноэкранного режима

или

$ python -m venv env
Войти в полноэкранный режим Выйти из полноэкранного режима

Активируйте виртуальную среду, чтобы установить библиотеки

$ source env/bin/activate
Войти в полноэкранный режим Выйти из полноэкранного режима

Установим Python-клиент для Apache Kafka и библиотеки Request

$ pip install kafka-python
$ pip install requests 
Войти в полноэкранный режим Выйти из полноэкранного режима

Python Producer

Теперь давайте погрузимся в наш producer.py

#!/usr/local/bin/python
import sys
import json
import logging
import requests
from kafka import KafkaProducer
from datetime import datetime, timedelta

logging.basicConfig(stream=sys.stdout,
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")

logger = logging.getLogger(__name__)


def producer():
    producer = KafkaProducer(bootstrap_servers=['localhost:9092'])

    # get data from public API
    date = datetime.today() - timedelta(days=2)
    previous_date = f"{date.year}-{date.month}-{date.day}"
    url = 'https://indicadores.integrasus.saude.ce.gov.br/api/casos-coronavirus?dataInicio='+ previous_date +'&dataFim=' + previous_date
    req = requests.get(url)
    covid_data = req.json()

    for data in covid_data:
        producer.send('covid-topic', json.dumps(data).encode('utf-8'))
        producer.flush()


if __name__ == "__main__":
    producer()
Войти в полноэкранный режим Выход из полноэкранного режима

Python Consumer

Итак, давайте посмотрим на наш consumer.py

#!/usr/local/bin/python
from kafka import KafkaConsumer


if __name__ == "__main__":
    consumer = KafkaConsumer('covid-topic')
    for data in consumer:
        print(data)
Вход в полноэкранный режим Выход из полноэкранного режима

У вас должно быть два терминальных сеанса для запуска producer.py и consumer.py

Заключение

Мы изучили основные понятия Apache Kafka: сообщение/запись, производители, потребители, темы, брокеры и удержание. Событие фиксирует факт того, что “что-то произошло”. Его также называют записью или сообщением. Производители – это клиентские приложения, которые публикуют (записывают) события в Kafka, а потребители – те, которые подписываются на эти события (читают и обрабатывают их). События организуются и долговременно хранятся в темах. В упрощенном виде тема похожа на папку в файловой системе, а события – это файлы в этой папке.

Оцените статью
Procodings.ru
Добавить комментарий