Kerchunking¶
reference¶
Functions:
| Name | Description |
| --- | --- |
| create_kerchunk_reference | Reference local NetCDF files using Kerchunk |
| create_single_reference | Helper function for create_kerchunk_reference() |
create_kerchunk_reference¶
create_kerchunk_reference(
source_directory: Path,
output_directory: Path,
pattern: str = "*.nc",
workers: int = 4,
dry_run: bool = False,
verbose: int = VERBOSE_LEVEL_DEFAULT,
)
Reference local NetCDF files using Kerchunk
Source code in rekx/reference.py
def create_kerchunk_reference(
source_directory: Annotated[Path, typer_argument_source_directory],
output_directory: Annotated[Path, typer_argument_output_directory],
pattern: Annotated[str, typer_option_filename_pattern] = "*.nc",
workers: Annotated[int, typer_option_number_of_workers] = 4,
dry_run: Annotated[bool, typer_option_dry_run] = False,
verbose: Annotated[int, typer_option_verbose] = VERBOSE_LEVEL_DEFAULT,
):
"""Reference local NetCDF files using Kerchunk"""
# import cProfile
# import pstats
# profiler = cProfile.Profile()
# profiler.enable()
file_paths = list(source_directory.glob(pattern))
if not file_paths:
logger.info("No files found in the source directory matching the pattern.")
return
if dry_run:
print(
f"[bold]Dry run[/bold] of [bold]operations that would be performed[/bold]:"
)
print(
f"> Reading files in [code]{source_directory}[/code] matching the pattern [code]{pattern}[/code]"
)
print(f"> Number of files matched: {len(file_paths)}")
print(f"> Creating single reference files to [code]{output_directory}[/code]")
return # Exit for a dry run
output_directory.mkdir(parents=True, exist_ok=True)
# Map verbosity level to display mode
mode = DisplayMode(verbose)
with display_context[mode]:
with multiprocessing.Pool(processes=workers) as pool:
from functools import partial
partial_create_single_reference = partial(
create_single_reference, output_directory=output_directory
)
results = pool.map(partial_create_single_reference, file_paths)
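For illustration, a minimal sketch of calling this command programmatically; the `data` and `references` directory names are placeholders, not part of rekx:

```python
from pathlib import Path

from rekx.reference import create_kerchunk_reference

# Placeholder directories -- adjust to the local data layout
create_kerchunk_reference(
    source_directory=Path("data"),        # directory with NetCDF files
    output_directory=Path("references"),  # where the JSON references are written
    pattern="*.nc",
    workers=4,
)
```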
create_single_reference¶
Helper function for create_kerchunk_reference()
Notes
Will create an MD5 hash for each new reference file in order to avoid regenerating the same reference on a repeated attempt to reference the same file. This is useful in the context of exploratory massive processing.
Source code in rekx/reference.py
def create_single_reference(
file_path: Path,
output_directory: Path,
# md5: bool = True,
verbose: int = 0,
):
"""Helper function for create_kerchunk_reference()
Notes
-----
Will create an MD5 hash for each new reference file in order to avoid
regenerating the same reference on a repeated attempt to reference the
same file. This is useful in the context of exploratory massive
processing.
"""
filename = file_path.stem
output_file = f"{output_directory}/{filename}.json"
hash_file = output_file + ".hash"
generated_hash = generate_file_md5(file_path)
local_fs = fsspec.filesystem("file")
if local_fs.exists(output_file) and local_fs.exists(hash_file):
logger.debug(f"Found a reference file '{output_file}' and a hash '{hash_file}'")
with local_fs.open(hash_file, "r") as hf:
existing_hash = hf.read().strip()
if existing_hash == generated_hash:
pass
else:
logger.debug(
f"Creating reference file '{output_file}' with hash '{generated_hash}'"
)
file_url = f"file://{file_path}"
with fsspec.open(file_url, mode="rb") as input_file:
h5chunks = SingleHdf5ToZarr(input_file, file_url, inline_threshold=0)
json = ujson.dumps(h5chunks.translate()).encode()
with local_fs.open(output_file, "wb") as f:
f.write(json)
with local_fs.open(hash_file, "w") as hf:
hf.write(generated_hash)
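The JSON references written by this helper can be opened lazily with fsspec and Xarray; a sketch, assuming a reference file named `references/example.json`:

```python
import fsspec
import xarray as xr

# Open one of the generated JSON references lazily (the path is a placeholder)
mapper = fsspec.get_mapper(
    "reference://",
    fo="references/example.json",
    remote_protocol="file",
)
dataset = xr.open_dataset(
    mapper,
    engine="zarr",
    chunks={},
    backend_kwargs={"consolidated": False},
)
print(dataset)
```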
combine¶
Functions:
| Name | Description |
| --- | --- |
| combine_kerchunk_references | Combine multiple JSON references into a single logical aggregate dataset using Kerchunk's MultiZarrToZarr function |
| combine_kerchunk_references_to_parquet | Combine multiple JSON references into a single Parquet store using Kerchunk's MultiZarrToZarr function |
combine_kerchunk_references¶
combine_kerchunk_references(
source_directory: Path,
pattern: str = "*.json",
combined_reference: Path = "combined_kerchunk.json",
dry_run: bool = False,
verbose: int = VERBOSE_LEVEL_DEFAULT,
)
Combine multiple JSON references into a single logical aggregate dataset using Kerchunk's MultiZarrToZarr function
Source code in rekx/combine.py
def combine_kerchunk_references(
source_directory: Annotated[Path, typer_argument_source_directory],
pattern: Annotated[str, typer_option_filename_pattern] = "*.json",
combined_reference: Annotated[
Path, typer_argument_kerchunk_combined_reference
] = "combined_kerchunk.json",
dry_run: Annotated[bool, typer_option_dry_run] = False,
verbose: Annotated[int, typer_option_verbose] = VERBOSE_LEVEL_DEFAULT,
):
"""Combine multiple JSON references into a single logical aggregate
dataset using Kerchunk's `MultiZarrToZarr` function"""
mode = DisplayMode(verbose)
with display_context[mode]:
source_directory = Path(source_directory)
reference_file_paths = list(source_directory.glob(pattern))
reference_file_paths = list(map(str, reference_file_paths))
if dry_run:
print(
f"[bold]Dry run[/bold] of [bold]operations that would be performed[/bold]:"
)
print(
f"> Reading files in [code]{source_directory}[/code] matching the pattern [code]{pattern}[/code]"
)
print(f"> Number of files matched: {len(reference_file_paths)}")
print(
f"> Writing combined reference file to [code]{combined_reference}[/code]"
)
return # Exit for a dry run
from kerchunk.combine import MultiZarrToZarr
mzz = MultiZarrToZarr(
reference_file_paths,
concat_dims=["time"],
identical_dims=["lat", "lon"],
)
multifile_kerchunk = mzz.translate()
combined_reference_filename = Path(combined_reference)
local_fs = fsspec.filesystem("file")
with local_fs.open(combined_reference_filename, "wb") as f:
f.write(ujson.dumps(multifile_kerchunk).encode())
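For context, the same aggregation can be sketched directly with Kerchunk's `MultiZarrToZarr`, keeping the combined reference in memory instead of writing it to disk; the input reference paths below are placeholders:

```python
import xarray as xr
from kerchunk.combine import MultiZarrToZarr

# Combine a handful of single-file references in memory (paths are placeholders)
mzz = MultiZarrToZarr(
    ["references/a.json", "references/b.json"],
    concat_dims=["time"],
    identical_dims=["lat", "lon"],
)
combined = mzz.translate()  # an in-memory reference dictionary

# Open the aggregate dataset straight from the in-memory references
dataset = xr.open_dataset(
    "reference://",
    engine="zarr",
    chunks={},
    backend_kwargs={
        "consolidated": False,
        "storage_options": {"fo": combined, "remote_protocol": "file"},
    },
)
print(dataset)
```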
combine_kerchunk_references_to_parquet¶
combine_kerchunk_references_to_parquet(
source_directory: Path,
pattern: str = "*.json",
combined_reference: Path = "combined_kerchunk.parq",
dry_run: bool = False,
verbose: int = VERBOSE_LEVEL_DEFAULT,
)
Combine multiple JSON references into a single Parquet store using Kerchunk's MultiZarrToZarr function
Source code in rekx/combine.py
def combine_kerchunk_references_to_parquet(
source_directory: Annotated[Path, typer_argument_source_directory],
pattern: Annotated[str, typer_option_filename_pattern] = "*.json",
combined_reference: Annotated[
Path, typer_argument_kerchunk_combined_reference
] = "combined_kerchunk.parq",
dry_run: Annotated[bool, typer_option_dry_run] = False,
verbose: Annotated[int, typer_option_verbose] = VERBOSE_LEVEL_DEFAULT,
):
"""Combine multiple JSON references into a single Parquet store using Kerchunk's `MultiZarrToZarr` function"""
mode = DisplayMode(verbose)
with display_context[mode]:
source_directory = Path(source_directory)
reference_file_paths = list(source_directory.glob(pattern))
reference_file_paths = list(map(str, reference_file_paths))
if dry_run:
print(
f"[bold]Dry run[/bold] of [bold]operations that would be performed[/bold]:"
)
print(
f"> Reading files in [code]{source_directory}[/code] matching the pattern [code]{pattern}[/code]"
)
print(f"> Number of files matched: {len(reference_file_paths)}")
print(
f"> Writing combined reference file to [code]{combined_reference}[/code]"
)
return # Exit for a dry run
# Create LazyReferenceMapper to pass to MultiZarrToZarr
filesystem = fsspec.filesystem("file")
import os
combined_reference.mkdir(parents=True, exist_ok=True)
from fsspec.implementations.reference import LazyReferenceMapper
output_lazy = LazyReferenceMapper(
root=str(combined_reference),
fs=filesystem,
cache_size=1000,
)
from kerchunk.combine import MultiZarrToZarr
# Combine single references
mzz = MultiZarrToZarr(
reference_file_paths,
remote_protocol="file",
concat_dims=["time"],
identical_dims=["lat", "lon"],
out=output_lazy,
)
multifile_kerchunk = mzz.translate()
output_lazy.flush() # Write all non-full reference batches
# Read from the Parquet storage
kerchunk.df.refs_to_dataframe(multifile_kerchunk, str(combined_reference))
filesystem = fsspec.implementations.reference.ReferenceFileSystem(
fo=str(combined_reference),
target_protocol="file",
remote_protocol="file",
lazy=True,
)
ds = xr.open_dataset(
filesystem.get_mapper(""),
engine="zarr",
chunks={},
backend_kwargs={"consolidated": False},
)
print(ds)
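A sketch of reading the resulting Parquet store back with Xarray's `kerchunk` engine, as this module does elsewhere; the store path assumes the default `combined_kerchunk.parq`:

```python
import xarray as xr

# Open the combined Parquet reference store written above
ds = xr.open_dataset(
    "combined_kerchunk.parq",
    engine="kerchunk",  # provided by the kerchunk package's Xarray backend
    storage_options=dict(remote_protocol="file"),
)
print(ds)
```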
parquet¶
Functions:
| Name | Description |
| --- | --- |
| combine_parquet_stores_to_parquet | Combine multiple Parquet stores into a single aggregate dataset using Kerchunk's MultiZarrToZarr function |
| create_multiple_parquet_stores | |
| create_parquet_store | |
| create_single_parquet_store | Helper function for create_multiple_parquet_stores() |
| parquet_multi_reference | Create Parquet references from an HDF5/NetCDF file |
| parquet_reference | Create Parquet references from an HDF5/NetCDF file |
| select_from_parquet | Select data from a Parquet store |
combine_parquet_stores_to_parquet¶
combine_parquet_stores_to_parquet(
source_directory: Path,
pattern: str = "*.parquet",
combined_reference: Path = "combined_kerchunk.parquet",
record_size: int = DEFAULT_RECORD_SIZE,
dry_run: bool = False,
verbose: int = VERBOSE_LEVEL_DEFAULT,
)
Combine multiple Parquet stores into a single aggregate dataset using Kerchunk's MultiZarrToZarr function
Source code in rekx/parquet.py
def combine_parquet_stores_to_parquet(
source_directory: Annotated[Path, typer_argument_source_directory],
pattern: Annotated[str, typer_option_filename_pattern] = "*.parquet",
combined_reference: Annotated[
Path, typer_argument_kerchunk_combined_reference
] = "combined_kerchunk.parquet",
record_size: int = DEFAULT_RECORD_SIZE,
dry_run: Annotated[
bool,
typer.Option("--dry-run", help="Run the command without making any changes."),
] = False,
verbose: Annotated[int, typer_option_verbose] = VERBOSE_LEVEL_DEFAULT,
):
"""Combine multiple Parquet stores into a single aggregate dataset using Kerchunk's `MultiZarrToZarr` function"""
mode = DisplayMode(verbose)
with display_context[mode]:
source_directory = Path(source_directory)
reference_file_paths = list(source_directory.glob(pattern))
reference_file_paths = list(map(str, reference_file_paths))
reference_file_paths.sort()
if dry_run:
print(
f"[bold]Dry run[/bold] of [bold]operations that would be performed[/bold]:"
)
print(
f"> Reading files in [code]{source_directory}[/code] matching the pattern [code]{pattern}[/code]"
)
print(f"> Number of files matched: {len(reference_file_paths)}")
print(
f"> Writing combined reference file to [code]{combined_reference}[/code]"
)
return # Exit for a dry run
try:
# Create LazyReferenceMapper to pass to MultiZarrToZarr
combined_reference.mkdir(parents=True, exist_ok=True)
print(f"Combined reference name : {combined_reference}")
filesystem = fsspec.filesystem("file")
from fsspec.implementations.reference import LazyReferenceMapper
output_lazy = LazyReferenceMapper.create(
root=str(combined_reference),
fs=filesystem,
record_size=record_size,
)
# Combine single references
from kerchunk.combine import MultiZarrToZarr
multi_zarr_to_zarr = MultiZarrToZarr(
reference_file_paths,
remote_protocol="file",
concat_dims=["time"],
identical_dims=["lat", "lon"],
coo_map={"time": "cf:time"},
out=output_lazy,
)
multifile_kerchunk = multi_zarr_to_zarr.translate()
output_lazy.flush() # Write all non-full reference batches
except Exception as e:
print(f"Failed creating the [code]{combined_reference}[/code] : {e}!")
import traceback
traceback.print_exc()
if verbose > 1:
# Read from the Parquet storage
# kerchunk.df.refs_to_dataframe(multifile_kerchunk, str(combined_reference))
# filesystem = fsspec.implementations.reference.ReferenceFileSystem(
# fo=str(combined_reference),
# target_protocol='file',
# remote_protocol='file',
# lazy=True
# )
# ds = xr.open_dataset(
# filesystem.get_mapper(''),
# engine="zarr",
# chunks={},
# backend_kwargs={"consolidated": False},
# )
# print(ds)
dataset = xr.open_dataset(
str(combined_reference), # does not handle Path
engine="kerchunk",
storage_options=dict(remote_protocol="file")
# storage_options=dict(skip_instance_cache=True, remote_protocol="file"),
)
print(dataset)
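As a quick sanity check of the aggregate store, one might select a single cell; the variable name and coordinates below are placeholders:

```python
import xarray as xr

# Placeholder variable name and coordinates for a quick sanity check
ds = xr.open_dataset(
    "combined_kerchunk.parquet",
    engine="kerchunk",
    storage_options=dict(remote_protocol="file"),
)
point = ds["some_variable"].sel(lon=8.4, lat=45.1, method="nearest")
print(point.isel(time=slice(0, 3)).values)  # first few time steps at that cell
```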
create_multiple_parquet_stores¶
create_multiple_parquet_stores(
source_directory: Path,
output_directory: Path,
pattern: str = "*.nc",
record_size: int = DEFAULT_RECORD_SIZE,
workers: int = 4,
verbose: int = 0,
)
Source code in rekx/parquet.py
def create_multiple_parquet_stores(
source_directory: Path,
output_directory: Path,
pattern: str = "*.nc",
record_size: int = DEFAULT_RECORD_SIZE,
workers: int = 4,
verbose: int = 0,
):
""" """
input_file_paths = list(source_directory.glob(pattern))
# if verbose:
# print(f'Input file paths : {input_file_paths}')
if not input_file_paths:
print(
f"No files found in [code]{source_directory}[/code] matching the pattern [code]{pattern}[/code]!"
)
return
output_directory.mkdir(parents=True, exist_ok=True)
with multiprocessing.Pool(processes=workers) as pool:
print(
f"Creating the following Parquet stores in [code]{output_directory}[/code] : "
)
partial_create_parquet_references = partial(
create_single_parquet_store,
output_directory=output_directory,
record_size=record_size,
verbose=verbose,
)
pool.map(partial_create_parquet_references, input_file_paths)
if verbose:
print(f"Done!")
create_parquet_store¶
create_parquet_store(
input_file: Path,
output_parquet_store: Path,
record_size: int = DEFAULT_RECORD_SIZE,
)
Source code in rekx/parquet.py
def create_parquet_store(
input_file: Path,
output_parquet_store: Path,
record_size: int = DEFAULT_RECORD_SIZE,
):
""" """
log_messages = []
log_messages.append("Logging execution of create_parquet_store()")
output_parquet_store.mkdir(parents=True, exist_ok=True)
try:
log_messages.append(f"Creating a filesystem mapper for {output_parquet_store}")
filesystem = fsspec.filesystem("file")
output = LazyReferenceMapper.create(
root=str(output_parquet_store), # does not handle Path
fs=filesystem,
record_size=record_size,
)
log_messages.append(f"Created the filesystem mapper {output}")
log_messages.append(f"Kerchunking the file {input_file}")
single_zarr = SingleHdf5ToZarr(str(input_file), out=output)
single_zarr.translate()
log_messages.append(f"Kerchunked the file {input_file}")
except Exception as e:
print(f"Failed processing file [code]{input_file}[/code] : {e}")
log_messages.append(f"Exception occurred: {e}")
log_messages.append("Traceback (most recent call last):")
tb_lines = traceback.format_exc().splitlines()
for line in tb_lines:
log_messages.append(line)
raise
finally:
logger.info("\n".join(log_messages))
logger.info(f"Returning a Parquet store : {output_parquet_store}")
return output_parquet_store
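A sketch of referencing a single file and opening the resulting store; the paths are placeholders:

```python
from pathlib import Path

import xarray as xr
from rekx.parquet import create_parquet_store

# Placeholder paths: reference one NetCDF file into a Parquet store, then read it back
store = create_parquet_store(
    input_file=Path("data/example.nc"),
    output_parquet_store=Path("parquet_stores/example.parquet"),
)
ds = xr.open_dataset(
    str(store),
    engine="kerchunk",
    storage_options=dict(remote_protocol="file"),
)
print(ds)
```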
create_single_parquet_store¶
create_single_parquet_store(
input_file_path,
output_directory,
record_size: int = DEFAULT_RECORD_SIZE,
verbose: int = 0,
)
Helper function for create_multiple_parquet_stores()
Source code in rekx/parquet.py
def create_single_parquet_store(
input_file_path,
output_directory,
record_size: int = DEFAULT_RECORD_SIZE,
verbose: int = 0,
):
"""Helper function for create_multiple_parquet_stores()"""
filename = input_file_path.stem
single_parquet_store = output_directory / f"{filename}.parquet"
create_parquet_store(
input_file_path,
output_parquet_store=single_parquet_store,
record_size=record_size,
)
if verbose > 0:
print(f" [code]{single_parquet_store}[/code]")
if verbose > 1:
dataset = xr.open_dataset(
str(single_parquet_store),
engine="kerchunk",
storage_options=dict(remote_protocol="file"),
)
print(dataset)
parquet_multi_reference¶
parquet_multi_reference(
source_directory: Path,
output_directory: Optional[Path] = ".",
pattern: str = "*.nc",
record_size: int = DEFAULT_RECORD_SIZE,
workers: int = 4,
dry_run: bool = False,
verbose: int = VERBOSE_LEVEL_DEFAULT,
)
Create Parquet references from an HDF5/NetCDF file
Source code in rekx/parquet.py
def parquet_multi_reference(
source_directory: Path,
output_directory: Optional[Path] = ".",
pattern: str = "*.nc",
record_size: int = DEFAULT_RECORD_SIZE,
workers: int = 4,
dry_run: Annotated[bool, typer_option_dry_run] = False,
verbose: Annotated[int, typer_option_verbose] = VERBOSE_LEVEL_DEFAULT,
):
"""Create Parquet references from an HDF5/NetCDF file"""
input_file_paths = list(source_directory.glob(pattern))
if not input_file_paths:
print("No files found in the source directory matching the pattern.")
return
if dry_run:
print(f"[bold]Dry running operations that would be performed[/bold]:")
print(
f"> Reading files in [code]{source_directory}[/code] matching the pattern [code]{pattern}[/code]"
)
print(f"> Number of files matched : {len(input_file_paths)}")
print(f"> Creating Parquet stores in [code]{output_directory}[/code]")
return # Exit for a dry run
create_multiple_parquet_stores(
source_directory=source_directory,
output_directory=output_directory,
pattern=pattern,
record_size=record_size,
workers=workers,
verbose=verbose,
)
parquet_reference¶
parquet_reference(
input_file: Path,
output_directory: Optional[Path] = ".",
record_size: int = DEFAULT_RECORD_SIZE,
dry_run: bool = False,
verbose: int = VERBOSE_LEVEL_DEFAULT,
)
Create Parquet references from an HDF5/NetCDF file
Source code in rekx/parquet.py
def parquet_reference(
input_file: Path,
output_directory: Optional[Path] = ".",
record_size: int = DEFAULT_RECORD_SIZE,
dry_run: Annotated[bool, typer_option_dry_run] = False,
verbose: Annotated[int, typer_option_verbose] = VERBOSE_LEVEL_DEFAULT,
):
"""Create Parquet references from an HDF5/NetCDF file"""
filename = input_file.stem
output_parquet_store = output_directory / f"{filename}.parquet"
if dry_run:
print(f"[bold]Dry running operations that would be performed[/bold]:")
print(
f"> Creating Parquet references to [code]{input_file}[/code] in [code]{output_parquet_store}[/code]"
)
return # Exit for a dry run
create_single_parquet_store(
input_file_path=input_file,
output_directory=output_directory,
record_size=record_size,
verbose=verbose,
)
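A minimal usage sketch, previewing the plan with `dry_run` before writing; the paths are placeholders:

```python
from pathlib import Path

from rekx.parquet import parquet_reference

# Placeholder paths: preview the planned operations, then write the store
parquet_reference(Path("data/example.nc"), output_directory=Path("parquet_stores"), dry_run=True)
parquet_reference(Path("data/example.nc"), output_directory=Path("parquet_stores"))
```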
select_from_parquet¶
select_from_parquet(
parquet_store: Path,
variable: str,
longitude: float,
latitude: float,
timestamps: Optional[Any] = None,
start_time: Optional[datetime] = None,
end_time: Optional[datetime] = None,
time: Optional[int] = None,
lat: Optional[int] = None,
lon: Optional[int] = None,
mask_and_scale: bool = False,
neighbor_lookup: MethodForInexactMatches = None,
tolerance: Optional[float] = 0.1,
in_memory: bool = False,
statistics: bool = False,
csv: Path = None,
variable_name_as_suffix: bool = True,
rounding_places: Optional[int] = ROUNDING_PLACES_DEFAULT,
verbose: int = VERBOSE_LEVEL_DEFAULT,
) -> None
Select data from a Parquet store
Source code in rekx/parquet.py
def select_from_parquet(
parquet_store: Annotated[Path, typer.Argument(..., help="Path to Parquet store")],
variable: Annotated[str, typer.Argument(..., help="Variable name to select from")],
longitude: Annotated[float, typer_argument_longitude_in_degrees],
latitude: Annotated[float, typer_argument_latitude_in_degrees],
timestamps: Annotated[Optional[Any], typer_argument_timestamps] = None,
start_time: Annotated[Optional[datetime], typer_option_start_time] = None,
end_time: Annotated[Optional[datetime], typer_option_end_time] = None,
time: Annotated[
Optional[int], typer.Option(help="New chunk size for the 'time' dimension")
] = None,
lat: Annotated[
Optional[int], typer.Option(help="New chunk size for the 'lat' dimension")
] = None,
lon: Annotated[
Optional[int], typer.Option(help="New chunk size for the 'lon' dimension")
] = None,
# convert_longitude_360: Annotated[bool, typer_option_convert_longitude_360] = False,
mask_and_scale: Annotated[bool, typer_option_mask_and_scale] = False,
neighbor_lookup: Annotated[
MethodForInexactMatches, typer_option_neighbor_lookup
] = None,
tolerance: Annotated[
Optional[float], typer_option_tolerance
] = 0.1, # Customize default if needed
in_memory: Annotated[bool, typer_option_in_memory] = False,
statistics: Annotated[bool, typer_option_statistics] = False,
csv: Annotated[Path, typer_option_csv] = None,
# output_filename: Annotated[Path, typer_option_output_filename] = 'series_in', #Path(),
variable_name_as_suffix: Annotated[
bool, typer_option_variable_name_as_suffix
] = True,
rounding_places: Annotated[
Optional[int], typer_option_rounding_places
] = ROUNDING_PLACES_DEFAULT,
verbose: Annotated[int, typer_option_verbose] = VERBOSE_LEVEL_DEFAULT,
) -> None:
"""Select data from a Parquet store"""
# if convert_longitude_360:
# longitude = longitude % 360
# warn_for_negative_longitude(longitude)
logger.debug(f"Command context : {typer.Context}")
data_retrieval_start_time = timer.time()
logger.debug(f"Starting data retrieval... {data_retrieval_start_time}")
# timer_start = timer.time()
# mapper = fsspec.get_mapper(
# "reference://",
# fo=str(reference_file),
# remote_protocol="file",
# remote_options={"skip_instance_cache": True},
# )
# timer_end = timer.time()
# logger.debug(f"Mapper creation took {timer_end - timer_start:.2f} seconds")
timer_start = timer.perf_counter()
dataset = xr.open_dataset(
str(parquet_store), # does not handle Path
engine="kerchunk",
storage_options=dict(skip_instance_cache=True, remote_protocol="file"),
# backend_kwargs={"consolidated": False},
# chunks=None,
# mask_and_scale=mask_and_scale,
)
timer_end = timer.perf_counter()
logger.debug(
f"Dataset opening via Xarray took {timer_end - timer_start:.2f} seconds"
)
available_variables = list(dataset.data_vars)
if variable not in available_variables:
print(
f"The requested variable `{variable}` does not exist! Please select one among the available variables : {available_variables}."
)
raise typer.Exit(code=0)
else:
timer_start = timer.time()
time_series = dataset[variable]
timer_end = timer.time()
logger.debug(
f"Data array variable selection took {timer_end - timer_start:.2f} seconds"
)
timer_start = timer.time()
chunks = {"time": time, "lat": lat, "lon": lon}
time_series.chunk(chunks=chunks)
timer_end = timer.time()
logger.debug(
f"Data array rechunking took {timer_end - timer_start:.2f} seconds"
)
timer_start = timer.time()
indexers = set_location_indexers(
data_array=time_series,
longitude=longitude,
latitude=latitude,
verbose=verbose,
)
timer_end = timer.time()
logger.debug(
f"Data array indexers setting took {timer_end - timer_start:.2f} seconds"
)
try:
timer_start = timer.time()
location_time_series = time_series.sel(
**indexers,
method=neighbor_lookup,
tolerance=tolerance,
)
timer_end = timer.time()
logger.debug(f"Location selection took {timer_end - timer_start:.2f} seconds")
if in_memory:
timer_start = timer.time()
location_time_series.load() # load into memory for faster ... ?
timer_end = timer.time()
logger.debug(
f"Location selection loading in memory took {timer_end - timer_start:.2f} seconds"
)
except Exception as exception:
print(f"{ERROR_IN_SELECTING_DATA} : {exception}")
raise SystemExit(33)
# ------------------------------------------------------------------------
if start_time or end_time:
timestamps = None # we don't need a timestamp anymore!
if start_time and not end_time: # set `end_time` to end of series
end_time = location_time_series.time.values[-1]
elif end_time and not start_time: # set `start_time` to beginning of series
start_time = location_time_series.time.values[0]
else: # Convert `start_time` & `end_time` to the correct string format
start_time = start_time.strftime("%Y-%m-%d %H:%M:%S")
end_time = end_time.strftime("%Y-%m-%d %H:%M:%S")
timer_start = timer.time()
location_time_series = location_time_series.sel(
time=slice(start_time, end_time)
)
timer_end = timer.time()
logger.debug(
f"Time slicing with `start_time` and `end_time` took {timer_end - timer_start:.2f} seconds"
)
if timestamps is not None and not start_time and not end_time:
if len(timestamps) == 1:
start_time = end_time = timestamps[0]
try:
timer_start = timer.time()
location_time_series = location_time_series.sel(
time=timestamps, method=neighbor_lookup
)
timer_end = timer.time()
logger.debug(
f"Time selection with `timestamps` took {timer_end - timer_start:.2f} seconds"
)
except KeyError:
print(f"No data found for one or more of the given {timestamps}.")
if location_time_series.size == 1:
timer_start = timer.time()
single_value = float(location_time_series.values)
warning = (
f"{exclamation_mark} The selected timestamp "
+ f"{location_time_series.time.values}"
+ f" matches the single value "
+ f"{single_value}"
)
timer_end = timer.time()
logger.debug(
f"Single value conversion to float took {timer_end - timer_start:.2f} seconds"
)
logger.warning(warning)
if verbose > 0:
print(warning)
data_retrieval_end_time = timer.time()
logger.debug(
f"Data retrieval took {data_retrieval_end_time - data_retrieval_start_time:.2f} seconds"
)
timer_start = timer.time()
results = {
location_time_series.name: location_time_series.to_numpy(),
}
timer_end = timer.time()
logger.debug(
f"Data series conversion to NumPy took {timer_end - timer_start:.2f} seconds"
)
title = "Location time series"
# special case!
if location_time_series is not None and timestamps is None:
timer_start = timer.time()
timestamps = location_time_series.time.to_numpy()
timer_end = timer.time()
logger.debug(
f"Timestamps conversion to NumPy from Xarray's _time_ coordinate took {timer_end - timer_start:.2f} seconds"
)
if not verbose and not (statistics or csv):
flat_array = location_time_series.values.flatten()
print(*flat_array, sep=", ")
if verbose > 0:
print(location_time_series)
if statistics: # after echoing series which might be Long!
timer_start = timer.time()
print_series_statistics(
data_array=location_time_series,
timestamps=timestamps,
title="Selected series",
)
timer_end = timer.time()
logger.debug(
f"Printing statistics in the console took {timer_end - timer_start:.2f} seconds"
)
if csv:
timer_start = timer.time()
to_csv(
x=location_time_series,
path=csv,
)
timer_end = timer.time()
logger.debug(f"Exporting to CSV took {timer_end - timer_start:.2f} seconds")