HPCCFileCache.cpp 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2014 HPCC Systems.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "HPCCFileCache.hpp"
  14. HPCCFileCache * HPCCFileCache::createFileCache(const char * username, const char * passwd)
  15. {
  16. ESPLOG(LogMax, "WsSQL: Creating new HPCC FILE CACHE");
  17. return new HPCCFileCache(username,passwd);
  18. }
  19. void populateColums(IArrayOf<HPCCColumnMetaData> * cols, IArrayOf<IEspHPCCColumn> & pColumns)
  20. {
  21. for (int i = 0; i < cols->length(); i++)
  22. {
  23. Owned<IEspHPCCColumn> pCol = createHPCCColumn();
  24. HPCCColumnMetaData& currcol = cols->item(i);
  25. IArrayOf<HPCCColumnMetaData> * ccols = currcol.getChildColumns();
  26. if (ccols != nullptr && ccols->length() != 0)
  27. {
  28. IArrayOf<IEspHPCCColumn> pCColumns;
  29. populateColums(ccols, pCColumns);
  30. pCol->setColumns(pCColumns);
  31. }
  32. pCol->setName(currcol.getColumnName());
  33. pCol->setType(currcol.getColumnType());
  34. pColumns.append(*pCol.getLink());
  35. }
  36. }
  37. bool HPCCFileCache::populateTablesResponse(IEspGetDBMetaDataResponse & tablesrespstruct, const char * filterby, bool nestedcolumns=true)
  38. {
  39. bool success = false;
  40. cacheAllHpccFiles(filterby);
  41. IArrayOf<IEspHPCCTable> tables;
  42. HashIterator iterHash(cache);
  43. ForEach(iterHash)
  44. {
  45. const char* key = (const char*)iterHash.query().getKey();
  46. HPCCFilePtr file = dynamic_cast<HPCCFile *>(*cache.getValue(key));
  47. if ( file )
  48. {
  49. Owned<IEspHPCCTable> pTable = createHPCCTable();
  50. pTable->setName(file->getFullname());
  51. pTable->setFormat(file->getFormat());
  52. const char * ecl = file->getEcl();
  53. if (!ecl || !*ecl)
  54. continue;
  55. else
  56. {
  57. pTable->setDescription(file->getDescription());
  58. pTable->setIsKeyed(file->isFileKeyed());
  59. pTable->setIsSuper(file->isFileSuper());
  60. pTable->setOwner(file->getOwner());
  61. IArrayOf<IEspHPCCColumn> pColumns;
  62. IArrayOf<HPCCColumnMetaData> * cols = file->getColumns();
  63. for (int i = 0; i < cols->length(); i++)
  64. {
  65. Owned<IEspHPCCColumn> pCol = createHPCCColumn();
  66. HPCCColumnMetaData& currcol = cols->item(i);
  67. if (nestedcolumns)
  68. {
  69. IArrayOf<HPCCColumnMetaData> * ccols = currcol.getChildColumns();
  70. if (ccols->length() != 0)
  71. {
  72. IArrayOf<IEspHPCCColumn> pCColumns;
  73. populateColums(ccols, pCColumns);
  74. pCol->setColumns(pCColumns);
  75. }
  76. }
  77. pCol->setName(currcol.getColumnName());
  78. pCol->setType(currcol.getColumnType());
  79. pColumns.append(*pCol.getLink());
  80. }
  81. pTable->setColumns(pColumns);
  82. tables.append(*pTable.getLink());
  83. }
  84. }
  85. }
  86. tablesrespstruct.setTables(tables);
  87. return success;
  88. }
  89. bool HPCCFileCache::fetchHpccFilesByTableName(IArrayOf<SQLTable> * sqltables)
  90. {
  91. bool allFound = true;
  92. ForEachItemIn(tableindex, *sqltables)
  93. {
  94. SQLTable & table = sqltables->item(tableindex);
  95. const char * cachedKey = cacheHpccFileByName(table.getName());
  96. allFound &= (cachedKey && *cachedKey);
  97. }
  98. return allFound;
  99. }
  100. bool HPCCFileCache::cacheAllHpccFiles(const char * filterby)
  101. {
  102. bool success = false;
  103. StringBuffer filter;
  104. if(filterby && *filterby)
  105. filter.append(filterby);
  106. else
  107. filter.append("*");
  108. Owned<IDFAttributesIterator> fi = queryDistributedFileDirectory().getDFAttributesIterator(filter, userdesc.get(), true, true, NULL);
  109. if(!fi)
  110. throw MakeStringException(-1,"Cannot get information from file system.");
  111. success = true;
  112. ForEach(*fi)
  113. {
  114. IPropertyTree &attr=fi->query();
  115. #if defined(_DEBUG)
  116. StringBuffer toxml;
  117. toXML(&attr, toxml);
  118. fprintf(stderr, "%s", toxml.str());
  119. #endif
  120. StringBuffer name(attr.queryProp("@name"));
  121. if (name.length()>0 && HPCCFile::validateFileName(name.str()))
  122. {
  123. const char * cachedKey = cacheHpccFileByName(name.str(), true);
  124. success &= (cachedKey && *cachedKey);
  125. }
  126. }
  127. return success;
  128. }
  129. bool HPCCFileCache::updateHpccFileDescription(const char * filename, const char * user, const char * pass, const char * description)
  130. {
  131. Owned<IUserDescriptor> userdesc;
  132. try
  133. {
  134. userdesc.setown(createUserDescriptor());
  135. userdesc->set(user, pass);
  136. //queryDistributedFileDirectory returns singleton
  137. IDistributedFileDirectory & dfd = queryDistributedFileDirectory();
  138. Owned<IDistributedFile> df = dfd.lookup(filename, userdesc, AccessMode::tbdRead, false, false, nullptr, defaultPrivilegedUser);
  139. if(!df)
  140. return false;
  141. DistributedFilePropertyLock lock(df);
  142. lock.queryAttributes().setProp("@description",description);
  143. }
  144. catch (...)
  145. {
  146. return false;
  147. }
  148. return true;
  149. }
  150. HPCCFile * HPCCFileCache::fetchHpccFileByName(IUserDescriptor *user, const char * filename, bool namevalidated, bool acceptrawfiles)
  151. {
  152. StringBuffer username;
  153. user->getUserName(username);
  154. StringBuffer password;
  155. user->getPassword(password);
  156. return HPCCFileCache::fetchHpccFileByName(filename, username.str(), password.str(), namevalidated, acceptrawfiles);
  157. }
  158. HPCCFile * HPCCFileCache::fetchHpccFileByName(const char * filename, const char * user, const char * pass, bool namevalidated, bool acceptrawfiles)
  159. {
  160. Owned<HPCCFile> file;
  161. Owned<IUserDescriptor> userdesc;
  162. try
  163. {
  164. userdesc.setown(createUserDescriptor());
  165. userdesc->set(user, pass);
  166. //queryDistributedFileDirectory returns singleton
  167. IDistributedFileDirectory & dfd = queryDistributedFileDirectory();
  168. Owned<IDistributedFile> df = dfd.lookup(filename, userdesc, AccessMode::tbdRead, false, false, nullptr, defaultPrivilegedUser);
  169. if(!df)
  170. throw MakeStringException(-1,"Cannot find file %s.",filename);
  171. const char* lname=df->queryLogicalName();
  172. if (lname && *lname)
  173. {
  174. const char* fname=strrchr(lname,':');
  175. if (!namevalidated && !HPCCFile::validateFileName(lname))
  176. throw MakeStringException(-1,"Invalid SQL file name detected %s.", fname);
  177. file.setown(HPCCFile::createHPCCFile());
  178. file->setFullname(lname);
  179. file->setName(fname ? fname+1 : lname);
  180. }
  181. else
  182. throw MakeStringException(-1,"Cannot find file %s.",filename);
  183. file->setDescription(df->queryAttributes().queryProp("@description"));
  184. //Do we care about the clusters??
  185. //StringArray clusters;
  186. //if (cluster && *cluster)
  187. //{
  188. //df->getClusterNames(clusters);
  189. //if(!FindInStringArray(clusters, cluster))
  190. //throw MakeStringException(ECLWATCH_FILE_NOT_EXIST,"Cannot find file %s.",fname);
  191. //}
  192. #if defined(_DEBUG)
  193. StringBuffer atttree;
  194. toXML(&df->queryAttributes(), atttree, 0);
  195. fprintf(stderr, "%s", atttree.str());
  196. #endif
  197. IPropertyTree & properties = df->queryAttributes();
  198. if(properties.hasProp("ECL"))
  199. file->setEcl(properties.queryProp("ECL"));
  200. else if (!acceptrawfiles)
  201. throw MakeStringException(-1,"File %s does not contain required ECL record layout.",filename);
  202. file->setOwner(properties.queryProp("@owner"));
  203. IDistributedSuperFile *sf = df->querySuperFile();
  204. if(sf)
  205. {
  206. file->setIsSuperfile(true);
  207. }
  208. //unfortunately @format sometimes holds the file format, sometimes @kind does
  209. const char * kind = properties.queryProp("@kind");
  210. if (kind)
  211. {
  212. file->setFormat(kind);
  213. if ((stricmp(kind, "key") == 0))
  214. {
  215. file->setIsKeyedFile(true);
  216. ISecManager *secmgr = NULL;
  217. ISecUser *secuser = NULL;
  218. StringBuffer username;
  219. StringBuffer passwd;
  220. userdesc->getUserName(username);
  221. userdesc->getPassword(passwd);
  222. Owned<IResultSetFactory> resultSetFactory = getSecResultSetFactory(secmgr, secuser, username.str(), passwd.str());
  223. Owned<INewResultSet> result;
  224. try
  225. {
  226. result.setown(resultSetFactory->createNewFileResultSet(filename, NULL));
  227. if (result)
  228. {
  229. Owned<IResultSetCursor> cursor = result->createCursor();
  230. const IResultSetMetaData & meta = cursor->queryResultSet()->getMetaData();
  231. int columnCount = meta.getColumnCount();
  232. int keyedColumnCount = meta.getNumKeyedColumns();
  233. for (int i = 0; i < keyedColumnCount; i++)
  234. {
  235. SCMStringBuffer columnLabel;
  236. if (meta.hasSetTranslation(i))
  237. {
  238. meta.getNaturalColumnLabel(columnLabel, i);
  239. }
  240. if (columnLabel.length() < 1)
  241. {
  242. meta.getColumnLabel(columnLabel, i);
  243. }
  244. file->setKeyedColumn(columnLabel.str());
  245. }
  246. }
  247. }
  248. catch (IException * se)
  249. {
  250. StringBuffer s;
  251. se->errorMessage(s);
  252. IERRLOG("Error fetching keyed file %s info: %s", filename, s.str());
  253. se->Release();
  254. if (file)
  255. file.clear();
  256. }
  257. }
  258. }
  259. else
  260. {
  261. //@format - what format the file is (if not fixed width)
  262. const char* format = properties.queryProp("@format");
  263. file->setFormat(format);
  264. }
  265. if (file && (strncmp(file->getFormat(), "XML", 3)==0))
  266. throw MakeStringException(-1,"Nested data files not supported: %s.",filename);
  267. }
  268. catch (IException * se)
  269. {
  270. StringBuffer s;
  271. se->errorMessage(s);
  272. IERRLOG("Error fetching file %s info: %s", filename, s.str());
  273. se->Release();
  274. if (file)
  275. file.clear();
  276. }
  277. catch (...)
  278. {
  279. if (file)
  280. file.clear();
  281. }
  282. if (file)
  283. return file.getLink();
  284. else
  285. return NULL;
  286. }
  287. const char * HPCCFileCache::cacheHpccFileByName(const char * filename, bool namevalidated)
  288. {
  289. if(cache.getValue(filename))
  290. return filename;
  291. Owned<HPCCFile> file;
  292. try
  293. {
  294. file.setown(HPCCFileCache::fetchHpccFileByName(userdesc, filename, namevalidated, false));
  295. }
  296. catch (...)
  297. {
  298. if (file)
  299. file.clear();
  300. }
  301. if (file)
  302. {
  303. cache.setValue(file->getFullname(), file.getLink());
  304. return file->getFullname();
  305. }
  306. return "";
  307. }
  308. HPCCFilePtr HPCCFileCache::getHpccFileByName(const char * filename)
  309. {
  310. HPCCFilePtr * hpccfile = cache.getValue(filename);
  311. if (hpccfile)
  312. return *hpccfile;
  313. return NULL;
  314. }
  315. bool HPCCFileCache::isHpccFileCached(const char * filename)
  316. {
  317. return cache.getValue(filename) != NULL;
  318. }