Source code for resistics.letsgo

"""
This module is the main interface to resistics and includes:

- Classes and functions for making, loading and using resistics projects
- Functions for processing data
"""
from loguru import logger
from typing import Optional, Dict, Union, Any
from pathlib import Path
from datetime import datetime

from resistics.errors import MetadataReadError, TimeDataReadError
from resistics.common import ResisticsProcess, ResisticsModel
from resistics.config import Configuration, get_default_configuration
from resistics.project import PROJ_FILE, PROJ_DIRS, ProjectMetadata, Project
from resistics.project import Site, Measurement
from resistics.sampling import DateTimeLike, HighResDateTime
from resistics.time import TimeData
from resistics.decimate import DecimationParameters
from resistics.decimate import DecimatedData
from resistics.window import WindowedData
from resistics.spectra import SpectraData
from resistics.gather import GatheredData
from resistics.regression import RegressionInputData, Solution


class ProjectCreator(ResisticsProcess):
    """Process to create a project"""

    dir_path: Path
    metadata: ProjectMetadata

    def run(self):
        """
        Create the project

        Raises
        ------
        ProjectCreateError
            If an existing project is found
        """
        from resistics.errors import ProjectCreateError
        from resistics.common import is_dir, assert_dir

        metadata_path = self.dir_path / PROJ_FILE
        if self.dir_path.exists() and is_dir(self.dir_path) and metadata_path.exists():
            raise ProjectCreateError(
                self.dir_path, "Existing project found, try loading"
            )
        elif self.dir_path.exists():
            logger.warning("Directory already exists, the project will be saved here")
            assert_dir(self.dir_path)
        else:
            logger.info("Making directory for project")
            self.dir_path.mkdir(parents=True)
        self._make_subdirs()
        self.metadata.write(metadata_path)
        logger.info(f"Project created in {self.dir_path}")

    def _make_subdirs(self):
        """Make project subdirectories"""
        for data_type, subdir in PROJ_DIRS.items():
            subdir_path = self.dir_path / subdir
            if not subdir_path.exists():
                logger.info(f"Making {data_type} data subdirectory: {subdir_path}")
                subdir_path.mkdir()


def new(dir_path: Union[Path, str], proj_info: Dict[str, Any]) -> bool:
    """
    Create a new project

    Parameters
    ----------
    dir_path : Union[Path, str]
        The directory to create the project in
    proj_info : Dict[str, Any]
        Any project details

    Returns
    -------
    bool
        True if the creator was successful
    """
    if isinstance(dir_path, str):
        dir_path = Path(dir_path)
    metadata = ProjectMetadata(**proj_info)
    ProjectCreator(dir_path=dir_path, metadata=metadata).run()
    return True


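# Usage sketch (illustrative addition, not part of the original module): create
# a project in a hypothetical "example_project" directory. The metadata key and
# value below are assumptions about what ProjectMetadata accepts; adjust to the
# fields your ProjectMetadata requires.
def _example_new_project() -> bool:
    proj_info = {"ref_time": "2021-01-01 00:00:00"}  # hypothetical reference time
    return new("example_project", proj_info)

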
class ProjectLoader(ResisticsProcess):
    """Project loader"""

    dir_path: Path

    def run(self, config: Configuration) -> Project:
        """
        Load a project

        Parameters
        ----------
        config : Configuration
            The configuration for the purposes of getting the time readers

        Returns
        -------
        Project
            Project instance

        Raises
        ------
        ProjectLoadError
            If the resistics project metadata is not found
        """
        from resistics.errors import ProjectLoadError
        from resistics.common import assert_dir, dir_subdirs

        assert_dir(self.dir_path)
        metadata_path = self.dir_path / PROJ_FILE
        if not metadata_path.exists():
            raise ProjectLoadError(
                self.dir_path, f"Resistics project file {metadata_path} not found"
            )
        self._check_subdirs()
        metadata = ProjectMetadata.parse_file(metadata_path)
        time_subdirs = dir_subdirs(self.dir_path / PROJ_DIRS["time"])
        sites = {}
        for site_dir in time_subdirs:
            site = self._load_site(site_dir, config)
            sites[site_dir.name] = site
        if len(sites) > 0:
            begin_time = min([x.begin_time for x in sites.values()])
            end_time = max([x.end_time for x in sites.values()])
        else:
            begin_time = datetime.now()
            end_time = datetime.now()
        return Project(
            dir_path=self.dir_path,
            metadata=metadata,
            begin_time=begin_time,
            end_time=end_time,
            sites=sites,
        )

    def _check_subdirs(self) -> bool:
        """Returns True if all required project subdirectories exist otherwise False"""
        from resistics.common import assert_dir

        for subdir in PROJ_DIRS.values():
            subdir_path = self.dir_path / subdir
            if not subdir_path.exists():
                return False
            assert_dir(subdir_path)
        return True

    def _load_site(self, site_dir: Path, config: Configuration) -> Site:
        """Load a Site"""
        from resistics.common import dir_subdirs

        subdirs = dir_subdirs(site_dir)
        measurements = {}
        for meas_dir in subdirs:
            meas = self._load_measurement(site_dir.name, meas_dir, config)
            if meas is not None:
                measurements[meas_dir.name] = meas
        if len(measurements) > 0:
            begin_time = min([x.metadata.first_time for x in measurements.values()])
            end_time = max([x.metadata.last_time for x in measurements.values()])
        else:
            begin_time = datetime.now()
            end_time = datetime.now()
        return Site(
            dir_path=site_dir,
            begin_time=begin_time,
            end_time=end_time,
            measurements=measurements,
        )

    def _load_measurement(
        self, site_name: str, meas_dir: Path, config: Configuration
    ) -> Union[Measurement, None]:
        """
        Load a measurement

        The loader tries to use any TimeReader provided in the configuration to
        load the measurement. If no compatible reader is found, the measurement
        will be ignored.

        Parameters
        ----------
        site_name : str
            The name of the Site
        meas_dir : Path
            The measurement subdirectory in the site time directory
        config : Configuration
            Configuration which is used for the time readers

        Returns
        -------
        Union[Measurement, None]
            Measurement if reading was successful, else None
        """
        for reader in config.time_readers:
            try:
                metadata = reader.run(dir_path=meas_dir, metadata_only=True)
                logger.info(f"Read measurement {meas_dir} with {reader.name}")
                return Measurement(
                    site_name=site_name,
                    dir_path=meas_dir,
                    metadata=metadata,
                    reader=reader,
                )
            except Exception:
                logger.debug(
                    f"Failed to read measurement {meas_dir} with {reader.name}"
                )
        logger.error(f"No reader found for measurement {meas_dir}")
        return None


class ResisticsEnvironment(ResisticsModel):
    """
    A Resistics environment which combines a project and a configuration
    """

    proj: Project
    """The project"""
    config: Configuration
    """The configuration for processing"""


def load(
    dir_path: Union[Path, str], config: Optional[Configuration] = None
) -> ResisticsEnvironment:
    """
    Load an existing project into a ResisticsEnvironment

    Parameters
    ----------
    dir_path : Union[Path, str]
        The project directory
    config : Optional[Configuration], optional
        A configuration of parameters to use

    Returns
    -------
    ResisticsEnvironment
        The ResisticsEnvironment combining a project and a configuration

    Raises
    ------
    ProjectLoadError
        If the loading failed
    """
    if config is None:
        config = get_default_configuration()
    if isinstance(dir_path, str):
        dir_path = Path(dir_path)
    proj = ProjectLoader(dir_path=dir_path).run(config)
    return ResisticsEnvironment(proj=proj, config=config)


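# Usage sketch (illustrative addition, not part of the original module): load an
# existing project with the default configuration; the directory name is a
# hypothetical placeholder.
def _example_load_project() -> ResisticsEnvironment:
    resenv = load("example_project")
    logger.info(f"Loaded project with configuration {resenv.config.name}")
    return resenv

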
def reload(resenv: ResisticsEnvironment) -> ResisticsEnvironment:
    """
    Reload the project in the ResisticsEnvironment

    Parameters
    ----------
    resenv : ResisticsEnvironment
        The current resistics environment

    Returns
    -------
    ResisticsEnvironment
        The resistics environment with the project reloaded
    """
    return load(dir_path=resenv.proj.dir_path, config=resenv.config)


def run_time_processors(config: Configuration, time_data: TimeData) -> TimeData:
    """
    Process time data

    Parameters
    ----------
    config : Configuration
        The configuration
    time_data : TimeData
        Time data to process

    Returns
    -------
    TimeData
        Processed time data
    """
    for process in config.time_processors:
        logger.info(f"Running processor {process.name}")
        time_data = process.run(time_data)
    return time_data


def run_decimation(
    config: Configuration,
    time_data: TimeData,
    dec_params: Optional[DecimationParameters] = None,
) -> DecimatedData:
    """
    Decimate TimeData

    Parameters
    ----------
    config : Configuration
        The configuration
    time_data : TimeData
        Time data to decimate
    dec_params : Optional[DecimationParameters], optional
        Number of levels, decimation factors etc., by default None

    Returns
    -------
    DecimatedData
        Decimated time data
    """
    logger.info("Decimating time data")
    if dec_params is None:
        dec_params = config.dec_setup.run(time_data.metadata.fs)
    return config.decimator.run(dec_params, time_data)


def run_windowing(
    config: Configuration, ref_time: HighResDateTime, dec_data: DecimatedData
) -> WindowedData:
    """
    Window time data

    Parameters
    ----------
    config : Configuration
        The configuration
    ref_time : HighResDateTime
        The reference time
    dec_data : DecimatedData
        Decimated data to window

    Returns
    -------
    WindowedData
        The windowed data
    """
    logger.info("Windowing time data")
    win_params = config.win_setup.run(dec_data.metadata.n_levels, dec_data.metadata.fs)
    return config.windower.run(ref_time, win_params, dec_data)


def run_fft(config: Configuration, win_data: WindowedData) -> SpectraData:
    """
    Run Fourier transform

    Parameters
    ----------
    config : Configuration
        The configuration
    win_data : WindowedData
        Windowed data

    Returns
    -------
    SpectraData
        Fourier transformed windowed data
    """
    logger.info("Calculating spectra data")
    return config.fourier.run(win_data)


def run_spectra_processors(
    config: Configuration, spec_data: SpectraData
) -> SpectraData:
    """
    Run any spectra processors

    Parameters
    ----------
    config : Configuration
        The configuration
    spec_data : SpectraData
        Spectra data

    Returns
    -------
    SpectraData
        Processed spectra data
    """
    for process in config.spectra_processors:
        logger.info(f"Running processor {process.name}")
        spec_data = process.run(spec_data)
    return spec_data


def run_evals(
    config: Configuration, dec_params: DecimationParameters, spec_data: SpectraData
) -> SpectraData:
    """
    Run evaluation frequency data calculator

    Parameters
    ----------
    config : Configuration
        The configuration
    dec_params : DecimationParameters
        Decimation parameters with the evaluation frequencies
    spec_data : SpectraData
        The spectra data

    Returns
    -------
    SpectraData
        Spectra data at evaluation frequencies
    """
    logger.info("Calculating fourier coefficients at evaluation frequencies")
    return config.evals.run(dec_params, spec_data)


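# Usage sketch (illustrative addition, not part of the original module): run the
# processing stages manually on a directory of time data. The directory path is
# a hypothetical placeholder; the chain mirrors what quick_spectra and quick_tf
# do further down in this module.
def _example_manual_pipeline() -> SpectraData:
    config = get_default_configuration()
    time_data = quick_read(Path("example_time_data"), config)
    ref_time = time_data.metadata.first_time
    time_data = run_time_processors(config, time_data)
    dec_params = config.dec_setup.run(time_data.metadata.fs)
    dec_data = run_decimation(config, time_data, dec_params=dec_params)
    win_data = run_windowing(config, ref_time, dec_data)
    spec_data = run_fft(config, win_data)
    spec_data = run_spectra_processors(config, spec_data)
    return run_evals(config, dec_params, spec_data)

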
def run_sensor_calibration(
    config: Configuration, calibration_path: Path, spec_data: SpectraData
) -> SpectraData:
    """
    Run calibration

    Parameters
    ----------
    config : Configuration
        The configuration
    calibration_path : Path
        Path to calibration data
    spec_data : SpectraData
        Spectra data to calibrate

    Returns
    -------
    SpectraData
        Calibrated spectra data
    """
    logger.info("Calibrating spectra data")
    return config.sensor_calibrator.run(calibration_path, spec_data)


def run_regression_preparer(
    config: Configuration, gathered_data: GatheredData
) -> RegressionInputData:
    """
    Prepare linear regression data

    Parameters
    ----------
    config : Configuration
        The configuration
    gathered_data : GatheredData
        Gathered data to input into the regression

    Returns
    -------
    RegressionInputData
        Regression inputs for all evaluation frequencies
    """
    logger.info("Preparing regression input data")
    return config.regression_preparer.run(config.tf, gathered_data)


def run_solver(config: Configuration, reg_data: RegressionInputData) -> Solution:
    """
    Run the regression solver

    Parameters
    ----------
    config : Configuration
        The configuration
    reg_data : RegressionInputData
        The regression input data

    Returns
    -------
    Solution
        Transfer function estimate
    """
    logger.info(f"Running solver {config.solver.name}")
    return config.solver.run(reg_data)


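# Usage sketch (illustrative addition, not part of the original module): go from
# evaluation frequency spectra to a transfer function estimate using
# QuickGather, mirroring the flow in quick_tf below. The arguments are assumed
# to come from a pipeline like the one sketched above.
def _example_solve(
    dir_path: Path,
    config: Configuration,
    dec_params: DecimationParameters,
    eval_data: SpectraData,
) -> Solution:
    from resistics.gather import QuickGather

    gathered_data = QuickGather().run(dir_path, dec_params, config.tf, eval_data)
    reg_data = run_regression_preparer(config, gathered_data)
    return run_solver(config, reg_data)

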
def quick_read(dir_path: Path, config: Optional[Configuration] = None) -> TimeData:
    """
    Read time data folder

    Parameters
    ----------
    dir_path : Path
        The directory path to read
    config : Optional[Configuration], optional
        Configuration with appropriate readers, by default None

    Returns
    -------
    TimeData
        The read TimeData

    Raises
    ------
    TimeDataReadError
        If unable to read data
    """
    logger.info(f"Reading data in {dir_path}")
    if config is None:
        config = get_default_configuration()
    for reader in config.time_readers:
        logger.info(f"Attempting to read data with reader {reader.name}")
        try:
            return reader.run(dir_path)
        except MetadataReadError:
            logger.debug(f"Unable to read metadata with reader {reader.name}")
        except TimeDataReadError:
            logger.debug(f"Failed reading time data with reader {reader.name}")
        except Exception:
            logger.debug("Unknown problem reading time data")
    reader_names = [reader.name for reader in config.time_readers]
    raise TimeDataReadError(
        dir_path, f"Unable to read time data with readers {reader_names}"
    )


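# Usage sketch (illustrative addition, not part of the original module): read a
# directory of time data with the default readers; the path is a hypothetical
# placeholder.
def _example_quick_read() -> TimeData:
    time_data = quick_read(Path("example_time_data"))
    logger.info(f"Sampling frequency {time_data.metadata.fs} Hz")
    return time_data

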
def quick_view(
    dir_path: Path,
    config: Optional[Configuration] = None,
    decimate: bool = False,
    max_pts: int = 10_000,
):
    """
    Quick plotting of time data

    Parameters
    ----------
    dir_path : Path
        The directory path
    config : Optional[Configuration], optional
        The configuration with the required time readers, by default None
    decimate : bool, optional
        Boolean flag for decimating, by default False
    max_pts : int, optional
        Max points in lttb decimation, by default 10_000

    Returns
    -------
    go.Figure
        Plotly figure

    Raises
    ------
    TimeDataReadError
        If the time data fails to read
    """
    logger.info(f"Plotting time data in {dir_path}")
    if config is None:
        config = get_default_configuration()
    time_data = quick_read(dir_path, config)
    time_data = run_time_processors(config, time_data)
    if not decimate:
        return time_data.plot(max_pts=max_pts)
    dec_params = config.dec_setup.run(time_data.metadata.fs)
    dec_data = run_decimation(config, time_data, dec_params=dec_params)
    return dec_data.plot(max_pts=max_pts)


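# Usage sketch (illustrative addition, not part of the original module): plot a
# directory of time data, decimating and limiting the number of points to keep
# the figure responsive; the path is a hypothetical placeholder.
def _example_quick_view() -> None:
    fig = quick_view(Path("example_time_data"), decimate=True, max_pts=5_000)
    fig.show()

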
def quick_spectra(
    dir_path: Path,
    config: Optional[Configuration] = None,
) -> SpectraData:
    """
    Quick calculation of spectra for time data in a directory

    Parameters
    ----------
    dir_path : Path
        The directory path
    config : Optional[Configuration], optional
        The configuration with the required time readers, by default None

    Returns
    -------
    SpectraData
        The spectra data

    Raises
    ------
    TimeDataReadError
        If the time data fails to read
    """
    logger.info(f"Getting spectra for time data in {dir_path}")
    if config is None:
        config = get_default_configuration()
    time_data = quick_read(dir_path, config)
    ref_time = time_data.metadata.first_time
    time_data = run_time_processors(config, time_data)
    dec_params = config.dec_setup.run(time_data.metadata.fs)
    dec_data = run_decimation(config, time_data, dec_params=dec_params)
    win_data = run_windowing(config, ref_time, dec_data)
    return run_fft(config, win_data)


def quick_tf(
    dir_path: Path,
    config: Optional[Configuration] = None,
    calibration_path: Optional[Path] = None,
) -> Solution:
    """
    Quickly calculate a transfer function for time data in a single directory

    Parameters
    ----------
    dir_path : Path
        The directory path
    config : Optional[Configuration], optional
        A configuration instance, by default None
    calibration_path : Optional[Path], optional
        The path to the calibration data, by default None

    Returns
    -------
    Solution
        Transfer function estimate
    """
    from resistics.gather import QuickGather

    logger.info(f"Processing data in {dir_path}")
    if config is None:
        config = get_default_configuration()
    time_data = quick_read(dir_path, config)
    ref_time = time_data.metadata.first_time
    time_data = run_time_processors(config, time_data)
    dec_params = config.dec_setup.run(time_data.metadata.fs)
    dec_data = run_decimation(config, time_data, dec_params=dec_params)
    win_data = run_windowing(config, ref_time, dec_data)
    spec_data = run_fft(config, win_data)
    spec_data = run_spectra_processors(config, spec_data)
    eval_data = run_evals(config, dec_params, spec_data)
    if calibration_path is not None:
        eval_data = run_sensor_calibration(config, calibration_path, eval_data)
    gathered_data = QuickGather().run(dir_path, dec_params, config.tf, eval_data)
    reg_data = run_regression_preparer(config, gathered_data)
    return run_solver(config, reg_data)


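# Usage sketch (illustrative addition, not part of the original module):
# estimate a transfer function for a single directory of time data. Both paths
# are hypothetical placeholders; omit calibration_path if no calibration data
# is available.
def _example_quick_tf() -> Solution:
    return quick_tf(
        Path("example_time_data"), calibration_path=Path("example_calibration")
    )

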
def process_time(
    resenv: ResisticsEnvironment,
    site_name: str,
    meas_name: str,
    out_site: str,
    out_meas: str,
    from_time: Optional[DateTimeLike] = None,
    to_time: Optional[DateTimeLike] = None,
) -> None:
    """
    Process time data and save as a new measurement

    This is useful when resampling data to use with other measurements

    Parameters
    ----------
    resenv : ResisticsEnvironment
        The resistics environment
    site_name : str
        The name of the site with the data to process
    meas_name : str
        The name of the measurement to process
    out_site : str
        The site to output the data to
    out_meas : str
        The name of the measurement to output the data to
    from_time : Optional[DateTimeLike], optional
        Time to get the time data from, by default None
    to_time : Optional[DateTimeLike], optional
        Time to get the time data up to, by default None
    """
    from resistics.time import TimeWriterNumpy
    from resistics.project import get_meas_time_path

    logger.info(f"Running time processors on meas {meas_name} from site {site_name}")
    meas = resenv.proj[site_name][meas_name]
    time_data = meas.reader.run(
        meas.dir_path, metadata=meas.metadata, from_time=from_time, to_time=to_time
    )
    time_data = run_time_processors(resenv.config, time_data)
    out_path = get_meas_time_path(resenv.proj.dir_path, out_site, out_meas)
    writer = TimeWriterNumpy()
    writer.run(out_path, time_data)


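# Usage sketch (illustrative addition, not part of the original module): run the
# configured time processors on a measurement and save the output as a new
# measurement in another site. Site and measurement names are hypothetical
# placeholders.
def _example_process_time(resenv: ResisticsEnvironment) -> None:
    process_time(
        resenv,
        site_name="site1",
        meas_name="meas1",
        out_site="site1_processed",
        out_meas="meas1_processed",
    )

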
def process_time_to_evals(
    resenv: ResisticsEnvironment, site_name: str, meas_name: str
) -> None:
    """
    Process from time data to spectra at the evaluation frequencies

    Parameters
    ----------
    resenv : ResisticsEnvironment
        The resistics environment containing the project and configuration
    site_name : str
        The name of the site
    meas_name : str
        The name of the measurement to process
    """
    from resistics.project import get_calibration_path, get_meas_evals_path
    from resistics.spectra import SpectraDataWriter

    proj = resenv.proj
    config = resenv.config
    calibration_path = get_calibration_path(proj.dir_path)
    logger.info(f"Processing measurement {site_name}, {meas_name}")
    meas = proj[site_name][meas_name]
    time_data = meas.reader.run(meas.dir_path, metadata=meas.metadata)
    time_data = run_time_processors(config, time_data)
    dec_params = config.dec_setup.run(time_data.metadata.fs)
    dec_data = run_decimation(config, time_data, dec_params=dec_params)
    win_data = run_windowing(config, proj.metadata.ref_time, dec_data)
    spec_data = run_fft(config, win_data)
    spec_data = run_spectra_processors(config, spec_data)
    eval_data = run_evals(config, dec_params, spec_data)
    eval_data = run_sensor_calibration(config, calibration_path, eval_data)
    evals_path = get_meas_evals_path(
        proj.dir_path, meas.site_name, meas.name, config.name
    )
    logger.info(f"Saving evaluation frequency data to {evals_path}")
    SpectraDataWriter().run(evals_path, eval_data)


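# Usage sketch (illustrative addition, not part of the original module):
# calculate evaluation frequency spectra for every measurement in a
# hypothetical site of a loaded project.
def _example_process_site(
    resenv: ResisticsEnvironment, site_name: str = "site1"
) -> None:
    site = resenv.proj[site_name]
    for meas_name in site.measurements:
        process_time_to_evals(resenv, site_name, meas_name)

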
def process_evals_to_tf(
    resenv: ResisticsEnvironment,
    fs: float,
    out_site: str,
    in_site: Optional[str] = None,
    cross_site: Optional[str] = None,
    masks: Optional[Dict[str, str]] = None,
    postfix: Optional[str] = None,
) -> Solution:
    """
    Process spectra at evaluation frequencies to a transfer function

    Parameters
    ----------
    resenv : ResisticsEnvironment
        The resistics environment
    fs : float
        The sampling frequency to process
    out_site : str
        The name of the output site
    in_site : Optional[str], optional
        The name of the input site, by default None. This should be used for
        intersite processing
    cross_site : Optional[str], optional
        The name of the cross site, by default None. This is usually the site
        to use as the remote reference.
    masks : Optional[Dict[str, str]], optional
        Any masks to apply, by default None
    postfix : Optional[str], optional
        String to add to the end of the solution name, by default None

    Returns
    -------
    Solution
        Transfer function estimate
    """
    from resistics.gather import Selector, ProjectGather
    from resistics.project import get_results_path, get_solution_name

    proj = resenv.proj
    config = resenv.config
    sites = [out_site]
    if in_site is not None:
        sites.append(in_site)
    if cross_site is not None:
        sites.append(cross_site)
    dec_params = config.dec_setup.run(fs)
    selection = Selector().run(config.name, proj, sites, dec_params)
    gathered_data = ProjectGather().run(
        config.name,
        proj,
        selection,
        config.tf,
        out_name=out_site,
        in_name=in_site,
        cross_name=cross_site,
    )
    reg_data = run_regression_preparer(config, gathered_data)
    solution = run_solver(config, reg_data)
    solution_path = get_results_path(proj.dir_path, out_site, config.name)
    if not solution_path.exists():
        solution_path.mkdir(parents=True)
    solution_name = get_solution_name(
        fs, solution.tf.name, solution.tf.variation, postfix
    )
    solution.write(solution_path / solution_name)
    return solution


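# Usage sketch (illustrative addition, not part of the original module): single
# site processing at a hypothetical 128 Hz, followed by a remote reference
# variant using a hypothetical cross site and a postfix to distinguish the
# output solution.
def _example_process_tf(resenv: ResisticsEnvironment) -> Solution:
    process_evals_to_tf(resenv, fs=128.0, out_site="site1")
    return process_evals_to_tf(
        resenv, fs=128.0, out_site="site1", cross_site="site2", postfix="remote_ref"
    )

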
def get_solution(
    resenv: ResisticsEnvironment,
    site_name: str,
    config_name: str,
    fs: float,
    tf_name: str,
    tf_var: str,
    postfix: Optional[str] = None,
) -> Solution:
    """
    Get a solution

    Parameters
    ----------
    resenv : ResisticsEnvironment
        The resistics environment
    site_name : str
        The site for which to get the solution
    config_name : str
        The configuration that was used
    fs : float
        The sampling frequency
    tf_name : str
        The transfer function name
    tf_var : str
        The transfer function variation
    postfix : Optional[str], optional
        Any postfix on the solution, by default None

    Returns
    -------
    Solution
        The solution
    """
    from resistics.project import get_results_path, get_solution_name

    proj = resenv.proj
    solution_path = get_results_path(proj.dir_path, site_name, config_name)
    solution_name = get_solution_name(fs, tf_name, tf_var, postfix)
    return Solution.parse_file(solution_path / solution_name)


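# Usage sketch (illustrative addition, not part of the original module): read
# back a previously saved solution. The transfer function name and variation
# values are hypothetical and depend on the transfer function that was
# processed and saved.
def _example_get_solution(resenv: ResisticsEnvironment) -> Solution:
    return get_solution(
        resenv,
        site_name="site1",
        config_name=resenv.config.name,
        fs=128.0,
        tf_name="ImpedanceTensor",
        tf_var="",
    )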