Add multiple branch support to plumbing and Burndown

Vadim Markovtsev 6 years ago
commit e5eb077492

+ 7 - 7
OCTOPUS.md

@@ -5,9 +5,9 @@ It follows the main (zero index) branch when it encounters a fork.
 This behavior ignores all the side branches, and we are currently
 thinking how to include them into the analysis.
 
-### Plan
+### Plan - done
 
-* Commits must be sorted by time.
+* Commits must be ordered topologically.
 * When a fork is hit, clone the pipeline. Assign the old instance to the main branch and new
 instances to the sprouts. BurndownAnalysis should share the same counters for efficiency
 and simplicity, but the files must be copied.
@@ -21,7 +21,7 @@ with the previous commit in the main branch.
 * The sequence of commits must be the analysis scenario: it must inform when to fork and to merge,
 which pipeline instance to apply.
 
-### New APIs
+### New APIs - done
 
 * PipelineItem
   * `Fork()`
@@ -30,8 +30,8 @@ which pipeline instance to apply.
 ### Major changes
 
 * `Pipeline`
-  * `Commits()`
-  * `Run()`
-* `Burndown`
+  * `Commits()` - done
+  * `Run()` - done
+* `Burndown` - done
 * `Couples`
-* `FileDiff`
+* `FileDiff` - done
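
The fork/merge discipline described in the plan above — share the counters, copy the per-branch file state — can be illustrated with a small standalone sketch. The `forkable` interface and the `lineCounter` item below are hypothetical stand-ins for illustration, not code from this commit:

```go
package main

import "fmt"

// forkable is a reduced, hypothetical stand-in for core.PipelineItem,
// keeping only the two methods this commit introduces.
type forkable interface {
	Fork(n int) []forkable
	Merge(branches []forkable)
}

// lineCounter follows the plan above: totals are shared between branches,
// per-branch file state is copied on Fork.
type lineCounter struct {
	totals map[string]int // shared across clones, like the Burndown counters
	files  map[string]int // branch-local state, must be copied on Fork
}

func (c *lineCounter) Fork(n int) []forkable {
	clones := make([]forkable, n)
	for i := 0; i < n; i++ {
		files := map[string]int{}
		for k, v := range c.files {
			files[k] = v // copy the branch-local state
		}
		clones[i] = &lineCounter{totals: c.totals, files: files}
	}
	return clones
}

func (c *lineCounter) Merge(branches []forkable) {
	// Nothing to do here: totals are already shared, and the merge commit
	// itself rebuilds the file state when it is consumed.
}

func main() {
	master := &lineCounter{totals: map[string]int{}, files: map[string]int{"a.go": 10}}
	side := master.Fork(1)[0].(*lineCounter)
	side.files["a.go"] = 20 // stays local to the side branch
	master.Merge([]forkable{side})
	fmt.Println(master.files["a.go"]) // prints 10
}
```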

+ 100 - 20
internal/core/pipeline.go

@@ -99,9 +99,18 @@ type PipelineItem interface {
 	Initialize(*git.Repository)
 	// Consume processes the next commit.
 	// deps contains the required entities which match Depends(). Besides, it always includes
-	// "commit" and "index".
+	// DependencyCommit and DependencyIndex.
 	// Returns the calculated entities which match Provides().
 	Consume(deps map[string]interface{}) (map[string]interface{}, error)
+	// Fork clones the item the requested number of times. The data links between the clones
+	// are up to the implementation. Needed to handle Git branches. See also Merge().
+	// Returns a slice with `n` fresh clones. In other words, it does not include the original item.
+	Fork(n int) []PipelineItem
+	// Merge combines several branches together. Each is supposed to have been created with Fork().
+	// The result is stored in the called item, thus this function returns nothing.
+	// Merge() must update all the branches, not only self. When several branches merge, some of
+	// them may continue to live, hence this requirement.
+	Merge(branches []PipelineItem)
 }
 
 // FeaturedPipelineItem enables switching the automatic insertion of pipeline items on or off.
@@ -197,8 +206,8 @@ func MetadataToCommonAnalysisResult(meta *Metadata) *CommonAnalysisResult {
 // See the extended example of how a Pipeline works in doc.go
 type Pipeline struct {
 	// OnProgress is the callback which is invoked in Analyse() to output its
-	// progress. The first argument is the number of processed commits and the
-	// second is the total number of commits.
+	// progress. The first argument is the number of complete steps and the
+	// second is the total number of steps.
 	OnProgress func(int, int)
 
 	// Repository points to the analysed Git repository struct from go-git.
@@ -227,6 +236,12 @@ const (
 	// ConfigPipelineCommits is the name of the Pipeline configuration option (Pipeline.Initialize())
 	// which allows to specify the custom commit sequence. By default, Pipeline.Commits() is used.
 	ConfigPipelineCommits = "commits"
+	// DependencyCommit is the name of one of the two items in `deps` supplied to PipelineItem.Consume()
+	// which always exists. It corresponds to the currently analyzed commit.
+	DependencyCommit = "commit"
+	// DependencyIndex is the name of one of the two items in `deps` supplied to PipelineItem.Consume()
+	// which always exists. It corresponds to the currently analyzed commit's index.
+	DependencyIndex = "index"
 )
 
 // NewPipeline initializes a new instance of Pipeline struct.
@@ -549,33 +564,56 @@ func (pipeline *Pipeline) Run(commits []*object.Commit) (map[LeafPipelineItem]in
 	if onProgress == nil {
 		onProgress = func(int, int) {}
 	}
+	plan := prepareRunPlan(commits)
+	progressSteps := len(plan) + 2
+	branches := map[int][]PipelineItem{0: pipeline.items}
 
-	for index, commit := range commits {
-		onProgress(index, len(commits))
-		state := map[string]interface{}{"commit": commit, "index": index}
-		for _, item := range pipeline.items {
-			update, err := item.Consume(state)
-			if err != nil {
-				log.Printf("%s failed on commit #%d %s\n",
-					item.Name(), index, commit.Hash.String())
-				return nil, err
+	for index, step := range plan {
+		onProgress(index + 1, progressSteps)
+		firstItem := step.Items[0]
+		switch step.Action {
+		case runActionCommit:
+			state := map[string]interface{}{
+				DependencyCommit: step.Commit,
+				DependencyIndex: index,
 			}
-			for _, key := range item.Provides() {
-				val, ok := update[key]
-				if !ok {
-					panic(fmt.Sprintf("%s: Consume() did not return %s", item.Name(), key))
+			for _, item := range branches[firstItem] {
+				update, err := item.Consume(state)
+				if err != nil {
+					log.Printf("%s failed on commit #%d %s\n",
+						item.Name(), index + 1, step.Commit.Hash.String())
+					return nil, err
+				}
+				for _, key := range item.Provides() {
+					val, ok := update[key]
+					if !ok {
+						panic(fmt.Sprintf("%s: Consume() did not return %s", item.Name(), key))
+					}
+					state[key] = val
 				}
-				state[key] = val
 			}
+		case runActionFork:
+			for i, clone := range cloneItems(branches[firstItem], len(step.Items)-1) {
+				branches[step.Items[i+1]] = clone
+			}
+		case runActionMerge:
+			merged := make([][]PipelineItem, len(step.Items))
+			for i, b := range step.Items {
+				merged[i] = branches[b]
+			}
+			mergeItems(merged)
+		case runActionDelete:
+			delete(branches, firstItem)
 		}
 	}
-	onProgress(len(commits), len(commits))
+	onProgress(len(plan) + 1, progressSteps)
 	result := map[LeafPipelineItem]interface{}{}
-	for _, item := range pipeline.items {
+	for _, item := range getMasterBranch(branches) {
 		if casted, ok := item.(LeafPipelineItem); ok {
 			result[casted] = casted.Finalize()
 		}
 	}
+	onProgress(progressSteps, progressSteps)
 	result[nil] = &CommonAnalysisResult{
 		BeginTime:     commits[0].Author.When.Unix(),
 		EndTime:       commits[len(commits)-1].Author.When.Unix(),
@@ -600,7 +638,7 @@ func LoadCommitsFromFile(path string, repository *git.Repository) ([]*object.Com
 		file = os.Stdin
 	}
 	scanner := bufio.NewScanner(file)
-	commits := []*object.Commit{}
+	var commits []*object.Commit
 	for scanner.Scan() {
 		hash := plumbing.NewHash(scanner.Text())
 		if len(hash) != 20 {
@@ -616,9 +654,13 @@ func LoadCommitsFromFile(path string, repository *git.Repository) ([]*object.Com
 }
 
 const (
+	// runActionCommit corresponds to a regular commit
 	runActionCommit = 0
+	// runActionFork splits a branch into several parts
 	runActionFork = iota
+	// runActionMerge merges several branches together
 	runActionMerge = iota
+	// runActionDelete removes the branch as it is no longer needed
 	runActionDelete = iota
 )
 
@@ -628,6 +670,44 @@ type runAction struct {
 	Items []int
 }
 
+func cloneItems(origin []PipelineItem, n int) [][]PipelineItem {
+	clones := make([][]PipelineItem, n)
+	for j := 0; j < n; j++ {
+		clones[j] = make([]PipelineItem, len(origin))
+	}
+	for i, item := range origin {
+		itemClones := item.Fork(n)
+		for j := 0; j < n; j++ {
+			clones[j][i] = itemClones[j]
+		}
+	}
+	return clones
+}
+
+func mergeItems(branches [][]PipelineItem) {
+	buffer := make([]PipelineItem, len(branches) - 1)
+	for i, item := range branches[0] {
+		for j := 0; j < len(branches)-1; j++ {
+			buffer[j] = branches[j+1][i]
+		}
+		item.Merge(buffer)
+	}
+}
+
+// getMasterBranch returns the branch with the smallest index.
+func getMasterBranch(branches map[int][]PipelineItem) []PipelineItem {
+	minKey := 1 << 31
+	var minVal []PipelineItem
+	for key, val := range branches {
+		if key < minKey {
+			minKey = key
+			minVal = val
+		}
+	}
+	return minVal
+}
+
+// prepareRunPlan schedules the actions for Pipeline.Run().
 func prepareRunPlan(commits []*object.Commit) []runAction {
 	hashes, dag := buildDag(commits)
 	leaveRootComponent(hashes, dag)
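
For intuition about the new Run() loop, here is a hedged, standalone sketch of executing a fork/merge schedule. The `action` type and the hard-coded plan are illustrative stand-ins only; in the commit itself the plan comes from prepareRunPlan() and the branches map holds []PipelineItem:

```go
package main

import "fmt"

// action is a simplified runAction: the real one stores an action constant
// and a *object.Commit instead of strings.
type action struct {
	kind   string // "commit", "fork", "merge" or "delete"
	commit string
	items  []int // branch indexes; items[0] is the primary branch
}

func main() {
	// Hypothetical schedule for: C0 -> fork -> C1 on branch 0, C2 on branch 1
	// -> merge -> C3 on branch 0 -> branch 1 is no longer needed.
	plan := []action{
		{kind: "commit", commit: "C0", items: []int{0}},
		{kind: "fork", items: []int{0, 1}},
		{kind: "commit", commit: "C1", items: []int{0}},
		{kind: "commit", commit: "C2", items: []int{1}},
		{kind: "merge", items: []int{0, 1}},
		{kind: "commit", commit: "C3", items: []int{0}},
		{kind: "delete", items: []int{1}},
	}
	branches := map[int][]string{0: {"item"}} // branch index -> pipeline items
	for _, step := range plan {
		first := step.items[0]
		switch step.kind {
		case "commit":
			fmt.Printf("branch %d consumes %s\n", first, step.commit)
		case "fork":
			for _, b := range step.items[1:] {
				// stands in for cloneItems(): every item gets forked
				branches[b] = append([]string(nil), branches[first]...)
			}
		case "merge":
			// stands in for mergeItems(): items merge pairwise across branches
			fmt.Printf("branches %v merge\n", step.items)
		case "delete":
			delete(branches, first)
		}
	}
	fmt.Println("branches left:", len(branches)) // the master branch survives
}
```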

+ 67 - 10
internal/core/pipeline_test.go

@@ -20,6 +20,8 @@ import (
 type testPipelineItem struct {
 	Initialized   bool
 	DepsConsumed  bool
+	Forked        bool
+	Merged        *bool
 	CommitMatches bool
 	IndexMatches  bool
 	TestError     bool
@@ -69,13 +71,13 @@ func (item *testPipelineItem) Consume(deps map[string]interface{}) (map[string]i
 	if item.TestError {
 		return nil, errors.New("error")
 	}
-	obj, exists := deps["commit"]
+	obj, exists := deps[DependencyCommit]
 	item.DepsConsumed = exists
 	if item.DepsConsumed {
 		commit := obj.(*object.Commit)
 		item.CommitMatches = commit.Hash == plumbing.NewHash(
 			"af9ddc0db70f09f3f27b4b98e415592a7485171c")
-		obj, item.DepsConsumed = deps["index"]
+		obj, item.DepsConsumed = deps[DependencyIndex]
 		if item.DepsConsumed {
 			item.IndexMatches = obj.(int) == 0
 		}
@@ -83,6 +85,19 @@ func (item *testPipelineItem) Consume(deps map[string]interface{}) (map[string]i
 	return map[string]interface{}{"test": item}, nil
 }
 
+func (item *testPipelineItem) Fork(n int) []PipelineItem {
+	result := make([]PipelineItem, n)
+	for i := 0; i < n; i++ {
+		result[i] = &testPipelineItem{Merged: item.Merged}
+	}
+	item.Forked = true
+	return result
+}
+
+func (item *testPipelineItem) Merge(branches []PipelineItem) {
+	*item.Merged = true
+}
+
 func (item *testPipelineItem) Finalize() interface{} {
 	return item
 }
@@ -140,6 +155,13 @@ func (item *dependingTestPipelineItem) Consume(deps map[string]interface{}) (map
 	return nil, nil
 }
 
+func (item *dependingTestPipelineItem) Fork(n int) []PipelineItem {
+	return nil
+}
+
+func (item *dependingTestPipelineItem) Merge(branches []PipelineItem) {
+}
+
 func (item *dependingTestPipelineItem) Finalize() interface{} {
 	return true
 }
@@ -176,7 +198,7 @@ func TestPipelineFeatures(t *testing.T) {
 
 func TestPipelineRun(t *testing.T) {
 	pipeline := NewPipeline(test.Repository)
-	item := &testPipelineItem{}
+	item := &testPipelineItem{Merged: new(bool)}
 	pipeline.AddItem(item)
 	pipeline.Initialize(map[string]interface{}{})
 	assert.True(t, item.Initialized)
@@ -195,22 +217,58 @@ func TestPipelineRun(t *testing.T) {
 	assert.True(t, item.DepsConsumed)
 	assert.True(t, item.CommitMatches)
 	assert.True(t, item.IndexMatches)
+	assert.False(t, item.Forked)
+	assert.False(t, *item.Merged)
 	pipeline.RemoveItem(item)
 	result, err = pipeline.Run(commits)
 	assert.Nil(t, err)
 	assert.Equal(t, 1, len(result))
 }
 
+func TestPipelineRunBranches(t *testing.T) {
+	pipeline := NewPipeline(test.Repository)
+	item := &testPipelineItem{Merged: new(bool)}
+	pipeline.AddItem(item)
+	pipeline.Initialize(map[string]interface{}{})
+	assert.True(t, item.Initialized)
+	commits := make([]*object.Commit, 5)
+	hashes := []string {
+		"6db8065cdb9bb0758f36a7e75fc72ab95f9e8145",
+		"f30daba81ff2bf0b3ba02a1e1441e74f8a4f6fee",
+		"8a03b5620b1caa72ec9cb847ea88332621e2950a",
+		"dd9dd084d5851d7dc4399fc7dbf3d8292831ebc5",
+		"f4ed0405b14f006c0744029d87ddb3245607587a",
+	}
+	for i, h := range hashes {
+		var err error
+		commits[i], err = test.Repository.CommitObject(plumbing.NewHash(h))
+		if err != nil {
+			t.Fatal(err)
+		}
+	}
+	result, err := pipeline.Run(commits)
+	assert.Nil(t, err)
+	assert.True(t, item.Forked)
+	assert.True(t, *item.Merged)
+	assert.Equal(t, 2, len(result))
+	assert.Equal(t, item, result[item].(*testPipelineItem))
+	common := result[nil].(*CommonAnalysisResult)
+	assert.Equal(t, common.CommitsNumber, 5)
+}
+
 func TestPipelineOnProgress(t *testing.T) {
 	pipeline := NewPipeline(test.Repository)
-	var progressOk1, progressOk2 bool
+	progressOk := 0
 
 	onProgress := func(step int, total int) {
-		if step == 0 && total == 1 {
-			progressOk1 = true
+		if step == 1 && total == 3 {
+			progressOk++
+		}
+		if step == 2 && total == 3 {
+			progressOk++
 		}
-		if step == 1 && total == 1 && progressOk1 {
-			progressOk2 = true
+		if step == 3 && total == 3 {
+			progressOk++
 		}
 	}
 
@@ -221,8 +279,7 @@ func TestPipelineOnProgress(t *testing.T) {
 	result, err := pipeline.Run(commits)
 	assert.Nil(t, err)
 	assert.Equal(t, 1, len(result))
-	assert.True(t, progressOk1)
-	assert.True(t, progressOk2)
+	assert.Equal(t, 3, progressOk)
 }
 
 func TestPipelineCommits(t *testing.T) {

+ 14 - 0
internal/core/registry_test.go

@@ -59,6 +59,13 @@ func (item *dummyPipelineItem) Consume(deps map[string]interface{}) (map[string]
 	return map[string]interface{}{"dummy": nil}, nil
 }
 
+func (item *dummyPipelineItem) Fork(n int) []PipelineItem {
+	return nil
+}
+
+func (item *dummyPipelineItem) Merge(branches []PipelineItem) {
+}
+
 type dummyPipelineItem2 struct{}
 
 func (item *dummyPipelineItem2) Name() string {
@@ -92,6 +99,13 @@ func (item *dummyPipelineItem2) Consume(deps map[string
 	return map[string]interface{}{"dummy2": nil}, nil
 }
 
+func (item *dummyPipelineItem2) Fork(n int) []PipelineItem {
+	return nil
+}
+
+func (item *dummyPipelineItem2) Merge(branches []PipelineItem) {
+}
+
 func TestRegistrySummon(t *testing.T) {
 	reg := getRegistry()
 	reg.Register(&testPipelineItem{})

+ 55 - 29
internal/plumbing/blob_cache.go

@@ -17,19 +17,19 @@ import (
 // It must provide the old and the new objects; "blobCache" rotates and allows to not load
 // the same blobs twice. Outdated objects are removed so "blobCache" never grows big.
 type BlobCache struct {
-	// Specifies how to handle the situation when we encounter a git submodule - an object without
-	// the blob. If false, we look inside .gitmodules and if don't find, raise an error.
-	// If true, we do not look inside .gitmodules and always succeed.
-	IgnoreMissingSubmodules bool
+	// Specifies how to handle the situation when we encounter a git submodule - an object
+	// without the blob. If true, we look inside .gitmodules and if we don't find it,
+	// raise an error. If false, we do not look inside .gitmodules and always succeed.
+	FailOnMissingSubmodules bool
 
 	repository *git.Repository
 	cache      map[plumbing.Hash]*object.Blob
 }
 
 const (
-	// ConfigBlobCacheIgnoreMissingSubmodules is the name of the configuration option for
-	// BlobCache.Configure() to not check if the referenced submodules exist.
-	ConfigBlobCacheIgnoreMissingSubmodules = "BlobCache.IgnoreMissingSubmodules"
+	// ConfigBlobCacheFailOnMissingSubmodules is the name of the configuration option for
+	// BlobCache.Configure() to check if the referenced submodules are registered in .gitmodules.
+	ConfigBlobCacheFailOnMissingSubmodules = "BlobCache.FailOnMissingSubmodules"
 	// DependencyBlobCache identifies the dependency provided by BlobCache.
 	DependencyBlobCache = "blob_cache"
 )
@@ -58,11 +58,11 @@ func (blobCache *BlobCache) Requires() []string {
 // ListConfigurationOptions returns the list of changeable public properties of this PipelineItem.
 func (blobCache *BlobCache) ListConfigurationOptions() []core.ConfigurationOption {
 	options := [...]core.ConfigurationOption{{
-		Name: ConfigBlobCacheIgnoreMissingSubmodules,
-		Description: "Specifies whether to panic if some referenced submodules do not exist and thus" +
			" the corresponding Git objects cannot be loaded. Override this if you know that the " +
-			"history is dirty and you want to get things done.",
-		Flag:    "ignore-missing-submodules",
+		Name: ConfigBlobCacheFailOnMissingSubmodules,
+		Description: "Specifies whether to panic if any referenced submodule does " +
+			"not exist in .gitmodules and thus the corresponding Git object cannot be loaded. " +
+			"Override this if you want to ensure that your repository is intact.",
+		Flag:    "fail-on-missing-submodules",
 		Type:    core.BoolConfigurationOption,
 		Default: false}}
 	return options[:]
@@ -70,8 +70,8 @@ func (blobCache *BlobCache) ListConfigurationOptions() []core.ConfigurationOptio
 
 // Configure sets the properties previously published by ListConfigurationOptions().
 func (blobCache *BlobCache) Configure(facts map[string]interface{}) {
-	if val, exists := facts[ConfigBlobCacheIgnoreMissingSubmodules].(bool); exists {
-		blobCache.IgnoreMissingSubmodules = val
+	if val, exists := facts[ConfigBlobCacheFailOnMissingSubmodules].(bool); exists {
+		blobCache.FailOnMissingSubmodules = val
 	}
 }
 
@@ -84,11 +84,12 @@ func (blobCache *BlobCache) Initialize(repository *git.Repository) {
 
 // Consume runs this PipelineItem on the next commit data.
 // `deps` contain all the results from upstream PipelineItem-s as requested by Requires().
-// Additionally, "commit" is always present there and represents the analysed *object.Commit.
-// This function returns the mapping with analysis results. The keys must be the same as
-// in Provides(). If there was an error, nil is returned.
+// Additionally, DependencyCommit is always present there and represents
+// the analysed *object.Commit. This function returns the mapping with analysis
+// results. The keys must be the same as in Provides(). If there was an error,
+// nil is returned.
 func (blobCache *BlobCache) Consume(deps map[string]interface{}) (map[string]interface{}, error) {
-	commit := deps["commit"].(*object.Commit)
+	commit := deps[core.DependencyCommit].(*object.Commit)
 	changes := deps[DependencyTreeChanges].(object.Changes)
 	cache := map[plumbing.Hash]*object.Blob{}
 	newCache := map[plumbing.Hash]*object.Blob{}
@@ -104,22 +105,25 @@ func (blobCache *BlobCache) Consume(deps map[string]int
 		case merkletrie.Insert:
 			blob, err = blobCache.getBlob(&change.To, commit.File)
 			if err != nil {
-				log.Printf("file to %s %s\n", change.To.Name, change.To.TreeEntry.Hash)
+				log.Printf("file to %s %s\n",
+					change.To.Name, change.To.TreeEntry.Hash)
 			} else {
 				cache[change.To.TreeEntry.Hash] = blob
 				newCache[change.To.TreeEntry.Hash] = blob
 			}
 		case merkletrie.Delete:
-			cache[change.From.TreeEntry.Hash], exists = blobCache.cache[change.From.TreeEntry.Hash]
+			cache[change.From.TreeEntry.Hash], exists =
+				blobCache.cache[change.From.TreeEntry.Hash]
 			if !exists {
-				cache[change.From.TreeEntry.Hash], err = blobCache.getBlob(&change.From, commit.File)
+				cache[change.From.TreeEntry.Hash], err =
+					blobCache.getBlob(&change.From, commit.File)
 				if err != nil {
 					if err.Error() != plumbing.ErrObjectNotFound.Error() {
 						log.Printf("file from %s %s\n", change.From.Name,
 							change.From.TreeEntry.Hash)
 					} else {
-						cache[change.From.TreeEntry.Hash], err = internal.CreateDummyBlob(
-							change.From.TreeEntry.Hash)
+						cache[change.From.TreeEntry.Hash], err =
+							internal.CreateDummyBlob(change.From.TreeEntry.Hash)
 					}
 				}
 			}
@@ -131,9 +135,11 @@ func (blobCache *BlobCache) Consume(deps map[string]int
 				cache[change.To.TreeEntry.Hash] = blob
 				newCache[change.To.TreeEntry.Hash] = blob
 			}
-			cache[change.From.TreeEntry.Hash], exists = blobCache.cache[change.From.TreeEntry.Hash]
+			cache[change.From.TreeEntry.Hash], exists =
+				blobCache.cache[change.From.TreeEntry.Hash]
 			if !exists {
-				cache[change.From.TreeEntry.Hash], err = blobCache.getBlob(&change.From, commit.File)
+				cache[change.From.TreeEntry.Hash], err =
+					blobCache.getBlob(&change.From, commit.File)
 				if err != nil {
 					log.Printf("file from %s\n", change.From.Name)
 				}
@@ -147,9 +153,29 @@ func (blobCache *BlobCache) Consume(deps map[string]int
 	return map[string]interface{}{DependencyBlobCache: cache}, nil
 }
 
-// FileGetter defines a function which loads the Git file by the specified path.
-// The state can be arbitrary though here it always corresponds to the currently processed
-// commit.
+func (blobCache *BlobCache) Fork(n int) []core.PipelineItem {
+	caches := make([]core.PipelineItem, n)
+	for i := 0; i < n; i++ {
+		cache := map[plumbing.Hash]*object.Blob{}
+		for k, v := range blobCache.cache {
+			cache[k] = v
+		}
+		caches[i] = &BlobCache{
+			FailOnMissingSubmodules: blobCache.FailOnMissingSubmodules,
+			repository: blobCache.repository,
+			cache: cache,
+		}
+	}
+	return caches
+}
+
+func (blobCache *BlobCache) Merge(branches []core.PipelineItem) {
+	// no-op
+}
+
+// FileGetter defines a function which loads the Git file by
+// the specified path. The state can be arbitrary though here it always
+// corresponds to the currently processed commit.
 type FileGetter func(path string) (*object.File, error)
 
 // Returns the blob which corresponds to the specified ChangeEntry.
@@ -164,7 +190,7 @@ func (blobCache *BlobCache) getBlob(entry *object.ChangeEntry, fileGetter FileGe
 		if entry.TreeEntry.Mode != 0160000 {
 			// this is not a submodule
 			return nil, err
-		} else if blobCache.IgnoreMissingSubmodules {
+		} else if !blobCache.FailOnMissingSubmodules {
 			return internal.CreateDummyBlob(entry.TreeEntry.Hash)
 		}
 		file, errModules := fileGetter(".gitmodules")

+ 52 - 17
internal/plumbing/blob_cache_test.go

@@ -20,14 +20,14 @@ func fixtureBlobCache() *BlobCache {
 func TestBlobCacheConfigureInitialize(t *testing.T) {
 	cache := fixtureBlobCache()
 	assert.Equal(t, test.Repository, cache.repository)
-	assert.False(t, cache.IgnoreMissingSubmodules)
+	assert.False(t, cache.FailOnMissingSubmodules)
 	facts := map[string]interface{}{}
-	facts[ConfigBlobCacheIgnoreMissingSubmodules] = true
+	facts[ConfigBlobCacheFailOnMissingSubmodules] = true
 	cache.Configure(facts)
-	assert.True(t, cache.IgnoreMissingSubmodules)
+	assert.True(t, cache.FailOnMissingSubmodules)
 	facts = map[string]interface{}{}
 	cache.Configure(facts)
-	assert.True(t, cache.IgnoreMissingSubmodules)
+	assert.True(t, cache.FailOnMissingSubmodules)
 }
 
 func TestBlobCacheMetadata(t *testing.T) {
@@ -40,7 +40,7 @@ func TestBlobCacheMetadata(t *testing.T) {
 	assert.Equal(t, cache.Requires()[0], changes.Provides()[0])
 	opts := cache.ListConfigurationOptions()
 	assert.Len(t, opts, 1)
-	assert.Equal(t, opts[0].Name, ConfigBlobCacheIgnoreMissingSubmodules)
+	assert.Equal(t, opts[0].Name, ConfigBlobCacheFailOnMissingSubmodules)
 }
 
 func TestBlobCacheRegistration(t *testing.T) {
@@ -78,7 +78,7 @@ func TestBlobCacheConsumeModification(t *testing.T) {
 		},
 	}}
 	deps := map[string]interface{}{}
-	deps["commit"] = commit
+	deps[core.DependencyCommit] = commit
 	deps[DependencyTreeChanges] = changes
 	result, err := fixtureBlobCache().Consume(deps)
 	assert.Nil(t, err)
@@ -124,7 +124,7 @@ func TestBlobCacheConsumeInsertionDeletion(t *testing.T) {
 	},
 	}
 	deps := map[string]interface{}{}
-	deps["commit"] = commit
+	deps[core.DependencyCommit] = commit
 	deps[DependencyTreeChanges] = changes
 	result, err := fixtureBlobCache().Consume(deps)
 	assert.Nil(t, err)
@@ -151,7 +151,7 @@ func TestBlobCacheConsumeNoAction(t *testing.T) {
 		"63076fa0dfd93e94b6d2ef0fc8b1fdf9092f83c4"))
 	changes[0] = &object.Change{From: object.ChangeEntry{}, To: object.ChangeEntry{}}
 	deps := map[string]interface{}{}
-	deps["commit"] = commit
+	deps[core.DependencyCommit] = commit
 	deps[DependencyTreeChanges] = changes
 	result, err := fixtureBlobCache().Consume(deps)
 	assert.Nil(t, result)
@@ -188,7 +188,7 @@ func TestBlobCacheConsumeBadHashes(t *testing.T) {
 		TreeEntry: object.TreeEntry{},
 	}}
 	deps := map[string]interface{}{}
-	deps["commit"] = commit
+	deps[core.DependencyCommit] = commit
 	deps[DependencyTreeChanges] = changes
 	result, err := fixtureBlobCache().Consume(deps)
 	assert.Nil(t, result)
@@ -235,7 +235,7 @@ func TestBlobCacheConsumeInvalidHash(t *testing.T) {
 		TreeEntry: object.TreeEntry{},
 	}}
 	deps := map[string]interface{}{}
-	deps["commit"] = commit
+	deps[core.DependencyCommit] = commit
 	deps[DependencyTreeChanges] = changes
 	result, err := fixtureBlobCache().Consume(deps)
 	assert.Nil(t, result)
@@ -294,7 +294,7 @@ func TestBlobCacheDeleteInvalidBlob(t *testing.T) {
 	}, To: object.ChangeEntry{},
 	}
 	deps := map[string]interface{}{}
-	deps["commit"] = commit
+	deps[core.DependencyCommit] = commit
 	deps[DependencyTreeChanges] = changes
 	result, err := fixtureBlobCache().Consume(deps)
 	assert.Nil(t, err)
@@ -325,7 +325,7 @@ func TestBlobCacheInsertInvalidBlob(t *testing.T) {
 	},
 	}
 	deps := map[string]interface{}{}
-	deps["commit"] = commit
+	deps[core.DependencyCommit] = commit
 	deps[DependencyTreeChanges] = changes
 	result, err := fixtureBlobCache().Consume(deps)
 	assert.NotNil(t, err)
@@ -334,14 +334,14 @@ func TestBlobCacheInsertInvalidBlob(t *testing.T) {
 
 func TestBlobCacheGetBlobIgnoreMissing(t *testing.T) {
 	cache := fixtureBlobCache()
-	cache.IgnoreMissingSubmodules = true
+	cache.FailOnMissingSubmodules = false
 	treeFrom, _ := test.Repository.TreeObject(plumbing.NewHash(
 		"80fe25955b8e725feee25c08ea5759d74f8b670d"))
 	entry := object.ChangeEntry{
-		Name: "commit",
+		Name: core.DependencyCommit,
 		Tree: treeFrom,
 		TreeEntry: object.TreeEntry{
-			Name: "commit",
+			Name: core.DependencyCommit,
 			Mode: 0160000,
 			Hash: plumbing.NewHash("ffffffffffffffffffffffffffffffffffffffff"),
 		},
@@ -353,7 +353,7 @@ func TestBlobCacheGetBlobIgnoreMissing(t *testing.T) {
 	assert.NotNil(t, blob)
 	assert.Nil(t, err)
 	assert.Equal(t, blob.Size, int64(0))
-	cache.IgnoreMissingSubmodules = false
+	cache.FailOnMissingSubmodules = true
 	getter = func(path string) (*object.File, error) {
 		assert.Equal(t, path, ".gitmodules")
 		commit, _ := test.Repository.CommitObject(plumbing.NewHash(
@@ -367,7 +367,7 @@ func TestBlobCacheGetBlobIgnoreMissing(t *testing.T) {
 
 func TestBlobCacheGetBlobGitModulesErrors(t *testing.T) {
 	cache := fixtureBlobCache()
-	cache.IgnoreMissingSubmodules = false
+	cache.FailOnMissingSubmodules = true
 	entry := object.ChangeEntry{
 		Name: "labours.py",
 		TreeEntry: object.TreeEntry{
@@ -402,3 +402,38 @@ func TestBlobCacheGetBlobGitModulesErrors(t *testing.T) {
 	assert.NotNil(t, err)
 	assert.NotEqual(t, err.Error(), plumbing.ErrObjectNotFound.Error())
 }
+
+func TestBlobCacheFork(t *testing.T) {
+	commit, _ := test.Repository.CommitObject(plumbing.NewHash(
+		"2b1ed978194a94edeabbca6de7ff3b5771d4d665"))
+	changes := make(object.Changes, 1)
+	treeTo, _ := test.Repository.TreeObject(plumbing.NewHash(
+		"251f2094d7b523d5bcc60e663b6cf38151bf8844"))
+	hash := plumbing.NewHash("db99e1890f581ad69e1527fe8302978c661eb473")
+	changes[0] = &object.Change{From: object.ChangeEntry{}, To: object.ChangeEntry{
+		Name: "pipeline.go",
+		Tree: treeTo,
+		TreeEntry: object.TreeEntry{
+			Name: "pipeline.go",
+			Mode: 0100644,
+			Hash: hash,
+		},
+	}}
+	deps := map[string]interface{}{}
+	deps[core.DependencyCommit] = commit
+	deps[DependencyTreeChanges] = changes
+	cache1 := fixtureBlobCache()
+	cache1.FailOnMissingSubmodules = true
+	cache1.Consume(deps)
+	clones := cache1.Fork(1)
+	assert.Len(t, clones, 1)
+	cache2 := clones[0].(*BlobCache)
+	assert.True(t, cache2.FailOnMissingSubmodules)
+	assert.Equal(t, cache1.repository, cache2.repository)
+	cache1.cache[plumbing.ZeroHash] = nil
+	assert.Len(t, cache1.cache, 2)
+	assert.Len(t, cache2.cache, 1)
+	assert.Equal(t, cache1.cache[hash].Size, cache2.cache[hash].Size)
+	// just for the sake of it
+	cache1.Merge([]core.PipelineItem{cache2})
+}

+ 30 - 4
internal/plumbing/day.go

@@ -77,12 +77,12 @@ func (days *DaysSinceStart) Initialize(repository *git.Repository) {
 
 // Consume runs this PipelineItem on the next commit data.
 // `deps` contain all the results from upstream PipelineItem-s as requested by Requires().
-// Additionally, "commit" is always present there and represents the analysed *object.Commit.
+// Additionally, DependencyCommit is always present there and represents the analysed *object.Commit.
 // This function returns the mapping with analysis results. The keys must be the same as
 // in Provides(). If there was an error, nil is returned.
 func (days *DaysSinceStart) Consume(deps map[string]interface{}) (map[string]interface{}, error) {
-	commit := deps["commit"].(*object.Commit)
-	index := deps["index"].(int)
+	commit := deps[core.DependencyCommit].(*object.Commit)
+	index := deps[core.DependencyIndex].(int)
 	if index == 0 {
 		// first iteration - initialize the file objects from the tree
 		days.day0 = commit.Author.When
@@ -99,10 +99,36 @@ func (days *DaysSinceStart) Consume(deps map[string]int
 	if dayCommits == nil {
 		dayCommits = []plumbing.Hash{}
 	}
-	days.commits[day] = append(dayCommits, commit.Hash)
+	exists := false
+	if commit.NumParents() > 0 {
+		for i := range dayCommits {
+			if dayCommits[len(dayCommits)-i-1] == commit.Hash {
+				exists = true
+			}
+		}
+	}
+	if !exists {
+		days.commits[day] = append(dayCommits, commit.Hash)
+	}
 	return map[string]interface{}{DependencyDay: day}, nil
 }
 
+func (days *DaysSinceStart) Fork(n int) []core.PipelineItem {
+	clones := make([]core.PipelineItem, n)
+	for i := 0; i < n; i++ {
+		clones[i] = &DaysSinceStart{
+			previousDay: days.previousDay,
+			day0: days.day0,
+			commits: days.commits,
+		}
+	}
+	return clones
+}
+
+func (days *DaysSinceStart) Merge(branches []core.PipelineItem) {
+	// no-op
+}
+
 func init() {
 	core.Registry.Register(&DaysSinceStart{})
 }
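
Across the items touched by this commit, Fork() follows one of three idioms: copy the state (BlobCache duplicates its blob cache), share the mutable state (DaysSinceStart above hands the same commits map to every clone), or return the receiver itself when there is no per-branch state (FileDiff, identity.Detector and RenameAnalysis below). A condensed sketch of the three, using hypothetical item types rather than the real ones:

```go
package main

// item is a hypothetical stand-in for core.PipelineItem, reduced to Fork().
type item interface{ Fork(n int) []item }

// 1. Copying: each clone gets its own map (cf. BlobCache.Fork).
type copying struct{ cache map[string]int }

func (c *copying) Fork(n int) []item {
	clones := make([]item, n)
	for i := range clones {
		cache := map[string]int{}
		for k, v := range c.cache {
			cache[k] = v
		}
		clones[i] = &copying{cache: cache}
	}
	return clones
}

// 2. Sharing: clones point at the same map (cf. DaysSinceStart.Fork).
type sharing struct{ commits map[int][]string }

func (s *sharing) Fork(n int) []item {
	clones := make([]item, n)
	for i := range clones {
		clones[i] = &sharing{commits: s.commits}
	}
	return clones
}

// 3. Self: stateless items return themselves (cf. FileDiff.Fork).
type stateless struct{}

func (s *stateless) Fork(n int) []item {
	clones := make([]item, n)
	for i := range clones {
		clones[i] = s
	}
	return clones
}

func main() {}
```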

+ 27 - 10
internal/plumbing/day_test.go

@@ -40,8 +40,8 @@ func TestDaysSinceStartConsume(t *testing.T) {
 	deps := map[string]interface{}{}
 	commit, _ := test.Repository.CommitObject(plumbing.NewHash(
 		"cce947b98a050c6d356bc6ba95030254914027b1"))
-	deps["commit"] = commit
-	deps["index"] = 0
+	deps[core.DependencyCommit] = commit
+	deps[core.DependencyIndex] = 0
 	res, err := dss.Consume(deps)
 	assert.Nil(t, err)
 	assert.Equal(t, res[DependencyDay].(int), 0)
@@ -52,8 +52,8 @@ func TestDaysSinceStartConsume(t *testing.T) {
 
 	commit, _ = test.Repository.CommitObject(plumbing.NewHash(
 		"fc9ceecb6dabcb2aab60e8619d972e8d8208a7df"))
-	deps["commit"] = commit
-	deps["index"] = 10
+	deps[core.DependencyCommit] = commit
+	deps[core.DependencyIndex] = 10
 	res, err = dss.Consume(deps)
 	assert.Nil(t, err)
 	assert.Equal(t, res[DependencyDay].(int), 1)
@@ -61,8 +61,8 @@ func TestDaysSinceStartConsume(t *testing.T) {
 
 	commit, _ = test.Repository.CommitObject(plumbing.NewHash(
 		"a3ee37f91f0d705ec9c41ae88426f0ae44b2fbc3"))
-	deps["commit"] = commit
-	deps["index"] = 20
+	deps[core.DependencyCommit] = commit
+	deps[core.DependencyIndex] = 20
 	res, err = dss.Consume(deps)
 	assert.Nil(t, err)
 	assert.Equal(t, res[DependencyDay].(int), 1)
@@ -70,8 +70,8 @@ func TestDaysSinceStartConsume(t *testing.T) {
 
 	commit, _ = test.Repository.CommitObject(plumbing.NewHash(
 		"a8b665a65d7aced63f5ba2ff6d9b71dac227f8cf"))
-	deps["commit"] = commit
-	deps["index"] = 20
+	deps[core.DependencyCommit] = commit
+	deps[core.DependencyIndex] = 20
 	res, err = dss.Consume(deps)
 	assert.Nil(t, err)
 	assert.Equal(t, res[DependencyDay].(int), 2)
@@ -79,8 +79,8 @@ func TestDaysSinceStartConsume(t *testing.T) {
 
 	commit, _ = test.Repository.CommitObject(plumbing.NewHash(
 		"186ff0d7e4983637bb3762a24d6d0a658e7f4712"))
-	deps["commit"] = commit
-	deps["index"] = 30
+	deps[core.DependencyCommit] = commit
+	deps[core.DependencyIndex] = 30
 	res, err = dss.Consume(deps)
 	assert.Nil(t, err)
 	assert.Equal(t, res[DependencyDay].(int), 2)
@@ -106,3 +106,20 @@ func TestDaysCommits(t *testing.T) {
 	assert.Len(t, dss.commits, 0)
 	assert.Equal(t, dss.commits, commits)
 }
+
+func TestDaysSinceStartFork(t *testing.T) {
+	dss1 := fixtureDaysSinceStart()
+	dss1.commits[0] = []plumbing.Hash{plumbing.NewHash(
+		"cce947b98a050c6d356bc6ba95030254914027b1")}
+	clones := dss1.Fork(1)
+	assert.Len(t, clones, 1)
+	dss2 := clones[0].(*DaysSinceStart)
+	assert.Equal(t, dss1.day0, dss2.day0)
+	assert.Equal(t, dss1.previousDay, dss2.previousDay)
+	assert.Equal(t, dss1.commits, dss2.commits)
+	dss1.commits[0] = append(dss1.commits[0], plumbing.ZeroHash)
+	assert.Len(t, dss2.commits[0], 2)
+	assert.True(t, dss1 != dss2)
+	// just for the sake of it
+	dss1.Merge([]core.PipelineItem{dss2})
+}

+ 13 - 1
internal/plumbing/diff.go

@@ -84,7 +84,7 @@ func (diff *FileDiff) Initialize(repository *git.Repository) {}
 
 // Consume runs this PipelineItem on the next commit data.
 // `deps` contain all the results from upstream PipelineItem-s as requested by Requires().
-// Additionally, "commit" is always present there and represents the analysed *object.Commit.
+// Additionally, DependencyCommit is always present there and represents the analysed *object.Commit.
 // This function returns the mapping with analysis results. The keys must be the same as
 // in Provides(). If there was an error, nil is returned.
 func (diff *FileDiff) Consume(deps map[string]interface{}) (map[string]interface{}, error) {
@@ -128,6 +128,18 @@ func (diff *FileDiff) Consume(deps map[string]interface
 	return map[string]interface{}{DependencyFileDiff: result}, nil
 }
 
+func (diff *FileDiff) Fork(n int) []core.PipelineItem {
+	diffs := make([]core.PipelineItem, n)
+	for i := 0; i < n; i++ {
+		diffs[i] = diff
+	}
+	return diffs
+}
+
+func (diff *FileDiff) Merge(branches []core.PipelineItem) {
+	// no-op
+}
+
 // CountLines returns the number of lines in a *object.Blob.
 func CountLines(file *object.Blob) (int, error) {
 	if file == nil {

+ 9 - 0
internal/plumbing/diff_test.go

@@ -273,3 +273,12 @@ func TestFileDiffDarkMagic(t *testing.T) {
 	assert.Equal(t, magicDiffs.OldLinesOfCode, plainDiffs.OldLinesOfCode)
 	assert.Equal(t, magicDiffs.NewLinesOfCode, plainDiffs.NewLinesOfCode)
 }
+
+func TestFileDiffFork(t *testing.T) {
+	fd1 := fixtures.FileDiff()
+	clones := fd1.Fork(1)
+	assert.Len(t, clones, 1)
+	fd2 := clones[0].(*items.FileDiff)
+	assert.True(t, fd1 == fd2)
+	fd1.Merge([]core.PipelineItem{fd2})
+}

+ 41 - 29
internal/plumbing/identity/identity.go

@@ -15,7 +15,7 @@ import (
 // signatures, and we apply some heuristics to merge those together.
 // It is a PipelineItem.
 type Detector struct {
-	// PeopleDict maps email || name  -> developer id.
+	// PeopleDict maps email || name  -> developer id
 	PeopleDict map[string]int
 	// ReversedPeopleDict maps developer id -> description
 	ReversedPeopleDict []string
@@ -49,14 +49,14 @@ const (
 )
 
 // Name of this PipelineItem. Uniquely identifies the type, used for mapping keys, etc.
-func (id *Detector) Name() string {
+func (detector *Detector) Name() string {
 	return "IdentityDetector"
 }
 
 // Provides returns the list of names of entities which are produced by this PipelineItem.
 // Each produced entity will be inserted into `deps` of dependent Consume()-s according
 // to this list. Also used by core.Registry to build the global map of providers.
-func (id *Detector) Provides() []string {
+func (detector *Detector) Provides() []string {
 	arr := [...]string{DependencyAuthor}
 	return arr[:]
 }
@@ -64,12 +64,12 @@ func (id *Detector) Provides() []string {
 // Requires returns the list of names of entities which are needed by this PipelineItem.
 // Each requested entity will be inserted into `deps` of Consume(). In turn, those
 // entities are Provides() upstream.
-func (id *Detector) Requires() []string {
+func (detector *Detector) Requires() []string {
 	return []string{}
 }
 
 // ListConfigurationOptions returns the list of changeable public properties of this PipelineItem.
-func (id *Detector) ListConfigurationOptions() []core.ConfigurationOption {
+func (detector *Detector) ListConfigurationOptions() []core.ConfigurationOption {
 	options := [...]core.ConfigurationOption{{
 		Name:        ConfigIdentityDetectorPeopleDictPath,
 		Description: "Path to the developers' email associations.",
@@ -81,48 +81,48 @@ func (id *Detector) ListConfigurationOptions() []core.ConfigurationOption {
 }
 
 // Configure sets the properties previously published by ListConfigurationOptions().
-func (id *Detector) Configure(facts map[string]interface{}) {
+func (detector *Detector) Configure(facts map[string]interface{}) {
 	if val, exists := facts[FactIdentityDetectorPeopleDict].(map[string]int); exists {
-		id.PeopleDict = val
+		detector.PeopleDict = val
 	}
 	if val, exists := facts[FactIdentityDetectorReversedPeopleDict].([]string); exists {
-		id.ReversedPeopleDict = val
+		detector.ReversedPeopleDict = val
 	}
-	if id.PeopleDict == nil || id.ReversedPeopleDict == nil {
+	if detector.PeopleDict == nil || detector.ReversedPeopleDict == nil {
 		peopleDictPath, _ := facts[ConfigIdentityDetectorPeopleDictPath].(string)
 		if peopleDictPath != "" {
-			id.LoadPeopleDict(peopleDictPath)
-			facts[FactIdentityDetectorPeopleCount] = len(id.ReversedPeopleDict) - 1
+			detector.LoadPeopleDict(peopleDictPath)
+			facts[FactIdentityDetectorPeopleCount] = len(detector.ReversedPeopleDict) - 1
 		} else {
 			if _, exists := facts[core.ConfigPipelineCommits]; !exists {
 				panic("IdentityDetector needs a list of commits to initialize.")
 			}
-			id.GeneratePeopleDict(facts[core.ConfigPipelineCommits].([]*object.Commit))
-			facts[FactIdentityDetectorPeopleCount] = len(id.ReversedPeopleDict)
+			detector.GeneratePeopleDict(facts[core.ConfigPipelineCommits].([]*object.Commit))
+			facts[FactIdentityDetectorPeopleCount] = len(detector.ReversedPeopleDict)
 		}
 	} else {
-		facts[FactIdentityDetectorPeopleCount] = len(id.ReversedPeopleDict)
+		facts[FactIdentityDetectorPeopleCount] = len(detector.ReversedPeopleDict)
 	}
-	facts[FactIdentityDetectorPeopleDict] = id.PeopleDict
-	facts[FactIdentityDetectorReversedPeopleDict] = id.ReversedPeopleDict
+	facts[FactIdentityDetectorPeopleDict] = detector.PeopleDict
+	facts[FactIdentityDetectorReversedPeopleDict] = detector.ReversedPeopleDict
 }
 
 // Initialize resets the temporary caches and prepares this PipelineItem for a series of Consume()
 // calls. The repository which is going to be analysed is supplied as an argument.
-func (id *Detector) Initialize(repository *git.Repository) {
+func (detector *Detector) Initialize(repository *git.Repository) {
 }
 
 // Consume runs this PipelineItem on the next commit data.
 // `deps` contain all the results from upstream PipelineItem-s as requested by Requires().
-// Additionally, "commit" is always present there and represents the analysed *object.Commit.
+// Additionally, DependencyCommit is always present there and represents the analysed *object.Commit.
 // This function returns the mapping with analysis results. The keys must be the same as
 // in Provides(). If there was an error, nil is returned.
-func (id *Detector) Consume(deps map[string]interface{}) (map[string]interface{}, error) {
-	commit := deps["commit"].(*object.Commit)
+func (detector *Detector) Consume(deps map[string]interface{}) (map[string]interface{}, error) {
+	commit := deps[core.DependencyCommit].(*object.Commit)
 	signature := commit.Author
-	authorID, exists := id.PeopleDict[strings.ToLower(signature.Email)]
+	authorID, exists := detector.PeopleDict[strings.ToLower(signature.Email)]
 	if !exists {
-		authorID, exists = id.PeopleDict[strings.ToLower(signature.Name)]
+		authorID, exists = detector.PeopleDict[strings.ToLower(signature.Name)]
 		if !exists {
 			authorID = AuthorMissing
 		}
@@ -130,10 +130,22 @@ func (id *Detector) Consume(deps map[string]interface{}
 	return map[string]interface{}{DependencyAuthor: authorID}, nil
 }
 
+func (detector *Detector) Fork(n int) []core.PipelineItem {
+	detectors := make([]core.PipelineItem, n)
+	for i := 0; i < n; i++ {
+		// we are safe to share the same dictionaries across branches
+		detectors[i] = detector
+	}
+	return detectors
+}
+
+func (detector *Detector) Merge(branches []core.PipelineItem) {
+}
+
 // LoadPeopleDict loads author signatures from a text file.
 // The format is one signature per line, and the signature consists of several
 // keys separated by "|". The first key is the main one and used to reference all the rest.
-func (id *Detector) LoadPeopleDict(path string) error {
+func (detector *Detector) LoadPeopleDict(path string) error {
 	file, err := os.Open(path)
 	if err != nil {
 		return err
@@ -152,13 +164,13 @@ func (id *Detector) LoadPeopleDict(path string) error {
 		size++
 	}
 	reverseDict = append(reverseDict, AuthorMissingName)
-	id.PeopleDict = dict
-	id.ReversedPeopleDict = reverseDict
+	detector.PeopleDict = dict
+	detector.ReversedPeopleDict = reverseDict
 	return nil
 }
 
 // GeneratePeopleDict loads author signatures from the specified list of Git commits.
-func (id *Detector) GeneratePeopleDict(commits []*object.Commit) {
+func (detector *Detector) GeneratePeopleDict(commits []*object.Commit) {
 	dict := map[string]int{}
 	emails := map[int][]string{}
 	names := map[int][]string{}
@@ -249,12 +261,12 @@ func (id *Detector) GeneratePeopleDict(commits []*object.Commit) {
 		sort.Strings(emails[val])
 		reverseDict[val] = strings.Join(names[val], "|") + "|" + strings.Join(emails[val], "|")
 	}
-	id.PeopleDict = dict
-	id.ReversedPeopleDict = reverseDict
+	detector.PeopleDict = dict
+	detector.ReversedPeopleDict = reverseDict
 }
 
 // MergeReversedDicts joins two identity lists together, excluding duplicates, in-order.
-func (id Detector) MergeReversedDicts(rd1, rd2 []string) (map[string][3]int, []string) {
+func (detector Detector) MergeReversedDicts(rd1, rd2 []string) (map[string][3]int, []string) {
 	people := map[string][3]int{}
 	for i, pid := range rd1 {
 		ptrs := people[pid]

+ 11 - 2
internal/plumbing/identity/identity_test.go

@@ -133,13 +133,13 @@ func TestIdentityDetectorConsume(t *testing.T) {
 	commit, _ := test.Repository.CommitObject(plumbing.NewHash(
 	commit, _ := test.Repository.CommitObject(plumbing.NewHash(
 		"5c0e755dd85ac74584d9988cc361eccf02ce1a48"))
 		"5c0e755dd85ac74584d9988cc361eccf02ce1a48"))
 	deps := map[string]interface{}{}
 	deps := map[string]interface{}{}
-	deps["commit"] = commit
+	deps[core.DependencyCommit] = commit
 	res, err := fixtureIdentityDetector().Consume(deps)
 	assert.Nil(t, err)
 	assert.Equal(t, res[DependencyAuthor].(int), 0)
 	commit, _ = test.Repository.CommitObject(plumbing.NewHash(
 		"8a03b5620b1caa72ec9cb847ea88332621e2950a"))
-	deps["commit"] = commit
+	deps[core.DependencyCommit] = commit
 	res, err = fixtureIdentityDetector().Consume(deps)
 	assert.Nil(t, err)
 	assert.Equal(t, res[DependencyAuthor].(int), AuthorMissing)
@@ -392,3 +392,12 @@ func TestIdentityDetectorMergeReversedDicts(t *testing.T) {
 	vm = [...]string{"two", "one", "three"}
 	assert.Equal(t, merged, vm[:])
 }
+
+func TestIdentityDetectorFork(t *testing.T) {
+	id1 := fixtureIdentityDetector()
+	clones := id1.Fork(1)
+	assert.Len(t, clones, 1)
+	id2 := clones[0].(*Detector)
+	assert.True(t, id1 == id2)
+	id1.Merge([]core.PipelineItem{id2})
+}

+ 13 - 1
internal/plumbing/renames.go

@@ -89,7 +89,7 @@ func (ra *RenameAnalysis) Initialize(repository *git.Repository) {
 
 // Consume runs this PipelineItem on the next commit data.
 // `deps` contain all the results from upstream PipelineItem-s as requested by Requires().
-// Additionally, "commit" is always present there and represents the analysed *object.Commit.
+// Additionally, DependencyCommit is always present there and represents the analysed *object.Commit.
 // This function returns the mapping with analysis results. The keys must be the same as
 // in Provides(). If there was an error, nil is returned.
 func (ra *RenameAnalysis) Consume(deps map[string]interface{}) (map[string]interface{}, error) {
@@ -203,6 +203,18 @@ func (ra *RenameAnalysis) Consume(deps map[string]interface{}) (map[string]inter
 	return map[string]interface{}{DependencyTreeChanges: reducedChanges}, nil
 }
 
+func (ra *RenameAnalysis) Fork(n int) []core.PipelineItem {
+	clones := make([]core.PipelineItem, n)
+	for i := 0; i < n; i++ {
+		clones[i] = ra
+	}
+	return clones
+}
+
+func (ra *RenameAnalysis) Merge(branches []core.PipelineItem) {
+	// no-op
+}
+
 func (ra *RenameAnalysis) sizesAreClose(size1 int64, size2 int64) bool {
 	return internal.Abs64(size1-size2)*100/internal.Max64(1, internal.Min64(size1, size2)) <=
 		int64(100-ra.SimilarityThreshold)
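
sizesAreClose() accepts a rename candidate only while the relative size difference stays within 100 - SimilarityThreshold percent of the smaller blob. A self-contained re-implementation with plain int64 math, using a threshold of 90 purely for the sake of the example:

```go
package main

import "fmt"

// sizesAreClose mirrors the check above without the internal helpers.
func sizesAreClose(size1, size2, threshold int64) bool {
	diff := size1 - size2
	if diff < 0 {
		diff = -diff
	}
	min := size1
	if size2 < min {
		min = size2
	}
	if min < 1 {
		min = 1 // guard against division by zero for empty blobs
	}
	return diff*100/min <= 100-threshold
}

func main() {
	fmt.Println(sizesAreClose(1000, 950, 90)) // true: 5% apart
	fmt.Println(sizesAreClose(1000, 500, 90)) // false: 100% apart
}
```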

+ 9 - 0
internal/plumbing/renames_test.go

@@ -164,3 +164,12 @@ func TestSortableBlobs(t *testing.T) {
 	assert.Equal(t, blobs[0].size, int64(1))
 	assert.Equal(t, blobs[1].size, int64(0))
 }
+
+func TestRenameAnalysisFork(t *testing.T) {
+	ra1 := fixtureRenameAnalysis()
+	clones := ra1.Fork(1)
+	assert.Len(t, clones, 1)
+	ra2 := clones[0].(*RenameAnalysis)
+	assert.True(t, ra1 == ra2)
+	ra1.Merge([]core.PipelineItem{ra2})
+}

+ 17 - 2
internal/plumbing/tree_diff.go

@@ -83,11 +83,11 @@ func (treediff *TreeDiff) Initialize(repository *git.Repository) {
 
 // Consume runs this PipelineItem on the next commit data.
 // `deps` contain all the results from upstream PipelineItem-s as requested by Requires().
-// Additionally, "commit" is always present there and represents the analysed *object.Commit.
+// Additionally, DependencyCommit is always present there and represents the analysed *object.Commit.
 // This function returns the mapping with analysis results. The keys must be the same as
 // in Provides(). If there was an error, nil is returned.
 func (treediff *TreeDiff) Consume(deps map[string]interface{}) (map[string]interface{}, error) {
-	commit := deps["commit"].(*object.Commit)
+	commit := deps[core.DependencyCommit].(*object.Commit)
 	tree, err := commit.Tree()
 	if err != nil {
 		return nil, err
@@ -141,6 +141,21 @@ func (treediff *TreeDiff) Consume(deps map[string]interface{}) (map[string]inter
 	return map[string]interface{}{DependencyTreeChanges: diff}, nil
 }
 
+func (treediff *TreeDiff) Fork(n int) []core.PipelineItem {
+	clones := make([]core.PipelineItem, n)
+	for i := 0; i < n; i++ {
+		clones[i] = &TreeDiff{
+			SkipDirs: treediff.SkipDirs,
+			previousTree: treediff.previousTree,
+		}
+	}
+	return clones
+}
+
+func (treediff *TreeDiff) Merge(branches []core.PipelineItem) {
+	// no-op
+}
+
 func init() {
 	core.Registry.Register(&TreeDiff{})
 }
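
Unlike the items that share themselves, TreeDiff.Fork() allocates a fresh struct per branch, because previousTree advances independently on each branch. Copying the SkipDirs slice field copies only the slice header, so the clones read the same backing array; that is why the test below can assert equality of SkipDirs across clones. A plain-Go illustration of that slice semantics (standalone, not hercules code):

```go
package main

import "fmt"

func main() {
	// Assigning a slice, as TreeDiff.Fork does with SkipDirs, copies the
	// header only: both values point at the same backing array.
	original := []string{"vendor"}
	clone := original
	fmt.Println(&original[0] == &clone[0]) // true: shared storage

	// Appending past capacity reallocates for the clone and leaves the
	// original untouched.
	clone = append(clone, "node_modules")
	fmt.Println(len(original), len(clone)) // 1 2
}
```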

+ 16 - 4
internal/plumbing/tree_diff_test.go

@@ -46,7 +46,7 @@ func TestTreeDiffConsume(t *testing.T) {
 	commit, _ := test.Repository.CommitObject(plumbing.NewHash(
 		"2b1ed978194a94edeabbca6de7ff3b5771d4d665"))
 	deps := map[string]interface{}{}
-	deps["commit"] = commit
+	deps[core.DependencyCommit] = commit
 	prevCommit, _ := test.Repository.CommitObject(plumbing.NewHash(
 		"fbe766ffdc3f87f6affddc051c6f8b419beea6a2"))
 	td.previousTree, _ = prevCommit.Tree()
@@ -87,7 +87,7 @@ func TestTreeDiffConsumeFirst(t *testing.T) {
 	commit, _ := test.Repository.CommitObject(plumbing.NewHash(
 		"2b1ed978194a94edeabbca6de7ff3b5771d4d665"))
 	deps := map[string]interface{}{}
-	deps["commit"] = commit
+	deps[core.DependencyCommit] = commit
 	res, err := td.Consume(deps)
 	assert.Nil(t, err)
 	assert.Equal(t, len(res), 1)
@@ -106,7 +106,7 @@ func TestTreeDiffBadCommit(t *testing.T) {
 		"2b1ed978194a94edeabbca6de7ff3b5771d4d665"))
 		"2b1ed978194a94edeabbca6de7ff3b5771d4d665"))
 	commit.TreeHash = plumbing.NewHash("0000000000000000000000000000000000000000")
 	commit.TreeHash = plumbing.NewHash("0000000000000000000000000000000000000000")
 	deps := map[string]interface{}{}
 	deps := map[string]interface{}{}
-	deps["commit"] = commit
+	deps[core.DependencyCommit] = commit
 	res, err := td.Consume(deps)
 	assert.Nil(t, res)
 	assert.NotNil(t, err)
@@ -118,7 +118,7 @@ func TestTreeDiffConsumeSkip(t *testing.T) {
 	commit, _ := test.Repository.CommitObject(plumbing.NewHash(
 		"aefdedf7cafa6ee110bae9a3910bf5088fdeb5a9"))
 	deps := map[string]interface{}{}
-	deps["commit"] = commit
+	deps[core.DependencyCommit] = commit
 	prevCommit, _ := test.Repository.CommitObject(plumbing.NewHash(
 		"1e076dc56989bc6aa1ef5f55901696e9e01423d4"))
 	td.previousTree, _ = prevCommit.Tree()
@@ -141,3 +141,15 @@ func TestTreeDiffConsumeSkip(t *testing.T) {
 	changes = res[DependencyTreeChanges].(object.Changes)
 	assert.Equal(t, 31, len(changes))
 }
+
+func TestTreeDiffFork(t *testing.T) {
+	td1 := fixtureTreeDiff()
+	td1.SkipDirs = append(td1.SkipDirs, "skip")
+	clones := td1.Fork(1)
+	assert.Len(t, clones, 1)
+	td2 := clones[0].(*TreeDiff)
+	assert.False(t, td1 == td2)
+	assert.Equal(t, td1.SkipDirs, td2.SkipDirs)
+	assert.Equal(t, td1.previousTree, td2.previousTree)
+	td1.Merge([]core.PipelineItem{td2})
+}

+ 13 - 1
internal/plumbing/uast/diff_refiner.go

@@ -59,7 +59,7 @@ func (ref *FileDiffRefiner) Initialize(repository *git.Repository) {
 
 // Consume runs this PipelineItem on the next commit data.
 // `deps` contain all the results from upstream PipelineItem-s as requested by Requires().
-// Additionally, "commit" is always present there and represents the analysed *object.Commit.
+// Additionally, DependencyCommit is always present there and represents the analysed *object.Commit.
 // This function returns the mapping with analysis results. The keys must be the same as
 // in Provides(). If there was an error, nil is returned.
 func (ref *FileDiffRefiner) Consume(deps map[string]interface{}) (map[string]interface{}, error) {
@@ -161,6 +161,18 @@ func (ref *FileDiffRefiner) Consume(deps map[string]interface{}) (map[string]int
 	return map[string]interface{}{plumbing.DependencyFileDiff: result}, nil
 }
 
+func (ref *FileDiffRefiner) Fork(n int) []core.PipelineItem {
+	refs := make([]core.PipelineItem, n)
+	for i := 0; i < n; i++ {
+		refs[i] = ref
+	}
+	return refs
+}
+
+func (ref *FileDiffRefiner) Merge(branches []core.PipelineItem) {
+	// no-op
+}
+
 // VisitEachNode is a handy routine to execute a callback on every node in the subtree,
 // including the root itself. Depth first tree traversal.
 func VisitEachNode(root *uast.Node, payload func(*uast.Node)) {

+ 9 - 0
internal/plumbing/uast/diff_refiner_test.go

@@ -153,3 +153,12 @@ func TestFileDiffRefinerConsumeNoUast(t *testing.T) {
 	assert.Len(t, result, 1)
 	assert.Equal(t, fileDiffs[fileName], result[fileName])
 }
+
+func TestFileDiffRefinerFork(t *testing.T) {
+	fd1 := fixtureFileDiffRefiner()
+	clones := fd1.Fork(1)
+	assert.Len(t, clones, 1)
+	fd2 := clones[0].(*FileDiffRefiner)
+	assert.True(t, fd1 == fd2)
+	fd1.Merge([]core.PipelineItem{fd2})
+}

+ 45 - 3
internal/plumbing/uast/uast.go

@@ -216,7 +216,7 @@ func (exr *Extractor) Initialize(repository *git.Repository) {
 
 // Consume runs this PipelineItem on the next commit data.
 // `deps` contain all the results from upstream PipelineItem-s as requested by Requires().
-// Additionally, "commit" is always present there and represents the analysed *object.Commit.
+// Additionally, DependencyCommit is always present there and represents the analysed *object.Commit.
 // This function returns the mapping with analysis results. The keys must be the same as
 // in Provides(). If there was an error, nil is returned.
 func (exr *Extractor) Consume(deps map[string]interface{}) (map[string]interface{}, error) {
@@ -287,6 +287,18 @@ func (exr *Extractor) Consume(deps map[string]interface{}) (map[string]interface
 	return map[string]interface{}{DependencyUasts: uasts}, nil
 }
 
+func (exr *Extractor) Fork(n int) []core.PipelineItem {
+	exrs := make([]core.PipelineItem, n)
+	for i := 0; i < n; i++ {
+		exrs[i] = exr
+	}
+	return exrs
+}
+
+func (exr *Extractor) Merge(branches []core.PipelineItem) {
+	// no-op
+}
+
 func (exr *Extractor) extractUAST(
 	client *bblfsh.Client, file *object.File) (*uast.Node, error) {
 	request := client.NewParseRequest()
@@ -393,7 +405,7 @@ func (uc *Changes) Initialize(repository *git.Repository) {
 
 // Consume runs this PipelineItem on the next commit data.
 // `deps` contain all the results from upstream PipelineItem-s as requested by Requires().
-// Additionally, "commit" is always present there and represents the analysed *object.Commit.
+// Additionally, DependencyCommit is always present there and represents the analysed *object.Commit.
 // This function returns the mapping with analysis results. The keys must be the same as
 // in Provides(). If there was an error, nil is returned.
 func (uc *Changes) Consume(deps map[string]interface{}) (map[string]interface{}, error) {
@@ -427,6 +439,24 @@ func (uc *Changes) Consume(deps map[string]interface{}) (map[string]interface{},
 	return map[string]interface{}{DependencyUastChanges: commit}, nil
 }
 
+func (uc *Changes) Fork(n int) []core.PipelineItem {
+	ucs := make([]core.PipelineItem, n)
+	for i := 0; i < n; i++ {
+		clone := &Changes{
+			cache: map[plumbing.Hash]*uast.Node{},
+		}
+		for key, val := range uc.cache {
+			clone.cache[key] = val
+		}
+		ucs[i] = clone
+	}
+	return ucs
+}
+
+func (uc *Changes) Merge(branches []core.PipelineItem) {
+	// no-op
+}
+
 // ChangesSaver dumps changed files and corresponding UASTs for every commit.
 // It is a LeafPipelineItem.
 type ChangesSaver struct {
@@ -502,7 +532,7 @@ func (saver *ChangesSaver) Initialize(repository *git.Repository) {
 
 // Consume runs this PipelineItem on the next commit data.
 // `deps` contain all the results from upstream PipelineItem-s as requested by Requires().
-// Additionally, "commit" is always present there and represents the analysed *object.Commit.
+// Additionally, DependencyCommit is always present there and represents the analysed *object.Commit.
 // This function returns the mapping with analysis results. The keys must be the same as
 // in Provides(). If there was an error, nil is returned.
 func (saver *ChangesSaver) Consume(deps map[string]interface{}) (map[string]interface{}, error) {
@@ -516,6 +546,18 @@ func (saver *ChangesSaver) Finalize() interface{} {
 	return saver.result
 }
 
+func (saver *ChangesSaver) Fork(n int) []core.PipelineItem {
+	savers := make([]core.PipelineItem, n)
+	for i := 0; i < n; i++ {
+		savers[i] = saver
+	}
+	return savers
+}
+
+func (saver *ChangesSaver) Merge(branches []core.PipelineItem) {
+	// no-op
+}
+
 // Serialize converts the analysis result as returned by Finalize() to text or bytes.
 // The text format is YAML and the bytes format is Protocol Buffers.
 func (saver *ChangesSaver) Serialize(result interface{}, binary bool, writer io.Writer) error {
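
Changes is the one UAST item with per-branch mutable state: its hash-to-node cache. Fork() therefore duplicates the map for every clone while keeping the cached *uast.Node pointers shared, so branches may add or drop entries independently as long as nobody mutates a node in place. A sketch of just that copy step, mirroring the loop inside Changes.Fork():

```go
// copyCache duplicates the cache map; the node pointers stay shared and are
// treated as read-only, which keeps the copy cheap even for large caches.
func copyCache(src map[plumbing.Hash]*uast.Node) map[plumbing.Hash]*uast.Node {
	dst := make(map[plumbing.Hash]*uast.Node, len(src))
	for hash, node := range src {
		dst[hash] = node
	}
	return dst
}
```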

+ 31 - 0
internal/plumbing/uast/uast_test.go

@@ -154,6 +154,15 @@ func TestUASTExtractorConsume(t *testing.T) {
 	assert.Equal(t, len(uasts[hash].Children), 24)
 }
 
+func TestUASTExtractorFork(t *testing.T) {
+	exr1 := fixtureUASTExtractor()
+	clones := exr1.Fork(1)
+	assert.Len(t, clones, 1)
+	exr2 := clones[0].(*Extractor)
+	assert.True(t, exr1 == exr2)
+	exr1.Merge([]core.PipelineItem{exr2})
+}
+
 func fixtureUASTChanges() *Changes {
 	ch := Changes{}
 	ch.Configure(nil)
@@ -271,6 +280,19 @@ func TestUASTChangesConsume(t *testing.T) {
 	assert.Nil(t, result[2].After)
 }
 
+func TestUASTChangesFork(t *testing.T) {
+	changes1 := fixtureUASTChanges()
+	changes1.cache[plumbing.ZeroHash] = nil
+	clones := changes1.Fork(1)
+	assert.Len(t, clones, 1)
+	changes2 := clones[0].(*Changes)
+	assert.False(t, changes1 == changes2)
+	assert.Equal(t, changes1.cache, changes2.cache)
+	delete(changes1.cache, plumbing.ZeroHash)
+	assert.Len(t, changes2.cache, 1)
+	changes1.Merge([]core.PipelineItem{changes2})
+}
+
 func fixtureUASTChangesSaver() *ChangesSaver {
 	ch := ChangesSaver{}
 	ch.Initialize(test.Repository)
@@ -388,3 +410,12 @@ func TestUASTChangesSaverPayload(t *testing.T) {
 `, tmpdir, tmpdir, tmpdir, tmpdir))
 	checkFiles()
 }
+
+func TestUASTChangesSaverFork(t *testing.T) {
+	saver1 := fixtureUASTChangesSaver()
+	clones := saver1.Fork(1)
+	assert.Len(t, clones, 1)
+	saver2 := clones[0].(*ChangesSaver)
+	assert.True(t, saver1 == saver2)
+	saver1.Merge([]core.PipelineItem{saver2})
+}

+ 27 - 1
internal/test/repository.go

@@ -8,6 +8,8 @@ import (
 	"gopkg.in/src-d/go-git.v4/plumbing"
 	"gopkg.in/src-d/go-git.v4/plumbing"
 	"gopkg.in/src-d/go-git.v4/plumbing/object"
 	"gopkg.in/src-d/go-git.v4/plumbing/object"
 	"gopkg.in/src-d/go-git.v4/storage/memory"
 	"gopkg.in/src-d/go-git.v4/storage/memory"
+	"path"
+	"io/ioutil"
 )
 
 // Repository is a boilerplate sample repository (Hercules itself).
@@ -28,6 +30,27 @@ func FakeChangeForName(name string, hashFrom string, hashTo string) *object.Chan
 func init() {
 	cwd, err := os.Getwd()
 	if err == nil {
+		for {
+			files, err := ioutil.ReadDir(cwd)
+			if err != nil {
+				break
+			}
+			found := false
+			for _, f := range files {
+				if f.Name() == "README.md" {
+					found = true
+					break
+				}
+			}
+			if found {
+				break
+			}
+			oldCwd := cwd
+			cwd = path.Dir(cwd)
+			if oldCwd == cwd {
+				break
+			}
+		}
 		Repository, err = git.PlainOpen(cwd)
 		if err == nil {
 			iter, err := Repository.CommitObjects()
@@ -45,7 +68,10 @@ func init() {
 			}
 		}
 	}
-	Repository, _ = git.Clone(memory.NewStorage(), nil, &git.CloneOptions{
+	Repository, err = git.Clone(memory.NewStorage(), nil, &git.CloneOptions{
 		URL: "https://github.com/src-d/hercules",
 		URL: "https://github.com/src-d/hercules",
 	})
 	})
+	if err != nil {
+		panic(err)
+	}
 }
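
The extended init() climbs from the current working directory towards the filesystem root until it finds a directory containing README.md, so the test fixture opens the hercules checkout no matter which package directory `go test` runs in; the fallback clone from GitHub now panics on failure instead of silently leaving a broken Repository. The walk, restated as a hypothetical standalone helper (same ioutil/path imports as the file above):

```go
// findRepoRoot walks up from start until it reaches a directory that holds
// README.md, or stops at the filesystem root. It is a refactoring sketch of
// the loop inlined in init() above.
func findRepoRoot(start string) string {
	cwd := start
	for {
		files, err := ioutil.ReadDir(cwd)
		if err != nil {
			return cwd // unreadable directory: give up where we are
		}
		for _, f := range files {
			if f.Name() == "README.md" {
				return cwd
			}
		}
		parent := path.Dir(cwd)
		if parent == cwd {
			return cwd // reached "/" without finding README.md
		}
		cwd = parent
	}
}
```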

+ 60 - 18
leaves/burndown.go

@@ -120,7 +120,7 @@ const (
 	DefaultBurndownGranularity = 30
 	// authorSelf is the internal author index which is used in BurndownAnalysis.Finalize() to
 	// format the author overwrites matrix.
-	authorSelf = (1 << 18) - 2
+	authorSelf = (1 << (32 - burndown.TreeMaxBinPower)) - 2
 )
 
 // Name of this PipelineItem. Uniquely identifies the type, used for mapping keys, etc.
@@ -238,21 +238,20 @@ func (analyser *BurndownAnalysis) Initialize(repository *git.Repository) {
 
 // Consume runs this PipelineItem on the next commit data.
 // `deps` contain all the results from upstream PipelineItem-s as requested by Requires().
-// Additionally, "commit" is always present there and represents the analysed *object.Commit.
+// Additionally, DependencyCommit is always present there and represents the analysed *object.Commit.
 // This function returns the mapping with analysis results. The keys must be the same as
 // in Provides(). If there was an error, nil is returned.
 func (analyser *BurndownAnalysis) Consume(deps map[string]interface{}) (map[string]interface{}, error) {
-	sampling := analyser.Sampling
-	if sampling == 0 {
-		sampling = 1
-	}
+	commit := deps[core.DependencyCommit].(*object.Commit)
 	author := deps[identity.DependencyAuthor].(int)
-	analyser.day = deps[items.DependencyDay].(int)
-	delta := (analyser.day / sampling) - (analyser.previousDay / sampling)
-	if delta > 0 {
-		analyser.previousDay = analyser.day
-		gs, fss, pss := analyser.groupStatus()
-		analyser.updateHistories(gs, fss, pss, delta)
+	day := deps[items.DependencyDay].(int)
+	if len(commit.ParentHashes) == 1 {
+		analyser.day = day
+		analyser.onNewDay()
+	} else {
+		// effectively disables the status updates if the commit is a merge
+		// we will analyse the conflicts resolution in Merge()
+		analyser.day = burndown.TreeMergeMark
 	}
 	cache := deps[items.DependencyBlobCache].(map[plumbing.Hash]*object.Blob)
 	treeDiffs := deps[items.DependencyTreeChanges].(object.Changes)
@@ -272,13 +271,44 @@ func (analyser *BurndownAnalysis) Consume(deps map[string]interface{}) (map[stri
 			return nil, err
 		}
 	}
+	// in case there is a merge analyser.day equals to TreeMergeMark
+	analyser.day = day
 	return nil, nil
 }
 
+func (analyser *BurndownAnalysis) Fork(n int) []core.PipelineItem {
+	result := make([]core.PipelineItem, n)
+	for i := range result {
+		clone := *analyser
+		clone.files = map[string]*burndown.File{}
+		for key, file := range analyser.files {
+			clone.files[key] = file.Clone(false)
+		}
+		result[i] = &clone
+	}
+	return result
+}
+
+func (analyser *BurndownAnalysis) Merge(branches []core.PipelineItem) {
+	for key, file := range analyser.files {
+		others := make([]*burndown.File, len(branches))
+		for i, branch := range branches {
+			others[i] = branch.(*BurndownAnalysis).files[key]
+		}
+		// don't worry, we compare the hashes first before heavy-lifting
+		if file.Merge(analyser.day, others...) {
+			for _, branch := range branches {
+				branch.(*BurndownAnalysis).files[key] = file.Clone(false)
+			}
+		}
+	}
+	analyser.onNewDay()
+}
+
 // Finalize returns the result of the analysis. Further Consume() calls are not expected.
 func (analyser *BurndownAnalysis) Finalize() interface{} {
 	gs, fss, pss := analyser.groupStatus()
-	analyser.updateHistories(gs, fss, pss, 1)
+	analyser.updateHistories(1, gs, fss, pss)
 	for key, statuses := range analyser.fileHistories {
 		if len(statuses) == len(analyser.globalHistory) {
 			continue
@@ -799,9 +829,10 @@ func (analyser *BurndownAnalysis) packPersonWithDay(person int, day int) int {
 	if analyser.PeopleNumber == 0 {
 		return day
 	}
-	result := day
-	result |= person << 14
-	// This effectively means max 16384 days (>44 years) and (131072 - 2) devs
+	result := day & burndown.TreeMergeMark
+	result |= person << burndown.TreeMaxBinPower
+	// This effectively means max (16383 - 1) days (>44 years) and (131072 - 2) devs.
+	// One day less because burndown.TreeMergeMark = ((1 << 14) - 1) is a special day.
 	return result
 }
 
@@ -809,7 +840,18 @@ func (analyser *BurndownAnalysis) unpackPersonWithDay(value int) (int, int) {
 	if analyser.PeopleNumber == 0 {
 		return identity.AuthorMissing, value
 	}
-	return value >> 14, value & 0x3FFF
+	return value >> burndown.TreeMaxBinPower, value & burndown.TreeMergeMark
+}
+
+func (analyser *BurndownAnalysis) onNewDay() {
+	day := analyser.day
+	sampling := analyser.Sampling
+	delta := (day / sampling) - (analyser.previousDay / sampling)
+	if delta > 0 {
+		analyser.previousDay = day
+		gs, fss, pss := analyser.groupStatus()
+		analyser.updateHistories(delta, gs, fss, pss)
+	}
 }
 
 func (analyser *BurndownAnalysis) updateStatus(
@@ -1092,7 +1134,7 @@ func (analyser *BurndownAnalysis) groupStatus() ([]int64, map[string][]int64, []
 }
 
 func (analyser *BurndownAnalysis) updateHistories(
-	globalStatus []int64, fileStatuses map[string][]int64, peopleStatuses [][]int64, delta int) {
+	delta int, globalStatus []int64, fileStatuses map[string][]int64, peopleStatuses [][]int64) {
 	for i := 0; i < delta; i++ {
 		analyser.globalHistory = append(analyser.globalHistory, globalStatus)
 	}
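
packPersonWithDay() and unpackPersonWithDay() now derive their shifts and masks from burndown.TreeMaxBinPower instead of the hard-coded 14 and 0x3FFF, with burndown.TreeMergeMark reserved as the in-merge sentinel day. A self-contained worked example with the constants inlined (assuming TreeMaxBinPower == 14, so TreeMergeMark == 16383):

```go
package main

import "fmt"

const (
	treeMaxBinPower = 14                     // the low 14 bits carry the day
	treeMergeMark   = 1<<treeMaxBinPower - 1 // 16383, the reserved merge day
)

// pack stores the day in the low bits and the author index above them.
func pack(person, day int) int {
	return day&treeMergeMark | person<<treeMaxBinPower
}

// unpack reverses pack.
func unpack(value int) (person, day int) {
	return value >> treeMaxBinPower, value & treeMergeMark
}

func main() {
	v := pack(3, 100)
	fmt.Println(v)         // 49252 == 3*16384 + 100
	fmt.Println(unpack(v)) // 3 100
}
```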

+ 1 - 1
leaves/comment_sentiment.go

@@ -145,7 +145,7 @@ func (sent *CommentSentimentAnalysis) Initialize(repository *git.Repository) {
 
 // Consume runs this PipelineItem on the next commit data.
 // `deps` contain all the results from upstream PipelineItem-s as requested by Requires().
-// Additionally, "commit" is always present there and represents the analysed *object.Commit.
+// Additionally, DependencyCommit is always present there and represents the analysed *object.Commit.
 // This function returns the mapping with analysis results. The keys must be the same as
 // in Provides(). If there was an error, nil is returned.
 func (sent *CommentSentimentAnalysis) Consume(deps map[string]interface{}) (map[string]interface{}, error) {

+ 1 - 1
leaves/couples.go

@@ -96,7 +96,7 @@ func (couples *CouplesAnalysis) Initialize(repository *git.Repository) {
 
 // Consume runs this PipelineItem on the next commit data.
 // `deps` contain all the results from upstream PipelineItem-s as requested by Requires().
-// Additionally, "commit" is always present there and represents the analysed *object.Commit.
+// Additionally, DependencyCommit is always present there and represents the analysed *object.Commit.
 // This function returns the mapping with analysis results. The keys must be the same as
 // in Provides(). If there was an error, nil is returned.
 func (couples *CouplesAnalysis) Consume(deps map[string]interface{}) (map[string]interface{}, error) {

+ 2 - 2
leaves/file_history.go

@@ -69,11 +69,11 @@ func (history *FileHistory) Initialize(repository *git.Repository) {
 
 // Consume runs this PipelineItem on the next commit data.
 // `deps` contain all the results from upstream PipelineItem-s as requested by Requires().
-// Additionally, "commit" is always present there and represents the analysed *object.Commit.
+// Additionally, DependencyCommit is always present there and represents the analysed *object.Commit.
 // This function returns the mapping with analysis results. The keys must be the same as
 // in Provides(). If there was an error, nil is returned.
 func (history *FileHistory) Consume(deps map[string]interface{}) (map[string]interface{}, error) {
-	commit := deps["commit"].(*object.Commit).Hash
+	commit := deps[core.DependencyCommit].(*object.Commit).Hash
 	changes := deps[items.DependencyTreeChanges].(object.Changes)
 	for _, change := range changes {
 		action, _ := change.Action()

+ 3 - 3
leaves/file_history_test.go

@@ -93,7 +93,7 @@ func TestFileHistoryConsume(t *testing.T) {
 	deps[items.DependencyTreeChanges] = changes
 	commit, _ := test.Repository.CommitObject(plumbing.NewHash(
 		"2b1ed978194a94edeabbca6de7ff3b5771d4d665"))
-	deps["commit"] = commit
+	deps[core.DependencyCommit] = commit
 	fh.files["cmd/hercules/main.go"] = []plumbing.Hash{plumbing.NewHash(
 	fh.files["cmd/hercules/main.go"] = []plumbing.Hash{plumbing.NewHash(
 		"0000000000000000000000000000000000000000")}
 		"0000000000000000000000000000000000000000")}
 	fh.files["analyser.go"] = []plumbing.Hash{plumbing.NewHash(
 	fh.files["analyser.go"] = []plumbing.Hash{plumbing.NewHash(
@@ -132,7 +132,7 @@ func TestFileHistorySerializeText(t *testing.T) {
 	deps[items.DependencyTreeChanges] = changes
 	commit, _ := test.Repository.CommitObject(plumbing.NewHash(
 		"2b1ed978194a94edeabbca6de7ff3b5771d4d665"))
-	deps["commit"] = commit
+	deps[core.DependencyCommit] = commit
 	fh.Consume(deps)
 	res := fh.Finalize().(FileHistoryResult)
 	buffer := &bytes.Buffer{}
@@ -159,7 +159,7 @@ func TestFileHistorySerializeBinary(t *testing.T) {
 	deps[items.DependencyTreeChanges] = changes
 	commit, _ := test.Repository.CommitObject(plumbing.NewHash(
 		"2b1ed978194a94edeabbca6de7ff3b5771d4d665"))
-	deps["commit"] = commit
+	deps[core.DependencyCommit] = commit
 	fh.Consume(deps)
 	res := fh.Finalize().(FileHistoryResult)
 	buffer := &bytes.Buffer{}

+ 2 - 2
leaves/shotness.go

@@ -142,11 +142,11 @@ func (shotness *ShotnessAnalysis) Initialize(repository *git.Repository) {
 
 // Consume runs this PipelineItem on the next commit data.
 // `deps` contain all the results from upstream PipelineItem-s as requested by Requires().
-// Additionally, "commit" is always present there and represents the analysed *object.Commit.
+// Additionally, DependencyCommit is always present there and represents the analysed *object.Commit.
 // This function returns the mapping with analysis results. The keys must be the same as
 // in Provides(). If there was an error, nil is returned.
 func (shotness *ShotnessAnalysis) Consume(deps map[string]interface{}) (map[string]interface{}, error) {
-	commit := deps["commit"].(*object.Commit)
+	commit := deps[core.DependencyCommit].(*object.Commit)
 	changesList := deps[uast_items.DependencyUastChanges].([]uast_items.Change)
 	diffs := deps[items.DependencyFileDiff].(map[string]items.FileDiffData)
 	allNodes := map[string]bool{}

+ 2 - 2
leaves/shotness_test.go

@@ -77,7 +77,7 @@ func bakeShotness(t *testing.T, eraseEndPosition bool) (*ShotnessAnalysis, Shotn
 	dmp := diffmatchpatch.New()
 	src, dst, _ := dmp.DiffLinesToRunes(string(bytes1), string(bytes2))
 	state := map[string]interface{}{}
-	state["commit"] = &object.Commit{}
+	state[core.DependencyCommit] = &object.Commit{}
 	fileDiffs := map[string]items.FileDiffData{}
 	const fileName = "test.java"
 	fileDiffs[fileName] = items.FileDiffData{
@@ -130,7 +130,7 @@ func TestShotnessConsume(t *testing.T) {
 	dmp := diffmatchpatch.New()
 	src, dst, _ := dmp.DiffLinesToRunes(string(bytes1), string(bytes2))
 	state := map[string]interface{}{}
-	state["commit"] = &object.Commit{}
+	state[core.DependencyCommit] = &object.Commit{}
 	fileDiffs := map[string]items.FileDiffData{}
 	const fileName = "test.java"
 	const newfileName = "new.java"