From b91af98d9eab98106e9c298a03bb32d0332bcb04 Mon Sep 17 00:00:00 2001
From: revinjchalil
Date: Thu, 22 Oct 2020 11:58:53 -0700
Subject: [PATCH] Add support for Spark 3.0 and Scala 2.12

---
 CHANGELOG.md                                            |  3 +++
 pom.xml                                                 |  6 +++---
 .../com/microsoft/azure/cosmosdb/spark/Constants.scala  |  2 +-
 .../azure/cosmosdb/spark/rdd/CosmosDBRDD.scala          |  9 ++++++---
 .../azure/cosmosdb/spark/rdd/CosmosDBRDDIterator.scala  | 10 +++++++---
 .../azure/cosmosdb/spark/util/StreamingUtils.scala      |  9 +++------
 6 files changed, 23 insertions(+), 16 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 86c61a4c..301541e9 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,6 @@
+### 3.5.0
+- Adds support for Spark 3.0 and Scala 2.12
+
 ### 3.3.4
 - Fixes an issue in Streaming preventing docs with MapType to be ingested into Cosmos DB
 
diff --git a/pom.xml b/pom.xml
index 54008ddf..81bf5d29 100644
--- a/pom.xml
+++ b/pom.xml
@@ -32,11 +32,11 @@ limitations under the License.
-    <scala.version>2.11.12</scala.version>
-    <scala.binary.version>2.11</scala.binary.version>
+    <scala.version>2.12.12</scala.version>
+    <scala.binary.version>2.12</scala.binary.version>
     <artifact.name>azure-cosmosdb-spark</artifact.name>
     <cosmosdb.version>3.1.1</cosmosdb.version>
-    <spark.version>2.4.4</spark.version>
+    <spark.version>3.0.0</spark.version>
     <slf4j.version>1.7.30</slf4j.version>
     <log4j.version>1.2.17</log4j.version>
     <scalatest.version>3.2.5</scalatest.version>
diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/Constants.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/Constants.scala
index 183e8c1d..676f1e87 100644
--- a/src/main/scala/com/microsoft/azure/cosmosdb/spark/Constants.scala
+++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/Constants.scala
@@ -23,6 +23,6 @@ package com.microsoft.azure.cosmosdb.spark
 
 object Constants {
-  val currentVersion = "2.4.0_2.11-3.3.4"
+  val currentVersion = "3.0.0_2.12-3.5.0"
   val userAgentSuffix = s" SparkConnector/$currentVersion"
 }
\ No newline at end of file
diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/rdd/CosmosDBRDD.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/rdd/CosmosDBRDD.scala
index 63f0db55..0cc129ad 100644
--- a/src/main/scala/com/microsoft/azure/cosmosdb/spark/rdd/CosmosDBRDD.scala
+++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/rdd/CosmosDBRDD.scala
@@ -31,6 +31,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
+import org.apache.spark.util.TaskCompletionListener
 import org.apache.spark.{Partition, TaskContext}
 
 import scala.collection.mutable
@@ -114,9 +115,11 @@ class CosmosDBRDD(
       case cosmosDBPartition: CosmosDBPartition =>
         logInfo(s"CosmosDBRDD:compute: Start CosmosDBRDD compute task for partition key range id ${cosmosDBPartition.partitionKeyRangeId}")
 
-        context.addTaskCompletionListener((ctx: TaskContext) => {
-          logInfo(s"CosmosDBRDD:compute: CosmosDBRDD compute task completed for partition key range id ${cosmosDBPartition.partitionKeyRangeId}")
-        })
+        val taskCompletionListener: TaskCompletionListener = new TaskCompletionListener() {
+          override def onTaskCompletion(context: TaskContext): Unit =
+            logInfo(s"CosmosDBRDD:compute: CosmosDBRDD compute task completed for partition key range id ${cosmosDBPartition.partitionKeyRangeId}")
+        }
+        context.addTaskCompletionListener(taskCompletionListener)
 
         new CosmosDBRDDIterator(
           hadoopConfig,
diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/rdd/CosmosDBRDDIterator.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/rdd/CosmosDBRDDIterator.scala
index 69ee651a..1cb2981c 100644
--- a/src/main/scala/com/microsoft/azure/cosmosdb/spark/rdd/CosmosDBRDDIterator.scala
+++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/rdd/CosmosDBRDDIterator.scala
@@ -41,6 +41,7 @@ import com.microsoft.azure.documentdb.internal.HttpConstants.SubStatusCodes
 import org.apache.commons.lang3.StringUtils
 import org.apache.spark._
 import org.apache.spark.sql.sources.Filter
+import org.apache.spark.util.TaskCompletionListener
 
 import scala.collection.mutable
 import org.joda.time.DateTimeZone
@@ -437,9 +438,12 @@ class CosmosDBRDDIterator(hadoopConfig: mutable.Map[String, String],
       })
 
     // Register an on-task-completion callback to close the input stream.
-    taskContext.addTaskCompletionListener((_: TaskContext) => {
-      closeIfNeeded()
-    })
+    val taskCompletionListener = new TaskCompletionListener() {
+      override def onTaskCompletion(taskContext: TaskContext): Unit = {
+        closeIfNeeded()
+      }
+    }
+    taskContext.addTaskCompletionListener(taskCompletionListener)
 
     if (!readingChangeFeed) {
       queryDocuments
diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/util/StreamingUtils.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/util/StreamingUtils.scala
index b81b752f..78821ea6 100644
--- a/src/main/scala/com/microsoft/azure/cosmosdb/spark/util/StreamingUtils.scala
+++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/util/StreamingUtils.scala
@@ -31,8 +31,7 @@ import com.microsoft.azure.cosmosdb.{Document, RequestOptions, ResourceResponse}
 import com.microsoft.azure.cosmosdb.spark.config.{Config, CosmosDBConfig}
 import com.microsoft.azure.cosmosdb.spark.schema.CosmosDBRowConverter
 import com.microsoft.azure.cosmosdb.spark.streaming.CosmosDBWriteStreamRetryPolicy
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.{DataFrame, SQLContext}
@@ -44,10 +43,8 @@ object StreamingUtils extends Serializable {
 
   def createDataFrameStreaming(df: DataFrame, schema: StructType, sqlContext: SQLContext): DataFrame = {
-    val enconder = RowEncoder.apply(schema)
-    val mappedRdd = df.rdd.map(row => {
-      enconder.toRow(row)
-    })
+    val convert = CatalystTypeConverters.createToCatalystConverter(schema)
+    val mappedRdd = df.rdd.map(convert(_).asInstanceOf[InternalRow])
     sqlContext.internalCreateDataFrame(mappedRdd, schema, isStreaming = true)
   }
 }
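
Note on the addTaskCompletionListener hunks above: under Scala 2.12, passing a plain lambda to TaskContext.addTaskCompletionListener can fail to compile, because SAM conversion makes the call ambiguous between the TaskCompletionListener overload and the (TaskContext) => U overload; that is presumably why the patch switches to an explicit anonymous class. A minimal sketch of the pattern (the registerCompletionLogger helper is illustrative, not part of the patch):

    import org.apache.spark.TaskContext
    import org.apache.spark.util.TaskCompletionListener

    object ListenerExample {
      // Registers a listener that logs when the task finishes. Using an
      // explicit TaskCompletionListener instead of a lambda selects the
      // non-generic overload unambiguously under Scala 2.12.
      def registerCompletionLogger(context: TaskContext): Unit = {
        val listener = new TaskCompletionListener {
          override def onTaskCompletion(ctx: TaskContext): Unit =
            println(s"task for partition ${ctx.partitionId()} completed")
        }
        context.addTaskCompletionListener(listener)
      }
    }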
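
Note on the StreamingUtils hunk: Spark 3.0 dropped the toRow method from the encoder returned by RowEncoder.apply, so the patch instead converts external Rows to InternalRows with CatalystTypeConverters, which the patched code itself imports. A minimal standalone sketch of that conversion, with Spark's catalyst module on the classpath (the schema and data here are illustrative):

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

    object ConverterExample extends App {
      val schema = StructType(Seq(
        StructField("id", IntegerType),
        StructField("name", StringType)))

      // createToCatalystConverter builds a reusable converter; for a
      // StructType it maps an external Row to a Catalyst InternalRow.
      val toCatalyst = CatalystTypeConverters.createToCatalystConverter(schema)
      val internal = toCatalyst(Row(1, "cosmos")).asInstanceOf[InternalRow]
      println(internal.numFields) // prints 2
    }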