Module pyminflux.reader

Readers of MINFLUX data.

Sub-modules

- pyminflux.reader.metadata
- pyminflux.reader.util

Classes

class MSRReader (filename: pathlib.Path | str)
class MSRReader:
    """Reads data and metadata information from `.MSR` (OBF format) files.

    For documentation, see:
    https://imspectordocs.readthedocs.io/en/latest/fileformat.html#the-obf-file-format

    Note: binary data is stored in little-endian order.

    Typical usage: construct with a file name, call `scan()` to parse all
    stack headers, then access per-stack metadata by indexing/iterating the
    reader and pixel data via `get_data()`.
    """

    def __init__(self, filename: Union[Path, str]):
        """Constructor.

        Parameters
        ----------
        filename: Union[Path, str]
            Full path to the file name to open.
        """

        # Normalize the file name to a pathlib.Path
        self.filename = Path(filename)

        # Metadata objects for every stack found in the file (filled by scan())
        self._obf_stacks_list: list[OBFStackMetadata] = []

        # File-level OBF header
        self.obf_file_header = OBFFileHeader()

        # File-level (OME-XML) metadata
        self.obf_file_metadata = OBFFileMetadata()

    def scan(self) -> bool:
        """Scan the metadata of the file.

        Returns
        -------

        success: bool
            True if the file was scanned successfully, False otherwise.
        """

        with open(self.filename, mode="rb") as f:

            # Parse the file header first; bail out early on failure
            if not self._read_obf_header(f):
                return False

            # Read the file-level metadata
            self.obf_file_metadata = self._scan_metadata(f, self.obf_file_header)

            # Walk the linked list of stacks: every stack header stores the
            # file position of the next stack, and 0 terminates the chain.
            stack_pos = self.obf_file_header.first_stack_pos
            while stack_pos != 0:

                # Parse the stack found at the current position
                success, stack_metadata = self._read_obf_stack(f, stack_pos)
                if not success:
                    return False

                # Keep the parsed metadata and advance to the next stack
                self._obf_stacks_list.append(stack_metadata)
                stack_pos = stack_metadata.next_stack_pos

        return True

    def __getitem__(self, stack_index: int) -> Union[OBFStackMetadata, None]:
        """Allows accessing the reader with the `[]` notation to get the next stack metadata.

        Parameters
        ----------

        stack_index: int
            Index of the stack to be retrieved.

        Returns
        -------

        metadata: Union[OBFStackMetadata, None]
            Metadata for the requested stack, or None if no file was loaded.
        """

        # Is anything loaded?
        if len(self._obf_stacks_list) == 0:
            return None

        if stack_index < 0 or stack_index > (len(self._obf_stacks_list) - 1):
            raise ValueError(f"Index value {stack_index} is out of bounds.")

        # Get and return the metadata
        metadata = self._obf_stacks_list[stack_index]
        return metadata

    def __iter__(self):
        """Return the iterator.

        Returns
        -------

            iterator
        """
        self._current_index = 0
        return self

    def __next__(self):
        if self._current_index < len(self._obf_stacks_list):
            metadata = self.__getitem__(self._current_index)
            self._current_index += 1
            return metadata
        else:
            raise StopIteration

    @property
    def num_stacks(self):
        """Return the number of stacks contained in the file."""
        return len(self._obf_stacks_list)

    def get_data_physical_sizes(
        self, stack_index: int, scaled: bool = True
    ) -> Union[list, None]:
        """Returns the (scaled) data physical size for the requested stack.

        Parameters
        ----------

        stack_index: int
            Index of the stack for which to read the data.

        scaled: bool
            If scaled is True, the physical sizes will be scaled by the corresponding scale factors
            as reported by MSRReader.get_data_units().

        Returns
        -------

        offsets: Union[list, None]
            Physical sizes for 2D images, None otherwise.
        """

        if stack_index < 0 or stack_index > len(self._obf_stacks_list):
            raise ValueError(f"stack_index={stack_index} is out of bounds.")

        # Get the metadata for the requested stack
        obf_stack_metadata = self._obf_stacks_list[stack_index]

        if obf_stack_metadata is None:
            return None

        # Get the physical lengths
        phys_lengths = obf_stack_metadata.physical_lengths[: obf_stack_metadata.rank]

        # Do we need to scale?
        if scaled:
            _, factors = self.get_data_units(stack_index=stack_index)
            for i, factor in enumerate(factors):
                if factor != 1.0:
                    phys_lengths[i] *= factor

        # Return the physical lengths as list
        return phys_lengths

    def get_data_offsets(
        self, stack_index: int, scaled: bool = True
    ) -> Union[list, None]:
        """Returns the (scaled) data offsets for the requested stack.

        Parameters
        ----------

        stack_index: int
            Index of the stack for which to read the data.

        scaled: bool
            If scaled is True, the offsets will be scaled by the corresponding scale factors
            as reported by MSRReader.get_data_units().

        Returns
        -------

        offsets: Union[list, None]
            Offsets for 2D images, None otherwise.
        """

        if stack_index < 0 or stack_index > len(self._obf_stacks_list):
            raise ValueError(f"stack_index={stack_index} is out of bounds.")

        # Get the metadata for the requested stack
        obf_stack_metadata = self._obf_stacks_list[stack_index]

        if obf_stack_metadata is None:
            return None

        # Get the offsets
        offsets = obf_stack_metadata.physical_offsets[: obf_stack_metadata.rank]

        # Do we need to scale?
        if scaled:
            _, factors = self.get_data_units(stack_index=stack_index)
            for i, factor in enumerate(factors):
                if factor != 1.0:
                    offsets[i] *= factor

        return offsets

    def get_data_pixel_sizes(
        self, stack_index: int, scaled: bool = True
    ) -> Union[list, None]:
        """Returns the (scaled) data pixel size for the requested stack.

        Parameters
        ----------

        stack_index: int
            Index of the stack for which to read the data.

        scaled: bool
            If scaled is True, the pixel sizes will be scaled by the corresponding scale factors
            as reported by MSRReader.get_data_units().

        Returns
        -------

        offsets: Union[list, None]
            Pixel sizes for 2D images, None otherwise.
        """

        if stack_index < 0 or stack_index > len(self._obf_stacks_list):
            raise ValueError(f"stack_index={stack_index} is out of bounds.")

        # Get the metadata for the requested stack
        obf_stack_metadata = self._obf_stacks_list[stack_index]

        if obf_stack_metadata is None:
            return None

        # Get the physical sizes
        phys_lengths = self.get_data_physical_sizes(
            stack_index=stack_index, scaled=scaled
        )

        # Get the number of pixels along each dimension
        num_pixels = obf_stack_metadata.num_pixels[: obf_stack_metadata.rank]

        # Now divide by the image size
        pixel_sizes = np.array(phys_lengths) / np.array(num_pixels)

        # Return the pixel size as list
        return pixel_sizes.tolist()

    def get_data_units(self, stack_index: int) -> Union[tuple[list, list], None]:
        """Returns the data units and scale factors per dimension for requested stack.

        Units are one of:
            "m": meters
            "kg": kilograms
            "s": s
            "A": Amperes
            "K": Kelvin
            "mol": moles
            "cd": candela
            "r": radian
            "sr": sr

        Parameters
        ----------

        stack_index: int
            Index of the stack for which to read the data.

        Returns
        -------

        unit: Union[tuple[list, list], None]
            List of units and list of scale factors, or None if no file was opened.
        """

        if stack_index < 0 or stack_index > len(self._obf_stacks_list):
            raise ValueError(f"stack_index={stack_index} is out of bounds.")

        # Get the metadata for the requested stack
        obf_stack_metadata = self._obf_stacks_list[stack_index]

        if obf_stack_metadata is None:
            return None

        units = []
        scale_factors = []
        for dim in range(obf_stack_metadata.rank):
            dimensions = obf_stack_metadata.si_dimensions[dim]
            scale_factors.append(dimensions.scale_factor)
            for i, exponent in enumerate(dimensions.exponents):
                if i == 0 and exponent.numerator > 0:
                    units.append("m")
                    break
                elif i == 1 and exponent.numerator > 0:
                    units.append("kg")
                    break
                elif i == 2 and exponent.numerator > 0:
                    units.append("s")
                    break
                elif i == 3 and exponent.numerator > 0:
                    units.append("A")
                    break
                elif i == 4 and exponent.numerator > 0:
                    units.append("K")
                    break
                elif i == 5 and exponent.numerator > 0:
                    units.append("mol")
                    break
                elif i == 6 and exponent.numerator > 0:
                    units.append("cd")
                    break
                elif i == 7 and exponent.numerator > 0:
                    units.append("r")
                    break
                elif i == 8 and exponent.numerator > 0:
                    units.append("sr")
                    break
                else:
                    units.append("")
                    break

        # Return the extracted units and scale factors
        return units, scale_factors

    def get_data(self, stack_index: int) -> Union[np.ndarray, None]:
        """Read the data for requested stack: only images are returned.

        Parameters
        ----------

        stack_index: int
            Index of the stack for which to read the data.

        Returns
        -------

        frame: Union[np.ndarray, None]
            Data as a 2D NumPy array. None if it could not be read or if it was not a 2D image.
        """

        if stack_index < 0 or stack_index > len(self._obf_stacks_list):
            raise ValueError(f"stack_index={stack_index} is out of bounds.")

        # Get the metadata for the requested stack
        obf_stack_metadata = self._obf_stacks_list[stack_index]

        # Currently, we only support format 6 and newer
        if obf_stack_metadata.format_version < 6:
            print("Reading data is supported only for stack format 6 and newer.")
            return None

        # If there are chunks, we currently do not read
        if obf_stack_metadata.num_chunk_positions > 0:
            print("Reading chunked data is currently not supported.")
            return None

        # We currently only read 2D images
        if self._get_num_dims(obf_stack_metadata.num_pixels) != 2:
            print("Only 2D images are currently supported.")
            return None

        # Get NumPy data type
        np_data_type, _ = self._get_numpy_data_type(
            obf_stack_metadata.data_type_on_disk
        )
        if np_data_type is None:
            print("Unsupported data type.")
            return None

        # Extract some info
        height = obf_stack_metadata.num_pixels[1]
        width = obf_stack_metadata.num_pixels[0]
        bytes_per_sample = obf_stack_metadata.bytes_per_sample

        # Expected number of (decompressed) samples
        expected_num_samples = width * height

        # Number of written bytes
        written_bytes = obf_stack_metadata.samples_written * bytes_per_sample

        # Open the file
        with open(self.filename, mode="rb") as f:

            # Seek to the beginning of the data
            f.seek(obf_stack_metadata.data_start_position)

            # Is there compression?
            if obf_stack_metadata.compression_type != 0:

                # Read the bytes
                compressed_data = f.read(written_bytes)

                # Decompress them
                decompressed_data = zlib.decompress(compressed_data)

                # Cast to a "byte" NumPy array
                raw_frame = np.frombuffer(decompressed_data, dtype=np.uint8)

            else:

                # Read the bytes
                raw_data = f.read(written_bytes)

                # Cast to a "byte" NumPy array
                raw_frame = np.frombuffer(raw_data, dtype=np.uint8)

        # Reinterpret as final data type format (little Endian)
        frame = raw_frame.view(np.dtype(np_data_type))

        # Make sure the final frame size matches the expected size
        if len(frame) != expected_num_samples:
            print("Unexpected length of data retrieved!")
            return None

        # Reshape
        frame = frame.reshape((height, width))

        return frame

    def get_ome_xml_metadata(self) -> Union[str, None]:
        """Return the OME XML metadata.

        Returns
        -------

        ome_xml_metadata: Union[str, None]
            OME XML metadata as formatted string. If no file was loaded, returns None.
        """

        # Get the ome-xml tree
        root = self.obf_file_metadata.tree
        if root is None:
            return None

        # Return metadata as formatted XML string
        return self._tree_to_formatted_xml(root)

    def export_ome_xml_metadata(self, file_name: Union[str, Path]):
        """Export the OME-XML metadata to file.

        Parameters
        ----------

        file_name: Union[str, Path]
            Output file name.
        """

        # Get the ome-xml tree, optionally as formatted string
        metadata = self.get_ome_xml_metadata()
        if metadata is None:
            print("Nothing to export.")
            return

        # Make sure the parent path to the file exists
        Path(file_name).parent.mkdir(parents=True, exist_ok=True)

        # Save to file
        with open(file_name, "w", encoding="utf-8") as f:
            f.write(metadata)

    def get_tag_dictionary(self, stack_index: int) -> Union[dict, None]:
        """Return the tag dictionary for the requested stack.

        Parameters
        ----------

        stack_index: int
            Index of the stack for which to return the tag dictionary.

        Returns
        -------

        tag_dictionary: Union[dict, None]
            Dictionary. If no file was loaded, returns None.
        """

        if stack_index < 0 or stack_index > len(self._obf_stacks_list):
            raise ValueError(f"Stack number {stack_index} is out of range.")

        # Get stack metadata
        obf_stack_metadata = self._obf_stacks_list[stack_index]
        if obf_stack_metadata is None:
            return None

        # Get the tag dictionary
        tag = obf_stack_metadata.tag_dictionary

        # Return the tag dictionary
        return tag

    def export_tag_dictionary(self, stack_index: int, file_name: Union[str, Path]):
        """Export the tag dictionary to file.

        Parameters
        ----------

        stack_index: int
            Index of the stack for which to export the tag dictionary.

        file_name: Union[str, Path]
            Output file name.
        """

        if stack_index < 0 or stack_index > len(self._obf_stacks_list):
            raise ValueError(f"Stack number {stack_index} is out of range.")

        # Get tag dictionary
        tag_dictionary = self.get_tag_dictionary(stack_index)
        if tag_dictionary is None:
            return None

        # Make sure file_name is of type Path
        file_name = Path(file_name)

        # Make sure the parent path to the file exists
        file_name.parent.mkdir(parents=True, exist_ok=True)

        # Export the dictionaries
        for key, value in tag_dictionary.items():
            if type(value) is ET.Element:
                mod_file_name = file_name.parent / f"{file_name.stem}_{key}.xml"
                xml_str = self._tree_to_formatted_xml(value)
                with open(mod_file_name, "w") as f:
                    f.write(xml_str)
            elif type(value) is dict:
                mod_file_name = file_name.parent / f"{file_name.stem}_{key}.json"
                with open(mod_file_name, "w") as f:
                    json.dump(value, f, indent=4)
            else:
                mod_file_name = file_name.parent / f"{file_name.stem}_{key}.txt"
                with open(mod_file_name, "w") as f:
                    f.write(value)

    @staticmethod
    def _tree_to_formatted_xml(root: ET, xml_declaration: bool = True) -> str:
        """Converts an xml. tree to formatted xml.

        Parameters
        ----------

        root: xml.etree.ElementTree
            Root element of the xml tree.

        xml_declaration: bool
            Whether to prepend the xml declaration to the converted xml.

        Returns
        -------

        xml_str: str
            Formatted xml.
        """

        # Format tree (optionally add xml declaration)
        xml_str = ET.tostring(
            root, encoding="utf-8", xml_declaration=xml_declaration, method="xml"
        ).decode("utf-8")

        # Remove tabs and new lines
        xml_str = re.sub(r"[\n\t]", "", xml_str)

        # Remove stretches of blank spaces between nodes
        xml_str = re.sub(r">\s+<", "><", xml_str)

        return xml_str

    @staticmethod
    def _get_footer_struct_size(version: int) -> int:
        """Returns the size in bytes of the footer structure for given version.

        Parameters
        ----------

        version: int
            Version number.

        Returns
        -------

        size: int
            Size in bytes of the footer structure for specified version.

        Raises
        ------

        ValueError
            If the version number is not between 0 and 7.
        """
        if not 0 <= version <= 7:
            raise ValueError(f"Unexpected stack version {version}.")

        # Footer sizes indexed by stack version; versions 1 and 5 map to
        # their "1A" and "5A" variants, version 0 has no footer.
        footer_lengths = (
            0,
            _Constants.V1A_FOOTER_LENGTH,
            _Constants.V2_FOOTER_LENGTH,
            _Constants.V3_FOOTER_LENGTH,
            _Constants.V4_FOOTER_LENGTH,
            _Constants.V5A_FOOTER_LENGTH,
            _Constants.V6_FOOTER_LENGTH,
            _Constants.V7_FOOTER_LENGTH,
        )
        return footer_lengths[version]

    @staticmethod
    def _get_num_dims(num_pixels: list[uint32]):
        """Return the number of dimensions of the data.

        Parameters
        ----------

        num_pixels: list[uint32]
            List of number of pixels per dimension.

        Returns
        -------

        n_dims: int
            Number of dimensions for which the number of pixels is larger than 1.
        """
        n_dims = int(np.sum(np.array(num_pixels) > 1))
        return n_dims

    @staticmethod
    def _get_numpy_data_type(
        data_type_on_disk: uint32,
    ) -> tuple[Union[np.dtype, None], Union[str, None]]:
        """Get the NumPy data type corresponding to the stored datatype.

        Parameters
        ----------

        data_type_on_disk: uint32
            UInt32 value from the stack metadata indicating the type of the data.

        Returns
        -------

        numpy_type: np.dtype
            Numpy dtype class. If the data type is not supported, returns None instead.

        str_type: str
            Type string (little endian). If the data type is not supported, returns None instead.
        """
        if data_type_on_disk == 0x00000001:
            return np.uint8, "<u1"
        elif data_type_on_disk == 0x00000002:
            return np.int8, "<i1"
        elif data_type_on_disk == 0x00000004:
            return np.uint16, "<u2"
        elif data_type_on_disk == 0x00000008:
            return np.int16, "<i2"
        elif data_type_on_disk == 0x00000010:
            return np.uint32, "<u4"
        elif data_type_on_disk == 0x00000020:
            return np.int32, "<i4"
        elif data_type_on_disk == 0x00000040:
            return np.float32, "<f4"
        elif data_type_on_disk == 0x00000080:
            return np.float64, "<f8"
        elif data_type_on_disk == 0x00001000:
            return np.uint64, "<u8"
        elif data_type_on_disk == 0x00002000:
            return np.int64, "<i8"
        else:
            return None, None

    def _read_obf_header(self, f: BinaryIO) -> bool:
        """Read the OBF header.

        Parameters
        ----------

        f: BinaryIO
            Open file handle.

        Returns
        -------
        success: bool
            True if reading the file header was successful, False otherwise.
        """

        # Read the magic header
        magic_header = f.read(10)

        if not magic_header == b"OMAS_BF\n\xff\xff":
            print("Not a valid MSR (OBF) file.")
            return False

        # Store the magic header
        self.obf_file_header.magic_header = magic_header

        # Get format version (uint32)
        self.obf_file_header.format_version = struct.unpack("<I", f.read(4))[0]

        if self.obf_file_header.format_version < 2:
            print("The MSR (OBF) file must be version 2 or above.")
            return False

        # Get position of the first stack header in the file (uint64)
        self.obf_file_header.first_stack_pos = struct.unpack("<Q", f.read(8))[0]

        # Get length of following utf-8 description (uint32)
        self.obf_file_header.descr_len = struct.unpack("<I", f.read(4))[0]

        # Get description (bytes -> utf-8)
        description = ""
        if self.obf_file_header.descr_len > 0:
            description = f.read(self.obf_file_header.descr_len).decode(
                "utf-8", errors="replace"
            )
        self.obf_file_header.description = description

        # Get metadata position (uint64)
        self.obf_file_header.meta_data_position = struct.unpack("<Q", f.read(8))[0]

        return True

    def _read_obf_stack(
        self, f: BinaryIO, next_stack_pos: int
    ) -> tuple[bool, OBFStackMetadata]:
        """Read current OBF stack metadata (header + footer).

        Parameters
        ----------

        f: BinaryIO
            Open file handle.

        next_stack_pos: int
            Position in file where the next stack starts.

        Returns
        -------

        success: bool
            Whether parsing was successful.

        obf_stack_metadata: OBFStackMetadata
            OFBStackMetadata object.
        """

        # Fresh metadata container for this stack
        metadata = OBFStackMetadata()

        # Jump to where the stack starts
        f.seek(next_stack_pos)

        # The header must parse correctly before we attempt the footer
        success, metadata = self._read_obf_stack_header(f, metadata)
        if not success:
            return False, metadata

        # The header parser leaves the file pointer at the footer: parse it
        metadata = self._read_obf_stack_footer(f, metadata)

        return True, metadata

    def _read_obf_stack_header(
        self, f: BinaryIO, obf_stack_metadata: OBFStackMetadata
    ) -> tuple[bool, OBFStackMetadata]:
        """Read the OBF stack header and update metadata.

        The file should already be positioned at the right location.
        All numeric fields are read as little-endian values. On success,
        the file pointer is left at the start of the stack footer.

        Parameters
        ----------

        f: BinaryIO
            Open file handle, positioned at the start of the stack header.

        obf_stack_metadata: OBFStackMetadata
            Current OFBStackMetadata object

        Returns
        -------

        success: bool
            Whether parsing was successful.

        obf_stack_metadata: OBFStackMetadata
            Updated OFBStackMetadata object
        """

        # Read and validate the 16-byte stack magic header
        obf_stack_metadata.magic_header = f.read(16)

        if not obf_stack_metadata.magic_header == b"OMAS_BF_STACK\n\xff\xff":
            print("Could not find OBF stack header.")
            return False, obf_stack_metadata

        # Get format version (uint32)
        obf_stack_metadata.format_version = struct.unpack("<I", f.read(4))[0]

        # Get the number of valid dimensions (uint32)
        obf_stack_metadata.rank = struct.unpack("<I", f.read(4))[0]

        # Get the number of pixels along each dimension: BF_MAX_DIMENSIONS
        # uint32 values are always present on disk, but only the first
        # `rank` entries are meaningful and kept
        obf_stack_metadata.num_pixels = []
        for i in range(_Constants.BF_MAX_DIMENSIONS):
            n = struct.unpack("<I", f.read(4))[0]
            if i < obf_stack_metadata.rank:
                obf_stack_metadata.num_pixels.append(n)

        # Get the physical lengths along each dimension (doubles; same
        # fixed-size layout as num_pixels above)
        obf_stack_metadata.physical_lengths = []
        for i in range(_Constants.BF_MAX_DIMENSIONS):
            p = struct.unpack("<d", f.read(8))[0]
            if i < obf_stack_metadata.rank:
                obf_stack_metadata.physical_lengths.append(p)

        # Get the physical offsets along each dimension (doubles; same
        # fixed-size layout as num_pixels above)
        obf_stack_metadata.physical_offsets = []
        for i in range(_Constants.BF_MAX_DIMENSIONS):
            o = struct.unpack("<d", f.read(8))[0]
            if i < obf_stack_metadata.rank:
                obf_stack_metadata.physical_offsets.append(o)

        # Read the data type; it should be one of:
        # 0x00000000: automatically determine the data type
        # 0x00000001: uint8
        # 0x00000002: int8
        # 0x00000004: uint16
        # 0x00000008: int16
        # 0x00000010: uint32
        # 0x00000020: int32
        # 0x00000040: float32
        # 0x00000080: float64 (double)
        # 0x00000400: Byte RGB, 3 samples per pixel
        # 0x00000800: Byte RGB, 4 samples per pixel
        # 0x00001000: uint64
        # 0x00002000: int64
        # 0x00010000: (c++) boolean
        #
        # Note: all numeric formats have a complex-number variant with
        # format: data_type | 0x40000000
        obf_stack_metadata.data_type_on_disk = struct.unpack("<I", f.read(4))[0]
        obf_stack_metadata.bytes_per_sample = self._get_bytes_per_sample_from_data_type(
            obf_stack_metadata.data_type_on_disk
        )

        # Compression type (0 for none, 1 for zip)
        obf_stack_metadata.compression_type = struct.unpack("<I", f.read(4))[0]

        # Compression level (0 through 9)
        obf_stack_metadata.compression_level = struct.unpack("<I", f.read(4))[0]

        # Length in bytes of the stack name that follows the header
        obf_stack_metadata.length_stack_name = struct.unpack("<I", f.read(4))[0]

        # Length in bytes of the stack description that follows the name
        obf_stack_metadata.length_stack_description = struct.unpack("<I", f.read(4))[0]

        # Reserved field (unused)
        obf_stack_metadata.reserved = struct.unpack("<Q", f.read(8))[0]

        # Size in bytes that the (possibly compressed) data occupies on disk
        obf_stack_metadata.data_len_disk = struct.unpack("<Q", f.read(8))[0]

        # File position of the next stack (0 marks the last stack)
        obf_stack_metadata.next_stack_pos = struct.unpack("<Q", f.read(8))[0]

        # Scan also stack name and description (right after the end of the header)
        obf_stack_metadata.stack_name = (
            ""
            if obf_stack_metadata.length_stack_name == 0
            else f.read(obf_stack_metadata.length_stack_name).decode(
                "utf-8", errors="replace"
            )
        )
        # NOTE(review): unlike stack_name above, this decode does not pass
        # errors="replace", so a malformed description raises
        # UnicodeDecodeError — confirm whether that is intended.
        obf_stack_metadata.stack_description = (
            ""
            if obf_stack_metadata.length_stack_description == 0
            else f.read(obf_stack_metadata.length_stack_description).decode("utf-8")
        )

        # Now we are at the beginning of the stack (image or other)
        obf_stack_metadata.data_start_position = f.tell()

        # The footer starts right after the on-disk data
        footer_start_position = (
            obf_stack_metadata.data_start_position + obf_stack_metadata.data_len_disk
        )

        # Move to the beginning of the footer
        f.seek(footer_start_position)

        return True, obf_stack_metadata

    def _read_obf_stack_footer(self, f: BinaryIO, obf_stack_metadata: OBFStackMetadata):
        """Process footer.

        Parameters
        ----------
        f: BinaryIO
            Open file handle.

        obf_stack_metadata: OBFStackMetadata
            Metadata object for current stack.

        Returns
        -------

        obf_stack_metadata: OBFFileMetadata
            Updated metadata object for current stack.
        """

        #
        # Version 0
        #

        # Current position (beginning of the footer)
        obf_stack_metadata.footer_start_pos = f.tell()

        # If stack version is 0, there is no footer
        if obf_stack_metadata.format_version == 0:
            obf_stack_metadata.footer_size = 0
            return obf_stack_metadata

        #
        # Version 1/1A
        #

        # What is the expected size of the footer for this header version?
        size_for_version = self._get_footer_struct_size(
            obf_stack_metadata.format_version
        )

        # Keep track ot the side while we proceed
        current_size = 0

        # Get size of the footer header
        obf_stack_metadata.footer_size = struct.unpack("<I", f.read(4))[0]
        current_size += 4

        # Position of the beginning of the variable metadata
        obf_stack_metadata.variable_metadata_start_position = (
            obf_stack_metadata.footer_start_pos + obf_stack_metadata.footer_size
        )

        # Entries are != 0 for all axes that have a pixel position array (after the footer)
        col_positions_present = []
        for i in range(_Constants.BF_MAX_DIMENSIONS):
            p = struct.unpack("<I", f.read(4))[0]
            if i < obf_stack_metadata.rank:
                col_positions_present.append(p != 0)
            current_size += 4
        obf_stack_metadata.has_col_positions = col_positions_present

        # Entries are != 0 for all axes that have a label (after the footer)
        col_labels_present = []
        for i in range(_Constants.BF_MAX_DIMENSIONS):
            b = struct.unpack("<I", f.read(4))[0]
            if i < obf_stack_metadata.rank:
                col_labels_present.append(b != 0)
            current_size += 4
        obf_stack_metadata.has_col_labels = col_labels_present

        # Metadata length (superseded by tag dictionary in version > 4)
        obf_stack_metadata.obsolete_metadata_length = struct.unpack("<I", f.read(4))[0]
        current_size += 4

        # Internal check
        assert (
            current_size == _Constants.V1A_FOOTER_LENGTH
        ), "Unexpected length of version 1/1A data."

        # Have we read enough for this version?
        if current_size > size_for_version:
            return obf_stack_metadata

        #
        # Version 2
        #

        # SI units of the value carried
        fractions = []
        for i in range(_Constants.OBF_SI_FRACTION_NUM_ELEMENTS):
            numerator = struct.unpack("<i", f.read(4))[0]
            denominator = struct.unpack("<i", f.read(4))[0]
            fractions.append(SIFraction(numerator=numerator, denominator=denominator))
            current_size += 8
        scale_factor = struct.unpack("<d", f.read(8))[0]
        current_size += 8
        si_value = SIUnit(exponents=fractions, scale_factor=scale_factor)
        obf_stack_metadata.si_value = si_value

        # SI units of the axes
        dimensions = []
        for i in range(_Constants.BF_MAX_DIMENSIONS):
            fractions = []
            for j in range(_Constants.OBF_SI_FRACTION_NUM_ELEMENTS):
                numerator = struct.unpack("<i", f.read(4))[0]
                denominator = struct.unpack("<i", f.read(4))[0]
                fractions.append(
                    SIFraction(numerator=numerator, denominator=denominator)
                )
                current_size += 8
            scale_factor = struct.unpack("<d", f.read(8))[0]
            current_size += 8
            dimensions.append(SIUnit(exponents=fractions, scale_factor=scale_factor))

        # Add all SI dimensions
        obf_stack_metadata.si_dimensions = dimensions

        # Internal check
        assert (
            current_size == _Constants.V2_FOOTER_LENGTH
        ), "Unexpected length of version 2 data."

        # Have we read enough for this version?
        if current_size > size_for_version:
            return obf_stack_metadata

        #
        # Version 3
        #

        # The number of flush points
        num_flush_points = struct.unpack("<Q", f.read(8))[0]
        current_size += 8
        obf_stack_metadata.num_flush_points = num_flush_points

        # The flush block size
        flush_block_size = struct.unpack("<Q", f.read(8))[0]
        current_size += 8
        obf_stack_metadata.flush_block_size = flush_block_size

        # Internal check
        assert (
            current_size == _Constants.V3_FOOTER_LENGTH
        ), "Unexpected length of version 3 data."

        # Have we read enough for this version?
        if current_size > size_for_version:
            return obf_stack_metadata

        #
        # Version 4
        #
        obf_stack_metadata.tag_dictionary_length = struct.unpack("<Q", f.read(8))[0]
        current_size += 8

        # Internal check
        assert (
            current_size == _Constants.V4_FOOTER_LENGTH
        ), "Unexpected length of version 4 data."

        # Have we read enough for this version?
        if current_size > size_for_version:
            return obf_stack_metadata

        #
        # Version 5/5A
        #

        # Where on disk all the meta-data ends
        obf_stack_metadata.stack_end_disk = struct.unpack("<Q", f.read(8))[0]
        current_size += 8

        # Min supported format version
        obf_stack_metadata.min_format_version = struct.unpack("<I", f.read(4))[0]
        current_size += 4

        # The position where the stack ends on disk.
        obf_stack_metadata.stack_end_used_disk = struct.unpack("<Q", f.read(8))[0]
        current_size += 8

        # Internal check
        assert (
            current_size == _Constants.V5A_FOOTER_LENGTH
        ), "Unexpected length of version 5/5A data."

        # Have we read enough for this version?
        if current_size > size_for_version:
            return obf_stack_metadata

        #
        # Version 6
        #

        # The total number of samples available on disk. By convention all remaining data is
        # assumed to be zero or undefined. If this is less than the data contained of the stack
        # it is safe to assume that the stack was truncated by ending the measurement early.
        # If 0, the number of samples written is the one expected from the stack size.
        obf_stack_metadata.samples_written = struct.unpack("<Q", f.read(8))[0]
        current_size += 8

        obf_stack_metadata.num_chunk_positions = struct.unpack("<Q", f.read(8))[0]
        current_size += 8

        # Internal check
        assert (
            current_size == _Constants.V6_FOOTER_LENGTH
        ), "Unexpected length of version 6 data."

        # Have we read enough for this version?
        if current_size > size_for_version:
            return obf_stack_metadata

        #
        # Version 7
        #

        # There is no new documented footer metadata for version 7.

        #
        # Read data after the end of footer
        #

        f.seek(obf_stack_metadata.variable_metadata_start_position)

        # Read labels
        labels = []
        for i in range(obf_stack_metadata.rank):
            n = struct.unpack("<I", f.read(4))[0]
            label = f.read(n).decode("utf-8")
            labels.append(label)
        obf_stack_metadata.labels = labels

        # Read steps (where presents)
        steps = []
        for dimension in range(obf_stack_metadata.rank):
            lst = []
            if obf_stack_metadata.has_col_positions[dimension]:
                for position in range(obf_stack_metadata.num_pixels[dimension]):
                    step = struct.unpack("<d", f.read(8))[0]
                    lst.append(step)
            steps.append(lst)

        # Skip the obsolete metadata
        f.seek(f.tell() + obf_stack_metadata.obsolete_metadata_length)

        # Flush points
        if obf_stack_metadata.num_flush_points > 0:
            flush_points = []
            for i in range(obf_stack_metadata.num_flush_points):
                flush_points.append(struct.unpack("<Q", f.read(8))[0])
            obf_stack_metadata.flush_points = flush_points

        # Tag dictionary
        tag_dictionary = {}
        length_key = 1
        while length_key > 0:
            new_key = self._read_string(f)
            length_key = len(new_key)
            if length_key > 0:
                # Get value
                new_value = self._read_string(f, as_str=True, as_utf8=True)

                # Try to process it
                try:
                    tree = ET.fromstring(new_value)
                except ET.ParseError:
                    # Some keys are not XML, but stringified dictionaries
                    try:
                        tree = json.loads(new_value)
                    except json.JSONDecodeError as e:
                        print(
                            f"Failed processing value for key '{new_key}' ({e}): storing as raw string."
                        )
                        tree = new_value

                # Store it without further processing
                tag_dictionary[new_key] = tree

        obf_stack_metadata.tag_dictionary = tag_dictionary

        # Chunk positions
        if obf_stack_metadata.num_chunk_positions > 0:
            logical_positions = []
            file_positions = []

            # Start with 0
            logical_positions.append(0)
            file_positions.append(0)

            for i in range(obf_stack_metadata.num_chunk_positions):
                logical_positions.append(struct.unpack("<Q", f.read(8))[0])
                file_positions.append(struct.unpack("<Q", f.read(8))[0])

            obf_stack_metadata.chunk_logical_positions = logical_positions
            obf_stack_metadata.chunk_file_positions = file_positions

        # Return
        return obf_stack_metadata

    def _scan_metadata(
        self, f: BinaryIO, obf_file_header: OBFFileHeader
    ) -> Union[OBFFileMetadata, None]:
        """Scan the metadata at the location stored in the header.

        The expected values are a key matching: "ome_xml" followed by
        valid OME XML metadata that we parse and return as an ElementTree.

        Parameters
        ----------

        f: BinaryIO
            Open file handle.

        obf_file_header: OBFFileHeader
            File header structure.

        Returns
        -------

        metadata: OBFFileMetadata
            OME-XML file metadata.
        """

        # No metadata block present in the file
        if obf_file_header.meta_data_position == 0:
            return None

        # Remember current position so we can restore it at the end
        saved_position = f.tell()

        # Jump to the metadata block
        f.seek(obf_file_header.meta_data_position)

        # Initialize OBFFileMetadata object
        metadata = OBFFileMetadata()

        # Collect length-prefixed strings until a zero-length terminator
        strings = []
        while True:
            new_str = self._read_string(f)
            if len(new_str) == 0:
                break
            strings.append(new_str)

        # Expect exactly the pair ("ome_xml", <xml payload>); anything else
        # is preserved verbatim in `unknown_strings`.
        tree = None
        if len(strings) == 2 and strings[0] == "ome_xml":
            try:
                tree = ET.fromstring(strings[1])
            except ET.ParseError:
                tree = None

        if tree is None:
            metadata.tree = None
            metadata.unknown_strings = strings
        else:
            metadata.tree = tree
            metadata.unknown_strings = []

        # Restore the original file position
        f.seek(saved_position)

        return metadata

    @staticmethod
    def _read_string(
        f: BinaryIO, as_str: bool = True, as_utf8: bool = True
    ) -> Union[str, bytes]:
        """Read a string at current position.

        Parameters
        ----------

        f: BinaryIO
            Open file handles.

        as_str: bool = True
            If True parse the raw byte array to string.

        as_utf8: bool = True
            If True decode the string to utf-8. Ignored if as_str is False.

        Returns
        -------

        string: Union[bytes, str]
            Either raw bytes or a str, optionally utf-8 encoded.
        """

        # Read the length of the following string
        length = struct.unpack("<I", f.read(4))[0]
        if length == 0:
            return ""

        # Read `length` bytes and convert them to utf-8 if requested
        value = f.read(length)
        if as_str:
            if as_utf8:
                value = value.decode("utf-8")

        return value

    @staticmethod
    def _get_bytes_per_sample_from_data_type(data_type: uint32) -> int:
        """Return the number of bytes per sample for given data type."""
        supported_types = {
            0x00000001: 1,  # 8-bit unsigned byte
            0x00000002: 1,  # 8-bit signed char
            0x00000004: 2,  # 16-bit word value
            0x00000008: 2,  # 16-bit signed integer
            0x00000010: 4,  # 32-bit unsigned integer
            0x00000020: 4,  # 32-bit signed integer
            0x00000040: 4,  # 32-bit floating point value
            0x00000080: 8,  # 64-bit floating point value
        }

        # Get the number of bytes
        num_bytes_per_sample = supported_types.get(data_type, -1)

        # Check that it is supported
        if num_bytes_per_sample == -1:
            raise ValueError(f"Unsupported data type 0x{data_type:08x}.")

        # Return it
        return num_bytes_per_sample

    def get_image_info_list(self):
        """Return a list of images from all stacks."""

        images = []

        # Nothing to do if the file contains no stacks
        if self.num_stacks == 0:
            return images

        for index, stack in enumerate(self._obf_stacks_list):

            # Consider only 2D images: exactly two dimensions larger than 1
            if (np.array(stack.num_pixels) > 1).sum() != 2:
                continue

            # Pixel size in nanometers, rounded to two decimals
            pixel_sizes = np.round(
                np.array(self.get_data_pixel_sizes(stack_index=index)) * 1e9, 2
            )[:2]

            # Look up the detector that acquired this stack
            detector = self._get_detector(
                imspector_dictionary_root=stack.tag_dictionary["imspector"],
                img_name=stack.stack_name,
            )

            if detector is None:
                continue

            # Compose a (univocal) human-readable summary string
            as_string = (
                f"{detector}: {stack.stack_name}: "
                f"size = (h={stack.num_pixels[1]} x w={stack.num_pixels[0]}); "
                f"pixel size = {pixel_sizes[0]}nm "
                f"(index = {index})"
            )

            images.append(
                {
                    "index": index,
                    "name": stack.stack_name,
                    "detector": detector,
                    "description": stack.stack_description,
                    "num_pixels": stack.num_pixels,
                    "physical_lengths": stack.physical_lengths,
                    "physical_offsets": stack.physical_offsets,
                    "pixel_sizes": pixel_sizes,
                    "as_string": as_string,
                }
            )

        # Natural sorting on the summary string, then return
        return natsorted(images, key=lambda item: item["as_string"])

    def get_image_info_dict(self):
        """Return a hierarchical dictionary of images from all stacks.

        Stacks belonging to the same acquisition (same "{index}" tag in the
        stack name) are grouped under one key, with one entry per detector.
        """

        # Initialize the dictionary
        images = {}

        # Do we have images?
        if self.num_stacks == 0:
            return images

        for i, stack in enumerate(self._obf_stacks_list):

            # Only consider 2D images: exactly two dimensions larger than 1
            if (np.array(stack.num_pixels) > 1).sum() == 2:

                # Get pixel size in nanometers, rounded to two decimals
                pixel_sizes = np.round(
                    np.array(self.get_data_pixel_sizes(stack_index=i)) * 1e9, 2
                )[:2]

                # Get detector
                detector = self._get_detector(
                    imspector_dictionary_root=stack.tag_dictionary["imspector"],
                    img_name=stack.stack_name,
                )

                if detector is None:
                    continue

                # Get acquisition number from the "{index}" tag in the name
                match = re.match(
                    r"^.+{(?P<index>\d+)}(?P<extra>.*)$",
                    stack.stack_name,
                    re.IGNORECASE,
                )
                if match:
                    if match["extra"] == "":
                        key = f"Image {match['index']}"
                    else:
                        key = f"Image {match['index']} ({match['extra']})"
                else:
                    key = stack.stack_name

                # Get (or create) the image entry for this key
                if key in images:
                    image = images[key]
                else:
                    image = {
                        "metadata": "",
                        "detectors": [],
                    }

                # Frame size in micrometers
                frame_size = (
                    stack.num_pixels[0] * pixel_sizes[0] / 1000,
                    stack.num_pixels[1] * pixel_sizes[1] / 1000,
                )

                # Build metadata string; all stacks grouped under one key must
                # agree on the frame geometry
                metadata = f"Frame: {frame_size[0]:.1f}x{frame_size[1]:.1f}µm - Pixel: {pixel_sizes[0]}nm"
                if image["metadata"] == "":
                    image["metadata"] = metadata
                else:
                    if image["metadata"] != metadata:
                        # Fixed: message had a useless f-string prefix
                        raise ValueError(
                            "The same detector seems to have inconsistent metadata across acquisitions!"
                        )

                # Append current detector (fixed typo: "detectir")
                image["detectors"].append(
                    {
                        "index": i,
                        "name": stack.stack_name,
                        "detector": detector,
                        "description": stack.stack_description,
                        "num_pixels": stack.num_pixels,
                        "physical_lengths": stack.physical_lengths,
                        "physical_offsets": stack.physical_offsets,
                        "pixel_sizes": pixel_sizes,
                    }
                )

                # Store the (updated) image in the dictionary
                images[key] = image

        # Sort the dictionary using natural sorting of its keys
        images = dict(natsorted(images.items()))

        # Return the extracted metadata
        return images

    @staticmethod
    def _get_detector(imspector_dictionary_root: ET, img_name: str) -> Union[str, None]:
        """Extract the detector names from the tag dictionary of current stack.

        Parameters
        ----------

        imspector_dictionary_root: xml.etree.ElementTree
            Root of the "imspector" tree (i.e., tag_dictionary["imspector"]).

        Returns
        -------

        name: Union[str, None]
            Name of the detector, or None if the detector could not be found.
        """

        # Get the channels node
        channels_node = imspector_dictionary_root.find(
            "./doc/ExpControl/measurement/channels"
        )
        if channels_node is None:
            return None

        # Find all items
        items = channels_node.findall("item")
        if items is None:
            return None

        # Process items
        detector = None
        for item in items:
            detector = item.find("./detsel/detector")
            name = item.find("./name")
            if detector is not None and name is not None:
                if name.text in img_name:
                    return detector.text

        return detector

Reads data and metadata information from .MSR (OBF format) files.

For documentation, see: https://imspectordocs.readthedocs.io/en/latest/fileformat.html#the-obf-file-format

Note: binary data is stored in little-endian order.

Constructor.

Parameters

filename : Union[Path, str]
Full path to the file name to open.

Instance variables

prop num_stacks
Expand source code
@property
def num_stacks(self):
    """Return the number of stacks contained in the file."""
    # One metadata entry is appended per stack during scan()
    stack_list = self._obf_stacks_list
    return len(stack_list)

Return the number of stacks contained in the file.

Methods

def export_ome_xml_metadata(self, file_name: pathlib.Path | str)
Expand source code
def export_ome_xml_metadata(self, file_name: Union[str, Path]):
    """Export the OME-XML metadata to file.

    Parameters
    ----------

    file_name: Union[str, Path]
        Output file name.
    """

    # Fetch the OME-XML metadata as a string
    metadata = self.get_ome_xml_metadata()
    if metadata is None:
        print("Nothing to export.")
        return

    # Create the parent folder(s) of the target file if needed
    out_path = Path(file_name)
    out_path.parent.mkdir(parents=True, exist_ok=True)

    # Write the metadata to file
    with open(out_path, "w", encoding="utf-8") as f:
        f.write(metadata)

Export the OME-XML metadata to file.

Parameters

file_name : Union[str, Path]
Output file name.
def export_tag_dictionary(self, stack_index: int, file_name: pathlib.Path | str)
Expand source code
def export_tag_dictionary(self, stack_index: int, file_name: Union[str, Path]):
    """Export the tag dictionary to file.

    One file is written per key in the tag dictionary: XML trees to
    `<stem>_<key>.xml`, dictionaries to `<stem>_<key>.json`, and anything
    else as plain text to `<stem>_<key>.txt`.

    Parameters
    ----------

    stack_index: int
        Index of the stack for which to export the tag dictionary.

    file_name: Union[str, Path]
        Output file name (used as the basis for the per-key file names).
    """

    # Validate the index: `>=` because stack_index == len(...) is out of
    # bounds too (the previous `>` check let it slip through).
    if stack_index < 0 or stack_index >= len(self._obf_stacks_list):
        raise ValueError(f"Stack number {stack_index} is out of range.")

    # Get tag dictionary
    tag_dictionary = self.get_tag_dictionary(stack_index)
    if tag_dictionary is None:
        return None

    # Make sure file_name is of type Path
    file_name = Path(file_name)

    # Make sure the parent path to the file exists
    file_name.parent.mkdir(parents=True, exist_ok=True)

    # Export each entry to a file named after its key; write utf-8
    # explicitly so output does not depend on the platform encoding
    for key, value in tag_dictionary.items():
        if isinstance(value, ET.Element):
            mod_file_name = file_name.parent / f"{file_name.stem}_{key}.xml"
            xml_str = self._tree_to_formatted_xml(value)
            with open(mod_file_name, "w", encoding="utf-8") as f:
                f.write(xml_str)
        elif isinstance(value, dict):
            mod_file_name = file_name.parent / f"{file_name.stem}_{key}.json"
            with open(mod_file_name, "w", encoding="utf-8") as f:
                json.dump(value, f, indent=4)
        else:
            mod_file_name = file_name.parent / f"{file_name.stem}_{key}.txt"
            with open(mod_file_name, "w", encoding="utf-8") as f:
                f.write(value)

Export the tag dictionary to file.

Parameters

stack_index : int
Index of the stack for which to export the tag dictionary.
file_name : Union[str, Path]
Output file name.
def get_data(self, stack_index: int) ‑> numpy.ndarray | None
Expand source code
def get_data(self, stack_index: int) -> Union[np.ndarray, None]:
    """Read the data for requested stack: only images are returned.

    Parameters
    ----------

    stack_index: int
        Index of the stack for which to read the data.

    Returns
    -------

    frame: Union[np.ndarray, None]
        Data as a 2D NumPy array. None if it could not be read or if it was not a 2D image.
    """

    # Validate the index: `>=` because stack_index == len(...) is out of
    # bounds too (the previous `>` check caused an IndexError below).
    if stack_index < 0 or stack_index >= len(self._obf_stacks_list):
        raise ValueError(f"stack_index={stack_index} is out of bounds.")

    # Get the metadata for the requested stack
    obf_stack_metadata = self._obf_stacks_list[stack_index]

    # Currently, we only support format 6 and newer
    if obf_stack_metadata.format_version < 6:
        print("Reading data is supported only for stack format 6 and newer.")
        return None

    # If there are chunks, we currently do not read
    if obf_stack_metadata.num_chunk_positions > 0:
        print("Reading chunked data is currently not supported.")
        return None

    # We currently only read 2D images
    if self._get_num_dims(obf_stack_metadata.num_pixels) != 2:
        print("Only 2D images are currently supported.")
        return None

    # Get NumPy data type
    np_data_type, _ = self._get_numpy_data_type(
        obf_stack_metadata.data_type_on_disk
    )
    if np_data_type is None:
        print("Unsupported data type.")
        return None

    # Extract geometry: num_pixels is ordered (width, height, ...)
    height = obf_stack_metadata.num_pixels[1]
    width = obf_stack_metadata.num_pixels[0]
    bytes_per_sample = obf_stack_metadata.bytes_per_sample

    # Expected number of (decompressed) samples
    expected_num_samples = width * height

    # Number of bytes actually written to disk
    written_bytes = obf_stack_metadata.samples_written * bytes_per_sample

    # Open the file
    with open(self.filename, mode="rb") as f:

        # Seek to the beginning of the data
        f.seek(obf_stack_metadata.data_start_position)

        # Is there compression?
        if obf_stack_metadata.compression_type != 0:

            # Read the compressed bytes
            compressed_data = f.read(written_bytes)

            # Decompress them (zlib/deflate)
            decompressed_data = zlib.decompress(compressed_data)

            # Cast to a "byte" NumPy array
            raw_frame = np.frombuffer(decompressed_data, dtype=np.uint8)

        else:

            # Read the raw bytes
            raw_data = f.read(written_bytes)

            # Cast to a "byte" NumPy array
            raw_frame = np.frombuffer(raw_data, dtype=np.uint8)

    # Reinterpret as final data type format (little Endian)
    frame = raw_frame.view(np.dtype(np_data_type))

    # Make sure the final frame size matches the expected size
    if len(frame) != expected_num_samples:
        print("Unexpected length of data retrieved!")
        return None

    # Reshape to (height, width)
    frame = frame.reshape((height, width))

    return frame

Read the data for requested stack: only images are returned.

Parameters

stack_index : int
Index of the stack for which to read the data.

Returns

frame : Union[np.ndarray, None]
Data as a 2D NumPy array. None if it could not be read or if it was not a 2D image.
def get_data_offsets(self, stack_index: int, scaled: bool = True) ‑> list | None
Expand source code
def get_data_offsets(
    self, stack_index: int, scaled: bool = True
) -> Union[list, None]:
    """Returns the (scaled) data offsets for the requested stack.

    Parameters
    ----------

    stack_index: int
        Index of the stack for which to read the data.

    scaled: bool
        If scaled is True, the offsets will be scaled by the corresponding scale factors
        as reported by MSRReader.get_data_units().

    Returns
    -------

    offsets: Union[list, None]
        Offsets for 2D images, None otherwise.
    """

    # Validate the index: `>=` because stack_index == len(...) is out of
    # bounds too (the previous `>` check let it slip through).
    if stack_index < 0 or stack_index >= len(self._obf_stacks_list):
        raise ValueError(f"stack_index={stack_index} is out of bounds.")

    # Get the metadata for the requested stack
    obf_stack_metadata = self._obf_stacks_list[stack_index]

    if obf_stack_metadata is None:
        return None

    # Slice out the offsets of the used dimensions; slicing copies the
    # list, so scaling below does not mutate the stored metadata
    offsets = obf_stack_metadata.physical_offsets[: obf_stack_metadata.rank]

    # Do we need to scale?
    if scaled:
        _, factors = self.get_data_units(stack_index=stack_index)
        for i, factor in enumerate(factors):
            if factor != 1.0:
                offsets[i] *= factor

    return offsets

Returns the (scaled) data offsets for the requested stack.

Parameters

stack_index : int
Index of the stack for which to read the data.
scaled : bool
If scaled is True, the offsets will be scaled by the corresponding scale factors as reported by MSRReader.get_data_units().

Returns

offsets : Union[list, None]
Offsets for 2D images, None otherwise.
def get_data_physical_sizes(self, stack_index: int, scaled: bool = True) ‑> list | None
Expand source code
def get_data_physical_sizes(
    self, stack_index: int, scaled: bool = True
) -> Union[list, None]:
    """Returns the (scaled) data physical size for the requested stack.

    Parameters
    ----------

    stack_index: int
        Index of the stack for which to read the data.

    scaled: bool
        If scaled is True, the physical sizes will be scaled by the corresponding scale factors
        as reported by MSRReader.get_data_units().

    Returns
    -------

    offsets: Union[list, None]
        Physical sizes for 2D images, None otherwise.
    """

    # Validate the index: `>=` because stack_index == len(...) is out of
    # bounds too (the previous `>` check let it slip through).
    if stack_index < 0 or stack_index >= len(self._obf_stacks_list):
        raise ValueError(f"stack_index={stack_index} is out of bounds.")

    # Get the metadata for the requested stack
    obf_stack_metadata = self._obf_stacks_list[stack_index]

    if obf_stack_metadata is None:
        return None

    # Slice out the lengths of the used dimensions; slicing copies the
    # list, so scaling below does not mutate the stored metadata
    phys_lengths = obf_stack_metadata.physical_lengths[: obf_stack_metadata.rank]

    # Do we need to scale?
    if scaled:
        _, factors = self.get_data_units(stack_index=stack_index)
        for i, factor in enumerate(factors):
            if factor != 1.0:
                phys_lengths[i] *= factor

    # Return the physical lengths as list
    return phys_lengths

Returns the (scaled) data physical size for the requested stack.

Parameters

stack_index : int
Index of the stack for which to read the data.
scaled : bool
If scaled is True, the physical sizes will be scaled by the corresponding scale factors as reported by MSRReader.get_data_units().

Returns

offsets : Union[list, None]
Physical sizes for 2D images, None otherwise.
def get_data_pixel_sizes(self, stack_index: int, scaled: bool = True) ‑> list | None
Expand source code
def get_data_pixel_sizes(
    self, stack_index: int, scaled: bool = True
) -> Union[list, None]:
    """Returns the (scaled) data pixel size for the requested stack.

    Parameters
    ----------

    stack_index: int
        Index of the stack for which to read the data.

    scaled: bool
        If scaled is True, the pixel sizes will be scaled by the corresponding scale factors
        as reported by MSRReader.get_data_units().

    Returns
    -------

    offsets: Union[list, None]
        Pixel sizes for 2D images, None otherwise.
    """

    # Validate the index: `>=` because stack_index == len(...) is out of
    # bounds too (the previous `>` check let it slip through).
    if stack_index < 0 or stack_index >= len(self._obf_stacks_list):
        raise ValueError(f"stack_index={stack_index} is out of bounds.")

    # Get the metadata for the requested stack
    obf_stack_metadata = self._obf_stacks_list[stack_index]

    if obf_stack_metadata is None:
        return None

    # Get the (optionally scaled) physical sizes
    phys_lengths = self.get_data_physical_sizes(
        stack_index=stack_index, scaled=scaled
    )

    # Get the number of pixels along each dimension
    num_pixels = obf_stack_metadata.num_pixels[: obf_stack_metadata.rank]

    # Pixel size = physical length / number of pixels, per dimension
    pixel_sizes = np.array(phys_lengths) / np.array(num_pixels)

    # Return the pixel size as list
    return pixel_sizes.tolist()

Returns the (scaled) data pixel size for the requested stack.

Parameters

stack_index : int
Index of the stack for which to read the data.
scaled : bool
If scaled is True, the pixel sizes will be scaled by the corresponding scale factors as reported by MSRReader.get_data_units().

Returns

offsets : Union[list, None]
Pixel sizes for 2D images, None otherwise.
def get_data_units(self, stack_index: int) ‑> tuple[list, list] | None
Expand source code
def get_data_units(self, stack_index: int) -> Union[tuple[list, list], None]:
    """Returns the data units and scale factors per dimension for requested stack.

    Units are one of:
        "m": meters
        "kg": kilograms
        "s": seconds
        "A": Amperes
        "K": Kelvin
        "mol": moles
        "cd": candela
        "r": radian
        "sr": steradian

    Parameters
    ----------

    stack_index: int
        Index of the stack for which to read the data.

    Returns
    -------

    unit: Union[tuple[list, list], None]
        List of units and list of scale factors, or None if no file was opened.
    """

    # Validate the index: `>=` because stack_index == len(...) is out of
    # bounds too (the previous `>` check let it slip through).
    if stack_index < 0 or stack_index >= len(self._obf_stacks_list):
        raise ValueError(f"stack_index={stack_index} is out of bounds.")

    # Get the metadata for the requested stack
    obf_stack_metadata = self._obf_stacks_list[stack_index]

    if obf_stack_metadata is None:
        return None

    # SI base-unit symbols, indexed like the SIUnit exponents vector
    unit_symbols = ("m", "kg", "s", "A", "K", "mol", "cd", "r", "sr")

    units = []
    scale_factors = []
    for dim in range(obf_stack_metadata.rank):
        dimensions = obf_stack_metadata.si_dimensions[dim]
        scale_factors.append(dimensions.scale_factor)

        # Report the unit of the first exponent with a positive numerator.
        # The previous if/elif chain broke out of the loop on the first
        # exponent unconditionally, so only "m" (or "") was ever reported
        # and the branches for kg/s/A/K/mol/cd/r/sr were dead code.
        for i, exponent in enumerate(dimensions.exponents):
            if exponent.numerator > 0 and i < len(unit_symbols):
                units.append(unit_symbols[i])
                break
        else:
            units.append("")

    # Return the extracted units and scale factors
    return units, scale_factors

Returns the data units and scale factors per dimension for requested stack.

Units are one of: "m": meters, "kg": kilograms, "s": seconds, "A": Amperes, "K": Kelvin, "mol": moles, "cd": candela, "r": radian, "sr": steradian.

Parameters

stack_index : int
Index of the stack for which to read the data.

Returns

unit : Union[tuple[list, list], None]
List of units and list of scale factors, or None if no file was opened.
def get_image_info_dict(self)
Expand source code
def get_image_info_dict(self):
    """Return a hierarchical dictionary of images from all stacks.

    Only stacks with exactly two dimensions larger than one pixel (that is,
    2D images) are considered; stacks without an associated detector are
    skipped. Entries are grouped by acquisition — the number in curly braces
    in the stack name — with one entry appended to the "detectors" list per
    matching stack.

    Returns
    -------

    images: dict
        Dictionary mapping an acquisition key (e.g. "Image 3") to a dict with
        a "metadata" summary string and a "detectors" list. Empty if no file
        was scanned.
    """

    # Initialize the dictionary
    images = {}

    # Do we have images?
    if self.num_stacks == 0:
        return images

    for i, stack in enumerate(self._obf_stacks_list):

        # Only return images (exactly two dimensions with more than one pixel)
        if (np.array(stack.num_pixels) > 1).sum() == 2:

            # Get pixel size in nanometers (first two dimensions only)
            pixel_sizes = np.round(
                np.array(self.get_data_pixel_sizes(stack_index=i)) * 1e9, 2
            )[:2]

            # Get detector
            detector = self._get_detector(
                imspector_dictionary_root=stack.tag_dictionary["imspector"],
                img_name=stack.stack_name,
            )

            if detector is None:
                continue

            # Get acquisition number from the stack name, e.g. "... {7} ..."
            match = re.match(
                r"^.+{(?P<index>\d+)}(?P<extra>.*)$",
                stack.stack_name,
                re.IGNORECASE,
            )
            if match:
                if match["extra"] == "":
                    key = f"Image {match['index']}"
                else:
                    key = f"Image {match['index']} ({match['extra']})"
            else:
                key = stack.stack_name

            if key in images:
                image = images[key]
            else:
                image = {
                    "metadata": "",
                    "detectors": [],
                }

            # Frame size in micrometers (pixel sizes are in nanometers)
            frame_size = (
                stack.num_pixels[0] * pixel_sizes[0] / 1000,
                stack.num_pixels[1] * pixel_sizes[1] / 1000,
            )

            # Build metadata string (only the first pixel size is reported —
            # presumably pixels are square; TODO confirm)
            metadata = f"Frame: {frame_size[0]:.1f}x{frame_size[1]:.1f}µm - Pixel: {pixel_sizes[0]}nm"
            if image["metadata"] == "":
                image["metadata"] = metadata
            else:
                if image["metadata"] != metadata:
                    # All stacks grouped under the same acquisition key must agree
                    raise ValueError(
                        "The same detector seems to have inconsistent metadata across acquisitions!"
                    )

            # Append current detector
            image["detectors"].append(
                {
                    "index": i,
                    "name": stack.stack_name,
                    "detector": detector,
                    "description": stack.stack_description,
                    "num_pixels": stack.num_pixels,
                    "physical_lengths": stack.physical_lengths,
                    "physical_offsets": stack.physical_offsets,
                    "pixel_sizes": pixel_sizes,
                }
            )

            # Store the (updated) image in the dictionary
            images[key] = image

    # Sort the dictionary using natural sorting of its keys
    images = dict(natsorted(images.items()))

    # Return the extracted metadata
    return images

Return a hierarchical dictionary of images from all stacks.

def get_image_info_list(self)
Expand source code
def get_image_info_list(self):
    """Return a list of images from all stacks."""

    collected = []

    # Nothing to do if no file has been scanned yet
    if self.num_stacks == 0:
        return collected

    for index, stack in enumerate(self._obf_stacks_list):

        # Skip anything that is not a 2D image (exactly two
        # dimensions with more than one pixel)
        if (np.array(stack.num_pixels) > 1).sum() != 2:
            continue

        # Pixel size in nanometers (first two dimensions only)
        pixel_sizes = np.round(
            np.array(self.get_data_pixel_sizes(stack_index=index)) * 1e9, 2
        )[:2]

        # Detector that acquired this stack; skip stacks without one
        detector = self._get_detector(
            imspector_dictionary_root=stack.tag_dictionary["imspector"],
            img_name=stack.stack_name,
        )
        if detector is None:
            continue

        # Build a (univocal) summary string
        as_string = (
            f"{detector}: {stack.stack_name}: "
            f"size = (h={stack.num_pixels[1]} x w={stack.num_pixels[0]}); "
            f"pixel size = {pixel_sizes[0]}nm "
            f"(index = {index})"
        )
        collected.append(
            {
                "index": index,
                "name": stack.stack_name,
                "detector": detector,
                "description": stack.stack_description,
                "num_pixels": stack.num_pixels,
                "physical_lengths": stack.physical_lengths,
                "physical_offsets": stack.physical_offsets,
                "pixel_sizes": pixel_sizes,
                "as_string": as_string,
            }
        )

    # Natural sort by the summary string and return
    return natsorted(collected, key=lambda entry: entry["as_string"])

Return a list of images from all stacks.

def get_ome_xml_metadata(self) ‑> str | None
Expand source code
def get_ome_xml_metadata(self) -> Union[str, None]:
    """Return the OME XML metadata.

    Returns
    -------

    ome_xml_metadata: Union[str, None]
        OME XML metadata as formatted string. If no file was loaded, returns None.
    """

    # Without a scanned file there is no ome-xml tree to serialize
    tree = self.obf_file_metadata.tree
    if tree is None:
        return None

    # Serialize the tree to a formatted XML string
    return self._tree_to_formatted_xml(tree)

Return the OME XML metadata.

Returns

ome_xml_metadata : Union[str, None]
OME XML metadata as formatted string. If no file was loaded, returns None.
def get_tag_dictionary(self, stack_index: int) ‑> dict | None
Expand source code
def get_tag_dictionary(self, stack_index: int) -> Union[dict, None]:
    """Return the tag dictionary for the requested stack.

    Parameters
    ----------

    stack_index: int
        Index of the stack for which to return the tag dictionary.

    Returns
    -------

    tag_dictionary: Union[dict, None]
        Dictionary. If no file was loaded, returns None.
    """

    if stack_index < 0 or stack_index > len(self._obf_stacks_list):
        raise ValueError(f"Stack number {stack_index} is out of range.")

    # Get stack metadata
    obf_stack_metadata = self._obf_stacks_list[stack_index]
    if obf_stack_metadata is None:
        return None

    # Get the tag dictionary
    tag = obf_stack_metadata.tag_dictionary

    # Return the tag dictionary
    return tag

Return the tag dictionary for the requested stack.

Parameters

stack_index : int
Index of the stack for which to return the tag dictionary.

Returns

tag_dictionary : Union[dict, None]
Dictionary. If no file was loaded, returns None.
def scan(self) ‑> bool
Expand source code
def scan(self) -> bool:
    """Scan the metadata of the file.

    Returns
    -------

    success: bool
        True if the file was scanned successfully, False otherwise.
    """

    with open(self.filename, mode="rb") as f:

        # The OBF file header must parse before anything else
        if not self._read_obf_header(f):
            return False

        # Read the file-level metadata
        self.obf_file_metadata = self._scan_metadata(f, self.obf_file_header)

        # Walk the linked list of stacks; a next position of 0 terminates it
        pos = self.obf_file_header.first_stack_pos
        while pos != 0:

            # Scan the stack at the current position
            success, stack_metadata = self._read_obf_stack(f, pos)
            if not success:
                return False

            # Keep the stack metadata and follow the link to the next stack
            self._obf_stacks_list.append(stack_metadata)
            pos = stack_metadata.next_stack_pos

    return True

Scan the metadata of the file.

Returns

success : bool
True if the file was scanned successfully, False otherwise.
class MinFluxReader (filename: pathlib.Path | str,
valid: bool = True,
z_scaling_factor: float = 1.0,
is_tracking: bool = False,
pool_dcr: bool = False,
dwell_time: float = 1.0)
Expand source code
class MinFluxReader:
    """Reader of MINFLUX data in `.pmx`, `.npy` or `.mat` formats and Imspector m2205 files, and `.pmx` version 1.0 - 2.0."""

    # Backward compatibility: earlier versions exposed the description through
    # the non-standard `__docs__` attribute instead of the class docstring.
    __docs__ = __doc__

    __slots__ = [
        "_pool_dcr",
        "_cfr_index",
        "_processed_dataframe",
        "_dcr_index",
        "_dwell_time",
        "_eco_index",
        "_efo_index",
        "_filename",
        "_full_raw_data_array",
        "_is_3d",
        "_is_aggregated",
        "_is_last_valid",
        "_is_tracking",
        "_last_valid",
        "_last_valid_cfr",
        "_loc_index",
        "_relocalizations",
        "_reps",
        "_tid_index",
        "_tim_index",
        "_unit_scaling_factor",
        "_valid",
        "_valid_cfr",
        "_valid_entries",
        "_vld_index",
        "_z_scaling_factor",
        "_tid_offsets",
    ]

    def __init__(
        self,
        filename: Union[Path, str],
        valid: bool = True,
        z_scaling_factor: float = 1.0,
        is_tracking: bool = False,
        pool_dcr: bool = False,
        dwell_time: float = 1.0,
    ):
        """Constructor.

        Parameters
        ----------

        filename: Union[Path, str]
            Full path to the `.pmx`, `.npy` or `.mat` file to read

        valid: bool (optional, default = True)
            Whether to load only valid localizations.

        z_scaling_factor: float (optional, default = 1.0)
            Refractive index mismatch correction factor to apply to the z coordinates.

        is_tracking: bool (optional, default = False)
            Whether the dataset comes from a tracking experiment; otherwise, it is considered as a
            localization experiment.

        pool_dcr: bool (optional, default = False)
            Whether to pool DCR values weighted by the relative ECO of all relocalized iterations.

        dwell_time: float (optional, default 1.0)
            Dwell time in milliseconds.
        """

        # Store the filename
        self._filename: Path = Path(filename)
        if not self._filename.exists():
            raise IOError(f"The file {self._filename} does not seem to exist.")

        # Keep track of whether the chosen sequence is the last valid.
        self._is_last_valid: bool = False

        # Store the valid flag
        self._valid: bool = valid

        # The localizations are stored in meters in the Imspector files and by
        # design also in the `.pmx` format. Here, we scale them to be in nm
        self._unit_scaling_factor: float = 1e9

        # Store the z correction factor
        self._z_scaling_factor: float = z_scaling_factor

        # Store the dwell time
        self._dwell_time = dwell_time

        # Initialize the data
        self._full_raw_data_array = None
        self._processed_dataframe = None
        self._valid_entries = None
        self._tid_offsets = []

        # Whether the acquisition is 2D or 3D
        self._is_3d: bool = False

        # Whether the acquisition is a tracking dataset
        self._is_tracking: bool = is_tracking

        # Whether to pool the dcr values
        self._pool_dcr = pool_dcr

        # Whether the file contains aggregate measurements
        self._is_aggregated: bool = False

        # Indices dependent on 2D or 3D acquisition and whether the
        # data comes from a localization or a tracking experiment.
        self._reps: int = -1
        self._efo_index: int = -1
        self._cfr_index: int = -1
        self._dcr_index: int = -1
        self._eco_index: int = -1
        self._loc_index: int = -1
        self._valid_cfr: list = []
        self._relocalizations: list = []

        # Constant indices
        self._tid_index: int = 0
        self._tim_index: int = 0
        self._vld_index: int = 0

        # Keep track of the last valid global and CFR iterations as returned
        # by the initial scan
        self._last_valid: int = -1
        self._last_valid_cfr: int = -1

        # Load the file
        if not self._load():
            raise IOError(f"The file {self._filename} is not a valid MINFLUX file.")

    @property
    def version(self) -> int:
        """Return the reader version (1 for this base reader)."""
        return 1

    @property
    def is_last_valid(self) -> Union[bool, None]:
        """Return True if the selected iteration is the "last valid", False otherwise.
        If the dataframe has not been processed yet, `is_last_valid` will be None."""
        if self._processed_dataframe is None:
            return None
        return self._is_last_valid

    @property
    def z_scaling_factor(self) -> float:
        """Returns the scaling factor for the z coordinates."""
        return self._z_scaling_factor

    @property
    def is_3d(self) -> bool:
        """Returns True if the acquisition is 3D, False otherwise."""
        return self._is_3d

    @property
    def is_aggregated(self) -> bool:
        """Returns True if the acquisition is aggregated, False otherwise."""
        return self._is_aggregated

    @property
    def is_tracking(self) -> bool:
        """Returns True for a tracking acquisition, False otherwise."""
        return self._is_tracking

    @property
    def is_pool_dcr(self) -> bool:
        """Returns True if the DCR values are pooled over all relocalized iterations (to use all photons)."""
        return self._pool_dcr

    @property
    def dwell_time(self) -> float:
        """Returns the dwell time in milliseconds."""
        return self._dwell_time

    @property
    def tid_offsets(self) -> list:
        """Return list of (first_iid, tid_offset) pairs applied when combining datasets."""
        return list(self._tid_offsets)

    @property
    def num_valid_entries(self) -> int:
        """Number of valid entries."""
        if self._valid_entries is None:
            return 0
        return int(self._valid_entries.sum())

    @property
    def num_invalid_entries(self) -> int:
        """Number of invalid entries."""
        if self._valid_entries is None:
            return 0
        return int(np.logical_not(self._valid_entries).sum())

    @property
    def tot_num_entries(self) -> int:
        """Total number of entries."""
        return self.num_valid_entries + self.num_invalid_entries

    @property
    def valid_cfr(self) -> list:
        """Return the iterations with valid CFR measurements.

        Returns
        -------
        cfr: boolean array with True for the iteration indices
             that have a valid measurement.
        """
        if self.tot_num_entries == 0:
            return []
        return self._valid_cfr

    @property
    def relocalizations(self) -> list:
        """Return the iterations with relocalizations.

        Returns
        -------
        reloc: boolean array with True for the iteration indices that are relocalized.
        """
        if self.tot_num_entries == 0:
            return []
        return self._relocalizations

    @property
    def valid_raw_data_array(self) -> Union[None, np.ndarray]:
        """Return a copy of the raw data array restricted to valid entries."""
        if self.tot_num_entries == 0:
            return None
        return self._full_raw_data_array[self._valid_entries].copy()

    @property
    def processed_dataframe(self) -> Union[None, pd.DataFrame]:
        """Return the processed dataframe (lazily built on first access)."""
        if self._processed_dataframe is not None:
            return self._processed_dataframe

        self._processed_dataframe = self._process()
        return self._processed_dataframe

    @property
    def filename(self) -> Union[Path, None]:
        """Return the filename if set."""
        if self._filename is None:
            return None
        return Path(self._filename)

    def set_indices(self, index, cfr_index, process: bool = True):
        """Set the parameter indices.

        We distinguish between the index of all parameters
        that are always measured and are accessed from the
        same iteration, and the cfr index, that is not
        always measured.

        Parameters
        ----------

        index: int
            Global iteration index for all parameters but cfr

        cfr_index: int
            Iteration index for cfr

        process: bool (Optional, default = True)
            By default, when setting the indices, the data is rescanned
            and the dataframe is rebuilt. In case several properties of
            the MinFluxReader are modified sequentially, the processing
            can be disabled and run only once after the last change.
            However, this only applies after the first load/scan, when
            the processed dataframe has not been created yet. If the
            dataframe already exists, this flag will be ignored and the
            processing will take place.
        """

        # The cfr index is not allowed to be smaller than the global iteration index
        if index < cfr_index:
            raise ValueError(
                "The value of index must be greater than or equal to cfr_index."
            )

        # Make sure there is loaded data
        if self.tot_num_entries == 0:
            raise ValueError("No data loaded.")

        if self._reps == -1:
            raise ValueError("No data loaded.")

        if len(self._valid_cfr) == 0:
            raise ValueError("No data loaded.")

        # Check that the arguments are compatible with the loaded data
        if index < 0 or index > self._reps - 1:
            raise ValueError(
                f"The value of index must be between 0 and {self._reps - 1}."
            )

        # Note: the original message incorrectly referred to `index`
        if cfr_index < 0 or cfr_index > len(self._valid_cfr) - 1:
            raise ValueError(
                f"The value of cfr_index must be between 0 and {len(self._valid_cfr) - 1}."
            )

        # Now set the general values
        self._efo_index = index
        self._dcr_index = index
        self._eco_index = index
        self._loc_index = index

        # Set the cfr index
        self._cfr_index = cfr_index

        # Constant indices
        self._tid_index: int = 0
        self._tim_index: int = 0
        self._vld_index: int = 0

        # Re-process the file? If the processed dataframe already exists,
        # the processing will take place anyway.
        if process or self._processed_dataframe is not None:
            self._processed_dataframe = self._process()

    def set_tracking(self, is_tracking: bool, process: bool = True):
        """Sets whether the acquisition is tracking or localization.

        Parameters
        ----------

        is_tracking: bool
            Set to True for a tracking acquisition, False for a localization
            acquisition.

        process: bool (Optional, default = True)
            By default, when setting the tracking flag, the data is rescanned
            and the dataframe is rebuilt. In case several properties of
            the MinFluxReader are modified sequentially, the processing
            can be disabled and run only once after the last change.
            However, this only applies after the first load/scan, when
            the processed dataframe has not been created yet. If the
            dataframe already exists, this flag will be ignored and the
            processing will take place.
        """

        # Update the flag
        self._is_tracking = is_tracking

        # Re-process the file?
        if process or self._processed_dataframe is not None:
            self._processed_dataframe = self._process()

    def set_dwell_time(self, dwell_time: float, process: bool = True):
        """
        Sets the dwell time.

        Parameters
        ----------
        dwell_time: float
            Dwell time.

        process: bool (Optional, default = True)
            By default, when setting the dwell time, the data is rescanned
            and the dataframe is rebuilt. In case several properties of
            the MinFluxReader are modified sequentially, the processing
            can be disabled and run only once after the last change.
            However, this only applies after the first load/scan, when
            the processed dataframe has not been created yet. If the
            dataframe already exists, this flag will be ignored and the
            processing will take place.
        """

        # Update the flag
        self._dwell_time = dwell_time

        # Re-process the file?
        if process or self._processed_dataframe is not None:
            self._processed_dataframe = self._process()

    def set_pool_dcr(self, pool_dcr: bool, process: bool = True):
        """
        Sets whether the DCR values should be pooled (and weighted by ECO).

        Parameters
        ----------
        pool_dcr: bool
            Whether the DCR values should be pooled (and weighted by ECO).

        process: bool (Optional, default = True)
            By default, when setting the DCR binning flag, the data is rescanned
            and the dataframe is rebuilt. In case several properties of
            the MinFluxReader are modified sequentially, the processing
            can be disabled and run only once after the last change.
            However, this only applies after the first load/scan, when
            the processed dataframe has not been created yet. If the
            dataframe already exists, this flag will be ignored and the
            processing will take place.
        """

        # Update the flag
        self._pool_dcr = pool_dcr

        # Re-process the file?
        if process or self._processed_dataframe is not None:
            self._processed_dataframe = self._process()

    @classmethod
    def processed_properties(cls) -> list:
        """Returns the properties read from the file that correspond to the processed dataframe column names."""
        return [
            "tid",
            "tim",
            "x",
            "y",
            "z",
            "efo",
            "cfr",
            "eco",
            "dcr",
            "dwell",
            "fluo",
            "fbg",
        ]

    @classmethod
    def raw_properties(cls) -> list:
        """Returns the properties, read from the file or dynamically added, that correspond to the raw dataframe column names."""
        return [
            "tid",
            "aid",
            "vld",
            "tim",
            "x",
            "y",
            "z",
            "efo",
            "cfr",
            "eco",
            "dcr",
            "fbg",
        ]

    def _load(self) -> bool:
        """Load the file.

        Dispatches to the appropriate loader based on the file extension and
        caches the raw structured array, the valid-entry mask, and the
        2D/3D flag. Returns True on success, False otherwise.
        """

        if not self._filename.is_file():
            print(f"File {self._filename} does not exist.")
            return False

        # Reset stored TID offsets
        self._tid_offsets = []

        # Call the specialized _load_*() function
        if self._filename.name.lower().endswith(".npy"):
            try:
                data_array = np.load(str(self._filename), allow_pickle=False)
                if "fluo" in data_array.dtype.names:
                    self._full_raw_data_array = data_array
                else:
                    # Older arrays without the "fluo" field are migrated
                    self._full_raw_data_array = _migrate_npy_array(data_array)
            except Exception as e:
                # Deliberately broad: np.load() can fail in many ways
                # (OSError, UnpicklingError, ValueError, EOFError, ...) and
                # any failure means the file cannot be read.
                print(f"Could not open {self._filename}: {e}")
                return False

        elif self._filename.name.lower().endswith(".mat"):
            try:
                self._full_raw_data_array = _convert_from_mat(self._filename)
            except Exception as e:
                print(f"Could not open {self._filename}: {e}")
                return False

        elif self._filename.name.lower().endswith(".pmx"):
            try:
                # Read filtered dataframe
                self._full_raw_data_array = PMXReader.get_array(self._filename)
                self._tid_offsets = PMXReader.get_tid_offsets(self._filename)

                if self._full_raw_data_array is None:
                    print(f"Could not open {self._filename}.")
                    return False
            except Exception as e:
                print(f"Could not open {self._filename}: {e}")
                return False

        else:
            print(f"Unexpected file {self._filename}.")
            return False

        # Store a logical array with the valid entries
        self._valid_entries = self._full_raw_data_array["vld"]

        # Cache whether the data is 2D or 3D and whether is aggregated
        # The cases are different for localization vs. tracking experiments
        # num_locs = self._full_raw_data_array["itr"].shape[1]
        self._is_3d = (
            float(np.nanmean(self._full_raw_data_array["itr"][:, -1]["loc"][:, -1]))
            != 0.0
        )

        # Set all relevant indices
        self._set_all_indices()

        # Return success
        return True

    def _process(self) -> Union[None, pd.DataFrame]:
        """Returns processed dataframe for valid (or invalid) entries.

        Returns
        -------

        df: pd.DataFrame
            Processed data as DataFrame, or None if no data was loaded.
        """

        # Do we have a data array to work on?
        if self.tot_num_entries == 0:
            return None

        if self._valid:
            indices = self._valid_entries
        else:
            indices = np.logical_not(self._valid_entries)

        # Extract the valid iterations
        itr = self._full_raw_data_array["itr"][indices]

        # Extract the valid identifiers
        tid = self._full_raw_data_array["tid"][indices]

        # Extract the valid time points
        tim = self._full_raw_data_array["tim"][indices]

        # Extract the fluorophore IDs; if none were assigned (all zeros),
        # default everything to fluorophore 1.
        # Note: the original test `np.all(fluo) == 0` was wrong — it was True
        # whenever ANY value was zero, not when ALL values were zero.
        fluo = self._full_raw_data_array["fluo"][indices]
        if np.all(fluo == 0):
            fluo = np.ones(fluo.shape, dtype=fluo.dtype)

        # The following extraction pattern will change whether the
        # acquisition is normal or aggregated
        if self.is_aggregated:
            # Extract the locations (meters -> nm, then z correction)
            loc = itr["loc"].squeeze() * self._unit_scaling_factor
            loc[:, 2] = loc[:, 2] * self._z_scaling_factor

            # Extract EFO
            efo = itr["efo"]

            # Extract CFR
            cfr = itr["cfr"]

            # Extract ECO
            eco = itr["eco"]

            # Extract DCR
            dcr = itr["dcr"]

            # Extract the background. The aggregated record stores it under
            # "bfg" (presumably the same quantity as "fbg" in the
            # per-iteration case — TODO confirm). The original code assigned
            # this to a variable `bfg` that was never used, leaving `fbg`
            # undefined (NameError) when building the dataframe below.
            fbg = itr["bfg"]

            # Dwell
            dwell = np.around((eco / (efo / 1000)) / self._dwell_time, decimals=0)

        else:
            # Extract the locations (meters -> nm, then z correction)
            loc = itr[:, self._loc_index]["loc"] * self._unit_scaling_factor
            loc[:, 2] = loc[:, 2] * self._z_scaling_factor

            # Extract EFO
            efo = itr[:, self._efo_index]["efo"]

            # Extract CFR
            cfr = itr[:, self._cfr_index]["cfr"]

            # Extract ECO
            eco = itr[:, self._eco_index]["eco"]

            # Extract the background
            fbg = itr[:, self._loc_index]["fbg"]

            # Pool DCR values?
            if self._pool_dcr and np.sum(self._relocalizations) > 1:

                # Calculate ECO contributions
                eco_all = itr[:, self._relocalizations]["eco"]
                eco_sum = eco_all.sum(axis=1)
                eco_all_norm = eco_all / eco_sum.reshape(-1, 1)

                # Extract DCR values and weigh them by the relative ECO contributions
                dcr = itr[:, self._relocalizations]["dcr"]
                dcr = dcr * eco_all_norm
                dcr = dcr.sum(axis=1)

            else:

                # Extract DCR
                dcr = itr[:, self._dcr_index]["dcr"]

            # Calculate dwell
            dwell = np.around((eco / (efo / 1000)) / self._dwell_time, decimals=0)

        # Create a Pandas dataframe for the results
        df = pd.DataFrame(
            index=pd.RangeIndex(start=0, stop=len(tid)),
            columns=MinFluxReader.processed_properties(),
        )

        # Store the extracted valid hits into the dataframe
        df["tid"] = tid
        df["x"] = loc[:, 0]
        df["y"] = loc[:, 1]
        df["z"] = loc[:, 2]
        df["tim"] = tim
        df["efo"] = efo
        df["cfr"] = cfr
        df["eco"] = eco
        df["dcr"] = dcr
        df["dwell"] = dwell
        df["fbg"] = fbg
        df["fluo"] = fluo

        # Remove rows with NaNs in the loc matrix
        df = df.dropna(subset=["x"])

        # Check if the selected indices correspond to the last valid iteration
        self._is_last_valid = bool(
            self._cfr_index == self._last_valid_cfr
            and self._efo_index == self._last_valid
        )

        return df

    def _set_all_indices(self):
        """Set indices of properties to be read from the raw data array."""
        if self.tot_num_entries == 0:
            return False

        # Number of iterations
        self._reps = self._full_raw_data_array["itr"].shape[1]

        # An acquisition with a single iteration is aggregated
        self._is_aggregated = self._reps == 1

        # Query the data to find the last valid iteration
        # for all measurements
        last_valid = find_last_valid_iteration(self._full_raw_data_array)

        # Set the extracted indices
        self._efo_index = last_valid["efo_index"]
        self._cfr_index = last_valid["cfr_index"]
        self._dcr_index = last_valid["dcr_index"]
        self._eco_index = last_valid["eco_index"]
        self._loc_index = last_valid["loc_index"]
        self._valid_cfr = last_valid["valid_cfr"]
        self._relocalizations = last_valid["reloc"]

        # Keep track of the last valid iteration
        self._last_valid = len(self._valid_cfr) - 1
        self._last_valid_cfr = last_valid["cfr_index"]

    def __repr__(self) -> str:
        """String representation of the object."""
        # NOTE(review): a file whose entries are all invalid also reports
        # "No file loaded." here — confirm whether that is intended.
        if self.num_valid_entries == 0:
            return "No file loaded."

        str_valid = (
            "all valid"
            if self.num_invalid_entries == 0
            else f"{self.num_valid_entries} valid and {self.num_invalid_entries} non valid"
        )

        str_acq = "3D" if self.is_3d else "2D"
        aggr_str = "aggregated" if self.is_aggregated else "normal"

        return (
            f"File: {self._filename.name}: "
            f"{str_acq} {aggr_str} acquisition with {self.tot_num_entries} entries ({str_valid})."
        )

    def __str__(self) -> str:
        """Human-friendly representation of the object."""
        return self.__repr__()

Constructor.

Parameters

filename : Union[Path, str]
Full path to the .pmx, .npy or .mat file to read
valid : bool (optional, default = True)
Whether to load only valid localizations.
z_scaling_factor : float (optional, default = 1.0)
Refractive index mismatch correction factor to apply to the z coordinates.
is_tracking : bool (optional, default = False)
Whether the dataset comes from a tracking experiment; otherwise, it is considered as a localization experiment.
pool_dcr : bool (optional, default = False)
Whether to pool DCR values weighted by the relative ECO of all relocalized iterations.
dwell_time : float (optional, default 1.0)
Dwell time in milliseconds.

Subclasses

  • pyminflux.reader._reader_v2.MinFluxReaderV2

Static methods

def processed_properties() ‑> list

Returns the properties read from the file that correspond to the processed dataframe column names.

def raw_properties() ‑> list

Returns the properties, read from the file or dynamically added, that correspond to the raw dataframe column names.

Instance variables

prop dwell_time : float
Expand source code
@property
def dwell_time(self) -> float:
    """Returns the dwell time in milliseconds."""
    return self._dwell_time

Returns the dwell time.

prop filename : pathlib.Path | None
Expand source code
@property
def filename(self) -> Union[Path, None]:
    """Return the filename as a `Path` if set, `None` otherwise."""
    if self._filename is None:
        return None
    return Path(self._filename)

Return the filename if set.

prop is_3d : bool
Expand source code
@property
def is_3d(self) -> bool:
    """Return True if the acquisition is 3D, False otherwise."""
    return self._is_3d

Returns True if the acquisition is 3D, False otherwise.

prop is_aggregated : bool
Expand source code
@property
def is_aggregated(self) -> bool:
    """Return True if the acquisition is aggregated, False otherwise."""
    return self._is_aggregated

Returns True if the acquisition is aggregated, False otherwise.

prop is_last_valid : bool | None
Expand source code
@property
def is_last_valid(self) -> Union[bool, None]:
    """Whether the selected iteration is the "last valid" one.

    Returns None if the dataframe has not been processed yet.
    """
    return None if self._processed_dataframe is None else self._is_last_valid

Return True if the selected iteration is the "last valid", False otherwise. If the dataframe has not been processed yet, is_last_valid will be None.

prop is_pool_dcr : bool
Expand source code
@property
def is_pool_dcr(self) -> bool:
    """Return True if DCR values are pooled over all relocalized iterations (to use all photons)."""
    return self._pool_dcr

Returns True if the DCR values are pooled over all relocalized iterations (to use all photons).

prop is_tracking : bool
Expand source code
@property
def is_tracking(self) -> bool:
    """Return True for a tracking acquisition, False for a localization one."""
    return self._is_tracking

Returns True for a tracking acquisition, False otherwise.

prop num_invalid_entries : int
Expand source code
@property
def num_invalid_entries(self) -> int:
    """Number of invalid entries.

    Returns 0 if no data has been loaded yet.
    """
    # Fixed: docstring previously said "valid" entries (copy-paste error).
    if self._valid_entries is None:
        return 0
    return int(np.logical_not(self._valid_entries).sum())

Number of invalid entries.

prop num_valid_entries : int
Expand source code
@property
def num_valid_entries(self) -> int:
    """Number of valid entries (0 if no data has been loaded yet)."""
    entries = self._valid_entries
    if entries is None:
        return 0
    return int(entries.sum())

Number of valid entries.

prop processed_dataframe : pandas.core.frame.DataFrame | None
Expand source code
@property
def processed_dataframe(self) -> Union[None, pd.DataFrame]:
    """Return the processed dataframe, building and caching it on first access."""
    if self._processed_dataframe is None:
        self._processed_dataframe = self._process()
    return self._processed_dataframe

Return the processed data as dataframe (some properties only).

prop relocalizations : list
Expand source code
@property
def relocalizations(self) -> list:
    """Return the iterations with relocalizations.

    Returns
    -------
    reloc: boolean array with True for the iteration indices that are relocalized.
    """
    return [] if self.tot_num_entries == 0 else self._relocalizations

Return the iterations with relocalizations.

Returns

reloc: boolean array with True for the iteration indices that are relocalized.

prop tid_offsets : list
Expand source code
@property
def tid_offsets(self) -> list:
    """Return a copy of the (first_iid, tid_offset) pairs applied when combining datasets."""
    return [*self._tid_offsets]

Return list of (first_iid, tid_offset) pairs applied when combining datasets.

prop tot_num_entries : int
Expand source code
@property
def tot_num_entries(self) -> int:
    """Total number of entries (valid plus invalid)."""
    valid, invalid = self.num_valid_entries, self.num_invalid_entries
    return valid + invalid

Total number of entries.

prop valid_cfr : list
Expand source code
@property
def valid_cfr(self) -> list:
    """Return the iterations with valid CFR measurements.

    Returns
    -------
    cfr: boolean array with True for the iteration indices
         that have a valid measurement.
    """
    return [] if self.tot_num_entries == 0 else self._valid_cfr

Return the iterations with valid CFR measurements.

Returns

cfr : boolean array with True for the iteration indices
that have a valid measurement.
prop valid_raw_data_array : numpy.ndarray | None
Expand source code
@property
def valid_raw_data_array(self) -> Union[None, np.ndarray]:
    """Return a copy of the raw data restricted to the valid entries (None if no data)."""
    if self.tot_num_entries == 0:
        return None
    valid_subset = self._full_raw_data_array[self._valid_entries]
    return valid_subset.copy()

Return the raw data.

prop version : int
Expand source code
@property
def version(self) -> int:
    """Return the MinFluxReader format version (1 for this reader)."""
    # Added the missing docstring; value unchanged.
    return 1
prop z_scaling_factor : float
Expand source code
@property
def z_scaling_factor(self) -> float:
    """Return the scaling factor applied to the z coordinates."""
    factor = self._z_scaling_factor
    return factor

Returns the scaling factor for the z coordinates.

Methods

def set_dwell_time(self, dwell_time: float, process: bool = True)
Expand source code
def set_dwell_time(self, dwell_time: float, process: bool = True):
    """
    Sets the dwell time.

    Parameters
    ----------
    dwell_time: float
        Dwell time.

    process: bool (Optional, default = True)
        Whether to rebuild the processed dataframe right away. This flag
        only defers processing before the first load/scan: once the
        processed dataframe exists, it is rebuilt regardless of the
        flag's value.
    """

    # Store the new dwell time
    self._dwell_time = dwell_time

    # Rebuild the dataframe if requested, or unconditionally if one already exists
    must_process = process or self._processed_dataframe is not None
    if must_process:
        self._processed_dataframe = self._process()

Sets the dwell time.

Parameters

dwell_time : float
Dwell time.
process : bool (Optional, default = True)
By default, when setting the dwell time, the data is rescanned and the dataframe is rebuilt. In case several properties of the MinFluxReader are modified sequentially, the processing can be disabled and run only once after the last change. However, this only applies after the first load/scan, when the processed dataframe has not been created yet. If the dataframe already exists, this flag will be ignored and the processing will take place.
def set_indices(self, index, cfr_index, process: bool = True)
Expand source code
def set_indices(self, index, cfr_index, process: bool = True):
    """Set the parameter indices.

    We distinguish between the index of all parameters
    that are always measured and are accessed from the
    same iteration, and the cfr index, that is not
    always measured.

    Parameters
    ----------

    index: int
        Global iteration index for all parameters but cfr

    cfr_index: int
        Iteration index for cfr

    process: bool (Optional, default = True)
        By default, when setting the indices, the data is rescanned
        and the dataframe is rebuilt. In case several properties of
        the MinFluxReader are modified sequentially, the processing
        can be disabled and run only once after the last change.
        However, this only applies after the first load/scan, when
        the processed dataframe has not been created yet. If the
        dataframe already exists, this flag will be ignored and the
        processing will take place.

    Raises
    ------

    ValueError
        If no data is loaded, or if `index`/`cfr_index` are out of range
        or inconsistent with each other.
    """

    # The cfr index is not allowed to be smaller than the global iteration index
    if index < cfr_index:
        raise ValueError(
            "The value of index must be greater than or equal to cfr_index."
        )

    # Make sure there is loaded data (merged three identical guards;
    # evaluation order is preserved by short-circuiting).
    if self.tot_num_entries == 0 or self._reps == -1 or len(self._valid_cfr) == 0:
        raise ValueError("No data loaded.")

    # Check that the arguments are compatible with the loaded data
    if index < 0 or index > self._reps - 1:
        raise ValueError(
            f"The value of index must be between 0 and {self._reps - 1}."
        )

    if cfr_index < 0 or cfr_index > len(self._valid_cfr) - 1:
        # Fixed: this message previously referred to `index` instead of `cfr_index`.
        raise ValueError(
            f"The value of cfr_index must be between 0 and {len(self._valid_cfr) - 1}."
        )

    # All parameters but cfr are read from the same (global) iteration
    self._efo_index = index
    self._dcr_index = index
    self._eco_index = index
    self._loc_index = index

    # Set the cfr index
    self._cfr_index = cfr_index

    # Constant indices
    self._tid_index: int = 0
    self._tim_index: int = 0
    self._vld_index: int = 0

    # Re-process the file? If the processed dataframe already exists,
    # the processing will take place anyway.
    if process or self._processed_dataframe is not None:
        self._processed_dataframe = self._process()

Set the parameter indices.

We distinguish between the index of all parameters that are always measured and are accessed from the same iteration, and the cfr index, that is not always measured.

Parameters

index : int
Global iteration index for all parameters but cfr
cfr_index : int
Iteration index for cfr
process : bool (Optional, default = True)
By default, when setting the indices, the data is rescanned and the dataframe is rebuilt. In case several properties of the MinFluxReader are modified sequentially, the processing can be disabled and run only once after the last change. However, this only applies after the first load/scan, when the processed dataframe has not been created yet. If the dataframe already exists, this flag will be ignored and the processing will take place.
def set_pool_dcr(self, pool_dcr: bool, process: bool = True)
Expand source code
def set_pool_dcr(self, pool_dcr: bool, process: bool = True):
    """
    Sets whether the DCR values should be pooled (and weighted by ECO).

    Parameters
    ----------
    pool_dcr: bool
        Whether the DCR values should be pooled (and weighted by ECO).

    process: bool (Optional, default = True)
        Whether to rebuild the processed dataframe right away. This flag
        only defers processing before the first load/scan: once the
        processed dataframe exists, it is rebuilt regardless of the
        flag's value.
    """

    # Remember the pooling choice
    self._pool_dcr = pool_dcr

    # Rebuild the dataframe if requested, or unconditionally if one already exists
    needs_processing = process or self._processed_dataframe is not None
    if needs_processing:
        self._processed_dataframe = self._process()

Sets whether the DCR values should be pooled (and weighted by ECO).

Parameters

pool_dcr : bool
Whether the DCR values should be pooled (and weighted by ECO).
process : bool (Optional, default = True)
By default, when setting the DCR binning flag, the data is rescanned and the dataframe is rebuilt. In case several properties of the MinFluxReader are modified sequentially, the processing can be disabled and run only once after the last change. However, this only applies after the first load/scan, when the processed dataframe has not been created yet. If the dataframe already exists, this flag will be ignored and the processing will take place.
def set_tracking(self, is_tracking: bool, process: bool = True)
Expand source code
def set_tracking(self, is_tracking: bool, process: bool = True):
    """Sets whether the acquisition is tracking or localization.

    Parameters
    ----------

    is_tracking: bool
        Set to True for a tracking acquisition, False for a localization
        acquisition.

    process: bool (Optional, default = True)
        Whether to rebuild the processed dataframe right away. This flag
        only defers processing before the first load/scan: once the
        processed dataframe exists, it is rebuilt regardless of the
        flag's value.
    """

    # Remember the acquisition type
    self._is_tracking = is_tracking

    # Rebuild the dataframe if requested, or unconditionally if one already exists
    needs_processing = process or self._processed_dataframe is not None
    if needs_processing:
        self._processed_dataframe = self._process()

Sets whether the acquisition is tracking or localization.

Parameters

is_tracking : bool
Set to True for a tracking acquisition, False for a localization acquisition.
process : bool (Optional, default = True)
By default, when setting the tracking flag, the data is rescanned and the dataframe is rebuilt. In case several properties of the MinFluxReader are modified sequentially, the processing can be disabled and run only once after the last change. However, this only applies after the first load/scan, when the processed dataframe has not been created yet. If the dataframe already exists, this flag will be ignored and the processing will take place.
class MinFluxReaderFactory
Expand source code
class MinFluxReaderFactory:
    """Factory for MinFluxReader version 1 or 2."""

    # Fixed: the class docstring was previously assigned to a `__docs__`
    # attribute (a typo for `__doc__`), so the class had no real docstring.

    @staticmethod
    def get_reader(filename: Union[Path, str]) -> (MinFluxReader, str):
        """Returns the appropriate reader class for the passed filename.

        Usage
        -----

        reader_class = MinFluxReaderFactory.get_reader(filename)  # One of MinFluxReader or MinFluxReaderV2
        reader = reader_class(filename, valid, z_scaling_factor, is_tracking, pool_dcr, dwell_time)

        Parameters
        ----------

        filename: Union[Path, str]
            Full path to the `.pmx`, `.npy`, `.mat`, or '.json' file to read.

        Returns
        -------

        reader: MinFluxReader class
            Either version 1 or version 2 MinFluxReader. Version 2 MinFluxReader supports Imspector >=24.10.
            On failure, returns (None, error_message) instead.
        """

        # Check if the file exists
        filename = Path(filename)

        if not filename.exists():
            return None, f"{filename} does not exist."

        # If filename is a folder, we check for a valid Zarr file
        if filename.is_dir():
            if zarr.load(str(filename)) is not None:
                return MinFluxReaderV2, ""
            else:
                return None, f"{filename} is not a valid Zarr file."

        # Determine file type
        file_ext = filename.suffix.lower()

        # Check the file
        if file_ext == ".npy":
            reader_version = get_reader_version_for_npy_file(filename)
        elif file_ext == ".mat":
            reader_version = get_reader_version_for_mat_file(filename)
        elif file_ext == ".json":
            # Only Imspector m2410 (and newer) writes .json files
            reader_version = 2
        elif file_ext == ".pmx":
            reader_version = get_reader_version_for_pmx_file(filename)
        else:
            return None, f"{filename} is not supported."

        # Return the requested reader
        if reader_version == 1:
            return MinFluxReader, ""
        elif reader_version == 2:
            return MinFluxReaderV2, ""
        elif reader_version == -1:
            # In case parsing the file failed, the returned reader_version is -1.
            return None, f"Error processing file {filename}."
        else:
            # Unexpected version number
            return None, f"MinFluxReader version {reader_version} is not supported."

Static methods

def get_reader(filename: pathlib.Path | str) ‑> (MinFluxReader, str)
Expand source code
@staticmethod
def get_reader(filename: Union[Path, str]) -> (MinFluxReader, str):
    """Returns the appropriate reader class for the passed filename.

    Usage
    -----

    reader_class = MinFluxReaderFactory.get_reader(filename)  # One of MinFluxReader or MinFluxReaderV2
    reader = reader_class(filename, valid, z_scaling_factor, is_tracking, pool_dcr, dwell_time)

    Parameters
    ----------

    filename: Union[Path, str]
        Full path to the `.pmx`, `.npy`, `.mat`, or '.json' file to read.

    Returns
    -------

    reader: MinFluxReader class
        Either version 1 or version 2 MinFluxReader. Version 2 MinFluxReader supports Imspector >=24.10.
        On failure, returns (None, error_message) instead.
    """

    # Check if the file exists
    filename = Path(filename)

    if not filename.exists():
        return None, f"{filename} does not exist."

    # If filename is a folder, we check for a valid Zarr file
    if filename.is_dir():
        if zarr.load(str(filename)) is not None:
            return MinFluxReaderV2, ""
        else:
            return None, f"{filename} is not a valid Zarr file."

    # Determine file type
    file_ext = filename.suffix.lower()

    # Check the file
    if file_ext == ".npy":
        reader_version = get_reader_version_for_npy_file(filename)
    elif file_ext == ".mat":
        reader_version = get_reader_version_for_mat_file(filename)
    elif file_ext == ".json":
        # Only Imspector m2410 (and newer) writes .json files
        reader_version = 2
    elif file_ext == ".pmx":
        reader_version = get_reader_version_for_pmx_file(filename)
    else:
        return None, f"{filename} is not supported."

    # Return the requested reader
    if reader_version == 1:
        return MinFluxReader, ""
    elif reader_version == 2:
        return MinFluxReaderV2, ""
    elif reader_version == -1:
        # In case parsing the file failed, the returned reader_version is -1.
        return None, f"Error processing file {filename}."
    else:
        # Unexpected version number
        return None, f"MinFluxReader version {reader_version} is not supported."

Returns the appropriate reader class for the passed filename.

Usage

reader_class = MinFluxReaderFactory.get_reader(filename) # One of MinFluxReader or MinFluxReaderV2 reader = reader_class(filename, valid, z_scaling_factor, is_tracking, pool_dcr, dwell_time)

Parameters

filename : Union[Path, str]
Full path to the .pmx, .npy, .mat, or '.json' file to read.

Returns

reader : MinFluxReader class
Either version 1 or version 2 MinFluxReader. Version 2 MinFluxReader supports Imspector >=24.10.
class MinFluxReaderV2 (filename: pathlib.Path | str,
z_scaling_factor: float = 1.0,
is_tracking: bool = False,
pool_dcr: bool = False,
dwell_time: float = 1.0)
Expand source code
class MinFluxReaderV2(MinFluxReader):
    """Reader of MINFLUX data in `.npy`, `.mat` and `.json` Imspector m2410 files, and `.pmx` version 0.6.0 and newer."""

    def __init__(
        self,
        filename: Union[Path, str],
        z_scaling_factor: float = 1.0,
        is_tracking: bool = False,
        pool_dcr: bool = False,
        dwell_time: float = 1.0,
    ):
        """Constructor.

        Parameters
        ----------

        filename: Union[Path, str]
            Full path to the `.pmx`, `.npy` or `.mat` file to read

        z_scaling_factor: float (optional, default = 1.0)
            Refractive index mismatch correction factor to apply to the z coordinates.

        is_tracking: bool (optional, default = False)
            Whether the dataset comes from a tracking experiment; otherwise, it is considered as a
            localization experiment.

        pool_dcr: bool (optional, default = False)
            Whether to pool DCR values weighted by the relative ECO of all relocalized iterations.

        dwell_time: float (optional, default 1.0)
            Dwell time in milliseconds.
        """

        # Version 2 does not use the _full_raw_data_array property, but uses the full dataframe instead
        self._full_raw_dataframe = None

        # Beamline monitoring data, if present (filled by _load_mbm() for Zarr
        # files and exposed via the mbm_data property).
        # Fixed: this was previously initialized as `self._mbm`, which nothing
        # reads; the `mbm_data` property accesses `self._mbm_data`, leading to
        # an AttributeError for files without MBM data.
        self._mbm_data = None

        # Call the base constructor
        super().__init__(
            filename=filename,
            valid=True,  # Pass valid=True to the base class
            z_scaling_factor=z_scaling_factor,
            is_tracking=is_tracking,
            pool_dcr=pool_dcr,
            dwell_time=dwell_time,
        )

        # Delete the _full_raw_data_array property from version 1 (version 2 does NOT
        # store the raw data from Imspector, but the processed dataframes that is
        # derived from it).
        del self._full_raw_data_array

    @property
    def version(self) -> int:
        """Return the MinFluxReader format version (2 for this reader)."""
        # Added the missing docstring; value unchanged.
        return 2

    @property
    def mbm_data(self):
        """Return the loaded beamline monitoring (MBM) data."""
        return self._mbm_data

    @property
    def valid_full_raw_dataframe(self) -> Union[None, pd.DataFrame]:
        """Return a copy of the full raw dataframe restricted to the valid entries.

        Returns None if no data has been loaded.
        """
        # Fixed: the return annotation previously claimed np.ndarray, but the
        # value is a slice of `_full_raw_dataframe` (a pandas DataFrame).
        if self.tot_num_entries == 0:
            return None
        return self._full_raw_dataframe[self._valid_entries].copy()

    @classmethod
    def processed_properties(cls) -> list:
        """Returns the properties read from the file that correspond to the processed dataframe column names."""
        # "iid" is a custom column holding the iteration ID.
        return [
            "tid", "tim", "x", "y", "z", "efo", "cfr",
            "eco", "dcr", "dwell", "fluo", "fbg", "iid",
        ]

    def _load(self) -> bool:
        """Load the file.

        Dispatches to the format-specific ``_load_*()`` method based on the
        file extension (a directory is treated as a Zarr store), then — for
        imported formats — normalizes the dataframe: initializes the custom
        ``fluo`` column, computes the custom ``iid`` (iteration ID) column,
        and applies a fixed dtype map. Finally caches validity and 3D flags
        and sets the parameter indices.

        Returns
        -------

        success: bool
            True if the file was loaded successfully, False otherwise.
        """

        if not self._filename.exists():
            print(f"File {self._filename} does not exist.")
            return False

        # Reset stored TID offsets
        self._tid_offsets = []

        # Empty dataframe with the canonical column set; the _load_*() methods
        # fill it in.
        raw_dataframe = pd.DataFrame(
            columns=[
                "vld",
                "fnl",
                "bot",
                "eot",
                "sta",
                "tim",
                "tid",
                "gri",
                "thi",
                "sqi",
                "itr",
                "x",
                "y",
                "z",
                "lncx",
                "lncy",
                "lncz",
                "eco",
                "ecc",
                "efo",
                "efc",
                "cfr",
                "dcr",
                "fbg",
                "fluo",  # Custom: fluorophore ID
                "iid",  # Custom: iteration ID
            ]
        )

        # Do we have a  Zarr file?
        if self._filename.is_dir():
            # Create phony file_ext ".zarr" for the following logic
            file_ext = ".zarr"

        else:
            # Determine file type
            file_ext = self._filename.suffix.lower()

        # Call the specialized _load_*() function
        try:
            if file_ext == ".zarr":
                # Load and convert to NumPy
                raw_dataframe = self._load_zarr(raw_dataframe)
            elif file_ext == ".npy":
                raw_dataframe = self._load_numpy(raw_dataframe)
            elif file_ext == ".mat":
                raw_dataframe = self._load_mat(raw_dataframe)
            elif file_ext == ".json":
                raw_dataframe = self._load_json(raw_dataframe)
            elif file_ext == ".pmx":
                raw_dataframe = self._load_pmx(raw_dataframe)
                self._tid_offsets = PMXReader.get_tid_offsets(self._filename)
            else:
                print(f"Unexpected file {self._filename}.")
                return False
        except Exception as e:
            print(f"{e}")
            return False

        # Finalize the initialization for all imported file formats
        # (.pmx files are already normalized and skip this step).
        if file_ext in [".zarr", ".npy", ".mat", ".json"]:

            # Initialize the fluo field
            raw_dataframe.loc[:, "fluo"] = 1

            # **Important**: apply data types **after** creating the dataframe to make sure that
            # data coming from binary types (.npy and .mat) and data coming from text types (.json)
            # generate identical dataframes.
            # Note: cfr and dcr are stored as half precision ("<f2").
            data_full_df_dtype = {
                "vld": "?",
                "fnl": "?",
                "bot": "?",
                "eot": "?",
                "sta": "u1",
                "tim": "<f8",
                "tid": "<u4",
                "gri": "<u4",
                "thi": "u1",
                "sqi": "u1",
                "itr": "<i4",
                "x": "<f8",
                "y": "<f8",
                "z": "<f8",
                "lncx": "<f8",
                "lncy": "<f8",
                "lncz": "<f8",
                "eco": "<u4",
                "ecc": "<u4",
                "efo": "<f4",
                "efc": "<f4",
                "fbg": "<f4",
                "cfr": "<f2",
                "dcr": "<f2",
                "fluo": "u1",
                "iid": "<u4",
            }

            # Apply the iteration ID. A new iid is started if:
            # 1. Previous row had fnl=True OR
            # 2. Current row has a different tid than the previous row

            # Calculate tid differences
            tid_values = raw_dataframe["tid"].values
            tid_diff = np.zeros(len(tid_values), dtype=bool)
            tid_diff[1:] = (
                tid_values[1:] != tid_values[:-1]
            )  # Compare current with previous, skip first row

            # Calculate previous fnl flags
            prev_fnl = np.zeros(len(raw_dataframe), dtype=bool)
            prev_fnl[1:] = raw_dataframe["fnl"].values[:-1]  # Previous row's fnl

            # Combine conditions - either previous row had fnl=True or current row has new tid
            new_iid = np.logical_or(prev_fnl, tid_diff)

            # Calculate iid by cumulative sum of these indicators, adding 1 for the first group
            raw_dataframe.loc[:, "iid"] = new_iid.cumsum() + 1

            # Apply the correct datatypes to the columns
            raw_dataframe = raw_dataframe.astype(data_full_df_dtype)

        # Assign the new dataframe
        self._full_raw_dataframe = raw_dataframe

        # Store a logical array with the valid entries
        # (a boolean pandas Series used as a row mask elsewhere).
        self._valid_entries = self._full_raw_dataframe["vld"]
        # NOTE(review): _get_valid_subset() is defined elsewhere; at this
        # point every loader has already dropped invalid rows, so all
        # remaining entries are expected to be valid.
        if not np.all(self._get_valid_subset()):
            print("All entries at this stage must be valid!")
            return False

        # Cache whether the data is 2D or 3D and whether is aggregated
        # (any |z| above the 1e-11 tolerance marks the acquisition as 3D).
        z_values = self._full_raw_dataframe[self._valid_entries]["z"].to_numpy()
        self._is_3d = np.abs(z_values).max() > 1e-11

        # Set all relevant indices
        self._set_all_indices()

        # In case of a Zarr file, try loading beamline monitoring data
        if file_ext == ".zarr":
            self._load_mbm()

        # Return success
        return True

    def _load_mbm(self):
        """Load beamline monitoring (MBM) data if present.

        Populates ``self._mbm_data`` with a dictionary of the form
        ``{"mbm": {bead_name: bead_data}, "mbm_neighborhood": ...}``. When no
        MBM data is found, ``self._mbm_data`` is set to ``{"mbm": {}}``.
        """
        # Make sure that self._filename points to the root of the Zarr file
        self._filename = find_zarr_root(self._filename)

        # Initialize dictionary
        mbm_data = {"mbm": {}}

        # Read grd/mbm
        mbm_points = zarr.load(str(self._filename / "grd" / "mbm" / "points"))
        mbm = zarr.load(str(self._filename / "mbm"))
        grd_mbm = zarr.load(str(self._filename / "grd" / "mbm"))
        if mbm_points is None or mbm is None or grd_mbm is None:
            print(f"No beamline monitoring data found in {self._filename}.")
            self._mbm_data = mbm_data
            return

        # NOTE(review): the code below assumes zarr.load() returns objects
        # exposing a `.grp` attribute carrying Zarr group attributes —
        # confirm against the zarr version in use.
        mbm_gri = grd_mbm.grp.points.attrs["points_by_gri"]
        mbm_neighbourhood = mbm.grp.attrs["neighbourhood"]

        # Get list of used beads from mbm attributes
        # https://wiki.abberior.rocks/MINFLUX_Files_and_Data#MBM_Information
        used_beads = mbm.grp.attrs.get("used", [])

        num_beads = 0
        num_used_beads = 0
        for key in mbm_gri:
            bead_name = mbm_gri[key]["name"]
            # Points belonging to this bead (matched on the "gri" field)
            pts = mbm_points[mbm_points["gri"] == int(key)]
            bead_data = {"bead_name": bead_name, "gri": key, "used": 0, "points": pts}
            if bead_name in used_beads:
                bead_data["used"] = 1
                num_used_beads += 1
            mbm_data["mbm"][bead_name] = bead_data
            num_beads += 1

        # Add mbm_neighbourhood information
        mbm_data["mbm_neighborhood"] = mbm_neighbourhood

        # Store the loaded information
        self._mbm_data = mbm_data

        print(
            f"Read {num_beads} "
            f"{'beads' if num_beads != 1 else 'bead'} ({num_used_beads} "
            f"used)."
        )

    def _load_zarr(self, df: pd.DataFrame):
        """Load the Zarr file and update the dataframe.

        Parameters
        ----------
        df: pd.DataFrame
            Empty dataframe with the expected columns; filled column by column.

        Returns
        -------
        df: pd.DataFrame
            The filled dataframe (restricted to valid entries and complete
            traces), or None if the Zarr file could not be opened.
        """

        # Make sure that self._filename points to the root of the Zarr file
        self._filename = find_zarr_root(self._filename)

        # Path to "mfx"
        filename = self._filename / "mfx"
        if not filename.is_dir():
            print("Could not open the Zarr file.")
            return None

        # Load array
        npy_array = np.array(zarr.load(str(filename)))
        # NOTE(review): np.array(...) never returns None (np.array(None) is a
        # 0-d object array), so this guard looks ineffective — verify the
        # behavior for a missing "mfx" store.
        if npy_array is None:
            print("Could not open the Zarr file.")
            return None

        # Drop all invalid entries
        npy_array = npy_array[npy_array["vld"]]

        # Fill the dataframe
        for name in npy_array.dtype.names:
            if name == "dcr":
                # In version 2, the dcr is 2D: dcr[:, 0] corresponds to the dcr of
                # version 1, while dcr[:, 1] is just 1.0 - dcr[:, 0]. We drop the
                # second dimension.
                df["dcr"] = npy_array["dcr"][:, 0]
                continue

            # Special cases
            if name == "loc":
                df["x"] = npy_array["loc"][:, 0]
                df["y"] = npy_array["loc"][:, 1]
                df["z"] = npy_array["loc"][:, 2]
                continue

            if name == "lnc":
                df["lncx"] = npy_array["lnc"][:, 0]
                df["lncy"] = npy_array["lnc"][:, 1]
                df["lncz"] = npy_array["lnc"][:, 2]
                continue

            # Single arrays
            df[name] = npy_array[name]

        # Incomplete traces are kept in the Zarr file; we drop them before
        # building the clean dataframe.
        # A trace counts as complete when it has at least max(itr) + 1 rows.
        thresh = int(np.max(df["itr"]) + 1)
        df = df[df.groupby("tid")["tid"].transform("size") >= thresh]

        return df

    def _load_numpy(self, df: pd.DataFrame):
        """Load the NumPy file and update the dataframe.

        Parameters
        ----------
        df: pd.DataFrame
            Empty dataframe with the expected columns; filled column by column.

        Returns
        -------
        df: pd.DataFrame
            The dataframe filled with the valid entries of the NumPy array.

        Raises
        ------
        Exception
            If the file could not be opened or parsed.
        """
        try:
            # Load array (allow_pickle=False: the file must be a plain array)
            npy_array = np.load(str(self._filename), allow_pickle=False)
        except Exception as e:
            # Fixed: the previous except tuple listed several specific
            # exceptions *and* Exception, which made the specific entries
            # redundant. Any failure is reported to the caller as a generic
            # Exception; the original error is chained for easier debugging.
            raise Exception(f"Could not open {self._filename}: {e}") from e

        # Drop all invalid entries
        npy_array = npy_array[npy_array["vld"]]

        # Fill the dataframe
        for name in npy_array.dtype.names:
            if name == "dcr":
                # In version 2, the dcr is 2D: dcr[:, 0] corresponds to the dcr of
                # version 1, while dcr[:, 1] is just 1.0 - dcr[:, 0]. We drop the
                # second dimension.
                df["dcr"] = npy_array["dcr"][:, 0]
                continue

            # Special cases
            if name == "loc":
                df["x"] = npy_array["loc"][:, 0]
                df["y"] = npy_array["loc"][:, 1]
                df["z"] = npy_array["loc"][:, 2]
                continue

            if name == "lnc":
                df["lncx"] = npy_array["lnc"][:, 0]
                df["lncy"] = npy_array["lnc"][:, 1]
                df["lncz"] = npy_array["lnc"][:, 2]
                continue

            # Single arrays
            df[name] = npy_array[name]

        return df

    def _load_mat(self, df: pd.DataFrame):
        """Load the MAT file and update the dataframe.

        Parameters
        ----------
        df: pd.DataFrame
            Empty dataframe with the expected columns; filled column by column.

        Returns
        -------
        df: pd.DataFrame
            The dataframe restricted to the valid (``vld == 1``) entries.
        """
        # Load .mat file
        try:
            mat_array = loadmat(str(self._filename))
        except (FileNotFoundError, ValueError) as e:
            raise Exception(f"Could not open {self._filename}: {e}")

        # Fill the dataframe
        for key in mat_array.keys():
            # Skip the bookkeeping entries added by scipy.io.loadmat()
            if key in ["__header__", "__version__", "__globals__"]:
                continue

            if key == "dcr":
                # In version 2, the dcr is 2D: dcr[:, 0] corresponds to the dcr of
                # version 1, while dcr[:, 1] is just 1.0 - dcr[:, 0]. We drop the
                # second dimension.
                df["dcr"] = mat_array["dcr"][:, 0]
                continue

            # Special cases
            if key == "loc":
                df["x"] = mat_array["loc"][:, 0]
                df["y"] = mat_array["loc"][:, 1]
                df["z"] = mat_array["loc"][:, 2]
                continue

            if key == "lnc":
                df["lncx"] = mat_array["lnc"][:, 0]
                df["lncy"] = mat_array["lnc"][:, 1]
                df["lncz"] = mat_array["lnc"][:, 2]
                continue

            # Single arrays (MAT matrices are 2D; flatten to 1D columns)
            df[key] = mat_array[key].ravel()

        # Only keep valid entries (unlike .npy, filtering happens after filling)
        df = df[df["vld"] == 1]

        return df

    def _load_json(self, df: pd.DataFrame):
        """Load the JSON file and update the dataframe."""

        # Load array
        try:
            with open(str(self._filename), "r", encoding="utf-8") as f:
                json_array = json.load(f)
        except (
            FileNotFoundError,
            UnicodeDecodeError,
            JSONDecodeError,
            Exception,
        ) as e:
            raise Exception(f"Could not open {self._filename}: {e}")

        # Create a dictionary of empty lists and keys matching the loaded ones
        dict_keys = list(json_array[0].keys()) + ["x", "y", "z", "lncx", "lncy", "lncz"]
        d = {key: [] for key in dict_keys}
        del d["loc"]
        del d["lnc"]
        for entry in json_array:
            for key in entry:
                if key == "dcr":
                    # In version 2, the dcr is 2D: dcr[:, 0] corresponds to the dcr of
                    # version 1, while dcr[:, 1] is just 1.0 - dcr[:, 0]. We drop the
                    # second dimension.
                    d["dcr"].append(entry["dcr"][0])
                    continue

                    # Special cases
                if key == "loc":
                    d["x"].append(entry["loc"][0])
                    d["y"].append(entry["loc"][1])
                    d["z"].append(entry["loc"][2])
                    continue

                if key == "lnc":
                    d["lncx"].append(entry["lnc"][0])
                    d["lncy"].append(entry["lnc"][1])
                    d["lncz"].append(entry["lnc"][2])
                    continue

                d[key].append(entry[key])

        # Fill dataframe
        for key in d:
            df[key] = d[key]

        # Only keep valid entries
        df = df[df["vld"]]

        return df

    def _load_pmx(self, df: pd.DataFrame):
        """Load the PMX file and return the dataframe it contains.

        The incoming dataframe is not used: `.pmx` files already store the
        (filtered) dataframe, which is read back as-is.
        """
        return PMXReader.get_dataframe(self._filename)

    def _set_all_indices(self):
        """Set indices of properties to be read.

        Returns
        -------
        success: bool
            True if all indices could be extracted, False otherwise.
        """
        if self.num_valid_entries == 0:
            return False

        # Number of iterations
        self._reps = int(np.max(self._full_raw_dataframe["itr"]) + 1)

        # Is this an aggregated acquisition?
        self._is_aggregated = self._reps == 1

        # Query the data to find the last valid iteration
        # for all measurements
        try:
            last_valid = find_last_valid_iteration_v2(
                self._full_raw_dataframe, num_iterations=self._reps
            )
        except ValueError as e:
            print(f"[ERROR] {e}")
            return False

        # Set the extracted indices
        self._efo_index = last_valid["efo_index"]
        self._cfr_index = last_valid["cfr_index"]
        self._dcr_index = last_valid["dcr_index"]
        self._eco_index = last_valid["eco_index"]
        self._loc_index = last_valid["loc_index"]
        self._valid_cfr = last_valid["valid_cfr"]
        self._relocalizations = last_valid["reloc"]

        # Keep track of the last valid iteration
        self._last_valid = len(self._valid_cfr) - 1
        self._last_valid_cfr = last_valid["cfr_index"]

        # BUGFIX: the error paths above return False, but the success path used
        # to fall off the end and return None (falsy). Return True explicitly
        # so callers can check the result consistently.
        return True

    def _extend_array_with_prepend(self, arr: np.array, n: int):
        """
        Extends the input sorted NumPy array by prepending `n` consecutive values before each element.
        Elements where the gap from the previous kept element is <= `n` are discarded.

        Parameters
        ----------

        arr: np.ndarray
            Sorted 1D NumPy array of integers.

        n: int
            Number of consecutive values to prepend before each element.

        Returns
        -------

        ext_arr: np.ndarray
            Extended array with new values prepended.
        """

        # Compute differences between consecutive elements
        diffs = np.diff(arr)

        # The first element is always kept
        keep_mask = np.concatenate(([True], diffs > n))

        # Select elements to keep
        kept_elements = arr[keep_mask]

        # Generate new values for each kept element
        # For each element x in kept_elements, generate x-n, x-(n-1), ..., x-1
        prepend_offsets = np.arange(n, 0, -1)
        new_values = (
            kept_elements[:, np.newaxis] - prepend_offsets
        )  # Shape: (num_kept, n)

        # Flatten the new_values array
        new_values = new_values.flatten()

        # Combine new values with the kept elements
        combined = np.concatenate((new_values, kept_elements))

        # Remove any potential duplicates and ensure the array is sorted
        extended_array = np.unique(combined)

        return extended_array

    def _get_valid_subset(self):
        """Returns the valid subset of the full dataframe from which to
        extract the requested iteration data.

        Returns
        -------

        indices: np.ndarray
            Index values (into the full raw dataframe) of the rows to process.
        """

        # MinFluxReaderV2 only works with valid entries
        val_indices = self._valid_entries

        # Valid
        data_valid_df = self._full_raw_dataframe.loc[val_indices]

        # Here we have to use different logic for tracking vs. localization
        # acquisitions. Tracking (and potentially other custom sequences)
        # only have one cfr value per trace id.
        condition = (data_valid_df["itr"].eq(self._cfr_index)).groupby(
            data_valid_df["tid"]
        ).sum() == 1
        one_cfr_per_tid = np.all(condition)

        if one_cfr_per_tid:
            # Traces that only have one cfr in the first localization (and then not
            # measured anymore) are a special case of incomplete iterations. In this
            # case, we do not want to drop them: to make them valid, we copy the cfr
            # value from the first iterations to all subsequent localizations.
            indices = data_valid_df.index[
                (data_valid_df["itr"] == self._cfr_index)
                | (data_valid_df["itr"] == self._loc_index)
            ].to_numpy()
        else:
            # For localizations, we preserve only all those iterations that have a full set
            # from the cfr iteration to the localized iteration: but for those we make sure
            # to have all relocalizations to support dcr pooling
            indices = data_valid_df.index[
                data_valid_df["itr"] == self._loc_index
            ].to_numpy()
            if self._pool_dcr:
                # NOTE(review): `self.relocalizations` (public property) is used
                # here while `self._relocalizations` is used in `_process()` —
                # presumably the property wraps the private attribute; confirm
                # they are equivalent.
                start_index = (
                    self._loc_index
                    - np.sum(self.relocalizations[: self._loc_index + 1])
                    + 1
                )
            else:
                start_index = self._cfr_index
            num_rows = self._loc_index - start_index
            if num_rows > 0:
                # Prepend the `num_rows` preceding row indices for each
                # localized iteration (see _extend_array_with_prepend).
                indices = self._extend_array_with_prepend(indices, num_rows)

            # @TODO DEBUG: remove when properly tested
            # assert np.all(np.unique(data_valid_df.iloc[indices]["itr"]) == np.arange(self._cfr_index, self._loc_index + 1))

        return indices

    def _process(self) -> Union[None, pd.DataFrame]:
        """Returns processed dataframe for valid (or invalid) entries.

        Returns
        -------

        df: pd.DataFrame
            Processed data as DataFrame, or None if there is no data.
        """

        # Do we have a data array to work on?
        if self.tot_num_entries == 0:
            return None

        # Get valid subset
        valid_subset = self._get_valid_subset()
        data_valid_df = self._full_raw_dataframe.loc[valid_subset]

        # Extract the iteration IDs
        iid = data_valid_df["iid"].to_numpy()

        # Extract the valid iterations
        itr = data_valid_df["itr"].to_numpy()

        # Extract the valid identifiers
        tid = data_valid_df["tid"].to_numpy()

        # Extract the valid time points
        tim = data_valid_df["tim"].to_numpy()

        # Extract the fluorophore IDs: if no fluorophore has been assigned
        # (all IDs are 0), default all of them to fluorophore 1.
        fluo = data_valid_df["fluo"].to_numpy()
        if np.all(fluo == 0):
            # BUGFIX: the previous check `np.all(fluo) == 0` was True whenever
            # *any* ID was 0 and would overwrite partially assigned IDs.
            fluo = np.ones(fluo.shape, dtype=fluo.dtype)

        # The following extraction pattern will change whether the
        # acquisition is normal or aggregated
        if self.is_aggregated:
            # Extract the locations
            x = data_valid_df["x"].to_numpy()
            y = data_valid_df["y"].to_numpy()
            z = data_valid_df["z"].to_numpy()
            z *= self._z_scaling_factor

            # Extract EFO
            efo = data_valid_df["efo"].to_numpy()

            # Extract CFR
            cfr = data_valid_df["cfr"].to_numpy()

            # Extract ECO
            eco = data_valid_df["eco"].to_numpy()

            # Extract DCR
            dcr = data_valid_df["dcr"].to_numpy()

            # Extract the background
            fbg = data_valid_df["fbg"].to_numpy()

            # Dwell
            dwell = np.around((eco / (efo / 1000)) / self._dwell_time, decimals=0)

        else:
            # In contrast to version 1 of the reader and of the Imspector file formats, we now extract
            # by value and not by index!

            # Extract the iteration IDs
            iid = iid[itr == self._loc_index]

            # Trace IDs
            tid = tid[itr == self._loc_index]

            # Extract the valid time points
            tim = tim[itr == self._loc_index]

            # Extract the locations
            loc = (
                data_valid_df[["x", "y", "z"]][itr == self._loc_index]
                * self._unit_scaling_factor
            )
            loc["z"] *= self._z_scaling_factor
            x = loc["x"].to_numpy()
            y = loc["y"].to_numpy()
            z = loc["z"].to_numpy()

            # Extract EFO
            efo = data_valid_df["efo"][itr == self._efo_index].to_numpy()

            # Extract CFR (conditional to the presence of the last loc)
            cfr = data_valid_df["cfr"][itr == self._cfr_index].to_numpy()
            if len(cfr) < len(tid):
                # This is the (tracking) case where the cfr value is
                # stored from a non-relocalized iteration. Moreover,
                # this cfr is only measured for the first, complete,
                # sequence of the trace.
                _, counts = np.unique(tid, return_counts=True)
                cfr = np.repeat(cfr, counts)

            # Extract ECO
            eco = data_valid_df["eco"][itr == self._eco_index].to_numpy()

            # Extract the background
            fbg = data_valid_df["fbg"][itr == self._loc_index].to_numpy()

            # Fluorophore
            fluo = data_valid_df["fluo"][itr == self._loc_index].to_numpy()

            # Pool DCR values?
            num_relocs = int(np.sum(self._relocalizations[: self._loc_index + 1]))
            if self._pool_dcr and num_relocs > 1:

                # Calculate ECO contributions
                eco_all = data_valid_df["eco"].to_numpy().reshape(-1, num_relocs)
                eco_sum = eco_all.sum(axis=1)
                eco_all_norm = eco_all / eco_sum.reshape(-1, 1)

                # Extract DCR values and weigh them by the relative ECO contributions
                dcr = data_valid_df["dcr"].to_numpy().reshape(-1, num_relocs)
                dcr = dcr * eco_all_norm
                dcr = dcr.sum(axis=1)

            else:

                # Extract DCR
                dcr = data_valid_df["dcr"][itr == self._dcr_index].to_numpy()

            # Calculate dwell
            dwell = np.around((eco / (efo / 1000)) / self._dwell_time, decimals=0)

        # Create a Pandas dataframe for the results (make sure to use properties
        # from the V2 reader)
        df = pd.DataFrame(
            index=pd.RangeIndex(start=0, stop=len(tid)),
            columns=self.processed_properties(),
        )

        # Store the extracted valid hits into the dataframe
        df["tid"] = tid
        df["x"] = x
        df["y"] = y
        df["z"] = z
        df["tim"] = tim
        df["efo"] = efo
        df["cfr"] = cfr
        df["eco"] = eco
        df["dcr"] = dcr
        df["dwell"] = dwell
        df["fbg"] = fbg
        df["fluo"] = fluo
        df["iid"] = iid

        # Check if the selected indices correspond to the last valid iteration
        self._is_last_valid = bool(
            self._cfr_index == self._last_valid_cfr
            and self._efo_index == self._last_valid
        )

        return df

Reader of MINFLUX data in .npy, .mat and .json Imspector m2410 files, and .pmx version 0.6.0 and newer.

Constructor.

Parameters

filename : Union[Path, str]
Full path to the .pmx, .npy or .mat file to read
z_scaling_factor : float (optional, default = 1.0)
Refractive index mismatch correction factor to apply to the z coordinates.
is_tracking : bool (optional, default = False)
Whether the dataset comes from a tracking experiment; otherwise, it is considered as a localization experiment.
pool_dcr : bool (optional, default = False)
Whether to pool DCR values weighted by the relative ECO of all relocalized iterations.
dwell_time : float (optional, default 1.0)
Dwell time in milliseconds.

Ancestors

  • pyminflux.reader._reader.MinFluxReader

Static methods

def processed_properties() ‑> list

Returns the properties read from the file that correspond to the processed dataframe column names.

Instance variables

prop mbm_data
Expand source code
@property
def mbm_data(self):
    """Return the loaded beamline monitoring data."""
    return self._mbm_data

Return the loaded beamline monitoring data.

prop valid_full_raw_dataframe : numpy.ndarray | None
Expand source code
@property
def valid_full_raw_dataframe(self) -> Union[None, np.ndarray]:
    """Return the raw data."""
    if self.tot_num_entries == 0:
        return None
    return self._full_raw_dataframe[self._valid_entries].copy()

Return the raw data.

prop version : int
Expand source code
@property
def version(self) -> int:
    return 2
class PMXReader
Expand source code
class PMXReader:
    """Reader of (processed) MINFLUX from native `.pmx` (HDF5) format."""

    @staticmethod
    def get_metadata(filename) -> Union[PMXMetadata, None]:
        """Reads metadata information from `.pmx` files.

        Parameters
        ----------

        filename: Union[Path, str]
            Full path to the `.pmx` file to scan.

        Returns
        -------

        metadata: Union[PMXMetadata, None]
            Collected metadata, or None if the file version is unknown or a
            required parameter is missing.
        """

        # Open the file
        with h5py.File(filename, "r") as f:

            # Read the file_version attribute
            file_version = f.attrs["file_version"]

            # First, check that the version is known
            if file_version not in ["1.0", "2.0", "3.0"]:
                return None

            # Convert version string to number
            version_int = version_str_to_int(file_version)

            # Initialize parameters (for versions above version 1.0)
            tr_len_thresholds = None
            time_thresholds = None
            dwell_time = 1.0
            is_tracking = False
            pool_dcr = False
            scale_bar_size = 500

            # Version 1.0 parameters (all supported versions are at least 1.0,
            # so the variables assigned in this branch are always defined)
            if version_int > 0:

                # Required parameter
                try:
                    z_scaling_factor = float(f["parameters/z_scaling_factor"][()])
                except KeyError:
                    return None

                # Required parameter
                try:
                    min_trace_length = int(f["parameters/min_trace_length"][()])
                except KeyError:
                    return None

                # These thresholds can legitimately be missing
                try:
                    efo_thresholds = tuple(f["parameters/applied_efo_thresholds"][:])
                except KeyError:
                    efo_thresholds = None
                try:
                    cfr_thresholds = tuple(f["parameters/applied_cfr_thresholds"][:])
                except KeyError:
                    cfr_thresholds = None

                # Required parameter
                try:
                    num_fluorophores = int(f["parameters/num_fluorophores"][()])
                except KeyError:
                    return None

            # Version 2.0 parameters
            if version_int > 10000:
                # Parameters are present in the file, and we can read them

                try:
                    # This setting can be missing
                    tr_len_thresholds = tuple(
                        f["parameters/applied_tr_len_thresholds"][:]
                    )
                except KeyError:
                    tr_len_thresholds = None

                # Required parameter
                try:
                    dwell_time = float(f["parameters/dwell_time"][()])
                except KeyError:
                    return None

                try:
                    # This setting can be missing
                    time_thresholds = tuple(f["parameters/applied_time_thresholds"][:])
                except KeyError:
                    time_thresholds = None

                # HDF5 does not have a native boolean type, so we save as int8 and convert it
                # back to boolean on read.
                try:
                    is_tracking = bool(f["parameters/is_tracking"][()])
                except KeyError:
                    return None

                try:
                    pool_dcr = bool(f["parameters/pool_dcr"][()])
                except KeyError:
                    # This is an addendum to version 2.0, and we allow it to be missing.
                    # It will fall back to False.
                    pool_dcr = False

                # Required parameter
                try:
                    scale_bar_size = float(f["parameters/scale_bar_size"][()])
                except KeyError:
                    return None

            # Version 3.0 parameters
            # No new parameters

        # Store and return
        metadata = PMXMetadata(
            pool_dcr=pool_dcr,
            cfr_thresholds=cfr_thresholds,
            dwell_time=dwell_time,
            efo_thresholds=efo_thresholds,
            is_tracking=is_tracking,
            min_trace_length=min_trace_length,
            num_fluorophores=num_fluorophores,
            scale_bar_size=scale_bar_size,
            time_thresholds=time_thresholds,
            tr_len_thresholds=tr_len_thresholds,
            z_scaling_factor=z_scaling_factor,
        )

        return metadata

    @staticmethod
    def get_fluorophore_names(filename: Union[Path, str]) -> dict:
        """Read fluorophore names from `.pmx` files.

        Parameters
        ----------
        filename: Union[Path, str]
            Full path to the `.pmx` file to scan.

        Returns
        -------
        fluorophore_names: dict
            Dictionary mapping fluo_id (int) to name (str).
            Returns empty dict if fluorophore names are not present (backwards compatibility).
        """
        import json

        # Deliberate best-effort read: any failure (missing file, missing
        # attribute, malformed JSON) falls back to an empty dict.
        try:
            with h5py.File(filename, "r") as f:
                # Try to read fluorophore names from parameters group
                if "parameters" in f and "fluorophore_names" in f["parameters"].attrs:
                    names_json = f["parameters"].attrs["fluorophore_names"]
                    # Parse JSON and convert keys back to integers
                    names_dict = json.loads(names_json)
                    return {int(k): v for k, v in names_dict.items()}
        except Exception:
            pass

        # Return empty dict for backwards compatibility (will use default string representation)
        return {}

    @staticmethod
    def get_tid_offsets(filename: Union[Path, str]) -> list:
        """Read TID offset mapping from `.pmx` files.

        Parameters
        ----------
        filename: Union[Path, str]
            Full path to the `.pmx` file to scan.

        Returns
        -------
        tid_offsets: list
            List of (first_iid, tid_offset) tuples. Returns empty list if not present.
        """
        import json

        # Deliberate best-effort read: any failure falls back to an empty list.
        try:
            with h5py.File(filename, "r") as f:
                if "parameters" in f and "tid_offsets" in f["parameters"].attrs:
                    offsets_json = f["parameters"].attrs["tid_offsets"]
                    if isinstance(offsets_json, bytes):
                        offsets_json = offsets_json.decode("utf-8")
                    offsets = json.loads(offsets_json)
                    tid_offsets = []
                    for entry in offsets:
                        if isinstance(entry, dict):
                            first_iid = int(entry.get("first_iid", 0))
                            tid_offset = int(entry.get("tid_offset", 0))
                        else:
                            # Fallback for list/tuple format
                            if len(entry) < 2:
                                continue
                            first_iid = int(entry[0])
                            tid_offset = int(entry[1])
                        tid_offsets.append((first_iid, tid_offset))
                    return tid_offsets
        except Exception:
            pass

        return []

    @staticmethod
    def _read_stored_dataframe(f, dataset_path: str, index_path: str):
        """Rebuild a pandas DataFrame from an HDF5 dataset and its stored index.

        Shared by `get_dataframe()` and `get_filtered_dataframe()`, which
        previously duplicated this logic.

        Parameters
        ----------
        f: h5py.File
            Open HDF5 file handle.
        dataset_path: str
            Path of the dataset holding the data and the column metadata.
        index_path: str
            Path of the dataset holding the DataFrame index.

        Returns
        -------
        df: pd.DataFrame
            Reconstructed DataFrame with original column names and dtypes.
        """
        dataset = f[dataset_path]

        # Read the NumPy data
        data_array = dataset[:]

        # Read column names and column data types (stored as attributes)
        column_names = dataset.attrs["column_names"]
        column_types = dataset.attrs["column_types"]

        # Read the index
        index_data = f[index_path][:]

        # Create DataFrame with specified columns
        df = pd.DataFrame(data_array, index=index_data, columns=column_names)

        # Apply column data types
        for col, dtype in zip(column_names, column_types):
            df[col] = df[col].astype(dtype)

        return df

    @staticmethod
    def get_dataframe(filename: Union[Path, str]):
        """Return the full dataframe.

        Parameters
        ----------

        filename: Union[Path, str]
            Full path to the `.pmx` file to scan.

        Returns
        -------

        df: pd.DataFrame or None
            Raw dataframe, or None if the file version is not "3.0".

        Raises
        ------

        ValueError
            If the file's `reader_version` attribute is not 2.
        """

        with h5py.File(filename, "r") as f:

            # Read the file_version attribute
            file_version = f.attrs["file_version"]

            if file_version != "3.0":
                return None

            # Read the reader_version attribute: it must be 2
            reader_version = f.attrs["reader_version"]
            if reader_version != 2:
                raise ValueError("`reader_version` must be 2.")

            # Rebuild the raw dataframe from the stored datasets
            return PMXReader._read_stored_dataframe(f, "/raw/df", "/raw/df_index")

    @staticmethod
    def get_filtered_dataframe(filename: Union[Path, str]):
        """Reads the Pandas DataFrame from `.pmx` files versions 1.0, 2.0, and 3.0.

        Parameters
        ----------

        filename: Union[Path, str]
            Full path to the `.pmx` file to scan.

        Returns
        -------

        df: pd.DataFrame or None
            Filtered dataframe, or None if the file version is unknown.
        """

        with h5py.File(filename, "r") as f:

            # Read the file_version attribute
            file_version = f.attrs["file_version"]

            if file_version not in ["1.0", "2.0", "3.0"]:
                return None

            # Rebuild the filtered dataframe from the stored datasets
            df = PMXReader._read_stored_dataframe(
                f, "/paraview/dataframe", "/paraview/dataframe_index"
            )

        return df

    @staticmethod
    def get_array(filename: Union[Path, str]):
        """Returns the raw Numpy array (filtered). This applies to:

        - pmx files version 1.0, 2.0
        - pmx files version 3.0 with reader version 1

        pmx files version 3.0 with reader version 2 only store the (filtered) raw dataframe.

        Parameters
        ----------

        filename: Union[Path, str]
            Full path to the `.pmx` file to scan.
        """

        # Open the file and read the data
        with h5py.File(filename, "r") as f:

            # Read the file_version attribute
            file_version = f.attrs["file_version"]

            if file_version not in ["1.0", "2.0", "3.0"]:
                return None

            if file_version == "3.0":
                reader_version = f.attrs["reader_version"]
                if reader_version == 1:
                    data_array = f["raw/npy"][:]
                else:
                    # Reader version 2 files store only the dataframe
                    return None
            else:
                # We only read the raw NumPy array
                data_array = f["raw/npy"][:]

        return data_array

Reader of (processed) MINFLUX from native .pmx format.

Static methods

def get_array(filename: pathlib.Path | str)
Expand source code
@staticmethod
def get_array(filename: Union[Path, str]):
    """Returns the raw Numpy array (filtered). This applies to:

    - pmx files version 1.0, 2.0
    - pmx files version 3.0 with reader version 1

    pmx files version 3.0 with reader version 2 only store the (filtered) raw dataframe.

    Parameters
    ----------

    filename: Union[Path, str]
        Full path to the `.pmx` file to scan.
    """

    # Open the file and read the data
    with h5py.File(filename, "r") as f:

        # Read the file_version attribute
        file_version = f.attrs["file_version"]

        if file_version not in ["1.0", "2.0", "3.0"]:
            return None

        if file_version == "3.0":
            reader_version = f.attrs["reader_version"]
            if reader_version == 1:
                data_array = f["raw/npy"][:]
            else:
                return None
        else:
            # We only read the raw NumPy array
            data_array = f["raw/npy"][:]

    return data_array

Returns the raw Numpy array (filtered). This applies to:

  • pmx files version 1.0, 2.0
  • pmx files version 3.0 with reader version 1

pmx files version 3.0 with reader version 2 only store the (filtered) raw dataframe.

Parameters

filename : Union[Path, str]
Full path to the .pmx file to scan.
def get_dataframe(filename: pathlib.Path | str)
Expand source code
@staticmethod
def get_dataframe(filename: Union[Path, str]):
    """Return the full dataframe.

    Parameters
    ----------

    filename: Union[Path, str]
        Full path to the `.pmx` file to scan.
    """

    with h5py.File(filename, "r") as f:

        # Read the file_version attribute
        file_version = f.attrs["file_version"]

        if file_version != "3.0":
            return None

        # Read the reader_version attribute: it must be 2
        reader_version = f.attrs["reader_version"]
        if reader_version != 2:
            raise ValueError("`reader_version` must be 2.")

        #
        # Read raw dataset
        #
        dataset = f["/raw/df"]

        # Read the NumPy data
        data_array = dataset[:]

        # Read column names
        column_names = dataset.attrs["column_names"]

        # Read column data types
        column_types = dataset.attrs["column_types"]

        # Read the index
        index_data = f["/raw/df_index"][:]

        # Create DataFrame with specified columns
        df = pd.DataFrame(data_array, index=index_data, columns=column_names)

        # Apply column data types
        for col, dtype in zip(column_names, column_types):
            df[col] = df[col].astype(dtype)

        return df

Return the full dataframe.

Parameters

filename : Union[Path, str]
Full path to the .pmx file to scan.
def get_filtered_dataframe(filename: pathlib.Path | str)
Expand source code
@staticmethod
def get_filtered_dataframe(filename: Union[Path, str]):
    """Reads the Pandas DataFrame from `.pmx` files versions 1.0, 2.0, and 3.0.

    Parameters
    ----------

    filename: Union[Path, str]
        Full path to the `.pmx` file to scan.
    """

    with h5py.File(filename, "r") as f:

        # Read the file_version attribute
        file_version = f.attrs["file_version"]

        if file_version not in ["1.0", "2.0", "3.0"]:
            return None

        # Read dataset
        dataset = f["/paraview/dataframe"]

        # Read the NumPy data
        data_array = dataset[:]

        # Read column names
        column_names = dataset.attrs["column_names"]

        # Read column data types
        column_types = dataset.attrs["column_types"]

        # Read the index
        index_data = f["/paraview/dataframe_index"][:]

        # Create DataFrame with specified columns
        df = pd.DataFrame(data_array, index=index_data, columns=column_names)

        # Apply column data types
        for col, dtype in zip(column_names, column_types):
            df[col] = df[col].astype(dtype)

    return df

Reads the Pandas DataFrame from .pmx files versions 1.0, 2.0, and 3.0.

Parameters

filename : Union[Path, str]
Full path to the .pmx file to scan.
def get_fluorophore_names(filename: pathlib.Path | str) ‑> dict
Expand source code
@staticmethod
def get_fluorophore_names(filename: Union[Path, str]) -> dict:
    """Read fluorophore names from `.pmx` files.
    
    Parameters
    ----------
    filename: Union[Path, str]
        Full path to the `.pmx` file to scan.
        
    Returns
    -------
    fluorophore_names: dict
        Dictionary mapping fluo_id (int) to name (str).
        Returns empty dict if fluorophore names are not present (backwards compatibility).
    """
    try:
        with h5py.File(filename, "r") as f:
            # Try to read fluorophore names from parameters group
            if "parameters" in f and "fluorophore_names" in f["parameters"].attrs:
                import json
                names_json = f["parameters"].attrs["fluorophore_names"]
                # Parse JSON and convert keys back to integers
                names_dict = json.loads(names_json)
                return {int(k): v for k, v in names_dict.items()}
    except Exception:
        pass
    
    # Return empty dict for backwards compatibility (will use default string representation)
    return {}

Read fluorophore names from .pmx files.

Parameters

filename : Union[Path, str]
Full path to the .pmx file to scan.

Returns

fluorophore_names : dict
Dictionary mapping fluo_id (int) to name (str). Returns empty dict if fluorophore names are not present (backwards compatibility).
def get_metadata(filename) ‑> pyminflux.reader.metadata._metadata.PMXMetadata | None
Expand source code
@staticmethod
def get_metadata(filename) -> Union[PMXMetadata, None]:
    """Reads metadata information from `.pmx` files.

    Parameters
    ----------
    filename: Union[Path, str]
        Full path to the `.pmx` file to scan.

    Returns
    -------
    metadata: Union[PMXMetadata, None]
        PMXMetadata object populated from the file, or None if the file
        version is unknown or a required parameter is missing.
    """

    # Open the file
    with h5py.File(filename, "r") as f:

        # Read the file_version attribute
        file_version = f.attrs["file_version"]

        # First, check that the version is known
        if file_version not in ["1.0", "2.0", "3.0"]:
            return None

        # Convert version string to number
        version_int = version_str_to_int(file_version)

        # Initialize parameters (for versions above version 1.0)
        tr_len_thresholds = None
        time_thresholds = None
        dwell_time = 1.0
        is_tracking = False
        pool_dcr = False
        scale_bar_size = 500

        # Version 1.0 parameters (required; a missing one makes the file invalid)
        if version_int > 0:

            try:
                z_scaling_factor = float(f["parameters/z_scaling_factor"][()])
            except KeyError:
                return None

            try:
                min_trace_length = int(f["parameters/min_trace_length"][()])
            except KeyError:
                return None

            # The applied threshold datasets are optional: absent means
            # no filtering was applied.
            try:
                efo_thresholds = tuple(f["parameters/applied_efo_thresholds"][:])
            except KeyError:
                efo_thresholds = None
            try:
                cfr_thresholds = tuple(f["parameters/applied_cfr_thresholds"][:])
            except KeyError:
                cfr_thresholds = None

            try:
                num_fluorophores = int(f["parameters/num_fluorophores"][()])
            except KeyError:
                return None

        # Version 2.0 parameters
        if version_int > 10000:
            # Parameters are present in the file, and we can read them

            try:
                # This setting can be missing
                tr_len_thresholds = tuple(
                    f["parameters/applied_tr_len_thresholds"][:]
                )
            except KeyError:
                tr_len_thresholds = None

            try:
                dwell_time = float(f["parameters/dwell_time"][()])
            except KeyError:
                return None

            try:
                # This setting can be missing
                time_thresholds = tuple(f["parameters/applied_time_thresholds"][:])
            except KeyError:
                time_thresholds = None

            # HDF5 does not have a native boolean type, so we save as int8 and convert it
            # back to boolean on read.
            try:
                is_tracking = bool(f["parameters/is_tracking"][()])
            except KeyError:
                return None

            try:
                pool_dcr = bool(f["parameters/pool_dcr"][()])
            except KeyError:
                # This is an addendum to version 2.0, and we allow it to be missing.
                # It will fall back to False.
                pool_dcr = False

            try:
                scale_bar_size = float(f["parameters/scale_bar_size"][()])
            except KeyError:
                return None

        # Version 3.0 parameters
        # No new parameters

    # Store and return
    metadata = PMXMetadata(
        pool_dcr=pool_dcr,
        cfr_thresholds=cfr_thresholds,
        dwell_time=dwell_time,
        efo_thresholds=efo_thresholds,
        is_tracking=is_tracking,
        min_trace_length=min_trace_length,
        num_fluorophores=num_fluorophores,
        scale_bar_size=scale_bar_size,
        time_thresholds=time_thresholds,
        tr_len_thresholds=tr_len_thresholds,
        z_scaling_factor=z_scaling_factor,
    )

    return metadata

Reads metadata information from .pmx files.

def get_tid_offsets(filename: pathlib.Path | str) -> list
Expand source code
@staticmethod
def get_tid_offsets(filename: Union[Path, str]) -> list:
    """Read TID offset mapping from `.pmx` files.
    
    Returns
    -------
    tid_offsets: list
        List of (first_iid, tid_offset) tuples. Returns empty list if not present.
    """
    try:
        with h5py.File(filename, "r") as f:
            if "parameters" in f and "tid_offsets" in f["parameters"].attrs:
                import json
                offsets_json = f["parameters"].attrs["tid_offsets"]
                if isinstance(offsets_json, bytes):
                    offsets_json = offsets_json.decode("utf-8")
                offsets = json.loads(offsets_json)
                tid_offsets = []
                for entry in offsets:
                    if isinstance(entry, dict):
                        first_iid = int(entry.get("first_iid", 0))
                        tid_offset = int(entry.get("tid_offset", 0))
                    else:
                        # Fallback for list/tuple format
                        if len(entry) < 2:
                            continue
                        first_iid = int(entry[0])
                        tid_offset = int(entry[1])
                    tid_offsets.append((first_iid, tid_offset))
                return tid_offsets
    except Exception:
        pass
    
    return []

Read TID offset mapping from .pmx files.

Returns

tid_offsets : list
List of (first_iid, tid_offset) tuples. Returns empty list if not present.