|
@@ -1,12 +1,14 @@
|
|
|
package hercules
|
|
|
|
|
|
import (
|
|
|
+ "bufio"
|
|
|
"errors"
|
|
|
"fmt"
|
|
|
"io"
|
|
|
+ "io/ioutil"
|
|
|
"os"
|
|
|
+ "reflect"
|
|
|
|
|
|
- "bufio"
|
|
|
"gopkg.in/src-d/go-git.v4"
|
|
|
"gopkg.in/src-d/go-git.v4/plumbing"
|
|
|
"gopkg.in/src-d/go-git.v4/plumbing/object"
|
|
@@ -21,6 +23,9 @@ type PipelineItem interface {
|
|
|
Provides() []string
|
|
|
// Requires returns the list of keys of needed entities which must be supplied in Consume().
|
|
|
Requires() []string
|
|
|
+ // Construct performs the initial creation of the object by taking parameters from facts.
|
|
|
+ // It allows to create PipelineItems in a universal way.
|
|
|
+ Construct(facts map[string]interface{})
|
|
|
// Initialize prepares and resets the item. Consume() requires Initialize()
|
|
|
// to be called at least once beforehand.
|
|
|
Initialize(*git.Repository)
|
|
@@ -33,6 +38,44 @@ type PipelineItem interface {
|
|
|
Finalize() interface{}
|
|
|
}
|
|
|
|
|
|
+type PipelineItemRegistry struct {
|
|
|
+ provided map[string][]reflect.Type
|
|
|
+}
|
|
|
+
|
|
|
+func (registry *PipelineItemRegistry) Register(example PipelineItem) {
|
|
|
+ if registry.provided == nil {
|
|
|
+ registry.provided = map[string][]reflect.Type{}
|
|
|
+ }
|
|
|
+ t := reflect.TypeOf(example)
|
|
|
+ for _, dep := range example.Provides() {
|
|
|
+ ts := registry.provided[dep]
|
|
|
+ if ts == nil {
|
|
|
+ ts = []reflect.Type{}
|
|
|
+ }
|
|
|
+ ts = append(ts, t)
|
|
|
+ registry.provided[dep] = ts
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+func (registry *PipelineItemRegistry) Summon(provides string) []PipelineItem {
|
|
|
+ if registry.provided == nil {
|
|
|
+ return []PipelineItem{}
|
|
|
+ }
|
|
|
+ ts := registry.provided[provides]
|
|
|
+ items := []PipelineItem{}
|
|
|
+ for _, t := range ts {
|
|
|
+ items = append(items, reflect.New(t.Elem()).Interface().(PipelineItem))
|
|
|
+ }
|
|
|
+ return items
|
|
|
+}
|
|
|
+
|
|
|
+var Registry = &PipelineItemRegistry{}
|
|
|
+
|
|
|
+type wrappedPipelineItem struct {
|
|
|
+ Item PipelineItem
|
|
|
+ Children []wrappedPipelineItem
|
|
|
+}
|
|
|
+
|
|
|
type Pipeline struct {
|
|
|
// OnProgress is the callback which is invoked in Analyse() to output it's
|
|
|
// progress. The first argument is the number of processed commits and the
|
|
@@ -47,19 +90,48 @@ type Pipeline struct {
|
|
|
|
|
|
// plan is the resolved execution sequence.
|
|
|
plan []PipelineItem
|
|
|
+
|
|
|
+ // the collection of parameters to create items.
|
|
|
+ facts map[string]interface{}
|
|
|
}
|
|
|
|
|
|
func NewPipeline(repository *git.Repository) *Pipeline {
|
|
|
return &Pipeline{repository: repository, items: []PipelineItem{}, plan: []PipelineItem{}}
|
|
|
}
|
|
|
|
|
|
-func (pipeline *Pipeline) AddItem(item PipelineItem) {
|
|
|
- for _, reg := range pipeline.items {
|
|
|
- if reg == item {
|
|
|
- return
|
|
|
- }
|
|
|
+func (pipeline *Pipeline) GetFact(name string) interface{} {
|
|
|
+ return pipeline.facts[name]
|
|
|
+}
|
|
|
+
|
|
|
+func (pipeline *Pipeline) SetFact(name string, value interface{}) {
|
|
|
+ pipeline.facts[name] = value
|
|
|
+}
|
|
|
+
|
|
|
+func (pipeline *Pipeline) DeployItem(item PipelineItem) PipelineItem {
|
|
|
+ queue := []PipelineItem{}
|
|
|
+ queue = append(queue, item)
|
|
|
+ added := map[string]PipelineItem{}
|
|
|
+ added[item.Name()] = item
|
|
|
+ pipeline.AddItem(item)
|
|
|
+ for len(queue) > 0 {
|
|
|
+ head := queue[0]
|
|
|
+ queue = queue[1:]
|
|
|
+ for _, dep := range head.Requires() {
|
|
|
+ for _, sibling := range Registry.Summon(dep) {
|
|
|
+ if _, exists := added[sibling.Name()]; !exists {
|
|
|
+ added[sibling.Name()] = sibling
|
|
|
+ queue = append(queue, sibling)
|
|
|
+ pipeline.AddItem(sibling)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
+ return item
|
|
|
+}
|
|
|
+
|
|
|
+func (pipeline *Pipeline) AddItem(item PipelineItem) PipelineItem {
|
|
|
pipeline.items = append(pipeline.items, item)
|
|
|
+ return item
|
|
|
}
|
|
|
|
|
|
func (pipeline *Pipeline) RemoveItem(item PipelineItem) {
|
|
@@ -99,28 +171,98 @@ func (pipeline *Pipeline) Commits() []*object.Commit {
|
|
|
return result
|
|
|
}
|
|
|
|
|
|
-func (pipeline *Pipeline) Initialize() {
|
|
|
+func (pipeline *Pipeline) Initialize(facts map[string]interface{}) {
|
|
|
graph := toposort.NewGraph()
|
|
|
name2item := map[string]PipelineItem{}
|
|
|
- for index, item := range pipeline.items {
|
|
|
- name := fmt.Sprintf("%s_%d", item.Name(), index)
|
|
|
+ ambiguousMap := map[string][]string{}
|
|
|
+ nameUsages := map[string]int{}
|
|
|
+ for _, item := range pipeline.items {
|
|
|
+ nameUsages[item.Name()]++
|
|
|
+ }
|
|
|
+ counters := map[string]int{}
|
|
|
+ for _, item := range pipeline.items {
|
|
|
+ name := item.Name()
|
|
|
+ if nameUsages[name] > 1 {
|
|
|
+ index := counters[item.Name()] + 1
|
|
|
+ counters[item.Name()] = index
|
|
|
+ name = fmt.Sprintf("%s_%d", item.Name(), index)
|
|
|
+ }
|
|
|
graph.AddNode(name)
|
|
|
name2item[name] = item
|
|
|
for _, key := range item.Provides() {
|
|
|
key = "[" + key + "]"
|
|
|
graph.AddNode(key)
|
|
|
- graph.AddEdge(name, key)
|
|
|
+ if graph.AddEdge(name, key) > 1 {
|
|
|
+ if ambiguousMap[key] != nil {
|
|
|
+ panic("Failed to resolve pipeline dependencies.")
|
|
|
+ }
|
|
|
+ ambiguousMap[key] = graph.FindParents(key)
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
- for index, item := range pipeline.items {
|
|
|
- name := fmt.Sprintf("%s_%d", item.Name(), index)
|
|
|
+ counters = map[string]int{}
|
|
|
+ for _, item := range pipeline.items {
|
|
|
+ name := item.Name()
|
|
|
+ if nameUsages[name] > 1 {
|
|
|
+ index := counters[item.Name()] + 1
|
|
|
+ counters[item.Name()] = index
|
|
|
+ name = fmt.Sprintf("%s_%d", item.Name(), index)
|
|
|
+ }
|
|
|
for _, key := range item.Requires() {
|
|
|
key = "[" + key + "]"
|
|
|
- if !graph.AddEdge(key, name) {
|
|
|
+ if graph.AddEdge(key, name) == 0 {
|
|
|
panic(fmt.Sprintf("Unsatisfied dependency: %s -> %s", key, item.Name()))
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
+ if len(ambiguousMap) > 0 {
|
|
|
+ ambiguous := []string{}
|
|
|
+ for key := range ambiguousMap {
|
|
|
+ ambiguous = append(ambiguous, key)
|
|
|
+ }
|
|
|
+ bfsorder := graph.BreadthSort()
|
|
|
+ bfsindex := map[string]int{}
|
|
|
+ for i, s := range bfsorder {
|
|
|
+ bfsindex[s] = i
|
|
|
+ }
|
|
|
+ for len(ambiguous) > 0 {
|
|
|
+ key := ambiguous[0]
|
|
|
+ ambiguous = ambiguous[1:]
|
|
|
+ pair := ambiguousMap[key]
|
|
|
+ inheritor := pair[1]
|
|
|
+ if bfsindex[pair[1]] < bfsindex[pair[0]] {
|
|
|
+ inheritor = pair[0]
|
|
|
+ }
|
|
|
+ removed := graph.RemoveEdge(key, inheritor)
|
|
|
+ cycle := map[string]bool{}
|
|
|
+ for _, node := range graph.FindCycle(key) {
|
|
|
+ cycle[node] = true
|
|
|
+ }
|
|
|
+ if len(cycle) == 0 {
|
|
|
+ cycle[inheritor] = true
|
|
|
+ }
|
|
|
+ if removed {
|
|
|
+ graph.AddEdge(key, inheritor)
|
|
|
+ }
|
|
|
+ graph.RemoveEdge(inheritor, key)
|
|
|
+ graph.ReindexNode(inheritor)
|
|
|
+ // for all nodes key links to except those in cycle, put the link from inheritor
|
|
|
+ for _, node := range graph.FindChildren(key) {
|
|
|
+ if _, exists := cycle[node]; !exists {
|
|
|
+ graph.AddEdge(inheritor, node)
|
|
|
+ graph.RemoveEdge(key, node)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ graph.ReindexNode(key)
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if dumpPath, exists := facts["Pipeline.DumpPath"].(string); exists {
|
|
|
+ ioutil.WriteFile(dumpPath, []byte(graph.Serialize([]string{})), 0666)
|
|
|
+ }
|
|
|
+ var graphCopy *toposort.Graph
|
|
|
+ if _, exists := facts["Pipeline.DumpPath"].(string); exists {
|
|
|
+ graphCopy = graph.Copy()
|
|
|
+ }
|
|
|
strplan, ok := graph.Toposort()
|
|
|
if !ok {
|
|
|
panic("Failed to resolve pipeline dependencies.")
|
|
@@ -134,6 +276,15 @@ func (pipeline *Pipeline) Initialize() {
|
|
|
if len(pipeline.plan) != len(pipeline.items) {
|
|
|
panic("Internal pipeline dependency resolution error.")
|
|
|
}
|
|
|
+ if dumpPath, exists := facts["Pipeline.DumpPath"].(string); exists {
|
|
|
+ ioutil.WriteFile(dumpPath, []byte(graphCopy.Serialize(strplan)), 0666)
|
|
|
+ }
|
|
|
+ if dryRun, exists := facts["Pipeline.DryRun"].(bool); exists && dryRun {
|
|
|
+ return
|
|
|
+ }
|
|
|
+ for _, item := range pipeline.items {
|
|
|
+ item.Construct(facts)
|
|
|
+ }
|
|
|
for _, item := range pipeline.items {
|
|
|
item.Initialize(pipeline.repository)
|
|
|
}
|