Create and Manage Kafka Connect Connectors in Kubernetes

After you deploy Kafka Connect, you can create and manage connectors using Redpanda Console or the Kafka Connect REST API.

The Connectors Helm chart is a community-supported artifact. Redpanda Data does not provide enterprise support for this chart. For support, reach out to the Redpanda team in Redpanda Community Slack.

Prerequisites

Verify Kafka Connect is accessible

Before managing connectors, verify that Kafka Connect is running and accessible:

  1. Check the Pod status:

    kubectl get pod -l app.kubernetes.io/name=connectors --namespace <namespace>

    Expected output:

    NAME                                   READY   STATUS    RESTARTS   AGE
    redpanda-connectors-6d64b948f6-dk484   1/1     Running   0          5m

  2. Test the Kafka Connect REST API:

    POD_NAME=$(kubectl get pod -l app.kubernetes.io/name=connectors --namespace <namespace> -o jsonpath='{.items[0].metadata.name}')
    kubectl exec $POD_NAME --namespace <namespace> -- curl -s localhost:8083 | jq '.version'

    This should return the Kafka Connect version.

  3. Verify the available connector plugins:

    kubectl exec $POD_NAME --namespace <namespace> -- curl -s localhost:8083/connector-plugins | jq '.[].class'

    Expected output should include the MirrorMaker2 connectors:

    "org.apache.kafka.connect.mirror.MirrorCheckpointConnector"
    "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector"
    "org.apache.kafka.connect.mirror.MirrorSourceConnector"
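
The plugin check in step 3 can be scripted so that a deployment script stops early when a connector class is missing. The following is a minimal sketch, not part of the Connectors Helm chart: the function name has_connector_plugin and the NAMESPACE variable are assumptions introduced for illustration, and the function matches the quoted class name as text in the response instead of parsing the JSON.

```shell
# Hypothetical helper: succeed only if the given connector class is listed
# by the Kafka Connect worker. Assumes POD_NAME and NAMESPACE are set.
has_connector_plugin() {
  local plugin_class="$1"
  kubectl exec "$POD_NAME" --namespace "$NAMESPACE" -- \
    curl -s localhost:8083/connector-plugins |
    grep -qF "\"${plugin_class}\""
}

# Usage:
#   has_connector_plugin org.apache.kafka.connect.mirror.MirrorSourceConnector \
#     || echo "MirrorSourceConnector plugin is not installed"
```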

Manage connectors in Redpanda Console

Redpanda Console provides a web interface for managing Kafka Connect connectors. This is the recommended approach for most users because it offers a user-friendly interface and validates connector configurations.

Access Redpanda Console

By default, Redpanda Console is deployed with a ClusterIP Service. To access Redpanda Console, you can use the kubectl port-forward command to forward one of your local ports to the Pod.

The kubectl port-forward command is a development tool. To expose Redpanda Console to external traffic in a more permanent and controlled manner, use a Kubernetes Service of type LoadBalancer or NodePort.

  1. Expose Redpanda Console to your localhost:

    kubectl --namespace <namespace> port-forward svc/redpanda-console 8080:8080

    This command runs in the foreground and keeps forwarding until you stop it. To run other commands in the meantime, open another terminal window.

  2. Open Redpanda Console at http://localhost:8080.

Using the Connect interface

When you have access to Redpanda Console:

  1. Navigate to Connect in the left menu.

  2. If Kafka Connect is properly configured, you should see:

    • The Kafka Connect cluster

    • A list of available connector types

    • Any existing tasks

From here, you can:

  • View connector status and health

  • Pause/resume connectors

  • View connector configuration

  • Delete connectors

  • View connector logs and metrics

Troubleshoot Redpanda Console connectivity

If you see "Kafka Connect is not configured":

  1. Verify that the Redpanda Console configuration includes Kafka Connect settings (see the troubleshooting section of the deployment guide).

  2. Check the Redpanda Console logs for connection errors:

    kubectl logs -n <namespace> -l app.kubernetes.io/name=console --tail=20

  3. Look for successful connection messages:

    "creating Kafka connect HTTP clients and testing connectivity to all clusters"
    "tested Kafka connect cluster connectivity","successful_clusters":1,"failed_clusters":0
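
The log check in steps 2 and 3 can be combined into one command that prints only the connectivity test lines. This is a rough sketch under assumptions: the function name console_tested_connect and the NAMESPACE variable are hypothetical, and the larger --tail value of 200 is an arbitrary choice to make it more likely that the startup messages are still in range.

```shell
# Hypothetical check: print the Console log lines that report the Kafka Connect
# connectivity test. Returns non-zero if no such line is found.
# Assumes NAMESPACE is set.
console_tested_connect() {
  kubectl logs --namespace "$NAMESPACE" -l app.kubernetes.io/name=console --tail=200 |
    grep -F 'tested Kafka connect cluster connectivity'
}

# Usage:
#   console_tested_connect || echo "no connectivity test message in recent logs"
```

In the printed lines, compare the successful_clusters and failed_clusters values with the number of Kafka Connect clusters you configured.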

Manage connectors with the REST API

This section provides examples of managing connectors using the Kafka Connect REST API with cURL. This approach is useful for automation, scripting, and CI/CD pipelines.

Run all REST API commands from inside the Kafka Connect Pod, for example through kubectl exec as shown in this section. For comprehensive REST API documentation, see the Kafka Connect REST API reference.

For complex connector configurations and production deployments, consider using the Helm chart configuration options to manage connector settings.

Get connector Pod information

Get the Pod name and verify connectivity:

# Get the connector Pod name
POD_NAME=$(kubectl get pod -l app.kubernetes.io/name=connectors --namespace <namespace> -o jsonpath='{.items[0].metadata.name}')
echo "Using pod: $POD_NAME"

# Test basic connectivity
kubectl exec $POD_NAME --namespace <namespace> -- curl -s localhost:8083
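
Every command in this section repeats the same kubectl exec and curl prefix. For scripting, that prefix can be wrapped in a small shell function. The sketch below is only a convenience and is not provided by the chart: the name connect_api and the NAMESPACE variable are assumptions, and POD_NAME is expected to be set as shown above.

```shell
# Hypothetical wrapper around the kubectl exec + curl pattern used in this section.
# Assumes POD_NAME and NAMESPACE are set in the current shell.
# The first argument is the REST path; any further arguments are passed to curl.
connect_api() {
  local api_path="$1"
  shift
  kubectl exec "$POD_NAME" --namespace "$NAMESPACE" -- \
    curl -s "localhost:8083${api_path}" "$@"
}

# Usage:
#   connect_api /connectors | jq
#   connect_api /connectors/heartbeat-connector/pause -X PUT
```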

View version of Kafka Connect

kubectl exec $POD_NAME --namespace <namespace> -- curl -s localhost:8083 | jq

Expected output:

{
  "version": "3.8.0",
  "commit": "771b9576b00ecf5b",
  "kafka_cluster_id": "redpanda.3e2649b0-f84c-4c03-b5e3-d6d1643f65b2"
}

View available connector plugins

kubectl exec $POD_NAME --namespace <namespace> -- curl -s localhost:8083/connector-plugins | jq

View cluster worker information

kubectl exec $POD_NAME --namespace <namespace> -- curl -s localhost:8083 | jq

This returns basic cluster information including version and Kafka cluster ID.

View all connectors

# List connector names only
kubectl exec $POD_NAME --namespace <namespace> -- curl -s localhost:8083/connectors | jq

# View connectors with detailed status and configuration
kubectl exec $POD_NAME --namespace <namespace> -- curl -s 'localhost:8083/connectors?expand=status&expand=info' | jq

Create a connector

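To create a connector, send a POST request with a JSON body that contains the connector name and its configuration. curl sends a POST request by default when you pass --data-raw: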

kubectl exec $POD_NAME --namespace <namespace> -- curl -s "localhost:8083/connectors" \
  -H 'Content-Type: application/json' \
  --data-raw '{
    "name": "heartbeat-connector",
    "config": {
      "connector.class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
      "heartbeats.topic.replication.factor": "1",
      "replication.factor": "1",
      "source.cluster.alias": "source",
      "target.cluster.alias": "target",
      "source.cluster.bootstrap.servers": "redpanda.redpanda.svc.cluster.local:9093",
      "target.cluster.bootstrap.servers": "redpanda.redpanda.svc.cluster.local:9093",
      "emit.heartbeats.interval.seconds": "30"
    }
  }' | jq

Example for MirrorSourceConnector:

kubectl exec $POD_NAME --namespace <namespace> -- curl -s "localhost:8083/connectors" \
  -H 'Content-Type: application/json' \
  --data-raw '{
    "name": "mirror-source-connector",
    "config": {
      "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
      "source.cluster.alias": "source",
      "target.cluster.alias": "target",
      "source.cluster.bootstrap.servers": "source-cluster:9092",
      "target.cluster.bootstrap.servers": "redpanda.redpanda.svc.cluster.local:9093",
      "topics": "my-topic",
      "replication.factor": "1"
    }
  }' | jq
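
For automation, it is often easier to keep the connector definition in a local JSON file than to inline it in the command. One possible approach, sketched here under assumptions, is to stream the file into the Pod over standard input. The function name create_connector_from_file, the NAMESPACE variable, and the file name in the usage comment are hypothetical. The file is expected to have the same name and config structure as the examples above.

```shell
# Hypothetical helper: POST a local JSON file to the Kafka Connect REST API.
# kubectl exec -i forwards local stdin into the Pod, and curl reads the
# request body from stdin when given --data-binary @-.
# Assumes POD_NAME and NAMESPACE are set.
create_connector_from_file() {
  local config_file="$1"
  kubectl exec -i "$POD_NAME" --namespace "$NAMESPACE" -- \
    curl -s -X POST localhost:8083/connectors \
      -H 'Content-Type: application/json' \
      --data-binary @- < "$config_file"
}

# Usage (heartbeat-connector.json is a placeholder file name):
#   create_connector_from_file heartbeat-connector.json | jq
```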

View connector details

# View connector configuration
kubectl exec $POD_NAME --namespace <namespace> -- curl -s localhost:8083/connectors/<connector-name>/config | jq

# View connector status
kubectl exec $POD_NAME --namespace <namespace> -- curl -s localhost:8083/connectors/<connector-name>/status | jq

# View connector tasks
kubectl exec $POD_NAME --namespace <namespace> -- curl -s localhost:8083/connectors/<connector-name>/tasks | jq

Example:

kubectl exec $POD_NAME --namespace <namespace> -- curl -s localhost:8083/connectors/heartbeat-connector/status | jq

Update connector configuration

To update a connector, send a PUT request with the complete configuration. The request body replaces the existing configuration, so include every setting and not only the ones you change. This example sets emit.heartbeats.interval.seconds to 60:

kubectl exec $POD_NAME --namespace <namespace> -- curl -s "localhost:8083/connectors/<connector-name>/config" \
  -H 'Content-Type: application/json' \
  -X PUT \
  --data-raw '{
    "connector.class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
    "heartbeats.topic.replication.factor": "1",
    "replication.factor": "1",
    "source.cluster.alias": "source",
    "target.cluster.alias": "target",
    "source.cluster.bootstrap.servers": "redpanda.redpanda.svc.cluster.local:9093",
    "target.cluster.bootstrap.servers": "redpanda.redpanda.svc.cluster.local:9093",
    "emit.heartbeats.interval.seconds": "60"
  }' | jq

Pause, resume, and restart connectors

# Pause a connector
kubectl exec $POD_NAME --namespace <namespace> -- curl -s "localhost:8083/connectors/<connector-name>/pause" -X PUT

# Resume a connector
kubectl exec $POD_NAME --namespace <namespace> -- curl -s "localhost:8083/connectors/<connector-name>/resume" -X PUT

# Restart a connector
kubectl exec $POD_NAME --namespace <namespace> -- curl -s "localhost:8083/connectors/<connector-name>/restart" -X POST

Delete a connector

kubectl exec $POD_NAME --namespace <namespace> -- curl -s "localhost:8083/connectors/<connector-name>" -X DELETE

Example:

kubectl exec $POD_NAME --namespace <namespace> -- curl -s "localhost:8083/connectors/heartbeat-connector" -X DELETE

Troubleshoot REST API issues

Common error responses

  • 404 Not Found: Connector doesn’t exist

  • 409 Conflict: Connector with the same name already exists

  • 400 Bad Request: Invalid configuration
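
The commands in this guide use curl -s, which prints the response body but not the HTTP status code. A script can read the status code with curl's -w '%{http_code}' option. The following sketch is illustrative only: the function names connect_status_code and explain_status_code and the NAMESPACE variable are assumptions, and the messages cover only the codes listed above.

```shell
# Hypothetical helper: print only the HTTP status code of a request.
# -o /dev/null discards the body; -w '%{http_code}' prints the status code.
# Assumes POD_NAME and NAMESPACE are set.
connect_status_code() {
  local api_path="$1"
  shift
  kubectl exec "$POD_NAME" --namespace "$NAMESPACE" -- \
    curl -s -o /dev/null -w '%{http_code}' "localhost:8083${api_path}" "$@"
}

# Hypothetical helper: translate the codes listed above into a short message.
explain_status_code() {
  case "$1" in
    2??) echo "success" ;;
    400) echo "invalid configuration" ;;
    404) echo "connector does not exist" ;;
    409) echo "connector with the same name already exists" ;;
    *)   echo "unexpected status: $1" ;;
  esac
}

# Usage:
#   explain_status_code "$(connect_status_code /connectors/heartbeat-connector)"
```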

Debugging connector failures

  1. Check connector status for error details:

    kubectl exec $POD_NAME --namespace <namespace> -- curl -s localhost:8083/connectors/<connector-name>/status | jq '.connector.trace'

  2. View task-level errors:

    kubectl exec $POD_NAME --namespace <namespace> -- curl -s localhost:8083/connectors/<connector-name>/status | jq '.tasks[].trace'

  3. Check Kafka Connect logs:

    kubectl logs -n <namespace> -l app.kubernetes.io/name=connectors --tail=50

    For more detailed logging configuration, see logging configuration in the deployment guide.

Connector management best practices

Configuration validation

Before creating a connector, validate the configuration:

kubectl exec $POD_NAME --namespace <namespace> -- curl -s "localhost:8083/connector-plugins/<connector-class>/config/validate" \
  -H 'Content-Type: application/json' \
  -X PUT \
  --data-raw '{"connector.class": "<connector-class>", ...}' | jq
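
The validation response includes an error_count field that a script can use as a gate before creating a connector. The sketch below is one possible way to do this and rests on several assumptions: the function name validate_connector_config and the NAMESPACE variable are hypothetical, the configuration is read from a local file that holds a flat JSON object including connector.class, and error_count is extracted with sed so that the example does not depend on jq.

```shell
# Hypothetical helper: validate a config file and fail if Kafka Connect
# reports any errors. Assumes POD_NAME and NAMESPACE are set.
validate_connector_config() {
  local connector_class="$1"
  local config_file="$2"
  local response error_count
  response=$(kubectl exec -i "$POD_NAME" --namespace "$NAMESPACE" -- \
    curl -s -X PUT "localhost:8083/connector-plugins/${connector_class}/config/validate" \
      -H 'Content-Type: application/json' \
      --data-binary @- < "$config_file")
  # Extract the error_count value from the response text.
  error_count=$(printf '%s\n' "$response" |
    sed -n 's/.*"error_count"[[:space:]]*:[[:space:]]*\([0-9][0-9]*\).*/\1/p' |
    head -n 1)
  echo "error_count=${error_count:-unknown}"
  # Succeed only when the worker reported zero errors.
  [ "$error_count" = "0" ]
}

# Usage (heartbeat-config.json is a placeholder file name):
#   validate_connector_config \
#     org.apache.kafka.connect.mirror.MirrorHeartbeatConnector heartbeat-config.json
```

When the function fails, rerun the validation command above with jq to read the per-setting error messages in the response.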

Monitoring connector health

Regularly check connector status:

# Check all connectors at once
kubectl exec $POD_NAME --namespace <namespace> -- curl -s 'localhost:8083/connectors?expand=status' | \
  jq '.[] | {name: .status.name, state: .status.connector.state, tasks: [.status.tasks[].state]}'
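
For a scheduled check or a CI step, the same endpoint can be reduced to a pass or fail result. The following is a minimal sketch, assuming that a FAILED state on any connector or task should fail the check. The function name no_failed_connectors and the NAMESPACE variable are hypothetical. The function matches the state as text and does not distinguish a healthy cluster from one that returned no response, so treat it as a starting point.

```shell
# Hypothetical check: fail if any connector or task reports the FAILED state.
# Matches the state as text in the JSON response instead of parsing it.
# Assumes POD_NAME and NAMESPACE are set.
no_failed_connectors() {
  if kubectl exec "$POD_NAME" --namespace "$NAMESPACE" -- \
      curl -s 'localhost:8083/connectors?expand=status' |
      grep -q '"state"[[:space:]]*:[[:space:]]*"FAILED"'; then
    echo "at least one connector or task is FAILED" >&2
    return 1
  fi
}

# Usage:
#   no_failed_connectors && echo "no FAILED connectors or tasks"
```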

For comprehensive monitoring and alerting, see the Kafka Connect monitoring guide.

Backup connector configurations

Save connector configurations for disaster recovery:

# Export all connector configs as a single JSON array
kubectl exec $POD_NAME --namespace <namespace> -- curl -s 'localhost:8083/connectors?expand=info' | \
  jq '[.[] | {name: .info.name, config: .info.config}]' > connectors-backup.json
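
If backups run on a schedule, writing each export to a timestamped file avoids overwriting the previous one. This sketch is an assumption about how such a job might look and is not a documented feature: the function name backup_connectors, the file naming scheme, and the NAMESPACE variable are hypothetical. It stores the unfiltered expand=info response and discards empty results.

```shell
# Hypothetical helper: write a timestamped backup and print its path.
# Assumes POD_NAME and NAMESPACE are set. The target directory defaults to
# the current directory.
backup_connectors() {
  local backup_dir="${1:-.}"
  local backup_file
  backup_file="${backup_dir}/connectors-backup-$(date +%Y%m%d-%H%M%S).json"
  kubectl exec "$POD_NAME" --namespace "$NAMESPACE" -- \
    curl -s 'localhost:8083/connectors?expand=info' > "$backup_file" || return 1
  # Refuse to keep an empty file, which would indicate a failed request.
  if [ ! -s "$backup_file" ]; then
    rm -f "$backup_file"
    return 1
  fi
  echo "$backup_file"
}

# Usage:
#   backup_connectors ./backups
```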