I’m using Docker with Kafka and ClickHouse. I want to connect a ksqlDB table to ClickHouse using Kafka Connect, so I referred to this document and modified the docker-compose file.
Here is my docker-compose file:
---
# Docker Compose stack: ZooKeeper, Kafka broker, Schema Registry, a custom-built
# Kafka Connect worker, Control Center, ksqlDB (server, CLI, datagen),
# REST Proxy and ClickHouse.
version: '2'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:6.2.1
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-server:6.2.1
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      # Containers reach the broker at broker:29092; the host uses localhost:9092.
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'

  schema-registry:
    image: confluentinc/cp-schema-registry:6.2.1
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081

  # Kafka Connect worker, built locally from Dockerfile-kafka-connect.
  connect:
    container_name: connect
    build:
      context: ./
      dockerfile: Dockerfile-kafka-connect
      args:
        # Passed to the Dockerfile as the clickHouseVersion build argument.
        clickHouseVersion: "0.2.4"
    depends_on:
      - broker
      - schema-registry
      - zookeeper
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'broker:29092'
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      # CLASSPATH required due to CC-2422
      # CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-6.2.1.jar
      # CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      # CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
      CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR

  control-center:
    image: confluentinc/cp-enterprise-control-center:6.2.1
    hostname: control-center
    container_name: control-center
    depends_on:
      - broker
      - schema-registry
      - connect
      - ksqldb-server
    ports:
      - "9021:9021"
    environment:
      CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092'
      CONTROL_CENTER_CONNECT_CONNECT-DEFAULT_CLUSTER: 'connect:8083'
      CONTROL_CENTER_KSQL_KSQLDB1_URL: "http://ksqldb-server:8088"
      CONTROL_CENTER_KSQL_KSQLDB1_ADVERTISED_URL: "http://localhost:8088"
      CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      CONTROL_CENTER_REPLICATION_FACTOR: 1
      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
      CONFLUENT_METRICS_TOPIC_REPLICATION: 1
      PORT: 9021

  ksqldb-server:
    image: confluentinc/cp-ksqldb-server:6.2.1
    hostname: ksqldb-server
    container_name: ksqldb-server
    depends_on:
      - broker
      - connect
    ports:
      - "8088:8088"
    environment:
      KSQL_CONFIG_DIR: "/etc/ksql"
      KSQL_BOOTSTRAP_SERVERS: "broker:29092"
      KSQL_HOST_NAME: ksqldb-server
      KSQL_LISTENERS: "http://0.0.0.0:8088"
      KSQL_CACHE_MAX_BYTES_BUFFERING: 0
      KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      KSQL_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      KSQL_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      # ksqlDB sends connector statements to this Connect REST endpoint.
      KSQL_KSQL_CONNECT_URL: "http://connect:8083"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_REPLICATION_FACTOR: 1
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: 'true'
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: 'true'

  ksqldb-cli:
    image: confluentinc/cp-ksqldb-cli:6.2.1
    container_name: ksqldb-cli
    depends_on:
      - broker
      - connect
      - ksqldb-server
    entrypoint: /bin/sh
    tty: true

  ksql-datagen:
    image: confluentinc/ksqldb-examples:6.2.1
    hostname: ksql-datagen
    container_name: ksql-datagen
    depends_on:
      - ksqldb-server
      - broker
      - schema-registry
      - connect
    # Waits for the broker and Schema Registry, then idles on `tail -f /dev/null`.
    command: "bash -c 'echo Waiting for Kafka to be ready... && cub kafka-ready -b broker:29092 1 40 && echo Waiting for Confluent Schema Registry to be ready... && cub sr-ready schema-registry 8081 40 && echo Waiting a few seconds for topic creation to finish... && sleep 11 && tail -f /dev/null'"
    environment:
      KSQL_CONFIG_DIR: "/etc/ksql"
      STREAMS_BOOTSTRAP_SERVERS: broker:29092
      STREAMS_SCHEMA_REGISTRY_HOST: schema-registry
      STREAMS_SCHEMA_REGISTRY_PORT: 8081

  rest-proxy:
    image: confluentinc/cp-kafka-rest:6.2.1
    depends_on:
      - broker
      - schema-registry
    ports:
      - "8082:8082"
    hostname: rest-proxy
    container_name: rest-proxy
    environment:
      KAFKA_REST_HOST_NAME: rest-proxy
      KAFKA_REST_BOOTSTRAP_SERVERS: 'broker:29092'
      KAFKA_REST_LISTENERS: "http://0.0.0.0:8082"
      KAFKA_REST_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'

  # No ports are published; other services reach it by the hostname `clickhouse`.
  clickhouse:
    image: yandex/clickhouse-server:20.4
    hostname: clickhouse
    container_name: clickhouse
And this is Dockerfile-kafka-connect:
# Kafka Connect image: Confluent JDBC connector plus the ClickHouse JDBC driver.
FROM confluentinc/cp-kafka-connect-base:6.2.1

# Build arguments; clickHouseVersion selects the driver release to download.
ARG jdbcDriverPath
ARG clickHouseVersion

# The driver jar is placed in the JDBC connector's own lib directory.
ENV JDBC_DRIVER_PATH=/usr/share/confluent-hub-components/confluentinc-kafka-connect-jdbc/lib
ENV JDBC_DRIVER=clickhouse-jdbc-$clickHouseVersion.jar

RUN confluent-hub install --no-prompt confluentinc/kafka-connect-jdbc:latest

# Fetch the ClickHouse JDBC driver jar from the project's GitHub releases.
RUN mkdir -p $JDBC_DRIVER_PATH \
    && echo "Downloading JDBC Driver for ClickHouse v$clickHouseVersion" \
    && wget -O ${JDBC_DRIVER_PATH}/${JDBC_DRIVER} \
        https://github.com/ClickHouse/clickhouse-jdbc/releases/download/release_$clickHouseVersion/${JDBC_DRIVER}
Then I ran this command in ksqlDB:
-- Registers a Kafka Connect JDBC connector for the S3_FINAL topic, pointing at
-- the ClickHouse JDBC URL below.
-- NOTE(review): "connector.class" is JdbcSinkConnector, so this should probably
-- be CREATE SINK CONNECTOR; confirm against the ksqlDB version in use.
CREATE SOURCE CONNECTOR `clickhouse-kospi-connector` WITH (
  "value.converter.schema.registry.url" = "http://schema-registry:8081",
  "key.converter.schema.registry.url" = "http://schema-registry:8081",
  "connector.class" = "io.confluent.connect.jdbc.JdbcSinkConnector",
  "tasks.max" = "1",
  "key.converter" = "io.confluent.connect.avro.AvroConverter",
  "value.converter" = "io.confluent.connect.avro.AvroConverter",
  "errors.log.enable" = "true",
  "errors.log.include.messages" = "true",
  "topics" = "S3_FINAL",
  "connection.url" = "jdbc:clickhouse://clickhouse:8123/default",
  "auto.create" = "true"
);
(The ‘S3_FINAL’ topic uses the AVRO format for both its multi-column key and its values.)
It doesn’t work, so I looked at the error message:
ERROR WorkerSinkTask{id=clickhouse-kospi-connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: org/apache/http/conn/HttpClientConnectionManager (org.apache.kafka.connect.runtime.WorkerSinkTask) java.lang.NoClassDefFoundError: org/apache/http/conn/HttpClientConnectionManager at ru.yandex.clickhouse.ClickHouseConnectionImpl.<init>(ClickHouseConnectionImpl.java:73) at ru.yandex.clickhouse.ClickHouseDriver.connect(ClickHouseDriver.java:55) at ru.yandex.clickhouse.ClickHouseDriver.connect(ClickHouseDriver.java:47) at ru.yandex.clickhouse.ClickHouseDriver.connect(ClickHouseDriver.java:29) at java.sql/java.sql.DriverManager.getConnection(DriverManager.java:677) at java.sql/java.sql.DriverManager.getConnection(DriverManager.java:189) at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.getConnection(GenericDatabaseDialect.java:239) at io.confluent.connect.jdbc.util.CachedConnectionProvider.newConnection(CachedConnectionProvider.java:80) at io.confluent.connect.jdbc.util.CachedConnectionProvider.getConnection(CachedConnectionProvider.java:52) at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:64) at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:84) at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:582) at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:330) at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232) at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201) at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188) at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:237) at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.lang.Thread.run(Thread.java:829) Caused by: java.lang.ClassNotFoundException: org.apache.http.conn.HttpClientConnectionManager at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471) at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589) at org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:104) at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522) ... 22 more
I looked into the problem, but I don’t know how to add the missing jar when using Docker. Also, since this is a CLASSPATH problem, I think it is a configuration issue. Can you help me?
Advertisement
Answer
The problem was solved by downloading ‘httpcomponents-client-4.5.13’ with wget. It seems that ‘clickhouse-jdbc’ needs ‘httpclient’. I’m using clickhouse-jdbc v0.2.6.
# Kafka Connect image: Confluent JDBC connector, Avro converter, the ClickHouse
# JDBC driver, and the Apache HttpComponents client jars.
FROM confluentinc/cp-kafka-connect-base:6.2.1

# Build arguments; clickHouseVersion selects the driver release to download.
ARG jdbcDriverPath
ARG clickHouseVersion

# The driver jar goes into the JDBC connector's lib directory; the
# HttpComponents archive is unpacked into the connector's plugin directory.
ENV JDBC_DRIVER_PATH=/usr/share/confluent-hub-components/confluentinc-kafka-connect-jdbc/lib
ENV HTTP_COMP_PATH=/usr/share/confluent-hub-components/confluentinc-kafka-connect-jdbc
ENV JDBC_DRIVER=clickhouse-jdbc-$clickHouseVersion.jar

RUN confluent-hub install --no-prompt confluentinc/kafka-connect-jdbc:latest
RUN confluent-hub install --no-prompt confluentinc/kafka-connect-avro-converter:6.2.1

# Fetch the ClickHouse JDBC driver jar from the project's GitHub releases.
RUN mkdir -p $JDBC_DRIVER_PATH \
    && echo "Downloading JDBC Driver for ClickHouse v$clickHouseVersion" \
    && wget -O ${JDBC_DRIVER_PATH}/${JDBC_DRIVER} \
        https://github.com/ClickHouse/clickhouse-jdbc/releases/download/v$clickHouseVersion/${JDBC_DRIVER}

# Download and unpack the HttpComponents client jars, then delete the tarball
# so it is not kept in the plugin directory or in an image layer.
# NOTE(review): dlcdn.apache.org may only carry current releases; if this URL
# returns 404, the same file should be available under archive.apache.org/dist/.
RUN wget -O ${HTTP_COMP_PATH}/httpcomponents-client-4.5.13-bin.tar.gz \
        https://dlcdn.apache.org/httpcomponents/httpclient/binary/httpcomponents-client-4.5.13-bin.tar.gz \
    && tar -zxvf ${HTTP_COMP_PATH}/httpcomponents-client-4.5.13-bin.tar.gz -C ${HTTP_COMP_PATH} \
    && rm ${HTTP_COMP_PATH}/httpcomponents-client-4.5.13-bin.tar.gz