Source code for arsenal.iterextras.util

import heapq
from operator import getitem, itemgetter
from itertools import chain, count, cycle, islice, repeat, tee
from random import shuffle
from collections import defaultdict
from arsenal.iterview import iterview


[docs]class head_iter: def __init__(self, iterator): self.done = False self.head = None self.tail = iter(iterator) self.__next__() def __lt__(self, other): return self.head < other.head def __next__(self): h = self.head try: self.head = self.tail.__next__() except StopIteration: self.done = True self.head = None return h
[docs]class buf_iter: "Lazy, random access wrapper around an iterator." def __init__(self, x): self.buf = [] self.x = iter(x) self.j = 0 # num things emitted def __iter__(self): return self def __next__(self): try: y = self[self.j] self.j += 1 return y except IndexError: raise StopIteration() def __len__(self): for _ in self: pass return len(self.buf) def __getitem__(self, i): k = i - len(self.buf) while k >= 0: try: self.buf.append(self.x.__next__()) except StopIteration: raise IndexError(i) k -= 1 return self.buf[i]
[docs]def argmax(f, seq): """ >>> argmax(lambda x: -x**2 + 1, range(-10,10)) 0 """ return argmax2(f,seq)[1]
[docs]def argmax2(f, seq): """ >>> argmax2(lambda x: -x**2 + 1, range(-10,10)) (1, 0) """ return max(((f(x),x) for x in seq), key=itemgetter(0))
[docs]def argmin(f, seq): """ >>> argmin(lambda x: x**2 + 1, range(-10,10)) 0 """ return argmin2(f,seq)[1]
[docs]def argmin2(f, seq): """ >>> argmin2(lambda x: x**2 + 1, range(-10,10)) (1, 0) """ return min(((f(x),x) for x in seq), key=itemgetter(0))
[docs]def groupby2(s, key=lambda x: x): """ Eager version of groupby which does what you'd expect groupby to do. >>> groupby2(range(10), lambda x: x % 2) {0: [0, 2, 4, 6, 8], 1: [1, 3, 5, 7, 9]} """ groups = defaultdict(list) for x in s: groups[key(x)].append(x) return dict(groups)
[docs]def atmost(k, seq): """ >>> atmost(1, [0,0,0]) True >>> atmost(1, [0,1,0]) True >>> atmost(1, [0,1,1]) False """ cnt = 0 if cnt > k: return False for x in seq: if x: cnt += 1 if cnt > k: return False return True
[docs]def partition(data, proportion): """ Deterministically partition `data` according to proportion. Note: assumes sum(proportion) <= 1.0 >>> partition(range(10), [0.3, 0.7]) [[0, 1, 2], [3, 4, 5, 6, 7, 8, 9]] """ assert(sum(proportion) <= 1) data = list(data) D = iter(data) N = len(data) return [list(take(j, D)) for j in [int(p*N) for p in proportion]]
[docs]def breadth_first(tree, children=iter, depth=-1, queue=None): """Traverse the nodes of a tree in breadth-first order. - (No need to check for cycles.) - The first argument should be the tree root; - children should be a function taking as argument a tree node and returning an iterator of the node's children. """ if queue is None: queue = [] queue.append(tree) while queue: node = queue.pop(0) yield node if depth != 0: try: queue += children(node) depth -= 1 except (StopIteration, TypeError, IndexError): pass
[docs]def compress(data, selectors): """ >>> list(compress('ABCDEF', [1,0,1,0,1,1])) ['A', 'C', 'E', 'F'] """ return (d for d, s in zip(data, selectors) if s)
[docs]def k_fold_cross_validation(X, K, randomize=False): """ Generates K (training, validation) pairs from the items in X. Each pair is a partition of X, where validation is an iterable of length len(X)/K. So each training iterable is of length (K-1)*len(X)/K. If randomise is true, a copy of X is shuffled before partitioning, otherwise its order is preserved in training and validation. >>> for train, test in k_fold_cross_validation(range(3), 3): ... print('test:', test, ' train:', list(train), sep=' ') test: [0] train: [1, 2] test: [1] train: [0, 2] test: [2] train: [0, 1] """ if randomize: X = list(X) shuffle(X) # folds = [X[i::K] for i in xrange(K)] # run thru X once and splitting into folds folds = [[] for k in range(K)] for i, item in enumerate(X): folds[i % K].append(item) for k in range(K): training = chain(*(fold for j, fold in enumerate(folds) if j != k)) yield training, folds[k]
[docs]def unique(iterable, key=None): """ List unique elements, preserving order. Remember all elements ever seen. """ # unique_everseen('AAAABBBCCDAABBB') --> A B C D # unique_everseen('ABBCcAD', str.lower) --> A B C D seen = set() seen_add = seen.add if key is None: for element in iterable: if element not in seen: seen_add(element) yield element else: for element in iterable: k = key(element) if k not in seen: seen_add(k) yield element
[docs]def window(iterable, k): """ s -> (s[0],...,s[k]) (s[1],...,s[k+1]) ... (s[i],...,s[i+k]) ... (s[n-1-k],...,s[n-1]) >>> [x+y+z for x,y,z in window('abcdef', 3)] ['abc', 'bcd', 'cde', 'def'] """ iterators = tee(iterable, k) for i, it in enumerate(iterators): # advance iterator, 'it', by i steps for _ in range(i): try: next(it) # advance by one except StopIteration: pass return zip(*iterators)
#_______________________________________________________________________________ #
[docs]def drop(iterator, n): "Advance the iterator n-steps ahead." # advance to the emtpy slice starting at position n next(islice(iterator, n, n), None)
[docs]def take(n, seq): """ Return the first n items in a sequence. """ return islice(seq, None, n)
[docs]def padnone(seq): """ Returns the sequence elements and then returns None indefinitely """ return chain(seq, repeat(None))
[docs]def ncycles(seq, n): """ Returns the sequence elements n times """ return chain(*repeat(seq, n))
[docs]def batch(size, iterable): """Yield a list of (up to) batchsize items at a time. The last element will be shorter if there are items left over. batch(s, 2) -> [s0,s1], [s2,s3], [s4, s5], ... >>> list(batch(2, range(5))) [[0, 1], [2, 3], [4]] """ current = [] for item in iterable: current.append(item) if len(current) == size: yield current current = [] if current: yield current
if __name__ == '__main__': import doctest doctest.testmod()