Skip to content

Commit eef82db

Browse files
committed
Add APM server ingestion channel
Added `ApmChannel` class that extends the `TransportChannelBase` to provide event ingestion to APM server, which sends V2 intake API data. Also updated the implementation of `IMetricSet`, `ITransaction`, `IError` and `ISpan` to include an `IIntakeRoot` interface which is used to define the incoming root JSON structure to APM server. Updated several dependency versions in some projects. Also added reference to `Elastic.Ingest.Transport` in `Elastic.Apm.csproj`, enabling data transport using our shared infrastructure. Added new files `ApmChannelOptions` and `IIntakeRoot` to manage channel options and declare the interface for incoming JSON data structure respectively. `IntakeErrorItem` and `EventIntakeResponse` are created to handle responses from ingestion processes.
1 parent 4ccd775 commit eef82db

File tree

14 files changed

+236
-12
lines changed

14 files changed

+236
-12
lines changed

build/scripts/Build.fs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ open Tooling
2121

2222
module Build =
2323

24+
//TODO remove oldDiagnosticSourceVersion
2425
let private oldDiagnosticSourceVersion = SemVer.parse "4.6.0"
2526
let private diagnosticSourceVersion6 = SemVer.parse "6.0.0"
2627

src/Elastic.Apm/Api/IError.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ namespace Elastic.Apm.Api
1212
/// Represents an error which was captured by the agent.
1313
/// </summary>
1414
[Specification("error.json")]
15-
public interface IError
15+
public interface IError : IIntakeRoot
1616
{
1717
/// <summary>
1818
/// The culprit that caused this error.

src/Elastic.Apm/Api/IIntakeRoot.cs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
// Licensed to Elasticsearch B.V under
2+
// one or more agreements.
3+
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
4+
// See the LICENSE file in the project root for more information
5+
6+
namespace Elastic.Apm.Api;
7+
8+
public interface IIntakeRoot { }

src/Elastic.Apm/Api/IMetricSet.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ namespace Elastic.Apm.Api
1212
/// Data captured by the agent representing a metric occurring in a monitored service
1313
/// </summary>
1414
[Specification("metricset.json")]
15-
public interface IMetricSet
15+
public interface IMetricSet : IIntakeRoot
1616
{
1717
/// <summary>
1818
/// List of captured metrics as key - value pairs

src/Elastic.Apm/Api/ISpan.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ namespace Elastic.Apm.Api
1010
/// An event captured by an agent occurring in a monitored service
1111
/// </summary>
1212
[Specification("span.json")]
13-
public interface ISpan : IExecutionSegment
13+
public interface ISpan : IExecutionSegment, IIntakeRoot
1414
{
1515
/// <summary>
1616
/// The action of the span.

src/Elastic.Apm/Api/ITransaction.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ namespace Elastic.Apm.Api
1919
/// provide different transaction implementations.
2020
/// </remarks>
2121
[Specification("transaction.json")]
22-
public interface ITransaction : IExecutionSegment
22+
public interface ITransaction : IExecutionSegment, IIntakeRoot
2323
{
2424
/// <summary>
2525
/// Contains data related to FaaS (Function as a Service) events.

src/Elastic.Apm/Elastic.Apm.csproj

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,8 @@
6969
</ItemGroup>
7070

7171
<ItemGroup Condition="'$(DiagnosticSourceVersion)' == ''">
72-
<PackageReference Include="System.Diagnostics.DiagnosticSource" Version="5.0.0" />
72+
<!--- TODO BUMP Elastic.Transport down to 6.0.0 -->
73+
<PackageReference Include="System.Diagnostics.DiagnosticSource" Version="6.0.1" />
7374
</ItemGroup>
7475
<!-- DiagnosticSourceVersion MsBuild property can be used to compile the agent against a specific version of
7576
System.Diagnostics.DiagnosticSource. Used when creating the ElasticApmAgentStartupHook zip file to
@@ -85,6 +86,7 @@
8586
<PackageReference Include="System.Diagnostics.PerformanceCounter" Version="6.0.1" />
8687
</ItemGroup>
8788
<ItemGroup>
89+
<PackageReference Include="Elastic.Ingest.Transport" Version="0.5.5" />
8890
<PackageReference Include="System.Threading.Tasks.Dataflow" Version="4.9.0" />
8991
<!-- Used by Ben.Demystifier -->
9092
<PackageReference Include="System.Reflection.Metadata" Version="5.0.0" />
Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
// Licensed to Elasticsearch B.V under one or more agreements.
2+
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
3+
// See the LICENSE file in the project root for more information
4+
5+
using System;
6+
using System.Collections.Generic;
7+
using System.Diagnostics;
8+
using System.IO;
9+
using System.Text.Encodings.Web;
10+
using System.Text.Json;
11+
using System.Text.Json.Serialization;
12+
using System.Threading;
13+
using System.Threading.Tasks;
14+
using Elastic.Apm.Api;
15+
using Elastic.Apm.Report;
16+
using Elastic.Channels;
17+
using Elastic.Ingest.Transport;
18+
using Elastic.Transport;
19+
20+
namespace Elastic.Apm.Ingest;
21+
22+
internal static class ApmChannelStatics
23+
{
24+
public static readonly byte[] LineFeed = { (byte)'\n' };
25+
26+
public static readonly DefaultRequestParameters RequestParams = new()
27+
{
28+
RequestConfiguration = new RequestConfiguration { ContentType = "application/x-ndjson" }
29+
};
30+
31+
public static readonly JsonSerializerOptions SerializerOptions = new()
32+
{
33+
DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull, MaxDepth = 64, Encoder = JavaScriptEncoder.UnsafeRelaxedJsonEscaping,
34+
};
35+
}
36+
37+
/// <summary>
38+
/// An <see cref="TransportChannelBase{TChannelOptions,TEvent,TResponse,TBulkResponseItem}"/> implementation that sends V2 intake API data
39+
/// to APM server.
40+
/// </summary>
41+
public class ApmChannel
42+
: TransportChannelBase<ApmChannelOptions, IIntakeRoot, EventIntakeResponse, IntakeErrorItem>
43+
, IPayloadSender
44+
{
45+
/// <inheritdoc cref="ApmChannel"/>
46+
public ApmChannel(ApmChannelOptions options) : base(options) { }
47+
48+
void IPayloadSender.QueueError(IError error) => TryWrite(error);
49+
50+
void IPayloadSender.QueueMetrics(IMetricSet metrics) => TryWrite(metrics);
51+
52+
void IPayloadSender.QueueSpan(ISpan span) => TryWrite(span);
53+
54+
void IPayloadSender.QueueTransaction(ITransaction transaction) => TryWrite(transaction);
55+
56+
//retry if APM server returns 429
57+
/// <inheritdoc cref="ResponseItemsBufferedChannelBase{TChannelOptions,TEvent,TResponse,TBulkResponseItem}.Retry"/>
58+
protected override bool Retry(EventIntakeResponse response) => response.ApiCallDetails.HttpStatusCode == 429;
59+
60+
/// <inheritdoc cref="ResponseItemsBufferedChannelBase{TChannelOptions,TEvent,TResponse,TBulkResponseItem}.RetryAllItems"/>
61+
protected override bool RetryAllItems(EventIntakeResponse response) => response.ApiCallDetails.HttpStatusCode == 429;
62+
63+
//APM does not return the status for all events sent. Therefor we always return an empty set for individual items to retry
64+
/// <inheritdoc cref="ResponseItemsBufferedChannelBase{TChannelOptions,TEvent,TResponse,TBulkResponseItem}.Zip"/>
65+
protected override List<(IIntakeRoot, IntakeErrorItem)> Zip(EventIntakeResponse response, IReadOnlyCollection<IIntakeRoot> page) =>
66+
_emptyZip;
67+
68+
private readonly List<(IIntakeRoot, IntakeErrorItem)> _emptyZip = new();
69+
70+
/// <inheritdoc cref="ResponseItemsBufferedChannelBase{TChannelOptions,TEvent,TResponse,TBulkResponseItem}.RetryEvent"/>
71+
protected override bool RetryEvent((IIntakeRoot, IntakeErrorItem) @event) => false;
72+
73+
/// <inheritdoc cref="ResponseItemsBufferedChannelBase{TChannelOptions,TEvent,TResponse,TBulkResponseItem}.RejectEvent"/>
74+
protected override bool RejectEvent((IIntakeRoot, IntakeErrorItem) @event) => false;
75+
76+
/// <inheritdoc cref="BufferedChannelBase{TChannelOptions,TEvent,TResponse}.ExportAsync"/>
77+
protected override Task<EventIntakeResponse> ExportAsync(HttpTransport transport, ArraySegment<IIntakeRoot> page, CancellationToken ctx = default) =>
78+
transport.RequestAsync<EventIntakeResponse>(HttpMethod.POST, "/intake/v2/events",
79+
PostData.StreamHandler(page,
80+
(_, _) =>
81+
{
82+
/* NOT USED */
83+
},
84+
async (b, stream, ctx) => { await WriteBufferToStreamAsync(b, stream, ctx).ConfigureAwait(false); })
85+
, ApmChannelStatics.RequestParams, ctx);
86+
87+
private async Task WriteStanzaToStreamAsync(Stream stream, CancellationToken ctx)
88+
{
89+
// {"metadata":{"process":{"pid":1234,"title":"/usr/lib/jvm/java-10-openjdk-amd64/bin/java","ppid":1,"argv":["-v"]},
90+
// "system":{"architecture":"amd64","detected_hostname":"8ec7ceb99074","configured_hostname":"host1","platform":"Linux","container":{"id":"8ec7ceb990749e79b37f6dc6cd3628633618d6ce412553a552a0fa6b69419ad4"},
91+
// "kubernetes":{"namespace":"default","pod":{"uid":"b17f231da0ad128dc6c6c0b2e82f6f303d3893e3","name":"instrumented-java-service"},"node":{"name":"node-name"}}},
92+
// "service":{"name":"1234_service-12a3","version":"4.3.0","node":{"configured_name":"8ec7ceb990749e79b37f6dc6cd3628633618d6ce412553a552a0fa6b69419ad4"},"environment":"production","language":{"name":"Java","version":"10.0.2"},
93+
// "agent":{"version":"1.10.0","name":"java","ephemeral_id":"e71be9ac-93b0-44b9-a997-5638f6ccfc36"},"framework":{"name":"spring","version":"5.0.0"},"runtime":{"name":"Java","version":"10.0.2"}},"labels":{"group":"experimental","ab_testing":true,"segment":5}}}
94+
// TODO cache
95+
var p = Process.GetCurrentProcess();
96+
var metadata = new
97+
{
98+
metadata = new
99+
{
100+
process = new { pid = p.Id, title = p.ProcessName },
101+
service = new
102+
{
103+
name = System.Text.RegularExpressions.Regex.Replace(p.ProcessName, "[^a-zA-Z0-9 _-]", "_"),
104+
version = "1.0.0",
105+
agent = new { name = "dotnet", version = "0.0.1" }
106+
}
107+
}
108+
};
109+
await JsonSerializer.SerializeAsync(stream, metadata, metadata.GetType(), ApmChannelStatics.SerializerOptions, ctx)
110+
.ConfigureAwait(false);
111+
await stream.WriteAsync(ApmChannelStatics.LineFeed, 0, 1, ctx).ConfigureAwait(false);
112+
}
113+
114+
private async Task WriteBufferToStreamAsync(IReadOnlyCollection<IIntakeRoot> b, Stream stream, CancellationToken ctx)
115+
{
116+
await WriteStanzaToStreamAsync(stream, ctx).ConfigureAwait(false);
117+
foreach (var @event in b)
118+
{
119+
if (@event == null) continue;
120+
121+
var type = @event switch
122+
{
123+
ITransaction => "transaction",
124+
ISpan => "span",
125+
IError => "error",
126+
IMetricSet => "metricset",
127+
_ => "unknown"
128+
};
129+
var dictionary = new Dictionary<string, object>() { { type, @event } };
130+
131+
await JsonSerializer.SerializeAsync(stream, dictionary, dictionary.GetType(), ApmChannelStatics.SerializerOptions, ctx)
132+
.ConfigureAwait(false);
133+
134+
await stream.WriteAsync(ApmChannelStatics.LineFeed, 0, 1, ctx).ConfigureAwait(false);
135+
}
136+
}
137+
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
// Licensed to Elasticsearch B.V under one or more agreements.
2+
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
3+
// See the LICENSE file in the project root for more information
4+
5+
using Elastic.Apm.Api;
6+
using Elastic.Ingest.Transport;
7+
using Elastic.Transport;
8+
9+
namespace Elastic.Apm.Ingest;
10+
11+
/// <summary>
12+
/// Channel options for <see cref="ApmChannel"/>
13+
/// </summary>
14+
public class ApmChannelOptions : TransportChannelOptionsBase<IIntakeRoot, EventIntakeResponse, IntakeErrorItem>
15+
{
16+
/// <inheritdoc cref="ApmChannelOptions"/>
17+
public ApmChannelOptions(HttpTransport transport) : base(transport) { }
18+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
// Licensed to Elasticsearch B.V under one or more agreements.
2+
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
3+
// See the LICENSE file in the project root for more information
4+
5+
using System.Collections.Generic;
6+
using System.Text.Json.Serialization;
7+
using Elastic.Transport;
8+
9+
namespace Elastic.Apm.Ingest;
10+
11+
/// <summary> </summary>
12+
public class EventIntakeResponse : TransportResponse
13+
{
14+
/// <summary> </summary>
15+
[JsonPropertyName("accepted")]
16+
public long Accepted { get; set; }
17+
18+
/// <summary> </summary>
19+
[JsonPropertyName("errors")]
20+
//[JsonConverter(typeof(ResponseItemsConverter))]
21+
public IReadOnlyCollection<IntakeErrorItem> Errors { get; set; } = null!;
22+
}
23+
24+
/// <summary> </summary>
25+
public class IntakeErrorItem
26+
{
27+
/// <summary> </summary>
28+
[JsonPropertyName("message")]
29+
public string Message { get; set; } = null!;
30+
31+
/// <summary> </summary>
32+
[JsonPropertyName("document")]
33+
public string Document { get; set; } = null!;
34+
}

0 commit comments

Comments
 (0)