Source code for hic3defdr.analysis.analysis

import os

import numpy as np
import scipy.sparse as sparse
import pandas as pd

from lib5c.util.statistics import adjust_pvalues

import hic3defdr.util.scaling as scaling
import hic3defdr.util.dispersion as dispersion
from hic3defdr.util.printing import eprint
from hic3defdr.util.matrices import sparse_union
from hic3defdr.util.clusters import load_clusters, save_clusters
from hic3defdr.util.cluster_table import clusters_to_table, \
    load_cluster_table, sort_cluster_table
from hic3defdr.util.lowess import lowess_fit, weighted_lowess_fit
from hic3defdr.util.lrt import lrt
from hic3defdr.util.thresholding import threshold_and_cluster, size_filter
from hic3defdr.util.classification import classify
from hic3defdr.util.progress import tqdm_maybe as tqdm
from hic3defdr.util.parallelization import parallel_apply, parallel_map


class AnalyzingHiC3DeFDR(object):
    """
    Mixin class containing analysis functions for HiC3DeFDR.
    """
    def prepare_data(self, chrom=None, norm='conditional_mor', n_bins=-1,
                     n_threads=-1, verbose=True):
        """
        Prepares raw and normalized data for analysis.

        Parameters
        ----------
        chrom : str
            The name of the chromosome to prepare raw data for. Pass None to
            run for all chromosomes in series.
        norm : str
            The method to use to account for differences in sequencing depth.
            Valid options are:

                - simple_scaling: scale each replicate to equal total depth
                - median_of_ratios: use median of ratios normalization,
                  ignoring pixels at which any replicate has a zero
                - conditional_scaling: apply simple scaling independently at
                  each distance scale
                - conditional_mor: apply median of ratios independently at
                  each distance scale

        n_bins : int, optional
            Number of distance bins to use during scaling normalization if
            ``norm`` is one of the conditional options. Pass 0 or None to
            match pixels by exact distance. Pass -1 to use a reasonable
            default: 1/5 of ``self.dist_thresh_max``.
        n_threads : int
            The number of threads (technically GIL-avoiding child processes)
            to use to process multiple chromosomes in parallel. Pass -1 to use
            as many threads as there are CPUs. Pass 0 to process the
            chromosomes serially.
        verbose : bool
            Pass False to silence reporting of progress to stderr.
        """
        if n_bins == -1:
            n_bins = int(self.dist_thresh_max / 5)
        if chrom is None:
            if n_threads:
                parallel_apply(
                    self.prepare_data,
                    [{'chrom': c, 'norm': norm, 'n_bins': n_bins,
                      'verbose': False} for c in self.chroms],
                    n_threads=n_threads
                )
            else:
                for chrom in self.chroms:
                    self.prepare_data(chrom=chrom, norm=norm, n_bins=n_bins)
            return
        eprint('preparing data for chrom %s' % chrom)
        eprint('  loading bias', skip=not verbose)
        bias = self.load_bias(chrom)

        eprint('  computing union pixel set', skip=not verbose)
        row, col = sparse_union(
            [pattern.replace('<chrom>', chrom)
             for pattern in self.raw_npz_patterns],
            dist_thresh=self.dist_thresh_max,
            bias=bias
        )

        eprint('  loading raw data', skip=not verbose)
        raw = np.zeros((len(row), len(self.raw_npz_patterns)), dtype=int)
        for i, pattern in enumerate(self.raw_npz_patterns):
            raw[:, i] = sparse.load_npz(pattern.replace('<chrom>', chrom)) \
                .tocsr()[row, col]

        eprint('  loading balanced data', skip=not verbose)
        balanced = np.zeros((len(row), len(self.raw_npz_patterns)),
                            dtype=float)
        for r, pattern in enumerate(self.raw_npz_patterns):
            balanced[:, r] = sparse.load_npz(pattern.replace('<chrom>', chrom)) \
                .tocsr()[row, col] / (bias[row, r] * bias[col, r])

        eprint('  computing size factors', skip=not verbose)
        if 'conditional' in norm:
            size_factors = scaling.__dict__[norm](balanced, col - row,
                                                  n_bins=n_bins)
        else:
            size_factors = scaling.__dict__[norm](balanced)
        scaled = balanced / size_factors

        eprint('  computing disp_idx', skip=not verbose)
        dist = col - row
        mean = np.dot(scaled, self.design) / np.sum(self.design, axis=0).values
        disp_idx = np.all(mean >= self.mean_thresh, axis=1) & \
            (dist >= self.dist_thresh_min)

        if self.loop_patterns:
            eprint('  making loop_idx', skip=not verbose)
            loop_pixels = set().union(
                *sum((load_clusters(pattern.replace('<chrom>', chrom))
                      for pattern in self.loop_patterns.values()), []))
            loop_idx = np.array([True if pixel in loop_pixels else False
                                 for pixel in zip(row[disp_idx],
                                                  col[disp_idx])])
            self.save_data(loop_idx, 'loop_idx', chrom)

        eprint('  saving data to disk', skip=not verbose)
        self.save_data(row, 'row', chrom)
        self.save_data(col, 'col', chrom)
        self.save_data(raw, 'raw', chrom)
        self.save_data(size_factors, 'size_factors', chrom)
        self.save_data(scaled, 'scaled', chrom)
        self.save_data(disp_idx, 'disp_idx', chrom)

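    # Note: a hedged summary of this step's on-disk intermediates, inferred
    # from the save_data() calls above. For each chromosome, prepare_data()
    # writes 'row', 'col', 'raw', 'size_factors', 'scaled', and 'disp_idx'
    # (plus 'loop_idx' when loop_patterns were supplied); downstream steps
    # reload these via self.load_data(). An illustrative call (the object
    # name 'h' is hypothetical) might be:
    #
    #     h.prepare_data(chrom='chr18', norm='conditional_mor')
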
    def estimate_disp(self, estimator='qcml', frac=None, auto_frac_factor=15.,
                      weighted_lowess=True, n_threads=-1):
        """
        Estimates dispersion parameters.

        Parameters
        ----------
        estimator : 'cml', 'qcml', 'mme', or a function
            Pass 'cml', 'qcml', 'mme' to use conditional maximum likelihood
            (CML), quantile-adjusted CML (qCML), or method of moments
            estimation (MME) to estimate the dispersion within each bin. Pass
            a function that takes in a (pixels, replicates) shaped array of
            data and returns a dispersion value to use that instead.
        frac : float, optional
            The lowess smoothing fraction to use when fitting the distance vs
            dispersion trend. Pass None to choose a value automatically.
        auto_frac_factor : float
            When ``frac`` is None, this factor scales the automatically
            determined fraction parameter.
        weighted_lowess : bool
            Whether or not to use a weighted lowess fit when fitting the
            smoothed dispersion curve.
        n_threads : int
            The number of threads (technically GIL-avoiding child processes)
            to use to process multiple distance scales in parallel. Pass -1 to
            use as many threads as there are CPUs. Pass 0 to process the
            distance scales serially.
        """
        eprint('estimating dispersion')
        estimator = dispersion.__dict__[estimator] \
            if estimator in dispersion.__dict__ else estimator
        lowess_fn = weighted_lowess_fit if weighted_lowess else lowess_fit

        eprint('  loading data')
        disp_idx, disp_idx_offsets = self.load_data('disp_idx', 'all')
        row, offsets = self.load_data('row', 'all', idx=disp_idx)
        col, _ = self.load_data('col', 'all', idx=disp_idx)
        raw, _ = self.load_data('raw', 'all', idx=disp_idx)
        dist = col - row

        f = np.ones_like(raw, dtype=float)
        for i, chrom in enumerate(self.chroms):
            chrom_slice = slice(offsets[i], offsets[i+1])
            row_chrom = row[chrom_slice]
            col_chrom = col[chrom_slice]
            disp_idx_chrom = disp_idx[
                disp_idx_offsets[i]:disp_idx_offsets[i+1]]
            bias = self.load_bias(chrom)
            size_factors = self.load_data('size_factors', chrom)[
                disp_idx_chrom]
            f[chrom_slice] = bias[row_chrom, :] * bias[col_chrom, :] \
                * size_factors

        def _estimate(raw_slice, f_slice):
            return np.nan if not raw_slice.size \
                else estimator(raw_slice, f=f_slice)

        disp_per_dist = np.zeros((self.dist_thresh_max+1,
                                  self.design.shape[1]))
        disp = np.zeros((disp_idx.sum(), self.design.shape[1]))
        for c, cond in enumerate(self.design.columns):
            eprint('  estimating dispersion for condition %s' % cond)
            if n_threads:
                disp_per_dist[:, c] = parallel_map(
                    _estimate,
                    [{'raw_slice': raw[dist == d, :][:, self.design[cond]],
                      'f_slice': f[dist == d, :][:, self.design[cond]]}
                     for d in range(self.dist_thresh_max+1)],
                    n_threads=n_threads
                )
            else:
                for d in tqdm(range(self.dist_thresh_max+1)):
                    raw_slice = raw[dist == d, :][:, self.design[cond]]
                    f_slice = f[dist == d, :][:, self.design[cond]]
                    disp_per_dist[d, c] = np.nan if not raw_slice.size \
                        else estimator(raw_slice, f=f_slice)

            eprint('  fitting distance vs dispersion relationship')
            idx = np.isfinite(disp_per_dist[:, c])
            x = np.arange(self.dist_thresh_max+1)[idx]
            y = disp_per_dist[:, c][idx]
            lowess_kwargs = {'left_boundary': y[0]}
            if frac is not None:
                lowess_kwargs['frac'] = frac
            if weighted_lowess:
                lowess_kwargs['auto_frac_factor'] = auto_frac_factor
            disp_fn = lowess_fn(x, y, **lowess_kwargs)
            disp[:, c] = disp_fn(dist)
            self.save_disp_fn(cond, disp_fn)

        eprint('  saving estimated dispersions to disk')
        self.save_data(disp, 'disp', offsets)
        self.save_data(disp_per_dist, 'disp_per_dist')

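    # Note: a hedged sketch of plugging in a custom dispersion estimator.
    # The docstring above allows passing a function instead of 'cml', 'qcml',
    # or 'mme'; from the call site (estimator(raw_slice, f=f_slice)) it must
    # accept a (pixels, replicates) count array plus an ``f`` keyword of
    # matching shape and return a single dispersion value. The helper below
    # is purely illustrative (a naive method-of-moments estimate under
    # var = mu + disp * mu**2), not part of hic3defdr:
    #
    #     def naive_mme(raw_slice, f=None):
    #         x = raw_slice / f if f is not None else raw_slice
    #         m, v = x.mean(), x.var(ddof=1)
    #         return max((v - m) / m**2, 0.0) if m > 0 else np.nan
    #
    #     h.estimate_disp(estimator=naive_mme)
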
    def lrt(self, chrom=None, refit_mu=True, n_threads=-1, verbose=True):
        """
        Runs the likelihood ratio test to test for differential interactions.

        Parameters
        ----------
        chrom : str
            The name of the chromosome to run the LRT for. Pass None to run
            for all chromosomes in series.
        refit_mu : bool
            Pass True to refit the mean parameters in the NB models being
            compared in the LRT. Pass False to use the means across replicates
            directly, which is simpler and slightly faster but technically
            violates the assumptions of the LRT.
        n_threads : int
            The number of threads (technically GIL-avoiding child processes)
            to use to process multiple chromosomes in parallel. Pass -1 to use
            as many threads as there are CPUs. Pass 0 to process the
            chromosomes serially.
        verbose : bool
            Pass False to silence reporting of progress to stderr.
        """
        if chrom is None:
            if n_threads:
                parallel_apply(
                    self.lrt,
                    [{'chrom': c, 'refit_mu': refit_mu, 'verbose': False}
                     for c in self.chroms],
                    n_threads=n_threads
                )
            else:
                for chrom in self.chroms:
                    self.lrt(chrom=chrom, refit_mu=refit_mu)
            return
        eprint('running LRT for chrom %s' % chrom)
        eprint('  loading data', skip=not verbose)
        bias = self.load_bias(chrom)
        # we load size_factors the slow way because we don't know in advance
        # if it's 2 dimensional or not
        size_factors = self.load_data('size_factors', chrom)
        disp_idx = self.load_data('disp_idx', chrom)
        row = self.load_data('row', chrom, idx=disp_idx)
        col = self.load_data('col', chrom, idx=disp_idx)
        raw = self.load_data('raw', chrom, idx=disp_idx)
        disp = self.load_data('disp', chrom)

        eprint('  computing LRT results', skip=not verbose)
        if len(size_factors.shape) == 2:
            f = bias[row] * bias[col] * size_factors[disp_idx, :]
        else:
            f = bias[row] * bias[col] * size_factors
        pvalues, llr, mu_hat_null, mu_hat_alt = lrt(
            raw, f, np.dot(disp, self.design.values.T), self.design.values,
            refit_mu=refit_mu)

        eprint('  saving results to disk', skip=not verbose)
        self.save_data(pvalues, 'pvalues', chrom)
        self.save_data(llr, 'llr', chrom)
        self.save_data(mu_hat_null, 'mu_hat_null', chrom)
        self.save_data(mu_hat_alt, 'mu_hat_alt', chrom)

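    # Note: a hedged reminder of the generic likelihood ratio test machinery
    # this step relies on; the actual per-pixel NB fitting and p-value
    # computation live in hic3defdr.util.lrt, and this sketch is standard
    # LRT theory rather than that module's exact convention. In general,
    # llr = 2 * (loglik_alt - loglik_null) is compared to a chi-squared
    # distribution whose degrees of freedom equal the number of extra mean
    # parameters in the alternative model (one extra parameter for a
    # two-condition design):
    #
    #     from scipy.stats import chi2
    #     pvalue = chi2(df).sf(llr)
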
    def bh(self):
        """
        Applies BH-FDR control to p-values across all chromosomes to obtain
        q-values.

        Should only be run after all chromosomes have been processed through
        p-values.
        """
        eprint('applying BH-FDR correction')
        if self.loop_patterns:
            loop_idx, _ = self.load_data('loop_idx', 'all')
        else:
            loop_idx = None
        pvalues, offsets = self.load_data('pvalues', 'all', idx=loop_idx)
        all_qvalues = adjust_pvalues(pvalues)
        for i, chrom in enumerate(self.chroms):
            self.save_data(all_qvalues[offsets[i]:offsets[i+1]], 'qvalues',
                           chrom)

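    # Note: a hedged illustration of what BH-FDR adjustment computes; the
    # implementation actually used here is lib5c.util.statistics
    # .adjust_pvalues. For sorted p-values p_(1) <= ... <= p_(m), the BH
    # q-value is q_(i) = min over j >= i of (p_(j) * m / j), roughly:
    #
    #     order = np.argsort(pvalues)
    #     m = len(pvalues)
    #     q = np.minimum.accumulate(
    #         (pvalues[order] * m / np.arange(1, m + 1))[::-1])[::-1]
    #     qvalues = np.empty(m)
    #     qvalues[order] = np.clip(q, 0, 1)
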
    def run_to_qvalues(self, norm='conditional_mor', n_bins_norm=-1,
                       estimator='qcml', frac=None, auto_frac_factor=15.,
                       weighted_lowess=True, refit_mu=True, n_threads=-1,
                       verbose=True):
        """
        Shortcut method to run the analysis to q-values.

        Parameters
        ----------
        norm : str
            The method to use to account for differences in sequencing depth.
            Valid options are:

                - simple_scaling: scale each replicate to equal total depth
                - median_of_ratios: use median of ratios normalization,
                  ignoring pixels at which any replicate has a zero
                - conditional_scaling: apply simple scaling independently at
                  each distance scale
                - conditional_mor: apply median of ratios independently at
                  each distance scale

        n_bins_norm : int, optional
            Number of distance bins to use during scaling normalization if
            ``norm`` is one of the conditional options. Pass 0 or None to
            match pixels by exact distance. Pass -1 to use a reasonable
            default: 1/5 of ``self.dist_thresh_max``.
        estimator : 'cml', 'qcml', 'mme', or a function
            Pass 'cml', 'qcml', 'mme' to use conditional maximum likelihood
            (CML), quantile-adjusted CML (qCML), or method of moments
            estimation (MME) to estimate the dispersion within each bin. Pass
            a function that takes in a (pixels, replicates) shaped array of
            data and returns a dispersion value to use that instead.
        frac : float, optional
            The lowess smoothing fraction to use when fitting the distance vs
            dispersion trend. Pass None to choose a value automatically.
        auto_frac_factor : float
            When ``frac`` is None, this factor scales the automatically
            determined fraction parameter.
        weighted_lowess : bool
            Whether or not to use a weighted lowess fit when fitting the
            smoothed dispersion curve.
        refit_mu : bool
            Pass True to refit the mean parameters in the NB models being
            compared in the LRT. Pass False to use the means across replicates
            directly, which is simpler and slightly faster but technically
            violates the assumptions of the LRT.
        n_threads : int
            The number of threads (technically GIL-avoiding child processes)
            to use to process multiple chromosomes in parallel. Pass -1 to use
            as many threads as there are CPUs. Pass 0 to process the
            chromosomes serially.
        verbose : bool
            Pass False to silence reporting of progress to stderr.
        """
        self.prepare_data(norm=norm, n_bins=n_bins_norm, n_threads=n_threads,
                          verbose=verbose)
        self.estimate_disp(
            estimator=estimator, frac=frac,
            auto_frac_factor=auto_frac_factor,
            weighted_lowess=weighted_lowess, n_threads=n_threads)
        self.lrt(refit_mu=refit_mu, n_threads=n_threads, verbose=verbose)
        self.bh()

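    # Note: a hedged end-to-end usage sketch. The constructor keyword names
    # below are assumptions inferred from the attributes this mixin reads
    # (self.raw_npz_patterns, self.chroms, self.design, self.outdir,
    # self.loop_patterns, self.res); check the HiC3DeFDR constructor for the
    # authoritative signature.
    #
    #     from hic3defdr import HiC3DeFDR
    #     h = HiC3DeFDR(
    #         raw_npz_patterns=['cond1_rep1_<chrom>_raw.npz', ...],
    #         chroms=['chr18', 'chr19'],
    #         outdir='output',
    #         ...
    #     )
    #     h.run_to_qvalues()
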
    def threshold(self, chrom=None, fdr=0.05, cluster_size=3, n_threads=-1):
        """
        Thresholds and clusters significantly differential pixels.

        Should only be run after q-values have been obtained.

        Parameters
        ----------
        chrom : str
            The name of the chromosome to threshold. Pass None to threshold
            all chromosomes in series.
        fdr : float or list of float
            The FDR to threshold on. Pass a list to do a sweep in series.
        cluster_size : int or list of int
            Clusters smaller than this size will be filtered out. Pass a list
            to do a sweep in series.
        n_threads : int
            The number of threads (technically GIL-avoiding child processes)
            to use to process multiple chromosomes in parallel. Pass -1 to use
            as many threads as there are CPUs. Pass 0 to process the
            chromosomes serially.
        """
        if chrom is None:
            if n_threads:
                parallel_apply(
                    self.threshold,
                    [{'chrom': c, 'fdr': fdr, 'cluster_size': cluster_size}
                     for c in self.chroms],
                    n_threads=n_threads
                )
            else:
                for chrom in self.chroms:
                    self.threshold(chrom=chrom, fdr=fdr,
                                   cluster_size=cluster_size)
            return
        eprint('thresholding and clustering chrom %s' % chrom)
        # load everything
        row, col, qvalues = self.load_data('qvalues', chrom, coo=True)

        # upgrade fdr and cluster_size to list
        if not hasattr(fdr, '__len__'):
            fdr = [fdr]
        if not hasattr(cluster_size, '__len__'):
            cluster_size = [cluster_size]

        for f in fdr:
            # threshold on FDR and cluster the significant pixels
            sig_clusters, insig_clusters = threshold_and_cluster(
                qvalues, row, col, f)
            for s in cluster_size:
                # threshold on cluster size
                filtered_sig_clusters = size_filter(sig_clusters, s)
                filtered_insig_clusters = size_filter(insig_clusters, s)
                # save to disk
                sig_outfile = '%s/sig_%g_%i_%s.json' % \
                    (self.outdir, f, s, chrom)
                insig_outfile = '%s/insig_%g_%i_%s.json' % \
                    (self.outdir, f, s, chrom)
                save_clusters(filtered_sig_clusters, sig_outfile)
                save_clusters(filtered_insig_clusters, insig_outfile)
                if self.res is not None:
                    clusters_to_table(filtered_sig_clusters, chrom, self.res)\
                        .to_csv(sig_outfile.replace('.json', '.tsv'),
                                sep='\t')
                    clusters_to_table(filtered_insig_clusters, chrom,
                                      self.res)\
                        .to_csv(insig_outfile.replace('.json', '.tsv'),
                                sep='\t')

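    # Note: the per-chromosome outputs of this step follow the format strings
    # above, i.e. '<outdir>/sig_<fdr>_<cluster_size>_<chrom>.json' and
    # '<outdir>/insig_<fdr>_<cluster_size>_<chrom>.json', plus matching .tsv
    # tables when ``res`` was supplied at construction. For example, a sweep
    # such as
    #
    #     h.threshold(fdr=[0.01, 0.05], cluster_size=[3, 4])
    #
    # would produce files like 'output/sig_0.05_3_chr18.json'; the object
    # name 'h' and the 'output'/'chr18' values are illustrative only.
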
    def classify(self, chrom=None, fdr=0.05, cluster_size=3, n_threads=-1):
        """
        Classifies significantly differential pixels according to which
        condition they are strongest in.

        Parameters
        ----------
        chrom : str
            The chromosome to classify significantly differential pixels on.
            Pass None to run for all chromosomes in series.
        fdr : float or list of float
            The FDR threshold used to identify clusters of significantly
            differential pixels via ``self.threshold()``. Pass a list to do a
            sweep in series.
        cluster_size : int or list of int
            The cluster size threshold used to identify clusters of
            significantly differential pixels via ``threshold()``. Pass a
            list to do a sweep in series.
        n_threads : int
            The number of threads (technically GIL-avoiding child processes)
            to use to process multiple chromosomes in parallel. Pass -1 to use
            as many threads as there are CPUs. Pass 0 to process the
            chromosomes serially.
        """
        if chrom is None:
            if n_threads:
                parallel_apply(
                    self.classify,
                    [{'chrom': c, 'fdr': fdr, 'cluster_size': cluster_size}
                     for c in self.chroms],
                    n_threads=n_threads
                )
            else:
                for chrom in self.chroms:
                    self.classify(chrom=chrom, fdr=fdr,
                                  cluster_size=cluster_size)
            return
        eprint('classifying differential interactions on chrom %s' % chrom)
        # load everything
        disp_idx = self.load_data('disp_idx', chrom)
        loop_idx = self.load_data('loop_idx', chrom)
        row = self.load_data('row', chrom, idx=(disp_idx, loop_idx))
        col = self.load_data('col', chrom, idx=(disp_idx, loop_idx))
        mu_hat_alt = self.load_data('mu_hat_alt', chrom, idx=loop_idx)

        # upgrade fdr and cluster_size to list
        if not hasattr(fdr, '__len__'):
            fdr = [fdr]
        if not hasattr(cluster_size, '__len__'):
            cluster_size = [cluster_size]

        for f in fdr:
            for s in cluster_size:
                infile = '%s/sig_%g_%i_%s.json' % (self.outdir, f, s, chrom)
                if not os.path.isfile(infile):
                    self.threshold(chrom=chrom, fdr=f, cluster_size=s)
                sig_clusters = load_clusters(infile)
                class_clusters = classify(row, col, mu_hat_alt, sig_clusters)
                for i, c in enumerate(class_clusters):
                    outfile = '%s/%s_%g_%i_%s.json' % \
                        (self.outdir, self.design.columns[i], f, s, chrom)
                    save_clusters(c, outfile)
                    if self.res is not None:
                        clusters_to_table(c, chrom, self.res)\
                            .to_csv(outfile.replace('.json', '.tsv'),
                                    sep='\t')

    def collect(self, fdr=0.05, cluster_size=3, n_threads=-1):
        """
        Collects information on thresholded and classified differential
        interactions into a single TSV output file.

        Parameters
        ----------
        fdr : float or list of float
            The FDR threshold used to identify clusters of significantly
            differential pixels via ``self.threshold()``. Pass a list to do a
            sweep in series.
        cluster_size : int or list of int
            The cluster size threshold used to identify clusters of
            significantly differential pixels via ``threshold()``. Pass a
            list to do a sweep in series.
        n_threads : int
            The number of threads (technically GIL-avoiding child processes)
            to use to process multiple chromosomes in parallel. Pass -1 to use
            as many threads as there are CPUs. Pass 0 to process the
            chromosomes serially.
        """
        if self.res is None:
            raise ValueError(
                'the collect() step can only be run if the res kwarg was '
                'passed during construction of the HiC3DeFDR object; please '
                'run the classify() step instead or re-create the HiC3DeFDR '
                'object (you do not need to re-run any other steps)'
            )
        eprint('collecting differential interactions')
        # upgrade fdr and cluster_size to list
        if not hasattr(fdr, '__len__'):
            fdr = [fdr]
        if not hasattr(cluster_size, '__len__'):
            cluster_size = [cluster_size]

        for f in fdr:
            for s in cluster_size:
                # file pattern where we expect tables to be present
                pattern = '%s/<class>_%g_%i_<chrom>.tsv' % (self.outdir, f, s)

                # ensure that all outputs are present
                if not all(os.path.isfile(pattern
                                          .replace('<class>', 'insig')
                                          .replace('<chrom>', chrom))
                           for chrom in self.chroms):
                    self.threshold(fdr=f, cluster_size=s, n_threads=n_threads)
                if not all(os.path.isfile(pattern
                                          .replace('<class>', c)
                                          .replace('<chrom>', chrom))
                           for c in self.design.columns
                           for chrom in self.chroms):
                    self.classify(fdr=f, cluster_size=s, n_threads=n_threads)

                # output file we want to write the final table to
                outfile = '%s/results_%g_%i.tsv' % (self.outdir, f, s)

                # collect cluster tables
                tables = []
                for chrom in self.chroms:
                    df = load_cluster_table(pattern
                                            .replace('<class>', 'insig')
                                            .replace('<chrom>', chrom))
                    df['classification'] = 'constitutive'
                    tables.append(df)
                    for c in self.design.columns:
                        df = load_cluster_table(pattern
                                                .replace('<class>', c)
                                                .replace('<chrom>', chrom))
                        df['classification'] = c
                        tables.append(df)

                # write to disk
                sort_cluster_table(pd.concat(tables)).to_csv(outfile,
                                                             sep='\t')
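
    # Note: a hedged sketch of consuming the final output of collect(). Each
    # sweep point is written to '<outdir>/results_<fdr>_<cluster_size>.tsv',
    # a concatenation of the per-chromosome cluster tables with an added
    # 'classification' column that is either 'constitutive' (insignificant
    # clusters) or the name of the condition the interaction is strongest in.
    # For example (paths illustrative only):
    #
    #     import pandas as pd
    #     results = pd.read_csv('output/results_0.05_3.tsv', sep='\t')
    #     print(results['classification'].value_counts())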