diff --git a/pom.xml b/pom.xml index c1bd7a9e..be8abd5d 100644 --- a/pom.xml +++ b/pom.xml @@ -19,7 +19,7 @@ limitations under the License. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 4.0.0 com.microsoft.azure - azure-cosmosdb-spark_2.4.0_2.11 + azure-cosmosdb-spark_2.4.0_2.12 jar 3.0.6 ${project.groupId}:${project.artifactId} @@ -32,8 +32,8 @@ limitations under the License. - 2.11.12 - 2.11 + 2.12.11 + 2.12 azure-cosmosdb-spark 3.1.1 2.4.4 diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/rdd/CosmosDBRDD.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/rdd/CosmosDBRDD.scala index 63f0db55..f58a033b 100644 --- a/src/main/scala/com/microsoft/azure/cosmosdb/spark/rdd/CosmosDBRDD.scala +++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/rdd/CosmosDBRDD.scala @@ -31,6 +31,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.sources.Filter import org.apache.spark.sql.types.StructType import org.apache.spark.sql.{DataFrame, Dataset, SparkSession} +import org.apache.spark.util.TaskCompletionListener import org.apache.spark.{Partition, TaskContext} import scala.collection.mutable @@ -114,9 +115,11 @@ class CosmosDBRDD( case cosmosDBPartition: CosmosDBPartition => logInfo(s"CosmosDBRDD:compute: Start CosmosDBRDD compute task for partition key range id ${cosmosDBPartition.partitionKeyRangeId}") - context.addTaskCompletionListener((ctx: TaskContext) => { - logInfo(s"CosmosDBRDD:compute: CosmosDBRDD compute task completed for partition key range id ${cosmosDBPartition.partitionKeyRangeId}") - }) + val completionListener: TaskCompletionListener = new TaskCompletionListener() { + override def onTaskCompletion(context: TaskContext): Unit = + logInfo(s"CosmosDBRDD:compute: CosmosDBRDD compute task completed for partition key range id ${cosmosDBPartition.partitionKeyRangeId}") + } + context.addTaskCompletionListener(completionListener) new CosmosDBRDDIterator( hadoopConfig, diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/rdd/CosmosDBRDDIterator.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/rdd/CosmosDBRDDIterator.scala index 2edb3ab3..bdd05e84 100644 --- a/src/main/scala/com/microsoft/azure/cosmosdb/spark/rdd/CosmosDBRDDIterator.scala +++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/rdd/CosmosDBRDDIterator.scala @@ -41,6 +41,7 @@ import com.microsoft.azure.documentdb.internal.HttpConstants.SubStatusCodes import org.apache.commons.lang3.StringUtils import org.apache.spark._ import org.apache.spark.sql.sources.Filter +import org.apache.spark.util.TaskCompletionListener import scala.collection.mutable @@ -424,9 +425,13 @@ class CosmosDBRDDIterator(hadoopConfig: mutable.Map[String, String], }) // Register an on-task-completion callback to close the input stream. - taskContext.addTaskCompletionListener((_: TaskContext) => { - closeIfNeeded() - }) + val taskCompletionListerner = new TaskCompletionListener() { + override def onTaskCompletion(context: TaskContext): Unit = { + closeIfNeeded() + } + } + + taskContext.addTaskCompletionListener(taskCompletionListerner) if (!readingChangeFeed) { queryDocuments