
Getting started

After your service has been successfully provisioned, you will find it in your overview.

Overview Page

The overview page of your managed Kafka cluster contains everything you need to connect to it. Copy each certificate into its corresponding file; the certificates are already in the correct format for certificate files (PEM). Your bootstrap server URL is needed to set the bootstrap.servers configuration property for connecting to Kafka.
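As a quick orientation, a client configuration assembled from the overview page looks roughly like the following sketch; the server address and file paths are placeholders, not values from your cluster:

from confluent_kafka import Producer

# Minimal SSL configuration; replace the placeholders with the bootstrap
# server URL and the certificate files copied from the overview page.
conf = {
    'bootstrap.servers': '<your_bootstrap_server>',
    'security.protocol': 'ssl',
    'ssl.ca.location': '/path/to/ca.pem',                # CA certificate
    'ssl.certificate.location': '/path/to/client.pem',   # client certificate
    'ssl.key.location': '/path/to/key.pem',              # client private key
}

producer = Producer(conf)  # the same settings also work for a Consumer (plus group.id)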

Create your first consumer and producer

The examples below require the following Python packages:

confluent_kafka[avro]==2.1.1
faker==18.11.1
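You can install them, for example, with pip:

pip install 'confluent_kafka[avro]==2.1.1' 'faker==18.11.1'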

You can use the following Python code to produce some JSON messages to your newly created Kafka cluster.

from confluent_kafka import Producer
import json
import argparse
import uuid
from faker import Faker
import random

# Create a faker instance
fake = Faker()


def main(args):
    conf = {
        'bootstrap.servers': args.bootstrap_servers,
        'security.protocol': 'ssl',
        'ssl.ca.location': args.ca,
        'ssl.certificate.location': args.cert,
        'ssl.key.location': args.key
    }

    # Create Producer instance
    p = Producer(**conf)

    def delivery_report(err, msg):
        """ Called once for each message produced to indicate delivery result.
            Triggered by poll() or flush(). """
        if err is not None:
            print('Message delivery failed: {}'.format(err))
        else:
            print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))

    # Produce random data with UUID key
    for _ in range(10):  # adjust range as needed
        key = str(uuid.uuid4())
        data = {
            'number': random.randint(1, 100),
            'name': fake.name(),
            'location': fake.address().replace('\n', ', ')
        }
        p.produce(args.topic, key=key, value=json.dumps(data), callback=delivery_report)

    # Wait for any outstanding messages to be delivered and delivery reports to be received.
    p.flush()


if __name__ == '__main__':
    parser = argparse.ArgumentParser(description="Kafka JSON Producer Script")
    parser.add_argument('--bootstrap-servers', required=True, help='Bootstrap server address')
    parser.add_argument('--topic', required=True, help='Topic to produce to')
    parser.add_argument('--cert', required=True, help='Path to the client certificate file')
    parser.add_argument('--key', required=True, help='Path to the client private key file')
    parser.add_argument('--ca', required=True, help='Path to the CA certificate file')
    args = parser.parse_args()

    main(args)

You can execute this Python code using:

python producer_json.py \
--bootstrap-servers <your_bootstrap_server> \
--topic <topic_name> \
--cert </path/to/client.pem> \
--key </path/to/key.pem> \
--ca </path/to/ca.pem>

Similarly, you can read from your freshly populated topic using the following consumer.py code:

from confluent_kafka import Consumer, KafkaException
import argparse
import json
import sys

# Define Kafka configuration


def print_assignment(consumer, partitions):
    print('Assignment:', partitions)


def main(args):
    conf = {
        'bootstrap.servers': args.bootstrap_servers,
        'group.id': args.group_id,
        'security.protocol': 'ssl',
        'ssl.ca.location': args.ca,
        'ssl.certificate.location': args.cert,
        'ssl.key.location': args.key,
        'auto.offset.reset': args.offset
    }

    # Create Consumer instance
    c = Consumer(conf)
    try:
        c.subscribe([args.topic], on_assign=print_assignment)

        while True:
            msg = c.poll(1.0)  # wait up to 1 second for a message

            if msg is None:
                continue
            if msg.error():
                raise KafkaException(msg.error())
            else:
                # Proper message
                sys.stderr.write('%% %s [%d] at offset %d with key %s:\n' %
                                 (msg.topic(), msg.partition(), msg.offset(),
                                  str(msg.key())))
                print('Received message: {}'.format(msg.value().decode('utf-8')))

    except KeyboardInterrupt:
        sys.stderr.write('%% Aborted by user\n')

    finally:
        # Close down consumer to commit final offsets.
        c.close()


if __name__ == '__main__':
    parser = argparse.ArgumentParser(description="Kafka Consumer Script")
    parser.add_argument('--bootstrap-servers', required=True, help='Bootstrap server address')
    parser.add_argument('--topic', required=True, help='Topic to consume from')
    parser.add_argument('--group-id', required=True, help='Consumer group id')
    parser.add_argument('--cert', required=True, help='Path to the client certificate file')
    parser.add_argument('--key', required=True, help='Path to the client private key file')
    parser.add_argument('--ca', required=True, help='Path to the CA certificate file')
    parser.add_argument('--offset', required=False, default='earliest',
                        help='Offset to start consuming from. Defaults to "earliest".')
    args = parser.parse_args()

    main(args)

The code allows you to set a group ID and an offset for additional configuration. An example execution command looks like this:

python consumer.py \
--bootstrap-servers <your_bootstrap_server> \
--topic <topic_name> \
--group-id <group_id> \
--cert </path/to/client.pem> \
--key </path/to/key.pem> \
--ca </path/to/ca.pem> \
--offset <offset>

For the possible values of the auto.offset.reset property, refer to the Kafka documentation.
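For example, passing --offset latest makes a consumer group with no committed offsets start at the end of the topic instead of at the beginning; the other arguments stay the same as above:

python consumer.py \
--bootstrap-servers <your_bootstrap_server> \
--topic <topic_name> \
--group-id <group_id> \
--cert </path/to/client.pem> \
--key </path/to/key.pem> \
--ca </path/to/ca.pem> \
--offset latest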

Avro serialized/deserialized Producer/Consumer

When using Avro, you need to define a key and a value schema for your data. For the sake of simplicity, we will only provide a value schema (value.avsc) and use a UUID as the key:

{
  "type": "record",
  "name": "Data",
  "fields": [
    {
      "name": "number",
      "type": "int"
    },
    {
      "name": "name",
      "type": "string"
    },
    {
      "name": "location",
      "type": "string"
    }
  ]
}
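To make the schema concrete, a single message value conforming to it would look like the following JSON; the field values are made up purely for illustration:

{
  "number": 42,
  "name": "Jane Doe",
  "location": "123 Example Street, Springfield"
}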

Here is a Python producer_avro.py example implementation that produces messages to a Kafka topic:

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import argparse
import os
from uuid import uuid4
from random import randint, choice
from confluent_kafka import Producer
from confluent_kafka.serialization import StringSerializer, SerializationContext, MessageField
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
from faker import Faker
import time

faker = Faker()


class Data(object):
    """
    Data record

    Args:
        number (int): Random integer
        name (str): Random name
        location (str): Random location
    """

    def __init__(self, number, name, location):
        self.number = number
        self.name = name
        self.location = location


def data_to_dict(data, ctx):
    """
    Returns a dict representation of a Data instance for serialization.

    Args:
        data (Data): Data instance.
        ctx: SerializationContext

    Returns:
        dict: Dict populated with data attributes to be serialized.
    """
    return dict(number=data.number, name=data.name, location=data.location)


def delivery_report(err, msg):
    """
    Reports the failure or success of a message delivery.

    Args:
        err (KafkaError): The error that occurred, or None on success.
        msg (Message): The message that was produced or failed.
    """
    if err is not None:
        print("Delivery failed for Data record {}: {}".format(msg.key(), err))
        return
    print('Data record {} successfully produced to {} [{}] at offset {}'.format(
        msg.key(), msg.topic(), msg.partition(), msg.offset()))


def main(args):
    topic = args.topic

    schema = "value.avsc"

    path = os.path.realpath(os.path.dirname(__file__))
    with open(f"{path}/{schema}") as f:
        schema_str = f.read()

    schema_registry_conf = {
        'url': args.schema_registry,
        'ssl.certificate.location': args.sr_client_cert,
        'ssl.key.location': args.sr_client_key,
    }
    schema_registry_client = SchemaRegistryClient(schema_registry_conf)

    avro_serializer = AvroSerializer(schema_registry_client,
                                     schema_str,
                                     data_to_dict)

    string_serializer = StringSerializer('utf_8')

    producer_conf = {
        'bootstrap.servers': args.bootstrap_servers,
        'security.protocol': 'ssl',
        'ssl.ca.location': args.ca_cert,
        'ssl.certificate.location': args.client_cert,
        'ssl.key.location': args.client_key,
    }

    producer = Producer(producer_conf)

    print("Producing data records to topic {}. ^C to exit.".format(topic))
    while True:
        producer.poll(0.0)
        try:
            data = Data(number=randint(0, 100), name=faker.name(), location=faker.address().replace('\n', ', '))
            producer.produce(topic=topic,
                             key=string_serializer(str(uuid4())),
                             value=avro_serializer(data, SerializationContext(topic, MessageField.VALUE)),
                             on_delivery=delivery_report)
            time.sleep(args.sleep)
        except KeyboardInterrupt:
            break

    print("\nFlushing records...")
    producer.flush()


if __name__ == '__main__':
    parser = argparse.ArgumentParser(description="AvroSerializer example")
    parser.add_argument('-b', dest="bootstrap_servers", required=True,
                        help="Bootstrap broker(s) (host[:port])")
    parser.add_argument('--ca-cert', required=True,
                        help="Path to the Kafka CA cert (PEM format)")
    parser.add_argument('--client-cert', required=True,
                        help="Path to the Kafka client cert (PEM format)")
    parser.add_argument('--client-key', required=True,
                        help="Path to the Kafka client key (PEM format)")
    parser.add_argument('-s', dest="schema_registry", required=True,
                        help="Schema Registry (http(s)://host[:port])")
    parser.add_argument('--sr-client-cert', required=True,
                        help="Path to the Schema Registry client cert (PEM format)")
    parser.add_argument('--sr-client-key', required=True,
                        help="Path to the Schema Registry client key (PEM format)")
    parser.add_argument('-t', dest="topic", default="example_serde_avro",
                        help="Topic name")
    parser.add_argument('--sleep', type=float, default=0.5,
                        help="Waiting time before producing the next message")

    main(parser.parse_args())

Use your producer with the following command:

python producer_avro.py \
-b <bootstrap_servers> \
-t <topic_name> \
--client-cert </path/to/client.cert> \
--client-key </path/to/client.key> \
--ca-cert </path/to/ca.cert> \
-s https://<schema_registry_url>/apis/ccompat/v6 \
--sr-client-cert </path/to/sr_client.cert> \
--sr-client-key </path/to/sr_client.key>

Similarly, you can use the following consumer_avro.py to read your freshly created data from the Avro-serialized topic using the schema registry:

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import argparse
import os

from confluent_kafka import Consumer
from confluent_kafka.serialization import SerializationContext, MessageField
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer


class Data(object):
    """
    Data record

    Args:
        number (int): A number
        name (str): A name
        location (str): A location
    """

    def __init__(self, number=None, name=None, location=None):
        self.number = number
        self.name = name
        self.location = location


def dict_to_data(obj, ctx):
    """
    Converts an object literal (dict) to a Data instance.

    Args:
        obj (dict): Object literal (dict)
        ctx: SerializationContext
    """
    if obj is None:
        return None

    return Data(number=obj['number'], name=obj['name'], location=obj['location'])


def main(args):
    topic = args.topic
    schema = "value.avsc"

    path = os.path.realpath(os.path.dirname(__file__))
    with open(f"{path}/{schema}") as f:
        schema_str = f.read()

    sr_conf = {
        'url': args.schema_registry,
        'ssl.certificate.location': args.sr_client_cert,
        'ssl.key.location': args.sr_client_key,
    }
    schema_registry_client = SchemaRegistryClient(sr_conf)

    avro_deserializer = AvroDeserializer(schema_registry_client,
                                         schema_str,
                                         dict_to_data)

    consumer_conf = {'bootstrap.servers': args.bootstrap_servers,
                     'security.protocol': 'ssl',
                     # CA certificate file for verifying the broker's certificate.
                     'ssl.ca.location': args.ca_cert,
                     # Client's certificate
                     'ssl.certificate.location': args.client_cert,
                     # Client's key
                     'ssl.key.location': args.client_key,
                     'group.id': args.group,
                     'auto.offset.reset': "earliest"}

    consumer = Consumer(consumer_conf)
    consumer.subscribe([topic])

    while True:
        try:
            # SIGINT can't be handled when polling, limit timeout to 1 second.
            msg = consumer.poll(1.0)
            if msg is None:
                continue

            data = avro_deserializer(msg.value(), SerializationContext(msg.topic(), MessageField.VALUE))
            if data is not None:
                print("Data record {}: \n "
                      "number: {}"
                      "\tname: {}"
                      "\tlocation: {}"
                      .format(msg.key(), data.number, data.name, data.location))
        except KeyboardInterrupt:
            break

    consumer.close()


if __name__ == '__main__':
    parser = argparse.ArgumentParser(description="AvroDeserializer example")
    parser.add_argument('-b', dest="bootstrap_servers", required=True,
                        help="Bootstrap broker(s) (host[:port])")
    parser.add_argument('--ca-cert', required=True,
                        help="Path to the Kafka CA cert (PEM format)")
    parser.add_argument('--client-cert', required=True,
                        help="Path to the Kafka client cert (PEM format)")
    parser.add_argument('--client-key', required=True,
                        help="Path to the Kafka client key (PEM format)")
    parser.add_argument('-s', dest="schema_registry", required=True,
                        help="Schema Registry (http(s)://host[:port])")
    parser.add_argument('--sr-client-cert', required=True,
                        help="Path to the Schema Registry client cert (PEM format)")
    parser.add_argument('--sr-client-key', required=True,
                        help="Path to the Schema Registry client key (PEM format)")
    parser.add_argument('-t', dest="topic", default="example_serde_avro",
                        help="Topic name")
    parser.add_argument('-g', dest="group", default="example_serde_avro",
                        help="Consumer group")

    main(parser.parse_args())

Use your consumer with the following command:

python consumer_avro.py \
-b <bootstrap_servers> \
-t <topic_name> \
--client-cert </path/to/client.cert> \
--client-key </path/to/client.key> \
--ca-cert </path/to/ca.cert> \
-s https://<schema_registry_url>/apis/ccompat/v6 \
--sr-client-cert </path/to/sr_client.cert> \
--sr-client-key </path/to/sr_client.key>

Start integrating ValueCloud Event Streaming into your applications!