Delta Lake connector#
Note
The original Trino documentation is reproduced below. A Russian translation with additional practical examples is planned.
The Delta Lake connector allows querying data stored in Delta Lake format, including Databricks Delta Lake. It can natively read the Delta transaction log, and thus detect when external systems change data.
Requirements#
To connect to Databricks Delta Lake, you need:
Tables written by Databricks Runtime 7.3 LTS, 9.1 LTS, 10.4 LTS and 11.3 LTS are supported.
Deployments using AWS, HDFS, Azure Storage, and Google Cloud Storage (GCS) are fully supported.
Network access from the coordinator and workers to the Delta Lake storage.
Access to the Hive metastore service (HMS) of Delta Lake or a separate HMS.
Network access to the HMS from the coordinator and workers. Port 9083 is the default port for the Thrift protocol used by the HMS.
Configuration#
The connector requires a Hive metastore for table metadata and supports the same metastore configuration properties as the Hive connector. At a minimum, hive.metastore.uri must be configured.
The connector recognizes Delta tables created in the metastore by the Databricks runtime. If non-Delta tables are also present in the metastore, they are not visible to the connector.
To configure the Delta Lake connector, create a catalog properties file etc/catalog/example.properties that references the delta-lake connector. Update the hive.metastore.uri with the URI of your Hive metastore Thrift service:
connector.name=delta-lake
hive.metastore.uri=thrift://example.net:9083
If you are using AWS Glue as Hive metastore, you can simply set the metastore to glue:
connector.name=delta-lake
hive.metastore=glue
The Delta Lake connector reuses certain functionalities from the Hive connector, including the metastore Thrift and Glue configuration, detailed in the Hive connector documentation.
To configure access to S3 and S3-compatible storage, Azure storage, and others, consult the appropriate section of the Hive documentation.
Configuration properties#
The following configuration properties all have reasonable, tested default values. Typical usage does not require you to configure them.
Property name |
Description |
Default |
---|---|---|
|
Frequency of checks for metadata updates, equivalent to transactions, to update the metadata cache specified in duration. |
|
|
The maximum number of Delta table metadata entries to cache. |
1000 |
|
Amount of memory allocated for caching information about files. Needs
to be specified in data size values such as |
|
|
Caching duration for active files which correspond to the Delta Lake tables. |
|
|
The compression codec to be used when writing new data files. Possible values are
|
|
|
Maximum number of partitions per writer. |
100 |
|
Hide information about tables that are not managed by Delta Lake. Hiding applies only to tables whose metadata is managed in a Glue catalog; it does not apply when a Hive metastore service is used. |
|
|
Enable write support for all supported file systems. Take particular note of the warning about concurrency and checkpoints. |
|
|
Default integer count to write transaction log checkpoint entries. If
the value is set to N, then checkpoints are written after every Nth
statement performing table writes. The value can be overridden for a
specific table with the |
10 |
|
Name of the catalog to which |
|
|
Enable writing row statistics to checkpoint files. |
|
|
Duration to wait for completion of dynamic filtering during split generation. |
|
|
Enables table statistics for performance improvements. |
|
|
Maximum number of metastore data objects per transaction in the Hive metastore cache. |
|
|
Whether schema locations should be deleted when Trino can’t determine whether they contain external files. |
|
|
Time zone for Parquet read and write. |
JVM default |
|
Target maximum size of written files; the actual size may be larger. |
|
|
Use randomized, unique table locations. |
|
|
Enable to allow users to call the |
|
The following table describes performance tuning catalog properties for the connector.
Warning
Performance tuning configuration properties are considered expert-level features. Altering these properties from their default values is likely to cause instability and performance degradation. We strongly suggest that you use them only to address non-trivial performance issues, and that you keep a backup of the original values if you change them.
Property name |
Description |
Default |
---|---|---|
|
Minimum size of query predicates above which Trino compacts the predicates. Pushing a large list of predicates down to the data source can compromise performance. For optimization in that situation, Trino can compact the large predicates. If necessary, adjust the threshold to ensure a balance between performance and predicate pushdown. |
100 |
|
The target number of buffered splits for each table scan in a query, before the scheduler tries to pause. |
1000 |
|
Sets the maximum number of splits used per second to access underlying storage. Reduce this number if your limit is routinely exceeded, based on your filesystem limits. This is set to the absolute maximum value, which results in Trino maximizing the parallelization of data access by default. Attempting to set it higher results in Trino not being able to start. |
Integer.MAX_VALUE |
|
For each query, the coordinator assigns file sections to read first
at the |
200 |
|
Sets the initial data size for a single read section
assigned to a worker until |
|
|
Sets the largest data size for a single read section
assigned to a worker after max-initial-splits have been processed. You
can also use the corresponding catalog session property
|
|
|
A decimal value in the range (0, 1] used as a minimum for weights assigned to each split. A low value may improve performance on tables with small files. A higher value may improve performance for queries with highly skewed aggregations or joins. |
0.05 |
|
Sets the maximum number of rows read in a batch. |
|
|
Whether the optimized writer should be used when writing Parquet files.
The equivalent catalog session property is
|
|
|
Whether batched column readers should be used when reading Parquet files
for improved performance. Set this property to |
|
The following table describes catalog session properties supported by the Delta Lake connector to configure processing of Parquet files.
Property name |
Description |
Default |
---|---|---|
|
Whether the optimized writer should be used when writing Parquet files. |
|
|
Whether batched column readers should be used when reading Parquet files for improved performance. |
|
|
The maximum block size used when reading Parquet files. |
|
|
The maximum block size created by the Parquet writer. |
|
|
The maximum page size created by the Parquet writer. |
|
|
Maximum number of rows processed by the Parquet writer in a batch. |
|
Authorization checks#
You can enable authorization checks for the connector by setting the delta.security property in the catalog properties file. This property must be one of the following values:
Property value |
Description |
---|---|
|
No authorization checks are enforced. |
|
The connector relies on system-level access control. |
|
Operations that read data or metadata, such as SELECT, are permitted. No operations that write data or metadata, such as CREATE TABLE, INSERT, or DELETE, are allowed. |
|
Authorization checks are enforced using a catalog-level access control
configuration file whose path is specified in the |
Type mapping#
Because Trino and Delta Lake each support types that the other does not, this connector modifies some types when reading or writing data. Data types may not map the same way in both directions between Trino and the data source. Refer to the following sections for type mapping in each direction.
See the Delta Transaction Log specification for more information about supported data types in the Delta Lake table format specification.
Delta Lake to Trino type mapping#
The connector maps Delta Lake types to the corresponding Trino types following this table:
Delta Lake type |
Trino type |
---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
No other types are supported.
Trino to Delta Lake type mapping#
The connector maps Trino types to the corresponding Delta Lake types following this table:
Trino type |
Delta Lake type |
---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
No other types are supported.
Table redirection#
Trino offers the possibility to transparently redirect operations on an existing table to the appropriate catalog based on the format of the table and catalog configuration.
In the context of connectors which depend on a metastore service (for example, the Hive connector, Iceberg connector, and Delta Lake connector), the metastore (Hive metastore service, AWS Glue Data Catalog) can be used to accommodate tables with different table formats. Therefore, a metastore database can hold a variety of tables with different table formats.
As a concrete example, let’s use the following simple scenario which makes use of table redirection:
USE example.example_schema;
EXPLAIN SELECT * FROM example_table;
Query Plan
-------------------------------------------------------------------------
Fragment 0 [SOURCE]
...
Output[columnNames = [...]]
│ ...
└─ TableScan[table = another_catalog:example_schema:example_table]
...
The output of the EXPLAIN statement points out the actual catalog which is handling the SELECT query over the table example_table.
Table redirection also works when using fully qualified names for the tables:
EXPLAIN SELECT * FROM example.example_schema.example_table;
Query Plan
-------------------------------------------------------------------------
Fragment 0 [SOURCE]
...
Output[columnNames = [...]]
│ ...
└─ TableScan[table = another_catalog:example_schema:example_table]
...
Trino offers table redirection support for the following operations:
Table read operations
Table write operations
Table management operations
Trino does not offer view redirection support.
The connector supports redirection from Delta Lake tables to Hive tables with the delta.hive-catalog-name catalog configuration property.
SQL support#
The connector provides read and write access to data and metadata in Delta Lake. In addition to the globally available and read operation statements, the connector supports the following features:
DML, see also Updating data
CREATE SCHEMA, see also Creating schemas
CREATE TABLE, see also Creating tables
ALTER TABLE EXECUTE#
The connector supports the following commands for use with ALTER TABLE EXECUTE.
optimize#
The optimize command rewrites the content of the specified table so that it is merged into fewer but larger files. If the table is partitioned, the data compaction acts separately on each partition selected for optimization. This operation improves read performance.
All files with a size below the optional file_size_threshold parameter (default value for the threshold is 100MB) are merged:
ALTER TABLE test_table EXECUTE optimize
The following statement merges files in a table that are under 10 megabytes in size:
ALTER TABLE test_table EXECUTE optimize(file_size_threshold => '10MB')
You can use a WHERE clause with the columns used to partition the table, to filter which partitions are optimized:
ALTER TABLE test_partitioned_table EXECUTE optimize
WHERE partition_key = 1
Special columns#
In addition to the defined columns, the Delta Lake connector automatically exposes metadata in a number of hidden columns in each table. You can use these columns in your SQL statements like any other column. For example, they can be selected directly or used in conditional statements.
$path
Full file system path name of the file for this row.
$file_modified_time
Date and time of the last modification of the file for this row.
$file_size
Size of the file for this row.
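As a rough illustration of how the hidden columns can be used, the following sketch lists the data files behind a table together with their size, modification time, and row count. Because the hidden column names start with a dollar sign, they are enclosed in double quotes. The catalog, schema, and table names are placeholders reused from the earlier examples, and the column aliases are made up for this example.

```sql
-- Placeholder table: example.example_schema.example_table.
-- Every row from the same data file carries the same "$file_size" and
-- "$file_modified_time", so max() just picks that single value per file.
SELECT
  "$path" AS file_path,
  max("$file_size") AS file_size,
  max("$file_modified_time") AS modified_at,
  count(*) AS row_count
FROM example.example_schema.example_table
GROUP BY "$path"
ORDER BY file_size;
```

A listing like this can help decide whether a table holds many small files before running the optimize command.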
Creating schemas#
The connector supports creating schemas. You can create a schema with or without a specified location.
You can create a schema with the CREATE SCHEMA statement and the location schema property. Tables in this schema are located in a subdirectory under the schema location. Data files for tables in this schema using the default location are cleaned up if the table is dropped:
CREATE SCHEMA example.example_schema
WITH (location = 's3://my-bucket/a/path');
Optionally, the location can be omitted. Tables in this schema must have a location included when you create them. The data files for these tables are not removed if the table is dropped:
CREATE SCHEMA example.example_schema;
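When a schema has no location, each table needs its own. A minimal sketch of that case, assuming a hypothetical schema name, table name, and bucket path that are not part of the original documentation:

```sql
-- Hypothetical names; the bucket path is a placeholder.
CREATE SCHEMA example.schema_without_location;

-- The schema has no location, so the table supplies one explicitly.
CREATE TABLE example.schema_without_location.events (
  id bigint,
  payload varchar
)
WITH (location = 's3://my-bucket/a/path/events');
```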
Creating tables#
When Delta tables exist in storage, but not in the metastore, Trino can be used to register them:
CREATE TABLE example.default.example_table (
dummy bigint
)
WITH (
location = '...'
)
Columns listed in the DDL, such as dummy in the preceding example, are ignored. The table schema is read from the transaction log instead. If the schema is changed by an external system, Trino automatically uses the new schema.
Warning
Using CREATE TABLE with existing table content is deprecated; use the system.register_table procedure instead. The CREATE TABLE ... WITH (location=...) syntax can be temporarily re-enabled using the delta.legacy-create-table-with-existing-location.enabled config property or the legacy_create_table_with_existing_location_enabled session property.
If the specified location does not already contain a Delta table, the connector automatically writes the initial transaction log entries and registers the table in the metastore. As a result, any Databricks engine can write to the table:
CREATE TABLE example.default.new_table (id bigint, address varchar);
The Delta Lake connector also supports creating tables using the CREATE TABLE AS syntax.
There are three table properties available for use in table creation.
Property name | Description |
---|---|
location | File system location URI for the table. |
partitioned_by | Set partition columns. |
checkpoint_interval | Set the checkpoint interval in number of table writes. |
The following example uses all three table properties:
CREATE TABLE example.default.example_partitioned_table
WITH (
location = 's3://my-bucket/a/path',
partitioned_by = ARRAY['regionkey'],
checkpoint_interval = 5
)
AS SELECT name, comment, regionkey FROM tpch.tiny.nation;
Register table#
The connector can register an existing Delta Lake table in the metastore. The system.register_table procedure allows the caller to register the table using its existing transaction logs and data files:
CALL example.system.register_table(schema_name => 'testdb', table_name => 'customer_orders', table_location => 's3://my-bucket/a/path')
To prevent unauthorized users from accessing data, this procedure is disabled by default.
The procedure is enabled only when delta.register-table-procedure.enabled is set to true.
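Once the procedure has run, the table should behave like any other table in the catalog. A quick sanity check might look like the following sketch, which reuses the schema and table names from the register_table call shown earlier:

```sql
-- Confirm the table is now listed in the schema.
SHOW TABLES FROM example.testdb LIKE 'customer_orders';

-- Confirm the existing data files are readable through Trino.
SELECT count(*) FROM example.testdb.customer_orders;
```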
Updating data#
You can use the connector to INSERT, DELETE, UPDATE, and MERGE data in Delta Lake tables.
Write operations are supported for tables stored on the following systems:
Azure ADLS Gen2, Google Cloud Storage
Writes to the Azure ADLS Gen2 and Google Cloud Storage are enabled by default. Trino detects write collisions on these storage systems when writing from multiple Trino clusters, or from other query engines.
S3 and S3-compatible storage
Writes to Amazon S3 and S3-compatible storage must be enabled with the delta.enable-non-concurrent-writes property. Writes to S3 can safely be made from multiple Trino clusters; however, write collisions are not detected when writing concurrently from other Delta Lake engines. To avoid data corruption, make sure that no concurrent data modifications are run.
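The four write statements can be sketched as follows. The example assumes the table example.default.new_table (id bigint, address varchar) from the table creation section, plus a hypothetical staging table new_table_staging with the same columns. The literal values are invented for illustration.

```sql
-- Add rows.
INSERT INTO example.default.new_table
VALUES (1, '1 Main St'), (2, '2 High St');

-- Change a row in place.
UPDATE example.default.new_table
SET address = '10 Main St'
WHERE id = 1;

-- Remove a row.
DELETE FROM example.default.new_table
WHERE id = 2;

-- Upsert from a hypothetical staging table with the same columns.
MERGE INTO example.default.new_table t
USING example.default.new_table_staging s
ON (t.id = s.id)
WHEN MATCHED THEN
  UPDATE SET address = s.address
WHEN NOT MATCHED THEN
  INSERT (id, address) VALUES (s.id, s.address);
```

On S3 and S3-compatible storage, these statements are expected to fail unless writes have been enabled as described above.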
Performance#
The connector includes a number of performance improvements, detailed in the following sections:
Support for write partitioning.
Table statistics#
You can use ANALYZE statements in Trino to populate the table statistics in Delta Lake. Data size and number of distinct values (NDV) statistics are supported, while minimum value, maximum value, and null value count statistics are not supported. The cost-based optimizer then uses these statistics to improve query performance.
Extended statistics enable a broader set of optimizations, including join reordering. The controlling catalog property delta.table-statistics-enabled is enabled by default. The equivalent catalog session property is statistics_enabled.
Each ANALYZE statement updates the table statistics incrementally, so only the data changed since the last ANALYZE is counted. The table statistics are not automatically updated by write operations such as INSERT, UPDATE, and DELETE. You must manually run ANALYZE again to update the table statistics.
To collect statistics for a table, execute the following statement:
ANALYZE table_schema.table_name;
To gain the most benefit from cost-based optimizations, run periodic ANALYZE statements on every large table that is frequently queried.
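To see what the optimizer has available after an ANALYZE run, the statistics can be inspected with SHOW STATS. The sketch below uses a placeholder table name. Which columns of the output are populated depends on the statistics the connector collects.

```sql
-- Placeholder table name.
ANALYZE example.default.example_table;

-- One row per column plus a summary row for the table.
SHOW STATS FOR example.default.example_table;
```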
Fine tuning#
The files_modified_after property is useful if you want to run the ANALYZE statement on a table that was previously analyzed. You can use it to limit the amount of data used to generate the table statistics:
ANALYZE example_table WITH(files_modified_after = TIMESTAMP '2021-08-23 16:43:01.321 Z')
As a result, only files newer than the specified time stamp are used in the analysis.
You can also specify a set or subset of columns to analyze using the columns property:
ANALYZE example_table WITH(columns = ARRAY['nationkey', 'regionkey'])
To run ANALYZE with columns more than once, the next ANALYZE must run on the same set or a subset of the original columns used.
To broaden the set of columns, drop the statistics and reanalyze the table.
Disable and drop extended statistics#
You can disable extended statistics with the catalog configuration property delta.extended-statistics.enabled set to false. Alternatively, you can disable it for a session, with the catalog session property extended_statistics_enabled set to false.
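For the session-level switch, a minimal sketch looks like this, assuming the catalog is named example as in the rest of this page:

```sql
-- Disable extended statistics for the current session only.
SET SESSION example.extended_statistics_enabled = false;

-- Return to the catalog default later in the same session.
RESET SESSION example.extended_statistics_enabled;
```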
If a table is changed by many delete and update operations, calling ANALYZE does not result in accurate statistics. To correct the statistics, you have to drop the extended statistics and analyze the table again.
Use the system.drop_extended_stats procedure in the catalog to drop the extended statistics for a specified table in a specified schema:
CALL example.system.drop_extended_stats('my_schema', 'my_table')
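Dropping the statistics is normally followed by a fresh ANALYZE. The same two steps apply when broadening the set of analyzed columns. A sketch of the full sequence, using the schema and table names from the call above:

```sql
-- Step 1: discard the stale or too-narrow extended statistics.
CALL example.system.drop_extended_stats('my_schema', 'my_table');

-- Step 2: collect statistics again from scratch.
ANALYZE example.my_schema.my_table;
```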
Memory usage#
The Delta Lake connector is memory intensive and the amount of required memory grows with the size of Delta Lake transaction logs of any accessed tables. It is important to take that into account when provisioning the coordinator.
You can decrease memory usage by keeping the number of active data files in the table low. To do so, run OPTIMIZE and VACUUM in Delta Lake regularly.
VACUUM#
The VACUUM procedure removes all old files that are not in the transaction log, as well as files that are not needed to read table snapshots newer than the current time minus the retention period defined by the retention period parameter.
Users with INSERT and DELETE permissions on a table can run VACUUM as follows:
CALL example.system.vacuum('myschemaname', 'mytablename', '7d');
All parameters are required, and must be presented in the following order:
Schema name
Table name
Retention period
The delta.vacuum.min-retention config property provides a safety measure to ensure that files are retained as expected. The minimum value for this property is 0s. There is a minimum retention session property as well, vacuum_min_retention.
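A periodic maintenance routine could combine compaction and cleanup roughly as sketched below. The schema and table names are taken from the VACUUM call above. The 1d value is an arbitrary illustration, and lowering vacuum_min_retention is assumed to be necessary only when the requested retention period is shorter than the configured minimum.

```sql
-- Merge small files into larger ones first.
ALTER TABLE example.myschemaname.mytablename EXECUTE optimize;

-- Optional and potentially risky: lower the session-level safety limit
-- so that a shorter retention period is accepted.
SET SESSION example.vacuum_min_retention = '1d';

-- Remove files that fall outside the retention period.
CALL example.system.vacuum('myschemaname', 'mytablename', '1d');
```

A short retention period can remove files that long-running queries or other engines still read, so the value should be chosen conservatively.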
Memory monitoring#
When using the Delta Lake connector, you need to monitor memory usage on the coordinator. Specifically, monitor JVM heap utilization using standard tools as part of routine operation of the cluster.
A good proxy for memory usage is the cache utilization of Delta Lake caches. It is exposed by the connector with the plugin.deltalake.transactionlog:name=<catalog-name>,type=transactionlogaccess JMX bean.
You can access it with any standard monitoring software with JMX support, or use the JMX connector with the following query:
SELECT * FROM jmx.current."*.plugin.deltalake.transactionlog:name=<catalog-name>,type=transactionlogaccess"
Following is an example result:
datafilemetadatacachestats.hitrate | 0.97
datafilemetadatacachestats.missrate | 0.03
datafilemetadatacachestats.requestcount | 3232
metadatacachestats.hitrate | 0.98
metadatacachestats.missrate | 0.02
metadatacachestats.requestcount | 6783
node | trino-master
object_name | io.trino.plugin.deltalake.transactionlog:type=TransactionLogAccess,name=delta
In a healthy system both datafilemetadatacachestats.hitrate and metadatacachestats.hitrate are close to 1.0.
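For routine checks, the query can be narrowed to the two hit rates and filtered for nodes that fall below a chosen level. The sketch below keeps the catalog-name placeholder from the query above and uses 0.9 as an arbitrary threshold, not a documented recommendation. The column names contain dots, so they are enclosed in double quotes.

```sql
-- Report only nodes where either cache hit rate drops below 0.9.
SELECT
  node,
  "datafilemetadatacachestats.hitrate",
  "metadatacachestats.hitrate"
FROM jmx.current."*.plugin.deltalake.transactionlog:name=<catalog-name>,type=transactionlogaccess"
WHERE "datafilemetadatacachestats.hitrate" < 0.9
   OR "metadatacachestats.hitrate" < 0.9;
```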