diff --git a/.github/workflows/build-release.yml b/.github/workflows/build-release.yml
index 2af5975..9bc1965 100644
--- a/.github/workflows/build-release.yml
+++ b/.github/workflows/build-release.yml
@@ -104,18 +104,17 @@ jobs:
name: FunctionZip
path: ./artifacts
- - name: Deploy Azure Function to Integration Env
- uses: Azure/functions-action@v1.4.6
- with:
- app-name: ${{ secrets.INT_FUNC_NAME }}
- package: ./artifacts/FunctionZip.zip
- publish-profile: ${{ secrets.INT_PUBLISH_PROFILE }}
-
- name: Azure Login
uses: azure/login@v1
with:
creds: ${{ secrets.INT_AZ_CLI_CREDENTIALS }}
-
+
+ - name: Deploy Azure Function to Integration Env
+ uses: Azure/functions-action@v1.5.1
+ with:
+ app-name: ${{ secrets.INT_FUNC_NAME }}
+ package: ./artifacts/FunctionZip.zip
+
- name: Compare and Update App Settings on Deployed Function
uses: azure/CLI@v1
with:
@@ -134,6 +133,13 @@ jobs:
AZURE_CLIENT_SECRET: ${{ secrets.AZURE_CLIENT_SECRET }}
AZURE_TENANT_ID: ${{ secrets.AZURE_TENANT_ID }}
+ - name: Start Integration ADX Cluster
+ run: source tests/integration/manage-adx-cluster.sh start ${{ secrets.INT_SUBSCRIPTION_ID }} ${{ secrets.ADX_RG_NAME }} ${{ secrets.ADX_CLUSTER_NAME }}
+ env:
+ AZURE_CLIENT_ID: ${{ secrets.AZURE_CLIENT_ID }}
+ AZURE_CLIENT_SECRET: ${{ secrets.AZURE_CLIENT_SECRET }}
+ AZURE_TENANT_ID: ${{ secrets.AZURE_TENANT_ID }}
+
- name: Install Python Requirements and Databricks CLI
run: pip install pyapacheatlas==0.12.0 azure-identity databricks-cli
@@ -173,6 +179,13 @@ jobs:
AZURE_CLIENT_ID: ${{ secrets.AZURE_CLIENT_ID }}
AZURE_CLIENT_SECRET: ${{ secrets.AZURE_CLIENT_SECRET }}
AZURE_TENANT_ID: ${{ secrets.AZURE_TENANT_ID }}
+
+ - name: Stop Integration ADX Cluster
+ run: source tests/integration/manage-adx-cluster.sh stop ${{ secrets.INT_SUBSCRIPTION_ID }} ${{ secrets.ADX_RG_NAME }} ${{ secrets.ADX_CLUSTER_NAME }}
+ env:
+ AZURE_CLIENT_ID: ${{ secrets.AZURE_CLIENT_ID }}
+ AZURE_CLIENT_SECRET: ${{ secrets.AZURE_CLIENT_SECRET }}
+ AZURE_TENANT_ID: ${{ secrets.AZURE_TENANT_ID }}
createRelease:
name: Create Release
diff --git a/LIMITATIONS.md b/LIMITATIONS.md
index b146169..4483e8b 100644
--- a/LIMITATIONS.md
+++ b/LIMITATIONS.md
@@ -159,3 +159,7 @@ Starting with OpenLineage 0.18.0 and release 2.3.0 of the solution accelerator,
* Delta Merge statements are not supported at this time
* Delta to Delta is NOT supported at this time
+
+# Unity Catalog
+
+Unity Catalog is not supported due to [OpenLineage not yet supporting Unity Catalog](https://github.com/OpenLineage/OpenLineage/issues/2121). Microsoft Purview supports [Unity Catalog metadata scanning](https://learn.microsoft.com/en-us/purview/register-scan-azure-databricks-unity-catalog) and is the preferred approach to handling Unity Catalog.
diff --git a/README.md b/README.md
index e627e7e..408496b 100644
--- a/README.md
+++ b/README.md
@@ -63,6 +63,7 @@ Gathering lineage data is performed in the following steps:
* Support **column level lineage** for ABFSS, WASBS, and default metastore hive tables (see [Limitations](./LIMITATIONS.md#column-level-mapping-supported-sources) for more detail)
* Once configured, **does not require any code changes to notebooks or jobs**
* Can [add new source support through configuration](./docs/extending-source-support.md)
+* Note: Unity Catalog is not supported. [Unity Catalog metadata scanning](https://learn.microsoft.com/en-us/purview/register-scan-azure-databricks-unity-catalog) is supported in Microsoft Purview and is the preferred way to collect metadata and lineage for Unity enabled Databricks Workspaces.
## Videos
diff --git a/deploy-base.md b/deploy-base.md
index 1fa5b5c..b6bee0b 100644
--- a/deploy-base.md
+++ b/deploy-base.md
@@ -156,6 +156,7 @@ Follow the instructions below and refer to the [OpenLineage Databricks Install I
> If you do not have line feed endings, your cluster will fail to start due to an init script error.
3. Upload the init script and jar to dbfs using the [Databricks CLI](https://docs.microsoft.com/en-us/azure/databricks/dev-tools/cli/)
+ * Alternatively, use the [databricks workspace import --format SOURCE](https://github.com/databricks/cli/blob/main/docs/commands.md#databricks-workspace-import---import-a-workspace-object) command to upload the init script as a workspace file.
```text
dbfs mkdirs dbfs:/databricks/openlineage
@@ -181,7 +182,7 @@ Follow the instructions below and refer to the [OpenLineage Databricks Install I
After configuring the secret storage, the API key for OpenLineage can be configured in the Spark config, as in the following example:
`spark.openlineage.url.param.code {{secrets/secret_scope/Ol-Output-Api-Key}}`
- 1. Add a reference to the uploaded init script `dbfs:/databricks/openlineage/open-lineage-init-script.sh` on the [Init script section](https://docs.microsoft.com/en-us/azure/databricks/clusters/init-scripts#configure-a-cluster-scoped-init-script-using-the-ui) of the Advanced Options.
+ 1. Add a reference to the uploaded init script `dbfs:/databricks/openlineage/open-lineage-init-script.sh` on the [Init script section](https://learn.microsoft.com/en-us/azure/databricks/init-scripts/cluster-scoped#configure-a-cluster-scoped-init-script-using-the-ui) of the Advanced Options.
5. At this point, you can run a Databricks notebook on an "all-purpose cluster" in your configured workspace and observe lineage in Microsoft Purview once the Databricks notebook has finished running all cells.
@@ -191,46 +192,12 @@ Follow the instructions below and refer to the [OpenLineage Databricks Install I
### Support Extracting Lineage from Databricks Jobs
-To support Databricks Jobs, you must add the service principal to your Databricks workspace. To use the below scripts, you must authenticate to Azure Databricks using either [access tokens](https://docs.microsoft.com/en-us/azure/databricks/dev-tools/api/latest/authentication) or [AAD tokens](https://docs.microsoft.com/en-us/azure/databricks/dev-tools/api/latest/aad/). The snippets below assume you have generated an access token.
-
-1. [Add your Service Principal to Databricks as a User](https://docs.microsoft.com/en-us/azure/databricks/dev-tools/api/latest/scim/scim-sp#add-service-principal)
- * Create a file named `add-service-principal.json` that contains
- ```json
- {
- "schemas": [ "urn:ietf:params:scim:schemas:core:2.0:ServicePrincipal" ],
- "applicationId": "",
- "displayName": "",
- "groups": [
- {
- "value": ""
- }
- ],
- "entitlements": [
- {
- "value":"allow-cluster-create"
- }
- ]
- }
- ```
- * Provide a group id by executing the `groups` Databricks API and extracting a group id.
- ```bash
- curl -X GET \
- https:///api/2.0/preview/scim/v2/Groups \
- --header 'Authorization: Bearer DATABRICKS_ACCESS_TOKEN' \
- | jq .
- ```
- You may use the admin group id or create a separate group to isolate the service principal.
-
- * Execute the following bash command after the file above has been created and populated.
- ```bash
- curl -X POST \
- https:///api/2.0/preview/scim/v2/ServicePrincipals \
- --header 'Content-type: application/scim+json' \
- --header 'Authorization: Bearer DATABRICKS_ACCESS_TOKEN' \
- --data @add-service-principal.json \
- | jq .
- ```
-2. [Assign the Service Principal as a contributor to the Databricks Workspace](https://docs.microsoft.com/en-us/azure/role-based-access-control/role-assignments-portal?tabs=current)
+To support Databricks Jobs, you must add the service principal to your Databricks workspace.
+
+1. Follow the [official documentation](https://learn.microsoft.com/en-us/azure/databricks/administration-guide/users-groups/service-principals#--add-a-service-principal-to-a-workspace-using-the-workspace-admin-settings) to add your service principal through the Workspace Admin Settings User Interface. This will also add it to the Admin group.
+ * For adding the service principal via REST API see [databricks jobs and service principals](./docs/databricks-jobs-service-principal.md)
+
+2. This should be the same Service Principal that has Data Curator role in Microsoft Purview.
3. At this point, you can run a Databricks job on a "job cluster" in your configured workspace and observe lineage in Microsoft Purview once the Databricks job has finished.
@@ -238,6 +205,6 @@ To support Databricks Jobs, you must add the service principal to your Databrick
### Global Init Scripts
-You can also configure the OpenLineage listener to run globally, so that any cluster which is created automatically runs the listener. To do this, you can utilize a [global init script](https://docs.microsoft.com/en-us/azure/databricks/clusters/init-scripts#global-init-scripts).
+You can also configure the OpenLineage listener to run globally, so that any cluster which is created automatically runs the listener. To do this, you can utilize a [global init script](https://learn.microsoft.com/en-us/azure/databricks/init-scripts/global).
**Note**: Global initialization cannot currently use values from Azure Databricks KeyVault integration mentioned above. If using global initialization scripts, this key would need to be retrieved in the notebooks themselves, or hardcoded into the global init script.
diff --git a/deployment/infra/newdeploymenttemp.json b/deployment/infra/newdeploymenttemp.json
index df3eb62..567f2a0 100644
--- a/deployment/infra/newdeploymenttemp.json
+++ b/deployment/infra/newdeploymenttemp.json
@@ -57,7 +57,7 @@
"openlineageKeyVaultName": "[replace(replace(toLower(concat(concat('keyvaut',variables('paramName')),variables('uniqueName'))),'-',''),'_','')]",
"purviewAccountName": "[parameters('purviewName')]",
"eventHubSku": "Standard",
- "captureEnabled": true,
+ "captureEnabled": false,
"captureEncodingFormat": "Avro",
"captureTime": 60,
"captureSize": 314572800,
@@ -86,7 +86,7 @@
"sku": {
"name": "Standard_LRS"
},
- "kind": "Storage",
+ "kind": "StorageV2",
"tags": "[parameters('resourceTagValues')]",
"properties": {
"allowBlobPublicAccess": "False",
@@ -101,7 +101,7 @@
"sku": {
"name": "Standard_LRS"
},
- "kind": "Storage",
+ "kind": "StorageV2",
"tags": "[parameters('resourceTagValues')]",
"properties": {
"allowBlobPublicAccess": "False"
diff --git a/docs/configuration.md b/docs/configuration.md
index 25ce4e6..11b68e6 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -30,7 +30,6 @@ The following app settings are experimental and may be removed in future release
| App Setting| Default Value in Code| Note|
|----|----|----|
|useResourceSet|true|Experimental feature|
-|maxQueryPlanSize|null|If the query plan bytes is greater than this value it will be removed from the databricks_process|
|prioritizeFirstResourceSet|true|When matching against existing assets, the first resource set found will be prioritized over other assets like folders or purview custom connector entities.|
|Spark_Entities|databricks_workspace;databricks_job;databricks_notebook;databricks_notebook_task||
|Spark_Process|databricks_process||
diff --git a/docs/data-factory.md b/docs/data-factory.md
new file mode 100644
index 0000000..7dc8b6f
--- /dev/null
+++ b/docs/data-factory.md
@@ -0,0 +1,10 @@
+# Data Factory and Databricks Notebook Lineage
+
+The solution accelerator supports capturing lineage for Databricks Notebook activities in Azure Data Factory (ADF). After running a notebook through ADF on an interactive or job cluster, you will see a Databricks Job asset in Microsoft Purview with a name similar to `ADF__`. For each Databricks notebook activity, you will also see a Databricks Task with a name similar to `ADF___`.
+
+* At this time, the Microsoft Purview view of Azure Data Factory lineage will not contain these tasks unless the Databricks Task uses or feeds a data source to a Data Flow or Copy activity.
+* Copy Activities may not show lineage connecting to these Databricks tasks since it emits individual file assets rather than folder or resource set assets.
+
+## Enable Collecting Data Factory Lineage
+
+To enable Data Factory lineage, you must add the [Service Principal to the Databricks Workspace](./databricks-jobs-service-principal.md) and add it to at least the `users` group.
diff --git a/docs/databricks-jobs-service-principal.md b/docs/databricks-jobs-service-principal.md
new file mode 100644
index 0000000..20e4897
--- /dev/null
+++ b/docs/databricks-jobs-service-principal.md
@@ -0,0 +1,63 @@
+# Add Service Principal to Databricks Workspace for Job and Data Factory Support
+
+When extracting lineage for Databricks Jobs (a.k.a. Workflows), you will need the Solution Accelerator to be able to read Databricks Job information. This also applies to Data Factory calling Databricks Notebooks or Python Files as Activities in a Pipeline. In order to read this information, the Solution Accelerator Service Principal (the one that has been granted the Data Curator role in Microsoft Purview) must be added to the Databricks Workspace as a user.
+
+Given that this solution does not support Unity Catalog, it is assumed that you are using the Workspace Admin settings and not the Account Console to add and manage Service Principals.
+
+For best experience, follow the [official documentation on how to add a service principal to Databricks Workspace and users group](https://learn.microsoft.com/en-us/azure/databricks/administration-guide/users-groups/service-principals#--add-a-service-principal-to-a-workspace-using-the-workspace-admin-settings) via the UI.
+
+The remainder of this page provides sample code for adding a service principal via REST API calls.
+
+## Generate a Databricks Access Token
+
+This sample uses [Databricks Personal Access Tokens](https://learn.microsoft.com/en-us/azure/databricks/dev-tools/auth/pat). Generate a token for use in the code below.
+
+## Add your Service Principal to Databricks as a User and Add to Users Group
+
+[Databricks Workspace REST API: Service Principals - Create](https://docs.databricks.com/api/azure/workspace/serviceprincipals/create)
+
+* Find the `users` group id by executing the `groups` Databricks API and extracting the group id.
+ ```bash
+ curl -X GET \
+ https:///api/2.0/preview/scim/v2/Groups \
+ --header 'Authorization: Bearer ' \
+ | jq .
+ ```
+ You may use the users group id or create a separate group to isolate the service principal.
+* Create a file named `add-service-principal.json` that contains the below payload with the users group id.
+ ```json
+ {
+ "schemas": [ "urn:ietf:params:scim:schemas:core:2.0:ServicePrincipal" ],
+ "applicationId": "",
+ "displayName": "",
+ "groups": [
+ {
+ "value": ""
+ }
+ ],
+ "entitlements": [
+ {
+ "value":"allow-cluster-create"
+ }
+ ]
+ }
+ ```
+* Execute the following bash command after the file above has been created and populated.
+ ```bash
+ curl -X POST \
+ https:///api/2.0/preview/scim/v2/ServicePrincipals \
+ --header 'Content-type: application/scim+json' \
+ --header 'Authorization: Bearer ' \
+ --data @add-service-principal.json \
+ | jq .
+ ```
+
+## Optional: Use the Admin Group
+
+In some cases, you may need to use the Admin group. Repeat the steps above and add the service principal to the Admin group.
+
+## Optional: Assign the Service Principal as a contributor to the Databricks Workspace
+
+The above steps should be sufficient but in some cases, you may need to add the service principal as a contributor to the Databricks Workspace resource.
+
+[How to assign roles to a resource](https://docs.microsoft.com/en-us/azure/role-based-access-control/role-assignments-portal?tabs=current)
diff --git a/function-app/adb-to-purview/src/Function.Domain/Helpers/OlProcessing/ValidateOlEvent.cs b/function-app/adb-to-purview/src/Function.Domain/Helpers/OlProcessing/ValidateOlEvent.cs
index 984eb3b..a6fd1b3 100644
--- a/function-app/adb-to-purview/src/Function.Domain/Helpers/OlProcessing/ValidateOlEvent.cs
+++ b/function-app/adb-to-purview/src/Function.Domain/Helpers/OlProcessing/ValidateOlEvent.cs
@@ -39,18 +39,25 @@ public ValidateOlEvent(ILoggerFactory loggerFactory)
///
/// OpenLineage Event message
/// true if input is valid, false if not
- public bool Validate(Event olEvent){
+ public bool Validate(Event? olEvent){
+ if (olEvent == null){
+ _log.LogWarning("Event considered NOT valid as it was null");
+ return false;
+ }
+ _log.LogInformation($"Validating input of an event with {olEvent.Inputs.Count} inputs and {olEvent.Outputs.Count} outputs");
if (olEvent.Inputs.Count > 0 && olEvent.Outputs.Count > 0)
{
// Need to rework for multiple inputs and outputs in one packet - possibly combine and then hash
if (InOutEqual(olEvent))
{
+ _log.LogWarning($"Event considered NOT valid due to inputs and outputs being equal");
return false;
}
if (olEvent.EventType == "START")
{
if (olEvent.Run.Facets.EnvironmentProperties == null)
{
+ _log.LogWarning($"Start Event considered NOT valid due to missing Databricks Environment Properties");
return false;
}
return true;
@@ -61,9 +68,11 @@ public bool Validate(Event olEvent){
}
else
{
+ _log.LogWarning($"Event considered NOT valid due to not matching any other condition");
return false;
}
}
+ _log.LogWarning($"Event considered NOT valid due to not matching any other condition");
return false;
}
@@ -77,7 +86,7 @@ private bool InOutEqual(Event ev)
nms2.Sort();
nmspc.Sort();
nmspc2.Sort();
- return Enumerable.SequenceEqual(nms, nms2) && Enumerable.SequenceEqual(nms, nms2);
+ return Enumerable.SequenceEqual(nms, nms2) && Enumerable.SequenceEqual(nmspc, nmspc2);
}
}
}
\ No newline at end of file
diff --git a/function-app/adb-to-purview/src/Function.Domain/Helpers/PurviewCustomType.cs b/function-app/adb-to-purview/src/Function.Domain/Helpers/PurviewCustomType.cs
index af8c97f..586dec7 100644
--- a/function-app/adb-to-purview/src/Function.Domain/Helpers/PurviewCustomType.cs
+++ b/function-app/adb-to-purview/src/Function.Domain/Helpers/PurviewCustomType.cs
@@ -514,7 +514,7 @@ public class PurviewClient
ILogger _logger;
private AppConfigurationSettings? config = new AppConfigurationSettings();
private MemoryCache _cache = MemoryCache.Default;
- private CacheItemPolicy cacheItemPolicy;
+ private String TOKEN_CACHE_KEY = "token";
///
/// Create Object passing the logger class
///
@@ -524,10 +524,6 @@ public PurviewClient(ILogger logger)
_logger = logger;
httpclientManager = new HttpClientManager(logger);
this.PurviewclientHelper = new PurviewClientHelper(httpclientManager, logger);
- cacheItemPolicy = new CacheItemPolicy
- {
- AbsoluteExpiration = DateTimeOffset.Now.AddHours(config!.tokenCacheTimeInHours)
- };
}
///
@@ -657,11 +653,20 @@ public async Task search_entities(string QualifiedName)
}
private async Task GetToken()
{
-
- if (_cache.Contains("token"))
+ // If the Memory Cache already has a token stored
+ // test to see if it's actually expired (because the token might expire soon)
+ if (_cache.Contains(TOKEN_CACHE_KEY))
{
- return ((AuthenticationResult)_cache.Get("token")).AccessToken;
+ var cachedAuth = (AuthenticationResult)_cache.Get(TOKEN_CACHE_KEY);
+ // If the cached auth expires later than NOW + 3 minutes
+ // We are good to use the token for the next operation
+ if (cachedAuth.ExpiresOn > DateTime.UtcNow.AddMinutes(3)){
+ _logger.LogInformation("PurviewClient-GetToken: Token cache hit, no need to refresh token");
+ return cachedAuth.AccessToken;
+ }
+ // Else, fall through and get a new token because it's about to expire
}
+ _logger.LogWarning("PurviewClient-GetToken: Purview Client Token doesn't exist or will expire soon, attempt refresh");
// Even if this is a console application here, a daemon application is a confidential client application
IConfidentialClientApplication app;
@@ -712,9 +717,19 @@ private async Task GetToken()
return String.Empty;
}
- //_token = result;
- var cacheItem = new CacheItem("token", result);
- _cache.Add(cacheItem, cacheItemPolicy);
+ _logger.LogInformation($"PurviewClient-GetToken: Purview Client Token refresh successful. Adding to cache and will expire in {config!.tokenCacheTimeInHours} hours");
+ var cacheItem = new CacheItem(TOKEN_CACHE_KEY, result);
+ // We need to recreate this CacheItemPolicy every time
+ // If we set the policy only once, the AbsoluteExpiration won't get updated
+ // This should only be a problem in long standing PurviewClient applications
+ var cacheItemPolicy = new CacheItemPolicy
+ {
+ AbsoluteExpiration = DateTimeOffset.Now.AddHours(config!.tokenCacheTimeInHours)
+ };
+ // Need to use Set in case the token already exists
+ // We expect that a token that expires in three minutes will be updated
+ // so the token will exist already
+ _cache.Set(cacheItem, cacheItemPolicy);
return result.AccessToken;
}
///
diff --git a/function-app/adb-to-purview/src/Function.Domain/Helpers/parser/EventParser.cs b/function-app/adb-to-purview/src/Function.Domain/Helpers/parser/EventParser.cs
new file mode 100644
index 0000000..5425c90
--- /dev/null
+++ b/function-app/adb-to-purview/src/Function.Domain/Helpers/parser/EventParser.cs
@@ -0,0 +1,75 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT License.
+
+using Function.Domain.Models.Purview;
+using Function.Domain.Models.OL;
+using Microsoft.Extensions.Logging;
+using Function.Domain.Models.Settings;
+using Newtonsoft.Json;
+using Newtonsoft.Json.Linq;
+using System;
+
+namespace Function.Domain.Helpers
+{
+ public class EventParser:IEventParser
+ {
+ private ILogger _logger;
+ private AppConfigurationSettings? _appSettingsConfig = new AppConfigurationSettings();
+ public EventParser(ILogger logger){
+ _logger = logger;
+ }
+ public Event? ParseOlEvent(string eventPayload){
+ try{
+ var trimString = TrimPrefix(eventPayload);
+ var _totalPayloadSize = System.Text.Encoding.Unicode.GetByteCount(trimString);
+
+ var _event = JsonConvert.DeserializeObject(trimString);
+ if (_event == null){
+ _logger.LogWarning($"ParseOlEvent: Event Payload was null");
+ return null;
+ }
+
+ // Handle the 1 MB size limit of an Event Hub in an opinionated way
+ _logger.LogDebug($"ParseOlEvent: Payload Size Initial: {_totalPayloadSize}");
+ if (_totalPayloadSize > _appSettingsConfig!.maxPayloadSize){
+ // First remove the Spark Plan, it has the least impact
+ _logger.LogWarning("Total Payload from OpenLineage exceeded maximum size.");
+ _logger.LogWarning("Total Payload from OpenLineage exceeded maximum size: Removing Spark Plan");
+ _event.Run.Facets.SparkLogicalPlan = new JObject();
+ var _sizeAfterStripPlan = System.Text.Encoding.Unicode.GetByteCount(JsonConvert.SerializeObject(_event).ToString());
+ _logger.LogDebug($"ParseOlEvent: Payload Size After Pruning Spark Plan: {_sizeAfterStripPlan}");
+
+ if (_sizeAfterStripPlan > _appSettingsConfig!.maxPayloadSize){
+ // Next remove the column lineage but this affects column mapping
+ _logger.LogWarning("Total Payload from OpenLineage exceeded maximum size: Removing column lineage from OpenLineage Event");
+ System.Collections.Generic.List updatedOutputs = new System.Collections.Generic.List();
+ foreach (var output in _event.Outputs){
+ output.Facets.ColFacets = new ColumnLineageFacetsClass();
+ updatedOutputs.Add(output);
+ }
+ _event.Outputs = updatedOutputs;
+ }
+
+ var _sizeAfterStripPlanAndColLineage = System.Text.Encoding.Unicode.GetByteCount(JsonConvert.SerializeObject(_event).ToString());
+ _logger.LogWarning($"ParseOlEvent - Payload Size After Pruning Spark Plan and ColumnLineage: {_sizeAfterStripPlanAndColLineage}");
+ // TODO: Add reducing Mount Points
+ }
+
+ return _event;
+ }
+ catch (JsonSerializationException ex) {
+ _logger.LogWarning($"Json Serialization Issue: {eventPayload}, error: {ex.Message} path: {ex.Path}");
+ }
+ // Parsing error
+ catch (Exception ex){
+ _logger.LogWarning($"Unrecognized Message: {eventPayload}, error: {ex.Message}");
+ }
+ return null;
+
+ }
+ public string TrimPrefix(string strEvent){
+ return strEvent.Substring(strEvent.IndexOf('{')).Trim();
+ }
+ }
+}
+
diff --git a/function-app/adb-to-purview/src/Function.Domain/Helpers/parser/IEventParser.cs b/function-app/adb-to-purview/src/Function.Domain/Helpers/parser/IEventParser.cs
new file mode 100644
index 0000000..43f8014
--- /dev/null
+++ b/function-app/adb-to-purview/src/Function.Domain/Helpers/parser/IEventParser.cs
@@ -0,0 +1,15 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT License.
+
+using Function.Domain.Models.OL;
+using Function.Domain.Models.Purview;
+
+namespace Function.Domain.Helpers
+{
+ public interface IEventParser
+ {
+ public Event? ParseOlEvent(string eventPayload);
+
+ public string TrimPrefix(string strEvent);
+ }
+}
\ No newline at end of file
diff --git a/function-app/adb-to-purview/src/Function.Domain/Models/Parser/Settings/AppConfigurationSettings.cs b/function-app/adb-to-purview/src/Function.Domain/Models/Parser/Settings/AppConfigurationSettings.cs
index 26f6b7a..9f5bd75 100644
--- a/function-app/adb-to-purview/src/Function.Domain/Models/Parser/Settings/AppConfigurationSettings.cs
+++ b/function-app/adb-to-purview/src/Function.Domain/Models/Parser/Settings/AppConfigurationSettings.cs
@@ -38,7 +38,7 @@ public string Authority
public string AuthenticationUri { get; set; } = "purview.azure.net";
public string AppDomainUrl { get; set; } = "purview.azure.com";
public string ResourceUri { get; set; } = "https://purview.azure.com";
- public int maxQueryPlanSize {get; set; } = Int32.MaxValue;
+ public int maxPayloadSize {get; set;} = 1048576;
public bool prioritizeFirstResourceSet { get; set; } = true;
public string purviewApiEndPoint { get; set; } = "{ResourceUri}/catalog/api";
public string purviewApiEntityBulkMethod { get; set; } = "/atlas/v2/entity/bulk";
diff --git a/function-app/adb-to-purview/src/Function.Domain/Providers/AdbClientProvider.cs b/function-app/adb-to-purview/src/Function.Domain/Providers/AdbClientProvider.cs
index 74987ab..f208c91 100644
--- a/function-app/adb-to-purview/src/Function.Domain/Providers/AdbClientProvider.cs
+++ b/function-app/adb-to-purview/src/Function.Domain/Providers/AdbClientProvider.cs
@@ -26,6 +26,7 @@ class AdbClientProvider : IAdbClientProvider
private AppConfigurationSettings? config = new AppConfigurationSettings();
// static for simple function cache
+ // TODO: This may cause an issue when you are trying to handle multiple Databricks workspace
private static JwtSecurityToken? _bearerToken;
private HttpClient _client;
private ILogger _log;
@@ -106,6 +107,7 @@ private async Task GetBearerTokenAsync()
{
if (isTokenExpired(_bearerToken))
{
+ _log.LogWarning($"AdbClient-GetSingleAdbJobAsync: need to update token cache to get runId {runId} for workspace {adbWorkspaceUrl}");
await GetBearerTokenAsync();
if (_bearerToken is null)
@@ -131,7 +133,7 @@ private async Task GetBearerTokenAsync()
}
catch (Exception ex)
{
- _log.LogError(ex, $"AdbClient-GetSingleAdbJobAsync: error, message: {ex.Message}");
+ _log.LogError(ex, $"AdbClient-GetSingleAdbJobAsync: error retrieving run information for runId {runId} for workspace {adbWorkspaceUrl}, message: {ex.Message}");
}
return resultAdbRoot;
}
@@ -143,7 +145,7 @@ private bool isTokenExpired(JwtSecurityToken? jwt)
}
if (jwt.ValidTo > DateTime.Now.AddMinutes(3))
{
- _log.LogInformation("AdbClient-isTokenExpired: Token cache hit");
+ _log.LogInformation("AdbClient-isTokenExpired: Token cache hit, no need to refresh token");
return false;
}
return true;
diff --git a/function-app/adb-to-purview/src/Function.Domain/Services/IOlFilter.cs b/function-app/adb-to-purview/src/Function.Domain/Services/IOlFilter.cs
index 9c54849..da6120e 100644
--- a/function-app/adb-to-purview/src/Function.Domain/Services/IOlFilter.cs
+++ b/function-app/adb-to-purview/src/Function.Domain/Services/IOlFilter.cs
@@ -2,11 +2,13 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
+using Function.Domain.Models.OL;
+
namespace Function.Domain.Services
{
public interface IOlFilter
{
- bool FilterOlMessage(string strRequest);
- string GetJobNamespace(string strRequest);
+ bool FilterOlMessage(Event? olEvent);
+ string GetJobNamespace(Event olEvent);
}
}
\ No newline at end of file
diff --git a/function-app/adb-to-purview/src/Function.Domain/Services/OlConsolodateEnrich.cs b/function-app/adb-to-purview/src/Function.Domain/Services/OlConsolodateEnrich.cs
index b0868c6..853dff1 100644
--- a/function-app/adb-to-purview/src/Function.Domain/Services/OlConsolodateEnrich.cs
+++ b/function-app/adb-to-purview/src/Function.Domain/Services/OlConsolodateEnrich.cs
@@ -52,20 +52,8 @@ public OlConsolodateEnrich(
/// Enriched event or null if unable to process the event
public async Task ProcessOlMessage(String strEvent)
{
-
- var trimString = TrimPrefix(strEvent);
- try
- {
- _event = JsonConvert.DeserializeObject(trimString) ?? new Event();
- int planSize = System.Text.Encoding.Unicode.GetByteCount(_event.Run.Facets.SparkLogicalPlan.ToString());
- if (planSize > _appSettingsConfig!.maxQueryPlanSize){
- _logger.LogWarning("Query Plan size exceeded maximum. Removing query plan from OpenLineage Event");
- _event.Run.Facets.SparkLogicalPlan = new JObject();
- }
- }
- catch (JsonSerializationException ex) {
- _logger.LogWarning(ex,$"Unrecognized Message: {strEvent}, error: {ex.Message} path: {ex.Path}");
- }
+ var _eventParser = new EventParser(_logger);
+ var _event = _eventParser.ParseOlEvent(strEvent) ?? new Event();
var validateOlEvent = new ValidateOlEvent(_loggerFactory);
var olMessageConsolodation = new OlMessageConsolodation(_loggerFactory, _configuration);
@@ -129,26 +117,5 @@ public string GetJobNamespace()
{
return _event.Job.Namespace;
}
-
- private Event? ParseOlEvent(string strEvent)
- {
- try{
- var trimString = TrimPrefix(strEvent);
- return JsonConvert.DeserializeObject(trimString);
- }
- catch (JsonSerializationException ex) {
- _logger.LogWarning($"Json Serialization Issue: {strEvent}, error: {ex.Message} path: {ex.Path}");
- }
- // Parsing error
- catch (Exception ex){
- _logger.LogWarning($"Unrecognized Message: {strEvent}, error: {ex.Message}");
- }
- return null;
- }
-
- private string TrimPrefix(string strEvent)
- {
- return strEvent.Substring(strEvent.IndexOf('{')).Trim();
- }
}
}
\ No newline at end of file
diff --git a/function-app/adb-to-purview/src/Function.Domain/Services/OlFilter.cs b/function-app/adb-to-purview/src/Function.Domain/Services/OlFilter.cs
index 7554c60..ea326bb 100644
--- a/function-app/adb-to-purview/src/Function.Domain/Services/OlFilter.cs
+++ b/function-app/adb-to-purview/src/Function.Domain/Services/OlFilter.cs
@@ -33,24 +33,19 @@ public OlFilter(ILoggerFactory loggerFactory)
///
/// Filters the OL events to only those that need to be passed on to the event hub
///
- /// This is the OpenLineage data passed from the Spark listener
+ /// This is the OpenLineage data passed from the Spark listener
///
/// true: if the message should be passed on for further processing
/// false: if the message should be filtered out
///
- public bool FilterOlMessage(string strRequest)
+ public bool FilterOlMessage(Event? olEvent)
{
- var olEvent = ParseOlEvent(strRequest);
try {
- if (olEvent is not null)
- {
- var validateEvent = new ValidateOlEvent(_loggerFactory);
- return validateEvent.Validate(olEvent);
- }
- return false;
+ var validateEvent = new ValidateOlEvent(_loggerFactory);
+ return validateEvent.Validate(olEvent);
}
catch (Exception ex) {
- _logger.LogError(ex, $"Error during event validation: {strRequest}, error: {ex.Message}");
+ _logger.LogError(ex, $"Error during event validation: {olEvent?.ToString()}, error: {ex.Message}");
return false;
}
}
@@ -60,37 +55,9 @@ public bool FilterOlMessage(string strRequest)
///
/// the OpenLineage event
/// the namespace value from the event
- public string GetJobNamespace(string strRequest)
+ public string GetJobNamespace(Event olEvent)
{
- var olEvent = ParseOlEvent(strRequest);
return olEvent?.Job.Namespace ?? "";
}
-
- private Event? ParseOlEvent(string strEvent)
- {
- try{
- var trimString = TrimPrefix(strEvent);
- var _event = JsonConvert.DeserializeObject(trimString);
- int planSize = System.Text.Encoding.Unicode.GetByteCount(_event!.Run.Facets.SparkLogicalPlan.ToString());
- if (planSize > _appSettingsConfig!.maxQueryPlanSize){
- _logger.LogWarning("Query Plan size exceeded maximum. Removing query plan from OpenLineage Event");
- _event.Run.Facets.SparkLogicalPlan = new JObject();
- }
- return _event;
- }
- catch (JsonSerializationException ex) {
- _logger.LogWarning($"Json Serialization Issue: {strEvent}, error: {ex.Message} path: {ex.Path}");
- }
- // Parsing error
- catch (Exception ex){
- _logger.LogWarning($"Unrecognized Message: {strEvent}, error: {ex.Message}");
- }
- return null;
- }
-
- private string TrimPrefix(string strEvent)
- {
- return strEvent.Substring(strEvent.IndexOf('{')).Trim();
- }
}
}
\ No newline at end of file
diff --git a/function-app/adb-to-purview/src/Functions/OpenLineageIn.cs b/function-app/adb-to-purview/src/Functions/OpenLineageIn.cs
index d26ce98..a18377e 100644
--- a/function-app/adb-to-purview/src/Functions/OpenLineageIn.cs
+++ b/function-app/adb-to-purview/src/Functions/OpenLineageIn.cs
@@ -8,6 +8,7 @@
using Microsoft.Azure.Functions.Worker.Http;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Configuration;
+using Newtonsoft.Json;
using System.IO;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Producer;
@@ -30,6 +31,7 @@ public class OpenLineageIn
private EventHubProducerClient _producerClient;
private IConfiguration _configuration;
private IOlFilter _olFilter;
+ private IEventParser _eventParser;
public OpenLineageIn(
ILogger logger,
@@ -41,6 +43,7 @@ public OpenLineageIn(
_configuration = configuration;
_producerClient = new EventHubProducerClient(_configuration[EH_CONNECTION_STRING], _configuration[EVENT_HUB_NAME]);
_olFilter = olFilter;
+ _eventParser = new EventParser(_logger);
}
[Function("OpenLineageIn")]
@@ -57,23 +60,34 @@ public async Task Run(
var events = new List();
string requestBody = new StreamReader(req.Body).ReadToEnd();
var strRequest = requestBody.ToString();
- if (_olFilter.FilterOlMessage(strRequest))
+ var parsedEvent = _eventParser.ParseOlEvent(strRequest);
+ if (parsedEvent == null){
+ _logger.LogError($"After parsing, event was null. Original payload: {strRequest}");
+ return _httpHelper.CreateServerErrorHttpResponse(req);
+ }
+ var parsedEventString = JsonConvert.SerializeObject(parsedEvent);
+ if (parsedEventString == null){
+ _logger.LogError($"After string serialization, event was null. Original payload: {strRequest}");
+ return _httpHelper.CreateServerErrorHttpResponse(req);
+ }
+ if (_olFilter.FilterOlMessage(parsedEvent))
{
- var sendEvent = new EventData(strRequest);
+ var sendEvent = new EventData(parsedEventString);
var sendEventOptions = new SendEventOptions();
// uses the OL Job Namespace as the EventHub partition key
- var jobNamespace = _olFilter.GetJobNamespace(strRequest);
+ var jobNamespace = _olFilter.GetJobNamespace(parsedEvent);
if (jobNamespace == "" || jobNamespace == null)
{
- _logger.LogError($"No Job Namespace found in event: {strRequest}");
+ _logger.LogError($"No Job Namespace found in event: {parsedEventString}");
}
else
{
sendEventOptions.PartitionKey = jobNamespace;
+
events.Add(sendEvent);
+ _logger.LogInformation($"OpenLineageIn:{parsedEventString}");
await _producerClient.SendAsync(events, sendEventOptions);
// log OpenLineage incoming data
- _logger.LogInformation($"OpenLineageIn:{strRequest}");
}
}
// Send appropriate success response
diff --git a/function-app/adb-to-purview/tests/unit-tests/Function.Domain/Services/OlFilterTests.cs b/function-app/adb-to-purview/tests/unit-tests/Function.Domain/Services/OlFilterTests.cs
index c8db64c..fc2c15f 100644
--- a/function-app/adb-to-purview/tests/unit-tests/Function.Domain/Services/OlFilterTests.cs
+++ b/function-app/adb-to-purview/tests/unit-tests/Function.Domain/Services/OlFilterTests.cs
@@ -4,10 +4,13 @@
using Xunit;
using Microsoft.Extensions.Logging.Abstractions;
using Function.Domain.Services;
+using Function.Domain.Helpers;
+using Microsoft.Extensions.Logging;
namespace UnitTests.Function.Domain.Services
{
public class OlFilterTests{
+ private IEventParser _eventParser;
private NullLoggerFactory _mockLoggerFactory;
@@ -21,7 +24,10 @@ public OlFilterTests()
public void FilterOlEvent_bool_FilterGoodEvents(string msgEvent, bool expectedResult)
{
IOlFilter filterOlEvent = new OlFilter(_mockLoggerFactory);
- var rslt = filterOlEvent.FilterOlMessage(msgEvent);
+ var _log = _mockLoggerFactory.CreateLogger();
+ var _eventParser = new EventParser(_log);
+ var parsedEvent = _eventParser.ParseOlEvent(msgEvent);
+ var rslt = filterOlEvent.FilterOlMessage(parsedEvent);
Xunit.Assert.Equal(expectedResult, rslt);
}
diff --git a/function-app/adb-to-purview/tests/unit-tests/Function.Domain/Services/UnitTestData.cs b/function-app/adb-to-purview/tests/unit-tests/Function.Domain/Services/UnitTestData.cs
index 21416b3..1400c62 100644
--- a/function-app/adb-to-purview/tests/unit-tests/Function.Domain/Services/UnitTestData.cs
+++ b/function-app/adb-to-purview/tests/unit-tests/Function.Domain/Services/UnitTestData.cs
@@ -29,16 +29,16 @@ public IEnumerator