- #!/usr/bin/env python3
- """
- Simple test of read and write times for columnar data formats:
- python filetimes.py <filepath> <pandas|dask> <hdf5base> <xcolumn> <ycolumn> <categories...> [--debug] [--cache=persist|cachey] [--distributed] [--recalc-ranges]
- Test files may be generated starting from any file format supported by Pandas:
- python -c "import filetimes ; filetimes.p.base='<hdf5base>' ; filetimes.p.categories=['<cat1>','<cat2>'] ; filetimes.timed_write('<file>','<pandas|dask>')"
- """
- from __future__ import print_function
- import time
- global_start = time.time()
- import os, os.path, sys, glob, argparse, resource, multiprocessing
- import pandas as pd
- import dask.dataframe as dd
- import numpy as np
- import datashader as ds
- import bcolz
- import feather
- import fastparquet as fp
- from datashader.utils import export_image
- from datashader import transfer_functions as tf
- from collections import OrderedDict as odict
- #from multiprocessing.pool import ThreadPool
- #dask.set_options(pool=ThreadPool(3)) # select a specific number of threads
- from dask import distributed
- # Toggled by command-line arguments
- DEBUG = False
- DD_FORCE_LOAD = False
- DASK_CLIENT = None
- class Parameters(object):
- base,x,y='data','x','y'
- dftype='pandas'
- categories=[]
- chunksize=76668751
- cat_width=1 # Size of fixed-width string for representing categories
- columns=None
- cachesize=9e9
- parq_opts=dict(file_scheme='hive', has_nulls=False, write_index=False)
- n_workers=multiprocessing.cpu_count()
- p=Parameters()
- filetypes_storing_categories = {'parq'}
- class Kwargs(odict):
- """Used to distinguish between dictionary argument values, and
- keyword-arguments.
- """
- pass
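- # Illustrative example (assumed file and column names): because benchmark()
- # expands a trailing Kwargs entry into keyword arguments,
- #   benchmark(pd.read_csv, ('data.csv', Kwargs(usecols=['x', 'y'])))
- # times the call pd.read_csv('data.csv', usecols=['x', 'y']).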
- def benchmark(fn, args, filetype=None):
- """Benchmark when "fn" function gets called on "args" tuple.
- "args" may have a Kwargs instance at the end.
- If "filetype" is provided, it may be used to convert columns to
- categorical dtypes after reading (the "loading" is assumed).
- """
- posargs = list(args)
- kwargs = {}
- # Remove Kwargs instance at end of posargs list, if one exists
- if posargs and isinstance(posargs[-1], Kwargs):
- lastarg = posargs.pop()
- kwargs.update(lastarg)
- if DEBUG:
- printable_posargs = ', '.join([str(posarg.head()) if hasattr(posarg, 'head') else str(posarg) for posarg in posargs])
- printable_kwargs = ', '.join(['{}={}'.format(k, v) for k,v in kwargs.items()])
- print('DEBUG: {}({}{})'.format(fn.__name__, printable_posargs, ', '+printable_kwargs if printable_kwargs else ''), flush=True)
- # Benchmark fn when run on posargs and kwargs
- start = time.time()
- res = fn(*posargs, **kwargs)
- # If we're loading data
- if filetype is not None:
- if filetype not in filetypes_storing_categories:
- opts=odict()
- if p.dftype == 'pandas':
- opts['copy']=False
- for c in p.categories:
- res[c]=res[c].astype('category',**opts)
- # Force loading (--cache=persist was provided)
- if p.dftype == 'dask' and DD_FORCE_LOAD:
- if DASK_CLIENT is not None:
- # 2017-04-28: This combination leads to a large drop in
- # aggregation performance (both --distributed and
- # --cache=persist were provided)
- res = DASK_CLIENT.persist(res)
- distributed.wait(res)
- else:
- if DEBUG:
- print("DEBUG: Force-loading Dask dataframe", flush=True)
- res = res.persist()
- end = time.time()
- return end-start, res
-
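- # Dispatch tables: read[filetype][dftype] and write[filetype][dftype] map each
- # supported combination of file format and dataframe library to a lambda that
- # wraps the corresponding I/O call in benchmark() and returns a
- # (seconds, result) tuple; missing combinations are reported as unsupported.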
- read = odict([(f,odict()) for f in ["parq","snappy.parq","gz.parq","bcolz","feather","h5","csv"]])
- def read_csv_dask(filepath, usecols=None):
- # Pandas writes CSV files out as a single file
- if os.path.isfile(filepath):
- return dd.read_csv(filepath, usecols=usecols)
- # Dask may have written out CSV files in partitions
- filepath_expr = filepath.replace('.csv', '*.csv')
- return dd.read_csv(filepath_expr, usecols=usecols)
- read["csv"] ["dask"] = lambda filepath,p,filetype: benchmark(read_csv_dask, (filepath, Kwargs(usecols=p.columns)), filetype)
- read["h5"] ["dask"] = lambda filepath,p,filetype: benchmark(dd.read_hdf, (filepath, p.base, Kwargs(chunksize=p.chunksize, columns=p.columns)), filetype)
- def read_feather_dask(filepath):
- df = feather.read_dataframe(filepath, columns=p.columns)
- return dd.from_pandas(df, npartitions=p.n_workers)
- read["feather"] ["dask"] = lambda filepath,p,filetype: benchmark(read_feather_dask, (filepath,), filetype)
- read["bcolz"] ["dask"] = lambda filepath,p,filetype: benchmark(dd.from_bcolz, (filepath, Kwargs(chunksize=1000000)), filetype)
- read["parq"] ["dask"] = lambda filepath,p,filetype: benchmark(dd.read_parquet, (filepath, Kwargs(index=False, columns=p.columns)), filetype)
- read["gz.parq"] ["dask"] = lambda filepath,p,filetype: benchmark(dd.read_parquet, (filepath, Kwargs(index=False, columns=p.columns)), filetype)
- read["snappy.parq"] ["dask"] = lambda filepath,p,filetype: benchmark(dd.read_parquet, (filepath, Kwargs(index=False, columns=p.columns)), filetype)
- def read_csv_pandas(filepath, usecols=None):
- # Pandas writes CSV files out as a single file
- if os.path.isfile(filepath):
- return pd.read_csv(filepath, usecols=usecols)
- # Dask may have written out CSV files in partitions
- filepath_expr = filepath.replace('.csv', '*.csv')
- filepaths = glob.glob(filepath_expr)
- return pd.concat((pd.read_csv(f, usecols=usecols) for f in filepaths))
- read["csv"] ["pandas"] = lambda filepath,p,filetype: benchmark(read_csv_pandas, (filepath, Kwargs(usecols=p.columns)), filetype)
- read["h5"] ["pandas"] = lambda filepath,p,filetype: benchmark(pd.read_hdf, (filepath, p.base, Kwargs(columns=p.columns)), filetype)
- read["feather"] ["pandas"] = lambda filepath,p,filetype: benchmark(feather.read_dataframe, (filepath,), filetype)
- def read_bcolz_pandas(filepath, chunksize=None):
- return bcolz.ctable(rootdir=filepath).todataframe(columns=p.columns)
- read["bcolz"] ["pandas"] = lambda filepath,p,filetype: benchmark(read_bcolz_pandas, (filepath, Kwargs(chunksize=1000000)), filetype)
- def read_parq_pandas(filepath):
- return fp.ParquetFile(filepath).to_pandas()
- read["parq"] ["pandas"] = lambda filepath,p,filetype: benchmark(read_parq_pandas, (filepath,), filetype)
- read["gz.parq"] ["pandas"] = lambda filepath,p,filetype: benchmark(read_parq_pandas, (filepath,), filetype)
- read["snappy.parq"] ["pandas"] = lambda filepath,p,filetype: benchmark(read_parq_pandas, (filepath,), filetype)
- write = odict([(f,odict()) for f in ["parq","snappy.parq","gz.parq","bcolz","feather","h5","csv"]])
- write["csv"] ["dask"] = lambda df,filepath,p: benchmark(df.to_csv, (filepath.replace(".csv","*.csv"), Kwargs(index=False)))
- write["h5"] ["dask"] = lambda df,filepath,p: benchmark(df.to_hdf, (filepath, p.base))
- def write_bcolz_dask(filepath, df):
- return bcolz.ctable.fromdataframe(df.compute(), rootdir=filepath)
- write["bcolz"] ["dask"] = lambda df,filepath,p: benchmark(write_bcolz_dask, (filepath, df))
- def write_feather_dask(filepath, df):
- return feather.write_dataframe(df.compute(), filepath)
- write["feather"] ["dask"] = lambda df,filepath,p: benchmark(write_feather_dask, (filepath, df))
- write["parq"] ["dask"] = lambda df,filepath,p: benchmark(dd.to_parquet, (filepath, df)) # **p.parq_opts
- write["snappy.parq"] ["dask"] = lambda df,filepath,p: benchmark(dd.to_parquet, (filepath, df, Kwargs(compression='SNAPPY'))) ## **p.parq_opts
- write["gz.parq"] ["dask"] = lambda df,filepath,p: benchmark(dd.to_parquet, (filepath, df, Kwargs(compression='GZIP')))
- write["csv"] ["pandas"] = lambda df,filepath,p: benchmark(df.to_csv, (filepath, Kwargs(index=False)))
- write["h5"] ["pandas"] = lambda df,filepath,p: benchmark(df.to_hdf, (filepath, Kwargs(key=p.base, format='table')))
- write["bcolz"] ["pandas"] = lambda df,filepath,p: benchmark(bcolz.ctable.fromdataframe, (df, Kwargs(rootdir=filepath)))
- write["feather"] ["pandas"] = lambda df,filepath,p: benchmark(feather.write_dataframe, (df, filepath))
- write["parq"] ["pandas"] = lambda df,filepath,p: benchmark(fp.write, (filepath, df, Kwargs(**p.parq_opts)))
- write["gz.parq"] ["pandas"] = lambda df,filepath,p: benchmark(fp.write, (filepath, df, Kwargs(compression='GZIP', **p.parq_opts)))
- write["snappy.parq"] ["pandas"] = lambda df,filepath,p: benchmark(fp.write, (filepath, df, Kwargs(compression='SNAPPY', **p.parq_opts)))
- def timed_write(filepath,dftype,fsize='double',output_directory="times"):
- """Accepts any file with a dataframe readable by the given dataframe type, and writes it out as a variety of file types"""
- assert fsize in ('single', 'double')
- p.dftype = dftype # This function may get called from outside main()
- df,duration=timed_read(filepath,dftype)
- for ext in write.keys():
- directory,filename = os.path.split(filepath)
- basename, extension = os.path.splitext(filename)
- fname = output_directory+os.path.sep+basename+"."+ext
- if os.path.exists(fname):
- print("{:28} (keeping existing)".format(fname), flush=True)
- else:
- filetype=ext.split(".")[-1]
- if filetype not in filetypes_storing_categories:
- for c in p.categories:
- if filetype == 'parq' and df[c].dtype == 'object':
- df[c]=df[c].str.encode('utf8')
- else:
- df[c]=df[c].astype(str)
- # Convert float64 ("double") columns to float32 when single precision is requested
- if fsize == 'single':
- for colname in df.columns:
- if df[colname].dtype == 'float64':
- df[colname] = df[colname].astype(np.float32)
- code = write[ext].get(dftype,None)
- if code is None:
- print("{:28} {:7} Operation not supported".format(fname,dftype), flush=True)
- else:
- duration, res = code(df,fname,p)
- print("{:28} {:7} {:05.2f}".format(fname,dftype,duration), flush=True)
- if filetype not in filetypes_storing_categories:
- for c in p.categories:
- df[c]=df[c].astype('category')
-
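- # Illustrative example (hypothetical path): timed_write("data/points.h5", "pandas")
- # re-reads data/points.h5 with pandas and writes times/points.<ext> for every
- # format in the write table, skipping any output file that already exists.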
- def timed_read(filepath,dftype):
- basename, extension = os.path.splitext(filepath)
- extension = extension[1:]
- filetype=extension.split(".")[-1]
- code = read[extension].get(dftype,None)
- if code is None:
- return (None, -1)
- p.columns=[p.x]+[p.y]+p.categories
-
- duration, df = code(filepath,p,filetype)
-
- return df, duration
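- # Illustrative example (hypothetical path): df, seconds = timed_read("times/data.parq", "pandas")
- # loads the file with the reader registered for its extension; a result of
- # (None, -1) means the format/library combination is unsupported.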
- CACHED_RANGES = (None, None)
- def timed_agg(df, filepath, plot_width=int(900), plot_height=int(900*7.0/12), cache_ranges=True):
- global CACHED_RANGES
- start = time.time()
- cvs = ds.Canvas(plot_width, plot_height, x_range=CACHED_RANGES[0], y_range=CACHED_RANGES[1])
- agg = cvs.points(df, p.x, p.y)
- end = time.time()
- if cache_ranges:
- CACHED_RANGES = (cvs.x_range, cvs.y_range)
- img = export_image(tf.shade(agg),filepath,export_path=".")
- return img, end-start
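- # Note: only Canvas creation and the points() aggregation are timed above;
- # tf.shade() and export_image() run after the timer stops. With
- # cache_ranges=True the canvas x/y ranges from this call are stored and passed
- # to subsequent calls (disabled by the --recalc-ranges flag).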
- def get_size(path):
- total = 0
- # CSV files are broken up by dask when they're written out
- if os.path.isfile(path):
- return os.path.getsize(path)
- elif path.endswith('csv'):
- for csv_fpath in glob.glob(path.replace('.csv', '*.csv')):
- total += os.path.getsize(csv_fpath)
- return total
- # If path is a directory (such as parquet), sum all files in directory
- for dirpath, dirnames, filenames in os.walk(path):
- for f in filenames:
- fp = os.path.join(dirpath, f)
- total += os.path.getsize(fp)
- return total
- def get_proc_mem():
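- # Note: ru_maxrss is reported in kilobytes on Linux but in bytes on macOS, so
- # dividing by 1e6 yields megabytes only on macOS; on Linux the printed figure
- # understates memory use by a factor of roughly 1000.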
- return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / 1e6
- def main(argv):
- global DEBUG, DD_FORCE_LOAD, DASK_CLIENT
- parser = argparse.ArgumentParser(epilog=__doc__, formatter_class=argparse.RawTextHelpFormatter)
- parser.add_argument('filepath')
- parser.add_argument('dftype')
- parser.add_argument('base')
- parser.add_argument('x')
- parser.add_argument('y')
- parser.add_argument('categories', nargs='+')
- parser.add_argument('--debug', action='store_true', help='Enable increased verbosity and DEBUG messages')
- parser.add_argument('--cache', choices=('persist', 'cachey'), default=None, help='Enable caching: "persist" causes Dask dataframes to force loading into memory; "cachey" uses dask.cache.Cache with a cachesize of {}. Caching is disabled by default'.format(int(p.cachesize)))
- parser.add_argument('--distributed', action='store_true', help='Use the distributed scheduler instead of the default threaded scheduler.')
- parser.add_argument('--recalc-ranges', action='store_true', help='Recalculate the x/y ranges for every aggregation instead of caching them after the first aggregation (the default).')
- args = parser.parse_args(argv[1:])
- if args.cache is None:
- if args.debug:
- print("DEBUG: Cache disabled", flush=True)
- else:
- if args.cache == 'cachey':
- from dask.cache import Cache
- cache = Cache(p.cachesize)
- cache.register()
- elif args.cache == 'persist':
- DD_FORCE_LOAD = True
- if args.debug:
- print('DEBUG: Cache "{}" mode enabled'.format(args.cache), flush=True)
- if args.dftype == 'dask' and args.distributed:
- local_cluster = distributed.LocalCluster(n_workers=p.n_workers, threads_per_worker=1)
- DASK_CLIENT = distributed.Client(local_cluster)
- if args.debug:
- print('DEBUG: "distributed" scheduler is enabled')
- else:
- if args.dftype != 'dask' and args.distributed:
- raise ValueError('--distributed argument is only available with the dask dataframe type (not pandas)')
- if args.debug:
- print('DEBUG: "threaded" scheduler is enabled')
- filepath = args.filepath
- basename, extension = os.path.splitext(filepath)
- p.dftype = args.dftype
- p.base = args.base
- p.x = args.x
- p.y = args.y
- p.categories = args.categories
- DEBUG = args.debug
- if DEBUG:
- print('DEBUG: Memory usage (before read):\t{} MB'.format(get_proc_mem()), flush=True)
- df,loadtime = timed_read(filepath, p.dftype)
- if df is None:
- if loadtime == -1:
- print("{:28} {:6} Operation not supported".format(filepath, p.dftype), flush=True)
- return 1
- if DEBUG:
- print('DEBUG: Memory usage (after read):\t{} MB'.format(get_proc_mem()), flush=True)
- img,aggtime1 = timed_agg(df,filepath,5,5,cache_ranges=(not args.recalc_ranges))
- if DEBUG:
- mem_usage = df.memory_usage(deep=True)
- if p.dftype == 'dask':
- mem_usage = mem_usage.compute()
- print('DEBUG:', mem_usage, flush=True)
- mem_usage_total = mem_usage.sum()
- print('DEBUG: DataFrame size:\t\t\t{} MB'.format(mem_usage_total / 1e6), flush=True)
- for colname in df.columns:
- print('DEBUG: column "{}" dtype: {}'.format(colname, df[colname].dtype))
- print('DEBUG: Memory usage (after agg1):\t{} MB'.format(get_proc_mem()), flush=True)
- img,aggtime2 = timed_agg(df,filepath,cache_ranges=(not args.recalc_ranges))
- if DEBUG:
- print('DEBUG: Memory usage (after agg2):\t{} MB'.format(get_proc_mem()), flush=True)
-
- in_size = get_size(filepath)
- out_size = get_size(filepath+".png")
-
- global_end = time.time()
- print("{:28} {:6} Aggregate1:{:06.2f} ({:06.2f}+{:06.2f}) Aggregate2:{:06.2f} In:{:011d} Out:{:011d} Total:{:06.2f}"\
- .format(filepath, p.dftype, loadtime+aggtime1, loadtime, aggtime1, aggtime2, in_size, out_size, global_end-global_start), flush=True)
- return 0
- if __name__ == '__main__':
- sys.exit(main(sys.argv))