瀏覽代碼

Add prepareRunPlan()

Signed-off-by: Vadim Markovtsev <vadim@sourced.tech>
Vadim Markovtsev 6 年之前
父節點
當前提交
b9fcd0222c
共有 3 個文件被更改,包括 547 次插入和 3 次刪除
  1. 446 2
      internal/core/pipeline.go
  2. 100 0
      internal/core/pipeline_test.go
  3. 1 1
      internal/core/global_test.go

+ 446 - 2
internal/core/pipeline.go

@@ -539,8 +539,7 @@ func (pipeline *Pipeline) Initialize(facts map[string]interface{}) {
 
 // Run method executes the pipeline.
 //
-// commits is a slice with the sequential commit history. It shall start from
-// the root (ascending order).
+// `commits` is a slice with the git commits to analyse. Multiple branches are supported.
 //
 // Returns the mapping from each LeafPipelineItem to the corresponding analysis result.
 // There is always a "nil" record with CommonAnalysisResult.
@@ -615,3 +614,448 @@ func LoadCommitsFromFile(path string, repository *git.Repository) ([]*object.Com
 	}
 	return commits, nil
 }
+
+const (
+	runActionCommit = 0
+	runActionFork = iota
+	runActionMerge = iota
+)
+
+type runAction struct {
+	Action int
+	Commit *object.Commit
+	Items []int
+}
+
+func prepareRunPlan(commits []*object.Commit) []runAction {
+	hashes, dag := buildDag(commits)
+	leaveRootComponent(hashes, dag)
+	numParents := bindNumParents(hashes, dag)
+	mergedDag, mergedSeq := mergeDag(numParents, hashes, dag)
+	orderNodes := bindOrderNodes(mergedDag)
+	collapseFastForwards(orderNodes, hashes, mergedDag, dag, mergedSeq)
+
+	/*fmt.Printf("digraph Hercules {\n")
+	for i, c := range order {
+		commit := hashes[c]
+		fmt.Printf("  \"%s\"[label=\"[%d] %s\"]\n", commit.Hash.String(), i, commit.Hash.String()[:6])
+		for _, child := range mergedDag[commit.Hash] {
+			fmt.Printf("  \"%s\" -> \"%s\"\n", commit.Hash.String(), child.Hash.String())
+		}
+	}
+	fmt.Printf("}\n")*/
+
+	plan := generatePlan(orderNodes, numParents, hashes, mergedDag, dag, mergedSeq)
+	plan = optimizePlan(plan)
+	return plan
+}
+
+// buildDag generates the raw commit DAG and the commit hash map.
+func buildDag(commits []*object.Commit) (
+	map[string]*object.Commit, map[plumbing.Hash][]*object.Commit) {
+
+	hashes := map[string]*object.Commit{}
+	for _, commit := range commits {
+		hashes[commit.Hash.String()] = commit
+	}
+	dag := map[plumbing.Hash][]*object.Commit{}
+	for _, commit := range commits {
+		if _, exists := dag[commit.Hash]; !exists {
+			dag[commit.Hash] = make([]*object.Commit, 0, 1)
+		}
+		for _, parent := range commit.ParentHashes {
+			if _, exists := hashes[parent.String()]; !exists {
+				continue
+			}
+			children := dag[parent]
+			if children == nil {
+				children = make([]*object.Commit, 0, 1)
+			}
+			dag[parent] = append(children, commit)
+		}
+	}
+	return hashes, dag
+}
+
+// bindNumParents returns curried "numParents" function.
+func bindNumParents(
+	hashes map[string]*object.Commit,
+	dag map[plumbing.Hash][]*object.Commit) func(c *object.Commit) int {
+	return func(c *object.Commit) int {
+		r := 0
+		for _, parent := range c.ParentHashes {
+			if p, exists := hashes[parent.String()]; exists {
+				for _, pc := range dag[p.Hash] {
+					if pc.Hash == c.Hash {
+						r++
+						break
+					}
+				}
+			}
+		}
+		return r
+	}
+}
+
+// leaveRootComponent runs connected components analysis and throws away everything
+// but the part which grows from the root.
+func leaveRootComponent(
+	hashes map[string]*object.Commit,
+	dag map[plumbing.Hash][]*object.Commit) {
+
+	visited := map[plumbing.Hash]bool{}
+	var sets [][]plumbing.Hash
+	for key := range dag {
+		if visited[key] {
+			continue
+		}
+		var set []plumbing.Hash
+		for queue := []plumbing.Hash{key}; len(queue) > 0; {
+			head := queue[len(queue)-1]
+			queue = queue[:len(queue)-1]
+			if visited[head] {
+				continue
+			}
+			set = append(set, head)
+			visited[head] = true
+			for _, c := range dag[head] {
+				if !visited[c.Hash] {
+					queue = append(queue, c.Hash)
+				}
+			}
+			if commit, exists := hashes[head.String()]; exists {
+				for _, p := range commit.ParentHashes {
+					if !visited[p] {
+						if _, exists := hashes[p.String()]; exists {
+							queue = append(queue, p)
+						}
+					}
+				}
+			}
+		}
+		sets = append(sets, set)
+	}
+	if len(sets) > 1 {
+		maxlen := 0
+		maxind := -1
+		for i, set := range sets {
+			if len(set) > maxlen {
+				maxlen = len(set)
+				maxind = i
+			}
+		}
+		for i, set := range sets {
+			if i == maxind {
+				continue
+			}
+			for _, h := range set {
+				log.Printf("warning: dropped %s from the analysis - disjoint", h.String())
+				delete(dag, h)
+				delete(hashes, h.String())
+			}
+		}
+	}
+}
+
+// bindOrderNodes returns curried "orderNodes" function.
+func bindOrderNodes(mergedDag map[plumbing.Hash][]*object.Commit) func(reverse bool) []string {
+	return func(reverse bool) []string {
+		graph := toposort.NewGraph()
+		keys := make([]plumbing.Hash, 0, len(mergedDag))
+		for key := range mergedDag {
+			keys = append(keys, key)
+		}
+		sort.Slice(keys, func(i, j int) bool { return keys[i].String() < keys[j].String() })
+		for _, key := range keys {
+			graph.AddNode(key.String())
+		}
+		for _, key := range keys {
+			children := mergedDag[key]
+			sort.Slice(children, func(i, j int) bool {
+				return children[i].Hash.String() < children[j].Hash.String()
+			})
+			for _, c := range children {
+				graph.AddEdge(key.String(), c.Hash.String())
+			}
+		}
+		order, ok := graph.Toposort()
+		if !ok {
+			// should never happen
+			panic("Could not topologically sort the DAG of commits")
+		}
+		if reverse {
+			// one day this must appear in the standard library...
+			for i, j := 0, len(order)-1; i < len(order)/2; i, j = i+1, j-1 {
+				order[i], order[j] = order[j], order[i]
+			}
+		}
+		return order
+	}
+}
+
+// mergeDag turns sequences of consecutive commits into single nodes.
+func mergeDag(
+	numParents func(c *object.Commit) int,
+	hashes map[string]*object.Commit,
+	dag map[plumbing.Hash][]*object.Commit) (
+		mergedDag, mergedSeq map[plumbing.Hash][]*object.Commit) {
+
+	parentOf := func(c *object.Commit) plumbing.Hash {
+		var parent plumbing.Hash
+		for _, p := range c.ParentHashes {
+			if _, exists := hashes[p.String()]; exists {
+				if parent != plumbing.ZeroHash {
+					// more than one parent
+					return plumbing.ZeroHash
+				}
+				parent = p
+			}
+		}
+		return parent
+	}
+	mergedDag = map[plumbing.Hash][]*object.Commit{}
+	mergedSeq = map[plumbing.Hash][]*object.Commit{}
+	visited := map[plumbing.Hash]bool{}
+	for ch := range dag {
+		c := hashes[ch.String()]
+		if visited[c.Hash] {
+			continue
+		}
+		for true {
+			parent := parentOf(c)
+			if parent == plumbing.ZeroHash || len(dag[parent]) != 1 {
+				break
+			}
+			c = hashes[parent.String()]
+		}
+		head := c
+		var seq []*object.Commit
+		children := dag[c.Hash]
+		for true {
+			visited[c.Hash] = true
+			seq = append(seq, c)
+			if len(children) != 1 {
+				break
+			}
+			c = children[0]
+			children = dag[c.Hash]
+			if numParents(c) != 1 {
+				break
+			}
+		}
+		mergedSeq[head.Hash] = seq
+		mergedDag[head.Hash] = dag[seq[len(seq)-1].Hash]
+	}
+	return
+}
+
+// collapseFastForwards removes the fast forward merges.
+func collapseFastForwards(
+	orderNodes func(reverse bool) []string,
+	hashes map[string]*object.Commit,
+	mergedDag, dag, mergedSeq map[plumbing.Hash][]*object.Commit)  {
+
+	for _, strkey := range orderNodes(true) {
+		key := hashes[strkey].Hash
+		vals, exists := mergedDag[key]
+		if !exists {
+			continue
+		}
+		if len(vals) == 2 {
+			grand1 := mergedDag[vals[0].Hash]
+			grand2 := mergedDag[vals[1].Hash]
+			if len(grand2) == 1 && vals[0].Hash == grand2[0].Hash {
+				mergedDag[key] = mergedDag[vals[0].Hash]
+				dag[key] = vals[1:]
+				delete(mergedDag, vals[0].Hash)
+				delete(mergedDag, vals[1].Hash)
+				mergedSeq[key] = append(mergedSeq[key], mergedSeq[vals[1].Hash]...)
+				mergedSeq[key] = append(mergedSeq[key], mergedSeq[vals[0].Hash]...)
+				delete(mergedSeq, vals[0].Hash)
+				delete(mergedSeq, vals[1].Hash)
+			}
+			// symmetric
+			if len(grand1) == 1 && vals[1].Hash == grand1[0].Hash {
+				mergedDag[key] = mergedDag[vals[1].Hash]
+				dag[key] = vals[:1]
+				delete(mergedDag, vals[0].Hash)
+				delete(mergedDag, vals[1].Hash)
+				mergedSeq[key] = append(mergedSeq[key], mergedSeq[vals[0].Hash]...)
+				mergedSeq[key] = append(mergedSeq[key], mergedSeq[vals[1].Hash]...)
+				delete(mergedSeq, vals[0].Hash)
+				delete(mergedSeq, vals[1].Hash)
+			}
+		}
+	}
+}
+
+// generatePlan creates the list of actions from the commit DAG.
+func generatePlan(
+	orderNodes func(reverse bool) []string,
+	numParents func(c *object.Commit) int,
+	hashes map[string]*object.Commit,
+	mergedDag, dag, mergedSeq map[plumbing.Hash][]*object.Commit) []runAction {
+
+	var plan []runAction
+	branches := map[plumbing.Hash]int{}
+	counter := 1
+	for seqIndex, name := range orderNodes(false) {
+		commit := hashes[name]
+		if seqIndex == 0 {
+			branches[commit.Hash] = 0
+		}
+		var branch int
+		{
+			var exists bool
+			branch, exists = branches[commit.Hash]
+			if !exists {
+				branch = -1
+			}
+		}
+		branchExists := func() bool { return branch >= 0 }
+		appendCommit := func(c *object.Commit, branch int) {
+			plan = append(plan, runAction{
+				Action: runActionCommit,
+				Commit: c,
+				Items: []int{branch},
+			})
+		}
+		appendMergeIfNeeded := func() {
+			if numParents(commit) < 2 {
+				return
+			}
+			// merge after the merge commit (the first in the sequence)
+			var items []int
+			minBranch := 1 << 31
+			for _, parent := range commit.ParentHashes {
+				if _, exists := hashes[parent.String()]; exists {
+					parentBranch := branches[parent]
+					if len(dag[parent]) == 1 && minBranch > parentBranch {
+						minBranch = parentBranch
+					}
+					items = append(items, parentBranch)
+					if parentBranch != branch {
+						appendCommit(commit, parentBranch)
+					}
+				}
+			}
+			if minBranch < 1 << 31 {
+				branch = minBranch
+				branches[commit.Hash] = minBranch
+			} else if !branchExists() {
+				panic("!branchExists()")
+			}
+			plan = append(plan, runAction{
+				Action: runActionMerge,
+				Commit: nil,
+				Items: items,
+			})
+		}
+		if subseq, exists := mergedSeq[commit.Hash]; exists {
+			for subseqIndex, offspring := range subseq {
+				if branchExists() {
+					appendCommit(offspring, branch)
+				}
+				if subseqIndex == 0 {
+					appendMergeIfNeeded()
+				}
+			}
+			branches[subseq[len(subseq)-1].Hash] = branch
+		}
+		if len(mergedDag[commit.Hash]) > 1 {
+			branches[mergedDag[commit.Hash][0].Hash] = branch
+			children := []int{branch}
+			for i, child := range mergedDag[commit.Hash] {
+				if i > 0 {
+					branches[child.Hash] = counter
+					children = append(children, counter)
+					counter++
+				}
+			}
+			plan = append(plan, runAction{
+				Action: runActionFork,
+				Commit: nil,
+				Items: children,
+			})
+		}
+	}
+	return plan
+}
+
+// optimizePlan removes "dead" nodes.
+//
+// |   *
+// *  /
+// |\/
+// |/
+// *
+//
+func optimizePlan(plan []runAction) []runAction {
+	lives := map[int][]int{}
+	for i, p := range plan {
+		if p.Action == runActionCommit {
+			lives[p.Items[0]] = append(lives[p.Items[0]], i)
+		}
+	}
+	branchesToDelete := map[int]bool{}
+	for key, life := range lives {
+		if len(life) == 1 {
+			branchesToDelete[key] = true
+		}
+	}
+	if len(branchesToDelete) == 0 {
+		return plan
+	}
+	var optimizedPlan []runAction
+	for _, p := range plan {
+		switch p.Action {
+		case runActionCommit:
+			if !branchesToDelete[p.Items[0]] {
+				optimizedPlan = append(optimizedPlan, p)
+			}
+		case runActionFork:
+			var newBranches []int
+			for _, b := range p.Items {
+				if !branchesToDelete[b] {
+					newBranches = append(newBranches, b)
+				}
+			}
+			if len(newBranches) > 1 {
+				optimizedPlan = append(optimizedPlan, runAction{
+					Action: runActionFork,
+					Commit: p.Commit,
+					Items:  newBranches,
+				})
+			}
+		case runActionMerge:
+			var newBranches []int
+			for _, b := range p.Items {
+				if !branchesToDelete[b] {
+					newBranches = append(newBranches, b)
+				}
+			}
+			if len(newBranches) > 1 {
+				optimizedPlan = append(optimizedPlan, runAction{
+					Action: runActionMerge,
+					Commit: p.Commit,
+					Items:  newBranches,
+				})
+			}
+		}
+	}
+	// single commit can be detected as redundant
+	if len(optimizedPlan) > 0 {
+		return optimizedPlan
+	}
+	return plan
+	// TODO(vmarkovtsev): there can be also duplicate redundant merges, e.g.
+	/*
+	0 4e34f03d829fbacb71cde0e010de87ea945dc69a [3]
+	0 4e34f03d829fbacb71cde0e010de87ea945dc69a [12]
+	2                                          [3 12]
+	0 06716c2b39422938b77ddafa4d5c39bb9e4476da [3]
+	0 06716c2b39422938b77ddafa4d5c39bb9e4476da [12]
+	2                                          [3 12]
+	0 1219c7bf9e0e1a93459a052ab8b351bfc379dc19 [12]
+	*/
+}

+ 100 - 0
internal/core/pipeline_test.go

@@ -2,10 +2,12 @@ package core
 
 import (
 	"errors"
+	"fmt"
 	"io"
 	"io/ioutil"
 	"os"
 	"testing"
+	"time"
 
 	"github.com/stretchr/testify/assert"
 	"gopkg.in/src-d/go-git.v4"
@@ -358,3 +360,101 @@ func TestConfigurationOptionFormatDefault(t *testing.T) {
 	opt = ConfigurationOption{Type: FloatConfigurationOption, Default: 0.5}
 	assert.Equal(t, opt.FormatDefault(), "0.5")
 }
+
+func TestPrepareRunPlanSmall(t *testing.T) {
+	cit, err := test.Repository.Log(&git.LogOptions{From: plumbing.ZeroHash})
+	if err != nil {
+		panic(err)
+	}
+	defer cit.Close()
+	var commits []*object.Commit
+	timeCutoff := time.Date(2016, 12, 15, 0, 0, 0, 0, time.Local)
+	cit.ForEach(func(commit *object.Commit) error {
+		if commit.Author.When.Before(timeCutoff) {
+			commits = append(commits, commit)
+		}
+		return nil
+	})
+	plan := prepareRunPlan(commits)
+	/*for _, p := range plan {
+		if p.Commit != nil {
+			fmt.Println(p.Action, p.Commit.Hash.String(), p.Items)
+		} else {
+			fmt.Println(p.Action, strings.Repeat(" ", 40), p.Items)
+		}
+	}*/
+	// fork, merge and one artificial commit per branch
+	assert.Len(t, plan, len(commits))
+	assert.Equal(t, runActionCommit, plan[0].Action)
+	assert.Equal(t, 0, plan[0].Items[0])
+	assert.Equal(t, "cce947b98a050c6d356bc6ba95030254914027b1", plan[0].Commit.Hash.String())
+	assert.Equal(t, runActionCommit, plan[1].Action)
+	assert.Equal(t, 0, plan[1].Items[0])
+	assert.Equal(t, "a3ee37f91f0d705ec9c41ae88426f0ae44b2fbc3", plan[1].Commit.Hash.String())
+	assert.Equal(t, runActionCommit, plan[9].Action)
+	assert.Equal(t, 0, plan[9].Items[0])
+	assert.Equal(t, "a28e9064c70618dc9d68e1401b889975e0680d11", plan[9].Commit.Hash.String())
+}
+
+func TestPrepareRunPlanBig(t *testing.T) {
+	cases := [][6]int {
+		{2017, 8, 9, 0, 0, 0},
+		{2017, 8, 10, 0, 0, 0},
+		{2017, 8, 24, 1, 1, 1},
+		{2017, 9, 19, 1-2, 1, 1},
+		{2017, 9, 23, 1-2, 1, 1},
+		{2017, 12, 8, 1, 1, 1},
+		{2017, 12, 9, 1, 1, 1},
+		{2017, 12, 10, 1, 1, 1},
+		{2017, 12, 11, 2, 2, 2},
+		{2017, 12, 19, 4, 4, 4},
+		{2017, 12, 27, 4, 4, 4},
+		{2018, 1, 10, 4, 4, 4},
+		{2018, 1, 16, 4, 4, 4},
+		{2018, 1, 18, 7, 6, 7},
+		{2018, 1, 23, 8, 6, 8},
+		{2018, 3, 12, 9, 7, 9},
+		{2018, 5, 13, 9, 7, 9},
+		{2018, 5, 16, 13, 9, 13},
+	}
+	for _, testCase := range cases {
+		cit, err := test.Repository.Log(&git.LogOptions{From: plumbing.ZeroHash})
+		if err != nil {
+			panic(err)
+		}
+		defer cit.Close()
+		var commits []*object.Commit
+		timeCutoff := time.Date(
+			testCase[0], time.Month(testCase[1]), testCase[2], 0, 0, 0, 0, time.Local)
+		cit.ForEach(func(commit *object.Commit) error {
+			if commit.Author.When.Before(timeCutoff) {
+				commits = append(commits, commit)
+			}
+			return nil
+		})
+		plan := prepareRunPlan(commits)
+		/*for _, p := range plan {
+			if p.Commit != nil {
+				fmt.Println(p.Action, p.Commit.Hash.String(), p.Items)
+			} else {
+				fmt.Println(p.Action, strings.Repeat(" ", 40), p.Items)
+			}
+		}*/
+		numCommits := 0
+		numForks := 0
+		numMerges := 0
+		for _, p := range plan {
+			switch p.Action {
+			case runActionCommit:
+				numCommits++
+			case runActionFork:
+				numForks++
+			case runActionMerge:
+				numMerges++
+			}
+		}
+		assert.Equal(t, numCommits, len(commits)+testCase[3], fmt.Sprintf("commits %v", testCase))
+		assert.Equal(t, numForks, testCase[4], fmt.Sprintf("forks %v", testCase))
+		assert.Equal(t, numMerges, testCase[5], fmt.Sprintf("merges %v", testCase))
+	}
+}

+ 1 - 1
internal/core/global_test.go

@@ -1,4 +1,4 @@
-package core_test
+package internal_test
 
 import (
 	"io/ioutil"