Source code for vaex

"""
Vaex is a library for dealing with larger than memory DataFrames (out of core).

The most important class (datastructure) in vaex is the :class:`.DataFrame`. A DataFrame is obtained by either opening
the example dataset:

>>> import vaex
>>> df = vaex.example()

Or using :func:`open` to open a file.

>>> df1 = vaex.open("somedata.hdf5")
>>> df2 = vaex.open("somedata.fits")
>>> df2 = vaex.open("somedata.arrow")
>>> df4 = vaex.open("somedata.csv")

Or connecting to a remove server:

>>> df_remote = vaex.open("http://try.vaex.io/nyc_taxi_2015")


A few strong features of vaex are:

- Performance: works with huge tabular data, process over a billion (> 10\\ :sup:`9`\\ ) rows/second.
- Expression system / Virtual columns: compute on the fly, without wasting ram.
- Memory efficient: no memory copies when doing filtering/selections/subsets.
- Visualization: directly supported, a one-liner is often enough.
- User friendly API: you will only need to deal with a DataFrame object, and tab completion + docstring will help you out: `ds.mean<tab>`, feels very similar to Pandas.
- Very fast statistics on N dimensional grids such as histograms, running mean, heatmaps.


Follow the tutorial at https://docs.vaex.io/en/latest/tutorial.html to learn how to use vaex.

"""  # -*- coding: utf-8 -*-
import logging as root_logging
import os
from typing import Dict, List
from urllib.parse import urlparse, parse_qs

# first configure logging, which also imports vaex.settings
import vaex.logging
# import this to be explicit
import vaex.settings

import vaex.dataframe
import vaex.dataset
from vaex.docstrings import docsubst
from vaex.registry import register_function
from vaex import functions, struct
from . import stat
# import vaex.file
# import vaex.export
from .delayed import delayed
from .groupby import *
from . import agg
import vaex.datasets

# Re-export these so users can type hint with eg vaex.DataFrame
from vaex.dataframe import DataFrame as DataFrame
from vaex.expression import Expression as Expression

import vaex.progress

try:
    from sys import version_info
    if version_info[:2] >= (3, 10):
        from importlib.metadata import entry_points
    else:
        from importlib_metadata import entry_points
except ImportError:
    import pkg_resources
    entry_points = pkg_resources.iter_entry_points

try:
    from . import version
except:
    import sys
    print("version file not found, please run git/hooks/post-commit or git/hooks/post-checkout and/or install them as hooks (see git/README)", file=sys.stderr)
    raise

logger = root_logging.getLogger('vaex')
DEBUG_MODE = os.environ.get('VAEX_DEBUG', '')
__version__ = version.get_versions()


def app(*args, **kwargs):
    """Create a vaex app, the QApplication mainloop must be started.

    In ipython notebook/jupyter do the following:

    >>> import vaex.ui.main # this causes the qt api level to be set properly
    >>> import vaex

    Next cell:

    >>> %gui qt

    Next cell:

    >>> app = vaex.app()

    From now on, you can run the app along with jupyter

    """

    import vaex.ui.main
    return vaex.ui.main.VaexApp()


[docs]@docsubst def open(path, convert=False, progress=None, shuffle=False, fs_options={}, fs=None, *args, **kwargs): """Open a DataFrame from file given by path. Example: >>> df = vaex.open('sometable.hdf5') >>> df = vaex.open('somedata*.csv', convert='bigdata.hdf5') :param str or list path: local or absolute path to file, or glob string, or list of paths :param convert: Uses `dataframe.export` when convert is a path. If True, ``convert=path+'.hdf5'`` The conversion is skipped if the input file or conversion argument did not change. :param progress: (*Only applies when convert is not False*) {progress} :param bool shuffle: shuffle converted DataFrame or not :param dict fs_options: Extra arguments passed to an optional file system if needed. See below :param group: (optional) Specify the group to be read from and HDF5 file. By default this is set to "/table". :param fs: Apache Arrow FileSystem object, or FSSpec FileSystem object, if specified, fs_options should be empty. :param args: extra arguments for file readers that need it :param kwargs: extra keyword arguments :return: return a DataFrame on success, otherwise None :rtype: DataFrame Note: From version 4.14.0 `vaex.open()` will lazily read CSV files. If you prefer to read the entire CSV file into memory, use `vaex.from_csv()` or `vaex.from_csv_arrow()` instead. Cloud storage support: Vaex supports streaming of HDF5 files from Amazon AWS S3 and Google Cloud Storage. Files are by default cached in $HOME/.vaex/file-cache/(s3|gs) such that successive access is as fast as native disk access. Amazon AWS S3 options: The following common fs_options are used for S3 access: * `anon`: Use anonymous access or not (false by default). (Allowed values are: true,True,1,false,False,0) * `anonymous` - Alias for `anon` * `cache`: Use the disk cache or not, only set to false if the data should be accessed once. (Allowed values are: true,True,1,false,False,0) * `access_key` - AWS access key, if not provided will use the standard env vars, or the `~/.aws/credentials` file * `secret_key` - AWS secret key, similar to `access_key` * `profile` - If multiple profiles are present in `~/.aws/credentials`, pick this one instead of 'default', see https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-files.html * `region` - AWS Region, e.g. 'us-east-1`, will be determined automatically if not provided. * `endpoint_override` - URL/ip to connect to, instead of AWS, e.g. 'localhost:9000' for minio All fs_options can also be encoded in the file path as a query string. Examples: >>> df = vaex.open('s3://vaex/taxi/yellow_taxi_2015_f32s.hdf5', fs_options={{'anonymous': True}}) >>> df = vaex.open('s3://vaex/taxi/yellow_taxi_2015_f32s.hdf5?anon=true') >>> df = vaex.open('s3://mybucket/path/to/file.hdf5', fs_options={{'access_key': my_key, 'secret_key': my_secret_key}}) >>> df = vaex.open(f's3://mybucket/path/to/file.hdf5?access_key={{my_key}}&secret_key={{my_secret_key}}') >>> df = vaex.open('s3://mybucket/path/to/file.hdf5?profile=myproject') Google Cloud Storage options: The following fs_options are used for GCP access: * token: Authentication method for GCP. Use 'anon' for annonymous access. See https://gcsfs.readthedocs.io/en/latest/index.html#credentials for more details. * cache: Use the disk cache or not, only set to false if the data should be accessed once. (Allowed values are: true,True,1,false,False,0). * project and other arguments are passed to :py:class:`gcsfs.core.GCSFileSystem` Examples: >>> df = vaex.open('gs://vaex-data/airlines/us_airline_data_1988_2019.hdf5', fs_options={{'token': None}}) >>> df = vaex.open('gs://vaex-data/airlines/us_airline_data_1988_2019.hdf5?token=anon') >>> df = vaex.open('gs://vaex-data/testing/xys.hdf5?token=anon&cache=False') """ import vaex import vaex.convert import vaex.csv # need to import this to register for dask/fingerprinting try: if not isinstance(path, (list, tuple)): # remote and clusters only support single path, not a list path = vaex.file.stringyfy(path) if path in aliases: path = aliases[path] path = vaex.file.stringyfy(path) if path.startswith("http://") or path.startswith("ws://") or \ path.startswith("vaex+wss://") or path.startswith("wss://") or \ path.startswith("vaex+http://") or path.startswith("vaex+ws://"): server, name = path.rsplit("/", 1) url = urlparse(path) if '?' in name: name = name[:name.index('?')] extra_args = {key: values[0] for key, values in parse_qs(url.query).items()} if 'token' in extra_args: kwargs['token'] = extra_args['token'] if 'token_trusted' in extra_args: kwargs['token_trusted'] = extra_args['token_trusted'] client = vaex.connect(server, **kwargs) return client[name] if path.startswith("cluster"): import vaex.enterprise.distributed return vaex.enterprise.distributed.open(path, *args, **kwargs) import vaex.file import glob if isinstance(path, str): paths = [path] else: paths = path filenames = [] for path in paths: path = vaex.file.stringyfy(path) if path in aliases: path = aliases[path] path = vaex.file.stringyfy(path) naked_path, options = vaex.file.split_options(path) if glob.has_magic(naked_path): filenames.extend(list(sorted(vaex.file.glob(path, fs_options=fs_options, fs=fs)))) else: filenames.append(path) df = None if len(filenames) == 0: raise IOError(f'File pattern did not match anything {path}') filename_hdf5 = vaex.convert._convert_name(filenames, shuffle=shuffle) filename_hdf5_noshuffle = vaex.convert._convert_name(filenames, shuffle=False) if len(filenames) == 1: path = filenames[0] # # naked_path, _ = vaex.file.split_options(path, fs_options) _, ext, _ = vaex.file.split_ext(path) if convert: path_output = convert if isinstance(convert, str) else filename_hdf5 vaex.convert.convert( path_input=path, fs_options_input=fs_options, fs_input=fs, path_output=path_output, fs_options_output=fs_options, fs_output=fs, progress=progress, *args, **kwargs ) ds = vaex.dataset.open(path_output, fs_options=fs_options, fs=fs) else: ds = vaex.dataset.open(path, fs_options=fs_options, fs=fs, **kwargs) df = vaex.from_dataset(ds) if df is None: if os.path.exists(path): raise IOError('Could not open file: {}, did you install vaex-hdf5? Is the format supported?'.format(path)) elif len(filenames) > 1: if convert not in [True, False]: filename_hdf5 = convert else: filename_hdf5 = vaex.convert._convert_name(filenames, shuffle=shuffle) if os.path.exists(filename_hdf5) and convert: # also check mtime df = vaex.open(filename_hdf5) else: dfs = [] for filename in filenames: dfs.append(vaex.open(filename, fs_options=fs_options, fs=fs, convert=bool(convert), shuffle=shuffle, **kwargs)) df = vaex.concat(dfs) if convert: if shuffle: df = df.shuffle() df.export_hdf5(filename_hdf5, progress=progress) df = vaex.open(filename_hdf5) if df is None: raise IOError('Unknown error opening: {}'.format(path)) return df except: logger.exception("error opening %r" % path) raise
[docs]def open_many(filenames): """Open a list of filenames, and return a DataFrame with all DataFrames concatenated. The filenames can be of any format that is supported by :py:func:`vaex.open`, namely hdf5, arrow, parquet, csv, etc. :param list[str] filenames: list of filenames/paths :rtype: DataFrame """ dfs = [] for filename in filenames: filename = filename.strip() if filename and filename[0] != "#": dfs.append(open(filename)) return concat(dfs)
def from_samp(username=None, password=None): """Connect to a SAMP Hub and wait for a single table load event, disconnect, download the table and return the DataFrame. Useful if you want to send a single table from say TOPCAT to vaex in a python console or notebook. """ print("Waiting for SAMP message...") import vaex.samp t = vaex.samp.single_table(username=username, password=password) return from_astropy_table(t.to_table())
[docs]def from_astropy_table(table): """Create a vaex DataFrame from an Astropy Table.""" from vaex.astro.astropy_table import DatasetAstropyTable ds = DatasetAstropyTable(table=table) return vaex.dataframe.DataFrameLocal(ds)
[docs]def from_dict(data): """Create an in memory dataset from a dict with column names as keys and list/numpy-arrays as values Example >>> data = {'A':[1,2,3],'B':['a','b','c']} >>> vaex.from_dict(data) # A B 0 1 'a' 1 2 'b' 2 3 'c' :param data: A dict of {column:[value, value,...]} :rtype: DataFrame """ return vaex.from_arrays(**data)
[docs]def from_items(*items): """Create an in memory DataFrame from numpy arrays, in contrast to from_arrays this keeps the order of columns intact (for Python < 3.6). Example >>> import vaex, numpy as np >>> x = np.arange(5) >>> y = x ** 2 >>> vaex.from_items(('x', x), ('y', y)) # x y 0 0 0 1 1 1 2 2 4 3 3 9 4 4 16 :param items: list of [(name, numpy array), ...] :rtype: DataFrame """ return from_dict(dict(items))
[docs]def from_arrays(**arrays) -> vaex.dataframe.DataFrameLocal: """Create an in memory DataFrame from numpy arrays. Example >>> import vaex, numpy as np >>> x = np.arange(5) >>> y = x ** 2 >>> vaex.from_arrays(x=x, y=y) # x y 0 0 0 1 1 1 2 2 4 3 3 9 4 4 16 >>> some_dict = {'x': x, 'y': y} >>> vaex.from_arrays(**some_dict) # in case you have your columns in a dict # x y 0 0 0 1 1 1 2 2 4 3 3 9 4 4 16 :param arrays: keyword arguments with arrays :rtype: DataFrame """ import numpy as np import six dataset = vaex.dataset.DatasetArrays(arrays) return vaex.dataframe.DataFrameLocal(dataset)
[docs]def from_arrow_table(table) -> vaex.dataframe.DataFrame: """Creates a vaex DataFrame from an arrow Table. :param as_numpy: Will lazily cast columns to a NumPy ndarray. :rtype: DataFrame """ from vaex.arrow.dataset import from_table return from_dataset(from_table(table=table))
[docs]def from_arrow_dataset(arrow_dataset) -> vaex.dataframe.DataFrame: '''Create a DataFrame from an Apache Arrow dataset.''' import vaex.arrow.dataset return from_dataset(vaex.arrow.dataset.DatasetArrow(arrow_dataset))
[docs]def from_dataset(dataset: vaex.dataset.Dataset) -> vaex.dataframe.DataFrame: '''Create a Vaex DataFrame from a Vaex Dataset''' return vaex.dataframe.DataFrameLocal(dataset)
def from_scalars(**kwargs): """Similar to from_arrays, but convenient for a DataFrame of length 1. Example: >>> import vaex >>> df = vaex.from_scalars(x=1, y=2) :rtype: DataFrame """ import numpy as np return from_arrays(**{k: np.array([v]) for k, v in kwargs.items()})
[docs]def from_pandas(df, name="pandas", copy_index=False, index_name="index"): """Create an in memory DataFrame from a pandas DataFrame. :param: pandas.DataFrame df: Pandas DataFrame :param: name: unique for the DataFrame >>> import vaex, pandas as pd >>> df_pandas = pd.from_csv('test.csv') >>> df = vaex.from_pandas(df_pandas) :rtype: DataFrame """ import six import pandas as pd import numpy as np import pyarrow as pa columns = {} def add(name, column): values = column.values # the first test is to support (partially) pandas 0.23 if hasattr(pd.core.arrays, 'integer') and isinstance(values, pd.core.arrays.integer.IntegerArray): values = np.ma.array(values._data, mask=values._mask) elif hasattr(pd.core.arrays, 'StringArray') and isinstance(values, pd.core.arrays.StringArray): values = pa.array(values) elif hasattr(pd.core.arrays, 'FloatingArray') and isinstance(values, pd.core.arrays.FloatingArray): values = np.ma.array(values._data, mask=values._mask) try: columns[name] = vaex.dataset.to_supported_array(values) except Exception as e: print("could not convert column %s, error: %r, will try to convert it to string" % (name, e)) try: values = values.astype("S") columns[name] = vaex.dataset.to_supported_array(values) except Exception as e: print("Giving up column %s, error: %r" % (name, e)) for name in df.columns: add(str(name), df[name]) if copy_index: add(index_name, df.index) return from_dict(columns)
[docs]def from_ascii(path, seperator=None, names=True, skip_lines=0, skip_after=0, **kwargs): """ Create an in memory DataFrame from an ascii file (whitespace seperated by default). >>> ds = vx.from_ascii("table.asc") >>> ds = vx.from_ascii("table.csv", seperator=",", names=["x", "y", "z"]) :param path: file path :param seperator: value seperator, by default whitespace, use "," for comma seperated values. :param names: If True, the first line is used for the column names, otherwise provide a list of strings with names :param skip_lines: skip lines at the start of the file :param skip_after: skip lines at the end of the file :param kwargs: :rtype: DataFrame """ import vaex.ext.readcol as rc ds = vaex.dataframe.DataFrameLocal() if names not in [True, False]: namelist = names names = False else: namelist = None data = rc.readcol(path, fsep=seperator, asdict=namelist is None, names=names, skipline=skip_lines, skipafter=skip_after, **kwargs) if namelist: for name, array in zip(namelist, data.T): ds.add_column(name, array) else: for name, array in data.items(): ds.add_column(name, array) return ds
[docs]def from_json(path_or_buffer, orient=None, precise_float=False, lines=False, copy_index=False, **kwargs): """ A method to read a JSON file using pandas, and convert to a DataFrame directly. :param str path_or_buffer: a valid JSON string or file-like, default: None The string could be a URL. Valid URL schemes include http, ftp, s3, gcs, and file. For file URLs, a host is expected. For instance, a local file could be ``file://localhost/path/to/table.json`` :param str orient: Indication of expected JSON string format. Allowed values are ``split``, ``records``, ``index``, ``columns``, and ``values``. :param bool precise_float: Set to enable usage of higher precision (strtod) function when decoding string to double values. Default (False) is to use fast but less precise builtin functionality :param bool lines: Read the file as a json object per line. :rtype: DataFrame """ # Check for unsupported kwargs if kwargs.get('typ') == 'series': raise ValueError('`typ` must be set to `"frame"`.') if kwargs.get('numpy') == True: raise ValueError('`numpy` must be set to `False`.') if kwargs.get('chunksize') is not None: raise ValueError('`chunksize` must be `None`.') import pandas as pd return from_pandas(pd.read_json(path_or_buffer, orient=orient, precise_float=precise_float, lines=lines, **kwargs), copy_index=copy_index)
[docs]@docsubst def from_records(records : List[Dict], array_type="arrow", defaults={}) -> vaex.dataframe.DataFrame: '''Create a dataframe from a list of dict. .. warning:: This is for convenience only, for performance pass arrays to :func:`from_arrays` for instance. :param str array_type: {array_type} :param dict defaults: default values if a record has a missing entry ''' arrays = dict() for i, record in enumerate(records): for name, value in record.items(): if name not in arrays: # prepend None's arrays[name] = [defaults.get(name)] * i arrays[name].append(value) for name in arrays: if name not in record: # missing values get replaced arrays[name].append(defaults.get(name)) arrays = {k: vaex.array_types.convert(v, array_type) for k, v in arrays.items()} return vaex.from_dict(arrays)
@docsubst def from_csv_arrow(file, read_options=None, parse_options=None, convert_options=None, lazy=False, chunk_size="10MiB", newline_readahead="64kiB", schema_infer_fraction=0.01, fs_options={}, fs=None): """ Fast CSV reader using Apache Arrow. Support for lazy reading of CSV files (experimental). :param file: file path or file-like object :param read_options: PyArrow CSV read options, see https://arrow.apache.org/docs/python/generated/pyarrow.csv.ReadOptions.html :param parse_options: PyArrow CSV parse options, see https://arrow.apache.org/docs/python/generated/pyarrow.csv.ParseOptions.html :param convert_options: PyArrow CSV convert options, see https://arrow.apache.org/docs/python/generated/pyarrow.csv.ConvertOptions.html :param lazy: If True, the CSV file is lazily read, and the DataFrame is not stored in memory. :param chunk_size: The CSV is read in chunks of the specified size. Relevant only if lazy=True. :param newline_readahead: The size of the readahead buffer for newline detection. Relevant only if lazy=True. :param schema_infer_fraction: The fraction of the CSV file to read to infer the schema. Relevant only if lazy=True. :param fs_options: {fs_options} :param fs: {fs} :return: DataFrame """ import vaex.csv if lazy is True: ds = vaex.csv.DatasetCsvLazy(file, chunk_size=chunk_size, read_options=read_options, parse_options=parse_options, convert_options=convert_options, newline_readahead=newline_readahead, schema_infer_fraction=schema_infer_fraction, fs=fs, fs_options=fs_options) return vaex.from_dataset(ds) else: ds = vaex.csv.DatasetCsv(file, read_options=read_options, parse_options=parse_options, convert_options=convert_options, fs=fs, fs_options=fs_options) return vaex.from_dataset(ds)
[docs]@docsubst def from_csv(filename_or_buffer, copy_index=False, chunk_size=None, convert=False, fs_options={}, progress=None, fs=None, **kwargs): """ Load a CSV file as a DataFrame, and optionally convert to an HDF5 file. :param str or file filename_or_buffer: CSV file path or file-like :param bool copy_index: copy index when source is read via Pandas :param int chunk_size: if the CSV file is too big to fit in the memory this parameter can be used to read CSV file in chunks. For example: >>> import vaex >>> for i, df in enumerate(vaex.read_csv('taxi.csv', chunk_size=100_000)): >>> df = df[df.passenger_count < 6] >>> df.export_hdf5(f'taxi_{{i:02}}.hdf5') :param bool or str convert: convert files to an hdf5 file for optimization, can also be a path. The CSV file will be read in chunks: either using the provided chunk_size argument, or a default size. Each chunk will be saved as a separate hdf5 file, then all of them will be combined into one hdf5 file. So for a big CSV file you will need at least double of extra space on the disk. Default chunk_size for converting is 5 million rows, which corresponds to around 1Gb memory on an example of NYC Taxi dataset. :param progress: (*Only applies when convert is not False*) {progress} :param kwargs: extra keyword arguments, currently passed to Pandas read_csv function, but the implementation might change in future versions. :returns: DataFrame """ if not convert: return _read_csv_read(filename_or_buffer=filename_or_buffer, copy_index=copy_index, fs_options=fs_options, fs=fs, chunk_size=chunk_size, **kwargs) else: if chunk_size is None: # make it memory efficient by default chunk_size = 5_000_000 import vaex.convert path_output = convert if isinstance(convert, str) else vaex.convert._convert_name(filename_or_buffer) vaex.convert.convert_csv( path_input=filename_or_buffer, fs_options_input=fs_options, fs_input=fs, path_output=path_output, fs_options_output=fs_options, fs_output=fs, chunk_size=chunk_size, copy_index=copy_index, progress=progress, **kwargs ) return open(path_output, fs_options=fs_options, fs=fs)
def _read_csv_read(filename_or_buffer, copy_index, chunk_size, fs_options={}, fs=None, **kwargs): import pandas as pd if not chunk_size: with vaex.file.open(filename_or_buffer, fs_options=fs_options, fs=fs, for_arrow=True) as f: if "compression" not in kwargs: try: path = vaex.file.stringyfy(filename_or_buffer) except: path = None if path: parts = path.rsplit('.', 3) if len(parts) == 3: # we need to do infer here, because pandas does not look at the fileobj.name # to infer the compression extension_to_compression = {"gz": "gzip", "bz2": "bz2", "zip": "zip", "xz": "xz"} if parts[-1] in extension_to_compression: kwargs = {"compression": extension_to_compression[parts[-1]], **kwargs} full_df = pd.read_csv(f, **kwargs) return from_pandas(full_df, copy_index=copy_index) else: def iterator(): chunk_iterator = pd.read_csv(filename_or_buffer, chunksize=chunk_size, **kwargs) for chunk_df in chunk_iterator: yield from_pandas(chunk_df, copy_index=copy_index) return iterator() def read_csv(filepath_or_buffer, **kwargs): '''Alias to from_csv.''' return from_csv(filepath_or_buffer, **kwargs) aliases = vaex.settings.aliases def connect(url, **kwargs): """Connect to hostname supporting the vaex web api. :param str hostname: hostname or ip address of server :rtype: vaex.server.client.Client """ # dispatch to vaex.server package from vaex.server import connect return connect(url, **kwargs)
[docs]def example(): '''Result of an N-body simulation of the accretion of 33 satellite galaxies into a Milky Way dark matter halo. Data was greated by Helmi & de Zeeuw 2000. The data contains the position (x, y, z), velocitie (vx, vy, vz), the energy (E), the angular momentum (L, Lz) and iron content (FeH) of the particles. :rtype: DataFrame ''' return vaex.datasets.helmi_simulation_data()
# there are kept for backwards compatibility # TODO: remove in vaex v5? def set_log_level_debug(loggers=["vaex"]): """set log level to debug""" vaex.logging.set_log_level_debug(loggers) def set_log_level_info(loggers=["vaex"]): """set log level to info""" vaex.logging.set_log_level_info(loggers) def set_log_level_warning(loggers=["vaex"]): """set log level to warning""" vaex.logging.set_log_level_warning(loggers) def set_log_level_exception(loggers=["vaex"]): """set log level to exception/error""" vaex.logging.set_log_level_error(loggers) def set_log_level_off(): """Disabled logging""" vaex.logging.set_log_level_off() import_script = os.path.expanduser("~/.vaex/vaex_import.py") if os.path.exists(import_script): try: with open(import_script) as f: code = compile(f.read(), import_script, 'exec') exec(code) except: import traceback traceback.print_stack() def register_dataframe_accessor(name, cls=None, override=False): """Registers a new accessor for a dataframe See vaex.geo for an example. """ def wrapper(cls): old_value = getattr(vaex.dataframe.DataFrame, name, None) if old_value is not None and override is False: raise ValueError("DataFrame already has a property/accessor named %r (%r)" % (name, old_value) ) def get_accessor(self): if name in self.__dict__: return self.__dict__[name] else: self.__dict__[name] = cls(self) return self.__dict__[name] setattr(vaex.dataframe.DataFrame, name, property(get_accessor)) return cls if cls is None: return wrapper else: return wrapper(cls) for entry in entry_points(group='vaex.namespace'): logger.warning('(DEPRECATED, use vaex.dataframe.accessor) adding vaex namespace: ' + entry.name) try: add_namespace = entry.load() add_namespace() except Exception: logger.exception('issue loading ' + entry.name) _lazy_accessors_map = {} class _lazy_accessor(object): def __init__(self, name, scope, loader, lazy_accessors): """When adding an accessor geo.cone, scope=='geo', name='cone', scope may be falsy""" self.loader = loader self.name = name self.scope = scope self.lazy_accessors = lazy_accessors def __call__(self, obj): if self.name in obj.__dict__: return obj.__dict__[self.name] else: cls = self.loader() accessor = cls(obj) obj.__dict__[self.name] = accessor fullname = self.name if self.scope: fullname = self.scope + '.' + self.name if fullname in self.lazy_accessors: for name, scope, loader, lazy_accessors in self.lazy_accessors[fullname]: assert fullname == scope setattr(cls, name, property(_lazy_accessor(name, scope, loader, lazy_accessors))) return obj.__dict__[self.name] def _add_lazy_accessor(name, loader, target_class=vaex.dataframe.DataFrame): """Internal use see tests/internal/accessor_test.py for usage This enables us to have df.foo.bar accessors that lazily loads the modules. """ parts = name.split('.') if target_class not in _lazy_accessors_map: _lazy_accessors_map[target_class] = {} lazy_accessors = _lazy_accessors_map[target_class] if len(parts) == 1: setattr(target_class, parts[0], property(_lazy_accessor(name, None, loader, lazy_accessors))) else: scope = ".".join(parts[:-1]) if scope not in lazy_accessors: lazy_accessors[scope] = [] lazy_accessors[scope].append((parts[-1], scope, loader, lazy_accessors)) for entry in entry_points(group='vaex.dataframe.accessor'): logger.debug('adding vaex accessor: ' + entry.name) def loader(entry=entry): return entry.load() _add_lazy_accessor(entry.name, loader) for entry in entry_points(group='vaex.expression.accessor'): logger.debug('adding vaex expression accessor: ' + entry.name) def loader(entry=entry): return entry.load() _add_lazy_accessor(entry.name, loader, vaex.expression.Expression) for entry in entry_points(group='vaex.plugin'): try: module_name = entry.module except AttributeError: module_name = entry.module_name if module_name == 'vaex_arrow.opener': # if vaex_arrow package is installed, we ignore it continue logger.debug('adding vaex plugin: ' + entry.name) try: add_namespace = entry.load() add_namespace() except Exception: logger.exception('issue loading ' + entry.name)
[docs]def concat(dfs, resolver='flexible') -> vaex.dataframe.DataFrame: '''Concatenate a list of DataFrames. :param resolver: How to resolve schema conflicts, see :meth:`DataFrame.concat`. ''' df, *tail = dfs return df.concat(*tail, resolver=resolver)
[docs]def vrange(start, stop, step=1, dtype='f8'): """Creates a virtual column which is the equivalent of numpy.arange, but uses 0 memory :param int start: Start of interval. The interval includes this value. :param int stop: End of interval. The interval does not include this value, :param int step: Spacing between values. :dtype: The preferred dtype for the column. """ from .column import ColumnVirtualRange return ColumnVirtualRange(start, stop, step, dtype)
[docs]def vconstant(value, length, dtype=None, chunk_size=1024): """Creates a virtual column with constant values, which uses 0 memory. :param value: The value with which to fill the column :param length: The length of the column, i.e. the number of rows it should contain. :param dtype: The preferred dtype for the column. :param chunk_size: Could be used to optimize the performance (evaluation) of this column. """ from .column import ColumnVirtualConstant return ColumnVirtualConstant(value=value, length=length, dtype=dtype, chunk_size=chunk_size)
def string_column(strings): import pyarrow as pa return pa.array(strings) def dtype(type): '''Creates a Vaex DataType based on a NumPy or Arrow type''' return vaex.datatype.DataType(type) def dtype_of(ar) -> vaex.datatype.DataType: '''Creates a Vaex DataType from a NumPy or Arrow array''' if isinstance(ar, vaex.dataset.Column): return dtype(ar.dtype) elif vaex.array_types.is_arrow_array(ar): return dtype(ar.type) elif vaex.array_types.is_numpy_array(ar) or isinstance(ar, vaex.column.supported_column_types): return dtype(ar.dtype) else: raise TypeError(f'{ar} is not a an Arrow or NumPy array') class RowLimitException(ValueError): pass