"""
Module containing functions and classes related to Spectra calculation and
manipulation
Spectra are calculated from the windowed, decimated time data. The inbuilt
Fourier transform implementation is inspired by the implementation of the
scipy stft function.
"""
from loguru import logger
from pathlib import Path
from typing import Union, Tuple, Dict, List, Any, Optional
from pydantic import PositiveInt
import numpy as np
import pandas as pd
import plotly.graph_objects as go
import plotly.express as px
from resistics.common import ResisticsData, ResisticsProcess, History
from resistics.common import ResisticsWriter, Metadata, WriteableMetadata
from resistics.sampling import HighResDateTime
from resistics.time import ChanMetadata
from resistics.decimate import DecimationParameters
from resistics.window import WindowedData, WindowedLevelMetadata
class SpectraData(ResisticsData):
    """
    Class for holding spectra data

    The spectra data is stored in the class as a dictionary mapping decimation
    level to numpy array. The shape of the array for each decimation level is:

    n_wins x n_chans x n_freqs
    """

    def __init__(self, metadata: SpectraMetadata, data: Dict[int, np.ndarray]):
        """
        Initialise spectra data

        Parameters
        ----------
        metadata : SpectraMetadata
            Metadata for the spectra data
        data : Dict[int, np.ndarray]
            Dictionary of data, one entry for each evaluation level
        """
        logger.debug(f"Creating SpectraData with data type {data[0].dtype}")
        self.metadata = metadata
        self.data = data

    def get_level(self, level: int) -> np.ndarray:
        """
        Get the spectra data for a decimation level

        Parameters
        ----------
        level : int
            The decimation level

        Returns
        -------
        np.ndarray
            The level data with shape n_wins x n_chans x n_freqs

        Raises
        ------
        ValueError
            If the level is out of range
        """
        if level >= self.metadata.n_levels:
            raise ValueError(f"Level {level} not <= max {self.metadata.n_levels - 1}")
        return self.data[level]

    def get_chan(self, level: int, chan: str) -> np.ndarray:
        """
        Get the channel spectra data for a decimation level

        Parameters
        ----------
        level : int
            The decimation level
        chan : str
            The channel to get data for

        Returns
        -------
        np.ndarray
            The channel data with shape n_wins x n_freqs

        Raises
        ------
        ChannelNotFoundError
            If the channel is not present in the metadata channels
        """
        from resistics.errors import ChannelNotFoundError

        if chan not in self.metadata.chans:
            raise ChannelNotFoundError(chan, self.metadata.chans)
        idx = self.metadata.chans.index(chan)
        return self.data[level][..., idx, :]

    def get_chans(self, level: int, chans: List[str]) -> np.ndarray:
        """
        Get the channels spectra data for a decimation level

        Parameters
        ----------
        level : int
            The decimation level
        chans : List[str]
            The channels to get data for

        Returns
        -------
        np.ndarray
            The channels data with shape n_wins x len(chans) x n_freqs

        Raises
        ------
        ChannelNotFoundError
            If any requested channel is not present in the metadata channels
        """
        from resistics.errors import ChannelNotFoundError

        for chan in chans:
            if chan not in self.metadata.chans:
                raise ChannelNotFoundError(chan, self.metadata.chans)
        indices = [self.metadata.chans.index(chan) for chan in chans]
        return self.data[level][..., indices, :]

    def get_freq(self, level: int, idx: int) -> np.ndarray:
        """
        Get the spectra data at a frequency index for a decimation level

        Parameters
        ----------
        level : int
            The decimation level
        idx : int
            The frequency index

        Returns
        -------
        np.ndarray
            The data at the frequency index, squeezed of unit dimensions

        Raises
        ------
        ValueError
            If the frequency index is out of range
        """
        n_freqs = self.metadata.levels_metadata[level].n_freqs
        if idx < 0 or idx >= n_freqs:
            raise ValueError(f"Freq. index {idx} not 0 <= idx < {n_freqs}")
        return np.squeeze(self.data[level][..., idx])

    def get_mag_phs(
        self, level: int, unwrap: bool = False
    ) -> Tuple[np.ndarray, np.ndarray]:
        """
        Get magnitude and phase for a decimation level

        Parameters
        ----------
        level : int
            The decimation level
        unwrap : bool, optional
            Flag for unwrapping the phase along the last axis, by default False

        Returns
        -------
        Tuple[np.ndarray, np.ndarray]
            The magnitude and phase arrays for the level
        """
        spec = self.data[level]
        if unwrap:
            return np.absolute(spec), np.unwrap(np.angle(spec))
        return np.absolute(spec), np.angle(spec)

    def get_timestamps(self, level: int) -> pd.DatetimeIndex:
        """
        Get the start time of each window

        Note that this does not use high resolution timestamps

        Parameters
        ----------
        level : int
            The decimation level

        Returns
        -------
        pd.DatetimeIndex
            The starts of each window

        Raises
        ------
        ValueError
            If the level is out of range
        """
        from resistics.window import get_win_starts

        if level >= self.metadata.n_levels:
            raise ValueError(f"Level {level} not <= max {self.metadata.n_levels - 1}")
        level_metadata = self.metadata.levels_metadata[level]
        return get_win_starts(
            self.metadata.ref_time,
            level_metadata.win_size,
            level_metadata.olap_size,
            level_metadata.fs,
            level_metadata.n_wins,
            level_metadata.index_offset,
        )

    def plot(self, max_pts: Optional[int] = 10_000) -> go.Figure:
        """
        Stack spectra data for all decimation levels

        Parameters
        ----------
        max_pts : Optional[int], optional
            The maximum number of points in any individual plot before applying
            lttbc downsampling, by default 10_000. If set to None, no
            downsampling will be applied.

        Returns
        -------
        go.Figure
            The plotly figure
        """
        from itertools import cycle

        from resistics.plot import get_spectra_stack_fig

        y_labels = {x: "Magnitude" for x in self.metadata.chans}
        fig = get_spectra_stack_fig(self.metadata.chans, y_labels)
        # cycle the palette so more levels than colors does not raise StopIteration
        colors = cycle(px.colors.qualitative.Plotly)
        for ilevel in range(self.metadata.n_levels):
            level_metadata = self.metadata.levels_metadata[ilevel]
            freqs = np.array(level_metadata.freqs)
            # stack by averaging the magnitude over all windows
            stack = np.mean(np.absolute(self.data[ilevel]), axis=0)
            legend = f"{ilevel} - {level_metadata.fs:.4f} Hz"
            fig = self._add_stack_data(
                fig, freqs, stack, legend, color=next(colors), max_pts=max_pts
            )
        return fig

    def plot_level_stack(
        self,
        level: int,
        max_pts: int = 10_000,
        grouping: Optional[str] = None,
        offset: str = "0h",
    ) -> go.Figure:
        """
        Stack the spectra for a decimation level with optional time grouping

        Parameters
        ----------
        level : int
            The decimation level
        max_pts : int, optional
            The maximum number of points in any individual plot before applying
            lttbc downsampling, by default 10_000
        grouping : Optional[str], optional
            A grouping interval as a pandas freq string, by default None
        offset : str, optional
            A time offset to add to the grouping, by default "0h". For instance,
            to plot night time and day time spectra, set grouping to "12h" and
            offset to "6h"

        Returns
        -------
        go.Figure
            The plotly figure
        """
        from itertools import cycle

        from resistics.plot import get_spectra_stack_fig

        if grouping is None:
            # default to a single group spanning the whole recording
            first_date = pd.Timestamp(self.metadata.first_time.isoformat()).floor("D")
            last_date = pd.Timestamp(self.metadata.last_time.isoformat()).ceil("D")
            grouping = last_date - first_date
        level_metadata = self.metadata.levels_metadata[level]
        # map window start times to their local indices within the level
        df = pd.DataFrame(
            data=np.arange(level_metadata.n_wins),
            index=self.get_timestamps(level),
            columns=["local"],
        )
        # group by the grouping frequency, iterate over the groups and plot
        freqs = np.array(level_metadata.freqs)
        y_labels = {x: "Magnitude" for x in self.metadata.chans}
        fig = get_spectra_stack_fig(self.metadata.chans, y_labels)
        # cycle the palette so more groups than colors does not raise StopIteration
        colors = cycle(px.colors.qualitative.Plotly)
        for idx, group in df.groupby(pd.Grouper(freq=grouping, offset=offset)):
            stack = np.mean(np.absolute(self.data[level][group["local"]]), axis=0)
            fig = self._add_stack_data(
                fig, freqs, stack, str(idx), color=next(colors), max_pts=max_pts
            )
        return fig

    def _add_stack_data(
        self,
        fig: go.Figure,
        freqs: np.ndarray,
        data: np.ndarray,
        legend: str,
        color: str = "blue",
        max_pts: Optional[int] = 10_000,
    ) -> go.Figure:
        """
        Add stacked spectra data to a plot

        Parameters
        ----------
        fig : go.Figure
            The figure to add to
        freqs : np.ndarray
            Frequencies
        data : np.ndarray
            The magnitude data, shape n_chans x n_freqs
        legend : str
            The legend string for the data
        color : str, optional
            The color to plot the line, by default "blue"
        max_pts : Optional[int], optional
            Maximum number of points to plot, by default 10_000. If the number
            of samples in the data is above this, it will be downsampled

        Returns
        -------
        go.Figure
            Plotly figure
        """
        from resistics.plot import apply_lttb

        n_chans = data.shape[0]
        for idx in range(n_chans):
            indices, chan_data = apply_lttb(data[idx, :], max_pts)
            chan_freqs = freqs[indices]
            scatter = go.Scattergl(
                x=chan_freqs,
                y=chan_data,
                line=dict(color=color),
                name=legend,
                legendgroup=legend,
                # only show the legend entry once per stack, not per channel
                showlegend=(idx == 0),
            )
            fig.add_trace(scatter, row=idx + 1, col=1)
        return fig

    def plot_level_section(self, level: int, grouping: str = "30T") -> go.Figure:
        """
        Plot a spectra section

        Parameters
        ----------
        level : int
            The decimation level to plot
        grouping : str, optional
            The time domain resolution, by default "30T"

        Returns
        -------
        go.Figure
            A plotly figure
        """
        from resistics.plot import get_spectra_section_fig

        level_metadata = self.metadata.levels_metadata[level]
        # map window start times to their local indices within the level
        df = pd.DataFrame(
            data=np.arange(level_metadata.n_wins),
            index=self.get_timestamps(level),
            columns=["local"],
        )
        fig = get_spectra_section_fig(self.metadata.chans)
        colorbar_len = 0.90 / self.metadata.n_chans
        # guard the denominator so a single-channel dataset does not divide by zero
        colorbar_inc = 0.84 / max(self.metadata.n_chans - 1, 1)
        # group by the grouping frequency, iterate over the groups and plot
        data = {}
        for idx, group in df.groupby(pd.Grouper(freq=grouping)):
            data[idx] = np.mean(np.absolute(self.data[level][group["local"]]), axis=0)
        for idx, chan in enumerate(self.metadata.chans):
            df_data = pd.DataFrame(
                data={k: v[idx] for k, v in data.items()}, index=level_metadata.freqs
            )
            # plot on a log10 color scale with power-of-ten tick labels
            z = np.log10(df_data.values)
            z_min = np.ceil(z.min())
            z_max = np.floor(z.max())
            z_range = np.arange(z_min, z_max + 1)
            colorbar = dict(
                tickvals=z_range,
                ticktext=[f"10^{int(x)}" for x in z_range],
                y=0.92 - idx * colorbar_inc,
                len=colorbar_len,
            )
            heatmap = go.Heatmap(
                z=z,
                # center each heatmap cell on the middle of its time group
                x=pd.to_datetime(df_data.columns) + pd.Timedelta(grouping) / 2,
                y=df_data.index,
                zmin=z_min,
                zmax=z_max,
                colorscale="viridis",
                colorbar=colorbar,
            )
            fig.append_trace(heatmap, row=idx + 1, col=1)
        return fig
class EvaluationFreqs(ResisticsProcess):
    """
    Calculate the spectra values at the evaluation frequencies

    This is done using linear interpolation in the complex domain

    Example
    -------
    The example will show interpolation to evaluation frequencies on a very
    simple example. Begin by generating some example spectra data.

    >>> from resistics.decimate import DecimationSetup
    >>> from resistics.spectra import EvaluationFreqs
    >>> from resistics.testing import spectra_data_basic
    >>> spec_data = spectra_data_basic()
    >>> spec_data.metadata.n_levels
    1
    >>> spec_data.metadata.chans
    ['chan1']
    >>> spec_data.metadata.levels_metadata[0].summary()
    {
        'fs': 180.0,
        'n_wins': 2,
        'win_size': 20,
        'olap_size': 5,
        'index_offset': 0,
        'n_freqs': 10,
        'freqs': [0.0, 10.0, 20.0, 30.0, 40.0, 50.0, 60.0, 70.0, 80.0, 90.0]
    }

    The spectra data has only a single channel and a single level which has 2
    windows. Now define our evaluation frequencies.

    >>> eval_freqs = [1, 12, 23, 34, 45, 56, 67, 78, 89]
    >>> dec_setup = DecimationSetup(n_levels=1, per_level=9, eval_freqs=eval_freqs)
    >>> dec_params = dec_setup.run(spec_data.metadata.fs[0])
    >>> dec_params.summary()
    {
        'fs': 180.0,
        'n_levels': 1,
        'per_level': 9,
        'min_samples': 256,
        'eval_freqs': [1.0, 12.0, 23.0, 34.0, 45.0, 56.0, 67.0, 78.0, 89.0],
        'dec_factors': [1],
        'dec_increments': [1],
        'dec_fs': [180.0]
    }

    Now calculate the spectra at the evaluation frequencies

    >>> eval_data = EvaluationFreqs().run(dec_params, spec_data)
    >>> eval_data.metadata.levels_metadata[0].summary()
    {
        'fs': 180.0,
        'n_wins': 2,
        'win_size': 20,
        'olap_size': 5,
        'index_offset': 0,
        'n_freqs': 9,
        'freqs': [1.0, 12.0, 23.0, 34.0, 45.0, 56.0, 67.0, 78.0, 89.0]
    }

    To double check everything is as expected, let's compare the data. Comparing
    window 1 gives

    >>> print(spec_data.data[0][0, 0])
    [0.+0.j 1.+1.j 2.+2.j 3.+3.j 4.+4.j 5.+5.j 6.+6.j 7.+7.j 8.+8.j 9.+9.j]
    >>> print(eval_data.data[0][0, 0])
    [0.1+0.1j 1.2+1.2j 2.3+2.3j 3.4+3.4j 4.5+4.5j 5.6+5.6j 6.7+6.7j 7.8+7.8j
     8.9+8.9j]

    And window 2

    >>> print(spec_data.data[0][1, 0])
    [-1. +1.j  0. +2.j  1. +3.j  2. +4.j  3. +5.j  4. +6.j  5. +7.j  6. +8.j
      7. +9.j  8.+10.j]
    >>> print(eval_data.data[0][1, 0])
    [-0.9+1.1j  0.2+2.2j  1.3+3.3j  2.4+4.4j  3.5+5.5j  4.6+6.6j  5.7+7.7j
      6.8+8.8j  7.9+9.9j]
    """

    def run(
        self, dec_params: DecimationParameters, spec_data: SpectraData
    ) -> SpectraData:
        """
        Interpolate spectra data to the evaluation frequencies

        This is a simple linear interpolation.

        Parameters
        ----------
        dec_params : DecimationParameters
            The decimation parameters which have the evaluation frequencies for
            each decimation level
        spec_data : SpectraData
            The spectra data

        Returns
        -------
        SpectraData
            The spectra data at the evaluation frequencies
        """
        metadata_dict = spec_data.metadata.dict()
        interp_data: Dict[int, np.ndarray] = {}
        new_levels_metadata = []
        messages = []
        for level in range(spec_data.metadata.n_levels):
            logger.info(f"Reducing freqs to evaluation freqs for level {level}")
            level_metadata = spec_data.metadata.levels_metadata[level]
            level_freqs = np.array(level_metadata.freqs)
            level_eval_freqs = np.array(dec_params.get_eval_freqs(level))
            interp_data[level] = self._get_level_data(
                level_freqs, spec_data.get_level(level), level_eval_freqs
            )
            new_levels_metadata.append(
                self._get_level_metadata(level_metadata, level_eval_freqs)
            )
        messages.append("Spectra reduced to evaluation frequencies")
        metadata = self._get_metadata(metadata_dict, new_levels_metadata)
        metadata.history.add_record(self._get_record(messages))
        logger.info("Fourier coefficients calculated at evaluation frequencies")
        return SpectraData(metadata, interp_data)

    def _get_level_data(
        self, freqs: np.ndarray, data: np.ndarray, eval_freqs: np.ndarray
    ) -> np.ndarray:
        """
        Interpolate the spectra data to the evaluation frequencies

        The input data for a level has shape:

        n_wins x n_chans x n_freqs

        The new output data will have size:

        n_wins x n_chans x n_eval_freqs

        This is a linear interpolation performed manually, as numpy does not
        offer complex interpolation along an axis. The evaluation frequencies
        are first converted to fractional indices into the existing frequency
        grid, and the fractional parts are then used as interpolation weights.

        Parameters
        ----------
        freqs : np.ndarray
            The input data frequencies
        data : np.ndarray
            The input spectra data
        eval_freqs : np.ndarray
            The evaluation frequencies

        Returns
        -------
        np.ndarray
            Output level data
        """
        # fractional positions of the evaluation frequencies on the freq grid
        positions = np.interp(eval_freqs, freqs, np.arange(freqs.size))
        lower = np.floor(positions).astype(int)
        upper = np.ceil(positions).astype(int)
        # cast the weights to the data dtype to preserve the original type,
        # otherwise complex64 data could be promoted to complex128
        weights = (positions - lower).astype(data.dtype)
        lower_vals = data[..., lower]
        # weights broadcast over the last axis of the difference
        return lower_vals + (data[..., upper] - lower_vals) * weights

    def _get_level_metadata(
        self, level_metadata: SpectraLevelMetadata, eval_freqs: np.ndarray
    ) -> SpectraLevelMetadata:
        """Get the metadata for the decimation level"""
        updated = level_metadata.dict()
        updated["n_freqs"] = len(eval_freqs)
        updated["freqs"] = eval_freqs.tolist()
        return SpectraLevelMetadata(**updated)

    def _get_metadata(
        self, metadata_dict: Dict[str, Any], levels_metadata: List[SpectraLevelMetadata]
    ) -> SpectraMetadata:
        """Get metadata for the dataset"""
        # file_info is not valid for the new, derived dataset
        metadata_dict.pop("file_info")
        metadata_dict["levels_metadata"] = levels_metadata
        return SpectraMetadata(**metadata_dict)
class SpectraDataWriter(ResisticsWriter):
    """Writer of resistics spectra data"""

    def run(self, dir_path: Path, spec_data: SpectraData) -> None:
        """
        Write out SpectraData

        The level arrays are saved to a compressed numpy file and the metadata,
        with an added history record, to a JSON file alongside it.

        Parameters
        ----------
        dir_path : Path
            The directory path to write to
        spec_data : SpectraData
            Spectra data to write out

        Raises
        ------
        WriteError
            If unable to write to the directory
        """
        from resistics.errors import WriteError

        if not self._check_dir(dir_path):
            raise WriteError(dir_path, "Unable to write to directory, check logs")
        logger.info(f"Writing spectra data to {dir_path}")
        # levels are saved as named arrays keyed by their level number
        arrays = {str(level): array for level, array in spec_data.data.items()}
        np.savez_compressed(dir_path / "data", **arrays)
        metadata = spec_data.metadata.copy()
        metadata.history.add_record(self._get_record(dir_path, type(spec_data)))
        metadata.write(dir_path / "metadata.json")
class SpectraDataReader(ResisticsProcess):
    """Reader of resistics spectra data"""

    def run(
        self, dir_path: Path, metadata_only: bool = False
    ) -> Union[SpectraMetadata, SpectraData]:
        """
        Read SpectraData

        Parameters
        ----------
        dir_path : Path
            The directory path to read from
        metadata_only : bool, optional
            Flag for getting metadata only, by default False

        Returns
        -------
        Union[SpectraMetadata, SpectraData]
            The SpectraData or SpectraMetadata if metadata_only is True

        Raises
        ------
        ReadError
            If the directory does not exist
        """
        from resistics.errors import ReadError

        if not dir_path.exists():
            raise ReadError(dir_path, "Directory does not exist")
        logger.info(f"Reading spectra data from {dir_path}")
        metadata_path = dir_path / "metadata.json"
        metadata = SpectraMetadata.parse_file(metadata_path)
        if metadata_only:
            return metadata
        data_path = dir_path / "data.npz"
        # use a context manager so the NpzFile's file handle is closed;
        # the arrays are materialized into the dict before close
        with np.load(data_path) as npz_file:
            data = {int(level): npz_file[level] for level in npz_file.files}
        messages = [f"Spectra data read from {dir_path}"]
        metadata.history.add_record(self._get_record(messages))
        return SpectraData(metadata, data)
class SpectraProcess(ResisticsProcess):
    """Parent class for spectra processes"""

    def run(self, spec_data: SpectraData) -> SpectraData:
        """
        Run a spectra processor

        Child classes must override this method with their own processing.
        """
        raise NotImplementedError("Run is not implemented in the parent SpectraProcess")
class SpectraSmootherGaussian(SpectraProcess):
    """
    Smooth a spectra with a gaussian filter

    For more information, please refer to:
    https://docs.scipy.org/doc/scipy/reference/generated/scipy.ndimage.gaussian_filter1d.html

    Examples
    --------
    Smooth a simple spectra data instance

    >>> from resistics.spectra import SpectraSmootherGaussian
    >>> from resistics.testing import spectra_data_basic
    >>> spec_data = spectra_data_basic()
    >>> smooth_data = SpectraSmootherGaussian(sigma=1).run(spec_data)

    Look at the results for the two windows

    >>> spec_data.data[0][0,0]
    array([0.+0.j, 1.+1.j, 2.+2.j, 3.+3.j, 4.+4.j, 5.+5.j, 6.+6.j, 7.+7.j,
           8.+8.j, 9.+9.j])
    >>> smooth_data.data[0][0,0]
    array([0.42704095+0.42704095j, 1.06795587+1.06795587j,
           2.00483335+2.00483335j, 3.00013383+3.00013383j,
           4.        +4.j        , 5.        +5.j        ,
           5.99986617+5.99986617j, 6.99516665+6.99516665j,
           7.93204413+7.93204413j, 8.57295905+8.57295905j])
    """

    # standard deviation of the gaussian kernel, in frequency samples
    sigma: float = 3

    def run(self, spec_data: SpectraData) -> SpectraData:
        """
        Run Gaussian filtering of spectra data

        Each decimation level is smoothed along the frequency axis using
        scipy's gaussian_filter1d with the configured sigma.

        Parameters
        ----------
        spec_data : SpectraData
            Input spectra data

        Returns
        -------
        SpectraData
            Output spectra data
        """
        import scipy.ndimage as ndimage

        data = {}
        logger.info(f"Smoothing frequencies with gaussian filter, sigma {self.sigma}")
        messages = [f"Smoothing frequencies with gaussian filter, sigma {self.sigma}"]
        for ilevel in range(spec_data.metadata.n_levels):
            # fix: use the configured sigma rather than a hard-coded value of 1
            data[ilevel] = ndimage.gaussian_filter1d(
                spec_data.get_level(ilevel), self.sigma, axis=-1
            )
            messages.append(f"Smoothed level {ilevel} with gaussian filter")
        metadata = SpectraMetadata(**spec_data.metadata.dict())
        metadata.history.add_record(self._get_record(messages))
        logger.info("Spectra data smoothed with gaussian filter")
        return SpectraData(metadata, data)