layouttrans.cpp 40 KB


  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "layouttrans.ipp"
  14. //MORE: handle ifblocks
  15. //MORE: handle non-trivial field translations (for expandable types)
  16. char const * const scopeSeparator = ".";
  17. static IAtom * internalFposAtom;
  18. MODULE_INIT(INIT_PRIORITY_STANDARD)
  19. {
  20. internalFposAtom = createAtom("__internal_fpos__");
  21. return true;
  22. }
  23. RLTFailure * RLTFailure::appendScopeDesc(char const * scope)
  24. {
  25. if(scope)
  26. detail.append(" in ").append(scope);
  27. return this;
  28. }
  29. RLTFailure * RLTFailure::appendFieldName(char const * scope, IDefRecordElement const * field)
  30. {
  31. if(scope)
  32. detail.append(scope).append(scopeSeparator);
  33. detail.append(str(field->queryName()));
  34. return this;
  35. }
  36. RLTFailure * makeFailure(IRecordLayoutTranslator::Failure::Code code)
  37. {
  38. return new RLTFailure(code);
  39. }
  40. FieldSearcher::FieldSearcher(IDefRecordElement const * elem)
  41. {
  42. unsigned num = elem->numChildren();
  43. tab.reinit(num + num/2);
  44. for(unsigned i=0; i<num; ++i)
  45. tab.setValue(elem->queryChild(i)->queryName(), i);
  46. }
  47. bool FieldSearcher::search(IAtom * search, unsigned & pos) const
  48. {
  49. unsigned * ret = tab.getValue(search);
  50. if(ret)
  51. {
  52. pos = *ret;
  53. return true;
  54. }
  55. return false;
  56. }
  57. MappingLevel::MappingLevel(FieldMapping::List & _mappings) : topLevel(true), mappings(_mappings)
  58. {
  59. }
  60. MappingLevel::MappingLevel(MappingLevel * parent, char const * name, FieldMapping::List & _mappings) : topLevel(false), mappings(_mappings)
  61. {
  62. StringAttrBuilder fullScope(scope);
  63. if(!parent->topLevel)
  64. fullScope.append(parent->scope).append(scopeSeparator);
  65. fullScope.append(name);
  66. }
  67. void MappingLevel::calculateMappings(IDefRecordElement const * diskRecord, unsigned numKeyedDisk, IDefRecordElement const * activityRecord, unsigned numKeyedActivity)
  68. {
  69. if(diskRecord->getKind() != DEKrecord)
  70. throw makeFailure(IRecordLayoutTranslator::Failure::BadStructure)->append("Disk record metadata had unexpected structure (expected record)")->appendScopeDesc(topLevel ? NULL : scope.str());
  71. if(activityRecord->getKind() != DEKrecord)
  72. throw makeFailure(IRecordLayoutTranslator::Failure::BadStructure)->append("Activity record metadata had unexpected structure (expected record)")->appendScopeDesc(topLevel ? NULL : scope.str());
  73. unsigned numActivityChildren = activityRecord->numChildren();
  74. unsigned numDiskChildren = diskRecord->numChildren();
  75. bool activityHasInternalFpos = false;
  76. if(topLevel && (numActivityChildren > numKeyedActivity))
  77. {
  78. IDefRecordElement const * lastChild = activityRecord->queryChild(numActivityChildren-1);
  79. if((lastChild->queryName() == internalFposAtom) && (lastChild->queryType()->isInteger()))
  80. activityHasInternalFpos = true;
  81. }
  82. if((numActivityChildren - (activityHasInternalFpos ? 1 : 0)) > numDiskChildren) //if last activity field might be unmatched __internal_fpos__, should be more lenient by 1 as would fill that in (see below)
  83. throw makeFailure(IRecordLayoutTranslator::Failure::MissingDiskField)->append("Activity record requires more fields than index provides")->appendScopeDesc(topLevel ? NULL : scope.str());
  84. if(numKeyedActivity > numKeyedDisk)
  85. throw makeFailure(IRecordLayoutTranslator::Failure::UnkeyedDiskField)->append("Activity record requires more keyed fields than index provides")->appendScopeDesc(topLevel ? NULL : scope.str());
  86. BoolArray activityFieldMapped;
  87. activityFieldMapped.ensure(numActivityChildren);
  88. for(unsigned i=0; i<numActivityChildren; ++i)
  89. activityFieldMapped.append(false);
  90. FieldSearcher searcher(activityRecord);
  91. for(unsigned diskFieldNum = 0; diskFieldNum < numDiskChildren; ++diskFieldNum)
  92. {
  93. checkField(diskRecord, diskFieldNum, "Disk");
  94. bool diskFieldKeyed = (diskFieldNum < numKeyedDisk);
  95. unsigned activityFieldNum;
  96. if(searcher.search(diskRecord->queryChild(diskFieldNum)->queryName(), activityFieldNum))
  97. {
  98. bool activityFieldKeyed = (activityFieldNum < numKeyedActivity);
  99. if(activityFieldKeyed && !diskFieldKeyed)
  100. throw makeFailure(IRecordLayoutTranslator::Failure::UnkeyedDiskField)->append("Field ")->appendFieldName(topLevel ? NULL : scope.str(), activityRecord->queryChild(activityFieldNum))->append(" is keyed in activity but not on disk");
  101. checkField(activityRecord, activityFieldNum, "Activity");
  102. attemptMapping(diskRecord, diskFieldNum, diskFieldKeyed, activityRecord, activityFieldNum, activityFieldKeyed);
  103. activityFieldMapped.replace(true, activityFieldNum);
  104. }
  105. else
  106. {
  107. mappings.append(*new FieldMapping(FieldMapping::None, diskRecord, diskFieldNum, diskFieldKeyed, NULL, 0, false));
  108. }
  109. }
  110. for(unsigned activityFieldNum=0; activityFieldNum<numActivityChildren; ++activityFieldNum)
  111. if(!activityFieldMapped.item(activityFieldNum))
  112. {
  113. checkField(activityRecord, activityFieldNum, "Activity");
  114. if((activityFieldNum != numActivityChildren-1) || !activityHasInternalFpos) //if last activity field is unmatched __internal_fpos__, this is not an error, we need do nothing and it will get correctly set to zero
  115. throw makeFailure(IRecordLayoutTranslator::Failure::MissingDiskField)->append("Field ")->appendFieldName(topLevel ? NULL : scope.str(), activityRecord->queryChild(activityFieldNum))->append(" is required by activity but not present on disk index");
  116. }
  117. }
  118. void MappingLevel::attemptMapping(IDefRecordElement const * diskRecord, unsigned diskFieldNum, bool diskFieldKeyed, IDefRecordElement const * activityRecord, unsigned activityFieldNum, bool activityFieldKeyed)
  119. {
  120. IDefRecordElement const * diskField = diskRecord->queryChild(diskFieldNum);
  121. IDefRecordElement const * activityField = activityRecord->queryChild(activityFieldNum);
  122. IDefRecordElement const * diskChild = NULL;
  123. IDefRecordElement const * diskBlob = NULL;
  124. queryCheckFieldChild(diskField, "Disk", diskChild, diskBlob);
  125. IDefRecordElement const * activityChild = NULL;
  126. IDefRecordElement const * activityBlob = NULL;
  127. queryCheckFieldChild(activityField, "Activity", activityChild, activityBlob);
  128. if(diskBlob)
  129. {
  130. if(!activityBlob)
  131. throw makeFailure(IRecordLayoutTranslator::Failure::UntranslatableField)->append("Field ")->appendFieldName(topLevel ? NULL : scope.str(), activityRecord->queryChild(activityFieldNum))->append(" is blob on disk but not in activity");
  132. if(!isSameBasicType(diskBlob->queryType(), activityBlob->queryType()))
  133. throw makeFailure(IRecordLayoutTranslator::Failure::UntranslatableField)->append("Blob field ")->appendFieldName(topLevel ? NULL : scope.str(), activityRecord->queryChild(activityFieldNum))->append(" has differing referenced types on disk and in activity");
  134. }
  135. else
  136. {
  137. if(activityBlob)
  138. throw makeFailure(IRecordLayoutTranslator::Failure::UntranslatableField)->append("Field ")->appendFieldName(topLevel ? NULL : scope.str(), activityRecord->queryChild(activityFieldNum))->append(" is blob in activity but not on disk");
  139. }
  140. if(diskChild)
  141. {
  142. if(!activityChild)
  143. throw makeFailure(IRecordLayoutTranslator::Failure::UntranslatableField)->append("Field ")->appendFieldName(topLevel ? NULL : scope.str(), activityRecord->queryChild(activityFieldNum))->append(" is child dataset on disk but not in activity");
  144. if(activityFieldKeyed)
  145. throw makeFailure(IRecordLayoutTranslator::Failure::BadStructure)->append("Activity record metadata had unexpected structure (keyed field ")->appendFieldName(topLevel ? NULL : scope.str(), activityRecord->queryChild(activityFieldNum))->append(" is child dataset)");
  146. if(*activityChild == *diskChild)
  147. {
  148. mappings.append(*new FieldMapping(FieldMapping::Simple, diskRecord, diskFieldNum, false, activityRecord, activityFieldNum, false));
  149. }
  150. else
  151. {
  152. Owned<FieldMapping> mapping(new FieldMapping(FieldMapping::ChildDataset, diskRecord, diskFieldNum, false, activityRecord, activityFieldNum, false));
  153. MappingLevel childMappingLevel(this, str(diskField->queryName()), mapping->queryChildMappings());
  154. childMappingLevel.calculateMappings(diskChild, 0, activityChild, 0);
  155. mappings.append(*mapping.getClear());
  156. }
  157. }
  158. else
  159. {
  160. if(activityChild)
  161. throw makeFailure(IRecordLayoutTranslator::Failure::UntranslatableField)->append("Field ")->appendFieldName(topLevel ? NULL : scope.str(), activityRecord->queryChild(activityFieldNum))->append(" is child dataset in activity but not on disk");
  162. if(!isSameBasicType(diskField->queryType(), activityField->queryType()))
  163. throw makeFailure(IRecordLayoutTranslator::Failure::UntranslatableField)->append("Field ")->appendFieldName(topLevel ? NULL : scope.str(), activityRecord->queryChild(activityFieldNum))->append(" has differing types on disk and in activity");
  164. mappings.append(*new FieldMapping(FieldMapping::Simple, diskRecord, diskFieldNum, diskFieldKeyed, activityRecord, activityFieldNum, activityFieldKeyed));
  165. }
  166. }
  167. void MappingLevel::checkField(IDefRecordElement const * record, unsigned num, char const * label)
  168. {
  169. switch(record->queryChild(num)->getKind())
  170. {
  171. case DEKfield:
  172. break;
  173. case DEKifblock:
  174. throw makeFailure(IRecordLayoutTranslator::Failure::UntranslatableField)->append(label)->append(" record metadata field #")->append(num)->append(" is an ifblock which is not currently translatable");
  175. case DEKnone:
  176. case DEKrecord:
  177. case DEKattr:
  178. default:
  179. throw makeFailure(IRecordLayoutTranslator::Failure::BadStructure)->append(label)->append(" record metadata had unexpected structure (child #")->append(num)->append("was neither field nor ifblock)")->appendScopeDesc(topLevel ? NULL : scope.str());
  180. }
  181. }
  182. void MappingLevel::queryCheckFieldChild(IDefRecordElement const * field, char const * label, IDefRecordElement const * & child, IDefRecordElement const * & blob)
  183. {
  184. if(field->getKind() != DEKfield)
  185. throw makeFailure(IRecordLayoutTranslator::Failure::BadStructure)->append(label)->append(" record metadata had unexpected structure (non-field found where field expected");
  186. type_t fieldType = field->queryType()->getTypeCode();
  187. switch(fieldType)
  188. {
  189. case type_table:
  190. case type_groupedtable:
  191. if(field->numChildren() != 1)
  192. throw makeFailure(IRecordLayoutTranslator::Failure::BadStructure)->append(label)->append(" record metadata had unexpected structure (expected exactly one child of table field ")->appendFieldName(topLevel ? NULL : scope.str(), field)->append(")");
  193. child = field->queryChild(0);
  194. if(child->getKind() != DEKrecord)
  195. throw makeFailure(IRecordLayoutTranslator::Failure::BadStructure)->append(label)->append(" record metadata had unexpected structure (unexpected non-record child of table field ")->appendFieldName(topLevel ? NULL : scope.str(), field)->append(")");
  196. break;
  197. case type_blob:
  198. if(field->numChildren() != 1)
  199. throw makeFailure(IRecordLayoutTranslator::Failure::BadStructure)->append(label)->append(" record metadata had unexpected structure (expected exactly one child of blob field ")->appendFieldName(topLevel ? NULL : scope.str(), field)->append(")");
  200. blob = field->queryChild(0);
  201. if(blob->getKind() != DEKfield)
  202. throw makeFailure(IRecordLayoutTranslator::Failure::BadStructure)->append(label)->append(" record metadata had unexpected structure (expected non-field child of blob field ")->appendFieldName(topLevel ? NULL : scope.str(), field)->append(")");
  203. break;
  204. default:
  205. if(field->numChildren() != 0)
  206. throw makeFailure(IRecordLayoutTranslator::Failure::BadStructure)->append(label)->append(" record metadata had unexpected structure (unexpected children of field ")->appendFieldName(topLevel ? NULL : scope.str(), field)->append(")");
  207. }
  208. }
  209. void RowTransformer::build(unsigned & seq, FieldMapping::List const & mappings)
  210. {
  211. CIArrayOf<RowRecord> records;
  212. analyseMappings(mappings, records);
  213. keepFpos = false;
  214. copyToFpos = false;
  215. generateCopies(seq, records);
  216. }
  217. RowTransformer::RowRecord & RowTransformer::ensureItem(CIArrayOf<RowRecord> & arr, unsigned pos)
  218. {
  219. while(arr.ordinality() <= pos) arr.append(*new RowRecord);
  220. return arr.item(pos);
  221. }
  222. void RowTransformer::createRowRecord(FieldMapping const & mapping, CIArrayOf<RowRecord> & records, size32_t diskOffset, unsigned numVarFields, bool & prevActivityField, unsigned & prevActivityFieldNum)
  223. {
  224. size32_t diskSize = mapping.queryDiskFieldSize();
  225. unsigned activityFieldNum = mapping.queryActivityFieldNum();
  226. switch(mapping.queryType())
  227. {
  228. case FieldMapping::Simple:
  229. ensureItem(records, activityFieldNum).setVals(diskOffset, numVarFields, diskSize, (prevActivityField && (activityFieldNum == (prevActivityFieldNum+1))));
  230. prevActivityField = true;
  231. prevActivityFieldNum = activityFieldNum;
  232. break;
  233. case FieldMapping::ChildDataset:
  234. ensureItem(records, activityFieldNum).setVals(diskOffset, numVarFields, diskSize, false).setChildMappings(&mapping.queryChildMappings());
  235. prevActivityField = false;
  236. break;
  237. case FieldMapping::None:
  238. prevActivityField = false;
  239. break;
  240. default:
  241. throwUnexpected();
  242. }
  243. }
  244. void RowTransformer::analyseMappings(FieldMapping::List const & mappings, CIArrayOf<RowRecord> & records)
  245. {
  246. size32_t diskOffset = 0;
  247. unsigned numRowDiskFields = mappings.ordinality();
  248. bool prevActivityField = false;
  249. unsigned prevActivityFieldNum;
  250. for(unsigned diskFieldNum = 0; diskFieldNum < numRowDiskFields; ++diskFieldNum)
  251. {
  252. FieldMapping const & mapping = mappings.item(diskFieldNum);
  253. createRowRecord(mapping, records, diskOffset, diskVarFieldRelOffsets.ordinality(), prevActivityField, prevActivityFieldNum);
  254. size32_t diskSize = mapping.queryDiskFieldSize();
  255. if(diskSize == UNKNOWN_LENGTH)
  256. {
  257. diskVarFieldRelOffsets.append(diskOffset);
  258. diskVarFieldLenDisplacements.append(mapping.isDiskFieldSet() ? 1 : 0);
  259. diskOffset = 0;
  260. }
  261. else
  262. {
  263. diskOffset += diskSize;
  264. }
  265. }
  266. finalFixedSize = diskOffset;
  267. }
  268. void RowTransformer::generateSimpleCopy(unsigned & seq, RowRecord const & record)
  269. {
  270. if(!record.queryFollowOn())
  271. copies.append(*new FieldCopy(sequence, record.queryRelOffset(), record.queryRelBase()));
  272. FieldCopy & copy = copies.tos();
  273. size32_t size = record.querySize();
  274. if(size == UNKNOWN_LENGTH)
  275. copy.addVarField(record.queryRelBase()+1);
  276. else
  277. copy.addFixedSize(size);
  278. FieldMapping::List const * childMappings = record.queryChildMappings();
  279. if(childMappings)
  280. copy.setChildTransformer(new RowTransformer(seq, *childMappings));
  281. }
  282. void RowTransformer::generateCopies(unsigned & seq, CIArrayOf<RowRecord> const & records)
  283. {
  284. sequence = seq++;
  285. ForEachItemIn(fieldNum, records)
  286. {
  287. RowRecord const & record = records.item(fieldNum);
  288. generateSimpleCopy(seq, record);
  289. }
  290. }
  291. void RowTransformer::transform(IRecordLayoutTranslator::RowTransformContext * ctx, byte const * in, size32_t inSize, size32_t & inOffset, IMemoryBlock & out, size32_t & outOffset) const
  292. {
  293. ctx->set(sequence, 0, 0, in+inOffset);
  294. for(unsigned varIdx = 1; varIdx <= diskVarFieldRelOffsets.ordinality(); ++varIdx)
  295. {
  296. if(inOffset >= inSize)
  297. throw MakeStringException(0, "Disk row invalid during record layout translation");
  298. inOffset += diskVarFieldRelOffsets.item(varIdx-1);
  299. size32_t disp = diskVarFieldLenDisplacements.item(varIdx-1);
  300. size32_t size = *reinterpret_cast<size32_t const *>(in+inOffset+disp);
  301. // length prefix is little-endian
  302. #if __BYTE_ORDER == __BIG_ENDIAN
  303. _rev(&size);
  304. #endif
  305. size += disp;
  306. size += sizeof(size32_t);
  307. inOffset += size;
  308. ctx->set(sequence, varIdx, size, in+inOffset);
  309. }
  310. inOffset += finalFixedSize;
  311. ForEachItemIn(copyIdx, copies)
  312. copies.item(copyIdx).copy(ctx, out, outOffset);
  313. }
  314. void RowTransformer::createRowTransformContext(IRecordLayoutTranslator::RowTransformContext * ctx) const
  315. {
  316. ctx->init(sequence, diskVarFieldRelOffsets.ordinality()+1);
  317. ForEachItemIn(idx, copies)
  318. {
  319. RowTransformer const * child = copies.item(idx).queryChildTransformer();
  320. if(child)
  321. child->createRowTransformContext(ctx);
  322. }
  323. }
  324. void FieldCopy::copy(IRecordLayoutTranslator::RowTransformContext * ctx, IMemoryBlock & out, size32_t & outOffset) const
  325. {
  326. size32_t diskFieldSize = fixedSize;
  327. ForEachItemIn(varIdx, varFields)
  328. diskFieldSize += ctx->querySize(sequence, varFields.item(varIdx));
  329. byte const * in = ctx->queryPointer(sequence, relBase) + relOffset;
  330. if(childTransformer)
  331. {
  332. size32_t inOffset = sizeof(size32_t);
  333. size32_t sizeOutOffset = outOffset;
  334. outOffset += sizeof(size32_t);
  335. size32_t startOutOffset = outOffset;
  336. while(inOffset < diskFieldSize)
  337. childTransformer->transform(ctx, in, diskFieldSize, inOffset, out, outOffset);
  338. //Now patch the length up - transform may have resized out...
  339. size32_t * outSizePtr = reinterpret_cast<size32_t *>(out.getMem()+sizeOutOffset);
  340. *outSizePtr = outOffset-startOutOffset;
  341. }
  342. else
  343. {
  344. byte * target = out.ensure(outOffset+diskFieldSize);
  345. memcpy(target+outOffset, in, diskFieldSize);
  346. outOffset += diskFieldSize;
  347. }
  348. }
  349. IRecordLayoutTranslator::RowTransformContext::RowTransformContext(unsigned _num) : num(_num)
  350. {
  351. sizes = new unsigned *[num];
  352. ptrs = new byte const * *[num];
  353. for(unsigned i=0; i<num; ++i)
  354. {
  355. sizes[i] = NULL;
  356. ptrs[i] = NULL;
  357. }
  358. }
  359. IRecordLayoutTranslator::RowTransformContext::~RowTransformContext()
  360. {
  361. for(unsigned i=0; i<num; ++i)
  362. {
  363. if(sizes[i])
  364. delete [] sizes[i];
  365. if(ptrs[i])
  366. delete [] ptrs[i];
  367. }
  368. delete [] sizes;
  369. delete [] ptrs;
  370. }
  371. CRecordLayoutTranslator::CRecordLayoutTranslator(IDefRecordMeta const * _diskMeta, IDefRecordMeta const * _activityMeta, IRecordLayoutTranslator::Mode _mode) : diskMeta(const_cast<IDefRecordMeta *>(_diskMeta)), activityMeta(const_cast<IDefRecordMeta *>(_activityMeta)), activityKeySizes(NULL)
  372. {
  373. numKeyedDisk = diskMeta->numKeyedFields();
  374. numKeyedActivity = activityMeta->numKeyedFields();
  375. MappingLevel topMappingLevel(mappings);
  376. numTransformers = 0;
  377. try
  378. {
  379. if(numKeyedDisk==0)
  380. throw makeFailure(IRecordLayoutTranslator::Failure::BadStructure)->append("Disk record had no keyed fields");
  381. if(numKeyedActivity==0)
  382. throw makeFailure(IRecordLayoutTranslator::Failure::BadStructure)->append("Activity record had no keyed fields");
  383. topMappingLevel.calculateMappings(diskMeta->queryRecord(), numKeyedDisk, activityMeta->queryRecord(), numKeyedActivity);
  384. calculateActivityKeySizes();
  385. calculateKeysTransformed();
  386. if (keysTransformed && _mode != TranslateAll)
  387. throw makeFailure(IRecordLayoutTranslator::Failure::KeyedDisallowed)->append("Translation of key fields would be required");
  388. transformer.build(numTransformers, mappings);
  389. }
  390. catch(Failure * f)
  391. {
  392. failure.setown(f);
  393. }
  394. }
  395. void CRecordLayoutTranslator::calculateActivityKeySizes()
  396. {
  397. activityKeySizes = new size32_t[numKeyedActivity];
  398. for(unsigned activityFieldNum=0; activityFieldNum<numKeyedActivity; ++activityFieldNum)
  399. activityKeySizes[activityFieldNum] = 0;
  400. ForEachItemIn(diskFieldNum, mappings)
  401. {
  402. FieldMapping const & mapping = mappings.item(diskFieldNum);
  403. if(mapping.queryType() == FieldMapping::Simple)
  404. {
  405. unsigned activityFieldNum = mapping.queryActivityFieldNum();
  406. if(activityFieldNum < numKeyedActivity)
  407. activityKeySizes[activityFieldNum] = activityMeta->queryRecord()->queryChild(activityFieldNum)->queryType()->getSize();
  408. }
  409. }
  410. }
  411. void CRecordLayoutTranslator::calculateKeysTransformed()
  412. {
  413. keysTransformed = true;
  414. if(numKeyedActivity != numKeyedDisk)
  415. return;
  416. for(unsigned diskFieldNum=0; diskFieldNum<numKeyedDisk; ++diskFieldNum)
  417. {
  418. FieldMapping const & mapping = mappings.item(diskFieldNum);
  419. if((mapping.queryType() != FieldMapping::Simple) || (mapping.queryActivityFieldNum() != diskFieldNum))
  420. return;
  421. }
  422. keysTransformed = false;
  423. }
  424. void CRecordLayoutTranslator::createDiskSegmentMonitors(SegmentMonitorContext const & in, IIndexReadContext & out)
  425. {
  426. if(failure) return;
  427. if(in.ordinality() != numKeyedActivity)
  428. {
  429. failure.setown(makeFailure(Failure::UnsupportedFilter)->append("Unsupported filter (segment monitor) type (too few filters)"));
  430. return;
  431. }
  432. size32_t diskOffset = 0;
  433. for(unsigned diskFieldNum = 0; diskFieldNum < numKeyedDisk; ++diskFieldNum)
  434. {
  435. FieldMapping const & mapping = mappings.item(diskFieldNum);
  436. assertex(mapping.queryDiskFieldNum() == diskFieldNum);
  437. size32_t size = mapping.queryDiskFieldSize();
  438. Owned<IKeySegmentMonitor> monitor;
  439. switch(mapping.queryType())
  440. {
  441. case FieldMapping::Simple:
  442. if(mapping.queryActivityFieldNum() < numKeyedActivity)
  443. {
  444. assertex(mapping.queryActivityFieldSize() == size);
  445. monitor.set(in.item(mapping.queryActivityFieldNum()));
  446. assertex(monitor->getSize() == size);
  447. if(monitor->getOffset() != diskOffset)
  448. {
  449. monitor.setown(monitor->clone());
  450. if(!monitor || !monitor->setOffset(diskOffset))
  451. {
  452. failure.setown(makeFailure(Failure::UnsupportedFilter)->append("Unable to change offset of filter (segment monitor) for field ")->append(mapping.queryDiskFieldName()));
  453. return;
  454. }
  455. }
  456. break;
  457. }
  458. //fall through
  459. case FieldMapping::None:
  460. monitor.setown(createWildKeySegmentMonitor(diskFieldNum, diskOffset, size));
  461. break;
  462. case FieldMapping::ChildDataset:
  463. default:
  464. throwUnexpected();
  465. }
  466. out.append(monitor.getLink());
  467. diskOffset += size;
  468. }
  469. }
  470. void CRecordLayoutTranslator::checkSizes(char const * filename, size32_t activitySize, size32_t diskSize) const
  471. {
  472. }
  473. IRecordLayoutTranslator::RowTransformContext * CRecordLayoutTranslator::getRowTransformContext()
  474. {
  475. Owned<IRecordLayoutTranslator::RowTransformContext> ctx = new RowTransformContext(numTransformers);
  476. transformer.createRowTransformContext(ctx);
  477. return ctx.getClear();
  478. }
  479. size32_t CRecordLayoutTranslator::transformRow(RowTransformContext * ctx, byte const * in, size32_t inSize, IMemoryBlock & out) const
  480. {
  481. size32_t inOffset = 0;
  482. size32_t outOffset = 0;
  483. transformer.transform(ctx, in, inSize, inOffset, out, outOffset);
  484. return outOffset;
  485. }
  486. void ExpandedSegmentMonitorList::append(IKeySegmentMonitor * monitor)
  487. {
  488. if(owner->failure) return;
  489. if(monitor->getSize() > owner->activityKeySizes[monitors.ordinality()])
  490. {
  491. owner->failure.setown(makeFailure(IRecordLayoutTranslator::Failure::UnsupportedFilter)->append("Unsupported filter (segment monitor) type (was larger than keyed field)"));
  492. return;
  493. }
  494. if(monitor->getSize() < owner->activityKeySizes[monitors.ordinality()])
  495. {
  496. owner->failure.setown(makeFailure(IRecordLayoutTranslator::Failure::UnsupportedFilter)->append("Unsupported filter (segment monitor) type (was smaller than keyed field)"));
  497. return;
  498. }
  499. monitors.append(*monitor);
  500. }
  501. IRecordLayoutTranslator * createRecordLayoutTranslator(IDefRecordMeta const * diskMeta, IDefRecordMeta const * activityMeta, IRecordLayoutTranslator::Mode _mode)
  502. {
  503. Owned<IRecordLayoutTranslator> layoutTrans = new CRecordLayoutTranslator(diskMeta, activityMeta, _mode);
  504. if(!layoutTrans->querySuccess())
  505. {
  506. StringBuffer cause;
  507. layoutTrans->queryFailure().getDetail(cause);
  508. throw MakeStringException(0, "Unable to recover from record layout mismatch (%s)", cause.str());
  509. }
  510. return layoutTrans.getClear();
  511. };
  512. extern THORHELPER_API IRecordLayoutTranslator * createRecordLayoutTranslator(size32_t diskMetaSize, const void *diskMetaData, size32_t activityMetaSize, const void *activityMetaData, IRecordLayoutTranslator::Mode _mode)
  513. {
  514. MemoryBuffer activityMetaSerialized;
  515. activityMetaSerialized.setBuffer(activityMetaSize, (void *) activityMetaData, false);
  516. Owned<IDefRecordMeta> activityMeta = deserializeRecordMeta(activityMetaSerialized, true);
  517. MemoryBuffer diskMetaSerialized;
  518. diskMetaSerialized.setBuffer(diskMetaSize, (void *) diskMetaData, false);
  519. Owned<IDefRecordMeta> diskMeta = deserializeRecordMeta(diskMetaSerialized, true);
  520. return createRecordLayoutTranslator(diskMeta, activityMeta, _mode);
  521. }
  522. #ifdef DEBUG_HELPERS_REQUIRED
  523. IPropertyTree * convertFieldMappingsToPTree(FieldMapping::List const & mappings)
  524. {
  525. Owned<IPropertyTree> tree = createPTree("Record");
  526. ForEachItemIn(mappingIdx, mappings)
  527. {
  528. FieldMapping const & m = mappings.item(mappingIdx);
  529. Owned<IPropertyTree> branch = createPTree();
  530. branch->setPropInt("@diskFieldNum", m.queryDiskFieldNum());
  531. branch->setProp("@diskFieldName", m.queryDiskFieldName());
  532. switch(m.queryType())
  533. {
  534. case FieldMapping::None:
  535. branch->setProp("@type", "None");
  536. break;
  537. case FieldMapping::Simple:
  538. branch->setProp("@type", "Simple");
  539. branch->setPropInt("@activityFieldNum", m.queryActivityFieldNum());
  540. branch->setProp("@activityFieldName", m.queryActivityFieldName());
  541. break;
  542. case FieldMapping::ChildDataset:
  543. branch->setProp("@type", "ChildDataset");
  544. branch->setPropInt("@activityFieldNum", m.queryActivityFieldNum());
  545. branch->setProp("@activityFieldName", m.queryActivityFieldName());
  546. branch->setPropTree("Record", convertFieldMappingsToPTree(m.queryChildMappings()));
  547. break;
  548. default:
  549. throwUnexpected();
  550. }
  551. tree->addPropTree("Mapping", branch.getClear());
  552. }
  553. return tree.getClear();
  554. }
  555. StringBuffer & CRecordLayoutTranslator::getMappingsAsString(StringBuffer & out) const
  556. {
  557. Owned<IPropertyTree> tree = convertFieldMappingsToPTree(mappings);
  558. toXML(tree, out);
  559. return out;
  560. }
  561. #endif
  562. CacheKey::CacheKey(IRecordLayoutTranslator::Mode _mode, size32_t _s1, void const * _d1, size32_t _s2, void const * _d2)
  563. : mode(_mode), s1(_s1), d1(static_cast<byte const *>(_d1)), s2(_s2), d2(static_cast<byte const *>(_d2))
  564. {
  565. hashval = hashc(d1, s1, (unsigned) mode);
  566. hashval = hashc(d2, s2, hashval);
  567. }
  568. CacheValue::CacheValue(IRecordLayoutTranslator::Mode _mode, size32_t s1, void const * d1, size32_t s2, void const * d2, IRecordLayoutTranslator * _trans)
  569. : b1(s1, d1), b2(s2, d2), key(_mode, (size32_t)b1.length(), b1.get(), (size32_t)b2.length(), b2.get()), trans(_trans)
  570. {
  571. }
  572. IRecordLayoutTranslator * CRecordLayoutTranslatorCache::get(IRecordLayoutTranslator::Mode mode, size32_t diskMetaSize, void const * diskMetaData, size32_t activityMetaSize, void const * activityMetaData, IDefRecordMeta const * activityMeta)
  573. {
  574. CacheKey key(mode, diskMetaSize, diskMetaData, activityMetaSize, activityMetaData);
  575. CacheValue * value = find(&key);
  576. if(!value)
  577. {
  578. Owned<IDefRecordMeta> activityMetaDeserialized;
  579. if(!activityMeta)
  580. {
  581. MemoryBuffer activityMetaSerialized;
  582. activityMetaSerialized.setBuffer(activityMetaSize, (void *) activityMetaData, false);
  583. activityMetaDeserialized.setown(deserializeRecordMeta(activityMetaSerialized, true));
  584. activityMeta = activityMetaDeserialized.get();
  585. }
  586. MemoryBuffer diskMetaSerialized;
  587. diskMetaSerialized.setBuffer(diskMetaSize, (void *) diskMetaData, false);
  588. Owned<IDefRecordMeta> diskMeta = deserializeRecordMeta(diskMetaSerialized, true);
  589. Owned<IRecordLayoutTranslator> trans = createRecordLayoutTranslator(diskMeta, activityMeta, mode);
  590. value = new CacheValue(mode, diskMetaSize, diskMetaData, activityMetaSize, activityMetaData, trans.getLink());
  591. addNew(value);
  592. }
  593. return value->getTranslator();
  594. }
  595. extern THORHELPER_API IRecordLayoutTranslatorCache * createRecordLayoutTranslatorCache()
  596. {
  597. return new CRecordLayoutTranslatorCache();
  598. }
  599. #ifdef _USE_CPPUNIT
  600. #include <cppunit/extensions/HelperMacros.h>
  601. //MORE: This does not test translation with blobs or child datasets. Also, it only creates translators --- testing they actually work would require a lot more framework...
  602. class RecordLayoutTranslatorTest : public CppUnit::TestFixture
  603. {
  604. CPPUNIT_TEST_SUITE(RecordLayoutTranslatorTest);
  605. CPPUNIT_TEST(testCount);
  606. CPPUNIT_TEST(testKeyedSwap);
  607. CPPUNIT_TEST(testUnkey);
  608. CPPUNIT_TEST(testKeyFail);
  609. CPPUNIT_TEST(testSwapKeyFail);
  610. CPPUNIT_TEST(testCache);
  611. CPPUNIT_TEST(testDropKeyed);
  612. CPPUNIT_TEST(testDropUnkeyed);
  613. CPPUNIT_TEST(testNewFieldFail);
  614. CPPUNIT_TEST(testRenamedFieldFail);
  615. CPPUNIT_TEST(testChangeTypeFail);
  616. CPPUNIT_TEST_SUITE_END();
  617. public:
  618. void setUp()
  619. {
  620. bool done = false;
  621. for(unsigned m=0; !done; ++m)
  622. {
  623. Owned<IDefRecordBuilder> builder = createDErecord(4096);
  624. Owned<ITypeInfo> type;
  625. IAtom * name;
  626. size32_t size;
  627. unsigned keyed = 0;
  628. unsigned f;
  629. for(f = 0; getFieldData(m, f, type, name, size, keyed); ++f)
  630. {
  631. Owned<IDefRecordElement> field = createDEfield(name, type, NULL, size);
  632. builder->addChild(field);
  633. }
  634. Owned<IDefRecordElement> record = builder->close();
  635. if(f)
  636. meta.append(*createDefRecordMeta(record, keyed));
  637. else
  638. done = true;
  639. }
  640. }
  641. void testCount()
  642. {
  643. CPPUNIT_ASSERT(meta.ordinality() == 10);
  644. }
  645. void testKeyedSwap()
  646. {
  647. doTranslate(0, 1);
  648. }
  649. void testUnkey()
  650. {
  651. doTranslate(0, 2);
  652. }
  653. void testKeyFail()
  654. {
  655. doTranslateFail(0, 3, IRecordLayoutTranslator::Failure::UnkeyedDiskField);
  656. }
  657. void testSwapKeyFail()
  658. {
  659. doTranslateFail(0, 4, IRecordLayoutTranslator::Failure::UnkeyedDiskField);
  660. }
  661. void testDropKeyed()
  662. {
  663. doTranslate(0, 5);
  664. }
  665. void testDropUnkeyed()
  666. {
  667. doTranslate(0, 6);
  668. }
  669. void testNewFieldFail()
  670. {
  671. doTranslateFail(0, 7, IRecordLayoutTranslator::Failure::MissingDiskField);
  672. }
  673. void testRenamedFieldFail()
  674. {
  675. doTranslateFail(0, 8, IRecordLayoutTranslator::Failure::MissingDiskField);
  676. }
  677. void testChangeTypeFail()
  678. {
  679. doTranslateFail(0, 9, IRecordLayoutTranslator::Failure::UntranslatableField);
  680. }
  681. void testCache()
  682. {
  683. MemoryBuffer buff[3];
  684. for(unsigned m=0; m<3; ++m)
  685. serializeRecordMeta(buff[m], &meta.item(m), true);
  686. Owned<IRecordLayoutTranslatorCache> cache = createRecordLayoutTranslatorCache();
  687. CPPUNIT_ASSERT(cache.get() != 0);
  688. CPPUNIT_ASSERT(cache->count() == 0);
  689. Owned<IRecordLayoutTranslator> t1 = cache->get(IRecordLayoutTranslator::TranslateAll, buff[0].length(), buff[0].bufferBase(), buff[1].length(), buff[1].bufferBase(), NULL);
  690. CPPUNIT_ASSERT(cache->count() == 1);
  691. Owned<IRecordLayoutTranslator> t2 = cache->get(IRecordLayoutTranslator::TranslateAll, buff[0].length(), buff[0].bufferBase(), buff[1].length(), buff[1].bufferBase(), NULL);
  692. CPPUNIT_ASSERT(cache->count() == 1);
  693. Owned<IRecordLayoutTranslator> t3 = cache->get(IRecordLayoutTranslator::TranslateAll, buff[0].length(), buff[0].bufferBase(), buff[2].length(), buff[2].bufferBase(), NULL);
  694. CPPUNIT_ASSERT(cache->count() == 2);
  695. CPPUNIT_ASSERT(t1.get() == t2.get());
  696. CPPUNIT_ASSERT(t1.get() != t3.get());
  697. }
  698. private:
  699. bool getFieldData(unsigned m, unsigned f, Owned<ITypeInfo> & type, IAtom * & name, size32_t & size, unsigned & keyed)
  700. {
  701. switch(m)
  702. {
  703. case 0:
  704. //disk
  705. keyed = 2;
  706. switch(f)
  707. {
  708. case 0:
  709. type.setown(makeIntType(1, false));
  710. name = createAtom("i");
  711. size = 1;
  712. return true;
  713. case 1:
  714. type.setown(makeIntType(4, false));
  715. name = createAtom("j");
  716. size = 4;
  717. return true;
  718. case 2:
  719. type.setown(makeStringType(8));
  720. name = createAtom("str");
  721. size = 8;
  722. return true;
  723. }
  724. return false;
  725. case 1:
  726. //swap i,j
  727. keyed = 2;
  728. switch(f)
  729. {
  730. case 0:
  731. type.setown(makeIntType(4, false));
  732. name = createAtom("j");
  733. size = 4;
  734. return true;
  735. case 1:
  736. type.setown(makeIntType(1, false));
  737. name = createAtom("i");
  738. size = 1;
  739. return true;
  740. case 2:
  741. type.setown(makeStringType(8));
  742. name = createAtom("str");
  743. size = 8;
  744. return true;
  745. }
  746. return false;
  747. case 2:
  748. //unkey j
  749. keyed = 1;
  750. switch(f)
  751. {
  752. case 0:
  753. type.setown(makeIntType(1, false));
  754. name = createAtom("i");
  755. size = 1;
  756. return true;
  757. case 1:
  758. type.setown(makeIntType(4, false));
  759. name = createAtom("j");
  760. size = 4;
  761. return true;
  762. case 2:
  763. type.setown(makeStringType(8));
  764. name = createAtom("str");
  765. size = 8;
  766. return true;
  767. }
  768. return false;
  769. case 3:
  770. //key str (fails)
  771. keyed = 3;
  772. switch(f)
  773. {
  774. case 0:
  775. type.setown(makeIntType(1, false));
  776. name = createAtom("i");
  777. size = 1;
  778. return true;
  779. case 1:
  780. type.setown(makeIntType(4, false));
  781. name = createAtom("j");
  782. size = 4;
  783. return true;
  784. case 2:
  785. type.setown(makeStringType(8));
  786. name = createAtom("str");
  787. size = 8;
  788. return true;
  789. }
  790. return false;
  791. case 4:
  792. //move str into key (fails)
  793. keyed = 2;
  794. switch(f)
  795. {
  796. case 0:
  797. type.setown(makeIntType(1, false));
  798. name = createAtom("i");
  799. size = 1;
  800. return true;
  801. case 1:
  802. type.setown(makeStringType(8));
  803. name = createAtom("str");
  804. size = 8;
  805. return true;
  806. case 2:
  807. type.setown(makeIntType(4, false));
  808. name = createAtom("j");
  809. size = 4;
  810. return true;
  811. }
  812. return false;
  813. case 5:
  814. //drop j
  815. keyed = 1;
  816. switch(f)
  817. {
  818. case 0:
  819. type.setown(makeIntType(1, false));
  820. name = createAtom("i");
  821. size = 1;
  822. return true;
  823. case 1:
  824. type.setown(makeStringType(8));
  825. name = createAtom("str");
  826. size = 8;
  827. return true;
  828. }
  829. return false;
  830. case 6:
  831. //drop str
  832. keyed = 2;
  833. switch(f)
  834. {
  835. case 0:
  836. type.setown(makeIntType(1, false));
  837. name = createAtom("i");
  838. size = 1;
  839. return true;
  840. case 1:
  841. type.setown(makeIntType(4, false));
  842. name = createAtom("j");
  843. size = 4;
  844. return true;
  845. }
  846. return false;
  847. case 7:
  848. //add new field
  849. keyed = 2;
  850. switch(f)
  851. {
  852. case 0:
  853. type.setown(makeIntType(1, false));
  854. name = createAtom("i");
  855. size = 1;
  856. return true;
  857. case 1:
  858. type.setown(makeIntType(4, false));
  859. name = createAtom("j");
  860. size = 4;
  861. return true;
  862. case 2:
  863. type.setown(makeStringType(8));
  864. name = createAtom("str");
  865. size = 8;
  866. return true;
  867. case 3:
  868. type.setown(makeStringType(8));
  869. name = createAtom("other");
  870. size = 8;
  871. return true;
  872. }
  873. return false;
  874. case 8:
  875. //rename field
  876. keyed = 2;
  877. switch(f)
  878. {
  879. case 0:
  880. type.setown(makeIntType(1, false));
  881. name = createAtom("i");
  882. size = 1;
  883. return true;
  884. case 1:
  885. type.setown(makeIntType(4, false));
  886. name = createAtom("j");
  887. size = 4;
  888. return true;
  889. case 2:
  890. type.setown(makeStringType(8));
  891. name = createAtom("other");
  892. size = 8;
  893. return true;
  894. }
  895. return false;
  896. case 9:
  897. //change type
  898. keyed = 2;
  899. switch(f)
  900. {
  901. case 0:
  902. type.setown(makeIntType(1, false));
  903. name = createAtom("i");
  904. size = 1;
  905. return true;
  906. case 1:
  907. type.setown(makeIntType(4, false));
  908. name = createAtom("j");
  909. size = 4;
  910. return true;
  911. case 2:
  912. type.setown(makeStringType(9));
  913. name = createAtom("str");
  914. size = 9;
  915. return true;
  916. }
  917. return false;
  918. }
  919. return false;
  920. }
  921. void doTranslate(unsigned disk, unsigned activity)
  922. {
  923. Owned<IRecordLayoutTranslator> trans = new CRecordLayoutTranslator(&meta.item(disk), &meta.item(activity), IRecordLayoutTranslator::TranslateAll);
  924. CPPUNIT_ASSERT(trans.get() != NULL);
  925. CPPUNIT_ASSERT(trans->querySuccess());
  926. }
  927. void doTranslateFail(unsigned disk, unsigned activity, unsigned code)
  928. {
  929. Owned<IRecordLayoutTranslator> trans = new CRecordLayoutTranslator(&meta.item(disk), &meta.item(activity), IRecordLayoutTranslator::TranslateAll);
  930. CPPUNIT_ASSERT(trans.get() != 0);
  931. CPPUNIT_ASSERT(!trans->querySuccess());
  932. CPPUNIT_ASSERT(trans->queryFailure().queryCode() == code);
  933. }
  934. private:
  935. IArrayOf<IDefRecordMeta> meta;
  936. MemoryBuffer * buff;
  937. Owned<IRecordLayoutTranslatorCache> cache;
  938. };
// Make the suite known to CppUnit twice: once through the plain registration macro ...
CPPUNIT_TEST_SUITE_REGISTRATION(RecordLayoutTranslatorTest);
// ... and once under the explicit name "RecordLayoutTranslatorTest"
// (presumably so a test runner can select this suite by name - confirm against the runner).
CPPUNIT_TEST_SUITE_NAMED_REGISTRATION(RecordLayoutTranslatorTest, "RecordLayoutTranslatorTest");
  941. #endif