본문 바로가기
Data PipeLine/Jdbc Sink

[Jdbc Sink] 레코드에 not null 키 구조가 없어 발생하는 에러

by 연습장이 2023. 12. 25.
728x90
반응형

에러내용

k:190)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:610)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:330)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:237)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.kafka.connect.errors.ConnectException: Sink connector 'test-list_agg' is configured with 'delete.enabled=true' and 'pk.mode=record_key' and therefore requires records with a non-null key and non-null Struct or primitive key schema, but found record at (topic='june_test.estancia.test-list_agg',partition=0,offset=0,timestamp=1703078660902) with a null key and null key schema.
        at io.confluent.connect.jdbc.sink.RecordValidator.lambda$requiresKey$3(RecordValidator.java:116)
        at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:82)
        at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:73)
        at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:74)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:582)
        ... 10 more
[2023-12-21 18:24:27,279] INFO Stopping task (io.confluent.connect.jdbc.sink.JdbcSinkTask:107)

 

원인

sink 커넥터가 타겟 DB에 데이터를 넣기 전에 아래 파라미터와 메시지 간 충돌이 발생한다.

  • delete.enabled=true : 데이터를 삭제할 수 있는 옵션
  • pk.mode=record_key : 식별자를 레코드의 키(pk키로 생각하면 된다)를 기준으로 각 레코드를 구분하는 옵션

위 옵션이 활성화되기 위해서는 메시지 내 키밸류에서 키 스키마가 null이면 안된다. null은 정확하게 식별할 수가 없기 때문

에러 발생 커넥터 생성 설정 파라미터

curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://xxx.xxx.xxx.xxx:8089/connectors/ -d '{
     "name": "test_list_agg",
     "config": {
         "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
         "tasks.max": "1",
         "topics.regex": "(.*)\\_list\\_agg",
         "connection.url": "jdbc:mysql://xxx.xxx.xxx.xxx:3333/june_test?characterEncoding=UTF-8&serverTimezone=Asia/Seoul",
         "connection.user":"test",
         "connection.password":"masking",
         "auto.create": "true",
         "auto.evolve":"true",
         "insert.mode": "upsert",
         "delete.enabled": "true",
         "pk.mode": "record_key",
         "table.name.format": "june_test.test-list_agg",
         "pk.field": "(__host,table_schema, table_name)",
         "transforms": "ReplaceField",
         "transforms.ReplaceField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
         "transforms.ReplaceField.blacklist": "__deleted"
      }
}'

 

소스 테이블 스키마 정보

create table ec_list_agg (                                       
 host varchar(100)
, port varchar(10)
, dummy_1 varchar(10)
, dummy_2 varchar(10)
, dummy_3 varchar(10)
, dummy_4 varchar(50)
, table_schema varchar(100)
, table_name varchar(100)
, block_cnt bigint
, row_cnt bigint);

시도한 방법

  • 타겟 DB에 pk를 만들어봤지만 동일 에러 발생
  • jdbc sink 커넥터 파라미터에 아래 추가했지만 동일 에러 발생
    • "key.converter": "org.apache.kafka.connect.json.JsonConverter"
    •  "key.converter.schemas.enable": "true"
    • "value.converter": "org.apache.kafka.connect.json.JsonConverter"
    • "value.converter.schemas.enable": "true"

해결방법 1

일단 적재를 한다. append-only 방식이다.

이렇게 되면 소스 데이터가 update, delete 되었을 때 타겟 데이터는 바뀌지 않는다.

curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://xxx.xxx.xxx.xxx:8089/connectors/ -d '{
     "name": "test_list_agg",
     "config": {
         "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
         "tasks.max": "1",
         "topics.regex": "(.*)\\_list\\_agg",
         "connection.url": "jdbc:mysql://xxx.xxx.xxx.xxx:3333/june_test?characterEncoding=UTF-8&serverTimezone=Asia/Seoul",
         "connection.user":"test",
         "connection.password":"masking",
         "auto.create": "true",
         "auto.evolve":"true",
         # "insert.mode": "insert", 이 부분 upsert -> insert로 변경
         # "delete.enabled": "true", 이 부분 제거
         # "pk.mode": "record_key", 이 부분 제거
         "table.name.format": "june_test.test-list_agg",
         "pk.field": "(__host,table_schema, table_name)",
         "transforms": "ReplaceField",
         "transforms.ReplaceField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
         "transforms.ReplaceField.blacklist": "__deleted"
      }
}'

 

데이터가 적재가 된다.

MariaDB [mysql]> select count(1) from june_test.test-list_agg;
+----------+
| count(1) |
+----------+
|  1442398 |
+----------+
1 row in set (0.863 sec)

 

해결방법 2

소스 데이터에 pk를 설정한 후 프로듀싱 구간부터 재수집한다.

이렇게 되면 소스DB <> 프로듀서 <> 브로커 <> 컨슈머 모두 영향을 끼치게 된다.

하지만 소스DB의 데이터가 update, delete 됨에 따라 타겟DB 데이터도 동기화되길 바란다면 이 방법이 최선이다.

 

참고

그렇다면 카프카 브로커 토픽 내 적재된 메시지에 null 스키마를 not null로 바꾸면 되는 것 아닌가 생각을 할 수도 있다.

 

카프카 브로커의 메시지는 수정이 불가능하다.

728x90
반응형