Introduction to Presto, a Big Data Query Engine
SQL on Hadoop:
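- Hive lets engineers query batch data with SQL-like statements instead of writing MapReduce programs
- MapReduce writes a large volume of intermediate results to disk during computation, which makes it slow
- To make SQL on Hadoop faster, a number of tools emerged, such as Shark and Impala
Common SQL on Hadoop tools: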

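What Presto is:
- Presto is a distributed SQL query engine developed by Facebook for fast, interactive data analysis
- Presto was created because Hive's MapReduce execution model was too slow and could not expose HDFS data to BI tools
- Presto is a compute engine only: it stores no data itself, reads data from third-party systems through a rich set of Connectors, and can be extended with new ones
Presto's main advantages:
- Presto supports standard SQL, which lowers the barrier for analysts and developers
- Presto has pluggable Connectors and can query many data sources, including Hive, RDBMS, Kafka, MongoDB, and others
- Presto is a low-latency, high-concurrency in-memory compute engine and runs much faster than Hive
Presto data model:
- Catalog: a data source. Hive and MySQL are both data sources, and Presto can connect to multiple Hive and multiple MySQL instances.
- Schema: analogous to a database; a Catalog contains multiple Schemas
- Table: a data table, with the same meaning as a table in an ordinary database; a Schema contains multiple Tables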
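Because of this three-level model, a table is addressed as catalog.schema.table, for example hive.db01.log_dev for the table queried later in this article. A minimal sketch in plain Java of splitting such a name; the class `QualifiedTableName` is hypothetical and not part of Presto, and it assumes that no part contains a dot or needs quoting:

```java
import java.util.Objects;

// Hypothetical helper for illustration only; not a Presto class.
final class QualifiedTableName {
    final String catalog;
    final String schema;
    final String table;

    QualifiedTableName(String catalog, String schema, String table) {
        this.catalog = Objects.requireNonNull(catalog);
        this.schema = Objects.requireNonNull(schema);
        this.table = Objects.requireNonNull(table);
    }

    // Splits "catalog.schema.table" into its three parts.
    static QualifiedTableName parse(String name) {
        String[] parts = name.split("\\.", -1);
        if (parts.length != 3) {
            throw new IllegalArgumentException("expected catalog.schema.table: " + name);
        }
        for (String part : parts) {
            if (part.isEmpty()) {
                throw new IllegalArgumentException("empty part in: " + name);
            }
        }
        return new QualifiedTableName(parts[0], parts[1], parts[2]);
    }

    @Override
    public String toString() {
        return catalog + "." + schema + "." + table;
    }

    public static void main(String[] args) {
        QualifiedTableName name = parse("hive.db01.log_dev");
        System.out.println(name.catalog + " / " + name.schema + " / " + name.table);
    }
}
```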
Presto Architecture and Execution Flow
Presto architecture diagram:

Presto uses a Master-Slave architecture made up of three parts:
- One Coordinator node
- One Discovery Server node
- Multiple Worker nodes
Presto components:
- The Coordinator parses SQL statements, generates the query plan, and dispatches tasks
- The Discovery Server tracks the relationship between the Coordinator and the Workers; it is usually embedded in the Coordinator node
- Worker nodes execute query tasks and read data from HDFS
Presto query flow:

Presto terminology:
- Plan: Presto parses the SQL to be executed and generates an execution plan
- Stage: the execution plan is divided into multiple Stages, for example reading data and aggregating data
- Exchange: connects different Stages and moves data between them
- Task: a Stage consists of multiple Tasks, each assigned to one Worker for execution
- Split: a small subset of a larger data set, similar to a split in MapReduce
- Page: the smallest unit of data that Presto processes
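The relationship between Splits and Tasks can be pictured with a toy model. The sketch below hands out splits round-robin to a fixed number of tasks. This is an assumption made purely for illustration: Presto's real scheduler is more sophisticated, and `SplitAssignment` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only; not how Presto actually schedules splits.
final class SplitAssignment {
    // Distributes split ids 0..splitCount-1 over taskCount tasks, round-robin.
    static List<List<Integer>> assign(int splitCount, int taskCount) {
        if (taskCount <= 0) {
            throw new IllegalArgumentException("taskCount must be positive");
        }
        List<List<Integer>> tasks = new ArrayList<>();
        for (int t = 0; t < taskCount; t++) {
            tasks.add(new ArrayList<>());
        }
        for (int s = 0; s < splitCount; s++) {
            tasks.get(s % taskCount).add(s);
        }
        return tasks;
    }

    public static void main(String[] args) {
        // Five splits over two tasks.
        System.out.println(assign(5, 2));
    }
}
```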
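Database architecture designs:
- Shared Everything: CPU, memory, and I/O are shared fully and transparently; parallel processing capability is the weakest of the three
- Shared Storage: each processing unit has its own private CPU and memory but shares the disk system
- Shared Nothing: each processing unit has its own private CPU, memory, and disk
Presto follows the MPP (massively parallel processing) design: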

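Pros and cons of the MPP architecture:
- Easy to scale: adding machine nodes (processing units) expands the distributed storage and compute capacity of the whole system
- Efficient: tasks run with a high degree of parallelism and make full use of local computation; with no shared data there is no I/O conflict and no lock contention, so computation is fast
- Straggler effect: a single slow node slows down the whole query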
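The straggler effect follows from simple arithmetic: when every node works on its share in parallel, the query is done only when the slowest node is done. A minimal sketch, with the hypothetical class `StragglerEffect` and made-up timings:

```java
import java.util.Arrays;

// Illustration only: models query latency as the slowest node's latency.
final class StragglerEffect {
    static long queryLatencyMillis(long[] perNodeMillis) {
        return Arrays.stream(perNodeMillis)
                .max()
                .orElseThrow(() -> new IllegalArgumentException("no nodes"));
    }

    public static void main(String[] args) {
        // Two fast nodes and one slow node: the slow one dominates.
        System.out.println(queryLatencyMillis(new long[] {120, 130, 900}));
    }
}
```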
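Installing and Deploying Presto
Official documentation:
There are two ways to install Presto: download the prebuilt binary package from the official site, or pull the source from the GitHub repository and build it. For simplicity I use the first approach here. Both the Server and the Client need to be downloaded.
Upload the downloaded packages to the server: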
[root@hadoop ~]# cd /usr/local/src
[root@hadoop /usr/local/src]# ls
presto-server-0.243.2.tar.gz presto-cli-0.243.2-executable.jar
[root@hadoop /usr/local/src]#
Extract the presto-server package and move it to a suitable directory:
[root@hadoop /usr/local/src]# tar -zxvf presto-server-0.243.2.tar.gz
[root@hadoop /usr/local/src]# mv presto-server-0.243.2 /usr/local/presto-server
[root@hadoop /usr/local/src]# cd /usr/local/presto-server/
[root@hadoop /usr/local/presto-server]# ls
bin lib NOTICE plugin README.txt
[root@hadoop /usr/local/presto-server]#
Configure presto-server:
[root@hadoop /usr/local/presto-server]# mkdir etc
[root@hadoop /usr/local/presto-server]# vim etc/config.properties
# Run this node as a coordinator
coordinator=true
# Let this node act as both coordinator and worker
node-scheduler.include-coordinator=true
http-server.http.port=9090
# Whether to use the embedded discovery server
discovery-server.enabled=true
discovery.uri=http://192.168.243.161:9090
[root@hadoop /usr/local/presto-server]# vim etc/node.properties # per-node settings
# Environment name; every node in the same cluster must use the same value
node.environment=presto_dev
# Unique ID of this node
node.id=ffffffff-ffff-ffff-ffff-ffffffffff01
# Data directory for this node
node.data-dir=/data/presto
[root@hadoop /usr/local/presto-server]# vim etc/jvm.config # JVM settings
-server
-Xmx8G
-XX:+UseG1GC
-XX:G1HeapRegionSize=32M
-XX:+UseGCOverheadLimit
-XX:+ExplicitGCInvokesConcurrent
-XX:+HeapDumpOnOutOfMemoryError
-XX:+ExitOnOutOfMemoryError
[root@hadoop /usr/local/presto-server]# vim etc/log.properties # logging settings
com.facebook.presto=INFO
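The node.id value must be unique for every node, and the one above is a hand-written placeholder. One way to produce a value is the JDK's `java.util.UUID`, sketched below; `NodeIdGenerator` is a hypothetical name, and any other scheme that yields unique IDs works as well.

```java
import java.util.UUID;

// Illustration only: prints a line that could be pasted into etc/node.properties.
final class NodeIdGenerator {
    static String nodeIdLine() {
        return "node.id=" + UUID.randomUUID();
    }

    public static void main(String[] args) {
        System.out.println(nodeIdLine());
    }
}
```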
Configure the catalog connection settings:
[root@hadoop /usr/local/presto-server]# mkdir etc/catalog
[root@hadoop /usr/local/presto-server]# vim etc/catalog/jmx.properties
connector.name=jmx
[root@hadoop /usr/local/presto-server]# vim etc/catalog/hive.properties
connector.name=hive-hadoop2
hive.metastore.uri=thrift://192.168.243.161:9083
hive.config.resources=/usr/local/hadoop-2.8.5/etc/hadoop/hdfs-site.xml,/usr/local/hadoop-2.8.5/etc/hadoop/core-site.xml
hive.allow-drop-table=false
With the configuration above in place, start presto-server:
[root@hadoop /usr/local/presto-server]# bin/launcher run
...
2020-11-16T16:55:35.776+0800 INFO main com.facebook.presto.server.PrestoServer ======== SERVER STARTED ========
The command above starts the server in the foreground. To start it in the background instead:
[root@hadoop /usr/local/presto-server]# bin/launcher start
Started as 5908
[root@hadoop /usr/local/presto-server]#
Check that the presto-server process is running normally:
[root@hadoop /usr/local/presto-server]# jps |grep -i presto
5908 PrestoServer
[root@hadoop /usr/local/presto-server]# netstat -lntp |grep 5908
tcp6 0 0 :::39225 :::* LISTEN 5908/java
tcp6 0 0 :::42622 :::* LISTEN 5908/java
tcp6 0 0 :::9090 :::* LISTEN 5908/java
tcp6 0 0 :::36714 :::* LISTEN 5908/java
tcp6 0 0 :::45066 :::* LISTEN 5908/java
tcp6 0 0 :::32982 :::* LISTEN 5908/java
[root@hadoop /usr/local/presto-server]#
Move the presto-client jar into the bin directory:
[root@hadoop /usr/local/presto-server]# mv /usr/local/src/presto-cli-0.243.2-executable.jar bin/presto-cli.jar
[root@hadoop /usr/local/presto-server]# chmod a+x bin/presto-cli.jar
Connect to presto-server with presto-client to open an interactive terminal, and check that data in Hive can be queried:
[root@hadoop /usr/local/presto-server]# bin/presto-cli.jar --server localhost:9090 --catalog hive --user root
presto> show catalogs;
Catalog
---------
hive
jmx
system
(3 rows)
Query 20201116_091555_00001_cus94, FINISHED, 1 node
Splits: 19 total, 19 done (100.00%)
0:00 [0 rows, 0B] [0 rows/s, 0B/s]
presto> show schemas;
Schema
--------------------
db01
default
information_schema
(3 rows)
Query 20201116_091557_00002_cus94, FINISHED, 1 node
Splits: 19 total, 19 done (100.00%)
0:00 [3 rows, 44B] [16 rows/s, 243B/s]
presto> use db01;
USE
presto:db01> show tables;
Table
----------
log_dev
log_dev2
(2 rows)
Query 20201116_091652_00004_cus94, FINISHED, 1 node
Splits: 19 total, 19 done (100.00%)
0:00 [2 rows, 43B] [5 rows/s, 117B/s]
presto:db01> select * from log_dev;
id | name | create_time | creator | info
----+----------+-------------+---------+----------------
4 | 更新用戶 | 1554189515 | yarn | 更新用戶 test3
6 | 創建用戶 | 1554299345 | yarn | 創建用戶 test5
(2 rows)
Query 20201116_091705_00005_cus94, FINISHED, 1 node
Splits: 17 total, 17 done (100.00%)
0:01 [2 rows, 84B] [2 rows/s, 84B/s]
presto:db01>
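presto-server provides a web UI for viewing monitoring information. Open port 9090 in a browser:

Click a Query ID to open the Query Detail page with the details of that query:

Scroll down to see the Stages and Tasks:

Click "Live Plan" to view the execution plan: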

Accessing Presto via JDBC
The previous section showed how to work with presto-server through presto-client. This section shows how to access presto-server from code through JDBC. First create a Maven project with the following pom file:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>presto-test</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-jdbc</artifactId>
<version>0.243.2</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
The JDBC code is as follows:
package com.example.presto.demo;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

/**
 * Accessing Presto through JDBC
 *
 * @author 01
 * @date 2020-11-16
 **/
public class JdbcTest {
    public static void main(String[] args) throws Exception {
        Class.forName("com.facebook.presto.jdbc.PrestoDriver");
        // try-with-resources closes the result set, statement, and connection,
        // even if the query throws
        try (Connection connection = DriverManager.getConnection(
                "jdbc:presto://192.168.243.161:9090/hive/db01",
                "root", null);
             Statement statement = connection.createStatement();
             ResultSet resultSet = statement.executeQuery("select * from log_dev")) {
            int columnCount = resultSet.getMetaData().getColumnCount();
            while (resultSet.next()) {
                for (int i = 1; i <= columnCount; i++) {
                    System.out.print(resultSet.getString(i) + "\t");
                }
                System.out.println();
            }
        }
    }
}
The output is as follows:

Presto UDF Development: Scalar Functions
Like Hive and Spark SQL, Presto supports user-defined functions (UDFs). About Presto UDFs:
- Presto functions fall into three broad kinds: scalar, aggregation, and window
- A scalar function is, put simply, a static Java method with no state of its own
- An aggregation function accumulates state, for example COUNT and AVG
Steps to develop a scalar function:
- Define a Java class and mark the static method that implements the logic with @ScalarFunction
- Use @Description to describe what the function does; this text is shown by SHOW FUNCTIONS
- Use @SqlType to mark the function's return type
Add the following dependencies to the pom file:
<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-spi</artifactId>
<version>0.243</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>21.0</version>
</dependency>
Develop a scalar function that adds a prefix to a string. Example code:
package com.example.presto.demo.udf;
import com.facebook.presto.common.type.StandardTypes;
import com.facebook.presto.spi.function.Description;
import com.facebook.presto.spi.function.ScalarFunction;
import com.facebook.presto.spi.function.SqlType;
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;
public class PrefixFunction {
/**
* Adds a prefix to a string.
* Presto does not pass varchar values as String; Slice is used instead
*/
@ScalarFunction("Prefix")
@Description("prefix string")
@SqlType(StandardTypes.VARCHAR)
public static Slice prefix(@SqlType(StandardTypes.VARCHAR) Slice value) {
return Slices.utf8Slice("presto_udf_" + value.toStringUtf8());
}
}
A scalar function can take multiple arguments. For example, a function that builds a JSON string from its inputs:
package com.example.presto.demo.udf;
import com.facebook.presto.common.type.StandardTypes;
import com.facebook.presto.spi.function.Description;
import com.facebook.presto.spi.function.ScalarFunction;
import com.facebook.presto.spi.function.SqlNullable;
import com.facebook.presto.spi.function.SqlType;
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;
public class GenJson {
/**
* Builds a JSON string from the given key and value
*/
@ScalarFunction("GenJson")
@Description("gen json string")
@SqlType(StandardTypes.VARCHAR)
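// @SqlNullable lets the function receive a NULL value; without it the
// null check below is never reached because NULL input short-circuits to NULL
public static Slice genJson(@SqlType(StandardTypes.VARCHAR) Slice key,
@SqlNullable @SqlType(StandardTypes.VARCHAR) Slice value) {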
return Slices.utf8Slice(
String.format("{\"%s\":\"%s\"}", key.toStringUtf8(),
value == null ? "" : value.toStringUtf8())
);
}
}
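GenJson interpolates the key and value directly, so a double quote or backslash in the data would yield invalid JSON. A minimal sketch of escaping a string before embedding it, in plain Java; `JsonEscape` is a hypothetical helper that is not part of Presto, and in practice a JSON library would be the usual choice:

```java
// Hypothetical helper for illustration; uses String where the UDF uses Slice.
final class JsonEscape {
    // Escapes the characters that must be escaped inside a JSON string literal.
    static String escape(String s) {
        StringBuilder sb = new StringBuilder(s.length() + 8);
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            switch (c) {
                case '"':  sb.append('\\').append('"'); break;
                case '\\': sb.append('\\').append('\\'); break;
                case '\n': sb.append('\\').append('n'); break;
                case '\r': sb.append('\\').append('r'); break;
                case '\t': sb.append('\\').append('t'); break;
                default:
                    if (c < 0x20) {
                        // Other control characters become a four-digit hex escape.
                        sb.append('\\').append(String.format("u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.toString();
    }

    // Builds a one-entry JSON object; a null value is written as an empty string.
    static String pair(String key, String value) {
        return "{\"" + escape(key) + "\":\"" + escape(value == null ? "" : value) + "\"}";
    }

    public static void main(String[] args) {
        System.out.println(pair("yarn", "say \"hi\""));
    }
}
```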
Write a Plugin implementation and register our UDFs in its getFunctions method:
package com.example.presto.demo.udf;
import com.facebook.presto.spi.Plugin;
import com.google.common.collect.ImmutableSet;
import java.util.Set;
public class ExampleFunctionsPlugin implements Plugin {
@Override
public Set<Class<?>> getFunctions() {
return ImmutableSet.<Class<?>>builder()
.add(PrefixFunction.class)
.add(GenJson.class)
.build();
}
}
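Finally, create the service registration file under the project's resources directory (by the standard Java SPI convention this is META-INF/services/com.facebook.presto.spi.Plugin):

The file contains: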
com.example.presto.demo.udf.ExampleFunctionsPlugin
Build and package the project, then upload it to the server:
[root@hadoop ~/jars]# ls
presto-test-1.0-SNAPSHOT.jar
[root@hadoop ~/jars]#
Copy the jar into the plugin directory of presto-server:
[root@hadoop ~]# mkdir /usr/local/presto-server/plugin/example-functions
[root@hadoop ~]# cp jars/presto-test-1.0-SNAPSHOT.jar /usr/local/presto-server/plugin/example-functions
[root@hadoop ~]# cp /usr/local/presto-server/plugin/hive-hadoop2/guava-26.0-jre.jar /usr/local/presto-server/plugin/example-functions # the project depends on guava, so copy it as well
[root@hadoop ~]# ls /usr/local/presto-server/plugin/example-functions
guava-26.0-jre.jar presto-test-1.0-SNAPSHOT.jar
Restart presto-server:
[root@hadoop ~]# /usr/local/presto-server/bin/launcher restart
Open the interactive command line with presto-cli and check that our UDFs work:
[root@hadoop /usr/local/presto-server]# bin/presto-cli.jar --server localhost:9090 --catalog hive --user root
presto> use db01;
USE
presto:db01> select Prefix(name) from log_dev;
_col0
---------------------
presto_udf_更新用戶
presto_udf_創建用戶
(2 rows)
Query 20201116_121815_00002_upy9p, FINISHED, 1 node
Splits: 17 total, 17 done (100.00%)
0:01 [2 rows, 84B] [1 rows/s, 63B/s]
presto:db01> select GenJson(creator, name) from log_dev;
_col0
---------------------
{"yarn":"更新用戶"}
{"yarn":"創建用戶"}
(2 rows)
Query 20201116_121905_00003_upy9p, FINISHED, 1 node
Splits: 17 total, 17 done (100.00%)
0:00 [2 rows, 84B] [8 rows/s, 336B/s]
presto:db01>
Presto UDF Development: Aggregation Functions
Key concepts in an aggregation function:
- input(state, data): called for every row, on every node that holds data, producing several partially accumulated states
- combine(state1, state2): merges the states from all nodes until a single final state remains, which is the result of the aggregation
- output(final_state, out): writes the final result to a BlockBuilder
Steps to develop an aggregation function:
- Define a Java class and mark it as an aggregation function with @AggregationFunction
- Use @InputFunction, @CombineFunction, and @OutputFunction to mark the input, combine, and output functions respectively
- Implement the logic of each function
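The input/combine/output lifecycle can be simulated without Presto. The sketch below computes an average over two partitions of data in plain Java. `AvgLifecycle` and its nested `State` are hypothetical stand-ins for an annotated function class and an AccumulatorState; none of Presto's annotations or types are used.

```java
// Plain-Java simulation of the aggregation lifecycle; not Presto code.
final class AvgLifecycle {
    // Stand-in for an AccumulatorState: the running sum and row count.
    static final class State {
        long sum;
        long count;
    }

    // input: fold one row into a partial state (runs wherever the data lives).
    static void input(State state, long value) {
        state.sum += value;
        state.count++;
    }

    // combine: merge another partial state into this one.
    static void combine(State state, State other) {
        state.sum += other.sum;
        state.count += other.count;
    }

    // output: turn the final state into the result.
    static double output(State state) {
        return state.count == 0 ? Double.NaN : (double) state.sum / state.count;
    }

    // Simulates two nodes, each aggregating its own partition, then a final merge.
    static double average(long[] partitionA, long[] partitionB) {
        State a = new State();
        for (long v : partitionA) {
            input(a, v);
        }
        State b = new State();
        for (long v : partitionB) {
            input(b, v);
        }
        State finalState = new State();
        combine(finalState, a);
        combine(finalState, b);
        return output(finalState);
    }

    public static void main(String[] args) {
        System.out.println(average(new long[] {1, 2, 3}, new long[] {4, 6}));
    }
}
```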
First, define an interface that extends AccumulatorState and declares the getter and setter for the value:
package com.example.presto.demo.udf;
import com.facebook.presto.spi.function.AccumulatorState;
import io.airlift.slice.Slice;
public interface StringValueState extends AccumulatorState {
Slice getStringValue();
void setStringValue(Slice value);
}
Then define a Java class that implements the core logic of the aggregation function:
package com.example.presto.demo.udf;
import com.facebook.presto.common.block.BlockBuilder;
import com.facebook.presto.common.type.StandardTypes;
import com.facebook.presto.common.type.VarcharType;
import com.facebook.presto.spi.function.*;
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;
/**
* Aggregation function that concatenates strings
*
* @author 01
*/
@AggregationFunction("ConcatStr")
public class ConCatFunction {
@InputFunction
public static void input(StringValueState state,
@SqlType(StandardTypes.VARCHAR) Slice value) {
state.setStringValue(Slices.utf8Slice(
checkNull(state.getStringValue()) + "|" +
value.toStringUtf8()
));
}
@CombineFunction
public static void combine(StringValueState state,
StringValueState otherState) {
state.setStringValue(Slices.utf8Slice(
checkNull(state.getStringValue()) + "|" +
checkNull(otherState.getStringValue())
));
}
@OutputFunction(StandardTypes.VARCHAR)
public static void output(StringValueState state,
BlockBuilder blockBuilder) {
VarcharType.VARCHAR.writeSlice(blockBuilder, state.getStringValue());
}
private static String checkNull(Slice slice) {
return slice == null ? "" : slice.toStringUtf8();
}
}
Then register the function in ExampleFunctionsPlugin:
public class ExampleFunctionsPlugin implements Plugin {
@Override
public Set<Class<?>> getFunctions() {
return ImmutableSet.<Class<?>>builder()
...
.add(ConCatFunction.class)
.build();
}
}
Build and package the project, then upload it to the server:
[root@hadoop ~]# ls jars/
presto-test-1.0-SNAPSHOT.jar
[root@hadoop ~]#
Overwrite the previous jar:
[root@hadoop ~]# cp jars/presto-test-1.0-SNAPSHOT.jar /usr/local/presto-server/plugin/example-functions/
cp: overwrite '/usr/local/presto-server/plugin/example-functions/presto-test-1.0-SNAPSHOT.jar'? yes
[root@hadoop ~]#
Restart presto-server:
[root@hadoop ~]# /usr/local/presto-server/bin/launcher restart
Open the interactive command line with presto-cli and check that our UDF works:
[root@hadoop /usr/local/presto-server]# bin/presto-cli.jar --server localhost:9090 --catalog hive --user root
presto> use db01;
USE
presto:db01> select ConcatStr(creator) from log_dev2;
_col0
---------------------------------
||hdfs|yarn|hdfs|yarn|hdfs|yarn
(1 row)
Query 20201116_124714_00001_inrgm, FINISHED, 1 node
Splits: 18 total, 18 done (100.00%)
0:01 [6 rows, 825B] [4 rows/s, 571B/s]
presto:db01>
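The two leading `|` characters in the result are consistent with the function joining an empty state with a separator: once in input for the first row, and once in combine when the partial state is merged into a still-empty state. The plain-Java sketch below reproduces that behaviour under the assumption of a single partial state, and shows a null-aware join that avoids it. `ConcatSketch` is a hypothetical class and uses String in place of Slice.

```java
// Plain-Java illustration; not Presto code.
final class ConcatSketch {
    // Mirrors the join used by ConCatFunction: an absent side is treated as "".
    static String naiveJoin(String left, String right) {
        return (left == null ? "" : left) + "|" + (right == null ? "" : right);
    }

    // Null-aware variant: adds the separator only when both sides are present.
    static String safeJoin(String left, String right) {
        if (left == null) {
            return right;
        }
        if (right == null) {
            return left;
        }
        return left + "|" + right;
    }

    // Feeds all values into one partial state, then merges it into an empty final state.
    static String aggregate(String[] values, boolean safe) {
        String partial = null;
        for (String v : values) {
            partial = safe ? safeJoin(partial, v) : naiveJoin(partial, v);
        }
        String finalState = null;
        return safe ? safeJoin(finalState, partial) : naiveJoin(finalState, partial);
    }

    public static void main(String[] args) {
        String[] creators = {"hdfs", "yarn", "hdfs", "yarn", "hdfs", "yarn"};
        System.out.println(aggregate(creators, false));
        System.out.println(aggregate(creators, true));
    }
}
```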
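Presto EventListener Development
Event Listener is Presto's event listening mechanism. By writing our own Event Listener we can react to events in Presto such as query creation and query success or failure. Overall it is somewhat similar to a Hook in Hive. Presto exposes three kinds of events to a listener:
- Query creation: information about the creation of a query
- Query completion: information about query execution, including details of successful queries and error codes of failed ones
- Split completion: information about split execution, likewise including details for both success and failure
Steps to develop an Event Listener:
- Implement Presto's EventListener and EventListenerFactory interfaces
- Package the jar correctly according to the service provider interface (SPI)
- Deploy: place the jar in the directory Presto expects, update the configuration file, and restart the service
Next we develop an EventListener that listens for events and writes the event information to log files. First write the EventListener implementation, which holds the core logic: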
package com.example.presto.demo.eventlistener;
import com.facebook.presto.spi.eventlistener.EventListener;
import com.facebook.presto.spi.eventlistener.QueryCompletedEvent;
import com.facebook.presto.spi.eventlistener.QueryCreatedEvent;
import com.facebook.presto.spi.eventlistener.SplitCompletedEvent;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.time.Instant;
import java.util.Map;
public class QueryEventListener implements EventListener {
private final String logPath;
public QueryEventListener(Map<String, String> config) {
logPath = config.get("log.path");
System.out.println(logPath);
}
/**
* Handles the query-created event
*/
@Override
public void queryCreated(QueryCreatedEvent queryCreatedEvent) {
String queryId = queryCreatedEvent.getMetadata().getQueryId();
String query = queryCreatedEvent.getMetadata().getQuery();
String user = queryCreatedEvent.getContext().getUser();
String fileName = logPath + File.separator + queryId;
File logFile = new File(fileName);
if (!logFile.exists()) {
try {
boolean result = logFile.createNewFile();
System.out.println(result);
} catch (IOException e) {
e.printStackTrace();
}
}
try (FileWriter fw = new FileWriter(fileName, true)) {
fw.append(String.format("User:%s Id:%s Query:%s%n", user, queryId, query));
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* Handles the query-completed event
*/
@Override
public void queryCompleted(QueryCompletedEvent queryCompletedEvent) {
String queryId = queryCompletedEvent.getMetadata().getQueryId();
long createTime = queryCompletedEvent.getCreateTime().toEpochMilli();
long endTime = queryCompletedEvent.getEndTime().toEpochMilli();
long totalBytes = queryCompletedEvent.getStatistics().getTotalBytes();
String queryState = queryCompletedEvent.getMetadata().getQueryState();
queryCompletedEvent.getFailureInfo().ifPresent(queryFailureInfo -> {
int errCode = queryFailureInfo.getErrorCode().getCode();
String failureType = queryFailureInfo.getFailureType().orElse("").toUpperCase();
String failureHost = queryFailureInfo.getFailureHost().orElse("");
String failureMessage = queryFailureInfo.getFailureMessage().orElse("");
});
String fileName = logPath + File.separator + queryId;
try (FileWriter fw = new FileWriter(fileName, true)) {
fw.append(String.format("Id:%s StartTime:%s EndTime:%s State:%s%n",
queryId, createTime, endTime, queryState));
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* Handles the split-completed event
*/
@Override
public void splitCompleted(SplitCompletedEvent splitCompletedEvent) {
long createTime = splitCompletedEvent.getCreateTime().toEpochMilli();
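// Instant.MAX.toEpochMilli() overflows a long and throws ArithmeticException,
// so convert first and fall back to -1 when the end time is absent
long endTime = splitCompletedEvent.getEndTime().map(Instant::toEpochMilli).orElse(-1L);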
String queryId = splitCompletedEvent.getQueryId();
String stageId = splitCompletedEvent.getStageId();
String taskId = splitCompletedEvent.getTaskId();
String fileName = logPath + File.separator + queryId;
try (FileWriter fw = new FileWriter(fileName, true)) {
fw.append(String.format("Id:%s StartTime:%s EndTime:%s StageId:%s TaskId:%s%n",
queryId, createTime, endTime, stageId, taskId));
} catch (IOException e) {
e.printStackTrace();
}
}
}
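One caveat when supplying a default for an absent end time: `Instant.MAX.toEpochMilli()` throws ArithmeticException because the value does not fit in a long, so it is safer to convert first and apply the default afterwards. A minimal plain-Java sketch; the class `EndTimes` and the sentinel value -1 are assumptions made for illustration:

```java
import java.time.Instant;
import java.util.Optional;

// Illustration only; not part of the listener above.
final class EndTimes {
    // Converts an optional end time to epoch millis, or -1 when it is absent.
    static long toEpochMilliOrMinusOne(Optional<Instant> endTime) {
        return endTime.map(Instant::toEpochMilli).orElse(-1L);
    }

    // Reports whether converting Instant.MAX to epoch millis overflows.
    static boolean maxOverflows() {
        try {
            Instant.MAX.toEpochMilli();
            return false;
        } catch (ArithmeticException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(toEpochMilliOrMinusOne(Optional.empty()));
        System.out.println(maxOverflows());
    }
}
```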
Then write a factory class that implements EventListenerFactory and creates our custom QueryEventListener:
package com.example.presto.demo.eventlistener;
import com.facebook.presto.spi.eventlistener.EventListener;
import com.facebook.presto.spi.eventlistener.EventListenerFactory;
import java.util.Map;
public class QueryEventListenerFactory implements EventListenerFactory {
@Override
public String getName() {
// Name of the EventListener
return "query-event-listener";
}
@Override
public EventListener create(Map<String, String> config) {
if (!config.containsKey("log.path")) {
throw new RuntimeException("missing log.path conf");
}
return new QueryEventListener(config);
}
}
Write a Plugin implementation and return our custom EventListener factory from its getEventListenerFactories method:
package com.example.presto.demo.eventlistener;
import com.facebook.presto.spi.Plugin;
import com.facebook.presto.spi.eventlistener.EventListenerFactory;
import java.util.Collections;
public class QueryEventPlugin implements Plugin {
@Override
public Iterable<EventListenerFactory> getEventListenerFactories() {
QueryEventListenerFactory queryEventListenerFactory = new QueryEventListenerFactory();
return Collections.singletonList(queryEventListenerFactory);
}
}
Finally, add the fully qualified class name of QueryEventPlugin to the com.facebook.presto.spi.Plugin file:
com.example.presto.demo.eventlistener.QueryEventPlugin
Build and package the project, then upload it to the server:
[root@hadoop ~]# ls jars/
presto-test-1.0-SNAPSHOT.jar
[root@hadoop ~]#
Copy the jar into the plugin directory of presto-server:
[root@hadoop ~]# mkdir /usr/local/presto-server/plugin/event-listener
[root@hadoop ~]# cp jars/presto-test-1.0-SNAPSHOT.jar /usr/local/presto-server/plugin/event-listener
[root@hadoop ~]# cp /usr/local/presto-server/plugin/hive-hadoop2/guava-26.0-jre.jar /usr/local/presto-server/plugin/event-listener # the project depends on guava, so copy it as well
[root@hadoop ~]# ls /usr/local/presto-server/plugin/event-listener
guava-26.0-jre.jar presto-test-1.0-SNAPSHOT.jar
Delete the example-functions directory; otherwise presto-server fails at startup because the UDFs are registered twice:
[root@hadoop ~]# rm -rf /usr/local/presto-server/plugin/example-functions/
Then configure Presto's event listener:
[root@hadoop ~]# vim /usr/local/presto-server/etc/event-listener.properties
event-listener.name=query-event-listener
log.path=/data/presto/log
[root@hadoop ~]# mkdir -p /data/presto/log
Restart presto-server:
[root@hadoop ~]# /usr/local/presto-server/bin/launcher restart
Open the interactive command line with presto-cli and run a few queries:
[root@hadoop /usr/local/presto-server]# bin/presto-cli.jar --server localhost:9090 --catalog hive --user root
presto> use db01;
USE
presto:db01> select * from log_dev;
id | name | create_time | creator | info
----+----------+-------------+---------+----------------
4 | 更新用戶 | 1554189515 | yarn | 更新用戶 test3
6 | 創建用戶 | 1554299345 | yarn | 創建用戶 test5
(2 rows)
Query 20201116_132643_00001_tvyva, FINISHED, 1 node
Splits: 17 total, 17 done (100.00%)
0:01 [2 rows, 84B] [1 rows/s, 58B/s]
presto:db01> select * from log_dev2 limit 1;
id | name | create_time | creator | info
----+----------+-------------+---------+---------------
1 | 創建用戶 | 1554099545 | hdfs | 創建用戶 test
(1 row)
Query 20201116_132652_00002_tvyva, FINISHED, 1 node
Splits: 18 total, 18 done (100.00%)
0:00 [1 rows, 825B] [3 rows/s, 2.48KB/s]
presto:db01>
Then check that our EventListener works by looking for the corresponding event log entries:
[root@hadoop ~]# ls /data/presto/log/
20201116_132435_00000_tvyva 20201116_132643_00001_tvyva 20201116_132652_00002_tvyva
[root@hadoop ~]# cat /data/presto/log/20201116_132435_00000_tvyva
User:root Id:20201116_132435_00000_tvyva Query:use db01
Id:20201116_132435_00000_tvyva StartTime:1605533075986 EndTime:1605533076000 State:FINISHED
[root@hadoop ~]# cat /data/presto/log/20201116_132643_00001_tvyva
User:root Id:20201116_132643_00001_tvyva Query:select * from log_dev
Id:20201116_132643_00001_tvyva StartTime:1605533204999 EndTime:1605533205193 StageId:20201116_132643_00001_tvyva.1 TaskId:0
...
Id:20201116_132643_00001_tvyva StartTime:1605533203889 EndTime:1605533205297 State:FINISHED
[root@hadoop ~]# cat /data/presto/log/20201116_132652_00002_tvyva
User:root Id:20201116_132652_00002_tvyva Query:select * from log_dev2 limit 1
Id:20201116_132652_00002_tvyva StartTime:1605533212541 EndTime:1605533212644 StageId:20201116_132652_00002_tvyva.1 TaskId:0
...
Id:20201116_132652_00002_tvyva StartTime:1605533212413 EndTime:1605533212688 State:FINISHED
[root@hadoop ~]#
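The lines written by the listener carry StartTime and EndTime in epoch milliseconds, so a duration can be derived from the log. A small plain-Java sketch, assuming exactly the line format shown above; `QueryLogLine` is a hypothetical helper:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Illustration only: parses one line of the listener's log output.
final class QueryLogLine {
    private static final Pattern TIMES =
            Pattern.compile("StartTime:(\\d+) EndTime:(\\d+)");

    // Returns EndTime - StartTime in milliseconds.
    static long durationMillis(String line) {
        Matcher m = TIMES.matcher(line);
        if (!m.find()) {
            throw new IllegalArgumentException("no StartTime/EndTime in: " + line);
        }
        return Long.parseLong(m.group(2)) - Long.parseLong(m.group(1));
    }

    public static void main(String[] args) {
        String line = "Id:20201116_132643_00001_tvyva StartTime:1605533203889 "
                + "EndTime:1605533205297 State:FINISHED";
        System.out.println(durationMillis(line) + " ms");
    }
}
```

For the completion line of query 20201116_132643_00001_tvyva this gives 1408 ms.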
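Presto Configuration Tuning
Presto architecture:
- Presto uses a typical Master-Slave architecture
- The Coordinator and the Workers rely on the Discovery Server to communicate with each other
- The Coordinator and the Discovery Server are single instances by design, so each is a single point of failure
Presto high availability, option 1: bind a virtual IP:

Presto high availability, option 2: deploy the Discovery Server separately: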

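Presto memory model:
- Presto uses logical memory pools to manage different kinds of memory demand
- Presto divides memory into three pools: the System Pool, the Reserved Pool, and the General Pool
- From Presto 0.201 onward the System Pool is disabled by default, leaving only the Reserved Pool and the General Pool
- The System Pool is set aside for the system, by default 40% of memory; from 0.201 onward it is merged into the General Pool
- The Reserved Pool and the General Pool supply the memory that queries use at run time
- Most queries use the General Pool; when the General Pool is full, the query using the most memory is moved to the Reserved Pool
Presto memory management:
- Query memory management: a query is divided into many tasks; for each task a thread polls the task's status, including its memory usage, and the results are summed into the query's memory usage
- Machine memory management: a thread on the Coordinator periodically polls every machine for its current memory status
Presto uses two signals to decide whether the cluster has reached its memory limit:
- The General Pool has blocked nodes
- The Reserved Pool is already in use
The query.low-memory-killer.policy setting selects the strategy for killing queries. Its values are total-reservation-on-blocked-nodes (kill the query using the most memory on the blocked nodes) or total-reservation (kill the query using the most memory overall)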
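The difference between the two policies can be shown with a toy model. The sketch below is not Presto's implementation: `KillPolicySketch`, the nested map layout, and the sample data are assumptions made for illustration. It picks the query with the largest reservation, summed either over all nodes or only over the blocked nodes.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Toy model only; not how Presto implements its low-memory killer.
final class KillPolicySketch {
    // reservations: node -> (query id -> bytes reserved on that node).
    // Returns the query with the largest total over the nodes in scope, or null.
    static String pickVictim(Map<String, Map<String, Long>> reservations,
                             Set<String> nodesInScope) {
        Map<String, Long> totals = new HashMap<>();
        for (Map.Entry<String, Map<String, Long>> node : reservations.entrySet()) {
            if (!nodesInScope.contains(node.getKey())) {
                continue;
            }
            for (Map.Entry<String, Long> q : node.getValue().entrySet()) {
                totals.merge(q.getKey(), q.getValue(), Long::sum);
            }
        }
        String victim = null;
        long max = -1;
        for (Map.Entry<String, Long> e : totals.entrySet()) {
            if (e.getValue() > max) {
                max = e.getValue();
                victim = e.getKey();
            }
        }
        return victim;
    }

    // total-reservation: consider every node.
    static String totalReservation(Map<String, Map<String, Long>> reservations) {
        return pickVictim(reservations, reservations.keySet());
    }

    // total-reservation-on-blocked-nodes: consider only the blocked nodes.
    static String totalReservationOnBlockedNodes(Map<String, Map<String, Long>> reservations,
                                                 Set<String> blockedNodes) {
        return pickVictim(reservations, blockedNodes);
    }

    public static void main(String[] args) {
        Map<String, Long> node1 = new HashMap<>();
        node1.put("q1", 10L);
        node1.put("q2", 1L);
        Map<String, Long> node2 = new HashMap<>();
        node2.put("q2", 6L);
        Map<String, Map<String, Long>> reservations = new HashMap<>();
        reservations.put("node1", node1);
        reservations.put("node2", node2);
        // q1 is largest overall, but only q2 holds memory on the blocked node.
        System.out.println(totalReservation(reservations));
        System.out.println(totalReservationOnBlockedNodes(
                reservations, Collections.singleton("node2")));
    }
}
```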
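With the memory model and memory management in mind, here are some Presto settings that can be tuned:
- query.max-memory: the maximum user memory a single query may use across the whole cluster
- query.max-total-memory: the maximum (user + system) memory a single query may use across the whole cluster
- query.max-memory-per-node: the maximum user memory a query may use on a single worker, that is, the Reserved Pool; defaults to 0.1 of the heap size
- query.max-total-memory-per-node: the maximum (user + system) memory a query may use on a single worker
The data volume and complexity of user queries determine the size of the Reserved Pool; the concurrency of user queries determines the JVM heap size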
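As a worked example of the default stated above: with the -Xmx8G heap from jvm.config, 0.1 of the heap is roughly 0.8 GiB per query per node. A plain-Java sketch of that arithmetic; `MemoryDefaults` is a hypothetical helper, and the 0.1 factor is taken from the text rather than read from Presto:

```java
// Illustration of the arithmetic only; not Presto code.
final class MemoryDefaults {
    static final long GIB = 1024L * 1024 * 1024;

    // Per-node user memory limit as described in the text: 0.1 of the heap.
    static long defaultMaxMemoryPerNode(long heapBytes) {
        return heapBytes / 10;
    }

    // True if a query's reservation on one node stays within the limit.
    static boolean fits(long reservedBytes, long limitBytes) {
        return reservedBytes <= limitBytes;
    }

    public static void main(String[] args) {
        long limit = defaultMaxMemoryPerNode(8 * GIB);
        System.out.println(limit + " bytes");
        System.out.println(fits(GIB, limit));
    }
}
```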