Development Guides
Kafka​
Les politiques BeC ne s'appliquent pas sur des EdS KAFKA.
Protocole Natif de Kafka.
Paramètres à exposer : variables d'environnement​
Les paramètres suivants doivent être exposés par le module pour présenter la manière de paramétrer l'espace de stockage en laissant la main au système ou à l'administrateur pour les configurer.
| Nom du paramètre | Obligatoire | Paramètre producer / consumer | Description |
|---|---|---|---|
| brokers | Oui | bootstrap.servers | Brocker kafka vers lesquels se connecter : host1:port1[,host2:port2]* |
| groupId | Oui | group.id | Pour le consumer uniquement. Identifie si les modules répartissent la charge de consommation. Par défaut un identifiant unique du module sera alimenté. |
| acks | Oui | acks | Pour le producer uniquement. Une valeur par défaut est toutefois attendue dans chacune des configurations alignées avec la préconisation du développeur sur la création des files Kafka. |
| topic | Oui | Nom du Topic Ă appeler |
Par ailleurs, dans l'hypothèse ou un module serait en relation avec différentes files kafka, veuillez à isoler systématiquement tous les paramètres pour chacune des files.
Cela correspond à systématiquement créer un producer / consumer pour chaque topic.
Par exemple:
- topic1
- topic1.brockers
- topic1.groupID
- topic1.user
- topic1.password
- topic2
- topic2.brockers
- topic2.groupID
- topic2.user
- topic2.password
Restrictions au développement​
Fonctionnalités Kafka​
Les modules s'appuient exclusivement sur les API standards producers et consumers. Les API Connect et Stream ne peuvent pas être utilisées dans un module.
Les clients kafka doivent de version 2.x.y
Version librairies​
- Java:
kafka-client:2.4.1 - Python:
kafka_python-2.0.2-py2.py3-none-any.whl - Scala(spark):
"org.apache.spark" %% "spark-sql-kafka-0-10" % "3.0.1"
Production de messages​
Le chapitre suivant se focalise sur la publication des messages. Il présente les propriétés pour lesquelles une attention particulière est requise au moment du développement.
key.serializer & value.serializer​
Cf. chapitre ci-dessous sur les standards associés aux formats des messages transités.
acks​
Doit être propagé comme paramètre de manière à pouvoir exposer la garantie de publication souhaitée lors du paramétrage de l'application.
bootstrap.servers​
Doit être propagé comme paramètre.
compression.type​
Le type de compression est géré au niveau du topic. Cette propriété ne doit pas être renseignée.
security.protocol​
Constante: "SASL_PLAINTEXT". Il est préférable de l'exposer auprès du service applicatif.
sasl.mechanism​
Constante: "SCRAM-SHA-512". Peut-être gérée de la même manière que security protocol.
sasl.jaas.config​
La valeur est construite de la manière suivante: org.apache.kafka.common.security.scram.ScramLoginModule required username=" + var_user + " password=" + var_password + ";"
ssl.*​
Constante locale au module dont la valeur sera fournie ultérieurement. Il n'est pas nécessaire de l'exposer auprès du service applicatif. Les valeurs par défaut sont à appliquer en standard.
Exemple de connexion et publication pour Java​
import java.util.Properties;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
public class SimpleProducer {
// A Exposer en dehors du module pour ajustement personnalisé sur la plateforme
private final static String TOPIC = "my-example-topic";
private final static String BROCKERS = "kafka.artemis.sdk:9092";
private final static String USER = "my_kafka_user";
private final static String PASSWORD = "my_kafka_password";
private final static String CLIENTID = "module_name";
private final static String GROUPID = "module_name";
private final static String ACKS = "all";
private final static String SECURITY_PROTOCOL = "SASL_PLAINTEXT";
public static produce() throws Exception{
// Mise en place d'un instance pour fournir toutes les propriétés nécessaires à la configuration de l'accès de la part du producer.
Properties props = new Properties();
props.put("bootstrap.servers", BROCKERS);
props.put("acks", ACKS);
props.put("security.protocol", SECURITY_PROTOCOL);
props.put("sasl.mechanism", "SCRAM-SHA-512");
props.put("sasl.jaas.config", org.apache.kafka.common.security.scram.ScramLoginModule.class.getName() + " required username=" + USER + " password=" + PASSWORD + ";");
props.put("group_id", GROUPID);
props.put("client_id", CLIENTID);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
try (Producer<String, String> producer = new KafkaProducer<String, String>(props);)
{
producer.send(new ProducerRecord<String, String>(TOPIC, "0", "Hello World !"));
}
}
}
Exemple de connexion et publication pour Scala et Spark​
val options: Map[String, String] = Map("kafka.bootstrap.servers" -> kafkaServers)
val sasl = "org.apache.kafka.common.security.scram.ScramLoginModule required username=" + kafkaUser + " password=\"" + kafkaPassword + "\";"
options = options + ("kafka.sasl.jaas.config" -> sasl, "kafka.sasl.mechanism" -> "SCRAM-SHA-512", "kafka.security.protocol" -> "SASL_PLAINTEXT")
kafkaTopic = "test"
var df = spark.emptyDataFrame
df = spark.read
.format("kafka")
.options(options)
.option("subscribe", kafkaTopic)
.load()
Exemple de connexion et publication pour Python​
from kafka import KafkaProducer
from kafka.errors import KafkaError
producer = KafkaProducer(
bootstrap_servers=bootstrap_servers,
client_id=str(conf['kafka'][conf_type]['client_id']),
acks='all',
sasl_plain_username=conf['kafka'][conf_type]['user'],
sasl_plain_password=conf['kafka'][conf_type]['password'],
security_protocol='SASL_PLAINTEXT',
sasl_mechanism='SCRAM-SHA-512')
Souscription aux messages​
Le chapitre suivant se focalise sur la souscription des messages. Il présente les propriétés pour lesquelles une attention particulière est requise au moment du développement.
key.deserializer & value.deserializer​
Cf. chapitre ci-dessous sur les standards associés aux formats des messages transitants.
bootstrap.servers​
Doit être propagé comme paramètre.
group.id​
Doit être propagé comme paramètre.
Le fonctionnement de kafka est le suivant :
- 2 consommateurs abonnés au même topic fixant une même chaine de caractère dans la propriété 'groupid' se partagera la charge (un message consommé par l'un ne le sera pas par l'autre). Il est d'usage de considérer par défaut que toute réplicas d'un module en cours d'exécution au sein d'une même application instanciée aura le même group id.
- 2 consommateurs abonnés au même topic fixant une chaine de caractère différente dans la propriété 'groupid' sera amené à individuellement consommer l'ensembles des messages. Il est d'usage de considérer par défaut que tous modules différents ou modules identiques en cours d'execution sur différentes applications instancies auront des group ID différents.
Les précisions ci-dessus imposent de ne pas fixer "en dur" un même group id pour que ces problématiques puissent être configurées au moment de l'instanciation d'une application.
Exemple de connexion et réception de messages pour java​
public class SimpleKafkaConsumer {
// A Exposer en dehors du module pour ajustement personalisé sur la plateforme
private final static String TOPIC = "my-example-topic";
private final static String BROCKERS = "kafka.artemis.sdk:9092";
private final static String USER = "my_kafka_user";
private final static String PASSWORD = "my_kafka_password";
private final static String GROUPID = "module_name";
private final static String SECURITY_PROTOCOL = "SASL_PLAINTEXT";
public static void main(String[] str) throws InterruptedException {
System.out.println("Starting...");
execute();
}
private static void execute() throws InterruptedException {
KafkaConsumer<String, String> consumer = createConsumer();
// Subscribe to all partition in that topic.
// 'assign' could be used here
// instead of 'subscribe' to subscribe to specific partition.
consumer.subscribe(Arrays.asList(TOPIC));
processRecords(consumer);
}
private static KafkaConsumer<String, String> createConsumer() {
Properties props = new Properties();
props.put("bootstrap.servers", BROCKERS);
props.put("group.id", GROUPID);
props.put("security.protocol", SECURITY_PROTOCOL);
props.put("sasl.mechanism", "SCRAM-SHA-512");
props.put("sasl.jaas.config", org.apache.kafka.common.security.scram.ScramLoginModule.class.getName() + " required username=" + USER + " password=" + PASSWORD + ";");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("auto.offset.reset", "earliest");
return new KafkaConsumer<String, String>(props);
}
private static void processRecords(KafkaConsumer<String, String> consumer) throws {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
long lastOffset = 0;
for (ConsumerRecord<String, String> record : records) {
System.out.printf("\n\roffset = %d, key = %s, value = %s", record.offset(),
record.key(), record.value());
lastOffset = record.offset();
}
System.out.println("lastOffset read: " + lastOffset);
process();
consumer.commitAsync();
}
}
private static void process() throws InterruptedException {
// create some delay to simulate processing of the record.
Thread.sleep(20);
}
}
Exemple de connexion et réception de messages pour python​
consumer = KafkaConsumer(
conf['kafka'][conf_type]['topic'],
bootstrap_servers=bootstrap_servers,
group_id=str(conf['kafka'][conf_type]['group_id']),
auto_offset_reset='earliest',
consumer_timeout_ms=conf['kafka'][conf_type]['timeout'],
enable_auto_commit=True,
value_deserializer=lambda x: x.decode('utf-8'),
sasl_plain_username=conf['kafka'][conf_type]['user'],
sasl_plain_password=conf['kafka'][conf_type]['password'],
security_protocol='SASL_PLAINTEXT',
sasl_mechanism='SCRAM-SHA-512'
)
Format de la donnée​
La sérialisation et la dé-sérialisation des messages doivent être réalisées dans les formats suivants:
- Les types simples communs standard Kafka (org.apache.kafka.common.serialization)
- byte[]
- ByteBuffer
- Double
- Integer
- Long
- String
Par convention, l'utilisation du type String se limitera à des chaines simples contenant une information non structurée.
- Les types complexes ci-dessous doivent s'appuyer sur des schémas :
- json
- avro
- protobuf
- XML
Pour l'ensemble des types complexes, il est recommandé la mise à disposition des schémas permettant à un producteur ou un consommateur de valider la structure du message. Le schéma est mis à disposition dans le schema de stockage.
Pour permettre à tous nouveaux cas d'usages de consulter ou publier de l'information sur l'espace d'échange, il est demandé de renseigner le document "Utilisation de l'espace de stockage" dans le schéma de stockage.
Restrictions applicables aux scripts d'installation​
La création d'un topic KAFKA permanent n'est pas autorisée depuis un cas d'usage. Il s'agit d'une action à réaliser manuellement avant la mise en service du Cas d'Usage (comme tout espace de stockage).
La mise en place d'un nouveau topic sur le serveur Kafka impose une création manuelle préalable. La fonctionnalité de création dynamique d'un topic lors de la publication d'un message lorsque ce dernier n'existe pas est désactivée.
Les éléments permettant la mise en place et configuration des topics sont attendus dans la description des espaces de stockage.
Les scripts et documents contenus dans le schéma de stockage peuvent guider l'administrateur à réaliser les gestes qui peut toutefois ajuster ces gestes aux spécificités du cas d'usage et à sa connaissance de la plateforme. Ci-dessous la liste des éléments pour lesquels les cas d'usage ont plus ou moins d'espace pour accompagner aux choix de configuration.
Les scripts et documents contenus dans le schéma stockage peuvent guider l’administrateur à réaliser les gestes. L’administrateur peut toutefois ajuster ces gestes aux spécificités du cas d’usage et à sa connaissance de la plateforme.
Nombre de réplicas associés à un topic​
Le choix du nombre de réplicas est à la main de l'administrateur Système ou de Bases de données en fonction de la durabilité souhaitée sur la donnée et les garanties en cas de perte.
Par défaut, il est demandé de livrer les scripts de mise en place avec la valeur 3.
Nombre de partitions associées à un topic​
Le choix du nombre de partition est à la main du cas d'usage en fonction de la performance souhaitée. Dans le cas où tous les consommateurs d'un topic ne sont pas maitrisés par un même cas d'usage, l'administrateur système peut réajuster la valeur proposée dans un souci d'amélioration continue des performances.
Eléments de configuration standard​
Les éléments de configuration standards sont les suivants. Leur description est disponible dans la documentation standard de Kafka.
Concernant les scripts associés aux espaces de stockage, il est demandé aux équipes de développement de fixer par défaut le type de compression (compression.type) à uncompressed (au lieu de producer) afin de laisser la possibilité à d'autres modules d'applications tierces de s'abonner au message sans risque. En cas de modification, il est nécessaire de contacter votre interlocuteur administrateur de base de données.
Concernant les propriétés suivantes, le développeur est amené à se positionner:
- En priorité:
- max.message.bytes: Valeur prise en compte par défaut: 1000012
- retention.bytes: Valeur prise en compte par défaut: -1
- retention.ms: Valeur prise en compte par défaut: 604800000
- preallocate: Valeur prise en compte par défaut: false
- Pour des besoins avancés:
- delete.retention.ms: Valeur prise en compte par défaut: 86400000
- file.delete.delay.ms: Valeur prise en compte par défaut: 60000
- message.timestamp.difference.max.ms: Valeur prise en compte par défaut: 9223372036854775807
- message.timestamp.type: Valeur prise en compte par défaut: CreateTime
- segment.bytes: Valeur prise en compte par défaut: 604800000
- segment.index.bytes: Valeur prise en compte par défaut: 10485760
- segment.ms: Valeur prise en compte par défaut: 604800000
Les autres propriétés ne sont pas à fournir nécessairement car elles seront valorisées par l'administrateur de bases de données sauf besoin particulier.
Exemple​
Le packaging du code source est décrit au début de ce document : il précise les documents à fournir pour les projets Git de type Espace de Stockage.
Les 2 lignes de commande sont un exemple de ce que l'équipe de développement d'un cas d'usage peut proposer.
> kafka-topics.sh --zookeeper ${pf.kafka.zookeeper} --create --topic ${pf.kafka.topic} --replication-factor 3 --partitions 3
> kafka-configs.sh --zookeeper ${pf.kafka.zookeeper} --entity-type topics --entity-name ${pf.kafka.topic} --alter --add-config compression.type=uncompressed, max.message.bytes=150000
Accompagnement sur les garanties de livraison​
Au plus une livraison​
Du côté du producteur, il est nécessaire configurer de la manière suivante:
- Fixer: "enable.auto.commit" Ă "true".
- Fixer: "auto.commit.interval.ms" Ă la valeur minimale possible.
Il s'agit de la configuration par défaut du consommateur. Toutefois, il faut s'assurer de ne pas faire appel à l'instruction suivante: consumer.commitSync();.
Avec cette configuration, le consommateur recevra son acquittement automatiquement si l'intervalle spécifié n'est pas fourni.
Exemple:
package artemis;
import java.time.Duration;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
public class AtMostOnceConsumer {
// A Exposer en dehors du module pour ajustement personalisé sur la plateforme
private final static String BROCKERS = "kk00-1.technique.artemis:9092,kk00-2.technique.artemis:9092,kk00-3.technique.artemis:9092";
private final static String USER = "my_kafka_user";
private final static String PASSWORD = "my_kafka_password";
private final static String GROUPID = "module_name";
//private final static String SECURITY_PROTOCOL = "SASL_PLAINTEXT";
private final static String TOPIC = "my-example-topic";
// Enregistrement de l'offset de lecture du topic
private static Map<String, Long> msiOffSet = new HashMap<String, Long>();
public static void main(String[] str) throws InterruptedException {
System.out.println("Starting AtMostOnceConsumer ...");
new AtMostOnceConsumer().execute();
}
private void execute() throws InterruptedException {
// initialiser l'offSet de lecture
if (msiOffSet.get(TOPIC) == null) {
msiOffSet.put(TOPIC, 0L);
}
Date debut = new Date();
KafkaConsumer<String, String> consumer = createConsumer();
// Subscribe to all partition in that topic.
// 'assign' could be used here instead of 'subscribe'
// to subscribe to specific partition.
consumer.subscribe(Arrays.asList(TOPIC));
processRecords(consumer);
Date fin = new Date();
System.out.println("Temps de consommation : "+(fin.getTime()-debut.getTime())+" ms");
}
private KafkaConsumer<String, String> createConsumer() {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BROCKERS);
props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUPID);
props.put("security.protocol", SECURITY_PROTOCOL);
props.put("sasl.mechanism", "SCRAM-SHA-512");
props.put("sasl.jaas.config", org.apache.kafka.common.security.scram.ScramLoginModule.class.getName() + " required username=" + USER + " password=" + PASSWORD + ";");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "999999999");
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "135");
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "3000");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "6001");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
return new KafkaConsumer<>(props);
}
private void processRecords(KafkaConsumer<String, String> consumer)
{
boolean loop=true;
while (loop)
{
// Initialiser le réceptacle du message
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(120));
for (ConsumerRecord<String, String> record : records)
{
if (msiOffSet.get(TOPIC) < record.offset())
{
if (record.key().equals("1") || record.key().equals("2")|| record.key().equals("3") || record.key().equals("4"))
{
if (record.value() != null)
{
System.out.println("Lecture de "+record.value());
// Mettre Ă jour le suivi de l'offSet
msiOffSet.replace(TOPIC, record.offset());
// Sortir de la boucle
if (record.key().equals("4")) loop = false;
}
}
}
}
process();
consumer.commitSync();
}
}
private void process() {
// create some delay to simulate processing of the message.
try {
Thread.sleep(20);
} catch (InterruptedException e) {
// Auto-generated catch block
e.printStackTrace();
}
}
}
A minima une livraison​
Pour configurer ce type de consommateur, il faut soit:
- définir: "enable.auto.commit" à "false"
- définir: "enable.auto.commit" à "true" avec "auto.commit.interval.ms" à une valeur importante.
Le consommateur devra prendre le contrĂ´le de l'offset transmis Ă Kafka en faisant l'appel suivant: consumer.commitSync();
public class AtLeastOnceConsumer {
// A Exposer en dehors du module pour ajustement personalisé sur la plateforme
private final static String TOPIC = "my-example-topic";
private final static String BROCKERS = "kafka.artemis.sdk:9092";
private final static String USER = "my_kafka_user";
private final static String PASSWORD = "my_kafka_password";
private final static String GROUPID = "module_name";
private final static String SECURITY_PROTOCOL = "SASL_PLAINTEXT";
public static void main(String[] str) throws InterruptedException {
System.out.println("Starting AutoOffsetGuranteedAtLeastOnceConsumer ...");
execute();
}
private static void execute() throws InterruptedException {
KafkaConsumer<String, String> consumer = createConsumer();
// Subscribe to all partition in that topic.
// 'assign' could be used here
// instead of 'subscribe' to subscribe to specific partition.
consumer.subscribe(Arrays.asList(TOPIC));
processRecords(consumer);
}
private static KafkaConsumer<String, String> createConsumer() {
Properties props = new Properties();
props.put("bootstrap.servers", BROCKERS);
props.put("group.id", GROUPID);
props.put("security.protocol", SECURITY_PROTOCOL);
props.put("sasl.mechanism", "SCRAM-SHA-512");
props.put("sasl.jaas.config", org.apache.kafka.common.security.scram.ScramLoginModule.class.getName() + " required username=" + USER + " password=" + PASSWORD + ";");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// Set this property, if auto commit should happen.
props.put("enable.auto.commit", "true");
// Make Auto commit interval to a big number
// so that auto commit does not happen,
// we are going to control the offset commit
// via consumer.commitSync(); after processing
// message.
props.put("auto.commit.interval.ms", "999999999999");
// This is how to control
// the number of messages being read in each poll
props.put("max.partition.fetch.bytes", "135");
props.put("heartbeat.interval.ms", "3000");
props.put("session.timeout.ms", "6001");
return new KafkaConsumer<String, String>(props);
}
private static void processRecords(KafkaConsumer<String, String> consumer) throws {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
long lastOffset = 0;
for (ConsumerRecord<String, String> record : records) {
System.out.printf("\n\roffset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
lastOffset = record.offset();
}
System.out.println("lastOffset read: " + lastOffset);
process();
// Below call is important to control the offset commit.
// Do this call after you
// finish processing the business process.
consumer.commitSync();
}
}
private static void process() throws InterruptedException {
// create some delay to simulate processing of the record.
Thread.sleep(20);
}
}
Exactement une livraison​
Cet exemple démontre un scenario "exactement 1 fois". Dans cet exemple, un consommateur s'enregistre avec Kafka au travers d'un appel d'enregistrement au travers de la commande "subscribe"
Dans ce scenario, l'offset devra être géré manuellement. Les étapes sont présentées ci-dessous:
- Fixer "enable.auto.commit" Ă "false"
- Ne pas faire d'appel Ă
consumer.commitSync();une fois le message consommé. - Implémenter un "ConsumerRebalanceListener" et dans le "Listener" réaliser un
consumer.seek(topicPartition,offset);afin de démarrer la lecture d'un offset spécifique du topic ou de la partition. - Tandis que les messages sont en cours de traitement, conserver l'offset de chaque message. Entreposer l'offset du message de manière atomique avec le message au travers d'une transaction atomique. Quand la donnée est entreposée dans une base de données relationnelle, l'atomicité est simple à implémenter; Pour des bases non relationnelles (HDFS / Big Data), une manière de procéder est la suivante: entreposer l'offset du message ensemble avec le message.
- Implémenter l'idempotence comme filet de sécurité.
public class ExactlyOnceDynamicConsumer {
private static OffsetManager offsetManager = new OffsetManager("storage2");
public static void main(String[] str) throws InterruptedException {
System.out.println("Starting ExactlyOnceDynamicConsumer ...");
readMessages();
}
private static void readMessages() throws InterruptedException {
KafkaConsumer<String, String> consumer = createConsumer();
// Manually controlling offset but register consumer
// to topics to get dynamically assigned partitions.
// Inside MyConsumerRebalancerListener use
// consumer.seek(topicPartition,offset)
// to control offset which messages to be read.
consumer.subscribe(Arrays.asList("normal-topic"),
new MyConsumerRebalancerListener(consumer));
processRecords(consumer);
}
private static KafkaConsumer<String, String> createConsumer() {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
String consumeGroup = "cg3";
props.put("group.id", consumeGroup);
// Below is a key setting to turn off the auto commit.
props.put("enable.auto.commit", "false");
props.put("heartbeat.interval.ms", "2000");
props.put("session.timeout.ms", "6001");
// Control maximum data on each poll,
// make sure this value is bigger than the maximum
// single message size
props.put("max.partition.fetch.bytes", "140");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
return new KafkaConsumer<String, String>(props);
}
private static void processRecords(KafkaConsumer<String, String> consumer) {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());
// Save processed offset in external storage.
offsetManager.saveOffsetInExternalStore(record.topic(), record.partition(), record.offset());
}
}
}
}
public class MyConsumerRebalancerListener implements org.apache.kafka.clients.consumer.ConsumerRebalanceListener {
private OffsetManager offsetManager = new OffsetManager("storage2");
private Consumer<String, String> consumer;
public MyConsumerRebalancerListener(Consumer<String, String> consumer) {
this.consumer = consumer;
}
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
for (TopicPartition partition : partitions) {
offsetManager.saveOffsetInExternalStore(partition.topic(), partition.partition(), consumer.position(partition));
}
}
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
for (TopicPartition partition : partitions) {
consumer.seek(partition, offsetManager.readOffsetFromExternalStore(partition.topic(), partition.partition()));
}
}
}
/**
* The partition offset are stored in an external storage.
* In this case in a local file system where program runs.
*/
public class OffsetManager {
private String storagePrefix;
public OffsetManager(String storagePrefix) {
this.storagePrefix = storagePrefix;
}
/**
* Overwrite the offset for the topic in an external storage.
*
* @param topic - Topic name.
* @param partition - Partition of the topic.
* @param offset - offset to be stored.
*/
void saveOffsetInExternalStore(String topic, int partition, long offset) {
try {
FileWriter writer = new FileWriter(storageName(topic, partition), false);
BufferedWriter bufferedWriter = new BufferedWriter(writer);
bufferedWriter.write(offset + "");
bufferedWriter.flush();
bufferedWriter.close();
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException(e);
}
}
/**
* @return he last offset + 1 for the provided topic and partition.
*/
long readOffsetFromExternalStore(String topic, int partition) {
try {
Stream<String> stream = Files.lines(Paths.get(storageName(topic, partition)));
return Long.parseLong(stream.collect(Collectors.toList()).get(0)) + 1;
} catch (Exception e) {
e.printStackTrace();
}
return 0;
}
private String storageName(String topic, int partition) {
return storagePrefix + "-" + topic + "-" + partition;
}
}