pipeline.go 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607
  1. package hercules
import (
	"bufio"
	"encoding/hex"
	"errors"
	"fmt"
	"io"
	"io/ioutil"
	"log"
	"os"
	"path/filepath"
	"sort"
	"strings"
	"time"

	"gopkg.in/src-d/go-git.v4"
	"gopkg.in/src-d/go-git.v4/plumbing"
	"gopkg.in/src-d/go-git.v4/plumbing/object"
	"gopkg.in/src-d/hercules.v3/pb"
	"gopkg.in/src-d/hercules.v3/toposort"
)
  19. // ConfigurationOptionType represents the possible types of a ConfigurationOption's value.
  20. type ConfigurationOptionType int
  21. const (
  22. // BoolConfigurationOption reflects the boolean value type.
  23. BoolConfigurationOption ConfigurationOptionType = iota
  24. // IntConfigurationOption reflects the integer value type.
  25. IntConfigurationOption
  26. // StringConfigurationOption reflects the string value type.
  27. StringConfigurationOption
  28. // FloatConfigurationOption reflects a floating point value type.
  29. FloatConfigurationOption
  30. )
  31. // String() returns an empty string for the boolean type, "int" for integers and "string" for
  32. // strings. It is used in the command line interface to show the argument's type.
  33. func (opt ConfigurationOptionType) String() string {
  34. switch opt {
  35. case BoolConfigurationOption:
  36. return ""
  37. case IntConfigurationOption:
  38. return "int"
  39. case StringConfigurationOption:
  40. return "string"
  41. case FloatConfigurationOption:
  42. return "float"
  43. }
  44. panic(fmt.Sprintf("Invalid ConfigurationOptionType value %d", opt))
  45. }
  46. // ConfigurationOption allows for the unified, retrospective way to setup PipelineItem-s.
  47. type ConfigurationOption struct {
  48. // Name identifies the configuration option in facts.
  49. Name string
  50. // Description represents the help text about the configuration option.
  51. Description string
  52. // Flag corresponds to the CLI token with "--" prepended.
  53. Flag string
  54. // Type specifies the kind of the configuration option's value.
  55. Type ConfigurationOptionType
  56. // Default is the initial value of the configuration option.
  57. Default interface{}
  58. }
  59. // FormatDefault converts the default value of ConfigurationOption to string.
  60. // Used in the command line interface to show the argument's default value.
  61. func (opt ConfigurationOption) FormatDefault() string {
  62. if opt.Type != StringConfigurationOption {
  63. return fmt.Sprint(opt.Default)
  64. }
  65. return fmt.Sprintf("\"%s\"", opt.Default)
  66. }
  67. // PipelineItem is the interface for all the units in the Git commits analysis pipeline.
  68. type PipelineItem interface {
  69. // Name returns the name of the analysis.
  70. Name() string
  71. // Provides returns the list of keys of reusable calculated entities.
  72. // Other items may depend on them.
  73. Provides() []string
  74. // Requires returns the list of keys of needed entities which must be supplied in Consume().
  75. Requires() []string
  76. // ListConfigurationOptions returns the list of available options which can be consumed by Configure().
  77. ListConfigurationOptions() []ConfigurationOption
  78. // Configure performs the initial setup of the object by applying parameters from facts.
  79. // It allows to create PipelineItems in a universal way.
  80. Configure(facts map[string]interface{})
  81. // Initialize prepares and resets the item. Consume() requires Initialize()
  82. // to be called at least once beforehand.
  83. Initialize(*git.Repository)
  84. // Consume processes the next commit.
  85. // deps contains the required entities which match Depends(). Besides, it always includes
  86. // "commit" and "index".
  87. // Returns the calculated entities which match Provides().
  88. Consume(deps map[string]interface{}) (map[string]interface{}, error)
  89. }
  90. // FeaturedPipelineItem enables switching the automatic insertion of pipeline items on or off.
  91. type FeaturedPipelineItem interface {
  92. PipelineItem
  93. // Features returns the list of names which enable this item to be automatically inserted
  94. // in Pipeline.DeployItem().
  95. Features() []string
  96. }
  97. // LeafPipelineItem corresponds to the top level pipeline items which produce the end results.
  98. type LeafPipelineItem interface {
  99. PipelineItem
  100. // Flag returns the cmdline name of the item.
  101. Flag() string
  102. // Finalize returns the result of the analysis.
  103. Finalize() interface{}
  104. // Serialize encodes the object returned by Finalize() to YAML or Protocol Buffers.
  105. Serialize(result interface{}, binary bool, writer io.Writer) error
  106. }
  107. // MergeablePipelineItem specifies the methods to combine several analysis results together.
  108. type MergeablePipelineItem interface {
  109. LeafPipelineItem
  110. // Deserialize loads the result from Protocol Buffers blob.
  111. Deserialize(pbmessage []byte) (interface{}, error)
  112. // MergeResults joins two results together. Common-s are specified as the global state.
  113. MergeResults(r1, r2 interface{}, c1, c2 *CommonAnalysisResult) interface{}
  114. }
  115. // CommonAnalysisResult holds the information which is always extracted at Pipeline.Run().
  116. type CommonAnalysisResult struct {
  117. // Time of the first commit in the analysed sequence.
  118. BeginTime int64
  119. // Time of the last commit in the analysed sequence.
  120. EndTime int64
  121. // The number of commits in the analysed sequence.
  122. CommitsNumber int
  123. // The duration of Pipeline.Run().
  124. RunTime time.Duration
  125. }
  126. // BeginTimeAsTime converts the UNIX timestamp of the beginning to Go time.
  127. func (car *CommonAnalysisResult) BeginTimeAsTime() time.Time {
  128. return time.Unix(car.BeginTime, 0)
  129. }
  130. // EndTimeAsTime converts the UNIX timestamp of the ending to Go time.
  131. func (car *CommonAnalysisResult) EndTimeAsTime() time.Time {
  132. return time.Unix(car.EndTime, 0)
  133. }
  134. // Merge combines the CommonAnalysisResult with an other one.
  135. // We choose the earlier BeginTime, the later EndTime, sum the number of commits and the
  136. // elapsed run times.
  137. func (car *CommonAnalysisResult) Merge(other *CommonAnalysisResult) {
  138. if car.EndTime == 0 || other.BeginTime == 0 {
  139. panic("Merging with an uninitialized CommonAnalysisResult")
  140. }
  141. if other.BeginTime < car.BeginTime {
  142. car.BeginTime = other.BeginTime
  143. }
  144. if other.EndTime > car.EndTime {
  145. car.EndTime = other.EndTime
  146. }
  147. car.CommitsNumber += other.CommitsNumber
  148. car.RunTime += other.RunTime
  149. }
  150. // FillMetadata copies the data to a Protobuf message.
  151. func (car *CommonAnalysisResult) FillMetadata(meta *pb.Metadata) *pb.Metadata {
  152. meta.BeginUnixTime = car.BeginTime
  153. meta.EndUnixTime = car.EndTime
  154. meta.Commits = int32(car.CommitsNumber)
  155. meta.RunTime = car.RunTime.Nanoseconds() / 1e6
  156. return meta
  157. }
  158. // MetadataToCommonAnalysisResult copies the data from a Protobuf message.
  159. func MetadataToCommonAnalysisResult(meta *pb.Metadata) *CommonAnalysisResult {
  160. return &CommonAnalysisResult{
  161. BeginTime: meta.BeginUnixTime,
  162. EndTime: meta.EndUnixTime,
  163. CommitsNumber: int(meta.Commits),
  164. RunTime: time.Duration(meta.RunTime * 1e6),
  165. }
  166. }
  167. // Pipeline is the core Hercules entity which carries several PipelineItems and executes them.
  168. // See the extended example of how a Pipeline works in doc.go
  169. type Pipeline struct {
  170. // OnProgress is the callback which is invoked in Analyse() to output it's
  171. // progress. The first argument is the number of processed commits and the
  172. // second is the total number of commits.
  173. OnProgress func(int, int)
  174. // Repository points to the analysed Git repository struct from go-git.
  175. repository *git.Repository
  176. // Items are the registered building blocks in the pipeline. The order defines the
  177. // execution sequence.
  178. items []PipelineItem
  179. // The collection of parameters to create items.
  180. facts map[string]interface{}
  181. // Feature flags which enable the corresponding items.
  182. features map[string]bool
  183. }
  184. const (
  185. // ConfigPipelineDumpPath is the name of the Pipeline configuration option (Pipeline.Initialize())
  186. // which enables saving the items DAG to the specified file.
  187. ConfigPipelineDumpPath = "Pipeline.DumpPath"
  188. // ConfigPipelineDryRun is the name of the Pipeline configuration option (Pipeline.Initialize())
  189. // which disables Configure() and Initialize() invocation on each PipelineItem during the
  190. // Pipeline initialization.
  191. // Subsequent Run() calls are going to fail. Useful with ConfigPipelineDumpPath=true.
  192. ConfigPipelineDryRun = "Pipeline.DryRun"
  193. // ConfigPipelineCommits is the name of the Pipeline configuration option (Pipeline.Initialize())
  194. // which allows to specify the custom commit sequence. By default, Pipeline.Commits() is used.
  195. ConfigPipelineCommits = "commits"
  196. )
  197. // NewPipeline initializes a new instance of Pipeline struct.
  198. func NewPipeline(repository *git.Repository) *Pipeline {
  199. return &Pipeline{
  200. repository: repository,
  201. items: []PipelineItem{},
  202. facts: map[string]interface{}{},
  203. features: map[string]bool{},
  204. }
  205. }
  206. // GetFact returns the value of the fact with the specified name.
  207. func (pipeline *Pipeline) GetFact(name string) interface{} {
  208. return pipeline.facts[name]
  209. }
  210. // SetFact sets the value of the fact with the specified name.
  211. func (pipeline *Pipeline) SetFact(name string, value interface{}) {
  212. pipeline.facts[name] = value
  213. }
  214. // GetFeature returns the state of the feature with the specified name (enabled/disabled) and
  215. // whether it exists. See also: FeaturedPipelineItem.
  216. func (pipeline *Pipeline) GetFeature(name string) (bool, bool) {
  217. val, exists := pipeline.features[name]
  218. return val, exists
  219. }
  220. // SetFeature sets the value of the feature with the specified name.
  221. // See also: FeaturedPipelineItem.
  222. func (pipeline *Pipeline) SetFeature(name string) {
  223. pipeline.features[name] = true
  224. }
  225. // SetFeaturesFromFlags enables the features which were specified through the command line flags
  226. // which belong to the given PipelineItemRegistry instance.
  227. // See also: AddItem().
  228. func (pipeline *Pipeline) SetFeaturesFromFlags(registry ...*PipelineItemRegistry) {
  229. var ffr *PipelineItemRegistry
  230. if len(registry) == 0 {
  231. ffr = Registry
  232. } else if len(registry) == 1 {
  233. ffr = registry[0]
  234. } else {
  235. panic("Zero or one registry is allowed to be passed.")
  236. }
  237. for _, feature := range ffr.featureFlags.Flags {
  238. pipeline.SetFeature(feature)
  239. }
  240. }
  241. // DeployItem inserts a PipelineItem into the pipeline. It also recursively creates all of it's
  242. // dependencies (PipelineItem.Requires()). Returns the same item as specified in the arguments.
  243. func (pipeline *Pipeline) DeployItem(item PipelineItem) PipelineItem {
  244. fpi, ok := item.(FeaturedPipelineItem)
  245. if ok {
  246. for _, f := range fpi.Features() {
  247. pipeline.SetFeature(f)
  248. }
  249. }
  250. queue := []PipelineItem{}
  251. queue = append(queue, item)
  252. added := map[string]PipelineItem{}
  253. for _, item := range pipeline.items {
  254. added[item.Name()] = item
  255. }
  256. added[item.Name()] = item
  257. pipeline.AddItem(item)
  258. for len(queue) > 0 {
  259. head := queue[0]
  260. queue = queue[1:]
  261. for _, dep := range head.Requires() {
  262. for _, sibling := range Registry.Summon(dep) {
  263. if _, exists := added[sibling.Name()]; !exists {
  264. disabled := false
  265. // If this item supports features, check them against the activated in pipeline.features
  266. if fpi, matches := sibling.(FeaturedPipelineItem); matches {
  267. for _, feature := range fpi.Features() {
  268. if !pipeline.features[feature] {
  269. disabled = true
  270. break
  271. }
  272. }
  273. }
  274. if disabled {
  275. continue
  276. }
  277. added[sibling.Name()] = sibling
  278. queue = append(queue, sibling)
  279. pipeline.AddItem(sibling)
  280. }
  281. }
  282. }
  283. }
  284. return item
  285. }
  286. // AddItem inserts a PipelineItem into the pipeline. It does not check any dependencies.
  287. // See also: DeployItem().
  288. func (pipeline *Pipeline) AddItem(item PipelineItem) PipelineItem {
  289. pipeline.items = append(pipeline.items, item)
  290. return item
  291. }
  292. // RemoveItem deletes a PipelineItem from the pipeline. It leaves all the rest of the items intact.
  293. func (pipeline *Pipeline) RemoveItem(item PipelineItem) {
  294. for i, reg := range pipeline.items {
  295. if reg == item {
  296. pipeline.items = append(pipeline.items[:i], pipeline.items[i+1:]...)
  297. return
  298. }
  299. }
  300. }
  301. // Len returns the number of items in the pipeline.
  302. func (pipeline *Pipeline) Len() int {
  303. return len(pipeline.items)
  304. }
  305. // Commits returns the critical path in the repository's history. It starts
  306. // from HEAD and traces commits backwards till the root. When it encounters
  307. // a merge (more than one parent), it always chooses the first parent.
  308. func (pipeline *Pipeline) Commits() []*object.Commit {
  309. result := []*object.Commit{}
  310. repository := pipeline.repository
  311. head, err := repository.Head()
  312. if err != nil {
  313. panic(err)
  314. }
  315. commit, err := repository.CommitObject(head.Hash())
  316. if err != nil {
  317. panic(err)
  318. }
  319. // the first parent matches the head
  320. for ; err != io.EOF; commit, err = commit.Parents().Next() {
  321. if err != nil {
  322. panic(err)
  323. }
  324. result = append(result, commit)
  325. }
  326. // reverse the order
  327. for i, j := 0, len(result)-1; i < j; i, j = i+1, j-1 {
  328. result[i], result[j] = result[j], result[i]
  329. }
  330. return result
  331. }
  332. type sortablePipelineItems []PipelineItem
  333. func (items sortablePipelineItems) Len() int {
  334. return len(items)
  335. }
  336. func (items sortablePipelineItems) Less(i, j int) bool {
  337. return items[i].Name() < items[j].Name()
  338. }
  339. func (items sortablePipelineItems) Swap(i, j int) {
  340. items[i], items[j] = items[j], items[i]
  341. }
  342. func (pipeline *Pipeline) resolve(dumpPath string) {
  343. graph := toposort.NewGraph()
  344. sort.Sort(sortablePipelineItems(pipeline.items))
  345. name2item := map[string]PipelineItem{}
  346. ambiguousMap := map[string][]string{}
  347. nameUsages := map[string]int{}
  348. for _, item := range pipeline.items {
  349. nameUsages[item.Name()]++
  350. }
  351. counters := map[string]int{}
  352. for _, item := range pipeline.items {
  353. name := item.Name()
  354. if nameUsages[name] > 1 {
  355. index := counters[item.Name()] + 1
  356. counters[item.Name()] = index
  357. name = fmt.Sprintf("%s_%d", item.Name(), index)
  358. }
  359. graph.AddNode(name)
  360. name2item[name] = item
  361. for _, key := range item.Provides() {
  362. key = "[" + key + "]"
  363. graph.AddNode(key)
  364. if graph.AddEdge(name, key) > 1 {
  365. if ambiguousMap[key] != nil {
  366. fmt.Fprintln(os.Stderr, "Pipeline:")
  367. for _, item2 := range pipeline.items {
  368. if item2 == item {
  369. fmt.Fprint(os.Stderr, "> ")
  370. }
  371. fmt.Fprint(os.Stderr, item2.Name(), " [")
  372. for i, key2 := range item2.Provides() {
  373. fmt.Fprint(os.Stderr, key2)
  374. if i < len(item.Provides())-1 {
  375. fmt.Fprint(os.Stderr, ", ")
  376. }
  377. }
  378. fmt.Fprintln(os.Stderr, "]")
  379. }
  380. panic("Failed to resolve pipeline dependencies: ambiguous graph.")
  381. }
  382. ambiguousMap[key] = graph.FindParents(key)
  383. }
  384. }
  385. }
  386. counters = map[string]int{}
  387. for _, item := range pipeline.items {
  388. name := item.Name()
  389. if nameUsages[name] > 1 {
  390. index := counters[item.Name()] + 1
  391. counters[item.Name()] = index
  392. name = fmt.Sprintf("%s_%d", item.Name(), index)
  393. }
  394. for _, key := range item.Requires() {
  395. key = "[" + key + "]"
  396. if graph.AddEdge(key, name) == 0 {
  397. panic(fmt.Sprintf("Unsatisfied dependency: %s -> %s", key, item.Name()))
  398. }
  399. }
  400. }
  401. // Try to break the cycles in some known scenarios.
  402. if len(ambiguousMap) > 0 {
  403. ambiguous := []string{}
  404. for key := range ambiguousMap {
  405. ambiguous = append(ambiguous, key)
  406. }
  407. sort.Strings(ambiguous)
  408. bfsorder := graph.BreadthSort()
  409. bfsindex := map[string]int{}
  410. for i, s := range bfsorder {
  411. bfsindex[s] = i
  412. }
  413. for len(ambiguous) > 0 {
  414. key := ambiguous[0]
  415. ambiguous = ambiguous[1:]
  416. pair := ambiguousMap[key]
  417. inheritor := pair[1]
  418. if bfsindex[pair[1]] < bfsindex[pair[0]] {
  419. inheritor = pair[0]
  420. }
  421. removed := graph.RemoveEdge(key, inheritor)
  422. cycle := map[string]bool{}
  423. for _, node := range graph.FindCycle(key) {
  424. cycle[node] = true
  425. }
  426. if len(cycle) == 0 {
  427. cycle[inheritor] = true
  428. }
  429. if removed {
  430. graph.AddEdge(key, inheritor)
  431. }
  432. graph.RemoveEdge(inheritor, key)
  433. graph.ReindexNode(inheritor)
  434. // for all nodes key links to except those in cycle, put the link from inheritor
  435. for _, node := range graph.FindChildren(key) {
  436. if _, exists := cycle[node]; !exists {
  437. graph.AddEdge(inheritor, node)
  438. graph.RemoveEdge(key, node)
  439. }
  440. }
  441. graph.ReindexNode(key)
  442. }
  443. }
  444. var graphCopy *toposort.Graph
  445. if dumpPath != "" {
  446. graphCopy = graph.Copy()
  447. }
  448. strplan, ok := graph.Toposort()
  449. if !ok {
  450. panic("Failed to resolve pipeline dependencies: unable to topologically sort the items.")
  451. }
  452. pipeline.items = make([]PipelineItem, 0, len(pipeline.items))
  453. for _, key := range strplan {
  454. if item, ok := name2item[key]; ok {
  455. pipeline.items = append(pipeline.items, item)
  456. }
  457. }
  458. if dumpPath != "" {
  459. // If there is a floating difference, uncomment this:
  460. // fmt.Fprint(os.Stderr, graphCopy.DebugDump())
  461. ioutil.WriteFile(dumpPath, []byte(graphCopy.Serialize(strplan)), 0666)
  462. absPath, _ := filepath.Abs(dumpPath)
  463. log.Printf("Wrote the DAG to %s\n", absPath)
  464. }
  465. }
  466. // Initialize prepares the pipeline for the execution (Run()). This function
  467. // resolves the execution DAG, Configure()-s and Initialize()-s the items in it in the
  468. // topological dependency order. `facts` are passed inside Configure(). They are mutable.
  469. func (pipeline *Pipeline) Initialize(facts map[string]interface{}) {
  470. if facts == nil {
  471. facts = map[string]interface{}{}
  472. }
  473. if _, exists := facts[ConfigPipelineCommits]; !exists {
  474. facts[ConfigPipelineCommits] = pipeline.Commits()
  475. }
  476. dumpPath, _ := facts[ConfigPipelineDumpPath].(string)
  477. pipeline.resolve(dumpPath)
  478. if dryRun, _ := facts[ConfigPipelineDryRun].(bool); dryRun {
  479. return
  480. }
  481. for _, item := range pipeline.items {
  482. item.Configure(facts)
  483. }
  484. for _, item := range pipeline.items {
  485. item.Initialize(pipeline.repository)
  486. }
  487. }
  488. // Run method executes the pipeline.
  489. //
  490. // commits is a slice with the sequential commit history. It shall start from
  491. // the root (ascending order).
  492. //
  493. // Returns the mapping from each LeafPipelineItem to the corresponding analysis result.
  494. // There is always a "nil" record with CommonAnalysisResult.
  495. func (pipeline *Pipeline) Run(commits []*object.Commit) (map[LeafPipelineItem]interface{}, error) {
  496. startRunTime := time.Now()
  497. onProgress := pipeline.OnProgress
  498. if onProgress == nil {
  499. onProgress = func(int, int) {}
  500. }
  501. for index, commit := range commits {
  502. onProgress(index, len(commits))
  503. state := map[string]interface{}{"commit": commit, "index": index}
  504. for _, item := range pipeline.items {
  505. update, err := item.Consume(state)
  506. if err != nil {
  507. log.Printf("%s failed on commit #%d %s\n",
  508. item.Name(), index, commit.Hash.String())
  509. return nil, err
  510. }
  511. for _, key := range item.Provides() {
  512. val, ok := update[key]
  513. if !ok {
  514. panic(fmt.Sprintf("%s: Consume() did not return %s", item.Name(), key))
  515. }
  516. state[key] = val
  517. }
  518. }
  519. }
  520. onProgress(len(commits), len(commits))
  521. result := map[LeafPipelineItem]interface{}{}
  522. for _, item := range pipeline.items {
  523. if casted, ok := item.(LeafPipelineItem); ok {
  524. result[casted] = casted.Finalize()
  525. }
  526. }
  527. result[nil] = &CommonAnalysisResult{
  528. BeginTime: commits[0].Author.When.Unix(),
  529. EndTime: commits[len(commits)-1].Author.When.Unix(),
  530. CommitsNumber: len(commits),
  531. RunTime: time.Since(startRunTime),
  532. }
  533. return result, nil
  534. }
  535. // LoadCommitsFromFile reads the file by the specified FS path and generates the sequence of commits
  536. // by interpreting each line as a Git commit hash.
  537. func LoadCommitsFromFile(path string, repository *git.Repository) ([]*object.Commit, error) {
  538. var file io.ReadCloser
  539. if path != "-" {
  540. var err error
  541. file, err = os.Open(path)
  542. if err != nil {
  543. return nil, err
  544. }
  545. defer file.Close()
  546. } else {
  547. file = os.Stdin
  548. }
  549. scanner := bufio.NewScanner(file)
  550. commits := []*object.Commit{}
  551. for scanner.Scan() {
  552. hash := plumbing.NewHash(scanner.Text())
  553. if len(hash) != 20 {
  554. return nil, errors.New("invalid commit hash " + scanner.Text())
  555. }
  556. commit, err := repository.CommitObject(hash)
  557. if err != nil {
  558. return nil, err
  559. }
  560. commits = append(commits, commit)
  561. }
  562. return commits, nil
  563. }