Skip to content

Commit f7ebeae

Browse files
committed
basic test for openmetrics_http + pushgateway
1 parent 9f8812e commit f7ebeae

File tree

6 files changed

+155
-8
lines changed

6 files changed

+155
-8
lines changed

.github/workflows/tests.yml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,10 @@ jobs:
3131
- 9102:9102
3232
- 9125:9125
3333
- 9125:9125/udp
34+
pushgateway:
35+
image: prom/pushgateway:v1.4.0
36+
ports:
37+
- 9091:9091
3438
steps:
3539
- name: checkout
3640
uses: actions/checkout@v2
@@ -49,6 +53,8 @@ jobs:
4953
STATSD_HOST: localhost
5054
STATSD_API_PORT: 9102
5155
STATSD_UDP_PORT: 9125
56+
PUSHGW_HOST: localhost
57+
PUSHGW_PORT: 9091
5258
- name: upload-cov
5359
uses: codecov/codecov-action@v1
5460
with:

README.md

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,13 @@
33

44
# prometheus-push-client
55

6-
Push metrics from your regular and/or long-running jobs to existing Prometheus/VictoriaMetrics monitoring system.
6+
Push metrics from your periodic long-running jobs to existing Prometheus/VictoriaMetrics monitoring system.
77

8-
Currently supports pushes directly to VictoriaMetrics via UDP and HTTP using InfluxDB line protocol as [described here](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html?highlight=telegraf#how-to-send-data-from-influxdb-compatible-agents-such-as-telegraf).
8+
Currently supports pushes directly to `VictoriaMetrics` via UDP and HTTP using InfluxDB line protocol as [described here](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html?highlight=telegraf#how-to-send-data-from-influxdb-compatible-agents-such-as-telegraf).
99

10-
For pure Prometheus setups, pushes into StatsD/statsd-exporter in StatsD format via UDP are supported ([see exporter docs](https://github.com/prometheus/statsd_exporter#with-statsd)). Prometheus and StatsD metric types are not fully compatible, so currenly all metrics become StatsD gauges, but `rate`, `increase`, `histogram_quantile` and other PromQL functions produce same results as if types never changed.
10+
For `pure Prometheus` setups, several options are supported:
11+
- to [StatsD](https://github.com/statsd/statsd) or [statsd-exporter](https://github.com/prometheus/statsd_exporter#with-statsd) in StatsD format via UDP. Prometheus and StatsD metric types are not fully compatible, so currenly all metrics become StatsD gauges, but `rate`, `increase`, `histogram_quantile` and other PromQL functions produce same results as if types never changed.
12+
- to [pushgateway](https://github.com/prometheus/pushgateway) or [prom-aggregation-gateway](https://github.com/weaveworks/prom-aggregation-gateway) in OpenMetrics format via HTTP. Please read corresponding docs about appropriate use cases and limitations.
1113

1214
Install it via pip:
1315

@@ -74,10 +76,12 @@ Best way to use them is via decorators / context managers. These clients are int
7476
``` python
7577
def influx_udp_async(host, port, period=15.0):
7678
def influx_udp_thread(host, port, period=15.0):
77-
def influx_http_async(url, period=15.0):
78-
def influx_http_thread(url, period=15.0):
7979
def statsd_udp_async(host, port, period=15.0):
8080
def statsd_udp_thread(host, port, period=15.0):
81+
def influx_http_async(url, verb="POST", period=15.0):
82+
def influx_http_thread(url, verb="POST", period=15.0):
83+
def openmetrics_http_async(url, verb="POST", period=15.0):
84+
def openmetrics_http_thread(url, verb="POST", period=15.0):
8185
```
8286

8387
Usage example:
@@ -108,6 +112,8 @@ async def main(urls):
108112
req_hist.labels(gethostname(url)).observe(response.elapsed)
109113
```
110114

115+
Please read about mandatory `job` tag within url while [using pushgateway](https://github.com/prometheus/pushgateway#url).
116+
111117

112118
### Streaming clients
113119

prometheus_push_client/formats/openmetrics.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@ class OpenMetricsFormat(BaseFormat):
1717
})
1818

1919

20-
def format_header(self, metric):
21-
# as in generate_lastest implemented at:
20+
def format_header(self, metric): # pragma: no cover
21+
# as in generate_latest implemented at:
2222
# https://github.com/prometheus/client_python/blob/master/prometheus_client/exposition.py
2323

2424
mname, mtype = metric.name, metric.type

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
readme_lines = []
1313
with open('README.md') as fd:
1414
readme_lines = filter(None, fd.read().splitlines())
15-
readme_lines = list(readme_lines)[:10]
15+
readme_lines = list(readme_lines)[:12]
1616
readme_lines.append('Read more at [github page](%s).' % github_url)
1717
readme = '\n\n'.join(readme_lines)
1818

test/test_online/conftest.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,15 @@ class Cfg:
1414
statsd_api_port = os.getenv("STATSD_API_PORT", "9102")
1515
statsd_udp_port = os.getenv("STATSD_UDP_PORT", "9125")
1616

17+
pushgw_host = os.getenv("PUSHGW_HOST", "localhost")
18+
pushgw_port = os.getenv("PUSHGW_PORT", "9091")
19+
1720
vm_ping_url = f"http://{vm_host}:{vm_api_port}/health"
1821
vm_write_url = f"http://{vm_host}:{vm_api_port}/write"
1922
vm_export_url = f"http://{vm_host}:{vm_api_port}/api/v1/export"
2023

2124
statsd_url = f"http://{statsd_host}:{statsd_api_port}/metrics"
2225

26+
pushgw_url = f"http://{pushgw_host}:{pushgw_port}/metrics"
27+
2328
return Cfg()
Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
import asyncio
2+
from collections import namedtuple
3+
import time
4+
import pytest
5+
import requests
6+
7+
import re
8+
9+
import prometheus_push_client as ppc
10+
11+
from testutils import make_metric_fixture, collect_metrics
12+
13+
14+
NS = "test_pushgw"
15+
JOB = f"job/{NS}"
16+
17+
RX_JOB_TAG = re.compile(r"(job=\".*?\",?)")
18+
RX_INSTANCE_TAG = re.compile(r"(instance=\".*?\",?)")
19+
20+
21+
def get_metrics(cfg):
22+
response = requests.get(cfg.pushgw_url)
23+
content = response.text
24+
25+
assert response.status_code == 200, content
26+
assert len(content), content
27+
28+
return content
29+
30+
31+
def test_ping(cfg):
32+
assert get_metrics(cfg)
33+
34+
35+
36+
@pytest.fixture
37+
def summ1(request):
38+
return make_metric_fixture(request, ppc.Summary(
39+
name="summ1",
40+
namespace=NS,
41+
unit="seconds",
42+
labelnames=["l"],
43+
))
44+
45+
46+
@pytest.fixture
47+
def gauge1(request):
48+
return make_metric_fixture(request, ppc.Gauge(
49+
name="gauge1",
50+
namespace=NS,
51+
unit="meters",
52+
labelnames=["l"],
53+
))
54+
55+
56+
def test_pushgateway_http_sync(cfg, summ1, gauge1):
57+
period = 0.4
58+
sleeps = 2
59+
sleep_sec = 1.0
60+
61+
@ppc.openmetrics_http_thread(f"{cfg.pushgw_url}/{JOB}", period=period)
62+
def _test():
63+
for _ in range(sleeps):
64+
with gauge1.labels("L1").track_inprogress():
65+
with summ1.labels("L2").time():
66+
time.sleep(sleep_sec)
67+
return 1 / 0 # testing graceful stop
68+
69+
with pytest.raises(ZeroDivisionError):
70+
_test()
71+
72+
time.sleep(1.0) # let them sync
73+
74+
local_res = collect_metrics(summ1._name, gauge1._name)
75+
local_res_lines = sorted(local_res.split("\n"))
76+
77+
remote_all = get_metrics(cfg)
78+
remote_res = collect_metrics(summ1._name, gauge1._name, data=remote_all)
79+
80+
remote_res_lines = []
81+
for rem_line in sorted(remote_res.split("\n")):
82+
rem_line = RX_INSTANCE_TAG.sub("", rem_line)
83+
rem_line = RX_JOB_TAG.sub("", rem_line)
84+
remote_res_lines.append(rem_line)
85+
86+
assert len(local_res_lines) == len(remote_res_lines), remote_res
87+
for loc, rem in zip(local_res_lines, remote_res_lines):
88+
assert (
89+
loc == rem or
90+
loc.startswith(f"{rem}.") # pushgw-side float -> int
91+
)
92+
93+
94+
@pytest.mark.asyncio
95+
async def test_pushgateway_http_sync(cfg, summ1, gauge1):
96+
period = 0.4
97+
sleeps = 2
98+
sleep_sec = 1.0
99+
100+
async def _test():
101+
async with ppc.openmetrics_http_async(f"{cfg.pushgw_url}/{JOB}", period=period):
102+
for _ in range(sleeps):
103+
with gauge1.labels("L1").track_inprogress():
104+
with summ1.labels("L2").time():
105+
await asyncio.sleep(sleep_sec)
106+
return 1 / 0 # testing graceful stop
107+
108+
with pytest.raises(ZeroDivisionError):
109+
await _test()
110+
111+
await asyncio.sleep(1.0) # let them sync
112+
113+
local_res = collect_metrics(summ1._name, gauge1._name)
114+
local_res_lines = sorted(local_res.split("\n"))
115+
116+
remote_all = get_metrics(cfg)
117+
remote_res = collect_metrics(summ1._name, gauge1._name, data=remote_all)
118+
119+
remote_res_lines = []
120+
for rem_line in sorted(remote_res.split("\n")):
121+
rem_line = RX_INSTANCE_TAG.sub("", rem_line)
122+
rem_line = RX_JOB_TAG.sub("", rem_line)
123+
remote_res_lines.append(rem_line)
124+
125+
assert len(local_res_lines) == len(remote_res_lines), remote_res
126+
for loc, rem in zip(local_res_lines, remote_res_lines):
127+
assert (
128+
loc == rem or
129+
loc.startswith(f"{rem}.") # pushgw-side float -> int
130+
)

0 commit comments

Comments
 (0)