Denis

@nuculabs
Kafka Connect Basics

Hi, this article is about Kafka Connect!

Introduction

Kafka Connect is a tool for streaming data between Kafka and other systems. It is distributed and scalable by default, and because it is a standardized tool, lots of connectors are already available. Connectors connect Kafka to another system, or vice versa. There are two types of connectors:

- Source: source connectors pull data from an existing system (e.g. MariaDB, PostgreSQL, S3, Jira, and others) and stream the data into one or more Kafka topics.
- Sink: sink connectors read data from Kafka topics and ingest it into another system (e.g. MongoDB, Snowflake, S3).

If you want to stream change data capture (CDC) events from your databases, Debezium provides connectors that allow you to do just that. CDC is an append-only log that records changes in a database; using a CDC stream you can replicate or reconstruct a database, and you can additionally react to events by processing them in an external system.

Kafka Connect can be deployed in standalone mode or distributed as a cluster of workers. It features a RESTful interface for interacting with it:

- configuring connectors
- starting, stopping, and pausing connectors
- viewing connector status
- resetting connector offsets

It also allows you to apply various transformations to a message. Apache Kafka has an amazing documentation section on Kafka Connect.

REST API

For reference, I've copied all the operations from the REST API documentation and put them into a table.

| Method | Path | Description |
|---|---|---|
| GET | /connectors | return a list of active connectors |
| POST | /connectors | create a new connector |
| GET | /connectors/{name} | get information about a specific connector |
| DELETE | /connectors/{name} | delete a connector |
| GET | /connectors/{name}/config | get the configuration parameters for a specific connector |
| PUT | /connectors/{name}/config | update the configuration parameters for a specific connector |
| PATCH | /connectors/{name}/config | patch the configuration parameters for a specific connector |
| GET | /connectors/{name}/status | get the current status of the connector |
| GET | /connectors/{name}/tasks | get a list of tasks currently running for a connector |
| GET | /connectors/{name}/tasks/{taskId}/status | get the current status of the task |
| PUT | /connectors/{name}/pause | pause the connector and its tasks, which stops message processing until the connector is resumed |
| PUT | /connectors/{name}/stop | stop the connector and shut down its tasks |
| PUT | /connectors/{name}/resume | resume a paused or stopped connector |
| POST | /connectors/{name}/restart | restart a connector and its task instances |
| POST | /connectors/{name}/tasks/{taskId}/restart | restart an individual task |
| PUT | /connectors/{name}/topics/reset | send a request to empty the set of active topics of a connector |
| GET | /connectors/{name}/offsets | get the current offsets for a connector |
| DELETE | /connectors/{name}/offsets | reset the offsets for a connector |
| GET | /connector-plugins | return a list of connector plugins installed in the Kafka Connect cluster |
| GET | /connector-plugins/{plugin-type}/config | get the configuration definition for the specified plugin |
| PUT | /connector-plugins/{connector-type}/config/validate | validate the provided configuration values against the configuration definition |
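As a sketch of how you might drive this API from code, here is a minimal Python example. It assumes a Connect worker listening on localhost:8083 and uses a hypothetical connector name; the helpers only build the HTTP requests, and actually sending one is left to urlopen.

```python
import json
import urllib.request

CONNECT_URL = "http://localhost:8083"  # assumption: a local Connect worker


def create_connector_request(name, config, base_url=CONNECT_URL):
    """Build a POST /connectors request that registers a new connector."""
    body = json.dumps({"name": name, "config": config}).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url}/connectors",
        data=body,
        method="POST",
        headers={"Content-Type": "application/json"},
    )


def status_request(name, base_url=CONNECT_URL):
    """Build a GET /connectors/{name}/status request."""
    return urllib.request.Request(url=f"{base_url}/connectors/{name}/status")


req = create_connector_request("my-connector", {"tasks.max": "1"})
print(req.get_method(), req.full_url)
# With a worker running, submit it with: urllib.request.urlopen(req)
```

The same pattern extends to the other endpoints in the table: build a Request with the right method and path, then send it.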
To start a new connector instance you would usually POST to /connectors with a config body:

```json
{
  "name": "my-jdbc-source",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "tasks.max": "1",
    "connection.url": "jdbc:postgresql://localhost:5432/mydb",
    "connection.user": "myuser",
    "connection.password": "mypassword",
    "mode": "incrementing",
    "incrementing.column.name": "id",
    "table.whitelist": "users",
    "poll.interval.ms": "5000",
    "topic.prefix": "pg.",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "transforms": "maskSensitive",
    "transforms.maskSensitive.type": "org.apache.kafka.connect.transforms.MaskField$Value",
    "transforms.maskSensitive.fields": "email,phone",
    "transforms.maskSensitive.replacement": "****"
  }
}
```

Converters

Converters are used by Connect to convert values from one type to another. Converters apply to both the Kafka message key and the Kafka message value. For example, if you have the following JSON message:

{"data": 1}

a string converter will write that message to the Kafka topic as a plain string, whereas a JSON converter will keep it as JSON. There are also binary-format converters such as Avro and Protobuf, which help reduce the message size by packing the message into a compact format. A downside of these formats is that you need the message schema in order to deserialize the message. You can also write your own converter and load it into Kafka Connect.

To set the converters you use the following keys:

- key.converter: sets the converter for the message key.
- value.converter: sets the converter for the message value.
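To make the converter distinction concrete, here is a small Python sketch (not Connect's actual Java converter classes) that mimics what a string converter and a JSON converter would each write to the topic:

```python
import json


def to_bytes_as_string(value):
    # StringConverter-style: store the value's string form as UTF-8 bytes,
    # treating the payload as opaque text
    return str(value).encode("utf-8")


def to_bytes_as_json(value):
    # JsonConverter-style (schemas disabled): serialize structured data as JSON bytes
    return json.dumps(value).encode("utf-8")


raw = '{"data": 1}'
print(to_bytes_as_string(raw))                    # the payload stays an opaque string
print(json.loads(to_bytes_as_json({"data": 1})))  # the structure survives a round trip
```

Either way bytes land on the topic; the difference is whether downstream consumers can rely on the payload having JSON structure.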
Here are some common converter classes:

- org.apache.kafka.connect.storage.StringConverter
- org.apache.kafka.connect.json.JsonConverter
- org.apache.kafka.connect.converters.ByteArrayConverter
- io.confluent.connect.json.JsonSchemaConverter (requires Schema Registry)
- io.confluent.connect.protobuf.ProtobufConverter (requires Schema Registry)
- io.confluent.connect.avro.AvroConverter (requires Schema Registry)

And you usually set a converter with:

```json
{
  "key.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter.schemas.enable": "true",
  "key.converter.schemas.enable": "true"
}
```

By also setting value.converter.schemas.enable to true you will receive the schema of the JSON message along with the payload.

Schema Registry

The Schema Registry is another component that acts as a cache for message schemas. Binary formats like Avro or Protobuf cannot be decoded by the receiver without the message's schema, and sending the schema with each message increases the message size. The purpose of the Schema Registry is to keep all schemas together in a database and let producers and consumers request a schema only when needed, so that messages can be produced to the Kafka topic without including the schema. This component is optional; it is only required when using binary formats like Avro or Protobuf.

Transforms

You can apply various transformations to messages that are processed by the connector. Common transforms include masking fields, dropping fields, replacing values, renaming fields, and more.

- Cast: cast fields, or the entire key or value, to a specific type
- DropHeaders: remove headers by name
- ExtractField: extract a specific field from a Struct or Map and include only this field in the result
- Filter: remove messages from all further processing; used with a predicate to selectively filter certain messages
- Flatten: flatten a nested data structure
- HeaderFrom: copy or move fields in the key or value to the record headers
- HoistField: wrap the entire event as a single field inside a Struct or a Map
- InsertField: add a field using either static data or record metadata
- InsertHeader: add a header using static data
- MaskField: replace a field with a valid null value for its type (0, empty string, etc.) or a custom replacement (non-empty string or numeric value only)
- RegexRouter: modify the topic of a record based on the original topic, a replacement string, and a regular expression
- ReplaceField: filter or rename fields
- SetSchemaMetadata: modify the schema name or version
- TimestampConverter: convert timestamps between different formats
- TimestampRouter: modify the topic of a record based on the original topic and timestamp; useful when a sink needs to write to different tables or indexes based on timestamps
- ValueToKey: replace the record key with a new key formed from a subset of fields in the record value

Source: https://kafka.apache.org/41/kafka-connect/user-guide/#transformations

To apply transforms you include them in the connector config:

```json
{
  "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
  "transforms": "maskSensitive",
  "transforms.maskSensitive.type": "org.apache.kafka.connect.transforms.MaskField$Value",
  "transforms.maskSensitive.fields": "sensitiveField",
  "transforms.maskSensitive.replacement": "****"
}
```

Docker Compose

You can start a pre-configured Kafka Connect instance along with a Kafka cluster, for development or playing around, using this docker-compose file.
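As a rough illustration of what a MaskField-style transform does to a record value, here is a Python sketch. The field names are hypothetical; the real transform is the Java class configured above, applied by the Connect worker itself.

```python
def mask_fields(record_value, fields, replacement="****"):
    """Return a copy of the record value with the listed fields replaced."""
    return {
        key: replacement if key in fields else value
        for key, value in record_value.items()
    }


event = {"id": 7, "email": "jane@example.com", "phone": "555-0100"}
print(mask_fields(event, fields={"email", "phone"}))
# {'id': 7, 'email': '****', 'phone': '****'}
```

Like the real single message transforms, this runs per record and leaves unlisted fields untouched.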
```yaml
services:
  broker:
    image: confluentinc/cp-kafka:8.0.0
    hostname: broker
    container_name: broker
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092'
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_PROCESS_ROLES: 'broker,controller'
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@broker:29093'
      KAFKA_LISTENERS: 'PLAINTEXT://broker:29092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:9092'
      KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
      # Replace CLUSTER_ID with a unique base64 UUID using "bin/kafka-storage.sh random-uuid"
      # See https://docs.confluent.io/kafka/operations-tools/kafka-tools.html#kafka-storage-sh
      CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'

  # schema-registry:
  #   image: confluentinc/cp-schema-registry:8.0.0
  #   hostname: schema-registry
  #   container_name: schema-registry
  #   depends_on:
  #     - broker
  #   ports:
  #     - "8081:8081"
  #   environment:
  #     SCHEMA_REGISTRY_HOST_NAME: schema-registry
  #     SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
  #     SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081

  connect:
    image: confluentinc/cp-kafka-connect:8.1.2
    hostname: connect
    container_name: connect
    depends_on:
      - broker
      # - schema-registry
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'broker:29092'
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      # CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
```

Additionally, you can also download the Kafka binary archive and run Connect with bin/connect-standalone.sh config/connect-standalone.properties.

That's all, I hope this article gave you a rough idea of Kafka Connect and its capabilities.

References

- https://kafka.apache.org/41/kafka-connect/user-guide/
- https://www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained/

NucuLabs Blog

This podcast episode with Grady Booch is amazing! The third golden age of software engineering and why it is exciting to be a software developer in these times.

https://youtu.be/OfMAtaocvJw

The third golden age of software engineering – thanks to AI, with Grady Booch

YouTube
I love that the Rails guides are available in .epub format, it's just so nice.

Brilliant European women powered progress, but the credit did not always go to them.

Melitta Bentz invented the coffee filter. History called her a ‘housewife’.
Marthe Gautier found Down Syndrome’s extra chromosome. Her boss took the spotlight.
Maria Anna Mozart composed masterpieces. Wolfgang Amadeus’ name survived, hers vanished.
Janet Sobel pioneered drip painting. Jackson Pollock was the genius.

This International Women’s Day, let’s restore their stories and give the credit they deserve!

I've just used AI to generate some legal documents (simple ones) and to ask questions about taxes; so far it has worked wonderfully. Gemini and Claude were on par.
A Day in the Life of an Ensh*ttificator

YouTube
@marcel @davidgerard hopefully it won't be followed by a bad crash
@nuculabs @davidgerard
Sounds like a sign of desperation to me.
@marcel @davidgerard
400k - 600k per influencer, that's crazy
*️⃣ Regardless of the dangers around malfunctions or loss of access rights: influencers who film enthusiastic videos of themselves having fun with AI agents are being rewarded by Microsoft and Google with around half a million each. Per influencer. Summarized by @davidgerard.
https://pivot-to-ai.com/2026/02/27/copilot-insecure-and-unhelpful-but-oh-those-influencers/
Copilot: insecure and unhelpful — but oh, those influencers!

Copilot is a collection of security holes. In the latest, Copilot was summarising any email in your sent items or drafts — including emails with confidentiality labels. This was reported in January…

Pivot to AI