ElasticSearch流表
更新时间: 2023-05-16 14:55:44
阅读 317
本文将为您介绍 EasyStream 中如何创建 ElasticSearch 类型流表。
操作步骤
- 在流表管理页面,单击目标逻辑库,表单页面左上角显示对应库名表示切换成功。
- 单击 创建表 按键,进行表创建。
- 选择 数据源类型 为 ElasticSearch。
- 完成剩余表单内容,创建 ElasticSearch 流表。
字段类型映射
Elasticsearch 将文档存储在 JSON 字符串中。因此数据类型映射介于 Flink 数据类型和 JSON 数据类型之间。 Flink 为 Elasticsearch 连接器使用内置的 json
格式,请参阅流表序列化及映射规则。
流表配置
配置名称 | 是否必填 | 配置生效类型 | 参数值字段类型 | 参数默认值 | 参数说明(用于用户手册) | |
---|---|---|---|---|---|---|
connector | 必填 | 目标表 | String | - | 指定要使用的连接器,有效值为: elasticsearch-6 :连接到 Elasticsearch 6.x 的集群。 elasticsearch-7 :连接到 Elasticsearch 7.x 及更高版本的集群。 |
|
hosts | 必填 | 目标表 | String | - | 要连接到的一台或多台 Elasticsearch 主机,例如 'http://host_name:9092;http://host_name:9093'。 | |
password | 可选 | 目标表 | String | - | 用于连接 Elasticsearch 实例的密码。 | |
username | 可选 | 目标表 | String | - | 用于连接 Elasticsearch 实例的用户名。 | |
index | 必填 | 目标表 | String | - | Elasticsearch 中每条记录的索引。可以是一个静态索引(例如 'myIndex')或一个动态索引(例如 'index-{log_ts\ | yyyy-MM-dd}')。 |
document-type | 对es5/es6必填,es7不必填 | 目标表 | String | - | Elasticsearch 文档类型。在 elasticsearch-7 中不再需要。 | |
document-id.key-delimiter | 可选 | 目标表 | String | _ | 复合键的分隔符(默认为"_") | |
failure-handler | 可选 | 目标表 | String | fail | 对 Elasticsearch 请求失败情况下的失败处理策略。有效策略为: fail :如果请求失败并因此导致作业失败,则抛出异常。 ignore :忽略失败并放弃请求。 retry-rejected :重新添加由于队列容量饱和而失败的请求。 自定义类名称name :使用 ActionRequestFailureHandler 的子类进行失败处理。 |
|
sink.flush-on-checkpoint | 可选 | 目标表 | Boolean | true | 在进行 checkpoint 时是否保证刷出缓冲区中的数据。 | |
sink.bulk-flush.max-size | 可选 | 目标表 | MemorySize | 2mb | 每个批量请求的缓冲操作在内存中的最大值。单位必须为 MB。 可以设置为'0'来禁用它。 | |
sink.bulk-flush.max-actions | 可选 | 目标表 | Integer | 1000 | 每个批量请求的最大缓冲操作数。 可以设置为'0'来禁用它。 | |
sink.bulk-flush.interval | 可选 | 目标表 | Duration | 1s | flush 缓冲操作的间隔。 可以设置为'0'来禁用它。注意,'sink.bulk-flush.max-size'和'sink.bulk-flush.max-actions'都设置为'0'的这种 flush 间隔设置允许对缓冲操作进行完全异步处理。 | |
sink.bulk-flush.backoff.strategy | 可选 | 目标表 | String | DISABLED | 指定在由于临时请求错误导致任何 flush 操作失败时如何执行重试。有效策略为: DISABLED :不执行重试,即第一次请求错误后失败。 CONSTANT :等待重试之间的回退延迟。 EXPONENTIAL :先等待回退延迟,然后在重试之间指数递增。 |
|
sink.bulk-flush.backoff.max-retries | 可选 | 目标表 | Integer | - | 最大回退重试次数。 | |
sink.bulk-flush.backoff.delay | 可选 | 目标表 | Duration | - | 每次回退重试之间的延迟。对于 CONSTANT 退避策略,该值是每次重试之间的延迟。对于 EXPONENTIAL 退避策略,该值是初始的延迟。 | |
connection.max-retry-timeout | 可选 | 目标表 | Duration | - | 每次重试之间的延迟。 | |
connection.path-prefix | 可选 | 目标表 | String | - | 添加到每个 REST 通信中的前缀字符串,例如,'/v1'。 | |
format | 可选 | 目标表 | String | json | Elasticsearch 连接器支持指定格式。该格式必须生成一个有效的 json 文档。 默认使用内置的 'json' 格式。 | |
sink.parallelism | 可选 | 目标表 | String | - | sink算子并行度。 | |
version | 可选 | 目标表 | String | 6 | sloth语法检查专用配置 |
特殊字段规则
数据源 | 主键 | 特殊字段类型 |
---|---|---|
ES (同JSON格式规则) | 不可设置主键 | 不支持varbinary字段;不支持metadata字段;map的key必须是string |
文档反馈
以上内容对您是否有帮助?