Spark to consume Kafka Stream
A simple PySpark example showing how to consume a Kafka stream (based on the Confluent Kafka tutorial).
Run Kafka
The Kafka application that produces the stream is built following the Confluent Developer tutorial. I have slightly changed the docker-compose.yaml to work with an external host instead of localhost.
---
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.0.1
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - '2181:2181'
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  broker:
    image: confluentinc/cp-kafka:7.0.1
    hostname: broker
    container_name: broker
    ports:
      - "29092:29092"
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://192.168.1.197:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TOOLS_LOG4J_LOGLEVEL: ERROR
To start Kafka:
docker-compose up -d
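Because the broker now advertises the external address 192.168.1.197:29092 (the PLAINTEXT_HOST listener above), it is worth confirming that the port is reachable from the machine that will run Spark. A minimal sketch using only the Python standard library (the host and port are the ones from my docker-compose.yaml, so adjust them to your setup):
import socket

# Address advertised by the PLAINTEXT_HOST listener in docker-compose.yaml
BROKER_HOST, BROKER_PORT = "192.168.1.197", 29092

try:
    # Open a plain TCP connection to check that the listener is reachable
    with socket.create_connection((BROKER_HOST, BROKER_PORT), timeout=5):
        print(f"Broker reachable at {BROKER_HOST}:{BROKER_PORT}")
except OSError as err:
    print(f"Cannot reach broker: {err}")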
Verify Kafka
Shell into the Kafka broker container:
docker-compose exec broker bash
Kafka ships with the kafka-topics script file. You can use the kafka-topics script to create, list, or describe topics.
Create a topic:
kafka-topics \
--bootstrap-server localhost:9092 \
--create \
--topic mytopic \
--partitions 4
List the topics:
kafka-topics --bootstrap-server localhost:9092 --list
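The same topic operations can also be scripted from Python; a hedged sketch using the kafka-python package (an extra dependency, not part of the original tutorial):
from kafka.admin import KafkaAdminClient, NewTopic

# Connect through the externally advertised listener
admin = KafkaAdminClient(bootstrap_servers="192.168.1.197:29092")

# Create the topic with 4 partitions (replication factor 1, single broker)
admin.create_topics(new_topics=[NewTopic(name="mytopic", num_partitions=4, replication_factor=1)])

# List all topics known to the broker
print(admin.list_topics())

admin.close()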
Producer
Use the kafka-console-producer script to produce sample data.
kafka-console-producer \
--bootstrap-server localhost:9092 \
--topic mytopic \
--property 'key.separator=|' \
--property 'parse.key=true'
At the prompt, enter the data in the form key|{...}:
1|{"id": 1, "name": "Mike"}
2|{"id": 2, "name": "Rand"}
If you want to read the input from a file instead of typing it at the console, keep the same properties and redirect the file into the command: --property 'key.separator=|' < /path/to/data/inputs.txt
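For scripted testing, the same keyed JSON records can be produced from Python as well; a sketch assuming the kafka-python package (again an extra dependency, not part of the original tutorial):
import json
from kafka import KafkaProducer

# Serialize keys as plain strings and values as JSON
producer = KafkaProducer(
    bootstrap_servers="192.168.1.197:29092",
    key_serializer=str.encode,
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

producer.send("mytopic", key="1", value={"id": 1, "name": "Mike"})
producer.send("mytopic", key="2", value={"id": 2, "name": "Rand"})
producer.flush()   # make sure the records are actually sent before exiting
producer.close()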
Consumer
Press Ctrl+C to exit the producer, then verify the data is in Kafka using the following command:
kafka-console-consumer \
--bootstrap-server localhost:9092 \
--topic mytopic \
--from-beginning \
--property print.key=true
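The same check can be done from Python with kafka-python's KafkaConsumer (an extra dependency, and not the Spark consumer described below):
from kafka import KafkaConsumer

# Read mytopic from the beginning and stop after 5 s without new messages
consumer = KafkaConsumer(
    "mytopic",
    bootstrap_servers="192.168.1.197:29092",
    auto_offset_reset="earliest",
    consumer_timeout_ms=5000,
)

for msg in consumer:
    print(msg.key.decode(), msg.value.decode())

consumer.close()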
Configuration
Under the configuration folder, you can find the dev.properties file:
application.id=kafka-streams-101
bootstrap.servers=192.168.1.197:29092
input.topic.name=random-strings
output.topic.name=tall-random-strings
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
I've modified the second line (bootstrap.servers) to point to the remote host. After that, as explained in the Kafka tutorial, you can use the same producer:
java -jar build/libs/creating-first-apache-kafka-streams-application-*.jar configuration/dev.properties
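To keep the Spark job in sync with this file, you can read dev.properties from Python instead of hard-coding the broker and topic; a small helper sketch (load_properties is my own name, not part of the tutorial):
def load_properties(path):
    """Parse a simple key=value .properties file into a dict."""
    props = {}
    with open(path) as fh:
        for line in fh:
            line = line.strip()
            if line and not line.startswith("#") and "=" in line:
                key, _, value = line.partition("=")
                props[key.strip()] = value.strip()
    return props

conf = load_properties("configuration/dev.properties")
print(conf["bootstrap.servers"], conf["input.topic.name"])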
The consumer is written in Spark as follows.
import findspark
findspark.init()  # must run before importing pyspark so the Spark installation is found

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Spark consume Kafka") \
    .master("local[*]") \
    .config("spark.streaming.stopGracefullyOnShutdown", "true") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0") \
    .getOrCreate()

sc = spark.sparkContext
sc.setLogLevel('ERROR')
Read the Kafka stream into a DataFrame:
kafka_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "192.168.1.197:29092") \
    .option("subscribe", "random-strings") \
    .option("startingOffsets", "earliest") \
    .load()
The DataFrame created from a Kafka source always has this schema:
kafka_df.printSchema()
root
|-- key: binary (nullable = true)
|-- value: binary (nullable = true)
|-- topic: string (nullable = true)
|-- partition: integer (nullable = true)
|-- offset: long (nullable = true)
|-- timestamp: timestamp (nullable = true)
|-- timestampType: integer (nullable = true)
The binary value column needs to be cast to a string before printing it to the console.
from pyspark.sql.functions import col

value_df = kafka_df.select(col('value').cast("string").alias("mystring"))
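If the values are JSON documents like the records produced earlier (e.g. {"id": 1, "name": "Mike"}), you can also parse them into columns instead of keeping a raw string; a sketch using from_json (the schema fields below are taken from the sample records and are my assumption about the payload):
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Schema matching the sample records produced above (assumed payload shape)
value_schema = StructType([
    StructField("id", IntegerType()),
    StructField("name", StringType()),
])

parsed_df = kafka_df.select(
    col("key").cast("string").alias("key"),
    from_json(col("value").cast("string"), value_schema).alias("data"),
).select("key", "data.id", "data.name")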
Write the contents to the console:
writer_query = value_df.writeStream \
    .format("console") \
    .outputMode("append") \
    .option("checkpointLocation", "chk-point-dir") \
    .trigger(processingTime="10 seconds") \
    .start()

writer_query.awaitTermination()
As shown in the above code, the output mode is append, and the trigger time is 10 seconds.
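When running interactively (for example in a notebook), you can skip the blocking awaitTermination() call and inspect or stop the query through its handle; status, lastProgress, and stop() are standard StreamingQuery members:
# Inspect the running query (only useful before/without awaitTermination())
print(writer_query.status)        # e.g. whether a trigger is currently active
print(writer_query.lastProgress)  # metrics of the most recent micro-batch

# Shut everything down cleanly
writer_query.stop()
spark.stop()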
Reference: