Select

select
Functions:

Name | Description |
---|---|
select_fast | Bare timing to read data over a location and optionally write comma-separated values |
select_time_series | Select data from an Xarray-supported input file |
select_time_series_from_json | Select data using a Kerchunk reference file |
select_fast
select_fast(
time_series: Path,
variable: str,
longitude: float,
latitude: float,
time_series_2: Path = None,
tolerance: Optional[float] = 0.1,
csv: Path = None,
tocsv: Path = None,
verbose: int = VERBOSE_LEVEL_DEFAULT,
)
Bare timing to read data over a location and optionally write comma-separated values.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
time_series | Path | Path to Xarray-supported input file | required |
variable | str | Name of the variable to query | required |
longitude | float | The longitude of the location to read data | required |
latitude | float | The latitude of the location to read data | required |
time_series_2 | Path | Path to second Xarray-supported input file | None |
tolerance | Optional[float] | Maximum distance between original and new labels for inexact matches. Read the Xarray manual on nearest-neighbor lookups | 0.1 |
csv | Path | CSV output filename | None |
tocsv | Path | CSV output filename (fast implementation from xarray-extras) | None |
Returns:

Name | Type | Description |
---|---|---|
data_retrieval_time | float | An estimate of the time it took to retrieve data over the requested location, returned when verbosity is not requested. |
Notes

mask_and_scale is always set to False to avoid errors related to decoding timestamps.
Source code in rekx/select.py
def select_fast(
time_series: Annotated[Path, typer_argument_time_series],
variable: Annotated[str, typer.Argument(help="Variable to select data from")],
longitude: Annotated[float, typer_argument_longitude_in_degrees],
latitude: Annotated[float, typer_argument_latitude_in_degrees],
time_series_2: Annotated[Path, typer_option_time_series] = None,
tolerance: Annotated[
Optional[float], typer_option_tolerance
] = 0.1, # Customize default if needed
# in_memory: Annotated[bool, typer_option_in_memory] = False,
csv: Annotated[Path, typer_option_csv] = None,
tocsv: Annotated[Path, typer_option_csv] = None,
verbose: Annotated[int, typer_option_verbose] = VERBOSE_LEVEL_DEFAULT,
):
"""Bare timing to read data over a location and optionally write
comma-separated values.
Parameters
----------
time_series:
Path to Xarray-supported input file
variable: str
Name of the variable to query
longitude: float
The longitude of the location to read data
latitude: float
The latitude of the location to read data
time_series_2:
Path to second Xarray-supported input file
tolerance: float
Maximum distance between original and new labels for inexact matches.
Read Xarray manual on nearest-neighbor-lookups
csv:
CSV output filename
tocsv:
CSV output filename (fast implementation from xarray-extras)
Returns
-------
data_retrieval_time : float
An estimation of the time it took to retrieve data over the requested
location if no verbosity is asked.
Notes
-----
``mask_and_scale`` is always set to ``False`` to avoid errors related to
decoding timestamps.
"""
try:
data_retrieval_start_time = timer.perf_counter() # time()
series = xr.open_dataset(time_series, mask_and_scale=False)[variable].sel(
lon=longitude, lat=latitude, method="nearest"
)
if time_series_2:
series_2 = xr.open_dataset(time_series_2, mask_and_scale=False)[
variable
].sel(lon=longitude, lat=latitude, method="nearest")
if csv:
series.to_pandas().to_csv(csv)
if time_series_2:
series_2.to_pandas().to_csv(csv.name + "2")
elif tocsv:
to_csv(
x=series,
path=str(tocsv),
)
if time_series_2:
to_csv(x=series_2, path=str(tocsv) + "2")
data_retrieval_time = f"{timer.perf_counter() - data_retrieval_start_time:.3f}"
if not verbose:
return data_retrieval_time
else:
print(
f"[bold green]It worked[/bold green] and took : {data_retrieval_time}"
)
except Exception as e:
print(f"An error occurred: {e}")
select_time_series
select_time_series(
time_series: Path,
variable: str,
longitude: float,
latitude: float,
list_variables: bool = False,
timestamps: Optional[Any] = None,
start_time: Optional[datetime] = None,
end_time: Optional[datetime] = None,
time: Optional[int] = None,
lat: Optional[int] = None,
lon: Optional[int] = None,
mask_and_scale: bool = False,
neighbor_lookup: MethodForInexactMatches = nearest,
tolerance: Optional[float] = 0.1,
in_memory: bool = False,
statistics: bool = False,
output_filename: Path | None = None,
verbose: int = VERBOSE_LEVEL_DEFAULT,
) -> None
Select data from an Xarray-supported input file
Parameters:

Name | Type | Description | Default |
---|---|---|---|
time_series | Path | Path to Xarray-supported input file | required |
variable | str | Name of the variable to query | required |
longitude | float | The longitude of the location to read data | required |
latitude | float | The latitude of the location to read data | required |
list_variables | bool | Optional flag to list data variables and exit without doing anything else. | False |
timestamps | Optional[Any] | A string of properly formatted timestamps to be parsed and used for temporal selection. | None |
start_time | Optional[datetime] | A start time to generate a temporal selection period | None |
end_time | Optional[datetime] | An end time for the generation of a temporal selection period | None |
time | Optional[int] | New chunk size for the 'time' dimension | None |
lat | Optional[int] | New chunk size for the 'lat' dimension | None |
lon | Optional[int] | New chunk size for the 'lon' dimension | None |
mask_and_scale | bool | Flag to apply masking and scaling based on the input metadata | False |
neighbor_lookup | MethodForInexactMatches | Method to use for inexact matches. | nearest |
tolerance | Optional[float] | Maximum distance between original and new labels for inexact matches. Read the Xarray manual on nearest-neighbor lookups | 0.1 |
statistics | bool | Optional flag to calculate and display summary statistics | False |
verbose | int | Verbosity level | VERBOSE_LEVEL_DEFAULT |
Source code in rekx/select.py
def select_time_series(
time_series: Path,
variable: Annotated[str, typer.Argument(..., help="Variable name to select from")],
longitude: Annotated[float, typer_argument_longitude_in_degrees],
latitude: Annotated[float, typer_argument_latitude_in_degrees],
list_variables: Annotated[bool, typer_option_list_variables] = False,
timestamps: Annotated[Optional[Any], typer_argument_timestamps] = None,
start_time: Annotated[Optional[datetime], typer_option_start_time] = None,
end_time: Annotated[Optional[datetime], typer_option_end_time] = None,
time: Annotated[
Optional[int], typer.Option(help="New chunk size for the 'time' dimension")
] = None,
lat: Annotated[
Optional[int], typer.Option(help="New chunk size for the 'lat' dimension")
] = None,
lon: Annotated[
Optional[int], typer.Option(help="New chunk size for the 'lon' dimension")
] = None,
# convert_longitude_360: Annotated[bool, typer_option_convert_longitude_360] = False,
mask_and_scale: Annotated[bool, typer_option_mask_and_scale] = False,
neighbor_lookup: Annotated[
MethodForInexactMatches, typer_option_neighbor_lookup
] = MethodForInexactMatches.nearest,
tolerance: Annotated[
Optional[float], typer_option_tolerance
] = 0.1, # Customize default if needed
in_memory: Annotated[bool, typer_option_in_memory] = False,
statistics: Annotated[bool, typer_option_statistics] = False,
output_filename: Annotated[
Path|None, typer_option_output_filename
] = None,
# output_filename: Annotated[Path, typer_option_output_filename] = 'series_in', #Path(),
# variable_name_as_suffix: Annotated[bool, typer_option_variable_name_as_suffix] = True,
# rounding_places: Annotated[Optional[int], typer_option_rounding_places] = ROUNDING_PLACES_DEFAULT,
verbose: Annotated[int, typer_option_verbose] = VERBOSE_LEVEL_DEFAULT,
) -> None:
"""
Select data from an Xarray-supported input file
Parameters
----------
time_series:
Path to Xarray-supported input file
variable: str
Name of the variable to query
longitude: float
The longitude of the location to read data
latitude: float
The latitude of the location to read data
list_variables: bool
Optional flag to list data variables and exit without doing anything
else.
timestamps: str
A string of properly formatted timestamps to be parsed and use for
temporal selection.
start_time: str
A start time to generate a temporal selection period
end_time: str
An end time for the generation of a temporal selection period
time: int
New chunk size for the 'time' dimension
lat: int
New chunk size for the 'lat' dimension
lon: int
New chunk size for the 'lon' dimension
mask_and_scale: bool
Flag to apply masking and scaling based on the input metadata
neighbor_lookup: str
Method to use for inexact matches.
tolerance: float
Maximum distance between original and new labels for inexact matches.
Read Xarray manual on nearest-neighbor-lookups
statistics: bool
Optional flag to calculate and display summary statistics
verbose: int
Verbosity level
Returns
-------
"""
# if convert_longitude_360:
# longitude = longitude % 360
# warn_for_negative_longitude(longitude)
logger.debug(f"Command context : {typer.Context}")
data_retrieval_start_time = timer.time()
logger.debug(f"Starting data retrieval... {data_retrieval_start_time}")
timer_start = timer.time()
dataset = xr.open_dataset(
time_series,
mask_and_scale=mask_and_scale,
) # is a dataset
timer_end = timer.time()
logger.debug(
f"Dataset opening via Xarray took {timer_end - timer_start:.2f} seconds"
)
available_variables = list(dataset.data_vars) # Is there a faster way ?
if list_variables:
logger.info(
f"The dataset contains the following variables : `{available_variables}`."
)
print(
f"The dataset contains the following variables : `{available_variables}`."
)
return
if variable not in available_variables:
logger.debug(
f"The requested variable `{variable}` does not exist! Please select one among the available variables : {available_variables}."
)
print(
f"The requested variable `{variable}` does not exist! Please select one among the available variables : {available_variables}."
)
raise typer.Exit(code=0)
else:
timer_start = timer.time()
time_series = dataset[variable]
timer_end = timer.time()
logger.debug(
f"Data array variable selection took {timer_end - timer_start:.2f} seconds"
)
timer_start = timer.time()
chunks = {"time": time, "lat": lat, "lon": lon}
time_series = time_series.chunk(chunks=chunks)  # assign the result: chunk() is not in-place
timer_end = timer.time()
logger.debug(
f"Data array rechunking took {timer_end - timer_start:.2f} seconds"
)
timer_start = timer.time()
indexers = set_location_indexers(
data_array=time_series,
longitude=longitude,
latitude=latitude,
verbose=verbose,
)
timer_end = timer.time()
logger.debug(f"Data array indexers : {indexers}")
logger.debug(
f"Data array indexers setting took {timer_end - timer_start:.2f} seconds"
)
try:
timer_start = timer.time()
location_time_series = time_series.sel(
**indexers,
method=neighbor_lookup,
tolerance=tolerance,
)
timer_end = timer.time()
indentation = " " * 4 * 9
indented_location_time_series = "\n".join(
indentation + line for line in str(location_time_series).split("\n")
)
logger.debug(
f"Location time series selection :\n{indented_location_time_series}"
)
logger.debug(f"Location selection took {timer_end - timer_start:.2f} seconds")
if in_memory:
timer_start = timer.time()
location_time_series.load() # load into memory for faster ... ?
timer_end = timer.time()
logger.debug(
f"Location selection loading in memory took {timer_end - timer_start:.2f} seconds"
)
except Exception as exception:
logger.error(f"{ERROR_IN_SELECTING_DATA} : {exception}")
print(f"{ERROR_IN_SELECTING_DATA} : {exception}")
raise SystemExit(33)
# ------------------------------------------------------------------------
if start_time or end_time:
timestamps = None # we don't need a timestamp anymore!
if start_time and not end_time: # set `end_time` to end of series
end_time = location_time_series.time.values[-1]
elif end_time and not start_time: # set `start_time` to beginning of series
start_time = location_time_series.time.values[0]
else: # Convert `start_time` & `end_time` to the correct string format
start_time = start_time.strftime("%Y-%m-%d %H:%M:%S")
end_time = end_time.strftime("%Y-%m-%d %H:%M:%S")
timer_start = timer.time()
location_time_series = location_time_series.sel(
time=slice(start_time, end_time)
)
timer_end = timer.time()
logger.debug(
f"Time slicing with `start_time` and `end_time` took {timer_end - timer_start:.2f} seconds"
)
if timestamps is not None and not start_time and not end_time:
if len(timestamps) == 1:
start_time = end_time = timestamps[0]
try:
timer_start = timer.time()
location_time_series = location_time_series.sel(
time=timestamps, method=neighbor_lookup
)
timer_end = timer.time()
logger.debug(
f"Time selection with `timestamps` took {timer_end - timer_start:.2f} seconds"
)
except KeyError:
print(f"No data found for one or more of the given {timestamps}.")
if location_time_series.size == 1:
timer_start = timer.time()
single_value = float(location_time_series.values)
warning = (
f"{exclamation_mark} The selected timestamp "
+ f"{location_time_series.time.values}"
+ f" matches the single value "
+ f"{single_value}"
)
timer_end = timer.time()
logger.debug(
f"Single value conversion to float took {timer_end - timer_start:.2f} seconds"
)
logger.warning(warning)
if verbose > 0:
print(warning)
data_retrieval_end_time = timer.time()
logger.debug(
f"Data retrieval took {data_retrieval_end_time - data_retrieval_start_time:.2f} seconds"
)
if not verbose:
print(location_time_series.values)
else:
print(location_time_series)
if statistics: # after echoing series which might be Long!
print_series_statistics(
data_array=location_time_series,
timestamps=timestamps,
title="Selected series",
)
output_handlers = {
".nc": lambda location_time_series, path: write_to_netcdf(
location_time_series=location_time_series,
path=path,
longitude=longitude,
latitude=latitude
),
".csv": lambda location_time_series, path: to_csv(
x=location_time_series, path=path
),
}
if output_filename:
extension = output_filename.suffix.lower()
if extension in output_handlers:
output_handlers[extension](location_time_series, output_filename)
else:
raise ValueError(f"Unsupported file extension: {extension}")
select_time_series_from_json
select_time_series_from_json(
reference_file: Path,
variable: str,
longitude: float,
latitude: float,
list_variables: bool = False,
timestamps: Optional[Any] = None,
start_time: Optional[datetime] = None,
end_time: Optional[datetime] = None,
time: Optional[int] = None,
lat: Optional[int] = None,
lon: Optional[int] = None,
mask_and_scale: bool = False,
neighbor_lookup: MethodForInexactMatches = None,
tolerance: Optional[float] = 0.1,
in_memory: bool = False,
statistics: bool = False,
csv: Path = None,
verbose: int = VERBOSE_LEVEL_DEFAULT,
) -> None
Select data using a Kerchunk reference file
Parameters:

Name | Type | Description | Default |
---|---|---|---|
reference_file | Path | Path to an input JSON Kerchunk reference file | required |
variable | str | Name of the variable to query | required |
longitude | float | The longitude of the location to read data | required |
latitude | float | The latitude of the location to read data | required |
list_variables | bool | Optional flag to list data variables and exit without doing anything else. | False |
timestamps | Optional[Any] | A string of properly formatted timestamps to be parsed and used for temporal selection. | None |
start_time | Optional[datetime] | A start time to generate a temporal selection period | None |
end_time | Optional[datetime] | An end time for the generation of a temporal selection period | None |
time | Optional[int] | New chunk size for the 'time' dimension | None |
lat | Optional[int] | New chunk size for the 'lat' dimension | None |
lon | Optional[int] | New chunk size for the 'lon' dimension | None |
mask_and_scale | bool | Flag to apply masking and scaling based on the input metadata | False |
neighbor_lookup | MethodForInexactMatches | Method to use for inexact matches. | None |
tolerance | Optional[float] | Maximum distance between original and new labels for inexact matches. Read the Xarray manual on nearest-neighbor lookups | 0.1 |
in_memory | bool | Optional flag to load the selected data into memory | False |
statistics | bool | Optional flag to calculate and display summary statistics | False |
csv | Path | CSV output filename | None |
verbose | int | Verbosity level | VERBOSE_LEVEL_DEFAULT |
Source code in rekx/select.py
def select_time_series_from_json(
reference_file: Annotated[
Path, typer.Argument(..., help="Path to the kerchunk reference file")
],
variable: Annotated[str, typer.Argument(..., help="Variable name to select from")],
longitude: Annotated[float, typer_argument_longitude_in_degrees],
latitude: Annotated[float, typer_argument_latitude_in_degrees],
list_variables: Annotated[bool, typer_option_list_variables] = False,
timestamps: Annotated[Optional[Any], typer_argument_timestamps] = None,
start_time: Annotated[Optional[datetime], typer_option_start_time] = None,
end_time: Annotated[Optional[datetime], typer_option_end_time] = None,
time: Annotated[
Optional[int], typer.Option(help="New chunk size for the 'time' dimension")
] = None,
lat: Annotated[
Optional[int], typer.Option(help="New chunk size for the 'lat' dimension")
] = None,
lon: Annotated[
Optional[int], typer.Option(help="New chunk size for the 'lon' dimension")
] = None,
# convert_longitude_360: Annotated[bool, typer_option_convert_longitude_360] = False,
mask_and_scale: Annotated[bool, typer_option_mask_and_scale] = False,
neighbor_lookup: Annotated[
MethodForInexactMatches, typer_option_neighbor_lookup
] = None,
tolerance: Annotated[
Optional[float], typer_option_tolerance
] = 0.1, # Customize default if needed
in_memory: Annotated[bool, typer_option_in_memory] = False,
statistics: Annotated[bool, typer_option_statistics] = False,
csv: Annotated[Path, typer_option_csv] = None,
# output_filename: Annotated[Path, typer_option_output_filename] = 'series_in', #Path(),
# variable_name_as_suffix: Annotated[bool, typer_option_variable_name_as_suffix] = True,
# rounding_places: Annotated[Optional[int], typer_option_rounding_places] = ROUNDING_PLACES_DEFAULT,
verbose: Annotated[int, typer_option_verbose] = VERBOSE_LEVEL_DEFAULT,
) -> None:
"""
Select data using a Kerchunk reference file
Parameters
----------
reference_file:
Path to an input JSON Kerchunk reference file
variable: str
Name of the variable to query
longitude: float
The longitude of the location to read data
latitude: float
The latitude of the location to read data
list_variables: bool
Optional flag to list data variables and exit without doing anything
else.
timestamps: str
A string of properly formatted timestamps to be parsed and use for
temporal selection.
start_time: str
A start time to generate a temporal selection period
end_time: str
An end time for the generation of a temporal selection period
time: int
New chunk size for the 'time' dimension
lat: int
New chunk size for the 'lat' dimension
lon: int
New chunk size for the 'lon' dimension
mask_and_scale: bool
Flag to apply masking and scaling based on the input metadata
neighbor_lookup: str
Method to use for inexact matches.
tolerance: float
Maximum distance between original and new labels for inexact matches.
Read Xarray manual on nearest-neighbor-lookups
in_memory: bool
Optional flag to load the selected data into memory
statistics: bool
Optional flag to calculate and display summary statistics
csv:
CSV output filename
verbose: int
Verbosity level
"""
# if convert_longitude_360:
# longitude = longitude % 360
# warn_for_negative_longitude(longitude)
# logger.debug(f'Command context : {print(typer.Context)}')
data_retrieval_start_time = timer.time()
logger.debug(f"Starting data retrieval... {data_retrieval_start_time}")
timer_start = timer.time()
mapper = fsspec.get_mapper(
"reference://",
fo=str(reference_file),
remote_protocol="file",
remote_options={"skip_instance_cache": True},
)
timer_end = timer.time()
logger.debug(f"Mapper creation took {timer_end - timer_start:.2f} seconds")
timer_start = timer.time()
dataset = xr.open_dataset(
mapper,
engine="zarr",
backend_kwargs={"consolidated": False},
chunks=None,
mask_and_scale=mask_and_scale,
) # is a dataset
timer_end = timer.time()
logger.debug(
f"Dataset opening via Xarray took {timer_end - timer_start:.2f} seconds"
)
available_variables = list(dataset.data_vars) # Is there a faster way ?
if list_variables:
print(
f"The dataset contains the following variables : `{available_variables}`."
)
return
if variable not in available_variables:
logger.error(
f"The requested variable `{variable}` does not exist! Please select one among the available variables : {available_variables}."
)
print(
f"The requested variable `{variable}` does not exist! Please select one among the available variables : {available_variables}."
)
raise typer.Exit(code=0)
else:
# variable
timer_start = timer.time()
time_series = dataset[variable]
timer_end = timer.time()
logger.debug(
f"Data array variable selection took {timer_end - timer_start:.2f} seconds"
)
# chunking
timer_start = timer.time()
chunks = {"time": time, "lat": lat, "lon": lon}
time_series = time_series.chunk(chunks=chunks)  # assign the result: chunk() is not in-place
timer_end = timer.time()
logger.debug(
f"Data array rechunking took {timer_end - timer_start:.2f} seconds"
)
timer_start = timer.time()
indexers = set_location_indexers(
data_array=time_series,
longitude=longitude,
latitude=latitude,
verbose=verbose,
)
timer_end = timer.time()
logger.debug(
f"Data array indexers setting took {timer_end - timer_start:.2f} seconds"
)
try:
timer_start = timer.time()
location_time_series = time_series.sel(
**indexers,
method=neighbor_lookup,
tolerance=tolerance,
)
timer_end = timer.time()
logger.debug(f"Location selection took {timer_end - timer_start:.2f} seconds")
# in-memory
if in_memory:
timer_start = timer.time()
location_time_series.load() # load into memory for faster ... ?
timer_end = timer.time()
logger.debug(
f"Location selection loading in memory took {timer_end - timer_start:.2f} seconds"
)
except Exception as exception:
logger.error(f"{ERROR_IN_SELECTING_DATA} : {exception}")
print(f"{ERROR_IN_SELECTING_DATA} : {exception}")
raise SystemExit(33)
# ------------------------------------------------------------------------
if start_time or end_time:
timestamps = None # we don't need a timestamp anymore!
if start_time and not end_time: # set `end_time` to end of series
end_time = location_time_series.time.values[-1]
elif end_time and not start_time: # set `start_time` to beginning of series
start_time = location_time_series.time.values[0]
else: # Convert `start_time` & `end_time` to the correct string format
start_time = start_time.strftime("%Y-%m-%d %H:%M:%S")
end_time = end_time.strftime("%Y-%m-%d %H:%M:%S")
timer_start = timer.time()
location_time_series = location_time_series.sel(
time=slice(start_time, end_time)
)
timer_end = timer.time()
logger.debug(
f"Time slicing with `start_time` and `end_time` took {timer_end - timer_start:.2f} seconds"
)
if timestamps is not None and not start_time and not end_time:
if len(timestamps) == 1:
start_time = end_time = timestamps[0]
try:
timer_start = timer.time()
location_time_series = location_time_series.sel(
time=timestamps, method=neighbor_lookup
)
timer_end = timer.time()
logger.debug(
f"Time selection with `timestamps` took {timer_end - timer_start:.2f} seconds"
)
except KeyError:
logger.error(f"No data found for one or more of the given {timestamps}.")
print(f"No data found for one or more of the given {timestamps}.")
if location_time_series.size == 1:
timer_start = timer.time()
single_value = float(location_time_series.values)
warning = (
f"{exclamation_mark} The selected timestamp "
+ f"{location_time_series.time.values}"
+ f" matches the single value "
+ f"{single_value}"
)
timer_end = timer.time()
logger.debug(
f"Single value conversion to float took {timer_end - timer_start:.2f} seconds"
)
logger.warning(warning)
if verbose > 0:
print(warning)
data_retrieval_end_time = timer.time()
logger.debug(
f"Data retrieval took {data_retrieval_end_time - data_retrieval_start_time:.2f} seconds"
)
if not verbose:
print(location_time_series.values)
else:
print(location_time_series)
# special case!
if location_time_series is not None and timestamps is None:
timestamps = location_time_series.time.to_numpy()
if statistics: # after echoing series which might be Long!
print_series_statistics(
data_array=location_time_series,
timestamps=timestamps,
title="Selected series",
)
if csv:
to_csv(
x=location_time_series,
path=csv,
)
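
And a similar sketch for `select_time_series_from_json`, reading through a Kerchunk reference that points to local files. The reference file name, variable, coordinates and the import path of `MethodForInexactMatches` are assumptions:

```python
# Hypothetical example: read a point series through a Kerchunk JSON reference.
from pathlib import Path

from rekx.select import select_time_series_from_json
from rekx.models import MethodForInexactMatches  # import path assumed

select_time_series_from_json(
    reference_file=Path("sis_2020_kerchunk.json"),    # placeholder Kerchunk reference file
    variable="SIS",
    longitude=8.4,
    latitude=45.1,
    neighbor_lookup=MethodForInexactMatches.nearest,  # the default is None; pass a method for inexact matches
    tolerance=0.1,
    csv=Path("sis_2020_point.csv"),
    verbose=1,
)
```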
statistics
Functions:

Name | Description |
---|---|
print_series_statistics | Print summary statistics for a data series in a rich table |
print_series_statistics
print_series_statistics(
data_array,
timestamps,
title="Time series",
rounding_places: int = None,
)
Source code in rekx/statistics.py
def print_series_statistics(
data_array,
timestamps,
title="Time series",
rounding_places: int = None,
):
""" """
statistics = calculate_series_statistics(data_array, timestamps)
from rich import box
from rich.table import Table
table = Table(
title=title,
caption="Caption text",
show_header=True,
header_style="bold magenta",
row_styles=["none", "dim"],
box=box.SIMPLE_HEAD,
highlight=True,
)
table.add_column("Statistic", justify="right", style="magenta", no_wrap=True)
table.add_column("Value", style="cyan")
# Basic metadata
basic_metadata = ["Start", "End", "Count"]
for key in basic_metadata:
if key in statistics:
table.add_row(key, str(statistics[key]))
# Separate!
table.add_row("", "")
# Index of items
index_metadata = [
"Time of Min",
"Index of Min",
"Time of Max",
"Index of Max",
]
# Add statistics
for key, value in statistics.items():
if key not in basic_metadata and key not in index_metadata:
# table.add_row(key, str(round_float_values(value, rounding_places)))
table.add_row(key, str(value))
# Separate!
table.add_row("", "")
# Index of
for key, value in statistics.items():
if key in index_metadata:
# table.add_row(key, str(round_float_values(value, rounding_places)))
table.add_row(key, str(value))
from rich.console import Console
console = Console()
console.print(table)
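
A small, self-contained sketch of how `print_series_statistics` might be called on a synthetic series; it assumes `calculate_series_statistics` accepts an arbitrary one-dimensional DataArray with a `time` coordinate:

```python
# Hypothetical example: summary statistics for a synthetic hourly series.
import numpy as np
import pandas as pd
import xarray as xr

from rekx.statistics import print_series_statistics

time = pd.date_range("2020-06-01", periods=48, freq="h")
series = xr.DataArray(np.random.rand(48), coords={"time": time}, dims="time", name="demo")

print_series_statistics(
    data_array=series,
    timestamps=series.time.to_numpy(),
    title="Demo series",
)
```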
csv
Multi-threaded CSV writer, much faster than pandas.DataFrame.to_csv, with full support for dask (http://dask.org/) and dask distributed (http://distributed.dask.org/).
Functions:

Name | Description |
---|---|
to_csv | Print DataArray to CSV. |
to_csv
Print DataArray to CSV.

When x has a numpy backend, this function is functionally equivalent to (but much faster than)::

x.to_pandas().to_csv(path_or_buf, **kwargs)

When x has a dask backend, this function returns a dask delayed object which will write to disk only when its .compute() method is invoked.
Formatting and optional compression are parallelised across all available CPUs, using one dask task per chunk on the first dimension. Chunks on other dimensions will be merged ahead of computation.

:param x: xarray.DataArray with one or two dimensions
:param str path: Output file path
:param bool nogil: If True, use the accelerated C implementation. Several kwargs won't be processed correctly (see limitations below). If False, use the pandas to_csv method (slow, and does not release the GIL). nogil=True exclusively supports float and integer value dtypes (but the coords can be anything). In case of an incompatible dtype, nogil is automatically switched to False.
:param kwargs: Passed verbatim to pandas.DataFrame.to_csv or pandas.Series.to_csv

Limitations

- Fancy URIs are not (yet) supported.
- compression='zip' is not supported. All other compression methods (gzip, bz2, xz) are supported.
- When running with nogil=True, the following parameters are ignored: columns, quoting, quotechar, doublequote, escapechar, chunksize, decimal

Distributed computing

This function supports dask distributed, with the caveat that all workers must write to the same shared mountpoint and that the shared filesystem must strictly guarantee close-open coherency, meaning that one must be able to call write() and then close() on a file descriptor from one host and then immediately afterwards open() from another host and see the output from the first host. Note that, for performance reasons, most network filesystems do not enable this feature by default.
Alternatively, one may write to local mountpoints and then manually collect and concatenate the partial outputs.
Source code in rekx/csv.py
def to_csv(
x: xarray.DataArray,
path: str | Path,
*,
nogil: bool = True,
**kwargs,
):
"""Print DataArray to CSV.
When x has numpy backend, this function is functionally equivalent to (but
much faster than)::
x.to_pandas().to_csv(path_or_buf, **kwargs)
When x has dask backend, this function returns a dask delayed object which
will write to the disk only when its .compute() method is invoked.
Formatting and optional compression are parallelised across all available
CPUs, using one dask task per chunk on the first dimension. Chunks on other
dimensions will be merged ahead of computation.
:param x:
:class:`~xarray.DataArray` with one or two dimensions
:param str path:
Output file path
:param bool nogil:
If True, use accelerated C implementation. Several kwargs won't be
processed correctly (see limitations below). If False, use pandas
to_csv method (slow, and does not release the GIL).
nogil=True exclusively supports float and integer values dtypes (but
the coords can be anything). In case of incompatible dtype, nogil
is automatically switched to False.
:param kwargs:
Passed verbatim to :meth:`pandas.DataFrame.to_csv` or
:meth:`pandas.Series.to_csv`
**Limitations**
- Fancy URIs are not (yet) supported.
- compression='zip' is not supported. All other compression methods (gzip,
bz2, xz) are supported.
- When running with nogil=True, the following parameters are ignored:
columns, quoting, quotechar, doublequote, escapechar, chunksize, decimal
**Distributed computing**
This function supports `dask distributed`_, with the caveat that all workers
must write to the same shared mountpoint and that the shared filesystem
must strictly guarantee **close-open coherency**, meaning that one must be
able to call write() and then close() on a file descriptor from one host
and then immediately afterwards open() from another host and see the output
from the first host. Note that, for performance reasons, most network
filesystems do not enable this feature by default.
Alternatively, one may write to local mountpoints and then manually collect
and concatenate the partial outputs.
"""
if not isinstance(x, xarray.DataArray):
raise ValueError("first argument must be a DataArray")
# Health checks
if not isinstance(path, Path):
try:
path = Path(path)
except:
raise ValueError("path_or_buf must be a file path")
if x.ndim not in (1, 2):
raise ValueError(
"cannot convert arrays with %d dimensions into " "pandas objects" % x.ndim
)
if nogil and x.dtype.kind not in "if":
nogil = False
# Extract row and columns indices
indices = [x.get_index(dim) for dim in x.dims]
if x.ndim == 2:
index, columns = indices
else:
index = indices[0]
columns = None
compression = kwargs.pop("compression", "infer")
compress = _compress_func(path, compression)
mode = kwargs.pop("mode", "w")
if mode not in "wa":
raise ValueError('mode: expected w or a; got "%s"' % mode)
# Fast exit for numpy backend
if not x.chunks:
bdata = kernels.to_csv(x.values, index, columns, True, nogil, kwargs)
if compress:
bdata = compress(bdata)
with open(path, mode + "b") as fh:
fh.write(bdata)
return None
# Merge chunks on all dimensions beyond the first
x = x.chunk((x.chunks[0],) + tuple((s,) for s in x.shape[1:]))
# Manually define the dask graph
tok = tokenize(x.data, index, columns, compression, path, kwargs)
name1 = "to_csv_encode-" + tok
name2 = "to_csv_compress-" + tok
name3 = "to_csv_write-" + tok
name4 = "to_csv-" + tok
dsk: dict[str | tuple, tuple] = {}
assert x.chunks
assert x.chunks[0]
offset = 0
for i, size in enumerate(x.chunks[0]):
# Slice index
index_i = index[offset : offset + size]
offset += size
x_i = (x.data.name, i) + (0,) * (x.ndim - 1)
# Step 1: convert to CSV and encode to binary blob
if i == 0:
# First chunk: print header
dsk[name1, i] = (kernels.to_csv, x_i, index_i, columns, True, nogil, kwargs)
else:
kwargs_i = kwargs.copy()
kwargs_i["header"] = False
dsk[name1, i] = (kernels.to_csv, x_i, index_i, None, False, nogil, kwargs_i)
# Step 2 (optional): compress
if compress:
prevname = name2
dsk[name2, i] = compress, (name1, i)
else:
prevname = name1
# Step 3: write to file
if i == 0:
# First chunk: overwrite file if it already exists
dsk[name3, i] = kernels.to_file, path, mode + "b", (prevname, i)
else:
# Next chunks: wait for previous chunk to complete and append
dsk[name3, i] = (kernels.to_file, path, "ab", (prevname, i), (name3, i - 1))
# Rename final key
dsk[name4] = dsk.pop((name3, i))
hlg = HighLevelGraph.from_collections(name4, dsk, (x,))
return Delayed(name4, hlg)
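
The sketch below illustrates the documented dask behaviour of `to_csv`: with a chunked DataArray it returns a delayed object, and nothing is written until `.compute()` is called. It assumes dask is installed; the data and file name are placeholders.

```python
# Hypothetical example: parallel CSV writing of a dask-backed DataArray.
import numpy as np
import pandas as pd
import xarray as xr

from rekx.csv import to_csv

time = pd.date_range("2020-01-01", periods=1000, freq="h")
series = xr.DataArray(
    np.random.rand(1000), coords={"time": time}, dims="time", name="demo"
).chunk({"time": 250})  # dask backend: one CSV task per chunk of the first dimension

delayed = to_csv(x=series, path="demo.csv")  # returns a dask delayed object
delayed.compute()                            # the file is only written here
```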
write_metadata_dictionary_to_csv
Write a metadata dictionary to a CSV file.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
dictionary | dict | A dictionary containing the metadata. | required |
output_filename | Path | Path to the output CSV file. | required |

Returns:

Type | Description |
---|---|
None | |
Source code in rekx/csv.py
def write_metadata_dictionary_to_csv(
dictionary: dict,
output_filename: Path,
) -> None:
"""
Write a metadata dictionary to a CSV file.
Parameters
----------
dictionary:
A dictionary containing the metadata.
output_filename: Path
Path to the output CSV file.
Returns
-------
None
"""
if not dictionary:
raise ValueError("The given dictionary is empty!")
headers = [
"File Name",
"File Size",
"Variable",
"Shape",
"Type",
"Scale",
"Offset",
"Compression",
"Level",
"Shuffling",
"Read time",
]
with open(output_filename, "w", newline="", encoding="utf-8") as file:
writer = csv.writer(file)
writer.writerow(headers)
file_name = dictionary.get("File name", "")
file_size = dictionary.get("File size", "")
for variable, metadata in dictionary.get("Variables", {}).items():
compression_details = None  # guard: avoid NameError when no compression metadata is present
if "Compression" in metadata:
from .print import format_compression
compression_details = format_compression(metadata["Compression"])
row = [
file_name,
file_size,
variable,
metadata.get("Shape", ""),
metadata.get("Type", ""),
metadata.get("Scale", ""),
metadata.get("Offset", ""),
compression_details["Filters"] if compression_details else None,
compression_details["Level"] if compression_details else None,
metadata.get("Shuffling", ""),
metadata.get("Read time", ""),
]
writer.writerow(row)
print(f"Output written to [code]{output_filename}[/code]")
write_nested_dictionary_to_csv
Source code in rekx/csv.py
def write_nested_dictionary_to_csv(
nested_dictionary: dict,
output_filename: Path,
) -> None:
""" """
if not nested_dictionary:
raise ValueError("The given dictionary is empty!")
with open(output_filename, "w", newline="", encoding="utf-8") as file:
writer = csv.writer(file)
writer.writerow(
[
"File",
"Size",
"Variable",
"Shape",
"Chunks",
"Cache",
"Elements",
"Preemption",
"Type",
"Scale",
"Offset",
"Compression",
"Level",
"Shuffling",
# "Repetitions",
"Read time",
]
)
for file_name, file_data in nested_dictionary.items():
for variable, metadata in file_data.get("Variables", {}).items():
row = [
file_data.get("File name", ""),
file_data.get("File size", ""),
variable,
metadata.get("Shape", ""),
metadata.get("Chunks", ""),
metadata.get("Cache", ""),
metadata.get("Elements", ""),
metadata.get("Preemption", ""),
metadata.get("Type", ""),
metadata.get("Scale", ""),
metadata.get("Offset", ""),
metadata.get("Compression", ""),
metadata.get("Level", ""),
metadata.get("Shuffling", ""),
metadata.get("Read time", ""),
]
writer.writerow(row)
print(f"Output written to [code]{output_filename}[/code]")