#!/usr/bin/env python3
"""
Simple test of read and write times for columnar data formats:
  python filetimes.py <filepath> [pandas|dask [hdf5base [xcolumn [ycolumn] [categories...]]]]

Test files may be generated starting from any file format supported by Pandas:
  python -c "import filetimes ; filetimes.p.base='<hdf5base>' ; filetimes.p.categories=['<cat1>','<cat2>'] ; filetimes.timed_write('<file>', '<dftype>')"
"""
from __future__ import print_function

import time
global_start = time.time()

import os, os.path, sys, glob, argparse, resource, multiprocessing
import pandas as pd
import dask.dataframe as dd
import numpy as np
import datashader as ds
import bcolz
import feather
import fastparquet as fp

from datashader.utils import export_image
from datashader import transfer_functions as tf
from collections import OrderedDict as odict

#from multiprocessing.pool import ThreadPool
#dask.set_options(pool=ThreadPool(3)) # select a specific number of threads
from dask import distributed

# Toggled by command-line arguments
DEBUG = False
DD_FORCE_LOAD = False
DASK_CLIENT = None
class Parameters(object):
    base, x, y = 'data', 'x', 'y'
    dftype = 'pandas'
    categories = []
    chunksize = 76668751
    cat_width = 1  # Size of fixed-width string for representing categories
    columns = None
    cachesize = 9e9
    parq_opts = dict(file_scheme='hive', has_nulls=False, write_index=False)
    n_workers = multiprocessing.cpu_count()

p = Parameters()

filetypes_storing_categories = {'parq'}


class Kwargs(odict):
    """Used to distinguish between dictionary argument values and keyword arguments."""
    pass
def benchmark(fn, args, filetype=None):
    """Benchmark the call of "fn" on the "args" tuple.
    "args" may have a Kwargs instance at the end.

    If "filetype" is provided, it may be used to convert columns to
    categorical dtypes after reading (i.e., "fn" is assumed to load data).
    """
    posargs = list(args)
    kwargs = {}

    # Remove Kwargs instance at end of posargs list, if one exists
    if posargs and isinstance(posargs[-1], Kwargs):
        lastarg = posargs.pop()
        kwargs.update(lastarg)

    if DEBUG:
        printable_posargs = ', '.join([str(posarg.head()) if hasattr(posarg, 'head') else str(posarg) for posarg in posargs])
        printable_kwargs = ', '.join(['{}={}'.format(k, v) for k, v in kwargs.items()])
        print('DEBUG: {}({}{})'.format(fn.__name__, printable_posargs, ', '+printable_kwargs if printable_kwargs else ''), flush=True)

    # Benchmark fn when run on posargs and kwargs
    start = time.time()
    res = fn(*posargs, **kwargs)

    # If we're loading data, optionally convert category columns after the read
    if filetype is not None:
        if filetype not in filetypes_storing_categories:
            opts = odict()
            if p.dftype == 'pandas':
                opts['copy'] = False
            for c in p.categories:
                res[c] = res[c].astype('category', **opts)

        # Force loading (--cache=persist was provided)
        if p.dftype == 'dask' and DD_FORCE_LOAD:
            if DASK_CLIENT is not None:
                # 2017-04-28: This combination leads to a large drop in
                # aggregation performance (both --distributed and
                # --cache=persist were provided)
                res = DASK_CLIENT.persist(res)
                distributed.wait(res)
            else:
                if DEBUG:
                    print("DEBUG: Force-loading Dask dataframe", flush=True)
                res = res.persist()

    end = time.time()

    return end-start, res
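# A minimal sketch of the benchmark()/Kwargs calling convention used by the
# "read" and "write" tables below (pd.read_csv and the column names here are
# illustrative only):
#
#   seconds, df = benchmark(pd.read_csv, ('data.csv', Kwargs(usecols=['x', 'y'])), 'csv')
#
# The trailing Kwargs instance is popped off the tuple and passed as keyword
# arguments, and passing filetype='csv' makes benchmark() convert the
# p.categories columns to categorical dtype after the read.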
read = odict([(f, odict()) for f in ["parq", "snappy.parq", "gz.parq", "bcolz", "feather", "h5", "csv"]])


def read_csv_dask(filepath, usecols=None):
    # Pandas writes CSV files out as a single file
    if os.path.isfile(filepath):
        return dd.read_csv(filepath, usecols=usecols)
    # Dask may have written out CSV files in partitions
    filepath_expr = filepath.replace('.csv', '*.csv')
    return dd.read_csv(filepath_expr, usecols=usecols)

read["csv"]["dask"] = lambda filepath, p, filetype: benchmark(read_csv_dask, (filepath, Kwargs(usecols=p.columns)), filetype)
read["h5"]["dask"] = lambda filepath, p, filetype: benchmark(dd.read_hdf, (filepath, p.base, Kwargs(chunksize=p.chunksize, columns=p.columns)), filetype)


def read_feather_dask(filepath):
    df = feather.read_dataframe(filepath, columns=p.columns)
    return dd.from_pandas(df, npartitions=p.n_workers)

read["feather"]["dask"] = lambda filepath, p, filetype: benchmark(read_feather_dask, (filepath,), filetype)
read["bcolz"]["dask"] = lambda filepath, p, filetype: benchmark(dd.from_bcolz, (filepath, Kwargs(chunksize=1000000)), filetype)
read["parq"]["dask"] = lambda filepath, p, filetype: benchmark(dd.read_parquet, (filepath, Kwargs(index=False, columns=p.columns)), filetype)
read["gz.parq"]["dask"] = lambda filepath, p, filetype: benchmark(dd.read_parquet, (filepath, Kwargs(index=False, columns=p.columns)), filetype)
read["snappy.parq"]["dask"] = lambda filepath, p, filetype: benchmark(dd.read_parquet, (filepath, Kwargs(index=False, columns=p.columns)), filetype)


def read_csv_pandas(filepath, usecols=None):
    # Pandas writes CSV files out as a single file
    if os.path.isfile(filepath):
        return pd.read_csv(filepath, usecols=usecols)
    # Dask may have written out CSV files in partitions
    filepath_expr = filepath.replace('.csv', '*.csv')
    filepaths = glob.glob(filepath_expr)
    return pd.concat((pd.read_csv(f, usecols=usecols) for f in filepaths))

read["csv"]["pandas"] = lambda filepath, p, filetype: benchmark(read_csv_pandas, (filepath, Kwargs(usecols=p.columns)), filetype)
read["h5"]["pandas"] = lambda filepath, p, filetype: benchmark(pd.read_hdf, (filepath, p.base, Kwargs(columns=p.columns)), filetype)
read["feather"]["pandas"] = lambda filepath, p, filetype: benchmark(feather.read_dataframe, (filepath,), filetype)


def read_bcolz_pandas(filepath, chunksize=None):
    return bcolz.ctable(rootdir=filepath).todataframe(columns=p.columns)

read["bcolz"]["pandas"] = lambda filepath, p, filetype: benchmark(read_bcolz_pandas, (filepath, Kwargs(chunksize=1000000)), filetype)


def read_parq_pandas(filepath):
    return fp.ParquetFile(filepath).to_pandas()

read["parq"]["pandas"] = lambda filepath, p, filetype: benchmark(read_parq_pandas, (filepath,), filetype)
read["gz.parq"]["pandas"] = lambda filepath, p, filetype: benchmark(read_parq_pandas, (filepath,), filetype)
read["snappy.parq"]["pandas"] = lambda filepath, p, filetype: benchmark(read_parq_pandas, (filepath,), filetype)
write = odict([(f, odict()) for f in ["parq", "snappy.parq", "gz.parq", "bcolz", "feather", "h5", "csv"]])

write["csv"]["dask"] = lambda df, filepath, p: benchmark(df.to_csv, (filepath.replace(".csv", "*.csv"), Kwargs(index=False)))
write["h5"]["dask"] = lambda df, filepath, p: benchmark(df.to_hdf, (filepath, p.base))


def write_bcolz_dask(filepath, df):
    return bcolz.ctable.fromdataframe(df.compute(), rootdir=filepath)

write["bcolz"]["dask"] = lambda df, filepath, p: benchmark(write_bcolz_dask, (filepath, df))


def write_feather_dask(filepath, df):
    return feather.write_dataframe(df.compute(), filepath)

write["feather"]["dask"] = lambda df, filepath, p: benchmark(write_feather_dask, (filepath, df))

write["parq"]["dask"] = lambda df, filepath, p: benchmark(dd.to_parquet, (filepath, df))  # **p.parq_opts
write["snappy.parq"]["dask"] = lambda df, filepath, p: benchmark(dd.to_parquet, (filepath, df, Kwargs(compression='SNAPPY')))  # **p.parq_opts
write["gz.parq"]["dask"] = lambda df, filepath, p: benchmark(dd.to_parquet, (filepath, df, Kwargs(compression='GZIP')))

write["csv"]["pandas"] = lambda df, filepath, p: benchmark(df.to_csv, (filepath, Kwargs(index=False)))
write["h5"]["pandas"] = lambda df, filepath, p: benchmark(df.to_hdf, (filepath, Kwargs(key=p.base, format='table')))
write["bcolz"]["pandas"] = lambda df, filepath, p: benchmark(bcolz.ctable.fromdataframe, (df, Kwargs(rootdir=filepath)))
write["feather"]["pandas"] = lambda df, filepath, p: benchmark(feather.write_dataframe, (df, filepath))
write["parq"]["pandas"] = lambda df, filepath, p: benchmark(fp.write, (filepath, df, Kwargs(**p.parq_opts)))
write["gz.parq"]["pandas"] = lambda df, filepath, p: benchmark(fp.write, (filepath, df, Kwargs(compression='GZIP', **p.parq_opts)))
write["snappy.parq"]["pandas"] = lambda df, filepath, p: benchmark(fp.write, (filepath, df, Kwargs(compression='SNAPPY', **p.parq_opts)))
def timed_write(filepath, dftype, fsize='double', output_directory="times"):
    """Accepts any file with a dataframe readable by the given dataframe type, and writes it out as a variety of file types"""
    assert fsize in ('single', 'double')
    p.dftype = dftype  # This function may get called from outside main()
    df, duration = timed_read(filepath, dftype)

    for ext in write.keys():
        directory, filename = os.path.split(filepath)
        basename, extension = os.path.splitext(filename)
        fname = output_directory+os.path.sep+basename+"."+ext
        if os.path.exists(fname):
            print("{:28} (keeping existing)".format(fname), flush=True)
        else:
            filetype = ext.split(".")[-1]
            if filetype not in filetypes_storing_categories:
                for c in p.categories:
                    if filetype == 'parq' and df[c].dtype == 'object':
                        df[c] = df[c].str.encode('utf8')
                    else:
                        df[c] = df[c].astype(str)

            # Convert doubles to floats when writing out datasets
            if fsize == 'single':
                for colname in df.columns:
                    if df[colname].dtype == 'float64':
                        df[colname] = df[colname].astype(np.float32)

            code = write[ext].get(dftype, None)
            if code is None:
                print("{:28} {:7} Operation not supported".format(fname, dftype), flush=True)
            else:
                duration, res = code(df, fname, p)
                print("{:28} {:7} {:05.2f}".format(fname, dftype, duration), flush=True)

            if filetype not in filetypes_storing_categories:
                for c in p.categories:
                    df[c] = df[c].astype('category')
def timed_read(filepath, dftype):
    basename, extension = os.path.splitext(filepath)
    extension = extension[1:]
    filetype = extension.split(".")[-1]
    code = read[extension].get(dftype, None)
    if code is None:
        return (None, -1)
    p.columns = [p.x]+[p.y]+p.categories

    duration, df = code(filepath, p, filetype)

    return df, duration
CACHED_RANGES = (None, None)
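# The x/y ranges computed by the first aggregation are cached in CACHED_RANGES
# and reused by later aggregations, so that (unless --recalc-ranges is given)
# the second timing excludes the cost of determining the data extent.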
def timed_agg(df, filepath, plot_width=int(900), plot_height=int(900*7.0/12), cache_ranges=True):
    global CACHED_RANGES
    start = time.time()
    cvs = ds.Canvas(plot_width, plot_height, x_range=CACHED_RANGES[0], y_range=CACHED_RANGES[1])
    agg = cvs.points(df, p.x, p.y)
    end = time.time()
    if cache_ranges:
        CACHED_RANGES = (cvs.x_range, cvs.y_range)
    img = export_image(tf.shade(agg), filepath, export_path=".")
    return img, end-start
def get_size(path):
    total = 0

    # CSV files are broken up by dask when they're written out
    if os.path.isfile(path):
        return os.path.getsize(path)
    elif path.endswith('csv'):
        for csv_fpath in glob.glob(path.replace('.csv', '*.csv')):
            total += os.path.getsize(csv_fpath)
        return total

    # If path is a directory (such as parquet), sum all files in directory
    for dirpath, dirnames, filenames in os.walk(path):
        for f in filenames:
            fpath = os.path.join(dirpath, f)
            total += os.path.getsize(fpath)
    return total
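# Note: resource.getrusage() reports ru_maxrss in kilobytes on Linux but in
# bytes on macOS, so the "MB" figure computed below is platform-dependent.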
def get_proc_mem():
    return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / 1e6
def main(argv):
    global DEBUG, DD_FORCE_LOAD, DASK_CLIENT

    parser = argparse.ArgumentParser(epilog=__doc__, formatter_class=argparse.RawTextHelpFormatter)
    parser.add_argument('filepath')
    parser.add_argument('dftype')
    parser.add_argument('base')
    parser.add_argument('x')
    parser.add_argument('y')
    parser.add_argument('categories', nargs='+')
    parser.add_argument('--debug', action='store_true', help='Enable increased verbosity and DEBUG messages')
    parser.add_argument('--cache', choices=('persist', 'cachey'), default=None, help='Enable caching: "persist" causes Dask dataframes to force loading into memory; "cachey" uses dask.cache.Cache with a cachesize of {}. Caching is disabled by default'.format(int(p.cachesize)))
    parser.add_argument('--distributed', action='store_true', help='Enable the distributed scheduler instead of the threaded, which is the default.')
    parser.add_argument('--recalc-ranges', action='store_true', help='Tell datashader to recalculate the ranges on each aggregation, instead of caching them (by default).')
    args = parser.parse_args(argv[1:])

    if args.cache is None:
        if args.debug:
            print("DEBUG: Cache disabled", flush=True)
    else:
        if args.cache == 'cachey':
            from dask.cache import Cache
            cache = Cache(p.cachesize)
            cache.register()
        elif args.cache == 'persist':
            DD_FORCE_LOAD = True
        if args.debug:
            print('DEBUG: Cache "{}" mode enabled'.format(args.cache), flush=True)

    if args.dftype == 'dask' and args.distributed:
        local_cluster = distributed.LocalCluster(n_workers=p.n_workers, threads_per_worker=1)
        DASK_CLIENT = distributed.Client(local_cluster)
        if args.debug:
            print('DEBUG: "distributed" scheduler is enabled')
    else:
        if args.dftype != 'dask' and args.distributed:
            raise ValueError('--distributed argument is only available with the dask dataframe type (not pandas)')
        if args.debug:
            print('DEBUG: "threaded" scheduler is enabled')

    filepath = args.filepath
    basename, extension = os.path.splitext(filepath)
    p.dftype = args.dftype
    p.base = args.base
    p.x = args.x
    p.y = args.y
    p.categories = args.categories
    DEBUG = args.debug

    if DEBUG:
        print('DEBUG: Memory usage (before read):\t{} MB'.format(get_proc_mem()), flush=True)

    df, loadtime = timed_read(filepath, p.dftype)

    if df is None:
        if loadtime == -1:
            print("{:28} {:6} Operation not supported".format(filepath, p.dftype), flush=True)
        return 1

    if DEBUG:
        print('DEBUG: Memory usage (after read):\t{} MB'.format(get_proc_mem()), flush=True)

    img, aggtime1 = timed_agg(df, filepath, 5, 5, cache_ranges=(not args.recalc_ranges))
    if DEBUG:
        mem_usage = df.memory_usage(deep=True)
        if p.dftype == 'dask':
            mem_usage = mem_usage.compute()
        print('DEBUG:', mem_usage, flush=True)
        mem_usage_total = mem_usage.sum()
        print('DEBUG: DataFrame size:\t\t\t{} MB'.format(mem_usage_total / 1e6), flush=True)
        for colname in df.columns:
            print('DEBUG: column "{}" dtype: {}'.format(colname, df[colname].dtype))
        print('DEBUG: Memory usage (after agg1):\t{} MB'.format(get_proc_mem()), flush=True)

    img, aggtime2 = timed_agg(df, filepath, cache_ranges=(not args.recalc_ranges))
    if DEBUG:
        print('DEBUG: Memory usage (after agg2):\t{} MB'.format(get_proc_mem()), flush=True)

    in_size = get_size(filepath)
    out_size = get_size(filepath+".png")

    global_end = time.time()
    print("{:28} {:6} Aggregate1:{:06.2f} ({:06.2f}+{:06.2f}) Aggregate2:{:06.2f} In:{:011d} Out:{:011d} Total:{:06.2f}"
          .format(filepath, p.dftype, loadtime+aggtime1, loadtime, aggtime1, aggtime2, in_size, out_size, global_end-global_start), flush=True)

    return 0


if __name__ == '__main__':
    sys.exit(main(sys.argv))