Presto:分布式 SQL 查询引擎
Presto 是什么?
Presto™ (PrestoDB™) is an open source distributed SQL query engine for running interactive analytic queries against data sources of all sizes ranging from gigabytes to petabytes.
Presto™ (PrestoSQL™, a.k.a. Trino™) is a high performance, distributed SQL query engine for big data.
基本概念
组件
Coordinator
负责管理 Worker 和 MetaStore 节点,以及接受客户端查询请求,并进行 SQL 的语法解析(Parser)、执行计划生成与优化(Planner)和查询任务的调度(Scheduler)
Worker
负责具体的查询计算和数据读写
Discovery Server
负责发现集群的各个节点,用于节点间心跳监控
数据源
Connector
负责访问不同的数据源,相当于访问数据库的驱动
Catalog
负责记录 Schema 信息和 DataSource 的引用。Presto 中一个完整的表名通过 <Catalog>.<Schema>.<Table>
组合表示。例如 hive.test_data.test
,则表示:
- Catalog 为
hive
- Schema 为
test_data
- Table 为
test
Schema
一种组织 Table 的方式
Table
等同于关系型数据库中表的概念
查询模型
Statement
兼容 ANSI 标准的 SQL 字符串
Query
当 Presto 解析一条 SQL 语句时,会将其转换为 Query,并创建一个分布式 Query 执行计划
Stage
当 Presto 执行查询时,会进一步分为多个 Stage 阶段来执行
Task
Stage 包含了一系列的 Task,而 Task 才是真正在 Worker 之上被执行的逻辑
Split
Split 主要是为了拆分大规模数据集,以便 Task 在此之上执行
Driver
Driver 是一系列运算实例,可以理解为是内存中的一组物理运算符
Operator
Operator 可以消费(Consume)、转换(Transform)和生产(Produce)数据。例如,一个 Table Scan 从一个 Connector 中 fetch 数据,并生产数据以供给 Operator 消费
Exchange
Exchanage 负责在 Presto 的节点之间,传输一个 Query 的不同 Stage 的数据。Task 可以生产数据到一个 output 缓存区,也可以通过 Exchange 客户端消费其他 Task 生产的数据
优缺点
优势
- Ad Hoc(即席查询,秒级或分钟级的查询响应)
- 比 Hive 快 10x 倍
- 支持多种数据源(Hive、Druid、Kafka、MySQL、MongoDB、Redis、JMX、ORC 等)
- Client 支持多种编程语言(Java、Python、Ruby、PHP、Node.js 等)
- 支持 JDBC / ODBC 连接
- 支持 Kerberos 认证
- 支持查询 LZO 压缩的数据
- ANSI SQL(窗口函数、Join、聚合、复杂查询等)
劣势
- 不支持 SQL 的隐式类型转换,而 Hive 支持
- 不支持容错
查询请求会分发到多个 Worker 上,当任意一个 Worker 执行失败,Master 会感知到,并认为整个查询请求失败了。并且 Presto 并没有重试机制,所以需要业务端完成重试
架构
架构图
SQL 执行流程图
Connector 交互图
交互时序图
sequenceDiagram participant Client participant Coordinator participant Worker participant Connector participant Discovery Server Client ->>+ Coordinator : query Coordinator ->>+ Worker : choose workers Worker ->>- Coordinator : return worker list Coordinator ->>+ Worker : send task Worker ->>+ Connector : load data Connector ->>- Worker : return data Worker ->> Worker : execute task Worker ->>- Coordinator : return result Coordinator ->>- Client : return result loop regularly Coordinator -->> Discovery Server : heart beat Worker -->> Discovery Server : heart beat end
内存管理
Service Discovery
Presto 并不是由 Worker 主动发送心跳,而是 Discovery Server 定时探测节点是否存活。内部基于 Airlift 框架来实现服务发现,通过 HTTP 协议进行通讯。在 etc/config.properties
文件中配置的 discovery.uri
参数,会透传给 Airlift 框架。如果,我们将 discovery.uri
参数,设置为 http://127.0.0.1:9999
,则可以通过访问 http://127.0.0.1:9999/v1/service
地址,获取到所有注册的服务(包括服务类型、ID、通讯地址、服务所在的节点等信息),具体返回内容如下:
1 | { |
更加具体地来说,是在 HeartbeatFailureDetector
类中启动了一个执行周期为 5s 的定时任务,不断地调用 updateMonitoredServices
方法,来更新集群的服务状态。另外,DiscoveryNodeManager
类中也会启动一个执行周期为 5s 的定时任务,不断地调用 pollWorkers
方法,来检查各个节点的状态。Node 的状态主要分为 active、inactive、shuttingDown 三种,以集合的形式保存在了 AllNodes 类中。后续再选择 Worker 的时候会判断是否存活,并通过 AllNodes#getActiveNodes 方法获取到 active 状态的 Node 集合。另外,我们可以访问 http://localhost:9999/v1/info/state
地址,来检查节点是否处于 active 状态。如果节点存活,则会返回 "ACTIVE"
字符串
MPP
Presto 采用 MPP(Massively Parallel Processing)大规模并行处理架构,来解决大量数据分析的场景。该架构的主要特征,如下:
- 任务并行执行
- 分布式计算
- Shared Nothing
- 横向扩展
- 数据分布式存储(本地化)
SPI
Presto 采用 SPI(Service Provider Interface)服务提供发现机制,来插件化地支持多种数据源,以实现联邦查询(Federation Query,指能够通过一条 SQL 查询,对处于完全不同的系统中的不同的数据库和模式,进行引用和使用)
Servlet
graph TD Load(fa:fa-spinner Load) Construct(fa:fa-puzzle-piece Construct) PostConstruct(fa:fa-at PostConstruct) Init(fa:fa-cog Init) Service(fa:fa-database Service) Destroy(fa:fa-times Destroy) PreDestroy(fa:fa-at PreDestroy) Unload(fa:fa-bomb Unload) Load ==> Construct Construct ==> PostConstruct PostConstruct ==> Init Init ==> Service Service ==> Destroy Destroy ==> PreDestroy PreDestroy ==> Unload style PostConstruct fill:#0099FF style PreDestroy fill:#0099FF
比对
Presto vs Apache Hive
优势比较
Presto | Apache Hive | |
---|---|---|
场景特征 | 交互式查询 | 高吞吐 |
Join | 一个较大的事实表 + 多个较小的维度表 | 事实表 + 事实表 |
窗口函数 | 支持 | 支持 |
SQL 标准化 | ANSI SQL | HiveQL |
架构比较
Presto vs Amazon Athena
本质上,Amazon Athena(雅典娜)是一款完全支持标准 SQL 的 Presto
PrestoDB vs Trino
PrestoDB | Trino | |
---|---|---|
研发主力 | Martin、Dain 和 David | |
通讯模式 | 支持 RESTful 和二进制 | 仅支持 RESTful |
查询下推 | 支持 | 支持 |
Connector 数量 | 28 | 31 |
技术输出 | 博客 | 博客 + 视频 + 书籍 |
Slack 渠道 | ||
代码质量 | ||
Contributor 数量 | ||
待解决 Issues 数量 | ||
活跃 PR 数量 | ||
Commit 数量 |
部署
单机版
下载
1 | $ wget https://repo1.maven.org/maven2/io/trino/trino-server/354/trino-server-354.tar.gz |
配置
1 | $ cd trino |
1 | $ vim catalog/jmx.properties |
1 | connector.name=jmx |
1 | $ vim config.properties |
1 | coordinator=true |
1 | $ vim jvm.config |
1 | -server |
1 | $ vim node.properties |
1 | node.environment=trino |
1 | $ vim launcher |
1 | export JAVA_HOME=/Library/Java/JavaVirtualMachines/jdk-11.0.10.jdk/Contents/Home |
启动
1 | $ cd .. |
查看运行状态
1 | $ bin/launcher status |
查看日志
1 | # 启动日志 |
客户端连接
1 | $ wget https://repo1.maven.org/maven2/io/trino/trino-cli/354/trino-cli-354-executable.jar -O bin/trino |
可视化
开启 Debug 模式
1 | $ vim etc/jvm.config |
1 | -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005 |
1 | $ bin/launcher restart |
1 | Stopped 19181 |
Docker 容器版
下载
1 | $ docker pull trinodb/trino |
启动
1 | $ docker run -p 8080:8080 --name trino trinodb/trino |
客户端连接
1 | $ docker exec -it trino trino --catalog tpch --schema sf1 |
查询
1 | trino:sf1> show tables; |
1 | Table |
1 | trino:sf1> select * from customer limit 3; |
1 | custkey | name | address | nationkey | phone | acctbal | mktsegment | |
Kubernetes 集群版
详见,我的另一篇博客:Helm 实战
实战
Example HTTP Connector
配置 Catalog
1 | $ vim etc/catalog/example-http.properties |
1 | connector.name=example-http |
重启 Presto
1 | $ bin/launcher restart |
下载客户端
1 | $ wget -c https://repo1.maven.org/maven2/io/prestosql/presto-cli/345/presto-cli-345-executable.jar -O bin/presto |
启动客户端
1 | $ bin/presto --server localhost:9999 |
显示 Catalog
1 | $ presto> show catalogs; |
1 | Catalog |
显示 Schema
1 | $ presto> show schemas from "example-http"; |
1 | Schema |
切换 Schema
1 | $ presto> use "example-http".example; |
1 | USE |
显示所有表
1 | presto:example> show tables; |
1 | Table |
查询
1 | presto:example> select * from numbers; |
1 | text | value |
Apache Druid Connector
配置 Catalog
1 | $ vim etc/catalog/druid.properties |
1 | connector.name=druid |
开放 Broker 端口
1 | $ kill `ps -ef | grep 8082 | grep -v grep | awk '{print $2}'`; export POD_NAME=$(kubectl get pods --namespace default -l "app=druid,release=`helm list | grep druid- | awk '{print $1}'`" | grep broker | awk '{print $1}') ; nohup kubectl port-forward $POD_NAME 8082:8082 --address 0.0.0.0 2>&1 & |
重启 Presto
1 | $ bin/launcher restart |
下载客户端
1 | $ wget -c https://repo1.maven.org/maven2/io/prestosql/presto-cli/345/presto-cli-345-executable.jar -O bin/presto |
启动客户端
1 | $ bin/presto --catalog druid --server localhost:9999 --schema default |
显示 Schema
1 | $ presto:default> show schemas; |
1 | Schema |
切换数据库
1 | $ presto:default> use druid; |
1 | USE |
显示所有表
1 | presto:druid> show tables; |
1 | Table |
查询
1 | presto:druid> select * from wikipedia where "cityName" != '' order by "__time" desc limit 3; |
1 | __time | channel | cityname | comment |
表结构
1 | presto:druid> describe wikipedia; |
1 | Column | Type | Extra | Comment |
Explain
1 | presto:druid> explain select * from wikipedia where "cityName" != '' order by "__time" desc limit 3; |
1 | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
Explain Anaylze
1 | presto:druid> explain analyze select * from wikipedia where "cityName" != '' order by "__time" desc limit 3; |
1 | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
源码分析
编译
1 | $ java -version |
1 | java version "11.0.9" 2020-10-20 LTS |
1 | $ git clone --depth 1 --single-branch --branch master https://github.com/prestosql/presto/ |
启动流程
sequenceDiagram participant PrestoServer participant Platform participant Server participant Logger participant ImmutableList participant ImmutableList.Builder participant Bootstrap participant ConfigurationLoader participant Injector participant LifeCycleManager participant StaticCatalogStore participant PluginManager participant ConfigurationFactory participant Elements participant RecordingBinder participant Module participant System participant AbstractConfigurationAwareModule participant ServerConfig participant ServerMainModule participant CoordinatorModule participant WorkerModule participant LifeCycleModule participant ConfigurationModule participant Guice PrestoServer ->> PrestoServer : main opt check the version of java PrestoServer ->>+ Platform : nullToEmpty Platform ->>- PrestoServer : java.version alt java.version < 11 PrestoServer ->> System : exit(100) end end PrestoServer ->>+ Server : new Server ->>- PrestoServer : Server PrestoServer ->> Server : start Server ->> Server : doStart Server ->>+ ImmutableList : builder ImmutableList ->>- Server : ImmutableList.Builder opt init modules Server ->>+ ServerMainModule : new ServerMainModule ->>- Server : ServerMainModule end Server ->> ImmutableList.Builder : add Server ->> ImmutableList.Builder : build ImmutableList.Builder ->> Server : ImmutableList Server ->> Bootstrap : new Bootstrap ->> Server : Bootstrap Bootstrap ->>+ Bootstrap : initialize opt config Bootstrap ->>+ ConfigurationLoader : loadPropertiesFrom(configFile) ConfigurationLoader ->>- Bootstrap : MaprequiredProperties Bootstrap ->>+ ConfigurationLoader : getSystemProperties ConfigurationLoader ->>+ System : getProperties System ->>- ConfigurationLoader : Properties ConfigurationLoader ->>- Bootstrap : Map systemProperties Bootstrap ->>+ ConfigurationFactory : new ConfigurationFactory ->>- Bootstrap : ConfigurationFactory end opt install module by airlift ConfigurationFactory ->> ConfigurationFactory : registerConfigurationClasses ConfigurationFactory ->>+ Elements : getElements(modules) Elements ->>+ RecordingBinder : new RecordingBinder ->>- Elements : RecordingBinder Elements ->> RecordingBinder : install(module) opt setup modules, for example, ServerMainModule Elements ->> Module : configure(binder) Module ->> AbstractConfigurationAwareModule : setup AbstractConfigurationAwareModule ->> ServerMainModule : setup ServerMainModule ->>+ AbstractConfigurationAwareModule : buildConfigObject AbstractConfigurationAwareModule ->>- ServerMainModule : ServerConfig ServerMainModule ->>+ ServerConfig : isCoordinator ServerConfig ->>- ServerMainModule : boolean alt true ServerMainModule ->>+ CoordinatorModule : new CoordinatorModule ->>+ ServerMainModule : CoordinatorModule else false ServerMainModule ->>+ WorkerModule : new WorkerModule ->>+ ServerMainModule : WorkerModule end ServerMainModule ->> AbstractConfigurationAwareModule : install end Elements ->>+ RecordingBinder : binder.elements RecordingBinder ->>- Elements : List Elements ->>- ConfigurationFactory : List ConfigurationFactory ->> ConfigurationFactory : validateRegisteredConfigurationProvider end opt system modules Bootstrap ->>+ LifeCycleModule : new LifeCycleModule ->>- Bootstrap : LifeCycleModule Bootstrap ->>+ ConfigurationModule : new ConfigurationModule ->>- Bootstrap : ConfigurationModule end opt create the injector Bootstrap ->>+ Guice : createInjector Guice ->>- Bootstrap : Injector end opt create the life-cycle manager and start it Bootstrap ->>+ Injector : getInstance Injector ->>- Bootstrap : LifeCycleManager Bootstrap ->> LifeCycleManager : start end Bootstrap ->>- Bootstrap : Injector opt load resources Server ->>+ Injector : getInstance(PluginManager.class) Injector ->>- Server : PluginManager Server ->> PluginManager : loadPlugins Server ->>+ Injector : getInstance(StaticCatalogStore.class) Injector ->>- Server : StaticCatalogStore Server ->> StaticCatalogStore : loadCatalogs end Server ->> Logger : "======== SERVER STARTED ========"
请求链路
sequenceDiagram participant Client participant QueuedStatementResource participant QueuedStatementResource.Query participant QueuedStatementResource#queries participant DispatchManager participant QueuedStatementResource.Query participant ExecutingStatementResource participant ExecutingStatementResource#queries participant io.prestosql.server.protocol.Query participant Futures opt send query to queue Client ->>+ QueuedStatementResource : POST /v1/statement QueuedStatementResource ->> QueuedStatementResource : postStatement QueuedStatementResource ->>+ QueuedStatementResource.Query : new QueuedStatementResource.Query ->>+ DispatchManager : createQueryId DispatchManager ->>- QueuedStatementResource.Query : QueryId QueuedStatementResource.Query ->>- QueuedStatementResource : QueuedStatementResource.Query QueuedStatementResource ->> QueuedStatementResource#queries : put with queryId QueuedStatementResource ->>+ QueuedStatementResource.Query : getQueryResults QueuedStatementResource.Query ->>+ QueuedStatementResource.Query : createQueryResults QueuedStatementResource.Query ->>+ QueuedStatementResource.Query : getNextUri QueuedStatementResource.Query ->>- QueuedStatementResource.Query : URI (http://localhost:9999/v1/statement/queued/20200906_093843_00041_pg9xm/y3f077eadf2dca6425d56173bac9afea11161cdd0/1) QueuedStatementResource.Query ->>- QueuedStatementResource.Query : QueryResults QueuedStatementResource.Query ->>- QueuedStatementResource : QueryResults QueuedStatementResource ->>+ Response : ok Response ->>- QueuedStatementResource : Response QueuedStatementResource ->>- Client : Response end opt get query from queue and dispatch it Client ->> QueuedStatementResource : GET /v1/statement/queued/{queryId}/{slug}/{token} QueuedStatementResource ->>+ QueuedStatementResource : getQuery QueuedStatementResource ->>+ QueuedStatementResource#queries : get by queryId QueuedStatementResource#queries ->>+ QueuedStatementResource : QueuedStatementResource.Query QueuedStatementResource ->>- QueuedStatementResource : QueuedStatementResource.Query opt wait for query to be dispatched, up to the wait timeout QueuedStatementResource ->>+ QueuedStatementResource.Query : waitForDispatched QueuedStatementResource.Query ->>+ DispatchManager : waitForDispatched DispatchManager ->>- QueuedStatementResource.Query : ListenableFuture QueuedStatementResource.Query ->>- QueuedStatementResource : ListenableFuture end opt when state changes, fetch the next result QueuedStatementResource ->> QueuedStatementResource.Query : getQueryResults QueuedStatementResource.Query ->> QueuedStatementResource.Query : createQueryResults QueuedStatementResource.Query ->>+ QueuedStatementResource.Query : getNextUri QueuedStatementResource.Query ->>+ QueuedStatementResource.Query : getRedirectUri QueuedStatementResource.Query ->>- QueuedStatementResource.Query : URI (http://localhost:9999/v1/statement/executing/20200906_093843_00041_pg9xm/y714bd6e0605f3fdf63d1aac8d10aa92f4c81cf23/0) QueuedStatementResource.Query ->>- QueuedStatementResource.Query : URI QueuedStatementResource.Query ->> QueuedStatementResource : QueryResults QueuedStatementResource ->>+ Futures : transform Futures ->>- QueuedStatementResource : ListenableFutureend opt transform to Response QueuedStatementResource ->>+ Response : ok Response ->>- QueuedStatementResource : Response QueuedStatementResource ->>+ Futures : transform Futures ->>- QueuedStatementResource : ListenableFuture end QueuedStatementResource ->> QueuedStatementResource : bindAsyncResponse QueuedStatementResource ->> Client : Response end opt execute Client ->>+ ExecutingStatementResource : GET /v1/statement/executing/{queryId}/{slug}/{token} opt get or recreate query ExecutingStatementResource ->> ExecutingStatementResource : getQueryResults ExecutingStatementResource ->>+ ExecutingStatementResource#queries : get by queryId alt exist ExecutingStatementResource#queries ->>- ExecutingStatementResource : io.prestosql.server.protocol.Query else not-exist ExecutingStatementResource ->>+ io.prestosql.server.protocol.Query : create io.prestosql.server.protocol.Query ->>- ExecutingStatementResource : io.prestosql.server.protocol.Query ExecutingStatementResource ->>+ ExecutingStatementResource#queries : computeIfAbsent ExecutingStatementResource#queries ->>- ExecutingStatementResource : io.prestosql.server.protocol.Query end end opt result ExecutingStatementResource ->> ExecutingStatementResource : asyncQueryResults ExecutingStatementResource ->>+ io.prestosql.server.protocol.Query : waitForResults io.prestosql.server.protocol.Query ->>- ExecutingStatementResource : ListenableFuture ExecutingStatementResource ->>+ ExecutingStatementResource : toResponse ExecutingStatementResource ->>- ExecutingStatementResource : Response ExecutingStatementResource ->>+ Futures : transform Futures ->>- ExecutingStatementResource : ListenableFuture ExecutingStatementResource ->> ExecutingStatementResource : bindAsyncResponse end end ExecutingStatementResource ->>- Client : Response
踩过的坑
不支持类型的隐式转换
描述
原本只支持数值类型之间的比较,例如 2 > 1
:
1 |
|
解决
增加以下方法,以支持字符串与数值之间的比较,例如 '2' > 1
:
1 |
|
compiler message file broken
描述
1 | [INFO] --- maven-compiler-plugin:3.8.0:testCompile (default-testCompile) @ presto-main --- |
解决
低版本 JDK11 的已知 Bug,升级至当前最新的版本(JDK11.0.9+7-LTS)后解决,详见:JDK-8212586
Terminating due to java.lang.OutOfMemoryError: Java heap space
描述
1 | $ vim etc/jvm.config |
1 | -server |
解决
- Dump 内存占用的情况
1 | $ vim etc/jvm.config |
1 | -XX:+HeapDumpOnOutOfMemoryError |
定位出内存 OOM 的原因
通过 MAT 工具分析发现是 System 的 ClassLoader 使用
java.util.zip.ZipFile$Source
加载 jar 包,并保存在堆内内存上,占用了 70% 多的内存资源。这是 JDK9 中做的一次改动,为了避免调用耗时的 JNI,以及 MMap 出现 Crash 的风险,详见:JDK-8145260提高分配的内存大小
1 | $ vim etc/jvm.config |
1 | -Xmx256M |
jmx.properties does not contain connector.name
解决
JMX Connector 的 connector.name
属于必填字段,给 jmx.properties 增加一行 connector.name=jmx
即可
1 | $ vim catalog/jmx.properties |
1 | connector.name=jmx |
社区发展
Star 趋势
个人贡献
详见:《如何成为 Apache 的 PMC》
资料
Github
Book
- Presto: The Definitive Guide