Select

select

read_performance

read_performance(
    time_series: Path,
    variable: str,
    longitude: float,
    latitude: float,
    tolerance: Optional[
        float
    ] = DATASET_SELECT_TOLERANCE_DEFAULT,
    repetitions: int = REPETITIONS_DEFAULT,
) -> str

Measure the average time of repeated read and load operations of the time series data over a geographic location from an Xarray-supported file format.

Parameters:

time_series (Path, required)
    Path to Xarray-supported input file
variable (str, required)
    Name of the variable to query
longitude (float, required)
    The longitude of the location to read data
latitude (float, required)
    The latitude of the location to read data
tolerance (Optional[float], default: DATASET_SELECT_TOLERANCE_DEFAULT)
    Maximum distance between original and new labels for inexact matches.
    See the Xarray manual on nearest-neighbor lookups.
repetitions (int, default: REPETITIONS_DEFAULT)
    Number of times to repeat the read operation

Returns:

data_retrieval_time (str)
    The average time of repeated operations it took to retrieve data over the requested location

Notes

mask_and_scale is always set to False to avoid errors related to decoding timestamps. See also ...
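
A minimal usage sketch, assuming the function is importable from rekx.select; the file name, variable, and coordinates are hypothetical:

    from pathlib import Path

    from rekx.select import read_performance

    # Hypothetical input file, variable and location
    average_time = read_performance(
        time_series=Path("tas_2020.nc"),
        variable="tas",
        longitude=8.4,
        latitude=48.0,
        repetitions=10,
    )
    print(average_time)  # e.g. "0.042" (seconds, formatted as a string)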

Source code in rekx/select.py
def read_performance(
    time_series: Annotated[Path, typer_argument_time_series],
    variable: Annotated[str, typer.Argument(help="Variable to select data from")],
    longitude: Annotated[float, typer_argument_longitude_in_degrees],
    latitude: Annotated[float, typer_argument_latitude_in_degrees],
    # window: Annotated[int, typer_option_spatial_window_in_degrees] = None,
    tolerance: Annotated[
        Optional[float], typer_option_tolerance
    ] = DATASET_SELECT_TOLERANCE_DEFAULT,
    repetitions: Annotated[int, typer_option_repetitions] = REPETITIONS_DEFAULT,
) -> str:
    """
    Measure the average time of repeated read and load operations of the time
    series data over a geographic location from an Xarray-supported file
    format.

    Parameters
    ----------
    time_series:
        Path to Xarray-supported input file
    variable: str
        Name of the variable to query
    longitude: float
        The longitude of the location to read data
    latitude: float
        The latitude of the location to read data
    # window:
    tolerance: float
        Maximum distance between original and new labels for inexact matches.
        See the Xarray manual on nearest-neighbor lookups
    repetitions: int
        Number of times to repeat read operation

    Returns
    -------
    data_retrieval_time : str
        The average time of repeated operations it took to retrieve data over
        the requested location

    Notes
    -----
    ``mask_and_scale`` is always set to ``False`` to avoid errors related to
    decoding timestamps. See also ...

    """
    from .models import get_file_format

    file_format = get_file_format(time_series)
    open_dataset_options = file_format.open_dataset_options()
    dataset_select_options = file_format.dataset_select_options(tolerance)

    # indexers = set_location_indexers(
    #     data_array=time_series,
    #     longitude=longitude,
    #     latitude=latitude,
    #     verbose=verbose,
    # )
    try:
        timings = []
        for _ in range(repetitions):
            data_retrieval_start_time = timer.perf_counter()
            with xr.open_dataset(str(time_series), **open_dataset_options) as dataset:
                _ = (
                    dataset[variable]
                    .sel(
                        lon=longitude,
                        lat=latitude,
                        method="nearest",
                        **dataset_select_options,
                    )
                    .load()
                )
            timings.append(timer.perf_counter() - data_retrieval_start_time)

        average_data_retrieval_time = sum(timings) / len(timings)
        return f"{average_data_retrieval_time:.3f}"

    except Exception as exception:
        print(
            f"Cannot open [code]{variable}[/code] from [code]{time_series}[/code] via Xarray: {exception}"
        )
        # raise SystemExit(33)
        return "-"

read_performance_cli

read_performance_cli(
    time_series: Path,
    variable: str,
    longitude: float,
    latitude: float,
    tolerance: Optional[
        float
    ] = DATASET_SELECT_TOLERANCE_DEFAULT,
    repetitions: int = REPETITIONS_DEFAULT,
    verbose: int = VERBOSE_LEVEL_DEFAULT,
) -> str

Command line interface to read_performance() to measure the time to read and load data over a geographic location from an Xarray-supported file format.

Parameters:

time_series (Path, required)
    Path to Xarray-supported input file
variable (str, required)
    Name of the variable to query
longitude (float, required)
    The longitude of the location to read data
latitude (float, required)
    The latitude of the location to read data
tolerance (Optional[float], default: DATASET_SELECT_TOLERANCE_DEFAULT)
    Maximum distance between original and new labels for inexact matches.
    See the Xarray manual on nearest-neighbor lookups.
repetitions (int, default: REPETITIONS_DEFAULT)
    Number of times to repeat the read operation
verbose (int, default: VERBOSE_LEVEL_DEFAULT)
    Verbosity level

Returns:

data_retrieval_time (str)
    The average time it took to retrieve data over the requested location

Notes

mask_and_scale is always set to False to avoid errors related to decoding timestamps.
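
A hedged sketch of calling the wrapper directly from Python instead of through the command line; the file name and coordinates are hypothetical:

    from pathlib import Path

    from rekx.select import read_performance_cli

    # Prints the average retrieval time; with verbose > 0 a formatted message is printed instead
    read_performance_cli(
        time_series=Path("tas_2020.nc"),
        variable="tas",
        longitude=8.4,
        latitude=48.0,
        repetitions=5,
        verbose=0,
    )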

Source code in rekx/select.py
def read_performance_cli(
    time_series: Annotated[Path, typer_argument_time_series],
    variable: Annotated[str, typer.Argument(help="Variable to select data from")],
    longitude: Annotated[float, typer_argument_longitude_in_degrees],
    latitude: Annotated[float, typer_argument_latitude_in_degrees],
    tolerance: Annotated[
        Optional[float], typer_option_tolerance
    ] = DATASET_SELECT_TOLERANCE_DEFAULT,
    repetitions: Annotated[int, typer_option_repetitions] = REPETITIONS_DEFAULT,
    verbose: Annotated[int, typer_option_verbose] = VERBOSE_LEVEL_DEFAULT,
) -> str:
    """
    Command line interface to `read_performance()` to measure the time to read
    and load data over a geographic location from an Xarray-supported file
    format.

    Parameters
    ----------
    time_series:
        Path to Xarray-supported input file
    variable: str
        Name of the variable to query
    longitude: float
        The longitude of the location to read data
    latitude: float
        The latitude of the location to read data
    tolerance: float
        Maximum distance between original and new labels for inexact matches.
        See the Xarray manual on nearest-neighbor lookups
    repetitions: int
        Number of times to repeat read operation
    verbose: int
        Verbosity level

    Returns
    -------
    data_retrieval_time : str
        The average time it took to retrieve data over the requested location

    Notes
    -----
    ``mask_and_scale`` is always set to ``False`` to avoid errors related to
    decoding timestamps.

    """
    average_data_retrieval_time = read_performance(
        time_series=time_series,
        variable=variable,
        longitude=longitude,
        latitude=latitude,
        tolerance=tolerance,
        repetitions=repetitions,
    )
    if not verbose:
        print(average_data_retrieval_time)
    else:
        print(
            f"[bold green]Data read in memory in[/bold green] : {average_data_retrieval_time} :high_voltage::high_voltage:"
        )

select_fast

select_fast(
    time_series: Path,
    variable: str,
    longitude: float,
    latitude: float,
    time_series_2: Path = None,
    tolerance: Optional[float] = 0.1,
    csv: Path = None,
    tocsv: Path = None,
    verbose: int = VERBOSE_LEVEL_DEFAULT,
)

Bare timing to read data over a location and optionally write comma-separated values.

Parameters:

time_series (Path, required)
    Path to Xarray-supported input file
variable (str, required)
    Name of the variable to query
longitude (float, required)
    The longitude of the location to read data
latitude (float, required)
    The latitude of the location to read data
time_series_2 (Path, default: None)
    Path to a second Xarray-supported input file
tolerance (Optional[float], default: 0.1)
    Maximum distance between original and new labels for inexact matches.
    See the Xarray manual on nearest-neighbor lookups.
csv (Path, default: None)
    CSV output filename
tocsv (Path, default: None)
    CSV output filename (fast implementation from xarray-extras)
verbose (int, default: VERBOSE_LEVEL_DEFAULT)
    Verbosity level

Returns:

data_retrieval_time (str)
    The time it took to retrieve data over the requested location, returned only when verbosity is not requested

Notes

mask_and_scale is always set to False to avoid errors related to decoding timestamps.
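
A minimal sketch, assuming a hypothetical NetCDF file and writing the selected series with the fast xarray-extras writer:

    from pathlib import Path

    from rekx.select import select_fast

    # Returns the elapsed time as a string when verbosity is not requested
    elapsed = select_fast(
        time_series=Path("tas_2020.nc"),
        variable="tas",
        longitude=8.4,
        latitude=48.0,
        tocsv=Path("tas_series.csv"),
        verbose=0,
    )
    print(elapsed)  # e.g. "0.120"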

Source code in rekx/select.py
def select_fast(
    time_series: Annotated[Path, typer_argument_time_series],
    variable: Annotated[str, typer.Argument(help="Variable to select data from")],
    longitude: Annotated[float, typer_argument_longitude_in_degrees],
    latitude: Annotated[float, typer_argument_latitude_in_degrees],
    time_series_2: Annotated[Path, typer_option_time_series] = None,
    tolerance: Annotated[
        Optional[float], typer_option_tolerance
    ] = 0.1,  # Customize default if needed
    # in_memory: Annotated[bool, typer_option_in_memory] = False,
    csv: Annotated[Path, typer_option_csv] = None,
    tocsv: Annotated[Path, typer_option_csv] = None,
    verbose: Annotated[int, typer_option_verbose] = VERBOSE_LEVEL_DEFAULT,
):
    """Bare timing to read data over a location and optionally write
    comma-separated values.

    Parameters
    ----------
    time_series:
        Path to Xarray-supported input file
    variable: str
        Name of the variable to query
    longitude: float
        The longitude of the location to read data
    latitude: float
        The latitude of the location to read data
    time_series_2:
        Path to second Xarray-supported input file
    tolerance: float
        Maximum distance between original and new labels for inexact matches.
        Read Xarray manual on nearest-neighbor-lookups
    csv:
        CSV output filename
    tocsv:
        CSV output filename (fast implementation from xarray-extras)

    Returns
    -------
    data_retrieval_time : str
        The time it took to retrieve data over the requested location,
        returned only when verbosity is not requested.

    Notes
    -----
    ``mask_and_scale`` is always set to ``False`` to avoid errors related to
    decoding timestamps.

    """
    try:
        data_retrieval_start_time = timer.perf_counter()  # time()
        series = xr.open_dataset(time_series, mask_and_scale=False)[variable].sel(
            lon=longitude, lat=latitude, method="nearest"
        )
        if time_series_2:
            series_2 = xr.open_dataset(time_series_2, mask_and_scale=False)[
                variable
            ].sel(lon=longitude, lat=latitude, method="nearest")
        if csv:
            series.to_pandas().to_csv(csv)
            if time_series_2:
                series_2.to_pandas().to_csv(str(csv) + "2")
        elif tocsv:
            to_csv(
                x=series,
                path=str(tocsv),
            )
            if time_series_2:
                to_csv(x=series_2, path=str(tocsv) + "2")

        data_retrieval_time = f"{timer.perf_counter() - data_retrieval_start_time:.3f}"
        if not verbose:
            return data_retrieval_time
        else:
            print(
                f"[bold green]It worked[/bold green] and took : {data_retrieval_time}"
            )

    except Exception as e:
        print(f"An error occurred: {e}")

select_time_series

select_time_series(
    time_series: Path,
    variable: str,
    longitude: float,
    latitude: float,
    list_variables: bool = False,
    timestamps: Optional[Any] = None,
    start_time: Optional[datetime] = None,
    end_time: Optional[datetime] = None,
    time: Optional[int] = None,
    lat: Optional[int] = None,
    lon: Optional[int] = None,
    mask_and_scale: bool = False,
    neighbor_lookup: MethodForInexactMatches = MethodForInexactMatches.nearest,
    tolerance: Optional[float] = 0.1,
    in_memory: bool = False,
    statistics: bool = False,
    csv: Path = None,
    verbose: int = VERBOSE_LEVEL_DEFAULT,
) -> None

Select time series data over a geographic location from an Xarray-supported file

Parameters:

time_series (Path, required)
    Path to Xarray-supported input file
variable (str, required)
    Name of the variable to query
longitude (float, required)
    The longitude of the location to read data
latitude (float, required)
    The latitude of the location to read data
list_variables (bool, default: False)
    Optional flag to list data variables and exit without doing anything else
timestamps (Optional[Any], default: None)
    A string of properly formatted timestamps to be parsed and used for temporal selection
start_time (Optional[datetime], default: None)
    A start time to generate a temporal selection period
end_time (Optional[datetime], default: None)
    An end time for the generation of a temporal selection period
time (Optional[int], default: None)
    New chunk size for the 'time' dimension
lat (Optional[int], default: None)
    New chunk size for the 'lat' dimension
lon (Optional[int], default: None)
    New chunk size for the 'lon' dimension
mask_and_scale (bool, default: False)
    Flag to apply masking and scaling based on the input metadata
neighbor_lookup (MethodForInexactMatches, default: nearest)
    Method to use for inexact matches
tolerance (Optional[float], default: 0.1)
    Maximum distance between original and new labels for inexact matches.
    See the Xarray manual on nearest-neighbor lookups.
in_memory (bool, default: False)
    Optional flag to load the selected data into memory
statistics (bool, default: False)
    Optional flag to calculate and display summary statistics
csv (Path, default: None)
    CSV output filename
verbose (int, default: VERBOSE_LEVEL_DEFAULT)
    Verbosity level
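
A usage sketch that selects one month of data over a location and writes it to CSV; the file name, variable, and coordinates are hypothetical:

    from datetime import datetime
    from pathlib import Path

    from rekx.select import select_time_series

    # Print the January series over a location and also write it to CSV
    select_time_series(
        time_series=Path("tas_2020.nc"),
        variable="tas",
        longitude=8.4,
        latitude=48.0,
        start_time=datetime(2020, 1, 1),
        end_time=datetime(2020, 1, 31),
        csv=Path("tas_january.csv"),
        verbose=1,
    )
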
Source code in rekx/select.py
def select_time_series(
    time_series: Path,
    variable: Annotated[str, typer.Argument(..., help="Variable name to select from")],
    longitude: Annotated[float, typer_argument_longitude_in_degrees],
    latitude: Annotated[float, typer_argument_latitude_in_degrees],
    list_variables: Annotated[bool, typer_option_list_variables] = False,
    timestamps: Annotated[Optional[Any], typer_argument_timestamps] = None,
    start_time: Annotated[Optional[datetime], typer_option_start_time] = None,
    end_time: Annotated[Optional[datetime], typer_option_end_time] = None,
    time: Annotated[
        Optional[int], typer.Option(help="New chunk size for the 'time' dimension")
    ] = None,
    lat: Annotated[
        Optional[int], typer.Option(help="New chunk size for the 'lat' dimension")
    ] = None,
    lon: Annotated[
        Optional[int], typer.Option(help="New chunk size for the 'lon' dimension")
    ] = None,
    # convert_longitude_360: Annotated[bool, typer_option_convert_longitude_360] = False,
    mask_and_scale: Annotated[bool, typer_option_mask_and_scale] = False,
    neighbor_lookup: Annotated[
        MethodForInexactMatches, typer_option_neighbor_lookup
    ] = MethodForInexactMatches.nearest,
    tolerance: Annotated[
        Optional[float], typer_option_tolerance
    ] = 0.1,  # Customize default if needed
    in_memory: Annotated[bool, typer_option_in_memory] = False,
    statistics: Annotated[bool, typer_option_statistics] = False,
    csv: Annotated[Path, typer_option_csv] = None,
    # output_filename: Annotated[Path, typer_option_output_filename] = 'series_in',  #Path(),
    # variable_name_as_suffix: Annotated[bool, typer_option_variable_name_as_suffix] = True,
    # rounding_places: Annotated[Optional[int], typer_option_rounding_places] = ROUNDING_PLACES_DEFAULT,
    verbose: Annotated[int, typer_option_verbose] = VERBOSE_LEVEL_DEFAULT,
) -> None:
    """
    Select time series data over a geographic location from an Xarray-supported file

    Parameters
    ----------
    time_series:
        Path to Xarray-supported input file
    variable: str
        Name of the variable to query
    longitude: float
        The longitude of the location to read data
    latitude: float
        The latitude of the location to read data
    list_variables: bool
         Optional flag to list data variables and exit without doing anything
         else.
    timestamps: str
        A string of properly formatted timestamps to be parsed and used for
        temporal selection.
    start_time: str
        A start time to generate a temporal selection period
    end_time: str
        An end time for the generation of a temporal selection period
    time: int
        New chunk size for the 'time' dimension
    lat: int
        New chunk size for the 'lat' dimension
    lon: int
        New chunk size for the 'lon' dimension
    mask_and_scale: bool
        Flag to apply masking and scaling based on the input metadata
    neighbor_lookup: str
        Method to use for inexact matches.
    tolerance: float
        Maximum distance between original and new labels for inexact matches.
        See the Xarray manual on nearest-neighbor lookups
    statistics: bool
        Optional flag to calculate and display summary statistics
    verbose: int
        Verbosity level

    Returns
    -------

    """
    # if convert_longitude_360:
    #     longitude = longitude % 360
    # warn_for_negative_longitude(longitude)

    logger.debug(f"Command context : {typer.Context}")

    data_retrieval_start_time = timer.time()
    logger.debug(f"Starting data retrieval... {data_retrieval_start_time}")

    timer_start = timer.time()
    dataset = xr.open_dataset(
        time_series,
        mask_and_scale=mask_and_scale,
    )  # is a dataset
    timer_end = timer.time()
    logger.debug(
        f"Dataset opening via Xarray took {timer_end - timer_start:.2f} seconds"
    )

    available_variables = list(dataset.data_vars)  # Is there a faster way ?
    if list_variables:
        logger.info(
            f"The dataset contains the following variables : `{available_variables}`."
        )
        print(
            f"The dataset contains the following variables : `{available_variables}`."
        )
        return

    if variable not in available_variables:
        logger.debug(
            f"The requested variable `{variable}` does not exist! Please select one among the available variables : {available_variables}."
        )
        print(
            f"The requested variable `{variable}` does not exist! Please select one among the available variables : {available_variables}."
        )
        raise typer.Exit(code=0)
    else:
        timer_start = timer.time()
        time_series = dataset[variable]
        timer_end = timer.time()
        logger.debug(
            f"Data array variable selection took {timer_end - timer_start:.2f} seconds"
        )

        timer_start = timer.time()
        chunks = {"time": time, "lat": lat, "lon": lon}
        time_series = time_series.chunk(chunks=chunks)
        timer_end = timer.time()
        logger.debug(
            f"Data array rechunking took {timer_end - timer_start:.2f} seconds"
        )

    timer_start = timer.time()
    indexers = set_location_indexers(
        data_array=time_series,
        longitude=longitude,
        latitude=latitude,
        verbose=verbose,
    )
    timer_end = timer.time()
    logger.debug(f"Data array indexers : {indexers}")
    logger.debug(
        f"Data array indexers setting took {timer_end - timer_start:.2f} seconds"
    )

    try:
        timer_start = timer.time()
        location_time_series = time_series.sel(
            **indexers,
            method=neighbor_lookup,
            tolerance=tolerance,
        )
        timer_end = timer.time()
        indentation = " " * 4 * 9
        indented_location_time_series = "\n".join(
            indentation + line for line in str(location_time_series).split("\n")
        )
        logger.debug(
            f"Location time series selection :\n{indented_location_time_series}"
        )
        logger.debug(f"Location selection took {timer_end - timer_start:.2f} seconds")

        if in_memory:
            timer_start = timer.time()
            location_time_series.load()  # load into memory for faster ... ?
            timer_end = timer.time()
            logger.debug(
                f"Location selection loading in memory took {timer_end - timer_start:.2f} seconds"
            )

    except Exception as exception:
        logger.error(f"{ERROR_IN_SELECTING_DATA} : {exception}")
        print(f"{ERROR_IN_SELECTING_DATA} : {exception}")
        raise SystemExit(33)
    # ------------------------------------------------------------------------

    if start_time or end_time:
        timestamps = None  # we don't need a timestamp anymore!

        if start_time and not end_time:  # set `end_time` to end of series
            end_time = location_time_series.time.values[-1]

        elif end_time and not start_time:  # set `start_time` to beginning of series
            start_time = location_time_series.time.values[0]

        else:  # Convert `start_time` & `end_time` to the correct string format
            start_time = start_time.strftime("%Y-%m-%d %H:%M:%S")
            end_time = end_time.strftime("%Y-%m-%d %H:%M:%S")

        timer_start = timer.time()
        location_time_series = location_time_series.sel(
            time=slice(start_time, end_time)
        )
        timer_end = timer.time()
        logger.debug(
            f"Time slicing with `start_time` and `end_time` took {timer_end - timer_start:.2f} seconds"
        )

    if timestamps is not None and not start_time and not end_time:
        if len(timestamps) == 1:
            start_time = end_time = timestamps[0]

        try:
            timer_start = timer.time()
            location_time_series = location_time_series.sel(
                time=timestamps, method=neighbor_lookup
            )
            timer_end = timer.time()
            logger.debug(
                f"Time selection with `timestamps` took {timer_end - timer_start:.2f} seconds"
            )

        except KeyError:
            print(f"No data found for one or more of the given {timestamps}.")

    if location_time_series.size == 1:
        timer_start = timer.time()
        single_value = float(location_time_series.values)
        warning = (
            f"{exclamation_mark} The selected timestamp "
            + f"{location_time_series.time.values}"
            + f" matches the single value "
            + f"{single_value}"
        )
        timer_end = timer.time()
        logger.debug(
            f"Single value conversion to float took {timer_end - timer_start:.2f} seconds"
        )
        logger.warning(warning)
        if verbose > 0:
            print(warning)

    data_retrieval_end_time = timer.time()
    logger.debug(
        f"Data retrieval took {data_retrieval_end_time - data_retrieval_start_time:.2f} seconds"
    )

    if not verbose:
        print(location_time_series.values)
    else:
        print(location_time_series)

    if statistics:  # after echoing series which might be Long!
        print_series_statistics(
            data_array=location_time_series,
            title="Selected series",
        )
    if csv:
        to_csv(
            x=location_time_series,
            path=csv,
        )

select_time_series_from_json

select_time_series_from_json(
    reference_file: Path,
    variable: str,
    longitude: float,
    latitude: float,
    list_variables: bool = False,
    timestamps: Optional[Any] = None,
    start_time: Optional[datetime] = None,
    end_time: Optional[datetime] = None,
    time: Optional[int] = None,
    lat: Optional[int] = None,
    lon: Optional[int] = None,
    mask_and_scale: bool = False,
    neighbor_lookup: MethodForInexactMatches = None,
    tolerance: Optional[float] = 0.1,
    in_memory: bool = False,
    statistics: bool = False,
    csv: Path = None,
    verbose: int = VERBOSE_LEVEL_DEFAULT,
) -> None

Select data using a Kerchunk reference file

Parameters:

reference_file (Path, required)
    Path to an input JSON Kerchunk reference file
variable (str, required)
    Name of the variable to query
longitude (float, required)
    The longitude of the location to read data
latitude (float, required)
    The latitude of the location to read data
list_variables (bool, default: False)
    Optional flag to list data variables and exit without doing anything else
timestamps (Optional[Any], default: None)
    A string of properly formatted timestamps to be parsed and used for temporal selection
start_time (Optional[datetime], default: None)
    A start time to generate a temporal selection period
end_time (Optional[datetime], default: None)
    An end time for the generation of a temporal selection period
time (Optional[int], default: None)
    New chunk size for the 'time' dimension
lat (Optional[int], default: None)
    New chunk size for the 'lat' dimension
lon (Optional[int], default: None)
    New chunk size for the 'lon' dimension
mask_and_scale (bool, default: False)
    Flag to apply masking and scaling based on the input metadata
neighbor_lookup (MethodForInexactMatches, default: None)
    Method to use for inexact matches
tolerance (Optional[float], default: 0.1)
    Maximum distance between original and new labels for inexact matches.
    See the Xarray manual on nearest-neighbor lookups.
in_memory (bool, default: False)
    Optional flag to load the selected data into memory
statistics (bool, default: False)
    Optional flag to calculate and display summary statistics
csv (Path, default: None)
    CSV output filename
verbose (int, default: VERBOSE_LEVEL_DEFAULT)
    Verbosity level
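
A usage sketch that reads a location series through a Kerchunk JSON reference; the reference file, variable, and coordinates are hypothetical, and the plain string "nearest" stands in for the MethodForInexactMatches enum member:

    from pathlib import Path

    from rekx.select import select_time_series_from_json

    # Print the series and its summary statistics for one location
    select_time_series_from_json(
        reference_file=Path("tas_2020_kerchunk.json"),
        variable="tas",
        longitude=8.4,
        latitude=48.0,
        neighbor_lookup="nearest",  # stand-in for MethodForInexactMatches.nearest
        statistics=True,
        verbose=1,
    )
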
Source code in rekx/select.py
def select_time_series_from_json(
    reference_file: Annotated[
        Path, typer.Argument(..., help="Path to the kerchunk reference file")
    ],
    variable: Annotated[str, typer.Argument(..., help="Variable name to select from")],
    longitude: Annotated[float, typer_argument_longitude_in_degrees],
    latitude: Annotated[float, typer_argument_latitude_in_degrees],
    list_variables: Annotated[bool, typer_option_list_variables] = False,
    timestamps: Annotated[Optional[Any], typer_argument_timestamps] = None,
    start_time: Annotated[Optional[datetime], typer_option_start_time] = None,
    end_time: Annotated[Optional[datetime], typer_option_end_time] = None,
    time: Annotated[
        Optional[int], typer.Option(help="New chunk size for the 'time' dimension")
    ] = None,
    lat: Annotated[
        Optional[int], typer.Option(help="New chunk size for the 'lat' dimension")
    ] = None,
    lon: Annotated[
        Optional[int], typer.Option(help="New chunk size for the 'lon' dimension")
    ] = None,
    # convert_longitude_360: Annotated[bool, typer_option_convert_longitude_360] = False,
    mask_and_scale: Annotated[bool, typer_option_mask_and_scale] = False,
    neighbor_lookup: Annotated[
        MethodForInexactMatches, typer_option_neighbor_lookup
    ] = None,
    tolerance: Annotated[
        Optional[float], typer_option_tolerance
    ] = 0.1,  # Customize default if needed
    in_memory: Annotated[bool, typer_option_in_memory] = False,
    statistics: Annotated[bool, typer_option_statistics] = False,
    csv: Annotated[Path, typer_option_csv] = None,
    # output_filename: Annotated[Path, typer_option_output_filename] = 'series_in',  #Path(),
    # variable_name_as_suffix: Annotated[bool, typer_option_variable_name_as_suffix] = True,
    # rounding_places: Annotated[Optional[int], typer_option_rounding_places] = ROUNDING_PLACES_DEFAULT,
    verbose: Annotated[int, typer_option_verbose] = VERBOSE_LEVEL_DEFAULT,
) -> None:
    """
    Select data using a Kerchunk reference file

    Parameters
    ----------
    reference_file:
        Path to an input JSON Kerchunk reference file
    variable: str
        Name of the variable to query
    longitude: float
        The longitude of the location to read data
    latitude: float
        The latitude of the location to read data
    list_variables: bool
         Optional flag to list data variables and exit without doing anything
         else.
    timestamps: str
        A string of properly formatted timestamps to be parsed and used for
        temporal selection.
    start_time: str
        A start time to generate a temporal selection period
    end_time: str
        An end time for the generation of a temporal selection period
    time: int
        New chunk size for the 'time' dimension
    lat: int
        New chunk size for the 'lat' dimension
    lon: int
        New chunk size for the 'lon' dimension
    mask_and_scale: bool
        Flag to apply masking and scaling based on the input metadata
    neighbor_lookup: str
        Method to use for inexact matches.
    tolerance: float
        Maximum distance between original and new labels for inexact matches.
        See the Xarray manual on nearest-neighbor lookups
    in_memory: bool
        Optional flag to load the selected data into memory
    statistics: bool
        Optional flag to calculate and display summary statistics
    csv:
        CSV output filename
    verbose: int
        Verbosity level
    """
    # if convert_longitude_360:
    #     longitude = longitude % 360
    # warn_for_negative_longitude(longitude)

    # logger.debug(f'Command context : {print(typer.Context)}')

    data_retrieval_start_time = timer.time()
    logger.debug(f"Starting data retrieval... {data_retrieval_start_time}")

    timer_start = timer.time()
    mapper = fsspec.get_mapper(
        "reference://",
        fo=str(reference_file),
        remote_protocol="file",
        remote_options={"skip_instance_cache": True},
    )
    timer_end = timer.time()
    logger.debug(f"Mapper creation took {timer_end - timer_start:.2f} seconds")
    timer_start = timer.time()
    dataset = xr.open_dataset(
        mapper,
        engine="zarr",
        backend_kwargs={"consolidated": False},
        chunks=None,
        mask_and_scale=mask_and_scale,
    )  # is a dataset
    timer_end = timer.time()
    logger.debug(
        f"Dataset opening via Xarray took {timer_end - timer_start:.2f} seconds"
    )

    available_variables = list(dataset.data_vars)  # Is there a faster way ?
    if list_variables:
        print(
            f"The dataset contains the following variables : `{available_variables}`."
        )
        return

    if variable not in available_variables:
        logger.error(
            f"The requested variable `{variable}` does not exist! Please select one among the available variables : {available_variables}."
        )
        print(
            f"The requested variable `{variable}` does not exist! Please select one among the available variables : {available_variables}."
        )
        raise typer.Exit(code=0)
    else:
        # variable
        timer_start = timer.time()
        time_series = dataset[variable]
        timer_end = timer.time()
        logger.debug(
            f"Data array variable selection took {timer_end - timer_start:.2f} seconds"
        )

        # chunking
        timer_start = timer.time()
        chunks = {"time": time, "lat": lat, "lon": lon}
        time_series = time_series.chunk(chunks=chunks)
        timer_end = timer.time()
        logger.debug(
            f"Data array rechunking took {timer_end - timer_start:.2f} seconds"
        )

        # ReviewMe --------------------------------------------------------- ?
        # in-memory
        if in_memory:
            timer_start = timer.time()
            time_series = time_series.load()  # load the full data variable into memory
            timer_end = timer.time()
            logger.debug(
                f"Loading the data variable in memory took {timer_end - timer_start:.2f} seconds"
            )
        # --------------------------------------------------------------------

    timer_start = timer.time()
    indexers = set_location_indexers(
        data_array=time_series,
        longitude=longitude,
        latitude=latitude,
        verbose=verbose,
    )
    timer_end = timer.time()
    logger.debug(
        f"Data array indexers setting took {timer_end - timer_start:.2f} seconds"
    )

    try:
        timer_start = timer.time()
        location_time_series = time_series.sel(
            **indexers,
            method=neighbor_lookup,
            tolerance=tolerance,
        )
        timer_end = timer.time()
        logger.debug(f"Location selection took {timer_end - timer_start:.2f} seconds")

        # in-memory
        if in_memory:
            timer_start = timer.time()
            location_time_series.load()  # load into memory for faster ... ?
            timer_end = timer.time()
            logger.debug(
                f"Location selection loading in memory took {timer_end - timer_start:.2f} seconds"
            )

    except Exception as exception:
        logger.error(f"{ERROR_IN_SELECTING_DATA} : {exception}")
        print(f"{ERROR_IN_SELECTING_DATA} : {exception}")
        raise SystemExit(33)
    # ------------------------------------------------------------------------

    if start_time or end_time:
        timestamps = None  # we don't need a timestamp anymore!

        if start_time and not end_time:  # set `end_time` to end of series
            end_time = location_time_series.time.values[-1]

        elif end_time and not start_time:  # set `start_time` to beginning of series
            start_time = location_time_series.time.values[0]

        else:  # Convert `start_time` & `end_time` to the correct string format
            start_time = start_time.strftime("%Y-%m-%d %H:%M:%S")
            end_time = end_time.strftime("%Y-%m-%d %H:%M:%S")

        timer_start = timer.time()
        location_time_series = location_time_series.sel(
            time=slice(start_time, end_time)
        )
        timer_end = timer.time()
        logger.debug(
            f"Time slicing with `start_time` and `end_time` took {timer_end - timer_start:.2f} seconds"
        )

    if timestamps is not None and not start_time and not end_time:
        if len(timestamps) == 1:
            start_time = end_time = timestamps[0]

        try:
            timer_start = timer.time()
            location_time_series = location_time_series.sel(
                time=timestamps, method=neighbor_lookup
            )
            timer_end = timer.time()
            logger.debug(
                f"Time selection with `timestamps` took {timer_end - timer_start:.2f} seconds"
            )

        except KeyError:
            logger.error(f"No data found for one or more of the given {timestamps}.")
            print(f"No data found for one or more of the given {timestamps}.")

    if location_time_series.size == 1:
        timer_start = timer.time()
        single_value = float(location_time_series.values)
        warning = (
            f"{exclamation_mark} The selected timestamp "
            + f"{location_time_series.time.values}"
            + f" matches the single value "
            + f"{single_value}"
        )
        timer_end = timer.time()
        logger.debug(
            f"Single value conversion to float took {timer_end - timer_start:.2f} seconds"
        )
        logger.warning(warning)
        if verbose > 0:
            print(warning)

    data_retrieval_end_time = timer.time()
    logger.debug(
        f"Data retrieval took {data_retrieval_end_time - data_retrieval_start_time:.2f} seconds"
    )

    if not verbose:
        print(location_time_series.values)
    else:
        print(location_time_series)

    # special case!
    if location_time_series is not None and timestamps is None:
        timestamps = location_time_series.time.to_numpy()

    if statistics:  # after echoing series which might be Long!
        print_series_statistics(
            data_array=location_time_series,
            timestamps=timestamps,
            title="Selected series",
        )
    if csv:
        to_csv(
            x=location_time_series,
            path=csv,
        )

statistics

print_series_statistics

print_series_statistics(
    data_array,
    timestamps,
    title="Time series",
    rounding_places: int = None,
)
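
A hedged usage sketch that mirrors how rekx/select.py calls this helper; the example DataArray is hypothetical:

    import numpy as np
    import pandas as pd
    import xarray as xr

    from rekx.statistics import print_series_statistics

    # A small hourly series to summarise
    time = pd.date_range("2020-01-01", periods=24, freq="h")
    series = xr.DataArray(np.random.rand(24), coords={"time": time}, dims="time")

    print_series_statistics(
        data_array=series,
        timestamps=series.time.to_numpy(),
        title="Selected series",
    )
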
Source code in rekx/statistics.py
def print_series_statistics(
    data_array,
    timestamps,
    title="Time series",
    rounding_places: int = None,
):
    """ """
    statistics = calculate_series_statistics(data_array, timestamps)
    from rich import box
    from rich.table import Table

    table = Table(
        title=title,
        caption="Caption text",
        show_header=True,
        header_style="bold magenta",
        row_styles=["none", "dim"],
        box=box.SIMPLE_HEAD,
        highlight=True,
    )
    table.add_column("Statistic", justify="right", style="magenta", no_wrap=True)
    table.add_column("Value", style="cyan")

    # Basic metadata
    basic_metadata = ["Start", "End", "Count"]
    for key in basic_metadata:
        if key in statistics:
            table.add_row(key, str(statistics[key]))

    # Separate!
    table.add_row("", "")

    # Index of items
    index_metadata = [
        "Time of Min",
        "Index of Min",
        "Time of Max",
        "Index of Max",
    ]

    # Add statistics
    for key, value in statistics.items():
        if key not in basic_metadata and key not in index_metadata:
            # table.add_row(key, str(round_float_values(value, rounding_places)))
            table.add_row(key, str(value))

    # Separate!
    table.add_row("", "")

    # Index of
    for key, value in statistics.items():
        if key in index_metadata:
            # table.add_row(key, str(round_float_values(value, rounding_places)))
            table.add_row(key, str(value))

    from rich.console import Console

    console = Console()
    console.print(table)

csv

Multi-threaded CSV writer, much faster than pandas.DataFrame.to_csv, with full support for dask (http://dask.org/) and dask distributed (http://distributed.dask.org/).

to_csv

to_csv(
    x: DataArray,
    path: str | Path,
    *,
    nogil: bool = True,
    **kwargs
)

Print DataArray to CSV.

When x has a numpy backend, this function is functionally equivalent to (but much faster than):

x.to_pandas().to_csv(path_or_buf, **kwargs)

When x has a dask backend, this function returns a dask delayed object which will write to the disk only when its .compute() method is invoked.

Formatting and optional compression are parallelised across all available CPUs, using one dask task per chunk on the first dimension. Chunks on other dimensions will be merged ahead of computation.

Parameters:

x
    xarray.DataArray with one or two dimensions
path (str)
    Output file path
nogil (bool)
    If True, use the accelerated C implementation. Several kwargs won't be processed correctly (see limitations below). If False, use the pandas to_csv method (slow, and does not release the GIL). nogil=True exclusively supports float and integer value dtypes (but the coords can be anything). In case of incompatible dtype, nogil is automatically switched to False.
kwargs
    Passed verbatim to pandas.DataFrame.to_csv or pandas.Series.to_csv

Limitations

  • Fancy URIs are not (yet) supported.
  • compression='zip' is not supported. All other compression methods (gzip, bz2, xz) are supported.
  • When running with nogil=True, the following parameters are ignored: columns, quoting, quotechar, doublequote, escapechar, chunksize, decimal

Distributed computing

This function supports dask distributed, with the caveat that all workers must write to the same shared mountpoint and that the shared filesystem must strictly guarantee close-open coherency, meaning that one must be able to call write() and then close() on a file descriptor from one host and then immediately afterwards open() from another host and see the output from the first host. Note that, for performance reasons, most network filesystems do not enable this feature by default.

Alternatively, one may write to local mountpoints and then manually collect and concatenate the partial outputs.
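
A minimal sketch of both backends; the DataArray is hypothetical, the import path assumes rekx/csv.py is importable as rekx.csv, and the dask-backed call assumes dask is installed:

    import numpy as np
    import pandas as pd
    import xarray as xr

    from rekx.csv import to_csv

    time = pd.date_range("2020-01-01", periods=1000, freq="h")
    array = xr.DataArray(np.random.rand(1000), coords={"time": time}, dims="time")

    # Numpy backend: the file is written immediately
    to_csv(array, "series.csv")

    # Dask backend: a delayed object is returned and written only on compute()
    delayed = to_csv(array.chunk({"time": 250}), "series.csv.gz")
    delayed.compute()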

Source code in rekx/csv.py
def to_csv(
    x: xarray.DataArray,
    path: str | Path,
    *,
    nogil: bool = True,
    **kwargs,
):
    """Print DataArray to CSV.

    When x has a numpy backend, this function is functionally equivalent to
    (but much faster than)::

        x.to_pandas().to_csv(path_or_buf, **kwargs)

    When x has a dask backend, this function returns a dask delayed object which
    will write to the disk only when its .compute() method is invoked.

    Formatting and optional compression are parallelised across all available
    CPUs, using one dask task per chunk on the first dimension. Chunks on other
    dimensions will be merged ahead of computation.

    :param x:
        :class:`~xarray.DataArray` with one or two dimensions
    :param str path:
        Output file path
    :param bool nogil:
        If True, use accelerated C implementation. Several kwargs won't be
        processed correctly (see limitations below). If False, use pandas
        to_csv method (slow, and does not release the GIL).
        nogil=True exclusively supports float and integer values dtypes (but
        the coords can be anything). In case of incompatible dtype, nogil
        is automatically switched to False.
    :param kwargs:
        Passed verbatim to :meth:`pandas.DataFrame.to_csv` or
        :meth:`pandas.Series.to_csv`

    **Limitations**

    - Fancy URIs are not (yet) supported.
    - compression='zip' is not supported. All other compression methods (gzip,
      bz2, xz) are supported.
    - When running with nogil=True, the following parameters are ignored:
      columns, quoting, quotechar, doublequote, escapechar, chunksize, decimal

    **Distributed computing**

    This function supports `dask distributed`_, with the caveat that all workers
    must write to the same shared mountpoint and that the shared filesystem
    must strictly guarantee **close-open coherency**, meaning that one must be
    able to call write() and then close() on a file descriptor from one host
    and then immediately afterwards open() from another host and see the output
    from the first host. Note that, for performance reasons, most network
    filesystems do not enable this feature by default.

    Alternatively, one may write to local mountpoints and then manually collect
    and concatenate the partial outputs.
    """
    if not isinstance(x, xarray.DataArray):
        raise ValueError("first argument must be a DataArray")

    # Health checks
    if not isinstance(path, Path):
        try:
            path = Path(path)
        except:
            raise ValueError("path_or_buf must be a file path")

    if x.ndim not in (1, 2):
        raise ValueError(
            "cannot convert arrays with %d dimensions into " "pandas objects" % x.ndim
        )

    if nogil and x.dtype.kind not in "if":
        nogil = False

    # Extract row and columns indices
    indices = [x.get_index(dim) for dim in x.dims]
    if x.ndim == 2:
        index, columns = indices
    else:
        index = indices[0]
        columns = None

    compression = kwargs.pop("compression", "infer")
    compress = _compress_func(path, compression)
    mode = kwargs.pop("mode", "w")
    if mode not in "wa":
        raise ValueError('mode: expected w or a; got "%s"' % mode)

    # Fast exit for numpy backend
    if not x.chunks:
        bdata = kernels.to_csv(x.values, index, columns, True, nogil, kwargs)
        if compress:
            bdata = compress(bdata)
        with open(path, mode + "b") as fh:
            fh.write(bdata)
        return None

    # Merge chunks on all dimensions beyond the first
    x = x.chunk((x.chunks[0],) + tuple((s,) for s in x.shape[1:]))

    # Manually define the dask graph
    tok = tokenize(x.data, index, columns, compression, path, kwargs)
    name1 = "to_csv_encode-" + tok
    name2 = "to_csv_compress-" + tok
    name3 = "to_csv_write-" + tok
    name4 = "to_csv-" + tok

    dsk: dict[str | tuple, tuple] = {}

    assert x.chunks
    assert x.chunks[0]
    offset = 0
    for i, size in enumerate(x.chunks[0]):
        # Slice index
        index_i = index[offset : offset + size]
        offset += size

        x_i = (x.data.name, i) + (0,) * (x.ndim - 1)

        # Step 1: convert to CSV and encode to binary blob
        if i == 0:
            # First chunk: print header
            dsk[name1, i] = (kernels.to_csv, x_i, index_i, columns, True, nogil, kwargs)
        else:
            kwargs_i = kwargs.copy()
            kwargs_i["header"] = False
            dsk[name1, i] = (kernels.to_csv, x_i, index_i, None, False, nogil, kwargs_i)

        # Step 2 (optional): compress
        if compress:
            prevname = name2
            dsk[name2, i] = compress, (name1, i)
        else:
            prevname = name1

        # Step 3: write to file
        if i == 0:
            # First chunk: overwrite file if it already exists
            dsk[name3, i] = kernels.to_file, path, mode + "b", (prevname, i)
        else:
            # Next chunks: wait for previous chunk to complete and append
            dsk[name3, i] = (kernels.to_file, path, "ab", (prevname, i), (name3, i - 1))

    # Rename final key
    dsk[name4] = dsk.pop((name3, i))

    hlg = HighLevelGraph.from_collections(name4, dsk, (x,))
    return Delayed(name4, hlg)

write_metadata_dictionary_to_csv

write_metadata_dictionary_to_csv(
    dictionary: dict, output_filename: Path
) -> None

Write a metadata dictionary to a CSV file.

Parameters:

dictionary (dict, required)
    A dictionary containing the metadata.
output_filename (Path, required)
    Path to the output CSV file.

Returns:

None
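
A hedged sketch with a hypothetical metadata dictionary whose keys follow the ones the function reads:

    from pathlib import Path

    from rekx.csv import write_metadata_dictionary_to_csv

    metadata = {
        "File name": "tas_2020.nc",
        "File size": "1.2 GiB",
        "Variables": {
            "tas": {
                "Shape": "(8760, 600, 1300)",
                "Type": "float32",
                "Read time": "0.042",
            },
        },
    }
    write_metadata_dictionary_to_csv(metadata, Path("metadata.csv"))
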
Source code in rekx/csv.py
def write_metadata_dictionary_to_csv(
    dictionary: dict,
    output_filename: Path,
) -> None:
    """
    Write a metadata dictionary to a CSV file.

    Parameters
    ----------
    dictionary:
        A dictionary containing the metadata.
    output_filename: Path
        Path to the output CSV file.

    Returns
    -------
    None

    """
    if not dictionary:
        raise ValueError("The given dictionary is empty!")

    headers = [
        "File Name",
        "File Size",
        "Variable",
        "Shape",
        "Type",
        "Scale",
        "Offset",
        "Compression",
        "Level",
        "Shuffling",
        "Read time",
    ]

    with open(output_filename, "w", newline="", encoding="utf-8") as file:
        writer = csv.writer(file)
        writer.writerow(headers)

        file_name = dictionary.get("File name", "")
        file_size = dictionary.get("File size", "")

        for variable, metadata in dictionary.get("Variables", {}).items():
            compression_details = None  # avoid an unbound name when no compression metadata exists
            if "Compression" in metadata:
                from .print import format_compression

                compression_details = format_compression(metadata["Compression"])
            row = [
                file_name,
                file_size,
                variable,
                metadata.get("Shape", ""),
                metadata.get("Type", ""),
                metadata.get("Scale", ""),
                metadata.get("Offset", ""),
                compression_details["Filters"] if compression_details else None,
                compression_details["Level"] if compression_details else None,
                metadata.get("Shuffling", ""),
                metadata.get("Read time", ""),
            ]
            writer.writerow(row)
    print(f"Output written to [code]{output_filename}[/code]")

write_nested_dictionary_to_csv

write_nested_dictionary_to_csv(
    nested_dictionary: dict, output_filename: Path
) -> None
Source code in rekx/csv.py
def write_nested_dictionary_to_csv(
    nested_dictionary: dict,
    output_filename: Path,
) -> None:
    """ """
    if not nested_dictionary:
        raise ValueError("The given dictionary is empty!")

    with open(output_filename, "w", newline="", encoding="utf-8") as file:
        writer = csv.writer(file)
        writer.writerow(
            [
                "File",
                "Size",
                "Variable",
                "Shape",
                "Chunks",
                "Cache",
                "Elements",
                "Preemption",
                "Type",
                "Scale",
                "Offset",
                "Compression",
                "Level",
                "Shuffling",
                # "Repetitions",
                "Read time",
            ]
        )

        for file_name, file_data in nested_dictionary.items():
            for variable, metadata in file_data.get("Variables", {}).items():
                row = [
                    file_data.get("File name", ""),
                    file_data.get("File size", ""),
                    variable,
                    metadata.get("Shape", ""),
                    metadata.get("Chunks", ""),
                    metadata.get("Cache", ""),
                    metadata.get("Elements", ""),
                    metadata.get("Preemption", ""),
                    metadata.get("Type", ""),
                    metadata.get("Scale", ""),
                    metadata.get("Offset", ""),
                    metadata.get("Compression", ""),
                    metadata.get("Level", ""),
                    metadata.get("Shuffling", ""),
                    metadata.get("Read time", ""),
                ]
                writer.writerow(row)
    print(f"Output written to [code]{output_filename}[/code]")