728x90
반응형
에러내용
k:190)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:610)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:330)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:237)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.kafka.connect.errors.ConnectException: Sink connector 'test-list_agg' is configured with 'delete.enabled=true' and 'pk.mode=record_key' and therefore requires records with a non-null key and non-null Struct or primitive key schema, but found record at (topic='june_test.estancia.test-list_agg',partition=0,offset=0,timestamp=1703078660902) with a null key and null key schema.
at io.confluent.connect.jdbc.sink.RecordValidator.lambda$requiresKey$3(RecordValidator.java:116)
at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:82)
at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:73)
at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:74)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:582)
... 10 more
[2023-12-21 18:24:27,279] INFO Stopping task (io.confluent.connect.jdbc.sink.JdbcSinkTask:107)
원인
sink 커넥터가 타겟 DB에 데이터를 넣기 전에 아래 파라미터와 메시지 간 충돌이 발생한다.
- delete.enabled=true : 데이터를 삭제할 수 있는 옵션
- pk.mode=record_key : 식별자를 레코드의 키(pk키로 생각하면 된다)를 기준으로 각 레코드를 구분하는 옵션
위 옵션이 활성화되기 위해서는 메시지 내 키밸류에서 키 스키마가 null이면 안된다. null은 정확하게 식별할 수가 없기 때문
에러 발생 커넥터 생성 설정 파라미터
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://xxx.xxx.xxx.xxx:8089/connectors/ -d '{
"name": "test_list_agg",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"topics.regex": "(.*)\\_list\\_agg",
"connection.url": "jdbc:mysql://xxx.xxx.xxx.xxx:3333/june_test?characterEncoding=UTF-8&serverTimezone=Asia/Seoul",
"connection.user":"test",
"connection.password":"masking",
"auto.create": "true",
"auto.evolve":"true",
"insert.mode": "upsert",
"delete.enabled": "true",
"pk.mode": "record_key",
"table.name.format": "june_test.test-list_agg",
"pk.field": "(__host,table_schema, table_name)",
"transforms": "ReplaceField",
"transforms.ReplaceField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.ReplaceField.blacklist": "__deleted"
}
}'
소스 테이블 스키마 정보
create table ec_list_agg (
host varchar(100)
, port varchar(10)
, dummy_1 varchar(10)
, dummy_2 varchar(10)
, dummy_3 varchar(10)
, dummy_4 varchar(50)
, table_schema varchar(100)
, table_name varchar(100)
, block_cnt bigint
, row_cnt bigint);
시도한 방법
- 타겟 DB에 pk를 만들어봤지만 동일 에러 발생
- jdbc sink 커넥터 파라미터에 아래 추가했지만 동일 에러 발생
- "key.converter": "org.apache.kafka.connect.json.JsonConverter"
- "key.converter.schemas.enable": "true"
- "value.converter": "org.apache.kafka.connect.json.JsonConverter"
- "value.converter.schemas.enable": "true"
해결방법 1
일단 적재를 한다. append-only 방식이다.
이렇게 되면 소스 데이터가 update, delete 되었을 때 타겟 데이터는 바뀌지 않는다.
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://xxx.xxx.xxx.xxx:8089/connectors/ -d '{
"name": "test_list_agg",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"topics.regex": "(.*)\\_list\\_agg",
"connection.url": "jdbc:mysql://xxx.xxx.xxx.xxx:3333/june_test?characterEncoding=UTF-8&serverTimezone=Asia/Seoul",
"connection.user":"test",
"connection.password":"masking",
"auto.create": "true",
"auto.evolve":"true",
# "insert.mode": "insert", 이 부분 upsert -> insert로 변경
# "delete.enabled": "true", 이 부분 제거
# "pk.mode": "record_key", 이 부분 제거
"table.name.format": "june_test.test-list_agg",
"pk.field": "(__host,table_schema, table_name)",
"transforms": "ReplaceField",
"transforms.ReplaceField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.ReplaceField.blacklist": "__deleted"
}
}'
데이터가 적재가 된다.
MariaDB [mysql]> select count(1) from june_test.test-list_agg;
+----------+
| count(1) |
+----------+
| 1442398 |
+----------+
1 row in set (0.863 sec)
해결방법 2
소스 데이터에 pk를 설정한 후 프로듀싱 구간부터 재수집한다.
이렇게 되면 소스DB <> 프로듀서 <> 브로커 <> 컨슈머 모두 영향을 끼치게 된다.
하지만 소스DB의 데이터가 update, delete 됨에 따라 타겟DB 데이터도 동기화되길 바란다면 이 방법이 최선이다.
참고
그렇다면 카프카 브로커 토픽 내 적재된 메시지에 null 스키마를 not null로 바꾸면 되는 것 아닌가 생각을 할 수도 있다.
카프카 브로커의 메시지는 수정이 불가능하다.
728x90
반응형
'Data PipeLine > Jdbc Sink' 카테고리의 다른 글
[Confluent Jdbc Sink] 10.7.x Release note 주요 내용 정리 (0) | 2024.02.21 |
---|---|
[Confluent Jdbc Sink] 10.6.x Release note 주요 내용 정리 (0) | 2024.02.20 |