During this hands-on session, we will explore Kafka Streams. We will set up a cluster of Kafka brokers using Docker, and we will interact with this cluster from our machine.

To set up your environment, download Kafka (kafka_2.13-3.2.0.tgz) on your machine and extract it (e.g., in ~/kafka). Moreover, to run the Kafka scripts conveniently, we define a new environment variable: edit the file ~/.profile and add the following line, pointing to the directory where kafka_2.13-3.2.0.tgz was extracted (adjust the path if yours differs):

export KAFKA_HOME=~/kafka_2.13-3.2.0

After reloading the .profile file (e.g., by running 'source ~/.profile' or by rebooting your machine), we can invoke the Kafka scripts as follows:

$KAFKA_HOME/bin/kafka-topics.sh

==== ==== ====
#1 KAFKA STREAMS: A simple, stateless transformation on a stream

We want to demonstrate how to perform simple, stateless transformations via map functions. A Kafka Streams application reads from a Kafka topic, processes the data, and emits the results to a new topic. In our case, we will read from the "TextLinesTopic" topic and emit results to the "UppercasedTextLinesTopic" topic.

1. Create the input and output topics used by this example.

$KAFKA_HOME/bin/kafka-topics.sh --create --topic TextLinesTopic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
$KAFKA_HOME/bin/kafka-topics.sh --create --topic UppercasedTextLinesTopic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1

(alternatively, we can use the script: dist/1_mapfunction/1_createTopics.sh)

2. From our IDE, we can start the application by running the Java class stream.MapFunctionLambdaExample. It is very important that the topics are created before starting the application; otherwise, the application will terminate shortly after start-up.

Key points (a complete sketch of the topology is given at the end of this example):

- final KStream textLines = builder.stream("TextLinesTopic", Consumed.with(Serdes.ByteArray(), Serdes.String()));
  A KStream is created using the StreamsBuilder, which requires a serializer/deserializer class (Serdes) for keys and values. We use the ones provided by the Kafka library.
- We transform each element of the stream using a "mapValues" transformation, which accepts a lambda function.
- We save the data using the "to()" operation, specifying the sink topic.
- After the stream definition, we can create and start the topology using the primitives:
  final KafkaStreams streams = new KafkaStreams(builder.build(), props);
  streams.cleanUp();
  streams.start();
  Note that '.cleanUp()' cleans the local StateStore directory (StreamsConfig.STATE_DIR_CONFIG) by deleting all data associated with the application ID.

3. Now that our application is ready, we should create a consumer that reads the data emitted by our application, i.e., a consumer of the UppercasedTextLinesTopic topic. To create a consumer that prints results on the console, we can use the following command (or use the script: dist/1_mapfunction/2_startConsumer.sh)

$KAFKA_HOME/bin/kafka-console-consumer.sh --topic UppercasedTextLinesTopic --from-beginning --bootstrap-server localhost:9092

4. Now we are ready to create a producer that will send data to the TextLinesTopic. We can run the command (or use the script: dist/1_mapfunction/3_startProducer.sh)

$KAFKA_HOME/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TextLinesTopic

5. By writing on the producer console, we can see that the consumer receives the text in upper case.

6. To conclude, we can delete the topics using the script: dist/1_mapfunction/4_deleteTopics.sh
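For reference, here is a minimal, self-contained sketch of the topology described above. It is not the exact stream.MapFunctionLambdaExample class shipped with the hands-on material: the class name, application.id, and shutdown hook are illustrative assumptions, while the topics, Serdes, and operations follow the key points of step 2.

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

public class MapFunctionSketch {   // hypothetical class name
    public static void main(String[] args) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "map-function-sketch");   // assumed application.id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        final StreamsBuilder builder = new StreamsBuilder();
        // Read the source topic as <byte[], String> records.
        final KStream<byte[], String> textLines =
                builder.stream("TextLinesTopic", Consumed.with(Serdes.ByteArray(), Serdes.String()));
        // Stateless transformation: uppercase every value.
        final KStream<byte[], String> uppercased = textLines.mapValues(v -> v.toUpperCase());
        // Write the transformed stream to the sink topic.
        uppercased.to("UppercasedTextLinesTopic", Produced.with(Serdes.ByteArray(), Serdes.String()));

        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.cleanUp();   // wipe local state for this application.id (fine for a demo)
        streams.start();
        // Close the topology cleanly on Ctrl-C.
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}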
==== ==== ====
#3 KStreams and KTables

This example aims to illustrate the KTable concept (see the theory for further details).

A KTable is an abstraction of a changelog stream, where each data record represents an update. More precisely, the value in a data record is interpreted as an "UPDATE" of the last value for the same record key, if any (if a corresponding key doesn't exist yet, the update is considered an INSERT). Using the table analogy, a data record in a changelog stream is interpreted as an UPSERT, aka INSERT/UPDATE, because any existing row with the same key is overwritten. Also, null values are interpreted in a special way: a record with a null value represents a "DELETE", or tombstone, for the record's key.

We often talk about the "stream-table duality" to describe the close relationship between streams and tables. A stream can be considered a changelog of a table: aggregating the data records in a stream returns a table. A table can be considered a snapshot, at a point in time, of the latest value for each key in a stream (a stream's data records are key-value pairs).

Read more: https://kafka.apache.org/0110/documentation/streams/developer-guide#streams_duality

In this example, we will create an application that sums all the odd numbers received on a data stream. In Kafka Streams, aggregations (i.e., count(), reduce(), aggregate()) lead to the creation of a KTable. A KTable stores the most recent value for each key.

1. We create the input and output topics used by this example (dist/2_sum/1_createTopics.sh):

$KAFKA_HOME/bin/kafka-topics.sh --create --topic numbers-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
$KAFKA_HOME/bin/kafka-topics.sh --create --topic sum-of-odd-numbers-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1

2. From our IDE, we can start the application by running the Java class stream.SumLambdaExample.

Key points (a complete sketch of the topology is given at the end of this example):

- The result of the reduce operation is a KTable:
  final KTable sumOfOddNumbers = input
      .filter((k, v) -> v % 2 != 0)
      .selectKey((k, v) -> "overallSum")
      .groupByKey()
      .reduce((v1, v2) -> v1 + v2);
- We use the selectKey() API to set a new key for each input record.
- We sum the elements using the reduce() operation.

3. Now we can create a consumer that reads the data emitted by our application. To create a consumer that prints results on the console, we can use the following command (or use the script: dist/2_sum/2_startConsumer.sh)

$KAFKA_HOME/bin/kafka-console-consumer.sh --topic sum-of-odd-numbers-topic --from-beginning --bootstrap-server localhost:9092 --property print.key=true --property value.deserializer=org.apache.kafka.common.serialization.IntegerDeserializer

4. Now we are ready to create a producer. This time, we will create a Java application that acts as the producer. From our IDE, we can start the producer application by running the Java class stream.SumLambdaExampleProducer. NOTE: the producer should terminate after a few seconds.

5. When the producer terminates (after about 10 s), we can see that the consumer receives the result of the sum application:
overallSum 2500

6. If we run the producer multiple times, we will receive the updated value, e.g.:
overallSum 5000
overallSum 7500

Conceptually, streams and tables in Kafka Streams are deeply intertwined: new data on the stream carries the KTable updates. If we compare this behavior with the one obtained in other frameworks, we can consider the KTable the result of a stateful transformation on a stream of data.

7. To conclude, we can delete the topics using the script: dist/2_sum/4_deleteTopics.sh
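As before, here is a minimal sketch of the sum-of-odd-numbers topology, assuming integer values and unused input keys. It is not the exact stream.SumLambdaExample class: the class name, application.id, key Serde of the input topic, and the Serdes passed to groupByKey() are assumptions, while the filter/selectKey/groupByKey/reduce chain mirrors the key points of step 2.

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

public class SumOfOddNumbersSketch {   // hypothetical class name
    public static void main(String[] args) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "sum-lambda-sketch");   // assumed application.id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        final StreamsBuilder builder = new StreamsBuilder();
        // Values are Integers; the keys of the input records are not used.
        final KStream<byte[], Integer> input =
                builder.stream("numbers-topic", Consumed.with(Serdes.ByteArray(), Serdes.Integer()));

        final KTable<String, Integer> sumOfOddNumbers = input
                .filter((k, v) -> v % 2 != 0)                  // keep odd numbers only
                .selectKey((k, v) -> "overallSum")             // route every record to a single key
                .groupByKey(Grouped.with(Serdes.String(), Serdes.Integer()))
                .reduce((v1, v2) -> v1 + v2);                  // running sum for that key

        // Every update of the KTable is forwarded as a new record to the output topic.
        sumOfOddNumbers.toStream()
                .to("sum-of-odd-numbers-topic", Produced.with(Serdes.String(), Serdes.Integer()));

        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.cleanUp();
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}

The output Serdes match the console consumer of step 3, which prints the key as a string and deserializes the value with the IntegerDeserializer.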
==== ==== ====
#4 A stateful transformation on a stream: WordCount

In this example, we want to implement the "hello world" of data streaming applications: the word count.

1. We create the input and output topics used by this example (dist/3_wordcount/1_createTopics.sh):

$KAFKA_HOME/bin/kafka-topics.sh --create --topic streams-plaintext-input --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
$KAFKA_HOME/bin/kafka-topics.sh --create --topic streams-wordcount-output --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1

2. From our IDE, we can start the application by running the Java class stream.WordCount.

Key points (a complete sketch of the topology is given at the end of this example):

- The application definition is very similar to the one used in other frameworks:
  final KTable wordCounts = textLines
      .flatMapValues(value -> Arrays.asList(pattern.split(value.toLowerCase())))
      .groupBy((key, word) -> word)
      .count();
  We run a flatMap, a groupBy, and a count.
- We specify the Serdes for the produced (key, value) pairs.

3. Now we can create a consumer that reads the data emitted by our application. To create a consumer that prints results on the console, we can use the following command (or use the script: dist/3_wordcount/2_startConsumer.sh)

$KAFKA_HOME/bin/kafka-console-consumer.sh --topic streams-wordcount-output --from-beginning --bootstrap-server localhost:9092 --property print.key=true --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

4. Now we are ready to create a producer that will send data to the streams-plaintext-input topic. We can run the command (or use the script: dist/3_wordcount/3_startProducer.sh)

$KAFKA_HOME/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input

5. By writing on the producer console, we can see that the consumer receives the results of the word count application:
hello 1
world 1
this 1
is 1
the 1
sabd 1
course 1
hello 2
world 2

Key point: note that new results are emitted as soon as new data arrives. For each new record, the updated count is emitted.

6. To conclude, we can delete the topics using the script: dist/3_wordcount/4_deleteTopics.sh
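Finally, a minimal sketch of the WordCount topology, assuming string keys and values on the input topic. The class name, application.id, and the explicit Grouped Serdes are assumptions; the flatMapValues/groupBy/count chain and the output Serdes follow the key points of step 2.

import java.util.Arrays;
import java.util.Properties;
import java.util.regex.Pattern;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

public class WordCountSketch {   // hypothetical class name
    public static void main(String[] args) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-sketch");   // assumed application.id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        final Pattern pattern = Pattern.compile("\\W+");

        final StreamsBuilder builder = new StreamsBuilder();
        final KStream<String, String> textLines =
                builder.stream("streams-plaintext-input", Consumed.with(Serdes.String(), Serdes.String()));

        final KTable<String, Long> wordCounts = textLines
                // split each line into lowercase words (stateless)
                .flatMapValues(value -> Arrays.asList(pattern.split(value.toLowerCase())))
                // re-key each record by the word itself
                .groupBy((key, word) -> word, Grouped.with(Serdes.String(), Serdes.String()))
                // stateful aggregation: running count per word
                .count();

        // Each count update becomes a new <word, count> record on the output topic.
        wordCounts.toStream()
                .to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));

        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.cleanUp();
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}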