11/**
2- * The MIT License (MIT)
3- * Copyright (c) 2016 Microsoft Corporation
4- *
5- * Permission is hereby granted, free of charge, to any person obtaining a copy
6- * of this software and associated documentation files (the "Software"), to deal
7- * in the Software without restriction, including without limitation the rights
8- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9- * copies of the Software, and to permit persons to whom the Software is
10- * furnished to do so, subject to the following conditions:
11- *
12- * The above copyright notice and this permission notice shall be included in all
13- * copies or substantial portions of the Software.
14- *
15- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19- * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20- * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21- * SOFTWARE.
22- */
2+ * The MIT License (MIT)
3+ * Copyright (c) 2016 Microsoft Corporation
4+ *
5+ * Permission is hereby granted, free of charge, to any person obtaining a copy
6+ * of this software and associated documentation files (the "Software"), to deal
7+ * in the Software without restriction, including without limitation the rights
8+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9+ * copies of the Software, and to permit persons to whom the Software is
10+ * furnished to do so, subject to the following conditions:
11+ *
12+ * The above copyright notice and this permission notice shall be included in all
13+ * copies or substantial portions of the Software.
14+ *
15+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21+ * SOFTWARE.
22+ */
2323package org .apache .spark .sql .cosmosdb .util
2424
2525import java .time .temporal .ChronoUnit
@@ -31,8 +31,7 @@ import com.microsoft.azure.cosmosdb.{Document, RequestOptions, ResourceResponse}
3131import com .microsoft .azure .cosmosdb .spark .config .{Config , CosmosDBConfig }
3232import com .microsoft .azure .cosmosdb .spark .schema .CosmosDBRowConverter
3333import com .microsoft .azure .cosmosdb .spark .streaming .CosmosDBWriteStreamRetryPolicy
34- import org .apache .spark .sql .catalyst .InternalRow
35- import org .apache .spark .sql .catalyst .encoders .RowEncoder
34+ import org .apache .spark .sql .catalyst .{CatalystTypeConverters , InternalRow }
3635import org .apache .spark .sql .catalyst .expressions .Attribute
3736import org .apache .spark .sql .types .StructType
3837import org .apache .spark .sql .{DataFrame , SQLContext }
@@ -44,21 +43,19 @@ object StreamingUtils extends Serializable {
4443
4544 def createDataFrameStreaming (df : DataFrame , schema : StructType , sqlContext : SQLContext ): DataFrame = {
4645
47- val enconder = RowEncoder .apply(schema)
48- val mappedRdd = df.rdd.map(row => {
49- enconder.toRow(row)
50- })
46+ val convert = CatalystTypeConverters .createToCatalystConverter(schema)
47+ val mappedRdd = df.rdd.map(convert(_).asInstanceOf [InternalRow ])
5148 sqlContext.internalCreateDataFrame(mappedRdd, schema, isStreaming = true )
5249 }
5350}
5451
5552class StreamingWriteTask extends Serializable with CosmosDBLoggingTrait {
5653
5754 def importStreamingData [D : ClassTag ](
58- iter : Iterator [D ],
59- schemaOutput : Seq [Attribute ],
60- config : Config ,
61- retryPolicy : CosmosDBWriteStreamRetryPolicy ): Unit = {
55+ iter : Iterator [D ],
56+ schemaOutput : Seq [Attribute ],
57+ config : Config ,
58+ retryPolicy : CosmosDBWriteStreamRetryPolicy ): Unit = {
6259
6360 val upsert : Boolean = config
6461 .getOrElse(CosmosDBConfig .Upsert , String .valueOf(CosmosDBConfig .DefaultUpsert ))
@@ -89,4 +86,4 @@ class StreamingWriteTask extends Serializable with CosmosDBLoggingTrait {
8986 val latency = Math .abs(ChronoUnit .MILLIS .between(LocalDateTime .now(), startTime))
9087 logInfo(s " Batch of $count records written with latency $latency milliseconds " )
9188 }
92- }
89+ }
0 commit comments