Module pyminflux.processor

Processor of MINFLUX data.

Functions

def combine_datasets_with_bead_alignment(reference_dataset: pyminflux.processor._dataset.MinFluxDataset,
moving_dataset: pyminflux.processor._dataset.MinFluxDataset,
bead_correspondence: Dict[str, str] | None = None,
transform_type: str = 'euclidean',
n_points: int = 3,
next_fluo_id: int | None = None) -> pyminflux.processor._dataset.MinFluxDataset | None
def combine_datasets_with_bead_alignment(
    reference_dataset: MinFluxDataset,
    moving_dataset: MinFluxDataset,
    bead_correspondence: Optional[Dict[str, str]] = None,
    transform_type: str = 'euclidean',
    n_points: int = 3,
    next_fluo_id: Optional[int] = None,
) -> Optional[MinFluxDataset]:
    """
    Perform bead-based alignment and combine two datasets.
    
    The MBM (bead) data is extracted from the datasets themselves.
    
    Args:
        reference_dataset: Reference dataset (must contain mbm_data)
        moving_dataset: Moving dataset (must contain mbm_data)
        bead_correspondence: Mapping of reference bead names to moving bead names
        transform_type: Type of transformation ('euclidean', 'affine', etc.)
        n_points: Number of earliest points to use for bead position averaging
        next_fluo_id: Fluorophore ID to assign (auto-assigned if None)
        
    Returns:
        Combined dataset, or None if alignment fails
    """
    # Extract MBM data from datasets
    if reference_dataset.mbm_data is None:
        print("Error: Reference dataset does not contain MBM (bead) data")
        return None
    
    if moving_dataset.mbm_data is None:
        print("Error: Moving dataset does not contain MBM (bead) data")
        return None
    
    ref_mbm_dict = reference_dataset.mbm_data.get('mbm', {})
    mov_mbm_dict = moving_dataset.mbm_data.get('mbm', {})
    
    if not ref_mbm_dict:
        print("Error: Reference dataset MBM dictionary is empty")
        return None
    
    if not mov_mbm_dict:
        print("Error: Moving dataset MBM dictionary is empty")
        return None
    
    # Perform alignment
    transform_model = align_datasets_using_beads(
        ref_mbm_dict,
        mov_mbm_dict,
        bead_correspondence=bead_correspondence,
        transform_type=transform_type,
        n_points=n_points,
    )
    
    if transform_model is None:
        return None
    
    # Combine the datasets
    return combine_datasets_with_alignment(
        reference_dataset,
        moving_dataset,
        transform_model,
        next_fluo_id=next_fluo_id,
    )

Perform bead-based alignment and combine two datasets.

The MBM (bead) data is extracted from the datasets themselves.

Args

reference_dataset
Reference dataset (must contain mbm_data)
moving_dataset
Moving dataset (must contain mbm_data)
bead_correspondence
Mapping of reference bead names to moving bead names
transform_type
Type of transformation ('euclidean', 'affine', etc.)
n_points
Number of earliest points to use for bead position averaging
next_fluo_id
Fluorophore ID to assign (auto-assigned if None)

Returns

Combined dataset, or None if alignment fails
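
Example

A minimal usage sketch. The Zarr file names are hypothetical, and the import path for the reader assumes it is re-exported as pyminflux.reader.MinFluxReaderV2:

from pyminflux.processor import MinFluxDataset, combine_datasets_with_bead_alignment
from pyminflux.reader import MinFluxReaderV2

ref_ds = MinFluxDataset.from_reader(MinFluxReaderV2("reference.zarr"))
mov_ds = MinFluxDataset.from_reader(MinFluxReaderV2("moving.zarr"))

combined = combine_datasets_with_bead_alignment(
    ref_ds,
    mov_ds,
    transform_type="euclidean",  # or e.g. "affine", per the docstring
    n_points=3,                  # average the 3 earliest points per bead
)
if combined is None:
    raise RuntimeError("Alignment failed or MBM (bead) data was missing")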

def get_bead_positions_from_mbm(mbm_dict: Dict, n_points: int = 3) -> Dict[str, numpy.ndarray]
def get_bead_positions_from_mbm(mbm_dict: Dict, n_points: int = 3) -> Dict[str, np.ndarray]:
    """
    Extract average bead positions from MBM dictionary.
    
    Args:
        mbm_dict: MBM dictionary from MinFluxReaderV2
        n_points: Number of earliest points to average
        
    Returns:
        Dictionary mapping bead names to positions [z, y, x] in nm
    """
    df = mbm_dict_to_dataframe(mbm_dict)
    
    if df.empty:
        return {}
    
    # Filter to only include beads marked as "used"
    df = df[df['used']]
    
    if df.empty:
        return {}
    
    bead_positions = {}
    for bead_name in df['bead_name'].unique():
        bead_data = df[df['bead_name'] == bead_name]
        # Get earliest n_points
        earliest = bead_data.nsmallest(n_points, 'tim')
        # Calculate mean position as [z, y, x]
        pos = earliest[['z', 'y', 'x']].mean(axis=0).to_numpy()
        bead_positions[bead_name] = pos
    
    return bead_positions

Extract average bead positions from MBM dictionary.

Args

mbm_dict
MBM dictionary from MinFluxReaderV2
n_points
Number of earliest points to average

Returns

Dictionary mapping bead names to positions [z, y, x] in nm
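
Example

A short sketch; `mbm_dict` is assumed to come from load_zarr_for_beads() documented below:

from pyminflux.processor import get_bead_positions_from_mbm

positions = get_bead_positions_from_mbm(mbm_dict, n_points=5)
for name, pos in positions.items():
    z, y, x = pos  # positions are returned as [z, y, x] in nm
    print(f"{name}: x={x:.1f}, y={y:.1f}, z={z:.1f} nm")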

def get_next_fluorophore_id(df: pandas.core.frame.DataFrame) -> int
def get_next_fluorophore_id(df: pd.DataFrame) -> int:
    """
    Get the next available fluorophore ID.
    
    Args:
        df: DataFrame with 'fluo' column
        
    Returns:
        Next available fluorophore ID
    """
    if 'fluo' not in df.columns:
        return 1
    
    current_ids = df['fluo'].unique()
    if len(current_ids) == 0:
        return 1
    
    return int(np.max(current_ids)) + 1

Get the next available fluorophore ID.

Args

df
DataFrame with 'fluo' column

Returns

Next available fluorophore ID
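
Example

A self-contained illustration of the two branches in the source above:

import pandas as pd
from pyminflux.processor import get_next_fluorophore_id

df = pd.DataFrame({"fluo": [1, 1, 2, 2]})
assert get_next_fluorophore_id(df) == 3  # max existing ID + 1

df_no_fluo = pd.DataFrame({"tid": [7, 8]})
assert get_next_fluorophore_id(df_no_fluo) == 1  # no 'fluo' column: start at 1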

def load_zarr_for_beads(zarr_path: str) -> Tuple[pyminflux.reader._reader_v2.MinFluxReaderV2 | None, Dict | None]
def load_zarr_for_beads(zarr_path: str) -> Tuple[Optional[MinFluxReaderV2], Optional[Dict]]:
    """
    Load a Zarr file and extract MBM (bead) data.
    
    Args:
        zarr_path: Path to the Zarr file
        
    Returns:
        Tuple of (reader, mbm_dict) or (None, None) if loading fails
    """
    try:
        zarr_path = Path(zarr_path)
        if not zarr_path.exists():
            print(f"Error: Zarr path does not exist: {zarr_path}")
            return None, None
        
        # Load the reader
        reader = MinFluxReaderV2(zarr_path)
        
        # Check if MBM data is available
        if not hasattr(reader, 'mbm_data') or reader.mbm_data is None:
            print(f"Error: No MBM (bead) data found in {zarr_path}")
            return None, None
        
        mbm_dict = reader.mbm_data.get('mbm', {})
        
        if not mbm_dict:
            print(f"Error: MBM dictionary is empty in {zarr_path}")
            return None, None
        
        return reader, mbm_dict
        
    except Exception as e:
        print(f"Error loading Zarr file {zarr_path}: {e}")
        return None, None

Load a Zarr file and extract MBM (bead) data.

Args

zarr_path
Path to the Zarr file

Returns

Tuple of (reader, mbm_dict) or (None, None) if loading fails
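
Example

A sketch with a hypothetical path; both return values are None on any failure, so a single check suffices:

from pyminflux.processor import load_zarr_for_beads, get_bead_positions_from_mbm

reader, mbm_dict = load_zarr_for_beads("/data/session_001.zarr")
if reader is None:
    raise SystemExit("Could not load MBM (bead) data")
bead_positions = get_bead_positions_from_mbm(mbm_dict)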

Classes

class MinFluxDataset (processed_dataframe: pandas.core.frame.DataFrame,
full_raw_dataframe: pandas.core.frame.DataFrame | None = None,
valid_raw_data_array: numpy.ndarray | None = None,
filename: pathlib.Path | str | None = None,
is_3d: bool = False,
is_tracking: bool = False,
is_aggregated: bool = False,
z_scaling_factor: float = 1.0,
unit_scaling_factor: float = 1000000000.0,
dwell_time: float = 1.0,
pool_dcr: bool = False,
version: int = 2,
mbm_data: dict | None = None,
tid_offsets: List[Tuple[int, int]] | None = None)
class MinFluxDataset:
    """Container for MINFLUX data that can be processed by MinFluxProcessor.
    
    This class serves as a data container that holds the processed dataframe
    and associated metadata. It can be created from a MinFluxReader or by
    combining multiple datasets.
    """
    
    __slots__ = [
        "_processed_dataframe",
        "_full_raw_dataframe",
        "_valid_raw_data_array",
        "_filename",
        "_is_3d",
        "_is_tracking",
        "_is_aggregated",
        "_z_scaling_factor",
        "_unit_scaling_factor",
        "_dwell_time",
        "_pool_dcr",
        "_version",
        "_mbm_data",
        "_tid_offsets",
    ]
    
    def __init__(
        self,
        processed_dataframe: pd.DataFrame,
        full_raw_dataframe: Optional[pd.DataFrame] = None,
        valid_raw_data_array: Optional[np.ndarray] = None,
        filename: Optional[Union[Path, str]] = None,
        is_3d: bool = False,
        is_tracking: bool = False,
        is_aggregated: bool = False,
        z_scaling_factor: float = 1.0,
        unit_scaling_factor: float = 1e9,
        dwell_time: float = 1.0,
        pool_dcr: bool = False,
        version: int = 2,
        mbm_data: Optional[dict] = None,
        tid_offsets: Optional[List[Tuple[int, int]]] = None,
    ):
        """Constructor.
        
        Parameters
        ----------
        processed_dataframe: pd.DataFrame
            The processed dataframe with MINFLUX data.
        full_raw_dataframe: Optional[pd.DataFrame]
            The full raw dataframe (for version 2 readers), or None.
        valid_raw_data_array: Optional[np.ndarray]
            The valid raw data array (for version 1 readers), or None.
        filename: Optional[Union[Path, str]]
            Original filename, if applicable.
        is_3d: bool
            Whether the acquisition is 3D.
        is_tracking: bool
            Whether the dataset is from a tracking experiment.
        is_aggregated: bool
            Whether the dataset contains aggregated measurements.
        z_scaling_factor: float
            Refractive index mismatch correction factor for z coordinates.
        unit_scaling_factor: float
            Unit scaling factor to convert raw coordinates to nm.
        dwell_time: float
            Dwell time in milliseconds.
        pool_dcr: bool
            Whether to pool DCR values.
        version: int
            Reader version (1 or 2).
        mbm_data: Optional[dict]
            Beamline monitoring (bead) data, if available.
        tid_offsets: Optional[List[Tuple[int, int]]]
            List of (first_iid, tid_offset) pairs applied when combining datasets.
        """
        self._processed_dataframe = processed_dataframe
        self._full_raw_dataframe = full_raw_dataframe
        self._valid_raw_data_array = valid_raw_data_array
        self._filename = Path(filename) if filename else None
        self._is_3d = is_3d
        self._is_tracking = is_tracking
        self._is_aggregated = is_aggregated
        self._z_scaling_factor = z_scaling_factor
        self._unit_scaling_factor = unit_scaling_factor
        self._dwell_time = dwell_time
        self._pool_dcr = pool_dcr
        self._version = version
        self._mbm_data = mbm_data
        self._tid_offsets = list(tid_offsets) if tid_offsets else []
    
    @classmethod
    def from_reader(cls, reader):
        """Create a MinFluxDataset from a MinFluxReader or MinFluxReaderV2.
        
        Parameters
        ----------
        reader: Union[MinFluxReader, MinFluxReaderV2]
            The reader instance to extract data from.
            
        Returns
        -------
        dataset: MinFluxDataset
            A new dataset instance.
        """
        # Get the full raw dataframe if it exists (version 2)
        full_raw_dataframe = getattr(reader, "_full_raw_dataframe", None)
        mbm_data = getattr(reader, "_mbm_data", None)
        valid_raw_data_array = None
        if hasattr(reader, "valid_raw_data_array"):
            try:
                valid_raw_data_array = reader.valid_raw_data_array
            except Exception:
                valid_raw_data_array = None
        
        return cls(
            processed_dataframe=reader.processed_dataframe.copy(),
            full_raw_dataframe=full_raw_dataframe.copy() if full_raw_dataframe is not None else None,
            valid_raw_data_array=valid_raw_data_array.copy() if valid_raw_data_array is not None else None,
            filename=getattr(reader, "filename", None),
            is_3d=getattr(reader, "is_3d", False),
            is_tracking=getattr(reader, "is_tracking", False),
            is_aggregated=getattr(reader, "is_aggregated", False),
            z_scaling_factor=getattr(reader, "z_scaling_factor", 1.0),
            unit_scaling_factor=getattr(reader, "_unit_scaling_factor", 1e9),
            dwell_time=getattr(reader, "dwell_time", 1.0),
            pool_dcr=getattr(reader, "is_pool_dcr", False),
            version=getattr(reader, "version", 2),
            mbm_data=mbm_data,
            tid_offsets=getattr(reader, "tid_offsets", None),
        )
    
    @property
    def processed_dataframe(self) -> pd.DataFrame:
        """Return the processed dataframe."""
        return self._processed_dataframe
    
    @processed_dataframe.setter
    def processed_dataframe(self, value: pd.DataFrame):
        """Set the processed dataframe."""
        self._processed_dataframe = value
    
    @property
    def full_raw_dataframe(self) -> Optional[pd.DataFrame]:
        """Return the full raw dataframe (version 2 only)."""
        return self._full_raw_dataframe
    
    @full_raw_dataframe.setter
    def full_raw_dataframe(self, value: Optional[pd.DataFrame]):
        """Set the full raw dataframe."""
        self._full_raw_dataframe = value
    
    @property
    def filename(self) -> Optional[Path]:
        """Return the filename."""
        return self._filename
    
    @property
    def is_3d(self) -> bool:
        """Return True if the acquisition is 3D."""
        return self._is_3d
    
    @property
    def is_tracking(self) -> bool:
        """Return True if the dataset is from a tracking experiment."""
        return self._is_tracking
    
    @property
    def is_aggregated(self) -> bool:
        """Return True if the dataset contains aggregated measurements."""
        return self._is_aggregated
    
    @property
    def z_scaling_factor(self) -> float:
        """Return the z scaling factor."""
        return self._z_scaling_factor

    @property
    def unit_scaling_factor(self) -> float:
        """Return the unit scaling factor for raw coordinates."""
        return self._unit_scaling_factor
    
    @property
    def dwell_time(self) -> float:
        """Return the dwell time."""
        return self._dwell_time
    
    @property
    def is_pool_dcr(self) -> bool:
        """Return True if DCR values are pooled."""
        return self._pool_dcr
    
    @property
    def version(self) -> int:
        """Return the reader version."""
        return self._version
    
    @property
    def mbm_data(self) -> Optional[dict]:
        """Return the beamline monitoring data."""
        return self._mbm_data

    @property
    def tid_offsets(self) -> List[Tuple[int, int]]:
        """Return list of (first_iid, tid_offset) pairs applied when combining datasets."""
        return list(self._tid_offsets)
    
    @property
    def valid_full_raw_dataframe(self) -> Optional[pd.DataFrame]:
        """Return the valid raw dataframe (for compatibility with version 2 readers)."""
        if self._full_raw_dataframe is None:
            return None
        # Assume all entries in the dataset are valid
        return self._full_raw_dataframe.copy()
    
    @property
    def valid_raw_data_array(self):
        """Return the valid raw data array (for compatibility with version 1 readers).
        
        Note: Version 1 functionality is not fully supported in datasets.
        This property exists for backwards compatibility but will return None
        for datasets created from version 2 readers.
        """
        if self._valid_raw_data_array is None:
            return None
        return self._valid_raw_data_array.copy()
    
    def copy(self):
        """Create a deep copy of the dataset."""
        return MinFluxDataset(
            processed_dataframe=self._processed_dataframe.copy(),
            full_raw_dataframe=self._full_raw_dataframe.copy() if self._full_raw_dataframe is not None else None,
            valid_raw_data_array=self._valid_raw_data_array.copy() if self._valid_raw_data_array is not None else None,
            filename=self._filename,
            is_3d=self._is_3d,
            is_tracking=self._is_tracking,
            is_aggregated=self._is_aggregated,
            z_scaling_factor=self._z_scaling_factor,
            unit_scaling_factor=self._unit_scaling_factor,
            dwell_time=self._dwell_time,
            pool_dcr=self._pool_dcr,
            version=self._version,
            mbm_data=self._mbm_data,
            tid_offsets=self._tid_offsets.copy(),
        )

Container for MINFLUX data that can be processed by MinFluxProcessor.

This class serves as a data container that holds the processed dataframe and associated metadata. It can be created from a MinFluxReader or by combining multiple datasets.

Constructor.

Parameters

processed_dataframe : pd.DataFrame
The processed dataframe with MINFLUX data.
full_raw_dataframe : Optional[pd.DataFrame]
The full raw dataframe (for version 2 readers), or None.
valid_raw_data_array : Optional[np.ndarray]
The valid raw data array (for version 1 readers), or None.
filename : Optional[Union[Path, str]]
Original filename, if applicable.
is_3d : bool
Whether the acquisition is 3D.
is_tracking : bool
Whether the dataset is from a tracking experiment.
is_aggregated : bool
Whether the dataset contains aggregated measurements.
z_scaling_factor : float
Refractive index mismatch correction factor for z coordinates.
unit_scaling_factor : float
Unit scaling factor to convert raw coordinates to nm.
dwell_time : float
Dwell time in milliseconds.
pool_dcr : bool
Whether to pool DCR values.
version : int
Reader version (1 or 2).
mbm_data : Optional[dict]
Beamline monitoring (bead) data, if available.
tid_offsets : Optional[List[Tuple[int, int]]]
List of (first_iid, tid_offset) pairs applied when combining datasets.

Static methods

def from_reader(reader)

Create a MinFluxDataset from a MinFluxReader or MinFluxReaderV2.

Parameters

reader : Union[MinFluxReader, MinFluxReaderV2]
The reader instance to extract data from.

Returns

dataset : MinFluxDataset
A new dataset instance.
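
Example

A minimal sketch, assuming the reader is importable as pyminflux.reader.MinFluxReaderV2 and that "acquisition.zarr" (hypothetical) is a valid MINFLUX Zarr file:

from pyminflux.processor import MinFluxDataset
from pyminflux.reader import MinFluxReaderV2

reader = MinFluxReaderV2("acquisition.zarr")
dataset = MinFluxDataset.from_reader(reader)
print(dataset.is_3d, dataset.version, len(dataset.processed_dataframe.index))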

Instance variables

prop dwell_time : float
@property
def dwell_time(self) -> float:
    """Return the dwell time."""
    return self._dwell_time

Return the dwell time.

prop filename : pathlib.Path | None
@property
def filename(self) -> Optional[Path]:
    """Return the filename."""
    return self._filename

Return the filename.

prop full_raw_dataframe : pandas.core.frame.DataFrame | None
@property
def full_raw_dataframe(self) -> Optional[pd.DataFrame]:
    """Return the full raw dataframe (version 2 only)."""
    return self._full_raw_dataframe

Return the full raw dataframe (version 2 only).

prop is_3d : bool
@property
def is_3d(self) -> bool:
    """Return True if the acquisition is 3D."""
    return self._is_3d

Return True if the acquisition is 3D.

prop is_aggregated : bool
@property
def is_aggregated(self) -> bool:
    """Return True if the dataset contains aggregated measurements."""
    return self._is_aggregated

Return True if the dataset contains aggregated measurements.

prop is_pool_dcr : bool
@property
def is_pool_dcr(self) -> bool:
    """Return True if DCR values are pooled."""
    return self._pool_dcr

Return True if DCR values are pooled.

prop is_tracking : bool
@property
def is_tracking(self) -> bool:
    """Return True if the dataset is from a tracking experiment."""
    return self._is_tracking

Return True if the dataset is from a tracking experiment.

prop mbm_data : dict | None
@property
def mbm_data(self) -> Optional[dict]:
    """Return the beamline monitoring data."""
    return self._mbm_data

Return the beamline monitoring data.

prop processed_dataframe : pandas.core.frame.DataFrame
@property
def processed_dataframe(self) -> pd.DataFrame:
    """Return the processed dataframe."""
    return self._processed_dataframe

Return the processed dataframe.

prop tid_offsets : List[Tuple[int, int]]
@property
def tid_offsets(self) -> List[Tuple[int, int]]:
    """Return list of (first_iid, tid_offset) pairs applied when combining datasets."""
    return list(self._tid_offsets)

Return list of (first_iid, tid_offset) pairs applied when combining datasets.

prop unit_scaling_factor : float
@property
def unit_scaling_factor(self) -> float:
    """Return the unit scaling factor for raw coordinates."""
    return self._unit_scaling_factor

Return the unit scaling factor for raw coordinates.

prop valid_full_raw_dataframe : pandas.core.frame.DataFrame | None
@property
def valid_full_raw_dataframe(self) -> Optional[pd.DataFrame]:
    """Return the valid raw dataframe (for compatibility with version 2 readers)."""
    if self._full_raw_dataframe is None:
        return None
    # Assume all entries in the dataset are valid
    return self._full_raw_dataframe.copy()

Return the valid raw dataframe (for compatibility with version 2 readers).

prop valid_raw_data_array
@property
def valid_raw_data_array(self):
    """Return the valid raw data array (for compatibility with version 1 readers).
    
    Note: Version 1 functionality is not fully supported in datasets.
    This property exists for backwards compatibility but will return None
    for datasets created from version 2 readers.
    """
    if self._valid_raw_data_array is None:
        return None
    return self._valid_raw_data_array.copy()

Return the valid raw data array (for compatibility with version 1 readers).

Note: Version 1 functionality is not fully supported in datasets. This property exists for backwards compatibility but will return None for datasets created from version 2 readers.

prop version : int
@property
def version(self) -> int:
    """Return the reader version."""
    return self._version

Return the reader version.

prop z_scaling_factor : float
@property
def z_scaling_factor(self) -> float:
    """Return the z scaling factor."""
    return self._z_scaling_factor

Return the z scaling factor.

Methods

def copy(self)
def copy(self):
    """Create a deep copy of the dataset."""
    return MinFluxDataset(
        processed_dataframe=self._processed_dataframe.copy(),
        full_raw_dataframe=self._full_raw_dataframe.copy() if self._full_raw_dataframe is not None else None,
        valid_raw_data_array=self._valid_raw_data_array.copy() if self._valid_raw_data_array is not None else None,
        filename=self._filename,
        is_3d=self._is_3d,
        is_tracking=self._is_tracking,
        is_aggregated=self._is_aggregated,
        z_scaling_factor=self._z_scaling_factor,
        unit_scaling_factor=self._unit_scaling_factor,
        dwell_time=self._dwell_time,
        pool_dcr=self._pool_dcr,
        version=self._version,
        mbm_data=self._mbm_data,
        tid_offsets=self._tid_offsets.copy(),
    )

Create a deep copy of the dataset.
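
Example

A sketch reusing `dataset` from the from_reader() example above; the dataframes are deep-copied, so edits to the copy leave the original untouched:

ds_copy = dataset.copy()
ds_copy.processed_dataframe.loc[:, "fluo"] = 2  # modifies the copy only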

class MinFluxProcessor (source: pyminflux.reader._reader.MinFluxReader | pyminflux.reader._reader_v2.MinFluxReaderV2 | pyminflux.processor._dataset.MinFluxDataset,
min_trace_length: int = 1)
class MinFluxProcessor:
    """Processor of MINFLUX data."""

    __doc__ = """Allows for filtering and selecting data read by the underlying `MinFluxReader`. Please notice that
     `MinFluxProcessor` makes use of `State.min_trace_length` to make sure that at load and after every
      filtering step, short traces are dropped."""

    __slots__ = [
        "state",
        "dataset",
        "_current_fluorophore_id",
        "_filtered_stats_dataframe",
        "_fluorophore_names",
        "_min_trace_length",
        "_selected_rows_dict",
        "_stats_to_be_recomputed",
        "_weighted_localizations",
        "_weighted_localizations_to_be_recomputed",
        "_use_weighted_localizations",
        "_has_dynamic_filters",
    ]

    def __init__(
        self,
        source: Union[MinFluxReader, MinFluxReaderV2, MinFluxDataset],
        min_trace_length: int = 1,
    ):
        """Constructor.

        Parameters
        ----------

        source: Union[MinFluxReader, MinFluxReaderV2, MinFluxDataset]
            Either a MinFluxReader object or a MinFluxDataset.

        min_trace_length: int (Default = 1)
            Minimum number of localizations for a trace to be kept. Shorter traces are dropped.
        """

        # Convert reader-like sources to dataset if necessary (duck-typed)
        if isinstance(source, MinFluxDataset):
            self.dataset = source
        elif isinstance(source, (MinFluxReader, MinFluxReaderV2)) or hasattr(
            source, "processed_dataframe"
        ):
            self.dataset = MinFluxDataset.from_reader(source)
        else:
            raise TypeError(
                f"source must be MinFluxReader, MinFluxReaderV2, MinFluxDataset, or reader-like; got {type(source)}"
            )

        # Global options (to be applied after every operation)
        self._min_trace_length: int = min_trace_length

        # Cache the filtered stats dataframe
        self._filtered_stats_dataframe = None

        # Keep separate arrays of booleans to cache selection state for all
        # fluorophore IDs.
        self._selected_rows_dict = None
        self._init_selected_rows_dict()

        # Keep track of the selected fluorophore
        # 0 - All (default)
        # 1 - Fluorophore 1
        # 2 - Fluorophore 2
        self._current_fluorophore_id = 0

        # Cache the weighted, averaged TID positions
        self._weighted_localizations = None

        # Keep track whether the statistics and the weighted localizations need to be recomputed
        self._stats_to_be_recomputed = False
        self._weighted_localizations_to_be_recomputed = False

        # Whether to use weighted average for localizations
        self._use_weighted_localizations = False

        # Track whether dynamic (user-applied) filters have been applied
        self._has_dynamic_filters = False

        # Initialize fluorophore names mapping (fluo_id -> name)
        self._fluorophore_names = {}
        self._init_fluorophore_names()

        # Apply the global filters
        self._apply_global_filters()
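
    # Construction sketch (kept as comments so this listing stays valid Python;
    # the Zarr file name is hypothetical):
    #
    #     proc = MinFluxProcessor(MinFluxReaderV2("acquisition.zarr"), min_trace_length=4)
    #     proc.num_values  # number of localizations left after global filtering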

    def _init_selected_rows_dict(self):
        """Initialize the selected rows array."""
        if self.processed_dataframe is None:
            return

        # Create selection mask
        keep_mask = pd.Series(
            data=np.ones(len(self.processed_dataframe.index), dtype=bool),
            index=self.processed_dataframe.index,
        )

        # Store the mask for each fluorophore present in the data
        unique_fluos = np.unique(self.processed_dataframe["fluo"].to_numpy())
        unique_fluos = unique_fluos[unique_fluos > 0]
        self._selected_rows_dict = {}
        for fluo_id in unique_fluos:
            self._selected_rows_dict[int(fluo_id)] = keep_mask.copy()

    def has_filters_applied(self) -> bool:
        """Check if any dynamic (user-applied) filters have been applied to the data.
        
        This does not count global filters like min_trace_length which are always applied.
        
        Returns
        -------
        has_filters: bool
            True if any dynamic filters are active, False otherwise.
        """
        return self._has_dynamic_filters

    def _init_fluorophore_names(self):
        """Initialize fluorophore names with default values (string representation of fluo ID)."""
        if self.processed_dataframe is None:
            return
        
        unique_fluos = np.unique(self.processed_dataframe["fluo"].to_numpy())
        unique_fluos = unique_fluos[unique_fluos > 0]
        for fluo_id in unique_fluos:
            fluo_id_int = int(fluo_id)
            if fluo_id_int not in self._fluorophore_names:
                self._fluorophore_names[fluo_id_int] = str(fluo_id_int)

    @property
    def reader(self):
        """Return a reader-like interface to the dataset for backwards compatibility.
        
        This property provides access to the underlying dataset using the
        reader interface that existing code expects.
        """
        return self.dataset

    @property
    def is_tracking(self):
        """Minimum number of localizations for the trace to be kept."""
        return self.reader.is_tracking

    @property
    def min_trace_length(self):
        """Minimum number of localizations for the trace to be kept."""
        return self._min_trace_length

    @property
    def z_scaling_factor(self):
        """Returns the scaling factor for the z coordinates from the underlying MinFluxReader."""
        return self.reader.z_scaling_factor

    @min_trace_length.setter
    def min_trace_length(self, value):
        if value < 1 or int(value) != value:
            raise ValueError(
                "MinFluxProcessor.min_trace_length must be a positive integer!"
            )
        self._min_trace_length = value

        # Run the global filters
        self._apply_global_filters()

    @property
    def is_3d(self) -> bool:
        """Return True if the acquisition is 3D.

        Returns
        -------

        is_3d: bool
            True if the acquisition is 3D, False otherwise.
        """
        return self.reader.is_3d

    @property
    def num_values(self) -> int:
        """Return the number of values in the (filtered) dataframe.

        Returns
        -------

        n: int
            Number of values in the dataframe after all filters have been applied.
        """
        if self.filtered_dataframe is not None:
            return len(self.filtered_dataframe.index)

        return 0

    @property
    def current_fluorophore_id(self) -> int:
        """Return current fluorophore ID (0 for all)."""
        return self._current_fluorophore_id

    @current_fluorophore_id.setter
    def current_fluorophore_id(self, fluorophore_id: int) -> None:
        """Set current fluorophore ID (0 for all)."""

        # Validate fluorophore ID: 0 for all, or 1 to num_fluorophores
        if fluorophore_id < 0:
            raise ValueError(f"Fluorophore ID must be non-negative, got {fluorophore_id}.")
        
        if fluorophore_id > 0 and fluorophore_id > self.num_fluorophores:
            raise ValueError(
                f"Fluorophore ID {fluorophore_id} is out of range. "
                f"Valid range is 0 (all) or 1-{self.num_fluorophores}."
            )

        # Set the new fluorophore_id
        self._current_fluorophore_id = fluorophore_id

        # Apply the global filters
        self._apply_global_filters()

        # Flag stats to be recomputed
        self._stats_to_be_recomputed = True

    @property
    def num_fluorophores(self) -> int:
        """Return the number of fluorophores."""
        if self.processed_dataframe is None:
            return 0
        unique_fluos = np.unique(self.processed_dataframe["fluo"].to_numpy())
        return len(unique_fluos[unique_fluos > 0])

    @property
    def fluorophore_names(self) -> dict:
        """Return the fluorophore names mapping (fluo_id -> name)."""
        return self._fluorophore_names.copy()

    def set_fluorophore_name(self, fluo_id: int, name: str):
        """Set the name for a specific fluorophore ID.
        
        Parameters
        ----------
        fluo_id: int
            The fluorophore ID (must be >= 1)
        name: str
            The name to assign to this fluorophore
        """
        if fluo_id < 1:
            raise ValueError(f"Fluorophore ID must be >= 1, got {fluo_id}")
        if not isinstance(name, str):
            raise ValueError(f"Fluorophore name must be a string, got {type(name)}")
        self._fluorophore_names[fluo_id] = name

    def set_fluorophore_names(self, names: dict):
        """Set fluorophore names from a dictionary mapping.
        
        This method updates the fluorophore names dictionary, preserving
        any existing names for fluorophore IDs not included in the names parameter.
        
        Parameters
        ----------
        names: dict
            Dictionary mapping fluo_id (int) to name (str)
        """
        if not isinstance(names, dict):
            raise ValueError(f"Names must be a dictionary, got {type(names)}")
        for fluo_id, name in names.items():
            if not isinstance(fluo_id, int) or fluo_id < 1:
                raise ValueError(f"Fluorophore ID must be an integer >= 1, got {fluo_id}")
            if not isinstance(name, str):
                raise ValueError(f"Fluorophore name must be a string, got {type(name)}")
        # Update the dictionary instead of replacing it to preserve existing names
        self._fluorophore_names.update(names)

    def get_fluorophore_name(self, fluo_id: int) -> str:
        """Get the name for a specific fluorophore ID.
        
        Parameters
        ----------
        fluo_id: int
            The fluorophore ID
            
        Returns
        -------
        name: str
            The name of the fluorophore (defaults to string representation of ID if not set)
        """
        return self._fluorophore_names.get(fluo_id, str(fluo_id))
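
    # Naming sketch (as comments; the dye name is hypothetical):
    #
    #     proc.set_fluorophore_name(1, "Alexa 647")
    #     proc.get_fluorophore_name(1)  # -> "Alexa 647"
    #     proc.get_fluorophore_name(2)  # -> "2" (default: string of the ID)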

    def _filtered_raw_data_array_all_fluorophores(self):
        """Return the raw NumPy array with applied filters (for all fluorophores).

        This is only compatible with version 1 MinFluxReader (check performed by
        the invoked protected methods).
        """

        # Copy the raw NumPy array
        raw_array = self.reader.valid_raw_data_array
        if raw_array is None:
            return None

        if self.processed_dataframe is None or self._selected_rows_dict is None:
            return None

        # Append the fluorophore ID data
        raw_array["fluo"] = self.processed_dataframe["fluo"].astype(np.uint8)

        # Extract combination of all fluorophore filtered dataframes
        combined_mask = np.zeros(len(self.processed_dataframe), dtype=bool)
        for fluo_id, fluo_mask in self._selected_rows_dict.items():
            combined_mask |= (self.processed_dataframe["fluo"] == fluo_id) & fluo_mask
        
        return raw_array[combined_mask]

    @property
    def filtered_raw_data_array(self):
        """Return the raw NumPy array with applied filters for the selected fluorophores.

        This is only compatible with version 1 MinFluxReader (check performed by
        the invoked protected methods).
        """

        # Copy the raw NumPy array
        full_array = self._filtered_raw_data_array_all_fluorophores()
        if full_array is None:
            return None

        if self.current_fluorophore_id == 0:
            return full_array
        else:
            # Filter by the current fluorophore ID
            return full_array[full_array["fluo"] == self.current_fluorophore_id]

    @property
    def processed_dataframe(self) -> Union[None, pd.DataFrame]:
        """Return the full dataframe (with valid entries only), with no selections or filters.

        Returns
        -------

        processed_dataframe: Union[None, pd.DataFrame]
            A Pandas dataframe or None if no file was loaded.
        """
        return self.dataset.processed_dataframe

    @property
    def filename(self) -> Union[Path, None]:
        """Return the filename if set."""
        return self.dataset.filename

    def replace_dataset(self, new_dataset: MinFluxDataset):
        """Replace the current dataset with a new one.
        
        This is useful when combining datasets - the processor's dataset
        can be replaced with the combined dataset without losing filter state
        or other settings.
        
        Parameters
        ----------
        new_dataset: MinFluxDataset
            The new dataset to use.
        """
        # Store current settings
        old_min_trace_length = self._min_trace_length
        old_use_weighted = self._use_weighted_localizations
        old_fluorophore_names = self._fluorophore_names.copy()
        
        # Replace the dataset
        self.dataset = new_dataset
        
        # Re-initialize selection and filters
        self._selected_rows_dict = None
        self._init_selected_rows_dict()
        
        # Re-initialize fluorophore names (preserving custom names where IDs match)
        self._fluorophore_names = {}
        self._init_fluorophore_names()
        
        # Restore custom names for matching fluorophore IDs
        for fluo_id, name in old_fluorophore_names.items():
            if fluo_id in self._fluorophore_names:
                self._fluorophore_names[fluo_id] = name
        
        # Restore settings
        self._min_trace_length = old_min_trace_length
        self._use_weighted_localizations = old_use_weighted
        
        # Reset current fluorophore to "all"
        self._current_fluorophore_id = 0
        
        # Reset dynamic filters flag (dataset replacement clears all filters)
        self._has_dynamic_filters = False
        
        # Apply global filters
        self._apply_global_filters()
        
        # Flag derived data to be recomputed
        self._stats_to_be_recomputed = True
        self._weighted_localizations_to_be_recomputed = True
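
    # Combination sketch (as comments): swap in a dataset produced by
    # combine_datasets_with_bead_alignment() without losing processor settings.
    #
    #     combined = combine_datasets_with_bead_alignment(proc.dataset, other_dataset)
    #     if combined is not None:
    #         proc.replace_dataset(combined)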

    def _filtered_dataframe_all_fluorophores(self) -> Union[None, pd.DataFrame]:
        """Return joint dataframe for all fluorophores and with all filters applied.

        Returns
        -------

        filtered_dataframe_all: Union[None, pd.DataFrame]
            A Pandas dataframe or None if no file was loaded.
        """
        if self.processed_dataframe is None or self._selected_rows_dict is None:
            return None

        # Extract combination of all fluorophore filtered dataframes
        combined_mask = np.zeros(len(self.processed_dataframe), dtype=bool)
        for fluo_id, fluo_mask in self._selected_rows_dict.items():
            combined_mask |= (self.processed_dataframe["fluo"] == fluo_id) & fluo_mask
        
        return self.processed_dataframe.loc[combined_mask]

    def _filtered_raw_dataframe_all_fluorophores(self) -> Union[None, pd.DataFrame]:
        """Return joint raw dataframe (all properties) for all fluorophores and with all filters applied.

        Returns
        -------

        filtered_dataframe_all: Union[None, pd.DataFrame]
            A Pandas dataframe or None if no file was loaded.
        """
        # This is only compatible with MinFluxReader version 2
        if self.reader.version != 2:
            raise ValueError("Only reader version 2 is supported.")

        # valid_full_raw_dataframe is a MinFluxReaderV2 property
        raw_array = self.reader.valid_full_raw_dataframe

        # Now extract the fluorophore assignments from self.processed_dataframe and
        # expand them onto the raw array
        if "fluo" in raw_array.columns:
            raw_fluo = raw_array["fluo"].fillna(0).astype(np.uint8).to_numpy(copy=True)
        else:
            raw_fluo = np.zeros(len(raw_array), dtype=np.uint8)

        if "iid" in raw_array.columns and "iid" in self.processed_dataframe.columns:
            fluo_map = dict(
                zip(
                    self.processed_dataframe["iid"],
                    self.processed_dataframe["fluo"].astype(np.uint8),
                )
            )
            mapped_fluo = raw_array["iid"].map(fluo_map)
            matched_mask = mapped_fluo.notna().to_numpy()
            if matched_mask.any():
                raw_fluo[matched_mask] = mapped_fluo.loc[matched_mask].astype(np.uint8).to_numpy()

        raw_array["fluo"] = raw_fluo.astype(np.uint8)

        # Return the array with the assigned fluorophores
        return raw_array

    @property
    def filtered_dataframe(self) -> Union[None, pd.DataFrame]:
        """Return dataframe with all filters applied.

        Returns
        -------

        filtered_dataframe: Union[None, pd.DataFrame]
            A Pandas dataframe or None if no file was loaded.
        """
        if self.processed_dataframe is None:
            return None
        if self.current_fluorophore_id == 0:
            return self._filtered_dataframe_all_fluorophores()
        else:
            # Use .loc to filter the dataframe in a single step
            filtered_df = self.processed_dataframe.loc[
                self.processed_dataframe["fluo"] == self.current_fluorophore_id
            ]
            if self._selected_rows_dict is None:
                return None
            selected_indices = self._selected_rows_dict.get(
                self.current_fluorophore_id, []
            )
            return filtered_df.loc[selected_indices]

    @property
    def filtered_raw_dataframe(self) -> Union[None, pd.DataFrame]:
        """Return joint dataframe with all filters applied.

        Returns
        -------

        filtered_dataframe: Union[None, pd.DataFrame]
            A Pandas dataframe or None if no file was loaded.
        """
        # This is only compatible with MinFluxReader version 2
        if self.reader.version != 2:
            raise ValueError("Only reader version 2 is supported.")

        # Copy the raw dataframe array
        full_dataframe = self._filtered_raw_dataframe_all_fluorophores()
        if full_dataframe is None:
            return None

        # Get the IIDs from the currently filtered localizations
        iids = self.filtered_dataframe["iid"].to_numpy()

        # Extract all rows matching the current set of iids
        full_dataframe = full_dataframe[full_dataframe["iid"].isin(iids)]

        # If needed, filter by fluorophore ID
        if self.current_fluorophore_id == 0:
            return full_dataframe
        else:
            # Filter by the current fluorophore ID
            return full_dataframe[full_dataframe["fluo"] == self.current_fluorophore_id]

    @property
    def filtered_dataframe_stats(self) -> Union[None, pd.DataFrame]:
        """Return dataframe stats with all filters applied.

        Returns
        -------

        filtered_dataframe_stats: Union[None, pd.DataFrame]
            A Pandas dataframe with all data statistics or None if no file was loaded.
        """
        if self._stats_to_be_recomputed:
            self._calculate_statistics()
        return self._filtered_stats_dataframe

    @property
    def weighted_localizations(self) -> Union[None, pd.DataFrame]:
        """Return the average (x, y, z) position per TID weighted by the relative photon count."""
        if self._weighted_localizations_to_be_recomputed:
            self._calculate_weighted_positions()
        return self._weighted_localizations

    @property
    def use_weighted_localizations(self) -> bool:
        """Whether to use weighted average to calculate the mean localization per TID."""
        return self._use_weighted_localizations

    @use_weighted_localizations.setter
    def use_weighted_localizations(self, value: bool):
        """Whether to use weighted average to calculate the mean localization per TID."""
        self._use_weighted_localizations = value
        self._weighted_localizations_to_be_recomputed = True

    @classmethod
    def processed_properties(cls):
        """Return the processed dataframe columns."""
        return MinFluxReader.processed_properties()

    @classmethod
    def trace_stats_properties(cls):
        """Return the columns of the filtered_dataframe_stats."""
        return [
            "tid",
            "n",
            "fluo",
            "mx",
            "my",
            "mz",
            "sx",
            "sy",
            "sxy",
            "exy",
            "rms_xy",
            "sz",
            "ez",
            "mtim",
            "tim_tot",
        ]

    @classmethod
    def trace_stats_with_tracking_properties(cls):
        """Return the columns of the filtered_dataframe_stats with tracking columns."""
        return MinFluxProcessor.trace_stats_properties() + [
            "avg_speed",
            "total_dist",
        ]

    def reset(self):
        """Drops all dynamic filters and resets the data to the processed data frame with global filters."""

        # Clear the selection per fluorophore; they will be reinitialized as
        # all selected at the first access.
        self._init_selected_rows_dict()

        # Reset the mapping to the corresponding fluorophore
        self.processed_dataframe["fluo"] = 1

        # Reset fluorophore names (clear custom names first)
        self._fluorophore_names = {}
        self._init_fluorophore_names()

        # Default fluorophore is 0 (no selection)
        self.current_fluorophore_id = 0

        # Reset dynamic filters flag
        self._has_dynamic_filters = False

        # Apply global filters
        self._apply_global_filters()

    def set_fluorophore_ids(self, fluorophore_ids: NDArray[np.uint8]):
        """Assign the fluorophore IDs to current filtered dataset.
        
        This method assigns new fluorophore IDs only to the rows in the current
        filtered dataset. All other rows remain unchanged.
        """
        if self.filtered_dataframe is None:
            return
        if len(fluorophore_ids) != len(self.filtered_dataframe.index):
            raise ValueError(
                "The number of fluorophore IDs does not match the number of entries in the dataframe."
            )

        # Get the actual indices from the filtered dataframe
        filtered_indices = self.filtered_dataframe.index
        
        # Assign the new fluorophore IDs to those specific indices only
        # All other rows remain unchanged (preserving other fluorophores)
        self.processed_dataframe.loc[filtered_indices, "fluo"] = fluorophore_ids.astype(np.uint8)

        # Apply global filters
        self._init_selected_rows_dict()
        self._init_fluorophore_names()
        self._apply_global_filters()

    def set_full_fluorophore_ids(self, fluorophore_ids: NDArray[np.uint8]):
        """Assign the fluorophore IDs to the original, full dataframe ignoring current filters."""
        if self.processed_dataframe is None:
            return
        if len(fluorophore_ids) != len(self.processed_dataframe.index):
            raise ValueError(
                "The number of fluorophore IDs does not match the number of entries in the dataframe."
            )
        self.processed_dataframe["fluo"] = fluorophore_ids.astype(np.uint8)

        # Apply global filters
        self._init_selected_rows_dict()
        self._init_fluorophore_names()
        self._apply_global_filters()

    def select_by_rows(
        self, indices: np.ndarray, from_weighted_locs: bool = False
    ) -> Union[None, pd.DataFrame]:
        """Return view on a subset of the filtered dataset or the weighted localisations defined by the passed
        DataFrame row indices.

        The underlying dataframe is not modified.

        Parameters
        ----------

        indices: np.ndarray
            Boolean mask or positional index array selecting the rows to be returned (applied with `.iloc`).

        from_weighted_locs: bool
            If True, select from the weighted_localizations dataframe; otherwise, from the filtered_dataframe.

        Returns
        -------

        subset: Union[None, pd.DataFrame]
            A view on a subset of the dataframe defined by the passed indices, or None if no file was loaded.
        """
        if from_weighted_locs:
            if self._weighted_localizations is None:
                return None
            return self._weighted_localizations.iloc[indices]
        else:
            if self.filtered_dataframe is None:
                return None
            return self.filtered_dataframe.iloc[indices]

    def select_by_series_iloc(
        self, iloc: np.ndarray, from_weighted_locs: bool = False
    ) -> Union[None, pd.DataFrame]:
        """Return view on a subset of the filtered dataset or the weighted localisations defined by the passed
        DataFrame index locations.

        The underlying dataframe is not modified.

        Parameters
        ----------

        iloc: np.ndarray
            Array of DataFrame index labels used to select rows via `.loc`.

        from_weighted_locs: bool
            If True, select from the weighted_localizations dataframe; otherwise, from the filtered_dataframe.

        Returns
        -------

        subset: Union[None, pd.DataFrame]
            A view on a subset of the dataframe defined by the passed indices, or None if no file was loaded.
        """
        if from_weighted_locs:
            if self._weighted_localizations is None:
                return None
            return self._weighted_localizations.loc[iloc]
        else:
            if self.filtered_dataframe is None:
                return None
            return self.filtered_dataframe.loc[iloc]

    def select_by_1d_range(
        self, x_prop, x_range, from_weighted_locs: bool = False
    ) -> Union[None, pd.DataFrame]:
        """Return a view on a subset of the filtered dataset or the weighted localisations defined by the passed
        parameter and corresponding range.

        The underlying dataframe is not modified.

        Parameters
        ----------

        x_prop: str
            Property to be filtered by corresponding x_range.

        x_range: tuple
            Tuple containing the minimum and maximum values for the selected property.

        from_weighted_locs: bool
            If True, select from the weighted_localizations dataframe; otherwise, from the filtered_dataframe.

        Returns
        -------

        subset: Union[None, pd.DataFrame]
            A view on a subset of the dataframe defined by the passed property and range, or None if no file was loaded.
        """

        # Make sure that the range is increasing
        x_min = x_range[0]
        x_max = x_range[1]
        if x_max < x_min:
            x_max, x_min = x_min, x_max

        if from_weighted_locs:
            if self._weighted_localizations is None:
                return None
            return self._weighted_localizations.loc[
                (self._weighted_localizations[x_prop] >= x_min)
                & (self._weighted_localizations[x_prop] < x_max)
            ]
        else:
            # Work with currently selected rows
            if self.filtered_dataframe is None:
                return None
            df = self.filtered_dataframe
            return df.loc[(df[x_prop] >= x_min) & (df[x_prop] < x_max)]

    def select_by_2d_range(
        self, x_prop, y_prop, x_range, y_range, from_weighted_locs: bool = False
    ) -> Union[None, pd.DataFrame]:
        """Return a view on a subset of the filtered dataset or the weighted localisations defined by the passed
        parameters and corresponding ranges.

        The underlying dataframe is not modified.

        Parameters
        ----------

        x_prop: str
            First property to be filtered by corresponding x_range.

        y_prop: str
            Second property to be filtered by corresponding y_range.

        x_range: tuple
            Tuple containing the minimum and maximum values for the first property.

        y_range: tuple
            Tuple containing the minimum and maximum values for the second property.

        from_weighted_locs: bool
            If True, select from the weighted_localizations dataframe; otherwise, from the filtered_dataframe.

        Returns
        -------

        subset: Union[None, pd.DataFrame]
            A view on a subset of the dataframe defined by the passed properties and ranges, or None if no file
            was loaded.
        """

        # Make sure that the ranges are increasing
        x_min = x_range[0]
        x_max = x_range[1]
        if x_max < x_min:
            x_max, x_min = x_min, x_max

        y_min = y_range[0]
        y_max = y_range[1]
        if y_max < y_min:
            y_max, y_min = y_min, y_max

        if from_weighted_locs:
            if self._weighted_localizations is None:
                return None
            return self._weighted_localizations.loc[
                (self._weighted_localizations[x_prop] >= x_min)
                & (self._weighted_localizations[x_prop] < x_max)
                & (self._weighted_localizations[y_prop] >= y_min)
                & (self._weighted_localizations[y_prop] < y_max)
            ]
        else:
            # Work with currently selected rows
            if self.filtered_dataframe is None:
                return None
            df = self.filtered_dataframe
            return df.loc[
                (df[x_prop] >= x_min)
                & (df[x_prop] < x_max)
                & (df[y_prop] >= y_min)
                & (df[y_prop] < y_max)
            ]

    def filter_by_2d_range(self, x_prop, y_prop, x_range, y_range):
        """Filter dataset by the extracting a rectangular ROI over two parameters and two ranges.

        Parameters
        ----------

        x_prop: str
            First property to be filtered by corresponding x_range.

        y_prop: str
            Second property to be filtered by corresponding y_range.

        x_range: tuple
            Tuple containing the minimum and maximum values for the first property.

        y_range: tuple
            Tuple containing the minimum and maximum values for the second property.
        """

        # Make sure that the ranges are increasing
        x_min = x_range[0]
        x_max = x_range[1]
        if x_max < x_min:
            x_max, x_min = x_min, x_max

        y_min = y_range[0]
        y_max = y_range[1]
        if y_max < y_min:
            y_max, y_min = y_min, y_max

        if self.current_fluorophore_id == 0:
            # Apply to all fluorophores
            for fluo_id in self._selected_rows_dict.keys():
                self._selected_rows_dict[fluo_id] = (
                    self._selected_rows_dict[fluo_id]
                    & (self.filtered_dataframe[x_prop] >= x_min)
                    & (self.filtered_dataframe[x_prop] < x_max)
                    & (self.filtered_dataframe[y_prop] >= y_min)
                    & (self.filtered_dataframe[y_prop] < y_max)
                )
        elif self.current_fluorophore_id in self._selected_rows_dict:
            # Apply to specific fluorophore
            fluo_id = self.current_fluorophore_id
            self._selected_rows_dict[fluo_id] = (
                self._selected_rows_dict[fluo_id]
                & (self.filtered_dataframe[x_prop] >= x_min)
                & (self.filtered_dataframe[x_prop] < x_max)
                & (self.filtered_dataframe[y_prop] >= y_min)
                & (self.filtered_dataframe[y_prop] < y_max)
            )

        # Make sure to always apply the global filters
        self._apply_global_filters()

        # Mark that dynamic filters have been applied
        self._has_dynamic_filters = True

        # Make sure to flag the derived data to be recomputed
        self._stats_to_be_recomputed = True
        self._weighted_localizations_to_be_recomputed = True
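
    # ROI sketch (as comments; "x" and "y" are assumed to be coordinate columns
    # of the processed dataframe, consistent with the rest of this module):
    #
    #     proc.filter_by_2d_range("x", "y", (0.0, 500.0), (0.0, 500.0))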

    def _apply_global_filters(self):
        """Apply filters that are defined in the global application configuration."""

        if self.current_fluorophore_id == 0:
            # Apply to all fluorophores
            for fluo_id in self._selected_rows_dict.keys():
                self._selected_rows_dict[fluo_id] = self._filter_by_tid_length(fluo_id)
        elif self.current_fluorophore_id in self._selected_rows_dict:
            # Apply to specific fluorophore
            self._selected_rows_dict[self.current_fluorophore_id] = self._filter_by_tid_length(
                self.current_fluorophore_id
            )

        # Make sure to flag the derived data to be recomputed
        self._stats_to_be_recomputed = True
        self._weighted_localizations_to_be_recomputed = True

    def _filter_by_tid_length(self, index):
        # Make sure to count only currently selected rows
        df = self.processed_dataframe.copy()
        df.loc[np.invert(self._selected_rows_dict[index]), "tid"] = np.nan

        # Select all rows whose TID occurs at least self._min_trace_length times
        counts = df["tid"].value_counts(normalize=False)
        return df["tid"].isin(counts[counts >= self.min_trace_length].index)

    def filter_by_single_threshold(
        self, prop: str, threshold: Union[int, float], larger_than: bool = True
    ):
        """Apply single threshold to filter values either lower or higher (equal) than threshold for given property."""

        # Apply filter
        if self.current_fluorophore_id == 0:
            # Apply to all fluorophores
            for fluo_id in self._selected_rows_dict.keys():
                if larger_than:
                    self._selected_rows_dict[fluo_id] = self._selected_rows_dict[fluo_id] & (
                        self.filtered_dataframe[prop] >= threshold
                    )
                else:
                    self._selected_rows_dict[fluo_id] = self._selected_rows_dict[fluo_id] & (
                        self.filtered_dataframe[prop] < threshold
                    )
        elif self.current_fluorophore_id in self._selected_rows_dict:
            # Apply to specific fluorophore
            fluo_id = self.current_fluorophore_id
            if larger_than:
                self._selected_rows_dict[fluo_id] = self._selected_rows_dict[fluo_id] & (
                    self.filtered_dataframe[prop] >= threshold
                )
            else:
                self._selected_rows_dict[fluo_id] = self._selected_rows_dict[fluo_id] & (
                    self.filtered_dataframe[prop] < threshold
                )

        # Apply the global filters
        self._apply_global_filters()

        # Mark that dynamic filters have been applied
        self._has_dynamic_filters = True

        # Make sure to flag the derived data to be recomputed
        self._stats_to_be_recomputed = True
        self._weighted_localizations_to_be_recomputed = True

    def filter_by_1d_range(self, x_prop: str, x_range: tuple):
        """Apply min and max thresholding to the given property.

        Parameters
        ----------

        x_prop: str
            Name of the property (dataframe column) to filter.

        x_range: tuple
            Tuple containing the minimum and maximum values for the selected property.
        """

        # Make sure that the ranges are increasing
        x_min = x_range[0]
        x_max = x_range[1]
        if x_max < x_min:
            x_max, x_min = x_min, x_max

        # Apply filter
        if self.current_fluorophore_id == 0:
            # Apply to all fluorophores
            for fluo_id in self._selected_rows_dict.keys():
                self._selected_rows_dict[fluo_id] = (
                    self._selected_rows_dict[fluo_id]
                    & (self.filtered_dataframe[x_prop] >= x_min)
                    & (self.filtered_dataframe[x_prop] < x_max)
                )
        elif self.current_fluorophore_id in self._selected_rows_dict:
            # Apply to specific fluorophore
            fluo_id = self.current_fluorophore_id
            self._selected_rows_dict[fluo_id] = (
                self._selected_rows_dict[fluo_id]
                & (self.filtered_dataframe[x_prop] >= x_min)
                & (self.filtered_dataframe[x_prop] < x_max)
            )

        # Apply the global filters
        self._apply_global_filters()

        # Mark that dynamic filters have been applied
        self._has_dynamic_filters = True

        # Make sure to flag the derived data to be recomputed
        self._stats_to_be_recomputed = True
        self._weighted_localizations_to_be_recomputed = True

    def filter_by_1d_range_complement(self, prop: str, x_range: tuple):
        """Apply min and max thresholding to the given property but keep the
        data outside the range (i.e., crop the selected range).

        Parameters
        ----------

        prop: str
            Name of the property (dataframe column) to filter.

        x_range: tuple
            Tuple containing the minimum and maximum values for cropping the selected property.
        """

        # Make sure that the ranges are increasing
        x_min = x_range[0]
        x_max = x_range[1]
        if x_max < x_min:
            x_max, x_min = x_min, x_max

        # Apply filter
        if self.current_fluorophore_id == 0:
            # Apply to all fluorophores
            for fluo_id in self._selected_rows_dict.keys():
                self._selected_rows_dict[fluo_id] = self._selected_rows_dict[fluo_id] & (
                    (self.filtered_dataframe[prop] < x_min)
                    | (self.filtered_dataframe[prop] >= x_max)
                )
        elif self.current_fluorophore_id in self._selected_rows_dict:
            # Apply to specific fluorophore
            fluo_id = self.current_fluorophore_id
            self._selected_rows_dict[fluo_id] = self._selected_rows_dict[fluo_id] & (
                (self.filtered_dataframe[prop] < x_min)
                | (self.filtered_dataframe[prop] >= x_max)
            )

        # Apply the global filters
        self._apply_global_filters()

        # Mark that dynamic filters have been applied
        self._has_dynamic_filters = True

        # Make sure to flag the derived data to be recomputed
        self._stats_to_be_recomputed = True
        self._weighted_localizations_to_be_recomputed = True

    def filter_by_1d_stats(self, x_prop_stats: str, x_range: tuple):
        """Filter TIDs by min and max thresholding using the given property from the stats dataframe.

        Parameters
        ----------

        x_prop_stats: str
            Name of the property (column) from the stats dataframe used to filter.

        x_range: tuple
            Tuple containing the minimum and maximum values for the selected property.
        """

        # Make sure the property exists in the stats dataframe
        if x_prop_stats not in self.filtered_dataframe_stats.columns:
            raise ValueError(
                f"The property {x_prop_stats} does not exist in `filtered_dataframe_stats`."
            )

        # Make sure that the ranges are increasing
        x_min = x_range[0]
        x_max = x_range[1]
        if x_max < x_min:
            x_max, x_min = x_min, x_max

        # Find all TIDs for the current fluorophore ID for which the requested stats property falls inside the range
        tids_to_keep = self.filtered_dataframe_stats[
            (
                (self.filtered_dataframe_stats[x_prop_stats] >= x_min)
                & (self.filtered_dataframe_stats[x_prop_stats] <= x_max)
            )
        ]["tid"].to_numpy()

        # Rows of the filtered dataframe to keep
        rows_to_keep = self.filtered_dataframe["tid"].isin(tids_to_keep)

        # Apply filter
        if self.current_fluorophore_id == 0:
            # Apply to all fluorophores
            for fluo_id in self._selected_rows_dict.keys():
                self._selected_rows_dict[fluo_id] = self._selected_rows_dict[fluo_id] & rows_to_keep
        elif self.current_fluorophore_id in self._selected_rows_dict:
            # Apply to specific fluorophore
            self._selected_rows_dict[self.current_fluorophore_id] = (
                self._selected_rows_dict[self.current_fluorophore_id] & rows_to_keep
            )

        # Apply the global filters
        self._apply_global_filters()

        # Mark that dynamic filters have been applied
        self._has_dynamic_filters = True

        # Make sure to flag the derived data to be recomputed
        self._stats_to_be_recomputed = True
        self._weighted_localizations_to_be_recomputed = True

    def _calculate_statistics(self):
        """Calculate per-trace statistics."""

        # Make sure we have processed dataframe to work on
        if self.processed_dataframe is None:
            return

        # Only recompute statistics if needed
        if not self._stats_to_be_recomputed:
            return

        # Work with currently selected rows
        df = self.filtered_dataframe

        # Calculate the statistics
        df_tid = self.calculate_statistics_on(df, self.reader.is_tracking)

        # Store the results
        self._filtered_stats_dataframe = df_tid

        # Flag the statistics as up to date
        self._stats_to_be_recomputed = False

    @staticmethod
    def calculate_statistics_on(
        df: pd.DataFrame, is_tracking: bool = False
    ) -> pd.DataFrame:
        """Calculate per-trace statistics for the selected dataframe.

        Parameters
        ----------

        df: pd.DataFrame
            DataFrame (view) generated by one of the `select_by_*` methods.

        is_tracking: bool
            Whether the data comes from a tracking instead of a localization experiment.

        Returns
        -------
        df_stats: pd.DataFrame
            Per-trace statistics calculated on the passed DataFrame selection (view).
        """

        # Prepare a dataframe with the statistics
        if is_tracking:
            df_tid = pd.DataFrame(
                columns=MinFluxProcessor.trace_stats_with_tracking_properties()
            )
        else:
            df_tid = pd.DataFrame(columns=MinFluxProcessor.trace_stats_properties())

        # Calculate some statistics per TID on the passed dataframe
        df_grouped = df.groupby("tid")

        # Base statistics
        tid = df_grouped["tid"].first().to_numpy()
        n = df_grouped["tid"].count().to_numpy()
        mx = df_grouped["x"].mean().to_numpy()
        my = df_grouped["y"].mean().to_numpy()
        mz = df_grouped["z"].mean().to_numpy()
        sx = df_grouped["x"].std().to_numpy()
        sy = df_grouped["y"].std().to_numpy()
        sz = df_grouped["z"].std().to_numpy()
        tmp = np.power(sx, 2) + np.power(sy, 2)
        sxy = np.sqrt(tmp)
        rms_xy = np.sqrt(tmp / 2)
        exy = sxy / np.sqrt(n)
        ez = sz / np.sqrt(n)
        fluo = df_grouped["fluo"].agg(lambda x: mode(x, keepdims=True)[0][0]).to_numpy()
        mtim = df_grouped["tim"].mean().to_numpy()
        tot_tim, _, _ = calculate_trace_time(df)

        # Optional tracking statistics
        if is_tracking:
            total_distance, _, _ = calculate_total_distance_traveled(df)
            speeds = (
                total_distance["displacement"].to_numpy()
                / tot_tim["tim_tot"].to_numpy()
            )

        # Store trace stats
        df_tid["tid"] = tid  # Trace ID
        df_tid["n"] = n  # Number of localizations for given trace ID
        df_tid["mx"] = mx  # x mean localization
        df_tid["my"] = my  # y mean localization
        df_tid["mz"] = mz  # z mean localization
        df_tid["sx"] = sx  # x localization precision
        df_tid["sy"] = sy  # y localization precision
        df_tid["sxy"] = sxy  # Lateral (x, y) localization precision
        df_tid["rms_xy"] = rms_xy  # Lateral root mean square
        df_tid["exy"] = exy  # Standard error of sxy
        df_tid["sz"] = sz  # z localization precision
        df_tid["ez"] = ez  # Standard error of ez
        df_tid["fluo"] = fluo  # Assigned fluorophore ID
        df_tid["mtim"] = mtim  # Average time per trace
        df_tid["tim_tot"] = tot_tim["tim_tot"].to_numpy()  # Total time per trace
        if is_tracking:
            df_tid["avg_speed"] = speeds  # Average speed per trace
            df_tid["total_dist"] = total_distance[
                "displacement"
            ].to_numpy()  # Total travelled distance per trace

        # ["sx", "sy", "sxy", "rms_xy", "exy", "sz", "ez"] columns will contain
        # np.nan if n == 1: we replace them with 0.0.
        # @TODO: should this be changed? It could be a global option.
        df_tid[["sx", "sy", "sxy", "rms_xy", "exy", "sz", "ez"]] = df_tid[
            ["sx", "sy", "sxy", "rms_xy", "exy", "sz", "ez"]
        ].fillna(value=0.0)

        # Return the results
        return df_tid

    def _calculate_weighted_positions(self):
        """Calculate per-trace localization weighted by relative photon count."""

        if self.filtered_dataframe is None:
            return

        if not self._weighted_localizations_to_be_recomputed:
            return

        # Work with a copy of a subset of current filtered dataframe
        df = self.filtered_dataframe[
            ["tid", "tim", "eco", "x", "y", "z", "fluo"]
        ].copy()

        if self._use_weighted_localizations:
            # Calculate weights for each coordinate based on 'eco'
            total_eco_per_tid = df.groupby("tid")["eco"].transform("sum")
            eco_rel = df["eco"] / total_eco_per_tid

            # Calculate weighted positions
            df.loc[:, "x_rel"] = df["x"] * eco_rel
            df.loc[:, "y_rel"] = df["y"] * eco_rel
            df.loc[:, "z_rel"] = df["z"] * eco_rel

            # Summing up the relative contributions
            df_grouped = df.groupby("tid")
            x_w = df_grouped["x_rel"].sum()
            y_w = df_grouped["y_rel"].sum()
            z_w = df_grouped["z_rel"].sum()

            # Return the most frequent fluo ID: traces should be homogeneous with respect
            # to their fluorophore assignment, but we search anyway for safety.
            fluo_mode = df_grouped["fluo"].agg(
                lambda x: x.mode()[0] if not x.empty else np.nan
            )

        else:
            # Calculate simple average of localizations by TID
            df_grouped = df.groupby("tid")
            x_w = df_grouped["x"].mean()
            y_w = df_grouped["y"].mean()
            z_w = df_grouped["z"].mean()

            # Return the most frequent fluo ID: traces should be homogeneous with respect
            # to their fluorophore assignment, but we search anyway for safety.
            fluo_mode = df_grouped["fluo"].agg(
                lambda x: x.mode()[0] if not x.empty else np.nan
            )

        # We also calculate the (unweighted) mean timestamp
        tim = df_grouped["tim"].mean()

        # Prepare a dataframe with the weighted localizations
        df_loc = pd.DataFrame(
            {
                "tid": x_w.index,
                "tim": tim.to_numpy(),
                "x": x_w.to_numpy(),
                "y": y_w.to_numpy(),
                "z": z_w.to_numpy(),
                "fluo": fluo_mode.to_numpy(),
            }
        )

        # Update the weighted localization dataframe
        self._weighted_localizations = df_loc

        # Flag the results as up-to-date
        self._weighted_localizations_to_be_recomputed = False

Allows for filtering and selecting data read by the underlying MinFluxReader. Note that MinFluxProcessor uses State.min_trace_length to ensure that, at load time and after every filtering step, traces shorter than the minimum length are dropped.

Constructor.

Parameters

source : Union[MinFluxReader, MinFluxReaderV2, MinFluxDataset]
A MinFluxReader, MinFluxReaderV2, or MinFluxDataset object.
min_trace_length : int (Default = 1)
Minimum number of localizations for a trace to be kept. Shorter traces are dropped.
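A minimal usage sketch (not part of the generated documentation): the input file name is a placeholder, and the MinFluxReaderV2 import path and constructor signature are assumptions based on the types referenced above.

# Hypothetical usage sketch; the reader import path, constructor signature,
# and input file name are assumptions.
from pyminflux.reader import MinFluxReaderV2
from pyminflux.processor import MinFluxProcessor

reader = MinFluxReaderV2("experiment.npy")
processor = MinFluxProcessor(reader, min_trace_length=4)

# Keep localizations with at least 1000 photons ('eco'), then crop a 2D ROI.
processor.filter_by_single_threshold("eco", 1000, larger_than=True)
processor.filter_by_2d_range("x", "y", (0.0, 2.0e-6), (0.0, 2.0e-6))

print(processor.num_values)                 # localizations surviving all filters
stats = processor.filtered_dataframe_stats  # per-trace statistics, recomputed lazily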

Static methods

def calculate_statistics_on(df: pandas.core.frame.DataFrame, is_tracking: bool = False) ‑> pandas.core.frame.DataFrame
Expand source code
@staticmethod
def calculate_statistics_on(
    df: pd.DataFrame, is_tracking: bool = False
) -> pd.DataFrame:
    """Calculate per-trace statistics for the selected dataframe.

    Parameters
    ----------

    df: pd.DataFrame
        DataFrame (view) generated by one of the `select_by_*` methods.

    is_tracking: bool
        Whether the data comes from a tracking instead of a localization experiment.

    Returns
    -------
    df_stats: pd.DataFrame
        Per-trace statistics calculated on the passed DataFrame selection (view).
    """

    # Prepare a dataframe with the statistics
    if is_tracking:
        df_tid = pd.DataFrame(
            columns=MinFluxProcessor.trace_stats_with_tracking_properties()
        )
    else:
        df_tid = pd.DataFrame(columns=MinFluxProcessor.trace_stats_properties())

    # Calculate some statistics per TID on the passed dataframe
    df_grouped = df.groupby("tid")

    # Base statistics
    tid = df_grouped["tid"].first().to_numpy()
    n = df_grouped["tid"].count().to_numpy()
    mx = df_grouped["x"].mean().to_numpy()
    my = df_grouped["y"].mean().to_numpy()
    mz = df_grouped["z"].mean().to_numpy()
    sx = df_grouped["x"].std().to_numpy()
    sy = df_grouped["y"].std().to_numpy()
    sz = df_grouped["z"].std().to_numpy()
    tmp = np.power(sx, 2) + np.power(sy, 2)
    sxy = np.sqrt(tmp)
    rms_xy = np.sqrt(tmp / 2)
    exy = sxy / np.sqrt(n)
    ez = sz / np.sqrt(n)
    fluo = df_grouped["fluo"].agg(lambda x: mode(x, keepdims=True)[0][0]).to_numpy()
    mtim = df_grouped["tim"].mean().to_numpy()
    tot_tim, _, _ = calculate_trace_time(df)

    # Optional tracking statistics
    if is_tracking:
        total_distance, _, _ = calculate_total_distance_traveled(df)
        speeds = (
            total_distance["displacement"].to_numpy()
            / tot_tim["tim_tot"].to_numpy()
        )

    # Store trace stats
    df_tid["tid"] = tid  # Trace ID
    df_tid["n"] = n  # Number of localizations for given trace ID
    df_tid["mx"] = mx  # x mean localization
    df_tid["my"] = my  # y mean localization
    df_tid["mz"] = mz  # z mean localization
    df_tid["sx"] = sx  # x localization precision
    df_tid["sy"] = sy  # y localization precision
    df_tid["sxy"] = sxy  # Lateral (x, y) localization precision
    df_tid["rms_xy"] = rms_xy  # Lateral root mean square
    df_tid["exy"] = exy  # Standard error of sxy
    df_tid["sz"] = sz  # z localization precision
    df_tid["ez"] = ez  # Standard error of ez
    df_tid["fluo"] = fluo  # Assigned fluorophore ID
    df_tid["mtim"] = mtim  # Average time per trace
    df_tid["tim_tot"] = tot_tim["tim_tot"].to_numpy()  # Total time per trace
    if is_tracking:
        df_tid["avg_speed"] = speeds  # Average speed per trace
        df_tid["total_dist"] = total_distance[
            "displacement"
        ].to_numpy()  # Total travelled distance per trace

    # ["sx", "sy", "sxy", "rms_xy", "exy", "sz", "ez"] columns will contain
    # np.nan if n == 1: we replace them with 0.0.
    # @TODO: should this be changed? It could be a global option.
    df_tid[["sx", "sy", "sxy", "rms_xy", "exy", "sz", "ez"]] = df_tid[
        ["sx", "sy", "sxy", "rms_xy", "exy", "sz", "ez"]
    ].fillna(value=0.0)

    # Return the results
    return df_tid

Calculate per-trace statistics for the selected dataframe.

Parameters

df : pd.DataFrame
DataFrame (view) generated by one of the select_by_* methods.
is_tracking : bool
Whether the data comes from a tracking instead of a localization experiment.

Returns

df_stats : pd.DataFrame
Per-trace statistics calculated on the passed DataFrame selection (view).
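As an input sketch, the static method can be exercised on a small synthetic dataframe. The column names below follow the processed-dataframe convention visible in the source (tid, x, y, z, tim, fluo); it is assumed that the internal calculate_trace_time helper only needs the tid and tim columns.

import pandas as pd
from pyminflux.processor import MinFluxProcessor

# Two synthetic traces: TID 1 with three localizations, TID 2 with two.
df = pd.DataFrame(
    {
        "tid": [1, 1, 1, 2, 2],
        "x": [0.0, 1.0, 2.0, 5.0, 5.5],
        "y": [0.0, 0.5, 1.0, 3.0, 3.5],
        "z": [0.0, 0.0, 0.0, 0.0, 0.0],
        "tim": [0.0, 0.1, 0.2, 1.0, 1.1],
        "fluo": [1, 1, 1, 1, 1],
    }
)

stats = MinFluxProcessor.calculate_statistics_on(df, is_tracking=False)
print(stats[["tid", "n", "mx", "my", "sxy"]])  # one row per trace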
def processed_properties()

Return the processed dataframe columns.

def trace_stats_properties()

Return the columns of the filtered_dataframe_stats.

def trace_stats_with_tracking_properties()

Return the columns of the filtered_dataframe_stats with tracking columns.
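The column-layout accessors can be queried directly on the class. The stats column lists below are copied from the source shown further down; the processed columns are delegated to MinFluxReader.

from pyminflux.processor import MinFluxProcessor

print(MinFluxProcessor.processed_properties())  # columns of the processed dataframe
print(MinFluxProcessor.trace_stats_properties())
# ['tid', 'n', 'fluo', 'mx', 'my', 'mz', 'sx', 'sy', 'sxy', 'exy',
#  'rms_xy', 'sz', 'ez', 'mtim', 'tim_tot']
print(MinFluxProcessor.trace_stats_with_tracking_properties())
# Same list with 'avg_speed' and 'total_dist' appended.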

Instance variables

prop current_fluorophore_id : int
Expand source code
@property
def current_fluorophore_id(self) -> int:
    """Return current fluorophore ID (0 for all)."""
    return self._current_fluorophore_id

Return current fluorophore ID (0 for all).
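A short sketch of how the fluorophore selection interacts with filtering, continuing the hypothetical processor from the constructor example above:

# Restrict subsequent filters to fluorophore 1 only.
processor.current_fluorophore_id = 1
processor.filter_by_1d_range("z", (-100e-9, 100e-9))

# Switch back to all fluorophores: filtered_dataframe is then the union
# of the per-fluorophore selections.
processor.current_fluorophore_id = 0
combined = processor.filtered_dataframe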

var dataset
Expand source code
class MinFluxProcessor:
    """Processor of MINFLUX data."""

    __doc__ = """Allows for filtering and selecting data read by the underlying `MinFluxReader`. Please notice that
     `MinFluxProcessor` makes use of `State.min_trace_length` to make sure that at load and after every
      filtering step, short traces are dropped."""

    __slots__ = [
        "state",
        "dataset",
        "_current_fluorophore_id",
        "_filtered_stats_dataframe",
        "_fluorophore_names",
        "_min_trace_length",
        "_selected_rows_dict",
        "_stats_to_be_recomputed",
        "_weighted_localizations",
        "_weighted_localizations_to_be_recomputed",
        "_use_weighted_localizations",
        "_has_dynamic_filters",
    ]

    def __init__(
        self,
        source: Union[MinFluxReader, MinFluxReaderV2, MinFluxDataset],
        min_trace_length: int = 1,
    ):
        """Constructor.

        Parameters
        ----------

        source: Union[MinFluxReader, MinFluxReaderV2, MinFluxDataset]
            A MinFluxReader, MinFluxReaderV2, or MinFluxDataset object.

        min_trace_length: int (Default = 1)
            Minimum number of localizations for a trace to be kept. Shorter traces are dropped.
        """

        # Convert reader-like sources to dataset if necessary (duck-typed)
        if isinstance(source, MinFluxDataset):
            self.dataset = source
        elif isinstance(source, (MinFluxReader, MinFluxReaderV2)) or hasattr(
            source, "processed_dataframe"
        ):
            self.dataset = MinFluxDataset.from_reader(source)
        else:
            raise TypeError(
                f"source must be MinFluxReader, MinFluxReaderV2, MinFluxDataset, or reader-like; got {type(source)}"
            )

        # Global options (to be applied after every operation)
        self._min_trace_length: int = min_trace_length

        # Cache the filtered stats dataframe
        self._filtered_stats_dataframe = None

        # Keep separate arrays of booleans to cache selection state for all
        # fluorophore IDs.
        self._selected_rows_dict = None
        self._init_selected_rows_dict()

        # Keep track of the selected fluorophore
        # 0 - All (default)
        # 1 - Fluorophore 1
        # 2 - Fluorophore 2
        self._current_fluorophore_id = 0

        # Cache the weighted, averaged TID positions
        self._weighted_localizations = None

        # Keep track whether the statistics and the weighted localizations need to be recomputed
        self._stats_to_be_recomputed = False
        self._weighted_localizations_to_be_recomputed = False

        # Whether to use weighted average for localizations
        self._use_weighted_localizations = False

        # Track whether dynamic (user-applied) filters have been applied
        self._has_dynamic_filters = False

        # Initialize fluorophore names mapping (fluo_id -> name)
        self._fluorophore_names = {}
        self._init_fluorophore_names()

        # Apply the global filters
        self._apply_global_filters()

    def _init_selected_rows_dict(self):
        """Initialize the selected rows array."""
        if self.processed_dataframe is None:
            return

        # Create selection mask
        keep_mask = pd.Series(
            data=np.ones(len(self.processed_dataframe.index), dtype=bool),
            index=self.processed_dataframe.index,
        )

        # Store the mask for each fluorophore present in the data
        unique_fluos = np.unique(self.processed_dataframe["fluo"].to_numpy())
        unique_fluos = unique_fluos[unique_fluos > 0]
        self._selected_rows_dict = {}
        for fluo_id in unique_fluos:
            self._selected_rows_dict[int(fluo_id)] = keep_mask.copy()

    def has_filters_applied(self) -> bool:
        """Check if any dynamic (user-applied) filters have been applied to the data.
        
        This does not count global filters like min_trace_length which are always applied.
        
        Returns
        -------
        has_filters: bool
            True if any dynamic filters are active, False otherwise.
        """
        return self._has_dynamic_filters

    def _init_fluorophore_names(self):
        """Initialize fluorophore names with default values (string representation of fluo ID)."""
        if self.processed_dataframe is None:
            return
        
        unique_fluos = np.unique(self.processed_dataframe["fluo"].to_numpy())
        unique_fluos = unique_fluos[unique_fluos > 0]
        for fluo_id in unique_fluos:
            fluo_id_int = int(fluo_id)
            if fluo_id_int not in self._fluorophore_names:
                self._fluorophore_names[fluo_id_int] = str(fluo_id_int)

    @property
    def reader(self):
        """Return a reader-like interface to the dataset for backwards compatibility.
        
        This property provides access to the underlying dataset using the
        reader interface that existing code expects.
        """
        return self.dataset

    @property
    def is_tracking(self):
        """Minimum number of localizations for the trace to be kept."""
        return self.reader.is_tracking

    @property
    def min_trace_length(self):
        """Minimum number of localizations for the trace to be kept."""
        return self._min_trace_length

    @property
    def z_scaling_factor(self):
        """Returns the scaling factor for the z coordinates from the underlying MinFluxReader."""
        return self.reader.z_scaling_factor

    @min_trace_length.setter
    def min_trace_length(self, value):
        if value < 1 or int(value) != value:
            raise ValueError(
                "MinFluxProcessor.min_trace_length must be a positive integer!"
            )
        self._min_trace_length = value

        # Run the global filters
        self._apply_global_filters()

    @property
    def is_3d(self) -> bool:
        """Return True if the acquisition is 3D.

        Returns
        -------

        is_3d: bool
            True if the acquisition is 3D, False otherwise.
        """
        return self.reader.is_3d

    @property
    def num_values(self) -> int:
        """Return the number of values in the (filtered) dataframe.

        Returns
        -------

        n: int
            Number of values in the dataframe after all filters have been applied.
        """
        if self.filtered_dataframe is not None:
            return len(self.filtered_dataframe.index)

        return 0

    @property
    def current_fluorophore_id(self) -> int:
        """Return current fluorophore ID (0 for all)."""
        return self._current_fluorophore_id

    @current_fluorophore_id.setter
    def current_fluorophore_id(self, fluorophore_id: int) -> None:
        """Set current fluorophore ID (0 for all)."""

        # Validate fluorophore ID: 0 for all, or 1 to num_fluorophores
        if fluorophore_id < 0:
            raise ValueError(f"Fluorophore ID must be non-negative, got {fluorophore_id}.")
        
        if fluorophore_id > 0 and fluorophore_id > self.num_fluorophores:
            raise ValueError(
                f"Fluorophore ID {fluorophore_id} is out of range. "
                f"Valid range is 0 (all) or 1-{self.num_fluorophores}."
            )

        # Set the new fluorophore_id
        self._current_fluorophore_id = fluorophore_id

        # Apply the global filters
        self._apply_global_filters()

        # Flag stats to be recomputed
        self._stats_to_be_recomputed = True

    @property
    def num_fluorophores(self) -> int:
        """Return the number of fluorophores."""
        if self.processed_dataframe is None:
            return 0
        unique_fluos = np.unique(self.processed_dataframe["fluo"].to_numpy())
        return len(unique_fluos[unique_fluos > 0])

    @property
    def fluorophore_names(self) -> dict:
        """Return the fluorophore names mapping (fluo_id -> name)."""
        return self._fluorophore_names.copy()

    def set_fluorophore_name(self, fluo_id: int, name: str):
        """Set the name for a specific fluorophore ID.
        
        Parameters
        ----------
        fluo_id: int
            The fluorophore ID (must be >= 1)
        name: str
            The name to assign to this fluorophore
        """
        if fluo_id < 1:
            raise ValueError(f"Fluorophore ID must be >= 1, got {fluo_id}")
        if not isinstance(name, str):
            raise ValueError(f"Fluorophore name must be a string, got {type(name)}")
        self._fluorophore_names[fluo_id] = name

    def set_fluorophore_names(self, names: dict):
        """Set fluorophore names from a dictionary mapping.
        
        This method updates the fluorophore names dictionary, preserving
        any existing names for fluorophore IDs not included in the names parameter.
        
        Parameters
        ----------
        names: dict
            Dictionary mapping fluo_id (int) to name (str)
        """
        if not isinstance(names, dict):
            raise ValueError(f"Names must be a dictionary, got {type(names)}")
        for fluo_id, name in names.items():
            if not isinstance(fluo_id, int) or fluo_id < 1:
                raise ValueError(f"Fluorophore ID must be an integer >= 1, got {fluo_id}")
            if not isinstance(name, str):
                raise ValueError(f"Fluorophore name must be a string, got {type(name)}")
        # Update the dictionary instead of replacing it to preserve existing names
        self._fluorophore_names.update(names)

    def get_fluorophore_name(self, fluo_id: int) -> str:
        """Get the name for a specific fluorophore ID.
        
        Parameters
        ----------
        fluo_id: int
            The fluorophore ID
            
        Returns
        -------
        name: str
            The name of the fluorophore (defaults to string representation of ID if not set)
        """
        return self._fluorophore_names.get(fluo_id, str(fluo_id))

    def _filtered_raw_data_array_all_fluorophores(self):
        """Return the raw NumPy array with applied filters (for all fluorophores).

        This is only compatible with version 1 MinFluxReader (check performed by
        the invoked protected methods).
        """

        # Copy the raw NumPy array
        raw_array = self.reader.valid_raw_data_array
        if raw_array is None:
            return None

        if self.processed_dataframe is None or self._selected_rows_dict is None:
            return None

        # Append the fluorophore ID data
        raw_array["fluo"] = self.processed_dataframe["fluo"].astype(np.uint8)

        # Extract combination of all fluorophore filtered dataframes
        combined_mask = np.zeros(len(self.processed_dataframe), dtype=bool)
        for fluo_id, fluo_mask in self._selected_rows_dict.items():
            combined_mask |= (self.processed_dataframe["fluo"] == fluo_id) & fluo_mask
        
        return raw_array[combined_mask]

    @property
    def filtered_raw_data_array(self):
        """Return the raw NumPy array with applied filters for the selected fluorophores.

        This is only compatible with version 1 MinFluxReader (check performed by
        the invoked protected methods).
        """

        # Copy the raw NumPy array
        full_array = self._filtered_raw_data_array_all_fluorophores()
        if full_array is None:
            return None

        if self.current_fluorophore_id == 0:
            return full_array
        else:
            # Filter by the current fluorophore ID
            return full_array[full_array["fluo"] == self.current_fluorophore_id]

    @property
    def processed_dataframe(self) -> Union[None, pd.DataFrame]:
        """Return the full dataframe (with valid entries only), with no selections or filters.

        Returns
        -------

        processed_dataframe: Union[None, pd.DataFrame]
            A Pandas dataframe or None if no file was loaded.
        """
        return self.dataset.processed_dataframe

    @property
    def filename(self) -> Union[Path, None]:
        """Return the filename if set."""
        return self.dataset.filename

    def replace_dataset(self, new_dataset: MinFluxDataset):
        """Replace the current dataset with a new one.
        
        This is useful when combining datasets - the processor's dataset
        can be replaced with the combined dataset without losing filter state
        or other settings.
        
        Parameters
        ----------
        new_dataset: MinFluxDataset
            The new dataset to use.
        """
        # Store current settings
        old_min_trace_length = self._min_trace_length
        old_use_weighted = self._use_weighted_localizations
        old_fluorophore_names = self._fluorophore_names.copy()
        
        # Replace the dataset
        self.dataset = new_dataset
        
        # Re-initialize selection and filters
        self._selected_rows_dict = None
        self._init_selected_rows_dict()
        
        # Re-initialize fluorophore names (preserving custom names where IDs match)
        self._fluorophore_names = {}
        self._init_fluorophore_names()
        
        # Restore custom names for matching fluorophore IDs
        for fluo_id, name in old_fluorophore_names.items():
            if fluo_id in self._fluorophore_names:
                self._fluorophore_names[fluo_id] = name
        
        # Restore settings
        self._min_trace_length = old_min_trace_length
        self._use_weighted_localizations = old_use_weighted
        
        # Reset current fluorophore to "all"
        self._current_fluorophore_id = 0
        
        # Reset dynamic filters flag (dataset replacement clears all filters)
        self._has_dynamic_filters = False
        
        # Apply global filters
        self._apply_global_filters()
        
        # Flag derived data to be recomputed
        self._stats_to_be_recomputed = True
        self._weighted_localizations_to_be_recomputed = True

    def _filtered_dataframe_all_fluorophores(self) -> Union[None, pd.DataFrame]:
        """Return joint dataframe for all fluorophores and with all filters applied.

        Returns
        -------

        filtered_dataframe_all: Union[None, pd.DataFrame]
            A Pandas dataframe or None if no file was loaded.
        """
        if self.processed_dataframe is None or self._selected_rows_dict is None:
            return None

        # Extract combination of all fluorophore filtered dataframes
        combined_mask = np.zeros(len(self.processed_dataframe), dtype=bool)
        for fluo_id, fluo_mask in self._selected_rows_dict.items():
            combined_mask |= (self.processed_dataframe["fluo"] == fluo_id) & fluo_mask
        
        return self.processed_dataframe.loc[combined_mask]

    def _filtered_raw_dataframe_all_fluorophores(self) -> Union[None, pd.DataFrame]:
        """Return joint raw dataframe (all properties) for all fluorophores and with all filters applied.

        Returns
        -------

        filtered_raw_dataframe_all: Union[None, pd.DataFrame]
            A Pandas dataframe or None if no file was loaded.
        """
        # This is only compatible with MinFluxReader version 2
        if self.reader.version != 2:
            raise ValueError("Only reader version 2 is supported.")

        # valid_full_raw_dataframe is a MinFluxReaderV2 property
        raw_array = self.reader.valid_full_raw_dataframe

        # Now extract the fluorophore assignments from self.processed_dataframe and
        # expand them onto the raw array
        if "fluo" in raw_array.columns:
            raw_fluo = raw_array["fluo"].fillna(0).astype(np.uint8).to_numpy(copy=True)
        else:
            raw_fluo = np.zeros(len(raw_array), dtype=np.uint8)

        if "iid" in raw_array.columns and "iid" in self.processed_dataframe.columns:
            fluo_map = dict(
                zip(
                    self.processed_dataframe["iid"],
                    self.processed_dataframe["fluo"].astype(np.uint8),
                )
            )
            mapped_fluo = raw_array["iid"].map(fluo_map)
            matched_mask = mapped_fluo.notna().to_numpy()
            if matched_mask.any():
                raw_fluo[matched_mask] = mapped_fluo.loc[matched_mask].astype(np.uint8).to_numpy()

        raw_array["fluo"] = raw_fluo.astype(np.uint8)

        # Return the array with the assigned fluorophores
        return raw_array

    @property
    def filtered_dataframe(self) -> Union[None, pd.DataFrame]:
        """Return dataframe with all filters applied.

        Returns
        -------

        filtered_dataframe: Union[None, pd.DataFrame]
            A Pandas dataframe or None if no file was loaded.
        """
        if self.processed_dataframe is None:
            return None
        if self.current_fluorophore_id == 0:
            return self._filtered_dataframe_all_fluorophores()
        else:
            # Use .loc to filter the dataframe in a single step
            filtered_df = self.processed_dataframe.loc[
                self.processed_dataframe["fluo"] == self.current_fluorophore_id
            ]
            if self._selected_rows_dict is None:
                return None
            selected_indices = self._selected_rows_dict.get(
                self.current_fluorophore_id, []
            )
            return filtered_df.loc[selected_indices]

    @property
    def filtered_raw_dataframe(self) -> Union[None, pd.DataFrame]:
        """Return joint dataframe with all filters applied.

        Returns
        -------

        filtered_raw_dataframe: Union[None, pd.DataFrame]
            A Pandas dataframe or None if no file was loaded.
        """
        # This is only compatible with MinFluxReader version 2
        if self.reader.version != 2:
            raise ValueError("Only reader version 2 is supported.")

        # Copy the raw dataframe array
        full_dataframe = self._filtered_raw_dataframe_all_fluorophores()
        if full_dataframe is None:
            return None

        # Get the IIDs from the currently filtered localizations
        iids = self.filtered_dataframe["iid"].to_numpy()

        # Extract all rows matching the current set of iids
        full_dataframe = full_dataframe[full_dataframe["iid"].isin(iids)]

        # If needed, filter by fluorophore ID
        if self.current_fluorophore_id == 0:
            return full_dataframe
        else:
            # Filter by the current fluorophore ID
            return full_dataframe[full_dataframe["fluo"] == self.current_fluorophore_id]

    @property
    def filtered_dataframe_stats(self) -> Union[None, pd.DataFrame]:
        """Return dataframe stats with all filters applied.

        Returns
        -------

        filtered_dataframe_stats: Union[None, pd.DataFrame]
            A Pandas dataframe with all data statistics or None if no file was loaded.
        """
        if self._stats_to_be_recomputed:
            self._calculate_statistics()
        return self._filtered_stats_dataframe

    @property
    def weighted_localizations(self) -> Union[None, pd.DataFrame]:
        """Return the average (x, y, z) position per TID weighted by the relative photon count."""
        if self._weighted_localizations_to_be_recomputed:
            self._calculate_weighted_positions()
        return self._weighted_localizations

    @property
    def use_weighted_localizations(self) -> bool:
        """Whether to use weighted average to calculate the mean localization per TID."""
        return self._use_weighted_localizations

    @use_weighted_localizations.setter
    def use_weighted_localizations(self, value: bool):
        """Whether to use weighted average to calculate the mean localization per TID."""
        self._use_weighted_localizations = value
        self._weighted_localizations_to_be_recomputed = True

    @classmethod
    def processed_properties(cls):
        """Return the processed dataframe columns."""
        return MinFluxReader.processed_properties()

    @classmethod
    def trace_stats_properties(cls):
        """Return the columns of the filtered_dataframe_stats."""
        return [
            "tid",
            "n",
            "fluo",
            "mx",
            "my",
            "mz",
            "sx",
            "sy",
            "sxy",
            "exy",
            "rms_xy",
            "sz",
            "ez",
            "mtim",
            "tim_tot",
        ]

    @classmethod
    def trace_stats_with_tracking_properties(cls):
        """Return the columns of the filtered_dataframe_stats with tracking columns."""
        return MinFluxProcessor.trace_stats_properties() + [
            "avg_speed",
            "total_dist",
        ]

    def reset(self):
        """Drops all dynamic filters and resets the data to the processed data frame with global filters."""

        # Reinitialize the per-fluorophore selections to all selected
        self._init_selected_rows_dict()

        # Reset all fluorophore assignments to the default ID 1
        self.processed_dataframe["fluo"] = 1

        # Reset fluorophore names (clear custom names first)
        self._fluorophore_names = {}
        self._init_fluorophore_names()

        # Default fluorophore is 0 (all fluorophores)
        self.current_fluorophore_id = 0

        # Reset dynamic filters flag
        self._has_dynamic_filters = False

        # Apply global filters
        self._apply_global_filters()

    def set_fluorophore_ids(self, fluorophore_ids: NDArray[np.uint8]):
        """Assign the fluorophore IDs to current filtered dataset.
        
        This method assigns new fluorophore IDs only to the rows in the current
        filtered dataset. All other rows remain unchanged.
        """
        if self.filtered_dataframe is None:
            return
        if len(fluorophore_ids) != len(self.filtered_dataframe.index):
            raise ValueError(
                "The number of fluorophore IDs does not match the number of entries in the dataframe."
            )

        # Get the actual indices from the filtered dataframe
        filtered_indices = self.filtered_dataframe.index
        
        # Assign the new fluorophore IDs to those specific indices only
        # All other rows remain unchanged (preserving other fluorophores)
        self.processed_dataframe.loc[filtered_indices, "fluo"] = fluorophore_ids.astype(np.uint8)

        # Apply global filters
        self._init_selected_rows_dict()
        self._init_fluorophore_names()
        self._apply_global_filters()

    def set_full_fluorophore_ids(self, fluorophore_ids: NDArray[np.uint8]):
        """Assign the fluorophore IDs to the original, full dataframe ignoring current filters."""
        if self.processed_dataframe is None:
            return
        if len(fluorophore_ids) != len(self.processed_dataframe.index):
            raise ValueError(
                "The number of fluorophore IDs does not match the number of entries in the dataframe."
            )
        self.processed_dataframe["fluo"] = fluorophore_ids.astype(np.uint8)

        # Apply global filters
        self._init_selected_rows_dict()
        self._init_fluorophore_names()
        self._apply_global_filters()

    def select_by_rows(
        self, indices: np.ndarray, from_weighted_locs: bool = False
    ) -> Union[None, pd.DataFrame]:
        """Return view on a subset of the filtered dataset or the weighted localisations defined by the passed
        DataFrame row indices.

        The underlying dataframe is not modified.

        Parameters
        ----------

        indices: np.ndarray
            Logical array for selecting the elements to be returned.

        from_weighted_locs: bool
            If True, select from the weighted_localizations dataframe; otherwise, from the filtered_dataframe.

        Returns
        -------

        subset: Union[None, pd.DataFrame]
            A view on a subset of the dataframe defined by the passed indices, or None if no file was loaded.
        """
        if from_weighted_locs:
            if self._weighted_localizations is None:
                return None
            return self._weighted_localizations.iloc[indices]
        else:
            if self.filtered_dataframe is None:
                return None
            return self.filtered_dataframe.iloc[indices]

    def select_by_series_iloc(
        self, iloc: np.ndarray, from_weighted_locs: bool = False
    ) -> Union[None, pd.DataFrame]:
        """Return view on a subset of the filtered dataset or the weighted localisations defined by the passed
        DataFrame index locations.

        The underlying dataframe is not modified.

        Parameters
        ----------

        iloc: np.ndarray
            Array of Series index locations for selecting rows.

        from_weighted_locs: bool
            If True, select from the weighted_localizations dataframe; otherwise, from the filtered_dataframe.

        Returns
        -------

        subset: Union[None, pd.DataFrame]
            A view on a subset of the dataframe defined by the passed indices, or None if no file was loaded.
        """
        if from_weighted_locs:
            if self._weighted_localizations is None:
                return None
            return self._weighted_localizations.loc[iloc]
        else:
            if self.filtered_dataframe is None:
                return None
            return self.filtered_dataframe.loc[iloc]

    def select_by_1d_range(
        self, x_prop, x_range, from_weighted_locs: bool = False
    ) -> Union[None, pd.DataFrame]:
        """Return a view on a subset of the filtered dataset or the weighted localisations defined by the passed
        parameter and corresponding range.

        The underlying dataframe is not modified.

        Parameters
        ----------

        x_prop: str
            Property to be filtered by corresponding x_range.

        x_range: tuple
            Tuple containing the minimum and maximum values for the selected property.

        from_weighted_locs: bool
            If True, select from the weighted_localizations dataframe; otherwise, from the filtered_dataframe.

        Returns
        -------

        subset: Union[None, pd.DataFrame]
            A view on a subset of the dataframe defined by the passed property and range, or None if no file was loaded.
        """

        # Make sure that the range is increasing
        x_min = x_range[0]
        x_max = x_range[1]
        if x_max < x_min:
            x_max, x_min = x_min, x_max

        if from_weighted_locs:
            if self._weighted_localizations is None:
                return None
            return self._weighted_localizations.loc[
                (self._weighted_localizations[x_prop] >= x_min)
                & (self._weighted_localizations[x_prop] < x_max)
            ]
        else:
            # Work with currently selected rows
            if self.filtered_dataframe is None:
                return None
            df = self.filtered_dataframe
            return df.loc[(df[x_prop] >= x_min) & (df[x_prop] < x_max)]

    def select_by_2d_range(
        self, x_prop, y_prop, x_range, y_range, from_weighted_locs: bool = False
    ) -> Union[None, pd.DataFrame]:
        """Return a view on a subset of the filtered dataset or the weighted localisations defined by the passed
        parameters and corresponding ranges.

        The underlying dataframe is not modified.

        Parameters
        ----------

        x_prop: str
            First property to be filtered by corresponding x_range.

        y_prop: str
            Second property to be filtered by corresponding y_range.

        x_range: tuple
            Tuple containing the minimum and maximum values for the first property.

        y_range: tuple
            Tuple containing the minimum and maximum values for the second property.

        from_weighted_locs: bool
            If True, select from the weighted_localizations dataframe; otherwise, from the filtered_dataframe.

        Returns
        -------

        subset: Union[None, pd.DataFrame]
            A view on a subset of the dataframe defined by the passed properties and ranges, or None if no file
            was loaded.
        """

        # Make sure that the ranges are increasing
        x_min = x_range[0]
        x_max = x_range[1]
        if x_max < x_min:
            x_max, x_min = x_min, x_max

        y_min = y_range[0]
        y_max = y_range[1]
        if y_max < y_min:
            y_max, y_min = y_min, y_max

        if from_weighted_locs:
            if self._weighted_localizations is None:
                return None
            return self._weighted_localizations.loc[
                (self._weighted_localizations[x_prop] >= x_min)
                & (self._weighted_localizations[x_prop] < x_max)
                & (self._weighted_localizations[y_prop] >= y_min)
                & (self._weighted_localizations[y_prop] < y_max)
            ]
        else:
            # Work with currently selected rows
            if self.filtered_dataframe is None:
                return None
            df = self.filtered_dataframe
            return df.loc[
                (df[x_prop] >= x_min)
                & (df[x_prop] < x_max)
                & (df[y_prop] >= y_min)
                & (df[y_prop] < y_max)
            ]

    def filter_by_2d_range(self, x_prop, y_prop, x_range, y_range):
        """Filter dataset by the extracting a rectangular ROI over two parameters and two ranges.

        Parameters
        ----------

        x_prop: str
            First property to be filtered by corresponding x_range.

        y_prop: str
            Second property to be filtered by corresponding y_range.

        x_range: tuple
            Tuple containing the minimum and maximum values for the first property.

        y_range: tuple
            Tuple containing the minimum and maximum values for the second property.
        """

        # Make sure that the ranges are increasing
        x_min = x_range[0]
        x_max = x_range[1]
        if x_max < x_min:
            x_max, x_min = x_min, x_max

        y_min = y_range[0]
        y_max = y_range[1]
        if y_max < y_min:
            y_max, y_min = y_min, y_max

        if self.current_fluorophore_id == 0:
            # Apply to all fluorophores
            for fluo_id in self._selected_rows_dict.keys():
                self._selected_rows_dict[fluo_id] = (
                    self._selected_rows_dict[fluo_id]
                    & (self.filtered_dataframe[x_prop] >= x_min)
                    & (self.filtered_dataframe[x_prop] < x_max)
                    & (self.filtered_dataframe[y_prop] >= y_min)
                    & (self.filtered_dataframe[y_prop] < y_max)
                )
        elif self.current_fluorophore_id in self._selected_rows_dict:
            # Apply to specific fluorophore
            fluo_id = self.current_fluorophore_id
            self._selected_rows_dict[fluo_id] = (
                self._selected_rows_dict[fluo_id]
                & (self.filtered_dataframe[x_prop] >= x_min)
                & (self.filtered_dataframe[x_prop] < x_max)
                & (self.filtered_dataframe[y_prop] >= y_min)
                & (self.filtered_dataframe[y_prop] < y_max)
            )

        # Make sure to always apply the global filters
        self._apply_global_filters()

        # Mark that dynamic filters have been applied
        self._has_dynamic_filters = True

        # Make sure to flag the derived data to be recomputed
        self._stats_to_be_recomputed = True
        self._weighted_localizations_to_be_recomputed = True

    def _apply_global_filters(self):
        """Apply filters that are defined in the global application configuration."""

        if self.current_fluorophore_id == 0:
            # Apply to all fluorophores
            for fluo_id in self._selected_rows_dict.keys():
                self._selected_rows_dict[fluo_id] = self._filter_by_tid_length(fluo_id)
        elif self.current_fluorophore_id in self._selected_rows_dict:
            # Apply to specific fluorophore
            self._selected_rows_dict[self.current_fluorophore_id] = self._filter_by_tid_length(
                self.current_fluorophore_id
            )

        # Make sure to flag the derived data to be recomputed
        self._stats_to_be_recomputed = True
        self._weighted_localizations_to_be_recomputed = True

    def _filter_by_tid_length(self, index):
        """Keep only rows whose trace (TID) has at least min_trace_length localizations."""
        # Make sure to count only currently selected rows
        df = self.processed_dataframe.copy()
        df.loc[np.invert(self._selected_rows_dict[index]), "tid"] = np.nan

        # Keep all rows whose TID occurs at least self.min_trace_length times
        counts = df["tid"].value_counts(normalize=False)
        return df["tid"].isin(counts[counts >= self.min_trace_length].index)

    def filter_by_single_threshold(
        self, prop: str, threshold: Union[int, float], larger_than: bool = True
    ):
        """Apply single threshold to filter values either lower or higher (equal) than threshold for given property."""

        # Apply filter
        if self.current_fluorophore_id == 0:
            # Apply to all fluorophores
            for fluo_id in self._selected_rows_dict.keys():
                if larger_than:
                    self._selected_rows_dict[fluo_id] = self._selected_rows_dict[fluo_id] & (
                        self.filtered_dataframe[prop] >= threshold
                    )
                else:
                    self._selected_rows_dict[fluo_id] = self._selected_rows_dict[fluo_id] & (
                        self.filtered_dataframe[prop] < threshold
                    )
        elif self.current_fluorophore_id in self._selected_rows_dict:
            # Apply to specific fluorophore
            fluo_id = self.current_fluorophore_id
            if larger_than:
                self._selected_rows_dict[fluo_id] = self._selected_rows_dict[fluo_id] & (
                    self.filtered_dataframe[prop] >= threshold
                )
            else:
                self._selected_rows_dict[fluo_id] = self._selected_rows_dict[fluo_id] & (
                    self.filtered_dataframe[prop] < threshold
                )

        # Apply the global filters
        self._apply_global_filters()

        # Mark that dynamic filters have been applied
        self._has_dynamic_filters = True

        # Make sure to flag the derived data to be recomputed
        self._stats_to_be_recomputed = True
        self._weighted_localizations_to_be_recomputed = True

    def filter_by_1d_range(self, x_prop: str, x_range: tuple):
        """Apply min and max thresholding to the given property.

        Parameters
        ----------

        x_prop: str
            Name of the property (dataframe column) to filter.

        x_range: tuple
            Tuple containing the minimum and maximum values for the selected property.
        """

        # Make sure that the ranges are increasing
        x_min = x_range[0]
        x_max = x_range[1]
        if x_max < x_min:
            x_max, x_min = x_min, x_max

        # Apply filter
        if self.current_fluorophore_id == 0:
            # Apply to all fluorophores
            for fluo_id in self._selected_rows_dict.keys():
                self._selected_rows_dict[fluo_id] = (
                    self._selected_rows_dict[fluo_id]
                    & (self.filtered_dataframe[x_prop] >= x_min)
                    & (self.filtered_dataframe[x_prop] < x_max)
                )
        elif self.current_fluorophore_id in self._selected_rows_dict:
            # Apply to specific fluorophore
            fluo_id = self.current_fluorophore_id
            self._selected_rows_dict[fluo_id] = (
                self._selected_rows_dict[fluo_id]
                & (self.filtered_dataframe[x_prop] >= x_min)
                & (self.filtered_dataframe[x_prop] < x_max)
            )

        # Apply the global filters
        self._apply_global_filters()

        # Mark that dynamic filters have been applied
        self._has_dynamic_filters = True

        # Make sure to flag the derived data to be recomputed
        self._stats_to_be_recomputed = True
        self._weighted_localizations_to_be_recomputed = True

    def filter_by_1d_range_complement(self, prop: str, x_range: tuple):
        """Apply min and max thresholding to the given property but keep the
        data outside the range (i.e., crop the selected range).

        Parameters
        ----------

        prop: str
            Name of the property (dataframe column) to filter.

        x_range: tuple
            Tuple containing the minimum and maximum values for cropping the selected property.
        """

        # Make sure that the ranges are increasing
        x_min = x_range[0]
        x_max = x_range[1]
        if x_max < x_min:
            x_max, x_min = x_min, x_max

        # Apply filter
        if self.current_fluorophore_id == 0:
            # Apply to all fluorophores
            for fluo_id in self._selected_rows_dict.keys():
                self._selected_rows_dict[fluo_id] = self._selected_rows_dict[fluo_id] & (
                    (self.filtered_dataframe[prop] < x_min)
                    | (self.filtered_dataframe[prop] >= x_max)
                )
        elif self.current_fluorophore_id in self._selected_rows_dict:
            # Apply to specific fluorophore
            fluo_id = self.current_fluorophore_id
            self._selected_rows_dict[fluo_id] = self._selected_rows_dict[fluo_id] & (
                (self.filtered_dataframe[prop] < x_min)
                | (self.filtered_dataframe[prop] >= x_max)
            )

        # Apply the global filters
        self._apply_global_filters()

        # Mark that dynamic filters have been applied
        self._has_dynamic_filters = True

        # Make sure to flag the derived data to be recomputed
        self._stats_to_be_recomputed = True
        self._weighted_localizations_to_be_recomputed = True

    def filter_by_1d_stats(self, x_prop_stats: str, x_range: tuple):
        """Filter TIDs by min and max thresholding using the given property from the stats dataframe.

        Parameters
        ----------

        x_prop_stats: str
            Name of the property (column) from the stats dataframe used to filter.

        x_range: tuple
            Tuple containing the minimum and maximum values for the selected property.
        """

        # Make sure the property exists in the stats dataframe
        if x_prop_stats not in self.filtered_dataframe_stats.columns:
            raise ValueError(
                f"The property {x_prop_stats} does not exist in `filtered_dataframe_stats`."
            )

        # Make sure that the ranges are increasing
        x_min = x_range[0]
        x_max = x_range[1]
        if x_max < x_min:
            x_max, x_min = x_min, x_max

        # Find all TIDs for the current fluorophore ID for which the requested stats property is inside the range
        tids_to_keep = self.filtered_dataframe_stats[
            (
                (self.filtered_dataframe_stats[x_prop_stats] >= x_min)
                & (self.filtered_dataframe_stats[x_prop_stats] <= x_max)
            )
        ]["tid"].to_numpy()

        # Rows of the filtered dataframe to keep
        rows_to_keep = self.filtered_dataframe["tid"].isin(tids_to_keep)

        # Apply filter
        if self.current_fluorophore_id == 0:
            # Apply to all fluorophores
            for fluo_id in self._selected_rows_dict.keys():
                self._selected_rows_dict[fluo_id] = self._selected_rows_dict[fluo_id] & rows_to_keep
        elif self.current_fluorophore_id in self._selected_rows_dict:
            # Apply to specific fluorophore
            self._selected_rows_dict[self.current_fluorophore_id] = (
                self._selected_rows_dict[self.current_fluorophore_id] & rows_to_keep
            )

        # Apply the global filters
        self._apply_global_filters()

        # Mark that dynamic filters have been applied
        self._has_dynamic_filters = True

        # Make sure to flag the derived data to be recomputed
        self._stats_to_be_recomputed = True
        self._weighted_localizations_to_be_recomputed = True

    def _calculate_statistics(self):
        """Calculate per-trace statistics."""

        # Make sure we have processed dataframe to work on
        if self.processed_dataframe is None:
            return

        # Only recompute statistics if needed
        if not self._stats_to_be_recomputed:
            return

        # Work with currently selected rows
        df = self.filtered_dataframe

        # Calculate the statistics
        df_tid = self.calculate_statistics_on(df, self.reader.is_tracking)

        # Store the results
        self._filtered_stats_dataframe = df_tid

        # Flag the statistics as up to date
        self._stats_to_be_recomputed = False

    @staticmethod
    def calculate_statistics_on(
        df: pd.DataFrame, is_tracking: bool = False
    ) -> pd.DataFrame:
        """Calculate per-trace statistics for the selected dataframe.

        Parameters
        ----------

        df: pd.DataFrame
            DataFrame (view) generated by one of the `select_by_*` methods.

        is_tracking: bool
            Whether the data comes from a tracking instead of a localization experiment.

        Returns
        -------
        df_stats: pd.DataFrame
            Per-trace statistics calculated on the passed DataFrame selection (view).
        """

        # Prepare a dataframe with the statistics
        if is_tracking:
            df_tid = pd.DataFrame(
                columns=MinFluxProcessor.trace_stats_with_tracking_properties()
            )
        else:
            df_tid = pd.DataFrame(columns=MinFluxProcessor.trace_stats_properties())

        # Calculate some statistics per TID on the passed dataframe
        df_grouped = df.groupby("tid")

        # Base statistics
        tid = df_grouped["tid"].first().to_numpy()
        n = df_grouped["tid"].count().to_numpy()
        mx = df_grouped["x"].mean().to_numpy()
        my = df_grouped["y"].mean().to_numpy()
        mz = df_grouped["z"].mean().to_numpy()
        sx = df_grouped["x"].std().to_numpy()
        sy = df_grouped["y"].std().to_numpy()
        sz = df_grouped["z"].std().to_numpy()
        tmp = np.power(sx, 2) + np.power(sy, 2)
        sxy = np.sqrt(tmp)  # Combined lateral precision: sqrt(sx^2 + sy^2)
        rms_xy = np.sqrt(tmp / 2)  # Lateral root mean square: sqrt((sx^2 + sy^2) / 2)
        exy = sxy / np.sqrt(n)  # Standard error of the lateral precision
        ez = sz / np.sqrt(n)  # Standard error of the axial precision
        fluo = df_grouped["fluo"].agg(lambda x: mode(x, keepdims=True)[0][0]).to_numpy()
        mtim = df_grouped["tim"].mean().to_numpy()
        tot_tim, _, _ = calculate_trace_time(df)

        # Optional tracking statistics
        if is_tracking:
            total_distance, _, _ = calculate_total_distance_traveled(df)
            speeds = (
                total_distance["displacement"].to_numpy()
                / tot_tim["tim_tot"].to_numpy()
            )

        # Store trace stats
        df_tid["tid"] = tid  # Trace ID
        df_tid["n"] = n  # Number of localizations for given trace ID
        df_tid["mx"] = mx  # x mean localization
        df_tid["my"] = my  # y mean localization
        df_tid["mz"] = mz  # z mean localization
        df_tid["sx"] = sx  # x localization precision
        df_tid["sy"] = sy  # y localization precision
        df_tid["sxy"] = sxy  # Lateral (x, y) localization precision
        df_tid["rms_xy"] = rms_xy  # Lateral root mean square
        df_tid["exy"] = exy  # Standard error of sxy
        df_tid["sz"] = sz  # z localization precision
        df_tid["ez"] = ez  # Standard error of ez
        df_tid["fluo"] = fluo  # Assigned fluorophore ID
        df_tid["mtim"] = mtim  # Average time per trace
        df_tid["tim_tot"] = tot_tim["tim_tot"].to_numpy()  # Total time per trace
        if is_tracking:
            df_tid["avg_speed"] = speeds  # Average speed per trace
            df_tid["total_dist"] = total_distance[
                "displacement"
            ].to_numpy()  # Total travelled distance per trace

        # ["sx", "sy", "sxy", "rms_xy", "exy", "sz", "ez"] columns will contain
        # np.nan if n == 1: we replace them with 0.0.
        # @TODO: should this be changed? It could be a global option.
        df_tid[["sx", "sy", "sxy", "rms_xy", "exy", "sz", "ez"]] = df_tid[
            ["sx", "sy", "sxy", "rms_xy", "exy", "sz", "ez"]
        ].fillna(value=0.0)

        # Return the results
        return df_tid

    def _calculate_weighted_positions(self):
        """Calculate per-trace localization weighted by relative photon count."""

        if self.filtered_dataframe is None:
            return

        if not self._weighted_localizations_to_be_recomputed:
            return

        # Work with a copy of a subset of current filtered dataframe
        df = self.filtered_dataframe[
            ["tid", "tim", "eco", "x", "y", "z", "fluo"]
        ].copy()

        if self._use_weighted_localizations:
            # Calculate weights for each coordinate based on 'eco'
            total_eco_per_tid = df.groupby("tid")["eco"].transform("sum")
            eco_rel = df["eco"] / total_eco_per_tid

            # Calculate weighted positions
            df.loc[:, "x_rel"] = df["x"] * eco_rel
            df.loc[:, "y_rel"] = df["y"] * eco_rel
            df.loc[:, "z_rel"] = df["z"] * eco_rel

            # Summing up the relative contributions
            df_grouped = df.groupby("tid")
            x_w = df_grouped["x_rel"].sum()
            y_w = df_grouped["y_rel"].sum()
            z_w = df_grouped["z_rel"].sum()

            # Return the most frequent fluo ID: traces should be homogeneous with respect
            # to their fluorophore assignment, but we search anyway for safety.
            fluo_mode = df_grouped["fluo"].agg(
                lambda x: x.mode()[0] if not x.empty else np.nan
            )

        else:
            # Calculate simple average of localizations by TID
            df_grouped = df.groupby("tid")
            x_w = df_grouped["x"].mean()
            y_w = df_grouped["y"].mean()
            z_w = df_grouped["z"].mean()

            # Return the most frequent fluo ID: traces should be homogeneous with respect
            # to their fluorophore assignment, but we search anyway for safety.
            fluo_mode = df_grouped["fluo"].agg(
                lambda x: x.mode()[0] if not x.empty else np.nan
            )

        # We also calculate the mean timestamp (not weighted)
        tim = df_grouped["tim"].mean()

        # Prepare a dataframe with the weighted localizations
        df_loc = pd.DataFrame(
            {
                "tid": x_w.index,
                "tim": tim.to_numpy(),
                "x": x_w.to_numpy(),
                "y": y_w.to_numpy(),
                "z": z_w.to_numpy(),
                "fluo": fluo_mode.to_numpy(),
            }
        )

        # Update the weighted localization dataframe
        self._weighted_localizations = df_loc

        # Flag the results as up-to-date
        self._weighted_localizations_to_be_recomputed = False
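
The effect of `use_weighted_localizations` on the `weighted_localizations` property is easiest to see in a short sketch. This is illustrative only and assumes an already constructed `MinFluxProcessor` instance named `proc`:

# Hedged sketch: `proc` is an assumed, already constructed MinFluxProcessor.
proc.use_weighted_localizations = False
plain = proc.weighted_localizations     # per-TID plain means of x, y, z

proc.use_weighted_localizations = True  # weight each localization by its
weighted = proc.weighted_localizations  # relative photon count ('eco')

In both cases the result is recomputed lazily on the next property access.
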
prop filename : pathlib.Path | None
Expand source code
@property
def filename(self) -> Union[Path, None]:
    """Return the filename if set."""
    return self.dataset.filename

Return the filename if set.

prop filtered_dataframe : pandas.core.frame.DataFrame | None
Expand source code
@property
def filtered_dataframe(self) -> Union[None, pd.DataFrame]:
    """Return dataframe with all filters applied.

    Returns
    -------

    filtered_dataframe: Union[None, pd.DataFrame]
        A Pandas dataframe or None if no file was loaded.
    """
    if self.processed_dataframe is None:
        return None
    if self.current_fluorophore_id == 0:
        return self._filtered_dataframe_all_fluorophores()
    else:
        # Use .loc to filter the dataframe in a single step
        filtered_df = self.processed_dataframe.loc[
            self.processed_dataframe["fluo"] == self.current_fluorophore_id
        ]
        if self._selected_rows_dict is None:
            return None
        selected_indices = self._selected_rows_dict.get(
            self.current_fluorophore_id, []
        )
        return filtered_df.loc[selected_indices]

Return dataframe with all filters applied.

Returns

filtered_dataframe : Union[None, pd.DataFrame]
A Pandas dataframe or None if no file was loaded.
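
How `current_fluorophore_id` gates this property is shown in the following hedged sketch (it assumes an instance `proc`; ID 0 means "all fluorophores", as documented in the class source below):

# Hedged sketch: `proc` is an assumed MinFluxProcessor instance.
proc.current_fluorophore_id = 0      # 0 = all fluorophores
df_all = proc.filtered_dataframe     # union of the per-fluorophore selections

proc.current_fluorophore_id = 1      # restrict to fluorophore 1
df_one = proc.filtered_dataframe     # only rows with fluo == 1 that pass the filters
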
prop filtered_dataframe_stats : pandas.core.frame.DataFrame | None
Expand source code
@property
def filtered_dataframe_stats(self) -> Union[None, pd.DataFrame]:
    """Return dataframe stats with all filters applied.

    Returns
    -------

    filtered_dataframe_stats: Union[None, pd.DataFrame]
        A Pandas dataframe with all data statistics or None if no file was loaded.
    """
    if self._stats_to_be_recomputed:
        self._calculate_statistics()
    return self._filtered_stats_dataframe

Return dataframe stats with all filters applied.

Returns

filtered_dataframe_stats : Union[None, pd.DataFrame]
A Pandas dataframe with all data statistics or None if no file was loaded.
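
Because the statistics are recomputed lazily on access, this property pairs naturally with `filter_by_1d_stats()`. A hedged sketch, assuming an instance `proc` (the value range is illustrative and depends on the units of the loaded data):

# Hedged sketch: `proc` is an assumed MinFluxProcessor instance.
stats = proc.filtered_dataframe_stats        # per-trace statistics
print(stats[["tid", "n", "sxy"]].head())

proc.filter_by_1d_stats("sxy", (0.0, 10.0))  # keep traces with sxy in range
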
prop filtered_raw_data_array
Expand source code
@property
def filtered_raw_data_array(self):
    """Return the raw NumPy array with applied filters for the selected fluorophores.

    This is only compatible with version 1 MinFluxReader (check performed by
    the invoked protected methods).
    """

    # Copy the raw NumPy array
    full_array = self._filtered_raw_data_array_all_fluorophores()
    if full_array is None:
        return None

    if self.current_fluorophore_id == 0:
        return full_array
    else:
        # Filter by the current fluorophore ID
        return full_array[full_array["fluo"] == self.current_fluorophore_id]

Return the raw NumPy array with applied filters for the selected fluorophores.

This is only compatible with version 1 MinFluxReader (check performed by the invoked protected methods).

prop filtered_raw_dataframe : pandas.core.frame.DataFrame | None
Expand source code
@property
def filtered_raw_dataframe(self) -> Union[None, pd.DataFrame]:
    """Return joint dataframe with all filters applied.

    Returns
    -------

    filtered_raw_dataframe: Union[None, pd.DataFrame]
        A Pandas dataframe or None if no file was loaded.
    """
    # This is only compatible with MinFluxReader version 2
    if self.reader.version != 2:
        raise ValueError("Only reader version 2 is supported.")

    # Copy the raw dataframe array
    full_dataframe = self._filtered_raw_dataframe_all_fluorophores()
    if full_dataframe is None:
        return None

    # Get the IIDs from the currently filtered localizations
    iids = self.filtered_dataframe["iid"].to_numpy()

    # Extract all rows matching the current set of iids
    full_dataframe = full_dataframe[full_dataframe["iid"].isin(iids)]

    # If needed, filter by fluorophore ID
    if self.current_fluorophore_id == 0:
        return full_dataframe
    else:
        # Filter by the current fluorophore ID
        return full_dataframe[full_dataframe["fluo"] == self.current_fluorophore_id]

Return joint dataframe with all filters applied.

Returns

filtered_raw_dataframe : Union[None, pd.DataFrame]
A Pandas dataframe or None if no file was loaded.
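
A minimal sketch of the version guard, assuming an instance `proc`:

# Hedged sketch: `proc` is an assumed MinFluxProcessor instance.
try:
    raw = proc.filtered_raw_dataframe  # raw rows matching the surviving iids
except ValueError:
    raw = None                         # data was loaded with a version-1 reader
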
prop fluorophore_names : dict
Expand source code
@property
def fluorophore_names(self) -> dict:
    """Return the fluorophore names mapping (fluo_id -> name)."""
    return self._fluorophore_names.copy()

Return the fluorophore names mapping (fluo_id -> name).
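
The mapping is managed through the setter and getter methods shown in the class source below; a hedged sketch with illustrative dye names, assuming an instance `proc`:

# Hedged sketch: `proc` is an assumed MinFluxProcessor instance; dye names are illustrative.
proc.set_fluorophore_name(1, "AF647")
proc.set_fluorophore_names({2: "CF680"})  # merges with existing names
print(proc.get_fluorophore_name(1))       # -> "AF647"
print(proc.get_fluorophore_name(3))       # -> "3" (falls back to the ID as string)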

prop is_3d : bool
Expand source code
@property
def is_3d(self) -> bool:
    """Return True if the acquisition is 3D.

    Returns
    -------

    is_3d: bool
        True if the acquisition is 3D, False otherwise.
    """
    return self.reader.is_3d

Return True if the acquisition is 3D.

Returns

is_3d : bool
True if the acquisition is 3D, False otherwise.
prop is_tracking
Expand source code
@property
def is_tracking(self):
    """Minimum number of localizations for the trace to be kept."""
    return self.reader.is_tracking

Minimum number of localizations for the trace to be kept.

prop min_trace_length
Expand source code
@property
def min_trace_length(self):
    """Minimum number of localizations for the trace to be kept."""
    return self._min_trace_length

Minimum number of localizations for the trace to be kept.
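
Since the property setter validates the value and re-applies the global filters, assigning a new minimum takes effect immediately; a one-line hedged sketch, assuming an instance `proc`:

# Hedged sketch: `proc` is an assumed MinFluxProcessor instance.
proc.min_trace_length = 4  # drops all traces with fewer than 4 localizations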

prop num_fluorophores : int
Expand source code
@property
def num_fluorophores(self) -> int:
    """Return the number of fluorophores."""
    if self.processed_dataframe is None:
        return 0
    unique_fluos = np.unique(self.processed_dataframe["fluo"].to_numpy())
    return len(unique_fluos[unique_fluos > 0])

Return the number of fluorophores.

prop num_values : int
Expand source code
@property
def num_values(self) -> int:
    """Return the number of values in the (filtered) dataframe.

    Returns
    -------

    n: int
        Number of values in the dataframe after all filters have been applied.
    """
    if self.filtered_dataframe is not None:
        return len(self.filtered_dataframe.index)

    return 0

Return the number of values in the (filtered) dataframe.

Returns

n : int
Number of values in the dataframe after all filters have been applied.
prop processed_dataframe : pandas.core.frame.DataFrame | None
Expand source code
@property
def processed_dataframe(self) -> Union[None, pd.DataFrame]:
    """Return the full dataframe (with valid entries only), with no selections or filters.

    Returns
    -------

    processed_dataframe: Union[None, pd.DataFrame]
        A Pandas dataframe or None if no file was loaded.
    """
    return self.dataset.processed_dataframe

Return the full dataframe (with valid entries only), with no selections or filters.

Returns

processed_dataframe : Union[None, pd.DataFrame]
A Pandas dataframe or None if no file was loaded.
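
A hedged sketch contrasting this property with `filtered_dataframe`, assuming an instance `proc` with data loaded:

# Hedged sketch: `proc` is an assumed MinFluxProcessor instance.
full = proc.processed_dataframe    # all valid rows, no selections or filters
subset = proc.filtered_dataframe   # rows surviving global and dynamic filters
assert len(subset.index) <= len(full.index)
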
prop reader
Expand source code
@property
def reader(self):
    """Return a reader-like interface to the dataset for backwards compatibility.
    
    This property provides access to the underlying dataset using the
    reader interface that existing code expects.
    """
    return self.dataset

Return a reader-like interface to the dataset for backwards compatibility.

This property provides access to the underlying dataset using the reader interface that existing code expects.
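
Putting the pieces together, a typical end-to-end use of the processor looks roughly like the sketch below. The reader import path, the file name, and the 'cfr' column are assumptions made for illustration; the constructor signature is taken from the class source that follows:

# Hedged sketch: import path, file name and the 'cfr' column are assumed.
from pyminflux.processor import MinFluxProcessor
from pyminflux.reader import MinFluxReader

reader = MinFluxReader("experiment.npy")      # hypothetical file
proc = MinFluxProcessor(reader, min_trace_length=4)
proc.filter_by_1d_range("cfr", (0.0, 0.8))    # 'cfr' column assumed present
print(proc.num_values, "localizations kept")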

var state
Expand source code
class MinFluxProcessor:
    """Processor of MINFLUX data."""

    __doc__ = """Allows for filtering and selecting data read by the underlying `MinFluxReader`. Please notice that
     `MinFluxProcessor` makes use of `State.min_trace_length` to make sure that at load and after every
      filtering step, short traces are dropped."""

    __slots__ = [
        "state",
        "dataset",
        "_current_fluorophore_id",
        "_filtered_stats_dataframe",
        "_fluorophore_names",
        "_min_trace_length",
        "_selected_rows_dict",
        "_stats_to_be_recomputed",
        "_weighted_localizations",
        "_weighted_localizations_to_be_recomputed",
        "_use_weighted_localizations",
        "_has_dynamic_filters",
    ]

    def __init__(
        self,
        source: Union[MinFluxReader, MinFluxReaderV2, MinFluxDataset],
        min_trace_length: int = 1,
    ):
        """Constructor.

        Parameters
        ----------

        source: Union[MinFluxReader, MinFluxReaderV2, MinFluxDataset]
            Either a MinFluxReader object or a MinFluxDataset.

        min_trace_length: int (Default = 1)
            Minimum number of localizations for a trace to be kept. Shorter traces are dropped.
        """

        # Convert reader-like sources to dataset if necessary (duck-typed)
        if isinstance(source, MinFluxDataset):
            self.dataset = source
        elif isinstance(source, (MinFluxReader, MinFluxReaderV2)) or hasattr(
            source, "processed_dataframe"
        ):
            self.dataset = MinFluxDataset.from_reader(source)
        else:
            raise TypeError(
                f"source must be MinFluxReader, MinFluxReaderV2, MinFluxDataset, or reader-like; got {type(source)}"
            )

        # Global options (to be applied after every operation)
        self._min_trace_length: int = min_trace_length

        # Cache the filtered stats dataframe
        self._filtered_stats_dataframe = None

        # Keep separate arrays of booleans to cache selection state for all
        # fluorophore IDs.
        self._selected_rows_dict = None
        self._init_selected_rows_dict()

        # Keep track of the selected fluorophore
        # 0 - All (default)
        # 1 - Fluorophore 1
        # 2 - Fluorophore 2
        self._current_fluorophore_id = 0

        # Cache the weighted, averaged TID positions
        self._weighted_localizations = None

        # Keep track whether the statistics and the weighted localizations need to be recomputed
        self._stats_to_be_recomputed = False
        self._weighted_localizations_to_be_recomputed = False

        # Whether to use weighted average for localizations
        self._use_weighted_localizations = False

        # Track whether dynamic (user-applied) filters have been applied
        self._has_dynamic_filters = False

        # Initialize fluorophore names mapping (fluo_id -> name)
        self._fluorophore_names = {}
        self._init_fluorophore_names()

        # Apply the global filters
        self._apply_global_filters()

    def _init_selected_rows_dict(self):
        """Initialize the selected rows array."""
        if self.processed_dataframe is None:
            return

        # Create selection mask
        keep_mask = pd.Series(
            data=np.ones(len(self.processed_dataframe.index), dtype=bool),
            index=self.processed_dataframe.index,
        )

        # Store the mask for each fluorophore present in the data
        unique_fluos = np.unique(self.processed_dataframe["fluo"].to_numpy())
        unique_fluos = unique_fluos[unique_fluos > 0]
        self._selected_rows_dict = {}
        for fluo_id in unique_fluos:
            self._selected_rows_dict[int(fluo_id)] = keep_mask.copy()

    def has_filters_applied(self) -> bool:
        """Check if any dynamic (user-applied) filters have been applied to the data.
        
        This does not count global filters like min_trace_length which are always applied.
        
        Returns
        -------
        has_filters: bool
            True if any dynamic filters are active, False otherwise.
        """
        return self._has_dynamic_filters

    def _init_fluorophore_names(self):
        """Initialize fluorophore names with default values (string representation of fluo ID)."""
        if self.processed_dataframe is None:
            return
        
        unique_fluos = np.unique(self.processed_dataframe["fluo"].to_numpy())
        unique_fluos = unique_fluos[unique_fluos > 0]
        for fluo_id in unique_fluos:
            fluo_id_int = int(fluo_id)
            if fluo_id_int not in self._fluorophore_names:
                self._fluorophore_names[fluo_id_int] = str(fluo_id_int)

    @property
    def reader(self):
        """Return a reader-like interface to the dataset for backwards compatibility.
        
        This property provides access to the underlying dataset using the
        reader interface that existing code expects.
        """
        return self.dataset

    @property
    def is_tracking(self):
        """Minimum number of localizations for the trace to be kept."""
        return self.reader.is_tracking

    @property
    def min_trace_length(self):
        """Minimum number of localizations for the trace to be kept."""
        return self._min_trace_length

    @property
    def z_scaling_factor(self):
        """Returns the scaling factor for the z coordinates from the underlying MinFluxReader."""
        return self.reader.z_scaling_factor

    @min_trace_length.setter
    def min_trace_length(self, value):
        if value < 1 or int(value) != value:
            raise ValueError(
                "MinFluxProcessor.min_trace_length must be a positive integer!"
            )
        self._min_trace_length = value

        # Run the global filters
        self._apply_global_filters()

    @property
    def is_3d(self) -> bool:
        """Return True if the acquisition is 3D.

        Returns
        -------

        is_3d: bool
            True if the acquisition is 3D, False otherwise.
        """
        return self.reader.is_3d

    @property
    def num_values(self) -> int:
        """Return the number of values in the (filtered) dataframe.

        Returns
        -------

        n: int
            Number of values in the dataframe after all filters have been applied.
        """
        if self.filtered_dataframe is not None:
            return len(self.filtered_dataframe.index)

        return 0

    @property
    def current_fluorophore_id(self) -> int:
        """Return current fluorophore ID (0 for all)."""
        return self._current_fluorophore_id

    @current_fluorophore_id.setter
    def current_fluorophore_id(self, fluorophore_id: int) -> None:
        """Set current fluorophore ID (0 for all)."""

        # Validate fluorophore ID: 0 for all, or 1 to num_fluorophores
        if fluorophore_id < 0:
            raise ValueError(f"Fluorophore ID must be non-negative, got {fluorophore_id}.")
        
        if fluorophore_id > 0 and fluorophore_id > self.num_fluorophores:
            raise ValueError(
                f"Fluorophore ID {fluorophore_id} is out of range. "
                f"Valid range is 0 (all) or 1-{self.num_fluorophores}."
            )

        # Set the new fluorophore_id
        self._current_fluorophore_id = fluorophore_id

        # Apply the global filters
        self._apply_global_filters()

        # Flag stats to be recomputed
        self._stats_to_be_recomputed = True

    @property
    def num_fluorophores(self) -> int:
        """Return the number of fluorophores."""
        if self.processed_dataframe is None:
            return 0
        unique_fluos = np.unique(self.processed_dataframe["fluo"].to_numpy())
        return len(unique_fluos[unique_fluos > 0])

    @property
    def fluorophore_names(self) -> dict:
        """Return the fluorophore names mapping (fluo_id -> name)."""
        return self._fluorophore_names.copy()

    def set_fluorophore_name(self, fluo_id: int, name: str):
        """Set the name for a specific fluorophore ID.
        
        Parameters
        ----------
        fluo_id: int
            The fluorophore ID (must be >= 1)
        name: str
            The name to assign to this fluorophore
        """
        if fluo_id < 1:
            raise ValueError(f"Fluorophore ID must be >= 1, got {fluo_id}")
        if not isinstance(name, str):
            raise ValueError(f"Fluorophore name must be a string, got {type(name)}")
        self._fluorophore_names[fluo_id] = name

    def set_fluorophore_names(self, names: dict):
        """Set fluorophore names from a dictionary mapping.
        
        This method updates the fluorophore names dictionary, preserving
        any existing names for fluorophore IDs not included in the names parameter.
        
        Parameters
        ----------
        names: dict
            Dictionary mapping fluo_id (int) to name (str)
        """
        if not isinstance(names, dict):
            raise ValueError(f"Names must be a dictionary, got {type(names)}")
        for fluo_id, name in names.items():
            if not isinstance(fluo_id, int) or fluo_id < 1:
                raise ValueError(f"Fluorophore ID must be an integer >= 1, got {fluo_id}")
            if not isinstance(name, str):
                raise ValueError(f"Fluorophore name must be a string, got {type(name)}")
        # Update the dictionary instead of replacing it to preserve existing names
        self._fluorophore_names.update(names)

    def get_fluorophore_name(self, fluo_id: int) -> str:
        """Get the name for a specific fluorophore ID.
        
        Parameters
        ----------
        fluo_id: int
            The fluorophore ID
            
        Returns
        -------
        name: str
            The name of the fluorophore (defaults to string representation of ID if not set)
        """
        return self._fluorophore_names.get(fluo_id, str(fluo_id))

    def _filtered_raw_data_array_all_fluorophores(self):
        """Return the raw NumPy array with applied filters (for all fluorophores).

        This is only compatible with version 1 MinFluxReader (check performed by
        the invoked protected methods).
        """

        # Copy the raw NumPy array
        raw_array = self.reader.valid_raw_data_array
        if raw_array is None:
            return None

        if self.processed_dataframe is None or self._selected_rows_dict is None:
            return None

        # Append the fluorophore ID data
        raw_array["fluo"] = self.processed_dataframe["fluo"].astype(np.uint8)

        # Extract combination of all fluorophore filtered dataframes
        combined_mask = np.zeros(len(self.processed_dataframe), dtype=bool)
        for fluo_id, fluo_mask in self._selected_rows_dict.items():
            combined_mask |= (self.processed_dataframe["fluo"] == fluo_id) & fluo_mask
        
        return raw_array[combined_mask]

    @property
    def filtered_raw_data_array(self):
        """Return the raw NumPy array with applied filters for the selected fluorophores.

        This is only compatible with version 1 MinFluxReader (check performed by
        the invoked protected methods).
        """

        # Copy the raw NumPy array
        full_array = self._filtered_raw_data_array_all_fluorophores()
        if full_array is None:
            return None

        if self.current_fluorophore_id == 0:
            return full_array
        else:
            # Filter by the current fluorophore ID
            return full_array[full_array["fluo"] == self.current_fluorophore_id]

    @property
    def processed_dataframe(self) -> Union[None, pd.DataFrame]:
        """Return the full dataframe (with valid entries only), with no selections or filters.

        Returns
        -------

        processed_dataframe: Union[None, pd.DataFrame]
            A Pandas dataframe or None if no file was loaded.
        """
        return self.dataset.processed_dataframe

    @property
    def filename(self) -> Union[Path, None]:
        """Return the filename if set."""
        return self.dataset.filename

    def replace_dataset(self, new_dataset: MinFluxDataset):
        """Replace the current dataset with a new one.
        
        This is useful when combining datasets - the processor's dataset
        can be replaced with the combined dataset without losing filter state
        or other settings.
        
        Parameters
        ----------
        new_dataset: MinFluxDataset
            The new dataset to use.
        """
        # Store current settings
        old_min_trace_length = self._min_trace_length
        old_use_weighted = self._use_weighted_localizations
        old_fluorophore_names = self._fluorophore_names.copy()
        
        # Replace the dataset
        self.dataset = new_dataset
        
        # Re-initialize selection and filters
        self._selected_rows_dict = None
        self._init_selected_rows_dict()
        
        # Re-initialize fluorophore names (preserving custom names where IDs match)
        self._fluorophore_names = {}
        self._init_fluorophore_names()
        
        # Restore custom names for matching fluorophore IDs
        for fluo_id, name in old_fluorophore_names.items():
            if fluo_id in self._fluorophore_names:
                self._fluorophore_names[fluo_id] = name
        
        # Restore settings
        self._min_trace_length = old_min_trace_length
        self._use_weighted_localizations = old_use_weighted
        
        # Reset current fluorophore to "all"
        self._current_fluorophore_id = 0
        
        # Reset dynamic filters flag (dataset replacement clears all filters)
        self._has_dynamic_filters = False
        
        # Apply global filters
        self._apply_global_filters()
        
        # Flag derived data to be recomputed
        self._stats_to_be_recomputed = True
        self._weighted_localizations_to_be_recomputed = True

    def _filtered_dataframe_all_fluorophores(self) -> Union[None, pd.DataFrame]:
        """Return joint dataframe for all fluorophores and with all filters applied.

        Returns
        -------

        filtered_dataframe_all: Union[None, pd.DataFrame]
            A Pandas dataframe or None if no file was loaded.
        """
        if self.processed_dataframe is None or self._selected_rows_dict is None:
            return None

        # Extract combination of all fluorophore filtered dataframes
        combined_mask = np.zeros(len(self.processed_dataframe), dtype=bool)
        for fluo_id, fluo_mask in self._selected_rows_dict.items():
            combined_mask |= (self.processed_dataframe["fluo"] == fluo_id) & fluo_mask
        
        return self.processed_dataframe.loc[combined_mask]

    def _filtered_raw_dataframe_all_fluorophores(self) -> Union[None, pd.DataFrame]:
        """Return joint raw dataframe (all properties) for all fluorophores and with all filters applied.

        Returns
        -------

        filtered_dataframe_all: Union[None, pd.DataFrame]
            A Pandas dataframe or None if no file was loaded.
        """
        # This is only compatible with MinFluxReader version 2
        if self.reader.version != 2:
            raise ValueError("Only reader version 2 is supported.")

        # valid_full_raw_dataframe is a MinFluxReaderV2 property
        raw_array = self.reader.valid_full_raw_dataframe

        # Now extract the fluorophore assignments from self.processed_dataframe and
        # expand them onto the raw array
        if "fluo" in raw_array.columns:
            raw_fluo = raw_array["fluo"].fillna(0).astype(np.uint8).to_numpy(copy=True)
        else:
            raw_fluo = np.zeros(len(raw_array), dtype=np.uint8)

        if "iid" in raw_array.columns and "iid" in self.processed_dataframe.columns:
            fluo_map = dict(
                zip(
                    self.processed_dataframe["iid"],
                    self.processed_dataframe["fluo"].astype(np.uint8),
                )
            )
            mapped_fluo = raw_array["iid"].map(fluo_map)
            matched_mask = mapped_fluo.notna().to_numpy()
            if matched_mask.any():
                raw_fluo[matched_mask] = mapped_fluo.loc[matched_mask].astype(np.uint8).to_numpy()

        raw_array["fluo"] = raw_fluo.astype(np.uint8)

        # Return the array with the assigned fluorophores
        return raw_array

    @property
    def filtered_dataframe(self) -> Union[None, pd.DataFrame]:
        """Return dataframe with all filters applied.

        Returns
        -------

        filtered_dataframe: Union[None, pd.DataFrame]
            A Pandas dataframe or None if no file was loaded.
        """
        if self.processed_dataframe is None:
            return None
        if self.current_fluorophore_id == 0:
            return self._filtered_dataframe_all_fluorophores()
        else:
            # Use .loc to filter the dataframe in a single step
            filtered_df = self.processed_dataframe.loc[
                self.processed_dataframe["fluo"] == self.current_fluorophore_id
            ]
            if self._selected_rows_dict is None:
                return None
            selected_indices = self._selected_rows_dict.get(
                self.current_fluorophore_id, []
            )
            return filtered_df.loc[selected_indices]

    @property
    def filtered_raw_dataframe(self) -> Union[None, pd.DataFrame]:
        """Return joint dataframe with all filters applied.

        Returns
        -------

        filtered_raw_dataframe: Union[None, pd.DataFrame]
            A Pandas dataframe or None if no file was loaded.
        """
        # This is only compatible with MinFluxReader version 2
        if self.reader.version != 2:
            raise ValueError("Only reader version 2 is supported.")

        # Copy the raw dataframe array
        full_dataframe = self._filtered_raw_dataframe_all_fluorophores()
        if full_dataframe is None:
            return None

        # Get the IIDs from the currently filtered localizations
        iids = self.filtered_dataframe["iid"].to_numpy()

        # Extract all rows matching the current set of iids
        full_dataframe = full_dataframe[full_dataframe["iid"].isin(iids)]

        # If needed, filter by fluorophore ID
        if self.current_fluorophore_id == 0:
            return full_dataframe
        else:
            # Filter by the current fluorophore ID
            return full_dataframe[full_dataframe["fluo"] == self.current_fluorophore_id]

    @property
    def filtered_dataframe_stats(self) -> Union[None, pd.DataFrame]:
        """Return dataframe stats with all filters applied.

        Returns
        -------

        filtered_dataframe_stats: Union[None, pd.DataFrame]
            A Pandas dataframe with all data statistics or None if no file was loaded.
        """
        if self._stats_to_be_recomputed:
            self._calculate_statistics()
        return self._filtered_stats_dataframe

    @property
    def weighted_localizations(self) -> Union[None, pd.DataFrame]:
        """Return the average (x, y, z) position per TID weighted by the relative photon count."""
        if self._weighted_localizations_to_be_recomputed:
            self._calculate_weighted_positions()
        return self._weighted_localizations

    @property
    def use_weighted_localizations(self) -> bool:
        """Whether to use weighted average to calculate the mean localization per TID."""
        return self._use_weighted_localizations

    @use_weighted_localizations.setter
    def use_weighted_localizations(self, value: bool):
        """Whether to use weighted average to calculate the mean localization per TID."""
        self._use_weighted_localizations = value
        self._weighted_localizations_to_be_recomputed = True

    @classmethod
    def processed_properties(cls):
        """Return the processed dataframe columns."""
        return MinFluxReader.processed_properties()

    @classmethod
    def trace_stats_properties(cls):
        """Return the columns of the filtered_dataframe_stats."""
        return [
            "tid",
            "n",
            "fluo",
            "mx",
            "my",
            "mz",
            "sx",
            "sy",
            "sxy",
            "exy",
            "rms_xy",
            "sz",
            "ez",
            "mtim",
            "tim_tot",
        ]

    @classmethod
    def trace_stats_with_tracking_properties(cls):
        """Return the columns of the filtered_dataframe_stats with tracking columns."""
        return MinFluxProcessor.trace_stats_properties() + [
            "avg_speed",
            "total_dist",
        ]

    def reset(self):
        """Drops all dynamic filters and resets the data to the processed data frame with global filters."""

        # Clear the selection per fluorophore; they will be reinitialized as
        # all selected at the first access.
        self._init_selected_rows_dict()

        # Reset the mapping to the corresponding fluorophore
        self.processed_dataframe["fluo"] = 1

        # Reset fluorophore names (clear custom names first)
        self._fluorophore_names = {}
        self._init_fluorophore_names()

        # Default fluorophore ID is 0 (all fluorophores)
        self.current_fluorophore_id = 0

        # Reset dynamic filters flag
        self._has_dynamic_filters = False

        # Apply global filters
        self._apply_global_filters()

    def set_fluorophore_ids(self, fluorophore_ids: NDArray[np.uint8]):
        """Assign the fluorophore IDs to current filtered dataset.
        
        This method assigns new fluorophore IDs only to the rows in the current
        filtered dataset. All other rows remain unchanged.
        """
        if self.filtered_dataframe is None:
            return
        if len(fluorophore_ids) != len(self.filtered_dataframe.index):
            raise ValueError(
                "The number of fluorophore IDs does not match the number of entries in the dataframe."
            )

        # Get the actual indices from the filtered dataframe
        filtered_indices = self.filtered_dataframe.index
        
        # Assign the new fluorophore IDs to those specific indices only
        # All other rows remain unchanged (preserving other fluorophores)
        self.processed_dataframe.loc[filtered_indices, "fluo"] = fluorophore_ids.astype(np.uint8)

        # Apply global filters
        self._init_selected_rows_dict()
        self._init_fluorophore_names()
        self._apply_global_filters()

    def set_full_fluorophore_ids(self, fluorophore_ids: NDArray[np.uint8]):
        """Assign the fluorophore IDs to the original, full dataframe ignoring current filters."""
        if self.processed_dataframe is None:
            return
        if len(fluorophore_ids) != len(self.processed_dataframe.index):
            raise ValueError(
                "The number of fluorophore IDs does not match the number of entries in the dataframe."
            )
        self.processed_dataframe["fluo"] = fluorophore_ids.astype(np.uint8)

        # Apply global filters
        self._init_selected_rows_dict()
        self._init_fluorophore_names()
        self._apply_global_filters()

    def select_by_rows(
        self, indices: np.ndarray, from_weighted_locs: bool = False
    ) -> Union[None, pd.DataFrame]:
        """Return view on a subset of the filtered dataset or the weighted localisations defined by the passed
        DataFrame row indices.

        The underlying dataframe is not modified.

        Parameters
        ----------

        indices: np.ndarray
            Logical array for selecting the elements to be returned.

        from_weighted_locs: bool
            If True, select from the weighted_localizations dataframe; otherwise, from the filtered_dataframe.

        Returns
        -------

        subset: Union[None, pd.DataFrame]
            A view on a subset of the dataframe defined by the passed indices, or None if no file was loaded.
        """
        if from_weighted_locs:
            if self._weighted_localizations is None:
                return None
            return self._weighted_localizations.iloc[indices]
        else:
            if self.filtered_dataframe is None:
                return None
            return self.filtered_dataframe.iloc[indices]

    def select_by_series_iloc(
        self, iloc: np.ndarray, from_weighted_locs: bool = False
    ) -> Union[None, pd.DataFrame]:
        """Return view on a subset of the filtered dataset or the weighted localisations defined by the passed
        DataFrame index locations.

        The underlying dataframe is not modified.

        Parameters
        ----------

        iloc: np.ndarray
            Array of DataFrame index labels used to select rows (applied via `.loc`).

        from_weighted_locs: bool
            If True, select from the weighted_localizations dataframe; otherwise, from the filtered_dataframe.

        Returns
        -------

        subset: Union[None, pd.DataFrame]
            A view on a subset of the dataframe defined by the passed indices, or None if no file was loaded.
        """
        if from_weighted_locs:
            if self._weighted_localizations is None:
                return None
            return self._weighted_localizations.loc[iloc]
        else:
            if self.filtered_dataframe is None:
                return None
            return self.filtered_dataframe.loc[iloc]

    def select_by_1d_range(
        self, x_prop, x_range, from_weighted_locs: bool = False
    ) -> Union[None, pd.DataFrame]:
        """Return a view on a subset of the filtered dataset or the weighted localisations defined by the passed
        parameter and corresponding range.

        The underlying dataframe is not modified.

        Parameters
        ----------

        x_prop: str
            Property to be filtered by corresponding x_range.

        x_range: tuple
            Tuple containing the minimum and maximum values for the selected property.

        from_weighted_locs: bool
            If True, select from the weighted_localizations dataframe; otherwise, from the filtered_dataframe.

        Returns
        -------

        subset: Union[None, pd.DataFrame]
            A view on a subset of the dataframe defined by the passed property and range, or None if no file was loaded.
        """

        # Make sure that the range is increasing
        x_min = x_range[0]
        x_max = x_range[1]
        if x_max < x_min:
            x_max, x_min = x_min, x_max

        if from_weighted_locs:
            if self._weighted_localizations is None:
                return None
            return self._weighted_localizations.loc[
                (self._weighted_localizations[x_prop] >= x_min)
                & (self._weighted_localizations[x_prop] < x_max)
            ]
        else:
            # Work with currently selected rows
            if self.filtered_dataframe is None:
                return None
            df = self.filtered_dataframe
            return df.loc[(df[x_prop] >= x_min) & (df[x_prop] < x_max)]

    def select_by_2d_range(
        self, x_prop, y_prop, x_range, y_range, from_weighted_locs: bool = False
    ) -> Union[None, pd.DataFrame]:
        """Return a view on a subset of the filtered dataset or the weighted localisations defined by the passed
        parameters and corresponding ranges.

        The underlying dataframe is not modified.

        Parameters
        ----------

        x_prop: str
            First property to be filtered by corresponding x_range.

        y_prop: str
            Second property to be filtered by corresponding y_range.

        x_range: tuple
            Tuple containing the minimum and maximum values for the first property.

        y_range: tuple
            Tuple containing the minimum and maximum values for the second property.

        from_weighted_locs: bool
            If True, select from the weighted_localizations dataframe; otherwise, from the filtered_dataframe.

        Returns
        -------

        subset: Union[None, pd.DataFrame]
            A view on a subset of the dataframe defined by the passed properties and ranges, or None if no file
            was loaded.
        """

        # Make sure that the ranges are increasing
        x_min = x_range[0]
        x_max = x_range[1]
        if x_max < x_min:
            x_max, x_min = x_min, x_max

        y_min = y_range[0]
        y_max = y_range[1]
        if y_max < y_min:
            y_max, y_min = y_min, y_max

        if from_weighted_locs:
            if self._weighted_localizations is None:
                return None
            return self._weighted_localizations.loc[
                (self._weighted_localizations[x_prop] >= x_min)
                & (self._weighted_localizations[x_prop] < x_max)
                & (self._weighted_localizations[y_prop] >= y_min)
                & (self._weighted_localizations[y_prop] < y_max)
            ]
        else:
            # Work with currently selected rows
            if self.filtered_dataframe is None:
                return None
            df = self.filtered_dataframe
            return df.loc[
                (df[x_prop] >= x_min)
                & (df[x_prop] < x_max)
                & (df[y_prop] >= y_min)
                & (df[y_prop] < y_max)
            ]

    def filter_by_2d_range(self, x_prop, y_prop, x_range, y_range):
        """Filter dataset by the extracting a rectangular ROI over two parameters and two ranges.

        Parameters
        ----------

        x_prop: str
            First property to be filtered by corresponding x_range.

        y_prop: str
            Second property to be filtered by corresponding y_range.

        x_range: tuple
            Tuple containing the minimum and maximum values for the first property.

        y_range: tuple
            Tuple containing the minimum and maximum values for the second property.
        """

        # Make sure that the ranges are increasing
        x_min = x_range[0]
        x_max = x_range[1]
        if x_max < x_min:
            x_max, x_min = x_min, x_max

        y_min = y_range[0]
        y_max = y_range[1]
        if y_max < y_min:
            y_max, y_min = y_min, y_max

        if self.current_fluorophore_id == 0:
            # Apply to all fluorophores
            for fluo_id in self._selected_rows_dict.keys():
                self._selected_rows_dict[fluo_id] = (
                    self._selected_rows_dict[fluo_id]
                    & (self.filtered_dataframe[x_prop] >= x_min)
                    & (self.filtered_dataframe[x_prop] < x_max)
                    & (self.filtered_dataframe[y_prop] >= y_min)
                    & (self.filtered_dataframe[y_prop] < y_max)
                )
        elif self.current_fluorophore_id in self._selected_rows_dict:
            # Apply to specific fluorophore
            fluo_id = self.current_fluorophore_id
            self._selected_rows_dict[fluo_id] = (
                self._selected_rows_dict[fluo_id]
                & (self.filtered_dataframe[x_prop] >= x_min)
                & (self.filtered_dataframe[x_prop] < x_max)
                & (self.filtered_dataframe[y_prop] >= y_min)
                & (self.filtered_dataframe[y_prop] < y_max)
            )

        # Make sure to always apply the global filters
        self._apply_global_filters()

        # Mark that dynamic filters have been applied
        self._has_dynamic_filters = True

        # Make sure to flag the derived data to be recomputed
        self._stats_to_be_recomputed = True
        self._weighted_localizations_to_be_recomputed = True

    def _apply_global_filters(self):
        """Apply filters that are defined in the global application configuration."""

        if self.current_fluorophore_id == 0:
            # Apply to all fluorophores
            for fluo_id in self._selected_rows_dict.keys():
                self._selected_rows_dict[fluo_id] = self._filter_by_tid_length(fluo_id)
        elif self.current_fluorophore_id in self._selected_rows_dict:
            # Apply to specific fluorophore
            self._selected_rows_dict[self.current_fluorophore_id] = self._filter_by_tid_length(
                self.current_fluorophore_id
            )

        # Make sure to flag the derived data to be recomputed
        self._stats_to_be_recomputed = True
        self._weighted_localizations_to_be_recomputed = True

    def _filter_by_tid_length(self, index):
        """Return a selection mask that keeps only rows whose TID occurs at
        least `min_trace_length` times among the currently selected rows."""
        # Make sure to count only currently selected rows
        df = self.processed_dataframe.copy()
        df.loc[np.invert(self._selected_rows_dict[index]), "tid"] = np.nan

        # Keep all rows whose TID count is at least self.min_trace_length
        counts = df["tid"].value_counts(normalize=False)
        return df["tid"].isin(counts[counts >= self.min_trace_length].index)

    def filter_by_single_threshold(
        self, prop: str, threshold: Union[int, float], larger_than: bool = True
    ):
        """Apply single threshold to filter values either lower or higher (equal) than threshold for given property."""

        # Apply filter
        if self.current_fluorophore_id == 0:
            # Apply to all fluorophores
            for fluo_id in self._selected_rows_dict.keys():
                if larger_than:
                    self._selected_rows_dict[fluo_id] = self._selected_rows_dict[fluo_id] & (
                        self.filtered_dataframe[prop] >= threshold
                    )
                else:
                    self._selected_rows_dict[fluo_id] = self._selected_rows_dict[fluo_id] & (
                        self.filtered_dataframe[prop] < threshold
                    )
        elif self.current_fluorophore_id in self._selected_rows_dict:
            # Apply to specific fluorophore
            fluo_id = self.current_fluorophore_id
            if larger_than:
                self._selected_rows_dict[fluo_id] = self._selected_rows_dict[fluo_id] & (
                    self.filtered_dataframe[prop] >= threshold
                )
            else:
                self._selected_rows_dict[fluo_id] = self._selected_rows_dict[fluo_id] & (
                    self.filtered_dataframe[prop] < threshold
                )

        # Apply the global filters
        self._apply_global_filters()

        # Mark that dynamic filters have been applied
        self._has_dynamic_filters = True

        # Make sure to flag the derived data to be recomputed
        self._stats_to_be_recomputed = True
        self._weighted_localizations_to_be_recomputed = True

    def filter_by_1d_range(self, x_prop: str, x_range: tuple):
        """Apply min and max thresholding to the given property.

        Parameters
        ----------

        x_prop: str
            Name of the property (dataframe column) to filter.

        x_range: tuple
            Tuple containing the minimum and maximum values for the selected property.
        """

        # Make sure that the ranges are increasing
        x_min = x_range[0]
        x_max = x_range[1]
        if x_max < x_min:
            x_max, x_min = x_min, x_max

        # Apply filter
        if self.current_fluorophore_id == 0:
            # Apply to all fluorophores
            for fluo_id in self._selected_rows_dict.keys():
                self._selected_rows_dict[fluo_id] = (
                    self._selected_rows_dict[fluo_id]
                    & (self.filtered_dataframe[x_prop] >= x_min)
                    & (self.filtered_dataframe[x_prop] < x_max)
                )
        elif self.current_fluorophore_id in self._selected_rows_dict:
            # Apply to specific fluorophore
            fluo_id = self.current_fluorophore_id
            self._selected_rows_dict[fluo_id] = (
                self._selected_rows_dict[fluo_id]
                & (self.filtered_dataframe[x_prop] >= x_min)
                & (self.filtered_dataframe[x_prop] < x_max)
            )

        # Apply the global filters
        self._apply_global_filters()

        # Mark that dynamic filters have been applied
        self._has_dynamic_filters = True

        # Make sure to flag the derived data to be recomputed
        self._stats_to_be_recomputed = True
        self._weighted_localizations_to_be_recomputed = True

    def filter_by_1d_range_complement(self, prop: str, x_range: tuple):
        """Apply min and max thresholding to the given property but keep the
        data outside the range (i.e., crop the selected range).

        Parameters
        ----------

        prop: str
            Name of the property (dataframe column) to filter.

        x_range: tuple
            Tuple containing the minimum and maximum values for cropping the selected property.
        """

        # Make sure that the ranges are increasing
        x_min = x_range[0]
        x_max = x_range[1]
        if x_max < x_min:
            x_max, x_min = x_min, x_max

        # Apply filter
        if self.current_fluorophore_id == 0:
            # Apply to all fluorophores
            for fluo_id in self._selected_rows_dict.keys():
                self._selected_rows_dict[fluo_id] = self._selected_rows_dict[fluo_id] & (
                    (self.filtered_dataframe[prop] < x_min)
                    | (self.filtered_dataframe[prop] >= x_max)
                )
        elif self.current_fluorophore_id in self._selected_rows_dict:
            # Apply to specific fluorophore
            fluo_id = self.current_fluorophore_id
            self._selected_rows_dict[fluo_id] = self._selected_rows_dict[fluo_id] & (
                (self.filtered_dataframe[prop] < x_min)
                | (self.filtered_dataframe[prop] >= x_max)
            )

        # Apply the global filters
        self._apply_global_filters()

        # Mark that dynamic filters have been applied
        self._has_dynamic_filters = True

        # Make sure to flag the derived data to be recomputed
        self._stats_to_be_recomputed = True
        self._weighted_localizations_to_be_recomputed = True

    def filter_by_1d_stats(self, x_prop_stats: str, x_range: tuple):
        """Filter TIDs by min and max thresholding using the given property from the stats dataframe.

        Parameters
        ----------

        x_prop_stats: str
            Name of the property (column) from the stats dataframe used to filter.

        x_range: tuple
            Tuple containing the minimum and maximum values for the selected property.
        """

        # Make sure the property exists in the stats dataframe
        if x_prop_stats not in self.filtered_dataframe_stats.columns:
            raise ValueError(
                f"The property {x_prop_stats} does not exist in `filtered_dataframe_stats`."
            )

        # Make sure that the ranges are increasing
        x_min = x_range[0]
        x_max = x_range[1]
        if x_max < x_min:
            x_max, x_min = x_min, x_max

        # Find all TIDs for the current fluorophore ID for which the requested stats property is inside the range
        tids_to_keep = self.filtered_dataframe_stats[
            (
                (self.filtered_dataframe_stats[x_prop_stats] >= x_min)
                & (self.filtered_dataframe_stats[x_prop_stats] <= x_max)
            )
        ]["tid"].to_numpy()

        # Rows of the filtered dataframe to keep
        rows_to_keep = self.filtered_dataframe["tid"].isin(tids_to_keep)

        # Apply filter
        if self.current_fluorophore_id == 0:
            # Apply to all fluorophores
            for fluo_id in self._selected_rows_dict.keys():
                self._selected_rows_dict[fluo_id] = self._selected_rows_dict[fluo_id] & rows_to_keep
        elif self.current_fluorophore_id in self._selected_rows_dict:
            # Apply to specific fluorophore
            self._selected_rows_dict[self.current_fluorophore_id] = (
                self._selected_rows_dict[self.current_fluorophore_id] & rows_to_keep
            )

        # Apply the global filters
        self._apply_global_filters()

        # Mark that dynamic filters have been applied
        self._has_dynamic_filters = True

        # Make sure to flag the derived data to be recomputed
        self._stats_to_be_recomputed = True
        self._weighted_localizations_to_be_recomputed = True

    def _calculate_statistics(self):
        """Calculate per-trace statistics."""

        # Make sure we have processed dataframe to work on
        if self.processed_dataframe is None:
            return

        # Only recompute statistics if needed
        if not self._stats_to_be_recomputed:
            return

        # Work with currently selected rows
        df = self.filtered_dataframe

        # Calculate the statistics
        df_tid = self.calculate_statistics_on(df, self.reader.is_tracking)

        # Store the results
        self._filtered_stats_dataframe = df_tid

        # Flag the statistics to be computed
        self._stats_to_be_recomputed = False

    @staticmethod
    def calculate_statistics_on(
        df: pd.DataFrame, is_tracking: bool = False
    ) -> pd.DataFrame:
        """Calculate per-trace statistics for the selected dataframe.

        Parameters
        ----------

        df: pd.DataFrame
            DataFrame (view) generated by one of the `select_by_*` methods.

        is_tracking: bool
            Whether the data comes from a tracking instead of a localization experiment.

        Returns
        -------
        df_stats: pd.DataFrame
            Per-trace statistics calculated on the passed DataFrame selection (view).
        """

        # Prepare a dataframe with the statistics
        if is_tracking:
            df_tid = pd.DataFrame(
                columns=MinFluxProcessor.trace_stats_with_tracking_properties()
            )
        else:
            df_tid = pd.DataFrame(columns=MinFluxProcessor.trace_stats_properties())

        # Calculate some statistics per TID on the passed dataframe
        df_grouped = df.groupby("tid")

        # Base statistics
        tid = df_grouped["tid"].first().to_numpy()
        n = df_grouped["tid"].count().to_numpy()
        mx = df_grouped["x"].mean().to_numpy()
        my = df_grouped["y"].mean().to_numpy()
        mz = df_grouped["z"].mean().to_numpy()
        sx = df_grouped["x"].std().to_numpy()
        sy = df_grouped["y"].std().to_numpy()
        sz = df_grouped["z"].std().to_numpy()
        tmp = np.power(sx, 2) + np.power(sy, 2)
        sxy = np.sqrt(tmp)
        rms_xy = np.sqrt(tmp / 2)
        exy = sxy / np.sqrt(n)
        ez = sz / np.sqrt(n)
        fluo = df_grouped["fluo"].agg(lambda x: mode(x, keepdims=True)[0][0]).to_numpy()
        mtim = df_grouped["tim"].mean().to_numpy()
        tot_tim, _, _ = calculate_trace_time(df)

        # Optional tracking statistics
        if is_tracking:
            total_distance, _, _ = calculate_total_distance_traveled(df)
            speeds = (
                total_distance["displacement"].to_numpy()
                / tot_tim["tim_tot"].to_numpy()
            )

        # Store trace stats
        df_tid["tid"] = tid  # Trace ID
        df_tid["n"] = n  # Number of localizations for given trace ID
        df_tid["mx"] = mx  # x mean localization
        df_tid["my"] = my  # y mean localization
        df_tid["mz"] = mz  # z mean localization
        df_tid["sx"] = sx  # x localization precision
        df_tid["sy"] = sy  # y localization precision
        df_tid["sxy"] = sxy  # Lateral (x, y) localization precision
        df_tid["rms_xy"] = rms_xy  # Lateral root mean square
        df_tid["exy"] = exy  # Standard error of sxy
        df_tid["sz"] = sz  # z localization precision
        df_tid["ez"] = ez  # Standard error of ez
        df_tid["fluo"] = fluo  # Assigned fluorophore ID
        df_tid["mtim"] = mtim  # Average time per trace
        df_tid["tim_tot"] = tot_tim["tim_tot"].to_numpy()  # Total time per trace
        if is_tracking:
            df_tid["avg_speed"] = speeds  # Average speed per trace
            df_tid["total_dist"] = total_distance[
                "displacement"
            ].to_numpy()  # Total travelled distance per trace

        # ["sx", "sy", "sxy", "rms_xy", "exy", "sz", "ez"] columns will contain
        # np.nan if n == 1: we replace them with 0.0.
        # @TODO: should this be changed? It could be a global option.
        df_tid[["sx", "sy", "sxy", "rms_xy", "exy", "sz", "ez"]] = df_tid[
            ["sx", "sy", "sxy", "rms_xy", "exy", "sz", "ez"]
        ].fillna(value=0.0)

        # Return the results
        return df_tid

    def _calculate_weighted_positions(self):
        """Calculate per-trace localization weighted by relative photon count."""

        if self.filtered_dataframe is None:
            return

        if not self._weighted_localizations_to_be_recomputed:
            return

        # Work with a copy of a subset of current filtered dataframe
        df = self.filtered_dataframe[
            ["tid", "tim", "eco", "x", "y", "z", "fluo"]
        ].copy()

        if self._use_weighted_localizations:
            # Calculate weights for each coordinate based on 'eco'
            total_eco_per_tid = df.groupby("tid")["eco"].transform("sum")
            eco_rel = df["eco"] / total_eco_per_tid

            # Calculate weighted positions
            df.loc[:, "x_rel"] = df["x"] * eco_rel
            df.loc[:, "y_rel"] = df["y"] * eco_rel
            df.loc[:, "z_rel"] = df["z"] * eco_rel

            # Summing up the relative contributions
            df_grouped = df.groupby("tid")
            x_w = df_grouped["x_rel"].sum()
            y_w = df_grouped["y_rel"].sum()
            z_w = df_grouped["z_rel"].sum()

            # Return the most frequent fluo ID: traces should be homogeneous with respect
            # to their fluorophore assignment, but we search anyway for safety.
            fluo_mode = df_grouped["fluo"].agg(
                lambda x: x.mode()[0] if not x.empty else np.nan
            )

        else:
            # Calculate simple average of localizations by TID
            df_grouped = df.groupby("tid")
            x_w = df_grouped["x"].mean()
            y_w = df_grouped["y"].mean()
            z_w = df_grouped["z"].mean()

            # Return the most frequent fluo ID: traces should be homogeneous with respect
            # to their fluorophore assignment, but we search anyway for safety.
            fluo_mode = df_grouped["fluo"].agg(
                lambda x: x.mode()[0] if not x.empty else np.nan
            )

        # We calculate also the mean timestamp (not weighted)
        tim = df_grouped["tim"].mean()

        # Prepare a dataframe with the weighted localizations
        df_loc = pd.DataFrame(
            {
                "tid": x_w.index,
                "tim": tim.to_numpy(),
                "x": x_w.to_numpy(),
                "y": y_w.to_numpy(),
                "z": z_w.to_numpy(),
                "fluo": fluo_mode.to_numpy(),
            }
        )

        # Update the weighted localization dataframe
        self._weighted_localizations = df_loc

        # Flag the results as up-to-date
        self._weighted_localizations_to_be_recomputed = False
prop use_weighted_localizations : bool
@property
def use_weighted_localizations(self) -> bool:
    """Whether to use weighted average to calculate the mean localization per TID."""
    return self._use_weighted_localizations

Whether to use weighted average to calculate the mean localization per TID.

prop weighted_localizations : pandas.core.frame.DataFrame | None
@property
def weighted_localizations(self) -> Union[None, pd.DataFrame]:
    """Return the average (x, y, z) position per TID weighted by the relative photon count."""
    if self._weighted_localizations_to_be_recomputed:
        self._calculate_weighted_positions()
    return self._weighted_localizations

Return the average (x, y, z) position per TID weighted by the relative photon count.
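
A minimal usage sketch, assuming an already constructed MinFluxProcessor instance `proc` with data loaded (its construction is outside this excerpt). Accessing the property recomputes the weighted positions on demand if they are stale.

# `proc` is an assumed, pre-built MinFluxProcessor with data loaded.
locs = proc.weighted_localizations  # recomputed on access if stale
if locs is not None:
    # One row per trace ID with the (optionally photon-weighted) mean position
    print(locs[["tid", "tim", "x", "y", "z", "fluo"]].head())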

prop z_scaling_factor
@property
def z_scaling_factor(self):
    """Returns the scaling factor for the z coordinates from the underlying MinFluxReader."""
    return self.reader.z_scaling_factor

Returns the scaling factor for the z coordinates from the underlying MinFluxReader.

Methods

def filter_by_1d_range(self, x_prop: str, x_range: tuple)
def filter_by_1d_range(self, x_prop: str, x_range: tuple):
    """Apply min and max thresholding to the given property.

    Parameters
    ----------

    x_prop: str
        Name of the property (dataframe column) to filter.

    x_range: tuple
        Tuple containing the minimum and maximum values for the selected property.
    """

    # Make sure that the ranges are increasing
    x_min = x_range[0]
    x_max = x_range[1]
    if x_max < x_min:
        x_max, x_min = x_min, x_max

    # Apply filter
    if self.current_fluorophore_id == 0:
        # Apply to all fluorophores
        for fluo_id in self._selected_rows_dict.keys():
            self._selected_rows_dict[fluo_id] = (
                self._selected_rows_dict[fluo_id]
                & (self.filtered_dataframe[x_prop] >= x_min)
                & (self.filtered_dataframe[x_prop] < x_max)
            )
    elif self.current_fluorophore_id in self._selected_rows_dict:
        # Apply to specific fluorophore
        fluo_id = self.current_fluorophore_id
        self._selected_rows_dict[fluo_id] = (
            self._selected_rows_dict[fluo_id]
            & (self.filtered_dataframe[x_prop] >= x_min)
            & (self.filtered_dataframe[x_prop] < x_max)
        )

    # Apply the global filters
    self._apply_global_filters()

    # Mark that dynamic filters have been applied
    self._has_dynamic_filters = True

    # Make sure to flag the derived data to be recomputed
    self._stats_to_be_recomputed = True
    self._weighted_localizations_to_be_recomputed = True

Apply min and max thresholding to the given property.

Parameters

x_prop : str
Name of the property (dataframe column) to filter.
x_range : tuple
Tuple containing the minimum and maximum values for the selected property.
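
A usage sketch, assuming an existing MinFluxProcessor instance `proc` with data loaded. Note the half-open interval semantics: values equal to the minimum are kept, values equal to the maximum are dropped.

# Keep localizations whose photon count ("eco") lies in the half-open
# interval [100, 5000); `proc` is an assumed MinFluxProcessor instance.
proc.filter_by_1d_range("eco", (100, 5000))
print(f"{len(proc.filtered_dataframe.index)} localizations remain")
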
def filter_by_1d_range_complement(self, prop: str, x_range: tuple)
def filter_by_1d_range_complement(self, prop: str, x_range: tuple):
    """Apply min and max thresholding to the given property but keep the
    data outside the range (i.e., crop the selected range).

    Parameters
    ----------

    prop: str
        Name of the property (dataframe column) to filter.

    x_range: tuple
        Tuple containing the minimum and maximum values for cropping the selected property.
    """

    # Make sure that the ranges are increasing
    x_min = x_range[0]
    x_max = x_range[1]
    if x_max < x_min:
        x_max, x_min = x_min, x_max

    # Apply filter
    if self.current_fluorophore_id == 0:
        # Apply to all fluorophores
        for fluo_id in self._selected_rows_dict.keys():
            self._selected_rows_dict[fluo_id] = self._selected_rows_dict[fluo_id] & (
                (self.filtered_dataframe[prop] < x_min)
                | (self.filtered_dataframe[prop] >= x_max)
            )
    elif self.current_fluorophore_id in self._selected_rows_dict:
        # Apply to specific fluorophore
        fluo_id = self.current_fluorophore_id
        self._selected_rows_dict[fluo_id] = self._selected_rows_dict[fluo_id] & (
            (self.filtered_dataframe[prop] < x_min)
            | (self.filtered_dataframe[prop] >= x_max)
        )

    # Apply the global filters
    self._apply_global_filters()

    # Mark that dynamic filters have been applied
    self._has_dynamic_filters = True

    # Make sure to flag the derived data to be recomputed
    self._stats_to_be_recomputed = True
    self._weighted_localizations_to_be_recomputed = True

Apply min and max thresholding to the given property but keep the data outside the range (i.e., crop the selected range).

Parameters

prop : str
Name of the property (dataframe column) to filter.
x_range : tuple
Tuple containing the minimum and maximum values for cropping the selected property.
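
A usage sketch under the same assumption of an existing processor `proc`: crop out a band of values and keep everything outside it.

# Drop localizations whose "y" coordinate falls in [2000.0, 3000.0)
# and keep the complement; `proc` is an assumed MinFluxProcessor.
proc.filter_by_1d_range_complement("y", (2000.0, 3000.0))
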
def filter_by_1d_stats(self, x_prop_stats: str, x_range: tuple)
def filter_by_1d_stats(self, x_prop_stats: str, x_range: tuple):
    """Filter TIDs by min and max thresholding using the given property from the stats dataframe.

    Parameters
    ----------

    x_prop_stats: str
        Name of the property (column) from the stats dataframe used to filter.

    x_range: tuple
        Tuple containing the minimum and maximum values for the selected property.
    """

    # Make sure the property exists in the stats dataframe
    if x_prop_stats not in self.filtered_dataframe_stats.columns:
        raise ValueError(
            f"The property {x_prop_stats} does not exist in `filtered_dataframe_stats`."
        )

    # Make sure that the ranges are increasing
    x_min = x_range[0]
    x_max = x_range[1]
    if x_max < x_min:
        x_max, x_min = x_min, x_max

    # Find all TIDs for the current fluorophore ID for which the requested stats property is inside the range
    tids_to_keep = self.filtered_dataframe_stats[
        (
            (self.filtered_dataframe_stats[x_prop_stats] >= x_min)
            & (self.filtered_dataframe_stats[x_prop_stats] <= x_max)
        )
    ]["tid"].to_numpy()

    # Rows of the filtered dataframe to keep
    rows_to_keep = self.filtered_dataframe["tid"].isin(tids_to_keep)

    # Apply filter
    if self.current_fluorophore_id == 0:
        # Apply to all fluorophores
        for fluo_id in self._selected_rows_dict.keys():
            self._selected_rows_dict[fluo_id] = self._selected_rows_dict[fluo_id] & rows_to_keep
    elif self.current_fluorophore_id in self._selected_rows_dict:
        # Apply to specific fluorophore
        self._selected_rows_dict[self.current_fluorophore_id] = (
            self._selected_rows_dict[self.current_fluorophore_id] & rows_to_keep
        )

    # Apply the global filters
    self._apply_global_filters()

    # Mark that dynamic filters have been applied
    self._has_dynamic_filters = True

    # Make sure to flag the derived data to be recomputed
    self._stats_to_be_recomputed = True
    self._weighted_localizations_to_be_recomputed = True

Filter TIDs by min and max thresholding using the given property from the stats dataframe.

Parameters

x_prop_stats : str
Name of the property (column) from the stats dataframe used to filter.
x_range : tuple
Tuple containing the minimum and maximum values for the selected property.
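
A usage sketch, again assuming an existing processor `proc`. Unlike filter_by_1d_range, this filter uses a closed interval on both ends and keeps or drops whole traces rather than individual localizations.

# Keep only traces (TIDs) whose per-trace lateral precision "sxy"
# (a column of `filtered_dataframe_stats`) lies in [0.0, 5.0];
# all localizations of other traces are dropped.
proc.filter_by_1d_stats("sxy", (0.0, 5.0))
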
def filter_by_2d_range(self, x_prop, y_prop, x_range, y_range)
def filter_by_2d_range(self, x_prop, y_prop, x_range, y_range):
    """Filter dataset by the extracting a rectangular ROI over two parameters and two ranges.

    Parameters
    ----------

    x_prop: str
        First property to be filtered by corresponding x_range.

    y_prop: str
        Second property to be filtered by corresponding y_range.

    x_range: tuple
        Tuple containing the minimum and maximum values for the first property.

    y_range: tuple
        Tuple containing the minimum and maximum values for the second property.
    """

    # Make sure that the ranges are increasing
    x_min = x_range[0]
    x_max = x_range[1]
    if x_max < x_min:
        x_max, x_min = x_min, x_max

    y_min = y_range[0]
    y_max = y_range[1]
    if y_max < y_min:
        y_max, y_min = y_min, y_max

    if self.current_fluorophore_id == 0:
        # Apply to all fluorophores
        for fluo_id in self._selected_rows_dict.keys():
            self._selected_rows_dict[fluo_id] = (
                self._selected_rows_dict[fluo_id]
                & (self.filtered_dataframe[x_prop] >= x_min)
                & (self.filtered_dataframe[x_prop] < x_max)
                & (self.filtered_dataframe[y_prop] >= y_min)
                & (self.filtered_dataframe[y_prop] < y_max)
            )
    elif self.current_fluorophore_id in self._selected_rows_dict:
        # Apply to specific fluorophore
        fluo_id = self.current_fluorophore_id
        self._selected_rows_dict[fluo_id] = (
            self._selected_rows_dict[fluo_id]
            & (self.filtered_dataframe[x_prop] >= x_min)
            & (self.filtered_dataframe[x_prop] < x_max)
            & (self.filtered_dataframe[y_prop] >= y_min)
            & (self.filtered_dataframe[y_prop] < y_max)
        )

    # Make sure to always apply the global filters
    self._apply_global_filters()

    # Mark that dynamic filters have been applied
    self._has_dynamic_filters = True

    # Make sure to flag the derived data to be recomputed
    self._stats_to_be_recomputed = True
    self._weighted_localizations_to_be_recomputed = True

Filter the dataset by extracting a rectangular ROI over two parameters and two ranges.

Parameters

x_prop : str
First property to be filtered by corresponding x_range.
y_prop : str
Second property to be filtered by corresponding y_range.
x_range : tuple
Tuple containing the minimum and maximum values for the first property.
y_range : tuple
Tuple containing the minimum and maximum values for the second property.
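
A usage sketch, assuming an existing processor `proc`; the bounds are half-open on the upper end, as in filter_by_1d_range, and the units are those of the loaded data.

# Restrict the selection to a rectangular spatial ROI over "x" and "y".
proc.filter_by_2d_range("x", "y", (1000.0, 2000.0), (500.0, 1500.0))
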
def filter_by_single_threshold(self, prop: str, threshold: int | float, larger_than: bool = True)
def filter_by_single_threshold(
    self, prop: str, threshold: Union[int, float], larger_than: bool = True
):
    """Apply single threshold to filter values either lower or higher (equal) than threshold for given property."""

    # Apply filter
    if self.current_fluorophore_id == 0:
        # Apply to all fluorophores
        for fluo_id in self._selected_rows_dict.keys():
            if larger_than:
                self._selected_rows_dict[fluo_id] = self._selected_rows_dict[fluo_id] & (
                    self.filtered_dataframe[prop] >= threshold
                )
            else:
                self._selected_rows_dict[fluo_id] = self._selected_rows_dict[fluo_id] & (
                    self.filtered_dataframe[prop] < threshold
                )
    elif self.current_fluorophore_id in self._selected_rows_dict:
        # Apply to specific fluorophore
        fluo_id = self.current_fluorophore_id
        if larger_than:
            self._selected_rows_dict[fluo_id] = self._selected_rows_dict[fluo_id] & (
                self.filtered_dataframe[prop] >= threshold
            )
        else:
            self._selected_rows_dict[fluo_id] = self._selected_rows_dict[fluo_id] & (
                self.filtered_dataframe[prop] < threshold
            )

    # Apply the global filters
    self._apply_global_filters()

    # Mark that dynamic filters have been applied
    self._has_dynamic_filters = True

    # Make sure to flag the derived data to be recomputed
    self._stats_to_be_recomputed = True
    self._weighted_localizations_to_be_recomputed = True

Apply a single threshold to the given property, keeping values greater than or equal to the threshold (larger_than=True) or strictly below it (larger_than=False).
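
A usage sketch, assuming an existing processor `proc`:

# Keep localizations with at least 50 photons ("eco" >= 50); passing
# larger_than=False would instead keep values strictly below 50.
proc.filter_by_single_threshold("eco", 50, larger_than=True)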

def get_fluorophore_name(self, fluo_id: int) ‑> str
def get_fluorophore_name(self, fluo_id: int) -> str:
    """Get the name for a specific fluorophore ID.
    
    Parameters
    ----------
    fluo_id: int
        The fluorophore ID
        
    Returns
    -------
    name: str
        The name of the fluorophore (defaults to string representation of ID if not set)
    """
    return self._fluorophore_names.get(fluo_id, str(fluo_id))

Get the name for a specific fluorophore ID.

Parameters

fluo_id : int
The fluorophore ID

Returns

name : str
The name of the fluorophore (defaults to string representation of ID if not set)
def has_filters_applied(self) ‑> bool
def has_filters_applied(self) -> bool:
    """Check if any dynamic (user-applied) filters have been applied to the data.
    
    This does not count global filters like min_trace_length which are always applied.
    
    Returns
    -------
    has_filters: bool
        True if any dynamic filters are active, False otherwise.
    """
    return self._has_dynamic_filters

Check if any dynamic (user-applied) filters have been applied to the data.

This does not count global filters like min_trace_length which are always applied.

Returns

has_filters : bool
True if any dynamic filters are active, False otherwise.
def replace_dataset(self, new_dataset: pyminflux.processor._dataset.MinFluxDataset)
def replace_dataset(self, new_dataset: MinFluxDataset):
    """Replace the current dataset with a new one.
    
    This is useful when combining datasets: the processor's dataset
    can be replaced with the combined dataset while preserving settings
    such as the minimum trace length, the weighted-localization mode,
    and custom fluorophore names. Dynamic filters are cleared.
    
    Parameters
    ----------
    new_dataset: MinFluxDataset
        The new dataset to use.
    """
    # Store current settings
    old_min_trace_length = self._min_trace_length
    old_use_weighted = self._use_weighted_localizations
    old_fluorophore_names = self._fluorophore_names.copy()
    
    # Replace the dataset
    self.dataset = new_dataset
    
    # Re-initialize selection and filters
    self._selected_rows_dict = None
    self._init_selected_rows_dict()
    
    # Re-initialize fluorophore names (preserving custom names where IDs match)
    self._fluorophore_names = {}
    self._init_fluorophore_names()
    
    # Restore custom names for matching fluorophore IDs
    for fluo_id, name in old_fluorophore_names.items():
        if fluo_id in self._fluorophore_names:
            self._fluorophore_names[fluo_id] = name
    
    # Restore settings
    self._min_trace_length = old_min_trace_length
    self._use_weighted_localizations = old_use_weighted
    
    # Reset current fluorophore to "all"
    self._current_fluorophore_id = 0
    
    # Reset dynamic filters flag (dataset replacement clears all filters)
    self._has_dynamic_filters = False
    
    # Apply global filters
    self._apply_global_filters()
    
    # Flag derived data to be recomputed
    self._stats_to_be_recomputed = True
    self._weighted_localizations_to_be_recomputed = True

Replace the current dataset with a new one.

This is useful when combining datasets: the processor's dataset can be replaced with the combined dataset while preserving settings such as the minimum trace length, the weighted-localization mode, and custom fluorophore names. Dynamic filters are cleared.

Parameters

new_dataset : MinFluxDataset
The new dataset to use.
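
A usage sketch; `proc` is an assumed processor instance and `combined_dataset` a hypothetical MinFluxDataset produced elsewhere (for example by a dataset-combination step).

# Swap in the new dataset; settings such as the minimum trace length
# and custom fluorophore names are preserved, dynamic filters are not.
proc.replace_dataset(combined_dataset)
assert not proc.has_filters_applied()
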
def reset(self)
def reset(self):
    """Drops all dynamic filters and resets the data to the processed data frame with global filters."""

    # Clear the selection per fluorophore; they will be reinitialized as
    # all selected at the first access.
    self._init_selected_rows_dict()

    # Reset the mapping to the corresponding fluorophore
    self.processed_dataframe["fluo"] = 1

    # Reset fluorophore names (clear custom names first)
    self._fluorophore_names = {}
    self._init_fluorophore_names()

    # Default fluorophore is 0 (no selection)
    self.current_fluorophore_id = 0

    # Reset dynamic filters flag
    self._has_dynamic_filters = False

    # Apply global filters
    self._apply_global_filters()

Drops all dynamic filters and resets the data to the processed data frame with global filters.
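
A typical filter lifecycle, assuming an existing processor `proc`. Note that reset() also reassigns all localizations to fluorophore 1 and clears custom fluorophore names.

proc.filter_by_1d_range("eco", (100, 5000))
assert proc.has_filters_applied()
proc.reset()  # drop all dynamic filters
assert not proc.has_filters_applied()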

def select_by_1d_range(self, x_prop, x_range, from_weighted_locs: bool = False) ‑> pandas.core.frame.DataFrame | None
def select_by_1d_range(
    self, x_prop, x_range, from_weighted_locs: bool = False
) -> Union[None, pd.DataFrame]:
    """Return a view on a subset of the filtered dataset or the weighted localisations defined by the passed
    parameter and corresponding range.

    The underlying dataframe is not modified.

    Parameters
    ----------

    x_prop: str
        Property to be filtered by corresponding x_range.

    x_range: tuple
        Tuple containing the minimum and maximum values for the selected property.

    from_weighted_locs: bool
        If True, select from the weighted_localizations dataframe; otherwise, from the filtered_dataframe.

    Returns
    -------

    subset: Union[None, pd.DataFrame]
        A view on a subset of the dataframe defined by the passed property and range, or None if no file was loaded.
    """

    # Make sure that the range is increasing
    x_min = x_range[0]
    x_max = x_range[1]
    if x_max < x_min:
        x_max, x_min = x_min, x_max

    if from_weighted_locs:
        if self._weighted_localizations is None:
            return None
        return self._weighted_localizations.loc[
            (self._weighted_localizations[x_prop] >= x_min)
            & (self._weighted_localizations[x_prop] < x_max)
        ]
    else:
        # Work with currently selected rows
        if self.filtered_dataframe is None:
            return None
        df = self.filtered_dataframe
        return df.loc[(df[x_prop] >= x_min) & (df[x_prop] < x_max)]

Return a view on a subset of the filtered dataset or the weighted localizations defined by the passed parameter and corresponding range.

The underlying dataframe is not modified.

Parameters

x_prop : str
Property to be filtered by corresponding x_range.
x_range : tuple
Tuple containing the minimum and maximum values for the selected property.
from_weighted_locs : bool
If True, select from the weighted_localizations dataframe; otherwise, from the filtered_dataframe.

Returns

subset : Union[None, pd.DataFrame]
A view on a subset of the dataframe defined by the passed property and range, or None if no file was loaded.
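
A usage sketch, assuming an existing processor `proc`; the selection is non-destructive, so the processor's active filters are left untouched.

# Return a view of localizations with "x" in [0.0, 1000.0); the
# underlying dataframe and the active filters are not modified.
subset = proc.select_by_1d_range("x", (0.0, 1000.0))
if subset is not None:
    print(subset.shape)
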
def select_by_2d_range(self, x_prop, y_prop, x_range, y_range, from_weighted_locs: bool = False) ‑> pandas.core.frame.DataFrame | None
def select_by_2d_range(
    self, x_prop, y_prop, x_range, y_range, from_weighted_locs: bool = False
) -> Union[None, pd.DataFrame]:
    """Return a view on a subset of the filtered dataset or the weighted localisations defined by the passed
    parameters and corresponding ranges.

    The underlying dataframe is not modified.

    Parameters
    ----------

    x_prop: str
        First property to be filtered by corresponding x_range.

    y_prop: str
        Second property to be filtered by corresponding y_range.

    x_range: tuple
        Tuple containing the minimum and maximum values for the first property.

    y_range: tuple
        Tuple containing the minimum and maximum values for the second property.

    from_weighted_locs: bool
        If True, select from the weighted_localizations dataframe; otherwise, from the filtered_dataframe.

    Returns
    -------

    subset: Union[None, pd.DataFrame]
        A view on a subset of the dataframe defined by the passed properties and ranges, or None if no file
        was loaded.
    """

    # Make sure that the ranges are increasing
    x_min = x_range[0]
    x_max = x_range[1]
    if x_max < x_min:
        x_max, x_min = x_min, x_max

    y_min = y_range[0]
    y_max = y_range[1]
    if y_max < y_min:
        y_max, y_min = y_min, y_max

    if from_weighted_locs:
        if self._weighted_localizations is None:
            return None
        return self._weighted_localizations.loc[
            (self._weighted_localizations[x_prop] >= x_min)
            & (self._weighted_localizations[x_prop] < x_max)
            & (self._weighted_localizations[y_prop] >= y_min)
            & (self._weighted_localizations[y_prop] < y_max)
        ]
    else:
        # Work with currently selected rows
        if self.filtered_dataframe is None:
            return None
        df = self.filtered_dataframe
        return df.loc[
            (df[x_prop] >= x_min)
            & (df[x_prop] < x_max)
            & (df[y_prop] >= y_min)
            & (df[y_prop] < y_max)
        ]

Return a view on a subset of the filtered dataset or the weighted localizations defined by the passed parameters and corresponding ranges.

The underlying dataframe is not modified.

Parameters

x_prop : str
First property to be filtered by corresponding x_range.
y_prop : str
Second property to be filtered by corresponding y_range.
x_range : tuple
Tuple containing the minimum and maximum values for the first property.
y_range : tuple
Tuple containing the minimum and maximum values for the second property.
from_weighted_locs : bool
If True, select from the weighted_localizations dataframe; otherwise, from the filtered_dataframe.

Returns

subset : Union[None, pd.DataFrame]
A view on a subset of the dataframe defined by the passed properties and ranges, or None if no file was loaded.
def select_by_rows(self, indices: numpy.ndarray, from_weighted_locs: bool = False) ‑> pandas.core.frame.DataFrame | None
def select_by_rows(
    self, indices: np.ndarray, from_weighted_locs: bool = False
) -> Union[None, pd.DataFrame]:
    """Return view on a subset of the filtered dataset or the weighted localisations defined by the passed
    DataFrame row indices.

    The underlying dataframe is not modified.

    Parameters
    ----------

    indices: np.ndarray
        Logical array for selecting the elements to be returned.

    from_weighted_locs: bool
        If True, select from the weighted_localizations dataframe; otherwise, from the filtered_dataframe.

    Returns
    -------

    subset: Union[None, pd.DataFrame]
        A view on a subset of the dataframe defined by the passed indices, or None if no file was loaded.
    """
    if from_weighted_locs:
        if self._weighted_localizations is None:
            return None
        return self._weighted_localizations.iloc[indices]
    else:
        if self.filtered_dataframe is None:
            return None
        return self.filtered_dataframe.iloc[indices]

Return a view on a subset of the filtered dataset or the weighted localizations defined by the passed DataFrame row indices.

The underlying dataframe is not modified.

Parameters

indices : np.ndarray
Logical array for selecting the elements to be returned.
from_weighted_locs : bool
If True, select from the weighted_localizations dataframe; otherwise, from the filtered_dataframe.

Returns

subset : Union[None, pd.DataFrame]
A view on a subset of the dataframe defined by the passed indices, or None if no file was loaded.
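
A usage sketch, assuming an existing processor `proc`; the mask is aligned positionally with the rows of the filtered dataframe.

import numpy as np

# Boolean mask selecting the first 100 rows of the filtered dataframe.
mask = np.zeros(len(proc.filtered_dataframe.index), dtype=bool)
mask[:100] = True
subset = proc.select_by_rows(mask)
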
def select_by_series_iloc(self, iloc: numpy.ndarray, from_weighted_locs: bool = False) ‑> pandas.core.frame.DataFrame | None
def select_by_series_iloc(
    self, iloc: np.ndarray, from_weighted_locs: bool = False
) -> Union[None, pd.DataFrame]:
    """Return view on a subset of the filtered dataset or the weighted localisations defined by the passed
    DataFrame index locations.

    The underlying dataframe is not modified.

    Parameters
    ----------

    iloc: np.ndarray
        Array of Series index locations for selecting rows.

    from_weighted_locs: bool
        If True, select from the weighted_localizations dataframe; otherwise, from the filtered_dataframe.

    Returns
    -------

    subset: Union[None, pd.DataFrame]
        A view on a subset of the dataframe defined by the passed indices, or None if no file was loaded.
    """
    if from_weighted_locs:
        if self._weighted_localizations is None:
            return None
        return self._weighted_localizations.loc[iloc]
    else:
        if self.filtered_dataframe is None:
            return None
        return self.filtered_dataframe.loc[iloc]

Return a view on a subset of the filtered dataset or the weighted localizations defined by the passed DataFrame index locations.

The underlying dataframe is not modified.

Parameters

iloc : np.ndarray
Array of Series index locations for selecting rows.
from_weighted_locs : bool
If True, select from the weighted_localizations dataframe; otherwise, from the filtered_dataframe.

Returns

subset : Union[None, pd.DataFrame]
A view on a subset of the dataframe defined by the passed indices, or None if no file was loaded.
def set_fluorophore_ids(self, fluorophore_ids: numpy.ndarray[tuple[typing.Any, ...], numpy.dtype[numpy.uint8]])
def set_fluorophore_ids(self, fluorophore_ids: NDArray[np.uint8]):
    """Assign the fluorophore IDs to current filtered dataset.
    
    This method assigns new fluorophore IDs only to the rows in the current
    filtered dataset. All other rows remain unchanged.
    """
    if self.filtered_dataframe is None:
        return
    if len(fluorophore_ids) != len(self.filtered_dataframe.index):
        raise ValueError(
            "The number of fluorophore IDs does not match the number of entries in the dataframe."
        )

    # Get the actual indices from the filtered dataframe
    filtered_indices = self.filtered_dataframe.index
    
    # Assign the new fluorophore IDs to those specific indices only
    # All other rows remain unchanged (preserving other fluorophores)
    self.processed_dataframe.loc[filtered_indices, "fluo"] = fluorophore_ids.astype(np.uint8)

    # Apply global filters
    self._init_selected_rows_dict()
    self._init_fluorophore_names()
    self._apply_global_filters()

Assign the fluorophore IDs to current filtered dataset.

This method assigns new fluorophore IDs only to the rows in the current filtered dataset. All other rows remain unchanged.
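
A usage sketch, assuming an existing processor `proc`; the array length must match the current filtered dataframe.

import numpy as np

# Assign every localization in the current filtered view to fluorophore 2.
n = len(proc.filtered_dataframe.index)
proc.set_fluorophore_ids(np.full(n, 2, dtype=np.uint8))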

def set_fluorophore_name(self, fluo_id: int, name: str)
def set_fluorophore_name(self, fluo_id: int, name: str):
    """Set the name for a specific fluorophore ID.
    
    Parameters
    ----------
    fluo_id: int
        The fluorophore ID (must be >= 1)
    name: str
        The name to assign to this fluorophore
    """
    if fluo_id < 1:
        raise ValueError(f"Fluorophore ID must be >= 1, got {fluo_id}")
    if not isinstance(name, str):
        raise ValueError(f"Fluorophore name must be a string, got {type(name)}")
    self._fluorophore_names[fluo_id] = name

Set the name for a specific fluorophore ID.

Parameters

fluo_id : int
The fluorophore ID (must be >= 1)
name : str
The name to assign to this fluorophore
def set_fluorophore_names(self, names: dict)
def set_fluorophore_names(self, names: dict):
    """Set fluorophore names from a dictionary mapping.
    
    This method updates the fluorophore names dictionary, preserving
    any existing names for fluorophore IDs not included in the names parameter.
    
    Parameters
    ----------
    names: dict
        Dictionary mapping fluo_id (int) to name (str)
    """
    if not isinstance(names, dict):
        raise ValueError(f"Names must be a dictionary, got {type(names)}")
    for fluo_id, name in names.items():
        if not isinstance(fluo_id, int) or fluo_id < 1:
            raise ValueError(f"Fluorophore ID must be an integer >= 1, got {fluo_id}")
        if not isinstance(name, str):
            raise ValueError(f"Fluorophore name must be a string, got {type(name)}")
    # Update the dictionary instead of replacing it to preserve existing names
    self._fluorophore_names.update(names)

Set fluorophore names from a dictionary mapping.

This method updates the fluorophore names dictionary, preserving any existing names for fluorophore IDs not included in the names parameter.

Parameters

names : dict
Dictionary mapping fluo_id (int) to name (str)
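
A usage sketch, assuming an existing processor `proc`; the dye names are purely illustrative.

proc.set_fluorophore_names({1: "Alexa 647", 2: "CF 680"})
print(proc.get_fluorophore_name(1))  # -> "Alexa 647"
print(proc.get_fluorophore_name(3))  # -> "3" (falls back to the ID)
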
def set_full_fluorophore_ids(self, fluorophore_ids: numpy.ndarray[tuple[typing.Any, ...], numpy.dtype[numpy.uint8]])
def set_full_fluorophore_ids(self, fluorophore_ids: NDArray[np.uint8]):
    """Assign the fluorophore IDs to the original, full dataframe ignoring current filters."""
    if self.processed_dataframe is None:
        return
    if len(fluorophore_ids) != len(self.processed_dataframe.index):
        raise ValueError(
            "The number of fluorophore IDs does not match the number of entries in the dataframe."
        )
    self.processed_dataframe["fluo"] = fluorophore_ids.astype(np.uint8)

    # Apply global filters
    self._init_selected_rows_dict()
    self._init_fluorophore_names()
    self._apply_global_filters()

Assign the fluorophore IDs to the original, full dataframe ignoring current filters.