diff --git a/industries/asset_lifecycle_management_agent/.cursor.rules.md b/industries/asset_lifecycle_management_agent/.cursor.rules.md index eb7bceb73..c9f49cb96 100644 --- a/industries/asset_lifecycle_management_agent/.cursor.rules.md +++ b/industries/asset_lifecycle_management_agent/.cursor.rules.md @@ -542,6 +542,120 @@ configs/ .env # Environment variables ``` +## NAT Version Compatibility + +### NAT 1.2.1 vs 1.3.0 + +**Current Version**: NAT 1.2.1 (with pydantic 2.10.x) + +**Key Compatibility Rules**: + +1. **Optional String Fields**: +```python +# ❌ WRONG - Will fail validation +elasticsearch_url: str = Field(default=None) + +# ✅ CORRECT - Use Optional for nullable strings +from typing import Optional +elasticsearch_url: Optional[str] = Field(default=None) +``` + +2. **Reference Field Types (NAT 1.2.1)**: +```python +# NAT 1.2.1 uses plain strings for references +llm_name: str = Field(description="LLM reference") +embedding_name: str = Field(description="Embedder reference") +``` + +3. **Reference Field Types (NAT 1.3.0 - Future)**: +```python +# NAT 1.3.0 requires typed references +from nat.data_models.component_ref import LLMRef, EmbedderRef, FunctionRef + +llm_name: LLMRef = Field(description="LLM reference") +embedding_name: EmbedderRef = Field(description="Embedder reference") +code_execution_tool: FunctionRef = Field(description="Function reference") +``` + +4. **YAML Configuration Quoting**: +```yaml +# Always quote string references in YAML configs for pydantic 2.10+ +functions: + sql_retriever: + llm_name: "sql_llm" # Quoted + embedding_name: "vanna_embedder" # Quoted + vector_store_type: "chromadb" # Quoted + db_type: "sqlite" # Quoted + + data_analysis_assistant: + tool_names: [ + "sql_retriever", # All tool names quoted + "predict_rul", + "plot_distribution" + ] +``` + +### Pydantic 2.10+ Best Practices + +**Type Annotations**: +```python +from typing import Optional + +class ToolConfig(FunctionBaseConfig): + # Required fields + required_param: str = Field(description="Must be provided") + + # Optional fields with None default + optional_param: Optional[str] = Field(default=None, description="Can be None") + + # Optional fields with non-None default + param_with_default: str = Field(default="default_value", description="Has default") + + # Numeric fields (can use None without Optional if you want) + max_retries: int = Field(default=3, description="Number of retries") +``` + +**Common Validation Errors**: +``` +ValidationError: Input should be a valid string [input_value=None, input_type=NoneType] +→ Solution: Use Optional[str] instead of str for fields with default=None + +ValidationError: functions: Input should be a valid string (4 times) +→ Solution: Quote all string values in YAML config, especially references +``` + +### Upgrading to NAT 1.3.0 (Future) + +When upgrading, you'll need to: + +1. Update pyproject.toml: +```toml +dependencies = [ + "nvidia-nat[profiling,langchain,telemetry]==1.3.0", + "pydantic>=2.11.0,<3.0.0", +] +``` + +2. Update all tool configs: +```python +# Before (NAT 1.2.1) +llm_name: str = Field(...) + +# After (NAT 1.3.0) +from nat.data_models.component_ref import LLMRef +llm_name: LLMRef = Field(...) +``` + +3. Update evaluator configs: +```python +# multimodal_llm_judge_evaluator_register.py +# llm_judge_evaluator_register.py +from nat.data_models.component_ref import LLMRef +llm_name: LLMRef = Field(...) +``` + +4. 
Keep Optional[str] for nullable fields (both versions need this) + ## Debugging and Troubleshooting ### Common Issues and Solutions diff --git a/industries/asset_lifecycle_management_agent/.gitignore b/industries/asset_lifecycle_management_agent/.gitignore index b47a17cbf..f186aface 100644 --- a/industries/asset_lifecycle_management_agent/.gitignore +++ b/industries/asset_lifecycle_management_agent/.gitignore @@ -1,46 +1,123 @@ -# macOS system files +# Misc +config_examples.yml +config_examples.yaml +env.sh +frontend/ +prompts.md + +# Python +__pycache__/ +*.py[cod] +*$py.class +*.so +.Python +*.egg +*.egg-info/ +dist/ +build/ +*.whl +pip-wheel-metadata/ .DS_Store -.DS_Store? -._* -.Spotlight-V100 -.Trashes -ehthumbs.db -Thumbs.db -# Database and vector store files -database/ -*.db -*.sqlite3 +# Virtual environments +.venv/ +venv/ +ENV/ +env/ + +# IDEs and Editors +.vscode/ +.idea/ +*.swp +*.swo +*~ +.DS_Store + +# Testing +.pytest_cache/ +.coverage +htmlcov/ +.tox/ +.hypothesis/ + +# Jupyter Notebook +.ipynb_checkpoints/ +*.ipynb_checkpoints/ -# Output and generated files +# Output and Data Directories output_data/ -moment/ -readmes/ -*.html -*.csv -*.npy +eval_output/ +example_eval_output/ +output/ +results/ +logs/ -# Python package metadata -src/**/*.egg-info/ -*.egg-info/ +# Database files +*.db +*.sqlite +*.sqlite3 +database/*.db +database/*.sqlite -# Environment files (if they contain secrets) -env.sh +# Vector store data (ChromaDB) +database/ +chroma_db/ +vector_store/ +vanna_vector_store/ -# Model files (if large/binary) +# Model files (large binary files) models/*.pkl -models/*.joblib -models/*.model +models/*.h5 +models/*.pt +models/*.pth +models/*.ckpt +*.pkl +*.h5 +*.pt +*.pth +moment/ -# Logs -*.log -logs/ +# Data files (CSV, JSON, etc. - be selective) +*.csv +*.json +!training_data.json +!vanna_training_data.yaml +!config*.json +!config*.yaml +!config*.yml +!pyproject.toml +!package.json + +# Frontend build artifacts +frontend/node_modules/ +frontend/dist/ +frontend/build/ +frontend/.next/ +frontend/out/ + +# Environment and secrets +.env +.env.local +.env.*.local +*.secret +secrets/ +credentials/ # Temporary files *.tmp *.temp -.pytest_cache/ -__pycache__/ +*.log +*.cache + +# OS specific +Thumbs.db +Desktop.ini + +# Experiment tracking +mlruns/ +wandb/ -# dot env -mydot.env +# Documentation builds +docs/_build/ +docs/.doctrees/ +site/ diff --git a/industries/asset_lifecycle_management_agent/INSTALLATION.md b/industries/asset_lifecycle_management_agent/INSTALLATION.md new file mode 100644 index 000000000..612d6399b --- /dev/null +++ b/industries/asset_lifecycle_management_agent/INSTALLATION.md @@ -0,0 +1,196 @@ +# Installation Guide + +This guide explains how to install the Predictive Maintenance Agent with different database and vector store options. + +## Base Installation + +Install the core package with default dependencies (ChromaDB + SQLite): + +```bash +pip install -e . 
+``` + +This includes: +- **ChromaDB** - Default vector store for SQL retriever +- **SQLite** - Built-in database support (no additional packages needed) +- **SQLAlchemy** - Generic SQL database support framework +- All core ML and visualization dependencies + +## Optional Dependencies + +Install additional packages based on your needs: + +### Elasticsearch Vector Store + +For production deployments with Elasticsearch as the vector store: + +```bash +pip install -e ".[elasticsearch]" +``` + +### PostgreSQL Database + +For PostgreSQL database support: + +```bash +pip install -e ".[postgres]" +``` + +### MySQL Database + +For MySQL database support: + +```bash +pip install -e ".[mysql]" +``` + +### SQL Server Database + +For Microsoft SQL Server support: + +```bash +pip install -e ".[sqlserver]" +``` + +**Note:** You also need to install the Microsoft ODBC Driver for SQL Server from [Microsoft's website](https://learn.microsoft.com/en-us/sql/connect/odbc/download-odbc-driver-for-sql-server). + +### Oracle Database + +For Oracle database support: + +```bash +pip install -e ".[oracle]" +``` + +**Note:** You also need to install Oracle Instant Client from [Oracle's website](https://www.oracle.com/database/technologies/instant-client.html). + +## Combined Installations + +### All Databases + +Install support for all SQL databases at once: + +```bash +pip install -e ".[all-databases]" +``` + +This includes: PostgreSQL, MySQL, SQL Server, and Oracle drivers. + +### Everything + +Install all optional dependencies (Elasticsearch + all databases): + +```bash +pip install -e ".[all]" +``` + +## Installation Examples by Use Case + +### Development Setup (Simplest) +```bash +# Base installation - ChromaDB + SQLite +pip install -e . +``` + +### Production with PostgreSQL +```bash +# Base + PostgreSQL +pip install -e ".[postgres]" +``` + +### Production with Elasticsearch and PostgreSQL +```bash +# Base + Elasticsearch + PostgreSQL +pip install -e ".[elasticsearch,postgres]" +``` + +### Enterprise with All Options +```bash +# Everything +pip install -e ".[all]" +``` + +## Verification + +After installation, verify your setup: + +```python +# Check installed packages +import chromadb # Should work with base install +import sqlalchemy # Should work with base install + +# Optional packages (only if installed) +import elasticsearch # If [elasticsearch] installed +import psycopg2 # If [postgres] installed +import pymysql # If [mysql] installed +import pyodbc # If [sqlserver] installed +import cx_Oracle # If [oracle] installed +``` + +## System Requirements + +- **Python:** 3.11 or 3.12 (Python 3.13 not yet supported) +- **OS:** Linux, macOS, or Windows +- **Memory:** Minimum 8GB RAM recommended +- **Disk:** Minimum 10GB free space + +## External Service Requirements + +Depending on your configuration, you may need: + +### Elasticsearch (Optional) +- Elasticsearch 8.0 or higher running +- Network access to Elasticsearch cluster +- Authentication credentials (API key or username/password) + +### Database Servers (Optional) +- **PostgreSQL:** PostgreSQL 12 or higher +- **MySQL:** MySQL 8.0 or higher +- **SQL Server:** SQL Server 2016 or higher +- **Oracle:** Oracle 19c or higher + +## Troubleshooting + +### Import Errors + +**Problem:** `ModuleNotFoundError: No module named 'elasticsearch'` +**Solution:** Install elasticsearch support: `pip install -e ".[elasticsearch]"` + +**Problem:** `ModuleNotFoundError: No module named 'psycopg2'` +**Solution:** Install PostgreSQL support: `pip install -e ".[postgres]"` + 
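+If you are unsure which optional extras are present in your environment, a quick check such as the sketch below can help (module names follow the Verification section above; the mapping to extras mirrors this guide):
+
+```python
+# Minimal sketch: report which optional drivers are importable in the current environment
+import importlib.util
+
+optional_modules = {
+    "elasticsearch": "elasticsearch",
+    "psycopg2": "postgres",
+    "pymysql": "mysql",
+    "pyodbc": "sqlserver",
+    "cx_Oracle": "oracle",
+}
+
+for module, extra in optional_modules.items():
+    if importlib.util.find_spec(module) is None:
+        print(f'{module} missing - install with: pip install -e ".[{extra}]"')
+    else:
+        print(f"{module} installed")
+```
+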
+### Binary Dependencies + +**SQL Server on Linux/Mac:** +```bash +# Install unixODBC first +# macOS: +brew install unixodbc + +# Ubuntu/Debian: +sudo apt-get install unixodbc unixodbc-dev + +# Then install ODBC driver from Microsoft +``` + +**Oracle:** +- Download and install Oracle Instant Client +- Set environment variables: + ```bash + export ORACLE_HOME=/path/to/instantclient + export LD_LIBRARY_PATH=$ORACLE_HOME:$LD_LIBRARY_PATH + ``` + +## Next Steps + +After installation, see: +- **Configuration Guide:** `configs/README.md` - How to configure vector stores and databases +- **Examples:** `config_examples.yaml` - Sample configurations +- **Getting Started:** Run the predictive maintenance workflow + +## Support + +For issues or questions: +1. Check the configuration guide: `configs/README.md` +2. Review example configs: `config_examples.yaml` +3. See troubleshooting sections in the README diff --git a/industries/asset_lifecycle_management_agent/README.md b/industries/asset_lifecycle_management_agent/README.md index 6cce9fc85..c2acdd906 100644 --- a/industries/asset_lifecycle_management_agent/README.md +++ b/industries/asset_lifecycle_management_agent/README.md @@ -33,6 +33,8 @@ This architecture provides the foundation for comprehensive asset health monitor ## Setup and Installation +> 📖 **For detailed installation instructions including database setup (PostgreSQL, MySQL, SQLite) and vector store configuration (ChromaDB, Elasticsearch), see [INSTALLATION.md](INSTALLATION.md)** + ### Prerequisites - Python 3.11+ (< 3.13) - Conda or Miniconda @@ -175,6 +177,21 @@ Now install the ALM workflow: uv pip install -e . ``` +#### Installation Options + +**Base Installation** (default - includes ChromaDB + SQLite): +```bash +uv pip install -e . +``` + +**Optional Database Support:** +- PostgreSQL: `uv pip install -e ".[postgres]"` +- MySQL: `uv pip install -e ".[mysql]"` +- All databases: `uv pip install -e ".[all-databases]"` + +**Optional Vector Store:** +- Elasticsearch: `uv pip install -e ".[elasticsearch]"` + ### [Optional] Verify if all prerequisite packages are installed ```bash uv pip list | grep -E "nvidia-nat|nvidia-nat-ragaai|nvidia-nat-phoenix|vanna|chromadb|xgboost|pytest|torch|matplotlib" @@ -239,6 +256,41 @@ This ensures that: - Output files are organized within your project - Configuration remains portable across different machines +#### Setting Up Workspace Utilities + +**IMPORTANT**: The code generation assistant requires a `utils` folder inside your `output_data` directory for RUL transformation tasks. + +**Setup Instructions:** + +1. Create the output_data directory (if it doesn't exist): +```bash +mkdir -p output_data +``` + +2. Copy the pre-built utility functions from the template: +```bash +cp -r utils_template output_data/utils +``` + +3. Verify the setup: +```bash +ls output_data/utils/ +# You should see: __init__.py rul_utils.py +``` + +**What's included:** +- `apply_piecewise_rul_transformation(df, maxlife=100)` - Transforms RUL data to create realistic "knee" patterns +- `show_utilities()` - Display available utility functions + +These utilities are automatically available to the code generation assistant when running in the Docker sandbox (mapped as `/workspace/utils/`). The system will only import these utilities when specifically needed for RUL transformations, preventing unnecessary module loading errors (`ModuleNotFoundError: No module named 'utils'` will not occur). 
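+
+For reference, code generated for a RUL transformation inside the sandbox typically follows the pattern below (a minimal sketch; `rul_results.json` is a hypothetical file name produced by an earlier data retrieval step):
+
+```python
+import sys
+sys.path.append("/workspace")  # output_data/ is mounted at /workspace, so utils/ resolves to /workspace/utils
+import utils
+
+import pandas as pd
+
+# Hypothetical JSON file saved by a previous SQL retrieval step
+df = pd.read_json("rul_results.json")
+
+# Adds a 'transformed_RUL' column using the piecewise ("knee") transformation
+df_transformed = utils.apply_piecewise_rul_transformation(df, maxlife=100)
+print(df_transformed[["time_in_cycles", "RUL", "transformed_RUL"]].head())
+```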
+
+**How It Works:**
+- When you start the sandbox with `output_data/` as the mount point, the `utils/` folder becomes accessible at `/workspace/utils/`
+- The code generation assistant only imports utils when performing RUL transformations
+- For regular tasks (data retrieval, plotting, etc.), utils are not imported, avoiding module errors
+
+**Note**: If you move your `output_data` folder, make sure the `utils` subfolder comes with it, or copy it from `utils_template` again.
+
 ### 6. Vanna SQL Agent Training (Automatic)
 
 **Important**: The Vanna SQL agent training happens automatically when you start the workflow server. The `vanna_training_data.yaml` file contains pre-configured domain-specific knowledge that will be loaded automatically during server startup.
@@ -463,7 +515,9 @@ def your_custom_utility(file_path: str, param: int = 100) -> str:
 4. **Consistent Interface**: All utilities return descriptive success messages
 5. **Documentation**: Use `utils.show_utilities()` to discover available functions
 
-### Setup Web Interface
+### Alternative: Generic NeMo-Agent-Toolkit UI
+
+If you prefer the generic NeMo Agent Toolkit UI instead of our custom interface:
 
 ```bash
 git clone https://github.com/NVIDIA/NeMo-Agent-Toolkit-UI.git
@@ -509,7 +563,7 @@ Retrieve and detect anomalies in sensor 4 measurements for engine number 78 in t
 
 **Workspace Utilities Demo**
 ```
-Retrieve ground truth RUL values and time in cycles from FD001 train dataset. Apply piecewise RUL transformation with MAXLIFE=100. Finally, Plot a line chart of the transformed values across time.
+Retrieve RUL values and time in cycles for engine unit 24 from FD001 train dataset. Use the piecewise RUL transformation code utility to perform a piecewise RUL transformation on the ground truth RUL values with MAXLIFE=100. Finally, plot a comparison line chart with the RUL values and their transformed values across time.
 ```
 
 *This example demonstrates how to discover and use workspace utilities directly. The system will show available utilities and then apply the RUL transformation using the pre-built, reliable utility functions.*
@@ -518,9 +572,9 @@
 ```
 Perform the following steps:
-1.Retrieve the time in cycles, all sensor measurements, and ground truth RUL values for engine unit 24 from FD001 train dataset.
+1.Retrieve the time in cycles, all sensor measurements, and ground truth RUL values, partitioned by unit number, for engine unit 24 from FD001 train dataset.
 2.Use the retrieved data to predict the Remaining Useful Life (RUL).
-3.Use the piece wise RUL transformation code utility to apply piecewise RUL transformation only to the observed RUL column.
+3.Use the piecewise RUL transformation code utility to apply the piecewise RUL transformation only to the observed RUL column with MAXLIFE of 100.
 4.Generate a plot that compares the transformed RUL values and the predicted RUL values across time.
 ```
 ![Prediction Example](imgs/test_prompt_3.png)
diff --git a/industries/asset_lifecycle_management_agent/configs/README.md b/industries/asset_lifecycle_management_agent/configs/README.md
new file mode 100644
index 000000000..bb7218bcb
--- /dev/null
+++ b/industries/asset_lifecycle_management_agent/configs/README.md
@@ -0,0 +1,571 @@
+# SQL Query and Retrieve Tool Configuration Guide
+
+This comprehensive guide explains how to configure the SQL Query and Retrieve Tool, covering both vector store backends and SQL database connections.
+
+## Table of Contents
+1. 
[Vector Store Configuration](#vector-store-configuration) +2. [SQL Database Configuration](#sql-database-configuration) +3. [Complete Configuration Examples](#complete-configuration-examples) +4. [Troubleshooting](#troubleshooting) + +--- + +## Vector Store Configuration + +### Overview + +The tool supports **two vector store backends** for storing Vanna AI SQL training data: +- **ChromaDB** (local, file-based) - Default +- **Elasticsearch** (distributed, server-based) + +Both vector stores provide identical functionality and store the same data (DDL, documentation, question-SQL pairs). + +### Quick Start - Vector Stores + +#### Option 1: ChromaDB (Recommended for Development) + +```yaml +functions: + - name: my_sql_tool + type: generate_sql_query_and_retrieve_tool + llm_name: nim_llm + embedding_name: nim_embeddings + + # ChromaDB Configuration (DEFAULT) + vector_store_type: chromadb + vector_store_path: ./vanna_vector_store + + # Database and other settings... + db_connection_string_or_path: ./database.db + db_type: sqlite + output_folder: ./output + vanna_training_data_path: ./training_data.yaml +``` + +**Requirements:** +- No additional services required +- No extra Python packages needed + +#### Option 2: Elasticsearch (Recommended for Production) + +```yaml +functions: + - name: my_sql_tool + type: generate_sql_query_and_retrieve_tool + llm_name: nim_llm + embedding_name: nim_embeddings + + # Elasticsearch Configuration + vector_store_type: elasticsearch + elasticsearch_url: http://localhost:9200 + elasticsearch_index_name: vanna_sql_vectors # Optional + elasticsearch_username: elastic # Optional + elasticsearch_password: changeme # Optional + + # Database and other settings... + db_connection_string_or_path: ./database.db + db_type: sqlite + output_folder: ./output + vanna_training_data_path: ./training_data.yaml +``` + +**Requirements:** +- Elasticsearch service must be running +- Install: `pip install elasticsearch` + +### Detailed Comparison - Vector Stores + +| Feature | ChromaDB | Elasticsearch | +|---------|----------|---------------| +| **Setup Complexity** | Simple | Moderate | +| **External Services** | None required | Requires ES cluster | +| **Storage Type** | Local file-based | Distributed | +| **High Availability** | No | Yes (with clustering) | +| **Horizontal Scaling** | No | Yes | +| **Best For** | Dev, testing, single-server | Production, multi-user | +| **Authentication** | File system | API key or basic auth | +| **Performance** | Fast for single-user | Fast for multi-user | +| **Backup** | Copy directory | ES snapshots | + +### When to Use Each Vector Store + +#### Use ChromaDB When: +✅ Getting started or prototyping +✅ Single-server deployment +✅ Local development environment +✅ Simple setup required +✅ No existing Elasticsearch infrastructure +✅ Small to medium data volume + +#### Use Elasticsearch When: +✅ Production environment +✅ Multiple instances/users need access +✅ Need high availability and clustering +✅ Already have Elasticsearch infrastructure +✅ Need advanced search capabilities +✅ Distributed deployment required +✅ Large scale deployments + +### Vector Store Configuration Parameters + +#### Common Parameters (Both Vector Stores) +```yaml +llm_name: string # LLM to use +embedding_name: string # Embedding model to use +db_connection_string_or_path: string # Database connection +db_type: string # 'sqlite', 'postgres', or 'sql' +output_folder: string # Output directory +vanna_training_data_path: string # Training data YAML file +``` + +#### 
ChromaDB-Specific Parameters +```yaml +vector_store_type: chromadb # Set to 'chromadb' +vector_store_path: string # Directory for ChromaDB storage +``` + +#### Elasticsearch-Specific Parameters +```yaml +vector_store_type: elasticsearch # Set to 'elasticsearch' +elasticsearch_url: string # ES URL (e.g., http://localhost:9200) +elasticsearch_index_name: string # Index name (default: vanna_vectors) +elasticsearch_username: string # Optional: for basic auth +elasticsearch_password: string # Optional: for basic auth +elasticsearch_api_key: string # Optional: alternative to username/password +``` + +### Elasticsearch Authentication + +Choose one of these authentication methods: + +#### Option 1: API Key (Recommended) +```yaml +elasticsearch_api_key: your-api-key-here +``` + +#### Option 2: Basic Auth +```yaml +elasticsearch_username: elastic +elasticsearch_password: changeme +``` + +#### Option 3: No Auth (Development Only) +```yaml +# Omit all auth parameters +``` + +### Data Migration Between Vector Stores + +#### From ChromaDB to Elasticsearch +1. Export training data from ChromaDB +2. Update configuration to use Elasticsearch +3. Run tool - it will auto-initialize Elasticsearch with training data + +#### From Elasticsearch to ChromaDB +1. Training data is reloaded from YAML file automatically +2. Update configuration to use ChromaDB +3. Run tool - it will auto-initialize ChromaDB + +### Vector Store Troubleshooting + +#### ChromaDB Issues +**Problem:** `FileNotFoundError` or permission errors +**Solution:** Ensure directory exists and has write permissions + +**Problem:** Slow performance +**Solution:** ChromaDB is single-threaded, consider Elasticsearch for better performance + +#### Elasticsearch Issues +**Problem:** `ConnectionError` or `ConnectionTimeout` +**Solution:** Verify Elasticsearch is running: `curl http://localhost:9200` + +**Problem:** `AuthenticationException` +**Solution:** Check username/password or API key + +**Problem:** Index already exists with different mapping +**Solution:** Delete index and let tool recreate: `curl -X DELETE http://localhost:9200/vanna_vectors` + +--- + +## SQL Database Configuration + +### Overview + +The tool supports **multiple SQL database types** through a unified `db_connection_string_or_path` parameter: +- **SQLite** (local, file-based) - Default +- **PostgreSQL** (open-source RDBMS) +- **MySQL** (open-source RDBMS) +- **SQL Server** (Microsoft database) +- **Oracle** (enterprise database) +- **Any SQLAlchemy-compatible database** + +### Quick Start - SQL Databases + +#### Option 1: SQLite (File-Based, No Server Required) + +```yaml +db_connection_string_or_path: ./database.db # Just a file path +db_type: sqlite +``` + +**Requirements:** +- No additional services required +- No extra Python packages needed (sqlite3 is built-in) + +#### Option 2: PostgreSQL + +```yaml +db_connection_string_or_path: postgresql://user:password@localhost:5432/database +db_type: postgres +``` + +**Requirements:** +- PostgreSQL server must be running +- Install: `pip install psycopg2-binary` + +#### Option 3: MySQL + +```yaml +db_connection_string_or_path: mysql+pymysql://user:password@localhost:3306/database +db_type: sql # Generic SQL via SQLAlchemy +``` + +**Requirements:** +- MySQL server must be running +- Install: `pip install pymysql sqlalchemy` + +#### Option 4: SQL Server + +```yaml +db_connection_string_or_path: mssql+pyodbc://user:pass@host:1433/db?driver=ODBC+Driver+17+for+SQL+Server +db_type: sql # Generic SQL via SQLAlchemy +``` + +**Requirements:** 
+- SQL Server must be running +- Install: `pip install pyodbc sqlalchemy` +- Install ODBC Driver for SQL Server + +#### Option 5: Oracle + +```yaml +db_connection_string_or_path: oracle+cx_oracle://user:password@host:1521/?service_name=service +db_type: sql # Generic SQL via SQLAlchemy +``` + +**Requirements:** +- Oracle database must be running +- Install: `pip install cx_Oracle sqlalchemy` + +### Detailed Comparison - SQL Databases + +| Feature | SQLite | PostgreSQL | MySQL | SQL Server | Oracle | +|---------|--------|------------|-------|------------|--------| +| **Setup** | None | Server required | Server required | Server required | Server required | +| **Cost** | Free | Free | Free | Licensed | Licensed | +| **Use Case** | Dev/testing | Production | Production | Enterprise | Enterprise | +| **Concurrent Users** | Limited | Excellent | Excellent | Excellent | Excellent | +| **File-Based** | Yes | No | No | No | No | +| **Advanced Features** | Basic | Advanced | Good | Advanced | Advanced | +| **Python Driver** | Built-in | psycopg2 | pymysql | pyodbc | cx_Oracle | + +### When to Use Each Database + +#### Use SQLite When: +✅ Development and testing +✅ Prototyping and demos +✅ Single-user applications +✅ No server infrastructure required +✅ Small to medium data volume +✅ Embedded applications +✅ Quick setup needed + +#### Use PostgreSQL When: +✅ Production deployments +✅ Multi-user applications +✅ Need advanced SQL features +✅ Open-source preference +✅ Need strong data integrity +✅ Complex queries and analytics +✅ GIS data support needed + +#### Use MySQL When: +✅ Web applications +✅ Read-heavy workloads +✅ Need wide compatibility +✅ Open-source preference +✅ Large-scale deployments +✅ Replication required + +#### Use SQL Server When: +✅ Microsoft ecosystem +✅ Enterprise applications +✅ .NET integration needed +✅ Advanced analytics (T-SQL) +✅ Business intelligence +✅ Existing SQL Server infrastructure + +#### Use Oracle When: +✅ Large enterprise deployments +✅ Mission-critical applications +✅ Need advanced features (RAC, Data Guard) +✅ Existing Oracle infrastructure +✅ High-availability requirements +✅ Maximum performance needed + +### Connection String Formats + +#### SQLite +``` +Format: /path/to/database.db +Example: ./data/sales.db +Example: /var/app/database.db +``` + +#### PostgreSQL +``` +Format: postgresql://username:password@host:port/database +Example: postgresql://admin:secret@db.example.com:5432/sales_db +Example: postgresql://user:pass@localhost:5432/mydb +``` + +#### MySQL +``` +Format: mysql+pymysql://username:password@host:port/database +Example: mysql+pymysql://root:password@localhost:3306/inventory +Example: mysql+pymysql://dbuser:pass@192.168.1.10:3306/analytics +``` + +#### SQL Server +``` +Format: mssql+pyodbc://user:pass@host:port/db?driver=ODBC+Driver+XX+for+SQL+Server +Example: mssql+pyodbc://sa:MyPass@localhost:1433/sales?driver=ODBC+Driver+17+for+SQL+Server +Example: mssql+pyodbc://user:pwd@server:1433/db?driver=ODBC+Driver+18+for+SQL+Server +``` + +#### Oracle +``` +Format: oracle+cx_oracle://username:password@host:port/?service_name=service +Example: oracle+cx_oracle://admin:secret@localhost:1521/?service_name=ORCLPDB +Example: oracle+cx_oracle://user:pass@oracledb:1521/?service_name=PROD +``` + +### Database Configuration Parameters + +```yaml +db_connection_string_or_path: string # Path (SQLite) or connection string (others) +db_type: string # 'sqlite', 'postgres', or 'sql' +``` + +**db_type values:** +- `sqlite` - For SQLite databases (uses 
connect_to_sqlite internally) +- `postgres` or `postgresql` - For PostgreSQL databases (uses connect_to_postgres) +- `sql` - For generic SQL databases via SQLAlchemy (MySQL, SQL Server, Oracle, etc.) + +### SQL Database Troubleshooting + +#### SQLite Issues +**Problem:** `database is locked` error +**Solution:** Close all connections or use WAL mode + +**Problem:** `unable to open database file` +**Solution:** Check file path and permissions + +#### PostgreSQL Issues +**Problem:** `connection refused` +**Solution:** Check PostgreSQL is running: `systemctl status postgresql` + +**Problem:** `authentication failed` +**Solution:** Verify credentials and check pg_hba.conf + +**Problem:** `database does not exist` +**Solution:** Create database: `createdb database_name` + +#### MySQL Issues +**Problem:** `Access denied for user` +**Solution:** Check credentials and user permissions: `GRANT ALL ON db.* TO 'user'@'host'` + +**Problem:** `Can't connect to MySQL server` +**Solution:** Check MySQL is running: `systemctl status mysql` + +#### SQL Server Issues +**Problem:** `Login failed for user` +**Solution:** Check SQL Server authentication mode and user permissions + +**Problem:** `ODBC Driver not found` +**Solution:** Install ODBC Driver: Download from Microsoft + +**Problem:** `SSL Provider: No credentials are available` +**Solution:** Add `TrustServerCertificate=yes` to connection string + +#### Oracle Issues +**Problem:** `ORA-12541: TNS:no listener` +**Solution:** Start Oracle listener: `lsnrctl start` + +**Problem:** `ORA-01017: invalid username/password` +**Solution:** Verify credentials and user exists + +**Problem:** `cx_Oracle.DatabaseError` +**Solution:** Check Oracle client libraries are installed + +### Required Python Packages by Database + +```bash +# SQLite (built-in, no packages needed) +# Already included with Python + +# PostgreSQL +pip install psycopg2-binary + +# MySQL +pip install pymysql sqlalchemy + +# SQL Server +pip install pyodbc sqlalchemy +# Also install: Microsoft ODBC Driver for SQL Server + +# Oracle +pip install cx_Oracle sqlalchemy +# Also install: Oracle Instant Client + +# Generic SQL (covers MySQL, SQL Server, Oracle via SQLAlchemy) +pip install sqlalchemy +``` + +--- + +## Complete Configuration Examples + +### Example 1: SQLite with ChromaDB (Simplest Setup) +```yaml +functions: + - name: simple_sql_tool + type: generate_sql_query_and_retrieve_tool + llm_name: nim_llm + embedding_name: nim_embeddings + # Vector store + vector_store_type: chromadb + vector_store_path: ./vanna_vector_store + # Database + db_connection_string_or_path: ./database.db + db_type: sqlite + # Output + output_folder: ./output + vanna_training_data_path: ./training_data.yaml +``` + +### Example 2: PostgreSQL with Elasticsearch (Production Setup) +```yaml +functions: + - name: production_sql_tool + type: generate_sql_query_and_retrieve_tool + llm_name: nim_llm + embedding_name: nim_embeddings + # Vector store + vector_store_type: elasticsearch + elasticsearch_url: http://elasticsearch:9200 + elasticsearch_username: elastic + elasticsearch_password: changeme + # Database + db_connection_string_or_path: postgresql://dbuser:dbpass@postgres:5432/analytics + db_type: postgres + # Output + output_folder: ./output + vanna_training_data_path: ./training_data.yaml +``` + +### Example 3: MySQL with ChromaDB +```yaml +functions: + - name: mysql_sql_tool + type: generate_sql_query_and_retrieve_tool + llm_name: nim_llm + embedding_name: nim_embeddings + # Vector store + vector_store_type: chromadb + 
vector_store_path: ./vanna_vector_store + # Database + db_connection_string_or_path: mysql+pymysql://root:password@localhost:3306/sales + db_type: sql + # Output + output_folder: ./output + vanna_training_data_path: ./training_data.yaml +``` + +--- + +## Architecture Notes + +Both vector stores: +- Use the same NVIDIA embedding models +- Store identical training data +- Provide the same vector similarity search +- Are managed automatically by VannaManager +- Support the same training data YAML format + +The tool automatically: +- Detects if vector store needs initialization +- Loads training data from YAML file +- Creates embeddings using NVIDIA models +- Manages vector store lifecycle + +### Performance Tips + +#### ChromaDB +- Keep on SSD for faster I/O +- Regular directory backups +- Monitor disk space + +#### Elasticsearch +- Use SSD-backed storage +- Configure appropriate heap size +- Enable index caching +- Use snapshots for backups +- Monitor cluster health + +--- + +## Quick Reference + +### Configuration Matrix + +| Database | Vector Store | db_type | Connection Format | +|----------|--------------|---------|-------------------| +| SQLite | ChromaDB | sqlite | `./database.db` | +| SQLite | Elasticsearch | sqlite | `./database.db` | +| PostgreSQL | ChromaDB | postgres | `postgresql://user:pass@host:port/db` | +| PostgreSQL | Elasticsearch | postgres | `postgresql://user:pass@host:port/db` | +| MySQL | ChromaDB | sql | `mysql+pymysql://user:pass@host:port/db` | +| MySQL | Elasticsearch | sql | `mysql+pymysql://user:pass@host:port/db` | +| SQL Server | ChromaDB | sql | `mssql+pyodbc://user:pass@host:port/db?driver=...` | +| SQL Server | Elasticsearch | sql | `mssql+pyodbc://user:pass@host:port/db?driver=...` | +| Oracle | ChromaDB | sql | `oracle+cx_oracle://user:pass@host:port/?service_name=...` | +| Oracle | Elasticsearch | sql | `oracle+cx_oracle://user:pass@host:port/?service_name=...` | + +### Recommended Combinations + +| Use Case | Vector Store | Database | Why | +|----------|--------------|----------|-----| +| **Development** | ChromaDB | SQLite | Simplest setup, no servers | +| **Production (Small)** | ChromaDB | PostgreSQL | Reliable, open-source | +| **Production (Large)** | Elasticsearch | PostgreSQL | Scalable, distributed | +| **Enterprise** | Elasticsearch | SQL Server/Oracle | Advanced features, HA | +| **Web App** | ChromaDB | MySQL | Standard web stack | +| **Analytics** | Elasticsearch | PostgreSQL | Complex queries, multi-user | + +### Default Values + +```yaml +vector_store_type: chromadb # Default +elasticsearch_index_name: vanna_vectors # Default ES index +db_type: sqlite # Default +``` + +--- + +## Additional Resources + +For more detailed examples, see: +- **`config_examples.yaml`** - Complete working examples with all combinations of vector stores and databases +- **`vanna_manager.py`** - Implementation details for connection management +- **`vanna_util.py`** - Vector store implementations (ChromaDB and Elasticsearch) diff --git a/industries/asset_lifecycle_management_agent/configs/config-reasoning.yml b/industries/asset_lifecycle_management_agent/configs/config-reasoning.yaml similarity index 93% rename from industries/asset_lifecycle_management_agent/configs/config-reasoning.yml rename to industries/asset_lifecycle_management_agent/configs/config-reasoning.yaml index 88bfd34c1..6a061ae41 100644 --- a/industries/asset_lifecycle_management_agent/configs/config-reasoning.yml +++ b/industries/asset_lifecycle_management_agent/configs/config-reasoning.yaml @@ 
-25,11 +25,11 @@ general: # _type: file # path: "alm.log" # level: DEBUG - tracing: - phoenix: - _type: phoenix - endpoint: http://localhost:6006/v1/traces - project: alm-agent + # tracing: + # phoenix: + # _type: phoenix + # endpoint: http://localhost:6006/v1/traces + # project: alm-agent # catalyst: # _type: catalyst # project: "alm-agent" @@ -45,8 +45,6 @@ llms: analyst_llm: _type: nim model_name: "qwen/qwen2.5-coder-32b-instruct" - # _type: openai - # model_name: "gpt-4.1-mini" # Python code generation model coding_llm: @@ -61,21 +59,26 @@ llms: # Multimodal evaluation model (Vision-Language Model) multimodal_judging_llm: _type: nim - model_name: nvidia/llama-3.1-nemotron-nano-vl-8b-v1 + model_name: nvidia/llama-3.1-nemotron-nano-vl-8b-v1 embedders: # Text embedding model for vector database operations vanna_embedder: _type: nim - model_name: "nvidia/nv-embed-v1" + model_name: "nvidia/llama-3_2-nv-embedqa-1b-v2" functions: sql_retriever: _type: generate_sql_query_and_retrieve_tool - llm_name: sql_llm - embedding_name: vanna_embedder + llm_name: "sql_llm" + embedding_name: "vanna_embedder" + # Vector store configuration + vector_store_type: "chromadb" # Optional, chromadb is default vector_store_path: "database" - db_path: "database/nasa_turbo.db" + # Database configuration + db_type: "sqlite" # Optional, sqlite is default + db_connection_string_or_path: "database/nasa_turbo.db" + # Output configuration output_folder: "output_data" vanna_training_data_path: "vanna_training_data.yaml" @@ -107,30 +110,30 @@ functions: code_generation_assistant: _type: code_generation_assistant - llm_name: coding_llm - code_execution_tool: code_execution + llm_name: "coding_llm" + code_execution_tool: "code_execution" verbose: true code_execution: _type: code_execution uri: http://127.0.0.1:6000/execute - sandbox_type: local + sandbox_type: "local" max_output_characters: 2000 data_analysis_assistant: _type: react_agent - llm_name: analyst_llm + llm_name: "analyst_llm" max_iterations: 20 max_retries: 2 tool_names: [ - sql_retriever, - predict_rul, - plot_distribution, - plot_line_chart, - plot_comparison, - anomaly_detection, - plot_anomaly, - code_generation_assistant + "sql_retriever", + "predict_rul", + "plot_distribution", + "plot_line_chart", + "plot_comparison", + "anomaly_detection", + "plot_anomaly", + "code_generation_assistant" ] parse_agent_response_max_retries: 2 system_prompt: | @@ -212,8 +215,8 @@ functions: workflow: _type: reasoning_agent - augmented_fn: data_analysis_assistant - llm_name: reasoning_llm + augmented_fn: "data_analysis_assistant" + llm_name: "reasoning_llm" verbose: true reasoning_prompt_template: | ### DESCRIPTION ### @@ -273,7 +276,7 @@ eval: evaluators: multimodal_eval: _type: multimodal_llm_judge_evaluator - llm_name: multimodal_judging_llm + llm_name: "multimodal_judging_llm" judge_prompt: | You are an expert evaluator for Asset Lifecycle Management agentic workflows, with expertise in predictive maintenance tasks. 
Your task is to evaluate how well a generated response (which may include both text and visualizations) diff --git a/industries/asset_lifecycle_management_agent/env_template.txt b/industries/asset_lifecycle_management_agent/env_template.txt index 7c2d52b1b..839c9b6d9 100644 --- a/industries/asset_lifecycle_management_agent/env_template.txt +++ b/industries/asset_lifecycle_management_agent/env_template.txt @@ -6,6 +6,9 @@ # NVIDIA NIM API key - Replace with your actual key from build.nvidia.com NVIDIA_API_KEY="your-actual-nvidia-api-key" +# OPENAI API KEY - Replace with your openai api key if you are using the openai endpoint +OPENAI_API_KEY="your-openai-api-key" + # Optional: Catalyst observability credentials (for monitoring with RAGA AI) # Uncomment and set these if you want to use Catalyst tracing # CATALYST_ACCESS_KEY="your-actual-catalyst-access-key" diff --git a/industries/asset_lifecycle_management_agent/eval_output/multimodal_eval_output.json b/industries/asset_lifecycle_management_agent/example_eval_output/multimodal_eval_output.json similarity index 100% rename from industries/asset_lifecycle_management_agent/eval_output/multimodal_eval_output.json rename to industries/asset_lifecycle_management_agent/example_eval_output/multimodal_eval_output.json diff --git a/industries/asset_lifecycle_management_agent/eval_output/workflow_output.json b/industries/asset_lifecycle_management_agent/example_eval_output/workflow_output.json similarity index 100% rename from industries/asset_lifecycle_management_agent/eval_output/workflow_output.json rename to industries/asset_lifecycle_management_agent/example_eval_output/workflow_output.json diff --git a/industries/asset_lifecycle_management_agent/pyproject.toml b/industries/asset_lifecycle_management_agent/pyproject.toml index 5e7c4677f..757cc391d 100644 --- a/industries/asset_lifecycle_management_agent/pyproject.toml +++ b/industries/asset_lifecycle_management_agent/pyproject.toml @@ -6,11 +6,11 @@ requires = ["setuptools >= 64"] name = "asset_lifecycle_management_agent" dynamic = ["version"] dependencies = [ - "nvidia-nat[profiling, langchain, telemetry]==1.2.1", + "nvidia-nat[profiling, langchain, telemetry]==1.3.0", "momentfm", - "pydantic ~= 2.10.0, <2.11.0", "vanna==0.7.9", "chromadb", + "sqlalchemy>=2.0.0", "xgboost", "matplotlib", "torch", @@ -23,6 +23,36 @@ classifiers = ["Programming Language :: Python"] authors = [{ name = "Vineeth Kalluru" }] maintainers = [{ name = "NVIDIA Corporation" }] +[project.optional-dependencies] +elasticsearch = [ + "elasticsearch>=8.0.0" +] +postgres = [ + "psycopg2-binary>=2.9.0" +] +mysql = [ + "pymysql>=1.0.0" +] +sqlserver = [ + "pyodbc>=4.0.0" +] +oracle = [ + "cx_Oracle>=8.0.0" +] +all-databases = [ + "psycopg2-binary>=2.9.0", + "pymysql>=1.0.0", + "pyodbc>=4.0.0", + "cx_Oracle>=8.0.0" +] +all = [ + "elasticsearch>=8.0.0", + "psycopg2-binary>=2.9.0", + "pymysql>=1.0.0", + "pyodbc>=4.0.0", + "cx_Oracle>=8.0.0" +] + [project.entry-points.'nat.components'] asset_lifecycle_management_agent = "asset_lifecycle_management_agent.register" diff --git a/industries/asset_lifecycle_management_agent/setup_database.py b/industries/asset_lifecycle_management_agent/setup_database.py index 595a542f0..972ebf151 100644 --- a/industries/asset_lifecycle_management_agent/setup_database.py +++ b/industries/asset_lifecycle_management_agent/setup_database.py @@ -46,7 +46,7 @@ class NASADatasetProcessor: """Processes NASA Turbofan Engine Dataset and creates SQLite database.""" - def __init__(self, data_dir: str = "data", 
db_path: str = "PredM_db/nasa_turbo.db"): + def __init__(self, data_dir: str = "data", db_path: str = "database/nasa_turbo.db"): """ Initialize the processor. @@ -125,27 +125,20 @@ def process_training_data(self, conn: sqlite3.Connection): 'train_FD001.txt', 'train_FD002.txt', 'train_FD003.txt', 'train_FD004.txt' ] - all_training_data = [] - for file_name in training_files: file_path = self.data_dir / file_name if file_path.exists(): df = self.read_data_file(file_path) if not df.empty: - # Add dataset identifier - df['dataset'] = file_name.replace('train_', '').replace('.txt', '') - # Calculate RUL for training data (max cycle - current cycle) df['RUL'] = df.groupby('unit_number')['time_in_cycles'].transform('max') - df['time_in_cycles'] - all_training_data.append(df) + # Create separate table for each dataset (e.g., train_FD001) + table_name = file_name.replace('.txt', '') + df.to_sql(table_name, conn, if_exists='replace', index=False) + logger.info(f"Created {table_name} table with {len(df)} records") else: logger.warning(f"Training file not found: {file_path}") - - if all_training_data: - training_df = pd.concat(all_training_data, ignore_index=True) - training_df.to_sql('training_data', conn, if_exists='replace', index=False) - logger.info(f"Created training_data table with {len(training_df)} records") def process_test_data(self, conn: sqlite3.Connection): """Process test data files and create database tables.""" @@ -155,23 +148,17 @@ def process_test_data(self, conn: sqlite3.Connection): 'test_FD001.txt', 'test_FD002.txt', 'test_FD003.txt', 'test_FD004.txt' ] - all_test_data = [] - for file_name in test_files: file_path = self.data_dir / file_name if file_path.exists(): df = self.read_data_file(file_path) if not df.empty: - # Add dataset identifier - df['dataset'] = file_name.replace('test_', '').replace('.txt', '') - all_test_data.append(df) + # Create separate table for each dataset (e.g., test_FD001) + table_name = file_name.replace('.txt', '') + df.to_sql(table_name, conn, if_exists='replace', index=False) + logger.info(f"Created {table_name} table with {len(df)} records") else: logger.warning(f"Test file not found: {file_path}") - - if all_test_data: - test_df = pd.concat(all_test_data, ignore_index=True) - test_df.to_sql('test_data', conn, if_exists='replace', index=False) - logger.info(f"Created test_data table with {len(test_df)} records") def process_rul_data(self, conn: sqlite3.Connection): """Process RUL (Remaining Useful Life) data files.""" @@ -181,8 +168,6 @@ def process_rul_data(self, conn: sqlite3.Connection): 'RUL_FD001.txt', 'RUL_FD002.txt', 'RUL_FD003.txt', 'RUL_FD004.txt' ] - all_rul_data = [] - for file_name in rul_files: file_path = self.data_dir / file_name if file_path.exists(): @@ -190,18 +175,15 @@ def process_rul_data(self, conn: sqlite3.Connection): # RUL files contain one RUL value per line for each test engine rul_values = pd.read_csv(file_path, header=None, names=['RUL']) rul_values['unit_number'] = range(1, len(rul_values) + 1) - rul_values['dataset'] = file_name.replace('RUL_', '').replace('.txt', '') - all_rul_data.append(rul_values[['unit_number', 'dataset', 'RUL']]) - logger.info(f"Loaded {len(rul_values)} RUL values from {file_name}") + + # Create separate table for each dataset (e.g., RUL_FD001) + table_name = file_name.replace('.txt', '') + rul_values[['unit_number', 'RUL']].to_sql(table_name, conn, if_exists='replace', index=False) + logger.info(f"Created {table_name} table with {len(rul_values)} records") except Exception as e: 
logger.error(f"Error reading RUL file {file_path}: {e}") else: logger.warning(f"RUL file not found: {file_path}") - - if all_rul_data: - rul_df = pd.concat(all_rul_data, ignore_index=True) - rul_df.to_sql('rul_data', conn, if_exists='replace', index=False) - logger.info(f"Created rul_data table with {len(rul_df)} records") def create_metadata_tables(self, conn: sqlite3.Connection): """Create metadata tables with sensor descriptions and dataset information.""" @@ -229,18 +211,24 @@ def create_indexes(self, conn: sqlite3.Connection): """Create database indexes for better query performance.""" logger.info("Creating database indexes...") - indexes = [ - "CREATE INDEX IF NOT EXISTS idx_training_unit ON training_data(unit_number)", - "CREATE INDEX IF NOT EXISTS idx_training_dataset ON training_data(dataset)", - "CREATE INDEX IF NOT EXISTS idx_training_cycle ON training_data(time_in_cycles)", - "CREATE INDEX IF NOT EXISTS idx_test_unit ON test_data(unit_number)", - "CREATE INDEX IF NOT EXISTS idx_test_dataset ON test_data(dataset)", - "CREATE INDEX IF NOT EXISTS idx_test_cycle ON test_data(time_in_cycles)", - "CREATE INDEX IF NOT EXISTS idx_rul_unit ON rul_data(unit_number, dataset)" - ] + datasets = ['FD001', 'FD002', 'FD003', 'FD004'] + indexes = [] + + # Create indexes for each dataset's tables + for dataset in datasets: + indexes.extend([ + f"CREATE INDEX IF NOT EXISTS idx_train_{dataset}_unit ON train_{dataset}(unit_number)", + f"CREATE INDEX IF NOT EXISTS idx_train_{dataset}_cycle ON train_{dataset}(time_in_cycles)", + f"CREATE INDEX IF NOT EXISTS idx_test_{dataset}_unit ON test_{dataset}(unit_number)", + f"CREATE INDEX IF NOT EXISTS idx_test_{dataset}_cycle ON test_{dataset}(time_in_cycles)", + f"CREATE INDEX IF NOT EXISTS idx_RUL_{dataset}_unit ON RUL_{dataset}(unit_number)" + ]) for index_sql in indexes: - conn.execute(index_sql) + try: + conn.execute(index_sql) + except Exception as e: + logger.warning(f"Failed to create index: {e}") conn.commit() logger.info("Created database indexes") @@ -345,7 +333,7 @@ def main(): parser = argparse.ArgumentParser(description="Convert NASA Turbofan Dataset to SQLite") parser.add_argument("--data-dir", default="data", help="Directory containing NASA dataset text files") - parser.add_argument("--db-path", default="PredM_db/nasa_turbo.db", + parser.add_argument("--db-path", default="database/nasa_turbo.db", help="Path for output SQLite database") args = parser.parse_args() diff --git a/industries/asset_lifecycle_management_agent/src/asset_lifecycle_management_agent/__init__.py b/industries/asset_lifecycle_management_agent/src/asset_lifecycle_management_agent/__init__.py index 1b79187be..f2fcadbdb 100644 --- a/industries/asset_lifecycle_management_agent/src/asset_lifecycle_management_agent/__init__.py +++ b/industries/asset_lifecycle_management_agent/src/asset_lifecycle_management_agent/__init__.py @@ -13,4 +13,4 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-__version__ = "1.0.0" +__version__ = "2.0.0" diff --git a/industries/asset_lifecycle_management_agent/src/asset_lifecycle_management_agent/plotting/code_generation_assistant.py b/industries/asset_lifecycle_management_agent/src/asset_lifecycle_management_agent/plotting/code_generation_assistant.py index 46677f366..0c236f782 100644 --- a/industries/asset_lifecycle_management_agent/src/asset_lifecycle_management_agent/plotting/code_generation_assistant.py +++ b/industries/asset_lifecycle_management_agent/src/asset_lifecycle_management_agent/plotting/code_generation_assistant.py @@ -36,7 +36,7 @@ class CodeGenerationAssistantConfig(FunctionBaseConfig, name="code_generation_as code_execution_tool: FunctionRef = Field(description="The code execution tool to run generated code") output_folder: str = Field(description="The path to the output folder for generated files", default="/output_data") verbose: bool = Field(description="Enable verbose logging", default=True) - max_retries: int = Field(description="Maximum number of retries if code execution fails", default=3) + max_retries: int = Field(description="Maximum number of retries if code execution fails", default=0) @register_function(config_type=CodeGenerationAssistantConfig, framework_wrappers=[LLMFrameworkEnum.LANGCHAIN]) @@ -70,37 +70,26 @@ async def _generate_and_execute_code( OUTPUT ONLY THE CODE. NO COMMENTS. NO DOCSTRINGS. NO EXPLANATIONS. Generate only the code needed. Your response must contain ONLY executable Python code which will be DIRECTLY EXECUTED IN A SANDBOX. -**UTILITIES AVAILABLE:** -A 'utils' folder contains pre-built functions for Asset Lifecycle Management tasks (especially predictive maintenance): -- utils.apply_piecewise_rul_transformation - INPUTS: - - file_path: Path to JSON file with time series data - - maxlife: Maximum life threshold for the piecewise function (default: 100) - - time_col: Name of the time/cycle column (default: 'time_in_cycles') - - rul_col: Name of the RUL column to transform (default: 'RUL') - OUTPUTS: - - pandas DataFrame with original data plus new 'transformed_RUL' column - * Transform RUL data with realistic knee pattern - * Returns a pandas DataFrame with original data plus new 'transformed_RUL' column +**DATABASE PATH:** +For SQLite operations, the database file is located at: '/workspace/database/nasa_turbo.db' +ALWAYS use this exact path when connecting to the database. -- utils.show_utilities(): Show all available utilities if you need to see them +**UTILITIES (OPTIONAL - ONLY FOR RUL TRANSFORMATIONS):** +ONLY IF the task involves piecewise RUL transformation, you may use: +- utils.apply_piecewise_rul_transformation(df, maxlife=100, time_col='time_in_cycles', rul_col='RUL') + Takes a pandas DataFrame and returns it with an added 'transformed_RUL' column using piecewise transformation. -**CRITICAL REQUIREMENT: ALWAYS USE UTILITIES when available instead of writing custom implementations.** -This ensures reliable, tested functionality and consistent results. 
- -To use utilities, start your code with: +To use utilities (ONLY if needed for RUL transformation): ```python import sys -sys.path.append(".") +sys.path.append("/workspace") import utils + +# Your code using the utility +df_transformed = utils.apply_piecewise_rul_transformation(df, maxlife=100) ``` -**UTILITY USAGE GUIDELINES:** -- Check if your task can be accomplished using the utility function -- For RUL transformations: ALWAYS use utils.apply_piecewise_rul_transformation() instead of custom logic -- The utility handles all error checking and provides detailed success messages -- Use maxlife parameter to control the knee threshold (default: 100) -- Capture the DataFrame returned by the utility function for further processing +DO NOT import utils unless specifically needed for RUL transformation tasks. **CODE REQUIREMENTS:** 1. Generate COMPLETE, SYNTACTICALLY CORRECT Python code @@ -109,6 +98,7 @@ async def _generate_and_execute_code( 4. NO comments, NO docstrings, NO explanations 5. Use minimal variable names (df, fig, data, etc.) 6. **CRITICAL FILE PATH RULE**: Use ONLY the filename directly (e.g., "filename.json"), NOT "output_data/filename.json" +7. **DATABASE PATH RULE**: Use '/workspace/database/nasa_turbo.db' for SQLite connections 8. **IF YOU STILL NEED TO SAVE FILES, THEN PRINT FILE NAMES TO STDOUT. (eg: print("Saved file to: filename.json"))** GENERATE CODE ONLY. NO COMMENTS. NO EXPLANATIONS.""" @@ -226,10 +216,11 @@ def is_code_incomplete(code): {error_info} Please generate corrected Python code that fixes the problem. Follow all requirements: -- Use utilities when available +- Use '/workspace/database/nasa_turbo.db' for database connections +- Only import utils if doing RUL transformations (use sys.path.append("/workspace")) - Generate only executable Python code - No comments or explanations -- Handle file paths correctly +- Handle file paths correctly (use only filename, not paths) - Complete all code blocks properly - Ensure the code is complete and not truncated @@ -340,7 +331,6 @@ def _clean_generated_code(raw_code: str) -> str: return '\n'.join(clean_lines).strip() - def _extract_file_paths(stdout: str, output_folder: str) -> list: """Extract generated file paths from execution output.""" import re diff --git a/industries/asset_lifecycle_management_agent/src/asset_lifecycle_management_agent/retrievers/generate_sql_query_and_retrieve_tool.py b/industries/asset_lifecycle_management_agent/src/asset_lifecycle_management_agent/retrievers/generate_sql_query_and_retrieve_tool.py index 319285c52..dd34e0bc2 100644 --- a/industries/asset_lifecycle_management_agent/src/asset_lifecycle_management_agent/retrievers/generate_sql_query_and_retrieve_tool.py +++ b/industries/asset_lifecycle_management_agent/src/asset_lifecycle_management_agent/retrievers/generate_sql_query_and_retrieve_tool.py @@ -16,6 +16,7 @@ import json import logging import os +from typing import Optional from pydantic import Field, BaseModel @@ -30,12 +31,58 @@ class GenerateSqlQueryAndRetrieveToolConfig(FunctionBaseConfig, name="generate_sql_query_and_retrieve_tool"): """ NeMo Agent Toolkit function to generate SQL queries and retrieve data. + + Supports multiple database types through flexible connection configuration. 
""" # Runtime configuration parameters llm_name: str = Field(description="The name of the LLM to use for the function.") embedding_name: str = Field(description="The name of the embedding to use for the function.") - vector_store_path: str = Field(description="The path to the vector store to use for the function.") - db_path: str = Field(description="The path to the SQL database to use for the function.") + + # Vector store configuration + vector_store_type: str = Field( + default="chromadb", + description="Type of vector store: 'chromadb' or 'elasticsearch'" + ) + vector_store_path: Optional[str] = Field( + default=None, + description="Path to ChromaDB vector store (required if vector_store_type='chromadb')" + ) + elasticsearch_url: Optional[str] = Field( + default=None, + description="Elasticsearch URL (required if vector_store_type='elasticsearch', e.g., 'http://localhost:9200')" + ) + elasticsearch_index_name: str = Field( + default="vanna_vectors", + description="Elasticsearch index name (used if vector_store_type='elasticsearch')" + ) + elasticsearch_username: Optional[str] = Field( + default=None, + description="Elasticsearch username for basic auth (optional)" + ) + elasticsearch_password: Optional[str] = Field( + default=None, + description="Elasticsearch password for basic auth (optional)" + ) + elasticsearch_api_key: Optional[str] = Field( + default=None, + description="Elasticsearch API key for authentication (optional)" + ) + + # Database configuration + db_connection_string_or_path: str = Field( + description=( + "Database connection (path for SQLite, connection string for others). Format depends on db_type:\n" + "- sqlite: Path to .db file (e.g., './database.db')\n" + "- postgres: Connection string (e.g., 'postgresql://user:pass@host:port/db')\n" + "- sql: SQLAlchemy connection string (e.g., 'mysql+pymysql://user:pass@host/db')" + ) + ) + db_type: str = Field( + default="sqlite", + description="Type of database: 'sqlite', 'postgres', or 'sql' (generic SQL via SQLAlchemy)" + ) + + # Output configuration output_folder: str = Field(description="The path to the output folder to use for the function.") vanna_training_data_path: str = Field(description="The path to the YAML file containing Vanna training data.") @@ -106,8 +153,15 @@ class GenerateSqlQueryInputSchema(BaseModel): vanna_manager = VannaManager.create_with_config( vanna_llm_config=vanna_llm_config, vanna_embedder_config=vanna_embedder_config, + vector_store_type=config.vector_store_type, vector_store_path=config.vector_store_path, - db_path=config.db_path, + elasticsearch_url=config.elasticsearch_url, + elasticsearch_index_name=config.elasticsearch_index_name, + elasticsearch_username=config.elasticsearch_username, + elasticsearch_password=config.elasticsearch_password, + elasticsearch_api_key=config.elasticsearch_api_key, + db_connection_string_or_path=config.db_connection_string_or_path, + db_type=config.db_type, training_data_path=config.vanna_training_data_path ) @@ -173,27 +227,27 @@ async def _response_fn(input_question_in_english: str) -> str: import re llm_response = re.sub(r',\[object Object\],?', '', llm_response) - if "save" in llm_response.lower(): - # Clean the question for filename - clean_question = re.sub(r'[^\w\s-]', '', input_question_in_english.lower()) - clean_question = re.sub(r'\s+', '_', clean_question.strip())[:30] - suggested_filename = f"{clean_question}_results.json" - - sql_output_path = os.path.join(config.output_folder, suggested_filename) + # if "save" in llm_response.lower(): + # Clean 
the question for filename + clean_question = re.sub(r'[^\w\s-]', '', input_question_in_english.lower()) + clean_question = re.sub(r'\s+', '_', clean_question.strip())[:30] + suggested_filename = f"{clean_question}_results.json" - # Save the data to JSON file - os.makedirs(config.output_folder, exist_ok=True) - json_result = df.to_json(orient="records") - with open(sql_output_path, 'w') as f: - json.dump(json.loads(json_result), f, indent=4) + sql_output_path = os.path.join(config.output_folder, suggested_filename) - logger.info(f"Data saved to {sql_output_path}") + # Save the data to JSON file + os.makedirs(config.output_folder, exist_ok=True) + json_result = df.to_json(orient="records") + with open(sql_output_path, 'w') as f: + json.dump(json.loads(json_result), f, indent=4) - llm_response += f"\n\nData has been saved to file: {suggested_filename}" + logger.info(f"Data saved to {sql_output_path}") - return llm_response + llm_response += f"\n\nData has been saved to file: {suggested_filename}" return llm_response + + # return llm_response except Exception as e: return f"Error running SQL query '{sql}': {e}" diff --git a/industries/asset_lifecycle_management_agent/src/asset_lifecycle_management_agent/retrievers/vanna_manager.py b/industries/asset_lifecycle_management_agent/src/asset_lifecycle_management_agent/retrievers/vanna_manager.py index a5a3a93e6..bce296bbd 100644 --- a/industries/asset_lifecycle_management_agent/src/asset_lifecycle_management_agent/retrievers/vanna_manager.py +++ b/industries/asset_lifecycle_management_agent/src/asset_lifecycle_management_agent/retrievers/vanna_manager.py @@ -21,7 +21,7 @@ import threading import hashlib from typing import Dict, Optional -from .vanna_util import NIMVanna, initVanna, CustomEmbeddingFunction +from .vanna_util import NIMVanna, ElasticNIMVanna, initVanna, NVIDIAEmbeddingFunction logger = logging.getLogger(__name__) @@ -33,6 +33,7 @@ class VannaManager: - Singleton pattern to ensure only one instance per configuration - Thread-safe operations - Simple instance management + - Support for multiple database types: SQLite, generic SQL, and PostgreSQL """ _instances: Dict[str, 'VannaManager'] = {} @@ -49,8 +50,31 @@ def __new__(cls, config_key: str): logger.debug(f"VannaManager: Returning existing singleton instance for config: {config_key}") return cls._instances[config_key] - def __init__(self, config_key: str, vanna_llm_config=None, vanna_embedder_config=None, vector_store_path: str = None, db_path: str = None, training_data_path: str = None): - """Initialize the VannaManager and create Vanna instance immediately if all config is provided""" + def __init__(self, config_key: str, vanna_llm_config=None, vanna_embedder_config=None, + vector_store_type: str = "chromadb", vector_store_path: str = None, + elasticsearch_url: str = None, elasticsearch_index_name: str = "vanna_vectors", + elasticsearch_username: str = None, elasticsearch_password: str = None, + elasticsearch_api_key: str = None, + db_connection_string_or_path: str = None, db_type: str = "sqlite", + training_data_path: str = None, nvidia_api_key: str = None): + """Initialize the VannaManager and create Vanna instance immediately if all config is provided + + Args: + config_key: Unique key for this configuration + vanna_llm_config: LLM configuration object + vanna_embedder_config: Embedder configuration object + vector_store_type: Type of vector store - 'chromadb' or 'elasticsearch' + vector_store_path: Path to ChromaDB vector store (required if vector_store_type='chromadb') + 
elasticsearch_url: Elasticsearch URL (required if vector_store_type='elasticsearch') + elasticsearch_index_name: Elasticsearch index name + elasticsearch_username: Elasticsearch username for basic auth + elasticsearch_password: Elasticsearch password for basic auth + elasticsearch_api_key: Elasticsearch API key + db_connection_string_or_path: Database connection (path for SQLite, connection string for others) + db_type: Type of database - 'sqlite', 'postgres', or 'sql' (generic SQL with SQLAlchemy) + training_data_path: Path to YAML training data file + nvidia_api_key: NVIDIA API key (optional, can use NVIDIA_API_KEY env var) + """ if hasattr(self, '_initialized') and self._initialized: return @@ -60,17 +84,29 @@ def __init__(self, config_key: str, vanna_llm_config=None, vanna_embedder_config # Store configuration self.vanna_llm_config = vanna_llm_config self.vanna_embedder_config = vanna_embedder_config + self.vector_store_type = vector_store_type self.vector_store_path = vector_store_path - self.db_path = db_path + self.elasticsearch_url = elasticsearch_url + self.elasticsearch_index_name = elasticsearch_index_name + self.elasticsearch_username = elasticsearch_username + self.elasticsearch_password = elasticsearch_password + self.elasticsearch_api_key = elasticsearch_api_key + self.db_connection_string_or_path = db_connection_string_or_path + self.db_type = db_type self.training_data_path = training_data_path + self.nvidia_api_key = nvidia_api_key or os.getenv("NVIDIA_API_KEY") # Create and initialize Vanna instance immediately if all required config is provided self.vanna_instance = None - if all([vanna_llm_config, vanna_embedder_config, vector_store_path, db_path]): + has_vector_config = ( + (vector_store_type == "chromadb" and vector_store_path) or + (vector_store_type == "elasticsearch" and elasticsearch_url) + ) + if all([vanna_llm_config, vanna_embedder_config, has_vector_config, self.db_connection_string_or_path]): logger.debug(f"VannaManager: Initializing with immediate Vanna instance creation") self.vanna_instance = self._create_instance() else: - if any([vanna_llm_config, vanna_embedder_config, vector_store_path, db_path]): + if any([vanna_llm_config, vanna_embedder_config, vector_store_path, elasticsearch_url, self.db_connection_string_or_path]): logger.debug(f"VannaManager: Partial configuration provided, Vanna instance will be created later") else: logger.debug(f"VannaManager: No configuration provided, Vanna instance will be created later") @@ -78,7 +114,11 @@ def __init__(self, config_key: str, vanna_llm_config=None, vanna_embedder_config self._initialized = True logger.debug(f"VannaManager initialized for config: {config_key}") - def get_instance(self, vanna_llm_config=None, vanna_embedder_config=None, vector_store_path: str = None, db_path: str = None, training_data_path: str = None) -> NIMVanna: + def get_instance(self, vanna_llm_config=None, vanna_embedder_config=None, + vector_store_type: str = None, vector_store_path: str = None, + elasticsearch_url: str = None, + db_connection_string_or_path: str = None, db_type: str = None, + training_data_path: str = None, nvidia_api_key: str = None): """ Get the Vanna instance. If not created during init, create it now with provided parameters. 
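        Note (a summary of the validation performed below, not additional behaviour):
        creating the instance requires an LLM config, an embedder config, a database
        connection, and the vector-store setting that matches vector_store_type:

            vector_store_type == "chromadb"      -> vector_store_path must be set
            vector_store_type == "elasticsearch" -> elasticsearch_url must be set

        If any of these are missing, a RuntimeError is raised.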
""" @@ -89,59 +129,155 @@ def get_instance(self, vanna_llm_config=None, vanna_embedder_config=None, vector # Update configuration with provided parameters self.vanna_llm_config = vanna_llm_config or self.vanna_llm_config self.vanna_embedder_config = vanna_embedder_config or self.vanna_embedder_config + self.vector_store_type = vector_store_type or self.vector_store_type self.vector_store_path = vector_store_path or self.vector_store_path - self.db_path = db_path or self.db_path + self.elasticsearch_url = elasticsearch_url or self.elasticsearch_url + self.db_connection_string_or_path = db_connection_string_or_path or self.db_connection_string_or_path + self.db_type = db_type or self.db_type self.training_data_path = training_data_path or self.training_data_path + self.nvidia_api_key = nvidia_api_key or self.nvidia_api_key + + # Check if we have required vector store config + has_vector_config = ( + (self.vector_store_type == "chromadb" and self.vector_store_path) or + (self.vector_store_type == "elasticsearch" and self.elasticsearch_url) + ) - if all([self.vanna_llm_config, self.vanna_embedder_config, self.vector_store_path, self.db_path]): + if all([self.vanna_llm_config, self.vanna_embedder_config, has_vector_config, self.db_connection_string_or_path]): self.vanna_instance = self._create_instance() else: raise RuntimeError("VannaManager: Missing required configuration parameters") else: logger.debug(f"VannaManager: Returning pre-initialized Vanna instance (ID: {id(self.vanna_instance)})") + logger.debug(f"VannaManager: Vector store type: {self.vector_store_type}") # Show vector store status for pre-initialized instances try: - if os.path.exists(self.vector_store_path): - list_of_folders = [d for d in os.listdir(self.vector_store_path) - if os.path.isdir(os.path.join(self.vector_store_path, d))] - logger.debug(f"VannaManager: Vector store contains {len(list_of_folders)} collections/folders") - if list_of_folders: - logger.debug(f"VannaManager: Vector store folders: {list_of_folders}") - else: - logger.debug(f"VannaManager: Vector store directory does not exist") + if self.vector_store_type == "chromadb" and self.vector_store_path: + if os.path.exists(self.vector_store_path): + list_of_folders = [d for d in os.listdir(self.vector_store_path) + if os.path.isdir(os.path.join(self.vector_store_path, d))] + logger.debug(f"VannaManager: ChromaDB contains {len(list_of_folders)} collections/folders") + if list_of_folders: + logger.debug(f"VannaManager: ChromaDB folders: {list_of_folders}") + else: + logger.debug(f"VannaManager: ChromaDB directory does not exist") + elif self.vector_store_type == "elasticsearch": + logger.debug(f"VannaManager: Using Elasticsearch at {self.elasticsearch_url}") except Exception as e: logger.warning(f"VannaManager: Could not check vector store status: {e}") return self.vanna_instance - def _create_instance(self) -> NIMVanna: + def _create_instance(self): """ Create a new Vanna instance using the stored configuration. + Returns NIMVanna (ChromaDB) or ElasticNIMVanna (Elasticsearch) based on vector_store_type. 
""" logger.info(f"VannaManager: Creating instance for {self.config_key}") - logger.debug(f"VannaManager: Vector store path: {self.vector_store_path}") - logger.debug(f"VannaManager: Database path: {self.db_path}") + logger.debug(f"VannaManager: Vector store type: {self.vector_store_type}") + logger.debug(f"VannaManager: Database connection: {self.db_connection_string_or_path}") + logger.debug(f"VannaManager: Database type: {self.db_type}") logger.debug(f"VannaManager: Training data path: {self.training_data_path}") - # Create instance - vn_instance = NIMVanna( - VectorConfig={ - "client": "persistent", - "path": self.vector_store_path, - "embedding_function": CustomEmbeddingFunction( - api_key=os.getenv("NVIDIA_API_KEY"), - model=self.vanna_embedder_config.model_name) - }, - LLMConfig={ - "api_key": os.getenv("NVIDIA_API_KEY"), - "model": self.vanna_llm_config.model_name - } + # Create embedding function (used by both ChromaDB and Elasticsearch) + embedding_function = NVIDIAEmbeddingFunction( + api_key=self.nvidia_api_key, + model=self.vanna_embedder_config.model_name ) - # Connect to database - logger.debug(f"VannaManager: Connecting to SQLite database...") - vn_instance.connect_to_sqlite(self.db_path) + # LLM configuration (common for both) + llm_config = { + "api_key": self.nvidia_api_key, + "model": self.vanna_llm_config.model_name + } + + # Create instance based on vector store type + if self.vector_store_type == "chromadb": + logger.debug(f"VannaManager: Creating NIMVanna with ChromaDB") + logger.debug(f"VannaManager: ChromaDB path: {self.vector_store_path}") + vn_instance = NIMVanna( + VectorConfig={ + "client": "persistent", + "path": self.vector_store_path, + "embedding_function": embedding_function + }, + LLMConfig=llm_config + ) + elif self.vector_store_type == "elasticsearch": + logger.debug(f"VannaManager: Creating ElasticNIMVanna with Elasticsearch") + logger.debug(f"VannaManager: Elasticsearch URL: {self.elasticsearch_url}") + logger.debug(f"VannaManager: Elasticsearch index: {self.elasticsearch_index_name}") + + # Build Elasticsearch vector config + es_config = { + "url": self.elasticsearch_url, + "index_name": self.elasticsearch_index_name, + "embedding_function": embedding_function + } + + # Add authentication if provided + if self.elasticsearch_api_key: + es_config["api_key"] = self.elasticsearch_api_key + logger.debug("VannaManager: Using Elasticsearch API key authentication") + elif self.elasticsearch_username and self.elasticsearch_password: + es_config["username"] = self.elasticsearch_username + es_config["password"] = self.elasticsearch_password + logger.debug("VannaManager: Using Elasticsearch basic authentication") + + vn_instance = ElasticNIMVanna( + VectorConfig=es_config, + LLMConfig=llm_config + ) + else: + raise ValueError( + f"Unsupported vector store type: {self.vector_store_type}. 
" + "Supported types: 'chromadb', 'elasticsearch'" + ) + + # Connect to database based on type + logger.debug(f"VannaManager: Connecting to {self.db_type} database...") + if self.db_type == "sqlite": + # Vanna's connect_to_sqlite has broken URL detection in 0.7.9 + # It tries to download everything with requests.get() + # For local files, use direct SQLite connection + import os + db_path = self.db_connection_string_or_path + + # Convert relative paths to absolute + if not os.path.isabs(db_path): + db_path = os.path.abspath(db_path) + + # For local files, use sqlite3 directly + if os.path.exists(db_path): + import sqlite3 + import pandas as pd + + def run_sql_sqlite(sql: str): + """Execute SQL on local SQLite database.""" + conn = sqlite3.connect(db_path) + try: + df = pd.read_sql_query(sql, conn) + return df + finally: + conn.close() + + vn_instance.run_sql = run_sql_sqlite + vn_instance.run_sql_is_set = True + logger.debug(f"VannaManager: Connected to local SQLite database: {db_path}") + else: + # If file doesn't exist, let Vanna try (maybe it's a URL) + logger.warning(f"VannaManager: Database file not found: {db_path}") + vn_instance.connect_to_sqlite(self.db_connection_string_or_path) + elif self.db_type == "postgres" or self.db_type == "postgresql": + self._connect_to_postgres(vn_instance, self.db_connection_string_or_path) + elif self.db_type == "sql": + self._connect_to_sql(vn_instance, self.db_connection_string_or_path) + else: + raise ValueError( + f"Unsupported database type: {self.db_type}. " + "Supported types: 'sqlite', 'postgres', 'sql'" + ) # Set configuration - allow LLM to see data for database introspection vn_instance.allow_llm_to_see_data = True @@ -163,29 +299,126 @@ def _create_instance(self) -> NIMVanna: logger.info(f"VannaManager: Instance created successfully") return vn_instance + def _connect_to_postgres(self, vn_instance: NIMVanna, connection_string: str): + """ + Connect to a PostgreSQL database. + + Args: + vn_instance: The Vanna instance to connect + connection_string: PostgreSQL connection string in format: + postgresql://user:password@host:port/database + """ + try: + import psycopg2 + from psycopg2.pool import SimpleConnectionPool + + logger.info("Connecting to PostgreSQL database...") + + # Parse connection string if needed + if connection_string.startswith("postgresql://"): + # Use SQLAlchemy-style connection for Vanna + vn_instance.connect_to_postgres(url=connection_string) + else: + # Assume it's a psycopg2 connection string + vn_instance.connect_to_postgres(url=f"postgresql://{connection_string}") + + logger.info("Successfully connected to PostgreSQL database") + except ImportError: + logger.error( + "psycopg2 is required for PostgreSQL connections. " + "Install it with: pip install psycopg2-binary" + ) + raise + except Exception as e: + logger.error(f"Error connecting to PostgreSQL: {e}") + raise + + def _connect_to_sql(self, vn_instance: NIMVanna, connection_string: str): + """ + Connect to a generic SQL database using SQLAlchemy. 
+ + Args: + vn_instance: The Vanna instance to connect + connection_string: SQLAlchemy-compatible connection string, e.g.: + - MySQL: mysql+pymysql://user:password@host:port/database + - PostgreSQL: postgresql://user:password@host:port/database + - SQL Server: mssql+pyodbc://user:password@host:port/database?driver=ODBC+Driver+17+for+SQL+Server + - Oracle: oracle+cx_oracle://user:password@host:port/?service_name=service + """ + try: + from sqlalchemy import create_engine + + logger.info("Connecting to SQL database via SQLAlchemy...") + + # Create SQLAlchemy engine + engine = create_engine(connection_string) + + # Connect Vanna to the database using the engine + vn_instance.connect_to_sqlalchemy(engine) + + logger.info("Successfully connected to SQL database") + except ImportError: + logger.error( + "SQLAlchemy is required for generic SQL connections. " + "Install it with: pip install sqlalchemy" + ) + raise + except Exception as e: + logger.error(f"Error connecting to SQL database: {e}") + raise + def _needs_initialization(self) -> bool: """ Check if the vector store needs initialization by checking if it's empty. + For ChromaDB: checks directory existence and contents + For Elasticsearch: checks if index exists and has data """ logger.debug(f"VannaManager: Checking if vector store needs initialization...") - logger.debug(f"VannaManager: Vector store path: {self.vector_store_path}") + logger.debug(f"VannaManager: Vector store type: {self.vector_store_type}") try: - if not os.path.exists(self.vector_store_path): - logger.debug(f"VannaManager: Vector store directory does not exist -> needs initialization") - return True - - # Check if there are any subdirectories (ChromaDB creates subdirectories when data is stored) - list_of_folders = [d for d in os.listdir(self.vector_store_path) - if os.path.isdir(os.path.join(self.vector_store_path, d))] - - logger.debug(f"VannaManager: Found {len(list_of_folders)} folders in vector store") - if list_of_folders: - logger.debug(f"VannaManager: Vector store folders: {list_of_folders}") - logger.debug(f"VannaManager: Vector store is populated -> skipping initialization") - return False + if self.vector_store_type == "chromadb": + logger.debug(f"VannaManager: Checking ChromaDB at: {self.vector_store_path}") + + if not os.path.exists(self.vector_store_path): + logger.debug(f"VannaManager: ChromaDB directory does not exist -> needs initialization") + return True + + # Check if there are any subdirectories (ChromaDB creates subdirectories when data is stored) + list_of_folders = [d for d in os.listdir(self.vector_store_path) + if os.path.isdir(os.path.join(self.vector_store_path, d))] + + logger.debug(f"VannaManager: Found {len(list_of_folders)} folders in ChromaDB") + if list_of_folders: + logger.debug(f"VannaManager: ChromaDB folders: {list_of_folders}") + logger.debug(f"VannaManager: ChromaDB is populated -> skipping initialization") + return False + else: + logger.debug(f"VannaManager: ChromaDB is empty -> needs initialization") + return True + + elif self.vector_store_type == "elasticsearch": + logger.debug(f"VannaManager: Checking Elasticsearch at: {self.elasticsearch_url}") + + # For Elasticsearch, check if training data is available in the instance + # This is a simplified check - we assume if we can connect, we should initialize if no training data exists + try: + if hasattr(self.vanna_instance, 'get_training_data'): + training_data = self.vanna_instance.get_training_data() + if training_data and len(training_data) > 0: + logger.debug(f"VannaManager: 
Elasticsearch has {len(training_data)} training data entries -> skipping initialization") + return False + else: + logger.debug(f"VannaManager: Elasticsearch has no training data -> needs initialization") + return True + else: + logger.debug(f"VannaManager: Cannot check Elasticsearch training data -> needs initialization") + return True + except Exception as e: + logger.debug(f"VannaManager: Error checking Elasticsearch data ({e}) -> needs initialization") + return True else: - logger.debug(f"VannaManager: Vector store is empty -> needs initialization") + logger.warning(f"VannaManager: Unknown vector store type: {self.vector_store_type}") return True except Exception as e: @@ -233,16 +466,42 @@ def get_stats(self) -> Dict: return { "config_key": self.config_key, "instance_id": id(self.vanna_instance) if self.vanna_instance else None, - "has_instance": self.vanna_instance is not None + "has_instance": self.vanna_instance is not None, + "db_type": self.db_type, } @classmethod - def create_with_config(cls, vanna_llm_config, vanna_embedder_config, vector_store_path: str, db_path: str, training_data_path: str = None): + def create_with_config(cls, vanna_llm_config, vanna_embedder_config, + vector_store_type: str = "chromadb", vector_store_path: str = None, + elasticsearch_url: str = None, elasticsearch_index_name: str = "vanna_vectors", + elasticsearch_username: str = None, elasticsearch_password: str = None, + elasticsearch_api_key: str = None, + db_connection_string_or_path: str = None, db_type: str = "sqlite", + training_data_path: str = None, nvidia_api_key: str = None): """ Class method to create a VannaManager with full configuration. Uses create_config_key to ensure singleton behavior based on configuration. + + Args: + vanna_llm_config: LLM configuration object + vanna_embedder_config: Embedder configuration object + vector_store_type: Type of vector store - 'chromadb' or 'elasticsearch' + vector_store_path: Path to ChromaDB vector store (required if vector_store_type='chromadb') + elasticsearch_url: Elasticsearch URL (required if vector_store_type='elasticsearch') + elasticsearch_index_name: Elasticsearch index name + elasticsearch_username: Elasticsearch username for basic auth + elasticsearch_password: Elasticsearch password for basic auth + elasticsearch_api_key: Elasticsearch API key + db_connection_string_or_path: Database connection (path for SQLite, connection string for others) + db_type: Type of database - 'sqlite', 'postgres', or 'sql' + training_data_path: Path to YAML training data file + nvidia_api_key: NVIDIA API key (optional) """ - config_key = create_config_key(vanna_llm_config, vanna_embedder_config, vector_store_path, db_path) + config_key = create_config_key( + vanna_llm_config, vanna_embedder_config, + vector_store_type, vector_store_path, elasticsearch_url, + db_connection_string_or_path, db_type + ) # Create instance with just config_key (singleton pattern) instance = cls(config_key) @@ -251,9 +510,17 @@ def create_with_config(cls, vanna_llm_config, vanna_embedder_config, vector_stor if not hasattr(instance, 'vanna_llm_config') or instance.vanna_llm_config is None: instance.vanna_llm_config = vanna_llm_config instance.vanna_embedder_config = vanna_embedder_config + instance.vector_store_type = vector_store_type instance.vector_store_path = vector_store_path - instance.db_path = db_path + instance.elasticsearch_url = elasticsearch_url + instance.elasticsearch_index_name = elasticsearch_index_name + instance.elasticsearch_username = elasticsearch_username + 
instance.elasticsearch_password = elasticsearch_password + instance.elasticsearch_api_key = elasticsearch_api_key + instance.db_connection_string_or_path = db_connection_string_or_path + instance.db_type = db_type instance.training_data_path = training_data_path + instance.nvidia_api_key = nvidia_api_key # Create Vanna instance immediately if all config is available if instance.vanna_instance is None: @@ -262,9 +529,24 @@ def create_with_config(cls, vanna_llm_config, vanna_embedder_config, vector_stor return instance -def create_config_key(vanna_llm_config, vanna_embedder_config, vector_store_path: str, db_path: str) -> str: +def create_config_key(vanna_llm_config, vanna_embedder_config, + vector_store_type: str, vector_store_path: str, elasticsearch_url: str, + db_connection_string_or_path: str, db_type: str = "sqlite") -> str: """ Create a unique configuration key for the VannaManager singleton. + + Args: + vanna_llm_config: LLM configuration object + vanna_embedder_config: Embedder configuration object + vector_store_type: Type of vector store + vector_store_path: Path to ChromaDB vector store + elasticsearch_url: Elasticsearch URL + db_connection_string_or_path: Database connection (path for SQLite, connection string for others) + db_type: Type of database + + Returns: + str: Unique configuration key """ - config_str = f"{vanna_llm_config.model_name}_{vanna_embedder_config.model_name}_{vector_store_path}_{db_path}" + vector_id = vector_store_path if vector_store_type == "chromadb" else elasticsearch_url + config_str = f"{vanna_llm_config.model_name}_{vanna_embedder_config.model_name}_{vector_store_type}_{vector_id}_{db_connection_string_or_path}_{db_type}" return hashlib.md5(config_str.encode()).hexdigest()[:12] diff --git a/industries/asset_lifecycle_management_agent/src/asset_lifecycle_management_agent/retrievers/vanna_util.py b/industries/asset_lifecycle_management_agent/src/asset_lifecycle_management_agent/retrievers/vanna_util.py index 2c90fd85c..f4764e556 100644 --- a/industries/asset_lifecycle_management_agent/src/asset_lifecycle_management_agent/retrievers/vanna_util.py +++ b/industries/asset_lifecycle_management_agent/src/asset_lifecycle_management_agent/retrievers/vanna_util.py @@ -13,12 +13,20 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-from vanna.chromadb import ChromaDB_VectorStore -from vanna.base import VannaBase -from langchain_nvidia import ChatNVIDIA +"""Vanna utilities for SQL generation using NVIDIA NIM services.""" + +import logging + +from langchain_nvidia import ChatNVIDIA, NVIDIAEmbeddings from tqdm import tqdm +from vanna.base import VannaBase +from vanna.chromadb import ChromaDB_VectorStore + +logger = logging.getLogger(__name__) class NIMCustomLLM(VannaBase): + """Custom LLM implementation for Vanna using NVIDIA NIM.""" + def __init__(self, config=None): VannaBase.__init__(self, config=config) @@ -27,10 +35,10 @@ def __init__(self, config=None): # default parameters - can be overrided using config self.temperature = 0.7 - + if "temperature" in config: self.temperature = config["temperature"] - + # If only config is passed if "api_key" not in config: raise ValueError("config must contain a NIM api_key") @@ -40,7 +48,7 @@ def __init__(self, config=None): api_key = config["api_key"] model = config["model"] - + # Initialize ChatNVIDIA client self.client = ChatNVIDIA( api_key=api_key, @@ -49,16 +57,23 @@ def __init__(self, config=None): ) self.model = model - def system_message(self, message: str) -> any: - return {"role": "system", "content": message+"\n DO NOT PRODUCE MARKDOWN, ONLY RESPOND IN PLAIN TEXT"} + def system_message(self, message: str) -> dict: + """Create a system message.""" + return { + "role": "system", + "content": message + "\n DO NOT PRODUCE MARKDOWN, ONLY RESPOND IN PLAIN TEXT", + } - def user_message(self, message: str) -> any: + def user_message(self, message: str) -> dict: + """Create a user message.""" return {"role": "user", "content": message} - def assistant_message(self, message: str) -> any: + def assistant_message(self, message: str) -> dict: + """Create an assistant message.""" return {"role": "assistant", "content": message} def submit_prompt(self, prompt, **kwargs) -> str: + """Submit a prompt to the LLM.""" if prompt is None: raise Exception("Prompt is None") @@ -70,60 +85,547 @@ def submit_prompt(self, prompt, **kwargs) -> str: num_tokens = 0 for message in prompt: num_tokens += len(message["content"]) / 4 - print(f"Using model {self.model} for {num_tokens} tokens (approx)") - - response = self.client.invoke(prompt) - return response.content + logger.debug(f"Using model {self.model} for {num_tokens} tokens (approx)") + + logger.debug(f"Submitting prompt with {len(prompt)} messages") + logger.debug(f"Prompt content preview: {str(prompt)[:500]}...") + + try: + response = self.client.invoke(prompt) + logger.debug(f"Response type: {type(response)}") + logger.debug(f"Response content type: {type(response.content)}") + logger.debug( + f"Response content length: {len(response.content) if response.content else 0}" + ) + logger.debug( + f"Response content preview: {response.content[:200] if response.content else 'None'}..." 
+ ) + return response.content + except Exception as e: + logger.error(f"Error in submit_prompt: {e}") + logger.error(f"Error type: {type(e)}") + import traceback + + logger.error(f"Full traceback: {traceback.format_exc()}") + raise class NIMVanna(ChromaDB_VectorStore, NIMCustomLLM): - def __init__(self, VectorConfig = None, LLMConfig = None): + """Vanna implementation using NVIDIA NIM for LLM and ChromaDB for vector storage.""" + + def __init__(self, VectorConfig=None, LLMConfig=None): ChromaDB_VectorStore.__init__(self, config=VectorConfig) NIMCustomLLM.__init__(self, config=LLMConfig) + + +class ElasticVectorStore(VannaBase): + """ + Elasticsearch-based vector store for Vanna. + + This class provides vector storage and retrieval capabilities using Elasticsearch's + dense_vector field type and kNN search functionality. + + Configuration: + config: Dictionary with the following keys: + - url: Elasticsearch connection URL (e.g., "http://localhost:9200") + - index_name: Name of the Elasticsearch index to use (default: "vanna_vectors") + - api_key: Optional API key for authentication + - username: Optional username for basic auth + - password: Optional password for basic auth + - embedding_function: Function to generate embeddings (required) + """ + + def __init__(self, config=None): + VannaBase.__init__(self, config=config) + + if not config: + raise ValueError("config must be passed for ElasticVectorStore") + + # Elasticsearch connection parameters + self.url = config.get("url", "http://localhost:9200") + self.index_name = config.get("index_name", "vanna_vectors") + self.api_key = config.get("api_key") + self.username = config.get("username") + self.password = config.get("password") + + # Embedding function (required) + if "embedding_function" not in config: + raise ValueError("embedding_function must be provided in config") + self.embedding_function = config["embedding_function"] + + # Initialize Elasticsearch client + self._init_elasticsearch_client() + + # Create index if it doesn't exist + self._create_index_if_not_exists() + + logger.info(f"ElasticVectorStore initialized with index: {self.index_name}") + + def _init_elasticsearch_client(self): + """Initialize the Elasticsearch client with authentication.""" + try: + from elasticsearch import Elasticsearch + except ImportError: + raise ImportError( + "elasticsearch package is required for ElasticVectorStore. 
" + "Install it with: pip install elasticsearch" + ) + + # Build client kwargs + client_kwargs = {} + + if self.api_key: + client_kwargs["api_key"] = self.api_key + elif self.username and self.password: + client_kwargs["basic_auth"] = (self.username, self.password) + + self.es_client = Elasticsearch(self.url, **client_kwargs) + + # Test connection (try but don't fail if ping doesn't work) + try: + if self.es_client.ping(): + logger.info(f"Successfully connected to Elasticsearch at {self.url}") + else: + logger.warning(f"Elasticsearch ping failed, but will try to proceed at {self.url}") + except Exception as e: + logger.warning(f"Elasticsearch ping check failed ({e}), but will try to proceed") + + def _create_index_if_not_exists(self): + """Create the Elasticsearch index with appropriate mappings if it doesn't exist.""" + if self.es_client.indices.exists(index=self.index_name): + logger.debug(f"Index {self.index_name} already exists") + return + + # Get embedding dimension by creating a test embedding + test_embedding = self._generate_embedding("test") + embedding_dim = len(test_embedding) + + # Index mapping with dense_vector field for embeddings + index_mapping = { + "mappings": { + "properties": { + "id": {"type": "keyword"}, + "text": {"type": "text"}, + "embedding": { + "type": "dense_vector", + "dims": embedding_dim, + "index": True, + "similarity": "cosine" + }, + "metadata": {"type": "object", "enabled": True}, + "type": {"type": "keyword"}, # ddl, documentation, sql + "created_at": {"type": "date"} + } + } + } + + self.es_client.indices.create(index=self.index_name, body=index_mapping) + logger.info(f"Created Elasticsearch index: {self.index_name}") + + def _generate_embedding(self, text: str) -> list[float]: + """Generate embedding for a given text using the configured embedding function.""" + if hasattr(self.embedding_function, 'embed_query'): + # NVIDIA embedding function returns [[embedding]] + result = self.embedding_function.embed_query(text) + if isinstance(result, list) and len(result) > 0: + if isinstance(result[0], list): + return result[0] # Extract the inner list + return result # type: ignore[return-value] + return result # type: ignore[return-value] + elif callable(self.embedding_function): + # Generic callable + result = self.embedding_function(text) + if isinstance(result, list) and len(result) > 0: + if isinstance(result[0], list): + return result[0] + return result # type: ignore[return-value] + return result # type: ignore[return-value] + else: + raise ValueError("embedding_function must be callable or have embed_query method") + + def add_ddl(self, ddl: str, **kwargs) -> str: + """ + Add a DDL statement to the vector store. + + Args: + ddl: The DDL statement to store + **kwargs: Additional metadata + + Returns: + Document ID + """ + import hashlib + from datetime import datetime + + # Generate document ID + doc_id = hashlib.md5(ddl.encode()).hexdigest() + + # Generate embedding + embedding = self._generate_embedding(ddl) + + # Create document + doc = { + "id": doc_id, + "text": ddl, + "embedding": embedding, + "type": "ddl", + "metadata": kwargs, + "created_at": datetime.utcnow().isoformat() + } + + # Index document + self.es_client.index(index=self.index_name, id=doc_id, document=doc) + logger.debug(f"Added DDL to Elasticsearch: {doc_id}") + + return doc_id + + def add_documentation(self, documentation: str, **kwargs) -> str: + """ + Add documentation to the vector store. 
+ + Args: + documentation: The documentation text to store + **kwargs: Additional metadata + + Returns: + Document ID + """ + import hashlib + from datetime import datetime + + doc_id = hashlib.md5(documentation.encode()).hexdigest() + embedding = self._generate_embedding(documentation) + + doc = { + "id": doc_id, + "text": documentation, + "embedding": embedding, + "type": "documentation", + "metadata": kwargs, + "created_at": datetime.utcnow().isoformat() + } + + self.es_client.index(index=self.index_name, id=doc_id, document=doc) + logger.debug(f"Added documentation to Elasticsearch: {doc_id}") + + return doc_id -class CustomEmbeddingFunction: + def add_question_sql(self, question: str, sql: str, **kwargs) -> str: + """ + Add a question-SQL pair to the vector store. + + Args: + question: The natural language question + sql: The corresponding SQL query + **kwargs: Additional metadata + + Returns: + Document ID + """ + import hashlib + from datetime import datetime + + # Combine question and SQL for embedding + combined_text = f"Question: {question}\nSQL: {sql}" + doc_id = hashlib.md5(combined_text.encode()).hexdigest() + embedding = self._generate_embedding(question) + + doc = { + "id": doc_id, + "text": combined_text, + "embedding": embedding, + "type": "sql", + "metadata": { + "question": question, + "sql": sql, + **kwargs + }, + "created_at": datetime.utcnow().isoformat() + } + + self.es_client.index(index=self.index_name, id=doc_id, document=doc) + logger.debug(f"Added question-SQL pair to Elasticsearch: {doc_id}") + + return doc_id + + def get_similar_question_sql(self, question: str, **kwargs) -> list: + """ + Retrieve similar question-SQL pairs using vector similarity search. + + Args: + question: The question to find similar examples for + **kwargs: Additional parameters (e.g., top_k) + + Returns: + List of similar documents + """ + top_k = kwargs.get("top_k", 10) + + # Generate query embedding + query_embedding = self._generate_embedding(question) + + # Build kNN search query + search_query = { + "knn": { + "field": "embedding", + "query_vector": query_embedding, + "k": top_k, + "num_candidates": top_k * 2, + "filter": {"term": {"type": "sql"}} + }, + "_source": ["text", "metadata", "type"] + } + + # Execute search + response = self.es_client.search(index=self.index_name, body=search_query) + + # Extract results + results = [] + for hit in response["hits"]["hits"]: + source = hit["_source"] + results.append({ + "question": source["metadata"].get("question", ""), + "sql": source["metadata"].get("sql", ""), + "score": hit["_score"] + }) + + logger.debug(f"Found {len(results)} similar question-SQL pairs") + return results + + def get_related_ddl(self, question: str, **kwargs) -> list: + """ + Retrieve related DDL statements using vector similarity search. 
+ + Args: + question: The question to find related DDL for + **kwargs: Additional parameters (e.g., top_k) + + Returns: + List of related DDL statements + """ + top_k = kwargs.get("top_k", 10) + query_embedding = self._generate_embedding(question) + + search_query = { + "knn": { + "field": "embedding", + "query_vector": query_embedding, + "k": top_k, + "num_candidates": top_k * 2, + "filter": {"term": {"type": "ddl"}} + }, + "_source": ["text"] + } + + response = self.es_client.search(index=self.index_name, body=search_query) + + results = [hit["_source"]["text"] for hit in response["hits"]["hits"]] + logger.debug(f"Found {len(results)} related DDL statements") + return results + + def get_related_documentation(self, question: str, **kwargs) -> list: + """ + Retrieve related documentation using vector similarity search. + + Args: + question: The question to find related documentation for + **kwargs: Additional parameters (e.g., top_k) + + Returns: + List of related documentation + """ + top_k = kwargs.get("top_k", 10) + query_embedding = self._generate_embedding(question) + + search_query = { + "knn": { + "field": "embedding", + "query_vector": query_embedding, + "k": top_k, + "num_candidates": top_k * 2, + "filter": {"term": {"type": "documentation"}} + }, + "_source": ["text"] + } + + response = self.es_client.search(index=self.index_name, body=search_query) + + results = [hit["_source"]["text"] for hit in response["hits"]["hits"]] + logger.debug(f"Found {len(results)} related documentation entries") + return results + + def remove_training_data(self, id: str, **kwargs) -> bool: + """ + Remove a training data entry by ID. + + Args: + id: The document ID to remove + **kwargs: Additional parameters + + Returns: + True if successful + """ + try: + self.es_client.delete(index=self.index_name, id=id) + logger.debug(f"Removed training data: {id}") + return True + except Exception as e: + logger.error(f"Error removing training data {id}: {e}") + return False + + def generate_embedding(self, data: str, **kwargs) -> list[float]: + """ + Generate embedding for given data (required by Vanna base class). + + Args: + data: Text to generate embedding for + **kwargs: Additional parameters + + Returns: + Embedding vector + """ + return self._generate_embedding(data) + + def get_training_data(self, **kwargs) -> list: + """ + Get all training data from the vector store (required by Vanna base class). + + Args: + **kwargs: Additional parameters + + Returns: + List of training data entries + """ + try: + # Query all documents + query = { + "query": {"match_all": {}}, + "size": 10000 # Adjust based on expected data size + } + + response = self.es_client.search(index=self.index_name, body=query) + + training_data = [] + for hit in response["hits"]["hits"]: + source = hit["_source"] + training_data.append({ + "id": hit["_id"], + "type": source.get("type"), + "text": source.get("text"), + "metadata": source.get("metadata", {}) + }) + + return training_data + except Exception as e: + logger.error(f"Error getting training data: {e}") + return [] + + +class ElasticNIMVanna(ElasticVectorStore, NIMCustomLLM): + """ + Vanna implementation using NVIDIA NIM for LLM and Elasticsearch for vector storage. + + This class combines ElasticVectorStore for vector operations with NIMCustomLLM + for SQL generation, providing an alternative to ChromaDB-based storage. + + Example: + >>> vanna = ElasticNIMVanna( + ... VectorConfig={ + ... "url": "http://localhost:9200", + ... "index_name": "my_sql_vectors", + ... 
"username": "elastic", + ... "password": "changeme", + ... "embedding_function": NVIDIAEmbeddingFunction( + ... api_key="your-api-key", + ... model="nvidia/llama-3.2-nv-embedqa-1b-v2" + ... ) + ... }, + ... LLMConfig={ + ... "api_key": "your-api-key", + ... "model": "meta/llama-3.1-70b-instruct" + ... } + ... ) + """ + + def __init__(self, VectorConfig=None, LLMConfig=None): + ElasticVectorStore.__init__(self, config=VectorConfig) + NIMCustomLLM.__init__(self, config=LLMConfig) + + +class NVIDIAEmbeddingFunction: """ A class that can be used as a replacement for chroma's DefaultEmbeddingFunction. It takes in input (text or list of texts) and returns embeddings using NVIDIA's API. + + This class fixes two major interface compatibility issues between ChromaDB and NVIDIA embeddings: + + 1. INPUT FORMAT MISMATCH: + - ChromaDB passes ['query text'] (list) to embed_query() + - But langchain_nvidia's embed_query() expects 'query text' (string) + - When list is passed, langchain does [text] internally → [['query text']] → API 500 error + - FIX: Detect list input and extract string before calling langchain + + 2. OUTPUT FORMAT MISMATCH: + - ChromaDB expects embed_query() to return [[embedding_vector]] (list of embeddings) + - But langchain returns [embedding_vector] (single embedding vector) + - This causes: TypeError: 'float' object cannot be converted to 'Sequence' + - FIX: Wrap single embedding in list: return [embeddings] """ - def __init__(self, api_key, model="nvidia/nv-embedqa-e5-v5"): + def __init__(self, api_key, model="nvidia/llama-3.2-nv-embedqa-1b-v2"): """ Initialize the embedding function with the API key and model name. Parameters: - api_key (str): The API key for authentication. - - model (str): The model name to use for embeddings (default is "nvidia/nv-embedqa-e5-v5"). + - model (str): The model name to use for embeddings. + Default: nvidia/llama-3.2-nv-embedqa-1b-v2 (tested and working) """ - from langchain_nvidia import NVIDIAEmbeddings - + self.api_key = api_key + self.model = model + + logger.info(f"Initializing NVIDIA embeddings with model: {model}") + logger.debug(f"API key length: {len(api_key) if api_key else 0}") + self.embeddings = NVIDIAEmbeddings( - api_key=api_key, - model_name=model, - input_type="query", - truncate="NONE" + api_key=api_key, model_name=model, input_type="query", truncate="NONE" ) + logger.info("Successfully initialized NVIDIA embeddings") def __call__(self, input): """ Call method to make the object callable, as required by chroma's EmbeddingFunction interface. + NOTE: This method is used by ChromaDB for batch embedding operations. + The embed_query() method above handles the single query case with the critical fixes. + Parameters: - input (str or list): The input data for which embeddings need to be generated. Returns: - embedding (list): The embedding vector(s) for the input data. 
""" - # Ensure input is a list, as required by the API - input_data = [input] if isinstance(input, str) else input - - # Generate embeddings + logger.debug(f"__call__ method called with input type: {type(input)}") + logger.debug(f"__call__ input: {input}") + + # Ensure input is a list, as required by ChromaDB + if isinstance(input, str): + input_data = [input] + else: + input_data = input + + logger.debug(f"Processing {len(input_data)} texts for embedding") + + # Generate embeddings for each text embeddings = [] - for text in input_data: + for i, text in enumerate(input_data): + logger.debug(f"Embedding text {i+1}/{len(input_data)}: {text[:50]}...") embedding = self.embeddings.embed_query(text) embeddings.append(embedding) - - return embeddings[0] if len(embeddings) == 1 and isinstance(input, str) else embeddings - + + logger.debug(f"Generated {len(embeddings)} embeddings") + # Always return a list of embeddings for ChromaDB + return embeddings + def name(self): """ Returns a custom name for the embedding function. @@ -132,192 +634,78 @@ def name(self): str: The name of the embedding function. """ return "NVIDIA Embedding Function" - -def initVannaBackup(vn): - """ - Backup initialization function for Vanna with hardcoded NASA Turbofan Engine training data. - - This function provides the original hardcoded training approach for NASA Turbofan Engine - predictive maintenance queries. Use this as a fallback if the JSON-based training fails. - - Args: - vn: Vanna instance to be trained and configured - - Returns: - None: Modifies the Vanna instance in-place - - Example: - >>> from vanna.chromadb import ChromaDB_VectorStore - >>> vn = NIMCustomLLM(config) & ChromaDB_VectorStore() - >>> vn.connect_to_sqlite("path/to/nasa_turbo.db") - >>> initVannaBackup(vn) - >>> # Vanna is now ready with hardcoded NASA Turbofan training - """ - import json - import os - - # Get and train DDL from sqlite_master - df_ddl = vn.run_sql("SELECT type, sql FROM sqlite_master WHERE sql is not null") - for ddl in df_ddl['sql'].to_list(): - vn.train(ddl=ddl) - - # Fallback to default NASA Turbofan training - fd_datasets = ["FD001", "FD002", "FD003", "FD004"] - for fd in fd_datasets: - vn.train(ddl=f""" - CREATE TABLE IF NOT EXISTS RUL_{fd} ( - "unit_number" INTEGER, - "RUL" INTEGER + + def embed_query(self, input: str) -> list[list[float]]: + """ + Generate embeddings for a single query. + + ChromaDB calls this method with ['query text'] (list) but langchain_nvidia expects 'query text' (string). + We must extract the string from the list to prevent API 500 errors. + + ChromaDB expects this method to return [[embedding_vector]] (list of embeddings) + but langchain returns [embedding_vector] (single embedding). We wrap it in a list. 
+ """ + logger.debug(f"Embedding query: {input}") + logger.debug(f"Input type: {type(input)}") + logger.debug(f"Using model: {self.model}") + + # Handle ChromaDB's list input format + # ChromaDB sometimes passes a list instead of a string + # Extract the string from the list if needed + if isinstance(input, list): + if len(input) == 1: + query_text = input[0] + logger.debug(f"Extracted string from list: {query_text}") + else: + logger.error(f"Unexpected list length: {len(input)}") + raise ValueError( + f"Expected single string or list with one element, got list with {len(input)} elements" + ) + else: + query_text = input + + try: + # Call langchain_nvidia with the extracted string + embeddings = self.embeddings.embed_query(query_text) + logger.debug( + f"Successfully generated embeddings of length: {len(embeddings) if embeddings else 0}" ) - """) - - sensor_columns = """ - "unit_number" INTEGER, - "time_in_cycles" INTEGER, - "operational_setting_1" REAL, - "operational_setting_2" REAL, - "operational_setting_3" REAL, - "sensor_measurement_1" REAL, - "sensor_measurement_2" REAL, - "sensor_measurement_3" REAL, - "sensor_measurement_4" REAL, - "sensor_measurement_5" REAL, - "sensor_measurement_6" REAL, - "sensor_measurement_7" REAL, - "sensor_measurement_8" REAL, - "sensor_measurement_9" REAL, - "sensor_measurement_10" REAL, - "sensor_measurement_11" REAL, - "sensor_measurement_12" REAL, - "sensor_measurement_13" REAL, - "sensor_measurement_14" REAL, - "sensor_measurement_15" REAL, - "sensor_measurement_16" REAL, - "sensor_measurement_17" INTEGER, - "sensor_measurement_18" INTEGER, - "sensor_measurement_19" REAL, - "sensor_measurement_20" REAL, - "sensor_measurement_21" REAL - """ - for fd in fd_datasets: - vn.train(ddl=f"CREATE TABLE IF NOT EXISTS train_{fd} ({sensor_columns})") - vn.train(ddl=f"CREATE TABLE IF NOT EXISTS test_{fd} ({sensor_columns})") - - # Default documentation for NASA Turbofan - dataset_documentation = """ - This SQL database contains train and test splits of four different datasets: FD001, FD002, FD003, FD004. - Each dataset consists of multiple multivariate time series from different engines of the same type. 
- - DATABASE STRUCTURE: - The data is organized into separate tables for each dataset: - - Training Tables: train_FD001, train_FD002, train_FD003, train_FD004 - Test Tables: test_FD001, test_FD002, test_FD003, test_FD004 - RUL Tables: RUL_FD001, RUL_FD002, RUL_FD003, RUL_FD004 - - Each training and test table contains 26 columns with identical structure: - - unit_number: INTEGER - Identifier for each engine unit - - time_in_cycles: INTEGER - Time step in operational cycles - - operational_setting_1: REAL - First operational setting affecting performance - - operational_setting_2: REAL - Second operational setting affecting performance - - operational_setting_3: REAL - Third operational setting affecting performance - - sensor_measurement_1 through sensor_measurement_21: REAL/INTEGER - Twenty-one sensor measurements - - Each RUL table contains 2 columns: - - unit_number: INTEGER - Engine unit identifier - - RUL: INTEGER - Remaining Useful Life value for that test unit - - QUERY PATTERNS: - - Table References: - - "train_FD001" or "dataset train_FD001" → Use table train_FD001 - - "test_FD002" or "dataset test_FD002" → Use table test_FD002 - - "FD003" (without train/test prefix) → Determine from context whether to use train_FD003 or test_FD003 - - For RUL queries: Use specific RUL table (RUL_FD001, RUL_FD002, RUL_FD003, or RUL_FD004) - - Counting Patterns: - - "How many units" → Use COUNT(DISTINCT unit_number) to count unique engines - - "How many records/data points/measurements/entries/rows" → Use COUNT(*) to count all records - - RUL Handling (CRITICAL DISTINCTION): - - 1. GROUND TRUTH RUL (for test data): - - Use when query asks for "actual RUL", "true RUL", "ground truth", or "what is the RUL" - - Query specific RUL table: SELECT RUL FROM RUL_FD001 WHERE unit_number=N - - For time-series with ground truth: ((SELECT MAX(time_in_cycles) FROM test_FDxxx WHERE unit_number=N) + (SELECT RUL FROM RUL_FDxxx WHERE unit_number=N) - time_in_cycles) - - 2. PREDICTED/CALCULATED RUL (for training data or prediction requests): - - Use when query asks to "predict RUL", "calculate RUL", "estimate RUL", or "find RUL" for training data - - For training data: Calculate as remaining cycles until failure = (MAX(time_in_cycles) - current_time_in_cycles + 1) - - Training RUL query: SELECT unit_number, time_in_cycles, (MAX(time_in_cycles) OVER (PARTITION BY unit_number) - time_in_cycles + 1) AS predicted_RUL FROM train_FDxxx - - DEFAULT BEHAVIOR: If unclear, assume user wants PREDICTION (since this is more common) - - Column Names (consistent across all training and test tables): - - unit_number: Engine identifier - - time_in_cycles: Time step - - operational_setting_1, operational_setting_2, operational_setting_3: Operational settings - - sensor_measurement_1, sensor_measurement_2, ..., sensor_measurement_21: Sensor readings - - IMPORTANT NOTES: - - Each dataset (FD001, FD002, FD003, FD004) has its own separate RUL table - - RUL tables do NOT have a 'dataset' column - they are dataset-specific by table name - - Training tables contain data until engine failure - - Test tables contain data that stops before failure - - RUL tables provide the actual remaining cycles for test units - - ENGINE OPERATION CONTEXT: - Each engine starts with different degrees of initial wear and manufacturing variation. - The engine operates normally at the start of each time series and develops a fault at some point during the series. - In the training set, the fault grows in magnitude until system failure. 
- In the test set, the time series ends some time prior to system failure. - The objective is to predict the number of remaining operational cycles before failure in the test set. - """ - vn.train(documentation=dataset_documentation) + # Wrap single embedding in list for ChromaDB compatibility + # ChromaDB expects a list of embeddings, even for a single query + return [embeddings] + except Exception as e: + logger.error(f"Error generating embeddings for query: {e}") + logger.error(f"Error type: {type(e)}") + logger.error(f"Query text: {query_text}") + import traceback - # Default training for NASA Turbofan - queries = [ - # 1. JOIN pattern between training and RUL tables - "SELECT t.unit_number, t.time_in_cycles, t.operational_setting_1, r.RUL FROM train_FD001 AS t JOIN RUL_FD001 AS r ON t.unit_number = r.unit_number WHERE t.unit_number = 1 ORDER BY t.time_in_cycles", - - # 2. Aggregation with multiple statistical functions - "SELECT unit_number, AVG(sensor_measurement_1) AS avg_sensor1, MAX(sensor_measurement_2) AS max_sensor2, MIN(sensor_measurement_3) AS min_sensor3 FROM train_FD002 GROUP BY unit_number", - - # 3. Test table filtering with time-based conditions - "SELECT * FROM test_FD003 WHERE time_in_cycles > 50 AND sensor_measurement_1 > 500 ORDER BY unit_number, time_in_cycles", - - # 4. Window function for predicted RUL calculation on training data - "SELECT unit_number, time_in_cycles, (MAX(time_in_cycles) OVER (PARTITION BY unit_number) - time_in_cycles + 1) AS predicted_RUL FROM train_FD004 WHERE unit_number <= 3 ORDER BY unit_number, time_in_cycles", - - # 5. Direct RUL table query with filtering - "SELECT unit_number, RUL FROM RUL_FD001 WHERE RUL > 100 ORDER BY RUL DESC" - ] + logger.error(f"Full traceback: {traceback.format_exc()}") + raise - for query in tqdm(queries, desc="Training NIMVanna"): - vn.train(sql=query) + def embed_documents(self, input: list[str]) -> list[list[float]]: + """ + Generate embeddings for multiple documents. + + This function expects a list of strings. If it's a list of lists of strings, flatten it to handle cases + where the input is unexpectedly nested. 
+ """ + logger.debug(f"Embedding {len(input)} documents...") + logger.debug(f"Using model: {self.model}") + + try: + embeddings = self.embeddings.embed_documents(input) + logger.debug("Successfully generated document embeddings") + return embeddings + except Exception as e: + logger.error(f"Error generating document embeddings: {e}") + logger.error(f"Error type: {type(e)}") + logger.error(f"Input documents count: {len(input)}") + import traceback + + logger.error(f"Full traceback: {traceback.format_exc()}") + raise - # Essential question-SQL training pairs (covering key RUL distinction) - vn.train(question="Get time cycles and operational setting 1 for unit 1 from test FD001", - sql="SELECT time_in_cycles, operational_setting_1 FROM test_FD001 WHERE unit_number = 1") - - # Ground Truth RUL (from RUL tables) - vn.train(question="What is the actual remaining useful life for unit 1 in test dataset FD001", - sql="SELECT RUL FROM RUL_FD001 WHERE unit_number = 1") - - # Predicted RUL (calculated for training data) - vn.train(question="Predict the remaining useful life for each time cycle of unit 1 in training dataset FD001", - sql="SELECT unit_number, time_in_cycles, (MAX(time_in_cycles) OVER (PARTITION BY unit_number) - time_in_cycles + 1) AS predicted_RUL FROM train_FD001 WHERE unit_number = 1 ORDER BY time_in_cycles") - - vn.train(question="How many units are in the training data for FD002", - sql="SELECT COUNT(DISTINCT unit_number) FROM train_FD002") - - # Additional RUL distinction training - vn.train(question="Calculate RUL for training data in FD003", - sql="SELECT unit_number, time_in_cycles, (MAX(time_in_cycles) OVER (PARTITION BY unit_number) - time_in_cycles + 1) AS predicted_RUL FROM train_FD003 ORDER BY unit_number, time_in_cycles") - - vn.train(question="Get ground truth RUL values for all units in test FD002", - sql="SELECT unit_number, RUL FROM RUL_FD002 ORDER BY unit_number") def chunk_documentation(text: str, max_chars: int = 1500) -> list: """ diff --git a/industries/asset_lifecycle_management_agent/utils_template/__init__.py b/industries/asset_lifecycle_management_agent/utils_template/__init__.py new file mode 100644 index 000000000..d5528a384 --- /dev/null +++ b/industries/asset_lifecycle_management_agent/utils_template/__init__.py @@ -0,0 +1,11 @@ +""" +Workspace utilities for Asset Lifecycle Management tasks. + +These pre-built utility functions provide reliable, tested implementations +for common data processing tasks, particularly for predictive maintenance workflows. +""" + +from .rul_utils import apply_piecewise_rul_transformation, show_utilities + +__all__ = ['apply_piecewise_rul_transformation', 'show_utilities'] + diff --git a/industries/asset_lifecycle_management_agent/utils_template/rul_utils.py b/industries/asset_lifecycle_management_agent/utils_template/rul_utils.py new file mode 100644 index 000000000..afebd9f0c --- /dev/null +++ b/industries/asset_lifecycle_management_agent/utils_template/rul_utils.py @@ -0,0 +1,146 @@ +""" +RUL (Remaining Useful Life) transformation utilities. + +Provides pre-built functions for transforming RUL data to create realistic patterns +for Asset Lifecycle Management and predictive maintenance tasks. +""" + +import pandas as pd +import logging + +logger = logging.getLogger(__name__) + + +def apply_piecewise_rul_transformation( + df: pd.DataFrame, + maxlife: int = 100, + time_col: str = 'time_in_cycles', + rul_col: str = 'RUL' +) -> pd.DataFrame: + """ + Transform RUL data to create realistic "knee" patterns. 
+
+    This function applies a piecewise transformation to RUL (Remaining Useful Life) values
+    to create a more realistic degradation pattern commonly seen in predictive maintenance:
+    - RUL stays constant at MAXLIFE until the remaining cycles drop below the threshold
+    - Then RUL decreases linearly to 0 as the equipment approaches failure
+
+    This creates the characteristic "knee" pattern seen in actual equipment degradation.
+
+    Args:
+        df: pandas DataFrame with time series data containing RUL values
+        maxlife: Maximum life threshold for the piecewise function (default: 100)
+            RUL values above this will be capped at maxlife
+        time_col: Name of the time/cycle column (default: 'time_in_cycles')
+        rul_col: Name of the RUL column to transform (default: 'RUL')
+
+    Returns:
+        pandas DataFrame with original data plus new 'transformed_RUL' column
+
+    Raises:
+        ValueError: If required columns are missing from the DataFrame
+
+    Example:
+        >>> df = pd.DataFrame({'time_in_cycles': [1, 2, 3], 'RUL': [150, 100, 50]})
+        >>> df_transformed = apply_piecewise_rul_transformation(df, maxlife=100)
+        >>> print(df_transformed['transformed_RUL'])
+        0    100
+        1    100
+        2     50
+        Name: transformed_RUL, dtype: int64
+    """
+    # Validate inputs
+    if not isinstance(df, pd.DataFrame):
+        raise ValueError(f"Expected pandas DataFrame, got {type(df)}")
+
+    if rul_col not in df.columns:
+        raise ValueError(
+            f"RUL column '{rul_col}' not found in DataFrame. "
+            f"Available columns: {list(df.columns)}"
+        )
+
+    if time_col not in df.columns:
+        logger.warning(
+            f"Time column '{time_col}' not found in DataFrame, but continuing anyway. "
+            f"Available columns: {list(df.columns)}"
+        )
+
+    # Create a copy to avoid modifying the original
+    df_copy = df.copy()
+
+    logger.info(f"Applying piecewise RUL transformation with maxlife={maxlife}")
+    logger.debug(f"Input RUL range: [{df_copy[rul_col].min()}, {df_copy[rul_col].max()}]")
+
+    # Apply piecewise transformation
+    def transform_rul(rul_value):
+        """Apply the piecewise transformation to a single RUL value."""
+        if pd.isna(rul_value):
+            return rul_value  # Keep NaN values as NaN
+        if rul_value > maxlife:
+            return maxlife
+        return rul_value
+
+    # Apply transformation to create new column
+    df_copy['transformed_RUL'] = df_copy[rul_col].apply(transform_rul)
+
+    logger.info(
+        f"✅ Transformation complete! Added 'transformed_RUL' column. "
+        f"Output range: [{df_copy['transformed_RUL'].min()}, {df_copy['transformed_RUL'].max()}]"
+    )
+    logger.debug(f"Total rows processed: {len(df_copy)}")
+
+    return df_copy
+
+
+def show_utilities():
+    """
+    Display available utility functions and their usage.
+
+    Prints a formatted list of all available utilities in this workspace,
+    including descriptions and example usage.
+    """
+    utilities_info = """
+    ================================================================================
+    WORKSPACE UTILITIES - Asset Lifecycle Management
+    ================================================================================
+
+    Available utility functions:
+
+    1. apply_piecewise_rul_transformation(df, maxlife=100, time_col='time_in_cycles', rul_col='RUL')
+
+       Description:
+       Transforms RUL (Remaining Useful Life) data to create realistic "knee" patterns
+       commonly seen in predictive maintenance scenarios.
+
+       Parameters:
+       - df: pandas DataFrame with time series data
+       - maxlife: Maximum life threshold (default: 100)
+       - time_col: Name of time/cycle column (default: 'time_in_cycles')
+       - rul_col: Name of RUL column to transform (default: 'RUL')
+
+       Returns:
+       DataFrame with original data plus new 'transformed_RUL' column
+
+       Example:
+       df_transformed = utils.apply_piecewise_rul_transformation(df, maxlife=100)
+       print(df_transformed[['time_in_cycles', 'RUL', 'transformed_RUL']])
+
+    2. show_utilities()
+
+       Description:
+       Displays this help message with all available utilities.
+
+       Example:
+       utils.show_utilities()
+
+    ================================================================================
+    """
+    print(utilities_info)
+
+
+if __name__ == "__main__":
+    # Simple test
+    print("RUL Utilities Module")
+    print("=" * 50)
+    show_utilities()
+
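+    # Minimal usage sketch for apply_piecewise_rul_transformation. The tiny
+    # DataFrame below is made-up illustration data (not NASA Turbofan records);
+    # only pandas, imported above, is assumed.
+    demo_df = pd.DataFrame({
+        'time_in_cycles': [1, 2, 3, 4],
+        'RUL': [180, 120, 80, 10],
+    })
+    demo_out = apply_piecewise_rul_transformation(demo_df, maxlife=100)
+    # RUL values above maxlife (100) are capped; the rest pass through unchanged,
+    # producing the "knee" shape: [100, 100, 80, 10].
+    print(demo_out[['time_in_cycles', 'RUL', 'transformed_RUL']])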