/*############################################################################## HPCC SYSTEMS software Copyright (C) 2015 HPCC Systems®. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. ############################################################################## */ #include "platform.h" #include "jmutex.hpp" #include "jqueue.hpp" //--------------------------------------------------------------------------------------------------------------------- template class CRowQueue : implements CInterfaceOf { public: CRowQueue(unsigned _maxItems, unsigned _numProducers, unsigned _numConsumers) : queue(_numProducers, _numConsumers, _maxItems), numConsumers(_numConsumers), numProducers(_numProducers) { } virtual bool enqueue(const void * const item) { return queue.enqueue(item); } virtual bool dequeue(const void * & result) { return queue.dequeue(result, false); } virtual bool tryDequeue(const void * & result) { return queue.dequeue(result, true); } virtual void reset() { //This resets the state, but does not clear the elements, they must be dequeued and released. queue.reset(); //How clean up the queue and ensure the elements are disposed of? } virtual void noteReaderStopped() { queue.noteReaderStopped(); } virtual void noteWriterStopped() { queue.noteWriterStopped(); } virtual void abort() { queue.abort(); } private: ReaderWriterQueue queue; const unsigned numConsumers; const unsigned numProducers; }; IRowQueue * createRowQueue(unsigned numReaders, unsigned numWriters, unsigned maxItems, unsigned maxSlots) { //Ideally if the numberOfReaders or writers is 1 then ideally, supply 1 for the relevant values. // assertex(maxSlots == 0 || maxItems < maxSlots); if ((numReaders == 1) && (numWriters == 1) && (maxItems < 256)) return new CRowQueue(maxItems, numWriters, numReaders); if ((numReaders == 1) && (numWriters == 1) && (maxItems < 0x4000)) return new CRowQueue(maxItems, numWriters, numReaders); if ((numReaders == 1) && (numWriters <= 127) && (maxItems < 256)) return new CRowQueue(maxItems, numWriters, numReaders); if ((numWriters == 1) && (numReaders <= 255) && (maxItems < 2048)) return new CRowQueue(maxItems, numWriters, numReaders); if ((numReaders <= 31) && (numWriters <= 31) && (maxItems < 128)) return new CRowQueue(maxItems, numWriters, numReaders); assertex((numReaders < 0x1000) && (numWriters < 0x400)); return new CRowQueue(maxItems, numWriters, numReaders); } //MORE: //use likely()/unlikely() - they might improve the code //Adaptive spin counts - separate variables for reader/writer. reduce number of spins if a producer/consumer blocks. //Add options to indicate spinning for readers or writers is ok //Use a traits class instead of multiple parameters? //If readers or writers spin, then there is no need to keep track of waiting counts in the state. //Base the blog on an earlier simpler example - without the full templatization