Skip to content

Commit 4eba25b

Browse files
wjohnson and hmoazam authored
Hotfix/max query plan ol in (#226)
* Simplify payload-size reduction by introducing maxPayloadSize and removing maxQueryPlanSize. When a payload exceeds the limit, the Spark plan is removed first, then the column lineage. --------- Co-authored-by: Hanna Moazam <hannamoazam@gmail.com>
1 parent f66f229 commit 4eba25b

File tree

8 files changed

+122
-83
lines changed

8 files changed

+122
-83
lines changed

docs/configuration.md

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ The following app settings are experimental and may be removed in future release
3030
| App Setting| Default Value in Code| Note|
3131
|----|----|----|
3232
|useResourceSet|true|Experimental feature|
33-
|maxQueryPlanSize|null|If the query plan bytes is greater than this value it will be removed from the databricks_process|
3433
|prioritizeFirstResourceSet|true|When matching against existing assets, the first resource set found will be prioritized over other assets like folders or purview custom connector entities.|
3534
|Spark_Entities|databricks_workspace;databricks_job;databricks_notebook;databricks_notebook_task||
3635
|Spark_Process|databricks_process||
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
// Copyright (c) Microsoft Corporation.
2+
// Licensed under the MIT License.
3+
4+
using Function.Domain.Models.Purview;
5+
using Function.Domain.Models.OL;
6+
using Microsoft.Extensions.Logging;
7+
using Function.Domain.Models.Settings;
8+
using Newtonsoft.Json;
9+
using Newtonsoft.Json.Linq;
10+
using System;
11+
12+
namespace Function.Domain.Helpers
13+
{
14+
/// <summary>
/// Parses raw OpenLineage payloads into <see cref="Event"/> objects and, when a
/// payload is larger than <c>maxPayloadSize</c>, prunes it in a fixed order:
/// first the Spark logical plan, then the column lineage of every output.
/// </summary>
public class EventParser : IEventParser
{
    private readonly ILogger _logger;
    private readonly AppConfigurationSettings _appSettingsConfig = new AppConfigurationSettings();

    /// <summary>Creates a parser that reports its progress and problems to <paramref name="logger"/>.</summary>
    /// <param name="logger">Logger that receives debug and warning messages.</param>
    public EventParser(ILogger logger)
    {
        _logger = logger;
    }

    /// <summary>
    /// Deserializes an OpenLineage payload and prunes it when it is too large.
    /// </summary>
    /// <param name="eventPayload">Raw payload; anything before the first '{' is discarded.</param>
    /// <returns>
    /// The parsed (and possibly pruned) event, or <c>null</c> when the payload
    /// deserializes to null or any exception is raised while parsing or pruning.
    /// </returns>
    public Event? ParseOlEvent(string eventPayload)
    {
        try
        {
            var trimString = TrimPrefix(eventPayload);
            // Size is measured as UTF-16 bytes of the trimmed input text.
            // NOTE(review): the event is later sent as a string; confirm UTF-16 is the intended measure.
            var totalPayloadSize = System.Text.Encoding.Unicode.GetByteCount(trimString);

            var parsedEvent = JsonConvert.DeserializeObject<Event>(trimString);
            if (parsedEvent == null)
            {
                _logger.LogWarning($"ParseOlEvent: Event Payload was null");
                return null;
            }

            // Handle the 1 MB size limit of an Event Hub in an opinionated way
            _logger.LogDebug($"ParseOlEvent: Payload Size Initial: {totalPayloadSize}");
            var maxPayloadSize = _appSettingsConfig.maxPayloadSize;
            if (totalPayloadSize > maxPayloadSize)
            {
                // First remove the Spark Plan, it has the least impact
                _logger.LogWarning("Total Payload from OpenLineage exceeded maximum size.");
                _logger.LogWarning("Total Payload from OpenLineage exceeded maximum size: Removing Spark Plan");
                parsedEvent.Run.Facets.SparkLogicalPlan = new JObject();
                var sizeAfterStripPlan = GetSerializedSize(parsedEvent);
                // Report the size measured AFTER pruning (previously the initial size was logged here).
                _logger.LogDebug($"ParseOlEvent: Payload Size After Pruning Spark Plan: {sizeAfterStripPlan}");

                if (sizeAfterStripPlan > maxPayloadSize)
                {
                    // Next remove the column lineage but this affects column mapping
                    _logger.LogWarning("Total Payload from OpenLineage exceeded maximum size: Removing column lineage from OpenLineage Event");
                    var updatedOutputs = new System.Collections.Generic.List<Outputs>();
                    foreach (var output in parsedEvent.Outputs)
                    {
                        output.Facets.ColFacets = new ColumnLineageFacetsClass();
                        updatedOutputs.Add(output);
                    }
                    parsedEvent.Outputs = updatedOutputs;
                }

                var sizeAfterStripPlanAndColLineage = GetSerializedSize(parsedEvent);
                _logger.LogWarning($"ParseOlEvent - Payload Size After Pruning Spark Plan and ColumnLineage: {sizeAfterStripPlanAndColLineage}");
                // TODO: Add reducing Mount Points
            }

            return parsedEvent;
        }
        catch (JsonSerializationException ex)
        {
            _logger.LogWarning($"Json Serialization Issue: {eventPayload}, error: {ex.Message} path: {ex.Path}");
        }
        // Parsing error (also reached when TrimPrefix throws, e.g. no '{' in the payload)
        catch (Exception ex)
        {
            _logger.LogWarning($"Unrecognized Message: {eventPayload}, error: {ex.Message}");
        }
        return null;
    }

    /// <summary>
    /// Returns the portion of <paramref name="strEvent"/> starting at its first '{',
    /// with leading and trailing whitespace trimmed.
    /// </summary>
    /// <param name="strEvent">Raw payload text.</param>
    /// <returns>The trimmed text beginning at the first '{'.</returns>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="strEvent"/> contains no '{' (IndexOf returns -1).
    /// </exception>
    public string TrimPrefix(string strEvent)
    {
        return strEvent.Substring(strEvent.IndexOf('{')).Trim();
    }

    /// <summary>Serializes the event to JSON and returns its size in UTF-16 bytes.</summary>
    private static int GetSerializedSize(Event olEvent)
    {
        return System.Text.Encoding.Unicode.GetByteCount(JsonConvert.SerializeObject(olEvent));
    }
}
74+
}
75+
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
// Copyright (c) Microsoft Corporation.
2+
// Licensed under the MIT License.
3+
4+
using Function.Domain.Models.OL;
5+
using Function.Domain.Models.Purview;
6+
7+
namespace Function.Domain.Helpers
8+
{
9+
/// <summary>
/// Contract for turning a raw OpenLineage payload string into an <see cref="Event"/>.
/// </summary>
public interface IEventParser
{
    /// <summary>Parses the given payload text into an event.</summary>
    /// <param name="eventPayload">Raw OpenLineage payload text.</param>
    /// <returns>The parsed event, or <c>null</c> when no event could be produced.</returns>
    Event? ParseOlEvent(string eventPayload);

    /// <summary>Produces the trimmed form of the payload text that is handed to the parser.</summary>
    /// <param name="strEvent">Raw OpenLineage payload text.</param>
    /// <returns>The trimmed payload text.</returns>
    string TrimPrefix(string strEvent);
}
15+
}

function-app/adb-to-purview/src/Function.Domain/Models/Parser/Settings/AppConfigurationSettings.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ public string Authority
3838
public string AuthenticationUri { get; set; } = "purview.azure.net";
3939
public string AppDomainUrl { get; set; } = "purview.azure.com";
4040
public string ResourceUri { get; set; } = "https://purview.azure.com";
41-
public int maxQueryPlanSize {get; set; } = Int32.MaxValue;
41+
public int maxPayloadSize {get; set;} = 1048576;
4242
public bool prioritizeFirstResourceSet { get; set; } = true;
4343
public string purviewApiEndPoint { get; set; } = "{ResourceUri}/catalog/api";
4444
public string purviewApiEntityBulkMethod { get; set; } = "/atlas/v2/entity/bulk";

function-app/adb-to-purview/src/Function.Domain/Services/IOlFilter.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,13 @@
22
// Copyright (c) Microsoft Corporation.
33
// Licensed under the MIT License.
44

5+
using Function.Domain.Models.OL;
6+
57
namespace Function.Domain.Services
68
{
79
public interface IOlFilter
810
{
9-
bool FilterOlMessage(string strRequest);
10-
string GetJobNamespace(string strRequest);
11+
bool FilterOlMessage(Event olEvent);
12+
string GetJobNamespace(Event olEvent);
1113
}
1214
}

function-app/adb-to-purview/src/Function.Domain/Services/OlConsolodateEnrich.cs

Lines changed: 2 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -52,20 +52,8 @@ public OlConsolodateEnrich(
5252
/// <returns> Enriched event or null if unable to process the event </returns>
5353
public async Task<EnrichedEvent?> ProcessOlMessage(String strEvent)
5454
{
55-
56-
var trimString = TrimPrefix(strEvent);
57-
try
58-
{
59-
_event = JsonConvert.DeserializeObject<Event>(trimString) ?? new Event();
60-
int planSize = System.Text.Encoding.Unicode.GetByteCount(_event.Run.Facets.SparkLogicalPlan.ToString());
61-
if (planSize > _appSettingsConfig!.maxQueryPlanSize){
62-
_logger.LogWarning("Query Plan size exceeded maximum. Removing query plan from OpenLineage Event");
63-
_event.Run.Facets.SparkLogicalPlan = new JObject();
64-
}
65-
}
66-
catch (JsonSerializationException ex) {
67-
_logger.LogWarning(ex,$"Unrecognized Message: {strEvent}, error: {ex.Message} path: {ex.Path}");
68-
}
55+
var _eventParser = new EventParser(_logger);
56+
var _event = _eventParser.ParseOlEvent(strEvent) ?? new Event();
6957

7058
var validateOlEvent = new ValidateOlEvent(_loggerFactory);
7159
var olMessageConsolodation = new OlMessageConsolodation(_loggerFactory, _configuration);
@@ -129,26 +117,5 @@ public string GetJobNamespace()
129117
{
130118
return _event.Job.Namespace;
131119
}
132-
133-
private Event? ParseOlEvent(string strEvent)
134-
{
135-
try{
136-
var trimString = TrimPrefix(strEvent);
137-
return JsonConvert.DeserializeObject<Event>(trimString);
138-
}
139-
catch (JsonSerializationException ex) {
140-
_logger.LogWarning($"Json Serialization Issue: {strEvent}, error: {ex.Message} path: {ex.Path}");
141-
}
142-
// Parsing error
143-
catch (Exception ex){
144-
_logger.LogWarning($"Unrecognized Message: {strEvent}, error: {ex.Message}");
145-
}
146-
return null;
147-
}
148-
149-
private string TrimPrefix(string strEvent)
150-
{
151-
return strEvent.Substring(strEvent.IndexOf('{')).Trim();
152-
}
153120
}
154121
}

function-app/adb-to-purview/src/Function.Domain/Services/OlFilter.cs

Lines changed: 6 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -33,24 +33,19 @@ public OlFilter(ILoggerFactory loggerFactory)
3333
/// <summary>
3434
/// Filters the OL events to only those that need to be passed on to the event hub
3535
/// </summary>
36-
/// <param name="strRequest">This is the OpenLineage data passed from the Spark listener</param>
36+
/// <param name="olEvent">This is the OpenLineage data passed from the Spark listener</param>
3737
/// <returns>
3838
/// true: if the message should be passed on for further processing
3939
/// false: if the message should be filtered out
4040
/// </returns>
41-
public bool FilterOlMessage(string strRequest)
41+
public bool FilterOlMessage(Event olEvent)
4242
{
43-
var olEvent = ParseOlEvent(strRequest);
4443
try {
45-
if (olEvent is not null)
46-
{
47-
var validateEvent = new ValidateOlEvent(_loggerFactory);
48-
return validateEvent.Validate(olEvent);
49-
}
50-
return false;
44+
var validateEvent = new ValidateOlEvent(_loggerFactory);
45+
return validateEvent.Validate(olEvent);
5146
}
5247
catch (Exception ex) {
53-
_logger.LogError(ex, $"Error during event validation: {strRequest}, error: {ex.Message}");
48+
_logger.LogError(ex, $"Error during event validation: {olEvent.ToString()}, error: {ex.Message}");
5449
return false;
5550
}
5651
}
@@ -60,37 +55,9 @@ public bool FilterOlMessage(string strRequest)
6055
/// </summary>
6156
/// <param name="olEvent"> the OpenLineage event </param>
6257
/// <returns> the namespace value from the event </returns>
63-
public string GetJobNamespace(string strRequest)
58+
public string GetJobNamespace(Event olEvent)
6459
{
65-
var olEvent = ParseOlEvent(strRequest);
6660
return olEvent?.Job.Namespace ?? "";
6761
}
68-
69-
private Event? ParseOlEvent(string strEvent)
70-
{
71-
try{
72-
var trimString = TrimPrefix(strEvent);
73-
var _event = JsonConvert.DeserializeObject<Event>(trimString);
74-
int planSize = System.Text.Encoding.Unicode.GetByteCount(_event!.Run.Facets.SparkLogicalPlan.ToString());
75-
if (planSize > _appSettingsConfig!.maxQueryPlanSize){
76-
_logger.LogWarning("Query Plan size exceeded maximum. Removing query plan from OpenLineage Event");
77-
_event.Run.Facets.SparkLogicalPlan = new JObject();
78-
}
79-
return _event;
80-
}
81-
catch (JsonSerializationException ex) {
82-
_logger.LogWarning($"Json Serialization Issue: {strEvent}, error: {ex.Message} path: {ex.Path}");
83-
}
84-
// Parsing error
85-
catch (Exception ex){
86-
_logger.LogWarning($"Unrecognized Message: {strEvent}, error: {ex.Message}");
87-
}
88-
return null;
89-
}
90-
91-
private string TrimPrefix(string strEvent)
92-
{
93-
return strEvent.Substring(strEvent.IndexOf('{')).Trim();
94-
}
9562
}
9663
}

function-app/adb-to-purview/src/Functions/OpenLineageIn.cs

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
using Microsoft.Azure.Functions.Worker.Http;
99
using Microsoft.Extensions.Logging;
1010
using Microsoft.Extensions.Configuration;
11+
using Newtonsoft.Json;
1112
using System.IO;
1213
using Azure.Messaging.EventHubs;
1314
using Azure.Messaging.EventHubs.Producer;
@@ -30,6 +31,7 @@ public class OpenLineageIn
3031
private EventHubProducerClient _producerClient;
3132
private IConfiguration _configuration;
3233
private IOlFilter _olFilter;
34+
private IEventParser _eventParser;
3335

3436
public OpenLineageIn(
3537
ILogger<OpenLineageIn> logger,
@@ -41,6 +43,7 @@ public OpenLineageIn(
4143
_configuration = configuration;
4244
_producerClient = new EventHubProducerClient(_configuration[EH_CONNECTION_STRING], _configuration[EVENT_HUB_NAME]);
4345
_olFilter = olFilter;
46+
_eventParser = new EventParser(_logger);
4447
}
4548

4649
[Function("OpenLineageIn")]
@@ -57,23 +60,34 @@ public async Task<HttpResponseData> Run(
5760
var events = new List<EventData>();
5861
string requestBody = new StreamReader(req.Body).ReadToEnd();
5962
var strRequest = requestBody.ToString();
60-
if (_olFilter.FilterOlMessage(strRequest))
63+
var parsedEvent = _eventParser.ParseOlEvent(strRequest);
64+
if (parsedEvent == null){
65+
_logger.LogError($"After parsing, event was null. Original payload: {strRequest}");
66+
return _httpHelper.CreateServerErrorHttpResponse(req);
67+
}
68+
var parsedEventString = JsonConvert.SerializeObject(parsedEvent);
69+
if (parsedEventString == null){
70+
_logger.LogError($"After string serialization, event was null. Original payload: {strRequest}");
71+
return _httpHelper.CreateServerErrorHttpResponse(req);
72+
}
73+
if (_olFilter.FilterOlMessage(parsedEvent))
6174
{
62-
var sendEvent = new EventData(strRequest);
75+
var sendEvent = new EventData(parsedEventString);
6376
var sendEventOptions = new SendEventOptions();
6477
// uses the OL Job Namespace as the EventHub partition key
65-
var jobNamespace = _olFilter.GetJobNamespace(strRequest);
78+
var jobNamespace = _olFilter.GetJobNamespace(parsedEvent);
6679
if (jobNamespace == "" || jobNamespace == null)
6780
{
68-
_logger.LogError($"No Job Namespace found in event: {strRequest}");
81+
_logger.LogError($"No Job Namespace found in event: {parsedEventString}");
6982
}
7083
else
7184
{
7285
sendEventOptions.PartitionKey = jobNamespace;
86+
7387
events.Add(sendEvent);
88+
_logger.LogInformation($"OpenLineageIn:{parsedEventString}");
7489
await _producerClient.SendAsync(events, sendEventOptions);
7590
// log OpenLineage incoming data
76-
_logger.LogInformation($"OpenLineageIn:{strRequest}");
7791
}
7892
}
7993
// Send appropriate success response

0 commit comments

Comments
 (0)