摘要
Apache Calcite是一个提供查询处理、优化和支持查询多种流行的开源数据处理系统如Hive、Storm、Flink、Druid和MapD语言支持的基础软件框架。Calcite的架构由模块化和可扩展的查询优化器组成,它具有数百个内置优化规则,一个能够处理各种查询语言的查询处理器,一个为了可扩展性实现的适配器架构,并且支持异构数据模型和存储(关系型,半结构型,流数据和地理空间)。这种灵活,可嵌入且可扩展的架构是让Calcite成为大数据框架采用的一个有吸引力的选择。这是一个活跃项目,不断支持新类型的数据源,查询语言和查询处理和优化的方法。
1. 介绍
在开创性的System R之后,传统的关系数据库引擎主宰了数据处理领域。然而,早在2005年,Stonebraker和Çetintemel 预测我们会看到一系列特别的引擎的增加,如列存储,流处理引擎,文本搜索引擎等等。他们认为这些专门的引擎可以提供更具成本效益的性能,并且它们将带来“通吃的”范式的终结。今天他们的愿景似乎比以往更加重要。事实上,许多专门的开源数据系统已经变得流行,如Storm和Flink(流处理),Elasticsearch(文本搜索),Apache Spark,Druid等。
随着组织投资于针对其特定需求量身定制的数据处理系统,出现了两个首要问题:
这些系统的开发人员遇到了相关的问题,例如查询优化或需要支持像SQL和相关扩展(如流查询)的查询语言,以及LINQ的语言集成查询 [33]。没有统一的框架,让多个工程师独立开发类似的优化逻辑和语言支持会浪费努力。
使用这些专用系统的程序员通常必须将其中的几个集成在一起。组织可能依赖Elasticsearch,Apache Spark和Druid。我们需要构建能够支持跨异构数据源的优化查询的系统。
Apache Calcite是为解决这些问题而开发的。它是一个完整的查询处理系统,提供许多常见功能 - 查询执行,优化和查询语言 - 任何数据库管理系统都需要,除了数据存储和管理,它们留给专用引擎。Hive,Drill,Storm和许多其他数据处理引擎很快采用了Calcite,其为它们提供了高级查询优化和查询语言。例如,Hive是一个建立在Apache Hadoop之上的流行数据仓库项目的。随着Hive从批处理流程转向交互式SQL查询响应平台,很明显该项目的核心需要强大的优化器。因此,Hive采用了Calcite作为其优化器,并且自那时起它们的集成一直在持续增长。许多其他项目和产品也纷纷效仿,包括Flink,MapD [12]等。
此外,Calcite通过向多个系统暴露一个通用接口实现了跨平台优化。为了保证效率,优化器需要全局推理,例如,在关于物化视图选择的不同系统中做出决策。建立一个共同框架并非没有挑战。特别是,框架需要是可扩展的,是足够灵活的,以适应不同类型的需要整合的系统。
我们相信以下特性有助于Calcite在开源社区和行业中的广泛采用:
开源友好。近20年许多主流数据处理平台或开源或基于开源。Calcite是一个Apache基金会孵化的开源的框架, 它提供了协作开发项目的手段。此外,该软件由Java编写,更易与许多最新的数据处理系统(它们也大都由Java编写)进行集成,尤其是那些在Hadoop生态系统里的。
多种数据模型。 Calcite提供对查询优化和使用流和传统数据处理范式的查询语言的支持。Calcite将流视为时序的记录或事件集合,这些记录或事件不像传统数据处理系统那样持久保存到磁盘。
灵活的查询优化器。 从规则到成本模型,优化器的每个组件都是可插拔可扩展的。此外,Calcite还支持多个计划引擎。因此,优化可以分解为由不同优化引擎处理的阶段,这取决于哪一个最适合该阶段。
跨系统支持。Calcite框架可以跨多个查询处理系统和数据库后端运行和优化查询。
可靠性。Calcite是可靠的,因为它多年来的广泛使用导致了该平台的详尽测试。Calcite还包含一个涵盖很广的测试套件,用于验证系统的所有组件,包括查询优化器规则和与后端数据源的集成。
支持SQL及其扩展。许多系统不提供自己的查询语言,而是更喜欢依赖现有的查询语言。对于那些,Calcite提供对ANSI标准SQL的支持,以及各种SQL方言和扩展,例如,用于表达对流或嵌套数据的查询。此外,Calcite包含符合标准Java API(JDBC)的驱动程序。
其余部分安排如下。第2节讨论相关工作。第3节介绍了Calcite的架构及其主要组件。第4节描述了Calcite核心的关系代数。第5节介绍了Calcite的适配器,这是一种定义如何读取外部数据源的抽象。然后,第6节描述了Calcite的优化器及其主要特性,而第7节介绍了处理不同查询处理范式的扩展。第8节概述了已经使用Calcite的数据处理系统。第9节讨论了在第10节结束之前框架可能的未来扩展。
2. 相关工作
虽然Calcite目前是Hadoop生态系统中最广泛采用的大数据分析优化器,但其背后的许多想法并不新颖。例如,查询优化器基于Volcano和Cascades框架的构思,并结合其他广泛使用的优化技术,例如物化视图重写。还有其他系统试图填补与Calcite类似的角色。
Orca是一种模块化查询优化器,用于数据管理产品,如Greenplum和HAWQ。Orca通过实现一个用于在两个称为Data eXchange Language之间交换信息的框架,将优化器与查询执行引擎分离。Orca还提供了用于验证生成的查询计划的正确性和性能的工具。与Orca相比,Calcite可以用作独立的查询执行引擎,它可以联合多个存储和处理后端,包括可插拔的规划器和优化器。
Spark SQL扩展了Apache Spark以支持SQL查询执行,这也可以像在Calcite中一样对多个数据源执行查询。但是,虽然Spark SQL中的Catalyst优化器也试图最小化查询执行成本,但它缺乏Calcite使用的动态编程方法,并且存在陷入局部最小值的风险。
Algebricks是一种查询编译器体系结构,为大数据查询处理提供数据模型无关的代数层和编译器框架。高级语言被编译为Algebricks逻辑代数。然后Algebricks生成针对Hyracks并行处理后端的优化作业。虽然Calcite与Algebricks共享模块化方法,但Calcite还支持基于成本的优化。在当前版本的Calcite中,查询优化器架构使用基于Volcano的基于动态编程的规划,以及Orca中的多阶段优化扩展。虽然原则上Algebricks可以支持多个处理后端(例如,Apache Tez,Spark),但是Calcite多年来为各种后端提供了经过充分测试的支持。
Garlic是一个异构数据管理系统,它表示统一对象模型下来自多个系统的数据。但是,Garlic不支持跨不同系统的查询优化,并依赖于每个系统来优化自己的查询。
FORWARD [17]是一个联合查询处理器,它实现了SQL的超集,称为SQL++。SQL++有一个半结构化数据模型,它集成了JSON和关系数据模型,而Calcite通过在查询规划期间在关系数据模型中表示它们来支持半结构化数据模型。FORWARD将用SQL++编写的联合查询分解为子查询,并根据查询计划在底层数据库上执行它们。数据的合并发生在FORWARD引擎内部。
另一个联合数据存储和处理系统是Big-DAWG,它抽象了广泛的数据模型,包括关系,时间序列和流。BigDAWG中的抽象单元称为信息孤岛。每个信息孤岛都有查询语言,数据模型并连接到一个或多个存储系统。在单个信息孤岛的范围内支持跨存储系统查询。Calcite提供了统一的关系抽象,允许使用不同的数据模型查询后端。
Myria是用于大数据分析的通用引擎,具有对Python语言的高级支持。它为其他后端引擎(如Spark和PostgreSQL)生成查询计划。
3 架构
Calcite包含许多构成典型数据库管理系统的部分。然而,它省略了一些关键组件,例如,数据存储,处理数据的算法,以及用于存储元数据的存储库。这些遗漏是有意的:它使得Calcite成为在具有一个或多个数据存储位置和使用多个数据处理引擎的应用程序之间进行调解的绝佳选择。它也是构建定制数据处理系统的坚实基础。
图1 Apache Calcite架构和交互
图1概述了Calcite架构的主要组件。Calcite的优化器使用关系运算符树作为其内部表示。优化引擎主要由三个组件组成:规则,元数据提供程序和计划程序引擎。我们将在第6节中更详细地讨论这些组件。在图中,虚线表示与框架可能的外部交互。有与Calcite交互的不同方式。
首先,Calcite包含一个查询解析器和验证器,可以将SQL查询转换为关系运算符树。由于Calcite不包含存储层,因此它提供了一种通过适配器在外部存储引擎中定义表模式和视图的机制(如第5节所述),因此可以在这些引擎之上使用它。
其次,尽管Calcite为需要此类数据库语言支持的系统提供了优化的SQL支持,但它还为已经拥有自己的语言解析和解释的系统提供了优化支持:
有些系统支持SQL查询,但没有或有限的查询优化。例如,Hive和Spark最初都提供了对SQL语言的支持,但它们没有包含优化器。对于这种情况,一旦优化了查询,Calcite就可以将关系表达式转换回SQL。此功能允许Calcite作为独立系统在具有SQL接口但没有优化器的任何数据管理系统之上工作。
Calcite架构不仅适用于优化SQL查询。数据处理系统通常选择将自己的解析器用于自己的查询语言。Calcite也可以帮助优化这些查询。实际上,Calcite还允许通过直接实例化关系运算符来容易地构造运算符树。可以使用内置的关系表达式构建接口。例如,假设我们想使用表达式构建器表达以下Apache Pig脚本:
emp = LOAD 'employee_data ' AS (deptno , sal );
emp_by_dept = GROUP emp by ( deptno );
emp_agg = FOREACH emp_by_dept GENERATE GROUP as deptno ,
COUNT (emp.sal) AS c, SUM(emp.sal) as s;
dump emp_agg ;
复制
等式如下所示:
final RelNode node = builder
.scan("employee_data")
.aggregate(builder.groupKey("deptno"),
builder.count(false, "c"),
builder.sum(false, "s", builder.field ("sal")))
.build();
复制
此接口公开构建关系表达式所需的主要构造。优化阶段完成后,应用程序可以检索优化的关系表达式,然后可以将其映射回系统的查询处理单元。
4 查询代数
操作符(Operators)。 关系代数是Calcite的核心。除了表达最常见的数据操作的运算符(例如filter,project,join等)之外,Calcite还包括满足不同目的的其他运算符,例如,能够简明地表示复杂操作或更有效地识别优化机会。
例如,OLAP,决策制定和流应用程序通常使用窗口定义来表示复杂的分析函数,例如一段时间或一个或多个行的数量的移动平均值。因此,Calcite引入了一个窗口运算符,它封装了窗口定义,即上限和下限,分区等,以及在每个窗口上执行的聚合函数。
特征(Traits)。 Calcite不使用不同的实体来表示逻辑和物理运算符。相反,它描述了与使用特征的运算符关联的物理属性。这些特征有助于优化器评估不同替代计划的成本。更改特征值不会更改正在评估的逻辑表达式,即由给定运算符生成的行仍将是相同的。
在优化过程中,Calcite试图在关系表达式上强制执行某些特性,例如某些列的排序顺序。关系运算符可以实现一个转换器接口,该接口指示如何将表达式的特性从一个值转换为另一个值。
Calcite包括描述由关系表达式产生的数据物理性质的常见特征,如排序、分组和分区。与SCOPE优化器类似,Calcite优化器可以解释这些属性,并利用它们来找到避免不必要操作的计划。例如,如果排序运算符的输入已经正确排序,可能是因为这与后端系统中的行使用的顺序相同,那么可以删除排序操作。
除这些性质外,Calcite的一个主要特点是calling convention特征。本质上,特征表示将在其中执行表达式的数据处理系统。将调用约定作为一个特性,使Calcite能够实现其优化透明查询的目标,这些查询的执行可能跨越不同的引擎,即,该约定将被视为任何其他物理属性。
图2 一个查询优化过程
例如,考虑将MySQL中的Products表连接到Splunk中的Orders表(参见图2)。最初,订单扫描在Splunk convention中进行,产品扫描在jdbc-mysql convention中进行。这些表格必须在各自的引擎内进行扫描。Join在逻辑convention中,这意味着没有任何实现是被选择的。此外,图2中的SQL查询包含一个过滤器(WHERE子句),该过滤器由特定于适配器的规则(参见第5节)推送到Splunk中。一种可能的实现是使用Apache Spark作为外部引擎:Join被转换为Spark convention,其输入是从jdbc mysql和Splunk到Spark convention的转换器。但是有一个更有效的实现:利用Splunk可以通过ODBC执行对MySQL的查询的事实,一个planner规则通过splunk-to-spark转换器下推Join,Join现在处于Splunk convention中,运行在Splunk引擎。
5 适配器
适配器是一种体系结构模式,用于定义Calcite如何将各种数据源合并到一般访问中。图3描绘了其组件。实质上,适配器由model,schema和schema工厂组成。model是要访问的数据源的物理属性的规范。schema是model中的数据(format和layout)的定义。数据本身通过表进行物理访问。Calcite与适配器中定义的表接口,以便在执行查询时读取数据。适配器可以定义一组添加到计划器(planner)的规则。例如,它通常包括将各种类型的逻辑关系表达式转换为适配器约定的对应关系表达式的规则。schema工厂组件从model中获取元数据信息并生成schema。
如第4节所述,Calcite使用称为calling convention的物理特征来识别对应于特定数据库后端的关系运算符。这些物理运算符实现每个适配器中基础表的访问路径。解析查询并将其转换为关系代数表达式时,将为表示该表上数据扫描的每个表创建一个运算符。它是适配器必须实现的最小接口。如果适配器实现了表扫描运算符,那么Calcite优化器就可以使用客户端运算符(如排序,过滤和连接)对这些表执行任意SQL查询。
扫表的运算符包含适配器向适配器的后端数据库发出扫描所需的必要信息。为了扩展适配器提供的功能,Calcite定义了一个可枚举的calling convention。具有可枚举calling convention的关系运算符只是通过迭代器接口对元组进行操作。这个calling convention允许Calcite实现在每个适配器的后端中可能不可用的运算符。例如, EnumerableJoin
运算符通过从其子节点收集行并连接所需的属性来实现Join。
对于仅触及表中数据的一小部分的查询,Calcite枚举所有元组是低效的。幸运的是,可以使用相同的基于规则的优化器来实现特定于适配器的优化规则。例如,假设查询涉及对表进行过滤和排序。可以在后端执行过滤的适配器可以实现与 LogicalFilter
匹配的规则,并将其转换为适配器的调用calling convention。此规则将 LogicalFilter
转换为另一个 Filter
实例。此新的 Filter
节点具有较低的关联成本,允许Calcite优化跨适配器的查询。
适配器的使用是一种强大的抽象,不仅可以优化特定后端的查询,还可以跨多个后端进行查询。通过将所有可能的逻辑推送到每个后端,然后对结果数据执行连接和聚合,Calcite能够响应涉及多个后端的表的查询。实现适配器可以像提供扫表操作符一样简单,也可以涉及许多高级优化的设计。可以使用优化程序规则将关系代数中表示的任何表达式下推到适配器。
6 查询过程和优化
查询优化器是框架中的主要组件。Calcite通过将planner规则重复应用于关系表达式来优化查询。一个成本模型指引着过程,计划程序引擎尝试生成一个替代表达式,该表达式具有与原始语义相同的语义,但成本较低。
优化器中的每个组件都是可扩展的。用户可以添加关系运算符,规则,成本模型和统计信息。
计划器(Planner)规则。 Calcite包含一组用于转换表达式树的planner规则。特别是,规则匹配语法树中的给定模式,并执行保留该表达式语义的转换。Calcite包含数百个优化规则。然而,依赖于Calcite进行优化的数据处理系统允许特定的重写包括它们自己的规则,这是相当普遍的。
例如,Calcite为Apache Cassandra提供了一个适配器,Apache Cassandra是一个宽列存储,它通过表中的列的子集对数据进行分区,然后在每个分区内根据列的另一个子集对行进行排序。正如第5节中所讨论的那样,适配器可以将尽可能多的查询处理压缩到每个后端以提高效率。将Sort推入Cassandra的规则必须检查两个条件:
该表先前已过滤到单个分区(因为行仅在分区内排序)
Cassandra中的分区排序有一些带有所需排序的通用前缀。
这要求将 LogicalFilter
重写为 CassandraFilter
,以确保将分区过滤下推到数据库。规则的效果很简单(将 LogicalSort
转换为 CassandraSort
),但规则匹配的灵活性使后端能够在复杂的场景中下推运算符。
举个具有更复杂效果的规则的示例,考虑以下查询:
SELECT products.name , COUNT (*)
FROM sales JOIN products USING ( productId )
WHERE sales.discount IS NOT NULL
GROUP BY products.name
ORDER BY COUNT (*) DESC ;
复制
该查询对应于图4a中呈现的关系代数表达式。因为WHERE子句仅适用于sales表,所以我们可以在连接之前移动Filter,如图4b所示。此优化可以显着减少查询执行时间,因为我们不需要为与谓词匹配的行执行Join。此外,如果销售和产品表包含在单独的后端中,则在连接之前移动Filter还可能使适配器将Filter推入后端。Calcite通过 FilterIntoJoinRule
实现此优化,它将Filter节点与作为父节点的Join节点进行匹配,并检查Filter是否可以由Join执行。此优化说明了Calcite优化方法的灵活性。
元数据provider。元数据是Calcite优化器的重要组成部分,它有两个主要目的:(i)指导planner实现降低整体查询计划成本的目标,以及(ii)在应用规则时为规则提供信息。
元数据负责将信息提供给优化器。特别是,Calcite中的默认元数据provider的实现,包含了返回在运算符树中执行子表达式的总成本、行数和该表达式结果的数据大小,以及可以执行的最大并行度的函数。反过来,它还可以提供关于计划结构的信息,例如,在某个树节点下面存在的过滤条件。
Calcite提供的接口允许数据处理系统将其元数据信息插入到框架中。这些系统可以选择编写覆盖现有功能的provider,或者提供可能在优化阶段使用的新的元数据功能。但是,对于他们中的许多人来说,提供有关其输入数据的统计数据就足够了,例如,行数和表的大小,给定列的值是否是唯一的等等,而Calcite将通过以下方式完成其余的工作。使用其默认实现。
由于元数据provider是可插入的,因此它们在运行时使用Java轻量级编译器Janino进行编译和实例化。它们的实现包括元数据结果的缓存,这会产生显著的性能改进,例如,当我们需要计算多种类型的元数据时,例如基数,平均行大小和给定join的选择性,并且所有这些计算都依赖于它们的输入的基数。他们的投入。
Planner引擎。 计划引擎的主要目标是触发提供给引擎的规则,直到达到给定目标。目前,Calcite提供两种不同的引擎。新的引擎可以在框架中插入。
第一种是基于成本的计划引擎,它触发输入规则,目标是降低总体表达式的成本。该引擎使用类似于火山模型的动态编程算法来创建和跟踪通过触发给予引擎的规则而创建的不同替代计划。最初,每个表达式都与计划程序一起注册,并与基于表达式的属性和输入二者的摘要一起注册。当在表达式e1上触发规则并且规则生成新表达式e2时,规划器将e2添加到e1所属的等价表达式Sa集合中。此外,计划程序会为新表达式生成摘要,并与先前在计划程序中注册的摘要进行比较。如果找到与属于集合Sb的表达式e3相关联的类似摘要,则规划器已找到重复,因此将Sa和Sb合并为一组新的等价。该过程一直持续到计划程序到达可配置的修复点。特别是,它可以(i)穷尽地探索搜索空间,直到所有规则都应用于所有表达式,或者(ii)当计划成本未提高超过给定阈值δ时,使用基于启发式的方法来停止搜索。最后的计划迭代。允许优化器决定选择哪个计划的成本函数是通过元数据提供程序提供的。默认成本函数实现结合了给定表达式使用的CPU,IO和内存资源的估计。
第二个引擎是一个详尽的计划器,它会详尽地触发规则,直到它生成一个不再被任何规则修改的表达式。此计划程序可用于快速执行规则,而无需考虑每个表达式的成本。
用户可以根据具体需要选择使用现有的计划引擎之一,并在系统需求发生变化时从一个引擎切换到另一个,这很简单。或者,用户可以选择生成多阶段优化逻辑,其中在优化过程的连续阶段中应用不同的规则集。重要的是,两个规划器的存在允许Calcite用户通过引导搜索不同的查询计划来减少整体优化时间。
物化视图。 加速数据仓库中查询处理的最强大技术之一是相关摘要或物化视图的预计算。依赖于Calcite的多个Calcite适配器和项目都有自己的物化视图概念。例如,Cassandra允许用户基于由系统自动维护的现有表来定义物化视图。
这些引擎将其物化视图提供给Calcite。然后,优化器有机会重写传入的查询以使用这些视图而不是原始表。特别是,Calcite提供了两种不同的物化视图重写算法的实现。
第一种方法基于视图替换(view substitution)。目标是用一个使用物化视图的等效表达式替换部分关系代数树,算法如下:(i)物化视图上的扫描操作符和物化视图定义的扫描操作符在planner上注册,以及(ii)触发试图统一计划中表达的转换规则。视图不需要与要替换的查询中的表达式完全匹配,因为Calcite中的重写算法可以产生包括用于计算期望表达式的附加运算符的部分重写,例如具有残余谓词条件的过滤器。
第二种方法基于格子(lattices)。一旦数据源被声明形成一个lattice,Calcite就会将每个实现表示为一个tile,而优化器又可以使用它来回答传入的查询。一方面,重写算法在以星型模式组织的数据源上匹配表达式方面特别有效,这在OLAP应用程序中很常见。另一方面,它比视图替换更具限制性,因为它对底层模式施加了限制。