Получение представления потока DynamoDB в режиме, близком к реальному времени, с помощью Python

Потоки DynamoDB помогают реагировать на изменения в таблицах, что обычно используется для создания агрегаций или запуска других рабочих процессов после обновления данных. Получение практически в реальном времени представления об этих потоках также может быть полезно при разработке или отладке приложения Serverless в AWS. Сегодня я поделюсь сценарием на Python, который я создал для подключения к потокам DynamoDB.

Прежде чем мы начнем, я советую вам прочитать мою статью в блоге, которая содержит глубокое погружение в потоки DynamoDB и их реализацию, поскольку сегодня мы будем использовать эти концепции. Вкратце, таблицы DynamoDB состоят из разделов хранения, к которым присоединяются шарды, составляющие поток. Мы можем читать записи из этих разделов и обрабатывать их любым способом.

Наша цель — создать инструмент, который сможет делать именно это и отображать изменения практически в реальном времени. Чтобы создать наш клиент на Python, нам нужно начать с перечисления всех шардов в потоке, что требует рекурсивного вызова API DescribeStream, поскольку в boto3 нет пагинатора для этой операции (пока).

Shard = collections.namedtuple(
    typename="Shard",
    field_names=[
        "stream_arn",
        "shard_id",
        "parent_shard_id",
        "starting_sequence_number",
        "ending_sequence_number"
    ]
)

def list_all_shards(stream_arn: str, **kwargs: dict) -> typing.List[Shard]:

    def _shard_response_to_shard(response: dict) -> Shard:
        return Shard(
            stream_arn=stream_arn,
            shard_id=response.get("ShardId"),
            parent_shard_id=response.get("ParentShardId"),
            starting_sequence_number=response.get("SequenceNumberRange", {}).get("StartingSequenceNumber"),
            ending_sequence_number=response.get("SequenceNumberRange", {}).get("EndingSequenceNumber")
        )

    client = boto3.client("dynamodbstreams")
    pagination_args = {}
    exclusive_start_shard_id = kwargs.get("next_page_identifier", None)
    if exclusive_start_shard_id is not None:
        pagination_args["ExclusiveStartShardId"] = exclusive_start_shard_id

    response = client.describe_stream(
        StreamArn=stream_arn,
        **pagination_args
    )

    list_of_shards = [_shard_response_to_shard(item) for item in response["StreamDescription"]["Shards"]]

    next_page_identifier = response["StreamDescription"].get("LastEvaluatedShardId")
    if next_page_identifier is not None:
        list_of_shards += list_all_shards(
            stream_arn=stream_arn,
            next_page_identifier=next_page_identifier
        )

    return list_of_shards
Вход в полноэкранный режим Выход из полноэкранного режима

Я решил создать небольшой класс под названием Shard, чтобы инкапсулировать концепцию шарда, используя namedtuple из модуля collections. Теперь, когда у нас есть список осколков, нас интересуют только те, которые еще не закрыты, поскольку мы хотим иметь представление о текущих событиях в режиме, близком к реальному времени. Закрытые осколки имеют EndingSequenceNumber, поэтому мы можем отфильтровать их следующим образом.

def is_open_shard(shard: Shard) -> bool:
    return shard.ending_sequence_number is None

def list_open_shards(stream_arn: str) -> typing.List[Shard]:
    all_shards = list_all_shards(
        stream_arn=stream_arn
    )

    open_shards = [shard for shard in all_shards if is_open_shard(shard)]

    return open_shards
Вход в полноэкранный режим Выход из полноэкранного режима

Мы хотим запросить все записи в каждом из этих осколков, для чего создаем итератор осколков и затем используем его для получения записей. API GetRecords также возвращает новый итератор осколков, который мы можем использовать для последующих запросов. Если в ответе нет нового итератора осколков, это означает, что осколок закрыт.

def get_shard_iterator(shard: Shard, iterator_type: str = "LATEST") -> str:
    client = boto3.client("dynamodbstreams")

    response = client.get_shard_iterator(
        StreamArn=shard.stream_arn,
        ShardId=shard.shard_id,
        ShardIteratorType=iterator_type
    )

    return response["ShardIterator"]

def get_next_records(shard_iterator: str) -> typing.Tuple[typing.List[dict], str]:
    client = boto3.client("dynamodbstreams")

    response = client.get_records(
        ShardIterator=shard_iterator
    )

    return response["Records"], response.get("NextShardIterator")
Вход в полноэкранный режим Выход из полноэкранного режима

Чтобы собрать все это вместе, необходимо создать функцию shard_watcher для периодического получения последних записей с определенного шарда. Эта функция получает шард, за который она отвечает, и список функций, которые будут вызываться с каждой полученной записью. Вы можете считать их наблюдателями, а записи — наблюдаемыми, если вы знакомы с шаблоном Observer. Необязательный параметр start_at_oldest определяет, будет ли шард просматриваться с самой старой доступной записи или с самой последней. Мы также немного подождем в цикле перед запросом новых записей. Это делается для того, чтобы не перегружать AWS API.

def shard_watcher(shard: Shard, callables: typing.List[typing.Callable], start_at_oldest = False):

    shard_iterator_type = "TRIM_HORIZON" if start_at_oldest else "LATEST"
    shard_iterator = get_shard_iterator(shard, shard_iterator_type)

    while shard_iterator is not None:
        records, shard_iterator = get_next_records(shard_iterator)

        for record in records:
            for handler in callables:
                handler(record)

        time.sleep(0.5)
Вход в полноэкранный режим Выйти из полноэкранного режима

Это позволяет нам наблюдать за одним шардом, но в действительности поток состоит из множества шардов, и нам нужно наблюдать за всеми ними, чтобы не пропустить изменения. Поэтому я реализовал функцию для управления наблюдателями. Она получает ARN потока и список наблюдателей и использует модуль мультипроцессинга для порождения процесса наблюдателя для каждого шарда, так что они наблюдаются параллельно.

def start_watching(stream_arn: str, callables: typing.List[typing.Callable]) -> None:

    shard_to_watcher: typing.Dict[str, mp.Process] = {}
    initial_loop = True

    while True:

        open_shards = list_open_shards(stream_arn=stream_arn)
        start_at_oldest = True
        if initial_loop:
            start_at_oldest = False
            initial_loop = False

        for shard in open_shards:
            if shard.shard_id not in shard_to_watcher:

                print("Starting watcher for shard:", shard.shard_id)
                args = (shard, callables, start_at_oldest)
                process = mp.Process(target=shard_watcher, args=args)
                shard_to_watcher[shard.shard_id] = process
                process.start()

        time.sleep(10)
Вход в полноэкранный режим Выход из полноэкранного режима

Эта функция периодически перечисляет все шарды в потоке и обеспечивает наличие наблюдателя для каждого шарда. Каждый шард, обнаруженный в первом цикле, будет отслеживаться с самой последней записи на момент запуска функции. В последующих циклах любой вновь обнаруженный осколок будет считываться с самой старой доступной записи. После начала работы мы не хотим пропустить ни одной записи.

Я также реализовал два основных наблюдателя, которые могут обрабатывать изменяющиеся записи. Первая функция печатает сводку об изменении, состоящую из типа операции, временной метки и ключей элемента. Вторая функция является еще более простой и печатает запись.

def print_summary(change_record: dict):

    changed_at:datetime = change_record["dynamodb"]["ApproximateCreationDateTime"]
    event_type:str = change_record["eventName"]

    item_keys:dict = change_record["dynamodb"]["Keys"]
    item_key_list = []
    for key in sorted(item_keys.keys()):
        value = item_keys[key][list(item_keys[key].keys())[0]]
        item_key_list.append(f"{key}={value}")

    output_str = "[{0}] - {1:^6} - {2}".format(changed_at.isoformat(timespec="seconds"), event_type, ", ".join(item_key_list))

    print(output_str)

def print_change_record(change_record: dict):
    print(change_record)
Вход в полноэкранный режим Выход из полноэкранного режима

Я реализовал парсер аргументов, который принимает аргументы командной строки и настраивает все соответствующим образом, чтобы это можно было вызвать извне. Здесь пригодится модуль argparse из стандартной библиотеки.

def main():

    parser = argparse.ArgumentParser(description="See what's going on in DynamoDB Streams in near real-time 🔍")
    parser.add_argument("stream_arn", type=str, help="The ARN of the stream you want to watch.")
    parser.add_argument("--print-record", "-pr", action="store_true", help="Print each change record. If nothing else is selected, this is the default.")
    parser.add_argument("--print-summary", "-ps", action="store_true", help="Print a summary of a change record")
    parsed = parser.parse_args()

    handlers = []
    if parsed.print_record:
        handlers.append(print_change_record)
    if parsed.print_summary:
        handlers.append(print_summary)

    if len(handlers) == 0:
        # When no handlers are set, we default to printing the record
        handlers.append(print_change_record)

    start_watching(parsed.stream_arn, handlers)

if __name__ == "__main__":
    main()
Вход в полноэкранный режим Выход из полноэкранного режима

Благодаря argparse при вызове скрипта мы получаем красивое меню помощи.

$ python dynamodb_streamgazer.py -h
usage: dynamodb_streamgazer.py [-h] [--print-record] [--print-summary] stream_arn

See what's going on in DynamoDB Streams in near real-time 🔍

positional arguments:
  stream_arn            The ARN of the stream you want to watch.

optional arguments:
  -h, --help            show this help message and exit
  --print-record, -pr   Print each change record. If nothing else is selected, this is the default.
  --print-summary, -ps  Print a summary of a change record
Вход в полноэкранный режим Выход из полноэкранного режима

Вот пример вызова скрипта только с опцией сводки. Задержка между изменениями в таблице и появлением результата в консоли незначительна. Также легко реализовать собственные наблюдатели, которые могут выполнять агрегирование или другие действия, подходящие для вашего рабочего процесса.

python dynamodb_streamgazer.py $STREAM_ARN --print-summary
Starting watcher for shard: shardId-00000001653646993166-46aa7561
Starting watcher for shard: shardId-00000001653648537152-e0a56e69
Starting watcher for shard: shardId-00000001653648750475-f3978e9b
Starting watcher for shard: shardId-00000001653657153330-46f0ba41
[2022-05-27T15:35:57+02:00] - INSERT - PK=test, SK=item
[2022-05-27T15:36:13+02:00] - MODIFY - PK=test, SK=item
[2022-05-27T15:36:23+02:00] - REMOVE - PK=test, SK=item
Вход в полноэкранный режим Выход из полноэкранного режима

В этом посте я познакомил вас со скриптом, который позволяет просматривать потоки DynamoDB практически в реальном времени. Код доступен на Github. Надеюсь, вы найдете это полезным, и я с нетерпением жду ваших отзывов и вопросов.


Title Image by Towfiqu barbhuiya on Unsplash

Оцените статью
Procodings.ru
Добавить комментарий