Module pyminflux.reader

Reader of MINFLUX data.

#  Copyright (c) 2022 - 2024 D-BSSE, ETH Zurich.
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
#

__doc__ = "Reader of MINFLUX data."
__all__ = [
    "NativeMetadataReader",
    "NativeArrayReader",
    "NativeDataFrameReader",
    "MinFluxReader",
]

from ._native_reader import (
    NativeArrayReader,
    NativeDataFrameReader,
    NativeMetadataReader,
)
from ._reader import MinFluxReader

Sub-modules

pyminflux.reader.metadata
pyminflux.reader.util

Classes

class MinFluxReader (filename: Union[pathlib.Path, str], valid: bool = True, z_scaling_factor: float = 1.0, is_tracking: bool = False, pool_dcr: bool = False, dwell_time: float = 1.0)

Constructor.

Parameters

filename : Union[Path, str]
Full path to the .pmx, .npy or .mat file to read.
valid : bool (optional, default = True)
Whether to load only valid localizations.
z_scaling_factor : float (optional, default = 1.0)
Refractive index mismatch correction factor to apply to the z coordinates.
is_tracking : bool (optional, default = False)
Whether the dataset comes from a tracking experiment; otherwise, it is considered a localization experiment.
pool_dcr : bool (optional, default = False)
Whether to pool DCR values weighted by the relative ECO of all relocalized iterations.
dwell_time : float (optional, default = 1.0)
Dwell time in milliseconds.
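
A minimal usage sketch (the file name, z_scaling_factor value, and column selection below are illustrative assumptions, not prescribed values):

from pyminflux.reader import MinFluxReader

# Open an acquisition and build the processed dataframe.
reader = MinFluxReader(
    "acquisition.npy",        # hypothetical path to a .pmx, .npy or .mat file
    valid=True,               # keep only valid localizations
    z_scaling_factor=0.7,     # example refractive index mismatch correction
)

# The processed dataframe is built lazily and cached on first access.
df = reader.processed_dataframe
print(df[["tid", "tim", "x", "y", "z"]].head())
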
class MinFluxReader:
    __doc__ = "Reader of MINFLUX data in `.pmx`, `.npy` or `.mat` formats."

    __slots__ = [
        "_pool_dcr",
        "_cfr_index",
        "_data_array",
        "_data_df",
        "_data_full_df",
        "_dcr_index",
        "_dwell_time",
        "_eco_index",
        "_efo_index",
        "_filename",
        "_is_3d",
        "_is_aggregated",
        "_is_last_valid",
        "_is_tracking",
        "_last_valid",
        "_last_valid_cfr",
        "_loc_index",
        "_relocalizations",
        "_reps",
        "_tid_index",
        "_tim_index",
        "_unit_scaling_factor",
        "_valid",
        "_valid_cfr",
        "_valid_entries",
        "_vld_index",
        "_z_scaling_factor",
    ]

    def __init__(
        self,
        filename: Union[Path, str],
        valid: bool = True,
        z_scaling_factor: float = 1.0,
        is_tracking: bool = False,
        pool_dcr: bool = False,
        dwell_time: float = 1.0,
    ):
        """Constructor.

        Parameters
        ----------

        filename: Union[Path, str]
            Full path to the `.pmx`, `.npy` or `.mat` file to read.

        valid: bool (optional, default = True)
            Whether to load only valid localizations.

        z_scaling_factor: float (optional, default = 1.0)
            Refractive index mismatch correction factor to apply to the z coordinates.

        is_tracking: bool (optional, default = False)
            Whether the dataset comes from a tracking experiment; otherwise, it is
            considered a localization experiment.

        pool_dcr: bool (optional, default = False)
            Whether to pool DCR values weighted by the relative ECO of all relocalized iterations.

        dwell_time: float (optional, default = 1.0)
            Dwell time in milliseconds.
        """

        # Store the filename
        self._filename: Path = Path(filename)
        if not self._filename.is_file():
            raise IOError(f"The file {self._filename} does not seem to exist.")

        # Keep track of whether the chosen sequence is the last valid.
        self._is_last_valid: bool = False

        # Store the valid flag
        self._valid: bool = valid

        # The localizations are stored in meters in the Imspector files and by
        # design also in the `.pmx` format. Here, we scale them to be in nm
        self._unit_scaling_factor: float = 1e9

        # Store the z correction factor
        self._z_scaling_factor: float = z_scaling_factor

        # Store the dwell time
        self._dwell_time = dwell_time

        # Initialize the data
        self._data_array = None
        self._data_df = None
        self._data_full_df = None
        self._valid_entries = None

        # Whether the acquisition is 2D or 3D
        self._is_3d: bool = False

        # Whether the acquisition is a tracking dataset
        self._is_tracking: bool = is_tracking

        # Whether to pool the dcr values
        self._pool_dcr = pool_dcr

        # Whether the file contains aggregate measurements
        self._is_aggregated: bool = False

        # Indices dependent on 2D or 3D acquisition and whether the
        # data comes from a localization or a tracking experiment.
        self._reps: int = -1
        self._efo_index: int = -1
        self._cfr_index: int = -1
        self._dcr_index: int = -1
        self._eco_index: int = -1
        self._loc_index: int = -1
        self._valid_cfr: list = []
        self._relocalizations: list = []

        # Constant indices
        self._tid_index: int = 0
        self._tim_index: int = 0
        self._vld_index: int = 0

        # Keep track of the last valid global and CFR iterations as returned
        # by the initial scan
        self._last_valid: int = -1
        self._last_valid_cfr: int = -1

        # Load the file
        if not self._load():
            raise IOError(f"The file {self._filename} is not a valid MINFLUX file.")

    @property
    def is_last_valid(self) -> Union[bool, None]:
        """Return True if the selected iteration is the "last valid", False otherwise.
        If the dataframe has not been processed yet, `is_last_valid` will be None."""
        if self._data_df is None:
            return None
        return self._is_last_valid

    @property
    def z_scaling_factor(self) -> float:
        """Returns the scaling factor for the z coordinates."""
        return self._z_scaling_factor

    @property
    def is_3d(self) -> bool:
        """Returns True is the acquisition is 3D, False otherwise."""
        return self._is_3d

    @property
    def is_aggregated(self) -> bool:
        """Returns True is the acquisition is aggregated, False otherwise."""
        return self._is_aggregated

    @property
    def is_tracking(self) -> bool:
        """Returns True for a tracking acquisition, False otherwise."""
        return self._is_tracking

    @property
    def is_pool_dcr(self) -> bool:
        """Returns True if the DCR values over all relocalized iterations (to use all photons)."""
        return self._pool_dcr

    @property
    def dwell_time(self) -> float:
        """Returns the dwell time."""
        return self._dwell_time

    @property
    def num_valid_entries(self) -> int:
        """Number of valid entries."""
        if self._data_array is None:
            return 0
        return self._valid_entries.sum()

    @property
    def num_invalid_entries(self) -> int:
        """Number of valid entries."""
        if self._data_array is None:
            return 0
        return np.logical_not(self._valid_entries).sum()

    @property
    def valid_cfr(self) -> list:
        """Return the iterations with valid CFR measurements.

        Returns
        -------
        cfr: boolean array with True for the iteration indices
             that have a valid measurement.
        """
        if self._data_array is None:
            return []
        return self._valid_cfr

    @property
    def relocalizations(self) -> list:
        """Return the iterations with relocalizations.

        Returns
        -------
        reloc: boolean array with True for the iteration indices that are relocalized.
        """
        if self._data_array is None:
            return []
        return self._relocalizations

    @property
    def valid_raw_data(self) -> Union[None, np.ndarray]:
        """Return the raw data."""
        if self._data_array is None:
            return None
        return self._data_array[self._valid_entries].copy()

    @property
    def processed_dataframe(self) -> Union[None, pd.DataFrame]:
        """Return the raw data as dataframe (some properties only)."""
        if self._data_df is not None:
            return self._data_df

        self._data_df = self._process()
        return self._data_df

    @property
    def raw_data_dataframe(self) -> Union[None, pd.DataFrame]:
        """Return the raw data as dataframe (some properties only)."""
        if self._data_full_df is not None:
            return self._data_full_df
        self._data_full_df = self._raw_data_to_full_dataframe()
        return self._data_full_df

    @property
    def filename(self) -> Union[Path, None]:
        """Return the filename if set."""
        if self._filename is None:
            return None
        return Path(self._filename)

    def set_indices(self, index, cfr_index, process: bool = True):
        """Set the parameter indices.

        We distinguish between the index of all parameters
        that are always measured and are accessed from the
        same iteration, and the cfr index, which is not
        always measured.

        Parameters
        ----------

        index: int
            Global iteration index for all parameters but cfr

        cfr_index: int
            Iteration index for cfr

        process: bool (Optional, default = True)
            By default, when setting the indices, the data is rescanned
            and the dataframe is rebuilt. In case several properties of
            the MinFluxReader are modified sequentially, the processing
            can be disabled and run only once after the last change.
            However, this only applies after the first load/scan, when
            the processed dataframe has not been created yet. If the
            dataframe already exists, this flag will be ignored and the
            processing will take place.
        """

        # Make sure there is loaded data
        if self._data_array is None:
            raise ValueError("No data loaded.")

        if self._reps == -1:
            raise ValueError("No data loaded.")

        if len(self._valid_cfr) == 0:
            raise ValueError("No data loaded.")

        # Check that the arguments are compatible with the loaded data
        if index < 0 or index > self._reps - 1:
            raise ValueError(
                f"The value of index must be between 0 and {self._reps - 1}."
            )

        if cfr_index < 0 or cfr_index > len(self._valid_cfr) - 1:
            raise ValueError(
                f"The value of index must be between 0 and {len(self._valid_cfr) - 1}."
            )

        # Now set the general values
        self._efo_index = index
        self._dcr_index = index
        self._eco_index = index
        self._loc_index = index

        # Set the cfr index
        self._cfr_index = cfr_index

        # Constant indices
        self._tid_index: int = 0
        self._tim_index: int = 0
        self._vld_index: int = 0

        # Re-process the file? If the processed dataframe already exists,
        # the processing will take place anyway.
        if process or self._data_df is not None:
            self._data_df = self._process()

    def set_tracking(self, is_tracking: bool, process: bool = True):
        """Sets whether the acquisition is tracking or localization.

        Parameters
        ----------

        is_tracking: bool
            Set to True for a tracking acquisition, False for a localization
            acquisition.

        process: bool (Optional, default = True)
            By default, when setting the tracking flag, the data is rescanned
            and the dataframe is rebuilt. In case several properties of
            the MinFluxReader are modified sequentially, the processing
            can be disabled and run only once after the last change.
            However, this only applies after the first load/scan, when
            the processed dataframe has not been created yet. If the
            dataframe already exists, this flag will be ignored and the
            processing will take place.
        """

        # Update the flag
        self._is_tracking = is_tracking

        # Re-process the file?
        if process or self._data_df is not None:
            self._data_df = self._process()

    def set_dwell_time(self, dwell_time: float, process: bool = True):
        """
        Sets the dwell time.

        Parameters
        ----------
        dwell_time: float
            Dwell time in milliseconds.

        process: bool (Optional, default = True)
            By default, when setting the dwell time, the data is rescanned
            and the dataframe is rebuilt. In case several properties of
            the MinFluxReader are modified sequentially, the processing
            can be disabled and run only once after the last change.
            However, this only applies after the first load/scan, when
            the processed dataframe has not been created yet. If the
            dataframe already exists, this flag will be ignored and the
            processing will take place.
        """

        # Update the flag
        self._dwell_time = dwell_time

        # Re-process the file?
        if process or self._data_df is not None:
            self._data_df = self._process()

    def set_pool_dcr(self, pool_dcr: bool, process: bool = True):
        """
        Sets whether the DCR values should be pooled (and weighted by ECO).

        Parameters
        ----------
        pool_dcr: bool
            Whether the DCR values should be pooled (and weighted by ECO).

        process: bool (Optional, default = True)
            By default, when setting the DCR binning flag, the data is rescanned
            and the dataframe is rebuilt. In case several properties of
            the MinFluxReader are modified sequentially, the processing
            can be disabled and run only once after the last change.
            However, this only applies after the first load/scan, when
            the processed dataframe has not been created yet. If the
            dataframe already exists, this flag will be ignored and the
            processing will take place.
        """

        # Update the flag
        self._pool_dcr = pool_dcr

        # Re-process the file?
        if process or self._data_df is not None:
            self._data_df = self._process()

    @classmethod
    def processed_properties(cls) -> list:
        """Returns the properties read from the file that correspond to the processed dataframe column names."""
        return [
            "tid",
            "tim",
            "x",
            "y",
            "z",
            "efo",
            "cfr",
            "eco",
            "dcr",
            "dwell",
            "fluo",
        ]

    @classmethod
    def raw_properties(cls) -> list:
        """Returns the properties read from the file and dynamic that correspond to the raw dataframe column names."""
        return ["tid", "aid", "vld", "tim", "x", "y", "z", "efo", "cfr", "eco", "dcr"]

    def _load(self) -> bool:
        """Load the file."""

        if not self._filename.is_file():
            print(f"File {self._filename} does not exist.")
            return False

        # Call the specialized _load_*() function
        if self._filename.name.lower().endswith(".npy"):
            try:
                data_array = np.load(str(self._filename))
                if "fluo" in data_array.dtype.names:
                    self._data_array = data_array
                else:
                    self._data_array = migrate_npy_array(data_array)
            except Exception as e:
                print(f"Could not open {self._filename}: {e}")
                return False

        elif self._filename.name.lower().endswith(".mat"):
            try:
                self._data_array = convert_from_mat(self._filename)
            except Exception as e:
                print(f"Could not open {self._filename}: {e}")
                return False

        elif self._filename.name.lower().endswith(".pmx"):
            try:
                self._data_array = NativeArrayReader().read(self._filename)
                if self._data_array is None:
                    print(f"Could not open {self._filename}.")
                    return False
            except Exception as e:
                print(f"Could not open {self._filename}: {e}")
                return False

        else:
            print(f"Unexpected file {self._filename}.")
            return False

        # Store a logical array with the valid entries
        self._valid_entries = self._data_array["vld"]

        # Cache whether the data is 2D or 3D and whether is aggregated
        # The cases are different for localization vs. tracking experiments
        # num_locs = self._data_array["itr"].shape[1]
        self._is_3d = (
            float(np.nanmean(self._data_array["itr"][:, -1]["loc"][:, -1])) != 0.0
        )

        # Set all relevant indices
        self._set_all_indices()

        # Return success
        return True

    def _read_from_pmx(self) -> Union[np.ndarray, None]:
        """Load the PMX file."""

        # Open the file and read the data
        with h5py.File(self._filename, "r") as f:
            # Read the file_version attribute
            file_version = f.attrs["file_version"]

            if file_version != "1.0":
                return None

            # We only read the raw NumPy array
            data_array = f["raw/npy"][:]

        return data_array

    def _process(self) -> Union[None, pd.DataFrame]:
        """Returns processed dataframe for valid (or invalid) entries.

        Returns
        -------

        df: pd.DataFrame
            Processed data as DataFrame.
        """

        # Do we have a data array to work on?
        if self._data_array is None:
            return None

        if self._valid:
            indices = self._valid_entries
        else:
            indices = np.logical_not(self._valid_entries)

        # Extract the valid iterations
        itr = self._data_array["itr"][indices]

        # Extract the valid identifiers
        tid = self._data_array["tid"][indices]

        # Extract the valid time points
        tim = self._data_array["tim"][indices]

        # Extract the fluorophore IDs
        fluo = self._data_array["fluo"][indices]
        if np.all(fluo == 0):
            fluo = np.ones(fluo.shape, dtype=fluo.dtype)

        # The following extraction pattern will change whether the
        # acquisition is normal or aggregated
        if self.is_aggregated:
            # Extract the locations
            loc = itr["loc"].squeeze() * self._unit_scaling_factor
            loc[:, 2] = loc[:, 2] * self._z_scaling_factor

            # Extract EFO
            efo = itr["efo"]

            # Extract CFR
            cfr = itr["cfr"]

            # Extract ECO
            eco = itr["eco"]

            # Extract DCR
            dcr = itr["dcr"]

            # Dwell: photon collection time in ms (eco counts at efo Hz), in units of the dwell time
            dwell = np.around((eco / (efo / 1000)) / self._dwell_time, decimals=0)

        else:
            # Extract the locations
            loc = itr[:, self._loc_index]["loc"] * self._unit_scaling_factor
            loc[:, 2] = loc[:, 2] * self._z_scaling_factor

            # Extract EFO
            efo = itr[:, self._efo_index]["efo"]

            # Extract CFR
            cfr = itr[:, self._cfr_index]["cfr"]

            # Extract ECO
            eco = itr[:, self._eco_index]["eco"]

            # Pool DCR values?
            if self._pool_dcr and np.sum(self._relocalizations) > 1:

                # Calculate ECO contributions
                eco_all = itr[:, self._relocalizations]["eco"]
                eco_sum = eco_all.sum(axis=1)
                eco_all_norm = eco_all / eco_sum.reshape(-1, 1)

                # Extract DCR values and weigh them by the relative ECO contributions
                dcr = itr[:, self._relocalizations]["dcr"]
                dcr = dcr * eco_all_norm
                dcr = dcr.sum(axis=1)

            else:

                # Extract DCR
                dcr = itr[:, self._dcr_index]["dcr"]

            # Calculate dwell: photon collection time in ms (eco counts at efo Hz), in units of the dwell time
            dwell = np.around((eco / (efo / 1000)) / self._dwell_time, decimals=0)

        # Create a Pandas dataframe for the results
        df = pd.DataFrame(
            index=pd.RangeIndex(start=0, stop=len(tid)),
            columns=MinFluxReader.processed_properties(),
        )

        # Store the extracted valid hits into the dataframe
        df["tid"] = tid
        df["x"] = loc[:, 0]
        df["y"] = loc[:, 1]
        df["z"] = loc[:, 2]
        df["tim"] = tim
        df["efo"] = efo
        df["cfr"] = cfr
        df["eco"] = eco
        df["dcr"] = dcr
        df["dwell"] = dwell
        df["fluo"] = fluo

        # Remove rows with NaNs in the loc matrix
        df = df.dropna(subset=["x"])

        # Check if the selected indices correspond to the last valid iteration
        self._is_last_valid = bool(
            self._cfr_index == self._last_valid_cfr
            and self._efo_index == self._last_valid
        )

        return df

    def _raw_data_to_full_dataframe(self) -> Union[None, pd.DataFrame]:
        """Return raw data arranged into a dataframe."""
        if self._data_array is None:
            return None

        # Initialize output dataframe
        df = pd.DataFrame(columns=MinFluxReader.raw_properties())

        # Allocate space for the columns
        n_rows = len(self._data_array) * self._reps

        # Get all unique TIDs and their counts
        _, tid_counts = np.unique(self._data_array["tid"], return_counts=True)

        # Get all tids (repeated over the repetitions)
        tid = np.repeat(self._data_array["tid"], self._reps)

        # Create virtual IDs to mark the measurements of repeated tids
        # @TODO Optimize this!
        aid = np.zeros((n_rows, 1), dtype=np.int32)
        index = 0
        for c in np.nditer(tid_counts):
            tmp = np.repeat(np.arange(c), self._reps)
            n = len(tmp)
            aid[index : index + n, 0] = tmp
            index += n

        # Get all valid flags (repeated over the repetitions)
        vld = np.repeat(self._data_array["vld"], self._reps)

        # Get all timepoints (repeated over the repetitions)
        tim = np.repeat(self._data_array["tim"], self._reps)

        # Get all localizations (reshaped to drop the first dimension)
        loc = (
            self._data_array["itr"]["loc"].reshape((n_rows, 3))
            * self._unit_scaling_factor
        )
        loc[:, 2] = loc[:, 2] * self._z_scaling_factor

        # Get all efos (reshaped to drop the first dimension)
        efo = self._data_array["itr"]["efo"].reshape((n_rows, 1))

        # Get all cfrs (reshaped to drop the first dimension)
        cfr = self._data_array["itr"]["cfr"].reshape((n_rows, 1))

        # Get all ecos (reshaped to drop the first dimension)
        eco = self._data_array["itr"]["eco"].reshape((n_rows, 1))

        # Get all dcrs (reshaped to drop the first dimension)
        dcr = self._data_array["itr"]["dcr"].reshape((n_rows, 1))

        # Build the dataframe
        df["tid"] = tid.astype(np.int32)
        df["aid"] = aid.astype(np.int32)
        df["vld"] = vld
        df["tim"] = tim
        df["x"] = loc[:, 0]
        df["y"] = loc[:, 1]
        df["z"] = loc[:, 2]
        df["efo"] = efo
        df["cfr"] = cfr
        df["eco"] = eco
        df["dcr"] = dcr

        return df

    def _set_all_indices(self):
        """Set indices of properties to be read."""
        if self._data_array is None:
            return False

        # Number of iterations
        self._reps = self._data_array["itr"].shape[1]

        # Is this an aggregated acquisition?
        self._is_aggregated = self._reps == 1

        # Query the data to find the last valid iteration
        # for all measurements
        last_valid = find_last_valid_iteration(self._data_array)

        # Set the extracted indices
        self._efo_index = last_valid["efo_index"]
        self._cfr_index = last_valid["cfr_index"]
        self._dcr_index = last_valid["dcr_index"]
        self._eco_index = last_valid["eco_index"]
        self._loc_index = last_valid["loc_index"]
        self._valid_cfr = last_valid["valid_cfr"]
        self._relocalizations = last_valid["reloc"]

        # Keep track of the last valid iteration
        self._last_valid = len(self._valid_cfr) - 1
        self._last_valid_cfr = last_valid["cfr_index"]

    def __repr__(self) -> str:
        """String representation of the object."""
        if self._data_array is None:
            return "No file loaded."

        str_valid = (
            "all valid"
            if len(self._data_array) == self.num_valid_entries
            else f"{self.num_valid_entries} valid and {self.num_invalid_entries} non valid"
        )

        str_acq = "3D" if self.is_3d else "2D"
        aggr_str = "aggregated" if self.is_aggregated else "normal"

        return (
            f"File: {self._filename.name}: "
            f"{str_acq} {aggr_str} acquisition with {len(self._data_array)} entries ({str_valid})."
        )

    def __str__(self) -> str:
        """Human-friendly representation of the object."""
        return self.__repr__()

Static methods

def processed_properties() ‑> list

Returns the properties read from the file that correspond to the processed dataframe column names.

@classmethod
def processed_properties(cls) -> list:
    """Returns the properties read from the file that correspond to the processed dataframe column names."""
    return [
        "tid",
        "tim",
        "x",
        "y",
        "z",
        "efo",
        "cfr",
        "eco",
        "dcr",
        "dwell",
        "fluo",
    ]
def raw_properties() ‑> list

Returns the properties, read from the file or computed dynamically, that correspond to the raw dataframe column names.

@classmethod
def raw_properties(cls) -> list:
    """Returns the properties read from the file and dynamic that correspond to the raw dataframe column names."""
    return ["tid", "aid", "vld", "tim", "x", "y", "z", "efo", "cfr", "eco", "dcr"]

Instance variables

var dwell_time : float

Returns the dwell time.

@property
def dwell_time(self) -> float:
    """Returns the dwell time."""
    return self._dwell_time
var filename : Optional[pathlib.Path]

Return the filename if set.

@property
def filename(self) -> Union[Path, None]:
    """Return the filename if set."""
    if self._filename is None:
        return None
    return Path(self._filename)
var is_3d : bool

Returns True if the acquisition is 3D, False otherwise.

@property
def is_3d(self) -> bool:
    """Returns True is the acquisition is 3D, False otherwise."""
    return self._is_3d
var is_aggregated : bool

Returns True if the acquisition is aggregated, False otherwise.

@property
def is_aggregated(self) -> bool:
    """Returns True is the acquisition is aggregated, False otherwise."""
    return self._is_aggregated
var is_last_valid : Optional[bool]

Return True if the selected iteration is the "last valid", False otherwise. If the dataframe has not been processed yet, is_last_valid will be None.

@property
def is_last_valid(self) -> Union[bool, None]:
    """Return True if the selected iteration is the "last valid", False otherwise.
    If the dataframe has not been processed yet, `is_last_valid` will be None."""
    if self._data_df is None:
        return None
    return self._is_last_valid
var is_pool_dcr : bool

Returns True if the DCR values are pooled over all relocalized iterations (to use all photons).

@property
def is_pool_dcr(self) -> bool:
    """Returns True if the DCR values over all relocalized iterations (to use all photons)."""
    return self._pool_dcr
var is_tracking : bool

Returns True for a tracking acquisition, False otherwise.

@property
def is_tracking(self) -> bool:
    """Returns True for a tracking acquisition, False otherwise."""
    return self._is_tracking
var num_invalid_entries : int

Number of invalid entries.

@property
def num_invalid_entries(self) -> int:
    """Number of valid entries."""
    if self._data_array is None:
        return 0
    return np.logical_not(self._valid_entries).sum()
var num_valid_entries : int

Number of valid entries.

@property
def num_valid_entries(self) -> int:
    """Number of valid entries."""
    if self._data_array is None:
        return 0
    return self._valid_entries.sum()
var processed_dataframe : Optional[pandas.DataFrame]

Return the processed data as a dataframe.

@property
def processed_dataframe(self) -> Union[None, pd.DataFrame]:
    """Return the raw data as dataframe (some properties only)."""
    if self._data_df is not None:
        return self._data_df

    self._data_df = self._process()
    return self._data_df
var raw_data_dataframe : Optional[pandas.DataFrame]

Return the full raw data arranged into a dataframe.

@property
def raw_data_dataframe(self) -> Union[None, pd.DataFrame]:
    """Return the raw data as dataframe (some properties only)."""
    if self._data_full_df is not None:
        return self._data_full_df
    self._data_full_df = self._raw_data_to_full_dataframe()
    return self._data_full_df
var relocalizations : list

Return the iterations with relocalizations.

Returns

reloc: boolean array with True for the iteration indices that are relocalized.

@property
def relocalizations(self) -> list:
    """Return the iterations with relocalizations.

    Returns
    -------
    reloc: boolean array with True for the iteration indices that are relocalized.
    """
    if self._data_array is None:
        return []
    return self._relocalizations
var valid_cfr : list

Return the iterations with valid CFR measurements.

Returns

cfr : boolean array with True for the iteration indices
that have a valid measurement.
@property
def valid_cfr(self) -> list:
    """Return the iterations with valid CFR measurements.

    Returns
    -------
    cfr: boolean array with True for the iteration indices
         that have a valid measurement.
    """
    if self._data_array is None:
        return []
    return self._valid_cfr
var valid_raw_data : Optional[numpy.ndarray]

Return the raw data.

@property
def valid_raw_data(self) -> Union[None, np.ndarray]:
    """Return the raw data."""
    if self._data_array is None:
        return None
    return self._data_array[self._valid_entries].copy()
var z_scaling_factor : float

Returns the scaling factor for the z coordinates.

@property
def z_scaling_factor(self) -> float:
    """Returns the scaling factor for the z coordinates."""
    return self._z_scaling_factor

Methods

def set_dwell_time(self, dwell_time: float, process: bool = True)

Sets the dwell time.

Parameters

dwell_time : float
Dwell time in milliseconds.
process : bool (Optional, default = True)
By default, when setting the dwell time, the data is rescanned and the dataframe is rebuilt. In case several properties of the MinFluxReader are modified sequentially, the processing can be disabled and run only once after the last change. However, this only applies after the first load/scan, when the processed dataframe has not been created yet. If the dataframe already exists, this flag will be ignored and the processing will take place.
def set_dwell_time(self, dwell_time: float, process: bool = True):
    """
    Sets the dwell time.

    Parameters
    ----------
    dwell_time: float
        Dwell time in milliseconds.

    process: bool (Optional, default = True)
        By default, when setting the dwell time, the data is rescanned
        and the dataframe is rebuilt. In case several properties of
        the MinFluxReader are modified sequentially, the processing
        can be disabled and run only once after the last change.
        However, this only applies after the first load/scan, when
        the processed dataframe has not been created yet. If the
        dataframe already exists, this flag will be ignored and the
        processing will take place.
    """

    # Update the flag
    self._dwell_time = dwell_time

    # Re-process the file?
    if process or self._data_df is not None:
        self._data_df = self._process()
def set_indices(self, index, cfr_index, process: bool = True)

Set the parameter indices.

We distinguish between the index of all parameters that are always measured and are accessed from the same iteration, and the cfr index, which is not always measured.

Parameters

index : int
Global iteration index for all parameters but cfr
cfr_index : int
Iteration index for cfr
process : bool (Optional, default = True)
By default, when setting the indices, the data is rescanned and the dataframe is rebuilt. In case several properties of the MinFluxReader are modified sequentially, the processing can be disabled and run only once after the last change. However, this only applies after the first load/scan, when the processed dataframe has not been created yet. If the dataframe already exists, this flag will be ignored and the processing will take place.
def set_indices(self, index, cfr_index, process: bool = True):
    """Set the parameter indices.

    We distinguish between the index of all parameters
    that are always measured and are accessed from the
    same iteration, and the cfr index, which is not
    always measured.

    Parameters
    ----------

    index: int
        Global iteration index for all parameters but cfr

    cfr_index: int
        Iteration index for cfr

    process: bool (Optional, default = True)
        By default, when setting the indices, the data is rescanned
        and the dataframe is rebuilt. In case several properties of
        the MinFluxReader are modified sequentially, the processing
        can be disabled and run only once after the last change.
        However, this only applies after the first load/scan, when
        the processed dataframe has not been created yet. If the
        dataframe already exists, this flag will be ignored and the
        processing will take place.
    """

    # Make sure there is loaded data
    if self._data_array is None:
        raise ValueError("No data loaded.")

    if self._reps == -1:
        raise ValueError("No data loaded.")

    if len(self._valid_cfr) == 0:
        raise ValueError("No data loaded.")

    # Check that the arguments are compatible with the loaded data
    if index < 0 or index > self._reps - 1:
        raise ValueError(
            f"The value of index must be between 0 and {self._reps - 1}."
        )

    if cfr_index < 0 or cfr_index > len(self._valid_cfr) - 1:
        raise ValueError(
            f"The value of index must be between 0 and {len(self._valid_cfr) - 1}."
        )

    # Now set the general values
    self._efo_index = index
    self._dcr_index = index
    self._eco_index = index
    self._loc_index = index

    # Set the cfr index
    self._cfr_index = cfr_index

    # Constant indices
    self._tid_index: int = 0
    self._tim_index: int = 0
    self._vld_index: int = 0

    # Re-process the file? If the processed dataframe already exists,
    # the processing will take place anyway.
    if process or self._data_df is not None:
        self._data_df = self._process()
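
When several reader properties are changed in sequence before the first processing, the intermediate rescans can be skipped with process=False and a single rebuild triggered by the last call. A sketch (the file name and index values are placeholders; valid index ranges depend on the loaded data):

reader = MinFluxReader("acquisition.npy")     # hypothetical file
reader.set_tracking(True, process=False)      # no rescan yet
reader.set_dwell_time(2.0, process=False)     # still no rescan
reader.set_indices(index=3, cfr_index=3)      # process=True: one rebuild
df = reader.processed_dataframe
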
def set_pool_dcr(self, pool_dcr: bool, process: bool = True)

Sets whether the DCR values should be pooled (and weighted by ECO).

Parameters

pool_dcr : bool
Whether the DCR values should be pooled (and weighted by ECO).
process : bool (Optional, default = True)
By default, when setting the DCR binning flag, the data is rescanned and the dataframe is rebuilt. In case several properties of the MinFluxReader are modified sequentially, the processing can be disabled and run only once after the last change. However, this only applies after the first load/scan, when the processed dataframe has not been created yet. If the dataframe already exists, this flag will be ignored and the processing will take place.
def set_pool_dcr(self, pool_dcr: bool, process: bool = True):
    """
    Sets whether the DCR values should be pooled (and weighted by ECO).

    Parameters
    ----------
    pool_dcr: bool
        Whether the DCR values should be pooled (and weighted by ECO).

    process: bool (Optional, default = True)
        By default, when setting the DCR binning flag, the data is rescanned
        and the dataframe is rebuilt. In case several properties of
        the MinFluxReader are modified sequentially, the processing
        can be disabled and run only once after the last change.
        However, this only applies after the first load/scan, when
        the processed dataframe has not been created yet. If the
        dataframe already exists, this flag will be ignored and the
        processing will take place.
    """

    # Update the flag
    self._pool_dcr = pool_dcr

    # Re-process the file?
    if process or self._data_df is not None:
        self._data_df = self._process()
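
The pooling itself is an ECO-weighted mean of the per-iteration DCR values over the relocalized iterations, as in _process(). A standalone NumPy sketch of the same computation with toy numbers (two localizations, two relocalized iterations):

import numpy as np

dcr = np.array([[0.30, 0.40], [0.10, 0.20]])    # DCR per localization and iteration
eco = np.array([[100, 300], [50, 50]])          # photon counts per iteration

weights = eco / eco.sum(axis=1, keepdims=True)  # relative ECO contributions
pooled = (dcr * weights).sum(axis=1)            # ECO-weighted DCR per localization
print(pooled)                                   # -> [0.375 0.15 ]
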
def set_tracking(self, is_tracking: bool, process: bool = True)

Sets whether the acquisition is tracking or localization.

Parameters

is_tracking : bool
Set to True for a tracking acquisition, False for a localization acquisition.
process : bool (Optional, default = True)
By default, when setting the tracking flag, the data is rescanned and the dataframe is rebuilt. In case several properties of the MinFluxReader are modified sequentially, the processing can be disabled and run only once after the last change. However, this only applies after the first load/scan, when the processed dataframe has not been created yet. If the dataframe already exists, this flag will be ignored and the processing will take place.
def set_tracking(self, is_tracking: bool, process: bool = True):
    """Sets whether the acquisition is tracking or localization.

    Parameters
    ----------

    is_tracking: bool
        Set to True for a tracking acquisition, False for a localization
        acquisition.

    process: bool (Optional, default = True)
        By default, when setting the tracking flag, the data is rescanned
        and the dataframe is rebuilt. In case several properties of
        the MinFluxReader are modified sequentially, the processing
        can be disabled and run only once after the last change.
        However, this only applies after the first load/scan, when
        the processed dataframe has not been created yet. If the
        dataframe already exists, this flag will be ignored and the
        processing will take place.
    """

    # Update the flag
    self._is_tracking = is_tracking

    # Re-process the file?
    if process or self._data_df is not None:
        self._data_df = self._process()
class NativeArrayReader

Reads the native NumPy array from .pmx files.

class NativeArrayReader:
    """Reads the native NumPy array from `.pmx` files."""

    @staticmethod
    def read(filename: Union[Path, str]):
        """Constructor.

        Parameters
        ----------

        filename: Union[Path, str]
            Full path to the `.pmx` file to read.
        """

        # Open the file and read the data
        with h5py.File(filename, "r") as f:

            # Read the file_version attribute
            file_version = f.attrs["file_version"]

            if file_version != "1.0" and file_version != "2.0":
                return None

            # We only read the raw NumPy array
            data_array = f["raw/npy"][:]

        return data_array

Static methods

def read(filename: Union[pathlib.Path, str])

Read the raw NumPy array from a .pmx file.

Parameters

filename : Union[Path, str]
Full path to the .pmx file to read.
@staticmethod
def read(filename: Union[Path, str]):
    """Constructor.

    Parameters
    ----------

    filename: Union[Path, str]
        Full path to the `.pmx` file to read.
    """

    # Open the file and read the data
    with h5py.File(filename, "r") as f:

        # Read the file_version attribute
        file_version = f.attrs["file_version"]

        if file_version != "1.0" and file_version != "2.0":
            return None

        # We only read the raw NumPy array
        data_array = f["raw/npy"][:]

    return data_array
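
A reading sketch (the file name is a placeholder; read() returns None when the file version is not supported):

from pyminflux.reader import NativeArrayReader

data = NativeArrayReader.read("experiment.pmx")
if data is None:
    print("Not a supported .pmx file.")
else:
    print(data.dtype.names)   # structured fields such as 'vld', 'tid', 'tim', 'itr'
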
class NativeDataFrameReader

Reads the Pandas DataFrame from .pmx files.

class NativeDataFrameReader:
    """Reads the Pandas DataFrame from `.pmx` files."""

    @staticmethod
    def read(filename: Union[Path, str]):
        """Constructor.

        Parameters
        ----------

        filename: Union[Path, str]
            Full path to the `.pmx` file to read.
        """

        with h5py.File(filename, "r") as f:

            # Read the file_version attribute
            file_version = f.attrs["file_version"]

            if file_version != "1.0" and file_version != "2.0":
                return None

            # Read dataset
            dataset = f["/paraview/dataframe"]

            # Read the NumPy data
            data_array = dataset[:]

            # Read column names
            column_names = dataset.attrs["column_names"]

            # Read column data types
            column_types = dataset.attrs["column_types"]

            # Read the index
            index_data = f["/paraview/dataframe_index"][:]

            # Create DataFrame with specified columns
            df = pd.DataFrame(data_array, index=index_data, columns=column_names)

            # Apply column data types
            for col, dtype in zip(column_names, column_types):
                df[col] = df[col].astype(dtype)

        return df

Static methods

def read(filename: Union[pathlib.Path, str])

Read the Pandas DataFrame from a .pmx file.

Parameters

filename : Union[Path, str]
Full path to the .pmx file to read.
@staticmethod
def read(filename: Union[Path, str]):
    """Constructor.

    Parameters
    ----------

    filename: Union[Path, str]
        Full path to the `.pmx` file to read.
    """

    with h5py.File(filename, "r") as f:

        # Read the file_version attribute
        file_version = f.attrs["file_version"]

        if file_version != "1.0" and file_version != "2.0":
            return None

        # Read dataset
        dataset = f["/paraview/dataframe"]

        # Read the NumPy data
        data_array = dataset[:]

        # Read column names
        column_names = dataset.attrs["column_names"]

        # Read column data types
        column_types = dataset.attrs["column_types"]

        # Read the index
        index_data = f["/paraview/dataframe_index"][:]

        # Create DataFrame with specified columns
        df = pd.DataFrame(data_array, index=index_data, columns=column_names)

        # Apply column data types
        for col, dtype in zip(column_names, column_types):
            df[col] = df[col].astype(dtype)

    return df
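
A corresponding sketch for the dataframe reader (placeholder file name; read() returns None for unsupported file versions):

from pyminflux.reader import NativeDataFrameReader

df = NativeDataFrameReader.read("experiment.pmx")
if df is not None:
    print(df.columns.tolist())
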
class NativeMetadataReader

Reads metadata information from .pmx files.

class NativeMetadataReader:
    """Reads metadata information from `.pmx` files."""

    @staticmethod
    def scan(filename: Union[Path, str]):
        """Constructor.

        Parameters
        ----------

        filename: Union[Path, str]
            Full path to the `.pmx` file to scan.
        """

        # Open the file
        with h5py.File(filename, "r") as f:

            # Read the file_version attribute
            file_version = f.attrs["file_version"]

            if file_version != "1.0" and file_version != "2.0":
                return None

            # Version 1 parameters
            try:
                z_scaling_factor = float(f["parameters/z_scaling_factor"][()])
            except KeyError:
                return None

            try:
                min_trace_length = int(f["parameters/min_trace_length"][()])
            except KeyError:
                return None

            try:
                efo_thresholds = tuple(f["parameters/applied_efo_thresholds"][:])
            except KeyError as e:
                efo_thresholds = None
            try:
                cfr_thresholds = tuple(f["parameters/applied_cfr_thresholds"][:])
            except KeyError as e:
                cfr_thresholds = None

            try:
                num_fluorophores = int(f["parameters/num_fluorophores"][()])
            except KeyError:
                return None

            # Version 2.0 parameters
            if file_version == "2.0":

                try:
                    # This setting can be missing
                    tr_len_thresholds = tuple(
                        f["parameters/applied_tr_len_thresholds"][:]
                    )
                except KeyError as e:
                    tr_len_thresholds = None

                try:
                    dwell_time = float(f["parameters/dwell_time"][()])
                except KeyError as e:
                    return None

                try:
                    # This setting can be missing
                    time_thresholds = tuple(f["parameters/applied_time_thresholds"][:])
                except KeyError as e:
                    time_thresholds = None

                # HDF5 does not have a native boolean type, so we save as int8 and convert it
                # back to boolean on read.
                try:
                    is_tracking = bool(f["parameters/is_tracking"][()])
                except KeyError as e:
                    return None

                try:
                    pool_dcr = bool(f["parameters/pool_dcr"][()])
                except KeyError as e:
                    # This is an addendum to version 2.0, and we allow it to be missing.
                    # It will fall back to False.
                    pool_dcr = False

                try:
                    scale_bar_size = float(f["parameters/scale_bar_size"][()])
                except KeyError as e:
                    return None

            else:
                tr_len_thresholds = None
                time_thresholds = None
                dwell_time = 1.0
                is_tracking = False
                pool_dcr = False
                scale_bar_size = 500

        # Store and return
        metadata = NativeMetadata(
            pool_dcr=pool_dcr,
            cfr_thresholds=cfr_thresholds,
            dwell_time=dwell_time,
            efo_thresholds=efo_thresholds,
            is_tracking=is_tracking,
            min_trace_length=min_trace_length,
            num_fluorophores=num_fluorophores,
            scale_bar_size=scale_bar_size,
            time_thresholds=time_thresholds,
            tr_len_thresholds=tr_len_thresholds,
            z_scaling_factor=z_scaling_factor,
        )

        return metadata

Static methods

def scan(filename: Union[pathlib.Path, str])

Scan a .pmx file and return its metadata.

Parameters

filename : Union[Path, str]
Full path to the .pmx file to scan.
@staticmethod
def scan(filename: Union[Path, str]):
    """Constructor.

    Parameters
    ----------

    filename: Union[Path, str]
        Full path to the `.pmx` file to scan.
    """

    # Open the file
    with h5py.File(filename, "r") as f:

        # Read the file_version attribute
        file_version = f.attrs["file_version"]

        if file_version != "1.0" and file_version != "2.0":
            return None

        # Version 1 parameters
        try:
            z_scaling_factor = float(f["parameters/z_scaling_factor"][()])
        except KeyError:
            return None

        try:
            min_trace_length = int(f["parameters/min_trace_length"][()])
        except KeyError:
            return None

        try:
            efo_thresholds = tuple(f["parameters/applied_efo_thresholds"][:])
        except KeyError as e:
            efo_thresholds = None
        try:
            cfr_thresholds = tuple(f["parameters/applied_cfr_thresholds"][:])
        except KeyError as e:
            cfr_thresholds = None

        try:
            num_fluorophores = int(f["parameters/num_fluorophores"][()])
        except KeyError:
            return None

        # Version 2.0 parameters
        if file_version == "2.0":

            try:
                # This setting can be missing
                tr_len_thresholds = tuple(
                    f["parameters/applied_tr_len_thresholds"][:]
                )
            except KeyError as e:
                tr_len_thresholds = None

            try:
                dwell_time = float(f["parameters/dwell_time"][()])
            except KeyError as e:
                return None

            try:
                # This setting can be missing
                time_thresholds = tuple(f["parameters/applied_time_thresholds"][:])
            except KeyError as e:
                time_thresholds = None

            # HDF5 does not have a native boolean type, so we save as int8 and convert it
            # back to boolean on read.
            try:
                is_tracking = bool(f["parameters/is_tracking"][()])
            except KeyError as e:
                return None

            try:
                pool_dcr = bool(f["parameters/pool_dcr"][()])
            except KeyError as e:
                # This is an addendum to version 2.0, and we allow it to be missing.
                # It will fall back to False.
                pool_dcr = False

            try:
                scale_bar_size = float(f["parameters/scale_bar_size"][()])
            except KeyError as e:
                return None

        else:
            tr_len_thresholds = None
            time_thresholds = None
            dwell_time = 1.0
            is_tracking = False
            pool_dcr = False
            scale_bar_size = 500

    # Store and return
    metadata = NativeMetadata(
        pool_dcr=pool_dcr,
        cfr_thresholds=cfr_thresholds,
        dwell_time=dwell_time,
        efo_thresholds=efo_thresholds,
        is_tracking=is_tracking,
        min_trace_length=min_trace_length,
        num_fluorophores=num_fluorophores,
        scale_bar_size=scale_bar_size,
        time_thresholds=time_thresholds,
        tr_len_thresholds=tr_len_thresholds,
        z_scaling_factor=z_scaling_factor,
    )

    return metadata
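
A scanning sketch (placeholder file name; scan() returns None when the file version is unsupported or a required parameter is missing; it assumes the NativeMetadata fields are attribute-accessible, as the constructor keywords above suggest):

from pyminflux.reader import NativeMetadataReader

metadata = NativeMetadataReader.scan("experiment.pmx")
if metadata is not None:
    print(metadata.z_scaling_factor, metadata.dwell_time, metadata.is_tracking)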