本文将为您介绍 EasyStream 中如何创建 ElasticSearch 类型流表。

操作步骤

  1. 在流表管理页面,单击目标逻辑库,表单页面左上角显示对应库名表示切换成功。
  2. 单击 创建表 按键,进行表创建。
  3. 选择 数据源类型ElasticSearch
  4. 完成剩余表单内容,创建 ElasticSearch 流表。

字段类型映射

Elasticsearch 将文档存储在 JSON 字符串中。因此数据类型映射介于 Flink 数据类型和 JSON 数据类型之间。 Flink 为 Elasticsearch 连接器使用内置的 json 格式,请参阅流表序列化及映射规则

流表配置

配置名称 是否必填 配置生效类型 参数值字段类型 参数默认值 参数说明(用于用户手册)
connector 必填 目标表 String - 指定要使用的连接器,有效值为: elasticsearch-6:连接到 Elasticsearch 6.x 的集群。 elasticsearch-7:连接到 Elasticsearch 7.x 及更高版本的集群。
hosts 必填 目标表 String - 要连接到的一台或多台 Elasticsearch 主机,例如 'http://host_name:9092;http://host_name:9093'。
password 可选 目标表 String - 用于连接 Elasticsearch 实例的密码。
username 可选 目标表 String - 用于连接 Elasticsearch 实例的用户名。
index 必填 目标表 String - Elasticsearch 中每条记录的索引。可以是一个静态索引(例如 'myIndex')或一个动态索引(例如 'index-{log_ts\ yyyy-MM-dd}')。
document-type 对es5/es6必填,es7不必填 目标表 String - Elasticsearch 文档类型。在 elasticsearch-7 中不再需要。
document-id.key-delimiter 可选 目标表 String _ 复合键的分隔符(默认为"_")
failure-handler 可选 目标表 String fail 对 Elasticsearch 请求失败情况下的失败处理策略。有效策略为: fail:如果请求失败并因此导致作业失败,则抛出异常。 ignore:忽略失败并放弃请求。 retry-rejected:重新添加由于队列容量饱和而失败的请求。 自定义类名称name:使用 ActionRequestFailureHandler 的子类进行失败处理。
sink.flush-on-checkpoint 可选 目标表 Boolean true 在进行 checkpoint 时是否保证刷出缓冲区中的数据。
sink.bulk-flush.max-size 可选 目标表 MemorySize 2mb 每个批量请求的缓冲操作在内存中的最大值。单位必须为 MB。 可以设置为'0'来禁用它。
sink.bulk-flush.max-actions 可选 目标表 Integer 1000 每个批量请求的最大缓冲操作数。 可以设置为'0'来禁用它。
sink.bulk-flush.interval 可选 目标表 Duration 1s flush 缓冲操作的间隔。 可以设置为'0'来禁用它。注意,'sink.bulk-flush.max-size'和'sink.bulk-flush.max-actions'都设置为'0'的这种 flush 间隔设置允许对缓冲操作进行完全异步处理。
sink.bulk-flush.backoff.strategy 可选 目标表 String DISABLED 指定在由于临时请求错误导致任何 flush 操作失败时如何执行重试。有效策略为: DISABLED:不执行重试,即第一次请求错误后失败。 CONSTANT:等待重试之间的回退延迟。 EXPONENTIAL:先等待回退延迟,然后在重试之间指数递增。
sink.bulk-flush.backoff.max-retries 可选 目标表 Integer - 最大回退重试次数。
sink.bulk-flush.backoff.delay 可选 目标表 Duration - 每次回退重试之间的延迟。对于 CONSTANT 退避策略,该值是每次重试之间的延迟。对于 EXPONENTIAL 退避策略,该值是初始的延迟。
connection.max-retry-timeout 可选 目标表 Duration - 每次重试之间的延迟。
connection.path-prefix 可选 目标表 String - 添加到每个 REST 通信中的前缀字符串,例如,'/v1'。
format 可选 目标表 String json Elasticsearch 连接器支持指定格式。该格式必须生成一个有效的 json 文档。 默认使用内置的 'json' 格式。
sink.parallelism 可选 目标表 String - sink算子并行度。
version 可选 目标表 String 6 sloth语法检查专用配置

特殊字段规则

数据源 主键 特殊字段类型
ES (同JSON格式规则) 不可设置主键 不支持varbinary字段;不支持metadata字段;map的key必须是string