Движок обслуживания Apache Spark#
CedrusData Catalog поддерживает выполнение операций обслуживания таблиц Iceberg с помощью Apache Spark. Движок Spark позволяет выполнять ресурсоёмкие операции обслуживания (компактификация файлов, перестроение манифестов и т.д.) в распределённом режиме, используя вычислительные ресурсы кластера.
Общая информация#
Движок обслуживания Apache Spark интегрирован в CedrusData Catalog через SPI вычислительных движков.
При запуске операции обслуживания каталог формирует задачу Spark и передаёт её на выполнение через spark-submit.
Задача выполняется в виде отдельного процесса (или пода Kubernetes), который получает необходимые параметры подключения к каталогу
и файловой системе, выполняет запрошенную операцию и возвращает результат.
Рекомендуемый режим работы — Kubernetes. В этом режиме для каждой операции обслуживания создаётся отдельный pod драйвера Spark, который автоматически удаляется после завершения задачи.
Остальные режимы работы со Spark формально поддерживаются в рамках каталога, но на данный момент не рекомендуются для промышленной эксплуатации. Их поддержка будет улучшаться в последующих релизах каталога.
Запуск каталога с поддержкой Spark#
Для использования движка обслуживания Apache Spark необходимо, чтобы на хосте каталога было доступно окружение Apache Spark.
Запуск из Docker-образа#
Для работы с движком Spark используйте специальную версию Docker-образа CedrusData Catalog со встроенным Apache Spark:
cr.yandex/crpjtvqf29mpabhmrf1s/cedrusdata-catalog:476-1-spark-3.5.8
Данный образ содержит предустановленный Apache Spark 3.5 и не требует дополнительной конфигурации SPARK_HOME.
Запуск из архива#
При запуске CedrusData Catalog из архива необходимо установить переменную окружения SPARK_HOME,
указывающую на директорию установки Apache Spark или задать параметр конфигурации каталога maintenance.engine.spark.home.
export SPARK_HOME=/path/to/spark
maintenance.engine.spark.home=/path/to/spark
Убедитесь, что версия Apache Spark совместима с версией, поддерживаемой CedrusData Catalog (в настоящее время — 3.5 и 4.0).
Kubernetes: образ для задач обслуживания#
При работе в режиме Kubernetes движок Spark создаёт поды, используя Docker-образ с предустановленным JAR-файлом задачи обслуживания.
Имя этого образа необходимо задать с помощью параметра конфигурации каталога maintenance.engine.spark.image (см. Конфигурация Iceberg).
maintenance.engine.spark.image=cr.yandex/crpjtvqf29mpabhmrf1s/cedrusdata-catalog-spark-maintenance:476-1
Управление движком Spark#
Перед запуском операций обслуживания необходимо создать экземпляр вычислительного движка типа spark.
Создание движка#
Для создания движка используйте команду CLI catalog engine create:
catalog engine create \
--engine-name=<имя> \
--type=spark \
[--property=<ключ>=<значение>]...
Опции:
--engine-name Уникальное имя экземпляра движка. Обязательный параметр.
--type Тип движка. Для Spark-движка укажите значение spark. Обязательный параметр.
--property Свойство движка в формате ключ=значение. Может быть указано несколько раз.
--description Текстовое описание движка (опционально).
Другие команды управления движками#
Команда |
Описание |
|---|---|
|
Список зарегистрированных движков |
|
Информация о движке и его свойствах |
|
Изменение свойств движка |
|
Удаление движка |
|
Проверка доступности движка |
Свойства движка#
Свойства задаются с помощью опции --property при создании или обновлении движка.
Обязательные свойства#
Свойство |
Описание |
|---|---|
|
URL мастера Spark. Определяет режим запуска. Подробнее см. Режимы запуска. |
|
URL Iceberg REST Catalog эндпоинта CedrusData Catalog, доступного со стороны драйвера Spark.
Например: |
Дополнительные свойства#
Свойство |
Описание |
|---|---|
|
Время жизни временного токена доступа к каталогу, создаваемого под конкретную операцию обслуживания; значение по умолчанию - 6 часов |
Дополнительные свойства Spark#
Движок поддерживает передачу произвольных свойств конфигурации Apache Spark с префиксом spark..
Эти свойства будут переданы в spark-submit при запуске задачи.
Примеры часто используемых свойств:
Свойство |
Описание |
|---|---|
|
Объём памяти для драйвера Spark (значение по умолчанию: |
|
Количество экземпляров исполнителей (значение по умолчанию: |
|
Объём памяти для каждого исполнителя (значение по умолчанию: |
|
Namespace Kubernetes, в котором будут создаваться поды Spark (значение по умолчанию: namespace каталога) |
|
Сервисный аккаунт, который будет использован для запуска пода драйвера (значение по умолчанию: сервисный аккаунт каталога при запуске в одном namespace или |
Режимы запуска#
Режим запуска определяется значением свойства spark.master.
Kubernetes (рекомендуемый)#
Это рекомендуемый режим для промышленной эксплуатации. В данном режиме для каждой задачи обслуживания создаётся отдельный pod драйвера Spark в кластере Kubernetes.
Значение spark.master:
При запуске каталога внутри кластера Kubernetes:
k8s(разрешается вk8s://https://kubernetes.default:443)При запуске каталога вне кластера: полный URL API-сервера, например
k8s://https://192.168.49.2:8443
Рекомендуется режим, в котором задачи обслуживания Spark запускаются в том же кластере, в котором уже развёрнут каталог.
В этом режиме достаточно указать k8s в качестве значения параметра spark.master.
Пример создания движка:
catalog engine create \
--engine-name=spark_k8s \
--type=spark \
--property=spark.master=k8s \
--property=catalog.url=http://cedrusdata-catalog:9080/catalog/iceberg
Пример с указанием namespace Kubernetes:
catalog engine create \
--engine-name=spark_k8s \
--type=spark \
--property=spark.master=k8s \
--property=catalog.url=http://cedrusdata-catalog:9080/catalog/iceberg \
--property=spark.kubernetes.namespace=spark \
--property=spark.executor.instances=2
Настройка прав доступа Kubernetes#
Для работы движка Spark в Kubernetes каталогу и драйверу Spark необходимы определённые права на создание и удаление подов и секретов.
Вариант 1 — запуск Spark в пространстве имён каталога
Если каталог и Spark работают в одном пространстве имён (например, platform), необходимо заранее создать роль и привязку роли
для сервисного аккаунта каталога.
В этом примере подразумевается, что каталог запущен с сервисным аккаунтом default в пространстве имён platform.
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
namespace: platform
name: spark-maintenance-role
rules:
- apiGroups: [""]
resources: ["pods", "services", "configmaps", "persistentvolumeclaims"]
verbs: ["get", "list", "watch", "create", "delete", "patch", "update", "deletecollection"]
- apiGroups: [""]
resources: ["pods/log"]
verbs: ["get", "list", "watch"]
- apiGroups: [""]
resources: ["secrets"]
verbs: ["create", "delete", "list"]
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
name: spark-maintenance-role-binding
namespace: platform
subjects:
- kind: ServiceAccount
name: default
namespace: platform
roleRef:
kind: Role
name: spark-maintenance-role
apiGroup: rbac.authorization.k8s.io
При работе в одном пространстве имён сервисный аккаунт каталога автоматически передаётся драйверу Spark, который использует его для создания и удаления подов исполнителей.
Пример создания движка:
catalog engine create \
--engine-name=spark_k8s \
--type=spark \
--property=spark.master=k8s \
--property=catalog.url=http://cedrusdata-catalog:9080/catalog/iceberg
Вариант 2 — запуск Spark в отдельном пространстве имён
Если каталог работает в пространстве имён platform, а Spark должен работать в пространстве имён spark,
необходимо определить кластерную роль для каталога, роль для для драйвера и несколько привязок ролей.
В этом примере подразумевается, что каталог запущен с сервисным аккаунтом default в пространстве имён platform.
В этом примере подразумевается, что драйвер будет запущен с сервисным аккаунтом defaultпространства имён spark (поведение по умолчанию).
Если это по какой-то причне не является желательным, то роль описанная ниже, должна быть привязана к другому сервисному аккауту,
а его имя должно быть передано в движок с помощью опции spark.kubernetes.authenticate.driver.serviceAccountName.
Кластерная роль для кросс-namespace доступа:
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: cross-namespace-spark-role
rules:
- apiGroups: [""]
resources: ["pods", "services", "configmaps", "persistentvolumeclaims"]
verbs: ["get", "list", "watch", "create", "delete", "patch", "update", "deletecollection"]
- apiGroups: [""]
resources: ["pods/log"]
verbs: ["get", "list", "watch"]
- apiGroups: [""]
resources: ["secrets"]
verbs: ["create", "delete", "list"]
Привязка кластерной роли для сервисного аккаунта каталога.
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
name: spark-cross-namespace-binding
namespace: spark
subjects:
- kind: ServiceAccount
name: default
namespace: platform
roleRef:
kind: ClusterRole
name: cross-namespace-spark-role
apiGroup: rbac.authorization.k8s.io
Роль для драйвера Spark в namespace spark:
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
namespace: spark
name: spark-driver-role
rules:
- apiGroups: [""]
resources: ["pods", "configmaps", "persistentvolumeclaims", "services"]
verbs: ["get", "create", "delete", "watch", "list", "deletecollection"]
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
name: spark-driver-role-binding
namespace: spark
subjects:
- kind: ServiceAccount
name: default
namespace: spark
roleRef:
kind: Role
name: spark-driver-role
apiGroup: rbac.authorization.k8s.io
Пример создания движка:
catalog engine create \
--engine-name=spark_k8s \
--type=spark \
--property=spark.master=k8s \
--property=catalog.url=http://cedrusdata-catalog.platform.svc.cluster.local:9080/catalog/iceberg \
--property=spark.kubernetes.namespace=spark
Запуск операций обслуживания#
Операции обслуживания запускаются с помощью команды CLI catalog maintenance custom:
catalog maintenance custom \
--engine-name=<движок> \
--operation-name=<операция> \
--target-catalog=<каталог> \
--target-namespace=<namespace> \
--target-object=<таблица> \
[--parameter=<ключ>=<значение>]... \
[--engine-config=<ключ>=<значение>]...
Опции:
--engine-name Имя зарегистрированного вычислительного движка. Обязательный параметр.
--operation-name Операция обслуживания для выполнения (см. Поддерживаемые операции). Обязательный параметр.
--target-catalog Имя целевого каталога Iceberg. Обязательный параметр (если не указан --target-object-group).
--target-namespace Имя целевого namespace. Обязательный параметр (если не указан --target-object-group).
--target-object Имя целевой таблицы. Обязательный параметр (если не указан --target-object-group).
--target-object-group Имя группы объектов. Альтернатива трём предыдущим параметрам.
--parameter, -p Параметр операции обслуживания в формате ключ=значение. Может быть указан несколько раз.
--engine-config Переопределение свойства конфигурации движка для данного запуска. Может быть указан несколько раз.
Пример:
catalog maintenance custom \
--engine-name=spark_k8s \
--target-catalog=ice_cat \
--target-namespace=my_schema \
--target-object=my_table \
--operation-name=expire-snapshots \
--parameter=older-than=30 \
--parameter=retain-last=5
Мониторинг операций#
Все команды запуска операций возвращают уникальный идентификатор операции, который можно использовать для отслеживания:
catalog maintenance get --operation-id=<id>
catalog maintenance list
catalog maintenance cancel --operation-id=<id>
Поддерживаемые операции#
Движок Spark поддерживает пять операций обслуживания Iceberg.
В основном, параметры операций и их семантика напрямую отражают спецификацию Spark Iceberg Maintenance
Однако, возможны минимальные корректировки, к примеру, параметр older_than в CedrusData Catalog принимает целочисленное значение в днях, а не timestamp.
rewrite-data-files#
Компактификация мелких файлов данных в более крупные для повышения производительности чтения.
Параметры:
Параметр |
Тип |
Описание |
|---|---|---|
|
string |
Стратегия перезаписи: |
|
string |
Выражение сортировки (используется со стратегией |
|
string |
Фильтр — перезаписываются только файлы, соответствующие условию |
|
long |
Целевой размер выходного файла в байтах |
|
long |
Файлы меньше указанного размера являются кандидатами на перезапись |
|
long |
Файлы больше указанного размера являются кандидатами на перезапись |
|
int |
Минимальное количество входных файлов для запуска группы перезаписи |
|
boolean |
Перезаписать все файлы независимо от размера |
|
int |
Максимальное количество одновременно перезаписываемых групп файлов |
|
long |
Максимальный суммарный размер файлов в одной группе перезаписи |
|
boolean |
Разрешить инкрементальную фиксацию результатов |
|
int |
Максимальное количество коммитов для инкрементальной фиксации |
|
int |
Максимальное количество неудачных коммитов до прерывания |
|
string |
Порядок выполнения задач перезаписи |
|
boolean |
Использовать sequence number начала операции вместо каждого коммита |
|
long |
Минимальное количество delete-файлов для кандидата на перезапись |
|
double |
Минимальное соотношение удалённых записей для кандидата на перезапись |
|
int |
ID спецификации партиционирования для выходных файлов |
|
boolean |
Удалить delete-файлы, не относящиеся ни к одному живому файлу данных |
rewrite-manifests#
Перестроение manifest-файлов таблицы для повышения производительности планирования запросов.
Параметры:
Параметр |
Тип |
Описание |
|---|---|---|
|
boolean |
Кэшировать данные таблицы во время планирования. Значение по умолчанию: |
rewrite-position-delete-files#
Компактификация position delete-файлов для уменьшения количества мелких delete-файлов и повышения производительности чтения.
Параметры:
Параметр |
Тип |
Описание |
|---|---|---|
|
string |
Фильтр — перезаписываются только delete-файлы, соответствующие условию |
|
long |
Целевой размер выходного файла в байтах |
|
long |
Файлы меньше указанного размера являются кандидатами |
|
long |
Файлы больше указанного размера являются кандидатами |
|
int |
Минимальное количество входных файлов для запуска группы перезаписи |
|
boolean |
Перезаписать все delete-файлы независимо от размера |
|
int |
Максимальное количество одновременных задач перезаписи |
|
long |
Максимальный суммарный размер файлов в одной группе |
|
int |
Максимальное общее количество delete-файлов для перезаписи |
|
boolean |
Разрешить инкрементальную фиксацию результатов |
|
int |
Максимальное количество коммитов для инкрементальной фиксации |
|
string |
Порядок выполнения задач перезаписи |
expire-snapshots#
Удаление устаревших snapshot Iceberg и файлов данных, на которые не ссылается ни один живой snapshot.
Параметры:
Параметр |
Тип |
Описание |
|---|---|---|
|
int |
файлы старше скольки дней можно удалять; значение по умолчанию — 5 |
|
int |
Минимальное количество snapshot, которое необходимо сохранить независимо от |
|
int |
Максимальное количество одновременно удаляемых файлов |
|
boolean |
Также удалять устаревшие файлы метаданных. Значение по умолчанию: |
remove-orphan-files#
Сканирование директории таблицы и удаление файлов, на которые не ссылаются метаданные Iceberg.
Параметры:
Параметр |
Тип |
Описание |
|---|---|---|
|
int |
файлы старше скольки дней можно удалять; значение по умолчанию — 5 |
|
int |
Максимальное количество одновременно удаляемых файлов |
Поддерживаемые файловые системы#
Движок Spark получает конфигурацию файловой системы из определения файловой системы CedrusData Catalog и автоматически передаёт необходимые параметры в задачу Spark.
Amazon S3 / S3-совместимые хранилища#
Поддерживаются AWS S3 и любые S3-совместимые хранилища (MinIO, Ceph и т.д.). Параметры доступа к S3 (endpoint, access key, secret key) берутся из конфигурации файловой системы каталога. Авторизация через STS на данный момент не поддерживается.
HDFS#
Поддерживается подключение к Hadoop Distributed File System.
Примечание
Механизмы аутентификации для HDFS (например, Kerberos) в настоящее время не поддерживаются.
Мониторинг статуса задач и расследование причин ошибок#
Для отслеживания статуса исполнения текущих maintenance операций в Kubernetes рекомендуется ориентироваться на следующие маркеры:
Поды драйвера и исполнителей, а также временные секреты для maintenance задач будут помечены при их старте двумя метками: cedrusdata-maintenance-operation-id и cedrusdata-maintenance-table.
Из значений этих меток можно излечь ID maintenance задачи и конкретный объект, для которого задача была запущена.
Поды драйвера и исполнителей, а также временные секреты для maintenance задач будут иметь следующий префикс в их имени: cedrusdata-spark-iceberg
Поды драйвера и исполнителей всегда удаляются после завершения задачи (как успешного, так и аварийного).
Логи драйвера всегда транслируются в лог каталога; в случае ошибки логи аварийных исполнителей также записываются в лог каталога.
В случае ошибочного завершения задачи обслуживания, в логах каталога будет содержаться подробная информация о причине ошибки, включая трассировку стека.
Также, ошибка будет отражена в статусе операции обслуживания, который можно получить с помощью команды catalog maintenance get --operation-id=<id>.
Рекомендации по использованию#
При запуске операций обслуживания с помощью движка Spark рекомендуется придерживаться следующих практик:
При создании движка, определяйте базовые параметры Spark, такие как
spark.executor.instancesиspark.executor.memory, исходя из размера таблицы и ожидаемой нагрузки.При запуске операций, если необходимо, переопределяйте параметры Spark для конкретной операции с помощью опции
--engine-config. Это позволяет гибко управлять ресурсами для каждой операции.Группируйте таблицы по размеру (больщие/средние/маленькие) и запускайте операции для групп, чтобы оптимизировать использование ресурсов и время выполнения и избежать аварийного завершения задач из-за нехватки ресурсов.
Регулируйте нагрузку на кластер Kubernetes через настройку параметра конфигурации
maintenance.max-active-operations(см. Конфигурация Iceberg)При работе с большими таблицами, которые никогда ранее не обслуживались, рекомендуется сначала запустить операцию
expire-snapshotsилиremove-orphan-filesс помощью движка Spark, чтобы удалить большое количество устаревших файлов, а затем уже запускать ресурсоёмкие операции, такие какrewrite-data-files.
Безопасность#
Передача учётных данных#
Для каждой операции обслуживания генерируется временный токен доступа к каталогу
Токен доступа и ключи S3 передаются в Kubernetes через механизм Секретов. После завершения работы операции, секреты автоматически удаляются.
Контроль доступа#
Пользователь, запускающий операцию обслуживания через движок Spark, должен обладать привилегией compute-engine.start-operation для соответствующего вычислительного движка.