Skip to content

Commit 7b0e7f9

Browse files
author
shenavaa
committed
1. Library updates to EMR release 6.3.0
2. Enabling Kinesis ServerSide encryption in the Cloudformation Template 3. Downloading Jar from github repository release
1 parent 3c8ce0b commit 7b0e7f9

File tree

6 files changed

+16
-19
lines changed

6 files changed

+16
-19
lines changed

CloudFormation/emrtemplate.json

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@
138138
"JobFlowRole": {
139139
"Ref": "EMRClusterinstanceProfile"
140140
},
141-
"ReleaseLabel": "emr-5.29.0",
141+
"ReleaseLabel": "emr-6.3.0",
142142
"LogUri": {
143143
"Ref": "LogUri"
144144
},
@@ -550,7 +550,8 @@
550550
"Name": {
551551
"Ref": "StreamName"
552552
},
553-
"ShardCount": 3
553+
"ShardCount": 3,
554+
"StreamEncryption":{ "EncryptionType":"KMS", "KeyId":"alias/aws/kinesis"}
554555
}
555556
},
556557
"EMRClusterServiceRole": {
@@ -715,7 +716,7 @@
715716
" try: \n",
716717
" obj = S3Url(WritableLocation) \n",
717718
" s3 = boto3.resource('s3', config=Config(signature_version='s3v4')) \n",
718-
" step_base64_file='IyEvYmluL2Jhc2gKQVBQTkFNRT0kMQpTVFJFQU1OQU1FPSQyClJFR0lPTj1gY3VybCAtLXNpbGVudCBodHRwOi8vMTY5LjI1NC4xNjkuMjU0L2xhdGVzdC9keW5hbWljL2luc3RhbmNlLWlkZW50aXR5L2RvY3VtZW50IHwganEgLXIgLnJlZ2lvbmAKSVNNQVNURVI9YGNhdCAvZW1yL2luc3RhbmNlLWNvbnRyb2xsZXIvbGliL2luZm8vaW5zdGFuY2UuanNvbiB8IGpxIC5pc01hc3RlcmAKaWYgWyAiJElTTUFTVEVSIj09InRydWUiIF07IHRoZW4gCiBzdWRvIHl1bSAteSBpbnN0YWxsIGdpdAogd2dldCAiaHR0cHM6Ly9naXRodWIuY29tL3NidC9zYnQvcmVsZWFzZXMvZG93bmxvYWQvdjEuMy44L3NidC0xLjMuOC56aXAiCiBDUD0iL2hvbWUvaGFkb29wIgogU05IPSIkQ1Avc3RyZWFtaW5nLWV4YW1wbGUvIgogdGFyIC14emYgLi9zYnQtMS4zLjgudGd6IC1DICRDUAogZXhwb3J0IFBBVEg9JFBBVEg6JENQL3NidC9iaW4KIEJBU0U9IiRDUC9hd3MtY2xvdWR3YXRjaC1tZXRyaWNzLWN1c3RvbS1zcGFyay1saXN0ZW5lciIKIGdpdCBjbG9uZSAiaHR0cHM6Ly9naXRodWIuY29tL2F3c2xhYnMvYXdzLWNsb3Vkd2F0Y2gtbWV0cmljcy1jdXN0b20tc3BhcmstbGlzdGVuZXIuZ2l0IiAkQkFTRQogbWtkaXIgJFNOSAogQkFTRT0iJENQL2F3cy1jbG91ZHdhdGNoLW1ldHJpY3MtY3VzdG9tLXNwYXJrLWxpc3RlbmVyIgogQ0ZMT0M9IiRCQVNFL0Nsb3VkRm9ybWF0aW9uIgogKCBjZCAkQkFTRSAmJiBzYnQgcGFja2FnZSApIAogY3AgJEJBU0UvdGFyZ2V0L3NjYWxhLTIuMTEvKi5qYXIgJFNOSCAKIGNwICRCQVNFL0Nsb3VkRm9ybWF0aW9uLyouc2ggJFNOSAogY2htb2QgNzU1ICRTTkgvKi5zaApmaQo=' \n",
719+
" step_base64_file='IyEvYmluL2Jhc2gKQVBQTkFNRT0kMQpTVFJFQU1OQU1FPSQyClJFR0lPTj1gY3VybCAtLXNpbGVudCBodHRwOi8vMTY5LjI1NC4xNjkuMjU0L2xhdGVzdC9keW5hbWljL2luc3RhbmNlLWlkZW50aXR5L2RvY3VtZW50IHwganEgLXIgLnJlZ2lvbmAKSVNNQVNURVI9YGNhdCAvZW1yL2luc3RhbmNlLWNvbnRyb2xsZXIvbGliL2luZm8vaW5zdGFuY2UuanNvbiB8IGpxIC5pc01hc3RlcmAKaWYgWyAiJElTTUFTVEVSIj09InRydWUiIF07IHRoZW4gCiBzdWRvIHl1bSAteSBpbnN0YWxsIGdpdAogQ1A9Ii9ob21lL2hhZG9vcCIKIFNOSD0iJENQL3N0cmVhbWluZy1leGFtcGxlLyIKIEJBU0U9IiRDUC9hd3MtY2xvdWR3YXRjaC1tZXRyaWNzLWN1c3RvbS1zcGFyay1saXN0ZW5lciIKIGdpdCBjbG9uZSAiaHR0cHM6Ly9naXRodWIuY29tL2F3c2xhYnMvYXdzLWNsb3Vkd2F0Y2gtbWV0cmljcy1jdXN0b20tc3BhcmstbGlzdGVuZXIuZ2l0IiAkQkFTRQogbWtkaXIgJFNOSAogQkFTRT0iJENQL2F3cy1jbG91ZHdhdGNoLW1ldHJpY3MtY3VzdG9tLXNwYXJrLWxpc3RlbmVyIgogQ0ZMT0M9IiRCQVNFL0Nsb3VkRm9ybWF0aW9uIgogd2dldCAiaHR0cHM6Ly9naXRodWIuY29tL2F3c2xhYnMvYXdzLWNsb3Vkd2F0Y2gtbWV0cmljcy1jdXN0b20tc3BhcmstbGlzdGVuZXIvcmVsZWFzZXMvZG93bmxvYWQvYmV0YS0wLjEvc3BhcmtraW5lc2lzZXhhbXBsZV8yLjExLTEuMC5qYXIiIC1PICRTTkgvc3BhcmtraW5lc2lzZXhhbXBsZV8yLjExLTEuMC5qYXIKIGNwICRCQVNFL0Nsb3VkRm9ybWF0aW9uLyouc2ggJFNOSAogY2htb2QgNzU1ICRTTkgvKi5zaApmaQo=' \n",
719720
" step_file=base64.b64decode(step_base64_file) \n",
720721
" object = s3.Object(obj.bucket(), obj.key() + 'setupStep.sh') \n",
721722
" object.put( Body=step_file) \n",

CloudFormation/setupStep.sh

100644100755
Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,18 +5,14 @@ REGION=`curl --silent http://169.254.169.254/latest/dynamic/instance-identity/do
55
ISMASTER=`cat /emr/instance-controller/lib/info/instance.json | jq .isMaster`
66
if [ "$ISMASTER"=="true" ]; then
77
sudo yum -y install git
8-
wget "https://github.com/sbt/sbt/releases/download/v1.3.8/sbt-1.3.8.zip"
98
CP="/home/hadoop"
109
SNH="$CP/streaming-example/"
11-
tar -xzf ./sbt-1.3.8.tgz -C $CP
12-
export PATH=$PATH:$CP/sbt/bin
1310
BASE="$CP/aws-cloudwatch-metrics-custom-spark-listener"
1411
git clone "https://github.com/awslabs/aws-cloudwatch-metrics-custom-spark-listener.git" $BASE
1512
mkdir $SNH
1613
BASE="$CP/aws-cloudwatch-metrics-custom-spark-listener"
1714
CFLOC="$BASE/CloudFormation"
18-
( cd $BASE && sbt package )
19-
cp $BASE/target/scala-2.11/*.jar $SNH
15+
wget "https://github.com/awslabs/aws-cloudwatch-metrics-custom-spark-listener/releases/download/beta-0.1/sparkkinesisexample_2.11-1.0.jar" -O $SNH/sparkkinesisexample_2.11-1.0.jar
2016
cp $BASE/CloudFormation/*.sh $SNH
2117
chmod 755 $SNH/*.sh
2218
fi

CloudFormation/streamingStep.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,5 +4,5 @@ APPNAME=$1
44
STREAMNAME=$2
55

66
REGION=`curl --silent http://169.254.169.254/latest/dynamic/instance-identity/document | jq -r .region`
7-
spark-submit --packages org.apache.spark:spark-streaming-kinesis-asl_2.11:2.4.4 --class com.amazonaws.awslabs.sparkstreaming.SparkKinesisExample /home/hadoop/streaming-example/sparkkinesisexample_2.11-1.0.jar $APPNAME $STREAMNAME $REGION
7+
spark-submit --packages org.apache.spark:spark-streaming-kinesis-asl_2.12:3.1.1 --class com.amazonaws.awslabs.sparkstreaming.SparkKinesisExample /home/hadoop/streaming-example/sparkkinesisexample_2.11-1.0.jar $APPNAME $STREAMNAME $REGION
88

build.sbt

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,11 @@
22
// SPDX-License-Identifier: MIT-0
33

44
name := "SparkKinesisExample"
5-
version := "1.0"
6-
scalaVersion := "2.11.12"
7-
libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % "2.4.4"
8-
libraryDependencies += "com.amazonaws" % "aws-java-sdk" % "1.11.659"
9-
libraryDependencies += "org.apache.spark" %% "spark-streaming-kinesis-asl" % "2.4.4"
10-
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.4"
5+
version := "1.1"
6+
scalaVersion := "2.12.10"
7+
libraryDependencies += "org.apache.spark" % "spark-streaming_2.12" % "3.1.1"
8+
libraryDependencies += "com.amazonaws" % "aws-java-sdk" % "1.11.977"
9+
libraryDependencies += "org.apache.spark" %% "spark-streaming-kinesis-asl" % "3.1.1"
10+
libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.1.1"
1111
libraryDependencies += "log4j" % "log4j" % "1.2.17"
1212
libraryDependencies += "software.amazon.kinesis" % "amazon-kinesis-client" % "2.2.10"

src/main/scala/org/apache/spark/streaming/kinesis/CustomKinesisInputDStream.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ private[kinesis] class CustomKinesisInputDStream[T: ClassTag](
3232
val mLevel: MetricsLevel,
3333
val mEnabledDimensions: Set[String],
3434
val preferredLocation : Option[String]
35-
) extends KinesisInputDStream[T](_ssc,sName,eUrl,rName,iPosition,cAppName,cInterval,sLevel,mHandler,kCreds,dDBCreds,cWatchCreds) {
35+
) extends KinesisInputDStream[T](_ssc,sName,eUrl,rName,iPosition,cAppName,cInterval,sLevel,mHandler,kCreds,dDBCreds,cWatchCreds,mLevel,mEnabledDimensions) {
3636

3737
override def getReceiver(): Receiver[T] = {
3838
new CustomKinesisReceiver(sName, eUrl, rName, iPosition,
@@ -128,12 +128,12 @@ object CustomKinesisInputDStream {
128128
this
129129
}
130130

131-
def metricsLevel(metricsLevel: MetricsLevel): Builder = {
131+
override def metricsLevel(metricsLevel: MetricsLevel): Builder = {
132132
this.metricsLevel = Option(metricsLevel)
133133
this
134134
}
135135

136-
def metricsEnabledDimensions(metricsEnabledDimensions: Set[String]): Builder = {
136+
override def metricsEnabledDimensions(metricsEnabledDimensions: Set[String]): Builder = {
137137
this.metricsEnabledDimensions = Option(metricsEnabledDimensions)
138138
this
139139
}

src/main/scala/org/apache/spark/streaming/kinesis/CustomKinesisReceiver.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ private[kinesis] class CustomKinesisReceiver[T](
3535
mLevel: MetricsLevel,
3636
mEnabledDimensions: Set[String],
3737
pLocation: Option[String])
38-
extends KinesisReceiver[T](sName,eUrl,rName,iPosition,cAppName,cInterval,sLevel,mHandler,kCreds,dDBCreds,cWatchCreds) {
38+
extends KinesisReceiver[T](sName,eUrl,rName,iPosition,cAppName,cInterval,sLevel,mHandler,kCreds,dDBCreds,cWatchCreds,mLevel,mEnabledDimensions) {
3939

4040
override def preferredLocation: Option[String] = Some(pLocation.getOrElse("NotMatchingName") )
4141
}

0 commit comments

Comments
 (0)