|
@@ -385,7 +385,7 @@ func (pipeline *Pipeline) DeployItem(item PipelineItem) PipelineItem {
|
|
|
pipeline.SetFeature(f)
|
|
|
}
|
|
|
}
|
|
|
- queue := []PipelineItem{}
|
|
|
+ var queue []PipelineItem
|
|
|
queue = append(queue, item)
|
|
|
added := map[string]PipelineItem{}
|
|
|
for _, item := range pipeline.items {
|
|
@@ -514,7 +514,7 @@ func (items sortablePipelineItems) Swap(i, j int) {
|
|
|
items[i], items[j] = items[j], items[i]
|
|
|
}
|
|
|
|
|
|
-func (pipeline *Pipeline) resolve(dumpPath string) {
|
|
|
+func (pipeline *Pipeline) resolve(dumpPath string) error {
|
|
|
graph := toposort.NewGraph()
|
|
|
sort.Sort(sortablePipelineItems(pipeline.items))
|
|
|
name2item := map[string]PipelineItem{}
|
|
@@ -553,7 +553,7 @@ func (pipeline *Pipeline) resolve(dumpPath string) {
|
|
|
fmt.Fprintln(os.Stderr, "]")
|
|
|
}
|
|
|
pipeline.l.Critical("Failed to resolve pipeline dependencies: ambiguous graph.")
|
|
|
- return
|
|
|
+ return errors.New("ambiguous graph")
|
|
|
}
|
|
|
ambiguousMap[key] = graph.FindParents(key)
|
|
|
}
|
|
@@ -571,7 +571,7 @@ func (pipeline *Pipeline) resolve(dumpPath string) {
|
|
|
key = "[" + key + "]"
|
|
|
if graph.AddEdge(key, name) == 0 {
|
|
|
pipeline.l.Criticalf("Unsatisfied dependency: %s -> %s", key, item.Name())
|
|
|
- return
|
|
|
+ return errors.New("unsatisfied dependency")
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -625,7 +625,7 @@ func (pipeline *Pipeline) resolve(dumpPath string) {
|
|
|
strplan, ok := graph.Toposort()
|
|
|
if !ok {
|
|
|
pipeline.l.Critical("Failed to resolve pipeline dependencies: unable to topologically sort the items.")
|
|
|
- return
|
|
|
+ return errors.New("topological sort failure")
|
|
|
}
|
|
|
pipeline.items = make([]PipelineItem, 0, len(pipeline.items))
|
|
|
for _, key := range strplan {
|
|
@@ -640,6 +640,7 @@ func (pipeline *Pipeline) resolve(dumpPath string) {
|
|
|
absPath, _ := filepath.Abs(dumpPath)
|
|
|
pipeline.l.Infof("Wrote the DAG to %s\n", absPath)
|
|
|
}
|
|
|
+ return nil
|
|
|
}
|
|
|
|
|
|
// Initialize prepares the pipeline for the execution (Run()). This function
|
|
@@ -685,7 +686,10 @@ func (pipeline *Pipeline) Initialize(facts map[string]interface{}) error {
|
|
|
pipeline.HibernationDistance = val
|
|
|
}
|
|
|
dumpPath, _ := facts[ConfigPipelineDAGPath].(string)
|
|
|
- pipeline.resolve(dumpPath)
|
|
|
+ err := pipeline.resolve(dumpPath)
|
|
|
+ if err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
if dumpPlan, exists := facts[ConfigPipelineDumpPlan].(bool); exists {
|
|
|
pipeline.DumpPlan = dumpPlan
|
|
|
}
|