Source code for vaex.agg

from functools import reduce
import operator
import os
import sys
import logging

import numpy as np
import pyarrow as pa

import dask.base
from vaex.expression import Expression
from vaex.utils import _normalize_selection_name
from .expression import _unary_ops, _binary_ops, reversable


from .stat import _Statistic
from vaex import encoding
from .datatype import DataType
from .docstrings import docsubst
import vaex.utils


list_ = list  # dangerous: we override the builtin list in this module (with the list aggregator below)

logger = logging.getLogger("vaex.agg")

if vaex.utils.has_c_extension:
    import vaex.superagg

_min = min  # we override the builtin min below
aggregates = {}


def register(f, name=None):
    name = name or f.__name__
    aggregates[name] = f
    return f


@encoding.register('aggregation')
class aggregation_encoding:
    @staticmethod
    def encode(encoding, agg):
        return agg.encode(encoding)

    @staticmethod
    def decode(encoding, agg_spec):
        agg_spec = agg_spec.copy()
        type = agg_spec.pop('aggregation')
        f = aggregates[type]
        args = []
        if type == '_sum_moment':
            if 'parameters' in agg_spec:  # renaming between spec and implementation
                agg_spec['moment'] = agg_spec.pop('parameters')[0]
        if 'expressions' in agg_spec:
            args = agg_spec.pop('expressions')
        if type == "list":
            if "parameters" in agg_spec:  # renameing between spec and implementation
                agg_spec["dropnan"], agg_spec["dropmissing"] = agg_spec.pop("parameters")
        return f(*args, **agg_spec)
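
# Illustrative sketch (hypothetical helper, not part of vaex.agg): how the spec
# dictionaries handled above round-trip. The ``encoding`` argument is unused by
# both static methods, so ``None`` is passed here purely for demonstration.
def _sketch_aggregation_spec_roundtrip():
    spec = {'aggregation': 'sum', 'expressions': ['x']}
    agg = aggregation_encoding.decode(None, spec)  # -> vaex.agg.sum('x')
    assert agg.encode(None) == {'aggregation': 'sum', 'expressions': ['x']}
    return agg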


class AggregatorDescriptor:
    def __repr__(self):
        args = [*self.expressions]
        return 'vaex.agg.{}({!r})'.format(self.short_name, ", ".join(map(str, args)))

    def pretty_name(self, id, df):
        if id is None:
            id = "_".join(map(lambda k: df[k]._label, self.expressions))
        return '{0}_{1}'.format(id, self.short_name)

    def finish(self, value):
        return value


class AggregatorExpressionUnary(AggregatorDescriptor):
    def __init__(self, name, op, code, agg):
        self.agg = agg
        self.name = name
        self.op = op
        self.code = code
        self.expressions = self.agg.expressions
        self.selection = self.agg.selection

    def __repr__(self):
        return f'{self.code}{self.agg!r}'

    @property
    def edges(self):
        return self.agg.edges

    @edges.setter
    def edges(self, value):
        self.agg.edges = value

    def add_tasks(self, df, binners, progress):
        tasks, result = self.agg.add_tasks(df, binners, progress)
        @vaex.delayed
        def finish(value):
            return self.finish(value)
        return tasks, finish(result)

    def finish(self, value):
        return self.op(value)


class AggregatorExpressionBinary(AggregatorDescriptor):
    def __init__(self, name, op, code, agg1, agg2, reverse=False):
        self.agg1 = agg1
        self.agg2 = agg2
        self.reverse = reverse
        self.name = name
        self.op = op
        self.code = code
        self.expressions = self.agg1.expressions + self.agg2.expressions
        self.selection = self.agg1.selection
        self.short_name = f'{self.code}{self.agg2.short_name}'
        if self.agg1.selection != self.agg2.selection:
            raise ValueError(f'Selections of the aggregators for binary op {self.op} must be the same')

    def __repr__(self):
        if self.reverse:
            return f'({self.agg2!r} {self.code} {self.agg1!r})'
        else:
            return f'({self.agg1!r} {self.code} {self.agg2!r})'

    @property
    def edges(self):
        assert self.agg1.edges == self.agg2.edges
        return self.agg1.edges

    @edges.setter
    def edges(self, value):
        self.agg1.edges = value
        self.agg2.edges = value

    def add_tasks(self, df, binners, progress):
        progressbar = vaex.utils.progressbars(progress, title=repr(self))
        tasks1, result1 = self.agg1.add_tasks(df, binners, progress=progressbar)
        tasks2, result2 = self.agg2.add_tasks(df, binners, progress=progressbar)
        @vaex.delayed
        def finish(value1, value2):
            return self.finish(value1, value2)
        return tasks1 + tasks2, finish(result1, result2)

    def finish(self, value1, value2):
        if self.reverse:
            return self.op(value2, value1)
        return self.op(value1, value2)


class AggregatorExpressionBinaryScalar(AggregatorDescriptor):
    def __init__(self, name, op, code, agg, scalar, reverse=False):
        self.agg = agg
        self.scalar = scalar
        self.name = name
        self.code = code
        self.op = op
        self.reverse = reverse
        self.expressions = self.agg.expressions
        self.selection = self.agg.selection

    def __repr__(self):
        if self.reverse:
            return f'({self.scalar!r} {self.code} {self.agg!r})'
        else:
            return f'({self.agg!r} {self.code} {self.scalar!r})'

    @property
    def edges(self):
        return self.agg.edges

    @edges.setter
    def edges(self, value):
        self.agg.edges = value

    def add_tasks(self, df, binners, progress):
        progressbar = vaex.utils.progressbars(progress, title=repr(self))
        tasks, result = self.agg.add_tasks(df, binners, progress=progressbar)
        @vaex.delayed
        def finish(value):
            return self.finish(value)
        return tasks, finish(result)

    def finish(self, value):
        if self.reverse:
            return self.op(self.scalar, value)
        return self.op(value, self.scalar)


for op in _binary_ops:
    def wrap(op=op):
        def f(a, b):
            if isinstance(a, AggregatorDescriptor):
                if isinstance(b, AggregatorDescriptor):
                    return AggregatorExpressionBinary(op['name'], op['op'], op['code'], a, b)
                else:
                    return AggregatorExpressionBinaryScalar(op['name'], op['op'], op['code'], a, b)
            else:
                raise RuntimeError('Cannot happen')
        setattr(AggregatorDescriptor, '__%s__' % op['name'], f)
        if op['name'] in reversable:
            def f(a, b):
                if isinstance(a, AggregatorDescriptor):
                    if isinstance(b, AggregatorDescriptor):
                        raise RuntimeError('Cannot happen')
                    else:
                        return AggregatorExpressionBinaryScalar(op['name'], op['op'], op['code'], a, b, reverse=True)
                else:
                    raise RuntimeError('Cannot happen')
            setattr(AggregatorDescriptor, '__r%s__' % op['name'], f)
    wrap(op)
for op in _unary_ops:
    def wrap(op=op):
        def f(a):
            return AggregatorExpressionUnary(op['name'], op['op'], op['code'], a)
        setattr(AggregatorDescriptor, '__%s__' % op['name'], f)
    wrap(op)

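# Illustrative sketch (hypothetical helper, not part of vaex.agg): the operator
# overloads wired up above allow aggregators to be combined lazily, which this
# module itself relies on below in ``describe`` (subtraction) and ``any``/``all``
# (comparisons against scalars or other aggregators).
def _sketch_composed_aggregators():
    valid = count('x')               # AggregatorDescriptorBasic
    missing = count() - valid        # AggregatorExpressionBinary via __sub__
    non_empty = count() > 0          # AggregatorExpressionBinaryScalar via __gt__
    return missing, non_empty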

class AggregatorDescriptorBasic(AggregatorDescriptor):
    def __init__(self, name, expressions, short_name, multi_args=False, agg_args=[], selection=None, edges=False):
        self.name = name
        self.short_name = short_name
        self.agg_args = agg_args
        self.edges = edges
        self.selection = _normalize_selection_name(selection)
        assert isinstance(expressions, (list_, tuple))
        for e in expressions:
            assert not isinstance(e, (list_, tuple))
        self.expressions = [str(k) for k in expressions]
        if len(self.expressions) == 1 and self.expressions[0] == '*':
            self.expressions = []

    def __repr__(self):
        args = [*self.expressions, *self.agg_args]
        return 'vaex.agg.{}({!r})'.format(self.short_name, ", ".join(map(str, args)))

    def encode(self, encoding):
        spec = {'aggregation': self.short_name}
        if len(self.expressions) == 0:
            pass
        else:
            spec['expressions'] = [str(k) for k in self.expressions]
        if self.selection is not None:
            spec['selection'] = str(self.selection) if isinstance(self.selection, Expression) else self.selection
        if self.edges:
            spec['edges'] = True
        if self.agg_args and self.short_name not in ['first', 'last']:
            spec['parameters'] = self.agg_args
        return spec

    def _prepare_types(self, df):
        if len(self.expressions) == 0 and self.short_name == "count":
            self.dtype_in = DataType(np.dtype('int64'))
            self.dtype_out = DataType(np.dtype('int64'))
        else:
            self.dtypes_in = [df[str(e)].data_type().index_type for e in self.expressions]
            self.dtype_in = self.dtypes_in[0]
            self.dtype_out = self.dtype_in
            if self.short_name == "count":
                self.dtype_out = DataType(np.dtype('int64'))
            if self.short_name in ['sum', 'summoment']:
                self.dtype_out = self.dtype_in.upcast()

    def add_tasks(self, df, binners, progress):
        progressbar = vaex.utils.progressbars(progress)
        self._prepare_types(df)
        task = vaex.tasks.TaskAggregation(df, binners, self)
        task = df.executor.schedule(task)
        progressbar.add_task(task, repr(self))
        @vaex.delayed
        def finish(value):
            return self.finish(value)
        return [task], finish(task)

    def _create_operation(self, grid, nthreads):
        if self.name in ["AggFirst", "AggList"]:
            if len(self.dtypes_in) == 1:
                # rows use int64
                agg_op_type = vaex.utils.find_type_from_dtype(vaex.superagg, self.name + "_", self.dtypes_in[0], vaex.dtype(np.dtype('int64')))
            else:
                agg_op_type = vaex.utils.find_type_from_dtype(vaex.superagg, self.name + "_", self.dtypes_in[0], self.dtypes_in[1])
        else:
            agg_op_type = vaex.utils.find_type_from_dtype(vaex.superagg, self.name + "_", self.dtype_in)
        if self.dtype_out.is_primitive or self.dtype_out.is_temporal:
            bytes_per_cell = self.dtype_out.numpy.itemsize
        else:
            bytes_per_cell = self.dtype_out.value_type.numpy.itemsize
        cells = reduce(operator.mul, [len(binner) for binner in grid.binners], 1)
        grids = nthreads
        ncells = len(grid)
        # the more grid cells we have, the more work it takes to merge/reduce the
        # per-thread grids, so we use fewer of them
        if ncells >= 1e4:
            grids = _min(32, nthreads)
        if ncells >= 1e5:
            grids = _min(16, nthreads)
        if ncells >= 1e6:
            grids = _min(8, nthreads)
        if grids < 1:
            grids = 1
        if logger.isEnabledFor(logging.INFO):
            logger.info("Using %r grids for %r thread for aggerator %r (total grid cells %s)", grids, nthreads, self, f"{ncells:,}")
        if self.short_name in ["list"]:
            # cannot predict memory usage
            predicted_memory_usage = None
            grids = 1
        else:
            predicted_memory_usage = bytes_per_cell * cells * grids
            vaex.memory.local.agg.pre_alloc(predicted_memory_usage, f"aggregator data for {agg_op_type}")
        agg_op = agg_op_type(grid, grids, nthreads, *self.agg_args)
        used_memory = sys.getsizeof(agg_op)

        if predicted_memory_usage is not None:
            if predicted_memory_usage != used_memory:
                raise RuntimeError(f'Wrong prediction for {agg_op_type}, expected to take {predicted_memory_usage} bytes but actually used {used_memory}')
        else:
            vaex.memory.local.agg.pre_alloc(used_memory, f"aggregator data for {agg_op_type}")
        return agg_op

    def get_result(self, agg_operation):
        grid = agg_operation.get_result()
        if not self.edges:
            def binner2slice(binner):
                if 'BinnerScalar_' in str(binner):
                    return slice(2, -1)
                elif 'BinnerOrdinal_' in str(binner):
                    return slice(0, -2)
                else:
                    raise TypeError(f'Binner not supported with edges=False {binner}')
            slices = [binner2slice(binner) for binner in agg_operation.grid.binners]
            grid = grid[tuple(slices)]
        return grid

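# Illustrative sketch (hypothetical helper, not part of vaex.agg): rough numbers
# for the grids/memory heuristic in ``AggregatorDescriptorBasic._create_operation``
# above, assuming a float64 aggregator (8 bytes per cell) on a grid of ~1e6 cells.
def _sketch_predicted_memory(nthreads=64, ncells=1_000_000, bytes_per_cell=8):
    grids = _min(8, nthreads)  # for ncells >= 1e6 the heuristic caps the per-thread grids at 8
    return bytes_per_cell * ncells * grids  # 8 B * 1e6 cells * 8 grids ~= 64 MB pre-allocated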

class AggregatorDescriptorNUnique(AggregatorDescriptorBasic):
    def __init__(self, name, expression, short_name, dropmissing, dropnan, selection=None, edges=False):
        super(AggregatorDescriptorNUnique, self).__init__(name, expression, short_name, selection=selection, edges=edges)
        self.dropmissing = dropmissing
        self.dropnan = dropnan

    def encode(self, encoding):
        spec = super().encode(encoding)
        if self.dropmissing:
            spec['dropmissing'] = self.dropmissing
        if self.dropnan:
            spec['dropnan'] = self.dropnan
        return spec

    def _prepare_types(self, df):
        super()._prepare_types(df)
        self.dtype_out = DataType(np.dtype('int64'))

    def _create_operation(self, grid, nthreads):
        grids = 1  # this is using a shared hashmap, which is thread safe
        agg_op_type = vaex.utils.find_type_from_dtype(vaex.superagg, self.name + "_", self.dtype_in)
        cells = reduce(operator.mul, [len(binner) for binner in grid.binners], 1)
        grid0 = vaex.superagg.Grid([])
        agg_op_test = agg_op_type(grid0, grids, nthreads, self.dropmissing, self.dropnan)
        predicted_memory_usage = sys.getsizeof(agg_op_test) * cells
        vaex.memory.local.agg.pre_alloc(predicted_memory_usage, f"aggregator data for {agg_op_type}")
        agg_op = agg_op_type(grid, grids, nthreads, self.dropmissing, self.dropnan)
        used_memory = sys.getsizeof(agg_op)
        if predicted_memory_usage != used_memory:
            raise RuntimeError(f'Wrong prediction for {agg_op_type}, expected to take {predicted_memory_usage} bytes but actually used {used_memory}')

        return agg_op


class AggregatorDescriptorMulti(AggregatorDescriptor):
    """Uses multiple operations/aggregations to calculate the final aggregation"""

    def __init__(self, name, expressions, short_name, selection=None, edges=False):
        self.name = name
        self.short_name = short_name
        self.expressions = expressions
        self.selection = selection
        self.edges = edges
        assert isinstance(expressions, (list_, tuple))
        for e in expressions:
            assert not isinstance(e, (list_, tuple))
        self.expressions = [str(k) for k in expressions]


class AggregatorDescriptorMean(AggregatorDescriptorMulti):
    def __init__(self, name, expressions, short_name="mean", selection=None, edges=False):
        super(AggregatorDescriptorMean, self).__init__(name, expressions, short_name, selection=selection, edges=edges)
        assert len(expressions) == 1

    def add_tasks(self, df, binners, progress):
        progressbar = vaex.utils.progressbars(progress, title=repr(self))
        expression = expression_sum = df[str(self.expressions[0])]

        sum_agg = sum(expression_sum, selection=self.selection, edges=self.edges)
        count_agg = count(expression, selection=self.selection, edges=self.edges)

        task_sum = sum_agg.add_tasks(df, binners, progress=progressbar)[0][0]
        task_count = count_agg.add_tasks(df, binners, progress=progressbar)[0][0]
        self.dtype_in = sum_agg.dtype_in
        self.dtype_out = sum_agg.dtype_out

        @vaex.delayed
        def finish(sum, count):
            sum = np.array(sum)
            dtype = sum.dtype
            sum_kind = sum.dtype.kind
            if sum_kind == 'M':
                sum = sum.view('uint64')
                count = count.view('uint64')
            with np.errstate(divide='ignore', invalid='ignore'):
                mean = sum / count
            if dtype.kind != mean.dtype.kind and sum_kind == "M":
                # TODO: not sure why view does not work
                mean = mean.astype(dtype)
            return mean

        return [task_sum, task_count], finish(task_sum, task_count)


class AggregatorDescriptorVar(AggregatorDescriptorMulti):
    def __init__(self, name, expression, short_name="var", ddof=0, selection=None, edges=False):
        super(AggregatorDescriptorVar, self).__init__(name, expression, short_name, selection=selection, edges=edges)
        self.ddof = ddof

    def add_tasks(self, df, binners, progress):
        progressbar = vaex.utils.progressbars(progress, title=repr(self))
        expression_sum = expression = df[str(self.expressions[0])]
        expression = expression_sum = expression.astype('float64')
        sum_moment = _sum_moment(str(expression_sum), 2, selection=self.selection, edges=self.edges)
        sum_ = sum(str(expression_sum), selection=self.selection, edges=self.edges)
        count_ = count(str(expression), selection=self.selection, edges=self.edges)

        task_sum_moment = sum_moment.add_tasks(df, binners, progress=progressbar)[0][0]
        task_sum = sum_.add_tasks(df, binners, progress=progressbar)[0][0]
        task_count = count_.add_tasks(df, binners, progress=progressbar)[0][0]
        self.dtype_in = sum_.dtype_in
        self.dtype_out = sum_.dtype_out

        @vaex.delayed
        def finish(sum_moment, sum, count):
            sum = np.array(sum)
            dtype = sum.dtype
            if sum.dtype.kind == 'M':
                sum = sum.view('uint64')
                sum_moment = sum_moment.view('uint64')
                count = count.view('uint64')
            with np.errstate(divide='ignore', invalid='ignore'):
                mean = sum / count
                raw_moments2 = sum_moment / count
                variance = (raw_moments2 - mean**2)  # * count/(count-self.ddof)
            if dtype.kind != mean.dtype.kind:
                # TODO: not sure why view does not work
                variance = variance.astype(dtype)
            return self.finish(variance)

        return [task_sum_moment, task_sum, task_count], finish(task_sum_moment, task_sum, task_count)


class AggregatorDescriptorSkew(AggregatorDescriptorMulti):
    def __init__(self, name, expression, short_name='skew', selection=None, edges=False):
        super(AggregatorDescriptorSkew, self).__init__(name, [expression], short_name, selection=selection, edges=edges)

    def add_tasks(self, df, binners, progress):
        progressbar = vaex.utils.progressbars(progress, title=repr(self))
        expression = expression_sum = df[str(self.expressions[0])]
        expression = expression_sum = expression.astype('float64')
        sum_moment1 = _sum_moment(str(expression_sum), 1, selection=self.selection, edges=self.edges)
        sum_moment2 = _sum_moment(str(expression_sum), 2, selection=self.selection, edges=self.edges)
        sum_moment3 = _sum_moment(str(expression_sum), 3, selection=self.selection, edges=self.edges)
        count_ = count(str(expression), selection=self.selection, edges=self.edges)

        task_sum_moment1 = sum_moment1.add_tasks(df, binners, progress=progressbar)[0][0]
        task_sum_moment2 = sum_moment2.add_tasks(df, binners, progress=progressbar)[0][0]
        task_sum_moment3 = sum_moment3.add_tasks(df, binners, progress=progressbar)[0][0]
        task_count = count_.add_tasks(df, binners, progress=progressbar)[0][0]

        @vaex.delayed
        def finish(sum_moment1, sum_moment2, sum_moment3, count):
            with np.errstate(divide='ignore', invalid='ignore'):
                m1 = sum_moment1 / count
                m2 = sum_moment2 / count
                m3 = sum_moment3 / count
                skew = (m3 - 3*m1*m2 + 2*m1**3) / (m2 - m1**2)**(3/2)
            return self.finish(skew)

        return [task_sum_moment1, task_sum_moment2, task_sum_moment3, task_count], finish(task_sum_moment1, task_sum_moment2, task_sum_moment3, task_count)


class AggregatorDescriptorKurtosis(AggregatorDescriptorMulti):
    def __init__(self, name, expression, short_name='kurtosis', selection=None, edges=False):
        super(AggregatorDescriptorKurtosis, self).__init__(name, [expression], short_name, selection=selection, edges=edges)

    def add_tasks(self, df, binners, progress):
        progressbar = vaex.utils.progressbars(progress, title=repr(self))
        expression = expression_sum = df[str(self.expressions[0])]
        expression = expression_sum = expression.astype('float64')
        sum_moment1 = _sum_moment(str(expression_sum), 1, selection=self.selection, edges=self.edges)
        sum_moment2 = _sum_moment(str(expression_sum), 2, selection=self.selection, edges=self.edges)
        sum_moment3 = _sum_moment(str(expression_sum), 3, selection=self.selection, edges=self.edges)
        sum_moment4 = _sum_moment(str(expression_sum), 4, selection=self.selection, edges=self.edges)
        count_ = count(str(expression), selection=self.selection, edges=self.edges)

        task_sum_moment1 = sum_moment1.add_tasks(df, binners, progress=progressbar)[0][0]
        task_sum_moment2 = sum_moment2.add_tasks(df, binners, progress=progressbar)[0][0]
        task_sum_moment3 = sum_moment3.add_tasks(df, binners, progress=progressbar)[0][0]
        task_sum_moment4 = sum_moment4.add_tasks(df, binners, progress=progressbar)[0][0]
        task_count = count_.add_tasks(df, binners, progress=progressbar)[0][0]

        @vaex.delayed
        def finish(sum_moment1, sum_moment2, sum_moment3, sum_moment4, count):
            with np.errstate(divide='ignore', invalid='ignore'):
                m1 = sum_moment1 / count
                m2 = sum_moment2 / count
                m3 = sum_moment3 / count
                m4 = sum_moment4 / count
                kurtosis = (m4 - 4*m1*m3 + 6*m1**2*m2 - 3*m1**4) / (m2 - m1**2)**2 - 3.0
            return self.finish(kurtosis)

        return [task_sum_moment1, task_sum_moment2, task_sum_moment3, task_sum_moment4, task_count], finish(task_sum_moment1, task_sum_moment2, task_sum_moment3, task_sum_moment4, task_count)


class AggregatorDescriptorStd(AggregatorDescriptorVar):
    def finish(self, value):
        return value**0.5


@register
def count(expression='*', selection=None, edges=False):
    '''Creates a count aggregation'''
    return AggregatorDescriptorBasic('AggCount', [expression], 'count', selection=selection, edges=edges)


@register
def sum(expression, selection=None, edges=False):
    '''Creates a sum aggregation'''
    return AggregatorDescriptorBasic('AggSum', [expression], 'sum', selection=selection, edges=edges)


@register
def mean(expression, selection=None, edges=False):
    '''Creates a mean aggregation'''
    return AggregatorDescriptorMean('mean', [expression], 'mean', selection=selection, edges=edges)


@register
def min(expression, selection=None, edges=False):
    '''Creates a min aggregation'''
    return AggregatorDescriptorBasic('AggMin', [expression], 'min', selection=selection, edges=edges)


@register
def _sum_moment(expression, moment, selection=None, edges=False):
    '''Creates a sum of moment aggregator'''
    return AggregatorDescriptorBasic('AggSumMoment', [expression], '_sum_moment', agg_args=[moment], selection=selection, edges=edges)


@register
def max(expression, selection=None, edges=False):
    '''Creates a max aggregation'''
    return AggregatorDescriptorBasic('AggMax', [expression], 'max', selection=selection, edges=edges)


@register
@docsubst
def first(expression, order_expression=None, selection=None, edges=False):
    '''Creates a first aggregation.

    :param expression: {expression_one}.
    :param order_expression: Order the values in the bins by this expression.
    :param selection: {selection1}
    :param edges: {edges}
    '''
    return AggregatorDescriptorBasic('AggFirst', [expression, order_expression] if order_expression is not None else [expression], 'first', multi_args=True, selection=selection, edges=edges, agg_args=[False])


@register
@docsubst
def last(expression, order_expression=None, selection=None, edges=False):
    '''Creates a last aggregation.

    :param expression: {expression_one}.
    :param order_expression: Order the values in the bins by this expression.
    :param selection: {selection1}
    :param edges: {edges}
    '''
    return AggregatorDescriptorBasic('AggFirst', [expression, order_expression] if order_expression is not None else [expression], 'last', multi_args=True, selection=selection, edges=edges, agg_args=[True])


@register
def std(expression, ddof=0, selection=None, edges=False):
    '''Creates a standard deviation aggregation'''
    return AggregatorDescriptorStd('std', [expression], 'std', ddof=ddof, selection=selection, edges=edges)


@register
def var(expression, ddof=0, selection=None, edges=False):
    '''Creates a variance aggregation'''
    return AggregatorDescriptorVar('var', [expression], 'var', ddof=ddof, selection=selection, edges=edges)


@register
def skew(expression, selection=None, edges=False):
    '''Creates a skew aggregation.'''
    return AggregatorDescriptorSkew('skew', expression, 'skew', selection=selection, edges=edges)


@register
def kurtosis(expression, selection=None, edges=False):
    '''Creates a kurtosis aggregation.'''
    return AggregatorDescriptorKurtosis('kurtosis', expression, 'kurtosis', selection=selection, edges=edges)


@register
@docsubst
def nunique(expression, dropna=False, dropnan=False, dropmissing=False, selection=None, edges=False):
    """Aggregator that calculates the number of unique items per bin.

    :param expression: {expression_one}
    :param dropmissing: {dropmissing}
    :param dropnan: {dropnan}
    :param dropna: {dropna}
    :param selection: {selection1}
    """
    if dropna:
        dropnan = True
        dropmissing = True
    return AggregatorDescriptorNUnique('AggNUnique', [expression], 'nunique', dropmissing, dropnan, selection=selection, edges=edges)


@docsubst
def any(expression=None, selection=None):
    '''Aggregator that returns True when any of the values in the group are True, or when there is any data in the group
    that is valid (i.e. not missing values or np.nan). The aggregator returns False if there is no data in the group when
    the selection argument is used.

    :param expression: {expression_one}
    :param selection: {selection1}
    '''
    if expression is None and selection is None:
        return count(selection=selection) > -1  # trivial
    else:
        if expression is None:
            return count(selection=selection) > 0
        else:
            return sum(expression, selection=selection) > 0


@docsubst
def all(expression=None, selection=None):
    '''Aggregator that returns True when all of the values in the group are True, or when all of the data in the group
    is valid (i.e. not missing values or np.nan). The aggregator returns False if there is no data in the group when
    the selection argument is used.

    :param expression: {expression_one}
    :param selection: {selection1}
    '''
    if expression is None and selection is None:
        return count(selection=selection) > -1  # trivial
    else:
        if expression is None:
            # counting how often the selection is true == counting how many rows there are
            return sum(selection) == count(selection)
        else:
            if selection is None:
                # counting how often the expression is true == counting how much data there is
                return sum(expression) == count(expression)
            else:
                # since we cannot mix different selections:
                return sum(f'astype({expression}, "bool") & astype({selection}, "bool")') == count(expression)


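# Illustrative sketch (hypothetical helper, not part of vaex.agg): the composite
# aggregators that ``any`` and ``all`` above expand to.
def _sketch_any_all_rewrites():
    assert repr(any('x > 0')) == repr(sum('x > 0') > 0)
    assert repr(all('x > 0')) == repr(sum('x > 0') == count('x > 0'))

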
@register
@docsubst
class list(AggregatorDescriptorBasic):
    '''Aggregator that returns a list of values belonging to the specified expression.

    :param expression: {expression_one}
    :param selection: {selection1}
    :param dropmissing: {dropmissing}
    :param dropnan: {dropnan}
    :param dropna: {dropna}
    :param edges: {edges}
    '''
    def __init__(self, expression, selection=None, dropna=False, dropnan=False, dropmissing=False, edges=False):
        if dropna:
            dropnan = True
            dropmissing = True
        super(list, self).__init__("AggList", [expression], "list", selection=selection, edges=edges, agg_args=[dropnan, dropmissing])

    def _prepare_types(self, df):
        super()._prepare_types(df)
        self.dtype_out = vaex.dtype(pa.large_list(self.dtype_out.arrow))


# TODO: we could maybe refactor this to support more struct aggregators
@register
class describe(AggregatorDescriptor):
    def __init__(self, expression):
        self.expression = expression
        self.expressions = [self.expression]
        self.short_name = "describe"
        self.edges = True

    def __repr__(self):
        return f"describe({self.expression!r})"

    def add_tasks(self, df, binners, progress):
        expression: Expression = df[str(self.expression)]
        col = expression._label
        if expression.data_type() != "string":
            aggs = {
                "count": vaex.agg.count(expression, edges=self.edges),
                "count_na": vaex.agg.count(edges=self.edges) - vaex.agg.count(expression, edges=self.edges),
                "mean": vaex.agg.mean(expression, edges=self.edges),
                "std": vaex.agg.std(expression, edges=self.edges),
                "min": vaex.agg.min(expression, edges=self.edges),
                "max": vaex.agg.max(expression, edges=self.edges),
            }
        else:
            aggs = {
                "count": vaex.agg.count(expression, edges=self.edges),
                "count_na": vaex.agg.count(edges=self.edges) - vaex.agg.count(expression, edges=self.edges),
            }
        progressbar = vaex.utils.progressbars(progress, title=repr(self))
        tasks = []
        results = []
        names = []
        for name, agg in aggs.items():
            tasks1, result = agg.add_tasks(df, binners, progress=progressbar)
            tasks.extend(tasks1)
            results.append(result)
            names.append(name)

        @vaex.delayed
        def finish(*values):
            return self.finish(values, names)

        return tasks, finish(*results)

    def finish(self, values, names):
        if len(values):
            if vaex.array_types.is_scalar(values[0]):
                return pa.StructArray.from_arrays(arrays=[[k] for k in values], names=names)
            return pa.StructArray.from_arrays(arrays=values, names=names)


# @register
# def covar(x, y):
#     '''Creates a covariance aggregation'''
#     return _Statistic('covar', x, y)


# @register
# def correlation(x, y):
#     '''Creates a correlation aggregation'''
#     return _Statistic('correlation', x, y)


@dask.base.normalize_token.register(AggregatorDescriptor)
def normalize(agg):
    return agg.__class__.__name__, repr(agg)


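# Illustrative sketch (hypothetical helper, not part of vaex.agg): because the dask
# token above is derived from the class name and repr, two identically constructed
# aggregators produce the same token.
def _sketch_dask_token():
    return dask.base.tokenize(sum('x')) == dask.base.tokenize(sum('x'))  # True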