Преглед на файлове

HPCC-23215 Experimental branch that allows reading and writing to S3 buckets

Signed-off-by: Gavin Halliday <gavin.halliday@lexisnexis.com>
Gavin Halliday преди 5 години
родител
ревизия
d10eb622c3

+ 3 - 0
common/remote/hooks/CMakeLists.txt

@@ -30,3 +30,6 @@ ENDIF()
 IF (USE_LIBARCHIVE)
   add_subdirectory(libarchive)
 ENDIF()
+IF (USE_AWS)
+  add_subdirectory(s3)
+ENDIF()

+ 50 - 0
common/remote/hooks/s3/CMakeLists.txt

@@ -0,0 +1,50 @@
+################################################################################
+#    HPCC SYSTEMS software Copyright (C) 2019 HPCC Systems®.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License");
+#    you may not use this file except in compliance with the License.
+#    You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS,
+#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#    See the License for the specific language governing permissions and
+#    limitations under the License.
+################################################################################
+
+
+# Component: s3file
+#####################################################
+# Description:
+# ------------
+#    Cmake Input File for amazon s3 direct reading hook
+#####################################################
+
+project( s3file )
+set(AWS_SDK_CPP_SOURCE_DIR ${HPCC_SOURCE_DIR}/system/aws/aws-sdk-cpp)
+
+set (    SRCS
+         s3file.cpp
+         s3file.hpp
+    )
+
+include_directories (
+         ${HPCC_SOURCE_DIR}/system/include
+         ${HPCC_SOURCE_DIR}/system/jlib
+         ${AWS_SDK_CPP_SOURCE_DIR}/aws-cpp-sdk-core/include
+         ${AWS_SDK_CPP_SOURCE_DIR}/aws-cpp-sdk-s3/include
+    )
+
+ADD_DEFINITIONS( -D_USRDLL -DS3FILE_EXPORTS )
+
+HPCC_ADD_LIBRARY( s3file SHARED ${SRCS}  )
+install ( TARGETS s3file DESTINATION filehooks )
+target_link_options(s3file  PUBLIC  "LINKER:-z,defs" )
+
+target_link_libraries ( s3file
+    jlib
+    aws-cpp-sdk-core
+    aws-cpp-sdk-s3
+)

+ 721 - 0
common/remote/hooks/s3/s3file.cpp

@@ -0,0 +1,721 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2020 HPCC Systems®.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+#include "platform.h"
+
+#include "jlib.hpp"
+#include "jio.hpp"
+
+#include "jmutex.hpp"
+#include "jfile.hpp"
+#include "jregexp.hpp"
+#include "jstring.hpp"
+#include "jlog.hpp"
+
+#include "s3file.hpp"
+
+#include <aws/core/Aws.h>
+#include <aws/core/auth/AWSCredentialsProvider.h>
+#include <aws/s3/S3Client.h>
+#include <aws/s3/model/GetObjectRequest.h>
+#include <aws/s3/model/HeadObjectRequest.h>
+#include <aws/s3/model/PutObjectRequest.h>
+#include <aws/s3/model/DeleteObjectRequest.h>
+
+/*
+ * S3 questions:
+ *
+ * Where would we store access id/secrets?
+ *     Currently uses the default credential manager, which gets them from environment variables, or passed in
+ *     when running on an AWS instance.
+ * What is the latency on file access?  ~200ms.
+ * What is the cost of accessing the data?  $0.4/million GET requests (including requesting meta)
+ * How do you efficiently page results from a S3 bucket?  You can perform a range get, but you'll need to pay for each call.
+ * You can get the length of an object using HeadObject..getContentLength, but you will get charged - so for small files it is better to just get it.
+ *     Probably best to request the first 10Mb, and if that returns a full 10Mb then request the size information.
+ * S3 does support partial writes in chunks of 5Mb or more.  It may be simpler to create a file locally and then submit it in a single action.
+ *
+ * This is currently a proof of concept implementation.  The following changes are required for a production version:
+ * - Revisit credentials
+ * - Support chunked writes.
+ * - Implement directory iteration.
+ * - Ideally switch engines to use streaming interfaces for reading and writing files.
+ * - Investigate adding a read-ahead thread to read the next block of the file.
+ */
+
+//#define TRACE_S3
+//#define TEST_S3_PAGING
+//#define FIXED_CREDENTIALS
+
+constexpr const char * s3FilePrefix = "s3://";
+
+#ifdef TEST_S3_PAGING
+constexpr offset_t awsReadRequestSize = 50;
+#else
+constexpr offset_t awsReadRequestSize = 0x400000;  // Default to requesting 4Mb each time
+#endif
+
+static Aws::SDKOptions options;
+MODULE_INIT(INIT_PRIORITY_HQLINTERNAL)
+{
+    Aws::InitAPI(options);
+    return true;
+}
+MODULE_EXIT()
+{
+    Aws::ShutdownAPI(options);
+}
+
+//---------------------------------------------------------------------------------------------------------------------
+
+class S3File;
+class S3FileReadIO : implements CInterfaceOf<IFileIO>
+{
+public:
+    S3FileReadIO(S3File * _file, Aws::S3::Model::GetObjectOutcome & firstRead, FileIOStats & _stats);
+
+    virtual size32_t read(offset_t pos, size32_t len, void * data) override;
+    virtual offset_t size() override;
+    virtual void close() override
+    {
+    }
+
+    // Write methods not implemented - this is a read-only file
+    virtual size32_t write(offset_t pos, size32_t len, const void * data) override
+    {
+        throwUnexpected();
+        return 0;
+    }
+    virtual offset_t appendFile(IFile *file,offset_t pos=0,offset_t len=(offset_t)-1) override
+    {
+        throwUnexpected();
+        return 0;
+    }
+    virtual void setSize(offset_t size) override
+    {
+        throwUnexpected();
+    }
+    virtual void flush() override
+    {
+        //Could implement if we use the async version of the putObject call.
+    }
+    unsigned __int64 getStatistic(StatisticKind kind) override;
+
+protected:
+    size_t extractDataFromResult(size_t offset, size_t length, void * target);
+
+protected:
+    Linked<S3File> file;
+    CriticalSection cs;
+    offset_t startResultOffset = 0;
+    offset_t endResultOffset = 0;
+    Aws::S3::Model::GetObjectOutcome readResult;
+    FileIOStats stats;
+};
+
+class S3FileWriteIO : implements CInterfaceOf<IFileIO>
+{
+public:
+    S3FileWriteIO(S3File * _file);
+    virtual void beforeDispose() override;
+
+    virtual size32_t read(offset_t pos, size32_t len, void * data) override
+    {
+        throwUnexpected();
+    }
+
+    virtual offset_t size() override
+    {
+        throwUnexpected();
+    }
+
+    virtual void close() override;
+    virtual offset_t appendFile(IFile *file,offset_t pos=0,offset_t len=(offset_t)-1) override;
+    virtual size32_t write(offset_t pos, size32_t len, const void * data) override;
+    virtual void setSize(offset_t size) override;
+    virtual void flush() override;
+
+    unsigned __int64 getStatistic(StatisticKind kind) override;
+
+protected:
+    Linked<S3File> file;
+    CriticalSection cs;
+    FileIOStats stats;
+    bool blobWritten = false;
+};
+
+class S3File : implements CInterfaceOf<IFile>
+{
+    friend class S3FileReadIO;
+    friend class S3FileWriteIO;
+public:
+    S3File(const char *_s3FileName);
+    virtual bool exists()
+    {
+        ensureMetaData();
+        return fileExists;
+    }
+    virtual bool getTime(CDateTime * createTime, CDateTime * modifiedTime, CDateTime * accessedTime);
+    virtual fileBool isDirectory()
+    {
+        ensureMetaData();
+        if (!fileExists)
+            return notFound;
+        return isDir ? foundYes : foundNo;
+    }
+    virtual fileBool isFile()
+    {
+        ensureMetaData();
+        if (!fileExists)
+            return notFound;
+        return !isDir ? foundYes : foundNo;
+    }
+    virtual fileBool isReadOnly()
+    {
+        ensureMetaData();
+        if (!fileExists)
+            return notFound;
+        return foundYes;
+    }
+    virtual IFileIO * open(IFOmode mode, IFEflags extraFlags=IFEnone)
+    {
+        if (mode == IFOcreate)
+            return createFileWriteIO();
+        assertex(mode==IFOread && fileExists);
+        return createFileReadIO();
+    }
+    virtual IFileAsyncIO * openAsync(IFOmode mode)
+    {
+        UNIMPLEMENTED;
+    }
+    virtual IFileIO * openShared(IFOmode mode, IFSHmode shmode, IFEflags extraFlags=IFEnone)
+    {
+        if (mode == IFOcreate)
+            return createFileWriteIO();
+        assertex(mode==IFOread && fileExists);
+        return createFileReadIO();
+    }
+    virtual const char * queryFilename()
+    {
+        return fullName.str();
+    }
+    virtual offset_t size()
+    {
+        ensureMetaData();
+        return fileSize;
+    }
+
+// Directory functions
+    virtual IDirectoryIterator *directoryFiles(const char *mask, bool sub, bool includeDirs)
+    {
+        UNIMPLEMENTED;
+        return createNullDirectoryIterator();
+    }
+    virtual bool getInfo(bool &isdir,offset_t &size,CDateTime &modtime)
+    {
+        ensureMetaData();
+        isdir = isDir;
+        size = fileSize;
+        modtime.clear();
+        return true;
+    }
+
+    // Not going to be implemented - this IFile interface is too big..
+    virtual bool setTime(const CDateTime * createTime, const CDateTime * modifiedTime, const CDateTime * accessedTime) { UNIMPLEMENTED; }
+    virtual bool remove();
+    virtual void rename(const char *newTail) { UNIMPLEMENTED; }
+    virtual void move(const char *newName) { UNIMPLEMENTED; }
+    virtual void setReadOnly(bool ro) { UNIMPLEMENTED; }
+    virtual void setFilePermissions(unsigned fPerms) { UNIMPLEMENTED; }
+    virtual bool setCompression(bool set) { UNIMPLEMENTED; }
+    virtual offset_t compressedSize() { UNIMPLEMENTED; }
+    virtual unsigned getCRC() { UNIMPLEMENTED; }
+    virtual void setCreateFlags(unsigned short cflags) { UNIMPLEMENTED; }
+    virtual void setShareMode(IFSHmode shmode) { UNIMPLEMENTED; }
+    virtual bool createDirectory() { UNIMPLEMENTED; }
+    virtual IDirectoryDifferenceIterator *monitorDirectory(
+                                  IDirectoryIterator *prev=NULL,    // in (NULL means use current as baseline)
+                                  const char *mask=NULL,
+                                  bool sub=false,
+                                  bool includedirs=false,
+                                  unsigned checkinterval=60*1000,
+                                  unsigned timeout=(unsigned)-1,
+                                  Semaphore *abortsem=NULL)  { UNIMPLEMENTED; }
+    virtual void copySection(const RemoteFilename &dest, offset_t toOfs=(offset_t)-1, offset_t fromOfs=0, offset_t size=(offset_t)-1, ICopyFileProgress *progress=NULL, CFflags copyFlags=CFnone) { UNIMPLEMENTED; }
+    virtual void copyTo(IFile *dest, size32_t buffersize=DEFAULT_COPY_BLKSIZE, ICopyFileProgress *progress=NULL, bool usetmp=false, CFflags copyFlags=CFnone) { UNIMPLEMENTED; }
+    virtual IMemoryMappedFile *openMemoryMapped(offset_t ofs=0, memsize_t len=(memsize_t)-1, bool write=false)  { UNIMPLEMENTED; }
+
+protected:
+    void readBlob(Aws::S3::Model::GetObjectOutcome & readResult, FileIOStats & stats, offset_t from = 0, offset_t length = unknownFileSize);
+    size32_t writeBlob(size32_t len, const void * data, FileIOStats & stats);
+    void ensureMetaData();
+    void gatherMetaData();
+    IFileIO * createFileReadIO();
+    IFileIO * createFileWriteIO();
+
+protected:
+    StringBuffer fullName;
+    StringBuffer bucketName;
+    StringBuffer objectName;
+    offset_t fileSize = unknownFileSize;
+    bool haveMeta = false;
+    bool isDir = false;
+    bool fileExists = false;
+    int64_t modifiedMsTime = 0;
+    CriticalSection cs;
+};
+
+//---------------------------------------------------------------------------------------------------------------------
+
+S3FileReadIO::S3FileReadIO(S3File * _file, Aws::S3::Model::GetObjectOutcome & firstRead, FileIOStats & _firstStats)
+: file(_file), readResult(std::move(firstRead)), stats(_firstStats)
+{
+    startResultOffset = 0;
+    endResultOffset = readResult.GetResult().GetContentLength();
+}
+
+//Read len bytes at position pos into data, satisfying as much of the request as possible from
+//the cached GetObject result and issuing further ranged GetObject requests for the remainder.
+//Returns the number of bytes actually read (0 for a read at or beyond the end of the file).
+size32_t S3FileReadIO::read(offset_t pos, size32_t len, void * data)
+{
+    //Clip the request to the known file size (createFileReadIO guarantees fileSize is valid)
+    if (pos > file->fileSize)
+        return 0;
+    if (pos + len > file->fileSize)
+        len = file->fileSize - pos;
+    if (len == 0)
+        return 0;
+
+    size32_t sizeRead = 0;
+    offset_t lastOffset = pos + len;
+
+    // MORE: Do we ever read file IO from more than one thread?  I'm not convinced we do, and the critical blocks waste space and slow it down.
+    //It might be worth revisiting (although I'm not sure what effect stranding has) - by revisiting the primary interface used to read files.
+    CriticalBlock block(cs);
+    for(;;)
+    {
+        //Check if part of the request can be fulfilled from the current read block
+        if (pos >= startResultOffset && pos < endResultOffset)
+        {
+            size_t copySize = ((lastOffset > endResultOffset) ? endResultOffset : lastOffset) - pos;
+            size_t extractedSize = extractDataFromResult((pos - startResultOffset), copySize, data);
+            assertex(copySize == extractedSize);
+            pos += copySize;
+            len -= copySize;
+            data = (byte *)data + copySize;
+            sizeRead += copySize;
+            if (len == 0)
+                return sizeRead;
+        }
+
+#ifdef TEST_S3_PAGING
+        offset_t readSize = awsReadRequestSize;
+#else
+        //Request at least awsReadRequestSize bytes so later sequential reads hit the cached block
+        offset_t readSize = (len > awsReadRequestSize) ? len : awsReadRequestSize;
+#endif
+
+        //Fetch the next block starting at the current position; a failed request returns what we have so far
+        file->readBlob(readResult, stats, pos, readSize);
+        if (!readResult.IsSuccess())
+            return sizeRead;
+        offset_t contentSize = readResult.GetResult().GetContentLength();
+        //If the results are inconsistent then do not loop forever
+        if (contentSize == 0)
+            return sizeRead;
+
+        startResultOffset = pos;
+        endResultOffset = pos + contentSize;
+    }
+}
+
+offset_t S3FileReadIO::size()
+{
+    return file->fileSize;
+}
+
+size_t S3FileReadIO::extractDataFromResult(size_t offset, size_t length, void * target)
+{
+    auto & contents = readResult.GetResultWithOwnership().GetBody();
+    auto buffer = contents.rdbuf();
+    buffer->pubseekoff(0, std::ios_base::beg, std::ios_base::in);
+    return buffer->sgetn((char *)target, length);
+}
+
+unsigned __int64 S3FileReadIO::getStatistic(StatisticKind kind)
+{
+    return stats.getStatistic(kind);
+}
+
+//---------------------------------------------------------------------------------------------------------------------
+
+S3FileWriteIO::S3FileWriteIO(S3File * _file)
+: file(_file)
+{
+}
+
+void S3FileWriteIO::beforeDispose()
+{
+    try
+    {
+        close();
+    }
+    catch (...)
+    {
+    }
+}
+
+void S3FileWriteIO::close()
+{
+    CriticalBlock block(cs);
+    if (!blobWritten)
+        file->writeBlob(0, nullptr, stats);
+}
+
+offset_t S3FileWriteIO::appendFile(IFile *file, offset_t pos, offset_t len)
+{
+    throwUnexpected();
+    return 0;
+}
+
+size32_t S3FileWriteIO::write(offset_t pos, size32_t len, const void * data)
+{
+    if (len)
+    {
+        CriticalBlock block(cs);
+        //Very strange semantics for a proof of concept - only allow a single write to the file.
+        //A full implementation will need to either
+        //  write to a temporary file, and then copy to the s3 file when the file is closed.
+        //  use the multi-part upload functionality (has a minimum part size of 5Mb)
+        assertex(!blobWritten);
+        file->writeBlob(len, data, stats);
+        blobWritten = true;
+    }
+    return len;
+}
+
+void S3FileWriteIO::setSize(offset_t size)
+{
+    UNIMPLEMENTED;
+}
+
+void S3FileWriteIO::flush()
+{
+}
+
+unsigned __int64 S3FileWriteIO::getStatistic(StatisticKind kind)
+{
+    return stats.getStatistic(kind);
+}
+
+//---------------------------------------------------------------------------------------------------------------------
+
+static Aws::S3::S3Client createAwsClient()
+{
+    //There should be a default region, (and a default client), but allow the region to be overridden with s3:@region//,
+    //in which a new client would be created.
+    // Set up the request
+    Aws::Client::ClientConfiguration configuration;
+    configuration.region = "eu-west-2";
+
+#ifdef FIXED_CREDENTIALS
+    //The following code allows the access id/secret to come from a value that had been saved away in a secrets manager
+    constexpr const char * myAccessKeyId = "<id>";
+    constexpr const char * myAccessKeySecret = "<secret>";
+    auto credentials = std::make_shared<Aws::Auth::SimpleAWSCredentialsProvider>(Aws::String(myAccessKeyId), Aws::String(myAccessKeySecret));
+    return Aws::S3::S3Client(credentials, configuration);
+#else
+    //Retrieve the details from environment variables/file/current environment
+    return Aws::S3::S3Client(configuration);
+#endif
+}
+
+static Aws::S3::S3Client & getAwsClient()
+{
+    static Aws::S3::S3Client client = createAwsClient();
+    return client;
+}
+
+
+S3File::S3File(const char *_s3FileName) : fullName(_s3FileName)
+{
+    const char * filename = fullName + strlen(s3FilePrefix);
+    const char * slash = strchr(filename, '/');
+    assertex(slash);
+
+    bucketName.append(slash-filename, filename);
+    objectName.set(slash+1);
+}
+
+bool S3File::getTime(CDateTime * createTime, CDateTime * modifiedTime, CDateTime * accessedTime)
+{
+    ensureMetaData();
+    if (createTime)
+        createTime->clear();
+    if (modifiedTime)
+    {
+        modifiedTime->clear();
+        modifiedTime->set((time_t)(modifiedMsTime / 1000));
+    }
+    if (accessedTime)
+        accessedTime->clear();
+    return false;
+}
+
+void S3File::readBlob(Aws::S3::Model::GetObjectOutcome & readResult, FileIOStats & stats, offset_t from, offset_t length)
+{
+    Aws::S3::S3Client & s3_client = getAwsClient();
+
+    Aws::S3::Model::GetObjectRequest object_request;
+    object_request.SetBucket(bucketName);
+    object_request.SetKey(objectName);
+    if ((from != 0) || (length != unknownFileSize))
+    {
+        StringBuffer range;
+        range.append("bytes=").append(from).append("-");
+        if (length != unknownFileSize)
+            range.append(from + length - 1);
+        object_request.SetRange(Aws::String(range));
+    }
+
+    // Get the object
+
+    CCycleTimer timer;
+    readResult = s3_client.GetObject(object_request);
+    stats.ioReads++;
+    stats.ioReadCycles += timer.elapsedCycles();
+    stats.ioReadBytes += readResult.GetResult().GetContentLength();
+
+#ifdef TRACE_S3
+    if (!readResult.IsSuccess())
+    {
+        auto error = readResult.GetError();
+        DBGLOG("ERROR: %s: %s", error.GetExceptionName().c_str(), error.GetMessage().c_str());
+    }
+#endif
+}
+
+size32_t S3File::writeBlob(size32_t len, const void * data, FileIOStats & stats)
+{
+    Aws::S3::S3Client & s3_client = getAwsClient();
+
+    Aws::S3::Model::PutObjectOutcome writeResult;
+    Aws::S3::Model::PutObjectRequest writeRequest;
+    writeRequest.WithBucket(bucketName).WithKey(objectName);
+
+    auto body = std::make_shared<std::stringstream>(std::stringstream::in | std::stringstream::out | std::stringstream::binary);
+    body->write(reinterpret_cast<const char*>(data), len);
+
+    writeRequest.SetBody(body);
+
+    CCycleTimer timer;
+    writeResult = s3_client.PutObject(writeRequest);
+    stats.ioWrites++;
+    stats.ioWriteCycles += timer.elapsedCycles();
+    stats.ioWriteBytes += len;
+
+#ifdef TRACE_S3
+    if (!writeResult.IsSuccess())
+    {
+        auto error = writeResult.GetError();
+        DBGLOG("ERROR: %s: %s", error.GetExceptionName().c_str(), error.GetMessage().c_str());
+        return 0;
+    }
+#endif
+    return len;
+}
+
+IFileIO * S3File::createFileReadIO()
+{
+    //Read the first chunk of the file.  If it is the full file then fill in the meta information, otherwise
+    //ensure the meta information is calculated before creating the file IO object
+    Aws::S3::Model::GetObjectOutcome readResult;
+    FileIOStats readStats;
+
+    CriticalBlock block(cs);
+    readBlob(readResult, readStats, 0, awsReadRequestSize);
+    if (!readResult.IsSuccess())
+        return nullptr;
+
+    if (!haveMeta)
+    {
+        offset_t readSize = readResult.GetResult().GetContentLength();
+
+        //If we read the entire file then we don't need to gather the meta to discover the file size.
+        if (readSize < awsReadRequestSize)
+        {
+            haveMeta = true;
+            fileExists = true;
+            fileSize = readResult.GetResult().GetContentLength();
+            modifiedMsTime = readResult.GetResult().GetLastModified().Millis();
+        }
+        else
+        {
+            gatherMetaData();
+            if (!fileExists)
+            {
+                DBGLOG("Internal consistency - read succeeded but head failed.");
+                return nullptr;
+            }
+        }
+    }
+
+    return new S3FileReadIO(this, readResult, readStats);
+}
+
+IFileIO * S3File::createFileWriteIO()
+{
+    return new S3FileWriteIO(this);
+}
+
+void S3File::ensureMetaData()
+{
+    CriticalBlock block(cs);
+    if (haveMeta)
+        return;
+
+    gatherMetaData();
+}
+
+void S3File::gatherMetaData()
+{
+    Aws::S3::S3Client & s3_client = getAwsClient();
+
+    Aws::S3::Model::HeadObjectRequest request;
+    request.SetBucket(bucketName);
+    request.SetKey(objectName);
+
+    // Get the object
+    Aws::S3::Model::HeadObjectOutcome headResult = s3_client.HeadObject(request);
+    if (headResult.IsSuccess())
+    {
+        fileExists = true;
+        fileSize = headResult.GetResult().GetContentLength();
+        modifiedMsTime = headResult.GetResult().GetLastModified().Millis();
+    }
+    else
+    {
+#ifdef TRACE_S3
+        auto error = headResult.GetError();
+        DBGLOG("ERROR: %s: %s", error.GetExceptionName().c_str(), error.GetMessage().c_str());
+#endif
+    }
+    haveMeta = true;
+}
+
+bool S3File::remove()
+{
+    Aws::S3::S3Client & s3_client = getAwsClient();
+
+    Aws::S3::Model::DeleteObjectRequest object_request;
+    object_request.SetBucket(bucketName);
+    object_request.SetKey(objectName);
+
+    // Get the object
+    Aws::S3::Model::DeleteObjectOutcome result = s3_client.DeleteObject(object_request);
+    if (result.IsSuccess())
+    {
+        CriticalBlock block(cs);
+        haveMeta = true;
+        fileExists = false;
+        fileSize = unknownFileSize;
+        return true;
+    }
+    else
+    {
+#ifdef TRACE_S3
+        auto error = result.GetError();
+        DBGLOG("ERROR: S3 Delete Object %s: %s", error.GetExceptionName().c_str(), error.GetMessage().c_str());
+#endif
+        return false;
+    }
+}
+
+
+//---------------------------------------------------------------------------------------------------------------------
+
+static IFile *createS3File(const char *s3FileName)
+{
+    return new S3File(s3FileName);
+}
+
+
+//---------------------------------------------------------------------------------------------------------------------
+
+class S3FileHook : public CInterfaceOf<IContainedFileHook>
+{
+public:
+    virtual IFile * createIFile(const char *fileName)
+    {
+        if (isS3FileName(fileName))
+            return createS3File(fileName);
+        else
+            return NULL;
+    }
+
+protected:
+    static bool isS3FileName(const char *fileName)
+    {
+        if (!startsWith(fileName, s3FilePrefix))
+            return false;
+        const char * filename = fileName + strlen(s3FilePrefix);
+        const char * slash = strchr(filename, '/');
+        if (!slash)
+            return false;
+        return true;
+    }
+} *s3FileHook;
+
+static CriticalSection *cs;
+
+extern S3FILE_API void installFileHook()
+{
+    CriticalBlock b(*cs); // Probably overkill!
+    if (!s3FileHook)
+    {
+        s3FileHook = new S3FileHook;
+        addContainedFileHook(s3FileHook);
+    }
+}
+
+extern S3FILE_API void removeFileHook()
+{
+    if (cs)
+    {
+        CriticalBlock b(*cs); // Probably overkill!
+        if (s3FileHook)
+        {
+            removeContainedFileHook(s3FileHook);
+            delete s3FileHook;
+            s3FileHook = NULL;
+        }
+    }
+}
+
+MODULE_INIT(INIT_PRIORITY_STANDARD)
+{
+    cs = new CriticalSection;
+    s3FileHook = NULL;  // Not really needed, but you have to have a modinit to match a modexit
+    return true;
+}
+
+MODULE_EXIT()
+{
+    if (s3FileHook)
+    {
+        removeContainedFileHook(s3FileHook);
+        s3FileHook = NULL;
+    }
+    ::Release(s3FileHook);
+    delete cs;
+    cs = NULL;
+}

+ 39 - 0
common/remote/hooks/s3/s3file.hpp

@@ -0,0 +1,39 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2020 HPCC Systems®.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+#ifndef S3FILE_HPP
+#define S3FILE_HPP
+
+#include "jfile.hpp"
+
+#ifdef S3FILE_EXPORTS
+#define S3FILE_API DECL_EXPORT
+#else
+#define S3FILE_API DECL_IMPORT
+#endif
+
+/*
+ * Direct access to files in s3 buckets
+ * Installs hooks into createIFile, spotting filenames of the form s3://url
+ */
+
+extern "C" {
+  extern S3FILE_API void installFileHook();
+  extern S3FILE_API void removeFileHook();
+};
+
+#endif

+ 9 - 3
ecl/eclcc/eclcc.cpp

@@ -1666,17 +1666,23 @@ void EclCC::processFile(EclCompileInstance & instance)
 {
     clearTransformStats();
 
-    const char * curFilename = instance.inputFile->queryFilename();
+    Linked<IFile> inputFile = instance.inputFile;
+    const char * curFilename = inputFile->queryFilename();
     assertex(curFilename);
     bool inputFromStdIn = streq(curFilename, "stdin:");
+
+    //Ensure the filename is fully expanded, but do not recreate the IFile if it was already absolute
     StringBuffer expandedSourceName;
-    if (!inputFromStdIn && !optNoSourcePath)
+    if (!inputFromStdIn && !optNoSourcePath && !isAbsolutePath(curFilename))
+    {
         makeAbsolutePath(curFilename, expandedSourceName);
+        inputFile.setown(createIFile(expandedSourceName));
+    }
     else
         expandedSourceName.append(curFilename);
 
     Owned<ISourcePath> sourcePath = (optNoSourcePath||inputFromStdIn) ? NULL : createSourcePath(expandedSourceName);
-    Owned<IFileContents> queryText = createFileContentsFromFile(expandedSourceName, sourcePath, false, NULL);
+    Owned<IFileContents> queryText = createFileContents(inputFile, sourcePath, false, NULL);
 
     const char * queryTxt = queryText->getText();
     if (optArchive || optGenerateDepend || optSaveQueryArchive)

+ 8 - 2
system/aws/CMakeLists.txt

@@ -14,21 +14,26 @@ if(USE_AWS)
         SOURCE_DIR ${AWS_SDK_CPP_SOURCE_DIR}
         BINARY_DIR ${CMAKE_CURRENT_BINARY_DIR}/aws-sdk-cpp
         CMAKE_ARGS -DCUSTOM_MEMORY_MANAGEMENT:string=0 -DCMAKE_CXX_FLAGS=${EXTERNAL_SQS_CXX_FLAGS}
-        BUILD_COMMAND ${CMAKE_MAKE_PROGRAM} LDFLAGS=-Wl,-rpath-link,${LIB_PATH} aws-cpp-sdk-sqs
+        BUILD_COMMAND ${CMAKE_MAKE_PROGRAM} LDFLAGS=-Wl,-rpath-link,${LIB_PATH} aws-cpp-sdk-sqs aws-cpp-sdk-s3
         INSTALL_COMMAND "")
 
     include(GNUInstallDirs)
     set(AWS_DEP_LIBDIR ${CMAKE_CURRENT_BINARY_DIR}/aws-sdk-cpp/.deps/install/${CMAKE_INSTALL_LIBDIR})
     add_library(aws-cpp-sdk-core SHARED IMPORTED GLOBAL)
     add_library(aws-cpp-sdk-sqs SHARED IMPORTED GLOBAL)
+    add_library(aws-cpp-sdk-s3 SHARED IMPORTED GLOBAL)
+
     set_property(TARGET aws-cpp-sdk-core
         PROPERTY IMPORTED_LOCATION ${CMAKE_CURRENT_BINARY_DIR}/aws-sdk-cpp/aws-cpp-sdk-core/libaws-cpp-sdk-core.so)
     set_property(TARGET aws-cpp-sdk-sqs
         PROPERTY IMPORTED_LOCATION ${CMAKE_CURRENT_BINARY_DIR}/aws-sdk-cpp/aws-cpp-sdk-sqs/libaws-cpp-sdk-sqs.so)
+    set_property(TARGET aws-cpp-sdk-s3
+        PROPERTY IMPORTED_LOCATION ${CMAKE_CURRENT_BINARY_DIR}/aws-sdk-cpp/aws-cpp-sdk-s3/libaws-cpp-sdk-s3.so)
     add_dependencies(aws-cpp-sdk-core aws-sdk-cpp)
     add_dependencies(aws-cpp-sdk-sqs aws-sdk-cpp)
+    add_dependencies(aws-cpp-sdk-s3 aws-sdk-cpp)
 
-    install(CODE "set(ENV{LD_LIBRARY_PATH} \"\$ENV{LD_LIBRARY_PATH}:${AWS_DEP_LIBDIR}:${PROJECT_BINARY_DIR}:${PROJECT_BINARY_DIR}/aws-sdk-cpp/aws-cpp-sdk-core:${PROJECT_BINARY_DIR}/aws-sdk-cpp/aws-cpp-sdk-sqs\")")
+    install(CODE "set(ENV{LD_LIBRARY_PATH} \"\$ENV{LD_LIBRARY_PATH}:${AWS_DEP_LIBDIR}:${PROJECT_BINARY_DIR}:${PROJECT_BINARY_DIR}/aws-sdk-cpp/aws-cpp-sdk-core:${PROJECT_BINARY_DIR}/aws-sdk-cpp/aws-cpp-sdk-sqs:${PROJECT_BINARY_DIR}/aws-sdk-cpp/aws-cpp-sdk-s3\")")
     install(FILES
         ${CMAKE_CURRENT_SOURCE_DIR}/aws-sdk-cpp/LICENSE.txt
         DESTINATION "."
@@ -44,6 +49,7 @@ if(USE_AWS)
     install(PROGRAMS
         ${CMAKE_CURRENT_BINARY_DIR}/aws-sdk-cpp/aws-cpp-sdk-core/libaws-cpp-sdk-core.so
         ${CMAKE_CURRENT_BINARY_DIR}/aws-sdk-cpp/aws-cpp-sdk-sqs/libaws-cpp-sdk-sqs.so
+        ${CMAKE_CURRENT_BINARY_DIR}/aws-sdk-cpp/aws-cpp-sdk-s3/libaws-cpp-sdk-s3.so
         ${AWS_DEP_LIBDIR}/libaws-c-common.so.1.0.0
         ${AWS_DEP_LIBDIR}/libaws-c-common.so
         ${AWS_DEP_LIBDIR}/libaws-c-common.so.0unstable

+ 1 - 1
system/jlib/jatomic.hpp

@@ -58,7 +58,7 @@ public:
     RelaxedAtomic() noexcept = default;
     inline constexpr RelaxedAtomic(T _value) noexcept : BASE(_value) { }
     ~RelaxedAtomic() noexcept = default;
-    RelaxedAtomic(const RelaxedAtomic&) = delete;
+    RelaxedAtomic(const RelaxedAtomic& _value) { BASE::store(_value.load()); }
     RelaxedAtomic& operator=(const RelaxedAtomic&) = delete;
 
     inline operator T() const noexcept { return load(); }

+ 77 - 28
system/jlib/jfile.cpp

@@ -1773,26 +1773,7 @@ offset_t CFileIO::appendFile(IFile *file,offset_t pos,offset_t len)
 
 unsigned __int64 CFileIO::getStatistic(StatisticKind kind)
 {
-    switch (kind)
-    {
-    case StCycleDiskReadIOCycles:
-        return ioReadCycles.load();
-    case StCycleDiskWriteIOCycles:
-        return ioWriteCycles.load();
-    case StTimeDiskReadIO:
-        return cycle_to_nanosec(ioReadCycles.load());
-    case StTimeDiskWriteIO:
-        return cycle_to_nanosec(ioWriteCycles.load());
-    case StSizeDiskRead:
-        return ioReadBytes.load();
-    case StSizeDiskWrite:
-        return ioWriteBytes.load();
-    case StNumDiskReads:
-        return ioReads.load();
-    case StNumDiskWrites:
-        return ioWrites.load();
-    }
-    return 0;
+    return stats.getStatistic(kind);
 }
 
 #ifdef _WIN32
@@ -1911,7 +1892,7 @@ void CFileIO::setSize(offset_t pos)
 
 // More errorno checking TBD
 CFileIO::CFileIO(HANDLE handle, IFOmode _openmode, IFSHmode _sharemode, IFEflags _extraFlags)
-    : ioReadCycles(0), ioWriteCycles(0), ioReadBytes(0), ioWriteBytes(0), ioReads(0), ioWrites(0), unflushedReadBytes(0), unflushedWriteBytes(0)
+    : unflushedReadBytes(0), unflushedWriteBytes(0)
 {
     assertex(handle != NULLFILE);
     throwOnError = false;
@@ -1998,9 +1979,9 @@ size32_t CFileIO::read(offset_t pos, size32_t len, void * data)
 
     CCycleTimer timer;
     size32_t ret = checked_pread(file, data, len, pos);
-    ioReadCycles.fetch_add(timer.elapsedCycles());
-    ioReadBytes.fetch_add(ret);
-    ++ioReads;
+    stats.ioReadCycles.fetch_add(timer.elapsedCycles());
+    stats.ioReadBytes.fetch_add(ret);
+    ++stats.ioReads;
 
     if ( (extraFlags & IFEnocache) && (ret > 0) )
     {
@@ -2037,9 +2018,9 @@ size32_t CFileIO::write(offset_t pos, size32_t len, const void * data)
 {
     CCycleTimer timer;
     size32_t ret = pwrite(file,data,len,pos);
-    ioWriteCycles.fetch_add(timer.elapsedCycles());
-    ioWriteBytes.fetch_add(ret);
-    ++ioWrites;
+    stats.ioWriteCycles.fetch_add(timer.elapsedCycles());
+    stats.ioWriteBytes.fetch_add(ret);
+    ++stats.ioWrites;
 
     if (ret==(size32_t)-1)
         throw makeErrnoException(errno, "CFileIO::write");
@@ -5113,9 +5094,41 @@ StringBuffer &makePathUniversal(const char *path, StringBuffer &out)
     return out;
 }
 
+//Treat a filename as absolute if:
+//  a) The filename begins with a path separator character   e.g. /home/hpcc/blah    \Users\hpcc\blah
+//  b) If there is a colon before the first path separator   e.g. c:\Users\hpcc\blah   s3://mycontainer/myblob
+//
+// Do not match:
+//  A) regress::myfile::x::y
+//  B) local/mydir
+//  C) mylocal
+bool isAbsolutePath(const char *path)
+{
+    if (!path||!*path)
+        return false;
+    if (isPathSepChar(path[0]))
+        return true;
+    const char * cur = path;
+    bool hadColon = false;
+    for (;;)
+    {
+        switch (*cur++)
+        {
+        case '/':
+        case '\\':
+            return hadColon;
+        case '\0':
+            return false;
+        case ':':
+            hadColon = true;
+            break;
+        }
+    }
+}
+
 StringBuffer &makeAbsolutePath(const char *relpath,StringBuffer &out, bool mustExist)
 {
-    if (isPathSepChar(relpath[0])&&(relpath[0]==relpath[1]))
+    if (isAbsolutePath(relpath))
     {
         if (mustExist)
         {
@@ -5125,6 +5138,7 @@ StringBuffer &makeAbsolutePath(const char *relpath,StringBuffer &out, bool mustE
         }
         return out.append(relpath); // if remote then already should be absolute
     }
+
 #ifdef _WIN32
     char rPath[MAX_PATH];
     char *filepart;
@@ -5141,6 +5155,7 @@ StringBuffer &makeAbsolutePath(const char *relpath,StringBuffer &out, bool mustE
     }
     out.append(rPath);
 #else
+
     StringBuffer expanded;
     //Expand ~ on the front of a filename - useful for paths not passed on the command line
     //Note, it does not support the ~user/ version of the syntax
@@ -6840,3 +6855,37 @@ IDirectoryIterator *getSortedDirectoryIterator(const char *dirName, SortDirector
     return getSortedDirectoryIterator(dir, mode, rev, mask, sub, includedirs);
 }
 
+//--------------------------------------------------------------------------------------------------------------------
+
+unsigned __int64 FileIOStats::getStatistic(StatisticKind kind)
+{
+    switch (kind)
+    {
+    case StCycleDiskReadIOCycles:
+        return ioReadCycles.load();
+    case StCycleDiskWriteIOCycles:
+        return ioWriteCycles.load();
+    case StTimeDiskReadIO:
+        return cycle_to_nanosec(ioReadCycles.load());
+    case StTimeDiskWriteIO:
+        return cycle_to_nanosec(ioWriteCycles.load());
+    case StSizeDiskRead:
+        return ioReadBytes.load();
+    case StSizeDiskWrite:
+        return ioWriteBytes.load();
+    case StNumDiskReads:
+        return ioReads.load();
+    case StNumDiskWrites:
+        return ioWrites.load();
+    }
+    return 0;
+}
+
+void FileIOStats::trace()
+{
+    if (ioReads)
+        printf("Reads: %u  Bytes: %u  TimeMs: %u\n", (unsigned)ioReads, (unsigned)ioReadBytes, (unsigned)cycle_to_millisec(ioReadCycles));
+    if (ioWrites)
+        printf("Writes: %u  Bytes: %u  TimeMs: %u\n", (unsigned)ioWrites, (unsigned)ioWriteBytes, (unsigned)cycle_to_millisec(ioWriteCycles));
+}
+

+ 21 - 8
system/jlib/jfile.hpp

@@ -41,6 +41,8 @@ enum IFSHmode { IFSHnone, IFSHread=0x8, IFSHfull=0x10};   // sharing modes
 enum IFSmode { IFScurrent = FILE_CURRENT, IFSend = FILE_END, IFSbegin = FILE_BEGIN };    // seek mode
 enum CFPmode { CFPcontinue, CFPcancel, CFPstop };    // modes for ICopyFileProgress::onProgress return
 enum IFEflags { IFEnone=0x0, IFEnocache=0x1, IFEcache=0x2 };    // mask
+constexpr offset_t unknownFileSize = -1;
+
 class CDateTime;
 
 interface IDirectoryIterator : extends IIteratorOf<IFile> 
@@ -344,7 +346,6 @@ extern jlib_decl ISerialStream *createMemorySerialStream(const void *buffer, mem
 extern jlib_decl ISerialStream *createMemoryBufferSerialStream(MemoryBuffer & buffer, IFileSerialStreamCallback *callback=NULL);
 
 
-
 typedef Linked<IFile> IFileAttr;
 typedef Linked<IFileIO> IFileIOAttr;
 typedef Linked<IFileIOStream> IFileIOStreamAttr;
@@ -538,13 +539,7 @@ inline const char *splitDirTail(const char *path,StringBuffer &dir)
     return tail;        
 }
 
-inline bool isAbsolutePath(const char *path)
-{   
-    if (!path||!*path)
-        return false;
-    return isPathSepChar(path[0])||((path[1]==':')&&(isPathSepChar(path[2])));
-}
-
+extern jlib_decl bool isAbsolutePath(const char *path);
 extern jlib_decl StringBuffer &makeAbsolutePath(const char *relpath,StringBuffer &out,bool mustExist=false);
 extern jlib_decl StringBuffer &makeAbsolutePath(StringBuffer &relpath,bool mustExist=false);
 extern jlib_decl StringBuffer &makeAbsolutePath(const char *relpath, const char *basedir, StringBuffer &out);
@@ -628,4 +623,22 @@ const static bool filenamesAreCaseSensitive = true;
 extern jlib_decl IDirectoryIterator *getSortedDirectoryIterator(IFile *directory, SortDirectoryMode mode = SD_byname, bool rev = false, const char *mask = nullptr, bool sub = false, bool includedirs = false);
 extern jlib_decl IDirectoryIterator *getSortedDirectoryIterator(const char *dirName, SortDirectoryMode mode = SD_byname, bool rev = false, const char *mask = nullptr, bool sub = false, bool includedirs = false);
 
+//--------------------------------------------------------------------------------------------------------------------
+
// Shared accumulator for low-level file io statistics.  Counters are
// RelaxedAtomic so concurrent readers/writers on the same IFileIO can update
// them cheaply without full sequential consistency (totals, not ordering,
// are what matters here).
class jlib_decl FileIOStats
{
public:
    // Return the counter matching a StatisticKind (0 if not tracked here).
    unsigned __int64 getStatistic(StatisticKind kind);
    // Print a summary of read/write totals to stdout (debugging aid).
    void trace();

public:
    RelaxedAtomic<cycle_t> ioReadCycles{0};    // total cycles spent inside read calls
    RelaxedAtomic<cycle_t> ioWriteCycles{0};   // total cycles spent inside write calls
    RelaxedAtomic<__uint64> ioReadBytes{0};    // total bytes successfully read
    RelaxedAtomic<__uint64> ioWriteBytes{0};   // total bytes successfully written
    RelaxedAtomic<__uint64> ioReads{0};        // number of read operations
    RelaxedAtomic<__uint64> ioWrites{0};       // number of write operations
};
+
+
 #endif

+ 1 - 6
system/jlib/jfile.ipp

@@ -117,12 +117,7 @@ protected:
     IFSHmode            sharemode;
     IFOmode             openmode;
     IFEflags            extraFlags;
-    RelaxedAtomic<cycle_t> ioReadCycles;
-    RelaxedAtomic<cycle_t> ioWriteCycles;
-    RelaxedAtomic<__uint64> ioReadBytes;
-    RelaxedAtomic<__uint64> ioWriteBytes;
-    RelaxedAtomic<__uint64> ioReads;
-    RelaxedAtomic<__uint64> ioWrites;
+    FileIOStats         stats;
     RelaxedAtomic<unsigned> unflushedReadBytes; // more: If this recorded flushedReadBytes it could have a slightly lower overhead
     RelaxedAtomic<unsigned> unflushedWriteBytes;
 private: