Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 6 additions & 11 deletions aws/app.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,14 @@
#!/usr/bin/env python3
import boto3
import aws_cdk as cdk
from constants import SSM_PARAMETER_PREFIX
from utils import get_parameter_from_ssm
from parameters import (
account_id,
cricsheet_data_downloading_bucket_name,
region,
stack_name,
)
from mens_t20i_dataset_stack import MenT20IDatasetStack


# Fetch values from SSM
ssm = boto3.client('ssm')
account_id = get_parameter_from_ssm(ssm, f"{SSM_PARAMETER_PREFIX}account_id")
cricsheet_data_downloading_bucket_name = get_parameter_from_ssm(ssm, f"{SSM_PARAMETER_PREFIX}cricsheet_data_downloading_bucket")
region = get_parameter_from_ssm(ssm, f"{SSM_PARAMETER_PREFIX}aws_region")
stack_name = get_parameter_from_ssm(ssm, f"{SSM_PARAMETER_PREFIX}stack_name")


app = cdk.App()
env = cdk.Environment(account=account_id, region=region)

Expand Down
11 changes: 11 additions & 0 deletions aws/mens_t20i_dataset_stack.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import boto3
from constructs import Construct
from constants import AWS_SDK_PANDAS_LAYER_ARN, THRESHOLD_FOR_NUMBER_OF_FILES_TO_BE_SENT_FOR_PROCESSING
from parameters import TELEGRAM_BOT_TOKEN, TELEGRAM_CHAT_ID
from utils import get_secret_from_secrets_manager


Expand Down Expand Up @@ -90,6 +91,8 @@ def __init__(
"DOWNLOAD_BUCKET_NAME": cricsheet_data_downloading_bucket.bucket_name,
"DYNAMODB_TABLE_NAME": dynamodb_to_store_file_status_data.table_name,
"THRESHOLD_FOR_NUMBER_OF_FILES_TO_BE_SENT_FOR_PROCESSING": THRESHOLD_FOR_NUMBER_OF_FILES_TO_BE_SENT_FOR_PROCESSING,
"TELEGRAM_BOT_TOKEN": TELEGRAM_BOT_TOKEN,
"TELEGRAM_CHAT_ID": TELEGRAM_CHAT_ID,
},
function_name="cricsheet-data-downloading-lambda",
layers=[
Expand Down Expand Up @@ -140,6 +143,8 @@ def __init__(
"DOWNLOAD_BUCKET_NAME": cricsheet_data_downloading_bucket.bucket_name,
**__db_secrets,
"DYNAMODB_TABLE_NAME": dynamodb_to_store_file_status_data.table_name,
"TELEGRAM_BOT_TOKEN": TELEGRAM_BOT_TOKEN,
"TELEGRAM_CHAT_ID": TELEGRAM_CHAT_ID,
},
function_name="cricsheet-deliverywise-data-extraction-lambda",
layers=[
Expand Down Expand Up @@ -201,6 +206,8 @@ def __init__(
"DOWNLOAD_BUCKET_NAME": cricsheet_data_downloading_bucket.bucket_name,
**__db_secrets,
"DYNAMODB_TABLE_NAME": dynamodb_to_store_file_status_data.table_name,
"TELEGRAM_BOT_TOKEN": TELEGRAM_BOT_TOKEN,
"TELEGRAM_CHAT_ID": TELEGRAM_CHAT_ID,
},
function_name="cricsheet-matchwise-data-extraction-lambda",
layers=[
Expand Down Expand Up @@ -261,6 +268,8 @@ def __init__(
environment={
"DOWNLOAD_BUCKET_NAME": cricsheet_data_downloading_bucket.bucket_name,
**__db_secrets,
"TELEGRAM_BOT_TOKEN": TELEGRAM_BOT_TOKEN,
"TELEGRAM_CHAT_ID": TELEGRAM_CHAT_ID,
},
function_name="convert-mongo-data-to-csv-lambda",
layers=[
Expand Down Expand Up @@ -301,6 +310,8 @@ def __init__(
environment={
"DOWNLOAD_BUCKET_NAME": cricsheet_data_downloading_bucket.bucket_name,
**__kaggle_secrets,
"TELEGRAM_BOT_TOKEN": TELEGRAM_BOT_TOKEN,
"TELEGRAM_CHAT_ID": TELEGRAM_CHAT_ID,
},
function_name="upload-dataset-to-kaggle-lambda",
layers=[
Expand Down
13 changes: 13 additions & 0 deletions aws/parameters.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import boto3
from constants import SSM_PARAMETER_PREFIX
from utils import get_parameter_from_ssm


# Deployment parameters are read from SSM once, when this module is imported.
# The parameter names below are listed in the same order as the variables they
# are unpacked into; keep the two sequences aligned when adding a new one.
ssm = boto3.client('ssm')
(
    account_id,
    cricsheet_data_downloading_bucket_name,
    region,
    stack_name,
    TELEGRAM_BOT_TOKEN,
    TELEGRAM_CHAT_ID,
) = (
    get_parameter_from_ssm(ssm, parameter_name)
    for parameter_name in (
        f"{SSM_PARAMETER_PREFIX}account_id",
        f"{SSM_PARAMETER_PREFIX}cricsheet_data_downloading_bucket",
        f"{SSM_PARAMETER_PREFIX}aws_region",
        f"{SSM_PARAMETER_PREFIX}stack_name",
        f"{SSM_PARAMETER_PREFIX}TELEGRAM_BOT_TOKEN",
        f"{SSM_PARAMETER_PREFIX}TELEGRAM_CHAT_ID",
    )
)
14 changes: 14 additions & 0 deletions src/mens_t20i_data_collector/_lambdas/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,17 @@
"fielder_name"
]
MATCHWISE_DATA_CSV_FILE_NAME: str = "matchwise_data.csv"
# Status-alert body rendered with str.format(); it contains <b> markup, so it
# is meant to be sent as an HTML-formatted message.
# The five positional "{}" placeholders are filled in this order:
#   1. heading suffix (the visible caller passes the run date)
#   2. timestamp (the visible caller passes the time of day)
#   3. Lambda name
#   4. execution status
#   5. free-text message
# NOTE(review): substituted values are inserted as-is; confirm callers never
# pass text containing "<" or "&", which would presumably break HTML parsing.
TELEGRAM_MESSAGE_TEMPLATE: str = """
<b>🏏 T20I Data Extraction Pipeline Status - {}</b>


<b> Timestamp :</b> {}

<b> Lambda Name :</b> {}

<b> Execution Status :</b> {}

<b> Message :</b> {}

-- Automated Message from T20I Data Extraction Pipeline --
"""
Original file line number Diff line number Diff line change
Expand Up @@ -82,4 +82,4 @@ def handler(_, __): # noqa: Vulture
"""
dataset_preparation_handler = DatasetPreparationHandler()
dataset_preparation_handler.prepare_dataset()
return "Datasets prepared and uploaded successfully."
return "Datasets prepared and uploaded to S3 successfully."
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
CRICSHEET_DATA_S3_FOLDER_TO_STORE_PROCESSED_JSON_FILES_ZIP
)
from mens_t20i_data_collector._lambdas.utils import (
exception_handler,
get_environmental_variable_value
)

Expand Down Expand Up @@ -72,6 +73,9 @@ def upload_new_json_data_files_for_data_processing(self, downloaded_zip_file_pat
self._upload_new_json_files_to_s3(new_files=new_files)
if new_files:
self._trigger_an_sqs_message_whenever_new_file_is_downloaded(new_files=new_files)
return "Data file has been downloaded and placed successfully for processing"
logger.info("No new files to process")
return "No new files to process"

def _list_all_files_from_dynamo_db(self) -> Set:
response = self._dynamo_db_to_store_file_data_extraction_status.scan(ProjectionExpression="file_name")
Expand Down Expand Up @@ -115,21 +119,10 @@ def _upload_new_json_files_to_s3(self, new_files: List):
logger.info(f"File {file} uploaded to {key}")


def handler(_, __):  # noqa: Vulture
    """
    Lambda entry point: download the Cricsheet archive and queue new files.

    Both positional arguments are ignored. Returns a dict with ``statusCode``
    200 and a fixed success body, or ``statusCode`` 500 and the error text
    when any step raises. Logging is shut down before returning on either path.
    """
    try:
        cricsheet_downloader = DownloadDataFromCricsheetHandler()
        cricsheet_downloader.upload_new_json_data_files_for_data_processing(
            cricsheet_downloader.download_data_from_cricsheet()
        )
        logging.shutdown()
        response = {
            "statusCode": 200,
            "body": "Data downloaded and placed successfully for processing"
        }
    except Exception as error:  # pylint: disable=broad-exception-caught
        logger.error(f"Handler execution failed: {error}")
        logging.shutdown()
        response = {
            "statusCode": 500,
            "body": f"Internal Server Error: {str(error)}"
        }
    return response
@exception_handler  # noqa: Vulture
def handler(_, __):
    """
    Lambda entry point: download the Cricsheet archive and queue new files.

    Both positional arguments are ignored. The status text produced by the
    upload step is returned unchanged; errors are left to the
    ``exception_handler`` decorator.
    """
    cricsheet_downloader = DownloadDataFromCricsheetHandler()
    status_message = cricsheet_downloader.upload_new_json_data_files_for_data_processing(
        cricsheet_downloader.download_data_from_cricsheet()
    )
    logging.shutdown()
    return status_message
52 changes: 52 additions & 0 deletions src/mens_t20i_data_collector/_lambdas/utils.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
import datetime
import functools
import logging
import os
from typing import Any
import requests
from botocore.exceptions import ClientError
from mens_t20i_data_collector._lambdas.constants import (
TELEGRAM_MESSAGE_TEMPLATE
)

# Set up logging
logger = logging.getLogger()
Expand All @@ -15,14 +20,42 @@ def exception_handler(function):
"""
@functools.wraps(function)
def wrapper(*args, **kwargs):
current_time = datetime.datetime.now()
function_name = "unknown_function"
for arg in args:
if hasattr(arg, "function_name"):
function_name = arg.function_name
break
try:
response_body = function(*args, **kwargs)
send_alert_via_telegram_bot(
chat_id=get_environmental_variable_value("TELEGRAM_CHAT_ID"),
message=TELEGRAM_MESSAGE_TEMPLATE.format(
current_time.strftime("%d-%m-%Y"),
current_time.strftime("%H:%M:%S"),
function_name,
"SUCCESS ✅",
response_body
),
telegram_bot_token=get_environmental_variable_value("TELEGRAM_BOT_TOKEN")
)
return {
"statusCode": 200,
"body": response_body
}
except Exception as e: # pylint: disable=broad-exception-caught
logger.error(f"Error occurred: {str(e)}", exc_info=True)
send_alert_via_telegram_bot(
chat_id=get_environmental_variable_value("TELEGRAM_CHAT_ID"),
message=TELEGRAM_MESSAGE_TEMPLATE.format(
current_time.strftime("%d-%m-%Y"),
current_time.strftime("%H:%M:%S"),
function_name,
"ERROR ❌",
str(e)
),
telegram_bot_token=get_environmental_variable_value("TELEGRAM_BOT_TOKEN")
)
return {
"statusCode": 500,
"body": f"Internal Server Error: {str(e)}"
Expand Down Expand Up @@ -71,3 +104,22 @@ def wrapper(event, _):
return function(json_file_key, match_id)

return wrapper


def send_alert_via_telegram_bot(chat_id: str, message: str, telegram_bot_token: str) -> None:
    """
    Sends the status of the function execution through an alert to a Telegram chat.

    Delivery is best effort: a transport failure or a non-200 reply is logged
    and never raised, so a broken alert channel cannot change the outcome of
    the function that is being reported on.

    :param chat_id: Chat ID
    :param message: Message to send in HTML format
    :param telegram_bot_token: Telegram bot token
    """
    url = f"https://api.telegram.org/bot{telegram_bot_token}/sendMessage"
    payload = {
        "chat_id": chat_id,
        "text": message,
        "parse_mode": "HTML"
    }
    try:
        response = requests.post(url, json=payload, timeout=10)
    except requests.RequestException as error:
        # Log only the exception type: the text of a requests error can embed
        # the request URL, and that URL contains the bot token.
        logger.error(f"Failed to send message: {type(error).__name__}")
        return
    if response.status_code != 200:
        logger.error(f"Failed to send message: {response.text}")