"""
Pareto frontier
"""
import matplotlib.pyplot as pl
import numpy as np
from arsenal.terminal import colors
from arsenal.iterextras import window
[docs]def pareto_frontier(X, Y, maxX=True, maxY=True, indices=False):
"""Determine Pareto frontier, returns list of sorted points.
Args:
X, Y: data.
maxX, maxY: (bool) whether to maximize or minimize along respective
coordinate.
"""
assert len(X) == len(Y)
if len(X) == 0:
return []
xx = -1 if maxX else +1
yy = -1 if maxY else +1
key = lambda xy: (xy[0] * xx, xy[1] * yy) # need to break ties
a = sorted(zip(X, Y, list(range(len(X)))), key=key)
frontier = []
lastx = float('inf') * xx
lasty = float('inf') * yy
for xy in a:
x,y,i = xy
if yy*y <= yy*lasty:
if lastx != x:
frontier.append(xy)
lasty = y
lastx = x
x,y,i = list(zip(*frontier))
return list(i) if indices else list(zip(x,y))
[docs]def pareto_ix(X, Y, *a, **kw):
"Determine Pareto frontier, returns list of indices"
return pareto_frontier(X, Y, *a, **kw)
# assert len(X) == len(Y)
# if len(X) == 0:
# return []
# a = sorted(zip(X, Y, range(len(X))), reverse=maxX)
# frontier = []
# lastx = float('-inf') if maxX else float('+inf')
# lasty = float('-inf') if maxY else float('+inf')
# for xyi in a:
# x,y,i = xyi
# if maxY:
# if y >= lasty:
# if lastx != x:
# frontier.append(i)
# lasty = y
# lastx = x
# else:
# if y <= lasty:
# if lastx != x:
# frontier.append(i)
# lasty = y
# lastx = x
# return frontier
[docs]def show_frontier(X, Y, maxX=False, maxY=True, dots=False,
XMIN=None, XMAX=None,
YMIN=None, YMAX=None,
ax=None, label=None,
interpolation='pessimistic', **style):
"""Plot Pareto frontier.
Args:
X, Y: data.
maxX, maxY: (bool) whether to maximize or minimize along respective
coordinate.
dots: (bool) highlight points on the frontier (will use same color as
`style`).
ax: use an existing axis if non-null.
style: keyword arguments, which will be passed to lines connecting the
points on the Pareto frontier.
XMAX: max value along x-axis
YMIN: min value along y-axis
"""
if ax is None:
ax = pl.gca()
sty = {'c': 'b', 'alpha': 0.3, 'zorder': 0}
sty.update(style)
if interpolation == 'linear-convex':
# Convex hull by itself doesn't work, but what we have is ok because its
# the intersection of the convex hull with the pareto frontier, which is
# handled below.
from scipy.spatial import ConvexHull
X = np.array(X)
Y = np.array(Y)
hull = ConvexHull(np.array([X,Y]).T)
X = X[hull.vertices]
Y = Y[hull.vertices]
# assert not maxX and maxY, 'need to update some hardcoded logic'
f = pareto_frontier(X, Y, maxX=maxX, maxY=maxY)
if not f:
print(colors.yellow % '[warn] Empty frontier')
return
if dots:
xs, ys = list(zip(*f))
ax.scatter(xs, ys, lw=0, alpha=0.5, c=sty['c'])
XMIN = min(min(X), XMIN) if XMIN is not None else min(X)
XMAX = max(max(X), XMAX) if XMAX is not None else max(X)
YMIN = min(min(Y), YMIN) if YMIN is not None else min(Y)
YMAX = max(max(Y), YMAX) if YMAX is not None else max(Y)
# if 0:
# # This version lets you clip away points. I found it a little problematic to use.
# XMIN = XMIN if XMIN is not None else min(X)
# XMAX = XMAX if XMAX is not None else max(X)
# YMIN = YMIN if YMIN is not None else min(Y)
# YMAX = YMAX if YMAX is not None else max(Y)
# if (max(X) > XMAX or min(X) < XMIN or min(Y) > YMIN or max(Y) < YMAX):
# print '[PARETO] Warning: data out of bounds!'
# Connect corners of frontier. The first and last points on frontier have
# lines which surround the point cloud.
# TODO: make this look nicer...
if maxX and maxY:
f = [(XMAX, YMIN)] + f + [(XMIN, YMAX)]
elif not maxX and maxY:
f = [(XMIN, YMIN)] + f + [(XMAX, YMAX)]
elif maxX and not maxY:
f = [(XMAX, YMAX)] + f + [(XMIN, YMIN)]
else:
f = [(XMIN, YMAX)] + f + [(XMAX, YMIN)]
if interpolation == 'pessimistic':
# Make line segments from adjacent points
pts = np.array([x for ((a,b), (c,d)) in window(f, 2) for x in [[a,b], [c,b], [c,b], [c,d]]])
elif interpolation in {'linear','linear-convex'}:
# Make line segments from adjacent points
pts = np.array([x for ((a,b), (c,d)) in window(f, 2) for x in [[a,b], [c,d]]])
# Plot
ax.plot(pts[:,0], pts[:,1], label=label, **sty)
return pts
[docs]class Pareto(object):
def __init__(self, df, xcol, ycol, ax=None):
self.df = df
self.xcol = xcol
self.ycol = ycol
self.ax = ax
self.frontier = pareto_ix(df[xcol], df[ycol])
assert np.isfinite(df[xcol]).all() and np.isfinite(df[ycol]).all(), \
'Pareto: `DataFrame` contains non-finite values.'
[docs] def scatter(self, **plot_kwargs):
if self.ax is None:
self.ax = pl.gca()
kwargs = dict(lw=0)
kwargs.update(plot_kwargs)
self.ax.scatter(self.df[self.xcol], self.df[self.ycol], **kwargs)
[docs] def show_frontier(self, **plot_kwargs):
if self.ax is None:
self.ax = pl.gca()
show_frontier(self.df[self.xcol], self.df[self.ycol], ax=self.ax, **plot_kwargs)
[docs] def lookup_x(self, x):
"""
Find Pareto point constrained by x.
argmax p.y
p: p.x <= x
"""
s = self.df
s = s[s[self.xcol] <= x] # filter
if s.empty:
return np.nan
yy = s[self.ycol].argmax()
return s.ix[yy][self.ycol]
[docs] def lookup_y(self, y):
"""
Find Pareto point constrained by y.
argmax p.x
p: p.x >= y
"""
s = self.df
s = s[s[self.ycol] >= y] # filter
if s.empty:
return np.nan
xx = s[self.xcol].argmin()
return s.ix[xx][self.xcol]
[docs]def test():
if 0:
from pandas import DataFrame
X = np.linspace(0.01, 1.0, 10)
Y = np.log(X)
Y -= Y.min()
Y /= Y.max()
Y *= 0.95
df = DataFrame({'X': X, 'Y': Y})
P = Pareto(df, 'X', 'Y')
data = []
for val in np.linspace(0,1,15):
data.append(dict(val=val, x=P.lookup_x(val), y=P.lookup_y(val)))
pl.axvline(val, alpha=.5)
pl.axhline(val, alpha=.5)
dd = DataFrame(data)
pl.scatter(dd.y, dd.val, lw=0, c='r')
pl.scatter(dd.val, dd.x, lw=0, c='g')
print(dd)
#P.scatter(c='r', lw=0)
P.show_frontier(c='r', lw=4)
pl.show()
X,Y = np.random.normal(0,1,size=(2, 30))
for maxX in [0,1]:
for maxY in [0,1]:
pl.figure()
pl.title('max x: %s, max y: %s' % (maxX, maxY))
pl.scatter(X,Y,lw=0)
show_frontier(X, Y, maxX=maxX, maxY=maxY)
pl.show()
if __name__ == '__main__':
test()