Module pyminflux.processor
Processor of MINFLUX data.
Functions
def combine_datasets_with_bead_alignment(reference_dataset: pyminflux.processor._dataset.MinFluxDataset,
moving_dataset: pyminflux.processor._dataset.MinFluxDataset,
bead_correspondence: Dict[str, str] = None,
transform_type: str = 'euclidean',
n_points: int = 3,
next_fluo_id: int | None = None) ‑> pyminflux.processor._dataset.MinFluxDataset | None-
Expand source code
def combine_datasets_with_bead_alignment(
    reference_dataset: MinFluxDataset,
    moving_dataset: MinFluxDataset,
    bead_correspondence: Optional[Dict[str, str]] = None,
    transform_type: str = 'euclidean',
    n_points: int = 3,
    next_fluo_id: Optional[int] = None,
) -> Optional[MinFluxDataset]:
    """
    Perform bead-based alignment and combine two datasets.

    The MBM (bead) data is extracted from the datasets themselves.

    Args:
        reference_dataset: Reference dataset (must contain mbm_data)
        moving_dataset: Moving dataset (must contain mbm_data)
        bead_correspondence: Mapping of reference bead names to moving bead names
        transform_type: Type of transformation ('euclidean', 'affine', etc.)
        n_points: Number of earliest points to use for bead position averaging
        next_fluo_id: Fluorophore ID to assign (auto-assigned if None)

    Returns:
        Combined dataset, or None if alignment fails
    """
    # Both datasets must carry MBM (bead) data for alignment to be possible.
    if reference_dataset.mbm_data is None:
        print("Error: Reference dataset does not contain MBM (bead) data")
        return None
    if moving_dataset.mbm_data is None:
        print("Error: Moving dataset does not contain MBM (bead) data")
        return None

    ref_mbm_dict = reference_dataset.mbm_data.get('mbm', {})
    mov_mbm_dict = moving_dataset.mbm_data.get('mbm', {})

    if not ref_mbm_dict:
        print("Error: Reference dataset MBM dictionary is empty")
        return None
    if not mov_mbm_dict:
        print("Error: Moving dataset MBM dictionary is empty")
        return None

    # Estimate the transform that maps the moving beads onto the reference beads.
    transform_model = align_datasets_using_beads(
        ref_mbm_dict,
        mov_mbm_dict,
        bead_correspondence=bead_correspondence,
        transform_type=transform_type,
        n_points=n_points,
    )

    if transform_model is None:
        return None

    # Apply the transform and merge the two datasets.
    return combine_datasets_with_alignment(
        reference_dataset,
        moving_dataset,
        transform_model,
        next_fluo_id=next_fluo_id,
    )
The MBM (bead) data is extracted from the datasets themselves.
Args
reference_dataset: Reference dataset (must contain mbm_data)
moving_dataset: Moving dataset (must contain mbm_data)
bead_correspondence: Mapping of reference bead names to moving bead names
transform_type: Type of transformation ('euclidean', 'affine', etc.)
n_points: Number of earliest points to use for bead position averaging
next_fluo_id: Fluorophore ID to assign (auto-assigned if None)
Returns
Combined dataset, or None if alignment fails
def get_bead_positions_from_mbm(mbm_dict: Dict, n_points: int = 3) ‑> Dict[str, numpy.ndarray]-
Expand source code
def get_bead_positions_from_mbm(mbm_dict: Dict, n_points: int = 3) -> Dict[str, np.ndarray]:
    """
    Extract average bead positions from an MBM dictionary.

    Args:
        mbm_dict: MBM dictionary from MinFluxReaderV2
        n_points: Number of earliest points to average

    Returns:
        Dictionary mapping bead names to positions [z, y, x] in nm
    """
    frame = mbm_dict_to_dataframe(mbm_dict)
    if frame.empty:
        return {}

    # Keep only beads flagged as "used" (element-wise pandas comparison).
    frame = frame[frame['used'] == True]  # noqa: E712
    if frame.empty:
        return {}

    # For every bead, average the n_points earliest localizations (by 'tim')
    # and report the mean position as [z, y, x].
    return {
        name: frame[frame['bead_name'] == name]
        .nsmallest(n_points, 'tim')[['z', 'y', 'x']]
        .mean(axis=0)
        .to_numpy()
        for name in frame['bead_name'].unique()
    }
Args
mbm_dict: MBM dictionary from MinFluxReaderV2
n_points: Number of earliest points to average
Returns
Dictionary mapping bead names to positions [z, y, x] in nm
def get_next_fluorophore_id(df: pandas.core.frame.DataFrame) ‑> int-
Expand source code
def get_next_fluorophore_id(df: pd.DataFrame) -> int:
    """
    Get the next available fluorophore ID.

    Args:
        df: DataFrame with 'fluo' column (the column may be absent or empty)

    Returns:
        Next available fluorophore ID (1 when no IDs are present)
    """
    if 'fluo' not in df.columns:
        return 1

    # Drop NaN entries first: a partially-filled column would otherwise
    # poison np.max and make int() raise.
    current_ids = df['fluo'].dropna().unique()
    if len(current_ids) == 0:
        return 1

    return int(np.max(current_ids)) + 1
Args
df: DataFrame with 'fluo' column
Returns
Next available fluorophore ID
def load_zarr_for_beads(zarr_path: str) ‑> Tuple[pyminflux.reader._reader_v2.MinFluxReaderV2 | None, Dict | None]-
Expand source code
def load_zarr_for_beads(zarr_path: str) -> Tuple[Optional[MinFluxReaderV2], Optional[Dict]]: """ Load a Zarr file and extract MBM (bead) data. Args: zarr_path: Path to the Zarr file Returns: Tuple of (reader, mbm_dict) or (None, None) if loading fails """ try: zarr_path = Path(zarr_path) if not zarr_path.exists(): print(f"Error: Zarr path does not exist: {zarr_path}") return None, None # Load the reader reader = MinFluxReaderV2(zarr_path) # Check if MBM data is available if not hasattr(reader, 'mbm_data') or reader.mbm_data is None: print(f"Error: No MBM (bead) data found in {zarr_path}") return None, None mbm_dict = reader.mbm_data.get('mbm', {}) if not mbm_dict: print(f"Error: MBM dictionary is empty in {zarr_path}") return None, None return reader, mbm_dict except Exception as e: print(f"Error loading Zarr file {zarr_path}: {e}") return None, NoneLoad a Zarr file and extract MBM (bead) data.
Args
zarr_path: Path to the Zarr file
Returns
Tuple of (reader, mbm_dict) or (None, None) if loading fails
Classes
class MinFluxDataset (processed_dataframe: pandas.core.frame.DataFrame,
full_raw_dataframe: pandas.core.frame.DataFrame | None = None,
valid_raw_data_array: numpy.ndarray | None = None,
filename: pathlib.Path | str | None = None,
is_3d: bool = False,
is_tracking: bool = False,
is_aggregated: bool = False,
z_scaling_factor: float = 1.0,
unit_scaling_factor: float = 1000000000.0,
dwell_time: float = 1.0,
pool_dcr: bool = False,
version: int = 2,
mbm_data: dict | None = None,
tid_offsets: List[Tuple[int, int]] | None = None)-
Expand source code
class MinFluxDataset:
    """Container for MINFLUX data that can be processed by MinFluxProcessor.

    Holds the processed dataframe together with its acquisition metadata.
    Instances are created either from a MinFluxReader (see ``from_reader``)
    or when combining multiple datasets.
    """

    # Fixed attribute set: keeps instances compact and catches typos early.
    __slots__ = [
        "_processed_dataframe",
        "_full_raw_dataframe",
        "_valid_raw_data_array",
        "_filename",
        "_is_3d",
        "_is_tracking",
        "_is_aggregated",
        "_z_scaling_factor",
        "_unit_scaling_factor",
        "_dwell_time",
        "_pool_dcr",
        "_version",
        "_mbm_data",
        "_tid_offsets",
    ]

    def __init__(
        self,
        processed_dataframe: pd.DataFrame,
        full_raw_dataframe: Optional[pd.DataFrame] = None,
        valid_raw_data_array: Optional[np.ndarray] = None,
        filename: Optional[Union[Path, str]] = None,
        is_3d: bool = False,
        is_tracking: bool = False,
        is_aggregated: bool = False,
        z_scaling_factor: float = 1.0,
        unit_scaling_factor: float = 1e9,
        dwell_time: float = 1.0,
        pool_dcr: bool = False,
        version: int = 2,
        mbm_data: Optional[dict] = None,
        tid_offsets: Optional[List[Tuple[int, int]]] = None,
    ):
        """Constructor.

        Parameters
        ----------
        processed_dataframe: pd.DataFrame
            The processed dataframe with MINFLUX data.
        full_raw_dataframe: Optional[pd.DataFrame]
            The full raw dataframe (for version 2 readers), or None.
        valid_raw_data_array: Optional[np.ndarray]
            The valid raw data array (for version 1 readers), or None.
        filename: Optional[Union[Path, str]]
            Original filename, if applicable.
        is_3d: bool
            Whether the acquisition is 3D.
        is_tracking: bool
            Whether the dataset is from a tracking experiment.
        is_aggregated: bool
            Whether the dataset contains aggregated measurements.
        z_scaling_factor: float
            Refractive index mismatch correction factor for z coordinates.
        unit_scaling_factor: float
            Unit scaling factor to convert raw coordinates to nm.
        dwell_time: float
            Dwell time in milliseconds.
        pool_dcr: bool
            Whether to pool DCR values.
        version: int
            Reader version (1 or 2).
        mbm_data: Optional[dict]
            Beamline monitoring (bead) data, if available.
        tid_offsets: Optional[List[Tuple[int, int]]]
            List of (first_iid, tid_offset) pairs applied when combining
            datasets.
        """
        self._processed_dataframe = processed_dataframe
        self._full_raw_dataframe = full_raw_dataframe
        self._valid_raw_data_array = valid_raw_data_array
        # An empty-string filename is treated the same as None.
        self._filename = Path(filename) if filename else None
        self._is_3d = is_3d
        self._is_tracking = is_tracking
        self._is_aggregated = is_aggregated
        self._z_scaling_factor = z_scaling_factor
        self._unit_scaling_factor = unit_scaling_factor
        self._dwell_time = dwell_time
        self._pool_dcr = pool_dcr
        self._version = version
        self._mbm_data = mbm_data
        self._tid_offsets = list(tid_offsets) if tid_offsets else []

    @classmethod
    def from_reader(cls, reader):
        """Create a MinFluxDataset from a MinFluxReader or MinFluxReaderV2.

        Parameters
        ----------
        reader: Union[MinFluxReader, MinFluxReaderV2]
            The reader instance to extract data from.

        Returns
        -------
        dataset: MinFluxDataset
            A new dataset instance.
        """
        # Version 2 readers expose a full raw dataframe and MBM data as
        # private attributes; absent on version 1 readers.
        full_raw_dataframe = getattr(reader, "_full_raw_dataframe", None)
        mbm_data = getattr(reader, "_mbm_data", None)

        valid_raw_data_array = None
        if hasattr(reader, "valid_raw_data_array"):
            try:
                valid_raw_data_array = reader.valid_raw_data_array
            except Exception:
                # Some readers expose the attribute but cannot serve it.
                valid_raw_data_array = None

        return cls(
            processed_dataframe=reader.processed_dataframe.copy(),
            full_raw_dataframe=full_raw_dataframe.copy()
            if full_raw_dataframe is not None
            else None,
            valid_raw_data_array=valid_raw_data_array.copy()
            if valid_raw_data_array is not None
            else None,
            filename=getattr(reader, "filename", None),
            is_3d=getattr(reader, "is_3d", False),
            is_tracking=getattr(reader, "is_tracking", False),
            is_aggregated=getattr(reader, "is_aggregated", False),
            z_scaling_factor=getattr(reader, "z_scaling_factor", 1.0),
            unit_scaling_factor=getattr(reader, "_unit_scaling_factor", 1e9),
            dwell_time=getattr(reader, "dwell_time", 1.0),
            pool_dcr=getattr(reader, "is_pool_dcr", False),
            version=getattr(reader, "version", 2),
            mbm_data=mbm_data,
            tid_offsets=getattr(reader, "tid_offsets", None),
        )

    @property
    def processed_dataframe(self) -> pd.DataFrame:
        """Return the processed dataframe."""
        return self._processed_dataframe

    @processed_dataframe.setter
    def processed_dataframe(self, value: pd.DataFrame):
        """Set the processed dataframe."""
        self._processed_dataframe = value

    @property
    def full_raw_dataframe(self) -> Optional[pd.DataFrame]:
        """Return the full raw dataframe (version 2 only)."""
        return self._full_raw_dataframe

    @full_raw_dataframe.setter
    def full_raw_dataframe(self, value: Optional[pd.DataFrame]):
        """Set the full raw dataframe."""
        self._full_raw_dataframe = value

    @property
    def filename(self) -> Optional[Path]:
        """Return the filename."""
        return self._filename

    @property
    def is_3d(self) -> bool:
        """Return True if the acquisition is 3D."""
        return self._is_3d

    @property
    def is_tracking(self) -> bool:
        """Return True if the dataset is from a tracking experiment."""
        return self._is_tracking

    @property
    def is_aggregated(self) -> bool:
        """Return True if the dataset contains aggregated measurements."""
        return self._is_aggregated

    @property
    def z_scaling_factor(self) -> float:
        """Return the z scaling factor."""
        return self._z_scaling_factor

    @property
    def unit_scaling_factor(self) -> float:
        """Return the unit scaling factor for raw coordinates."""
        return self._unit_scaling_factor

    @property
    def dwell_time(self) -> float:
        """Return the dwell time."""
        return self._dwell_time

    @property
    def is_pool_dcr(self) -> bool:
        """Return True if DCR values are pooled."""
        return self._pool_dcr

    @property
    def version(self) -> int:
        """Return the reader version."""
        return self._version

    @property
    def mbm_data(self) -> Optional[dict]:
        """Return the beamline monitoring data."""
        return self._mbm_data

    @property
    def tid_offsets(self) -> List[Tuple[int, int]]:
        """Return list of (first_iid, tid_offset) pairs applied when combining datasets."""
        # Return a copy so callers cannot mutate the internal list.
        return list(self._tid_offsets)

    @property
    def valid_full_raw_dataframe(self) -> Optional[pd.DataFrame]:
        """Return the valid raw dataframe (for compatibility with version 2 readers)."""
        if self._full_raw_dataframe is None:
            return None
        # Assume all entries in the dataset are valid
        return self._full_raw_dataframe.copy()

    @property
    def valid_raw_data_array(self):
        """Return the valid raw data array (for compatibility with version 1 readers).

        Note: Version 1 functionality is not fully supported in datasets.
        This property exists for backwards compatibility but will return
        None for datasets created from version 2 readers.
        """
        if self._valid_raw_data_array is None:
            return None
        return self._valid_raw_data_array.copy()

    def copy(self):
        """Create a deep copy of the dataset."""
        # NOTE(review): dataframes/arrays are copied, but mbm_data is shared
        # (shallow) between the copies — confirm this is intentional.
        return MinFluxDataset(
            processed_dataframe=self._processed_dataframe.copy(),
            full_raw_dataframe=self._full_raw_dataframe.copy()
            if self._full_raw_dataframe is not None
            else None,
            valid_raw_data_array=self._valid_raw_data_array.copy()
            if self._valid_raw_data_array is not None
            else None,
            filename=self._filename,
            is_3d=self._is_3d,
            is_tracking=self._is_tracking,
            is_aggregated=self._is_aggregated,
            z_scaling_factor=self._z_scaling_factor,
            unit_scaling_factor=self._unit_scaling_factor,
            dwell_time=self._dwell_time,
            pool_dcr=self._pool_dcr,
            version=self._version,
            mbm_data=self._mbm_data,
            tid_offsets=self._tid_offsets.copy(),
        )
This class serves as a data container that holds the processed dataframe and associated metadata. It can be created from a MinFluxReader or by combining multiple datasets.
Constructor.
Parameters
processed_dataframe:pd.DataFrame- The processed dataframe with MINFLUX data.
full_raw_dataframe:Optional[pd.DataFrame]- The full raw dataframe (for version 2 readers), or None.
valid_raw_data_array:Optional[np.ndarray]- The valid raw data array (for version 1 readers), or None.
filename:Optional[Union[Path, str]]- Original filename, if applicable.
is_3d:bool- Whether the acquisition is 3D.
is_tracking:bool- Whether the dataset is from a tracking experiment.
is_aggregated:bool- Whether the dataset contains aggregated measurements.
z_scaling_factor:float- Refractive index mismatch correction factor for z coordinates.
unit_scaling_factor:float- Unit scaling factor to convert raw coordinates to nm.
dwell_time:float- Dwell time in milliseconds.
pool_dcr:bool- Whether to pool DCR values.
version:int- Reader version (1 or 2).
mbm_data:Optional[dict]- Beamline monitoring (bead) data, if available.
tid_offsets:Optional[List[Tuple[int, int]]]- List of (first_iid, tid_offset) pairs applied when combining datasets.
Static methods
def from_reader(reader)-
Create a MinFluxDataset from a MinFluxReader or MinFluxReaderV2.
Parameters
reader:Union[MinFluxReader, MinFluxReaderV2]- The reader instance to extract data from.
Returns
dataset:MinFluxDataset- A new dataset instance.
Instance variables
prop dwell_time : float-
Expand source code
@property def dwell_time(self) -> float: """Return the dwell time.""" return self._dwell_timeReturn the dwell time.
prop filename : pathlib.Path | None-
Expand source code
@property def filename(self) -> Optional[Path]: """Return the filename.""" return self._filenameReturn the filename.
prop full_raw_dataframe : pandas.core.frame.DataFrame | None-
Expand source code
@property def full_raw_dataframe(self) -> Optional[pd.DataFrame]: """Return the full raw dataframe (version 2 only).""" return self._full_raw_dataframeReturn the full raw dataframe (version 2 only).
prop is_3d : bool-
Expand source code
@property def is_3d(self) -> bool: """Return True if the acquisition is 3D.""" return self._is_3dReturn True if the acquisition is 3D.
prop is_aggregated : bool-
Expand source code
@property def is_aggregated(self) -> bool: """Return True if the dataset contains aggregated measurements.""" return self._is_aggregatedReturn True if the dataset contains aggregated measurements.
prop is_pool_dcr : bool-
Expand source code
@property def is_pool_dcr(self) -> bool: """Return True if DCR values are pooled.""" return self._pool_dcrReturn True if DCR values are pooled.
prop is_tracking : bool-
Expand source code
@property def is_tracking(self) -> bool: """Return True if the dataset is from a tracking experiment.""" return self._is_trackingReturn True if the dataset is from a tracking experiment.
prop mbm_data : dict | None-
Expand source code
@property def mbm_data(self) -> Optional[dict]: """Return the beamline monitoring data.""" return self._mbm_dataReturn the beamline monitoring data.
prop processed_dataframe : pandas.core.frame.DataFrame-
Expand source code
@property def processed_dataframe(self) -> pd.DataFrame: """Return the processed dataframe.""" return self._processed_dataframeReturn the processed dataframe.
prop tid_offsets : List[Tuple[int, int]]-
Expand source code
@property def tid_offsets(self) -> List[Tuple[int, int]]: """Return list of (first_iid, tid_offset) pairs applied when combining datasets.""" return list(self._tid_offsets)Return list of (first_iid, tid_offset) pairs applied when combining datasets.
prop unit_scaling_factor : float-
Expand source code
@property def unit_scaling_factor(self) -> float: """Return the unit scaling factor for raw coordinates.""" return self._unit_scaling_factorReturn the unit scaling factor for raw coordinates.
prop valid_full_raw_dataframe : pandas.core.frame.DataFrame | None-
Expand source code
@property def valid_full_raw_dataframe(self) -> Optional[pd.DataFrame]: """Return the valid raw dataframe (for compatibility with version 2 readers).""" if self._full_raw_dataframe is None: return None # Assume all entries in the dataset are valid return self._full_raw_dataframe.copy()Return the valid raw dataframe (for compatibility with version 2 readers).
prop valid_raw_data_array-
Expand source code
@property def valid_raw_data_array(self): """Return the valid raw data array (for compatibility with version 1 readers). Note: Version 1 functionality is not fully supported in datasets. This property exists for backwards compatibility but will return None for datasets created from version 2 readers. """ if self._valid_raw_data_array is None: return None return self._valid_raw_data_array.copy()Return the valid raw data array (for compatibility with version 1 readers).
Note: Version 1 functionality is not fully supported in datasets. This property exists for backwards compatibility but will return None for datasets created from version 2 readers.
prop version : int-
Expand source code
@property def version(self) -> int: """Return the reader version.""" return self._versionReturn the reader version.
prop z_scaling_factor : float-
Expand source code
@property def z_scaling_factor(self) -> float: """Return the z scaling factor.""" return self._z_scaling_factorReturn the z scaling factor.
Methods
def copy(self)-
Expand source code
def copy(self): """Create a deep copy of the dataset.""" return MinFluxDataset( processed_dataframe=self._processed_dataframe.copy(), full_raw_dataframe=self._full_raw_dataframe.copy() if self._full_raw_dataframe is not None else None, valid_raw_data_array=self._valid_raw_data_array.copy() if self._valid_raw_data_array is not None else None, filename=self._filename, is_3d=self._is_3d, is_tracking=self._is_tracking, is_aggregated=self._is_aggregated, z_scaling_factor=self._z_scaling_factor, unit_scaling_factor=self._unit_scaling_factor, dwell_time=self._dwell_time, pool_dcr=self._pool_dcr, version=self._version, mbm_data=self._mbm_data, tid_offsets=self._tid_offsets.copy(), )Create a deep copy of the dataset.
class MinFluxProcessor (source: pyminflux.reader._reader.MinFluxReader | pyminflux.reader._reader_v2.MinFluxReaderV2 | pyminflux.processor._dataset.MinFluxDataset,
min_trace_length: int = 1)-
Expand source code
class MinFluxProcessor: """Processor of MINFLUX data.""" __doc__ = """Allows for filtering and selecting data read by the underlying `MinFluxReader`. Please notice that `MinFluxProcessor` makes use of `State.min_trace_length` to make sure that at load and after every filtering step, short traces are dropped.""" __slots__ = [ "state", "dataset", "_current_fluorophore_id", "_filtered_stats_dataframe", "_fluorophore_names", "_min_trace_length", "_selected_rows_dict", "_stats_to_be_recomputed", "_weighted_localizations", "_weighted_localizations_to_be_recomputed", "_use_weighted_localizations", "_has_dynamic_filters", ] def __init__( self, source: Union[MinFluxReader, MinFluxReaderV2, MinFluxDataset], min_trace_length: int = 1, ): """Constructor. Parameters ---------- source: Union[MinFluxReader, MinFluxReaderV2, MinFluxDataset] Either a MinFluxReader object or a MinFluxDataset. min_trace_length: int (Default = 1) Minimum number of localizations for a trace to be kept. Shorter traces are dropped. """ # Convert reader-like sources to dataset if necessary (duck-typed) if isinstance(source, MinFluxDataset): self.dataset = source elif isinstance(source, (MinFluxReader, MinFluxReaderV2)) or hasattr( source, "processed_dataframe" ): self.dataset = MinFluxDataset.from_reader(source) else: raise TypeError( f"source must be MinFluxReader, MinFluxReaderV2, MinFluxDataset, or reader-like; got {type(source)}" ) # Global options (to be applied after every operation) self._min_trace_length: int = min_trace_length # Cache the filtered stats dataframe self._filtered_stats_dataframe = None # Keep separate arrays of booleans to cache selection state for all # fluorophore IDs. 
self._selected_rows_dict = None self._init_selected_rows_dict() # Keep track of the selected fluorophore # 0 - All (default) # 1 - Fluorophore 1 # 2 - Fluorophore 2 self._current_fluorophore_id = 0 # Cache the weighted, averaged TID positions self._weighted_localizations = None # Keep track whether the statistics and the weighted localizations need to be recomputed self._stats_to_be_recomputed = False self._weighted_localizations_to_be_recomputed = False # Whether to use weighted average for localizations self._use_weighted_localizations = False # Track whether dynamic (user-applied) filters have been applied self._has_dynamic_filters = False # Initialize fluorophore names mapping (fluo_id -> name) self._fluorophore_names = {} self._init_fluorophore_names() # Apply the global filters self._apply_global_filters() def _init_selected_rows_dict(self): """Initialize the selected rows array.""" if self.processed_dataframe is None: return # Create selection mask keep_mask = pd.Series( data=np.ones(len(self.processed_dataframe.index), dtype=bool), index=self.processed_dataframe.index, ) # Store the mask for each fluorophore present in the data unique_fluos = np.unique(self.processed_dataframe["fluo"].to_numpy()) unique_fluos = unique_fluos[unique_fluos > 0] self._selected_rows_dict = {} for fluo_id in unique_fluos: self._selected_rows_dict[int(fluo_id)] = keep_mask.copy() def has_filters_applied(self) -> bool: """Check if any dynamic (user-applied) filters have been applied to the data. This does not count global filters like min_trace_length which are always applied. Returns ------- has_filters: bool True if any dynamic filters are active, False otherwise. 
""" return self._has_dynamic_filters def _init_fluorophore_names(self): """Initialize fluorophore names with default values (string representation of fluo ID).""" if self.processed_dataframe is None: return unique_fluos = np.unique(self.processed_dataframe["fluo"].to_numpy()) unique_fluos = unique_fluos[unique_fluos > 0] for fluo_id in unique_fluos: fluo_id_int = int(fluo_id) if fluo_id_int not in self._fluorophore_names: self._fluorophore_names[fluo_id_int] = str(fluo_id_int) @property def reader(self): """Return a reader-like interface to the dataset for backwards compatibility. This property provides access to the underlying dataset using the reader interface that existing code expects. """ return self.dataset @property def is_tracking(self): """Minimum number of localizations for the trace to be kept.""" return self.reader.is_tracking @property def min_trace_length(self): """Minimum number of localizations for the trace to be kept.""" return self._min_trace_length @property def z_scaling_factor(self): """Returns the scaling factor for the z coordinates from the underlying MinFluxReader.""" return self.reader.z_scaling_factor @min_trace_length.setter def min_trace_length(self, value): if value < 1 or int(value) != value: raise ValueError( "MinFluxProcessor.min_trace_length must be a positive integer!" ) self._min_trace_length = value # Run the global filters self._apply_global_filters() @property def is_3d(self) -> bool: """Return True if the acquisition is 3D. Returns ------- is_3d: bool True if the acquisition is 3D, False otherwise. """ return self.reader.is_3d @property def num_values(self) -> int: """Return the number of values in the (filtered) dataframe. Returns ------- n: int Number of values in the dataframe after all filters have been applied. 
""" if self.filtered_dataframe is not None: return len(self.filtered_dataframe.index) return 0 @property def current_fluorophore_id(self) -> int: """Return current fluorophore ID (0 for all).""" return self._current_fluorophore_id @current_fluorophore_id.setter def current_fluorophore_id(self, fluorophore_id: int) -> None: """Set current fluorophore ID (0 for all).""" # Validate fluorophore ID: 0 for all, or 1 to num_fluorophores if fluorophore_id < 0: raise ValueError(f"Fluorophore ID must be non-negative, got {fluorophore_id}.") if fluorophore_id > 0 and fluorophore_id > self.num_fluorophores: raise ValueError( f"Fluorophore ID {fluorophore_id} is out of range. " f"Valid range is 0 (all) or 1-{self.num_fluorophores}." ) # Set the new fluorophore_id self._current_fluorophore_id = fluorophore_id # Apply the global filters self._apply_global_filters() # Flag stats to be recomputed self._stats_to_be_recomputed = True @property def num_fluorophores(self) -> int: """Return the number of fluorophores.""" if self.processed_dataframe is None: return 0 unique_fluos = np.unique(self.processed_dataframe["fluo"].to_numpy()) return len(unique_fluos[unique_fluos > 0]) @property def fluorophore_names(self) -> dict: """Return the fluorophore names mapping (fluo_id -> name).""" return self._fluorophore_names.copy() def set_fluorophore_name(self, fluo_id: int, name: str): """Set the name for a specific fluorophore ID. Parameters ---------- fluo_id: int The fluorophore ID (must be >= 1) name: str The name to assign to this fluorophore """ if fluo_id < 1: raise ValueError(f"Fluorophore ID must be >= 1, got {fluo_id}") if not isinstance(name, str): raise ValueError(f"Fluorophore name must be a string, got {type(name)}") self._fluorophore_names[fluo_id] = name def set_fluorophore_names(self, names: dict): """Set fluorophore names from a dictionary mapping. 
This method updates the fluorophore names dictionary, preserving any existing names for fluorophore IDs not included in the names parameter. Parameters ---------- names: dict Dictionary mapping fluo_id (int) to name (str) """ if not isinstance(names, dict): raise ValueError(f"Names must be a dictionary, got {type(names)}") for fluo_id, name in names.items(): if not isinstance(fluo_id, int) or fluo_id < 1: raise ValueError(f"Fluorophore ID must be an integer >= 1, got {fluo_id}") if not isinstance(name, str): raise ValueError(f"Fluorophore name must be a string, got {type(name)}") # Update the dictionary instead of replacing it to preserve existing names self._fluorophore_names.update(names) def get_fluorophore_name(self, fluo_id: int) -> str: """Get the name for a specific fluorophore ID. Parameters ---------- fluo_id: int The fluorophore ID Returns ------- name: str The name of the fluorophore (defaults to string representation of ID if not set) """ return self._fluorophore_names.get(fluo_id, str(fluo_id)) def _filtered_raw_data_array_all_fluorophores(self): """Return the raw NumPy array with applied filters (for all fluorophores). This is only compatible with version 1 MinFluxReader (check performed by the invoked protected methods). """ # Copy the raw NumPy array raw_array = self.reader.valid_raw_data_array if raw_array is None: return None if self.processed_dataframe is None or self._selected_rows_dict is None: return None # Append the fluorophore ID data raw_array["fluo"] = self.processed_dataframe["fluo"].astype(np.uint8) # Extract combination of all fluorophore filtered dataframes combined_mask = np.zeros(len(self.processed_dataframe), dtype=bool) for fluo_id, fluo_mask in self._selected_rows_dict.items(): combined_mask |= (self.processed_dataframe["fluo"] == fluo_id) & fluo_mask return raw_array[combined_mask] @property def filtered_raw_data_array(self): """Return the raw NumPy array with applied filters for the selected fluorophores. 
This is only compatible with version 1 MinFluxReader (check performed by the invoked protected methods). """ # Copy the raw NumPy array full_array = self._filtered_raw_data_array_all_fluorophores() if full_array is None: return None if self.current_fluorophore_id == 0: return full_array else: # Filter by the current fluorophore ID return full_array[full_array["fluo"] == self.current_fluorophore_id] @property def processed_dataframe(self) -> Union[None, pd.DataFrame]: """Return the full dataframe (with valid entries only), with no selections or filters. Returns ------- processed_dataframe: Union[None, pd.DataFrame] A Pandas dataframe or None if no file was loaded. """ return self.dataset.processed_dataframe @property def filename(self) -> Union[Path, None]: """Return the filename if set.""" return self.dataset.filename def replace_dataset(self, new_dataset: MinFluxDataset): """Replace the current dataset with a new one. This is useful when combining datasets - the processor's dataset can be replaced with the combined dataset without losing filter state or other settings. Parameters ---------- new_dataset: MinFluxDataset The new dataset to use. 
""" # Store current settings old_min_trace_length = self._min_trace_length old_use_weighted = self._use_weighted_localizations old_fluorophore_names = self._fluorophore_names.copy() # Replace the dataset self.dataset = new_dataset # Re-initialize selection and filters self._selected_rows_dict = None self._init_selected_rows_dict() # Re-initialize fluorophore names (preserving custom names where IDs match) self._fluorophore_names = {} self._init_fluorophore_names() # Restore custom names for matching fluorophore IDs for fluo_id, name in old_fluorophore_names.items(): if fluo_id in self._fluorophore_names: self._fluorophore_names[fluo_id] = name # Restore settings self._min_trace_length = old_min_trace_length self._use_weighted_localizations = old_use_weighted # Reset current fluorophore to "all" self._current_fluorophore_id = 0 # Reset dynamic filters flag (dataset replacement clears all filters) self._has_dynamic_filters = False # Apply global filters self._apply_global_filters() # Flag derived data to be recomputed self._stats_to_be_recomputed = True self._weighted_localizations_to_be_recomputed = True def _filtered_dataframe_all_fluorophores(self) -> Union[None, pd.DataFrame]: """Return joint dataframe for all fluorophores and with all filters applied. Returns ------- filtered_dataframe_all: Union[None, pd.DataFrame] A Pandas dataframe or None if no file was loaded. """ if self.processed_dataframe is None or self._selected_rows_dict is None: return None # Extract combination of all fluorophore filtered dataframes combined_mask = np.zeros(len(self.processed_dataframe), dtype=bool) for fluo_id, fluo_mask in self._selected_rows_dict.items(): combined_mask |= (self.processed_dataframe["fluo"] == fluo_id) & fluo_mask return self.processed_dataframe.loc[combined_mask] def _filtered_raw_dataframe_all_fluorophores(self) -> Union[None, pd.DataFrame]: """Return joint raw dataframe (all properties) for all fluorophores and with all filters applied. 
Returns ------- filtered_dataframe_all: Union[None, pd.DataFrame] A Pandas dataframe or None if no file was loaded. """ # This is only compatible with MinFluxReader version 2 if self.reader.version != 2: raise ValueError("Only reader version 2 is supported.") # valid_full_raw_dataframe is a MinFluxReaderV2 property raw_array = self.reader.valid_full_raw_dataframe # Now extract the fluorophore assignments from self.processed_dataframe and # expand them onto the raw array if "fluo" in raw_array.columns: raw_fluo = raw_array["fluo"].fillna(0).astype(np.uint8).to_numpy(copy=True) else: raw_fluo = np.zeros(len(raw_array), dtype=np.uint8) if "iid" in raw_array.columns and "iid" in self.processed_dataframe.columns: fluo_map = dict( zip( self.processed_dataframe["iid"], self.processed_dataframe["fluo"].astype(np.uint8), ) ) mapped_fluo = raw_array["iid"].map(fluo_map) matched_mask = mapped_fluo.notna().to_numpy() if matched_mask.any(): raw_fluo[matched_mask] = mapped_fluo.loc[matched_mask].astype(np.uint8).to_numpy() raw_array["fluo"] = raw_fluo.astype(np.uint8) # Return the array with the assigned fluorophores return raw_array @property def filtered_dataframe(self) -> Union[None, pd.DataFrame]: """Return dataframe with all filters applied. Returns ------- filtered_dataframe: Union[None, pd.DataFrame] A Pandas dataframe or None if no file was loaded. """ if self.processed_dataframe is None: return None if self.current_fluorophore_id == 0: return self._filtered_dataframe_all_fluorophores() else: # Use .loc to filter the dataframe in a single step filtered_df = self.processed_dataframe.loc[ self.processed_dataframe["fluo"] == self.current_fluorophore_id ] if self._selected_rows_dict is None: return None selected_indices = self._selected_rows_dict.get( self.current_fluorophore_id, [] ) return filtered_df.loc[selected_indices] @property def filtered_raw_dataframe(self) -> Union[None, pd.DataFrame]: """Return joint dataframe with all filters applied. 
Returns ------- filtered_dataframe: Union[None, pd.DataFrame] A Pandas dataframe or None if no file was loaded. """ # This is only compatible with MinFluxReader version 2 if self.reader.version != 2: raise ValueError("Only reader version 2 is supported.") # Copy the raw dataframe array full_dataframe = self._filtered_raw_dataframe_all_fluorophores() if full_dataframe is None: return None # Get the IIDs from the currently filtered localizations iids = self.filtered_dataframe["iid"].to_numpy() # Extract all rows matching the current set of iids full_dataframe = full_dataframe[full_dataframe["iid"].isin(iids)] # If needed, filter by fluorophore ID if self.current_fluorophore_id == 0: return full_dataframe else: # Filter by the current fluorophore ID return full_dataframe[full_dataframe["fluo"] == self.current_fluorophore_id] @property def filtered_dataframe_stats(self) -> Union[None, pd.DataFrame]: """Return dataframe stats with all filters applied. Returns ------- filtered_dataframe_stats: Union[None, pd.DataFrame] A Pandas dataframe with all data statistics or None if no file was loaded. 
""" if self._stats_to_be_recomputed: self._calculate_statistics() return self._filtered_stats_dataframe @property def weighted_localizations(self) -> Union[None, pd.DataFrame]: """Return the average (x, y, z) position per TID weighted by the relative photon count.""" if self._weighted_localizations_to_be_recomputed: self._calculate_weighted_positions() return self._weighted_localizations @property def use_weighted_localizations(self) -> bool: """Whether to use weighted average to calculate the mean localization per TID.""" return self._use_weighted_localizations @use_weighted_localizations.setter def use_weighted_localizations(self, value: bool): """Whether to use weighted average to calculate the mean localization per TID.""" self._use_weighted_localizations = value self._weighted_localizations_to_be_recomputed = True @classmethod def processed_properties(cls): """Return the processed dataframe columns.""" return MinFluxReader.processed_properties() @classmethod def trace_stats_properties(cls): """Return the columns of the filtered_dataframe_stats.""" return [ "tid", "n", "fluo", "mx", "my", "mz", "sx", "sy", "sxy", "exy", "rms_xy", "sz", "ez", "mtim", "tim_tot", ] @classmethod def trace_stats_with_tracking_properties(cls): """Return the columns of the filtered_dataframe_stats with tracking columns.""" return MinFluxProcessor.trace_stats_properties() + [ "avg_speed", "total_dist", ] def reset(self): """Drops all dynamic filters and resets the data to the processed data frame with global filters.""" # Clear the selection per fluorophore; they will be reinitialized as # all selected at the first access. 
self._init_selected_rows_dict() # Reset the mapping to the corresponding fluorophore self.processed_dataframe["fluo"] = 1 # Reset fluorophore names (clear custom names first) self._fluorophore_names = {} self._init_fluorophore_names() # Default fluorophore is 0 (no selection) self.current_fluorophore_id = 0 # Reset dynamic filters flag self._has_dynamic_filters = False # Apply global filters self._apply_global_filters() def set_fluorophore_ids(self, fluorophore_ids: NDArray[np.uint8]): """Assign the fluorophore IDs to current filtered dataset. This method assigns new fluorophore IDs only to the rows in the current filtered dataset. All other rows remain unchanged. """ if self.filtered_dataframe is None: return if len(fluorophore_ids) != len(self.filtered_dataframe.index): raise ValueError( "The number of fluorophore IDs does not match the number of entries in the dataframe." ) # Get the actual indices from the filtered dataframe filtered_indices = self.filtered_dataframe.index # Assign the new fluorophore IDs to those specific indices only # All other rows remain unchanged (preserving other fluorophores) self.processed_dataframe.loc[filtered_indices, "fluo"] = fluorophore_ids.astype(np.uint8) # Apply global filters self._init_selected_rows_dict() self._init_fluorophore_names() self._apply_global_filters() def set_full_fluorophore_ids(self, fluorophore_ids: NDArray[np.uint8]): """Assign the fluorophore IDs to the original, full dataframe ignoring current filters.""" if self.processed_dataframe is None: return if len(fluorophore_ids) != len(self.processed_dataframe.index): raise ValueError( "The number of fluorophore IDs does not match the number of entries in the dataframe." 
) self.processed_dataframe["fluo"] = fluorophore_ids.astype(np.uint8) # Apply global filters self._init_selected_rows_dict() self._init_fluorophore_names() self._apply_global_filters() def select_by_rows( self, indices: np.ndarray, from_weighted_locs: bool = False ) -> Union[None, pd.DataFrame]: """Return view on a subset of the filtered dataset or the weighted localisations defined by the passed DataFrame row indices. The underlying dataframe is not modified. Parameters ---------- indices: np.ndarray Logical array for selecting the elements to be returned. from_weighted_locs: bool If True, select from the weighted_localizations dataframe; otherwise, from the filtered_dataframe. Returns ------- subset: Union[None, pd.DataFrame] A view on a subset of the dataframe defined by the passed indices, or None if no file was loaded. """ if from_weighted_locs: if self._weighted_localizations is None: return None return self._weighted_localizations.iloc[indices] else: if self.filtered_dataframe is None: return None return self.filtered_dataframe.iloc[indices] def select_by_series_iloc( self, iloc: np.ndarray, from_weighted_locs: bool = False ) -> Union[None, pd.DataFrame]: """Return view on a subset of the filtered dataset or the weighted localisations defined by the passed DataFrame index locations. The underlying dataframe is not modified. Parameters ---------- iloc: np.ndarray Array of Series index locations for selecting rows. from_weighted_locs: bool If True, select from the weighted_localizations dataframe; otherwise, from the filtered_dataframe. Returns ------- subset: Union[None, pd.DataFrame] A view on a subset of the dataframe defined by the passed indices, or None if no file was loaded. 
""" if from_weighted_locs: if self._weighted_localizations is None: return None return self._weighted_localizations.loc[iloc] else: if self.filtered_dataframe is None: return None return self.filtered_dataframe.loc[iloc] def select_by_1d_range( self, x_prop, x_range, from_weighted_locs: bool = False ) -> Union[None, pd.DataFrame]: """Return a view on a subset of the filtered dataset or the weighted localisations defined by the passed parameter and corresponding range. The underlying dataframe is not modified. Parameters ---------- x_prop: str Property to be filtered by corresponding x_range. x_range: tuple Tuple containing the minimum and maximum values for the selected property. from_weighted_locs: bool If True, select from the weighted_localizations dataframe; otherwise, from the filtered_dataframe. Returns ------- subset: Union[None, pd.DataFrame] A view on a subset of the dataframe defined by the passed property and range, or None if no file was loaded. """ # Make sure that the range is increasing x_min = x_range[0] x_max = x_range[1] if x_max < x_min: x_max, x_min = x_min, x_max if from_weighted_locs: if self._weighted_localizations is None: return None return self._weighted_localizations.loc[ (self._weighted_localizations[x_prop] >= x_min) & (self._weighted_localizations[x_prop] < x_max) ] else: # Work with currently selected rows if self.filtered_dataframe is None: return None df = self.filtered_dataframe return df.loc[(df[x_prop] >= x_min) & (df[x_prop] < x_max)] def select_by_2d_range( self, x_prop, y_prop, x_range, y_range, from_weighted_locs: bool = False ) -> Union[None, pd.DataFrame]: """Return a view on a subset of the filtered dataset or the weighted localisations defined by the passed parameters and corresponding ranges. The underlying dataframe is not modified. Parameters ---------- x_prop: str First property to be filtered by corresponding x_range. y_prop: str Second property to be filtered by corresponding y_range. 
x_range: tuple Tuple containing the minimum and maximum values for the first property. y_range: tuple Tuple containing the minimum and maximum values for the second property. from_weighted_locs: bool If True, select from the weighted_localizations dataframe; otherwise, from the filtered_dataframe. Returns ------- subset: Union[None, pd.DataFrame] A view on a subset of the dataframe defined by the passed properties and ranges, or None if no file was loaded. """ # Make sure that the ranges are increasing x_min = x_range[0] x_max = x_range[1] if x_max < x_min: x_max, x_min = x_min, x_max y_min = y_range[0] y_max = y_range[1] if y_max < y_min: y_max, y_min = y_min, y_max if from_weighted_locs: if self._weighted_localizations is None: return None return self._weighted_localizations.loc[ (self._weighted_localizations[x_prop] >= x_min) & (self._weighted_localizations[x_prop] < x_max) & (self._weighted_localizations[y_prop] >= y_min) & (self._weighted_localizations[y_prop] < y_max) ] else: # Work with currently selected rows if self.filtered_dataframe is None: return None df = self.filtered_dataframe return df.loc[ (df[x_prop] >= x_min) & (df[x_prop] < x_max) & (df[y_prop] >= y_min) & (df[y_prop] < y_max) ] def filter_by_2d_range(self, x_prop, y_prop, x_range, y_range): """Filter dataset by the extracting a rectangular ROI over two parameters and two ranges. Parameters ---------- x_prop: str First property to be filtered by corresponding x_range. y_prop: str Second property to be filtered by corresponding y_range. x_range: tuple Tuple containing the minimum and maximum values for the first property. y_range: tuple Tuple containing the minimum and maximum values for the second property. 
""" # Make sure that the ranges are increasing x_min = x_range[0] x_max = x_range[1] if x_max < x_min: x_max, x_min = x_min, x_max y_min = y_range[0] y_max = y_range[1] if y_max < y_min: y_max, y_min = y_min, y_max if self.current_fluorophore_id == 0: # Apply to all fluorophores for fluo_id in self._selected_rows_dict.keys(): self._selected_rows_dict[fluo_id] = ( self._selected_rows_dict[fluo_id] & (self.filtered_dataframe[x_prop] >= x_min) & (self.filtered_dataframe[x_prop] < x_max) & (self.filtered_dataframe[y_prop] >= y_min) & (self.filtered_dataframe[y_prop] < y_max) ) elif self.current_fluorophore_id in self._selected_rows_dict: # Apply to specific fluorophore fluo_id = self.current_fluorophore_id self._selected_rows_dict[fluo_id] = ( self._selected_rows_dict[fluo_id] & (self.filtered_dataframe[x_prop] >= x_min) & (self.filtered_dataframe[x_prop] < x_max) & (self.filtered_dataframe[y_prop] >= y_min) & (self.filtered_dataframe[y_prop] < y_max) ) # Make sure to always apply the global filters self._apply_global_filters() # Mark that dynamic filters have been applied self._has_dynamic_filters = True # Make sure to flag the derived data to be recomputed self._stats_to_be_recomputed = True self._weighted_localizations_to_be_recomputed = True def _apply_global_filters(self): """Apply filters that are defined in the global application configuration.""" if self.current_fluorophore_id == 0: # Apply to all fluorophores for fluo_id in self._selected_rows_dict.keys(): self._selected_rows_dict[fluo_id] = self._filter_by_tid_length(fluo_id) elif self.current_fluorophore_id in self._selected_rows_dict: # Apply to specific fluorophore self._selected_rows_dict[self.current_fluorophore_id] = self._filter_by_tid_length( self.current_fluorophore_id ) # Make sure to flag the derived data to be recomputed self._stats_to_be_recomputed = True self._weighted_localizations_to_be_recomputed = True def _filter_by_tid_length(self, index): # Make sure to count only currently selected rows 
df = self.processed_dataframe.copy() df.loc[np.invert(self._selected_rows_dict[index]), "tid"] = np.nan # Select all rows where the count of TIDs is larger than self._min_trace_num counts = df["tid"].value_counts(normalize=False) return df["tid"].isin(counts[counts >= self.min_trace_length].index) def filter_by_single_threshold( self, prop: str, threshold: Union[int, float], larger_than: bool = True ): """Apply single threshold to filter values either lower or higher (equal) than threshold for given property.""" # Apply filter if self.current_fluorophore_id == 0: # Apply to all fluorophores for fluo_id in self._selected_rows_dict.keys(): if larger_than: self._selected_rows_dict[fluo_id] = self._selected_rows_dict[fluo_id] & ( self.filtered_dataframe[prop] >= threshold ) else: self._selected_rows_dict[fluo_id] = self._selected_rows_dict[fluo_id] & ( self.filtered_dataframe[prop] < threshold ) elif self.current_fluorophore_id in self._selected_rows_dict: # Apply to specific fluorophore fluo_id = self.current_fluorophore_id if larger_than: self._selected_rows_dict[fluo_id] = self._selected_rows_dict[fluo_id] & ( self.filtered_dataframe[prop] >= threshold ) else: self._selected_rows_dict[fluo_id] = self._selected_rows_dict[fluo_id] & ( self.filtered_dataframe[prop] < threshold ) # Apply the global filters self._apply_global_filters() # Mark that dynamic filters have been applied self._has_dynamic_filters = True # Make sure to flag the derived data to be recomputed self._stats_to_be_recomputed = True self._weighted_localizations_to_be_recomputed = True def filter_by_1d_range(self, x_prop: str, x_range: tuple): """Apply min and max thresholding to the given property. Parameters ---------- x_prop: str Name of the property (dataframe column) to filter. x_range: tuple Tuple containing the minimum and maximum values for the selected property. 
""" # Make sure that the ranges are increasing x_min = x_range[0] x_max = x_range[1] if x_max < x_min: x_max, x_min = x_min, x_max # Apply filter if self.current_fluorophore_id == 0: # Apply to all fluorophores for fluo_id in self._selected_rows_dict.keys(): self._selected_rows_dict[fluo_id] = ( self._selected_rows_dict[fluo_id] & (self.filtered_dataframe[x_prop] >= x_min) & (self.filtered_dataframe[x_prop] < x_max) ) elif self.current_fluorophore_id in self._selected_rows_dict: # Apply to specific fluorophore fluo_id = self.current_fluorophore_id self._selected_rows_dict[fluo_id] = ( self._selected_rows_dict[fluo_id] & (self.filtered_dataframe[x_prop] >= x_min) & (self.filtered_dataframe[x_prop] < x_max) ) # Apply the global filters self._apply_global_filters() # Mark that dynamic filters have been applied self._has_dynamic_filters = True # Make sure to flag the derived data to be recomputed self._stats_to_be_recomputed = True self._weighted_localizations_to_be_recomputed = True def filter_by_1d_range_complement(self, prop: str, x_range: tuple): """Apply min and max thresholding to the given property but keep the data outside the range (i.e., crop the selected range). Parameters ---------- prop: str Name of the property (dataframe column) to filter. x_range: tuple Tuple containing the minimum and maximum values for cropping the selected property. 
""" # Make sure that the ranges are increasing x_min = x_range[0] x_max = x_range[1] if x_max < x_min: x_max, x_min = x_min, x_max # Apply filter if self.current_fluorophore_id == 0: # Apply to all fluorophores for fluo_id in self._selected_rows_dict.keys(): self._selected_rows_dict[fluo_id] = self._selected_rows_dict[fluo_id] & ( (self.filtered_dataframe[prop] < x_min) | (self.filtered_dataframe[prop] >= x_max) ) elif self.current_fluorophore_id in self._selected_rows_dict: # Apply to specific fluorophore fluo_id = self.current_fluorophore_id self._selected_rows_dict[fluo_id] = self._selected_rows_dict[fluo_id] & ( (self.filtered_dataframe[prop] < x_min) | (self.filtered_dataframe[prop] >= x_max) ) # Apply the global filters self._apply_global_filters() # Mark that dynamic filters have been applied self._has_dynamic_filters = True # Make sure to flag the derived data to be recomputed self._stats_to_be_recomputed = True self._weighted_localizations_to_be_recomputed = True def filter_by_1d_stats(self, x_prop_stats: str, x_range: tuple): """Filter TIDs by min and max thresholding using the given property from the stats dataframe. Parameters ---------- x_prop_stats: str Name of the property (column) from the stats dataframe used to filter. x_range: tuple Tuple containing the minimum and maximum values for the selected property. """ # Make sure the property exists in the stats dataframe if x_prop_stats not in self.filtered_dataframe_stats.columns: raise ValueError( f"The property {x_prop_stats} does not exist in `filtered_dataframe_stats`." 
) # Make sure that the ranges are increasing x_min = x_range[0] x_max = x_range[1] if x_max < x_min: x_max, x_min = x_min, x_max # Find all TIDs for current fluorophore ID by which the requested stats property is inside the range tids_to_keep = self.filtered_dataframe_stats[ ( (self.filtered_dataframe_stats[x_prop_stats] >= x_min) & (self.filtered_dataframe_stats[x_prop_stats] <= x_max) ) ]["tid"].to_numpy() # Rows of the filtered dataframe to keep rows_to_keep = self.filtered_dataframe["tid"].isin(tids_to_keep) # Apply filter if self.current_fluorophore_id == 0: # Apply to all fluorophores for fluo_id in self._selected_rows_dict.keys(): self._selected_rows_dict[fluo_id] = self._selected_rows_dict[fluo_id] & rows_to_keep elif self.current_fluorophore_id in self._selected_rows_dict: # Apply to specific fluorophore self._selected_rows_dict[self.current_fluorophore_id] = ( self._selected_rows_dict[self.current_fluorophore_id] & rows_to_keep ) # Apply the global filters self._apply_global_filters() # Mark that dynamic filters have been applied self._has_dynamic_filters = True # Make sure to flag the derived data to be recomputed self._stats_to_be_recomputed = True self._weighted_localizations_to_be_recomputed = True def _calculate_statistics(self): """Calculate per-trace statistics.""" # Make sure we have processed dataframe to work on if self.processed_dataframe is None: return # Only recompute statistics if needed if not self._stats_to_be_recomputed: return # Work with currently selected rows df = self.filtered_dataframe # Calculate the statistics df_tid = self.calculate_statistics_on(df, self.reader.is_tracking) # Store the results self._filtered_stats_dataframe = df_tid # Flag the statistics to be computed self._stats_to_be_recomputed = False @staticmethod def calculate_statistics_on( df: pd.DataFrame, is_tracking: bool = False ) -> pd.DataFrame: """Calculate per-trace statistics for the selected dataframe. 
Parameters ---------- df: pd.DataFrame DataFrame (view) generated by one of the `select_by_*` methods. is_tracking: bool Whether the data comes from a tracking instead of a localization experiment. Returns ------- df_stats: pd.DataFrame Per-trace statistics calculated on the passed DataFrame selection (view). """ # Prepare a dataframe with the statistics if is_tracking: df_tid = pd.DataFrame( columns=MinFluxProcessor.trace_stats_with_tracking_properties() ) else: df_tid = pd.DataFrame(columns=MinFluxProcessor.trace_stats_properties()) # Calculate some statistics per TID on the passed dataframe df_grouped = df.groupby("tid") # Base statistics tid = df_grouped["tid"].first().to_numpy() n = df_grouped["tid"].count().to_numpy() mx = df_grouped["x"].mean().to_numpy() my = df_grouped["y"].mean().to_numpy() mz = df_grouped["z"].mean().to_numpy() sx = df_grouped["x"].std().to_numpy() sy = df_grouped["y"].std().to_numpy() sz = df_grouped["z"].std().to_numpy() tmp = np.power(sx, 2) + np.power(sy, 2) sxy = np.sqrt(tmp) rms_xy = np.sqrt(tmp / 2) exy = sxy / np.sqrt(n) ez = sz / np.sqrt(n) fluo = df_grouped["fluo"].agg(lambda x: mode(x, keepdims=True)[0][0]).to_numpy() mtim = df_grouped["tim"].mean().to_numpy() tot_tim, _, _ = calculate_trace_time(df) # Optional tracking statistics if is_tracking: total_distance, _, _ = calculate_total_distance_traveled(df) speeds = ( total_distance["displacement"].to_numpy() / tot_tim["tim_tot"].to_numpy() ) # Store trace stats df_tid["tid"] = tid # Trace ID df_tid["n"] = n # Number of localizations for given trace ID df_tid["mx"] = mx # x mean localization df_tid["my"] = my # y mean localization df_tid["mz"] = mz # z mean localization df_tid["sx"] = sx # x localization precision df_tid["sy"] = sy # y localization precision df_tid["sxy"] = sxy # Lateral (x, y) localization precision df_tid["rms_xy"] = rms_xy # Lateral root mean square df_tid["exy"] = exy # Standard error of sxy df_tid["sz"] = sz # z localization precision df_tid["ez"] = ez # 
Standard error of ez df_tid["fluo"] = fluo # Assigned fluorophore ID df_tid["mtim"] = mtim # Average time per trace df_tid["tim_tot"] = tot_tim["tim_tot"].to_numpy() # Total time per trace if is_tracking: df_tid["avg_speed"] = speeds # Average speed per trace df_tid["total_dist"] = total_distance[ "displacement" ].to_numpy() # Total travelled distance per trace # ["sx", "sy", "sxy", "rms_xy", "exy", "sz", "ez"] columns will contain # np.nan if n == 1: we replace them with 0.0. # @TODO: should this be changed? It could be a global option. df_tid[["sx", "sy", "sxy", "rms_xy", "exy", "sz", "ez"]] = df_tid[ ["sx", "sy", "sxy", "rms_xy", "exy", "sz", "ez"] ].fillna(value=0.0) # Return the results return df_tid def _calculate_weighted_positions(self): """Calculate per-trace localization weighted by relative photon count.""" if self.filtered_dataframe is None: return if not self._weighted_localizations_to_be_recomputed: return # Work with a copy of a subset of current filtered dataframe df = self.filtered_dataframe[ ["tid", "tim", "eco", "x", "y", "z", "fluo"] ].copy() if self._use_weighted_localizations: # Calculate weights for each coordinate based on 'eco' total_eco_per_tid = df.groupby("tid")["eco"].transform("sum") eco_rel = df["eco"] / total_eco_per_tid # Calculate weighted positions df.loc[:, "x_rel"] = df["x"] * eco_rel df.loc[:, "y_rel"] = df["y"] * eco_rel df.loc[:, "z_rel"] = df["z"] * eco_rel # Summing up the relative contributions df_grouped = df.groupby("tid") x_w = df_grouped["x_rel"].sum() y_w = df_grouped["y_rel"].sum() z_w = df_grouped["z_rel"].sum() # Return the most frequent fluo ID: traces should be homogeneous with respect # to their fluorophore assignment, but we search anyway for safety. 
fluo_mode = df_grouped["fluo"].agg( lambda x: x.mode()[0] if not x.empty else np.nan ) else: # Calculate simple average of localizations by TID df_grouped = df.groupby("tid") x_w = df_grouped["x"].mean() y_w = df_grouped["y"].mean() z_w = df_grouped["z"].mean() # Return the most frequent fluo ID: traces should be homogeneous with respect # to their fluorophore assignment, but we search anyway for safety. fluo_mode = df_grouped["fluo"].agg( lambda x: x.mode()[0] if not x.empty else np.nan ) # We calculate also the mean timestamp (not weighted) tim = df_grouped["tim"].mean() # Prepare a dataframe with the weighted localizations df_loc = pd.DataFrame( { "tid": x_w.index, "tim": tim.to_numpy(), "x": x_w.to_numpy(), "y": y_w.to_numpy(), "z": z_w.to_numpy(), "fluo": fluo_mode.to_numpy(), } ) # Update the weighted localization dataframe self._weighted_localizations = df_loc # Flag the results as up-to-date self._weighted_localizations_to_be_recomputed = FalseAllows for filtering and selecting data read by the underlying
MinFluxReader. Please notice that `MinFluxProcessor` makes use of `State.min_trace_length` to make sure that at load and after every filtering step, short traces are dropped. Constructor.
Parameters
source: Union[MinFluxReader, MinFluxReaderV2, MinFluxDataset] — Either a MinFluxReader/MinFluxReaderV2 object or a MinFluxDataset.
min_trace_length: int (Default = 1) — Minimum number of localizations for a trace to be kept. Shorter traces are dropped.
Static methods
def calculate_statistics_on(df: pandas.core.frame.DataFrame, is_tracking: bool = False) ‑> pandas.core.frame.DataFrame-
Expand source code
@staticmethod def calculate_statistics_on( df: pd.DataFrame, is_tracking: bool = False ) -> pd.DataFrame: """Calculate per-trace statistics for the selected dataframe. Parameters ---------- df: pd.DataFrame DataFrame (view) generated by one of the `select_by_*` methods. is_tracking: bool Whether the data comes from a tracking instead of a localization experiment. Returns ------- df_stats: pd.DataFrame Per-trace statistics calculated on the passed DataFrame selection (view). """ # Prepare a dataframe with the statistics if is_tracking: df_tid = pd.DataFrame( columns=MinFluxProcessor.trace_stats_with_tracking_properties() ) else: df_tid = pd.DataFrame(columns=MinFluxProcessor.trace_stats_properties()) # Calculate some statistics per TID on the passed dataframe df_grouped = df.groupby("tid") # Base statistics tid = df_grouped["tid"].first().to_numpy() n = df_grouped["tid"].count().to_numpy() mx = df_grouped["x"].mean().to_numpy() my = df_grouped["y"].mean().to_numpy() mz = df_grouped["z"].mean().to_numpy() sx = df_grouped["x"].std().to_numpy() sy = df_grouped["y"].std().to_numpy() sz = df_grouped["z"].std().to_numpy() tmp = np.power(sx, 2) + np.power(sy, 2) sxy = np.sqrt(tmp) rms_xy = np.sqrt(tmp / 2) exy = sxy / np.sqrt(n) ez = sz / np.sqrt(n) fluo = df_grouped["fluo"].agg(lambda x: mode(x, keepdims=True)[0][0]).to_numpy() mtim = df_grouped["tim"].mean().to_numpy() tot_tim, _, _ = calculate_trace_time(df) # Optional tracking statistics if is_tracking: total_distance, _, _ = calculate_total_distance_traveled(df) speeds = ( total_distance["displacement"].to_numpy() / tot_tim["tim_tot"].to_numpy() ) # Store trace stats df_tid["tid"] = tid # Trace ID df_tid["n"] = n # Number of localizations for given trace ID df_tid["mx"] = mx # x mean localization df_tid["my"] = my # y mean localization df_tid["mz"] = mz # z mean localization df_tid["sx"] = sx # x localization precision df_tid["sy"] = sy # y localization precision df_tid["sxy"] = sxy # Lateral (x, y) localization 
precision df_tid["rms_xy"] = rms_xy # Lateral root mean square df_tid["exy"] = exy # Standard error of sxy df_tid["sz"] = sz # z localization precision df_tid["ez"] = ez # Standard error of ez df_tid["fluo"] = fluo # Assigned fluorophore ID df_tid["mtim"] = mtim # Average time per trace df_tid["tim_tot"] = tot_tim["tim_tot"].to_numpy() # Total time per trace if is_tracking: df_tid["avg_speed"] = speeds # Average speed per trace df_tid["total_dist"] = total_distance[ "displacement" ].to_numpy() # Total travelled distance per trace # ["sx", "sy", "sxy", "rms_xy", "exy", "sz", "ez"] columns will contain # np.nan if n == 1: we replace them with 0.0. # @TODO: should this be changed? It could be a global option. df_tid[["sx", "sy", "sxy", "rms_xy", "exy", "sz", "ez"]] = df_tid[ ["sx", "sy", "sxy", "rms_xy", "exy", "sz", "ez"] ].fillna(value=0.0) # Return the results return df_tidCalculate per-trace statistics for the selected dataframe.
Parameters
df: pd.DataFrame — DataFrame (view) generated by one of the
`select_by_*` methods. is_tracking: bool — Whether the data comes from a tracking instead of a localization experiment.
Returns
df_stats: pd.DataFrame — Per-trace statistics calculated on the passed DataFrame selection (view).
def processed_properties()-
Return the processed dataframe columns.
def trace_stats_properties()-
Return the columns of the filtered_dataframe_stats.
def trace_stats_with_tracking_properties()-
Return the columns of the filtered_dataframe_stats with tracking columns.
Instance variables
prop current_fluorophore_id : int-
Expand source code
@property def current_fluorophore_id(self) -> int: """Return current fluorophore ID (0 for all).""" return self._current_fluorophore_idReturn current fluorophore ID (0 for all).
var dataset-
Expand source code
class MinFluxProcessor: """Processor of MINFLUX data.""" __doc__ = """Allows for filtering and selecting data read by the underlying `MinFluxReader`. Please notice that `MinFluxProcessor` makes use of `State.min_trace_length` to make sure that at load and after every filtering step, short traces are dropped.""" __slots__ = [ "state", "dataset", "_current_fluorophore_id", "_filtered_stats_dataframe", "_fluorophore_names", "_min_trace_length", "_selected_rows_dict", "_stats_to_be_recomputed", "_weighted_localizations", "_weighted_localizations_to_be_recomputed", "_use_weighted_localizations", "_has_dynamic_filters", ] def __init__( self, source: Union[MinFluxReader, MinFluxReaderV2, MinFluxDataset], min_trace_length: int = 1, ): """Constructor. Parameters ---------- source: Union[MinFluxReader, MinFluxReaderV2, MinFluxDataset] Either a MinFluxReader object or a MinFluxDataset. min_trace_length: int (Default = 1) Minimum number of localizations for a trace to be kept. Shorter traces are dropped. """ # Convert reader-like sources to dataset if necessary (duck-typed) if isinstance(source, MinFluxDataset): self.dataset = source elif isinstance(source, (MinFluxReader, MinFluxReaderV2)) or hasattr( source, "processed_dataframe" ): self.dataset = MinFluxDataset.from_reader(source) else: raise TypeError( f"source must be MinFluxReader, MinFluxReaderV2, MinFluxDataset, or reader-like; got {type(source)}" ) # Global options (to be applied after every operation) self._min_trace_length: int = min_trace_length # Cache the filtered stats dataframe self._filtered_stats_dataframe = None # Keep separate arrays of booleans to cache selection state for all # fluorophore IDs. 
self._selected_rows_dict = None self._init_selected_rows_dict() # Keep track of the selected fluorophore # 0 - All (default) # 1 - Fluorophore 1 # 2 - Fluorophore 2 self._current_fluorophore_id = 0 # Cache the weighted, averaged TID positions self._weighted_localizations = None # Keep track whether the statistics and the weighted localizations need to be recomputed self._stats_to_be_recomputed = False self._weighted_localizations_to_be_recomputed = False # Whether to use weighted average for localizations self._use_weighted_localizations = False # Track whether dynamic (user-applied) filters have been applied self._has_dynamic_filters = False # Initialize fluorophore names mapping (fluo_id -> name) self._fluorophore_names = {} self._init_fluorophore_names() # Apply the global filters self._apply_global_filters() def _init_selected_rows_dict(self): """Initialize the selected rows array.""" if self.processed_dataframe is None: return # Create selection mask keep_mask = pd.Series( data=np.ones(len(self.processed_dataframe.index), dtype=bool), index=self.processed_dataframe.index, ) # Store the mask for each fluorophore present in the data unique_fluos = np.unique(self.processed_dataframe["fluo"].to_numpy()) unique_fluos = unique_fluos[unique_fluos > 0] self._selected_rows_dict = {} for fluo_id in unique_fluos: self._selected_rows_dict[int(fluo_id)] = keep_mask.copy() def has_filters_applied(self) -> bool: """Check if any dynamic (user-applied) filters have been applied to the data. This does not count global filters like min_trace_length which are always applied. Returns ------- has_filters: bool True if any dynamic filters are active, False otherwise. 
""" return self._has_dynamic_filters def _init_fluorophore_names(self): """Initialize fluorophore names with default values (string representation of fluo ID).""" if self.processed_dataframe is None: return unique_fluos = np.unique(self.processed_dataframe["fluo"].to_numpy()) unique_fluos = unique_fluos[unique_fluos > 0] for fluo_id in unique_fluos: fluo_id_int = int(fluo_id) if fluo_id_int not in self._fluorophore_names: self._fluorophore_names[fluo_id_int] = str(fluo_id_int) @property def reader(self): """Return a reader-like interface to the dataset for backwards compatibility. This property provides access to the underlying dataset using the reader interface that existing code expects. """ return self.dataset @property def is_tracking(self): """Minimum number of localizations for the trace to be kept.""" return self.reader.is_tracking @property def min_trace_length(self): """Minimum number of localizations for the trace to be kept.""" return self._min_trace_length @property def z_scaling_factor(self): """Returns the scaling factor for the z coordinates from the underlying MinFluxReader.""" return self.reader.z_scaling_factor @min_trace_length.setter def min_trace_length(self, value): if value < 1 or int(value) != value: raise ValueError( "MinFluxProcessor.min_trace_length must be a positive integer!" ) self._min_trace_length = value # Run the global filters self._apply_global_filters() @property def is_3d(self) -> bool: """Return True if the acquisition is 3D. Returns ------- is_3d: bool True if the acquisition is 3D, False otherwise. """ return self.reader.is_3d @property def num_values(self) -> int: """Return the number of values in the (filtered) dataframe. Returns ------- n: int Number of values in the dataframe after all filters have been applied. 
""" if self.filtered_dataframe is not None: return len(self.filtered_dataframe.index) return 0 @property def current_fluorophore_id(self) -> int: """Return current fluorophore ID (0 for all).""" return self._current_fluorophore_id @current_fluorophore_id.setter def current_fluorophore_id(self, fluorophore_id: int) -> None: """Set current fluorophore ID (0 for all).""" # Validate fluorophore ID: 0 for all, or 1 to num_fluorophores if fluorophore_id < 0: raise ValueError(f"Fluorophore ID must be non-negative, got {fluorophore_id}.") if fluorophore_id > 0 and fluorophore_id > self.num_fluorophores: raise ValueError( f"Fluorophore ID {fluorophore_id} is out of range. " f"Valid range is 0 (all) or 1-{self.num_fluorophores}." ) # Set the new fluorophore_id self._current_fluorophore_id = fluorophore_id # Apply the global filters self._apply_global_filters() # Flag stats to be recomputed self._stats_to_be_recomputed = True @property def num_fluorophores(self) -> int: """Return the number of fluorophores.""" if self.processed_dataframe is None: return 0 unique_fluos = np.unique(self.processed_dataframe["fluo"].to_numpy()) return len(unique_fluos[unique_fluos > 0]) @property def fluorophore_names(self) -> dict: """Return the fluorophore names mapping (fluo_id -> name).""" return self._fluorophore_names.copy() def set_fluorophore_name(self, fluo_id: int, name: str): """Set the name for a specific fluorophore ID. Parameters ---------- fluo_id: int The fluorophore ID (must be >= 1) name: str The name to assign to this fluorophore """ if fluo_id < 1: raise ValueError(f"Fluorophore ID must be >= 1, got {fluo_id}") if not isinstance(name, str): raise ValueError(f"Fluorophore name must be a string, got {type(name)}") self._fluorophore_names[fluo_id] = name def set_fluorophore_names(self, names: dict): """Set fluorophore names from a dictionary mapping. 
This method updates the fluorophore names dictionary, preserving any existing names for fluorophore IDs not included in the names parameter. Parameters ---------- names: dict Dictionary mapping fluo_id (int) to name (str) """ if not isinstance(names, dict): raise ValueError(f"Names must be a dictionary, got {type(names)}") for fluo_id, name in names.items(): if not isinstance(fluo_id, int) or fluo_id < 1: raise ValueError(f"Fluorophore ID must be an integer >= 1, got {fluo_id}") if not isinstance(name, str): raise ValueError(f"Fluorophore name must be a string, got {type(name)}") # Update the dictionary instead of replacing it to preserve existing names self._fluorophore_names.update(names) def get_fluorophore_name(self, fluo_id: int) -> str: """Get the name for a specific fluorophore ID. Parameters ---------- fluo_id: int The fluorophore ID Returns ------- name: str The name of the fluorophore (defaults to string representation of ID if not set) """ return self._fluorophore_names.get(fluo_id, str(fluo_id)) def _filtered_raw_data_array_all_fluorophores(self): """Return the raw NumPy array with applied filters (for all fluorophores). This is only compatible with version 1 MinFluxReader (check performed by the invoked protected methods). """ # Copy the raw NumPy array raw_array = self.reader.valid_raw_data_array if raw_array is None: return None if self.processed_dataframe is None or self._selected_rows_dict is None: return None # Append the fluorophore ID data raw_array["fluo"] = self.processed_dataframe["fluo"].astype(np.uint8) # Extract combination of all fluorophore filtered dataframes combined_mask = np.zeros(len(self.processed_dataframe), dtype=bool) for fluo_id, fluo_mask in self._selected_rows_dict.items(): combined_mask |= (self.processed_dataframe["fluo"] == fluo_id) & fluo_mask return raw_array[combined_mask] @property def filtered_raw_data_array(self): """Return the raw NumPy array with applied filters for the selected fluorophores. 
This is only compatible with version 1 MinFluxReader (check performed by the invoked protected methods). """ # Copy the raw NumPy array full_array = self._filtered_raw_data_array_all_fluorophores() if full_array is None: return None if self.current_fluorophore_id == 0: return full_array else: # Filter by the current fluorophore ID return full_array[full_array["fluo"] == self.current_fluorophore_id] @property def processed_dataframe(self) -> Union[None, pd.DataFrame]: """Return the full dataframe (with valid entries only), with no selections or filters. Returns ------- processed_dataframe: Union[None, pd.DataFrame] A Pandas dataframe or None if no file was loaded. """ return self.dataset.processed_dataframe @property def filename(self) -> Union[Path, None]: """Return the filename if set.""" return self.dataset.filename def replace_dataset(self, new_dataset: MinFluxDataset): """Replace the current dataset with a new one. This is useful when combining datasets - the processor's dataset can be replaced with the combined dataset without losing filter state or other settings. Parameters ---------- new_dataset: MinFluxDataset The new dataset to use. 
""" # Store current settings old_min_trace_length = self._min_trace_length old_use_weighted = self._use_weighted_localizations old_fluorophore_names = self._fluorophore_names.copy() # Replace the dataset self.dataset = new_dataset # Re-initialize selection and filters self._selected_rows_dict = None self._init_selected_rows_dict() # Re-initialize fluorophore names (preserving custom names where IDs match) self._fluorophore_names = {} self._init_fluorophore_names() # Restore custom names for matching fluorophore IDs for fluo_id, name in old_fluorophore_names.items(): if fluo_id in self._fluorophore_names: self._fluorophore_names[fluo_id] = name # Restore settings self._min_trace_length = old_min_trace_length self._use_weighted_localizations = old_use_weighted # Reset current fluorophore to "all" self._current_fluorophore_id = 0 # Reset dynamic filters flag (dataset replacement clears all filters) self._has_dynamic_filters = False # Apply global filters self._apply_global_filters() # Flag derived data to be recomputed self._stats_to_be_recomputed = True self._weighted_localizations_to_be_recomputed = True def _filtered_dataframe_all_fluorophores(self) -> Union[None, pd.DataFrame]: """Return joint dataframe for all fluorophores and with all filters applied. Returns ------- filtered_dataframe_all: Union[None, pd.DataFrame] A Pandas dataframe or None if no file was loaded. """ if self.processed_dataframe is None or self._selected_rows_dict is None: return None # Extract combination of all fluorophore filtered dataframes combined_mask = np.zeros(len(self.processed_dataframe), dtype=bool) for fluo_id, fluo_mask in self._selected_rows_dict.items(): combined_mask |= (self.processed_dataframe["fluo"] == fluo_id) & fluo_mask return self.processed_dataframe.loc[combined_mask] def _filtered_raw_dataframe_all_fluorophores(self) -> Union[None, pd.DataFrame]: """Return joint raw dataframe (all properties) for all fluorophores and with all filters applied. 
Returns ------- filtered_dataframe_all: Union[None, pd.DataFrame] A Pandas dataframe or None if no file was loaded. """ # This is only compatible with MinFluxReader version 2 if self.reader.version != 2: raise ValueError("Only reader version 2 is supported.") # valid_full_raw_dataframe is a MinFluxReaderV2 property raw_array = self.reader.valid_full_raw_dataframe # Now extract the fluorophore assignments from self.processed_dataframe and # expand them onto the raw array if "fluo" in raw_array.columns: raw_fluo = raw_array["fluo"].fillna(0).astype(np.uint8).to_numpy(copy=True) else: raw_fluo = np.zeros(len(raw_array), dtype=np.uint8) if "iid" in raw_array.columns and "iid" in self.processed_dataframe.columns: fluo_map = dict( zip( self.processed_dataframe["iid"], self.processed_dataframe["fluo"].astype(np.uint8), ) ) mapped_fluo = raw_array["iid"].map(fluo_map) matched_mask = mapped_fluo.notna().to_numpy() if matched_mask.any(): raw_fluo[matched_mask] = mapped_fluo.loc[matched_mask].astype(np.uint8).to_numpy() raw_array["fluo"] = raw_fluo.astype(np.uint8) # Return the array with the assigned fluorophores return raw_array @property def filtered_dataframe(self) -> Union[None, pd.DataFrame]: """Return dataframe with all filters applied. Returns ------- filtered_dataframe: Union[None, pd.DataFrame] A Pandas dataframe or None if no file was loaded. """ if self.processed_dataframe is None: return None if self.current_fluorophore_id == 0: return self._filtered_dataframe_all_fluorophores() else: # Use .loc to filter the dataframe in a single step filtered_df = self.processed_dataframe.loc[ self.processed_dataframe["fluo"] == self.current_fluorophore_id ] if self._selected_rows_dict is None: return None selected_indices = self._selected_rows_dict.get( self.current_fluorophore_id, [] ) return filtered_df.loc[selected_indices] @property def filtered_raw_dataframe(self) -> Union[None, pd.DataFrame]: """Return joint dataframe with all filters applied. 
Returns ------- filtered_dataframe: Union[None, pd.DataFrame] A Pandas dataframe or None if no file was loaded. """ # This is only compatible with MinFluxReader version 2 if self.reader.version != 2: raise ValueError("Only reader version 2 is supported.") # Copy the raw dataframe array full_dataframe = self._filtered_raw_dataframe_all_fluorophores() if full_dataframe is None: return None # Get the IIDs from the currently filtered localizations iids = self.filtered_dataframe["iid"].to_numpy() # Extract all rows matching the current set of iids full_dataframe = full_dataframe[full_dataframe["iid"].isin(iids)] # If needed, filter by fluorophore ID if self.current_fluorophore_id == 0: return full_dataframe else: # Filter by the current fluorophore ID return full_dataframe[full_dataframe["fluo"] == self.current_fluorophore_id] @property def filtered_dataframe_stats(self) -> Union[None, pd.DataFrame]: """Return dataframe stats with all filters applied. Returns ------- filtered_dataframe_stats: Union[None, pd.DataFrame] A Pandas dataframe with all data statistics or None if no file was loaded. 
""" if self._stats_to_be_recomputed: self._calculate_statistics() return self._filtered_stats_dataframe @property def weighted_localizations(self) -> Union[None, pd.DataFrame]: """Return the average (x, y, z) position per TID weighted by the relative photon count.""" if self._weighted_localizations_to_be_recomputed: self._calculate_weighted_positions() return self._weighted_localizations @property def use_weighted_localizations(self) -> bool: """Whether to use weighted average to calculate the mean localization per TID.""" return self._use_weighted_localizations @use_weighted_localizations.setter def use_weighted_localizations(self, value: bool): """Whether to use weighted average to calculate the mean localization per TID.""" self._use_weighted_localizations = value self._weighted_localizations_to_be_recomputed = True @classmethod def processed_properties(cls): """Return the processed dataframe columns.""" return MinFluxReader.processed_properties() @classmethod def trace_stats_properties(cls): """Return the columns of the filtered_dataframe_stats.""" return [ "tid", "n", "fluo", "mx", "my", "mz", "sx", "sy", "sxy", "exy", "rms_xy", "sz", "ez", "mtim", "tim_tot", ] @classmethod def trace_stats_with_tracking_properties(cls): """Return the columns of the filtered_dataframe_stats with tracking columns.""" return MinFluxProcessor.trace_stats_properties() + [ "avg_speed", "total_dist", ] def reset(self): """Drops all dynamic filters and resets the data to the processed data frame with global filters.""" # Clear the selection per fluorophore; they will be reinitialized as # all selected at the first access. 
self._init_selected_rows_dict() # Reset the mapping to the corresponding fluorophore self.processed_dataframe["fluo"] = 1 # Reset fluorophore names (clear custom names first) self._fluorophore_names = {} self._init_fluorophore_names() # Default fluorophore is 0 (no selection) self.current_fluorophore_id = 0 # Reset dynamic filters flag self._has_dynamic_filters = False # Apply global filters self._apply_global_filters() def set_fluorophore_ids(self, fluorophore_ids: NDArray[np.uint8]): """Assign the fluorophore IDs to current filtered dataset. This method assigns new fluorophore IDs only to the rows in the current filtered dataset. All other rows remain unchanged. """ if self.filtered_dataframe is None: return if len(fluorophore_ids) != len(self.filtered_dataframe.index): raise ValueError( "The number of fluorophore IDs does not match the number of entries in the dataframe." ) # Get the actual indices from the filtered dataframe filtered_indices = self.filtered_dataframe.index # Assign the new fluorophore IDs to those specific indices only # All other rows remain unchanged (preserving other fluorophores) self.processed_dataframe.loc[filtered_indices, "fluo"] = fluorophore_ids.astype(np.uint8) # Apply global filters self._init_selected_rows_dict() self._init_fluorophore_names() self._apply_global_filters() def set_full_fluorophore_ids(self, fluorophore_ids: NDArray[np.uint8]): """Assign the fluorophore IDs to the original, full dataframe ignoring current filters.""" if self.processed_dataframe is None: return if len(fluorophore_ids) != len(self.processed_dataframe.index): raise ValueError( "The number of fluorophore IDs does not match the number of entries in the dataframe." 
) self.processed_dataframe["fluo"] = fluorophore_ids.astype(np.uint8) # Apply global filters self._init_selected_rows_dict() self._init_fluorophore_names() self._apply_global_filters() def select_by_rows( self, indices: np.ndarray, from_weighted_locs: bool = False ) -> Union[None, pd.DataFrame]: """Return view on a subset of the filtered dataset or the weighted localisations defined by the passed DataFrame row indices. The underlying dataframe is not modified. Parameters ---------- indices: np.ndarray Logical array for selecting the elements to be returned. from_weighted_locs: bool If True, select from the weighted_localizations dataframe; otherwise, from the filtered_dataframe. Returns ------- subset: Union[None, pd.DataFrame] A view on a subset of the dataframe defined by the passed indices, or None if no file was loaded. """ if from_weighted_locs: if self._weighted_localizations is None: return None return self._weighted_localizations.iloc[indices] else: if self.filtered_dataframe is None: return None return self.filtered_dataframe.iloc[indices] def select_by_series_iloc( self, iloc: np.ndarray, from_weighted_locs: bool = False ) -> Union[None, pd.DataFrame]: """Return view on a subset of the filtered dataset or the weighted localisations defined by the passed DataFrame index locations. The underlying dataframe is not modified. Parameters ---------- iloc: np.ndarray Array of Series index locations for selecting rows. from_weighted_locs: bool If True, select from the weighted_localizations dataframe; otherwise, from the filtered_dataframe. Returns ------- subset: Union[None, pd.DataFrame] A view on a subset of the dataframe defined by the passed indices, or None if no file was loaded. 
""" if from_weighted_locs: if self._weighted_localizations is None: return None return self._weighted_localizations.loc[iloc] else: if self.filtered_dataframe is None: return None return self.filtered_dataframe.loc[iloc] def select_by_1d_range( self, x_prop, x_range, from_weighted_locs: bool = False ) -> Union[None, pd.DataFrame]: """Return a view on a subset of the filtered dataset or the weighted localisations defined by the passed parameter and corresponding range. The underlying dataframe is not modified. Parameters ---------- x_prop: str Property to be filtered by corresponding x_range. x_range: tuple Tuple containing the minimum and maximum values for the selected property. from_weighted_locs: bool If True, select from the weighted_localizations dataframe; otherwise, from the filtered_dataframe. Returns ------- subset: Union[None, pd.DataFrame] A view on a subset of the dataframe defined by the passed property and range, or None if no file was loaded. """ # Make sure that the range is increasing x_min = x_range[0] x_max = x_range[1] if x_max < x_min: x_max, x_min = x_min, x_max if from_weighted_locs: if self._weighted_localizations is None: return None return self._weighted_localizations.loc[ (self._weighted_localizations[x_prop] >= x_min) & (self._weighted_localizations[x_prop] < x_max) ] else: # Work with currently selected rows if self.filtered_dataframe is None: return None df = self.filtered_dataframe return df.loc[(df[x_prop] >= x_min) & (df[x_prop] < x_max)] def select_by_2d_range( self, x_prop, y_prop, x_range, y_range, from_weighted_locs: bool = False ) -> Union[None, pd.DataFrame]: """Return a view on a subset of the filtered dataset or the weighted localisations defined by the passed parameters and corresponding ranges. The underlying dataframe is not modified. Parameters ---------- x_prop: str First property to be filtered by corresponding x_range. y_prop: str Second property to be filtered by corresponding y_range. 
x_range: tuple Tuple containing the minimum and maximum values for the first property. y_range: tuple Tuple containing the minimum and maximum values for the second property. from_weighted_locs: bool If True, select from the weighted_localizations dataframe; otherwise, from the filtered_dataframe. Returns ------- subset: Union[None, pd.DataFrame] A view on a subset of the dataframe defined by the passed properties and ranges, or None if no file was loaded. """ # Make sure that the ranges are increasing x_min = x_range[0] x_max = x_range[1] if x_max < x_min: x_max, x_min = x_min, x_max y_min = y_range[0] y_max = y_range[1] if y_max < y_min: y_max, y_min = y_min, y_max if from_weighted_locs: if self._weighted_localizations is None: return None return self._weighted_localizations.loc[ (self._weighted_localizations[x_prop] >= x_min) & (self._weighted_localizations[x_prop] < x_max) & (self._weighted_localizations[y_prop] >= y_min) & (self._weighted_localizations[y_prop] < y_max) ] else: # Work with currently selected rows if self.filtered_dataframe is None: return None df = self.filtered_dataframe return df.loc[ (df[x_prop] >= x_min) & (df[x_prop] < x_max) & (df[y_prop] >= y_min) & (df[y_prop] < y_max) ] def filter_by_2d_range(self, x_prop, y_prop, x_range, y_range): """Filter dataset by the extracting a rectangular ROI over two parameters and two ranges. Parameters ---------- x_prop: str First property to be filtered by corresponding x_range. y_prop: str Second property to be filtered by corresponding y_range. x_range: tuple Tuple containing the minimum and maximum values for the first property. y_range: tuple Tuple containing the minimum and maximum values for the second property. 
""" # Make sure that the ranges are increasing x_min = x_range[0] x_max = x_range[1] if x_max < x_min: x_max, x_min = x_min, x_max y_min = y_range[0] y_max = y_range[1] if y_max < y_min: y_max, y_min = y_min, y_max if self.current_fluorophore_id == 0: # Apply to all fluorophores for fluo_id in self._selected_rows_dict.keys(): self._selected_rows_dict[fluo_id] = ( self._selected_rows_dict[fluo_id] & (self.filtered_dataframe[x_prop] >= x_min) & (self.filtered_dataframe[x_prop] < x_max) & (self.filtered_dataframe[y_prop] >= y_min) & (self.filtered_dataframe[y_prop] < y_max) ) elif self.current_fluorophore_id in self._selected_rows_dict: # Apply to specific fluorophore fluo_id = self.current_fluorophore_id self._selected_rows_dict[fluo_id] = ( self._selected_rows_dict[fluo_id] & (self.filtered_dataframe[x_prop] >= x_min) & (self.filtered_dataframe[x_prop] < x_max) & (self.filtered_dataframe[y_prop] >= y_min) & (self.filtered_dataframe[y_prop] < y_max) ) # Make sure to always apply the global filters self._apply_global_filters() # Mark that dynamic filters have been applied self._has_dynamic_filters = True # Make sure to flag the derived data to be recomputed self._stats_to_be_recomputed = True self._weighted_localizations_to_be_recomputed = True def _apply_global_filters(self): """Apply filters that are defined in the global application configuration.""" if self.current_fluorophore_id == 0: # Apply to all fluorophores for fluo_id in self._selected_rows_dict.keys(): self._selected_rows_dict[fluo_id] = self._filter_by_tid_length(fluo_id) elif self.current_fluorophore_id in self._selected_rows_dict: # Apply to specific fluorophore self._selected_rows_dict[self.current_fluorophore_id] = self._filter_by_tid_length( self.current_fluorophore_id ) # Make sure to flag the derived data to be recomputed self._stats_to_be_recomputed = True self._weighted_localizations_to_be_recomputed = True def _filter_by_tid_length(self, index): # Make sure to count only currently selected rows 
df = self.processed_dataframe.copy() df.loc[np.invert(self._selected_rows_dict[index]), "tid"] = np.nan # Select all rows where the count of TIDs is larger than self._min_trace_num counts = df["tid"].value_counts(normalize=False) return df["tid"].isin(counts[counts >= self.min_trace_length].index) def filter_by_single_threshold( self, prop: str, threshold: Union[int, float], larger_than: bool = True ): """Apply single threshold to filter values either lower or higher (equal) than threshold for given property.""" # Apply filter if self.current_fluorophore_id == 0: # Apply to all fluorophores for fluo_id in self._selected_rows_dict.keys(): if larger_than: self._selected_rows_dict[fluo_id] = self._selected_rows_dict[fluo_id] & ( self.filtered_dataframe[prop] >= threshold ) else: self._selected_rows_dict[fluo_id] = self._selected_rows_dict[fluo_id] & ( self.filtered_dataframe[prop] < threshold ) elif self.current_fluorophore_id in self._selected_rows_dict: # Apply to specific fluorophore fluo_id = self.current_fluorophore_id if larger_than: self._selected_rows_dict[fluo_id] = self._selected_rows_dict[fluo_id] & ( self.filtered_dataframe[prop] >= threshold ) else: self._selected_rows_dict[fluo_id] = self._selected_rows_dict[fluo_id] & ( self.filtered_dataframe[prop] < threshold ) # Apply the global filters self._apply_global_filters() # Mark that dynamic filters have been applied self._has_dynamic_filters = True # Make sure to flag the derived data to be recomputed self._stats_to_be_recomputed = True self._weighted_localizations_to_be_recomputed = True def filter_by_1d_range(self, x_prop: str, x_range: tuple): """Apply min and max thresholding to the given property. Parameters ---------- x_prop: str Name of the property (dataframe column) to filter. x_range: tuple Tuple containing the minimum and maximum values for the selected property. 
""" # Make sure that the ranges are increasing x_min = x_range[0] x_max = x_range[1] if x_max < x_min: x_max, x_min = x_min, x_max # Apply filter if self.current_fluorophore_id == 0: # Apply to all fluorophores for fluo_id in self._selected_rows_dict.keys(): self._selected_rows_dict[fluo_id] = ( self._selected_rows_dict[fluo_id] & (self.filtered_dataframe[x_prop] >= x_min) & (self.filtered_dataframe[x_prop] < x_max) ) elif self.current_fluorophore_id in self._selected_rows_dict: # Apply to specific fluorophore fluo_id = self.current_fluorophore_id self._selected_rows_dict[fluo_id] = ( self._selected_rows_dict[fluo_id] & (self.filtered_dataframe[x_prop] >= x_min) & (self.filtered_dataframe[x_prop] < x_max) ) # Apply the global filters self._apply_global_filters() # Mark that dynamic filters have been applied self._has_dynamic_filters = True # Make sure to flag the derived data to be recomputed self._stats_to_be_recomputed = True self._weighted_localizations_to_be_recomputed = True def filter_by_1d_range_complement(self, prop: str, x_range: tuple): """Apply min and max thresholding to the given property but keep the data outside the range (i.e., crop the selected range). Parameters ---------- prop: str Name of the property (dataframe column) to filter. x_range: tuple Tuple containing the minimum and maximum values for cropping the selected property. 
""" # Make sure that the ranges are increasing x_min = x_range[0] x_max = x_range[1] if x_max < x_min: x_max, x_min = x_min, x_max # Apply filter if self.current_fluorophore_id == 0: # Apply to all fluorophores for fluo_id in self._selected_rows_dict.keys(): self._selected_rows_dict[fluo_id] = self._selected_rows_dict[fluo_id] & ( (self.filtered_dataframe[prop] < x_min) | (self.filtered_dataframe[prop] >= x_max) ) elif self.current_fluorophore_id in self._selected_rows_dict: # Apply to specific fluorophore fluo_id = self.current_fluorophore_id self._selected_rows_dict[fluo_id] = self._selected_rows_dict[fluo_id] & ( (self.filtered_dataframe[prop] < x_min) | (self.filtered_dataframe[prop] >= x_max) ) # Apply the global filters self._apply_global_filters() # Mark that dynamic filters have been applied self._has_dynamic_filters = True # Make sure to flag the derived data to be recomputed self._stats_to_be_recomputed = True self._weighted_localizations_to_be_recomputed = True def filter_by_1d_stats(self, x_prop_stats: str, x_range: tuple): """Filter TIDs by min and max thresholding using the given property from the stats dataframe. Parameters ---------- x_prop_stats: str Name of the property (column) from the stats dataframe used to filter. x_range: tuple Tuple containing the minimum and maximum values for the selected property. """ # Make sure the property exists in the stats dataframe if x_prop_stats not in self.filtered_dataframe_stats.columns: raise ValueError( f"The property {x_prop_stats} does not exist in `filtered_dataframe_stats`." 
) # Make sure that the ranges are increasing x_min = x_range[0] x_max = x_range[1] if x_max < x_min: x_max, x_min = x_min, x_max # Find all TIDs for current fluorophore ID by which the requested stats property is inside the range tids_to_keep = self.filtered_dataframe_stats[ ( (self.filtered_dataframe_stats[x_prop_stats] >= x_min) & (self.filtered_dataframe_stats[x_prop_stats] <= x_max) ) ]["tid"].to_numpy() # Rows of the filtered dataframe to keep rows_to_keep = self.filtered_dataframe["tid"].isin(tids_to_keep) # Apply filter if self.current_fluorophore_id == 0: # Apply to all fluorophores for fluo_id in self._selected_rows_dict.keys(): self._selected_rows_dict[fluo_id] = self._selected_rows_dict[fluo_id] & rows_to_keep elif self.current_fluorophore_id in self._selected_rows_dict: # Apply to specific fluorophore self._selected_rows_dict[self.current_fluorophore_id] = ( self._selected_rows_dict[self.current_fluorophore_id] & rows_to_keep ) # Apply the global filters self._apply_global_filters() # Mark that dynamic filters have been applied self._has_dynamic_filters = True # Make sure to flag the derived data to be recomputed self._stats_to_be_recomputed = True self._weighted_localizations_to_be_recomputed = True def _calculate_statistics(self): """Calculate per-trace statistics.""" # Make sure we have processed dataframe to work on if self.processed_dataframe is None: return # Only recompute statistics if needed if not self._stats_to_be_recomputed: return # Work with currently selected rows df = self.filtered_dataframe # Calculate the statistics df_tid = self.calculate_statistics_on(df, self.reader.is_tracking) # Store the results self._filtered_stats_dataframe = df_tid # Flag the statistics to be computed self._stats_to_be_recomputed = False @staticmethod def calculate_statistics_on( df: pd.DataFrame, is_tracking: bool = False ) -> pd.DataFrame: """Calculate per-trace statistics for the selected dataframe. 
Parameters ---------- df: pd.DataFrame DataFrame (view) generated by one of the `select_by_*` methods. is_tracking: bool Whether the data comes from a tracking instead of a localization experiment. Returns ------- df_stats: pd.DataFrame Per-trace statistics calculated on the passed DataFrame selection (view). """ # Prepare a dataframe with the statistics if is_tracking: df_tid = pd.DataFrame( columns=MinFluxProcessor.trace_stats_with_tracking_properties() ) else: df_tid = pd.DataFrame(columns=MinFluxProcessor.trace_stats_properties()) # Calculate some statistics per TID on the passed dataframe df_grouped = df.groupby("tid") # Base statistics tid = df_grouped["tid"].first().to_numpy() n = df_grouped["tid"].count().to_numpy() mx = df_grouped["x"].mean().to_numpy() my = df_grouped["y"].mean().to_numpy() mz = df_grouped["z"].mean().to_numpy() sx = df_grouped["x"].std().to_numpy() sy = df_grouped["y"].std().to_numpy() sz = df_grouped["z"].std().to_numpy() tmp = np.power(sx, 2) + np.power(sy, 2) sxy = np.sqrt(tmp) rms_xy = np.sqrt(tmp / 2) exy = sxy / np.sqrt(n) ez = sz / np.sqrt(n) fluo = df_grouped["fluo"].agg(lambda x: mode(x, keepdims=True)[0][0]).to_numpy() mtim = df_grouped["tim"].mean().to_numpy() tot_tim, _, _ = calculate_trace_time(df) # Optional tracking statistics if is_tracking: total_distance, _, _ = calculate_total_distance_traveled(df) speeds = ( total_distance["displacement"].to_numpy() / tot_tim["tim_tot"].to_numpy() ) # Store trace stats df_tid["tid"] = tid # Trace ID df_tid["n"] = n # Number of localizations for given trace ID df_tid["mx"] = mx # x mean localization df_tid["my"] = my # y mean localization df_tid["mz"] = mz # z mean localization df_tid["sx"] = sx # x localization precision df_tid["sy"] = sy # y localization precision df_tid["sxy"] = sxy # Lateral (x, y) localization precision df_tid["rms_xy"] = rms_xy # Lateral root mean square df_tid["exy"] = exy # Standard error of sxy df_tid["sz"] = sz # z localization precision df_tid["ez"] = ez # 
Standard error of ez df_tid["fluo"] = fluo # Assigned fluorophore ID df_tid["mtim"] = mtim # Average time per trace df_tid["tim_tot"] = tot_tim["tim_tot"].to_numpy() # Total time per trace if is_tracking: df_tid["avg_speed"] = speeds # Average speed per trace df_tid["total_dist"] = total_distance[ "displacement" ].to_numpy() # Total travelled distance per trace # ["sx", "sy", "sxy", "rms_xy", "exy", "sz", "ez"] columns will contain # np.nan if n == 1: we replace them with 0.0. # @TODO: should this be changed? It could be a global option. df_tid[["sx", "sy", "sxy", "rms_xy", "exy", "sz", "ez"]] = df_tid[ ["sx", "sy", "sxy", "rms_xy", "exy", "sz", "ez"] ].fillna(value=0.0) # Return the results return df_tid def _calculate_weighted_positions(self): """Calculate per-trace localization weighted by relative photon count.""" if self.filtered_dataframe is None: return if not self._weighted_localizations_to_be_recomputed: return # Work with a copy of a subset of current filtered dataframe df = self.filtered_dataframe[ ["tid", "tim", "eco", "x", "y", "z", "fluo"] ].copy() if self._use_weighted_localizations: # Calculate weights for each coordinate based on 'eco' total_eco_per_tid = df.groupby("tid")["eco"].transform("sum") eco_rel = df["eco"] / total_eco_per_tid # Calculate weighted positions df.loc[:, "x_rel"] = df["x"] * eco_rel df.loc[:, "y_rel"] = df["y"] * eco_rel df.loc[:, "z_rel"] = df["z"] * eco_rel # Summing up the relative contributions df_grouped = df.groupby("tid") x_w = df_grouped["x_rel"].sum() y_w = df_grouped["y_rel"].sum() z_w = df_grouped["z_rel"].sum() # Return the most frequent fluo ID: traces should be homogeneous with respect # to their fluorophore assignment, but we search anyway for safety. 
fluo_mode = df_grouped["fluo"].agg( lambda x: x.mode()[0] if not x.empty else np.nan ) else: # Calculate simple average of localizations by TID df_grouped = df.groupby("tid") x_w = df_grouped["x"].mean() y_w = df_grouped["y"].mean() z_w = df_grouped["z"].mean() # Return the most frequent fluo ID: traces should be homogeneous with respect # to their fluorophore assignment, but we search anyway for safety. fluo_mode = df_grouped["fluo"].agg( lambda x: x.mode()[0] if not x.empty else np.nan ) # We calculate also the mean timestamp (not weighted) tim = df_grouped["tim"].mean() # Prepare a dataframe with the weighted localizations df_loc = pd.DataFrame( { "tid": x_w.index, "tim": tim.to_numpy(), "x": x_w.to_numpy(), "y": y_w.to_numpy(), "z": z_w.to_numpy(), "fluo": fluo_mode.to_numpy(), } ) # Update the weighted localization dataframe self._weighted_localizations = df_loc # Flag the results as up-to-date self._weighted_localizations_to_be_recomputed = False prop filename : pathlib.Path | None-
Expand source code
@property
def filename(self) -> Union[Path, None]:
    """Return the name of the loaded file, or None if none is set."""
    # Delegate to the underlying dataset
    return self.dataset.filename
prop filtered_dataframe : pandas.core.frame.DataFrame | None-
Expand source code
@property
def filtered_dataframe(self) -> Union[None, pd.DataFrame]:
    """Return dataframe with all filters applied.

    Returns
    -------

    filtered_dataframe: Union[None, pd.DataFrame]
        A Pandas dataframe, or None if no file was loaded.
    """
    if self.processed_dataframe is None:
        return None

    # Fluorophore ID 0 stands for "all fluorophores"
    if self.current_fluorophore_id == 0:
        return self._filtered_dataframe_all_fluorophores()

    # Use .loc to restrict to the current fluorophore in a single step
    frame = self.processed_dataframe.loc[
        self.processed_dataframe["fluo"] == self.current_fluorophore_id
    ]
    if self._selected_rows_dict is None:
        return None
    selection = self._selected_rows_dict.get(self.current_fluorophore_id, [])
    return frame.loc[selection]
Returns
filtered_dataframe:Union[None, pd.DataFrame]- A Pandas dataframe or None if no file was loaded.
prop filtered_dataframe_stats : pandas.core.frame.DataFrame | None-
Expand source code
@property
def filtered_dataframe_stats(self) -> Union[None, pd.DataFrame]:
    """Return the per-trace statistics dataframe with all filters applied.

    Returns
    -------

    filtered_dataframe_stats: Union[None, pd.DataFrame]
        A Pandas dataframe with all data statistics, or None if no file was loaded.
    """
    # Lazy recomputation: statistics are only rebuilt when flagged as stale
    if self._stats_to_be_recomputed:
        self._calculate_statistics()
    return self._filtered_stats_dataframe
Returns
filtered_dataframe_stats:Union[None, pd.DataFrame]- A Pandas dataframe with all data statistics or None if no file was loaded.
prop filtered_raw_data_array-
Expand source code
@property
def filtered_raw_data_array(self):
    """Return the raw NumPy array with applied filters for the selected fluorophores.

    This is only compatible with version 1 MinFluxReader (check performed by
    the invoked protected methods).
    """
    raw = self._filtered_raw_data_array_all_fluorophores()
    if raw is None:
        return None
    if self.current_fluorophore_id == 0:
        # ID 0: no specific fluorophore selected, return everything
        return raw
    # Keep only rows assigned to the active fluorophore
    return raw[raw["fluo"] == self.current_fluorophore_id]
This is only compatible with version 1 MinFluxReader (check performed by the invoked protected methods).
prop filtered_raw_dataframe : pandas.core.frame.DataFrame | None-
Expand source code
@property
def filtered_raw_dataframe(self) -> Union[None, pd.DataFrame]:
    """Return the joint raw dataframe with all filters applied.

    Returns
    -------

    filtered_dataframe: Union[None, pd.DataFrame]
        A Pandas dataframe, or None if no file was loaded.

    Raises
    ------

    ValueError
        If the underlying reader is not version 2.
    """
    # Only MinFluxReader version 2 exposes the joint raw dataframe
    if self.reader.version != 2:
        raise ValueError("Only reader version 2 is supported.")

    raw = self._filtered_raw_dataframe_all_fluorophores()
    if raw is None:
        return None

    # Keep only the rows whose iid appears in the currently filtered localizations
    iids = self.filtered_dataframe["iid"].to_numpy()
    raw = raw[raw["iid"].isin(iids)]

    if self.current_fluorophore_id == 0:
        # ID 0: no specific fluorophore selected, return everything
        return raw
    # Keep only rows assigned to the active fluorophore
    return raw[raw["fluo"] == self.current_fluorophore_id]
Returns
filtered_dataframe:Union[None, pd.DataFrame]- A Pandas dataframe or None if no file was loaded.
prop fluorophore_names : dict-
Expand source code
@property
def fluorophore_names(self) -> dict:
    """Return a copy of the fluorophore names mapping (fluo_id -> name)."""
    # Copy so callers cannot mutate the internal mapping
    names = self._fluorophore_names
    return names.copy()
prop is_3d : bool-
Expand source code
@property
def is_3d(self) -> bool:
    """Whether the acquisition is three-dimensional.

    Returns
    -------

    is_3d: bool
        True if the acquisition is 3D, False otherwise.
    """
    # Delegate to the underlying reader/dataset
    return self.reader.is_3d
Returns
is_3d:bool- True if the acquisition is 3D, False otherwise.
prop is_tracking-
Expand source code
@property
def is_tracking(self):
    """Return True if the data comes from a tracking experiment, False for a
    localization experiment.

    Note: the previous docstring was a copy-paste from `min_trace_length`
    and described the wrong property.
    """
    # Delegate to the underlying reader/dataset
    return self.reader.is_tracking
prop min_trace_length-
Expand source code
@property
def min_trace_length(self):
    """Minimum number of localizations for a trace to be kept."""
    return self._min_trace_length
prop num_fluorophores : int-
Expand source code
@property
def num_fluorophores(self) -> int:
    """Return the number of distinct fluorophore IDs present in the data.

    IDs lower than or equal to 0 are excluded from the count.
    """
    if self.processed_dataframe is None:
        return 0
    ids = np.unique(self.processed_dataframe["fluo"].to_numpy())
    positive_ids = ids[ids > 0]
    return len(positive_ids)
prop num_values : int-
Expand source code
@property
def num_values(self) -> int:
    """Return the number of values in the (filtered) dataframe.

    Returns
    -------

    n: int
        Number of values after all filters have been applied; 0 if no file
        was loaded.
    """
    frame = self.filtered_dataframe
    if frame is None:
        return 0
    return len(frame.index)
Returns
n:int- Number of values in the dataframe after all filters have been applied.
prop processed_dataframe : pandas.core.frame.DataFrame | None-
Expand source code
@property
def processed_dataframe(self) -> Union[None, pd.DataFrame]:
    """Return the full dataframe (valid entries only), with no selections or
    filters applied.

    Returns
    -------

    processed_dataframe: Union[None, pd.DataFrame]
        A Pandas dataframe, or None if no file was loaded.
    """
    # The processed dataframe lives on the underlying dataset
    return self.dataset.processed_dataframe
Returns
processed_dataframe:Union[None, pd.DataFrame]- A Pandas dataframe or None if no file was loaded.
prop reader-
Expand source code
@property
def reader(self):
    """Reader-like access to the dataset, kept for backwards compatibility.

    Existing code expects a reader interface; the underlying dataset fulfills
    that contract, so it is returned directly.
    """
    return self.dataset
This property provides access to the underlying dataset using the reader interface that existing code expects.
var state-
Expand source code
class MinFluxProcessor: """Processor of MINFLUX data.""" __doc__ = """Allows for filtering and selecting data read by the underlying `MinFluxReader`. Please notice that `MinFluxProcessor` makes use of `State.min_trace_length` to make sure that at load and after every filtering step, short traces are dropped.""" __slots__ = [ "state", "dataset", "_current_fluorophore_id", "_filtered_stats_dataframe", "_fluorophore_names", "_min_trace_length", "_selected_rows_dict", "_stats_to_be_recomputed", "_weighted_localizations", "_weighted_localizations_to_be_recomputed", "_use_weighted_localizations", "_has_dynamic_filters", ] def __init__( self, source: Union[MinFluxReader, MinFluxReaderV2, MinFluxDataset], min_trace_length: int = 1, ): """Constructor. Parameters ---------- source: Union[MinFluxReader, MinFluxReaderV2, MinFluxDataset] Either a MinFluxReader object or a MinFluxDataset. min_trace_length: int (Default = 1) Minimum number of localizations for a trace to be kept. Shorter traces are dropped. """ # Convert reader-like sources to dataset if necessary (duck-typed) if isinstance(source, MinFluxDataset): self.dataset = source elif isinstance(source, (MinFluxReader, MinFluxReaderV2)) or hasattr( source, "processed_dataframe" ): self.dataset = MinFluxDataset.from_reader(source) else: raise TypeError( f"source must be MinFluxReader, MinFluxReaderV2, MinFluxDataset, or reader-like; got {type(source)}" ) # Global options (to be applied after every operation) self._min_trace_length: int = min_trace_length # Cache the filtered stats dataframe self._filtered_stats_dataframe = None # Keep separate arrays of booleans to cache selection state for all # fluorophore IDs. 
self._selected_rows_dict = None self._init_selected_rows_dict() # Keep track of the selected fluorophore # 0 - All (default) # 1 - Fluorophore 1 # 2 - Fluorophore 2 self._current_fluorophore_id = 0 # Cache the weighted, averaged TID positions self._weighted_localizations = None # Keep track whether the statistics and the weighted localizations need to be recomputed self._stats_to_be_recomputed = False self._weighted_localizations_to_be_recomputed = False # Whether to use weighted average for localizations self._use_weighted_localizations = False # Track whether dynamic (user-applied) filters have been applied self._has_dynamic_filters = False # Initialize fluorophore names mapping (fluo_id -> name) self._fluorophore_names = {} self._init_fluorophore_names() # Apply the global filters self._apply_global_filters() def _init_selected_rows_dict(self): """Initialize the selected rows array.""" if self.processed_dataframe is None: return # Create selection mask keep_mask = pd.Series( data=np.ones(len(self.processed_dataframe.index), dtype=bool), index=self.processed_dataframe.index, ) # Store the mask for each fluorophore present in the data unique_fluos = np.unique(self.processed_dataframe["fluo"].to_numpy()) unique_fluos = unique_fluos[unique_fluos > 0] self._selected_rows_dict = {} for fluo_id in unique_fluos: self._selected_rows_dict[int(fluo_id)] = keep_mask.copy() def has_filters_applied(self) -> bool: """Check if any dynamic (user-applied) filters have been applied to the data. This does not count global filters like min_trace_length which are always applied. Returns ------- has_filters: bool True if any dynamic filters are active, False otherwise. 
""" return self._has_dynamic_filters def _init_fluorophore_names(self): """Initialize fluorophore names with default values (string representation of fluo ID).""" if self.processed_dataframe is None: return unique_fluos = np.unique(self.processed_dataframe["fluo"].to_numpy()) unique_fluos = unique_fluos[unique_fluos > 0] for fluo_id in unique_fluos: fluo_id_int = int(fluo_id) if fluo_id_int not in self._fluorophore_names: self._fluorophore_names[fluo_id_int] = str(fluo_id_int) @property def reader(self): """Return a reader-like interface to the dataset for backwards compatibility. This property provides access to the underlying dataset using the reader interface that existing code expects. """ return self.dataset @property def is_tracking(self): """Minimum number of localizations for the trace to be kept.""" return self.reader.is_tracking @property def min_trace_length(self): """Minimum number of localizations for the trace to be kept.""" return self._min_trace_length @property def z_scaling_factor(self): """Returns the scaling factor for the z coordinates from the underlying MinFluxReader.""" return self.reader.z_scaling_factor @min_trace_length.setter def min_trace_length(self, value): if value < 1 or int(value) != value: raise ValueError( "MinFluxProcessor.min_trace_length must be a positive integer!" ) self._min_trace_length = value # Run the global filters self._apply_global_filters() @property def is_3d(self) -> bool: """Return True if the acquisition is 3D. Returns ------- is_3d: bool True if the acquisition is 3D, False otherwise. """ return self.reader.is_3d @property def num_values(self) -> int: """Return the number of values in the (filtered) dataframe. Returns ------- n: int Number of values in the dataframe after all filters have been applied. 
""" if self.filtered_dataframe is not None: return len(self.filtered_dataframe.index) return 0 @property def current_fluorophore_id(self) -> int: """Return current fluorophore ID (0 for all).""" return self._current_fluorophore_id @current_fluorophore_id.setter def current_fluorophore_id(self, fluorophore_id: int) -> None: """Set current fluorophore ID (0 for all).""" # Validate fluorophore ID: 0 for all, or 1 to num_fluorophores if fluorophore_id < 0: raise ValueError(f"Fluorophore ID must be non-negative, got {fluorophore_id}.") if fluorophore_id > 0 and fluorophore_id > self.num_fluorophores: raise ValueError( f"Fluorophore ID {fluorophore_id} is out of range. " f"Valid range is 0 (all) or 1-{self.num_fluorophores}." ) # Set the new fluorophore_id self._current_fluorophore_id = fluorophore_id # Apply the global filters self._apply_global_filters() # Flag stats to be recomputed self._stats_to_be_recomputed = True @property def num_fluorophores(self) -> int: """Return the number of fluorophores.""" if self.processed_dataframe is None: return 0 unique_fluos = np.unique(self.processed_dataframe["fluo"].to_numpy()) return len(unique_fluos[unique_fluos > 0]) @property def fluorophore_names(self) -> dict: """Return the fluorophore names mapping (fluo_id -> name).""" return self._fluorophore_names.copy() def set_fluorophore_name(self, fluo_id: int, name: str): """Set the name for a specific fluorophore ID. Parameters ---------- fluo_id: int The fluorophore ID (must be >= 1) name: str The name to assign to this fluorophore """ if fluo_id < 1: raise ValueError(f"Fluorophore ID must be >= 1, got {fluo_id}") if not isinstance(name, str): raise ValueError(f"Fluorophore name must be a string, got {type(name)}") self._fluorophore_names[fluo_id] = name def set_fluorophore_names(self, names: dict): """Set fluorophore names from a dictionary mapping. 
This method updates the fluorophore names dictionary, preserving any existing names for fluorophore IDs not included in the names parameter. Parameters ---------- names: dict Dictionary mapping fluo_id (int) to name (str) """ if not isinstance(names, dict): raise ValueError(f"Names must be a dictionary, got {type(names)}") for fluo_id, name in names.items(): if not isinstance(fluo_id, int) or fluo_id < 1: raise ValueError(f"Fluorophore ID must be an integer >= 1, got {fluo_id}") if not isinstance(name, str): raise ValueError(f"Fluorophore name must be a string, got {type(name)}") # Update the dictionary instead of replacing it to preserve existing names self._fluorophore_names.update(names) def get_fluorophore_name(self, fluo_id: int) -> str: """Get the name for a specific fluorophore ID. Parameters ---------- fluo_id: int The fluorophore ID Returns ------- name: str The name of the fluorophore (defaults to string representation of ID if not set) """ return self._fluorophore_names.get(fluo_id, str(fluo_id)) def _filtered_raw_data_array_all_fluorophores(self): """Return the raw NumPy array with applied filters (for all fluorophores). This is only compatible with version 1 MinFluxReader (check performed by the invoked protected methods). """ # Copy the raw NumPy array raw_array = self.reader.valid_raw_data_array if raw_array is None: return None if self.processed_dataframe is None or self._selected_rows_dict is None: return None # Append the fluorophore ID data raw_array["fluo"] = self.processed_dataframe["fluo"].astype(np.uint8) # Extract combination of all fluorophore filtered dataframes combined_mask = np.zeros(len(self.processed_dataframe), dtype=bool) for fluo_id, fluo_mask in self._selected_rows_dict.items(): combined_mask |= (self.processed_dataframe["fluo"] == fluo_id) & fluo_mask return raw_array[combined_mask] @property def filtered_raw_data_array(self): """Return the raw NumPy array with applied filters for the selected fluorophores. 
This is only compatible with version 1 MinFluxReader (check performed by the invoked protected methods). """ # Copy the raw NumPy array full_array = self._filtered_raw_data_array_all_fluorophores() if full_array is None: return None if self.current_fluorophore_id == 0: return full_array else: # Filter by the current fluorophore ID return full_array[full_array["fluo"] == self.current_fluorophore_id] @property def processed_dataframe(self) -> Union[None, pd.DataFrame]: """Return the full dataframe (with valid entries only), with no selections or filters. Returns ------- processed_dataframe: Union[None, pd.DataFrame] A Pandas dataframe or None if no file was loaded. """ return self.dataset.processed_dataframe @property def filename(self) -> Union[Path, None]: """Return the filename if set.""" return self.dataset.filename def replace_dataset(self, new_dataset: MinFluxDataset): """Replace the current dataset with a new one. This is useful when combining datasets - the processor's dataset can be replaced with the combined dataset without losing filter state or other settings. Parameters ---------- new_dataset: MinFluxDataset The new dataset to use. 
""" # Store current settings old_min_trace_length = self._min_trace_length old_use_weighted = self._use_weighted_localizations old_fluorophore_names = self._fluorophore_names.copy() # Replace the dataset self.dataset = new_dataset # Re-initialize selection and filters self._selected_rows_dict = None self._init_selected_rows_dict() # Re-initialize fluorophore names (preserving custom names where IDs match) self._fluorophore_names = {} self._init_fluorophore_names() # Restore custom names for matching fluorophore IDs for fluo_id, name in old_fluorophore_names.items(): if fluo_id in self._fluorophore_names: self._fluorophore_names[fluo_id] = name # Restore settings self._min_trace_length = old_min_trace_length self._use_weighted_localizations = old_use_weighted # Reset current fluorophore to "all" self._current_fluorophore_id = 0 # Reset dynamic filters flag (dataset replacement clears all filters) self._has_dynamic_filters = False # Apply global filters self._apply_global_filters() # Flag derived data to be recomputed self._stats_to_be_recomputed = True self._weighted_localizations_to_be_recomputed = True def _filtered_dataframe_all_fluorophores(self) -> Union[None, pd.DataFrame]: """Return joint dataframe for all fluorophores and with all filters applied. Returns ------- filtered_dataframe_all: Union[None, pd.DataFrame] A Pandas dataframe or None if no file was loaded. """ if self.processed_dataframe is None or self._selected_rows_dict is None: return None # Extract combination of all fluorophore filtered dataframes combined_mask = np.zeros(len(self.processed_dataframe), dtype=bool) for fluo_id, fluo_mask in self._selected_rows_dict.items(): combined_mask |= (self.processed_dataframe["fluo"] == fluo_id) & fluo_mask return self.processed_dataframe.loc[combined_mask] def _filtered_raw_dataframe_all_fluorophores(self) -> Union[None, pd.DataFrame]: """Return joint raw dataframe (all properties) for all fluorophores and with all filters applied. 
Returns ------- filtered_dataframe_all: Union[None, pd.DataFrame] A Pandas dataframe or None if no file was loaded. """ # This is only compatible with MinFluxReader version 2 if self.reader.version != 2: raise ValueError("Only reader version 2 is supported.") # valid_full_raw_dataframe is a MinFluxReaderV2 property raw_array = self.reader.valid_full_raw_dataframe # Now extract the fluorophore assignments from self.processed_dataframe and # expand them onto the raw array if "fluo" in raw_array.columns: raw_fluo = raw_array["fluo"].fillna(0).astype(np.uint8).to_numpy(copy=True) else: raw_fluo = np.zeros(len(raw_array), dtype=np.uint8) if "iid" in raw_array.columns and "iid" in self.processed_dataframe.columns: fluo_map = dict( zip( self.processed_dataframe["iid"], self.processed_dataframe["fluo"].astype(np.uint8), ) ) mapped_fluo = raw_array["iid"].map(fluo_map) matched_mask = mapped_fluo.notna().to_numpy() if matched_mask.any(): raw_fluo[matched_mask] = mapped_fluo.loc[matched_mask].astype(np.uint8).to_numpy() raw_array["fluo"] = raw_fluo.astype(np.uint8) # Return the array with the assigned fluorophores return raw_array @property def filtered_dataframe(self) -> Union[None, pd.DataFrame]: """Return dataframe with all filters applied. Returns ------- filtered_dataframe: Union[None, pd.DataFrame] A Pandas dataframe or None if no file was loaded. """ if self.processed_dataframe is None: return None if self.current_fluorophore_id == 0: return self._filtered_dataframe_all_fluorophores() else: # Use .loc to filter the dataframe in a single step filtered_df = self.processed_dataframe.loc[ self.processed_dataframe["fluo"] == self.current_fluorophore_id ] if self._selected_rows_dict is None: return None selected_indices = self._selected_rows_dict.get( self.current_fluorophore_id, [] ) return filtered_df.loc[selected_indices] @property def filtered_raw_dataframe(self) -> Union[None, pd.DataFrame]: """Return joint dataframe with all filters applied. 
Returns ------- filtered_dataframe: Union[None, pd.DataFrame] A Pandas dataframe or None if no file was loaded. """ # This is only compatible with MinFluxReader version 2 if self.reader.version != 2: raise ValueError("Only reader version 2 is supported.") # Copy the raw dataframe array full_dataframe = self._filtered_raw_dataframe_all_fluorophores() if full_dataframe is None: return None # Get the IIDs from the currently filtered localizations iids = self.filtered_dataframe["iid"].to_numpy() # Extract all rows matching the current set of iids full_dataframe = full_dataframe[full_dataframe["iid"].isin(iids)] # If needed, filter by fluorophore ID if self.current_fluorophore_id == 0: return full_dataframe else: # Filter by the current fluorophore ID return full_dataframe[full_dataframe["fluo"] == self.current_fluorophore_id] @property def filtered_dataframe_stats(self) -> Union[None, pd.DataFrame]: """Return dataframe stats with all filters applied. Returns ------- filtered_dataframe_stats: Union[None, pd.DataFrame] A Pandas dataframe with all data statistics or None if no file was loaded. 
""" if self._stats_to_be_recomputed: self._calculate_statistics() return self._filtered_stats_dataframe @property def weighted_localizations(self) -> Union[None, pd.DataFrame]: """Return the average (x, y, z) position per TID weighted by the relative photon count.""" if self._weighted_localizations_to_be_recomputed: self._calculate_weighted_positions() return self._weighted_localizations @property def use_weighted_localizations(self) -> bool: """Whether to use weighted average to calculate the mean localization per TID.""" return self._use_weighted_localizations @use_weighted_localizations.setter def use_weighted_localizations(self, value: bool): """Whether to use weighted average to calculate the mean localization per TID.""" self._use_weighted_localizations = value self._weighted_localizations_to_be_recomputed = True @classmethod def processed_properties(cls): """Return the processed dataframe columns.""" return MinFluxReader.processed_properties() @classmethod def trace_stats_properties(cls): """Return the columns of the filtered_dataframe_stats.""" return [ "tid", "n", "fluo", "mx", "my", "mz", "sx", "sy", "sxy", "exy", "rms_xy", "sz", "ez", "mtim", "tim_tot", ] @classmethod def trace_stats_with_tracking_properties(cls): """Return the columns of the filtered_dataframe_stats with tracking columns.""" return MinFluxProcessor.trace_stats_properties() + [ "avg_speed", "total_dist", ] def reset(self): """Drops all dynamic filters and resets the data to the processed data frame with global filters.""" # Clear the selection per fluorophore; they will be reinitialized as # all selected at the first access. 
self._init_selected_rows_dict() # Reset the mapping to the corresponding fluorophore self.processed_dataframe["fluo"] = 1 # Reset fluorophore names (clear custom names first) self._fluorophore_names = {} self._init_fluorophore_names() # Default fluorophore is 0 (no selection) self.current_fluorophore_id = 0 # Reset dynamic filters flag self._has_dynamic_filters = False # Apply global filters self._apply_global_filters() def set_fluorophore_ids(self, fluorophore_ids: NDArray[np.uint8]): """Assign the fluorophore IDs to current filtered dataset. This method assigns new fluorophore IDs only to the rows in the current filtered dataset. All other rows remain unchanged. """ if self.filtered_dataframe is None: return if len(fluorophore_ids) != len(self.filtered_dataframe.index): raise ValueError( "The number of fluorophore IDs does not match the number of entries in the dataframe." ) # Get the actual indices from the filtered dataframe filtered_indices = self.filtered_dataframe.index # Assign the new fluorophore IDs to those specific indices only # All other rows remain unchanged (preserving other fluorophores) self.processed_dataframe.loc[filtered_indices, "fluo"] = fluorophore_ids.astype(np.uint8) # Apply global filters self._init_selected_rows_dict() self._init_fluorophore_names() self._apply_global_filters() def set_full_fluorophore_ids(self, fluorophore_ids: NDArray[np.uint8]): """Assign the fluorophore IDs to the original, full dataframe ignoring current filters.""" if self.processed_dataframe is None: return if len(fluorophore_ids) != len(self.processed_dataframe.index): raise ValueError( "The number of fluorophore IDs does not match the number of entries in the dataframe." 
) self.processed_dataframe["fluo"] = fluorophore_ids.astype(np.uint8) # Apply global filters self._init_selected_rows_dict() self._init_fluorophore_names() self._apply_global_filters() def select_by_rows( self, indices: np.ndarray, from_weighted_locs: bool = False ) -> Union[None, pd.DataFrame]: """Return view on a subset of the filtered dataset or the weighted localisations defined by the passed DataFrame row indices. The underlying dataframe is not modified. Parameters ---------- indices: np.ndarray Logical array for selecting the elements to be returned. from_weighted_locs: bool If True, select from the weighted_localizations dataframe; otherwise, from the filtered_dataframe. Returns ------- subset: Union[None, pd.DataFrame] A view on a subset of the dataframe defined by the passed indices, or None if no file was loaded. """ if from_weighted_locs: if self._weighted_localizations is None: return None return self._weighted_localizations.iloc[indices] else: if self.filtered_dataframe is None: return None return self.filtered_dataframe.iloc[indices] def select_by_series_iloc( self, iloc: np.ndarray, from_weighted_locs: bool = False ) -> Union[None, pd.DataFrame]: """Return view on a subset of the filtered dataset or the weighted localisations defined by the passed DataFrame index locations. The underlying dataframe is not modified. Parameters ---------- iloc: np.ndarray Array of Series index locations for selecting rows. from_weighted_locs: bool If True, select from the weighted_localizations dataframe; otherwise, from the filtered_dataframe. Returns ------- subset: Union[None, pd.DataFrame] A view on a subset of the dataframe defined by the passed indices, or None if no file was loaded. 
""" if from_weighted_locs: if self._weighted_localizations is None: return None return self._weighted_localizations.loc[iloc] else: if self.filtered_dataframe is None: return None return self.filtered_dataframe.loc[iloc] def select_by_1d_range( self, x_prop, x_range, from_weighted_locs: bool = False ) -> Union[None, pd.DataFrame]: """Return a view on a subset of the filtered dataset or the weighted localisations defined by the passed parameter and corresponding range. The underlying dataframe is not modified. Parameters ---------- x_prop: str Property to be filtered by corresponding x_range. x_range: tuple Tuple containing the minimum and maximum values for the selected property. from_weighted_locs: bool If True, select from the weighted_localizations dataframe; otherwise, from the filtered_dataframe. Returns ------- subset: Union[None, pd.DataFrame] A view on a subset of the dataframe defined by the passed property and range, or None if no file was loaded. """ # Make sure that the range is increasing x_min = x_range[0] x_max = x_range[1] if x_max < x_min: x_max, x_min = x_min, x_max if from_weighted_locs: if self._weighted_localizations is None: return None return self._weighted_localizations.loc[ (self._weighted_localizations[x_prop] >= x_min) & (self._weighted_localizations[x_prop] < x_max) ] else: # Work with currently selected rows if self.filtered_dataframe is None: return None df = self.filtered_dataframe return df.loc[(df[x_prop] >= x_min) & (df[x_prop] < x_max)] def select_by_2d_range( self, x_prop, y_prop, x_range, y_range, from_weighted_locs: bool = False ) -> Union[None, pd.DataFrame]: """Return a view on a subset of the filtered dataset or the weighted localisations defined by the passed parameters and corresponding ranges. The underlying dataframe is not modified. Parameters ---------- x_prop: str First property to be filtered by corresponding x_range. y_prop: str Second property to be filtered by corresponding y_range. 
x_range: tuple Tuple containing the minimum and maximum values for the first property. y_range: tuple Tuple containing the minimum and maximum values for the second property. from_weighted_locs: bool If True, select from the weighted_localizations dataframe; otherwise, from the filtered_dataframe. Returns ------- subset: Union[None, pd.DataFrame] A view on a subset of the dataframe defined by the passed properties and ranges, or None if no file was loaded. """ # Make sure that the ranges are increasing x_min = x_range[0] x_max = x_range[1] if x_max < x_min: x_max, x_min = x_min, x_max y_min = y_range[0] y_max = y_range[1] if y_max < y_min: y_max, y_min = y_min, y_max if from_weighted_locs: if self._weighted_localizations is None: return None return self._weighted_localizations.loc[ (self._weighted_localizations[x_prop] >= x_min) & (self._weighted_localizations[x_prop] < x_max) & (self._weighted_localizations[y_prop] >= y_min) & (self._weighted_localizations[y_prop] < y_max) ] else: # Work with currently selected rows if self.filtered_dataframe is None: return None df = self.filtered_dataframe return df.loc[ (df[x_prop] >= x_min) & (df[x_prop] < x_max) & (df[y_prop] >= y_min) & (df[y_prop] < y_max) ] def filter_by_2d_range(self, x_prop, y_prop, x_range, y_range): """Filter dataset by the extracting a rectangular ROI over two parameters and two ranges. Parameters ---------- x_prop: str First property to be filtered by corresponding x_range. y_prop: str Second property to be filtered by corresponding y_range. x_range: tuple Tuple containing the minimum and maximum values for the first property. y_range: tuple Tuple containing the minimum and maximum values for the second property. 
""" # Make sure that the ranges are increasing x_min = x_range[0] x_max = x_range[1] if x_max < x_min: x_max, x_min = x_min, x_max y_min = y_range[0] y_max = y_range[1] if y_max < y_min: y_max, y_min = y_min, y_max if self.current_fluorophore_id == 0: # Apply to all fluorophores for fluo_id in self._selected_rows_dict.keys(): self._selected_rows_dict[fluo_id] = ( self._selected_rows_dict[fluo_id] & (self.filtered_dataframe[x_prop] >= x_min) & (self.filtered_dataframe[x_prop] < x_max) & (self.filtered_dataframe[y_prop] >= y_min) & (self.filtered_dataframe[y_prop] < y_max) ) elif self.current_fluorophore_id in self._selected_rows_dict: # Apply to specific fluorophore fluo_id = self.current_fluorophore_id self._selected_rows_dict[fluo_id] = ( self._selected_rows_dict[fluo_id] & (self.filtered_dataframe[x_prop] >= x_min) & (self.filtered_dataframe[x_prop] < x_max) & (self.filtered_dataframe[y_prop] >= y_min) & (self.filtered_dataframe[y_prop] < y_max) ) # Make sure to always apply the global filters self._apply_global_filters() # Mark that dynamic filters have been applied self._has_dynamic_filters = True # Make sure to flag the derived data to be recomputed self._stats_to_be_recomputed = True self._weighted_localizations_to_be_recomputed = True def _apply_global_filters(self): """Apply filters that are defined in the global application configuration.""" if self.current_fluorophore_id == 0: # Apply to all fluorophores for fluo_id in self._selected_rows_dict.keys(): self._selected_rows_dict[fluo_id] = self._filter_by_tid_length(fluo_id) elif self.current_fluorophore_id in self._selected_rows_dict: # Apply to specific fluorophore self._selected_rows_dict[self.current_fluorophore_id] = self._filter_by_tid_length( self.current_fluorophore_id ) # Make sure to flag the derived data to be recomputed self._stats_to_be_recomputed = True self._weighted_localizations_to_be_recomputed = True def _filter_by_tid_length(self, index): # Make sure to count only currently selected rows 
df = self.processed_dataframe.copy() df.loc[np.invert(self._selected_rows_dict[index]), "tid"] = np.nan # Select all rows where the count of TIDs is larger than self._min_trace_num counts = df["tid"].value_counts(normalize=False) return df["tid"].isin(counts[counts >= self.min_trace_length].index) def filter_by_single_threshold( self, prop: str, threshold: Union[int, float], larger_than: bool = True ): """Apply single threshold to filter values either lower or higher (equal) than threshold for given property.""" # Apply filter if self.current_fluorophore_id == 0: # Apply to all fluorophores for fluo_id in self._selected_rows_dict.keys(): if larger_than: self._selected_rows_dict[fluo_id] = self._selected_rows_dict[fluo_id] & ( self.filtered_dataframe[prop] >= threshold ) else: self._selected_rows_dict[fluo_id] = self._selected_rows_dict[fluo_id] & ( self.filtered_dataframe[prop] < threshold ) elif self.current_fluorophore_id in self._selected_rows_dict: # Apply to specific fluorophore fluo_id = self.current_fluorophore_id if larger_than: self._selected_rows_dict[fluo_id] = self._selected_rows_dict[fluo_id] & ( self.filtered_dataframe[prop] >= threshold ) else: self._selected_rows_dict[fluo_id] = self._selected_rows_dict[fluo_id] & ( self.filtered_dataframe[prop] < threshold ) # Apply the global filters self._apply_global_filters() # Mark that dynamic filters have been applied self._has_dynamic_filters = True # Make sure to flag the derived data to be recomputed self._stats_to_be_recomputed = True self._weighted_localizations_to_be_recomputed = True def filter_by_1d_range(self, x_prop: str, x_range: tuple): """Apply min and max thresholding to the given property. Parameters ---------- x_prop: str Name of the property (dataframe column) to filter. x_range: tuple Tuple containing the minimum and maximum values for the selected property. 
""" # Make sure that the ranges are increasing x_min = x_range[0] x_max = x_range[1] if x_max < x_min: x_max, x_min = x_min, x_max # Apply filter if self.current_fluorophore_id == 0: # Apply to all fluorophores for fluo_id in self._selected_rows_dict.keys(): self._selected_rows_dict[fluo_id] = ( self._selected_rows_dict[fluo_id] & (self.filtered_dataframe[x_prop] >= x_min) & (self.filtered_dataframe[x_prop] < x_max) ) elif self.current_fluorophore_id in self._selected_rows_dict: # Apply to specific fluorophore fluo_id = self.current_fluorophore_id self._selected_rows_dict[fluo_id] = ( self._selected_rows_dict[fluo_id] & (self.filtered_dataframe[x_prop] >= x_min) & (self.filtered_dataframe[x_prop] < x_max) ) # Apply the global filters self._apply_global_filters() # Mark that dynamic filters have been applied self._has_dynamic_filters = True # Make sure to flag the derived data to be recomputed self._stats_to_be_recomputed = True self._weighted_localizations_to_be_recomputed = True def filter_by_1d_range_complement(self, prop: str, x_range: tuple): """Apply min and max thresholding to the given property but keep the data outside the range (i.e., crop the selected range). Parameters ---------- prop: str Name of the property (dataframe column) to filter. x_range: tuple Tuple containing the minimum and maximum values for cropping the selected property. 
""" # Make sure that the ranges are increasing x_min = x_range[0] x_max = x_range[1] if x_max < x_min: x_max, x_min = x_min, x_max # Apply filter if self.current_fluorophore_id == 0: # Apply to all fluorophores for fluo_id in self._selected_rows_dict.keys(): self._selected_rows_dict[fluo_id] = self._selected_rows_dict[fluo_id] & ( (self.filtered_dataframe[prop] < x_min) | (self.filtered_dataframe[prop] >= x_max) ) elif self.current_fluorophore_id in self._selected_rows_dict: # Apply to specific fluorophore fluo_id = self.current_fluorophore_id self._selected_rows_dict[fluo_id] = self._selected_rows_dict[fluo_id] & ( (self.filtered_dataframe[prop] < x_min) | (self.filtered_dataframe[prop] >= x_max) ) # Apply the global filters self._apply_global_filters() # Mark that dynamic filters have been applied self._has_dynamic_filters = True # Make sure to flag the derived data to be recomputed self._stats_to_be_recomputed = True self._weighted_localizations_to_be_recomputed = True def filter_by_1d_stats(self, x_prop_stats: str, x_range: tuple): """Filter TIDs by min and max thresholding using the given property from the stats dataframe. Parameters ---------- x_prop_stats: str Name of the property (column) from the stats dataframe used to filter. x_range: tuple Tuple containing the minimum and maximum values for the selected property. """ # Make sure the property exists in the stats dataframe if x_prop_stats not in self.filtered_dataframe_stats.columns: raise ValueError( f"The property {x_prop_stats} does not exist in `filtered_dataframe_stats`." 
) # Make sure that the ranges are increasing x_min = x_range[0] x_max = x_range[1] if x_max < x_min: x_max, x_min = x_min, x_max # Find all TIDs for current fluorophore ID by which the requested stats property is inside the range tids_to_keep = self.filtered_dataframe_stats[ ( (self.filtered_dataframe_stats[x_prop_stats] >= x_min) & (self.filtered_dataframe_stats[x_prop_stats] <= x_max) ) ]["tid"].to_numpy() # Rows of the filtered dataframe to keep rows_to_keep = self.filtered_dataframe["tid"].isin(tids_to_keep) # Apply filter if self.current_fluorophore_id == 0: # Apply to all fluorophores for fluo_id in self._selected_rows_dict.keys(): self._selected_rows_dict[fluo_id] = self._selected_rows_dict[fluo_id] & rows_to_keep elif self.current_fluorophore_id in self._selected_rows_dict: # Apply to specific fluorophore self._selected_rows_dict[self.current_fluorophore_id] = ( self._selected_rows_dict[self.current_fluorophore_id] & rows_to_keep ) # Apply the global filters self._apply_global_filters() # Mark that dynamic filters have been applied self._has_dynamic_filters = True # Make sure to flag the derived data to be recomputed self._stats_to_be_recomputed = True self._weighted_localizations_to_be_recomputed = True def _calculate_statistics(self): """Calculate per-trace statistics.""" # Make sure we have processed dataframe to work on if self.processed_dataframe is None: return # Only recompute statistics if needed if not self._stats_to_be_recomputed: return # Work with currently selected rows df = self.filtered_dataframe # Calculate the statistics df_tid = self.calculate_statistics_on(df, self.reader.is_tracking) # Store the results self._filtered_stats_dataframe = df_tid # Flag the statistics to be computed self._stats_to_be_recomputed = False @staticmethod def calculate_statistics_on( df: pd.DataFrame, is_tracking: bool = False ) -> pd.DataFrame: """Calculate per-trace statistics for the selected dataframe. 
Parameters ---------- df: pd.DataFrame DataFrame (view) generated by one of the `select_by_*` methods. is_tracking: bool Whether the data comes from a tracking instead of a localization experiment. Returns ------- df_stats: pd.DataFrame Per-trace statistics calculated on the passed DataFrame selection (view). """ # Prepare a dataframe with the statistics if is_tracking: df_tid = pd.DataFrame( columns=MinFluxProcessor.trace_stats_with_tracking_properties() ) else: df_tid = pd.DataFrame(columns=MinFluxProcessor.trace_stats_properties()) # Calculate some statistics per TID on the passed dataframe df_grouped = df.groupby("tid") # Base statistics tid = df_grouped["tid"].first().to_numpy() n = df_grouped["tid"].count().to_numpy() mx = df_grouped["x"].mean().to_numpy() my = df_grouped["y"].mean().to_numpy() mz = df_grouped["z"].mean().to_numpy() sx = df_grouped["x"].std().to_numpy() sy = df_grouped["y"].std().to_numpy() sz = df_grouped["z"].std().to_numpy() tmp = np.power(sx, 2) + np.power(sy, 2) sxy = np.sqrt(tmp) rms_xy = np.sqrt(tmp / 2) exy = sxy / np.sqrt(n) ez = sz / np.sqrt(n) fluo = df_grouped["fluo"].agg(lambda x: mode(x, keepdims=True)[0][0]).to_numpy() mtim = df_grouped["tim"].mean().to_numpy() tot_tim, _, _ = calculate_trace_time(df) # Optional tracking statistics if is_tracking: total_distance, _, _ = calculate_total_distance_traveled(df) speeds = ( total_distance["displacement"].to_numpy() / tot_tim["tim_tot"].to_numpy() ) # Store trace stats df_tid["tid"] = tid # Trace ID df_tid["n"] = n # Number of localizations for given trace ID df_tid["mx"] = mx # x mean localization df_tid["my"] = my # y mean localization df_tid["mz"] = mz # z mean localization df_tid["sx"] = sx # x localization precision df_tid["sy"] = sy # y localization precision df_tid["sxy"] = sxy # Lateral (x, y) localization precision df_tid["rms_xy"] = rms_xy # Lateral root mean square df_tid["exy"] = exy # Standard error of sxy df_tid["sz"] = sz # z localization precision df_tid["ez"] = ez # 
Standard error of ez df_tid["fluo"] = fluo # Assigned fluorophore ID df_tid["mtim"] = mtim # Average time per trace df_tid["tim_tot"] = tot_tim["tim_tot"].to_numpy() # Total time per trace if is_tracking: df_tid["avg_speed"] = speeds # Average speed per trace df_tid["total_dist"] = total_distance[ "displacement" ].to_numpy() # Total travelled distance per trace # ["sx", "sy", "sxy", "rms_xy", "exy", "sz", "ez"] columns will contain # np.nan if n == 1: we replace them with 0.0. # @TODO: should this be changed? It could be a global option. df_tid[["sx", "sy", "sxy", "rms_xy", "exy", "sz", "ez"]] = df_tid[ ["sx", "sy", "sxy", "rms_xy", "exy", "sz", "ez"] ].fillna(value=0.0) # Return the results return df_tid def _calculate_weighted_positions(self): """Calculate per-trace localization weighted by relative photon count.""" if self.filtered_dataframe is None: return if not self._weighted_localizations_to_be_recomputed: return # Work with a copy of a subset of current filtered dataframe df = self.filtered_dataframe[ ["tid", "tim", "eco", "x", "y", "z", "fluo"] ].copy() if self._use_weighted_localizations: # Calculate weights for each coordinate based on 'eco' total_eco_per_tid = df.groupby("tid")["eco"].transform("sum") eco_rel = df["eco"] / total_eco_per_tid # Calculate weighted positions df.loc[:, "x_rel"] = df["x"] * eco_rel df.loc[:, "y_rel"] = df["y"] * eco_rel df.loc[:, "z_rel"] = df["z"] * eco_rel # Summing up the relative contributions df_grouped = df.groupby("tid") x_w = df_grouped["x_rel"].sum() y_w = df_grouped["y_rel"].sum() z_w = df_grouped["z_rel"].sum() # Return the most frequent fluo ID: traces should be homogeneous with respect # to their fluorophore assignment, but we search anyway for safety. 
fluo_mode = df_grouped["fluo"].agg( lambda x: x.mode()[0] if not x.empty else np.nan ) else: # Calculate simple average of localizations by TID df_grouped = df.groupby("tid") x_w = df_grouped["x"].mean() y_w = df_grouped["y"].mean() z_w = df_grouped["z"].mean() # Return the most frequent fluo ID: traces should be homogeneous with respect # to their fluorophore assignment, but we search anyway for safety. fluo_mode = df_grouped["fluo"].agg( lambda x: x.mode()[0] if not x.empty else np.nan ) # We calculate also the mean timestamp (not weighted) tim = df_grouped["tim"].mean() # Prepare a dataframe with the weighted localizations df_loc = pd.DataFrame( { "tid": x_w.index, "tim": tim.to_numpy(), "x": x_w.to_numpy(), "y": y_w.to_numpy(), "z": z_w.to_numpy(), "fluo": fluo_mode.to_numpy(), } ) # Update the weighted localization dataframe self._weighted_localizations = df_loc # Flag the results as up-to-date self._weighted_localizations_to_be_recomputed = False prop use_weighted_localizations : bool-
Expand source code
@property
def use_weighted_localizations(self) -> bool:
    """Whether to use weighted average to calculate the mean localization per TID.

    Returns
    -------

    use_weighted_localizations: bool
        True if the per-TID mean localization is computed as an eco-weighted
        average, False if a plain average is used (see `_calculate_weighted_positions`).
    """
    # Simple read-only view of the internal flag.
    return self._use_weighted_localizations
prop weighted_localizations : pandas.core.frame.DataFrame | None-
Expand source code
@property
def weighted_localizations(self) -> Union[None, pd.DataFrame]:
    """Return the average (x, y, z) position per TID weighted by the relative photon count.

    The result is computed lazily: it is only recalculated when the
    `_weighted_localizations_to_be_recomputed` flag has been set (e.g. by a
    filtering operation).
    """
    if self._weighted_localizations_to_be_recomputed:
        self._calculate_weighted_positions()
    return self._weighted_localizations
prop z_scaling_factor-
Expand source code
@property
def z_scaling_factor(self):
    """Returns the scaling factor for the z coordinates from the underlying MinFluxReader."""
    # Plain pass-through to the reader; no caching is performed here.
    return self.reader.z_scaling_factor
Methods
def filter_by_1d_range(self, x_prop: str, x_range: tuple)-
Expand source code
def filter_by_1d_range(self, x_prop: str, x_range: tuple):
    """Apply min and max thresholding to the given property.

    Rows are kept when their `x_prop` value falls inside the half-open
    interval [min, max).

    Parameters
    ----------

    x_prop: str
        Name of the property (dataframe column) to filter.

    x_range: tuple
        Tuple containing the minimum and maximum values for the selected property.
    """

    # Order the bounds so that lo <= hi
    lo, hi = x_range[0], x_range[1]
    if hi < lo:
        lo, hi = hi, lo

    # Fluorophore ID 0 is the special "all fluorophores" selector.
    if self.current_fluorophore_id == 0:
        targets = list(self._selected_rows_dict.keys())
    elif self.current_fluorophore_id in self._selected_rows_dict:
        targets = [self.current_fluorophore_id]
    else:
        targets = []

    # AND the range condition into each affected selection mask
    for fluo_id in targets:
        column = self.filtered_dataframe[x_prop]
        self._selected_rows_dict[fluo_id] = (
            self._selected_rows_dict[fluo_id] & (column >= lo) & (column < hi)
        )

    # Re-apply the global filters on top of the updated selection
    self._apply_global_filters()

    # Record that user-driven (dynamic) filtering is now active
    self._has_dynamic_filters = True

    # Invalidate cached derived data
    self._stats_to_be_recomputed = True
    self._weighted_localizations_to_be_recomputed = True
Parameters
x_prop:str- Name of the property (dataframe column) to filter.
x_range:tuple- Tuple containing the minimum and maximum values for the selected property.
def filter_by_1d_range_complement(self, prop: str, x_range: tuple)-
Expand source code
def filter_by_1d_range_complement(self, prop: str, x_range: tuple):
    """Apply min and max thresholding to the given property but keep the data
    outside the range (i.e., crop the selected range).

    Parameters
    ----------

    prop: str
        Name of the property (dataframe column) to filter.

    x_range: tuple
        Tuple containing the minimum and maximum values for cropping the selected property.
    """

    # Order the bounds so that lo <= hi
    lo, hi = x_range[0], x_range[1]
    if hi < lo:
        lo, hi = hi, lo

    # Fluorophore ID 0 is the special "all fluorophores" selector.
    if self.current_fluorophore_id == 0:
        targets = list(self._selected_rows_dict.keys())
    elif self.current_fluorophore_id in self._selected_rows_dict:
        targets = [self.current_fluorophore_id]
    else:
        targets = []

    # AND the complement condition (value < lo OR value >= hi) into each mask
    for fluo_id in targets:
        column = self.filtered_dataframe[prop]
        self._selected_rows_dict[fluo_id] = self._selected_rows_dict[fluo_id] & (
            (column < lo) | (column >= hi)
        )

    # Re-apply the global filters on top of the updated selection
    self._apply_global_filters()

    # Record that user-driven (dynamic) filtering is now active
    self._has_dynamic_filters = True

    # Invalidate cached derived data
    self._stats_to_be_recomputed = True
    self._weighted_localizations_to_be_recomputed = True
Parameters
prop:str- Name of the property (dataframe column) to filter.
x_range:tuple- Tuple containing the minimum and maximum values for cropping the selected property.
def filter_by_1d_stats(self, x_prop_stats: str, x_range: tuple)-
Expand source code
def filter_by_1d_stats(self, x_prop_stats: str, x_range: tuple):
    """Filter TIDs by min and max thresholding using the given property from the stats dataframe.

    Parameters
    ----------

    x_prop_stats: str
        Name of the property (column) from the stats dataframe used to filter.

    x_range: tuple
        Tuple containing the minimum and maximum values for the selected property.
    """

    # Validate the requested stats column
    if x_prop_stats not in self.filtered_dataframe_stats.columns:
        raise ValueError(
            f"The property {x_prop_stats} does not exist in `filtered_dataframe_stats`."
        )

    # Order the bounds so that lo <= hi
    lo, hi = x_range[0], x_range[1]
    if hi < lo:
        lo, hi = hi, lo

    # TIDs whose stats value lies inside the closed interval [lo, hi]
    stats = self.filtered_dataframe_stats
    in_range = (stats[x_prop_stats] >= lo) & (stats[x_prop_stats] <= hi)
    tids_to_keep = stats[in_range]["tid"].to_numpy()

    # Localization rows that belong to one of the retained TIDs
    rows_to_keep = self.filtered_dataframe["tid"].isin(tids_to_keep)

    # Fluorophore ID 0 is the special "all fluorophores" selector.
    if self.current_fluorophore_id == 0:
        targets = list(self._selected_rows_dict.keys())
    elif self.current_fluorophore_id in self._selected_rows_dict:
        targets = [self.current_fluorophore_id]
    else:
        targets = []

    for fluo_id in targets:
        self._selected_rows_dict[fluo_id] = self._selected_rows_dict[fluo_id] & rows_to_keep

    # Re-apply the global filters on top of the updated selection
    self._apply_global_filters()

    # Record that user-driven (dynamic) filtering is now active
    self._has_dynamic_filters = True

    # Invalidate cached derived data
    self._stats_to_be_recomputed = True
    self._weighted_localizations_to_be_recomputed = True
Parameters
x_prop_stats:str- Name of the property (column) from the stats dataframe used to filter.
x_range:tuple- Tuple containing the minimum and maximum values for the selected property.
def filter_by_2d_range(self, x_prop, y_prop, x_range, y_range)-
Expand source code
def filter_by_2d_range(self, x_prop, y_prop, x_range, y_range):
    """Filter the dataset by extracting a rectangular ROI over two properties and their ranges.

    Parameters
    ----------

    x_prop: str
        First property to be filtered by corresponding x_range.

    y_prop: str
        Second property to be filtered by corresponding y_range.

    x_range: tuple
        Tuple containing the minimum and maximum values for the first property.

    y_range: tuple
        Tuple containing the minimum and maximum values for the second property.
    """

    # Order both pairs of bounds so that lo <= hi
    x_lo, x_hi = x_range[0], x_range[1]
    if x_hi < x_lo:
        x_lo, x_hi = x_hi, x_lo
    y_lo, y_hi = y_range[0], y_range[1]
    if y_hi < y_lo:
        y_lo, y_hi = y_hi, y_lo

    # Fluorophore ID 0 is the special "all fluorophores" selector.
    if self.current_fluorophore_id == 0:
        targets = list(self._selected_rows_dict.keys())
    elif self.current_fluorophore_id in self._selected_rows_dict:
        targets = [self.current_fluorophore_id]
    else:
        targets = []

    # AND the rectangular ROI condition into each affected selection mask
    for fluo_id in targets:
        xs = self.filtered_dataframe[x_prop]
        ys = self.filtered_dataframe[y_prop]
        self._selected_rows_dict[fluo_id] = (
            self._selected_rows_dict[fluo_id]
            & (xs >= x_lo)
            & (xs < x_hi)
            & (ys >= y_lo)
            & (ys < y_hi)
        )

    # Make sure to always apply the global filters
    self._apply_global_filters()

    # Record that user-driven (dynamic) filtering is now active
    self._has_dynamic_filters = True

    # Invalidate cached derived data
    self._stats_to_be_recomputed = True
    self._weighted_localizations_to_be_recomputed = True
Parameters
x_prop:str- First property to be filtered by corresponding x_range.
y_prop:str- Second property to be filtered by corresponding y_range.
x_range:tuple- Tuple containing the minimum and maximum values for the first property.
y_range:tuple- Tuple containing the minimum and maximum values for the second property.
def filter_by_single_threshold(self, prop: str, threshold: int | float, larger_than: bool = True)-
Expand source code
def filter_by_single_threshold(
    self, prop: str, threshold: Union[int, float], larger_than: bool = True
):
    """Apply single threshold to filter values either lower or higher (equal) than threshold for given property.

    With `larger_than=True` rows with value >= threshold are kept; otherwise
    rows with value < threshold are kept.
    """

    # Fluorophore ID 0 is the special "all fluorophores" selector.
    if self.current_fluorophore_id == 0:
        targets = list(self._selected_rows_dict.keys())
    elif self.current_fluorophore_id in self._selected_rows_dict:
        targets = [self.current_fluorophore_id]
    else:
        targets = []

    # AND the threshold condition into each affected selection mask
    for fluo_id in targets:
        column = self.filtered_dataframe[prop]
        keep = (column >= threshold) if larger_than else (column < threshold)
        self._selected_rows_dict[fluo_id] = self._selected_rows_dict[fluo_id] & keep

    # Re-apply the global filters on top of the updated selection
    self._apply_global_filters()

    # Record that user-driven (dynamic) filtering is now active
    self._has_dynamic_filters = True

    # Invalidate cached derived data
    self._stats_to_be_recomputed = True
    self._weighted_localizations_to_be_recomputed = True
def get_fluorophore_name(self, fluo_id: int) ‑> str-
Expand source code
def get_fluorophore_name(self, fluo_id: int) -> str:
    """Get the name for a specific fluorophore ID.

    Parameters
    ----------

    fluo_id: int
        The fluorophore ID

    Returns
    -------

    name: str
        The name of the fluorophore (defaults to string representation of ID if not set)
    """
    # Fall back to the stringified ID when no custom name was registered.
    try:
        return self._fluorophore_names[fluo_id]
    except KeyError:
        return str(fluo_id)
Parameters
fluo_id:int- The fluorophore ID
Returns
name:str- The name of the fluorophore (defaults to string representation of ID if not set)
def has_filters_applied(self) ‑> bool-
Expand source code
def has_filters_applied(self) -> bool:
    """Check if any dynamic (user-applied) filters have been applied to the data.

    Global filters (such as min_trace_length), which are always applied, are
    not counted here.

    Returns
    -------

    has_filters: bool
        True if any dynamic filters are active, False otherwise.
    """
    # The flag is set by the filter_* methods and cleared on reset/replace.
    return True if self._has_dynamic_filters else False
This does not count global filters like min_trace_length which are always applied.
Returns
has_filters:bool- True if any dynamic filters are active, False otherwise.
def replace_dataset(self, new_dataset: pyminflux.processor._dataset.MinFluxDataset)-
Expand source code
def replace_dataset(self, new_dataset: MinFluxDataset): """Replace the current dataset with a new one. This is useful when combining datasets - the processor's dataset can be replaced with the combined dataset without losing filter state or other settings. Parameters ---------- new_dataset: MinFluxDataset The new dataset to use. """ # Store current settings old_min_trace_length = self._min_trace_length old_use_weighted = self._use_weighted_localizations old_fluorophore_names = self._fluorophore_names.copy() # Replace the dataset self.dataset = new_dataset # Re-initialize selection and filters self._selected_rows_dict = None self._init_selected_rows_dict() # Re-initialize fluorophore names (preserving custom names where IDs match) self._fluorophore_names = {} self._init_fluorophore_names() # Restore custom names for matching fluorophore IDs for fluo_id, name in old_fluorophore_names.items(): if fluo_id in self._fluorophore_names: self._fluorophore_names[fluo_id] = name # Restore settings self._min_trace_length = old_min_trace_length self._use_weighted_localizations = old_use_weighted # Reset current fluorophore to "all" self._current_fluorophore_id = 0 # Reset dynamic filters flag (dataset replacement clears all filters) self._has_dynamic_filters = False # Apply global filters self._apply_global_filters() # Flag derived data to be recomputed self._stats_to_be_recomputed = True self._weighted_localizations_to_be_recomputed = TrueReplace the current dataset with a new one.
This is useful when combining datasets - the processor's dataset can be replaced with the combined dataset without losing filter state or other settings.
Parameters
new_dataset:MinFluxDataset- The new dataset to use.
def reset(self)-
Expand source code
def reset(self): """Drops all dynamic filters and resets the data to the processed data frame with global filters.""" # Clear the selection per fluorophore; they will be reinitialized as # all selected at the first access. self._init_selected_rows_dict() # Reset the mapping to the corresponding fluorophore self.processed_dataframe["fluo"] = 1 # Reset fluorophore names (clear custom names first) self._fluorophore_names = {} self._init_fluorophore_names() # Default fluorophore is 0 (no selection) self.current_fluorophore_id = 0 # Reset dynamic filters flag self._has_dynamic_filters = False # Apply global filters self._apply_global_filters()Drops all dynamic filters and resets the data to the processed data frame with global filters.
def select_by_1d_range(self, x_prop, x_range, from_weighted_locs: bool = False) ‑> pandas.core.frame.DataFrame | None-
Expand source code
def select_by_1d_range(
    self, x_prop, x_range, from_weighted_locs: bool = False
) -> Union[None, pd.DataFrame]:
    """Return a view on a subset of the filtered dataset or the weighted
    localisations defined by the passed parameter and corresponding range.

    The underlying dataframe is not modified.

    Parameters
    ----------

    x_prop: str
        Property to be filtered by corresponding x_range.

    x_range: tuple
        Tuple containing the minimum and maximum values for the selected property.

    from_weighted_locs: bool
        If True, select from the weighted_localizations dataframe; otherwise,
        from the filtered_dataframe.

    Returns
    -------

    subset: Union[None, pd.DataFrame]
        A view on a subset of the dataframe defined by the passed property and
        range, or None if no file was loaded.
    """

    # Order the bounds so that lo <= hi
    lo, hi = x_range[0], x_range[1]
    if hi < lo:
        lo, hi = hi, lo

    # Pick the source dataframe; only the selected one is accessed
    source = (
        self._weighted_localizations if from_weighted_locs else self.filtered_dataframe
    )
    if source is None:
        return None

    # Half-open selection [lo, hi)
    return source.loc[(source[x_prop] >= lo) & (source[x_prop] < hi)]
The underlying dataframe is not modified.
Parameters
x_prop:str- Property to be filtered by corresponding x_range.
x_range:tuple- Tuple containing the minimum and maximum values for the selected property.
from_weighted_locs:bool- If True, select from the weighted_localizations dataframe; otherwise, from the filtered_dataframe.
Returns
subset:Union[None, pd.DataFrame]- A view on a subset of the dataframe defined by the passed property and range, or None if no file was loaded.
def select_by_2d_range(self, x_prop, y_prop, x_range, y_range, from_weighted_locs: bool = False) ‑> pandas.core.frame.DataFrame | None-
Expand source code
def select_by_2d_range(
    self, x_prop, y_prop, x_range, y_range, from_weighted_locs: bool = False
) -> Union[None, pd.DataFrame]:
    """Return a view on a subset of the filtered dataset or the weighted
    localisations defined by the passed parameters and corresponding ranges.

    The underlying dataframe is not modified.

    Parameters
    ----------

    x_prop: str
        First property to be filtered by corresponding x_range.

    y_prop: str
        Second property to be filtered by corresponding y_range.

    x_range: tuple
        Tuple containing the minimum and maximum values for the first property.

    y_range: tuple
        Tuple containing the minimum and maximum values for the second property.

    from_weighted_locs: bool
        If True, select from the weighted_localizations dataframe; otherwise,
        from the filtered_dataframe.

    Returns
    -------

    subset: Union[None, pd.DataFrame]
        A view on a subset of the dataframe defined by the passed properties
        and ranges, or None if no file was loaded.
    """

    # Order both pairs of bounds so that lo <= hi
    x_lo, x_hi = x_range[0], x_range[1]
    if x_hi < x_lo:
        x_lo, x_hi = x_hi, x_lo
    y_lo, y_hi = y_range[0], y_range[1]
    if y_hi < y_lo:
        y_lo, y_hi = y_hi, y_lo

    # Pick the source dataframe; only the selected one is accessed
    source = (
        self._weighted_localizations if from_weighted_locs else self.filtered_dataframe
    )
    if source is None:
        return None

    # Rectangular half-open ROI [x_lo, x_hi) x [y_lo, y_hi)
    return source.loc[
        (source[x_prop] >= x_lo)
        & (source[x_prop] < x_hi)
        & (source[y_prop] >= y_lo)
        & (source[y_prop] < y_hi)
    ]
The underlying dataframe is not modified.
Parameters
x_prop:str- First property to be filtered by corresponding x_range.
y_prop:str- Second property to be filtered by corresponding y_range.
x_range:tuple- Tuple containing the minimum and maximum values for the first property.
y_range:tuple- Tuple containing the minimum and maximum values for the second property.
from_weighted_locs:bool- If True, select from the weighted_localizations dataframe; otherwise, from the filtered_dataframe.
Returns
subset:Union[None, pd.DataFrame]- A view on a subset of the dataframe defined by the passed properties and ranges, or None if no file was loaded.
def select_by_rows(self, indices: numpy.ndarray, from_weighted_locs: bool = False) ‑> pandas.core.frame.DataFrame | None-
Expand source code
def select_by_rows(
    self, indices: np.ndarray, from_weighted_locs: bool = False
) -> Union[None, pd.DataFrame]:
    """Return view on a subset of the filtered dataset or the weighted
    localisations defined by the passed DataFrame row indices.

    The underlying dataframe is not modified.

    Parameters
    ----------

    indices: np.ndarray
        Logical array for selecting the elements to be returned.

    from_weighted_locs: bool
        If True, select from the weighted_localizations dataframe; otherwise,
        from the filtered_dataframe.

    Returns
    -------

    subset: Union[None, pd.DataFrame]
        A view on a subset of the dataframe defined by the passed indices, or
        None if no file was loaded.
    """
    # Pick the source dataframe; only the selected one is accessed
    source = (
        self._weighted_localizations if from_weighted_locs else self.filtered_dataframe
    )
    if source is None:
        return None

    # Positional selection
    return source.iloc[indices]
The underlying dataframe is not modified.
Parameters
indices:np.ndarray- Logical array for selecting the elements to be returned.
from_weighted_locs:bool- If True, select from the weighted_localizations dataframe; otherwise, from the filtered_dataframe.
Returns
subset:Union[None, pd.DataFrame]- A view on a subset of the dataframe defined by the passed indices, or None if no file was loaded.
def select_by_series_iloc(self, iloc: numpy.ndarray, from_weighted_locs: bool = False) ‑> pandas.core.frame.DataFrame | None-
Expand source code
def select_by_series_iloc(
    self, iloc: np.ndarray, from_weighted_locs: bool = False
) -> Union[None, pd.DataFrame]:
    """Return view on a subset of the filtered dataset or the weighted
    localisations defined by the passed DataFrame index locations.

    The underlying dataframe is not modified.

    Parameters
    ----------

    iloc: np.ndarray
        Array of Series index locations for selecting rows.

    from_weighted_locs: bool
        If True, select from the weighted_localizations dataframe; otherwise,
        from the filtered_dataframe.

    Returns
    -------

    subset: Union[None, pd.DataFrame]
        A view on a subset of the dataframe defined by the passed indices, or
        None if no file was loaded.
    """
    # Pick the source dataframe; only the selected one is accessed
    source = (
        self._weighted_localizations if from_weighted_locs else self.filtered_dataframe
    )
    if source is None:
        return None

    # NOTE(review): despite the parameter name, selection is label-based
    # (`.loc`), i.e. the passed values are DataFrame index labels.
    return source.loc[iloc]
The underlying dataframe is not modified.
Parameters
iloc:np.ndarray- Array of Series index locations for selecting rows.
from_weighted_locs:bool- If True, select from the weighted_localizations dataframe; otherwise, from the filtered_dataframe.
Returns
subset:Union[None, pd.DataFrame]- A view on a subset of the dataframe defined by the passed indices, or None if no file was loaded.
def set_fluorophore_ids(self,
fluorophore_ids: numpy.ndarray[tuple[typing.Any, ...], numpy.dtype[numpy.uint8]])-
Expand source code
def set_fluorophore_ids(self, fluorophore_ids: NDArray[np.uint8]):
    """Assign the fluorophore IDs to current filtered dataset.

    This method assigns new fluorophore IDs only to the rows in the current
    filtered dataset. All other rows remain unchanged.

    Parameters
    ----------

    fluorophore_ids: NDArray[np.uint8]
        One fluorophore ID per row of the current filtered dataframe;
        assumed to be in the same order as the filtered rows — TODO confirm
        with callers.

    Raises
    ------

    ValueError
        If the number of IDs does not match the number of filtered rows.
    """
    # Nothing to do if no data is currently loaded/filtered
    if self.filtered_dataframe is None:
        return
    if len(fluorophore_ids) != len(self.filtered_dataframe.index):
        raise ValueError(
            "The number of fluorophore IDs does not match the number of entries in the dataframe."
        )

    # Get the actual indices from the filtered dataframe
    filtered_indices = self.filtered_dataframe.index

    # Assign the new fluorophore IDs to those specific indices only
    # All other rows remain unchanged (preserving other fluorophores)
    self.processed_dataframe.loc[filtered_indices, "fluo"] = fluorophore_ids.astype(np.uint8)

    # Apply global filters (selections and names are rebuilt first because
    # the fluorophore assignment changed)
    self._init_selected_rows_dict()
    self._init_fluorophore_names()
    self._apply_global_filters()
This method assigns new fluorophore IDs only to the rows in the current filtered dataset. All other rows remain unchanged.
def set_fluorophore_name(self, fluo_id: int, name: str)-
Expand source code
def set_fluorophore_name(self, fluo_id: int, name: str):
    """Set the name for a specific fluorophore ID.

    Parameters
    ----------

    fluo_id: int
        The fluorophore ID (must be >= 1)

    name: str
        The name to assign to this fluorophore
    """
    # Validate the ID first (ID 0 is reserved for the "all" selector),
    # then the name.
    if fluo_id < 1:
        raise ValueError(f"Fluorophore ID must be >= 1, got {fluo_id}")
    if not isinstance(name, str):
        raise ValueError(f"Fluorophore name must be a string, got {type(name)}")

    # Register (or overwrite) the display name for this fluorophore
    self._fluorophore_names[fluo_id] = name
Parameters
fluo_id:int- The fluorophore ID (must be >= 1)
name:str- The name to assign to this fluorophore
def set_fluorophore_names(self, names: dict)-
Expand source code
def set_fluorophore_names(self, names: dict):
    """Set fluorophore names from a dictionary mapping.

    This method updates the fluorophore names dictionary, preserving any
    existing names for fluorophore IDs not included in the names parameter.

    Parameters
    ----------

    names: dict
        Dictionary mapping fluo_id (int) to name (str)
    """
    if not isinstance(names, dict):
        raise ValueError(f"Names must be a dictionary, got {type(names)}")

    # Validate every entry before touching the internal mapping, so a bad
    # entry does not leave a partial update behind.
    for key, value in names.items():
        if not isinstance(key, int) or key < 1:
            raise ValueError(f"Fluorophore ID must be an integer >= 1, got {key}")
        if not isinstance(value, str):
            raise ValueError(f"Fluorophore name must be a string, got {type(value)}")

    # Merge, preserving names for IDs not mentioned in `names`
    self._fluorophore_names.update(names)
This method updates the fluorophore names dictionary, preserving any existing names for fluorophore IDs not included in the names parameter.
Parameters
names:dict- Dictionary mapping fluo_id (int) to name (str)
def set_full_fluorophore_ids(self,
fluorophore_ids: numpy.ndarray[tuple[typing.Any, ...], numpy.dtype[numpy.uint8]])-
Expand source code
def set_full_fluorophore_ids(self, fluorophore_ids: NDArray[np.uint8]):
    """Assign the fluorophore IDs to the original, full dataframe ignoring current filters.

    Parameters
    ----------

    fluorophore_ids: NDArray[np.uint8]
        One fluorophore ID per row of `processed_dataframe`, in row order.

    Raises
    ------

    ValueError
        If the number of IDs does not match the number of dataframe rows.
    """
    # Nothing to do if no data has been processed yet
    if self.processed_dataframe is None:
        return
    if len(fluorophore_ids) != len(self.processed_dataframe.index):
        raise ValueError(
            "The number of fluorophore IDs does not match the number of entries in the dataframe."
        )
    self.processed_dataframe["fluo"] = fluorophore_ids.astype(np.uint8)

    # Apply global filters (selections and names are rebuilt first because
    # the fluorophore assignment changed globally)
    self._init_selected_rows_dict()
    self._init_fluorophore_names()
    self._apply_global_filters()