import numpy as np
import dill as pickle
from hic3defdr.util.matrices import select_matrix
[docs]class CoreHiC3DeFDR(object):
"""
Mixin class providing core saving and loading functionality for HiC3DeFDR.
"""
@property
def picklefile(self):
return '%s/pickle' % self.outdir
[docs] @classmethod
def load(cls, outdir):
"""
Loads a HiC3DeFDR analysis object from disk.
It is safe to have multiple instances of the same analysis open at once.
Parameters
----------
outdir : str
Folder path to where the HiC3DeFDR was saved.
Returns
-------
HiC3DeFDR
The loaded object.
"""
with open('%s/pickle' % outdir, 'rb') as handle:
return cls(outdir=outdir, **pickle.load(handle))
[docs] def load_bias(self, chrom):
"""
Loads the bias matrix for one chromosome.
The rows of the bias matrix correspond to bin indices along the
chromosome. The columns correspond to the replicates.
The bias factors for bins that fail ``bias_thresh`` are set to zero.
This is designed so that all pixels in these bins get dropped during
union pixel set computation.
Parameters
----------
chrom : str
The name of the chromosome to load the bias matrix for.
Returns
-------
np.ndarray
The bias matrix.
"""
bias = np.array([np.loadtxt(pattern.replace('<chrom>', chrom))
for pattern in self.bias_patterns]).T
bias[(np.any(bias < self.bias_thresh, axis=1)) |
(np.any(bias > 1. / self.bias_thresh, axis=1)), :] = 0
return bias
[docs] def load_data(self, name, chrom=None, idx=None, rep=None, cond=None,
coo=False):
"""
Loads arbitrary data for one chromosome or all chromosomes.
Parameters
----------
name : str
The name of the data to load.
chrom : str, optional
The name of the chromosome to load data for. Pass None if this data
is not organized by chromosome. Pass 'all' to load data for all
chromosomes.
idx : np.ndarray or tuple of np.ndarray, optional
Pass a boolean array to load only a subset of the data. Pass a tuple
of exactly two boolean arrays to chain the indices.
rep : str, optional
If the data is a (pixels, reps) rectangular array, pass the name of
a rep to get a (pixels,) vector of data for just that rep.
cond : str, optional
If the data is a (pixels, conds) rectangular array, pass the name of
a condition to get a (pixels,) vector of data for just that
condition.
coo : bool
Pass True to return a COO-format ``(row, col, data)`` tuple of
``np.ndarray``. ``chrom`` cannot be 'all', and ``idx`` must be None
(this function will handle filtering automatically).
Returns
-------
data or (concatenated_data, offsets) or (row, col, data) : np.ndarray
The loaded data for one chromosome, or a tuple of the concatenated
data and an array of offsets per chromosome. ``offsets`` satisfies
the following properties: ``offsets[0] == 0``,
``offsets[-1] == concatenated_data.shape[0]``, and
``concatenated_data[offsets[i]:offsets[i+1], :]`` contains the data
for the ``i``th chromosome. If called with ``coo=True``, this
function returns a COO-format ``(row, col, data)`` tuple of
``np.ndarray``.
"""
# short-circuit when asking for loop_idx when no loop_patterns exist
if name == 'loop_idx' and self.loop_patterns is None and idx is None \
and chrom != 'all':
disp_idx = np.load_data('disp_idx', chrom)
return np.ones(disp_idx.sum(), dtype=bool)
# resolve col_idx
col_idx = self.design.index.tolist().index(rep) if rep is not None \
else self.design.columns.tolist().index(cond) if cond is not None \
else None
# handle coo
if coo:
if chrom == 'all' or idx is not None:
raise ValueError("cannot pass coo=True with chrom='all' or idx")
if name in ['row', 'col', 'bias', 'cov_per_bin', 'disp_per_bin']:
raise ValueError('data with name %s cannot be loaded as COO'
% name)
if name in ['raw', 'size_factors', 'scaled', 'disp_idx']:
row = self.load_data('row', chrom)
col = self.load_data('col', chrom)
elif name in ['loop_idx', 'disp', 'mu_hat_null', 'mu_hat_alt',
'llr', 'pvalues']:
disp_idx = self.load_data('disp_idx', chrom)
row = self.load_data('row', chrom, idx=disp_idx)
col = self.load_data('col', chrom, idx=disp_idx)
elif name in ['qvalues']:
disp_idx = self.load_data('disp_idx', chrom)
loop_idx = self.load_data('loop_idx', chrom)
row = self.load_data('row', chrom, idx=(disp_idx, loop_idx))
col = self.load_data('col', chrom, idx=(disp_idx, loop_idx))
else:
raise ValueError('data name %s not recognized' % name)
data = self.load_data(name, chrom)
if col_idx is not None:
return row, col, data[:, col_idx]
return row, col, data
# index chaining
if type(idx) == tuple:
big_idx, small_idx = idx
big_idx = big_idx.copy()
big_idx[np.where(big_idx)[0][~small_idx]] = False
idx = big_idx
# tackle simple cases first
if chrom is None:
fname = '%s/%s.npy' % (self.outdir, name)
elif chrom != 'all':
fname = '%s/%s_%s.npy' % (self.outdir, name, chrom)
else:
fname = None
if fname is not None:
if idx is None:
data = np.load(fname)
if col_idx is not None:
return data[:, col_idx]
return data
else:
data = np.load(fname, mmap_mode='r')
if col_idx is not None:
return data[idx, col_idx]
return data[idx]
# idx is genome-wide, this tracks where we are in idx so that we can
# find a subset of idx that aligns with the current chrom
idx_offset = 0
# list of data arrays per chromosome
all_data = []
# running total of the sizes of the elements of all data
offset = 0
# saves value of offset after each chrom
offsets = [0]
# loop over chroms
for chrom in self.chroms:
fname = '%s/%s_%s.npy' % (self.outdir, name, chrom)
if idx is not None:
data = np.load(fname, mmap_mode='r')
full_data_size = data.shape[0]
data = data[idx[idx_offset:idx_offset+full_data_size]]
idx_offset += full_data_size
else:
data = np.load(fname)
offset += data.shape[0]
offsets.append(offset)
all_data.append(data)
all_data = np.concatenate(all_data)
if col_idx is not None:
return all_data[:, col_idx], np.array(offsets)
return all_data, np.array(offsets)
[docs] def save_data(self, data, name, chrom=None):
"""
Saves arbitrary data for one chromosome to disk.
Parameters
----------
data : np.ndarray
The data to save.
name : str
The name of the data to save.
chrom : str or np.ndarray, optional
The name of the chromosome to save data for, or None if this data is
not organized by chromosome. Pass an np.ndarray of offsets to save
data for all chromosomes.
"""
if chrom is None:
np.save('%s/%s.npy' % (self.outdir, name), data)
elif isinstance(chrom, np.ndarray):
for i, c in enumerate(self.chroms):
self.save_data(data[chrom[i]:chrom[i + 1]], name, c)
else:
np.save('%s/%s_%s.npy' % (self.outdir, name, chrom), data)
[docs] def load_disp_fn(self, cond):
"""
Loads the fitted dispersion function for a specific condition from disk.
Parameters
----------
cond : str
The condition to load the dispersion function for.
Returns
-------
function
Vectorized. Takes in the value of the covariate the dispersion was
fitted against and returns the appropriate dispersion.
"""
picklefile = '%s/disp_fn_%s.pickle' % (self.outdir, cond)
with open(picklefile, 'rb') as handle:
return pickle.load(handle)
[docs] def save_disp_fn(self, cond, disp_fn):
"""
Saves the fitted dispersion function for a specific condition and
chromosome to disk.
Parameters
----------
cond : str
The condition to save the dispersion function for.
disp_fn : function
The dispersion function to save.
"""
picklefile = '%s/disp_fn_%s.pickle' % (self.outdir, cond)
with open(picklefile, 'wb') as handle:
return pickle.dump(disp_fn, handle, -1)
[docs] def get_matrix(self, name, chrom, row_slice, col_slice, rep=None,
cond=None):
"""
Loads data with ``name`` as a dense matrix specified by ``chrom,
row_slice, col_slice``.
Parameters
----------
name : str
The name (stage) of the data to load. Add a special suffix '_mean'
to average per-rep stages within condtiions.
chrom : str
The chromosome to select data from.
row_slice, col_slice : slice
Row and column slice to use, respectively.
rep, cond : str, optional
Pass the rep name or condition name if the data specified by
``name`` has multiple columns.
Returns
-------
np.ndarray
The selected dense matrix.
"""
if name.endswith('_mean'):
reps = self.design[self.design[cond]].index.tolist()
return np.mean(
[self.get_matrix(name.strip('_mean'), chrom, row_slice,
col_slice, rep=r)
for r in reps],
axis=0
)
return select_matrix(
row_slice,
col_slice,
*self.load_data(name, chrom, rep=rep, cond=cond, coo=True)
)