diff --git a/pom.xml b/pom.xml
index c1bd7a9e..be8abd5d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -19,7 +19,7 @@ limitations under the License.
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
4.0.0
com.microsoft.azure
- azure-cosmosdb-spark_2.4.0_2.11
+ azure-cosmosdb-spark_2.4.0_2.12
jar
3.0.6
${project.groupId}:${project.artifactId}
@@ -32,8 +32,8 @@ limitations under the License.
- 2.11.12
- 2.11
+ 2.12.11
+ 2.12
azure-cosmosdb-spark
3.1.1
2.4.4
diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/rdd/CosmosDBRDD.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/rdd/CosmosDBRDD.scala
index 63f0db55..f58a033b 100644
--- a/src/main/scala/com/microsoft/azure/cosmosdb/spark/rdd/CosmosDBRDD.scala
+++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/rdd/CosmosDBRDD.scala
@@ -31,6 +31,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
+import org.apache.spark.util.TaskCompletionListener
import org.apache.spark.{Partition, TaskContext}
import scala.collection.mutable
@@ -114,9 +115,11 @@ class CosmosDBRDD(
case cosmosDBPartition: CosmosDBPartition =>
logInfo(s"CosmosDBRDD:compute: Start CosmosDBRDD compute task for partition key range id ${cosmosDBPartition.partitionKeyRangeId}")
- context.addTaskCompletionListener((ctx: TaskContext) => {
- logInfo(s"CosmosDBRDD:compute: CosmosDBRDD compute task completed for partition key range id ${cosmosDBPartition.partitionKeyRangeId}")
- })
+ val completionListener: TaskCompletionListener = new TaskCompletionListener() {
+ override def onTaskCompletion(context: TaskContext): Unit =
+ logInfo(s"CosmosDBRDD:compute: CosmosDBRDD compute task completed for partition key range id ${cosmosDBPartition.partitionKeyRangeId}")
+ }
+ context.addTaskCompletionListener(completionListener)
new CosmosDBRDDIterator(
hadoopConfig,
diff --git a/src/main/scala/com/microsoft/azure/cosmosdb/spark/rdd/CosmosDBRDDIterator.scala b/src/main/scala/com/microsoft/azure/cosmosdb/spark/rdd/CosmosDBRDDIterator.scala
index 2edb3ab3..bdd05e84 100644
--- a/src/main/scala/com/microsoft/azure/cosmosdb/spark/rdd/CosmosDBRDDIterator.scala
+++ b/src/main/scala/com/microsoft/azure/cosmosdb/spark/rdd/CosmosDBRDDIterator.scala
@@ -41,6 +41,7 @@ import com.microsoft.azure.documentdb.internal.HttpConstants.SubStatusCodes
import org.apache.commons.lang3.StringUtils
import org.apache.spark._
import org.apache.spark.sql.sources.Filter
+import org.apache.spark.util.TaskCompletionListener
import scala.collection.mutable
@@ -424,9 +425,13 @@ class CosmosDBRDDIterator(hadoopConfig: mutable.Map[String, String],
})
// Register an on-task-completion callback to close the input stream.
- taskContext.addTaskCompletionListener((_: TaskContext) => {
- closeIfNeeded()
- })
+ val taskCompletionListerner = new TaskCompletionListener() {
+ override def onTaskCompletion(context: TaskContext): Unit = {
+ closeIfNeeded()
+ }
+ }
+
+ taskContext.addTaskCompletionListener(taskCompletionListerner)
if (!readingChangeFeed) {
queryDocuments