pipeline.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594
  1. package hercules
  2. import (
  3. "bufio"
  4. "errors"
  5. "flag"
  6. "fmt"
  7. "io"
  8. "io/ioutil"
  9. "os"
  10. "path/filepath"
  11. "reflect"
  12. "sort"
  13. "strings"
  14. "unsafe"
  15. "gopkg.in/src-d/go-git.v4"
  16. "gopkg.in/src-d/go-git.v4/plumbing"
  17. "gopkg.in/src-d/go-git.v4/plumbing/object"
  18. "gopkg.in/src-d/hercules.v3/toposort"
  19. )
  20. type ConfigurationOptionType int
  21. const (
  22. // Boolean value type.
  23. BoolConfigurationOption ConfigurationOptionType = iota
  24. // Integer value type.
  25. IntConfigurationOption
  26. // String value type.
  27. StringConfigurationOption
  28. )
  29. const (
  30. ConfigPipelineDumpPath = "Pipeline.DumpPath"
  31. ConfigPipelineDryRun = "Pipeline.DryRun"
  32. )
  33. // ConfigurationOption allows for the unified, retrospective way to setup PipelineItem-s.
  34. type ConfigurationOption struct {
  35. // Name identifies the configuration option in facts.
  36. Name string
  37. // Description represents the help text about the configuration option.
  38. Description string
  39. // Flag corresponds to the CLI token with "-" prepended.
  40. Flag string
  41. // Type specifies the kind of the configuration option's value.
  42. Type ConfigurationOptionType
  43. // Default is the initial value of the configuration option.
  44. Default interface{}
  45. }
  46. // PipelineItem is the interface for all the units of the Git commit analysis pipeline.
  47. type PipelineItem interface {
  48. // Name returns the name of the analysis.
  49. Name() string
  50. // Provides returns the list of keys of reusable calculated entities.
  51. // Other items may depend on them.
  52. Provides() []string
  53. // Requires returns the list of keys of needed entities which must be supplied in Consume().
  54. Requires() []string
  55. // ListConfigurationOptions returns the list of available options which can be consumed by Configure().
  56. ListConfigurationOptions() []ConfigurationOption
  57. // Configure performs the initial setup of the object by applying parameters from facts.
  58. // It allows to create PipelineItems in a universal way.
  59. Configure(facts map[string]interface{})
  60. // Initialize prepares and resets the item. Consume() requires Initialize()
  61. // to be called at least once beforehand.
  62. Initialize(*git.Repository)
  63. // Consume processes the next commit.
  64. // deps contains the required entities which match Depends(). Besides, it always includes
  65. // "commit" and "index".
  66. // Returns the calculated entities which match Provides().
  67. Consume(deps map[string]interface{}) (map[string]interface{}, error)
  68. }
  69. // FeaturedPipelineItem enables switching the automatic insertion of pipeline items on or off.
  70. type FeaturedPipelineItem interface {
  71. PipelineItem
  72. // Features returns the list of names which enable this item to be automatically inserted
  73. // in Pipeline.DeployItem().
  74. Features() []string
  75. }
  76. // LeafPipelineItem corresponds to the top level pipeline items which produce the end results.
  77. type LeafPipelineItem interface {
  78. PipelineItem
  79. // Flag returns the cmdline name of the item.
  80. Flag() string
  81. // Finalize returns the result of the analysis.
  82. Finalize() interface{}
  83. // Serialize encodes the object returned by Finalize() to Text or Protocol Buffers.
  84. Serialize(result interface{}, binary bool, writer io.Writer) error
  85. }
  86. // PipelineItemRegistry contains all the known PipelineItem-s.
  87. type PipelineItemRegistry struct {
  88. provided map[string][]reflect.Type
  89. registered map[string]reflect.Type
  90. flags map[string]reflect.Type
  91. }
  92. // Register adds another PipelineItem to the registry.
  93. func (registry *PipelineItemRegistry) Register(example PipelineItem) {
  94. t := reflect.TypeOf(example)
  95. registry.registered[example.Name()] = t
  96. if fpi, ok := interface{}(example).(LeafPipelineItem); ok {
  97. registry.flags[fpi.Flag()] = t
  98. }
  99. for _, dep := range example.Provides() {
  100. ts := registry.provided[dep]
  101. if ts == nil {
  102. ts = []reflect.Type{}
  103. }
  104. ts = append(ts, t)
  105. registry.provided[dep] = ts
  106. }
  107. }
  108. func (registry *PipelineItemRegistry) Summon(providesOrName string) []PipelineItem {
  109. if registry.provided == nil {
  110. return []PipelineItem{}
  111. }
  112. ts := registry.provided[providesOrName]
  113. items := []PipelineItem{}
  114. for _, t := range ts {
  115. items = append(items, reflect.New(t.Elem()).Interface().(PipelineItem))
  116. }
  117. if t, exists := registry.registered[providesOrName]; exists {
  118. items = append(items, reflect.New(t.Elem()).Interface().(PipelineItem))
  119. }
  120. return items
  121. }
  122. type arrayFeatureFlags struct {
  123. // Flags containts the features activated through the command line.
  124. Flags []string
  125. // Choices contains all registered features.
  126. Choices map[string]bool
  127. }
  128. func (acf *arrayFeatureFlags) String() string {
  129. return strings.Join([]string(acf.Flags), ", ")
  130. }
  131. func (acf *arrayFeatureFlags) Set(value string) error {
  132. if _, exists := acf.Choices[value]; !exists {
  133. return errors.New(fmt.Sprintf("Feature \"%s\" is not registered.", value))
  134. }
  135. acf.Flags = append(acf.Flags, value)
  136. return nil
  137. }
  138. var featureFlags = arrayFeatureFlags{Flags: []string{}, Choices: map[string]bool{}}
  139. // AddFlags inserts the cmdline options from PipelineItem.ListConfigurationOptions(),
  140. // FeaturedPipelineItem().Features() and LeafPipelineItem.Flag() into the global "flag" parser
  141. // built into the Go runtime.
  142. // Returns the "facts" which can be fed into PipelineItem.Configure() and the dictionary of
  143. // runnable analysis (LeafPipelineItem) choices. E.g. if "BurndownAnalysis" was activated
  144. // through "-burndown" cmdline argument, this mapping would contain ["BurndownAnalysis"] = *true.
  145. func (registry *PipelineItemRegistry) AddFlags() (map[string]interface{}, map[string]*bool) {
  146. flags := map[string]interface{}{}
  147. deployed := map[string]*bool{}
  148. for name, it := range registry.registered {
  149. formatHelp := func(desc string) string {
  150. return fmt.Sprintf("%s [%s]", desc, name)
  151. }
  152. itemIface := reflect.New(it.Elem()).Interface()
  153. for _, opt := range itemIface.(PipelineItem).ListConfigurationOptions() {
  154. var iface interface{}
  155. switch opt.Type {
  156. case BoolConfigurationOption:
  157. iface = interface{}(true)
  158. ptr := (**bool)(unsafe.Pointer(uintptr(unsafe.Pointer(&iface)) + unsafe.Sizeof(&iface)))
  159. *ptr = flag.Bool(opt.Flag, opt.Default.(bool), formatHelp(opt.Description))
  160. case IntConfigurationOption:
  161. iface = interface{}(0)
  162. ptr := (**int)(unsafe.Pointer(uintptr(unsafe.Pointer(&iface)) + unsafe.Sizeof(&iface)))
  163. *ptr = flag.Int(opt.Flag, opt.Default.(int), formatHelp(opt.Description))
  164. case StringConfigurationOption:
  165. iface = interface{}("")
  166. ptr := (**string)(unsafe.Pointer(uintptr(unsafe.Pointer(&iface)) + unsafe.Sizeof(&iface)))
  167. *ptr = flag.String(opt.Flag, opt.Default.(string), formatHelp(opt.Description))
  168. }
  169. flags[opt.Name] = iface
  170. }
  171. if fpi, ok := itemIface.(FeaturedPipelineItem); ok {
  172. for _, f := range fpi.Features() {
  173. featureFlags.Choices[f] = true
  174. }
  175. }
  176. if fpi, ok := itemIface.(LeafPipelineItem); ok {
  177. deployed[fpi.Name()] = flag.Bool(
  178. fpi.Flag(), false, fmt.Sprintf("Runs %s analysis.", fpi.Name()))
  179. }
  180. }
  181. {
  182. // Pipeline flags
  183. iface := interface{}("")
  184. ptr1 := (**string)(unsafe.Pointer(uintptr(unsafe.Pointer(&iface)) + unsafe.Sizeof(&iface)))
  185. *ptr1 = flag.String("dump-dag", "", "Write the pipeline DAG to a Graphviz file.")
  186. flags[ConfigPipelineDumpPath] = iface
  187. iface = interface{}(true)
  188. ptr2 := (**bool)(unsafe.Pointer(uintptr(unsafe.Pointer(&iface)) + unsafe.Sizeof(&iface)))
  189. *ptr2 = flag.Bool("dry-run", false, "Do not run any analyses - only resolve the DAG. " +
  190. "Useful for -dump-dag.")
  191. flags[ConfigPipelineDryRun] = iface
  192. }
  193. features := []string{}
  194. for f := range featureFlags.Choices {
  195. features = append(features, f)
  196. }
  197. flag.Var(&featureFlags, "feature",
  198. fmt.Sprintf("Enables specific analysis features, can be specified "+
  199. "multiple times. Available features: [%s].", strings.Join(features, ", ")))
  200. return flags, deployed
  201. }
  202. // Registry contains all known pipeline item types.
  203. var Registry = &PipelineItemRegistry{
  204. provided: map[string][]reflect.Type{},
  205. registered: map[string]reflect.Type{},
  206. flags: map[string]reflect.Type{},
  207. }
  208. type wrappedPipelineItem struct {
  209. Item PipelineItem
  210. Children []wrappedPipelineItem
  211. }
  212. type Pipeline struct {
  213. // OnProgress is the callback which is invoked in Analyse() to output it's
  214. // progress. The first argument is the number of processed commits and the
  215. // second is the total number of commits.
  216. OnProgress func(int, int)
  217. // Repository points to the analysed Git repository struct from go-git.
  218. repository *git.Repository
  219. // Items are the registered building blocks in the pipeline. The order defines the
  220. // execution sequence.
  221. items []PipelineItem
  222. // The collection of parameters to create items.
  223. facts map[string]interface{}
  224. // Feature flags which enable the corresponding items.
  225. features map[string]bool
  226. }
  227. const FactPipelineCommits = "commits"
  228. func NewPipeline(repository *git.Repository) *Pipeline {
  229. return &Pipeline{
  230. repository: repository,
  231. items: []PipelineItem{},
  232. facts: map[string]interface{}{},
  233. features: map[string]bool{},
  234. }
  235. }
  236. func (pipeline *Pipeline) GetFact(name string) interface{} {
  237. return pipeline.facts[name]
  238. }
  239. func (pipeline *Pipeline) SetFact(name string, value interface{}) {
  240. pipeline.facts[name] = value
  241. }
  242. func (pipeline *Pipeline) GetFeature(name string) (bool, bool) {
  243. val, exists := pipeline.features[name]
  244. return val, exists
  245. }
  246. func (pipeline *Pipeline) SetFeature(name string) {
  247. pipeline.features[name] = true
  248. }
  249. func (pipeline *Pipeline) SetFeaturesFromFlags() {
  250. for _, feature := range featureFlags.Flags {
  251. pipeline.SetFeature(feature)
  252. }
  253. }
  254. func (pipeline *Pipeline) DeployItem(item PipelineItem) PipelineItem {
  255. queue := []PipelineItem{}
  256. queue = append(queue, item)
  257. added := map[string]PipelineItem{}
  258. for _, item := range pipeline.items {
  259. added[item.Name()] = item
  260. }
  261. added[item.Name()] = item
  262. pipeline.AddItem(item)
  263. for len(queue) > 0 {
  264. head := queue[0]
  265. queue = queue[1:]
  266. for _, dep := range head.Requires() {
  267. for _, sibling := range Registry.Summon(dep) {
  268. if _, exists := added[sibling.Name()]; !exists {
  269. disabled := false
  270. // If this item supports features, check them against the activated in pipeline.features
  271. if fpi, matches := interface{}(sibling).(FeaturedPipelineItem); matches {
  272. for _, feature := range fpi.Features() {
  273. if !pipeline.features[feature] {
  274. disabled = true
  275. break
  276. }
  277. }
  278. }
  279. if disabled {
  280. continue
  281. }
  282. added[sibling.Name()] = sibling
  283. queue = append(queue, sibling)
  284. pipeline.AddItem(sibling)
  285. }
  286. }
  287. }
  288. }
  289. return item
  290. }
  291. func (pipeline *Pipeline) AddItem(item PipelineItem) PipelineItem {
  292. pipeline.items = append(pipeline.items, item)
  293. return item
  294. }
  295. func (pipeline *Pipeline) RemoveItem(item PipelineItem) {
  296. for i, reg := range pipeline.items {
  297. if reg == item {
  298. pipeline.items = append(pipeline.items[:i], pipeline.items[i+1:]...)
  299. return
  300. }
  301. }
  302. }
  303. func (pipeline *Pipeline) Len() int {
  304. return len(pipeline.items)
  305. }
  306. // Commits returns the critical path in the repository's history. It starts
  307. // from HEAD and traces commits backwards till the root. When it encounters
  308. // a merge (more than one parent), it always chooses the first parent.
  309. func (pipeline *Pipeline) Commits() []*object.Commit {
  310. result := []*object.Commit{}
  311. repository := pipeline.repository
  312. head, err := repository.Head()
  313. if err != nil {
  314. panic(err)
  315. }
  316. commit, err := repository.CommitObject(head.Hash())
  317. if err != nil {
  318. panic(err)
  319. }
  320. // the first parent matches the head
  321. for ; err != io.EOF; commit, err = commit.Parents().Next() {
  322. if err != nil {
  323. panic(err)
  324. }
  325. result = append(result, commit)
  326. }
  327. // reverse the order
  328. for i, j := 0, len(result)-1; i < j; i, j = i+1, j-1 {
  329. result[i], result[j] = result[j], result[i]
  330. }
  331. return result
  332. }
  333. type sortablePipelineItems []PipelineItem
  334. func (items sortablePipelineItems) Len() int {
  335. return len(items)
  336. }
  337. func (items sortablePipelineItems) Less(i, j int) bool {
  338. return items[i].Name() < items[j].Name()
  339. }
  340. func (items sortablePipelineItems) Swap(i, j int) {
  341. items[i], items[j] = items[j], items[i]
  342. }
  343. func (pipeline *Pipeline) resolve(dumpPath string) {
  344. graph := toposort.NewGraph()
  345. sort.Sort(sortablePipelineItems(pipeline.items))
  346. name2item := map[string]PipelineItem{}
  347. ambiguousMap := map[string][]string{}
  348. nameUsages := map[string]int{}
  349. for _, item := range pipeline.items {
  350. nameUsages[item.Name()]++
  351. }
  352. counters := map[string]int{}
  353. for _, item := range pipeline.items {
  354. name := item.Name()
  355. if nameUsages[name] > 1 {
  356. index := counters[item.Name()] + 1
  357. counters[item.Name()] = index
  358. name = fmt.Sprintf("%s_%d", item.Name(), index)
  359. }
  360. graph.AddNode(name)
  361. name2item[name] = item
  362. for _, key := range item.Provides() {
  363. key = "[" + key + "]"
  364. graph.AddNode(key)
  365. if graph.AddEdge(name, key) > 1 {
  366. if ambiguousMap[key] != nil {
  367. fmt.Fprintln(os.Stderr, "Pipeline:")
  368. for _, item2 := range pipeline.items {
  369. if item2 == item {
  370. fmt.Fprint(os.Stderr, "> ")
  371. }
  372. fmt.Fprint(os.Stderr, item2.Name(), " [")
  373. for i, key2 := range item2.Provides() {
  374. fmt.Fprint(os.Stderr, key2)
  375. if i < len(item.Provides()) - 1 {
  376. fmt.Fprint(os.Stderr, ", ")
  377. }
  378. }
  379. fmt.Fprintln(os.Stderr, "]")
  380. }
  381. panic("Failed to resolve pipeline dependencies: ambiguous graph.")
  382. }
  383. ambiguousMap[key] = graph.FindParents(key)
  384. }
  385. }
  386. }
  387. counters = map[string]int{}
  388. for _, item := range pipeline.items {
  389. name := item.Name()
  390. if nameUsages[name] > 1 {
  391. index := counters[item.Name()] + 1
  392. counters[item.Name()] = index
  393. name = fmt.Sprintf("%s_%d", item.Name(), index)
  394. }
  395. for _, key := range item.Requires() {
  396. key = "[" + key + "]"
  397. if graph.AddEdge(key, name) == 0 {
  398. panic(fmt.Sprintf("Unsatisfied dependency: %s -> %s", key, item.Name()))
  399. }
  400. }
  401. }
  402. if len(ambiguousMap) > 0 {
  403. ambiguous := []string{}
  404. for key := range ambiguousMap {
  405. ambiguous = append(ambiguous, key)
  406. }
  407. sort.Strings(ambiguous)
  408. bfsorder := graph.BreadthSort()
  409. bfsindex := map[string]int{}
  410. for i, s := range bfsorder {
  411. bfsindex[s] = i
  412. }
  413. for len(ambiguous) > 0 {
  414. key := ambiguous[0]
  415. ambiguous = ambiguous[1:]
  416. pair := ambiguousMap[key]
  417. inheritor := pair[1]
  418. if bfsindex[pair[1]] < bfsindex[pair[0]] {
  419. inheritor = pair[0]
  420. }
  421. removed := graph.RemoveEdge(key, inheritor)
  422. cycle := map[string]bool{}
  423. for _, node := range graph.FindCycle(key) {
  424. cycle[node] = true
  425. }
  426. if len(cycle) == 0 {
  427. cycle[inheritor] = true
  428. }
  429. if removed {
  430. graph.AddEdge(key, inheritor)
  431. }
  432. graph.RemoveEdge(inheritor, key)
  433. graph.ReindexNode(inheritor)
  434. // for all nodes key links to except those in cycle, put the link from inheritor
  435. for _, node := range graph.FindChildren(key) {
  436. if _, exists := cycle[node]; !exists {
  437. graph.AddEdge(inheritor, node)
  438. graph.RemoveEdge(key, node)
  439. }
  440. }
  441. graph.ReindexNode(key)
  442. }
  443. }
  444. var graphCopy *toposort.Graph
  445. if dumpPath != "" {
  446. graphCopy = graph.Copy()
  447. }
  448. strplan, ok := graph.Toposort()
  449. if !ok {
  450. panic("Failed to resolve pipeline dependencies: unable to topologically sort the items.")
  451. }
  452. pipeline.items = make([]PipelineItem, 0, len(pipeline.items))
  453. for _, key := range strplan {
  454. if item, ok := name2item[key]; ok {
  455. pipeline.items = append(pipeline.items, item)
  456. }
  457. }
  458. if dumpPath != "" {
  459. // If there is a floating difference, uncomment this:
  460. // fmt.Fprint(os.Stderr, graphCopy.DebugDump())
  461. ioutil.WriteFile(dumpPath, []byte(graphCopy.Serialize(strplan)), 0666)
  462. absPath, _ := filepath.Abs(dumpPath)
  463. fmt.Fprintf(os.Stderr, "Wrote the DAG to %s\n", absPath)
  464. }
  465. }
  466. func (pipeline *Pipeline) Initialize(facts map[string]interface{}) {
  467. if facts == nil {
  468. facts = map[string]interface{}{}
  469. }
  470. if _, exists := facts[FactPipelineCommits]; !exists {
  471. facts[FactPipelineCommits] = pipeline.Commits()
  472. }
  473. dumpPath, _ := facts[ConfigPipelineDumpPath].(string)
  474. pipeline.resolve(dumpPath)
  475. if dryRun, _ := facts[ConfigPipelineDryRun].(bool); dryRun {
  476. return
  477. }
  478. for _, item := range pipeline.items {
  479. item.Configure(facts)
  480. }
  481. for _, item := range pipeline.items {
  482. item.Initialize(pipeline.repository)
  483. }
  484. }
  485. // Run executes the pipeline.
  486. //
  487. // commits is a slice with the sequential commit history. It shall start from
  488. // the root (ascending order).
  489. func (pipeline *Pipeline) Run(commits []*object.Commit) (map[PipelineItem]interface{}, error) {
  490. onProgress := pipeline.OnProgress
  491. if onProgress == nil {
  492. onProgress = func(int, int) {}
  493. }
  494. for index, commit := range commits {
  495. onProgress(index, len(commits))
  496. state := map[string]interface{}{"commit": commit, "index": index}
  497. for _, item := range pipeline.items {
  498. update, err := item.Consume(state)
  499. if err != nil {
  500. fmt.Fprintf(os.Stderr, "%s failed on commit #%d %s\n",
  501. item.Name(), index, commit.Hash.String())
  502. return nil, err
  503. }
  504. for _, key := range item.Provides() {
  505. val, ok := update[key]
  506. if !ok {
  507. panic(fmt.Sprintf("%s: Consume() did not return %s", item.Name(), key))
  508. }
  509. state[key] = val
  510. }
  511. }
  512. }
  513. onProgress(len(commits), len(commits))
  514. result := map[PipelineItem]interface{}{}
  515. for _, item := range pipeline.items {
  516. if fpi, ok := interface{}(item).(LeafPipelineItem); ok {
  517. result[item] = fpi.Finalize()
  518. }
  519. }
  520. return result, nil
  521. }
  522. func LoadCommitsFromFile(path string, repository *git.Repository) ([]*object.Commit, error) {
  523. var file io.ReadCloser
  524. if path != "-" {
  525. var err error
  526. file, err = os.Open(path)
  527. if err != nil {
  528. return nil, err
  529. }
  530. defer file.Close()
  531. } else {
  532. file = os.Stdin
  533. }
  534. scanner := bufio.NewScanner(file)
  535. commits := []*object.Commit{}
  536. for scanner.Scan() {
  537. hash := plumbing.NewHash(scanner.Text())
  538. if len(hash) != 20 {
  539. return nil, errors.New("invalid commit hash " + scanner.Text())
  540. }
  541. commit, err := repository.CommitObject(hash)
  542. if err != nil {
  543. return nil, err
  544. }
  545. commits = append(commits, commit)
  546. }
  547. return commits, nil
  548. }