/*##############################################################################

    HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems.

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
############################################################################## */

#include "jexcept.hpp"
#include "jcrc.hpp"
#include "thorcommon.ipp" // for CachedOutputMetaData
#include "roxierow.hpp"
//These helper classes are used to configure the allocator and to add extra checking data
//to the end of each row.  Checking is enabled by setting a bit in the allocatorId.
class NoCheckingHelper
{
public:
    enum {
        extraSize = 0,
        allocatorCheckFlag = 0x00000000
    };
    static inline void setCheck(size32_t size, void * ptr) {}
    static inline bool isValid(const void * ptr) { return true; }
};

//NOTE: If a row requires checking then it will also have the bit set to indicate it requires
//a destructor, so that rows are checked on destruction.
//Therefore isValid() checks whether the destructor bit is set before reading the checksum,
//to protect us from uninitialised crcs.
class Crc16CheckingHelper
{
public:
    enum {
        extraSize = sizeof(unsigned short),
        allocatorCheckFlag = 0x00100000|ACTIVITY_FLAG_NEEDSDESTRUCTOR
    };
    static inline void setCheck(size32_t size, void * _ptr)
    {
        byte * ptr = static_cast<byte *>(_ptr);
        memsize_t capacity = RoxieRowCapacity(ptr);
        if (capacity < size + extraSize)
            throw MakeStringException(0, "Data was written past the end of the row - allocated %d, written %d", (size32_t)(capacity - extraSize), size);
        memset(ptr+size, 0, capacity - size - extraSize);
        unsigned short * check = reinterpret_cast<unsigned short *>(ptr + capacity - extraSize);
        *check = crc16(ptr, capacity-extraSize, 0);
    }
    static inline bool isValid(const void * _ptr)
    {
        if (RoxieRowHasDestructor(_ptr))
        {
            const byte * ptr = static_cast<const byte *>(_ptr);
            memsize_t capacity = RoxieRowCapacity(ptr);
            const unsigned short * check = reinterpret_cast<const unsigned short *>(ptr + capacity - extraSize);
            return *check == crc16(ptr, capacity-extraSize, 0);
        }
        return true;
    }
};
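
//For reference, after setCheck() a checked row occupies its full allocated capacity as:
//    [ size bytes of row data | zero padding | extraSize-byte check value ]
//with the check value computed over everything except the final extraSize bytes.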

//This is here as demonstration of an alternative implementation... crc16 is possibly a bit expensive.
class Sum16CheckingHelper
{
public:
    enum {
        extraSize = sizeof(unsigned short),
        allocatorCheckFlag = 0x00200000|ACTIVITY_FLAG_NEEDSDESTRUCTOR
    };
    static inline void setCheck(size32_t size, void * _ptr)
    {
        byte * ptr = static_cast<byte *>(_ptr);
        memsize_t capacity = RoxieRowCapacity(ptr);
        if (capacity < size + extraSize)
            throw MakeStringException(0, "Data was written past the end of the row - allocated %d, written %d", (size32_t)(capacity - extraSize), size);
        memset(ptr+size, 0, capacity - size - extraSize);
        unsigned short * check = reinterpret_cast<unsigned short *>(ptr + capacity - extraSize);
        *check = chksum16(ptr, capacity-extraSize);
    }
    static inline bool isValid(const void * _ptr)
    {
        if (RoxieRowHasDestructor(_ptr))
        {
            const byte * ptr = static_cast<const byte *>(_ptr);
            memsize_t capacity = RoxieRowCapacity(ptr);
            const unsigned short * check = reinterpret_cast<const unsigned short *>(ptr + capacity - extraSize);
            return chksum16(ptr, capacity-extraSize) == *check;
        }
        return true;
    }
};

bool isRowCheckValid(unsigned allocatorId, const void * row)
{
    switch (allocatorId & ALLOCATORID_CHECK_MASK)
    {
    case NoCheckingHelper::allocatorCheckFlag & ALLOCATORID_CHECK_MASK:
        return true;
    case Crc16CheckingHelper::allocatorCheckFlag & ALLOCATORID_CHECK_MASK:
        return Crc16CheckingHelper::isValid(row);
    case Sum16CheckingHelper::allocatorCheckFlag & ALLOCATORID_CHECK_MASK:
        return Sum16CheckingHelper::isValid(row);
    default:
        UNIMPLEMENTED;
    }
}
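
//Illustrative sketch only ('baseId' and 'row' stand for values a caller already has): an
//allocator id that includes a helper's check flag routes validation to that helper, e.g.
//    unsigned id = baseId | Crc16CheckingHelper::allocatorCheckFlag;
//    bool ok = isRowCheckValid(id, row);  // dispatches to Crc16CheckingHelper::isValid(row)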

//--------------------------------------------------------------------------------------------------------------------

//MORE: Function to calculate the total size of a row - requires access to a rowallocator.

//--------------------------------------------------------------------------------------------------------------------

class RoxieEngineRowAllocatorBase : public CInterface, implements IEngineRowAllocator
{
public:
    RoxieEngineRowAllocatorBase(IRowAllocatorMetaActIdCache * _cache, roxiemem::IRowManager & _rowManager, IOutputMetaData * _meta, unsigned _activityId, unsigned _allocatorId, roxiemem::RoxieHeapFlags _createFlags)
        : cache(_cache), rowManager(_rowManager), meta(_meta), createFlags(_createFlags)
    {
        activityId = _activityId;
        allocatorId = _allocatorId;
    }

    IMPLEMENT_IINTERFACE

//interface IEngineRowAllocator
    virtual byte * * createRowset(unsigned count)
    {
        if (count == 0)
            return NULL;
        return (byte **) rowManager.allocate(count * sizeof(void *), allocatorId | ACTIVITY_FLAG_ISREGISTERED);
    }

    virtual void releaseRowset(unsigned count, byte * * rowset)
    {
        rtlReleaseRowset(count, rowset);
    }

    virtual byte * * linkRowset(byte * * rowset)
    {
        return rtlLinkRowset(rowset);
    }

    virtual byte * * appendRowOwn(byte * * rowset, unsigned newRowCount, void * row)
    {
        byte * * expanded = doReallocRows(rowset, newRowCount-1, newRowCount);
        expanded[newRowCount-1] = (byte *)row;
        return expanded;
    }

    virtual byte * * reallocRows(byte * * rowset, unsigned oldRowCount, unsigned newRowCount)
    {
        //New rows (if any) aren't cleared....
        return doReallocRows(rowset, oldRowCount, newRowCount);
    }

    virtual void releaseRow(const void * row)
    {
        ReleaseRoxieRow(row);
    }

    virtual void * linkRow(const void * row)
    {
        LinkRoxieRow(row);
        return const_cast<void *>(row);
    }

    virtual IOutputMetaData * queryOutputMeta()
    {
        return meta.queryOriginal();
    }

    virtual unsigned queryActivityId()
    {
        return activityId;
    }

    virtual StringBuffer &getId(StringBuffer &idStr)
    {
        return idStr.append(activityId); // MORE - may want more context info in here
    }

    virtual IOutputRowSerializer *createDiskSerializer(ICodeContext *ctx)
    {
        return meta.createDiskSerializer(ctx, activityId);
    }

    virtual IOutputRowDeserializer *createDiskDeserializer(ICodeContext *ctx)
    {
        return meta.createDiskDeserializer(ctx, activityId);
    }

    virtual IOutputRowSerializer *createInternalSerializer(ICodeContext *ctx)
    {
        return meta.createInternalSerializer(ctx, activityId);
    }

    virtual IOutputRowDeserializer *createInternalDeserializer(ICodeContext *ctx)
    {
        return meta.createInternalDeserializer(ctx, activityId);
    }

    virtual IEngineRowAllocator *createChildRowAllocator(const RtlTypeInfo *type)
    {
        CriticalBlock block(cs); // Not very likely but better be safe
        if (children.empty())
        {
            for (unsigned i = 0;; i++)
            {
                IOutputMetaData * childMeta = meta.queryChildMeta(i);
                if (!childMeta)
                    break;
                children.append(*cache->ensure(childMeta, activityId, createFlags));
            }
        }
        ForEachItemIn(i, children)
        {
            IEngineRowAllocator & cur = children.item(i);
            if (cur.queryOutputMeta()->queryTypeInfo() == type)
                return LINK(&cur);
        }
        return NULL;
    }

protected:
    inline byte * * doReallocRows(byte * * rowset, unsigned oldRowCount, unsigned newRowCount)
    {
        if (!rowset)
            return createRowset(newRowCount);

        //Occasionally (in aggregates) we may try and append to a shared rowset.  In this case we need to clone the
        //target rowset.  It is possible the rowset becomes unshared again immediately afterwards, but cloning is at
        //worst inefficient, never incorrect.
        if (RoxieRowIsShared(rowset))
        {
            byte * * newset = createRowset(newRowCount);
            for (unsigned i=0; i < oldRowCount; i++)
            {
                byte * cur = rowset[i];
                LinkRoxieRow(cur);
                newset[i] = cur;
            }
            ReleaseRoxieRow(rowset);
            return newset;
        }

        //This would be more efficient if the previous capacity was stored by the caller - or if capacity() were cheaper
        if (newRowCount * sizeof(void *) <= RoxieRowCapacity(rowset))
            return rowset;

        memsize_t capacity;
        void * ptr = (void *)rowset;
        rowManager.resizeRow(capacity, ptr, oldRowCount * sizeof(void *), newRowCount * sizeof(void *), allocatorId | ACTIVITY_FLAG_ISREGISTERED);
        return (byte * *)ptr;
    }
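
    //Note: appendRowOwn() relies on the cloning above - appending to a shared rowset yields a
    //private copy with each existing row linked, so transferring ownership of the appended row
    //into the returned rowset is safe.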

protected:
    static CriticalSection cs; // Very unlikely to have contention, so share between all allocators
    IRowAllocatorMetaActIdCache * cache;
    roxiemem::IRowManager & rowManager;
    const CachedOutputMetaData meta;
    unsigned activityId;
    unsigned allocatorId;
    roxiemem::RoxieHeapFlags createFlags;
    IArrayOf<IEngineRowAllocator> children;
};

CriticalSection RoxieEngineRowAllocatorBase::cs;

template <class CHECKER>
class RoxieEngineFixedRowAllocator : public RoxieEngineRowAllocatorBase
{
public:
    RoxieEngineFixedRowAllocator(IRowAllocatorMetaActIdCache * _cache, roxiemem::IRowManager & _rowManager, IOutputMetaData * _meta, unsigned _activityId, unsigned _allocatorId, roxiemem::RoxieHeapFlags _flags)
        : RoxieEngineRowAllocatorBase(_cache, _rowManager, _meta, _activityId, _allocatorId, _flags)
    {
        unsigned flags = _flags;
        if (meta.needsDestruct() || CHECKER::allocatorCheckFlag)
            flags |= roxiemem::RHFhasdestructor;
        heap.setown(rowManager.createFixedRowHeap(meta.getFixedSize()+CHECKER::extraSize, allocatorId | ACTIVITY_FLAG_ISREGISTERED | CHECKER::allocatorCheckFlag, (roxiemem::RoxieHeapFlags)flags));
    }

    virtual void * createRow()
    {
        return heap->allocate();
    }

    virtual void * createRow(size32_t & allocatedSize)
    {
        allocatedSize = meta.getFixedSize();
        return heap->allocate();
    }

    virtual void * resizeRow(size32_t newSize, void * row, size32_t & size)
    {
        throwUnexpected(); // fixed-size rows are never resized
        return NULL;
    }

    virtual void * finalizeRow(size32_t finalSize, void * row, size32_t oldSize)
    {
        if (!meta.needsDestruct() && !CHECKER::allocatorCheckFlag)
            return row;
        CHECKER::setCheck(finalSize, row);
        return heap->finalizeRow(row);
    }

protected:
    Owned<roxiemem::IFixedRowHeap> heap;
};

template <class CHECKER>
class RoxieEngineVariableRowAllocator : public RoxieEngineRowAllocatorBase
{
public:
    RoxieEngineVariableRowAllocator(IRowAllocatorMetaActIdCache * _cache, roxiemem::IRowManager & _rowManager, IOutputMetaData * _meta, unsigned _activityId, unsigned _allocatorId, roxiemem::RoxieHeapFlags _flags)
        : RoxieEngineRowAllocatorBase(_cache, _rowManager, _meta, _activityId, _allocatorId, _flags)
    {
        unsigned flags = _flags;
        if (meta.needsDestruct() || CHECKER::allocatorCheckFlag)
            flags |= roxiemem::RHFhasdestructor;
        heap.setown(rowManager.createVariableRowHeap(allocatorId | ACTIVITY_FLAG_ISREGISTERED | CHECKER::allocatorCheckFlag, (roxiemem::RoxieHeapFlags)flags));
    }

    virtual void * createRow()
    {
        memsize_t allocSize = meta.getInitialSize();
        memsize_t capacity;
        return heap->allocate(allocSize+CHECKER::extraSize, capacity);
    }

    virtual void * createRow(size32_t & allocatedSize)
    {
        const memsize_t allocSize = meta.getInitialSize();
        memsize_t newCapacity; // always initialised by allocate
        void * row = heap->allocate(allocSize+CHECKER::extraSize, newCapacity);
        //This test should get constant folded to avoid the decrement when not checked.
        if (CHECKER::extraSize)
            newCapacity -= CHECKER::extraSize;
        allocatedSize = newCapacity;
        return row;
    }

    virtual void * resizeRow(size32_t newSize, void * row, size32_t & size)
    {
        const size32_t oldsize = size; // don't need to include the extra checking bytes
        memsize_t newCapacity; // always initialised by resizeRow
        void * newrow = heap->resizeRow(row, oldsize, newSize+CHECKER::extraSize, newCapacity);
        if (CHECKER::extraSize)
            newCapacity -= CHECKER::extraSize;
        size = newCapacity;
        return newrow;
    }

    virtual void * finalizeRow(size32_t finalSize, void * row, size32_t oldSize)
    {
        if (!meta.needsDestruct() && !CHECKER::allocatorCheckFlag)
            return row;
        void * newrow = heap->finalizeRow(row, oldSize, finalSize+CHECKER::extraSize);
        CHECKER::setCheck(finalSize, newrow);
        return newrow;
    }

protected:
    Owned<roxiemem::IVariableRowHeap> heap;
};

IEngineRowAllocator * createRoxieRowAllocator(IRowAllocatorMetaActIdCache * cache, roxiemem::IRowManager & rowManager, IOutputMetaData * meta, unsigned activityId, unsigned allocatorId, roxiemem::RoxieHeapFlags flags)
{
    if (meta->getFixedSize() != 0)
        return new RoxieEngineFixedRowAllocator<NoCheckingHelper>(cache, rowManager, meta, activityId, allocatorId, flags);
    else
        return new RoxieEngineVariableRowAllocator<NoCheckingHelper>(cache, rowManager, meta, activityId, allocatorId, flags);
}

IEngineRowAllocator * createCrcRoxieRowAllocator(IRowAllocatorMetaActIdCache * cache, roxiemem::IRowManager & rowManager, IOutputMetaData * meta, unsigned activityId, unsigned allocatorId, roxiemem::RoxieHeapFlags flags)
{
    if (meta->getFixedSize() != 0)
        return new RoxieEngineFixedRowAllocator<Crc16CheckingHelper>(cache, rowManager, meta, activityId, allocatorId, flags);
    else
        return new RoxieEngineVariableRowAllocator<Crc16CheckingHelper>(cache, rowManager, meta, activityId, allocatorId, flags);
}
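
//Illustrative sketch only ('rm', 'meta' etc. stand for values a caller already owns); this
//mirrors the calls made in the unit tests below:
//    Owned<IEngineRowAllocator> alloc = createCrcRoxieRowAllocator(NULL, *rm, meta, activityId, allocatorId, roxiemem::RHFnone);
//    size32_t allocated;
//    void * row = alloc->createRow(allocated);
//    //... fill in up to 'allocated' bytes ...
//    const void * final = alloc->finalizeRow(allocated, row, allocated);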

#pragma pack(push,1) // hashing on members, so ensure contiguous
struct AllocatorKey
{
    IOutputMetaData *meta;
    unsigned activityId;
    roxiemem::RoxieHeapFlags flags;
    AllocatorKey(IOutputMetaData *_meta, unsigned &_activityId, roxiemem::RoxieHeapFlags _flags)
        : meta(_meta), activityId(_activityId), flags(_flags)
    {
    }
    bool operator==(AllocatorKey const &other) const
    {
        return (meta == other.meta) && (activityId == other.activityId) && (flags == other.flags);
    }
};
#pragma pack(pop)

class CAllocatorCacheItem : public OwningHTMapping<IEngineRowAllocator, AllocatorKey>
{
    Linked<IOutputMetaData> meta;
    unsigned allocatorId;
public:
    CAllocatorCacheItem(IEngineRowAllocator *allocator, unsigned _allocatorId, AllocatorKey &key)
        : OwningHTMapping<IEngineRowAllocator, AllocatorKey>(*allocator, key), allocatorId(_allocatorId)
    {
        meta.set(key.meta);
    }
    unsigned queryAllocatorId() const { return allocatorId; }
};

class CAllocatorCache : public CSimpleInterface, implements IRowAllocatorMetaActIdCache
{
    OwningSimpleHashTableOf<CAllocatorCacheItem, AllocatorKey> cache;
    IArrayOf<IEngineRowAllocator> allAllocators;
    mutable SpinLock allAllocatorsLock;
    Owned<roxiemem::IRowManager> rowManager;
    IRowAllocatorMetaActIdCacheCallback *callback;

    inline CAllocatorCacheItem *_lookup(IOutputMetaData *meta, unsigned activityId, roxiemem::RoxieHeapFlags flags) const
    {
        AllocatorKey key(meta, activityId, flags);
        return cache.find(key);
    }
public:
    IMPLEMENT_IINTERFACE_USING(CSimpleInterface);

    CAllocatorCache(IRowAllocatorMetaActIdCacheCallback *_callback) : callback(_callback)
    {
    }

// IRowAllocatorMetaActIdCache
    inline IEngineRowAllocator *lookup(IOutputMetaData *meta, unsigned activityId, roxiemem::RoxieHeapFlags flags) const
    {
        SpinBlock b(allAllocatorsLock);
        CAllocatorCacheItem *container = _lookup(meta, activityId, flags);
        if (!container)
            return NULL;
        return &container->queryElement();
    }
    virtual IEngineRowAllocator *ensure(IOutputMetaData * meta, unsigned activityId, roxiemem::RoxieHeapFlags flags)
    {
        SpinBlock b(allAllocatorsLock);
        loop
        {
            CAllocatorCacheItem *container = _lookup(meta, activityId, flags);
            if (container)
            {
                if (0 == (roxiemem::RHFunique & flags))
                    return LINK(&container->queryElement());
                // if in the cache but unique, reuse the allocatorId
                SpinUnblock b(allAllocatorsLock);
                return callback->createAllocator(this, meta, activityId, container->queryAllocatorId(), flags);
            }
            // NB: the 1st RHFunique allocator created is added to 'allAllocators';
            // subsequent requests for the same type of unique allocator share the same allocatorId,
            // so the 1st allocator is reused by all instances for onDestroy() etc.
            assertex(allAllocators.ordinality() < ALLOCATORID_MASK);
            unsigned allocatorId = allAllocators.ordinality();
            IEngineRowAllocator *ret;
            {
                SpinUnblock b(allAllocatorsLock);
                ret = callback->createAllocator(this, meta, activityId, allocatorId, flags);
                assertex(ret);
            }
            if (allocatorId == allAllocators.ordinality())
            {
                AllocatorKey key(meta, activityId, flags);
                container = new CAllocatorCacheItem(LINK(ret), allocatorId, key);
                cache.replace(*container);
                allAllocators.append(*LINK(ret));
                return ret;
            }
            else
            {
                // someone else used the allocatorId I was going to use - release and try again (hopefully happens very seldom)
                ret->Release();
            }
        }
    }
    virtual unsigned items() const
    {
        return allAllocators.ordinality();
    }

// roxiemem::IRowAllocatorCache
    virtual unsigned getActivityId(unsigned cacheId) const
    {
        unsigned allocatorIndex = (cacheId & ALLOCATORID_MASK);
        SpinBlock b(allAllocatorsLock);
        if (allAllocators.isItem(allocatorIndex))
            return allAllocators.item(allocatorIndex).queryActivityId();
        else
        {
            //assert(false);
            return UNKNOWN_ACTIVITY; // Used for tracing, better than a crash...
        }
    }
    virtual StringBuffer &getActivityDescriptor(unsigned cacheId, StringBuffer &out) const
    {
        unsigned allocatorIndex = (cacheId & ALLOCATORID_MASK);
        SpinBlock b(allAllocatorsLock);
        if (allAllocators.isItem(allocatorIndex))
            return allAllocators.item(allocatorIndex).getId(out);
        else
        {
            assert(false);
            return out.append("unknown"); // Used for tracing, better than a crash...
        }
    }
    virtual void onDestroy(unsigned cacheId, void *row) const
    {
        IEngineRowAllocator *allocator;
        unsigned allocatorIndex = (cacheId & ALLOCATORID_MASK);
        {
            SpinBlock b(allAllocatorsLock); // just protect the access to the array - don't keep locked for the call of destruct or may deadlock
            if (allAllocators.isItem(allocatorIndex))
                allocator = &allAllocators.item(allocatorIndex);
            else
            {
                assert(false);
                return;
            }
        }
        if (!RoxieRowCheckValid(cacheId, row))
        {
            throw MakeStringException(0, "ERROR: crc check failure destroying row!");
        }
        allocator->queryOutputMeta()->destruct((byte *) row);
    }
    virtual void onClone(unsigned cacheId, void *row) const
    {
        IEngineRowAllocator *allocator;
        unsigned allocatorIndex = (cacheId & ALLOCATORID_MASK);
        {
            SpinBlock b(allAllocatorsLock); // just protect the access to the array - don't keep locked for the call of destruct or may deadlock
            if (allAllocators.isItem(allocatorIndex))
                allocator = &allAllocators.item(allocatorIndex);
            else
            {
                assert(false);
                return;
            }
        }
        if (!RoxieRowCheckValid(cacheId, row))
        {
            throw MakeStringException(0, "ERROR: crc check failure cloning row!");
        }
        //This should only be called if the destructor needs to be called - so don't bother checking
        ChildRowLinkerWalker walker;
        allocator->queryOutputMeta()->walkIndirectMembers((const byte *)row, walker);
    }
    virtual void checkValid(unsigned cacheId, const void *row) const
    {
        if (!RoxieRowCheckValid(cacheId, row))
        {
            throw MakeStringException(0, "ERROR: crc check failure checking row!");
        }
    }
};

IRowAllocatorMetaActIdCache *createRowAllocatorCache(IRowAllocatorMetaActIdCacheCallback *callback)
{
    return new CAllocatorCache(callback);
}
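
//Typical use, following the unit tests below: create the cache with a callback that constructs
//the allocators, then obtain shared allocators via ensure():
//    Owned<IRowAllocatorMetaActIdCache> allocatorCache = createRowAllocatorCache(&callback);
//    Owned<IEngineRowAllocator> alloc = allocatorCache->ensure(meta, activityId, roxiemem::RHFnone);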

#ifdef _USE_CPPUNIT
#include "unittests.hpp"

namespace roxierowtests {

using namespace roxiemem;

class RoxieRowAllocatorTests : public CppUnit::TestFixture
{
    CPPUNIT_TEST_SUITE( RoxieRowAllocatorTests );
        CPPUNIT_TEST(testSetup);
        CPPUNIT_TEST(testChecking);
        CPPUNIT_TEST(testCleanup);
        CPPUNIT_TEST(testAllocatorCache);
    CPPUNIT_TEST_SUITE_END();

    const IContextLogger &logctx;

public:
    RoxieRowAllocatorTests() : logctx(queryDummyContextLogger())
    {
    }

    ~RoxieRowAllocatorTests()
    {
    }

protected:
    class CheckingRowAllocatorCache : public CSimpleInterface, public IRowAllocatorCache
    {
    public:
        IMPLEMENT_IINTERFACE_USING(CSimpleInterface);

        CheckingRowAllocatorCache() { numFailures = 0; }
        virtual unsigned getActivityId(unsigned cacheId) const { return 0; }
        virtual StringBuffer &getActivityDescriptor(unsigned cacheId, StringBuffer &out) const { return out.append(cacheId); }
        virtual void onDestroy(unsigned cacheId, void *row) const
        {
            if (!RoxieRowCheckValid(cacheId, row))
                ++numFailures;
        }
        virtual void onClone(unsigned cacheId, void *row) const
        {
        }
        virtual void checkValid(unsigned cacheId, const void *row) const
        {
            if (!RoxieRowCheckValid(cacheId, row))
                ++numFailures;
        }

        mutable unsigned numFailures;
    };

    class DummyOutputMeta : public IOutputMetaData, public CInterface
    {
    public:
        DummyOutputMeta(size32_t _minSize, size32_t _fixedSize) : minSize(_minSize), fixedSize(_fixedSize) {}
        IMPLEMENT_IINTERFACE

        virtual size32_t getRecordSize(const void *rec) { return minSize; }
        virtual size32_t getFixedSize() const { return fixedSize; }
        virtual size32_t getMinRecordSize() const { return minSize; }
        virtual void toXML(const byte * self, IXmlWriter & out) {}
        virtual unsigned getVersion() const { return 0; }
        virtual unsigned getMetaFlags() { return 0; }
        virtual IOutputMetaData * querySerializedDiskMeta() { return this; }

        virtual void destruct(byte * self) {}
        virtual IOutputRowSerializer * createDiskSerializer(ICodeContext * ctx, unsigned activityId) { return NULL; }
        virtual IOutputRowDeserializer * createDiskDeserializer(ICodeContext * ctx, unsigned activityId) { return NULL; }
        virtual ISourceRowPrefetcher * createDiskPrefetcher(ICodeContext * ctx, unsigned activityId) { return NULL; }
        virtual IOutputRowSerializer * createInternalSerializer(ICodeContext * ctx, unsigned activityId) { return NULL; }
        virtual IOutputRowDeserializer * createInternalDeserializer(ICodeContext * ctx, unsigned activityId) { return NULL; }
        virtual void walkIndirectMembers(const byte * self, IIndirectMemberVisitor & visitor) {}
        virtual IOutputMetaData * queryChildMeta(unsigned i) { return NULL; }

        size32_t minSize;
        size32_t fixedSize;
    };

    void testAllocator(IOutputMetaData * meta, roxiemem::RoxieHeapFlags flags, unsigned low, unsigned high, int modify, bool checking)
    {
        CheckingRowAllocatorCache cache;
        Owned<IRowManager> rm = createRowManager(0, NULL, logctx, &cache);
        Owned<IEngineRowAllocator> alloc = checking ? createCrcRoxieRowAllocator(NULL, *rm, meta, 0, 0, flags) : createRoxieRowAllocator(NULL, *rm, meta, 0, 0, flags);
        for (unsigned size=low; size <= high; size++)
        {
            unsigned capacity;
            unsigned prevFailures = cache.numFailures;
            void * row = alloc->createRow(capacity);
            if (low != high)
                row = alloc->resizeRow(size, row, capacity);
            for (unsigned i1=0; i1 < capacity; i1++)
                ((byte *)row)[i1] = i1;
            //finalSize == oldSize here, so the row is not expected to move and 'row' remains valid below
            const void * final = alloc->finalizeRow(capacity, row, capacity);
            for (unsigned i2=0; i2 < capacity; i2++)
            {
                ASSERT(((byte *)row)[i2] == i2);
            }

            //Deliberately corrupt a byte so the check value should fail on destruction
            if (modify != 0)
            {
                if (modify < 0)
                    ((byte *)row)[0]++;
                else
                    ((byte *)row)[size-1]++;
            }
            ReleaseRoxieRow(row);
            if (modify == 0)
            {
                ASSERT(prevFailures == cache.numFailures);
            }
            else
            {
                ASSERT(prevFailures+1 == cache.numFailures);
            }
        }
    }

    void testAllocator(IOutputMetaData * meta, roxiemem::RoxieHeapFlags flags, unsigned low, unsigned high)
    {
        testAllocator(meta, flags, low, high, 0, false);
        testAllocator(meta, flags, low, high, 0, true);
        testAllocator(meta, flags, low, high, -1, true);
        testAllocator(meta, flags, low, high, +1, true);
    }

    void testSetup()
    {
        setTotalMemoryLimit(false, true, false, 40*HEAP_ALIGNMENT_SIZE, 0, NULL, NULL);
    }

    void testCleanup()
    {
        releaseRoxieHeap();
    }

    void testChecking()
    {
        Owned<IRowManager> rm = createRowManager(0, NULL, logctx, NULL);

        for (unsigned fixedSize=1; fixedSize<64; fixedSize++)
        {
            DummyOutputMeta meta(fixedSize, fixedSize);
            testAllocator(&meta, RHFnone, fixedSize, fixedSize);
            testAllocator(&meta, RHFpacked, fixedSize, fixedSize);
        }

        for (unsigned varSize=1; varSize<64; varSize++)
        {
            DummyOutputMeta meta(varSize, 0);
            testAllocator(&meta, RHFnone, varSize, varSize);
            testAllocator(&meta, RHFnone, 1, varSize);
        }
    }

    void testAllocatorCache()
    {
        IArrayOf<IOutputMetaData> metas;
        Owned<IRowManager> rm = createRowManager(0, NULL, logctx, NULL);
        class CAllocatorCallback : implements IRowAllocatorMetaActIdCacheCallback
        {
            IRowManager *rm;
        public:
            CAllocatorCallback(IRowManager *_rm) : rm(_rm)
            {
            }
            virtual IEngineRowAllocator *createAllocator(IRowAllocatorMetaActIdCache * cache, IOutputMetaData *meta, unsigned activityId, unsigned cacheId, roxiemem::RoxieHeapFlags flags) const
            {
                return createRoxieRowAllocator(cache, *rm, meta, activityId, cacheId, flags);
            }
        } callback(rm);
        Owned<IRowAllocatorMetaActIdCache> allocatorCache = createRowAllocatorCache(&callback);

        // create 64 allocators, using 32 distinct activityIds
        for (unsigned fixedSize=1; fixedSize<=64; fixedSize++)
        {
            DummyOutputMeta *meta = new DummyOutputMeta(fixedSize, fixedSize);
            metas.append(*meta);

            unsigned activityId = 1 + ((fixedSize-1) % 32); // i.e. make an id, so half are duplicates
            Owned<IEngineRowAllocator> allocator = allocatorCache->ensure(meta, activityId, roxiemem::RHFnone);
        }
        // test that all 64 are in the cache
        ASSERT(allocatorCache->items() == 64);

        // test ensure() again with the same metas and ids
        for (unsigned fixedSize=1; fixedSize<=64; fixedSize++)
        {
            unsigned activityId = 1 + ((fixedSize-1) % 32); // i.e. make an id, so half are duplicates
            IOutputMetaData *meta = &metas.item(fixedSize-1); // from 1st round
            Owned<IEngineRowAllocator> allocator = allocatorCache->ensure(meta, activityId, roxiemem::RHFnone);
        }
        ASSERT(allocatorCache->items() == 64);

        metas.kill();
        allocatorCache.clear();
    }
};

CPPUNIT_TEST_SUITE_REGISTRATION( RoxieRowAllocatorTests );
CPPUNIT_TEST_SUITE_NAMED_REGISTRATION( RoxieRowAllocatorTests, "RoxieRowAllocatorTests" );

} // namespace roxierowtests

#endif