GeoMesa SQL Connector
Glink对Flink的SQL Connector进行了扩展, 实现了GeoMesa SQL Connector. 这里介绍了如何设置和使用GeoMesa SQL Connector. 我们在GeoMesa 3.1+版本上进行了测试.
Dependencies
为了使用GeoMesa SQL Connector, 需要添加如下依赖.
| Glink dependency |
|---|
| glink-connector-geomesa-x.x.x.jar |
| glink-sql-x.x.x.jar |
最简单的方法是将$GLINK_HOME/lib目录中的上述两个Jar包复制到$FLINK_HOME/lib目录下.
DDL
使用GeoMesa SQL Connector时, 在DDL中有几点需要特别注意, 这里进行详细说明.
DDL中定义空间数据类型
由于Flink当前无法支持可注册的自定义类型, 因此我们无法在DDL中直接定义空间数据类型. 在GeoMesa SQL Connector中可以WKT/WKB形式表示空间数据类型, 并在WITH参数中用geomesa.spatial.fields指明, 其格式为: <field name>:<field type>, 多个字段由”,“分隔. 其中<field type>支持Spatial Data Types中的所有GeoMesa Type.
如下DDL定义了GeoMesa中的一个表, 用于存储T-Drive数据, 其中point2字段为WKT格式的STRING类型, 并且在WITH参数中将其指定为Point空间类型. 这样, 如果将其作为source, 那么GeoMesa SQL Connector从GeoMesa取出数据时会将point2字段的数据类型从Point转化为WKT格式的STRING; 如果将其作为sink, 那么GeoMesa SQL Connector将数据写入GeoMesa时会将WKT格式的STRING转化为Point.
CREATE TABLE GeoMesa_TDrive (
`pid` STRING,
`time` TIMESTAMP(0),
`point2` STRING,
PRIMARY KEY (pid) NOT ENFORCED
) WITH (
'connector' = 'geomesa',
'geomesa.data.store' = 'hbase',
'geomesa.schema.name' = 'geomesa-test',
'geomesa.spatial.fields' = 'point2:Point',
'hbase.catalog' = 'test-sql'
);
DDL中指定空间连接谓词
目前Flink仅支持等值连接, 因此在Glink中使用GeoMesa进行lookup join时, 我们必须为每个连接指定其空间关系. 我们通过在WITH参数中增加geomesa.temporal.join.predict选项来实现. geomesa.temporal.join.predict目前支持以下选项:
R:<distance>:表示维度几何与流几何距离小于distance米即符合空间连接条件;I: 表示流几何与维度几何相交即符合空间连接条件;+C: 表示流几何包含维度几何即符合空间连接条件;-C: 表示维度几何包含流几何即符合空间连接条件.
以下是一个使用GeoMesa SQL Connector进行lookup join的案例, 它读取CSV文件中的点数据, 将其与GeoMesa中的多边形数据进行空间连接, 如果点被包含在某个多边形中, 则将二者进行连接. 注意'geomesa.temporal.join.predict' = 'I'这一行, 它指定了空间连接的类型为包含(对于多边形与点之间的空间关系, 包含和相交是一样的), 这样在DQL的连接条件ON ST_AsText(ST_Point(A.lng, A.lat)) = B.geom中, Glink就会将=理解为两侧的几何类型相交.
CREATE TABLE csv_point (
id STRING,
dtg TIMESTAMP(0),
lng DOUBLE,
lat DOUBLE,
proctime AS PROCTIME())
WITH (
'connector' = 'filesystem',
'path' = '/path/to/csv',
'format' = 'csv'
);
CREATE TABLE GeoMesa_Area (
id STRING,
dtg TIMESTAMP(0),
geom STRING,
PRIMARY KEY (id) NOT ENFORCED)
WITH (
'connector' = 'geomesa',
'geomesa.data.store' = 'hbase',
'geomesa.schema.name' = 'restricted_area',
'geomesa.spatial.fields' = 'geom:Polygon',
'geomesa.temporal.join.predict' = 'I',
'hbase.zookeepers' = 'localhost:2181',
'hbase.catalog' = 'restricted_area'
);
SELECT A.id AS point_id, A.dtg, ST_AsText(ST_Point(A.lng, A.lat)) AS point, B.id AS area_id
FROM csv_point AS A
LEFT JOIN GeoMesa_Area FOR SYSTEM_TIME AS OF A.proctime AS B
ON ST_AsText(ST_Point(A.lng, A.lat)) = B.geom;
How to use GeoMesa table
上述案例中已经使用过GeoMesa表, 这里再用一个完整的例子进一步阐述. 在这个例子中, 我们实现的功能是使用Glink将CSV中的点数据导入到GeoMesa中, 它几乎是一个完整的空间数据ETL案例了.
首先创建CSV source table.
CREATE TABLE CSV_TDrive (
`pid` STRING,
`time` TIMESTAMP(0),
`lng` DOUBLE,
`lat` DOUBLE
) WITH (
'connector' = 'filesystem',
'path' = '/path/to/csv',
'format' = 'csv'
);
然后创建GeoMesa sink table.
CREATE TABLE Geomesa_TDrive (
`pid` STRING,
`time` TIMESTAMP(0),
`point2` STRING,
PRIMARY KEY (pid) NOT ENFORCED
) WITH (
'connector' = 'geomesa',
'geomesa.data.store' = 'hbase',
'geomesa.schema.name' = 'geomesa-test',
'geomesa.spatial.fields' = 'point2:Point',
'hbase.catalog' = 'test-sql'
);
最后执行以下DML语句即可执行ETL作业.
INSERT INTO Geomesa_TDrive
SELECT `pid`, `time`, ST_AsText(ST_Point(lng, lat)) FROM CSV_TDrive;
Connector Options
GeoMesa SQL Connector支持通过WITH参数的方式对GeoMesa客户端的相关参数进行配置. 其中有些参数是引擎无关的, 有些参数是不同后端存储引擎的可选配置, 具体如下.
GeoMesa
以下配置是不受GeoMesa后端存储引擎所影响的.
| Option | Required | Description |
|---|---|---|
| connector | required | 连接器类型, 对于GeoMesa SQL Connector而言固定为geomesa |
| geomesa.data.store | required | GeoMesa Data Store类型, 目前支持hbase |
| geomesa.schema.name | required | GeoMesa Schema名称 |
| geomesa.spatial.fields | optional | 空间类型字段, 当包含空间字段时必须指定, 否则空间类型将无法正确解析, 格式: |
| geomesa.temporal.join.predict | optional | 指定lookup join的空间关系谓词, 符合关系的记录将被join: R: I表示流表中空间对象与维表中空间对象相交; +C表示流表中空间对象包含维表中空间对象; -C表示维表中空间对象包含流表中空间对象. |
HBase Data Store
以下参数是GeoMesa以HBase作为后端存储引擎时可选的配置参数. GeoMesa SQL Connector支持GeoMesa HBase Data Store的所有配置参数, 关于各个参数的具体含义, 参见Geomesa文档.
| Option | Required |
|---|---|
| hbase.catalog | optional |
| hbase.zookeepers | optional |
| hbase.coprocessor.url | optional |
| hbase.config.paths | optional |
| hbase.config.xml | optional |
| hbase.connections.reuse | optional |
| hbase.remote.filtering | optional |
| hbase.security.enabled | optional |
| hbase.coprocessor.threads | optional |
| hbase.ranges.max-per-extended-scan | optional |
| hbase.ranges.max-per-coprocessor-scan | optional |
| hbase.coprocessor.arrow.enable | optional |
| hbase.coprocessor.bin.enable | optional |
| hbase.coprocessor.density.enable | optional |
| hbase.coprocessor.stats.enable | optional |
| hbase.coprocessor.yield.partial.results | optional |
| hbase.coprocessor.scan.parallel | optional |
注意: 当geomesa.data.store为hbase时必须指定hbase.catalog
Data Type Mapping
Flink SQL的数据类型并不与GeoMesa完全兼容. 对于基础数据类型而言GeoMesa SQL Connector进行了最大程度的适配; 对于空间数据类型, 由于Flink目前尚未支持可注册的结构化类型, 因此在Flink SQL中所有空间数据类型均由WKT/WKB格式的SRING/BINARY类型表示, 且必须使用geomesa.spatial.fields这一WITH参数指定具体类型, Geomesa SQL Connector在写入时会使用org.locationtech.jts.io.WKTReader/org.locationtech.jts.io.WKBReader进行转换. 详细的数据类型对应关系如下. Flink SQL数据类型参见Flink文档. GeoMesa数据类型参见GeoMesa文档.
Basic Data Types
| Flink SQL Type | GeoMesa Type | Java Type | Indexable |
|---|---|---|---|
| CHAR / VARCHAR / STRING | String | java.lang.String | Yes |
| BOOLEAN | Boolean | java.lang.Boolean | Yes |
| BINARY / VARBINARY / BYTES | Bytes | byte[] | No |
| TINYINT / SMALLINT / INT | Integer | java.lang.Integer | Yes |
| BIGINT | Long | java.lang.Long | Yes |
| FLOAT | Float | java.lang.Float | Yes |
| DOUBLE | Double | java.lang.Double | Yes |
| DATE / TIME | Date | java.util.Date | Yes |
| TIMESTAMP | Timestamp | java.sql.Timestamp | Yes |
| DECIMAL | Not supported | ||
| ARRAY | Not supported | ||
| MAP / MULTISET | Not supported | ||
| Row | Not supported |
Spatial Data Types
所有空间数据类型在Glink SQL中均由WKT/WKB格式的STRING/ARRAY类型表示.
| GeoMesa Type | Java Type | Indexable |
|---|---|---|
| Point | org.locationtech.jts.geom.Point | Yes |
| LineString | org.locationtech.jts.geom.LineString | Yes |
| Polygon | org.locationtech.jts.geom.Polygon | Yes |
| MultiPoint | org.locationtech.jts.geom.MultiPoint | Yes |
| MultiLineString | org.locationtech.jts.geom.MultiLineString | Yes |
| MultiPolygon | org.locationtech.jts.geom.MultiPolygon | Yes |
| GeometryCollection | org.locationtech.jts.geom.GeometryCollection | Yes |
| Geometry | org.locationtech.jts.geom.Geometry | Yes |