fvdsremote.ipp 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #ifndef FVDSREMOTE_IPP
  14. #define FVDSREMOTE_IPP
  15. #include "fvdatasource.hpp"
  16. #include "fvsource.ipp"
  17. #include "dadfs.hpp"
  18. #include "dasess.hpp"
  19. #include "mpbuff.hpp"
  20. enum {
  21. FVRemoteVersion1 = 1,
  22. CurRemoteVersion = FVRemoteVersion1
  23. };
  24. class RemoteDataSource : public ADataSource
  25. {
  26. public:
  27. RemoteDataSource(const SocketEndpoint & _serverEP, unique_id_t _id, IFvDataSourceMetaData * _metaData, __int64 _cachedNumRows, bool _isIndex);
  28. virtual bool init() { return true; }
  29. virtual IFvDataSourceMetaData * queryMetaData();
  30. virtual __int64 numRows(bool force = false);
  31. virtual bool fetchRow(MemoryBuffer & out, __int64 offset);
  32. virtual bool fetchRawRow(MemoryBuffer & out, __int64 offset);
  33. virtual bool getRow(MemoryBuffer & out, __int64 row);
  34. virtual bool getRawRow(MemoryBuffer & out, __int64 row);
  35. virtual bool isIndex() { return index; }
  36. virtual void onClose();
  37. virtual void onOpen();
  38. virtual void beforeDispose();
  39. virtual bool optimizeFilter(unsigned offset, unsigned len, const void * data) { return false; } // MORE: Needs implementing if this is ever used.
  40. protected:
  41. bool getARow(MemoryBuffer & out, RowCache & cache, byte cmd, __int64 row);
  42. void sendReceive(CMessageBuffer & msg);
  43. protected:
  44. const SocketEndpoint & serverEP;
  45. unique_id_t id;
  46. Owned<IFvDataSourceMetaData> metaData;
  47. Owned<INode> serverNode;
  48. RowCache rawRows;
  49. RowCache translatedRows;
  50. __int64 cachedNumRows;
  51. unsigned openCount;
  52. bool index;
  53. };
  54. class RemoteDataEntry : public CInterface
  55. {
  56. public:
  57. ~RemoteDataEntry();
  58. public:
  59. unique_id_t id;
  60. SessionId session;
  61. SubscriptionId subscription;
  62. Owned<IFvDataSource> ds;
  63. };
  64. class RemoteDataSourceServer : public Thread, public ISessionNotify
  65. {
  66. public:
  67. RemoteDataSourceServer(const char * _queue, const char * _cluster);
  68. IMPLEMENT_IINTERFACE
  69. //Thread
  70. virtual int run();
  71. //ISessionNotify
  72. virtual void closed(SessionId id);
  73. virtual void aborted(SessionId id);
  74. void stop();
  75. protected:
  76. unique_id_t addDataSource(SessionId session, IFvDataSource * ds);
  77. void doCmdFetch(bool raw, MemoryBuffer & in, MemoryBuffer & out);
  78. void doCmdFetchRaw(bool raw, MemoryBuffer & in, MemoryBuffer & out);
  79. void doCmdRow(bool raw, MemoryBuffer & in, MemoryBuffer & out);
  80. void doCmdRaw(MemoryBuffer & in, MemoryBuffer & out);
  81. void doCmdNumRows(MemoryBuffer & in, MemoryBuffer & out);
  82. void doCmdCreateWorkunit(MemoryBuffer & in, MemoryBuffer & out);
  83. void doCmdCreateFile(MemoryBuffer & in, MemoryBuffer & out);
  84. void doCmdDestroy(MemoryBuffer & in, MemoryBuffer & out);
  85. IFvDataSource * getDataSource(unique_id_t id);
  86. IFvDataSource * readDataSource(MemoryBuffer & in);
  87. void removeSession(SessionId id);
  88. protected:
  89. bool alive;
  90. unique_id_t nextId;
  91. CriticalSection cs;
  92. StringAttr queue;
  93. StringAttr cluster;
  94. CIArrayOf<RemoteDataEntry> entries;
  95. };
  96. #endif