# Source code for vaex.delayed

__author__ = 'maartenbreddels'
import functools
import os

import aplus

"""
Mini library to make working with promises/futures a bit more Pythonic.

Example:

@delayed
def f(grid):
    return grid**2

f(ds.count(delay=True))

See tests/delayed_test.py for more examples
"""


def promisify(value):
    """Wrap ``value`` in an ``aplus`` promise.

    The conversion depends on the type of ``value``:

    * an ``aplus.Promise`` is returned unchanged;
    * a list or tuple is handed to ``aplus.listPromise`` after every
      element has been promisified recursively;
    * a dict is handed to ``aplus.dictPromise`` with the same keys and
      recursively promisified values;
    * anything else is wrapped with ``aplus.Promise.fulfilled``.
    """
    # TODO, support futures etc..
    if isinstance(value, aplus.Promise):
        return value

    if isinstance(value, (list, tuple)):
        element_promises = [promisify(element) for element in value]
        return aplus.listPromise(*element_promises)

    if isinstance(value, dict):
        promised_by_key = {key: promisify(item) for key, item in value.items()}
        return aplus.dictPromise(promised_by_key)

    return aplus.Promise.fulfilled(value)


def _log_error(name):
    """Create an error callback that optionally reports and then re-raises.

    The returned callable takes an exception. When the ``VAEX_DEBUG``
    environment variable is set to a non-empty value it prints a debug
    line containing ``name`` and the exception; in every case it raises
    the exception it was given.
    """
    def _report_and_reraise(exc):
        debug_setting = os.environ.get('VAEX_DEBUG', False)
        if debug_setting:
            print(f"*** DEBUG: Error from {name}", exc)
        # Re-raise so the failure is not swallowed by this callback.
        raise exc

    return _report_and_reraise


def delayed(f):
    '''Decorator to transparently accept delayed computation.

    Every positional and keyword argument passed to the decorated function
    is converted with :func:`promisify`. Once all of those promises have
    resolved, ``f`` is called with the resolved values. The decorated call
    returns the result of ``then`` on the combined promise rather than the
    return value of ``f`` itself.

    Example:

    >>> delayed_sum = ds.sum(ds.E, binby=ds.x, limits=limits,
    >>>                      shape=4, delay=True)
    >>> @vaex.delayed
    >>> def total_sum(sums):
    >>>     return sums.sum()
    >>> sum_of_sums = total_sum(delayed_sum)
    >>> ds.execute()
    >>> sum_of_sums.get()

    See the tutorial for a more complete example https://docs.vaex.io/en/latest/tutorial.html#Parallel-computations
    '''

    # functools.wraps keeps the name, docstring and other metadata of ``f``
    # on the returned wrapper.
    @functools.wraps(f)
    def wrapped(*args, **kwargs):
        # Keep (key, promise) pairs so the keyword names can be re-attached
        # to the resolved values when ``f`` is finally called.
        key_promise = [(key, promisify(value)) for key, value in kwargs.items()]
        arg_promises = [promisify(value) for value in args]
        kwarg_promises = [promise for _, promise in key_promise]

        # One promise that resolves when every argument has resolved.
        allarguments = aplus.listPromise(*(arg_promises + kwarg_promises))

        def call(_):
            # The combined result is ignored; each promise is read
            # individually so positional and keyword values stay separate.
            kwargs_real = {key: promise.get() for key, promise in key_promise}
            args_real = [promise.get() for promise in arg_promises]
            return f(*args_real, **kwargs_real)

        return allarguments.then(call, _log_error("delayed decorator"))

    return wrapped
@delayed
def delayed_args(*args):
    """Return the positional arguments as a tuple.

    Being wrapped by ``delayed``, the arguments are promisified first and
    this body receives their resolved values.
    """
    return args


@delayed
def delayed_kwargs(**kwargs):
    """Return the keyword arguments as a dict.

    Being wrapped by ``delayed``, the values are promisified first and this
    body receives their resolved values.
    """
    return kwargs


@delayed
def delayed_list(l):
    """Spread the resolved sequence ``l`` into ``delayed_args``."""
    return delayed_args(*l)


@delayed
def delayed_dict(d):
    """Spread the resolved mapping ``d`` into ``delayed_kwargs``."""
    return delayed_kwargs(**d)


@delayed
def delayed_apply(f, args, kwargs):
    """Call ``f(*args, **kwargs)`` through a second ``delayed`` layer.

    The outer ``delayed`` resolves ``f``, ``args`` and ``kwargs``; the
    nested helper is itself decorated with ``delayed``, so the three values
    are promisified once more before ``f`` is invoked.
    """
    @delayed
    def _invoke(f, args, kwargs):
        return f(*args, **kwargs)

    return _invoke(f, args, kwargs)