Hive коннектор#
Примечание
Ниже приведена оригинальная документация Trino. Скоро мы ее переведем на русский язык и дополним полезными примерами.
The Hive connector allows querying data stored in an Apache Hive data warehouse. Hive is a combination of three components:
Data files in varying formats, that are typically stored in the Hadoop Distributed File System (HDFS) or in object storage systems such as Amazon S3.
Metadata about how the data files are mapped to schemas and tables. This metadata is stored in a database, such as MySQL, and is accessed via the Hive metastore service.
A query language called HiveQL. This query language is executed on a distributed computing framework such as MapReduce or Tez.
Trino only uses the first two components: the data and the metadata. It does not use HiveQL or any part of Hive’s execution environment.
Requirements#
The Hive connector requires a Hive metastore service (HMS), or a compatible implementation of the Hive metastore, such as AWS Glue.
You must select and configure a supported file system in your catalog configuration file.
The coordinator and all workers must have network access to the Hive metastore and the storage system. Hive metastore access with the Thrift protocol defaults to using port 9083.
Data files must be in a supported file format. File formats can be
configured using the format
table property
and other specific properties:
In the case of serializable formats, only specific SerDes are allowed:
RCText - RCFile using
ColumnarSerDe
RCBinary - RCFile using
LazyBinaryColumnarSerDe
SequenceFile
CSV - using
org.apache.hadoop.hive.serde2.OpenCSVSerde
JSON - using
org.apache.hive.hcatalog.data.JsonSerDe
OPENX_JSON - OpenX JSON SerDe from
org.openx.data.jsonserde.JsonSerDe
. Find more details about the Trino implementation in the source repository.TextFile
General configuration#
To configure the Hive connector, create a catalog properties file
etc/catalog/example.properties
that references the hive
connector.
You must configure a metastore for metadata.
You must select and configure one of the supported file systems.
connector.name=hive
hive.metastore.uri=thrift://example.net:9083
fs.x.enabled=true
Replace the fs.x.enabled
configuration property with the desired file system.
If you are using AWS Glue as your metastore, you
must instead set hive.metastore
to glue
:
connector.name=hive
hive.metastore=glue
Each metastore type has specific configuration properties along with General metastore configuration properties.
Multiple Hive clusters#
You can have as many catalogs as you need, so if you have additional
Hive clusters, simply add another properties file to etc/catalog
with a different name, making sure it ends in .properties
. For
example, if you name the property file sales.properties
, Trino
creates a catalog named sales
using the configured connector.
Hive general configuration properties#
The following table lists general configuration properties for the Hive connector. There are additional sets of configuration properties throughout the Hive connector documentation.
Property Name |
Description |
Default |
---|---|---|
|
Enable reading data from subdirectories of table or partition locations. If
disabled, subdirectories are ignored. This is equivalent to the
|
|
|
Ignore partitions when the file system location does not exist rather than failing the query. This skips data that may be expected to be part of the table. |
|
|
The default file format used when creating new tables. |
|
|
Access ORC columns by name. By default, columns in ORC files are accessed by
their ordinal position in the Hive table definition. The equivalent catalog
session property is |
|
|
Access Parquet columns by name by default. Set this property to |
|
|
Time zone for Parquet read and write. |
JVM default |
|
The compression codec to use when writing files. Possible values are |
|
|
Force splits to be scheduled on the same node as the Hadoop DataNode process serving the split data. This is useful for installations where Trino is collocated with every DataNode. |
|
|
Should new partitions be written using the existing table format or the default Trino format? |
|
|
Can new data be inserted into existing partitions? If |
|
|
What happens when data is inserted into an existing partition? Possible values are
The equivalent catalog session property is |
|
|
Best effort maximum size of new files. |
|
|
Should empty files be created for buckets that have no data? |
|
|
Enables validation that data is in the correct bucket when reading bucketed tables. |
|
|
Specifies the number of partitions to analyze when computing table statistics. |
100 |
|
Maximum number of partitions per writer. |
100 |
|
The maximum number of partitions for a single table scan to load eagerly on the coordinator. Certain optimizations are not possible without eager loading. |
100,000 |
|
Maximum number of partitions for a single table scan. |
1,000,000 |
|
Enable writes to non-managed (external) Hive tables. |
|
|
Enable creating non-managed (external) Hive tables. |
|
|
Enables automatic column level statistics collection on write. See Table statistics for details. |
|
|
Cache directory listing for specific tables. Examples:
|
|
|
Maximum retained size of cached file status entries. |
|
|
How long a cached directory listing is considered valid. |
|
|
Maximum retained size of all entries in per transaction file status cache. Retained size limit is shared across all running queries. |
|
|
Adjusts binary encoded timestamp values to a specific time zone. For Hive 3.1+, this must be set to UTC. |
JVM default |
|
Specifies the precision to use for Hive columns of type |
|
|
Controls whether the temporary staging directory configured at
|
|
|
Controls the location of temporary staging directory that is used for write
operations. The |
|
|
Enable translation for Hive views. |
|
|
Use the legacy algorithm to translate Hive views. You
can use the |
|
|
Improve parallelism of partitioned and bucketed table writes. When disabled, the number of writing threads is limited to number of buckets. |
|
|
Set to |
|
|
Allow specifying the list of schemas for which Trino will enforce that
queries use a filter on partition keys for source tables. The list can be
specified using the |
|
|
Enables Table statistics. The equivalent catalog session
property is |
|
|
Set the default value for the auto_purge table property for managed tables. See the Table properties for more information on auto_purge. |
|
|
Enables Athena partition projection support |
|
|
Maximum number of partitions to drop in a single query. |
100,000 |
|
Enables auto-commit for all writes. This can be used to disallow multi-statement write transactions. |
|
File system access configuration#
The connector supports accessing the following file systems:
You must enable and configure the specific file system access. Legacy support is not recommended and will be removed.
Fault-tolerant execution support#
The connector supports Fault-tolerant execution of query processing. Read and write operations are both supported with any retry policy on non-transactional tables.
Read operations are supported with any retry policy on transactional tables.
Write operations and CREATE TABLE ... AS
operations are not supported with
any retry policy on transactional tables.
Security#
The connector supports different means of authentication for the used file system and metastore.
In addition, the following security-related features are supported.
SQL support#
The connector provides read access and write access to data and metadata in the configured object storage system and metadata stores:
Globally available statements; see also Globally available statements
-
DML; see also Hive-specific data management
Схемы и таблицы; see also Hive-specific schema and table management
Представления (view); see also Hive-specific view management
Безопасность: see also SQL standard-based authorization for object storage
Refer to the migration guide for practical advice on migrating from Hive to Trino.
The following sections provide Hive-specific information regarding SQL support.
Basic usage examples#
The examples shown here work on Google Cloud Storage by replacing s3://
with
gs://
.
Create a new Hive table named page_views
in the web
schema
that is stored using the ORC file format, partitioned by date and
country, and bucketed by user into 50
buckets. Note that Hive
requires the partition columns to be the last columns in the table:
CREATE TABLE example.web.page_views (
view_time TIMESTAMP,
user_id BIGINT,
page_url VARCHAR,
ds DATE,
country VARCHAR
)
WITH (
format = 'ORC',
partitioned_by = ARRAY['ds', 'country'],
bucketed_by = ARRAY['user_id'],
bucket_count = 50
)
Create a new Hive schema named web
that stores tables in an
S3 bucket named my-bucket
:
CREATE SCHEMA example.web
WITH (location = 's3://my-bucket/')
Drop a schema:
DROP SCHEMA example.web
Drop a partition from the page_views
table:
DELETE FROM example.web.page_views
WHERE ds = DATE '2016-08-09'
AND country = 'US'
Query the page_views
table:
SELECT * FROM example.web.page_views
List the partitions of the page_views
table:
SELECT * FROM example.web."page_views$partitions"
Create an external Hive table named request_logs
that points at
existing data in S3:
CREATE TABLE example.web.request_logs (
request_time TIMESTAMP,
url VARCHAR,
ip VARCHAR,
user_agent VARCHAR
)
WITH (
format = 'TEXTFILE',
external_location = 's3://my-bucket/data/logs/'
)
Collect statistics for the request_logs
table:
ANALYZE example.web.request_logs;
Drop the external table request_logs
. This only drops the metadata
for the table. The referenced data directory is not deleted:
DROP TABLE example.web.request_logs
CREATE TABLE AS can be used to create transactional tables in ORC format like this:
CREATE TABLE <name> WITH ( format='ORC', transactional=true ) AS <query>
Add an empty partition to the page_views
table:
CALL system.create_empty_partition(
schema_name => 'web',
table_name => 'page_views',
partition_columns => ARRAY['ds', 'country'],
partition_values => ARRAY['2016-08-09', 'US']);
Drop stats for a partition of the page_views
table:
CALL system.drop_stats(
schema_name => 'web',
table_name => 'page_views',
partition_values => ARRAY[ARRAY['2016-08-09', 'US']]);
Procedures#
Use the CALL statement to perform data manipulation or
administrative tasks. Procedures must include a qualified catalog name, if your
Hive catalog is called web
:
CALL web.system.example_procedure()
The following procedures are available:
system.create_empty_partition(schema_name, table_name, partition_columns, partition_values)
Create an empty partition in the specified table.
system.sync_partition_metadata(schema_name, table_name, mode, case_sensitive)
Check and update partitions list in metastore. There are three modes available:
ADD
: add any partitions that exist on the file system, but not in the metastore.DROP
: drop any partitions that exist in the metastore, but not on the file system.FULL
: perform bothADD
andDROP
.
The
case_sensitive
argument is optional. The default value istrue
for compatibility with Hive’sMSCK REPAIR TABLE
behavior, which expects the partition column names in file system paths to use lowercase (e.g.col_x=SomeValue
). Partitions on the file system not conforming to this convention are ignored, unless the argument is set tofalse
.system.drop_stats(schema_name, table_name, partition_values)
Drops statistics for a subset of partitions or the entire table. The partitions are specified as an array whose elements are arrays of partition values (similar to the
partition_values
argument increate_empty_partition
). Ifpartition_values
argument is omitted, stats are dropped for the entire table.
system.register_partition(schema_name, table_name, partition_columns, partition_values, location)
Registers existing location as a new partition in the metastore for the specified table.
When the
location
argument is omitted, the partition location is constructed usingpartition_columns
andpartition_values
.Due to security reasons, the procedure is enabled only when
hive.allow-register-partition-procedure
is set totrue
.
system.unregister_partition(schema_name, table_name, partition_columns, partition_values)
Unregisters given, existing partition in the metastore for the specified table. The partition data is not deleted.
system.flush_metadata_cache()
Flush all Hive metadata caches.
system.flush_metadata_cache(schema_name => ..., table_name => ...)
Flush Hive metadata caches entries connected with selected table. Procedure requires named parameters to be passed
system.flush_metadata_cache(schema_name => ..., table_name => ..., partition_columns => ARRAY[...], partition_values => ARRAY[...])
Flush Hive metadata cache entries connected with selected partition. Procedure requires named parameters to be passed.
Data management#
The DML functionality includes support for INSERT
,
UPDATE
, DELETE
, and MERGE
statements, with the exact support
depending on the storage system, file format, and metastore.
When connecting to a Hive metastore version 3.x, the Hive connector supports reading from and writing to insert-only and ACID tables, with full support for partitioning and bucketing.
DELETE applied to non-transactional tables is only supported if the
table is partitioned and the WHERE
clause matches entire partitions.
Transactional Hive tables with ORC format support «row-by-row» deletion, in
which the WHERE
clause may match arbitrary sets of rows.
UPDATE is only supported for transactional Hive tables with format
ORC. UPDATE
of partition or bucket columns is not supported.
MERGE is only supported for ACID tables.
ACID tables created with Hive Streaming Ingest are not supported.
Schema and table management#
The Hive connector supports querying and manipulating Hive tables and schemas (databases). While some uncommon operations must be performed using Hive directly, most operations can be performed using Trino.
Schema evolution#
Hive table partitions can differ from the current table schema. This occurs when the data types of columns of a table are changed from the data types of columns of preexisting partitions. The Hive connector supports this schema evolution by allowing the same conversions as Hive. The following table lists possible data type conversions.
Data type |
Converted to |
---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
Any conversion failure results in null, which is the same behavior
as Hive. For example, converting the string 'foo'
to a number,
or converting the string '1234'
to a TINYINT
(which has a
maximum value of 127
).
Avro schema evolution#
Trino supports querying and manipulating Hive tables with the Avro storage format, which has the schema set based on an Avro schema file/literal. Trino is also capable of creating the tables in Trino by infering the schema from a valid Avro schema file located locally, or remotely in HDFS/Web server.
To specify that the Avro schema should be used for interpreting table data, use
the avro_schema_url
table property.
The schema can be placed in the local file system or remotely in the following locations:
HDFS (e.g.
avro_schema_url = 'hdfs://user/avro/schema/avro_data.avsc'
)S3 (e.g.
avro_schema_url = 's3n:///schema_bucket/schema/avro_data.avsc'
)A web server (e.g.
avro_schema_url = 'http://example.org/schema/avro_data.avsc'
)
The URL, where the schema is located, must be accessible from the Hive metastore and Trino coordinator/worker nodes.
Alternatively, you can use the table property avro_schema_literal
to define
the Avro schema.
The table created in Trino using the avro_schema_url
or
avro_schema_literal
property behaves the same way as a Hive table with
avro.schema.url
or avro.schema.literal
set.
Example:
CREATE TABLE example.avro.avro_data (
id BIGINT
)
WITH (
format = 'AVRO',
avro_schema_url = '/usr/local/avro_data.avsc'
)
The columns listed in the DDL (id
in the above example) is ignored if avro_schema_url
is specified.
The table schema matches the schema in the Avro schema file. Before any read operation, the Avro schema is
accessed so the query result reflects any changes in schema. Thus Trino takes advantage of Avro’s backward compatibility abilities.
If the schema of the table changes in the Avro schema file, the new schema can still be used to read old data. Newly added/renamed fields must have a default value in the Avro schema file.
The schema evolution behavior is as follows:
Column added in new schema: Data created with an older schema produces a default value when table is using the new schema.
Column removed in new schema: Data created with an older schema no longer outputs the data from the column that was removed.
Column is renamed in the new schema: This is equivalent to removing the column and adding a new one, and data created with an older schema produces a default value when table is using the new schema.
Changing type of column in the new schema: If the type coercion is supported by Avro or the Hive connector, then the conversion happens. An error is thrown for incompatible types.
Limitations#
The following operations are not supported when avro_schema_url
is set:
CREATE TABLE AS
is not supported.Bucketing(
bucketed_by
) columns are not supported inCREATE TABLE
.ALTER TABLE
commands modifying columns are not supported.
ALTER TABLE EXECUTE#
The connector supports the following commands for use with ALTER TABLE EXECUTE.
optimize#
The optimize
command is used for rewriting the content of the specified
table so that it is merged into fewer but larger files. If the table is
partitioned, the data compaction acts separately on each partition selected for
optimization. This operation improves read performance.
All files with a size below the optional file_size_threshold
parameter
(default value for the threshold is 100MB
) are merged:
ALTER TABLE test_table EXECUTE optimize
The following statement merges files in a table that are under 128 megabytes in size:
ALTER TABLE test_table EXECUTE optimize(file_size_threshold => '128MB')
You can use a WHERE
clause with the columns used to partition the table
to filter which partitions are optimized:
ALTER TABLE test_partitioned_table EXECUTE optimize
WHERE partition_key = 1
You can use a more complex WHERE
clause to narrow down the scope of the
optimize
procedure. The following example casts the timestamp values to
dates, and uses a comparison to only optimize partitions with data from the year
2022 or newer:
ALTER TABLE test_table EXECUTE optimize
WHERE CAST(timestamp_tz AS DATE) > DATE '2021-12-31'
The optimize
command is disabled by default, and can be enabled for a
catalog with the <catalog-name>.non_transactional_optimize_enabled
session property:
SET SESSION <catalog_name>.non_transactional_optimize_enabled=true
Предупреждение
Because Hive tables are non-transactional, take note of the following possible outcomes:
If queries are run against tables that are currently being optimized, duplicate rows may be read.
In rare cases where exceptions occur during the
optimize
operation, a manual cleanup of the table directory is needed. In this situation, refer to the Trino logs and query failure messages to see which files must be deleted.
Table properties#
Table properties supply or set metadata for the underlying tables. This is key for CREATE TABLE AS statements. Table properties are passed to the connector using a WITH clause:
CREATE TABLE tablename
WITH (format='CSV',
csv_escape = '"')
Property name |
Description |
Default |
---|---|---|
|
Indicates to the configured metastore to perform a purge when a table or partition is deleted instead of a soft deletion using the trash. |
|
|
The URI pointing to Avro schema evolution for the table. |
|
|
The number of buckets to group data into. Only valid if used with
|
0 |
|
The bucketing column for the storage table. Only valid if used with
|
|
|
Specifies which Hive bucketing version to use. Valid values are |
|
|
The CSV escape character. Requires CSV format. |
|
|
The CSV quote character. Requires CSV format. |
|
|
The CSV separator character. Requires CSV format. You can use other
separators such as |
|
|
The URI for an external Hive table on S3, Azure Blob Storage, etc. See the Basic usage examples for more information. |
|
|
The table file format. Valid values include |
|
|
The serialization format for |
|
|
Comma separated list of columns to use for ORC bloom filter. It improves the
performance of queries using equality predicates, such as |
|
|
The ORC bloom filters false positive probability. Requires ORC format. |
0.05 |
|
The partitioning column for the storage table. The columns listed in the
|
|
|
Comma separated list of columns to use for Parquet bloom filter. It improves
the performance of queries using equality predicates, such as |
|
|
The number of footer lines to ignore when parsing the file for data. Requires TextFile or CSV format tables. |
|
|
The number of header lines to ignore when parsing the file for data. Requires TextFile or CSV format tables. |
|
|
The column to sort by to determine bucketing for row. Only valid if
|
|
|
Allows the use of custom field separators, such as „|“, for TextFile formatted tables. |
|
|
Allows the use of a custom escape character for TextFile formatted tables. |
|
|
Set this property to |
|
|
Enables partition projection for selected table. Mapped from AWS Athena table property projection.enabled. |
|
|
Ignore any partition projection properties stored in the metastore for the selected table. This is a Trino-only property which allows you to work around compatibility issues on a specific table, and if enabled, Trino ignores all other configuration options related to partition projection. |
|
|
Projected partition location template, such as |
|
|
Additional properties added to a Hive table. The properties are not used by
Trino, and are available in the |
Metadata tables#
The raw Hive table properties are available as a hidden table, containing a separate column per table property, with a single row containing the property values.
$properties
table#
The properties table name is composed with the table name and $properties
appended.
It exposes the parameters of the table in the metastore.
You can inspect the property names and values with a simple query:
SELECT * FROM example.web."page_views$properties";
stats_generated_via_stats_task | auto.purge | trino_query_id | trino_version | transactional
---------------------------------------------+------------+-----------------------------+---------------+---------------
workaround for potential lack of HIVE-12730 | false | 20230705_152456_00001_nfugi | 434 | false
$partitions
table#
The $partitions
table provides a list of all partition values
of a partitioned table.
The following example query returns all partition values from the
page_views
table in the web
schema of the example
catalog:
SELECT * FROM example.web."page_views$partitions";
day | country
------------+---------
2023-07-01 | POL
2023-07-02 | POL
2023-07-03 | POL
2023-03-01 | USA
2023-03-02 | USA
Column properties#
Property name |
Description |
Default |
---|---|---|
|
Defines the type of partition projection to use on this column. May be used
only on partition columns. Available types: |
|
|
Used with |
|
|
Used with |
|
|
Used with |
|
|
Used with |
|
|
Used with |
|
|
Used with |
Metadata columns#
In addition to the defined columns, the Hive connector automatically exposes metadata in a number of hidden columns in each table:
$bucket
: Bucket number for this row$path
: Full file system path name of the file for this row$file_modified_time
: Date and time of the last modification of the file for this row$file_size
: Size of the file for this row$partition
: Partition name for this row
You can use these columns in your SQL statements like any other column. They can be selected directly, or used in conditional statements. For example, you can inspect the file size, location and partition for each record:
SELECT *, "$path", "$file_size", "$partition"
FROM example.web.page_views;
Retrieve all records that belong to files stored in the partition
ds=2016-08-09/country=US
:
SELECT *, "$path", "$file_size"
FROM example.web.page_views
WHERE "$partition" = 'ds=2016-08-09/country=US'
View management#
Trino allows reading from Hive materialized views, and can be configured to support reading Hive views.
Materialized views#
The Hive connector supports reading from Hive materialized views. In Trino, these views are presented as regular, read-only tables.
Hive views#
Hive views are defined in HiveQL and stored in the Hive Metastore Service. They are analyzed to allow read access to the data.
The Hive connector includes support for reading Hive views with three different modes.
Disabled
Legacy
Experimental
If using Hive views from Trino is required, you must compare results in Hive and Trino for each view definition to ensure identical results. Use the experimental mode whenever possible. Avoid using the legacy mode. Leave Hive views support disabled, if you are not accessing any Hive views from Trino.
You can configure the behavior in your catalog properties file.
By default, Hive views are executed with the RUN AS DEFINER
security mode.
Set the hive.hive-views.run-as-invoker
catalog configuration property to
true
to use RUN AS INVOKER
semantics.
Disabled
The default behavior is to ignore Hive views. This means that your business logic and data encoded in the views is not available in Trino.
Legacy
A very simple implementation to execute Hive views, and therefore allow read
access to the data in Trino, can be enabled with
hive.hive-views.enabled=true
and
hive.hive-views.legacy-translation=true
.
For temporary usage of the legacy behavior for a specific catalog, you can set
the hive_views_legacy_translation
catalog session property to true
.
This legacy behavior interprets any HiveQL query that defines a view as if it is written in SQL. It does not do any translation, but instead relies on the fact that HiveQL is very similar to SQL.
This works for very simple Hive views, but can lead to problems for more complex queries. For example, if a HiveQL function has an identical signature but different behaviors to the SQL version, the returned results may differ. In more extreme cases the queries might fail, or not even be able to be parsed and executed.
Experimental
The new behavior is better engineered and has the potential to become a lot more powerful than the legacy implementation. It can analyze, process, and rewrite Hive views and contained expressions and statements.
It supports the following Hive view functionality:
UNION [DISTINCT]
andUNION ALL
against Hive viewsNested
GROUP BY
clausescurrent_user()
LATERAL VIEW OUTER EXPLODE
LATERAL VIEW [OUTER] EXPLODE
on array of structLATERAL VIEW json_tuple
You can enable the experimental behavior with
hive.hive-views.enabled=true
. Remove the
hive.hive-views.legacy-translation
property or set it to false
to make
sure legacy is not enabled.
Keep in mind that numerous features are not yet implemented when experimenting with this feature. The following is an incomplete list of missing functionality:
HiveQL
current_date
,current_timestamp
, and othersHive function calls including
translate()
, window functions, and othersCommon table expressions and simple case expressions
Honor timestamp precision setting
Support all Hive data types and correct mapping to Trino types
Ability to process custom UDFs
Производительность#
Данная секция описывает важные улучшения производительности, реализованные в Hive коннекторе.
Локальный дисковый кэш данных#
Коннектор позволяет сохранять часть данных из озера данных на дисках worker-узлов CedrusData для ускорения доступа к ним. Во многих случаях использование локального дискового кэша приводит к кратному ускорению запросов.
При каждом доступе к колонке CedrusData проверяет, были ли закэшированные данные изменены в удаленном источнике. Если обнаружено изменение, локальные данные будут удалены и закэшированы повторно.
CedrusData кэширует метаданные файлов, а также диапазоны записей по мере необходимости. Гранулярность кэширования диапазонов записей зависит от формата:
Для формата Parquet единицей кэширования являются данные колонки внутри row group
Для формата ORC единицей кэширования являются данные колонки внутри stripe
Для включения локального дискового кэша необходимо:
Установить параметр конфигурации каталога
cedrusdata.hive.data-cache.enabled=true
Указать путь к файлу, в котором описаны правила кэширования в JSON формате
Для узлов, которые будут выполнять запросы (все worker-узлы, а также coordinator-узел, запущенный с параметром
node-scheduler.include-coordinator=true
) необходимо также указать путь к директории, в которой CedrusData будет хранить закэшированные данные. Параметр конфигурации:cedrusdata.hive.data-cache.path
Пример конфигурации для координатора, который не выполняет запросы:
cedrusdata.hive.data-cache.enabled=true
cedrusdata.hive.data-cache.rules.file=/path/to/rules.json
Пример конфигурации для worker-узла или координатора, который выполняет запросы (node-scheduler.include-coordinator=true
):
cedrusdata.hive.data-cache.enabled=true
cedrusdata.hive.data-cache.rules.file=/path/to/rules.json
cedrusdata.hive.data-cache.path=file:///path/to/cache/dir
Правила кэширования необходимо задать в отдельном файле в формате:
{
"rules": [
<rule1>,
<rule2>,
...
]
}
Где <rule>
представляет собой отдельное правило в формате:
{
"schema": "<regexp схемы>",
"table": "<regexp таблицы>",
"partition_filter": "<предикат ключа партиционирования таблицы>",
"distribution_mode": "<способ распределения сплитов по узлам>",
"affinity_mode": "<способ привязки сплитов к узлам>",
"affinity_node_count": "<количество узлов на которых может быть обработан сплит>",
"disable_cache": <флаг отключения кэширования>
}
Подробное описание полей правила:
Название |
Описание |
---|---|
|
Обязательное поле.
Задает паттерн для схем в формате Java Pattern.
Например, |
|
Опциональное поле.
Задает паттерн для таблиц в формате Java Pattern.
Например, |
|
Опциональное поле.
Задает фильтр партиции в виде SQL выражения. Выражение может ссылаться на колонки партиции таблицы и использовать стандартные функции.
Выражение не может ссылаться на колонки, которые не входят в ключ партиции, а также содержать подзапросы и вызовы табличных функций.
Вызов функций происходит от имени специального системного пользователя, на которого не распространяются проверки доступа к функциям.
Если значение |
|
Опциональное поле.
Поддерживается только для Hive коннектора.
Задает способ сопоставления сплитов с worker-узлами.
Допустимые значения: |
|
Опциональное поле.
Задает механизм привязки сплита к узлу.
Допустимые значения: |
|
Опциональное поле.
Задает количество узлов на которых может быть выполнен сплит.
Увеличение данного значения приводит к дублированию закэшированных в кластере, но обеспечивает более равномерное распределение нагрузки.
Если |
|
Опциональное поле.
Позволяет отключить кэширование заданных объектов, соответствующих заданным паттернам |
Обработка правил происходит в порядке их указания в файле сверху вниз. Проверка, кэшировать ли данные из текущей таблицы или партиции, завершается, как только найдено первое подходящее правило.
Ниже приведен полный пример файла правил, который включает кэширование для всех таблиц схем s1
и s2
, кроме таблицы s2.excluded_table
,
а также для таблицы s3.partitioned_table
, в которой закэшированы будут только партиции продаж за последний месяц:
{
"rules": [
{
"schema": "s2",
"table": "excluded_table",
"disable_cache": true
},
{
"schema": "s1|s2"
},
{
"schema": "s3",
"table": "partitioned_table",
"partition_filter": "sales_date + interval '1' month <= current_date"
}
]
}
Вы можете изменять содержимое файла без перезапуска узла. Повторное чтение содержимого файла происходит периодически в соответствии с параметром конфигурации
cedrusdata.hive.data-cache.rules.refresh-period
.
Название |
Описание |
Значение по умолчанию |
---|---|---|
|
Использовать ли локальный кэш данных. Параметр сессии: |
|
|
Путь к файлу с правилами кэширования в формате JSON. |
|
|
Как частно повторно считывать правила кэширования из файла, путь к которому задан в |
|
|
Имя часового пояса, в контексте которого происходит вычисление предикатов партиций. |
Имя текущего часового пояса JVM |
|
Размер кэша, в котором хранится решение о кэшировании и скомпилированный предикат партиции (при наличии) для таблиц. Значение 0 отключает кэширование (не рекомендовано). |
1000 |
|
Путь к локальной директории узла, в котором будут сохранены закэшированные данные.
Путь должен быть задан в формате |
|
|
Максимальный размер кэша данных. При превышении размера CedrusData начнет удаление наиболее редко используемых данных. Данный размер учитывает реальный размер данных на диске с учетом возможной компрессии (см. ниже). |
|
|
Максимальное время хранения записи в кэше. По истечении данного времени запись будет удалена из кэша. |
|
|
Режим компрессии закэшированных данных. При отсутствии компрессии данные занимают больше места на диске, но при
этом требуют меньше ресурсов CPU для чтения. При включенной компрессии данные занимают меньше месте на диске, но
каждая операция чтения потребляет больше CPU. Принимайте решение о включении компрессии на основе того, какой
ресурс узла является более дефицитным. Доступные значения: |
|
|
Как часто производить очистку кэша от устаревших записей. Запись считается устаревшей, если истек ее TTL, заданный параметром |
|
|
Сколько записей возвращать движку CedrusData при чтении данных из кэша. Используется для тонкой настройки производительности. В большинстве случаев его изменение не требуется. |
|
|
Максимальное количество операций кэширования удаленных данных. Когда CedrusData обнаруживает, что удаленные данные соответствуют заданным правилам кэширования, но отсутствуют в кэше, происходит асинхронное кэширование данных, которое требует повторное удаленное чтение. Таким образом, при прогреве кэша могут возникать всплески сетевой и дисковой I/O активности, которые могут негативно сказаться на производительности текущих запросов. Для уменьшения негативного эффекта вы можете задать максимальное количество запросов на запись данных в кэш. |
|
|
Размер буфера при чтении данных с диска. Увеличения размера буфера приводит к уменьшению количества IOPS,
требуемых для чтения данных, но увеличивает потребление памяти. Значение по умолчанию должно хорошо
справляться с большинством типичных нагрузок. Настройка данного параметра может быть полезна в облачных
окружениях, которые зачастую ограничивают количество IOPS в секунду. Обратите внимание, что размерность
«килобайт» необходимо указывать как |
|
|
Размер буфера для записи данных на диск. Увеличения размера буфера приводит к уменьшению количества IOPS,
требуемых для записи данных, но увеличивает потребление памяти. Значение по умолчанию должно хорошо
справляться с большинством типичных нагрузок. Настройка данного параметра может быть полезна в облачных
окружениях, которые зачастую ограничивают количество IOPS в секунду. Обратите внимание, что размерность
«килобайт» необходимо указывать как |
|
Для отчистки кэша конкретного каталога воспользуйтесь встроенной процедурой system.cedrusdata.clear_data_cache
.
Единственным аргументом процедуры является название каталога. Следующий запрос очищает кэш каталога my_data_lake
:
CALL system.cedrusdata.clear_data_cache('my_data_lake');
Статистики работы кэша доступны через таблицу JMX коннектор trino.plugin.hive.datacache:name=<имя каталога>,type=hivedatacacheservice
.
Следующий запрос отображает текущие статистики кэша каталога my_data_lake
:
SELECT * FROM jmx."current"."trino.plugin.hive.datacache:name=my_data_lake,type=hivedatacacheservice"
Оптимизация запросов к partitioned таблицам#
Если таблица Hive была создана с параметром partitioned_by
, то CedrusData может использовать информацию о схеме
партиционирования для выбора более оптимального плана запроса.
Наибольшее ускорение ожидается для запросов, в которых присутствуют операторы Join
и Aggregation
. Например:
SELECT ...
FROM t1 JOIN t2
ON t1.partitioning_column = t2.other_column
SELECT a, partitioning_column, b, sum(c)
FROM t
GROUP BY a, partitioning_column, b
Ускорение запросов происходит за счет того, что оптимизатор использует информацию о ключе партиционирования для выбора
более быстрого способа выполнения того или иного оператора. Например, для осуществления группировки по двум атрибутам
GROUP BY a, b
CedrusData обычно осуществляет предварительную группировку локально на узлах, после чего пересылает
полученные данные между узлами с помощью оператора Exchange, чтобы осуществить финальную группировку:
Parent
Aggregation[FINAL, groupBy=[a,b]]
Exchange
Aggregation[PARTIAL, groupBy=[a,b]]
TableScan
Если же одна из колонок a
или b
является ключом партиционирования таблицы, оптимизатор CedrusData использует
эту информацию, чтобы осуществить полную группировку локально, тем самым упрощая план запроса:
Parent
Aggregation[FINAL, groupBy=[a,b]]
TableScan
Название |
Описание |
Значение по умолчанию |
---|---|---|
|
Использовать ли информацию о схеме партиционирования таблиц Hive для оптимизации запросов.
Данный параметр будет проигнорирован, если таблица создана с параметром |
|
|
Использовать ли информацию о схеме партиционирования таблиц Hive для оптимизации запросов, если таблица содержит
параметры |
|
Table statistics#
The Hive connector supports collecting and managing table statistics to improve query processing performance.
When writing data, the Hive connector always collects basic statistics
(numFiles
, numRows
, rawDataSize
, totalSize
)
and by default will also collect column level statistics:
Column type |
Collectible statistics |
---|---|
|
Number of nulls, number of distinct values, min/max values |
|
Number of nulls, number of distinct values, min/max values |
|
Number of nulls, number of distinct values, min/max values |
|
Number of nulls, number of distinct values, min/max values |
|
Number of nulls, number of distinct values, min/max values |
|
Number of nulls, number of distinct values, min/max values |
|
Number of nulls, number of distinct values, min/max values |
|
Number of nulls, number of distinct values, min/max values |
|
Number of nulls, number of distinct values, min/max values |
|
Number of nulls, number of distinct values |
|
Number of nulls, number of distinct values |
|
Number of nulls |
|
Number of nulls, number of true/false values |
Updating table and partition statistics#
If your queries are complex and include joining large data sets, running ANALYZE on tables/partitions may improve query performance by collecting statistical information about the data.
When analyzing a partitioned table, the partitions to analyze can be specified
via the optional partitions
property, which is an array containing
the values of the partition keys in the order they are declared in the table schema:
ANALYZE table_name WITH (
partitions = ARRAY[
ARRAY['p1_value1', 'p1_value2'],
ARRAY['p2_value1', 'p2_value2']])
This query will collect statistics for two partitions with keys
p1_value1, p1_value2
and p2_value1, p2_value2
.
On wide tables, collecting statistics for all columns can be expensive and can have a
detrimental effect on query planning. It is also typically unnecessary - statistics are
only useful on specific columns, like join keys, predicates, grouping keys. One can
specify a subset of columns to be analyzed via the optional columns
property:
ANALYZE table_name WITH (
partitions = ARRAY[ARRAY['p2_value1', 'p2_value2']],
columns = ARRAY['col_1', 'col_2'])
This query collects statistics for columns col_1
and col_2
for the partition
with keys p2_value1, p2_value2
.
Note that if statistics were previously collected for all columns, they must be dropped before re-analyzing just a subset:
CALL system.drop_stats('schema_name', 'table_name')
You can also drop statistics for selected partitions only:
CALL system.drop_stats(
schema_name => 'schema',
table_name => 'table',
partition_values => ARRAY[ARRAY['p2_value1', 'p2_value2']])
Dynamic filtering#
The Hive connector supports the dynamic filtering optimization. Dynamic partition pruning is supported for partitioned tables stored in any file format for broadcast as well as partitioned joins. Dynamic bucket pruning is supported for bucketed tables stored in any file format for broadcast joins only.
For tables stored in ORC or Parquet file format, dynamic filters are also pushed into local table scan on worker nodes for broadcast joins. Dynamic filter predicates pushed into the ORC and Parquet readers are used to perform stripe or row-group pruning and save on disk I/O. Sorting the data within ORC or Parquet files by the columns used in join criteria significantly improves the effectiveness of stripe or row-group pruning. This is because grouping similar data within the same stripe or row-group greatly improves the selectivity of the min/max indexes maintained at stripe or row-group level.
Delaying execution for dynamic filters#
It can often be beneficial to wait for the collection of dynamic filters before starting a table scan. This extra wait time can potentially result in significant overall savings in query and CPU time, if dynamic filtering is able to reduce the amount of scanned data.
For the Hive connector, a table scan can be delayed for a configured amount of
time until the collection of dynamic filters by using the configuration property
hive.dynamic-filtering.wait-timeout
in the catalog file or the catalog
session property <hive-catalog>.dynamic_filtering_wait_timeout
.
Table redirection#
Trino offers the possibility to transparently redirect operations on an existing table to the appropriate catalog based on the format of the table and catalog configuration.
In the context of connectors which depend on a metastore service (for example, Hive коннектор, Iceberg коннектор and Delta Lake коннектор), the metastore (Hive metastore service, AWS Glue Data Catalog) can be used to accustom tables with different table formats. Therefore, a metastore database can hold a variety of tables with different table formats.
As a concrete example, let’s use the following simple scenario which makes use of table redirection:
USE example.example_schema;
EXPLAIN SELECT * FROM example_table;
Query Plan
-------------------------------------------------------------------------
Fragment 0 [SOURCE]
...
Output[columnNames = [...]]
│ ...
└─ TableScan[table = another_catalog:example_schema:example_table]
...
The output of the EXPLAIN
statement points out the actual
catalog which is handling the SELECT
query over the table example_table
.
The table redirection functionality works also when using fully qualified names for the tables:
EXPLAIN SELECT * FROM example.example_schema.example_table;
Query Plan
-------------------------------------------------------------------------
Fragment 0 [SOURCE]
...
Output[columnNames = [...]]
│ ...
└─ TableScan[table = another_catalog:example_schema:example_table]
...
Trino offers table redirection support for the following operations:
Table read operations
Table write operations
Table management operations
Trino does not offer view redirection support.
The connector supports redirection from Hive tables to Iceberg and Delta Lake tables with the following catalog configuration properties:
hive.iceberg-catalog-name
for redirecting the query to Iceberg коннекторhive.delta-lake-catalog-name
for redirecting the query to Delta Lake коннектор
File system cache#
The connector supports configuring and using file system caching.
Performance tuning configuration properties#
The following table describes performance tuning properties for the Hive connector.
Предупреждение
Performance tuning configuration properties are considered expert-level features. Altering these properties from their default values is likely to cause instability and performance degradation.
Property name |
Description |
Default value |
---|---|---|
|
The target number of buffered splits for each table scan in a query, before the scheduler tries to pause. |
|
|
The maximum size allowed for buffered splits for each table scan in a query, before the query fails. |
|
|
The maximum number of splits generated per second per table scan. This can be used to reduce the load on the storage system. By default, there is no limit, which results in Trino maximizing the parallelization of data access. |
|
|
For each table scan, the coordinator first assigns file sections of up to
|
|
|
The size of a single file section assigned to a worker until
|
|
|
The largest size of a single file section assigned to a worker. Smaller splits result in more parallelism and thus can decrease latency, but also have more overhead and increase load on the system. |
|