roxierow.cpp
/*##############################################################################

    HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
############################################################################## */

#include "jexcept.hpp"
#include "jcrc.hpp"
#include "thorcommon.ipp" // for CachedOutputMetaData
#include "roxierow.hpp"
//Classes can be used to configure the allocator, and add extra data to the end.
//Checking is enabled by setting a bit in the allocatorId.
class NoCheckingHelper
{
public:
    enum {
        extraSize = 0,
        allocatorCheckFlag = 0x00000000
    };
    static inline void setCheck(size32_t size, void * ptr) {}
    static inline bool isValid(const void * ptr) { return true; }
};
//NOTE: If a row requires checking then it will also have the bit set to indicate it requires a destructor,
//so that rows are checked on destruction.
//Therefore isValid() checks whether the destructor bit is set for the row, to protect us from uninitialised crcs.
class Crc16CheckingHelper
{
public:
    enum {
        extraSize = sizeof(unsigned short),
        allocatorCheckFlag = 0x00100000|ACTIVITY_FLAG_NEEDSDESTRUCTOR
    };
    static inline void setCheck(size32_t size, void * _ptr)
    {
        byte * ptr = static_cast<byte *>(_ptr);
        memsize_t capacity = RoxieRowCapacity(ptr);
        if (capacity < size + extraSize)
            throw MakeStringException(0, "Data was written past the end of the row - allocated %d, written %d", (size32_t)(capacity - extraSize), size);
        memset(ptr+size, 0, capacity - size - extraSize);
        unsigned short * check = reinterpret_cast<unsigned short *>(ptr + capacity - extraSize);
        *check = crc16(ptr, capacity-extraSize, 0);
    }
    static inline bool isValid(const void * _ptr)
    {
        if (RoxieRowHasDestructor(_ptr))
        {
            const byte * ptr = static_cast<const byte *>(_ptr);
            memsize_t capacity = RoxieRowCapacity(ptr);
            const unsigned short * check = reinterpret_cast<const unsigned short *>(ptr + capacity - extraSize);
            return *check == crc16(ptr, capacity-extraSize, 0);
        }
        return true;
    }
};
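
#if 0
//Illustrative sketch, not part of the build ('row' stands for any finalized row from
//a crc-checking allocator). After setCheck(), a row of written size 'size' and
//allocated 'capacity' is laid out as
//    [ size payload bytes | zeroed slack | 2-byte crc16 at capacity-2 ]
//and isValid() just recomputes the crc over the first capacity-2 bytes:
memsize_t capacity = RoxieRowCapacity(row);
const byte * base = static_cast<const byte *>(row);
const unsigned short * check = reinterpret_cast<const unsigned short *>(base + capacity - sizeof(unsigned short));
assertex(*check == crc16(base, capacity - sizeof(unsigned short), 0));
#endif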
//This is here as a demonstration of an alternative implementation... crc16 is possibly a bit expensive.
class Sum16CheckingHelper
{
public:
    enum {
        extraSize = sizeof(unsigned short),
        allocatorCheckFlag = 0x00200000|ACTIVITY_FLAG_NEEDSDESTRUCTOR
    };
    static inline void setCheck(size32_t size, void * _ptr)
    {
        byte * ptr = static_cast<byte *>(_ptr);
        memsize_t capacity = RoxieRowCapacity(ptr);
        if (capacity < size + extraSize)
            throw MakeStringException(0, "Data was written past the end of the row - allocated %d, written %d", (size32_t)(capacity - extraSize), size);
        memset(ptr+size, 0, capacity - size - extraSize);
        unsigned short * check = reinterpret_cast<unsigned short *>(ptr + capacity - extraSize);
        *check = chksum16(ptr, capacity-extraSize);
    }
    static inline bool isValid(const void * _ptr)
    {
        if (RoxieRowHasDestructor(_ptr))
        {
            const byte * ptr = static_cast<const byte *>(_ptr);
            memsize_t capacity = RoxieRowCapacity(ptr);
            const unsigned short * check = reinterpret_cast<const unsigned short *>(ptr + capacity - extraSize);
            return chksum16(ptr, capacity-extraSize) == *check;
        }
        return true;
    }
};
bool isRowCheckValid(unsigned allocatorId, const void * row)
{
    switch (allocatorId & ALLOCATORID_CHECK_MASK)
    {
    case NoCheckingHelper::allocatorCheckFlag & ALLOCATORID_CHECK_MASK:
        return true;
    case Crc16CheckingHelper::allocatorCheckFlag & ALLOCATORID_CHECK_MASK:
        return Crc16CheckingHelper::isValid(row);
    case Sum16CheckingHelper::allocatorCheckFlag & ALLOCATORID_CHECK_MASK:
        return Sum16CheckingHelper::isValid(row);
    default:
        UNIMPLEMENTED;
    }
}
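
#if 0
//Usage sketch (illustrative only; 'baseId' and 'row' are hypothetical): the check
//style travels in the high bits of the allocator id, so a caller can validate any
//row without knowing which helper produced it.
unsigned allocatorId = baseId | Crc16CheckingHelper::allocatorCheckFlag;
assertex(isRowCheckValid(allocatorId, row)); // dispatches to Crc16CheckingHelper::isValid()
#endif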
//--------------------------------------------------------------------------------------------------------------------
//MORE: Function to calculate the total size of a row - requires access to a row allocator.
//--------------------------------------------------------------------------------------------------------------------
class RoxieEngineRowAllocatorBase : implements IEngineRowAllocator, public CInterface
{
public:
    RoxieEngineRowAllocatorBase(IRowAllocatorMetaActIdCache * _cache, roxiemem::IRowManager & _rowManager, IOutputMetaData * _meta, unsigned _activityId, unsigned _allocatorId, roxiemem::RoxieHeapFlags _createFlags)
        : createFlags(_createFlags), cache(_cache), rowManager(_rowManager), meta(_meta)
    {
        activityId = _activityId;
        allocatorId = _allocatorId;
    }

    IMPLEMENT_IINTERFACE

//interface IEngineRowAllocator
    virtual byte * * createRowset(unsigned count)
    {
        if (count == 0)
            return NULL;
        return (byte **) rowManager.allocate(count * sizeof(void *), allocatorId | ACTIVITY_FLAG_ISREGISTERED);
    }

    virtual void releaseRowset(unsigned count, byte * * rowset)
    {
        rtlReleaseRowset(count, rowset);
    }

    virtual byte * * linkRowset(byte * * rowset)
    {
        return rtlLinkRowset(rowset);
    }

    virtual byte * * appendRowOwn(byte * * rowset, unsigned newRowCount, void * row)
    {
        byte * * expanded = doReallocRows(rowset, newRowCount-1, newRowCount);
        expanded[newRowCount-1] = (byte *)row;
        return expanded;
    }

    virtual byte * * reallocRows(byte * * rowset, unsigned oldRowCount, unsigned newRowCount)
    {
        //New rows (if any) aren't cleared....
        return doReallocRows(rowset, oldRowCount, newRowCount);
    }

    virtual void releaseRow(const void * row)
    {
        ReleaseRoxieRow(row);
    }

    virtual void * linkRow(const void * row)
    {
        LinkRoxieRow(row);
        return const_cast<void *>(row);
    }

    virtual IOutputMetaData * queryOutputMeta()
    {
        return meta.queryOriginal();
    }

    virtual unsigned queryActivityId() const
    {
        return activityId;
    }

    virtual StringBuffer &getId(StringBuffer &idStr)
    {
        return idStr.append(activityId); // MORE - may want more context info in here
    }

    virtual IOutputRowSerializer *createDiskSerializer(ICodeContext *ctx)
    {
        return meta.createDiskSerializer(ctx, activityId);
    }

    virtual IOutputRowDeserializer *createDiskDeserializer(ICodeContext *ctx)
    {
        return meta.createDiskDeserializer(ctx, activityId);
    }

    virtual IOutputRowSerializer *createInternalSerializer(ICodeContext *ctx)
    {
        return meta.createInternalSerializer(ctx, activityId);
    }

    virtual IOutputRowDeserializer *createInternalDeserializer(ICodeContext *ctx)
    {
        return meta.createInternalDeserializer(ctx, activityId);
    }

    virtual IEngineRowAllocator *createChildRowAllocator(const RtlTypeInfo *type)
    {
        CriticalBlock block(cs); // Not very likely, but better be safe
        if (children.empty())
        {
            for (unsigned i=0;;i++)
            {
                IOutputMetaData * childMeta = meta.queryChildMeta(i);
                if (!childMeta)
                    break;
                children.append(*cache->ensure(childMeta, activityId, createFlags));
            }
        }
        ForEachItemIn(i, children)
        {
            IEngineRowAllocator & cur = children.item(i);
            if (cur.queryOutputMeta()->queryTypeInfo() == type)
                return LINK(&cur);
        }
        return NULL;
    }

protected:
    inline byte * * doReallocRows(byte * * rowset, unsigned oldRowCount, unsigned newRowCount)
    {
        if (!rowset)
            return createRowset(newRowCount);

        //Occasionally (in aggregates) we may try to append to a shared rowset, in which case we need to clone the
        //target rowset. The clone may become unshared again immediately, but at worst that is merely inefficient.
        if (RoxieRowIsShared(rowset))
        {
            byte * * newset = createRowset(newRowCount);
            for (unsigned i=0; i < oldRowCount; i++)
            {
                byte * cur = rowset[i];
                LinkRoxieRow(cur);
                newset[i] = cur;
            }
            ReleaseRoxieRow(rowset);
            return newset;
        }

        //This would be more efficient if the previous capacity was stored by the caller - or if capacity() were cheaper
        if (newRowCount * sizeof(void *) <= RoxieRowCapacity(rowset))
            return rowset;

        memsize_t capacity;
        void * ptr = (void *)rowset;
        rowManager.resizeRow(capacity, ptr, oldRowCount * sizeof(void *), newRowCount * sizeof(void *), allocatorId | ACTIVITY_FLAG_ISREGISTERED);
        return (byte * *)ptr;
    }

protected:
    static CriticalSection cs; // Very unlikely to have contention, so share between all allocators
    roxiemem::RoxieHeapFlags createFlags;
    IRowAllocatorMetaActIdCache * cache;
    roxiemem::IRowManager & rowManager;
    const CachedOutputMetaData meta;
    unsigned activityId;
    unsigned allocatorId;
    IArrayOf<IEngineRowAllocator> children;
};

CriticalSection RoxieEngineRowAllocatorBase::cs;
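
#if 0
//Usage sketch (illustrative only; 'allocator' and 'size' are hypothetical): building
//a rowset one row at a time. appendRowOwn() takes ownership of the row, and
//doReallocRows() transparently clones the rowset first if it is currently shared.
byte * * rows = allocator->createRowset(0);     // an empty rowset is NULL
void * row = allocator->createRow();
row = allocator->finalizeRow(size, row, size);
rows = allocator->appendRowOwn(rows, 1, row);   // the rowset now owns the row
allocator->releaseRowset(1, rows);
#endif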
template <class CHECKER>
class RoxieEngineFixedRowAllocator : public RoxieEngineRowAllocatorBase
{
public:
    RoxieEngineFixedRowAllocator(IRowAllocatorMetaActIdCache * _cache, roxiemem::IRowManager & _rowManager, IOutputMetaData * _meta, unsigned _activityId, unsigned _allocatorId, roxiemem::RoxieHeapFlags _flags)
        : RoxieEngineRowAllocatorBase(_cache, _rowManager, _meta, _activityId, _allocatorId, _flags)
    {
        unsigned flags = _flags;
        if (meta.needsDestruct() || CHECKER::allocatorCheckFlag)
            flags |= roxiemem::RHFhasdestructor;
        heap.setown(rowManager.createFixedRowHeap(meta.getFixedSize()+CHECKER::extraSize, allocatorId | ACTIVITY_FLAG_ISREGISTERED | CHECKER::allocatorCheckFlag, (roxiemem::RoxieHeapFlags)flags));
    }

    virtual void * createRow()
    {
        return heap->allocate();
    }

    virtual void * createRow(size32_t & allocatedSize)
    {
        allocatedSize = meta.getFixedSize();
        return heap->allocate();
    }

    virtual void * resizeRow(size32_t newSize, void * row, size32_t & size)
    {
        throwUnexpected();
        return NULL;
    }

    virtual void * finalizeRow(size32_t finalSize, void * row, size32_t oldSize)
    {
        if (!meta.needsDestruct() && !CHECKER::allocatorCheckFlag)
            return row;
        CHECKER::setCheck(finalSize, row);
        return heap->finalizeRow(row);
    }

    virtual void gatherStats(CRuntimeStatisticCollection & stats) override
    {
        heap->gatherStats(stats);
    }

protected:
    Owned<roxiemem::IFixedRowHeap> heap;
};
template <class CHECKER>
class RoxieEngineVariableRowAllocator : public RoxieEngineRowAllocatorBase
{
public:
    RoxieEngineVariableRowAllocator(IRowAllocatorMetaActIdCache * _cache, roxiemem::IRowManager & _rowManager, IOutputMetaData * _meta, unsigned _activityId, unsigned _allocatorId, roxiemem::RoxieHeapFlags _flags)
        : RoxieEngineRowAllocatorBase(_cache, _rowManager, _meta, _activityId, _allocatorId, _flags)
    {
        unsigned flags = _flags;
        if (meta.needsDestruct() || CHECKER::allocatorCheckFlag)
            flags |= roxiemem::RHFhasdestructor;
        heap.setown(rowManager.createVariableRowHeap(allocatorId | ACTIVITY_FLAG_ISREGISTERED | CHECKER::allocatorCheckFlag, (roxiemem::RoxieHeapFlags)flags));
    }

    virtual void * createRow()
    {
        memsize_t allocSize = meta.getInitialSize();
        memsize_t capacity;
        return heap->allocate(allocSize+CHECKER::extraSize, capacity);
    }

    virtual void * createRow(size32_t & allocatedSize)
    {
        const memsize_t allocSize = meta.getInitialSize();
        memsize_t newCapacity; // always initialised by allocate
        void * row = heap->allocate(allocSize+CHECKER::extraSize, newCapacity);
        //This test should get constant folded to avoid the decrement when not checked.
        if (CHECKER::extraSize)
            newCapacity -= CHECKER::extraSize;
        allocatedSize = newCapacity;
        return row;
    }

    virtual void * resizeRow(size32_t newSize, void * row, size32_t & size)
    {
        const size32_t oldsize = size; // don't need to include the extra checking bytes
        memsize_t newCapacity; // always initialised by resizeRow
        void * newrow = heap->resizeRow(row, oldsize, newSize+CHECKER::extraSize, newCapacity);
        if (CHECKER::extraSize)
            newCapacity -= CHECKER::extraSize;
        size = newCapacity;
        return newrow;
    }

    virtual void * finalizeRow(size32_t finalSize, void * row, size32_t oldSize)
    {
        if (!meta.needsDestruct() && !CHECKER::allocatorCheckFlag)
            return row;
        void * newrow = heap->finalizeRow(row, oldSize, finalSize+CHECKER::extraSize);
        CHECKER::setCheck(finalSize, newrow);
        return newrow;
    }

    virtual void gatherStats(CRuntimeStatisticCollection & stats) override
    {
        heap->gatherStats(stats);
    }

protected:
    Owned<roxiemem::IVariableRowHeap> heap;
};
IEngineRowAllocator * createRoxieRowAllocator(IRowAllocatorMetaActIdCache * cache, roxiemem::IRowManager & rowManager, IOutputMetaData * meta, unsigned activityId, unsigned allocatorId, roxiemem::RoxieHeapFlags flags)
{
    if (meta->getFixedSize() != 0)
        return new RoxieEngineFixedRowAllocator<NoCheckingHelper>(cache, rowManager, meta, activityId, allocatorId, flags);
    else
        return new RoxieEngineVariableRowAllocator<NoCheckingHelper>(cache, rowManager, meta, activityId, allocatorId, flags);
}

IEngineRowAllocator * createCrcRoxieRowAllocator(IRowAllocatorMetaActIdCache * cache, roxiemem::IRowManager & rowManager, IOutputMetaData * meta, unsigned activityId, unsigned allocatorId, roxiemem::RoxieHeapFlags flags)
{
    if (meta->getFixedSize() != 0)
        return new RoxieEngineFixedRowAllocator<Crc16CheckingHelper>(cache, rowManager, meta, activityId, allocatorId, flags);
    else
        return new RoxieEngineVariableRowAllocator<Crc16CheckingHelper>(cache, rowManager, meta, activityId, allocatorId, flags);
}
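
#if 0
//Usage sketch (illustrative only; 'rowManager', 'meta', 'activityId' and 'allocatorId'
//are hypothetical): a crc-checking allocator behaves exactly like the unchecked one,
//except that finalizeRow() stamps a crc16 after the payload, which is then verified
//whenever the row is destroyed or explicitly checked.
Owned<IEngineRowAllocator> alloc = createCrcRoxieRowAllocator(NULL, *rowManager, meta, activityId, allocatorId, roxiemem::RHFnone);
size32_t capacity;
void * row = alloc->createRow(capacity);
row = alloc->finalizeRow(capacity, row, capacity);  // crc16 written here
ReleaseRoxieRow(row);                               // crc16 verified via onDestroy()
#endif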
#pragma pack(push,1) // hashing on members, so ensure contiguous
struct AllocatorKey
{
    IOutputMetaData *meta;
    unsigned activityId;
    roxiemem::RoxieHeapFlags flags;
    AllocatorKey(IOutputMetaData *_meta, unsigned _activityId, roxiemem::RoxieHeapFlags _flags)
        : meta(_meta), activityId(_activityId), flags(_flags)
    {
    }
    bool operator==(AllocatorKey const &other) const
    {
        return (meta == other.meta) && (activityId == other.activityId) && (flags == other.flags);
    }
};
#pragma pack(pop)
class CAllocatorCacheItem : public OwningHTMapping<IEngineRowAllocator, AllocatorKey>
{
    Linked<IOutputMetaData> meta;
    unsigned allocatorId;
public:
    CAllocatorCacheItem(IEngineRowAllocator *allocator, unsigned _allocatorId, AllocatorKey &key)
        : OwningHTMapping<IEngineRowAllocator, AllocatorKey>(*allocator, key), allocatorId(_allocatorId)
    {
        meta.set(key.meta);
    }
    unsigned queryAllocatorId() const { return allocatorId; }
};
class CAllocatorCache : public CSimpleInterfaceOf<IRowAllocatorMetaActIdCache>
{
    OwningSimpleHashTableOf<CAllocatorCacheItem, AllocatorKey> cache;
    IArrayOf<IEngineRowAllocator> allAllocators;
    mutable SpinLock allAllocatorsLock;
    Owned<roxiemem::IRowManager> rowManager;
    IRowAllocatorMetaActIdCacheCallback *callback;

    inline CAllocatorCacheItem *_lookup(IOutputMetaData *meta, unsigned activityId, roxiemem::RoxieHeapFlags flags) const
    {
        AllocatorKey key(meta, activityId, flags);
        return cache.find(key);
    }
public:
    CAllocatorCache(IRowAllocatorMetaActIdCacheCallback *_callback) : callback(_callback)
    {
    }

// IRowAllocatorMetaActIdCache
    virtual IEngineRowAllocator *ensure(IOutputMetaData * meta, unsigned activityId, roxiemem::RoxieHeapFlags flags)
    {
        SpinBlock b(allAllocatorsLock);
        loop
        {
            CAllocatorCacheItem *container = _lookup(meta, activityId, flags);
            if (container)
            {
                if (0 == (roxiemem::RHFunique & flags))
                    return LINK(&container->queryElement());
                // if in cache but unique, reuse the allocatorId
                SpinUnblock b(allAllocatorsLock);
                return callback->createAllocator(this, meta, activityId, container->queryAllocatorId(), flags);
            }
            // NB: an RHFunique allocator will cause the 1st instance to be added to 'allAllocators'.
            // Subsequent requests for the same type of unique allocator will share the same allocatorId,
            // so the 1st allocator is reused by all instances for onDestroy() etc.
            assertex(allAllocators.ordinality() < ALLOCATORID_MASK);
            unsigned allocatorId = allAllocators.ordinality();
            IEngineRowAllocator *ret;
            {
                SpinUnblock b(allAllocatorsLock);
                ret = callback->createAllocator(this, meta, activityId, allocatorId, flags);
                assertex(ret);
            }
            if (allocatorId == allAllocators.ordinality())
            {
                AllocatorKey key(meta, activityId, flags);
                container = new CAllocatorCacheItem(LINK(ret), allocatorId, key);
                cache.replace(*container);
                allAllocators.append(*LINK(ret));
                return ret;
            }
            else
            {
                // someone else used the allocatorId I was going to use... release and try again (hopefully happens very seldom)
                ret->Release();
            }
        }
    }
    virtual unsigned items() const
    {
        return allAllocators.ordinality();
    }

// roxiemem::IRowAllocatorCache
    virtual unsigned getActivityId(unsigned cacheId) const
    {
        unsigned allocatorIndex = (cacheId & ALLOCATORID_MASK);
        SpinBlock b(allAllocatorsLock);
        if (allAllocators.isItem(allocatorIndex))
            return allAllocators.item(allocatorIndex).queryActivityId();
        else
        {
            //assert(false);
            return UNKNOWN_ACTIVITY; // Used for tracing, better than a crash...
        }
    }
    virtual StringBuffer &getActivityDescriptor(unsigned cacheId, StringBuffer &out) const
    {
        unsigned allocatorIndex = (cacheId & ALLOCATORID_MASK);
        SpinBlock b(allAllocatorsLock);
        if (allAllocators.isItem(allocatorIndex))
            return allAllocators.item(allocatorIndex).getId(out);
        else
        {
            assert(false);
            return out.append("unknown"); // Used for tracing, better than a crash...
        }
    }
    virtual void onDestroy(unsigned cacheId, void *row) const
    {
        IEngineRowAllocator *allocator;
        unsigned allocatorIndex = (cacheId & ALLOCATORID_MASK);
        {
            SpinBlock b(allAllocatorsLock); // just protect the access to the array - don't keep locked for the call of destruct, or we may deadlock
            if (allAllocators.isItem(allocatorIndex))
                allocator = &allAllocators.item(allocatorIndex);
            else
            {
                assert(false);
                return;
            }
        }
        if (!RoxieRowCheckValid(cacheId, row))
        {
            throw MakeStringException(0, "ERROR: crc check failure destroying row!");
        }
        allocator->queryOutputMeta()->destruct((byte *) row);
    }
    virtual void onClone(unsigned cacheId, void *row) const
    {
        IEngineRowAllocator *allocator;
        unsigned allocatorIndex = (cacheId & ALLOCATORID_MASK);
        {
            SpinBlock b(allAllocatorsLock); // just protect the access to the array - don't keep locked for the call of destruct, or we may deadlock
            if (allAllocators.isItem(allocatorIndex))
                allocator = &allAllocators.item(allocatorIndex);
            else
            {
                assert(false);
                return;
            }
        }
        if (!RoxieRowCheckValid(cacheId, row))
        {
            throw MakeStringException(0, "ERROR: crc check failure cloning row!");
        }
        //This should only be called if the destructor needs to be called - so don't bother checking
        ChildRowLinkerWalker walker;
        allocator->queryOutputMeta()->walkIndirectMembers((const byte *)row, walker);
    }
    virtual void checkValid(unsigned cacheId, const void *row) const
    {
        if (!RoxieRowCheckValid(cacheId, row))
        {
            throw MakeStringException(0, "ERROR: crc check failure checking row!");
        }
    }
};
IRowAllocatorMetaActIdCache *createRowAllocatorCache(IRowAllocatorMetaActIdCacheCallback *callback)
{
    return new CAllocatorCache(callback);
}
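
#if 0
//Usage sketch (illustrative only; 'callback', 'meta' and 'activityId' are hypothetical):
//the engine supplies a callback that constructs the concrete allocator, while the cache
//assigns allocatorIds and shares allocators for identical (meta, activityId, flags) keys.
//The CAllocatorCallback class in the unit tests below is a complete working example.
Owned<IRowAllocatorMetaActIdCache> cache = createRowAllocatorCache(&callback);
Owned<IEngineRowAllocator> alloc = cache->ensure(meta, activityId, roxiemem::RHFnone);
#endif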
#ifdef _USE_CPPUNIT
#include "unittests.hpp"

namespace roxierowtests {

using namespace roxiemem;

class RoxieRowAllocatorTests : public CppUnit::TestFixture
{
    CPPUNIT_TEST_SUITE( RoxieRowAllocatorTests );
        CPPUNIT_TEST(testSetup);
        CPPUNIT_TEST(testChecking);
        CPPUNIT_TEST(testCleanup);
        CPPUNIT_TEST(testAllocatorCache);
    CPPUNIT_TEST_SUITE_END();
    const IContextLogger &logctx;

public:
    RoxieRowAllocatorTests() : logctx(queryDummyContextLogger())
    {
    }

    ~RoxieRowAllocatorTests()
    {
    }

protected:
    class CheckingRowAllocatorCache : public CSimpleInterface, public IRowAllocatorCache
    {
    public:
        IMPLEMENT_IINTERFACE_USING(CSimpleInterface);

        CheckingRowAllocatorCache() { numFailures = 0; }
        virtual unsigned getActivityId(unsigned cacheId) const { return 0; }
        virtual StringBuffer &getActivityDescriptor(unsigned cacheId, StringBuffer &out) const { return out.append(cacheId); }
        virtual void onDestroy(unsigned cacheId, void *row) const
        {
            if (!RoxieRowCheckValid(cacheId, row))
                ++numFailures;
        }
        virtual void onClone(unsigned cacheId, void *row) const
        {
        }
        virtual void checkValid(unsigned cacheId, const void *row) const
        {
            if (!RoxieRowCheckValid(cacheId, row))
                ++numFailures;
        }

        mutable unsigned numFailures;
    };
    class DummyOutputMeta : public IOutputMetaData, public CInterface
    {
    public:
        DummyOutputMeta(size32_t _minSize, size32_t _fixedSize) : minSize(_minSize), fixedSize(_fixedSize) {}
        IMPLEMENT_IINTERFACE

        virtual size32_t getRecordSize(const void *rec) { return minSize; }
        virtual size32_t getFixedSize() const { return fixedSize; }
        virtual size32_t getMinRecordSize() const { return minSize; }
        virtual void toXML(const byte * self, IXmlWriter & out) {}
        virtual unsigned getVersion() const { return 0; }
        virtual unsigned getMetaFlags() { return 0; }
        virtual IOutputMetaData * querySerializedDiskMeta() { return this; }

        virtual void destruct(byte * self) {}
        virtual IOutputRowSerializer * createDiskSerializer(ICodeContext * ctx, unsigned activityId) { return NULL; }
        virtual IOutputRowDeserializer * createDiskDeserializer(ICodeContext * ctx, unsigned activityId) { return NULL; }
        virtual ISourceRowPrefetcher * createDiskPrefetcher(ICodeContext * ctx, unsigned activityId) { return NULL; }
        virtual IOutputRowSerializer * createInternalSerializer(ICodeContext * ctx, unsigned activityId) { return NULL; }
        virtual IOutputRowDeserializer * createInternalDeserializer(ICodeContext * ctx, unsigned activityId) { return NULL; }
        virtual void walkIndirectMembers(const byte * self, IIndirectMemberVisitor & visitor) {}
        virtual IOutputMetaData * queryChildMeta(unsigned i) { return NULL; }

        size32_t minSize;
        size32_t fixedSize;
    };
    void testAllocator(IOutputMetaData * meta, roxiemem::RoxieHeapFlags flags, unsigned low, unsigned high, int modify, bool checking)
    {
        CheckingRowAllocatorCache cache;
        Owned<IRowManager> rm = createRowManager(0, NULL, logctx, &cache);
        Owned<IEngineRowAllocator> alloc = checking ? createCrcRoxieRowAllocator(NULL, *rm, meta, 0, 0, flags) : createRoxieRowAllocator(NULL, *rm, meta, 0, 0, flags);
        for (unsigned size=low; size <= high; size++)
        {
            unsigned capacity;
            unsigned prevFailures = cache.numFailures;
            void * row = alloc->createRow(capacity);
            if (low != high)
                row = alloc->resizeRow(size, row, capacity);
            for (unsigned i1=0; i1 < capacity; i1++)
                ((byte *)row)[i1] = i1;
            const void * final = alloc->finalizeRow(capacity, row, capacity);
            for (unsigned i2=0; i2 < capacity; i2++)
            {
                ASSERT(((byte *)row)[i2] == i2);
            }

            if (modify != 0)
            {
                if (modify < 0)
                    ((byte *)row)[0]++;
                else
                    ((byte *)row)[size-1]++;
            }
            ReleaseRoxieRow(row);
            if (modify == 0)
            {
                ASSERT(prevFailures == cache.numFailures);
            }
            else
            {
                ASSERT(prevFailures+1 == cache.numFailures);
            }
        }
    }

    void testAllocator(IOutputMetaData * meta, roxiemem::RoxieHeapFlags flags, unsigned low, unsigned high)
    {
        testAllocator(meta, flags, low, high, 0, false);
        testAllocator(meta, flags, low, high, 0, true);
        testAllocator(meta, flags, low, high, -1, true);
        testAllocator(meta, flags, low, high, +1, true);
    }
    void testSetup()
    {
        setTotalMemoryLimit(false, true, false, 40*HEAP_ALIGNMENT_SIZE, 0, NULL, NULL);
    }

    void testCleanup()
    {
        releaseRoxieHeap();
    }

    void testChecking()
    {
        Owned<IRowManager> rm = createRowManager(0, NULL, logctx, NULL);

        for (unsigned fixedSize=1; fixedSize<64; fixedSize++)
        {
            DummyOutputMeta meta(fixedSize, fixedSize);
            testAllocator(&meta, RHFnone, fixedSize, fixedSize);
            testAllocator(&meta, RHFpacked, fixedSize, fixedSize);
        }

        for (unsigned varSize=1; varSize<64; varSize++)
        {
            DummyOutputMeta meta(varSize, 0);
            testAllocator(&meta, RHFnone, varSize, varSize);
            testAllocator(&meta, RHFnone, 1, varSize);
        }
    }
    void testAllocatorCache()
    {
        IArrayOf<IOutputMetaData> metas;
        Owned<IRowManager> rm = createRowManager(0, NULL, logctx, NULL);
        class CAllocatorCallback : implements IRowAllocatorMetaActIdCacheCallback
        {
            IRowManager *rm;
        public:
            CAllocatorCallback(IRowManager *_rm) : rm(_rm)
            {
            }
            virtual IEngineRowAllocator *createAllocator(IRowAllocatorMetaActIdCache * cache, IOutputMetaData *meta, unsigned activityId, unsigned cacheId, roxiemem::RoxieHeapFlags flags) const
            {
                return createRoxieRowAllocator(cache, *rm, meta, activityId, cacheId, flags);
            }
        } callback(rm);
        Owned<IRowAllocatorMetaActIdCache> allocatorCache = createRowAllocatorCache(&callback);

        // create 64 allocators, with 32 distinct activityIds
        for (unsigned fixedSize=1; fixedSize<=64; fixedSize++)
        {
            DummyOutputMeta *meta = new DummyOutputMeta(fixedSize, fixedSize);
            metas.append(*meta);

            unsigned activityId = 1 + ((fixedSize-1) % 32); // i.e. make an id, so half are duplicates
            Owned<IEngineRowAllocator> allocator = allocatorCache->ensure(meta, activityId, roxiemem::RHFnone);
        }
        // test that all 64 are in the cache
        ASSERT(allocatorCache->items() == 64);

        // ensure again - should find the cached allocators rather than create new ones
        for (unsigned fixedSize=1; fixedSize<=64; fixedSize++)
        {
            unsigned activityId = 1 + ((fixedSize-1) % 32); // i.e. make an id, so half are duplicates
            IOutputMetaData *meta = &metas.item(fixedSize-1); // from 1st round
            Owned<IEngineRowAllocator> allocator = allocatorCache->ensure(meta, activityId, roxiemem::RHFnone);
        }
        ASSERT(allocatorCache->items() == 64);

        metas.kill();
        allocatorCache.clear();
    }
};

CPPUNIT_TEST_SUITE_REGISTRATION( RoxieRowAllocatorTests );
CPPUNIT_TEST_SUITE_NAMED_REGISTRATION( RoxieRowAllocatorTests, "RoxieRowAllocatorTests" );

} // namespace roxierowtests
#endif