Pushdown#

CedrusData может выполнять pushdown обработки запросов или их частей в подключенный источник данных. Это означает, что определенный предикат, агрегатная функция или другая операция передается в базовую базу данных или систему хранения для обработки.

Результаты pushdown могут включать следующие преимущества:

  • Улучшение общей производительности запросов

  • Сокращение сетевого трафика между CedrusData и источником данных

  • Снижение нагрузки на удаленный источник данных

Эти преимущества часто приводят к значительному сокращению стоимости выполнения запроса.

Поддержка pushdown зависит от конкретного коннектора и соответствующей базовой базы данных или системы хранения.

Pushdown предикатов#

Pushdown предикатов оптимизирует фильтрацию на уровне строк. Он использует выведенный фильтр, обычно полученный из условия в выражении WHERE, для исключения ненужных строк. Обработка передается коннектором в источник данных, который выполняет фильтрацию.

Если pushdown предиката для конкретного условия выполнен успешно, план EXPLAIN для запроса не содержит операцию ScanFilterProject для этого условия.

Pushdown проекций#

CedrusData использует столбцы, указанные в выражении SELECT и других частях запроса, чтобы ограничить чтение данных только этими столбцами. Если pushdown проекций выполнен успешно, план EXPLAIN для запроса обращается только к соответствующим столбцам в секции Layout операции TableScan.

Pushdown вложенных колонок#

CedrusData может делать pushdown вложенных колонок для дальнейшего ограничения количества данных, возвращаемых из источника. Например, рассмотрим таблицу в коннекторе Hive, содержащую столбец типа ROW с несколькими полями. Если запрос обращается только к одному полю, pushdown разыменования позволяет читателю файла считать только это одно поле внутри строки. Это же применимо к полям строки, вложенной внутри строки верхнего уровня. Это может привести к значительной экономии объема данных, считываемых из системы хранения.

Pushdown агрегаций#

Pushdown агрегаций может быть выполнен при соблюдении следующих условий:

  • Коннектор поддерживает pushdown агрегаций в целом.

  • Коннектор поддерживает pushdown конкретной функции или функций.

  • Структура запроса допускает выполнение pushdown.

Проверить, выполняется ли pushdown для конкретного запроса, можно, посмотрев план EXPLAIN запроса. Если агрегатная функция успешно передана в коннектор, план explain не содержит оператор Aggregate. План explain показывает только операции, выполняемые CedrusData.

В качестве примера мы загрузили набор данных TPC-H в базу данных PostgreSQL и затем выполнили запрос с помощью коннектора PostgreSQL:

SELECT regionkey, count(*)
FROM nation
GROUP BY regionkey;

Получить план explain можно, добавив EXPLAIN перед запросом:

EXPLAIN
SELECT regionkey, count(*)
FROM nation
GROUP BY regionkey;

План explain для этого запроса не содержит оператор Aggregate с функцией count, так как эта операция теперь выполняется коннектором. Функция count(*) видна как часть оператора TableScan PostgreSQL. Это указывает на успешный pushdown.

Fragment 0 [SINGLE]
    Output layout: [regionkey_0, _generated_1]
    Output partitioning: SINGLE []
    Output[regionkey, _col1]
    │   Layout: [regionkey_0:bigint, _generated_1:bigint]
    │   Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: ?}
    │   regionkey := regionkey_0
    │   _col1 := _generated_1
    └─ RemoteSource[1]
            Layout: [regionkey_0:bigint, _generated_1:bigint]

Fragment 1 [SOURCE]
    Output layout: [regionkey_0, _generated_1]
    Output partitioning: SINGLE []
    TableScan[postgresql:tpch.nation tpch.nation columns=[regionkey:bigint:int8, count(*):_generated_1:bigint:bigint] groupingSets=[[regionkey:bigint:int8]], gro
        Layout: [regionkey_0:bigint, _generated_1:bigint]
        Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}
        _generated_1 := count(*):_generated_1:bigint:bigint
        regionkey_0 := regionkey:bigint:int8

Ряд факторов может препятствовать выполнению pushdown:

  • Использование другой агрегатной функции, которая не может быть передана в коннектор

  • Использование коннектора без поддержки pushdown для конкретной функции

Пример ниже показывает план, в котором pushdown в удаленный источник данных не выполняется, и CedrusData выполняет обработку агрегатную самостоятельно.

Fragment 0 [SINGLE]
    Output layout: [regionkey, count]
    Output partitioning: SINGLE []
    Output[regionkey, _col1]
    │   Layout: [regionkey:bigint, count:bigint]
    │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?}
    │   _col1 := count
    └─ RemoteSource[1]
           Layout: [regionkey:bigint, count:bigint]

Fragment 1 [HASH]
    Output layout: [regionkey, count]
    Output partitioning: SINGLE []
    Aggregate(FINAL)[regionkey]
    │   Layout: [regionkey:bigint, count:bigint]
    │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?}
    │   count := count("count_0")
    └─ LocalExchange[HASH][$hashvalue] ("regionkey")
       │   Layout: [regionkey:bigint, count_0:bigint, $hashvalue:bigint]
       │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?}
       └─ RemoteSource[2]
              Layout: [regionkey:bigint, count_0:bigint, $hashvalue_1:bigint]

Fragment 2 [SOURCE]
    Output layout: [regionkey, count_0, $hashvalue_2]
    Output partitioning: HASH [regionkey][$hashvalue_2]
    Project[]
    │   Layout: [regionkey:bigint, count_0:bigint, $hashvalue_2:bigint]
    │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?}
    │   $hashvalue_2 := combine_hash(bigint '0', COALESCE("$operator$hash_code"("regionkey"), 0))
    └─ Aggregate(PARTIAL)[regionkey]
       │   Layout: [regionkey:bigint, count_0:bigint]
       │   count_0 := count(*)
       └─ TableScan[tpch:nation:sf0.01, grouped = false]
              Layout: [regionkey:bigint]
              Estimates: {rows: 25 (225B), cpu: 225, memory: 0B, network: 0B}
              regionkey := tpch:regionkey

Ограничения#

Pushdown агрегаций не поддерживает ряд более сложных конструкций:

Pushdown Join#

Pushdown Join позволяет коннектору делегировать операцию Join источнику данных. Это может привести к повышению производительности и позволяет CedrusData выполнять оставшуюся обработку запроса над меньшим объемом данных.

Особенности поддерживаемого pushdown Join различаются для каждого источника данных. Существуют общие условия, которые должны быть выполнены для pushdown Join:

  • Все предикаты, являющиеся частью Join, должны поддерживать pushdown

  • Таблицы в Join должны принадлежать одному каталогу

Проверить, выполняется ли pushdown для конкретного Join, можно, посмотрев план EXPLAIN запроса. План explain не содержит оператор Join, если Join передан коннектором в источник данных:

EXPLAIN SELECT c.custkey, o.orderkey
FROM orders o JOIN customer c ON c.custkey = o.custkey;

Следующий план получен коннектором PostgreSQL при запросе данных TPC-H в базе данных PostgreSQL. Он не содержит оператор Join в результате успешного pushdown Join.

Fragment 0 [SINGLE]
    Output layout: [custkey, orderkey]
    Output partitioning: SINGLE []
    Output[custkey, orderkey]
    │   Layout: [custkey:bigint, orderkey:bigint]
    │   Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: ?}
    └─ RemoteSource[1]
           Layout: [orderkey:bigint, custkey:bigint]

Fragment 1 [SOURCE]
    Output layout: [orderkey, custkey]
    Output partitioning: SINGLE []
    TableScan[postgres:Query[SELECT l."orderkey" AS "orderkey_0", l."custkey" AS "custkey_1", r."custkey" AS "custkey_2" FROM (SELECT "orderkey", "custkey" FROM "tpch"."orders") l INNER JOIN (SELECT "custkey" FROM "tpch"."customer") r O
        Layout: [orderkey:bigint, custkey:bigint]
        Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}
        orderkey := orderkey_0:bigint:int8
        custkey := custkey_1:bigint:int8

Pushdown limit#

Выражение Выражения LIMIT или FETCH FIRST ограничивает количество возвращаемых записей запроса. Pushdown limit позволяет коннектору передать обработку таких запросов для неотсортированных записей в базовый источник данных. CedrusData поддерживает pushdown конструкций LIMIT N и FETCH FIRST N ROWS.

Реализация и поддержка pushdown limit зависят от конкретного коннектора.

Pushdown Top-N#

Комбинация выражения Выражения LIMIT или FETCH FIRST с выражением Выражение ORDER BY формирует небольшой набор записей, возвращаемый из большого отсортированного набора данных. Она зависит от порядка сортировки для определения того, какие записи необходимо вернуть, и поэтому существенно отличается в плане оптимизации от Pushdown limit.

Pushdown для такого запроса называется pushdown Top-N, поскольку операция возвращает верхние N строк. Он позволяет коннектору передать обработку таких запросов в базовый источник данных и тем самым значительно сократить объем данных, передаваемых в CedrusData и обрабатываемых движком.

CedrusData поддерживает pushdown конструкций ORDER BY ... LIMIT N или ORDER BY ... FETCH FIRST N ROWS.

Реализация и поддержка зависят от конкретного коннектора, так как различные источники данных поддерживают различный синтаксис SQL и обработку.

Например, ниже приведены два запроса, демонстрирующие, как определить поведение pushdown Top-N.

Первый пример — запрос с pushdown Top-N к базе данных PostgreSQL:

SELECT id, name
FROM postgresql.public.company
ORDER BY id
LIMIT 5;

Получить план explain можно, добавив EXPLAIN перед запросом:

EXPLAIN SELECT id, name
FROM postgresql.public.company
ORDER BY id
LIMIT 5;
Fragment 0 [SINGLE]
    Output layout: [id, name]
    Output partitioning: SINGLE []
    Stage Execution Strategy: UNGROUPED_EXECUTION
    Output[id, name]
    │   Layout: [id:integer, name:varchar]
    │   Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: ?}
    └─ RemoteSource[1]
           Layout: [id:integer, name:varchar]

Fragment 1 [SOURCE]
    Output layout: [id, name]
    Output partitioning: SINGLE []
    Stage Execution Strategy: UNGROUPED_EXECUTION
    TableScan[postgresql:public.company public.company sortOrder=[id:integer:int4 ASC NULLS LAST] limit=5, grouped = false]
        Layout: [id:integer, name:varchar]
        Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}
        name := name:varchar:text
        id := id:integer:int4

Второй пример — запрос Top-N к коннектору tpch, который не поддерживает функциональность pushdown Top-N:

SELECT custkey, name
FROM tpch.sf1.customer
ORDER BY custkey
LIMIT 5;

Соответствующий план запроса:

Fragment 0 [SINGLE]
    Output layout: [custkey, name]
    Output partitioning: SINGLE []
    Stage Execution Strategy: UNGROUPED_EXECUTION
    Output[custkey, name]
    │   Layout: [custkey:bigint, name:varchar(25)]
    │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?}
    └─ TopN[5 by (custkey ASC NULLS LAST)]
       │   Layout: [custkey:bigint, name:varchar(25)]
       └─ LocalExchange[SINGLE] ()
          │   Layout: [custkey:bigint, name:varchar(25)]
          │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?}
          └─ RemoteSource[1]
                 Layout: [custkey:bigint, name:varchar(25)]

Fragment 1 [SOURCE]
    Output layout: [custkey, name]
    Output partitioning: SINGLE []
    Stage Execution Strategy: UNGROUPED_EXECUTION
    TopNPartial[5 by (custkey ASC NULLS LAST)]
    │   Layout: [custkey:bigint, name:varchar(25)]
    └─ TableScan[tpch:customer:sf1.0, grouped = false]
           Layout: [custkey:bigint, name:varchar(25)]
           Estimates: {rows: 150000 (4.58MB), cpu: 4.58M, memory: 0B, network: 0B}
           custkey := tpch:custkey
           name := tpch:name

В приведенном выше плане запроса операция Top-N TopN[5 by (custkey ASC NULLS LAST)] выполняется во Fragment 0 движком CedrusData, а не исходной базой данных.

Обратите внимание, что по сравнению с запросом, выполненным через коннектор tpch, план explain запроса, выполненного через коннектор postgresql, не содержит ссылки на операцию TopN[5 by (id ASC NULLS LAST)] во Fragment 0. Отсутствие оператора CedrusData TopN во Fragment 0 плана запроса демонстрирует, что запрос использует оптимизацию pushdown Top-N.