Module pyminflux.reader

Reader of MINFLUX data.

Expand source code
#  Copyright (c) 2022 - 2024 D-BSSE, ETH Zurich.
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.

__doc__ = "Reader of MINFLUX data."
__all__ = [
    "NativeMetadataReader",
    "NativeArrayReader",
    "NativeDataFrameReader",
    "MinFluxReader",
    "MSRReader",
]

from ._msr_reader import MSRReader
from ._native_reader import (
    NativeArrayReader,
    NativeDataFrameReader,
    NativeMetadataReader,
)
from ._reader import MinFluxReader

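The readers listed in __all__ can be imported directly from the package, for example:

from pyminflux.reader import (
    MinFluxReader,
    MSRReader,
    NativeArrayReader,
    NativeDataFrameReader,
    NativeMetadataReader,
)
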
Sub-modules

pyminflux.reader.metadata
pyminflux.reader.util

Classes

class MSRReader (filename: Union[pathlib.Path, str])

Reads data and metadata information from .MSR (OBF format) files.

For documentation, see: https://imspectordocs.readthedocs.io/en/latest/fileformat.html#the-obf-file-format

Note: binary data is stored in little-endian order.

Constructor.

Parameters

filename : Union[Path, str]
Full path to the file to open.
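
A minimal usage sketch (the file name below is a placeholder):

from pyminflux.reader import MSRReader

# Open a hypothetical .msr file and scan its metadata
reader = MSRReader("experiment.msr")

# scan() returns False if the file could not be parsed
if reader.scan():
    print(f"Found {reader.num_stacks} stacks.")

    # Iterate over all OBFStackMetadata objects
    for stack_metadata in reader:
        print(stack_metadata.stack_name)
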
Expand source code
class MSRReader:
    """Reads data and metadata information from `.MSR` (OBF format) files.

    For documentation, see:
    https://imspectordocs.readthedocs.io/en/latest/fileformat.html#the-obf-file-format

    Note: binary data is stored in little-endian order.
    """

    def __init__(self, filename: Union[Path, str]):
        """Constructor.

        Parameters
        ----------
        filename: Union[Path, str]
            Full path to the file to open.
        """

        # Store the filename
        self.filename = Path(filename)

        # File header
        self.obf_file_header = OBFFileHeader()

        # Metadata
        self.obf_file_metadata = OBFFileMetadata()

        # List of stack metadata objects
        self._obf_stacks_list: list[OBFStackMetadata] = []

    def scan(self) -> bool:
        """Scan the metadata of the file.

        Returns
        -------

        success: bool
            True if the file was scanned successfully, False otherwise.
        """

        # Open the file
        with open(self.filename, mode="rb") as f:

            if not self._read_obf_header(f):
                return False

            # Scan metadata
            self.obf_file_metadata = self._scan_metadata(f, self.obf_file_header)

            # Get the first stack position
            next_stack_pos = self.obf_file_header.first_stack_pos

            while next_stack_pos != 0:

                # Scan the next stack
                success, obf_stack_metadata = self._read_obf_stack(f, next_stack_pos)

                if not success:
                    return False

                # Append the metadata of the current stack
                self._obf_stacks_list.append(obf_stack_metadata)

                # Do we have a next header to parse?
                next_stack_pos = obf_stack_metadata.next_stack_pos

        return True

    def __getitem__(self, stack_index: int) -> Union[OBFStackMetadata, None]:
        """Allows accessing the reader with the `[]` notation to get the next stack metadata.

        Parameters
        ----------

        stack_index: int
            Index of the stack to be retrieved.

        Returns
        -------

        metadata: Union[OBFStackMetadata, None]
            Metadata for the requested stack, or None if no file was loaded.
        """

        # Is anything loaded?
        if len(self._obf_stacks_list) == 0:
            return None

        if stack_index < 0 or stack_index > (len(self._obf_stacks_list) - 1):
            raise ValueError(f"Index value {stack_index} is out of bounds.")

        # Get and return the metadata
        metadata = self._obf_stacks_list[stack_index]
        return metadata

    def __iter__(self):
        """Return the iterator.

        Returns
        -------

            iterator
        """
        self._current_index = 0
        return self

    def __next__(self):
        if self._current_index < len(self._obf_stacks_list):
            metadata = self.__getitem__(self._current_index)
            self._current_index += 1
            return metadata
        else:
            raise StopIteration

    @property
    def num_stacks(self):
        """Return the number of stacks contained in the file."""
        return len(self._obf_stacks_list)

    def get_data_physical_sizes(
        self, stack_index: int, scaled: bool = True
    ) -> Union[list, None]:
        """Returns the (scaled) data physical size for the requested stack.

        Parameters
        ----------

        stack_index: int
            Index of the stack for which to read the data.

        scaled: bool
            If scaled is True, the physical sizes will be scaled by the corresponding scale factors
            as reported by MSRReader.get_data_units().

        Returns
        -------

        sizes: Union[list, None]
            Physical sizes for 2D images, None otherwise.
        """

        if stack_index < 0 or stack_index >= len(self._obf_stacks_list):
            raise ValueError(f"stack_index={stack_index} is out of bounds.")

        # Get the metadata for the requested stack
        obf_stack_metadata = self._obf_stacks_list[stack_index]

        if obf_stack_metadata is None:
            return None

        # Get the physical lengths
        phys_lengths = obf_stack_metadata.physical_lengths[: obf_stack_metadata.rank]

        # Do we need to scale?
        if scaled:
            _, factors = self.get_data_units(stack_index=stack_index)
            for i, factor in enumerate(factors):
                if factor != 1.0:
                    phys_lengths[i] *= factor

        # Return the physical lengths as list
        return phys_lengths

    def get_data_offsets(
        self, stack_index: int, scaled: bool = True
    ) -> Union[list, None]:
        """Returns the (scaled) data offsets for the requested stack.

        Parameters
        ----------

        stack_index: int
            Index of the stack for which to read the data.

        scaled: bool
            If scaled is True, the offsets will be scaled by the corresponding scale factors
            as reported by MSRReader.get_data_units().

        Returns
        -------

        offsets: Union[list, None]
            Offsets for 2D images, None otherwise.
        """

        if stack_index < 0 or stack_index >= len(self._obf_stacks_list):
            raise ValueError(f"stack_index={stack_index} is out of bounds.")

        # Get the metadata for the requested stack
        obf_stack_metadata = self._obf_stacks_list[stack_index]

        if obf_stack_metadata is None:
            return None

        # Get the offsets
        offsets = obf_stack_metadata.physical_offsets[: obf_stack_metadata.rank]

        # Do we need to scale?
        if scaled:
            _, factors = self.get_data_units(stack_index=stack_index)
            for i, factor in enumerate(factors):
                if factor != 1.0:
                    offsets[i] *= factor

        return offsets

    def get_data_pixel_sizes(
        self, stack_index: int, scaled: bool = True
    ) -> Union[list, None]:
        """Returns the (scaled) data pixel size for the requested stack.

        Parameters
        ----------

        stack_index: int
            Index of the stack for which to read the data.

        scaled: bool
            If scaled is True, the pixel sizes will be scaled by the corresponding scale factors
            as reported by MSRReader.get_data_units().

        Returns
        -------

        pixel_sizes: Union[list, None]
            Pixel sizes for 2D images, None otherwise.
        """

        if stack_index < 0 or stack_index >= len(self._obf_stacks_list):
            raise ValueError(f"stack_index={stack_index} is out of bounds.")

        # Get the metadata for the requested stack
        obf_stack_metadata = self._obf_stacks_list[stack_index]

        if obf_stack_metadata is None:
            return None

        # Get the physical sizes
        phys_lengths = self.get_data_physical_sizes(
            stack_index=stack_index, scaled=scaled
        )

        # Get the number of pixels along each dimension
        num_pixels = obf_stack_metadata.num_pixels[: obf_stack_metadata.rank]

        # Now divide by the image size
        pixel_sizes = np.array(phys_lengths) / np.array(num_pixels)

        # Return the pixel size as list
        return pixel_sizes.tolist()

    def get_data_units(self, stack_index: int) -> Union[tuple[list, list], None]:
        """Returns the data units and scale factors per dimension for requested stack.

        Units are one of:
            "m": meters
            "kg": kilograms
            "s": s
            "A": Amperes
            "K": Kelvin
            "mol": moles
            "cd": candela
            "r": radian
            "sr": sr

        Parameters
        ----------

        stack_index: int
            Index of the stack for which to read the data.

        Returns
        -------

        unit: Union[tuple[list, list], None]
            List of units and list of scale factors, or None if no file was opened.
        """

        if stack_index < 0 or stack_index >= len(self._obf_stacks_list):
            raise ValueError(f"stack_index={stack_index} is out of bounds.")

        # Get the metadata for the requested stack
        obf_stack_metadata = self._obf_stacks_list[stack_index]

        if obf_stack_metadata is None:
            return None

        units = []
        scale_factors = []

        # Map the index of each SI base-unit exponent to its unit string
        si_units = ["m", "kg", "s", "A", "K", "mol", "cd", "r", "sr"]

        for dim in range(obf_stack_metadata.rank):
            dimensions = obf_stack_metadata.si_dimensions[dim]
            scale_factors.append(dimensions.scale_factor)

            # Use the first SI base unit with a positive exponent ("" if none)
            unit = ""
            for i, exponent in enumerate(dimensions.exponents):
                if i < len(si_units) and exponent.numerator > 0:
                    unit = si_units[i]
                    break
            units.append(unit)

        # Return the extracted units and scale factors
        return units, scale_factors

    def get_data(self, stack_index: int) -> Union[np.ndarray, None]:
        """Read the data for requested stack: only images are returned.

        Parameters
        ----------

        stack_index: int
            Index of the stack for which to read the data.

        Returns
        -------

        frame: Union[np.ndarray, None]
            Data as a 2D NumPy array. None if it could not be read or if it was not a 2D image.
        """

        if stack_index < 0 or stack_index >= len(self._obf_stacks_list):
            raise ValueError(f"stack_index={stack_index} is out of bounds.")

        # Get the metadata for the requested stack
        obf_stack_metadata = self._obf_stacks_list[stack_index]

        # Currently, we only support format 6 and newer
        if obf_stack_metadata.format_version < 6:
            print("Reading data is supported only for stack format 6 and newer.")
            return None

        # If there are chunks, we currently do not read
        if obf_stack_metadata.num_chunk_positions > 0:
            print("Reading chunked data is currently not supported.")
            return None

        # We currently only read 2D images
        if self._get_num_dims(obf_stack_metadata.num_pixels) != 2:
            print("Only 2D images are currently supported.")
            return None

        # Get NumPy data type
        np_data_type, _ = self._get_numpy_data_type(
            obf_stack_metadata.data_type_on_disk
        )
        if np_data_type is None:
            print("Unsupported data type.")
            return None

        # Extract some info
        height = obf_stack_metadata.num_pixels[1]
        width = obf_stack_metadata.num_pixels[0]
        bytes_per_sample = obf_stack_metadata.bytes_per_sample

        # Expected number of (decompressed) samples
        expected_num_samples = width * height

        # Number of written bytes
        written_bytes = obf_stack_metadata.samples_written * bytes_per_sample

        # Open the file
        with open(self.filename, mode="rb") as f:

            # Seek to the beginning of the data
            f.seek(obf_stack_metadata.data_start_position)

            # Is there compression?
            if obf_stack_metadata.compression_type != 0:

                # Read the bytes
                compressed_data = f.read(written_bytes)

                # Decompress them
                decompressed_data = zlib.decompress(compressed_data)

                # Cast to a "byte" NumPy array
                raw_frame = np.frombuffer(decompressed_data, dtype=np.uint8)

            else:

                # Read the bytes
                raw_data = f.read(written_bytes)

                # Cast to a "byte" NumPy array
                raw_frame = np.frombuffer(raw_data, dtype=np.uint8)

        # Reinterpret as the final data type (little-endian)
        frame = raw_frame.view(np.dtype(np_data_type))

        # Make sure the final frame size matches the expected size
        if len(frame) != expected_num_samples:
            print("Unexpected length of data retrieved!")
            return None

        # Reshape
        frame = frame.reshape((height, width))

        return frame

    def get_ome_xml_metadata(self) -> Union[str, None]:
        """Return the OME XML metadata.

        Returns
        -------

        ome_xml_metadata: Union[str, None]
            OME XML metadata as formatted string. If no file was loaded, returns None.
        """

        # Get the ome-xml tree
        root = self.obf_file_metadata.tree
        if root is None:
            return None

        # Return metadata as formatted XML string
        return self._tree_to_formatted_xml(root)

    def export_ome_xml_metadata(self, file_name: Union[str, Path]):
        """Export the OME-XML metadata to file.

        Parameters
        ----------

        file_name: Union[str, Path]
            Output file name.
        """

        # Get the ome-xml tree, optionally as formatted string
        metadata = self.get_ome_xml_metadata()
        if metadata is None:
            print("Nothing to export.")
            return

        # Make sure the parent path to the file exists
        Path(file_name).parent.mkdir(parents=True, exist_ok=True)

        # Save to file
        with open(file_name, "w", encoding="utf-8") as f:
            f.write(metadata)

    def get_tag_dictionary(self, stack_index: int) -> Union[dict, None]:
        """Return the tag dictionary for the requested stack.

        Parameters
        ----------

        stack_index: int
            Index of the stack for which to return the tag dictionary.

        Returns
        -------

        tag_dictionary: Union[dict, None]
            Dictionary. If no file was loaded, returns None.
        """

        if stack_index < 0 or stack_index >= len(self._obf_stacks_list):
            raise ValueError(f"Stack number {stack_index} is out of range.")

        # Get stack metadata
        obf_stack_metadata = self._obf_stacks_list[stack_index]
        if obf_stack_metadata is None:
            return None

        # Get the tag dictionary
        tag = obf_stack_metadata.tag_dictionary

        # Return the tag dictionary
        return tag

    def export_tag_dictionary(self, stack_index: int, file_name: Union[str, Path]):
        """Export the tag dictionary to file.

        Parameters
        ----------

        stack_index: int
            Index of the stack for which to export the tag dictionary.

        file_name: Union[str, Path]
            Output file name.
        """

        if stack_index < 0 or stack_index >= len(self._obf_stacks_list):
            raise ValueError(f"Stack number {stack_index} is out of range.")

        # Get tag dictionary
        tag_dictionary = self.get_tag_dictionary(stack_index)
        if tag_dictionary is None:
            return None

        # Make sure file_name is of type Path
        file_name = Path(file_name)

        # Make sure the parent path to the file exists
        file_name.parent.mkdir(parents=True, exist_ok=True)

        # Export the dictionaries
        for key, value in tag_dictionary.items():
            if type(value) is ET.Element:
                mod_file_name = file_name.parent / f"{file_name.stem}_{key}.xml"
                xml_str = self._tree_to_formatted_xml(value)
                with open(mod_file_name, "w") as f:
                    f.write(xml_str)
            elif type(value) is dict:
                mod_file_name = file_name.parent / f"{file_name.stem}_{key}.json"
                with open(mod_file_name, "w") as f:
                    json.dump(value, f, indent=4)
            else:
                mod_file_name = file_name.parent / f"{file_name.stem}_{key}.txt"
                with open(mod_file_name, "w") as f:
                    f.write(value)

    @staticmethod
    def _tree_to_formatted_xml(root: ET, xml_declaration: bool = True) -> str:
        """Converts an xml. tree to formatted xml.

        Parameters
        ----------

        root: xml.etree.ElementTree.Element
            Root element of the XML tree.

        xml_declaration: bool
            Whether to prepend the xml declaration to the converted xml.

        Returns
        -------

        xml_str: str
            Formatted xml.
        """

        # Format tree (optionally add xml declaration)
        xml_str = ET.tostring(
            root, encoding="utf-8", xml_declaration=xml_declaration, method="xml"
        ).decode("utf-8")

        # Remove tabs and new lines
        xml_str = re.sub(r"[\n\t]", "", xml_str)

        # Remove stretches of blank spaces between nodes
        xml_str = re.sub(r">\s+<", "><", xml_str)

        return xml_str

    @staticmethod
    def _get_footer_struct_size(version: int) -> int:
        """Returns the size in pixel of the footer structure for given version.

        Parameters
        ----------

        version: int
            Version number.

        Returns
        -------

        size: int
            Size in bytes of the footer structure for specified version.
        """
        if version == 0:
            return 0
        elif version == 1:
            return _Constants.V1A_FOOTER_LENGTH  # We return version "1A"
        elif version == 2:
            return _Constants.V2_FOOTER_LENGTH
        elif version == 3:
            return _Constants.V3_FOOTER_LENGTH
        elif version == 4:
            return _Constants.V4_FOOTER_LENGTH
        elif version == 5:
            return _Constants.V5A_FOOTER_LENGTH  # We return version "5A"
        elif version == 6:
            return _Constants.V6_FOOTER_LENGTH
        elif version == 7:
            return _Constants.V7_FOOTER_LENGTH
        else:
            raise ValueError(f"Unexpected stack version {version}.")

    @staticmethod
    def _get_num_dims(num_pixels: list[uint32]) -> int:
        """Return the number of dimensions of the data.

        Parameters
        ----------

        num_pixels: list[uint32]
            List of number of pixels per dimension.

        Returns
        -------

        n_dims: int
            Number of dimensions for which the number of pixels is larger than 1.
        """
        n_dims = int(np.sum(np.array(num_pixels) > 1))
        return n_dims

    @staticmethod
    def _get_numpy_data_type(
        data_type_on_disk: uint32,
    ) -> tuple[Union[np.dtype, None], Union[str, None]]:
        """Get the NumPy data type corresponding to the stored datatype.

        Parameters
        ----------

        data_type_on_disk: uint32
            UInt32 value from the stack metadata indicating the type of the data.

        Returns
        -------

        numpy_type: np.dtype
            Numpy dtype class. If the data type is not supported, returns None instead.

        str_type: str
            Type string (little endian). If the data type is not supported, returns None instead.
        """
        if data_type_on_disk == 0x00000001:
            return np.uint8, "<u1"
        elif data_type_on_disk == 0x00000002:
            return np.int8, "<i1"
        elif data_type_on_disk == 0x00000004:
            return np.uint16, "<u2"
        elif data_type_on_disk == 0x00000008:
            return np.int16, "<i2"
        elif data_type_on_disk == 0x00000010:
            return np.uint32, "<u4"
        elif data_type_on_disk == 0x00000020:
            return np.int32, "<i4"
        elif data_type_on_disk == 0x00000040:
            return np.float32, "<f4"
        elif data_type_on_disk == 0x00000080:
            return np.float64, "<f8"
        elif data_type_on_disk == 0x00001000:
            return np.uint64, "<u8"
        elif data_type_on_disk == 0x00002000:
            return np.int64, "<i8"
        else:
            return None, None

    def _read_obf_header(self, f: BinaryIO) -> bool:
        """Read the OBF header.

        Parameters
        ----------

        f: BinaryIO
            Open file handle.

        Returns
        -------
        success: bool
            True if reading the file header was successful, False otherwise.
        """

        # Read the magic header
        magic_header = f.read(10)

        if not magic_header == b"OMAS_BF\n\xff\xff":
            print("Not a valid MSR (OBF) file.")
            return False

        # Store the magic header
        self.obf_file_header.magic_header = magic_header

        # Get format version (uint32)
        self.obf_file_header.format_version = struct.unpack("<I", f.read(4))[0]

        if self.obf_file_header.format_version < 2:
            print("The MSR (OBF) file must be version 2 or above.")
            return False

        # Get position of the first stack header in the file (uint64)
        self.obf_file_header.first_stack_pos = struct.unpack("<Q", f.read(8))[0]

        # Get length of following utf-8 description (uint32)
        self.obf_file_header.descr_len = struct.unpack("<I", f.read(4))[0]

        # Get description (bytes -> utf-8)
        description = ""
        if self.obf_file_header.descr_len > 0:
            description = f.read(self.obf_file_header.descr_len).decode(
                "utf-8", errors="replace"
            )
        self.obf_file_header.description = description

        # Get metadata position (uint64)
        self.obf_file_header.meta_data_position = struct.unpack("<Q", f.read(8))[0]

        return True

    def _read_obf_stack(
        self, f: BinaryIO, next_stack_pos: int
    ) -> tuple[bool, OBFStackMetadata]:
        """Read current OBF stack metadata (header + footer).

        Parameters
        ----------

        f: BinaryIO
            Open file handle.

        next_stack_pos: int
            Position in file where the next stack starts.

        Returns
        -------

        success: bool
            Whether parsing was successful.

        obf_stack_metadata: OBFStackMetadata
            OFBStackMetadata object.
        """

        # Initialize the metadata
        obf_stack_metadata = OBFStackMetadata()

        # Move to the beginning of the stack
        f.seek(next_stack_pos)

        # Read the header
        success, obf_stack_metadata = self._read_obf_stack_header(f, obf_stack_metadata)
        if not success:
            return False, obf_stack_metadata

        # Process the footer
        obf_stack_metadata = self._read_obf_stack_footer(f, obf_stack_metadata)

        # Return
        return True, obf_stack_metadata

    def _read_obf_stack_header(
        self, f: BinaryIO, obf_stack_metadata: OBFStackMetadata
    ) -> tuple[bool, OBFStackMetadata]:
        """Read the OBF stack header and update metadata.

        The file should already be positioned at the right location.

        Parameters
        ----------

        f: BinaryIO
            Open file handle.

        obf_stack_metadata: OBFStackMetadata
            Current OFBStackMetadata object

        Returns
        -------

        success: bool
            Whether parsing was successful.

        obf_stack_metadata: OBFStackMetadata
            Updated OFBStackMetadata object
        """

        # Read the magic header
        obf_stack_metadata.magic_header = f.read(16)

        if not obf_stack_metadata.magic_header == b"OMAS_BF_STACK\n\xff\xff":
            print("Could not find OBF stack header.")
            return False, obf_stack_metadata

        # Get format version (uint32)
        obf_stack_metadata.format_version = struct.unpack("<I", f.read(4))[0]

        # Get the number of valid dimensions
        obf_stack_metadata.rank = struct.unpack("<I", f.read(4))[0]

        # Get the number of pixels along each dimension
        obf_stack_metadata.num_pixels = []
        for i in range(_Constants.BF_MAX_DIMENSIONS):
            n = struct.unpack("<I", f.read(4))[0]
            if i < obf_stack_metadata.rank:
                obf_stack_metadata.num_pixels.append(n)

        # Get the physical lengths along each dimension
        obf_stack_metadata.physical_lengths = []
        for i in range(_Constants.BF_MAX_DIMENSIONS):
            p = struct.unpack("<d", f.read(8))[0]
            if i < obf_stack_metadata.rank:
                obf_stack_metadata.physical_lengths.append(p)

        # Get the physical offsets along each dimension
        obf_stack_metadata.physical_offsets = []
        for i in range(_Constants.BF_MAX_DIMENSIONS):
            o = struct.unpack("<d", f.read(8))[0]
            if i < obf_stack_metadata.rank:
                obf_stack_metadata.physical_offsets.append(o)

        # Read the data type; it should be one of:
        # 0x00000000: automatically determine the data type
        # 0x00000001: uint8
        # 0x00000002: int8
        # 0x00000004: uint16
        # 0x00000008: int16
        # 0x00000010: uint32
        # 0x00000020: int32
        # 0x00000040: float32
        # 0x00000080: float64 (double)
        # 0x00000400: Byte RGB, 3 samples per pixel
        # 0x00000800: Byte RGB, 4 samples per pixel
        # 0x00001000: uint64
        # 0x00002000: int64
        # 0x00010000: (c++) boolean
        #
        # Note: all numeric formats have a complex-number variant with
        # format: data_type | 0x40000000
        obf_stack_metadata.data_type_on_disk = struct.unpack("<I", f.read(4))[0]
        obf_stack_metadata.bytes_per_sample = self._get_bytes_per_sample_from_data_type(
            obf_stack_metadata.data_type_on_disk
        )

        # Compression type (0 for none, 1 for zip)
        obf_stack_metadata.compression_type = struct.unpack("<I", f.read(4))[0]

        # Compression level (0 through 9)
        obf_stack_metadata.compression_level = struct.unpack("<I", f.read(4))[0]

        # Length of the stack name
        obf_stack_metadata.length_stack_name = struct.unpack("<I", f.read(4))[0]

        # Description length
        obf_stack_metadata.length_stack_description = struct.unpack("<I", f.read(4))[0]

        # Reserved field
        obf_stack_metadata.reserved = struct.unpack("<Q", f.read(8))[0]

        # Data length on disk
        obf_stack_metadata.data_len_disk = struct.unpack("<Q", f.read(8))[0]

        # Next stack position in the file
        obf_stack_metadata.next_stack_pos = struct.unpack("<Q", f.read(8))[0]

        # Scan also stack name and description (right after the end of the header)
        obf_stack_metadata.stack_name = (
            ""
            if obf_stack_metadata.length_stack_name == 0
            else f.read(obf_stack_metadata.length_stack_name).decode(
                "utf-8", errors="replace"
            )
        )
        obf_stack_metadata.stack_description = (
            ""
            if obf_stack_metadata.length_stack_description == 0
            else f.read(obf_stack_metadata.length_stack_description).decode("utf-8")
        )

        # Now we are at the beginning of the stack (image or other)
        obf_stack_metadata.data_start_position = f.tell()

        # Start position of the footer
        footer_start_position = (
            obf_stack_metadata.data_start_position + obf_stack_metadata.data_len_disk
        )

        # Move to the beginning of the footer
        f.seek(footer_start_position)

        return True, obf_stack_metadata

    def _read_obf_stack_footer(self, f: BinaryIO, obf_stack_metadata: OBFStackMetadata):
        """Process footer.

        Parameters
        ----------
        f: BinaryIO
            Open file handle.

        obf_stack_metadata: OBFStackMetadata
            Metadata object for current stack.

        Returns
        -------

        obf_stack_metadata: OBFStackMetadata
            Updated metadata object for current stack.
        """

        #
        # Version 0
        #

        # Current position (beginning of the footer)
        obf_stack_metadata.footer_start_pos = f.tell()

        # If stack version is 0, there is no footer
        if obf_stack_metadata.format_version == 0:
            obf_stack_metadata.footer_size = 0
            return obf_stack_metadata

        #
        # Version 1/1A
        #

        # What is the expected size of the footer for this header version?
        size_for_version = self._get_footer_struct_size(
            obf_stack_metadata.format_version
        )

        # Keep track of the size while we proceed
        current_size = 0

        # Get size of the footer header
        obf_stack_metadata.footer_size = struct.unpack("<I", f.read(4))[0]
        current_size += 4

        # Position of the beginning of the variable metadata
        obf_stack_metadata.variable_metadata_start_position = (
            obf_stack_metadata.footer_start_pos + obf_stack_metadata.footer_size
        )

        # Entries are != 0 for all axes that have a pixel position array (after the footer)
        col_positions_present = []
        for i in range(_Constants.BF_MAX_DIMENSIONS):
            p = struct.unpack("<I", f.read(4))[0]
            if i < obf_stack_metadata.rank:
                col_positions_present.append(p != 0)
            current_size += 4
        obf_stack_metadata.has_col_positions = col_positions_present

        # Entries are != 0 for all axes that have a label (after the footer)
        col_labels_present = []
        for i in range(_Constants.BF_MAX_DIMENSIONS):
            b = struct.unpack("<I", f.read(4))[0]
            if i < obf_stack_metadata.rank:
                col_labels_present.append(b != 0)
            current_size += 4
        obf_stack_metadata.has_col_labels = col_labels_present

        # Metadata length (superseded by tag dictionary in version > 4)
        obf_stack_metadata.obsolete_metadata_length = struct.unpack("<I", f.read(4))[0]
        current_size += 4

        # Internal check
        assert (
            current_size == _Constants.V1A_FOOTER_LENGTH
        ), "Unexpected length of version 1/1A data."

        # Have we read enough for this version?
        if current_size > size_for_version:
            return obf_stack_metadata

        #
        # Version 2
        #

        # SI units of the value carried
        fractions = []
        for i in range(_Constants.OBF_SI_FRACTION_NUM_ELEMENTS):
            numerator = struct.unpack("<i", f.read(4))[0]
            denominator = struct.unpack("<i", f.read(4))[0]
            fractions.append(SIFraction(numerator=numerator, denominator=denominator))
            current_size += 8
        scale_factor = struct.unpack("<d", f.read(8))[0]
        current_size += 8
        si_value = SIUnit(exponents=fractions, scale_factor=scale_factor)
        obf_stack_metadata.si_value = si_value

        # SI units of the axes
        dimensions = []
        for i in range(_Constants.BF_MAX_DIMENSIONS):
            fractions = []
            for j in range(_Constants.OBF_SI_FRACTION_NUM_ELEMENTS):
                numerator = struct.unpack("<i", f.read(4))[0]
                denominator = struct.unpack("<i", f.read(4))[0]
                fractions.append(
                    SIFraction(numerator=numerator, denominator=denominator)
                )
                current_size += 8
            scale_factor = struct.unpack("<d", f.read(8))[0]
            current_size += 8
            dimensions.append(SIUnit(exponents=fractions, scale_factor=scale_factor))

        # Add all SI dimensions
        obf_stack_metadata.si_dimensions = dimensions

        # Internal check
        assert (
            current_size == _Constants.V2_FOOTER_LENGTH
        ), "Unexpected length of version 2 data."

        # Have we read enough for this version?
        if current_size > size_for_version:
            return obf_stack_metadata

        #
        # Version 3
        #

        # The number of flush points
        num_flush_points = struct.unpack("<Q", f.read(8))[0]
        current_size += 8
        obf_stack_metadata.num_flush_points = num_flush_points

        # The flush block size
        flush_block_size = struct.unpack("<Q", f.read(8))[0]
        current_size += 8
        obf_stack_metadata.flush_block_size = flush_block_size

        # Internal check
        assert (
            current_size == _Constants.V3_FOOTER_LENGTH
        ), "Unexpected length of version 3 data."

        # Have we read enough for this version?
        if current_size > size_for_version:
            return obf_stack_metadata

        #
        # Version 4
        #
        obf_stack_metadata.tag_dictionary_length = struct.unpack("<Q", f.read(8))[0]
        current_size += 8

        # Internal check
        assert (
            current_size == _Constants.V4_FOOTER_LENGTH
        ), "Unexpected length of version 4 data."

        # Have we read enough for this version?
        if current_size > size_for_version:
            return obf_stack_metadata

        #
        # Version 5/5A
        #

        # Where on disk all the meta-data ends
        obf_stack_metadata.stack_end_disk = struct.unpack("<Q", f.read(8))[0]
        current_size += 8

        # Min supported format version
        obf_stack_metadata.min_format_version = struct.unpack("<I", f.read(4))[0]
        current_size += 4

        # The position where the stack ends on disk.
        obf_stack_metadata.stack_end_used_disk = struct.unpack("<Q", f.read(8))[0]
        current_size += 8

        # Internal check
        assert (
            current_size == _Constants.V5A_FOOTER_LENGTH
        ), "Unexpected length of version 5/5A data."

        # Have we read enough for this version?
        if current_size > size_for_version:
            return obf_stack_metadata

        #
        # Version 6
        #

        # The total number of samples available on disk. By convention, all remaining data is
        # assumed to be zero or undefined. If this is less than the data contained in the stack,
        # it is safe to assume that the stack was truncated by ending the measurement early.
        # If 0, the number of samples written is the one expected from the stack size.
        obf_stack_metadata.samples_written = struct.unpack("<Q", f.read(8))[0]
        current_size += 8

        obf_stack_metadata.num_chunk_positions = struct.unpack("<Q", f.read(8))[0]
        current_size += 8

        # Internal check
        assert (
            current_size == _Constants.V6_FOOTER_LENGTH
        ), "Unexpected length of version 6 data."

        # Have we read enough for this version?
        if current_size > size_for_version:
            return obf_stack_metadata

        #
        # Version 7
        #

        # There is no new documented footer metadata for version 7.

        #
        # Read data after the end of footer
        #

        f.seek(obf_stack_metadata.variable_metadata_start_position)

        # Read labels
        labels = []
        for i in range(obf_stack_metadata.rank):
            n = struct.unpack("<I", f.read(4))[0]
            label = f.read(n).decode("utf-8")
            labels.append(label)
        obf_stack_metadata.labels = labels

        # Read steps (where present)
        steps = []
        for dimension in range(obf_stack_metadata.rank):
            lst = []
            if obf_stack_metadata.has_col_positions[dimension]:
                for position in range(obf_stack_metadata.num_pixels[dimension]):
                    step = struct.unpack("<d", f.read(8))[0]
                    lst.append(step)
            steps.append(lst)

        # Skip the obsolete metadata
        f.seek(f.tell() + obf_stack_metadata.obsolete_metadata_length)

        # Flush points
        if obf_stack_metadata.num_flush_points > 0:
            flush_points = []
            for i in range(obf_stack_metadata.num_flush_points):
                flush_points.append(struct.unpack("<Q", f.read(8))[0])
            obf_stack_metadata.flush_points = flush_points

        # Tag dictionary
        tag_dictionary = {}
        length_key = 1
        while length_key > 0:
            new_key = self._read_string(f)
            length_key = len(new_key)
            if length_key > 0:
                # Get value
                new_value = self._read_string(f, as_str=True, as_utf8=True)

                # Try to process it
                try:
                    tree = ET.fromstring(new_value)
                except ET.ParseError:
                    # Some keys are not XML, but stringified dictionaries
                    try:
                        tree = json.loads(new_value)
                    except json.JSONDecodeError as e:
                        print(
                            f"Failed processing value for key '{new_key}' ({e}): storing as raw string."
                        )
                        tree = new_value

                # Store it without further processing
                tag_dictionary[new_key] = tree

        obf_stack_metadata.tag_dictionary = tag_dictionary

        # Chunk positions
        if obf_stack_metadata.num_chunk_positions > 0:
            logical_positions = []
            file_positions = []

            # Start with 0
            logical_positions.append(0)
            file_positions.append(0)

            for i in range(obf_stack_metadata.num_chunk_positions):
                logical_positions.append(struct.unpack("<Q", f.read(8))[0])
                file_positions.append(struct.unpack("<Q", f.read(8))[0])

            obf_stack_metadata.chunk_logical_positions = logical_positions
            obf_stack_metadata.chunk_file_positions = file_positions

        # Return
        return obf_stack_metadata

    def _scan_metadata(
        self, f: BinaryIO, obf_file_header: OBFFileHeader
    ) -> Union[OBFFileMetadata, None]:
        """Scan the metadata at the location stored in the header.

        The expected values are a key matching: "ome_xml" followed by
        valid OME XML metadata that we parse and return as an ElementTree.

        Parameters
        ----------

        f: BinaryIO
            Open file handle.

        obf_file_header: OBFFileHeader
            File header structure.

        Returns
        -------

        metadata: OBFFileMetadata
            OME-XML file metadata.
        """

        if obf_file_header.meta_data_position == 0:
            return None

        # Remember current position
        current_pos = f.tell()

        # Move to the beginning of the metadata
        f.seek(obf_file_header.meta_data_position)

        # Initialize OBFFileMetadata object
        metadata = OBFFileMetadata()

        # Keep reading strings until done
        strings = []
        length_str = 1
        while length_str > 0:
            new_str = self._read_string(f)
            length_str = len(new_str)
            if length_str > 0:
                strings.append(new_str)

        # Now parse
        success = False
        tree = None
        if len(strings) == 2 and strings[0] == "ome_xml":
            try:
                tree = ET.fromstring(strings[1])
                success = True
            except ET.ParseError as e:
                success = False

        if not success:
            metadata.tree = None
            metadata.unknown_strings = strings
        else:
            metadata.tree = tree
            metadata.unknown_strings = []

        # Return to previous file position
        f.seek(current_pos)

        return metadata

    @staticmethod
    def _read_string(
        f: BinaryIO, as_str: bool = True, as_utf8: bool = True
    ) -> Union[str, bytes]:
        """Read a string at current position.

        Parameters
        ----------

        f: BinaryIO
            Open file handle.

        as_str: bool = True
            If True, parse the raw byte array to a string.

        as_utf8: bool = True
            If True, decode the bytes as UTF-8. Ignored if as_str is False.

        Returns
        -------

        string: Union[bytes, str]
            Either raw bytes or a str, optionally decoded as UTF-8.
        """

        # Read the length of the following string
        length = struct.unpack("<I", f.read(4))[0]
        if length == 0:
            return ""

        # Read `length` bytes and convert them to utf-8 if requested
        value = f.read(length)
        if as_str:
            if as_utf8:
                value = value.decode("utf-8")

        return value

    @staticmethod
    def _get_bytes_per_sample_from_data_type(data_type: uint32) -> int:
        """Return the number of bytes per sample for given data type."""
        supported_types = {
            0x00000001: 1,  # 8-bit unsigned byte
            0x00000002: 1,  # 8-bit signed char
            0x00000004: 2,  # 16-bit word value
            0x00000008: 2,  # 16-bit signed integer
            0x00000010: 4,  # 32-bit unsigned integer
            0x00000020: 4,  # 32-bit signed integer
            0x00000040: 4,  # 32-bit floating point value
            0x00000080: 8,  # 64-bit floating point value
        }

        # Get the number of bytes
        num_bytes_per_sample = supported_types.get(data_type, -1)

        # Check that it is supported
        if num_bytes_per_sample == -1:
            raise ValueError(f"Unsupported data type 0x{data_type:08x}.")

        # Return it
        return num_bytes_per_sample

    def get_image_info_list(self):
        """Return a list of images from all stacks."""

        # Initialize the list
        images = []

        # Do we have images?
        if self.num_stacks == 0:
            return images

        for i, stack in enumerate(self._obf_stacks_list):

            # Only return images
            if (np.array(stack.num_pixels) > 1).sum() == 2:

                # Get pixel size
                pixel_sizes = np.round(
                    np.array(self.get_data_pixel_sizes(stack_index=i)) * 1e9, 2
                )[:2]

                # Get detector
                detector = self._get_detector(
                    imspector_dictionary_root=stack.tag_dictionary["imspector"],
                    img_name=stack.stack_name,
                )

                # Build an unambiguous summary string
                as_string = (
                    f"{detector}: {stack.stack_name}: "
                    f"size = (h={stack.num_pixels[1]} x w={stack.num_pixels[0]}); "
                    f"pixel size = {pixel_sizes[0]}nm "
                    f"(index = {i})"
                )
                images.append(
                    {
                        "index": i,
                        "name": stack.stack_name,
                        "detector": detector,
                        "description": stack.stack_description,
                        "num_pixels": stack.num_pixels,
                        "physical_lengths": stack.physical_lengths,
                        "physical_offsets": stack.physical_offsets,
                        "pixel_sizes": pixel_sizes,
                        "as_string": as_string,
                    }
                )

        # Sort the list using natural sorting by the 'as_string' key
        images = natsorted(images, key=lambda x: x["as_string"])

        # Return the extracted metadata
        return images

    def get_image_info_dict(self):
        """Return a hierarchical dictionary of images from all stacks."""

        # Initialize the dictionary
        images = {}

        # Do we have images?
        if self.num_stacks == 0:
            return images

        for i, stack in enumerate(self._obf_stacks_list):

            # Only return images
            if (np.array(stack.num_pixels) > 1).sum() == 2:

                # Get pixel size
                pixel_sizes = np.round(
                    np.array(self.get_data_pixel_sizes(stack_index=i)) * 1e9, 2
                )[:2]

                # Get detector
                detector = self._get_detector(
                    imspector_dictionary_root=stack.tag_dictionary["imspector"],
                    img_name=stack.stack_name,
                )

                # Get acquisition number
                match = re.match(
                    r"^.+{(?P<index>\d+)}(?P<extra>.*)$",
                    stack.stack_name,
                    re.IGNORECASE,
                )
                if match:
                    if match["extra"] == "":
                        key = f"Image {match['index']}"
                    else:
                        key = f"Image {match['index']} ({match['extra']})"
                else:
                    key = stack.stack_name

                if key in images:
                    image = images[key]
                else:
                    image = {
                        "metadata": "",
                        "detectors": [],
                    }

                # Metadata
                frame_size = (
                    stack.num_pixels[0] * pixel_sizes[0] / 1000,
                    stack.num_pixels[1] * pixel_sizes[1] / 1000,
                )

                # Build metadata string
                metadata = f"Frame: {frame_size[0]:.1f}x{frame_size[1]:.1f}µm - Pixel: {pixel_sizes[0]}nm"
                if image["metadata"] == "":
                    image["metadata"] = metadata
                else:
                    if image["metadata"] != metadata:
                        raise ValueError(f"Unexpected metadata for '{key}'")

                # Append the current detector
                image["detectors"].append(
                    {
                        "index": i,
                        "name": stack.stack_name,
                        "detector": detector,
                        "description": stack.stack_description,
                        "num_pixels": stack.num_pixels,
                        "physical_lengths": stack.physical_lengths,
                        "physical_offsets": stack.physical_offsets,
                        "pixel_sizes": pixel_sizes,
                    }
                )

                # Store the (updated) image in the dictionary
                images[key] = image

        # Sort the dictionary using natural sorting of its keys
        images = dict(natsorted(images.items()))

        # Return the extracted metadata
        return images

    @staticmethod
    def _get_detector(imspector_dictionary_root: ET, img_name: str) -> Union[str, None]:
        """Extract the detector names from the tag dictionary of current stack.

        Parameters
        ----------

        imspector_dictionary_root: xml.etree.ElementTree
            Root of the "imspector" tree (i.e., tag_dictionary["imspector"]).

        Returns
        -------

        name: Union[str, None]
            Name of the detector, or None if the detector could not be found.
        """

        # Get the channels node
        channels_node = imspector_dictionary_root.find(
            "./doc/ExpControl/measurement/channels"
        )
        if channels_node is None:
            return None

        # Find all items
        items = channels_node.findall("item")
        if items is None:
            return None

        # Process items
        detector = None
        for item in items:
            detector = item.find("./detsel/detector")
            name = item.find("./name")
            if detector is not None and name is not None:
                if name.text in img_name:
                    return detector.text

        return detector.text if detector is not None else None

Instance variables

var num_stacks

Return the number of stacks contained in the file.

Expand source code
@property
def num_stacks(self):
    """Return the number of stacks contained in the file."""
    return len(self._obf_stacks_list)

Methods

def export_ome_xml_metadata(self, file_name: Union[pathlib.Path, str])

Export the OME-XML metadata to file.

Parameters

file_name : Union[str, Path]
Output file name.
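
For example, assuming reader is an MSRReader whose scan() succeeded (the output path is a placeholder):

reader.export_ome_xml_metadata("out/ome_metadata.xml")

If no OME-XML metadata is present in the file, nothing is written.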
Expand source code
def export_ome_xml_metadata(self, file_name: Union[str, Path]):
    """Export the OME-XML metadata to file.

    Parameters
    ----------

    file_name: Union[str, Path]
        Output file name.
    """

    # Get the ome-xml tree, optionally as formatted string
    metadata = self.get_ome_xml_metadata()
    if metadata is None:
        print("Nothing to export.")
        return

    # Make sure the parent path to the file exists
    Path(file_name).parent.mkdir(parents=True, exist_ok=True)

    # Save to file
    with open(file_name, "w", encoding="utf-8") as f:
        f.write(metadata)
def export_tag_dictionary(self, stack_index: int, file_name: Union[pathlib.Path, str])

Export the tag dictionary to file.

Parameters

stack_index : int
Index of the stack for which to export the tag dictionary.
file_name : Union[str, Path]
Output file name.
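
For example, assuming reader is a scanned MSRReader (index and path are placeholders):

reader.export_tag_dictionary(stack_index=0, file_name="out/tags")

One file is written per tag, with the tag key appended to the stem of file_name and an extension (.xml, .json, or .txt) chosen from the tag's type.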
Expand source code
def export_tag_dictionary(self, stack_index: int, file_name: Union[str, Path]):
    """Export the tag dictionary to file.

    Parameters
    ----------

    stack_index: int
        Index of the stack for which to export the tag dictionary.

    file_name: Union[str, Path]
        Output file name.
    """

    if stack_index < 0 or stack_index >= len(self._obf_stacks_list):
        raise ValueError(f"Stack number {stack_index} is out of range.")

    # Get tag dictionary
    tag_dictionary = self.get_tag_dictionary(stack_index)
    if tag_dictionary is None:
        return None

    # Make sure file_name is of type Path
    file_name = Path(file_name)

    # Make sure the parent path to the file exists
    file_name.parent.mkdir(parents=True, exist_ok=True)

    # Export the dictionaries
    for key, value in tag_dictionary.items():
        if type(value) is ET.Element:
            mod_file_name = file_name.parent / f"{file_name.stem}_{key}.xml"
            xml_str = self._tree_to_formatted_xml(value)
            with open(mod_file_name, "w") as f:
                f.write(xml_str)
        elif type(value) is dict:
            mod_file_name = file_name.parent / f"{file_name.stem}_{key}.json"
            with open(mod_file_name, "w") as f:
                json.dump(value, f, indent=4)
        else:
            mod_file_name = file_name.parent / f"{file_name.stem}_{key}.txt"
            with open(mod_file_name, "w") as f:
                f.write(value)
def get_data(self, stack_index: int) ‑> Optional[numpy.ndarray]

Read the data for the requested stack: only images are returned.

Parameters

stack_index : int
Index of the stack for which to read the data.

Returns

frame : Union[np.ndarray, None]
Data as a 2D NumPy array. None if it could not be read or if it was not a 2D image.
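
For example, assuming reader is a scanned MSRReader and stack 0 is a 2D image:

frame = reader.get_data(stack_index=0)
if frame is not None:
    # 2D NumPy array with shape (height, width)
    print(frame.shape, frame.dtype)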
Expand source code
def get_data(self, stack_index: int) -> Union[np.ndarray, None]:
    """Read the data for requested stack: only images are returned.

    Parameters
    ----------

    stack_index: int
        Index of the stack for which to read the data.

    Returns
    -------

    frame: Union[np.ndarray, None]
        Data as a 2D NumPy array. None if it could not be read or if it was not a 2D image.
    """

    if stack_index < 0 or stack_index >= len(self._obf_stacks_list):
        raise ValueError(f"stack_index={stack_index} is out of bounds.")

    # Get the metadata for the requested stack
    obf_stack_metadata = self._obf_stacks_list[stack_index]

    # Currently, we only support format 6 and newer
    if obf_stack_metadata.format_version < 6:
        print("Reading data is supported only for stack format 6 and newer.")
        return None

    # If there are chunks, we currently do not read
    if obf_stack_metadata.num_chunk_positions > 0:
        print("Reading chunked data is currently not supported.")
        return None

    # We currently only read 2D images
    if self._get_num_dims(obf_stack_metadata.num_pixels) != 2:
        print("Only 2D images are currently supported.")
        return None

    # Get NumPy data type
    np_data_type, _ = self._get_numpy_data_type(
        obf_stack_metadata.data_type_on_disk
    )
    if np_data_type is None:
        print("Unsupported data type.")
        return None

    # Extract some info
    height = obf_stack_metadata.num_pixels[1]
    width = obf_stack_metadata.num_pixels[0]
    bytes_per_sample = obf_stack_metadata.bytes_per_sample

    # Expected number of (decompressed) samples
    expected_num_samples = width * height

    # Number of written bytes
    written_bytes = obf_stack_metadata.samples_written * bytes_per_sample

    # Open the file
    with open(self.filename, mode="rb") as f:

        # Seek to the beginning of the data
        f.seek(obf_stack_metadata.data_start_position)

        # Is there compression?
        if obf_stack_metadata.compression_type != 0:

            # Read the bytes
            compressed_data = f.read(written_bytes)

            # Decompress them
            decompressed_data = zlib.decompress(compressed_data)

            # Cast to a "byte" NumPy array
            raw_frame = np.frombuffer(decompressed_data, dtype=np.uint8)

        else:

            # Read the bytes
            raw_data = f.read(written_bytes)

            # Cast to a "byte" NumPy array
            raw_frame = np.frombuffer(raw_data, dtype=np.uint8)

    # Reinterpret as the final data type (little-endian)
    frame = raw_frame.view(np.dtype(np_data_type))

    # Make sure the final frame size matches the expected size
    if len(frame) != expected_num_samples:
        print("Unexpected length of data retrieved!")
        return None

    # Reshape
    frame = frame.reshape((height, width))

    return frame
def get_data_offsets(self, stack_index: int, scaled: bool = True) ‑> Optional[list]

Returns the (scaled) data offsets for the requested stack.

Parameters

stack_index : int
Index of the stack for which to read the data.
scaled : bool
If scaled is True, the offsets will be scaled by the corresponding scale factors as reported by MSRReader.get_data_units().

Returns

offsets : Union[list, None]
Offsets for 2D images, None otherwise.
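
For example, assuming reader is a scanned MSRReader:

offsets = reader.get_data_offsets(stack_index=0)                   # scaled offsets
raw_offsets = reader.get_data_offsets(stack_index=0, scaled=False)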
Expand source code
def get_data_offsets(
    self, stack_index: int, scaled: bool = True
) -> Union[list, None]:
    """Returns the (scaled) data offsets for the requested stack.

    Parameters
    ----------

    stack_index: int
        Index of the stack for which to read the data.

    scaled: bool
        If scaled is True, the offsets will be scaled by the corresponding scale factors
        as reported by MSRReader.get_data_units().

    Returns
    -------

    offsets: Union[list, None]
        Offsets for 2D images, None otherwise.
    """

    if stack_index < 0 or stack_index >= len(self._obf_stacks_list):
        raise ValueError(f"stack_index={stack_index} is out of bounds.")

    # Get the metadata for the requested stack
    obf_stack_metadata = self._obf_stacks_list[stack_index]

    if obf_stack_metadata is None:
        return None

    # Get the offsets
    offsets = obf_stack_metadata.physical_offsets[: obf_stack_metadata.rank]

    # Do we need to scale?
    if scaled:
        _, factors = self.get_data_units(stack_index=stack_index)
        for i, factor in enumerate(factors):
            if factor != 1.0:
                offsets[i] *= factor

    return offsets
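
A brief sketch contrasting raw and scaled offsets (the file name is illustrative):

from pyminflux.reader import MSRReader

reader = MSRReader("example.msr")   # hypothetical file name
if reader.scan():
    raw_offsets = reader.get_data_offsets(stack_index=0, scaled=False)
    scaled_offsets = reader.get_data_offsets(stack_index=0, scaled=True)
    print(raw_offsets, scaled_offsets)
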
def get_data_physical_sizes(self, stack_index: int, scaled: bool = True) ‑> Optional[list]

Returns the (scaled) data physical size for the requested stack.

Parameters

stack_index : int
Index of the stack for which to read the data.
scaled : bool
If scaled is True, the physical sizes will be scaled by the corresponding scale factors as reported by MSRReader.get_data_units().

Returns

physical_sizes : Union[list, None]
Physical sizes for 2D images, None otherwise.
Expand source code
def get_data_physical_sizes(
    self, stack_index: int, scaled: bool = True
) -> Union[list, None]:
    """Returns the (scaled) data physical size for the requested stack.

    Parameters
    ----------

    stack_index: int
        Index of the stack for which to read the data.

    scaled: bool
        If scaled is True, the physical sizes will be scaled by the corresponding scale factors
        as reported by MSRReader.get_data_units().

    Returns
    -------

    physical_sizes: Union[list, None]
        Physical sizes for 2D images, None otherwise.
    """

    if stack_index < 0 or stack_index >= len(self._obf_stacks_list):
        raise ValueError(f"stack_index={stack_index} is out of bounds.")

    # Get the metadata for the requested stack
    obf_stack_metadata = self._obf_stacks_list[stack_index]

    if obf_stack_metadata is None:
        return None

    # Get the physical lengths
    phys_lengths = obf_stack_metadata.physical_lengths[: obf_stack_metadata.rank]

    # Do we need to scale?
    if scaled:
        _, factors = self.get_data_units(stack_index=stack_index)
        for i, factor in enumerate(factors):
            if factor != 1.0:
                phys_lengths[i] *= factor

    # Return the physical lengths as list
    return phys_lengths
def get_data_pixel_sizes(self, stack_index: int, scaled: bool = True) ‑> Optional[list]

Returns the (scaled) data pixel size for the requested stack.

Parameters

stack_index : int
Index of the stack for which to read the data.
scaled : bool
If scaled is True, the pixel sizes will be scaled by the corresponding scale factors as reported by MSRReader.get_data_units().

Returns

pixel_sizes : Union[list, None]
Pixel sizes for 2D images, None otherwise.
Expand source code
def get_data_pixel_sizes(
    self, stack_index: int, scaled: bool = True
) -> Union[list, None]:
    """Returns the (scaled) data pixel size for the requested stack.

    Parameters
    ----------

    stack_index: int
        Index of the stack for which to read the data.

    scaled: bool
        If scaled is True, the pixel sizes will be scaled by the corresponding scale factors
        as reported by MSRReader.get_data_units().

    Returns
    -------

    pixel_sizes: Union[list, None]
        Pixel sizes for 2D images, None otherwise.
    """

    if stack_index < 0 or stack_index >= len(self._obf_stacks_list):
        raise ValueError(f"stack_index={stack_index} is out of bounds.")

    # Get the metadata for the requested stack
    obf_stack_metadata = self._obf_stacks_list[stack_index]

    if obf_stack_metadata is None:
        return None

    # Get the physical sizes
    phys_lengths = self.get_data_physical_sizes(
        stack_index=stack_index, scaled=scaled
    )

    # Get the number of pixels along each dimension
    num_pixels = obf_stack_metadata.num_pixels[: obf_stack_metadata.rank]

    # Now divide by the image size
    pixel_sizes = np.array(phys_lengths) / np.array(num_pixels)

    # Return the pixel size as list
    return pixel_sizes.tolist()
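
A short sketch that retrieves the physical extents and the derived pixel sizes (the file name is illustrative):

from pyminflux.reader import MSRReader

reader = MSRReader("example.msr")   # hypothetical file name
if reader.scan():
    # Physical extent along each dimension, scaled by the factors from get_data_units()
    lengths = reader.get_data_physical_sizes(stack_index=0)
    # Pixel size = physical length divided by the number of pixels along each dimension
    pixel_sizes = reader.get_data_pixel_sizes(stack_index=0)
    print(lengths, pixel_sizes)
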
def get_data_units(self, stack_index: int) ‑> Optional[tuple[list, list]]

Returns the data units and scale factors per dimension for the requested stack.

Units are one of: "m" (meters), "kg" (kilograms), "s" (seconds), "A" (Amperes), "K" (Kelvin), "mol" (moles), "cd" (candela), "r" (radians), "sr" (steradians).

Parameters

stack_index : int
Index of the stack for which to read the data.

Returns

unit : Union[tuple[list, list], None]
List of units and list of scale factors, or None if no file was opened.
Expand source code
def get_data_units(self, stack_index: int) -> Union[tuple[list, list], None]:
    """Returns the data units and scale factors per dimension for requested stack.

    Units are one of:
        "m": meters
        "kg": kilograms
        "s": s
        "A": Amperes
        "K": Kelvin
        "mol": moles
        "cd": candela
        "r": radian
        "sr": sr

    Parameters
    ----------

    stack_index: int
        Index of the stack for which to read the data.

    Returns
    -------

    unit: Union[tuple[list, list], None]
        List of units and list of scale factors, or None if no file was opened.
    """

    if stack_index < 0 or stack_index >= len(self._obf_stacks_list):
        raise ValueError(f"stack_index={stack_index} is out of bounds.")

    # Get the metadata for the requested stack
    obf_stack_metadata = self._obf_stacks_list[stack_index]

    if obf_stack_metadata is None:
        return None

    units = []
    scale_factors = []
    for dim in range(obf_stack_metadata.rank):
        dimensions = obf_stack_metadata.si_dimensions[dim]
        scale_factors.append(dimensions.scale_factor)
        for i, exponent in enumerate(dimensions.exponents):
            if i == 0 and exponent.numerator > 0:
                units.append("m")
                break
            elif i == 1 and exponent.numerator > 0:
                units.append("kg")
                break
            elif i == 2 and exponent.numerator > 0:
                units.append("s")
                break
            elif i == 3 and exponent.numerator > 0:
                units.append("A")
                break
            elif i == 4 and exponent.numerator > 0:
                units.append("K")
                break
            elif i == 5 and exponent.numerator > 0:
                units.append("mol")
                break
            elif i == 6 and exponent.numerator > 0:
                units.append("cd")
                break
            elif i == 7 and exponent.numerator > 0:
                units.append("r")
                break
            elif i == 8 and exponent.numerator > 0:
                units.append("sr")
                break
            else:
                units.append("")
                break

    # Return the extracted units and scale factors
    return units, scale_factors
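
A minimal sketch that queries the units and scale factors (the file name is illustrative; the values in the comment are only an example):

from pyminflux.reader import MSRReader

reader = MSRReader("example.msr")   # hypothetical file name
if reader.scan():
    units, scale_factors = reader.get_data_units(stack_index=0)
    # For a 2D image calibrated in meters this would typically be
    # units == ["m", "m"] and scale_factors == [1.0, 1.0]
    print(units, scale_factors)
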
def get_image_info_dict(self)

Return a hierarchical dictionary of images from all stacks.

Expand source code
def get_image_info_dict(self):
    """Return a hierarchical dictionary of images from all stacks."""

    # Initialize the dictionary
    images = {}

    # Do we have images?
    if self.num_stacks == 0:
        return images

    for i, stack in enumerate(self._obf_stacks_list):

        # Only return images
        if (np.array(stack.num_pixels) > 1).sum() == 2:

            # Get pixel size
            pixel_sizes = np.round(
                np.array(self.get_data_pixel_sizes(stack_index=i)) * 1e9, 2
            )[:2]

            # Get detector
            detector = self._get_detector(
                imspector_dictionary_root=stack.tag_dictionary["imspector"],
                img_name=stack.stack_name,
            )

            # Get acquisition number
            match = re.match(
                r"^.+{(?P<index>\d+)}(?P<extra>.*)$",
                stack.stack_name,
                re.IGNORECASE,
            )
            if match:
                if match["extra"] == "":
                    key = f"Image {match['index']}"
                else:
                    key = f"Image {match['index']} ({match['extra']})"
            else:
                key = stack.stack_name

            if key in images:
                image = images[key]
            else:
                image = {
                    "metadata": "",
                    "detectors": [],
                }

            # Metadata
            frame_size = (
                stack.num_pixels[0] * pixel_sizes[0] / 1000,
                stack.num_pixels[1] * pixel_sizes[1] / 1000,
            )

            # Build metadata string
            metadata = f"Frame: {frame_size[0]:.1f}x{frame_size[1]:.1f}µm - Pixel: {pixel_sizes[0]}nm"
            if image["metadata"] == "":
                image["metadata"] = metadata
            else:
                if image["metadata"] != metadata:
                    raise ValueError(f"Unexpected metadata for '{key}'")

            # Append current detector
            image["detectors"].append(
                {
                    "index": i,
                    "name": stack.stack_name,
                    "detector": detector,
                    "description": stack.stack_description,
                    "num_pixels": stack.num_pixels,
                    "physical_lengths": stack.physical_lengths,
                    "physical_offsets": stack.physical_offsets,
                    "pixel_sizes": pixel_sizes,
                }
            )

            # Store the (updated) image in the dictionary
            images[key] = image

    # Sort the dictionary using natural sorting of its keys
    images = dict(natsorted(images.items()))

    # Return the extracted metadata
    return images
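
A sketch of how the hierarchical dictionary can be traversed (the file name is illustrative; the keys used below follow the structure built above):

from pyminflux.reader import MSRReader

reader = MSRReader("example.msr")   # hypothetical file name
if reader.scan():
    for key, image in reader.get_image_info_dict().items():
        print(key, "-", image["metadata"])
        for detector in image["detectors"]:
            print("   ", detector["detector"], detector["num_pixels"])
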
def get_image_info_list(self)

Return a list of images from all stacks.

Expand source code
def get_image_info_list(self):
    """Return a list of images from all stacks."""

    # Initialize the list
    images = []

    # Do we have images?
    if self.num_stacks == 0:
        return images

    for i, stack in enumerate(self._obf_stacks_list):

        # Only return images
        if (np.array(stack.num_pixels) > 1).sum() == 2:

            # Get pixel size
            pixel_sizes = np.round(
                np.array(self.get_data_pixel_sizes(stack_index=i)) * 1e9, 2
            )[:2]

            # Get detector
            detector = self._get_detector(
                imspector_dictionary_root=stack.tag_dictionary["imspector"],
                img_name=stack.stack_name,
            )

            # Build a (univocal) summary string
            as_string = (
                f"{detector}: {stack.stack_name}: "
                f"size = (h={stack.num_pixels[1]} x w={stack.num_pixels[0]}); "
                f"pixel size = {pixel_sizes[0]}nm "
                f"(index = {i})"
            )
            images.append(
                {
                    "index": i,
                    "name": stack.stack_name,
                    "detector": detector,
                    "description": stack.stack_description,
                    "num_pixels": stack.num_pixels,
                    "physical_lengths": stack.physical_lengths,
                    "physical_offsets": stack.physical_offsets,
                    "pixel_sizes": pixel_sizes,
                    "as_string": as_string,
                }
            )

    # Sort the list using natural sorting by the 'as_string' key
    images = natsorted(images, key=lambda x: x["as_string"])

    # Return the extracted metadata
    return images
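
A minimal sketch that lists all image summaries (the file name is illustrative):

from pyminflux.reader import MSRReader

reader = MSRReader("example.msr")   # hypothetical file name
if reader.scan():
    for image in reader.get_image_info_list():
        print(image["as_string"])
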
def get_ome_xml_metadata(self) ‑> Optional[str]

Return the OME XML metadata.

Returns

ome_xml_metadata : Union[str, None]
OME XML metadata as formatted string. If no file was loaded, returns None.
Expand source code
def get_ome_xml_metadata(self) -> Union[str, None]:
    """Return the OME XML metadata.

    Returns
    -------

    ome_xml_metadata: Union[str, None]
        OME XML metadata as formatted string. If no file was loaded, returns None.
    """

    # Get the ome-xml tree
    root = self.obf_file_metadata.tree
    if root is None:
        return None

    # Return metadata as formatted XML string
    return self._tree_to_formatted_xml(root)
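
A minimal sketch that dumps the OME XML metadata to a file (file names are illustrative):

from pyminflux.reader import MSRReader

reader = MSRReader("example.msr")   # hypothetical file name
if reader.scan():
    xml = reader.get_ome_xml_metadata()
    if xml is not None:
        with open("example_metadata.xml", "w", encoding="utf-8") as f:
            f.write(xml)
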
def get_tag_dictionary(self, stack_index: int) ‑> Optional[dict]

Return the tag dictionary for the requested stack.

Parameters

stack_index : int
Index of the stack for which to return the tag dictionary.

Returns

tag_dictionary : Union[dict, None]
Dictionary. If no file was loaded, returns None.
Expand source code
def get_tag_dictionary(self, stack_index: int) -> Union[dict, None]:
    """Return the tag dictionary for the requested stack.

    Parameters
    ----------

    stack_index: int
        Index of the stack for which to return the tag dictionary.

    Returns
    -------

    tag_dictionary: Union[dict, None]
        Dictionary. If no file was loaded, returns None.
    """

    if stack_index < 0 or stack_index >= len(self._obf_stacks_list):
        raise ValueError(f"Stack number {stack_index} is out of range.")

    # Get stack metadata
    obf_stack_metadata = self._obf_stacks_list[stack_index]
    if obf_stack_metadata is None:
        return None

    # Get the tag dictionary
    tag = obf_stack_metadata.tag_dictionary

    # Return the tag dictionary
    return tag
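
A minimal sketch that inspects the tag dictionary of the first stack (the file name is illustrative):

from pyminflux.reader import MSRReader

reader = MSRReader("example.msr")   # hypothetical file name
if reader.scan():
    tags = reader.get_tag_dictionary(0)
    if tags is not None:
        # The "imspector" entry, for instance, is the root used by get_image_info_dict()
        print(list(tags.keys()))
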
def scan(self) ‑> bool

Scan the metadata of the file.

Returns

success : bool
True if the file was scanned successfully, False otherwise.
Expand source code
def scan(self) -> bool:
    """Scan the metadata of the file.

    Returns
    -------

    success: bool
        True if the file was scanned successfully, False otherwise.
    """

    # Open the file
    with open(self.filename, mode="rb") as f:

        if not self._read_obf_header(f):
            return False

        # Scan metadata
        self.obf_file_metadata = self._scan_metadata(f, self.obf_file_header)

        # Get the first stack position
        next_stack_pos = self.obf_file_header.first_stack_pos

        while next_stack_pos != 0:

            # Scan the next stack
            success, obs_stack_metadata = self._read_obf_stack(f, next_stack_pos)

            if not success:
                return False

            # Append current stack header
            self._obf_stacks_list.append(obs_stack_metadata)

            # Do we have a next header to parse?
            next_stack_pos = obs_stack_metadata.next_stack_pos

    return True
class MinFluxReader (filename: Union[pathlib.Path, str], valid: bool = True, z_scaling_factor: float = 1.0, is_tracking: bool = False, pool_dcr: bool = False, dwell_time: float = 1.0)

Reader of MINFLUX data in .pmx, .npy or .mat formats.

Constructor.

Parameters

filename : Union[Path, str]
Full path to the .pmx, .npy or .mat file to read
valid : bool (optional, default = True)
Whether to load only valid localizations.
z_scaling_factor : float (optional, default = 1.0)
Refractive index mismatch correction factor to apply to the z coordinates.
is_tracking : bool (optional, default = False)
Whether the dataset comes from a tracking experiment; otherwise, it is considered a localization experiment.
pool_dcr : bool (optional, default = False)
Whether to pool DCR values weighted by the relative ECO of all relocalized iterations.
dwell_time : float (optional, default 1.0)
Dwell time in milliseconds.
Expand source code
class MinFluxReader:
    __docs__ = "Reader of MINFLUX data in `.pmx`, `.npy` or `.mat` formats."

    __slots__ = [
        "_pool_dcr",
        "_cfr_index",
        "_data_array",
        "_data_df",
        "_data_full_df",
        "_dcr_index",
        "_dwell_time",
        "_eco_index",
        "_efo_index",
        "_filename",
        "_is_3d",
        "_is_aggregated",
        "_is_last_valid",
        "_is_tracking",
        "_last_valid",
        "_last_valid_cfr",
        "_loc_index",
        "_relocalizations",
        "_reps",
        "_tid_index",
        "_tim_index",
        "_unit_scaling_factor",
        "_valid",
        "_valid_cfr",
        "_valid_entries",
        "_vld_index",
        "_z_scaling_factor",
    ]

    def __init__(
        self,
        filename: Union[Path, str],
        valid: bool = True,
        z_scaling_factor: float = 1.0,
        is_tracking: bool = False,
        pool_dcr: bool = False,
        dwell_time: float = 1.0,
    ):
        """Constructor.

        Parameters
        ----------

        filename: Union[Path, str]
            Full path to the `.pmx`, `.npy` or `.mat` file to read

        valid: bool (optional, default = True)
            Whether to load only valid localizations.

        z_scaling_factor: float (optional, default = 1.0)
            Refractive index mismatch correction factor to apply to the z coordinates.

        is_tracking: bool (optional, default = False)
            Whether the dataset comes from a tracking experiment; otherwise, it is considered a
            localization experiment.

        pool_dcr: bool (optional, default = False)
            Whether to pool DCR values weighted by the relative ECO of all relocalized iterations.

        dwell_time: float (optional, default 1.0)
            Dwell time in milliseconds.
        """

        # Store the filename
        self._filename: Path = Path(filename)
        if not self._filename.is_file():
            raise IOError(f"The file {self._filename} does not seem to exist.")

        # Keep track of whether the chosen sequence is the last valid.
        self._is_last_valid: bool = False

        # Store the valid flag
        self._valid: bool = valid

        # The localizations are stored in meters in the Imspector files and by
        # design also in the `.pmx` format. Here, we scale them to be in nm
        self._unit_scaling_factor: float = 1e9

        # Store the z correction factor
        self._z_scaling_factor: float = z_scaling_factor

        # Store the dwell time
        self._dwell_time = dwell_time

        # Initialize the data
        self._data_array = None
        self._data_df = None
        self._data_full_df = None
        self._valid_entries = None

        # Whether the acquisition is 2D or 3D
        self._is_3d: bool = False

        # Whether the acquisition is a tracking dataset
        self._is_tracking: bool = is_tracking

        # Whether to pool the dcr values
        self._pool_dcr = pool_dcr

        # Whether the file contains aggregate measurements
        self._is_aggregated: bool = False

        # Indices dependent on 2D or 3D acquisition and whether the
        # data comes from a localization or a tracking experiment.
        self._reps: int = -1
        self._efo_index: int = -1
        self._cfr_index: int = -1
        self._dcr_index: int = -1
        self._eco_index: int = -1
        self._loc_index: int = -1
        self._valid_cfr: list = []
        self._relocalizations: list = []

        # Constant indices
        self._tid_index: int = 0
        self._tim_index: int = 0
        self._vld_index: int = 0

        # Keep track of the last valid global and CFR iterations as returned
        # by the initial scan
        self._last_valid: int = -1
        self._last_valid_cfr: int = -1

        # Load the file
        if not self._load():
            raise IOError(f"The file {self._filename} is not a valid MINFLUX file.")

    @property
    def is_last_valid(self) -> Union[bool, None]:
        """Return True if the selected iteration is the "last valid", False otherwise.
        If the dataframe has not been processed yet, `is_last_valid` will be None."""
        if self._data_df is None:
            return None
        return self._is_last_valid

    @property
    def z_scaling_factor(self) -> float:
        """Returns the scaling factor for the z coordinates."""
        return self._z_scaling_factor

    @property
    def is_3d(self) -> bool:
        """Returns True is the acquisition is 3D, False otherwise."""
        return self._is_3d

    @property
    def is_aggregated(self) -> bool:
        """Returns True is the acquisition is aggregated, False otherwise."""
        return self._is_aggregated

    @property
    def is_tracking(self) -> bool:
        """Returns True for a tracking acquisition, False otherwise."""
        return self._is_tracking

    @property
    def is_pool_dcr(self) -> bool:
        """Returns True if the DCR values over all relocalized iterations (to use all photons)."""
        return self._pool_dcr

    @property
    def dwell_time(self) -> float:
        """Returns the dwell time."""
        return self._dwell_time

    @property
    def num_valid_entries(self) -> int:
        """Number of valid entries."""
        if self._data_array is None:
            return 0
        return self._valid_entries.sum()

    @property
    def num_invalid_entries(self) -> int:
        """Number of valid entries."""
        if self._data_array is None:
            return 0
        return np.logical_not(self._valid_entries).sum()

    @property
    def valid_cfr(self) -> list:
        """Return the iterations with valid CFR measurements.

        Returns
        -------
        cfr: boolean array with True for the iteration indices
             that have a valid measurement.
        """
        if self._data_array is None:
            return []
        return self._valid_cfr

    @property
    def relocalizations(self) -> list:
        """Return the iterations with relocalizations.

        Returns
        -------
        reloc: boolean array with True for the iteration indices that are relocalized.
        """
        if self._data_array is None:
            return []
        return self._relocalizations

    @property
    def valid_raw_data(self) -> Union[None, np.ndarray]:
        """Return the raw data."""
        if self._data_array is None:
            return None
        return self._data_array[self._valid_entries].copy()

    @property
    def processed_dataframe(self) -> Union[None, pd.DataFrame]:
        """Return the raw data as dataframe (some properties only)."""
        if self._data_df is not None:
            return self._data_df

        self._data_df = self._process()
        return self._data_df

    @property
    def raw_data_dataframe(self) -> Union[None, pd.DataFrame]:
        """Return the raw data as dataframe (some properties only)."""
        if self._data_full_df is not None:
            return self._data_full_df
        self._data_full_df = self._raw_data_to_full_dataframe()
        return self._data_full_df

    @property
    def filename(self) -> Union[Path, None]:
        """Return the filename if set."""
        if self._filename is None:
            return None
        return Path(self._filename)

    def set_indices(self, index, cfr_index, process: bool = True):
        """Set the parameter indices.

        We distinguish between the index of all parameters
        that are always measured and are accessed from the
        same iteration, and the cfr index, that is not
        always measured.

        Parameters
        ----------

        index: int
            Global iteration index for all parameters but cfr

        cfr_index: int
            Iteration index for cfr

        process: bool (Optional, default = True)
            By default, when setting the indices, the data is rescanned
            and the dataframe is rebuilt. In case several properties of
            the MinFluxReader are modified sequentially, the processing
            can be disabled and run only once after the last change.
            However, this only applies after the first load/scan, when
            the processed dataframe has not been created yet. If the
            dataframe already exists, this flag will be ignored and the
            processing will take place.
        """

        # Make sure there is loaded data
        if self._data_array is None:
            raise ValueError("No data loaded.")

        if self._reps == -1:
            raise ValueError("No data loaded.")

        if len(self._valid_cfr) == 0:
            raise ValueError("No data loaded.")

        # Check that the arguments are compatible with the loaded data
        if index < 0 or index > self._reps - 1:
            raise ValueError(
                f"The value of index must be between 0 and {self._reps - 1}."
            )

        if cfr_index < 0 or cfr_index > len(self._valid_cfr) - 1:
            raise ValueError(
                f"The value of index must be between 0 and {len(self._valid_cfr) - 1}."
            )

        # Now set the general values
        self._efo_index = index
        self._dcr_index = index
        self._eco_index = index
        self._loc_index = index

        # Set the cfr index
        self._cfr_index = cfr_index

        # Constant indices
        self._tid_index: int = 0
        self._tim_index: int = 0
        self._vld_index: int = 0

        # Re-process the file? If the processed dataframe already exists,
        # the processing will take place anyway.
        if process or self._data_df is not None:
            self._process()

    def set_tracking(self, is_tracking: bool, process: bool = True):
        """Sets whether the acquisition is tracking or localization.

        Parameters
        ----------

        is_tracking: bool
            Set to True for a tracking acquisition, False for a localization
            acquisition.

        process: bool (Optional, default = True)
            By default, when setting the tracking flag, the data is rescanned
            and the dataframe is rebuilt. In case several properties of
            the MinFluxReader are modified sequentially, the processing
            can be disabled and run only once after the last change.
            However, this only applies after the first load/scan, when
            the processed dataframe has not been created yet. If the
            dataframe already exists, this flag will be ignored and the
            processing will take place.
        """

        # Update the flag
        self._is_tracking = is_tracking

        # Re-process the file?
        if process or self._data_df is not None:
            self._process()

    def set_dwell_time(self, dwell_time: float, process: bool = True):
        """
        Sets the dwell time.

        Parameters
        ----------
        dwell_time: float
            Dwell time.

        process: bool (Optional, default = True)
            By default, when setting the dwell time, the data is rescanned
            and the dataframe is rebuilt. In case several properties of
            the MinFluxReader are modified sequentially, the processing
            can be disabled and run only once after the last change.
            However, this only applies after the first load/scan, when
            the processed dataframe has not been created yet. If the
            dataframe already exists, this flag will be ignored and the
            processing will take place.
        """

        # Update the flag
        self._dwell_time = dwell_time

        # Re-process the file?
        if process or self._data_df is not None:
            self._process()

    def set_pool_dcr(self, pool_dcr: bool, process: bool = True):
        """
        Sets whether the DCR values should be pooled (and weighted by ECO).

        Parameters
        ----------
        pool_dcr: bool
            Whether the DCR values should be pooled (and weighted by ECO).

        process: bool (Optional, default = True)
            By default, when setting the DCR binning flag, the data is rescanned
            and the dataframe is rebuilt. In case several properties of
            the MinFluxReader are modified sequentially, the processing
            can be disabled and run only once after the last change.
            However, this only applies after the first load/scan, when
            the processed dataframe has not been created yet. If the
            dataframe already exists, this flag will be ignored and the
            processing will take place.
        """

        # Update the flag
        self._pool_dcr = pool_dcr

        # Re-process the file?
        if process or self._data_df is not None:
            self._process()

    @classmethod
    def processed_properties(cls) -> list:
        """Returns the properties read from the file that correspond to the processed dataframe column names."""
        return [
            "tid",
            "tim",
            "x",
            "y",
            "z",
            "efo",
            "cfr",
            "eco",
            "dcr",
            "dwell",
            "fluo",
        ]

    @classmethod
    def raw_properties(cls) -> list:
        """Returns the properties read from the file and dynamic that correspond to the raw dataframe column names."""
        return ["tid", "aid", "vld", "tim", "x", "y", "z", "efo", "cfr", "eco", "dcr"]

    def _load(self) -> bool:
        """Load the file."""

        if not self._filename.is_file():
            print(f"File {self._filename} does not exist.")
            return False

        # Call the specialized _load_*() function
        if self._filename.name.lower().endswith(".npy"):
            try:
                data_array = np.load(str(self._filename))
                if "fluo" in data_array.dtype.names:
                    self._data_array = data_array
                else:
                    self._data_array = migrate_npy_array(data_array)
            except (
                OSError,
                UnpicklingError,
                ValueError,
                EOFError,
                FileNotFoundError,
                TypeError,
                Exception,
            ) as e:
                print(f"Could not open {self._filename}: {e}")
                return False

        elif self._filename.name.lower().endswith(".mat"):
            try:
                self._data_array = convert_from_mat(self._filename)
            except Exception as e:
                print(f"Could not open {self._filename}: {e}")
                return False

        elif self._filename.name.lower().endswith(".pmx"):
            try:
                self._data_array = NativeArrayReader().read(self._filename)
                if self._data_array is None:
                    print(f"Could not open {self._filename}.")
                    return False
            except Exception as e:
                print(f"Could not open {self._filename}: {e}")
                return False

        else:
            print(f"Unexpected file {self._filename}.")
            return False

        # Store a logical array with the valid entries
        self._valid_entries = self._data_array["vld"]

        # Cache whether the data is 2D or 3D and whether is aggregated
        # The cases are different for localization vs. tracking experiments
        # num_locs = self._data_array["itr"].shape[1]
        self._is_3d = (
            float(np.nanmean(self._data_array["itr"][:, -1]["loc"][:, -1])) != 0.0
        )

        # Set all relevant indices
        self._set_all_indices()

        # Return success
        return True

    def _process(self) -> Union[None, pd.DataFrame]:
        """Returns processed dataframe for valid (or invalid) entries.

        Returns
        -------

        df: pd.DataFrame
            Processed data as DataFrame.
        """

        # Do we have a data array to work on?
        if self._data_array is None:
            return None

        if self._valid:
            indices = self._valid_entries
        else:
            indices = np.logical_not(self._valid_entries)

        # Extract the valid iterations
        itr = self._data_array["itr"][indices]

        # Extract the valid identifiers
        tid = self._data_array["tid"][indices]

        # Extract the valid time points
        tim = self._data_array["tim"][indices]

        # Extract the fluorophore IDs
        fluo = self._data_array["fluo"][indices]
        if np.all(fluo == 0):
            fluo = np.ones(fluo.shape, dtype=fluo.dtype)

        # The following extraction pattern will change whether the
        # acquisition is normal or aggregated
        if self.is_aggregated:
            # Extract the locations
            loc = itr["loc"].squeeze() * self._unit_scaling_factor
            loc[:, 2] = loc[:, 2] * self._z_scaling_factor

            # Extract EFO
            efo = itr["efo"]

            # Extract CFR
            cfr = itr["cfr"]

            # Extract ECO
            eco = itr["eco"]

            # Extract DCR
            dcr = itr["dcr"]

            # Dwell
            dwell = np.around((eco / (efo / 1000)) / self._dwell_time, decimals=0)

        else:
            # Extract the locations
            loc = itr[:, self._loc_index]["loc"] * self._unit_scaling_factor
            loc[:, 2] = loc[:, 2] * self._z_scaling_factor

            # Extract EFO
            efo = itr[:, self._efo_index]["efo"]

            # Extract CFR
            cfr = itr[:, self._cfr_index]["cfr"]

            # Extract ECO
            eco = itr[:, self._eco_index]["eco"]

            # Pool DCR values?
            if self._pool_dcr and np.sum(self._relocalizations) > 1:

                # Calculate ECO contributions
                eco_all = itr[:, self._relocalizations]["eco"]
                eco_sum = eco_all.sum(axis=1)
                eco_all_norm = eco_all / eco_sum.reshape(-1, 1)

                # Extract DCR values and weigh them by the relative ECO contributions
                dcr = itr[:, self._relocalizations]["dcr"]
                dcr = dcr * eco_all_norm
                dcr = dcr.sum(axis=1)

            else:

                # Extract DCR
                dcr = itr[:, self._dcr_index]["dcr"]

            # Calculate dwell
            dwell = np.around((eco / (efo / 1000)) / self._dwell_time, decimals=0)

        # Create a Pandas dataframe for the results
        df = pd.DataFrame(
            index=pd.RangeIndex(start=0, stop=len(tid)),
            columns=MinFluxReader.processed_properties(),
        )

        # Store the extracted valid hits into the dataframe
        df["tid"] = tid
        df["x"] = loc[:, 0]
        df["y"] = loc[:, 1]
        df["z"] = loc[:, 2]
        df["tim"] = tim
        df["efo"] = efo
        df["cfr"] = cfr
        df["eco"] = eco
        df["dcr"] = dcr
        df["dwell"] = dwell
        df["fluo"] = fluo

        # Remove rows with NaNs in the loc matrix
        df = df.dropna(subset=["x"])

        # Check if the selected indices correspond to the last valid iteration
        self._is_last_valid = bool(
            self._cfr_index == self._last_valid_cfr
            and self._efo_index == self._last_valid
        )

        return df

    def _raw_data_to_full_dataframe(self) -> Union[None, pd.DataFrame]:
        """Return raw data arranged into a dataframe."""
        if self._data_array is None:
            return None

        # Initialize output dataframe
        df = pd.DataFrame(columns=MinFluxReader.raw_properties())

        # Allocate space for the columns
        n_rows = len(self._data_array) * self._reps

        # Get all unique TIDs and their counts
        _, tid_counts = np.unique(self._data_array["tid"], return_counts=True)

        # Get all tids (repeated over the repetitions)
        tid = np.repeat(self._data_array["tid"], self._reps)

        # Create virtual IDs to mark the measurements of repeated tids
        # @TODO Optimize this!
        aid = np.zeros((n_rows, 1), dtype=np.int32)
        index = 0
        for c in np.nditer(tid_counts):
            tmp = np.repeat(np.arange(c), self._reps)
            n = len(tmp)
            aid[index : index + n, 0] = tmp
            index += n

        # Get all valid flags (repeated over the repetitions)
        vld = np.repeat(self._data_array["vld"], self._reps)

        # Get all timepoints (repeated over the repetitions)
        tim = np.repeat(self._data_array["tim"], self._reps)

        # Get all localizations (reshaped to drop the first dimension)
        loc = (
            self._data_array["itr"]["loc"].reshape((n_rows, 3))
            * self._unit_scaling_factor
        )
        loc[:, 2] = loc[:, 2] * self._z_scaling_factor

        # Get all efos (reshaped to drop the first dimension)
        efo = self._data_array["itr"]["efo"].reshape((n_rows, 1))

        # Get all cfrs (reshaped to drop the first dimension)
        cfr = self._data_array["itr"]["cfr"].reshape((n_rows, 1))

        # Get all ecos (reshaped to drop the first dimension)
        eco = self._data_array["itr"]["eco"].reshape((n_rows, 1))

        # Get all dcrs (reshaped to drop the first dimension)
        dcr = self._data_array["itr"]["dcr"].reshape((n_rows, 1))

        # Build the dataframe
        df["tid"] = tid.astype(np.int32)
        df["aid"] = aid.astype(np.int32)
        df["vld"] = vld
        df["tim"] = tim
        df["x"] = loc[:, 0]
        df["y"] = loc[:, 1]
        df["z"] = loc[:, 2]
        df["efo"] = efo
        df["cfr"] = cfr
        df["eco"] = eco
        df["dcr"] = dcr

        return df

    def _set_all_indices(self):
        """Set indices of properties to be read."""
        if self._data_array is None:
            return False

        # Number of iterations
        self._reps = self._data_array["itr"].shape[1]

        # Is this an aggregated acquisition?
        if self._reps == 1:
            self._is_aggregated = True
        else:
            self._is_aggregated = False

        # Query the data to find the last valid iteration
        # for all measurements
        last_valid = find_last_valid_iteration(self._data_array)

        # Set the extracted indices
        self._efo_index = last_valid["efo_index"]
        self._cfr_index = last_valid["cfr_index"]
        self._dcr_index = last_valid["dcr_index"]
        self._eco_index = last_valid["eco_index"]
        self._loc_index = last_valid["loc_index"]
        self._valid_cfr = last_valid["valid_cfr"]
        self._relocalizations = last_valid["reloc"]

        # Keep track of the last valid iteration
        self._last_valid = len(self._valid_cfr) - 1
        self._last_valid_cfr = last_valid["cfr_index"]

    def __repr__(self) -> str:
        """String representation of the object."""
        if self._data_array is None:
            return "No file loaded."

        str_valid = (
            "all valid"
            if len(self._data_array) == self.num_valid_entries
            else f"{self.num_valid_entries} valid and {self.num_invalid_entries} non valid"
        )

        str_acq = "3D" if self.is_3d else "2D"
        aggr_str = "aggregated" if self.is_aggregated else "normal"

        return (
            f"File: {self._filename.name}: "
            f"{str_acq} {aggr_str} acquisition with {len(self._data_array)} entries ({str_valid})."
        )

    def __str__(self) -> str:
        """Human-friendly representation of the object."""
        return self.__repr__()
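
A minimal usage sketch for MinFluxReader (the file name and the z_scaling_factor value are illustrative):

from pyminflux.reader import MinFluxReader

# Hypothetical file name; .pmx, .npy and .mat files are supported
reader = MinFluxReader("experiment.npy", valid=True, z_scaling_factor=0.7)
print(reader)                         # summary via __repr__()
df = reader.processed_dataframe       # columns from MinFluxReader.processed_properties()
print(df[["tid", "x", "y", "z", "efo", "cfr"]].head())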

Static methods

def processed_properties() ‑> list

Returns the properties read from the file that correspond to the processed dataframe column names.

Expand source code
@classmethod
def processed_properties(cls) -> list:
    """Returns the properties read from the file that correspond to the processed dataframe column names."""
    return [
        "tid",
        "tim",
        "x",
        "y",
        "z",
        "efo",
        "cfr",
        "eco",
        "dcr",
        "dwell",
        "fluo",
    ]
def raw_properties() ‑> list

Returns the properties, read from the file or dynamically generated, that correspond to the raw dataframe column names.

Expand source code
@classmethod
def raw_properties(cls) -> list:
    """Returns the properties read from the file and dynamic that correspond to the raw dataframe column names."""
    return ["tid", "aid", "vld", "tim", "x", "y", "z", "efo", "cfr", "eco", "dcr"]

Instance variables

var dwell_time : float

Returns the dwell time.

Expand source code
@property
def dwell_time(self) -> float:
    """Returns the dwell time."""
    return self._dwell_time
var filename : Optional[pathlib.Path]

Return the filename if set.

Expand source code
@property
def filename(self) -> Union[Path, None]:
    """Return the filename if set."""
    if self._filename is None:
        return None
    return Path(self._filename)
var is_3d : bool

Returns True if the acquisition is 3D, False otherwise.

Expand source code
@property
def is_3d(self) -> bool:
    """Returns True is the acquisition is 3D, False otherwise."""
    return self._is_3d
var is_aggregated : bool

Returns True if the acquisition is aggregated, False otherwise.

Expand source code
@property
def is_aggregated(self) -> bool:
    """Returns True is the acquisition is aggregated, False otherwise."""
    return self._is_aggregated
var is_last_valid : Optional[bool]

Return True if the selected iteration is the "last valid", False otherwise. If the dataframe has not been processed yet, is_last_valid will be None.

Expand source code
@property
def is_last_valid(self) -> Union[bool, None]:
    """Return True if the selected iteration is the "last valid", False otherwise.
    If the dataframe has not been processed yet, `is_last_valid` will be None."""
    if self._data_df is None:
        return None
    return self._is_last_valid
var is_pool_dcr : bool

Returns True if the DCR values are pooled over all relocalized iterations (to use all photons), False otherwise.

Expand source code
@property
def is_pool_dcr(self) -> bool:
    """Returns True if the DCR values over all relocalized iterations (to use all photons)."""
    return self._pool_dcr
var is_tracking : bool

Returns True for a tracking acquisition, False otherwise.

Expand source code
@property
def is_tracking(self) -> bool:
    """Returns True for a tracking acquisition, False otherwise."""
    return self._is_tracking
var num_invalid_entries : int

Number of invalid entries.

Expand source code
@property
def num_invalid_entries(self) -> int:
    """Number of valid entries."""
    if self._data_array is None:
        return 0
    return np.logical_not(self._valid_entries).sum()
var num_valid_entries : int

Number of valid entries.

Expand source code
@property
def num_valid_entries(self) -> int:
    """Number of valid entries."""
    if self._data_array is None:
        return 0
    return self._valid_entries.sum()
var processed_dataframe : Optional[pandas.core.frame.DataFrame]

Return the processed data as a dataframe (some properties only).

Expand source code
@property
def processed_dataframe(self) -> Union[None, pd.DataFrame]:
    """Return the raw data as dataframe (some properties only)."""
    if self._data_df is not None:
        return self._data_df

    self._data_df = self._process()
    return self._data_df
var raw_data_dataframe : Optional[pandas.core.frame.DataFrame]

Return the raw data as a dataframe (some properties only).

Expand source code
@property
def raw_data_dataframe(self) -> Union[None, pd.DataFrame]:
    """Return the raw data as dataframe (some properties only)."""
    if self._data_full_df is not None:
        return self._data_full_df
    self._data_full_df = self._raw_data_to_full_dataframe()
    return self._data_full_df
var relocalizations : list

Return the iterations with relocalizations.

Returns

reloc: boolean array with True for the iteration indices that are relocalized.

Expand source code
@property
def relocalizations(self) -> list:
    """Return the iterations with relocalizations.

    Returns
    -------
    reloc: boolean array with True for the iteration indices that are relocalized.
    """
    if self._data_array is None:
        return []
    return self._relocalizations
var valid_cfr : list

Return the iterations with valid CFR measurements.

Returns

cfr : boolean array with True for the iteration indices
that have a valid measurement.
Expand source code
@property
def valid_cfr(self) -> list:
    """Return the iterations with valid CFR measurements.

    Returns
    -------
    cfr: boolean array with True for the iteration indices
         that have a valid measurement.
    """
    if self._data_array is None:
        return []
    return self._valid_cfr
var valid_raw_data : Optional[numpy.ndarray]

Return the raw data.

Expand source code
@property
def valid_raw_data(self) -> Union[None, np.ndarray]:
    """Return the raw data."""
    if self._data_array is None:
        return None
    return self._data_array[self._valid_entries].copy()
var z_scaling_factor : float

Returns the scaling factor for the z coordinates.

Expand source code
@property
def z_scaling_factor(self) -> float:
    """Returns the scaling factor for the z coordinates."""
    return self._z_scaling_factor

Methods

def set_dwell_time(self, dwell_time: float, process: bool = True)

Sets the dwell time.

Parameters

dwell_time : float
Dwell time.
process : bool (Optional, default = True)
By default, when setting the dwell time, the data is rescanned and the dataframe is rebuilt. In case several properties of the MinFluxReader are modified sequentially, the processing can be disabled and run only once after the last change. However, this only applies after the first load/scan, when the processed dataframe has not been created yet. If the dataframe already exists, this flag will be ignored and the processing will take place.
Expand source code
def set_dwell_time(self, dwell_time: float, process: bool = True):
    """
    Sets the dwell time.

    Parameters
    ----------
    dwell_time: float
        Dwell time.

    process: bool (Optional, default = True)
        By default, when setting the dwell time, the data is rescanned
        and the dataframe is rebuilt. In case several properties of
        the MinFluxReader are modified sequentially, the processing
        can be disabled and run only once after the last change.
        However, this only applies after the first load/scan, when
        the processed dataframe has not been created yet. If the
        dataframe already exists, this flag will be ignored and the
        processing will take place.
    """

    # Update the flag
    self._dwell_time = dwell_time

    # Re-process the file?
    if process or self._data_df is not None:
        self._process()
def set_indices(self, index, cfr_index, process: bool = True)

Set the parameter indices.

We distinguish between the index of all parameters that are always measured and are accessed from the same iteration, and the cfr index, that is not always measured.

Parameters

index : int
Global iteration index for all parameters but cfr
cfr_index : int
Iteration index for cfr
process : bool (Optional, default = True)
By default, when setting the indices, the data is rescanned and the dataframe is rebuilt. In case several properties of the MinFluxReader are modified sequentially, the processing can be disabled and run only once after the last change. However, this only applies after the first load/scan, when the processed dataframe has not been created yet. If the dataframe already exists, this flag will be ignored and the processing will take place.
Expand source code
def set_indices(self, index, cfr_index, process: bool = True):
    """Set the parameter indices.

    We distinguish between the index of all parameters
    that are always measured and are accessed from the
    same iteration, and the cfr index, that is not
    always measured.

    Parameters
    ----------

    index: int
        Global iteration index for all parameters but cfr

    cfr_index: int
        Iteration index for cfr

    process: bool (Optional, default = True)
        By default, when setting the indices, the data is rescanned
        and the dataframe is rebuilt. In case several properties of
        the MinFluxReader are modified sequentially, the processing
        can be disabled and run only once after the last change.
        However, this only applies after the first load/scan, when
        the processed dataframe has not been created yet. If the
        dataframe already exists, this flag will be ignored and the
        processing will take place.
    """

    # Make sure there is loaded data
    if self._data_array is None:
        raise ValueError("No data loaded.")

    if self._reps == -1:
        raise ValueError("No data loaded.")

    if len(self._valid_cfr) == 0:
        raise ValueError("No data loaded.")

    # Check that the arguments are compatible with the loaded data
    if index < 0 or index > self._reps - 1:
        raise ValueError(
            f"The value of index must be between 0 and {self._reps - 1}."
        )

    if cfr_index < 0 or cfr_index > len(self._valid_cfr) - 1:
        raise ValueError(
            f"The value of index must be between 0 and {len(self._valid_cfr) - 1}."
        )

    # Now set the general values
    self._efo_index = index
    self._dcr_index = index
    self._eco_index = index
    self._loc_index = index

    # Set the cfr index
    self._cfr_index = cfr_index

    # Constant indices
    self._tid_index: int = 0
    self._tim_index: int = 0
    self._vld_index: int = 0

    # Re-process the file? If the processed dataframe already exists,
    # the processing will take place anyway.
    if process or self._data_df is not None:
        self._process()
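
A minimal sketch of choosing custom iteration indices (the file name is illustrative; it assumes one entry per iteration in the valid_cfr list and at least one iteration flagged as having a valid CFR measurement):

from pyminflux.reader import MinFluxReader

reader = MinFluxReader("experiment.npy")   # hypothetical file name
# Pick the last iteration for the general parameters and the last iteration
# for which a CFR measurement is flagged as valid
index = len(reader.valid_cfr) - 1
cfr_index = max(i for i, ok in enumerate(reader.valid_cfr) if ok)
reader.set_indices(index=index, cfr_index=cfr_index)
df = reader.processed_dataframe
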
def set_pool_dcr(self, pool_dcr: bool, process: bool = True)

Sets whether the DCR values should be pooled (and weighted by ECO).

Parameters

pool_dcr : bool
Whether the DCR values should be pooled (and weighted by ECO).
process : bool (Optional, default = True)
By default, when setting the DCR binning flag, the data is rescanned and the dataframe is rebuilt. In case several properties of the MinFluxReader are modified sequentially, the processing can be disabled and run only once after the last change. However, this only applies after the first load/scan, when the processed dataframe has not been created yet. If the dataframe already exists, this flag will be ignored and the processing will take place.
Expand source code
def set_pool_dcr(self, pool_dcr: bool, process: bool = True):
    """
    Sets whether the DCR values should be pooled (and weighted by ECO).

    Parameters
    ----------
    pool_dcr: bool
        Whether the DCR values should be pooled (and weighted by ECO).

    process: bool (Optional, default = True)
        By default, when setting the DCR binning flag, the data is rescanned
        and the dataframe is rebuilt. In case several properties of
        the MinFluxReader are modified sequentially, the processing
        can be disabled and run only once after the last change.
        However, this only applies after the first load/scan, when
        the processed dataframe has not been created yet. If the
        dataframe already exists, this flag will be ignored and the
        processing will take place.
    """

    # Update the flag
    self._pool_dcr = pool_dcr

    # Re-process the file?
    if process or self._data_df is not None:
        self._process()
def set_tracking(self, is_tracking: bool, process: bool = True)

Sets whether the acquisition is tracking or localization.

Parameters

is_tracking : bool
Set to True for a tracking acquisition, False for a localization acquisition.
process : bool (Optional, default = True)
By default, when setting the tracking flag, the data is rescanned and the dataframe is rebuilt. In case several properties of the MinFluxReader are modified sequentially, the processing can be disabled and run only once after the last change. However, this only applies after the first load/scan, when the processed dataframe has not been created yet. If the dataframe already exists, this flag will be ignored and the processing will take place.
Expand source code
def set_tracking(self, is_tracking: bool, process: bool = True):
    """Sets whether the acquisition is tracking or localization.

    Parameters
    ----------

    is_tracking: bool
        Set to True for a tracking acquisition, False for a localization
        acquisition.

    process: bool (Optional, default = True)
        By default, when setting the tracking flag, the data is rescanned
        and the dataframe is rebuilt. In case several properties of
        the MinFluxReader are modified sequentially, the processing
        can be disabled and run only once after the last change.
        However, this only applies after the first load/scan, when
        the processed dataframe has not been created yet. If the
        dataframe already exists, this flag will be ignored and the
        processing will take place.
    """

    # Update the flag
    self._is_tracking = is_tracking

    # Re-process the file?
    if process or self._data_df is not None:
        self._process()
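
A sketch of batching several changes before the first processing run, as described by the process flag in the setters above (the file name and the parameter values are illustrative):

from pyminflux.reader import MinFluxReader

reader = MinFluxReader("experiment.npy")   # hypothetical file name
# Before the processed dataframe has been built for the first time, the
# process=False flag postpones the rescan until the last change.
reader.set_tracking(True, process=False)
reader.set_dwell_time(0.2, process=False)
reader.set_pool_dcr(True, process=True)    # process once after the last change
df = reader.processed_dataframe
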
class NativeArrayReader

Reads the native NumPy array from .pmx files.

Expand source code
class NativeArrayReader:
    """Reads the native NumPy array from `.pmx` files."""

    @staticmethod
    def read(filename: Union[Path, str]):
        """Constructor.

        Parameters
        ----------

        filename: Union[Path, str]
            Full path to the `.pmx` file to scan.
        """

        # Open the file and read the data
        with h5py.File(filename, "r") as f:

            # Read the file_version attribute
            file_version = f.attrs["file_version"]

            if file_version != "1.0" and file_version != "2.0":
                return None

            # We only read the raw NumPy array
            data_array = f["raw/npy"][:]

        return data_array

Static methods

def read(filename: Union[pathlib.Path, str])

Read the NumPy array from the .pmx file.

Parameters

filename : Union[Path, str]
Full path to the .pmx file to scan.
Expand source code
@staticmethod
def read(filename: Union[Path, str]):
    """Constructor.

    Parameters
    ----------

    filename: Union[Path, str]
        Full path to the `.pmx` file to scan.
    """

    # Open the file and read the data
    with h5py.File(filename, "r") as f:

        # Read the file_version attribute
        file_version = f.attrs["file_version"]

        if file_version != "1.0" and file_version != "2.0":
            return None

        # We only read the raw NumPy array
        data_array = f["raw/npy"][:]

    return data_array
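
A minimal sketch of reading the raw array from a .pmx file (the file name is illustrative):

from pyminflux.reader import NativeArrayReader

data_array = NativeArrayReader().read("experiment.pmx")   # hypothetical file name
if data_array is not None:
    print(data_array.dtype.names)   # structured-array field names, e.g. "tid", "vld", "itr"
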
class NativeDataFrameReader

Reads the Pandas DataFrame from .pmx files.

Expand source code
class NativeDataFrameReader:
    """Reads the Pandas DataFrame from `.pmx` files."""

    @staticmethod
    def read(filename: Union[Path, str]):
        """Constructor.

        Parameters
        ----------

        filename: Union[Path, str]
            Full path to the `.pmx` file to scan.
        """

        with h5py.File(filename, "r") as f:

            # Read the file_version attribute
            file_version = f.attrs["file_version"]

            if file_version != "1.0" and file_version != "2.0":
                return None

            # Read dataset
            dataset = f["/paraview/dataframe"]

            # Read the NumPy data
            data_array = dataset[:]

            # Read column names
            column_names = dataset.attrs["column_names"]

            # Read column data types
            column_types = dataset.attrs["column_types"]

            # Read the index
            index_data = f["/paraview/dataframe_index"][:]

            # Create DataFrame with specified columns
            df = pd.DataFrame(data_array, index=index_data, columns=column_names)

            # Apply column data types
            for col, dtype in zip(column_names, column_types):
                df[col] = df[col].astype(dtype)

        return df

Static methods

def read(filename: Union[pathlib.Path, str])

Read the Pandas DataFrame from the .pmx file.

Parameters

filename : Union[Path, str]
Full path to the .pmx file to scan.
Expand source code
@staticmethod
def read(filename: Union[Path, str]):
    """Constructor.

    Parameters
    ----------

    filename: Union[Path, str]
        Full path to the `.pmx` file to scan.
    """

    with h5py.File(filename, "r") as f:

        # Read the file_version attribute
        file_version = f.attrs["file_version"]

        if file_version != "1.0" and file_version != "2.0":
            return None

        # Read dataset
        dataset = f["/paraview/dataframe"]

        # Read the NumPy data
        data_array = dataset[:]

        # Read column names
        column_names = dataset.attrs["column_names"]

        # Read column data types
        column_types = dataset.attrs["column_types"]

        # Read the index
        index_data = f["/paraview/dataframe_index"][:]

        # Create DataFrame with specified columns
        df = pd.DataFrame(data_array, index=index_data, columns=column_names)

        # Apply column data types
        for col, dtype in zip(column_names, column_types):
            df[col] = df[col].astype(dtype)

    return df
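
A minimal sketch of reading the stored dataframe from a .pmx file (the file name is illustrative):

from pyminflux.reader import NativeDataFrameReader

df = NativeDataFrameReader().read("experiment.pmx")   # hypothetical file name
if df is not None:
    print(df.columns.tolist())
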
class NativeMetadataReader

Reads metadata information from .pmx files.

Expand source code
class NativeMetadataReader:
    """Reads metadata information from `.pmx` files."""

    @staticmethod
    def scan(filename: Union[Path, str]):
        """Constructor.

        Parameters
        ----------

        filename: Union[Path, str]
            Full path to the `.pmx` file to scan.
        """

        # Open the file
        with h5py.File(filename, "r") as f:

            # Read the file_version attribute
            file_version = f.attrs["file_version"]

            if file_version != "1.0" and file_version != "2.0":
                return None

            # Version 1 parameters
            try:
                z_scaling_factor = float(f["parameters/z_scaling_factor"][()])
            except KeyError:
                return None

            try:
                min_trace_length = int(f["parameters/min_trace_length"][()])
            except KeyError:
                return None

            try:
                efo_thresholds = tuple(f["parameters/applied_efo_thresholds"][:])
            except KeyError:
                efo_thresholds = None
            try:
                cfr_thresholds = tuple(f["parameters/applied_cfr_thresholds"][:])
            except KeyError:
                cfr_thresholds = None

            try:
                num_fluorophores = int(f["parameters/num_fluorophores"][()])
            except KeyError:
                return None

            # Version 2.0 parameters
            if file_version == "2.0":

                try:
                    # This setting can be missing
                    tr_len_thresholds = tuple(
                        f["parameters/applied_tr_len_thresholds"][:]
                    )
                except KeyError:
                    tr_len_thresholds = None

                try:
                    dwell_time = float(f["parameters/dwell_time"][()])
                except KeyError:
                    return None

                try:
                    # This setting can be missing
                    time_thresholds = tuple(f["parameters/applied_time_thresholds"][:])
                except KeyError:
                    time_thresholds = None

                # HDF5 does not have a native boolean type, so we save as int8 and convert it
                # back to boolean on read.
                try:
                    is_tracking = bool(f["parameters/is_tracking"][()])
                except KeyError:
                    return None

                try:
                    pool_dcr = bool(f["parameters/pool_dcr"][()])
                except KeyError:
                    # This is an addendum to version 2.0, and we allow it to be missing.
                    # It will fall back to False.
                    pool_dcr = False

                try:
                    scale_bar_size = float(f["parameters/scale_bar_size"][()])
                except KeyError:
                    return None

            else:
                tr_len_thresholds = None
                time_thresholds = None
                dwell_time = 1.0
                is_tracking = False
                pool_dcr = False
                scale_bar_size = 500

        # Assemble the metadata object and return it
        metadata = NativeMetadata(
            pool_dcr=pool_dcr,
            cfr_thresholds=cfr_thresholds,
            dwell_time=dwell_time,
            efo_thresholds=efo_thresholds,
            is_tracking=is_tracking,
            min_trace_length=min_trace_length,
            num_fluorophores=num_fluorophores,
            scale_bar_size=scale_bar_size,
            time_thresholds=time_thresholds,
            tr_len_thresholds=tr_len_thresholds,
            z_scaling_factor=z_scaling_factor,
        )

        return metadata
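
For reference, a sketch based only on the keys accessed above (the file path is hypothetical): the optional threshold parameters can also be probed directly with h5py; the reader above treats missing keys as filters that were never applied.

import h5py

# Hypothetical .pmx file path
with h5py.File("acquisition.pmx", "r") as f:
    print(f.attrs["file_version"])
    for key in (
        "parameters/applied_efo_thresholds",
        "parameters/applied_cfr_thresholds",
        "parameters/applied_tr_len_thresholds",
        "parameters/applied_time_thresholds",
    ):
        print(key, f[key][:] if key in f else "not present")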

Static methods

def scan(filename: Union[pathlib.Path, str])

Scan and return the metadata from a .pmx file.

Parameters

filename : Union[Path, str]
Full path to the .pmx file to scan.
Expand source code
@staticmethod
def scan(filename: Union[Path, str]):
    """Constructor.

    Parameters
    ----------

    filename: Union[Path, str]
        Full path to the `.pmx` file to scan.
    """

    # Open the file
    with h5py.File(filename, "r") as f:

        # Read the file_version attribute
        file_version = f.attrs["file_version"]

        if file_version != "1.0" and file_version != "2.0":
            return None

        # Version 1 parameters
        try:
            z_scaling_factor = float(f["parameters/z_scaling_factor"][()])
        except KeyError:
            return None

        try:
            min_trace_length = int(f["parameters/min_trace_length"][()])
        except KeyError:
            return None

        try:
            efo_thresholds = tuple(f["parameters/applied_efo_thresholds"][:])
        except KeyError:
            efo_thresholds = None
        try:
            cfr_thresholds = tuple(f["parameters/applied_cfr_thresholds"][:])
        except KeyError:
            cfr_thresholds = None

        try:
            num_fluorophores = int(f["parameters/num_fluorophores"][()])
        except KeyError:
            return None

        # Version 2.0 parameters
        if file_version == "2.0":

            try:
                # This setting can be missing
                tr_len_thresholds = tuple(
                    f["parameters/applied_tr_len_thresholds"][:]
                )
            except KeyError:
                tr_len_thresholds = None

            try:
                dwell_time = float(f["parameters/dwell_time"][()])
            except KeyError:
                return None

            try:
                # This setting can be missing
                time_thresholds = tuple(f["parameters/applied_time_thresholds"][:])
            except KeyError:
                time_thresholds = None

            # HDF5 does not have a native boolean type, so we save as int8 and convert it
            # back to boolean on read.
            try:
                is_tracking = bool(f["parameters/is_tracking"][()])
            except KeyError:
                return None

            try:
                pool_dcr = bool(f["parameters/pool_dcr"][()])
            except KeyError:
                # This is an addendum to version 2.0, and we allow it to be missing.
                # It will fall back to False.
                pool_dcr = False

            try:
                scale_bar_size = float(f["parameters/scale_bar_size"][()])
            except KeyError:
                return None

        else:
            tr_len_thresholds = None
            time_thresholds = None
            dwell_time = 1.0
            is_tracking = False
            pool_dcr = False
            scale_bar_size = 500

    # Assemble the metadata object and return it
    metadata = NativeMetadata(
        pool_dcr=pool_dcr,
        cfr_thresholds=cfr_thresholds,
        dwell_time=dwell_time,
        efo_thresholds=efo_thresholds,
        is_tracking=is_tracking,
        min_trace_length=min_trace_length,
        num_fluorophores=num_fluorophores,
        scale_bar_size=scale_bar_size,
        time_thresholds=time_thresholds,
        tr_len_thresholds=tr_len_thresholds,
        z_scaling_factor=z_scaling_factor,
    )

    return metadata
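
A minimal usage sketch (the file path is hypothetical, and the returned object is assumed to expose the NativeMetadata fields shown above as attributes): NativeMetadataReader.scan() returns the stored analysis parameters, or None if the file version is unsupported or a required parameter is missing.

from pyminflux.reader import NativeMetadataReader

# Hypothetical .pmx file path
metadata = NativeMetadataReader.scan("acquisition.pmx")

if metadata is None:
    print("Unsupported .pmx version or missing required parameters.")
else:
    print(metadata.num_fluorophores)
    print(metadata.z_scaling_factor)
    print(metadata.efo_thresholds)  # None if no EFO thresholds were applied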