Движок обслуживания Apache Spark#

CedrusData Catalog поддерживает выполнение операций обслуживания таблиц Iceberg с помощью Apache Spark. Движок Spark позволяет выполнять ресурсоёмкие операции обслуживания (компактификация файлов, перестроение манифестов и т.д.) в распределённом режиме, используя вычислительные ресурсы кластера.

Общая информация#

Движок обслуживания Apache Spark интегрирован в CedrusData Catalog через SPI вычислительных движков. При запуске операции обслуживания каталог формирует задачу Spark и передаёт её на выполнение через spark-submit. Задача выполняется в виде отдельного процесса (или пода Kubernetes), который получает необходимые параметры подключения к каталогу и файловой системе, выполняет запрошенную операцию и возвращает результат.

Рекомендуемый режим работы — Kubernetes. В этом режиме для каждой операции обслуживания создаётся отдельный pod драйвера Spark, который автоматически удаляется после завершения задачи.

Остальные режимы работы со Spark формально поддерживаются в рамках каталога, но на данный момент не рекомендуются для промышленной эксплуатации. Их поддержка будет улучшаться в последующих релизах каталога.

Запуск каталога с поддержкой Spark#

Для использования движка обслуживания Apache Spark необходимо, чтобы на хосте каталога было доступно окружение Apache Spark.

Запуск из Docker-образа#

Для работы с движком Spark используйте специальную версию Docker-образа CedrusData Catalog со встроенным Apache Spark:

cr.yandex/crpjtvqf29mpabhmrf1s/cedrusdata-catalog:476-1-spark-3.5.8

Данный образ содержит предустановленный Apache Spark 3.5 и не требует дополнительной конфигурации SPARK_HOME.

Запуск из архива#

При запуске CedrusData Catalog из архива необходимо установить переменную окружения SPARK_HOME, указывающую на директорию установки Apache Spark или задать параметр конфигурации каталога maintenance.engine.spark.home.

export SPARK_HOME=/path/to/spark
maintenance.engine.spark.home=/path/to/spark

Убедитесь, что версия Apache Spark совместима с версией, поддерживаемой CedrusData Catalog (в настоящее время — 3.5 и 4.0).

Kubernetes: образ для задач обслуживания#

При работе в режиме Kubernetes движок Spark создаёт поды, используя Docker-образ с предустановленным JAR-файлом задачи обслуживания. Имя этого образа необходимо задать с помощью параметра конфигурации каталога maintenance.engine.spark.image (см. Конфигурация Iceberg).

maintenance.engine.spark.image=cr.yandex/crpjtvqf29mpabhmrf1s/cedrusdata-catalog-spark-maintenance:476-1

Управление движком Spark#

Перед запуском операций обслуживания необходимо создать экземпляр вычислительного движка типа spark.

Создание движка#

Для создания движка используйте команду CLI catalog engine create:

catalog engine create \
  --engine-name=<имя> \
  --type=spark \
  [--property=<ключ>=<значение>]...

Опции:

--engine-name Уникальное имя экземпляра движка. Обязательный параметр.

--type Тип движка. Для Spark-движка укажите значение spark. Обязательный параметр.

--property Свойство движка в формате ключ=значение. Может быть указано несколько раз.

--description Текстовое описание движка (опционально).

Другие команды управления движками#

Команда

Описание

catalog engine list

Список зарегистрированных движков

catalog engine get --engine-name=<имя>

Информация о движке и его свойствах

catalog engine update --engine-name=<имя> ...

Изменение свойств движка

catalog engine delete --engine-name=<имя>

Удаление движка

catalog engine check --engine-name=<имя>

Проверка доступности движка

Свойства движка#

Свойства задаются с помощью опции --property при создании или обновлении движка.

Обязательные свойства#

Свойство

Описание

spark.master

URL мастера Spark. Определяет режим запуска. Подробнее см. Режимы запуска.

catalog.url

URL Iceberg REST Catalog эндпоинта CedrusData Catalog, доступного со стороны драйвера Spark. Например: http://catalog:9080/catalog/iceberg.

Дополнительные свойства#

Свойство

Описание

catalog.token.ttl

Время жизни временного токена доступа к каталогу, создаваемого под конкретную операцию обслуживания; значение по умолчанию - 6 часов

Дополнительные свойства Spark#

Движок поддерживает передачу произвольных свойств конфигурации Apache Spark с префиксом spark.. Эти свойства будут переданы в spark-submit при запуске задачи.

Примеры часто используемых свойств:

Свойство

Описание

spark.driver.memory

Объём памяти для драйвера Spark (значение по умолчанию: 1g)

spark.executor.instances

Количество экземпляров исполнителей (значение по умолчанию: 2)

spark.executor.memory

Объём памяти для каждого исполнителя (значение по умолчанию: 1g)

spark.kubernetes.namespace

Namespace Kubernetes, в котором будут создаваться поды Spark (значение по умолчанию: namespace каталога)

spark.kubernetes.authenticate.driver.serviceAccountName

Сервисный аккаунт, который будет использован для запуска пода драйвера (значение по умолчанию: сервисный аккаунт каталога при запуске в одном namespace или default при запуске в другом namespace)

Режимы запуска#

Режим запуска определяется значением свойства spark.master.

Kubernetes (рекомендуемый)#

Это рекомендуемый режим для промышленной эксплуатации. В данном режиме для каждой задачи обслуживания создаётся отдельный pod драйвера Spark в кластере Kubernetes.

Значение spark.master:

  • При запуске каталога внутри кластера Kubernetes: k8s (разрешается в k8s://https://kubernetes.default:443)

  • При запуске каталога вне кластера: полный URL API-сервера, например k8s://https://192.168.49.2:8443

Рекомендуется режим, в котором задачи обслуживания Spark запускаются в том же кластере, в котором уже развёрнут каталог. В этом режиме достаточно указать k8s в качестве значения параметра spark.master.

Пример создания движка:

catalog engine create \
  --engine-name=spark_k8s \
  --type=spark \
  --property=spark.master=k8s \
  --property=catalog.url=http://cedrusdata-catalog:9080/catalog/iceberg

Пример с указанием namespace Kubernetes:

catalog engine create \
  --engine-name=spark_k8s \
  --type=spark \
  --property=spark.master=k8s \
  --property=catalog.url=http://cedrusdata-catalog:9080/catalog/iceberg \
  --property=spark.kubernetes.namespace=spark \
  --property=spark.executor.instances=2

Настройка прав доступа Kubernetes#

Для работы движка Spark в Kubernetes каталогу и драйверу Spark необходимы определённые права на создание и удаление подов и секретов.

Вариант 1 — запуск Spark в пространстве имён каталога

Если каталог и Spark работают в одном пространстве имён (например, platform), необходимо заранее создать роль и привязку роли для сервисного аккаунта каталога.

В этом примере подразумевается, что каталог запущен с сервисным аккаунтом default в пространстве имён platform.

apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
    namespace: platform
    name: spark-maintenance-role
rules:
    - apiGroups: [""]
      resources: ["pods", "services", "configmaps", "persistentvolumeclaims"]
      verbs: ["get", "list", "watch", "create", "delete", "patch", "update", "deletecollection"]
    - apiGroups: [""]
      resources: ["pods/log"]
      verbs: ["get", "list", "watch"]
    - apiGroups: [""]
      resources: ["secrets"]
      verbs: ["create", "delete", "list"]
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
    name: spark-maintenance-role-binding
    namespace: platform
subjects:
    - kind: ServiceAccount
      name: default
      namespace: platform
roleRef:
    kind: Role
    name: spark-maintenance-role
    apiGroup: rbac.authorization.k8s.io

При работе в одном пространстве имён сервисный аккаунт каталога автоматически передаётся драйверу Spark, который использует его для создания и удаления подов исполнителей.

Пример создания движка:

catalog engine create \
  --engine-name=spark_k8s \
  --type=spark \
  --property=spark.master=k8s \
  --property=catalog.url=http://cedrusdata-catalog:9080/catalog/iceberg

Вариант 2 — запуск Spark в отдельном пространстве имён

Если каталог работает в пространстве имён platform, а Spark должен работать в пространстве имён spark, необходимо определить кластерную роль для каталога, роль для для драйвера и несколько привязок ролей.

В этом примере подразумевается, что каталог запущен с сервисным аккаунтом default в пространстве имён platform.

В этом примере подразумевается, что драйвер будет запущен с сервисным аккаунтом defaultпространства имён spark (поведение по умолчанию). Если это по какой-то причне не является желательным, то роль описанная ниже, должна быть привязана к другому сервисному аккауту, а его имя должно быть передано в движок с помощью опции spark.kubernetes.authenticate.driver.serviceAccountName.

Кластерная роль для кросс-namespace доступа:

apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
    name: cross-namespace-spark-role
rules:
    - apiGroups: [""]
      resources: ["pods", "services", "configmaps", "persistentvolumeclaims"]
      verbs: ["get", "list", "watch", "create", "delete", "patch", "update", "deletecollection"]
    - apiGroups: [""]
      resources: ["pods/log"]
      verbs: ["get", "list", "watch"]
    - apiGroups: [""]
      resources: ["secrets"]
      verbs: ["create", "delete", "list"]

Привязка кластерной роли для сервисного аккаунта каталога.

apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
    name: spark-cross-namespace-binding
    namespace: spark
subjects:
    - kind: ServiceAccount
      name: default
      namespace: platform
roleRef:
    kind: ClusterRole
    name: cross-namespace-spark-role
    apiGroup: rbac.authorization.k8s.io

Роль для драйвера Spark в namespace spark:

apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
    namespace: spark
    name: spark-driver-role
rules:
    - apiGroups: [""]
      resources: ["pods", "configmaps", "persistentvolumeclaims", "services"]
      verbs: ["get", "create", "delete", "watch", "list", "deletecollection"]
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
    name: spark-driver-role-binding
    namespace: spark
subjects:
    - kind: ServiceAccount
      name: default
      namespace: spark
roleRef:
    kind: Role
    name: spark-driver-role
    apiGroup: rbac.authorization.k8s.io

Пример создания движка:

catalog engine create \
  --engine-name=spark_k8s \
  --type=spark \
  --property=spark.master=k8s \
  --property=catalog.url=http://cedrusdata-catalog.platform.svc.cluster.local:9080/catalog/iceberg \
  --property=spark.kubernetes.namespace=spark

Запуск операций обслуживания#

Операции обслуживания запускаются с помощью команды CLI catalog maintenance custom:

catalog maintenance custom \
  --engine-name=<движок> \
  --operation-name=<операция> \
  --target-catalog=<каталог> \
  --target-namespace=<namespace> \
  --target-object=<таблица> \
  [--parameter=<ключ>=<значение>]... \
  [--engine-config=<ключ>=<значение>]...

Опции:

--engine-name Имя зарегистрированного вычислительного движка. Обязательный параметр.

--operation-name Операция обслуживания для выполнения (см. Поддерживаемые операции). Обязательный параметр.

--target-catalog Имя целевого каталога Iceberg. Обязательный параметр (если не указан --target-object-group).

--target-namespace Имя целевого namespace. Обязательный параметр (если не указан --target-object-group).

--target-object Имя целевой таблицы. Обязательный параметр (если не указан --target-object-group).

--target-object-group Имя группы объектов. Альтернатива трём предыдущим параметрам.

--parameter, -p Параметр операции обслуживания в формате ключ=значение. Может быть указан несколько раз.

--engine-config Переопределение свойства конфигурации движка для данного запуска. Может быть указан несколько раз.

Пример:

catalog maintenance custom \
  --engine-name=spark_k8s \
  --target-catalog=ice_cat \
  --target-namespace=my_schema \
  --target-object=my_table \
  --operation-name=expire-snapshots \
  --parameter=older-than=30 \
  --parameter=retain-last=5

Мониторинг операций#

Все команды запуска операций возвращают уникальный идентификатор операции, который можно использовать для отслеживания:

catalog maintenance get --operation-id=<id>
catalog maintenance list
catalog maintenance cancel --operation-id=<id>

Поддерживаемые операции#

Движок Spark поддерживает пять операций обслуживания Iceberg.

В основном, параметры операций и их семантика напрямую отражают спецификацию Spark Iceberg Maintenance

Однако, возможны минимальные корректировки, к примеру, параметр older_than в CedrusData Catalog принимает целочисленное значение в днях, а не timestamp.

rewrite-data-files#

Компактификация мелких файлов данных в более крупные для повышения производительности чтения.

Параметры:

Параметр

Тип

Описание

strategy

string

Стратегия перезаписи: binpack (по умолчанию) или sort

sort-order

string

Выражение сортировки (используется со стратегией sort)

where

string

Фильтр — перезаписываются только файлы, соответствующие условию

target-file-size-bytes

long

Целевой размер выходного файла в байтах

min-file-size-bytes

long

Файлы меньше указанного размера являются кандидатами на перезапись

max-file-size-bytes

long

Файлы больше указанного размера являются кандидатами на перезапись

min-input-files

int

Минимальное количество входных файлов для запуска группы перезаписи

rewrite-all

boolean

Перезаписать все файлы независимо от размера

max-concurrent-file-group-rewrites

int

Максимальное количество одновременно перезаписываемых групп файлов

max-file-group-size-bytes

long

Максимальный суммарный размер файлов в одной группе перезаписи

partial-progress.enabled

boolean

Разрешить инкрементальную фиксацию результатов

partial-progress.max-commits

int

Максимальное количество коммитов для инкрементальной фиксации

partial-progress-max-failed-commits

int

Максимальное количество неудачных коммитов до прерывания

rewrite-job-order

string

Порядок выполнения задач перезаписи

use-starting-sequence-number

boolean

Использовать sequence number начала операции вместо каждого коммита

delete-file-threshold

long

Минимальное количество delete-файлов для кандидата на перезапись

delete-ratio-threshold

double

Минимальное соотношение удалённых записей для кандидата на перезапись

output-spec-id

int

ID спецификации партиционирования для выходных файлов

remove-dangling-deletes

boolean

Удалить delete-файлы, не относящиеся ни к одному живому файлу данных

rewrite-manifests#

Перестроение manifest-файлов таблицы для повышения производительности планирования запросов.

Параметры:

Параметр

Тип

Описание

use-caching

boolean

Кэшировать данные таблицы во время планирования. Значение по умолчанию: true

rewrite-position-delete-files#

Компактификация position delete-файлов для уменьшения количества мелких delete-файлов и повышения производительности чтения.

Параметры:

Параметр

Тип

Описание

where

string

Фильтр — перезаписываются только delete-файлы, соответствующие условию

target-file-size-bytes

long

Целевой размер выходного файла в байтах

min-file-size-bytes

long

Файлы меньше указанного размера являются кандидатами

max-file-size-bytes

long

Файлы больше указанного размера являются кандидатами

min-input-files

int

Минимальное количество входных файлов для запуска группы перезаписи

rewrite-all

boolean

Перезаписать все delete-файлы независимо от размера

max-concurrent-file-group-rewrites

int

Максимальное количество одновременных задач перезаписи

max-file-group-size-bytes

long

Максимальный суммарный размер файлов в одной группе

max-files-to-rewrite

int

Максимальное общее количество delete-файлов для перезаписи

partial-progress-enabled

boolean

Разрешить инкрементальную фиксацию результатов

partial-progress-max-commits

int

Максимальное количество коммитов для инкрементальной фиксации

rewrite-job-order

string

Порядок выполнения задач перезаписи

expire-snapshots#

Удаление устаревших snapshot Iceberg и файлов данных, на которые не ссылается ни один живой snapshot.

Параметры:

Параметр

Тип

Описание

older-than

int

файлы старше скольки дней можно удалять; значение по умолчанию — 5

retain-last

int

Минимальное количество snapshot, которое необходимо сохранить независимо от older-than

max-concurrent-deletes

int

Максимальное количество одновременно удаляемых файлов

clean-expired-metadata

boolean

Также удалять устаревшие файлы метаданных. Значение по умолчанию: true

remove-orphan-files#

Сканирование директории таблицы и удаление файлов, на которые не ссылаются метаданные Iceberg.

Параметры:

Параметр

Тип

Описание

older-than

int

файлы старше скольки дней можно удалять; значение по умолчанию — 5

max-concurrent-deletes

int

Максимальное количество одновременно удаляемых файлов

Поддерживаемые файловые системы#

Движок Spark получает конфигурацию файловой системы из определения файловой системы CedrusData Catalog и автоматически передаёт необходимые параметры в задачу Spark.

Amazon S3 / S3-совместимые хранилища#

Поддерживаются AWS S3 и любые S3-совместимые хранилища (MinIO, Ceph и т.д.). Параметры доступа к S3 (endpoint, access key, secret key) берутся из конфигурации файловой системы каталога. Авторизация через STS на данный момент не поддерживается.

HDFS#

Поддерживается подключение к Hadoop Distributed File System.

Примечание

Механизмы аутентификации для HDFS (например, Kerberos) в настоящее время не поддерживаются.

Мониторинг статуса задач и расследование причин ошибок#

Для отслеживания статуса исполнения текущих maintenance операций в Kubernetes рекомендуется ориентироваться на следующие маркеры:

Поды драйвера и исполнителей, а также временные секреты для maintenance задач будут помечены при их старте двумя метками: cedrusdata-maintenance-operation-id и cedrusdata-maintenance-table.

Из значений этих меток можно излечь ID maintenance задачи и конкретный объект, для которого задача была запущена.

Поды драйвера и исполнителей, а также временные секреты для maintenance задач будут иметь следующий префикс в их имени: cedrusdata-spark-iceberg

Поды драйвера и исполнителей всегда удаляются после завершения задачи (как успешного, так и аварийного).

Логи драйвера всегда транслируются в лог каталога; в случае ошибки логи аварийных исполнителей также записываются в лог каталога.

В случае ошибочного завершения задачи обслуживания, в логах каталога будет содержаться подробная информация о причине ошибки, включая трассировку стека.

Также, ошибка будет отражена в статусе операции обслуживания, который можно получить с помощью команды catalog maintenance get --operation-id=<id>.

Рекомендации по использованию#

При запуске операций обслуживания с помощью движка Spark рекомендуется придерживаться следующих практик:

  • При создании движка, определяйте базовые параметры Spark, такие как spark.executor.instances и spark.executor.memory, исходя из размера таблицы и ожидаемой нагрузки.

  • При запуске операций, если необходимо, переопределяйте параметры Spark для конкретной операции с помощью опции --engine-config. Это позволяет гибко управлять ресурсами для каждой операции.

  • Группируйте таблицы по размеру (больщие/средние/маленькие) и запускайте операции для групп, чтобы оптимизировать использование ресурсов и время выполнения и избежать аварийного завершения задач из-за нехватки ресурсов.

  • Регулируйте нагрузку на кластер Kubernetes через настройку параметра конфигурации maintenance.max-active-operations (см. Конфигурация Iceberg)

  • При работе с большими таблицами, которые никогда ранее не обслуживались, рекомендуется сначала запустить операцию expire-snapshots или remove-orphan-files с помощью движка Spark, чтобы удалить большое количество устаревших файлов, а затем уже запускать ресурсоёмкие операции, такие как rewrite-data-files.

Безопасность#

Передача учётных данных#

  • Для каждой операции обслуживания генерируется временный токен доступа к каталогу

  • Токен доступа и ключи S3 передаются в Kubernetes через механизм Секретов. После завершения работы операции, секреты автоматически удаляются.

Контроль доступа#

Пользователь, запускающий операцию обслуживания через движок Spark, должен обладать привилегией compute-engine.start-operation для соответствующего вычислительного движка.