How does Index Source work in Presto

Overview

Presto's Index Source is an optimization strategy that relies on an index in the underlying data source. It improves query performance by avoiding reads of rows that the join condition would filter out anyway.

With the Index Source strategy, Presto passes the join keys from the left table (the probe side) to the right table (the Index Source). The Index Source then performs a lookup to fetch only the records that match the keys it received, after which the left and right sides can be hash joined. In other words, the index source does not create table splits or run a table scan; it reads records directly by the keys it receives.
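
The flow described above can be sketched in a few lines of plain Java. This is only an illustration under simplifying assumptions: the class IndexJoinSketch and its methods are hypothetical names, not Presto classes, and the lookup and the join are merged into a single loop, whereas Presto performs them in separate operators.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of an index join; none of these names exist in Presto.
final class IndexJoinSketch
{
    // Stand-in for the right table's index: join key -> rows with that key.
    private final Map<Long, List<String>> index = new HashMap<>();

    void put(long key, String row)
    {
        index.computeIfAbsent(key, ignored -> new ArrayList<>()).add(row);
    }

    // probeKeys[i] is the join key of probeRows[i] (the left table).
    // Only keys that occur on the probe side are looked up; the indexed
    // side is never iterated as a whole.
    List<String> join(long[] probeKeys, String[] probeRows)
    {
        List<String> output = new ArrayList<>();
        for (int i = 0; i < probeKeys.length; i++) {
            List<String> matches = index.getOrDefault(probeKeys[i], Collections.emptyList());
            for (String match : matches) {
                output.add(probeRows[i] + " | " + match);
            }
        }
        return output;
    }
}
```

The cost of join grows with the number of probe keys and matching rows rather than with the size of the indexed table, which is the property the strategy relies on.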

Index Source therefore performs well when the right table is extremely large and the join condition selects only a few of its rows. The right table must also offer an efficient way to fetch the rows associated with a given key.

Index Source is somewhat similar to dynamic filtering: both aim to reduce the amount of data that has to be read. Dynamic filtering, however, has a timeout, and it targets the left table.

Code tracing

To figure out how Index Source works, I traced the code in a debugger, starting from a simple test case: com.facebook.presto.tests.AbstractTestIndexedQueries#testBasicIndexJoin. The SQL is:

SELECT * FROM (SELECT * FROM lineitem WHERE partkey % 8 = 0) l JOIN orders o ON l.orderkey = o.orderkey;

The execution plan shows that, with Index Source, the right table's ScanProjectOperator has been replaced by an IndexSourceOperator.

IndexSource[tpch_indexed:com.facebook.presto.tests.tpch.TpchIndexHandle@2981c060, lookup = [orderkey_63]] => [orderkey_63:bigint, custkey:bigint, orderstatus:varchar(1), totalprice:double, orderdate:date, orderpriority:varchar(15), clerk:varchar(15), shippriority:integer, comment_64:varchar(79)]
        CPU: 1.68s (30.82%), Scheduled: 1.77s (28.76%), Output: 332605 rows (41.22MB)
        Input avg.: 523.79 rows, Input std.dev.: 102.78%
        orderkey_63 := tpch:orderkey
        custkey := tpch:custkey
        orderstatus := tpch:orderstatus
        totalprice := tpch:totalprice
        orderdate := tpch:orderdate
        orderpriority := tpch:orderpriority
        clerk := tpch:clerk
        shippriority := tpch:shippriority
        comment_64 := tpch:comment

IndexSourceOperator is defined in class com.facebook.presto.operator.index.IndexSourceOperator. Its core code is:

    @Override
    public Supplier<Optional<UpdatablePageSource>> addSplit(Split split)
    {
        requireNonNull(split, "split is null");
        checkState(source == null, "Index source split already set");

        IndexSplit indexSplit = (IndexSplit) split.getConnectorSplit();

        // Normalize the incoming RecordSet to something that can be consumed by the index
        RecordSet normalizedRecordSet = probeKeyNormalizer.apply(indexSplit.getKeyRecordSet());
        // Key step: look up the right table's records that match the left table's key set (indexSplit.getKeyRecordSet())
        ConnectorPageSource result = index.lookup(normalizedRecordSet);
        // Wrap the lookup result in a page source for the right table; the result set is read page by page later.
        source = new PageSourceOperator(result, operatorContext);

        Object splitInfo = split.getInfo();
        if (splitInfo != null) {
            operatorContext.setInfoSupplier(() -> new SplitOperatorInfo(splitInfo));
        }

        return Optional::empty;
    }

The method ConnectorIndex#lookup corresponds to the lookup operation in the execution plan. Stepping into it, the implementation used in this test is TpchConnectorIndex:

    @Override
    public ConnectorPageSource lookup(RecordSet rawInputRecordSet)
    {
        // convert the input record set from the column ordering in the query to
        // match the column ordering of the index
        RecordSet inputRecordSet = keyFormatter.apply(rawInputRecordSet);

        // Key step: look up the values in the index
        RecordSet rawOutputRecordSet = indexedTable.lookupKeys(inputRecordSet);

        // convert the output record set of the index into the column ordering
        // expected by the query
        return new RecordPageSource(outputFormatter.apply(rawOutputRecordSet));
    }

Let's step into the method IndexedTable#lookupKeys to see how the filtering is done:

        public RecordSet lookupKeys(RecordSet recordSet)
        {
            // Since we only return a cached copy of IndexedTable, please make sure you reorder the input to same order of keyColumns
            checkArgument(recordSet.getColumnTypes().equals(keyTypes), "Input RecordSet keys do not match expected key type");

            Iterable<RecordSet> outputRecordSets = Iterables.transform(tupleIterable(recordSet), key -> {
                for (Object value : key.getValues()) {
                    if (value == null) {
                        throw new IllegalArgumentException("TPCH index does not support null values");
                    }
                }
                // lookup record by specified key
                return lookupKey(key);
            });

            // We will return result same order as outputColumns
            return new ConcatRecordSet(outputRecordSets, outputTypes);
        }

        private RecordSet lookupKey(MaterializedTuple tupleKey)
        {
            // fetch records from cache key -> record mapping in local attribute keyToValues
            return new MaterializedTupleRecordSet(keyToValues.get(tupleKey), outputTypes);
        }

Because tpch is just a test connector, com.facebook.presto.tests.tpch.IndexedTpchConnectorFactory reads all the records of the orders table during initialization and caches them as a key -> record mapping in a ListMultimap named keyToValues. A lookup therefore only has to fetch the records by key and return them. Below is how the tpch connector caches the data of the orders table.

    public TpchIndexedData(String connectorId, TpchIndexSpec tpchIndexSpec)
    {
        requireNonNull(connectorId, "connectorId is null");
        requireNonNull(tpchIndexSpec, "tpchIndexSpec is null");

        TpchMetadata tpchMetadata = new TpchMetadata(connectorId);
        TpchRecordSetProvider tpchRecordSetProvider = new TpchRecordSetProvider();

        ImmutableMap.Builder<Set<TpchScaledColumn>, IndexedTable> indexedTablesBuilder = ImmutableMap.builder();

        Set<TpchScaledTable> tables = tpchIndexSpec.listIndexedTables();
        for (TpchScaledTable table : tables) {
            SchemaTableName tableName = new SchemaTableName("sf" + table.getScaleFactor(), table.getTableName());
            TpchTableHandle tableHandle = tpchMetadata.getTableHandle(null, tableName);
            Map<String, ColumnHandle> columnHandles = new LinkedHashMap<>(tpchMetadata.getColumnHandles(null, tableHandle));
            for (Set<String> columnNames : tpchIndexSpec.getColumnIndexes(table)) {
                List<String> keyColumnNames = ImmutableList.copyOf(columnNames); // Finalize the key order
                Set<TpchScaledColumn> keyColumns = keyColumnNames.stream()
                        .map(name -> new TpchScaledColumn(table, name))
                        .collect(toImmutableSet());

                TpchTable<?> tpchTable = TpchTable.getTable(table.getTableName());
                RecordSet recordSet = tpchRecordSetProvider.getRecordSet(tpchTable, ImmutableList.copyOf(columnHandles.values()), table.getScaleFactor(), 0, 1, TupleDomain.all());
                IndexedTable indexedTable = indexTable(recordSet, ImmutableList.copyOf(columnHandles.keySet()), keyColumnNames);
                indexedTablesBuilder.put(keyColumns, indexedTable);
            }
        }

        indexedTables = indexedTablesBuilder.build();
    }

    private static IndexedTable indexTable(RecordSet recordSet, final List<String> outputColumns, List<String> keyColumns)
    {
        List<Integer> keyPositions = keyColumns.stream()
                .map(columnName -> {
                    int position = outputColumns.indexOf(columnName);
                    checkState(position != -1);
                    return position;
                })
                .collect(toImmutableList());

        ImmutableListMultimap.Builder<MaterializedTuple, MaterializedTuple> indexedValuesBuilder = ImmutableListMultimap.builder();

        List<Type> outputTypes = recordSet.getColumnTypes();
        List<Type> keyTypes = extractPositionValues(outputTypes, keyPositions);

        RecordCursor cursor = recordSet.cursor();
        while (cursor.advanceNextPosition()) {
            List<Object> values = extractValues(cursor, outputTypes);
            List<Object> keyValues = extractPositionValues(values, keyPositions);

            indexedValuesBuilder.put(new MaterializedTuple(keyValues), new MaterializedTuple(values));
        }

        return new IndexedTable(keyColumns, keyTypes, outputColumns, outputTypes, indexedValuesBuilder.build());
    }
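
To make the caching scheme easier to follow in isolation, here is a minimal sketch that mirrors indexTable and lookupKeys using only java.util collections. It is an assumption-laden simplification: InMemoryIndexSketch is a hypothetical class, rows are plain List<Object> values instead of MaterializedTuple, and a HashMap of lists stands in for Guava's ListMultimap.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the test connector's IndexedTable; not Presto code.
final class InMemoryIndexSketch
{
    private final List<Integer> keyPositions;
    // key values -> every full row that carries those key values
    private final Map<List<Object>, List<List<Object>>> keyToRows = new HashMap<>();

    // Reads every row once and groups it by the values at keyPositions,
    // similar in spirit to indexTable filling keyToValues.
    InMemoryIndexSketch(List<List<Object>> rows, List<Integer> keyPositions)
    {
        this.keyPositions = new ArrayList<>(keyPositions);
        for (List<Object> row : rows) {
            keyToRows.computeIfAbsent(extractKey(row), ignored -> new ArrayList<>()).add(row);
        }
    }

    private List<Object> extractKey(List<Object> row)
    {
        List<Object> key = new ArrayList<>();
        for (int position : keyPositions) {
            key.add(row.get(position));
        }
        return key;
    }

    // Rejects null key values, then concatenates the rows found for each key
    // in the order the keys were supplied.
    List<List<Object>> lookupKeys(List<List<Object>> keys)
    {
        List<List<Object>> output = new ArrayList<>();
        for (List<Object> key : keys) {
            for (Object value : key) {
                if (value == null) {
                    throw new IllegalArgumentException("null key values are not supported");
                }
            }
            output.addAll(keyToRows.getOrDefault(key, Collections.emptyList()));
        }
        return output;
    }
}
```

Building the map costs one full pass over the table, which is acceptable for a test connector; a production connector would be expected to delegate the lookup to an index that the storage system already maintains.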