Skip to content

Commit c64b52f

Browse files
committed
removed service bus and made other related changes.
1 parent 3fe6fce commit c64b52f

File tree

4 files changed

+125
-151
lines changed

4 files changed

+125
-151
lines changed

EventHubs/requirements.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,6 @@ azure-identity==1.15.0
44
azure-core==1.29.6
55
azure-mgmt-resource==23.0.1
66
azure-mgmt-eventhub==11.0.0
7-
azure-servicebus==0.21.1
7+
azure-eventhub==5.11.5
88
azure-mgmt-storage==21.1.0
99
azure-mgmt-loganalytics==12.0.0
Lines changed: 86 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,18 @@
11
import os
22
import sys
33
import json
4-
import time
5-
from time import sleep
6-
from requests import Session
7-
from datetime import datetime
8-
from azure.servicebus import ServiceBusService
4+
from datetime import timedelta
95
from azure.mgmt.resource import ResourceManagementClient
106
from azure.mgmt.eventhub import EventHubManagementClient
7+
from azure.eventhub import EventHubProducerClient
8+
from azure.eventhub.exceptions import EventHubError
9+
from azure.monitor.query import LogsQueryClient, LogsQueryStatus
10+
from azure.mgmt.loganalytics import LogAnalyticsManagementClient
1111

1212
sys.path.insert(0, '../../test_utils')
1313
from basetest import BaseTest
1414

15+
1516
class BaseEventHubTest(BaseTest):
1617

1718
def setUp(self):
@@ -41,69 +42,6 @@ def get_resource(self, restype):
4142
return item
4243
raise Exception("%s Resource Not Found" % (restype))
4344

44-
def get_row_count(self, query):
45-
rows = self.table_service.query_entities(
46-
self.log_table_name, filter="PartitionKey eq 'R2'", select='PartitionKey')
47-
48-
return len(rows.items)
49-
50-
def wait_for_table_results(self, query):
51-
max_retries = 50
52-
while(max_retries > 0 and (not (self.table_service.exists(
53-
self.log_table_name) and self.get_row_count(query) > 0))):
54-
print("waiting for logs creation...", max_retries)
55-
sleep(15)
56-
max_retries -= 1
57-
58-
def insert_mock_logs_in_EventHub(self, filename):
59-
print("Inserting fake logs in EventHub")
60-
namespace_name = self.get_resource_name(self.event_hub_namespace_prefix, "Microsoft.EventHub/namespaces")
61-
62-
defaultauthorule_name = "RootManageSharedAccessKey"
63-
64-
eventhub_client = EventHubManagementClient(self.credentials,
65-
self.subscription_id)
66-
67-
ehkeys = eventhub_client.namespaces.list_keys(
68-
self.RESOURCE_GROUP_NAME, namespace_name, defaultauthorule_name)
69-
70-
sbs = ServiceBusService(
71-
namespace_name,
72-
shared_access_key_name=defaultauthorule_name,
73-
shared_access_key_value=ehkeys.primary_key,
74-
request_session=Session()
75-
)
76-
mock_logs = json.load(open(filename))
77-
print("inserting %s" % (mock_logs))
78-
sbs.send_event(self.eventhub_name, json.dumps(mock_logs))
79-
80-
print("Event inserted")
81-
82-
def insert_mock_metrics_in_EventHub(self, filename):
83-
print("Inserting fake metrics in EventHub")
84-
85-
defaultauthorule_name = "RootManageSharedAccessKey"
86-
namespace_name = self.get_resource_name(self.event_hub_namespace_prefix, "Microsoft.EventHub/namespaces")
87-
eventhub_client = EventHubManagementClient(self.azure_credential, self.subscription_id)
88-
eventhub_keys = eventhub_client.namespaces.list_keys(self.RESOURCE_GROUP_NAME, namespace_name, defaultauthorule_name)
89-
90-
sbs = ServiceBusService(
91-
namespace_name,
92-
shared_access_key_name=defaultauthorule_name,
93-
shared_access_key_value=eventhub_keys.primary_key,
94-
request_session=Session()
95-
)
96-
97-
with open(filename, 'r') as template_file_fd:
98-
mock_logs = json.load(template_file_fd)
99-
mock_logs = json.dumps(mock_logs)
100-
mock_logs = mock_logs.replace("2018-03-07T14:23:51.991Z", datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.%fZ"))
101-
mock_logs = mock_logs.replace("C088DC46", "%d-%s" % (1, str(int(time.time()))))
102-
103-
# print("inserting %s" % (mock_logs))
104-
sbs.send_event(self.eventhub_name, mock_logs)
105-
print("Event inserted")
106-
10745
def _parse_template(self):
10846
template_path = os.path.join(os.path.abspath('..'), 'src',
10947
self.template_name)
@@ -118,3 +56,83 @@ def _parse_template(self):
11856
template_data["parameters"]["location"]["defaultValue"] = self.resourcegroup_location
11957

12058
return template_data
59+
60+
def send_event_data_list(self, event_hub_namespace_prefix, event_hub_name, event_data_list):
61+
62+
defaultauthorule_name = "RootManageSharedAccessKey"
63+
namespace_name = self.get_resource_name(event_hub_namespace_prefix, "Microsoft.EventHub/namespaces")
64+
eventhub_client = EventHubManagementClient(self.azure_credential, self.subscription_id)
65+
eventhub_keys = eventhub_client.namespaces.list_keys(self.RESOURCE_GROUP_NAME, namespace_name, defaultauthorule_name)
66+
67+
producer = EventHubProducerClient.from_connection_string(
68+
conn_str=eventhub_keys.primary_connection_string,
69+
eventhub_name=event_hub_name
70+
)
71+
72+
with producer:
73+
try:
74+
producer.send_batch(event_data_list)
75+
except ValueError: # Size exceeds limit. This shouldn't happen if you check the size beforehand.
76+
print("Size of the event data list exceeds the size limit of a single send")
77+
except EventHubError as eh_err:
78+
print("Sending error: ", eh_err)
79+
80+
print("Event inserted")
81+
82+
def fetchlogs(self, app_insights):
83+
result = []
84+
try:
85+
client = LogsQueryClient(self.azure_credential)
86+
query = f"app('{app_insights}').traces | where operation_Name == '{self.function_name}' | project operation_Id, timestamp, message, severityLevel"
87+
response = client.query_workspace(self.get_Workspace_Id(), query, timespan=timedelta(hours=1))
88+
89+
if response.status == LogsQueryStatus.FAILURE:
90+
raise Exception(f"LogsQueryError: {response.message}")
91+
elif response.status == LogsQueryStatus.PARTIAL:
92+
data = response.partial_data
93+
error = response.partial_error
94+
print("partial_error: ", error)
95+
elif response.status == LogsQueryStatus.SUCCESS:
96+
data = response.tables
97+
98+
for table in data:
99+
for row in table.rows:
100+
row_dict = {str(col): str(item) for col, item in zip(table.columns, row)}
101+
result.append(row_dict)
102+
except Exception as e:
103+
print("An unexpected error occurred during the test:")
104+
print("Exception", e)
105+
106+
return result
107+
108+
def get_resources(self, resource_group_name):
109+
return self.resource_client.resources.list_by_resource_group(resource_group_name)
110+
111+
def get_Workspace_Id(self):
112+
workspace = self.get_resource('microsoft.operationalinsights/workspaces')
113+
client = LogAnalyticsManagementClient(
114+
credential=self.azure_credential,
115+
subscription_id=self.subscription_id,
116+
)
117+
118+
response = client.workspaces.get(
119+
resource_group_name=self.RESOURCE_GROUP_NAME,
120+
workspace_name=workspace.name,
121+
)
122+
return response.customer_id
123+
124+
def filter_logs(self, logs, key, value):
125+
return value in [d.get(key) for d in logs]
126+
127+
def check_resource_count(self):
128+
resource_count = len(list(self.get_resources(self.RESOURCE_GROUP_NAME)))
129+
self.assertTrue(resource_count == self.expected_resource_count, f"resource count of resource group {self.RESOURCE_GROUP_NAME} differs from expected count : {resource_count}")
130+
131+
def check_success_log(self, logs):
132+
self.assertTrue(self.filter_logs(logs, 'message', self.successful_sent_message))
133+
134+
def check_error_log(self, logs):
135+
self.assertTrue(not self.filter_logs(logs, 'severityLevel', '3'))
136+
137+
def check_warning_log(self, logs):
138+
self.assertTrue(not self.filter_logs(logs, 'severityLevel', '2'))

EventHubs/tests/test_eventhub_logs.py

Lines changed: 20 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,20 @@
11
import unittest
2-
import datetime
2+
from datetime import datetime
3+
import time
4+
import json
35
from baseeventhubtest import BaseEventHubTest
6+
from azure.eventhub import EventData
47

58

69
class TestEventHubLogs(BaseEventHubTest):
710

811
def setUp(self):
912
super(TestEventHubLogs, self).setUp()
1013
self.RESOURCE_GROUP_NAME = "TestEventHubLogs-%s" % (
11-
datetime.datetime.now().strftime("%d-%m-%y-%H-%M-%S"))
12-
self.STORAGE_ACCOUNT_NAME = "sumoapplogs"
14+
datetime.now().strftime("%d-%m-%y-%H-%M-%S"))
1315
self.function_name_prefix = "EventHubs_Logs"
1416
self.template_name = 'azuredeploy_logs.json'
1517
self.event_hub_namespace_prefix = "SumoAzureLogsNamespace"
16-
self.log_table_name = "AzureWebJobsHostLogs%d%02d" % (
17-
datetime.datetime.now().year, datetime.datetime.now().month)
1818
self.eventhub_name = 'insights-operational-logs'
1919

2020
def test_pipeline(self):
@@ -24,24 +24,21 @@ def test_pipeline(self):
2424
self.assertTrue(self.resource_group_exists(self.RESOURCE_GROUP_NAME))
2525
self.table_service = self.get_table_service()
2626
self.insert_mock_logs_in_EventHub('activity_log_fixtures.json')
27-
self.check_error_logs()
28-
29-
def check_error_logs(self):
30-
print("sleeping 1min for function execution")
31-
query = "PartitionKey eq 'R2'"
32-
self.wait_for_table_results(query)
33-
self.assertTrue(self.get_row_count(query) > 0)
34-
35-
rows = self.table_service.query_entities(
36-
self.log_table_name, filter=query)
37-
38-
haserr = False
39-
for row in rows.items:
40-
print("LogRow: ", row["FunctionName"], row["HasError"])
41-
if row["FunctionName"].startswith(self.function_name_prefix) and row["HasError"]:
42-
haserr = True
43-
44-
self.assertTrue(not haserr)
27+
28+
def insert_mock_logs_in_EventHub(self, filename):
29+
print("Inserting fake logs in EventHub")
30+
31+
with open(filename, 'r') as template_file_fd:
32+
mock_logs = json.load(template_file_fd)
33+
mock_logs = json.dumps(mock_logs)
34+
mock_logs = mock_logs.replace("2018-03-07T14:23:51.991Z", datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.%fZ"))
35+
mock_logs = mock_logs.replace("C088DC46", "%d-%s" % (1, str(int(time.time()))))
36+
37+
event_data_list = [EventData(mock_logs)]
38+
# print("inserting %s" % (mock_logs))
39+
self.send_event_data_list(self.event_hub_namespace_prefix, self.eventhub_name, event_data_list)
40+
41+
print("Event inserted")
4542

4643

4744
if __name__ == '__main__':
Lines changed: 18 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,10 @@
1-
import sys
21
import unittest
32
import time
4-
from io import StringIO
5-
from datetime import timedelta, datetime
3+
import json
4+
from datetime import datetime
65
from baseeventhubtest import BaseEventHubTest
7-
from azure.monitor.query import LogsQueryClient, LogsQueryStatus
8-
from azure.mgmt.loganalytics import LogAnalyticsManagementClient
6+
from azure.eventhub import EventData
7+
98

109
class TestEventHubMetrics(BaseEventHubTest):
1110

@@ -16,75 +15,35 @@ def setUp(self):
1615
self.event_hub_namespace_prefix = "SMNamespace"
1716
self.eventhub_name = "insights-metrics-pt1m"
1817
self.function_name = "EventHubs_Metrics"
19-
self.resource_count = 8
18+
self.successful_sent_message = 'Sent all metric data to Sumo. Exit now.'
19+
self.expected_resource_count = 7
2020

2121
def test_pipeline(self):
2222
self.create_resource_group()
2323
self.deploy_template()
2424
self.assertTrue(self.resource_group_exists(self.RESOURCE_GROUP_NAME))
2525
self.insert_mock_metrics_in_EventHub('metrics_fixtures.json')
2626
time.sleep(300) # Due to latency, logs are available after few mins.
27-
self.get_resource_count()
27+
self.check_resource_count()
2828
app_insights = self.get_resource('Microsoft.Insights/components')
2929
captured_output = self.fetchlogs(app_insights.name)
3030
self.check_success_log(captured_output)
3131
self.check_error_log(captured_output)
3232
self.check_warning_log(captured_output)
3333

34-
def fetchlogs(self, app_insights):
35-
result = []
36-
try:
37-
client = LogsQueryClient(self.azure_credential)
38-
query = f"app('{app_insights}').traces | where operation_Name == '{self.function_name}' | project operation_Id, timestamp, message, severityLevel"
39-
response = client.query_workspace(self.get_Workspace_Id(), query, timespan=timedelta(hours=1))
40-
41-
if response.status == LogsQueryStatus.PARTIAL:
42-
data = response.partial_data
43-
elif response.status == LogsQueryStatus.SUCCESS:
44-
data = response.tables
45-
46-
for table in data:
47-
for row in table.rows:
48-
row_dict = {str(col): str(item) for col, item in zip(table.columns, row)}
49-
result.append(row_dict)
50-
except Exception as e:
51-
print("An unexpected error occurred during the test:")
52-
print("Exception", e)
53-
54-
return result
55-
56-
def get_resource_count(self):
57-
return len(list(self.resource_client.resources.list_by_resource_group(self.RESOURCE_GROUP_NAME)))
58-
59-
def filter_logs(self, logs, key, value):
60-
return value in [d.get(key) for d in logs]
61-
62-
def check_resource_count(self):
63-
self.assertTrue(self.get_resource_count() == self.resource_count)
64-
65-
def check_success_log(self, logs):
66-
successful_sent_message = 'Sent all metric data to Sumo. Exit now.'
67-
self.assertTrue(self.filter_logs(logs, 'message', successful_sent_message))
68-
69-
def check_error_log(self, logs):
70-
self.assertTrue(not self.filter_logs(logs, 'severityLevel', '3'))
71-
72-
def check_warning_log(self, logs):
73-
self.assertTrue(not self.filter_logs(logs, 'severityLevel', '2'))
34+
def insert_mock_metrics_in_EventHub(self, filename):
35+
print("Inserting fake metrics in EventHub")
36+
37+
with open(filename, 'r') as template_file_fd:
38+
mock_logs = json.load(template_file_fd)
39+
mock_logs = json.dumps(mock_logs)
40+
mock_logs = mock_logs.replace("2018-03-07T14:23:51.991Z", datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.%fZ"))
41+
mock_logs = mock_logs.replace("C088DC46", "%d-%s" % (1, str(int(time.time()))))
7442

75-
def get_Workspace_Id(self):
76-
workspace = self.get_resource('microsoft.operationalinsights/workspaces')
77-
client = LogAnalyticsManagementClient(
78-
credential=self.azure_credential,
79-
subscription_id=self.subscription_id,
80-
)
43+
event_data_list = [EventData(mock_logs)]
44+
# print("inserting %s" % (mock_logs))
45+
self.send_event_data_list(self.event_hub_namespace_prefix, self.eventhub_name, event_data_list)
8146

82-
response = client.workspaces.get(
83-
resource_group_name=self.RESOURCE_GROUP_NAME,
84-
workspace_name=workspace.name,
85-
)
86-
return response.customer_id
87-
8847

8948
if __name__ == '__main__':
9049
unittest.main()

0 commit comments

Comments
 (0)