Skip to content

Commit 1fae036

Browse files
authored
Merge pull request #2 from gistart/feat/pushgw
http fixes, pushgateway client
2 parents 73abc55 + d821f10 commit 1fae036

File tree

11 files changed

+228
-26
lines changed

11 files changed

+228
-26
lines changed

.github/workflows/tests.yml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,10 @@ jobs:
3131
- 9102:9102
3232
- 9125:9125
3333
- 9125:9125/udp
34+
pushgateway:
35+
image: prom/pushgateway:v1.4.0
36+
ports:
37+
- 9091:9091
3438
steps:
3539
- name: checkout
3640
uses: actions/checkout@v2
@@ -49,6 +53,8 @@ jobs:
4953
STATSD_HOST: localhost
5054
STATSD_API_PORT: 9102
5155
STATSD_UDP_PORT: 9125
56+
PUSHGW_HOST: localhost
57+
PUSHGW_PORT: 9091
5258
- name: upload-cov
5359
uses: codecov/codecov-action@v1
5460
with:

README.md

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,13 @@
33

44
# prometheus-push-client
55

6-
Push metrics from your regular and/or long-running jobs to existing Prometheus/VictoriaMetrics monitoring system.
6+
Push metrics from your periodic long-running jobs to existing Prometheus/VictoriaMetrics monitoring system.
77

8-
Currently supports pushes directly to VictoriaMetrics via UDP and HTTP using InfluxDB line protocol as [described here](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html?highlight=telegraf#how-to-send-data-from-influxdb-compatible-agents-such-as-telegraf).
8+
Currently supports pushes directly to `VictoriaMetrics` via UDP and HTTP using InfluxDB line protocol as [described here](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html?highlight=telegraf#how-to-send-data-from-influxdb-compatible-agents-such-as-telegraf).
99

10-
For pure Prometheus setups, pushes into StatsD/statsd-exporter in StatsD format via UDP are supported ([see exporter docs](https://github.com/prometheus/statsd_exporter#with-statsd)). Prometheus and StatsD metric types are not fully compatible, so currenly all metrics become StatsD gauges, but `rate`, `increase`, `histogram_quantile` and other PromQL functions produce same results as if types never changed.
10+
For `pure Prometheus` setups, several options are supported:
11+
- to [StatsD](https://github.com/statsd/statsd) or [statsd-exporter](https://github.com/prometheus/statsd_exporter#with-statsd) in StatsD format via UDP. Prometheus and StatsD metric types are not fully compatible, so currenly all metrics become StatsD gauges, but `rate`, `increase`, `histogram_quantile` and other PromQL functions produce same results as if types never changed.
12+
- to [pushgateway](https://github.com/prometheus/pushgateway) or [prom-aggregation-gateway](https://github.com/weaveworks/prom-aggregation-gateway) in OpenMetrics format via HTTP. Please read corresponding docs about appropriate use cases and limitations.
1113

1214
Install it via pip:
1315

@@ -74,10 +76,12 @@ Best way to use them is via decorators / context managers. These clients are int
7476
``` python
7577
def influx_udp_async(host, port, period=15.0):
7678
def influx_udp_thread(host, port, period=15.0):
77-
def influx_http_async(url, period=15.0):
78-
def influx_http_thread(url, period=15.0):
7979
def statsd_udp_async(host, port, period=15.0):
8080
def statsd_udp_thread(host, port, period=15.0):
81+
def influx_http_async(url, verb="POST", period=15.0):
82+
def influx_http_thread(url, verb="POST", period=15.0):
83+
def openmetrics_http_async(url, verb="POST", period=15.0):
84+
def openmetrics_http_thread(url, verb="POST", period=15.0):
8185
```
8286

8387
Usage example:
@@ -108,6 +112,8 @@ async def main(urls):
108112
req_hist.labels(gethostname(url)).observe(response.elapsed)
109113
```
110114

115+
Please read about mandatory `job` tag within url while [using pushgateway](https://github.com/prometheus/pushgateway#url).
116+
111117

112118
### Streaming clients
113119

prometheus_push_client/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
statsd_udp_aiostream,
1414
influx_udp_stream,
1515
statsd_udp_stream,
16+
openmetrics_http_async,
17+
openmetrics_http_thread,
1618
)
1719
from .formats.influx import InfluxFormat
1820
from .formats.statsd import StatsdFormat

prometheus_push_client/decorators.py

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,19 +21,19 @@ def influx_udp_thread(host, port, period=15.0, registry=ppc.PUSH_REGISTRY):
2121
return _sync_wrap(client)
2222

2323

24-
def influx_http_async(url, period=15.0, registry=ppc.PUSH_REGISTRY):
24+
def influx_http_async(url, verb="POST", period=15.0, registry=ppc.PUSH_REGISTRY):
2525
client = ppc.AsyncBatchClient(
2626
format=ppc.InfluxFormat(registry=registry),
27-
transport=ppc.AioHttpTransport(url),
27+
transport=ppc.AioHttpTransport(url, verb),
2828
period=period,
2929
)
3030
return _async_wrap(client)
3131

3232

33-
def influx_http_thread(url, period=15.0, registry=ppc.PUSH_REGISTRY):
33+
def influx_http_thread(url, verb="POST", period=15.0, registry=ppc.PUSH_REGISTRY):
3434
client = ppc.ThreadBatchClient(
3535
format=ppc.InfluxFormat(registry=registry),
36-
transport=ppc.SyncHttpTransport(url),
36+
transport=ppc.SyncHttpTransport(url, verb),
3737
period=period,
3838
)
3939
return _sync_wrap(client)
@@ -89,6 +89,24 @@ def statsd_udp_stream(host, port, registry=ppc.PUSH_REGISTRY):
8989
return _sync_wrap(client)
9090

9191

92+
def openmetrics_http_async(url, verb="POST", period=15.0, registry=ppc.PUSH_REGISTRY):
93+
client = ppc.AsyncBatchClient(
94+
format=ppc.OpenMetricsFormat(registry=registry),
95+
transport=ppc.AioHttpTransport(url, verb),
96+
period=period,
97+
)
98+
return _async_wrap(client)
99+
100+
101+
def openmetrics_http_thread(url, verb="POST", period=15.0, registry=ppc.PUSH_REGISTRY):
102+
client = ppc.ThreadBatchClient(
103+
format=ppc.OpenMetricsFormat(registry=registry),
104+
transport=ppc.SyncHttpTransport(url, verb),
105+
period=period,
106+
)
107+
return _sync_wrap(client)
108+
109+
92110
#
93111
# decorator AND context manager
94112
#

prometheus_push_client/formats/base.py

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,22 @@ def iter_samples(self):
1616

1717
def format_metrics(self, *metrics):
1818
for metric in metrics:
19+
for line in self.format_header(metric):
20+
yield _enc(line)
21+
1922
for sample in metric.samples:
2023
line = self.format_sample(sample, metric)
21-
if isinstance(line, str):
22-
line = line.encode("utf-8")
23-
yield line
24+
yield _enc(line)
25+
26+
def format_header(self, metric):
27+
return
28+
yield # "empty" generator by default
2429

2530
def format_sample(self, metric, sample):
26-
raise NotImplementedError()
31+
raise NotImplementedError()
32+
33+
34+
def _enc(line):
35+
if isinstance(line, str):
36+
return line.encode("utf-8")
37+
return line

prometheus_push_client/formats/openmetrics.py

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ class OpenMetricsFormat(BaseFormat):
77
https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md
88
"""
99

10+
FMT_DATATYPE = "# TYPE %(measurement)s %(dtype)s"
1011
FMT_DATAPOINT = "%(measurement)s%(tag_set)s %(value)s%(timestamp)s"
1112

1213
ESCAPING = str.maketrans({
@@ -16,7 +17,25 @@ class OpenMetricsFormat(BaseFormat):
1617
})
1718

1819

19-
def format_sample(self, sample, metric):
20+
def format_header(self, metric): # pragma: no cover
21+
# as in generate_latest implemented at:
22+
# https://github.com/prometheus/client_python/blob/master/prometheus_client/exposition.py
23+
24+
mname, mtype = metric.name, metric.type
25+
if metric.type == "counter":
26+
mname = f"{mname}_total"
27+
elif metric.type in ("info", "stateset"):
28+
mtype = "gauge"
29+
elif metric.type == "gaugehistogram":
30+
mtype = "histogram"
31+
32+
yield self.FMT_DATATYPE % dict(measurement=mname, dtype=mtype)
33+
yield self.FMT_DATATYPE % dict(
34+
measurement=f"{metric.name}_created",
35+
dtype="gauge"
36+
)
37+
38+
def format_sample(self, sample, _):
2039
tag_set = ""
2140
if sample.labels:
2241
tags = (f'{k}="{v.translate(self.ESCAPING)}"' for k, v in sample.labels.items())
@@ -26,10 +45,9 @@ def format_sample(self, sample, metric):
2645
if sample.timestamp: # pragma: no cover
2746
ts = " %s" % sample.timestamp
2847

29-
# TODO: TYPE string?
3048
return self.FMT_DATAPOINT % dict(
3149
measurement=sample.name,
3250
tag_set=tag_set,
3351
value=sample.value,
3452
timestamp=ts,
35-
)
53+
)

prometheus_push_client/transports/http.py

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,21 +9,27 @@
99

1010

1111
class BaseHttpTransport:
12-
def __init__(self, url):
12+
def __init__(self, url, verb="POST"):
1313
self._validate()
1414
self.url = url
15+
self.verb = verb
1516
self.session = None
17+
#TODO: basic auth, custom headers, tls ?
1618

1719
def _validate(self): # pragma: no cover
1820
pass
1921

22+
def prepare_data(self, iterable):
23+
data = b"\n".join(iterable)
24+
if data.endswith(b"\n\n"): # pragma: no cover
25+
return data
26+
to_pad = 1 if data[-1] == ord(b"\n") else 2
27+
return data.ljust(len(data) + to_pad, b"\n")
28+
2029
def push_all_sync(self):
2130
raise NotImplementedError("brave proposal")
2231

2332

24-
# TODO: configurable push formats ?
25-
26-
2733
class SyncHttpTransport(BaseHttpTransport):
2834
def _validate(self):
2935
if requests is None:
@@ -36,8 +42,8 @@ def stop(self):
3642
self.session.close()
3743

3844
def push_all(self, iterable):
39-
data = b"\n".join(iterable)
40-
self.session.post(self.url, data=data)
45+
data = self.prepare_data(iterable)
46+
self.session.request(self.verb, self.url, data=data)
4147

4248

4349
class AioHttpTransport(BaseHttpTransport):
@@ -52,6 +58,6 @@ async def stop(self):
5258
await self.session.close()
5359

5460
async def push_all(self, iterable):
55-
data = b"\n".join(iterable)
56-
async with self.session.post(self.url, data=data) as _:
61+
data = self.prepare_data(iterable)
62+
async with self.session.request(self.verb, self.url, data=data) as _:
5763
pass

prometheus_push_client/version.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
__version__ = "0.0.7"
1+
__version__ = "0.0.8"

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
readme_lines = []
1313
with open('README.md') as fd:
1414
readme_lines = filter(None, fd.read().splitlines())
15-
readme_lines = list(readme_lines)[:10]
15+
readme_lines = list(readme_lines)[:12]
1616
readme_lines.append('Read more at [github page](%s).' % github_url)
1717
readme = '\n\n'.join(readme_lines)
1818

test/test_online/conftest.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,15 @@ class Cfg:
1414
statsd_api_port = os.getenv("STATSD_API_PORT", "9102")
1515
statsd_udp_port = os.getenv("STATSD_UDP_PORT", "9125")
1616

17+
pushgw_host = os.getenv("PUSHGW_HOST", "localhost")
18+
pushgw_port = os.getenv("PUSHGW_PORT", "9091")
19+
1720
vm_ping_url = f"http://{vm_host}:{vm_api_port}/health"
1821
vm_write_url = f"http://{vm_host}:{vm_api_port}/write"
1922
vm_export_url = f"http://{vm_host}:{vm_api_port}/api/v1/export"
2023

2124
statsd_url = f"http://{statsd_host}:{statsd_api_port}/metrics"
2225

26+
pushgw_url = f"http://{pushgw_host}:{pushgw_port}/metrics"
27+
2328
return Cfg()

0 commit comments

Comments
 (0)