rechunk

Classes:

Name Description
NetCDF4Backend
RechunkingBackend
XarrayBackend
nccopyBackend

Functions:

Name Description
generate_rechunk_commands

Generate variations of rechunking commands based on nccopy.

generate_rechunk_commands_for_multiple_netcdf

Generate variations of rechunking commands based on nccopy.

modify_chunk_size

Modify the chunk size of a variable in a NetCDF file.

rechunk

Rechunk a NetCDF4 dataset with options to fine-tune the output.

NetCDF4Backend

Bases: RechunkingBackendBase

Methods:

Name Description
rechunk

Rechunk data stored in a NetCDF4 file.

rechunk

rechunk(
    input_filepath: Path,
    output_filepath: Path,
    time: int = None,
    lat: int = None,
    lon: int = None,
) -> None

Rechunk data stored in a NetCDF4 file.

Notes

Text partially quoted from

https://unidata.github.io/netcdf4-python/#netCDF4.Dataset.createVariable :

The function createVariable(), available through the netcdf4-python Python interface to the netCDF C library, features the optional keyword chunksizes which can be used to manually specify the HDF5 chunk sizes for each dimension of the variable.

A detailed discussion of HDF chunking and I/O performance is available at https://support.hdfgroup.org/HDF5/doc/Advanced/Chunking/. The default chunking scheme in the netcdf-c library is discussed at https://docs.unidata.ucar.edu/nug/current/netcdf_perf_chunking.html.

Basically, the chunk size for each dimension should match as closely as possible the size of the data block that users will read from the file. chunksizes cannot be set if contiguous=True.
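For orientation, a minimal sketch of manual chunking through createVariable() (the file, dimension, and variable names here are illustrative, not taken from rekx):

import netCDF4 as nc

with nc.Dataset("example.nc", mode="w") as dataset:
    dataset.createDimension("time", None)  # unlimited dimension
    dataset.createDimension("lat", 100)
    dataset.createDimension("lon", 100)
    # chunksizes fixes the HDF5 chunk shape, one size per dimension;
    # it cannot be combined with contiguous=True
    dataset.createVariable(
        "tas", "f4", ("time", "lat", "lon"),
        chunksizes=(365, 25, 25),
    )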

Source code in rekx/rechunk.py
def rechunk(
    input_filepath: Path,
    output_filepath: Path,
    time: int = None,
    lat: int = None,
    lon: int = None,
) -> None:
    """Rechunk data stored in a NetCDF4 file.

    Notes
    -----
    Text partially quoted from

    https://unidata.github.io/netcdf4-python/#netCDF4.Dataset.createVariable :

    The function `createVariable()`, available through the `netcdf4-python`
    Python interface to the netCDF C library, features the optional keyword
    `chunksizes` which can be used to manually specify the HDF5 chunk sizes
    for each dimension of the variable.

    A detailed discussion of HDF chunking and I/O performance is available at
    https://support.hdfgroup.org/HDF5/doc/Advanced/Chunking/. The default
    chunking scheme in the netcdf-c library is discussed at
    https://docs.unidata.ucar.edu/nug/current/netcdf_perf_chunking.html.

    Basically, the chunk size for each dimension should match as closely as
    possible the size of the data block that users will read from the file.
    `chunksizes` cannot be set if `contiguous=True`.
    """
    # Check if any chunking has been requested
    if time is None and lat is None and lon is None:
        logger.info(
            f"No chunking requested for {input_filepath}. Exiting function."
        )
        return

    # logger.info(f"Rechunking of {input_filepath} with chunk sizes: time={time}, lat={lat}, lon={lon}")
    new_chunks = {"time": time, "lat": lat, "lon": lon}
    with nc.Dataset(input_filepath, mode="r") as input_dataset:
        with nc.Dataset(output_filepath, mode="w") as output_dataset:
            for name in input_dataset.ncattrs():
                output_dataset.setncattr(name, input_dataset.getncattr(name))
            for name, dimension in input_dataset.dimensions.items():
                output_dataset.createDimension(
                    name, (len(dimension) if not dimension.isunlimited() else None)
                )
            for name, variable in input_dataset.variables.items():
                # logger.debug(f"Processing variable: {name}")
                if name in new_chunks:
                    chunk_size = new_chunks[name]
                    import dask.array as da

                    if chunk_size is not None:
                        # logger.debug(f"Chunking variable `{name}` with chunk sizes: {chunk_size}")
                        x = da.from_array(
                            variable, chunks=(chunk_size,) * len(variable.shape)
                        )
                        output_dataset.createVariable(
                            name,
                            variable.datatype,
                            variable.dimensions,
                            zlib=True,
                            complevel=4,
                            chunksizes=(chunk_size,) * len(variable.shape),
                        )
                        output_dataset[name].setncatts(input_dataset[name].__dict__)
                        output_dataset[name][:] = x
                    else:
                        # logger.debug(f"No chunk sizes specified for `{name}`, copying as is.")
                        output_dataset.createVariable(
                            name, variable.datatype, variable.dimensions
                        )
                        output_dataset[name].setncatts(input_dataset[name].__dict__)
                        output_dataset[name][:] = variable[:]
                else:
                    # logger.debug(f"Variable `{name}` not in chunking list, copying as is.")
                    output_dataset.createVariable(
                        name, variable.datatype, variable.dimensions
                    )
                    output_dataset[name].setncatts(input_dataset[name].__dict__)
                    output_dataset[name][:] = variable[:]
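A minimal usage sketch (paths and sizes hypothetical). Per the code above, only variables named time, lat, or lon are rewritten with the requested chunk sizes and zlib level-4 compression; all other variables are copied as-is:

from pathlib import Path

rechunk(
    input_filepath=Path("input.nc"),
    output_filepath=Path("output.nc"),
    time=365,
    lat=25,
    lon=25,
)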

RechunkingBackend

Bases: str, Enum

Methods:

Name Description
default

Default rechunking backend to use

get_backend

Array type associated to a backend.

default classmethod

default() -> RechunkingBackend

Default rechunking backend to use

Source code in rekx/rechunk.py
@classmethod
def default(cls) -> "RechunkingBackend":
    """Default rechunking backend to use"""
    return cls.nccopy

get_backend

get_backend() -> RechunkingBackendBase

Array type associated to a backend.

Source code in rekx/rechunk.py
def get_backend(self) -> RechunkingBackendBase:
    """Array type associated to a backend."""

    if self.name == "nccopy":
        return nccopyBackend()

    elif self.name == "netcdf4":
        return NetCDF4Backend()

    elif self.name == "xarray":
        return XarrayBackend()

    else:
        raise ValueError(f"No known backend for {self.name}.")
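For example, a minimal sketch resolving the default member to a concrete backend instance:

# default() returns RechunkingBackend.nccopy, whose backend class is nccopyBackend
backend = RechunkingBackend.default().get_backend()
assert isinstance(backend, nccopyBackend)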

XarrayBackend

Bases: RechunkingBackendBase

Methods:

Name Description
rechunk_netcdf_via_xarray

Rechunk a NetCDF dataset and save it to a new file.

rechunk_netcdf_via_xarray

rechunk_netcdf_via_xarray(
    input_filepath: Path,
    output_filepath: Path,
    time: int = None,
    latitude: int = None,
    longitude: int = None,
) -> None

Rechunk a NetCDF dataset and save it to a new file.

Parameters:

Name Type Description Default
input_filepath Path

The path to the input NetCDF file.

required
output_filepath Path

The path to the output NetCDF file where the rechunked dataset will be saved.

required
time int

New chunk size for the `time` dimension; None leaves it unchunked.

None
latitude int

New chunk size for the `lat` dimension; None leaves it unchunked.

None
longitude int

New chunk size for the `lon` dimension; None leaves it unchunked.

None

Returns:

Type Description
None

The function saves the rechunked dataset to output_filepath.

Examples:

>>> rechunk_netcdf_via_xarray(Path("input.nc"), Path("output.nc"), time=365, latitude=25, longitude=25)

Source code in rekx/rechunk.py
def rechunk_netcdf_via_xarray(
    input_filepath: Path,
    output_filepath: Path,
    time: int = None,
    latitude: int = None,
    longitude: int = None,
) -> None:
    """
    Rechunk a NetCDF dataset and save it to a new file.

    Parameters
    ----------
    input_filepath : Path
        The path to the input NetCDF file.
    output_filepath : Path
        The path to the output NetCDF file where the rechunked dataset will be saved.
    time : int, optional
        New chunk size for the `time` dimension; `None` leaves it unchunked.
    latitude : int, optional
        New chunk size for the `lat` dimension; `None` leaves it unchunked.
    longitude : int, optional
        New chunk size for the `lon` dimension; `None` leaves it unchunked.

    Returns
    -------
    None
        The function saves the rechunked dataset to `output_filepath`.

    Examples
    --------
    # >>> rechunk_netcdf_via_xarray(Path("input.nc"), Path("output.nc"), time=365, latitude=25, longitude=25)
    """
    dataset = xr.open_dataset(input_filepath)
    chunks = {"time": time, "lat": lat, "lon": lon}
    dataset_rechunked = dataset.chunk(chunks)
    dataset_rechunked.to_netcdf(output_filepath)
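Note that Dataset.chunk() sets in-memory Dask chunks; to also control the on-disk (HDF5) chunk shape, to_netcdf() accepts a per-variable encoding. A sketch under that assumption (variable name hypothetical):

import xarray as xr

dataset = xr.open_dataset("input.nc")
# encoding fixes the on-disk chunk shape, compression filter, and level
dataset.to_netcdf(
    "output.nc",
    encoding={"tas": {"chunksizes": (365, 25, 25), "zlib": True, "complevel": 4}},
)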

nccopyBackend

Bases: RechunkingBackendBase

Methods:

Name Description
rechunk

Options considered for nccopy:

rechunk

rechunk(
    input_filepath: Path,
    variables: List[str],
    output_directory: Path,
    time: Optional[int] = None,
    latitude: Optional[int] = None,
    longitude: Optional[int] = None,
    fix_unlimited_dimensions: bool = False,
    cache_size: Optional[int] = CACHE_SIZE_DEFAULT,
    cache_elements: Optional[int] = CACHE_ELEMENTS_DEFAULT,
    cache_preemption: Optional[
        float
    ] = CACHE_PREEMPTION_DEFAULT,
    compression: str = COMPRESSION_FILTER_DEFAULT,
    compression_level: int = COMPRESSION_LEVEL_DEFAULT,
    shuffling: bool = SHUFFLING_DEFAULT,
    memory: bool = RECHUNK_IN_MEMORY_DEFAULT,
    dry_run: bool = False,
)

Options considered for nccopy:

[ ] [-k kind_name]
[ ] [-kind_code]
[x] [-d n]  # deflate
[x] [-s]  # shuffling
[x] [-c chunkspec]  # chunking sizes
[x] [-u]  # convert unlimited size input dimensions to fixed size output dimensions; may speed up variable-at-a-time access, but slow down record-at-a-time access
[x] [-w]  # read and process data in-memory, write out in the end
[x] [-[v|V] var1,...]
[ ] [-[g|G] grp1,...]
[ ] [-m bufsize]
[x] [-h chunk_cache]  # chunk cache size in bytes
[x] [-e cache_elems]  # number of elements in cache
[ ] [-r]
[x] infile
[x] outfile

Source code in rekx/rechunk.py
def rechunk(
    self,
    input_filepath: Path,
    variables: List[str],
    output_directory: Path,
    time: Optional[int] = None,
    latitude: Optional[int] = None,
    longitude: Optional[int] = None,
    fix_unlimited_dimensions: bool = False,
    cache_size: Optional[int] = CACHE_SIZE_DEFAULT,
    cache_elements: Optional[int] = CACHE_ELEMENTS_DEFAULT,
    cache_preemption: Optional[float] = CACHE_PREEMPTION_DEFAULT,
    compression: str = COMPRESSION_FILTER_DEFAULT,
    compression_level: int = COMPRESSION_LEVEL_DEFAULT,
    shuffling: bool = SHUFFLING_DEFAULT,
    memory: bool = RECHUNK_IN_MEMORY_DEFAULT,
    dry_run: bool = False,  # return command as a string ?
):  # **kwargs):
    """
    Options considered for ``nccopy``:
    [ ] [-k kind_name]
    [ ] [-kind_code]
    [x] [-d n]  # deflate
    [x] [-s]  # shuffling
    [x] [-c chunkspec]  # chunking sizes
    [x] [-u] Convert unlimited size input dimensions to fixed size output dimensions. May speed up variable-at-a-time access, but slow down record-at-a-time access.
    [x] [-w]  # read and process data in-memory, write out in the end
    [x] [-[v|V] var1,...]
    [ ] [-[g|G] grp1,...]
    [ ] [-m bufsize]
    [x] [-h chunk_cache]  # chunk cache size in bytes
    [x] [-e cache_elems]  # Number of elements in cache
    [ ] [-r]
    [x] infile
    [x] outfile
    """
    variable_option = f"-v {','.join(variables + [XarrayVariableSet.time])}" if variables else "" # 'time' required
    chunking_shape = (
        f"-c time/{time},lat/{latitude},lon/{longitude}"
        if all([time, latitude, longitude])
        else ""
    )
    fixing_unlimited_dimensions = f"-u" if fix_unlimited_dimensions else ""
    compression_options = f"-d {compression_level}" if compression == "zlib" else ""
    shuffling_option = f"-s" if shuffling and compression_level > 0 else ""
    cache_size_option = f"-h {cache_size}" if cache_size else ""  # cache size in bytes
    cache_elements_option = f"-e {cache_elements}" if cache_elements else ""
    memory_option = f"-w" if memory else ""

    # Collect all non-empty options into a list
    options = [
        variable_option,
        chunking_shape,
        fixing_unlimited_dimensions,
        compression_options,
        shuffling_option,
        cache_size_option,
        cache_elements_option,
        memory_option,
        str(input_filepath),
    ]
    # Build the command by joining non-empty options
    command = "nccopy " + " ".join(filter(bool, options))

    # Build the output file path
    output_filename = f"{Path(input_filepath).stem}"
    output_filename += f"_{time}"
    output_filename += f"_{latitude}"
    output_filename += f"_{longitude}"
    output_filename += f"_{compression}"
    output_filename += f"_{compression_level}"
    if shuffling and compression_level > 0:
        output_filename += f"_shuffled"
    output_filename += f"{Path(input_filepath).suffix}"
    output_directory.mkdir(parents=True, exist_ok=True)
    output_filepath = output_directory / output_filename
    command += f"{output_filepath}"

    if dry_run:
        return command

    else:
        args = shlex.split(command)
        subprocess.run(args)
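With dry_run=True the assembled command is returned instead of executed. A representative result (the variable name and cache values are illustrative, since the CACHE_* and COMPRESSION_* defaults are defined elsewhere in rekx) would look like:

nccopy -v tas,time -c time/365,lat/25,lon/25 -u -d 4 -s -h 16777216 -e 4133 -w input.nc rechunked/input_365_25_25_zlib_4_shuffled.nc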

generate_rechunk_commands

generate_rechunk_commands(
    input_filepath: Path,
    output: Path | None,
    time: int | None,
    latitude: int | None,
    longitude: int | None,
    fix_unlimited_dimensions: bool = FIX_UNLIMITED_DIMENSIONS_DEFAULT,
    spatial_symmetry: bool = SPATIAL_SYMMETRY_DEFAULT,
    variable_set: XarrayVariableSet = XarrayVariableSet.all,
    cache_size: int = CACHE_SIZE_DEFAULT,
    cache_elements: int = CACHE_ELEMENTS_DEFAULT,
    cache_preemption: float = CACHE_PREEMPTION_DEFAULT,
    compression: str = COMPRESSION_FILTER_DEFAULT,
    compression_level: int = COMPRESSION_LEVEL_DEFAULT,
    shuffling: bool = SHUFFLING_DEFAULT,
    memory: bool = RECHUNK_IN_MEMORY_DEFAULT,
    dask_scheduler: str = None,
    commands_file: Path = "rechunk_commands.txt",
    dry_run: bool = False,
    verbose: int = VERBOSE_LEVEL_DEFAULT,
)

Generate variations of rechunking commands based on nccopy.

Source code in rekx/rechunk.py
def generate_rechunk_commands(
    input_filepath: Annotated[Path, typer.Argument(help="Input NetCDF file.")],
    output: Annotated[
        Path | None, typer.Argument(help="Path to the output NetCDF file.")
    ],
    time: Annotated[
        int | None,
        typer.Option(
            help="New chunk size for the `time` dimension.",
            parser=parse_numerical_option,
        ),
    ],
    latitude: Annotated[
        int | None,
        typer.Option(
            help="New chunk size for the `lat` dimension.",
            parser=parse_numerical_option,
        ),
    ],
    longitude: Annotated[
        int | None,
        typer.Option(
            help="New chunk size for the `lon` dimension.",
            parser=parse_numerical_option,
        ),
    ],
    fix_unlimited_dimensions: Annotated[
        bool,
        typer.Option(
            help="Convert unlimited size input dimensions to fixed size dimensions in output."
        ),
    ] = FIX_UNLIMITED_DIMENSIONS_DEFAULT,
    spatial_symmetry: Annotated[
        bool,
        typer.Option(
            help="Add command only for identical latitude and longitude chunk sizes"
        ),
    ] = SPATIAL_SYMMETRY_DEFAULT,
    variable_set: Annotated[
        XarrayVariableSet, typer.Option(help="Set of Xarray variables to diagnose")
    ] = XarrayVariableSet.all,
    cache_size: Annotated[
        int,
        typer.Option(
            help="Cache size", show_default=True, parser=parse_numerical_option
        ),
    ] = CACHE_SIZE_DEFAULT,
    cache_elements: Annotated[
        int,
        typer.Option(help="Number of elements in cache", parser=parse_numerical_option),
    ] = CACHE_ELEMENTS_DEFAULT,
    cache_preemption: Annotated[
        float,
        typer.Option(
            help=f"Cache preemption strategy {NOT_IMPLEMENTED_CLI}",
            parser=parse_float_option,
        ),
    ] = CACHE_PREEMPTION_DEFAULT,
    compression: Annotated[
        str, typer.Option(help="Compression filter", parser=parse_compression_filters)
    ] = COMPRESSION_FILTER_DEFAULT,
    compression_level: Annotated[
        int, typer.Option(help="Compression level", parser=parse_numerical_option)
    ] = COMPRESSION_LEVEL_DEFAULT,
    shuffling: Annotated[bool, typer.Option(help="Apply the shuffle filter before compression")] = SHUFFLING_DEFAULT,
    memory: Annotated[
        bool, typer.Option(help="Use the -w flag to nccopy")
    ] = RECHUNK_IN_MEMORY_DEFAULT,
    # backend: Annotated[RechunkingBackend, typer.Option(help="Backend to use for rechunking. [code]nccopy[/code] [red]Not Implemented Yet![/red]")] = RechunkingBackend.nccopy,
    dask_scheduler: Annotated[
        str, typer.Option(help="The port:ip of the dask scheduler")
    ] = None,
    commands_file: Path = "rechunk_commands.txt",
    dry_run: Annotated[bool, typer_option_dry_run] = False,
    verbose: Annotated[int, typer_option_verbose] = VERBOSE_LEVEL_DEFAULT,
):
    """
    Generate variations of rechunking commands based on `nccopy`.
    """
    # Shuffling makes sense only along with compression
    if any([level > 0 for level in compression_level]) and shuffling:
        shuffling = [shuffling, False]
    else:
        shuffling = [False]
    with xr.open_dataset(input_filepath, engine="netcdf4") as dataset:
        selected_variables = select_xarray_variable_set_from_dataset(
            XarrayVariableSet, variable_set, dataset
        )
        import itertools

        commands = []
        for (
            chunking_time,
            chunking_latitude,
            chunking_longitude,
            caching_size,
            caching_elements,
            caching_preemption,
            compressing_filter,
            compressing_level,
            shuffling,
        ) in itertools.product(
            time,
            latitude,
            longitude,
            cache_size,
            cache_elements,
            cache_preemption,
            compression,
            compression_level,
            shuffling,
        ):
            backend = RechunkingBackend.nccopy.get_backend()  # hard-coded!
            # Review Me ----------------------------------------------------
            if spatial_symmetry and chunking_latitude != chunking_longitude:
                continue
            else:
                command = backend.rechunk(
                    input_filepath=input_filepath,
                    variables=list(selected_variables),
                    output_directory=output,
                    time=chunking_time,
                    latitude=chunking_latitude,
                    longitude=chunking_longitude,
                    fix_unlimited_dimensions=fix_unlimited_dimensions,
                    cache_size=caching_size,
                    cache_elements=caching_elements,
                    cache_preemption=caching_preemption,
                    compression=compressing_filter,
                    compression_level=compressing_level,
                    shuffling=shuffling,
                    memory=memory,
                    dry_run=True,  # just return the command!
                )
                if command not in commands:
                    commands.append(command)

    commands_file = Path(
        commands_file.stem + "_for_" + Path(input_filepath).stem + commands_file.suffix
    )
    if verbose:
        print(
            f"[bold]Writing generated commands into[/bold] [code]{commands_file}[/code]"
        )
        for command in commands:
            print(f" [green]>[/green] [code dim]{command}[/code dim]")

    if not dry_run:
        with open(commands_file, "w") as f:
            for command in commands:
                f.write(command + "\n")
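Judging from the itertools.product call above, the chunking, caching, and compression options arrive as iterables after CLI parsing, so a direct Python call would pass lists (all values hypothetical):

from pathlib import Path

generate_rechunk_commands(
    input_filepath=Path("input.nc"),
    output=Path("rechunked"),
    time=[365, 730],
    latitude=[25, 50],
    longitude=[25, 50],
    cache_size=[16777216],
    cache_elements=[4133],
    cache_preemption=[0.75],
    compression=["zlib"],
    compression_level=[4],
    shuffling=True,
    spatial_symmetry=True,  # keep only lat == lon combinations
    commands_file=Path("rechunk_commands.txt"),
)

Given the renaming logic above, this would write the unique commands to rechunk_commands_for_input.txt.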

generate_rechunk_commands_for_multiple_netcdf

generate_rechunk_commands_for_multiple_netcdf(
    source_path: Path,
    time: int,
    latitude: int,
    longitude: int,
    fix_unlimited_dimensions: bool = FIX_UNLIMITED_DIMENSIONS_DEFAULT,
    pattern: str = "*.nc",
    output_directory: Path = Path("."),
    spatial_symmetry: bool = SPATIAL_SYMMETRY_DEFAULT,
    variable_set: XarrayVariableSet = XarrayVariableSet.all,
    cache_size: int = CACHE_SIZE_DEFAULT,
    cache_elements: int = CACHE_ELEMENTS_DEFAULT,
    cache_preemption: float = CACHE_PREEMPTION_DEFAULT,
    compression: str = COMPRESSION_FILTER_DEFAULT,
    compression_level: int = COMPRESSION_LEVEL_DEFAULT,
    shuffling: bool = SHUFFLING_DEFAULT,
    memory: bool = RECHUNK_IN_MEMORY_DEFAULT,
    dask_scheduler: str = None,
    commands_file: Path = "rechunk_commands.txt",
    workers: int = 4,
    dry_run: bool = False,
    verbose: int = VERBOSE_LEVEL_DEFAULT,
)

Generate variations of rechunking commands based on nccopy.

Source code in rekx/rechunk.py
def generate_rechunk_commands_for_multiple_netcdf(
    source_path: Annotated[Path, typer_argument_source_path_with_pattern],
    time: Annotated[
        int,
        typer.Option(
            help="New chunk size for the `time` dimension.",
            parser=parse_numerical_option,
        ),
    ],
    latitude: Annotated[
        int,
        typer.Option(
            help="New chunk size for the `lat` dimension.",
            parser=parse_numerical_option,
        ),
    ],
    longitude: Annotated[
        int,
        typer.Option(
            help="New chunk size for the `lon` dimension.",
            parser=parse_numerical_option,
        ),
    ],
    fix_unlimited_dimensions: Annotated[
        bool,
        typer.Option(
            help="Convert unlimited size input dimensions to fixed size dimensions in output."
        ),
    ] = FIX_UNLIMITED_DIMENSIONS_DEFAULT,
    pattern: Annotated[str, typer_option_filename_pattern] = "*.nc",
    output_directory: Annotated[Path, typer_option_output_directory] = Path('.'),
    spatial_symmetry: Annotated[
        bool,
        typer.Option(
            help="Add command only for identical latitude and longitude chunk sizes"
        ),
    ] = SPATIAL_SYMMETRY_DEFAULT,
    variable_set: Annotated[
        XarrayVariableSet, typer.Option(help="Set of Xarray variables to diagnose")
    ] = XarrayVariableSet.all,
    cache_size: Annotated[
        int,
        typer.Option(
            help="Cache size", show_default=True, parser=parse_numerical_option
        ),
    ] = CACHE_SIZE_DEFAULT,
    cache_elements: Annotated[
        int,
        typer.Option(help="Number of elements in cache", parser=parse_numerical_option),
    ] = CACHE_ELEMENTS_DEFAULT,
    cache_preemption: Annotated[
        float,
        typer.Option(
            help=f"Cache preemption strategy {NOT_IMPLEMENTED_CLI}",
            parser=parse_float_option,
        ),
    ] = CACHE_PREEMPTION_DEFAULT,
    compression: Annotated[
        str, typer.Option(help="Compression filter", parser=parse_compression_filters)
    ] = COMPRESSION_FILTER_DEFAULT,
    compression_level: Annotated[
        int, typer.Option(help="Compression level", parser=parse_numerical_option)
    ] = COMPRESSION_LEVEL_DEFAULT,
    shuffling: Annotated[
        bool,
        typer.Option(
            help=f"Shuffle... [reverse bold orange] Testing [/reverse bold orange]"
        ),
    ] = SHUFFLING_DEFAULT,
    memory: bool = RECHUNK_IN_MEMORY_DEFAULT,
    # backend: Annotated[RechunkingBackend, typer.Option(help="Backend to use for rechunking. [code]nccopy[/code] [red]Not Implemented Yet![/red]")] = RechunkingBackend.nccopy,
    dask_scheduler: Annotated[
        str, typer.Option(help="The port:ip of the dask scheduler")
    ] = None,
    commands_file: Path = "rechunk_commands.txt",
    workers: Annotated[int, typer.Option(help="Number of worker processes.")] = 4,
    dry_run: Annotated[bool, typer_option_dry_run] = False,
    verbose: Annotated[int, typer_option_verbose] = VERBOSE_LEVEL_DEFAULT,
):
    """
    Generate variations of rechunking commands based on `nccopy`.
    """
    input_file_paths = []
    if source_path.is_file():
        input_file_paths.append(source_path)
        print(f"[green]Identified the file in question![/green]")

    elif source_path.is_dir():
        input_file_paths = list(str(path) for path in source_path.glob(pattern))

    else:
        print(f'Something is wrong with the [code]source_path[/code] input.')
        return

    if not list(input_file_paths):
        print(
            f"No files found in [code]{source_path}[/code] matching the pattern [code]{pattern}[/code]!"
        )
        return

    if dry_run:
        print(f"[bold]Dry running operations that would be performed[/bold]:")
        print(f"> Reading files in [code]{source_path}[/code] matching the pattern [code]{pattern}[/code]")
        print(f"> Number of files matched : {len(list(input_file_paths))}")
        print(f"> Writing rechunking commands in [code]{commands_file}[/code]")
        return  # Exit for a dry run

    if input_file_paths and not output_directory.exists():
        output_directory.mkdir(parents=True, exist_ok=True)
        if verbose > 0:
            print(f"[yellow]Convenience action[/yellow] : creating the requested output directory [code]{output_directory}[/code].")
    with multiprocessing.Pool(processes=workers) as pool:
        partial_generate_rechunk_commands = partial(
                generate_rechunk_commands,
                output=output_directory,
                time=time,
                latitude=latitude,
                longitude=longitude,
                fix_unlimited_dimensions=fix_unlimited_dimensions,
                spatial_symmetry=spatial_symmetry,
                variable_set=variable_set,
                cache_size=cache_size,
                cache_elements=cache_elements,
                cache_preemption=cache_preemption,
                compression=compression,
                compression_level=compression_level,
                shuffling=shuffling,
                memory=memory,
                dask_scheduler=dask_scheduler,
                commands_file=commands_file,
                dry_run=dry_run,
                verbose=verbose,
        )
        pool.map(partial_generate_rechunk_commands, input_file_paths)
    if verbose:
        print(f"[bold green]Done![/bold green]")

modify_chunk_size

modify_chunk_size(netcdf_file, variable, chunk_size)

Modify the chunk size of a variable in a NetCDF file.

Parameters:

- netcdf_file: path to the NetCDF file
- variable: name of the variable to modify
- chunk_size: tuple specifying the new chunk size, e.g., (2600, 2600)

Source code in rekx/rechunk.py
def modify_chunk_size(
    netcdf_file,
    variable,
    chunk_size,
):
    """
    Modify the chunk size of a variable in a NetCDF file.

    Parameters:
    - netcdf_file: path to the NetCDF file
    - variable: name of the variable to modify
    - chunk_size: tuple specifying the new chunk size, e.g., (2600, 2600)
    """
    with nc.Dataset(netcdf_file, "r+") as dataset:
        variable_object = dataset.variables[variable]

        # chunking() returns the string "contiguous" for unchunked variables,
        # otherwise a list of per-dimension chunk sizes
        if variable_object.chunking() != "contiguous":
            variable_object.set_auto_chunking(chunk_size)
            print(
                f"Modified chunk size for variable '{variable}' in file '{netcdf_file}' to {chunk_size}."
            )

        else:
            print(
                f"Variable '{variable}' in file '{netcdf_file}' is not chunked. Skipping."
            )
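For inspecting the current chunking, netCDF4 exposes Variable.chunking(), which returns "contiguous" for unchunked variables and a list of per-dimension chunk sizes otherwise. A minimal sketch (file name hypothetical):

import netCDF4 as nc

with nc.Dataset("example.nc") as dataset:
    for name, variable in dataset.variables.items():
        print(name, variable.chunking())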

rechunk

rechunk(
    input_filepath: Path,
    output_directory: Optional[Path],
    time: int,
    latitude: int,
    longitude: int,
    fix_unlimited_dimensions: bool = FIX_UNLIMITED_DIMENSIONS_DEFAULT,
    variable_set: XarrayVariableSet = XarrayVariableSet.all,
    cache_size: Optional[int] = CACHE_SIZE_DEFAULT,
    cache_elements: Optional[int] = CACHE_ELEMENTS_DEFAULT,
    cache_preemption: Optional[
        float
    ] = CACHE_PREEMPTION_DEFAULT,
    compression: str = COMPRESSION_FILTER_DEFAULT,
    compression_level: int = COMPRESSION_LEVEL_DEFAULT,
    shuffling: str = SHUFFLING_DEFAULT,
    memory: bool = RECHUNK_IN_MEMORY_DEFAULT,
    dry_run: bool = DRY_RUN_DEFAULT,
    backend: RechunkingBackend = RechunkingBackend.nccopy,
    dask_scheduler: str = None,
    verbose: int = VERBOSE_LEVEL_DEFAULT,
)

Rechunk a NetCDF4 dataset with options to fine-tune the output.

Source code in rekx/rechunk.py
def rechunk(
    input_filepath: Annotated[Path, typer.Argument(help="Input NetCDF file.")],
    output_directory: Annotated[
        Optional[Path], typer.Argument(help="Path to the output NetCDF file.")
    ],
    time: Annotated[int, typer.Option(help="New chunk size for the `time` dimension.")],
    latitude: Annotated[
        int, typer.Option(help="New chunk size for the `lat` dimension.")
    ],
    longitude: Annotated[
        int, typer.Option(help="New chunk size for the `lon` dimension.")
    ],
    fix_unlimited_dimensions: Annotated[
        bool, typer.Option(help="Convert unlimited size input dimensions to fixed size dimensions in output.")
    ] = FIX_UNLIMITED_DIMENSIONS_DEFAULT,
    variable_set: Annotated[
        XarrayVariableSet, typer.Option(help="Set of Xarray variables to diagnose")
    ] = XarrayVariableSet.all,
    cache_size: Optional[int] = CACHE_SIZE_DEFAULT,
    cache_elements: Optional[int] = CACHE_ELEMENTS_DEFAULT,
    cache_preemption: Optional[float] = CACHE_PREEMPTION_DEFAULT,
    compression: str = COMPRESSION_FILTER_DEFAULT,
    compression_level: int = COMPRESSION_LEVEL_DEFAULT,
    shuffling: str = SHUFFLING_DEFAULT,
    memory: bool = RECHUNK_IN_MEMORY_DEFAULT,
    dry_run: Annotated[bool, typer_option_dry_run] = DRY_RUN_DEFAULT,
    backend: Annotated[
        RechunkingBackend,
        typer.Option(
            help="Backend to use for rechunking. [code]nccopy[/code] [red]Not Implemented Yet![/red]"
        ),
    ] = RechunkingBackend.nccopy,
    dask_scheduler: Annotated[
        str, typer.Option(help="The port:ip of the dask scheduler")
    ] = None,
    verbose: Annotated[int, typer_option_verbose] = VERBOSE_LEVEL_DEFAULT,
):
    """
    Rechunk a NetCDF4 dataset with options to fine-tune the output.
    """
    if verbose:
        import time as timer

        rechunking_timer_start = timer.time()

    # if dask_scheduler:
    #     from dask.distributed import Client
    #     client = Client(dask_scheduler)
    #     typer.echo(f"Using Dask scheduler at {dask_scheduler}")

    with xr.open_dataset(input_filepath, engine="netcdf4") as dataset:
        # with Dataset(input, 'r') as dataset:
        selected_variables = select_xarray_variable_set_from_dataset(
            XarrayVariableSet, variable_set, dataset
        )
        rechunk_parameters = {
            "input": input,
            "variables": selected_variables,
            "output_directory": output_directory,
            "time": time,
            "latitude": latitude,
            "longitude": longitude,
            "fix_unlimited_dimensions": fix_unlimited_dimensions,
            "cache_size": cache_size,
            "cache_elements": cache_elements,
            "cache_preemption": cache_preemption,
            "shuffling": shuffling,
            "compression": compression,
            "compression_level": compression_level,
            "memory": memory,
        }
        backend = backend.get_backend()
        command = backend.rechunk(**rechunk_parameters, dry_run=dry_run)
        if dry_run:
            print(
                f"[bold]Dry run[/bold] the [bold]following command that would be executed[/bold]:"
            )
            print(f"    {command}")
            # print(f"    {rechunk_parameters}")
            return  # Exit for a dry run

        else:
            command_arguments = shlex.split(command)
            try:
                subprocess.run(command_arguments, check=True)
                print(f"Command {command} executed successfully.")
            except subprocess.CalledProcessError as e:
                print(f"An error occurred while executing the command: {e}")

        if verbose:
            rechunking_timer_end = timer.time()
            elapsed_time = rechunking_timer_end - rechunking_timer_start
            logger.debug(f"Rechunking via {backend} took {elapsed_time:.2f} seconds")
            print(f"Rechunking took {elapsed_time:.2f} seconds.")