/*##############################################################################

    HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
############################################################################## */

#include "jliball.hpp"
#include "eclrtl.hpp"

#include "hqlexpr.hpp"
#include "hqlthql.hpp"
#include "fvresultset.ipp"
#include "fileview.hpp"
#include "fvdisksource.ipp"
#include "fvwugen.hpp"
#include "fvdsremote.ipp"
#include "fverror.hpp"
#include "mpcomm.hpp"


#define TIMEOUT             60000
#define REMOTE_DATA_SIZE    8000        // roughly how much is sent back for each request

// Command codes.  Each request starts with one of these serialized as a single
// byte; RemoteDataSourceServer::run() reads that byte and dispatches on it.
enum { FVCMDnone, FVCMDrow, FVCMDraw, FVCMDnumrows, FVCMDcreatewu, FVCMDcreatefile, FVCMDdestroy, FVCMDfetch, FVCMDfetchraw, FVCMDmax };

//---------------------------------------------------------------------------

// Send msg to serverNode on MPTAG_FILEVIEW and wait up to TIMEOUT for the reply,
// which is received back into msg.
// Throws FVERR_TimeoutRemoteFileView if sendRecv() reports failure.  The reply
// starts with a serialized exception slot; if it holds an exception, that
// exception is thrown.  On return msg is positioned just past that slot.
static void sendReceive(INode * serverNode, CMessageBuffer & msg)
{
    if (!queryWorldCommunicator().sendRecv(msg, serverNode, MPTAG_FILEVIEW, TIMEOUT))
        throwError(FVERR_TimeoutRemoteFileView);

    msg.setEndian(__BIG_ENDIAN);
    IException * error = deserializeException(msg);
    if (error)
        throw error;
}

// Client-side proxy for a data source that lives in a RemoteDataSourceServer.
// _id is the handle the server allocated; _cachedNumRows and _isIndex are the
// values the server reported when the source was created.
RemoteDataSource::RemoteDataSource(const SocketEndpoint & _serverEP, unique_id_t _id, IFvDataSourceMetaData * _metaData, __int64 _cachedNumRows, bool _isIndex) : serverEP(_serverEP)
{
    id = _id;
    metaData.set(_metaData);
    serverNode.setown(createINode(serverEP));
    cachedNumRows = _cachedNumRows;
    index = _isIndex;
    openCount = 0;
}

// Returns the meta data supplied at construction (not linked for the caller).
IFvDataSourceMetaData * RemoteDataSource::queryMetaData()
{
    return metaData;
}

// Tell the server to discard the entry associated with this proxy.
// NOTE(review): any exception raised by sendReceive() propagates out of here -
// confirm the callers of beforeDispose() tolerate that.
void RemoteDataSource::beforeDispose()
{
    CMessageBuffer msg;
    msg.setEndian(__BIG_ENDIAN);
    msg.append((byte)FVCMDdestroy);
    msg.append(id);

    sendReceive(msg);
}

// Common implementation of getRow()/getRawRow().
// Appends the requested row to out, taking it from cache when present;
// otherwise sends cmd (FVCMDrow or FVCMDraw) to the server, adds the returned
// block of rows to cache, and takes the row from there.
// Returns false if the server replies that the row could not be supplied.
bool RemoteDataSource::getARow(MemoryBuffer & out, RowCache & cache, byte cmd, __int64 row)
{
    RowLocation location;

    if (cache.getCacheRow(row, location))
    {
        out.append(location.matchLength, location.matchRow);
        return true;
    }

    CMessageBuffer msg;
    msg.setEndian(__BIG_ENDIAN);
    msg.append(cmd);
    msg.append(id);
    msg.append(row);

    sendReceive(msg);

    bool ok;
    msg.read(ok);
    if (!ok)
        return false;

    __int64 start;
    msg.read(start);

    // The cache takes ownership of the newly received block.
    VariableRowBlock * next = new VariableRowBlock(msg, start);
    cache.addRowsOwn(next);

    // The block just added was requested for this row, so it must now be found.
    if (!cache.getCacheRow(row, location))
        assertex(!"Internal Error!");
    out.append(location.matchLength, location.matchRow);
    return true;
}

// Ask the server to fetch the row at the given offset and append it to out.
// Returns false if the server reports failure.
bool RemoteDataSource::fetchRow(MemoryBuffer & out, __int64 offset)
{
    CMessageBuffer msg;
    msg.setEndian(__BIG_ENDIAN);
    // The command must be sent as exactly one byte - the server reads it with a
    // byte-sized read.  A bare enumerator would be promoted to int and so would
    // not match the byte the server expects.
    msg.append((byte)FVCMDfetch);
    msg.append(id);
    msg.append(offset);

    sendReceive(msg);

    bool ok;
    msg.read(ok);
    if (!ok)
        return false;

    size32_t len;
    msg.read(len);
    out.append(len, msg.readDirect(len));
    return true;
}

// As fetchRow(), but sends FVCMDfetchraw so the server uses fetchRawRow().
bool RemoteDataSource::fetchRawRow(MemoryBuffer & out, __int64 offset)
{
    CMessageBuffer msg;
    msg.setEndian(__BIG_ENDIAN);
    // Single byte command - see fetchRow().
    msg.append((byte)FVCMDfetchraw);
    msg.append(id);
    msg.append(offset);

    sendReceive(msg);

    bool ok;
    msg.read(ok);
    if (!ok)
        return false;

    size32_t len;
    msg.read(len);
    out.append(len, msg.readDirect(len));
    return true;
}

// Append row number 'row' to out, using the translatedRows cache.
bool RemoteDataSource::getRow(MemoryBuffer & out, __int64 row)
{
    return getARow(out, translatedRows, FVCMDrow, row);
}

// Append row number 'row' to out, using the rawRows cache.
bool RemoteDataSource::getRawRow(MemoryBuffer & out, __int64 row)
{
    return getARow(out, rawRows, FVCMDraw, row);
}

// Returns the row count.  Without force this is the value cached at creation;
// with force the server is asked (the cached value is not updated).
__int64 RemoteDataSource::numRows(bool force)
{
    if (!force)
        return cachedNumRows;

    CMessageBuffer msg;
    msg.setEndian(__BIG_ENDIAN);
    msg.append((byte)FVCMDnumrows);
    msg.append(id);

    sendReceive(msg);

    __int64 result;
    msg.read(result);
    return result;
}

// Decrement the open count; nothing further is done yet when it reaches zero.
void RemoteDataSource::onClose()
{
    if (--openCount == 0)
    {
        //MORE: Should tell the server...
    }
}

// Increment the open count; nothing further is done yet on the first open.
void RemoteDataSource::onOpen()
{
    //MORE: critical section
    if (openCount++ == 0)
    {
        //MORE - tell the server...
    }
}

// Send msg to this data source's server and receive the reply into msg.
void RemoteDataSource::sendReceive(CMessageBuffer & msg)
{
    ::sendReceive(serverNode, msg);
}

// Ask the file view server at 'server' to create a data source for a workunit
// result (selected by wuid plus sequence/name).
// Returns a new RemoteDataSource proxy, or NULL if the server returned id 0.
IFvDataSource * createRemoteDataSource(const SocketEndpoint & server, const char * username, const char * password, const char * wuid, unsigned sequence, const char * name)
{
    Owned<INode> serverNode = createINode(server);

    CMessageBuffer msg;
    msg.setEndian(__BIG_ENDIAN);
    msg.append((byte)FVCMDcreatewu);
    msg.append(myProcessSession());
    msg.append(username);
    msg.append(password);
    msg.append(wuid);
    msg.append(sequence);
    msg.append(name);

    sendReceive(serverNode, msg);

    // The reply fields must be read in the order the server wrote them.
    unsigned short version;     // read to advance the buffer; not otherwise used
    unique_id_t id;
    __int64 numRows;
    bool isIndex;
    msg.read(version);
    msg.read(id);
    msg.read(numRows);
    Owned<IFvDataSourceMetaData> meta = deserializeDataSourceMeta(msg);
    msg.read(isIndex);

    if (id)
        return new RemoteDataSource(server, id, meta, numRows, isIndex);
    return NULL;
}

// Ask the file view server at 'server' to create a data source for a logical
// file.
// Returns a new RemoteDataSource proxy, or NULL if the server returned id 0.
IFvDataSource * createRemoteFileDataSource(const SocketEndpoint & server, const char * username, const char * password, const char * logicalName)
{
    Owned<INode> serverNode = createINode(server);

    CMessageBuffer msg;
    msg.setEndian(__BIG_ENDIAN);
    msg.append((byte)FVCMDcreatefile);
    msg.append(myProcessSession());
    msg.append(username);
    msg.append(password);
    msg.append(logicalName);

    sendReceive(serverNode, msg);

    // The reply fields must be read in the order the server wrote them.
    unsigned short version;     // read to advance the buffer; not otherwise used
    unique_id_t id;
    __int64 numRows;
    bool isIndex;
    msg.read(version);
    msg.read(id);
    msg.read(numRows);
    Owned<IFvDataSourceMetaData> meta = deserializeDataSourceMeta(msg);
    msg.read(isIndex);

    if (id)
        return new RemoteDataSource(server, id, meta, numRows, isIndex);
    return NULL;
}

//---------------------------------------------------------------------------

// The single server instance, created by startRemoteDataSourceServer() and
// reset to NULL when its run() loop exits.
static RemoteDataSourceServer * server;

// Cancel the session subscription taken out when the entry was added.
RemoteDataEntry::~RemoteDataEntry()
{
    if (subscription)
        querySessionManager().unsubscribeSession(subscription);
}


// Thread that services remote file view requests; _queue and _cluster are
// recorded for later use.
RemoteDataSourceServer::RemoteDataSourceServer(const char * _queue, const char * _cluster) : Thread("Remote File View Server")
{
    alive = true;
    nextId = 0;
    queue.set(_queue);
    cluster.set(_cluster);
}

// Register ds on behalf of the given client session and return the id that the
// client must quote in later requests.  The session is subscribed to so that
// closed()/aborted() are called for it.
unique_id_t RemoteDataSourceServer::addDataSource(SessionId session, IFvDataSource * ds)
{
    RemoteDataEntry * newEntry = new RemoteDataEntry;
    newEntry->id = ++nextId;
    newEntry->session = session;
    newEntry->ds.set(ds);
    newEntry->subscription = querySessionManager().subscribeSession(session, this);

    //MORE: Register the session so if it dies then we get notified.
    CriticalBlock procedure(cs);
    entries.append(*newEntry);
    return newEntry->id;
}

// FVCMDfetch: in = <id><offset>.
// out = <false> if the id is unknown, otherwise <ok><length><data>.
// The raw parameter is not used.
void RemoteDataSourceServer::doCmdFetch(bool raw, MemoryBuffer & in, MemoryBuffer & out)
{
    Owned<IFvDataSource> ds = readDataSource(in);
    if (!ds)
    {
        out.append(false);
        return;
    }

    __int64 requestedOffset;
    in.read(requestedOffset);

    MemoryBuffer temp;
    bool ok = ds->fetchRow(temp, requestedOffset);
    out.append(ok);         // ok
    out.append(temp.length());
    out.append(temp.length(), temp.toByteArray());
}

// FVCMDfetchraw: as doCmdFetch(), but uses fetchRawRow() on the data source.
// The raw parameter is not used.
void RemoteDataSourceServer::doCmdFetchRaw(bool raw, MemoryBuffer & in, MemoryBuffer & out)
{
    Owned<IFvDataSource> ds = readDataSource(in);
    if (!ds)
    {
        out.append(false);
        return;
    }

    __int64 requestedOffset;
    in.read(requestedOffset);

    MemoryBuffer temp;
    bool ok = ds->fetchRawRow(temp, requestedOffset);
    out.append(ok);         // ok
    out.append(temp.length());
    out.append(temp.length(), temp.toByteArray());
}

// FVCMDrow / FVCMDraw: in = <id><row>.
// out = <false> if the id is unknown or no row could be read, otherwise
// <true><start row><number of rows>{<row length><row data>}.
// Rows are added, starting at the requested row, until the data source runs
// out or the reply grows beyond REMOTE_DATA_SIZE; at least one row is always
// returned when one is available.
void RemoteDataSourceServer::doCmdRow(bool raw, MemoryBuffer & in, MemoryBuffer & out)
{
    Owned<IFvDataSource> ds = readDataSource(in);
    if (!ds)
    {
        out.append(false);
        return;
    }

    __int64 requestedRow;
    in.read(requestedRow);

    unsigned startPos = out.length();
    unsigned numRows = 0;
    out.append(true);           // ok
    out.append(requestedRow);   // start

    unsigned numRowsPos = out.length();
    out.append(numRows);        // total number of rows - patched once known
    for (;;)
    {
        unsigned lengthPos = out.length();
        out.append((unsigned)0);    // size of this row - patched below
        unsigned startRow = out.length();

        bool gotRow = raw ? ds->getRawRow(out, requestedRow+numRows)
                          : ds->getRow(out, requestedRow+numRows);
        bool tooBig = gotRow && (numRows != 0) && (out.length() > REMOTE_DATA_SIZE);
        if (!gotRow || tooBig)
        {
            // Discard the unfinished row (and its length placeholder) so that
            // only the rows counted in numRows are sent back.
            out.rewrite(lengthPos);
            break;
        }

        // Go back and fill in the real length of the row, then restore the
        // write position to the end of the row.
        unsigned endRow = out.length();
        out.rewrite(lengthPos);
        out.append(endRow-startRow);
        out.rewrite(endRow);
        numRows++;
    }

    if (numRows == 0)
    {
        // Nothing could be read - replace the partial reply with a failure.
        out.rewrite(startPos);
        out.append(false);
        return;
    }

    // Patch the row count in the header.
    unsigned totalLength = out.length();
    out.rewrite(numRowsPos);
    out.append(numRows);
    out.rewrite(totalLength);
}

// FVCMDnumrows: in = <id>.  out = <row count>, 0 if the id is unknown.
void RemoteDataSourceServer::doCmdNumRows(MemoryBuffer & in, MemoryBuffer & out)
{
    Owned<IFvDataSource> ds = readDataSource(in);
    __int64 numRows = ds ? ds->numRows(true) : 0;
    out.append(numRows);
}

// FVCMDcreatewu: in = <session><username><password><wuid><sequence><name>.
// out = <version><id><row count><meta data><isIndex>.
void RemoteDataSourceServer::doCmdCreateWorkunit(MemoryBuffer & in, MemoryBuffer & out)
{
    SessionId session;
    StringAttr wuid, username, password;
    unsigned sequence;
    StringAttr name;

    in.read(session);
    in.read(username).read(password);
    in.read(wuid);
    in.read(sequence);
    in.read(name);

    DBGLOG("RemoteFileView:CreateWorkunit('%s',%d,'%s') by[%s:%" I64F "d", wuid.get(), sequence, name ? name.get() : "", username.get(), session);
    Owned<IConstWUResult> wuResult = resolveResult(wuid, sequence, name);
    // NOTE(review): ds is dereferenced below without a null check - confirm
    // createDataSource() cannot return NULL (e.g. when wuResult is NULL).
    Owned<IFvDataSource> ds = createDataSource(wuResult, wuid, username, password);
    unique_id_t id = addDataSource(session, ds);

    out.append((unsigned short)CurRemoteVersion);
    out.append(id);
    out.append(ds->numRows(false));
    ds->queryMetaData()->serialize(out);
    out.append(ds->isIndex());

    DBGLOG("RemoteFileView:CreateWorkunit returns %" I64F "d", id);
}

// FVCMDcreatefile: in = <session><username><password><logicalName>.
// out = <version><id><row count><meta data><isIndex>.
void RemoteDataSourceServer::doCmdCreateFile(MemoryBuffer & in, MemoryBuffer & out)
{
    SessionId session;
    StringAttr username, password, logicalName;

    in.read(session);
    in.read(username).read(password);
    in.read(logicalName);

    DBGLOG("RemoteFileView:CreateFile('%s') by[%s:%" I64F "d", logicalName.get(), username.get(), session);
    // NOTE(review): ds is dereferenced below without a null check - confirm
    // createFileDataSource() cannot return NULL.
    Owned<IFvDataSource> ds = createFileDataSource(logicalName, cluster, username, password);
    unique_id_t id = addDataSource(session, ds);

    out.append((unsigned short)CurRemoteVersion);
    out.append(id);
    out.append(ds->numRows(false));
    ds->queryMetaData()->serialize(out);
    out.append(ds->isIndex());

    DBGLOG("RemoteFileView:CreateFile returns %" I64F "d", id);
}

// FVCMDdestroy: in = <id>.  Removes the matching entry, if any; nothing is
// added to out.
void RemoteDataSourceServer::doCmdDestroy(MemoryBuffer & in, MemoryBuffer & out)
{
    unique_id_t id;
    in.read(id);

    DBGLOG("RemoteFileView:Destroy(%" I64F "d)", id);
    CriticalBlock block(cs);
    ForEachItemIn(idx, entries)
    {
        RemoteDataEntry & cur = entries.item(idx);
        if (cur.id == id)
        {
            entries.remove(idx);
            return;
        }
    }
}

// Returns a linked reference to the data source registered under id, or NULL
// if there is no such entry.
IFvDataSource * RemoteDataSourceServer::getDataSource(unique_id_t id)
{
    CriticalBlock block(cs);
    ForEachItemIn(idx, entries)
    {
        RemoteDataEntry & cur = entries.item(idx);
        if (cur.id == id)
            return LINK(cur.ds);
    }
    return NULL;
}

// Session notification: discard everything belonging to that session.
void RemoteDataSourceServer::closed(SessionId id)
{
    removeSession(id);
}

// Session notification: discard everything belonging to that session.
void RemoteDataSourceServer::aborted(SessionId id)
{
    removeSession(id);
}

// Read a data source id from in and return the (linked) matching data source,
// or NULL if the id is not known.
IFvDataSource * RemoteDataSourceServer::readDataSource(MemoryBuffer & in)
{
    unique_id_t id;
    in.read(id);

    return getDataSource(id);
}

// Remove every entry that was registered by the given session.
void RemoteDataSourceServer::removeSession(SessionId id)
{
    DBGLOG("RemoteFileView:Session Died");
    CriticalBlock block(cs);
    // Iterate in reverse so that removing an item does not disturb the
    // positions of the items still to be visited.
    ForEachItemInRev(idx, entries)
    {
        RemoteDataEntry & cur = entries.item(idx);
        if (cur.session == id)
        {
            DBGLOG("RemoteFileView:Instance Died %" I64F "d", cur.id);
            entries.remove(idx);
        }
    }
}

//MORE: If this is ever actually used then it should probably have several threads
//      processing the commands, especially if the commands can involve lots of processing.

// Thread body: receive requests on MPTAG_FILEVIEW, dispatch on the leading
// command byte and send back the reply, until alive is cleared.
// Each reply begins with a serialized exception slot: empty on success,
// otherwise the IException raised while processing the command.
int RemoteDataSourceServer::run()
{
    CMessageBuffer msg;
    MemoryBuffer result;
    INode * sender;
    while (alive)
    {
        msg.clear();
        if (queryWorldCommunicator().recv(msg, 0, MPTAG_FILEVIEW, &sender))
        {
            msg.setEndian(__BIG_ENDIAN);
            result.setEndian(__BIG_ENDIAN);
            try
            {
                // Start the reply with the "no exception" marker.
                serializeException(NULL, result.clear());

                byte cmd;
                msg.read(cmd);
                switch (cmd)
                {
                case FVCMDrow:
                    doCmdRow(false, msg, result);
                    break;
                case FVCMDraw:
                    doCmdRow(true, msg, result);
                    break;
                case FVCMDnumrows:
                    doCmdNumRows(msg, result);
                    break;
                case FVCMDcreatewu:
                    doCmdCreateWorkunit(msg, result);
                    break;
                case FVCMDcreatefile:
                    doCmdCreateFile(msg, result);
                    break;
                case FVCMDdestroy:
                    doCmdDestroy(msg, result);
                    break;
                case FVCMDfetch:
                    doCmdFetch(false, msg, result);
                    break;
                case FVCMDfetchraw:
                    doCmdFetchRaw(false, msg, result);
                    break;
                default:
                    throwError(FVERR_UnknownRemoteCommand);
                }

                msg.clear().append(result);
            }
            catch (IException * e)
            {
                // Discard any partial result and send back the exception instead.
                serializeException(e, msg.clear());
                e->Release();
            }

            queryWorldCommunicator().reply(msg, MP_ASYNC_SEND);
            ::Release(sender);
        }
    }
    server = NULL;
    return 0;
}

// Ask the thread to finish: clear alive, cancel the pending receive and wait
// for run() to exit.
void RemoteDataSourceServer::stop()
{
    alive = false;
    queryWorldCommunicator().cancel(0, MPTAG_FILEVIEW);
    join();
}

// Create and start the server thread if one has not already been created.
extern FILEVIEW_API void startRemoteDataSourceServer(const char * queue, const char * cluster)
{
    //This isn't properly thread safe - it also isn't ever used in practice, so not a problem.
    if (!server)
    {
        server = new RemoteDataSourceServer(queue, cluster);
        server->start();
    }
}

// Stop the server thread, if there is one.
extern FILEVIEW_API void stopRemoteDataSourceServer()
{
    if (server)
        server->stop();
}

// Open workunit wuid and return the result selected by name/sequence.
// Returns NULL if the workunit could not be opened (consistent with
// secResolveResult()).
IConstWUResult * resolveResult(const char * wuid, unsigned sequence, const char * name)
{
    // NOTE(review): the template arguments on factory/wu were restored from the
    // workunit API - confirm against workunit.hpp.
    Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
    Owned<IConstWorkUnit> wu = factory->openWorkUnit(wuid, false);
    return (wu) ? getWorkUnitResult(wu, name, sequence) : NULL;
}

// As resolveResult(), but the workunit factory is obtained via the supplied
// security manager and user.
// Returns NULL if the workunit could not be opened.
IConstWUResult * secResolveResult(ISecManager &secmgr, ISecUser &secuser, const char * wuid, unsigned sequence, const char * name)
{
    // NOTE(review): the template arguments on factory/wu were restored from the
    // workunit API - confirm against workunit.hpp.
    Owned<IWorkUnitFactory> factory = getSecWorkUnitFactory(secmgr, secuser);
    Owned<IConstWorkUnit> wu = factory->openWorkUnit(wuid, false);
    return (wu) ? getWorkUnitResult(wu, name, sequence) : NULL;
}