Module pyminflux.writer

Writer of processed MINFLUX data.

Expand source code
#  Copyright (c) 2022 - 2024 D-BSSE, ETH Zurich.
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.

__doc__ = "Writer of processed MINFLUX data."
__all__ = ["MinFluxWriter", "PyMinFluxNativeWriter"]

from ._native_writer import PyMinFluxNativeWriter
from ._writer import MinFluxWriter

Classes

class MinFluxWriter
Expand source code
class MinFluxWriter:
    __docs__ = "Writer of (filtered) MINFLUX data to either `.npy` or `.csv` formats."

    @staticmethod
    def write_npy(processor: MinFluxProcessor, file_name: Union[Path, str]) -> bool:
        """Write (filtered) data as a NumPy structured array with the same structure as in the Imspector `.npy` file."""

        # Get the raw data
        filtered_array = processor.filtered_numpy_array
        if filtered_array is None:
            return False

        # Save the filtered structured NumPy array to disk
        try:
            np.save(str(file_name), filtered_array)
        except Exception as e:
            print(f"Could not save {file_name}: {e}")
            return False

        return True

    @staticmethod
    def write_csv(processor: MinFluxProcessor, file_name: Union[Path, str]) -> bool:
        """Write (filtered) data as a comma-separated-value `.csv` file."""

        # Save the filtered dataframe to disk
        try:
            processor.filtered_dataframe.to_csv(file_name, index=False)
        except Exception as e:
            print(f"Could not save {file_name}: {e}")
            return False

        return True

Static methods

def write_csv(processor: pyminflux.processor.MinFluxProcessor, file_name: Union[pathlib.Path, str]) ‑> bool

Write (filtered) data as a comma-separated-value .csv file.

Expand source code
@staticmethod
def write_csv(processor: MinFluxProcessor, file_name: Union[Path, str]) -> bool:
    """Write (filtered) data as a comma-separated-value `.csv` file."""

    # Save the filtered dataframe to disk
    try:
        processor.filtered_dataframe.to_csv(file_name, index=False)
    except Exception as e:
        print(f"Could not save {file_name}: {e}")
        return False

    return True
def write_npy(processor: pyminflux.processor.MinFluxProcessor, file_name: Union[pathlib.Path, str]) ‑> bool

Write (filtered) data as a NumPy structured array with the same structure as in the Imspector .npy file.

Expand source code
@staticmethod
def write_npy(processor: MinFluxProcessor, file_name: Union[Path, str]) -> bool:
    """Write (filtered) data as a NumPy structured array with the same structure as in the Imspector `.npy` file."""

    # Get the raw data
    filtered_array = processor.filtered_numpy_array
    if filtered_array is None:
        return False

    # Save the filtered structured NumPy array to disk
    try:
        np.save(str(file_name), filtered_array)
    except Exception as e:
        print(f"Could not save {file_name}: {e}")
        return False

    return True
class PyMinFluxNativeWriter (processor: pyminflux.processor.MinFluxProcessor)
Expand source code
class PyMinFluxNativeWriter:
    __docs__ = "Writer of (processed) MINFLUX into native `.pmx` format."

    def __init__(self, processor: MinFluxProcessor):
        self.processor = processor
        self.state = State()
        self._message = ""

    def write(self, file_name: Union[Path, str]) -> bool:

        try:

            # Create HDF5 file with the structure
            with h5py.File(file_name, "w") as f:

                # Set file version attribute
                f.attrs["file_version"] = "2.0"

                # Create groups
                raw_data_group = f.create_group("raw")
                paraview_group = f.create_group("paraview")
                parameters_group = f.create_group("parameters")

                # Store the filtered numpy array (with fluorophores)
                raw_data_group.create_dataset(
                    "npy", data=self.processor.filtered_numpy_array, compression="gzip"
                )

                # Store the pandas dataframe: to make sure not to depend on additional
                # dependencies (like "tables") that are not bundled with ParaView, we
                # save the dataframe as a NumPy array and save the column names and types
                # as attributes.
                self._store_dataframe(paraview_group)

                # Store important parameters
                self._store_parameters(parameters_group)

            # No error
            self._message = ""

            # Return success
            return True

        except Exception as e:

            # Set the error message
            self._message = str(e)

            return False

    @property
    def message(self):
        """Return last error message."""
        return self._message

    def _store_dataframe(self, group):
        """Write the Pandas DataFrame in a way that it can be reloaded without external dependencies."""

        if self.processor.filtered_dataframe is None:
            return

        dataset = group.create_dataset(
            "dataframe",
            data=self.processor.filtered_dataframe.to_numpy(),
            compression="gzip",
        )

        # Convert the column names to a list of strings
        column_names = self.processor.filtered_dataframe.columns.tolist()

        # Convert column data types to a list of strings
        column_types = [
            str(self.processor.filtered_dataframe[col].dtype)
            for col in self.processor.filtered_dataframe.columns
        ]

        # Store the column names as an attribute of the dataset
        dataset.attrs["column_names"] = column_names

        # Store column data types as attribute of the dataset
        dataset.attrs["column_types"] = column_types

        # We preserve the index as well (as a Dataset, since it can be large)
        index_data = np.array(self.processor.filtered_dataframe.index)
        group.create_dataset("dataframe_index", data=index_data, compression="gzip")

    def _store_parameters(self, group):
        """Write important parameters."""

        group.create_dataset("z_scaling_factor", data=self.processor.z_scaling_factor)
        group.create_dataset("min_trace_length", data=self.processor.min_trace_length)
        if self.state.applied_efo_thresholds is not None:
            group.create_dataset(
                "applied_efo_thresholds",
                data=self.state.applied_efo_thresholds,
            )
        if self.state.applied_cfr_thresholds is not None:
            group.create_dataset(
                "applied_cfr_thresholds",
                data=self.state.applied_cfr_thresholds,
            )
        if self.state.applied_tr_len_thresholds is not None:
            group.create_dataset(
                "applied_tr_len_thresholds",
                data=self.state.applied_tr_len_thresholds,
            )
        if self.state.applied_time_thresholds is not None:
            group.create_dataset(
                "applied_time_thresholds",
                data=self.state.applied_time_thresholds,
            )
        group.create_dataset("num_fluorophores", data=self.processor.num_fluorophores)
        group.create_dataset("dwell_time", data=self.state.dwell_time)
        group.create_dataset("scale_bar_size", data=self.state.scale_bar_size)

        # HDF5 does not have a native boolean type, so we save as int8 and convert it
        # back to boolean on read.
        group.create_dataset("is_tracking", data=np.int8(self.state.is_tracking))
        group.create_dataset("pool_dcr", data=np.int8(self.state.pool_dcr))

Instance variables

var message

Return last error message.

Expand source code
@property
def message(self):
    """Return last error message."""
    return self._message

Methods

def write(self, file_name: Union[pathlib.Path, str]) ‑> bool
Expand source code
def write(self, file_name: Union[Path, str]) -> bool:

    try:

        # Create HDF5 file with the structure
        with h5py.File(file_name, "w") as f:

            # Set file version attribute
            f.attrs["file_version"] = "2.0"

            # Create groups
            raw_data_group = f.create_group("raw")
            paraview_group = f.create_group("paraview")
            parameters_group = f.create_group("parameters")

            # Store the filtered numpy array (with fluorophores)
            raw_data_group.create_dataset(
                "npy", data=self.processor.filtered_numpy_array, compression="gzip"
            )

            # Store the pandas dataframe: to make sure not to depend on additional
            # dependencies (like "tables") that are not bundled with ParaView, we
            # save the dataframe as a NumPy array and save the column names and types
            # as attributes.
            self._store_dataframe(paraview_group)

            # Store important parameters
            self._store_parameters(parameters_group)

        # No error
        self._message = ""

        # Return success
        return True

    except Exception as e:

        # Set the error message
        self._message = str(e)

        return False