|
@@ -65,14 +65,22 @@ struct TransferStreamHeader
|
|
|
{
|
|
|
rowcount_t numrecs;
|
|
|
rowcount_t pos;
|
|
|
- size32_t recsize;
|
|
|
unsigned id;
|
|
|
- TransferStreamHeader(rowcount_t _pos, rowcount_t _numrecs, unsigned _recsize, unsigned _id)
|
|
|
- : pos(_pos), numrecs(_numrecs), recsize(_recsize), id(_id)
|
|
|
+ unsigned crc = 0;
|
|
|
+ TransferStreamHeader(rowcount_t _pos, rowcount_t _numrecs, unsigned _id)
|
|
|
+ : pos(_pos), numrecs(_numrecs), id(_id)
|
|
|
{
|
|
|
+ crc = getCrc();
|
|
|
}
|
|
|
TransferStreamHeader() {}
|
|
|
- void winrev() { _WINREV(pos); _WINREV(numrecs); _WINREV(recsize); _WINREV(id); }
|
|
|
+ void winrev() { _WINREV(pos); _WINREV(numrecs); _WINREV(id); _WINREV(crc); }
|
|
|
+ unsigned getCrc() const
|
|
|
+ {
|
|
|
+ unsigned retCrc = crc32((const char *)&numrecs, sizeof(numrecs), 0);
|
|
|
+ retCrc = crc32((const char *)&pos, sizeof(pos), retCrc);
|
|
|
+ retCrc = crc32((const char *)&id, sizeof(id), retCrc);
|
|
|
+ return retCrc;
|
|
|
+ }
|
|
|
};
|
|
|
|
|
|
|
|
@@ -281,7 +289,7 @@ public:
|
|
|
IRowStream *ConnectMergeRead(unsigned id, IThorRowInterfaces *rowif,SocketEndpoint &nodeaddr,rowcount_t startrec,rowcount_t numrecs)
|
|
|
{
|
|
|
Owned<ISocket> socket = DoConnect(nodeaddr);
|
|
|
- TransferStreamHeader hdr(startrec,numrecs,0,id);
|
|
|
+ TransferStreamHeader hdr(startrec, numrecs, id);
|
|
|
#ifdef _FULL_TRACE
|
|
|
StringBuffer s;
|
|
|
nodeaddr.getUrlStr(s);
|
|
@@ -296,8 +304,30 @@ IRowStream *ConnectMergeRead(unsigned id, IThorRowInterfaces *rowif,SocketEndpoi
|
|
|
ISocketRowWriter *ConnectMergeWrite(IThorRowInterfaces *rowif,ISocket *socket,size32_t bufsize,rowcount_t &startrec,rowcount_t &numrecs)
|
|
|
{
|
|
|
TransferStreamHeader hdr;
|
|
|
- socket->read(&hdr,sizeof(hdr));
|
|
|
+ unsigned remaining = sizeof(hdr);
|
|
|
+ byte *dst = (byte *)&hdr;
|
|
|
+
|
|
|
+ /*
|
|
|
+ * A client has connected at this stage, the hdr should be sent swiftly.
|
|
|
+ * A generous 1 minute timeout between reads, if longer, timeout exception,
|
|
|
+ * will be thrown, and the connection ignored.
|
|
|
+ */
|
|
|
+ while (true)
|
|
|
+ {
|
|
|
+ size32_t read;
|
|
|
+ socket->readtms(dst, 1, remaining, read, 60*1000); // 1 min timeout
|
|
|
+ if (read == remaining)
|
|
|
+ break;
|
|
|
+ remaining -= read;
|
|
|
+ dst += read;
|
|
|
+ }
|
|
|
hdr.winrev();
|
|
|
+ if (hdr.getCrc() != hdr.crc)
|
|
|
+ {
|
|
|
+ char name[100];
|
|
|
+ int port = socket->peer_name(name,sizeof(name));
|
|
|
+ throw makeStringExceptionV(TE_InvalidSortConnect, "Invalid SORT connection from: %s:%u", name, port);
|
|
|
+ }
|
|
|
startrec = hdr.pos;
|
|
|
numrecs = hdr.numrecs;
|
|
|
#ifdef _FULL_TRACE
|