Documentation about the new file work.

YAML files. The following are the YAML definitions which are used to serialize file information from dali/external store to the engines and if necessary to the worker nodes.

Storage planes
==============
This is already covered in the deployed helm charts. It has been extended and rationalized slightly.

storage:
  hostGroups:
  - name: <required>
    hosts: [ .... ]
  - name: <required>
    hostGroup: <name>
    count: <unsigned:#hosts>  # how many hosts within the host group are used? (default is number of hosts)
    offset: <unsigned:0>      # index of first host included in the derived group
    delta: <unsigned:0>       # first host within the range [offset..offset+count-1] in the derived group
  planes:
  - name: <required>
    prefix: <path>            # Root directory for accessing the plane (if pvc defined), or url to access plane.
    numDevices: 1             # number of devices that are part of the plane
    hostGroup: <name>         # Name of the host group for bare metal
    hosts: [ host-names ]     # A list of host names for bare metal
    secret: <secret-id>       # what secret is required to access the files.
    options:                  # not sure if it is needed

Changes:

* The replication information has been removed from the storage plane. It will now be specified on the thor instance, indicating where (if anywhere) files are replicated.
* The hash character (#) in a prefix or a secret name will be substituted with the device number. This replaces the old includeDeviceInPath property. This allows more flexible device substitution for both local mounts and storage accounts. The number of hashes provides the default padding for the device number. (Existing Helm charts will need to be updated to follow these new rules.) See the sketch after this list.
* Neither thor nor roxie replication is special cased. They are represented as multiple locations where the file lives (see examples below). Existing bare-metal environments would be mapped to this new representation with implicit replication planes. (It is worth checking that the mapping to roxie is fine.)
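
A minimal sketch of the '#' substitution rule above (the helper name expandPlanePrefix is hypothetical, not part of the design); the number of consecutive hashes gives the zero-padding width for the device number:

#include <cstdio>
#include <string>

// Replace each run of '#' in a plane prefix (or secret name) with the device
// number, zero-padded to the length of the run.
static std::string expandPlanePrefix(const std::string & prefix, unsigned device)
{
    std::string result;
    for (size_t i = 0; i < prefix.size(); )
    {
        if (prefix[i] == '#')
        {
            size_t run = 0;
            while (i + run < prefix.size() && prefix[i + run] == '#')
                run++;
            char buffer[32];
            snprintf(buffer, sizeof(buffer), "%0*u", (int)run, device);
            result += buffer;
            i += run;
        }
        else
            result += prefix[i++];
    }
    return result;
}

// e.g. expandPlanePrefix("/var/lib/HPCCSystems/data##", 3) -> "/var/lib/HPCCSystems/data03"
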

Files
=====

file:
- name: <logical-file-name>
  format: <type>                # e.g. flat, csv, xml, key, parquet
  meta: <binary>                # (opt) format of the file (serialized hpcc field information).
  metaCrc: <unsigned>           # hash of the meta
  numParts:                     # How many file parts.
  singlePartNoSuffix: <boolean> # Does a single part file include .part_1_of_1?
  numRows:                      # total number of rows in the file (if known)
  rawSize:                      # total uncompressed size
  diskSize:                     # is this useful? when binary copying?
  planes: []                    # list of storage planes that the file is stored on.
  tlk:                          # ??? Should the tlk be stored in the meta and returned?
  splitType: <split-format>     # Are there associated split points, and if so what format? (And if so what variant?)
  # options relating to the format of the input file:
  grouped: <boolean>            # is the file grouped?
  compressed: <boolean>
  blockCompressed: <boolean>
  formatOptions:                # Any options that relate to the file format e.g. csvTerminator. These are nested because they can be completely free format.
  recordSize:                   # if a fixed size record. Not really sure it is useful.
  part:                         # optional information about each of the file parts (Cannot implement virtual file position without this)
  - numRows: <count>            # number of rows in the file part
    rawSize: <size>             # uncompressed size of the file part
    diskSize: <size>            # size of the part on disk
  # extra fields that are used to return information from the file lookup service
  missing: <boolean>            # true if the file could not be found
  external: <boolean>           # filename of the form external:: or plane::

If the information needs to be signed (to be passed to dafilesrv, for example), the entire structure of (storage, files) is serialized and compressed, and the result is then signed. A sketch of that flow follows.
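
A rough sketch of that order of operations. The helper names here (serializeFileInfo, compressBuffer, signBuffer) are illustrative placeholders, not existing platform APIs; a real implementation would use the platform's own serialization, compression and digital-signature support:

#include <string>

struct FileInfoTree;   // stands in for the combined (storage, files) structure

// Placeholder implementations, purely to show the shape of the flow.
std::string serializeFileInfo(const FileInfoTree &) { return "<serialized (storage, files)>"; }
std::string compressBuffer(const std::string & data) { return data; }   // a real compressor in practice
std::string signBuffer(const std::string &, const std::string &) { return "<signature>"; }

struct SignedFileInfo
{
    std::string compressedPayload;   // compressed serialization of (storage, files)
    std::string signature;           // signature over the compressed payload
};

SignedFileInfo prepareForDafilesrv(const FileInfoTree & info, const std::string & privateKey)
{
    // Order of operations from the note above: serialize, then compress, then sign the result.
    SignedFileInfo result;
    result.compressedPayload = compressBuffer(serializeFileInfo(info));
    result.signature = signBuffer(result.compressedPayload, privateKey);
    return result;
}
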

Functions
=========
Logically executed on the engine, with the information retrieved from dali, or in future versions from an esp service (even for remote reads).

GetFileInformation(<logical-filename>, <options>)

The logical-filename can be any logical name - including a super file, or an implicit superfile.

options include:

* Are compressed sizes needed?
* Are signatures required?
* Is virtual fileposition (non-local) required?
* name of the user

This returns a structure that provides information about a list of files:

meta:
  hostGroups:
  storage:
  files:
  secrets:

# The secret names are known, but how do we know which keys are required for those secrets?

Some key questions:

* Should the TLK be in the dali meta information? [Possibly, but not in the first phase.]
* Should the split points be in the dali meta information? [Probably not, but the meta should indicate whether they exist, and if so what format they are.]
* Super files (implicit or explicit) can contain the same file information more than once. Should it be duplicated, or have a flag to indicate a repeat? [I suspect this is fairly uncommon, so duplication would be fine for the first version.]
* What storage plane information is serialized back? [All is simplest; this can be optimized later.]

NOTE: This doesn't address the question of writing to a disk file...

----------------------------------------------------

Local class for interpreting the results. Logically executed on the manager, and it may gather extra information that will be serialized to all workers. The aim is that the same class implementations are used by all the engines (and fileview in esp).

MasterFileCollection : RemoteFileCollection : FileCollection(eclReadOptions, eclFormatOptions, wuid, user, expectedMeta, projectedMeta);

MasterFileCollection // Master has access to dali
RemoteFileCollection // has access to a remote esp - think some more

FileCollection::GatherFileInformation(<logical-filename>, gatherOptions);

- potentially called once per query.
- the class is responsible for optimizing the case where it matches the previous call (e.g. in a child query).
- possibly responsible for retrieving the split points.

The following options are used to control whether split points are retrieved when file information is gathered:

* number of channels reading the data?
* number of strands reading each channel?
* preserve order?

gatherOptions:

* is it a temporary file?

This class serializes all information to every worker, where it is used to recreate a copy of the master FileCollection. This will contain information derived from dali, plus local information, e.g. options specified in the activity helper. Each worker has a complete copy of the file information. (This is similar to dafilesrv with security tokens.)

The files that are actually required by a worker are calculated by calling the following function (note the derived information is not serialized); see the usage sketch after the FileSlice class below.

FilePartition FileCollection::calculatePartition(numChannels, partitionOptions)

partitionOptions:

* number of channels reading the data?
* number of strands reading each channel?
* which channel?
* preserve order?
* myIP

A file partition contains a list of file slices:

class FileSlice (not serialized)
{
    IMagicRowStream * createRowStream(filter, ...); // MORE!
    File * logicalFile;
    offset_t startOffset;
    offset_t endOffset;
};
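
A sketch of how an engine might drive these classes, using only the calls named in this document (GatherFileInformation, calculatePartition, createRowStream); the surrounding declarations and the querySlices accessor are placeholders for illustration, not the final interfaces:

#include <vector>

// Placeholder declarations matching the names used in this document.
class IMagicRowStream;
class FileSlice;
struct GatherOptions;
struct PartitionOptions;

class FilePartition
{
public:
    const std::vector<FileSlice *> & querySlices() const;  // hypothetical accessor over the slices
};

class FileCollection
{
public:
    void GatherFileInformation(const char * logicalFilename, const GatherOptions & options);
    FilePartition calculatePartition(unsigned numChannels, const PartitionOptions & options);
};

// Manager: gather once per query and serialize the collection to every worker.
// Worker: recreate the collection, then ask which slices this channel should read.
void readOnWorker(FileCollection & files, unsigned numChannels, const PartitionOptions & options)
{
    FilePartition partition = files.calculatePartition(numChannels, options);
    for (FileSlice * slice : partition.querySlices())
    {
        // slice->createRowStream(filter, ...) would then supply the rows for
        // the [startOffset, endOffset) range of the associated logical file.
        (void)slice;
    }
}
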

Things to bear in mind:

- Optimize the case where the same file is reused in a child query (the filter is likely to change).
- Optimize the case where the same format is reused in a child query (the filename may be dynamic).
- Integrating third party file formats and distributed file systems may require extra information.
- Optimize reusing the format options.
- Ideally fail over to a backup copy mid-stream, and retry a failed read, e.g. if there is a network fault.

Examples
========

Example definition for a thor400, and two thor200s on the same nodes:

hostGroup:
- name: thor400Group
  host: [node400_01,node400_02,node400_03,...node400_400]

storage:
  planes:
  # Simple 400 way thor
  - name: thor400
    prefix: /var/lib/HPCCSystems/thor400
    hosts: thor400Group
  # The storage plane used for replicating files on thor.
  - name: thor400_R1
    prefix: /var/lib/HPCCSystems/thor400
    hosts: thor400Group
    offset: 1
  # A 200 way thor using the first 200 nodes of the thor400
  - name: thor200A
    prefix: /var/lib/HPCCSystems/thor400
    hosts: thor400Group
    size: 200
  # A 200 way thor using the second 200 nodes of the thor400
  - name: thor200B
    prefix: /var/lib/HPCCSystems/thor400
    hosts: thor400Group
    size: 200
    start: 200
  # The replication plane for a 200 way thor using the second 200 nodes of the thor400
  - name: thor200B_R1
    prefix: /var/lib/HPCCSystems/thor400
    hosts: thor400Group
    size: 200
    start: 200
    offset: 1
  # A roxie storage plane where 50 way files are stored on a 100 way roxie
  - name: roxie100
    prefix: /var/lib/HPCCSystems/roxie100
    hosts: thor400Group
    size: 50
  # The replica of the roxie storage plane where 50 way files are stored on a 100 way roxie
  - name: roxie100_R1
    prefix: /var/lib/HPCCSystems/thor400
    hosts: thor400Group
    start: 50
    size: 50

The device used for a given part is calculated as:

  device = start + (part + offset) % size

with the constraints:

  size <= numDevices
  offset < numDevices
  device <= numDevices
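
A small worked check of that formula against the definitions above (this assumes 0-based part and device numbers, which the document does not pin down):

#include <cassert>

// device = start + (part + offset) % size
unsigned calculateDevice(unsigned start, unsigned size, unsigned offset, unsigned part)
{
    return start + (part + offset) % size;
}

int main()
{
    // thor200B_R1 above: start=200, size=200, offset=1
    assert(calculateDevice(200, 200, 1, 0) == 201);    // part 0 is replicated onto the next device
    assert(calculateDevice(200, 200, 1, 199) == 200);  // the last part wraps around within the plane
    // roxie100_R1 above: start=50, size=50, offset defaults to 0
    assert(calculateDevice(50, 50, 0, 7) == 57);       // the replica of part 7 lives on device 57
    return 0;
}
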

There is no special casing of roxie replication, and each file exists on multiple storage planes. All of these should
be considered when determining which is the best copy to read from a particular engine node.

Creating storage planes from an existing system [implemented]

Milestones:
-----------
a) Create bare-metal storage planes [done]
b) [a] Start simplifying the information in the dali meta data (e.g. partmask, remove full path name)
*c) [a] Switch the reading code over to using storage planes rather than dali paths and environment paths - in ALL disk reading and writing code
   - change numDevices so it matches the container
d) [c] Convert dali information from using copies to multiple groups/planes
*e) [a] Reimplement the current code to create an IPropertyTree from dali file information (in a form that can be reused in dali)
*f) [e] Refactor the existing PR to use data in an IPropertyTree and cleanly separate the interfaces.
g) Switch hthor over to using the new classes by default and work through all the issues
h) Refactor the stream reading code.
   Look at the spark interfaces for inspiration/compatibility.
i) Refactor the disk writing code into a common class?
j) [e] Create an esp service for accessing the meta information
k) [h] Refactor and review the azure blob code
l) [k] Re-implement the S3 reading and writing code.
m) Switch fileview over to using the new classes. (A great test that they can be used in another context, and it fixes a longstanding bug.)
) Implications for index reading? Will they end up being treated as a normal file? Don't implement for 8.0, although the interface may support it.

*) Starred items are my primary focus for the initial work.

File reading refactoring
========================

Buffer sizes (see the sketch after this list):

- the storage plane specifies an optimal reading minimum
- compression may have a requirement
- the use for the data may impose a requirement, e.g. a subset of the data, or only fetching a single record
- parallel disk reading may want to read a big chunk, but then process it in sections. groan.
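
One plausible way of reconciling those requirements (purely a sketch: the function name and the "take the maximum, then round up to the plane's optimal size" policy are assumptions, not part of the design):

#include <algorithm>
#include <cstddef>

// planeOptimalReadSize is assumed to be non-zero here.
size_t chooseReadBufferSize(size_t planeOptimalReadSize,  // minimum the storage plane prefers
                            size_t compressionBlockSize,  // requirement from the compression format (0 if none)
                            size_t callerRequestSize)     // how much the consumer of the data actually needs
{
    size_t required = std::max({ planeOptimalReadSize, compressionBlockSize, callerRequestSize });
    // Round up to a multiple of the plane's optimal read size so each request stays
    // aligned with what the storage layer reads efficiently.
    size_t blocks = (required + planeOptimalReadSize - 1) / planeOptimalReadSize;
    return blocks * planeOptimalReadSize;
}
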

Look at lambda functions to create split points for a file. Can we use the java classes to implement it on binary files (and csv/xml)?

******************** Reading classes and meta information ********************

The meta comes from a combination of the information in dfs and the helper.

The main meta information uses the same structure that is returned by the function that returns file information from dali. The format specific options are contained in a nested attribute so they can be completely arbitrary.

The helper class also generates a meta structure. Some options fill in root elements - e.g. compressed. Some fill in a new section (hints: @x=y). The format options are generated from the parameters to the dataset format.

Note: normally there is only a single file (or very few files), so merging isn't too painful.

queryMeta()
queryOptions()
rename meta to format?
???
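
A schematic of the merge described above (illustrative only: the container types and the assumption that helper-supplied options overlay the dfs-supplied ones are not decided here):

#include <map>
#include <string>

// Simplified stand-in for the combined meta: root options (e.g. compressed),
// a hints section (@x=y), and the free-format formatOptions.
struct ReadMeta
{
    std::map<std::string, std::string> rootOptions;
    std::map<std::string, std::string> hints;
    std::map<std::string, std::string> formatOptions;
};

// Overlay the meta generated by the helper onto the meta returned from dfs.
// Assumption for this sketch: where both supply a value, the helper wins.
ReadMeta mergeMeta(const ReadMeta & fromDfs, const ReadMeta & fromHelper)
{
    ReadMeta merged = fromDfs;
    for (const auto & entry : fromHelper.rootOptions)
        merged.rootOptions[entry.first] = entry.second;
    for (const auto & entry : fromHelper.hints)
        merged.hints[entry.first] = entry.second;
    for (const auto & entry : fromHelper.formatOptions)
        merged.formatOptions[entry.first] = entry.second;
    return merged;
}
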

DFU server
==========
Where does DFUserver fit into a containerized system?

DFU has the following main functionality in a bare metal system:

a) Spray a file from a 1 way landing zone to an N-way thor
b) Convert the file format when spraying. I suspect utf-16->utf8 is the only option actually used.
c) Spray multiple files from a landing zone to a single logical file on an N-way thor
d) Copy a logical file from a remote environment
e) Despray a logical file to an external landing zone
f) Replicate an existing logical file on a given group
g) Copy logical files between groups
h) File monitoring
i) Logical file operations
j) Superfile operations

ECL has the ability to read a logical file directly from a landing zone using the 'FILE::<ip>' file syntax, but I don't think it is used very frequently.

How does this map to a containerized system? I think the same basic operations are likely to be useful.

a) In most scenarios landing zones are likely to be replaced with (blob) storage accounts. But for security reasons these are likely to remain distinct from the main location used by HPCC to store datasets. (The customer will only have access keys to copy files to and from those storage accounts.) The containerized system has a way for ECL to read directly from a blob storage account ('PLANE::<plane>'), but I imagine users will still want to copy the files in many situations, to control the lifetime of the copies etc.
b) We still need a way to convert from utf16 to utf8, or extend the platform to allow utf16 to be read directly.
c) This is still equally useful, allowing a set of files to be stored as a single file in a form that is easy for ECL to process.
d) Important for copying data from an existing bare metal system to the cloud, and from a cloud system back to a bare metal system.
e) Useful for exporting results to customers.
f+g) Essentially the same thing in the cloud world. It might still be useful to have.
h) I suspect we will need to map this to cloud-specific apis.
i+j) Just as applicable in the container world.

Broadly, landing zones in bare metal map to special storage planes in containerized systems, and groups also map to more general storage planes.

There are a couple of complications connected with the implementation:

1) Copying is currently done by starting an ftslave process on either the source or the target nodes. In the container world there is no local node, and I think we would prefer not to start a process in order to copy each file.
2) Copying between storage groups should be done using the cloud provider api, rather than transferring the data via a k8s job.

Suggestions:

* Have a load balanced dafilesrv which supports multiple replicas. It would have a secure external service, and an internal service for trusted components.
* Move the ftslave logic into dafilesrv, i.e. move the current code for the ftslave actions into dafilesrv as new operations.
* When copying from/to a bare metal system the requests are sent to the dafilesrv for the node that currently runs ftslave. For a container system the requests are sent to the load balanced service.
* It might be possible to migrate to lambda style functions for some of the work...
* A later optimization would use a cloud service where it was possible.
* When local split points are supported it may be better to spray a file 1:1 along with the partition information. Even without local split points it may still be better to spray a file 1:1 (cheaper).
* What are the spray targets? It may need to be storage plane + number of parts, rather than a target cluster. The default number of parts is the number of devices on the storage plane.

=> Milestones

a) Move the ftslave code to dafilesrv (partition, pull, push). [Should be included in the 7.12.x stream to allow remote read compatibility?]
b) Create a dafilesrv component in the helm charts, with internal and external services.
c) Use storage planes to determine how files are sprayed etc. (bare-metal, #devices).
   Adapt the dfu/fileservices calls to take (storageplane, number) instead of a cluster. There should already be a 1:1 mapping from existing clusters to storage planes in a bare-metal system, so this may not involve much work. [May also need a flag to indicate whether ._1_of_1 is appended?]
d) Select the correct dafilesrv for bare-metal storage planes, or the load balanced service for others.
   (May need to think through how remote files are represented.)
=> Can import from a bare metal system or a containerized system using the command line??
NOTE: Bare-metal to containerized will likely need push operations on the bare-metal system (and therefore serialized security information).
This may still cause issues since it is unlikely containerized will be able to pull from bare-metal.
Pushing, but not creating a logical file entry on the containerized system, should be easier since it can use a local storage plane definition.
e) Switch over to using the esp based meta information, so that it can include details of storage planes and secrets.
   [Note this would also need to be in 7.12.x to allow remote export to containerized systems; that may well be a step too far.]
f) Add an option to configure the number of file parts for spray/copy/despray.
g) Ensure that eclwatch picks up the list of storage planes (and the default number of file parts), and has the ability to specify the number of parts.

Later:

h) Plan how cloud services can be used for some of the copies.
i) Investigate using serverless functions to calculate split points.
j) Use the refactored disk read/write interfaces to clean up the read and copy code.
k) We may not want to expose access keys to allow remote reads/writes - in which case they would need to be pushed from a bare-metal dafilesrv to a containerized dafilesrv.

Other dependencies:

* Refactored file meta information. If this is switching to being plane based, then the meta information should also be plane based. The main difference is not including the path in the meta information (it can just be ignored).
* An esp service for getting file information. When reading remotely it needs to go via this now...