浏览代码

Merge pull request #151 from vmarkovtsev/master

Add --hibernation-distance
Vadim Markovtsev 6 年之前
父节点
当前提交
38302e1f36

+ 77 - 1
internal/core/forks.go

@@ -78,6 +78,10 @@ const (
 	runActionEmerge = iota
 	// runActionDelete removes the branch as it is no longer needed
 	runActionDelete = iota
+	// runActionHibernate preserves the items in the branch
+	runActionHibernate = iota
+	// runActionBoot does the opposite to runActionHibernate - recovers the original memory
+	runActionBoot = iota
 
 	// rootBranchIndex is the minimum branch index in the plan
 	rootBranchIndex = 1
@@ -134,7 +138,8 @@ func getMasterBranch(branches map[int][]PipelineItem) []PipelineItem {
 }
 
 // prepareRunPlan schedules the actions for Pipeline.Run().
-func prepareRunPlan(commits []*object.Commit, printResult bool) []runAction {
+func prepareRunPlan(commits []*object.Commit, hibernationDistance int,
+	printResult bool) []runAction {
 	hashes, dag := buildDag(commits)
 	leaveRootComponent(hashes, dag)
 	mergedDag, mergedSeq := mergeDag(hashes, dag)
@@ -151,6 +156,9 @@ func prepareRunPlan(commits []*object.Commit, printResult bool) []runAction {
 	fmt.Printf("}\n")*/
 	plan := generatePlan(orderNodes, hashes, mergedDag, dag, mergedSeq)
 	plan = collectGarbage(plan)
+	if hibernationDistance > 0 {
+		plan = insertHibernateBoot(plan, hibernationDistance)
+	}
 	if printResult {
 		for _, p := range plan {
 			firstItem := p.Items[0]
@@ -165,6 +173,10 @@ func prepareRunPlan(commits []*object.Commit, printResult bool) []runAction {
 				planPrintFunc("E", p.Items)
 			case runActionDelete:
 				planPrintFunc("D", p.Items)
+			case runActionHibernate:
+				planPrintFunc("H", firstItem)
+			case runActionBoot:
+				planPrintFunc("B", firstItem)
 			}
 		}
 	}
@@ -671,3 +683,67 @@ func collectGarbage(plan []runAction) []runAction {
 	}
 	return garbageCollectedPlan
 }
+
+type hbAction struct {
+	Branch    int
+	Hibernate bool
+}
+
+func insertHibernateBoot(plan []runAction, hibernationDistance int) []runAction {
+	addons := map[int][]hbAction{}
+	lastUsed := map[int]int{}
+	addonsCount := 0
+	for x, action := range plan {
+		if action.Action == runActionDelete {
+			continue
+		}
+		for _, item := range action.Items {
+			if i, exists := lastUsed[item]; exists && (x-i-1) > hibernationDistance {
+				if addons[x] == nil {
+					addons[x] = make([]hbAction, 0, 1)
+				}
+				addons[x] = append(addons[x], hbAction{item, false})
+				if addons[i] == nil {
+					addons[i] = make([]hbAction, 0, 1)
+				}
+				addons[i] = append(addons[i], hbAction{item, true})
+				addonsCount += 2
+			}
+			lastUsed[item] = x
+		}
+	}
+	newPlan := make([]runAction, 0, len(plan)+addonsCount)
+	for x, action := range plan {
+		xaddons := addons[x]
+		var boots []int
+		var hibernates []int
+		if len(xaddons) > 0 {
+			boots = make([]int, 0, len(xaddons))
+			hibernates = make([]int, 0, len(xaddons))
+			for _, addon := range xaddons {
+				if !addon.Hibernate {
+					boots = append(boots, addon.Branch)
+				} else {
+					hibernates = append(hibernates, addon.Branch)
+				}
+			}
+		}
+		if len(boots) > 0 {
+			newPlan = append(newPlan, runAction{
+				Action: runActionBoot,
+				Commit: action.Commit,
+				Items:  boots,
+			})
+		}
+		newPlan = append(newPlan, action)
+		if len(hibernates) > 0 {
+			newPlan = append(newPlan, runAction{
+				Action: runActionHibernate,
+				Commit: action.Commit,
+				Items:  hibernates,
+			})
+		}
+
+	}
+	return newPlan
+}

+ 33 - 0
internal/core/forks_test.go

@@ -67,3 +67,36 @@ func TestForkCopyPipelineItem(t *testing.T) {
 	assert.True(t, clone.Mutable[2])
 	assert.Equal(t, "before", clone.Immutable)
 }
+
+func TestInsertHibernateBoot(t *testing.T) {
+	plan := []runAction{
+		{runActionEmerge, nil, []int{1, 2}},
+		{runActionEmerge, nil, []int{3}},
+		{runActionCommit, nil, []int{3}},
+		{runActionCommit, nil, []int{3}},
+		{runActionCommit, nil, []int{1}},
+		{runActionFork, nil, []int{2, 4}},
+		{runActionCommit, nil, []int{3}},
+		{runActionCommit, nil, []int{3}},
+		{runActionDelete, nil, []int{1}},
+		{runActionMerge, nil, []int{2, 4}},
+	}
+	plan = insertHibernateBoot(plan, 2)
+	assert.Equal(t, []runAction{
+		{runActionEmerge, nil, []int{1, 2}},
+		{runActionHibernate, nil, []int{1, 2}},
+		{runActionEmerge, nil, []int{3}},
+		{runActionCommit, nil, []int{3}},
+		{runActionCommit, nil, []int{3}},
+		{runActionBoot, nil, []int{1}},
+		{runActionCommit, nil, []int{1}},
+		{runActionBoot, nil, []int{2}},
+		{runActionFork, nil, []int{2, 4}},
+		{runActionHibernate, nil, []int{2, 4}},
+		{runActionCommit, nil, []int{3}},
+		{runActionCommit, nil, []int{3}},
+		{runActionDelete, nil, []int{1}},
+		{runActionBoot, nil, []int{2, 4}},
+		{runActionMerge, nil, []int{2, 4}},
+	}, plan)
+}

+ 31 - 1
internal/core/pipeline.go

@@ -233,6 +233,10 @@ type Pipeline struct {
 	// second is the total number of steps.
 	OnProgress func(int, int)
 
+	// HibernationDistance is the minimum number of actions between two sequential usages of
+	// a branch to activate the hibernation optimization (cpu-memory trade-off). 0 disables.
+	HibernationDistance int
+
 	// DryRun indicates whether the items are not executed.
 	DryRun bool
 
@@ -268,6 +272,10 @@ const (
 	// ConfigPipelineDumpPlan is the name of the Pipeline configuration option (Pipeline.Initialize())
 	// which outputs the execution plan to stderr.
 	ConfigPipelineDumpPlan = "Pipeline.DumpPlan"
+	// ConfigPipelineHibernationDistance is the name of the Pipeline configuration option (Pipeline.Initialize())
+	// which is the minimum number of actions between two sequential usages of
+	// a branch to activate the hibernation optimization (cpu-memory trade-off). 0 disables.
+	ConfigPipelineHibernationDistance = "Pipeline.HibernationDistance"
 	// DependencyCommit is the name of one of the three items in `deps` supplied to PipelineItem.Consume()
 	// which always exists. It corresponds to the currently analyzed commit.
 	DependencyCommit = "commit"
@@ -607,6 +615,12 @@ func (pipeline *Pipeline) Initialize(facts map[string]interface{}) error {
 			log.Panicf("failed to list the commits: %v", err)
 		}
 	}
+	if val, exists := facts[ConfigPipelineHibernationDistance].(int); exists {
+		if val < 0 {
+			log.Panicf("--hibernation-distance cannot be negative (got %d)", val)
+		}
+		pipeline.HibernationDistance = val
+	}
 	dumpPath, _ := facts[ConfigPipelineDAGPath].(string)
 	pipeline.resolve(dumpPath)
 	if dumpPlan, exists := facts[ConfigPipelineDumpPlan].(bool); exists {
@@ -645,7 +659,7 @@ func (pipeline *Pipeline) Run(commits []*object.Commit) (map[LeafPipelineItem]in
 	if onProgress == nil {
 		onProgress = func(int, int) {}
 	}
-	plan := prepareRunPlan(commits, pipeline.DumpPlan)
+	plan := prepareRunPlan(commits, pipeline.HibernationDistance, pipeline.DumpPlan)
 	progressSteps := len(plan) + 2
 	branches := map[int][]PipelineItem{}
 	// we will need rootClone if there is more than one root branch
@@ -715,6 +729,22 @@ func (pipeline *Pipeline) Run(commits []*object.Commit) (map[LeafPipelineItem]in
 			}
 		case runActionDelete:
 			delete(branches, firstItem)
+		case runActionHibernate:
+			for _, item := range step.Items {
+				for _, item := range branches[item] {
+					if hi, ok := item.(HibernateablePipelineItem); ok {
+						hi.Hibernate()
+					}
+				}
+			}
+		case runActionBoot:
+			for _, item := range step.Items {
+				for _, item := range branches[item] {
+					if hi, ok := item.(HibernateablePipelineItem); ok {
+						hi.Boot()
+					}
+				}
+			}
 		}
 	}
 	onProgress(len(plan)+1, progressSteps)

+ 48 - 5
internal/core/pipeline_test.go

@@ -134,6 +134,8 @@ func (item *testPipelineItem) Serialize(result interface{}, binary bool, writer
 type dependingTestPipelineItem struct {
 	DependencySatisfied  bool
 	TestNilConsumeReturn bool
+	Hibernated           bool
+	Booted               bool
 }
 
 func (item *dependingTestPipelineItem) Name() string {
@@ -187,12 +189,24 @@ func (item *dependingTestPipelineItem) Consume(deps map[string]interface{}) (map
 }
 
 func (item *dependingTestPipelineItem) Fork(n int) []PipelineItem {
-	return make([]PipelineItem, n)
+	clones := make([]PipelineItem, n)
+	for i := range clones {
+		clones[i] = item
+	}
+	return clones
 }
 
 func (item *dependingTestPipelineItem) Merge(branches []PipelineItem) {
 }
 
+func (item *dependingTestPipelineItem) Hibernate() {
+	item.Hibernated = true
+}
+
+func (item *dependingTestPipelineItem) Boot() {
+	item.Booted = true
+}
+
 func (item *dependingTestPipelineItem) Finalize() interface{} {
 	return true
 }
@@ -286,7 +300,6 @@ func TestPipelineRunBranches(t *testing.T) {
 	pipeline.AddItem(item)
 	pipeline.Initialize(map[string]interface{}{})
 	assert.True(t, item.Initialized)
-	commits := make([]*object.Commit, 5)
 	hashes := []string{
 		"6db8065cdb9bb0758f36a7e75fc72ab95f9e8145",
 		"f30daba81ff2bf0b3ba02a1e1441e74f8a4f6fee",
@@ -294,6 +307,7 @@ func TestPipelineRunBranches(t *testing.T) {
 		"dd9dd084d5851d7dc4399fc7dbf3d8292831ebc5",
 		"f4ed0405b14f006c0744029d87ddb3245607587a",
 	}
+	commits := make([]*object.Commit, len(hashes))
 	for i, h := range hashes {
 		var err error
 		commits[i], err = test.Repository.CommitObject(plumbing.NewHash(h))
@@ -580,7 +594,7 @@ func TestPrepareRunPlanTiny(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	plan := prepareRunPlan([]*object.Commit{rootCommit}, false)
+	plan := prepareRunPlan([]*object.Commit{rootCommit}, 0, true)
 	assert.Len(t, plan, 2)
 	assert.Equal(t, runActionEmerge, plan[0].Action)
 	assert.Equal(t, rootBranchIndex, plan[0].Items[0])
@@ -607,7 +621,7 @@ func TestPrepareRunPlanSmall(t *testing.T) {
 		}
 		return nil
 	})
-	plan := prepareRunPlan(commits, false)
+	plan := prepareRunPlan(commits, 0, false)
 	/*for _, p := range plan {
 		if p.Commit != nil {
 			fmt.Println(p.Action, p.Commit.Hash.String(), p.Items)
@@ -735,7 +749,7 @@ func TestPrepareRunPlanBig(t *testing.T) {
 				}
 				return nil
 			})
-			plan := prepareRunPlan(commits, false)
+			plan := prepareRunPlan(commits, 0, false)
 			/*for _, p := range plan {
 				if p.Commit != nil {
 					fmt.Println(p.Action, p.Commit.Hash.String(), p.Items)
@@ -792,3 +806,32 @@ func TestPrepareRunPlanBig(t *testing.T) {
 		}()
 	}
 }
+
+func TestPipelineRunHibernation(t *testing.T) {
+	pipeline := NewPipeline(test.Repository)
+	pipeline.HibernationDistance = 2
+	pipeline.AddItem(&testPipelineItem{})
+	item := &dependingTestPipelineItem{}
+	pipeline.AddItem(item)
+	pipeline.Initialize(map[string]interface{}{})
+	hashes := []string{
+		"0183e08978007c746468fca9f68e6e2fbf32100c",
+		"b467a682f680a4dcfd74869480a52f8be3a4fdf0",
+		"31c9f752f9ce103e85523442fa3f05b1ff4ea546",
+		"6530890fcd02fb5e6e85ce2951fdd5c555f2c714",
+		"feb2d230777cbb492ecbc27dea380dc1e7b8f437",
+		"9b30d2abc043ab59aa7ec7b50970c65c90b98853",
+	}
+	commits := make([]*object.Commit, len(hashes))
+	for i, h := range hashes {
+		var err error
+		commits[i], err = test.Repository.CommitObject(plumbing.NewHash(h))
+		if err != nil {
+			t.Fatal(err)
+		}
+	}
+	_, err := pipeline.Run(commits)
+	assert.Nil(t, err)
+	assert.True(t, item.Hibernated)
+	assert.True(t, item.Booted)
+}

+ 6 - 0
internal/core/registry.go

@@ -212,6 +212,12 @@ func (registry *PipelineItemRegistry) AddFlags(flagSet *pflag.FlagSet) (
 		ptr3 := (**bool)(unsafe.Pointer(uintptr(unsafe.Pointer(&iface)) + unsafe.Sizeof(&iface)))
 		*ptr3 = flagSet.Bool("dump-plan", false, "Print the pipeline execution plan to stderr.")
 		flags[ConfigPipelineDumpPlan] = iface
+		iface = interface{}(0)
+		ptr4 := (**int)(unsafe.Pointer(uintptr(unsafe.Pointer(&iface)) + unsafe.Sizeof(&iface)))
+		*ptr4 = flagSet.Int("hibernation-distance", 0,
+			"Minimum number of actions between two sequential usages of a branch to activate "+
+				"the hibernation optimization (cpu-memory trade-off). 0 disables.")
+		flags[ConfigPipelineHibernationDistance] = iface
 	}
 	var features []string
 	for f := range registry.featureFlags.Choices {

+ 3 - 1
internal/core/registry_test.go

@@ -135,12 +135,13 @@ func TestRegistryAddFlags(t *testing.T) {
 		Run:   func(cmd *cobra.Command, args []string) {},
 	}
 	facts, deployed := reg.AddFlags(testCmd.Flags())
-	assert.Len(t, facts, 5)
+	assert.Len(t, facts, 6)
 	assert.IsType(t, 0, facts[(&testPipelineItem{}).ListConfigurationOptions()[0].Name])
 	assert.IsType(t, true, facts[(&dummyPipelineItem{}).ListConfigurationOptions()[0].Name])
 	assert.Contains(t, facts, ConfigPipelineDryRun)
 	assert.Contains(t, facts, ConfigPipelineDAGPath)
 	assert.Contains(t, facts, ConfigPipelineDumpPlan)
+	assert.Contains(t, facts, ConfigPipelineHibernationDistance)
 	assert.Len(t, deployed, 1)
 	assert.Contains(t, deployed, (&testPipelineItem{}).Name())
 	assert.NotNil(t, testCmd.Flags().Lookup((&testPipelineItem{}).Flag()))
@@ -148,6 +149,7 @@ func TestRegistryAddFlags(t *testing.T) {
 	assert.NotNil(t, testCmd.Flags().Lookup("dump-dag"))
 	assert.NotNil(t, testCmd.Flags().Lookup("dump-plan"))
 	assert.NotNil(t, testCmd.Flags().Lookup("dry-run"))
+	assert.NotNil(t, testCmd.Flags().Lookup("hibernation-distance"))
 	assert.NotNil(t, testCmd.Flags().Lookup(
 		(&testPipelineItem{}).ListConfigurationOptions()[0].Flag))
 	assert.NotNil(t, testCmd.Flags().Lookup(

+ 2 - 0
leaves/burndown_test.go

@@ -1212,6 +1212,8 @@ func TestBurndownHibernateBoot(t *testing.T) {
 	assert.Equal(t, burndown.fileAllocator.Size(), 157)
 	assert.Equal(t, burndown.fileAllocator.Used(), 155)
 	burndown.Hibernate()
+	assert.PanicsWithValue(t, "BurndownAnalysis.Consume() was called on a hibernated instance",
+		func() { burndown.Consume(nil) })
 	assert.Equal(t, burndown.fileAllocator.Size(), 0)
 	burndown.Boot()
 	assert.Equal(t, burndown.fileAllocator.Size(), 157)