Преглед изворни кода

Merge pull request #177 from afishbeck/issue172

ISSUE #172: Move IDFUWorkUnit::submit() to a standalone function.

Reviewed-By: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman пре 14 година
родитељ
комит
33d7ef9f54
4 измењених фајлова са 69 додато и 72 уклоњено
  1. 10 16
      dali/datest/dfuwutest.cpp
  2. 43 24
      dali/dfu/dfuwu.cpp
  3. 4 1
      dali/dfu/dfuwu.hpp
  4. 12 31
      esp/services/ws_fs/ws_fsService.cpp

+ 10 - 16
dali/datest/dfuwutest.cpp

@@ -120,7 +120,7 @@ void copyTest()
         destination->setLogicalName(dstname.str());
         options->setReplicate(true);
         options->setOverwrite(true);
-        wu->submit();
+        submitDFUWorkUnit(wu.getClear());
     }
 }
 
@@ -159,7 +159,7 @@ void importTest()
     destination->setLogicalName(dstname.str());
     options->setReplicate(true);
     options->setOverwrite(true);
-    wu->submit();
+    submitDFUWorkUnit(wu.getClear());
 }
 
 void SprayTest(unsigned num)
@@ -207,7 +207,7 @@ void SprayTest(unsigned num)
                 options->setOverwrite(true);
             }
             PROGLOG("submitting %s",wu->queryId()); 
-            wu->submit();
+            submitDFUWorkUnit(wu.getClear());
         }
     }
 }
@@ -410,7 +410,7 @@ void testWUcreate(int kind,StringBuffer &wuidout)
         break;
     }
     wuidout.append(wu->queryId());  
-    wu->submit();
+    submitDFUWorkUnit(wu.getClear());
 }
 
 IFileDescriptor *createRoxieFileDescriptor(const char *cluster, const char *lfn, bool servers)
@@ -744,8 +744,7 @@ void testRepeatedFiles1(StringBuffer &wuid)
     destination->setLogicalName("thor_dev::nigel::testspray1");
     options->setReplicate(true);
     options->setOverwrite(true);
-    wu->submit();
-
+    submitDFUWorkUnit(wu.getClear());
 }
 
 void testRepeatedFiles2(StringBuffer &wuid)
@@ -781,8 +780,7 @@ void testRepeatedFiles2(StringBuffer &wuid)
     destination->setLogicalName("thor_dev::nigel::testcopy1");
     options->setReplicate(true);
     options->setOverwrite(true);
-    wu->submit();
-
+    submitDFUWorkUnit(wu.getClear());
 }
 
 static IGroup *getAuxGroup()
@@ -837,8 +835,7 @@ void testRepeatedFiles3(StringBuffer &wuid)
     destination->setLogicalName("thor_dev::nigel::testspray2");
     options->setReplicate(true);
     options->setOverwrite(true);
-    wu->submit();
-
+    submitDFUWorkUnit(wu.getClear());
 }
 
 void testRepeatedFiles4(StringBuffer &wuid)
@@ -868,8 +865,7 @@ void testRepeatedFiles4(StringBuffer &wuid)
     destination->setWrap(true);
     options->setReplicate(true);
     options->setOverwrite(true);
-    wu->submit();
-
+    submitDFUWorkUnit(wu.getClear());
 }
 
 void testRepeatedFiles5(StringBuffer &wuid)
@@ -902,8 +898,7 @@ void testRepeatedFiles5(StringBuffer &wuid)
     options->setReplicate(true);
     options->setOverwrite(true);
     options->setSuppressNonKeyRepeats(true);                            // **** only repeat last part when src kind = key
-    wu->submit();
-
+    submitDFUWorkUnit(wu.getClear());
 }
 
 void testSuperCopy1(StringBuffer &wuid)
@@ -934,8 +929,7 @@ void testSuperCopy1(StringBuffer &wuid)
     options->setReplicate(true);
     options->setOverwrite(true);
     options->setSuppressNonKeyRepeats(true);
-    wu->submit();
-
+    submitDFUWorkUnit(wu.getClear());
 }
 
 

+ 43 - 24
dali/dfu/dfuwu.cpp

@@ -2542,29 +2542,6 @@ public:
         return &monitor;
     }
 
-    void submit()           
-    {
-        StringBuffer qname;
-        if (getQueue(qname).length()==0) {
-            throw MakeStringException(-1, "DFU no queue name specified");
-        }
-        Owned<IJobQueue> queue = createJobQueue(qname.str());
-        if (!queue.get()) {
-            throw MakeStringException(-1, "Cound not create queue");
-        }
-        IJobQueueItem *item = createJobQueueItem(queryId());
-        item->setEndpoint(queryMyNode()->endpoint());
-        StringBuffer user;
-        if (getUser(user).length()!=0) 
-            item->setOwner(user.str());
-        progress.setState(DFUstate_queued);
-        progress.clearProgress();
-        removeTree(root,"Exceptions");
-        commit();
-        queue->enqueue(item);
-    }
-
-
     void cleanupAndDelete()
     {
         if (isProtected())
@@ -2621,6 +2598,10 @@ public:
         return new CExceptionIterator(tree);
     }
 
+    void clearExceptions()
+    {
+        removeTree(root,"Exceptions");
+    }
 
     StringBuffer& getApplicationValue(const char *app, const char *propname, StringBuffer &str) const
     {
@@ -3141,7 +3122,7 @@ IDfuFileCopier *createRemoteFileCopier(const char *qname,const char *clustername
             StringBuffer eps;
             PROGLOG("%s: Copy %s from %s to %s",wuid,srclfn,srcdali.getUrlStr(eps).str(),lfn);
             wuids.append(wuid);
-            wu->submit();
+            submitDFUWorkUnit(wu.getClear());
             return true;
         }
 
@@ -3178,3 +3159,41 @@ IDfuFileCopier *createRemoteFileCopier(const char *qname,const char *clustername
     };
     return new cCopier(qname,clustername,jobname,replicate);
 }
+
+
+extern dfuwu_decl void submitDFUWorkUnit(IDFUWorkUnit *workunit)
+{
+    Owned<IDFUWorkUnit> wu(workunit);
+    StringBuffer qname;
+    if (wu->getQueue(qname).length()==0) {
+        throw MakeStringException(-1, "DFU no queue name specified");
+    }
+    Owned<IJobQueue> queue = createJobQueue(qname.str());
+    if (!queue.get()) {
+        throw MakeStringException(-1, "Cound not create queue");
+    }
+    StringBuffer user;
+    wu->getUser(user);
+    IDFUprogress *progress = wu->queryUpdateProgress();
+    progress->setState(DFUstate_queued);
+    progress->clearProgress();
+    wu->clearExceptions();
+    wu->commit();
+
+    StringAttr wuid(wu->queryId());
+    wu.clear();
+    IJobQueueItem *item = createJobQueueItem(wuid.get());
+    item->setEndpoint(queryMyNode()->endpoint());
+    if (user.length()!=0)
+        item->setOwner(user.str());
+    queue->enqueue(item);
+}
+
+extern dfuwu_decl void submitDFUWorkUnit(const char *wuid)
+{
+    Owned<IDFUWorkUnitFactory> factory = getDFUWorkUnitFactory();
+    Owned<IDFUWorkUnit> wu = factory->updateWorkUnit(wuid);
+    if(!wu)
+        throw MakeStringException(-1, "DFU workunit %s could not be opened for update", wuid);
+    submitDFUWorkUnit(wu.getClear());
+}

+ 4 - 1
dali/dfu/dfuwu.hpp

@@ -312,6 +312,7 @@ interface IDFUprogress: extends IConstDFUprogress
     virtual void setPercentDone(unsigned pc)=0;
     virtual void setSubInProgress(const char *str) = 0;     // set sub-DFUWUs in progress
     virtual void setSubDone(const char *str) = 0;           // set sub-DFUWUs done
+    virtual void clearProgress() = 0;
 };
 
 interface IDFUprogressSubscriber: extends IInterface
@@ -398,7 +399,6 @@ interface IDFUWorkUnit : extends IConstDFUWorkUnit
     virtual void addOptions(IPropertyTree *tree) = 0;       // used by DFU command line (for moment)
     virtual IDFUprogress *queryUpdateProgress() = 0;
     virtual IDFUmonitor *queryUpdateMonitor() = 0;
-    virtual void submit() = 0;                              // should Release after submitted
     virtual void closeUpdate() = 0;                     // called before WU obtained by openUpdate is released
     virtual void queryRecoveryStore(IRemoteConnection *& conn,IPropertyTree *&tree,StringBuffer &runningpath) = 0; // not nice - needed by daft 
     virtual void removeRecoveryStore() = 0;
@@ -409,6 +409,7 @@ interface IDFUWorkUnit : extends IConstDFUWorkUnit
     virtual void setApplicationValueInt(const char * application, const char * propname, int value, bool overwrite) = 0;
     virtual void setDebugValue(const char *propname, const char *value, bool overwrite) = 0;
     virtual bool removeQueue() = 0;
+    virtual void clearExceptions() = 0;
 };
 
 
@@ -446,6 +447,8 @@ interface IDFUWorkUnitFactory : extends IInterface
 
 
 extern dfuwu_decl IDFUWorkUnitFactory * getDFUWorkUnitFactory();
+extern dfuwu_decl void submitDFUWorkUnit(IDFUWorkUnit *wu);
+extern dfuwu_decl void submitDFUWorkUnit(const char *wuid);
 
 extern dfuwu_decl DFUcmd decodeDFUcommand(const char * str);
 extern dfuwu_decl StringBuffer &encodeDFUcommand(DFUcmd cmd,StringBuffer &str);

+ 12 - 31
esp/services/ws_fs/ws_fsService.cpp

@@ -65,9 +65,9 @@ int Schedule::run()
                         wu->getTimeScheduled(dt);
                         if (now.compare(dt) > 0)
                         {
-                            Owned<IDFUWorkUnit> lwu = wu->openUpdate(false);
-                            if (lwu)
-                                lwu->submit();
+                            StringAttr wuid(wu->queryId());
+                            wu.clear();
+                            submitDFUWorkUnit(wuid.get());
                         }
                     }
                     catch(IException *e)
@@ -1682,12 +1682,8 @@ bool CFileSprayEx::onSubmitDFUWorkunit(IEspContext &context, IEspSubmitDFUWorkun
         if (!context.validateFeatureAccess(DFU_WU_URL, SecAccess_Write, false))
             throw MakeStringException(ECLWATCH_DFU_WU_ACCESS_DENIED, "Failed to submit DFU workunit. Permission denied.");
 
-        Owned<IDFUWorkUnitFactory> factory = getDFUWorkUnitFactory();
-        Owned<IDFUWorkUnit> wu = factory->updateWorkUnit(req.getWuid());
-        if(!wu)
-            throw MakeStringException(ECLWATCH_CANNOT_UPDATE_WORKUNIT, "Dfu workunit %s not found.", req.getWuid());
+        submitDFUWorkUnit(req.getWuid());
 
-        wu->submit();
         resp.setRedirectUrl(StringBuffer("/FileSpray/GetDFUWorkunit?wuid=").append(req.getWuid()).str());
     }
     catch(IException* e)
@@ -1906,11 +1902,8 @@ bool CFileSprayEx::onSprayFixed(IEspContext &context, IEspSprayFixed &req, IEspS
         if (req.getPush())
             options->setPush(true);
 
-        resp.setWuid(wu->queryId());
-
-        wu->submit();                            // enqueue job(does implicit commit)
-
         resp.setRedirectUrl(StringBuffer("/FileSpray/GetDFUWorkunit?wuid=").append(wu->queryId()).str());
+        submitDFUWorkUnit(wu.getClear());
     }
     catch(IException* e)
     {   
@@ -2063,10 +2056,8 @@ bool CFileSprayEx::onSprayVariable(IEspContext &context, IEspSprayVariable &req,
         if (req.getPush())
             options->setPush(true);
 
-        resp.setWuid(wu->queryId());
-
-        wu->submit();                            // enqueue job(does implicit commit)
         resp.setRedirectUrl(StringBuffer("/FileSpray/GetDFUWorkunit?wuid=").append(wu->queryId()).str());
+        submitDFUWorkUnit(wu.getClear());
     }
     catch(IException* e)
     {   
@@ -2113,9 +2104,8 @@ bool CFileSprayEx::onReplicate(IEspContext &context, IEspReplicate &req, IEspRep
             IDFUoptions *opt = wu->queryUpdateOptions();
             opt->setReplicateMode(DFURMmissing,cluster,req.getRepeatLast(),req.getOnlyRepeated());
         }
-        wu->submit();
-        resp.setWuid(wu->queryId());
         resp.setRedirectUrl(StringBuffer("/FileSpray/GetDFUWorkunit?wuid=").append(wu->queryId()).str());
+        submitDFUWorkUnit(wu.getClear());
     }
     catch(IException* e)
     {   
@@ -2226,10 +2216,8 @@ bool CFileSprayEx::onDespray(IEspContext &context, IEspDespray &req, IEspDespray
         if ((encryptkey&&*encryptkey)||(decryptkey&&*decryptkey))
             options->setEncDec(encryptkey,decryptkey);
 
-        resp.setWuid(wu->queryId());
-
-        wu->submit();                            // enqueue job(does implicit commit)
         resp.setRedirectUrl(StringBuffer("/FileSpray/GetDFUWorkunit?wuid=").append(wu->queryId()).str());
+        submitDFUWorkUnit(wu.getClear());
     }
     catch(IException* e)
     {   
@@ -2319,9 +2307,8 @@ bool CFileSprayEx::doCopyForRoxie(IEspContext &context,     const char * srcName
         options->setSuppressNonKeyRepeats(true);                            // **** only repeat last part when src kind = key
     }
 
-    resp.setResult(wu->queryId());
-    wu->submit();                            // enqueue job(does implicit commit)
     resp.setRedirectUrl(StringBuffer("/FileSpray/GetDFUWorkunit?wuid=").append(wu->queryId()).str());
+    submitDFUWorkUnit(wu.getClear());
     return true;
 }
 
@@ -2525,9 +2512,8 @@ bool CFileSprayEx::onCopy(IEspContext &context, IEspCopy &req, IEspCopyResponse
         if (req.getIfnewer())
             options->setIfNewer(true);
 
-        resp.setResult(wu->queryId());
-        wu->submit();                            // enqueue job(does implicit commit)
         resp.setRedirectUrl(StringBuffer("/FileSpray/GetDFUWorkunit?wuid=").append(wu->queryId()).str());
+        submitDFUWorkUnit(wu.getClear());
     }
     catch(IException* e)
     {
@@ -2597,10 +2583,8 @@ bool CFileSprayEx::onRename(IEspContext &context, IEspRename &req, IEspRenameRes
         IDFUfileSpec *destination = wu->queryUpdateDestination();
         destination->setLogicalName(dstname);
 
-        resp.setWuid(wu->queryId());
-
-        wu->submit();                            // enqueue job(does implicit commit)
         resp.setRedirectUrl(StringBuffer("/FileSpray/GetDFUWorkunit?wuid=").append(wu->queryId()).str());
+        submitDFUWorkUnit(wu.getClear());
     }
     catch(IException* e)
     {
@@ -2873,11 +2857,8 @@ bool CFileSprayEx::onDfuMonitor(IEspContext &context, IEspDfuMonitorRequest &req
         monitor->setShotLimit(req.getShotLimit());
         monitor->setSub(req.getSub());
 
-        resp.setWuid(wu->queryId());
-
-        wu->submit();                            // enqueue job(does implicit commit)
-
         resp.setRedirectUrl(StringBuffer("/FileSpray/GetDFUWorkunit?wuid=").append(wu->queryId()).str());
+        submitDFUWorkUnit(wu.getClear());
     }
     catch(IException* e)
     {