任务运行时异常:java.sql.BatchUpdateException: Duplicate entry 'xxxxx' for key 'PRIMARY'. XA_RBROLLBACK: Transaction branch was rolled back.

问题描述/异常栈

2021-11-30 16:46:47,949 ERROR org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat [] - JDBC executeBatch error, retry times = 0
java.sql.BatchUpdateException: Duplicate entry 'xxxxx' for key 'PRIMARY'. XA_RBROLLBACK: Transaction branch was rolled back.
    at sun.reflect.GeneratedConstructorAccessor30.newInstance(Unknown Source) ~[?:?]
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) ~[?:1.8.0_152]
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423) ~[?:1.8.0_152]
    at com.mysql.cj.util.Util.handleNewInstance(Util.java:192) ~[plugin_ne-flink-1.12.4-1.1.1_scala2.12_hive2.1.1-3.8.0-1.2.2-SNAPSHOT.jar:?]
    at com.mysql.cj.util.Util.getInstance(Util.java:167) ~[plugin_ne-flink-1.12.4-1.1.1_scala2.12_hive2.1.1-3.8.0-1.2.2-SNAPSHOT.jar:?]
    at com.mysql.cj.util.Util.getInstance(Util.java:174) ~[plugin_ne-flink-1.12.4-1.1.1_scala2.12_hive2.1.1-3.8.0-1.2.2-SNAPSHOT.jar:?]
    at com.mysql.cj.jdbc.exceptions.SQLError.createBatchUpdateException(SQLError.java:224) ~[plugin_ne-flink-1.12.4-1.1.1_scala2.12_hive2.1.1-3.8.0-1.2.2-SNAPSHOT.jar:?]
    at com.mysql.cj.jdbc.ClientPreparedStatement.executeBatchedInserts(ClientPreparedStatement.java:755) ~[plugin_ne-flink-1.12.4-1.1.1_scala2.12_hive2.1.1-3.8.0-1.2.2-SNAPSHOT.jar:?]
    at com.mysql.cj.jdbc.ClientPreparedStatement.executeBatchInternal(ClientPreparedStatement.java:426) ~[plugin_ne-flink-1.12.4-1.1.1_scala2.12_hive2.1.1-3.8.0-1.2.2-SNAPSHOT.jar:?]
    at com.mysql.cj.jdbc.StatementImpl.executeBatch(StatementImpl.java:796) ~[plugin_ne-flink-1.12.4-1.1.1_scala2.12_hive2.1.1-3.8.0-1.2.2-SNAPSHOT.jar:?]
    at org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatementImpl.executeBatch(FieldNamedPreparedStatementImpl.java:65) ~[plugin_ne-flink-1.12.4-1.1.1_scala2.12_hive2.1.1-3.8.0-1.2.2-SNAPSHOT.jar:?]
    at org.apache.flink.connector.jdbc.internal.executor.TableSimpleStatementExecutor.executeBatch(TableSimpleStatementExecutor.java:64) ~[plugin_ne-flink-1.12.4-1.1.1_scala2.12_hive2.1.1-3.8.0-1.2.2-SNAPSHOT.jar:?]
    at org.apache.flink.connector.jdbc.internal.executor.TableBufferedStatementExecutor.executeBatch(TableBufferedStatementExecutor.java:64) ~[plugin_ne-flink-1.12.4-1.1.1_scala2.12_hive2.1.1-3.8.0-1.2.2-SNAPSHOT.jar:?]
    at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.attemptFlush(JdbcBatchingOutputFormat.java:216) ~[plugin_ne-flink-1.12.4-1.1.1_scala2.12_hive2.1.1-3.8.0-1.2.2-SNAPSHOT.jar:?]
    at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.flush(JdbcBatchingOutputFormat.java:184) [plugin_ne-flink-1.12.4-1.1.1_scala2.12_hive2.1.1-3.8.0-1.2.2-SNAPSHOT.jar:?]
    at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.writeRecord(JdbcBatchingOutputFormat.java:167) [plugin_ne-flink-1.12.4-1.1.1_scala2.12_hive2.1.1-3.8.0-1.2.2-SNAPSHOT.jar:?]
    at org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction.invoke(OutputFormatSinkFunction.java:87) [plugin_ne-flink-1.12.4-1.1.1_scala2.12_hive2.1.1-3.8.0-1.2.2-SNAPSHOT.jar:?]
    at org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:49) [plugin_ne-flink-1.12.4-1.1.1_scala2.12_hive2.1.1-3.8.0-1.2.2-SNAPSHOT.jar:?]
    at org.apache.flink.table.runtime.operators.sink.SinkOperator.processElement(SinkOperator.java:72) [flink-table-blink_2.11-ne-flink-1.12.4-1.0.0.jar:ne-flink-1.12.4-1.0.0]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71) [plugin_ne-flink-1.12.4-1.1.1_scala2.12_hive2.1.1-3.8.0-1.2.2-SNAPSHOT.jar:?]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46) [plugin_ne-flink-1.12.4-1.1.1_scala2.12_hive2.1.1-3.8.0-1.2.2-SNAPSHOT.jar:?]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26) [plugin_ne-flink-1.12.4-1.1.1_scala2.12_hive2.1.1-3.8.0-1.2.2-SNAPSHOT.jar:?]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50) [plugin_ne-flink-1.12.4-1.1.1_scala2.12_hive2.1.1-3.8.0-1.2.2-SNAPSHOT.jar:?]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28) [plugin_ne-flink-1.12.4-1.1.1_scala2.12_hive2.1.1-3.8.0-1.2.2-SNAPSHOT.jar:?]
    at StreamExecCalc$28.processElement(Unknown Source) [flink-table-blink_2.11-ne-flink-1.12.4-1.0.0.jar:?]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71) [plugin_ne-flink-1.12.4-1.1.1_scala2.12_hive2.1.1-3.8.0-1.2.2-SNAPSHOT.jar:?]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46) [plugin_ne-flink-1.12.4-1.1.1_scala2.12_hive2.1.1-3.8.0-1.2.2-SNAPSHOT.jar:?]
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26) [plugin_ne-flink-1.12.4-1.1.1_scala2.12_hive2.1.1-3.8.0-1.2.2-SNAPSHOT.jar:?]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50) [plugin_ne-flink-1.12.4-1.1.1_scala2.12_hive2.1.1-3.8.0-1.2.2-SNAPSHOT.jar:?]
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28) [plugin_ne-flink-1.12.4-1.1.1_scala2.12_hive2.1.1-3.8.0-1.2.2-SNAPSHOT.jar:?]
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:322) [plugin_ne-flink-1.12.4-1.1.1_scala2.12_hive2.1.1-3.8.0-1.2.2-SNAPSHOT.jar:?]
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:426) [plugin_ne-flink-1.12.4-1.1.1_scala2.12_hive2.1.1-3.8.0-1.2.2-SNAPSHOT.jar:?]
    at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:374) [plugin_ne-flink-1.12.4-1.1.1_scala2.12_hive2.1.1-3.8.0-1.2.2-SNAPSHOT.jar:?]
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:183) [plugin_ne-flink-1.12.4-1.1.1_scala2.12_hive2.1.1-3.8.0-1.2.2-SNAPSHOT.jar:?]
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142) [plugin_ne-flink-1.12.4-1.1.1_scala2.12_hive2.1.1-3.8.0-1.2.2-SNAPSHOT.jar:?]
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826) [plugin_ne-flink-1.12.4-1.1.1_scala2.12_hive2.1.1-3.8.0-1.2.2-SNAPSHOT.jar:?]
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110) [plugin_ne-flink-1.12.4-1.1.1_scala2.12_hive2.1.1-3.8.0-1.2.2-SNAPSHOT.jar:?]
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66) [plugin_ne-flink-1.12.4-1.1.1_scala2.12_hive2.1.1-3.8.0-1.2.2-SNAPSHOT.jar:?]
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:263) [plugin_ne-flink-1.12.4-1.1.1_scala2.12_hive2.1.1-3.8.0-1.2.2-SNAPSHOT.jar:?]
Caused by: java.sql.SQLIntegrityConstraintViolationException: Duplicate entry 'DPK364026238465' for key 'PRIMARY'. XA_RBROLLBACK: Transaction branch was rolled back.
    at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:117) ~[plugin_ne-flink-1.12.4-1.1.1_scala2.12_hive2.1.1-3.8.0-1.2.2-SNAPSHOT.jar:?]
    at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:97) ~[plugin_ne-flink-1.12.4-1.1.1_scala2.12_hive2.1.1-3.8.0-1.2.2-SNAPSHOT.jar:?]
    at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:122) ~[plugin_ne-flink-1.12.4-1.1.1_scala2.12_hive2.1.1-3.8.0-1.2.2-SNAPSHOT.jar:?]
    at com.mysql.cj.jdbc.ClientPreparedStatement.executeInternal(ClientPreparedStatement.java:953) ~[plugin_ne-flink-1.12.4-1.1.1_scala2.12_hive2.1.1-3.8.0-1.2.2-SNAPSHOT.jar:?]
    at com.mysql.cj.jdbc.ClientPreparedStatement.executeUpdateInternal(ClientPreparedStatement.java:1092) ~[plugin_ne-flink-1.12.4-1.1.1_scala2.12_hive2.1.1-3.8.0-1.2.2-SNAPSHOT.jar:?]
    at com.mysql.cj.jdbc.ClientPreparedStatement.executeUpdateInternal(ClientPreparedStatement.java:1040) ~[plugin_ne-flink-1.12.4-1.1.1_scala2.12_hive2.1.1-3.8.0-1.2.2-SNAPSHOT.jar:?]
    at com.mysql.cj.jdbc.ClientPreparedStatement.executeLargeUpdate(ClientPreparedStatement.java:1347) ~[plugin_ne-flink-1.12.4-1.1.1_scala2.12_hive2.1.1-3.8.0-1.2.2-SNAPSHOT.jar:?]
    at com.mysql.cj.jdbc.ClientPreparedStatement.executeBatchedInserts(ClientPreparedStatement.java:716) ~[plugin_ne-flink-1.12.4-1.1.1_scala2.12_hive2.1.1-3.8.0-1.2.2-SNAPSHOT.jar:?]
    ... 31 more
2021-11-30 16:46:48,036 ERROR org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat [] - ...(日志在此处被截断,后续内容省略)

解决方案

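在建表时通过以下参数为 sink 表声明主键(主键字段应与目标 MySQL 表的主键保持一致):
SET '表名.primary.keys' = '字段名';
声明主键后,JDBC sink 会以 upsert 方式写入(MySQL 下为 INSERT ... ON DUPLICATE KEY UPDATE),主键重复的数据会更新已有记录,而不再抛出主键冲突异常。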

问题原因

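主键冲突:目标端(MySQL)表设置了主键,而写入的数据中存在主键重复的记录。由于 Flink 侧的 sink 表没有声明主键,JDBC sink 以追加方式执行普通 INSERT(见异常栈中的 executeBatchedInserts),遇到重复主键时即抛出 Duplicate entry 异常。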

作者:王松