BigQuery поддерживает транзакции с прошлого года (представлено на Google Cloud Next’21): теперь можно выполнять мутирующие операции над одной или несколькими таблицами, а затем фиксировать или откатывать результат атомарно, обернув сценарий между ними.
BEGIN TRANSACTION;
и
COMMIT TRANSACTION;
или
ROLLBACK TRANSACTION;
Достаточно просто! Все объяснено в официальной документации.
Однако транзакции имеют одно ограничение:
Транзакция не может охватывать несколько скриптов.
Хотя в большинстве случаев это не является проблемой, это может стать проблемой, когда скрипты, заключенные в транзакцию, становятся слишком сложными или имеют слишком много параметров запроса, или нарушают любую другую квоту заданий BigQuery. Это может произойти, например, когда сценарии запросов автоматически генерируются из полезной нагрузки запроса.
Есть способ обойти это, используя сессии BigQuery. Давайте посмотрим, как это работает
Сессии BigQuery
Сессии — это способ связывать задания и сохранять между ними переходные данные, например, временные таблицы.
Один из распространенных случаев использования сессий — это именно то, что нам нужно:
Создание многоэтапных транзакций над несколькими запросами. В рамках сеанса можно начать транзакцию, внести изменения и просмотреть временный результат, прежде чем принять решение о фиксации или откате. Это можно делать над несколькими запросами в сеансе. Если вы не используете сеанс, то транзакцию с несколькими утверждениями необходимо завершить в одном запросе.
Идея заключается в том, чтобы сложить запросы транзакции в одну сессию, начиная с BEGIN TRANSACTION;
и заканчивая COMMIT TRANSACTION;
.
В промежутках между ними вы можете вызывать столько запросов, сколько необходимо, и вся сессия будет иметь атомарное поведение.
Сессия закрывается автоматически после 24 часов бездействия. Однако, если использовать транзакции, может случиться так, что целевая таблица будет «заблокирована» в сессии и станет непригодной для использования до конца сессии. Поэтому я рекомендую принудительно завершать сессию в конце сценария. Это делается путем вызова следующего запроса:
CALL BQ.ABORT_SESSION();
Реализация в Python
Мы имеем дело с понятием сессии, которую нужно открывать и всегда закрывать в конце обработки: для этого естественно нужен менеджер контекста.
"""ContextManager wrapping a bigquery session."""
from google.cloud import bigquery
class BigquerySession:
"""ContextManager wrapping a bigquerySession."""
def __init__(self, bqclient: bigquery.Client, bqlocation: str = "EU") -> None:
"""Construct instance."""
self._bigquery_client = bqclient
self._location = bqlocation
self._session_id = None
def __enter__(self) -> str:
"""Initiate a Bigquery session and return the session_id."""
job = self._bigquery_client.query(
"SELECT 1;", # a query can't fail
job_config=bigquery.QueryJobConfig(create_session=True),
location=self._location,
)
self._session_id = job.session_info.session_id
job.result() # wait job completion
return self._session_id
def __exit__(self, exc_type, exc_value, traceback):
"""Abort the opened session."""
if self._session_id:
# abort the session in any case to have a clean state at the end
# (sometimes in case of script failure, the table is locked in
# the session)
job = self._bigquery_client.query(
"CALL BQ.ABORT_SESSION();",
job_config=bigquery.QueryJobConfig(
create_session=False,
connection_properties=[
bigquery.query.ConnectionProperty(
key="session_id", value=self._session_id
)
],
),
location=self._location,
)
job.result()
Затем становится очень просто использовать этот контекст для объединения заданий в один сеанс, таким образом создавая многоэтапную, многоскриптовую транзакцию bigquery:
with BigquerySession(self.bigquery_client, BIGQUERY_LOCATION) as session_id:
# open transaction
job = self.bigquery_client.query(
"BEGIN TRANSACTION;",
job_config=bigquery.QueryJobConfig(
create_session=False,
connection_properties=[
bigquery.query.ConnectionProperty(
key="session_id", value=session_id
)
],
),
location=BIGQUERY_LOCATION,
)
job.result()
# stack queries
for queryscript in scripts:
job = self.bigquery_client.query(
queryscript,
job_config=bigquery.QueryJobConfig(
create_session=False,
connection_properties=[
bigquery.query.ConnectionProperty(
key="session_id", value=session_id
)
],
),
location=BIGQUERY_LOCATION,
)
job.result()
# end transaction
job = self.bigquery_client.query(
"COMMIT TRANSACTION;",
job_config=bigquery.QueryJobConfig(
create_session=False,
connection_properties=[
bigquery.query.ConnectionProperty(
key="session_id", value=session_id
)
],
),
location=BIGQUERY_LOCATION,
)
job.result()
Обратите внимание, что все задания выполняются с одним и тем же session_id (т.е. в рамках одной сессии) и в одном и том же месте (это требование для сессий).
Надеюсь, это поможет!
Фото Caroline Selfors on Unsplash