|
@@ -10,7 +10,6 @@ using namespace std;
|
|
|
using std::string;
|
|
|
using std::vector;
|
|
|
|
|
|
-//#define EOL "\n\r"
|
|
|
#define EOL "\n"
|
|
|
|
|
|
tOffset getBlockSize(hdfsFS * filefs, const char * filename)
|
|
@@ -75,7 +74,7 @@ long getRecordCount(long fsize, int clustersize, int reclen, int nodeid)
|
|
|
if ((fsize / reclen) % clustersize >= nodeid + 1)
|
|
|
{
|
|
|
readSize += 1;
|
|
|
- fprintf(stderr, "\nThis node will stream one extra rec\n");
|
|
|
+ fprintf(stderr, "\nThis node will pipe one extra rec\n");
|
|
|
}
|
|
|
return readSize;
|
|
|
}
|
|
@@ -325,7 +324,7 @@ int readXMLOffset(hdfsFS * fs, const char * filename,
|
|
|
firstRowfound = strcmp(currentTag.c_str(),
|
|
|
openRowTag.c_str()) == 0;
|
|
|
if (firstRowfound)
|
|
|
- fprintf(stderr, "--start streaming tag %s at %lu--\n",
|
|
|
+ fprintf(stderr, "--start piping tag %s at %lu--\n",
|
|
|
currentTag.c_str(), currentPos);
|
|
|
}
|
|
|
|
|
@@ -360,7 +359,7 @@ int readXMLOffset(hdfsFS * fs, const char * filename,
|
|
|
&& strcmp(currentTag.c_str(), closeRowTag.c_str()) == 0)
|
|
|
{
|
|
|
fprintf(stdout, "%s", currentTag.c_str());
|
|
|
- fprintf(stderr, "--stop streaming at %s %lu--\n",
|
|
|
+ fprintf(stderr, "--stop piping at %s %lu--\n",
|
|
|
currentTag.c_str(), currentPos);
|
|
|
bytesLeft = 0;
|
|
|
break;
|
|
@@ -412,7 +411,7 @@ int readXMLOffset(hdfsFS * fs, const char * filename,
|
|
|
|
|
|
int readCSVOffset(hdfsFS * fs, const char * filename, unsigned long seekPos,
|
|
|
unsigned long readlen, const char * eolseq, unsigned long bufferSize, bool outputTerminator,
|
|
|
- unsigned long recLen, unsigned long maxlen, const char * quote)
|
|
|
+ unsigned long recLen, unsigned long maxLen, const char * quote)
|
|
|
{
|
|
|
fprintf(stderr, "CSV terminator: \'%s\' and quote: \'%c\'\n", eolseq, quote[0]);
|
|
|
unsigned long recsFound = 0;
|
|
@@ -514,7 +513,7 @@ int readCSVOffset(hdfsFS * fs, const char * filename, unsigned long seekPos,
|
|
|
currentPos = currentPos + eolseqlen - 1;
|
|
|
bytesLeft = bytesLeft - eolseqlen;
|
|
|
|
|
|
- fprintf(stderr, "\n--Start streaming: %ld--\n", currentPos);
|
|
|
+ fprintf(stderr, "\n--Start reading: %ld--\n", currentPos);
|
|
|
|
|
|
firstEOLfound = true;
|
|
|
continue;
|
|
@@ -534,7 +533,7 @@ int readCSVOffset(hdfsFS * fs, const char * filename, unsigned long seekPos,
|
|
|
//fprintf(stderr, "\nrecsfound: %ld", recsFound);
|
|
|
if (stopAtNextEOL)
|
|
|
{
|
|
|
- fprintf(stderr, "\n--Stop streaming: %ld--\n", currentPos);
|
|
|
+ fprintf(stderr, "\n--Stop piping: %ld--\n", currentPos);
|
|
|
//fprintf(stdout, "%s", eolseq);
|
|
|
bytesLeft = 0;
|
|
|
break;
|
|
@@ -555,7 +554,7 @@ int readCSVOffset(hdfsFS * fs, const char * filename, unsigned long seekPos,
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- //don't stream until we're beyond the first EOL (if offset = 0 start streaming ASAP)
|
|
|
+ //don't pipe until we're beyond the first EOL (if offset = 0 start piping ASAP)
|
|
|
if (firstEOLfound)
|
|
|
{
|
|
|
fprintf(stdout, "%c", currChar);
|
|
@@ -565,7 +564,7 @@ int readCSVOffset(hdfsFS * fs, const char * filename, unsigned long seekPos,
|
|
|
{
|
|
|
fprintf(stderr, "%c", currChar);
|
|
|
bytesLeft--;
|
|
|
- if(recLen > 0 && currentPos-seekPos > recLen * 100)
|
|
|
+ if(maxLen > 0 && currentPos-seekPos > maxLen * 10)
|
|
|
{
|
|
|
fprintf(stderr, "\nFirst EOL was not found within the first %lu bytes", currentPos-seekPos);
|
|
|
exit(-1);
|
|
@@ -575,7 +574,7 @@ int readCSVOffset(hdfsFS * fs, const char * filename, unsigned long seekPos,
|
|
|
if (stopAtNextEOL)
|
|
|
fprintf(stderr, "%c", currChar);
|
|
|
|
|
|
- // ok, so if bytesLeft <= 0 at this point, we need to keep reading
|
|
|
+ // ok, so if bytesLeft <= 0 at this point, we need to keep piping
|
|
|
// IF the last char read was not an EOL char
|
|
|
if (bytesLeft <= 0 && currChar != eolseq[0])
|
|
|
{
|
|
@@ -623,7 +622,7 @@ int readFileOffset(hdfsFS * fs, const char * filename, tOffset seekPos,
|
|
|
|
|
|
unsigned long currentPos = seekPos;
|
|
|
|
|
|
- fprintf(stderr, "\n--Start streaming: %ld--\n", currentPos);
|
|
|
+ fprintf(stderr, "\n--Start piping: %ld--\n", currentPos);
|
|
|
|
|
|
unsigned long bytesLeft = readlen;
|
|
|
while(hdfsAvailable(*fs, readFile) && bytesLeft >0)
|
|
@@ -821,7 +820,7 @@ int writeFlatOffset(hdfsFS * fs, const char * filename, unsigned nodeid, unsigne
|
|
|
size_t totalbytesread = 0;
|
|
|
size_t totalbyteswritten = 0;
|
|
|
|
|
|
- fprintf(stderr, "Writing %s to HDFS [.", filepartname);
|
|
|
+ fprintf(stderr, "Writing %s to HDFS.", filepartname);
|
|
|
while(!in.eof())
|
|
|
{
|
|
|
memset(&char_ptr[0], 0, sizeof(char_ptr));
|
|
@@ -831,7 +830,6 @@ int writeFlatOffset(hdfsFS * fs, const char * filename, unsigned nodeid, unsigne
|
|
|
tSize num_written_bytes = hdfsWrite(*fs, writeFile, (void*)char_ptr, bytesread);
|
|
|
totalbyteswritten += num_written_bytes;
|
|
|
|
|
|
- fprintf(stderr, ".");
|
|
|
//Need to figure out how often this should be done
|
|
|
//if(totalbyteswritten % )
|
|
|
|
|
@@ -850,7 +848,6 @@ int writeFlatOffset(hdfsFS * fs, const char * filename, unsigned nodeid, unsigne
|
|
|
fprintf(stderr, "Failed to 'flush' %s\n", filepartname);
|
|
|
exit(-1);
|
|
|
}
|
|
|
- fprintf(stderr, "]");
|
|
|
|
|
|
fprintf(stderr,"\n total read: %lu, total written: %lu\n", totalbytesread, totalbyteswritten);
|
|
|
|