// roxierow.cpp
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "jexcept.hpp"
  14. #include "jcrc.hpp"
  15. #include "thorcommon.ipp" // for CachedOutputMetaData
  16. #include "roxierow.hpp"
//Classes can be used to configure the allocator, and add extra data to the end.
//The checking needs to be done by setting a bit in the allocatorid

//Helper used when no row checking is required: reserves no trailing bytes,
//stamps nothing, and always reports rows as valid.
class NoCheckingHelper
{
public:
    enum {
        extraSize = 0,              // no check bytes appended to the row
        allocatorCheckFlag = 0x00000000 // no check bit set in the allocator id
    };
    //Nothing to write into the row on finalize.
    static inline void setCheck(size32_t size, void * ptr) {}
    //No check data, so every row is (trivially) valid.
    static inline bool isValid(const void * ptr) { return true; }
};
  29. //NOTE: If a row requires checking then the row will also have the bit set to indicate it requires a destructor
  30. //so that rows are checked on destruction.
  31. //Therefore checking if the destructor is set for a row in isValid() to protect us from uninitialised crcs.
  32. class Crc16CheckingHelper
  33. {
  34. public:
  35. enum {
  36. extraSize = sizeof(unsigned short),
  37. allocatorCheckFlag = 0x00100000|ACTIVITY_FLAG_NEEDSDESTRUCTOR
  38. };
  39. static inline void setCheck(size32_t size, void * _ptr)
  40. {
  41. byte * ptr = static_cast<byte *>(_ptr);
  42. memsize_t capacity = RoxieRowCapacity(ptr);
  43. if (capacity < size + extraSize)
  44. throw MakeStringException(0, "Data was written past the end of the row - allocated %d, written %d", (size32_t)(capacity - extraSize), size);
  45. memset(ptr+size, 0, capacity - size - extraSize);
  46. unsigned short * check = reinterpret_cast<unsigned short *>(ptr + capacity - extraSize);
  47. *check = crc16(ptr, capacity-extraSize, 0);
  48. }
  49. static inline bool isValid(const void * _ptr)
  50. {
  51. if (RoxieRowHasDestructor(_ptr))
  52. {
  53. const byte * ptr = static_cast<const byte *>(_ptr);
  54. memsize_t capacity = RoxieRowCapacity(ptr);
  55. const unsigned short * check = reinterpret_cast<const unsigned short *>(ptr + capacity - extraSize);
  56. return *check == crc16(ptr, capacity-extraSize, 0);
  57. }
  58. return true;
  59. }
  60. };
  61. //This is here as demonstration of an alternative implementation... crc16 is possibly a bit expensive.
  62. class Sum16CheckingHelper
  63. {
  64. public:
  65. enum {
  66. extraSize = sizeof(unsigned short),
  67. allocatorCheckFlag = 0x00200000|ACTIVITY_FLAG_NEEDSDESTRUCTOR
  68. };
  69. static inline void setCheck(size32_t size, void * _ptr)
  70. {
  71. byte * ptr = static_cast<byte *>(_ptr);
  72. memsize_t capacity = RoxieRowCapacity(ptr);
  73. if (capacity < size + extraSize)
  74. throw MakeStringException(0, "Data was written past the end of the row - allocated %d, written %d", (size32_t)(capacity - extraSize), size);
  75. memset(ptr+size, 0, capacity - size - extraSize);
  76. unsigned short * check = reinterpret_cast<unsigned short *>(ptr + capacity - extraSize);
  77. *check = chksum16(ptr, capacity-extraSize);
  78. }
  79. static inline bool isValid(const void * _ptr)
  80. {
  81. if (RoxieRowHasDestructor(_ptr))
  82. {
  83. const byte * ptr = static_cast<const byte *>(_ptr);
  84. memsize_t capacity = RoxieRowCapacity(ptr);
  85. const unsigned short * check = reinterpret_cast<const unsigned short *>(ptr + capacity - extraSize);
  86. return chksum16(ptr, capacity-extraSize) == *check;
  87. }
  88. return true;
  89. }
  90. };
  91. bool isRowCheckValid(unsigned allocatorId, const void * row)
  92. {
  93. switch (allocatorId & ALLOCATORID_CHECK_MASK)
  94. {
  95. case NoCheckingHelper::allocatorCheckFlag & ALLOCATORID_CHECK_MASK:
  96. return true;
  97. case Crc16CheckingHelper::allocatorCheckFlag & ALLOCATORID_CHECK_MASK:
  98. return Crc16CheckingHelper::isValid(row);
  99. case Sum16CheckingHelper::allocatorCheckFlag & ALLOCATORID_CHECK_MASK:
  100. return Sum16CheckingHelper::isValid(row);
  101. default:
  102. UNIMPLEMENTED;
  103. }
  104. }
  105. //--------------------------------------------------------------------------------------------------------------------
  106. //More: Function to calculate the total size of a row - requires access to a rowallocator.
  107. //--------------------------------------------------------------------------------------------------------------------
//Common base class for the fixed and variable size row allocators.
//Provides the rowset operations, the serializer/deserializer factories (which delegate
//to the cached meta), and lazily-created child row allocators for nested datasets.
class RoxieEngineRowAllocatorBase : implements IEngineRowAllocator, public CInterface
{
public:
    RoxieEngineRowAllocatorBase(IRowAllocatorMetaActIdCache * _cache, roxiemem::IRowManager & _rowManager, IOutputMetaData * _meta, unsigned _activityId, unsigned _allocatorId, roxiemem::RoxieHeapFlags _createFlags)
        : createFlags(_createFlags), cache(_cache), rowManager(_rowManager), meta(_meta)
    {
        activityId = _activityId;
        allocatorId = _allocatorId;
    }

    IMPLEMENT_IINTERFACE

//interface IEngineRowsetAllocator
    virtual const byte * * createRowset(unsigned count) override
    {
        //An empty rowset is represented by a NULL pointer - nothing is allocated.
        if (count == 0)
            return NULL;
        return (const byte **) rowManager.allocate(count * sizeof(void *), allocatorId | ACTIVITY_FLAG_ISREGISTERED);
    }

    virtual void releaseRowset(unsigned count, const byte * * rowset) override
    {
        rtlReleaseRowset(count, rowset);
    }

    virtual const byte * * linkRowset(const byte * * rowset) override
    {
        return rtlLinkRowset(rowset);
    }

    //Extend the rowset to newRowCount entries and take ownership of row in the last slot.
    virtual const byte * * appendRowOwn(const byte * * rowset, unsigned newRowCount, void * row) override
    {
        const byte * * expanded = doReallocRows(rowset, newRowCount-1, newRowCount);
        expanded[newRowCount-1] = (byte *)row;
        return expanded;
    }

    virtual const byte * * reallocRows(const byte * * rowset, unsigned oldRowCount, unsigned newRowCount) override
    {
        //New rows (if any) aren't cleared....
        return doReallocRows(rowset, oldRowCount, newRowCount);
    }

    virtual void releaseRow(const void * row) override
    {
        ReleaseRoxieRow(row);
    }

    virtual void * linkRow(const void * row) override
    {
        LinkRoxieRow(row);
        return const_cast<void *>(row);
    }

    virtual IOutputMetaData * queryOutputMeta() override
    {
        return meta.queryOriginal();
    }
    virtual unsigned queryActivityId() const override
    {
        return activityId;
    }
    virtual StringBuffer &getId(StringBuffer &idStr) override
    {
        return idStr.append(activityId); // MORE - may want more context info in here
    }
    virtual IOutputRowSerializer *createDiskSerializer(ICodeContext *ctx) override
    {
        return meta.createDiskSerializer(ctx, activityId);
    }
    virtual IOutputRowDeserializer *createDiskDeserializer(ICodeContext *ctx) override
    {
        return meta.createDiskDeserializer(ctx, activityId);
    }
    virtual IOutputRowSerializer *createInternalSerializer(ICodeContext *ctx) override
    {
        return meta.createInternalSerializer(ctx, activityId);
    }
    virtual IOutputRowDeserializer *createInternalDeserializer(ICodeContext *ctx) override
    {
        return meta.createInternalDeserializer(ctx, activityId);
    }
    //Return a (linked) allocator for the child dataset whose type info matches 'type',
    //or NULL if no child meta matches.  The child allocator list is built lazily, once,
    //under the shared critical section.
    virtual IEngineRowAllocator *createChildRowAllocator(const RtlTypeInfo *type) override
    {
        CriticalBlock block(cs); // Not very likely but better be safe
        if (children.empty())
        {
            //Populate the list from every child meta exposed by this allocator's meta.
            for (unsigned i =0;;i++)
            {
                IOutputMetaData * childMeta = meta.queryChildMeta(i);
                if (!childMeta)
                    break;
                children.append(*cache->ensure(childMeta, activityId, createFlags));
            }
        }
        //Linear search - the number of child metas is expected to be small.
        ForEachItemIn(i, children)
        {
            IEngineRowAllocator & cur = children.item(i);
            if (cur.queryOutputMeta()->queryTypeInfo() == type)
                return LINK(&cur);
        }
        return NULL;
    }

protected:
    //Grow (or create) a rowset to hold newRowCount row pointers.  If the rowset is
    //shared it must be cloned (with the rows linked) rather than resized in place.
    inline const byte * * doReallocRows(const byte * * rowset, unsigned oldRowCount, unsigned newRowCount)
    {
        if (!rowset)
            return createRowset(newRowCount);

        //Occasionally (in aggregates) we may try and append to a shared rowset.  In this case we need to clone the
        //target rowset. It could be that the rowset is unshared immediately, but that is inefficient at worst.
        if (RoxieRowIsShared(rowset))
        {
            const byte * * newset = createRowset(newRowCount);
            for (unsigned i=0; i < oldRowCount; i++)
            {
                const byte * cur = rowset[i];
                LinkRoxieRow(cur);      // the clone takes its own link on every row
                newset[i] = cur;
            }
            ReleaseRoxieRow(rowset);    // drop this caller's reference to the shared set
            return newset;
        }

        //This would be more efficient if previous capacity was stored by the caller - or if capacity() is more efficient
        if (newRowCount * sizeof(void *) <= RoxieRowCapacity(rowset))
            return rowset;              // existing allocation is already big enough

        memsize_t capacity;
        void * ptr = (void *)rowset;
        rowManager.resizeRow(capacity, ptr, oldRowCount * sizeof(void *), newRowCount * sizeof(void *), allocatorId | ACTIVITY_FLAG_ISREGISTERED);
        return (const byte * *)ptr;
    }

protected:
    static CriticalSection cs; // Very unlikely to have contention, so share between all allocators
    roxiemem::RoxieHeapFlags createFlags;       // flags used when creating child allocators
    IRowAllocatorMetaActIdCache * cache;        // used to create/share child allocators
    roxiemem::IRowManager & rowManager;
    const CachedOutputMetaData meta;
    unsigned activityId;
    unsigned allocatorId;
    IArrayOf<IEngineRowAllocator> children;     // lazily populated by createChildRowAllocator()
};

CriticalSection RoxieEngineRowAllocatorBase::cs;
//Allocator for fixed size rows, served from a dedicated fixed-size row heap.
//CHECKER reserves extraSize trailing bytes per row for a checksum written on finalize.
template <class CHECKER>
class RoxieEngineFixedRowAllocator : public RoxieEngineRowAllocatorBase
{
public:
    RoxieEngineFixedRowAllocator(IRowAllocatorMetaActIdCache * _cache, roxiemem::IRowManager & _rowManager, IOutputMetaData * _meta, unsigned _activityId, unsigned _allocatorId, roxiemem::RoxieHeapFlags _flags)
        : RoxieEngineRowAllocatorBase(_cache, _rowManager, _meta, _activityId, _allocatorId, _flags)
    {
        unsigned flags = _flags;
        //Checked rows need the destructor flag as well, so the check is verified on release.
        if (meta.needsDestruct() || CHECKER::allocatorCheckFlag)
            flags |= roxiemem::RHFhasdestructor;
        //The heap row size includes the space reserved for the check bytes.
        heap.setown(rowManager.createFixedRowHeap(meta.getFixedSize()+CHECKER::extraSize, allocatorId | ACTIVITY_FLAG_ISREGISTERED | CHECKER::allocatorCheckFlag, (roxiemem::RoxieHeapFlags)flags));
    }

    virtual void * createRow() override
    {
        return heap->allocate();
    }

    virtual void * createRow(size32_t & allocatedSize) override
    {
        //Report the row size excluding any check bytes - callers must not write there.
        allocatedSize = meta.getFixedSize();
        return heap->allocate();
    }

    virtual void * createRow(size32_t initialSize, size32_t & allocatedSize) override
    {
        size32_t fixedSize = meta.getFixedSize();
        assertex(initialSize == fixedSize);     // only the fixed size is ever valid here
        allocatedSize = fixedSize;
        return heap->allocate();
    }

    virtual void * resizeRow(size32_t newSize, void * row, size32_t & size) override
    {
        throwUnexpected();      // fixed size rows can never be resized
        return NULL;
    }

    virtual void * finalizeRow(size32_t finalSize, void * row, size32_t oldSize) override
    {
        //Nothing needs doing unless the row has a destructor or check bytes.
        if (!meta.needsDestruct() && !CHECKER::allocatorCheckFlag)
            return row;
        CHECKER::setCheck(finalSize, row);
        return heap->finalizeRow(row);
    }

    virtual void gatherStats(CRuntimeStatisticCollection & stats) override
    {
        heap->gatherStats(stats);
    }

    virtual void releaseAllRows() override
    {
        heap->releaseAllRows();
    }

protected:
    Owned<roxiemem::IFixedRowHeap> heap;
};
//Allocator for variable size rows: rows are created at an initial size and may be
//resized before being finalized.  CHECKER optionally reserves extraSize trailing bytes
//for a checksum; those bytes are always subtracted from the capacities reported back to
//the caller so code writing into the row cannot overwrite them.
template <class CHECKER>
class RoxieEngineVariableRowAllocator : public RoxieEngineRowAllocatorBase
{
public:
    RoxieEngineVariableRowAllocator(IRowAllocatorMetaActIdCache * _cache, roxiemem::IRowManager & _rowManager, IOutputMetaData * _meta, unsigned _activityId, unsigned _allocatorId, roxiemem::RoxieHeapFlags _flags)
        : RoxieEngineRowAllocatorBase(_cache, _rowManager, _meta, _activityId, _allocatorId, _flags)
    {
        unsigned flags = _flags;
        //Checked rows need the destructor flag as well, so the check is verified on release.
        if (meta.needsDestruct() || CHECKER::allocatorCheckFlag)
            flags |= roxiemem::RHFhasdestructor;
        heap.setown(rowManager.createVariableRowHeap(allocatorId | ACTIVITY_FLAG_ISREGISTERED | CHECKER::allocatorCheckFlag, (roxiemem::RoxieHeapFlags)flags));
    }

    virtual void * createRow() override
    {
        memsize_t allocSize = meta.getInitialSize();
        memsize_t capacity;     // required by the interface, but discarded by this overload
        return heap->allocate(allocSize+CHECKER::extraSize, capacity);
    }

    virtual void * createRow(size32_t & allocatedSize) override
    {
        return doCreateRow(meta.getInitialSize(), allocatedSize);
    }

    virtual void * createRow(size32_t initialSize, size32_t & allocatedSize) override
    {
        return doCreateRow(initialSize, allocatedSize);
    }

    virtual void * resizeRow(size32_t newSize, void * row, size32_t & size) override
    {
        const size32_t oldsize = size;  // don't need to include the extra checking bytes
        memsize_t newCapacity; // always initialised by resizeRow
        void * newrow = heap->resizeRow(row, oldsize, newSize+CHECKER::extraSize, newCapacity);
        //Hide the check bytes from the capacity reported to the caller.
        if (CHECKER::extraSize)
            newCapacity -= CHECKER::extraSize;
        size = (size32_t)newCapacity;
        return newrow;
    }

    virtual void * finalizeRow(size32_t finalSize, void * row, size32_t oldSize) override
    {
        //Nothing needs doing unless the row has a destructor or check bytes.
        if (!meta.needsDestruct() && !CHECKER::allocatorCheckFlag)
            return row;
        //Shrink the allocation to the final size (plus check bytes), then stamp the check.
        void * newrow = heap->finalizeRow(row, oldSize, finalSize+CHECKER::extraSize);
        CHECKER::setCheck(finalSize, newrow);
        return newrow;
    }

    virtual void gatherStats(CRuntimeStatisticCollection & stats) override
    {
        heap->gatherStats(stats);
    }

    virtual void releaseAllRows() override
    {
        //It is not legal to call releaseAllRows on a variable size allocator - they are not allocated in a single heap
        throwUnexpected();
    }

protected:
    //Common implementation for the createRow overloads that report the allocated size.
    void * doCreateRow(size32_t initialSize, size32_t & allocatedSize)
    {
        memsize_t newCapacity; // always initialised by allocate
        void * row = heap->allocate(initialSize + CHECKER::extraSize, newCapacity);
        //This test should get constant folded to avoid the decrement when not checked.
        if (CHECKER::extraSize)
            newCapacity -= CHECKER::extraSize;
        allocatedSize = (size32_t)newCapacity;
        return row;
    }

protected:
    Owned<roxiemem::IVariableRowHeap> heap;
};
  358. IEngineRowAllocator * createRoxieRowAllocator(IRowAllocatorMetaActIdCache * cache, roxiemem::IRowManager & rowManager, IOutputMetaData * meta, unsigned activityId, unsigned allocatorId, roxiemem::RoxieHeapFlags flags)
  359. {
  360. if (meta->getFixedSize() != 0)
  361. return new RoxieEngineFixedRowAllocator<NoCheckingHelper>(cache, rowManager, meta, activityId, allocatorId, flags);
  362. else
  363. return new RoxieEngineVariableRowAllocator<NoCheckingHelper>(cache, rowManager, meta, activityId, allocatorId, flags);
  364. }
  365. IEngineRowAllocator * createCrcRoxieRowAllocator(IRowAllocatorMetaActIdCache * cache, roxiemem::IRowManager & rowManager, IOutputMetaData * meta, unsigned activityId, unsigned allocatorId, roxiemem::RoxieHeapFlags flags)
  366. {
  367. if (meta->getFixedSize() != 0)
  368. return new RoxieEngineFixedRowAllocator<Crc16CheckingHelper>(cache, rowManager, meta, activityId, allocatorId, flags);
  369. else
  370. return new RoxieEngineVariableRowAllocator<Crc16CheckingHelper>(cache, rowManager, meta, activityId, allocatorId, flags);
  371. }
#pragma pack(push,1) // hashing on members, so ensure contiguous
//Key identifying a cached allocator: (meta pointer, activityId, creation flags).
//Packed so there is no padding between members (see pragma comment above).
struct AllocatorKey
{
    IOutputMetaData *meta;              // raw pointer - CAllocatorCacheItem holds the link
    unsigned activityId;
    roxiemem::RoxieHeapFlags flags;
    AllocatorKey(IOutputMetaData *_meta, unsigned _activityId, roxiemem::RoxieHeapFlags _flags)
        : meta(_meta), activityId(_activityId), flags(_flags)
    {
    }
    //Keys match only when all three members are identical (meta compared by pointer).
    bool operator==(AllocatorKey const &other) const
    {
        return (meta == other.meta) && (activityId == other.activityId) && (flags == other.flags);
    }
};
#pragma pack(pop)
  388. class CAllocatorCacheItem : public OwningHTMapping<IEngineRowAllocator, AllocatorKey>
  389. {
  390. Linked<IOutputMetaData> meta;
  391. unsigned allocatorId;
  392. public:
  393. CAllocatorCacheItem(IEngineRowAllocator *allocator, unsigned _allocatorId, AllocatorKey &key)
  394. : OwningHTMapping<IEngineRowAllocator, AllocatorKey>(*allocator, key), allocatorId(_allocatorId)
  395. {
  396. meta.set(key.meta);
  397. }
  398. unsigned queryAllocatorId() const { return allocatorId; }
  399. };
//Cache of IEngineRowAllocators keyed by (meta, activityId, heap flags).
//Also serves as the callback roxiemem uses to map the cacheId embedded in a row back to
//the allocator that created it, for destruct/clone/validity operations on that row.
class CAllocatorCache : public CSimpleInterfaceOf<IRowAllocatorMetaActIdCache>
{
    OwningSimpleHashTableOf<CAllocatorCacheItem, AllocatorKey> cache;
    IArrayOf<IEngineRowAllocator> allAllocators;    // position in this array == allocatorId
    mutable SpinLock allAllocatorsLock;             // guards 'cache' and 'allAllocators'
    Owned<roxiemem::IRowManager> rowManager;        // NOTE(review): not referenced in this class - confirm whether still needed
    IRowAllocatorMetaActIdCacheCallback *callback;  // creates the actual allocators

    //Find an existing cache entry.  Caller must hold allAllocatorsLock.
    inline CAllocatorCacheItem *_lookup(IOutputMetaData *meta, unsigned activityId, roxiemem::RoxieHeapFlags flags) const
    {
        AllocatorKey key(meta, activityId, flags);
        return cache.find(key);
    }
public:
    CAllocatorCache(IRowAllocatorMetaActIdCacheCallback *_callback) : callback(_callback)
    {
    }

// IRowAllocatorMetaActIdCache
    //Return a (linked) allocator for the given key, creating and caching it if necessary.
    //The spinlock is dropped around callback->createAllocator (which may be expensive and
    //may re-enter this cache), so the reserved allocatorId is re-checked afterwards and
    //the whole operation retried if another thread claimed it in the meantime.
    virtual IEngineRowAllocator *ensure(IOutputMetaData * meta, unsigned activityId, roxiemem::RoxieHeapFlags flags)
    {
        SpinBlock b(allAllocatorsLock);
        for (;;)
        {
            CAllocatorCacheItem *container = _lookup(meta, activityId, flags);
            if (container)
            {
                if (0 == (roxiemem::RHFunique & flags))
                    return LINK(&container->queryElement());
                // if in cache but unique, reuse allocatorId
                SpinUnblock b(allAllocatorsLock);
                return callback->createAllocator(this, meta, activityId, container->queryAllocatorId(), flags);
            }
            // NB: a RHFunique allocator, will cause 1st to be added to 'allAllocators'
            // subsequent requests for the same type of unique allocator, will share same allocatorId
            // resulting in the 1st allocator being reused by all instances for onDestroy() etc.
            assertex(allAllocators.ordinality() < ALLOCATORID_MASK);
            unsigned allocatorId = allAllocators.ordinality();  // provisionally reserve the next id
            IEngineRowAllocator *ret;
            {
                SpinUnblock b(allAllocatorsLock);   // drop the lock while creating
                ret = callback->createAllocator(this, meta, activityId, allocatorId, flags);
                assertex(ret);
            }
            if (allocatorId == allAllocators.ordinality())
            {
                //Our reserved id is still free - commit the new allocator to both structures.
                AllocatorKey key(meta, activityId, flags);
                container = new CAllocatorCacheItem(LINK(ret), allocatorId, key);
                cache.replace(*container);
                allAllocators.append(*LINK(ret));
                return ret;
            }
            else
            {
                // someone has used the allocatorId I was going to use.. release and try again (hopefully happens very seldom)
                ret->Release();
            }
        }
    }
    virtual unsigned items() const
    {
        return allAllocators.ordinality();
    }

// roxiemem::IRowAllocatorCache
    virtual unsigned getActivityId(unsigned cacheId) const
    {
        unsigned allocatorIndex = (cacheId & ALLOCATORID_MASK);
        SpinBlock b(allAllocatorsLock);
        if (allAllocators.isItem(allocatorIndex))
            return allAllocators.item(allocatorIndex).queryActivityId();
        else
        {
            //assert(false);
            return UNKNOWN_ACTIVITY; // Used for tracing, better than a crash...
        }
    }
    virtual StringBuffer &getActivityDescriptor(unsigned cacheId, StringBuffer &out) const
    {
        unsigned allocatorIndex = (cacheId & ALLOCATORID_MASK);
        SpinBlock b(allAllocatorsLock);
        if (allAllocators.isItem(allocatorIndex))
            return allAllocators.item(allocatorIndex).getId(out);
        else
        {
            assert(false);
            return out.append("unknown"); // Used for tracing, better than a crash...
        }
    }
    //Called when a row with a destructor is released: validate its check bytes, then
    //let the owning allocator's meta destruct the row contents.
    virtual void onDestroy(unsigned cacheId, void *row) const
    {
        IEngineRowAllocator *allocator;
        unsigned allocatorIndex = (cacheId & ALLOCATORID_MASK);
        {
            SpinBlock b(allAllocatorsLock); // just protect the access to the array - don't keep locked for the call of destruct or may deadlock
            if (allAllocators.isItem(allocatorIndex))
                allocator = &allAllocators.item(allocatorIndex);
            else
            {
                assert(false);
                return;
            }
        }
        if (!RoxieRowCheckValid(cacheId, row))
        {
            throw MakeStringException(0, "ERROR: crc check failure destroying row!");
        }
        allocator->queryOutputMeta()->destruct((byte *) row);
    }
    //Called when a row is cloned: validate it, then link any child rows it references.
    virtual void onClone(unsigned cacheId, void *row) const
    {
        IEngineRowAllocator *allocator;
        unsigned allocatorIndex = (cacheId & ALLOCATORID_MASK);
        {
            SpinBlock b(allAllocatorsLock); // just protect the access to the array - don't keep locked for the call of destruct or may deadlock
            if (allAllocators.isItem(allocatorIndex))
                allocator = &allAllocators.item(allocatorIndex);
            else
            {
                assert(false);
                return;
            }
        }
        if (!RoxieRowCheckValid(cacheId, row))
        {
            throw MakeStringException(0, "ERROR: crc check failure cloning row!");
        }
        //This should only be called if the destructor needs to be called - so don't bother checking
        ChildRowLinkerWalker walker;
        allocator->queryOutputMeta()->walkIndirectMembers((const byte *)row, walker);
    }
    virtual void checkValid(unsigned cacheId, const void *row) const
    {
        if (!RoxieRowCheckValid(cacheId, row))
        {
            throw MakeStringException(0, "ERROR: crc check failure checking row!");
        }
    }
};
  536. IRowAllocatorMetaActIdCache *createRowAllocatorCache(IRowAllocatorMetaActIdCacheCallback *callback)
  537. {
  538. return new CAllocatorCache(callback);
  539. }
  540. #ifdef _USE_CPPUNIT
  541. #include "unittests.hpp"
  542. namespace roxierowtests {
  543. using namespace roxiemem;
//Unit tests for the roxie row allocators: row check stamping/validation and the
//allocator cache's sharing behaviour.
class RoxieRowAllocatorTests : public CppUnit::TestFixture
{
    CPPUNIT_TEST_SUITE( RoxieRowAllocatorTests );
        CPPUNIT_TEST(testSetup);
        CPPUNIT_TEST(testChecking);
        CPPUNIT_TEST(testCleanup);
        CPPUNIT_TEST(testAllocatorCache);
    CPPUNIT_TEST_SUITE_END();
    const IContextLogger &logctx;

public:
    RoxieRowAllocatorTests() : logctx(queryDummyContextLogger())
    {
    }

    ~RoxieRowAllocatorTests()
    {
    }

protected:
    //IRowAllocatorCache implementation that counts check failures instead of throwing,
    //so tests can assert on how many corrupt rows were detected.
    class CheckingRowAllocatorCache : public CSimpleInterface, public IRowAllocatorCache
    {
    public:
        IMPLEMENT_IINTERFACE_USING(CSimpleInterface);

        CheckingRowAllocatorCache() { numFailures = 0; }
        virtual unsigned getActivityId(unsigned cacheId) const { return 0; }
        virtual StringBuffer &getActivityDescriptor(unsigned cacheId, StringBuffer &out) const { return out.append(cacheId); }
        virtual void onDestroy(unsigned cacheId, void *row) const
        {
            if (!RoxieRowCheckValid(cacheId, row))
                ++numFailures;
        }
        virtual void onClone(unsigned cacheId, void *row) const
        {
        }
        virtual void checkValid(unsigned cacheId, const void *row) const
        {
            if (!RoxieRowCheckValid(cacheId, row))
                ++numFailures;
        }

        mutable unsigned numFailures;   // mutable so the const callbacks above can update it
    };

    //Minimal IOutputMetaData stub - reports the configured sizes, does nothing else.
    class DummyOutputMeta : public IOutputMetaData, public CInterface
    {
    public:
        DummyOutputMeta(size32_t _minSize, size32_t _fixedSize) : minSize(_minSize), fixedSize(_fixedSize) {}
        IMPLEMENT_IINTERFACE

        virtual size32_t getRecordSize(const void *rec) { return minSize; }
        virtual size32_t getFixedSize() const { return fixedSize; }      // 0 => variable size
        virtual size32_t getMinRecordSize() const { return minSize; }
        virtual void toXML(const byte * self, IXmlWriter & out) {}
        virtual unsigned getVersion() const { return 0; }
        virtual unsigned getMetaFlags() { return 0; }
        virtual const RtlTypeInfo * queryTypeInfo() const { return nullptr; }
        virtual IOutputMetaData * querySerializedDiskMeta() { return this; }
        virtual void destruct(byte * self) {}
        virtual IOutputRowSerializer * createDiskSerializer(ICodeContext * ctx, unsigned activityId) { return NULL; }
        virtual IOutputRowDeserializer * createDiskDeserializer(ICodeContext * ctx, unsigned activityId) { return NULL; }
        virtual ISourceRowPrefetcher * createDiskPrefetcher() { return NULL; }
        virtual IOutputRowSerializer * createInternalSerializer(ICodeContext * ctx, unsigned activityId) { return NULL; }
        virtual IOutputRowDeserializer * createInternalDeserializer(ICodeContext * ctx, unsigned activityId) { return NULL; }
        virtual void process(const byte * self, IFieldProcessor & target, unsigned from, unsigned to) {}
        virtual void walkIndirectMembers(const byte * self, IIndirectMemberVisitor & visitor) {}
        virtual IOutputMetaData * queryChildMeta(unsigned i) { return NULL; }
        virtual const RtlRecord &queryRecordAccessor(bool expand) const { UNIMPLEMENTED; }

        size32_t minSize;
        size32_t fixedSize;
    };

    //Create rows of every size in [low..high], fill them with a known pattern, then
    //optionally corrupt one byte (modify < 0: first byte, > 0: last byte) and check
    //that releasing the row reports exactly one check failure when corrupted and none
    //when not.  'checking' selects the crc-checked allocator.
    void testAllocator(IOutputMetaData * meta, roxiemem::RoxieHeapFlags flags, unsigned low, unsigned high, int modify, bool checking)
    {
        CheckingRowAllocatorCache cache;
        Owned<IRowManager> rm = createRowManager(0, NULL, logctx, &cache, false);
        Owned<IEngineRowAllocator> alloc = checking ? createCrcRoxieRowAllocator(NULL, *rm, meta, 0, 0, flags) : createRoxieRowAllocator(NULL, *rm, meta, 0, 0, flags);
        for (unsigned size=low; size <= high; size++)
        {
            unsigned capacity;
            unsigned prevFailures = cache.numFailures;
            void * row = alloc->createRow(capacity);
            if (low != high)
                row = alloc->resizeRow(size, row, capacity);
            for (unsigned i1=0; i1 < capacity; i1++)
                ((byte *)row)[i1] = i1;
            //NOTE(review): 'row' is used (and released) below rather than 'final' -
            //assumes finalizeRow returns the original pointer here - TODO confirm.
            const void * final = alloc->finalizeRow(capacity, row, capacity);
            for (unsigned i2=0; i2 < capacity; i2++)
            {
                ASSERT(((byte *)row)[i2] == i2);
            }
            if (modify != 0)
            {
                if (modify < 0)
                    ((byte *)row)[0]++;
                else
                    ((byte *)row)[size-1]++;
            }
            ReleaseRoxieRow(row);
            if (modify == 0)
            {
                ASSERT(prevFailures == cache.numFailures);
            }
            else
            {
                ASSERT(prevFailures+1 == cache.numFailures);
            }
        }
    }

    //Run the scenario above unchecked, checked-clean, and checked with each corruption.
    void testAllocator(IOutputMetaData * meta, roxiemem::RoxieHeapFlags flags, unsigned low, unsigned high)
    {
        testAllocator(meta, flags, low, high, 0, false);
        testAllocator(meta, flags, low, high, 0, true);
        testAllocator(meta, flags, low, high, -1, true);
        testAllocator(meta, flags, low, high, +1, true);
    }

    void testSetup()
    {
        setTotalMemoryLimit(false, true, false, 40*HEAP_ALIGNMENT_SIZE, 0, NULL, NULL);
    }

    void testCleanup()
    {
        releaseRoxieHeap();
    }

    //Exercise fixed (plain and packed) and variable size rows across a range of sizes.
    void testChecking()
    {
        Owned<IRowManager> rm = createRowManager(0, NULL, logctx, NULL, false);

        for (unsigned fixedSize=1; fixedSize<64; fixedSize++)
        {
            DummyOutputMeta meta(fixedSize, fixedSize);
            testAllocator(&meta, RHFnone, fixedSize, fixedSize);
            testAllocator(&meta, RHFpacked, fixedSize, fixedSize);
        }

        for (unsigned varSize=1; varSize<64; varSize++)
        {
            DummyOutputMeta meta(varSize, 0);
            testAllocator(&meta, RHFnone, varSize, varSize);
            testAllocator(&meta, RHFnone, 1, varSize);
        }
    }

    //Verify the allocator cache keys on (meta, activityId) and never creates duplicates.
    void testAllocatorCache()
    {
        IArrayOf<IOutputMetaData> metas;
        Owned<IRowManager> rm = createRowManager(0, NULL, logctx, NULL, false);
        class CAllocatorCallback : implements IRowAllocatorMetaActIdCacheCallback
        {
            IRowManager *rm;
        public:
            CAllocatorCallback(IRowManager *_rm) : rm(_rm)
            {
            }
            virtual IEngineRowAllocator *createAllocator(IRowAllocatorMetaActIdCache * cache, IOutputMetaData *meta, unsigned activityId, unsigned cacheId, roxiemem::RoxieHeapFlags flags) const
            {
                return createRoxieRowAllocator(cache, *rm, meta, activityId, cacheId, flags);
            }
        } callback(rm);
        Owned<IRowAllocatorMetaActIdCache> allocatorCache = createRowAllocatorCache(&callback);
        // create 64 allocators, 32 different activityId's
        for (unsigned fixedSize=1; fixedSize<=64; fixedSize++)
        {
            DummyOutputMeta *meta = new DummyOutputMeta(fixedSize, fixedSize);
            metas.append(*meta);

            unsigned activityId = 1 + ((fixedSize-1) % 32); // i.e. make an id, so half are duplicates
            Owned<IEngineRowAllocator> allocator = allocatorCache->ensure(meta, activityId, roxiemem::RHFnone);
        }
        // test that 64 in cache
        ASSERT(allocatorCache->items() == 64);

        // test ensure again - same keys must hit the cache, not create new entries
        for (unsigned fixedSize=1; fixedSize<=64; fixedSize++)
        {
            unsigned activityId = 1 + ((fixedSize-1) % 32); // i.e. make an id, so half are duplicates
            IOutputMetaData *meta = &metas.item(fixedSize-1); // from 1st round
            Owned<IEngineRowAllocator> allocator = allocatorCache->ensure(meta, activityId, roxiemem::RHFnone);
        }
        ASSERT(allocatorCache->items() == 64);

        metas.kill();
        allocatorCache.clear();
    }
};
  716. CPPUNIT_TEST_SUITE_REGISTRATION( RoxieRowAllocatorTests );
  717. CPPUNIT_TEST_SUITE_NAMED_REGISTRATION( RoxieRowAllocatorTests, "RoxieRowAllocatorTests" );
} // namespace roxierowtests
  719. #endif