Source code for yail.core

__author__ = "John Kirkham <kirkhamj@janelia.hhmi.org>"
__date__ = "$Oct 20, 2016 11:42$"


import itertools
import math

import toolz.itertoolz

from toolz.itertoolz import (
    accumulate,
    concat,
    count,
    first,
    peek,
    sliding_window,
)

from builtins import (
    range,
    map,
    zip,
)

from future.moves.itertools import (
    zip_longest,
)


[docs]def generator(it): """ Creates a generator type from the iterable. Args: it(iterable): An iterable to make a generator. Returns: generator: A generator made from the iterable. Examples: >>> generator(range(5)) # doctest: +ELLIPSIS <generator object generator at 0x...> >>> list(generator(range(5))) [0, 1, 2, 3, 4] """ for each in it: yield each
[docs]def empty(): """ Creates an empty iterator. Examples: >>> list(empty()) [] """ return iter([])
[docs]def single(val): """ Creates an iterator with a single value. Args: val(any): Single value to add to the iterator. Returns: iterable: An iterable yielding the single value. Examples: >>> list(single(1)) [1] """ yield val
[docs]def cycles(seq, n=1): """ Cycles through the sequence n-times. Basically the same as ``itertools.cycle`` except that this sets an upper limit on how many cycles will be done. Note: If ``n`` is `None`, this is identical to ``itertools.cycle``. Args: seq(iterable): The sequence to grab items from. n(integral): Number of times to cycle through. Returns: generator: The cycled sequence generator. Examples: >>> list(cycles([1, 2, 3], 2)) [1, 2, 3, 1, 2, 3] """ if n is None: return(itertools.cycle(seq)) assert (n >= 0), "n must be positive, but got n = " + repr(n) assert ((n % 1) == 0), "n must be an integer, but got n = " + repr(n) return concat(itertools.tee(seq, n))
[docs]def duplicate(seq, n=1): """ Gets each element multiple times. Like ``itertools.repeat`` this will repeat each element n-times. However, it will do this for each element of the sequence. Args: seq(iterable): The sequence to grab items from. n(integral): Number of repeats for each element. Returns: generator: A generator of repeated elements. Examples: >>> list(duplicate([1, 2, 3], 2)) [1, 1, 2, 2, 3, 3] """ assert (n >= 0), "n must be positive, but got n = " + repr(n) assert ((n % 1) == 0), "n must be an integer, but got n = " + repr(n) return concat(map(lambda _: itertools.repeat(_, n), seq))
[docs]def split(n, seq): """ Splits the sequence around element n. Provides 3 ``iterable``s in return. 1. Everything before the ``n``-th value. 2. An ``iterable`` with just the ``n``-th value. 3. Everything after the ``n``-th value. Args: n(integral): Index to split the iterable at. seq(iterable): The sequence to split. Returns: ``tuple`` of ``iterable``s: Each portion of the iterable around the index. Examples: >>> list(map(tuple, split(2, range(5)))) [(0, 1), (2,), (3, 4)] >>> list(map(tuple, split(2, [10, 20, 30, 40, 50]))) [(10, 20), (30,), (40, 50)] """ front, middle, back = itertools.tee(seq, 3) front = itertools.islice(front, 0, n) middle = itertools.islice(middle, n, n + 1) back = itertools.islice(back, n + 1, None) return front, middle, back
[docs]def indices(*sizes): """ Iterates over a length/shape. Takes a size or sizes (unpacked shape) and iterates through all combinations of the indices. Args: *sizes(int): list of sizes to iterate over. Returns: iterable: an iterator over the sizes. Examples: >>> list(indices(3, 2)) [(0, 0), (0, 1), (1, 0), (1, 1), (2, 0), (2, 1)] """ return(itertools.product(*[range(_) for _ in sizes]))
[docs]def pad(seq, before=0, after=0, fill=None): """ Pads a sequence by a fill value before and/or after. Pads the sequence before and after using the fill value provided by ``fill`` up to the lengths specified by ``before`` and ``after``. If either ``before`` or ``after`` is ``None``, pad the fill value infinitely on the respective end. Note: If ``before``is ``None``, the sequence will only be the fill value. Args: seq(iterable): Sequence to pad. before(integral): Amount to pad before. after(integral): Amount to pad after. fill(any): Some value to pad with. Returns: iterable: A sequence that has been padded. Examples: >>> list(pad(range(2, 4), before=1, after=2, fill=0)) [0, 2, 3, 0, 0] """ all_seqs = [] if before is None: return itertools.repeat(fill) elif before > 0: all_seqs.append(itertools.repeat(fill, before)) all_seqs.append(seq) if after is None: all_seqs.append(itertools.repeat(fill)) elif after > 0: all_seqs.append(itertools.repeat(fill, after)) return concat(all_seqs)
[docs]def sliding_window_filled(seq, n, pad_before=False, pad_after=False, fillvalue=None): """ A sliding window with optional padding on either end.. Args: seq(iter): an iterator or something that can be turned into an iterator n(int): number of generators to create as lagged pad_before(bool): whether to continue zipping along the longest generator pad_after(bool): whether to continue zipping along the longest generator fillvalue: value to use to fill generators shorter than the longest. Returns: generator object: a generator object that will return values from each iterator. Examples: >>> list(sliding_window_filled(range(5), 2)) [(0, 1), (1, 2), (2, 3), (3, 4)] >>> list(sliding_window_filled(range(5), 2, pad_after=True)) [(0, 1), (1, 2), (2, 3), (3, 4), (4, None)] >>> list(sliding_window_filled(range(5), 2, pad_before=True, pad_after=True)) [(None, 0), (0, 1), (1, 2), (2, 3), (3, 4), (4, None)] """ if pad_before and pad_after: seq = pad( seq, before=(n - 1), after=(n - 1), fill=fillvalue ) elif pad_before: seq = pad( seq, before=(n - 1), fill=fillvalue ) elif pad_after: seq = pad( seq, after=(n - 1), fill=fillvalue ) return(sliding_window(n, seq))
[docs]def subrange(start, stop=None, step=None, substep=None): """ Generates start and stop values for each subrange. Args: start(int): First value in range (or last if only specified value) stop(int): Last value in range step(int): Step between each range substep(int): Step within each range Yields: range: A subrange within the larger range. Examples: >>> list(map(list, subrange(5))) [[0], [1], [2], [3], [4]] >>> list(map(list, subrange(0, 12, 3, 2))) [[0, 2], [3, 5], [6, 8], [9, 11]] """ if stop is None: stop = start start = 0 if step is None: step = 1 if substep is None: substep = 1 range_ends = itertools.chain(range(start, stop, step), [stop]) for i, j in sliding_window(2, range_ends): yield(range(i, j, substep))
[docs]def disperse(seq): """ Similar to range except that it recursively proceeds through the given range in such a way that values that follow each other are preferably not only non-sequential, but fairly different. This does not always work with small ranges, but works nicely with large ranges. Args: a(int): the lower bound of the range b(int): the upper bound of the range Returns: result(generator): a generator that can be used to iterate through the sequence. Examples: >>> list(disperse(range(10))) [0, 5, 8, 3, 9, 4, 6, 1, 7, 2] """ try: len_seq = len(seq) except TypeError: seq, len_seq = itertools.tee(seq) len_seq = count(len_seq) def disperse_helper(b, part_seq_1): if b != 0: half_diff = float(b) / 2.0 mid_1 = int(math.floor(half_diff)) mid_2 = int(math.ceil(half_diff)) if 0 < mid_1 and b > mid_2: part_seq_1, part_seq_2 = itertools.tee(part_seq_1) front_mid_1_seq, mid_1_val, _ = split(mid_1, part_seq_1) _, mid_2_val, back_mid_2_seq = split(mid_2, part_seq_2) del _ mid_2_val = itertools.tee(mid_2_val) back_mid_2_seq = concat([mid_2_val[0], back_mid_2_seq]) mid_2_val = mid_2_val[1] yield(first(mid_2_val)) for _1, _2 in zip( disperse_helper(mid_1 - 0, front_mid_1_seq), disperse_helper(b - mid_2, back_mid_2_seq) ): yield(_2) yield(_1) if mid_1 != mid_2: yield(first(mid_1_val)) if len_seq == 0: return val, seq = peek(seq) yield(val) for each in disperse_helper(len_seq, seq): yield(each)