Skip to content

Подключение к данным

Данный раздел описывает подключение в Датастору - движку, который занимается обработкой данных. Это может быть как классическая СУБД типа PostgreSQL, так и что-то похожее на неё как Apache Spark.

Помимо URL можно указать дополнительные опции соединения к датастору в v100.source.options. Ниже перечислены некоторые из них:

  • pool_size - The number of connections to keep open inside the connection pool.
  • max_overflow - the number of connections to allow in connection pool “overflow”, that is connections that can be opened above and beyond the pool_size setting.
  • pool_timeout - Number of seconds to wait before giving up on getting a connection from the pool.
  • pool_recycle - This setting causes the pool to recycle connections after the given number of seconds has passed. It defaults to -1, or no timeout.
  • pool_reset_on_return - Action upon returning connection back to pool: 'rollback', 'commit' or null.
  • pool_use_lifo - Use LIFO (last-in-first-out) when retrieving connections from the pool instead of FIFO (first-in-first-out).
  • max_identifier_length integer; Override the max_identifier_length determined by the dialect. if null or zero, has no effect.
  • connect_args - A dictionary of options which will be passed directly to the driver as additional arguments.
  • hide_parameters - When set to True, SQL statement parameters will not be displayed in INFO logging nor will they be formatted into the string representation of StatementError objects.
  • isolation_level - This string parameter is interpreted by various dialects in order to affect the transaction isolation level of the database connection. The parameter essentially accepts some subset of these string arguments: "SERIALIZABLE", "REPEATABLE READ", "READ COMMITTED", "READ UNCOMMITTED" and "AUTOCOMMIT". Behavior here varies per backend, and individual dialects should be consulted directly.

Рекомендованные параметры

Info

В зависимости от типа СУБД и используемого драйвера аргумент, задающий timeout при подключения может называться по-разному, но его обязательно необходимо задавать, иначе можно попасть в ситуацию, когда на транспортном уровне есть проблемы, и пакеты не доходят до клиента и попытка подключения висит бесконечно (например MTU в виртуальной сети не соответствует настройкам по размеру пакета в кластере greenplum)

Казалось бы, что pool_timeout ровно за это и отвечает, но нет. Опция pool_timeout срабатывает только в случае, когда сервер отвечает да/нет, но если ожидаение ответа подвисает, то подвисает и приложение.

# v100source.options
{  # Параметры движка
    "pool_size": 50,     # Максимальное количество одновременных подключений
    "max_overflow": 0,   # Максимально допустимый перелимит
    "pool_timeout": 60,  # Таймаут на получение подключения из пула
    "connect_args" {     # параметры дравера
        "connect_timeout": 10,
        ...
    }
}

PostgreSQL & Greenplum

URL Подключения к базе должен иметь правильную схему:

  • PostgreSQL: postgres://..., postgresql://... или postgressql+psycopg2://...
  • Greenplum: greenplum://...

Рекомендуется использование параметров keepalives... для надежного определения ситуаций, когда соединение с БД разрывается в процессе работы, из-за сбоя БД или потери сетевой связанности.

# v100source.options
{
    ...
    "connect_args": {  # Опции драйвера psycopg2
        "connect_timeout": 10,
        "keepalives": 1,
        "keepalives_idle": 10,
        "keepalives_interval": 5,
        "keepalives_count": 2
    }
}

Также через "options": "-c ..." можно дополнительно передать параметры сессии по-молчанию. Полный список таких параметров в документации PostgreSQL и по Greenplum

# PostgreSQL
{
    ...
    "connect_args": {
        ...
        "options": "-c idle_in_transaction_session_timeout=5min",
    }
}
# Greenplum
{
    ...
    "connect_args": {
        ...
        "options": "-c max_statement_mem=4000MB -c gp_default_storage_options=orientation=column,compresstype=zstd,compresslevel=1"
    }
}

Кастомизировать опции, с которыми создаются таблицы, можно через параметры в ключе fs_settings. aux таблицами являются те, которые удаляются после выполнения трансформации.

{
    ...
    "fs_settings":{
        "create_table_prefix": "",
        "create_table_postfix": "WITH (appendoptimized=true,orientation=column,compresstype=zstd,compresslevel=1)",
        "create_table_aux_prefix": "UNLOGGED",
        "create_table_aux_postfix": "WITH (appendoptimized=true,autovacuum_enabled=false,orientation=column,compresstype=zstd,compresslevel=1)"
    },
    "connect_args": {
        ...
    }
}

Apache Spark

Работа с кластером Apache Spark реализована через отправку запросов к hive thift серверу. Который должен быть предварительно запущен и сконфигурирован в кластере.

URL Подключения должен иметь схему spark://...

# v100source.options
{
    ...
    "connect_args": {
        "connect_timeout": 10
    }
}

Clickhouse

URL Подключения должен иметь схему clickhouse://...

{
    "connect_args": {
        "connect_timeout": 10,
        "settings": {
            "handshake_timeout_ms": 1,
        },
    }
}

В ключе connect_args задаются параметры подключения. Полный список возможных значений можно посмотреть в описании конструктора объекта Connection пакета clickhouse-driver.

Также connect_args может содержать ключ settings, в котором можно задать:

Ограничения Clickhouse

  • По причине ошибки в Clickhouse в источниках данных невозможно использовать колонки с именами, зарезервированными для внутреннего использования приложением: from_dttm, to_dttm, report_dttm,_created_dttm,_updated_dttm,_run_rk.

Apache Impala

Подключение к Apache Impala возможно двумя способами:

  • посредством python библиотеки impyla
  • посредством ODBC драйвера

Подключение посредством библиотеки impyla

URL подключения должен иметь вид impala://[user[:password]]host[:port][/database].

В ключе connect_args параметров подключения возможно задать дополнительные параметры, соответствующие аргументам функции connect библиотеки.

Подключение посредством ODBC драйвера

Для подключения в системе должен быть установлен и сконфигурирован ODBC драйвер. Система тестировалась с драйвером Cloudera 2.6.11.

В соответствии с инструкциями, приложенными к драйверу необходимо прописать название драйвера и (опционально) конфигурации источников в соответствующих ini файлах.

Далее необходимо сформировать ODBC строку подключения (см. документацию к драйверу) и сконвертировать ее в форму, пригодную для использования в URL. Для этого необходимо использовать python функцию quote_plus.

В простейшем случае строка подключения может иметь вид:

params = quote_plus(
    "DRIVER={Cloudera Impala ODBC Driver 64-bit};"
    "Host=impala;"
    "Port=21050;"
)

Пример более сложной конфигурации с сертификатами (должны находиться в соответствующей директории).

params = quote_plus(
    "DRIVER={Cloudera Impala ODBC Driver 64-bit};"
    "Host=host;"
    "Port=21050;"
    "AuthMech=3;"
    "UID=user;"
    "PWD=password;"
    "SSL=1;"
    "AllowSelfSignedServerCert=1;"
    "AllowHostNameCNMismatch=1;"
    r"TrustedCerts=/root/root-certs.pem;"
)

Конечный URL подключения будет иметь вид:

url = f"impala+pyodbc:///?odbc_connect={params}&autocommit=True"

Oracle Database

URL подключения должен иметь вид oracle://[user[:password]@]host[:port]/?service_name=PDB.

где PDB - имя PDB (например XEPDB1).

В connect_args возможно задать дополнительные параметры подключения, например

{
    "connect_args": {
        "tcp_connect_timeout": 10
    }
}

Внимание! В БД должны быть созданы все необходимые схемы (пользователи в терминологии Oracle) для работы системы. Для системного пользователя (поле user в строке подключения), а также для всех созданных пользователей должен быть выдан полный список прав для совершения операций в БД. Для настройки БД проконсультируйтесь с руководством по БД Oracle.

Справочник для Oracle

Для корректной работы с БД oracle необходимо для всех численных типов в справочнике fs_type указать py_type=decimal

NOTE: Oracle не поддерживает тип данных bool до версии 23c. В качестве альтернативы можно использовать CHAR(5) / NUMBER(1).

Использование типов данных с плавающей точкой

Если в справочнике имплементаций используется численный тип данных с плавающей точкой как целое число, например NUMERIC(18,0), то его нужно добавить в справочник типов данных по умолчанию с указанной в справочнике имплементаций точностью:

db_type default_fs_type db_name
NUMERIC FLOAT oracle
NUMERIC(18,0) INT oracle

Имплементации:

fs_type db_type db_name
INT NUMERIC(18,0) oracle
AMOUNT NUMERIC(12,4) oracle
FLOAT NUMERIC(18,6) oracle