|
@@ -1602,11 +1602,8 @@ class CRowMultiWriterReader : public CSimpleInterface, implements IRowMultiWrite
|
|
|
{
|
|
|
// NB: allowed to go over limit, by as much as inRows.ordinality()-1
|
|
|
rows.appendRows(inRows, true);
|
|
|
- if (readerBlocked && (rows.numCommitted() >= readGranularity))
|
|
|
- {
|
|
|
- emptySem.signal();
|
|
|
- readerBlocked = false;
|
|
|
- }
|
|
|
+ if (rows.numCommitted() >= readGranularity)
|
|
|
+ checkReleaseReader();
|
|
|
return;
|
|
|
}
|
|
|
writersBlocked++;
|
|
@@ -1614,6 +1611,22 @@ class CRowMultiWriterReader : public CSimpleInterface, implements IRowMultiWrite
|
|
|
fullSem.wait();
|
|
|
}
|
|
|
}
|
|
|
+ inline void checkReleaseReader()
|
|
|
+ {
|
|
|
+ if (readerBlocked)
|
|
|
+ {
|
|
|
+ emptySem.signal();
|
|
|
+ readerBlocked = false;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ inline void checkReleaseWriters()
|
|
|
+ {
|
|
|
+ if (writersBlocked)
|
|
|
+ {
|
|
|
+ fullSem.signal(writersBlocked);
|
|
|
+ writersBlocked = 0;
|
|
|
+ }
|
|
|
+ }
|
|
|
public:
|
|
|
IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
|
|
|
|
|
@@ -1646,11 +1659,7 @@ public:
|
|
|
{
|
|
|
rows.flush();
|
|
|
eow = true;
|
|
|
- if (readerBlocked)
|
|
|
- {
|
|
|
- emptySem.signal();
|
|
|
- readerBlocked = false;
|
|
|
- }
|
|
|
+ checkReleaseReader();
|
|
|
}
|
|
|
}
|
|
|
// ISharedWriteBuffer impl.
|
|
@@ -1663,16 +1672,8 @@ public:
|
|
|
{
|
|
|
CThorArrayLockBlock block(rows);
|
|
|
eos = true;
|
|
|
- if (writersBlocked)
|
|
|
- {
|
|
|
- fullSem.signal(writersBlocked);
|
|
|
- writersBlocked = 0;
|
|
|
- }
|
|
|
- if (readerBlocked)
|
|
|
- {
|
|
|
- emptySem.signal();
|
|
|
- readerBlocked = false;
|
|
|
- }
|
|
|
+ checkReleaseWriters();
|
|
|
+ checkReleaseReader();
|
|
|
}
|
|
|
// IRowStream impl.
|
|
|
virtual const void *nextRow()
|
|
@@ -1695,11 +1696,7 @@ public:
|
|
|
}
|
|
|
rows.readBlock(readRows, rowsToRead);
|
|
|
rowPos = 0;
|
|
|
- if (writersBlocked)
|
|
|
- {
|
|
|
- fullSem.signal(writersBlocked);
|
|
|
- writersBlocked = 0;
|
|
|
- }
|
|
|
+ checkReleaseWriters();
|
|
|
break; // fall through to return a row
|
|
|
}
|
|
|
readerBlocked = true;
|
|
@@ -1717,6 +1714,7 @@ public:
|
|
|
virtual void stop()
|
|
|
{
|
|
|
eos = true;
|
|
|
+ checkReleaseWriters();
|
|
|
}
|
|
|
};
|
|
|
|