diff --git a/dbldatagen/spec/column_spec.py b/dbldatagen/spec/column_spec.py new file mode 100644 index 00000000..74e9e57f --- /dev/null +++ b/dbldatagen/spec/column_spec.py @@ -0,0 +1,107 @@ +from __future__ import annotations + +from typing import Any, Literal + +from .compat import BaseModel, root_validator + + +DbldatagenBasicType = Literal[ + "string", + "int", + "long", + "float", + "double", + "decimal", + "boolean", + "date", + "timestamp", + "short", + "byte", + "binary", + "integer", + "bigint", + "tinyint", +] +"""Type alias representing supported basic Spark SQL data types for column definitions. + +Includes both standard SQL types (e.g. string, int, double) and Spark-specific type names +(e.g. bigint, tinyint). These types are used in the ColumnDefinition to specify the data type +for generated columns. +""" + + +class ColumnDefinition(BaseModel): + """Defines the specification for a single column in a synthetic data table. + + This class encapsulates all the information needed to generate data for a single column, + including its name, type, constraints, and generation options. It supports both primary key + columns and derived columns that can reference other columns. + + :param name: Name of the column to be generated + :param type: Spark SQL data type for the column (e.g., "string", "int", "timestamp"). + If None, type may be inferred from options or baseColumn + :param primary: If True, this column will be treated as a primary key column with unique values. + Primary columns cannot have min/max options and cannot be nullable + :param options: Dictionary of additional options controlling column generation behavior. + Common options include: min, max, step, values, template, distribution, etc. + See dbldatagen documentation for full list of available options + :param nullable: If True, the column may contain NULL values. Primary columns cannot be nullable + :param omit: If True, this column will be generated internally but excluded from the final output. + Useful for intermediate columns used in calculations + :param baseColumn: Name of another column to use as the basis for generating this column's values. + Default is "id" which refers to the internal row identifier + :param baseColumnType: Method for deriving values from the baseColumn. Common values: + "auto" (infer behavior), "hash" (hash the base column values), + "values" (use base column values directly) + + .. note:: + Primary columns have special constraints: + - Must have a type defined + - Cannot have min/max options + - Cannot be nullable + + .. note:: + Columns can be chained via baseColumn references, but circular dependencies + will be caught during validation + """ + name: str + type: DbldatagenBasicType | None = None + primary: bool = False + options: dict[str, Any] | None = None + nullable: bool | None = False + omit: bool | None = False + baseColumn: str | None = "id" + baseColumnType: str | None = "auto" + + @root_validator() + def check_model_constraints(cls, values: dict[str, Any]) -> dict[str, Any]: + """Validates constraints across the entire ColumnDefinition model. + + This validator runs after all individual field validators and checks for cross-field + constraints that depend on multiple fields being set. It ensures that primary key + columns meet all necessary requirements and that conflicting options are not specified. 
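As a quick illustration of how these constraints surface at construction time, here is a minimal sketch mirroring the cases exercised in tests/test_specs.py (Pydantic V1 wraps the ValueError raised by the validator in a ValidationError, which is itself a ValueError subclass):

```python
from dbldatagen.spec.column_spec import ColumnDefinition

# A valid primary-key column: typed, no min/max options, not nullable.
user_id = ColumnDefinition(name="user_id", type="int", primary=True)

# A derived column that hashes the values of another column.
email = ColumnDefinition(name="email", type="string",
                         baseColumn="user_id", baseColumnType="hash")

# Violating a primary-key constraint fails at model construction.
try:
    ColumnDefinition(name="bad_id", type="int", primary=True,
                     options={"min": 1, "max": 100})
except ValueError as err:
    print(err)  # message includes "cannot have min/max options"
```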
+ + :param values: Dictionary of all field values for this ColumnDefinition instance + :returns: The validated values dictionary, unmodified if all validations pass + :raises ValueError: If primary column has min/max options, or if primary column is nullable, + or if primary column doesn't have a type defined + + .. note:: + This is a Pydantic root validator that runs automatically during model instantiation + """ + is_primary = values.get("primary") + options = values.get("options") or {} # Handle None case + name = values.get("name") + is_nullable = values.get("nullable") + column_type = values.get("type") + + if is_primary: + if "min" in options or "max" in options: + raise ValueError(f"Primary column '{name}' cannot have min/max options.") + + if is_nullable: + raise ValueError(f"Primary column '{name}' cannot be nullable.") + + if column_type is None: + raise ValueError(f"Primary column '{name}' must have a type defined.") + return values diff --git a/dbldatagen/spec/compat.py b/dbldatagen/spec/compat.py new file mode 100644 index 00000000..8fe47508 --- /dev/null +++ b/dbldatagen/spec/compat.py @@ -0,0 +1,57 @@ +"""Pydantic compatibility layer for supporting both Pydantic V1 and V2. + +This module provides a unified interface for Pydantic functionality that works across both +Pydantic V1.x and V2.x versions. It ensures that the dbldatagen spec API works in multiple +environments without requiring specific Pydantic version installations. + +The module exports a consistent Pydantic V1-compatible API regardless of which version is installed: + +- **BaseModel**: Base class for all Pydantic models +- **Field**: Field definition with metadata and validation +- **constr**: Constrained string type for validation +- **root_validator**: Decorator for model-level validation +- **validator**: Decorator for field-level validation + +Usage in other modules: + Always import from this compat module, not directly from pydantic:: + + # Correct + from .compat import BaseModel, validator + + # Incorrect - don't do this + from pydantic import BaseModel, validator + +Environment Support: + - **Pydantic V2.x environments**: Imports from pydantic.v1 compatibility layer + - **Pydantic V1.x environments**: Imports directly from pydantic package + - **Databricks runtimes**: Works with pre-installed Pydantic versions without conflicts + +.. note:: + This approach is inspired by FastAPI's compatibility layer: + https://github.com/fastapi/fastapi/blob/master/fastapi/_compat.py + +Benefits: + - **No Installation Required**: Works with whatever Pydantic version is available + - **Single Codebase**: One set of code works across both Pydantic versions + - **Environment Agnostic**: Application code doesn't need to know which version is installed + - **Future-Ready**: Easy migration path to Pydantic V2 API when ready + - **Databricks Compatible**: Avoids conflicts with pre-installed libraries + +Future Migration: + When ready to migrate to native Pydantic V2 API: + 1. Update application code to use V2 patterns + 2. Modify this compat.py to import from native V2 locations + 3. Test in both environments + 4. 
Deploy incrementally +""" + +try: + # This will succeed on environments with Pydantic V2.x + # Pydantic V2 provides a v1 compatibility layer for backwards compatibility + from pydantic.v1 import BaseModel, Field, constr, root_validator, validator +except ImportError: + # This will be executed on environments with only Pydantic V1.x + # Import directly from pydantic since v1 subpackage doesn't exist + from pydantic import BaseModel, Field, constr, root_validator, validator # type: ignore[assignment,no-redef] + +__all__ = ["BaseModel", "Field", "constr", "root_validator", "validator"] diff --git a/dbldatagen/spec/generator_spec.py b/dbldatagen/spec/generator_spec.py new file mode 100644 index 00000000..d0a750db --- /dev/null +++ b/dbldatagen/spec/generator_spec.py @@ -0,0 +1,463 @@ +from __future__ import annotations + +import logging +from typing import Any, Literal, Union + +import pandas as pd +from IPython.display import HTML, display + +from dbldatagen.spec.column_spec import ColumnDefinition + +from .compat import BaseModel, validator + + +logger = logging.getLogger(__name__) + + +class UCSchemaTarget(BaseModel): + """Defines a Unity Catalog schema as the output destination for generated data. + + This class represents a Unity Catalog location (catalog.schema) where generated tables + will be written. Unity Catalog is Databricks' unified governance solution for data and AI. + + :param catalog: Unity Catalog catalog name where tables will be written + :param schema_: Unity Catalog schema (database) name within the catalog + :param output_format: Data format for table storage. Defaults to "delta" which is the + recommended format for Unity Catalog tables + + .. note:: + The schema parameter is named `schema_` (with underscore) to avoid conflict with + Python's built-in schema keyword and Pydantic functionality + + .. note:: + Tables will be written to the location: `{catalog}.{schema_}.{table_name}` + """ + catalog: str + schema_: str + output_format: str = "delta" # Default to delta for UC Schema + + @validator("catalog", "schema_") + def validate_identifiers(cls, v: str) -> str: + """Validates that catalog and schema names are valid identifiers. + + Ensures the identifier is non-empty and follows Python identifier conventions. + Issues a warning if the identifier is not a basic Python identifier, as this may + cause issues with Unity Catalog. + + :param v: The identifier string to validate (catalog or schema name) + :returns: The validated and stripped identifier string + :raises ValueError: If the identifier is empty or contains only whitespace + + .. note:: + This is a Pydantic field validator that runs automatically during model instantiation + """ + if not v.strip(): + raise ValueError("Identifier must be non-empty.") + if not v.isidentifier(): + logger.warning( + f"'{v}' is not a basic Python identifier. Ensure validity for Unity Catalog.") + return v.strip() + + def __str__(self) -> str: + """Returns a human-readable string representation of the Unity Catalog target. + + :returns: Formatted string showing catalog, schema, format and type + """ + return f"{self.catalog}.{self.schema_} (Format: {self.output_format}, Type: UC Table)" + + +class FilePathTarget(BaseModel): + """Defines a file system path as the output destination for generated data. + + This class represents a file system location where generated tables will be written + as files. Each table will be written to a subdirectory within the base path. 
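For orientation, a small sketch of constructing both target types (the base path value below is illustrative):

```python
from dbldatagen.spec.generator_spec import FilePathTarget, UCSchemaTarget

uc_target = UCSchemaTarget(catalog="main", schema_="synthetic_data")
print(uc_target)    # main.synthetic_data (Format: delta, Type: UC Table)

file_target = FilePathTarget(base_path="/tmp/datagen_out", output_format="parquet")
print(file_target)  # /tmp/datagen_out (Format: parquet, Type: File Path)

# Identifier validation runs at construction time.
try:
    UCSchemaTarget(catalog="", schema_="synthetic_data")
except ValueError as err:
    print(err)  # message includes "Identifier must be non-empty."
```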
+ + :param base_path: Base file system path where table data files will be written. + Each table will be written to {base_path}/{table_name}/ + :param output_format: File format for data storage. Must be either "csv" or "parquet". + No default value - must be explicitly specified + + .. note:: + Unlike UCSchemaTarget, this requires an explicit output_format with no default + + .. note:: + The base_path can be a local file system path, DBFS path, or cloud storage path + (e.g., s3://, gs://, abfs://) depending on your environment + """ + base_path: str + output_format: Literal["csv", "parquet"] # No default, must be specified + + @validator("base_path") + def validate_base_path(cls, v: str) -> str: + """Validates that the base path is non-empty. + + :param v: The base path string to validate + :returns: The validated and stripped base path string + :raises ValueError: If the base path is empty or contains only whitespace + + .. note:: + This is a Pydantic field validator that runs automatically during model instantiation + """ + if not v.strip(): + raise ValueError("base_path must be non-empty.") + return v.strip() + + def __str__(self) -> str: + """Returns a human-readable string representation of the file path target. + + :returns: Formatted string showing base path, format and type + """ + return f"{self.base_path} (Format: {self.output_format}, Type: File Path)" + + +class TableDefinition(BaseModel): + """Defines the complete specification for a single synthetic data table. + + This class encapsulates all the information needed to generate a table of synthetic data, + including the number of rows, partitioning, and column specifications. + + :param number_of_rows: Total number of data rows to generate for this table. + Must be a positive integer + :param partitions: Number of Spark partitions to use when generating data. + If None, defaults to Spark's default parallelism setting. + More partitions can improve generation speed for large datasets + :param columns: List of ColumnDefinition objects specifying the columns to generate + in this table. At least one column must be specified + + .. note:: + Setting an appropriate number of partitions can significantly impact generation performance. + As a rule of thumb, use 2-4 partitions per CPU core available in your Spark cluster + + .. note:: + Column order in the list determines the order of columns in the generated output + """ + number_of_rows: int + partitions: int | None = None + columns: list[ColumnDefinition] + + +class ValidationResult: + """Container for validation results that collects errors and warnings during spec validation. + + This class accumulates validation issues found while checking a DatagenSpec configuration. + It distinguishes between errors (which prevent data generation) and warnings (which + indicate potential issues but don't block generation). + + .. note:: + Validation passes if there are no errors, even if warnings are present + """ + + def __init__(self) -> None: + """Initialize an empty ValidationResult with no errors or warnings.""" + self.errors: list[str] = [] + self.warnings: list[str] = [] + + def add_error(self, message: str) -> None: + """Add an error message to the validation results. + + Errors indicate critical issues that will prevent successful data generation. + + :param message: Descriptive error message explaining the validation failure + """ + self.errors.append(message) + + def add_warning(self, message: str) -> None: + """Add a warning message to the validation results. 
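To make the error/warning distinction concrete, a minimal sketch of how a ValidationResult accumulates issues (the same behaviour the unit tests assert):

```python
from dbldatagen.spec.generator_spec import ValidationResult

result = ValidationResult()
result.add_warning("No output_destination specified.")
print(result.is_valid())   # True  - warnings alone do not fail validation

result.add_error("Table 'users' must have at least one column")
print(result.is_valid())   # False - any error fails validation

print(result)              # formatted report with error and warning counts
```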
+ + Warnings indicate potential issues or non-optimal configurations that may affect + data generation but won't prevent it from completing. + + :param message: Descriptive warning message explaining the potential issue + """ + self.warnings.append(message) + + def is_valid(self) -> bool: + """Check if validation passed without errors. + + :returns: True if there are no errors (warnings are allowed), False otherwise + """ + return len(self.errors) == 0 + + def __str__(self) -> str: + """Generate a formatted string representation of all validation results. + + :returns: Multi-line string containing formatted errors and warnings with counts + """ + lines = [] + if self.is_valid(): + lines.append("✓ Validation passed successfully") + else: + lines.append("✗ Validation failed") + + if self.errors: + lines.append(f"\nErrors ({len(self.errors)}):") + for i, error in enumerate(self.errors, 1): + lines.append(f" {i}. {error}") + + if self.warnings: + lines.append(f"\nWarnings ({len(self.warnings)}):") + for i, warning in enumerate(self.warnings, 1): + lines.append(f" {i}. {warning}") + + return "\n".join(lines) + +class DatagenSpec(BaseModel): + """Top-level specification for synthetic data generation across one or more tables. + + This is the main configuration class for the dbldatagen spec-based API. It defines all tables + to be generated, where the output should be written, and global generation options. + + :param tables: Dictionary mapping table names to their TableDefinition specifications. + Keys are the table names that will be used in the output destination + :param output_destination: Target location for generated data. Can be either a + UCSchemaTarget (Unity Catalog) or FilePathTarget (file system). + If None, data will be generated but not persisted + :param generator_options: Dictionary of global options affecting data generation behavior. + Common options include: + - random: Enable random data generation + - randomSeed: Seed for reproducible random generation + - randomSeedMethod: Method for computing random seeds + - verbose: Enable verbose logging + - debug: Enable debug logging + - seedColumnName: Name of internal seed column + :param intended_for_databricks: Flag indicating if this spec is designed for Databricks. + May be automatically inferred based on configuration + + .. note:: + Call the validate() method before using this spec to ensure configuration is correct + + .. note:: + Multiple tables can share the same DatagenSpec and will be generated in the order + they appear in the tables dictionary + """ + tables: dict[str, TableDefinition] + output_destination: Union[UCSchemaTarget, FilePathTarget] | None = None # there is a abstraction, may be we can use that? talk to Greg + generator_options: dict[str, Any] | None = None + intended_for_databricks: bool | None = None # May be infered. + + def _check_circular_dependencies( + self, + table_name: str, + columns: list[ColumnDefinition] + ) -> list[str]: + """Check for circular dependencies in baseColumn references within a table. + + Analyzes column dependencies to detect cycles where columns reference each other + in a circular manner (e.g., col A depends on col B, col B depends on col A). + Such circular dependencies would make data generation impossible. + + :param table_name: Name of the table being validated (used in error messages) + :param columns: List of ColumnDefinition objects to check for circular dependencies + :returns: List of error message strings describing any circular dependencies found. 
+ Empty list if no circular dependencies exist + + .. note:: + This method performs a graph traversal to detect cycles in the dependency chain + """ + errors = [] + column_map = {col.name: col for col in columns} + + for col in columns: + if col.baseColumn and col.baseColumn != "id": + # Track the dependency chain + visited: set[str] = set() + current = col.name + + while current: + if current in visited: + # Found a cycle + cycle_path = " -> ".join([*list(visited), current]) + errors.append( + f"Table '{table_name}': Circular dependency detected in column '{col.name}': {cycle_path}" + ) + break + + visited.add(current) + current_col = column_map.get(current) + + if not current_col: + break + + # Move to the next column in the chain + if current_col.baseColumn and current_col.baseColumn != "id": + if current_col.baseColumn not in column_map: + # baseColumn doesn't exist - we'll catch this in another validation + break + current = current_col.baseColumn + else: + # Reached a column that doesn't have a baseColumn or uses "id" + break + + return errors + + def validate(self, strict: bool = True) -> ValidationResult: # type: ignore[override] + """Validate the entire DatagenSpec configuration comprehensively. + + This method performs extensive validation of the entire spec, including: + - Ensuring at least one table is defined + - Validating each table has columns and positive row counts + - Checking for duplicate column names within tables + - Verifying baseColumn references point to existing columns + - Detecting circular dependencies in baseColumn chains + - Validating primary key constraints + - Checking output destination configuration + - Validating generator options + + All validation checks are performed regardless of whether errors are found, allowing + you to see all issues at once rather than fixing them one at a time. + + :param strict: Controls validation failure behavior: + - If True: Raises ValueError for any errors OR warnings found + - If False: Only raises ValueError for errors (warnings are tolerated) + :returns: ValidationResult object containing all collected errors and warnings, + even if an exception is raised + :raises ValueError: If validation fails based on strict mode setting. + The exception message contains the formatted ValidationResult + + .. note:: + It's recommended to call validate() before attempting to generate data to catch + configuration issues early + + .. note:: + Use strict=False during development to see warnings without blocking generation + """ + result = ValidationResult() + + # 1. Check that there's at least one table + if not self.tables: + result.add_error("Spec must contain at least one table definition") + + # 2. Validate each table (continue checking all tables even if errors found) + for table_name, table_def in self.tables.items(): + # Check table has at least one column + if not table_def.columns: + result.add_error(f"Table '{table_name}' must have at least one column") + continue # Skip further checks for this table since it has no columns + + # Check row count is positive + if table_def.number_of_rows <= 0: + result.add_error( + f"Table '{table_name}' has invalid number_of_rows: {table_def.number_of_rows}. " + "Must be a positive integer." + ) + + # Check partitions if specified + #TODO: though this can be a model field check, we are checking here so that one can correct + # Can we find a way to use the default way? 
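To show how these per-table checks surface to callers, a short sketch mirroring tests/test_specs.py: validate() keeps collecting issues across all tables and columns and only raises once, so every problem is reported together:

```python
from dbldatagen.spec.generator_spec import ColumnDefinition, DatagenSpec, TableDefinition

spec = DatagenSpec(
    tables={
        "users": TableDefinition(
            number_of_rows=-10,          # invalid row count
            partitions=0,                # invalid partition count
            columns=[
                ColumnDefinition(name="id", type="int", primary=True),
                ColumnDefinition(name="id", type="string"),          # duplicate name
                ColumnDefinition(name="email", baseColumn="phone"),  # missing baseColumn
            ],
        )
    }
)

try:
    spec.validate()  # strict=True by default; use strict=False to tolerate warnings
except ValueError as err:
    print(err)  # all four errors appear in one report
```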
+ if table_def.partitions is not None and table_def.partitions <= 0: + result.add_error( + f"Table '{table_name}' has invalid partitions: {table_def.partitions}. " + "Must be a positive integer or None." + ) + + # Check for duplicate column names + # TODO: Not something possible if we right model, recheck + column_names = [col.name for col in table_def.columns] + duplicates = [name for name in set(column_names) if column_names.count(name) > 1] + if duplicates: + result.add_error( + f"Table '{table_name}' has duplicate column names: {', '.join(duplicates)}" + ) + + # Build column map for reference checking + column_map = {col.name: col for col in table_def.columns} + + # TODO: Check baseColumn references, this is tricky? check the dbldefaults + for col in table_def.columns: + if col.baseColumn and col.baseColumn != "id": + if col.baseColumn not in column_map: + result.add_error( + f"Table '{table_name}', column '{col.name}': " + f"baseColumn '{col.baseColumn}' does not exist in the table" + ) + + # Check for circular dependencies in baseColumn references + circular_errors = self._check_circular_dependencies(table_name, table_def.columns) + for error in circular_errors: + result.add_error(error) + + # Check primary key constraints + primary_columns = [col for col in table_def.columns if col.primary] + if len(primary_columns) > 1: + primary_names = [col.name for col in primary_columns] + result.add_warning( + f"Table '{table_name}' has multiple primary columns: {', '.join(primary_names)}. " + "This may not be the intended behavior." + ) + + # Check for columns with no type and not using baseColumn properly + for col in table_def.columns: + if not col.primary and not col.type and not col.options: + result.add_warning( + f"Table '{table_name}', column '{col.name}': " + "No type specified and no options provided. " + "Column may not generate data as expected." + ) + + # 3. Check output destination + if not self.output_destination: + result.add_warning( + "No output_destination specified. Data will be generated but not persisted. " + "Set output_destination to save generated data." + ) + + # 4. Validate generator options (if any known options) + if self.generator_options: + known_options = [ + "random", "randomSeed", "randomSeedMethod", "verbose", + "debug", "seedColumnName" + ] + for key in self.generator_options: + if key not in known_options: + result.add_warning( + f"Unknown generator option: '{key}'. " + "This may be ignored during generation." + ) + + # Now that all validations are complete, decide whether to raise + if (strict and (result.errors or result.warnings)) or (not strict and result.errors): + raise ValueError(str(result)) + + return result + + + def display_all_tables(self) -> None: + """Display a formatted view of all table definitions in the spec. + + This method provides a user-friendly visualization of the spec configuration, showing + each table's structure and the output destination. It's designed for use in Jupyter + notebooks and will render HTML output when available. + + For each table, displays: + - Table name + - Output destination (or warning if not configured) + - DataFrame showing all columns with their properties + + .. note:: + This method uses IPython.display.HTML when available, falling back to plain text + output in non-notebook environments + + .. 
note:: + This is intended for interactive exploration and debugging of spec configurations + """ + for table_name, table_def in self.tables.items(): + print(f"Table: {table_name}") + + if self.output_destination: + output = f"{self.output_destination}" + display(HTML(f"Output destination: {output}")) + else: + message = ( + "Output destination: " + "None
" + "Set it using the output_destination " + "attribute on your DatagenSpec object " + "(e.g., my_spec.output_destination = UCSchemaTarget(...))." + ) + display(HTML(message)) + + df = pd.DataFrame([col.dict() for col in table_def.columns]) + try: + display(df) + except NameError: + print(df.to_string()) diff --git a/dbldatagen/spec/generator_spec_impl.py b/dbldatagen/spec/generator_spec_impl.py new file mode 100644 index 00000000..fc53863e --- /dev/null +++ b/dbldatagen/spec/generator_spec_impl.py @@ -0,0 +1,347 @@ +import logging +import posixpath +from typing import Any, Union + +from pyspark.sql import SparkSession + +import dbldatagen as dg +from dbldatagen.spec.generator_spec import TableDefinition + +from .generator_spec import ColumnDefinition, DatagenSpec, FilePathTarget, UCSchemaTarget + + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", +) +logger = logging.getLogger(__name__) + +INTERNAL_ID_COLUMN_NAME = "id" + + +class Generator: + """Main orchestrator for generating synthetic data from DatagenSpec configurations. + + This class provides the primary interface for the spec-based data generation API. It handles + the complete lifecycle of data generation: + 1. Converting spec configurations into dbldatagen DataGenerator objects + 2. Building the actual data as Spark DataFrames + 3. Writing the data to specified output destinations (Unity Catalog or file system) + + The Generator encapsulates all the complexity of translating declarative specs into + executable data generation plans, allowing users to focus on what data they want rather + than how to generate it. + + :param spark: Active SparkSession to use for data generation + :param app_name: Application name used in logging and tracking. Defaults to "DataGen_ClassBased" + + .. note:: + The Generator requires an active SparkSession. On Databricks, you can use the pre-configured + `spark` variable. For local development, create a SparkSession first + + .. note:: + The same Generator instance can be reused to generate multiple different specs + """ + + def __init__(self, spark: SparkSession, app_name: str = "DataGen_ClassBased") -> None: + """Initialize the Generator with a SparkSession. + + :param spark: An active SparkSession instance to use for data generation operations + :param app_name: Application name for logging and identification purposes + :raises RuntimeError: If spark is None or not properly initialized + """ + if not spark: + logger.error( + "SparkSession cannot be None during Generator initialization") + raise RuntimeError("SparkSession cannot be None") + self.spark = spark + self._created_spark_session = False + self.app_name = app_name + logger.info("Generator initialized with SparkSession") + + def _columnspec_to_datagen_columnspec(self, col_def: ColumnDefinition) -> dict[str, Any]: + """Convert a ColumnDefinition spec into dbldatagen DataGenerator column arguments. + + This internal method translates the declarative ColumnDefinition format into the + keyword arguments expected by dbldatagen's withColumn() method. It handles special + cases like primary keys, nullable columns, and omitted columns. 
+ + Primary key columns receive special treatment: + - Automatically use the internal ID column as their base + - String primary keys use hash-based generation + - Numeric primary keys maintain sequential values + + :param col_def: ColumnDefinition object from a DatagenSpec + :returns: Dictionary of keyword arguments suitable for DataGenerator.withColumn() + + .. note:: + This is an internal method not intended for direct use by end users + + .. note:: + Conflicting options for primary keys (like min/max, values, expr) will generate + warnings but won't prevent generation - the primary key behavior takes precedence + """ + col_name = col_def.name + col_type = col_def.type + kwargs = col_def.options.copy() if col_def.options is not None else {} + + if col_def.primary: + kwargs["colType"] = col_type + kwargs["baseColumn"] = INTERNAL_ID_COLUMN_NAME + + if col_type == "string": + kwargs["baseColumnType"] = "hash" + elif col_type not in ["int", "long", "integer", "bigint", "short"]: + kwargs["baseColumnType"] = "auto" + logger.warning( + f"Primary key '{col_name}' has non-standard type '{col_type}'") + + # Log conflicting options for primary keys + conflicting_opts_for_pk = [ + "distribution", "template", "dataRange", "random", "omit", + "min", "max", "uniqueValues", "values", "expr" + ] + + for opt_key in conflicting_opts_for_pk: + if opt_key in kwargs: + logger.warning( + f"Primary key '{col_name}': Option '{opt_key}' may be ignored") + + if col_def.omit is not None and col_def.omit: + kwargs["omit"] = True + else: + kwargs = col_def.options.copy() if col_def.options is not None else {} + + if col_type: + kwargs["colType"] = col_type + if col_def.baseColumn: + kwargs["baseColumn"] = col_def.baseColumn + if col_def.baseColumnType: + kwargs["baseColumnType"] = col_def.baseColumnType + if col_def.omit is not None: + kwargs["omit"] = col_def.omit + + return kwargs + + def _prepare_data_generators( + self, + config: DatagenSpec, + config_source_name: str = "PydanticConfig" + ) -> dict[str, dg.DataGenerator]: + """Prepare DataGenerator objects for all tables defined in the spec. + + This internal method is the first phase of data generation. It processes the DatagenSpec + and creates configured dbldatagen.DataGenerator objects for each table, but does not + yet build the actual data. Each table's definition is converted into a DataGenerator + with all its columns configured. + + The method: + 1. Iterates through all tables in the spec + 2. Creates a DataGenerator for each table with appropriate row count and partitioning + 3. Adds all columns to each DataGenerator using withColumn() + 4. Applies global generator options + 5. Returns the prepared generators ready for building + + :param config: DatagenSpec containing table definitions and configuration + :param config_source_name: Descriptive name for the config source, used in logging + and DataGenerator naming + :returns: Dictionary mapping table names to their prepared DataGenerator instances + :raises RuntimeError: If SparkSession is not available or if any table preparation fails + :raises ValueError: If table configuration is invalid (should be caught by validate() first) + + .. note:: + This is an internal method. Use generate_and_write_data() for the complete workflow + + .. 
note:: + Preparation is separate from building to allow inspection and modification of + DataGenerators before data generation begins + """ + logger.info( + f"Preparing data generators for {len(config.tables)} tables") + + if not self.spark: + logger.error( + "SparkSession is not available. Cannot prepare data generators") + raise RuntimeError( + "SparkSession is not available. Cannot prepare data generators") + + tables_config: dict[str, TableDefinition] = config.tables + global_gen_options = config.generator_options if config.generator_options else {} + + prepared_generators: dict[str, dg.DataGenerator] = {} + generation_order = list(tables_config.keys()) # This becomes impotant when we get into multitable + + for table_name in generation_order: + table_spec = tables_config[table_name] + logger.info(f"Preparing table: {table_name}") + + try: + # Create DataGenerator instance + data_gen = dg.DataGenerator( + sparkSession=self.spark, + name=f"{table_name}_spec_from_{config_source_name}", + rows=table_spec.number_of_rows, + partitions=table_spec.partitions, + **global_gen_options, + ) + + # Process each column + for col_def in table_spec.columns: + kwargs = self._columnspec_to_datagen_columnspec(col_def) + data_gen = data_gen.withColumn(colName=col_def.name, **kwargs) + # Has performance implications. + + prepared_generators[table_name] = data_gen + logger.info(f"Successfully prepared table: {table_name}") + + except Exception as e: + logger.error(f"Failed to prepare table '{table_name}': {e}") + raise RuntimeError( + f"Failed to prepare table '{table_name}': {e}") from e + + logger.info("All data generators prepared successfully") + return prepared_generators + + def write_prepared_data( + self, + prepared_generators: dict[str, dg.DataGenerator], + output_destination: Union[UCSchemaTarget, FilePathTarget, None], + config_source_name: str = "PydanticConfig", + ) -> None: + """Build and write data from prepared generators to the specified output destination. + + This method handles the second phase of data generation: taking prepared DataGenerator + objects, building them into actual Spark DataFrames, and writing the results to the + configured output location. + + The method: + 1. Iterates through all prepared generators + 2. Builds each generator into a DataFrame using build() + 3. Writes the DataFrame to the appropriate destination: + - For FilePathTarget: Writes to {base_path}/{table_name}/ in specified format + - For UCSchemaTarget: Writes to {catalog}.{schema}.{table_name} as managed table + 4. Logs row counts and write locations + + :param prepared_generators: Dictionary mapping table names to DataGenerator objects + (typically from _prepare_data_generators()) + :param output_destination: Target location for output. Can be UCSchemaTarget, + FilePathTarget, or None (no write, data generated only) + :param config_source_name: Descriptive name for the config source, used in logging + :raises RuntimeError: If DataFrame building or writing fails for any table + :raises ValueError: If output destination type is not recognized + + .. note:: + If output_destination is None, data is generated but not persisted anywhere. + This can be useful for testing or when you want to process the data in-memory + + .. 
note:: + Writing uses "overwrite" mode, so existing tables/files will be replaced + """ + logger.info("Starting data writing phase") + + if not prepared_generators: + logger.warning("No prepared data generators to write") + return + + for table_name, data_gen in prepared_generators.items(): + logger.info(f"Writing table: {table_name}") + + try: + df = data_gen.build() + requested_rows = data_gen.rowCount + actual_row_count = df.count() + logger.info( + f"Built DataFrame for '{table_name}': {actual_row_count} rows (requested: {requested_rows})") + + if actual_row_count == 0 and requested_rows is not None and requested_rows > 0: + logger.warning(f"Table '{table_name}': Requested {requested_rows} rows but built 0") + + # Write data based on destination type + if isinstance(output_destination, FilePathTarget): + output_path = posixpath.join(output_destination.base_path, table_name) + df.write.format(output_destination.output_format).mode("overwrite").save(output_path) + logger.info(f"Wrote table '{table_name}' to file path: {output_path}") + + elif isinstance(output_destination, UCSchemaTarget): + output_table = f"{output_destination.catalog}.{output_destination.schema_}.{table_name}" + df.write.mode("overwrite").saveAsTable(output_table) + logger.info(f"Wrote table '{table_name}' to Unity Catalog: {output_table}") + else: + logger.warning("No output destination specified, skipping data write") + return + except Exception as e: + logger.error(f"Failed to write table '{table_name}': {e}") + raise RuntimeError(f"Failed to write table '{table_name}': {e}") from e + logger.info("All data writes completed successfully") + + def generate_and_write_data( + self, + config: DatagenSpec, + config_source_name: str = "PydanticConfig" + ) -> None: + """Execute the complete data generation workflow from spec to output. + + This is the primary high-level method for generating data from a DatagenSpec. It + orchestrates the entire process in one call, handling both preparation and writing phases. + + The complete workflow: + 1. Validates that the config is properly structured (you should call config.validate() first) + 2. Converts the spec into DataGenerator objects for each table + 3. Builds the DataFrames by executing the generation logic + 4. Writes the results to the configured output destination + 5. Logs progress and completion status + + This method is the recommended entry point for most use cases. For more control over + the generation process, use _prepare_data_generators() and write_prepared_data() separately. + + :param config: DatagenSpec object defining tables, columns, and output destination. + Should be validated with config.validate() before calling this method + :param config_source_name: Descriptive name for the config source, used in logging + and naming DataGenerator instances + :raises RuntimeError: If SparkSession is unavailable, or if preparation or writing fails + :raises ValueError: If the config is invalid (though config.validate() should catch this first) + + .. note:: + It's strongly recommended to call config.validate() before this method to catch + configuration errors early with better error messages + + .. note:: + Generation is performed sequentially: table1 is fully generated and written before + table2 begins. For multi-table generation with dependencies, the order matters + + Example: + >>> spec = DatagenSpec( + ... tables={"users": user_table_def}, + ... output_destination=UCSchemaTarget(catalog="main", schema_="test") + ... 
) + >>> spec.validate() # Check for errors first + >>> generator = Generator(spark) + >>> generator.generate_and_write_data(spec) + """ + logger.info(f"Starting combined data generation and writing for {len(config.tables)} tables") + + try: + # Phase 1: Prepare data generators + prepared_generators_map = self._prepare_data_generators(config, config_source_name) + + if not prepared_generators_map and list(config.tables.keys()): + logger.warning( + "No data generators were successfully prepared, though tables were defined") + return + + # Phase 2: Write data + self.write_prepared_data( + prepared_generators_map, + config.output_destination, + config_source_name + ) + + logger.info( + "Combined data generation and writing completed successfully") + + except Exception as e: + logger.error( + f"Error during combined data generation and writing: {e}") + raise RuntimeError( + f"Error during combined data generation and writing: {e}") from e diff --git a/makefile b/makefile index 772397bf..a551f964 100644 --- a/makefile +++ b/makefile @@ -3,24 +3,21 @@ all: clean dev lint fmt test clean: - rm -fr .venv clean htmlcov .mypy_cache .pytest_cache .ruff_cache .coverage coverage.xml + rm -fr clean htmlcov .mypy_cache .pytest_cache .ruff_cache .coverage coverage.xml rm -fr **/*.pyc -.venv/bin/python: - pip install hatch - hatch env create - -dev: .venv/bin/python +dev: + @which hatch > /dev/null || pip install hatch @hatch run which python lint: - hatch run verify + hatch run test-pydantic.2.8.2:verify fmt: - hatch run fmt + hatch run test-pydantic.2.8.2:fmt test: - hatch run test + hatch run test-pydantic:test test-coverage: make test && open htmlcov/index.html diff --git a/pydantic_compat.md b/pydantic_compat.md new file mode 100644 index 00000000..abf26e60 --- /dev/null +++ b/pydantic_compat.md @@ -0,0 +1,101 @@ +To write code that works on both Pydantic V1 and V2 and ensures a smooth future migration, you should code against the V1 API but import it through a compatibility shim. This approach uses V1's syntax, which Pydantic V2 can understand via its built-in V1 compatibility layer. + +----- + +### \#\# The Golden Rule: Code to V1, Import via a Shim 💡 + +The core strategy is to **write all your models using Pydantic V1 syntax and features**. You then use a special utility file to handle the imports, which makes your application code completely agnostic to the installed Pydantic version. + +----- + +### \#\# 1. Implement a Compatibility Shim (`compat.py`) + +This is the most critical step. Create a file named `compat.py` in your project that intelligently imports Pydantic components. Your application will import everything from this file instead of directly from `pydantic`. + +```python +# compat.py +# This module acts as a compatibility layer for Pydantic V1 and V2. + +try: + # This will succeed on environments with Pydantic V2.x + # It imports the V1 API that is bundled within V2. + from pydantic.v1 import BaseModel, Field, validator, constr + +except ImportError: + # This will be executed on environments with only Pydantic V1.x + from pydantic import BaseModel, Field, validator, constr + +# In your application code, do this: +# from .compat import BaseModel +# NOT this: +# from pydantic import BaseModel +``` + +----- + +### \#\# 2. Stick to V1 Features and Syntax (Do's and Don'ts) + +By following these rules in your application code, you ensure the logic works on both versions. + +#### **✅ Models and Fields: DO** + + * Use standard `BaseModel` and `Field` for all your data structures. 
This is the most stable part of the API. + +#### **❌ Models and Fields: DON'T** + + * **Do not use `__root__` models**. This V1 feature was removed in V2 and the compatibility is not perfect. Instead, model the data explicitly, even if it feels redundant. + * **Bad (Avoid):** `class MyList(BaseModel): __root__: list[str]` + * **Good (Compatible):** `class MyList(BaseModel): items: list[str]` + +#### **✅ Configuration: DO** + + * Use the nested `class Config:` for model configuration. This is the V1 way and is fully supported by the V2 compatibility layer. + * **Example:** + ```python + from .compat import BaseModel + + class User(BaseModel): + id: int + full_name: str + + class Config: + orm_mode = True # V2's compatibility layer translates this + allow_population_by_field_name = True + ``` + +#### **❌ Configuration: DON'T** + + * **Do not use the V2 `model_config` dictionary**. This is a V2-only feature. + +#### **✅ Validators and Data Types: DO** + + * Use the standard V1 `@validator`. It's robust and works perfectly across both versions. + * Use V1 constrained types like `constr`, `conint`, `conlist`. + * **Example:** + ```python + from .compat import BaseModel, validator, constr + + class Product(BaseModel): + name: constr(min_length=3) + + @validator("name") + def name_must_be_alpha(cls, v): + if not v.isalpha(): + raise ValueError("Name must be alphabetic") + return v + ``` + +#### **❌ Validators and Data Types: DON'T** + + * **Do not use V2 decorators** like `@field_validator`, `@model_validator`, or `@field_serializer`. + * **Do not use the V2 `Annotated` syntax** for validation (e.g., `Annotated[str, StringConstraints(min_length=2)]`). + +----- + +### \#\# 3. The Easy Migration Path + +When you're finally ready to leave V1 behind and upgrade your code to be V2-native, the process will be straightforward because your code is already consistent: + +1. **Change Imports**: Your first step will be a simple find-and-replace to change all `from .compat import ...` statements to `from pydantic import ...`. +2. **Run a Codelinter**: Tools like **Ruff** have built-in rules that can automatically refactor most of your V1 syntax (like `Config` classes and `@validator`s) to the new V2 syntax. +3. **Manual Refinements**: Address any complex patterns the automated tools couldn't handle, like replacing your `__root__` model alternatives. \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index 13728ba2..99be0820 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -103,21 +103,35 @@ dependencies = [ "jmespath>=0.10.0", "py4j>=0.10.9", "pickleshare>=0.7.5", + "ipython>=7.32.0", ] python="3.10" - -# store virtual env as the child of this folder. Helps VSCode (and PyCharm) to run better path = ".venv" [tool.hatch.envs.default.scripts] test = "pytest tests/ -n 10 --cov --cov-report=html --timeout 600 --durations 20" -fmt = ["ruff check . --fix", - "mypy .", - "pylint --output-format=colorized -j 0 dbldatagen tests"] -verify = ["ruff check .", - "mypy .", - "pylint --output-format=colorized -j 0 dbldatagen tests"] +fmt = [ + "ruff check . --fix", + "mypy .", + "pylint --output-format=colorized -j 0 dbldatagen tests" +] +verify = [ + "ruff check .", + "mypy .", + "pylint --output-format=colorized -j 0 dbldatagen tests" +] + + +[tool.hatch.envs.test-pydantic] +template = "default" +matrix = [ + { pydantic_version = ["1.10.6", "2.8.2"] } +] +extra-dependencies = [ + "pydantic=={matrix:pydantic_version}" +] + # Ruff configuration - replaces flake8, isort, pydocstyle, etc. 
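A tiny, hypothetical sanity check that could be run in either matrix cell to confirm which Pydantic the compat shim resolved against (pydantic.VERSION exists in both 1.x and 2.x):

```python
import pydantic

from dbldatagen.spec.compat import BaseModel

print(pydantic.VERSION)      # "1.10.6" or "2.8.2", depending on the matrix cell
print(BaseModel.__module__)  # typically pydantic.main on V1, pydantic.v1.main on V2
```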
[tool.ruff] @@ -418,7 +432,7 @@ check_untyped_defs = true disallow_untyped_decorators = false no_implicit_optional = true warn_redundant_casts = true -warn_unused_ignores = true +warn_unused_ignores = false warn_no_return = true warn_unreachable = true strict_equality = true diff --git a/scratch.md b/scratch.md new file mode 100644 index 00000000..a3afa5c3 --- /dev/null +++ b/scratch.md @@ -0,0 +1,4 @@ +Pydantic Notes +https://docs.databricks.com/aws/en/release-notes/runtime/14.3lts - 1.10.6 +https://docs.databricks.com/aws/en/release-notes/runtime/15.4lts - 1.10.6 +https://docs.databricks.com/aws/en/release-notes/runtime/16.4lts - 2.8.2 (2.20.1 - core) \ No newline at end of file diff --git a/tests/test_specs.py b/tests/test_specs.py new file mode 100644 index 00000000..d3c8ab2c --- /dev/null +++ b/tests/test_specs.py @@ -0,0 +1,466 @@ +from dbldatagen.spec.generator_spec import DatagenSpec +import pytest +from dbldatagen.spec.generator_spec import ( + DatagenSpec, + TableDefinition, + ColumnDefinition, + UCSchemaTarget, + FilePathTarget, + ValidationResult +) + +class TestValidationResult: + """Tests for ValidationResult class""" + + def test_empty_result_is_valid(self): + result = ValidationResult() + assert result.is_valid() + assert len(result.errors) == 0 + assert len(result.warnings) == 0 + + def test_result_with_errors_is_invalid(self): + result = ValidationResult() + result.add_error("Test error") + assert not result.is_valid() + assert len(result.errors) == 1 + + def test_result_with_only_warnings_is_valid(self): + result = ValidationResult() + result.add_warning("Test warning") + assert result.is_valid() + assert len(result.warnings) == 1 + + def test_result_string_representation(self): + result = ValidationResult() + result.add_error("Error 1") + result.add_error("Error 2") + result.add_warning("Warning 1") + + result_str = str(result) + assert "✗ Validation failed" in result_str + assert "Errors (2)" in result_str + assert "Error 1" in result_str + assert "Error 2" in result_str + assert "Warnings (1)" in result_str + assert "Warning 1" in result_str + + def test_valid_result_string_representation(self): + result = ValidationResult() + result_str = str(result) + assert "✓ Validation passed successfully" in result_str + + +class TestColumnDefinitionValidation: + """Tests for ColumnDefinition validation""" + + def test_valid_primary_column(self): + col = ColumnDefinition( + name="id", + type="int", + primary=True + ) + assert col.primary + assert col.type == "int" + + def test_primary_column_with_min_max_raises_error(self): + with pytest.raises(ValueError, match="cannot have min/max options"): + ColumnDefinition( + name="id", + type="int", + primary=True, + options={"min": 1, "max": 100} + ) + + def test_primary_column_nullable_raises_error(self): + with pytest.raises(ValueError, match="cannot be nullable"): + ColumnDefinition( + name="id", + type="int", + primary=True, + nullable=True + ) + + def test_primary_column_without_type_raises_error(self): + with pytest.raises(ValueError, match="must have a type defined"): + ColumnDefinition( + name="id", + primary=True + ) + + def test_non_primary_column_without_type(self): + # Should not raise + col = ColumnDefinition( + name="data", + options={"values": ["a", "b", "c"]} + ) + assert col.name == "data" + + +class TestDatagenSpecValidation: + """Tests for DatagenSpec.validate() method""" + + def test_valid_spec_passes_validation(self): + spec = DatagenSpec( + tables={ + "users": TableDefinition( + number_of_rows=100, + columns=[ + 
ColumnDefinition(name="id", type="int", primary=True), + ColumnDefinition(name="name", type="string", options={"values": ["Alice", "Bob"]}), + ] + ) + }, + output_destination=UCSchemaTarget(catalog="main", schema_="default") + ) + + result = spec.validate(strict=False) + assert result.is_valid() + assert len(result.errors) == 0 + + def test_empty_tables_raises_error(self): + spec = DatagenSpec(tables={}) + + with pytest.raises(ValueError, match="at least one table"): + spec.validate(strict=True) + + def test_table_without_columns_raises_error(self): + spec = DatagenSpec( + tables={ + "empty_table": TableDefinition( + number_of_rows=100, + columns=[] + ) + } + ) + + with pytest.raises(ValueError, match="must have at least one column"): + spec.validate() + + def test_negative_row_count_raises_error(self): + spec = DatagenSpec( + tables={ + "users": TableDefinition( + number_of_rows=-10, + columns=[ColumnDefinition(name="id", type="int", primary=True)] + ) + } + ) + + with pytest.raises(ValueError, match="invalid number_of_rows"): + spec.validate() + + def test_zero_row_count_raises_error(self): + spec = DatagenSpec( + tables={ + "users": TableDefinition( + number_of_rows=0, + columns=[ColumnDefinition(name="id", type="int", primary=True)] + ) + } + ) + + with pytest.raises(ValueError, match="invalid number_of_rows"): + spec.validate() + + def test_invalid_partitions_raises_error(self): + spec = DatagenSpec( + tables={ + "users": TableDefinition( + number_of_rows=100, + partitions=-5, + columns=[ColumnDefinition(name="id", type="int", primary=True)] + ) + } + ) + + with pytest.raises(ValueError, match="invalid partitions"): + spec.validate() + + def test_duplicate_column_names_raises_error(self): + spec = DatagenSpec( + tables={ + "users": TableDefinition( + number_of_rows=100, + columns=[ + ColumnDefinition(name="id", type="int", primary=True), + ColumnDefinition(name="duplicate", type="string"), + ColumnDefinition(name="duplicate", type="int"), + ] + ) + } + ) + + with pytest.raises(ValueError, match="duplicate column names"): + spec.validate() + + def test_invalid_base_column_reference_raises_error(self): + spec = DatagenSpec( + tables={ + "users": TableDefinition( + number_of_rows=100, + columns=[ + ColumnDefinition(name="id", type="int", primary=True), + ColumnDefinition(name="email", type="string", baseColumn="nonexistent"), + ] + ) + } + ) + + with pytest.raises(ValueError, match="does not exist"): + spec.validate() + + def test_circular_dependency_raises_error(self): + spec = DatagenSpec( + tables={ + "users": TableDefinition( + number_of_rows=100, + columns=[ + ColumnDefinition(name="id", type="int", primary=True), + ColumnDefinition(name="col_a", type="string", baseColumn="col_b"), + ColumnDefinition(name="col_b", type="string", baseColumn="col_c"), + ColumnDefinition(name="col_c", type="string", baseColumn="col_a"), + ] + ) + } + ) + + with pytest.raises(ValueError, match="Circular dependency"): + spec.validate() + + def test_multiple_primary_columns_warning(self): + spec = DatagenSpec( + tables={ + "users": TableDefinition( + number_of_rows=100, + columns=[ + ColumnDefinition(name="id1", type="int", primary=True), + ColumnDefinition(name="id2", type="int", primary=True), + ] + ) + } + ) + + # In strict mode, warnings cause errors + with pytest.raises(ValueError, match="multiple primary columns"): + spec.validate(strict=True) + + # In non-strict mode, should pass but have warnings + result = spec.validate(strict=False) + assert result.is_valid() + assert len(result.warnings) > 0 + 
assert any("multiple primary columns" in w for w in result.warnings) + + def test_column_without_type_or_options_warning(self): + spec = DatagenSpec( + tables={ + "users": TableDefinition( + number_of_rows=100, + columns=[ + ColumnDefinition(name="id", type="int", primary=True), + ColumnDefinition(name="empty_col"), + ] + ) + } + ) + + result = spec.validate(strict=False) + assert result.is_valid() + assert len(result.warnings) > 0 + assert any("No type specified" in w for w in result.warnings) + + def test_no_output_destination_warning(self): + spec = DatagenSpec( + tables={ + "users": TableDefinition( + number_of_rows=100, + columns=[ColumnDefinition(name="id", type="int", primary=True)] + ) + } + ) + + result = spec.validate(strict=False) + assert result.is_valid() + assert len(result.warnings) > 0 + assert any("No output_destination" in w for w in result.warnings) + + def test_unknown_generator_option_warning(self): + spec = DatagenSpec( + tables={ + "users": TableDefinition( + number_of_rows=100, + columns=[ColumnDefinition(name="id", type="int", primary=True)] + ) + }, + generator_options={"unknown_option": "value"} + ) + + result = spec.validate(strict=False) + assert result.is_valid() + assert len(result.warnings) > 0 + assert any("Unknown generator option" in w for w in result.warnings) + + def test_multiple_errors_collected(self): + """Test that all errors are collected before raising""" + spec = DatagenSpec( + tables={ + "users": TableDefinition( + number_of_rows=-10, # Error 1 + partitions=0, # Error 2 + columns=[ + ColumnDefinition(name="id", type="int", primary=True), + ColumnDefinition(name="id", type="string"), # Error 3: duplicate + ColumnDefinition(name="email", baseColumn="phone"), # Error 4: nonexistent + ] + ) + } + ) + + with pytest.raises(ValueError) as exc_info: + spec.validate() + + error_msg = str(exc_info.value) + # Should contain all errors + assert "invalid number_of_rows" in error_msg + assert "invalid partitions" in error_msg + assert "duplicate column names" in error_msg + assert "does not exist" in error_msg + + def test_strict_mode_raises_on_warnings(self): + spec = DatagenSpec( + tables={ + "users": TableDefinition( + number_of_rows=100, + columns=[ColumnDefinition(name="id", type="int", primary=True)] + ) + } + # No output_destination - will generate warning + ) + + # Strict mode should raise + with pytest.raises(ValueError): + spec.validate(strict=True) + + # Non-strict mode should pass + result = spec.validate(strict=False) + assert result.is_valid() + + def test_valid_base_column_chain(self): + """Test that valid baseColumn chains work""" + spec = DatagenSpec( + tables={ + "users": TableDefinition( + number_of_rows=100, + columns=[ + ColumnDefinition(name="id", type="int", primary=True), + ColumnDefinition(name="code", type="string", baseColumn="id"), + ColumnDefinition(name="hash", type="string", baseColumn="code"), + ] + ) + }, + output_destination=FilePathTarget(base_path="/tmp/data", output_format="parquet") + ) + + result = spec.validate(strict=False) + assert result.is_valid() + + def test_multiple_tables_validation(self): + """Test validation across multiple tables""" + spec = DatagenSpec( + tables={ + "users": TableDefinition( + number_of_rows=100, + columns=[ColumnDefinition(name="id", type="int", primary=True)] + ), + "orders": TableDefinition( + number_of_rows=-50, # Error in second table + columns=[ColumnDefinition(name="order_id", type="int", primary=True)] + ), + "products": TableDefinition( + number_of_rows=200, + columns=[] # Error: no 
columns + ) + } + ) + + with pytest.raises(ValueError) as exc_info: + spec.validate() + + error_msg = str(exc_info.value) + # Should find errors in both tables + assert "orders" in error_msg + assert "products" in error_msg + + +class TestTargetValidation: + """Tests for output target validation""" + + def test_valid_uc_schema_target(self): + target = UCSchemaTarget(catalog="main", schema_="default") + assert target.catalog == "main" + assert target.schema_ == "default" + + def test_uc_schema_empty_catalog_raises_error(self): + with pytest.raises(ValueError, match="non-empty"): + UCSchemaTarget(catalog="", schema_="default") + + def test_valid_file_path_target(self): + target = FilePathTarget(base_path="/tmp/data", output_format="parquet") + assert target.base_path == "/tmp/data" + assert target.output_format == "parquet" + + def test_file_path_empty_base_path_raises_error(self): + with pytest.raises(ValueError, match="non-empty"): + FilePathTarget(base_path="", output_format="csv") + + def test_file_path_invalid_format_raises_error(self): + with pytest.raises(ValueError): + FilePathTarget(base_path="/tmp/data", output_format="json") + + +class TestValidationIntegration: + """Integration tests for validation""" + + def test_realistic_valid_spec(self): + """Test a realistic, valid specification""" + spec = DatagenSpec( + tables={ + "users": TableDefinition( + number_of_rows=1000, + partitions=4, + columns=[ + ColumnDefinition(name="user_id", type="int", primary=True), + ColumnDefinition(name="username", type="string", options={ + "template": r"\w{8,12}" + }), + ColumnDefinition(name="email", type="string", options={ + "template": r"\w.\w@\w.com" + }), + ColumnDefinition(name="age", type="int", options={ + "min": 18, "max": 99 + }), + ] + ), + "orders": TableDefinition( + number_of_rows=5000, + columns=[ + ColumnDefinition(name="order_id", type="int", primary=True), + ColumnDefinition(name="amount", type="decimal", options={ + "min": 10.0, "max": 1000.0 + }), + ] + ) + }, + output_destination=UCSchemaTarget( + catalog="main", + schema_="synthetic_data" + ), + generator_options={ + "random": True, + "randomSeed": 42 + } + ) + + result = spec.validate(strict=True) + assert result.is_valid() + assert len(result.errors) == 0 + assert len(result.warnings) == 0 \ No newline at end of file
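Taken together, a hedged end-to-end sketch of the workflow this PR enables (table, column, catalog and schema names are illustrative; requires an active SparkSession):

```python
from pyspark.sql import SparkSession

from dbldatagen.spec.generator_spec import (
    ColumnDefinition,
    DatagenSpec,
    TableDefinition,
    UCSchemaTarget,
)
from dbldatagen.spec.generator_spec_impl import Generator

spec = DatagenSpec(
    tables={
        "users": TableDefinition(
            number_of_rows=1000,
            partitions=4,
            columns=[
                ColumnDefinition(name="user_id", type="int", primary=True),
                ColumnDefinition(name="age", type="int", options={"min": 18, "max": 99}),
            ],
        )
    },
    output_destination=UCSchemaTarget(catalog="main", schema_="synthetic_data"),
    generator_options={"random": True, "randomSeed": 42},
)

spec.validate()  # strict=True: raises if any error or warning is found

spark = SparkSession.builder.getOrCreate()
generator = Generator(spark)
generator.generate_and_write_data(spec)
```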