Apache Kafka Connect

Learn how to use the Jikkou Extension Provider for Apache Kafka Connect.

Here, you will find information on how to use the Kafka Connect extension for Jikkou.

More information:

1 - Configuration

Learn how to configure the extensions for Kafka Connect.

This section describes how to configure the Kafka Connect extension.

Extension

The Kafka Connect extension can be enabled or disabled through the following configuration property:

# Example
jikkou {
  extensions.provider.kafkaconnect.enabled = true
}

Configuration

You can configure the properties used to connect to a Kafka Connect cluster through the Jikkou client configuration property jikkou.kafkaConnect.

Example:

jikkou {
  kafkaConnect {
    # Array of Kafka Connect cluster configurations.
    clusters = [
      {
        # Name of the cluster (e.g., dev, staging, production, etc.)
        name = "localhost"
        # URL of the Kafka Connect service
        url = "http://localhost:8083"
        # Method to use for authenticating on Kafka Connect. Available values are: [none, basicauth, ssl]
        authMethod = none
        # Use when 'authMethod' is 'basicauth' to specify the username for Authorization Basic header
        basicAuthUser = null
        # Use when 'authMethod' is 'basicauth' to specify the password for Authorization Basic header
        basicAuthPassword = null
        # Enable debug logging
        debugLoggingEnabled = false
  
        # Ssl Config: Use when 'authMethod' is 'ssl'
        # The location of the key store file.
        sslKeyStoreLocation = "/certs/registry.keystore.jks"
        # The file format of the key store file.
        sslKeyStoreType = "JKS"
        # The password for the key store file.
        sslKeyStorePassword = "password"
        # The password of the private key in the key store file.
        sslKeyPassword = "password"
        # The location of the trust store file.
        sslTrustStoreLocation = "/certs/registry.truststore.jks"
        # The file format of the trust store file.
        sslTrustStoreType = "JKS"
        # The password for the trust store file.
        sslTrustStorePassword = "password"
        # Specifies whether to ignore the hostname verification.
        sslIgnoreHostnameVerification = true
      }
    ]
  }
}
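
As a complement to the full listing above, the following shell sketch writes a minimal cluster entry that uses basic authentication. It only uses keys that appear in the listing above. The file name jikkou-basicauth.conf, the cluster name, the URL, and the credentials are placeholders chosen for illustration, not values taken from Jikkou itself.

```shell
#!/bin/sh
# Sketch: write a Jikkou client configuration declaring one Kafka Connect
# cluster that authenticates with HTTP basic auth.
# Assumptions: the file name, cluster name, URL, and credentials below are
# placeholders; replace them with your own values.
CONFIG_FILE="jikkou-basicauth.conf"

# The quoted 'EOF' delimiter keeps the shell from expanding anything
# inside the configuration body.
cat > "$CONFIG_FILE" <<'EOF'
jikkou {
  extensions.provider.kafkaconnect.enabled = true
  kafkaConnect {
    clusters = [
      {
        name = "staging"
        url = "https://connect.staging.example.com:8083"
        authMethod = basicauth
        basicAuthUser = "connect-user"
        basicAuthPassword = "change-me"
      }
    ]
  }
}
EOF

echo "Wrote $CONFIG_FILE"
```

Load the resulting file the same way you load your regular Jikkou client configuration. In a real deployment, avoid storing the password in plain text in a file that is committed to version control.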

2 - Resources

Learn how to use the resources provided by the Kafka Connect extension.

Here, you will find the list of resources supported by the Kafka Connect Extension.

Kafka Connect Resources

More information:

2.1 - KafkaConnectors

Learn how to manage Kafka Connectors.

This section describes the resource definition format for KafkaConnector entities, which can be used to define the configuration and status of connectors you plan to create and manage on specific Kafka Connect clusters.

Definition Format of KafkaConnector

Below is the overall structure of the KafkaConnector resource.

---
apiVersion: "kafka.jikkou.io/v1beta1"  # The api version (required)
kind: "KafkaConnector"                 # The resource kind (required)
metadata:
  name: <string>                       # The name of the connector (required)
  labels:
    # Name of the Kafka Connect cluster to create the connector instance in (required).
    kafka.jikkou.io/connect-cluster: <string>
  annotations:
    # Override client properties to connect to Kafka Connect cluster (optional).
    jikkou.io/config-override: | 
      <json>
spec:
  connectorClass: <string>            # Name or alias of the class for this connector.
  tasksMax: <integer>                 # The maximum number of tasks for the Kafka Connector.
  config:                             # Configuration properties of the connector.
    <key>: <value>
  state: <string>                     # The state the connector should be in. Defaults to running.

See below for details about all these fields.

Metadata

metadata.name [required]

The name of the connector.

labels.kafka.jikkou.io/connect-cluster [required]

The name of the Kafka Connect cluster to create the connector instance in. The cluster name must be one of the clusters declared in Jikkou’s kafkaConnect.clusters[] configuration setting (see: Configuration).

annotations.jikkou.io/config-override [optional]

The JSON client configuration used to override the settings for connecting to the Kafka Connect cluster. Properties passed through this annotation take precedence over the cluster properties defined in Jikkou’s configuration (see: Configuration).

apiVersion: "kafka.jikkou.io/v1beta1"
kind: "KafkaConnector"
metadata:
  name: "my-connector"
  labels:
    kafka.jikkou.io/connect-cluster: "my-connect-cluster"
  annotations:
    jikkou.io/config-override: |
      { "url": "http://localhost:8083" }      

Specification

spec.connectorClass [required]

The name or alias of the class for this connector.

spec.tasksMax [optional]

The maximum number of tasks for the Kafka Connector. Default is 1.

spec.config [required]

The connector’s configuration properties.

spec.state [optional]

The state the connector should be in. Defaults to running.

Below are the valid values:

  • running: Transition the connector and its tasks to RUNNING state.
  • paused: Pause the connector and its tasks, which stops message processing until the connector is resumed.
  • stopped: Completely shut down the connector and its tasks. The connector config remains present in the config topic of the cluster (if running in distributed mode), unmodified.
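
To illustrate the state field, the shell sketch below writes a resource file that declares a FileStreamSink connector in the paused state. The file name kafka-connector-paused.yaml and the connector and cluster names are placeholders. The state value is written in lowercase, as in the list above.

```shell
#!/bin/sh
# Sketch: declare a connector that should be paused rather than running.
# Assumptions: the file name and the connector/cluster names are placeholders.
RESOURCE_FILE="kafka-connector-paused.yaml"

cat > "$RESOURCE_FILE" <<'EOF'
---
apiVersion: "kafka.jikkou.io/v1beta1"
kind: "KafkaConnector"
metadata:
  name: "local-file-sink"
  labels:
    kafka.jikkou.io/connect-cluster: "my-connect-cluster"
spec:
  connectorClass: "FileStreamSink"
  tasksMax: 1
  config:
    file: "/tmp/test.sink.txt"
    topics: "connect-test"
  # One of: running, paused, stopped.
  state: "paused"
EOF

echo "Wrote $RESOURCE_FILE"
```

Changing the state value back to running in the same file describes the resumed connector. Nothing else in the resource needs to change.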

Examples

The following is an example of a resource describing a Kafka connector:

---
# Example: file: kafka-connector-filestream-sink.yaml
apiVersion: "kafka.jikkou.io/v1beta1"
kind: "KafkaConnector"
metadata:
  name: "local-file-sink"
  labels:
    kafka.jikkou.io/connect-cluster: "my-connect-cluster"
spec:
  connectorClass: "FileStreamSink"
  tasksMax: 1
  config:
    file: "/tmp/test.sink.txt"
    topics: "connect-test"
  state: "RUNNING"

Listing KafkaConnector

You can retrieve the state of Kafka Connector instances running on your Kafka Connect clusters using the jikkou get kafkaconnectors (or jikkou get kc) command.

Usage

$ jikkou get kc --help

Usage:

Get all 'KafkaConnector' resources.

jikkou get kafkaconnectors [-hV] [--expand-status] [-o=<format>]
                           [-s=<expressions>]...

Description:

Use jikkou get kafkaconnectors when you want to describe the state of all
resources of type 'KafkaConnector'.

Options:

      --expand-status     Retrieves additional information about the status of
                            the connector and its tasks.
  -h, --help              Show this help message and exit.
  -o, --output=<format>   Prints the output in the specified format. Allowed
                            values: json, yaml (default yaml).
  -s, --selector=<expressions>
                          The selector expression use for including or
                            excluding resources.
  -V, --version           Print version information and exit.

(The output from your current Jikkou version may be different from the above example.)

Examples

(command)

$ jikkou get kc --expand-status 

(output)

apiVersion: "kafka.jikkou.io/v1beta1"
kind: "KafkaConnector"
metadata:
  name: "local-file-sink"
  labels:
    kafka.jikkou.io/connect-cluster: "localhost"
spec:
  connectorClass: "FileStreamSink"
  tasksMax: 1
  config:
    file: "/tmp/test.sink.txt"
    topics: "connect-test"
  state: "RUNNING"
status:
  connectorStatus:
    name: "local-file-sink"
    connector:
      state: "RUNNING"
      worker_id: "localhost:8083"
    tasks:
      - id: 1
        state: "RUNNING"
        worker_id: "localhost:8083"

The status.connectorStatus field provides the connector status, as reported by the Kafka Connect REST API.

3 - Validations

Learn how to use the validations provided by the Kafka Connect extension.

Jikkou ships with the following built-in validations:

4 - Annotations

Learn how to use the metadata annotations provided by the Kafka Connect extension.

This section lists well-known annotations that have defined semantics. They can be attached to Kafka Connect resources through the metadata.annotations field and consumed as needed by extensions (e.g., validations, transformations, controllers, collectors).

List of built-in annotations

5 - Labels

Learn how to use the metadata labels provided by the Kafka Connect extension.

This section lists well-known labels that have defined semantics. They can be attached to Kafka Connect resources through the metadata.labels field and consumed as needed by extensions (e.g., validations, transformations, controllers, collectors).

Labels

kafka.jikkou.io/connect-cluster

# Example
---
apiVersion: "kafka.jikkou.io/v1beta1"
kind: "KafkaConnector"
metadata:
  labels:
    kafka.jikkou.io/connect-cluster: 'my-connect-cluster'

The value of this label defines the name of the Kafka Connect cluster to create the connector instance in. The cluster name must be one of the clusters declared in Jikkou’s kafkaConnect.clusters[] configuration setting (see: Configuration).

6 - Actions

Learn how to use the actions provided by the Extension Provider for Kafka Connect.

Here, you will find the list of actions provided by the Extension Provider for Kafka Connect.

Kafka Connect Action

More information:

6.1 - KafkaConnectRestartConnectors

Learn how to use the KafkaConnectRestartConnectors action.

The KafkaConnectRestartConnectors action allows a user to restart all or just the failed Connector and Task instances for one or multiple named connectors.

Usage (CLI)

Usage:

Execute the action.

jikkou action KafkaConnectRestartConnectors execute [-hV] [--include-tasks]
[--only-failed] [--connect-cluster=PARAM] [--logger-level=<level>]
[-o=<format>] [--connector-name=PARAM]...

DESCRIPTION:

The KafkaConnectRestartConnectors action a user to restart all or just the
failed Connector and Task instances for one or multiple named connectors.

OPTIONS:

      --connect-cluster=PARAM
                          The name of the connect cluster.
      --connector-name=PARAM
                          The connector's name.
  -h, --help              Show this help message and exit.
      --include-tasks     Specifies whether to restart the connector instance
                            and task instances (includeTasks=true) or just the
                            connector instance (includeTasks=false)
      --logger-level=<level>
                          Specify the log level verbosity to be used while
                            running a command.
                          Valid level values are: TRACE, DEBUG, INFO, WARN,
                            ERROR.
                          For example, `--logger-level=INFO`
  -o, --output=<format>   Prints the output in the specified format. Allowed
                            values: JSON, YAML (default YAML).
      --only-failed       Specifies whether to restart just the instances with
                            a FAILED status (onlyFailed=true) or all instances
                            (onlyFailed=false)
  -V, --version           Print version information and exit.

Examples

Restart all connectors for all Kafka Connect clusters.

jikkou action kafkaconnectrestartconnectors execute

(output)

---
kind: "ApiActionResultSet"
apiVersion: "core.jikkou.io/v1"
metadata:
  labels: {}
  annotations: {}
results:
- status: "SUCCEEDED"
  data:
    apiVersion: "kafka.jikkou.io/v1beta1"
    kind: "KafkaConnector"
    metadata:
      name: "local-file-sink"
      labels:
        kafka.jikkou.io/connect-cluster: "my-connect-cluster"
      annotations: {}
    spec:
      connectorClass: "FileStreamSink"
      tasksMax: 1
      config:
        file: "/tmp/test.sink.txt"
        topics: "connect-test"
      state: "RUNNING"
    status:
      connectorStatus:
        name: "local-file-sink"
        connector:
          state: "RUNNING"
          workerId: "connect:8083"
        tasks:
        - id: 0
          state: "RUNNING"
          workerId: "connect:8083"

Restart all connectors with a FAILED status on all Kafka Connect clusters.

jikkou action kafkaconnectrestartconnectors execute \
--only-failed

Restart a specific connector and its tasks on a given Kafka Connect cluster.

jikkou action kafkaconnectrestartconnectors execute \
--connect-cluster my-connect-cluster \
--connector-name local-file-sink \
--include-tasks
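
The usage above marks --connector-name as repeatable. The following shell sketch only prints the command (a dry run) and shows one way to assemble a restart of several named connectors on a single cluster. The build_restart_cmd helper and the local-file-source connector name are hypothetical; the options are the ones listed in the usage above.

```shell
#!/bin/sh
# Sketch: build (but do not run) a restart command for several connectors.
# Assumptions: build_restart_cmd is a hypothetical helper, and the connector
# names are placeholders. Names are expected to contain no whitespace,
# because the command is assembled as a plain string.
build_restart_cmd() {
  cluster="$1"
  shift
  cmd="jikkou action kafkaconnectrestartconnectors execute"
  cmd="$cmd --connect-cluster $cluster --only-failed --include-tasks"
  # Repeat --connector-name once per remaining argument.
  for name in "$@"; do
    cmd="$cmd --connector-name $name"
  done
  printf '%s\n' "$cmd"
}

RESTART_CMD=$(build_restart_cmd my-connect-cluster local-file-sink local-file-source)
echo "$RESTART_CMD"
```

Once the printed command looks right, run it directly in your terminal.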