大數(shù)據(jù)平臺建設(shè) —— SQL查詢引擎之Presto

大數(shù)據(jù)查詢引擎Presto簡介

SQL on Hadoop:

  • Hive的出現(xiàn)讓技術(shù)人員可以通過類SQL的方式對批量數(shù)據(jù)進(jìn)行查詢,而不用開發(fā)MapReduce程序
  • MapReduce計算過程中大量的中間結(jié)果磁盤落地使運行效率較低
  • 為了提高SQL on Hadoop的效率,各大工具應(yīng)運而生,比如Shark、Impala等

SQL on Hadoop的常見工具:


image.png

Presto是什么:

  • Presto是由Facebook開發(fā)的分布式SQL查詢引擎,用來進(jìn)行高速實時的數(shù)據(jù)分析
  • Presto的產(chǎn)生是為了解決Hive的MapReduce模型太慢且不能通過BI等工具展現(xiàn)HDFS數(shù)據(jù)的問題
  • Presto是一個計算引擎,它不存儲數(shù)據(jù),通過豐富的Connector獲取第三方服務(wù)的數(shù)據(jù),并支持?jǐn)U展

Presto顯而易見的優(yōu)點:

  • Presto支持標(biāo)準(zhǔn)的SQL,降低了分析人員和開發(fā)人員的使用門檻
  • Presto支持可插拔的Connector,可以連接多種數(shù)據(jù)源。包括HiveRDBMS、Kafka、 MongoDB等等
  • Presto是一個低延時、高并發(fā)的內(nèi)存計算引擎,比Hive執(zhí)行效率高的多

Presto數(shù)據(jù)模型:

  • Catalog:即數(shù)據(jù)源。Hive、 Mysq|都 是數(shù)據(jù)源。Presto可 以連接多個Hive和多個Mysql。
  • Schema:類比于DataBase,一個Catalog下有多個Schema
  • Table:數(shù)據(jù)表,與我們常用的數(shù)據(jù)庫表意義相同,一個Schema下有多個數(shù)據(jù)表

Presto架構(gòu)與執(zhí)行流程

Presto架構(gòu)圖:


image.png

Presto為Master - Slave架構(gòu),由三部分組成:

  • 一個Coordinator節(jié)點
  • 一個Discovery Server節(jié)點
  • 多個Worker節(jié)點

Presto組件:

  • Coordinator負(fù)責(zé)解析SQL語句,生成查詢計劃,分發(fā)執(zhí)行任務(wù)
  • Discovery Server負(fù)責(zé)維護(hù)Coordinator和Worker的關(guān)系,通常內(nèi)嵌于Coordinator節(jié)點
  • Worker節(jié)點負(fù)責(zé)執(zhí)行查詢?nèi)蝿?wù)以及與HDFS進(jìn)行交互讀取數(shù)據(jù)

Presto查詢流程:


image.png

Presto的一些名詞:

  • Plan:Presto將需要執(zhí)行的SQL進(jìn)行解析,生成執(zhí)行計劃
  • Stage:Presto執(zhí)行計劃分 為多個Stage,比如讀取數(shù)據(jù)、聚合數(shù)據(jù)等
  • Exchange:用于連接不同的Stage,進(jìn)行數(shù)據(jù)交互
  • Task:Stage由多個Task組成,每個Task分配到 一個Worker執(zhí)行
  • Split:一個分片表示大的數(shù)據(jù)集合中的一個小子集,與MapReduce類似
  • Page:Presto 中處理的最小數(shù)據(jù)單元

關(guān)于數(shù)據(jù)庫架構(gòu)設(shè)計:

  • Shared Everthting:完全透明共享CPU/MEMORY/IO,并行處理能力是最差的
  • Shared Storage:各個處理單元使用自己的私有CPU和Memory,共享磁盤系統(tǒng)
  • Shared Nothing:各個處理單元都有自己私有的CPU/內(nèi)存/硬盤等

Presto屬于MPP架構(gòu)設(shè)計:


image.png

MPP架構(gòu)的優(yōu)缺點

  • 易擴(kuò)容:可輕松通過擴(kuò)展機(jī)器節(jié)點(處理單元)擴(kuò)展整個系統(tǒng)的分布式存儲和計算能力
  • 效率高:任務(wù)并行執(zhí)行能力強(qiáng),充分發(fā)揮本地計算的能力,數(shù)據(jù)無共享、無I/O沖突,無鎖資源競爭,計算速度快
  • 短板效應(yīng):單個節(jié)點查詢效率慢會影響整個查詢

Presto安裝部署

官方文檔:

Presto的安裝方式有兩種,一是到官網(wǎng)下載編譯好的二進(jìn)制包進(jìn)行安裝,二是從Github倉庫上拉取源碼進(jìn)行編譯安裝。為了簡單起見,我這里選擇第一種方式,Server和Client都需要下載。

將下載的安裝包上傳到服務(wù)器上:

[root@hadoop ~]# cd /usr/local/src
[root@hadoop /usr/local/src]# ls
presto-server-0.243.2.tar.gz    presto-cli-0.243.2-executable.jar
[root@hadoop /usr/local/src]# 

解壓presto-server安裝包,并移動到合適的目錄下:

[root@hadoop /usr/local/src]# tar -zxvf presto-server-0.243.2.tar.gz
[root@hadoop /usr/local/src]# mv presto-server-0.243.2 /usr/local/presto-server
[root@hadoop /usr/local/src]# cd /usr/local/presto-server/
[root@hadoop /usr/local/presto-server]# ls
bin  lib  NOTICE  plugin  README.txt
[root@hadoop /usr/local/presto-server]# 

配置presto-server:

[root@hadoop /usr/local/presto-server]# mkdir etc
[root@hadoop /usr/local/presto-server]# vim etc/config.properties
# 作為coordinator節(jié)點
coordinator=true
# 指定即是coordinator也是work節(jié)點
node-scheduler.include-coordinator=true
http-server.http.port=9090
# 是否使用內(nèi)嵌的discovery-server
discovery-server.enabled=true
discovery.uri=http://192.168.243.161:9090

[root@hadoop /usr/local/presto-server]# vim etc/node.properties  # 每個節(jié)點的特殊配置
# presto集群的名稱
node.environment=presto_dev
# 當(dāng)前節(jié)點的id
node.id=ffffffff-ffff-ffff-ffff-ffffffffff01
# 節(jié)點的數(shù)據(jù)存儲目錄
node.data-dir=/data/presto

[root@hadoop /usr/local/presto-server]# vim etc/jvm.config  # JVM相關(guān)配置
-server
-Xmx8G
-XX:+UseG1GC
-XX:G1HeapRegionSize=32M
-XX:+UseGCOverheadLimit
-XX:+ExplicitGCInvokesConcurrent
-XX:+HeapDumpOnOutOfMemoryError
-XX:+ExitOnOutOfMemoryError

[root@hadoop /usr/local/presto-server]# vim etc/log.properties   # 日志相關(guān)配置
com.facebook.presto=INFO

配置catalog的連接信息:

[root@hadoop /usr/local/presto-server]# mkdir etc/catalog
[root@hadoop /usr/local/presto-server]# vim etc/catalog/jmx.properties
connector.name=jmx

[root@hadoop /usr/local/presto-server]# vim etc/catalog/hive.properties
connector.name=hive-hadoop2
hive.metastore.uri=thrift://192.168.243.161:9083
hive.config.resources=/usr/local/hadoop-2.8.5/etc/hadoop/hdfs-site.xml,/usr/local/hadoop-2.8.5/etc/hadoop/core-site.xml
hive.allow-drop-table=false

完成以上的配置后,啟動presto-server:

[root@hadoop /usr/local/presto-server]# bin/launcher run
...

2020-11-16T16:55:35.776+0800    INFO    main    com.facebook.presto.server.PrestoServer ======== SERVER STARTED ========

以上這種啟動方式是前臺啟動,后臺啟動的方式如下:

[root@hadoop /usr/local/presto-server]# bin/launcher start
Started as 5908
[root@hadoop /usr/local/presto-server]# 

檢查presto-server進(jìn)程是否正常:

[root@hadoop /usr/local/presto-server]# jps |grep -i presto
5908 PrestoServer
[root@hadoop /usr/local/presto-server]# netstat -lntp |grep 5908
tcp6       0      0 :::39225                :::*            LISTEN      5908/java           
tcp6       0      0 :::42622                :::*            LISTEN      5908/java           
tcp6       0      0 :::9090                 :::*            LISTEN      5908/java           
tcp6       0      0 :::36714                :::*            LISTEN      5908/java           
tcp6       0      0 :::45066                :::*            LISTEN      5908/java           
tcp6       0      0 :::32982                :::*            LISTEN      5908/java           
[root@hadoop /usr/local/presto-server]# 

將presto-client的jar包移動到bin目錄下:

[root@hadoop /usr/local/presto-server]# mv /usr/local/src/presto-cli-0.243.2-executable.jar bin/presto-cli.jar
[root@hadoop /usr/local/presto-server]# chmod a+x bin/presto-cli.jar

使用presto-client連接presto-server,進(jìn)入到交互式終端,測試下能否正常查詢Hive中的數(shù)據(jù):

[root@hadoop /usr/local/presto-server]# bin/presto-cli.jar --server localhost:9090 --catalog hive --user root
presto> show catalogs;
 Catalog 
---------
 hive    
 jmx     
 system  
(3 rows)

Query 20201116_091555_00001_cus94, FINISHED, 1 node
Splits: 19 total, 19 done (100.00%)
0:00 [0 rows, 0B] [0 rows/s, 0B/s]

presto> show schemas;
       Schema       
--------------------
 db01               
 default            
 information_schema 
(3 rows)

Query 20201116_091557_00002_cus94, FINISHED, 1 node
Splits: 19 total, 19 done (100.00%)
0:00 [3 rows, 44B] [16 rows/s, 243B/s]

presto> use db01;
USE
presto:db01> show tables;
  Table   
----------
 log_dev  
 log_dev2 
(2 rows)

Query 20201116_091652_00004_cus94, FINISHED, 1 node
Splits: 19 total, 19 done (100.00%)
0:00 [2 rows, 43B] [5 rows/s, 117B/s]

presto:db01> select * from log_dev;
 id |   name   | create_time | creator |      info      
----+----------+-------------+---------+----------------
  4 | 更新用戶 |  1554189515 | yarn    | 更新用戶 test3 
  6 | 創(chuàng)建用戶 |  1554299345 | yarn    | 創(chuàng)建用戶 test5 
(2 rows)

Query 20201116_091705_00005_cus94, FINISHED, 1 node
Splits: 17 total, 17 done (100.00%)
0:01 [2 rows, 84B] [2 rows/s, 84B/s]

presto:db01> 

presto-server提供了ui界面,可以在該界面上查看一些監(jiān)控信息。使用瀏覽器訪問9090端口:


image.png

點擊Query ID可以進(jìn)入Query Detail頁面查看該Query的詳細(xì)信息:


image.png

往下拉可以查看Stages和Task信息:


image.png

點擊“Live Plan”可以查看執(zhí)行計劃:


image.png

通過Jdbc操作Presto

在上一小節(jié)中,簡單演示了使用presto-client操作presto-server,本小節(jié)則演示下如何通過編寫代碼以JDBC的方式操作presto-server。首先,創(chuàng)建Maven項目,pom文件的內(nèi)容如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>presto-test</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>com.facebook.presto</groupId>
            <artifactId>presto-jdbc</artifactId>
            <version>0.243.2</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

編寫JDBC代碼如下:

package com.example.presto.demo;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

/**
 * 使用JDBC操作Presto
 *
 * @author 01
 * @date 2020-11-16
 **/
public class JdbcTest {

    public static void main(String[] args) throws Exception {
        Class.forName("com.facebook.presto.jdbc.PrestoDriver");
        Connection connection = DriverManager.getConnection(
                "jdbc:presto://192.168.243.161:9090/hive/db01",
                "root", null
        );
        Statement statement = connection.createStatement();
        ResultSet resultSet = statement.executeQuery("select * from log_dev");
        while (resultSet.next()) {
            for (int i = 1; i <= resultSet.getMetaData().getColumnCount(); i++) {
                System.out.print(resultSet.getString(i) + "\t");
            }
            System.out.println();
        }
        resultSet.close();
        connection.close();
    }
}

執(zhí)行結(jié)果如下:


image.png

Presto UDF開發(fā)之Scalar函數(shù)

與Hive和Spark SQL一樣,Presto也支持用戶自定義函數(shù)(UDF)。Presto UDF:

  • 在Presto中,函數(shù)大體分為三種:scalar、aggregation和window
  • Scalar就是標(biāo)量函數(shù),簡單來說就是Java中的一個靜態(tài)方法,本身沒有任何狀態(tài)
  • Aggregation函數(shù),就是需要累積狀態(tài)的函數(shù),例如COUNT、AVG等

Scalar函數(shù)的開發(fā)步驟:

  • 定義一個Java類,用@ScalarFunction注解標(biāo)記實現(xiàn)業(yè)務(wù)邏輯的靜態(tài)方法
  • 使用@Description描述函數(shù)的作用,這里的內(nèi)容會在SHOW FUNCTIONS中顯示
  • 使用@SqlType標(biāo)記函數(shù)的返回值類型

pom文件中,添加如下依賴:

<dependency>
    <groupId>com.facebook.presto</groupId>
    <artifactId>presto-spi</artifactId>
    <version>0.243</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>21.0</version>
</dependency>

開發(fā)一個scalar類型函數(shù),實現(xiàn)為字符串添加一個前綴,代碼示例:

package com.example.presto.demo.udf;

import com.facebook.presto.common.type.StandardTypes;
import com.facebook.presto.spi.function.Description;
import com.facebook.presto.spi.function.ScalarFunction;
import com.facebook.presto.spi.function.SqlType;
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;

public class PrefixFunction {

    /**
     * 為字符串添加一個前綴
     * presto中沒有String類型,使用Slice代替
     */
    @ScalarFunction("Prefix")
    @Description("prefix string")
    @SqlType(StandardTypes.VARCHAR)
    public static Slice prefix(@SqlType(StandardTypes.VARCHAR) Slice value) {
        return Slices.utf8Slice("presto_udf_" + value.toStringUtf8());
    }
}

scalar類型函數(shù)支持傳入多個值,例如可以實現(xiàn)一個根據(jù)傳入的數(shù)據(jù)生成json字符串的函數(shù),代碼示例:

package com.example.presto.demo.udf;

import com.facebook.presto.common.type.StandardTypes;
import com.facebook.presto.spi.function.Description;
import com.facebook.presto.spi.function.ScalarFunction;
import com.facebook.presto.spi.function.SqlNullable;
import com.facebook.presto.spi.function.SqlType;
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;

public class GenJson {

    /**
     * 根據(jù)傳入的數(shù)據(jù)生成json字符串
     */
    @ScalarFunction("GenJson")
    @Description("gen json string")
    @SqlType(StandardTypes.VARCHAR)
    public static Slice genJson(@SqlType(StandardTypes.VARCHAR) Slice key,
                                @SqlType(StandardTypes.VARCHAR) Slice value) {
        return Slices.utf8Slice(
                String.format("{\"%s\":\"%s\"}", key.toStringUtf8(),
                        value == null ? "" : value.toStringUtf8())
        );
    }
}

編寫一個Plugin的實現(xiàn)類,在getFunctions方法中添加我們開發(fā)的UDF函數(shù)。代碼如下:

package com.example.presto.demo.udf;

import com.facebook.presto.spi.Plugin;
import com.google.common.collect.ImmutableSet;

import java.util.Set;

public class ExampleFunctionsPlugin implements Plugin {

    @Override
    public Set<Class<?>> getFunctions() {
        return ImmutableSet.<Class<?>>builder()
                .add(PrefixFunction.class)
                .add(GenJson.class)
                .build();
    }
}

最后還需要在項目的resources目錄下創(chuàng)建如下目錄文件:

image.png

文件內(nèi)容如下:

      com.example.presto.demo.udf.ExampleFunctionsPlugin

將項目編譯并打包上傳到服務(wù)器:

[root@hadoop ~/jars]# ls
presto-test-1.0-SNAPSHOT.jar
[root@hadoop ~/jars]# 

將jar包拷貝到presto-server的plugin目錄下:

[root@hadoop ~]# mkdir /usr/local/presto-server/plugin/example-functions
[root@hadoop ~]# cp jars/presto-test-1.0-SNAPSHOT.jar /usr/local/presto-server/plugin/example-functions
[root@hadoop ~]# cp /usr/local/presto-server/plugin/hive-hadoop2/guava-26.0-jre.jar /usr/local/presto-server/plugin/example-functions  # 項目中依賴了guava,所以需要一并拷貝
[root@hadoop ~]# ls /usr/local/presto-server/plugin/example-functions
guava-26.0-jre.jar  presto-test-1.0-SNAPSHOT.jar

重啟presto-server:

[root@hadoop ~]# /usr/local/presto-server/bin/launcher restart

使用presto-cli進(jìn)入交互命令行,驗證一下我們開發(fā)的UDF函數(shù)是否生效:

[root@hadoop /usr/local/presto-server]# bin/presto-cli.jar --server localhost:9090 --catalog hive --user root
presto> use db01;
USE
presto:db01> select Prefix(name) from log_dev;
        _col0        
---------------------
 presto_udf_更新用戶 
 presto_udf_創(chuàng)建用戶 
(2 rows)

Query 20201116_121815_00002_upy9p, FINISHED, 1 node
Splits: 17 total, 17 done (100.00%)
0:01 [2 rows, 84B] [1 rows/s, 63B/s]

presto:db01> select GenJson(creator, name) from log_dev;
        _col0        
---------------------
 {"yarn":"更新用戶"} 
 {"yarn":"創(chuàng)建用戶"} 
(2 rows)

Query 20201116_121905_00003_upy9p, FINISHED, 1 node
Splits: 17 total, 17 done (100.00%)
0:00 [2 rows, 84B] [8 rows/s, 336B/s]

presto:db01> 

Presto UDF開發(fā)之Aggregation函數(shù)

Aggregation函數(shù)中的幾個概念:

  • input(state, data):針對每條數(shù)據(jù),執(zhí)行input函數(shù),在每個有數(shù)據(jù)的節(jié)點都會執(zhí)行,最終得到多個累積的狀態(tài)數(shù)據(jù)
  • combine(state1, state2):將所有節(jié)點的狀態(tài)數(shù)據(jù)聚合起來,直至所有狀態(tài)數(shù)據(jù)被聚合成一個最終狀態(tài),即Aggregation函數(shù)的輸出結(jié)果
  • output(final_state, out):最終輸出結(jié)果到一個BlockBuilder

Aggregation函數(shù)的開發(fā)步驟:

  • 定義一個Java類,用@AggregationFunction標(biāo)記為Aggregation函數(shù)
  • 使用@InputFunction、@CombineFunction@OutputFunction分別標(biāo)記計算函數(shù)、合并結(jié)果函數(shù)和最終輸出函數(shù)
  • 實現(xiàn)相關(guān)函數(shù)邏輯

首先,定義一個接口,繼承AccumulatorState,聲明用于提供和獲取值的方法:

package com.example.presto.demo.udf;

import com.facebook.presto.spi.function.AccumulatorState;
import io.airlift.slice.Slice;

public interface StringValueState extends AccumulatorState {
    Slice getStringValue();

    void setStringValue(Slice value);
}

然后定義一個Java類,實現(xiàn)Aggregation函數(shù)的核心邏輯:

package com.example.presto.demo.udf;

import com.facebook.presto.common.block.BlockBuilder;
import com.facebook.presto.common.type.StandardTypes;
import com.facebook.presto.common.type.VarcharType;
import com.facebook.presto.spi.function.*;
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;

/**
 * Aggregation函數(shù) - 實現(xiàn)字符串連接功能
 *
 * @author 01
 */
@AggregationFunction("ConcatStr")
public class ConCatFunction {

    @InputFunction
    public static void input(StringValueState state,
                             @SqlType(StandardTypes.VARCHAR) Slice value) {
        state.setStringValue(Slices.utf8Slice(
                checkNull(state.getStringValue()) + "|" +
                        value.toStringUtf8()
        ));
    }

    @CombineFunction
    public static void combine(StringValueState state,
                               StringValueState otherState) {
        state.setStringValue(Slices.utf8Slice(
                checkNull(state.getStringValue()) + "|" +
                        checkNull(otherState.getStringValue())
        ));
    }

    @OutputFunction(StandardTypes.VARCHAR)
    public static void output(StringValueState state,
                              BlockBuilder blockBuilder) {
        VarcharType.VARCHAR.writeSlice(blockBuilder, state.getStringValue());
    }

    private static String checkNull(Slice slice) {
        return slice == null ? "" : slice.toStringUtf8();
    }
}

然后還需要在ExampleFunctionsPlugin中添加該函數(shù):

public class ExampleFunctionsPlugin implements Plugin {

    @Override
    public Set<Class<?>> getFunctions() {
        return ImmutableSet.<Class<?>>builder()
                ...
                .add(ConCatFunction.class)
                .build();
    }
}

將項目編譯打包并上傳到服務(wù)器:

[root@hadoop ~]# ls jars/
presto-test-1.0-SNAPSHOT.jar
[root@hadoop ~]# 

覆蓋之前的jar包:

[root@hadoop ~]# cp jars/presto-test-1.0-SNAPSHOT.jar /usr/local/presto-server/plugin/example-functions/
cp:是否覆蓋"/usr/local/presto-server/plugin/example-functions/presto-test-1.0-SNAPSHOT.jar"? yes
[root@hadoop ~]# 

重啟presto-server:

[root@hadoop ~]# /usr/local/presto-server/bin/launcher restart

使用presto-cli進(jìn)入交互命令行,驗證一下我們開發(fā)的UDF函數(shù)是否生效:

[root@hadoop /usr/local/presto-server]# bin/presto-cli.jar --server localhost:9090 --catalog hive --user root
presto> use db01;
USE
presto:db01> select ConcatStr(creator) from log_dev2;
              _col0              
---------------------------------
 ||hdfs|yarn|hdfs|yarn|hdfs|yarn 
(1 row)

Query 20201116_124714_00001_inrgm, FINISHED, 1 node
Splits: 18 total, 18 done (100.00%)
0:01 [6 rows, 825B] [4 rows/s, 571B/s]

presto:db01> 

Presto EventListener開發(fā)

Event Listener是Presto提供的事件監(jiān)聽機(jī)制,我們可以通過開發(fā)自己的Event Listener來監(jiān)聽Presto中發(fā)生的一些事件,例如建立查詢、查詢成功/失敗等事件。總體來說,Event Listener有點類似于Hive中的Hook。Presto提供了三種Event Listener:

  • Query Creation:Presto查詢建立相關(guān)信息
  • Query completion:查詢執(zhí)行相關(guān)信息,包含成功查詢的細(xì)節(jié)信息,失敗查詢的錯誤碼等信息
  • Split completion:split執(zhí)行信息,同理包含成功和失敗的細(xì)節(jié)信息

Event Listener的開發(fā)步驟:

  • 實現(xiàn)Presto的EventListenerEventListenerFactory接口
  • 基于服務(wù)提供者接口(SPI)正確的打包我們的jar
  • 部署,放到Presto指定目錄,修改配置文件并重啟服務(wù)

接下來演示一下開發(fā)一個EventListener,實現(xiàn)監(jiān)聽事件并將事件信息寫入日志文件。首先,編寫EventListener的實現(xiàn)類,核心邏輯都在該類中。代碼如下:

package com.example.presto.demo.eventlistener;

import com.facebook.presto.spi.eventlistener.EventListener;
import com.facebook.presto.spi.eventlistener.QueryCompletedEvent;
import com.facebook.presto.spi.eventlistener.QueryCreatedEvent;
import com.facebook.presto.spi.eventlistener.SplitCompletedEvent;

import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.time.Instant;
import java.util.Map;

public class QueryEventListener implements EventListener {

    private final String logPath;

    public QueryEventListener(Map<String, String> config) {
        logPath = config.get("log.path");
        System.out.println(logPath);
    }

    /**
     * 監(jiān)聽創(chuàng)建查詢事件
     */
    @Override
    public void queryCreated(QueryCreatedEvent queryCreatedEvent) {
        String queryId = queryCreatedEvent.getMetadata().getQueryId();
        String query = queryCreatedEvent.getMetadata().getQuery();
        String user = queryCreatedEvent.getContext().getUser();
        String fileName = logPath + File.separator + queryId;
        File logFile = new File(fileName);
        if (!logFile.exists()) {
            try {
                boolean result = logFile.createNewFile();
                System.out.println(result);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        try (FileWriter fw = new FileWriter(fileName, true)) {
            fw.append(String.format("User:%s Id:%s Query:%s%n", user, queryId, query));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * 監(jiān)聽查詢完成事件
     */
    @Override
    public void queryCompleted(QueryCompletedEvent queryCompletedEvent) {
        String queryId = queryCompletedEvent.getMetadata().getQueryId();
        long createTime = queryCompletedEvent.getCreateTime().toEpochMilli();
        long endTime = queryCompletedEvent.getEndTime().toEpochMilli();
        long totalBytes = queryCompletedEvent.getStatistics().getTotalBytes();
        String queryState = queryCompletedEvent.getMetadata().getQueryState();

        queryCompletedEvent.getFailureInfo().ifPresent(queryFailureInfo -> {
            int errCode = queryFailureInfo.getErrorCode().getCode();
            String failureType = queryFailureInfo.getFailureType().orElse("").toUpperCase();
            String failureHost = queryFailureInfo.getFailureHost().orElse("");
            String failureMessage = queryFailureInfo.getFailureMessage().orElse("");
        });

        String fileName = logPath + File.separator + queryId;
        try (FileWriter fw = new FileWriter(fileName, true)) {
            fw.append(String.format("Id:%s StartTime:%s EndTime:%s State:%s%n",
                    queryId, createTime, endTime, queryState));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * 監(jiān)聽split完成事件
     */
    @Override
    public void splitCompleted(SplitCompletedEvent splitCompletedEvent) {
        long createTime = splitCompletedEvent.getCreateTime().toEpochMilli();
        long endTime = splitCompletedEvent.getEndTime().orElse(Instant.MAX).toEpochMilli();
        String queryId = splitCompletedEvent.getQueryId();
        String stageId = splitCompletedEvent.getStageId();
        String taskId = splitCompletedEvent.getTaskId();

        String fileName = logPath + File.separator + queryId;
        try (FileWriter fw = new FileWriter(fileName, true)) {
            fw.append(String.format("Id:%s StartTime:%s EndTime:%s StageId:%s TaskId:%s%n",
                    queryId, createTime, endTime, stageId, taskId));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

然后編寫一個工廠類實現(xiàn)EventListenerFactory接口,用于創(chuàng)建我們自定義的QueryEventListener

package com.example.presto.demo.eventlistener;

import com.facebook.presto.spi.eventlistener.EventListener;
import com.facebook.presto.spi.eventlistener.EventListenerFactory;

import java.util.Map;

public class QueryEventListenerFactory implements EventListenerFactory {

    @Override
    public String getName() {
        // EventListener的名稱
        return "query-event-listener";
    }

    @Override
    public EventListener create(Map<String, String> config) {
        if (!config.containsKey("log.path")) {
            throw new RuntimeException("missing log.path conf");
        }
        return new QueryEventListener(config);
    }
}

編寫Plugin的實現(xiàn)類,在getEventListenerFactories方法中添加我們自定義的EventListener創(chuàng)建工廠:

package com.example.presto.demo.eventlistener;

import com.facebook.presto.spi.Plugin;
import com.facebook.presto.spi.eventlistener.EventListenerFactory;

import java.util.Collections;

public class QueryEventPlugin implements Plugin {

    @Override
    public Iterable<EventListenerFactory> getEventListenerFactories() {
        QueryEventListenerFactory queryEventListenerFactory = new QueryEventListenerFactory();
        return Collections.singletonList(queryEventListenerFactory);
    }
}

最后還需要在com.facebook.presto.spi.Plugin文件中,添加QueryEventPlugin類的包路徑:

      com.example.presto.demo.eventlistener.QueryEventPlugin

將項目編譯打包并上傳到服務(wù)器:

[root@hadoop ~]# ls jars/
presto-test-1.0-SNAPSHOT.jar
[root@hadoop ~]# 

將jar包拷貝到presto-server的plugin目錄下:

[root@hadoop ~]# mkdir /usr/local/presto-server/plugin/event-listener
[root@hadoop ~]# cp jars/presto-test-1.0-SNAPSHOT.jar /usr/local/presto-server/plugin/event-listener
[root@hadoop ~]# cp /usr/local/presto-server/plugin/hive-hadoop2/guava-26.0-jre.jar /usr/local/presto-server/plugin/event-listener  # 項目中依賴了guava,所以需要一并拷貝
[root@hadoop ~]# ls /usr/local/presto-server/plugin/event-listener
guava-26.0-jre.jar  presto-test-1.0-SNAPSHOT.jar

刪除example-functions目錄,否則會在啟動presto-server時因為重復(fù)注冊UDF而報錯:

[root@hadoop ~]# rm -rf /usr/local/presto-server/plugin/example-functions/

然后還需要配置一下presto的event-listener:

[root@hadoop ~]# vim /usr/local/presto-server/etc/event-listener.properties
event-listener.name=query-event-listener
log.path=/data/presto/log
[root@hadoop ~]# mkdir -p /data/presto/log

重啟presto-server:

[root@hadoop ~]# /usr/local/presto-server/bin/launcher restart

使用presto-cli進(jìn)入交互命令行,隨便執(zhí)行一些查詢語句:

[root@hadoop /usr/local/presto-server]# bin/presto-cli.jar --server localhost:9090 --catalog hive --user root
presto> use db01;
USE
presto:db01> select * from log_dev;
 id |   name   | create_time | creator |      info      
----+----------+-------------+---------+----------------
  4 | 更新用戶 |  1554189515 | yarn    | 更新用戶 test3 
  6 | 創(chuàng)建用戶 |  1554299345 | yarn    | 創(chuàng)建用戶 test5 
(2 rows)

Query 20201116_132643_00001_tvyva, FINISHED, 1 node
Splits: 17 total, 17 done (100.00%)
0:01 [2 rows, 84B] [1 rows/s, 58B/s]

presto:db01> select * from log_dev2 limit 1;
 id |   name   | create_time | creator |     info      
----+----------+-------------+---------+---------------
  1 | 創(chuàng)建用戶 |  1554099545 | hdfs    | 創(chuàng)建用戶 test 
(1 row)

Query 20201116_132652_00002_tvyva, FINISHED, 1 node
Splits: 18 total, 18 done (100.00%)
0:00 [1 rows, 825B] [3 rows/s, 2.48KB/s]

presto:db01> 

然后驗證一下我們開發(fā)的EventListener是否生效,查看是否有記錄相應(yīng)的事件日志信息即可:

[root@hadoop ~]# ls /data/presto/log/
20201116_132435_00000_tvyva  20201116_132643_00001_tvyva  20201116_132652_00002_tvyva
[root@hadoop ~]# cat /data/presto/log/20201116_132435_00000_tvyva 
User:root Id:20201116_132435_00000_tvyva Query:use db01
Id:20201116_132435_00000_tvyva StartTime:1605533075986 EndTime:1605533076000 State:FINISHED
[root@hadoop ~]# cat /data/presto/log/20201116_132643_00001_tvyva 
User:root Id:20201116_132643_00001_tvyva Query:select * from log_dev
Id:20201116_132643_00001_tvyva StartTime:1605533204999 EndTime:1605533205193 StageId:20201116_132643_00001_tvyva.1 TaskId:0
...
Id:20201116_132643_00001_tvyva StartTime:1605533203889 EndTime:1605533205297 State:FINISHED
[root@hadoop ~]# cat /data/presto/log/20201116_132652_00002_tvyva 
User:root Id:20201116_132652_00002_tvyva Query:select * from log_dev2 limit 1
Id:20201116_132652_00002_tvyva StartTime:1605533212541 EndTime:1605533212644 StageId:20201116_132652_00002_tvyva.1 TaskId:0
...
Id:20201116_132652_00002_tvyva StartTime:1605533212413 EndTime:1605533212688 State:FINISHED
[root@hadoop ~]# 

Presto配置優(yōu)化

Presto架構(gòu):

  • Presto采用典型的Master - Slave架構(gòu)模型
  • Coordinator和Worker依賴Discovery Server進(jìn)行相互通信
  • Coordinator和DiscoveryServer在設(shè)計上是單點的,存在單點問題

Presto高可用方案之綁定虛擬IP:


image.png

Presto高可用方案之獨立部署Discovery Server:


image.png

Presto內(nèi)存模型:

  • Presto采用邏輯的內(nèi)存池,來管理不同類型的內(nèi)存需求
  • Presto把整個內(nèi)存劃分成三個內(nèi)存池,分別是System Pool,ReservedPool,General Pool
  • Presto 0.201+版本之后,默認(rèn)不啟用SystemPool,只保留ReservedPool和General Pool
  • System Pool是用來保留給系統(tǒng)使用的,默認(rèn)為40%的內(nèi)存空間留給系統(tǒng)使用,0.201+版本,SystemPool合并到GeneralPool
  • Reserved Pool和General Pool用來分配query運行時內(nèi)存
  • 大部分的query使用General Pool,當(dāng)General Pool滿了之后,將使用內(nèi)存最大的SQL放到Reserved Pool執(zhí)行

Presto內(nèi)存管理:

  • Query內(nèi)存管理:query劃分成很多task,每個task會有一個線程循環(huán)獲取task的狀態(tài),包括task所用內(nèi)存。匯總成query所用內(nèi)存
  • 機(jī)器內(nèi)存管理:Coordinator有一個線程,定時的輪詢每臺機(jī)器,查看當(dāng)前的機(jī)器內(nèi)存狀態(tài)

Presto通過兩點判斷集群是否達(dá)到了內(nèi)存的上限:

  • General Pool出現(xiàn)阻塞節(jié)點(Block node)
  • Reserved Pool已經(jīng)被使用

通過設(shè)置query.low-memory-killer.policy配置參數(shù),可以指定kill查詢的策略。該參數(shù)取值:total-reservation-on-blocked-nodes(kill在阻塞節(jié)點上使用內(nèi)存最多的查詢)或者total-reservation(kill最耗費內(nèi)存的查詢)

在了解了Presto的內(nèi)存模型和內(nèi)存管理后,以下列舉一些在Presto中可以優(yōu)化的配置參數(shù):

  • query.max-memory:單個query在整個集群中允許占用的最大user memory
  • query.max-total-memory:單個query在整個集群中允許占用的最大(user + system) memory
  • query.max-memory-per-node:一個query在單個worker上允許的最大user memory,即ReservedPool,默認(rèn)為heapSize的0.1
  • query.max-total-memory-per-node:一個query在單個worker上允許的最大(user + system) memory

用戶查詢數(shù)據(jù)量/復(fù)雜性,決定了ReservedPool大?。挥脩舨樵儾l(fā)度,決定了jvm heapSize的大小

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容