Рукодоство использования CedrusData Catalog при работе через PyIceberg#

Данное руководство пример конфигурирования Apache PyIceberg и пример наполнения Iceberg-таблицы в S3.

1. Подготовка стенда#

Для выполнения примера потребуется запустить S3, CedrusData Catalog в docker-compose и установить необходимые библиотеки Python.

  1. Убедитесь, что у вас установлен Docker и Docker Compose.

  2. Создайте новый рабочий каталог, например iceberg:

mkdir ~/iceberg
cd ~/iceberg
  1. Создайте файл docker-compose.yaml со следующим содержимым:

services:
  catalog:
    image: cr.yandex/crpjtvqf29mpabhmrf1s/cedrusdata-catalog:476-1
    container_name: catalog
    ports:
      - "9080:9080"

  minio:
    image: minio/minio:latest
    container_name: minio
    command: server /data --console-address ":9001"
    ports:
      - "9000:9000"
      - "9001:9001"
    environment:
      MINIO_ROOT_USER: minioadmin
      MINIO_ROOT_PASSWORD: minioadmin

  minio-init:
    image: minio/mc:latest
    depends_on:
      - minio
    environment:
      MC_HOST_s3: "http://minioadmin:minioadmin@minio:9000"
    entrypoint: ["mc", "mb", "-p", "s3/mybucket"]
    restart: "no"
  1. Запустите контейнеры командой: docker compose up -d --wait

  2. Проверьте что Catalog запущен успешно и бакет создан:

docker logs catalog
# expected: ... ======== SERVER STARTED ========

docker run --rm --network container:minio -e MC_HOST_s3="http://minioadmin:minioadmin@127.0.0.1:9000" minio/mc ls s3
# expected: [.... UTC]    0B mybucket/
  1. Выполните инициализацию Catalog:

docker exec -it catalog /bin/catalog file-system create --file-system-name s3_fs --type s3 -p endpoint=http://minio:9000 -p access-key=minioadmin -p secret-key=minioadmin -p path-style-access=true
docker exec -it catalog /bin/catalog iceberg catalog create --catalog-name ice_cat --file-system-name s3_fs --file-system-location s3://mybucket
docker exec -it catalog /bin/catalog principal create --principal-name admin
docker exec -it catalog /bin/catalog security grant-role --principal-name admin --role builtin.iceberg.admin
  1. Сохраните административный токен доступа к Catalog (потребуется позже)

docker exec -it catalog cat /data/catalog/catalog-admin-access-token

2. Подготовка окружения Python#

  1. Убедитесь что у вас установлен Pythin версии не ниже 3.10

  2. Установите пакеты зависимотей:

pip install "pyiceberg[pyarrow,s3fs]" pyarrow

3. Пример работы с Iceberg и Catalog через библиотеку pyIceberg#

В данном примере будет создан новый неймспейс s3_sbx и таблица test_ice в нём.

  1. Создайте файл ice.py с содержимым:

В файле укажите корректное значение для опции "token" (полученное ранее из файла catalog-admin-access-token).

import pyarrow as pa
from pyiceberg.catalog import load_catalog
from pyiceberg.exceptions import NamespaceAlreadyExistsError
from datetime import datetime, UTC

table_id = f"s3_sbx.test_ice"

catalog = load_catalog(
    "ice_cat",
    **{
        "type": "rest",
        "uri": "http://localhost:9080/catalog/iceberg",

        "warehouse": "ice_cat",

        "token": "<put-your-token-here>",
        "header.X-Iceberg-Access-Delegation": "",  # должно быть пустым

        "py-io-impl": "pyiceberg.io.pyarrow.PyArrowFileIO",

        "s3.endpoint": "http://localhost:9000",
        "s3.access-key-id": "minioadmin",
        "s3.secret-access-key": "minioadmin",
        "s3.path-style-access": "true",
        "s3.region": "us-east-1",
    }
)

try:
    catalog.create_namespace("s3_sbx")
except NamespaceAlreadyExistsError:
    print(f"Namespace already exists")

arrow_schema = pa.schema([
    pa.field("event_id", pa.int64(), nullable=False),
    pa.field("event_type", pa.string(), nullable=False),
    pa.field("ts", pa.timestamp("us"), nullable=False),
])

try:
    table = catalog.load_table(table_id)
except Exception:
    table = catalog.create_table(
        identifier=table_id,
        schema=arrow_schema,
    )

arrow_tbl = pa.Table.from_pylist(
    [
        {"event_id": 1, "event_type": "click", "ts": datetime.now(UTC)},
        {"event_id": 2, "event_type": "view",  "ts": datetime.now(UTC)},
    ],
    schema=arrow_schema
)

table.append(arrow_tbl)

print("Done: Appended", arrow_tbl.num_rows, "rows into table", table_id)
  1. Запустите код командой:

python3 ice.py

4. Остановите запущенные контейнеры#

docker compose down