forks.go

package core

import (
	"log"
	"reflect"
	"sort"

	"gopkg.in/src-d/go-git.v4/plumbing"
	"gopkg.in/src-d/go-git.v4/plumbing/object"
	"gopkg.in/src-d/hercules.v4/internal/toposort"
)

// OneShotMergeProcessor provides the convenience method to consume merges only once.
type OneShotMergeProcessor struct {
	merges map[plumbing.Hash]bool
}

// Initialize resets OneShotMergeProcessor.
func (proc *OneShotMergeProcessor) Initialize() {
	proc.merges = map[plumbing.Hash]bool{}
}

// ShouldConsumeCommit returns true on regular commits. It also returns true upon
// the first occurrence of a particular merge commit.
func (proc *OneShotMergeProcessor) ShouldConsumeCommit(deps map[string]interface{}) bool {
	commit := deps[DependencyCommit].(*object.Commit)
	if commit.NumParents() <= 1 {
		return true
	}
	if !proc.merges[commit.Hash] {
		proc.merges[commit.Hash] = true
		return true
	}
	return false
}
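
// Illustrative embedding sketch (an assumption for documentation purposes, not
// part of this file): a PipelineItem would embed OneShotMergeProcessor and gate
// its commit processing with ShouldConsumeCommit() so that a merge commit which
// arrives on several branches is handled only once. The Consume signature shown
// here is assumed for illustration.
//
//	type myItem struct {
//		NoopMerger
//		OneShotMergeProcessor
//	}
//
//	func (item *myItem) Consume(deps map[string]interface{}) (map[string]interface{}, error) {
//		if !item.ShouldConsumeCommit(deps) {
//			return nil, nil // this merge commit was already processed on another branch
//		}
//		// ... regular per-commit work goes here ...
//		return nil, nil
//	}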

// IsMergeCommit indicates whether the commit is a merge or not.
func IsMergeCommit(deps map[string]interface{}) bool {
	return deps[DependencyCommit].(*object.Commit).NumParents() > 1
}

// NoopMerger provides an empty Merge() method suitable for PipelineItem.
type NoopMerger struct {
}

// Merge does nothing.
func (merger *NoopMerger) Merge(branches []PipelineItem) {
	// no-op
}

// ForkSamePipelineItem clones items by referencing the same origin.
func ForkSamePipelineItem(origin PipelineItem, n int) []PipelineItem {
	clones := make([]PipelineItem, n)
	for i := 0; i < n; i++ {
		clones[i] = origin
	}
	return clones
}

// ForkCopyPipelineItem clones items by copying them by value from the origin.
func ForkCopyPipelineItem(origin PipelineItem, n int) []PipelineItem {
	originValue := reflect.Indirect(reflect.ValueOf(origin))
	originType := originValue.Type()
	clones := make([]PipelineItem, n)
	for i := 0; i < n; i++ {
		cloneValue := reflect.New(originType).Elem()
		cloneValue.Set(originValue)
		clones[i] = cloneValue.Addr().Interface().(PipelineItem)
	}
	return clones
}
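
// The practical difference between the two fork helpers, as a sketch:
//
//	same := ForkSamePipelineItem(item, 2)
//	// same[0] == same[1] == item: every "clone" shares the origin's state.
//
//	copies := ForkCopyPipelineItem(item, 2)
//	// copies[0] and copies[1] are independent shallow copies of *item;
//	// nested pointers, maps and slices still alias the origin's values.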

const (
	// runActionCommit corresponds to a regular commit
	runActionCommit = 0
	// runActionFork splits a branch into several parts
	runActionFork = iota
	// runActionMerge merges several branches together
	runActionMerge = iota
	// runActionDelete removes the branch as it is no longer needed
	runActionDelete = iota
)

type runAction struct {
	Action int
	Commit *object.Commit
	Items  []int
}

type orderer = func(reverse, direction bool) []string
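
// Items holds the branch indexes an action touches (derived from generatePlan
// and optimizePlan below): a single branch for runActionCommit and
// runActionDelete, the origin branch followed by the spawned branches for
// runActionFork, and all participating branches for runActionMerge.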

// cloneItems forks every item of the origin pipeline n times and returns the
// result transposed: clones[j][i] is the j-th fork of origin[i].
func cloneItems(origin []PipelineItem, n int) [][]PipelineItem {
	clones := make([][]PipelineItem, n)
	for j := 0; j < n; j++ {
		clones[j] = make([]PipelineItem, len(origin))
	}
	for i, item := range origin {
		itemClones := item.Fork(n)
		for j := 0; j < n; j++ {
			clones[j][i] = itemClones[j]
		}
	}
	return clones
}

// mergeItems merges the state of all the other branches into the first one:
// each item of branches[0] receives the corresponding items of branches[1:]
// through its Merge() method.
func mergeItems(branches [][]PipelineItem) {
	buffer := make([]PipelineItem, len(branches)-1)
	for i, item := range branches[0] {
		for j := 0; j < len(branches)-1; j++ {
			buffer[j] = branches[j+1][i]
		}
		item.Merge(buffer)
	}
}

// getMasterBranch returns the branch with the smallest index.
func getMasterBranch(branches map[int][]PipelineItem) []PipelineItem {
	minKey := 1 << 31
	var minVal []PipelineItem
	for key, val := range branches {
		if key < minKey {
			minKey = key
			minVal = val
		}
	}
	return minVal
}

// prepareRunPlan schedules the actions for Pipeline.Run().
func prepareRunPlan(commits []*object.Commit) []runAction {
	hashes, dag := buildDag(commits)
	leaveRootComponent(hashes, dag)
	numParents := bindNumParents(hashes, dag)
	mergedDag, mergedSeq := mergeDag(numParents, hashes, dag)
	orderNodes := bindOrderNodes(mergedDag)
	collapseFastForwards(orderNodes, hashes, mergedDag, dag, mergedSeq)
	/*fmt.Printf("digraph Hercules {\n")
	for i, c := range order {
		commit := hashes[c]
		fmt.Printf(" \"%s\"[label=\"[%d] %s\"]\n", commit.Hash.String(), i, commit.Hash.String()[:6])
		for _, child := range mergedDag[commit.Hash] {
			fmt.Printf(" \"%s\" -> \"%s\"\n", commit.Hash.String(), child.Hash.String())
		}
	}
	fmt.Printf("}\n")*/
	plan := generatePlan(orderNodes, numParents, hashes, mergedDag, dag, mergedSeq)
	plan = optimizePlan(plan)
	return plan
}
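
// For a small "diamond" history the resulting plan looks roughly like this
// (illustrative sketch; the relative order of B and C depends on their hashes,
// and the merge commit D is replayed once per participating branch):
//
//	A---B---D          Commit A  [0]
//	 \     /           Fork      [0 1]
//	  `-C-'            Commit B  [0]
//	                   Commit C  [1]
//	                   Commit D  [0]
//	                   Commit D  [1]
//	                   Merge     [0 1]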

// buildDag generates the raw commit DAG and the commit hash map.
func buildDag(commits []*object.Commit) (
	map[string]*object.Commit, map[plumbing.Hash][]*object.Commit) {
	hashes := map[string]*object.Commit{}
	for _, commit := range commits {
		hashes[commit.Hash.String()] = commit
	}
	dag := map[plumbing.Hash][]*object.Commit{}
	for _, commit := range commits {
		if _, exists := dag[commit.Hash]; !exists {
			dag[commit.Hash] = make([]*object.Commit, 0, 1)
		}
		for _, parent := range commit.ParentHashes {
			if _, exists := hashes[parent.String()]; !exists {
				continue
			}
			children := dag[parent]
			if children == nil {
				children = make([]*object.Commit, 0, 1)
			}
			dag[parent] = append(children, commit)
		}
	}
	return hashes, dag
}
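
// Sketch of buildDag's output for a linear three-commit history where B's
// parent is A and C's parent is B (hashes abbreviated):
//
//	hashes: {"A": A, "B": B, "C": C}
//	dag:    {A: [B], B: [C], C: []}
//
// Parents that are not present in the supplied commit list are skipped, so
// such commits behave as roots of the DAG.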

// bindNumParents returns the curried "numParents" function.
func bindNumParents(
	hashes map[string]*object.Commit,
	dag map[plumbing.Hash][]*object.Commit) func(c *object.Commit) int {
	return func(c *object.Commit) int {
		r := 0
		for _, parent := range c.ParentHashes {
			if p, exists := hashes[parent.String()]; exists {
				for _, pc := range dag[p.Hash] {
					if pc.Hash == c.Hash {
						r++
						break
					}
				}
			}
		}
		return r
	}
}
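
// Note: the returned closure counts only the parents which are present in the
// analysed commit set and connected to c in the DAG, so the result can be
// smaller than c.NumParents() when the commit list does not cover the whole
// history.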

// leaveRootComponent runs connected components analysis and throws away everything
// but the part which grows from the root.
func leaveRootComponent(
	hashes map[string]*object.Commit,
	dag map[plumbing.Hash][]*object.Commit) {
	visited := map[plumbing.Hash]bool{}
	var sets [][]plumbing.Hash
	for key := range dag {
		if visited[key] {
			continue
		}
		var set []plumbing.Hash
		for queue := []plumbing.Hash{key}; len(queue) > 0; {
			head := queue[len(queue)-1]
			queue = queue[:len(queue)-1]
			if visited[head] {
				continue
			}
			set = append(set, head)
			visited[head] = true
			for _, c := range dag[head] {
				if !visited[c.Hash] {
					queue = append(queue, c.Hash)
				}
			}
			if commit, exists := hashes[head.String()]; exists {
				for _, p := range commit.ParentHashes {
					if !visited[p] {
						if _, exists := hashes[p.String()]; exists {
							queue = append(queue, p)
						}
					}
				}
			}
		}
		sets = append(sets, set)
	}
	if len(sets) > 1 {
		maxlen := 0
		maxind := -1
		for i, set := range sets {
			if len(set) > maxlen {
				maxlen = len(set)
				maxind = i
			}
		}
		for i, set := range sets {
			if i == maxind {
				continue
			}
			for _, h := range set {
				log.Printf("warning: dropped %s from the analysis - disjoint", h.String())
				delete(dag, h)
				delete(hashes, h.String())
			}
		}
	}
}
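
// The traversal above follows both child edges (via dag) and parent edges (via
// ParentHashes), so the sets are the weakly connected components of the commit
// graph; only the largest one is kept, on the assumption that it is the
// component which grows from the root.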

// bindOrderNodes returns the curried "orderNodes" function.
func bindOrderNodes(mergedDag map[plumbing.Hash][]*object.Commit) orderer {
	return func(reverse, direction bool) []string {
		graph := toposort.NewGraph()
		keys := make([]plumbing.Hash, 0, len(mergedDag))
		for key := range mergedDag {
			keys = append(keys, key)
		}
		sort.Slice(keys, func(i, j int) bool { return keys[i].String() < keys[j].String() })
		for _, key := range keys {
			graph.AddNode(key.String())
		}
		for _, key := range keys {
			children := mergedDag[key]
			sort.Slice(children, func(i, j int) bool {
				return children[i].Hash.String() < children[j].Hash.String()
			})
			for _, c := range children {
				if !direction {
					graph.AddEdge(key.String(), c.Hash.String())
				} else {
					graph.AddEdge(c.Hash.String(), key.String())
				}
			}
		}
		order, ok := graph.Toposort()
		if !ok {
			// should never happen
			panic("Could not topologically sort the DAG of commits")
		}
		if reverse != direction {
			// one day this must appear in the standard library...
			for i, j := 0, len(order)-1; i < len(order)/2; i, j = i+1, j-1 {
				order[i], order[j] = order[j], order[i]
			}
		}
		return order
	}
}
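
// The returned orderer yields a deterministic topological order of the merged
// DAG: keys and children are sorted by hash beforehand, `direction` flips the
// edge orientation fed to the topological sort, and the output is reversed
// whenever `reverse` differs from `direction`.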

// mergeDag turns sequences of consecutive commits into single nodes.
func mergeDag(
	numParents func(c *object.Commit) int,
	hashes map[string]*object.Commit,
	dag map[plumbing.Hash][]*object.Commit) (
	mergedDag, mergedSeq map[plumbing.Hash][]*object.Commit) {
	parentOf := func(c *object.Commit) plumbing.Hash {
		var parent plumbing.Hash
		for _, p := range c.ParentHashes {
			if _, exists := hashes[p.String()]; exists {
				if parent != plumbing.ZeroHash {
					// more than one parent
					return plumbing.ZeroHash
				}
				parent = p
			}
		}
		return parent
	}
	mergedDag = map[plumbing.Hash][]*object.Commit{}
	mergedSeq = map[plumbing.Hash][]*object.Commit{}
	visited := map[plumbing.Hash]bool{}
	for ch := range dag {
		c := hashes[ch.String()]
		if visited[c.Hash] {
			continue
		}
		for {
			parent := parentOf(c)
			if parent == plumbing.ZeroHash || len(dag[parent]) != 1 {
				break
			}
			c = hashes[parent.String()]
		}
		head := c
		var seq []*object.Commit
		children := dag[c.Hash]
		for {
			visited[c.Hash] = true
			seq = append(seq, c)
			if len(children) != 1 {
				break
			}
			c = children[0]
			children = dag[c.Hash]
			if numParents(c) != 1 {
				break
			}
		}
		mergedSeq[head.Hash] = seq
		mergedDag[head.Hash] = dag[seq[len(seq)-1].Hash]
	}
	return
}
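
// Illustrative effect (sketch): a linear chain A -> B -> C -> D with no forks
// or merges in between collapses into a single node keyed by A, with
// mergedSeq[A] = [A B C D] and mergedDag[A] = dag[D]; forks and merges always
// start a new node.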

// collapseFastForwards removes the fast-forward merges.
func collapseFastForwards(
	orderNodes orderer, hashes map[string]*object.Commit,
	mergedDag, dag, mergedSeq map[plumbing.Hash][]*object.Commit) {
	for _, strkey := range orderNodes(true, false) {
		key := hashes[strkey].Hash
		vals, exists := mergedDag[key]
		if !exists {
			continue
		}
		if len(vals) == 2 {
			grand1 := mergedDag[vals[0].Hash]
			grand2 := mergedDag[vals[1].Hash]
			if len(grand2) == 1 && vals[0].Hash == grand2[0].Hash {
				mergedDag[key] = mergedDag[vals[0].Hash]
				dag[key] = vals[1:]
				delete(mergedDag, vals[0].Hash)
				delete(mergedDag, vals[1].Hash)
				mergedSeq[key] = append(mergedSeq[key], mergedSeq[vals[1].Hash]...)
				mergedSeq[key] = append(mergedSeq[key], mergedSeq[vals[0].Hash]...)
				delete(mergedSeq, vals[0].Hash)
				delete(mergedSeq, vals[1].Hash)
			}
			// symmetric
			if len(grand1) == 1 && vals[1].Hash == grand1[0].Hash {
				mergedDag[key] = mergedDag[vals[1].Hash]
				dag[key] = vals[:1]
				delete(mergedDag, vals[0].Hash)
				delete(mergedDag, vals[1].Hash)
				mergedSeq[key] = append(mergedSeq[key], mergedSeq[vals[0].Hash]...)
				mergedSeq[key] = append(mergedSeq[key], mergedSeq[vals[1].Hash]...)
				delete(mergedSeq, vals[0].Hash)
				delete(mergedSeq, vals[1].Hash)
			}
		}
	}
}
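
// What gets collapsed (sketch): after mergeDag, a fast-forwarded merge shows up
// as a triangle key -> A, key -> B, B -> A in the merged DAG. The two child
// sequences are folded back into key's own sequence, and the direct edge from
// key to the child that is reachable through the other one is dropped from
// dag[key], so no fork/merge pair is generated for it later.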

// generatePlan creates the list of actions from the commit DAG.
func generatePlan(
	orderNodes orderer, numParents func(c *object.Commit) int,
	hashes map[string]*object.Commit,
	mergedDag, dag, mergedSeq map[plumbing.Hash][]*object.Commit) []runAction {
	var plan []runAction
	branches := map[plumbing.Hash]int{}
	branchers := map[plumbing.Hash]map[plumbing.Hash]int{}
	counter := 1
	for seqIndex, name := range orderNodes(false, true) {
		commit := hashes[name]
		if seqIndex == 0 {
			branches[commit.Hash] = 0
		}
		var branch int
		{
			var exists bool
			branch, exists = branches[commit.Hash]
			if !exists {
				branch = -1
			}
		}
		branchExists := func() bool { return branch >= 0 }
		appendCommit := func(c *object.Commit, branch int) {
			plan = append(plan, runAction{
				Action: runActionCommit,
				Commit: c,
				Items:  []int{branch},
			})
		}
		appendMergeIfNeeded := func() {
			if numParents(commit) < 2 {
				return
			}
			// merge after the merge commit (the first in the sequence)
			var items []int
			minBranch := 1 << 31
			for _, parent := range commit.ParentHashes {
				if _, exists := hashes[parent.String()]; !exists {
					continue
				}
				parentBranch := -1
				if parents, exists := branchers[commit.Hash]; exists {
					if inheritedBranch, exists := parents[parent]; exists {
						parentBranch = inheritedBranch
					}
				}
				if parentBranch == -1 {
					parentBranch = branches[parent]
				}
				if len(dag[parent]) == 1 && minBranch > parentBranch {
					minBranch = parentBranch
				}
				items = append(items, parentBranch)
				if parentBranch != branch {
					appendCommit(commit, parentBranch)
				}
			}
			if minBranch < 1<<31 {
				branch = minBranch
				branches[commit.Hash] = minBranch
			} else if !branchExists() {
				log.Panicf("!branchExists(%s)", commit.Hash.String())
			}
			plan = append(plan, runAction{
				Action: runActionMerge,
				Commit: nil,
				Items:  items,
			})
		}
		var head plumbing.Hash
		if subseq, exists := mergedSeq[commit.Hash]; exists {
			for subseqIndex, offspring := range subseq {
				if branchExists() {
					appendCommit(offspring, branch)
				}
				if subseqIndex == 0 {
					appendMergeIfNeeded()
				}
			}
			head = subseq[len(subseq)-1].Hash
			branches[head] = branch
		} else {
			head = commit.Hash
		}
		if len(mergedDag[commit.Hash]) > 1 {
			children := []int{branch}
			for i, child := range mergedDag[commit.Hash] {
				if i == 0 {
					branches[child.Hash] = branch
					continue
				}
				if _, exists := branches[child.Hash]; !exists {
					branches[child.Hash] = counter
				}
				parents := branchers[child.Hash]
				if parents == nil {
					parents = map[plumbing.Hash]int{}
					branchers[child.Hash] = parents
				}
				parents[head] = counter
				children = append(children, counter)
				counter++
			}
			plan = append(plan, runAction{
				Action: runActionFork,
				Commit: nil,
				Items:  children,
			})
		}
	}
	return plan
}
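
// Bookkeeping notes (derived from the code above): `branches` maps a node's
// head commit to the branch index that currently owns it, while `branchers`
// remembers, for each forked child, which parent head assigned it which new
// branch index. Roughly, a node emits its sequence of commits on its branch
// (when it has one); when the node is a merge, its head commit is scheduled on
// every involved branch before a single runActionMerge over those branches; a
// node with more than one merged child is followed by a runActionFork.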

// optimizePlan removes "dead" nodes and inserts `runActionDelete` disposal steps.
//
// | *
// * /
// |\/
// |/
// *
//
func optimizePlan(plan []runAction) []runAction {
	// lives maps branch index to the number of commits in that branch
	lives := map[int]int{}
	// lastMentioned maps branch index to the index inside `plan` at which that branch was last used
	lastMentioned := map[int]int{}
	for i, p := range plan {
		firstItem := p.Items[0]
		switch p.Action {
		case runActionCommit:
			lives[firstItem]++
			lastMentioned[firstItem] = i
		case runActionFork:
			lastMentioned[firstItem] = i
		case runActionMerge:
			for _, item := range p.Items {
				lastMentioned[item] = i
			}
		}
	}
	branchesToDelete := map[int]bool{}
	for key, life := range lives {
		if life == 1 {
			branchesToDelete[key] = true
			delete(lastMentioned, key)
		}
	}
	var optimizedPlan []runAction
	lastMentionedArr := make([][2]int, 0, len(lastMentioned)+1)
	for key, val := range lastMentioned {
		if val != len(plan)-1 {
			lastMentionedArr = append(lastMentionedArr, [2]int{val, key})
		}
	}
	if len(lastMentionedArr) == 0 && len(branchesToDelete) == 0 {
		// early return - we have nothing to optimize
		return plan
	}
	sort.Slice(lastMentionedArr, func(i, j int) bool {
		return lastMentionedArr[i][0] < lastMentionedArr[j][0]
	})
	lastMentionedArr = append(lastMentionedArr, [2]int{len(plan) - 1, -1})
	prevpi := -1
	for _, pair := range lastMentionedArr {
		for pi := prevpi + 1; pi <= pair[0]; pi++ {
			p := plan[pi]
			switch p.Action {
			case runActionCommit:
				if !branchesToDelete[p.Items[0]] {
					optimizedPlan = append(optimizedPlan, p)
				}
			case runActionFork:
				var newBranches []int
				for _, b := range p.Items {
					if !branchesToDelete[b] {
						newBranches = append(newBranches, b)
					}
				}
				if len(newBranches) > 1 {
					optimizedPlan = append(optimizedPlan, runAction{
						Action: runActionFork,
						Commit: p.Commit,
						Items:  newBranches,
					})
				}
			case runActionMerge:
				var newBranches []int
				for _, b := range p.Items {
					if !branchesToDelete[b] {
						newBranches = append(newBranches, b)
					}
				}
				if len(newBranches) > 1 {
					optimizedPlan = append(optimizedPlan, runAction{
						Action: runActionMerge,
						Commit: p.Commit,
						Items:  newBranches,
					})
				}
			}
		}
		if pair[1] >= 0 {
			prevpi = pair[0]
			optimizedPlan = append(optimizedPlan, runAction{
				Action: runActionDelete,
				Commit: nil,
				Items:  []int{pair[1]},
			})
		}
	}
	// a single commit can be detected as redundant
	if len(optimizedPlan) > 0 {
		return optimizedPlan
	}
	return plan
	// TODO(vmarkovtsev): there can also be duplicate redundant merges, e.g.
	/*
		0 4e34f03d829fbacb71cde0e010de87ea945dc69a [3]
		0 4e34f03d829fbacb71cde0e010de87ea945dc69a [12]
		2 [3 12]
		0 06716c2b39422938b77ddafa4d5c39bb9e4476da [3]
		0 06716c2b39422938b77ddafa4d5c39bb9e4476da [12]
		2 [3 12]
		0 1219c7bf9e0e1a93459a052ab8b351bfc379dc19 [12]
	*/
}
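
// A note on reading the trace in the TODO above: the first column is the raw
// runAction code (0 = runActionCommit, 2 = runActionMerge), followed by the
// commit hash for commits and by the Items list, so it shows the same commit
// replayed on branches 3 and 12 before each merge of those two branches.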