Scala - AWS EMR Serverless
AWS EMR Serverless is a cost-effective AWS service to which you can submit Spark Scala jobs.
Scala Job
The build.sbt:
name := "Deequ-Project"
version := "1.0"
scalaVersion := "2.12.10"

libraryDependencies ++= Seq(
  ("org.apache.spark" %% "spark-sql" % "3.2.0" % "provided")
)

libraryDependencies ++= Seq(
  ("com.amazon.deequ" % "deequ" % "2.0.1-spark-3.2")
    .exclude("commons-logging", "commons-logging")
    .exclude("org.apache.spark", "spark-kvstore_2.12")
    .exclude("org.apache.spark", "spark-core_2.12")
    .exclude("org.apache.spark", "spark-launcher_2.12")
    .exclude("org.apache.spark", "spark-network-common_2.12")
    .exclude("org.apache.spark", "spark-sql_2.12")
    .exclude("org.spark-project.spark", "unused")
    .exclude("org.apache.spark", "spark-unsafe_2.12")
)

libraryDependencies += "software.amazon.awssdk" % "s3" % "2.18.27"
libraryDependencies += "software.amazon.awssdk" % "aws-crt-client" % "2.18.28-PREVIEW"

excludeDependencies ++= Seq(
  ExclusionRule("software.amazon.awssdk", "netty-nio-client")
)
In the following Scala file, you use Deequ to generate constraint suggestions for the data set:
package com.github.ojitha.emr
import org.apache.spark.sql.SparkSession
import com.amazon.deequ.suggestions.{ConstraintSuggestionRunner, Rules}
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.s3.S3Client
import software.amazon.awssdk.services.s3.model.PutObjectRequest
import software.amazon.awssdk.http.apache.ApacheHttpClient
import software.amazon.awssdk.core.sync.RequestBody
import java.io.{File, PrintWriter}
import java.nio.file.{Files, Paths}
import java.util.HashMap
object DeequJob {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("EMR Serverless Application").getOrCreate()

    // create the S3 client
    val region = Region.AP_SOUTHEAST_2
    val apacheHttpClient = ApacheHttpClient.builder()
      .maxConnections(100)
      .tcpKeepAlive(true)
      .build()
    val s3 = S3Client.builder()
      .region(region)
      .httpClient(apacheHttpClient)
      .build()

    // read the source data set
    val df = spark.read.load("s3a://<source data location in the s3 bucket>/")

    // run Deequ constraint suggestions with the default rule set
    val suggestionResult = ConstraintSuggestionRunner()
      .onData(df)
      .addConstraintRules(Rules.DEFAULT)
      .run()

    // PrintWriter creates a local file
    val localFile = "local.txt"
    val pw = new PrintWriter(new File(localFile)) // open the local file
    suggestionResult.constraintSuggestions.foreach { case (column, suggestions) =>
      suggestions.foreach { suggestion =>
        pw.write(s"Constraint suggestion for '$column':\t${suggestion.description}\n" +
          s"The corresponding scala code is ${suggestion.codeForConstraint}\n")
      }
    }
    pw.close() // close the local file

    // load the local file into a byte array
    val byteArray = Files.readAllBytes(Paths.get(localFile))

    // upload to S3
    val metadata = new HashMap[String, String]()
    metadata.put("x-amz-meta-val", "suggestions")
    val putOb = PutObjectRequest.builder()
      .bucket("<output bucket location in S3>")
      .key("output.txt")
      .metadata(metadata)
      .build()
    s3.putObject(putOb, RequestBody.fromBytes(byteArray))
    s3.close() // close the S3 client

    // stop Spark
    spark.stop()
  }
}
You have to create the assembly (fat jar) using the following command:
sbt clean assembly
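Note that the assembly task comes from the sbt-assembly plugin rather than sbt itself. A minimal project/plugins.sbt, assuming a recent 1.x release of the plugin:

// project/plugins.sbt: enables the `assembly` task (plugin version is an assumption)
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "1.2.0")

Depending on your dependencies, you may also need an assemblyMergeStrategy setting to resolve duplicate META-INF entries in the fat jar.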
After that, copy the jar to an S3 bucket so that the EMR Serverless application can access it:
aws s3 cp ./target/scala-2.12/Deequ-Project-assembly-1.0.jar s3://<S3-scala-job-bucket>/emr/jobs/spark/
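You can confirm the upload with:

aws s3 ls s3://<S3-scala-job-bucket>/emr/jobs/spark/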
Now your job is ready to submit!
Create the EMR Serverless runtime role:
aws iam create-role --role-name EMRServerlessS3RuntimeRole --assume-role-policy-document file://emr-serverless-trust-policy.json
The trust policy (emr-serverless-trust-policy.json) is:
{
    "Version": "2012-10-17",
    "Statement": [{
        "Sid": "EMRServerlessTrustPolicy",
        "Action": "sts:AssumeRole",
        "Effect": "Allow",
        "Principal": {
            "Service": "emr-serverless.amazonaws.com"
        }
    }]
}
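This trust policy only allows EMR Serverless to assume the role; the job itself also needs permissions to read the source data and write the output. You can attach an inline S3 policy to the runtime role, for example (the policy name and file name here are assumptions):

aws iam put-role-policy --role-name EMRServerlessS3RuntimeRole --policy-name EMRServerlessS3Access --policy-document file://emr-serverless-s3-access-policy.json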
Next, you have to create the EMR Serverless application and submit the job.
EMR Serverless Application
I’ve created a Boto3-based Python 3 script to create the EMR Serverless application. The above Scala job can be submitted to this EMR Serverless application.
Create the AWS client for the EMR Serverless service:
import boto3
client = boto3.client("emr-serverless")
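By default, Boto3 resolves the AWS Region from your environment or configuration files; you can also pass it explicitly, for example (the region shown is an assumption, matching the Scala job above):

client = boto3.client("emr-serverless", region_name="ap-southeast-2")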
Create the EMR Serverless application:
response = client.create_application(
    name="table-analysis", releaseLabel="emr-6.6.0", type="SPARK"
)
print(
    "Created application {name} with application id {applicationId}. Arn: {arn}".format_map(
        response
    )
)
app_id = response["applicationId"]
Start the EMR Serverless application:
client.start_application(applicationId=app_id)
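start_application is asynchronous: the call returns while the application may still be in the STARTING state. A minimal polling sketch that waits for the STARTED state (the 10-second interval is an arbitrary choice):

import time

# poll until the application reaches the STARTED state
while client.get_application(applicationId=app_id)["application"]["state"] != "STARTED":
    time.sleep(10)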
Submit the job:
# Note that application must be in `STARTED` state.
response = client.start_job_run(
    applicationId=app_id,
    executionRoleArn="<arn of the EMRServerlessS3RuntimeRole role>",
    jobDriver={
        "sparkSubmit": {
            "entryPoint": "s3://<S3-scala-job-bucket>/emr/jobs/spark/Deequ-Project-assembly-1.0.jar",
            "entryPointArguments": [],
            "sparkSubmitParameters": "--class com.github.ojitha.emr.DeequJob --conf spark.executor.cores=1 --conf spark.executor.memory=4g --conf spark.driver.cores=1 --conf spark.driver.memory=4g --conf spark.executor.instances=1",
        }
    },
    configurationOverrides={
        "monitoringConfiguration": {
            "s3MonitoringConfiguration": {"logUri": "s3://<S3 bucket for log data>/test/logs"}
        }
    },
)
job_id = response["jobRunId"]
You can submit any number of jobs to the same application.
You can check the status of the current job:
# Get the status of the job
client.get_job_run(applicationId=app_id, jobRunId=job_id)
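get_job_run returns the current state under the jobRun key. If you want to block until the run finishes, a minimal polling sketch (the terminal job-run states in the EMR Serverless API are SUCCESS, FAILED, and CANCELLED):

import time

# poll until the job run reaches a terminal state
state = client.get_job_run(applicationId=app_id, jobRunId=job_id)["jobRun"]["state"]
while state not in ("SUCCESS", "FAILED", "CANCELLED"):
    time.sleep(10)
    state = client.get_job_run(applicationId=app_id, jobRunId=job_id)["jobRun"]["state"]
print("Job finished with state:", state)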
Stop the application:
# stop
client.stop_application(applicationId=app_id)
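stop_application is also asynchronous, and an application must be fully stopped before it can be deleted. A sketch that waits for the STOPPED state:

import time

# wait until the application is fully stopped before deleting it
while client.get_application(applicationId=app_id)["application"]["state"] != "STOPPED":
    time.sleep(10)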
Delete the application:
client.delete_application(applicationId=app_id)
When you delete the application, all the job runs created for that application are deleted as well.