Elasticsearch SQL 连接器
Elasticsearch SQL 连接器
FLINK-1.14
Elasticsearch 连接器允许将数据写入到 Elasticsearch 引擎的索引中。本文档描述运行 SQL 查询时如何设置 Elasticsearch 连接器。
连接器可以工作在 upsert 模式,使用 DDL 中定义的主键与外部系统交换 UPDATE/DELETE 消息。
如果 DDL 中没有定义主键,那么连接器只能工作在 append 模式,只能与外部系统交换 INSERT 消息。
如何创建 Elasticsearch 表
以下示例展示了如何创建 Elasticsearch sink 表:
不带认证
CREATE TABLE myUserTable (
user_id STRING,
user_name STRING
uv BIGINT,
pv BIGINT,
PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
'connector' = 'elasticsearch',
'version' = '7'
'hosts' = 'http://localhost:9200',
'index' = 'users'
);
带认证
CREATE TABLE es_sink_table (
testInt INT,
testVarchar VARCHAR
) WITH (
'connector' = 'elasticsearch',
'version' = '7',
'hosts' = 'http://localhost:29200',
'index' = 'users',
'format' = 'json',
'username' = 'elastic',
'password' = 'xxx'
);
连接器参数
参数 | 是否必选 | 默认值 | 数据类型 | 描述 |
---|---|---|---|---|
connector |
必选 | (none) | String | 指定要使用的连接器,有效值为:
|
version |
必选 | (none) | String | 指定要使用的连接器版本,有效值为:5,6,7 |
hosts |
必选 | (none) | String | 要连接到的一台或多台 Elasticsearch 主机,例如 'http://host_name:9092;http://host_name:9093' 。 |
index |
必选 | (none) | String | Elasticsearch 中每条记录的索引。可以是一个静态索引(例如 'myIndex' )或一个动态索引(例如 'index-{logts|yyyy-MM-dd}' )。
更多详细信息,请参见下面的动态索引部分。 |
document-type |
6.x 版本中必选 | (none) | String | Elasticsearch 文档类型。在 elasticsearch-7 中不再需要。 |
document-id.key-delimiter |
可选 | String | 复合键的分隔符(默认为"_"),例如,指定为"$"将导致文档 ID 为"KEY1$KEY2$KEY3"。 | |
username |
可选 | (none) | String | 用于连接 Elasticsearch 实例的用户名。请注意,Elasticsearch 没有预绑定安全特性,但你可以通过如下指南启用它来保护 Elasticsearch 集群。 |
password |
可选 | (none) | String | 用于连接 Elasticsearch 实例的密码。如果配置了username ,则此选项也必须配置为非空字符串。 |
failure-handler |
可选 | fail | String | 对 Elasticsearch 请求失败情况下的失败处理策略。有效策略为:
|
sink.flush-on-checkpoint |
可选 | true | Boolean | 是否在 checkpoint 时执行 flush。禁用后,在 checkpoint 时 sink 将不会等待所有的 pending 请求被 Elasticsearch 确认。因此,sink 不会为请求的 at-least-once 交付提供任何有力保证。 |
sink.bulk-flush.max-actions |
可选 | 1000 | Integer | 每个批量请求的最大缓冲操作数。
可以设置为'0' 来禁用它。
|
sink.bulk-flush.max-size |
可选 | 2mb | MemorySize | 每个批量请求的缓冲操作在内存中的最大值。单位必须为 MB。
可以设置为'0' 来禁用它。
|
sink.bulk-flush.interval |
可选 | 1s | Duration | flush 缓冲操作的间隔。
可以设置为'0' 来禁用它。注意,'sink.bulk-flush.max-size' 和'sink.bulk-flush.max-actions' 都设置为'0' 的这种 flush 间隔设置允许对缓冲操作进行完全异步处理。
|
sink.bulk-flush.backoff.strategy |
可选 | DISABLED | String | 指定在由于临时请求错误导致任何 flush 操作失败时如何执行重试。有效策略为:
|
sink.bulk-flush.backoff.max-retries |
可选 | 8 | Integer | 最大回退重试次数。 |
sink.bulk-flush.backoff.delay |
可选 | 50ms | Duration | 每次回退尝试之间的延迟。对于 CONSTANT 回退策略,该值是每次重试之间的延迟。对于 EXPONENTIAL 回退策略,该值是初始的延迟。 |
connection.max-retry-timeout |
可选 | (none) | Duration | 最大重试超时时间。 |
connection.path-prefix |
可选 | (none) | String | 添加到每个 REST 通信中的前缀字符串,例如,'/v1' 。 |
format |
可选 | json | String | Elasticsearch 连接器支持指定格式。该格式必须生成一个有效的 json 文档。
默认使用内置的 'json' 格式。更多详细信息,请参阅 }}">JSON Format 页面。
|
特性
Key 处理
Elasticsearch sink 可以根据是否定义了主键来确定是在 upsert 模式还是 append 模式下工作。 如果定义了主键,Elasticsearch sink 将以 upsert 模式工作,该模式可以消费包含 UPDATE/DELETE 消息的查询。 如果未定义主键,Elasticsearch sink 将以 append 模式工作,该模式只能消费包含 INSERT 消息的查询。
在 Elasticsearch 连接器中,主键用于计算 Elasticsearch 的文档 id,文档 id 为最多 512 字节且不包含空格的字符串。
Elasticsearch 连接器通过使用 document-id.key-delimiter
指定的键分隔符按照 DDL 中定义的顺序连接所有主键字段,为每一行记录生成一个文档 ID 字符串。
某些类型不允许作为主键字段,因为它们没有对应的字符串表示形式,例如,BYTES
,ROW
,ARRAY
,MAP
等。
如果未指定主键,Elasticsearch 将自动生成文档 id。
有关 PRIMARY KEY 语法的更多详细信息,请参见 CREATE TABLE DDL。
动态索引
Elasticsearch sink 同时支持静态索引和动态索引。
如果你想使用静态索引,则 index
选项值应为纯字符串,例如 'myusers'
,所有记录都将被写入到 "myusers" 索引中。
如果你想使用动态索引,你可以使用 {field_name}
来引用记录中的字段值来动态生成目标索引。
你也可以使用 '{field_name|date_format_string}'
将 TIMESTAMP/DATE/TIME
类型的字段值转换为 date_format_string
指定的格式。
date_format_string
与 Java 的 DateTimeFormatter 兼容。
例如,如果选项值设置为 'myusers-{log_ts|yyyy-MM-dd}'
,则 log_ts
字段值为 2020-03-27 12:25:55
的记录将被写入到 "myusers-2020-03-27" 索引中。
数据类型映射
Elasticsearch 将文档存储在 JSON 字符串中。因此数据类型映射介于 Flink 数据类型和 JSON 数据类型之间。
Flink 为 Elasticsearch 连接器使用内置的 'json'
格式。更多类型映射的详细信息,请参阅JSON Format
Flink-1.12
Elasticsearch 连接器允许将数据写入到 Elasticsearch 引擎的索引中。本文档描述运行 SQL 查询时如何设置 Elasticsearch 连接器。
连接器可以工作在 upsert 模式,使用 DDL 中定义的主键与外部系统交换 UPDATE/DELETE 消息。
如果 DDL 中没有定义主键,那么连接器只能工作在 append 模式,只能与外部系统交换 INSERT 消息。
如何创建 Elasticsearch 表
以下示例展示了如何创建 Elasticsearch sink 表:
不带认证
CREATE TABLE myUserTable (
user_id STRING,
user_name STRING
uv BIGINT,
pv BIGINT,
PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
'connector' = 'elasticsearch',
'version' = '7'
'hosts' = 'http://localhost:9200',
'index' = 'users'
);
带认证
CREATE TABLE es_sink_table (
testInt INT,
testVarchar VARCHAR
) WITH (
'connector' = 'elasticsearch',
'version' = '7',
'hosts' = 'http://localhost:29200',
'index' = 'users',
'format' = 'json',
'username' = 'elastic',
'password' = 'xxx'
);
维表
支持将es作为维表使用
create table t1 (testInt int, c as testInt%2, proc as proctime()) with ('connector'='datagen');
CREATE TABLE dim (
testInt INT,
testVarchar varchar
) WITH (
'connector' = 'elasticsearch',
'version' = '7',
'hosts' = 'http://ip:29200',
'index' = 'users',
'document-type' = '_doc',
'format' = 'json',
'username' = 'elastic',
'password' = '加密内容1',
'lookup.cache.max-rows' = '100',
'lookup.cache.ttl' = '1200s',
'lookup.max-retries' = '3'
);
create table t2
(testInt int,
testVarchar varchar)
with ('connector'='print');
INSERT INTO t2
SELECT
t1.c as testInt,
d.testVarchar
FROM t1 left join dim for system_time as of t1.proc as d on t1.c = d.testInt;
连接器参数
参数 | 是否必选 | 默认值 | 数据类型 | 描述 |
---|---|---|---|---|
connector |
必选 | (none) | String | 指定要使用的连接器,有效值为:
|
version |
必选 | (none) | String | 指定要使用的连接器版本,有效值为:5,6,7 |
hosts |
必选 | (none) | String | 要连接到的一台或多台 Elasticsearch 主机,例如 'http://host_name:9092;http://host_name:9093' 。 |
index |
必选 | (none) | String | Elasticsearch 中每条记录的索引。可以是一个静态索引(例如 'myIndex' )或一个动态索引(例如 'index-{logts|yyyy-MM-dd}' )。
更多详细信息,请参见下面的动态索引部分。 |
document-type |
6.x 版本中必选 | (none) | String | Elasticsearch 文档类型。在 elasticsearch-7 中不再需要。 |
document-id.key-delimiter |
可选 | String | 复合键的分隔符(默认为"_"),例如,指定为"$"将导致文档 ID 为"KEY1$KEY2$KEY3"。 | |
username |
可选 | (none) | String | 用于连接 Elasticsearch 实例的用户名。请注意,Elasticsearch 没有预绑定安全特性,但你可以通过如下指南启用它来保护 Elasticsearch 集群。 |
password |
可选 | (none) | String | 用于连接 Elasticsearch 实例的密码。如果配置了username ,则此选项也必须配置为非空字符串。 |
failure-handler |
可选 | fail | String | 对 Elasticsearch 请求失败情况下的失败处理策略。有效策略为:
|
sink.flush-on-checkpoint |
可选 | true | Boolean | 是否在 checkpoint 时执行 flush。禁用后,在 checkpoint 时 sink 将不会等待所有的 pending 请求被 Elasticsearch 确认。因此,sink 不会为请求的 at-least-once 交付提供任何有力保证。 |
sink.bulk-flush.max-actions |
可选 | 1000 | Integer | 每个批量请求的最大缓冲操作数。
可以设置为'0' 来禁用它。
|
sink.bulk-flush.max-size |
可选 | 2mb | MemorySize | 每个批量请求的缓冲操作在内存中的最大值。单位必须为 MB。
可以设置为'0' 来禁用它。
|
sink.bulk-flush.interval |
可选 | 1s | Duration | flush 缓冲操作的间隔。
可以设置为'0' 来禁用它。注意,'sink.bulk-flush.max-size' 和'sink.bulk-flush.max-actions' 都设置为'0' 的这种 flush 间隔设置允许对缓冲操作进行完全异步处理。
|
sink.bulk-flush.backoff.strategy |
可选 | DISABLED | String | 指定在由于临时请求错误导致任何 flush 操作失败时如何执行重试。有效策略为:
|
sink.bulk-flush.backoff.max-retries |
可选 | 8 | Integer | 最大回退重试次数。 |
sink.bulk-flush.backoff.delay |
可选 | 50ms | Duration | 每次回退尝试之间的延迟。对于 CONSTANT 回退策略,该值是每次重试之间的延迟。对于 EXPONENTIAL 回退策略,该值是初始的延迟。 |
connection.max-retry-timeout |
可选 | (none) | Duration | 最大重试超时时间。 |
connection.path-prefix |
可选 | (none) | String | 添加到每个 REST 通信中的前缀字符串,例如,'/v1' 。 |
format |
可选 | json | String | Elasticsearch 连接器支持指定格式。该格式必须生成一个有效的 json 文档。
默认使用内置的 'json' 格式。更多详细信息,请参阅 }}">JSON Format 页面。
|
特性
Key 处理
Elasticsearch sink 可以根据是否定义了主键来确定是在 upsert 模式还是 append 模式下工作。 如果定义了主键,Elasticsearch sink 将以 upsert 模式工作,该模式可以消费包含 UPDATE/DELETE 消息的查询。 如果未定义主键,Elasticsearch sink 将以 append 模式工作,该模式只能消费包含 INSERT 消息的查询。
在 Elasticsearch 连接器中,主键用于计算 Elasticsearch 的文档 id,文档 id 为最多 512 字节且不包含空格的字符串。
Elasticsearch 连接器通过使用 document-id.key-delimiter
指定的键分隔符按照 DDL 中定义的顺序连接所有主键字段,为每一行记录生成一个文档 ID 字符串。
某些类型不允许作为主键字段,因为它们没有对应的字符串表示形式,例如,BYTES
,ROW
,ARRAY
,MAP
等。
如果未指定主键,Elasticsearch 将自动生成文档 id。
有关 PRIMARY KEY 语法的更多详细信息,请参见 CREATE TABLE DDL。
动态索引
Elasticsearch sink 同时支持静态索引和动态索引。
如果你想使用静态索引,则 index
选项值应为纯字符串,例如 'myusers'
,所有记录都将被写入到 "myusers" 索引中。
如果你想使用动态索引,你可以使用 {field_name}
来引用记录中的字段值来动态生成目标索引。
你也可以使用 '{field_name|date_format_string}'
将 TIMESTAMP/DATE/TIME
类型的字段值转换为 date_format_string
指定的格式。
date_format_string
与 Java 的 DateTimeFormatter 兼容。
例如,如果选项值设置为 'myusers-{log_ts|yyyy-MM-dd}'
,则 log_ts
字段值为 2020-03-27 12:25:55
的记录将被写入到 "myusers-2020-03-27" 索引中。
数据类型映射
Elasticsearch 将文档存储在 JSON 字符串中。因此数据类型映射介于 Flink 数据类型和 JSON 数据类型之间。
Flink 为 Elasticsearch 连接器使用内置的 'json'
格式。更多类型映射的详细信息,请参阅JSON Format
Flink-1.10
ES 5 Sink
简述
Easystream 支持输出到 ES 5。
示例
CREATE TABLE MyUserTable (
...
) WITH (
'connector.type' = 'elasticsearch',
'connector.version' = '5',
'connector.hosts' = 'http://host_name:9092;http://host_name:9093',
'connector.cluster.name' = 'escluster',
'connector.index' = 'MyUsers',
'connector.document-type' = 'user',
'update-mode' = 'append',
'connector.key-delimiter' = '$',
'connector.key-null-literal' = 'n/a',
'connector.failure-handler' = '...',
'connector.flush-on-checkpoint' = 'true',
'connector.bulk-flush.max-actions' = '42',
'connector.bulk-flush.max-size' = '42 mb',
'connector.bulk-flush.interval' = '60000',
'connector.bulk-flush.back-off.type' = '...',
'connector.bulk-flush.back-off.max-retries' = '3',
'connector.bulk-flush.back-off.delay' = '30000',
'connector.connection-max-retry-timeout' = '3',
'connector.connection-path-prefix' = '/',
'format.type' = 'json',
'format.derive-schema' = 'true'
)
With 参数
参数 | 注释说明 | 备注 |
---|---|---|
connector.type | 维表类型 | 必填:elasticsearch |
connector.version | elasticsearch 版本 | 必填:5 |
connector.cluster.name | elasticsearch 集群名称 | 必填 |
connector.hosts | elaticseach transport 链接地址(可以多个,其中 protocols 不生效) | 必填:http://host_name:9092; http://host_name:9093 |
connector.index | elasticsearch 索引 | 必填 |
connector.document-type | elasticsearch 文档类型 | 必填 |
update-mode | 更新模式 (append/upsert) | 必填,默认 append |
connector.key-delimiter | key 分隔符 | 选填,默认‘_’ |
connector.key-null-literal | key 为空的时候的文字 | 选填,默认 null |
connector.failure-handler | failure 处理方式 | 选填,默认 fail |
connector.flush-on-checkpoint | checkpoint 的时候是否刷数据 | 选填,默认 true |
connector.bulk-flush.max-actions | 批量写入时的最大写入条数 | 选填 |
connector.bulk-flush.max-size | 批量写入时的最大数据量 | 选填 |
connector.bulk-flush.interval | 批量写入的时间间隔,配置后则会按照该时间间隔严格执行,无视上面的两个批量写入配置 | 选填 |
connector.bulk-flush.backoff.type | 重试策略,CONSTANT 常数型(表示多次重试之间的时间间隔为固定常数) | 选填,默认 disabled |
connector.bulk-flush.backoff.max-retries | 失败重试的次数 | 选填 |
connector.bulk-flush.backoff.delay | 进行重试的时间间隔 | 选填 |
connector.connection-max-retry-timeout | 链接超时时间(ms) | 选填 |
connector.connection-path-prefix | REST 请求的前缀(可选),需要和 es 集群上的配置对映 | 选填 |
connector.format.type | 数据格式,参数 format json | 必填:json |
ES 6/7 Sink
简述
Easystream 支持输出到 ES 6 或 7。
示例
CREATE TABLE MyUserTable (
...
) WITH (
'connector.type' = 'elasticsearch',
'connector.version' = '6',
'connector.hosts.0.hostname' = 'host_name',
'connector.hosts.0.protocol' = 'http',
'connector.index' = 'MyUsers',
'connector.document-type' = 'user',
'update-mode' = 'append',
'connector.key-delimiter' = '$',
'connector.key-null-literal' = 'n/a',
'connector.failure-handler' = '...',
'connector.flush-on-checkpoint' = 'true',
'connector.bulk-flush.max-actions' = '42',
'connector.bulk-flush.max-size' = '42 mb',
'connector.bulk-flush.interval' = '60000',
'connector.bulk-flush.back-off.type' = '...',
'connector.bulk-flush.back-off.max-retries' = '3',
'connector.bulk-flush.back-off.delay' = '30000',
'connector.connection-max-retry-timeout' = '3',
'connector.connection-path-prefix' = '/',
'format.type' = 'json',
'format.derive-schema' = 'true'
)
With 参数
参数 | 注释说明 | 备注 |
---|---|---|
connector.type | 维表类型 | 必填:elasticsearch |
connector.version | elasticsearch 版本 | 必填:6/7 |
connector.hosts | elaticseach transport 链接地址(可以多个,其中 protocols 不生效) | 必填:http://host_name:9092; http://host_name:9093 |
connector.index | elasticsearch 索引 | 必填 |
connector.document-type | elasticsearch 文档类型 | 必填 |
update-mode | 更新模式 (append/upsert) | 必填,默认 append |
connector.key-delimiter | key 分隔符 | 选填,默认‘_’ |
connector.key-null-literal | key 为空的时候的文字 | 选填,默认 null |
connector.failure-handler | failure 处理方式 | 选填,默认 fail |
connector.flush-on-checkpoint | checkpoint 的时候是否刷数据 | 选填,默认 true |
connector.bulk-flush.max-actions | 批量写入时的最大写入条数 | 选填 |
connector.bulk-flush.max-size | 批量写入时的最大数据量 | 选填 |
connector.bulk-flush.interval | 批量写入的时间间隔,配置后则会按照该时间间隔严格执行,无视上面的两个批量写入配置 | 选填 |
connector.bulk-flush.backoff.type | 重试策略,CONSTANT 常数型(表示多次重试之间的时间间隔为固定常数) | 选填,默认 disabled |
connector.bulk-flush.backoff.max-retries | 失败重试的次数 | 选填 |
connector.bulk-flush.backoff.delay | 进行重试的时间间隔 | 选填 |
connector.connection-max-retry-timeout | 链接超时时间(ms) | 选填 |
connector.connection-path-prefix | REST 请求的前缀(可选),需要和 es 集群上的配置对映 | 选填 |
connector.format.type | 数据格式,参数 format json | 必填:json |
以上内容对您是否有帮助?