Skip to content

Загрузчик

Загрузчик является инструментом регистрации данных.

Данные, зарегистрированные в приложении, используются для создания датасетов.

Процесс регистрации

При регистрации загрузчика указывается:

  • Источник данных
  • Ссылка на даг Airflow, которым рассчитываются данные в источнике (справочно)
  • Тип приемника
  • Режим загрузки
  • Маппинг
  • Расписание

Имя загрузчика должно быть уникальным во всём каталоге.

Источник данных

Допустимые источники данных:

  • Таблица в БД (table / external)
  • SQL-скрипт
  • CSV/XLSX-файл (file)

В отличие от CSV/XLSX-файла и SQL-скрипта, таблица в БД может быть зарегистрирована в виде ссылки без последующей физической загрузки данных из неё во внутренние структуры приложения. Ниже представлено сравнение использования загрузчика с загрузкой данных и без таковой.

Загрузчик с загрузкой данных (table, file, sql)

  • Плюсы
    • Поддерживается ведение истории с обеспечением принципов неизменности исторических данных (данные за загруженные даты не изменяются)
    • Автоматический расчет периодов актуальности данных
    • Оптимальная структура хранения загруженных данных
  • Минусы
    • Данные занимают дополнительное место в БД
    • Требуется время на загрузку данных
    • Не всегда возможно исправить загруженные исторические срезы, учесть "долеты"

Ссылка на таблицу в БД (external)

  • Плюсы
    • Не требуется дополнительное место в БД
    • Данные готовы для использования сразу, не надо ждать загрузку
    • Возможность исправить исторические данные
  • Минусы
    • Данные в таблице не валидируются
    • Данные в таблице находятся в зоне ответственности пользователя (система не контролирует неизменность истории, не рассчитывает индексы и т.д.)
    • Дополнительные требования к структуре таблицы (поля с датами и ключами должны называться определенным образом)

Загрузка данных через SQL-скрипт

К скрипту выдвигаются следующие требования:

  • Скрипт может содержать только одну инструкцию select без использования CTE;
  • Скрипт должен быть исполняем на том источнике данных, который указывается в загрузчике.

Тип приемника

Допустимые типы приемников:

  • Переменные (hot) - создаются и загружаются новые версии в каталоге переменных
    • Готовые переменные (final) - данные в разрезе сущности, готовые для использования в датасете без дополнительных манипуляций; в срезе данных по дате актуальности отсутствуют дубли по сущности; примеры - пол клиента, номер договора и т.д.
    • Транзакционный предагрегат (trans_preagg) - предварительно агрегированные данные за определенный период (гранулярность); примеры - количество транзакций за неделю, сумма зп-начислений за месяц и т.д.
    • Предагрегат общего назначения (common_preagg) - детализированная информация по сущности под расширенным ключом;
      примеры - договоры в разрезе клиента, ответы из БКИ по клиенту и т.д
  • Сегмент (keys) - сохраняется список значений сущности, остальные поля сохраняются как переменные сегмента. Сегмент регистрируется в каталоге датасетов, переменные сегмента не сохраняются в каталоге переменных. Для датасетов, полученных таким образом, после завершения загрузки производится расчет статистик и графика плотности распределения (отключаемо). Загрузка сегмента недоступна через sql-скрипт.
  • Связь сущностей (entity_link) - создается и загружается новая связь существующих сущностей

Режим загрузки

Доступны следующие режимы загрузки:

  • full = загрузка полного среза
  • replace = замена данных приемника данными источника; режим доступен только для источников файл, таблица и sql-скрипт

Реализовано сохранение истории изменения данных, если это применимо для выбранного типа приемника. Стратегия работы с историей зависит от режима загрузки и типа приемника:

Тип приемника Режим загрузки Стратегия загрузки
Готовые переменные full snapshot
Готовые переменные replace replace
Транзакционный предагрегат full insert
Транзакционный предагрегат replace replace
Предагрегат общего назначения full snapshot
Предагрегат общего назначения replace replace
Связь сущностей full snapshot
Связь сущностей replace replace
Сегмент - replace

Для стратегий загрузки snapshot и insert происходит последовательная загрузка срезов данных по отобранным датам актуальности. Даты актуальности отбираются из источника по условию: дата актуальности > максимальная дата актуальности в таблице-приемнике.

Для режима загрузки replace даты актуальности не анализируются, происходит простая замена данных приемника данными источника без дополнительных проверок и условий.

snapshot

У записей у которых в таблице-приемнике to_dttm=infinity, проставляется to_dttm=<пришедшая дата> - 1 microsecond. Новые записи вставляются с from_dttm=<пришедшая среза>, to_dttm=infinity

insert

Производится INSERT новых записей без проверок

replace

Без истории. Производится REPLACE таблицы

Маппинг

При регистрации загрузчика необходимо указать соответствие загружаемых полей источника на:

  • ключи выбранной сущности (для связи сущностей - ключи родительской и дочерней сущности)
  • дату актуальности - дата, на которую рассчитаны данные в строке
  • дату окончания актуальности - дата до которой данные в строке остаются актуальными
  • переменные (только для загрузчиков переменных) - при этом указываются параметры создаваемой версии (имя, описание, тип данных). Имя версии переменной должно быть уникальным в разрезе переменной во всём каталоге.

Маппинг на даты (источник -> приемник)

  • table, file, sql -> hot, entity_link
    • Дата актуальности: Опционально (поле/константа)
    • Дата окончания: Не обрабатывается
  • external -> hot, entity_link
    • Дата актуальности: Опционально (только поле)
    • Дата окончания: Если указана Дата акутальности, то Обязательно, иначе не обрабатывается
  • * -> keys
    • Дата актуальности: Не обрабатывается
    • Дата окончания: Не обрабатывается

Если дата актуальности не указана для источников table/file/sql загрузчиков, загружаемых в режиме полного среза, то в качестве даты актуальности проставляется фактическая дата загрузки данных (т.о. загружаемые данные действуют с момента загрузки).

Во всех остальных случаях, если дата актуальности не указана, то данные в таблице считаются действующими бессрочно.

Настройка расписания

Источник Настройка расписания
CSV/XLSX-файл
Таблица в БД (ссылка)
Таблица в БД (загрузка)
SQL

Настройка расписания осуществляется в формате пятизначной cron-строки:

* * * * * cron-строка расписания
- - - - -
| | | | |
| | | | ----- день недели (0—7) (воскресенье = 0 или 7)
| | | ------- месяц (1—12)
| | --------- день месяца (1—31)
| ----------- час (0—23)
------------- минута (0—59)

Настройка загрузчика в SDK:

from catalogue import loader

new_loader = loader.create(
    source: str,  # источник загрузчика: file, table, external, sql
    target: str  # тип загрузки: features, segment, entity_link
)

# Конфиг для загрузчика фичей с источником файл (`load.create(source='file', target='feature'`):
new_loader.config = {
        'loader_name': str, # имя загрузчика
        'description': Optional[str],  # описание загрузчика
        'load_mode': str, # тип загрузки, доступные: ("inc", "full")
        'delimiter': str, # разделитель в загруженном файле (для .csv)
        'entity': Entity,  # сущность для загрузчика
        'preagg_flg': bool,  # флаг таблицы предагрегатов
        'granularity_type': str = None,  # тип гранулярности; необязательный
        'granularity': int = None,  # гранулярность; необязательный

}

# Конфиг для загрузчика фичей с источником таблица/внешняя таблица/SQL-скрипт

new_loader.config = {
        'loader_name': str, # имя загрузчика
        'description': Optional[str],  # описание загрузчика
        'load_mode': str,  # тип загрузки, доступные: ("inc", "full")
        'src_schema_name': str,  # имя схемы из источника
        'src_table_name': str,  # имя таблицы из источника
        'entity': Entity,  # сущность для загрузчика
        'preagg_flg': bool,  # флаг таблицы предагрегатов
        'granularity_type': str = None,  # тип гранулярности; необязательный
        'granularity': int = None,  # гранулярность; необязательный
}

# Конфиг для загрузчика связи сущностей с источником файл (`load.create(source='file', target='feature'`):
new_loader.config = {
        'loader_name': str, # имя загрузчика
        'description': Optional[str],  # описание загрузчика
        'load_mode': str, # тип загрузки, доступные: ("inc", "full")
        'delimiter': str, # разделитель в загруженном файле (для .csv)
        'kind': str,  #  тип связи ("1:1", "1:M", "M:N")
        'parent_entity': Entity,  # родительская сущность (в типе связи - слева)
        'child_entity': Entity  # дочерняя сущность (в типе связи - справа)
}

# Конфиг для загрузчика связи сущностей с источником таблица/внешняя таблица/SQL-скрипт

load.create(source='file', target='feature')

new_loader.config = {
        'loader_name': str, # имя загрузчика
        'description': Optional[str],  # описание загрузчика
        'load_mode': str, # тип загрузки, доступные: ("inc", "full")
        'src_schema_name': str,  # имя схемы из источника
        'src_table_name': str,  # имя таблицы из источника
        'kind': str,  #  тип связи ("1:1", "1:M", "M:N")
        'parent_entity': Entity,  # родительская сущность (в типе связи - слева)
        'child_entity': Entity  # дочерняя сущность (в типе связи - справа)
}

# Конфиг для загрузчика списка ключей/сегмента с источником файл (`load.create(source='file', target='segment'`):
new_loader.config = {
        'loader_name': str, # имя загрузчика
        'description': Optional[str],  # описание загрузчика
        'load_mode': str, # тип загрузки, доступные: ("inc", "full")
        'delimiter': str, # разделитель в загруженном файле (для .csv)
        'entity': Entity,  # сущность для загрузчика
}

# Конфиг для загрузчика списка ключей/сегмента с источником таблица/внешняя таблица/SQL-скрипт

load.create(source='file', target='segment')

new_loader.config = {
        'loader_name': str, # имя загрузчика
        'description': Optional[str],  # описание загрузчика
        'load_mode': str, # тип загрузки, доступные: ("inc", "full")
        'src_schema_name': str,  # имя схемы из источника
        'src_table_name': str,  # имя таблицы из источника
        'entity': Entity,  # сущность для загрузчика
}

# Настройка расписания
new_loader.schedule = {
    'type': str,  # возможные значения: 
                  # 'Hours', 'Everyday', 'Weekly', 'Monthly_last_day', 'Monthly_day_of_week', 'Custom'
    'step': int,  # количество часов в шаге; обязательный только для Hours
    'weekdays_flg': bool,  # признак "только по будням"; может заполняться только 
                           # для Everyday (не обязательный), в остальных случаях не заполняется
    'day_of_week': List[int],  # перечисление дней недели (1-7); обязательный только 
                               # для Weekly И Monthly_day_of_week, в остальных случаях не заполняется
    'num_of_week': int,  # номер недели; обязательный только для Monthly_day_of_week,
                         # в остальных случаях не заполняется
    'day_of_month': List[int],  # перечисление дней месяца (1-31), на которые нужно 
                                # провести расчёт; обязательный только для Custom,
                                # в остальных случаях не заполняется
    'run_time': str  # время запуска в формате строки "YYYY-MM-DD hh:mm:ss"
}

# Маппинг загрузчика фичей
new_loader.mapping = {
        'from_dttm': Union[str, datetime.date],  # поле даты актуальности
        'to_dttm': Union[str, datetime.date] = None,  # поле окончания периода актуальности
                                                      # (только для External загрузчика)
        'entity': [
            { # словарь маппинга выбранного сущности
                'entity_key': Entity,  # объект Entity с детализацией до ключа
                'stg_column_name': str  # название поля для ключа из выбранного файла
            },
            ..
        ],
        'features': List[
            {
                'stg_column_name': str,  # название поля для ключа из выбранного файла
                'feature': Feature,  # объект Feature
                'feature_version_config': 
                    {  # заполняется только если в 'feature' передана БФ, а не версия
                        'name': str,  # название версии фичи
                        'kind': str,  # тип фичи, доступные значения: ["feature"]
                        'description': str,  # описание версии фичи
                        'data_type': DataType  # объект DataType
                    } = None
            },
            ..
       ]  # список словарей маппинга фичей
}

# Маппинг загрузчика связи сущностей
new_loader.mapping = {
        'from_dttm': Union[str, datetime.date],  # поле даты актуальности
        'to_dttm': Union[str, datetime.date] = None,
        'parent': [
            { # словарь маппинга родительнской сущности
                'entity_key': EntityKey,  # объект EntityKey с детализацией до ключа
                'stg_column_name': str  # название поля для ключа из выбранного файла
            },
            ..
        ],
        'child': [
            { # словарь маппинга дочерней сущности
                'entity_key': EntityKey,  # объект EntityKey с детализацией до ключа
                'stg_column_name': str  # название поля для ключа из выбранного файла
            },
            ..
        ]
}

# Маппинг загрузчика сегмента/списка ключей
new_loader.mapping = {
        'entity': Union[
            [
                { # словарь маппинга выбранной сущности
                    'entity_key': Entity,  # объект Entity с детализацией до ключа
                    'stg_column_name': str  # название поля для ключа из выбранного файла
                },
                ..
            ],
            ent.ROWNUM  # в качестве сущности можно указать ROWNUM
        ]
}

new_loader.save_x_execute()