# Copyright (c) 2016-present, Facebook, Inc.
# All rights reserved.
#
# Original code from Facebook and bootstrapped library, modifications by Edwin Chan
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree. An additional grant
# of patent rights can be found in the PATENTS file in the same directory.
'''Functions that allow one to create bootstrapped confidence intervals'''
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
import multiprocessing as _multiprocessing
import numpy as _np
import scipy.sparse as _sparse
from pydistinct.stats_estimators import smoothed_jackknife_estimator
class BootstrapResults(object):
def __init__(self, lower_bound, value, upper_bound):
self.lower_bound = lower_bound
self.upper_bound = upper_bound
self.value = value
if self.lower_bound > self.upper_bound:
self.lower_bound, self.upper_bound = self.upper_bound, self.lower_bound
def __str__(self):
return '{1} ({0}, {2})'.format(self.lower_bound, self.value,
self.upper_bound)
def __repr__(self):
return self.__str__()
def _apply(self, other, func):
return BootstrapResults(func(self.lower_bound, other),
func(self.value, other),
func(self.upper_bound, other))
def __add__(self, other):
return self._apply(float(other), lambda x, other: other + x)
def __radd__(self, other):
return self._apply(float(other), lambda x, other: other + x)
def __sub__(self, other):
return self._apply(float(other), lambda x, other: x - other)
def __rsub__(self, other):
return self._apply(float(other), lambda x, other: other - x)
def __mul__(self, other):
return self._apply(float(other), lambda x, other: x * other)
def __rmul__(self, other):
return self._apply(float(other), lambda x, other: x * other)
def __eq__(self, other):
if type(other) != BootstrapResults:
return 0
return 1 if (
self.lower_bound == other.lower_bound and
self.upper_bound == other.upper_bound and
self.value == other.value) \
else 0
def error_width(self):
'''Returns: upper_bound - lower_bound'''
return self.upper_bound - self.lower_bound
def error_fraction(self):
'''Returns the error_width / value'''
if self.value == 0:
return _np.inf
else:
return self.error_width() / self.value
def is_significant(self):
return _np.sign(self.upper_bound) == _np.sign(self.lower_bound)
def get_result(self):
'''Returns:
-1 if statistically significantly negative
+1 if statistically significantly positive
0 otherwise
'''
return int(self.is_significant()) * _np.sign(self.value)
def _get_confidence_interval(bootstrap_dist, stat_val, alpha, is_pivotal):
'''Get the bootstrap confidence interval for a given distribution.
Args:
bootstrap_distribution: numpy array of bootstrap results from
bootstrap_distribution() or bootstrap_ab_distribution()
stat_val: The overall statistic that this method is attempting to
calculate error bars for.
alpha: The alpha value for the confidence intervals.
is_pivotal: if true, use the pivotal method. if false, use the
percentile method.
'''
val = _np.percentile(bootstrap_dist, 50)
bootstrap_dist = [i * (stat_val / val) for i in bootstrap_dist] # necessary for this analysis
if is_pivotal:
low = 2 * stat_val - _np.percentile(bootstrap_dist, 100 * (1 - alpha / 2.))
val = stat_val
high = 2 * stat_val - _np.percentile(bootstrap_dist, 100 * (alpha / 2.))
else:
low = _np.percentile(bootstrap_dist, 100 * ((alpha / 2)))
val = _np.percentile(bootstrap_dist, 50)
high = _np.percentile(bootstrap_dist, 100 * (1 - (alpha / 2)))
return BootstrapResults(low, val, high)
def _validate_arrays(values_lists):
t = values_lists[0]
t_type = type(t)
if not isinstance(t, _np.ndarray):
raise ValueError(('The arrays must either be of type '
'scipy.sparse.csr_matrix or numpy.array'))
for _, values in enumerate(values_lists[1:]):
if not isinstance(values, t_type):
raise ValueError('The arrays must all be of the same type')
if t.shape != values.shape:
raise ValueError('The arrays must all be of the same shape')
if isinstance(t, _sparse.csr_matrix):
if values.shape[0] > 1:
raise ValueError(('The sparse matrix must have shape 1 row X N'
' columns'))
def _generate_distributions(values_lists, num_iterations):
values_shape = values_lists[0].shape[0]
ids = _np.random.choice(
values_shape,
(num_iterations, values_shape),
replace=True,
)
results = [values[ids] for values in values_lists]
return results
def _bootstrap_sim(values_lists, stat_func_lists, num_iterations,
iteration_batch_size, seed):
'''Returns simulated bootstrap distribution.
See bootstrap() funciton for arg descriptions.
'''
if seed is not None:
_np.random.seed(seed)
num_iterations = int(num_iterations)
iteration_batch_size = int(iteration_batch_size)
results = [[] for _ in values_lists]
for rng in range(0, num_iterations, iteration_batch_size):
max_rng = min(iteration_batch_size, num_iterations - rng)
values_sims = _generate_distributions(values_lists, max_rng)
for i, values_sim, stat_func in zip(range(len(values_sims)), values_sims, stat_func_lists):
results[i].extend([stat_func(i) for i in values_sim])
return _np.array(results)
def _bootstrap_distribution(values_lists, stat_func_lists,
num_iterations, iteration_batch_size, num_threads):
'''Returns the simulated bootstrap distribution. The idea is to sample the same
indexes in a bootstrap re-sample across all arrays passed into values_lists.
This is especially useful when you want to co-sample records in a ratio metric.
numerator[k].sum() / denominator[k].sum()
and not
numerator[ j ].sum() / denominator[k].sum()
Args:
values_lists: list of numpy arrays (or scipy.sparse.csr_matrix)
each represents a set of values to bootstrap. All arrays in values_lists
must be of the same length.
stat_func_lists: statistic to bootstrap for each element in values_lists.
num_iterations: number of bootstrap iterations / resamples / simulations
to perform.
iteration_batch_size: The bootstrap sample can generate very large
matrices. This argument limits the memory footprint by
batching bootstrap rounds. If unspecified the underlying code
will produce a matrix of len(values) x num_iterations. If specified
the code will produce sets of len(values) x iteration_batch_size
(one at a time) until num_iterations have been simulated.
Defaults to no batching.
num_threads: The number of therads to use. This speeds up calculation of
the bootstrap. Defaults to 1. If -1 is specified then
multiprocessing.cpu_count() is used instead.
Returns:
The set of bootstrap resamples where each stat_function is applied on
the bootsrapped values.
'''
_validate_arrays(values_lists)
if iteration_batch_size is None:
iteration_batch_size = num_iterations
num_iterations = int(num_iterations)
iteration_batch_size = int(iteration_batch_size)
num_threads = int(num_threads)
if num_threads == -1:
num_threads = _multiprocessing.cpu_count()
if num_threads <= 1:
results = _bootstrap_sim(values_lists, stat_func_lists,
num_iterations, iteration_batch_size, None)
else:
pool = _multiprocessing.Pool(num_threads)
iter_per_job = _np.ceil(num_iterations * 1.0 / num_threads)
results = []
for seed in _np.random.randint(0, 2 ** 32 - 1, size=num_threads, dtype='uint64'):
r = pool.apply_async(_bootstrap_sim, (values_lists, stat_func_lists,
iter_per_job,
iteration_batch_size, seed))
results.append(r)
results = _np.hstack([res.get() for res in results])
pool.close()
return results
[docs]def bootstrap(stat_func=smoothed_jackknife_estimator, sequence=None, attributes=None,
alpha=0.05,
num_iterations=1000, iteration_batch_size=10, is_pivotal=True,
num_threads=1, return_distribution=False):
'''Returns bootstrap estimate.
Args:
stat_func: statistical estimator to use - good choices are median_estimator and smoothed_jackknife_estimator
sequence: sample sequence of integers
attributes: dictionary with keys as the unique elements and values as counts of those elements
alpha: alpha value representing the confidence interval. Defaults to 0.05, i.e., 95th-CI.
num_iterations: number of bootstrap iterations to run. The higher this
number the more sure you can be about the stability your bootstrap.
By this - we mean the returned interval should be consistent across
runs for the same input. This also consumes more memory and makes
analysis slower. Defaults to 10000.
iteration_batch_size: The bootstrap sample can generate very large
matrices. This argument limits the memory footprint by
batching bootstrap rounds. If unspecified the underlying code
will produce a matrix of len(values) x num_iterations. If specified
the code will produce sets of len(values) x iteration_batch_size
(one at a time) until num_iterations have been simulated.
Defaults to 10. Passing None will calculate the full simulation in one step.
is_pivotal: if true, use the pivotal method for bootstrapping confidence
intervals. If false, use the percentile method.
num_threads: The number of therads to use. This speeds up calculation of
the bootstrap. Defaults to 1. If -1 is specified then
multiprocessing.cpu_count() is used instead.
Returns:
BootstrapResults representing CI and estimated value.
'''
if sequence is None and attributes is None:
raise Exception("Must provide a sequence, or a dictionary of attribute counts ")
if attributes:
i = 0
values = []
for value in attributes.values():
create_list = [i, ] * value
values.extend(create_list)
i += 1
values = _np.random.permutation(values)
else:
values = sequence
values_lists = [_np.asarray(values)]
stat_func_lists = [stat_func]
stat_val = stat_func(values)
distribution_results = _bootstrap_distribution(values_lists,
stat_func_lists,
num_iterations,
iteration_batch_size,
num_threads)
if return_distribution:
val = _np.percentile(distribution_results, 50)
return [i * (stat_val / val) for i in distribution_results]
else:
return _get_confidence_interval(distribution_results, stat_val, alpha,
is_pivotal)