Рукодоство использования CedrusData Catalog при работе через PyIceberg#
Данное руководство пример конфигурирования Apache PyIceberg и пример наполнения Iceberg-таблицы в S3.
1. Подготовка стенда#
Для выполнения примера потребуется запустить S3, CedrusData Catalog в docker-compose и установить необходимые библиотеки Python.
Убедитесь, что у вас установлен Docker и Docker Compose.
Создайте новый рабочий каталог, например
iceberg:
mkdir ~/iceberg
cd ~/iceberg
Создайте файл
docker-compose.yamlсо следующим содержимым:
services:
catalog:
image: cr.yandex/crpjtvqf29mpabhmrf1s/cedrusdata-catalog:476-1
container_name: catalog
ports:
- "9080:9080"
minio:
image: minio/minio:latest
container_name: minio
command: server /data --console-address ":9001"
ports:
- "9000:9000"
- "9001:9001"
environment:
MINIO_ROOT_USER: minioadmin
MINIO_ROOT_PASSWORD: minioadmin
minio-init:
image: minio/mc:latest
depends_on:
- minio
environment:
MC_HOST_s3: "http://minioadmin:minioadmin@minio:9000"
entrypoint: ["mc", "mb", "-p", "s3/mybucket"]
restart: "no"
Запустите контейнеры командой:
docker compose up -d --waitПроверьте что Catalog запущен успешно и бакет создан:
docker logs catalog
# expected: ... ======== SERVER STARTED ========
docker run --rm --network container:minio -e MC_HOST_s3="http://minioadmin:minioadmin@127.0.0.1:9000" minio/mc ls s3
# expected: [.... UTC] 0B mybucket/
Выполните инициализацию Catalog:
docker exec -it catalog /bin/catalog file-system create --file-system-name s3_fs --type s3 -p endpoint=http://minio:9000 -p access-key=minioadmin -p secret-key=minioadmin -p path-style-access=true
docker exec -it catalog /bin/catalog iceberg catalog create --catalog-name ice_cat --file-system-name s3_fs --file-system-location s3://mybucket
docker exec -it catalog /bin/catalog principal create --principal-name admin
docker exec -it catalog /bin/catalog security grant-role --principal-name admin --role builtin.iceberg.admin
Сохраните административный токен доступа к Catalog (потребуется позже)
docker exec -it catalog cat /data/catalog/catalog-admin-access-token
2. Подготовка окружения Python#
Убедитесь что у вас установлен Pythin версии не ниже 3.10
Установите пакеты зависимотей:
pip install "pyiceberg[pyarrow,s3fs]" pyarrow
3. Пример работы с Iceberg и Catalog через библиотеку pyIceberg#
В данном примере будет создан новый неймспейс s3_sbx и таблица test_ice в нём.
Создайте файл
ice.pyс содержимым:
В файле укажите корректное значение для опции "token" (полученное ранее из файла catalog-admin-access-token).
import pyarrow as pa
from pyiceberg.catalog import load_catalog
from pyiceberg.exceptions import NamespaceAlreadyExistsError
from datetime import datetime, UTC
table_id = f"s3_sbx.test_ice"
catalog = load_catalog(
"ice_cat",
**{
"type": "rest",
"uri": "http://localhost:9080/catalog/iceberg",
"warehouse": "ice_cat",
"token": "<put-your-token-here>",
"header.X-Iceberg-Access-Delegation": "", # должно быть пустым
"py-io-impl": "pyiceberg.io.pyarrow.PyArrowFileIO",
"s3.endpoint": "http://localhost:9000",
"s3.access-key-id": "minioadmin",
"s3.secret-access-key": "minioadmin",
"s3.path-style-access": "true",
"s3.region": "us-east-1",
}
)
try:
catalog.create_namespace("s3_sbx")
except NamespaceAlreadyExistsError:
print(f"Namespace already exists")
arrow_schema = pa.schema([
pa.field("event_id", pa.int64(), nullable=False),
pa.field("event_type", pa.string(), nullable=False),
pa.field("ts", pa.timestamp("us"), nullable=False),
])
try:
table = catalog.load_table(table_id)
except Exception:
table = catalog.create_table(
identifier=table_id,
schema=arrow_schema,
)
arrow_tbl = pa.Table.from_pylist(
[
{"event_id": 1, "event_type": "click", "ts": datetime.now(UTC)},
{"event_id": 2, "event_type": "view", "ts": datetime.now(UTC)},
],
schema=arrow_schema
)
table.append(arrow_tbl)
print("Done: Appended", arrow_tbl.num_rows, "rows into table", table_id)
Запустите код командой:
python3 ice.py
4. Остановите запущенные контейнеры#
docker compose down