Glink
GitHub Toggle Dark/Light/Auto mode Toggle Dark/Light/Auto mode Toggle Dark/Light/Auto mode Back to homepage

GeoMesa SQL Connector

Glink对Flink的SQL Connector进行了扩展, 实现了GeoMesa SQL Connector. 这里介绍了如何设置和使用GeoMesa SQL Connector. 我们在GeoMesa 3.1+版本上进行了测试.

Dependencies

为了使用GeoMesa SQL Connector, 需要添加如下依赖.

Glink dependency
glink-connector-geomesa-x.x.x.jar
glink-sql-x.x.x.jar

最简单的方法是将$GLINK_HOME/lib目录中的上述两个Jar包复制到$FLINK_HOME/lib目录下.

DDL

使用GeoMesa SQL Connector时, 在DDL中有几点需要特别注意, 这里进行详细说明.

DDL中定义空间数据类型

由于Flink当前无法支持可注册的自定义类型, 因此我们无法在DDL中直接定义空间数据类型. 在GeoMesa SQL Connector中可以WKT/WKB形式表示空间数据类型, 并在WITH参数中用geomesa.spatial.fields指明, 其格式为: <field name>:<field type>, 多个字段由”,“分隔. 其中<field type>支持Spatial Data Types中的所有GeoMesa Type.

如下DDL定义了GeoMesa中的一个表, 用于存储T-Drive数据, 其中point2字段为WKT格式的STRING类型, 并且在WITH参数中将其指定为Point空间类型. 这样, 如果将其作为source, 那么GeoMesa SQL Connector从GeoMesa取出数据时会将point2字段的数据类型从Point转化为WKT格式的STRING; 如果将其作为sink, 那么GeoMesa SQL Connector将数据写入GeoMesa时会将WKT格式的STRING转化为Point.

CREATE TABLE GeoMesa_TDrive (
    `pid` STRING,
    `time` TIMESTAMP(0),
    `point2` STRING,
    PRIMARY KEY (pid) NOT ENFORCED
) WITH (
    'connector' = 'geomesa',
    'geomesa.data.store' = 'hbase',
    'geomesa.schema.name' = 'geomesa-test',
    'geomesa.spatial.fields' = 'point2:Point',
    'hbase.catalog' = 'test-sql'
);

DDL中指定空间连接谓词

目前Flink仅支持等值连接, 因此在Glink中使用GeoMesa进行lookup join时, 我们必须为每个连接指定其空间关系. 我们通过在WITH参数中增加geomesa.temporal.join.predict选项来实现. geomesa.temporal.join.predict目前支持以下选项:

  • R:<distance>:表示维度几何与流几何距离小于distance米即符合空间连接条件;
  • I: 表示流几何与维度几何相交即符合空间连接条件;
  • +C: 表示流几何包含维度几何即符合空间连接条件;
  • -C: 表示维度几何包含流几何即符合空间连接条件.

以下是一个使用GeoMesa SQL Connector进行lookup join的案例, 它读取CSV文件中的点数据, 将其与GeoMesa中的多边形数据进行空间连接, 如果点被包含在某个多边形中, 则将二者进行连接. 注意'geomesa.temporal.join.predict' = 'I'这一行, 它指定了空间连接的类型为包含(对于多边形与点之间的空间关系, 包含和相交是一样的), 这样在DQL的连接条件ON ST_AsText(ST_Point(A.lng, A.lat)) = B.geom中, Glink就会将=理解为两侧的几何类型相交.

CREATE TABLE csv_point (
    id STRING,
    dtg TIMESTAMP(0),
    lng DOUBLE,
    lat DOUBLE,
    proctime AS PROCTIME())
WITH (
    'connector' = 'filesystem',
    'path' = '/path/to/csv', 
    'format' = 'csv'
);

CREATE TABLE GeoMesa_Area (
    id STRING,
    dtg TIMESTAMP(0),
    geom STRING,
    PRIMARY KEY (id) NOT ENFORCED)
WITH (
    'connector' = 'geomesa',
    'geomesa.data.store' = 'hbase',
    'geomesa.schema.name' = 'restricted_area',
    'geomesa.spatial.fields' = 'geom:Polygon',
    'geomesa.temporal.join.predict' = 'I',
    'hbase.zookeepers' = 'localhost:2181',
    'hbase.catalog' = 'restricted_area'
);

SELECT A.id AS point_id, A.dtg, ST_AsText(ST_Point(A.lng, A.lat)) AS point, B.id AS area_id
    FROM csv_point AS A
    LEFT JOIN GeoMesa_Area FOR SYSTEM_TIME AS OF A.proctime AS B
    ON ST_AsText(ST_Point(A.lng, A.lat)) = B.geom;

How to use GeoMesa table

上述案例中已经使用过GeoMesa表, 这里再用一个完整的例子进一步阐述. 在这个例子中, 我们实现的功能是使用Glink将CSV中的点数据导入到GeoMesa中, 它几乎是一个完整的空间数据ETL案例了.

首先创建CSV source table.

CREATE TABLE CSV_TDrive (
    `pid` STRING,
    `time` TIMESTAMP(0),
    `lng` DOUBLE,
    `lat` DOUBLE
) WITH (
    'connector' = 'filesystem',
    'path' = '/path/to/csv',
    'format' = 'csv'
);

然后创建GeoMesa sink table.

CREATE TABLE Geomesa_TDrive (
    `pid` STRING,
    `time` TIMESTAMP(0),
    `point2` STRING,
    PRIMARY KEY (pid) NOT ENFORCED
) WITH (
    'connector' = 'geomesa',
    'geomesa.data.store' = 'hbase',
    'geomesa.schema.name' = 'geomesa-test',
    'geomesa.spatial.fields' = 'point2:Point',
    'hbase.catalog' = 'test-sql'
);

最后执行以下DML语句即可执行ETL作业.

INSERT INTO Geomesa_TDrive 
    SELECT `pid`, `time`, ST_AsText(ST_Point(lng, lat)) FROM CSV_TDrive;

Connector Options

GeoMesa SQL Connector支持通过WITH参数的方式对GeoMesa客户端的相关参数进行配置. 其中有些参数是引擎无关的, 有些参数是不同后端存储引擎的可选配置, 具体如下.

GeoMesa

以下配置是不受GeoMesa后端存储引擎所影响的.

Option Required Description
connector required 连接器类型, 对于GeoMesa SQL Connector而言固定为geomesa
geomesa.data.store required GeoMesa Data Store类型, 目前支持hbase
geomesa.schema.name required GeoMesa Schema名称
geomesa.spatial.fields optional 空间类型字段, 当包含空间字段时必须指定, 否则空间类型将无法正确解析, 格式: :, 多个字段间由”,“分隔
geomesa.temporal.join.predict optional 指定lookup join的空间关系谓词, 符合关系的记录将被join:
R:表示维表中空间对象与流表中空间对象距离小于distance米;
I表示流表中空间对象与维表中空间对象相交;
+C表示流表中空间对象包含维表中空间对象;
-C表示维表中空间对象包含流表中空间对象.

HBase Data Store

以下参数是GeoMesa以HBase作为后端存储引擎时可选的配置参数. GeoMesa SQL Connector支持GeoMesa HBase Data Store的所有配置参数, 关于各个参数的具体含义, 参见Geomesa文档.

Option Required
hbase.catalog optional
hbase.zookeepers optional
hbase.coprocessor.url optional
hbase.config.paths optional
hbase.config.xml optional
hbase.connections.reuse optional
hbase.remote.filtering optional
hbase.security.enabled optional
hbase.coprocessor.threads optional
hbase.ranges.max-per-extended-scan optional
hbase.ranges.max-per-coprocessor-scan optional
hbase.coprocessor.arrow.enable optional
hbase.coprocessor.bin.enable optional
hbase.coprocessor.density.enable optional
hbase.coprocessor.stats.enable optional
hbase.coprocessor.yield.partial.results optional
hbase.coprocessor.scan.parallel optional
注意: 当geomesa.data.store为hbase时必须指定hbase.catalog

Data Type Mapping

Flink SQL的数据类型并不与GeoMesa完全兼容. 对于基础数据类型而言GeoMesa SQL Connector进行了最大程度的适配; 对于空间数据类型, 由于Flink目前尚未支持可注册的结构化类型, 因此在Flink SQL中所有空间数据类型均由WKT/WKB格式的SRING/BINARY类型表示, 且必须使用geomesa.spatial.fields这一WITH参数指定具体类型, Geomesa SQL Connector在写入时会使用org.locationtech.jts.io.WKTReader/org.locationtech.jts.io.WKBReader进行转换. 详细的数据类型对应关系如下. Flink SQL数据类型参见Flink文档. GeoMesa数据类型参见GeoMesa文档.

Basic Data Types

Flink SQL Type GeoMesa Type Java Type Indexable
CHAR / VARCHAR / STRING String java.lang.String Yes
BOOLEAN Boolean java.lang.Boolean Yes
BINARY / VARBINARY / BYTES Bytes byte[] No
TINYINT / SMALLINT / INT Integer java.lang.Integer Yes
BIGINT Long java.lang.Long Yes
FLOAT Float java.lang.Float Yes
DOUBLE Double java.lang.Double Yes
DATE / TIME Date java.util.Date Yes
TIMESTAMP Timestamp java.sql.Timestamp Yes
DECIMAL Not supported
ARRAY Not supported
MAP / MULTISET Not supported
Row Not supported

Spatial Data Types

所有空间数据类型在Glink SQL中均由WKT/WKB格式的STRING/ARRAY类型表示.

GeoMesa Type Java Type Indexable
Point org.locationtech.jts.geom.Point Yes
LineString org.locationtech.jts.geom.LineString Yes
Polygon org.locationtech.jts.geom.Polygon Yes
MultiPoint org.locationtech.jts.geom.MultiPoint Yes
MultiLineString org.locationtech.jts.geom.MultiLineString Yes
MultiPolygon org.locationtech.jts.geom.MultiPolygon Yes
GeometryCollection org.locationtech.jts.geom.GeometryCollection Yes
Geometry org.locationtech.jts.geom.Geometry Yes