| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937 | package coreimport (	"bufio"	"fmt"	"io"	"io/ioutil"	"log"	"os"	"path/filepath"	"runtime/debug"	"sort"	"strings"	"time"	"github.com/pkg/errors"	"gopkg.in/src-d/go-git.v4"	"gopkg.in/src-d/go-git.v4/plumbing"	"gopkg.in/src-d/go-git.v4/plumbing/object"	"gopkg.in/src-d/go-git.v4/plumbing/storer"	"gopkg.in/src-d/hercules.v10/internal/pb"	"gopkg.in/src-d/hercules.v10/internal/toposort")// ConfigurationOptionType represents the possible types of a ConfigurationOption's value.type ConfigurationOptionType intconst (	// BoolConfigurationOption reflects the boolean value type.	BoolConfigurationOption ConfigurationOptionType = iota	// IntConfigurationOption reflects the integer value type.	IntConfigurationOption	// StringConfigurationOption reflects the string value type.	StringConfigurationOption	// FloatConfigurationOption reflects a floating point value type.	FloatConfigurationOption	// StringsConfigurationOption reflects the array of strings value type.	StringsConfigurationOption	// PathConfigurationOption reflects the file system path value type.	PathConfigurationOption)// String() returns an empty string for the boolean type, "int" for integers and "string" for// strings. It is used in the command line interface to show the argument's type.func (opt ConfigurationOptionType) String() string {	switch opt {	case BoolConfigurationOption:		return ""	case IntConfigurationOption:		return "int"	case StringConfigurationOption:		return "string"	case FloatConfigurationOption:		return "float"	case StringsConfigurationOption:		return "string"	case PathConfigurationOption:		return "path"	}	log.Panicf("Invalid ConfigurationOptionType value %d", opt)	return ""}// ConfigurationOption allows for the unified, retrospective way to setup PipelineItem-s.type ConfigurationOption struct {	// Name identifies the configuration option in facts.	Name string	// Description represents the help text about the configuration option.	Description string	// Flag corresponds to the CLI token with "--" prepended.	Flag string	// Type specifies the kind of the configuration option's value.	Type ConfigurationOptionType	// Default is the initial value of the configuration option.	Default interface{}}// FormatDefault converts the default value of ConfigurationOption to string.// Used in the command line interface to show the argument's default value.func (opt ConfigurationOption) FormatDefault() string {	if opt.Type == StringsConfigurationOption {		return fmt.Sprintf("\"%s\"", strings.Join(opt.Default.([]string), ","))	}	if opt.Type != StringConfigurationOption {		return fmt.Sprint(opt.Default)	}	return fmt.Sprintf("\"%s\"", opt.Default)}// PipelineItem is the interface for all the units in the Git commits analysis pipeline.type PipelineItem interface {	// Name returns the name of the analysis.	Name() string	// Provides returns the list of keys of reusable calculated entities.	// Other items may depend on them.	Provides() []string	// Requires returns the list of keys of needed entities which must be supplied in Consume().	Requires() []string	// ListConfigurationOptions returns the list of available options which can be consumed by Configure().	ListConfigurationOptions() []ConfigurationOption	// Configure performs the initial setup of the object by applying parameters from facts.	// It allows to create PipelineItems in a universal way.	Configure(facts map[string]interface{}) error	// Initialize prepares and resets the item. Consume() requires Initialize()	// to be called at least once beforehand.	Initialize(*git.Repository) error	// Consume processes the next commit.	// deps contains the required entities which match Depends(). Besides, it always includes	// DependencyCommit and DependencyIndex.	// Returns the calculated entities which match Provides().	Consume(deps map[string]interface{}) (map[string]interface{}, error)	// Fork clones the item the requested number of times. The data links between the clones	// are up to the implementation. Needed to handle Git branches. See also Merge().	// Returns a slice with `n` fresh clones. In other words, it does not include the original item.	Fork(n int) []PipelineItem	// Merge combines several branches together. Each is supposed to have been created with Fork().	// The result is stored in the called item, thus this function returns nothing.	// Merge() must update all the branches, not only self. When several branches merge, some of	// them may continue to live, hence this requirement.	Merge(branches []PipelineItem)}// FeaturedPipelineItem enables switching the automatic insertion of pipeline items on or off.type FeaturedPipelineItem interface {	PipelineItem	// Features returns the list of names which enable this item to be automatically inserted	// in Pipeline.DeployItem().	Features() []string}// DisposablePipelineItem enables resources cleanup after finishing running the pipeline.type DisposablePipelineItem interface {	PipelineItem	// Dispose frees any previously allocated unmanaged resources. No Consume() calls are possible	// afterwards. The item needs to be Initialize()-d again.	// This method is invoked once for each item in the pipeline, **in a single forked instance**.	// Thus it is the responsibility of the item's programmer to deal with forks and merges, if	// necessary.	Dispose()}// LeafPipelineItem corresponds to the top level pipeline items which produce the end results.type LeafPipelineItem interface {	PipelineItem	// Flag returns the cmdline switch to run the analysis. Should be dash-lower-case	// without the leading dashes.	Flag() string	// Description returns the text which explains what the analysis is doing.	// Should start with a capital letter and end with a dot.	Description() string	// Finalize returns the result of the analysis.	Finalize() interface{}	// Serialize encodes the object returned by Finalize() to YAML or Protocol Buffers.	Serialize(result interface{}, binary bool, writer io.Writer) error}// ResultMergeablePipelineItem specifies the methods to combine several analysis results together.type ResultMergeablePipelineItem interface {	LeafPipelineItem	// Deserialize loads the result from Protocol Buffers blob.	Deserialize(pbmessage []byte) (interface{}, error)	// MergeResults joins two results together. Common-s are specified as the global state.	MergeResults(r1, r2 interface{}, c1, c2 *CommonAnalysisResult) interface{}}// HibernateablePipelineItem is the interface to allow pipeline items to be frozen (compacted, unloaded)// while they are not needed in the hosting branch.type HibernateablePipelineItem interface {	PipelineItem	// Hibernate signals that the item is temporarily not needed and it's memory can be optimized.	Hibernate() error	// Boot signals that the item is needed again and must be de-hibernate-d.	Boot() error}// CommonAnalysisResult holds the information which is always extracted at Pipeline.Run().type CommonAnalysisResult struct {	// BeginTime is the time of the first commit in the analysed sequence.	BeginTime int64	// EndTime is the time of the last commit in the analysed sequence.	EndTime int64	// CommitsNumber is the number of commits in the analysed sequence.	CommitsNumber int	// RunTime is the duration of Pipeline.Run().	RunTime time.Duration	// RunTimePerItem is the time elapsed by each PipelineItem.	RunTimePerItem map[string]float64}// Copy produces a deep clone of the object.func (car CommonAnalysisResult) Copy() CommonAnalysisResult {	result := car	result.RunTimePerItem = map[string]float64{}	for key, val := range car.RunTimePerItem {		result.RunTimePerItem[key] = val	}	return result}// BeginTimeAsTime converts the UNIX timestamp of the beginning to Go time.func (car *CommonAnalysisResult) BeginTimeAsTime() time.Time {	return time.Unix(car.BeginTime, 0)}// EndTimeAsTime converts the UNIX timestamp of the ending to Go time.func (car *CommonAnalysisResult) EndTimeAsTime() time.Time {	return time.Unix(car.EndTime, 0)}// Merge combines the CommonAnalysisResult with an other one.// We choose the earlier BeginTime, the later EndTime, sum the number of commits and the// elapsed run times.func (car *CommonAnalysisResult) Merge(other *CommonAnalysisResult) {	if car.EndTime == 0 || other.BeginTime == 0 {		panic("Merging with an uninitialized CommonAnalysisResult")	}	if other.BeginTime < car.BeginTime {		car.BeginTime = other.BeginTime	}	if other.EndTime > car.EndTime {		car.EndTime = other.EndTime	}	car.CommitsNumber += other.CommitsNumber	car.RunTime += other.RunTime	for key, val := range other.RunTimePerItem {		car.RunTimePerItem[key] += val	}}// FillMetadata copies the data to a Protobuf message.func (car *CommonAnalysisResult) FillMetadata(meta *pb.Metadata) *pb.Metadata {	meta.BeginUnixTime = car.BeginTime	meta.EndUnixTime = car.EndTime	meta.Commits = int32(car.CommitsNumber)	meta.RunTime = car.RunTime.Nanoseconds() / 1e6	meta.RunTimePerItem = car.RunTimePerItem	return meta}// Metadata is defined in internal/pb/pb.pb.go - header of the binary file.type Metadata = pb.Metadata// MetadataToCommonAnalysisResult copies the data from a Protobuf message.func MetadataToCommonAnalysisResult(meta *Metadata) *CommonAnalysisResult {	return &CommonAnalysisResult{		BeginTime:      meta.BeginUnixTime,		EndTime:        meta.EndUnixTime,		CommitsNumber:  int(meta.Commits),		RunTime:        time.Duration(meta.RunTime * 1e6),		RunTimePerItem: meta.RunTimePerItem,	}}// Pipeline is the core Hercules entity which carries several PipelineItems and executes them.// See the extended example of how a Pipeline works in doc.gotype Pipeline struct {	// OnProgress is the callback which is invoked in Analyse() to output it's	// progress. The first argument is the number of complete steps, the	// second is the total number of steps and the third is some description of the current action.	OnProgress func(int, int, string)	// HibernationDistance is the minimum number of actions between two sequential usages of	// a branch to activate the hibernation optimization (cpu-memory trade-off). 0 disables.	HibernationDistance int	// DryRun indicates whether the items are not executed.	DryRun bool	// DumpPlan indicates whether to print the execution plan to stderr.	DumpPlan bool	// PrintActions indicates whether to print the taken actions during the execution.	PrintActions bool	// Repository points to the analysed Git repository struct from go-git.	repository *git.Repository	// Items are the registered building blocks in the pipeline. The order defines the	// execution sequence.	items []PipelineItem	// The collection of parameters to create items.	facts map[string]interface{}	// Feature flags which enable the corresponding items.	features map[string]bool	// The logger for printing output.	l Logger}const (	// ConfigPipelineDAGPath is the name of the Pipeline configuration option (Pipeline.Initialize())	// which enables saving the items DAG to the specified file.	ConfigPipelineDAGPath = "Pipeline.DAGPath"	// ConfigPipelineDryRun is the name of the Pipeline configuration option (Pipeline.Initialize())	// which disables Configure() and Initialize() invocation on each PipelineItem during the	// Pipeline initialization.	// Subsequent Run() calls are going to fail. Useful with ConfigPipelineDAGPath=true.	ConfigPipelineDryRun = "Pipeline.DryRun"	// ConfigPipelineCommits is the name of the Pipeline configuration option (Pipeline.Initialize())	// which allows to specify the custom commit sequence. By default, Pipeline.Commits() is used.	ConfigPipelineCommits = "Pipeline.Commits"	// ConfigPipelineDumpPlan is the name of the Pipeline configuration option (Pipeline.Initialize())	// which outputs the execution plan to stderr.	ConfigPipelineDumpPlan = "Pipeline.DumpPlan"	// ConfigPipelineHibernationDistance is the name of the Pipeline configuration option (Pipeline.Initialize())	// which is the minimum number of actions between two sequential usages of	// a branch to activate the hibernation optimization (cpu-memory trade-off). 0 disables.	ConfigPipelineHibernationDistance = "Pipeline.HibernationDistance"	// ConfigPipelinePrintActions is the name of the Pipeline configuration option (Pipeline.Initialize())	// which enables printing the taken actions of the execution plan to stderr.	ConfigPipelinePrintActions = "Pipeline.PrintActions"	// DependencyCommit is the name of one of the three items in `deps` supplied to PipelineItem.Consume()	// which always exists. It corresponds to the currently analyzed commit.	DependencyCommit = "commit"	// DependencyIndex is the name of one of the three items in `deps` supplied to PipelineItem.Consume()	// which always exists. It corresponds to the currently analyzed commit's index.	DependencyIndex = "index"	// DependencyIsMerge is the name of one of the three items in `deps` supplied to PipelineItem.Consume()	// which always exists. It indicates whether the analyzed commit is a merge commit.	// Checking the number of parents is not correct - we remove the back edges during the DAG simplification.	DependencyIsMerge = "is_merge"	// MessageFinalize is the status text reported before calling LeafPipelineItem.Finalize()-s.	MessageFinalize = "finalize")// NewPipeline initializes a new instance of Pipeline struct.func NewPipeline(repository *git.Repository) *Pipeline {	return &Pipeline{		repository: repository,		items:      []PipelineItem{},		facts:      map[string]interface{}{},		features:   map[string]bool{},		l:          NewLogger(),	}}// GetFact returns the value of the fact with the specified name.func (pipeline *Pipeline) GetFact(name string) interface{} {	return pipeline.facts[name]}// SetFact sets the value of the fact with the specified name.func (pipeline *Pipeline) SetFact(name string, value interface{}) {	pipeline.facts[name] = value}// GetFeature returns the state of the feature with the specified name (enabled/disabled) and// whether it exists. See also: FeaturedPipelineItem.func (pipeline *Pipeline) GetFeature(name string) (bool, bool) {	val, exists := pipeline.features[name]	return val, exists}// SetFeature sets the value of the feature with the specified name.// See also: FeaturedPipelineItem.func (pipeline *Pipeline) SetFeature(name string) {	pipeline.features[name] = true}// SetFeaturesFromFlags enables the features which were specified through the command line flags// which belong to the given PipelineItemRegistry instance.// See also: AddItem().func (pipeline *Pipeline) SetFeaturesFromFlags(registry ...*PipelineItemRegistry) {	var ffr *PipelineItemRegistry	if len(registry) == 0 {		ffr = Registry	} else if len(registry) == 1 {		ffr = registry[0]	} else {		panic("Zero or one registry is allowed to be passed.")	}	for _, feature := range ffr.featureFlags.Flags {		pipeline.SetFeature(feature)	}}// DeployItem inserts a PipelineItem into the pipeline. It also recursively creates all of it's// dependencies (PipelineItem.Requires()). Returns the same item as specified in the arguments.func (pipeline *Pipeline) DeployItem(item PipelineItem) PipelineItem {	fpi, ok := item.(FeaturedPipelineItem)	if ok {		for _, f := range fpi.Features() {			pipeline.SetFeature(f)		}	}	queue := []PipelineItem{}	queue = append(queue, item)	added := map[string]PipelineItem{}	for _, item := range pipeline.items {		added[item.Name()] = item	}	added[item.Name()] = item	pipeline.AddItem(item)	for len(queue) > 0 {		head := queue[0]		queue = queue[1:]		for _, dep := range head.Requires() {			for _, sibling := range Registry.Summon(dep) {				if _, exists := added[sibling.Name()]; !exists {					disabled := false					// If this item supports features, check them against the activated in pipeline.features					if fpi, matches := sibling.(FeaturedPipelineItem); matches {						for _, feature := range fpi.Features() {							if !pipeline.features[feature] {								disabled = true								break							}						}					}					if disabled {						continue					}					added[sibling.Name()] = sibling					queue = append(queue, sibling)					pipeline.AddItem(sibling)				}			}		}	}	return item}// AddItem inserts a PipelineItem into the pipeline. It does not check any dependencies.// See also: DeployItem().func (pipeline *Pipeline) AddItem(item PipelineItem) PipelineItem {	pipeline.items = append(pipeline.items, item)	return item}// RemoveItem deletes a PipelineItem from the pipeline. It leaves all the rest of the items intact.func (pipeline *Pipeline) RemoveItem(item PipelineItem) {	for i, reg := range pipeline.items {		if reg == item {			pipeline.items = append(pipeline.items[:i], pipeline.items[i+1:]...)			return		}	}}// Len returns the number of items in the pipeline.func (pipeline *Pipeline) Len() int {	return len(pipeline.items)}// Commits returns the list of commits from the history similar to `git log` over the HEAD.// `firstParent` specifies whether to leave only the first parent after each merge// (`git log --first-parent`) - effectively decreasing the accuracy but increasing performance.func (pipeline *Pipeline) Commits(firstParent bool) ([]*object.Commit, error) {	var result []*object.Commit	repository := pipeline.repository	head, err := repository.Head()	if err != nil {		if err == plumbing.ErrReferenceNotFound {			refs, errr := repository.References()			if errr != nil {				return nil, errors.Wrap(errr, "unable to list the references")			}			refs.ForEach(func(ref *plumbing.Reference) error {				if strings.HasPrefix(ref.Name().String(), "refs/heads/HEAD/") {					head = ref					return storer.ErrStop				}				return nil			})		}		if head == nil && err != nil {			return nil, errors.Wrap(err, "unable to collect the commit history")		}	}	if firstParent {		commit, err := repository.CommitObject(head.Hash())		if err != nil {			panic(err)		}		// the first parent matches the head		for ; err != io.EOF; commit, err = commit.Parents().Next() {			if err != nil {				panic(err)			}			result = append(result, commit)		}		// reverse the order		for i, j := 0, len(result)-1; i < j; i, j = i+1, j-1 {			result[i], result[j] = result[j], result[i]		}		return result, nil	}	cit, err := repository.Log(&git.LogOptions{From: head.Hash()})	if err != nil {		return nil, errors.Wrap(err, "unable to collect the commit history")	}	defer cit.Close()	cit.ForEach(func(commit *object.Commit) error {		result = append(result, commit)		return nil	})	return result, nil}type sortablePipelineItems []PipelineItemfunc (items sortablePipelineItems) Len() int {	return len(items)}func (items sortablePipelineItems) Less(i, j int) bool {	return items[i].Name() < items[j].Name()}func (items sortablePipelineItems) Swap(i, j int) {	items[i], items[j] = items[j], items[i]}func (pipeline *Pipeline) resolve(dumpPath string) {	graph := toposort.NewGraph()	sort.Sort(sortablePipelineItems(pipeline.items))	name2item := map[string]PipelineItem{}	ambiguousMap := map[string][]string{}	nameUsages := map[string]int{}	for _, item := range pipeline.items {		nameUsages[item.Name()]++	}	counters := map[string]int{}	for _, item := range pipeline.items {		name := item.Name()		if nameUsages[name] > 1 {			index := counters[item.Name()] + 1			counters[item.Name()] = index			name = fmt.Sprintf("%s_%d", item.Name(), index)		}		graph.AddNode(name)		name2item[name] = item		for _, key := range item.Provides() {			key = "[" + key + "]"			graph.AddNode(key)			if graph.AddEdge(name, key) > 1 {				if ambiguousMap[key] != nil {					fmt.Fprintln(os.Stderr, "Pipeline:")					for _, item2 := range pipeline.items {						if item2 == item {							fmt.Fprint(os.Stderr, "> ")						}						fmt.Fprint(os.Stderr, item2.Name(), " [")						for i, key2 := range item2.Provides() {							fmt.Fprint(os.Stderr, key2)							if i < len(item.Provides())-1 {								fmt.Fprint(os.Stderr, ", ")							}						}						fmt.Fprintln(os.Stderr, "]")					}					panic("Failed to resolve pipeline dependencies: ambiguous graph.")				}				ambiguousMap[key] = graph.FindParents(key)			}		}	}	counters = map[string]int{}	for _, item := range pipeline.items {		name := item.Name()		if nameUsages[name] > 1 {			index := counters[item.Name()] + 1			counters[item.Name()] = index			name = fmt.Sprintf("%s_%d", item.Name(), index)		}		for _, key := range item.Requires() {			key = "[" + key + "]"			if graph.AddEdge(key, name) == 0 {				pipeline.l.Errorf("Unsatisfied dependency: %s -> %s", key, item.Name())				return			}		}	}	// Try to break the cycles in some known scenarios.	if len(ambiguousMap) > 0 {		var ambiguous []string		for key := range ambiguousMap {			ambiguous = append(ambiguous, key)		}		sort.Strings(ambiguous)		bfsorder := graph.BreadthSort()		bfsindex := map[string]int{}		for i, s := range bfsorder {			bfsindex[s] = i		}		for len(ambiguous) > 0 {			key := ambiguous[0]			ambiguous = ambiguous[1:]			pair := ambiguousMap[key]			inheritor := pair[1]			if bfsindex[pair[1]] < bfsindex[pair[0]] {				inheritor = pair[0]			}			removed := graph.RemoveEdge(key, inheritor)			cycle := map[string]bool{}			for _, node := range graph.FindCycle(key) {				cycle[node] = true			}			if len(cycle) == 0 {				cycle[inheritor] = true			}			if removed {				graph.AddEdge(key, inheritor)			}			graph.RemoveEdge(inheritor, key)			graph.ReindexNode(inheritor)			// for all nodes key links to except those in cycle, put the link from inheritor			for _, node := range graph.FindChildren(key) {				if _, exists := cycle[node]; !exists {					graph.AddEdge(inheritor, node)					graph.RemoveEdge(key, node)				}			}			graph.ReindexNode(key)		}	}	var graphCopy *toposort.Graph	if dumpPath != "" {		graphCopy = graph.Copy()	}	strplan, ok := graph.Toposort()	if !ok {		pipeline.l.Errorf("Failed to resolve pipeline dependencies: unable to topologically sort the items.")		return	}	pipeline.items = make([]PipelineItem, 0, len(pipeline.items))	for _, key := range strplan {		if item, ok := name2item[key]; ok {			pipeline.items = append(pipeline.items, item)		}	}	if dumpPath != "" {		// If there is a floating difference, uncomment this:		// fmt.Fprint(os.Stderr, graphCopy.DebugDump())		ioutil.WriteFile(dumpPath, []byte(graphCopy.Serialize(strplan)), 0666)		absPath, _ := filepath.Abs(dumpPath)		pipeline.l.Infof("Wrote the DAG to %s\n", absPath)	}}// Initialize prepares the pipeline for the execution (Run()). This function// resolves the execution DAG, Configure()-s and Initialize()-s the items in it in the// topological dependency order. `facts` are passed inside Configure(). They are mutable.func (pipeline *Pipeline) Initialize(facts map[string]interface{}) error {	cleanReturn := false	defer func() {		if !cleanReturn {			remotes, _ := pipeline.repository.Remotes()			if len(remotes) > 0 {				pipeline.l.Errorf("Failed to initialize the pipeline on %s", remotes[0].Config().URLs)			}		}	}()	if facts == nil {		facts = map[string]interface{}{}	}	if l, exists := facts[ConfigLogger].(Logger); exists {		pipeline.l = l	} else {		facts[ConfigLogger] = pipeline.l	}	if _, exists := facts[ConfigPipelineCommits]; !exists {		var err error		facts[ConfigPipelineCommits], err = pipeline.Commits(false)		if err != nil {			pipeline.l.Errorf("failed to list the commits: %v", err)			return err		}	}	pipeline.PrintActions, _ = facts[ConfigPipelinePrintActions].(bool)	if val, exists := facts[ConfigPipelineHibernationDistance].(int); exists {		if val < 0 {			err := fmt.Errorf("--hibernation-distance cannot be negative (got %d)", val)			pipeline.l.Error(err)			return err		}		pipeline.HibernationDistance = val	}	dumpPath, _ := facts[ConfigPipelineDAGPath].(string)	pipeline.resolve(dumpPath)	if dumpPlan, exists := facts[ConfigPipelineDumpPlan].(bool); exists {		pipeline.DumpPlan = dumpPlan	}	if dryRun, exists := facts[ConfigPipelineDryRun].(bool); exists {		pipeline.DryRun = dryRun		if dryRun {			cleanReturn = true			return nil		}	}	for _, item := range pipeline.items {		err := item.Configure(facts)		if err != nil {			cleanReturn = true			return errors.Wrapf(err, "%s failed to configure", item.Name())		}	}	for _, item := range pipeline.items {		err := item.Initialize(pipeline.repository)		if err != nil {			cleanReturn = true			return errors.Wrapf(err, "%s failed to initialize", item.Name())		}	}	if pipeline.HibernationDistance > 0 {		// if we want hibernation, then we want to minimize RSS		debug.SetGCPercent(20) // the default is 100	}	cleanReturn = true	return nil}// Run method executes the pipeline.//// `commits` is a slice with the git commits to analyse. Multiple branches are supported.//// Returns the mapping from each LeafPipelineItem to the corresponding analysis result.// There is always a "nil" record with CommonAnalysisResult.func (pipeline *Pipeline) Run(commits []*object.Commit) (map[LeafPipelineItem]interface{}, error) {	startRunTime := time.Now()	cleanReturn := false	defer func() {		if !cleanReturn {			remotes, _ := pipeline.repository.Remotes()			if len(remotes) > 0 {				pipeline.l.Errorf("Failed to run the pipeline on %s", remotes[0].Config().URLs)			}		}	}()	onProgress := pipeline.OnProgress	if onProgress == nil {		onProgress = func(int, int, string) {}	}	plan := prepareRunPlan(commits, pipeline.HibernationDistance, pipeline.DumpPlan)	progressSteps := len(plan) + 2	branches := map[int][]PipelineItem{}	// we will need rootClone if there is more than one root branch	var rootClone []PipelineItem	if !pipeline.DryRun {		rootClone = cloneItems(pipeline.items, 1)[0]	}	var newestTime int64	runTimePerItem := map[string]float64{}	isMerge := func(index int, commit plumbing.Hash) bool {		match := false		// look for the same hash backward		for i := index - 1; i > 0; i-- {			switch plan[i].Action {			case runActionHibernate, runActionBoot:				continue			case runActionCommit:				match = plan[i].Commit.Hash == commit				fallthrough			default:				i = 0			}		}		if match {			return true		}		// look for the same hash forward		for i := index + 1; i < len(plan); i++ {			switch plan[i].Action {			case runActionHibernate, runActionBoot:				continue			case runActionCommit:				match = plan[i].Commit.Hash == commit				fallthrough			default:				i = len(plan)			}		}		return match	}	commitIndex := 0	for index, step := range plan {		onProgress(index+1, progressSteps, step.String())		if pipeline.DryRun {			continue		}		if pipeline.PrintActions {			printAction(step)		}		if index > 0 && index%100 == 0 && pipeline.HibernationDistance > 0 {			debug.FreeOSMemory()		}		firstItem := step.Items[0]		switch step.Action {		case runActionCommit:			state := map[string]interface{}{				DependencyCommit:  step.Commit,				DependencyIndex:   commitIndex,				DependencyIsMerge: isMerge(index, step.Commit.Hash),			}			for _, item := range branches[firstItem] {				startTime := time.Now()				update, err := item.Consume(state)				runTimePerItem[item.Name()] += time.Now().Sub(startTime).Seconds()				if err != nil {					pipeline.l.Errorf("%s failed on commit #%d (%d) %s: %v\n",						item.Name(), commitIndex+1, index+1, step.Commit.Hash.String(), err)					return nil, err				}				for _, key := range item.Provides() {					val, ok := update[key]					if !ok {						err := fmt.Errorf("%s: Consume() did not return %s", item.Name(), key)						pipeline.l.Error(err)						return nil, err					}					state[key] = val				}			}			commitTime := step.Commit.Committer.When.Unix()			if commitTime > newestTime {				newestTime = commitTime			}			commitIndex++		case runActionFork:			startTime := time.Now()			for i, clone := range cloneItems(branches[firstItem], len(step.Items)-1) {				branches[step.Items[i+1]] = clone			}			runTimePerItem["*.Fork"] += time.Now().Sub(startTime).Seconds()		case runActionMerge:			startTime := time.Now()			merged := make([][]PipelineItem, len(step.Items))			for i, b := range step.Items {				merged[i] = branches[b]			}			mergeItems(merged)			runTimePerItem["*.Merge"] += time.Now().Sub(startTime).Seconds()		case runActionEmerge:			if firstItem == rootBranchIndex {				branches[firstItem] = pipeline.items			} else {				branches[firstItem] = cloneItems(rootClone, 1)[0]			}		case runActionDelete:			delete(branches, firstItem)		case runActionHibernate:			for _, item := range step.Items {				for _, item := range branches[item] {					if hi, ok := item.(HibernateablePipelineItem); ok {						startTime := time.Now()						err := hi.Hibernate()						if err != nil {							pipeline.l.Errorf("Failed to hibernate %s: %v\n", item.Name(), err)							return nil, err						}						runTimePerItem[item.Name()+".Hibernation"] += time.Now().Sub(startTime).Seconds()					}				}			}		case runActionBoot:			for _, item := range step.Items {				for _, item := range branches[item] {					if hi, ok := item.(HibernateablePipelineItem); ok {						startTime := time.Now()						err := hi.Boot()						if err != nil {							pipeline.l.Errorf("Failed to boot %s: %v\n", item.Name(), err)							return nil, err						}						runTimePerItem[item.Name()+".Hibernation"] += time.Now().Sub(startTime).Seconds()					}				}			}		}	}	onProgress(len(plan)+1, progressSteps, MessageFinalize)	result := map[LeafPipelineItem]interface{}{}	if !pipeline.DryRun {		for index, item := range getMasterBranch(branches) {			if casted, ok := item.(DisposablePipelineItem); ok {				casted.Dispose()			}			if casted, ok := item.(LeafPipelineItem); ok {				result[pipeline.items[index].(LeafPipelineItem)] = casted.Finalize()			}		}	}	onProgress(progressSteps, progressSteps, "")	result[nil] = &CommonAnalysisResult{		BeginTime:      plan[0].Commit.Committer.When.Unix(),		EndTime:        newestTime,		CommitsNumber:  len(commits),		RunTime:        time.Since(startRunTime),		RunTimePerItem: runTimePerItem,	}	cleanReturn = true	return result, nil}// LoadCommitsFromFile reads the file by the specified FS path and generates the sequence of commits// by interpreting each line as a Git commit hash.func LoadCommitsFromFile(path string, repository *git.Repository) ([]*object.Commit, error) {	var file io.ReadCloser	if path != "-" {		var err error		file, err = os.Open(path)		if err != nil {			return nil, err		}		defer file.Close()	} else {		file = os.Stdin	}	scanner := bufio.NewScanner(file)	var commits []*object.Commit	for scanner.Scan() {		hash := plumbing.NewHash(scanner.Text())		if len(hash) != 20 {			return nil, errors.New("invalid commit hash " + scanner.Text())		}		commit, err := repository.CommitObject(hash)		if err != nil {			return nil, err		}		commits = append(commits, commit)	}	return commits, nil}// GetSensibleRemote extracts a remote URL of the repository to identify it.func GetSensibleRemote(repository *git.Repository) string {	if r, err := repository.Remotes(); err == nil && len(r) > 0 {		return r[0].Config().URLs[0]	}	return "<no remote>"}
 |