Module pyminflux.reader
Reader of MINFLUX data.
Expand source code
# Copyright (c) 2022 - 2024 D-BSSE, ETH Zurich.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
__doc__ = "Reader of MINFLUX data."
__all__ = [
"NativeMetadataReader",
"NativeArrayReader",
"NativeDataFrameReader",
"MinFluxReader",
"MSRReader",
]
from ._msr_reader import MSRReader
from ._native_reader import (
NativeArrayReader,
NativeDataFrameReader,
NativeMetadataReader,
)
from ._reader import MinFluxReader
Sub-modules
pyminflux.reader.metadata
pyminflux.reader.util
Classes
class MSRReader (filename: Union[pathlib.Path, str])
-
Reads data and metadata information from .MSR (OBF format) files.
For documentation, see: https://imspectordocs.readthedocs.io/en/latest/fileformat.html#the-obf-file-format
Note: binary data is stored in little-endian order.
Constructor.
Parameters
filename
:Union[Path, str]
- Full path to the file to open.
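A minimal usage sketch, assuming a local MSR file (the path sample.msr is a placeholder): construct the reader, call scan(), then inspect the stack metadata.
from pyminflux.reader import MSRReader

reader = MSRReader("sample.msr")  # placeholder path

# scan() parses the file header and all stack headers; it returns False on failure
if reader.scan():
    print(f"Found {reader.num_stacks} stacks.")

    # The reader is iterable (and indexable) and yields OBFStackMetadata objects
    for stack_metadata in reader:
        print(stack_metadata.stack_name, stack_metadata.num_pixels)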
Expand source code
class MSRReader: """Reads data and metadata information from `.MSR` (OBF format) files. For documentation, see: https://imspectordocs.readthedocs.io/en/latest/fileformat.html#the-obf-file-format Note: binary data is stored in little-endian order. """ def __init__(self, filename: Union[Path, str]): """Constructor. Parameters ---------- filename: Union[Path, str] Full path to the file name to open. """ # Store the filename self.filename = Path(filename) # File header self.obf_file_header = OBFFileHeader() # Metadata self.obf_file_metadata = OBFFileMetadata() # List of stack metadata objects self._obf_stacks_list: list[OBFStackMetadata] = [] def scan(self) -> bool: """Scan the metadata of the file. Returns ------- success: bool True if the file was scanned successfully, False otherwise. """ # Open the file with open(self.filename, mode="rb") as f: if not self._read_obf_header(f): return False # Scan metadata self.obf_file_metadata = self._scan_metadata(f, self.obf_file_header) # Get the first stack position next_stack_pos = self.obf_file_header.first_stack_pos while next_stack_pos != 0: # Scan the next stack success, obs_stack_metadata = self._read_obf_stack(f, next_stack_pos) if not success: return False # Append current stack header self._obf_stacks_list.append(obs_stack_metadata) # Do we have a next header to parse? next_stack_pos = obs_stack_metadata.next_stack_pos return True def __getitem__(self, stack_index: int) -> Union[OBFStackMetadata, None]: """Allows accessing the reader with the `[]` notation to get the next stack metadata. Parameters ---------- stack_index: int Index of the stack to be retrieved. Returns ------- metadata: Union[OBFStackMetadata, None] Metadata for the requested stack, or None if no file was loaded. """ # Is anything loaded? if len(self._obf_stacks_list) == 0: return None if stack_index < 0 or stack_index > (len(self._obf_stacks_list) - 1): raise ValueError(f"Index value {stack_index} is out of bounds.") # Get and return the metadata metadata = self._obf_stacks_list[stack_index] return metadata def __iter__(self): """Return the iterator. Returns ------- iterator """ self._current_index = 0 return self def __next__(self): if self._current_index < len(self._obf_stacks_list): metadata = self.__getitem__(self._current_index) self._current_index += 1 return metadata else: raise StopIteration @property def num_stacks(self): """Return the number of stacks contained in the file.""" return len(self._obf_stacks_list) def get_data_physical_sizes( self, stack_index: int, scaled: bool = True ) -> Union[list, None]: """Returns the (scaled) data physical size for the requested stack. Parameters ---------- stack_index: int Index of the stack for which to read the data. scaled: bool If scaled is True, the physical sizes will be scaled by the corresponding scale factors as reported by MSRReader.get_data_units(). Returns ------- offsets: Union[list, None] Physical sizes for 2D images, None otherwise. """ if stack_index < 0 or stack_index > len(self._obf_stacks_list): raise ValueError(f"stack_index={stack_index} is out of bounds.") # Get the metadata for the requested stack obf_stack_metadata = self._obf_stacks_list[stack_index] if obf_stack_metadata is None: return None # Get the physical lengths phys_lengths = obf_stack_metadata.physical_lengths[: obf_stack_metadata.rank] # Do we need to scale? 
if scaled: _, factors = self.get_data_units(stack_index=stack_index) for i, factor in enumerate(factors): if factor != 1.0: phys_lengths[i] *= factor # Return the physical lengths as list return phys_lengths def get_data_offsets( self, stack_index: int, scaled: bool = True ) -> Union[list, None]: """Returns the (scaled) data offsets for the requested stack. Parameters ---------- stack_index: int Index of the stack for which to read the data. scaled: bool If scaled is True, the offsets will be scaled by the corresponding scale factors as reported by MSRReader.get_data_units(). Returns ------- offsets: Union[list, None] Offsets for 2D images, None otherwise. """ if stack_index < 0 or stack_index > len(self._obf_stacks_list): raise ValueError(f"stack_index={stack_index} is out of bounds.") # Get the metadata for the requested stack obf_stack_metadata = self._obf_stacks_list[stack_index] if obf_stack_metadata is None: return None # Get the offsets offsets = obf_stack_metadata.physical_offsets[: obf_stack_metadata.rank] # Do we need to scale? if scaled: _, factors = self.get_data_units(stack_index=stack_index) for i, factor in enumerate(factors): if factor != 1.0: offsets[i] *= factor return offsets def get_data_pixel_sizes( self, stack_index: int, scaled: bool = True ) -> Union[list, None]: """Returns the (scaled) data pixel size for the requested stack. Parameters ---------- stack_index: int Index of the stack for which to read the data. scaled: bool If scaled is True, the pixel sizes will be scaled by the corresponding scale factors as reported by MSRReader.get_data_units(). Returns ------- offsets: Union[list, None] Pixel sizes for 2D images, None otherwise. """ if stack_index < 0 or stack_index > len(self._obf_stacks_list): raise ValueError(f"stack_index={stack_index} is out of bounds.") # Get the metadata for the requested stack obf_stack_metadata = self._obf_stacks_list[stack_index] if obf_stack_metadata is None: return None # Get the physical sizes phys_lengths = self.get_data_physical_sizes( stack_index=stack_index, scaled=scaled ) # Get the number of pixels along each dimension num_pixels = obf_stack_metadata.num_pixels[: obf_stack_metadata.rank] # Now divide by the image size pixel_sizes = np.array(phys_lengths) / np.array(num_pixels) # Return the pixel size as list return pixel_sizes.tolist() def get_data_units(self, stack_index: int) -> Union[tuple[list, list], None]: """Returns the data units and scale factors per dimension for requested stack. Units are one of: "m": meters "kg": kilograms "s": s "A": Amperes "K": Kelvin "mol": moles "cd": candela "r": radian "sr": sr Parameters ---------- stack_index: int Index of the stack for which to read the data. Returns ------- unit: Union[tuple[list, list], None] List of units and list of scale factors, or None if no file was opened. 
""" if stack_index < 0 or stack_index > len(self._obf_stacks_list): raise ValueError(f"stack_index={stack_index} is out of bounds.") # Get the metadata for the requested stack obf_stack_metadata = self._obf_stacks_list[stack_index] if obf_stack_metadata is None: return None units = [] scale_factors = [] for dim in range(obf_stack_metadata.rank): dimensions = obf_stack_metadata.si_dimensions[dim] scale_factors.append(dimensions.scale_factor) for i, exponent in enumerate(dimensions.exponents): if i == 0 and exponent.numerator > 0: units.append("m") break elif i == 1 and exponent.numerator > 0: units.append("kg") break elif i == 2 and exponent.numerator > 0: units.append("s") break elif i == 3 and exponent.numerator > 0: units.append("A") break elif i == 4 and exponent.numerator > 0: units.append("K") break elif i == 5 and exponent.numerator > 0: units.append("mol") break elif i == 6 and exponent.numerator > 0: units.append("cd") break elif i == 7 and exponent.numerator > 0: units.append("r") break elif i == 8 and exponent.numerator > 0: units.append("sr") break else: units.append("") break # Return the extracted units and scale factors return units, scale_factors def get_data(self, stack_index: int) -> Union[np.ndarray, None]: """Read the data for requested stack: only images are returned. Parameters ---------- stack_index: int Index of the stack for which to read the data. Returns ------- frame: Union[np.ndarray, None] Data as a 2D NumPy array. None if it could not be read or if it was not a 2D image. """ if stack_index < 0 or stack_index > len(self._obf_stacks_list): raise ValueError(f"stack_index={stack_index} is out of bounds.") # Get the metadata for the requested stack obf_stack_metadata = self._obf_stacks_list[stack_index] # Currently, we only support format 6 and newer if obf_stack_metadata.format_version < 6: print("Reading data is supported only for stack format 6 and newer.") return None # If there are chunks, we currently do not read if obf_stack_metadata.num_chunk_positions > 0: print("Reading chunked data is currently not supported.") return None # We currently only read 2D images if self._get_num_dims(obf_stack_metadata.num_pixels) != 2: print("Only 2D images are currently supported.") return None # Get NumPy data type np_data_type, _ = self._get_numpy_data_type( obf_stack_metadata.data_type_on_disk ) if np_data_type is None: print("Unsupported data type.") return None # Extract some info height = obf_stack_metadata.num_pixels[1] width = obf_stack_metadata.num_pixels[0] bytes_per_sample = obf_stack_metadata.bytes_per_sample # Expected number of (decompressed) samples expected_num_samples = width * height # Number of written bytes written_bytes = obf_stack_metadata.samples_written * bytes_per_sample # Open the file with open(self.filename, mode="rb") as f: # Seek to the beginning of the data f.seek(obf_stack_metadata.data_start_position) # Is there compression? 
if obf_stack_metadata.compression_type != 0: # Read the bytes compressed_data = f.read(written_bytes) # Decompress them decompressed_data = zlib.decompress(compressed_data) # Cast to a "byte" NumPy array raw_frame = np.frombuffer(decompressed_data, dtype=np.uint8) else: # Read the bytes raw_data = f.read(written_bytes) # Cast to a "byte" NumPy array raw_frame = np.frombuffer(raw_data, dtype=np.uint8) # Reinterpret as final data type format (little Endian) frame = raw_frame.view(np.dtype(np_data_type)) # Make sure the final frame size matches the expected size if len(frame) != expected_num_samples: print("Unexpected length of data retrieved!") return None # Reshape frame = frame.reshape((height, width)) return frame def get_ome_xml_metadata(self) -> Union[str, None]: """Return the OME XML metadata. Returns ------- ome_xml_metadata: Union[str, None] OME XML metadata as formatted string. If no file was loaded, returns None. """ # Get the ome-xml tree root = self.obf_file_metadata.tree if root is None: return None # Return metadata as formatted XML string return self._tree_to_formatted_xml(root) def export_ome_xml_metadata(self, file_name: Union[str, Path]): """Export the OME-XML metadata to file. Parameters ---------- file_name: Union[str, Path] Output file name. """ # Get the ome-xml tree, optionally as formatted string metadata = self.get_ome_xml_metadata() if metadata is None: print("Nothing to export.") return # Make sure the parent path to the file exists Path(file_name).parent.mkdir(parents=True, exist_ok=True) # Save to file with open(file_name, "w", encoding="utf-8") as f: f.write(metadata) def get_tag_dictionary(self, stack_index: int) -> Union[dict, None]: """Return the tag dictionary for the requested stack. Parameters ---------- stack_index: int Index of the stack for which to return the tag dictionary. Returns ------- tag_dictionary: Union[dict, None] Dictionary. If no file was loaded, returns None. """ if stack_index < 0 or stack_index > len(self._obf_stacks_list): raise ValueError(f"Stack number {stack_index} is out of range.") # Get stack metadata obf_stack_metadata = self._obf_stacks_list[stack_index] if obf_stack_metadata is None: return None # Get the tag dictionary tag = obf_stack_metadata.tag_dictionary # Return the tag dictionary return tag def export_tag_dictionary(self, stack_index: int, file_name: Union[str, Path]): """Export the tag dictionary to file. Parameters ---------- stack_index: int Index of the stack for which to export the tag dictionary. file_name: Union[str, Path] Output file name. 
""" if stack_index < 0 or stack_index > len(self._obf_stacks_list): raise ValueError(f"Stack number {stack_index} is out of range.") # Get tag dictionary tag_dictionary = self.get_tag_dictionary(stack_index) if tag_dictionary is None: return None # Make sure file_name is of type Path file_name = Path(file_name) # Make sure the parent path to the file exists file_name.parent.mkdir(parents=True, exist_ok=True) # Export the dictionaries for key, value in tag_dictionary.items(): if type(value) is ET.Element: mod_file_name = file_name.parent / f"{file_name.stem}_{key}.xml" xml_str = self._tree_to_formatted_xml(value) with open(mod_file_name, "w") as f: f.write(xml_str) elif type(value) is dict: mod_file_name = file_name.parent / f"{file_name.stem}_{key}.json" with open(mod_file_name, "w") as f: json.dump(value, f, indent=4) else: mod_file_name = file_name.parent / f"{file_name.stem}_{key}.txt" with open(mod_file_name, "w") as f: f.write(value) @staticmethod def _tree_to_formatted_xml(root: ET, xml_declaration: bool = True) -> str: """Converts an xml. tree to formatted xml. Parameters ---------- root: xml.etree.ElementTree Root element of the xml tree. xml_declaration: bool Whether to prepend the xml declaration to the converted xml. Returns ------- xml_str: str Formatted xml. """ # Format tree (optionally add xml declaration) xml_str = ET.tostring( root, encoding="utf-8", xml_declaration=xml_declaration, method="xml" ).decode("utf-8") # Remove tabs and new lines xml_str = re.sub(r"[\n\t]", "", xml_str) # Remove stretches of blank spaces between nodes xml_str = re.sub(r">\s+<", "><", xml_str) return xml_str @staticmethod def _get_footer_struct_size(version: int) -> int: """Returns the size in pixel of the footer structure for given version. Parameters ---------- version: int Version number. Returns ------- size: int Size in bytes of the footer structure for specified version. """ if version == 0: return 0 elif version == 1: return _Constants.V1A_FOOTER_LENGTH # We return version "1A" elif version == 2: return _Constants.V2_FOOTER_LENGTH elif version == 3: return _Constants.V3_FOOTER_LENGTH elif version == 4: return _Constants.V4_FOOTER_LENGTH elif version == 5: return _Constants.V5A_FOOTER_LENGTH # We return version "5A" elif version == 6: return _Constants.V6_FOOTER_LENGTH elif version == 7: return _Constants.V7_FOOTER_LENGTH else: raise ValueError(f"Unexpected stack version {version}.") @staticmethod def _get_num_dims(num_pixels: list[uint32]): """Return the number of dimensions of the data. Parameters ---------- num_pixels: list[uint32] List of number of pixels per dimension. Returns ------- n_dims: int Number of dimensions for which the number of pixels is larger than 1. """ n_dims = int(np.sum(np.array(num_pixels) > 1)) return n_dims @staticmethod def _get_numpy_data_type( data_type_on_disk: uint32, ) -> tuple[Union[np.dtype, None], Union[str, None]]: """Get the NumPy data type corresponding to the stored datatype. Parameters ---------- data_type_on_disk: uint32 UInt32 value from the stack metadata indicating the type of the data. Returns ------- numpy_type: np.dtype Numpy dtype class. If the data type is not supported, returns None instead. str_type: str Type string (little endian). If the data type is not supported, returns None instead. 
""" if data_type_on_disk == 0x00000001: return np.uint8, "<u1" elif data_type_on_disk == 0x00000002: return np.int8, "<i1" elif data_type_on_disk == 0x00000004: return np.uint16, "<u2" elif data_type_on_disk == 0x00000008: return np.int16, "<i2" elif data_type_on_disk == 0x00000010: return np.uint32, "<u4" elif data_type_on_disk == 0x00000020: return np.int32, "<i4" elif data_type_on_disk == 0x00000040: return np.float32, "<f4" elif data_type_on_disk == 0x00000080: return np.float64, "<f8" elif data_type_on_disk == 0x00001000: return np.uint64, "<u8" elif data_type_on_disk == 0x00002000: return np.int64, "<i8" else: return None, None def _read_obf_header(self, f: BinaryIO) -> bool: """Read the OBF header. Parameters ---------- f: BinaryIO Open file handle. Returns ------- success: bool True if reading the file header was successful, False otherwise. """ # Read the magic header magic_header = f.read(10) if not magic_header == b"OMAS_BF\n\xff\xff": print("Not a valid MSR (OBF) file.") return False # Store the magic header self.obf_file_header.magic_header = magic_header # Get format version (uint32) self.obf_file_header.format_version = struct.unpack("<I", f.read(4))[0] if self.obf_file_header.format_version < 2: print("The MSR (OBF) file must be version 2 or above.") return False # Get position of the first stack header in the file (uint64) self.obf_file_header.first_stack_pos = struct.unpack("<Q", f.read(8))[0] # Get length of following utf-8 description (uint32) self.obf_file_header.descr_len = struct.unpack("<I", f.read(4))[0] # Get description (bytes -> utf-8) description = "" if self.obf_file_header.descr_len > 0: description = f.read(self.obf_file_header.descr_len).decode( "utf-8", errors="replace" ) self.obf_file_header.description = description # Get metadata position (uint64) self.obf_file_header.meta_data_position = struct.unpack("<Q", f.read(8))[0] return True def _read_obf_stack( self, f: BinaryIO, next_stack_pos: int ) -> tuple[bool, OBFStackMetadata]: """Read current OBF stack metadata (header + footer). Parameters ---------- f: BinaryIO Open file handle. next_stack_pos: int Position in file where the next stack starts. Returns ------- success: bool Whether parsing was successful. obf_stack_metadata: OBFStackMetadata OFBStackMetadata object. """ # Initialize the metadata obf_stack_metadata = OBFStackMetadata() # Move at the beginning of the stack f.seek(next_stack_pos) # Read the header success, obf_stack_metadata = self._read_obf_stack_header(f, obf_stack_metadata) if not success: return False, obf_stack_metadata # Process the footer obf_stack_metadata = self._read_obf_stack_footer(f, obf_stack_metadata) # Return return True, obf_stack_metadata def _read_obf_stack_header( self, f: BinaryIO, obf_stack_metadata: OBFStackMetadata ) -> tuple[bool, OBFStackMetadata]: """Read the OBF stack header and update metadata. The file should already be positioned at the right location. Parameters ---------- f: BinaryIO File handle to open. obf_stack_metadata: OBFStackMetadata Current OFBStackMetadata object Returns ------- success: bool Whether parsing was successful. 
obf_stack_metadata: OBFStackMetadata Updated OFBStackMetadata object """ # Read the magic header obf_stack_metadata.magic_header = f.read(16) if not obf_stack_metadata.magic_header == b"OMAS_BF_STACK\n\xff\xff": print("Could not find OBF stack header.") return False, obf_stack_metadata # Get format version (uint32) obf_stack_metadata.format_version = struct.unpack("<I", f.read(4))[0] # Get the number of valid dimensions obf_stack_metadata.rank = struct.unpack("<I", f.read(4))[0] # Get the number of pixels along each dimension obf_stack_metadata.num_pixels = [] for i in range(_Constants.BF_MAX_DIMENSIONS): n = struct.unpack("<I", f.read(4))[0] if i < obf_stack_metadata.rank: obf_stack_metadata.num_pixels.append(n) # Get the physical lengths along each dimension obf_stack_metadata.physical_lengths = [] for i in range(_Constants.BF_MAX_DIMENSIONS): p = struct.unpack("<d", f.read(8))[0] if i < obf_stack_metadata.rank: obf_stack_metadata.physical_lengths.append(p) # Get the physical lengths along each dimension obf_stack_metadata.physical_offsets = [] for i in range(_Constants.BF_MAX_DIMENSIONS): o = struct.unpack("<d", f.read(8))[0] if i < obf_stack_metadata.rank: obf_stack_metadata.physical_offsets.append(o) # Read the data type; it should be one of: # 0x00000000: automatically determine the data type # 0x00000001: uint8 # 0x00000002: int8 # 0x00000004: uint16 # 0x00000008: int16 # 0x00000010: uint32 # 0x00000020: int32 # 0x00000040: float32 # 0x00000080: float64 (double) # 0x00000400: Byte RGB, 3 samples per pixel # 0x00000800: Byte RGB, 4 samples per pixel # 0x00001000: uint64 # 0x00002000: int64 # 0x00010000: (c++) boolean # # Note: all numeric formats have a complex-number variant with # format: data_type | 0x40000000 obf_stack_metadata.data_type_on_disk = struct.unpack("<I", f.read(4))[0] obf_stack_metadata.bytes_per_sample = self._get_bytes_per_sample_from_data_type( obf_stack_metadata.data_type_on_disk ) # Compression type (0 for none, 1 for zip) obf_stack_metadata.compression_type = struct.unpack("<I", f.read(4))[0] # Compression level (0 through 9) obf_stack_metadata.compression_level = struct.unpack("<I", f.read(4))[0] # Length of the stack name obf_stack_metadata.length_stack_name = struct.unpack("<I", f.read(4))[0] # Description length obf_stack_metadata.length_stack_description = struct.unpack("<I", f.read(4))[0] # Reserved field obf_stack_metadata.reserved = struct.unpack("<Q", f.read(8))[0] # Data length on disk obf_stack_metadata.data_len_disk = struct.unpack("<Q", f.read(8))[0] # Next stack position in the file obf_stack_metadata.next_stack_pos = struct.unpack("<Q", f.read(8))[0] # Scan also stack name and description (right after the end of the header) obf_stack_metadata.stack_name = ( "" if obf_stack_metadata.length_stack_name == 0 else f.read(obf_stack_metadata.length_stack_name).decode( "utf-8", errors="replace" ) ) obf_stack_metadata.stack_description = ( "" if obf_stack_metadata.length_stack_description == 0 else f.read(obf_stack_metadata.length_stack_description).decode("utf-8") ) # Now we are at the beginning of the stack (image or other) obf_stack_metadata.data_start_position = f.tell() # Start position of the footer footer_start_position = ( obf_stack_metadata.data_start_position + obf_stack_metadata.data_len_disk ) # Move to the beginning of the footer f.seek(footer_start_position) return True, obf_stack_metadata def _read_obf_stack_footer(self, f: BinaryIO, obf_stack_metadata: OBFStackMetadata): """Process footer. 
Parameters ---------- f: BinaryIO Open file handle. obf_stack_metadata: OBFStackMetadata Metadata object for current stack. Returns ------- obf_stack_metadata: OBFFileMetadata Updated metadata object for current stack. """ # # Version 0 # # Current position (beginning of the footer) obf_stack_metadata.footer_start_pos = f.tell() # If stack version is 0, there is no footer if obf_stack_metadata.format_version == 0: obf_stack_metadata.footer_size = 0 return obf_stack_metadata # # Version 1/1A # # What is the expected size of the footer for this header version? size_for_version = self._get_footer_struct_size( obf_stack_metadata.format_version ) # Keep track ot the side while we proceed current_size = 0 # Get size of the footer header obf_stack_metadata.footer_size = struct.unpack("<I", f.read(4))[0] current_size += 4 # Position of the beginning of the variable metadata obf_stack_metadata.variable_metadata_start_position = ( obf_stack_metadata.footer_start_pos + obf_stack_metadata.footer_size ) # Entries are != 0 for all axes that have a pixel position array (after the footer) col_positions_present = [] for i in range(_Constants.BF_MAX_DIMENSIONS): p = struct.unpack("<I", f.read(4))[0] if i < obf_stack_metadata.rank: col_positions_present.append(p != 0) current_size += 4 obf_stack_metadata.has_col_positions = col_positions_present # Entries are != 0 for all axes that have a label (after the footer) col_labels_present = [] for i in range(_Constants.BF_MAX_DIMENSIONS): b = struct.unpack("<I", f.read(4))[0] if i < obf_stack_metadata.rank: col_labels_present.append(b != 0) current_size += 4 obf_stack_metadata.has_col_labels = col_labels_present # Metadata length (superseded by tag dictionary in version > 4) obf_stack_metadata.obsolete_metadata_length = struct.unpack("<I", f.read(4))[0] current_size += 4 # Internal check assert ( current_size == _Constants.V1A_FOOTER_LENGTH ), "Unexpected length of version 1/1A data." # Have we read enough for this version? if current_size > size_for_version: return obf_stack_metadata # # Version 2 # # SI units of the value carried fractions = [] for i in range(_Constants.OBF_SI_FRACTION_NUM_ELEMENTS): numerator = struct.unpack("<i", f.read(4))[0] denominator = struct.unpack("<i", f.read(4))[0] fractions.append(SIFraction(numerator=numerator, denominator=denominator)) current_size += 8 scale_factor = struct.unpack("<d", f.read(8))[0] current_size += 8 si_value = SIUnit(exponents=fractions, scale_factor=scale_factor) obf_stack_metadata.si_value = si_value # SI units of the axes dimensions = [] for i in range(_Constants.BF_MAX_DIMENSIONS): fractions = [] for j in range(_Constants.OBF_SI_FRACTION_NUM_ELEMENTS): numerator = struct.unpack("<i", f.read(4))[0] denominator = struct.unpack("<i", f.read(4))[0] fractions.append( SIFraction(numerator=numerator, denominator=denominator) ) current_size += 8 scale_factor = struct.unpack("<d", f.read(8))[0] current_size += 8 dimensions.append(SIUnit(exponents=fractions, scale_factor=scale_factor)) # Add all SI dimensions obf_stack_metadata.si_dimensions = dimensions # Internal check assert ( current_size == _Constants.V2_FOOTER_LENGTH ), "Unexpected length of version 2 data." # Have we read enough for this version? 
if current_size > size_for_version: return obf_stack_metadata # # Version 3 # # The number of flush points num_flush_points = struct.unpack("<Q", f.read(8))[0] current_size += 8 obf_stack_metadata.num_flush_points = num_flush_points # The flush block size flush_block_size = struct.unpack("<Q", f.read(8))[0] current_size += 8 obf_stack_metadata.flush_block_size = flush_block_size # Internal check assert ( current_size == _Constants.V3_FOOTER_LENGTH ), "Unexpected length of version 3 data." # Have we read enough for this version? if current_size > size_for_version: return obf_stack_metadata # # Version 4 # obf_stack_metadata.tag_dictionary_length = struct.unpack("<Q", f.read(8))[0] current_size += 8 # Internal check assert ( current_size == _Constants.V4_FOOTER_LENGTH ), "Unexpected length of version 4 data." # Have we read enough for this version? if current_size > size_for_version: return obf_stack_metadata # # Version 5/5A # # Where on disk all the meta-data ends obf_stack_metadata.stack_end_disk = struct.unpack("<Q", f.read(8))[0] current_size += 8 # Min supported format version obf_stack_metadata.min_format_version = struct.unpack("<I", f.read(4))[0] current_size += 4 # The position where the stack ends on disk. obf_stack_metadata.stack_end_used_disk = struct.unpack("<Q", f.read(8))[0] current_size += 8 # Internal check assert ( current_size == _Constants.V5A_FOOTER_LENGTH ), "Unexpected length of version 5/5A data." # Have we read enough for this version? if current_size > size_for_version: return obf_stack_metadata # # Version 6 # # The total number of samples available on disk. By convention all remaining data is # assumed to be zero or undefined. If this is less than the data contained of the stack # it is safe to assume that the stack was truncated by ending the measurement early. # If 0, the number of samples written is the one expected from the stack size. obf_stack_metadata.samples_written = struct.unpack("<Q", f.read(8))[0] current_size += 8 obf_stack_metadata.num_chunk_positions = struct.unpack("<Q", f.read(8))[0] current_size += 8 # Internal check assert ( current_size == _Constants.V6_FOOTER_LENGTH ), "Unexpected length of version 6 data." # Have we read enough for this version? if current_size > size_for_version: return obf_stack_metadata # # Version 7 # # There is no new documented footer metadata for version 7. 
# # Read data after the end of footer # f.seek(obf_stack_metadata.variable_metadata_start_position) # Read labels labels = [] for i in range(obf_stack_metadata.rank): n = struct.unpack("<I", f.read(4))[0] label = f.read(n).decode("utf-8") labels.append(label) obf_stack_metadata.labels = labels # Read steps (where presents) steps = [] for dimension in range(obf_stack_metadata.rank): lst = [] if obf_stack_metadata.has_col_positions[dimension]: for position in range(obf_stack_metadata.num_pixels[dimension]): step = struct.unpack("<d", f.read(8))[0] lst.append(step) steps.append(lst) # Skip the obsolete metadata f.seek(f.tell() + obf_stack_metadata.obsolete_metadata_length) # Flush points if obf_stack_metadata.num_flush_points > 0: flush_points = [] for i in range(obf_stack_metadata.num_flush_points): flush_points.append(struct.unpack("<Q", f.read(8))[0]) obf_stack_metadata.flush_points = flush_points # Tag dictionary tag_dictionary = {} length_key = 1 while length_key > 0: new_key = self._read_string(f) length_key = len(new_key) if length_key > 0: # Get value new_value = self._read_string(f, as_str=True, as_utf8=True) # Try to process it try: tree = ET.fromstring(new_value) except ET.ParseError: # Some keys are not XML, but stringified dictionaries try: tree = json.loads(new_value) except json.JSONDecodeError as e: print( f"Failed processing value for key '{new_key}' ({e}): storing as raw string." ) tree = new_value # Store it without further processing tag_dictionary[new_key] = tree obf_stack_metadata.tag_dictionary = tag_dictionary # Chunk positions if obf_stack_metadata.num_chunk_positions > 0: logical_positions = [] file_positions = [] # Start with 0 logical_positions.append(0) file_positions.append(0) for i in range(obf_stack_metadata.num_chunk_positions): logical_positions.append(struct.unpack("<Q", f.read(8))[0]) file_positions.append(struct.unpack("<Q", f.read(8))[0]) obf_stack_metadata.chunk_logical_positions = logical_positions obf_stack_metadata.chunk_file_positions = file_positions # Return return obf_stack_metadata def _scan_metadata( self, f: BinaryIO, obf_file_header: OBFFileHeader ) -> Union[OBFFileMetadata, None]: """Scan the metadata at the location stored in the header. The expected values are a key matching: "ome_xml" followed by valid OME XML metadata that we parse and return as an ElementTree. Parameters ---------- f: BinaryIO Open file handle. obf_file_header: OBFFileHeader File header structure. Returns ------- metadata: OBFFileMetadata OME-XML file metadata. """ if obf_file_header.meta_data_position == 0: return None # Remember current position current_pos = f.tell() # Move to the beginning of the metadata f.seek(obf_file_header.meta_data_position) # Initialize OBFFileMetadata object metadata = OBFFileMetadata() # Keep reading strings until done strings = [] length_str = 1 while length_str > 0: new_str = self._read_string(f) length_str = len(new_str) if length_str > 0: strings.append(new_str) # Now parse success = False tree = None if len(strings) == 2 and strings[0] == "ome_xml": try: tree = ET.fromstring(strings[1]) success = True except ET.ParseError as e: success = False if not success: metadata.tree = None metadata.unknown_strings = strings else: metadata.tree = tree metadata.unknown_strings = [] # Return to previous file position f.seek(current_pos) return metadata @staticmethod def _read_string( f: BinaryIO, as_str: bool = True, as_utf8: bool = True ) -> Union[str, bytes]: """Read a string at current position. 
Parameters ---------- f: BinaryIO Open file handles. as_str: bool = True If True parse the raw byte array to string. as_utf8: bool = True If True decode the string to utf-8. Ignored if as_str is False. Returns ------- string: Union[bytes, str] Either raw bytes or a str, optionally utf-8 encoded. """ # Read the length of the following string length = struct.unpack("<I", f.read(4))[0] if length == 0: return "" # Read `length` bytes and convert them to utf-8 if requested value = f.read(length) if as_str: if as_utf8: value = value.decode("utf-8") return value @staticmethod def _get_bytes_per_sample_from_data_type(data_type: uint32) -> int: """Return the number of bytes per sample for given data type.""" supported_types = { 0x00000001: 1, # 8-bit unsigned byte 0x00000002: 1, # 8-bit signed char 0x00000004: 2, # 16-bit word value 0x00000008: 2, # 16-bit signed integer 0x00000010: 4, # 32-bit unsigned integer 0x00000020: 4, # 32-bit signed integer 0x00000040: 4, # 32-bit floating point value 0x00000080: 8, # 64-bit floating point value } # Get the number of bytes num_bytes_per_sample = supported_types.get(data_type, -1) # Check that it is supported if num_bytes_per_sample == -1: raise ValueError(f"Unsupported data type 0x{data_type:08x}.") # Return it return num_bytes_per_sample def get_image_info_list(self): """Return a list of images from all stacks.""" # Initialize the list images = [] # Do we have images? if self.num_stacks == 0: return images for i, stack in enumerate(self._obf_stacks_list): # Only return images if (np.array(stack.num_pixels) > 1).sum() == 2: # Get pixel size pixel_sizes = np.round( np.array(self.get_data_pixel_sizes(stack_index=i)) * 1e9, 2 )[:2] # Get detector detector = self._get_detector( imspector_dictionary_root=stack.tag_dictionary["imspector"], img_name=stack.stack_name, ) # Build a (univocal) summary string as_string = ( f"{detector}: {stack.stack_name}: " f"size = (h={stack.num_pixels[1]} x w={stack.num_pixels[0]}); " f"pixel size = {pixel_sizes[0]}nm " f"(index = {i})" ) images.append( { "index": i, "name": stack.stack_name, "detector": detector, "description": stack.stack_description, "num_pixels": stack.num_pixels, "physical_lengths": stack.physical_lengths, "physical_offsets": stack.physical_offsets, "pixel_sizes": pixel_sizes, "as_string": as_string, } ) # Sort the list using natural sorting by the 'as_string' key images = natsorted(images, key=lambda x: x["as_string"]) # Return the extracted metadata return images def get_image_info_dict(self): """Return a hierarchical dictionary of images from all stacks.""" # Initialize the dictionary images = {} # Do we have images? 
if self.num_stacks == 0: return images for i, stack in enumerate(self._obf_stacks_list): # Only return images if (np.array(stack.num_pixels) > 1).sum() == 2: # Get pixel size pixel_sizes = np.round( np.array(self.get_data_pixel_sizes(stack_index=i)) * 1e9, 2 )[:2] # Get detector detector = self._get_detector( imspector_dictionary_root=stack.tag_dictionary["imspector"], img_name=stack.stack_name, ) # Get acquisition number match = re.match( r"^.+{(?P<index>\d+)}(?P<extra>.*)$", stack.stack_name, re.IGNORECASE, ) if match: if match["extra"] == "": key = f"Image {match['index']}" else: key = f"Image {match['index']} ({match['extra']})" else: key = stack.stack_name if key in images: image = images[key] else: image = { "metadata": "", "detectors": [], } # Metadata frame_size = ( stack.num_pixels[0] * pixel_sizes[0] / 1000, stack.num_pixels[1] * pixel_sizes[1] / 1000, ) # Build metadata string metadata = f"Frame: {frame_size[0]:.1f}x{frame_size[1]:.1f}µm - Pixel: {pixel_sizes[0]}nm" if image["metadata"] == "": image["metadata"] = metadata else: if image["metadata"] != metadata: raise ValueError(f"Unexpected metadata for '{key}'") # Append current detectir image["detectors"].append( { "index": i, "name": stack.stack_name, "detector": detector, "description": stack.stack_description, "num_pixels": stack.num_pixels, "physical_lengths": stack.physical_lengths, "physical_offsets": stack.physical_offsets, "pixel_sizes": pixel_sizes, } ) # Store the (updated) image in the dictionary images[key] = image # Sort the dictionary using natural sorting of its keys images = dict(natsorted(images.items())) # Return the extracted metadata return images @staticmethod def _get_detector(imspector_dictionary_root: ET, img_name: str) -> Union[str, None]: """Extract the detector names from the tag dictionary of current stack. Parameters ---------- imspector_dictionary_root: xml.etree.ElementTree Root of the "imspector" tree (i.e., tag_dictionary["imspector"]). Returns ------- name: Union[str, None] Name of the detector, or None if the detector could not be found. """ # Get the channels node channels_node = imspector_dictionary_root.find( "./doc/ExpControl/measurement/channels" ) if channels_node is None: return None # Find all items items = channels_node.findall("item") if items is None: return None # Process items detector = None for item in items: detector = item.find("./detsel/detector") name = item.find("./name") if detector is not None and name is not None: if name.text in img_name: return detector.text return detector
Instance variables
var num_stacks
-
Return the number of stacks contained in the file.
Expand source code
@property def num_stacks(self): """Return the number of stacks contained in the file.""" return len(self._obf_stacks_list)
Methods
def export_ome_xml_metadata(self, file_name: Union[pathlib.Path, str])
-
Export the OME-XML metadata to file.
Parameters
file_name
:Union[str, Path]
- Output file name.
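A short sketch, assuming a scanned reader and an illustrative output path:
from pyminflux.reader import MSRReader

reader = MSRReader("sample.msr")  # placeholder path
if reader.scan():
    # Writes the formatted OME-XML string; missing parent folders are created
    reader.export_ome_xml_metadata("exported/sample_ome.xml")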
Expand source code
def export_ome_xml_metadata(self, file_name: Union[str, Path]): """Export the OME-XML metadata to file. Parameters ---------- file_name: Union[str, Path] Output file name. """ # Get the ome-xml tree, optionally as formatted string metadata = self.get_ome_xml_metadata() if metadata is None: print("Nothing to export.") return # Make sure the parent path to the file exists Path(file_name).parent.mkdir(parents=True, exist_ok=True) # Save to file with open(file_name, "w", encoding="utf-8") as f: f.write(metadata)
def export_tag_dictionary(self, stack_index: int, file_name: Union[pathlib.Path, str])
-
Export the tag dictionary to file.
Parameters
stack_index
:int
- Index of the stack for which to export the tag dictionary.
file_name
:Union[str, Path]
- Output file name.
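A short sketch, assuming a scanned reader; the output name exported/sample_tags is illustrative and acts as a stem for the per-tag files:
from pyminflux.reader import MSRReader

reader = MSRReader("sample.msr")  # placeholder path
if reader.scan():
    # One file per tag is written, with .xml, .json or .txt extension
    # depending on the type of the corresponding tag value
    reader.export_tag_dictionary(stack_index=0, file_name="exported/sample_tags")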
Expand source code
def export_tag_dictionary(self, stack_index: int, file_name: Union[str, Path]): """Export the tag dictionary to file. Parameters ---------- stack_index: int Index of the stack for which to export the tag dictionary. file_name: Union[str, Path] Output file name. """ if stack_index < 0 or stack_index > len(self._obf_stacks_list): raise ValueError(f"Stack number {stack_index} is out of range.") # Get tag dictionary tag_dictionary = self.get_tag_dictionary(stack_index) if tag_dictionary is None: return None # Make sure file_name is of type Path file_name = Path(file_name) # Make sure the parent path to the file exists file_name.parent.mkdir(parents=True, exist_ok=True) # Export the dictionaries for key, value in tag_dictionary.items(): if type(value) is ET.Element: mod_file_name = file_name.parent / f"{file_name.stem}_{key}.xml" xml_str = self._tree_to_formatted_xml(value) with open(mod_file_name, "w") as f: f.write(xml_str) elif type(value) is dict: mod_file_name = file_name.parent / f"{file_name.stem}_{key}.json" with open(mod_file_name, "w") as f: json.dump(value, f, indent=4) else: mod_file_name = file_name.parent / f"{file_name.stem}_{key}.txt" with open(mod_file_name, "w") as f: f.write(value)
def get_data(self, stack_index: int) ‑> Optional[numpy.ndarray]
-
Read the data for the requested stack: only images are returned.
Parameters
stack_index
:int
- Index of the stack for which to read the data.
Returns
frame
:Union[np.ndarray, None]
- Data as a 2D NumPy array. None if it could not be read or if it was not a 2D image.
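A sketch of reading one stack as an image (path and stack index are placeholders); as noted above, None is returned for anything that is not a supported 2D image:
from pyminflux.reader import MSRReader

reader = MSRReader("sample.msr")  # placeholder path
if reader.scan():
    frame = reader.get_data(stack_index=0)  # None if stack 0 is not a readable 2D image
    if frame is not None:
        print(frame.shape, frame.dtype)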
Expand source code
def get_data(self, stack_index: int) -> Union[np.ndarray, None]: """Read the data for requested stack: only images are returned. Parameters ---------- stack_index: int Index of the stack for which to read the data. Returns ------- frame: Union[np.ndarray, None] Data as a 2D NumPy array. None if it could not be read or if it was not a 2D image. """ if stack_index < 0 or stack_index > len(self._obf_stacks_list): raise ValueError(f"stack_index={stack_index} is out of bounds.") # Get the metadata for the requested stack obf_stack_metadata = self._obf_stacks_list[stack_index] # Currently, we only support format 6 and newer if obf_stack_metadata.format_version < 6: print("Reading data is supported only for stack format 6 and newer.") return None # If there are chunks, we currently do not read if obf_stack_metadata.num_chunk_positions > 0: print("Reading chunked data is currently not supported.") return None # We currently only read 2D images if self._get_num_dims(obf_stack_metadata.num_pixels) != 2: print("Only 2D images are currently supported.") return None # Get NumPy data type np_data_type, _ = self._get_numpy_data_type( obf_stack_metadata.data_type_on_disk ) if np_data_type is None: print("Unsupported data type.") return None # Extract some info height = obf_stack_metadata.num_pixels[1] width = obf_stack_metadata.num_pixels[0] bytes_per_sample = obf_stack_metadata.bytes_per_sample # Expected number of (decompressed) samples expected_num_samples = width * height # Number of written bytes written_bytes = obf_stack_metadata.samples_written * bytes_per_sample # Open the file with open(self.filename, mode="rb") as f: # Seek to the beginning of the data f.seek(obf_stack_metadata.data_start_position) # Is there compression? if obf_stack_metadata.compression_type != 0: # Read the bytes compressed_data = f.read(written_bytes) # Decompress them decompressed_data = zlib.decompress(compressed_data) # Cast to a "byte" NumPy array raw_frame = np.frombuffer(decompressed_data, dtype=np.uint8) else: # Read the bytes raw_data = f.read(written_bytes) # Cast to a "byte" NumPy array raw_frame = np.frombuffer(raw_data, dtype=np.uint8) # Reinterpret as final data type format (little Endian) frame = raw_frame.view(np.dtype(np_data_type)) # Make sure the final frame size matches the expected size if len(frame) != expected_num_samples: print("Unexpected length of data retrieved!") return None # Reshape frame = frame.reshape((height, width)) return frame
def get_data_offsets(self, stack_index: int, scaled: bool = True) ‑> Optional[list]
-
Returns the (scaled) data offsets for the requested stack.
Parameters
stack_index
:int
- Index of the stack for which to read the data.
scaled
:bool
- If scaled is True, the offsets will be scaled by the corresponding scale factors as reported by MSRReader.get_data_units().
Returns
offsets
:Union[list, None]
- Offsets for 2D images, None otherwise.
Expand source code
def get_data_offsets( self, stack_index: int, scaled: bool = True ) -> Union[list, None]: """Returns the (scaled) data offsets for the requested stack. Parameters ---------- stack_index: int Index of the stack for which to read the data. scaled: bool If scaled is True, the offsets will be scaled by the corresponding scale factors as reported by MSRReader.get_data_units(). Returns ------- offsets: Union[list, None] Offsets for 2D images, None otherwise. """ if stack_index < 0 or stack_index > len(self._obf_stacks_list): raise ValueError(f"stack_index={stack_index} is out of bounds.") # Get the metadata for the requested stack obf_stack_metadata = self._obf_stacks_list[stack_index] if obf_stack_metadata is None: return None # Get the offsets offsets = obf_stack_metadata.physical_offsets[: obf_stack_metadata.rank] # Do we need to scale? if scaled: _, factors = self.get_data_units(stack_index=stack_index) for i, factor in enumerate(factors): if factor != 1.0: offsets[i] *= factor return offsets
def get_data_physical_sizes(self, stack_index: int, scaled: bool = True) ‑> Optional[list]
-
Returns the (scaled) physical sizes of the data for the requested stack.
Parameters
stack_index
:int
- Index of the stack for which to read the data.
scaled
:bool
- If scaled is True, the physical sizes will be scaled by the corresponding scale factors as reported by MSRReader.get_data_units().
Returns
sizes
:Union[list, None]
- Physical sizes for 2D images, None otherwise.
Expand source code
def get_data_physical_sizes( self, stack_index: int, scaled: bool = True ) -> Union[list, None]: """Returns the (scaled) data physical size for the requested stack. Parameters ---------- stack_index: int Index of the stack for which to read the data. scaled: bool If scaled is True, the physical sizes will be scaled by the corresponding scale factors as reported by MSRReader.get_data_units(). Returns ------- offsets: Union[list, None] Physical sizes for 2D images, None otherwise. """ if stack_index < 0 or stack_index > len(self._obf_stacks_list): raise ValueError(f"stack_index={stack_index} is out of bounds.") # Get the metadata for the requested stack obf_stack_metadata = self._obf_stacks_list[stack_index] if obf_stack_metadata is None: return None # Get the physical lengths phys_lengths = obf_stack_metadata.physical_lengths[: obf_stack_metadata.rank] # Do we need to scale? if scaled: _, factors = self.get_data_units(stack_index=stack_index) for i, factor in enumerate(factors): if factor != 1.0: phys_lengths[i] *= factor # Return the physical lengths as list return phys_lengths
def get_data_pixel_sizes(self, stack_index: int, scaled: bool = True) ‑> Optional[list]
-
Returns the (scaled) pixel sizes of the data for the requested stack.
Parameters
stack_index
:int
- Index of the stack for which to read the data.
scaled
:bool
- If scaled is True, the pixel sizes will be scaled by the corresponding scale factors as reported by MSRReader.get_data_units().
Returns
pixel_sizes
:Union[list, None]
- Pixel sizes for 2D images, None otherwise.
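The three geometry accessors are typically used together; a sketch with a placeholder path and stack index, where values are expressed according to the units and scale factors reported by get_data_units():
from pyminflux.reader import MSRReader

reader = MSRReader("sample.msr")  # placeholder path
if reader.scan():
    index = 0  # illustrative stack index
    sizes = reader.get_data_physical_sizes(stack_index=index)      # physical extent per dimension
    offsets = reader.get_data_offsets(stack_index=index)           # physical offset per dimension
    pixel_sizes = reader.get_data_pixel_sizes(stack_index=index)   # extent divided by number of pixels
    print(sizes, offsets, pixel_sizes)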
Expand source code
def get_data_pixel_sizes( self, stack_index: int, scaled: bool = True ) -> Union[list, None]: """Returns the (scaled) data pixel size for the requested stack. Parameters ---------- stack_index: int Index of the stack for which to read the data. scaled: bool If scaled is True, the pixel sizes will be scaled by the corresponding scale factors as reported by MSRReader.get_data_units(). Returns ------- offsets: Union[list, None] Pixel sizes for 2D images, None otherwise. """ if stack_index < 0 or stack_index > len(self._obf_stacks_list): raise ValueError(f"stack_index={stack_index} is out of bounds.") # Get the metadata for the requested stack obf_stack_metadata = self._obf_stacks_list[stack_index] if obf_stack_metadata is None: return None # Get the physical sizes phys_lengths = self.get_data_physical_sizes( stack_index=stack_index, scaled=scaled ) # Get the number of pixels along each dimension num_pixels = obf_stack_metadata.num_pixels[: obf_stack_metadata.rank] # Now divide by the image size pixel_sizes = np.array(phys_lengths) / np.array(num_pixels) # Return the pixel size as list return pixel_sizes.tolist()
def get_data_units(self, stack_index: int) ‑> Optional[tuple[list, list]]
-
Returns the data units and scale factors per dimension for the requested stack.
Units are one of: "m" (meters), "kg" (kilograms), "s" (seconds), "A" (amperes), "K" (kelvin), "mol" (moles), "cd" (candela), "r" (radian), "sr" (steradian).
Parameters
stack_index
:int
- Index of the stack for which to read the data.
Returns
unit
:Union[tuple[list, list], None]
- List of units and list of scale factors, or None if no file was opened.
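A sketch that prints the per-dimension units and scale factors for the first stack (placeholder path):
from pyminflux.reader import MSRReader

reader = MSRReader("sample.msr")  # placeholder path
if reader.scan():
    result = reader.get_data_units(stack_index=0)
    if result is not None:
        units, scale_factors = result
        print(units, scale_factors)  # e.g. ["m", "m"] and [1.0, 1.0] for a 2D image in meters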
Expand source code
def get_data_units(self, stack_index: int) -> Union[tuple[list, list], None]: """Returns the data units and scale factors per dimension for requested stack. Units are one of: "m": meters "kg": kilograms "s": s "A": Amperes "K": Kelvin "mol": moles "cd": candela "r": radian "sr": sr Parameters ---------- stack_index: int Index of the stack for which to read the data. Returns ------- unit: Union[tuple[list, list], None] List of units and list of scale factors, or None if no file was opened. """ if stack_index < 0 or stack_index > len(self._obf_stacks_list): raise ValueError(f"stack_index={stack_index} is out of bounds.") # Get the metadata for the requested stack obf_stack_metadata = self._obf_stacks_list[stack_index] if obf_stack_metadata is None: return None units = [] scale_factors = [] for dim in range(obf_stack_metadata.rank): dimensions = obf_stack_metadata.si_dimensions[dim] scale_factors.append(dimensions.scale_factor) for i, exponent in enumerate(dimensions.exponents): if i == 0 and exponent.numerator > 0: units.append("m") break elif i == 1 and exponent.numerator > 0: units.append("kg") break elif i == 2 and exponent.numerator > 0: units.append("s") break elif i == 3 and exponent.numerator > 0: units.append("A") break elif i == 4 and exponent.numerator > 0: units.append("K") break elif i == 5 and exponent.numerator > 0: units.append("mol") break elif i == 6 and exponent.numerator > 0: units.append("cd") break elif i == 7 and exponent.numerator > 0: units.append("r") break elif i == 8 and exponent.numerator > 0: units.append("sr") break else: units.append("") break # Return the extracted units and scale factors return units, scale_factors
def get_image_info_dict(self)
-
Return a hierarchical dictionary of images from all stacks.
Expand source code
def get_image_info_dict(self): """Return a hierarchical dictionary of images from all stacks.""" # Initialize the dictionary images = {} # Do we have images? if self.num_stacks == 0: return images for i, stack in enumerate(self._obf_stacks_list): # Only return images if (np.array(stack.num_pixels) > 1).sum() == 2: # Get pixel size pixel_sizes = np.round( np.array(self.get_data_pixel_sizes(stack_index=i)) * 1e9, 2 )[:2] # Get detector detector = self._get_detector( imspector_dictionary_root=stack.tag_dictionary["imspector"], img_name=stack.stack_name, ) # Get acquisition number match = re.match( r"^.+{(?P<index>\d+)}(?P<extra>.*)$", stack.stack_name, re.IGNORECASE, ) if match: if match["extra"] == "": key = f"Image {match['index']}" else: key = f"Image {match['index']} ({match['extra']})" else: key = stack.stack_name if key in images: image = images[key] else: image = { "metadata": "", "detectors": [], } # Metadata frame_size = ( stack.num_pixels[0] * pixel_sizes[0] / 1000, stack.num_pixels[1] * pixel_sizes[1] / 1000, ) # Build metadata string metadata = f"Frame: {frame_size[0]:.1f}x{frame_size[1]:.1f}µm - Pixel: {pixel_sizes[0]}nm" if image["metadata"] == "": image["metadata"] = metadata else: if image["metadata"] != metadata: raise ValueError(f"Unexpected metadata for '{key}'") # Append current detectir image["detectors"].append( { "index": i, "name": stack.stack_name, "detector": detector, "description": stack.stack_description, "num_pixels": stack.num_pixels, "physical_lengths": stack.physical_lengths, "physical_offsets": stack.physical_offsets, "pixel_sizes": pixel_sizes, } ) # Store the (updated) image in the dictionary images[key] = image # Sort the dictionary using natural sorting of its keys images = dict(natsorted(images.items())) # Return the extracted metadata return images
def get_image_info_list(self)
-
Return a list of images from all stacks.
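A sketch of listing the image stacks of a scanned file (placeholder path); each entry is a dictionary that also carries a human-readable as_string summary:
from pyminflux.reader import MSRReader

reader = MSRReader("sample.msr")  # placeholder path
if reader.scan():
    for info in reader.get_image_info_list():
        # Each dictionary also contains "index", "name", "detector", "num_pixels", ...
        print(info["as_string"])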
Expand source code
def get_image_info_list(self): """Return a list of images from all stacks.""" # Initialize the list images = [] # Do we have images? if self.num_stacks == 0: return images for i, stack in enumerate(self._obf_stacks_list): # Only return images if (np.array(stack.num_pixels) > 1).sum() == 2: # Get pixel size pixel_sizes = np.round( np.array(self.get_data_pixel_sizes(stack_index=i)) * 1e9, 2 )[:2] # Get detector detector = self._get_detector( imspector_dictionary_root=stack.tag_dictionary["imspector"], img_name=stack.stack_name, ) # Build a (univocal) summary string as_string = ( f"{detector}: {stack.stack_name}: " f"size = (h={stack.num_pixels[1]} x w={stack.num_pixels[0]}); " f"pixel size = {pixel_sizes[0]}nm " f"(index = {i})" ) images.append( { "index": i, "name": stack.stack_name, "detector": detector, "description": stack.stack_description, "num_pixels": stack.num_pixels, "physical_lengths": stack.physical_lengths, "physical_offsets": stack.physical_offsets, "pixel_sizes": pixel_sizes, "as_string": as_string, } ) # Sort the list using natural sorting by the 'as_string' key images = natsorted(images, key=lambda x: x["as_string"]) # Return the extracted metadata return images
def get_ome_xml_metadata(self) ‑> Optional[str]
-
Return the OME XML metadata.
Returns
ome_xml_metadata
:Union[str, None]
- OME XML metadata as formatted string. If no file was loaded, returns None.
Expand source code
def get_ome_xml_metadata(self) -> Union[str, None]: """Return the OME XML metadata. Returns ------- ome_xml_metadata: Union[str, None] OME XML metadata as formatted string. If no file was loaded, returns None. """ # Get the ome-xml tree root = self.obf_file_metadata.tree if root is None: return None # Return metadata as formatted XML string return self._tree_to_formatted_xml(root)
def get_tag_dictionary(self, stack_index: int) ‑> Optional[dict]
-
Return the tag dictionary for the requested stack.
Parameters
stack_index
:int
- Index of the stack for which to return the tag dictionary.
Returns
tag_dictionary
:Union[dict, None]
- Dictionary. If no file was loaded, returns None.
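A sketch that lists the tag keys of the first stack (placeholder path):
from pyminflux.reader import MSRReader

reader = MSRReader("sample.msr")  # placeholder path
if reader.scan():
    tags = reader.get_tag_dictionary(stack_index=0)
    if tags is not None:
        # Values are parsed XML trees, dictionaries, or raw strings depending on the tag
        print(list(tags.keys()))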
Expand source code
def get_tag_dictionary(self, stack_index: int) -> Union[dict, None]: """Return the tag dictionary for the requested stack. Parameters ---------- stack_index: int Index of the stack for which to return the tag dictionary. Returns ------- tag_dictionary: Union[dict, None] Dictionary. If no file was loaded, returns None. """ if stack_index < 0 or stack_index > len(self._obf_stacks_list): raise ValueError(f"Stack number {stack_index} is out of range.") # Get stack metadata obf_stack_metadata = self._obf_stacks_list[stack_index] if obf_stack_metadata is None: return None # Get the tag dictionary tag = obf_stack_metadata.tag_dictionary # Return the tag dictionary return tag
def scan(self) ‑> bool
-
Scan the metadata of the file.
Returns
success
:bool
- True if the file was scanned successfully, False otherwise.
Expand source code
def scan(self) -> bool: """Scan the metadata of the file. Returns ------- success: bool True if the file was scanned successfully, False otherwise. """ # Open the file with open(self.filename, mode="rb") as f: if not self._read_obf_header(f): return False # Scan metadata self.obf_file_metadata = self._scan_metadata(f, self.obf_file_header) # Get the first stack position next_stack_pos = self.obf_file_header.first_stack_pos while next_stack_pos != 0: # Scan the next stack success, obs_stack_metadata = self._read_obf_stack(f, next_stack_pos) if not success: return False # Append current stack header self._obf_stacks_list.append(obs_stack_metadata) # Do we have a next header to parse? next_stack_pos = obs_stack_metadata.next_stack_pos return True
class MinFluxReader (filename: Union[pathlib.Path, str], valid: bool = True, z_scaling_factor: float = 1.0, is_tracking: bool = False, pool_dcr: bool = False, dwell_time: float = 1.0)
-
Constructor.
Parameters
filename
:Union[Path, str]
- Full path to the .pmx, .npy, or .mat file to read.
valid
:bool (optional, default = True)
- Whether to load only valid localizations.
z_scaling_factor
:float (optional, default = 1.0)
- Refractive index mismatch correction factor to apply to the z coordinates.
is_tracking
:bool (optional, default = False)
- Whether the dataset comes from a tracking experiment; otherwise, it is considered a localization experiment.
pool_dcr
:bool (optional, default = False)
- Whether to pool DCR values weighted by the relative ECO of all relocalized iterations.
dwell_time
:float (optional, default = 1.0)
- Dwell time in milliseconds.
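A minimal sketch, assuming a valid MINFLUX export at the placeholder path acquisition.npy; the processed localizations are returned as a pandas DataFrame with coordinates in nanometers:
from pyminflux.reader import MinFluxReader

# Placeholder file name; .pmx, .npy and .mat files are supported
reader = MinFluxReader("acquisition.npy", valid=True)

print(reader.is_3d, reader.is_tracking, reader.num_valid_entries)

# Processed localizations as a pandas DataFrame (coordinates scaled to nm)
df = reader.processed_dataframe
print(df.head())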
Expand source code
class MinFluxReader: __docs__ = "Reader of MINFLUX data in `.pmx`, `.npy` or `.mat` formats." __slots__ = [ "_pool_dcr", "_cfr_index", "_data_array", "_data_df", "_data_full_df", "_dcr_index", "_dwell_time", "_eco_index", "_efo_index", "_filename", "_is_3d", "_is_aggregated", "_is_last_valid", "_is_tracking", "_last_valid", "_last_valid_cfr", "_loc_index", "_relocalizations", "_reps", "_tid_index", "_tim_index", "_unit_scaling_factor", "_valid", "_valid_cfr", "_valid_entries", "_vld_index", "_z_scaling_factor", ] def __init__( self, filename: Union[Path, str], valid: bool = True, z_scaling_factor: float = 1.0, is_tracking: bool = False, pool_dcr: bool = False, dwell_time: float = 1.0, ): """Constructor. Parameters ---------- filename: Union[Path, str] Full path to the `.pmx`, `.npy` or `.mat` file to read valid: bool (optional, default = True) Whether to load only valid localizations. z_scaling_factor: float (optional, default = 1.0) Refractive index mismatch correction factor to apply to the z coordinates. is_tracking: bool (optional, default = False) Whether the dataset comes from a tracking experiment; otherwise, it is considered as a localization experiment. pool_dcr: bool (optional, default = False) Whether to pool DCR values weighted by the relative ECO of all relocalized iterations. dwell_time: float (optional, default 1.0) Dwell time in milliseconds. """ # Store the filename self._filename: Path = Path(filename) if not self._filename.is_file(): raise IOError(f"The file {self._filename} does not seem to exist.") # Keep track of whether the chosen sequence is the last valid. self._is_last_valid: bool = False # Store the valid flag self._valid: bool = valid # The localizations are stored in meters in the Imspector files and by # design also in the `.pmx` format. Here, we scale them to be in nm self._unit_scaling_factor: float = 1e9 # Store the z correction factor self._z_scaling_factor: float = z_scaling_factor # Store the dwell time self._dwell_time = dwell_time # Initialize the data self._data_array = None self._data_df = None self._data_full_df = None self._valid_entries = None # Whether the acquisition is 2D or 3D self._is_3d: bool = False # Whether the acquisition is a tracking dataset self._is_tracking: bool = is_tracking # Whether to pool the dcr values self._pool_dcr = pool_dcr # Whether the file contains aggregate measurements self._is_aggregated: bool = False # Indices dependent on 2D or 3D acquisition and whether the # data comes from a localization or a tracking experiment. self._reps: int = -1 self._efo_index: int = -1 self._cfr_index: int = -1 self._dcr_index: int = -1 self._eco_index: int = -1 self._loc_index: int = -1 self._valid_cfr: list = [] self._relocalizations: list = [] # Constant indices self._tid_index: int = 0 self._tim_index: int = 0 self._vld_index: int = 0 # Keep track of the last valid global and CFR iterations as returned # by the initial scan self._last_valid: int = -1 self._last_valid_cfr: int = -1 # Load the file if not self._load(): raise IOError(f"The file {self._filename} is not a valid MINFLUX file.") @property def is_last_valid(self) -> Union[bool, None]: """Return True if the selected iteration is the "last valid", False otherwise. 
If the dataframe has not been processed yet, `is_last_valid` will be None.""" if self._data_df is None: return None return self._is_last_valid @property def z_scaling_factor(self) -> float: """Returns the scaling factor for the z coordinates.""" return self._z_scaling_factor @property def is_3d(self) -> bool: """Returns True is the acquisition is 3D, False otherwise.""" return self._is_3d @property def is_aggregated(self) -> bool: """Returns True is the acquisition is aggregated, False otherwise.""" return self._is_aggregated @property def is_tracking(self) -> bool: """Returns True for a tracking acquisition, False otherwise.""" return self._is_tracking @property def is_pool_dcr(self) -> bool: """Returns True if the DCR values over all relocalized iterations (to use all photons).""" return self._pool_dcr @property def dwell_time(self) -> float: """Returns the dwell time.""" return self._dwell_time @property def num_valid_entries(self) -> int: """Number of valid entries.""" if self._data_array is None: return 0 return self._valid_entries.sum() @property def num_invalid_entries(self) -> int: """Number of valid entries.""" if self._data_array is None: return 0 return np.logical_not(self._valid_entries).sum() @property def valid_cfr(self) -> list: """Return the iterations with valid CFR measurements. Returns ------- cfr: boolean array with True for the iteration indices that have a valid measurement. """ if self._data_array is None: return [] return self._valid_cfr @property def relocalizations(self) -> list: """Return the iterations with relocalizations. Returns ------- reloc: boolean array with True for the iteration indices that are relocalized. """ if self._data_array is None: return [] return self._relocalizations @property def valid_raw_data(self) -> Union[None, np.ndarray]: """Return the raw data.""" if self._data_array is None: return None return self._data_array[self._valid_entries].copy() @property def processed_dataframe(self) -> Union[None, pd.DataFrame]: """Return the raw data as dataframe (some properties only).""" if self._data_df is not None: return self._data_df self._data_df = self._process() return self._data_df @property def raw_data_dataframe(self) -> Union[None, pd.DataFrame]: """Return the raw data as dataframe (some properties only).""" if self._data_full_df is not None: return self._data_full_df self._data_full_df = self._raw_data_to_full_dataframe() return self._data_full_df @property def filename(self) -> Union[Path, None]: """Return the filename if set.""" if self._filename is None: return None return Path(self._filename) def set_indices(self, index, cfr_index, process: bool = True): """Set the parameter indices. We distinguish between the index of all parameters that are always measured and are accessed from the same iteration, and the cfr index, that is not always measured. Parameters ---------- index: int Global iteration index for all parameters but cfr cfr_index: int Iteration index for cfr process: bool (Optional, default = True) By default, when setting the indices, the data is rescanned and the dataframe is rebuilt. In case several properties of the MinFluxReader are modified sequentially, the processing can be disabled and run only once after the last change. However, this only applies after the first load/scan, when the processed dataframe has not been created yet. If the dataframe already exists, this flag will be ignored and the processing will take place. 
""" # Make sure there is loaded data if self._data_array is None: raise ValueError("No data loaded.") if self._reps == -1: raise ValueError("No data loaded.") if len(self._valid_cfr) == 0: raise ValueError("No data loaded.") # Check that the arguments are compatible with the loaded data if index < 0 or index > self._reps - 1: raise ValueError( f"The value of index must be between 0 and {self._reps - 1}." ) if cfr_index < 0 or cfr_index > len(self._valid_cfr) - 1: raise ValueError( f"The value of index must be between 0 and {len(self._valid_cfr) - 1}." ) # Now set the general values self._efo_index = index self._dcr_index = index self._eco_index = index self._loc_index = index # Set the cfr index self._cfr_index = cfr_index # Constant indices self._tid_index: int = 0 self._tim_index: int = 0 self._vld_index: int = 0 # Re-process the file? If the processed dataframe already exists, # the processing will take place anyway. if process or self._data_df is not None: self._process() def set_tracking(self, is_tracking: bool, process: bool = True): """Sets whether the acquisition is tracking or localization. Parameters ---------- is_tracking: bool Set to True for a tracking acquisition, False for a localization acquisition. process: bool (Optional, default = True) By default, when setting the tracking flag, the data is rescanned and the dataframe is rebuilt. In case several properties of the MinFluxReader are modified sequentially, the processing can be disabled and run only once after the last change. However, this only applies after the first load/scan, when the processed dataframe has not been created yet. If the dataframe already exists, this flag will be ignored and the processing will take place. """ # Update the flag self._is_tracking = is_tracking # Re-process the file? if process or self._data_df is not None: self._process() def set_dwell_time(self, dwell_time: float, process: bool = True): """ Sets the dwell time. Parameters ---------- dwell_time: float Dwell time. process: bool (Optional, default = True) By default, when setting the dwell time, the data is rescanned and the dataframe is rebuilt. In case several properties of the MinFluxReader are modified sequentially, the processing can be disabled and run only once after the last change. However, this only applies after the first load/scan, when the processed dataframe has not been created yet. If the dataframe already exists, this flag will be ignored and the processing will take place. """ # Update the flag self._dwell_time = dwell_time # Re-process the file? if process or self._data_df is not None: self._process() def set_pool_dcr(self, pool_dcr: bool, process: bool = True): """ Sets whether the DCR values should be pooled (and weighted by ECO). Parameters ---------- pool_dcr: bool Whether the DCR values should be pooled (and weighted by ECO). process: bool (Optional, default = True) By default, when setting the DCR binning flag, the data is rescanned and the dataframe is rebuilt. In case several properties of the MinFluxReader are modified sequentially, the processing can be disabled and run only once after the last change. However, this only applies after the first load/scan, when the processed dataframe has not been created yet. If the dataframe already exists, this flag will be ignored and the processing will take place. """ # Update the flag self._pool_dcr = pool_dcr # Re-process the file? 
if process or self._data_df is not None: self._process() @classmethod def processed_properties(cls) -> list: """Returns the properties read from the file that correspond to the processed dataframe column names.""" return [ "tid", "tim", "x", "y", "z", "efo", "cfr", "eco", "dcr", "dwell", "fluo", ] @classmethod def raw_properties(cls) -> list: """Returns the properties read from the file and dynamic that correspond to the raw dataframe column names.""" return ["tid", "aid", "vld", "tim", "x", "y", "z", "efo", "cfr", "eco", "dcr"] def _load(self) -> bool: """Load the file.""" if not self._filename.is_file(): print(f"File {self._filename} does not exist.") return False # Call the specialized _load_*() function if self._filename.name.lower().endswith(".npy"): try: data_array = np.load(str(self._filename)) if "fluo" in data_array.dtype.names: self._data_array = data_array else: self._data_array = migrate_npy_array(data_array) except ( OSError, UnpicklingError, ValueError, EOFError, FileNotFoundError, TypeError, Exception, ) as e: print(f"Could not open {self._filename}: {e}") return False elif self._filename.name.lower().endswith(".mat"): try: self._data_array = convert_from_mat(self._filename) except Exception as e: print(f"Could not open {self._filename}: {e}") return False elif self._filename.name.lower().endswith(".pmx"): try: self._data_array = NativeArrayReader().read(self._filename) if self._data_array is None: print(f"Could not open {self._filename}.") return False except Exception as e: print(f"Could not open {self._filename}: {e}") return False else: print(f"Unexpected file {self._filename}.") return False # Store a logical array with the valid entries self._valid_entries = self._data_array["vld"] # Cache whether the data is 2D or 3D and whether is aggregated # The cases are different for localization vs. tracking experiments # num_locs = self._data_array["itr"].shape[1] self._is_3d = ( float(np.nanmean(self._data_array["itr"][:, -1]["loc"][:, -1])) != 0.0 ) # Set all relevant indices self._set_all_indices() # Return success return True def _process(self) -> Union[None, pd.DataFrame]: """Returns processed dataframe for valid (or invalid) entries. Returns ------- df: pd.DataFrame Processed data as DataFrame. """ # Do we have a data array to work on? if self._data_array is None: return None if self._valid: indices = self._valid_entries else: indices = np.logical_not(self._valid_entries) # Extract the valid iterations itr = self._data_array["itr"][indices] # Extract the valid identifiers tid = self._data_array["tid"][indices] # Extract the valid time points tim = self._data_array["tim"][indices] # Extract the fluorophore IDs fluo = self._data_array["fluo"][indices] if np.all(fluo) == 0: fluo = np.ones(fluo.shape, dtype=fluo.dtype) # The following extraction pattern will change whether the # acquisition is normal or aggregated if self.is_aggregated: # Extract the locations loc = itr["loc"].squeeze() * self._unit_scaling_factor loc[:, 2] = loc[:, 2] * self._z_scaling_factor # Extract EFO efo = itr["efo"] # Extract CFR cfr = itr["cfr"] # Extract ECO eco = itr["eco"] # Extract DCR dcr = itr["dcr"] # Dwell dwell = np.around((eco / (efo / 1000)) / self._dwell_time, decimals=0) else: # Extract the locations loc = itr[:, self._loc_index]["loc"] * self._unit_scaling_factor loc[:, 2] = loc[:, 2] * self._z_scaling_factor # Extract EFO efo = itr[:, self._efo_index]["efo"] # Extract CFR cfr = itr[:, self._cfr_index]["cfr"] # Extract ECO eco = itr[:, self._eco_index]["eco"] # Pool DCR values? 
if self._pool_dcr and np.sum(self._relocalizations) > 1: # Calculate ECO contributions eco_all = itr[:, self._relocalizations]["eco"] eco_sum = eco_all.sum(axis=1) eco_all_norm = eco_all / eco_sum.reshape(-1, 1) # Extract DCR values and weigh them by the relative ECO contributions dcr = itr[:, self._relocalizations]["dcr"] dcr = dcr * eco_all_norm dcr = dcr.sum(axis=1) else: # Extract DCR dcr = itr[:, self._dcr_index]["dcr"] # Calculate dwell dwell = np.around((eco / (efo / 1000)) / self._dwell_time, decimals=0) # Create a Pandas dataframe for the results df = pd.DataFrame( index=pd.RangeIndex(start=0, stop=len(tid)), columns=MinFluxReader.processed_properties(), ) # Store the extracted valid hits into the dataframe df["tid"] = tid df["x"] = loc[:, 0] df["y"] = loc[:, 1] df["z"] = loc[:, 2] df["tim"] = tim df["efo"] = efo df["cfr"] = cfr df["eco"] = eco df["dcr"] = dcr df["dwell"] = dwell df["fluo"] = fluo # Remove rows with NaNs in the loc matrix df = df.dropna(subset=["x"]) # Check if the selected indices correspond to the last valid iteration self._is_last_valid = bool( self._cfr_index == self._last_valid_cfr and self._efo_index == self._last_valid ) return df def _raw_data_to_full_dataframe(self) -> Union[None, pd.DataFrame]: """Return raw data arranged into a dataframe.""" if self._data_array is None: return None # Initialize output dataframe df = pd.DataFrame(columns=MinFluxReader.raw_properties()) # Allocate space for the columns n_rows = len(self._data_array) * self._reps # Get all unique TIDs and their counts _, tid_counts = np.unique(self._data_array["tid"], return_counts=True) # Get all tids (repeated over the repetitions) tid = np.repeat(self._data_array["tid"], self._reps) # Create virtual IDs to mark the measurements of repeated tids # @TODO Optimize this! aid = np.zeros((n_rows, 1), dtype=np.int32) index = 0 for c in np.nditer(tid_counts): tmp = np.repeat(np.arange(c), self._reps) n = len(tmp) aid[index : index + n, 0] = tmp index += n # Get all valid flags (repeated over the repetitions) vld = np.repeat(self._data_array["vld"], self._reps) # Get all timepoints (repeated over the repetitions) tim = np.repeat(self._data_array["tim"], self._reps) # Get all localizations (reshaped to drop the first dimension) loc = ( self._data_array["itr"]["loc"].reshape((n_rows, 3)) * self._unit_scaling_factor ) loc[:, 2] = loc[:, 2] * self._z_scaling_factor # Get all efos (reshaped to drop the first dimension) efo = self._data_array["itr"]["efo"].reshape((n_rows, 1)) # Get all cfrs (reshaped to drop the first dimension) cfr = self._data_array["itr"]["cfr"].reshape((n_rows, 1)) # Get all ecos (reshaped to drop the first dimension) eco = self._data_array["itr"]["eco"].reshape((n_rows, 1)) # Get all dcrs (reshaped to drop the first dimension) dcr = self._data_array["itr"]["dcr"].reshape((n_rows, 1)) # Build the dataframe df["tid"] = tid.astype(np.int32) df["aid"] = aid.astype(np.int32) df["vld"] = vld df["tim"] = tim df["x"] = loc[:, 0] df["y"] = loc[:, 1] df["z"] = loc[:, 2] df["efo"] = efo df["cfr"] = cfr df["eco"] = eco df["dcr"] = dcr return df def _set_all_indices(self): """Set indices of properties to be read.""" if self._data_array is None: return False # Number of iterations self._reps = self._data_array["itr"].shape[1] # Is this an aggregated acquisition? 
if self._reps == 1: self._is_aggregated = True else: self._is_aggregated = False # Query the data to find the last valid iteration # for all measurements last_valid = find_last_valid_iteration(self._data_array) # Set the extracted indices self._efo_index = last_valid["efo_index"] self._cfr_index = last_valid["cfr_index"] self._dcr_index = last_valid["dcr_index"] self._eco_index = last_valid["eco_index"] self._loc_index = last_valid["loc_index"] self._valid_cfr = last_valid["valid_cfr"] self._relocalizations = last_valid["reloc"] # Keep track of the last valid iteration self._last_valid = len(self._valid_cfr) - 1 self._last_valid_cfr = last_valid["cfr_index"] def __repr__(self) -> str: """String representation of the object.""" if self._data_array is None: return "No file loaded." str_valid = ( "all valid" if len(self._data_array) == self.num_valid_entries else f"{self.num_valid_entries} valid and {self.num_invalid_entries} non valid" ) str_acq = "3D" if self.is_3d else "2D" aggr_str = "aggregated" if self.is_aggregated else "normal" return ( f"File: {self._filename.name}: " f"{str_acq} {aggr_str} acquisition with {len(self._data_array)} entries ({str_valid})." ) def __str__(self) -> str: """Human-friendly representation of the object.""" return self.__repr__()
Static methods
def processed_properties() ‑> list
-
Returns the properties read from the file that correspond to the processed dataframe column names.
Expand source code
@classmethod def processed_properties(cls) -> list: """Returns the properties read from the file that correspond to the processed dataframe column names.""" return [ "tid", "tim", "x", "y", "z", "efo", "cfr", "eco", "dcr", "dwell", "fluo", ]
def raw_properties() ‑> list
-
Returns the properties, either read from the file or dynamically generated, that correspond to the raw dataframe column names.
Expand source code
@classmethod
def raw_properties(cls) -> list:
    """Returns the properties, either read from the file or dynamically generated, that correspond to the raw dataframe column names."""
    return ["tid", "aid", "vld", "tim", "x", "y", "z", "efo", "cfr", "eco", "dcr"]
Instance variables
var dwell_time : float
-
Returns the dwell time.
Expand source code
@property def dwell_time(self) -> float: """Returns the dwell time.""" return self._dwell_time
var filename : Optional[pathlib.Path]
-
Return the filename if set.
Expand source code
@property def filename(self) -> Union[Path, None]: """Return the filename if set.""" if self._filename is None: return None return Path(self._filename)
var is_3d : bool
-
Returns True if the acquisition is 3D, False otherwise.
Expand source code
@property
def is_3d(self) -> bool:
    """Returns True if the acquisition is 3D, False otherwise."""
    return self._is_3d
var is_aggregated : bool
-
Returns True if the acquisition is aggregated, False otherwise.
Expand source code
@property
def is_aggregated(self) -> bool:
    """Returns True if the acquisition is aggregated, False otherwise."""
    return self._is_aggregated
var is_last_valid : Optional[bool]
-
Return True if the selected iteration is the "last valid", False otherwise. If the dataframe has not been processed yet,
is_last_valid
will be None.
Expand source code
@property def is_last_valid(self) -> Union[bool, None]: """Return True if the selected iteration is the "last valid", False otherwise. If the dataframe has not been processed yet, `is_last_valid` will be None.""" if self._data_df is None: return None return self._is_last_valid
var is_pool_dcr : bool
-
Returns True if the DCR values are pooled over all relocalized iterations (to use all photons).
Expand source code
@property
def is_pool_dcr(self) -> bool:
    """Returns True if the DCR values are pooled over all relocalized iterations (to use all photons)."""
    return self._pool_dcr
var is_tracking : bool
-
Returns True for a tracking acquisition, False otherwise.
Expand source code
@property def is_tracking(self) -> bool: """Returns True for a tracking acquisition, False otherwise.""" return self._is_tracking
var num_invalid_entries : int
-
Number of invalid entries.
Expand source code
@property
def num_invalid_entries(self) -> int:
    """Number of invalid entries."""
    if self._data_array is None:
        return 0
    return np.logical_not(self._valid_entries).sum()
var num_valid_entries : int
-
Number of valid entries.
Expand source code
@property def num_valid_entries(self) -> int: """Number of valid entries.""" if self._data_array is None: return 0 return self._valid_entries.sum()
var processed_dataframe : Optional[pandas.DataFrame]
-
Return the processed data as a dataframe (some properties only).
Expand source code
@property
def processed_dataframe(self) -> Union[None, pd.DataFrame]:
    """Return the processed data as a dataframe (some properties only)."""
    if self._data_df is not None:
        return self._data_df
    self._data_df = self._process()
    return self._data_df
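As an illustration (the reader variable and the threshold value are hypothetical), the returned object is a regular pandas DataFrame with the columns listed by processed_properties():
df = reader.processed_dataframe          # built lazily on first access, then cached
xyz = df[["x", "y", "z"]].to_numpy()     # localization coordinates in nanometers
bright = df[df["eco"] > 100]             # keep rows above an illustrative photon-count threshold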
var raw_data_dataframe : Optional[pandas.DataFrame]
-
Return the raw data as dataframe (some properties only).
Expand source code
@property def raw_data_dataframe(self) -> Union[None, pd.DataFrame]: """Return the raw data as dataframe (some properties only).""" if self._data_full_df is not None: return self._data_full_df self._data_full_df = self._raw_data_to_full_dataframe() return self._data_full_df
var relocalizations : list
-
Return the iterations with relocalizations.
Returns
reloc: boolean array with True for the iteration indices that are relocalized.
Expand source code
@property def relocalizations(self) -> list: """Return the iterations with relocalizations. Returns ------- reloc: boolean array with True for the iteration indices that are relocalized. """ if self._data_array is None: return [] return self._relocalizations
var valid_cfr : list
-
Return the iterations with valid CFR measurements.
Returns
cfr: boolean array with True for the iteration indices that have a valid measurement.
Expand source code
@property def valid_cfr(self) -> list: """Return the iterations with valid CFR measurements. Returns ------- cfr: boolean array with True for the iteration indices that have a valid measurement. """ if self._data_array is None: return [] return self._valid_cfr
var valid_raw_data : Optional[numpy.ndarray]
-
Return the raw data.
Expand source code
@property def valid_raw_data(self) -> Union[None, np.ndarray]: """Return the raw data.""" if self._data_array is None: return None return self._data_array[self._valid_entries].copy()
var z_scaling_factor : float
-
Returns the scaling factor for the z coordinates.
Expand source code
@property def z_scaling_factor(self) -> float: """Returns the scaling factor for the z coordinates.""" return self._z_scaling_factor
Methods
def set_dwell_time(self, dwell_time: float, process: bool = True)
-
Sets the dwell time.
Parameters
dwell_time
:float
- Dwell time.
process
:bool (Optional, default = True)
- By default, when setting the dwell time, the data is rescanned and the dataframe is rebuilt. In case several properties of the MinFluxReader are modified sequentially, the processing can be disabled and run only once after the last change. However, this only applies after the first load/scan, when the processed dataframe has not been created yet. If the dataframe already exists, this flag will be ignored and the processing will take place.
Expand source code
def set_dwell_time(self, dwell_time: float, process: bool = True): """ Sets the dwell time. Parameters ---------- dwell_time: float Dwell time. process: bool (Optional, default = True) By default, when setting the dwell time, the data is rescanned and the dataframe is rebuilt. In case several properties of the MinFluxReader are modified sequentially, the processing can be disabled and run only once after the last change. However, this only applies after the first load/scan, when the processed dataframe has not been created yet. If the dataframe already exists, this flag will be ignored and the processing will take place. """ # Update the flag self._dwell_time = dwell_time # Re-process the file? if process or self._data_df is not None: self._process()
def set_indices(self, index, cfr_index, process: bool = True)
-
Set the parameter indices.
We distinguish between the index of all parameters that are always measured and accessed from the same iteration, and the cfr index, which is not always measured.
Parameters
index
:int
- Global iteration index for all parameters but cfr
cfr_index
:int
- Iteration index for cfr
process
:bool (Optional, default = True)
- By default, when setting the indices, the data is rescanned and the dataframe is rebuilt. In case several properties of the MinFluxReader are modified sequentially, the processing can be disabled and run only once after the last change. However, this only applies after the first load/scan, when the processed dataframe has not been created yet. If the dataframe already exists, this flag will be ignored and the processing will take place.
Expand source code
def set_indices(self, index, cfr_index, process: bool = True): """Set the parameter indices. We distinguish between the index of all parameters that are always measured and are accessed from the same iteration, and the cfr index, that is not always measured. Parameters ---------- index: int Global iteration index for all parameters but cfr cfr_index: int Iteration index for cfr process: bool (Optional, default = True) By default, when setting the indices, the data is rescanned and the dataframe is rebuilt. In case several properties of the MinFluxReader are modified sequentially, the processing can be disabled and run only once after the last change. However, this only applies after the first load/scan, when the processed dataframe has not been created yet. If the dataframe already exists, this flag will be ignored and the processing will take place. """ # Make sure there is loaded data if self._data_array is None: raise ValueError("No data loaded.") if self._reps == -1: raise ValueError("No data loaded.") if len(self._valid_cfr) == 0: raise ValueError("No data loaded.") # Check that the arguments are compatible with the loaded data if index < 0 or index > self._reps - 1: raise ValueError( f"The value of index must be between 0 and {self._reps - 1}." ) if cfr_index < 0 or cfr_index > len(self._valid_cfr) - 1: raise ValueError( f"The value of index must be between 0 and {len(self._valid_cfr) - 1}." ) # Now set the general values self._efo_index = index self._dcr_index = index self._eco_index = index self._loc_index = index # Set the cfr index self._cfr_index = cfr_index # Constant indices self._tid_index: int = 0 self._tim_index: int = 0 self._vld_index: int = 0 # Re-process the file? If the processed dataframe already exists, # the processing will take place anyway. if process or self._data_df is not None: self._process()
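A sketch combining set_indices() with valid_cfr and the process flag (the reader variable and parameter values are illustrative). Several setters can be batched with process=False and applied in one go, keeping in mind that once the processed dataframe exists the flag is ignored:
# Last iteration overall, and last iteration with a valid CFR measurement.
index = len(reader.valid_cfr) - 1
cfr_index = max(i for i, ok in enumerate(reader.valid_cfr) if ok)

# Batch changes before the first processing run ...
reader.set_tracking(True, process=False)
reader.set_dwell_time(2.0, process=False)

# ... and re-process only on the last change.
reader.set_indices(index, cfr_index, process=True)

df = reader.processed_dataframe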
def set_pool_dcr(self, pool_dcr: bool, process: bool = True)
-
Sets whether the DCR values should be pooled (and weighted by ECO).
Parameters
pool_dcr
:bool
- Whether the DCR values should be pooled (and weighted by ECO).
process
:bool (Optional, default = True)
- By default, when setting the DCR binning flag, the data is rescanned and the dataframe is rebuilt. In case several properties of the MinFluxReader are modified sequentially, the processing can be disabled and run only once after the last change. However, this only applies after the first load/scan, when the processed dataframe has not been created yet. If the dataframe already exists, this flag will be ignored and the processing will take place.
Expand source code
def set_pool_dcr(self, pool_dcr: bool, process: bool = True): """ Sets whether the DCR values should be pooled (and weighted by ECO). Parameters ---------- pool_dcr: bool Whether the DCR values should be pooled (and weighted by ECO). process: bool (Optional, default = True) By default, when setting the DCR binning flag, the data is rescanned and the dataframe is rebuilt. In case several properties of the MinFluxReader are modified sequentially, the processing can be disabled and run only once after the last change. However, this only applies after the first load/scan, when the processed dataframe has not been created yet. If the dataframe already exists, this flag will be ignored and the processing will take place. """ # Update the flag self._pool_dcr = pool_dcr # Re-process the file? if process or self._data_df is not None: self._process()
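The pooling weights each relocalized iteration's DCR value by that iteration's share of the total ECO (photon count), as in the processing code above. A small NumPy sketch with made-up numbers:
import numpy as np

eco = np.array([120.0, 300.0, 580.0])      # photon counts of the relocalized iterations (illustrative)
dcr = np.array([0.30, 0.35, 0.40])         # corresponding DCR values (illustrative)

weights = eco / eco.sum()                  # relative ECO contributions
pooled_dcr = float((dcr * weights).sum())  # 0.373 for these numbers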
def set_tracking(self, is_tracking: bool, process: bool = True)
-
Sets whether the acquisition is tracking or localization.
Parameters
is_tracking
:bool
- Set to True for a tracking acquisition, False for a localization acquisition.
process
:bool (Optional, default = True)
- By default, when setting the tracking flag, the data is rescanned and the dataframe is rebuilt. In case several properties of the MinFluxReader are modified sequentially, the processing can be disabled and run only once after the last change. However, this only applies after the first load/scan, when the processed dataframe has not been created yet. If the dataframe already exists, this flag will be ignored and the processing will take place.
Expand source code
def set_tracking(self, is_tracking: bool, process: bool = True): """Sets whether the acquisition is tracking or localization. Parameters ---------- is_tracking: bool Set to True for a tracking acquisition, False for a localization acquisition. process: bool (Optional, default = True) By default, when setting the tracking flag, the data is rescanned and the dataframe is rebuilt. In case several properties of the MinFluxReader are modified sequentially, the processing can be disabled and run only once after the last change. However, this only applies after the first load/scan, when the processed dataframe has not been created yet. If the dataframe already exists, this flag will be ignored and the processing will take place. """ # Update the flag self._is_tracking = is_tracking # Re-process the file? if process or self._data_df is not None: self._process()
class NativeArrayReader
-
Reads the native NumPy array from
.pmx
files.
Expand source code
class NativeArrayReader: """Reads the native NumPy array from `.pmx` files.""" @staticmethod def read(filename: Union[Path, str]): """Constructor. Parameters ---------- filename: Union[Path, str] Full path to the `.pmx` file to scan. """ # Open the file and read the data with h5py.File(filename, "r") as f: # Read the file_version attribute file_version = f.attrs["file_version"] if file_version != "1.0" and file_version != "2.0": return None # We only read the raw NumPy array data_array = f["raw/npy"][:] return data_array
Static methods
def read(filename: Union[pathlib.Path, str])
-
Read and return the raw NumPy array from a .pmx file.
Parameters
filename
:Union[Path, str]
- Full path to the
.pmx
file to scan.
Expand source code
@staticmethod def read(filename: Union[Path, str]): """Constructor. Parameters ---------- filename: Union[Path, str] Full path to the `.pmx` file to scan. """ # Open the file and read the data with h5py.File(filename, "r") as f: # Read the file_version attribute file_version = f.attrs["file_version"] if file_version != "1.0" and file_version != "2.0": return None # We only read the raw NumPy array data_array = f["raw/npy"][:] return data_array
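A minimal sketch (hypothetical file path); read() returns the raw structured NumPy array stored in the file, or None if the file version is not supported:
from pyminflux.reader import NativeArrayReader

data_array = NativeArrayReader.read("acquisition.pmx")  # hypothetical path
if data_array is not None:
    print(data_array.dtype.names)        # structured-array fields, e.g. "tid", "tim", "vld", "itr", "fluo"
    print(int(data_array["vld"].sum()))  # number of valid entries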
class NativeDataFrameReader
-
Reads the Pandas DataFrame from
.pmx
files.
Expand source code
class NativeDataFrameReader: """Reads the Pandas DataFrame from `.pmx` files.""" @staticmethod def read(filename: Union[Path, str]): """Constructor. Parameters ---------- filename: Union[Path, str] Full path to the `.pmx` file to scan. """ with h5py.File(filename, "r") as f: # Read the file_version attribute file_version = f.attrs["file_version"] if file_version != "1.0" and file_version != "2.0": return None # Read dataset dataset = f["/paraview/dataframe"] # Read the NumPy data data_array = dataset[:] # Read column names column_names = dataset.attrs["column_names"] # Read column data types column_types = dataset.attrs["column_types"] # Read the index index_data = f["/paraview/dataframe_index"][:] # Create DataFrame with specified columns df = pd.DataFrame(data_array, index=index_data, columns=column_names) # Apply column data types for col, dtype in zip(column_names, column_types): df[col] = df[col].astype(dtype) return df
Static methods
def read(filename: Union[pathlib.Path, str])
-
Read and return the Pandas DataFrame from a .pmx file.
Parameters
filename
:Union[Path, str]
- Full path to the
.pmx
file to scan.
Expand source code
@staticmethod def read(filename: Union[Path, str]): """Constructor. Parameters ---------- filename: Union[Path, str] Full path to the `.pmx` file to scan. """ with h5py.File(filename, "r") as f: # Read the file_version attribute file_version = f.attrs["file_version"] if file_version != "1.0" and file_version != "2.0": return None # Read dataset dataset = f["/paraview/dataframe"] # Read the NumPy data data_array = dataset[:] # Read column names column_names = dataset.attrs["column_names"] # Read column data types column_types = dataset.attrs["column_types"] # Read the index index_data = f["/paraview/dataframe_index"][:] # Create DataFrame with specified columns df = pd.DataFrame(data_array, index=index_data, columns=column_names) # Apply column data types for col, dtype in zip(column_names, column_types): df[col] = df[col].astype(dtype) return df
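Usage mirrors NativeArrayReader (hypothetical file path); column names and dtypes are restored from the dataset attributes:
from pyminflux.reader import NativeDataFrameReader

df = NativeDataFrameReader.read("acquisition.pmx")  # hypothetical path
if df is not None:
    print(df.columns.tolist())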
class NativeMetadataReader
-
Reads metadata information from
.pmx
files.
Expand source code
class NativeMetadataReader: """Reads metadata information from `.pmx` files.""" @staticmethod def scan(filename: Union[Path, str]): """Constructor. Parameters ---------- filename: Union[Path, str] Full path to the `.pmx` file to scan. """ # Open the file with h5py.File(filename, "r") as f: # Read the file_version attribute file_version = f.attrs["file_version"] if file_version != "1.0" and file_version != "2.0": return None # Version 1 parameters try: z_scaling_factor = float(f["parameters/z_scaling_factor"][()]) except KeyError: return None try: min_trace_length = int(f["parameters/min_trace_length"][()]) except KeyError: return None try: efo_thresholds = tuple(f["parameters/applied_efo_thresholds"][:]) except KeyError as e: efo_thresholds = None try: cfr_thresholds = tuple(f["parameters/applied_cfr_thresholds"][:]) except KeyError as e: cfr_thresholds = None try: num_fluorophores = int(f["parameters/num_fluorophores"][()]) except KeyError: return None # Version 2.0 parameters if file_version == "2.0": try: # This setting can be missing tr_len_thresholds = tuple( f["parameters/applied_tr_len_thresholds"][:] ) except KeyError as e: tr_len_thresholds = None try: dwell_time = float(f["parameters/dwell_time"][()]) except KeyError as e: return None try: # This setting can be missing time_thresholds = tuple(f["parameters/applied_time_thresholds"][:]) except KeyError as e: time_thresholds = None # HDF5 does not have a native boolean type, so we save as int8 and convert it # back to boolean on read. try: is_tracking = bool(f["parameters/is_tracking"][()]) except KeyError as e: return None try: pool_dcr = bool(f["parameters/pool_dcr"][()]) except KeyError as e: # This is an addendum to version 2.0, and we allow it to be missing. # It will fall back to False. pool_dcr = False try: scale_bar_size = float(f["parameters/scale_bar_size"][()]) except KeyError as e: return None else: tr_len_thresholds = None time_thresholds = None dwell_time = 1.0 is_tracking = False pool_dcr = False scale_bar_size = 500 # Store and return metadata = NativeMetadata( pool_dcr=pool_dcr, cfr_thresholds=cfr_thresholds, dwell_time=dwell_time, efo_thresholds=efo_thresholds, is_tracking=is_tracking, min_trace_length=min_trace_length, num_fluorophores=num_fluorophores, scale_bar_size=scale_bar_size, time_thresholds=time_thresholds, tr_len_thresholds=tr_len_thresholds, z_scaling_factor=z_scaling_factor, ) return metadata
Static methods
def scan(filename: Union[pathlib.Path, str])
-
Scan a .pmx file and return its metadata.
Parameters
filename
:Union[Path, str]
- Full path to the
.pmx
file to scan.
Expand source code
@staticmethod def scan(filename: Union[Path, str]): """Constructor. Parameters ---------- filename: Union[Path, str] Full path to the `.pmx` file to scan. """ # Open the file with h5py.File(filename, "r") as f: # Read the file_version attribute file_version = f.attrs["file_version"] if file_version != "1.0" and file_version != "2.0": return None # Version 1 parameters try: z_scaling_factor = float(f["parameters/z_scaling_factor"][()]) except KeyError: return None try: min_trace_length = int(f["parameters/min_trace_length"][()]) except KeyError: return None try: efo_thresholds = tuple(f["parameters/applied_efo_thresholds"][:]) except KeyError as e: efo_thresholds = None try: cfr_thresholds = tuple(f["parameters/applied_cfr_thresholds"][:]) except KeyError as e: cfr_thresholds = None try: num_fluorophores = int(f["parameters/num_fluorophores"][()]) except KeyError: return None # Version 2.0 parameters if file_version == "2.0": try: # This setting can be missing tr_len_thresholds = tuple( f["parameters/applied_tr_len_thresholds"][:] ) except KeyError as e: tr_len_thresholds = None try: dwell_time = float(f["parameters/dwell_time"][()]) except KeyError as e: return None try: # This setting can be missing time_thresholds = tuple(f["parameters/applied_time_thresholds"][:]) except KeyError as e: time_thresholds = None # HDF5 does not have a native boolean type, so we save as int8 and convert it # back to boolean on read. try: is_tracking = bool(f["parameters/is_tracking"][()]) except KeyError as e: return None try: pool_dcr = bool(f["parameters/pool_dcr"][()]) except KeyError as e: # This is an addendum to version 2.0, and we allow it to be missing. # It will fall back to False. pool_dcr = False try: scale_bar_size = float(f["parameters/scale_bar_size"][()]) except KeyError as e: return None else: tr_len_thresholds = None time_thresholds = None dwell_time = 1.0 is_tracking = False pool_dcr = False scale_bar_size = 500 # Store and return metadata = NativeMetadata( pool_dcr=pool_dcr, cfr_thresholds=cfr_thresholds, dwell_time=dwell_time, efo_thresholds=efo_thresholds, is_tracking=is_tracking, min_trace_length=min_trace_length, num_fluorophores=num_fluorophores, scale_bar_size=scale_bar_size, time_thresholds=time_thresholds, tr_len_thresholds=tr_len_thresholds, z_scaling_factor=z_scaling_factor, ) return metadata
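A sketch of scanning the stored analysis parameters (hypothetical file path). scan() returns a NativeMetadata object, or None if a required entry is missing or the file version is unsupported; the attribute names below are assumed to match the keyword arguments shown in the source above:
from pyminflux.reader import NativeMetadataReader

metadata = NativeMetadataReader.scan("acquisition.pmx")  # hypothetical path
if metadata is not None:
    print(metadata.z_scaling_factor, metadata.min_trace_length, metadata.num_fluorophores)
    print(metadata.is_tracking, metadata.pool_dcr, metadata.dwell_time)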