refactor: extracted more writer builder work to class
jspaezp committed Dec 15, 2024
1 parent 692a9f2 commit d4d3e8c
Showing 8 changed files with 129 additions and 58 deletions.
5 changes: 4 additions & 1 deletion mokapot/brew.py
@@ -103,12 +103,15 @@ def brew(
model = PercolatorModel()

try:
# Q: what is this doing? Why does the random number
# generator get set only if the model has an estimator?
# Shouldn't it assign it to all the models if they are passed?
model.estimator
model.rng = rng
except AttributeError:
pass

# Check that all of the datasets have the same features:
feat_set = set(datasets[0].feature_columns)
if not all([
set(dataset.feature_columns) == feat_set for dataset in datasets
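The try/except the question above refers to is an EAFP guard: accessing `model.estimator` raises `AttributeError` for model-like objects that have no underlying estimator, so only those models receive the new `rng`. A minimal sketch of an equivalent explicit check (the `hasattr` form is an illustration, not what the commit uses):

    # hypothetical rewrite of the guard above; same behavior
    if hasattr(model, "estimator"):
        model.rng = rng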
133 changes: 98 additions & 35 deletions mokapot/confidence.py
@@ -352,54 +352,30 @@ def assign_confidence(
)

output_writers_factory = OutputWriterFactory(
level_manager.extra_output_columns,
extra_output_columns=level_manager.extra_output_columns,
is_sqlite=is_sqlite,
append_to_output_file=append_to_output_file,
write_decoys=write_decoys,
)

if prefixes is None:
prefixes = [None] * len(datasets)

level_input_output_column_mapping = level_manager.build_output_col_mapping(
curr_dataset
)

out = []

for dataset, score, prefix in strictzip(datasets, scores_list, prefixes):
# todo: nice to have: move this column renaming stuff into the
# column defs module, and further, have standardized columns
# directly from the pin reader (applying the renaming itself)

# Q: why is this done here? it seems constant, since all
# datasets have the same columns.
level_input_output_column_mapping = (
level_manager.build_output_col_mapping(dataset)
output_writers, file_prefix = output_writers_factory.build_writers(
level_manager
)

file_prefix = file_root
if prefix:
file_prefix = f"{file_prefix}{prefix}."

output_writers = {}
for level in level_manager.levels_or_proteins:
output_writers[level] = []

outfile_targets = (
dest_dir / f"{file_prefix}targets.{level}{file_ext}"
)

output_writers[level].append(
output_writers_factory.create_writer(
outfile_targets, level, not append_to_output_file
)
)

if write_decoys and not is_sqlite:
outfile_decoys = (
dest_dir / f"{file_prefix}decoys.{level}{file_ext}"
)
output_writers[level].append(
output_writers_factory.create_output_writer(
outfile_decoys, level, not append_to_output_file
)
)

score_reader = TabularDataReader.from_array(score, "score")
with create_sorted_file_reader(
dataset,
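With `append_to_output_file` and `write_decoys` now owned by the factory, the per-dataset loop body reduces to a single call. A hedged sketch of the new call pattern (the argument values are placeholders):

    factory = OutputWriterFactory(
        extra_output_columns=["ExpMass", "retentiontime"],  # placeholder columns
        is_sqlite=False,
        append_to_output_file=False,
        write_decoys=True,
    )
    # one dict of per-level writers plus the resolved file prefix
    output_writers, file_prefix = factory.build_writers(level_manager)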
@@ -566,7 +542,36 @@ def finalize(self):


class LevelManager:
"""Manages level-specific data and operations."""
"""Manages level-specific data and operations.
This class is meant to be used internally by the `Confidence` class.
Parameters
----------
level_columns : list of str
The columns that can be used to aggregate PSMs.
For example, peptides, modified peptides, precursors.
would generate "rollups" of the PSMs at the PSM (default)
and in addition to that, the peptide and modified peptide
columns would generate "peptide groups" of PSMs (each).
default_extension : str
The default extension to use for the output files.
The extension will be used to determine the output format
when initializing the `LevelWriterCollection` which internally
uses the `TabularDataWriter.from_suffix` method.
spectrum_columns : list of str
The columns that uniquely identify a mass spectrum.
do_rollup : bool
Do we apply rollup on peptides, modified peptides etc.?
use_proteins : bool
Whether to roll up protein-level confidence estimates.
dest_dir : Path
The directory in which to save the files.
file_root : str
The prefix added to all output file names.
The final file names will be:
`dest_dir / file_root+level+default_extension`
"""

def __init__(
self,
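A hedged sketch of the naming rule stated at the end of the docstring (the concrete values are invented for illustration):

    from pathlib import Path

    dest_dir, file_root = Path("results"), "run1."
    level, default_extension = "peptides", ".csv"
    out_path = dest_dir / f"{file_root}{level}{default_extension}"
    # out_path == Path("results/run1.peptides.csv")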
@@ -644,6 +649,8 @@ def _setup_level_paths(

def _setup_hash_columns(self) -> None:
"""Setup hash columns for each level."""

# Q: wouldnt the right thing here be to use spectrum_cols + peptide?
self.level_hash_columns = {"psms": self.spectrum_columns}
for level in self.levels[1:]:
if level != "proteins":
@@ -722,13 +729,24 @@ def build_output_col_mapping(self, dataset: PsmDataset) -> dict:
class OutputWriterFactory:
"""Factory class for creating output writers based on configuration."""

def __init__(self, extra_output_columns: list[str], is_sqlite: bool):
def __init__(
self,
*,
extra_output_columns: list[str],
is_sqlite: bool,
append_to_output_file: bool,
write_decoys: bool,
):
# Q: are we deleting the sqlite ops?
self.is_sqlite = is_sqlite
self.write_decoys = write_decoys
self.extra_output_columns = extra_output_columns
self.append_to_output_file = append_to_output_file
self.output_column_names = [
"PSMId",
"peptide",
*extra_output_columns,
# Q: should we prefix these with "mokapot"?
"score",
"q-value",
"posterior_error_prob",
@@ -750,6 +768,7 @@ def __repr__(self) -> str:

def create_writer(
self,
*,
path: Path,
level: str,
initialize: bool,
@@ -776,6 +795,50 @@
writer.initialize()
return writer

def build_writers(
self, level_manager: LevelManager, prefix: str | None = None
) -> tuple[dict[str, list], str]:
"""Build the per-level output writers and the resolved file prefix."""
output_writers = {}

file_prefix = level_manager.file_root
if prefix:
file_prefix = f"{file_prefix}{prefix}."

for level in level_manager.levels_or_proteins:
output_writers[level] = []

name = [
str(file_prefix),
"targets.",
str(level),
str(level_manager.default_extension),
]

outfile_targets = level_manager.dest_dir / "".join(name)

output_writers[level].append(
self.create_writer(
path=outfile_targets,
level=level,
initialize=not self.append_to_output_file,
)
)

if self.write_decoys and not self.is_sqlite:
outfile_decoys = level_manager.dest_dir / (
f"{file_prefix}decoys.{level}"
f"{level_manager.default_extension}"
)
output_writers[level].append(
self.create_writer(
path=outfile_decoys,
level=level,
initialize=not self.append_to_output_file,
)
)

return output_writers, file_prefix


@contextmanager
@typechecked
16 changes: 11 additions & 5 deletions mokapot/dataset.py
@@ -748,15 +748,15 @@ class OnDiskPsmDataset(PsmDataset):
def __init__(
self,
filename_or_reader: Path | TabularDataReader,
columns,
*,
target_column,
spectrum_columns,
peptide_column,
protein_column,
feature_columns,
metadata_columns,
metadata_column_types, # the columns+types could be a dict.
level_columns,
level_columns, # What is this supposed to be?
filename_column,
scan_column,
specId_column, # Why does this have different capitalization?
@@ -773,7 +773,10 @@ def __init__(
else:
self._reader = TabularDataReader.from_path(filename_or_reader)

columns = self.reader.get_column_names()
self.columns = columns
# Q: Why are columns asked for in the constructor,
# since we can read them from the reader?
self._target_column = target_column
self._peptide_column = peptide_column
self._protein_column = protein_column
Expand All @@ -791,7 +794,6 @@ def __init__(
self._specId_column = specId_column
self._spectra_dataframe = spectra_dataframe

columns = self.reader.get_column_names()
opt_cols = OptionalColumns(
filename=filename_column,
scan=scan_column,
@@ -832,7 +834,7 @@ def check_columns(columns):
check_column(self.expmass_column)
check_column(self.rt_column)
check_column(self.charge_column)
check_column(self.specId_column)
# check_column(self.specId_column)

def get_default_extension(self) -> str:
return self.reader.get_default_extension()
@@ -852,6 +854,10 @@ def peptides(self) -> pd.Series:

@property
def specId_column(self) -> str:
# I am thinking of removing this, since the "key"
# of a spectrum is all the columns that identify it uniquely,
# not this column, which might or might not be present.
return self._specId_column

@property
@@ -919,7 +925,7 @@ def __repr__(self) -> str:
rep += f"Expmass column: {self.expmass_column}\n"
rep += f"Rt column: {self.rt_column}\n"
rep += f"Charge column: {self.charge_column}\n"
rep += f"SpecId column: {self.specId_column}\n"
# rep += f"SpecId column: {self.specId_column}\n"
rep += f"Spectra DF: \n{spec_sec}\n"
return rep

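Because the constructor now reads the column names from the reader itself, callers no longer pre-read the file header. A hedged sketch of the intent (the file name is a placeholder; `TabularDataReader.from_path` and `get_column_names` are the calls used in the diff above):

    from pathlib import Path

    reader = TabularDataReader.from_path(Path("psms.pin"))
    # the dataset now asks the reader for its header internally
    header = reader.get_column_names()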
9 changes: 8 additions & 1 deletion mokapot/model.py
@@ -224,6 +224,8 @@ def decision_function(self, dataset: LinearPsmDataset):
numpy.ndarray
A :py:class:`numpy.ndarray` containing the score for each PSM.
"""
# Q: we should rename this method to "score_dataset" ...
# ... or just remove it, since it is redundant with `predict`
if not self.is_trained:
raise NotFittedError("This model is untrained. Run fit() first.")

@@ -558,6 +560,10 @@ def _get_starting_labels(dataset: LinearPsmDataset, model):
feat_pass : int
The number of passing PSMs with the best feature.
"""

# Note: This function does much more than getting the starting
# labels; we should at least rename it to something more descriptive.
# JSPP 2024-12-14
LOGGER.debug("Finding initial direction...")
if model.direction is None and not model.is_trained:
feat_res = dataset._find_best_feature(model.train_fdr)
@@ -656,7 +662,7 @@ def _find_hyperparameters(model, features, labels):
return new_est


def _get_weights(model, features):
def _get_weights(model, features) -> list[str] | None:
"""
If the model is a linear model, parse the weights to a list of strings.
@@ -680,6 +686,7 @@ def _get_weights(model, features):
assert len(intercept) == 1
weights = list(weights.flatten())
except (AttributeError, AssertionError):
LOGGER.debug("No coefficients in the current model.")
return None

col_width = max([len(f) for f in features]) + 2
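The new return annotation on `_get_weights` reflects its two outcomes: formatted weight strings for a linear estimator, or `None` when the estimator has no coefficients. A hedged, standalone sketch of the underlying scikit-learn attribute access (not mokapot code):

    import numpy as np
    from sklearn.linear_model import LogisticRegression

    est = LogisticRegression().fit(np.random.rand(20, 3), np.repeat([0, 1], 10))
    weights = list(est.coef_.flatten())  # one weight per feature
    intercept = est.intercept_  # length-1 array for a binary problem
    # an estimator without coef_ (e.g. a random forest) raises AttributeError,
    # which is why _get_weights returns None in that case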
3 changes: 2 additions & 1 deletion mokapot/parsers/pin.py
@@ -221,6 +221,8 @@ def read_percolator(
chunk_size=CHUNK_SIZE_COLUMNS_FOR_DROP_COLUMNS,
)
df_spectra_list = []
# Q: this really feels like a bad idea ... concurrent mutation of a list
# whose elements are dataframes that are concurrently mutated in-place.
features_to_drop = Parallel(n_jobs=max_workers, require="sharedmem")(
delayed(drop_missing_values_and_fill_spectra_dataframe)(
reader=reader,
@@ -252,7 +254,6 @@

return OnDiskPsmDataset(
perc_file,
columns=columns,
target_column=labels,
spectrum_columns=spectra,
peptide_column=peptides,
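The concern above is about joblib's `require="sharedmem"` option, which forces a thread-based backend so the delayed calls can mutate `df_spectra_list` and the readers' dataframes in place. A hedged toy sketch contrasting that with the usual style of collecting return values instead of mutating shared state:

    from joblib import Parallel, delayed

    def square(x):
        return x * x  # no shared state is mutated

    results = Parallel(n_jobs=2, require="sharedmem")(
        delayed(square)(x) for x in range(4)
    )
    assert results == [0, 1, 4, 9]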
5 changes: 0 additions & 5 deletions tests/conftest.py
@@ -208,8 +208,6 @@ def psms_ondisk() -> OnDiskPsmDataset:
usecols=["ScanNr", "ExpMass", "Label"],
)
# Q: why is the exp mass in the spectra dataframe?
with open(filename) as perc:
columns = perc.readline().rstrip().split("\t")
psms = OnDiskPsmDataset(
filename,
target_column="Label",
@@ -251,7 +249,6 @@ def psms_ondisk() -> OnDiskPsmDataset:
filename_column=None,
specId_column="SpecId",
spectra_dataframe=df_spectra,
columns=columns,
)
return psms

@@ -264,7 +261,6 @@ def psms_ondisk_from_parquet() -> OnDiskPsmDataset:
filename, columns=["ScanNr", "ExpMass", "Label"]
).to_pandas()
df_spectra = convert_targets_column(df_spectra, "Label")
columns = pq.ParquetFile(filename).schema.names
psms = OnDiskPsmDataset(
filename,
target_column="Label",
@@ -309,7 +305,6 @@ def psms_ondisk_from_parquet() -> OnDiskPsmDataset:
filename_column=None,
specId_column="SpecId",
spectra_dataframe=df_spectra,
columns=columns,
)
return psms

4 changes: 0 additions & 4 deletions tests/unit_tests/test_confidence.py
@@ -34,7 +34,6 @@ def test_chunked_assign_confidence(psm_df_1000, tmp_path):
# incorrectly (namely the last and before last)

pin_file, df, _, score_cols = psm_df_1000
columns = list(pd.read_csv(pin_file, sep="\t").columns)
df_spectra = pd.read_csv(
pin_file, sep="\t", usecols=["scannr", "expmass", "target"]
)
@@ -51,7 +50,6 @@ def test_chunked_assign_confidence(psm_df_1000, tmp_path):
expmass_column="expmass",
rt_column="ret_time",
charge_column="charge",
columns=columns,
protein_column="proteins",
metadata_columns=[
"specid",
@@ -143,7 +141,6 @@ def test_assign_confidence_parquet(psm_df_1000_parquet, tmp_path):
"""Test that assign_confidence() works with parquet files."""

parquet_file, df, _ = psm_df_1000_parquet
columns = pq.ParquetFile(parquet_file).schema.names
df_spectra = pq.read_table(
parquet_file, columns=["scannr", "expmass", "target"]
).to_pandas()
@@ -160,7 +157,6 @@ def test_assign_confidence_parquet(psm_df_1000_parquet, tmp_path):
expmass_column="expmass",
rt_column="ret_time",
charge_column="charge",
columns=columns,
protein_column="proteins",
metadata_columns=[
"specid",