Загрузчик
Загрузчик является инструментом регистрации данных.
Данные, зарегистрированные в приложении, используются для создания датасетов.
Процесс регистрации
При регистрации загрузчика указывается:
- Источник данных
- Ссылка на даг Airflow, которым рассчитываются данные в источнике (справочно)
- Тип приемника
- Режим загрузки
- Маппинг
- Расписание
Имя загрузчика должно быть уникальным во всём каталоге.
Источник данных
Допустимые источники данных:
- Таблица в БД (table / external)
- SQL-скрипт
- CSV/XLSX-файл (file)
В отличие от CSV/XLSX-файла и SQL-скрипта, таблица в БД может быть зарегистрирована в виде ссылки без последующей физической загрузки данных из неё во внутренние структуры приложения. Ниже представлено сравнение использования загрузчика с загрузкой данных и без таковой.
Загрузчик с загрузкой данных (table, file, sql)
- Плюсы
- Поддерживается ведение истории с обеспечением принципов неизменности исторических данных (данные за загруженные даты не изменяются)
- Автоматический расчет периодов актуальности данных
- Оптимальная структура хранения загруженных данных
- Минусы
- Данные занимают дополнительное место в БД
- Требуется время на загрузку данных
- Не всегда возможно исправить загруженные исторические срезы, учесть "долеты"
Ссылка на таблицу в БД (external)
- Плюсы
- Не требуется дополнительное место в БД
- Данные готовы для использования сразу, не надо ждать загрузку
- Возможность исправить исторические данные
- Минусы
- Данные в таблице не валидируются
- Данные в таблице находятся в зоне ответственности пользователя (система не контролирует неизменность истории, не рассчитывает индексы и т.д.)
- Дополнительные требования к структуре таблицы (поля с датами и ключами должны называться определенным образом)
Загрузка данных через SQL-скрипт
К скрипту выдвигаются следующие требования:
- Скрипт может содержать только одну инструкцию
selectбез использования CTE; - Скрипт должен быть исполняем на том источнике данных, который указывается в загрузчике.
Тип приемника
Допустимые типы приемников:
- Переменные (hot) - создаются и загружаются новые версии в каталоге переменных
- Готовые переменные (final) - данные в разрезе сущности, готовые для использования в датасете без дополнительных манипуляций; в срезе данных по дате актуальности отсутствуют дубли по сущности; примеры - пол клиента, номер договора и т.д.
- Транзакционный предагрегат (trans_preagg) - предварительно агрегированные данные за определенный период (гранулярность); примеры - количество транзакций за неделю, сумма зп-начислений за месяц и т.д.
- Предагрегат общего назначения (common_preagg) - детализированная информация по сущности под расширенным
ключом;
примеры - договоры в разрезе клиента, ответы из БКИ по клиенту и т.д
- Сегмент (keys) - сохраняется список значений сущности, остальные поля сохраняются как переменные сегмента. Сегмент регистрируется в каталоге датасетов, переменные сегмента не сохраняются в каталоге переменных. Для датасетов, полученных таким образом, после завершения загрузки производится расчет статистик и графика плотности распределения (отключаемо). Загрузка сегмента недоступна через sql-скрипт.
- Связь сущностей (entity_link) - создается и загружается новая связь существующих сущностей
Режим загрузки
Доступны следующие режимы загрузки:
- full = загрузка полного среза
- replace = замена данных приемника данными источника; режим доступен только для источников файл, таблица и sql-скрипт
Реализовано сохранение истории изменения данных, если это применимо для выбранного типа приемника. Стратегия работы с историей зависит от режима загрузки и типа приемника:
| Тип приемника | Режим загрузки | Стратегия загрузки |
|---|---|---|
| Готовые переменные | full | snapshot |
| Готовые переменные | replace | replace |
| Транзакционный предагрегат | full | insert |
| Транзакционный предагрегат | replace | replace |
| Предагрегат общего назначения | full | snapshot |
| Предагрегат общего назначения | replace | replace |
| Связь сущностей | full | snapshot |
| Связь сущностей | replace | replace |
| Сегмент | - | replace |
Для стратегий загрузки snapshot и insert происходит последовательная загрузка срезов данных по отобранным
датам актуальности. Даты актуальности отбираются из источника по условию:
дата актуальности > максимальная дата актуальности в таблице-приемнике.
Для режима загрузки replace даты актуальности не анализируются, происходит простая замена данных приемника
данными источника без дополнительных проверок и условий.
snapshot
У записей у которых в таблице-приемнике to_dttm=infinity, проставляется
to_dttm=<пришедшая дата> - 1 microsecond.
Новые записи вставляются с from_dttm=<пришедшая среза>, to_dttm=infinity
insert
Производится INSERT новых записей без проверок
replace
Без истории. Производится REPLACE таблицы
Маппинг
При регистрации загрузчика необходимо указать соответствие загружаемых полей источника на:
- ключи выбранной сущности (для связи сущностей - ключи родительской и дочерней сущности)
- дату актуальности - дата, на которую рассчитаны данные в строке
- дату окончания актуальности - дата до которой данные в строке остаются актуальными
- переменные (только для загрузчиков переменных) - при этом указываются параметры создаваемой версии (имя, описание, тип данных). Имя версии переменной должно быть уникальным в разрезе переменной во всём каталоге.
Маппинг на даты (источник -> приемник)
table,file,sql->hot,entity_link- Дата актуальности: Опционально (поле/константа)
- Дата окончания: Не обрабатывается
external->hot,entity_link- Дата актуальности: Опционально (только поле)
- Дата окончания: Если указана Дата акутальности, то Обязательно, иначе не обрабатывается
*->keys- Дата актуальности: Не обрабатывается
- Дата окончания: Не обрабатывается
Если дата актуальности не указана для источников table/file/sql загрузчиков, загружаемых в режиме
полного среза, то в качестве даты актуальности проставляется фактическая дата загрузки данных
(т.о. загружаемые данные действуют с момента загрузки).
Во всех остальных случаях, если дата актуальности не указана, то данные в таблице считаются действующими бессрочно.
Настройка расписания
| Источник | Настройка расписания |
|---|---|
| CSV/XLSX-файл | ✅ |
| Таблица в БД (ссылка) | ❌ |
| Таблица в БД (загрузка) | ✅ |
| SQL | ✅ |
Настройка расписания осуществляется в формате пятизначной cron-строки:
* * * * * cron-строка расписания
- - - - -
| | | | |
| | | | ----- день недели (0—7) (воскресенье = 0 или 7)
| | | ------- месяц (1—12)
| | --------- день месяца (1—31)
| ----------- час (0—23)
------------- минута (0—59)
Настройка загрузчика в SDK:
from catalogue import loader
new_loader = loader.create(
source: str, # источник загрузчика: file, table, external, sql
target: str # тип загрузки: features, segment, entity_link
)
# Конфиг для загрузчика фичей с источником файл (`load.create(source='file', target='feature'`):
new_loader.config = {
'loader_name': str, # имя загрузчика
'description': Optional[str], # описание загрузчика
'load_mode': str, # тип загрузки, доступные: ("inc", "full")
'delimiter': str, # разделитель в загруженном файле (для .csv)
'entity': Entity, # сущность для загрузчика
'preagg_flg': bool, # флаг таблицы предагрегатов
'granularity_type': str = None, # тип гранулярности; необязательный
'granularity': int = None, # гранулярность; необязательный
}
# Конфиг для загрузчика фичей с источником таблица/внешняя таблица/SQL-скрипт
new_loader.config = {
'loader_name': str, # имя загрузчика
'description': Optional[str], # описание загрузчика
'load_mode': str, # тип загрузки, доступные: ("inc", "full")
'src_schema_name': str, # имя схемы из источника
'src_table_name': str, # имя таблицы из источника
'entity': Entity, # сущность для загрузчика
'preagg_flg': bool, # флаг таблицы предагрегатов
'granularity_type': str = None, # тип гранулярности; необязательный
'granularity': int = None, # гранулярность; необязательный
}
# Конфиг для загрузчика связи сущностей с источником файл (`load.create(source='file', target='feature'`):
new_loader.config = {
'loader_name': str, # имя загрузчика
'description': Optional[str], # описание загрузчика
'load_mode': str, # тип загрузки, доступные: ("inc", "full")
'delimiter': str, # разделитель в загруженном файле (для .csv)
'kind': str, # тип связи ("1:1", "1:M", "M:N")
'parent_entity': Entity, # родительская сущность (в типе связи - слева)
'child_entity': Entity # дочерняя сущность (в типе связи - справа)
}
# Конфиг для загрузчика связи сущностей с источником таблица/внешняя таблица/SQL-скрипт
load.create(source='file', target='feature')
new_loader.config = {
'loader_name': str, # имя загрузчика
'description': Optional[str], # описание загрузчика
'load_mode': str, # тип загрузки, доступные: ("inc", "full")
'src_schema_name': str, # имя схемы из источника
'src_table_name': str, # имя таблицы из источника
'kind': str, # тип связи ("1:1", "1:M", "M:N")
'parent_entity': Entity, # родительская сущность (в типе связи - слева)
'child_entity': Entity # дочерняя сущность (в типе связи - справа)
}
# Конфиг для загрузчика списка ключей/сегмента с источником файл (`load.create(source='file', target='segment'`):
new_loader.config = {
'loader_name': str, # имя загрузчика
'description': Optional[str], # описание загрузчика
'load_mode': str, # тип загрузки, доступные: ("inc", "full")
'delimiter': str, # разделитель в загруженном файле (для .csv)
'entity': Entity, # сущность для загрузчика
}
# Конфиг для загрузчика списка ключей/сегмента с источником таблица/внешняя таблица/SQL-скрипт
load.create(source='file', target='segment')
new_loader.config = {
'loader_name': str, # имя загрузчика
'description': Optional[str], # описание загрузчика
'load_mode': str, # тип загрузки, доступные: ("inc", "full")
'src_schema_name': str, # имя схемы из источника
'src_table_name': str, # имя таблицы из источника
'entity': Entity, # сущность для загрузчика
}
# Настройка расписания
new_loader.schedule = {
'type': str, # возможные значения:
# 'Hours', 'Everyday', 'Weekly', 'Monthly_last_day', 'Monthly_day_of_week', 'Custom'
'step': int, # количество часов в шаге; обязательный только для Hours
'weekdays_flg': bool, # признак "только по будням"; может заполняться только
# для Everyday (не обязательный), в остальных случаях не заполняется
'day_of_week': List[int], # перечисление дней недели (1-7); обязательный только
# для Weekly И Monthly_day_of_week, в остальных случаях не заполняется
'num_of_week': int, # номер недели; обязательный только для Monthly_day_of_week,
# в остальных случаях не заполняется
'day_of_month': List[int], # перечисление дней месяца (1-31), на которые нужно
# провести расчёт; обязательный только для Custom,
# в остальных случаях не заполняется
'run_time': str # время запуска в формате строки "YYYY-MM-DD hh:mm:ss"
}
# Маппинг загрузчика фичей
new_loader.mapping = {
'from_dttm': Union[str, datetime.date], # поле даты актуальности
'to_dttm': Union[str, datetime.date] = None, # поле окончания периода актуальности
# (только для External загрузчика)
'entity': [
{ # словарь маппинга выбранного сущности
'entity_key': Entity, # объект Entity с детализацией до ключа
'stg_column_name': str # название поля для ключа из выбранного файла
},
..
],
'features': List[
{
'stg_column_name': str, # название поля для ключа из выбранного файла
'feature': Feature, # объект Feature
'feature_version_config':
{ # заполняется только если в 'feature' передана БФ, а не версия
'name': str, # название версии фичи
'kind': str, # тип фичи, доступные значения: ["feature"]
'description': str, # описание версии фичи
'data_type': DataType # объект DataType
} = None
},
..
] # список словарей маппинга фичей
}
# Маппинг загрузчика связи сущностей
new_loader.mapping = {
'from_dttm': Union[str, datetime.date], # поле даты актуальности
'to_dttm': Union[str, datetime.date] = None,
'parent': [
{ # словарь маппинга родительнской сущности
'entity_key': EntityKey, # объект EntityKey с детализацией до ключа
'stg_column_name': str # название поля для ключа из выбранного файла
},
..
],
'child': [
{ # словарь маппинга дочерней сущности
'entity_key': EntityKey, # объект EntityKey с детализацией до ключа
'stg_column_name': str # название поля для ключа из выбранного файла
},
..
]
}
# Маппинг загрузчика сегмента/списка ключей
new_loader.mapping = {
'entity': Union[
[
{ # словарь маппинга выбранной сущности
'entity_key': Entity, # объект Entity с детализацией до ключа
'stg_column_name': str # название поля для ключа из выбранного файла
},
..
],
ent.ROWNUM # в качестве сущности можно указать ROWNUM
]
}
new_loader.save_x_execute()