import numpy as np
import matplotlib.pyplot as pl
import scipy.stats as st
from numpy import array, exp, cumsum, asarray
from numpy.linalg import norm
from scipy.integrate import quad
[docs]def is_distribution(p):
p = asarray(p)
return (p >= 0).all() and abs(1 - p.sum()) < 1e-10
[docs]def anneal(p, *, invT=None, T=None):
"p ** (1/T)"
if T is not None:
assert invT is None
invT = 1./T
p = p ** invT
p /= p.sum()
return p
[docs]class Max:
def __init__(self, X, Y):
self.X = X
self.Y = Y
[docs] def cdf(self, t):
return self.X.cdf(t)*self.Y.cdf(t)
[docs] def pdf(self, t):
X = self.X; Y = self.Y
# d/dt[cdf(t)] = d/dt[X.cdf(t)*Y.cdf(t)]
# = X.cdf(t)*d/dt[Y.cdf(t)] + d/dt[X.cdf(t)]*Y.cdf(t)
# = X.cdf(t)*Y.pdf(t) + X.pdf(t)*Y.cdf(t)
return X.pdf(t)*Y.cdf(t) + X.cdf(t)*Y.pdf(t)
[docs] def ppf(self, u):
raise ValueError('no closed form for ppf(max(X,Y))')
[docs] def rvs(self, size):
return np.maximum(self.X.rvs(size), self.Y.rvs(size))
class _BruteForce:
"""
Base class for tabular representation of a distribution
p(x) ∝ score(x).
where score(x) > 0 for all x ∈ domain.
"""
def __init__(self):
self.Z = np.sum([self.score(x) for x in self.domain()])
self.P = {x: self.score(x) / self.Z for x in self.domain()}
def domain(self):
raise NotImplementedError
def score(self, x):
raise NotImplementedError()
def entropy(self):
return -np.sum([p * np.log(p) for p in self.P.values() if p != 0])
def logp(self, x):
return np.log(self.P[x])
[docs]class TruncatedDistribution:
def __init__(self, d, a, b):
assert np.all(a <= b), [a, b]
self.d = d; self.a = a; self.b = b
self.cdf_b = d.cdf(b)
self.cdf_a = d.cdf(a)
self.cdf_w = self.cdf_b - self.cdf_a
[docs] def sf(self, x):
return 1-self.cdf(x)
[docs] def pdf(self, x):
return (self.a < x) * (x <= self.b) * self.d.pdf(x) / self.cdf_w
[docs] def rvs(self, size=None):
u = np.random.uniform(0, 1, size=size)
return self.ppf(u)
[docs] def ppf(self, u):
return self.d.ppf(self.cdf_a + u * self.cdf_w)
[docs] def cdf(self, x):
return np.minimum(1, (self.a < x) * (self.d.cdf(x) - self.cdf_a) / self.cdf_w)
[docs] def mean(self):
# The truncated mean is unfortunately not analytical
return quad(lambda x: x * self.pdf(x), self.a, self.b)[0]
# XXX: The Darth Vader only applies to positive random variables
# http://thirdorderscientist.org/homoclinic-orbit/2013/6/25/the-darth-vader-rule-mdash-or-computing-expectations-using-survival-functions
# https://content.sciendo.com/view/journals/tmmp/52/1/article-p53.xml
# return quad(self.sf, self.a, self.b)[0] + self.a
[docs]def show_distr(D, a, b, resolution=1000):
xs = np.linspace(a, b, resolution)
us = np.linspace(0, 1, resolution)
fig, ax = pl.subplots(figsize=(12,4), ncols=3)
if hasattr(D, 'pdf'):
ax[0].plot(xs, D.pdf(xs))
ax[0].set_ylabel('f(x)')
ax[0].set_xlabel('x')
ax[0].set_title('probability density function')
ax[0].set_xlim(a, b)
ax[1].plot(xs, D.cdf(xs))
ax[1].set_ylabel('F(x)')
ax[1].set_xlabel('x')
ax[1].set_title('cumulative distribution function')
ax[1].set_xlim(a, b)
if hasattr(D, 'ppf'):
ax[2].plot(us, D.ppf(us))
ax[2].set_ylabel('$F^{-1}(u)$')
ax[2].set_xlabel('u')
ax[2].set_xlim(0, 1)
ax[2].set_title('quantile function')
fig.tight_layout()
for a in ax:
# Move left and bottom spines outward by 10 points
a.spines['left'].set_position(('outward', 10))
a.spines['bottom'].set_position(('outward', 10))
# Hide the right and top spines
a.spines['right'].set_visible(False)
a.spines['top'].set_visible(False)
# Only show ticks on the left and bottom spines
a.yaxis.set_ticks_position('left')
a.xaxis.set_ticks_position('bottom')
return ax
[docs]def compare_samples_to_distr(D, samples, a, b, bins):
fig, ax = pl.subplots(figsize=(12,4), ncols=3)
ax[0].hist(samples, bins=bins, color='b', alpha=0.5, density=True,
label='histogram')
xs = np.linspace(a, b, 1000)
ax[0].plot(xs, D.pdf(xs), c='k')
ax[0].set_title('pdf')
E = Empirical(samples)
ax[1].plot(xs, E.cdf(xs), alpha=0.5, linestyle=':')
ax[1].plot(xs, D.cdf(xs), alpha=0.5)
ax[1].set_title('cdf')
us = np.linspace(0, 1, 1000)
ax[2].plot(us, E.ppf(us), alpha=0.5, linestyle=':')
ax[2].plot(us, [D.ppf(u) for u in us], alpha=0.5)
ax[2].set_title('ppf')
for a in ax:
# Move left and bottom spines outward by 10 points
a.spines['left'].set_position(('outward', 10))
a.spines['bottom'].set_position(('outward', 10))
# Hide the right and top spines
a.spines['right'].set_visible(False)
a.spines['top'].set_visible(False)
# Only show ticks on the left and bottom spines
a.yaxis.set_ticks_position('left')
a.xaxis.set_ticks_position('bottom')
fig.tight_layout()
return ax
[docs]def test_truncated_distribution():
import matplotlib.pyplot as pl
import scipy.stats as st
d = st.lognorm(1.25)
t = TruncatedDistribution(d, 2, 4)
print(t.mean(), t.rvs(100_000).mean())
if 1:
us = np.linspace(0.01, 0.99, 1000)
xs = np.linspace(d.ppf(0.01), d.ppf(0.99), 1000)
pl.plot(xs, t.cdf(xs), label='cdf')
pl.plot(t.ppf(us), us, label='ppf')
pl.legend(loc='best')
pl.xlim(0, 6)
pl.show()
if 1:
pl.hist(t.rvs(100000), density=True, bins=200)
pl.plot(xs, t.pdf(xs), c='r', lw=3, alpha=0.75)
pl.xlim(0, 6)
pl.show()
if 1:
us = np.linspace(0.01, 0.99, 100)
for u in us:
v = t.cdf(t.ppf(u))
assert abs(u - v)/abs(u) < 1e-3
if 1:
xs = np.linspace(1, 5, 1000)
for x in xs:
y = t.ppf(t.cdf(x))
if t.pdf(x) > 0:
err = abs(x - y)/abs(x)
assert err < 1e-3, [err, x, y]
[docs]def random_dist(*size):
"""
Generate a random conditional distribution which sums to one over the last
dimension of the input dimensions.
"""
return np.random.dirichlet(np.ones(size[-1]), size=size[:-1])
[docs]def random_psd(n):
return st.wishart.rvs(df=n, scale=np.eye(n)) / n
[docs]class Mixture(object):
"""
Mixture of several densities
"""
def __init__(self, w, ds):
w = array(w)
assert is_distribution(w), \
'w is not a prob. distribution.'
self.ds = ds
self.w = w
[docs] def rvs(self, size=1):
# sample component
i = sample(self.w, size=size)
# sample from component
return array([self.ds[j].rvs() for j in i])
[docs] def pdf(self, x):
return sum([d.pdf(x) * w for w, d in zip(self.w, self.ds)])
[docs] def ppf(self, q):
from scipy.optimize import minimize
x0 = sum([d.ppf(q) * w for w, d in zip(self.w, self.ds)])
return minimize(lambda t: np.sum((self.cdf(t) - q)**2), x0).x
[docs] def cdf(self, x):
return sum([d.cdf(x) * w for w, d in zip(self.w, self.ds)])
[docs] def mean(self):
return sum([d.mean() * w for w, d in zip(self.w, self.ds)])
[docs]def spherical(size):
"Generate random vector from spherical Gaussian."
x = np.random.normal(0, 1, size=size)
x /= norm(x, 2)
return x
# TODO: Should we create a class to represent this data with this the fit
# method? This should really be the MLE of some sort of distrbution. What
# distribution is it? I suppose it is nonparametric in the same sense that
# Kaplan-Meier is nonparametric (In fact, KM generalizes this estimator to
# support censored response).
# TODO: That same class would probably have the defacto mean/std estimators.
[docs]class Empirical:
"""
Empirical CDF of data `a`, returns function which makes values to their
cumulative probabilities.
>>> g = cdf([5, 10, 15])
Evaluate the CDF at a few points
>>> g([5,9,13,15,100])
array([0.33333333, 0.33333333, 0.66666667, 1. , 1. ])
Check that ties are handled correctly
>>> g = cdf([5, 5, 15])
The value p(x <= 5) = 2/3
>>> g([0, 5, 15])
array([0. , 0.66666667, 1. ])
The auantile function should be the inverse of the cdf.
>>> g = cdf([-1, 5, 5, 15])
>>> g.quantile(np.linspace(0, 1, 10))
array([-1, -1, -1, 5, 5, 5, 5, 5, 5, 15])
"""
def __init__(self, x):
self.x = x = np.array(x, copy=True)
[self.n] = x.shape
self.x.sort()
def __call__(self, z):
return self.x.searchsorted(z, 'right') / self.n
cdf = __call__
[docs] def sf(self, z):
return 1-self.cdf(z)
[docs] def conditional_mean(self, a, b):
"E[T | a <= T < b]"
m = 0.0; n = 0.0
for i in range(self.x.searchsorted(a, 'right'),
self.x.searchsorted(b, 'left')):
m += self.x[i]
n += 1
return m / n if n > 0 else np.inf
[docs] def quantile(self, q):
# TODO: this could be made fastet given that x is already sorted.
assert np.all((0 <= q) & (q <= 1))
return np.quantile(self.x, q, interpolation='lower')
[docs] def plot(self, confidence=0.95):
pl.plot(self.x, self.cdf(self.x))
if confidence is not None:
# show confidence bands (derived from the DKW inequality)
# https://bjlkeng.github.io/posts/the-empirical-distribution-function/
# https://en.wikipedia.org/wiki/Dvoretzky%E2%80%93Kiefer%E2%80%93Wolfowitz_inequality
eps = np.sqrt(np.log(2 / (1-confidence)) / (2 * self.n))
pl.fill_between(self.x,
np.maximum(0, self(self.x) - eps), # lower
np.minimum(1, self(self.x) + eps), # upper
alpha=0.1,
label=f'{confidence:.2f} confidence band')
ppf = quantile
cdf = Empirical
[docs]def sample(w, size=None, u=None):
"""
Uses the inverse CDF method to return samples drawn from an (unnormalized)
discrete distribution.
"""
c = cumsum(w)
if u is None:
u = np.random.uniform(0,1,size=size)
return c.searchsorted(u * c[-1])
[docs]def log_sample(w):
"Sample from unnormalized log-distribution."
a = np.array(w, dtype=float)
a -= a.max()
exp(a, out=a)
return sample(a)
[docs]def test_mixture():
d = [st.norm(1, 2), st.norm(2, .5), st.norm(3, .1)]
w = np.array([.2, .7, .1])
D = Mixture(w, d)
S = D.rvs(100_000)
assert abs(D.mean() - S.mean()) < 0.005, abs(D.mean() - S.mean())
a, b = -5, 5
compare_samples_to_distr(D, S, a, b, bins=100)
pl.show()
# if 1:
# us = np.linspace(0.01, 0.99, 100)
# for u in us:
# v = D.cdf(D.ppf(u))
# err = abs(u - v)/abs(u)
# assert err < 1e-3, err
#
# if 1:
# xs = np.linspace(a, b, 100)
# for x in xs:
# y = D.ppf(D.cdf(x))
# if D.pdf(x) > 0:
# err = abs(x - y)/abs(x)
# assert err < 1e-3, [err, x, y]
[docs]def test_cdf():
n = 1_000
x = np.random.normal(0,1,size=n)
cdf(x).plot()
pl.legend(loc='best')
pl.show()
if __name__ == '__main__':
#test_truncated_distribution()
#test_mixture()
from arsenal import testing_framework
testing_framework(globals())