Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion aws/constants.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
AWS_SDK_PANDAS_LAYER_ARN: str = "arn:aws:lambda:ap-southeast-1:336392948345:layer:AWSSDKPandas-Python311:16"
SSM_PARAMETER_PREFIX: str = "/cdk/stack/mens-t20i-dataset/"
THRESHOLD_FOR_NUMBER_OF_FILES_TO_BE_SENT_FOR_PROCESSING: str = "50"
THRESHOLD_FOR_NUMBER_OF_FILES_TO_BE_SENT_FOR_PROCESSING: str = "10"
18 changes: 13 additions & 5 deletions aws/mens_t20i_dataset_stack.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
RemovalPolicy,
aws_events as events,
aws_events_targets as events_targets,
aws_s3_notifications as s3_notifications,
)
import boto3
from constructs import Construct
Expand All @@ -20,11 +21,11 @@ class MenT20IDatasetStack(Stack):
def __init__(
self,
scope: Construct,
construct_id: str,
stack_name: str,
cricsheet_data_downloading_bucket_name: str,
**kwargs
) -> None:
super().__init__(scope, construct_id, **kwargs)
super().__init__(scope, stack_name, **kwargs)
self._secret_manager_client = boto3.client("secretsmanager")

# S3 bucket for downloading data from Cricsheet
Expand All @@ -38,8 +39,8 @@ def __init__(

######################################## DYNAMODB CONFIGURATIONS ################################################
dynamodb_to_store_file_status_data = dynamodb.Table(
self, "json_file_data_extraction_status_table",
table_name="json_file_data_extraction_status_table",
self, f"{stack_name}-cricsheet_json_file_data_extraction_status_table",
table_name=f"{stack_name}-cricsheet_json_file_data_extraction_status_table",
partition_key=dynamodb.Attribute(
name="file_name",
type=dynamodb.AttributeType.STRING
Expand All @@ -57,7 +58,7 @@ def __init__(
# Lambda layer containing the necessary code and packages
package_layer = _lambda.LayerVersion(
self,
"MensT20IDataCollectorLayer",
f"{stack_name}-MensT20IDataCollectorLayer",
code=_lambda.Code.from_asset("output/mens_t20i_data_collector.zip"),
compatible_runtimes=[_lambda.Runtime.PYTHON_3_11],
description="Layer containing the necessary code and packages for collecting men's T20I data",
Expand Down Expand Up @@ -270,12 +271,19 @@ def __init__(
function_name="upload-dataset-to-kaggle-lambda",
layers=[
package_layer,
pandas_layer,
],
memory_size=300,
timeout=Duration.minutes(10),
)
# Permissions for lambda functions to the S3 bucket
cricsheet_data_downloading_bucket.grant_read(upload_dataset_to_kaggle_lambda)
# S3 bucket notification for the upload_dataset_to_kaggle_lambda
cricsheet_data_downloading_bucket.add_event_notification(
s3.EventType.OBJECT_CREATED,
s3_notifications.LambdaDestination(upload_dataset_to_kaggle_lambda),
s3.NotificationKeyFilter(prefix="output/", suffix="deliverywise_data.csv"),
)
# Policy for CloudWatch logging
upload_dataset_to_kaggle_lambda.add_to_role_policy(
iam.PolicyStatement(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ def _seggregate_new_files_from_downloaded_zip(self) -> List:
if file.endswith(".json"):
if file not in processed_files:
new_files.append(file)
logger.info(f"Newly downloaded files: {new_files}")
logger.info(f"Total newly downloaded files: {len(new_files)}")
return new_files

def _upload_new_json_files_to_s3_and_send_sns_notification(self, new_files: List):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,12 @@ def _get_match_data_of_given_match_id_and_store_in_dynamodb(self, json_data: Dic
"team_2": teams[1] if len(teams) > 1 else None,
"toss_winner": info.get('toss', {}).get('winner'),
"toss_decision": info.get('toss', {}).get('decision'),
"team_1_total_runs": self._get_total_runs_scored_by_given_team(json_data, teams[0]),
"team_2_total_runs": self._get_total_runs_scored_by_given_team(json_data, teams[1]) if len(teams) > 1 else None,
"winner": info.get('outcome', {}).get('winner') or info.get('outcome', {}).get('result'),
"margin_runs": info.get('outcome', {}).get('by', {}).get('runs'),
"margin_wickets": info.get('outcome', {}).get('by', {}).get('wickets'),
"winning_method": info.get('outcome', {}).get('method'),
"player_of_the_match": info.get('player_of_match', [None])[0]
}
self._store_dataframe_in_mongodb(match_data)
Expand All @@ -79,12 +82,27 @@ def _get_match_data_of_given_match_id_and_store_in_dynamodb(self, json_data: Dic
status=True
)

def _get_total_runs_scored_by_given_team(self, json_data: Dict, team_name: str) -> int:
"""
Calculates the total runs scored by a given team.
:param json_data: The JSON data containing match information
:param team_name: The name of the team
:return: The total runs scored by the team
"""
total_runs = 0
for inning in json_data.get('innings', []):
if inning.get('team') == team_name:
for over in inning.get('overs', []):
for delivery in over.get('deliveries', []):
total_runs += int(delivery.get('runs', {}).get('total', 0))
return total_runs

def _store_dataframe_in_mongodb(self, match_data: Dict) -> None:
"""
Stores the match dataframe in MongoDB.
"""
match_data['_id'] = match_data['index']
logger.info(f"Inserting match data for match {match_data['index']} in MongoDB...")
match_data['_id'] = match_data['match_id']
logger.info(f"Inserting match data for match {match_data['match_id']} in MongoDB...")
try:
self._matchwise_data_mongo_collection.insert_one(match_data)
logger.info("Data stored in MongoDB successfully")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import os
import tempfile
import boto3
import pandas as pd
from mens_t20i_data_collector._lambdas.constants import (
CRICSHEET_DATA_S3_OUTPUT_FOLDER,
DELIVERYWISE_DATA_CSV_FILE_NAME,
Expand Down Expand Up @@ -53,10 +54,14 @@ def _authenticate_to_kaggle_and_upload_dataset(self):
api.authenticate()
logger.info("Kaggle authentication successful")
logger.info("Uploading dataset to Kaggle...")
last_match_details = self._get_last_match_details()
team_1 = last_match_details["team_1"]
team_2 = last_match_details["team_2"]
date = last_match_details["date"]
api.dataset_create_version(
delete_old_versions=True,
folder=self._folder_to_keep_the_files_to_upload,
version_notes="Mens T20I Dataset",
version_notes=f"Dataset updated till the match between {team_1} and {team_2} on {date}",
)
logger.info("Dataset uploaded to Kaggle successfully")
except Exception as e:
Expand Down Expand Up @@ -108,6 +113,18 @@ def _download_dataset_files_from_s3(self):
)
logger.info("Dataset files downloaded from S3")

def _get_last_match_details(self):
    """
    Loads the matchwise CSV from the local upload folder and returns its
    final row as a plain dict (presumably the most recent match — the CSV
    is assumed to be ordered chronologically; verify against the producer).
    """
    logger.info("Getting last match details...")
    csv_path = os.path.join(
        self._folder_to_keep_the_files_to_upload, MATCHWISE_DATA_CSV_FILE_NAME
    )
    last_row = pd.read_csv(csv_path).iloc[-1]
    logger.info(f"Last match details: {last_row}")
    return last_row.to_dict()


@exception_handler # noqa: Vulture
def handler(_, __):
Expand Down