diff --git a/dbldatagen/column_generation_spec.py b/dbldatagen/column_generation_spec.py
index 62482b95..9e4a95a2 100644
--- a/dbldatagen/column_generation_spec.py
+++ b/dbldatagen/column_generation_spec.py
@@ -8,7 +8,10 @@
 import copy
 import logging
+import random
+from datetime import date, datetime, timedelta
+import numpy as np
 from pyspark.sql.functions import col, pandas_udf
 from pyspark.sql.functions import lit, concat, rand, round as sql_round, array, expr, when, udf, \
     format_string
@@ -27,7 +30,7 @@
 from .nrange import NRange
 from .serialization import SerializableToDict
 from .text_generators import TemplateGenerator
-from .utils import ensure, coalesce_values
+from .utils import ensure, coalesce_values, parse_time_interval
 from .schema_parser import SchemaParser
 
 HASH_COMPUTE_METHOD = "hash"
@@ -518,6 +521,130 @@ def _setupTemporaryColumns(self):
                                                'description': desc}))
             self._weightedBaseColumn = temp_name
 
+    def _list_random_unique_numeric_values(
+            self,
+            unique_count: int,
+            min_val: int | float,
+            max_val: int | float,
+            step_val: int | float
+    ) -> None:
+        """
+        Builds a list of random unique numeric values when ``uniqueValues`` is specified and ``random=True``.
+
+        Randomly samples ``unique_count`` distinct values from the range defined by ``min_val``, ``max_val``
+        and ``step_val`` and stores them as the column's discrete values; the main column then selects from
+        this list using a random index.
+
+        :param unique_count: Number of unique values to generate
+        :param min_val: Minimum value of the range
+        :param max_val: Maximum value of the range
+        :param step_val: Step value for the range
+        """
+        if self._randomSeed is not None and self._randomSeed != -1:
+            self._set_random_seed()
+
+        range_size = (max_val - min_val) / step_val
+        if not isinstance(min_val, float) and not isinstance(max_val, float) and not isinstance(step_val, float):
+            range_size = range_size + 1
+
+        # Clamp the request to the number of distinct values the range can yield so that
+        # the sampling loop below always terminates.
+        num_possible_values = max(int(range_size), 1)
+        if unique_count > num_possible_values:
+            self.logger.warning(
+                f"Could not generate {unique_count} unique values for column {self.name}; "
+                f"the range only contains {num_possible_values} distinct values"
+            )
+            unique_count = num_possible_values
+
+        selected_values = set()
+        while len(selected_values) < unique_count:
+            if self.distribution and isinstance(self.distribution, DataDistribution):
+                raw_value = np.clip(self.distribution.generateNormalizedDistributionSample(), 0, 1)
+            else:
+                raw_value = random.random()
+
+            scaled_index = int(raw_value * range_size)
+            value = np.clip(min_val + scaled_index * step_val, min_val, max_val)
+            selected_values.add(value)
+
+        self.values = list(selected_values)
+        self.logger.info(
+            f"Set up random unique values for column {self.name}: {len(self.values)} values using "
+            f"{'distribution' if self.distribution else 'uniform'} sampling"
+        )
+
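+    # Worked example (illustrative only) of the index mapping above: with minValue=1,
+    # maxValue=100 and step=1, all integers, range_size = (100 - 1) / 1 + 1 = 100.
+    # A uniform draw raw_value = 0.42 gives scaled_index = int(0.42 * 100) = 42, so
+    # value = np.clip(1 + 42 * 1, 1, 100) = 43. Repeated draws accumulate in a set
+    # until `unique_count` distinct values have been collected.
+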
+    def _list_random_unique_datetime_values(
+            self,
+            unique_count: int,
+            begin_val: date | datetime | str,
+            end_val: date | datetime | str,
+            interval_val: timedelta | str,
+            col_type: DataType | str
+    ) -> None:
+        """
+        Builds a list of random unique date/timestamp values when ``uniqueValues`` is specified and ``random=True``.
+
+        :param unique_count: Number of unique values to generate
+        :param begin_val: Beginning date/timestamp
+        :param end_val: End date/timestamp
+        :param interval_val: Date/time interval
+        :param col_type: Type of column to generate (e.g. ``DateType`` or ``TimestampType``)
+        """
+        if isinstance(interval_val, str):
+            interval_val = parse_time_interval(interval_val)
+        if isinstance(begin_val, str):
+            if isinstance(col_type, TimestampType):
+                begin_val = datetime.strptime(begin_val, DateRange.DEFAULT_UTC_TS_FORMAT)
+            else:
+                begin_val = datetime.strptime(begin_val, DateRange.DEFAULT_DATE_FORMAT)
+        if isinstance(end_val, str):
+            if isinstance(col_type, TimestampType):
+                end_val = datetime.strptime(end_val, DateRange.DEFAULT_UTC_TS_FORMAT)
+            else:
+                end_val = datetime.strptime(end_val, DateRange.DEFAULT_DATE_FORMAT)
+        if isinstance(col_type, DateType):
+            begin_val = begin_val.date()
+            end_val = end_val.date()
+
+        total_span = end_val - begin_val
+        if isinstance(total_span, timedelta):
+            total_seconds = total_span.total_seconds()
+            interval_seconds = interval_val.total_seconds()
+            num_possible_values = int(total_seconds / interval_seconds) + 1
+        else:
+            total_days = total_span.days
+            interval_days = interval_val.days
+            num_possible_values = int(total_days / interval_days) + 1
+
+        # Clamp the request to the number of distinct values the range can yield and
+        # warn if fewer values are available than were requested.
+        if unique_count > num_possible_values:
+            self.logger.warning(
+                f"Could not generate {unique_count} unique values for column {self.name}; "
+                f"the range only contains {num_possible_values} distinct values"
+            )
+            unique_count = num_possible_values
+
+        if self._randomSeed is not None and self._randomSeed != -1:
+            self._set_random_seed()
+
+        selected_values = set()
+        while len(selected_values) < unique_count:
+            if self.distribution and isinstance(self.distribution, DataDistribution):
+                raw_value = np.clip(self.distribution.generateNormalizedDistributionSample(), 0, 1)
+            else:
+                raw_value = random.random()
+
+            scaled_index = int(raw_value * (num_possible_values - 1))
+            value = begin_val + interval_val * scaled_index
+
+            if value > end_val:
+                value = end_val
+            selected_values.add(value)
+
+        self.values = list(selected_values)
+        self.logger.info(
+            f"Set up random unique values for column {self.name}: {len(self.values)} values using "
+            f"{'distribution' if self.distribution else 'uniform'} sampling"
+        )
+
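+    # Worked example (illustrative only) of the span computation above: for
+    # begin="2020-01-01 00:00:00", end="2020-01-01 23:59:59", interval="1 hour",
+    # total_seconds = 86399 and interval_seconds = 3600, so
+    # num_possible_values = int(86399 / 3600) + 1 = 24 selectable timestamps.
+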
     def _setup_logger(self):
         """Set up logging
@@ -553,12 +680,12 @@ def _computeAdjustedNumericRangeForColumn(self, colType, c_min, c_max, c_step, *
         - if a datarange is specified, use that range
         - if begin and end are specified or minValue and maxValue are specified, use that
        - if unique values is specified, compute minValue and maxValue depending on type
-
+        - if unique values and random=True are both specified, generate random unique values from the full range
         """
         if c_unique is not None:
             assert type(c_unique) is int, "unique_values must be integer"
             assert c_unique >= 1, "if supplied, unique values must be > 0"
-            # TODO: set maxValue to unique_values + minValue & add unit test
+
             effective_min, effective_max, effective_step = None, None, None
             if c_range is not None and type(c_range) is NRange:
                 effective_min = c_range.minValue
@@ -568,19 +695,27 @@ def _computeAdjustedNumericRangeForColumn(self, colType, c_min, c_max, c_step, *
             effective_step = coalesce_values(effective_step, c_step, 1)
             effective_max = coalesce_values(effective_max, c_max)
 
-            # due to floating point errors in some Python floating point calculations, we need to apply rounding
-            # if any of the components are float
-            if type(effective_min) is float or type(effective_step) is float:
-                unique_max = round(c_unique * effective_step + effective_min - effective_step, 9)
+            # Check if both uniqueValues and random=True are specified
+            if self.random and effective_max is not None:
+                # Generate random unique values from the full range and store them
+                self._list_random_unique_numeric_values(c_unique, effective_min, effective_max, effective_step)
+                # Create a range that maps to indices of the unique values (0 to unique_count - 1)
+                result = NRange(0, c_unique - 1, 1)
             else:
-                unique_max = c_unique * effective_step + effective_min - effective_step
-            result = NRange(effective_min, unique_max, effective_step)
-
-            if result.maxValue is not None and effective_max is not None and result.maxValue > effective_max:
-                self.logger.warning("Computed maxValue for column [%s] of %s is greater than specified maxValue %s",
-                                    self.name,
-                                    result.maxValue,
-                                    effective_max)
+                # Original behavior: create a sequential range.
+                # Due to floating point errors in some Python floating point calculations, we need to
+                # apply rounding if any of the components are float.
+                if type(effective_min) is float or type(effective_step) is float:
+                    unique_max = round(c_unique * effective_step + effective_min - effective_step, 9)
+                else:
+                    unique_max = c_unique * effective_step + effective_min - effective_step
+                result = NRange(effective_min, unique_max, effective_step)
+
+                if result.maxValue is not None and effective_max is not None and result.maxValue > effective_max:
+                    self.logger.warning("Computed maxValue for column [%s] of %s is greater than specified maxValue %s",
+                                        self.name,
+                                        result.maxValue,
+                                        effective_max)
         elif c_range is not None:
             result = c_range
         elif c_range is None:
@@ -607,10 +742,21 @@ def _computeAdjustedDateTimeRangeForColumn(self, colType, c_begin, c_end, c_inte
         effective_end = coalesce_values(effective_end, c_end)
         effective_begin = coalesce_values(effective_begin, c_begin)
 
-        if type(colType) is DateType:
-            result = DateRange.computeDateRange(effective_begin, effective_end, effective_interval, c_unique)
+        # Check if both uniqueValues and random=True are specified for date/timestamp columns
+        if c_unique is not None and self.random and effective_end is not None:
+            # Generate random unique date/timestamp values from the full range
+            self._list_random_unique_datetime_values(c_unique, effective_begin, effective_end,
+                                                     effective_interval, colType)
+            # Return a minimal range - the actual values will come from the discrete values list
+            if type(colType) is DateType:
+                result = DateRange.computeDateRange(effective_begin, effective_begin, effective_interval, 1)
+            else:
+                result = DateRange.computeTimestampRange(effective_begin, effective_begin, effective_interval, 1)
         else:
-            result = DateRange.computeTimestampRange(effective_begin, effective_end, effective_interval, c_unique)
+            # Original behavior
+            if type(colType) is DateType:
+                result = DateRange.computeDateRange(effective_begin, effective_end, effective_interval, c_unique)
+            else:
+                result = DateRange.computeTimestampRange(effective_begin, effective_end, effective_interval, c_unique)
 
         self.logger.debug("Computing adjusted range for column: %s - %s", self.name, result)
         return result
@@ -1322,3 +1468,11 @@ def makeGenerationExpressions(self):
                 retval = F.slice(retval, F.lit(1), F.expr(expr_str))
 
         return retval
+
+    def _set_random_seed(self) -> None:
+        """
+        Sets the random seed used when computing random values from a range.
+        """
+        seed_value = abs(self._randomSeed) % (2**32)  # NumPy only accepts seeds in the range 0 to 2**32 - 1.
+        random.seed(seed_value)
+        np.random.seed(seed_value)
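+
+    # Standalone sketch (illustrative only) of the normalization above: any integer
+    # seed, including a negative one, is folded into NumPy's accepted range, e.g.
+    #   abs(-12345) % (2**32) == 12345
+    # np.random.seed raises ValueError for seeds outside [0, 2**32 - 1].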
diff --git a/tests/test_ranged_values_and_dates.py b/tests/test_ranged_values_and_dates.py
index a1d66693..114f235d 100644
--- a/tests/test_ranged_values_and_dates.py
+++ b/tests/test_ranged_values_and_dates.py
@@ -984,3 +984,233 @@ def test_ranged_data_string5(self):
         s1_expected_values = [f"testing {x:05} >>" for x in [1.5, 1.8, 2.1, 2.4]]
         s1_values = [r[0] for r in results.select("s1").distinct().collect()]
         self.assertSetEqual(set(s1_expected_values), set(s1_values))
+
+    def test_unique_values_random_integers(self):
+        test_df = (
+            dg.DataGenerator(
+                sparkSession=spark,
+                name="test_data_set1",
+                rows=10000,
+                partitions=4,
+                randomSeedMethod="fixed",
+                randomSeed=24
+            )
+            .withIdOutput()
+            .withColumn("val1", "int", minValue=1, maxValue=100, uniqueValues=10, random=True)
+            .withColumn("val2", "int", minValue=1, maxValue=100, uniqueValues=10, random=False)
+            .build()
+        )
+
+        # Ensure that we get exactly 10 unique values for both columns:
+        count_unique_val1 = test_df.select("val1").distinct().count()
+        count_unique_val2 = test_df.select("val2").distinct().count()
+        self.assertEqual(count_unique_val1, 10)
+        self.assertEqual(count_unique_val2, 10)
+
+        # Get the generated unique values:
+        unique_set_val1 = {r[0] for r in test_df.select("val1").distinct().collect()}
+        unique_set_val2 = {r[0] for r in test_df.select("val2").distinct().collect()}
+        expected_sequence = set(range(1, 11))
+
+        # Ensure that the random values for val1 are not generated in sequence:
+        self.assertNotEqual(unique_set_val1, expected_sequence)
+
+        # Ensure that the non-random values for val2 are generated in sequence:
+        self.assertEqual(unique_set_val2, expected_sequence)
+
+        # Ensure that all values are generated in the provided range:
+        self.assertTrue(all(1 <= v <= 100 for v in unique_set_val1))
+        self.assertTrue(all(1 <= v <= 100 for v in unique_set_val2))
+
+    def test_unique_values_random_floats(self):
+        test_df = (
+            dg.DataGenerator(
+                sparkSession=spark,
+                name="test_data_set1",
+                rows=5000,
+                partitions=4,
+                randomSeedMethod="fixed",
+                randomSeed=42
+            )
+            .withIdOutput()
+            .withColumn("val1", "float", minValue=1.0, maxValue=10.0, step=0.5, uniqueValues=10, random=True)
+            .withColumn("val2", "float", minValue=1.0, maxValue=10.0, step=0.5, uniqueValues=10, random=False)
+            .build()
+        )
+
+        # Ensure that we get exactly 10 unique values for both columns:
+        count_unique_val1 = test_df.select("val1").distinct().count()
+        count_unique_val2 = test_df.select("val2").distinct().count()
+        self.assertEqual(count_unique_val1, 10)
+        self.assertEqual(count_unique_val2, 10)
+
+        # Get the generated unique values:
+        unique_set_val1 = {r[0] for r in test_df.select("val1").distinct().collect()}
+        unique_set_val2 = {r[0] for r in test_df.select("val2").distinct().collect()}
+        expected_sequence = {1.0 + i * 0.5 for i in range(10)}
+
+        # Ensure that the random values for val1 are not generated in sequence:
+        self.assertNotEqual(unique_set_val1, expected_sequence)
+
+        # Ensure that the non-random values for val2 are generated in sequence:
+        self.assertEqual(unique_set_val2, expected_sequence)
+
+        # Ensure that all values are generated in the provided range:
+        self.assertTrue(all(1.0 <= v <= 10.0 for v in unique_set_val1))
+        self.assertTrue(all(1.0 <= v <= 10.0 for v in unique_set_val2))
+
+    def test_unique_values_random_dates(self):
+        test_df = (
+            dg.DataGenerator(
+                sparkSession=spark,
+                name="test_data_set1",
+                rows=5000,
+                partitions=4,
+                randomSeedMethod="fixed",
+                randomSeed=456
+            )
+            .withIdOutput()
+            .withColumn(
+                "val1",
+                "date",
+                begin="2020-01-01",
+                end="2020-01-31",
+                interval="1 day",
+                uniqueValues=10,
+                random=True
+            )
+            .withColumn(
+                "val2",
+                "date",
+                begin="2020-01-01",
+                end="2020-01-31",
+                interval="1 day",
+                uniqueValues=10,
+                random=False
+            )
+            .build()
+        )
+
+        # Ensure that we get exactly 10 unique values for both columns:
+        count_unique_val1 = test_df.select("val1").distinct().count()
+        count_unique_val2 = test_df.select("val2").distinct().count()
+        self.assertEqual(count_unique_val1, 10)
+        self.assertEqual(count_unique_val2, 10)
+
+        # Get the generated unique values:
+        unique_set_val1 = {r[0] for r in test_df.select("val1").distinct().collect()}
+        unique_set_val2 = {r[0] for r in test_df.select("val2").distinct().collect()}
+        end_date = date(2020, 1, 31)
+        # Non-random unique dates form a run of 10 consecutive days ending at the end date:
+        expected_sequence = {end_date - timedelta(days=j) for j in range(10)}
+
+        # Ensure that the random values for val1 are not generated in sequence:
+        self.assertNotEqual(unique_set_val1, expected_sequence)
+
+        # Ensure that the non-random values for val2 are generated in sequence:
+        self.assertEqual(unique_set_val2, expected_sequence)
+
+        # Ensure that all values are generated in the provided range:
+        start_date = date(2020, 1, 1)
+        self.assertTrue(all(start_date <= v <= end_date for v in unique_set_val1))
+        self.assertTrue(all(start_date <= v <= end_date for v in unique_set_val2))
+
+    def test_unique_values_random_timestamps(self):
+        test_df = (
+            dg.DataGenerator(
+                sparkSession=spark,
+                name="test_data_set1",
+                rows=3000,
+                partitions=4,
+                randomSeedMethod="fixed",
+                randomSeed=789
+            )
+            .withIdOutput()
+            .withColumn(
+                "val1",
+                "timestamp",
+                begin="2020-01-01 00:00:00",
+                end="2020-01-01 23:59:59",
+                interval="1 hour",
+                uniqueValues=10,
+                random=True
+            )
+            .withColumn(
+                "val2",
+                "timestamp",
+                begin="2020-01-01 00:00:00",
+                end="2020-01-01 23:59:59",
+                interval="1 hour",
+                uniqueValues=10,
+                random=False
+            )
+            .build()
+        )
+
+        # Ensure that we get exactly 10 unique values for both columns:
+        count_unique_val1 = test_df.select("val1").distinct().count()
+        count_unique_val2 = test_df.select("val2").distinct().count()
+        self.assertEqual(count_unique_val1, 10)
+        self.assertEqual(count_unique_val2, 10)
+
+        # Get the generated unique values:
+        unique_set_val1 = {r[0] for r in test_df.select("val1").distinct().collect()}
+        unique_set_val2 = {r[0] for r in test_df.select("val2").distinct().collect()}
+        end_time = datetime(2020, 1, 1, 23, 59, 59)
+        # Non-random unique timestamps form a run of 10 consecutive hours ending at the end time:
+        expected_sequence = {end_time - timedelta(hours=j) for j in range(10)}
+
+        # Ensure that the random values for val1 are not generated in sequence:
+        self.assertNotEqual(unique_set_val1, expected_sequence)
+
+        # Ensure that the non-random values for val2 are generated in sequence:
+        self.assertEqual(unique_set_val2, expected_sequence)
+
+        # Ensure that all values are generated in the provided range:
+        start_ts = datetime(2020, 1, 1, 0, 0, 0)
+        self.assertTrue(all(start_ts <= v <= end_time for v in unique_set_val1))
+        self.assertTrue(all(start_ts <= v <= end_time for v in unique_set_val2))
+
+    def test_unique_values_random_reproducible(self):
+        test_gen = (
+            dg.DataGenerator(
+                sparkSession=spark,
+                name="test_data_set1",
+                rows=1000,
+                partitions=2,
+                randomSeedMethod="fixed",
+                randomSeed=999
+            )
+            .withIdOutput()
+            .withColumn("val", "int", minValue=1, maxValue=50, uniqueValues=8, random=True)
+        )
+
+        # Generate data twice using the same random seed:
+        test_df1 = test_gen.build()
+        test_df2 = test_gen.build()
+
+        # Get the unique generated values:
+        unique_set_val1 = {r[0] for r in test_df1.select("val").distinct().collect()}
+        unique_set_val2 = {r[0] for r in test_df2.select("val").distinct().collect()}
+
+        # Ensure the same unique values are generated using the same random seed:
+        self.assertEqual(unique_set_val1, unique_set_val2)
+
+        # Generate data with a new random seed:
+        test_gen_new_seed = (
+            dg.DataGenerator(
+                sparkSession=spark,
+                name="test_data_set1",
+                rows=1000,
+                partitions=2,
+                randomSeedMethod="fixed",
+                randomSeed=42
+            )
+            .withIdOutput()
+            .withColumn("val", "int", minValue=1, maxValue=50, uniqueValues=8, random=True)
+        )
+        test_df3 = test_gen_new_seed.build()
+        unique_set_val3 = {r[0] for r in test_df3.select("val").distinct().collect()}
+
+        # Ensure different unique values are generated using the new random seed:
+        self.assertNotEqual(unique_set_val1, unique_set_val3)
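+
+    # For reference, the minimal user-facing pattern exercised by these tests
+    # (column name, row count and seed values are illustrative):
+    #
+    #   df = (dg.DataGenerator(sparkSession=spark, rows=1000,
+    #                          randomSeedMethod="fixed", randomSeed=42)
+    #         .withColumn("v", "int", minValue=1, maxValue=50, uniqueValues=8, random=True)
+    #         .build())
+    #   assert df.select("v").distinct().count() == 8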