我自己做测试从oracle同步到pg利用kafka connect debezium 和jdbc sink connector。插入数据没用问题,只有更新事件,时候出现debezium的记录里面只有更改的值,before和after里面其他字段的值都为null,导致在PG数据库里面无法更新。以下是我的配置和更新的消息。
{
"name": "my-oracle-connector",
"config": {
"connector.class": "io.debezium.connector.oracle.OracleConnector",
"database.hostname": "localhost",
"database.port": "1521",
"database.user": "xx",
"database.password": "xxx",
"database.dbname": "ORCL",
"database.server.name": "my-oracle-db",
"schema.history.internal.kafka.topic": "test-schema-topic",
"schema.history.internal.kafka.bootstrap.servers": "localhost:9092",
"table.include.list": "MRSONG.TEST_SYNC",
"topic.prefix": "mrsong",
"decimal.handling.mode": "string",
"pk.fields":"MRSONG.TEST_SYNC.ID"
}
}
"schema": {
"type": "struct",
"fields": [
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": true,
"field": "ID"
},
{
"type": "string",
"optional": true,
"field": "NAME"
},
{
"type": "string",
"optional": true,
"field": "CITY"
},
{
"type": "string",
"optional": true,
"field": "AGE"
}
],
"optional": true,
"name": "mrsong.MRSONG.TEST_SYNC.Value",
"field": "before"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": true,
"field": "ID"
},
{
"type": "string",
"optional": true,
"field": "NAME"
},
{
"type": "string",
"optional": true,
"field": "CITY"
},
{
"type": "string",
"optional": true,
"field": "AGE"
}
],
"optional": true,
"name": "mrsong.MRSONG.TEST_SYNC.Value",
"field": "after"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "version"
},
{
"type": "string",
"optional": false,
"field": "connector"
},
{
"type": "string",
"optional": false,
"field": "name"
},
{
"type": "int64",
"optional": false,
"field": "ts_ms"
},
{
"type": "string",
"optional": true,
"name": "io.debezium.data.Enum",
"version": 1,
"parameters": {
"allowed": "true,last,false,incremental"
},
"default": "false",
"field": "snapshot"
},
{
"type": "string",
"optional": false,
"field": "db"
},
{
"type": "string",
"optional": true,
"field": "sequence"
},
{
"type": "string",
"optional": false,
"field": "schema"
},
{
"type": "string",
"optional": false,
"field": "table"
},
{
"type": "string",
"optional": true,
"field": "txId"
},
{
"type": "string",
"optional": true,
"field": "scn"
},
{
"type": "string",
"optional": true,
"field": "commit_scn"
},
{
"type": "string",
"optional": true,
"field": "lcr_position"
},
{
"type": "string",
"optional": true,
"field": "rs_id"
},
{
"type": "int64",
"optional": true,
"field": "ssn"
},
{
"type": "int32",
"optional": true,
"field": "redo_thread"
},
{
"type": "string",
"optional": true,
"field": "user_name"
}
],
"optional": false,
"name": "io.debezium.connector.oracle.Source",
"field": "source"
},
{
"type": "string",
"optional": false,
"field": "op"
},
{
"type": "int64",
"optional": true,
"field": "ts_ms"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "id"
},
{
"type": "int64",
"optional": false,
"field": "total_order"
},
{
"type": "int64",
"optional": false,
"field": "data_collection_order"
}
],
"optional": true,
"name": "event.block",
"version": 1,
"field": "transaction"
}
],
"optional": false,
"name": "mrsong.MRSONG.TEST_SYNC.Envelope",
"version": 1
},
"payload": {
"before": {
"ID": null,
"NAME": null,
"CITY": null,
"AGE": "66"
},
"after": {
"ID": null,
"NAME": null,
"CITY": null,
"AGE": "44444"
},
"source": {
"version": "2.4.0.Beta2",
"connector": "oracle",
"name": "mrsong",
"ts_ms": 1695258469000,
"snapshot": "false",
"db": "ORCL",
"sequence": null,
"schema": "MRSONG",
"table": "TEST_SYNC",
"txId": "06001f00367a0000",
"scn": "34757042",
"commit_scn": "34757055",
"lcr_position": null,
"rs_id": "0x000287.0002da93.0010",
"ssn": 0,
"redo_thread": 1,
"user_name": "MRSONG"
},
"op": "u",
"ts_ms": 1695258475754,
"transaction": null
}
}
可以看到只有AGE变了,其他字段都是NULL,插入事件是没有问题的,只有更新事件这样,我换了几个版本的debezium oracle connector都这样。这会导致我用jdbc sink connector同步到PG数据库无法新增,也无法更新,因为没有主键消息。