Overview
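This class cannot be found in Oracle's official API documentation (it is a package-private class in `java.util`), but it does appear in the OpenJDK source code: the object-array `sort` methods of `Arrays` use it for sorting. It combines merge sort with (binary) insertion sort and adds several optimizations. For arrays that are already partially sorted, it runs in far less than O(n log n) time, reaching O(n) in the best case; for randomly ordered arrays it takes O(n log n), and its average time complexity is also O(n log n). I strongly recommend watching the Timsort visualization on YouTube before reading this post: afterwards you will immediately have an intuitive picture of how the algorithm runs. Then read the Wikipedia entry: Timsort. This sorting algorithm is used in Java SE 7, Android and GNU Octave. Two very good articles are also recommended at the end of this post; if you want to really understand TimSort, they are worth reading.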
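To see the most visible consequence of TimSort from the caller's side, here is a small usage sketch (not part of the original post). It relies only on the public `Arrays.sort(T[], Comparator)` method, which its javadoc guarantees to be stable and which, in OpenJDK 7 and later, is backed by TimSort. The `Item` class and the `sortByKey` helper are hypothetical names made up for this example.

```java
import java.util.Arrays;
import java.util.Comparator;

public class StableSortDemo {
    // Hypothetical record-like class: sorted by key, the label records the original order.
    static final class Item {
        final int key;
        final String label;
        Item(int key, String label) { this.key = key; this.label = label; }
        @Override public String toString() { return key + label; }
    }

    // Sorts the items in place by key and returns them as a string.
    // Arrays.sort(T[], Comparator) is documented to be stable.
    static String sortByKey(Item[] items) {
        Arrays.sort(items, Comparator.comparingInt((Item it) -> it.key));
        return Arrays.toString(items);
    }

    public static void main(String[] args) {
        Item[] items = {
            new Item(2, "a"), new Item(1, "b"), new Item(2, "c"),
            new Item(1, "d"), new Item(0, "e")
        };
        System.out.println(sortByKey(items)); // [0e, 1b, 1d, 2a, 2c]
    }
}
```

Items with equal keys (1b/1d and 2a/2c) keep their original relative order, which is exactly the stability guarantee the merge-based design provides.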
`TimSort` is a port of the sorting algorithm that Tim Peters implemented for Python; the implementation comes from listobject.c.
The original paper:
Peter McIlroy, "Optimistic Sorting and Information Theoretic Complexity", SODA (Fourth Annual ACM-SIAM Symposium on Discrete Algorithms), pp. 467-474, Austin, Texas, 25-27 January 1993.
Implementation
```java
static <T> void sort(T[] a, int lo, int hi, Comparator<? super T> c) {
    if (c == null) {
        Arrays.sort(a, lo, hi);
        return;
    }

    rangeCheck(a.length, lo, hi);
    int nRemaining = hi - lo;
    if (nRemaining < 2)
        return;  // Arrays of size 0 and 1 are always sorted

    // If array is small, do a "mini-TimSort" with no merges
    if (nRemaining < MIN_MERGE) {
        int initRunLen = countRunAndMakeAscending(a, lo, hi, c);
        binarySort(a, lo, hi, lo + initRunLen, c);
        return;
    }

    /**
     * March over the array once, left to right, finding natural runs,
     * extending short natural runs to minRun elements, and merging runs
     * to maintain stack invariant.
     */
    TimSort<T> ts = new TimSort<>(a, c);
    int minRun = minRunLength(nRemaining);
    do {
        // Identify next run
        int runLen = countRunAndMakeAscending(a, lo, hi, c);

        // If run is short, extend to min(minRun, nRemaining)
        if (runLen < minRun) {
            int force = nRemaining <= minRun ? nRemaining : minRun;
            binarySort(a, lo, lo + force, lo + runLen, c);
            runLen = force;
        }

        // Push run onto pending-run stack, and maybe merge
        ts.pushRun(lo, runLen);
        ts.mergeCollapse();

        // Advance to find next run
        lo += runLen;
        nRemaining -= runLen;
    } while (nRemaining != 0);

    // Merge all remaining runs to complete sort
    assert lo == hi;
    ts.mergeForceCollapse();
    assert ts.stackSize == 1;
}
```
Let's go through it piece by piece:
```java
if (c == null) {
    Arrays.sort(a, lo, hi);
    return;
}
```
If no `Comparator` is supplied, it calls `Arrays.sort(a, lo, hi)`, which in turn delegates to `ComparableTimSort`. That class sorts elements that implement `Comparable` when no `Comparator` is given; the algorithm is the same as here, and only the way elements are compared differs.
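A quick sketch of the null-comparator path using only the public API: the javadoc of `Arrays.sort(T[], int, int, Comparator)` says that a `null` comparator means the elements' natural ordering is used, so both calls below should produce the same result. `NullComparatorDemo` and `sortRange` are hypothetical names for illustration.

```java
import java.util.Arrays;
import java.util.Comparator;

public class NullComparatorDemo {
    // Sorts a copy of src over the half-open range [from, to) with comparator c.
    // A null comparator means "use natural ordering" (elements must be Comparable).
    static String[] sortRange(String[] src, int from, int to, Comparator<String> c) {
        String[] a = src.clone();
        Arrays.sort(a, from, to, c);
        return a;
    }

    public static void main(String[] args) {
        String[] words = {"pear", "fig", "apple", "kiwi", "date"};
        String[] natural = sortRange(words, 1, 4, null);
        String[] explicit = sortRange(words, 1, 4, Comparator.naturalOrder());
        System.out.println(Arrays.toString(natural));        // [pear, apple, fig, kiwi, date]
        System.out.println(Arrays.equals(natural, explicit)); // true
    }
}
```

Only indices 1 to 3 are sorted; `pear` and `date` outside the range stay where they were.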
Next comes the main body of the algorithm:
```java
if (nRemaining < 2)
    return;  // Arrays of size 0 and 1 are always sorted

// If array is small, do a "mini-TimSort" with no merges
if (nRemaining < MIN_MERGE) {
    int initRunLen = countRunAndMakeAscending(a, lo, hi, c);
    binarySort(a, lo, hi, lo + initRunLen, c);
    return;
}
```
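- If there are fewer than 2 elements, return immediately: an array of 0 or 1 elements is always sorted.
- If the number of elements is below a threshold (`MIN_MERGE`, 32 by default), find the initial run with `countRunAndMakeAscending` and finish with `binarySort`. This is a "mini-TimSort" that involves no merging.
- Otherwise, the key `do-while` loop repeatedly identifies a run, extends it if it is too short, pushes it onto the run stack and merges, until all elements have been processed.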
```java
TimSort<T> ts = new TimSort<>(a, c);
int minRun = minRunLength(nRemaining);
do {
    ...
} while (nRemaining != 0);
```
`minRunLength` computes the minimum length of a run; any natural run shorter than this is extended.
```java
static int minRunLength(int n) {
    assert n >= 0;
    int r = 0;      // Becomes 1 if any 1 bits are shifted off
    while (n >= MIN_MERGE) {
        r |= (n & 1);
        n >>= 1;
    }
    return n + r;
}
```
First, here is how n maps to minRunLength(n):
- n = 0 to 31: minRunLength(n) = n
- n = 32 to 63: 32 → 16, 33-34 → 17, 35-36 → 18, 37-38 → 19, 39-40 → 20, 41-42 → 21, 43-44 → 22, 45-46 → 23, 47-48 → 24, 49-50 → 25, 51-52 → 26, 53-54 → 27, 55-56 → 28, 57-58 → 29, 59-60 → 30, 61-62 → 31, 63 → 32
- n = 64 to 99: 64 → 16, 65-68 → 17, 69-72 → 18, 73-76 → 19, 77-80 → 20, 81-84 → 21, 85-88 → 22, 89-92 → 23, 93-96 → 24, 97-99 → 25
- ...
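From this table you can probably guess what the function does; here is the explanation.
The function computes, from n, the minimum length of a natural run. `MIN_MERGE` defaults to 32; if n is smaller than that, n itself is returned. Otherwise n is shifted right repeatedly until it drops below `MIN_MERGE`, and `r` becomes 1 if any bit shifted off along the way was a 1. The function returns `n + r`: n now holds the 5 most significant bits of the original value (a number between 16 and 31), and adding `r` rounds up whenever any of the discarded low bits was nonzero. According to the comment in the JDK source, the goal is that n / minRun is exactly a power of two, or close to but strictly less than one, so that the final merges are well balanced.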
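The following standalone sketch reproduces the `minRunLength` logic shown above (with `MIN_MERGE` assumed to be 32, the default) and prints n / minRun for a few sizes, so the "power of two or slightly less" property can be checked by eye. `MinRunDemo` is a hypothetical class name; the JDK's own method is private to `TimSort`.

```java
public class MinRunDemo {
    static final int MIN_MERGE = 32; // assumed JDK default

    // Same logic as TimSort.minRunLength: shift n right until it is below
    // MIN_MERGE; r becomes 1 if any shifted-off bit was a 1 (round up).
    static int minRunLength(int n) {
        if (n < 0) throw new IllegalArgumentException("n must be >= 0");
        int r = 0;
        while (n >= MIN_MERGE) {
            r |= (n & 1);
            n >>= 1;
        }
        return n + r;
    }

    public static void main(String[] args) {
        int[] samples = {31, 32, 33, 63, 64, 65, 99, 1000, 1 << 20};
        for (int n : samples) {
            int k = minRunLength(n);
            // For n >= MIN_MERGE, n / k is a power of two or just below one.
            System.out.printf("n=%d  minRun=%d  n/minRun=%.2f%n", n, k, (double) n / k);
        }
    }
}
```

For example, 1000 gives a minimum run of 32, so about 31.25 runs, just under 32, while 2^20 gives 16 and exactly 65536 runs.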
Now let's see what happens inside the `do-while` loop.
```java
TimSort<T> ts = new TimSort<>(a, c);
int minRun = minRunLength(nRemaining);
do {
    // Identify next run
    int runLen = countRunAndMakeAscending(a, lo, hi, c);

    // If run is short, extend to min(minRun, nRemaining)
    if (runLen < minRun) {
        int force = nRemaining <= minRun ? nRemaining : minRun;
        binarySort(a, lo, lo + force, lo + runLen, c);
        runLen = force;
    }

    // Push run onto pending-run stack, and maybe merge
    ts.pushRun(lo, runLen);
    ts.mergeCollapse();

    // Advance to find next run
    lo += runLen;
    nRemaining -= runLen;
} while (nRemaining != 0);
```
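`countRunAndMakeAscending` finds a run starting at `lo`. A run is a stretch that is already sorted, and the function guarantees that it ends up ascending: a non-descending stretch is accepted as is, while a strictly descending stretch is reversed in place.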
A quick look at this function:
```java
private static <T> int countRunAndMakeAscending(T[] a, int lo, int hi,
                                                Comparator<? super T> c) {
    assert lo < hi;
    int runHi = lo + 1;
    if (runHi == hi)
        return 1;

    // Find end of run, and reverse range if descending
    if (c.compare(a[runHi++], a[lo]) < 0) { // Descending
        while (runHi < hi && c.compare(a[runHi], a[runHi - 1]) < 0)
            runHi++;
        reverseRange(a, lo, runHi);
    } else {                              // Ascending
        while (runHi < hi && c.compare(a[runHi], a[runHi - 1]) >= 0)
            runHi++;
    }

    return runHi - lo;
}
```
Note that `reverseRange` is the reversal mentioned above.
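Here is a standalone sketch of the same idea on a primitive `int[]` (an assumption made to keep it short: the JDK method works on `T[]` with a `Comparator`). It illustrates why a descending run must be strictly descending: equal neighbours end the run, so reversing it can never swap two equal elements and break stability. `CountRunDemo` is a hypothetical class name.

```java
import java.util.Arrays;

public class CountRunDemo {
    // int[] adaptation of countRunAndMakeAscending: returns the length of the
    // run starting at lo, reversing it in place if it was strictly descending.
    static int countRunAndMakeAscending(int[] a, int lo, int hi) {
        if (lo >= hi) throw new IllegalArgumentException("empty range");
        int runHi = lo + 1;
        if (runHi == hi) return 1;
        if (a[runHi++] < a[lo]) {
            // Strictly descending: an equal neighbour ends the run.
            while (runHi < hi && a[runHi] < a[runHi - 1]) runHi++;
            reverseRange(a, lo, runHi);
        } else {
            // Non-descending: equal neighbours are allowed.
            while (runHi < hi && a[runHi] >= a[runHi - 1]) runHi++;
        }
        return runHi - lo;
    }

    // Reverses the half-open range a[lo, hi) in place.
    static void reverseRange(int[] a, int lo, int hi) {
        hi--;
        while (lo < hi) {
            int t = a[lo];
            a[lo++] = a[hi];
            a[hi--] = t;
        }
    }

    public static void main(String[] args) {
        int[] desc = {9, 7, 4, 4, 1};
        int n1 = countRunAndMakeAscending(desc, 0, desc.length);
        System.out.println(n1 + " " + Arrays.toString(desc)); // 3 [4, 7, 9, 4, 1]
        int[] asc = {1, 2, 2, 5, 3};
        int n2 = countRunAndMakeAscending(asc, 0, asc.length);
        System.out.println(n2 + " " + Arrays.toString(asc));  // 4 [1, 2, 2, 5, 3]
    }
}
```

In the first array the run stops at the second 4, so only `9, 7, 4` is reversed.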
Now it is time to look at `binarySort`.
```java
private static <T> void binarySort(T[] a, int lo, int hi, int start,
                                   Comparator<? super T> c) {
    assert lo <= start && start <= hi;
    if (start == lo)
        start++;
    for ( ; start < hi; start++) {
        T pivot = a[start];

        // Set left (and right) to the index where a[start] (pivot) belongs
        int left = lo;
        int right = start;
        assert left <= right;
        /*
         * Invariants:
         *   pivot >= all in [lo, left).
         *   pivot <  all in [right, start).
         */
        while (left < right) {
            int mid = (left + right) >>> 1;
            if (c.compare(pivot, a[mid]) < 0)
                right = mid;
            else
                left = mid + 1;
        }
        assert left == right;

        /*
         * The invariants still hold: pivot >= all in [lo, left) and
         * pivot < all in [left, start), so pivot belongs at left.  Note
         * that if there are elements equal to pivot, left points to the
         * first slot after them -- that's why this sort is stable.
         * Slide elements over to make room for pivot.
         */
        int n = start - left;  // The number of elements to move
        // Switch is just an optimization for arraycopy in default case
        switch (n) {
            case 2:  a[left + 2] = a[left + 1];
            case 1:  a[left + 1] = a[left];
                     break;
            default: System.arraycopy(a, left, a, left + 1, n);
        }
        a[left] = pivot;
    }
}
```
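We have all heard of `binarySearch`, but what is `binarySort`? It sorts the range `a[lo, hi)` under the precondition that `a[lo, start)` is already sorted. For each element of `a[start, hi)` in turn, it binary-searches the sorted prefix for the element's position, shifts the larger elements one slot to the right, and inserts it there; in other words, it is a binary insertion sort. Because the search stops after any elements equal to the pivot, the sort is stable.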
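Here is a minimal `int[]` sketch of binary insertion sort with the same contract as `binarySort` (sort `a[lo, hi)` given that `a[lo, start)` is already sorted). It is simplified under two assumptions: natural ordering replaces the `Comparator`, and `System.arraycopy` is always used instead of the small `switch` optimization. The call in `main` mimics how the `do-while` loop extends a short run. `BinarySortDemo` is a hypothetical name.

```java
import java.util.Arrays;

public class BinarySortDemo {
    // Sorts a[lo, hi) given that a[lo, start) is already sorted, by
    // binary-searching each pivot's slot and shifting the tail right.
    static void binarySort(int[] a, int lo, int hi, int start) {
        if (start == lo) start++;
        for (; start < hi; start++) {
            int pivot = a[start];
            int left = lo, right = start;
            // Find the first slot whose element is > pivot (an upper bound),
            // so equal elements keep their original order.
            while (left < right) {
                int mid = (left + right) >>> 1;
                if (pivot < a[mid]) right = mid;
                else left = mid + 1;
            }
            System.arraycopy(a, left, a, left + 1, start - left);
            a[left] = pivot;
        }
    }

    public static void main(String[] args) {
        // a[0, 3) is a sorted run that is "too short"; extend it to the whole array.
        int[] a = {2, 5, 8, 3, 9, 1, 5};
        binarySort(a, 0, a.length, 3);
        System.out.println(Arrays.toString(a)); // [1, 2, 3, 5, 5, 8, 9]
    }
}
```

Binary search keeps the number of comparisons at O(log n) per element, although each insertion may still move O(n) elements; that is why the JDK uses it only on short stretches of at most `minRun` elements.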
Back in the `do-while` loop, here is what `binarySort` is used for:
```java
// If run is short, extend to min(minRun, nRemaining)
if (runLen < minRun) {
    int force = nRemaining <= minRun ? nRemaining : minRun;
    binarySort(a, lo, lo + force, lo + runLen, c);
    runLen = force;
}
```
So `binarySort` extends a short run to `force = min(minRun, nRemaining)` elements, and after the extension the run is still sorted.
Next:
```java
// Push run onto pending-run stack, and maybe merge
ts.pushRun(lo, runLen);
ts.mergeCollapse();

// Advance to find next run
lo += runLen;
nRemaining -= runLen;
```
The current run occupies `a[lo, lo + runLen)`. It is pushed onto the run stack, and then `mergeCollapse` merges runs on the stack as needed.
```java
private void pushRun(int runBase, int runLen) {
    this.runBase[stackSize] = runBase;
    this.runLen[stackSize] = runLen;
    stackSize++;
}
```
Pushing is straightforward and needs no explanation.
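Now for the other key operation: merging. If you have watched the Timsort visualization mentioned at the beginning of this post, the merges will surely have stuck in your memory. Merging combines sorted runs into one larger run, which is also sorted.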
```java
/**
 * Examines the stack of runs waiting to be merged and merges adjacent runs
 * until the stack invariants are reestablished:
 *
 *     1. runLen[i - 3] > runLen[i - 2] + runLen[i - 1]
 *     2. runLen[i - 2] > runLen[i - 1]
 *
 * This method is called each time a new run is pushed onto the stack,
 * so the invariants are guaranteed to hold for i < stackSize upon
 * entry to the method.
 */
private void mergeCollapse() {
    while (stackSize > 1) {
        int n = stackSize - 2;
        if (n > 0 && runLen[n-1] <= runLen[n] + runLen[n+1]) {
            if (runLen[n - 1] < runLen[n + 1])
                n--;
            mergeAt(n);
        } else if (runLen[n] <= runLen[n + 1]) {
            mergeAt(n);
        } else {
            break; // Invariant is established
        }
    }
}
```
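Merging repeats until the two invariants stated in the comment hold again. Invariant 2 says that run lengths strictly decrease toward the top of the stack; invariant 1 says that each run is longer than the two runs above it combined, so, read from the top down, the lengths grow at least as fast as the Fibonacci numbers. This keeps the stack short (logarithmic in the array length) and the merges reasonably balanced.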
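To watch the invariants at work, the sketch below keeps only the run lengths and applies the same decision logic as `mergeCollapse`; "merging" two runs here simply adds their lengths. It is a hypothetical simulation for illustration (`RunStackDemo` is not a JDK class), not the JDK's actual data structure.

```java
import java.util.ArrayList;
import java.util.List;

public class RunStackDemo {
    // Only run lengths are tracked; no array elements are moved.
    private final List<Integer> runLen = new ArrayList<>();

    void pushRun(int len) {
        runLen.add(len);
        mergeCollapse();
    }

    // Same decision logic as TimSort.mergeCollapse.
    private void mergeCollapse() {
        while (runLen.size() > 1) {
            int n = runLen.size() - 2;
            if (n > 0 && runLen.get(n - 1) <= runLen.get(n) + runLen.get(n + 1)) {
                if (runLen.get(n - 1) < runLen.get(n + 1)) n--;
                mergeAt(n);
            } else if (runLen.get(n) <= runLen.get(n + 1)) {
                mergeAt(n);
            } else {
                break; // invariants hold at the top of the stack
            }
        }
    }

    // Replaces runs n and n + 1 with a single run of the combined length.
    private void mergeAt(int n) {
        runLen.set(n, runLen.get(n) + runLen.get(n + 1));
        runLen.remove(n + 1); // remove by index
    }

    List<Integer> stack() { return new ArrayList<>(runLen); }

    public static void main(String[] args) {
        RunStackDemo s = new RunStackDemo();
        for (int len : new int[] {50, 40, 30, 20, 35}) {
            s.pushRun(len);
            System.out.println("push " + len + " -> " + s.stack());
        }
    }
}
```

The stacks printed are `[50]`, `[50, 40]`, `[120]`, `[120, 20]` and `[120, 55]`. Pushing 30 violates invariant 1 (50 <= 40 + 30), which triggers two merges; pushing 35 only violates invariant 2 (20 <= 35), so one merge suffices.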
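`mergeAt(i)` merges the runs at stack indices `i` and `i + 1`, where `i` is either `stackSize - 2` (the top two runs) or `stackSize - 3` (the second and third runs from the top):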
```java
/**
 * Merges the two runs at stack indices i and i+1.  Run i must be
 * the penultimate or antepenultimate run on the stack.  In other words,
 * i must be equal to stackSize-2 or stackSize-3.
 *
 * @param i stack index of the first of the two runs to merge
 */
private void mergeAt(int i) {
    assert stackSize >= 2;
    assert i >= 0;
    assert i == stackSize - 2 || i == stackSize - 3;

    int base1 = runBase[i];
    int len1 = runLen[i];
    int base2 = runBase[i + 1];
    int len2 = runLen[i + 1];
    assert len1 > 0 && len2 > 0;
    assert base1 + len1 == base2;

    /*
     * Record the length of the combined runs; if i is the 3rd-last
     * run now, also slide over the last run (which isn't involved
     * in this merge).  The current run (i+1) goes away in any case.
     */
    runLen[i] = len1 + len2;
    if (i == stackSize - 3) {
        runBase[i + 1] = runBase[i + 2];
        runLen[i + 1] = runLen[i + 2];
    }
    stackSize--;

    /*
     * Find where the first element of run2 goes in run1. Prior elements
     * in run1 can be ignored (because they're already in place).
     */
    int k = gallopRight(a[base2], a, base1, len1, 0, c);
    assert k >= 0;
    base1 += k;
    len1 -= k;
    if (len1 == 0)
        return;

    /*
     * Find where the last element of run1 goes in run2. Subsequent elements
     * in run2 can be ignored (because they're already in place).
     */
    len2 = gallopLeft(a[base1 + len1 - 1], a, base2, len2, len2 - 1, c);
    assert len2 >= 0;
    if (len2 == 0)
        return;

    // Merge remaining runs, using tmp array with min(len1, len2) elements
    if (len1 <= len2)
        mergeLo(base1, len1, base2, len2);
    else
        mergeHi(base1, len1, base2, len2);
}
```
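Because both runs to be merged are already sorted, the merge can use a trick. Call the two runs run1 and run2. First, `gallopRight` searches run1 for the position `k` of run2's first element (a galloping, that is exponential, search followed by a binary search); the elements of run1 before `k` are the smallest elements of the merged result and are already in place. Then `gallopLeft` searches run2 for the position `len2` of run1's last element; the elements of run2 from `len2` onward are the largest elements of the merged result and are also already in place. Finally, depending on whether `len1 <= len2`, `mergeLo` or `mergeHi` merges the remaining elements, using a temporary array of `min(len1, len2)` elements.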
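Below is a simplified `int[]` sketch of the trimming idea just described. Assumptions: plain binary search (`upperBound` and `lowerBound`, hypothetical helper names) stands in for `gallopRight`/`gallopLeft`, which in the JDK gallop first and then binary-search; and the remainder is always merged in the `mergeLo` direction with a temporary copy of run1, whereas the JDK copies the shorter run and picks `mergeLo` or `mergeHi` accordingly. Both runs are assumed to be adjacent and non-empty.

```java
import java.util.Arrays;

public class TrimmedMergeDemo {
    // Merges adjacent sorted runs a[base1, base1+len1) and a[base1+len1, base1+len1+len2).
    static void mergeRuns(int[] a, int base1, int len1, int len2) {
        int base2 = base1 + len1;
        // 1. Elements of run1 that are <= run2's first element are already in place.
        int k = upperBound(a, base1, base2, a[base2]) - base1;
        base1 += k;
        len1 -= k;
        if (len1 == 0) return;
        // 2. Elements of run2 that are >= run1's last element are already in place.
        len2 = lowerBound(a, base2, base2 + len2, a[base1 + len1 - 1]) - base2;
        if (len2 == 0) return;
        // 3. Stable merge of what is left, with a temp copy of run1's remainder.
        int[] tmp = Arrays.copyOfRange(a, base1, base1 + len1);
        int i = 0, j = base2, dest = base1, end2 = base2 + len2;
        while (i < len1 && j < end2) {
            // Take from run2 only if strictly smaller, so ties favour run1 (stable).
            a[dest++] = (a[j] < tmp[i]) ? a[j++] : tmp[i++];
        }
        while (i < len1) a[dest++] = tmp[i++];
        // Any run2 elements left over are already in their final positions.
    }

    // First index in [lo, hi) whose element is > key.
    static int upperBound(int[] a, int lo, int hi, int key) {
        while (lo < hi) {
            int mid = (lo + hi) >>> 1;
            if (a[mid] <= key) lo = mid + 1; else hi = mid;
        }
        return lo;
    }

    // First index in [lo, hi) whose element is >= key.
    static int lowerBound(int[] a, int lo, int hi, int key) {
        while (lo < hi) {
            int mid = (lo + hi) >>> 1;
            if (a[mid] < key) lo = mid + 1; else hi = mid;
        }
        return lo;
    }

    public static void main(String[] args) {
        int[] a = {1, 2, 6, 7, 9, 3, 4, 8, 10, 12};
        mergeRuns(a, 0, 5, 5);
        System.out.println(Arrays.toString(a)); // [1, 2, 3, 4, 6, 7, 8, 9, 10, 12]
    }
}
```

In the example, `1, 2` at the front of run1 and `10, 12` at the end of run2 never move; only `6, 7, 9` and `3, 4, 8` are actually merged, and the temporary array holds just three elements.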
We won't go into the details of galloping (`gallopLeft`, `gallopRight`) or merging (`mergeLo`, `mergeHi`) here.
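I also strongly recommend the two articles listed at the end of this post. The first shows the problems that can appear after JDK 7 switched its sorting algorithm, and it also walks through the source code with concrete examples. The second explains how to optimize a `MergeSort` and introduces the ideas behind `TimSort`.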
Recommended reading