JohnShen's Blog.

Apache Calcite (一) - Adapter

字数统计: 1.7k阅读时长: 8 min
2020/02/20 Share

接触 Calcite 的时间不算长,感觉 Calcite 还是很难的,越往下看各个名次及查询优化部分就很难看的下去。不过工作方面暂时只用到了 Adapter 部分,而且由于通用性的考量,使用的 Table 暂时是ScannableTable。一下子接触太多东西也容易忘,所以还是从头开始记笔记,一边完成工作上的事一边继续学习,一蹴而不可取。

主要内容来自 Calcite 的英文官方指南,介绍如何使用 Calcite 对 CSV 进行 SQL 查询,主要参考一个基础的 Adapter 是如何实现的。

1
2
3
4
git clone https://github.com/apache/calcite.git
cd calcite
mvn install -DskipTests -Dcheckstyle.skip=true
cd example/csv

官方介绍的是使用 Sqlline 进行 shell 式查询,执行SELECT * FROM emps;可以看到存放在 CSV 中的emps表数据。

1
2
3
4
./sqlline
sqlline> !connect jdbc:calcite:model=src/test/resources/model.json admin admin
sqlline> !tables
sqlline> SELECT * FROM emps;

model.json 文件中的内容是查询关键,可以看到factory中指定了CsvSchemaFactory,后续可以以该类为起点展开。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
{
"version": "1.0",
"defaultSchema": "SALES",
"schemas": [
{
"name": "SALES",
"type": "custom",
"factory": "org.apache.calcite.adapter.csv.CsvSchemaFactory",
"operand": {
"directory": "sales"
}
}
]
}

由于使用 Sqlline 不方便 DEBUG,所以改写了CsvTest中的测试用例,把directory换成了绝对路径(实际用来使用的是new File(directory),避免找不到文件),model对应的JSON字符串开头多了inline。Calcite 对应的 Driver 是org.apache.calcite.jdbc.Driver,驱动的注册工作基于 SPI 自动完成,jdbcUrl为jdbc:calcite:即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Test
public void common() throws SQLException {
Properties info = new Properties();
info.put("model",
"inline:" + "{\n" +
" \"version\": \"1.0\",\n" +
" \"defaultSchema\": \"SALES\",\n" +
" \"schemas\": [\n" +
" {\n" +
" \"name\": \"SALES\",\n" +
" \"type\": \"custom\",\n" +
" \"factory\": \"org.apache.calcite.adapter.csv.CsvSchemaFactory\",\n" +
" \"operand\": {\n" +
" \"directory\": \"/...绝对路径/src/test/resources/sales\"\n" +
" }\n" +
" }\n" +
" ]\n" +
"}");
try (Connection connection = DriverManager.getConnection("jdbc:calcite:", info);
Statement statement = connection.createStatement();
ResultSet resultSet = statement.executeQuery("select * from emps")) {
while (resultSet.next()) {
// logic
System.out.println(resultSet.getLong(1));
}
}
}

一. SchemaFactory & Schema

First, we define a schema based on a schema factory class in a model file.

Then the schema factory creates a schema, and the schema creates several tables, each of which knows how to get data by scanning a CSV file.

Last, after Calcite has parsed the query and planned it to use those tables, Calcite invokes the tables to read the data as the query is being executed.

在 model 中指定了 factory 为 CsvSchemaFactory,其实现了 SchemaFactory 接口,主要完成生成Schema 的 create 方法实现。directory可作为 operand的参数获取到,根据directory生成directoryFile参数,及表示存放 csv/csv.gz 的文件目录。返回结果为 继承了AbstractSchemaCsvSchema

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public Schema create(SchemaPlus parentSchema, String name,
Map<String, Object> operand) {
final String directory = (String) operand.get("directory");
final File base =
(File) operand.get(ModelHandler.ExtraOperand.BASE_DIRECTORY.camelName);
File directoryFile = new File(directory);
if (base != null && !directoryFile.isAbsolute()) {
directoryFile = new File(base, directory);
}
String flavorName = (String) operand.get("flavor");
CsvTable.Flavor flavor;
if (flavorName == null) {
flavor = CsvTable.Flavor.SCANNABLE;
} else {
flavor = CsvTable.Flavor.valueOf(flavorName.toUpperCase(Locale.ROOT));
}
return new CsvSchema(directoryFile, flavor);
}
}

CsvSchema中需要实现的是getTableMap()方法,其返回结果为Map<String, Table>。在 sales 目录下的 DEPTS.csv、EMPS.csv.gz、SDEPTS.csv 都会解析为对应的表,返回一个不可变Map:

1
2
3
"DEPTS" -> "CsvScannableTable"
"EMPS" -> "CsvScannableTable"
"SDEPTS" -> "CsvScannableTable"

二. Table & Enumerator

有三种Table,其接口分别为ScannableTableFilterableTableTranslatableTable,在文中分别对应CsvScannableTableCsvFilterableTableCsvTranslatableTable,先使用ScannableTable完成入门流程。这三者的主要区别可简述为:ScannableTable 会把所有数据加载到内存中,在内存中进行条件过滤;FilterableTable 可以获得过滤条件,可以帮助在DB查询数据时先过滤一部分,从而减少内存开销;TranslatableTable 难度较大,需根据上下文自己定义表扫描的执行计划。

CsvScannableTable 继承自CsvTable,而CsvTable又继承在AbstractTable,在CsvTable中实现了getRowType方法,返回类型为RelDataType,表示所有列属性信息。文中的CSV示例是以DEPTNO:int,NAME:string特定格式表示数据类型的,getRowType返回的就是包含这些属性名称及类型对应关系的RelDataType

在可获得类型信息后,CsvScannableTable需要关心的就是如何获取及扫描数据,这就需要实现ScannableTable定义的scan方法。

1
2
3
4
5
6
7
8
9
10
public Enumerable<Object[]> scan(DataContext root) {
final int[] fields = CsvEnumerator.identityList(fieldTypes.size());
final AtomicBoolean cancelFlag = DataContext.Variable.CANCEL_FLAG.get(root);
return new AbstractEnumerable<Object[]>() {
public Enumerator<Object[]> enumerator() {
return new CsvEnumerator<>(source, cancelFlag, false, null,
new CsvEnumerator.ArrayRowConverter(fieldTypes, fields));
}
};
}

CsvEnumerator实现了Enumerator接口,内部调用 opencsv 的 CSVReader.readNext() 进行推进及获取数据。

1
2
3
4
5
6
public interface Enumerator<T> extends AutoCloseable {  
T current();
boolean moveNext();
void reset();
void close();
}

CSV Adapter 总体的依赖关系如下所示。

三: View

在 JSON 中的 schemas 中可以添加 tables 来实现 View(type 指定为 view)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
{
version: '1.0',
defaultSchema: 'SALES',
schemas: [
{
name: 'SALES',
type: 'custom',
factory: 'org.apache.calcite.adapter.csv.CsvSchemaFactory',
operand: {
directory: 'sales'
},
tables: [
{
name: 'FEMALE_EMPS',
type: 'view',
sql: 'SELECT * FROM emps WHERE gender = \'F\''
}
]
}
]
}

四: 自定义表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
{
version: '1.0',
defaultSchema: 'CUSTOM_TABLE',
schemas: [
{
name: 'CUSTOM_TABLE',
tables: [
{
name: 'EMPS',
type: 'custom',
factory: 'org.apache.calcite.adapter.csv.CsvTableFactory',
operand: {
file: 'sales/EMPS.csv.gz',
flavor: "scannable"
}
}
]
}
]
}

自定义表相对于自定义Schema有更高的自由度,每个表可以有单独的参数设置,使用的工厂类也变为了CsvTableFactory,其 create 方法也较为简单,返回结果一样是 CsvScannableTable。

1
2
3
4
5
6
7
8
9
10
public CsvTable create(SchemaPlus schema, String name,
Map<String, Object> operand, RelDataType rowType) {
String fileName = (String) operand.get("file");
final File base =
(File) operand.get(ModelHandler.ExtraOperand.BASE_DIRECTORY.camelName);
final Source source = Sources.file(base, fileName);
final RelProtoDataType protoRowType =
rowType != null ? RelDataTypeImpl.proto(rowType) : null;
return new CsvScannableTable(source, protoRowType);
}

五: JDBC Adapter

JDBC Adapter 可以将数据库中的一个 Schema 映射成 Calcite 中的一个 Schema,该 Adapter 内置在 Calcite 中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
{
version: '1.0',
defaultSchema: 'FOODMART',
schemas: [
{
name: 'FOODMART',
type: 'custom',
factory: 'org.apache.calcite.adapter.jdbc.JdbcSchema$Factory',
operand: {
jdbcDriver: 'com.mysql.jdbc.Driver',
jdbcUrl: 'jdbc:mysql://localhost/foodmart',
jdbcUser: 'foodmart',
jdbcPassword: 'foodmart'
}
}
]
}

使用的时候需要注意SQL语句的双引号以及大小写,该 Adapter SQL 语法比较严格。目前该 Adapter有局限性,仅支持表扫描操作,其他处理(过滤、连接、聚集等)都发生在 Calcite 中。

可以通过下面的简单示例看一下 CSV 和 MYSQL 两处数据源的联合查询。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
@Test
public void hybrid() throws SQLException {
Properties info = new Properties();
info.put("model",
"inline:" +
"{\n" +
" \"version\": \"1.0\",\n" +
" \"defaultSchema\": \"TEST\",\n" +
" \"schemas\": [\n" +
" {\n" +
" \"name\": \"TEST\",\n" +
" \"type\": \"custom\",\n" +
" \"factory\": \"org.apache.calcite.adapter.jdbc.JdbcSchema$Factory\",\n" +
" \"operand\": {\n" +
" \"jdbcDriver\": \"com.mysql.jdbc.Driver\",\n" +
" \"jdbcUrl\": \"jdbc:mysql://127.0.0.1:3306/test\",\n" +
" \"jdbcUser\": \"...\",\n" +
" \"jdbcPassword\": \"...\"\n" +
" }\n" +
" },\n" +
" {\n" +
" \"name\": \"SALES\",\n" +
" \"type\": \"custom\",\n" +
" \"factory\": \"org.apache.calcite.adapter.csv.CsvSchemaFactory\",\n" +
" \"operand\": {\n" +
" \"directory\": \"/.../sales\"\n" +
" }\n" +
" }\n" +
" ]\n" +
"}"
);
try (Connection connection = DriverManager.getConnection("jdbc:calcite:", info);
Statement statement = connection.createStatement();
ResultSet resultSet = statement.executeQuery("SELECT * FROM \"TEST\".\"abc\",\"SALES\".\"EMPS\"")) {

while (resultSet.next()) {
// logic
System.out.println(resultSet.getString(1));
}
}
}

另外,还有一种 Cloning JDBC adapter,数据同样来自数据库,但是在第一次读取时会将数据读取到内存表中,Calcite 根据内存表计算查询,实际上相当于数据库的缓存。


Reference

https://matt33.com/2019/03/07/apache-calcite-process-flow/

https://zhuanlan.zhihu.com/p/53725382

CATALOG
  1. 1. 一. SchemaFactory & Schema
  2. 2. 二. Table & Enumerator
  3. 3. 三: View
  4. 4. 四: 自定义表
  5. 5. 五: JDBC Adapter
  • Reference