20G大文件使用Java高效进行外排,算法 = 败者树 + 多路归并排序
业务场景
由于各种原因,数据库stone库和二级库数据同步有问题,现在对二级库做数据补漏。由于数据表的主键ID是UUID形式,更新时间字段没有索引,所以DBA直接否定了直接在数据库层面操作。解决方案改为将表数据(ID和update_time字段)以某种规则逐行导出,对两个文件根据ID分别进行排序,然后逐行比对,输出结果。此处我将介绍如果使用基于败者树的多路归并排序算法对20G的文本文件进行高效排序。
多路归并排序
多路归并排序算法在常见数据结构书中都有涉及。从2路到多路(k路),增大k可以减少外存信息读写时间,但k个归并段中选取最小的记录需要比较k-1次,为得到u个记录的一个有序段共需要(u-1)(k-1)次,若归并趟数为s次,那么对n个记录的文件进行外排时,内部归并过程中进行的总的比较次数为s(n-1)(k-1),若共有m个归并段,则s=logkm,所以总的比较次数为: (向上取整)(logkm)(k-1)(n-1)=(向上取整)(log2m/log2k)(k-1)(n-1),而(k-1)/log2k随k增而增因此内部归并时间随k增长而增长了,抵消了外存读写减少的时间,这样做不行,由此引出了“败者树”tree of loser的使用。在内部归并过程中利用败者树将k个归并段中选取最小记录比较的次数降为(向上取整)(log2k)次使总比较次数为(向上取整)(log2m)(n-1),与k无关。
败者树
叶子节点记录k个段中的最小数据,然后两两进行比赛。败者树是在双亲节点中记录下刚刚进行完的这场比赛的败者,让胜者去参加更高一层的比赛。决赛,根节点记录输者,所以需要重建一个新的根节点,记录胜者(如下图节点0)。
示例:我们以四路归并为例,假设每个归并段已经在输入缓冲区。
每路的第一个元素为胜利树的叶子节点,(5,7)比较出5胜出7失败成为其根节点,(29,9)比较9胜出29失败成为其根节点,胜者(5,9)进行下次的比赛9失败成为其根节点5胜出输出到输出缓冲区。由第一路归并段输出,所有将第一路归并段的第二个元素加到叶子节点如下图:
每路的第一个元素为胜利树的叶子节点,(5,7)比较出5胜出7失败成为其根节点,(29,9)比较9胜出29失败成为其根节点,胜者(5,9)进行下次的比赛9失败成为其根节点5胜出输出到输出缓冲区。由第一路归并段输出,所有将第一路归并段的第二个元素加到叶子节点如下图:
加入叶子节点16进行第二次的比较,跟胜利树一样,由于右子树叶子节点没有发生变化其右子树不用再继续比较。
败者树的实现
/** * 使用败者树,对已经排序的子文件进行多路归并排序 * 时间复杂度:O(nlogk),n为总数据量,k为子文件数量 * * @author Saint * @version 1.0 * @date 2020/7/15 17:58 */public class FailedTree { /** * 败者树(有K个节点 --> k为子文件的数量),保存失败下标。tree[0]保存最小值的下标(胜利者) */ private Integer[] tree = null; /** * 叶子节点数组的个数(即 K路归并中的K) */ private int size = 0; /** * 叶子节点(必须是可以比较的对象) */ private ArrayList leaves = null; /** * 初始化最小值 */ private static final Integer MIN_KEY = -1; /** * 失败者树构造函数 * * @param initValues 初始化叶子节点的数组,即各个归并段的首元素组成的数组 */ public FailedTree(ArrayList initValues) { this.leaves = initValues; this.size = initValues.size(); this.tree = new Integer[size]; //初始化败者树(其实就是一个普通的二叉树,共有2k - 1个节点) for (int i = 0; i < size; i++) { //初始化时,树种各个节点值设为最小值 tree[i] = MIN_KEY; } //初始化,从最后一个(最小值最大的那个)节点开始调整,k次(小文件个数)调整 for (int i = size - 1; i >= 0; i--) { adjust(i); } } /** * 从底向上调整数结构 * * @param s 叶子节点数组的下标(第几个文件) */ private void adjust(int s) { // tree[t] 是 leaves[s] 的父节点 int t = (s + size) / 2; while (t > 0) { //如果叶子节点值大于父节点(保存的下标)指向的值 if (s >= 0 && (tree[t] == -1 || leaves.get(s).compareTo(leaves.get(tree[t])) > 0)) { //父节点保存其下标:总是保存较大的(败者)。 较小值的下标(用s记录)->向上传递 int temp = s; s = tree[t]; tree[t] = temp; } // tree[Integer/2] 是 tree[Integer] 的父节点 t /= 2; } //最后的胜者(最小值) tree[0] = s; } /** * 给叶子节点赋值 * * @param leaf 叶子节点值 * @param s 叶子节点的下标 */ public void add(String leaf, int s) { leaves.set(s, (T) leaf); //每次赋值之后,都要向上调整,使根节点保存最小值的下标(找到当前最小值) adjust(s); } /** * 删除叶子节点,即一个归并段元素取空 * * @param s 叶子节点的下标 */ public void del(int s) { //删除叶子节点 leaves.remove(s); this.size--; this.tree = new Integer[size]; //初始化败者树(严格的说,此时它只是一个普通的二叉树) for (int i = 0; i < size; i++) { //初始化时,树中各个节点值设为可能的最小值 tree[i] = MIN_KEY; } //从最后一个节点开始调整 for (int i = size - 1; i >= 0; i--) { adjust(i); } } /** * 根据下标找到叶子节点(取值) * * @param s 叶子节点下标 * @return */ public T getLeaf(int s) { return leaves.get(s); } /** * 获得胜者(值为最终胜出的叶子节点的下标) * * @return */ public Integer getWinner() { return tree.length > 0 ? tree[0] : null; } /*public static void main(String[] args) { //假设当前有 4 个归并段 Queue queue0 = new LinkedList(); Queue queue1 = new LinkedList(); Queue queue2 = new LinkedList(); Queue queue3 = new LinkedList(); String[] source0 = {"c","e","f"}; String[] source1 = {"b"}; String[] source2 = {"a"}; String[] source3 = {"d","g","h"}; queue0.addAll(Arrays.asList(source0)); queue1.addAll(Arrays.asList(source1)); queue2.addAll(Arrays.asList(source2)); queue3.addAll(Arrays.asList(source3)); Queue[] sources = new Queue[4]; sources[0] = queue0; sources[1] = queue1; sources[2] = queue2; sources[3] = queue3; //进行 4 路归并 ArrayList initValues = new ArrayList<>(sources.length); for (int i = 0; i < sources.length; i++) { initValues.add(sources[i].poll()); } //初始化败者树 FailedTree loserTree = new FailedTree(initValues); //输出胜者 Integer s = loserTree.getWinner(); System.out.println(loserTree.getLeaf(s) + " "); while (sources.length > 0) { //新增叶子节点 String newLeaf = sources[s].poll(); if (newLeaf == null) { //如果多个归并段的长度不一样,这里的逻辑是有问题的。逻辑纠错请看MergeSortedTxt类中。 // sources[s] 对应的队列(归并段)已经为空,删除队列并调整败者树 loserTree.del(s); } else { loserTree.add(newLeaf, s); } s = loserTree.getWinner(); if (s == null) { break; } //输出胜者 System.out.println(loserTree.getLeaf(s) + " "); } }*/}
如果大家对如何从文件中获取数据然后进行外排不感兴趣,也就可以到此为止了。
对多个有序文件进行归并排序
import java.io.*;import java.util.ArrayList;/** * 合并排序后的多个TXT文件 * @author Saint * @version 1.0 * @date 2020/7/15 16:56 */public class MergeSortedTxt { public static void merge(ArrayList list) { int fileSize = list.size(); if (fileSize == 1) { return; } //这里的输出文件命名规则,大家按需更改 //输出文件名 String fileName = list.get(0).getPath(); int beginIndex = fileName.lastIndexOf("\\"); int endIndex = fileName.lastIndexOf("_sorted.txt"); String outFilePath = fileName.substring(0, beginIndex) + fileName.substring(beginIndex, endIndex - 1) + "_sorted.txt"; File outFile = new File(outFilePath); //输出文件 BufferedWriter out = null; //进行多路归并的叶子节点数组 ArrayList leaves = new ArrayList<>(fileSize); try { out = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(outFile))); //每个归并段生成一个输入流 ArrayList inputList = new ArrayList<>(); for (int i = 0; i < fileSize; i++) { BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(list.get(i)))); inputList.add(i, reader); } //初始化叶子数组 String data = ""; for (int i = 0; i < inputList.size(); i++) { data = inputList.get(i).readLine(); leaves.add(data); } //初始化败者树 FailedTree failedTree = new FailedTree(leaves); //输出胜者 Integer s = failedTree.getWinner(); out.write(failedTree.getLeaf(s) + ""); out.newLine(); out.flush(); while (inputList.size() > 0) { //新增叶子节点 String newLeaf = inputList.get(s).readLine(); //如果文件读取完成,关闭读取流,删除叶子几点 if (newLeaf == null || newLeaf.equals("")) { //当一个文件读取完成之后,关闭文件输入流、移除出List集合。 inputList.get(s).close(); int remove = s; inputList.remove(remove); failedTree.del(s); } else { failedTree.add(newLeaf, s); } s = failedTree.getWinner(); if (s == null) { break; } //输出胜利者 out.write(failedTree.getLeaf(s) + ""); out.newLine(); out.flush(); } } catch (FileNotFoundException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } finally { try { //关闭输出流 out.close(); } catch (IOException e) { e.printStackTrace(); } } } /*public static void main(String[] args) { ArrayList list = new ArrayList<>(); //大家自行添加几个有序的文件进行测试 list.add(new File("D:/数据1_sorted.txt")); list.add(new File("D:/数据2_sorted.txt")); list.add(new File("D:/数据3_sorted.txt")); long startTime = System.currentTimeMillis(); LocalDateTime time = LocalDateTime.now(); System.out.println(time + " 开始处理"); merge(list); long endTime = System.currentTimeMillis(); System.out.println("耗费时间: " + (endTime - startTime) + " ms"); }*/}
总结
在每个文件有100万行数据的情况下,排序1千万行数据需要大约60秒,2千万行数据(1.75G)大约120秒,3千万行数据大约130秒。 整体的时间复杂度为O(nLog(m)),其中N为数据总行数,m为多路排序的路数(即多少个有序的文件)。 欢迎大牛指出更高效的文件的I/O处理方式,有兴趣的同学可以了解一下Spark,其对文件的处理效率非常高。
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
暂时没有评论,来抢沙发吧~