接触 Calcite 的时间不算长,感觉 Calcite 还是很难的,越往下看各个名次及查询优化部分就很难看的下去。不过工作方面暂时只用到了 Adapter 部分,而且由于通用性的考量,使用的 Table 暂时是ScannableTable
。一下子接触太多东西也容易忘,所以还是从头开始记笔记,一边完成工作上的事一边继续学习,一蹴而不可取。
主要内容来自 Calcite 的英文官方指南 ,介绍如何使用 Calcite 对 CSV 进行 SQL 查询,主要参考一个基础的 Adapter 是如何实现的。
1 2 3 4 git clone https://github.com/apache/calcite.git cd calcitemvn install -DskipTests -Dcheckstyle.skip=true cd example/csv
官方介绍的是使用 Sqlline 进行 shell 式查询,执行SELECT * FROM emps;
可以看到存放在 CSV 中的emps
表数据。
1 2 3 4 ./sqlline sqlline> !connect jdbc:calcite:model=src/test /resources/model.json admin admin sqlline> !tables sqlline> SELECT * FROM emps;
model.json 文件中的内容是查询关键,可以看到factory
中指定了CsvSchemaFactory
,后续可以以该类为起点展开。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 { "version" : "1.0" , "defaultSchema" : "SALES" , "schemas" : [ { "name" : "SALES" , "type" : "custom" , "factory" : "org.apache.calcite.adapter.csv.CsvSchemaFactory" , "operand" : { "directory" : "sales" } } ] }
由于使用 Sqlline 不方便 DEBUG,所以改写了CsvTest
中的测试用例,把directory
换成了绝对路径(实际用来使用的是new File(directory)
,避免找不到文件),model
对应的JSON字符串开头多了inline
。Calcite 对应的 Driver 是org.apache.calcite.jdbc.Driver
,驱动的注册工作基于 SPI 自动完成,jdbcUrl为jdbc:calcite:
即可。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 @Test public void common () throws SQLException { Properties info = new Properties(); info.put("model" , "inline:" + "{\n" + " \"version\": \"1.0\",\n" + " \"defaultSchema\": \"SALES\",\n" + " \"schemas\": [\n" + " {\n" + " \"name\": \"SALES\",\n" + " \"type\": \"custom\",\n" + " \"factory\": \"org.apache.calcite.adapter.csv.CsvSchemaFactory\",\n" + " \"operand\": {\n" + " \"directory\": \"/...绝对路径/src/test/resources/sales\"\n" + " }\n" + " }\n" + " ]\n" + "}" ); try (Connection connection = DriverManager.getConnection("jdbc:calcite:" , info); Statement statement = connection.createStatement(); ResultSet resultSet = statement.executeQuery("select * from emps" )) { while (resultSet.next()) { System.out.println(resultSet.getLong(1 )); } } }
一. SchemaFactory & Schema
First, we define a schema based on a schema factory class in a model file.
Then the schema factory creates a schema, and the schema creates several tables, each of which knows how to get data by scanning a CSV file.
Last, after Calcite has parsed the query and planned it to use those tables, Calcite invokes the tables to read the data as the query is being executed.
在 model 中指定了 factory 为 CsvSchemaFactory,其实现了 SchemaFactory 接口,主要完成生成Schema 的 create 方法实现。directory
可作为 operand
的参数获取到,根据directory
生成directoryFile
参数,及表示存放 csv/csv.gz 的文件目录。返回结果为 继承了AbstractSchema
的 CsvSchema
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public Schema create (SchemaPlus parentSchema, String name, Map<String, Object> operand) { final String directory = (String) operand.get("directory" ); final File base = (File) operand.get(ModelHandler.ExtraOperand.BASE_DIRECTORY.camelName); File directoryFile = new File(directory); if (base != null && !directoryFile.isAbsolute()) { directoryFile = new File(base, directory); } String flavorName = (String) operand.get("flavor" ); CsvTable.Flavor flavor; if (flavorName == null ) { flavor = CsvTable.Flavor.SCANNABLE; } else { flavor = CsvTable.Flavor.valueOf(flavorName.toUpperCase(Locale.ROOT)); } return new CsvSchema(directoryFile, flavor); } }
在CsvSchema
中需要实现的是getTableMap()
方法,其返回结果为Map<String, Table>
。在 sales 目录下的 DEPTS.csv、EMPS.csv.gz、SDEPTS.csv 都会解析为对应的表,返回一个不可变Map:
1 2 3 "DEPTS" -> "CsvScannableTable" "EMPS" -> "CsvScannableTable" "SDEPTS" -> "CsvScannableTable"
二. Table & Enumerator 有三种Table,其接口分别为ScannableTable
、FilterableTable
、TranslatableTable
,在文中分别对应CsvScannableTable
,CsvFilterableTable
、CsvTranslatableTable
,先使用ScannableTable
完成入门流程。这三者的主要区别可简述为:ScannableTable 会把所有数据加载到内存中,在内存中进行条件过滤;FilterableTable 可以获得过滤条件,可以帮助在DB查询数据时先过滤一部分,从而减少内存开销;TranslatableTable 难度较大,需根据上下文自己定义表扫描的执行计划。
CsvScannableTable 继承自CsvTable
,而CsvTable
又继承在AbstractTable
,在CsvTable
中实现了getRowType
方法,返回类型为RelDataType
,表示所有列属性信息。文中的CSV示例是以DEPTNO:int,NAME:string
特定格式表示数据类型的,getRowType
返回的就是包含这些属性名称及类型对应关系的RelDataType
。
在可获得类型信息后,CsvScannableTable
需要关心的就是如何获取及扫描数据,这就需要实现ScannableTable
定义的scan
方法。
1 2 3 4 5 6 7 8 9 10 public Enumerable<Object[]> scan(DataContext root) { final int [] fields = CsvEnumerator.identityList(fieldTypes.size()); final AtomicBoolean cancelFlag = DataContext.Variable.CANCEL_FLAG.get(root); return new AbstractEnumerable<Object[]>() { public Enumerator<Object[]> enumerator() { return new CsvEnumerator<>(source, cancelFlag, false , null , new CsvEnumerator.ArrayRowConverter(fieldTypes, fields)); } }; }
CsvEnumerator
实现了Enumerator
接口,内部调用 opencsv 的 CSVReader.readNext() 进行推进及获取数据。
1 2 3 4 5 6 public interface Enumerator <T > extends AutoCloseable { T current () ; boolean moveNext () ; void reset () ; void close () ; }
CSV Adapter 总体的依赖关系如下所示。
三: View 在 JSON 中的 schemas 中可以添加 tables 来实现 View(type 指定为 view)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 { version: '1.0' , defaultSchema: 'SALES' , schemas: [ { name: 'SALES' , type: 'custom' , factory: 'org.apache.calcite.adapter.csv.CsvSchemaFactory' , operand: { directory: 'sales' }, tables: [ { name: 'FEMALE_EMPS' , type: 'view' , sql: 'SELECT * FROM emps WHERE gender = \'F\'' } ] } ] }
四: 自定义表 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 { version: '1.0' , defaultSchema: 'CUSTOM_TABLE' , schemas: [ { name: 'CUSTOM_TABLE' , tables: [ { name: 'EMPS' , type: 'custom' , factory: 'org.apache.calcite.adapter.csv.CsvTableFactory' , operand: { file: 'sales/EMPS.csv.gz' , flavor: "scannable" } } ] } ] }
自定义表相对于自定义Schema有更高的自由度,每个表可以有单独的参数设置,使用的工厂类也变为了CsvTableFactory
,其 create 方法也较为简单,返回结果一样是 CsvScannableTable。
1 2 3 4 5 6 7 8 9 10 public CsvTable create (SchemaPlus schema, String name, Map<String, Object> operand, RelDataType rowType) { String fileName = (String) operand.get("file" ); final File base = (File) operand.get(ModelHandler.ExtraOperand.BASE_DIRECTORY.camelName); final Source source = Sources.file(base, fileName); final RelProtoDataType protoRowType = rowType != null ? RelDataTypeImpl.proto(rowType) : null ; return new CsvScannableTable(source, protoRowType); }
五: JDBC Adapter JDBC Adapter 可以将数据库中的一个 Schema 映射成 Calcite 中的一个 Schema,该 Adapter 内置在 Calcite 中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 { version: '1.0', defaultSchema: 'FOODMART', schemas: [ { name: 'FOODMART', type: 'custom', factory: 'org.apache.calcite.adapter.jdbc.JdbcSchema$Factory', operand: { jdbcDriver: 'com.mysql.jdbc.Driver', jdbcUrl: 'jdbc:mysql://localhost/foodmart', jdbcUser: 'foodmart', jdbcPassword: 'foodmart' } } ] }
使用的时候需要注意SQL语句的双引号以及大小写,该 Adapter SQL 语法比较严格。目前该 Adapter有局限性,仅支持表扫描操作,其他处理(过滤、连接、聚集等)都发生在 Calcite 中。
可以通过下面的简单示例看一下 CSV 和 MYSQL 两处数据源的联合查询。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 @Test public void hybrid () throws SQLException { Properties info = new Properties(); info.put("model" , "inline:" + "{\n" + " \"version\": \"1.0\",\n" + " \"defaultSchema\": \"TEST\",\n" + " \"schemas\": [\n" + " {\n" + " \"name\": \"TEST\",\n" + " \"type\": \"custom\",\n" + " \"factory\": \"org.apache.calcite.adapter.jdbc.JdbcSchema$Factory\",\n" + " \"operand\": {\n" + " \"jdbcDriver\": \"com.mysql.jdbc.Driver\",\n" + " \"jdbcUrl\": \"jdbc:mysql://127.0.0.1:3306/test\",\n" + " \"jdbcUser\": \"...\",\n" + " \"jdbcPassword\": \"...\"\n" + " }\n" + " },\n" + " {\n" + " \"name\": \"SALES\",\n" + " \"type\": \"custom\",\n" + " \"factory\": \"org.apache.calcite.adapter.csv.CsvSchemaFactory\",\n" + " \"operand\": {\n" + " \"directory\": \"/.../sales\"\n" + " }\n" + " }\n" + " ]\n" + "}" ); try (Connection connection = DriverManager.getConnection("jdbc:calcite:" , info); Statement statement = connection.createStatement(); ResultSet resultSet = statement.executeQuery("SELECT * FROM \"TEST\".\"abc\",\"SALES\".\"EMPS\"" )) { while (resultSet.next()) { System.out.println(resultSet.getString(1 )); } } }
另外,还有一种 Cloning JDBC adapter ,数据同样来自数据库,但是在第一次读取时会将数据读取到内存表中,Calcite 根据内存表计算查询,实际上相当于数据库的缓存。
Reference https://matt33.com/2019/03/07/apache-calcite-process-flow/
https://zhuanlan.zhihu.com/p/53725382