|
@@ -635,7 +635,6 @@ void CAsyncFor::For(unsigned num,unsigned maxatonce,bool abortFollowingException
|
|
|
}
|
|
|
Mutex errmutex;
|
|
|
Semaphore ready;
|
|
|
- Semaphore finished;
|
|
|
IException *e=NULL;
|
|
|
Owned<IShuffledIterator> shuffler;
|
|
|
if (shuffled) {
|
|
@@ -666,13 +665,12 @@ void CAsyncFor::For(unsigned num,unsigned maxatonce,bool abortFollowingException
|
|
|
public:
|
|
|
Mutex *errmutex;
|
|
|
Semaphore &ready;
|
|
|
- Semaphore &finished;
|
|
|
int timeout;
|
|
|
IException *&erre;
|
|
|
unsigned idx;
|
|
|
CAsyncFor *self;
|
|
|
- cdothread(CAsyncFor *_self,unsigned _idx,Semaphore &_ready,Semaphore &_finished,Mutex *_errmutex,IException *&_e)
|
|
|
- : Thread("CAsyncFor"),ready(_ready),finished(_finished),erre(_e)
|
|
|
+ cdothread(CAsyncFor *_self,unsigned _idx,Semaphore &_ready,Mutex *_errmutex,IException *&_e)
|
|
|
+ : Thread("CAsyncFor"),ready(_ready),erre(_e)
|
|
|
{
|
|
|
errmutex =_errmutex;
|
|
|
idx = _idx;
|
|
@@ -700,7 +698,6 @@ void CAsyncFor::For(unsigned num,unsigned maxatonce,bool abortFollowingException
|
|
|
}
|
|
|
#endif
|
|
|
ready.signal();
|
|
|
- finished.signal();
|
|
|
return 0;
|
|
|
}
|
|
|
};
|
|
@@ -708,14 +705,19 @@ void CAsyncFor::For(unsigned num,unsigned maxatonce,bool abortFollowingException
|
|
|
maxatonce = num;
|
|
|
for (i=0;(i<num)&&(i<maxatonce);i++)
|
|
|
ready.signal();
|
|
|
+ IArrayOf<Thread> started;
|
|
|
+ started.ensure(num);
|
|
|
for (i=0;i<num;i++) {
|
|
|
ready.wait();
|
|
|
if (abortFollowingException && e) break;
|
|
|
- Thread *thread = new cdothread(this,shuffled?shuffler->lookup(i):i,ready,finished,&errmutex,e);
|
|
|
- thread->startRelease();
|
|
|
+ Owned<Thread> thread = new cdothread(this,shuffled?shuffler->lookup(i):i,ready,&errmutex,e);
|
|
|
+ thread->start();
|
|
|
+ started.append(*thread.getClear());
|
|
|
+ }
|
|
|
+ ForEachItemIn(idx, started)
|
|
|
+ {
|
|
|
+ started.item(idx).join();
|
|
|
}
|
|
|
- while (i--)
|
|
|
- finished.wait();
|
|
|
}
|
|
|
if (e)
|
|
|
throw e;
|