
rechunk

NetCDF4Backend

Bases: RechunkingBackendBase

rechunk

rechunk(
    input_filepath: Path,
    output_filepath: Path,
    time: Optional[int] = None,
    lat: Optional[int] = None,
    lon: Optional[int] = None,
) -> None

Rechunk data stored in a NetCDF4 file.

Notes

Text partially quoted from https://unidata.github.io/netcdf4-python/#netCDF4.Dataset.createVariable :

The function createVariable(), available through the netcdf4-python interface to the netCDF C library, features the optional keyword chunksizes, which can be used to manually specify the HDF5 chunk sizes for each dimension of the variable.

A detailed discussion of HDF chunking and I/O performance is available at https://support.hdfgroup.org/HDF5/doc/Advanced/Chunking/. The default chunking scheme in the netcdf-c library is discussed at https://docs.unidata.ucar.edu/nug/current/netcdf_perf_chunking.html.

Basically, the chunk size for each dimension should match as closely as possible the size of the data block that users will read from the file. chunksizes cannot be set if contiguous=True.
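For illustration, a minimal sketch of creating a chunked, compressed variable with createVariable() (file name, dimension sizes, and chunk shape are hypothetical):

import netCDF4 as nc
import numpy as np

with nc.Dataset("example.nc", mode="w") as dataset:
    dataset.createDimension("time", None)  # unlimited dimension
    dataset.createDimension("lat", 2600)
    dataset.createDimension("lon", 2600)
    # Each HDF5 chunk spans 1 time step and a 100 x 100 spatial block
    dataset.createVariable(
        "temperature",
        np.float32,
        ("time", "lat", "lon"),
        zlib=True,
        complevel=4,
        chunksizes=(1, 100, 100),
    )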

Source code in rekx/rechunk.py
def rechunk(
    self,
    input_filepath: Path,
    output_filepath: Path,
    time: Optional[int] = None,
    lat: Optional[int] = None,
    lon: Optional[int] = None,
) -> None:
    """Rechunk data stored in a NetCDF4 file.

    Notes
    -----
    Text partially quoted from

    https://unidata.github.io/netcdf4-python/#netCDF4.Dataset.createVariable :

    The function `createVariable()`, available through the `netcdf4-python`
    interface to the netCDF C library, features the optional keyword
    `chunksizes`, which can be used to manually specify the HDF5 chunk sizes
    for each dimension of the variable.

    A detailed discussion of HDF chunking and I/O performance is available at
    https://support.hdfgroup.org/HDF5/doc/Advanced/Chunking/. The default
    chunking scheme in the netcdf-c library is discussed at
    https://docs.unidata.ucar.edu/nug/current/netcdf_perf_chunking.html.

    Basically, the chunk size for each dimension should match as closely as
    possible the size of the data block that users will read from the file.
    `chunksizes` cannot be set if `contiguous=True`.
    """
    # Check if any chunking has been requested
    if time is None and lat is None and lon is None:
        logger.info(
            f"No chunking requested for {input_filepath}. Exiting function."
        )
        return

    # logger.info(f"Rechunking of {input_filepath} with chunk sizes: time={time}, lat={lat}, lon={lon}")
    new_chunks = {"time": time, "lat": lat, "lon": lon}
    with nc.Dataset(input_filepath, mode="r") as input_dataset:
        with nc.Dataset(output_filepath, mode="w") as output_dataset:
            for name in input_dataset.ncattrs():
                output_dataset.setncattr(name, input_dataset.getncattr(name))
            for name, dimension in input_dataset.dimensions.items():
                output_dataset.createDimension(
                    name, (len(dimension) if not dimension.isunlimited() else None)
                )
            for name, variable in input_dataset.variables.items():
                # logger.debug(f"Processing variable: {name}")
                if name in new_chunks:
                    chunk_size = new_chunks[name]
                    if chunk_size is not None:
                        # logger.debug(f"Chunking variable `{name}` with chunk sizes: {chunk_size}")
                        x = da.from_array(
                            variable, chunks=(chunk_size,) * len(variable.shape)
                        )
                        output_dataset.createVariable(
                            name,
                            variable.datatype,
                            variable.dimensions,
                            zlib=True,
                            complevel=4,
                            chunksizes=(chunk_size,) * len(variable.shape),
                        )
                        output_dataset[name].setncatts(input_dataset[name].__dict__)
                        output_dataset[name][:] = x
                    else:
                        # logger.debug(f"No chunk sizes specified for `{name}`, copying as is.")
                        output_dataset.createVariable(
                            name, variable.datatype, variable.dimensions
                        )
                        output_dataset[name].setncatts(input_dataset[name].__dict__)
                        output_dataset[name][:] = variable[:]
                else:
                    # logger.debug(f"Variable `{name}` not in chunking list, copying as is.")
                    output_dataset.createVariable(
                        name, variable.datatype, variable.dimensions
                    )
                    output_dataset[name].setncatts(input_dataset[name].__dict__)
                    output_dataset[name][:] = variable[:]
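
A minimal usage sketch (assuming the class is importable from rekx.rechunk; file names and chunk sizes are hypothetical):

from pathlib import Path

from rekx.rechunk import NetCDF4Backend

NetCDF4Backend().rechunk(
    input_filepath=Path("input.nc"),
    output_filepath=Path("rechunked.nc"),
    time=365,
    lat=25,
    lon=25,
)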

RechunkingBackend

Bases: str, Enum

default classmethod

default() -> RechunkingBackend

Default rechunking backend to use

Source code in rekx/rechunk.py
@classmethod
def default(cls) -> "RechunkingBackend":
    """Default rechunking backend to use"""
    return cls.nccopy

get_backend

get_backend() -> RechunkingBackendBase

Backend implementation associated with an enumeration member.

Source code in rekx/rechunk.py
def get_backend(self) -> RechunkingBackendBase:
    """Array type associated to a backend."""

    if self.name == "nccopy":
        return nccopyBackend()

    elif self.name == "netcdf4":
        return NetCDF4Backend()

    elif self.name == "xarray":
        return XarrayBackend()

    else:
        raise ValueError(f"No known backend for {self.name}.")
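
For instance, a sketch of the dispatch (assuming rekx.rechunk is importable):

from rekx.rechunk import RechunkingBackend

# Resolve the default backend (nccopy) to its implementation class
backend = RechunkingBackend.default().get_backend()  # an nccopyBackend instance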

XarrayBackend

Bases: RechunkingBackendBase

rechunk_netcdf_via_xarray

rechunk_netcdf_via_xarray(
    input_filepath: Path,
    output_filepath: Path,
    time: Optional[int] = None,
    latitude: Optional[int] = None,
    longitude: Optional[int] = None,
) -> None

Rechunk a NetCDF dataset and save it to a new file.

Parameters:

Name Type Description Default
input_filepath Path

The path to the input NetCDF file.

required
output_filepath Path

The path to the output NetCDF file where the rechunked dataset will be saved.

required
time Optional[int]

New chunk size for the `time` dimension, or None to leave it unchunked.

None
latitude Optional[int]

New chunk size for the `lat` dimension, or None to leave it unchunked.

None
longitude Optional[int]

New chunk size for the `lon` dimension, or None to leave it unchunked.

None

Returns:

Type Description
None

The function saves the rechunked dataset to output_filepath.

Examples:

>>> rechunk_netcdf_via_xarray(Path("input.nc"), Path("output.nc"), time=365, latitude=25, longitude=25)

Source code in rekx/rechunk.py
def rechunk_netcdf_via_xarray(
    input_filepath: Path,
    output_filepath: Path,
    time: Optional[int] = None,
    latitude: Optional[int] = None,
    longitude: Optional[int] = None,
) -> None:
    """
    Rechunk a NetCDF dataset and save it to a new file.

    Parameters
    ----------
    input_filepath : Path
        The path to the input NetCDF file.
    output_filepath : Path
        The path to the output NetCDF file where the rechunked dataset will be saved.
    time : Optional[int]
        New chunk size for the `time` dimension, or `None` to leave it unchunked.
    latitude : Optional[int]
        New chunk size for the `lat` dimension, or `None` to leave it unchunked.
    longitude : Optional[int]
        New chunk size for the `lon` dimension, or `None` to leave it unchunked.

    Returns
    -------
    None
        The function saves the rechunked dataset to `output_filepath`.

    Examples
    --------
    >>> rechunk_netcdf_via_xarray(Path("input.nc"), Path("output.nc"), time=365, latitude=25, longitude=25)
    """
    dataset = xr.open_dataset(input_filepath)
    chunks = {"time": time, "lat": lat, "lon": lon}
    dataset_rechunked = dataset.chunk(chunks)
    dataset_rechunked.to_netcdf(output_filepath)
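
Note that Dataset.chunk() only sets the in-memory Dask chunking; to control the on-disk HDF5 chunk sizes, to_netcdf() needs an explicit encoding. A sketch under that assumption (variable name, chunk sizes, and file names are hypothetical):

import xarray as xr

dataset = xr.open_dataset("input.nc")
encoding = {
    "temperature": {  # hypothetical variable name
        "chunksizes": (365, 25, 25),
        "zlib": True,
        "complevel": 4,
    }
}
dataset.to_netcdf("output.nc", encoding=encoding)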

nccopyBackend

Bases: RechunkingBackendBase

rechunk

rechunk(
    input: Path,
    variables: List[str],
    output_directory: Path,
    time: Optional[int] = None,
    latitude: Optional[int] = None,
    longitude: Optional[int] = None,
    cache_size: Optional[int] = 16777216,
    cache_elements: Optional[int] = 4133,
    cache_preemption: Optional[float] = 0.75,
    compression: str = "zlib",
    compression_level: int = 4,
    shuffling: Optional[bool] = None,
    memory: bool = False,
    dry_run: bool = False,
)

Options considered for nccopy :

[ ] [-k kind_name]
[ ] [-kind_code]
[x] [-d n]  # deflate
[x] [-s]  # shuffling
[x] [-c chunkspec]  # chunking sizes
[ ] [-u]
[x] [-w]  # read and process data in-memory, write out in the end
[x] [-[v|V] var1,...]
[ ] [-[g|G] grp1,...]
[ ] [-m bufsize]
[x] [-h chunk_cache]  # chunk cache size in bytes
[x] [-e cache_elems]  # number of elements in cache
[ ] [-r]
[x] infile
[x] outfile
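For example, with time=365, latitude=25, longitude=25, zlib compression at level 4, and shuffling and in-memory processing enabled, the method assembles a command along these lines (file names hypothetical):

nccopy -c time/365,lat/25,lon/25 -d 4 -s -h 16777216 -e 4133 -w input.nc rechunked/input_365_25_25_zlib_4_shuffled.nc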

Source code in rekx/rechunk.py
def rechunk(
    self,
    input: Path,
    variables: List[str],
    output_directory: Path,
    time: Optional[int] = None,
    latitude: Optional[int] = None,
    longitude: Optional[int] = None,
    cache_size: Optional[int] = 16777216,
    cache_elements: Optional[int] = 4133,
    cache_preemption: Optional[float] = 0.75,
    compression: str = "zlib",
    compression_level: int = 4,
    shuffling: Optional[bool] = None,
    memory: bool = False,
    dry_run: bool = False,  # return command as a string ?
):  # **kwargs):
    """
    Options considered for ``nccopy`` :
    [ ] [-k kind_name]
    [ ] [-kind_code]
    [x] [-d n]  # deflate
    [x] [-s]  # shuffling
    [x] [-c chunkspec]  # chunking sizes
    [ ] [-u]
    [x] [-w]  # read and process data in-memory, write out in the end
    [x] [-[v|V] var1,...]
    [ ] [-[g|G] grp1,...]
    [ ] [-m bufsize]
    [x] [-h chunk_cache]  # chunk cache size in bytes
    [x] [-e cache_elems]  # Number of elements in cache
    [ ] [-r]
    [x] infile
    [x] outfile
    """
    chunking_shape = (
        f"-c time/{time},lat/{latitude},lon/{longitude}"
        if all([time, latitude, longitude])
        else ""
    )
    compression_options = f"-d {compression_level}" if compression == "zlib" else ""
    shuffling_option = f"-s" if shuffling and compression_level > 0 else ""
    # --------------------------------------------------------------------
    cache_size = f"-h {cache_size} " if cache_size else ""  # cache size in bytes
    cache_elements = f"-e {cache_elements}" if cache_elements else ""
    # cache_preemption = f"-e {cache_preemption}" if cache_preemption else ""
    cache_options = cache_size + cache_elements  # + cache_preemption
    memory_option = f"-w" if memory else ""

    # build the command
    command = "nccopy "
    # if variable_option:
    #     variable_option = f"-v {','.join(variables + [XarrayVariableSet.time])}"  # 'time' required
    #     command += f"{variable_option} "
    command += f"{chunking_shape} "
    command += f"{compression_options} "
    command += f"{shuffling_option} "
    command += f"{cache_options} "
    command += f"{memory_option} "
    command += f"{input} "
    output_filename = f"{input.stem}"
    output_filename += f"_{time}"
    output_filename += f"_{latitude}"
    output_filename += f"_{longitude}"
    output_filename += f"_{compression}"
    output_filename += f"_{compression_level}"
    if shuffling and compression_level > 0:
        output_filename += "_shuffled"
    output_filename += input.suffix
    output_directory.mkdir(parents=True, exist_ok=True)
    output_filepath = output_directory / output_filename
    command += f"{output_filepath}"

    if dry_run:
        return command

    else:
        args = shlex.split(command)
        subprocess.run(args)
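
A dry-run sketch that only returns the assembled command string (assuming rekx.rechunk is importable; paths and the variable name are hypothetical):

from pathlib import Path

from rekx.rechunk import nccopyBackend

command = nccopyBackend().rechunk(
    input=Path("input.nc"),
    variables=["temperature"],  # hypothetical variable name
    output_directory=Path("rechunked"),
    time=365,
    latitude=25,
    longitude=25,
    shuffling=True,
    dry_run=True,  # return the command instead of running nccopy
)
print(command)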

generate_rechunk_commands

generate_rechunk_commands(
    input: Optional[Path],
    output: Optional[Path],
    time: int,
    latitude: int,
    longitude: int,
    spatial_symmetry: bool = SPATIAL_SYMMETRY_DEFAULT,
    variable_set: XarrayVariableSet = XarrayVariableSet.all,
    cache_size: int = CACHE_SIZE_DEFAULT,
    cache_elements: int = CACHE_ELEMENTS_DEFAULT,
    cache_preemption: float = CACHE_PREEMPTION_DEFAULT,
    compression: str = COMPRESSION_FILTER_DEFAULT,
    compression_level: int = COMPRESSION_LEVEL_DEFAULT,
    shuffling: bool = SHUFFLING_DEFAULT,
    memory: bool = RECHUNK_IN_MEMORY_DEFAULT,
    dask_scheduler: Optional[str] = None,
    commands_file: Path = Path("rechunk_commands.txt"),
    dry_run: bool = False,
    verbose: int = VERBOSE_LEVEL_DEFAULT,
)

Generate variations of rechunking commands based on nccopy.

Source code in rekx/rechunk.py
def generate_rechunk_commands(
    input: Annotated[Optional[Path], typer.Argument(help="Input NetCDF file.")],
    output: Annotated[
        Optional[Path], typer.Argument(help="Path to the output directory.")
    ],
    time: Annotated[
        int,
        typer.Option(
            help="New chunk size for the `time` dimension.",
            parser=parse_numerical_option,
        ),
    ],
    latitude: Annotated[
        int,
        typer.Option(
            help="New chunk size for the `lat` dimension.",
            parser=parse_numerical_option,
        ),
    ],
    longitude: Annotated[
        int,
        typer.Option(
            help="New chunk size for the `lon` dimension.",
            parser=parse_numerical_option,
        ),
    ],
    spatial_symmetry: Annotated[
        bool,
        typer.Option(
            help="Add command only for identical latitude and longitude chunk sizes"
        ),
    ] = SPATIAL_SYMMETRY_DEFAULT,
    variable_set: Annotated[
        XarrayVariableSet, typer.Option(help="Set of Xarray variables to diagnose")
    ] = XarrayVariableSet.all,
    cache_size: Annotated[
        int,
        typer.Option(
            help="Cache size", show_default=True, parser=parse_numerical_option
        ),
    ] = CACHE_SIZE_DEFAULT,
    cache_elements: Annotated[
        int,
        typer.Option(help="Number of elements in cache", parser=parse_numerical_option),
    ] = CACHE_ELEMENTS_DEFAULT,
    cache_preemption: Annotated[
        float,
        typer.Option(
            help=f"Cache preemption strategy {NOT_IMPLEMENTED_CLI}",
            parser=parse_float_option,
        ),
    ] = CACHE_PREEMPTION_DEFAULT,
    compression: Annotated[
        str, typer.Option(help="Compression filter", parser=parse_compression_filters)
    ] = COMPRESSION_FILTER_DEFAULT,
    compression_level: Annotated[
        int, typer.Option(help="Compression level", parser=parse_numerical_option)
    ] = COMPRESSION_LEVEL_DEFAULT,
    shuffling: Annotated[bool, typer.Option(help=f"Shuffle... ")] = SHUFFLING_DEFAULT,
    memory: Annotated[
        bool, typer.Option(help="Use the -w flag to nccopy")
    ] = RECHUNK_IN_MEMORY_DEFAULT,
    # backend: Annotated[RechunkingBackend, typer.Option(help="Backend to use for rechunking. [code]nccopy[/code] [red]Not Implemented Yet![/red]")] = RechunkingBackend.nccopy,
    dask_scheduler: Annotated[
        Optional[str], typer.Option(help="The ip:port of the dask scheduler")
    ] = None,
    commands_file: Path = Path("rechunk_commands.txt"),
    dry_run: Annotated[bool, typer_option_dry_run] = False,
    verbose: Annotated[int, typer_option_verbose] = VERBOSE_LEVEL_DEFAULT,
):
    """
    Generate variations of rechunking commands based on `nccopy`.
    """
    # Shuffling makes sense only along with compression.
    # Note : the CLI option parsers yield sequences of values, hence the
    # iteration over `compression_level` here and itertools.product() below.
    if any([level > 0 for level in compression_level]) and shuffling:
        shuffling = [shuffling, False]
    else:
        shuffling = [False]
    with xr.open_dataset(input, engine="netcdf4") as dataset:
        selected_variables = select_xarray_variable_set_from_dataset(
            XarrayVariableSet, variable_set, dataset
        )
        import itertools

        commands = []
        for (
            chunking_time,
            chunking_latitude,
            chunking_longitude,
            caching_size,
            caching_elements,
            caching_preemption,
            compressing_filter,
            compressing_level,
            shuffling_flag,
        ) in itertools.product(
            time,
            latitude,
            longitude,
            cache_size,
            cache_elements,
            cache_preemption,
            compression,
            compression_level,
            shuffling,
        ):
            backend = RechunkingBackend.nccopy.get_backend()  # hard-coded!
            # Review Me ----------------------------------------------------
            if spatial_symmetry and chunking_latitude != chunking_longitude:
                continue
            else:
                command = backend.rechunk(
                    input=input,
                    variables=list(selected_variables),
                    output_directory=output,
                    time=chunking_time,
                    latitude=chunking_latitude,
                    longitude=chunking_longitude,
                    cache_size=caching_size,
                    cache_elements=caching_elements,
                    cache_preemption=caching_preemption,
                    compression=compressing_filter,
                    compression_level=compressing_level,
                    shuffling=shuffling_flag,
                    memory=memory,
                    dry_run=True,  # just return the command!
                )
                if command not in commands:
                    commands.append(command)

    commands_file = Path(
        commands_file.stem + "_for_" + input.stem + commands_file.suffix
    )
    if verbose:
        print(
            f"[bold]Writing generated commands into[/bold] [code]{commands_file}[/code]"
        )
        for command in commands:
            print(f"{command}")

    if not dry_run:
        with open(commands_file, "w") as f:
            for command in commands:
                f.write(command + "\n")
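
A sketch of calling the command function directly from Python; since the option parsers yield sequences, the chunking, caching, and compression parameters are passed as lists (all values and file names are hypothetical):

from pathlib import Path

from rekx.rechunk import generate_rechunk_commands

generate_rechunk_commands(
    input=Path("input.nc"),
    output=Path("rechunked"),
    time=[365, 730],
    latitude=[25],
    longitude=[25],
    cache_size=[16777216],
    cache_elements=[4133],
    cache_preemption=[0.75],
    compression=["zlib"],
    compression_level=[4],
    shuffling=True,
    commands_file=Path("rechunk_commands.txt"),
    dry_run=True,  # do not write the commands file
    verbose=1,  # print the generated commands
)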

generate_rechunk_commands_for_multiple_netcdf

generate_rechunk_commands_for_multiple_netcdf(
    file_paths: List[Path],
    output: Optional[Path],
    time: int,
    latitude: int,
    longitude: int,
    spatial_symmetry: bool = SPATIAL_SYMMETRY_DEFAULT,
    variable_set: XarrayVariableSet = XarrayVariableSet.all,
    cache_size: int = CACHE_SIZE_DEFAULT,
    cache_elements: int = CACHE_ELEMENTS_DEFAULT,
    cache_preemption: float = CACHE_PREEMPTION_DEFAULT,
    compression: str = COMPRESSION_FILTER_DEFAULT,
    compression_level: int = COMPRESSION_LEVEL_DEFAULT,
    shuffling: bool = SHUFFLING_DEFAULT,
    memory: bool = RECHUNK_IN_MEMORY_DEFAULT,
    dask_scheduler: Optional[str] = None,
    commands_file: Path = Path("rechunk_commands.txt"),
    dry_run: bool = False,
    verbose: int = VERBOSE_LEVEL_DEFAULT,
)

Generate variations of rechunking commands based on nccopy.

Source code in rekx/rechunk.py
def generate_rechunk_commands_for_multiple_netcdf(
    file_paths: Annotated[List[Path], typer.Argument(help="Input NetCDF files.")],
    output: Annotated[
        Optional[Path], typer.Argument(help="Path to the output directory.")
    ],
    time: Annotated[
        int,
        typer.Option(
            help="New chunk size for the `time` dimension.",
            parser=parse_numerical_option,
        ),
    ],
    latitude: Annotated[
        int,
        typer.Option(
            help="New chunk size for the `lat` dimension.",
            parser=parse_numerical_option,
        ),
    ],
    longitude: Annotated[
        int,
        typer.Option(
            help="New chunk size for the `lon` dimension.",
            parser=parse_numerical_option,
        ),
    ],
    spatial_symmetry: Annotated[
        bool,
        typer.Option(
            help="Add command only for identical latitude and longitude chunk sizes"
        ),
    ] = SPATIAL_SYMMETRY_DEFAULT,
    variable_set: Annotated[
        XarrayVariableSet, typer.Option(help="Set of Xarray variables to diagnose")
    ] = XarrayVariableSet.all,
    cache_size: Annotated[
        int,
        typer.Option(
            help="Cache size", show_default=True, parser=parse_numerical_option
        ),
    ] = CACHE_SIZE_DEFAULT,
    cache_elements: Annotated[
        int,
        typer.Option(help="Number of elements in cache", parser=parse_numerical_option),
    ] = CACHE_ELEMENTS_DEFAULT,
    cache_preemption: Annotated[
        float,
        typer.Option(
            help=f"Cache preemption strategy {NOT_IMPLEMENTED_CLI}",
            parser=parse_float_option,
        ),
    ] = CACHE_PREEMPTION_DEFAULT,
    compression: Annotated[
        str, typer.Option(help="Compression filter", parser=parse_compression_filters)
    ] = COMPRESSION_FILTER_DEFAULT,
    compression_level: Annotated[
        int, typer.Option(help="Compression level", parser=parse_numerical_option)
    ] = COMPRESSION_LEVEL_DEFAULT,
    shuffling: Annotated[
        bool,
        typer.Option(help="Apply shuffling (makes sense only along with compression)"),
    ] = SHUFFLING_DEFAULT,
    memory: bool = RECHUNK_IN_MEMORY_DEFAULT,
    # backend: Annotated[RechunkingBackend, typer.Option(help="Backend to use for rechunking. [code]nccopy[/code] [red]Not Implemented Yet![/red]")] = RechunkingBackend.nccopy,
    dask_scheduler: Annotated[
        Optional[str], typer.Option(help="The ip:port of the dask scheduler")
    ] = None,
    commands_file: Path = Path("rechunk_commands.txt"),
    dry_run: Annotated[bool, typer_option_dry_run] = False,
    verbose: Annotated[int, typer_option_verbose] = VERBOSE_LEVEL_DEFAULT,
):
    """
    Generate variations of rechunking commands based on `nccopy`.
    """
    with ProcessPoolExecutor() as executor:
        futures = [
            executor.submit(
                generate_rechunk_commands,
                file_path,  # input
                output,
                time,
                latitude,
                longitude,
                spatial_symmetry,
                variable_set,
                cache_size,
                cache_elements,
                cache_preemption,
                compression,
                compression_level,
                shuffling,
                memory,
                dask_scheduler,
                commands_file,
                dry_run,
                verbose,
            )
            for file_path in file_paths
        ]
        for future in as_completed(futures):
            try:
                future.result()

            except Exception as e:
                logger.error(f"Error processing : {e}")

modify_chunk_size

modify_chunk_size(netcdf_file, variable, chunk_size)

Modify the chunk size of a variable in a NetCDF file.

Parameters:

- netcdf_file : path to the NetCDF file
- variable : name of the variable to modify
- chunk_size : tuple specifying the new chunk size, e.g., (2600, 2600)

Source code in rekx/rechunk.py
def modify_chunk_size(
    netcdf_file,
    variable,
    chunk_size,
):
    """
    Modify the chunk size of a variable in a NetCDF file.

    Parameters:
    - netcdf_file: path to the NetCDF file
    - variable: name of the variable to modify
    - chunk_size: tuple specifying the new chunk size, e.g., (2600, 2600)
    """
    with nc.Dataset(netcdf_file, "r+") as dataset:
        dataset_variable = dataset.variables[variable]

        if dataset_variable.chunking() != [None]:
            dataset_variable.set_auto_chunking(chunk_size)
            print(
                f"Modified chunk size for variable '{variable}' in file '{netcdf_file}' to {chunk_size}."
            )

        else:
            print(
                f"Variable '{variable}' in file '{netcdf_file}' is not chunked. Skipping."
            )

rechunk

rechunk(
    input: Optional[Path],
    output_directory: Optional[Path],
    time: int,
    latitude: int,
    longitude: int,
    variable_set: XarrayVariableSet = XarrayVariableSet.all,
    cache_size: Optional[int] = CACHE_SIZE_DEFAULT,
    cache_elements: Optional[int] = CACHE_ELEMENTS_DEFAULT,
    cache_preemption: Optional[float] = CACHE_PREEMPTION_DEFAULT,
    compression: str = COMPRESSION_FILTER_DEFAULT,
    compression_level: int = COMPRESSION_LEVEL_DEFAULT,
    shuffling: bool = SHUFFLING_DEFAULT,
    memory: bool = RECHUNK_IN_MEMORY_DEFAULT,
    dry_run: bool = DRY_RUN_DEFAULT,
    backend: RechunkingBackend = RechunkingBackend.nccopy,
    dask_scheduler: Optional[str] = None,
    verbose: int = VERBOSE_LEVEL_DEFAULT,
)

Rechunk a NetCDF4 dataset with options to fine-tune the output

Source code in rekx/rechunk.py
def rechunk(
    input: Annotated[Optional[Path], typer.Argument(help="Input NetCDF file.")],
    output_directory: Annotated[
        Optional[Path], typer.Argument(help="Path to the output directory.")
    ],
    time: Annotated[int, typer.Option(help="New chunk size for the `time` dimension.")],
    latitude: Annotated[
        int, typer.Option(help="New chunk size for the `lat` dimension.")
    ],
    longitude: Annotated[
        int, typer.Option(help="New chunk size for the `lon` dimension.")
    ],
    variable_set: Annotated[
        XarrayVariableSet, typer.Option(help="Set of Xarray variables to diagnose")
    ] = XarrayVariableSet.all,
    cache_size: Optional[int] = CACHE_SIZE_DEFAULT,
    cache_elements: Optional[int] = CACHE_ELEMENTS_DEFAULT,
    cache_preemption: Optional[float] = CACHE_PREEMPTION_DEFAULT,
    compression: str = COMPRESSION_FILTER_DEFAULT,
    compression_level: int = COMPRESSION_LEVEL_DEFAULT,
    shuffling: bool = SHUFFLING_DEFAULT,
    memory: bool = RECHUNK_IN_MEMORY_DEFAULT,
    dry_run: Annotated[bool, typer_option_dry_run] = DRY_RUN_DEFAULT,
    backend: Annotated[
        RechunkingBackend,
        typer.Option(
            help="Backend to use for rechunking. [code]nccopy[/code] [red]Not Implemented Yet![/red]"
        ),
    ] = RechunkingBackend.nccopy,
    dask_scheduler: Annotated[
        Optional[str], typer.Option(help="The ip:port of the dask scheduler")
    ] = None,
    verbose: Annotated[int, typer_option_verbose] = VERBOSE_LEVEL_DEFAULT,
):
    """
    Rechunk a NetCDF4 dataset with options to fine tune the output
    """
    if verbose:
        import time as timer

        rechunking_timer_start = timer.time()

    # if dask_scheduler:
    #     from dask.distributed import Client
    #     client = Client(dask_scheduler)
    #     typer.echo(f"Using Dask scheduler at {dask_scheduler}")

    with xr.open_dataset(input, engine="netcdf4") as dataset:
        # with Dataset(input, 'r') as dataset:
        selected_variables = select_xarray_variable_set_from_dataset(
            XarrayVariableSet, variable_set, dataset
        )
        rechunk_parameters = {
            "input": input,
            "variables": selected_variables,
            "output_directory": output_directory,
            "time": time,
            "latitude": latitude,
            "longitude": longitude,
            "cache_size": cache_size,
            "cache_elements": cache_elements,
            "cache_preemption": cache_preemption,
            "shuffling": shuffling,
            "compression": compression,
            "compression_level": compression_level,
            "memory": memory,
        }
        backend = backend.get_backend()
        command = backend.rechunk(**rechunk_parameters, dry_run=dry_run)
        if dry_run:
            print(
                "[bold]Dry run[/bold] of the command that would be executed:"
            )
            print(f"    {command}")
            # print(f"    {rechunk_parameters}")
            return  # Exit for a dry run

        else:
            command_arguments = shlex.split(command)
            try:
                subprocess.run(command_arguments, check=True)
                print(f"Command {command} executed successfully.")
            except subprocess.CalledProcessError as e:
                print(f"An error occurred while executing the command: {e}")

        if verbose:
            rechunking_timer_end = timer.time()
            elapsed_time = rechunking_timer_end - rechunking_timer_start
            logger.debug(f"Rechunking via {backend} took {elapsed_time:.2f} seconds")
            print(f"Rechunking took {elapsed_time:.2f} seconds.")