Documentation

1 - Overview

What is Jikkou?

Welcome to the Jikkou documentation! Jikkou means “execution (e.g. of a plan) or actual state (of things)” in Japanese.

What Is Jikkou?

Jikkou is a powerful, flexible open-source framework that enables self-serve resource provisioning. It allows developers and DevOps teams to easily manage, automate, and provision all the resources needed for their Apache Kafka® platform.

Jikkou was born with the aim to streamline day-to-day operations on Apache Kafka®, ensuring that platform governance is no longer a tedious and boring task for both developers and administrators.

What Are The Use Cases?

Jikkou is primarily used as a GitOps solution for Kafka configuration management.

Here are some of the various use cases we’ve observed in different projects:

  • Topic as a Service: Build a self-serve platform for managing Kafka topics.
  • ACL Management: Centrally manage all ACLs of an Apache Kafka cluster.
  • Kafka Connectors Management: Deploy and manage Kafka Connect connectors.
  • Ad Hoc Changes: Apply ad hoc changes as needed.
  • Audit: Easily check configurations of topics, brokers, or identify divergences between different environments.
  • Kafka Configuration Backup: Periodically export all critical configurations of your Kafka cluster.
  • Configuration Replication: Replicate the Kafka configuration from one cluster to another.

How Does Jikkou Work?

Jikkou offers flexibility in deployment, functioning either as a simple CLI (Command Line Interface) or as a REST server, based on your requirements.

By adopting a stateless approach, Jikkou does not store any internal state. Instead, it leverages your platforms or services as the source of truth. This design enables seamless integration with other solutions, such as Ansible and Terraform, or allows for ad hoc use for specific tasks, making Jikkou incredibly flexible and versatile.

Is Jikkou For Me?

Jikkou can be implemented regardless of the size of your team or data platform.

Small Development Team

Jikkou is particularly useful for small development teams looking to quickly automate the creation and maintenance of their topics without having to implement a complex solution that requires learning a new technology or language.

Centralized Infrastructure (DevOps) Team

Jikkou can be very effective in larger contexts, where the configuration of the Kafka Topics, ACLs, and Quotas for your entire data platform is managed by a single, centralized DevOps team.

Decentralized Data Product Teams

In an organization adopting Data Mesh principles, Jikkou can be leveraged in a decentralized way by each of your Data Teams to manage all the assets (e.g. Topics, ACLs, Schemas, Connectors, etc.) necessary to expose and manage their Data Products.

Can I Use Jikkou with my Apache Kafka vendor?

Jikkou can be used with any Apache Kafka infrastructure, including:

2 - Install Jikkou

This guide shows how to install the Jikkou CLI.

Jikkou can be installed either from source, or from releases.

The latest stable release of Jikkou (x86) for Linux and macOS can be retrieved via SDKMan!:

sdk install jikkou

From The Jikkou Project

Releases

Every released version of Jikkou is available:

These are the official ways to get Jikkou releases that you download and install manually.

Install From Release distribution

  1. Download your desired version
  2. Unpack it (unzip jikkou-0.34.0-linux-x86_64.zip)
  3. Move the unpacked directory to the desired destination (mv jikkou-0.34.0-linux-x86_64 /opt/jikkou)
  4. Add the executable to your PATH (export PATH=$PATH:/opt/jikkou/bin)

From there, you should be able to run the client: jikkou help.

It is recommended to install the bash/zsh completion script jikkou_completion:

wget https://raw.githubusercontent.com/streamthoughts/jikkou/master/jikkou_completion
. jikkou_completion

or alternatively, run the following command to generate the completion script.

$ source <(jikkou generate-completion)

Using Docker Image

# Create a Jikkou configfile (i.e., jikkouconfig)
cat << EOF >jikkouconfig
{
  "currentContext" : "localhost",
  "localhost" : {
    "configFile" : null,
    "configProps" : {
      "kafka.client.bootstrap.servers" : "localhost:9092"
    }
  }
}
EOF

# Run Docker
docker run -it \
--net host \
--mount type=bind,source="$(pwd)"/jikkouconfig,target=/etc/jikkou/config \
streamthoughts/jikkou:latest -V

Development Builds

In addition to releases you can download or install development snapshots of Jikkou.

From Docker Hub

Docker images are built and pushed to Docker Hub from the latest main branch. They are not official releases, and may not be stable. However, they offer the opportunity to test the cutting edge features.

$ docker run -it streamthoughts/jikkou:main

From Source (Linux, macOS)

Building Jikkou from source is slightly more work, but is the best way to go if you want to test the latest (pre-release) Jikkou version.

Prerequisites

To build the project you will need:

  • Java 21 (i.e. the $JAVA_HOME environment variable is configured).
  • GraalVM 22.1.0 or newer to create the native executable.
  • TestContainer to run integration tests.

Create Native Executable

# Build and run all tests
./mvnw clean verify -Pnative

You can then execute the native executable with: ./jikkou-cli/target/jikkou-$PROJECT_VERSION-runner

Build Debian Package (.deb)

# Build and run all tests
./mvnw clean package -Pnative
./mvnw package -Pdeb

You can then install the package with: sudo dpkg -i ./dist/jikkou-$PROJECT_VERSION-linux-x86_64.deb

NOTE: Jikkou will install itself in the directory: /opt/jikkou

Build RPM Package

# Build and run all tests
./mvnw clean package -Pnative
./mvnw package -Prpm

The RPM package will be available in the ./target/rpm/jikkou/RPMS/noarch/ directory.

3 - Jikkou Tutorials

Learn common Jikkou tasks and use cases.

Try the tutorials for common Jikkou tasks and use cases.

3.1 - Jikkou Getting Started

This guide covers how you can quickly get started using Jikkou.

This document will guide you through setting up Jikkou in a few minutes and managing your first resources with Jikkou.

Prerequisites

The following prerequisites are required to use Jikkou successfully.

Make sure the following is installed:

  • An Apache Kafka cluster.
  • Java 21 (not required when using the binary version).

Start your local Apache Kafka Cluster

You must have access to an Apache Kafka cluster to use Jikkou. In general, the latest version of Jikkou is built to work with the most recent version of Apache Kafka.

Make sure Docker is up and running.

Then, run the following commands:

$ git clone https://github.com/streamthoughts/jikkou
$ cd jikkou
$ ./up              # use ./down for stopping the docker-compose stack

Run Jikkou

Download the latest distribution (For Linux)

Run the following commands to install the latest version:

wget https://github.com/streamthoughts/jikkou/releases/download/v0.34.0/jikkou-0.34.0-linux-x86_64.zip && \
unzip jikkou-0.34.0-linux-x86_64.zip  && \
cp jikkou-0.34.0-linux-x86_64/bin/jikkou $HOME/.local/bin && \
source <(jikkou generate-completion) && \
jikkou --version

For more details, or for other options, see the installation guide.

Configure Jikkou for your local Apache Kafka cluster

Set configuration context for localhost

jikkou config set-context localhost --config-props=kafka.client.bootstrap.servers=localhost:9092

Show the complete configuration.

jikkou config view --name localhost

Finally, let’s check if your cluster is accessible:

jikkou health get kafka

(output)

If OK, you should get an output similar to:

---
name: "kafka"
status: "UP"
details:
  resource: "urn:kafka:cluster:id:KRzY-7iRTHy4d1UVyNlcuw"
  brokers:
    - id: "1"
      host: "localhost"
      port: 9092

Create your first topics

First, create a resource YAML file describing the topics you want to create on your cluster:

file: kafka-topics.yaml

apiVersion: "kafka.jikkou.io/v1beta2"
kind: "KafkaTopicList"
items:
  - metadata:
      name: 'my-first-topic'
    spec:
      partitions: 5
      replicationFactor: 1
      configs:
        cleanup.policy: 'compact'

  - metadata:
      name: 'my-second-topic'
    spec:
      partitions: 4
      replicationFactor: 1
      configs:
        cleanup.policy: 'delete'

Then, run the following Jikkou command to trigger the topic creation on the cluster:

jikkou create -f ./kafka-topics.yaml

(output)

TASK [ADD] Add topic 'my-first-topic' (partitions=5, replicas=-1, configs=[cleanup.policy=compact]) - CHANGED
  {
    "changed": true,
    "end": 1683986528117,
    "resource": {
      "name": "my-first-topic",
      "partitions": {
        "after": 5
      },
      "replicas": {
        "after": -1
      },
      "configs": {
        "cleanup.policy": {
          "after": "compact",
          "operation": "ADD"
        }
      },
      "operation": "ADD"
    },
    "failed": false,
    "status": "CHANGED"
  }
  TASK [ADD] Add topic 'my-second-topic' (partitions=4, replicas=-1, configs=[cleanup.policy=delete]) - CHANGED
  {
    "changed": true,
    "end": 1683986528117,
    "resource": {
      "name": "my-second-topic",
      "partitions": {
        "after": 4
      },
      "replicas": {
        "after": -1
      },
      "configs": {
        "cleanup.policy": {
          "after": "delete",
          "operation": "ADD"
        }
      },
      "operation": "ADD"
    },
    "failed": false,
    "status": "CHANGED"
  }
EXECUTION in 772ms
ok: 0, created: 2, altered: 0, deleted: 0 failed: 0

Finally, you can verify that the topics were created on the cluster:

jikkou get kafkatopics --default-configs

Update Kafka Topics

Edit your kafka-topics.yaml to add a retention.ms: 86400000 property to the defined topics.

Then, run the following command.

jikkou update -f ./kafka-topics.yaml
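
The value 86400000 is one day expressed in milliseconds. When several topics need different retention periods, deriving the number from a readable duration helps avoid arithmetic slips. The helper below is a small illustrative Python sketch; the function name `retention_ms` is hypothetical and not part of Jikkou.

```python
from datetime import timedelta


def retention_ms(**duration) -> int:
    """Convert a duration (days=, hours=, minutes=, ...) to whole milliseconds."""
    # Dividing two timedeltas with // yields an exact integer.
    return timedelta(**duration) // timedelta(milliseconds=1)


print(retention_ms(days=1))    # value used for retention.ms above
print(retention_ms(hours=12))
```

The printed number can be pasted as the `retention.ms` value under `configs` in kafka-topics.yaml.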

Delete Kafka Topics

To delete all topics defined in kafka-topics.yaml, add the annotation jikkou.io/delete: true as follows:

apiVersion: "kafka.jikkou.io/v1beta2"
kind: "KafkaTopicList"
metadata:
  annotations:
    # Annotation to specify that all resources must be deleted.
    jikkou.io/delete: true
items:
  - metadata:
      name: 'my-first-topic'
    spec:
      partitions: 5
      replicationFactor: 1
      configs:
        cleanup.policy: 'compact'

  - metadata:
      name: 'my-second-topic'
    spec:
      partitions: 4
      replicationFactor: 1
      configs:
        cleanup.policy: 'delete'

Then, run the following command:

$ jikkou apply \
    --files ./kafka-topics.yaml \
    --selector "metadata.name MATCHES (my-.*-topic)" \
    --dry-run

The --dry-run option lets you check the changes that will be made before applying them.

Now, rerun the above command without the --dry-run option to definitively delete the topics.
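
The selector used above keeps only the resources whose name matches the expression `my-.*-topic`. To get a feel for which names that expression covers, here is a short Python sketch. It assumes the expression has to match the whole name, which may differ from how Jikkou actually evaluates `MATCHES`; the function `select` is hypothetical.

```python
import re

# Same expression as in the --selector option above.
PATTERN = re.compile(r"my-.*-topic")


def select(names):
    """Keep the names that fully match PATTERN (assumed semantics)."""
    return [name for name in names if PATTERN.fullmatch(name)]


print(select(["my-first-topic", "my-second-topic", "other-topic"]))
```

Checking the expression this way before a real run complements the --dry-run option, since a selector that is too broad would also delete topics you did not intend to touch.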

Reading the Help

To learn more about the available Jikkou commands, use jikkou help or type a command followed by the -h flag:

$ jikkou help get

Next Steps

Now, you’re ready to use Jikkou!🚀

As next steps, we suggest reading the following documentation in this order:

  • Learn Jikkou concepts
  • Read the Developer Guide to understand how to use the Jikkou API for Java
  • Look at the examples

4 - Jikkou CLI Documentation

Learn Jikkou’s CLI-based workflows.

Hands-on: Try the Jikkou: Get Started tutorials.

4.1 - Basic CLI Features

The command line interface to Jikkou is the jikkou command, which accepts a variety of subcommands such as jikkou apply or jikkou validate.

To view a list of the commands available in your current Jikkou version, run jikkou with no additional arguments:

Usage: 
jikkou [-hV] [--logger-level=<level>] [COMMAND]


Jikkou CLI:: A command-line client designed to provide an efficient and easy way to manage, automate, and provision all the assets of your data infrastructure.

Find more information at: https://streamthoughts.github.io/jikkou/.

OPTIONS:

  -h, --help      Show this help message and exit.
      --logger-level=<level>
                  Specify the log level verbosity to be used while running a command.
                  Valid level values are: TRACE, DEBUG, INFO, WARN, ERROR.
                  For example, `--logger-level=INFO`
  -V, --version   Print version information and exit.

CORE COMMANDS:
  apply                     Update the resources as described by the resource definition files.
  create                    Create resources from the resource definition files (only non-existing resources will be created).
  delete                    Delete resources that are no longer described by the resource definition files.
  diff                      Show changes required by the current resource definitions.
  get                       Display one or many specific resources.
  prepare                   Prepare the resource definition files for validation.
  update                    Create or update resources from the resource definition files
  validate                  Check whether the resources definitions meet all validation requirements.

SYSTEM MANAGEMENT COMMANDS:
  action                    List/execute actions.
  health                    Print or describe health indicators.

ADDITIONAL COMMANDS:
  api-extensions            Print the supported API extensions
  api-resources             Print the supported API resources
  config                    Sets or retrieves the configuration of this client
  generate-completion       Generate bash/zsh completion script for jikkou.
  help                      Display help information about the specified command.

(The output from your current Jikkou version may be different than the above example.)

Checking Jikkou Version

Run jikkou --version to display your current installation version:

Jikkou version "0.32.0" 2023-11-28
JVM: 21.0.1 (GraalVM Community Substrate VM 21.0.1+12)

Shell Tab-completion

It is recommended to install the bash/zsh completion script jikkou_completion.

The completion script can be downloaded from the project GitHub repository:

wget https://raw.githubusercontent.com/streamthoughts/jikkou/main/jikkou_completion
. jikkou_completion

or alternatively, you can run the following command to generate it.

source <(jikkou generate-completion)

4.2 - CLI Configuration

Learn how to configure Jikkou CLI.

Configuration

To set up the configuration settings used by Jikkou CLI, you need a jikkou config file, which is created automatically when you create a configuration context using:

jikkou config set-context <context-name> [--config-file=<config-file>] [--config-props=<config-value>]

By default, the configuration of jikkou is located under the path $HOME/.jikkou/config.

This jikkou config file defines all the contexts that can be used by jikkou CLI.

For example, below is the config file created during the Getting Started.

{
  "currentContext": "localhost",
  "localhost": {
    "configFile": null,
    "configProps": {
      "kafka.client.bootstrap.servers": "localhost:9092"
    }
  }
}

Most of the time, a context does not directly contain the configuration properties to be used, but rather points to a specific HOCON (Human-Optimized Config Object Notation) file through the configFile property.

Then, the configProps property allows you to override some of the properties defined by this file.

In addition, if no configuration file path is specified, Jikkou will look for an application.conf file in the following locations:

  • ./application.conf
  • $HOME/.jikkou/application.conf
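
The lookup described above can be pictured as follows. This is an illustrative Python outline, not Jikkou's code: the function `resolve_config_file` and its parameters are hypothetical, and it assumes that an explicitly configured path takes priority and that the first existing candidate wins.

```python
from pathlib import Path
from typing import Optional


def resolve_config_file(explicit: Optional[str], cwd: Path, home: Path) -> Optional[Path]:
    """Return the configuration file to load, or None if no file is found."""
    if explicit:
        # A path set on the context (configFile) is used as-is.
        return Path(explicit)
    candidates = (
        cwd / "application.conf",
        home / ".jikkou" / "application.conf",
    )
    for candidate in candidates:
        if candidate.is_file():
            return candidate
    return None


print(resolve_config_file(None, Path.cwd(), Path.home()))
```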

Finally, Jikkou always falls back to a reference.conf file that you can use as a template to define your own configuration.

reference.conf:

jikkou {

  extension.providers {
    # Whether extensions are enabled by default
    default.enabled: true
    # Explicitly enable/disable extensions
    #<provider_name>.enabled: <boolean>
    # schemaregistry.enabled = true
    # kafka.enabled = true
    # aiven.enabled = true
    # kafkaconnect.enabled = true
  }

  # Configure Jikkou Proxy Mode
  # proxy {
  #  url = "http://localhost:8080"
  # }

  # Kafka Extension
  kafka {
    # The default Kafka Client configuration
    client {
      bootstrap.servers = "localhost:9092"
      bootstrap.servers = ${?JIKKOU_DEFAULT_KAFKA_BOOTSTRAP_SERVERS}
    }
    brokers {
      # If 'True' 
      waitForEnabled = true
      waitForEnabled = ${?JIKKOU_KAFKA_BROKERS_WAIT_FOR_ENABLED}
      # The minimal number of brokers that should be alive for the CLI to stop waiting.
      waitForMinAvailable = 1
      waitForMinAvailable = ${?JIKKOU_KAFKA_BROKERS_WAIT_FOR_MIN_AVAILABLE}
      # The amount of time to wait before verifying that brokers are available.
      waitForRetryBackoffMs = 1000
      waitForRetryBackoffMs = ${?JIKKOU_KAFKA_BROKERS_WAIT_FOR_RETRY_BACKOFF_MS}
      # Wait until brokers are available or this timeout is reached.
      waitForTimeoutMs = 60000
      waitForTimeoutMs = ${?JIKKOU_KAFKA_BROKERS_WAIT_FOR_TIMEOUT_MS}
    }
  }

  schemaRegistry {
    url = "http://localhost:8081"
    url = ${?JIKKOU_DEFAULT_SCHEMA_REGISTRY_URL}
  }

  # The default custom transformations to apply on any resources.
  transformations = []

  # The default custom validations to apply on any resources.
  validations = [
    {
      name = "topicMustHaveValidName"
      type = io.streamthoughts.jikkou.kafka.validation.TopicNameRegexValidation
      priority = 100
      config = {
        topicNameRegex = "[a-zA-Z0-9\\._\\-]+"
        topicNameRegex = ${?VALIDATION_DEFAULT_TOPIC_NAME_REGEX}
      }
    },
    {
      name = "topicMustHavePartitionsEqualsOrGreaterThanOne"
      type = io.streamthoughts.jikkou.kafka.validation.TopicMinNumPartitionsValidation
      priority = 100
      config = {
        topicMinNumPartitions = 1
        topicMinNumPartitions = ${?VALIDATION_DEFAULT_TOPIC_MIN_NUM_PARTITIONS}
      }
    },
    {
      name = "topicMustHaveReplicasEqualsOrGreaterThanOne"
      type = io.streamthoughts.jikkou.kafka.validation.TopicMinReplicationFactorValidation
      priority = 100
      config = {
        topicMinReplicationFactor = 1
        topicMinReplicationFactor = ${?VALIDATION_DEFAULT_TOPIC_MIN_REPLICATION_FACTOR}
      }
    }
  ]
  # The default custom reporters to report applied changes.
  reporters = [
    # Uncomment following lines to enable default kafka reporter
    #    {
    #     name = "default"
    #      type = io.streamthoughts.jikkou.kafka.reporter.KafkaChangeReporter
    #      config = {
    #        event.source = "jikkou/cli"
    #        kafka = {
    #          topic.creation.enabled = true
    #          topic.creation.defaultReplicationFactor = 1
    #          topic.name = "jikkou-resource-change-event"
    #          client = ${jikkou.kafka.client} {
    #            client.id = "jikkou-reporter-producer"
    #          }
    #        }
    #      }
    #    }
  ]
}
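
The three default validations in reference.conf boil down to a name pattern and two lower bounds. The Python sketch below restates them for illustration only. It is not Jikkou's implementation: the function `validate_topic` is hypothetical, it assumes the pattern must match the whole topic name, and it does not model any special values Jikkou may accept for partitions or replicas.

```python
import re

# Values copied from the default `validations` entries above.
TOPIC_NAME_REGEX = re.compile(r"[a-zA-Z0-9\._\-]+")
MIN_PARTITIONS = 1
MIN_REPLICATION_FACTOR = 1


def validate_topic(name: str, partitions: int, replication_factor: int) -> list:
    """Return the names of the violated rules (empty when the topic passes)."""
    errors = []
    if not TOPIC_NAME_REGEX.fullmatch(name):
        errors.append("topicMustHaveValidName")
    if partitions < MIN_PARTITIONS:
        errors.append("topicMustHavePartitionsEqualsOrGreaterThanOne")
    if replication_factor < MIN_REPLICATION_FACTOR:
        errors.append("topicMustHaveReplicasEqualsOrGreaterThanOne")
    return errors


print(validate_topic("my-first-topic", 5, 1))
print(validate_topic("bad topic!", 0, 0))
```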

Listing Contexts

$ jikkou config get-contexts 
 
 NAME         
 localhost *
 development
 staging
 production

Verify Current Context

You can use the jikkou config current-context command to show the context currently used by Jikkou CLI.

$ jikkou config current-context
Using context 'localhost'

 KEY          VALUE                                                                         
 ConfigFile   
 ConfigProps  {"kafka.client.bootstrap.servers": "localhost:9092"}  

Verify Current Configuration

You can use the jikkou config view command to show the configuration currently used by Jikkou CLI.

4.3 - Automating Jikkou

Learn how to automate Jikkou.

4.3.1 - Automate Jikkou with GitHub Actions

Learn how to use the Setup Jikkou GitHub Action in your CI/CD workflows.

Setup Jikkou

The streamthoughts/setup-jikkou action is a JavaScript action that sets up Jikkou in your GitHub Actions workflow by:

  • Downloading a specific version of Jikkou CLI and adding it to the PATH.
  • Configuring Jikkou CLI with a custom configuration file.

After you’ve used the action, subsequent steps in the same job can run arbitrary Jikkou commands using the GitHub Actions run syntax. This allows most Jikkou commands to work exactly like they do on your local command line.

Usage

steps:
  - uses: streamthoughts/setup-jikkou@v1

A specific version of Jikkou CLI can be installed:

steps:
  - uses: streamthoughts/setup-jikkou@v0.1.0
    with:
      jikkou_version: 0.29.0

A custom configuration file can be specified:

steps:
  - uses: streamthoughts/setup-jikkou@v0.1.0
    with:
      jikkou_config: ./config/jikkouconfig.json

Inputs

This Action additionally supports the following inputs:

Property        Default  Description
jikkou_version  latest   The version of Jikkou CLI to install. A value of latest will install the latest version of Jikkou CLI.
jikkou_config            The path to the Jikkou CLI config file. If set, Jikkou CLI will be configured through the JIKKOUCONFIG environment variable.

5 - Jikkou API Server Documentation

Learn Jikkou’s API Server usages.

Jikkou API Server provides a REST interface to any platform supported by Jikkou, making it even easier to manage, automate and visualise all your data platform assets.

Jikkou CLI can be used in combination with Jikkou API Server by configuring it in proxy mode. In this mode, the CLI no longer connects directly to your various platforms, but forwards all operations to the API server. This deployment method allows you to enhance the overall security of the platforms managed through Jikkou.

5.1 - Install Jikkou API Server

This guide shows how to install Jikkou API Server.

Releases

The latest stable release of Jikkou API Server is available:

Standalone Installation

Follow these few steps to download the latest stable versions and get started.

Prerequisites

To be able to run Jikkou API Server, the only requirement is to have a working Java 21 installation. You can check the correct installation of Java by issuing the following command:

java -version

Step 1: Download

Download the latest Java binary distribution from the GitHub Releases (e.g. jikkou-api-server-0.31.0.zip).

Unpack the downloaded distribution and move the unpacked directory to the desired destination:

unzip jikkou-api-server-$LATEST_VERSION.zip
mv jikkou-api-server-$LATEST_VERSION /opt/jikkou

Step 2: Start the API Server

Launch the application with:

./bin/jikkou-api-server.sh

Step 3: Test the API Server

$ curl -sX GET http://localhost:28082 -H "Accept: application/json" | jq

{
  "version": "0.31.0",
  "build_time": "2023-11-14T18:07:38+0000",
  "commit_id": "dae1be11c092256f36c18c8f1d90f16b0c951716",
  "_links": {
    "self": {
      "href": "/",
      "templated": false
    },
    "get-apis": {
      "href": "/apis",
      "templated": false
    }
  }
}
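
The same check can be scripted, for example in a deployment smoke test. The Python sketch below uses only the standard library; the helper names `fetch_json` and `link_hrefs` are hypothetical, and the response layout is assumed to be the one shown in the sample output above.

```python
import json
from urllib.request import Request, urlopen


def fetch_json(url: str) -> dict:
    """GET a URL with an `Accept: application/json` header and decode the body."""
    request = Request(url, headers={"Accept": "application/json"})
    with urlopen(request, timeout=10) as response:
        return json.load(response)


def link_hrefs(payload: dict) -> dict:
    """Map each relation listed under `_links` to its href."""
    return {rel: link["href"] for rel, link in payload.get("_links", {}).items()}
```

Against a running server, `link_hrefs(fetch_json("http://localhost:28082"))` would list the relations advertised by the root endpoint, such as `get-apis` in the sample output.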

Step 4: Stop the API Server

PID=`ps -ef | grep -v grep | grep JikkouApiServer | awk '{print $2}'`
kill $PID

Docker

# Run Docker
docker run -it \
--net host \
streamthoughts/jikkou-api-server:latest

Development Builds

In addition to releases you can download or install development snapshots of Jikkou API Server.

From Docker Hub

Docker images are built and pushed to Docker Hub from the latest main branch.

They are not official releases, and may not be stable. However, they offer the opportunity to test the cutting edge features.

$ docker run -it streamthoughts/jikkou-api-server:main

5.2 - Configurations

Learn how to configure Jikkou API Server.

Jikkou API Server is built with Micronaut Framework.

The default configuration file is located in the installation directory of your server under the path /etc/application.yaml.

You can either modify this configuration file directly or create a new one. Then, your configuration file path can be targeted through the MICRONAUT_CONFIG_FILES environment variable.

A YAML Configuration file example can be found here: application.yaml

5.2.1 - API Server

Learn how to configure the Jikkou API server.

Running Server on a Specific Port

By default, the server runs on port 28082. However, you can set the server to run on a specific port:

# ./etc/application.yaml
micronaut:
  server:
    port: 80  # Port used to access APIs

endpoints:
  all:
    port: 80  # Port used to access Health endpoints

Enabling Specific Extension Providers

By default, the server is configured to run only with the core and kafka extension providers. However, you can enable (or disable) additional providers:

jikkou:
  extensions.provider:
    # By default, disable all extension providers.
    default.enabled: false
    
    # Explicitly enable/disable an extension provider
    #<provider_name>.enabled: <boolean>
    core.enabled: true
    kafka.enabled: true
    # schemaregistry.enabled: true
    # aiven.enabled: true
    # kafkaconnect.enabled: true

5.2.2 - Authentication

Learn how to secure access to Jikkou API server.

Enable Security

To enable secure access to the API Server:

Configuration File

Update the configuration file (i.e., application.yaml) of the server with:

micronaut:
  security:
    enabled: true

Environment Variable

As an alternative, you can set the following environment variable: MICRONAUT_SECURITY_ENABLED=true.

Unauthorized Access

When accessing a secured path, the server will return the following response if access is not authorized:

{
  "message": "Unauthorized",
  "errors": [
    {
      "status": 401,
      "error_code": "authentication_user_unauthorized",
      "message": "Unauthorized"
    }
  ]
}

5.2.2.1 - Basic Auth

Learn how to secure Jikkou API Server using Basic HTTP Authentication Scheme.

Jikkou API Server can be secured using a Basic HTTP Authentication Scheme.

RFC7617 defines the “Basic” Hypertext Transfer Protocol (HTTP) authentication scheme, which transmits credentials as user-id/password pairs, encoded using Base64.

Basic Authentication should be used over a secured connection using HTTPS.

Configure Basic HTTP Authentication

Step 1: Enable security

Add the following configuration to your server configuration.

# ./etc/application.yaml
micronaut:
  security:
    enabled: true

Step 2: Configure the list of users

The list of username/password pairs authorized to connect to the API server can be configured as follows:

# ./etc/application.yaml
jikkou:
  security:
    basic-auth:
      - username: "admin"
        password: "{noop}password"

For production environments, passwords must not be configured in plaintext. Passwords can be provided encoded with bcrypt, scrypt, argon2, or sha256.

Example

echo -n password | sha256sum
5e884898da28047151d0e56f8dc6292773603d0d6aabbdd62a11ef721d1542d8

# ./etc/application.yaml
jikkou:
  security:
    basic-auth:
      - username: "admin"
        password: "{sha256}5e884898da28047151d0e56f8dc6292773603d0d6aabbdd62a11ef721d1542d8"
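
If sha256sum is not available, the same value can be produced with a few lines of Python. This is a minimal sketch; the function name `encode_password_sha256` is hypothetical, and the `{sha256}` prefix simply follows the configuration example above.

```python
import hashlib


def encode_password_sha256(password: str) -> str:
    """Return the password as `{sha256}<hex digest>`, as used in basic-auth above."""
    digest = hashlib.sha256(password.encode("utf-8")).hexdigest()
    return "{sha256}" + digest


print(encode_password_sha256("password"))
```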

Step 3: Validate authentication

Encode credentials

echo -n "admin:password" | base64 
YWRtaW46cGFzc3dvcmQ=

Send request

curl -IX GET http://localhost:28082/apis/kafka.jikkou.io/v1beta2/kafkabrokers \
-H "Accept: application/json" \
-H "Authorization: Basic YWRtaW46cGFzc3dvcmQ="

HTTP/1.1 200 OK
Content-Type: application/hal+json
content-length: 576
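
For clients written in code rather than shell, the Authorization header can be built the same way as in the `base64` step above. The following Python sketch shows the encoding defined by RFC 7617; the function name `basic_auth_header` is hypothetical.

```python
import base64


def basic_auth_header(username: str, password: str) -> str:
    """Build the value of the Authorization header for HTTP Basic authentication."""
    credentials = f"{username}:{password}".encode("utf-8")
    return "Basic " + base64.b64encode(credentials).decode("ascii")


print(basic_auth_header("admin", "password"))
```

Base64 is an encoding, not encryption, which is why the credentials should only travel over HTTPS.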

5.2.2.2 - JWT

Learn how to secure Jikkou API Server using JWT (JSON Web Token) Authentication.

Jikkou API Server can be secured using JWT (JSON Web Token) Authentication.

Configure JWT

Step 1: Set JWT signature secret

Add the following configuration to your server configuration.

# ./etc/application.yaml
micronaut:
  security:
    enabled: true
    authentication: bearer  # <1>
    token:
      enabled: true
      jwt:
        signatures:
          secret:
            generator:
              secret: ${JWT_GENERATOR_SIGNATURE_SECRET:pleaseChangeThisSecretForANewOne}  # <2>

  • <1> Set authentication to bearer to receive a JSON response from the login endpoint.
  • <2> Change this to your own secret and keep it safe (do not store this in your VCS).

Step 2: Generate a Token

Generate a valid JSON Web Token on https://jwt.io/ using your secret.

Example with pleaseChangeThisSecretForANewOne as signature secret.

TOKEN=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.6cD3MnZmX2xyEAWyh-GgGD11TX8SmvmHVLknuAIJ8yE
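
A token can also be generated locally instead of pasting the secret into a website. The Python sketch below signs a payload with HMAC-SHA256 (the HS256 algorithm) using only the standard library. The function names are hypothetical, the payload is the sample one carried by the token above, and which claims the server actually requires (for example an expiration or roles) is not covered here.

```python
import base64
import hashlib
import hmac
import json


def _b64url(data: bytes) -> str:
    """Base64url-encode without padding, as required for JWT segments."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def sign_hs256(payload: dict, secret: str) -> str:
    """Return a compact JWT (header.payload.signature) signed with HS256."""
    header = {"alg": "HS256", "typ": "JWT"}
    segments = [
        _b64url(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, payload)
    ]
    signing_input = ".".join(segments).encode("ascii")
    signature = hmac.new(secret.encode("utf-8"), signing_input, hashlib.sha256).digest()
    return ".".join(segments + [_b64url(signature)])


def verify_hs256(token: str, secret: str) -> bool:
    """Check the signature of a compact HS256 token against the secret."""
    signing_input, _, signature = token.rpartition(".")
    expected = hmac.new(
        secret.encode("utf-8"), signing_input.encode("ascii"), hashlib.sha256
    ).digest()
    return hmac.compare_digest(_b64url(expected), signature)


token = sign_hs256(
    {"sub": "1234567890", "name": "John Doe", "iat": 1516239022},
    "pleaseChangeThisSecretForANewOne",
)
print(token)
```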

Step 3: Validate authentication

$ curl -I -X GET http://localhost:28082/apis/kafka.jikkou.io/v1beta2/kafkabrokers \
-H "Accept: application/json" \
-H "Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.6cD3MnZmX2xyEAWyh-GgGD11TX8SmvmHVLknuAIJ8yE"

HTTP/1.1 200 OK
Content-Type: application/hal+json
content-length: 576

5.2.3 - CLI Proxy Mode

Learn how to configure Jikkou CLI in proxy mode.

Configuration

Step 1: Enable Proxy Mode

To enable proxy mode so that the CLI communicates directly with your API Server, add the following parameters to your configuration:

jikkou {
  # Proxy Configuration
  proxy {
    # Specify whether proxy mode is enabled (default: false).
    enabled = true
    # URL of the API Server
    url = "http://localhost:28082"
    # Specify whether HTTP request debugging should be enabled (default: false)
    debugging = false
    # The connect timeout in milliseconds (if not configured, `default-timeout` is used).
    connect-timeout = 10000
    # The read timeout in milliseconds (if not configured, `default-timeout` is used).
    read-timeout = 10000
    # The write timeout in milliseconds (if not configured, `default-timeout` is used).
    write-timeout = 10000
    # The default timeout (i.e., for read/connect) in milliseconds (default: 10000)
    default-timeout = 10000
    # Security settings to authenticate to the API Server.
    security = {
      # For Token based Authentication.
      # access-token = ""
      # For Username/Password Basic-Authentication.
      # basic-auth = {
      #   username = ""
      #   password = ""
      # }
    }
  }
}
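
The comments in the block above describe a simple fallback: each specific timeout uses `default-timeout` when it is not set. The Python sketch below spells out that rule for illustration. The function `effective_timeouts` is hypothetical and only mirrors the documented defaults; it is not how the CLI is implemented.

```python
DEFAULT_TIMEOUT_MS = 10000  # documented default for `default-timeout`


def effective_timeouts(proxy: dict) -> dict:
    """Resolve connect/read/write timeouts, falling back to `default-timeout`."""
    default = proxy.get("default-timeout", DEFAULT_TIMEOUT_MS)
    return {
        key: proxy.get(key, default)
        for key in ("connect-timeout", "read-timeout", "write-timeout")
    }


print(effective_timeouts({"default-timeout": 5000, "read-timeout": 30000}))
```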

Step 2: Check connection

When enabling Proxy Mode, Jikkou CLI provides the additional command server-info. You can use it to verify the connectivity with the server.

$ jikkou server-info -o JSON | jq

{
  "version": "0.31.0",
  "build_time": "2023-11-15T10:35:22+0100",
  "commit_id": "f3384d38e606fb32599c175895d0cbef28258540"
}

5.3 - Jikkou - API References

Jikkou - API References

6 - Concepts

Learn the different concepts used within Jikkou.

This section explains key concepts used within Jikkou:

6.1 - Resource

Jikkou Resources are entities that represent the state of a concrete instance of a concept that is part of the state of your system, like a Topic on an Apache Kafka cluster.

Resource Objects

All resources can be distinguished between persistent objects, which are used to describe the desired state of your system, and transient objects, which are only used to enrich or provide additional capabilities for the definition of persistent objects.

A resource is an object with a type (called a Kind) and a concrete model that describes the associated data. All resources are scoped by an API Group and Version.

Resource Definition

Resources are described in YAML format.

Here is a sample resource that describes a Kafka Topic.

apiVersion: "kafka.jikkou.io/v1beta2"
kind: KafkaTopic
metadata:
  name: 'my-topic'
  labels:
    environment: test
  annotations: {}
spec:
  partitions: 1
  replicas: 1
  configs:
    min.insync.replicas: 1
    cleanup.policy: 'delete'

Resource Properties

The following are the properties that can be set to describe a resource:

Property              Description
apiVersion            The group/version of the resource type.
kind                  The type of the described resource.
metadata.name         An optional name to identify the resource.
metadata.labels       Arbitrary metadata to attach to the resource that can be handy when you have a lot of resources and you only need to identify or filter some objects.
metadata.annotations  Arbitrary non-identifying metadata to attach to the resource to mark them for a specific operation or to record some metadata.
spec                  The object properties describing a desired state.

6.2 - Labels and annotations

Labels

You can use labels to attach arbitrary identifying metadata to objects.

Labels are key/value maps:

metadata:
  labels:
    "key1": "value-1"
    "key2": "value-2"

Example

metadata:
  labels:
    environment: "staging"

Annotations

You can use annotations to attach arbitrary non-identifying metadata to objects.

Annotations are key/value maps:

metadata:
  annotations:
    key1: "value-1"
    key2: "value-2"

Built-in Annotations

jikkou.io/ignore

Used on: All Objects.

This annotation indicates whether the object should be ignored for reconciliation.

jikkou.io/bypass-validations

Used on: All Objects.

This annotation indicates whether the object should bypass the validation chain. In other words, no validations will be applied on the object.

jikkou.io/delete

Used on: All Objects.

This annotation indicates (when set to true) that the object should be deleted from your system.

jikkou.io/resource-location

Used by jikkou.

This annotation is automatically added by Jikkou to an object when loaded from your local filesystem.

jikkou.io/items-count

Used by jikkou.

This annotation is automatically added by Jikkou to an object collection grouping several resources of homogeneous type.

6.3 - Reconciliation

In the context of Jikkou, reconciliation refers to the process of comparing the desired state of an object with the actual state of the system and making any necessary corrections or adjustments to align them.

Changes

A Change represents a difference, detected during the reconciliation process, between two objects. Such a difference can be reconciled or corrected by adding, updating, or deleting an object or property associated with the actual state of the system.

Jikkou identifies four types of changes:

  • ADD: Indicates the addition of a new object or property to an existing object.

  • UPDATE: Indicates modifications made to an existing object or property of an existing object.

  • DELETE: Indicates the removal of an existing object or property of an existing object.

  • NONE: Indicates that no changes were made to an existing object or property.

Reconciliation Modes

Depending on the chosen reconciliation mode, only specific types of changes will be applied.

Jikkou provides four distinct reconciliation modes that determine the types of changes to be applied:

  • CREATE: This mode only applies changes that create new resource objects in your system.
  • DELETE: This mode only applies changes that delete existing resource objects in your system.
  • UPDATE: This mode only applies changes that create new or update existing resource objects in your system.
  • APPLY_ALL: This mode applies all changes to ensure that the actual state of a resource in the cluster matches the desired state defined in your resource definition file, regardless of the specific type of change.

Each mode corresponds to a command offered by the Jikkou CLI (i.e., create, update, delete, and apply). Choose the appropriate mode based on your requirements.
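
As a reading aid, the relationship between reconciliation modes and change types described above can be sketched in a few lines of Python. This is not Jikkou's implementation: the names MODE_TO_CHANGE_TYPES and changes_to_apply are hypothetical, and leaving NONE out of every mode is an assumption based on the fact that a NONE change has nothing to apply.

```python
from enum import Enum


class ChangeType(Enum):
    ADD = "ADD"
    UPDATE = "UPDATE"
    DELETE = "DELETE"
    NONE = "NONE"


# Change types that each mode lets through, as read from the list above.
# Assumption: NONE is never applied because there is nothing to do for it.
MODE_TO_CHANGE_TYPES = {
    "CREATE": {ChangeType.ADD},
    "DELETE": {ChangeType.DELETE},
    "UPDATE": {ChangeType.ADD, ChangeType.UPDATE},
    "APPLY_ALL": {ChangeType.ADD, ChangeType.UPDATE, ChangeType.DELETE},
}


def changes_to_apply(mode, changes):
    """Keep only the (name, ChangeType) pairs that the given mode would apply."""
    allowed = MODE_TO_CHANGE_TYPES[mode]
    return [(name, change) for name, change in changes if change in allowed]


# Hypothetical changes detected during a reconciliation.
detected = [
    ("topic-a", ChangeType.ADD),
    ("topic-b", ChangeType.UPDATE),
    ("topic-c", ChangeType.DELETE),
    ("topic-d", ChangeType.NONE),
]

print([name for name, _ in changes_to_apply("UPDATE", detected)])
# prints ['topic-a', 'topic-b']
```

In this sketch, running the same detected changes through "CREATE" keeps only topic-a, which mirrors why the create command never modifies or removes existing objects.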

Reconciliation Options

Depending on the type of resources being reconciled, the controller involved in the reconciliation process may accept some options (passed using the --options argument).

Mark Resource for Deletion

To delete all the states associated with a resource’s entities, you must add the following annotation to the resource definition:

metadata:
  annotations:
    jikkou.io/delete: true

6.4 - Selectors

Selectors allow you to include or exclude some resource objects from being returned or reconciled by Jikkou.

Selector Expressions

Selectors are passed as arguments to Jikkou as expression strings in the following form:

  • <SELECTOR>: <KEY> <OPERATOR> VALUE
  • <SELECTOR>: <KEY> <OPERATOR> (VALUE[, VALUES])

or (using default field selector):

  • <KEY> <OPERATOR> VALUE
  • <KEY> <OPERATOR> (VALUE[, VALUES])

Selectors

Field (default)

Jikkou ships with a built-in FieldSelector that lets you filter resource objects based on a field key.

For example, the expression below shows how to select only the resources having a label environment equal to either staging or production.

metadata.labels.environment IN (staging, production)

Note: In the above example, we have omitted the selector because field is the default selector.

Expression Operators

Five kinds of operators are supported:

  • IN
  • NOTIN
  • EXISTS
  • MATCHES
  • DOESNOTMATCH

Matching Strategies

Jikkou allows you to use multiple selector expressions. To indicate how these expressions are to be combined, you can pass one of the following matching strategies:

  • ALL: A resource is selected if it matches all selectors.
  • ANY: A resource is selected if it matches one of the selectors.
  • NONE: A resource is selected if it matches none of the selectors.

Example:

jikkou get kafkatopics \
--selector 'metadata.name MATCHES (^__.*)' \
--selector 'metadata.name IN (_schemas)' \
--selector-match ANY
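
To make the three matching strategies concrete, here is a minimal Python sketch of how several selector results could be combined. It is an illustration only, not Jikkou's code: combine and select are hypothetical names, and the two lambdas are simplified stand-ins for the MATCHES and IN expressions of the command above.

```python
import re


def combine(strategy, results):
    """Combine per-selector boolean results according to a matching strategy."""
    results = list(results)
    if strategy == "ALL":
        return all(results)
    if strategy == "ANY":
        return any(results)
    if strategy == "NONE":
        return not any(results)
    raise ValueError(f"Unknown matching strategy: {strategy}")


# Simplified stand-ins for the two selector expressions used in the example.
selectors = [
    lambda name: re.search(r"^__.*", name) is not None,  # metadata.name MATCHES (^__.*)
    lambda name: name in {"_schemas"},                   # metadata.name IN (_schemas)
]


def select(names, strategy):
    """Return the names selected under the given matching strategy."""
    return [n for n in names if combine(strategy, (s(n) for s in selectors))]


topics = ["__consumer_offsets", "_schemas", "my-topic"]
print(select(topics, "ANY"))   # prints ['__consumer_offsets', '_schemas']
print(select(topics, "NONE"))  # prints ['my-topic']
```

With ALL, no topic in this list would be selected, since no name both starts with two underscores and equals _schemas.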

6.5 - Transformations

Transformations are applied to inbound resources. Transformations are used to transform, enrich, or filter resource entities before they are validated and thus before the reconciliation process is executed on them.

Available Transformations

You can list all the available transformations using the Jikkou CLI command:

jikkou extensions list --type=Transformation [--kinds <a resource kind to filter returned results>]

Transformation chain

When using Jikkou CLI, you can configure a transformation chain that will be applied to every resource. This chain consists of multiple transformations, each designed to handle different types of resources. Jikkou ensures that a transformation is executed only for the resource types it supports. In cases where a resource is not accepted by a transformation, it is passed to the next transformation in the chain. This process continues until a suitable transformation is found or until all transformations have been attempted.

Configuration

jikkou {
  # The list of transformations to execute
  transformations: [
    {
      # Simple or fully qualified class name of the transformation extension.
      type = ""
      # Priority to be used for executing this transformation extension.
      # The lowest value has the highest priority, so it's run first. Minimum value is -2^31 (highest) and a maximum value is 2^31-1 (lowest).
      # Usually, values under 0 should be reserved for internal transformation extensions.
      priority = 0
      config = {
        # Configuration properties for this transformation
      }
    }
  ]
}

Example

jikkou {
  # The list of transformations to execute
  transformations: [
    {
      # Enforce a minimum number of replicas for a kafka topic
      type = KafkaTopicMinReplicasTransformation
      priority = 100
      config = {
        minReplicationFactor = 4
      }
    },
    {
      # Enforce a minimum value of 'min.insync.replicas' for a kafka topic.
      type = KafkaTopicMinInSyncReplicasTransformation
      priority = 100
      config = {
        minInSyncReplicas = 2
      }
    }
  ]
}
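
The priority rule stated in the configuration comments (the lowest value runs first) can be illustrated with a short Python sketch. This is not how Jikkou is implemented: TransformationConfig and execution_order are hypothetical names, the priorities differ from the example above so that the ordering is visible, and HypotheticalInternalTransformation is an invented entry standing in for an internal extension with a negative priority.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class TransformationConfig:
    type: str
    priority: int = 0


def execution_order(transformations):
    """Order transformations so that the lowest priority value runs first."""
    # sorted() is stable: entries with equal priority keep their declared order.
    # How Jikkou itself orders equal priorities is not covered by this sketch.
    return sorted(transformations, key=lambda t: t.priority)


chain = [
    TransformationConfig("KafkaTopicMinReplicasTransformation", priority=100),
    TransformationConfig("HypotheticalInternalTransformation", priority=-10),
    TransformationConfig("KafkaTopicMinInSyncReplicasTransformation", priority=50),
]

for t in execution_order(chain):
    print(t.priority, t.type)
```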

6.6 - Validations

Validations are applied to inbound resources to ensure that the resource entities adhere to specific rules or constraints. These validations are carried out after the execution of the transformation chain and before the reconciliation process takes place.

Available Validations

You can list all the available validations using the Jikkou CLI command:

jikkou api-extensions list --category=validation [--kinds <a resource kind to filter returned results>]

Validation chain

When using Jikkou CLI, you can configure a validation chain that will be applied to every resource. This chain consists of multiple validations, each designed to handle different types of resources. Jikkou ensures that a validation is executed only for the resource types it supports. In cases where a resource is not accepted by a validation, it is passed to the next validation in the chain. This process continues until a suitable validation is found or until all validations have been attempted.

Configuration

jikkou {
  # The list of validations to execute
  validations: [
    {
      # Custom name for the validation rule
      name = ""
      # Simple or fully qualified class name of the validation extension.
      type = ""
      config = {
        # Configuration properties for this validation
      }
    }
  ]
}

Example

jikkou {
  # The list of validations to execute
  validations: [
    {
      # Custom name for the validation rule
      name = topicMustBePrefixedWithRegion
      # Simple or fully qualified class name of the validation extension.
      type = TopicNameRegexValidation
      # The config values that will be passed to the validation.
      config = {
        topicNameRegex = "(europe|northamerica|asiapacific)-.+"
      }
    }
  ]
}
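
To check a regular expression before putting it in the configuration, you can try it against a few candidate topic names. The Python sketch below uses the topicNameRegex value from the example; is_valid_topic_name is a hypothetical helper, and requiring the whole name to match is an assumption about how the validation applies the pattern.

```python
import re

# Same pattern as the topicNameRegex value in the example above.
TOPIC_NAME_REGEX = re.compile(r"(europe|northamerica|asiapacific)-.+")


def is_valid_topic_name(name):
    """Return True when the whole topic name matches the pattern."""
    # Assumption: the entire name must match, not just a substring.
    return TOPIC_NAME_REGEX.fullmatch(name) is not None


for candidate in ["europe-orders", "asiapacific-iot-events", "orders", "europe-"]:
    print(candidate, is_valid_topic_name(candidate))
```

The last candidate is rejected because the pattern requires at least one character after the region prefix and the dash.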

6.7 - Template

Templates help you dynamically generate resource definition files from external data.

Template Engine

Jikkou provides a simple templating mechanism based on Jinjava, a Jinja template engine for Java.

Read the official documentation of Jinja to learn more about the syntax and semantics of the template engine.

How Does It Work ?

Jikkou performs the rendering of your template in two phases:

  1. First, an initial rendering is performed using only the values and labels passed through the command-line arguments.
    • Thus, it is perfectly OK if your resource file is not initially a valid YAML file.
  2. Then, a second and final rendering is performed after parsing the YAML resource file, using the additional values and labels defined in the YAML resource file.
    • Therefore, it’s important that your resource file becomes a valid YAML file after the first rendering.

Variables

Jikkou defines a number of top-level variables that are passed to the template engine.

  • values:

    • The values passed into the template through the command-line --values-files and/or --set-value arguments.
    • In addition, values can be defined in the application.conf file and directly in the template file using the property template.values.
    • By default, values is empty.
  • labels:

    • The labels passed into the template through the command-line argument: --set-label.
    • In addition, labels can be defined in the template file using the property metadata.labels.
    • By default, labels is empty.
  • system.env:

    • This provides access to all environment variables.
  • system.props:

    • This provides access to all system properties.

Template Values

When using templating, a resource definition file may contain the additional template property:

apiVersion: The api version (required)
kind: The resource kind (required)
metadata:
  labels: The set of key/value pairs that you can use to describe your resource file (optional)
  annotations: The set of key/value pairs automatically generated by the tool (optional)
template:
  values: The set of key/value pairs to be passed to the template engine (optional)
spec: Specification of the resource

Values Data File

Values data files are used to define all the values (i.e., the variables) needed to render a template.

Example

# file: ./values.yaml
topicConfigs:
  partitions: 4
  replicas: 3
topicPrefix: "{{ system.env.TOPIC_PREFIX | default('test', true) }}"
countryCodes:
  - fr
  - be
  - de
  - es
  - uk
  - us

Template Resource File

Example

# file: ./topics.tpl
apiVersion: 'kafka.jikkou.io/v1beta2'
kind: 'KafkaTopicList'
items:
  {% for country in values.countryCodes %}
  - metadata:
      name: "{{ values.topicPrefix }}-iot-events-{{ country }}"
    spec:
      partitions: {{ values.topicConfigs.partitions }}
      replicas: {{ values.topicConfigs.replicas }}
      configMapRefs:
        - TopicConfig
  {% endfor %}
---
apiVersion: "core.jikkou.io/v1beta2"
kind: "ConfigMap"
metadata:
  name: TopicConfig
template:
  values:
    default_min_insync_replicas: "{{ values.topicConfigs.replicas | default(3, true) | int | add(-1) }}"
data:
  retention.ms: 3600000
  max.message.bytes: 20971520
  min.insync.replicas: '{% raw %}{{ values.default_min_insync_replicas }}{% endraw %}'

Command

$ TOPIC_PREFIX=local jikkou validate --files topics.tpl --values-files values.yaml

(Output)

---
apiVersion: "kafka.jikkou.io/v1beta2"
kind: "KafkaTopicList"
metadata:
  labels: { }
  annotations:
    jikkou.io/resource-location: "file:///tmp/jikkou/topics.tpl"
spec:
  topics:
    - metadata:
        name: "local-iot-events-fr"
      spec:
        partitions: 4
        replicas: 3
        configs:
          min.insync.replicas: "2"
          retention.ms: 3600000
          max.message.bytes: 20971520
    - metadata:
        name: "local-iot-events-be"
      spec:
        partitions: 4
        replicas: 3
        configs:
          min.insync.replicas: "2"
          retention.ms: 3600000
          max.message.bytes: 20971520
    - metadata:
        name: "local-iot-events-de"
      spec:
        partitions: 4
        replicas: 3
        configs:
          min.insync.replicas: "2"
          max.message.bytes: 20971520
          retention.ms: 3600000
    - metadata:
        name: "local-iot-events-es"
      spec:
        partitions: 4
        replicas: 3
        configs:
          min.insync.replicas: "2"
          max.message.bytes: 20971520
          retention.ms: 3600000
    - metadata:
        name: "local-iot-events-uk"
      spec:
        partitions: 4
        replicas: 3
        configs:
          min.insync.replicas: "2"
          max.message.bytes: 20971520
          retention.ms: 3600000
    - metadata:
        name: "local-iot-events-us"
      spec:
        partitions: 4
        replicas: 3
        configs:
          min.insync.replicas: "2"
          max.message.bytes: 20971520
          retention.ms: 3600000

6.8 - Collectors

Collectors are used to collect and describe all entities that exist in your system for a specific resource type.

Available Collectors

You can list all the available collectors using the Jikkou CLI command:

jikkou extensions list --type=Collector [--kinds <a resource kind to filter returned results>]

6.9 - Controllers

Controllers are used to compute and apply changes required to reconcile resources into a managed system.

Available Controllers

You can list all the available controllers using the Jikkou CLI command:

jikkou extensions list --type=Controller [--kinds <a resource kind to filter returned results>]

6.10 - Extensions

Extension Providers

Most of Jikkou’s features are provided by Jikkou Extension Providers. A provider is a module providing a set of extensions used to manage one or more resources.

Built-in Extension Providers

Jikkou ships with a number of extension providers:

6.11 - Reporters

Reporters can be used to report changes applied by Jikkou to a third-party system.

Configuration

Jikkou allows you to configure multiple reporters as follows:

jikkou {
  # The list of reporters to execute
  reporters: [
    {
      # Custom name for the reporter
      name = ""
      # Simple or fully qualified class name of the reporter extension.
      type = ""
      config = {
        # Configuration properties for this reporter
      }
    }
  ]
}

Built-in implementations

Jikkou ships with some built-in ChangeReporter implementations:

KafkaChangeReporter

The KafkaChangeReporter can be used to send change results to a given Kafka topic. Changes are published as Cloud Events.

Configuration

The example below shows how to configure the KafkaChangeReporter.

jikkou {
  # The default custom reporters to report applied changes.
  reporters = [
    {
      name = "kafka-reporter"
      type = io.streamthoughts.jikkou.kafka.reporter.KafkaChangeReporter
      config = {
        # The 'source' of the event that will be generated.
        event.source = "jikkou/cli"
        kafka = {
          # If 'true', topic will be automatically created if it does not already exist.
          topic.creation.enabled = true
          # The default replication factor used for creating topic.
          topic.creation.defaultReplicationFactor = 1
          # The name of the topic the events will be sent to.
          topic.name = "jikkou-resource-change-event"
          # The configuration settings for Kafka Producer and AdminClient
          client = ${jikkou.kafka.client} {
            client.id = "jikkou-reporter-producer"
          }
        }
      }
    }
  ]
}

6.12 - Actions

Actions allow a user to execute a specific and one-shot operation on resources.

Available Actions (CLI)

You can list all the available actions using the Jikkou CLI command:

jikkou api-extensions list --category=action [--kinds <a resource kind to filter returned results>]

Execution Actions (CLI)

You can execute a specific action using the Jikkou CLI command:

jikkou action <ACTION_NAME> execute [<options>]

7 - Extension Providers

Learn how to use Jikkou Extension Provider to provision and manage configuration assets on your data infrastructure.

This section helps you learn more about the built-in Extension Providers for Jikkou.

7.1 - Core

The core Extensions for Jikkou

Here, you will find information to use the Core extensions.

More information:

7.1.1 - Resources

Here, you will find the list of core resources supported for Jikkou.

Core Resources

More information:

7.1.1.1 - ConfigMap

Learn how to use ConfigMap objects.

You can use a ConfigMap to define reusable data in the form of key/value pairs that can then be referenced and used by other resources.

Specification

---
apiVersion: "core.jikkou.io/v1beta2"
kind: ConfigMap
metadata:
  name: '<CONFIG-MAP-NAME>'   # Name of the ConfigMap (required)
data:                         # Map of key-value pairs (required)
  <KEY_1>: "<VALUE_1>" 

Example

For example, the ConfigMap below shows how to define default config properties, named KafkaTopicConfig, that can then be referenced and used to define multiple KafkaTopic resources.

---
apiVersion: "core.jikkou.io/v1beta2"
kind: ConfigMap
metadata:
  name: 'KafkaTopicConfig'
data:
  cleanup.policy: 'delete'
  min.insync.replicas: 2
  retention.ms: 86400000 # (1 day)

7.2 - Apache Kafka

Learn how to use the Jikkou Extension Provider for Apache Kafka.

Here, you will find information to use the Apache Kafka extensions.

More information:

7.2.1 - Configuration

Learn how to configure the extensions for Apache Kafka.

Here, you will find the list of resources supported for Apache Kafka.

Configuration

The Apache Kafka extension is built on top of the Kafka Admin Client. You can configure the properties to be passed to the Kafka client through the Jikkou client configuration property jikkou.kafka.client.

Example:

jikkou {
  kafka {
    client {
      bootstrap.servers = "localhost:9092"
      security.protocol = "SSL"
      ssl.keystore.location = "/tmp/client.keystore.p12"
      ssl.keystore.password = "password"
      ssl.keystore.type = "PKCS12"
      ssl.truststore.location = "/tmp/client.truststore.jks"
      ssl.truststore.password = "password"
      ssl.key.password = "password"
    }
  }
}

In addition, the extension supports configuration settings to wait for at least a minimal number of brokers before processing.

jikkou {
  kafka {
    brokers {
      # If 'true', wait for the brokers to be available before processing.
      waitForEnabled = true
      waitForEnabled = ${?JIKKOU_KAFKA_BROKERS_WAIT_FOR_ENABLED}
      # The minimal number of brokers that should be alive before the CLI stops waiting.
      waitForMinAvailable = 1
      waitForMinAvailable = ${?JIKKOU_KAFKA_BROKERS_WAIT_FOR_MIN_AVAILABLE}
      # The amount of time to wait before verifying that brokers are available.
      waitForRetryBackoffMs = 1000
      waitForRetryBackoffMs = ${?JIKKOU_KAFKA_BROKERS_WAIT_FOR_RETRY_BACKOFF_MS}
      # Wait until brokers are available or this timeout is reached.
      waitForTimeoutMs = 60000
      waitForTimeoutMs = ${?JIKKOU_KAFKA_BROKERS_WAIT_FOR_TIMEOUT_MS}
    }
  }
}
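
The way these four settings interact can be pictured as a simple polling loop. The Python sketch below is only an interpretation of the comments above, not Jikkou's actual logic: wait_for_brokers and count_available are hypothetical names, and the probe that counts live brokers is faked so the example runs without a Kafka cluster.

```python
import time


def wait_for_brokers(count_available, min_available=1, retry_backoff_ms=1000,
                     timeout_ms=60000, clock=time.monotonic, sleep=time.sleep):
    """Poll until enough brokers are available or the timeout is reached.

    count_available is a callable returning the number of live brokers.
    Returns True when min_available is reached, False on timeout.
    """
    deadline = clock() + timeout_ms / 1000.0
    while True:
        if count_available() >= min_available:
            return True
        if clock() >= deadline:
            return False
        # Back off before checking the brokers again.
        sleep(retry_backoff_ms / 1000.0)


# Fake probe: no broker on the first two checks, one broker on the third.
answers = iter([0, 0, 1])
print(wait_for_brokers(lambda: next(answers), min_available=1,
                       retry_backoff_ms=1, timeout_ms=1000))
# prints True
```

Under this reading, waitForRetryBackoffMs bounds how often the cluster is queried, while waitForTimeoutMs bounds the total time spent waiting.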

7.2.2 - Resources

Learn how to use the built-in resources provided by the Extension Provider for Apache Kafka.

Here, you will find the list of resources supported for Apache Kafka.

Apache Kafka Resources

More information:

7.2.2.1 - Kafka Brokers

Learn how to manage Kafka Brokers.

This section describes the resource definition format for kafkabrokers entities, which can be used to define the brokers you plan to manage on a specific Kafka cluster.

Listing KafkaBroker

You can retrieve the state of Kafka Brokers using the jikkou get kafkabrokers (or jikkou get kb) command.

Usage

$ jikkou get kafkabrokers --help

Usage:

Get all 'KafkaBroker' resources.

jikkou get kafkabrokers [-hV] [--default-configs] [--dynamic-broker-configs]
                        [--list] [--static-broker-configs]
                        [--logger-level=<level>] [-o=<format>]
                        [-s=<expressions>]...

DESCRIPTION:

Use jikkou get kafkabrokers when you want to describe the state of all
resources of type 'KafkaBroker'.

OPTIONS:

      --default-configs   Describe built-in default configuration for configs
                            that have a default value.
      --dynamic-broker-configs
                          Describe dynamic configs that are configured as
                            default for all brokers or for specific broker in
                            the cluster.
  -h, --help              Show this help message and exit.
      --list              Get resources as ResourceListObject.
      --logger-level=<level>
                          Specify the log level verbosity to be used while
                            running a command.
                          Valid level values are: TRACE, DEBUG, INFO, WARN,
                            ERROR.
                          For example, `--logger-level=INFO`
  -o, --output=<format>   Prints the output in the specified format. Allowed
                            values: json, yaml (default yaml).
  -s, --selector=<expressions>
                          The selector expression used for including or
                            excluding resources.
      --static-broker-configs
                          Describe static configs provided as broker properties
                            at start up (e.g. server.properties file).
  -V, --version           Print version information and exit.

(The output from your current Jikkou version may be different from the above example.)

Examples

(command)

$ jikkou get kafkabrokers --static-broker-configs

(output)

---
apiVersion: "kafka.jikkou.io/v1beta2"
kind: "KafkaBroker"
metadata:
  name: "101"
  labels: {}
  annotations:
    kafka.jikkou.io/cluster-id: "xtzWWN4bTjitpL3kfd9s5g"
spec:
  id: "101"
  host: "localhost"
  port: 9092
  configs:
    advertised.listeners: "PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092"
    authorizer.class.name: "org.apache.kafka.metadata.authorizer.StandardAuthorizer"
    broker.id: "101"
    controller.listener.names: "CONTROLLER"
    controller.quorum.voters: "101@kafka:29093"
    inter.broker.listener.name: "PLAINTEXT"
    listener.security.protocol.map: "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT"
    listeners: "PLAINTEXT://kafka:29092,PLAINTEXT_HOST://0.0.0.0:9092,CONTROLLER://kafka:29093"
    log.dirs: "/var/lib/kafka/data"
    node.id: "101"
    offsets.topic.replication.factor: "1"
    process.roles: "broker,controller"
    transaction.state.log.replication.factor: "1"
    zookeeper.connect: ""

7.2.2.2 - Kafka Consumer Groups

Learn how to manage Kafka Consumer Groups.

This section describes the resource definition format for KafkaConsumerGroup entities, which can be used to define the consumer groups you plan to manage on a specific Kafka cluster.

Listing KafkaConsumerGroup

You can retrieve the state of Kafka Consumer Groups using the jikkou get kafkaconsumergroups (or jikkou get kcg) command.

Usage

$ jikkou get kafkaconsumergroups --help

Usage:

Get all 'KafkaConsumerGroup' resources.

jikkou get kafkaconsumergroups [-hV] [--list] [--offsets]
                               [--logger-level=<level>] [-o=<format>]
                               [--in-states=PARAM]... [-s=<expressions>]...

DESCRIPTION:

Use jikkou get kafkaconsumergroups when you want to describe the state of all
resources of type 'KafkaConsumerGroup'.

OPTIONS:

  -h, --help              Show this help message and exit.
      --in-states=PARAM   If states is set, only groups in these states will be
                            returned. Otherwise, all groups are returned. This
                            operation is supported by brokers with version
                            2.6.0 or later
      --list              Get resources as ResourceListObject.
      --logger-level=<level>
                          Specify the log level verbosity to be used while
                            running a command.
                          Valid level values are: TRACE, DEBUG, INFO, WARN,
                            ERROR.
                          For example, `--logger-level=INFO`
  -o, --output=<format>   Prints the output in the specified format. Allowed
                            values: json, yaml (default yaml).
      --offsets           Specify whether consumer group offsets should be
                            described.
  -s, --selector=<expressions>
                          The selector expression used for including or
                            excluding resources.
  -V, --version           Print version information and exit

(The output from your current Jikkou version may be different from the above example.)

Examples

(command)

$ jikkou get kafkaconsumergroups --in-states STABLE --offsets

(output)

---
apiVersion: "kafka.jikkou.io/v1beta1"
kind: "KafkaConsumerGroup"
metadata:
  name: "my-group"
  labels:
    kafka.jikkou.io/is-simple-consumer: false
status:
  state: "STABLE"
  members:
    - memberId: "console-consumer-b103994e-bcd5-4236-9d03-97065057e594"
      clientId: "console-consumer"
      host: "/127.0.0.1"
      assignments:
        - "my-topic-0"
      offsets:
        - topic: "my-topic"
          partition: 0
          offset: 0
  coordinator:
    id: "101"
    host: "localhost"
    port: 9092

7.2.2.3 - Kafka Topics

Learn how to manage Kafka Topics.

KafkaTopic resources are used to define the topics you want to manage on your Kafka Cluster(s). A KafkaTopic resource defines the number of partitions, the replication factor, and the configuration properties to be associated with a topic.

KafkaTopic

Specification

Here is the resource definition file for defining a KafkaTopic.

apiVersion: "kafka.jikkou.io/v1beta2"  # The api version (required)
kind: "KafkaTopic"                     # The resource kind (required)
metadata:
  name: <The name of the topic>        # (required)
  labels: { }
  annotations: { }
spec:
  partitions: <Number of partitions>   # (optional)
  replicas: <Number of replicas>       # (optional)
  configs:
    <config_key>: <Config Value>       # The topic config properties keyed by name to override (optional)
  configMapRefs: [ ]                   # The list of ConfigMap to be applied to this topic (optional)

The metadata.name property is mandatory and specifies the name of the kafka topic.

To use the cluster default values for the number of partitions and replicas, you can set the properties spec.partitions and spec.replicas to -1.

Example

Here is a simple example that shows how to define a single YAML file containing two topic definitions using the KafkaTopic resource type.

file: kafka-topics.yaml

---
apiVersion: "kafka.jikkou.io/v1beta2"
kind: KafkaTopic
metadata:
  name: 'my-topic-p1-r1'  # Name of the topic
  labels:
    environment: example
spec:
  partitions: 1           # Number of topic partitions (use -1 to use cluster default)
  replicas: 1             # Replication factor per partition (use -1 to use cluster default)
  configs:
    min.insync.replicas: 1
    cleanup.policy: 'delete'
---
apiVersion: "kafka.jikkou.io/v1beta2"
kind: KafkaTopic
metadata:
  name: 'my-topic-p2-r1'   # Name of the topic 
  labels:
    environment: example
spec:
  partitions: 2             # Number of topic partitions (use -1 to use cluster default)
  replicas: 1               # Replication factor per partition (use -1 to use cluster default)
  configs:
    min.insync.replicas: 1
    cleanup.policy: 'delete'

See the official Apache Kafka documentation for details about the topic-level configs.

KafkaTopicList

If you need to define multiple topics (e.g. using a template), it may be easier to use a KafkaTopicList resource.

Specification

Here is the resource definition file for defining a KafkaTopicList.

apiVersion: "kafka.jikkou.io/v1beta2"  # The api version (required)
kind: "KafkaTopicList"                 # The resource kind (required)
metadata: # (optional)
  labels: { }
  annotations: { }
items: [ ]                             # An array of KafkaTopic

Example

Here is a simple example that shows how to define a single YAML file containing two topic definitions using the KafkaTopicList resource type. In addition, the example uses a ConfigMap object to define the topic configuration only once.

file: kafka-topics.yaml

---
apiVersion: "kafka.jikkou.io/v1beta2"
kind: KafkaTopicList
metadata:
  labels:
    environment: example
items:
  - metadata:
      name: 'my-topic-p1-r1'
    spec:
      partitions: 1
      replicas: 1
      configMapRefs: [ "TopicConfig" ]

  - metadata:
      name: 'my-topic-p2-r1'
    spec:
      partitions: 2
      replicas: 1
      configMapRefs: [ "TopicConfig" ]
---
apiVersion: "core.jikkou.io/v1beta2"
kind: ConfigMap
metadata:
  name: 'TopicConfig'
data:
  min.insync.replicas: 1
  cleanup.policy: 'delete'

7.2.2.4 - Kafka Authorizations

Learn how to manage Kafka Authorizations and ACLs.

KafkaPrincipalAuthorization resources are used to define Access Control Lists (ACLs) for principals authenticated to your Kafka Cluster.

Jikkou can be used to describe all ACL policies that need to be created on a Kafka Cluster.

KafkaPrincipalAuthorization

Specification

---
apiVersion: "kafka.jikkou.io/v1beta2"
kind: "KafkaPrincipalAuthorization"
metadata:
  name: "User:Alice"
spec:
  roles: [ ]                     # List of roles to be added to the principal (optional)
  acls:                          # List of KafkaPrincipalACL (required)
    - resource:
        type: <The type of the resource>                            #  (required)
        pattern: <The pattern to be used for matching resources>    #  (required) 
        patternType: <The pattern type>                             #  (required) 
      type: <The type of this ACL>     # ALLOW or DENY (required) 
      operations: [ ]                  # Operation that will be allowed or denied (required) 
      host: <HOST>                     # IP address from which principal will have access or will be denied (optional)

For more information on how to define authorization and ACLs, see the official Apache Kafka documentation: Security

Operations

The list below describes the valid values for the spec.acls.[].operations property:

  • READ
  • WRITE
  • CREATE
  • DELETE
  • ALTER
  • DESCRIBE
  • CLUSTER_ACTION
  • DESCRIBE_CONFIGS
  • ALTER_CONFIGS
  • IDEMPOTENT_WRITE
  • CREATE_TOKEN
  • DESCRIBE_TOKENS
  • ALL

For more information see official Apache Kafka documentation: Operations in Kafka

Resource Types

The list below describes the valid values for the spec.acls.[].resource.type property:

  • TOPIC
  • GROUP
  • CLUSTER
  • USER
  • TRANSACTIONAL_ID

For more information see official Apache Kafka documentation: Resources in Kafka

Pattern Types

The list below describes the valid values for the spec.acls.[].resource.patternType property:

  • LITERAL: Use to allow or deny a principal access to a specific resource name.
  • MATCH: Use to allow or deny a principal access to all resources matching the given regex.
  • PREFIXED: Use to allow or deny a principal access to all resources having the given prefix.
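
The difference between the three pattern types can be pictured with a short Python sketch. It only illustrates the matching semantics described in the list above and is not Jikkou's implementation: the function name pattern_matches is hypothetical, and treating MATCH as a regular expression that must cover the whole resource name is an assumption.

```python
import re


def pattern_matches(pattern_type: str, pattern: str, resource_name: str) -> bool:
    """Return True if resource_name is covered by the ACL pattern (illustrative only)."""
    if pattern_type == "LITERAL":
        # The resource name must be exactly the pattern.
        return resource_name == pattern
    if pattern_type == "PREFIXED":
        # The resource name must start with the pattern.
        return resource_name.startswith(pattern)
    if pattern_type == "MATCH":
        # Assumption: the regex has to match the entire resource name.
        return re.fullmatch(pattern, resource_name) is not None
    raise ValueError(f"unsupported pattern type: {pattern_type}")
```

With these semantics, a PREFIXED pattern 'my-topic-' would cover 'my-topic-orders', while a LITERAL pattern 'my-topic' would cover only the topic named exactly 'my-topic'.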

Example

---
apiVersion: "kafka.jikkou.io/v1beta2"    # The api version (required)
kind: "KafkaPrincipalAuthorization"      # The resource kind (required)
metadata:
  name: "User:Alice"
spec:
  acls:
    - resource:
        type: 'topic'
        pattern: 'my-topic-'
        patternType: 'PREFIXED'
      type: "ALLOW"
      operations: [ 'READ', 'WRITE' ]
      host: "*"
    - resource:
        type: 'topic'
        pattern: 'my-other-topic-.*'
        patternType: 'MATCH'
      type: 'ALLOW'
      operations: [ 'READ' ]
      host: "*"
---
apiVersion: "kafka.jikkou.io/v1beta2"
kind: "KafkaPrincipalAuthorization"
metadata:
  name: "User:Bob"
spec:
  acls:
    - resource:
        type: 'topic'
        pattern: 'my-topic-'
        patternType: 'PREFIXED'
      type: 'ALLOW'
      operations: [ 'READ', 'WRITE' ]
      host: "*"

KafkaPrincipalRole

Specification

apiVersion: "kafka.jikkou.io/v1beta2"    # The api version (required)
kind: "KafkaPrincipalRole"               # The resource kind (required)
metadata:
  name: <Name of role>                   # The name of the role (required)
spec:
  acls: [ ]                              # A list of KafkaPrincipalACL (required)

Example

---
apiVersion: "kafka.jikkou.io/v1beta2"
kind: "KafkaPrincipalRole"
metadata:
  name: "KafkaTopicPublicRead"
spec:
  acls:
    - type: "ALLOW"
      operations: [ 'READ' ]
      resource:
        type: 'topic'
        pattern: '/public-([.-])*/'
        patternType: 'MATCH'
      host: "*"
---
apiVersion: "kafka.jikkou.io/v1beta2"
kind: "KafkaPrincipalRole"
metadata:
  name: "KafkaTopicPublicWrite"
spec:
  acls:
    - type: "ALLOW"
      operations: [ 'WRITE' ]
      resource:
        type: 'topic'
        pattern: '/public-([.-])*/'
        patternType: 'MATCH'
      host: "*"
---

apiVersion: "kafka.jikkou.io/v1beta2"
kind: "KafkaPrincipalAuthorization"
metadata:
  name: "User:Alice"
spec:
  roles:
    - "KafkaTopicPublicRead"
    - "KafkaTopicPublicWrite"
---
apiVersion: "kafka.jikkou.io/v1beta2"
kind: "KafkaPrincipalAuthorization"
metadata:
  name: "User:Bob"
spec:
  roles:
    - "KafkaTopicPublicRead"

7.2.2.5 - Kafka Quotas

Learn how to manage Kafka Client Quotas.

KafkaClientQuota resources are used to define the quota limits to be applied to Kafka consumers and producers. A KafkaClientQuota resource can be used to apply limits to consumers and/or producers identified by a client-id or a user principal.

KafkaClientQuota

Specification

Here is the resource definition file for defining a KafkaClientQuota.

apiVersion: "kafka.jikkou.io/v1beta2" # The api version (required)
kind: "KafkaClientQuota"              # The resource kind (required)
metadata: # (optional)
  labels: { }
  annotations: { }
spec:
  type: <The quota type> # (required)       
  entity:
    clientId: <The id of the client>    # (required depending on the quota type).
    user: <The principal of the user>   # (required depending on the quota type).
  configs:
    requestPercentage: <The quota in percentage (%) of total requests>      # (optional)
    producerByteRate: <The quota in bytes for restricting data production>  # (optional)
    consumerByteRate: <The quota in bytes for restricting data consumption> # (optional)

Quota Types

The list below describes the supported quota types:

  • USERS_DEFAULT: Set default quotas for all users.
  • USER: Set quotas for a specific user principal.
  • USER_CLIENT: Set quotas for a specific user principal and a specific client-id.
  • USER_ALL_CLIENTS: Set default quotas for a specific user and all clients.
  • CLIENT: Set quotas for a specific client-id.
  • CLIENTS_DEFAULT: Set default quotas for all clients.
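
The specification notes that clientId and user are "required depending on the quota type". A minimal Python sketch of that dependency follows. The mapping is inferred from the descriptions in the list above rather than taken from Jikkou's source, and the names REQUIRED_ENTITY_FIELDS and missing_entity_fields are hypothetical.

```python
# Assumed mapping from quota type to the entity fields it needs,
# inferred from the quota type descriptions (not from Jikkou's code).
REQUIRED_ENTITY_FIELDS = {
    "USERS_DEFAULT": set(),
    "USER": {"user"},
    "USER_CLIENT": {"user", "clientId"},
    "USER_ALL_CLIENTS": {"user"},
    "CLIENT": {"clientId"},
    "CLIENTS_DEFAULT": set(),
}


def missing_entity_fields(quota_type: str, entity: dict) -> set:
    """Return the entity fields that the quota type needs but the entity lacks."""
    required = REQUIRED_ENTITY_FIELDS[quota_type]
    # Only fields with a non-empty value count as present.
    present = {name for name, value in entity.items() if value}
    return required - present
```

For instance, under this assumed mapping a USER_CLIENT quota whose entity only sets user would be reported as missing clientId.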

Example

Here is a simple example that shows how to define a single YAML file containing two quota definitions using the KafkaClientQuota resource type.

file: kafka-quotas.yaml

---
apiVersion: 'kafka.jikkou.io/v1beta2'
kind: 'KafkaClientQuota'
metadata:
  labels: { }
  annotations: { }
spec:
  type: 'CLIENT'
  entity:
    clientId: 'my-client'
  configs:
    requestPercentage: 10
    producerByteRate: 1024
    consumerByteRate: 1024
---
apiVersion: 'kafka.jikkou.io/v1beta2'
kind: 'KafkaClientQuota'
metadata:
  labels: { }
  annotations: { }
spec:
  type: 'USER'
  entity:
    user: 'my-user'
  configs:
    requestPercentage: 10
    producerByteRate: 1024
    consumerByteRate: 1024

KafkaClientQuotaList

If you need to define multiple quotas (e.g. using a template), it may be easier to use a KafkaClientQuotaList resource.

Specification

Here is the resource definition file for defining a KafkaClientQuotaList.

apiVersion: "kafka.jikkou.io/v1beta2"  # The api version (required)
kind: "KafkaClientQuotaList"           # The resource kind (required)
metadata: # (optional)
  name: <The name of the list>
  labels: { }
  annotations: { }
items: [ ]                              # An array of KafkaClientQuota

Example

Here is a simple example that shows how to define a single YAML file containing two KafkaClientQuota definitions using the KafkaClientQuotaList resource type.

apiVersion: 'kafka.jikkou.io/v1beta2'
kind: 'KafkaClientQuotaList'
metadata:
  labels: { }
  annotations: { }
items:
  - spec:
      type: 'CLIENT'
      entity:
        clientId: 'my-client'
      configs:
        requestPercentage: 10
        producerByteRate: 1024
        consumerByteRate: 1024

  - spec:
      type: 'USER'
      entity:
        user: 'my-user'
      configs:
        requestPercentage: 10
        producerByteRate: 1024
        consumerByteRate: 1024

7.2.2.6 - Kafka Table Records

Learn how to manage KTable topic records.

A KafkaTableRecord resource can be used to produce a key/value record into a given compacted topic, i.e., a topic with cleanup.policy=compact (a.k.a. KTable).

KafkaTableRecord

Specification

Here is the resource definition file for defining a KafkaTableRecord.

apiVersion: "kafka.jikkou.io/v1beta1" # The api version (required)
kind: "KafkaTableRecord"              # The resource kind (required)
metadata:
  labels: { }
  annotations: { }
spec:
  topic: <string>                    # The topic name (required)
  headers:                           # The list of headers
    - name: <string>
      value: <string>
  key:                               # The record-key (required)
    type: <string>                   # The record-key type. Must be one of: BINARY, STRING, JSON (required)
    data:                            # The record-key in JSON serialized form.
      $ref: <url or path>            # Or a URL or path to a local file containing the JSON string value.
  value:                             # The record-value (required)
    type: <string>                   # The record-value type. Must be one of: BINARY, STRING, JSON (required)
    data:                            # The record-value in JSON serialized form.
      $ref: <url or path>            # Or a URL or path to a local file containing the JSON string value.

Usage

The KafkaTableRecord resource has been designed primarily to manage reference data published and shared via Kafka. Therefore, it is highly recommended to use this resource only with compacted Kafka topics containing a small amount of data.

Examples

Here are some examples that show how to define a KafkaTableRecord using each of the supported data types.

STRING:

---
apiVersion: "kafka.jikkou.io/v1beta1"
kind: "KafkaTableRecord"
spec:
  topic: "my-topic"
  headers:
    - name: "content-type"
      value: "application/text"
  key:
    type: STRING
    data: |
      "bar"      
  value:
    type: STRING
    data: |
      "foo"      

JSON:

---
apiVersion: "kafka.jikkou.io/v1beta1"
kind: "KafkaTableRecord"
spec:
  topic: "my-topic"
  headers:
    - name: "content-type"
      value: "application/text"
  key:
    type: STRING
    data: |
      "bar"      
  value:
    type: JSON
    data: |
      {
        "foo": "bar"
      }      

BINARY:

---
apiVersion: "kafka.jikkou.io/v1beta1"
kind: "KafkaTableRecord"
spec:
  topic: "my-topic"
  headers:
    - name: "content-type"
      value: "application/text"
  key:
    type: STRING
    data: |
      "bar"      
  value:
    type: BINARY
    data: |
      "eyJmb28iOiAiYmFyIn0K"      

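The value in the BINARY example above looks like Base64 text. Assuming that the BINARY type expects Base64-encoded bytes (an inference from this example, not something stated here), the following Python sketch decodes the sample value and shows how such a string could be produced. The helper name to_binary_data is hypothetical.

```python
import base64
import json

# The sample value from the BINARY example.
encoded = "eyJmb28iOiAiYmFyIn0K"

# Decode the Base64 text back into raw bytes, then parse them as JSON.
decoded = base64.b64decode(encoded)
record = json.loads(decoded)


def to_binary_data(payload: bytes) -> str:
    """Encode raw bytes as Base64 text (assumed format for BINARY data)."""
    return base64.b64encode(payload).decode("ascii")


print(decoded)  # b'{"foo": "bar"}\n'
print(record)   # {'foo': 'bar'}
```

The decoded bytes are the same JSON document used in the JSON example, followed by a trailing newline.
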
7.2.3 - Transformations

Learn how to use the built-in transformations provided by the Extension Provider for Apache Kafka.

Here, you will find information on how to use the built-in transformations for Apache Kafka resources.

7.2.3.1 - KafkaTopicMaxNumPartitions

This transformation can be used to enforce a maximum value for the number of partitions of Kafka topics.

Configuration

Name | Type | Description | Default
maxNumPartitions | Int | Maximum value for the number of partitions to be used for Kafka topics | -

Example

jikkou {
  transformations: [
    {
      type = io.streamthoughts.jikkou.kafka.transform.KafkaTopicMaxNumPartitions
      priority = 100
      config = {
        maxNumPartitions = 50
      }
    }
  ]
}

7.2.3.2 - KafkaTopicMaxRetentionMs

This transformation can be used to enforce a maximum value for the retention.ms property of Kafka topics.

Configuration

Name | Type | Description | Default
maxRetentionMs | Int | Maximum value of retention.ms to be used for Kafka topics | -

Example

jikkou {
  transformations: [
    {
      type = io.streamthoughts.jikkou.kafka.transform.KafkaTopicMaxRetentionMsTransformation
      priority = 100
      config = {
        maxRetentionMs = 2592000000 # 30 days
      }
    }
  ]
}

7.2.3.3 - KafkaTopicMinInSyncReplicas

This transformation can be used to enforce a minimum value for the min.insync.replicas property of Kafka topics.

Configuration

Name | Type | Description | Default
minInSyncReplicas | Int | Minimum value of min.insync.replicas to be used for Kafka topics | -

Example

jikkou {
  transformations: [
    {
      type = io.streamthoughts.jikkou.kafka.transform.KafkaTopicMinInSyncReplicasTransformation
      priority = 100
      config = {
        minInSyncReplicas = 2
      }
    }
  ]
}

7.2.3.4 - KafkaTopicMinReplicas

This transformation can be used to enforce a minimum value for the replication factor of Kafka topics.

Configuration

Name | Type | Description | Default
minReplicationFactor | Int | Minimum value of replication factor to be used for Kafka topics | -

Example

jikkou {
  transformations: [
    {
      type = io.streamthoughts.jikkou.kafka.transform.KafkaTopicMinReplicasTransformation
      priority = 100
      config = {
        minReplicationFactor = 3
      }
    }
  ]
}

7.2.3.5 - KafkaTopicMinRetentionMs

This transformation can be used to enforce a minimum value for the retention.ms property of Kafka topics.

Configuration

Name | Type | Description | Default
minRetentionMs | Int | Minimum value of retention.ms to be used for Kafka topics | -

Example

jikkou {
  transformations: [
    {
      type = io.streamthoughts.jikkou.kafka.transform.KafkaTopicMinRetentionMsTransformation
      priority = 100
      config = {
        minRetentionMs = 604800000 # 7 days
      }
    }
  ]
}
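
The retention values used in the examples above are expressed in milliseconds. The Python sketch below shows the arithmetic behind the "7 days" and "30 days" comments, together with one plausible reading of "enforce a minimum value": raising anything below the minimum up to it. That clamping behaviour is an assumption, and both function names are hypothetical.

```python
MS_PER_DAY = 24 * 60 * 60 * 1000  # milliseconds in one day


def days_to_ms(days: int) -> int:
    """Convert a number of days to milliseconds."""
    return days * MS_PER_DAY


def enforce_min_retention_ms(requested_ms: int, min_retention_ms: int) -> int:
    """Assumed behaviour: values below the minimum are raised to the minimum."""
    return max(requested_ms, min_retention_ms)


print(days_to_ms(7))   # 604800000
print(days_to_ms(30))  # 2592000000
```

Under this reading, a topic requesting one day of retention with minRetentionMs set to 7 days would end up with 604800000.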

7.2.4 - Validations

Learn how to use the built-in validations provided by the Extension Provider for Apache Kafka.

Jikkou ships with the following built-in validations:

Topics

NoDuplicateTopicsAllowedValidation

(auto registered)

TopicConfigKeysValidation

(auto registered)

type = io.streamthoughts.jikkou.kafka.validation.TopicConfigKeysValidation

The TopicConfigKeysValidation allows checking if the specified topic configs are all valid.

TopicMinNumPartitions

type = io.streamthoughts.jikkou.kafka.validation.TopicMinNumPartitionsValidation

The TopicMinNumPartitions allows checking if the specified number of partitions for a topic is not less than the minimum required.

Configuration

Name | Type | Description | Default
topicMinNumPartitions | Int | Minimum number of partitions allowed | -

TopicMaxNumPartitions

type = io.streamthoughts.jikkou.kafka.validation.TopicMaxNumPartitions

The TopicMaxNumPartitions allows checking if the number of partitions for a topic is not greater than the maximum configured.

Configuration

Name | Type | Description | Default
topicMaxNumPartitions | Int | Maximum number of partitions allowed | -

TopicMinReplicationFactor

type = io.streamthoughts.jikkou.kafka.validation.TopicMinReplicationFactor

The TopicMinReplicationFactor allows checking if the specified replication factor for a topic is not less than the minimum required.

Configuration

Name | Type | Description | Default
topicMinReplicationFactor | Int | Minimum replication factor allowed | -

TopicMaxReplicationFactor

type = io.streamthoughts.jikkou.kafka.validation.TopicMaxReplicationFactor

The TopicMaxReplicationFactor allows checking if the specified replication factor for a topic is not greater than the maximum configured.

Configuration

Name | Type | Description | Default
topicMaxReplicationFactor | Int | Maximum replication factor allowed | -

TopicNamePrefix

type = io.streamthoughts.jikkou.kafka.validation.TopicNamePrefix

The TopicNamePrefix allows checking if the specified name for a topic starts with one of the configured prefixes.

Configuration

Name | Type | Description | Default
topicNamePrefixes | List | List of allowed topic name prefixes | -

TopicNameSuffix

type = io.streamthoughts.jikkou.kafka.validation.TopicNameSuffix

The TopicNameSuffix allows checking if the specified name for a topic ends with one of the configured suffixes.

Configuration

Name | Type | Description | Default
topicNameSuffixes | List | List of allowed topic name suffixes | -
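
The prefix and suffix checks described above amount to simple string tests. The Python sketch below illustrates the idea only and is not the validation code shipped with Jikkou; both function names are hypothetical.

```python
def has_allowed_prefix(topic_name: str, prefixes: list) -> bool:
    """True if the topic name starts with at least one allowed prefix."""
    return any(topic_name.startswith(prefix) for prefix in prefixes)


def has_allowed_suffix(topic_name: str, suffixes: list) -> bool:
    """True if the topic name ends with at least one allowed suffix."""
    return any(topic_name.endswith(suffix) for suffix in suffixes)
```

For example, with prefixes ["public-", "private-"], the name "public-orders" would pass the prefix check while "orders" would not.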

ACLs

NoDuplicateUsersAllowedValidation

(auto registered)

NoDuplicateRolesAllowedValidation

(auto registered)

Quotas

QuotasEntityValidation

(auto registered)

7.2.5 - Annotations

Learn how to use the metadata annotations provided by the extensions for Apache Kafka.

Here, you will find information about the annotations provided by the Apache Kafka extension for Jikkou.

List of built-in annotations

kafka.jikkou.io/cluster-id

Used by Jikkou.

The annotation is automatically added by Jikkou to any described object that is part of an Apache Kafka cluster.

7.2.6 - Actions

Learn how to use the actions provided by the Extension Provider for Apache Kafka.

Here, you will find the list of actions provided by the Extension Provider for Apache Kafka.

Apache Kafka Action

7.2.6.1 - KafkaConsumerGroupsResetOffsets

Learn how to use the KafkaConsumerGroupsResetOffsets action.

The KafkaConsumerGroupsResetOffsets action allows resetting the offsets of a consumer group. It supports one consumer group at a time, and the group should be in the EMPTY state.

Usage (CLI)

Usage:

Execute the action.

jikkou action KafkaConsumerGroupsResetOffsets execute [-hV] [--dry-run]
[--to-earliest] [--to-latest] --group=PARAM [--logger-level=<level>]
[-o=<format>] [--to-datetime=PARAM] [--to-offset=PARAM] --topic=PARAM
[--topic=PARAM]...

DESCRIPTION:

Reset offsets of consumer group. Supports one consumer group at the time, and
group should be in EMPTY state.
You must choose one of the following reset specifications: to-datetime,
by-duration, to-earliest, to-latest, to-offset.


OPTIONS:

      --dry-run             Only show results without executing changes on
                              Consumer Groups.
      --group=PARAM         The consumer group to act on.
  -h, --help                Show this help message and exit.
      --logger-level=<level>
                            Specify the log level verbosity to be used while
                              running a command.
                            Valid level values are: TRACE, DEBUG, INFO, WARN,
                              ERROR.
                            For example, `--logger-level=INFO`
  -o, --output=<format>     Prints the output in the specified format. Allowed
                              values: JSON, YAML (default YAML).
      --to-datetime=PARAM   Reset offsets to offset from datetime. Format:
                              'YYYY-MM-DDTHH:mm:SS.sss'
      --to-earliest         Reset offsets to earliest offset.
      --to-latest           Reset offsets to latest offset.
      --to-offset=PARAM     Reset offsets to a specific offset.
      --topic=PARAM         The topic whose partitions must be included in the
                              reset-offset action.
  -V, --version             Print version information and exit.
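
The --to-datetime option expects a value in the 'YYYY-MM-DDTHH:mm:SS.sss' format. As a small convenience sketch, such a string can be produced from a Python datetime as shown below. The helper name format_reset_datetime is hypothetical, and how the time zone of the value is interpreted is not stated here, so the sketch uses a naive datetime without any offset.

```python
from datetime import datetime


def format_reset_datetime(moment: datetime) -> str:
    """Format a naive datetime as YYYY-MM-DDTHH:mm:SS.sss (millisecond precision)."""
    return moment.isoformat(timespec="milliseconds")


print(format_reset_datetime(datetime(2024, 1, 15, 8, 30, 0)))  # 2024-01-15T08:30:00.000
```

The resulting string can then be passed as the value of --to-datetime.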

Examples

Reset Consumer Group to the earliest offsets

jikkou action kafkaconsumergroupsresetoffsets execute \
--group my-group \
--topic test \
--to-earliest

(output)

---
kind: "ApiActionResultSet"
apiVersion: "core.jikkou.io/v1"
metadata:
  labels: {}
  annotations:
    configs.jikkou.io/to-earliest: "true"
    configs.jikkou.io/group: "my-group"
    configs.jikkou.io/dry-run: "false"
    configs.jikkou.io/topic: 
        - "test"
results:
- status: "SUCCEEDED"
  errors: []
  data:
    apiVersion: "kafka.jikkou.io/v1beta1"
    kind: "KafkaConsumerGroup"
    metadata:
      name: "my-group"
      labels:
        kafka.jikkou.io/is-simple-consumer: false
      annotations: {}
    status:
      state: "EMPTY"
      members: []
      offsets:
      - topic: "test"
        partition: 1
        offset: 0
      - topic: "test"
        partition: 0
        offset: 0
      - topic: "test"
        partition: 2
        offset: 0
      - topic: "--test"
        partition: 0
        offset: 0
      coordinator:
        id: "101"
        host: "localhost"
        port: 9092

7.2.7 - Compatibility

Compatibility for Apache Kafka.

The Apache Kafka extension for Jikkou uses the Kafka Admin Client, which is compatible with any Kafka infrastructure, such as:

  • Aiven
  • Apache Kafka
  • Confluent Cloud
  • MSK
  • Redpanda
  • etc.

In addition, Kafka Protocol has a “bidirectional” client compatibility policy. In other words, new clients can talk to old servers, and old clients can talk to new servers.

7.3 - Apache Kafka Connect

Learn how to use the Jikkou Extension Provider for Apache Kafka Connect.

Here, you will find information on how to use the Kafka Connect extension for Jikkou.

7.3.1 - Configuration

Learn how to configure the extensions for Kafka Connect.

This section describes how to configure the Kafka Connect extension.

Extension

The Kafka Connect extension can be enabled/disabled via the configuration properties:

# Example
jikkou {
  extensions.provider.kafkaconnect.enabled = true
}

Configuration

You can configure the properties used to connect to the Kafka Connect cluster through the Jikkou client configuration property: jikkou.kafkaConnect.

Example:

jikkou {
  kafkaConnect {
    # Array of Kafka Connect clusters configurations.
    clusters = [
      {
        # Name of the cluster (e.g., dev, staging, production, etc.)
        name = "localhost"
        # URL of the Kafka Connect service
        url = "http://localhost:8083"
        # Method to use for authenticating on Kafka Connect. Available values are: [none, basicauth, ssl]
        authMethod = none
        # Use when 'authMethod' is 'basicauth' to specify the username for Authorization Basic header
        basicAuthUser = null
        # Use when 'authMethod' is 'basicauth' to specify the password for Authorization Basic header
        basicAuthPassword = null
        # Enable debug logging
        debugLoggingEnabled = false
  
        # Ssl Config: Use when 'authMethod' is 'ssl'
        # The location of the key store file.
        sslKeyStoreLocation = "/certs/registry.keystore.jks"
        # The file format of the key store file.
        sslKeyStoreType = "JKS"
        # The password for the key store file.
        sslKeyStorePassword = "password"
        # The password of the private key in the key store file.
        sslKeyPassword = "password"
        # The location of the trust store file.
        sslTrustStoreLocation = "/certs/registry.truststore.jks"
        # The file format of the trust store file.
        sslTrustStoreType = "JKS"
        # The password for the trust store file.
        sslTrustStorePassword = "password"
        # Specifies whether to ignore the hostname verification.
        sslIgnoreHostnameVerification = true
      }
    ]
  }
}
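
When authMethod is 'basicauth', the configuration comments above refer to the Authorization Basic header. For readers unfamiliar with it, the Python sketch below shows how the standard HTTP Basic scheme builds that header value from a username and password. It illustrates the HTTP scheme itself, not Jikkou's client code, and the function name basic_auth_header is hypothetical.

```python
import base64


def basic_auth_header(user: str, password: str) -> str:
    """Build the value of an HTTP 'Authorization' header for the Basic scheme."""
    credentials = f"{user}:{password}".encode("utf-8")
    token = base64.b64encode(credentials).decode("ascii")
    return f"Basic {token}"
```

Because the credentials are only Base64-encoded and not encrypted, Basic authentication is normally combined with TLS.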

7.3.2 - Resources

Learn how to use the resources provided by the Kafka Connect extension.

Here, you will find the list of resources supported by the Kafka Connect Extension.

Kafka Connect Resources

7.3.2.1 - KafkaConnectors

Learn how to manage Kafka Connectors.

This section describes the resource definition format for KafkaConnector entities, which can be used to define the configuration and status of connectors you plan to create and manage on specific Kafka Connect clusters.

Definition Format of KafkaConnector

Below is the overall structure of the KafkaConnector resource.

---
apiVersion: "kafka.jikkou.io/v1beta1"  # The api version (required)
kind: "KafkaConnector"                 # The resource kind (required)
metadata:
  name: <string>                       # The name of the connector (required)
  labels:
    # Name of the Kafka Connect cluster to create the connector instance in (required).
    kafka.jikkou.io/connect-cluster: <string>
  annotations:
    # Override client properties to connect to Kafka Connect cluster (optional).
    jikkou.io/config-override: | 
      <json>
spec:
  connectorClass: <string>            # Name or alias of the class for this connector.
  tasksMax: <integer>                 # The maximum number of tasks for the Kafka Connector.
  config:                             # Configuration properties of the connector.
    <key>: <value>
  state: <string>                     # The state the connector should be in. Defaults to running.

See below for details about all these fields.

Metadata

metadata.name [required]

The name of the connector.

labels.kafka.jikkou.io/connect-cluster [required]

The name of the Kafka Connect cluster to create the connector instance in. The cluster name must be configured through the kafkaConnect.clusters[] Jikkou configuration setting (see: Configuration).

annotations.jikkou.io/config-override [optional]

The JSON client configuration to override for connecting to the Kafka Connect cluster. The configuration properties passed through this annotation override any cluster properties defined in the Jikkou configuration (see: Configuration).

apiVersion: "kafka.jikkou.io/v1beta1"
kind: "KafkaConnector"
metadata:
  name: "my-connector"
  labels:
    kafka.jikkou.io/connect-cluster: "my-connect-cluster"
  annotations:
    jikkou.io/config-override: |
      { "url": "http://localhost:8083" }      

Specification

spec.connectorClass [required]

The name or alias of the class for this connector.

spec.tasksMax [optional]

The maximum number of tasks for the Kafka Connector. Default is 1.

spec.config [required]

The connector’s configuration properties.

spec.state [optional]

The state the connector should be in. Defaults to running.

Below are the valid values:

  • running: Transition the connector and its tasks to RUNNING state.
  • paused: Pause the connector and its tasks, which stops message processing until the connector is resumed.
  • stopped: Completely shut down the connector and its tasks. The connector config remains present in the config topic of the cluster (if running in distributed mode), unmodified.

Examples

The following is an example of a resource describing a Kafka connector:

---
# Example: file: kafka-connector-filestream-sink.yaml
apiVersion: "kafka.jikkou.io/v1beta1"
kind: "KafkaConnector"
metadata:
  name: "local-file-sink"
  labels:
    kafka.jikkou.io/connect-cluster: "my-connect-cluster"
spec:
  connectorClass: "FileStreamSink"
  tasksMax: 1
  config:
    file: "/tmp/test.sink.txt"
    topics: "connect-test"
  state: "RUNNING"

Listing KafkaConnector

You can retrieve the state of Kafka Connector instances running on your Kafka Connect clusters using the jikkou get kafkaconnectors (or jikkou get kc) command.

Usage

$ jikkou get kc --help

Usage:

Get all 'KafkaConnector' resources.

jikkou get kafkaconnectors [-hV] [--expand-status] [-o=<format>]
                           [-s=<expressions>]...

Description:

Use jikkou get kafkaconnectors when you want to describe the state of all
resources of type 'KafkaConnector'.

Options:

      --expand-status     Retrieves additional information about the status of
                            the connector and its tasks.
  -h, --help              Show this help message and exit.
  -o, --output=<format>   Prints the output in the specified format. Allowed
                            values: json, yaml (default yaml).
  -s, --selector=<expressions>
                          The selector expression use for including or
                            excluding resources.
  -V, --version           Print version information and exit.

(The output from your current Jikkou version may be different from the above example.)

Examples

(command)

$ jikkou get kc --expand-status 

(output)

apiVersion: "kafka.jikkou.io/v1beta1"
kind: "KafkaConnector"
metadata:
  name: "local-file-sink"
  labels:
    kafka.jikkou.io/connect-cluster: "localhost"
spec:
  connectorClass: "FileStreamSink"
  tasksMax: 1
  config:
    file: "/tmp/test.sink.txt"
    topics: "connect-test"
  state: "RUNNING"
status:
  connectorStatus:
    name: "local-file-sink"
    connector:
      state: "RUNNING"
      worker_id: "localhost:8083"
    tasks:
      - id: 1
        state: "RUNNING"
        worker_id: "localhost:8083"

The status.connectorStatus field provides the connector status, as reported by the Kafka Connect REST API.

7.3.3 - Validations

Learn how to use the validations provided by the Kafka Connect extension.

Jikkou ships with the following built-in validations:

7.3.4 - Annotations

Learn how to use the metadata annotations provided by the Kafka Connect extension.

This section lists a number of well-known annotations that have defined semantics. They can be attached to KafkaConnect resources through the metadata.annotations field and consumed as needed by extensions (i.e., validations, transformations, controller, collector, etc.).

List of built-in annotations

7.3.5 - Labels

Learn how to use the metadata labels provided by the Kafka Connect extension.

This section lists a number of well-known labels that have defined semantics. They can be attached to KafkaConnect resources through the metadata.labels field and consumed as needed by extensions (i.e., validations, transformations, controller, collector, etc.).

Labels

kafka.jikkou.io/connect-cluster

# Example
---
apiVersion: "kafka.jikkou.io/v1beta1"
kind: "KafkaConnector"
metadata:
  labels:
    kafka.jikkou.io/connect-cluster: 'my-connect-cluster'

The value of this label defines the name of the Kafka Connect cluster to create the connector instance in. The cluster name must be configured through the kafkaConnect.clusters[] Jikkou configuration setting (see: Configuration).

7.3.6 - Actions

Learn how to use the actions provided by the Extension Provider for Kafka Connect.

Here, you will find the list of actions provided by the Extension Provider for Kafka Connect.

Kafka Connect Action

7.3.6.1 - KafkaConnectRestartConnectors

Learn how to use the KafkaConnectRestartConnectors action.

The KafkaConnectRestartConnectors action allows a user to restart all or just the failed Connector and Task instances for one or multiple named connectors.

Usage (CLI)

Usage:

Execute the action.

jikkou action KafkaConnectRestartConnectors execute [-hV] [--include-tasks]
[--only-failed] [--connect-cluster=PARAM] [--logger-level=<level>]
[-o=<format>] [--connector-name=PARAM]...

DESCRIPTION:

The KafkaConnectRestartConnectors action a user to restart all or just the
failed Connector and Task instances for one or multiple named connectors.

OPTIONS:

      --connect-cluster=PARAM
                          The name of the connect cluster.
      --connector-name=PARAM
                          The connector's name.
  -h, --help              Show this help message and exit.
      --include-tasks     Specifies whether to restart the connector instance
                            and task instances (includeTasks=true) or just the
                            connector instance (includeTasks=false)
      --logger-level=<level>
                          Specify the log level verbosity to be used while
                            running a command.
                          Valid level values are: TRACE, DEBUG, INFO, WARN,
                            ERROR.
                          For example, `--logger-level=INFO`
  -o, --output=<format>   Prints the output in the specified format. Allowed
                            values: JSON, YAML (default YAML).
      --only-failed       Specifies whether to restart just the instances with
                            a FAILED status (onlyFailed=true) or all instances
                            (onlyFailed=false)
  -V, --version           Print version information and exit.
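
The includeTasks and onlyFailed names in the option descriptions above mirror the query parameters of the Kafka Connect REST endpoint POST /connectors/{name}/restart. The Python sketch below builds such a request URL to show how the two flags combine. That the action calls exactly this endpoint is an assumption, and the function name restart_url is hypothetical.

```python
from urllib.parse import quote, urlencode


def restart_url(base_url: str, connector_name: str,
                include_tasks: bool = False, only_failed: bool = False) -> str:
    """Build the URL for a Kafka Connect 'restart connector' REST request."""
    query = urlencode({
        # The REST API expects lowercase 'true' / 'false' values.
        "includeTasks": str(include_tasks).lower(),
        "onlyFailed": str(only_failed).lower(),
    })
    name = quote(connector_name, safe="")
    return f"{base_url.rstrip('/')}/connectors/{name}/restart?{query}"
```

For example, restarting only the failed instances of a connector together with its tasks corresponds to includeTasks=true and onlyFailed=true.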

Examples

Restart all connectors for all Kafka Connect clusters.

jikkou action kafkaconnectrestartconnectors execute

(output)

---
kind: "ApiActionResultSet"
apiVersion: "core.jikkou.io/v1"
metadata:
  labels: {}
  annotations: {}
results:
- status: "SUCCEEDED"
  data:
    apiVersion: "kafka.jikkou.io/v1beta1"
    kind: "KafkaConnector"
    metadata:
      name: "local-file-sink"
      labels:
        kafka.jikkou.io/connect-cluster: "my-connect-cluster"
      annotations: {}
    spec:
      connectorClass: "FileStreamSink"
      tasksMax: 1
      config:
        file: "/tmp/test.sink.txt"
        topics: "connect-test"
      state: "RUNNING"
    status:
      connectorStatus:
        name: "local-file-sink"
        connector:
          state: "RUNNING"
          workerId: "connect:8083"
        tasks:
        - id: 0
          state: "RUNNING"
          workerId: "connect:8083"

Restart all connectors with a FAILED status on all Kafka Connect clusters.

jikkou action kafkaconnectrestartconnectors execute \
--only-failed

Restart a specific connector and its tasks on a given Kafka Connect cluster.

jikkou action kafkaconnectrestartconnectors execute \
--connect-cluster my-connect-cluster \
--connector-name local-file-sink \
--include-tasks

7.4 - Schema Registry

Learn how to use the Jikkou Extension Provider for Schema Registry.

Here, you will find information on how to use the Schema Registry extension.

7.4.1 - Configuration

Learn how to configure the extensions for SchemaRegistry.

This section describes how to configure the Schema Registry extension.

Configuration

You can configure the properties used to connect to the SchemaRegistry service through the Jikkou client configuration property jikkou.schemaRegistry.

Example:

jikkou {
  schemaRegistry {
    # Comma-separated list of URLs for schema registry instances that can be used to register or look up schemas
    url = "http://localhost:8081"
    # The name of the schema registry implementation vendor - can be any value
    vendor = generic
    # Method to use for authenticating on Schema Registry. Available values are: [none, basicauth, ssl]
    authMethod = none
    # Use when 'schemaRegistry.authMethod' is 'basicauth' to specify the username for Authorization Basic header
    basicAuthUser = null
    # Use when 'schemaRegistry.authMethod' is 'basicauth' to specify the password for Authorization Basic header
    basicAuthPassword = null
    # Enable debug logging
    debugLoggingEnabled = false

    # Ssl Config: Use when 'authMethod' is 'ssl'
    # The location of the key store file.
    sslKeyStoreLocation = "/certs/registry.keystore.jks"
    # The file format of the key store file.
    sslKeyStoreType = "JKS"
    # The password for the key store file.
    sslKeyStorePassword = "password"
    # The password of the private key in the key store file.
    sslKeyPassword = "password"
    # The location of the trust store file.
    sslTrustStoreLocation = "/certs/registry.truststore.jks"
    # The file format of the trust store file.
    sslTrustStoreType = "JKS"
    # The password for the trust store file.
    sslTrustStorePassword = "password"
    # Specifies whether to ignore the hostname verification.
    sslIgnoreHostnameVerification = true
  }
}

7.4.2 - Resources

Learn how to use the built-in resources provided by the extensions for Schema Registry.

Here, you will find the list of resources supported for Schema Registry.

Schema Registry Resources

7.4.2.1 - Schema Registry Subjects

Learn how to manage SchemaRegistry Subjects.

SchemaRegistrySubject resources are used to define the subject schemas you want to manage on your SchemaRegistry. A SchemaRegistrySubject resource defines the schema, the compatibility level, and the references to be associated with a subject version.

SchemaRegistrySubject

Specification

Here is the resource definition file for defining a SchemaRegistrySubject.

apiVersion: "schemaregistry.jikkou.io/v1beta2"  # The api version (required)
kind: "SchemaRegistrySubject"                   # The resource kind (required)
metadata:
  name: <The name of the subject>               # (required)
  labels: { }
  annotations: { }
spec:
  schemaRegistry:
    vendor: <vendor_name>                       # (optional) The vendor of the SchemaRegistry, e.g., Confluent, Karapace, etc
  compatibilityLevel: <compatibility_level>     # (optional) The schema compatibility level for this subject.
  schemaType: <The schema format>               # (required) Accepted values are: AVRO, PROTOBUF, JSON
  schema:
    $ref: <url or path>                         # The URL or path of the schema definition file.
  references:                                   # Specifies the names of referenced schemas (optional array).
    - name: <>                                  # The name for the reference.
      subject: <>                               # The subject under which the referenced schema is registered.
      version: <>                               # The exact version of the schema under the registered subject.

The metadata.name property is mandatory and specifies the name of the Subject.

To use the Schema Registry default compatibility level, omit the compatibilityLevel property.

Example

Here is a simple example that shows how to define a single Avro subject schema for the type User using the SchemaRegistrySubject resource type.

file: subject-user.yaml

---
apiVersion: "schemaregistry.jikkou.io/v1beta2"
kind: "SchemaRegistrySubject"
metadata:
  name: "User"
  labels: { }
  annotations:
    schemaregistry.jikkou.io/normalize-schema: true
spec:
  compatibilityLevel: "FULL_TRANSITIVE"
  schemaType: "AVRO"
  schema:
    $ref: ./user-schema.avsc

file: user-schema.avsc

{
  "namespace": "example.avro",
  "type": "record",
  "name": "User",
  "fields": [
    {
      "name": "name",
      "type": [ "null", "string" ],
      "default": null
    },
    {
      "name": "favorite_number",
      "type": [ "null", "int" ],
      "default": null
    },
    {
      "name": "favorite_color",
      "type": [ "null", "string" ],
      "default": null
    }
  ]
}

Alternatively, we can pass the Avro schema inline as follows:

file: subject-user.yaml

---
apiVersion: "schemaregistry.jikkou.io/v1beta2"
kind: "SchemaRegistrySubject"
metadata:
  name: "User"
  labels: { }
  annotations:
    schemaregistry.jikkou.io/normalize-schema: true
spec:
  compatibilityLevel: "FULL_TRANSITIVE"
  schemaType: "AVRO"
  schema: |
    {
      "namespace": "example.avro",
      "type": "record",
      "name": "User",
      "fields": [
        {
          "name": "name",
          "type": [ "null", "string" ],
          "default": null
        },
        {
          "name": "favorite_number",
          "type": [ "null", "int" ],
          "default": null
        },
        {
          "name": "favorite_color",
          "type": [ "null", "string"],
          "default": null
        }
      ]
    }    

7.4.3 - Validations

Learn how to use the built-in validations provided by the extensions for Schema Registry.

Jikkou ships with the following built-in validations:

Subject

SchemaCompatibilityValidation

type = io.streamthoughts.jikkou.schema.registry.validation.SchemaCompatibilityValidation

The SchemaCompatibilityValidation allows testing the compatibility of the schema with the latest version already registered in the Schema Registry using the provided compatibility-level.

AvroSchemaValidation

The AvroSchemaValidation checks whether the specified Avro schema complies with specific Avro schema definition rules.

type = io.streamthoughts.jikkou.schema.registry.validation.AvroSchemaValidation

Configuration

| Name | Type | Description | Default |
|------|------|-------------|---------|
| fieldsMustHaveDoc | Boolean | Verify that all record fields have a doc property | false |
| fieldsMustBeNullable | Boolean | Verify that all record fields are nullable | false |
| fieldsMustBeOptional | Boolean | Verify that all record fields are optional | false |

7.4.4 - Annotations

Learn how to use the metadata annotations provided by the extensions for Schema Registry.

Here, you will find information about the annotations provided by the Schema Registry extension for Jikkou.

List of built-in annotations

schemaregistry.jikkou.io/url

Used by jikkou.

The annotation is automatically added by Jikkou to describe the SchemaRegistry URL from which a subject schema is retrieved.

schemaregistry.jikkou.io/schema-version

Used by jikkou.

The annotation is automatically added by Jikkou to describe the version of a subject schema.

schemaregistry.jikkou.io/schema-id

Used by jikkou.

The annotation is automatically added by Jikkou to describe the ID of a subject schema.

schemaregistry.jikkou.io/normalize-schema

Used on: schemaregistry.jikkou.io/v1beta2:SchemaRegistrySubject

This annotation can be used to normalize the schema on the Schema Registry server side. Note that Jikkou will attempt to normalize AVRO and JSON schemas.

See: Confluent SchemaRegistry API Reference

schemaregistry.jikkou.io/permanante-delete

Used on: schemaregistry.jikkou.io/v1beta2:SchemaRegistrySubject

The annotation can be used to specify a hard delete of the subject, which removes all associated metadata including the schema ID. The default is false. If the flag is not included, a soft delete is performed. You must perform a soft delete first, then the hard delete.

See: Confluent SchemaRegistry API Reference
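
The soft-then-hard delete sequence described above maps onto two calls of the Confluent Schema Registry REST API: DELETE /subjects/{subject}, followed by the same call with ?permanent=true. The sketch below only builds the two requests with the JDK HTTP client. It is an illustration rather than Jikkou's own implementation; the class name, the registry URL and the subject name are assumptions.

```java
import java.net.URI;
import java.net.http.HttpRequest;

/** Sketch: builds the Schema Registry requests behind a soft and a hard subject delete. */
final class SubjectDeleteRequests {

    /** Soft delete: DELETE /subjects/{subject}. */
    static HttpRequest softDelete(String registryUrl, String subject) {
        return HttpRequest.newBuilder(URI.create(registryUrl + "/subjects/" + subject))
                .DELETE()
                .build();
    }

    /** Hard delete: DELETE /subjects/{subject}?permanent=true (issued after the soft delete). */
    static HttpRequest hardDelete(String registryUrl, String subject) {
        return HttpRequest.newBuilder(URI.create(registryUrl + "/subjects/" + subject + "?permanent=true"))
                .DELETE()
                .build();
    }

    public static void main(String[] args) {
        String registryUrl = "http://localhost:8081"; // assumed local registry
        System.out.println(softDelete(registryUrl, "User").uri());
        System.out.println(hardDelete(registryUrl, "User").uri());
    }
}
```

Subject names containing reserved URL characters would need to be percent-encoded before being appended to the path; the sketch skips this for brevity.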

7.5 - Aiven

Learn how to use the Jikkou Extension Providers for Aiven.

Here, you will find information to use the Aiven for Kafka extensions.

7.5.1 - Configuration

Learn how to configure the extensions for Aiven.

Here, you will find the configuration properties supported by the extension for Aiven.

Configuration

You can configure the properties used to connect to the Aiven service through the Jikkou client configuration property jikkou.aiven.

Example:

jikkou {
  aiven {
    # Aiven project name
    project = "<your-project-name>"
    # Aiven service name
    service = "<your-service-name>"
    # URL to the Aiven REST API.
    apiUrl = "https://api.aiven.io/v1/"
    # Aiven Bearer Token. Tokens can be obtained from your Aiven profile page
    tokenAuth = null
    # Enable debug logging
    debugLoggingEnabled = false
  }
}

7.5.2 - Resources

Learn how to use the built-in resources provided by the extensions for Aiven.

Here, you will find the list of resources supported by the extensions for Aiven.

Aiven for Apache Kafka Resources

7.5.2.1 - ACL for Aiven Apache Kafka®

Learn how to manage Access Control Lists (ACLs) in Aiven for Apache Kafka®

The KafkaTopicAclEntry resources are used to manage the Access Control Lists in Aiven for Apache Kafka®. A KafkaTopicAclEntry resource defines the permission to be granted to a user for one or more Kafka topics.

KafkaTopicAclEntry

Specification

Here is the resource definition file for defining a KafkaTopicAclEntry.

---
apiVersion: "kafka.aiven.io/v1beta1"   # The api version (required)
kind: "KafkaTopicAclEntry"             # The resource kind (required)
metadata:
  labels: { }
  annotations: { }
spec:
  permission: <>               # The permission. Accepted values are: READ, WRITE, READWRITE, ADMIN
  username: <>                 # The username
  topic: <>                    # Topic name or glob pattern

Example

Here is a simple example that shows how to define a single ACL entry using the KafkaTopicAclEntry resource type.

file: kafka-topic-acl-entry.yaml

---
apiVersion: "kafka.aiven.io/v1beta1"
kind: "KafkaTopicAclEntry"
metadata:
  labels: { }
  annotations: { }
spec:
  permission: "READWRITE"
  username: "alice"
  topic: "public-*"
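
To get a feel for which topic names a pattern such as public-* covers, the glob can be tried out locally. The sketch below reuses the JDK's glob matcher as a stand-in. This is an assumption: Aiven evaluates the pattern on its side, and its glob dialect may differ from the JDK's (which also understands [] and {} groups). The class and method names are hypothetical.

```java
import java.nio.file.FileSystems;
import java.nio.file.Path;
import java.nio.file.PathMatcher;

/** Sketch: checks a topic name against a glob pattern using the JDK glob matcher. */
final class TopicGlob {

    static boolean matches(String glob, String topic) {
        // The "glob:" prefix selects the glob syntax of the default file system matcher.
        PathMatcher matcher = FileSystems.getDefault().getPathMatcher("glob:" + glob);
        return matcher.matches(Path.of(topic));
    }

    public static void main(String[] args) {
        System.out.println(matches("public-*", "public-orders"));
        System.out.println(matches("public-*", "private-orders"));
    }
}
```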

KafkaTopicAclEntryList

If you need to define multiple ACL entries (e.g. using a template), it may be easier to use a KafkaTopicAclEntryList resource.

Specification

Here is the resource definition file for defining a KafkaTopicAclEntryList.

---
apiVersion: "kafka.aiven.io/v1beta1"    # The api version (required)
kind: "KafkaTopicAclEntryList"          # The resource kind (required)
metadata: # (optional)
  name: <The name of the topic>
  labels: { }
  annotations: { }
items: [ ]                             # An array of KafkaTopicAclEntry

Example

Here is a simple example that shows how to define a single YAML file containing two ACL entry definitions using the KafkaTopicAclEntryList resource type.

---
apiVersion: "kafka.aiven.io/v1beta1"
kind: "KafkaTopicAclEntryList"
items:
  - spec:
      permission: "READWRITE"
      username: "alice"
      topic: "public-*"
  - spec:
      permission: "READ"
      username: "bob"
      topic: "public-*"

7.5.2.2 - Quotas for Aiven Apache Kafka®

Learn how to manage Quotas in Aiven for Apache Kafka®

The KafkaQuota resources are used to manage the Quotas in Aiven for Apache Kafka® service. For more details, see https://docs.aiven.io/docs/products/kafka/concepts/kafka-quotas

KafkaQuota

Specification

Here is the resource definition file for defining a KafkaQuota.

---
apiVersion: "kafka.aiven.io/v1beta1"   # The api version (required)
kind: "KafkaQuota"                     # The resource kind (required)
metadata:
  labels: { }
  annotations: { }
spec:
  user: <string>                     # The username (optional; 'default' if null)
  clientId: <string>                 # The client-id
  consumerByteRate: <number>         # The quota in bytes for restricting data consumption
  producerByteRate: <number>         # The quota in bytes for restricting data production
  requestPercentage: <number>

Example

Here is a simple example that shows how to define a single quota using the KafkaQuota resource type.

file: kafka-quotas.yaml

---
apiVersion: "kafka.aiven.io/v1beta1"
kind: "KafkaQuota"
spec:
  user: "default"
  clientId: "default"
  consumerByteRate: 1048576
  producerByteRate: 1048576
  requestPercentage: 25

KafkaQuotaList

If you need to define multiple Kafka quotas (e.g. using a template), it may be easier to use a KafkaQuotaList resource.

Specification

Here is the resource definition file for defining a KafkaQuotaList.

---
apiVersion: "kafka.aiven.io/v1beta1"    # The api version (required)
kind: "KafkaQuotaList"                  # The resource kind (required)
metadata: # (optional)
  labels: { }
  annotations: { }
items: [ ]                             # An array of KafkaQuota

Example

Here is a simple example that shows how to define a single YAML file containing two quota definitions using the KafkaQuotaList resource type.

---
apiVersion: "kafka.aiven.io/v1beta1"
kind: "KafkaQuotaList"
items:
  - spec:
      user: "default"
      clientId: "default"
      consumerByteRate: 1048576
      producerByteRate: 1048576
      requestPercentage: 5
  - spec:
      user: "avnadmin"
      consumerByteRate: 5242880
      producerByteRate: 5242880
      requestPercentage: 25

7.5.2.3 - ACL for Aiven Schema Registry

Learn how to manage Access Control Lists (ACLs) in Aiven for Schema Registry

The SchemaRegistryAclEntry resources are used to manage the Access Control Lists in Aiven for Schema Registry. A SchemaRegistryAclEntry resource defines the permission to be granted to a user for one or more Schema Registry Subjects.

SchemaRegistryAclEntry

Specification

Here is the resource definition file for defining a SchemaRegistryAclEntry.

---
apiVersion: "kafka.aiven.io/v1beta1"   # The api version (required)
kind: "SchemaRegistryAclEntry"         # The resource kind (required)
metadata:
  labels: { }
  annotations: { }
spec:
  permission: <>               # The permission. Accepted values are: READ, WRITE
  username: <>                 # The username
  resource: <>                 # The Schema Registry ACL entry resource name pattern

NOTE: The resource name pattern should be Config: or Subject:<subject_name> where subject_name must consist of alpha-numeric characters, underscores, dashes, dots and glob characters * and ?.
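
The rule in the note above can be checked locally before applying a resource file. The sketch below encodes it as a regular expression. It is an assumption-based illustration, not the validation Aiven or Jikkou actually runs: the class name is hypothetical, and it assumes the subject name must contain at least one character.

```java
import java.util.regex.Pattern;

/** Sketch: validates a Schema Registry ACL resource name against the documented pattern. */
final class AclResourcePattern {

    // Either the literal "Config:" or "Subject:" followed by one or more of:
    // letters, digits, underscore, dot, dash, and the glob characters * and ?.
    private static final Pattern VALID =
            Pattern.compile("^(Config:|Subject:[A-Za-z0-9_.*?-]+)$");

    static boolean isValid(String resource) {
        return resource != null && VALID.matcher(resource).matches();
    }

    public static void main(String[] args) {
        System.out.println(isValid("Config:"));
        System.out.println(isValid("Subject:orders-*"));
        System.out.println(isValid("Topic:orders"));
    }
}
```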

Example

Here is an example that shows how to define a simple ACL entry using the SchemaRegistryAclEntry resource type.

file: schema-registry-acl-entry.yaml

---
apiVersion: "kafka.aiven.io/v1beta1"
kind: "SchemaRegistryAclEntry"
spec:
  permission: "READ"
  username: "Alice"
  resource: "Subject:*"

SchemaRegistryAclEntryList

If you need to define multiple ACL entries (e.g. using a template), it may be easier to use a SchemaRegistryAclEntryList resource.

Specification

Here is the resource definition file for defining a SchemaRegistryAclEntryList.

---
apiVersion: "kafka.aiven.io/v1beta1"    # The api version (required)
kind: "SchemaRegistryAclEntryList"      # The resource kind (required)
metadata: # (optional)
  labels: { }
  annotations: { }
items: [ ]                              # An array of SchemaRegistryAclEntry

Example

Here is a simple example that shows how to define a single YAML file containing two ACL entry definitions using the SchemaRegistryAclEntryList resource type.

---
apiVersion: "kafka.aiven.io/v1beta1"
kind: "SchemaRegistryAclEntryList"
items:
  - spec:
      permission: "READ"
      username: "alice"
      resource: "Config:"
  - spec:
      permission: "WRITE"
      username: "alice"
      resource: "Subject:*"

7.5.2.4 - Subject for Aiven Schema Registry

Learn how to manage Schema Registry Subject Schema in Aiven.

SchemaRegistrySubject resources are used to define the subject schemas you want to manage on your Schema Registry. A SchemaRegistrySubject resource defines the schema, the compatibility level, and the references to be associated with a subject version.

SchemaRegistrySubject

Specification

Here is the resource definition file for defining a SchemaRegistrySubject.

apiVersion: "kafka.aiven.io/v1beta1"  # The api version (required)
kind: "SchemaRegistrySubject"                   # The resource kind (required)
metadata:
  name: <The name of the subject>               # (required)
  labels: { }
  annotations: { }
spec:
  schemaRegistry:
    vendor: 'Karapace'                          # (optional) The vendor of the Schema Registry
  compatibilityLevel: <compatibility_level>     # (optional) The schema compatibility level for this subject.
  schemaType: <The schema format>               # (required) Accepted values are: AVRO, PROTOBUF, JSON
  schema:
    $ref: <url or path>                         # The URL or path of the schema definition file.
  references:                                   # Specifies the names of referenced schemas (optional array).
    - name: <>                                  # The name for the reference.
      subject: <>                               # The subject under which the referenced schema is registered.
      version: <>                               # The exact version of the schema under the registered subject.

The metadata.name property is mandatory and specifies the name of the Subject.

To use the Schema Registry default compatibility level, omit the compatibilityLevel property.

Example

Here is a simple example that shows how to define a single Avro subject schema for the type User using the SchemaRegistrySubject resource type.

file: subject-user.yaml

---
apiVersion: "kafka.aiven.io/v1beta1"
kind: "SchemaRegistrySubject"
metadata:
  name: "User"
  labels: { }
  annotations:
    schemaregistry.jikkou.io/normalize-schema: true
spec:
  compatibilityLevel: "FULL_TRANSITIVE"
  schemaType: "AVRO"
  schema:
    $ref: ./user-schema.avsc

file: user-schema.avsc

{
  "namespace": "example.avro",
  "type": "record",
  "name": "User",
  "fields": [
    {
      "name": "name",
      "type": [ "null", "string" ],
      "default": null
    },
    {
      "name": "favorite_number",
      "type": [ "null", "int" ],
      "default": null
    },
    {
      "name": "favorite_color",
      "type": [ "null", "string" ],
      "default": null
    }
  ]
}

Alternatively, we can pass the Avro schema inline as follows:

file: subject-user.yaml

---
apiVersion: "kafka.aiven.io/v1beta1"
kind: "SchemaRegistrySubject"
metadata:
  name: "User"
  labels: { }
  annotations:
    schemaregistry.jikkou.io/normalize-schema: true
spec:
  compatibilityLevel: "FULL_TRANSITIVE"
  schemaType: "AVRO"
  schema: |
    {
      "namespace": "example.avro",
      "type": "record",
      "name": "User",
      "fields": [
        {
          "name": "name",
          "type": [ "null", "string" ],
          "default": null
        },
        {
          "name": "favorite_number",
          "type": [ "null", "int" ],
          "default": null
        },
        {
          "name": "favorite_color",
          "type": [ "null", "string"],
          "default": null
        }
      ]
    }    

7.5.3 - Validations

Learn how to use the built-in validations provided by the extensions for Aiven.

The extension for Aiven does not currently ship with any built-in validations.

7.5.4 - Annotations

Learn how to use the metadata annotations provided by the extensions for Aiven.

Here, you will find information about the annotations provided by the Aiven extension for Jikkou.

List of built-in annotations

kafka.aiven.io/acl-entry-id

Used by jikkou.

The annotation is automatically added by Jikkou to describe the ID of an ACL entry.

8 - Developer Guide

Learn how to use the Jikkou Core API

Here, you will find the necessary information to develop with the Jikkou API.

8.1 - Extension Developer Guide

Learn how to write custom extensions for Jikkou

This guide describes how developers can write new extensions for Jikkou.

8.1.1 - Package Extensions

Learn how to package and install custom extensions for Jikkou.

Packaging Extensions

You can extend Jikkou’s capabilities by developing custom extensions and resources.

An extension must be developed in Java and packaged as a tarball or ZIP archive. The archive must contain a single top-level directory containing the extension JAR files, as well as any resource files or third party libraries required by your extensions. An alternative approach is to create an uber-JAR that contains all the extension’s JAR files and other resource files needed.

An extension package is more commonly described as an Extension Provider.

Dependencies

Jikkou’s sources are available on Maven Central.

To start developing custom extensions for Jikkou, simply add the Core library to your project’s dependencies.

For Maven:


<dependency>
    <groupId>io.streamthoughts</groupId>
    <artifactId>jikkou-core</artifactId>
    <version>${jikkou.version}</version>
</dependency>

For Gradle:

implementation group: 'io.streamthoughts', name: 'jikkou-core', version: ${jikkou.version}

Extension Discovery

Jikkou uses the standard Java ServiceLoader mechanism to discover and register custom extensions and resources. For this, you will need to implement the Service Provider Interface: io.streamthoughts.jikkou.spi.ExtensionProvider

/**
 * <pre>
 * Service interface for registering extensions and resources to Jikkou at runtime.
 * The implementations are discovered using the standard Java {@link java.util.ServiceLoader} mechanism.
 *
 * Hence, the fully qualified name of the extension classes that implement the {@link ExtensionProvider}
 * interface must be added to a {@code META-INF/services/io.streamthoughts.jikkou.spi.ExtensionProvider} file.
 * </pre>
 */
public interface ExtensionProvider extends HasName, Configurable {

    /**
     * Registers the extensions for this provider.
     *
     * @param registry The ExtensionRegistry.
     */
    void registerExtensions(@NotNull ExtensionRegistry registry);

    /**
     * Registers the resources for this provider.
     *
     * @param registry The ResourceRegistry.
     */
    void registerResources(@NotNull ResourceRegistry registry);
}
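
To illustrate the discovery mechanism in isolation, the sketch below loads implementations of a service interface through java.util.ServiceLoader and derives the name of the META-INF/services file that must list them. It is a minimal sketch: DemoProvider is a hypothetical stand-in for ExtensionProvider so that the example compiles without Jikkou on the classpath, and ProviderDiscovery is not part of the Jikkou API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;

/** Sketch: discovers service implementations the same way Jikkou finds extension providers. */
final class ProviderDiscovery {

    /** Hypothetical stand-in for io.streamthoughts.jikkou.spi.ExtensionProvider. */
    public interface DemoProvider {
        String getName();
    }

    /** Returns the provider-configuration file that must list the implementation class names. */
    static String servicesFile(Class<?> spi) {
        return "META-INF/services/" + spi.getName();
    }

    /** Instantiates every implementation declared on the classpath for the given interface. */
    static <S> List<S> discover(Class<S> spi) {
        List<S> providers = new ArrayList<>();
        for (S provider : ServiceLoader.load(spi)) {
            providers.add(provider);
        }
        return providers;
    }

    public static void main(String[] args) {
        System.out.println(servicesFile(DemoProvider.class));
        // Empty unless a JAR on the classpath declares an implementation.
        System.out.println(discover(DemoProvider.class).size());
    }
}
```

Each line of the provider-configuration file holds the fully qualified name of one implementation class, and each implementation needs a public no-argument constructor so that ServiceLoader can instantiate it.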

Recommendations

If you use Maven as your build tool, we recommend using the Apache Maven Assembly Plugin to package your extensions as a tarball or ZIP archive.

Simply create an assembly descriptor in your project as follows:

src/main/assembly/package.xml


<assembly xmlns="http://maven.apache.org/ASSEMBLY/2.2.0"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/ASSEMBLY/2.2.0 http://maven.apache.org/xsd/assembly-2.2.0.xsd">
    <id>package</id>
    <formats>
        <format>zip</format>
    </formats>
    <includeBaseDirectory>false</includeBaseDirectory>
    <fileSets>
        <fileSet>
            <directory>${project.basedir}</directory>
            <outputDirectory>${organization.name}-${project.artifactId}/doc</outputDirectory>
            <includes>
                <include>README*</include>
                <include>LICENSE*</include>
                <include>NOTICE*</include>
            </includes>
        </fileSet>
    </fileSets>
    <dependencySets>
        <dependencySet>
            <outputDirectory>${organization.name}-${project.artifactId}/lib</outputDirectory>
            <useProjectArtifact>true</useProjectArtifact>
            <useTransitiveFiltering>true</useTransitiveFiltering>
            <unpack>false</unpack>
            <excludes>
                <exclude>io.streamthoughts:jikkou-core</exclude>
            </excludes>
        </dependencySet>
    </dependencySets>
</assembly>

Then, configure the maven-assembly-plugin in the pom.xml file of your project:


<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-assembly-plugin</artifactId>
    <configuration>
        <finalName>${organization.name}-${project.artifactId}-${project.version}</finalName>
        <appendAssemblyId>false</appendAssemblyId>
        <descriptors>
            <descriptor>src/main/assembly/package.xml</descriptor>
        </descriptors>
    </configuration>
    <executions>
        <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
                <goal>single</goal>
            </goals>
        </execution>
        <execution>
            <id>test-make-assembly</id>
            <phase>pre-integration-test</phase>
            <goals>
                <goal>single</goal>
            </goals>
        </execution>
    </executions>
</plugin>

Finally, run mvn clean package to build your project and create the archive.

Installing Extension Providers

To install an Extension Provider, simply unpack the archive into the desired location (e.g., /usr/share/jikkou-extensions). Make sure that the archive’s top-level directory name is unique, to prevent overwriting existing files or extensions.

Configuring Extension Providers

Custom extensions can be supplied to the Jikkou API Server and the Jikkou CLI (when running the Java binary distribution, i.e., not the native version). To do so, configure the jikkou.extension.paths property, which accepts a list of paths from which to load extension providers.

Example for the Jikkou API Server:

# application.yaml
jikkou:
  extension.paths:
    - /usr/share/jikkou-extensions

Once your extensions are configured, you should be able to list them using either:

  • The Jikkou CLI: jikkou api-extensions list command, or
  • The Jikkou API Server: GET /apis/core.jikkou.io/v1/extensions -H "Accept: application/json"
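
The API Server call listed above can also be issued programmatically. The sketch below builds the request with the JDK HTTP client. The endpoint path and the Accept header come from the list above; the class name and the base URL used in main are assumptions to adapt to your deployment.

```java
import java.net.URI;
import java.net.http.HttpRequest;

/** Sketch: builds the request that lists the extensions registered on a Jikkou API Server. */
final class ListExtensionsRequest {

    static HttpRequest build(String baseUrl) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/apis/core.jikkou.io/v1/extensions"))
                .header("Accept", "application/json")
                .GET()
                .build();
    }

    public static void main(String[] args) {
        // Assumed address of the API Server; replace with your own host and port.
        HttpRequest request = build("http://localhost:28082");
        System.out.println(request.method() + " " + request.uri());
    }
}
```

To execute the request, pass it to HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()) and inspect the JSON body for your extension's name.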

8.1.2 - Develop Custom Validations

Learn how to develop custom resource validations.

This section covers the core classes to develop validation extensions.

Interface

To create a custom validation, you will need to implement the Java interface: io.streamthoughts.jikkou.core.validation.Validation.

This interface defines two methods, each with a default implementation, so that you can validate either all the resources accepted by the validation at once or each resource one by one.

public interface Validation<T extends HasMetadata> extends Interceptor {

    /**
     * Validates the specified resource list.
     *
     * @param resources              The list of resources to be validated.
     * @return The ValidationResult.
     */
    default ValidationResult validate(@NotNull final List<T> resources) {
        // code omitted for clarity
    }

    /**
     * Validates the specified resource.
     *
     * @param resource               The resource to be validated.
     * @return The ValidationResult.
     */
    default ValidationResult validate(@NotNull final T resource) {
        // code omitted for clarity
    }
}

Examples

The validation class below shows how to validate that any resource has a specific non-empty label.


@Title("HasNonEmptyLabelValidation allows validating that resources have a non empty label.")
@Description("This validation can be used to ensure that all resources are associated to a specific label. The label key is passed through the configuration of the extension.")
@Example(
        title = "Validate that resources have a non-empty label with key 'owner'.",
        full = true,
        code = {"""
                validations:
                - name: "resourceMustHaveNonEmptyLabelOwner"
                  type: "com.example.jikkou.validation.HasNonEmptyLabelValidation"
                  priority: 100
                  config:
                    key: owner
                """
        }
)
@SupportedResources(value = {}) // an empty list implies that the extension supports any resource-type
public final class HasNonEmptyLabelValidation implements Validation<HasMetadata> {

    // The required config property.
    static final ConfigProperty<String> LABEL_KEY_CONFIG = ConfigProperty.ofString("key");

    private String key;

    /**
     * Empty constructor - required.
     */
    public HasNonEmptyLabelValidation() {
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public void configure(@NotNull final Configuration config) {
        // Get the key from the configuration.
        this.key = LABEL_KEY_CONFIG
                .getOptional(config)
                .orElseThrow(() -> new ConfigException(
                        String.format("The '%s' configuration property is required for %s",
                                LABEL_KEY_CONFIG.key(),
                                HasNonEmptyLabelValidation.class.getSimpleName()
                        )
                ));
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public ValidationResult validate(final @NotNull HasMetadata resource) {
        Optional<String> label = resource.getMetadata()
                .findLabelByKey(this.key)
                .map(NamedValue::getValue)
                .map(Value::asString)
                .filter(value -> !value.isEmpty()); // keep only non-empty label values
        // Failure
        if (label.isEmpty()) {
            String error = String.format(
                    "Resource '%s' has a missing or empty label for key: '%s'",
                    resource.getMetadata().getName(),
                    this.key
            );
            return ValidationResult.failure(new ValidationError(getName(), resource, error));
        }
        // Success
        return ValidationResult.success();
    }
}

8.1.3 - Develop Custom Action

Learn how to develop custom actions.

This section covers the core classes to develop action extensions.

Interface

To create a custom action, you will need to implement the Java interface: io.streamthoughts.jikkou.core.action.Action.

/**
 * Interface for executing a one-shot action on a specific type of resources.
 *
 * @param <T> The type of the resource.
 */
@Category(ExtensionCategory.ACTION)
public interface Action<T extends HasMetadata> extends HasMetadataAcceptable, Extension {

    /**
     * Executes the action.
     *
     * @param configuration The configuration
     * @return The ExecutionResultSet
     */
    @NotNull ExecutionResultSet<T> execute(@NotNull Configuration configuration);
}

Examples

The Action class below shows how to implement a custom action accepting options.

@Named(EchoAction.NAME)
@Title("Print the input.")
@Description("The EchoAction allows printing the text provided in input.")
@ExtensionSpec(
        options = {
                @ExtensionOptionSpec(
                        name = EchoAction.INPUT_CONFIG_NAME,
                        description = "The input text to print.",
                        type = String.class,
                        required = true
                )
        }
)
public final class EchoAction extends ContextualExtension implements Action<HasMetadata> {
    public static final String NAME = "EchoAction";
    public static final String INPUT_CONFIG_NAME = "input";
    @Override
    public @NotNull ExecutionResultSet<HasMetadata> execute(@NotNull Configuration configuration) {

        String input = extensionContext().<String>configProperty(INPUT_CONFIG_NAME).get(configuration);

        return ExecutionResultSet
                .newBuilder()
                .result(ExecutionResult
                        .newBuilder()
                        .status(ExecutionStatus.SUCCEEDED)
                        .data(new EchoOut(input))
                        .build())
                .build();
    }

    @Kind("EchoOutput")
    @ApiVersion("core.jikkou.io/v1")
    @Reflectable
    record EchoOut(@JsonProperty("out") String out) implements HasMetadata {

        @Override
        public ObjectMeta getMetadata() {
            return new ObjectMeta();
        }

        @Override
        public HasMetadata withMetadata(ObjectMeta objectMeta) {
            throw new UnsupportedOperationException();
        }
    }
}

8.1.4 - Develop Custom Transformations

Learn how to develop custom resource transformations.

This section covers the core classes to develop transformation extensions.

Interface

To create a custom transformation, you will need to implement the Java interface: io.streamthoughts.jikkou.core.transformation.Transformation.


/**
 * This interface is used to transform or filter resources.
 *
 * @param <T> The resource type supported by the transformation.
 */
public interface Transformation<T extends HasMetadata> extends Interceptor {

    /**
     * Executes the transformation on the specified {@link HasMetadata} object.
     *
     * @param resource  The {@link HasMetadata} to be transformed.
     * @param resources The {@link ResourceListObject} involved in the current operation.
     * @param context   The {@link ReconciliationContext}.
     * @return The transformed resource, or an empty {@link Optional} if the resource is filtered out.
     */
    @NotNull Optional<T> transform(@NotNull T resource,
                                   @NotNull HasItems resources,
                                   @NotNull ReconciliationContext context);
}

Examples

The transformation class below shows how to filter out resources having an annotation ignore: true.

import java.util.Optional;

@Named("ExcludeIgnoreResource")
@Title("ExcludeIgnoreResource allows filtering resources whose 'metadata.annotations.ignore' property is equal to 'true'")
@Description("The ExcludeIgnoreResource transformation is used to exclude from the"
        + " reconciliation process any resource whose 'metadata.annotations.ignore'"
        + " property is equal to 'true'. This transformation is automatically enabled."
)
@Enabled
@Priority(HasPriority.HIGHEST_PRECEDENCE)
public final class ExcludeIgnoreResourceTransformation implements Transformation<HasMetadata> {

    /** {@inheritDoc}**/
    @Override
    public @NotNull Optional<HasMetadata> transform(@NotNull HasMetadata resource,
                                                    @NotNull HasItems resources,
                                                    @NotNull ReconciliationContext context) {
        // Keep the resource only if the 'ignore' annotation is absent or not 'true'.
        return Optional.of(resource)
                .filter(r -> !HasMetadata.getMetadataAnnotation(r, "ignore")
                        .map(NamedValue::getValue)
                        .map(Value::asBoolean)
                        .orElse(false)
                );
    }
}
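
The intent described above (dropping any resource annotated with ignore: true by returning an empty Optional) can be tried without the Jikkou dependencies. The sketch below is self-contained: the Resource record and the IgnoreFilterSketch class are hypothetical stand-ins, not part of the Jikkou API, and only illustrate the filtering pattern.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Hypothetical stand-in for a Jikkou resource: a name plus its annotations. */
record Resource(String name, Map<String, Object> annotations) {}

final class IgnoreFilterSketch {

    /** Returns an empty Optional when the 'ignore' annotation evaluates to true. */
    static Optional<Resource> transform(Resource resource) {
        boolean ignored = Boolean.parseBoolean(
                String.valueOf(resource.annotations().getOrDefault("ignore", false)));
        return ignored ? Optional.empty() : Optional.of(resource);
    }

    /** Applies the transformation to each resource and keeps the names of the survivors. */
    static List<String> keptNames(List<Resource> resources) {
        return resources.stream()
                .map(IgnoreFilterSketch::transform)
                .flatMap(Optional::stream)
                .map(Resource::name)
                .toList();
    }

    public static void main(String[] args) {
        List<Resource> resources = List.of(
                new Resource("orders", Map.of()),
                new Resource("legacy", Map.of("ignore", true)),
                new Resource("payments", Map.of("ignore", "false")));
        // Prints [orders, payments]: 'legacy' is dropped.
        System.out.println(keptNames(resources));
    }
}
```

Returning Optional.empty() rather than null keeps the contract explicit: downstream steps only ever see resources that passed the transformation.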

9 - Frequently Asked Questions

This section gathers the frequently asked questions about Jikkou.

Is Jikkou Free to Use?

Yes, Jikkou is developed and distributed under the Apache License 2.0.

Can I Use Jikkou with Any Kafka Implementation?

Yes, Jikkou can be used with a wide range of Apache Kafka infrastructures, including:

Why would I use Jikkou over Terraform?

What is Terraform and how is it typically used?

Terraform (or OpenTofu) is widely recognized as the leading solution for infrastructure provisioning and management. It is commonly used by operations teams for managing cloud infrastructure through its HCL (HashiCorp Configuration Language) syntax.

What are the limitations of Terraform for Kafka Users ?

Many development teams find Terraform challenging to use because:

  • They need to learn HCL syntax, which is not commonly known among developers.
  • They often lack the necessary permissions to apply configuration files directly.
  • They often struggle with Terraform states.

How does Jikkou address these limitations?

Jikkou is designed to be a straightforward CLI tool for both developers and operations teams. It simplifies the process of managing infrastructure, especially for development teams who may not have expertise in HCL or the permissions required for Terraform.

What are the benefits of using Jikkou for Kafka management?

  • On-Premises and Multi-Cloud Support: Unlike many Terraform providers, which focus on cloud-based Kafka services (e.g., Confluent Cloud), Jikkou supports on-premises, multi-cloud, and hybrid infrastructures.

  • Versatility: Jikkou can manage Kafka topics across various environments, including local Kafka clusters in Docker, ephemeral clusters in Kubernetes for CI/CD, and production clusters in Aiven Cloud.

  • Auditing and Backup: Beyond provisioning, Jikkou can audit Kafka platforms for configuration issues and create backups of Kafka configurations (Topics, ACLs, Quotas, etc.).

There are, of course, many reasons to use Terraform rather than Jikkou and vice versa. As usual, the choice of tool really depends on your needs, the organization you’re in, the skills of the people involved and so on.

10 - Community

What does your user need to know to try your project?

10.1 - Developer Guide

How to set up your environment for developing on Jikkou.

Prerequisites

Building Jikkou

We use the Maven Wrapper to build the project. To build the distribution files, run:

$ ./mvnw clean package -Pdist -DskipTests

Alternatively, we also use Make to package and build the Docker image for Jikkou:

$ make

Running tests

For running all tests and checks:

$ ./mvnw clean verify

Code Format

This project uses the Maven plugin Spotless to format all Java classes and to apply some code quality checks.

Bugs & Security

This project uses the Maven plugins SpotBugs and FindSecBugs to run static analysis and look for bugs in the Java code.

Reported bugs can be analysed using SpotBugs GUI:

$ ./mvnw spotbugs:gui

10.2 - Contribution Guidelines

How to contribute to Jikkou

Jikkou is an open source project, and we love getting patches and contributions to make Jikkou and its docs even better.

Contributing to Jikkou

The Jikkou project itself lives in https://github.com/streamthoughts/jikkou

Code reviews

All submissions, including submissions by project members, require review. We use GitHub pull requests for this purpose. Consult GitHub Help for more information on using pull requests.

Creating issues

Alternatively, if there’s something you’d like to see in Jikkou (or if you’ve found something that isn’t working the way you’d expect), but you’re not sure how to fix it yourself, please create an issue.

11 - Releases

11.1 - Release v0.32.0

Jikkou 0.32.0: Moving Beyond Apache Kafka. Introducing new features: Extension Providers, Actions, etc.!

I’m thrilled to announce the release of Jikkou 0.32.0 which packs two major features: External Extension Providers and Actions. 🙂

Highlights: What’s new in Jikkou 0.32.0?

  • New External Extension Provider mechanism to extend Jikkou features.

  • New extension type Action to execute specific operations against resources.

  • New action for resetting consumer group offsets.

  • New action for restarting connector and tasks for Kafka Connect.

  • New option selector-match to exclude/include resources from being returned or reconciled by Jikkou.

  • New API to get resources by their name.

Extension Providers

Jikkou is a project that continues to reinvent and redefine itself with each new version. Initially developed exclusively to manage the configuration of Kafka topics, it can now be used to manage Schema Registries, Kafka Connect connectors, and more. But the funny thing is that Jikkou isn’t coupled with Kafka. It was designed around a concept of pluggable extensions that enable new capabilities and kinds of resources to be seamlessly added to the project. For this, Jikkou uses the Java Service Loader mechanism to automatically discover new extensions at runtime.

Unfortunately, until now there has been no official way of using this mechanism with Jikkou CLI or Jikkou API Server. For this reason, Jikkou 0.32.0 brings the capability to easily configure external extensions.

So how does it work? Well, let’s imagine you want to be able to load Text Files from the local filesystem using Jikkou.

First, you need to create a new Java project and add the Jikkou Core library to your project’s dependencies (io.streamthoughts:jikkou-core:0.32.0).

Then, you will need to create some POJO classes to represent your resource (e.g., V1File.class) and to implement the Collector interface:


@SupportedResource(type = V1File.class)
@ExtensionSpec(
        options = {
                @ExtensionOptionSpec(
                        name = "directory",
                        description = "The absolute path of the directory from which to collect files",
                        type = String.class,
                        required = true
                )
        }
)
@Description("FileCollector allows listing all files in a given directory.")
public final class FileCollector
        extends ContextualExtension
        implements Collector<V1File> {
    private static final Logger LOG = LoggerFactory.getLogger(FileCollector.class);

    @Override
    public ResourceListObject<V1File> listAll(@NotNull Configuration configuration,
                                              @NotNull Selector selector) {

        // Get the 'directory' option.
        String directory = extensionContext().<String>configProperty("directory").get(configuration);

        // Collect all files.
        List<V1File> objects = Stream.of(new File(directory).listFiles())
                .filter(file -> !file.isDirectory())
                .map(file -> {
                    try {
                        Path path = file.toPath();
                        String content = Files.readString(path);
                        V1File object = new V1File(ObjectMeta
                                .builder()
                                .withName(file.getName())
                                .withAnnotation("system.jikkou.io/fileSize", Files.size(path))
                                .withAnnotation("system.jikkou.io/fileLastModifier", Files.getLastModifiedTime(path))
                                .build(),
                                new V1FileSpec(content)
                        );
                        return Optional.of(object);
                    } catch (IOException e) {
                        LOG.error("Cannot read content from file: {}", file.getName(), e);
                        return Optional.<V1File>empty();
                    }
                })
                .flatMap(Optional::stream)
                .toList();
        ObjectMeta objectMeta = ObjectMeta
                .builder()
                .withAnnotation("system.jikkou.io/directory", directory)
                .build();
        return new DefaultResourceListObject<>("FileList", "system.jikkou.io/v1", objectMeta, objects);
    }
}

Next, you will need to implement the ExtensionProvider interface to register both your extension and your resource kind.

public final class FileExtensionProvider implements ExtensionProvider {

    /**
     * Registers the extensions for this provider.
     *
     * @param registry The ExtensionRegistry.
     */
    public void registerExtensions(@NotNull ExtensionRegistry registry) {
        registry.register(FileCollector.class, FileCollector::new);
    }

    /**
     * Registers the resources for this provider.
     *
     * @param registry The ResourceRegistry.
     */
    public void registerResources(@NotNull ResourceRegistry registry) {
        registry.register(V1File.class);
    }
}

Then, the fully qualified name of the class must be added to the resource file META-INF/services/io.streamthoughts.jikkou.spi.ExtensionProvider.

Finally, all you need to do is to package your project as a tarball or ZIP archive. The archive must contain a single top-level directory containing the extension JAR files, as well as any resource files or third-party libraries required by your extensions.

To install your Extension Provider, all you need to do is unpack the archive into a desired location (e.g., /usr/share/jikkou-extensions) and configure the Jikkou API Server or the Jikkou CLI (when running the Java Binary Distribution, i.e., not the native version) with the jikkou.extension.paths property (e.g., jikkou.extension.paths=/usr/share/jikkou-extensions). For people who are familiar with how Kafka Connect works, it’s more or less the same mechanism.

(The full source code of this example is available on GitHub).

And that’s it! 🙂

Extension Providers open up the possibility of infinitely expanding Jikkou to manage your own resources, and use it with systems other than Kafka.
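
The discovery mechanism described in this section (JAR files unpacked under an extension path, with providers declared through the Java Service Loader) can be sketched with the JDK alone. This is an illustration under stated assumptions, not Jikkou's actual implementation: PluginProvider is a hypothetical interface standing in for ExtensionProvider, and PluginDiscoverySketch is a made-up class name.

```java
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;
import java.util.stream.Stream;

/** Hypothetical service interface standing in for an extension provider. */
interface PluginProvider {
    String name();
}

final class PluginDiscoverySketch {

    /** Collects the URLs of all JAR files found under the given directory. */
    static List<URL> jarUrls(Path directory) throws IOException {
        List<URL> urls = new ArrayList<>();
        try (Stream<Path> paths = Files.walk(directory)) {
            for (Path p : (Iterable<Path>) paths::iterator) {
                if (Files.isRegularFile(p) && p.getFileName().toString().endsWith(".jar")) {
                    urls.add(p.toUri().toURL());
                }
            }
        }
        return urls;
    }

    /** Loads every provider that the JARs declare under META-INF/services. */
    static List<PluginProvider> discover(Path directory) throws IOException {
        URL[] urls = jarUrls(directory).toArray(new URL[0]);
        // The class loader is deliberately left open so loaded providers remain usable.
        URLClassLoader loader = new URLClassLoader(urls, PluginProvider.class.getClassLoader());
        List<PluginProvider> providers = new ArrayList<>();
        ServiceLoader.load(PluginProvider.class, loader).forEach(providers::add);
        return providers;
    }
}
```

Giving each extension directory its own class loader is what lets third-party libraries bundled with an extension stay isolated from the host application's classpath.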

Actions

Jikkou uses a declarative approach to manage the asset state of your data infrastructure using resource descriptors written in YAML. But sometimes, ops and development teams may need to perform specific operations on their resources that cannot be included in their descriptor files. For example, you may need to reset offsets for one or multiple Kafka Consumer Groups, restart failed connectors and tasks for Kafka Connect, etc. So instead of having to switch from one tool to another, why not just use Jikkou for this?

Well, to solve that need, Jikkou 0.32.0 introduces a new type of extension called “Actions” that allows users to perform specific operations on resources.

Combined with the External Extension Provider mechanism, you can now implement the Action interface to add custom operations to Jikkou.


@Category(ExtensionCategory.ACTION)
public interface Action<T extends HasMetadata> extends HasMetadataAcceptable, Extension {

    /**
     * Executes the action.
     *
     * @param configuration The configuration
     * @return The ExecutionResultSet
     */
    @NotNull
    ExecutionResultSet<T> execute(@NotNull Configuration configuration);
}

Actions are fully integrated with Jikkou API Server through the new endpoint: /api/v1/actions/{name}/execute{?[options]}

Kafka Consumer Groups

Altering Consumer Group Offsets

Jikkou 0.32.0 introduces the new KafkaConsumerGroupsResetOffsets action, which allows resetting the offsets of consumer groups.

Here is an example showing how to reset the group my-group to the earliest offsets for topic test.

$ jikkou action kafkaconsumergroupresetoffsets execute \
--group my-group \
--topic test \
--to-earliest

(output)

kind: "ApiActionResultSet"
apiVersion: "core.jikkou.io/v1"
metadata:
  labels: { }
  annotations:
    configs.jikkou.io/to-earliest: "true"
    configs.jikkou.io/group: "my-group"
    configs.jikkou.io/dry-run: "false"
    configs.jikkou.io/topic:
      - "test"
results:
  - status: "SUCCEEDED"
    errors: [ ]
    data:
      apiVersion: "kafka.jikkou.io/v1beta1"
      kind: "KafkaConsumerGroup"
      metadata:
        name: "my-group"
        labels:
          kafka.jikkou.io/is-simple-consumer: false
        annotations: { }
      status:
        state: "EMPTY"
        members: [ ]
        offsets:
          - topic: "test"
            partition: 1
            offset: 0
        coordinator:
          id: "101"
          host: "localhost"
          port: 9092

This action is pretty similar to the kafka-consumer-groups script that ships with Apache Kafka. You can use it to reset a consumer group to the earliest or latest offsets, to a specific datetime, or to a specific offset.

In addition, it can be executed in dry-run mode.

Deleting Consumer Groups

You can now add the core annotation jikkou.io/delete to a KafkaConsumerGroup resource to mark it for deletion:

---
apiVersion: "kafka.jikkou.io/v1beta1"
kind: "KafkaConsumerGroup"
metadata:
  name: "my-group"
  labels:
    kafka.jikkou.io/is-simple-consumer: false
  annotations:
    jikkou.io/delete: true

$ jikkou delete --files my-consumer-group.yaml -o wide

TASK [DELETE] Delete consumer group 'my-group' - CHANGED ************************************************
{
  "status" : "CHANGED",
  "changed" : true,
  "failed" : false,
  "end" : 1701162781494,
  "data" : {
    "apiVersion" : "core.jikkou.io/v1beta2",
    "kind" : "GenericResourceChange",
    "metadata" : {
      "name" : "my-group",
      "labels" : {
        "kafka.jikkou.io/is-simple-consumer" : false
      },
      "annotations" : {
        "jikkou.io/delete" : true,
        "jikkou.io/managed-by-location" : "./my-consumer-group.yaml"
      }
    },
    "change" : {
      "before" : {
        "apiVersion" : "kafka.jikkou.io/v1beta1",
        "kind" : "KafkaConsumerGroup",
        "metadata" : {
          "name" : "my-group",
          "labels" : {
            "kafka.jikkou.io/is-simple-consumer" : false
          },
          "annotations" : { }
        }
      },
      "operation" : "DELETE"
    }
  },
  "description" : "Delete consumer group 'my-group'"
}
EXECUTION in 64ms 
ok : 0, created : 0, altered : 0, deleted : 1 failed : 0

Kafka Connect

Restarting Connector and Tasks

This release also ships the new KafkaConnectRestartConnectors action, which allows a user to restart all or just the failed Connector and Task instances for one or multiple named connectors.

Here are a few examples from the documentation:

  • Restarting all connectors for all Kafka Connect clusters.
  $ jikkou action kafkaconnectrestartconnectors execute
---
kind: "ApiActionResultSet"
apiVersion: "core.jikkou.io/v1"
metadata:
  labels: {}
  annotations: {}
results:
- status: "SUCCEEDED"
  data:
    apiVersion: "kafka.jikkou.io/v1beta1"
    kind: "KafkaConnector"
    metadata:
      name: "local-file-sink"
      labels:
        kafka.jikkou.io/connect-cluster: "my-connect-cluster"
      annotations: {}
    spec:
      connectorClass: "FileStreamSink"
      tasksMax: 1
      config:
        file: "/tmp/test.sink.txt"
        topics: "connect-test"
      state: "RUNNING"
    status:
      connectorStatus:
        name: "local-file-sink"
        connector:
          state: "RUNNING"
          workerId: "connect:8083"
        tasks:
          - id: 0
            state: "RUNNING"
            workerId: "connect:8083"
  • Restarting a specific connector and its tasks on one Kafka Connect cluster.
  $ jikkou action kafkaconnectrestartconnectors execute \
  --cluster-name my-connect-cluster \
  --connector-name local-file-sink \
  --include-tasks

New Selector Matching Strategy

Jikkou CLI allows you to provide one or multiple selector expressions in order to include or exclude resources from being returned or reconciled by Jikkou. In previous versions, selectors were cumulative, so resources had to match all selectors to be returned. Now, you can specify a selector matching strategy to determine how expressions must be combined using the option --selector-match=[ANY|ALL|NONE].

  • ALL: A resource is selected if it matches all selectors.

  • ANY: A resource is selected if it matches one of the selectors.

  • NONE: A resource is selected if it matches none of the selectors.

For example, the command below only returns topics whose name matches the regex ^__connect-* or is equal to _schemas.

$ jikkou get kafkatopics \
--selector 'metadata.name MATCHES (^__connect-*)' \
--selector 'metadata.name IN (_schemas)' \
--selector-match ANY
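
To make the three strategies concrete, here is a minimal sketch of how a list of selectors could be combined. It is not Jikkou's implementation: SelectorMatchSketch, MatchStrategy, and the two predicates are hypothetical, and the regex selector is assumed to match when the pattern is found anywhere in the name.

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.regex.Pattern;

final class SelectorMatchSketch {

    /** Mirrors the values accepted by the selector matching option. */
    enum MatchStrategy { ALL, ANY, NONE }

    /** Combines the selectors into a single predicate according to the strategy. */
    static <T> Predicate<T> combine(List<Predicate<T>> selectors, MatchStrategy strategy) {
        return item -> switch (strategy) {
            case ALL -> selectors.stream().allMatch(s -> s.test(item));
            case ANY -> selectors.stream().anyMatch(s -> s.test(item));
            case NONE -> selectors.stream().noneMatch(s -> s.test(item));
        };
    }

    /** Applies two example selectors (a regex and a name list) to topic names. */
    static List<String> select(List<String> names, MatchStrategy strategy) {
        Predicate<String> matchesRegex = Pattern.compile("^__connect-*").asPredicate();
        Predicate<String> inList = List.of("_schemas")::contains;
        return names.stream()
                .filter(combine(List.of(matchesRegex, inList), strategy))
                .toList();
    }

    public static void main(String[] args) {
        List<String> names = List.of("__connect-offsets", "_schemas", "orders");
        System.out.println(select(names, MatchStrategy.ANY));  // [__connect-offsets, _schemas]
        System.out.println(select(names, MatchStrategy.ALL));  // []
        System.out.println(select(names, MatchStrategy.NONE)); // [orders]
    }
}
```

With these two selectors, ALL returns nothing because no name can both match the regex and equal _schemas, which is why the previous cumulative behaviour was limiting.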

New Get Resource by Name

In some cases, it may be necessary to retrieve only a specific resource for a specific name. In previous versions, the solution was to use selectors. Unfortunately, this approach isn’t very efficient, as it involves retrieving all the resources and then filtering them. To start solving that issue, Jikkou v0.32.0 adds a new API to retrieve a resource by its name.

Example (using Jikkou CLI):

$ jikkou get kafkatopics --name _schemas

Example (using Jikkou API Server):

$ curl -sX GET \
http://localhost:28082/apis/kafka.jikkou.io/v1/kafkatopics/_schemas \
-H 'Accept:application/json'
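
The same request can be issued from Java with the JDK's built-in HTTP client. This is a sketch that mirrors the curl command above: the base URL and resource path are taken from that example, while the class name GetTopicByNameSketch is hypothetical and the topic name is assumed to need no URL encoding.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

final class GetTopicByNameSketch {

    /** Builds the GET request for a single KafkaTopic resource identified by name. */
    static HttpRequest buildRequest(String baseUrl, String topicName) {
        return HttpRequest.newBuilder()
                .uri(URI.create(baseUrl + "/apis/kafka.jikkou.io/v1/kafkatopics/" + topicName))
                .header("Accept", "application/json")
                .GET()
                .build();
    }

    public static void main(String[] args) throws Exception {
        // Requires a Jikkou API Server listening on localhost:28082.
        HttpRequest request = buildRequest("http://localhost:28082", "_schemas");
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode());
        System.out.println(response.body());
    }
}
```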

Note: Currently, not all resources have been updated to use that new API, so it’s possible that selectors are used as a default implementation by the internal Jikkou API.

11.2 - Release v0.33.0

Introducing Jikkou 0.33.0

We’re excited to unveil the latest release of Jikkou 0.33.0. 🎉

To install the new version, please visit the installation guide. For detailed release notes, check out the GitHub page.

What’s New in Jikkou 0.33.0?

  • Enhanced resource change format.
  • Added support for the patch command.
  • Introduced the new --status option for KafkaTopic resources.
  • Exported offset-lag to the status of KafkaConsumerGroup resources.

Below is a summary of these new features with examples.

Diff/Patch Commands

In previous versions, Jikkou provided the diff command to display changes required to reconcile input resources. However, this command lacked certain capabilities to be truly useful. This new version introduces a standardized change format for all resource types, along with two new options for filtering changes:

  • --filter-resource-op=: Filters out all resources except those corresponding to the given operations.
  • --filter-change-op=: Filters out all state changes except those corresponding to the given operations.

The new output format you can expect from the diff command is as follows:

---
apiVersion: [ group/version of the change ]
kind: [ kind of the change ]
metadata: [ resource metadata ]
spec:
  # Array of state changes
  changes:
    - name: [ name of the changed state ]
      op: [ change operation ]
      before: [ value of the state before the operation ]
      after: [ value of the state after the operation ]
  data: [ static data attached to the resource ]
  op: [ resource change operation ]
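
As an illustration of this structure, the sketch below models a resource change and its state changes as Java records and shows the two kinds of filtering: by the resource-level operation and by the operation of each state change. The type names are hypothetical and are not Jikkou classes; CREATE and DELETE appear in the examples of this documentation, while UPDATE and NONE are assumed operation names.

```java
import java.util.List;
import java.util.Set;

final class ChangeFilterSketch {

    /** Operation types; UPDATE and NONE are assumptions made for this sketch. */
    enum Op { NONE, CREATE, UPDATE, DELETE }

    /** One entry of 'spec.changes'. */
    record StateChange(String name, Op op, Object before, Object after) {}

    /** A resource change: its name, 'spec.op', and the list of state changes. */
    record ResourceChange(String name, Op op, List<StateChange> changes) {}

    /** Keeps only the resource changes whose resource-level operation is in 'ops'. */
    static List<ResourceChange> filterResources(List<ResourceChange> all, Set<Op> ops) {
        return all.stream().filter(rc -> ops.contains(rc.op())).toList();
    }

    /** Returns a copy that keeps only the state changes whose operation is in 'ops'. */
    static ResourceChange filterStateChanges(ResourceChange rc, Set<Op> ops) {
        List<StateChange> kept = rc.changes().stream()
                .filter(c -> ops.contains(c.op()))
                .toList();
        return new ResourceChange(rc.name(), rc.op(), kept);
    }
}
```

Keeping the resource-level operation separate from the per-state operations is what allows a resource marked for update to still carry unchanged states alongside the ones that actually differ.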

The primary motivation behind this new format is the introduction of a new patch command. Prior to Jikkou 0.33.0, when using the apply command after a dry-run or a diff command, Jikkou couldn’t guarantee that the applied changes matched those returned from the previous command. With Jikkou 0.33.0, you can now directly pass the result of the diff command to the new patch command to efficiently apply the desired state changes.

Here’s a workflow to create your resources:

Step 1) Create a resource descriptor file

cat << EOF > my-topic.yaml
---
apiVersion: "kafka.jikkou.io/v1beta2"
kind: KafkaTopic
metadata:
  name: 'my-topic'
  labels:
    environment: example
spec:
  partitions: 3
  replicas: 1
  configs:
    min.insync.replicas: 1
    cleanup.policy: 'delete'
EOF

Step 2) Run diff

jikkou diff -f ./my-topic.yaml > my-topic-diff.yaml

(output)
---
apiVersion: "kafka.jikkou.io/v1beta2"
kind: "KafkaTopicChange"
metadata:
  name: "my-topic"
  labels:
    environment: "example"
  annotations:
    jikkou.io/managed-by-location: "my-topic.yaml"
spec:
  changes:
    - name: "partitions"
      op: "CREATE"
      after: 3
    - name: "replicas"
      op: "CREATE"
      after: 1
    - name: "config.cleanup.policy"
      op: "CREATE"
      after: "delete"
    - name: "config.min.insync.replicas"
      op: "CREATE"
      after: 1
  op: "CREATE"

Step 3) Run patch

jikkou patch -f ./my-topic-diff.yaml --mode FULL --output compact

(output)

TASK [CREATE] Create topic 'my-topic' (partitions=3, replicas=1, configs=[cleanup.policy=delete, min.insync.replicas=1]) - CHANGED
EXECUTION in 3s 797ms
ok: 0, created: 1, altered: 0, deleted: 0 failed: 0

Attempting to apply the changes a second time may result in an error from the remote system:

{
  "status": "FAILED",
  "description": "Create topic 'my-topic' (partitions=3, replicas=1,configs=[cleanup.policy=delete,min.insync.replicas=1])",
  "errors": [ {
    "message": "TopicExistsException: Topic 'my-topic' already exists."
  } ],
  ...
}

Resource Provider for Apache Kafka

Jikkou 0.33.0 also ships with some minor improvements for the Apache Kafka provider.

KafkaTopic Status

You can now describe the status of a topic’s partitions by using the new --status option when getting a KafkaTopic resource.

jikkou get kt --status --selector "metadata.name IN (my-topic)"

---
apiVersion: "kafka.jikkou.io/v1beta2"
kind: "KafkaTopic"
metadata:
  name: "my-topic"
  labels:
    kafka.jikkou.io/topic-id: "UbZI2N2YQTqfNcbKKHps5A"
  annotations:
    kafka.jikkou.io/cluster-id: "xtzWWN4bTjitpL3kfd9s5g"
spec:
  partitions: 1
  replicas: 1
  configs:
    cleanup.policy: "delete"
  configMapRefs: [ ]
status:
  partitions:
    - id: 0
      leader: 101
      isr:
        - 101
      replicas:
        - 101

KafkaConsumerGroup OffsetLags

With Jikkou 0.33.0, you can export the offset-lag of a KafkaConsumerGroup resource using the --offsets option.

jikkou get kafkaconsumergroups --offsets

---
apiVersion: "kafka.jikkou.io/v1beta1"
kind: "KafkaConsumerGroup"
metadata:
  name: "my-group"
  labels:
    kafka.jikkou.io/is-simple-consumer: false
status:
  state: "EMPTY"
  members: [ ]
  offsets:
    - topic: "my-topic"
      partition: 0
      offset: 16
      offset-lag: 0
  coordinator:
    id: "101"
    host: "localhost"
    port: 9092
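
For readers unfamiliar with the term, consumer lag for a partition is conventionally the distance between the partition's log end offset and the group's committed offset. The sketch below shows that arithmetic only; it is not how Jikkou computes the value, and OffsetLagSketch is a hypothetical name.

```java
final class OffsetLagSketch {

    /**
     * Computes the lag of a consumer group on one partition.
     * The result is clamped at zero in case the committed offset is ahead
     * of the log end offset observed at read time.
     */
    static long offsetLag(long logEndOffset, long committedOffset) {
        return Math.max(0L, logEndOffset - committedOffset);
    }

    public static void main(String[] args) {
        // A group that has committed offset 16 on a partition ending at 16 has no lag.
        System.out.println(offsetLag(16L, 16L)); // 0
        System.out.println(offsetLag(20L, 16L)); // 4
    }
}
```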

Finally, all those new features are also completely available through the Jikkou REST Server.

Wrapping Up

We hope you enjoy these new features. If you encounter any issues with Jikkou v0.33.0, please feel free to open a GitHub issue on our project page. Don’t forget to give us a ⭐️ on GitHub to support the team, and join us on Slack.

11.3 - Release v0.34.0

Introducing Jikkou 0.34.0

We’re excited to unveil the latest release of Jikkou 0.34.0. 🎉

To install the new version, please visit the installation guide. For detailed release notes, check out the GitHub page.

What’s New in Jikkou 0.34.0?

  • Enhanced Aiven provider with support for Kafka topics.
  • Added SSL support for Kafka Connect and Schema Registry
  • Introduced dynamic connection for Kafka Connect clusters

Below is a summary of these new features with examples.

Topic Aiven for Apache Kafka

Jikkou 0.34.0 adds a new KafkaTopic kind that can be used to manage Kafka topics directly through the Aiven API.

You can list Kafka topics using the new command below:

jikkou get avn-kafkatopics

In addition, topics can be described, created and updated using the same resource model as the Apache Kafka provider.

# file:./aiven-kafka-topics.yaml
---
apiVersion: "kafka.aiven.io/v1beta2"
kind: "KafkaTopic"
metadata:
  name: "test"
  labels:
    tag.aiven.io/my-tag: "test-tag"
spec:
  partitions: 1
  replicas: 3
  configs:
    cleanup.policy: "delete"
    compression.type: "producer"

The main advantages of using this new resource kind are the use of the Aiven Token API to authenticate to the Aiven API and the ability to manage tags for Kafka topics.

SSL support for Kafka Connect and Schema Registry

Jikkou 0.34.0 also brings SSL support for the Kafka Connect and Schema Registry providers. Therefore, it’s now possible to configure the providers to authenticate using an SSL certificate.

Example for the Schema Registry:

jikkou {
  schemaRegistry {
    url = "https://localhost:8081"
    authMethod = "SSL"
    sslKeyStoreLocation = "/certs/registry.keystore.jks"
    sslKeyStoreType = "JKS"
    sslKeyStorePassword = "password"
    sslKeyPassword = "password"
    sslTrustStoreLocation = "/certs/registry.truststore.jks"
    sslTrustStoreType = "JKS"
    sslTrustStorePassword = "password"
    sslIgnoreHostnameVerification = true
  }
}

Dynamic connection for Kafka Connect clusters

Before Jikkou 0.34.0, to deploy a Kafka Connect connector, it was mandatory to configure a connection to a target cluster:

jikkou {
  extensions.provider.kafkaconnect.enabled = true
  kafkaConnect {
    clusters = [
      {
        name = "my-connect-cluster"
        url = "http://localhost:8083"
      }
    ]
  }
}

This connection could then be referenced in a connector resource definition via the label kafka.jikkou.io/connect-cluster.

apiVersion: "kafka.jikkou.io/v1beta1"
kind: "KafkaConnector"
metadata:
  name: "my-connector"
  labels:
    kafka.jikkou.io/connect-cluster: "my-connect-cluster"

This approach is suitable for most use cases, but can be challenging if you need to manage a large and dynamic number of Kafka Connect clusters.

To meet this need, it is now possible to provide connection information for the cluster to connect to directly, through the new metadata annotation: jikkou.io/config-override.

Here is a simple example showing the use of the new annotation:

apiVersion: "kafka.jikkou.io/v1beta1"
kind: "KafkaConnector"
metadata:
  name: "my-connector"
  labels:
    kafka.jikkou.io/connect-cluster: "my-connect-cluster"
  annotations:
    jikkou.io/config-override: |
      { "url": "http://localhost:8083" }      

This new annotation can be used with Jikkou’s Jinja templating mechanism to define dynamic configuration.

Wrapping Up

We hope you enjoy these new features. If you encounter any issues with Jikkou v0.34.0, please feel free to open a GitHub issue on our project page. Don’t forget to give us a ⭐️ on GitHub to support the team, and join us on Slack.