diff --git a/.idea/.gitignore b/.idea/.gitignore
new file mode 100644
index 0000000..26d3352
--- /dev/null
+++ b/.idea/.gitignore
@@ -0,0 +1,3 @@
+# Default ignored files
+/shelf/
+/workspace.xml
diff --git a/.idea/databricks-end-to-end-streaming.iml b/.idea/databricks-end-to-end-streaming.iml
new file mode 100644
index 0000000..d6ebd48
--- /dev/null
+++ b/.idea/databricks-end-to-end-streaming.iml
@@ -0,0 +1,9 @@
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/misc.xml b/.idea/misc.xml
new file mode 100644
index 0000000..639900d
--- /dev/null
+++ b/.idea/misc.xml
@@ -0,0 +1,6 @@
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/modules.xml b/.idea/modules.xml
new file mode 100644
index 0000000..786e2ce
--- /dev/null
+++ b/.idea/modules.xml
@@ -0,0 +1,8 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project version="4">
+  <component name="ProjectModuleManager">
+    <modules>
+      <module fileurl="file://$PROJECT_DIR$/.idea/databricks-end-to-end-streaming.iml" filepath="$PROJECT_DIR$/.idea/databricks-end-to-end-streaming.iml" />
+    </modules>
+  </component>
+</project>
\ No newline at end of file
diff --git a/.idea/vcs.xml b/.idea/vcs.xml
new file mode 100644
index 0000000..35eb1dd
--- /dev/null
+++ b/.idea/vcs.xml
@@ -0,0 +1,6 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project version="4">
+  <component name="VcsDirectoryMappings">
+    <mapping directory="$PROJECT_DIR$" vcs="Git" />
+  </component>
+</project>
\ No newline at end of file
diff --git a/notebooks/confluent/scala/ingest_raw_using_schema-subject.scala b/notebooks/confluent/scala/ingest_raw_using_schema-subject.scala
new file mode 100644
index 0000000..572e8d6
--- /dev/null
+++ b/notebooks/confluent/scala/ingest_raw_using_schema-subject.scala
@@ -0,0 +1,170 @@
+// Databricks notebook source
+// MAGIC %md
+// MAGIC ## Ingesting Data from Confluent Kafka Into Delta
+// MAGIC ### Step 1: Set up the environment
+
+// COMMAND ----------
+
+// MAGIC %md
+// MAGIC * You must have a Confluent cluster, an API key and secret for it, a Schema Registry, an API key and secret for the registry, and a topic
+// MAGIC * Download the kafka_schema_registry_client_5_3_1.jar from https://packages.confluent.io/maven/io/confluent/kafka-schema-registry-client/5.3.1/ and install it on the cluster. It must be this specific version and not a higher one, because Spark depends on an older version of the Avro serializer; a newer schema registry client will not return the schema from the registry correctly, and this error is thrown when you try to use the returned schema: 'java.lang.NoSuchFieldError: FACTORY'
+// MAGIC * Download the kafka-clients-2.6.0.jar from https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/2.6.0/ and install it on the cluster. This JAR is required because the schema registry client version used here depends on the org.apache.kafka.common.config.ConfigException class
+// MAGIC * Notebooks must be detached and re-attached before they can see newly installed libraries
+// MAGIC * For production use, download the two JARs ahead of time and store them in a DBFS location so that a cluster init script can install them when the cluster spins up (a sketch of this follows in the next cell)
+
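+// COMMAND ----------
+
+// MAGIC %md
+// MAGIC The next cell is a minimal sketch of that production setup. It assumes the two JARs have already been uploaded to a hypothetical `dbfs:/FileStore/jars/` location, and it writes a cluster init script that copies them into `/databricks/jars/` at startup; the cluster would then be configured to run the generated script. Adjust both paths for your workspace.
+
+// COMMAND ----------
+
+// Sketch only: generate an init script that installs the two JARs when the cluster starts.
+// The source paths under dbfs:/FileStore/jars/ are assumptions - change them to wherever the JARs were uploaded.
+dbutils.fs.put(
+  "dbfs:/databricks/scripts/install-confluent-jars.sh",
+  """#!/bin/bash
+    |cp /dbfs/FileStore/jars/kafka-schema-registry-client-5.3.1.jar /databricks/jars/
+    |cp /dbfs/FileStore/jars/kafka-clients-2.6.0.jar /databricks/jars/
+    |""".stripMargin,
+  true)
+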
+// COMMAND ----------
+
+val confluentClusterName = ""
+val confluentBootstrapServers = ""
+val confluentApiKey = dbutils.secrets.get(scope = "", key = "confluent-key")
+val confluentSecret = dbutils.secrets.get(scope = "", key = "confluent-secret")
+val confluentRegistryApiKey = dbutils.secrets.get(scope = "", key = "confluent-sr-key")
+val confluentRegistrySecret = dbutils.secrets.get(scope = "", key = "confluent-sr-secret")
+val confluentTopicName = ""
+val schemaRegistryUrl = ""
+val kafkaCheckPointPath = s""
+val tableCheckPointPath = s""
+
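+// These two pieces are combined with a schema ID later to call the registry's /schemas/ids/{id}/versions
+// endpoint, which returns JSON of the form [{"subject":"my-topic-value","version":1}] (illustrative values)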
+val customUrlBasePath = schemaRegistryUrl + "/schemas/ids/"
+val customVersions = "/versions"
+
+// COMMAND ----------
+
+// MAGIC %md
+// MAGIC ### Confluent Schema Registry
+// MAGIC
+
+// COMMAND ----------
+
+// Imports and client setup for authenticating to the Confluent Schema Registry
+import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient
+import io.confluent.kafka.schemaregistry.client.rest.RestService
+import scala.jdk.CollectionConverters._
+
+val schemaRegistryRestService = new RestService(schemaRegistryUrl)
+
+val schemaRegistryProperties = Map(
+ "basic.auth.credentials.source" -> "USER_INFO",
+ "schema.registry.basic.auth.user.info" -> "%s:%s".format(confluentRegistryApiKey, confluentRegistrySecret)
+).asJava
+
+val schemaRegistryClient = new CachedSchemaRegistryClient(schemaRegistryRestService, 100, schemaRegistryProperties)
+
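+// COMMAND ----------
+
+// Optional sanity check (a sketch, not part of the original flow): listing the subjects visible to this
+// client confirms the registry URL and basic-auth credentials before the stream is started.
+println(schemaRegistryClient.getAllSubjects().asScala.mkString(", "))
+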
+// COMMAND ----------
+
+// MAGIC %md
+// MAGIC ## Streaming the Data from Confluent Kafka
+
+// COMMAND ----------
+
+// MAGIC %md
+// MAGIC ### Set up the Readstream
+
+// COMMAND ----------
+
+import org.apache.spark.sql.functions.udf
+import java.nio.ByteBuffer
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.avro.functions._
+
+// UDF that decodes the 4-byte schema ID that follows the magic byte at the front of the Avro payload.
+// As of this writing the schema registry client provided by Confluent only supports int values for schema IDs. If there are so many
+// schemas in the schema registry that the schema ID value exceeds the max value of an int, the behavior of the client is unknown.
+
+val binaryToInt = udf((payload: Array[Byte]) => ByteBuffer.wrap(payload).getInt)
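+// Illustration with hypothetical bytes: a value that begins 0x00 0x00 0x00 0x00 0x2A carries magic byte 0x00 and
+// schema ID ByteBuffer.wrap(Array[Byte](0, 0, 0, 42)).getInt == 42; everything after those 5 header bytes is the Avro payload.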
+
+// Set up the readStream, including authentication to Confluent Cloud for the Kafka topic.
+// Note the specific kafka.sasl.jaas.config value - on Databricks you must use kafkashaded.org.apache.kafka.common... for that setting or it will not find the PlainLoginModule.
+// If the kafka-clients-2.6.0.jar is installed on the cluster then a value of org.apache.kafka.common... will work fine.
+// The code below pulls from only one topic, but it can be configured to pull from multiple topics with a comma-delimited set of topic names in the "subscribe" option.
+// It also starts from a specific offset in the topic. You can specify both starting and ending offsets. If neither is specified then "latest" is the default for streaming.
+// The full syntax for the "startingOffsets" and "endingOffsets" options is to specify an offset per topic per partition.
+// Examples:
+// .option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""") The -2 means "earliest" and -1 means "latest"
+// .option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""") The -1 means "latest", -2 not allowed for endingOffsets
+
+val confluent_df = spark
+ .readStream
+ .format("kafka")
+ .option("kafka.bootstrap.servers", confluentBootstrapServers)
+ .option("kafka.security.protocol", "SASL_SSL")
+ .option("kafka.sasl.jaas.config", "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username='%s' password='%s';".format(confluentApiKey, confluentSecret))
+ .option("kafka.ssl.endpoint.identification.algorithm", "https")
+ .option("kafka.sasl.mechanism", "PLAIN")
+ .option("subscribe", confluentTopicName)
+ //.option("startingOffsets", "latest")
+ .option("checkpointPath", kafkaCheckPointPath)
+ .option("startingOffsets", """{"%s":{"0":25}}""".format(confluentTopicName))
+ .load()
+ .withColumn("fixedValue", expr("substring(value, 6, length(value)-5)"))
+ .withColumn("valueSchemaId", binaryToInt(expr("substring(value, 2, 4)")))
+ .select("valueSchemaId", "fixedValue")
+
+// COMMAND ----------
+
+//display(confluent_df)
+
+// COMMAND ----------
+
+// MAGIC %md
+// MAGIC ### Write out data from the stream using ForeachBatch
+
+// COMMAND ----------
+
+// Write out the parsed data to a Delta table
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.streaming.Trigger
+
+// This approach resolves the schema for every micro-batch, which is more resilient if the schema is expected to change often.
+confluent_df.writeStream
+ .foreachBatch {(df: DataFrame, epoch_id: Long) =>
+ // Cache this since we're going to access it several times
+ val cacheDf = df.cache
+
+ // Set the option for what to do with corrupt data - either stop on the first failure it finds (FAILFAST) or just set corrupt data to null (PERMISSIVE)
+ val fromAvroOptions = new java.util.HashMap[String, String]()
+ fromAvroOptions.put("mode", "PERMISSIVE")
+
+ // Function that will fetch a schema from the schema registry by ID
+ //def getSchema(id: Integer): String = {
+ // return schemaRegistryClient.getById(id).toString
+ //}
+
+ // Resolve the subject name for a schema ID via the /schemas/ids/{id}/versions endpoint built above (no extra auth headers are added here, so the registry URL must be reachable as-is)
+ def getSubject(url: String): String = {scala.io.Source.fromURL(url).mkString.split(",")(0).split(":")(1).replaceAll("\"","")}
+
+ val distinctValueSchemaIdDF = cacheDf.select("valueSchemaId").distinct()
+ // For each valueSchemaId get the schemas from the schema registry
+ for (valueRow <- distinctValueSchemaIdDF.collect) {
+ // Pull the schema for this schema ID
+ val currentValueSchemaId = sc.broadcast(valueRow.getAs[Int]("valueSchemaId"))
+ //val currentValueSchema = sc.broadcast(getSchema(currentValueSchemaId.value))
+ val customUrl = customUrlBasePath + currentValueSchemaId.value + customVersions
+ val currentSubject = getSubject(customUrl)
+
+ // Filter the batch to the rows that have this value schema ID
+ val filterValueDF = cacheDf.filter(cacheDf("valueSchemaId") === currentValueSchemaId.value)
+
+ // Parse the Avro payload using the subject's schema from the registry and write the micro-batch
+ //val parsedDf = filterValueDF.select(from_avro($"fixedValue", currentValueSchema.value, fromAvroOptions).as('parsedValue))
+ val parsedDf = filterValueDF.select(from_avro($"fixedValue", currentSubject, schemaRegistryUrl, fromAvroOptions).as('parsedValue))
+ parsedDf
+ .write
+ .format("delta")
+ .mode("append")
+ .option("mergeSchema", "true")
+ .saveAsTable("raw_events_confluent")
+ }
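+ // Optional: release the cached micro-batch now that every schema ID in it has been processed
+ cacheDf.unpersist()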
+ }
+ .trigger(Trigger.Once)
+ .queryName("confluentAvroScalaForeachBatchStream")
+ .option("checkpointLocation", tableCheckPointPath)
+ .start()
+
+// COMMAND ----------
+
+// MAGIC %sql
+// MAGIC
+// MAGIC select * from raw_events_confluent