From 20db964fca51d8e170cf4009433e09fd9e3b3397 Mon Sep 17 00:00:00 2001 From: Anup Kalburgi Date: Tue, 23 Sep 2025 12:16:41 -0400 Subject: [PATCH 1/5] working test wireup --- dbldatagen/spec/compat.py | 18 ++++++++++ dbldatagen/spec/generator.py | 69 ++++++++++++++++++++++++++++++++++++ makefile | 4 +-- pyproject.toml | 29 ++++++++++----- tests/test_spec.py | 7 ++++ 5 files changed, 117 insertions(+), 10 deletions(-) create mode 100644 dbldatagen/spec/compat.py create mode 100644 dbldatagen/spec/generator.py create mode 100644 tests/test_spec.py diff --git a/dbldatagen/spec/compat.py b/dbldatagen/spec/compat.py new file mode 100644 index 00000000..68649b36 --- /dev/null +++ b/dbldatagen/spec/compat.py @@ -0,0 +1,18 @@ +# This module acts as a compatibility layer for Pydantic V1 and V2. + +try: + # This will succeed on environments with Pydantic V2.x + # It imports the V1 API that is bundled within V2. + from pydantic.v1 import BaseModel, Field, validator, constr + +except ImportError: + # This will be executed on environments with only Pydantic V1.x + from pydantic import BaseModel, Field, validator, constr + +# In your application code, do this: +# from .compat import BaseModel +# NOT this: +# from pydantic import BaseModel + +# FastAPI Notes +# https://github.com/fastapi/fastapi/blob/master/fastapi/_compat.py \ No newline at end of file diff --git a/dbldatagen/spec/generator.py b/dbldatagen/spec/generator.py new file mode 100644 index 00000000..6039165f --- /dev/null +++ b/dbldatagen/spec/generator.py @@ -0,0 +1,69 @@ +from .compat import BaseModel +from typing import Dict, Optional, Union, Any + + +# class ColumnDefinition(BaseModel): +# name: str +# type: Optional[DbldatagenBasicType] = None +# primary: bool = False +# options: Optional[Dict[str, Any]] = {} +# nullable: Optional[bool] = False +# omit: Optional[bool] = False +# baseColumn: Optional[str] = "id" +# baseColumnType: Optional[str] = "auto" + +# @model_validator(mode="after") +# def check_constraints(self): +# if self.primary: +# if "min" in self.options or "max" in self.options: +# raise ValueError( +# f"Primary column '{self.name}' cannot have min/max options.") +# if self.nullable: +# raise ValueError( +# f"Primary column '{self.name}' cannot be nullable.") +# if self.primary and self.type is None: +# raise ValueError( +# f"Primary column '{self.name}' must have a type defined.") +# return self + + +# class TableDefinition(BaseModel): +# number_of_rows: int +# partitions: Optional[int] = None +# columns: List[ColumnDefinition] + + +# class DatagenSpec(BaseModel): +# tables: Dict[str, TableDefinition] +# output_destination: Optional[Union[UCSchemaTarget, FilePathTarget]] = None +# generator_options: Optional[Dict[str, Any]] = {} +# intended_for_databricks: Optional[bool] = None + + + +# def display_all_tables(self): +# for table_name, table_def in self.tables.items(): +# print(f"Table: {table_name}") + +# if self.output_destination: +# output = f"{self.output_destination}" +# display(HTML(f"Output destination: {output}")) +# else: +# message = ( +# "Output destination: " +# "None
" +# "Set it using the output_destination " +# "attribute on your DatagenSpec object " +# "(e.g., my_spec.output_destination = UCSchemaTarget(...))." +# ) +# display(HTML(message)) + +# df = pd.DataFrame([col.dict() for col in table_def.columns]) +# try: +# display(df) +# except NameError: +# print(df.to_string()) + + +class DatagenSpec(BaseModel): + name: str \ No newline at end of file diff --git a/makefile b/makefile index 772397bf..a5f4486c 100644 --- a/makefile +++ b/makefile @@ -8,7 +8,7 @@ clean: .venv/bin/python: pip install hatch - hatch env create + hatch env create test-pydantic.pydantic==1.10.6-v1 dev: .venv/bin/python @hatch run which python @@ -20,7 +20,7 @@ fmt: hatch run fmt test: - hatch run test + hatch run test-pydantic:test test-coverage: make test && open htmlcov/index.html diff --git a/pyproject.toml b/pyproject.toml index 13728ba2..304562e2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -106,18 +106,31 @@ dependencies = [ ] python="3.10" - -# store virtual env as the child of this folder. Helps VSCode (and PyCharm) to run better path = ".venv" [tool.hatch.envs.default.scripts] test = "pytest tests/ -n 10 --cov --cov-report=html --timeout 600 --durations 20" -fmt = ["ruff check . --fix", - "mypy .", - "pylint --output-format=colorized -j 0 dbldatagen tests"] -verify = ["ruff check .", - "mypy .", - "pylint --output-format=colorized -j 0 dbldatagen tests"] +fmt = [ + "ruff check . --fix", + "mypy .", + "pylint --output-format=colorized -j 0 dbldatagen tests" +] +verify = [ + "ruff check .", + "mypy .", + "pylint --output-format=colorized -j 0 dbldatagen tests" +] + + +[tool.hatch.envs.test-pydantic] +template = "default" +matrix = [ + { pydantic_version = ["1.10.6", "2.8.2"] } +] +extra-dependencies = [ + "pydantic=={matrix:pydantic_version}" +] + # Ruff configuration - replaces flake8, isort, pydocstyle, etc. 
[tool.ruff] diff --git a/tests/test_spec.py b/tests/test_spec.py new file mode 100644 index 00000000..2e12cb27 --- /dev/null +++ b/tests/test_spec.py @@ -0,0 +1,7 @@ +from dbldatagen.spec.generator import DatagenSpec + +def test_spec(): + spec = DatagenSpec(name="test_spec") + assert spec.name == "test_spec" + + From 4e9cba5272586692337ea2f29574a428ff3e585d Mon Sep 17 00:00:00 2001 From: Anup Kalburgi Date: Tue, 4 Nov 2025 09:10:38 -0500 Subject: [PATCH 2/5] Initial code, spec and test, pushing for review --- dbldatagen/spec/compat.py | 17 +- dbldatagen/spec/generator.py | 69 ---- dbldatagen/spec/generator_spec.py | 324 +++++++++++++++++ dbldatagen/spec/generator_spec_impl.py | 254 ++++++++++++++ pydantic_compat.md | 101 ++++++ scratch.md | 4 + tests/test_spec.py | 7 - tests/test_specs.py | 466 +++++++++++++++++++++++++ 8 files changed, 1164 insertions(+), 78 deletions(-) delete mode 100644 dbldatagen/spec/generator.py create mode 100644 dbldatagen/spec/generator_spec.py create mode 100644 dbldatagen/spec/generator_spec_impl.py create mode 100644 pydantic_compat.md create mode 100644 scratch.md delete mode 100644 tests/test_spec.py create mode 100644 tests/test_specs.py diff --git a/dbldatagen/spec/compat.py b/dbldatagen/spec/compat.py index 68649b36..7c30d57d 100644 --- a/dbldatagen/spec/compat.py +++ b/dbldatagen/spec/compat.py @@ -7,7 +7,7 @@ except ImportError: # This will be executed on environments with only Pydantic V1.x - from pydantic import BaseModel, Field, validator, constr + from pydantic import BaseModel, Field, validator, constr, root_validator, field_validator # In your application code, do this: # from .compat import BaseModel @@ -15,4 +15,17 @@ # from pydantic import BaseModel # FastAPI Notes -# https://github.com/fastapi/fastapi/blob/master/fastapi/_compat.py \ No newline at end of file +# https://github.com/fastapi/fastapi/blob/master/fastapi/_compat.py + + +""" +## Why This Approach +No Installation Required: It directly addresses your core requirement. +You don't need to %pip install anything, which avoids conflicts with the pre-installed libraries on Databricks. +Single Codebase: You maintain one set of code that is guaranteed to work with the Pydantic V1 API, which is available in both runtimes. + +Environment Agnostic: Your application code in models.py has no idea which version of Pydantic is actually installed. The compat.py module handles that complexity completely. + +Future-Ready: When you eventually decide to migrate fully to the Pydantic V2 API (to take advantage of its speed and features), +you only need to change your application code and your compat.py import statements, making the transition much clearer. 
+""" \ No newline at end of file diff --git a/dbldatagen/spec/generator.py b/dbldatagen/spec/generator.py deleted file mode 100644 index 6039165f..00000000 --- a/dbldatagen/spec/generator.py +++ /dev/null @@ -1,69 +0,0 @@ -from .compat import BaseModel -from typing import Dict, Optional, Union, Any - - -# class ColumnDefinition(BaseModel): -# name: str -# type: Optional[DbldatagenBasicType] = None -# primary: bool = False -# options: Optional[Dict[str, Any]] = {} -# nullable: Optional[bool] = False -# omit: Optional[bool] = False -# baseColumn: Optional[str] = "id" -# baseColumnType: Optional[str] = "auto" - -# @model_validator(mode="after") -# def check_constraints(self): -# if self.primary: -# if "min" in self.options or "max" in self.options: -# raise ValueError( -# f"Primary column '{self.name}' cannot have min/max options.") -# if self.nullable: -# raise ValueError( -# f"Primary column '{self.name}' cannot be nullable.") -# if self.primary and self.type is None: -# raise ValueError( -# f"Primary column '{self.name}' must have a type defined.") -# return self - - -# class TableDefinition(BaseModel): -# number_of_rows: int -# partitions: Optional[int] = None -# columns: List[ColumnDefinition] - - -# class DatagenSpec(BaseModel): -# tables: Dict[str, TableDefinition] -# output_destination: Optional[Union[UCSchemaTarget, FilePathTarget]] = None -# generator_options: Optional[Dict[str, Any]] = {} -# intended_for_databricks: Optional[bool] = None - - - -# def display_all_tables(self): -# for table_name, table_def in self.tables.items(): -# print(f"Table: {table_name}") - -# if self.output_destination: -# output = f"{self.output_destination}" -# display(HTML(f"Output destination: {output}")) -# else: -# message = ( -# "Output destination: " -# "None
" -# "Set it using the output_destination " -# "attribute on your DatagenSpec object " -# "(e.g., my_spec.output_destination = UCSchemaTarget(...))." -# ) -# display(HTML(message)) - -# df = pd.DataFrame([col.dict() for col in table_def.columns]) -# try: -# display(df) -# except NameError: -# print(df.to_string()) - - -class DatagenSpec(BaseModel): - name: str \ No newline at end of file diff --git a/dbldatagen/spec/generator_spec.py b/dbldatagen/spec/generator_spec.py new file mode 100644 index 00000000..23afc4a0 --- /dev/null +++ b/dbldatagen/spec/generator_spec.py @@ -0,0 +1,324 @@ +from .compat import BaseModel, validator, root_validator, field_validator +from typing import Dict, Optional, Union, Any, Literal, List +import pandas as pd +from IPython.display import display, HTML + +DbldatagenBasicType = Literal[ + "string", + "int", + "long", + "float", + "double", + "decimal", + "boolean", + "date", + "timestamp", + "short", + "byte", + "binary", + "integer", + "bigint", + "tinyint", +] + +class ColumnDefinition(BaseModel): + name: str + type: Optional[DbldatagenBasicType] = None + primary: bool = False + options: Optional[Dict[str, Any]] = {} + nullable: Optional[bool] = False + omit: Optional[bool] = False + baseColumn: Optional[str] = "id" + baseColumnType: Optional[str] = "auto" + + @root_validator(skip_on_failure=True) + def check_model_constraints(cls, values: Dict[str, Any]) -> Dict[str, Any]: + """ + Validates constraints across the entire model after individual fields are processed. + """ + is_primary = values.get("primary") + options = values.get("options", {}) + name = values.get("name") + is_nullable = values.get("nullable") + column_type = values.get("type") + + if is_primary: + if "min" in options or "max" in options: + raise ValueError(f"Primary column '{name}' cannot have min/max options.") + + if is_nullable: + raise ValueError(f"Primary column '{name}' cannot be nullable.") + + if column_type is None: + raise ValueError(f"Primary column '{name}' must have a type defined.") + return values + + +class UCSchemaTarget(BaseModel): + catalog: str + schema_: str + output_format: str = "delta" # Default to delta for UC Schema + + @field_validator("catalog", "schema_", mode="after") + def validate_identifiers(cls, v): # noqa: N805, pylint: disable=no-self-argument + if not v.strip(): + raise ValueError("Identifier must be non-empty.") + if not v.isidentifier(): + logger.warning( + f"'{v}' is not a basic Python identifier. 
Ensure validity for Unity Catalog.") + return v.strip() + + def __str__(self): + return f"{self.catalog}.{self.schema_} (Format: {self.output_format}, Type: UC Table)" + + +class FilePathTarget(BaseModel): + base_path: str + output_format: Literal["csv", "parquet"] # No default, must be specified + + @field_validator("base_path", mode="after") + def validate_base_path(cls, v): # noqa: N805, pylint: disable=no-self-argument + if not v.strip(): + raise ValueError("base_path must be non-empty.") + return v.strip() + + def __str__(self): + return f"{self.base_path} (Format: {self.output_format}, Type: File Path)" + + +class TableDefinition(BaseModel): + number_of_rows: int + partitions: Optional[int] = None + columns: List[ColumnDefinition] + + +class ValidationResult: + """Container for validation results with errors and warnings.""" + + def __init__(self) -> None: + self.errors: List[str] = [] + self.warnings: List[str] = [] + + def add_error(self, message: str) -> None: + """Add an error message.""" + self.errors.append(message) + + def add_warning(self, message: str) -> None: + """Add a warning message.""" + self.warnings.append(message) + + def is_valid(self) -> bool: + """Returns True if there are no errors.""" + return len(self.errors) == 0 + + def __str__(self) -> str: + """String representation of validation results.""" + lines = [] + if self.is_valid(): + lines.append("✓ Validation passed successfully") + else: + lines.append("✗ Validation failed") + + if self.errors: + lines.append(f"\nErrors ({len(self.errors)}):") + for i, error in enumerate(self.errors, 1): + lines.append(f" {i}. {error}") + + if self.warnings: + lines.append(f"\nWarnings ({len(self.warnings)}):") + for i, warning in enumerate(self.warnings, 1): + lines.append(f" {i}. {warning}") + + return "\n".join(lines) + +class DatagenSpec(BaseModel): + tables: Dict[str, TableDefinition] + output_destination: Optional[Union[UCSchemaTarget, FilePathTarget]] = None # there is a abstraction, may be we can use that? talk to Greg + generator_options: Optional[Dict[str, Any]] = {} + intended_for_databricks: Optional[bool] = None # May be infered. + + def _check_circular_dependencies( + self, + table_name: str, + columns: List[ColumnDefinition] + ) -> List[str]: + """ + Check for circular dependencies in baseColumn references. + Returns a list of error messages if circular dependencies are found. + """ + errors = [] + column_map = {col.name: col for col in columns} + + for col in columns: + if col.baseColumn and col.baseColumn != "id": + # Track the dependency chain + visited = set() + current = col.name + + while current: + if current in visited: + # Found a cycle + cycle_path = " -> ".join(list(visited) + [current]) + errors.append( + f"Table '{table_name}': Circular dependency detected in column '{col.name}': {cycle_path}" + ) + break + + visited.add(current) + current_col = column_map.get(current) + + if not current_col: + break + + # Move to the next column in the chain + if current_col.baseColumn and current_col.baseColumn != "id": + if current_col.baseColumn not in column_map: + # baseColumn doesn't exist - we'll catch this in another validation + break + current = current_col.baseColumn + else: + # Reached a column that doesn't have a baseColumn or uses "id" + break + + return errors + + def validate(self, strict: bool = True) -> ValidationResult: + """ + Validates the entire DatagenSpec configuration. + Always runs all validation checks and collects all errors and warnings. 
+ + Args: + strict: If True, raises ValueError if any errors or warnings are found. + If False, only raises ValueError if errors (not warnings) are found. + + Returns: + ValidationResult object containing all errors and warnings found. + + Raises: + ValueError: If validation fails based on strict mode setting. + The exception message contains all errors and warnings. + """ + result = ValidationResult() + + # 1. Check that there's at least one table + if not self.tables: + result.add_error("Spec must contain at least one table definition") + + # 2. Validate each table (continue checking all tables even if errors found) + for table_name, table_def in self.tables.items(): + # Check table has at least one column + if not table_def.columns: + result.add_error(f"Table '{table_name}' must have at least one column") + continue # Skip further checks for this table since it has no columns + + # Check row count is positive + if table_def.number_of_rows <= 0: + result.add_error( + f"Table '{table_name}' has invalid number_of_rows: {table_def.number_of_rows}. " + "Must be a positive integer." + ) + + # Check partitions if specified + #TODO: though this can be a model field check, we are checking here so that one can correct + # Can we find a way to use the default way? + if table_def.partitions is not None and table_def.partitions <= 0: + result.add_error( + f"Table '{table_name}' has invalid partitions: {table_def.partitions}. " + "Must be a positive integer or None." + ) + + # Check for duplicate column names + # TODO: Not something possible if we right model, recheck + column_names = [col.name for col in table_def.columns] + duplicates = [name for name in set(column_names) if column_names.count(name) > 1] + if duplicates: + result.add_error( + f"Table '{table_name}' has duplicate column names: {', '.join(duplicates)}" + ) + + # Build column map for reference checking + column_map = {col.name: col for col in table_def.columns} + + # TODO: Check baseColumn references, this is tricky? check the dbldefaults + for col in table_def.columns: + if col.baseColumn and col.baseColumn != "id": + if col.baseColumn not in column_map: + result.add_error( + f"Table '{table_name}', column '{col.name}': " + f"baseColumn '{col.baseColumn}' does not exist in the table" + ) + + # Check for circular dependencies in baseColumn references + circular_errors = self._check_circular_dependencies(table_name, table_def.columns) + for error in circular_errors: + result.add_error(error) + + # Check primary key constraints + primary_columns = [col for col in table_def.columns if col.primary] + if len(primary_columns) > 1: + primary_names = [col.name for col in primary_columns] + result.add_warning( + f"Table '{table_name}' has multiple primary columns: {', '.join(primary_names)}. " + "This may not be the intended behavior." + ) + + # Check for columns with no type and not using baseColumn properly + for col in table_def.columns: + if not col.primary and not col.type and not col.options: + result.add_warning( + f"Table '{table_name}', column '{col.name}': " + "No type specified and no options provided. " + "Column may not generate data as expected." + ) + + # 3. Check output destination + if not self.output_destination: + result.add_warning( + "No output_destination specified. Data will be generated but not persisted. " + "Set output_destination to save generated data." + ) + + # 4. 
Validate generator options (if any known options) + if self.generator_options: + known_options = [ + "random", "randomSeed", "randomSeedMethod", "verbose", + "debug", "seedColumnName" + ] + for key in self.generator_options.keys(): + if key not in known_options: + result.add_warning( + f"Unknown generator option: '{key}'. " + "This may be ignored during generation." + ) + + # Now that all validations are complete, decide whether to raise + if strict and (result.errors or result.warnings): + raise ValueError(str(result)) + elif not strict and result.errors: + raise ValueError(str(result)) + + return result + + + def display_all_tables(self) -> None: + for table_name, table_def in self.tables.items(): + print(f"Table: {table_name}") + + if self.output_destination: + output = f"{self.output_destination}" + display(HTML(f"Output destination: {output}")) + else: + message = ( + "Output destination: " + "None
" + "Set it using the output_destination " + "attribute on your DatagenSpec object " + "(e.g., my_spec.output_destination = UCSchemaTarget(...))." + ) + display(HTML(message)) + + df = pd.DataFrame([col.dict() for col in table_def.columns]) + try: + display(df) + except NameError: + print(df.to_string()) diff --git a/dbldatagen/spec/generator_spec_impl.py b/dbldatagen/spec/generator_spec_impl.py new file mode 100644 index 00000000..a508b1a5 --- /dev/null +++ b/dbldatagen/spec/generator_spec_impl.py @@ -0,0 +1,254 @@ +import logging +from typing import Dict, Union +import posixpath + +from dbldatagen.spec.generator_spec import TableDefinition +from pyspark.sql import SparkSession +import dbldatagen as dg +from .generator_spec import DatagenSpec, UCSchemaTarget, FilePathTarget, ColumnDefinition + + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", +) +logger = logging.getLogger(__name__) + +INTERNAL_ID_COLUMN_NAME = "id" + + +class Generator: + """ + Main data generation orchestrator that handles configuration, preparation, and writing of data. + """ + + def __init__(self, spark: SparkSession, app_name: str = "DataGen_ClassBased") -> None: + """ + Initialize the Generator with a SparkSession. + Args: + spark: An existing SparkSession instance + app_name: Application name for logging purposes + Raises: + RuntimeError: If spark is None + """ + if not spark: + logger.error( + "SparkSession cannot be None during Generator initialization") + raise RuntimeError("SparkSession cannot be None") + self.spark = spark + self._created_spark_session = False + self.app_name = app_name + logger.info("Generator initialized with SparkSession") + + def _columnspec_to_datagen_columnspec(self, col_def: ColumnDefinition) -> Dict[str, str]: + """ + Convert a ColumnDefinition to dbldatagen column specification. + Args: + col_def: ColumnDefinition object containing column configuration + Returns: + Dictionary containing dbldatagen column specification + """ + col_name = col_def.name + col_type = col_def.type + kwargs = col_def.options.copy() if col_def.options is not None else {} + + if col_def.primary: + kwargs["colType"] = col_type + kwargs["baseColumn"] = INTERNAL_ID_COLUMN_NAME + + if col_type == "string": + kwargs["baseColumnType"] = "hash" + elif col_type not in ["int", "long", "integer", "bigint", "short"]: + kwargs["baseColumnType"] = "auto" + logger.warning( + f"Primary key '{col_name}' has non-standard type '{col_type}'") + + # Log conflicting options for primary keys + conflicting_opts_for_pk = [ + "distribution", "template", "dataRange", "random", "omit", + "min", "max", "uniqueValues", "values", "expr" + ] + + for opt_key in conflicting_opts_for_pk: + if opt_key in kwargs: + logger.warning( + f"Primary key '{col_name}': Option '{opt_key}' may be ignored") + + if col_def.omit is not None and col_def.omit: + kwargs["omit"] = True + else: + kwargs = col_def.options.copy() if col_def.options is not None else {} + + if col_type: + kwargs["colType"] = col_type + if col_def.baseColumn: + kwargs["baseColumn"] = col_def.baseColumn + if col_def.baseColumnType: + kwargs["baseColumnType"] = col_def.baseColumnType + if col_def.omit is not None: + kwargs["omit"] = col_def.omit + + return kwargs + + def _prepare_data_generators( + self, + config: DatagenSpec, + config_source_name: str = "PydanticConfig" + ) -> Dict[str, dg.DataGenerator]: + """ + Prepare DataGenerator specifications for each table based on the configuration. 
+ Args: + config: DatagenSpec Pydantic object containing table configurations + config_source_name: Name for the configuration source (for logging) + Returns: + Dictionary mapping table names to their configured dbldatagen.DataGenerator objects + Raises: + RuntimeError: If SparkSession is not available + ValueError: If any table preparation fails + Exception: If any unexpected error occurs during preparation + """ + logger.info( + f"Preparing data generators for {len(config.tables)} tables") + + if not self.spark: + logger.error( + "SparkSession is not available. Cannot prepare data generators") + raise RuntimeError( + "SparkSession is not available. Cannot prepare data generators") + + tables_config: Dict[str, TableDefinition] = config.tables + global_gen_options = config.generator_options if config.generator_options else {} + + prepared_generators: Dict[str, dg.DataGenerator] = {} + generation_order = list(tables_config.keys()) # This becomes impotant when we get into multitable + + for table_name in generation_order: + table_spec = tables_config[table_name] + logger.info(f"Preparing table: {table_name}") + + try: + # Create DataGenerator instance + data_gen = dg.DataGenerator( + sparkSession=self.spark, + name=f"{table_name}_spec_from_{config_source_name}", + rows=table_spec.number_of_rows, + partitions=table_spec.partitions, + **global_gen_options, + ) + + # Process each column + for col_def in table_spec.columns: + kwargs = self._columnspec_to_datagen_columnspec(col_def) + data_gen = data_gen.withColumn(colName=col_def.name, **kwargs) + # Has performance implications. + + prepared_generators[table_name] = data_gen + logger.info(f"Successfully prepared table: {table_name}") + + except Exception as e: + logger.error(f"Failed to prepare table '{table_name}': {e}") + raise RuntimeError( + f"Failed to prepare table '{table_name}': {e}") from e + + logger.info("All data generators prepared successfully") + return prepared_generators + + def write_prepared_data( + self, + prepared_generators: Dict[str, dg.DataGenerator], + output_destination: Union[UCSchemaTarget, FilePathTarget, None], + config_source_name: str = "PydanticConfig", + ) -> None: + """ + Write data from prepared generators to the specified output destination. 
+ + Args: + prepared_generators: Dictionary of prepared DataGenerator objects + output_destination: Target destination for data output + config_source_name: Name for the configuration source (for logging) + + Raises: + RuntimeError: If any table write fails + ValueError: If output destination is not properly configured + """ + logger.info("Starting data writing phase") + + if not prepared_generators: + logger.warning("No prepared data generators to write") + return + + for table_name, data_gen in prepared_generators.items(): + logger.info(f"Writing table: {table_name}") + + try: + df = data_gen.build() + requested_rows = data_gen.rowCount + actual_row_count = df.count() + logger.info( + f"Built DataFrame for '{table_name}': {actual_row_count} rows (requested: {requested_rows})") + + if actual_row_count == 0 and requested_rows > 0: + logger.warning(f"Table '{table_name}': Requested {requested_rows} rows but built 0") + + # Write data based on destination type + if isinstance(output_destination, FilePathTarget): + output_path = posixpath.join(output_destination.base_path, table_name) + df.write.format(output_destination.output_format).mode("overwrite").save(output_path) + logger.info(f"Wrote table '{table_name}' to file path: {output_path}") + + elif isinstance(output_destination, UCSchemaTarget): + output_table = f"{output_destination.catalog}.{output_destination.schema_}.{table_name}" + df.write.mode("overwrite").saveAsTable(output_table) + logger.info(f"Wrote table '{table_name}' to Unity Catalog: {output_table}") + else: + logger.warning("No output destination specified, skipping data write") + return + except Exception as e: + logger.error(f"Failed to write table '{table_name}': {e}") + raise RuntimeError(f"Failed to write table '{table_name}': {e}") from e + logger.info("All data writes completed successfully") + + def generate_and_write_data( + self, + config: DatagenSpec, + config_source_name: str = "PydanticConfig" + ) -> None: + """ + Combined method to prepare data generators and write data in one operation. + This method orchestrates the complete data generation workflow: + 1. Prepare data generators from configuration + 2. 
Write data to the specified destination + Args: + config: DatagenSpec Pydantic object containing table configurations + config_source_name: Name for the configuration source (for logging) + Raises: + RuntimeError: If SparkSession is not available or any step fails + ValueError: If critical errors occur during preparation or writing + """ + logger.info(f"Starting combined data generation and writing for {len(config.tables)} tables") + + try: + # Phase 1: Prepare data generators + prepared_generators_map = self._prepare_data_generators(config, config_source_name) + + if not prepared_generators_map and list(config.tables.keys()): + logger.warning( + "No data generators were successfully prepared, though tables were defined") + return + + # Phase 2: Write data + self.write_prepared_data( + prepared_generators_map, + config.output_destination, + config_source_name + ) + + logger.info( + "Combined data generation and writing completed successfully") + + except Exception as e: + logger.error( + f"Error during combined data generation and writing: {e}") + raise RuntimeError( + f"Error during combined data generation and writing: {e}") from e \ No newline at end of file diff --git a/pydantic_compat.md b/pydantic_compat.md new file mode 100644 index 00000000..abf26e60 --- /dev/null +++ b/pydantic_compat.md @@ -0,0 +1,101 @@ +To write code that works on both Pydantic V1 and V2 and ensures a smooth future migration, you should code against the V1 API but import it through a compatibility shim. This approach uses V1's syntax, which Pydantic V2 can understand via its built-in V1 compatibility layer. + +----- + +### \#\# The Golden Rule: Code to V1, Import via a Shim 💡 + +The core strategy is to **write all your models using Pydantic V1 syntax and features**. You then use a special utility file to handle the imports, which makes your application code completely agnostic to the installed Pydantic version. + +----- + +### \#\# 1. Implement a Compatibility Shim (`compat.py`) + +This is the most critical step. Create a file named `compat.py` in your project that intelligently imports Pydantic components. Your application will import everything from this file instead of directly from `pydantic`. + +```python +# compat.py +# This module acts as a compatibility layer for Pydantic V1 and V2. + +try: + # This will succeed on environments with Pydantic V2.x + # It imports the V1 API that is bundled within V2. + from pydantic.v1 import BaseModel, Field, validator, constr + +except ImportError: + # This will be executed on environments with only Pydantic V1.x + from pydantic import BaseModel, Field, validator, constr + +# In your application code, do this: +# from .compat import BaseModel +# NOT this: +# from pydantic import BaseModel +``` + +----- + +### \#\# 2. Stick to V1 Features and Syntax (Do's and Don'ts) + +By following these rules in your application code, you ensure the logic works on both versions. + +#### **✅ Models and Fields: DO** + + * Use standard `BaseModel` and `Field` for all your data structures. This is the most stable part of the API. + +#### **❌ Models and Fields: DON'T** + + * **Do not use `__root__` models**. This V1 feature was removed in V2 and the compatibility is not perfect. Instead, model the data explicitly, even if it feels redundant. + * **Bad (Avoid):** `class MyList(BaseModel): __root__: list[str]` + * **Good (Compatible):** `class MyList(BaseModel): items: list[str]` + +#### **✅ Configuration: DO** + + * Use the nested `class Config:` for model configuration. 
This is the V1 way and is fully supported by the V2 compatibility layer.
+  * **Example:**
+    ```python
+    from .compat import BaseModel
+
+    class User(BaseModel):
+        id: int
+        full_name: str
+
+        class Config:
+            orm_mode = True  # V2's compatibility layer translates this
+            allow_population_by_field_name = True
+    ```
+
+#### **❌ Configuration: DON'T**
+
+  * **Do not use the V2 `model_config` dictionary**. This is a V2-only feature.
+
+#### **✅ Validators and Data Types: DO**
+
+  * Use the standard V1 `@validator`. It's robust and works perfectly across both versions.
+  * Use V1 constrained types like `constr`, `conint`, `conlist`.
+  * **Example:**
+    ```python
+    from .compat import BaseModel, validator, constr
+
+    class Product(BaseModel):
+        name: constr(min_length=3)
+
+        @validator("name")
+        def name_must_be_alpha(cls, v):
+            if not v.isalpha():
+                raise ValueError("Name must be alphabetic")
+            return v
+    ```
+
+#### **❌ Validators and Data Types: DON'T**
+
+  * **Do not use V2 decorators** like `@field_validator`, `@model_validator`, or `@field_serializer`.
+  * **Do not use the V2 `Annotated` syntax** for validation (e.g., `Annotated[str, StringConstraints(min_length=2)]`).
+
+-----
+
+### \#\# 3. The Easy Migration Path
+
+When you're finally ready to leave V1 behind and upgrade your code to be V2-native, the process will be straightforward because your code is already consistent:
+
+1. **Change Imports**: Your first step will be a simple find-and-replace to change all `from .compat import ...` statements to `from pydantic import ...`.
+2. **Run a linter**: Tools like **Ruff** have built-in rules that can automatically refactor most of your V1 syntax (like `Config` classes and `@validator`s) to the new V2 syntax; see the sketch below.
+3. **Manual Refinements**: Address any complex patterns the automated tools couldn't handle, like replacing your `__root__` model alternatives.
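+
+For illustration, a rough, hand-written sketch of what the `Product` example from section 2 might look like after step 2 (the exact result depends on the linter rules you enable; this is not generated output):
+
+```python
+# V2-native version of the earlier Product example (illustrative sketch only).
+from typing import Annotated
+
+from pydantic import BaseModel, StringConstraints, field_validator
+
+
+class Product(BaseModel):
+    name: Annotated[str, StringConstraints(min_length=3)]
+
+    @field_validator("name")
+    @classmethod
+    def name_must_be_alpha(cls, v: str) -> str:
+        if not v.isalpha():
+            raise ValueError("Name must be alphabetic")
+        return v
+```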
\ No newline at end of file diff --git a/scratch.md b/scratch.md new file mode 100644 index 00000000..a3afa5c3 --- /dev/null +++ b/scratch.md @@ -0,0 +1,4 @@ +Pydantic Notes +https://docs.databricks.com/aws/en/release-notes/runtime/14.3lts - 1.10.6 +https://docs.databricks.com/aws/en/release-notes/runtime/15.4lts - 1.10.6 +https://docs.databricks.com/aws/en/release-notes/runtime/16.4lts - 2.8.2 (2.20.1 - core) \ No newline at end of file diff --git a/tests/test_spec.py b/tests/test_spec.py deleted file mode 100644 index 2e12cb27..00000000 --- a/tests/test_spec.py +++ /dev/null @@ -1,7 +0,0 @@ -from dbldatagen.spec.generator import DatagenSpec - -def test_spec(): - spec = DatagenSpec(name="test_spec") - assert spec.name == "test_spec" - - diff --git a/tests/test_specs.py b/tests/test_specs.py new file mode 100644 index 00000000..d3c8ab2c --- /dev/null +++ b/tests/test_specs.py @@ -0,0 +1,466 @@ +from dbldatagen.spec.generator_spec import DatagenSpec +import pytest +from dbldatagen.spec.generator_spec import ( + DatagenSpec, + TableDefinition, + ColumnDefinition, + UCSchemaTarget, + FilePathTarget, + ValidationResult +) + +class TestValidationResult: + """Tests for ValidationResult class""" + + def test_empty_result_is_valid(self): + result = ValidationResult() + assert result.is_valid() + assert len(result.errors) == 0 + assert len(result.warnings) == 0 + + def test_result_with_errors_is_invalid(self): + result = ValidationResult() + result.add_error("Test error") + assert not result.is_valid() + assert len(result.errors) == 1 + + def test_result_with_only_warnings_is_valid(self): + result = ValidationResult() + result.add_warning("Test warning") + assert result.is_valid() + assert len(result.warnings) == 1 + + def test_result_string_representation(self): + result = ValidationResult() + result.add_error("Error 1") + result.add_error("Error 2") + result.add_warning("Warning 1") + + result_str = str(result) + assert "✗ Validation failed" in result_str + assert "Errors (2)" in result_str + assert "Error 1" in result_str + assert "Error 2" in result_str + assert "Warnings (1)" in result_str + assert "Warning 1" in result_str + + def test_valid_result_string_representation(self): + result = ValidationResult() + result_str = str(result) + assert "✓ Validation passed successfully" in result_str + + +class TestColumnDefinitionValidation: + """Tests for ColumnDefinition validation""" + + def test_valid_primary_column(self): + col = ColumnDefinition( + name="id", + type="int", + primary=True + ) + assert col.primary + assert col.type == "int" + + def test_primary_column_with_min_max_raises_error(self): + with pytest.raises(ValueError, match="cannot have min/max options"): + ColumnDefinition( + name="id", + type="int", + primary=True, + options={"min": 1, "max": 100} + ) + + def test_primary_column_nullable_raises_error(self): + with pytest.raises(ValueError, match="cannot be nullable"): + ColumnDefinition( + name="id", + type="int", + primary=True, + nullable=True + ) + + def test_primary_column_without_type_raises_error(self): + with pytest.raises(ValueError, match="must have a type defined"): + ColumnDefinition( + name="id", + primary=True + ) + + def test_non_primary_column_without_type(self): + # Should not raise + col = ColumnDefinition( + name="data", + options={"values": ["a", "b", "c"]} + ) + assert col.name == "data" + + +class TestDatagenSpecValidation: + """Tests for DatagenSpec.validate() method""" + + def test_valid_spec_passes_validation(self): + spec = DatagenSpec( + tables={ + 
"users": TableDefinition( + number_of_rows=100, + columns=[ + ColumnDefinition(name="id", type="int", primary=True), + ColumnDefinition(name="name", type="string", options={"values": ["Alice", "Bob"]}), + ] + ) + }, + output_destination=UCSchemaTarget(catalog="main", schema_="default") + ) + + result = spec.validate(strict=False) + assert result.is_valid() + assert len(result.errors) == 0 + + def test_empty_tables_raises_error(self): + spec = DatagenSpec(tables={}) + + with pytest.raises(ValueError, match="at least one table"): + spec.validate(strict=True) + + def test_table_without_columns_raises_error(self): + spec = DatagenSpec( + tables={ + "empty_table": TableDefinition( + number_of_rows=100, + columns=[] + ) + } + ) + + with pytest.raises(ValueError, match="must have at least one column"): + spec.validate() + + def test_negative_row_count_raises_error(self): + spec = DatagenSpec( + tables={ + "users": TableDefinition( + number_of_rows=-10, + columns=[ColumnDefinition(name="id", type="int", primary=True)] + ) + } + ) + + with pytest.raises(ValueError, match="invalid number_of_rows"): + spec.validate() + + def test_zero_row_count_raises_error(self): + spec = DatagenSpec( + tables={ + "users": TableDefinition( + number_of_rows=0, + columns=[ColumnDefinition(name="id", type="int", primary=True)] + ) + } + ) + + with pytest.raises(ValueError, match="invalid number_of_rows"): + spec.validate() + + def test_invalid_partitions_raises_error(self): + spec = DatagenSpec( + tables={ + "users": TableDefinition( + number_of_rows=100, + partitions=-5, + columns=[ColumnDefinition(name="id", type="int", primary=True)] + ) + } + ) + + with pytest.raises(ValueError, match="invalid partitions"): + spec.validate() + + def test_duplicate_column_names_raises_error(self): + spec = DatagenSpec( + tables={ + "users": TableDefinition( + number_of_rows=100, + columns=[ + ColumnDefinition(name="id", type="int", primary=True), + ColumnDefinition(name="duplicate", type="string"), + ColumnDefinition(name="duplicate", type="int"), + ] + ) + } + ) + + with pytest.raises(ValueError, match="duplicate column names"): + spec.validate() + + def test_invalid_base_column_reference_raises_error(self): + spec = DatagenSpec( + tables={ + "users": TableDefinition( + number_of_rows=100, + columns=[ + ColumnDefinition(name="id", type="int", primary=True), + ColumnDefinition(name="email", type="string", baseColumn="nonexistent"), + ] + ) + } + ) + + with pytest.raises(ValueError, match="does not exist"): + spec.validate() + + def test_circular_dependency_raises_error(self): + spec = DatagenSpec( + tables={ + "users": TableDefinition( + number_of_rows=100, + columns=[ + ColumnDefinition(name="id", type="int", primary=True), + ColumnDefinition(name="col_a", type="string", baseColumn="col_b"), + ColumnDefinition(name="col_b", type="string", baseColumn="col_c"), + ColumnDefinition(name="col_c", type="string", baseColumn="col_a"), + ] + ) + } + ) + + with pytest.raises(ValueError, match="Circular dependency"): + spec.validate() + + def test_multiple_primary_columns_warning(self): + spec = DatagenSpec( + tables={ + "users": TableDefinition( + number_of_rows=100, + columns=[ + ColumnDefinition(name="id1", type="int", primary=True), + ColumnDefinition(name="id2", type="int", primary=True), + ] + ) + } + ) + + # In strict mode, warnings cause errors + with pytest.raises(ValueError, match="multiple primary columns"): + spec.validate(strict=True) + + # In non-strict mode, should pass but have warnings + result = spec.validate(strict=False) + 
assert result.is_valid() + assert len(result.warnings) > 0 + assert any("multiple primary columns" in w for w in result.warnings) + + def test_column_without_type_or_options_warning(self): + spec = DatagenSpec( + tables={ + "users": TableDefinition( + number_of_rows=100, + columns=[ + ColumnDefinition(name="id", type="int", primary=True), + ColumnDefinition(name="empty_col"), + ] + ) + } + ) + + result = spec.validate(strict=False) + assert result.is_valid() + assert len(result.warnings) > 0 + assert any("No type specified" in w for w in result.warnings) + + def test_no_output_destination_warning(self): + spec = DatagenSpec( + tables={ + "users": TableDefinition( + number_of_rows=100, + columns=[ColumnDefinition(name="id", type="int", primary=True)] + ) + } + ) + + result = spec.validate(strict=False) + assert result.is_valid() + assert len(result.warnings) > 0 + assert any("No output_destination" in w for w in result.warnings) + + def test_unknown_generator_option_warning(self): + spec = DatagenSpec( + tables={ + "users": TableDefinition( + number_of_rows=100, + columns=[ColumnDefinition(name="id", type="int", primary=True)] + ) + }, + generator_options={"unknown_option": "value"} + ) + + result = spec.validate(strict=False) + assert result.is_valid() + assert len(result.warnings) > 0 + assert any("Unknown generator option" in w for w in result.warnings) + + def test_multiple_errors_collected(self): + """Test that all errors are collected before raising""" + spec = DatagenSpec( + tables={ + "users": TableDefinition( + number_of_rows=-10, # Error 1 + partitions=0, # Error 2 + columns=[ + ColumnDefinition(name="id", type="int", primary=True), + ColumnDefinition(name="id", type="string"), # Error 3: duplicate + ColumnDefinition(name="email", baseColumn="phone"), # Error 4: nonexistent + ] + ) + } + ) + + with pytest.raises(ValueError) as exc_info: + spec.validate() + + error_msg = str(exc_info.value) + # Should contain all errors + assert "invalid number_of_rows" in error_msg + assert "invalid partitions" in error_msg + assert "duplicate column names" in error_msg + assert "does not exist" in error_msg + + def test_strict_mode_raises_on_warnings(self): + spec = DatagenSpec( + tables={ + "users": TableDefinition( + number_of_rows=100, + columns=[ColumnDefinition(name="id", type="int", primary=True)] + ) + } + # No output_destination - will generate warning + ) + + # Strict mode should raise + with pytest.raises(ValueError): + spec.validate(strict=True) + + # Non-strict mode should pass + result = spec.validate(strict=False) + assert result.is_valid() + + def test_valid_base_column_chain(self): + """Test that valid baseColumn chains work""" + spec = DatagenSpec( + tables={ + "users": TableDefinition( + number_of_rows=100, + columns=[ + ColumnDefinition(name="id", type="int", primary=True), + ColumnDefinition(name="code", type="string", baseColumn="id"), + ColumnDefinition(name="hash", type="string", baseColumn="code"), + ] + ) + }, + output_destination=FilePathTarget(base_path="/tmp/data", output_format="parquet") + ) + + result = spec.validate(strict=False) + assert result.is_valid() + + def test_multiple_tables_validation(self): + """Test validation across multiple tables""" + spec = DatagenSpec( + tables={ + "users": TableDefinition( + number_of_rows=100, + columns=[ColumnDefinition(name="id", type="int", primary=True)] + ), + "orders": TableDefinition( + number_of_rows=-50, # Error in second table + columns=[ColumnDefinition(name="order_id", type="int", primary=True)] + ), + "products": 
TableDefinition( + number_of_rows=200, + columns=[] # Error: no columns + ) + } + ) + + with pytest.raises(ValueError) as exc_info: + spec.validate() + + error_msg = str(exc_info.value) + # Should find errors in both tables + assert "orders" in error_msg + assert "products" in error_msg + + +class TestTargetValidation: + """Tests for output target validation""" + + def test_valid_uc_schema_target(self): + target = UCSchemaTarget(catalog="main", schema_="default") + assert target.catalog == "main" + assert target.schema_ == "default" + + def test_uc_schema_empty_catalog_raises_error(self): + with pytest.raises(ValueError, match="non-empty"): + UCSchemaTarget(catalog="", schema_="default") + + def test_valid_file_path_target(self): + target = FilePathTarget(base_path="/tmp/data", output_format="parquet") + assert target.base_path == "/tmp/data" + assert target.output_format == "parquet" + + def test_file_path_empty_base_path_raises_error(self): + with pytest.raises(ValueError, match="non-empty"): + FilePathTarget(base_path="", output_format="csv") + + def test_file_path_invalid_format_raises_error(self): + with pytest.raises(ValueError): + FilePathTarget(base_path="/tmp/data", output_format="json") + + +class TestValidationIntegration: + """Integration tests for validation""" + + def test_realistic_valid_spec(self): + """Test a realistic, valid specification""" + spec = DatagenSpec( + tables={ + "users": TableDefinition( + number_of_rows=1000, + partitions=4, + columns=[ + ColumnDefinition(name="user_id", type="int", primary=True), + ColumnDefinition(name="username", type="string", options={ + "template": r"\w{8,12}" + }), + ColumnDefinition(name="email", type="string", options={ + "template": r"\w.\w@\w.com" + }), + ColumnDefinition(name="age", type="int", options={ + "min": 18, "max": 99 + }), + ] + ), + "orders": TableDefinition( + number_of_rows=5000, + columns=[ + ColumnDefinition(name="order_id", type="int", primary=True), + ColumnDefinition(name="amount", type="decimal", options={ + "min": 10.0, "max": 1000.0 + }), + ] + ) + }, + output_destination=UCSchemaTarget( + catalog="main", + schema_="synthetic_data" + ), + generator_options={ + "random": True, + "randomSeed": 42 + } + ) + + result = spec.validate(strict=True) + assert result.is_valid() + assert len(result.errors) == 0 + assert len(result.warnings) == 0 \ No newline at end of file From d37de684738cda5e7882a2755e7bbf889911df6f Mon Sep 17 00:00:00 2001 From: Anup Kalburgi Date: Wed, 5 Nov 2025 12:18:14 -0500 Subject: [PATCH 3/5] fixing tests --- dbldatagen/spec/column_spec.py | 55 +++++++++++++ dbldatagen/spec/compat.py | 9 +-- dbldatagen/spec/generator_spec.py | 103 +++++++------------------ dbldatagen/spec/generator_spec_impl.py | 22 +++--- makefile | 2 +- pyproject.toml | 3 +- 6 files changed, 104 insertions(+), 90 deletions(-) create mode 100644 dbldatagen/spec/column_spec.py diff --git a/dbldatagen/spec/column_spec.py b/dbldatagen/spec/column_spec.py new file mode 100644 index 00000000..8fd50496 --- /dev/null +++ b/dbldatagen/spec/column_spec.py @@ -0,0 +1,55 @@ +from __future__ import annotations + +from typing import Any, Literal + +from .compat import BaseModel, root_validator + + +DbldatagenBasicType = Literal[ + "string", + "int", + "long", + "float", + "double", + "decimal", + "boolean", + "date", + "timestamp", + "short", + "byte", + "binary", + "integer", + "bigint", + "tinyint", +] +class ColumnDefinition(BaseModel): + name: str + type: DbldatagenBasicType | None = None + primary: bool = False + options: dict[str, 
Any] | None = None
+    nullable: bool | None = False
+    omit: bool | None = False
+    baseColumn: str | None = "id"
+    baseColumnType: str | None = "auto"
+
+    @root_validator()
+    def check_model_constraints(cls, values: dict[str, Any]) -> dict[str, Any]:
+        """
+        Validates constraints across the entire model after individual fields are processed.
- """ - is_primary = values.get("primary") - options = values.get("options", {}) - name = values.get("name") - is_nullable = values.get("nullable") - column_type = values.get("type") +from IPython.display import HTML, display - if is_primary: - if "min" in options or "max" in options: - raise ValueError(f"Primary column '{name}' cannot have min/max options.") +from dbldatagen.spec.column_spec import ColumnDefinition - if is_nullable: - raise ValueError(f"Primary column '{name}' cannot be nullable.") +from .compat import BaseModel, validator - if column_type is None: - raise ValueError(f"Primary column '{name}' must have a type defined.") - return values +logger = logging.getLogger(__name__) class UCSchemaTarget(BaseModel): catalog: str schema_: str output_format: str = "delta" # Default to delta for UC Schema - @field_validator("catalog", "schema_", mode="after") - def validate_identifiers(cls, v): # noqa: N805, pylint: disable=no-self-argument + @validator("catalog", "schema_") + def validate_identifiers(cls, v: str) -> str: if not v.strip(): raise ValueError("Identifier must be non-empty.") if not v.isidentifier(): @@ -68,7 +27,7 @@ def validate_identifiers(cls, v): # noqa: N805, pylint: disable=no-self-argumen f"'{v}' is not a basic Python identifier. Ensure validity for Unity Catalog.") return v.strip() - def __str__(self): + def __str__(self) -> str: return f"{self.catalog}.{self.schema_} (Format: {self.output_format}, Type: UC Table)" @@ -76,28 +35,28 @@ class FilePathTarget(BaseModel): base_path: str output_format: Literal["csv", "parquet"] # No default, must be specified - @field_validator("base_path", mode="after") - def validate_base_path(cls, v): # noqa: N805, pylint: disable=no-self-argument + @validator("base_path") + def validate_base_path(cls, v: str) -> str: if not v.strip(): raise ValueError("base_path must be non-empty.") return v.strip() - def __str__(self): + def __str__(self) -> str: return f"{self.base_path} (Format: {self.output_format}, Type: File Path)" class TableDefinition(BaseModel): number_of_rows: int - partitions: Optional[int] = None - columns: List[ColumnDefinition] + partitions: int | None = None + columns: list[ColumnDefinition] class ValidationResult: """Container for validation results with errors and warnings.""" def __init__(self) -> None: - self.errors: List[str] = [] - self.warnings: List[str] = [] + self.errors: list[str] = [] + self.warnings: list[str] = [] def add_error(self, message: str) -> None: """Add an error message.""" @@ -132,16 +91,16 @@ def __str__(self) -> str: return "\n".join(lines) class DatagenSpec(BaseModel): - tables: Dict[str, TableDefinition] - output_destination: Optional[Union[UCSchemaTarget, FilePathTarget]] = None # there is a abstraction, may be we can use that? talk to Greg - generator_options: Optional[Dict[str, Any]] = {} - intended_for_databricks: Optional[bool] = None # May be infered. + tables: dict[str, TableDefinition] + output_destination: Union[UCSchemaTarget, FilePathTarget] | None = None # there is a abstraction, may be we can use that? talk to Greg + generator_options: dict[str, Any] | None = None + intended_for_databricks: bool | None = None # May be infered. def _check_circular_dependencies( self, table_name: str, - columns: List[ColumnDefinition] - ) -> List[str]: + columns: list[ColumnDefinition] + ) -> list[str]: """ Check for circular dependencies in baseColumn references. Returns a list of error messages if circular dependencies are found. 
@@ -152,13 +111,13 @@ def _check_circular_dependencies( for col in columns: if col.baseColumn and col.baseColumn != "id": # Track the dependency chain - visited = set() + visited: set[str] = set() current = col.name while current: if current in visited: # Found a cycle - cycle_path = " -> ".join(list(visited) + [current]) + cycle_path = " -> ".join([*list(visited), current]) errors.append( f"Table '{table_name}': Circular dependency detected in column '{col.name}': {cycle_path}" ) @@ -182,7 +141,7 @@ def _check_circular_dependencies( return errors - def validate(self, strict: bool = True) -> ValidationResult: + def validate(self, strict: bool = True) -> ValidationResult: # type: ignore[override] """ Validates the entire DatagenSpec configuration. Always runs all validation checks and collects all errors and warnings. @@ -284,7 +243,7 @@ def validate(self, strict: bool = True) -> ValidationResult: "random", "randomSeed", "randomSeedMethod", "verbose", "debug", "seedColumnName" ] - for key in self.generator_options.keys(): + for key in self.generator_options: if key not in known_options: result.add_warning( f"Unknown generator option: '{key}'. " @@ -292,9 +251,7 @@ def validate(self, strict: bool = True) -> ValidationResult: ) # Now that all validations are complete, decide whether to raise - if strict and (result.errors or result.warnings): - raise ValueError(str(result)) - elif not strict and result.errors: + if (strict and (result.errors or result.warnings)) or (not strict and result.errors): raise ValueError(str(result)) return result diff --git a/dbldatagen/spec/generator_spec_impl.py b/dbldatagen/spec/generator_spec_impl.py index a508b1a5..e03e30fb 100644 --- a/dbldatagen/spec/generator_spec_impl.py +++ b/dbldatagen/spec/generator_spec_impl.py @@ -1,11 +1,13 @@ import logging -from typing import Dict, Union import posixpath +from typing import Any, Union -from dbldatagen.spec.generator_spec import TableDefinition from pyspark.sql import SparkSession + import dbldatagen as dg -from .generator_spec import DatagenSpec, UCSchemaTarget, FilePathTarget, ColumnDefinition +from dbldatagen.spec.generator_spec import TableDefinition + +from .generator_spec import ColumnDefinition, DatagenSpec, FilePathTarget, UCSchemaTarget logging.basicConfig( @@ -41,7 +43,7 @@ def __init__(self, spark: SparkSession, app_name: str = "DataGen_ClassBased") -> self.app_name = app_name logger.info("Generator initialized with SparkSession") - def _columnspec_to_datagen_columnspec(self, col_def: ColumnDefinition) -> Dict[str, str]: + def _columnspec_to_datagen_columnspec(self, col_def: ColumnDefinition) -> dict[str, Any]: """ Convert a ColumnDefinition to dbldatagen column specification. Args: @@ -95,7 +97,7 @@ def _prepare_data_generators( self, config: DatagenSpec, config_source_name: str = "PydanticConfig" - ) -> Dict[str, dg.DataGenerator]: + ) -> dict[str, dg.DataGenerator]: """ Prepare DataGenerator specifications for each table based on the configuration. Args: @@ -117,10 +119,10 @@ def _prepare_data_generators( raise RuntimeError( "SparkSession is not available. 
Cannot prepare data generators") - tables_config: Dict[str, TableDefinition] = config.tables + tables_config: dict[str, TableDefinition] = config.tables global_gen_options = config.generator_options if config.generator_options else {} - prepared_generators: Dict[str, dg.DataGenerator] = {} + prepared_generators: dict[str, dg.DataGenerator] = {} generation_order = list(tables_config.keys()) # This becomes impotant when we get into multitable for table_name in generation_order: @@ -156,7 +158,7 @@ def _prepare_data_generators( def write_prepared_data( self, - prepared_generators: Dict[str, dg.DataGenerator], + prepared_generators: dict[str, dg.DataGenerator], output_destination: Union[UCSchemaTarget, FilePathTarget, None], config_source_name: str = "PydanticConfig", ) -> None: @@ -188,7 +190,7 @@ def write_prepared_data( logger.info( f"Built DataFrame for '{table_name}': {actual_row_count} rows (requested: {requested_rows})") - if actual_row_count == 0 and requested_rows > 0: + if actual_row_count == 0 and requested_rows is not None and requested_rows > 0: logger.warning(f"Table '{table_name}': Requested {requested_rows} rows but built 0") # Write data based on destination type @@ -251,4 +253,4 @@ def generate_and_write_data( logger.error( f"Error during combined data generation and writing: {e}") raise RuntimeError( - f"Error during combined data generation and writing: {e}") from e \ No newline at end of file + f"Error during combined data generation and writing: {e}") from e diff --git a/makefile b/makefile index a5f4486c..df3e5e6e 100644 --- a/makefile +++ b/makefile @@ -8,7 +8,7 @@ clean: .venv/bin/python: pip install hatch - hatch env create test-pydantic.pydantic==1.10.6-v1 + hatch env create dev: .venv/bin/python @hatch run which python diff --git a/pyproject.toml b/pyproject.toml index 304562e2..99be0820 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -103,6 +103,7 @@ dependencies = [ "jmespath>=0.10.0", "py4j>=0.10.9", "pickleshare>=0.7.5", + "ipython>=7.32.0", ] python="3.10" @@ -431,7 +432,7 @@ check_untyped_defs = true disallow_untyped_decorators = false no_implicit_optional = true warn_redundant_casts = true -warn_unused_ignores = true +warn_unused_ignores = false warn_no_return = true warn_unreachable = true strict_equality = true From f5214caf8b9859aa64dea3228bdcbcfe7306b34f Mon Sep 17 00:00:00 2001 From: Anup Kalburgi Date: Thu, 6 Nov 2025 10:02:23 -0500 Subject: [PATCH 4/5] changes to make file --- makefile | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/makefile b/makefile index df3e5e6e..a551f964 100644 --- a/makefile +++ b/makefile @@ -3,21 +3,18 @@ all: clean dev lint fmt test clean: - rm -fr .venv clean htmlcov .mypy_cache .pytest_cache .ruff_cache .coverage coverage.xml + rm -fr clean htmlcov .mypy_cache .pytest_cache .ruff_cache .coverage coverage.xml rm -fr **/*.pyc -.venv/bin/python: - pip install hatch - hatch env create - -dev: .venv/bin/python +dev: + @which hatch > /dev/null || pip install hatch @hatch run which python lint: - hatch run verify + hatch run test-pydantic.2.8.2:verify fmt: - hatch run fmt + hatch run test-pydantic.2.8.2:fmt test: hatch run test-pydantic:test From 0a8fa2a29977033c0217b85d57631dd2b5ba4aef Mon Sep 17 00:00:00 2001 From: Anup Kalburgi Date: Thu, 6 Nov 2025 13:06:32 -0500 Subject: [PATCH 5/5] updating docs --- dbldatagen/spec/column_spec.py | 56 +++++- dbldatagen/spec/compat.py | 69 +++++--- dbldatagen/spec/generator_spec.py | 226 ++++++++++++++++++++++--- dbldatagen/spec/generator_spec_impl.py | 187 
++++++++++++++------ 4 files changed, 445 insertions(+), 93 deletions(-) diff --git a/dbldatagen/spec/column_spec.py b/dbldatagen/spec/column_spec.py index 8fd50496..74e9e57f 100644 --- a/dbldatagen/spec/column_spec.py +++ b/dbldatagen/spec/column_spec.py @@ -22,7 +22,48 @@ "bigint", "tinyint", ] +"""Type alias representing supported basic Spark SQL data types for column definitions. + +Includes both standard SQL types (e.g. string, int, double) and Spark-specific type names +(e.g. bigint, tinyint). These types are used in the ColumnDefinition to specify the data type +for generated columns. +""" + + class ColumnDefinition(BaseModel): + """Defines the specification for a single column in a synthetic data table. + + This class encapsulates all the information needed to generate data for a single column, + including its name, type, constraints, and generation options. It supports both primary key + columns and derived columns that can reference other columns. + + :param name: Name of the column to be generated + :param type: Spark SQL data type for the column (e.g., "string", "int", "timestamp"). + If None, type may be inferred from options or baseColumn + :param primary: If True, this column will be treated as a primary key column with unique values. + Primary columns cannot have min/max options and cannot be nullable + :param options: Dictionary of additional options controlling column generation behavior. + Common options include: min, max, step, values, template, distribution, etc. + See dbldatagen documentation for full list of available options + :param nullable: If True, the column may contain NULL values. Primary columns cannot be nullable + :param omit: If True, this column will be generated internally but excluded from the final output. + Useful for intermediate columns used in calculations + :param baseColumn: Name of another column to use as the basis for generating this column's values. + Default is "id" which refers to the internal row identifier + :param baseColumnType: Method for deriving values from the baseColumn. Common values: + "auto" (infer behavior), "hash" (hash the base column values), + "values" (use base column values directly) + + .. note:: + Primary columns have special constraints: + - Must have a type defined + - Cannot have min/max options + - Cannot be nullable + + .. note:: + Columns can be chained via baseColumn references, but circular dependencies + will be caught during validation + """ name: str type: DbldatagenBasicType | None = None primary: bool = False @@ -34,8 +75,19 @@ class ColumnDefinition(BaseModel): @root_validator() def check_model_constraints(cls, values: dict[str, Any]) -> dict[str, Any]: - """ - Validates constraints across the entire model after individual fields are processed. + """Validates constraints across the entire ColumnDefinition model. + + This validator runs after all individual field validators and checks for cross-field + constraints that depend on multiple fields being set. It ensures that primary key + columns meet all necessary requirements and that conflicting options are not specified. + + :param values: Dictionary of all field values for this ColumnDefinition instance + :returns: The validated values dictionary, unmodified if all validations pass + :raises ValueError: If primary column has min/max options, or if primary column is nullable, + or if primary column doesn't have a type defined + + .. 
note:: + This is a Pydantic root validator that runs automatically during model instantiation """ is_primary = values.get("primary") options = values.get("options") or {} # Handle None case diff --git a/dbldatagen/spec/compat.py b/dbldatagen/spec/compat.py index dfafe7b1..8fe47508 100644 --- a/dbldatagen/spec/compat.py +++ b/dbldatagen/spec/compat.py @@ -1,30 +1,57 @@ -# This module acts as a compatibility layer for Pydantic V1 and V2. +"""Pydantic compatibility layer for supporting both Pydantic V1 and V2. + +This module provides a unified interface for Pydantic functionality that works across both +Pydantic V1.x and V2.x versions. It ensures that the dbldatagen spec API works in multiple +environments without requiring specific Pydantic version installations. + +The module exports a consistent Pydantic V1-compatible API regardless of which version is installed: + +- **BaseModel**: Base class for all Pydantic models +- **Field**: Field definition with metadata and validation +- **constr**: Constrained string type for validation +- **root_validator**: Decorator for model-level validation +- **validator**: Decorator for field-level validation + +Usage in other modules: + Always import from this compat module, not directly from pydantic:: + + # Correct + from .compat import BaseModel, validator + + # Incorrect - don't do this + from pydantic import BaseModel, validator + +Environment Support: + - **Pydantic V2.x environments**: Imports from pydantic.v1 compatibility layer + - **Pydantic V1.x environments**: Imports directly from pydantic package + - **Databricks runtimes**: Works with pre-installed Pydantic versions without conflicts + +.. note:: + This approach is inspired by FastAPI's compatibility layer: + https://github.com/fastapi/fastapi/blob/master/fastapi/_compat.py + +Benefits: + - **No Installation Required**: Works with whatever Pydantic version is available + - **Single Codebase**: One set of code works across both Pydantic versions + - **Environment Agnostic**: Application code doesn't need to know which version is installed + - **Future-Ready**: Easy migration path to Pydantic V2 API when ready + - **Databricks Compatible**: Avoids conflicts with pre-installed libraries + +Future Migration: + When ready to migrate to native Pydantic V2 API: + 1. Update application code to use V2 patterns + 2. Modify this compat.py to import from native V2 locations + 3. Test in both environments + 4. Deploy incrementally +""" try: # This will succeed on environments with Pydantic V2.x + # Pydantic V2 provides a v1 compatibility layer for backwards compatibility from pydantic.v1 import BaseModel, Field, constr, root_validator, validator except ImportError: # This will be executed on environments with only Pydantic V1.x + # Import directly from pydantic since v1 subpackage doesn't exist from pydantic import BaseModel, Field, constr, root_validator, validator # type: ignore[assignment,no-redef] __all__ = ["BaseModel", "Field", "constr", "root_validator", "validator"] -# In your application code, do this: -# from .compat import BaseModel -# NOT this: -# from pydantic import BaseModel - -# FastAPI Notes -# https://github.com/fastapi/fastapi/blob/master/fastapi/_compat.py - - -""" -## Why This Approach -No Installation Required: It directly addresses your core requirement. -You don't need to %pip install anything, which avoids conflicts with the pre-installed libraries on Databricks. 
-Single Codebase: You maintain one set of code that is guaranteed to work with the Pydantic V1 API, which is available in both runtimes. - -Environment Agnostic: Your application code in models.py has no idea which version of Pydantic is actually installed. The compat.py module handles that complexity completely. - -Future-Ready: When you eventually decide to migrate fully to the Pydantic V2 API (to take advantage of its speed and features), -you only need to change your application code and your compat.py import statements, making the transition much clearer. -""" diff --git a/dbldatagen/spec/generator_spec.py b/dbldatagen/spec/generator_spec.py index ce3ec9ed..d0a750db 100644 --- a/dbldatagen/spec/generator_spec.py +++ b/dbldatagen/spec/generator_spec.py @@ -13,13 +13,44 @@ logger = logging.getLogger(__name__) + class UCSchemaTarget(BaseModel): + """Defines a Unity Catalog schema as the output destination for generated data. + + This class represents a Unity Catalog location (catalog.schema) where generated tables + will be written. Unity Catalog is Databricks' unified governance solution for data and AI. + + :param catalog: Unity Catalog catalog name where tables will be written + :param schema_: Unity Catalog schema (database) name within the catalog + :param output_format: Data format for table storage. Defaults to "delta" which is the + recommended format for Unity Catalog tables + + .. note:: + The schema parameter is named `schema_` (with underscore) to avoid conflict with + Python's built-in schema keyword and Pydantic functionality + + .. note:: + Tables will be written to the location: `{catalog}.{schema_}.{table_name}` + """ catalog: str schema_: str output_format: str = "delta" # Default to delta for UC Schema @validator("catalog", "schema_") def validate_identifiers(cls, v: str) -> str: + """Validates that catalog and schema names are valid identifiers. + + Ensures the identifier is non-empty and follows Python identifier conventions. + Issues a warning if the identifier is not a basic Python identifier, as this may + cause issues with Unity Catalog. + + :param v: The identifier string to validate (catalog or schema name) + :returns: The validated and stripped identifier string + :raises ValueError: If the identifier is empty or contains only whitespace + + .. note:: + This is a Pydantic field validator that runs automatically during model instantiation + """ if not v.strip(): raise ValueError("Identifier must be non-empty.") if not v.isidentifier(): @@ -28,50 +59,130 @@ def validate_identifiers(cls, v: str) -> str: return v.strip() def __str__(self) -> str: + """Returns a human-readable string representation of the Unity Catalog target. + + :returns: Formatted string showing catalog, schema, format and type + """ return f"{self.catalog}.{self.schema_} (Format: {self.output_format}, Type: UC Table)" class FilePathTarget(BaseModel): + """Defines a file system path as the output destination for generated data. + + This class represents a file system location where generated tables will be written + as files. Each table will be written to a subdirectory within the base path. + + :param base_path: Base file system path where table data files will be written. + Each table will be written to {base_path}/{table_name}/ + :param output_format: File format for data storage. Must be either "csv" or "parquet". + No default value - must be explicitly specified + + .. note:: + Unlike UCSchemaTarget, this requires an explicit output_format with no default + + .. 
note:: + The base_path can be a local file system path, DBFS path, or cloud storage path + (e.g., s3://, gs://, abfs://) depending on your environment + """ base_path: str output_format: Literal["csv", "parquet"] # No default, must be specified @validator("base_path") def validate_base_path(cls, v: str) -> str: + """Validates that the base path is non-empty. + + :param v: The base path string to validate + :returns: The validated and stripped base path string + :raises ValueError: If the base path is empty or contains only whitespace + + .. note:: + This is a Pydantic field validator that runs automatically during model instantiation + """ if not v.strip(): raise ValueError("base_path must be non-empty.") return v.strip() def __str__(self) -> str: + """Returns a human-readable string representation of the file path target. + + :returns: Formatted string showing base path, format and type + """ return f"{self.base_path} (Format: {self.output_format}, Type: File Path)" class TableDefinition(BaseModel): + """Defines the complete specification for a single synthetic data table. + + This class encapsulates all the information needed to generate a table of synthetic data, + including the number of rows, partitioning, and column specifications. + + :param number_of_rows: Total number of data rows to generate for this table. + Must be a positive integer + :param partitions: Number of Spark partitions to use when generating data. + If None, defaults to Spark's default parallelism setting. + More partitions can improve generation speed for large datasets + :param columns: List of ColumnDefinition objects specifying the columns to generate + in this table. At least one column must be specified + + .. note:: + Setting an appropriate number of partitions can significantly impact generation performance. + As a rule of thumb, use 2-4 partitions per CPU core available in your Spark cluster + + .. note:: + Column order in the list determines the order of columns in the generated output + """ number_of_rows: int partitions: int | None = None columns: list[ColumnDefinition] class ValidationResult: - """Container for validation results with errors and warnings.""" + """Container for validation results that collects errors and warnings during spec validation. + + This class accumulates validation issues found while checking a DatagenSpec configuration. + It distinguishes between errors (which prevent data generation) and warnings (which + indicate potential issues but don't block generation). + + .. note:: + Validation passes if there are no errors, even if warnings are present + """ def __init__(self) -> None: + """Initialize an empty ValidationResult with no errors or warnings.""" self.errors: list[str] = [] self.warnings: list[str] = [] def add_error(self, message: str) -> None: - """Add an error message.""" + """Add an error message to the validation results. + + Errors indicate critical issues that will prevent successful data generation. + + :param message: Descriptive error message explaining the validation failure + """ self.errors.append(message) def add_warning(self, message: str) -> None: - """Add a warning message.""" + """Add a warning message to the validation results. + + Warnings indicate potential issues or non-optimal configurations that may affect + data generation but won't prevent it from completing. 
+ + :param message: Descriptive warning message explaining the potential issue + """ self.warnings.append(message) def is_valid(self) -> bool: - """Returns True if there are no errors.""" + """Check if validation passed without errors. + + :returns: True if there are no errors (warnings are allowed), False otherwise + """ return len(self.errors) == 0 def __str__(self) -> str: - """String representation of validation results.""" + """Generate a formatted string representation of all validation results. + + :returns: Multi-line string containing formatted errors and warnings with counts + """ lines = [] if self.is_valid(): lines.append("✓ Validation passed successfully") @@ -91,6 +202,34 @@ def __str__(self) -> str: return "\n".join(lines) class DatagenSpec(BaseModel): + """Top-level specification for synthetic data generation across one or more tables. + + This is the main configuration class for the dbldatagen spec-based API. It defines all tables + to be generated, where the output should be written, and global generation options. + + :param tables: Dictionary mapping table names to their TableDefinition specifications. + Keys are the table names that will be used in the output destination + :param output_destination: Target location for generated data. Can be either a + UCSchemaTarget (Unity Catalog) or FilePathTarget (file system). + If None, data will be generated but not persisted + :param generator_options: Dictionary of global options affecting data generation behavior. + Common options include: + - random: Enable random data generation + - randomSeed: Seed for reproducible random generation + - randomSeedMethod: Method for computing random seeds + - verbose: Enable verbose logging + - debug: Enable debug logging + - seedColumnName: Name of internal seed column + :param intended_for_databricks: Flag indicating if this spec is designed for Databricks. + May be automatically inferred based on configuration + + .. note:: + Call the validate() method before using this spec to ensure configuration is correct + + .. note:: + Multiple tables can share the same DatagenSpec and will be generated in the order + they appear in the tables dictionary + """ tables: dict[str, TableDefinition] output_destination: Union[UCSchemaTarget, FilePathTarget] | None = None # there is a abstraction, may be we can use that? talk to Greg generator_options: dict[str, Any] | None = None @@ -101,9 +240,19 @@ def _check_circular_dependencies( table_name: str, columns: list[ColumnDefinition] ) -> list[str]: - """ - Check for circular dependencies in baseColumn references. - Returns a list of error messages if circular dependencies are found. + """Check for circular dependencies in baseColumn references within a table. + + Analyzes column dependencies to detect cycles where columns reference each other + in a circular manner (e.g., col A depends on col B, col B depends on col A). + Such circular dependencies would make data generation impossible. + + :param table_name: Name of the table being validated (used in error messages) + :param columns: List of ColumnDefinition objects to check for circular dependencies + :returns: List of error message strings describing any circular dependencies found. + Empty list if no circular dependencies exist + + .. 
note:: + This method performs a graph traversal to detect cycles in the dependency chain """ errors = [] column_map = {col.name: col for col in columns} @@ -142,20 +291,35 @@ def _check_circular_dependencies( return errors def validate(self, strict: bool = True) -> ValidationResult: # type: ignore[override] - """ - Validates the entire DatagenSpec configuration. - Always runs all validation checks and collects all errors and warnings. - - Args: - strict: If True, raises ValueError if any errors or warnings are found. - If False, only raises ValueError if errors (not warnings) are found. - - Returns: - ValidationResult object containing all errors and warnings found. - - Raises: - ValueError: If validation fails based on strict mode setting. - The exception message contains all errors and warnings. + """Validate the entire DatagenSpec configuration comprehensively. + + This method performs extensive validation of the entire spec, including: + - Ensuring at least one table is defined + - Validating each table has columns and positive row counts + - Checking for duplicate column names within tables + - Verifying baseColumn references point to existing columns + - Detecting circular dependencies in baseColumn chains + - Validating primary key constraints + - Checking output destination configuration + - Validating generator options + + All validation checks are performed regardless of whether errors are found, allowing + you to see all issues at once rather than fixing them one at a time. + + :param strict: Controls validation failure behavior: + - If True: Raises ValueError for any errors OR warnings found + - If False: Only raises ValueError for errors (warnings are tolerated) + :returns: ValidationResult object containing all collected errors and warnings, + even if an exception is raised + :raises ValueError: If validation fails based on strict mode setting. + The exception message contains the formatted ValidationResult + + .. note:: + It's recommended to call validate() before attempting to generate data to catch + configuration issues early + + .. note:: + Use strict=False during development to see warnings without blocking generation """ result = ValidationResult() @@ -258,6 +422,24 @@ def validate(self, strict: bool = True) -> ValidationResult: # type: ignore[ove def display_all_tables(self) -> None: + """Display a formatted view of all table definitions in the spec. + + This method provides a user-friendly visualization of the spec configuration, showing + each table's structure and the output destination. It's designed for use in Jupyter + notebooks and will render HTML output when available. + + For each table, displays: + - Table name + - Output destination (or warning if not configured) + - DataFrame showing all columns with their properties + + .. note:: + This method uses IPython.display.HTML when available, falling back to plain text + output in non-notebook environments + + .. note:: + This is intended for interactive exploration and debugging of spec configurations + """ for table_name, table_def in self.tables.items(): print(f"Table: {table_name}") diff --git a/dbldatagen/spec/generator_spec_impl.py b/dbldatagen/spec/generator_spec_impl.py index e03e30fb..fc53863e 100644 --- a/dbldatagen/spec/generator_spec_impl.py +++ b/dbldatagen/spec/generator_spec_impl.py @@ -21,18 +21,35 @@ class Generator: - """ - Main data generation orchestrator that handles configuration, preparation, and writing of data. 
+ """Main orchestrator for generating synthetic data from DatagenSpec configurations. + + This class provides the primary interface for the spec-based data generation API. It handles + the complete lifecycle of data generation: + 1. Converting spec configurations into dbldatagen DataGenerator objects + 2. Building the actual data as Spark DataFrames + 3. Writing the data to specified output destinations (Unity Catalog or file system) + + The Generator encapsulates all the complexity of translating declarative specs into + executable data generation plans, allowing users to focus on what data they want rather + than how to generate it. + + :param spark: Active SparkSession to use for data generation + :param app_name: Application name used in logging and tracking. Defaults to "DataGen_ClassBased" + + .. note:: + The Generator requires an active SparkSession. On Databricks, you can use the pre-configured + `spark` variable. For local development, create a SparkSession first + + .. note:: + The same Generator instance can be reused to generate multiple different specs """ def __init__(self, spark: SparkSession, app_name: str = "DataGen_ClassBased") -> None: - """ - Initialize the Generator with a SparkSession. - Args: - spark: An existing SparkSession instance - app_name: Application name for logging purposes - Raises: - RuntimeError: If spark is None + """Initialize the Generator with a SparkSession. + + :param spark: An active SparkSession instance to use for data generation operations + :param app_name: Application name for logging and identification purposes + :raises RuntimeError: If spark is None or not properly initialized """ if not spark: logger.error( @@ -44,12 +61,26 @@ def __init__(self, spark: SparkSession, app_name: str = "DataGen_ClassBased") -> logger.info("Generator initialized with SparkSession") def _columnspec_to_datagen_columnspec(self, col_def: ColumnDefinition) -> dict[str, Any]: - """ - Convert a ColumnDefinition to dbldatagen column specification. - Args: - col_def: ColumnDefinition object containing column configuration - Returns: - Dictionary containing dbldatagen column specification + """Convert a ColumnDefinition spec into dbldatagen DataGenerator column arguments. + + This internal method translates the declarative ColumnDefinition format into the + keyword arguments expected by dbldatagen's withColumn() method. It handles special + cases like primary keys, nullable columns, and omitted columns. + + Primary key columns receive special treatment: + - Automatically use the internal ID column as their base + - String primary keys use hash-based generation + - Numeric primary keys maintain sequential values + + :param col_def: ColumnDefinition object from a DatagenSpec + :returns: Dictionary of keyword arguments suitable for DataGenerator.withColumn() + + .. note:: + This is an internal method not intended for direct use by end users + + .. note:: + Conflicting options for primary keys (like min/max, values, expr) will generate + warnings but won't prevent generation - the primary key behavior takes precedence """ col_name = col_def.name col_type = col_def.type @@ -98,17 +129,33 @@ def _prepare_data_generators( config: DatagenSpec, config_source_name: str = "PydanticConfig" ) -> dict[str, dg.DataGenerator]: - """ - Prepare DataGenerator specifications for each table based on the configuration. 
- Args: - config: DatagenSpec Pydantic object containing table configurations - config_source_name: Name for the configuration source (for logging) - Returns: - Dictionary mapping table names to their configured dbldatagen.DataGenerator objects - Raises: - RuntimeError: If SparkSession is not available - ValueError: If any table preparation fails - Exception: If any unexpected error occurs during preparation + """Prepare DataGenerator objects for all tables defined in the spec. + + This internal method is the first phase of data generation. It processes the DatagenSpec + and creates configured dbldatagen.DataGenerator objects for each table, but does not + yet build the actual data. Each table's definition is converted into a DataGenerator + with all its columns configured. + + The method: + 1. Iterates through all tables in the spec + 2. Creates a DataGenerator for each table with appropriate row count and partitioning + 3. Adds all columns to each DataGenerator using withColumn() + 4. Applies global generator options + 5. Returns the prepared generators ready for building + + :param config: DatagenSpec containing table definitions and configuration + :param config_source_name: Descriptive name for the config source, used in logging + and DataGenerator naming + :returns: Dictionary mapping table names to their prepared DataGenerator instances + :raises RuntimeError: If SparkSession is not available or if any table preparation fails + :raises ValueError: If table configuration is invalid (should be caught by validate() first) + + .. note:: + This is an internal method. Use generate_and_write_data() for the complete workflow + + .. note:: + Preparation is separate from building to allow inspection and modification of + DataGenerators before data generation begins """ logger.info( f"Preparing data generators for {len(config.tables)} tables") @@ -162,17 +209,34 @@ def write_prepared_data( output_destination: Union[UCSchemaTarget, FilePathTarget, None], config_source_name: str = "PydanticConfig", ) -> None: - """ - Write data from prepared generators to the specified output destination. - - Args: - prepared_generators: Dictionary of prepared DataGenerator objects - output_destination: Target destination for data output - config_source_name: Name for the configuration source (for logging) - - Raises: - RuntimeError: If any table write fails - ValueError: If output destination is not properly configured + """Build and write data from prepared generators to the specified output destination. + + This method handles the second phase of data generation: taking prepared DataGenerator + objects, building them into actual Spark DataFrames, and writing the results to the + configured output location. + + The method: + 1. Iterates through all prepared generators + 2. Builds each generator into a DataFrame using build() + 3. Writes the DataFrame to the appropriate destination: + - For FilePathTarget: Writes to {base_path}/{table_name}/ in specified format + - For UCSchemaTarget: Writes to {catalog}.{schema}.{table_name} as managed table + 4. Logs row counts and write locations + + :param prepared_generators: Dictionary mapping table names to DataGenerator objects + (typically from _prepare_data_generators()) + :param output_destination: Target location for output. 
Can be UCSchemaTarget, + FilePathTarget, or None (no write, data generated only) + :param config_source_name: Descriptive name for the config source, used in logging + :raises RuntimeError: If DataFrame building or writing fails for any table + :raises ValueError: If output destination type is not recognized + + .. note:: + If output_destination is None, data is generated but not persisted anywhere. + This can be useful for testing or when you want to process the data in-memory + + .. note:: + Writing uses "overwrite" mode, so existing tables/files will be replaced """ logger.info("Starting data writing phase") @@ -216,17 +280,44 @@ def generate_and_write_data( config: DatagenSpec, config_source_name: str = "PydanticConfig" ) -> None: - """ - Combined method to prepare data generators and write data in one operation. - This method orchestrates the complete data generation workflow: - 1. Prepare data generators from configuration - 2. Write data to the specified destination - Args: - config: DatagenSpec Pydantic object containing table configurations - config_source_name: Name for the configuration source (for logging) - Raises: - RuntimeError: If SparkSession is not available or any step fails - ValueError: If critical errors occur during preparation or writing + """Execute the complete data generation workflow from spec to output. + + This is the primary high-level method for generating data from a DatagenSpec. It + orchestrates the entire process in one call, handling both preparation and writing phases. + + The complete workflow: + 1. Validates that the config is properly structured (you should call config.validate() first) + 2. Converts the spec into DataGenerator objects for each table + 3. Builds the DataFrames by executing the generation logic + 4. Writes the results to the configured output destination + 5. Logs progress and completion status + + This method is the recommended entry point for most use cases. For more control over + the generation process, use _prepare_data_generators() and write_prepared_data() separately. + + :param config: DatagenSpec object defining tables, columns, and output destination. + Should be validated with config.validate() before calling this method + :param config_source_name: Descriptive name for the config source, used in logging + and naming DataGenerator instances + :raises RuntimeError: If SparkSession is unavailable, or if preparation or writing fails + :raises ValueError: If the config is invalid (though config.validate() should catch this first) + + .. note:: + It's strongly recommended to call config.validate() before this method to catch + configuration errors early with better error messages + + .. note:: + Generation is performed sequentially: table1 is fully generated and written before + table2 begins. For multi-table generation with dependencies, the order matters + + Example: + >>> spec = DatagenSpec( + ... tables={"users": user_table_def}, + ... output_destination=UCSchemaTarget(catalog="main", schema_="test") + ... ) + >>> spec.validate() # Check for errors first + >>> generator = Generator(spark) + >>> generator.generate_and_write_data(spec) """ logger.info(f"Starting combined data generation and writing for {len(config.tables)} tables")