Merge pull request #84 from vmarkovtsev/master

Add multiple branch support to plumbing and Burndown
Vadim Markovtsev, 6 years ago
parent commit 34c377c129

+ 6 - 4
.travis.yml

@@ -26,6 +26,8 @@ cache:
   directories:
     - $HOME/.cache/pip
     - $HOME/gopath/src
+before_cache:
+  - rm -rf $HOME/gopath/src/gopkg.in/src-d/hercules.v4
 
 matrix:
   fast_finish: true
@@ -37,7 +39,7 @@ stages:
   - deploy
 
 env:
-  - PROTOC_VERSION=3.5.1 TENSORFLOW_VERSION=1.7.0
+  - PROTOC_VERSION=3.6.0 TENSORFLOW_VERSION=1.8.0
 
 before_install:
   - sudo update-alternatives --install /usr/bin/g++ g++ /usr/bin/g++-6 90
@@ -86,6 +88,7 @@ jobs:
       os: osx
       osx_image: xcode9.3
       go: 1.10.x
+      go_import_path: gopkg.in/src-d/hercules.v4
       before_install:
         - wget -O protoc.zip https://github.com/google/protobuf/releases/download/v$PROTOC_VERSION/protoc-$PROTOC_VERSION-osx-x86_64.zip
         - unzip -d ~/.local protoc.zip && rm protoc.zip
@@ -105,16 +108,15 @@ jobs:
     - stage: deploy
       os: linux
       go: 1.10.x
+      go_import_path: gopkg.in/src-d/hercules.v4
       before_install:
         - sudo update-alternatives --install /usr/bin/g++ g++ /usr/bin/g++-6 90
         - sudo update-alternatives --install /usr/bin/gcc gcc /usr/bin/gcc-6 90
         - wget -O protoc.zip https://github.com/google/protobuf/releases/download/v$PROTOC_VERSION/protoc-$PROTOC_VERSION-linux-x86_64.zip
         - unzip -d ~/.local protoc.zip && rm protoc.zip
-        - curl -L "https://storage.googleapis.com/tensorflow/libtensorflow/libtensorflow-cpu-$(go env GOOS)-x86_64-$TENSORFLOW_VERSION.tar.gz" | sudo tar -C /usr/local -xz
-        - sudo ldconfig
       script: skip
       install:
-        - make
+        - DISABLE_TENSORFLOW=1 make
       after_success:
         - gzip -S .linux_amd64.gz $GOPATH/bin/hercules
       deploy:

+ 7 - 7
OCTOPUS.md

@@ -5,9 +5,9 @@ It follows the main (zero index) branch when it encounters a fork.
 This behavior ignores all the side branches, and we are currently
 thinking about how to include them in the analysis.
 
-### Plan
+### Plan - done
 
-* Commits must be sorted by time.
+* Commits must be ordered topologically.
 * When a fork is hit, clone the pipeline. Assign the old instance to the main branch and new
 instances to the sprouts. BurndownAnalysis should share the same counters for efficiency
 and simplicity, but the files must be copied.
@@ -21,7 +21,7 @@ with the previous commit in the main branch.
 * The sequence of commits must be the analysis scenario: it must inform when to fork and to merge,
 and which pipeline instance to apply.
 
-### New APIs
+### New APIs - done
 
 * PipelineItem
   * `Fork()`
@@ -30,8 +30,8 @@ which pipeline instance to apply.
 ### Major changes
 
 * `Pipeline`
-  * `Commits()`
-  * `Run()`
-* `Burndown`
+  * `Commits()` - done
+  * `Run()` - done
+* `Burndown` - done
 * `Couples`
-* `FileDiff`
+* `FileDiff` - done

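The plan above clones the pipeline at forks and folds branch state back at merges. Below is a minimal sketch of that Fork()/Merge() contract on a toy item (the `seenItem` type and its hash set are hypothetical, not part of hercules; the real interface additionally requires Merge() to update every surviving branch, not only the receiver):

```go
package main

import "fmt"

// seenItem is a hypothetical pipeline item which records the commit hashes
// it has consumed. Forking deep-copies the state, merging unions it.
type seenItem struct{ seen map[string]bool }

// Fork returns n independent copies, mirroring "clone the pipeline".
func (s *seenItem) Fork(n int) []*seenItem {
	clones := make([]*seenItem, n)
	for i := range clones {
		m := make(map[string]bool, len(s.seen))
		for k := range s.seen {
			m[k] = true
		}
		clones[i] = &seenItem{seen: m}
	}
	return clones
}

// Merge folds the state accumulated on side branches back into the receiver.
func (s *seenItem) Merge(branches []*seenItem) {
	for _, b := range branches {
		for k := range b.seen {
			s.seen[k] = true
		}
	}
}

func main() {
	trunk := &seenItem{seen: map[string]bool{"A": true, "B": true}}
	side := trunk.Fork(1)[0]
	side.seen["C"] = true // a commit consumed on the side branch only
	trunk.Merge([]*seenItem{side})
	fmt.Println(len(trunk.seen)) // 3: A, B and C survive the merge
}
```
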
+ 13 - 0
cmd/hercules/plugin.template

@@ -30,6 +30,10 @@ import (
 // {{.name}} contains the intermediate state which is mutated by Consume(). It should implement
 // hercules.LeafPipelineItem.
 type {{.name}} struct {
+  // No special branch merge logic is required
+  hercules.NoopMerger
+  // Process each merge commit only once
+  hercules.OneShotMergeProcessor
 }
 
 // {{.name}}Result is returned by Finalize() and represents the analysis result.
@@ -76,13 +80,22 @@ func ({{.varname}} *{{.name}}) Configure(facts map[string]interface{}) {
 
 // Initialize resets the internal temporary data structures and prepares the object for Consume().
 func ({{.varname}} *{{.name}}) Initialize(repository *git.Repository) {
+  {{.varname}}.OneShotMergeProcessor.Initialize()
 }
 
 // Consume is called for every commit in the sequence.
 func ({{.varname}} *{{.name}}) Consume(deps map[string]interface{}) (map[string]interface{}, error) {
+  if !{{.varname}}.ShouldConsumeCommit(deps) {
+    return nil, nil
+  }
   return nil, nil
 }
 
+// Fork clones the same item several times on branches.
+func ({{.varname}} *{{.name}}) Fork(n int) []hercules.PipelineItem {
+  return hercules.ForkSamePipelineItem({{.varname}}, n)
+}
+
 // Finalize produces the result of the analysis. No more Consume() calls are expected afterwards.
 func ({{.varname}} *{{.name}}) Finalize() interface{} {
   result := {{.name}}Result{}

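The template embeds hercules.OneShotMergeProcessor because the new scheduler replays a merge commit once per parent branch, while most analyses should count it only once. A toy sketch of the underlying check, with plain string hashes standing in for *object.Commit:

```go
package main

import "fmt"

// oneShot mirrors the ShouldConsumeCommit logic from internal/core/forks.go:
// regular commits always pass, a merge hash passes only on its first visit.
type oneShot struct{ merges map[string]bool }

func (p *oneShot) shouldConsume(hash string, numParents int) bool {
	if numParents <= 1 {
		return true // a regular commit
	}
	if !p.merges[hash] {
		p.merges[hash] = true // remember the merge across branches
		return true
	}
	return false
}

func main() {
	p := &oneShot{merges: map[string]bool{}}
	fmt.Println(p.shouldConsume("a1", 1)) // true:  regular commit
	fmt.Println(p.shouldConsume("m1", 2)) // true:  merge, first branch
	fmt.Println(p.shouldConsume("m1", 2)) // false: same merge, second branch
}
```
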
+ 13 - 0
contrib/_plugin_example/churn_analysis.go

@@ -20,6 +20,10 @@ import (
 // ChurnAnalysis contains the intermediate state which is mutated by Consume(). It should implement
 // hercules.LeafPipelineItem.
 type ChurnAnalysis struct {
+	// No special merge logic is required
+	hercules.NoopMerger
+	// Process each merge only once
+	hercules.OneShotMergeProcessor
 	TrackPeople bool
 
 	global []editInfo
@@ -109,9 +113,13 @@ func (churn *ChurnAnalysis) Configure(facts map[string]interface{}) {
 func (churn *ChurnAnalysis) Initialize(repository *git.Repository) {
 	churn.global = []editInfo{}
 	churn.people = map[int][]editInfo{}
+	churn.OneShotMergeProcessor.Initialize()
 }
 
 func (churn *ChurnAnalysis) Consume(deps map[string]interface{}) (map[string]interface{}, error) {
+	if !churn.ShouldConsumeCommit(deps) {
+		return nil, nil
+	}
 	fileDiffs := deps[hercules.DependencyFileDiff].(map[string]hercules.FileDiffData)
 	treeDiffs := deps[hercules.DependencyTreeChanges].(object.Changes)
 	cache := deps[hercules.DependencyBlobCache].(map[plumbing.Hash]*object.Blob)
@@ -167,6 +175,11 @@ func (churn *ChurnAnalysis) Consume(deps map[string]interface{}) (map[string]int
 	return nil, nil
 }
 
+// Fork clones the same item several times on branches.
+func (churn *ChurnAnalysis) Fork(n int) []hercules.PipelineItem {
+	return hercules.ForkSamePipelineItem(churn, n)
+}
+
 func (churn *ChurnAnalysis) Finalize() interface{} {
 	result := ChurnAnalysisResult{
 		Global: editInfosToEdits(churn.global),

+ 16 - 0
core.go

@@ -44,6 +44,12 @@ type MergeablePipelineItem = core.MergeablePipelineItem
 // CommonAnalysisResult holds the information which is always extracted at Pipeline.Run().
 type CommonAnalysisResult = core.CommonAnalysisResult
 
+// NoopMerger provides an empty Merge() method suitable for PipelineItem.
+type NoopMerger = core.NoopMerger
+
+// OneShotMergeProcessor provides the convenience method to consume merges only once.
+type OneShotMergeProcessor = core.OneShotMergeProcessor
+
 // MetadataToCommonAnalysisResult copies the data from a Protobuf message.
 func MetadataToCommonAnalysisResult(meta *core.Metadata) *CommonAnalysisResult {
 	return core.MetadataToCommonAnalysisResult(meta)
@@ -78,6 +84,16 @@ func LoadCommitsFromFile(path string, repository *git.Repository) ([]*object.Com
 	return core.LoadCommitsFromFile(path, repository)
 }
 
+// ForkSamePipelineItem clones items by referencing the same origin.
+func ForkSamePipelineItem(origin PipelineItem, n int) []PipelineItem {
+	return core.ForkSamePipelineItem(origin, n)
+}
+
+// ForkCopyPipelineItem clones items by copying them by value from the origin.
+func ForkCopyPipelineItem(origin PipelineItem, n int) []PipelineItem {
+	return core.ForkCopyPipelineItem(origin, n)
+}
+
 // PipelineItemRegistry contains all the known PipelineItem-s.
 type PipelineItemRegistry = core.PipelineItemRegistry
 
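core.go re-exports the two forking strategies defined in internal/core/forks.go below. A self-contained sketch of the difference (the `item` type is a stand-in, not a hercules API): ForkSamePipelineItem hands out the same pointer n times, so all branches share state, while ForkCopyPipelineItem builds shallow value copies via reflection, so branch states diverge.

```go
package main

import (
	"fmt"
	"reflect"
)

// item stands in for a PipelineItem; only the forking semantics matter here.
type item struct{ counter int }

// forkSame mirrors core.ForkSamePipelineItem: every clone is the same pointer.
func forkSame(origin *item, n int) []*item {
	clones := make([]*item, n)
	for i := range clones {
		clones[i] = origin
	}
	return clones
}

// forkCopy mirrors core.ForkCopyPipelineItem: each clone is a shallow copy.
func forkCopy(origin *item, n int) []*item {
	originValue := reflect.Indirect(reflect.ValueOf(origin))
	clones := make([]*item, n)
	for i := range clones {
		cloneValue := reflect.New(originValue.Type()).Elem()
		cloneValue.Set(originValue)
		clones[i] = cloneValue.Addr().Interface().(*item)
	}
	return clones
}

func main() {
	a := &item{}
	shared := forkSame(a, 2)
	shared[0].counter++
	fmt.Println(a.counter, shared[1].counter) // 1 1: state is shared

	b := &item{}
	copies := forkCopy(b, 2)
	copies[0].counter++
	fmt.Println(b.counter, copies[1].counter) // 0 0: state diverged
}
```
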

+ 590 - 0
internal/core/forks.go

@@ -0,0 +1,590 @@
+package core
+
+import (
+	"log"
+	"reflect"
+	"sort"
+
+	"gopkg.in/src-d/go-git.v4/plumbing/object"
+	"gopkg.in/src-d/go-git.v4/plumbing"
+	"gopkg.in/src-d/hercules.v4/internal/toposort"
+)
+
+// OneShotMergeProcessor provides the convenience method to consume merges only once.
+type OneShotMergeProcessor struct {
+	merges map[plumbing.Hash]bool
+}
+
+// Initialize resets OneShotMergeProcessor.
+func (proc *OneShotMergeProcessor) Initialize() {
+	proc.merges = map[plumbing.Hash]bool{}
+}
+
+// ShouldConsumeCommit returns true on regular commits. It also returns true upon
+// the first occurrence of a particular merge commit.
+func (proc *OneShotMergeProcessor) ShouldConsumeCommit(deps map[string]interface{}) bool {
+	commit := deps[DependencyCommit].(*object.Commit)
+	if commit.NumParents() <= 1 {
+		return true
+	}
+	if !proc.merges[commit.Hash] {
+		proc.merges[commit.Hash] = true
+		return true
+	}
+	return false
+}
+
+// NoopMerger provides an empty Merge() method suitable for PipelineItem.
+type NoopMerger struct {
+}
+
+// Merge does nothing.
+func (merger *NoopMerger) Merge(branches []PipelineItem) {
+	// no-op
+}
+
+// ForkSamePipelineItem clones items by referencing the same origin.
+func ForkSamePipelineItem(origin PipelineItem, n int) []PipelineItem {
+	clones := make([]PipelineItem, n)
+	for i := 0; i < n; i++ {
+		clones[i] = origin
+	}
+	return clones
+}
+
+// ForkCopyPipelineItem clones items by copying them by value from the origin.
+func ForkCopyPipelineItem(origin PipelineItem, n int) []PipelineItem {
+	originValue := reflect.Indirect(reflect.ValueOf(origin))
+	originType := originValue.Type()
+	clones := make([]PipelineItem, n)
+	for i := 0; i < n; i++ {
+		cloneValue := reflect.New(originType).Elem()
+		cloneValue.Set(originValue)
+		clones[i] = cloneValue.Addr().Interface().(PipelineItem)
+	}
+	return clones
+}
+
+const (
+	// runActionCommit corresponds to a regular commit
+	runActionCommit = 0
+	// runActionFork splits a branch into several parts
+	runActionFork = iota
+	// runActionMerge merges several branches together
+	runActionMerge = iota
+	// runActionDelete removes the branch as it is no longer needed
+	runActionDelete = iota
+)
+
+type runAction struct {
+	Action int
+	Commit *object.Commit
+	Items []int
+}
+
+func cloneItems(origin []PipelineItem, n int) [][]PipelineItem {
+	clones := make([][]PipelineItem, n)
+	for j := 0; j < n; j++ {
+		clones[j] = make([]PipelineItem, len(origin))
+	}
+	for i, item := range origin {
+		itemClones := item.Fork(n)
+		for j := 0; j < n; j++ {
+			clones[j][i] = itemClones[j]
+		}
+	}
+	return clones
+}
+
+func mergeItems(branches [][]PipelineItem) {
+	buffer := make([]PipelineItem, len(branches) - 1)
+	for i, item := range branches[0] {
+		for j := 0; j < len(branches)-1; j++ {
+			buffer[j] = branches[j+1][i]
+		}
+		item.Merge(buffer)
+	}
+}
+
+// getMasterBranch returns the branch with the smallest index.
+func getMasterBranch(branches map[int][]PipelineItem) []PipelineItem {
+	minKey := 1 << 31
+	var minVal []PipelineItem
+	for key, val := range branches {
+		if key < minKey {
+			minKey = key
+			minVal = val
+		}
+	}
+	return minVal
+}
+
+// prepareRunPlan schedules the actions for Pipeline.Run().
+func prepareRunPlan(commits []*object.Commit) []runAction {
+	hashes, dag := buildDag(commits)
+	leaveRootComponent(hashes, dag)
+	numParents := bindNumParents(hashes, dag)
+	mergedDag, mergedSeq := mergeDag(numParents, hashes, dag)
+	orderNodes := bindOrderNodes(mergedDag)
+	collapseFastForwards(orderNodes, hashes, mergedDag, dag, mergedSeq)
+
+	/*fmt.Printf("digraph Hercules {\n")
+	for i, c := range order {
+		commit := hashes[c]
+		fmt.Printf("  \"%s\"[label=\"[%d] %s\"]\n", commit.Hash.String(), i, commit.Hash.String()[:6])
+		for _, child := range mergedDag[commit.Hash] {
+			fmt.Printf("  \"%s\" -> \"%s\"\n", commit.Hash.String(), child.Hash.String())
+		}
+	}
+	fmt.Printf("}\n")*/
+
+	plan := generatePlan(orderNodes, numParents, hashes, mergedDag, dag, mergedSeq)
+	plan = optimizePlan(plan)
+	return plan
+}
+
+// buildDag generates the raw commit DAG and the commit hash map.
+func buildDag(commits []*object.Commit) (
+	map[string]*object.Commit, map[plumbing.Hash][]*object.Commit) {
+
+	hashes := map[string]*object.Commit{}
+	for _, commit := range commits {
+		hashes[commit.Hash.String()] = commit
+	}
+	dag := map[plumbing.Hash][]*object.Commit{}
+	for _, commit := range commits {
+		if _, exists := dag[commit.Hash]; !exists {
+			dag[commit.Hash] = make([]*object.Commit, 0, 1)
+		}
+		for _, parent := range commit.ParentHashes {
+			if _, exists := hashes[parent.String()]; !exists {
+				continue
+			}
+			children := dag[parent]
+			if children == nil {
+				children = make([]*object.Commit, 0, 1)
+			}
+			dag[parent] = append(children, commit)
+		}
+	}
+	return hashes, dag
+}
+
+// bindNumParents returns curried "numParents" function.
+func bindNumParents(
+	hashes map[string]*object.Commit,
+	dag map[plumbing.Hash][]*object.Commit) func(c *object.Commit) int {
+	return func(c *object.Commit) int {
+		r := 0
+		for _, parent := range c.ParentHashes {
+			if p, exists := hashes[parent.String()]; exists {
+				for _, pc := range dag[p.Hash] {
+					if pc.Hash == c.Hash {
+						r++
+						break
+					}
+				}
+			}
+		}
+		return r
+	}
+}
+
+// leaveRootComponent runs connected components analysis and throws away everything
+// but the part which grows from the root.
+func leaveRootComponent(
+	hashes map[string]*object.Commit,
+	dag map[plumbing.Hash][]*object.Commit) {
+
+	visited := map[plumbing.Hash]bool{}
+	var sets [][]plumbing.Hash
+	for key := range dag {
+		if visited[key] {
+			continue
+		}
+		var set []plumbing.Hash
+		for queue := []plumbing.Hash{key}; len(queue) > 0; {
+			head := queue[len(queue)-1]
+			queue = queue[:len(queue)-1]
+			if visited[head] {
+				continue
+			}
+			set = append(set, head)
+			visited[head] = true
+			for _, c := range dag[head] {
+				if !visited[c.Hash] {
+					queue = append(queue, c.Hash)
+				}
+			}
+			if commit, exists := hashes[head.String()]; exists {
+				for _, p := range commit.ParentHashes {
+					if !visited[p] {
+						if _, exists := hashes[p.String()]; exists {
+							queue = append(queue, p)
+						}
+					}
+				}
+			}
+		}
+		sets = append(sets, set)
+	}
+	if len(sets) > 1 {
+		maxlen := 0
+		maxind := -1
+		for i, set := range sets {
+			if len(set) > maxlen {
+				maxlen = len(set)
+				maxind = i
+			}
+		}
+		for i, set := range sets {
+			if i == maxind {
+				continue
+			}
+			for _, h := range set {
+				log.Printf("warning: dropped %s from the analysis - disjoint", h.String())
+				delete(dag, h)
+				delete(hashes, h.String())
+			}
+		}
+	}
+}
+
+// bindOrderNodes returns curried "orderNodes" function.
+func bindOrderNodes(mergedDag map[plumbing.Hash][]*object.Commit) func(reverse bool) []string {
+	return func(reverse bool) []string {
+		graph := toposort.NewGraph()
+		keys := make([]plumbing.Hash, 0, len(mergedDag))
+		for key := range mergedDag {
+			keys = append(keys, key)
+		}
+		sort.Slice(keys, func(i, j int) bool { return keys[i].String() < keys[j].String() })
+		for _, key := range keys {
+			graph.AddNode(key.String())
+		}
+		for _, key := range keys {
+			children := mergedDag[key]
+			sort.Slice(children, func(i, j int) bool {
+				return children[i].Hash.String() < children[j].Hash.String()
+			})
+			for _, c := range children {
+				graph.AddEdge(key.String(), c.Hash.String())
+			}
+		}
+		order, ok := graph.Toposort()
+		if !ok {
+			// should never happen
+			panic("Could not topologically sort the DAG of commits")
+		}
+		if reverse {
+			// one day this must appear in the standard library...
+			for i, j := 0, len(order)-1; i < len(order)/2; i, j = i+1, j-1 {
+				order[i], order[j] = order[j], order[i]
+			}
+		}
+		return order
+	}
+}
+
+// mergeDag turns sequences of consecutive commits into single nodes.
+func mergeDag(
+	numParents func(c *object.Commit) int,
+	hashes map[string]*object.Commit,
+	dag map[plumbing.Hash][]*object.Commit) (
+	mergedDag, mergedSeq map[plumbing.Hash][]*object.Commit) {
+
+	parentOf := func(c *object.Commit) plumbing.Hash {
+		var parent plumbing.Hash
+		for _, p := range c.ParentHashes {
+			if _, exists := hashes[p.String()]; exists {
+				if parent != plumbing.ZeroHash {
+					// more than one parent
+					return plumbing.ZeroHash
+				}
+				parent = p
+			}
+		}
+		return parent
+	}
+	mergedDag = map[plumbing.Hash][]*object.Commit{}
+	mergedSeq = map[plumbing.Hash][]*object.Commit{}
+	visited := map[plumbing.Hash]bool{}
+	for ch := range dag {
+		c := hashes[ch.String()]
+		if visited[c.Hash] {
+			continue
+		}
+		for {
+			parent := parentOf(c)
+			if parent == plumbing.ZeroHash || len(dag[parent]) != 1 {
+				break
+			}
+			c = hashes[parent.String()]
+		}
+		head := c
+		var seq []*object.Commit
+		children := dag[c.Hash]
+		for {
+			visited[c.Hash] = true
+			seq = append(seq, c)
+			if len(children) != 1 {
+				break
+			}
+			c = children[0]
+			children = dag[c.Hash]
+			if numParents(c) != 1 {
+				break
+			}
+		}
+		mergedSeq[head.Hash] = seq
+		mergedDag[head.Hash] = dag[seq[len(seq)-1].Hash]
+	}
+	return
+}
+
+// collapseFastForwards removes the fast forward merges.
+func collapseFastForwards(
+	orderNodes func(reverse bool) []string,
+	hashes map[string]*object.Commit,
+	mergedDag, dag, mergedSeq map[plumbing.Hash][]*object.Commit) {
+
+	for _, strkey := range orderNodes(true) {
+		key := hashes[strkey].Hash
+		vals, exists := mergedDag[key]
+		if !exists {
+			continue
+		}
+		if len(vals) == 2 {
+			grand1 := mergedDag[vals[0].Hash]
+			grand2 := mergedDag[vals[1].Hash]
+			if len(grand2) == 1 && vals[0].Hash == grand2[0].Hash {
+				mergedDag[key] = mergedDag[vals[0].Hash]
+				dag[key] = vals[1:]
+				delete(mergedDag, vals[0].Hash)
+				delete(mergedDag, vals[1].Hash)
+				mergedSeq[key] = append(mergedSeq[key], mergedSeq[vals[1].Hash]...)
+				mergedSeq[key] = append(mergedSeq[key], mergedSeq[vals[0].Hash]...)
+				delete(mergedSeq, vals[0].Hash)
+				delete(mergedSeq, vals[1].Hash)
+			}
+			// symmetric
+			if len(grand1) == 1 && vals[1].Hash == grand1[0].Hash {
+				mergedDag[key] = mergedDag[vals[1].Hash]
+				dag[key] = vals[:1]
+				delete(mergedDag, vals[0].Hash)
+				delete(mergedDag, vals[1].Hash)
+				mergedSeq[key] = append(mergedSeq[key], mergedSeq[vals[0].Hash]...)
+				mergedSeq[key] = append(mergedSeq[key], mergedSeq[vals[1].Hash]...)
+				delete(mergedSeq, vals[0].Hash)
+				delete(mergedSeq, vals[1].Hash)
+			}
+		}
+	}
+}
+
+// generatePlan creates the list of actions from the commit DAG.
+func generatePlan(
+	orderNodes func(reverse bool) []string,
+	numParents func(c *object.Commit) int,
+	hashes map[string]*object.Commit,
+	mergedDag, dag, mergedSeq map[plumbing.Hash][]*object.Commit) []runAction {
+
+	var plan []runAction
+	branches := map[plumbing.Hash]int{}
+	counter := 1
+	for seqIndex, name := range orderNodes(false) {
+		commit := hashes[name]
+		if seqIndex == 0 {
+			branches[commit.Hash] = 0
+		}
+		var branch int
+		{
+			var exists bool
+			branch, exists = branches[commit.Hash]
+			if !exists {
+				branch = -1
+			}
+		}
+		branchExists := func() bool { return branch >= 0 }
+		appendCommit := func(c *object.Commit, branch int) {
+			plan = append(plan, runAction{
+				Action: runActionCommit,
+				Commit: c,
+				Items: []int{branch},
+			})
+		}
+		appendMergeIfNeeded := func() {
+			if numParents(commit) < 2 {
+				return
+			}
+			// merge after the merge commit (the first in the sequence)
+			var items []int
+			minBranch := 1 << 31
+			for _, parent := range commit.ParentHashes {
+				if _, exists := hashes[parent.String()]; exists {
+					parentBranch := branches[parent]
+					if len(dag[parent]) == 1 && minBranch > parentBranch {
+						minBranch = parentBranch
+					}
+					items = append(items, parentBranch)
+					if parentBranch != branch {
+						appendCommit(commit, parentBranch)
+					}
+				}
+			}
+			if minBranch < 1 << 31 {
+				branch = minBranch
+				branches[commit.Hash] = minBranch
+			} else if !branchExists() {
+				panic("!branchExists()")
+			}
+			plan = append(plan, runAction{
+				Action: runActionMerge,
+				Commit: nil,
+				Items: items,
+			})
+		}
+		if subseq, exists := mergedSeq[commit.Hash]; exists {
+			for subseqIndex, offspring := range subseq {
+				if branchExists() {
+					appendCommit(offspring, branch)
+				}
+				if subseqIndex == 0 {
+					appendMergeIfNeeded()
+				}
+			}
+			branches[subseq[len(subseq)-1].Hash] = branch
+		}
+		if len(mergedDag[commit.Hash]) > 1 {
+			branches[mergedDag[commit.Hash][0].Hash] = branch
+			children := []int{branch}
+			for i, child := range mergedDag[commit.Hash] {
+				if i > 0 {
+					branches[child.Hash] = counter
+					children = append(children, counter)
+					counter++
+				}
+			}
+			plan = append(plan, runAction{
+				Action: runActionFork,
+				Commit: nil,
+				Items: children,
+			})
+		}
+	}
+	return plan
+}
+
+// optimizePlan removes "dead" nodes and inserts `runActionDelete` disposal steps.
+//
+// |   *
+// *  /
+// |\/
+// |/
+// *
+//
+func optimizePlan(plan []runAction) []runAction {
+	// lives maps branch index to the number of commits in that branch
+	lives := map[int]int{}
+	// lastMentioned maps branch index to the index inside `plan` when that branch was last used
+	lastMentioned := map[int]int{}
+	for i, p := range plan {
+		firstItem := p.Items[0]
+		switch p.Action {
+		case runActionCommit:
+			lives[firstItem]++
+			lastMentioned[firstItem] = i
+		case runActionFork:
+			lastMentioned[firstItem] = i
+		case runActionMerge:
+			for _, item := range p.Items {
+				lastMentioned[item] = i
+			}
+		}
+	}
+	branchesToDelete := map[int]bool{}
+	for key, life := range lives {
+		if life == 1 {
+			branchesToDelete[key] = true
+			delete(lastMentioned, key)
+		}
+	}
+	var optimizedPlan []runAction
+	lastMentionedArr := make([][2]int, 0, len(lastMentioned) + 1)
+	for key, val := range lastMentioned {
+		if val != len(plan) - 1 {
+			lastMentionedArr = append(lastMentionedArr, [2]int{val, key})
+		}
+	}
+	if len(lastMentionedArr) == 0 && len(branchesToDelete) == 0 {
+		// early return - we have nothing to optimize
+		return plan
+	}
+	sort.Slice(lastMentionedArr, func(i, j int) bool {
+		return lastMentionedArr[i][0] < lastMentionedArr[j][0]
+	})
+	lastMentionedArr = append(lastMentionedArr, [2]int{len(plan)-1, -1})
+	prevpi := -1
+	for _, pair := range lastMentionedArr {
+		for pi := prevpi + 1; pi <= pair[0]; pi++ {
+			p := plan[pi]
+			switch p.Action {
+			case runActionCommit:
+				if !branchesToDelete[p.Items[0]] {
+					optimizedPlan = append(optimizedPlan, p)
+				}
+			case runActionFork:
+				var newBranches []int
+				for _, b := range p.Items {
+					if !branchesToDelete[b] {
+						newBranches = append(newBranches, b)
+					}
+				}
+				if len(newBranches) > 1 {
+					optimizedPlan = append(optimizedPlan, runAction{
+						Action: runActionFork,
+						Commit: p.Commit,
+						Items:  newBranches,
+					})
+				}
+			case runActionMerge:
+				var newBranches []int
+				for _, b := range p.Items {
+					if !branchesToDelete[b] {
+						newBranches = append(newBranches, b)
+					}
+				}
+				if len(newBranches) > 1 {
+					optimizedPlan = append(optimizedPlan, runAction{
+						Action: runActionMerge,
+						Commit: p.Commit,
+						Items:  newBranches,
+					})
+				}
+			}
+		}
+		if pair[1] >= 0 {
+			prevpi = pair[0]
+			optimizedPlan = append(optimizedPlan, runAction{
+				Action: runActionDelete,
+				Commit: nil,
+				Items:  []int{pair[1]},
+			})
+		}
+	}
+	// single commit can be detected as redundant
+	if len(optimizedPlan) > 0 {
+		return optimizedPlan
+	}
+	return plan
+	// TODO(vmarkovtsev): there can be also duplicate redundant merges, e.g.
+	/*
+	0 4e34f03d829fbacb71cde0e010de87ea945dc69a [3]
+	0 4e34f03d829fbacb71cde0e010de87ea945dc69a [12]
+	2                                          [3 12]
+	0 06716c2b39422938b77ddafa4d5c39bb9e4476da [3]
+	0 06716c2b39422938b77ddafa4d5c39bb9e4476da [12]
+	2                                          [3 12]
+	0 1219c7bf9e0e1a93459a052ab8b351bfc379dc19 [12]
+	*/
+}

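prepareRunPlan reduces the commit DAG to a flat list of runAction steps which Pipeline.Run() below replays in order. A hedged illustration for a hypothetical history where B and C branch off A and merge into D (hashes are made up; the real planner also collapses fast-forwards, and optimizePlan inserts the delete step):

```go
package main

import "fmt"

// runAction mirrors the encoding in forks.go; the Commit field is shortened
// to a string for readability.
type runAction struct {
	Action int // 0 commit, 1 fork, 2 merge, 3 delete
	Commit string
	Items  []int // branch indexes the action applies to
}

func main() {
	plan := []runAction{
		{Action: 0, Commit: "A", Items: []int{0}}, // consume A on branch 0
		{Action: 1, Items: []int{0, 1}},           // fork branch 1 off branch 0
		{Action: 0, Commit: "B", Items: []int{0}}, // consume B on the trunk
		{Action: 0, Commit: "C", Items: []int{1}}, // consume C on the side branch
		{Action: 0, Commit: "D", Items: []int{0}}, // the merge commit visits each parent,
		{Action: 0, Commit: "D", Items: []int{1}}, // OneShotMergeProcessor consumes it once
		{Action: 2, Items: []int{0, 1}},           // merge branch 1 into branch 0
		{Action: 3, Items: []int{1}},              // dispose of the dead branch
	}
	for _, a := range plan {
		fmt.Printf("%d %-2s %v\n", a.Action, a.Commit, a.Items)
	}
}
```

This matches the transcript format in the TODO at the end of optimizePlan: commit actions carry a hash and one branch, merges carry only the participating branches.
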
+ 59 - 503
internal/core/pipeline.go

@@ -99,9 +99,18 @@ type PipelineItem interface {
 	Initialize(*git.Repository)
 	// Consume processes the next commit.
 	// deps contains the required entities which match Depends(). Besides, it always includes
-	// "commit" and "index".
+	// DependencyCommit and DependencyIndex.
 	// Returns the calculated entities which match Provides().
 	Consume(deps map[string]interface{}) (map[string]interface{}, error)
+	// Fork clones the item the requested number of times. The data links between the clones
+	// are up to the implementation. Needed to handle Git branches. See also Merge().
+	// Returns a slice with `n` fresh clones. In other words, it does not include the original item.
+	Fork(n int) []PipelineItem
+	// Merge combines several branches together. Each is supposed to have been created with Fork().
+	// The result is stored in the called item, thus this function returns nothing.
+	// Merge() must update all the branches, not only self. When several branches merge, some of
+	// them may continue to live, hence this requirement.
+	Merge(branches []PipelineItem)
 }
 
 // FeaturedPipelineItem enables switching the automatic insertion of pipeline items on or off.
@@ -197,8 +206,8 @@ func MetadataToCommonAnalysisResult(meta *Metadata) *CommonAnalysisResult {
 // See the extended example of how a Pipeline works in doc.go
 type Pipeline struct {
 	// OnProgress is the callback which is invoked in Analyse() to output its
-	// progress. The first argument is the number of processed commits and the
-	// second is the total number of commits.
+	// progress. The first argument is the number of complete steps and the
+	// second is the total number of steps.
 	OnProgress func(int, int)
 
 	// Repository points to the analysed Git repository struct from go-git.
@@ -227,6 +236,12 @@ const (
 	// ConfigPipelineCommits is the name of the Pipeline configuration option (Pipeline.Initialize())
 	// which allows to specify the custom commit sequence. By default, Pipeline.Commits() is used.
 	ConfigPipelineCommits = "commits"
+	// DependencyCommit is the name of one of the two items in `deps` supplied to PipelineItem.Consume()
+	// which always exists. It corresponds to the currently analyzed commit.
+	DependencyCommit = "commit"
+	// DependencyIndex is the name of one of the two items in `deps` supplied to PipelineItem.Consume()
+	// which always exists. It corresponds to the currently analyzed commit's index.
+	DependencyIndex = "index"
 )
 
 // NewPipeline initializes a new instance of Pipeline struct.
@@ -549,33 +564,56 @@ func (pipeline *Pipeline) Run(commits []*object.Commit) (map[LeafPipelineItem]in
 	if onProgress == nil {
 		onProgress = func(int, int) {}
 	}
+	plan := prepareRunPlan(commits)
+	progressSteps := len(plan) + 2
+	branches := map[int][]PipelineItem{0: pipeline.items}
 
-	for index, commit := range commits {
-		onProgress(index, len(commits))
-		state := map[string]interface{}{"commit": commit, "index": index}
-		for _, item := range pipeline.items {
-			update, err := item.Consume(state)
-			if err != nil {
-				log.Printf("%s failed on commit #%d %s\n",
-					item.Name(), index, commit.Hash.String())
-				return nil, err
-			}
-			for _, key := range item.Provides() {
-				val, ok := update[key]
-				if !ok {
-					panic(fmt.Sprintf("%s: Consume() did not return %s", item.Name(), key))
+	for index, step := range plan {
+		onProgress(index + 1, progressSteps)
+		firstItem := step.Items[0]
+		switch step.Action {
+		case runActionCommit:
+			state := map[string]interface{}{
+				DependencyCommit: step.Commit,
+				DependencyIndex: index,
+			}
+			for _, item := range branches[firstItem] {
+				update, err := item.Consume(state)
+				if err != nil {
+					log.Printf("%s failed on commit #%d %s\n",
+						item.Name(), index + 1, step.Commit.Hash.String())
+					return nil, err
+				}
+				for _, key := range item.Provides() {
+					val, ok := update[key]
+					if !ok {
+						panic(fmt.Sprintf("%s: Consume() did not return %s", item.Name(), key))
+					}
+					state[key] = val
 				}
-				state[key] = val
 			}
+		case runActionFork:
+			for i, clone := range cloneItems(branches[firstItem], len(step.Items)-1) {
+				branches[step.Items[i+1]] = clone
+			}
+		case runActionMerge:
+			merged := make([][]PipelineItem, len(step.Items))
+			for i, b := range step.Items {
+				merged[i] = branches[b]
+			}
+			mergeItems(merged)
+		case runActionDelete:
+			delete(branches, firstItem)
 		}
 	}
-	onProgress(len(commits), len(commits))
+	onProgress(len(plan) + 1, progressSteps)
 	result := map[LeafPipelineItem]interface{}{}
-	for _, item := range pipeline.items {
+	for _, item := range getMasterBranch(branches) {
 		if casted, ok := item.(LeafPipelineItem); ok {
 			result[casted] = casted.Finalize()
 		}
 	}
+	onProgress(progressSteps, progressSteps)
 	result[nil] = &CommonAnalysisResult{
 		BeginTime:     commits[0].Author.When.Unix(),
 		EndTime:       commits[len(commits)-1].Author.When.Unix(),
@@ -600,7 +638,7 @@ func LoadCommitsFromFile(path string, repository *git.Repository) ([]*object.Com
 		file = os.Stdin
 	}
 	scanner := bufio.NewScanner(file)
-	commits := []*object.Commit{}
+	var commits []*object.Commit
 	for scanner.Scan() {
 		hash := plumbing.NewHash(scanner.Text())
 		if len(hash) != 20 {
@@ -614,485 +652,3 @@ func LoadCommitsFromFile(path string, repository *git.Repository) ([]*object.Com
 	}
 	return commits, nil
 }
-
-const (
-	runActionCommit = 0
-	runActionFork = iota
-	runActionMerge = iota
-	runActionDelete = iota
-)
-
-type runAction struct {
-	Action int
-	Commit *object.Commit
-	Items []int
-}
-
-func prepareRunPlan(commits []*object.Commit) []runAction {
-	hashes, dag := buildDag(commits)
-	leaveRootComponent(hashes, dag)
-	numParents := bindNumParents(hashes, dag)
-	mergedDag, mergedSeq := mergeDag(numParents, hashes, dag)
-	orderNodes := bindOrderNodes(mergedDag)
-	collapseFastForwards(orderNodes, hashes, mergedDag, dag, mergedSeq)
-
-	/*fmt.Printf("digraph Hercules {\n")
-	for i, c := range order {
-		commit := hashes[c]
-		fmt.Printf("  \"%s\"[label=\"[%d] %s\"]\n", commit.Hash.String(), i, commit.Hash.String()[:6])
-		for _, child := range mergedDag[commit.Hash] {
-			fmt.Printf("  \"%s\" -> \"%s\"\n", commit.Hash.String(), child.Hash.String())
-		}
-	}
-	fmt.Printf("}\n")*/
-
-	plan := generatePlan(orderNodes, numParents, hashes, mergedDag, dag, mergedSeq)
-	plan = optimizePlan(plan)
-	return plan
-}
-
-// buildDag generates the raw commit DAG and the commit hash map.
-func buildDag(commits []*object.Commit) (
-	map[string]*object.Commit, map[plumbing.Hash][]*object.Commit) {
-
-	hashes := map[string]*object.Commit{}
-	for _, commit := range commits {
-		hashes[commit.Hash.String()] = commit
-	}
-	dag := map[plumbing.Hash][]*object.Commit{}
-	for _, commit := range commits {
-		if _, exists := dag[commit.Hash]; !exists {
-			dag[commit.Hash] = make([]*object.Commit, 0, 1)
-		}
-		for _, parent := range commit.ParentHashes {
-			if _, exists := hashes[parent.String()]; !exists {
-				continue
-			}
-			children := dag[parent]
-			if children == nil {
-				children = make([]*object.Commit, 0, 1)
-			}
-			dag[parent] = append(children, commit)
-		}
-	}
-	return hashes, dag
-}
-
-// bindNumParents returns curried "numParents" function.
-func bindNumParents(
-	hashes map[string]*object.Commit,
-	dag map[plumbing.Hash][]*object.Commit) func(c *object.Commit) int {
-	return func(c *object.Commit) int {
-		r := 0
-		for _, parent := range c.ParentHashes {
-			if p, exists := hashes[parent.String()]; exists {
-				for _, pc := range dag[p.Hash] {
-					if pc.Hash == c.Hash {
-						r++
-						break
-					}
-				}
-			}
-		}
-		return r
-	}
-}
-
-// leaveRootComponent runs connected components analysis and throws away everything
-// but the part which grows from the root.
-func leaveRootComponent(
-	hashes map[string]*object.Commit,
-	dag map[plumbing.Hash][]*object.Commit) {
-
-	visited := map[plumbing.Hash]bool{}
-	var sets [][]plumbing.Hash
-	for key := range dag {
-		if visited[key] {
-			continue
-		}
-		var set []plumbing.Hash
-		for queue := []plumbing.Hash{key}; len(queue) > 0; {
-			head := queue[len(queue)-1]
-			queue = queue[:len(queue)-1]
-			if visited[head] {
-				continue
-			}
-			set = append(set, head)
-			visited[head] = true
-			for _, c := range dag[head] {
-				if !visited[c.Hash] {
-					queue = append(queue, c.Hash)
-				}
-			}
-			if commit, exists := hashes[head.String()]; exists {
-				for _, p := range commit.ParentHashes {
-					if !visited[p] {
-						if _, exists := hashes[p.String()]; exists {
-							queue = append(queue, p)
-						}
-					}
-				}
-			}
-		}
-		sets = append(sets, set)
-	}
-	if len(sets) > 1 {
-		maxlen := 0
-		maxind := -1
-		for i, set := range sets {
-			if len(set) > maxlen {
-				maxlen = len(set)
-				maxind = i
-			}
-		}
-		for i, set := range sets {
-			if i == maxind {
-				continue
-			}
-			for _, h := range set {
-				log.Printf("warning: dropped %s from the analysis - disjoint", h.String())
-				delete(dag, h)
-				delete(hashes, h.String())
-			}
-		}
-	}
-}
-
-// bindOrderNodes returns curried "orderNodes" function.
-func bindOrderNodes(mergedDag map[plumbing.Hash][]*object.Commit) func(reverse bool) []string {
-	return func(reverse bool) []string {
-		graph := toposort.NewGraph()
-		keys := make([]plumbing.Hash, 0, len(mergedDag))
-		for key := range mergedDag {
-			keys = append(keys, key)
-		}
-		sort.Slice(keys, func(i, j int) bool { return keys[i].String() < keys[j].String() })
-		for _, key := range keys {
-			graph.AddNode(key.String())
-		}
-		for _, key := range keys {
-			children := mergedDag[key]
-			sort.Slice(children, func(i, j int) bool {
-				return children[i].Hash.String() < children[j].Hash.String()
-			})
-			for _, c := range children {
-				graph.AddEdge(key.String(), c.Hash.String())
-			}
-		}
-		order, ok := graph.Toposort()
-		if !ok {
-			// should never happen
-			panic("Could not topologically sort the DAG of commits")
-		}
-		if reverse {
-			// one day this must appear in the standard library...
-			for i, j := 0, len(order)-1; i < len(order)/2; i, j = i+1, j-1 {
-				order[i], order[j] = order[j], order[i]
-			}
-		}
-		return order
-	}
-}
-
-// mergeDag turns sequences of consecutive commits into single nodes.
-func mergeDag(
-	numParents func(c *object.Commit) int,
-	hashes map[string]*object.Commit,
-	dag map[plumbing.Hash][]*object.Commit) (
-		mergedDag, mergedSeq map[plumbing.Hash][]*object.Commit) {
-
-	parentOf := func(c *object.Commit) plumbing.Hash {
-		var parent plumbing.Hash
-		for _, p := range c.ParentHashes {
-			if _, exists := hashes[p.String()]; exists {
-				if parent != plumbing.ZeroHash {
-					// more than one parent
-					return plumbing.ZeroHash
-				}
-				parent = p
-			}
-		}
-		return parent
-	}
-	mergedDag = map[plumbing.Hash][]*object.Commit{}
-	mergedSeq = map[plumbing.Hash][]*object.Commit{}
-	visited := map[plumbing.Hash]bool{}
-	for ch := range dag {
-		c := hashes[ch.String()]
-		if visited[c.Hash] {
-			continue
-		}
-		for true {
-			parent := parentOf(c)
-			if parent == plumbing.ZeroHash || len(dag[parent]) != 1 {
-				break
-			}
-			c = hashes[parent.String()]
-		}
-		head := c
-		var seq []*object.Commit
-		children := dag[c.Hash]
-		for true {
-			visited[c.Hash] = true
-			seq = append(seq, c)
-			if len(children) != 1 {
-				break
-			}
-			c = children[0]
-			children = dag[c.Hash]
-			if numParents(c) != 1 {
-				break
-			}
-		}
-		mergedSeq[head.Hash] = seq
-		mergedDag[head.Hash] = dag[seq[len(seq)-1].Hash]
-	}
-	return
-}
-
-// collapseFastForwards removes the fast forward merges.
-func collapseFastForwards(
-	orderNodes func(reverse bool) []string,
-	hashes map[string]*object.Commit,
-	mergedDag, dag, mergedSeq map[plumbing.Hash][]*object.Commit)  {
-
-	for _, strkey := range orderNodes(true) {
-		key := hashes[strkey].Hash
-		vals, exists := mergedDag[key]
-		if !exists {
-			continue
-		}
-		if len(vals) == 2 {
-			grand1 := mergedDag[vals[0].Hash]
-			grand2 := mergedDag[vals[1].Hash]
-			if len(grand2) == 1 && vals[0].Hash == grand2[0].Hash {
-				mergedDag[key] = mergedDag[vals[0].Hash]
-				dag[key] = vals[1:]
-				delete(mergedDag, vals[0].Hash)
-				delete(mergedDag, vals[1].Hash)
-				mergedSeq[key] = append(mergedSeq[key], mergedSeq[vals[1].Hash]...)
-				mergedSeq[key] = append(mergedSeq[key], mergedSeq[vals[0].Hash]...)
-				delete(mergedSeq, vals[0].Hash)
-				delete(mergedSeq, vals[1].Hash)
-			}
-			// symmetric
-			if len(grand1) == 1 && vals[1].Hash == grand1[0].Hash {
-				mergedDag[key] = mergedDag[vals[1].Hash]
-				dag[key] = vals[:1]
-				delete(mergedDag, vals[0].Hash)
-				delete(mergedDag, vals[1].Hash)
-				mergedSeq[key] = append(mergedSeq[key], mergedSeq[vals[0].Hash]...)
-				mergedSeq[key] = append(mergedSeq[key], mergedSeq[vals[1].Hash]...)
-				delete(mergedSeq, vals[0].Hash)
-				delete(mergedSeq, vals[1].Hash)
-			}
-		}
-	}
-}
-
-// generatePlan creates the list of actions from the commit DAG.
-func generatePlan(
-	orderNodes func(reverse bool) []string,
-	numParents func(c *object.Commit) int,
-	hashes map[string]*object.Commit,
-	mergedDag, dag, mergedSeq map[plumbing.Hash][]*object.Commit) []runAction {
-
-	var plan []runAction
-	branches := map[plumbing.Hash]int{}
-	counter := 1
-	for seqIndex, name := range orderNodes(false) {
-		commit := hashes[name]
-		if seqIndex == 0 {
-			branches[commit.Hash] = 0
-		}
-		var branch int
-		{
-			var exists bool
-			branch, exists = branches[commit.Hash]
-			if !exists {
-				branch = -1
-			}
-		}
-		branchExists := func() bool { return branch >= 0 }
-		appendCommit := func(c *object.Commit, branch int) {
-			plan = append(plan, runAction{
-				Action: runActionCommit,
-				Commit: c,
-				Items: []int{branch},
-			})
-		}
-		appendMergeIfNeeded := func() {
-			if numParents(commit) < 2 {
-				return
-			}
-			// merge after the merge commit (the first in the sequence)
-			var items []int
-			minBranch := 1 << 31
-			for _, parent := range commit.ParentHashes {
-				if _, exists := hashes[parent.String()]; exists {
-					parentBranch := branches[parent]
-					if len(dag[parent]) == 1 && minBranch > parentBranch {
-						minBranch = parentBranch
-					}
-					items = append(items, parentBranch)
-					if parentBranch != branch {
-						appendCommit(commit, parentBranch)
-					}
-				}
-			}
-			if minBranch < 1 << 31 {
-				branch = minBranch
-				branches[commit.Hash] = minBranch
-			} else if !branchExists() {
-				panic("!branchExists()")
-			}
-			plan = append(plan, runAction{
-				Action: runActionMerge,
-				Commit: nil,
-				Items: items,
-			})
-		}
-		if subseq, exists := mergedSeq[commit.Hash]; exists {
-			for subseqIndex, offspring := range subseq {
-				if branchExists() {
-					appendCommit(offspring, branch)
-				}
-				if subseqIndex == 0 {
-					appendMergeIfNeeded()
-				}
-			}
-			branches[subseq[len(subseq)-1].Hash] = branch
-		}
-		if len(mergedDag[commit.Hash]) > 1 {
-			branches[mergedDag[commit.Hash][0].Hash] = branch
-			children := []int{branch}
-			for i, child := range mergedDag[commit.Hash] {
-				if i > 0 {
-					branches[child.Hash] = counter
-					children = append(children, counter)
-					counter++
-				}
-			}
-			plan = append(plan, runAction{
-				Action: runActionFork,
-				Commit: nil,
-				Items: children,
-			})
-		}
-	}
-	return plan
-}
-
-// optimizePlan removes "dead" nodes and inserts `runActionDelete` disposal steps.
-//
-// |   *
-// *  /
-// |\/
-// |/
-// *
-//
-func optimizePlan(plan []runAction) []runAction {
-	// lives maps branch index to the number of commits in that branch
-	lives := map[int]int{}
-	// lastMentioned maps branch index to the index inside `plan` when that branch was last used
-	lastMentioned := map[int]int{}
-	for i, p := range plan {
-		firstItem := p.Items[0]
-		switch p.Action {
-		case runActionCommit:
-			lives[firstItem]++
-			lastMentioned[firstItem] = i
-		case runActionFork:
-			lastMentioned[firstItem] = i
-		case runActionMerge:
-			for _, item := range p.Items {
-				lastMentioned[item] = i
-			}
-		}
-	}
-	branchesToDelete := map[int]bool{}
-	for key, life := range lives {
-		if life == 1 {
-			branchesToDelete[key] = true
-			delete(lastMentioned, key)
-		}
-	}
-	var optimizedPlan []runAction
-	lastMentionedArr := make([][2]int, 0, len(lastMentioned) + 1)
-	for key, val := range lastMentioned {
-		if val != len(plan) - 1 {
-			lastMentionedArr = append(lastMentionedArr, [2]int{val, key})
-		}
-	}
-	if len(lastMentionedArr) == 0 && len(branchesToDelete) == 0 {
-		// early return - we have nothing to optimize
-		return plan
-	}
-	sort.Slice(lastMentionedArr, func(i, j int) bool { 
-		return lastMentionedArr[i][0] < lastMentionedArr[j][0]
-	})
-	lastMentionedArr = append(lastMentionedArr, [2]int{len(plan)-1, -1})
-	prevpi := -1
-	for _, pair := range lastMentionedArr {
-		for pi := prevpi + 1; pi <= pair[0]; pi++ {
-			p := plan[pi]
-			switch p.Action {
-			case runActionCommit:
-				if !branchesToDelete[p.Items[0]] {
-					optimizedPlan = append(optimizedPlan, p)
-				}
-			case runActionFork:
-				var newBranches []int
-				for _, b := range p.Items {
-					if !branchesToDelete[b] {
-						newBranches = append(newBranches, b)
-					}
-				}
-				if len(newBranches) > 1 {
-					optimizedPlan = append(optimizedPlan, runAction{
-						Action: runActionFork,
-						Commit: p.Commit,
-						Items:  newBranches,
-					})
-				}
-			case runActionMerge:
-				var newBranches []int
-				for _, b := range p.Items {
-					if !branchesToDelete[b] {
-						newBranches = append(newBranches, b)
-					}
-				}
-				if len(newBranches) > 1 {
-					optimizedPlan = append(optimizedPlan, runAction{
-						Action: runActionMerge,
-						Commit: p.Commit,
-						Items:  newBranches,
-					})
-				}
-			}
-		}
-		if pair[1] >= 0 {
-			prevpi = pair[0]
-			optimizedPlan = append(optimizedPlan, runAction{
-				Action: runActionDelete,
-				Commit: nil,
-				Items:  []int{pair[1]},
-			})
-		}
-	}
-	// single commit can be detected as redundant
-	if len(optimizedPlan) > 0 {
-		return optimizedPlan
-	}
-	return plan
-	// TODO(vmarkovtsev): there can be also duplicate redundant merges, e.g.
-	/*
-	0 4e34f03d829fbacb71cde0e010de87ea945dc69a [3]
-	0 4e34f03d829fbacb71cde0e010de87ea945dc69a [12]
-	2                                          [3 12]
-	0 06716c2b39422938b77ddafa4d5c39bb9e4476da [3]
-	0 06716c2b39422938b77ddafa4d5c39bb9e4476da [12]
-	2                                          [3 12]
-	0 1219c7bf9e0e1a93459a052ab8b351bfc379dc19 [12]
-	*/
-}

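Because Run() is now plan-driven, OnProgress reports scheduler steps: one per action, plus one step before finalization and one at the end, hence len(plan)+2 in total. A hedged usage sketch, assuming the root hercules.v4 package re-exports NewPipeline, Commits and Run the same way core.go above re-exports the fork helpers:

```go
package main

import (
	"fmt"
	"log"

	git "gopkg.in/src-d/go-git.v4"
	"gopkg.in/src-d/hercules.v4"
)

func main() {
	// The repository path is a placeholder.
	repo, err := git.PlainOpen(".")
	if err != nil {
		log.Fatal(err)
	}
	pipeline := hercules.NewPipeline(repo)
	pipeline.OnProgress = func(step, total int) {
		// total is len(plan)+2, not the number of commits.
		fmt.Printf("step %d of %d\n", step, total)
	}
	pipeline.Initialize(map[string]interface{}{})
	if _, err := pipeline.Run(pipeline.Commits()); err != nil {
		log.Fatal(err)
	}
}
```
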
+ 67 - 10
internal/core/pipeline_test.go

@@ -20,6 +20,8 @@ import (
 type testPipelineItem struct {
 	Initialized   bool
 	DepsConsumed  bool
+	Forked        bool
+	Merged        *bool
 	CommitMatches bool
 	IndexMatches  bool
 	TestError     bool
@@ -69,13 +71,13 @@ func (item *testPipelineItem) Consume(deps map[string]interface{}) (map[string]i
 	if item.TestError {
 		return nil, errors.New("error")
 	}
-	obj, exists := deps["commit"]
+	obj, exists := deps[DependencyCommit]
 	item.DepsConsumed = exists
 	if item.DepsConsumed {
 		commit := obj.(*object.Commit)
 		item.CommitMatches = commit.Hash == plumbing.NewHash(
 			"af9ddc0db70f09f3f27b4b98e415592a7485171c")
-		obj, item.DepsConsumed = deps["index"]
+		obj, item.DepsConsumed = deps[DependencyIndex]
 		if item.DepsConsumed {
 			item.IndexMatches = obj.(int) == 0
 		}
@@ -83,6 +85,19 @@ func (item *testPipelineItem) Consume(deps map[string]interface{}) (map[string]i
 	return map[string]interface{}{"test": item}, nil
 }
 
+func (item *testPipelineItem) Fork(n int) []PipelineItem {
+	result := make([]PipelineItem, n)
+	for i := 0; i < n; i++ {
+		result[i] = &testPipelineItem{Merged: item.Merged}
+	}
+	item.Forked = true
+	return result
+}
+
+func (item *testPipelineItem) Merge(branches []PipelineItem) {
+	*item.Merged = true
+}
+
 func (item *testPipelineItem) Finalize() interface{} {
 	return item
 }
@@ -140,6 +155,13 @@ func (item *dependingTestPipelineItem) Consume(deps map[string]interface{}) (map
 	return nil, nil
 }
 
+func (item *dependingTestPipelineItem) Fork(n int) []PipelineItem {
+	return nil
+}
+
+func (item *dependingTestPipelineItem) Merge(branches []PipelineItem) {
+}
+
 func (item *dependingTestPipelineItem) Finalize() interface{} {
 	return true
 }
@@ -176,7 +198,7 @@ func TestPipelineFeatures(t *testing.T) {
 
 func TestPipelineRun(t *testing.T) {
 	pipeline := NewPipeline(test.Repository)
-	item := &testPipelineItem{}
+	item := &testPipelineItem{Merged: new(bool)}
 	pipeline.AddItem(item)
 	pipeline.Initialize(map[string]interface{}{})
 	assert.True(t, item.Initialized)
@@ -195,22 +217,58 @@ func TestPipelineRun(t *testing.T) {
 	assert.True(t, item.DepsConsumed)
 	assert.True(t, item.CommitMatches)
 	assert.True(t, item.IndexMatches)
+	assert.False(t, item.Forked)
+	assert.False(t, *item.Merged)
 	pipeline.RemoveItem(item)
 	result, err = pipeline.Run(commits)
 	assert.Nil(t, err)
 	assert.Equal(t, 1, len(result))
 }
 
+func TestPipelineRunBranches(t *testing.T) {
+	pipeline := NewPipeline(test.Repository)
+	item := &testPipelineItem{Merged: new(bool)}
+	pipeline.AddItem(item)
+	pipeline.Initialize(map[string]interface{}{})
+	assert.True(t, item.Initialized)
+	commits := make([]*object.Commit, 5)
+	hashes := []string{
+		"6db8065cdb9bb0758f36a7e75fc72ab95f9e8145",
+		"f30daba81ff2bf0b3ba02a1e1441e74f8a4f6fee",
+		"8a03b5620b1caa72ec9cb847ea88332621e2950a",
+		"dd9dd084d5851d7dc4399fc7dbf3d8292831ebc5",
+		"f4ed0405b14f006c0744029d87ddb3245607587a",
+	}
+	for i, h := range hashes {
+		var err error
+		commits[i], err = test.Repository.CommitObject(plumbing.NewHash(h))
+		if err != nil {
+			t.Fatal(err)
+		}
+	}
+	result, err := pipeline.Run(commits)
+	assert.Nil(t, err)
+	assert.True(t, item.Forked)
+	assert.True(t, *item.Merged)
+	assert.Equal(t, 2, len(result))
+	assert.Equal(t, item, result[item].(*testPipelineItem))
+	common := result[nil].(*CommonAnalysisResult)
+	assert.Equal(t, common.CommitsNumber, 5)
+}
+
 func TestPipelineOnProgress(t *testing.T) {
 	pipeline := NewPipeline(test.Repository)
-	var progressOk1, progressOk2 bool
+	progressOk := 0
 
 	onProgress := func(step int, total int) {
-		if step == 0 && total == 1 {
-			progressOk1 = true
+		if step == 1 && total == 3 {
+			progressOk++
+		}
+		if step == 2 && total == 3 {
+			progressOk++
 		}
-		if step == 1 && total == 1 && progressOk1 {
-			progressOk2 = true
+		if step == 3 && total == 3 {
+			progressOk++
 		}
 	}
 
@@ -221,8 +279,7 @@ func TestPipelineOnProgress(t *testing.T) {
 	result, err := pipeline.Run(commits)
 	assert.Nil(t, err)
 	assert.Equal(t, 1, len(result))
-	assert.True(t, progressOk1)
-	assert.True(t, progressOk2)
+	assert.Equal(t, 3, progressOk)
 }
 
 func TestPipelineCommits(t *testing.T) {

+ 14 - 0
internal/core/registry_test.go

@@ -59,6 +59,13 @@ func (item *dummyPipelineItem) Consume(deps map[string]interface{}) (map[string]
 	return map[string]interface{}{"dummy": nil}, nil
 }
 
+func (item *dummyPipelineItem) Fork(n int) []PipelineItem {
+	return nil
+}
+
+func (item *dummyPipelineItem) Merge(branches []PipelineItem) {
+}
+
 type dummyPipelineItem2 struct{}
 
 func (item *dummyPipelineItem2) Name() string {
@@ -92,6 +99,13 @@ func (item *dummyPipelineItem2) Consume(deps map[string]interface{}) (map[string
 	return map[string]interface{}{"dummy2": nil}, nil
 }
 
+func (item *dummyPipelineItem2) Fork(n int) []PipelineItem {
+	return nil
+}
+
+func (item *dummyPipelineItem2) Merge(branches []PipelineItem) {
+}
+
 func TestRegistrySummon(t *testing.T) {
 	reg := getRegistry()
 	reg.Register(&testPipelineItem{})

+ 53 - 29
internal/plumbing/blob_cache.go

@@ -17,19 +17,20 @@ import (
 // It must provide the old and the new objects; "blobCache" rotates and avoids loading
 // the same blobs twice. Outdated objects are removed so "blobCache" never grows big.
 type BlobCache struct {
-	// Specifies how to handle the situation when we encounter a git submodule - an object without
-	// the blob. If false, we look inside .gitmodules and if don't find, raise an error.
-	// If true, we do not look inside .gitmodules and always succeed.
-	IgnoreMissingSubmodules bool
+	core.NoopMerger
+	// Specifies how to handle the situation when we encounter a git submodule - an object
+	// without the blob. If true, we look inside .gitmodules and if we don't find it,
+	// raise an error. If false, we do not look inside .gitmodules and always succeed.
+	FailOnMissingSubmodules bool
 
 	repository *git.Repository
 	cache      map[plumbing.Hash]*object.Blob
 }
 
 const (
-	// ConfigBlobCacheIgnoreMissingSubmodules is the name of the configuration option for
-	// BlobCache.Configure() to not check if the referenced submodules exist.
-	ConfigBlobCacheIgnoreMissingSubmodules = "BlobCache.IgnoreMissingSubmodules"
+	// ConfigBlobCacheFailOnMissingSubmodules is the name of the configuration option for
+	// BlobCache.Configure() to check if the referenced submodules are registered in .gitmodules.
+	ConfigBlobCacheFailOnMissingSubmodules = "BlobCache.FailOnMissingSubmodules"
 	// DependencyBlobCache identifies the dependency provided by BlobCache.
 	DependencyBlobCache = "blob_cache"
 )
@@ -58,11 +59,11 @@ func (blobCache *BlobCache) Requires() []string {
 // ListConfigurationOptions returns the list of changeable public properties of this PipelineItem.
 func (blobCache *BlobCache) ListConfigurationOptions() []core.ConfigurationOption {
 	options := [...]core.ConfigurationOption{{
-		Name: ConfigBlobCacheIgnoreMissingSubmodules,
-		Description: "Specifies whether to panic if some referenced submodules do not exist and thus" +
-			" the corresponding Git objects cannot be loaded. Override this if you know that the " +
-			"history is dirty and you want to get things done.",
-		Flag:    "ignore-missing-submodules",
+		Name: ConfigBlobCacheFailOnMissingSubmodules,
+		Description: "Specifies whether to panic if any referenced submodule does " +
+			"not exist in .gitmodules and thus the corresponding Git object cannot be loaded. " +
+			"Override this if you want to ensure that your repository is integral. ",
+		Flag:    "fail-on-missing-submodules",
 		Type:    core.BoolConfigurationOption,
 		Default: false}}
 	return options[:]
@@ -70,8 +71,8 @@ func (blobCache *BlobCache) ListConfigurationOptions() []core.ConfigurationOptio
 
 // Configure sets the properties previously published by ListConfigurationOptions().
 func (blobCache *BlobCache) Configure(facts map[string]interface{}) {
-	if val, exists := facts[ConfigBlobCacheIgnoreMissingSubmodules].(bool); exists {
-		blobCache.IgnoreMissingSubmodules = val
+	if val, exists := facts[ConfigBlobCacheFailOnMissingSubmodules].(bool); exists {
+		blobCache.FailOnMissingSubmodules = val
 	}
 }
 
@@ -84,11 +85,12 @@ func (blobCache *BlobCache) Initialize(repository *git.Repository) {
 
 // Consume runs this PipelineItem on the next commit data.
 // `deps` contain all the results from upstream PipelineItem-s as requested by Requires().
-// Additionally, "commit" is always present there and represents the analysed *object.Commit.
-// This function returns the mapping with analysis results. The keys must be the same as
-// in Provides(). If there was an error, nil is returned.
+// Additionally, DependencyCommit is always present there and represents
+// the analysed *object.Commit. This function returns the mapping with analysis
+// results. The keys must be the same as in Provides(). If there was an error,
+// nil is returned.
 func (blobCache *BlobCache) Consume(deps map[string]interface{}) (map[string]interface{}, error) {
-	commit := deps["commit"].(*object.Commit)
+	commit := deps[core.DependencyCommit].(*object.Commit)
 	changes := deps[DependencyTreeChanges].(object.Changes)
 	cache := map[plumbing.Hash]*object.Blob{}
 	newCache := map[plumbing.Hash]*object.Blob{}
@@ -104,22 +106,25 @@ func (blobCache *BlobCache) Consume(deps map[string]interface{}) (map[string]int
 		case merkletrie.Insert:
 			blob, err = blobCache.getBlob(&change.To, commit.File)
 			if err != nil {
-				log.Printf("file to %s %s\n", change.To.Name, change.To.TreeEntry.Hash)
+				log.Printf("file to %s %s\n",
+					change.To.Name, change.To.TreeEntry.Hash)
 			} else {
 				cache[change.To.TreeEntry.Hash] = blob
 				newCache[change.To.TreeEntry.Hash] = blob
 			}
 		case merkletrie.Delete:
-			cache[change.From.TreeEntry.Hash], exists = blobCache.cache[change.From.TreeEntry.Hash]
+			cache[change.From.TreeEntry.Hash], exists =
+				blobCache.cache[change.From.TreeEntry.Hash]
 			if !exists {
-				cache[change.From.TreeEntry.Hash], err = blobCache.getBlob(&change.From, commit.File)
+				cache[change.From.TreeEntry.Hash], err =
+					blobCache.getBlob(&change.From, commit.File)
 				if err != nil {
 					if err.Error() != plumbing.ErrObjectNotFound.Error() {
 						log.Printf("file from %s %s\n", change.From.Name,
 							change.From.TreeEntry.Hash)
 					} else {
-						cache[change.From.TreeEntry.Hash], err = internal.CreateDummyBlob(
-							change.From.TreeEntry.Hash)
+						cache[change.From.TreeEntry.Hash], err =
+							internal.CreateDummyBlob(change.From.TreeEntry.Hash)
 					}
 				}
 			}
@@ -131,9 +136,11 @@ func (blobCache *BlobCache) Consume(deps map[string]interface{}) (map[string]int
 				cache[change.To.TreeEntry.Hash] = blob
 				newCache[change.To.TreeEntry.Hash] = blob
 			}
-			cache[change.From.TreeEntry.Hash], exists = blobCache.cache[change.From.TreeEntry.Hash]
+			cache[change.From.TreeEntry.Hash], exists =
+				blobCache.cache[change.From.TreeEntry.Hash]
 			if !exists {
-				cache[change.From.TreeEntry.Hash], err = blobCache.getBlob(&change.From, commit.File)
+				cache[change.From.TreeEntry.Hash], err =
+					blobCache.getBlob(&change.From, commit.File)
 				if err != nil {
 					log.Printf("file from %s\n", change.From.Name)
 				}
@@ -147,9 +154,26 @@ func (blobCache *BlobCache) Consume(deps map[string]interface{}) (map[string]int
 	return map[string]interface{}{DependencyBlobCache: cache}, nil
 }
 
-// FileGetter defines a function which loads the Git file by the specified path.
-// The state can be arbitrary though here it always corresponds to the currently processed
-// commit.
+// Fork clones this PipelineItem.
+func (blobCache *BlobCache) Fork(n int) []core.PipelineItem {
+	caches := make([]core.PipelineItem, n)
+	for i := 0; i < n; i++ {
+		cache := map[plumbing.Hash]*object.Blob{}
+		for k, v := range blobCache.cache {
+			cache[k] = v
+		}
+		caches[i] = &BlobCache{
+			FailOnMissingSubmodules: blobCache.FailOnMissingSubmodules,
+			repository: blobCache.repository,
+			cache: cache,
+		}
+	}
+	return caches
+}
+
+// FileGetter defines a function which loads the Git file by
+// the specified path. The state can be arbitrary though here it always
+// corresponds to the currently processed commit.
 type FileGetter func(path string) (*object.File, error)
 
 // Returns the blob which corresponds to the specified ChangeEntry.
@@ -164,7 +188,7 @@ func (blobCache *BlobCache) getBlob(entry *object.ChangeEntry, fileGetter FileGe
 		if entry.TreeEntry.Mode != 0160000 {
 			// this is not a submodule
 			return nil, err
-		} else if blobCache.IgnoreMissingSubmodules {
+		} else if !blobCache.FailOnMissingSubmodules {
 			return internal.CreateDummyBlob(entry.TreeEntry.Hash)
 		}
 		file, errModules := fileGetter(".gitmodules")

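BlobCache.Fork gives each clone its own map but shares the *object.Blob pointers, which is safe because loaded blobs are never mutated. The same shallow-map-copy idiom in isolation (toy types, not the hercules API):

```go
package main

import "fmt"

// blob stands in for *object.Blob: loaded once, read-only afterwards.
type blob struct{ size int64 }

// forkCache mirrors BlobCache.Fork: every clone can rotate its own entries
// independently, while the underlying blobs stay shared.
func forkCache(cache map[string]*blob, n int) []map[string]*blob {
	clones := make([]map[string]*blob, n)
	for i := range clones {
		m := make(map[string]*blob, len(cache))
		for k, v := range cache {
			m[k] = v // copies the pointer, not the blob
		}
		clones[i] = m
	}
	return clones
}

func main() {
	cache := map[string]*blob{"db99e189": {size: 42}}
	clones := forkCache(cache, 2)
	delete(clones[0], "db99e189") // branch 0 evicts the entry
	fmt.Println(len(cache), len(clones[1])) // 1 1: others are unaffected
}
```
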
+ 52 - 17
internal/plumbing/blob_cache_test.go

@@ -20,14 +20,14 @@ func fixtureBlobCache() *BlobCache {
 func TestBlobCacheConfigureInitialize(t *testing.T) {
 	cache := fixtureBlobCache()
 	assert.Equal(t, test.Repository, cache.repository)
-	assert.False(t, cache.IgnoreMissingSubmodules)
+	assert.False(t, cache.FailOnMissingSubmodules)
 	facts := map[string]interface{}{}
-	facts[ConfigBlobCacheIgnoreMissingSubmodules] = true
+	facts[ConfigBlobCacheFailOnMissingSubmodules] = true
 	cache.Configure(facts)
-	assert.True(t, cache.IgnoreMissingSubmodules)
+	assert.True(t, cache.FailOnMissingSubmodules)
 	facts = map[string]interface{}{}
 	cache.Configure(facts)
-	assert.True(t, cache.IgnoreMissingSubmodules)
+	assert.True(t, cache.FailOnMissingSubmodules)
 }
 
 func TestBlobCacheMetadata(t *testing.T) {
@@ -40,7 +40,7 @@ func TestBlobCacheMetadata(t *testing.T) {
 	assert.Equal(t, cache.Requires()[0], changes.Provides()[0])
 	opts := cache.ListConfigurationOptions()
 	assert.Len(t, opts, 1)
-	assert.Equal(t, opts[0].Name, ConfigBlobCacheIgnoreMissingSubmodules)
+	assert.Equal(t, opts[0].Name, ConfigBlobCacheFailOnMissingSubmodules)
 }
 
 func TestBlobCacheRegistration(t *testing.T) {
@@ -78,7 +78,7 @@ func TestBlobCacheConsumeModification(t *testing.T) {
 		},
 	}}
 	deps := map[string]interface{}{}
-	deps["commit"] = commit
+	deps[core.DependencyCommit] = commit
 	deps[DependencyTreeChanges] = changes
 	result, err := fixtureBlobCache().Consume(deps)
 	assert.Nil(t, err)
@@ -124,7 +124,7 @@ func TestBlobCacheConsumeInsertionDeletion(t *testing.T) {
 	},
 	}
 	deps := map[string]interface{}{}
-	deps["commit"] = commit
+	deps[core.DependencyCommit] = commit
 	deps[DependencyTreeChanges] = changes
 	result, err := fixtureBlobCache().Consume(deps)
 	assert.Nil(t, err)
@@ -151,7 +151,7 @@ func TestBlobCacheConsumeNoAction(t *testing.T) {
 		"63076fa0dfd93e94b6d2ef0fc8b1fdf9092f83c4"))
 	changes[0] = &object.Change{From: object.ChangeEntry{}, To: object.ChangeEntry{}}
 	deps := map[string]interface{}{}
-	deps["commit"] = commit
+	deps[core.DependencyCommit] = commit
 	deps[DependencyTreeChanges] = changes
 	result, err := fixtureBlobCache().Consume(deps)
 	assert.Nil(t, result)
@@ -188,7 +188,7 @@ func TestBlobCacheConsumeBadHashes(t *testing.T) {
 		TreeEntry: object.TreeEntry{},
 	}}
 	deps := map[string]interface{}{}
-	deps["commit"] = commit
+	deps[core.DependencyCommit] = commit
 	deps[DependencyTreeChanges] = changes
 	result, err := fixtureBlobCache().Consume(deps)
 	assert.Nil(t, result)
@@ -235,7 +235,7 @@ func TestBlobCacheConsumeInvalidHash(t *testing.T) {
 		TreeEntry: object.TreeEntry{},
 	}}
 	deps := map[string]interface{}{}
-	deps["commit"] = commit
+	deps[core.DependencyCommit] = commit
 	deps[DependencyTreeChanges] = changes
 	result, err := fixtureBlobCache().Consume(deps)
 	assert.Nil(t, result)
@@ -294,7 +294,7 @@ func TestBlobCacheDeleteInvalidBlob(t *testing.T) {
 	}, To: object.ChangeEntry{},
 	}
 	deps := map[string]interface{}{}
-	deps["commit"] = commit
+	deps[core.DependencyCommit] = commit
 	deps[DependencyTreeChanges] = changes
 	result, err := fixtureBlobCache().Consume(deps)
 	assert.Nil(t, err)
@@ -325,7 +325,7 @@ func TestBlobCacheInsertInvalidBlob(t *testing.T) {
 	},
 	}
 	deps := map[string]interface{}{}
-	deps["commit"] = commit
+	deps[core.DependencyCommit] = commit
 	deps[DependencyTreeChanges] = changes
 	result, err := fixtureBlobCache().Consume(deps)
 	assert.NotNil(t, err)
@@ -334,14 +334,14 @@ func TestBlobCacheInsertInvalidBlob(t *testing.T) {
 
 func TestBlobCacheGetBlobIgnoreMissing(t *testing.T) {
 	cache := fixtureBlobCache()
-	cache.IgnoreMissingSubmodules = true
+	cache.FailOnMissingSubmodules = false
 	treeFrom, _ := test.Repository.TreeObject(plumbing.NewHash(
 		"80fe25955b8e725feee25c08ea5759d74f8b670d"))
 	entry := object.ChangeEntry{
-		Name: "commit",
+		Name: core.DependencyCommit,
 		Tree: treeFrom,
 		TreeEntry: object.TreeEntry{
-			Name: "commit",
+			Name: core.DependencyCommit,
 			Mode: 0160000,
 			Hash: plumbing.NewHash("ffffffffffffffffffffffffffffffffffffffff"),
 		},
@@ -353,7 +353,7 @@ func TestBlobCacheGetBlobIgnoreMissing(t *testing.T) {
 	assert.NotNil(t, blob)
 	assert.Nil(t, err)
 	assert.Equal(t, blob.Size, int64(0))
-	cache.IgnoreMissingSubmodules = false
+	cache.FailOnMissingSubmodules = true
 	getter = func(path string) (*object.File, error) {
 		assert.Equal(t, path, ".gitmodules")
 		commit, _ := test.Repository.CommitObject(plumbing.NewHash(
@@ -367,7 +367,7 @@ func TestBlobCacheGetBlobIgnoreMissing(t *testing.T) {
 
 func TestBlobCacheGetBlobGitModulesErrors(t *testing.T) {
 	cache := fixtureBlobCache()
-	cache.IgnoreMissingSubmodules = false
+	cache.FailOnMissingSubmodules = true
 	entry := object.ChangeEntry{
 		Name: "labours.py",
 		TreeEntry: object.TreeEntry{
@@ -402,3 +402,38 @@ func TestBlobCacheGetBlobGitModulesErrors(t *testing.T) {
 	assert.NotNil(t, err)
 	assert.NotEqual(t, err.Error(), plumbing.ErrObjectNotFound.Error())
 }
+
+func TestBlobCacheFork(t *testing.T) {
+	commit, _ := test.Repository.CommitObject(plumbing.NewHash(
+		"2b1ed978194a94edeabbca6de7ff3b5771d4d665"))
+	changes := make(object.Changes, 1)
+	treeTo, _ := test.Repository.TreeObject(plumbing.NewHash(
+		"251f2094d7b523d5bcc60e663b6cf38151bf8844"))
+	hash := plumbing.NewHash("db99e1890f581ad69e1527fe8302978c661eb473")
+	changes[0] = &object.Change{From: object.ChangeEntry{}, To: object.ChangeEntry{
+		Name: "pipeline.go",
+		Tree: treeTo,
+		TreeEntry: object.TreeEntry{
+			Name: "pipeline.go",
+			Mode: 0100644,
+			Hash: hash,
+		},
+	}}
+	deps := map[string]interface{}{}
+	deps[core.DependencyCommit] = commit
+	deps[DependencyTreeChanges] = changes
+	cache1 := fixtureBlobCache()
+	cache1.FailOnMissingSubmodules = true
+	cache1.Consume(deps)
+	clones := cache1.Fork(1)
+	assert.Len(t, clones, 1)
+	cache2 := clones[0].(*BlobCache)
+	assert.True(t, cache2.FailOnMissingSubmodules)
+	assert.Equal(t, cache1.repository, cache2.repository)
+	cache1.cache[plumbing.ZeroHash] = nil
+	assert.Len(t, cache1.cache, 2)
+	assert.Len(t, cache2.cache, 1)
+	assert.Equal(t, cache1.cache[hash].Size, cache2.cache[hash].Size)
+	// just for the sake of it
+	cache1.Merge([]core.PipelineItem{cache2})
+}

+ 20 - 4
internal/plumbing/day.go

@@ -12,6 +12,7 @@ import (
 // DaysSinceStart provides the relative date information for every commit.
 // It is a PipelineItem.
 type DaysSinceStart struct {
+	core.NoopMerger
 	day0        time.Time
 	previousDay int
 	commits     map[int][]plumbing.Hash
@@ -77,12 +78,12 @@ func (days *DaysSinceStart) Initialize(repository *git.Repository) {
 
 // Consume runs this PipelineItem on the next commit data.
 // `deps` contain all the results from upstream PipelineItem-s as requested by Requires().
-// Additionally, "commit" is always present there and represents the analysed *object.Commit.
+// Additionally, DependencyCommit is always present there and represents the analysed *object.Commit.
 // This function returns the mapping with analysis results. The keys must be the same as
 // in Provides(). If there was an error, nil is returned.
 func (days *DaysSinceStart) Consume(deps map[string]interface{}) (map[string]interface{}, error) {
-	commit := deps["commit"].(*object.Commit)
-	index := deps["index"].(int)
+	commit := deps[core.DependencyCommit].(*object.Commit)
+	index := deps[core.DependencyIndex].(int)
 	if index == 0 {
 		// first iteration - initialize the file objects from the tree
 		days.day0 = commit.Author.When
@@ -99,10 +100,25 @@ func (days *DaysSinceStart) Consume(deps map[string]interface{}) (map[string]int
 	if dayCommits == nil {
 		dayCommits = []plumbing.Hash{}
 	}
-	days.commits[day] = append(dayCommits, commit.Hash)
+	exists := false
+	if commit.NumParents() > 0 {
+		for i := range dayCommits {
+			if dayCommits[len(dayCommits)-i-1] == commit.Hash {
+				exists = true
+				break
+			}
+		}
+	}
+	if !exists {
+		days.commits[day] = append(dayCommits, commit.Hash)
+	}
 	return map[string]interface{}{DependencyDay: day}, nil
 }
 
+// Fork clones this PipelineItem.
+func (days *DaysSinceStart) Fork(n int) []core.PipelineItem {
+	return core.ForkCopyPipelineItem(days, n)
+}
+
 func init() {
 	core.Registry.Register(&DaysSinceStart{})
 }
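
DaysSinceStart forks through core.ForkCopyPipelineItem, while the stateless items in this diff (FileDiff, RenameAnalysis, the identity Detector, Extractor) use core.ForkSamePipelineItem. Judging by the tests below (td1 != td2 for TreeDiff versus fd1 == fd2 for FileDiff), the contract is: the copy variant returns n distinct shallow copies, the same variant returns the very same instance n times. A standalone sketch of that contract, with pipelineItem and counter invented for illustration:

package main

import "fmt"

// pipelineItem stands in for core.PipelineItem, trimmed down to nothing.
type pipelineItem interface{}

type counter struct{ consumed int }

// forkSame mirrors what core.ForkSamePipelineItem appears to do: every branch
// receives the same instance, so its state keeps accumulating globally.
func forkSame(item pipelineItem, n int) []pipelineItem {
	clones := make([]pipelineItem, n)
	for i := range clones {
		clones[i] = item
	}
	return clones
}

// forkCopy mirrors core.ForkCopyPipelineItem for this concrete type: every
// branch receives its own shallow copy.
func forkCopy(item *counter, n int) []pipelineItem {
	clones := make([]pipelineItem, n)
	for i := range clones {
		clone := *item
		clones[i] = &clone
	}
	return clones
}

func main() {
	c := &counter{}
	same := forkSame(c, 1)[0].(*counter)
	copied := forkCopy(c, 1)[0].(*counter)
	fmt.Println(same == c, copied == c) // true false
}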

+ 27 - 10
internal/plumbing/day_test.go

@@ -40,8 +40,8 @@ func TestDaysSinceStartConsume(t *testing.T) {
 	deps := map[string]interface{}{}
 	commit, _ := test.Repository.CommitObject(plumbing.NewHash(
 		"cce947b98a050c6d356bc6ba95030254914027b1"))
-	deps["commit"] = commit
-	deps["index"] = 0
+	deps[core.DependencyCommit] = commit
+	deps[core.DependencyIndex] = 0
 	res, err := dss.Consume(deps)
 	assert.Nil(t, err)
 	assert.Equal(t, res[DependencyDay].(int), 0)
@@ -52,8 +52,8 @@ func TestDaysSinceStartConsume(t *testing.T) {
 
 	commit, _ = test.Repository.CommitObject(plumbing.NewHash(
 		"fc9ceecb6dabcb2aab60e8619d972e8d8208a7df"))
-	deps["commit"] = commit
-	deps["index"] = 10
+	deps[core.DependencyCommit] = commit
+	deps[core.DependencyIndex] = 10
 	res, err = dss.Consume(deps)
 	assert.Nil(t, err)
 	assert.Equal(t, res[DependencyDay].(int), 1)
@@ -61,8 +61,8 @@ func TestDaysSinceStartConsume(t *testing.T) {
 
 	commit, _ = test.Repository.CommitObject(plumbing.NewHash(
 		"a3ee37f91f0d705ec9c41ae88426f0ae44b2fbc3"))
-	deps["commit"] = commit
-	deps["index"] = 20
+	deps[core.DependencyCommit] = commit
+	deps[core.DependencyIndex] = 20
 	res, err = dss.Consume(deps)
 	assert.Nil(t, err)
 	assert.Equal(t, res[DependencyDay].(int), 1)
@@ -70,8 +70,8 @@ func TestDaysSinceStartConsume(t *testing.T) {
 
 	commit, _ = test.Repository.CommitObject(plumbing.NewHash(
 		"a8b665a65d7aced63f5ba2ff6d9b71dac227f8cf"))
-	deps["commit"] = commit
-	deps["index"] = 20
+	deps[core.DependencyCommit] = commit
+	deps[core.DependencyIndex] = 20
 	res, err = dss.Consume(deps)
 	assert.Nil(t, err)
 	assert.Equal(t, res[DependencyDay].(int), 2)
@@ -79,8 +79,8 @@ func TestDaysSinceStartConsume(t *testing.T) {
 
 	commit, _ = test.Repository.CommitObject(plumbing.NewHash(
 		"186ff0d7e4983637bb3762a24d6d0a658e7f4712"))
-	deps["commit"] = commit
-	deps["index"] = 30
+	deps[core.DependencyCommit] = commit
+	deps[core.DependencyIndex] = 30
 	res, err = dss.Consume(deps)
 	assert.Nil(t, err)
 	assert.Equal(t, res[DependencyDay].(int), 2)
@@ -106,3 +106,20 @@ func TestDaysCommits(t *testing.T) {
 	assert.Len(t, dss.commits, 0)
 	assert.Equal(t, dss.commits, commits)
 }
+
+func TestDaysSinceStartFork(t *testing.T) {
+	dss1 := fixtureDaysSinceStart()
+	dss1.commits[0] = []plumbing.Hash{plumbing.NewHash(
+		"cce947b98a050c6d356bc6ba95030254914027b1")}
+	clones := dss1.Fork(1)
+	assert.Len(t, clones, 1)
+	dss2 := clones[0].(*DaysSinceStart)
+	assert.Equal(t, dss1.day0, dss2.day0)
+	assert.Equal(t, dss1.previousDay, dss2.previousDay)
+	assert.Equal(t, dss1.commits, dss2.commits)
+	dss1.commits[0] = append(dss1.commits[0], plumbing.ZeroHash)
+	assert.Len(t, dss2.commits[0], 2)
+	assert.True(t, dss1 != dss2)
+	// just for the sake of it
+	dss1.Merge([]core.PipelineItem{dss2})
+}

+ 7 - 1
internal/plumbing/diff.go

@@ -18,6 +18,7 @@ import (
 // FileDiff calculates the difference of files which were modified.
 // It is a PipelineItem.
 type FileDiff struct {
+	core.NoopMerger
 	CleanupDisabled bool
 }
 
@@ -84,7 +85,7 @@ func (diff *FileDiff) Initialize(repository *git.Repository) {}
 
 // Consume runs this PipelineItem on the next commit data.
 // `deps` contain all the results from upstream PipelineItem-s as requested by Requires().
-// Additionally, "commit" is always present there and represents the analysed *object.Commit.
+// Additionally, DependencyCommit is always present there and represents the analysed *object.Commit.
 // This function returns the mapping with analysis results. The keys must be the same as
 // in Provides(). If there was an error, nil is returned.
 func (diff *FileDiff) Consume(deps map[string]interface{}) (map[string]interface{}, error) {
@@ -128,6 +129,11 @@ func (diff *FileDiff) Consume(deps map[string]interface{}) (map[string]interface
 	return map[string]interface{}{DependencyFileDiff: result}, nil
 }
 
+// Fork clones this PipelineItem.
+func (diff *FileDiff) Fork(n int) []core.PipelineItem {
+	return core.ForkSamePipelineItem(diff, n)
+}
+
 // CountLines returns the number of lines in a *object.Blob.
 func CountLines(file *object.Blob) (int, error) {
 	if file == nil {

+ 9 - 0
internal/plumbing/diff_test.go

@@ -273,3 +273,12 @@ func TestFileDiffDarkMagic(t *testing.T) {
 	assert.Equal(t, magicDiffs.OldLinesOfCode, plainDiffs.OldLinesOfCode)
 	assert.Equal(t, magicDiffs.NewLinesOfCode, plainDiffs.NewLinesOfCode)
 }
+
+func TestFileDiffFork(t *testing.T) {
+	fd1 := fixtures.FileDiff()
+	clones := fd1.Fork(1)
+	assert.Len(t, clones, 1)
+	fd2 := clones[0].(*items.FileDiff)
+	assert.True(t, fd1 == fd2)
+	fd1.Merge([]core.PipelineItem{fd2})
+}

+ 35 - 29
internal/plumbing/identity/identity.go

@@ -15,7 +15,8 @@ import (
 // signatures, and we apply some heuristics to merge those together.
 // It is a PipelineItem.
 type Detector struct {
-	// PeopleDict maps email || name  -> developer id.
+	core.NoopMerger
+	// PeopleDict maps email || name  -> developer id
 	PeopleDict map[string]int
 	// ReversedPeopleDict maps developer id -> description
 	ReversedPeopleDict []string
@@ -49,14 +50,14 @@ const (
 )
 
 // Name of this PipelineItem. Uniquely identifies the type, used for mapping keys, etc.
-func (id *Detector) Name() string {
+func (detector *Detector) Name() string {
 	return "IdentityDetector"
 }
 
 // Provides returns the list of names of entities which are produced by this PipelineItem.
 // Each produced entity will be inserted into `deps` of dependent Consume()-s according
 // to this list. Also used by core.Registry to build the global map of providers.
-func (id *Detector) Provides() []string {
+func (detector *Detector) Provides() []string {
 	arr := [...]string{DependencyAuthor}
 	return arr[:]
 }
@@ -64,12 +65,12 @@ func (id *Detector) Provides() []string {
 // Requires returns the list of names of entities which are needed by this PipelineItem.
 // Each requested entity will be inserted into `deps` of Consume(). In turn, those
 // entities are Provides() upstream.
-func (id *Detector) Requires() []string {
+func (detector *Detector) Requires() []string {
 	return []string{}
 }
 
 // ListConfigurationOptions returns the list of changeable public properties of this PipelineItem.
-func (id *Detector) ListConfigurationOptions() []core.ConfigurationOption {
+func (detector *Detector) ListConfigurationOptions() []core.ConfigurationOption {
 	options := [...]core.ConfigurationOption{{
 		Name:        ConfigIdentityDetectorPeopleDictPath,
 		Description: "Path to the developers' email associations.",
@@ -81,48 +82,48 @@ func (id *Detector) ListConfigurationOptions() []core.ConfigurationOption {
 }
 
 // Configure sets the properties previously published by ListConfigurationOptions().
-func (id *Detector) Configure(facts map[string]interface{}) {
+func (detector *Detector) Configure(facts map[string]interface{}) {
 	if val, exists := facts[FactIdentityDetectorPeopleDict].(map[string]int); exists {
-		id.PeopleDict = val
+		detector.PeopleDict = val
 	}
 	if val, exists := facts[FactIdentityDetectorReversedPeopleDict].([]string); exists {
-		id.ReversedPeopleDict = val
+		detector.ReversedPeopleDict = val
 	}
-	if id.PeopleDict == nil || id.ReversedPeopleDict == nil {
+	if detector.PeopleDict == nil || detector.ReversedPeopleDict == nil {
 		peopleDictPath, _ := facts[ConfigIdentityDetectorPeopleDictPath].(string)
 		if peopleDictPath != "" {
-			id.LoadPeopleDict(peopleDictPath)
-			facts[FactIdentityDetectorPeopleCount] = len(id.ReversedPeopleDict) - 1
+			detector.LoadPeopleDict(peopleDictPath)
+			facts[FactIdentityDetectorPeopleCount] = len(detector.ReversedPeopleDict) - 1
 		} else {
 			if _, exists := facts[core.ConfigPipelineCommits]; !exists {
 				panic("IdentityDetector needs a list of commits to initialize.")
 			}
-			id.GeneratePeopleDict(facts[core.ConfigPipelineCommits].([]*object.Commit))
-			facts[FactIdentityDetectorPeopleCount] = len(id.ReversedPeopleDict)
+			detector.GeneratePeopleDict(facts[core.ConfigPipelineCommits].([]*object.Commit))
+			facts[FactIdentityDetectorPeopleCount] = len(detector.ReversedPeopleDict)
 		}
 	} else {
-		facts[FactIdentityDetectorPeopleCount] = len(id.ReversedPeopleDict)
+		facts[FactIdentityDetectorPeopleCount] = len(detector.ReversedPeopleDict)
 	}
-	facts[FactIdentityDetectorPeopleDict] = id.PeopleDict
-	facts[FactIdentityDetectorReversedPeopleDict] = id.ReversedPeopleDict
+	facts[FactIdentityDetectorPeopleDict] = detector.PeopleDict
+	facts[FactIdentityDetectorReversedPeopleDict] = detector.ReversedPeopleDict
 }
 
 // Initialize resets the temporary caches and prepares this PipelineItem for a series of Consume()
 // calls. The repository which is going to be analysed is supplied as an argument.
-func (id *Detector) Initialize(repository *git.Repository) {
+func (detector *Detector) Initialize(repository *git.Repository) {
 }
 
 // Consume runs this PipelineItem on the next commit data.
 // `deps` contain all the results from upstream PipelineItem-s as requested by Requires().
-// Additionally, "commit" is always present there and represents the analysed *object.Commit.
+// Additionally, DependencyCommit is always present there and represents the analysed *object.Commit.
 // This function returns the mapping with analysis results. The keys must be the same as
 // in Provides(). If there was an error, nil is returned.
-func (id *Detector) Consume(deps map[string]interface{}) (map[string]interface{}, error) {
-	commit := deps["commit"].(*object.Commit)
+func (detector *Detector) Consume(deps map[string]interface{}) (map[string]interface{}, error) {
+	commit := deps[core.DependencyCommit].(*object.Commit)
 	signature := commit.Author
-	authorID, exists := id.PeopleDict[strings.ToLower(signature.Email)]
+	authorID, exists := detector.PeopleDict[strings.ToLower(signature.Email)]
 	if !exists {
-		authorID, exists = id.PeopleDict[strings.ToLower(signature.Name)]
+		authorID, exists = detector.PeopleDict[strings.ToLower(signature.Name)]
 		if !exists {
 			authorID = AuthorMissing
 		}
@@ -130,10 +131,15 @@ func (id *Detector) Consume(deps map[string]interface{}) (map[string]interface{}
 	return map[string]interface{}{DependencyAuthor: authorID}, nil
 }
 
+// Fork clones this PipelineItem.
+func (detector *Detector) Fork(n int) []core.PipelineItem {
+	return core.ForkSamePipelineItem(detector, n)
+}
+
 // LoadPeopleDict loads author signatures from a text file.
 // The format is one signature per line, and the signature consists of several
 // keys separated by "|". The first key is the main one and used to reference all the rest.
-func (id *Detector) LoadPeopleDict(path string) error {
+func (detector *Detector) LoadPeopleDict(path string) error {
 	file, err := os.Open(path)
 	if err != nil {
 		return err
@@ -152,13 +158,13 @@ func (id *Detector) LoadPeopleDict(path string) error {
 		size++
 	}
 	reverseDict = append(reverseDict, AuthorMissingName)
-	id.PeopleDict = dict
-	id.ReversedPeopleDict = reverseDict
+	detector.PeopleDict = dict
+	detector.ReversedPeopleDict = reverseDict
 	return nil
 }
 
 // GeneratePeopleDict loads author signatures from the specified list of Git commits.
-func (id *Detector) GeneratePeopleDict(commits []*object.Commit) {
+func (detector *Detector) GeneratePeopleDict(commits []*object.Commit) {
 	dict := map[string]int{}
 	emails := map[int][]string{}
 	names := map[int][]string{}
@@ -249,12 +255,12 @@ func (id *Detector) GeneratePeopleDict(commits []*object.Commit) {
 		sort.Strings(emails[val])
 		reverseDict[val] = strings.Join(names[val], "|") + "|" + strings.Join(emails[val], "|")
 	}
-	id.PeopleDict = dict
-	id.ReversedPeopleDict = reverseDict
+	detector.PeopleDict = dict
+	detector.ReversedPeopleDict = reverseDict
 }
 
 // MergeReversedDicts joins two identity lists together, excluding duplicates, in-order.
-func (id Detector) MergeReversedDicts(rd1, rd2 []string) (map[string][3]int, []string) {
+func (detector Detector) MergeReversedDicts(rd1, rd2 []string) (map[string][3]int, []string) {
 	people := map[string][3]int{}
 	for i, pid := range rd1 {
 		ptrs := people[pid]
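
The renamed Consume() above resolves the author in two steps: the lower-cased e-mail is looked up first, then the lower-cased name, and AuthorMissing is returned when neither is known. A standalone sketch of that fallback; the dictionary entries and the sentinel value are invented:

package main

import (
	"fmt"
	"strings"
)

// authorMissing is a placeholder; the real sentinel is identity.AuthorMissing.
const authorMissing = -1

// resolveAuthor reproduces the e-mail -> name -> missing fallback.
func resolveAuthor(peopleDict map[string]int, name, email string) int {
	if id, ok := peopleDict[strings.ToLower(email)]; ok {
		return id
	}
	if id, ok := peopleDict[strings.ToLower(name)]; ok {
		return id
	}
	return authorMissing
}

func main() {
	dict := map[string]int{"vadim@example.com": 0, "vadim": 0}
	fmt.Println(resolveAuthor(dict, "Vadim", "somebody@else.org")) // 0 (name match)
	fmt.Println(resolveAuthor(dict, "Nobody", "nobody@else.org"))  // -1
}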

+ 11 - 2
internal/plumbing/identity/identity_test.go

@@ -133,13 +133,13 @@ func TestIdentityDetectorConsume(t *testing.T) {
 	commit, _ := test.Repository.CommitObject(plumbing.NewHash(
 		"5c0e755dd85ac74584d9988cc361eccf02ce1a48"))
 	deps := map[string]interface{}{}
-	deps["commit"] = commit
+	deps[core.DependencyCommit] = commit
 	res, err := fixtureIdentityDetector().Consume(deps)
 	assert.Nil(t, err)
 	assert.Equal(t, res[DependencyAuthor].(int), 0)
 	commit, _ = test.Repository.CommitObject(plumbing.NewHash(
 		"8a03b5620b1caa72ec9cb847ea88332621e2950a"))
-	deps["commit"] = commit
+	deps[core.DependencyCommit] = commit
 	res, err = fixtureIdentityDetector().Consume(deps)
 	assert.Nil(t, err)
 	assert.Equal(t, res[DependencyAuthor].(int), AuthorMissing)
@@ -392,3 +392,12 @@ func TestIdentityDetectorMergeReversedDicts(t *testing.T) {
 	vm = [...]string{"two", "one", "three"}
 	assert.Equal(t, merged, vm[:])
 }
+
+func TestIdentityDetectorFork(t *testing.T) {
+	id1 := fixtureIdentityDetector()
+	clones := id1.Fork(1)
+	assert.Len(t, clones, 1)
+	id2 := clones[0].(*Detector)
+	assert.True(t, id1 == id2)
+	id1.Merge([]core.PipelineItem{id2})
+}

+ 7 - 1
internal/plumbing/renames.go

@@ -18,6 +18,7 @@ import (
 // paths which are likely to be the result of a rename with subsequent edits.
 // RenameAnalysis is a PipelineItem.
 type RenameAnalysis struct {
+	core.NoopMerger
 	// SimilarityThreshold adjusts the heuristic to determine file renames.
 	// It has the same units as cgit's -X rename-threshold or -M. Better to
 	// set it to the default value of 90 (90%).
@@ -89,7 +90,7 @@ func (ra *RenameAnalysis) Initialize(repository *git.Repository) {
 
 // Consume runs this PipelineItem on the next commit data.
 // `deps` contain all the results from upstream PipelineItem-s as requested by Requires().
-// Additionally, "commit" is always present there and represents the analysed *object.Commit.
+// Additionally, DependencyCommit is always present there and represents the analysed *object.Commit.
 // This function returns the mapping with analysis results. The keys must be the same as
 // in Provides(). If there was an error, nil is returned.
 func (ra *RenameAnalysis) Consume(deps map[string]interface{}) (map[string]interface{}, error) {
@@ -203,6 +204,11 @@ func (ra *RenameAnalysis) Consume(deps map[string]interface{}) (map[string]inter
 	return map[string]interface{}{DependencyTreeChanges: reducedChanges}, nil
 }
 
+// Fork clones this PipelineItem.
+func (ra *RenameAnalysis) Fork(n int) []core.PipelineItem {
+	return core.ForkSamePipelineItem(ra, n)
+}
+
 func (ra *RenameAnalysis) sizesAreClose(size1 int64, size2 int64) bool {
 	return internal.Abs64(size1-size2)*100/internal.Max64(1, internal.Min64(size1, size2)) <=
 		int64(100-ra.SimilarityThreshold)

+ 9 - 0
internal/plumbing/renames_test.go

@@ -164,3 +164,12 @@ func TestSortableBlobs(t *testing.T) {
 	assert.Equal(t, blobs[0].size, int64(1))
 	assert.Equal(t, blobs[1].size, int64(0))
 }
+
+func TestRenameAnalysisFork(t *testing.T) {
+	ra1 := fixtureRenameAnalysis()
+	clones := ra1.Fork(1)
+	assert.Len(t, clones, 1)
+	ra2 := clones[0].(*RenameAnalysis)
+	assert.True(t, ra1 == ra2)
+	ra1.Merge([]core.PipelineItem{ra2})
+}

+ 8 - 2
internal/plumbing/tree_diff.go

@@ -14,6 +14,7 @@ import (
 // If "after" is nil, the change is a removal. Otherwise, it is a modification.
 // TreeDiff is a PipelineItem.
 type TreeDiff struct {
+	core.NoopMerger
 	SkipDirs     []string
 	previousTree *object.Tree
 }
@@ -83,11 +84,11 @@ func (treediff *TreeDiff) Initialize(repository *git.Repository) {
 
 // Consume runs this PipelineItem on the next commit data.
 // `deps` contain all the results from upstream PipelineItem-s as requested by Requires().
-// Additionally, "commit" is always present there and represents the analysed *object.Commit.
+// Additionally, DependencyCommit is always present there and represents the analysed *object.Commit.
 // This function returns the mapping with analysis results. The keys must be the same as
 // in Provides(). If there was an error, nil is returned.
 func (treediff *TreeDiff) Consume(deps map[string]interface{}) (map[string]interface{}, error) {
-	commit := deps["commit"].(*object.Commit)
+	commit := deps[core.DependencyCommit].(*object.Commit)
 	tree, err := commit.Tree()
 	if err != nil {
 		return nil, err
@@ -141,6 +142,11 @@ func (treediff *TreeDiff) Consume(deps map[string]interface{}) (map[string]inter
 	return map[string]interface{}{DependencyTreeChanges: diff}, nil
 }
 
+// Fork clones this PipelineItem.
+func (treediff *TreeDiff) Fork(n int) []core.PipelineItem {
+	return core.ForkCopyPipelineItem(treediff, n)
+}
+
 func init() {
 	core.Registry.Register(&TreeDiff{})
 }

+ 16 - 4
internal/plumbing/tree_diff_test.go

@@ -46,7 +46,7 @@ func TestTreeDiffConsume(t *testing.T) {
 	commit, _ := test.Repository.CommitObject(plumbing.NewHash(
 		"2b1ed978194a94edeabbca6de7ff3b5771d4d665"))
 	deps := map[string]interface{}{}
-	deps["commit"] = commit
+	deps[core.DependencyCommit] = commit
 	prevCommit, _ := test.Repository.CommitObject(plumbing.NewHash(
 		"fbe766ffdc3f87f6affddc051c6f8b419beea6a2"))
 	td.previousTree, _ = prevCommit.Tree()
@@ -87,7 +87,7 @@ func TestTreeDiffConsumeFirst(t *testing.T) {
 	commit, _ := test.Repository.CommitObject(plumbing.NewHash(
 		"2b1ed978194a94edeabbca6de7ff3b5771d4d665"))
 	deps := map[string]interface{}{}
-	deps["commit"] = commit
+	deps[core.DependencyCommit] = commit
 	res, err := td.Consume(deps)
 	assert.Nil(t, err)
 	assert.Equal(t, len(res), 1)
@@ -106,7 +106,7 @@ func TestTreeDiffBadCommit(t *testing.T) {
 		"2b1ed978194a94edeabbca6de7ff3b5771d4d665"))
 	commit.TreeHash = plumbing.NewHash("0000000000000000000000000000000000000000")
 	deps := map[string]interface{}{}
-	deps["commit"] = commit
+	deps[core.DependencyCommit] = commit
 	res, err := td.Consume(deps)
 	assert.Nil(t, res)
 	assert.NotNil(t, err)
@@ -118,7 +118,7 @@ func TestTreeDiffConsumeSkip(t *testing.T) {
 	commit, _ := test.Repository.CommitObject(plumbing.NewHash(
 		"aefdedf7cafa6ee110bae9a3910bf5088fdeb5a9"))
 	deps := map[string]interface{}{}
-	deps["commit"] = commit
+	deps[core.DependencyCommit] = commit
 	prevCommit, _ := test.Repository.CommitObject(plumbing.NewHash(
 		"1e076dc56989bc6aa1ef5f55901696e9e01423d4"))
 	td.previousTree, _ = prevCommit.Tree()
@@ -141,3 +141,15 @@ func TestTreeDiffConsumeSkip(t *testing.T) {
 	changes = res[DependencyTreeChanges].(object.Changes)
 	assert.Equal(t, 31, len(changes))
 }
+
+func TestTreeDiffFork(t *testing.T) {
+	td1 := fixtureTreeDiff()
+	td1.SkipDirs = append(td1.SkipDirs, "skip")
+	clones := td1.Fork(1)
+	assert.Len(t, clones, 1)
+	td2 := clones[0].(*TreeDiff)
+	assert.False(t, td1 == td2)
+	assert.Equal(t, td1.SkipDirs, td2.SkipDirs)
+	assert.Equal(t, td1.previousTree, td2.previousTree)
+	td1.Merge([]core.PipelineItem{td2})
+}

+ 7 - 1
internal/plumbing/uast/diff_refiner.go

@@ -15,6 +15,7 @@ import (
 // The idea behind this algorithm is simple: in case of multiple choices which are equally
 // optimal, choose the one which touches fewer AST nodes.
 type FileDiffRefiner struct {
+	core.NoopMerger
 }
 
 // Name of this PipelineItem. Uniquely identifies the type, used for mapping keys, etc.
@@ -59,7 +60,7 @@ func (ref *FileDiffRefiner) Initialize(repository *git.Repository) {
 
 // Consume runs this PipelineItem on the next commit data.
 // `deps` contain all the results from upstream PipelineItem-s as requested by Requires().
-// Additionally, "commit" is always present there and represents the analysed *object.Commit.
+// Additionally, DependencyCommit is always present there and represents the analysed *object.Commit.
 // This function returns the mapping with analysis results. The keys must be the same as
 // in Provides(). If there was an error, nil is returned.
 func (ref *FileDiffRefiner) Consume(deps map[string]interface{}) (map[string]interface{}, error) {
@@ -161,6 +162,11 @@ func (ref *FileDiffRefiner) Consume(deps map[string]interface{}) (map[string]int
 	return map[string]interface{}{plumbing.DependencyFileDiff: result}, nil
 }
 
+// Fork clones this PipelineItem.
+func (ref *FileDiffRefiner) Fork(n int) []core.PipelineItem {
+	return core.ForkSamePipelineItem(ref, n)
+}
+
 // VisitEachNode is a handy routine to execute a callback on every node in the subtree,
 // including the root itself. Depth first tree traversal.
 func VisitEachNode(root *uast.Node, payload func(*uast.Node)) {

+ 9 - 0
internal/plumbing/uast/diff_refiner_test.go

@@ -153,3 +153,12 @@ func TestFileDiffRefinerConsumeNoUast(t *testing.T) {
 	assert.Len(t, result, 1)
 	assert.Equal(t, fileDiffs[fileName], result[fileName])
 }
+
+func TestFileDiffRefinerFork(t *testing.T) {
+	fd1 := fixtureFileDiffRefiner()
+	clones := fd1.Fork(1)
+	assert.Len(t, clones, 1)
+	fd2 := clones[0].(*FileDiffRefiner)
+	assert.True(t, fd1 == fd2)
+	fd1.Merge([]core.PipelineItem{fd2})
+}

+ 36 - 3
internal/plumbing/uast/uast.go

@@ -33,6 +33,7 @@ import (
 // Extractor retrieves UASTs from the Babelfish server which correspond to changed files in a commit.
 // It is a PipelineItem.
 type Extractor struct {
+	core.NoopMerger
 	Endpoint       string
 	Context        func() (context.Context, context.CancelFunc)
 	PoolSize       int
@@ -216,7 +217,7 @@ func (exr *Extractor) Initialize(repository *git.Repository) {
 
 // Consume runs this PipelineItem on the next commit data.
 // `deps` contain all the results from upstream PipelineItem-s as requested by Requires().
-// Additionally, "commit" is always present there and represents the analysed *object.Commit.
+// Additionally, DependencyCommit is always present there and represents the analysed *object.Commit.
 // This function returns the mapping with analysis results. The keys must be the same as
 // in Provides(). If there was an error, nil is returned.
 func (exr *Extractor) Consume(deps map[string]interface{}) (map[string]interface{}, error) {
@@ -287,6 +288,11 @@ func (exr *Extractor) Consume(deps map[string]interface{}) (map[string]interface
 	return map[string]interface{}{DependencyUasts: uasts}, nil
 }
 
+// Fork clones this PipelineItem.
+func (exr *Extractor) Fork(n int) []core.PipelineItem {
+	return core.ForkSamePipelineItem(exr, n)
+}
+
 func (exr *Extractor) extractUAST(
 	client *bblfsh.Client, file *object.File) (*uast.Node, error) {
 	request := client.NewParseRequest()
@@ -347,6 +353,7 @@ const (
 // Changes is a structured analog of TreeDiff: it provides UASTs for every logical change
 // in a commit. It is a PipelineItem.
 type Changes struct {
+	core.NoopMerger
 	cache map[plumbing.Hash]*uast.Node
 }
 
@@ -393,7 +400,7 @@ func (uc *Changes) Initialize(repository *git.Repository) {
 
 // Consume runs this PipelineItem on the next commit data.
 // `deps` contain all the results from upstream PipelineItem-s as requested by Requires().
-// Additionally, "commit" is always present there and represents the analysed *object.Commit.
+// Additionally, DependencyCommit is always present there and represents the analysed *object.Commit.
 // This function returns the mapping with analysis results. The keys must be the same as
 // in Provides(). If there was an error, nil is returned.
 func (uc *Changes) Consume(deps map[string]interface{}) (map[string]interface{}, error) {
@@ -427,9 +434,26 @@ func (uc *Changes) Consume(deps map[string]interface{}) (map[string]interface{},
 	return map[string]interface{}{DependencyUastChanges: commit}, nil
 }
 
+// Fork clones this PipelineItem.
+func (uc *Changes) Fork(n int) []core.PipelineItem {
+	ucs := make([]core.PipelineItem, n)
+	for i := 0; i < n; i++ {
+		clone := &Changes{
+			cache: map[plumbing.Hash]*uast.Node{},
+		}
+		for key, val := range uc.cache {
+			clone.cache[key] = val
+		}
+		ucs[i] = clone
+	}
+	return ucs
+}
+
 // ChangesSaver dumps changed files and corresponding UASTs for every commit.
 // It is a LeafPipelineItem.
 type ChangesSaver struct {
+	core.NoopMerger
+	core.OneShotMergeProcessor
 	// OutputPath points to the target directory with UASTs
 	OutputPath string
 
@@ -498,14 +522,18 @@ func (saver *ChangesSaver) Configure(facts map[string]interface{}) {
 func (saver *ChangesSaver) Initialize(repository *git.Repository) {
 	saver.repository = repository
 	saver.result = [][]Change{}
+	saver.OneShotMergeProcessor.Initialize()
 }
 
 // Consume runs this PipelineItem on the next commit data.
 // `deps` contain all the results from upstream PipelineItem-s as requested by Requires().
-// Additionally, "commit" is always present there and represents the analysed *object.Commit.
+// Additionally, DependencyCommit is always present there and represents the analysed *object.Commit.
 // This function returns the mapping with analysis results. The keys must be the same as
 // in Provides(). If there was an error, nil is returned.
 func (saver *ChangesSaver) Consume(deps map[string]interface{}) (map[string]interface{}, error) {
+	if !saver.ShouldConsumeCommit(deps) {
+		return nil, nil
+	}
 	changes := deps[DependencyUastChanges].([]Change)
 	saver.result = append(saver.result, changes)
 	return nil, nil
@@ -516,6 +544,11 @@ func (saver *ChangesSaver) Finalize() interface{} {
 	return saver.result
 }
 
+// Fork clones this PipelineItem.
+func (saver *ChangesSaver) Fork(n int) []core.PipelineItem {
+	return core.ForkSamePipelineItem(saver, n)
+}
+
 // Serialize converts the analysis result as returned by Finalize() to text or bytes.
 // The text format is YAML and the bytes format is Protocol Buffers.
 func (saver *ChangesSaver) Serialize(result interface{}, binary bool, writer io.Writer) error {
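
ChangesSaver here, and CommentSentimentAnalysis, CouplesAnalysis, FileHistory and ShotnessAnalysis further below, all follow the same recipe: embed core.NoopMerger plus core.OneShotMergeProcessor, call OneShotMergeProcessor.Initialize() from Initialize(), and return early from Consume() when ShouldConsumeCommit(deps) reports that this merge commit was already processed on a sibling branch. TestUASTChangesSaverConsumeMerge below exercises exactly that behaviour. A standalone re-creation of the idea (the real ShouldConsumeCommit reads the *object.Commit from the deps map; the hashes here are invented):

package main

import "fmt"

// oneShot re-creates the gist of core.OneShotMergeProcessor: regular commits
// are always consumed, a merge commit only on the first branch reaching it.
type oneShot struct {
	merges map[string]bool
}

func (p *oneShot) initialize() {
	p.merges = map[string]bool{}
}

func (p *oneShot) shouldConsumeCommit(hash string, numParents int) bool {
	if numParents <= 1 {
		return true // not a merge - always consume
	}
	if p.merges[hash] {
		return false // this merge was already handled on a sibling branch
	}
	p.merges[hash] = true
	return true
}

func main() {
	var p oneShot
	p.initialize()
	fmt.Println(p.shouldConsumeCommit("cce947", 1)) // true
	fmt.Println(p.shouldConsumeCommit("cce947", 1)) // true - regular commits repeat
	fmt.Println(p.shouldConsumeCommit("dd9dd0", 2)) // true - first visit to the merge
	fmt.Println(p.shouldConsumeCommit("dd9dd0", 2)) // false - already consumed
}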

+ 78 - 0
internal/plumbing/uast/uast_test.go

@@ -125,6 +125,8 @@ func TestUASTExtractorConsume(t *testing.T) {
 	deps := map[string]interface{}{}
 	deps[items.DependencyBlobCache] = cache
 	deps[items.DependencyTreeChanges] = changes
+	deps[core.DependencyCommit], _ = test.Repository.CommitObject(
+		plumbing.NewHash("2b1ed978194a94edeabbca6de7ff3b5771d4d665"))
 	res, err := exr.Consume(deps)
 	// Language not enabled
 	assert.Len(t, res[DependencyUasts], 0)
@@ -154,6 +156,15 @@ func TestUASTExtractorConsume(t *testing.T) {
 	assert.Equal(t, len(uasts[hash].Children), 24)
 }
 
+func TestUASTExtractorFork(t *testing.T) {
+	exr1 := fixtureUASTExtractor()
+	clones := exr1.Fork(1)
+	assert.Len(t, clones, 1)
+	exr2 := clones[0].(*Extractor)
+	assert.True(t, exr1 == exr2)
+	exr1.Merge([]core.PipelineItem{exr2})
+}
+
 func fixtureUASTChanges() *Changes {
 	ch := Changes{}
 	ch.Configure(nil)
@@ -271,6 +282,19 @@ func TestUASTChangesConsume(t *testing.T) {
 	assert.Nil(t, result[2].After)
 }
 
+func TestUASTChangesFork(t *testing.T) {
+	changes1 := fixtureUASTChanges()
+	changes1.cache[plumbing.ZeroHash] = nil
+	clones := changes1.Fork(1)
+	assert.Len(t, clones, 1)
+	changes2 := clones[0].(*Changes)
+	assert.False(t, changes1 == changes2)
+	assert.Equal(t, changes1.cache, changes2.cache)
+	delete(changes1.cache, plumbing.ZeroHash)
+	assert.Len(t, changes2.cache, 1)
+	changes1.Merge([]core.PipelineItem{changes2})
+}
+
 func fixtureUASTChangesSaver() *ChangesSaver {
 	ch := ChangesSaver{}
 	ch.Initialize(test.Repository)
@@ -322,6 +346,8 @@ func TestUASTChangesSaverPayload(t *testing.T) {
 	deps := map[string]interface{}{}
 	changes := make([]Change, 1)
 	deps[DependencyUastChanges] = changes
+	deps[core.DependencyCommit], _ = test.Repository.CommitObject(
+		plumbing.NewHash("2b1ed978194a94edeabbca6de7ff3b5771d4d665"))
 	treeFrom, _ := test.Repository.TreeObject(plumbing.NewHash(
 		"a1eb2ea76eb7f9bfbde9b243861474421000eb96"))
 	treeTo, _ := test.Repository.TreeObject(plumbing.NewHash(
@@ -388,3 +414,55 @@ func TestUASTChangesSaverPayload(t *testing.T) {
 `, tmpdir, tmpdir, tmpdir, tmpdir))
 	checkFiles()
 }
+
+func TestUASTChangesSaverConsumeMerge(t *testing.T) {
+	chs := fixtureUASTChangesSaver()
+	deps := map[string]interface{}{}
+	changes := make([]Change, 1)
+	deps[DependencyUastChanges] = changes
+	deps[core.DependencyCommit], _ = test.Repository.CommitObject(
+		plumbing.NewHash("2b1ed978194a94edeabbca6de7ff3b5771d4d665"))
+	treeFrom, _ := test.Repository.TreeObject(plumbing.NewHash(
+		"a1eb2ea76eb7f9bfbde9b243861474421000eb96"))
+	treeTo, _ := test.Repository.TreeObject(plumbing.NewHash(
+		"994eac1cd07235bb9815e547a75c84265dea00f5"))
+	changes[0] = Change{Before: &uast.Node{}, After: &uast.Node{},
+		Change: &object.Change{From: object.ChangeEntry{
+			Name: "analyser.go",
+			Tree: treeFrom,
+			TreeEntry: object.TreeEntry{
+				Name: "analyser.go",
+				Mode: 0100644,
+				Hash: plumbing.NewHash("dc248ba2b22048cc730c571a748e8ffcf7085ab9"),
+			},
+		}, To: object.ChangeEntry{
+			Name: "analyser.go",
+			Tree: treeTo,
+			TreeEntry: object.TreeEntry{
+				Name: "analyser.go",
+				Mode: 0100644,
+				Hash: plumbing.NewHash("334cde09da4afcb74f8d2b3e6fd6cce61228b485"),
+			},
+		}}}
+	deps[core.DependencyCommit], _ = test.Repository.CommitObject(
+		plumbing.NewHash("cce947b98a050c6d356bc6ba95030254914027b1"))
+	chs.Consume(deps)
+	assert.Len(t, chs.result, 1)
+	chs.Consume(deps)
+	assert.Len(t, chs.result, 2)
+	deps[core.DependencyCommit], _ = test.Repository.CommitObject(
+		plumbing.NewHash("dd9dd084d5851d7dc4399fc7dbf3d8292831ebc5"))
+	chs.Consume(deps)
+	assert.Len(t, chs.result, 3)
+	chs.Consume(deps)
+	assert.Len(t, chs.result, 3)
+}
+
+func TestUASTChangesSaverFork(t *testing.T) {
+	saver1 := fixtureUASTChangesSaver()
+	clones := saver1.Fork(1)
+	assert.Len(t, clones, 1)
+	saver2 := clones[0].(*ChangesSaver)
+	assert.True(t, saver1 == saver2)
+	saver1.Merge([]core.PipelineItem{saver2})
+}

+ 27 - 1
internal/test/repository.go

@@ -8,6 +8,8 @@ import (
 	"gopkg.in/src-d/go-git.v4/plumbing"
 	"gopkg.in/src-d/go-git.v4/plumbing/object"
 	"gopkg.in/src-d/go-git.v4/storage/memory"
+	"path"
+	"io/ioutil"
 )
 
 // Repository is a boilerplate sample repository (Hercules itself).
@@ -28,6 +30,27 @@ func FakeChangeForName(name string, hashFrom string, hashTo string) *object.Chan
 func init() {
 	cwd, err := os.Getwd()
 	if err == nil {
+		for {
+			files, err := ioutil.ReadDir(cwd)
+			if err != nil {
+				break
+			}
+			found := false
+			for _, f := range files {
+				if f.Name() == "README.md" {
+					found = true
+					break
+				}
+			}
+			if found {
+				break
+			}
+			oldCwd := cwd
+			cwd = path.Dir(cwd)
+			if oldCwd == cwd {
+				break
+			}
+		}
 		Repository, err = git.PlainOpen(cwd)
 		if err == nil {
 			iter, err := Repository.CommitObjects()
@@ -45,7 +68,10 @@ func init() {
 			}
 		}
 	}
-	Repository, _ = git.Clone(memory.NewStorage(), nil, &git.CloneOptions{
+	Repository, err = git.Clone(memory.NewStorage(), nil, &git.CloneOptions{
 		URL: "https://github.com/src-d/hercules",
 	})
+	if err != nil {
+		panic(err)
+	}
 }

+ 63 - 18
leaves/burndown.go

@@ -120,7 +120,7 @@ const (
 	DefaultBurndownGranularity = 30
 	// authorSelf is the internal author index which is used in BurndownAnalysis.Finalize() to
 	// format the author overwrites matrix.
-	authorSelf = (1 << 18) - 2
+	authorSelf = (1 << (32 - burndown.TreeMaxBinPower)) - 2
 )
 
 // Name of this PipelineItem. Uniquely identifies the type, used for mapping keys, etc.
@@ -238,21 +238,20 @@ func (analyser *BurndownAnalysis) Initialize(repository *git.Repository) {
 
 // Consume runs this PipelineItem on the next commit data.
 // `deps` contain all the results from upstream PipelineItem-s as requested by Requires().
-// Additionally, "commit" is always present there and represents the analysed *object.Commit.
+// Additionally, DependencyCommit is always present there and represents the analysed *object.Commit.
 // This function returns the mapping with analysis results. The keys must be the same as
 // in Provides(). If there was an error, nil is returned.
 func (analyser *BurndownAnalysis) Consume(deps map[string]interface{}) (map[string]interface{}, error) {
-	sampling := analyser.Sampling
-	if sampling == 0 {
-		sampling = 1
-	}
+	commit := deps[core.DependencyCommit].(*object.Commit)
 	author := deps[identity.DependencyAuthor].(int)
-	analyser.day = deps[items.DependencyDay].(int)
-	delta := (analyser.day / sampling) - (analyser.previousDay / sampling)
-	if delta > 0 {
-		analyser.previousDay = analyser.day
-		gs, fss, pss := analyser.groupStatus()
-		analyser.updateHistories(gs, fss, pss, delta)
+	day := deps[items.DependencyDay].(int)
+	if len(commit.ParentHashes) <= 1 {
+		analyser.day = day
+		analyser.onNewDay()
+	} else {
+		// effectively disables the status updates if the commit is a merge;
+		// we will analyse the conflict resolution in Merge()
+		analyser.day = burndown.TreeMergeMark
 	}
 	cache := deps[items.DependencyBlobCache].(map[plumbing.Hash]*object.Blob)
 	treeDiffs := deps[items.DependencyTreeChanges].(object.Changes)
@@ -272,13 +271,47 @@ func (analyser *BurndownAnalysis) Consume(deps map[string]interface{}) (map[stri
 			return nil, err
 		}
 	}
+	// in case of a merge, analyser.day currently equals TreeMergeMark; restore the real day
+	analyser.day = day
 	return nil, nil
 }
 
+// Fork clones this item. Everything is copied by reference except the files
+// which are copied by value.
+func (analyser *BurndownAnalysis) Fork(n int) []core.PipelineItem {
+	result := make([]core.PipelineItem, n)
+	for i := range result {
+		clone := *analyser
+		clone.files = map[string]*burndown.File{}
+		for key, file := range analyser.files {
+			clone.files[key] = file.Clone(false)
+		}
+		result[i] = &clone
+	}
+	return result
+}
+
+// Merge combines several items together. We apply the special file merging logic here.
+func (analyser *BurndownAnalysis) Merge(branches []core.PipelineItem) {
+	for key, file := range analyser.files {
+		others := make([]*burndown.File, len(branches))
+		for i, branch := range branches {
+			others[i] = branch.(*BurndownAnalysis).files[key]
+		}
+		// don't worry: the hashes are compared before any heavy lifting
+		if file.Merge(analyser.day, others...) {
+			for _, branch := range branches {
+				branch.(*BurndownAnalysis).files[key] = file.Clone(false)
+			}
+		}
+	}
+	analyser.onNewDay()
+}
+
 // Finalize returns the result of the analysis. Further Consume() calls are not expected.
 func (analyser *BurndownAnalysis) Finalize() interface{} {
 	gs, fss, pss := analyser.groupStatus()
-	analyser.updateHistories(gs, fss, pss, 1)
+	analyser.updateHistories(1, gs, fss, pss)
 	for key, statuses := range analyser.fileHistories {
 		if len(statuses) == len(analyser.globalHistory) {
 			continue
@@ -799,9 +832,10 @@ func (analyser *BurndownAnalysis) packPersonWithDay(person int, day int) int {
 	if analyser.PeopleNumber == 0 {
 		return day
 	}
-	result := day
-	result |= person << 14
-	// This effectively means max 16384 days (>44 years) and (131072 - 2) devs
+	result := day & burndown.TreeMergeMark
+	result |= person << burndown.TreeMaxBinPower
+	// This effectively means max (16383 - 1) days (>44 years) and (131072 - 2) devs.
+	// One day less because burndown.TreeMergeMark = ((1 << 14) - 1) is a special day.
 	return result
 }
 
@@ -809,7 +843,18 @@ func (analyser *BurndownAnalysis) unpackPersonWithDay(value int) (int, int) {
 	if analyser.PeopleNumber == 0 {
 		return identity.AuthorMissing, value
 	}
-	return value >> 14, value & 0x3FFF
+	return value >> burndown.TreeMaxBinPower, value & burndown.TreeMergeMark
+}
+
+func (analyser *BurndownAnalysis) onNewDay() {
+	day := analyser.day
+	sampling := analyser.Sampling
+	delta := (day / sampling) - (analyser.previousDay / sampling)
+	if delta > 0 {
+		analyser.previousDay = day
+		gs, fss, pss := analyser.groupStatus()
+		analyser.updateHistories(delta, gs, fss, pss)
+	}
 }
 
 func (analyser *BurndownAnalysis) updateStatus(
@@ -1092,7 +1137,7 @@ func (analyser *BurndownAnalysis) groupStatus() ([]int64, map[string][]int64, []
 }
 
 func (analyser *BurndownAnalysis) updateHistories(
-	globalStatus []int64, fileStatuses map[string][]int64, peopleStatuses [][]int64, delta int) {
+	delta int, globalStatus []int64, fileStatuses map[string][]int64, peopleStatuses [][]int64) {
 	for i := 0; i < delta; i++ {
 		analyser.globalHistory = append(analyser.globalHistory, globalStatus)
 	}
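
packPersonWithDay()/unpackPersonWithDay() above multiplex the author and the day into a single int: the low burndown.TreeMaxBinPower bits carry the day, with the all-ones value TreeMergeMark reserved for merges in progress, and the remaining bits carry the person. A self-contained sketch of that packing, assuming the bin power is 14 as the comment in the hunk states:

package main

import "fmt"

const (
	treeMaxBinPower = 14                         // burndown.TreeMaxBinPower, per the comment above
	treeMergeMark   = (1 << treeMaxBinPower) - 1 // 16383: the reserved "merge in progress" day
)

func pack(person, day int) int {
	return (day & treeMergeMark) | person<<treeMaxBinPower
}

func unpack(value int) (person, day int) {
	return value >> treeMaxBinPower, value & treeMergeMark
}

func main() {
	v := pack(7, 100)
	p, d := unpack(v)
	fmt.Println(p, d)                           // 7 100
	fmt.Println(unpack(pack(7, treeMergeMark))) // 7 16383 - the special merge day
}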

+ 4 - 0
leaves/burndown_test.go

@@ -178,6 +178,8 @@ func TestBurndownConsumeFinalize(t *testing.T) {
 	result, err := fd.Consume(deps)
 	assert.Nil(t, err)
 	deps[items.DependencyFileDiff] = result[items.DependencyFileDiff]
+	deps[core.DependencyCommit], _ = test.Repository.CommitObject(plumbing.NewHash(
+		"cce947b98a050c6d356bc6ba95030254914027b1"))
 	result, err = burndown.Consume(deps)
 	assert.Nil(t, result)
 	assert.Nil(t, err)
@@ -394,6 +396,8 @@ func TestBurndownSerialize(t *testing.T) {
 	},
 	}
 	deps[items.DependencyTreeChanges] = changes
+	deps[core.DependencyCommit], _ = test.Repository.CommitObject(plumbing.NewHash(
+		"cce947b98a050c6d356bc6ba95030254914027b1"))
 	fd := fixtures.FileDiff()
 	result, _ := fd.Consume(deps)
 	deps[items.DependencyFileDiff] = result[items.DependencyFileDiff]

+ 12 - 1
leaves/comment_sentiment.go

@@ -25,6 +25,8 @@ import (
 
 // CommentSentimentAnalysis measures comment sentiment through time.
 type CommentSentimentAnalysis struct {
+	core.NoopMerger
+	core.OneShotMergeProcessor
 	MinCommentLength int
 	Gap              float32
 
@@ -141,14 +143,18 @@ func (sent *CommentSentimentAnalysis) Initialize(repository *git.Repository) {
 	sent.commentsByDay = map[int][]string{}
 	sent.xpather = &uast_items.ChangesXPather{XPath: "//*[@roleComment]"}
 	sent.validate()
+	sent.OneShotMergeProcessor.Initialize()
 }
 
 // Consume runs this PipelineItem on the next commit data.
 // `deps` contain all the results from upstream PipelineItem-s as requested by Requires().
-// Additionally, "commit" is always present there and represents the analysed *object.Commit.
+// Additionally, DependencyCommit is always present there and represents the analysed *object.Commit.
 // This function returns the mapping with analysis results. The keys must be the same as
 // in Provides(). If there was an error, nil is returned.
 func (sent *CommentSentimentAnalysis) Consume(deps map[string]interface{}) (map[string]interface{}, error) {
+	if !sent.ShouldConsumeCommit(deps) {
+		return nil, nil
+	}
 	changes := deps[uast_items.DependencyUastChanges].([]uast_items.Change)
 	day := deps[items.DependencyDay].(int)
 	commentNodes := sent.xpather.Extract(changes)
@@ -225,6 +231,11 @@ func (sent *CommentSentimentAnalysis) Finalize() interface{} {
 	return result
 }
 
+// Fork clones this PipelineItem.
+func (sent *CommentSentimentAnalysis) Fork(n int) []core.PipelineItem {
+	return core.ForkSamePipelineItem(sent, n)
+}
+
 // Serialize converts the analysis result as returned by Finalize() to text or bytes.
 // The text format is YAML and the bytes format is Protocol Buffers.
 func (sent *CommentSentimentAnalysis) Serialize(result interface{}, binary bool, writer io.Writer) error {

+ 9 - 0
leaves/comment_sentiment_test.go

@@ -87,6 +87,15 @@ func TestCommentSentimentRegistration(t *testing.T) {
 	assert.True(t, matched)
 }
 
+func TestCommentSentimentFork(t *testing.T) {
+	sent1 := fixtureCommentSentiment()
+	clones := sent1.Fork(1)
+	assert.Len(t, clones, 1)
+	sent2 := clones[0].(*CommentSentimentAnalysis)
+	assert.True(t, sent1 == sent2)
+	sent1.Merge([]core.PipelineItem{sent2})
+}
+
 func TestCommentSentimentSerializeText(t *testing.T) {
 	sent := fixtureCommentSentiment()
 	result := CommentSentimentResult{

+ 12 - 1
leaves/couples.go

@@ -20,6 +20,8 @@ import (
 // The results are matrices, where the cell at row X and column Y is the number of commits which
 // changed X and Y together. In case with people, the numbers are summed for every common file.
 type CouplesAnalysis struct {
+	core.NoopMerger
+	core.OneShotMergeProcessor
 	// PeopleNumber is the number of developers for which to build the matrix. 0 disables this analysis.
 	PeopleNumber int
 
@@ -92,14 +94,18 @@ func (couples *CouplesAnalysis) Initialize(repository *git.Repository) {
 	}
 	couples.peopleCommits = make([]int, couples.PeopleNumber+1)
 	couples.files = map[string]map[string]int{}
+	couples.OneShotMergeProcessor.Initialize()
 }
 
 // Consume runs this PipelineItem on the next commit data.
 // `deps` contain all the results from upstream PipelineItem-s as requested by Requires().
-// Additionally, "commit" is always present there and represents the analysed *object.Commit.
+// Additionally, DependencyCommit is always present there and represents the analysed *object.Commit.
 // This function returns the mapping with analysis results. The keys must be the same as
 // in Provides(). If there was an error, nil is returned.
 func (couples *CouplesAnalysis) Consume(deps map[string]interface{}) (map[string]interface{}, error) {
+	if !couples.ShouldConsumeCommit(deps) {
+		return nil, nil
+	}
 	author := deps[identity.DependencyAuthor].(int)
 	if author == identity.AuthorMissing {
 		author = couples.PeopleNumber
@@ -217,6 +223,11 @@ func (couples *CouplesAnalysis) Finalize() interface{} {
 	}
 }
 
+// Fork clones this pipeline item.
+func (couples *CouplesAnalysis) Fork(n int) []core.PipelineItem {
+	return core.ForkSamePipelineItem(couples, n)
+}
+
 // Serialize converts the analysis result as returned by Finalize() to text or bytes.
 // The text format is YAML and the bytes format is Protocol Buffers.
 func (couples *CouplesAnalysis) Serialize(result interface{}, binary bool, writer io.Writer) error {

+ 14 - 0
leaves/couples_test.go

@@ -10,6 +10,7 @@ import (
 	"github.com/gogo/protobuf/proto"
 	"github.com/stretchr/testify/assert"
 	"gopkg.in/src-d/go-git.v4/plumbing/object"
+	gitplumbing "gopkg.in/src-d/go-git.v4/plumbing"
 	"gopkg.in/src-d/hercules.v4/internal/core"
 	"gopkg.in/src-d/hercules.v4/internal/pb"
 	"gopkg.in/src-d/hercules.v4/internal/plumbing"
@@ -89,6 +90,8 @@ func TestCouplesConsumeFinalize(t *testing.T) {
 	c := fixtureCouples()
 	deps := map[string]interface{}{}
 	deps[identity.DependencyAuthor] = 0
+	deps[core.DependencyCommit], _ = test.Repository.CommitObject(gitplumbing.NewHash(
+		"cce947b98a050c6d356bc6ba95030254914027b1"))
 	deps[plumbing.DependencyTreeChanges] = generateChanges("+two", "+four", "+six")
 	c.Consume(deps)
 	deps[plumbing.DependencyTreeChanges] = generateChanges("+one", "-two", "=three", ">four>five")
@@ -168,6 +171,15 @@ func TestCouplesConsumeFinalize(t *testing.T) {
 	assert.Equal(t, cr.FilesMatrix[2][2], int64(2))
 }
 
+func TestCouplesFork(t *testing.T) {
+	couples1 := fixtureCouples()
+	clones := couples1.Fork(1)
+	assert.Len(t, clones, 1)
+	couples2 := clones[0].(*CouplesAnalysis)
+	assert.True(t, couples1 == couples2)
+	couples1.Merge([]core.PipelineItem{couples2})
+}
+
 func TestCouplesSerialize(t *testing.T) {
 	c := fixtureCouples()
 	c.PeopleNumber = 1
@@ -182,6 +194,8 @@ func TestCouplesSerialize(t *testing.T) {
 	deps := map[string]interface{}{}
 	deps[identity.DependencyAuthor] = 0
 	deps[plumbing.DependencyTreeChanges] = generateChanges("+two", "+four", "+six")
+	deps[core.DependencyCommit], _ = test.Repository.CommitObject(gitplumbing.NewHash(
+		"cce947b98a050c6d356bc6ba95030254914027b1"))
 	c.Consume(deps)
 	deps[plumbing.DependencyTreeChanges] = generateChanges("+one", "-two", "=three", ">four>five")
 	c.Consume(deps)

+ 13 - 2
leaves/file_history.go

@@ -19,6 +19,8 @@ import (
 // FileHistory contains the intermediate state which is mutated by Consume(). It should implement
 // LeafPipelineItem.
 type FileHistory struct {
+	core.NoopMerger
+	core.OneShotMergeProcessor
 	files map[string][]plumbing.Hash
 }
 
@@ -65,15 +67,19 @@ func (history *FileHistory) Configure(facts map[string]interface{}) {
 // calls. The repository which is going to be analysed is supplied as an argument.
 func (history *FileHistory) Initialize(repository *git.Repository) {
 	history.files = map[string][]plumbing.Hash{}
+	history.OneShotMergeProcessor.Initialize()
 }
 
 // Consume runs this PipelineItem on the next commit data.
 // `deps` contain all the results from upstream PipelineItem-s as requested by Requires().
-// Additionally, "commit" is always present there and represents the analysed *object.Commit.
+// Additionally, DependencyCommit is always present there and represents the analysed *object.Commit.
 // This function returns the mapping with analysis results. The keys must be the same as
 // in Provides(). If there was an error, nil is returned.
 func (history *FileHistory) Consume(deps map[string]interface{}) (map[string]interface{}, error) {
-	commit := deps["commit"].(*object.Commit).Hash
+	if !history.ShouldConsumeCommit(deps) {
+		return nil, nil
+	}
+	commit := deps[core.DependencyCommit].(*object.Commit).Hash
 	changes := deps[items.DependencyTreeChanges].(object.Changes)
 	for _, change := range changes {
 		action, _ := change.Action()
@@ -101,6 +107,11 @@ func (history *FileHistory) Finalize() interface{} {
 	return FileHistoryResult{Files: history.files}
 }
 
+// Fork clones this PipelineItem.
+func (history *FileHistory) Fork(n int) []core.PipelineItem {
+	return core.ForkSamePipelineItem(history, n)
+}
+
 // Serialize converts the analysis result as returned by Finalize() to text or bytes.
 // The text format is YAML and the bytes format is Protocol Buffers.
 func (history *FileHistory) Serialize(result interface{}, binary bool, writer io.Writer) error {

+ 12 - 3
leaves/file_history_test.go

@@ -93,7 +93,7 @@ func TestFileHistoryConsume(t *testing.T) {
 	deps[items.DependencyTreeChanges] = changes
 	commit, _ := test.Repository.CommitObject(plumbing.NewHash(
 		"2b1ed978194a94edeabbca6de7ff3b5771d4d665"))
-	deps["commit"] = commit
+	deps[core.DependencyCommit] = commit
 	fh.files["cmd/hercules/main.go"] = []plumbing.Hash{plumbing.NewHash(
 		"0000000000000000000000000000000000000000")}
 	fh.files["analyser.go"] = []plumbing.Hash{plumbing.NewHash(
@@ -113,6 +113,15 @@ func TestFileHistoryConsume(t *testing.T) {
 	assert.Equal(t, fh.files, res.Files)
 }
 
+func TestFileHistoryFork(t *testing.T) {
+	fh1 := fixtureFileHistory()
+	clones := fh1.Fork(1)
+	assert.Len(t, clones, 1)
+	fh2 := clones[0].(*FileHistory)
+	assert.True(t, fh1 == fh2)
+	fh1.Merge([]core.PipelineItem{fh2})
+}
+
 func TestFileHistorySerializeText(t *testing.T) {
 	fh := fixtureFileHistory()
 	deps := map[string]interface{}{}
@@ -132,7 +141,7 @@ func TestFileHistorySerializeText(t *testing.T) {
 	deps[items.DependencyTreeChanges] = changes
 	commit, _ := test.Repository.CommitObject(plumbing.NewHash(
 		"2b1ed978194a94edeabbca6de7ff3b5771d4d665"))
-	deps["commit"] = commit
+	deps[core.DependencyCommit] = commit
 	fh.Consume(deps)
 	res := fh.Finalize().(FileHistoryResult)
 	buffer := &bytes.Buffer{}
@@ -159,7 +168,7 @@ func TestFileHistorySerializeBinary(t *testing.T) {
 	deps[items.DependencyTreeChanges] = changes
 	commit, _ := test.Repository.CommitObject(plumbing.NewHash(
 		"2b1ed978194a94edeabbca6de7ff3b5771d4d665"))
-	deps["commit"] = commit
+	deps[core.DependencyCommit] = commit
 	fh.Consume(deps)
 	res := fh.Finalize().(FileHistoryResult)
 	buffer := &bytes.Buffer{}

+ 13 - 2
leaves/shotness.go

@@ -22,6 +22,8 @@ import (
 // ShotnessAnalysis contains the intermediate state which is mutated by Consume(). It should implement
 // LeafPipelineItem.
 type ShotnessAnalysis struct {
+	core.NoopMerger
+	core.OneShotMergeProcessor
 	XpathStruct string
 	XpathName   string
 
@@ -138,15 +140,19 @@ func (shotness *ShotnessAnalysis) Configure(facts map[string]interface{}) {
 func (shotness *ShotnessAnalysis) Initialize(repository *git.Repository) {
 	shotness.nodes = map[string]*nodeShotness{}
 	shotness.files = map[string]map[string]*nodeShotness{}
+	shotness.OneShotMergeProcessor.Initialize()
 }
 
 // Consume runs this PipelineItem on the next commit data.
 // `deps` contain all the results from upstream PipelineItem-s as requested by Requires().
-// Additionally, "commit" is always present there and represents the analysed *object.Commit.
+// Additionally, DependencyCommit is always present there and represents the analysed *object.Commit.
 // This function returns the mapping with analysis results. The keys must be the same as
 // in Provides(). If there was an error, nil is returned.
 func (shotness *ShotnessAnalysis) Consume(deps map[string]interface{}) (map[string]interface{}, error) {
-	commit := deps["commit"].(*object.Commit)
+	if !shotness.ShouldConsumeCommit(deps) {
+		return nil, nil
+	}
+	commit := deps[core.DependencyCommit].(*object.Commit)
 	changesList := deps[uast_items.DependencyUastChanges].([]uast_items.Change)
 	diffs := deps[items.DependencyFileDiff].(map[string]items.FileDiffData)
 	allNodes := map[string]bool{}
@@ -322,6 +328,11 @@ func (shotness *ShotnessAnalysis) Consume(deps map[string]interface{}) (map[stri
 	return nil, nil
 }
 
+// Fork clones this PipelineItem.
+func (shotness *ShotnessAnalysis) Fork(n int) []core.PipelineItem {
+	return core.ForkSamePipelineItem(shotness, n)
+}
+
 // Finalize returns the result of the analysis. Further Consume() calls are not expected.
 func (shotness *ShotnessAnalysis) Finalize() interface{} {
 	result := ShotnessResult{

+ 11 - 2
leaves/shotness_test.go

@@ -77,7 +77,7 @@ func bakeShotness(t *testing.T, eraseEndPosition bool) (*ShotnessAnalysis, Shotn
 	dmp := diffmatchpatch.New()
 	src, dst, _ := dmp.DiffLinesToRunes(string(bytes1), string(bytes2))
 	state := map[string]interface{}{}
-	state["commit"] = &object.Commit{}
+	state[core.DependencyCommit] = &object.Commit{}
 	fileDiffs := map[string]items.FileDiffData{}
 	const fileName = "test.java"
 	fileDiffs[fileName] = items.FileDiffData{
@@ -130,7 +130,7 @@ func TestShotnessConsume(t *testing.T) {
 	dmp := diffmatchpatch.New()
 	src, dst, _ := dmp.DiffLinesToRunes(string(bytes1), string(bytes2))
 	state := map[string]interface{}{}
-	state["commit"] = &object.Commit{}
+	state[core.DependencyCommit] = &object.Commit{}
 	fileDiffs := map[string]items.FileDiffData{}
 	const fileName = "test.java"
 	const newfileName = "new.java"
@@ -209,6 +209,15 @@ func TestShotnessConsume(t *testing.T) {
 	assert.Len(t, sh.files, 0)
 }
 
+func TestShotnessFork(t *testing.T) {
+	sh1 := fixtureShotness()
+	clones := sh1.Fork(1)
+	assert.Len(t, clones, 1)
+	sh2 := clones[0].(*ShotnessAnalysis)
+	assert.True(t, sh1 == sh2)
+	sh1.Merge([]core.PipelineItem{sh2})
+}
+
 func TestShotnessConsumeNoEnd(t *testing.T) {
 	_, result1 := bakeShotness(t, false)
 	_, result2 := bakeShotness(t, true)