Ввод интернет-данных в потоковый график Quine

В предыдущей статье этого цикла (Quine Ingest Streams) был представлен входящий поток и базовая структура для его создания. В этой статье я углублюсь, исследуя входящий запрос и его роль в входящем потоке.

Краткий обзор входящих потоков:

  • Поток ввода соединяет Quine с производителями данных.
  • Потоки ввода используют обратное давление, чтобы избежать перегрузки.
  • Данные преобразуются в потоковом графе с помощью входящего запроса.
  • Использование idFrom позволяет нам действовать так, как будто все узлы в графе уже существуют.
  • Ingest-потоки создаются либо с помощью вызовов API, либо с помощью Recipes.

В этой статье мы используем встроенный рецепт wikipedia в качестве отправной точки.

Определение входящего потока

Рецепт ingest страницы wikipedia определяет поток ingest, который получает обновления из потока событий mediawiki.page-create.

Вот копия входящего потока из рецепта.

ingestStreams:
  - type: ServerSentEventsIngest
    url: https://stream.wikimedia.org/v2/stream/page-create
    format:
      type: CypherJson
      query: |-
        MATCH (revNode) WHERE id(revNode) = idFrom("revision", $that.rev_id)
        MATCH (dbNode) WHERE id(dbNode) = idFrom("db", $that.database)
        MATCH (userNode) WHERE id(userNode) = idFrom("id", $that.performer.user_id)
        SET revNode = $that, revNode.type = "rev"
        SET dbNode.database = $that.database, dbNode.type = "db"
        SET userNode = $that.performer, userNode.type = "user"
        WITH *, datetime($that.rev_timestamp) AS d
        CALL create.setLabels(revNode, ["rev:" + $that.page_title])
        CALL create.setLabels(dbNode, ["db:" + $that.database])
        CALL create.setLabels(userNode, ["user:" + $that.performer.user_text])
        CALL reify.time(d, ["year", "month", "day", "hour", "minute"]) YIELD node AS timeNode
        CALL incrementCounter(timeNode, "count")
        CREATE (revNode)-[:at]->(timeNode)
        CREATE (revNode)-[:db]->(dbNode)
        CREATE (revNode)-[:by]->(userNode)
Вход в полноэкранный режим Выход из полноэкранного режима

Этот поток имеет три элемента, type, url и format. Объявление type для входящего потока устанавливает структуру для определения объекта входящего потока. Данный поток является потоком ServerSentEventsIngest.

Просмотр документации по схеме ServerSentEventsIngest из документации API предоставляет нам схему, которой мы должны следовать для определения входящего потока.

При первом открытии определение схемы по умолчанию будет иметь значение File Ingest Stream.

Обязательно нажмите на стрелку вниз 🔽 рядом с File Ingest Stream и выберите Server Sent Events Stream из выпадающего списка, чтобы просмотреть правильную схему.

Вот схема для ServerSentEventsIngest.

Структура потока ServerSentEventsIngest довольно проста.

Данные page-create Википедии

Небольшое замечание: прежде чем начать разбирать запрос ingest, нам нужно понять, с какими данными мы работаем.

Вот образец объекта page-create json для рассмотрения. Посмотрите другие примеры, посетив страницу потоков событий Википедии, выбрав поток mediawiki.page-create и нажав зеленую кнопку «Stream».

{
    "$schema": "/mediawiki/revision/create/1.1.0",
    "meta": {
        "uri": "https://en.wikipedia.org/wiki/Established_population",
        "request_id": "85b7bd4b-23a5-4c20-84a1-d89430c21f6c",
        "id": "8a34f1c0-a276-4a2b-ae2e-305f8822011c",
        "dt": "2022-05-20T16:43:34Z",
        "domain": "en.wikipedia.org",
        "stream": "mediawiki.page-create",
        "topic": "eqiad.mediawiki.page-create",
        "partition": 0,
        "offset": 231788500
    },
    "database": "enwiki",
    "page_id": 70828723,
    "page_title": "Established_population",
    "page_namespace": 0,
    "rev_id": 1088883819,
    "rev_timestamp": "2022-05-20T16:43:33Z",
    "rev_sha1": "d9uoc7gw3cj3ejhs8ihvsi61hp54icq",
    "rev_minor_edit": false,
    "rev_len": 82,
    "rev_content_model": "wikitext",
    "rev_content_format": "text/x-wiki",
    "performer": {
        "user_text": "Invasive Spices",
        "user_groups": [
            "extendedconfirmed",
            "*",
            "user",
            "autoconfirmed"
        ],
        "user_is_bot": false,
        "user_id": 40272459,
        "user_registration_dt": "2020-09-30T23:11:08Z",
        "user_edit_count": 9319
    },
    "page_is_redirect": true,
    "comment": "#REDIRECT [[Naturalisation (biology)]] {{R cat shell| {{R from related topic}} }}",
    "parsedcomment": "#REDIRECT <a href="/wiki/Naturalisation_(biology)" title="Naturalisation (biology)">Naturalisation (biology)</a> {{R cat shell| {{R from related topic}} }}",
    "rev_slots": {
        "main": {
            "rev_slot_content_model": "wikitext",
            "rev_slot_sha1": "d9uoc7gw3cj3ejhs8ihvsi61hp54icq",
            "rev_slot_size": 82,
            "rev_slot_origin_rev_id": 1088883819
        }
    }
}
Вход в полноэкранный режим Выход из полноэкранного режима

Уделите некоторое время ознакомлению со схемой page-create из документации wikipedia API. Пример объекта немного беспорядочен, чтобы мы могли понять, что происходит, поэтому давайте немного почистим его. Показывая только ключи объекта с помощью jq, гораздо проще спланировать наш запрос на получение информации.

❯ jq '. | keys' /tmp/data.json
[
  "$schema",
  "comment",
  "database",
  "meta",
  "page_id",
  "page_is_redirect",
  "page_namespace",
  "page_title",
  "parsedcomment",
  "performer",
  "rev_content_format",
  "rev_content_model",
  "rev_id",
  "rev_len",
  "rev_minor_edit",
  "rev_sha1",
  "rev_slots",
  "rev_timestamp"
]
Вход в полноэкранный режим Выход из полноэкранного режима

Рецепт mediawiki — это пример использования пользовательской функции reify.time. Она создает временные узлы в графе и связи с узлами page-create на основе rev_timestamp.

Демонстрируя функцию reify.time, наш запрос ingest создает узлы ревизии, узлы db и узлы пользователя, которые связаны друг с другом и их репрезентативными временными узлами.

Чтобы узнать больше о создании узлов временного ряда в Quine, прочитайте о реификации времени здесь.

Запрос на ввод

Запрос на ввод — это рабочая лошадка потока ввода данных. Каждая дата, в данном случае объект page-create, обрабатывается запросом ingest. Запрос написан на языке Cypher и отвечает за разбор данных, создание узлов, хранение данных и установление связей в потоковом графе.

Сначала запрос ingest создает нужные нам узлы с помощью MATCH и WHERE. Узел id назначается с помощью функции idFrom.

MATCH (revNode) WHERE id(revNode) = idFrom("revision", $that.rev_id)
MATCH (dbNode) WHERE id(dbNode) = idFrom("db", $that.database)
MATCH (userNode) WHERE id(userNode) = idFrom("id", $that.performer.user_id)
Вход в полноэкранный режим Выход из полноэкранного режима

Обратите внимание, что мы передаем два параметра функции idFrom. Первый параметр задает уникальное пространство имен для id, чтобы избежать коллизий. Второй параметр — это rev_id из объекта page-create. Результатом idFrom является детерминированный UUID для каждого узла.

Далее мы сохраняем значения rev, db и user как свойства соответствующих узлов и помечаем каждый узел для наглядности в проводнике графа. Quine разбирает вложенную строку и сохраняет результаты в переменной $that. Вы можете извлекать значения из проанализированной переменной, используя точечную нотацию в виде $that.<attribute>.

SET revNode = $that, revNode.type = "rev"
SET dbNode.database = $that.database, dbNode.type = "db"
SET userNode = $that.performer, userNode.type = "user"
CALL create.setLabels(revNode, ["rev:" + $that.page_title])
CALL create.setLabels(dbNode, ["db:" + $that.database])
CALL create.setLabels(userNode, ["user:" + $that.performer.user_text])
Вход в полноэкранный режим Выход из полноэкранного режима

В этой простой строке происходит довольно много событий. В частности, использование WITH *. Давайте уделим немного времени, чтобы понять, почему мы решили использовать этот паттерн.

Вызывая WITH *, Cypher изменяет объем доступных данных. Если вы явно перечислите каждый узел в данных и случайно опустите переменную, она будет потеряна для остальной части запроса, и вы можете получить непредвиденные ошибки. Использование glob гарантирует, что все узлы и переменные будут в вашем распоряжении в запросе ingest.

WITH *, datetime($that.rev_timestamp) AS d
Вход в полноэкранный режим Выход из полноэкранного режима

Запрос ingest делает CALL к функции reify.time для создания нового timeNode. Результирующий узел основан на годе, месяце, дне, часе и минуте из rev_timestamp. Он также увеличивает параметр count узла timeNode.

CALL reify.time(d, ["year", "month", "day", "hour", "minute"]) 
    YIELD node AS timeNode
CALL incrementCounter(timeNode, "count")
Вход в полноэкранный режим Выход из полноэкранного режима

Наконец, запрос ingest создает связи между узлами графа.

CREATE (revNode)-[:at]->(timeNode)
CREATE (revNode)-[:db]->(dbNode)
CREATE (revNode)-[:by]->(userNode)
Войти в полноэкранный режим Выход из полноэкранного режима

Запуск рецепта Википедии

Теперь давайте запустим рецепт, чтобы увидеть, как запрос ingest строит граф в Quine. Загрузив последний jar-файл Quine с сайта Quine.io, запустите рецепт из командной строки.

❯ java -jar quine-x.x.x.jar -r wikipedia
Войдите в полноэкранный режим Выйти из полноэкранного режима

Рецепт включает в себя постоянный запрос, который выводит узлы на терминал по мере их поступления. После запуска рецепта вы должны быстро увидеть активность.

Пока граф не стал слишком большим, откройте Quine explorer (http:/0.0.0.0:8080) и запустите хранимый запрос временных узлов. Каждый из временных узлов был создан запросом ingest с использованием временной метки в объекте page-create.

Мы называем их синтетическими узлами. Синтетические узлы полезны при поиске абстрактных закономерностей между слабо связанными узлами. В данном случае, какие обновления были сделаны в течение определенного промежутка времени.
![[Снимок экрана 2022-05-20 at 4.09.17 PM.png]]
Используя API, давайте проверим поток ingest, используя конечную точку ingest.

❯ http GET http://0.0.0.0:8080/api/v1/ingest Content-Type:application/json
HTTP/1.1 200 OK
Content-Encoding: gzip
Content-Type: application/json
Date: Fri, 20 May 2022 20:06:01 GMT
Server: akka-http/10.2.9
Transfer-Encoding: chunked

{
    "INGEST-1": {
        "settings": {
            "format": {
                "parameter": "that",
                "query": "MATCH (revNode) WHERE id(revNode) = idFrom("revision", $that.rev_id)nMATCH (dbNode) WHERE id(dbNode) = idFrom("db", $that.database)nMATCH (userNode) WHERE id(userNode) = idFrom("id", $that.performer.user_id)nSET revNode = $that, revNode.type = "rev"nSET dbNode.database = $that.database, dbNode.type = "db"nSET userNode = $that.performer, userNode.type = "user"nWITH *, datetime($that.rev_timestamp) AS dnCALL create.setLabels(revNode, ["rev:" + $that.page_title])nCALL create.setLabels(dbNode, ["db:" + $that.database])nCALL create.setLabels(userNode, ["user:" + $that.performer.user_text])nCALL reify.time(d, ["year", "month", "day", "hour", "minute"]) YIELD node AS timeNodenCALL incrementCounter(timeNode, "count")nCREATE (revNode)-[:at]->(timeNode)nCREATE (revNode)-[:db]->(dbNode)nCREATE (revNode)-[:by]->(userNode)",
                "type": "CypherJson"
            },
            "parallelism": 16,
            "type": "ServerSentEventsIngest",
            "url": "https://stream.wikimedia.org/v2/stream/page-create"
        },
        "stats": {
            "byteRates": {
                "count": 1354157,
                "fifteenMinute": 1552.6927122874843,
                "fiveMinute": 1398.959143968717,
                "oneMinute": 1099.4731678954581,
                "overall": 1448.3578957557581
            },
            "ingestedCount": 914,
            "rates": {
                "count": 914,
                "fifteenMinute": 1.0510781922502073,
                "fiveMinute": 0.9474472912218986,
                "oneMinute": 0.7431750446830565,
                "overall": 0.9775815796950665
            },
            "startTime": "2022-05-20T19:50:26.494025Z",
            "totalRuntime": 934608
        },
        "status": "Running"
    }
}
Вход в полноэкранный режим Выход из полноэкранного режима

Запрос ingest, определенный через рецепт, называется INGEST-1 и в настоящее время выполняется.

Знаете ли вы, что можно выполнять вызовы API непосредственно из встроенной документации по API?

Выберите значок страницы (📄) в левой навигационной панели Quine Explore и перейдите к конечной точке API, которую вы хотите использовать.

Настройте вызов API по необходимости и нажмите синюю кнопку «Отправить запрос API».

Приостановка потока через API осуществляется через конечную точку ingest/{name}/pause.

❯ http PUT http://tow-mater:8080/api/v1/ingest/INGEST-1/pause Content-Type:application/json
HTTP/1.1 200 OK
Content-Encoding: gzip
Content-Type: application/json
Date: Fri, 20 May 2022 20:09:27 GMT
Server: akka-http/10.2.9
Transfer-Encoding: chunked

{
    "name": "INGEST-1",
    "settings": {
        "format": {
            "parameter": "that",
            "query": "MATCH (revNode) WHERE id(revNode) = idFrom("revision", $that.rev_id)nMATCH (dbNode) WHERE id(dbNode) = idFrom("db", $that.database)nMATCH (userNode) WHERE id(userNode) = idFrom("id", $that.performer.user_id)nSET revNode = $that, revNode.type = "rev"nSET dbNode.database = $that.database, dbNode.type = "db"nSET userNode = $that.performer, userNode.type = "user"nWITH *, datetime($that.rev_timestamp) AS dnCALL create.setLabels(revNode, ["rev:" + $that.page_title])nCALL create.setLabels(dbNode, ["db:" + $that.database])nCALL create.setLabels(userNode, ["user:" + $that.performer.user_text])nCALL reify.time(d, ["year", "month", "day", "hour", "minute"]) YIELD node AS timeNodenCALL incrementCounter(timeNode, "count")nCREATE (revNode)-[:at]->(timeNode)nCREATE (revNode)-[:db]->(dbNode)nCREATE (revNode)-[:by]->(userNode)",
            "type": "CypherJson"
        },
        "parallelism": 16,
        "type": "ServerSentEventsIngest",
        "url": "https://stream.wikimedia.org/v2/stream/page-create"
    },
    "stats": {
        "byteRates": {
            "count": 1653281,
            "fifteenMinute": 1530.565647994232,
            "fiveMinute": 1428.2092910910662,
            "oneMinute": 1488.2104624440235,
            "overall": 1448.444229804896
        },
        "ingestedCount": 1117,
        "rates": {
            "count": 1117,
            "fifteenMinute": 1.0361739604926652,
            "fiveMinute": 0.96669545913622,
            "oneMinute": 1.0032209384426753,
            "overall": 0.9786067989220232
        },
        "startTime": "2022-05-20T19:50:26.494025Z",
        "totalRuntime": 1141066
    },
    "status": "Paused"
}
Вход в полноэкранный режим Выход из полноэкранного режима

Обратите внимание, что обновления в окне терминала прекратились, а поток INGEST-1 ingest имеет статус "Paused".

Перезапустите поток с помощью PUT на конечную точку /ingest/{name}/start. Обновления возобновятся в окне вашего терминала, а статус входящего потока вернется к "Running".

Заключение

Мы только начинаем разминаться с ingest-потоками! В этом посте мы рассмотрели простой поток ингеста и запрос ингеста для чтения отправленных сервером событий (SSE) из службы потоковых событий Wikipedia.

Следующей статьей в этой серии будет «Ввод данных CSV», где мы рассмотрим, как Quine вводит данные, хранящиеся в CSV-файле.

Я приветствую ваши отзывы! Заходите в Quine Slack и дайте мне знать, что вы думаете. Я всегда рад обсудить Quine или ответить на вопросы.

Оцените статью
Procodings.ru
Добавить комментарий