HPCCFileCache.cpp 11 KB


  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2014 HPCC Systems.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "HPCCFileCache.hpp"
  14. HPCCFileCache * HPCCFileCache::createFileCache(const char * username, const char * passwd)
  15. {
  16. ESPLOG(LogMax, "WsSQL: Creating new HPCC FILE CACHE");
  17. return new HPCCFileCache(username,passwd);
  18. }
  19. bool HPCCFileCache::populateTablesResponse(IEspGetDBMetaDataResponse & tablesrespstruct, const char * filterby)
  20. {
  21. bool success = false;
  22. cacheAllHpccFiles(filterby);
  23. IArrayOf<IEspHPCCTable> tables;
  24. HashIterator iterHash(cache);
  25. ForEach(iterHash)
  26. {
  27. const char* key = (const char*)iterHash.query().getKey();
  28. HPCCFilePtr file = dynamic_cast<HPCCFile *>(*cache.getValue(key));
  29. if ( file )
  30. {
  31. Owned<IEspHPCCTable> pTable = createHPCCTable();
  32. pTable->setName(file->getFullname());
  33. pTable->setFormat(file->getFormat());
  34. const char * ecl = file->getEcl();
  35. if (!ecl || !*ecl)
  36. continue;
  37. else
  38. {
  39. pTable->setDescription(file->getDescription());
  40. pTable->setIsKeyed(file->isFileKeyed());
  41. pTable->setIsSuper(file->isFileSuper());
  42. pTable->setOwner(file->getOwner());
  43. IArrayOf<IEspHPCCColumn> pColumns;
  44. IArrayOf<HPCCColumnMetaData> * cols = file->getColumns();
  45. for (int i = 0; i < cols->length(); i++)
  46. {
  47. Owned<IEspHPCCColumn> pCol = createHPCCColumn();
  48. HPCCColumnMetaData currcol = cols->item(i);
  49. pCol->setName(currcol.getColumnName());
  50. pCol->setType(currcol.getColumnType());
  51. pColumns.append(*pCol.getLink());
  52. }
  53. pTable->setColumns(pColumns);
  54. tables.append(*pTable.getLink());
  55. }
  56. }
  57. }
  58. tablesrespstruct.setTables(tables);
  59. return success;
  60. }
  61. bool HPCCFileCache::fetchHpccFilesByTableName(IArrayOf<SQLTable> * sqltables)
  62. {
  63. bool allFound = true;
  64. ForEachItemIn(tableindex, *sqltables)
  65. {
  66. SQLTable table = sqltables->item(tableindex);
  67. const char * cachedKey = cacheHpccFileByName(table.getName());
  68. allFound &= (cachedKey && *cachedKey);
  69. }
  70. return allFound;
  71. }
  72. bool HPCCFileCache::cacheAllHpccFiles(const char * filterby)
  73. {
  74. bool success = false;
  75. StringBuffer filter;
  76. if(filterby && *filterby)
  77. filter.append(filterby);
  78. else
  79. filter.append("*");
  80. Owned<IDFAttributesIterator> fi = queryDistributedFileDirectory().getDFAttributesIterator(filter, userdesc.get(), true, true, NULL);
  81. if(!fi)
  82. throw MakeStringException(-1,"Cannot get information from file system.");
  83. success = true;
  84. ForEach(*fi)
  85. {
  86. IPropertyTree &attr=fi->query();
  87. #if defined(_DEBUG)
  88. StringBuffer toxml;
  89. toXML(&attr, toxml);
  90. fprintf(stderr, "%s", toxml.str());
  91. #endif
  92. StringBuffer name(attr.queryProp("@name"));
  93. if (name.length()>0 && HPCCFile::validateFileName(name.str()))
  94. //if (name.length()>0)
  95. {
  96. const char * cachedKey = cacheHpccFileByName(name.str(), true);
  97. success &= (cachedKey && *cachedKey);
  98. }
  99. }
  100. return success;
  101. }
  102. bool HPCCFileCache::updateHpccFileDescription(const char * filename, const char * user, const char * pass, const char * description)
  103. {
  104. Owned<IUserDescriptor> userdesc;
  105. try
  106. {
  107. userdesc.setown(createUserDescriptor());
  108. userdesc->set(user, pass);
  109. //queryDistributedFileDirectory returns singleton
  110. IDistributedFileDirectory & dfd = queryDistributedFileDirectory();
  111. Owned<IDistributedFile> df = dfd.lookup(filename, userdesc, false, false, false, nullptr, defaultPrivilegedUser);
  112. if(!df)
  113. return false;
  114. DistributedFilePropertyLock lock(df);
  115. lock.queryAttributes().setProp("@description",description);
  116. }
  117. catch (...)
  118. {
  119. return false;
  120. }
  121. return true;
  122. }
  123. HPCCFile * HPCCFileCache::fetchHpccFileByName(IUserDescriptor *user, const char * filename, bool namevalidated, bool acceptrawfiles)
  124. {
  125. StringBuffer username;
  126. user->getUserName(username);
  127. StringBuffer password;
  128. user->getPassword(password);
  129. return HPCCFileCache::fetchHpccFileByName(filename, username.str(), password.str(), namevalidated, acceptrawfiles);
  130. }
  131. HPCCFile * HPCCFileCache::fetchHpccFileByName(const char * filename, const char * user, const char * pass, bool namevalidated, bool acceptrawfiles)
  132. {
  133. Owned<HPCCFile> file;
  134. Owned<IUserDescriptor> userdesc;
  135. try
  136. {
  137. userdesc.setown(createUserDescriptor());
  138. userdesc->set(user, pass);
  139. //queryDistributedFileDirectory returns singleton
  140. IDistributedFileDirectory & dfd = queryDistributedFileDirectory();
  141. Owned<IDistributedFile> df = dfd.lookup(filename, userdesc, false, false, false, nullptr, defaultPrivilegedUser);
  142. if(!df)
  143. throw MakeStringException(-1,"Cannot find file %s.",filename);
  144. const char* lname=df->queryLogicalName();
  145. if (lname && *lname)
  146. {
  147. const char* fname=strrchr(lname,':');
  148. if (!namevalidated && !HPCCFile::validateFileName(lname))
  149. throw MakeStringException(-1,"Invalid SQL file name detected %s.", fname);
  150. file.setown(HPCCFile::createHPCCFile());
  151. file->setFullname(lname);
  152. file->setName(fname ? fname+1 : lname);
  153. }
  154. else
  155. throw MakeStringException(-1,"Cannot find file %s.",filename);
  156. file->setDescription(df->queryAttributes().queryProp("@description"));
  157. //Do we care about the clusters??
  158. //StringArray clusters;
  159. //if (cluster && *cluster)
  160. //{
  161. //df->getClusterNames(clusters);
  162. //if(!FindInStringArray(clusters, cluster))
  163. //throw MakeStringException(ECLWATCH_FILE_NOT_EXIST,"Cannot find file %s.",fname);
  164. //}
  165. #if defined(_DEBUG)
  166. StringBuffer atttree;
  167. toXML(&df->queryAttributes(), atttree, 0);
  168. fprintf(stderr, "%s", atttree.str());
  169. #endif
  170. IPropertyTree & properties = df->queryAttributes();
  171. if(properties.hasProp("ECL"))
  172. file->setEcl(properties.queryProp("ECL"));
  173. else if (!acceptrawfiles)
  174. throw MakeStringException(-1,"File %s does not contain required ECL record layout.",filename);
  175. file->setOwner(properties.queryProp("@owner"));
  176. IDistributedSuperFile *sf = df->querySuperFile();
  177. if(sf)
  178. {
  179. file->setIsSuperfile(true);
  180. }
  181. //unfortunately @format sometimes holds the file format, sometimes @kind does
  182. const char * kind = properties.queryProp("@kind");
  183. if (kind)
  184. {
  185. file->setFormat(kind);
  186. if ((stricmp(kind, "key") == 0))
  187. {
  188. file->setIsKeyedFile(true);
  189. ISecManager *secmgr = NULL;
  190. ISecUser *secuser = NULL;
  191. StringBuffer username;
  192. StringBuffer passwd;
  193. userdesc->getUserName(username);
  194. userdesc->getPassword(passwd);
  195. Owned<IResultSetFactory> resultSetFactory = getSecResultSetFactory(secmgr, secuser, username.str(), passwd.str());
  196. Owned<INewResultSet> result;
  197. try
  198. {
  199. result.setown(resultSetFactory->createNewFileResultSet(filename, NULL));
  200. if (result)
  201. {
  202. Owned<IResultSetCursor> cursor = result->createCursor();
  203. const IResultSetMetaData & meta = cursor->queryResultSet()->getMetaData();
  204. int columnCount = meta.getColumnCount();
  205. int keyedColumnCount = meta.getNumKeyedColumns();
  206. for (int i = 0; i < keyedColumnCount; i++)
  207. {
  208. SCMStringBuffer columnLabel;
  209. if (meta.hasSetTranslation(i))
  210. {
  211. meta.getNaturalColumnLabel(columnLabel, i);
  212. }
  213. if (columnLabel.length() < 1)
  214. {
  215. meta.getColumnLabel(columnLabel, i);
  216. }
  217. file->setKeyedColumn(columnLabel.str());
  218. }
  219. }
  220. }
  221. catch (IException * se)
  222. {
  223. StringBuffer s;
  224. se->errorMessage(s);
  225. IERRLOG("Error fetching keyed file %s info: %s", filename, s.str());
  226. se->Release();
  227. if (file)
  228. file.clear();
  229. }
  230. }
  231. }
  232. else
  233. {
  234. //@format - what format the file is (if not fixed width)
  235. const char* format = properties.queryProp("@format");
  236. file->setFormat(format);
  237. }
  238. if (file && ( file->containsNestedColumns() || strncmp(file->getFormat(), "XML", 3)==0))
  239. throw MakeStringException(-1,"Nested data files not supported: %s.",filename);
  240. }
  241. catch (IException * se)
  242. {
  243. StringBuffer s;
  244. se->errorMessage(s);
  245. IERRLOG("Error fetching file %s info: %s", filename, s.str());
  246. se->Release();
  247. if (file)
  248. file.clear();
  249. }
  250. catch (...)
  251. {
  252. if (file)
  253. file.clear();
  254. }
  255. if (file)
  256. return file.getLink();
  257. else
  258. return NULL;
  259. }
  260. const char * HPCCFileCache::cacheHpccFileByName(const char * filename, bool namevalidated)
  261. {
  262. if(cache.getValue(filename))
  263. return filename;
  264. Owned<HPCCFile> file;
  265. try
  266. {
  267. file.setown(HPCCFileCache::fetchHpccFileByName(userdesc, filename, namevalidated, false));
  268. }
  269. catch (...)
  270. {
  271. if (file)
  272. file.clear();
  273. }
  274. if (file)
  275. {
  276. cache.setValue(file->getFullname(), file.getLink());
  277. return file->getFullname();
  278. }
  279. return "";
  280. }
  281. HPCCFilePtr HPCCFileCache::getHpccFileByName(const char * filename)
  282. {
  283. HPCCFilePtr * hpccfile = cache.getValue(filename);
  284. if (hpccfile)
  285. return *hpccfile;
  286. return NULL;
  287. }
  288. bool HPCCFileCache::isHpccFileCached(const char * filename)
  289. {
  290. return cache.getValue(filename) != NULL;
  291. }