// forks.go (17 KB)
  1. package core
  2. import (
  3. "fmt"
  4. "log"
  5. "os"
  6. "reflect"
  7. "sort"
  8. "gopkg.in/src-d/go-git.v4/plumbing"
  9. "gopkg.in/src-d/go-git.v4/plumbing/object"
  10. "gopkg.in/src-d/hercules.v6/internal/toposort"
  11. )
// OneShotMergeProcessor provides the convenience method to consume merges only once.
// It remembers the hashes of the merge commits which were already accepted by
// ShouldConsumeCommit().
type OneShotMergeProcessor struct {
	// merges is the set of merge commit hashes which have been seen so far.
	merges map[plumbing.Hash]bool
}
  16. // Initialize resets OneShotMergeProcessor.
  17. func (proc *OneShotMergeProcessor) Initialize() {
  18. proc.merges = map[plumbing.Hash]bool{}
  19. }
  20. // ShouldConsumeCommit returns true on regular commits. It also returns true upon
  21. // the first occurrence of a particular merge commit.
  22. func (proc *OneShotMergeProcessor) ShouldConsumeCommit(deps map[string]interface{}) bool {
  23. commit := deps[DependencyCommit].(*object.Commit)
  24. if commit.NumParents() <= 1 {
  25. return true
  26. }
  27. if !proc.merges[commit.Hash] {
  28. proc.merges[commit.Hash] = true
  29. return true
  30. }
  31. return false
  32. }
  33. // NoopMerger provides an empty Merge() method suitable for PipelineItem.
  34. type NoopMerger struct {
  35. }
  36. // Merge does nothing.
  37. func (merger *NoopMerger) Merge(branches []PipelineItem) {
  38. // no-op
  39. }
  40. // ForkSamePipelineItem clones items by referencing the same origin.
  41. func ForkSamePipelineItem(origin PipelineItem, n int) []PipelineItem {
  42. clones := make([]PipelineItem, n)
  43. for i := 0; i < n; i++ {
  44. clones[i] = origin
  45. }
  46. return clones
  47. }
  48. // ForkCopyPipelineItem clones items by copying them by value from the origin.
  49. func ForkCopyPipelineItem(origin PipelineItem, n int) []PipelineItem {
  50. originValue := reflect.Indirect(reflect.ValueOf(origin))
  51. originType := originValue.Type()
  52. clones := make([]PipelineItem, n)
  53. for i := 0; i < n; i++ {
  54. cloneValue := reflect.New(originType).Elem()
  55. cloneValue.Set(originValue)
  56. clones[i] = cloneValue.Addr().Interface().(PipelineItem)
  57. }
  58. return clones
  59. }
  60. const (
  61. // runActionCommit corresponds to a regular commit
  62. runActionCommit = 0
  63. // runActionFork splits a branch into several parts
  64. runActionFork = iota
  65. // runActionMerge merges several branches together
  66. runActionMerge = iota
  67. // runActionEmerge starts a root branch
  68. runActionEmerge = iota
  69. // runActionDelete removes the branch as it is no longer needed
  70. runActionDelete = iota
  71. // rootBranchIndex is the minimum branch index in the plan
  72. rootBranchIndex = 1
  73. )
  74. // planPrintFunc is used to print the execution plan in prepareRunPlan().
  75. var planPrintFunc = func(args ...interface{}) {
  76. fmt.Fprintln(os.Stderr, args...)
  77. }
// runAction is a single step of the execution plan built by prepareRunPlan().
type runAction struct {
	// Action is one of the runAction* constants.
	Action int
	// Commit is the commit associated with the step. generatePlan() and
	// collectGarbage() leave it nil for the merge and delete steps.
	Commit *object.Commit
	// Items holds the indexes of the branches which the step touches.
	Items []int
}
// orderer is the signature of the function returned by bindOrderNodes(): it
// returns the commit hash strings of the merged DAG nodes in topological order.
type orderer = func(reverse, direction bool) []string
  84. func cloneItems(origin []PipelineItem, n int) [][]PipelineItem {
  85. clones := make([][]PipelineItem, n)
  86. for j := 0; j < n; j++ {
  87. clones[j] = make([]PipelineItem, len(origin))
  88. }
  89. for i, item := range origin {
  90. itemClones := item.Fork(n)
  91. for j := 0; j < n; j++ {
  92. clones[j][i] = itemClones[j]
  93. }
  94. }
  95. return clones
  96. }
  97. func mergeItems(branches [][]PipelineItem) {
  98. buffer := make([]PipelineItem, len(branches)-1)
  99. for i, item := range branches[0] {
  100. for j := 0; j < len(branches)-1; j++ {
  101. buffer[j] = branches[j+1][i]
  102. }
  103. item.Merge(buffer)
  104. }
  105. }
  106. // getMasterBranch returns the branch with the smallest index.
  107. func getMasterBranch(branches map[int][]PipelineItem) []PipelineItem {
  108. minKey := 1 << 31
  109. var minVal []PipelineItem
  110. for key, val := range branches {
  111. if key < minKey {
  112. minKey = key
  113. minVal = val
  114. }
  115. }
  116. return minVal
  117. }
  118. // prepareRunPlan schedules the actions for Pipeline.Run().
  119. func prepareRunPlan(commits []*object.Commit, printResult bool) []runAction {
  120. hashes, dag := buildDag(commits)
  121. leaveRootComponent(hashes, dag)
  122. mergedDag, mergedSeq := mergeDag(hashes, dag)
  123. orderNodes := bindOrderNodes(mergedDag)
  124. collapseFastForwards(orderNodes, hashes, mergedDag, dag, mergedSeq)
  125. /*fmt.Printf("digraph Hercules {\n")
  126. for i, c := range orderNodes(false, false) {
  127. commit := hashes[c]
  128. fmt.Printf(" \"%s\"[label=\"[%d] %s\"]\n", commit.Hash.String(), i, commit.Hash.String()[:6])
  129. for _, child := range mergedDag[commit.Hash] {
  130. fmt.Printf(" \"%s\" -> \"%s\"\n", commit.Hash.String(), child.Hash.String())
  131. }
  132. }
  133. fmt.Printf("}\n")*/
  134. plan := generatePlan(orderNodes, hashes, mergedDag, dag, mergedSeq)
  135. plan = collectGarbage(plan)
  136. if printResult {
  137. for _, p := range plan {
  138. firstItem := p.Items[0]
  139. switch p.Action {
  140. case runActionCommit:
  141. planPrintFunc("C", firstItem, p.Commit.Hash.String())
  142. case runActionFork:
  143. planPrintFunc("F", p.Items)
  144. case runActionMerge:
  145. planPrintFunc("M", p.Items)
  146. case runActionEmerge:
  147. planPrintFunc("E", p.Items)
  148. case runActionDelete:
  149. planPrintFunc("D", p.Items)
  150. }
  151. }
  152. }
  153. return plan
  154. }
  155. // buildDag generates the raw commit DAG and the commit hash map.
  156. func buildDag(commits []*object.Commit) (
  157. map[string]*object.Commit, map[plumbing.Hash][]*object.Commit) {
  158. hashes := map[string]*object.Commit{}
  159. for _, commit := range commits {
  160. hashes[commit.Hash.String()] = commit
  161. }
  162. dag := map[plumbing.Hash][]*object.Commit{}
  163. for _, commit := range commits {
  164. if _, exists := dag[commit.Hash]; !exists {
  165. dag[commit.Hash] = make([]*object.Commit, 0, 1)
  166. }
  167. for _, parent := range commit.ParentHashes {
  168. if _, exists := hashes[parent.String()]; !exists {
  169. continue
  170. }
  171. children := dag[parent]
  172. if children == nil {
  173. children = make([]*object.Commit, 0, 1)
  174. }
  175. dag[parent] = append(children, commit)
  176. }
  177. }
  178. return hashes, dag
  179. }
  180. // leaveRootComponent runs connected components analysis and throws away everything
  181. // but the part which grows from the root.
  182. func leaveRootComponent(
  183. hashes map[string]*object.Commit,
  184. dag map[plumbing.Hash][]*object.Commit) {
  185. visited := map[plumbing.Hash]bool{}
  186. var sets [][]plumbing.Hash
  187. for key := range dag {
  188. if visited[key] {
  189. continue
  190. }
  191. var set []plumbing.Hash
  192. for queue := []plumbing.Hash{key}; len(queue) > 0; {
  193. head := queue[len(queue)-1]
  194. queue = queue[:len(queue)-1]
  195. if visited[head] {
  196. continue
  197. }
  198. set = append(set, head)
  199. visited[head] = true
  200. for _, c := range dag[head] {
  201. if !visited[c.Hash] {
  202. queue = append(queue, c.Hash)
  203. }
  204. }
  205. if commit, exists := hashes[head.String()]; exists {
  206. for _, p := range commit.ParentHashes {
  207. if !visited[p] {
  208. if _, exists := hashes[p.String()]; exists {
  209. queue = append(queue, p)
  210. }
  211. }
  212. }
  213. }
  214. }
  215. sets = append(sets, set)
  216. }
  217. if len(sets) > 1 {
  218. maxlen := 0
  219. maxind := -1
  220. for i, set := range sets {
  221. if len(set) > maxlen {
  222. maxlen = len(set)
  223. maxind = i
  224. }
  225. }
  226. for i, set := range sets {
  227. if i == maxind {
  228. continue
  229. }
  230. for _, h := range set {
  231. log.Printf("warning: dropped %s from the analysis - disjoint", h.String())
  232. delete(dag, h)
  233. delete(hashes, h.String())
  234. }
  235. }
  236. }
  237. }
  238. // bindOrderNodes returns curried "orderNodes" function.
  239. func bindOrderNodes(mergedDag map[plumbing.Hash][]*object.Commit) orderer {
  240. return func(reverse, direction bool) []string {
  241. graph := toposort.NewGraph()
  242. keys := make([]plumbing.Hash, 0, len(mergedDag))
  243. for key := range mergedDag {
  244. keys = append(keys, key)
  245. }
  246. sort.Slice(keys, func(i, j int) bool { return keys[i].String() < keys[j].String() })
  247. for _, key := range keys {
  248. graph.AddNode(key.String())
  249. }
  250. for _, key := range keys {
  251. children := mergedDag[key]
  252. sort.Slice(children, func(i, j int) bool {
  253. return children[i].Hash.String() < children[j].Hash.String()
  254. })
  255. for _, c := range children {
  256. if !direction {
  257. graph.AddEdge(key.String(), c.Hash.String())
  258. } else {
  259. graph.AddEdge(c.Hash.String(), key.String())
  260. }
  261. }
  262. }
  263. order, ok := graph.Toposort()
  264. if !ok {
  265. // should never happen
  266. panic("Could not topologically sort the DAG of commits")
  267. }
  268. if reverse != direction {
  269. // one day this must appear in the standard library...
  270. for i, j := 0, len(order)-1; i < len(order)/2; i, j = i+1, j-1 {
  271. order[i], order[j] = order[j], order[i]
  272. }
  273. }
  274. return order
  275. }
  276. }
  277. // inverts `dag`
  278. func buildParents(dag map[plumbing.Hash][]*object.Commit) map[plumbing.Hash]map[plumbing.Hash]bool {
  279. parents := map[plumbing.Hash]map[plumbing.Hash]bool{}
  280. for key, vals := range dag {
  281. for _, val := range vals {
  282. myps := parents[val.Hash]
  283. if myps == nil {
  284. myps = map[plumbing.Hash]bool{}
  285. parents[val.Hash] = myps
  286. }
  287. myps[key] = true
  288. }
  289. }
  290. return parents
  291. }
  292. // mergeDag turns sequences of consecutive commits into single nodes.
  293. func mergeDag(
  294. hashes map[string]*object.Commit,
  295. dag map[plumbing.Hash][]*object.Commit) (
  296. mergedDag, mergedSeq map[plumbing.Hash][]*object.Commit) {
  297. parents := buildParents(dag)
  298. mergedDag = map[plumbing.Hash][]*object.Commit{}
  299. mergedSeq = map[plumbing.Hash][]*object.Commit{}
  300. visited := map[plumbing.Hash]bool{}
  301. for head := range dag {
  302. if visited[head] {
  303. continue
  304. }
  305. c := head
  306. for true {
  307. nextParents := parents[c]
  308. var next plumbing.Hash
  309. for p := range nextParents {
  310. next = p
  311. break
  312. }
  313. if len(nextParents) != 1 || len(dag[next]) != 1 {
  314. break
  315. }
  316. c = next
  317. }
  318. head = c
  319. var seq []*object.Commit
  320. for true {
  321. visited[c] = true
  322. seq = append(seq, hashes[c.String()])
  323. if len(dag[c]) != 1 {
  324. break
  325. }
  326. c = dag[c][0].Hash
  327. if len(parents[c]) != 1 {
  328. break
  329. }
  330. }
  331. mergedSeq[head] = seq
  332. mergedDag[head] = dag[seq[len(seq)-1].Hash]
  333. }
  334. return
  335. }
// collapseFastForwards removes the fast forward merges.
//
// The merged nodes are visited in the order returned by orderNodes(false, true).
// For every node with two or more children, each child is checked for being
// reachable from that node through another path; if it is, the direct edge
// from the node to that child is dropped. `mergedDag`, `dag` and `mergedSeq`
// are modified in place; the sequences which become linear after the removal
// are concatenated.
func collapseFastForwards(
	orderNodes orderer, hashes map[string]*object.Commit,
	mergedDag, dag, mergedSeq map[plumbing.Hash][]*object.Commit) {
	// parents is the inverted mergedDag; it is kept in sync with the edits below
	parents := buildParents(mergedDag)
	// processed is the set of nodes which the outer loop has already reached
	processed := map[plumbing.Hash]bool{}
	for _, strkey := range orderNodes(false, true) {
		key := hashes[strkey].Hash
		processed[key] = true
	repeat:
		// the node is re-examined from here after it absorbed its only child
		vals, exists := mergedDag[key]
		if !exists {
			// the node could have been merged into another one earlier
			continue
		}
		if len(vals) < 2 {
			continue
		}
		// toRemove collects the children whose direct edge from key is redundant
		toRemove := map[plumbing.Hash]bool{}
		sort.Slice(vals, func(i, j int) bool { return vals[i].Hash.String() < vals[j].Hash.String() })
		for _, child := range vals {
			// walk up from the child's other parents looking for key
			var queue []plumbing.Hash
			visited := map[plumbing.Hash]bool{child.Hash: true}
			childParents := parents[child.Hash]
			childNumOtherParents := 0
			for parent := range childParents {
				if parent != key {
					visited[parent] = true
					childNumOtherParents++
					queue = append(queue, parent)
				}
			}
			var immediateParent plumbing.Hash
			if childNumOtherParents == 1 {
				immediateParent = queue[0]
			}
			for len(queue) > 0 {
				// the slice is used as a stack: the last element is taken first
				head := queue[len(queue)-1]
				queue = queue[:len(queue)-1]
				if processed[head] {
					// processed nodes are not expanded; only reaching key matters
					if head == key {
						toRemove[child.Hash] = true
						if childNumOtherParents == 1 && len(mergedDag[immediateParent]) == 1 {
							// the child is the only continuation of its single remaining
							// parent: glue the two sequences together
							mergedSeq[immediateParent] = append(
								mergedSeq[immediateParent], mergedSeq[child.Hash]...)
							delete(mergedSeq, child.Hash)
							mergedDag[immediateParent] = mergedDag[child.Hash]
							delete(mergedDag, child.Hash)
							parents[child.Hash] = parents[immediateParent]
							// re-point every reference to the child to immediateParent
							for _, vals := range parents {
								for v := range vals {
									if v == child.Hash {
										delete(vals, v)
										vals[immediateParent] = true
										break
									}
								}
							}
						}
						break
					}
				} else {
					for parent := range parents[head] {
						if !visited[parent] {
							// NOTE(review): this marks `head`, not `parent`, as visited, so the
							// same parent may be queued more than once - confirm it is intended.
							visited[head] = true
							queue = append(queue, parent)
						}
					}
				}
			}
		}
		if len(toRemove) == 0 {
			continue
		}
		// update dag
		var newVals []*object.Commit
		// node is the last commit of the sequence which starts at key
		node := mergedSeq[key][len(mergedSeq[key])-1].Hash
		for _, child := range dag[node] {
			if !toRemove[child.Hash] {
				newVals = append(newVals, child)
			}
		}
		dag[node] = newVals
		// update mergedDag
		newVals = []*object.Commit{}
		for _, child := range vals {
			if !toRemove[child.Hash] {
				newVals = append(newVals, child)
			}
		}
		merged := false
		if len(newVals) == 1 {
			onlyChild := newVals[0].Hash
			if len(parents[onlyChild]) == 1 {
				// key and its only remaining child form a linear chain now: absorb the child
				merged = true
				mergedSeq[key] = append(mergedSeq[key], mergedSeq[onlyChild]...)
				delete(mergedSeq, onlyChild)
				mergedDag[key] = mergedDag[onlyChild]
				delete(mergedDag, onlyChild)
				parents[onlyChild] = parents[key]
				// re-point every reference to the absorbed child to key
				for _, vals := range parents {
					for v := range vals {
						if v == onlyChild {
							delete(vals, v)
							vals[key] = true
							break
						}
					}
				}
			}
		}
		// update parents
		for rm := range toRemove {
			delete(parents[rm], key)
		}
		if !merged {
			mergedDag[key] = newVals
		} else {
			// key has got new children after the absorption - check them as well
			goto repeat
		}
	}
}
  457. // generatePlan creates the list of actions from the commit DAG.
  458. func generatePlan(
  459. orderNodes orderer, hashes map[string]*object.Commit,
  460. mergedDag, dag, mergedSeq map[plumbing.Hash][]*object.Commit) []runAction {
  461. parents := buildParents(dag)
  462. var plan []runAction
  463. branches := map[plumbing.Hash]int{}
  464. branchers := map[plumbing.Hash]map[plumbing.Hash]int{}
  465. counter := rootBranchIndex
  466. for _, name := range orderNodes(false, true) {
  467. commit := hashes[name]
  468. if len(parents[commit.Hash]) == 0 {
  469. branches[commit.Hash] = counter
  470. plan = append(plan, runAction{
  471. Action: runActionEmerge,
  472. Commit: commit,
  473. Items: []int{counter},
  474. })
  475. counter++
  476. }
  477. var branch int
  478. {
  479. var exists bool
  480. branch, exists = branches[commit.Hash]
  481. if !exists {
  482. branch = -1
  483. }
  484. }
  485. branchExists := func() bool { return branch >= rootBranchIndex }
  486. appendCommit := func(c *object.Commit, branch int) {
  487. if branch == 0 {
  488. log.Panicf("setting a zero branch for %s", c.Hash.String())
  489. }
  490. plan = append(plan, runAction{
  491. Action: runActionCommit,
  492. Commit: c,
  493. Items: []int{branch},
  494. })
  495. }
  496. appendMergeIfNeeded := func() bool {
  497. if len(parents[commit.Hash]) < 2 {
  498. return false
  499. }
  500. // merge after the merge commit (the first in the sequence)
  501. var items []int
  502. minBranch := 1 << 31
  503. for parent := range parents[commit.Hash] {
  504. parentBranch := -1
  505. if parents, exists := branchers[commit.Hash]; exists {
  506. if inheritedBranch, exists := parents[parent]; exists {
  507. parentBranch = inheritedBranch
  508. }
  509. }
  510. if parentBranch == -1 {
  511. parentBranch = branches[parent]
  512. if parentBranch < rootBranchIndex {
  513. log.Panicf("parent %s > %s does not have a branch assigned",
  514. parent.String(), commit.Hash.String())
  515. }
  516. }
  517. if len(dag[parent]) == 1 && minBranch > parentBranch {
  518. minBranch = parentBranch
  519. }
  520. items = append(items, parentBranch)
  521. if parentBranch != branch {
  522. appendCommit(commit, parentBranch)
  523. }
  524. }
  525. // there should be no duplicates in items
  526. if minBranch < 1<<31 {
  527. branch = minBranch
  528. branches[commit.Hash] = minBranch
  529. } else if !branchExists() {
  530. log.Panicf("failed to assign the branch to merge %s", commit.Hash.String())
  531. }
  532. plan = append(plan, runAction{
  533. Action: runActionMerge,
  534. Commit: nil,
  535. Items: items,
  536. })
  537. return true
  538. }
  539. var head plumbing.Hash
  540. if subseq, exists := mergedSeq[commit.Hash]; exists {
  541. for subseqIndex, offspring := range subseq {
  542. if branchExists() {
  543. appendCommit(offspring, branch)
  544. }
  545. if subseqIndex == 0 {
  546. if !appendMergeIfNeeded() && !branchExists() {
  547. log.Panicf("head of the sequence does not have an assigned branch: %s",
  548. commit.Hash.String())
  549. }
  550. }
  551. }
  552. head = subseq[len(subseq)-1].Hash
  553. branches[head] = branch
  554. } else {
  555. head = commit.Hash
  556. }
  557. if len(mergedDag[commit.Hash]) > 1 {
  558. children := []int{branch}
  559. for i, child := range mergedDag[commit.Hash] {
  560. if i == 0 {
  561. branches[child.Hash] = branch
  562. continue
  563. }
  564. if _, exists := branches[child.Hash]; !exists {
  565. branches[child.Hash] = counter
  566. }
  567. parents := branchers[child.Hash]
  568. if parents == nil {
  569. parents = map[plumbing.Hash]int{}
  570. branchers[child.Hash] = parents
  571. }
  572. parents[head] = counter
  573. children = append(children, counter)
  574. counter++
  575. }
  576. plan = append(plan, runAction{
  577. Action: runActionFork,
  578. Commit: hashes[head.String()],
  579. Items: children,
  580. })
  581. }
  582. }
  583. return plan
  584. }
  585. // collectGarbage inserts `runActionDelete` disposal steps.
  586. func collectGarbage(plan []runAction) []runAction {
  587. // lastMentioned maps branch index to the index inside `plan` when that branch was last used
  588. lastMentioned := map[int]int{}
  589. for i, p := range plan {
  590. firstItem := p.Items[0]
  591. switch p.Action {
  592. case runActionCommit:
  593. lastMentioned[firstItem] = i
  594. if firstItem < rootBranchIndex {
  595. log.Panicf("commit %s does not have an assigned branch",
  596. p.Commit.Hash.String())
  597. }
  598. case runActionFork:
  599. lastMentioned[firstItem] = i
  600. case runActionMerge:
  601. for _, item := range p.Items {
  602. lastMentioned[item] = i
  603. }
  604. case runActionEmerge:
  605. lastMentioned[firstItem] = i
  606. }
  607. }
  608. var garbageCollectedPlan []runAction
  609. lastMentionedArr := make([][2]int, 0, len(lastMentioned)+1)
  610. for key, val := range lastMentioned {
  611. if val != len(plan)-1 {
  612. lastMentionedArr = append(lastMentionedArr, [2]int{val, key})
  613. }
  614. }
  615. if len(lastMentionedArr) == 0 {
  616. // early return - we have nothing to collect
  617. return plan
  618. }
  619. sort.Slice(lastMentionedArr, func(i, j int) bool {
  620. return lastMentionedArr[i][0] < lastMentionedArr[j][0]
  621. })
  622. lastMentionedArr = append(lastMentionedArr, [2]int{len(plan) - 1, -1})
  623. prevpi := -1
  624. for _, pair := range lastMentionedArr {
  625. for pi := prevpi + 1; pi <= pair[0]; pi++ {
  626. garbageCollectedPlan = append(garbageCollectedPlan, plan[pi])
  627. }
  628. if pair[1] >= 0 {
  629. prevpi = pair[0]
  630. garbageCollectedPlan = append(garbageCollectedPlan, runAction{
  631. Action: runActionDelete,
  632. Commit: nil,
  633. Items: []int{pair[1]},
  634. })
  635. }
  636. }
  637. return garbageCollectedPlan
  638. }