// sqs.h
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2017 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #ifndef SQSHPCC_H
  14. #define SQSHPCC_H
  15. #ifdef _WIN32
  16. #define ECL_SQS_CALL _cdecl
  17. #else
  18. #define ECL_SQS_CALL
  19. #endif
  20. #ifdef ECL_SQS_EXPORTS
  21. #define ECL_SQS_API DECL_EXPORT
  22. #else
  23. #define ECL_SQS_API DECL_IMPORT
  24. #endif
  25. #include "platform.h"
  26. #include "jthread.hpp"
  27. #include "hqlplugins.hpp"
  28. #include "eclrtl_imp.hpp"
  29. #include "eclhelper.hpp"
  30. #include <aws/core/Aws.h>
  31. #include <aws/sqs/SQSClient.h>
  32. #include <iostream>
  33. #include <fstream>
  34. #include <stdio.h>
  35. #include <string>
  36. #include <unistd.h>
  37. #include <libgen.h>
  38. #include <atomic>
  39. #ifdef ECL_SQS_EXPORTS
  40. extern "C"
  41. {
  42. ECL_SQS_API bool getECLPluginDefinition(ECLPluginDefinitionBlock *pb);
  43. ECL_SQS_API void setPluginContext(IPluginContext * _ctx);
  44. }
  45. #endif
  46. extern "C++"
  47. {
  48. namespace SQSHPCCPlugin
  49. {
  // NOTE(review): a using-directive in a header makes every std name visible
  // inside SQSHPCCPlugin for all files that include this header. Code that
  // relies on unqualified std names must be qualified before it can be removed.
  50. using namespace std;
  /**
   * Result record returned by the SQSHPCC queue operations.
   *
   * Every field has a default value, so a default-constructed Response never
   * exposes indeterminate data. The struct has no user-provided constructor,
   * so it can still be brace-initialised member by member (C++14 and later).
   */
  struct Response
  {
      int code = 0;          ///< Numeric result code; values are chosen by the implementation
      std::string body;      ///< Text accompanying the result (empty by default)
      bool success = false;  ///< Success flag; defaults to "not successful"
  };
  /**
   * Minimal wrapper around an atomic integer counter.
   *
   * The counter is explicitly initialised to zero: before C++20 a
   * default-constructed std::atomic<int> holds an indeterminate value, so
   * without the initialiser the first get() could return garbage.
   */
  struct AtomicCounter
  {
      std::atomic<int> counter{0};

      /** Atomically adds one to the counter. */
      void increment()
      {
          ++counter;
      }

      /** Atomically subtracts one from the counter. */
      void decrement()
      {
          --counter;
      }

      /** @return the current counter value (atomic load). */
      int get() const
      {
          return counter.load();
      }
  };
  70. class SQSHPCC
  71. {
  72. public:
  73. explicit SQSHPCC(const std::string& _queueName);
  74. ~SQSHPCC();
  75. Response sendMessage(const char* message,const char* messagecount);
  76. Response createQueue();
  77. Response deleteQueue();
  78. Response deleteMessage(const std::string& message);
  79. Response receiveMessage();
  80. void setSQSConfiguration(const std::string& protocol,const std::string& region,const bool useProxy, const std::string& proxyHost,const unsigned proxyPort, const std::string& proxyUsername, const std::string& proxyPassword);
  81. void setAwsCredentials(const char* accessKeyId,
  82. const char* secretKey);
  83. bool disconnect();
  84. bool QueueExists();
  85. void setQueueUrlFromQueueName();
  86. protected:
  87. private:
  88. std::string queueName;
  89. Aws::String queueUrl;
  90. std::ofstream handlelog;
  91. AtomicCounter counter;
  92. Aws::SQS::SQSClient* sqsClient;
  93. Aws::SDKOptions options;
  94. Aws::Auth::AWSCredentials* credentials=nullptr;
  95. bool RegionExists(const std::string& region);
  96. const char *const getRegion(const std::string& region);
  97. void upstr(char* s);
  98. std::string convertAwsStringToCharPtr(Aws::String str);
  99. char* convertStringToChar(const string& str);
  100. };
  101. //----------------------------------------------------------------------
  102. /**
  103. * Queues the message for publishing to aws queue
  104. *
  105. * @param Reqion
  106. * @param QueueName
  107. * @param message The message to send
  108. *
  109. * @return true if the message was cached successfully
  110. */
  111. ECL_SQS_API bool ECL_SQS_CALL publishMessage(ICodeContext * ctx,const char* region, const char* queueName, const char* message, bool useProxy, const char* proxyHost, __int32 proxyPort, const char* proxyUsername, const char* proxyPassword);
  112. //---------------------------------------------------------------------
  113. ECL_SQS_API bool ECL_SQS_CALL publishOrderedMessage(ICodeContext * ctx,const char* region, const char* queueName, const char* message, const char* messageCount,const bool useProxy, const char* proxyHost, __int32 proxyPort, const char* proxyUsername, const char* proxyPassword);
  114. /**
  115. *
  116. *
  117. */
  118. ECL_SQS_API bool ECL_SQS_CALL createQueue(ICodeContext * ctx,const char* region, const char* queueName);
  119. ECL_SQS_API bool ECL_SQS_CALL QueueExists(ICodeContext* ctx,const char* region, const char* queueName);
  120. ECL_SQS_API bool ECL_SQS_CALL deleteQueue(ICodeContext * ctx,const char* region, const char* queueName);
  121. }
  122. }
  123. #endif