Elasticsearch SQL 连接器
Elasticsearch SQL 连接器
FLINK-1.14
Elasticsearch 连接器允许将数据写入到 Elasticsearch 引擎的索引中。本文档描述运行 SQL 查询时如何设置 Elasticsearch 连接器。
连接器可以工作在 upsert 模式,使用 DDL 中定义的主键与外部系统交换 UPDATE/DELETE 消息。
如果 DDL 中没有定义主键,那么连接器只能工作在 append 模式,只能与外部系统交换 INSERT 消息。
如何创建 Elasticsearch 表
以下示例展示了如何创建 Elasticsearch sink 表:
不带认证
CREATE TABLE myUserTable (
user_id STRING,
user_name STRING
uv BIGINT,
pv BIGINT,
PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
'connector' = 'elasticsearch',
'version' = '7'
'hosts' = 'http://localhost:9200',
'index' = 'users'
);
带认证
CREATE TABLE es_sink_table (
testInt INT,
testVarchar VARCHAR
) WITH (
'connector' = 'elasticsearch',
'version' = '7',
'hosts' = 'http://localhost:29200',
'index' = 'users',
'format' = 'json',
'username' = 'elastic',
'password' = 'xxx'
);
参数配置
配置名称 | 是否必填 | 配置生效类型 | 参数值字段类型 | 参数默认值 | 参数说明(用于用户手册) | |
---|---|---|---|---|---|---|
connector | 必填 | 目标表 | String | - | 指定要使用的连接器,有效值为: <span md-inline="code" spellcheck="false" class="md-pair-s" style="box-sizing: border-box;">elasticsearch-6</span> :连接到 Elasticsearch 6.x 的集群。 <span md-inline="code" spellcheck="false" class="md-pair-s" style="box-sizing: border-box;">elasticsearch-7</span> :连接到 Elasticsearch 7.x 及更高版本的集群。 |
|
hosts | 必填 | 目标表 | String | - | 要连接到的一台或多台 Elasticsearch 主机,例如 'http://host_name:9092;http://host_name:9093'。 | |
password | 可选 | 目标表 | String | - | 用于连接 Elasticsearch 实例的密码。 | |
username | 可选 | 目标表 | String | - | 用于连接 Elasticsearch 实例的用户名。 | |
index | 必填 | 目标表 | String | - | Elasticsearch 中每条记录的索引。可以是一个静态索引(例如 'myIndex')或一个动态索引(例如 'index-{log_ts\ | yyyy-MM-dd}')。 |
document-type | 对es5/es6必填,es7不必填 | 目标表 | String | - | Elasticsearch 文档类型。在 elasticsearch-7 中不再需要。 | |
document-id.key-delimiter | 可选 | 目标表 | String | _ | 复合键的分隔符(默认为"_") | |
failure-handler | 可选 | 目标表 | String | fail | 对 Elasticsearch 请求失败情况下的失败处理策略。有效策略为: <span md-inline="code" spellcheck="false" class="md-pair-s" style="box-sizing: border-box;">fail</span> :如果请求失败并因此导致作业失败,则抛出异常。 <span md-inline="code" spellcheck="false" class="md-pair-s" style="box-sizing: border-box;">ignore</span> :忽略失败并放弃请求。 <span md-inline="code" spellcheck="false" class="md-pair-s" style="box-sizing: border-box;">retry-rejected</span> :重新添加由于队列容量饱和而失败的请求。 自定义类名称<span md-inline="code" spellcheck="false" class="md-pair-s" style="box-sizing: border-box;">class name</span> :使用 ActionRequestFailureHandler 的子类进行失败处理。 |
|
sink.flush-on-checkpoint | 可选 | 目标表 | Boolean | true | 在进行 checkpoint 时是否保证刷出缓冲区中的数据。 | |
sink.bulk-flush.max-size | 可选 | 目标表 | MemorySize | 2mb | 每个批量请求的缓冲操作在内存中的最大值。单位必须为 MB。 可以设置为'0'来禁用它。 | |
sink.bulk-flush.max-actions | 可选 | 目标表 | Integer | 1000 | 每个批量请求的最大缓冲操作数。 可以设置为'0'来禁用它。 | |
sink.bulk-flush.interval | 可选 | 目标表 | Duration | 1s | flush 缓冲操作的间隔。 可以设置为'0'来禁用它。注意,'sink.bulk-flush.max-size'和'sink.bulk-flush.max-actions'都设置为'0'的这种 flush 间隔设置允许对缓冲操作进行完全异步处理。 | |
sink.bulk-flush.backoff.strategy | 可选 | 目标表 | String | DISABLED | 指定在由于临时请求错误导致任何 flush 操作失败时如何执行重试。有效策略为: <span md-inline="code" spellcheck="false" class="md-pair-s" style="box-sizing: border-box;">DISABLED</span> :不执行重试,即第一次请求错误后失败。 <span md-inline="code" spellcheck="false" class="md-pair-s" style="box-sizing: border-box;">CONSTANT</span> :等待重试之间的回退延迟。 <span md-inline="code" spellcheck="false" class="md-pair-s" style="box-sizing: border-box;">EXPONENTIAL</span> :先等待回退延迟,然后在重试之间指数递增。 |
|
sink.bulk-flush.backoff.max-retries | 可选 | 目标表 | Integer | - | 最大回退重试次数。 | |
sink.bulk-flush.backoff.delay | 可选 | 目标表 | Duration | - | 每次回退重试之间的延迟。对于 CONSTANT 退避策略,该值是每次重试之间的延迟。对于 EXPONENTIAL 退避策略,该值是初始的延迟。 | |
connection.max-retry-timeout | 可选 | 目标表 | Duration | - | 每次重试之间的延迟。 | |
connection.path-prefix | 可选 | 目标表 | String | - | 添加到每个 REST 通信中的前缀字符串,例如,'/v1'。 | |
format | 可选 | 目标表 | String | json | Elasticsearch 连接器支持指定格式。该格式必须生成一个有效的 json 文档。 默认使用内置的 'json' 格式。 | |
sink.parallelism | 可选 | 目标表 | String | - | sink算子并行度。 | |
version | 可选 | 目标表 | String | 6 | sloth语法检查专用配置 |
特性
Key 处理
Elasticsearch sink 可以根据是否定义了主键来确定是在 upsert 模式还是 append 模式下工作。 如果定义了主键,Elasticsearch sink 将以 upsert 模式工作,该模式可以消费包含 UPDATE/DELETE 消息的查询。 如果未定义主键,Elasticsearch sink 将以 append 模式工作,该模式只能消费包含 INSERT 消息的查询。
在 Elasticsearch 连接器中,主键用于计算 Elasticsearch 的文档 id,文档 id 为最多 512 字节且不包含空格的字符串。
Elasticsearch 连接器通过使用 document-id.key-delimiter
指定的键分隔符按照 DDL 中定义的顺序连接所有主键字段,为每一行记录生成一个文档 ID 字符串。
某些类型不允许作为主键字段,因为它们没有对应的字符串表示形式,例如,BYTES
,ROW
,ARRAY
,MAP
等。
如果未指定主键,Elasticsearch 将自动生成文档 id。
有关 PRIMARY KEY 语法的更多详细信息,请参见 [CREATE TABLE DDL]($flink-docs-connectors-{{< ref "docs/dev/table/sql/create" >}}#create-table)。
动态索引
Elasticsearch sink 同时支持静态索引和动态索引。
如果你想使用静态索引,则 index
选项值应为纯字符串,例如 'myusers'
,所有记录都将被写入到 "myusers" 索引中。
如果你想使用动态索引,你可以使用 {field_name}
来引用记录中的字段值来动态生成目标索引。
你也可以使用 '{field_name|date_format_string}'
将 TIMESTAMP/DATE/TIME
类型的字段值转换为 date_format_string
指定的格式。
date_format_string
与 Java 的 DateTimeFormatter 兼容。
例如,如果选项值设置为 'myusers-{log_ts|yyyy-MM-dd}'
,则 log_ts
字段值为 2020-03-27 12:25:55
的记录将被写入到 "myusers-2020-03-27" 索引中。
数据类型映射
Elasticsearch 将文档存储在 JSON 字符串中。因此数据类型映射介于 Flink 数据类型和 JSON 数据类型之间。
Flink 为 Elasticsearch 连接器使用内置的 'json'
格式。更多类型映射的详细信息,请参阅JSON Format
Flink-1.12
Elasticsearch 连接器允许将数据写入到 Elasticsearch 引擎的索引中。本文档描述运行 SQL 查询时如何设置 Elasticsearch 连接器。
连接器可以工作在 upsert 模式,使用 DDL 中定义的主键与外部系统交换 UPDATE/DELETE 消息。
如果 DDL 中没有定义主键,那么连接器只能工作在 append 模式,只能与外部系统交换 INSERT 消息。
如何创建 Elasticsearch 表
以下示例展示了如何创建 Elasticsearch sink 表:
不带认证
CREATE TABLE myUserTable (
user_id STRING,
user_name STRING
uv BIGINT,
pv BIGINT,
PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
'connector' = 'elasticsearch',
'version' = '7'
'hosts' = 'http://localhost:9200',
'index' = 'users'
);
带认证
CREATE TABLE es_sink_table (
testInt INT,
testVarchar VARCHAR
) WITH (
'connector' = 'elasticsearch',
'version' = '7',
'hosts' = 'http://localhost:29200',
'index' = 'users',
'format' = 'json',
'username' = 'elastic',
'password' = 'xxx'
);
维表
支持将es作为维表使用
create table t1 (testInt int, c as testInt%2, proc as proctime()) with ('connector'='datagen');
CREATE TABLE dim (
testInt INT,
testVarchar varchar
) WITH (
'connector' = 'elasticsearch',
'version' = '7',
'hosts' = 'http://ip:29200',
'index' = 'users',
'document-type' = '_doc',
'format' = 'json',
'username' = 'elastic',
'password' = '加密内容1',
'lookup.cache.max-rows' = '100',
'lookup.cache.ttl' = '1200s',
'lookup.max-retries' = '3'
);
create table t2
(testInt int,
testVarchar varchar)
with ('connector'='print');
INSERT INTO t2
SELECT
t1.c as testInt,
d.testVarchar
FROM t1 left join dim for system_time as of t1.proc as d on t1.c = d.testInt;
连接器参数 ---
| 参数| 是否必选 | 默认值 | 数据类型 | 描述
| --- | ---- | --- | ---- | --- || connector | 必选 | (none) | String | 指定要使用的连接器,有效值为:elasticsearch-6
:连接到 Elasticsearch 6.x 的集群。elasticsearch-7
:连接到 Elasticsearch 7.x 及更高版本的集群。
'http://host_name:9092;http://host_name:9093'
。 | index | 必选 | (none) | String | Elasticsearch 中每条记录的索引。可以是一个静态索引(例如 'myIndex'
)或一个动态索引(例如 'index-{logts|yyyy-MM-dd}'
)。 更多详细信息,请参见下面的动态索引部分。 | document-type | 6.x 版本中必选 | (none) | String | Elasticsearch 文档类型。在 elasticsearch-7
中不再需要。 | document-id.key-delimiter | 可选 | \ | String | 复合键的分隔符(默认为"_"),例如,指定为"$"将导致文档 ID 为"KEY1$KEY2$KEY3"。 | username | 可选 | (none) | String | 用于连接 Elasticsearch 实例的用户名。请注意,Elasticsearch 没有预绑定安全特性,但你可以通过如下指南启用它来保护 Elasticsearch 集群。 | password | 可选 | (none) | String | 用于连接 Elasticsearch 实例的密码。如果配置了username
,则此选项也必须配置为非空字符串。 | failure-handler | 可选 | fail | String | 对 Elasticsearch 请求失败情况下的失败处理策略。有效策略为:fail
:如果请求失败并因此导致作业失败,则抛出异常。ignore
:忽略失败并放弃请求。retry-rejected
:重新添加由于队列容量饱和而失败的请求。- 自定义类名称:使用 ActionRequestFailureHandler 的子类进行失败处理。
'0'
来禁用它。 | sink.bulk-flush.max-size | 可选 | 2mb | MemorySize | 每个批量请求的缓冲操作在内存中的最大值。单位必须为 MB。 可以设置为'0'
来禁用它。 | sink.bulk-flush.interval | 可选 | 1s | Duration | flush 缓冲操作的间隔。 可以设置为'0'
来禁用它。注意,'sink.bulk-flush.max-size'
和'sink.bulk-flush.max-actions'
都设置为'0'
的这种 flush 间隔设置允许对缓冲操作进行完全异步处理。 | sink.bulk-flush.backoff.strategy | 可选 | DISABLED | String | 指定在由于临时请求错误导致任何 flush 操作失败时如何执行重试。有效策略为:DISABLED
:不执行重试,即第一次请求错误后失败。CONSTANT
:等待重试之间的回退延迟。EXPONENTIAL
:先等待回退延迟,然后在重试之间指数递增。
CONSTANT
回退策略,该值是每次重试之间的延迟。对于 EXPONENTIAL
回退策略,该值是初始的延迟。 | connection.max-retry-timeout | 可选 | (none) | Duration | 最大重试超时时间。 | connection.path-prefix | 可选 | (none) | String | 添加到每个 REST 通信中的前缀字符串,例如,'/v1'
。 | format | 可选 | json | String | Elasticsearch 连接器支持指定格式。该格式必须生成一个有效的 json 文档。 默认使用内置的 'json'
格式。更多详细信息,请参阅 }}">JSON Format 页面。 特性
Key 处理
Elasticsearch sink 可以根据是否定义了主键来确定是在 upsert 模式还是 append 模式下工作。 如果定义了主键,Elasticsearch sink 将以 upsert 模式工作,该模式可以消费包含 UPDATE/DELETE 消息的查询。 如果未定义主键,Elasticsearch sink 将以 append 模式工作,该模式只能消费包含 INSERT 消息的查询。
在 Elasticsearch 连接器中,主键用于计算 Elasticsearch 的文档 id,文档 id 为最多 512 字节且不包含空格的字符串。
Elasticsearch 连接器通过使用 document-id.key-delimiter
指定的键分隔符按照 DDL 中定义的顺序连接所有主键字段,为每一行记录生成一个文档 ID 字符串。
某些类型不允许作为主键字段,因为它们没有对应的字符串表示形式,例如,BYTES
,ROW
,ARRAY
,MAP
等。
如果未指定主键,Elasticsearch 将自动生成文档 id。
有关 PRIMARY KEY 语法的更多详细信息,请参见 [CREATE TABLE DDL]($flink-docs-connectors-{{< ref "docs/dev/table/sql/create" >}}#create-table)。
动态索引
Elasticsearch sink 同时支持静态索引和动态索引。
如果你想使用静态索引,则 index
选项值应为纯字符串,例如 'myusers'
,所有记录都将被写入到 "myusers" 索引中。
如果你想使用动态索引,你可以使用 {field_name}
来引用记录中的字段值来动态生成目标索引。
你也可以使用 '{field_name|date_format_string}'
将 TIMESTAMP/DATE/TIME
类型的字段值转换为 date_format_string
指定的格式。
date_format_string
与 Java 的 DateTimeFormatter 兼容。
例如,如果选项值设置为 'myusers-{log_ts|yyyy-MM-dd}'
,则 log_ts
字段值为 2020-03-27 12:25:55
的记录将被写入到 "myusers-2020-03-27" 索引中。
数据类型映射
Elasticsearch 将文档存储在 JSON 字符串中。因此数据类型映射介于 Flink 数据类型和 JSON 数据类型之间。
Flink 为 Elasticsearch 连接器使用内置的 'json'
格式。更多类型映射的详细信息,请参阅JSON Format
Flink-1.10
ES 5 Sink
简述
Easystream 支持输出到 ES 5。
示例
CREATE TABLE MyUserTable (
...
) WITH (
'connector.type' = 'elasticsearch',
'connector.version' = '5',
'connector.hosts' = 'http://host_name:9092;http://host_name:9093',
'connector.cluster.name' = 'escluster',
'connector.index' = 'MyUsers',
'connector.document-type' = 'user',
'update-mode' = 'append',
'connector.key-delimiter' = '$',
'connector.key-null-literal' = 'n/a',
'connector.failure-handler' = '...',
'connector.flush-on-checkpoint' = 'true',
'connector.bulk-flush.max-actions' = '42',
'connector.bulk-flush.max-size' = '42 mb',
'connector.bulk-flush.interval' = '60000',
'connector.bulk-flush.back-off.type' = '...',
'connector.bulk-flush.back-off.max-retries' = '3',
'connector.bulk-flush.back-off.delay' = '30000',
'connector.connection-max-retry-timeout' = '3',
'connector.connection-path-prefix' = '/',
'format.type' = 'json',
'format.derive-schema' = 'true'
)
With 参数
参数 | 注释说明 | 备注 |
---|---|---|
connector.type | 维表类型 | 必填:elasticsearch |
connector.version | elasticsearch 版本 | 必填:5 |
connector.cluster.name | elasticsearch 集群名称 | 必填 |
connector.hosts | elaticseach transport 链接地址(可以多个,其中 protocols 不生效) | 必填:http://host_name:9092; http://host_name:9093 |
connector.index | elasticsearch 索引 | 必填 |
connector.document-type | elasticsearch 文档类型 | 必填 |
update-mode | 更新模式 (append/upsert) | 必填,默认 append |
connector.key-delimiter | key 分隔符 | 选填,默认‘_’ |
connector.key-null-literal | key 为空的时候的文字 | 选填,默认 null |
connector.failure-handler | failure 处理方式 | 选填,默认 fail |
connector.flush-on-checkpoint | checkpoint 的时候是否刷数据 | 选填,默认 true |
connector.bulk-flush.max-actions | 批量写入时的最大写入条数 | 选填 |
connector.bulk-flush.max-size | 批量写入时的最大数据量 | 选填 |
connector.bulk-flush.interval | 批量写入的时间间隔,配置后则会按照该时间间隔严格执行,无视上面的两个批量写入配置 | 选填 |
connector.bulk-flush.backoff.type | 重试策略,CONSTANT 常数型(表示多次重试之间的时间间隔为固定常数) | 选填,默认 disabled |
connector.bulk-flush.backoff.max-retries | 失败重试的次数 | 选填 |
connector.bulk-flush.backoff.delay | 进行重试的时间间隔 | 选填 |
connector.connection-max-retry-timeout | 链接超时时间(ms) | 选填 |
connector.connection-path-prefix | REST 请求的前缀(可选),需要和 es 集群上的配置对映 | 选填 |
connector.format.type | 数据格式,参数 format json | 必填:json |
ES 6/7 Sink
简述
Easystream 支持输出到 ES 6 或 7。
示例
CREATE TABLE MyUserTable (
...
) WITH (
'connector.type' = 'elasticsearch',
'connector.version' = '6',
'connector.hosts.0.hostname' = 'host_name',
'connector.hosts.0.protocol' = 'http',
'connector.index' = 'MyUsers',
'connector.document-type' = 'user',
'update-mode' = 'append',
'connector.key-delimiter' = '$',
'connector.key-null-literal' = 'n/a',
'connector.failure-handler' = '...',
'connector.flush-on-checkpoint' = 'true',
'connector.bulk-flush.max-actions' = '42',
'connector.bulk-flush.max-size' = '42 mb',
'connector.bulk-flush.interval' = '60000',
'connector.bulk-flush.back-off.type' = '...',
'connector.bulk-flush.back-off.max-retries' = '3',
'connector.bulk-flush.back-off.delay' = '30000',
'connector.connection-max-retry-timeout' = '3',
'connector.connection-path-prefix' = '/',
'format.type' = 'json',
'format.derive-schema' = 'true'
)
With 参数
参数 | 注释说明 | 备注 |
---|---|---|
connector.type | 维表类型 | 必填:elasticsearch |
connector.version | elasticsearch 版本 | 必填:6/7 |
connector.hosts | elaticseach transport 链接地址(可以多个,其中 protocols 不生效) | 必填:http://host_name:9092; http://host_name:9093 |
connector.index | elasticsearch 索引 | 必填 |
connector.document-type | elasticsearch 文档类型 | 必填 |
update-mode | 更新模式 (append/upsert) | 必填,默认 append |
connector.key-delimiter | key 分隔符 | 选填,默认‘_’ |
connector.key-null-literal | key 为空的时候的文字 | 选填,默认 null |
connector.failure-handler | failure 处理方式 | 选填,默认 fail |
connector.flush-on-checkpoint | checkpoint 的时候是否刷数据 | 选填,默认 true |
connector.bulk-flush.max-actions | 批量写入时的最大写入条数 | 选填 |
connector.bulk-flush.max-size | 批量写入时的最大数据量 | 选填 |
connector.bulk-flush.interval | 批量写入的时间间隔,配置后则会按照该时间间隔严格执行,无视上面的两个批量写入配置 | 选填 |
connector.bulk-flush.backoff.type | 重试策略,CONSTANT 常数型(表示多次重试之间的时间间隔为固定常数) | 选填,默认 disabled |
connector.bulk-flush.backoff.max-retries | 失败重试的次数 | 选填 |
connector.bulk-flush.backoff.delay | 进行重试的时间间隔 | 选填 |
connector.connection-max-retry-timeout | 链接超时时间(ms) | 选填 |
connector.connection-path-prefix | REST 请求的前缀(可选),需要和 es 集群上的配置对映 | 选填 |
connector.format.type | 数据格式,参数 format json | 必填:json |
以上内容对您是否有帮助?