# Source code for dask.dataframe.io.demo

from __future__ import annotations

import re
import string
from dataclasses import asdict, dataclass, field
from typing import Any, Callable, cast

import numpy as np
import pandas as pd

from dask.dataframe._compat import PANDAS_GE_220, PANDAS_GE_300
from dask.dataframe._pyarrow import is_object_string_dtype
from dask.dataframe.core import tokenize
from dask.dataframe.io.utils import DataFrameIOFunction
from dask.utils import random_state_data

__all__ = [
    "make_timeseries",
    "with_spec",
    "ColumnSpec",
    "RangeIndexSpec",
    "DatetimeIndexSpec",
    "DatasetSpec",
]

default_int_args: dict[str, tuple[tuple[Any, ...], dict[str, Any]]] = {
    "poisson": ((), {"lam": 1000}),
    "normal": ((), {"scale": 1000}),
    "uniform": ((), {"high": 1000}),
    "binomial": ((1000, 0.5), {}),
    "random": ((0,), {"high": 1000}),
}
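
# A minimal illustration (not part of the module) of how these defaults are
# consumed: ``make_int`` below looks up ``(args, kwargs)`` by method name and
# forwards them to the matching ``numpy.random.RandomState`` method.
#
#     rstate = np.random.RandomState(42)
#     args, kws = default_int_args["poisson"]      # ((), {"lam": 1000})
#     data = rstate.poisson(*args, size=5, **kws)  # five draws centered near 1000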


@dataclass
class ColumnSpec:
    """Encapsulates properties of a family of columns with the same dtype.
    Different generation methods can be specified for integer dtypes
    ("poisson", "uniform", "binomial", etc.)

    Notes
    -----
    This API is still experimental, and will likely change in the future"""

    prefix: str | None = None
    """Column prefix. If not specified, will default to str(dtype)"""

    dtype: str | type | None = None
    """Column data type. Only supports numpy dtypes"""

    number: int = 1
    """How many columns to create with these properties. Default 1.
    If more than one column is specified, the columns are numbered:
    "int1", "int2", etc."""

    nunique: int | None = None
    """For a "category" column, how many unique categories to generate"""

    choices: list = field(default_factory=list)
    """For a "category" or str column, list of possible values"""

    low: int | None = None
    """Start value for an int column. Optional if ``random=True``, since
    ``randint`` then falls back to a default low of 0"""

    high: int | None = None
    """For an int column, high end of range"""

    length: int | None = None
    """For a str or "category" column with ``random=True``, the length of
    string to generate"""

    random: bool = False
    """For an int column, whether to use ``randint``. For a str column,
    whether to produce a random string of the specified ``length``"""

    method: str | None = None
    """For an int column, method to use when generating the value, such as
    "poisson", "uniform", "binomial". Default "poisson". Delegates to the
    same method of ``RandomState``"""

    args: tuple[Any, ...] = field(default_factory=tuple)
    """Positional args to pass into the method"""

    kwargs: dict[str, Any] = field(default_factory=dict)
    """Any other kwargs to pass into the method"""

@dataclass
class RangeIndexSpec:
    """Properties of the dataframe RangeIndex

    Notes
    -----
    This API is still experimental, and will likely change in the future"""

    dtype: str | type = int
    """Index dtype"""

    step: int = 1
    """Step for the RangeIndex"""

@dataclass
class DatetimeIndexSpec:
    """Properties of the dataframe DatetimeIndex

    Notes
    -----
    This API is still experimental, and will likely change in the future"""

    dtype: str | type = "datetime64[ns]"
    """Index dtype"""

    start: str | None = None
    """First value of the index"""

    freq: str = "1H"
    """Frequency for the index ("1H", "1D", etc.)"""

    partition_freq: str | None = None
    """Partition frequency ("1D", "1M", etc.)"""

@dataclass
class DatasetSpec:
    """Defines a dataset with random data, such as which columns and data
    types to generate

    Notes
    -----
    This API is still experimental, and will likely change in the future"""

    npartitions: int = 1
    """How many partitions to generate in the dataframe. If the dataframe has
    a DatetimeIndex, specify its ``partition_freq`` instead"""

    nrecords: int = 1000
    """Total number of records to generate"""

    index_spec: RangeIndexSpec | DatetimeIndexSpec = field(
        default_factory=RangeIndexSpec
    )
    """Properties of the index"""

    column_specs: list[ColumnSpec] = field(default_factory=list)
    """List of column definitions"""
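
# Illustrative sketch (assumed usage, not part of the module): composing the
# spec dataclasses above into a dataset description. ``with_spec`` at the
# bottom of this module consumes such a spec.
#
#     spec = DatasetSpec(
#         npartitions=4,
#         nrecords=1_000,
#         index_spec=RangeIndexSpec(step=2),
#         column_specs=[
#             ColumnSpec(prefix="x", dtype=int, method="poisson"),
#             ColumnSpec(prefix="c", dtype="category", nunique=5),
#         ],
#     )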

def make_float(n, rstate, random=False, **kwargs):
    kwargs.pop("dtype", None)
    kwargs.pop("args", None)
    if random:
        return rstate.random(size=n, **kwargs)
    return rstate.rand(n) * 2 - 1


def make_int(
    n: int,
    rstate: Any,
    random: bool = False,
    dtype: str | type = int,
    method: str | Callable = "poisson",
    args: tuple[Any, ...] = (),
    **kwargs,
):
    def _with_defaults(_method):
        handler_args, handler_kwargs = default_int_args.get(_method, ((), {}))
        handler_kwargs = handler_kwargs.copy()
        handler_kwargs.update(**kwargs)
        handler_args = args if args else handler_args
        return handler_args, handler_kwargs

    if random:
        handler_args, handler_kwargs = _with_defaults("random")
        if "low" in handler_kwargs:
            handler_args = ()
        data = rstate.randint(*handler_args, size=n, **handler_kwargs)
    else:
        if isinstance(method, str):
            # "poisson", "binomial", etc.
            handler_args, handler_kwargs = _with_defaults(method)
            handler = getattr(rstate, method)
            data = handler(*handler_args, size=n, **handler_kwargs)
        else:
            # method is a Callable
            data = method(*args, state=rstate, size=n, **kwargs)
    return data


names = [
    "Alice",
    "Bob",
    "Charlie",
    "Dan",
    "Edith",
    "Frank",
    "George",
    "Hannah",
    "Ingrid",
    "Jerry",
    "Kevin",
    "Laura",
    "Michael",
    "Norbert",
    "Oliver",
    "Patricia",
    "Quinn",
    "Ray",
    "Sarah",
    "Tim",
    "Ursula",
    "Victor",
    "Wendy",
    "Xavier",
    "Yvonne",
    "Zelda",
]


def make_random_string(n, rstate, length: int = 25) -> list[str]:
    choices = list(string.ascii_letters + string.digits + string.punctuation + " ")
    return ["".join(rstate.choice(choices, size=length)) for _ in range(n)]


def make_string(n, rstate, choices=None, random=False, length=None, **kwargs):
    kwargs.pop("args", None)
    if random:
        return make_random_string(n, rstate, length=length)
    choices = choices or names
    return rstate.choice(choices, size=n)


def make_categorical(n, rstate, choices=None, nunique=None, **kwargs):
    kwargs.pop("args", None)
    if nunique is not None:
        cat_len = len(str(nunique))
        choices = [str(x + 1).zfill(cat_len) for x in range(nunique)]
    else:
        choices = choices or names
    return pd.Categorical.from_codes(rstate.randint(0, len(choices), size=n), choices)


make: dict[type | str, Callable] = {
    float: make_float,
    int: make_int,
    str: make_string,
    object: make_string,
    "string[python]": make_string,
    "string[pyarrow]": make_string,
    "category": make_categorical,
    "int8": make_int,
    "int16": make_int,
    "int32": make_int,
    "int64": make_int,
    "float8": make_float,
    "float16": make_float,
    "float32": make_float,
    "float64": make_float,
}


class MakeDataframePart(DataFrameIOFunction):
    """
    Wrapper class for ``make_dataframe_part``. Makes a single dataframe
    partition.
    """

    def __init__(self, index_dtype, dtypes, kwargs, columns=None):
        self.index_dtype = index_dtype
        self._columns = columns or list(dtypes.keys())
        self.dtypes = dtypes
        self.kwargs = kwargs

    @property
    def columns(self):
        return self._columns

    def project_columns(self, columns):
        """Return a new MakeDataframePart object with a sub-column projection.
""" if columns == self.columns: return self return MakeDataframePart( self.index_dtype, self.dtypes, self.kwargs, columns=columns, ) def __call__(self, part): divisions, state_data = part return make_dataframe_part( self.index_dtype, divisions[0], divisions[1], self.dtypes, self.columns, state_data, self.kwargs, ) def make_dataframe_part(index_dtype, start, end, dtypes, columns, state_data, kwargs): state = np.random.RandomState(state_data) if pd.api.types.is_datetime64_any_dtype(index_dtype): # FIXME: tzinfo would be lost in pd.date_range index = pd.date_range( start=start, end=end, freq=kwargs.get("freq"), name="timestamp" ) elif pd.api.types.is_integer_dtype(index_dtype): step = kwargs.get("freq") index = pd.RangeIndex(start=start, stop=end + step, step=step).astype( index_dtype ) else: raise TypeError(f"Unhandled index dtype: {index_dtype}") df = make_partition(columns, dtypes, index, kwargs, state) while df.index[-1] >= end: df = df.iloc[:-1] return df def same_astype(a: str | type, b: str | type): """Same as pandas.api.types.is_dtype_equal, but also returns True for str / object""" return pd.api.types.is_dtype_equal(a, b) or ( is_object_string_dtype(a) and is_object_string_dtype(b) ) def make_partition(columns: list, dtypes: dict[str, type | str], index, kwargs, state): data = {} for k, dt in dtypes.items(): kws = { kk.rsplit("_", 1)[1]: v for kk, v in kwargs.items() if kk.rsplit("_", 1)[0] == k } # Note: we compute data for all dtypes in order, not just those in the output # columns. This ensures the same output given the same state_data, regardless # of whether there is any column projection. # cf. https://github.com/dask/dask/pull/9538#issuecomment-1267461887 result = make[dt](len(index), state, **kws) if k in columns: data[k] = result df = pd.DataFrame(data, index=index, columns=columns) update_dtypes = { k: v for k, v in dtypes.items() if k in columns and not same_astype(v, df[k].dtype) } if update_dtypes: kwargs = {} if PANDAS_GE_300 else {"copy": False} df = df.astype(update_dtypes, **kwargs) return df _ME = "ME" if PANDAS_GE_220 else "M" def make_timeseries( start="2000-01-01", end="2000-12-31", dtypes=None, freq="10s", partition_freq=f"1{_ME}", seed=None, **kwargs, ): """Create timeseries dataframe with random data Parameters ---------- start: datetime (or datetime-like string) Start of time series end: datetime (or datetime-like string) End of time series dtypes: dict (optional) Mapping of column names to types. Valid types include {float, int, str, 'category'} freq: string String like '2s' or '1H' or '12W' for the time series frequency partition_freq: string String like '1M' or '2Y' to divide the dataframe into partitions seed: int (optional) Randomstate seed kwargs: Keywords to pass down to individual column creation functions. Keywords should be prefixed by the column name and then an underscore. Examples -------- >>> import dask.dataframe as dd >>> df = dd.demo.make_timeseries('2000', '2010', ... {'value': float, 'name': str, 'id': int}, ... 
    ...                              freq='2h', partition_freq='1D', seed=1)
    >>> df.head()  # doctest: +SKIP
                           id      name     value
    2000-01-01 00:00:00   969     Jerry -0.309014
    2000-01-01 02:00:00  1010       Ray -0.760675
    2000-01-01 04:00:00  1016  Patricia -0.063261
    2000-01-01 06:00:00   960   Charlie  0.788245
    2000-01-01 08:00:00  1031     Kevin  0.466002
    """
    if dtypes is None:
        dtypes = {"name": str, "id": int, "x": float, "y": float}

    divisions = list(pd.date_range(start=start, end=end, freq=partition_freq))
    npartitions = len(divisions) - 1
    if seed is None:
        # Get random integer seed for each partition. We can
        # call `random_state_data` in `MakeDataframePart`
        state_data = np.random.randint(2e9, size=npartitions)
    else:
        state_data = random_state_data(npartitions, seed)

    # Build parts
    parts = []
    for i in range(len(divisions) - 1):
        parts.append((divisions[i : i + 2], state_data[i]))

    kwargs["freq"] = freq
    index_dtype = "datetime64[ns]"
    meta_start, meta_end = list(pd.date_range(start="2000", freq=freq, periods=2))

    from dask.dataframe import _dask_expr_enabled

    if _dask_expr_enabled():
        from dask_expr import from_map

        k = {}
    else:
        from dask.dataframe.io.io import from_map

        k = {"token": tokenize(start, end, dtypes, freq, partition_freq, state_data)}

    # Construct the output collection with from_map
    return from_map(
        MakeDataframePart(index_dtype, dtypes, kwargs),
        parts,
        meta=make_dataframe_part(
            index_dtype,
            meta_start,
            meta_end,
            dtypes,
            list(dtypes.keys()),
            state_data[0],
            kwargs,
        ),
        divisions=divisions,
        label="make-timeseries",
        enforce_metadata=False,
        **k,
    )
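
# Illustrative sketch (assumed usage): per-column keywords are prefixed with
# the column name plus an underscore; ``make_partition`` strips the prefix
# before dispatching to the column generator, so ``id_lam=100`` below reaches
# ``rstate.poisson(lam=100)`` for the "id" column.
#
#     import dask.dataframe as dd
#     df = dd.demo.make_timeseries(
#         "2000-01-01", "2000-03-31",
#         dtypes={"id": int, "value": float},
#         freq="1h", partition_freq="7D", seed=1,
#         id_lam=100,
#     )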

def with_spec(spec: DatasetSpec, seed: int | None = None):
    """Generate a random dataset according to provided spec

    Parameters
    ----------
    spec : DatasetSpec
        Specify all the parameters of the dataset
    seed : int (optional)
        Randomstate seed

    Notes
    -----
    This API is still experimental, and will likely change in the future

    Examples
    --------
    >>> from dask.dataframe.io.demo import ColumnSpec, DatasetSpec, with_spec
    >>> ddf = with_spec(
    ...     DatasetSpec(
    ...         npartitions=10,
    ...         nrecords=10_000,
    ...         column_specs=[
    ...             ColumnSpec(dtype=int, number=2, prefix="p"),
    ...             ColumnSpec(dtype=int, number=2, prefix="n", method="normal"),
    ...             ColumnSpec(dtype=float, number=2, prefix="f"),
    ...             ColumnSpec(dtype=str, prefix="s", number=2, random=True, length=10),
    ...             ColumnSpec(dtype="category", prefix="c", choices=["Y", "N"]),
    ...         ],
    ...     ), seed=42)
    >>> ddf.head(10)  # doctest: +SKIP
         p1    p2    n1    n2        f1        f2          s1          s2 c1
    0  1002   972  -811    20  0.640846 -0.176875  L#h98#}J`?  _8C607/:6e  N
    1   985   982 -1663  -777  0.790257  0.792796  u:XI3,omoZ  w~@ /d)'-@  N
    2   947   970   799  -269  0.740869 -0.118413  O$dnwCuq\\  !WtSe+(;#9  Y
    3  1003   983  1133   521 -0.987459  0.278154  j+Qr_2{XG&  &XV7cy$y1T  Y
    4  1017  1049   826     5 -0.875667 -0.744359  \4bJ3E-{:o  {+jC).?vK+  Y
    5   984  1017  -492  -399  0.748181  0.293761  ~zUNHNgD"!  yuEkXeVot|  Y
    6   992  1027  -856    67 -0.125132 -0.234529  j.7z;o]Gc9  g|Fi5*}Y92  Y
    7  1011   974   762 -1223  0.471696  0.937935  yT?j~N/-u]  JhEB[W-}^$  N
    8   984   974   856    74  0.109963  0.367864  _j"&@ i&;/  OYXQ)w{hoH  N
    9  1030  1001  -792  -262  0.435587 -0.647970  Pmrwl{{|.K  3UTqM$86Sg  N
    """
    if len(spec.column_specs) == 0:
        spec.column_specs = [
            ColumnSpec(prefix="i", dtype="int64", low=0, high=1_000_000, random=True),
            ColumnSpec(prefix="f", dtype=float, random=True),
            ColumnSpec(prefix="c", dtype="category", choices=["a", "b", "c", "d"]),
            ColumnSpec(prefix="s", dtype=str),
        ]

    columns = []
    dtypes = {}
    partition_freq: str | int | None
    step: str | int
    if isinstance(spec.index_spec, DatetimeIndexSpec):
        start = pd.Timestamp(spec.index_spec.start)
        step = spec.index_spec.freq
        partition_freq = spec.index_spec.partition_freq
        end = pd.Timestamp(spec.index_spec.start) + spec.nrecords * pd.Timedelta(step)
        divisions = list(pd.date_range(start=start, end=end, freq=partition_freq))
        if divisions[-1] < end:
            divisions.append(end)
        meta_start, meta_end = start, start + pd.Timedelta(step)
    elif isinstance(spec.index_spec, RangeIndexSpec):
        step = spec.index_spec.step
        partition_freq = spec.nrecords * step // spec.npartitions
        end = spec.nrecords * step - 1
        divisions = list(pd.RangeIndex(0, stop=end, step=partition_freq))
        if divisions[-1] < (end + 1):
            divisions.append(end + 1)
        meta_start, meta_end = 0, step
    else:
        raise ValueError(f"Unhandled index: {spec.index_spec}")

    kwargs: dict[str, Any] = {"freq": step}
    for col in spec.column_specs:
        if col.prefix:
            prefix = col.prefix
        elif isinstance(col.dtype, str):
            prefix = re.sub(r"[^a-zA-Z0-9]", "_", f"{col.dtype}").rstrip("_")
        elif hasattr(col.dtype, "name"):
            prefix = col.dtype.name  # type: ignore[union-attr]
        else:
            prefix = col.dtype.__name__  # type: ignore[union-attr]

        for i in range(col.number):
            col_n = i + 1
            while (col_name := f"{prefix}{col_n}") in dtypes:
                col_n = col_n + 1
            columns.append(col_name)
            dtypes[col_name] = col.dtype
            kwargs.update(
                {
                    f"{col_name}_{k}": v
                    for k, v in asdict(col).items()
                    if k not in {"prefix", "number", "kwargs"} and v not in (None, [])
                }
            )
            # set untyped kwargs, if any
            for kw_name, kw_val in col.kwargs.items():
                kwargs[f"{col_name}_{kw_name}"] = kw_val

    npartitions = len(divisions) - 1
    if seed is None:
        state_data = cast(
            list[Any],
            np.random.randint(int(2e9), size=npartitions),
        )
    else:
        state_data = random_state_data(npartitions, seed)

    parts = [(divisions[i : i + 2], state_data[i]) for i in range(npartitions)]

    from dask.dataframe import _dask_expr_enabled

    if _dask_expr_enabled():
        from dask_expr import from_map

        k = {}
    else:
        from dask.dataframe.io.io import from_map

        k = {
            "token": tokenize(
                0, spec.nrecords, dtypes, step, partition_freq, state_data
            )
        }

    return from_map(
        MakeDataframePart(spec.index_spec.dtype, dtypes, kwargs, columns=columns),
        parts,
        meta=make_dataframe_part(
            spec.index_spec.dtype,
            meta_start,
            meta_end,
            dtypes,
            columns,
            state_data[0],
            kwargs,
        ),
        divisions=divisions,
        label="make-random",
        enforce_metadata=False,
        **k,
    )
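
# Illustrative sketch (assumed usage): ``with_spec`` with a datetime index,
# which the docstring example above does not cover. With ``DatetimeIndexSpec``,
# partitioning is driven by ``partition_freq`` rather than ``npartitions``.
#
#     ddf = with_spec(
#         DatasetSpec(
#             nrecords=1_000,
#             index_spec=DatetimeIndexSpec(
#                 dtype="datetime64[ns]",
#                 start="2024-01-01",
#                 freq="1h",
#                 partition_freq="1D",
#             ),
#             column_specs=[ColumnSpec(prefix="f", dtype=float, number=3)],
#         ),
#         seed=0,
#     )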