Spark Streaming Basics
This is a very basic example created to explain Spark streaming. Spark runs locally inside the AWS Glue container.
There is plenty of information on Spark streaming. My objective is to provide a running example with a very simple approach.
As a first step, create the following docker-compose.yml:
version: '3.3'
services:
  aws_glue:
    container_name: notebook
    build: .
    volumes:
      - .:/home/glue_user/workspace/jupyter_workspace
    privileged: true
    # network_mode: host
    ports:
      - 8888:8888
      - 4040:4040
    environment:
      JUPYTER_ENABLE_LAB: "yes"
      DISABLE_SSL: "true"
networks:
  default:
    external:
      name: ojnw-1
To generate text, I use the Netcat application, which is well known on Unix systems. Alternatively, you can uncomment the network_mode: host line in the compose file; in that case Netcat has to be installed on your local machine, and the networks section at the bottom of the file should be commented out.
The Dockerfile:
FROM public.ecr.aws/glue/aws-glue-libs:glue_libs_3.0.0_image_01
WORKDIR /home/glue_user/workspace/jupyter_workspace
ENV DISABLE_SSL=true
RUN python3 -m pip install --upgrade pip
RUN pip3 install pyathena
RUN pip3 install awswrangler
RUN pip3 install pydeequ
RUN pip3 install ipython-sql
RUN pip3 install pg8000
CMD [ "./start.sh" ]
To install Netcat in the Docker container:
docker exec -it --user root notebook bash
# run the following to install Netcat
yum install netcat
Now log in as the Glue user:
docker exec -it notebook bash
# start Netcat listening on port 9999
nc -lk 9999
Create a Scala Jupyter notebook in the Glue container.
First, create the Spark session in the notebook:
import org.apache.spark.sql.SparkSession

// Local Spark session with 3 cores; shuffle partitions reduced for a small local run
val spark = SparkSession.builder().
  master("local[3]").
  appName("Streaming Test").
  config("spark.streaming.stopGracefullyOnShutdown", "true").
  config("spark.sql.shuffle.partitions", 3).
  getOrCreate()
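As an optional sanity check, the next cell can confirm that the settings took effect; the printed values simply echo the configuration above, and the Spark UI is reachable on port 4040, which the docker-compose file already maps.

// Optional sanity check of the session configuration
println(spark.version)
println(spark.sparkContext.master)                        // local[3]
println(spark.conf.get("spark.sql.shuffle.partitions"))   // 3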
Create the stream block:
import org.apache.spark.sql.functions._

// Read lines from the Netcat socket as an unbounded DataFrame
val lineDF = spark.readStream.
  format("socket").
  option("host", "localhost").
  option("port", "9999").
  load()

// Write each micro-batch as text files; "path" is treated as an output directory
val outDF = lineDF.
  writeStream.
  format("text").
  option("path", "/home/glue_user/workspace/jupyter_workspace/test/mytest.txt").
  outputMode("append").
  option("checkpointLocation", "chk-point-dir").
  queryName("out_query").
  start()

// Block the cell until the query is stopped
outDF.awaitTermination()
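As a side note, if you only want to inspect the incoming lines without writing files, the file sink above can be swapped for the built-in console sink. A minimal sketch reusing the same lineDF; note that the console output goes to the driver's stdout, so it may appear in the container log rather than in the notebook cell:

// Alternative sink for debugging: print each micro-batch instead of writing files
val debugQuery = lineDF.
  writeStream.
  format("console").
  outputMode("append").
  queryName("debug_query").
  start()

debugQuery.awaitTermination()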
With the file sink, when you type text into the Netcat session, Spark saves it as text files under the directory given in the path option.
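Once the query has been stopped (see below), you can read the result back with a plain batch read; a minimal sketch, assuming a few micro-batches have already been written:

// Read the streamed output back as a regular (batch) DataFrame
val written = spark.read.text("/home/glue_user/workspace/jupyter_workspace/test/mytest.txt")
written.show(false)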
Because awaitTermination() blocks the cell, you have to interrupt the notebook kernel to stop the query forcefully.
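Instead of interrupting the kernel, a streaming query can also be stopped programmatically. A minimal sketch, assuming the awaitTermination() call was skipped or its cell already interrupted so the kernel is free to run a new cell:

// List the active streaming queries on this session
spark.streams.active.foreach(q => println(s"${q.name}: ${q.status}"))

// Stop the query started above through its handle
outDF.stop()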