DPWorkunit.ts 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151
  1. import { LogicalFile, Result, Workunit } from "@hpcc-js/comms";
  2. import { globalKeyValStore, IKeyValStore } from "../KeyValStore";
  3. function analyzeECL(name: string, format: string) {
  4. return `
  5. IMPORT STD.DataPatterns;
  6. filePath := '~${name}';
  7. ds := DATASET(filePath, RECORDOF(filePath, LOOKUP), ${format});
  8. profileResults := DataPatterns.Profile(ds);
  9. OUTPUT(profileResults, ALL, NAMED('profileResults'));
  10. `;
  11. }
  12. function optimizeECL(origName, origFormat, newFields, transformFields, newName, overwrite) {
  13. return `
  14. oldName := '~${origName}';
  15. oldLayout := RECORDOF(oldName, LOOKUP);
  16. oldDataset := DATASET(oldName, oldLayout, ${origFormat});
  17. NewLayout := RECORD
  18. ${newFields}
  19. END;
  20. NewLayout MakeNewLayout(oldLayout L) := TRANSFORM
  21. ${transformFields}
  22. SELF := L;
  23. END;
  24. newDataset := PROJECT(oldDataset, MakeNewLayout(LEFT));
  25. OUTPUT(newDataset, , '~${newName}'${overwrite ? ", OVERWRITE" : ""});
  26. `;
  27. }
  28. export class DPWorkunit {
  29. private readonly _lf: LogicalFile;
  30. private readonly _store: IKeyValStore;
  31. private readonly _storeID: string;
  32. private readonly _storeWuidID: string;
  33. private _wu: Workunit;
  34. private _resultPromise;
  35. constructor(nodeGroup: string, name: string, modified: string) {
  36. this._lf = LogicalFile.attach({ baseUrl: "" }, nodeGroup, name);
  37. this._store = globalKeyValStore();
  38. this._storeID = `dp-${nodeGroup}-${name}`;
  39. this._storeWuidID = `${this._storeID}-${modified}-wuid`.split(" ").join("_");
  40. }
  41. clearCache() {
  42. delete this._wu;
  43. delete this._resultPromise;
  44. }
  45. resolveWU(): Promise<Workunit | undefined> {
  46. return this._store.get(this._storeWuidID).then(wuid => {
  47. if (this._wu && this._wu.Wuid === wuid) {
  48. return this._wu;
  49. }
  50. // Clean out old key (HPCC-25544) ---
  51. this._store.delete(`${this._storeID}-wuid`);
  52. this.clearCache();
  53. return wuid && Workunit.attach({ baseUrl: "" }, wuid);
  54. }).then(wu => {
  55. return wu && wu.refresh();
  56. }).then(wu => {
  57. if (wu && !wu.isDeleted()) {
  58. this._wu = wu;
  59. return wu;
  60. }
  61. return undefined;
  62. });
  63. }
  64. refreshWU(): Promise<Workunit | undefined> {
  65. return this.resolveWU().then(wu => {
  66. if (wu) {
  67. return wu.refresh();
  68. }
  69. return wu;
  70. }).then(wu => {
  71. if (wu && wu.Archived) {
  72. return wu.restore().then(() => wu);
  73. }
  74. return wu;
  75. }).then(wu => {
  76. if (wu && !wu.isFailed()) {
  77. return wu;
  78. }
  79. return undefined;
  80. });
  81. }
  82. delete(): Promise<void> {
  83. return this.resolveWU().then(wu => {
  84. if (wu) {
  85. wu.delete();
  86. }
  87. this.clearCache();
  88. return this._store.delete(this._storeWuidID);
  89. });
  90. }
  91. create(target: string): Promise<Workunit> {
  92. return this.resolveWU().then(wu => {
  93. if (wu) {
  94. return wu;
  95. }
  96. return this._lf.fetchInfo().then(() => {
  97. return Workunit.submit({ baseUrl: "" }, target, analyzeECL(this._lf.Name, this._lf.ContentType));
  98. }).then(wu => {
  99. this._wu = wu;
  100. return this._store.set(this._storeWuidID, wu.Wuid).then(() => wu);
  101. });
  102. });
  103. }
  104. fetchResults() {
  105. if (!this._resultPromise) {
  106. if (!this._wu) {
  107. this._resultPromise = Promise.resolve([]);
  108. } else {
  109. this._resultPromise = this._wu.fetchResults().then(results => {
  110. return results && results[0];
  111. }).then((result?: Result) => {
  112. if (result) {
  113. return result.fetchRows();
  114. }
  115. return [];
  116. });
  117. }
  118. }
  119. return this._resultPromise;
  120. }
  121. optimize(target: string, name: string, overwrite: boolean) {
  122. return Promise.all([this._lf.fetchInfo(), this.fetchResults()]).then(([lfInfo, rows]) => {
  123. let fields = "";
  124. let transformFields = "";
  125. rows.forEach(row => {
  126. if (fields.length) fields += "\n";
  127. if (transformFields.length) transformFields += "\n";
  128. fields += ` ${row.best_attribute_type} ${row.attribute};`;
  129. transformFields += ` SELF.${row.attribute} := (${row.best_attribute_type})L.${row.attribute};`;
  130. }, "");
  131. return Workunit.submit({ baseUrl: "" }, target, optimizeECL(this._lf.Name, this._lf.ContentType, fields, transformFields, name, overwrite));
  132. });
  133. }
  134. }