Arrays.sort的背后jdk干了什么
Arrays.sort的大体调用逻辑
大家一般在进行排序的时候都会调用到arrays.sort()方法进行排序,而排序的后面jdk为了我们做了很多事情。
一般我们调用的collections.sort(Comparator c) ,list.sort(Comparator c)方法,最后都会调用到arrays.sort。
public static <T> void sort(T[] a, Comparator<? super T> c) {
if (c == null) {
//没有Comparator,走ComparableTimSort排序逻辑
sort(a, fromIndex, toIndex);
} else {
//越位,范围检查
rangeCheck(a.length, fromIndex, toIndex);
//LegacyMergeSort.userRequested默认返回false
//legacyMergeSort(a, fromIndex, toIndex, c)方法要被时代淘汰了
if (LegacyMergeSort.userRequested)
legacyMergeSort(a, fromIndex, toIndex, c);
else
//主角TimSort排序
TimSort.sort(a, fromIndex, toIndex, c, null, 0, 0);
}
}
当规则对比器为空时调用ComparableTimSort的入口代码,ComparableTimSort是跟timsort一样原理的排序算法,只是元素的比较规则使用了一个默认对比规则。
public static void sort(Object[] a) {
if (LegacyMergeSort.userRequested)
legacyMergeSort(a);
else
ComparableTimSort.sort(a, 0, a.length, null, 0, 0);
}
legacyMergeSort排序的方法,方法上面就被标注了To be removed in a future release,光荣退休了。
/** To be removed in a future release. */
private static <T> void legacyMergeSort(T[] a, int fromIndex, int toIndex,
Comparator<? super T> c) {
T[] aux = copyOfRange(a, fromIndex, toIndex);
if (c==null)
mergeSort(aux, a, fromIndex, toIndex, -fromIndex);
else
mergeSort(aux, a, fromIndex, toIndex, -fromIndex, c);
}
然后就是今天的主角Timsort排序。
/**
* Sorts the given range, using the given workspace array slice
* for temp storage when possible. This method is designed to be
* invoked from public methods (in class Arrays) after performing
* any necessary array bounds checks and expanding parameters into
* the required forms.
*
* @param a the array to be sorted
* @param lo the index of the first element, inclusive, to be sorted
* @param hi the index of the last element, exclusive, to be sorted
* @param c the comparator to use
* @param work a workspace array (slice)
* @param workBase origin of usable space in work array
* @param workLen usable size of work array
* @since 1.8
*/
static <T> void sort(T[] a, int lo, int hi, Comparator<? super T> c,
T[] work, int workBase, int workLen) {
//合法性校验
assert c != null && a != null && lo >= 0 && lo <= hi && hi <= a.length;
//检查数组是否需要排序,元素个数在2以下说明已经具备顺序性
int nRemaining = hi - lo;
if (nRemaining < 2)
return;
//如果数组个数在32个以下,则不用进行归并排序,使用二分插入排序
//private static final int MIN_MERGE = 32;
if (nRemaining < MIN_MERGE) {
// 找出连续升序的最大个数
int initRunLen = countRunAndMakeAscending(a, lo, hi, c);
// 二分插入排序
binarySort(a, lo, hi, lo + initRunLen, c);
return;
}
//以下是走timsort排序
TimSort<T> ts = new TimSort<>(a, c, work, workBase, workLen);
//计算分区最小个数,少于这个长度就需要对其进行扩展。
int minRun = minRunLength(nRemaining);
do {
// 找出 排序数组中 一段升序数组 的长度
int runLen = countRunAndMakeAscending(a, lo, hi, c);
//如果归并分区个数小于之前计算出的minRun
if (runLen < minRun) {
int force = nRemaining <= minRun ? nRemaining : minRun;
// 二分插入排序
binarySort(a, lo, lo + force, lo + runLen, c);
runLen = force;
}
// 归并
ts.pushRun(lo, runLen);
ts.mergeCollapse();
// Advance to find next run
lo += runLen;
nRemaining -= runLen;
} while (nRemaining != 0);
// Merge all remaining runs to complete sort
assert lo == hi;
ts.mergeForceCollapse();
assert ts.stackSize == 1;
}
Timsort介绍
上面我们整理了一下arrays.sort()大体上的逻辑,在深入了解Timsort算法之前,我们先简单了解一下与Timsort相关的一些东西。
Timsort算法是Tim Peters于2002年提出的一个排序算法。
Timsort大体上是归并排序(merge sort),局部上是插入排序(insertion sort) ,在分区排好序的情况下进行了一些优化。对于已经部分排序的数组,时间复杂度远低于 O(n log(n)),最好可达 O(n),对于随机排序的数组,时间复杂度为 O(nlog(n)),平均时间复杂度 O(nlog(n))。
下面这张图,以最好情况,平均情况,最坏情况,内存,稳定性5个方面比较了一些常见排序算法。
Timsort排序算法在其中最优、最坏时间复杂度以及算法的稳定性上来说都是佼佼者,所以python,android,java和GNU Octave都引入了此算法。
Timsort原理
这是了解期间搜索到的一个演示视频Timsort排序视频演示,结合视频看代码比较能更好地理解。
(视频链接:https://www.youtube.com/watch?v=kPRA0W1kECg)
1. 确定最小分区
private static int minRunLength(int n) {
assert n >= 0;
int r = 0; // Becomes 1 if any 1 bits are shifted off
while (n >= MIN_MERGE) {
r |= (n & 1);
n >>= 1;
}
return n + r;
}
minRunLength方法决定了分区的最小值。MIN_MERGE默认为32,n为排序数组的长度。如果n小于MIN_MERGE,那么返回 n 本身。
否则会将 n 不断地右移,直到少于 MIN_MERGE,同时记录一个 r 值,r 代表最后一次移位n时,n最低位是0还是1。
最后返回 n + r,这也意味着只保留最高的 5 位,再加上第六位。
private static <T> int countRunAndMakeAscending(T[] a, int lo, int hi,
Comparator<? super T> c) {
assert lo < hi;
int runHi = lo + 1;
if (runHi == hi)
return 1;
// Find end of run, and reverse range if descending
if (c.compare(a[runHi++], a[lo]) < 0) { // Descending
while (runHi < hi && c.compare(a[runHi], a[runHi - 1]) < 0)
runHi++;
reverseRange(a, lo, runHi);
} else { // Ascending
while (runHi < hi && c.compare(a[runHi], a[runHi - 1]) >= 0)
runHi++;
}
return runHi - lo;
}
寻找在数组中有一致顺序的一段,如果是降序则翻转。
这段具备顺序的片段为a,如果a大于等于分区最小长度则入栈等待归并;如果a小于分区最小长度,则将片段a后的区域(区域长度 = 最小分区长度 - 片段a长度),达到最小分区长度,进行分区排序。
2. 分区排序
@SuppressWarnings("fallthrough")
private static <T> void binarySort(T[] a, int lo, int hi, int start,
Comparator<? super T> c) {
assert lo <= start && start <= hi;
if (start == lo)
start++;
//遍历无序的后半端区域[start,hi]
for ( ; start < hi; start++) {
T pivot = a[start];
int left = lo;
int right = start;
assert left <= right;
//二分查找,查找出在[lo,start]区间中 元素满足 a[n]<pivot<a[n+1]
while (left < right) {
int mid = (left + right) >>> 1;
if (c.compare(pivot, a[mid]) < 0)
right = mid;
else
left = mid + 1;
}
assert left == right;
//在分区升序情况下 二分查找出 pivot 的位置 left和right
//将 此角标 到 原来pivot角标 的所有元素后移一位
int n = start - left; // The number of elements to move
// Switch is just an optimization for arraycopy in default case
switch (n) {
case 2: a[left + 2] = a[left + 1];
case 1: a[left + 1] = a[left];
break;
default: System.arraycopy(a, left, a, left + 1, n);
}
a[left] = pivot;
}
}
传入的分区[lo,hi],lo到start是有序的,start到hi是无序。这个函数的目的就是让分区[lo,hi]都是有序的。
因为[lo,start]是有序的,可以在[lo,start]进行二分查找,为[start,hi]部分的元素跟二分查找出来的mid元素进行比较,确认pivot在有序区[lo,hi]合适的位置,移动合适位置到原来pivot的index片段一个角标位置,对pivot在[lo,hi]进行插入,完成[lo,hi]数组的排序。
3. 归并
/**
* Pushes the specified run onto the pending-run stack.
*
* @param runBase index of the first element in the run
* @param runLen the number of elements in the run
*/
private void pushRun(int runBase, int runLen) {
this.runBase[stackSize] = runBase;
this.runLen[stackSize] = runLen;
stackSize++;
}
一个简单的记录排好序分区的方法。
/**
* 将栈顶的两个分区,进行合并,此时每个分区都已经是升序状态
*/
private void mergeCollapse() {
while (stackSize > 1) {
int n = stackSize - 2;
//当栈内元素大于3个元素 且第一个分区长度 小于等于 第2个分区长度 加 第3个分区长度
if (n > 0 && runLen[n-1] <= runLen[n] + runLen[n+1]) {
//第一个分区长度 小于 第3个分区长度
if (runLen[n - 1] < runLen[n + 1])
n--;
mergeAt(n);
//当栈内元素有且只有2个元素 且第一个分区长度 小于等于 第2个分区长度
} else if (runLen[n] <= runLen[n + 1]) {
mergeAt(n);
} else {
break; // Invariant is established
}
}
}
private void mergeAt(int i) {
assert stackSize >= 2;
assert i >= 0;
assert i == stackSize - 2 || i == stackSize - 3;
int base1 = runBase[i];
int len1 = runLen[i];
int base2 = runBase[i + 1];
int len2 = runLen[i + 1];
assert len1 > 0 && len2 > 0;
assert base1 + len1 == base2;
//合并分区
runLen[i] = len1 + len2;
//合并后移动栈中元素的位置
if (i == stackSize - 3) {
runBase[i + 1] = runBase[i + 2];
runLen[i + 1] = runLen[i + 2];
}
stackSize--;
//用第2个分区的第一个元素 在 第1个分区 寻找 它的合适位置index
//index之前是有序的,之后就是要归并 第2分区的所有元素 和 第1分区index之后的元素
int k = gallopRight(a[base2], a, base1, len1, 0, c);
assert k >= 0;
base1 += k;
len1 -= k;
if (len1 == 0)
return;
//与上面同理 找到run1的最后一个元素在run2中的位置。
//随后的元素在run2中可以忽略(因为它们已经就绪)。
len2 = gallopLeft(a[base1 + len1 - 1], a, base2, len2, len2 - 1, c);
assert len2 >= 0;
if (len2 == 0)
return;
// Merge remaining runs, using tmp array with min(len1, len2) elements
if (len1 <= len2)
mergeLo(base1, len1, base2, len2);
else
mergeHi(base1, len1, base2, len2);
}
由于要合并的两个 run 是已经排序的,所以合并的时候,有会特别的技巧。
假设两个 run 是 run1,run2 ,先用 gallopRight在 run1 里使用 binarySearch 查找 run2 首元素 的位置 k, 那么 run1 中 k 前面的元素就是合并后最小的那些元素。
然后,在 run2 中查找 run1 尾元素 的位置 len2 ,那么 run2 中 len2 后面的那些元素就是合并后最大的那些元素。
最后,根据len1 与 len2 大小,调用 mergeLo 或者 mergeHi 将剩余元素合并。
存在的问题
1. 归并时判断栈内元素隐患
在归并的表达式中,隐含了一个bug。
Timsort算法设置这个约束条件的是为了保证归并排序时两个子序列长度是均衡的,隐含的一层意思是栈内所有run都应该满足该约束条件(即使不在栈顶),在mergeCollapse()方法中即对2 <= i <= StackSize-1,也必须满足
runLen[i-2] > runLen[i-1] + runLen[i]
runLen[i-1] > runLen[i]
在大多数情况下,仅检查栈顶的3个run是否满足这个约束条件是可以保证整个栈内所有run均满足该约束条件。
但是在一些特殊的情况下就不行了,如下面这个栈(右侧为栈顶):
120, 80, 25, 20, 30
对照上面的源码,因为25 < 20 + 30,25 < 30,所以将25和20两个run进行合并,此时栈内的情况变为120, 80, 45, 30。
由于80 > 45 + 30,45 > 30,满足约束条件,此时归并就终止了。但是注意栈里的其他run,120 < 80 + 45,这是不满足约束条件的,而由于我们只判断了栈顶的run,因此在这里就留下了“隐患”。
在大多数情况下,这个问题没什么大不了,不影响我们平时一般的排序操作,因为我们的数据没有那么多,更不会大量出现上述情况。但是这个国外技术团队精心构造了一个Array,成功的让Timsort算法报了java.lang.ArrayIndexOutOfBoundsException这个错误。他们还把复现这个错误的代码放在了github上。
为什么是这个奇怪的错误?这就和Timsort算法的Java实现中申请栈空间的大小有关系了,看看原始代码:
/*
* Allocate runs-to-be-merged stack (which cannot be expanded). The
* stack length requirements are described in listsort.txt. The C
* version always uses the same stack length (85), but this was
* measured to be too expensive when sorting "mid-sized" arrays (e.g.,
* 100 elements) in Java. Therefore, we use smaller (but sufficiently
* large) stack lengths for smaller arrays. The "magic numbers" in the
* computation below must be changed if MIN_MERGE is decreased. See
* the MIN_MERGE declaration above for more information.
* The maximum value of 49 allows for an array up to length
* Integer.MAX_VALUE-4, if array is filled by the worst case stack size
* increasing scenario. More explanations are given in section 4 of:
* http://envisage-project.eu/wp-content/uploads/2015/02/sorting.pdf
*/
int stackLen = (len < 120 ? 5 :
len < 1542 ? 10 :
len < 119151 ? 24 : 49);
runBase = new int[stackLen];
runLen = new int[stackLen];
国外的这个技术团队在论文中算出了最坏情况下用到栈的大小,并画出了一张表。
第二行表示最坏情况下需要用到栈的大小,第三行表示Timsort算法实际给出的栈大小(见上文JDK源码)。
有意思的是在Array长度为65536时,最初Java中Timsort设定栈的长度为19,但是后来有人报告了Bug(https://bugs.java.com/bugdatabase/view_bug.do?bug_id=8011944),也就是说这个Bug在实际中是出现过的。
然而Java开源社区的程序员可能无法定位这个Bug的根源,只好从表面解决这个问题,在后来的更新中把栈的大小改成了24……(不过确实也解决了问题)
但是隐患依然存在的,可以看到在Array长度为67108864时,最坏情况下用到的栈大小41,而Java中Timsort设定的长度为40。所以只要精心构造一个Array,就能触发这个Bug。
然而如本文开头所说,Java开源社区的程序员依然不从根本上解决问题,还是用老办法,增加栈的大小。
2. 对比器规则
在某些特殊情况下,会引发这个问题:
java.lang.IllegalArgumentException: Comparison method violates its general contract!
at java.util.TimSort.mergeHi(TimSort.java:899)
at java.util.TimSort.mergeAt(TimSort.java:516)
at java.util.TimSort.mergeForceCollapse(TimSort.java:457)
at java.util.TimSort.sort(TimSort.java:254)
at java.util.Arrays.sort(Arrays.java:1512)
问题原因是,对某些数据来说,上述代码会导致compare(a,b)<0并且compare(b,a)<0,也就是a<b && b<a。当这类数据遇到某些特殊情况时,就会发生这个异常。
假定输入数组a[] = {5,a,7,12,4,b,8,8},其中待归并的两个有序数组分别是{5,a,7,12}和{4,b,8,8}
假定b<7&&7>b。这样可以触发“特殊情况”,即:a和b在某一次归并操作后,会同时成为“是否移动元素”的临界条件。这样,在“特殊情况”下,优化后的归并操作可能陷入死循环。
解决方案
解决方案其实很简单,确保compare(a,b)操作中,如果a>b,那么b<a即可。更严格点说,compareTo()或compare()操作,必须满足以下条件:(x op y)的结果必须与(y op x)的结果相反。即,如果a>b,那么b<a。传递性。即,如果a>b, b>c,那么a>c。x=y时,(x op z) = ( y op z )
顺带一说,在重写Java api的时候,如果没有十足把握的话,可以将它委托给另一个Java基础类(如String、Integer等)的对应api实现上。例如冯庆的解决方案,本质上就是将compare(a,b)操作委托给了int来实现。
结语
给大家附上作者tim peters写的python之禅。
Beautiful is better than ugly.
Explicit is better than implicit.
Simple is better than complex.
Complex is better than complicated.
Flat is better than nested.
Sparse is better than dense.
Readability counts.
Special cases aren't special enough to break the rules.
Although practicality beats purity.
Errors should never pass silently.
Unless explicitly silenced.
In the face of ambiguity, refuse the temptation to guess.
There should be one— and preferably only one —obvious way to do it.
Although that way may not be obvious at first unless you're Dutch.
Now is better than never.
Although never is often better thanrightnow.
If the implementation is hard to explain, it's a bad idea.
If the implementation is easy to explain, it may be a good idea.
Namespaces are one honking great idea—let's do more of those!
译文
美丽胜于丑陋。
显式胜于隐式。
简单胜于复杂。
复杂胜于复杂。
扁平比嵌套更好。
稀疏胜于密实。
可读性很重要。
特殊情况还不足以打破规则。
尽管实用性胜过纯度。
错误绝不能默默传递。
除非明确地保持沉默。
面对模棱两可,拒绝猜测的诱惑。
应该有一种(最好只有一种)明显的方式来做到这一点。
尽管除非您是荷兰人,否则一开始这种方式可能并不明显。
现在总比没有好。
虽然从未往往比右了。
如果实现难以解释,那是个坏主意。
如果实现易于解释,则可能是个好主意。
命名空间是一个很棒的主意-让我们多做些吧!
关注得物技术,携手走向技术的云端
文|feng逸