diff --git a/.gitignore b/.gitignore index f60797b..d033567 100644 --- a/.gitignore +++ b/.gitignore @@ -2,6 +2,12 @@ !jest.config.js *.d.ts node_modules +*.swp +package-lock.json +__pycache__ +.pytest_cache +.env +*.egg-info # CDK asset staging directory .cdk.staging diff --git a/DeployChemAxonCompRegEnv.sh b/DeployChemAxonCompRegEnv.sh new file mode 100644 index 0000000..b8da2a4 --- /dev/null +++ b/DeployChemAxonCompRegEnv.sh @@ -0,0 +1,68 @@ +#!/bin/bash +set -e -x + +# Browse through the required directory +cd chem-axon-setup + +# Read Arguments +export EnvironVarLower=$1 +export AwsProfile=$2 + +# Install packages +pip3 install -r requirements.txt + +## Initialise Variables +source configs/deploy_config.env ${EnvironVarLower} + +## Touch credentials file as cdk module has a dependency on it. +## Its empty in this case and is only required for cdk module to work. +touch ~/.aws/credentials + +# Function to build compound reg docker image and push it to AWS ECR Repo +build_and_push_comp_reg_image(){ + + # Build the docker image locally + docker build --rm -t ${AccountId}.dkr.ecr.${AwsRegion}.amazonaws.com/${WorkflowEcrRepository}:${WorkflowCompRegImage} \ + --build-arg="PYTHON_VERSION=3.7" \ + --build-arg="UBUNTU_VERSION=18.04" \ + --build-arg="ORACLE_VERSION=12.2.0.1.0" \ + --build-arg="ORACLE_ZIP_INTERNAL_FOLDER=instantclient_12_2" compound_reg_pipeline/ + + # Push the image on AWS ECR + docker push ${AccountId}.dkr.ecr.${AwsRegion}.amazonaws.com/${WorkflowEcrRepository}:${WorkflowCompRegImage} + +} + +# Check for profile and if not passed, use default +if [[ ${AwsProfile} == '' ]] +then + #Get Account ID + aws configure set region ${AwsRegion} + export AccountId=$(aws sts get-caller-identity --output text --query 'Account') + + #ECR Login for pushing Docker Image + $(aws ecr get-login --no-include-email --region ${AwsRegion}) + + cdk deploy gfb-datalake-batch-stack --require-approval never + cdk deploy gfb-datalake-batch-job-stack --require-approval never + build_and_push_comp_reg_image + cdk deploy gfb-datalake-secret-manager-stack --require-approval never + cdk deploy gfb-datalake-lambda-stack --require-approval never + cdk deploy gfb-datalake-glue-stack --require-approval never + +else + #Get Account ID + aws configure set region ${AwsRegion} --profile ${AwsProfile} + export AccountId=$(aws sts get-caller-identity --output text --query 'Account' --profile ${AwsProfile}) + + #ECR Login for pushing Docker Image + $(aws ecr get-login --no-include-email --region ${AwsRegion} --profile ${AwsProfile}) + + cdk deploy gfb-datalake-batch-stack --require-approval never --profile ${AwsProfile} + cdk deploy gfb-datalake-batch-job-stack --require-approval never --profile ${AwsProfile} + build_and_push_comp_reg_image + cdk deploy gfb-datalake-secret-manager-stack --require-approval never --profile ${AwsProfile} + cdk deploy gfb-datalake-lambda-stack --require-approval never --profile ${AwsProfile} + cdk deploy gfb-datalake-glue-stack --require-approval never --profile ${AwsProfile} + +fi \ No newline at end of file diff --git a/chem-axon-setup/README.md b/chem-axon-setup/README.md new file mode 100644 index 0000000..6c56861 --- /dev/null +++ b/chem-axon-setup/README.md @@ -0,0 +1,62 @@ +

+# ChemAxon Compound Registration DB to AWS Data Lake
+
+This repository holds the codebase for creating the AWS infrastructure and the ETL job that loads data from an existing Compound Registration database into S3 on a schedule.
+
+### Pre-requisites
+
+- An AWS RDS instance that holds the Compound Registration data.
+
+- AWS details such as VPC, subnets, AZs and RDS connection details filled into configs/deploy_config.env for each environment.
+
+- Python and the AWS CDK installed.
+
+### Steps to be executed
+
+- Download the codebase locally.
+- Ensure the AWS profiles are set up for use.
+- Fill in the details in configs/deploy_config.env.
+- Start the deployment by running:
+
+  ```bash
+  # env is the environment name used in configs/deploy_config.env, in lower case.
+  # aws_profile is the profile to be used. If no profile is set explicitly, provide default.
+  sh DeployChemAxonCompRegEnv.sh <env> <aws_profile>
+  ```
+
+### What will be set up?
+
+When DeployChemAxonCompRegEnv.sh is executed, it gathers the variables from configs/deploy_config.env for the environment passed and exports them to the shell. It then creates the objects below, in the order listed.
+
+- AWS Batch infrastructure: creates a Compute Environment and a Job Queue, along with the EC2 security group and the IAM roles and policies required.
+
+- AWS ECR and Batch job: creates the ECR repository and the AWS Batch job definition.
+
+- Docker image: builds a Docker image from the compound_reg_pipeline folder as per its Dockerfile. It currently assumes the Comp-Reg RDS is Oracle and therefore installs the Oracle dependencies needed by the ETL code. The actual ETL job is comp_reg_data_load.py, which is invoked at the required frequency.
+
+  This image is then pushed to the ECR repository.
+
+- AWS Secrets Manager: creates a secret with the Comp Reg RDS credentials. This secret is then used by the ETL.
+
+- AWS S3 bucket and Lambda: creates an S3 bucket used for data loading, along with a Lambda function that triggers the ETL. The Lambda function code can be found in chem-axon-setup/lambdas/trigger_compound_reg_pipeline.py.
+
+  It currently uses an S3 event trigger, but that can be changed to any other trigger of choice.
+
+- AWS Glue: creates a Glue database and a table on top of the S3 bucket data for querying through Athena.
+
+
+### ETL Process
+
+The script chem-axon-setup/compound_reg_pipeline/comp_reg_data_load.py, when triggered, performs the following steps (a minimal sketch of the checksum comparison follows the list):
+
+- Queries the latest data from the Comp Reg DB into a pandas DataFrame.
+- Loads all data already present in S3 into another pandas DataFrame.
+- Compares the two DataFrames using checksums and builds a new DataFrame containing only new or updated records.
+- Writes this new DataFrame to a new date-based S3 partition in Parquet format.
+  (The partition layout is user-specific and depends on the frequency of execution.)
+- If no new data is detected, it simply exits.
+- The logs are available in CloudWatch and can be found under AWS Batch -> Jobs dashboard -> Specific Job ID details.
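As referenced at the end of the ETL steps, here is a minimal, illustrative sketch of the checksum-based delta detection (not part of the diff; column names and values are illustrative, and the authoritative implementation is chem-axon-setup/compound_reg_pipeline/comp_reg_data_load.py):

```python
import hashlib

import pandas as pd


def row_checksum(df: pd.DataFrame) -> pd.Series:
    """Join every column of each row as a string and MD5-hash the result."""
    joined = df.astype(str).apply("".join, axis=1)
    return joined.map(lambda s: hashlib.md5(s.encode("utf-8")).hexdigest())


def detect_new_records(latest_df: pd.DataFrame, history_df: pd.DataFrame) -> pd.DataFrame:
    """Return only the rows of latest_df whose checksum has not been loaded before."""
    latest_df = latest_df.copy()
    latest_df["CHECKSUM"] = row_checksum(latest_df)
    seen = history_df["CHECKSUM"] if "CHECKSUM" in history_df else pd.Series(dtype=str)
    return latest_df[~latest_df["CHECKSUM"].isin(seen)]


if __name__ == "__main__":
    # Hypothetical history already loaded to S3 (one compound) and a fresh pull
    # from Oracle (the same compound plus a new one).
    history = pd.DataFrame({"LOT_COMPOUND_ID": ["LOT-0001"], "SMILES": ["CCO"]})
    history["CHECKSUM"] = row_checksum(history)

    latest = pd.DataFrame({"LOT_COMPOUND_ID": ["LOT-0001", "LOT-0002"],
                           "SMILES": ["CCO", "c1ccccc1"]})

    delta = detect_new_records(latest, history)
    # Only LOT-0002 survives; in the pipeline this delta is written to
    # s3://<bucket>/compound_reg/compound_data/dt=<YYYY-MM-DD>/data.parquet
    print(delta)
```

Because the checksum is computed over every column, a change to any attribute of an existing record produces a new checksum, so updates surface as new rows in the next date partition rather than as in-place overwrites.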
diff --git a/chem-axon-setup/__init__.py b/chem-axon-setup/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/chem-axon-setup/app.py b/chem-axon-setup/app.py new file mode 100644 index 0000000..2fa1fcb --- /dev/null +++ b/chem-axon-setup/app.py @@ -0,0 +1,58 @@ +#!/usr/bin/env python3 + +from aws_cdk import core +import os +from data_lake_setup.datalake_batch_stack import DatalakeBatchStack +from data_lake_setup.datalake_batch_job_def_stack import DatalakeBatchJobStack +from data_lake_setup.datalake_secret_manager_stack import DatalakeSecretManagerStack +from data_lake_setup.datalake_lambda_stack import DatalakeLambdaStack +from data_lake_setup.datalake_glue_stack import DatalakeGlueStack + +""" Define your account id to make import vpc work """ +env_cn = core.Environment(account=os.environ.get("AccountId"), region=os.environ.get("AwsRegion")) + +""" Initialising environment variables and creating a dictionary to pass""" +config_dict = {} +config_dict['env_var'] = os.environ.get("EnvironVarLower") +config_dict['vpc_id'] = os.environ.get("VpcId") +config_dict['SubnetIds'] = os.environ.get("SubnetIds") +config_dict['AvailabilityZones'] = os.environ.get("AvailabilityZones") +config_dict['workflow_ecr_repo'] = os.environ.get("WorkflowEcrRepository") +config_dict['datalake_bucket_name'] = "datalake-" + config_dict['env_var'].lower() +config_dict['datalake_db_name'] = "datalake_db" +config_dict['workflow_comp_reg_image_version'] = os.environ.get("WorkflowCompRegImage") +config_dict['comp_reg_secret_name'] = os.environ.get("CompRegSecretName") +config_dict['comp_reg_host_name'] = os.environ.get("CompRegHostName") +config_dict['comp_reg_port'] = os.environ.get("CompRegPort") +config_dict['comp_reg_db_name'] = os.environ.get("CompRegDBName") +config_dict['comp_reg_user_name'] = os.environ.get("CompRegUserName") +config_dict['comp_reg_password'] = os.environ.get("CompRegPassword") + +""" Sample config_dict would look like below : +config_dict = { + 'env_var': 'prod', + 'vpc_id': 'vpc-01234567', + 'SubnetIds': 'subnet-01234567,subnet-0123456789', + 'AvailabilityZones': 'us-east-1a,us-east-1b', + 'workflow_ecr_repo': 'datalake-repo', + 'datalake_bucket_name': 'datalake-prod', + 'datalake_db_name': 'datalake_db', + 'workflow_comp_reg_image_version': 'comp-reg-1.0', + 'comp_reg_secret_name': 'CompRegConn', + 'comp_reg_host_name': 'db_endpoint_host_name', + 'comp_reg_port': 'db_port', + 'comp_reg_db_name': 'db_name', + 'comp_reg_user_name': 'db_user', + 'comp_reg_password': 'db_pass' +} +""" + +""" Start execution of deployment """ +app = core.App() +DatalakeBatchStack(app, "datalake-batch-stack", config_dict, env=env_cn) +DatalakeBatchJobStack(app, "datalake-batch-job-stack", config_dict, env=env_cn) +DatalakeSecretManagerStack(app, "datalake-secret-manager-stack", config_dict, env=env_cn) +DatalakeLambdaStack(app, "datalake-lambda-stack", config_dict, env=env_cn) +DatalakeGlueStack(app, "datalake-glue-stack", config_dict, env=env_cn) + +app.synth() diff --git a/chem-axon-setup/cdk.json b/chem-axon-setup/cdk.json new file mode 100644 index 0000000..b4baa10 --- /dev/null +++ b/chem-axon-setup/cdk.json @@ -0,0 +1,3 @@ +{ + "app": "python3 app.py" +} diff --git a/chem-axon-setup/compound_reg_pipeline/Dockerfile b/chem-axon-setup/compound_reg_pipeline/Dockerfile new file mode 100644 index 0000000..fce399c --- /dev/null +++ b/chem-axon-setup/compound_reg_pipeline/Dockerfile @@ -0,0 +1,41 @@ +ARG PYTHON_VERSION +ARG UBUNTU_VERSION +FROM ubuntu:${UBUNTU_VERSION} AS client +ARG ORACLE_VERSION 
+ARG ORACLE_ZIP_INTERNAL_FOLDER +WORKDIR /root +ENV CLIENT_ZIP=instantclient-basiclite-linux.x64-${ORACLE_VERSION}.zip +ENV SDK_ZIP=instantclient-sdk-linux.x64-${ORACLE_VERSION}.zip + +RUN apt-get update && apt-get -yq install unzip +COPY ${CLIENT_ZIP} . +COPY ${SDK_ZIP} . +RUN unzip ${CLIENT_ZIP} +RUN unzip ${SDK_ZIP} +RUN mv ${ORACLE_ZIP_INTERNAL_FOLDER} oracle + +FROM python:${PYTHON_VERSION} +LABEL maintainer=spate@goldficnhbio.com +ARG ORACLE_VERSION +ENV HOME /root +ENV ORACLE_HOME /opt/oracle +ENV TNS_ADMIN ${ORACLE_HOME}/network/admin +VOLUME ["${TNS_ADMIN}"] + +COPY --from=client /root/oracle ${ORACLE_HOME} +RUN apt-get update \ + && apt-get -yq install libaio1 \ + && apt-get -yq install vim \ + && apt-get -yq autoremove \ + && apt-get clean \ + # Install Oracle Instant Client + && echo ${ORACLE_HOME} > /etc/ld.so.conf.d/oracle.conf \ + && mkdir -p ${TNS_ADMIN} \ + && ldconfig \ + && rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* + +RUN pip install numpy pandas boto3 s3fs fastparquet mypy_extensions psutil awscli toolz dask cx_Oracle + +RUN mkdir /scripts +WORKDIR /scripts +ADD comp_reg_data_load.py /scripts \ No newline at end of file diff --git a/chem-axon-setup/compound_reg_pipeline/comp_reg_data_load.py b/chem-axon-setup/compound_reg_pipeline/comp_reg_data_load.py new file mode 100644 index 0000000..e8c287c --- /dev/null +++ b/chem-axon-setup/compound_reg_pipeline/comp_reg_data_load.py @@ -0,0 +1,271 @@ +import cx_Oracle +import os +import pandas as pd +import boto3 +import base64 +from botocore.exceptions import ClientError +import ast +import numpy as np +import hashlib +import sys +import logging +from datetime import datetime +import socket + +""" +Create a logging function and initiate it. +""" +format_string = "%(asctime)s %(name)s [%(levelname)s] %(message)s" +logger = logging.getLogger('compound-registration-process') +handler = logging.StreamHandler() +logger.setLevel(logging.DEBUG) +formatter = logging.Formatter(format_string) +handler.setFormatter(formatter) +logger.addHandler(handler) + +def get_secret(secret_name, region_name): + + # In this sample we only handle the specific exceptions for the 'GetSecretValue' API. + # See https://docs.aws.amazon.com/secretsmanager/latest/apireference/API_GetSecretValue.html + # We rethrow the exception by default. + + try: + get_secret_value_response = client.get_secret_value( + SecretId=secret_name + ) + except ClientError as e: + if e.response['Error']['Code'] == 'DecryptionFailureException': + # Secrets Manager can't decrypt the protected secret text using the provided KMS key. + # Deal with the exception here, and/or rethrow at your discretion. + raise e + elif e.response['Error']['Code'] == 'InternalServiceErrorException': + # An error occurred on the server side. + # Deal with the exception here, and/or rethrow at your discretion. + raise e + elif e.response['Error']['Code'] == 'InvalidParameterException': + # You provided an invalid value for a parameter. + # Deal with the exception here, and/or rethrow at your discretion. + raise e + elif e.response['Error']['Code'] == 'InvalidRequestException': + # You provided a parameter value that is not valid for the current state of the resource. + # Deal with the exception here, and/or rethrow at your discretion. + raise e + elif e.response['Error']['Code'] == 'ResourceNotFoundException': + # We can't find the resource that you asked for. + # Deal with the exception here, and/or rethrow at your discretion. + raise e + else: + # Decrypts secret using the associated KMS CMK. 
+ # Depending on whether the secret is a string or binary, one of these fields will be populated. + if 'SecretString' in get_secret_value_response: + secret = get_secret_value_response['SecretString'] + else: + decoded_binary_secret = base64.b64decode(get_secret_value_response['SecretBinary']) + + return secret + +def initialize_oracle_conn(): + """ + This function will return a connection object to be used for querying. + :return: oracle connection object + """ + # Get details from Secrets Manager + get_oracle_connection_details = ast.literal_eval(get_secret(secret_name, region_name)) + + # Parse secret manager keys into individual parameters for oracle connection + oracle_host = get_oracle_connection_details['db_host'] + oracle_port = get_oracle_connection_details['db_port'] + oracle_service_name = get_oracle_connection_details['db_service_name'] + oracle_user = get_oracle_connection_details['db_username'] + oracle_password = get_oracle_connection_details['db_password'] + + # Create DSN and connection object + dsn_tns = cx_Oracle.makedsn(oracle_host, oracle_port, service_name=oracle_service_name) + conn = cx_Oracle.connect(user=oracle_user, password=oracle_password, dsn=dsn_tns) + + return conn + +def create_already_loaded_data_df(): + """ + This function will parse the existing bucket and load all data one by one into a single dataframe + :return: Returns a dataframe + """ + # Read studies to be ignored + s3_bucket = s3.Bucket(compreg_bucket) + list_of_parquet_file_keys = [] + + for file in s3_bucket.objects.all(): + if ".parquet" in file.key: + list_of_parquet_file_keys.append(file.key) + + if len(list_of_parquet_file_keys) > 0: + df = pd.DataFrame() + for file in list_of_parquet_file_keys: + temp_df = pd.read_parquet("s3://" + compreg_bucket + "/" + file) + df = pd.concat([df, temp_df]) + + df = df.drop_duplicates() + + else: + df = pd.DataFrame(columns=['CHECKSUM']) + + return df + +def get_latest_data_from_oracle(): + """ + This function will invoke oracle connection and run query and bring the data into a pandas dataframe. + :return: Dataframe with latest oracle data. 
+ """ + oracle_connection = initialize_oracle_conn() + + query_string = "SELECT a.lot_compound_id, \ + a.version_id, \ + a.parent_id, \ + a.structure_id, \ + a.smiles, \ + a.parent_mw, \ + a.salt_multiplicity, \ + a.salt_name, \ + a.formula_weight, \ + a.parent_alias, \ + a.stereochemistry, \ + a.stereocomment, \ + a.geometric_isomerism, \ + a.parent_comment, \ + a.parent_project, \ + a.elnRef, \ + a.msmethod, \ + a.msmass, \ + a.provider, \ + a.purity, \ + a.puritymethod, \ + a.nmrshifts, \ + a.lotalias, \ + a.lot_comment, \ + a.lot_project, \ + b.molfile \ + FROM (SELECT il.id_value AS lot_compound_id, \ + iv.ID_VALUE AS version_id, \ + ip.ID_VALUE AS parent_id, \ + max(s.cd_id) AS structure_id, \ + max(s.CD_SMILES) AS smiles, \ + max(s.CD_MOLWEIGHT) AS parent_mw, \ + vss.MULTIPLICITY AS salt_multiplicity, \ + ss.NAME AS salt_name, \ + max((s.cd_molweight + (vss.MULTIPLICITY*st.CD_MOLWEIGHT))) AS formula_weight, \ + max(decode(adp.AD_NAME,'parentalias', adp.AD_VALUE)) AS parent_alias, \ + max(decode(adp.AD_NAME,'Stereochemistry', adp.AD_VALUE)) AS stereochemistry, \ + max(decode(adp.AD_NAME,'stereocomment', adp.AD_VALUE)) AS stereocomment, \ + max(decode(adp.AD_NAME,'Geometric isomerism', adp.AD_VALUE)) AS geometric_isomerism, \ + max(decode(adp.AD_NAME,'comment', adp.AD_VALUE)) AS parent_comment, \ + max(decode(adp.AD_NAME,'projects', adp.AD_VALUE)) AS parent_project, \ + max(decode(adl.AD_NAME,'elnRef',adl.AD_VALUE)) AS elnRef, \ + max(decode(adl.AD_NAME,'msmethod',adl.AD_VALUE)) AS msmethod, \ + max(decode(adl.AD_NAME,'msmass',adl.AD_VALUE)) AS msmass, \ + max(decode(adl.AD_NAME,'provider',adl.AD_VALUE)) AS provider, \ + max(decode(adl.AD_NAME,'purity',adl.AD_VALUE)) AS purity, \ + max(decode(adl.AD_NAME,'puritymethod',adl.AD_VALUE)) AS puritymethod, \ + max(decode(adl.AD_NAME,'nmrshifts',adl.AD_VALUE)) AS nmrshifts, \ + max(decode(adl.AD_NAME,'lotalias',adl.AD_VALUE)) AS lotalias, \ + max(decode(adl.AD_NAME,'comment',adl.AD_VALUE)) AS lot_comment, \ + max(decode(adl.AD_NAME,'projects',adl.AD_VALUE)) AS lot_project \ + FROM IDENTIFIER il \ + JOIN COMPOUND cl ON il.COMPOUND_ID = cl.COMPOUND_ID \ + JOIN COMPOUND cve ON cl.ANCESTOR = cve.COMPOUND_ID \ + JOIN COMPOUND cp ON cve.ANCESTOR = cp.COMPOUND_ID \ + JOIN IDENTIFIER ip ON ip.COMPOUND_ID = cp.COMPOUND_ID \ + JOIN IDENTIFIER iv ON cve.COMPOUND_ID = iv.COMPOUND_ID \ + JOIN molecule m ON cp.COMPOUND_ID = m.COMPOUND_ID \ + JOIN molecule mv ON cve.COMPOUND_ID = mv.COMPOUND_ID \ + JOIN \"STRUCTURE\" s ON m.STRUCTURE_ID = s.cd_id \ + LEFT OUTER JOIN VERSION_SALTSOLVATE vss ON vss.VERSION_ID = cve.COMPOUND_ID \ + LEFT OUTER JOIN SALTSOLVATE ss ON vss.SALTSOLVATE_ID = ss.SALTSOLVATE_ID \ + LEFT OUTER JOIN STRUCTURE st ON ss.STRUCTURE_ID = st.cd_id \ + LEFT OUTER JOIN ADDITIONAL_DATA adp ON adp.COMPOUND_ID = cp.COMPOUND_ID \ + LEFT OUTER JOIN ADDITIONAL_DATA adl ON adl.COMPOUND_ID = cl.COMPOUND_ID \ + WHERE il.TYPE_ID = 3 \ + GROUP BY il.id_value, iv.id_value, ip.id_value, vss.MULTIPLICITY, ss.NAME) a, \ + (SELECT s.cd_id, s.cd_structure AS molfile FROM STRUCTURE s) b \ + WHERE a.structure_id = b.cd_id \ + order by a.lot_compound_id" + + df = pd.read_sql_query(query_string, con=oracle_connection) + df = df.replace(np.nan, '', regex=True) + + # Create Checksum/MD5Sum value for getting duplicates. 
+ df['CHECKSUM'] = df.astype(str).apply(''.join, axis=1) + df['CHECKSUM'] = df.apply(lambda x: hashlib.md5(x.CHECKSUM.encode('utf-8')).hexdigest(), axis=1) + + df = df.astype(str) + oracle_connection.close() + + return df + +def update_new_partitions(tablename): + """ + This function is used to run msck repair table on athena table passed as input. + :return: It returns the response from athena on query execution. + """ + athena_client = boto3.client('athena', region_name=region_name) + athena_location = "s3://" + compreg_bucket + "/athena/" + athena_db_name = 'comp_reg_data_db' + response = athena_client.start_query_execution(QueryString='MSCK REPAIR TABLE ' + athena_db_name + '.' + tablename, + QueryExecutionContext={'Database': athena_db_name}, + ResultConfiguration={ 'OutputLocation': athena_location } + ) + + return(response) + +""" +MAIN PROGRAM START +""" +if __name__ == "__main__": + + """ + Initializing Variables + """ + todays_date = datetime.today().strftime('%Y-%m-%d') + region_name = sys.argv[1] + secret_name = os.environ.get('COMPREG_ORACLE_SECRET_NAME') + compreg_bucket = os.environ.get('COMPREG_BUCKET') + + # Updating /etc/host file to avoid error : cx_Oracle.DatabaseError: ORA-24454: client host name is not set + hostname = socket.gethostname() + command_exec = "echo \"127.0.0.1 localhost " + hostname + "\" > /etc/hosts" + os.system(command_exec) + logger.info("The /etc/hosts file is modified for oracle connection.") + + # Create a Secrets Manager client + session = boto3.session.Session() + client = session.client(service_name='secretsmanager', region_name=region_name ) + + # Create s3 resource for putting json files + s3 = boto3.resource(service_name='s3', region_name=region_name ) + + # Get the historic data already loaded into pandas DF + history_df = create_already_loaded_data_df() + logger.info("history_df created successfully from : s3://" + compreg_bucket + "/compound_reg/compound_data/ and has : " + \ + str(len(history_df)) + " records.") + + # Get latest oracle data into pandas DF + latest_df = get_latest_data_from_oracle() + logger.info("latest_df created successfully from Oracle database and has : " + str(len(latest_df)) + " records.") + + # Get only changed records + new_df = latest_df[(~latest_df.CHECKSUM.isin(history_df.CHECKSUM))] + logger.info("new_df with new/updated records created successfully and has : " + str(len(new_df)) + " records.") + + # Write only new/changed records to S3 + if len(new_df) > 0: + new_df.to_parquet("s3://" + compreg_bucket + "/compound_reg/compound_data/dt=" + str(todays_date) + "/data.parquet", + compression='GZIP', index=False) + logger.info("New/Updated records successfully written to s3://" + compreg_bucket + "/compound_reg/compound_data/dt=" + + str(todays_date) + "/data.parquet") + else: + logger.info("No New/Updated records available to be written to s3") + + """ Update the partitions on the athena tables """ + compound_data_athena = update_new_partitions('compound_data') + logger.info("Partitions added or removed for Athena Table successfully.") + + logger.info('The script will end now.') \ No newline at end of file diff --git a/chem-axon-setup/compound_reg_pipeline/instantclient-basiclite-linux.x64-12.2.0.1.0.zip b/chem-axon-setup/compound_reg_pipeline/instantclient-basiclite-linux.x64-12.2.0.1.0.zip new file mode 100644 index 0000000..e090e5a Binary files /dev/null and b/chem-axon-setup/compound_reg_pipeline/instantclient-basiclite-linux.x64-12.2.0.1.0.zip differ diff --git 
a/chem-axon-setup/compound_reg_pipeline/instantclient-sdk-linux.x64-12.2.0.1.0.zip b/chem-axon-setup/compound_reg_pipeline/instantclient-sdk-linux.x64-12.2.0.1.0.zip new file mode 100644 index 0000000..243ccda Binary files /dev/null and b/chem-axon-setup/compound_reg_pipeline/instantclient-sdk-linux.x64-12.2.0.1.0.zip differ diff --git a/chem-axon-setup/configs/deploy_config.env b/chem-axon-setup/configs/deploy_config.env new file mode 100644 index 0000000..91e3ef4 --- /dev/null +++ b/chem-axon-setup/configs/deploy_config.env @@ -0,0 +1,52 @@ +#!/usr/bin/env bash + +export ENVIRONMENT_upper=`echo $1 | tr '[:lower:]' '[:upper:]'` + +case "${ENVIRONMENT_upper}" in + +SAMPLE) + export VpcId="vpc-01234567" + export SubnetIds="subnet-01234567,subnet-0123456789" + export AwsRegion="us-east-1" + export AvailabilityZones="us-east-1a,us-east-1b" + export WorkflowEcrRepository="datalake-repo" + export WorkflowCompRegImage="comp-reg-1.0" + export CompRegSecretName="CompRegConn" + export CompRegHostName="db_endpoint_host_name.amazonaws.com" + export CompRegPort="1521" + export CompRegDBName="db_name" + export CompRegUserName="db_user" + export CompRegPassword="db_pass" + ;; + +DEV) + export VpcId="Enter the VPC Id" + export SubnetIds="Enter subnet ids comma separated" + export AwsRegion="Enter the AWS Region in which deployment is required." + export AvailabilityZones="Enter comma separated availability zones" + export WorkflowEcrRepository="Enter the Repo Name of ECR" + export WorkflowCompRegImage="Enter the version name of the ECR Image" + export CompRegSecretName="Enter Secret Name to be created and used in code" + export CompRegHostName="Enter the RDBMS Host Name which will host comp reg data" + export CompRegPort="Enter the RDBMS Port which will host comp reg data " + export CompRegDBName="Enter the RDBMS Database Name which will host comp reg data " + export CompRegUserName="Enter the RDBMS User Name which will host comp reg data " + export CompRegPassword="Enter the RDBMS Password Name which will host comp reg data " + ;; + +PROD) + export VpcId="Enter the VPC Id" + export SubnetIds="Enter subnet ids comma separated" + export AwsRegion="Enter the AWS Region in which deployment is required." 
+ export AvailabilityZones="Enter comma separated availability zones" + export WorkflowEcrRepository="Enter the Repo Name of ECR" + export WorkflowCompRegImage="Enter the version name of the ECR Image" + export CompRegSecretName="Enter Secret Name to be created and used in code" + export CompRegHostName="Enter the RDBMS Host Name which will host comp reg data" + export CompRegPort="Enter the RDBMS Port which will host comp reg data " + export CompRegDBName="Enter the RDBMS Database Name which will host comp reg data " + export CompRegUserName="Enter the RDBMS User Name which will host comp reg data " + export CompRegPassword="Enter the RDBMS Password Name which will host comp reg data " + ;; + +esac diff --git a/chem-axon-setup/data_lake_setup/__init__.py b/chem-axon-setup/data_lake_setup/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/chem-axon-setup/data_lake_setup/datalake_batch_job_def_stack.py b/chem-axon-setup/data_lake_setup/datalake_batch_job_def_stack.py new file mode 100644 index 0000000..1fb279f --- /dev/null +++ b/chem-axon-setup/data_lake_setup/datalake_batch_job_def_stack.py @@ -0,0 +1,45 @@ +from aws_cdk import ( + core, + aws_batch as batch, + aws_ecs as ecs, + aws_ecr as ecr +) + +""" This module uses below parameters from config_dict passed to it : +config_dict = { + 'workflow_ecr_repo': 'datalake-repo', + 'datalake_bucket_name': 'datalake-prod', + 'workflow_comp_reg_image_version': 'comp-reg-1.0', + 'comp_reg_secret_name': 'CompRegConn' +} + +""" +class DatalakeBatchJobStack(core.Stack): + + def __init__(self, scope: core.Construct, id: str, config_dict, **kwargs) -> None: + super().__init__(scope, id, **kwargs) + + """ get Comp Reg ECR Image details """ + comp_reg_image_id = ecs.ContainerImage.from_ecr_repository( + repository=ecr.Repository.from_repository_name(self, "GetCompRegRepoName", + repository_name=config_dict['workflow_ecr_repo'] + ), + tag=config_dict['workflow_comp_reg_image_version'] + ) + + """ Create Comp Reg Batch Job Definition """ + createCompRegJob = batch.JobDefinition(self, "createCompRegJob", + job_definition_name="comp-reg-etl-job", + retry_attempts=2, + container=batch.JobDefinitionContainer( + image=comp_reg_image_id, + memory_limit_mib=4000, + vcpus=1, + environment=dict( + COMPREG_ORACLE_SECRET_NAME=config_dict['comp_reg_secret_name'], + COMPREG_BUCKET=config_dict['datalake_bucket_name'] + ) + ) + ) + + core.CfnOutput(self, "createCompRegJobName", value=createCompRegJob.job_definition_name) \ No newline at end of file diff --git a/chem-axon-setup/data_lake_setup/datalake_batch_stack.py b/chem-axon-setup/data_lake_setup/datalake_batch_stack.py new file mode 100644 index 0000000..87e294b --- /dev/null +++ b/chem-axon-setup/data_lake_setup/datalake_batch_stack.py @@ -0,0 +1,156 @@ +from aws_cdk import ( + core, + aws_batch as batch, + aws_ec2 as ec2, + aws_iam as iam, + aws_ecr as ecr +) + +""" This module uses below parameters from config_dict passed to it : +config_dict = { + 'vpc_id': 'vpc-01234567', + 'SubnetIds': 'subnet-01234567,subnet-0123456789', + 'AvailabilityZones': 'us-east-1a,us-east-1b', + 'workflow_ecr_repo': 'datalake-repo' +} +""" + +class DatalakeBatchStack(core.Stack): + + def __init__(self, scope: core.Construct, id: str, config_dict, **kwargs) -> None: + super().__init__(scope, id, **kwargs) + + """ Get VPC details """ + vpc = ec2.Vpc.from_lookup(self, "VPC", vpc_id=config_dict['vpc_id']) + + """ Create Security Group for Batch Env """ + batch_security_group = "datalake-batch-security-group" + + 
createBatchSecurityGroup = ec2.SecurityGroup(self, "createBatchSecurityGroup", + vpc=vpc, + allow_all_outbound=True, + description="This security group will be used for AWS Batch Compute Env", + security_group_name=batch_security_group) + + createBatchSecurityGroup.add_ingress_rule( + peer=ec2.Peer.ipv4("0.0.0.0/0"), + connection=ec2.Port(protocol=ec2.Protocol.TCP, + string_representation="ingress_rule", + from_port=22, + to_port=22) + ) + + createBatchSecurityGroup.add_egress_rule( + peer=ec2.Peer.ipv4("0.0.0.0/0"), + connection=ec2.Port(protocol=ec2.Protocol.TCP, + string_representation="egress_rule", + from_port=-1, + to_port=-1) + ) + + core.CfnOutput(self, "createBatchSecurityGroupId", value=createBatchSecurityGroup.security_group_id) + + """ Create IAM Role for ecsInstance """ + createECSInstanceRole = iam.Role(self, "createECSInstanceRole", + assumed_by=iam.ServicePrincipal("ec2.amazonaws.com"), + description="This instance role will be used by the ECS cluster instances", + managed_policies=[ + iam.ManagedPolicy.from_aws_managed_policy_name("AmazonEC2FullAccess"), + iam.ManagedPolicy.from_aws_managed_policy_name("AmazonS3FullAccess"), + iam.ManagedPolicy.from_aws_managed_policy_name("AWSBatchFullAccess"), + iam.ManagedPolicy.from_aws_managed_policy_name("SecretsManagerReadWrite"), + iam.ManagedPolicy.from_aws_managed_policy_name("AmazonAthenaFullAccess"), + iam.ManagedPolicy.from_aws_managed_policy_name("service-role/" + "AmazonEC2ContainerServiceforEC2Role"), + iam.ManagedPolicy.from_aws_managed_policy_name("service-role/AWSBatchServiceRole") + ], + role_name="datalake-ecsinstance-role" + ) + + createInstanceProfile = iam.CfnInstanceProfile(self, "createInstanceProfile", + roles=[createECSInstanceRole.role_name], + instance_profile_name="datalake-ecsinstance-role" + ) + + useECSInstanceProfile = createInstanceProfile.instance_profile_name + + core.CfnOutput(self, "createECSInstanceRoleName", value=createECSInstanceRole.role_name) + + """ Create Spot Fleet Role """ + createSpotFleetRole = iam.Role(self, 'createSpotFleetRole', + assumed_by=iam.ServicePrincipal("spotfleet.amazonaws.com"), + managed_policies=[iam.ManagedPolicy.from_aws_managed_policy_name( + "service-role/AmazonEC2SpotFleetTaggingRole")] + ) + + core.CfnOutput(self, "createSpotFleetRoleName", value=createSpotFleetRole.role_name) + + useSpotFleetRole = createSpotFleetRole.without_policy_updates() + + """ Create Batch Service Role """ + createBatchServiceRole = iam.Role(self, 'createBatchServiceRole', + assumed_by=iam.ServicePrincipal("batch.amazonaws.com"), + managed_policies=[iam.ManagedPolicy.from_aws_managed_policy_name( + "service-role/AWSBatchServiceRole")] + ) + + core.CfnOutput(self, "createBatchServiceRoleName", value=createBatchServiceRole.role_name) + + useBatchServiceRole = createBatchServiceRole.without_policy_updates() + + + """ Create Compute Environment """ + + subnet_1 = ec2.Subnet.from_subnet_attributes(self, "subnet_1", + subnet_id=config_dict['SubnetIds'].split(",")[0], + availability_zone=config_dict['AvailabilityZones'].split(",")[0]) + subnet_2 = ec2.Subnet.from_subnet_attributes(self, "subnet_2", + subnet_id=config_dict['SubnetIds'].split(",")[1], + availability_zone=config_dict['AvailabilityZones'].split(",")[1]) + + + + createBatchComputeEnv = batch.ComputeEnvironment(self, "createBatchComputeEnv", + compute_environment_name="datalake-compute-env", + service_role=useBatchServiceRole, + compute_resources=batch.ComputeResources( + vpc=vpc, + type=batch.ComputeResourceType.SPOT, + 
bid_percentage=60, + desiredv_cpus=0, + maxv_cpus=100, + minv_cpus=0, + security_groups=[createBatchSecurityGroup], + vpc_subnets=ec2.SubnetSelection(subnets=[subnet_1, subnet_2]), + instance_role=useECSInstanceProfile, + spot_fleet_role=useSpotFleetRole, + compute_resources_tags=core.Tag.add(self, + 'Name', + 'Datalake Pipeline Instance' + ) + ) + ) + + core.CfnOutput(self, "createBatchComputeEnvName", value=createBatchComputeEnv.compute_environment_name) + + getIComputeEnvObject = batch.ComputeEnvironment.from_compute_environment_arn( + self, "getComputeEnvAtrributes", + compute_environment_arn=createBatchComputeEnv.compute_environment_arn + ) + + """ Create Batch Job Queue """ + createBatchJobQueue = batch.JobQueue(self, "createBatchJobQueue", + compute_environments=[batch.JobQueueComputeEnvironment( + compute_environment=getIComputeEnvObject, + order=1 + )], + enabled=True, + job_queue_name="datalake-job-queue", + priority=1) + + core.CfnOutput(self, "createBatchJobQueueName", value=createBatchJobQueue.job_queue_name) + + """ Create ECR Repo for datalake images """ + createECRRepo = ecr.Repository(self, "createECRRepo", repository_name=config_dict['workflow_ecr_repo']) + + core.CfnOutput(self, "createECRRepoName", value=createECRRepo.repository_name) \ No newline at end of file diff --git a/chem-axon-setup/data_lake_setup/datalake_glue_stack.py b/chem-axon-setup/data_lake_setup/datalake_glue_stack.py new file mode 100644 index 0000000..bf85b7a --- /dev/null +++ b/chem-axon-setup/data_lake_setup/datalake_glue_stack.py @@ -0,0 +1,71 @@ +from aws_cdk import ( + core, + aws_s3 as s3, + aws_glue as glue +) + +""" This module uses below parameters from config_dict passed to it : +config_dict = { + 'datalake_bucket_name': 'datalake-prod', + 'datalake_db_name': 'datalake_db' +} +""" +class DatalakeGlueStack(core.Stack): + + def __init__(self, scope: core.Construct, id: str, config_dict, **kwargs) -> None: + super().__init__(scope, id, **kwargs) + + """ Create the datalake database """ + createDatalakeDB = glue.Database(self, "createDatalakeDB", + database_name=config_dict['datalake_db_name']) + + core.CfnOutput(self, "createDatalakeDBName", value=createDatalakeDB.database_name) + + + """ Create Comp Reg Table """ + + createDatalakeCompRegTable = glue.Table(self, "createDatalakeCompRegTable", + columns=[ + glue.Column(name="lot_compound_id", type=glue.Type(input_string="string", is_primitive=True)), + glue.Column(name="version_id", type=glue.Type(input_string="string", is_primitive=True)), + glue.Column(name="parent_id", type=glue.Type(input_string="string", is_primitive=True)), + glue.Column(name="smiles", type=glue.Type(input_string="string", is_primitive=True)), + glue.Column(name="parent_mw", type=glue.Type(input_string="string", is_primitive=True)), + glue.Column(name="salt_multiplicity", type=glue.Type(input_string="string", is_primitive=True)), + glue.Column(name="salt_name", type=glue.Type(input_string="string", is_primitive=True)), + glue.Column(name="formula_weight", type=glue.Type(input_string="string", is_primitive=True)), + glue.Column(name="parent_alias", type=glue.Type(input_string="string", is_primitive=True)), + glue.Column(name="stereochemistry", type=glue.Type(input_string="string", is_primitive=True)), + glue.Column(name="stereocomment", type=glue.Type(input_string="string", is_primitive=True)), + glue.Column(name="geometric_isomerism", type=glue.Type(input_string="string", is_primitive=True)), + glue.Column(name="parent_comment", type=glue.Type(input_string="string", 
is_primitive=True)), + glue.Column(name="parent_project", type=glue.Type(input_string="string", is_primitive=True)), + glue.Column(name="elnref", type=glue.Type(input_string="string", is_primitive=True)), + glue.Column(name="msmethod", type=glue.Type(input_string="string", is_primitive=True)), + glue.Column(name="msmass", type=glue.Type(input_string="string", is_primitive=True)), + glue.Column(name="provider", type=glue.Type(input_string="string", is_primitive=True)), + glue.Column(name="purity", type=glue.Type(input_string="string", is_primitive=True)), + glue.Column(name="puritymethod", type=glue.Type(input_string="string", is_primitive=True)), + glue.Column(name="nmrshifts", type=glue.Type(input_string="string", is_primitive=True)), + glue.Column(name="lotalias", type=glue.Type(input_string="string", is_primitive=True)), + glue.Column(name="lot_comment", type=glue.Type(input_string="string", is_primitive=True)), + glue.Column(name="lot_project", type=glue.Type(input_string="string", is_primitive=True)), + glue.Column(name="molfile", type=glue.Type(input_string="string", is_primitive=True)), + glue.Column(name="checksum", type=glue.Type(input_string="string", is_primitive=True)) + ], + database=createDatalakeDB.from_database_arn(self, "GetDBArn", + database_arn=createDatalakeDB.database_arn), + data_format=glue.DataFormat( + input_format=glue.InputFormat.PARQUET, + output_format=glue.OutputFormat.PARQUET, + serialization_library=glue.SerializationLibrary.PARQUET + ), + table_name="tbl_compound_data", + bucket=s3.Bucket.from_bucket_name(self, "getIBucket", bucket_name=config_dict['datalake_bucket_name']), + compressed=True, + description="This table contains data regarding compound registration coming from RDS", + partition_keys=[glue.Column(name="dt", type=glue.Type(input_string="string", is_primitive=True))], + s3_prefix="compound_reg/compound_data/" + ) + + core.CfnOutput(self, "createDatalakeCompRegTableName", value=createDatalakeCompRegTable.table_name) \ No newline at end of file diff --git a/chem-axon-setup/data_lake_setup/datalake_lambda_stack.py b/chem-axon-setup/data_lake_setup/datalake_lambda_stack.py new file mode 100644 index 0000000..f4e79e4 --- /dev/null +++ b/chem-axon-setup/data_lake_setup/datalake_lambda_stack.py @@ -0,0 +1,89 @@ +from aws_cdk.aws_lambda_event_sources import S3EventSource +from aws_cdk import ( + core, + aws_s3 as s3, + aws_ec2 as ec2, + aws_iam as iam, + aws_lambda as _lambda, + aws_s3_notifications +) + +""" This module uses below parameters from config_dict passed to it : +config_dict = { + 'vpc_id': 'vpc-01234567', + 'SubnetIds': 'subnet-01234567,subnet-0123456789', + 'AvailabilityZones': 'us-east-1a,us-east-1b', + 'datalake_bucket_name': 'datalake-prod' +} +""" + +class DatalakeLambdaStack(core.Stack): + + def __init__(self, scope: core.Construct, id: str, config_dict, **kwargs) -> None: + super().__init__(scope, id, **kwargs) + + """ Get VPC details """ + Ivpc = ec2.Vpc.from_lookup(self, "VPC", vpc_id=config_dict['vpc_id']) + + """ Get sunet seclection context created """ + subnet_1 = ec2.Subnet.from_subnet_attributes(self, "subnet_1", + subnet_id=config_dict['SubnetIds'].split(",")[0], + availability_zone=config_dict['AvailabilityZones'].split(",")[0] + ) + subnet_2 = ec2.Subnet.from_subnet_attributes(self, "subnet_2", + subnet_id=config_dict['SubnetIds'].split(",")[1], + availability_zone=config_dict['AvailabilityZones'].split(",")[1] + ) + + """ Create Security Group for Lambda Functions """ + lambda_security_group = "datalake-lambda-sg" + + 
createLambdaSecurityGroup = ec2.SecurityGroup(self, "createLambdaSecurityGroup", + vpc=Ivpc, + allow_all_outbound=True, + description="This security group will be used for Lambda Funcs", + security_group_name=lambda_security_group + ) + + """ Create the Datalake Bucket """ + createDatalakeBucket = s3.Bucket(self, "createCompRegBucket", + bucket_name=config_dict['datalake_bucket_name'], + block_public_access=s3.BlockPublicAccess(block_public_acls=True, + block_public_policy=True, + ignore_public_acls=True, + restrict_public_buckets=True + ) + ) + + core.CfnOutput(self, "createCompRegBucketName", value=createDatalakeBucket.bucket_name) + + + """ Create Comp Reg Lambda Function """ + createCompRegLambda = _lambda.Function( + self, "createCompRegLambda", + function_name="datalake-comp-reg-trigger", + description="This lambda function will trigger the compound reg pipeline.", + runtime=_lambda.Runtime.PYTHON_3_7, + handler="trigger_compound_reg_pipeline.lambda_handler", + code=_lambda.Code.asset('lambdas'), + timeout=core.Duration.seconds(90), + vpc=Ivpc, + vpc_subnets=ec2.SubnetSelection(subnets=[subnet_1, subnet_2]), + security_group=createLambdaSecurityGroup, + initial_policy=[iam.PolicyStatement(effect=iam.Effect.ALLOW, + actions=["s3:*", "batch:*"], + resources=["*"]) + ] + ) + + """ Add s3 event trigger to above lambda function """ + createCompRegLambda.add_event_source(S3EventSource( + createDatalakeBucket, + events=[s3.EventType.OBJECT_CREATED], + filters=[s3.NotificationKeyFilter( + prefix="compound_reg/triggers/", + suffix=".trigger" + ) + ] + ) + ) \ No newline at end of file diff --git a/chem-axon-setup/data_lake_setup/datalake_secret_manager_stack.py b/chem-axon-setup/data_lake_setup/datalake_secret_manager_stack.py new file mode 100644 index 0000000..a4efed2 --- /dev/null +++ b/chem-axon-setup/data_lake_setup/datalake_secret_manager_stack.py @@ -0,0 +1,40 @@ +from aws_cdk import ( + core, + aws_secretsmanager as sm +) + +""" This module uses below parameters from config_dict passed to it : +config_dict = { + 'comp_reg_secret_name': 'CompRegConn', + 'comp_reg_host_name': 'db_endpoint_host_name', + 'comp_reg_port': 'db_port', + 'comp_reg_db_name': 'db_name', + 'comp_reg_user_name': 'db_user', + 'comp_reg_password': 'db_pass' +} +""" + +class DatalakeSecretManagerStack(core.Stack): + + def __init__(self, scope: core.Construct, id: str, config_dict, **kwargs) -> None: + super().__init__(scope, id, **kwargs) + + """ Create a secret in secret manager with Database credentials for Comp Reg Source """ + stack = DatalakeSecretManagerStack.of(self) + + createCompRegSecret = sm.Secret(self, "createCompRegSecret", + description="Database credentials for Comp Reg Source", + secret_name=config_dict['comp_reg_secret_name'], + generate_secret_string=sm.SecretStringGenerator( + exclude_characters="{`~!@#$%^&*()_-+={[}}|\:;\"'<,>.?/}", + generate_string_key="pass_generated_by_SM", + secret_string_template=stack.to_json_string({ + 'db_username': config_dict['comp_reg_user_name'], + 'db_password': config_dict['comp_reg_password'], + 'db_port': config_dict['comp_reg_port'], + 'db_service_name': config_dict['comp_reg_db_name'], + 'db_host': config_dict['comp_reg_host_name'] + }) + ) + ) + diff --git a/chem-axon-setup/lambdas/trigger_compound_reg_pipeline.py b/chem-axon-setup/lambdas/trigger_compound_reg_pipeline.py new file mode 100644 index 0000000..258c854 --- /dev/null +++ b/chem-axon-setup/lambdas/trigger_compound_reg_pipeline.py @@ -0,0 +1,32 @@ +import boto3 +import os +import logging + +""" 
+Create a logging function and initiate it. +""" +format_string = "%(asctime)s %(name)s [%(levelname)s] %(message)s" +logger = logging.getLogger('comp-reg-data-load-pipeline-lambda') +handler = logging.StreamHandler() +logger.setLevel(logging.DEBUG) +formatter = logging.Formatter(format_string) +handler.setFormatter(formatter) +logger.addHandler(handler) + + +def lambda_handler(event, context): + + # Initialise the environment variables required to trigger the AWS Batch Job + awsregion = os.environ.get('AWS_REGION') + + # Execute the batch job + batch_client = boto3.client('batch', region_name=awsregion) + execute_cmd = ['python', 'comp_reg_data_load.py', awsregion] + batch_job_id = batch_client.submit_job(jobDefinition='comp-reg-etl-job', + jobQueue='datalake-job-queue', + jobName=f'comp-reg-etl-job', + containerOverrides={'command': execute_cmd})['jobId'] + + # Log the batch job id triggered + logger.info("The command executed by Lambda function is : " + str(execute_cmd)) + logger.info("The AWS Batch Job ID : " + str(batch_job_id)) diff --git a/chem-axon-setup/requirements.txt b/chem-axon-setup/requirements.txt new file mode 100644 index 0000000..eddca61 --- /dev/null +++ b/chem-axon-setup/requirements.txt @@ -0,0 +1,14 @@ +aws-cdk.core +aws-cdk.aws_s3 +aws-cdk.aws_ec2 +aws-cdk.aws_batch +aws-cdk.aws_iam +aws-cdk.aws_glue +aws-cdk.aws_lambda +aws-cdk.aws_athena +aws-cdk.aws_ecs +aws-cdk.aws_ecr +aws_cdk.aws_secretsmanager +aws_cdk.aws_lambda +aws-cdk.aws_s3_notifications +aws_cdk.aws_lambda_event_sources
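For reference, here is a minimal sketch (not part of the diff) of how a run could be kicked off once the stacks are deployed: either by uploading a trigger object that matches the S3 event filter configured in datalake_lambda_stack.py, or by submitting the Batch job directly, mirroring lambdas/trigger_compound_reg_pipeline.py. The bucket name and region are assumptions for illustration; app.py derives the bucket name as "datalake-" plus the lower-cased environment name.

```python
import boto3


def kick_off_via_s3(bucket: str, region: str) -> None:
    """Upload an empty trigger object matching the prefix/suffix filter on the datalake bucket."""
    s3 = boto3.client("s3", region_name=region)
    # Any key under compound_reg/triggers/ ending in ".trigger" fires the Lambda;
    # the file name itself is arbitrary.
    s3.put_object(Bucket=bucket, Key="compound_reg/triggers/manual-run.trigger", Body=b"")


def kick_off_directly(region: str) -> str:
    """Submit the Batch job directly, mirroring trigger_compound_reg_pipeline.py."""
    batch = boto3.client("batch", region_name=region)
    response = batch.submit_job(
        jobDefinition="comp-reg-etl-job",
        jobQueue="datalake-job-queue",
        jobName="comp-reg-etl-job",
        containerOverrides={"command": ["python", "comp_reg_data_load.py", region]},
    )
    return response["jobId"]


if __name__ == "__main__":
    # "datalake-dev" assumes the deploy script was invoked with env "dev".
    kick_off_via_s3("datalake-dev", "us-east-1")
    print(kick_off_directly("us-east-1"))
```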