Source Code Analysis of the Split Process

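Where a split comes from:

    1. On a MemStore flush, the region server calls CompactSplitThread.requestSplit directly.
    2. A client issues a request through HBaseAdmin; the HRegionServer receives it and hands it to CompactSplitThread.requestSplit.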

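    The rest of this article walks through the request issued by HBaseAdmin.
    The client triggers the action from the web UI, which invokes the _jspService method of the generated JSP servlet class in the org.apache.hadoop.hbase.generated.master package.
    When the action is a split, that method calls HBaseAdmin's split method. Stepping into the split method of org.apache.hadoop.hbase.client.HBaseAdmin, note that the second argument (splitPoint) is null by default: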
/**
 * Split a table or an individual region.
 * Asynchronous operation.
 */
public void split(final byte[] tableNameOrRegionName,
        final byte[] splitPoint) throws IOException, InterruptedException {
    CatalogTracker ct = getCatalogTracker();
    try {
        Pair<HRegionInfo, ServerName> regionServerPair = getRegion(tableNameOrRegionName, ct);
        if (regionServerPair != null) {
            if (regionServerPair.getSecond() == null) {
                throw new NoServerForRegionException(Bytes.toStringBinary(tableNameOrRegionName));
            } else {
                split(regionServerPair.getSecond(), regionServerPair.getFirst(), splitPoint);
            }
        }
        ...
}
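    Next, step into split(regionServerPair.getSecond(), regionServerPair.getFirst(), splitPoint). This method obtains the admin interface of the region server that hosts the region and, through ProtobufUtil, invokes the split on that region server.
    The split method of ProtobufUtil looks like this: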
/**
 * A helper to split a region using admin protocol.
 */
public static void split(final AdminService.BlockingInterface admin,
                         final HRegionInfo hri, byte[] splitPoint) throws IOException {
    SplitRegionRequest request = RequestConverter.buildSplitRegionRequest(hri.getRegionName(), splitPoint);
    try {
        admin.splitRegion(null, request);
    } catch (ServiceException se) {
        throw ProtobufUtil.getRemoteException(se);
    }
}
   
    Next, look at the splitRegion method of BlockingInterface.
    The HRegionServer class implements AdminService.BlockingInterface and provides the corresponding splitRegion method:
/**
 * HRegionServer makes a set of HRegions available to clients. It checks in with
 * the HMaster. There are many HRegionServers in a single HBase deployment.
 */
@InterfaceAudience.Private
@SuppressWarnings("deprecation")
public class HRegionServer implements ClientProtos.ClientService.BlockingInterface,
  AdminProtos.AdminService.BlockingInterface, Runnable, RegionServerServices,
  HBaseRPCErrorHandler, LastSequenceId {

  ...

  /**
   * Split a region on the region server.
   *
   * @param controller the RPC controller
   * @param request the request
   * @throws ServiceException
   */
  @Override
  @QosPriority(priority=HConstants.HIGH_QOS)
  public SplitRegionResponse splitRegion(final RpcController controller,
      final SplitRegionRequest request) throws ServiceException {
    try {
      checkOpen();
      requestCount.increment();
      HRegion region = getRegion(request.getRegion());
      region.startRegionOperation(Operation.SPLIT_REGION);
      LOG.info("Splitting " + region.getRegionNameAsString());
      long startTime = EnvironmentEdgeManager.currentTimeMillis();
      HRegion.FlushResult flushResult = region.flushcache();
      if (flushResult.isFlushSucceeded()) {
        long endTime = EnvironmentEdgeManager.currentTimeMillis();
        metricsRegionServer.updateFlushTime(endTime - startTime);
      }
      byte[] splitPoint = null;
      if (request.hasSplitPoint()) {
        splitPoint = request.getSplitPoint().toByteArray();
      }
      region.forceSplit(splitPoint);
      // Hand the split request to CompactSplitThread; region.checkSplit() returns the midkey to split at
      compactSplitThread.requestSplit(region, region.checkSplit(), RpcServer.getRequestUser());
      ...

  }
}
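    First, step into region.checkSplit(), which asks the region's split policy for a split point.
    Inside getSplitPoint, an explicitly requested split point is returned if one was given. Otherwise the method finds the largest store in the region (not in the whole region server) and takes that store's split point: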
/**
 * @return the key at which the region should be split, or null
 * if it cannot be split. This will only be called if shouldSplit
 * previously returned true.
 */
protected byte[] getSplitPoint() {
    byte[] explicitSplitPoint = this.region.getExplicitSplitPoint();
    if (explicitSplitPoint != null) {
        return explicitSplitPoint;
    }
    Map<byte[], Store> stores = region.getStores();

    byte[] splitPointFromLargestStore = null;
    long largestStoreSize = 0;
    for (Store s : stores.values()) {
        byte[] splitPoint = s.getSplitPoint();
        long storeSize = s.getSize();
        if (splitPoint != null && largestStoreSize < storeSize) {
            splitPointFromLargestStore = splitPoint;
            largestStoreSize = storeSize;
        }
    }

    return splitPointFromLargestStore;
}
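    The selection logic above can be tried out in isolation. The following is only a minimal sketch: SketchStore and SplitPointSketch are hypothetical stand-ins (not HBase classes), and row keys are modelled as plain strings rather than byte arrays.

```java
import java.util.List;

/** Hypothetical stand-in for a store: only its size and its candidate split point. */
final class SketchStore {
    final long size;
    final String splitPoint; // null means "this store cannot offer a split point"

    SketchStore(long size, String splitPoint) {
        this.size = size;
        this.splitPoint = splitPoint;
    }
}

final class SplitPointSketch {
    /** Returns the explicit split point if given, else that of the largest store that has one. */
    static String pickSplitPoint(String explicitSplitPoint, List<SketchStore> stores) {
        if (explicitSplitPoint != null) {
            return explicitSplitPoint; // a forced split point always wins
        }
        String best = null;
        long largest = 0;
        for (SketchStore s : stores) {
            // Stores without a split point are skipped, however large they are.
            if (s.splitPoint != null && largest < s.size) {
                best = s.splitPoint;
                largest = s.size;
            }
        }
        return best;
    }
}
```

    Note that a store without a split point never wins, even when it is the largest one; the result is null only when no store can offer a split point at all.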
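    Next, step into the store's getSplitPoint method. The snippet below reads this.storefiles directly, so it appears to be the store file manager implementation that HStore delegates to: it picks the largest store file and calls StoreFile's getFileSplitPoint on it.
    getFileSplitPoint in turn essentially returns the first key of the middle block of that file, so the split point is the first key of the middle block of the largest store file in the largest store: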
@Override
public final byte[] getSplitPoint() throws IOException {
    if (this.storefiles.isEmpty()) {
        return null;
    }
    return StoreUtils.getLargestFile(this.storefiles).getFileSplitPoint(this.kvComparator);
}
    

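    HBase also stipulates that if the located rowkey is the first or the last rowkey of the whole file, the file is considered to have no split point.

    When does that happen? The most common case is a file that contains only one block; a split then finds nothing to cut. Newcomers testing split often create a fresh table, insert a few rows, run a flush and then a split, only to find that the table was not actually split. This is the reason. If you look carefully through the debug log at that point, you will find a message saying that the region cannot be split, such as the "not splittable because midkey=null" line logged by requestSplit further down.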
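    The "no split point" rule can be illustrated with a toy model of a store file. This is a sketch under stated assumptions, not the HBase implementation: FileSplitPointSketch is a hypothetical name, a file is modelled as a list of blocks holding string row keys, and the "middle block" is simply taken to be the one at index size/2.

```java
import java.util.List;

final class FileSplitPointSketch {
    /**
     * blocks: the row keys of each data block, in file order (every block non-empty).
     * Returns the first row of the middle block, or null when that row is also
     * the first or the last row of the whole file.
     */
    static String fileSplitPoint(List<List<String>> blocks) {
        if (blocks.isEmpty()) {
            return null;
        }
        String firstRow = blocks.get(0).get(0);
        List<String> lastBlock = blocks.get(blocks.size() - 1);
        String lastRow = lastBlock.get(lastBlock.size() - 1);
        // Assumption for this sketch: the middle block is the one at index size/2.
        String midRow = blocks.get(blocks.size() / 2).get(0);
        if (midRow.equals(firstRow) || midRow.equals(lastRow)) {
            return null; // splitting here would leave one side empty
        }
        return midRow;
    }
}
```

    With a single block, the middle block is the first block, so its first key is the first row of the file and no split point is returned. That is exactly the "new table with a few rows" situation described above.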



    Now go back to the requestSplit method of CompactSplitThread:
/*
 * The User parameter allows the split thread to assume the correct user identity
 */
public synchronized void requestSplit(final HRegion r, byte[] midKey, User user) {
    if (midKey == null) {
        LOG.debug("Region " + r.getRegionNameAsString() +
                  " not splittable because midkey=null");
        if (r.shouldForceSplit()) {
            r.clearSplit();
        }
        return;
    }
    try {
        // The thread pool runs the SplitRequest, whose doSplitting method performs the split
        this.splits.execute(new SplitRequest(r, midKey, this.server, user));
        if (LOG.isDebugEnabled()) {
            LOG.debug("Split requested for " + r + ".  " + this);
        }
    } catch (RejectedExecutionException ree) {
        LOG.info("Could not execute split for " + r, ree);
    }
}
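    The guard-then-submit shape of requestSplit can be sketched with nothing but java.util.concurrent. SplitRequestSketch and its parameters are hypothetical names chosen for illustration; the region is reduced to a name and the split work to a Runnable.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;

final class SplitRequestSketch {
    /** Returns true if the split task was handed to the executor, false otherwise. */
    static boolean requestSplit(Executor pool, String regionName, String midKey, Runnable splitTask) {
        if (midKey == null) {
            // Without a split point there is nothing to submit.
            System.out.println("Region " + regionName + " not splittable because midkey=null");
            return false;
        }
        try {
            pool.execute(splitTask);
            return true;
        } catch (RejectedExecutionException ree) {
            // A saturated or shut-down pool rejects the task; the request is simply dropped.
            System.out.println("Could not execute split for " + regionName);
            return false;
        }
    }
}
```

    Passing an Executor instead of a concrete thread pool keeps the sketch easy to exercise: Runnable::run executes the task inline, while a real deployment would supply a ThreadPoolExecutor.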
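    Next, step into the doSplitting method of SplitRequest. It has two main phases, the prepare method and the execute method. Before either runs, the midkey is assigned to the splitrow field of SplitTransaction; this field is used later when the reference files are created, where it is compared against the keys of each store file.
    The prepare method of SplitTransaction validates the split row and initializes the HRegionInfo of the two daughter regions: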
/**
 * Does checks on split inputs.
 * @return <code>true</code> if the region is splittable else
 * <code>false</code> if it is not (e.g. its already closed, etc.).
 */
public boolean prepare() {
    if (!this.parent.isSplittable()) return false;
    // Split key can be null if this region is unsplittable; i.e. has refs.
    if (this.splitrow == null) return false;
    HRegionInfo hri = this.parent.getRegionInfo();
    parent.prepareToSplit();
    // Check splitrow.
    byte [] startKey = hri.getStartKey();
    byte [] endKey = hri.getEndKey();
    if (Bytes.equals(startKey, splitrow) ||
        !this.parent.getRegionInfo().containsRow(splitrow)) {
        LOG.info("Split row is not inside region key range or is equal to " +
                 "startkey: " + Bytes.toStringBinary(this.splitrow));
        return false;
    }
    long rid = getDaughterRegionIdTimestamp(hri);
    this.hri_a = new HRegionInfo(hri.getTable(), startKey, this.splitrow, false, rid);
    this.hri_b = new HRegionInfo(hri.getTable(), this.splitrow, endKey, false, rid);
    this.journal.add(new JournalEntry(JournalEntryType.PREPARED));
    return true;
}
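    The key-range checks in prepare can be reproduced on raw byte arrays. The sketch below is an illustration under assumptions: PrepareSketch is a hypothetical class, keys are compared as unsigned bytes in lexicographic order, and an empty end key is treated as "no upper bound".

```java
import java.util.Arrays;

final class PrepareSketch {
    /** Unsigned lexicographic comparison of two byte arrays. */
    static int compare(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int d = (a[i] & 0xff) - (b[i] & 0xff);
            if (d != 0) {
                return d;
            }
        }
        return a.length - b.length;
    }

    /** True if row lies in [startKey, endKey); an empty endKey means "no upper bound". */
    static boolean containsRow(byte[] startKey, byte[] endKey, byte[] row) {
        return compare(row, startKey) >= 0
            && (endKey.length == 0 || compare(row, endKey) < 0);
    }

    /**
     * Returns {startA, endA, startB, endB} for the two daughters,
     * or null when the split row cannot be used.
     */
    static byte[][] daughterRanges(byte[] startKey, byte[] endKey, byte[] splitRow) {
        if (splitRow == null) {
            return null;
        }
        if (Arrays.equals(startKey, splitRow) || !containsRow(startKey, endKey, splitRow)) {
            return null; // would create an empty daughter or fall outside the parent
        }
        // Daughter A covers [startKey, splitRow), daughter B covers [splitRow, endKey).
        return new byte[][] { startKey, splitRow, splitRow, endKey };
    }
}
```

    The two daughters share the split row as a boundary: it is the exclusive end of the first and the inclusive start of the second, so together they cover exactly the parent's range.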
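    The key part is the execute method of SplitTransaction. After createDaughters has finished, it runs stepsAfterPONR, which calls openDaughters; that method in turn calls openDaughterRegion and then performs some initialization on the daughter regions.
    Step into the createDaughters method of SplitTransaction, during which the write lock of the parent region is acquired: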
/* package */PairOfSameType<HRegion> createDaughters(final Server server,
      final RegionServerServices services, User user) throws IOException {
    ...

    PairOfSameType<HRegion> daughterRegions = stepsBeforePONR(server, services, testing);

    final List<Mutation> metaEntries = new ArrayList<Mutation>();
    boolean ret = false;

    ...
    return daughterRegions;
}
    
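    Step into the stepsBeforePONR method of SplitTransaction. Through createNodeSplitting it marks the parent region as splitting: an entry recording the start of the split is added to the journal, and a splitting znode is created in ZooKeeper. It then calls splitStoreFiles to create reference files for the two daughter regions, each pointing back to an HFile in the parent region.
    Inside splitStoreFiles, the whole of hstoreFilesToSplit is traversed. It holds the store files of every store of the parent region, keyed by column family, and is passed in from the caller: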
/**
 * Creates reference files for top and bottom half of the
 * @param hstoreFilesToSplit map of store files to create half file references for.
 * @return the number of reference files that were created.
 * @throws IOException
 */
private Pair<Integer, Integer> splitStoreFiles(final Map<byte[],
      List<StoreFile>> hstoreFilesToSplit) throws IOException {
    if (hstoreFilesToSplit == null) {
        // Could be null because close didn't succeed -- for now consider it fatal
        throw new IOException("Close returned empty list of StoreFiles");
    }
    // The following code sets up a thread pool executor with as many slots as
    // there's files to split. It then fires up everything, waits for
    // completion and finally checks for any exception
    int nbFiles = 0;
    for (Map.Entry<byte[], List<StoreFile>> entry: hstoreFilesToSplit.entrySet()) {
        nbFiles += entry.getValue().size();
    }
    if (nbFiles == 0) {
        // no file needs to be splitted.
        return new Pair<Integer, Integer>(0,0);
    }
    // Default max #threads to use is the smaller of table's configured number of blocking store
    // files or the available number of logical cores.
    int defMaxThreads = Math.min(parent.conf.getInt(HStore.BLOCKING_STOREFILES_KEY,
                HStore.DEFAULT_BLOCKING_STOREFILE_COUNT),
                Runtime.getRuntime().availableProcessors());
    // Max #threads is the smaller of the number of storefiles or the default max determined above.
    int maxThreads = Math.min(parent.conf.getInt(HConstants.REGION_SPLIT_THREADS_MAX, defMaxThreads), nbFiles);
    LOG.info("Preparing to split " + nbFiles + " storefiles for region " + this.parent +
             " using " + maxThreads + " threads");
    ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
    builder.setNameFormat("StoreFileSplitter-%1$d");
    ThreadFactory factory = builder.build();
    ThreadPoolExecutor threadPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(maxThreads, factory);
    List<Future<Pair<Path,Path>>> futures = new ArrayList<Future<Pair<Path,Path>>> (nbFiles);

    // Split each store file.
    for (Map.Entry<byte[], List<StoreFile>> entry: hstoreFilesToSplit.entrySet()) {
        for (StoreFile sf: entry.getValue()) {
            StoreFileSplitter sfs = new StoreFileSplitter(entry.getKey(), sf);
            futures.add(threadPool.submit(sfs));
        }
    }
    // Shutdown the pool
    threadPool.shutdown();

    // Wait for all the tasks to finish
    try {
        boolean stillRunning = !threadPool.awaitTermination(this.fileSplitTimeout, TimeUnit.MILLISECONDS);
        if (stillRunning) {
            threadPool.shutdownNow();
            // wait for the thread to shutdown completely.
            while (!threadPool.isTerminated()) {
                Thread.sleep(50);
            }
            throw new IOException("Took too long to split the" + " files and create the references, aborting split");
        }
    } catch (InterruptedException e) {
        throw (InterruptedIOException)new InterruptedIOException().initCause(e);
    }

    int created_a = 0;
    int created_b = 0;
    // Look for any exception
    for (Future<Pair<Path, Path>> future : futures) {
        try {
            Pair<Path, Path> p = future.get();
            created_a += p.getFirst() != null ? 1 : 0;
            created_b += p.getSecond() != null ? 1 : 0;
        } catch (InterruptedException e) {
            throw (InterruptedIOException) new InterruptedIOException().initCause(e);
        } catch (ExecutionException e) {
            throw new IOException(e);
        }
    }

    if (LOG.isDebugEnabled()) {
        LOG.debug("Split storefiles for region " + this.parent + " Daugther A: " + created_a
                  + " storefiles, Daugther B: " + created_b + " storefiles.");
    }
    return new Pair<Integer, Integer>(created_a, created_b);
}
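    The concurrency pattern in splitStoreFiles (bounded pool, submit everything, shut down, wait with a timeout, then inspect each future) is worth seeing on its own. The sketch below uses only java.util.concurrent; ParallelSplitSketch is a hypothetical name and each task just returns a string, or null for "no reference created".

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

final class ParallelSplitSketch {
    /** Runs every task on a bounded pool and returns how many produced a non-null result. */
    static int runAll(List<Callable<String>> tasks, int maxThreads, long timeoutMs)
            throws IOException {
        if (tasks.isEmpty()) {
            return 0;
        }
        // Never start more threads than there are tasks (maxThreads is assumed to be >= 1).
        ExecutorService pool = Executors.newFixedThreadPool(Math.min(maxThreads, tasks.size()));
        List<Future<String>> futures = new ArrayList<Future<String>>(tasks.size());
        for (Callable<String> task : tasks) {
            futures.add(pool.submit(task));
        }
        pool.shutdown(); // accept no new tasks; the submitted ones keep running
        try {
            if (!pool.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)) {
                pool.shutdownNow();
                throw new IOException("Took too long, aborting");
            }
            int created = 0;
            for (Future<String> future : futures) {
                if (future.get() != null) { // get() rethrows any failure of the task
                    created++;
                }
            }
            return created;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IOException(e);
        } catch (ExecutionException e) {
            throw new IOException(e.getCause());
        }
    }
}
```

    Calling shutdown() before awaitTermination is what makes the wait meaningful: the pool only terminates once it has stopped accepting work and every submitted task has finished.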
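    Going back up one level to see where hstoreFilesToSplit comes from: it is the return value of this.parent.close(false), which eventually lands in the doClose method of HRegion. All of the stores come from the stores member variable of HRegion, as shown below: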
private Map<byte[], List<StoreFile>> doClose(final boolean abort, MonitoredTask status)
    throws IOException {
    if (isClosed()) {
        LOG.warn("Region " + this + " already closed");
        return null;
    }

    if (coprocessorHost != null) {
        status.setStatus("Running coprocessor pre-close hooks");
        this.coprocessorHost.preClose(abort);
    }

    status.setStatus("Disabling compacts and flushes for region");
    synchronized (writestate) {
        // Disable compacting and flushing by background threads for this
        // region.
        writestate.writesEnabled = false;
        LOG.debug("Closing " + this + ": disabling compactions & flushes");
        waitForFlushesAndCompactions();
    }
    // If we were not just flushing, is it worth doing a preflush...one
    // that will clear out of the bulk of the memstore before we put up
    // the close flag?
    if (!abort && worthPreFlushing()) {
        status.setStatus("Pre-flushing region before close");
        LOG.info("Running close preflush of " + this.getRegionNameAsString());
        try {
            internalFlushcache(status);
        } catch (IOException ioe) {
            // Failed to flush the region. Keep going.
            status.setStatus("Failed pre-flush " + this + "; " + ioe.getMessage());
        }
    }

    // block waiting for the lock for closing
    lock.writeLock().lock();
    this.closing.set(true);
    status.setStatus("Disabling writes for close");
    try {
        if (this.isClosed()) {
            status.abort("Already got closed by another process");
            // SplitTransaction handles the null
            return null;
        }
        LOG.debug("Updates disabled for region " + this);
        // Don't flush the cache if we are aborting
        if (!abort) {
            int flushCount = 0;
            while (this.getMemstoreSize().get() > 0) {
                try {
                    if (flushCount++ > 0) {
                        int actualFlushes = flushCount - 1;
                        if (actualFlushes > 5) {
                            // If we tried 5 times and are unable to clear memory, abort
                            // so we do not lose data
                            throw new DroppedSnapshotException("Failed clearing memory after " +
                                  actualFlushes + " attempts on region: " + Bytes.toStringBinary(getRegionName()));
                        }
                        LOG.info("Running extra flush, " + actualFlushes + " (carrying snapshot?) " + this);
                    }
                    internalFlushcache(status);
                } catch (IOException ioe) {
                    status.setStatus("Failed flush " + this + ", putting online again");
                    synchronized (writestate) {
                        writestate.writesEnabled = true;
                    }
                    // Have to throw to upper layers.  I can't abort server from here.
                    throw ioe;
                }
            }
        }

        Map<byte[], List<StoreFile>> result =
            new TreeMap<byte[], List<StoreFile>>(Bytes.BYTES_COMPARATOR);
        if (!stores.isEmpty()) {
            // initialize the thread pool for closing stores in parallel.
            ThreadPoolExecutor storeCloserThreadPool =
                getStoreOpenAndCloseThreadPool("StoreCloserThread-" + this.getRegionNameAsString());
            CompletionService<Pair<byte[], Collection<StoreFile>>> completionService =
                new ExecutorCompletionService<Pair<byte[], Collection<StoreFile>>>(storeCloserThreadPool);

            // close each store in parallel
            for (final Store store : stores.values()) {
                long flushableSize = store.getFlushableSize();
                if (!(abort || flushableSize == 0)) {
                    getRegionServerServices().abort("Assertion failed while closing store "
                         + getRegionInfo().getRegionNameAsString() + " " + store
                         + ". flushableSize expected=0, actual= " + flushableSize
                         + ". Current memstoreSize=" + getMemstoreSize() + ". Maybe a coprocessor "
                         + "operation failed and left the memstore in a partially updated state.", null);
                }
                completionService.submit(new Callable<Pair<byte[], Collection<StoreFile>>>() {
                        @Override
                        public Pair<byte[], Collection<StoreFile>> call() throws IOException {
                            return new Pair<byte[], Collection<StoreFile>>(
                                store.getFamily().getName(), store.close());
                        }
                    });
            }
            try {
                for (int i = 0; i < stores.size(); i++) {
                    Future<Pair<byte[], Collection<StoreFile>>> future = completionService.take();
                    Pair<byte[], Collection<StoreFile>> storeFiles = future.get();
                    List<StoreFile> familyFiles = result.get(storeFiles.getFirst());
                    if (familyFiles == null) {
                        familyFiles = new ArrayList<StoreFile>();
                        result.put(storeFiles.getFirst(), familyFiles);
                    }
                    familyFiles.addAll(storeFiles.getSecond());
                }
            } catch (InterruptedException e) {
                throw (InterruptedIOException)new InterruptedIOException().initCause(e);
            } catch (ExecutionException e) {
                throw new IOException(e.getCause());
            } finally {
                storeCloserThreadPool.shutdownNow();
            }
        }
        this.closed.set(true);
        if (memstoreSize.get() != 0) LOG.error("Memstore size is " + memstoreSize.get());
        if (coprocessorHost != null) {
            status.setStatus("Running coprocessor post-close hooks");
            this.coprocessorHost.postClose(abort);
        }
        if ( this.metricsRegion != null) {
            this.metricsRegion.close();
        }
        if ( this.metricsRegionWrapper != null) {
            Closeables.closeQuietly(this.metricsRegionWrapper);
        }
        status.markComplete("Closed");
        LOG.info("Closed " + this);
        return result;
    } finally {
        lock.writeLock().unlock();
    }
}
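    The part of doClose that matters for the split is how the per-family map of store files is assembled while the stores are closed in parallel. Here is a minimal sketch of that CompletionService pattern; ParallelCloseSketch, its nested SketchStore interface and the store(...) helper are hypothetical, and files are represented by their names only.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ParallelCloseSketch {
    /** Hypothetical store: closing it yields the names of the files it held. */
    interface SketchStore {
        String family();
        List<String> close();
    }

    /** Convenience factory for the sketch. */
    static SketchStore store(final String family, final String... files) {
        return new SketchStore() {
            public String family() { return family; }
            public List<String> close() { return Arrays.asList(files); }
        };
    }

    /** Closes all stores in parallel and groups the returned files by column family. */
    static Map<String, List<String>> closeAll(List<SketchStore> stores)
            throws InterruptedException, ExecutionException {
        Map<String, List<String>> result = new TreeMap<String, List<String>>();
        if (stores.isEmpty()) {
            return result;
        }
        ExecutorService pool = Executors.newFixedThreadPool(stores.size());
        CompletionService<Map.Entry<String, List<String>>> completion =
            new ExecutorCompletionService<Map.Entry<String, List<String>>>(pool);
        try {
            for (final SketchStore store : stores) {
                completion.submit(new Callable<Map.Entry<String, List<String>>>() {
                    @Override
                    public Map.Entry<String, List<String>> call() {
                        return new AbstractMap.SimpleEntry<String, List<String>>(
                            store.family(), store.close());
                    }
                });
            }
            // take() hands back results in completion order, one per submitted store.
            for (int i = 0; i < stores.size(); i++) {
                Map.Entry<String, List<String>> closed = completion.take().get();
                List<String> familyFiles = result.get(closed.getKey());
                if (familyFiles == null) {
                    familyFiles = new ArrayList<String>();
                    result.put(closed.getKey(), familyFiles);
                }
                familyFiles.addAll(closed.getValue());
            }
        } finally {
            pool.shutdownNow();
        }
        return result;
    }
}
```

    The returned map plays the role of hstoreFilesToSplit: one entry per column family, each listing the files that the split must create references for.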
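    The stores member variable is populated when the region's stores are initialized, in a method of the HRegion class.
    That covers how parent.close() collects the stores. Next, step into splitStoreFile to see how the reference files are created: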
private Pair<Path, Path> splitStoreFile(final byte[] family, final StoreFile sf)
    throws IOException {
    if (LOG.isDebugEnabled()) {
        LOG.debug("Splitting started for store file: " + sf.getPath() + " for region: " +
                  this.parent);
    }
    HRegionFileSystem fs = this.parent.getRegionFileSystem();
    String familyName = Bytes.toString(family);
    Path path_a = fs.splitStoreFile(this.hri_a, familyName, sf, this.splitrow, false,
                          this.parent.getSplitPolicy());
    Path path_b = fs.splitStoreFile(this.hri_b, familyName, sf, this.splitrow, true,
                          this.parent.getSplitPolicy());
    if (LOG.isDebugEnabled()) {
        LOG.debug("Splitting complete for store file: " + sf.getPath() + " for region: " +
                  this.parent);
    }
    return new Pair<Path,Path>(path_a, path_b);
}
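    Inside the splitStoreFile method of HRegionFileSystem (the fs.splitStoreFile calls above), the top flag selects which half of the file is referenced. When top is false (the call for hri_a), the split key is compared with the firstKey of the store file; if it is greater than firstKey, a reference file is added for the first daughter region. When top is true (the call for hri_b), the split key is compared with the lastKey of the store file; if it is smaller than lastKey, a reference file is added for the second daughter region. Most HFiles therefore produce one reference file in each of a and b.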
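    The decision described above can be sketched as a pure function. This is a simplification under assumptions: ReferenceSketch is a hypothetical class, keys are plain strings compared lexicographically (HBase compares KeyValue keys), and the handling of a split key exactly equal to firstKey or lastKey is chosen here so that a reference is created only when that half actually contains rows.

```java
final class ReferenceSketch {
    /**
     * Decides whether a half-file reference is worth creating.
     * top == false: lower half, for the first daughter, covering [start, splitKey).
     * top == true:  upper half, for the second daughter, covering [splitKey, end).
     * Returns a label for the reference, or null when the file holds no rows for that half.
     */
    static String referenceFor(String firstKey, String lastKey, String splitKey, boolean top) {
        if (top) {
            if (splitKey.compareTo(lastKey) > 0) {
                return null; // every row of the file sorts below the split key
            }
            return "top-ref@" + splitKey;
        }
        if (splitKey.compareTo(firstKey) <= 0) {
            return null; // every row of the file sorts at or above the split key
        }
        return "bottom-ref@" + splitKey;
    }
}
```

    For a file whose keys straddle the split key, both calls return a reference, which is the common case. A file that lies entirely on one side yields a reference for only one daughter.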
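    Next, step into createDaughterRegionFromSplits, which moves the reference files into the directory of the corresponding daughter region, then creates the daughter region instance and returns it: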
/**
 * Create a daughter region from given a temp directory with the region data.
 * @param hri Spec. for daughter region to open.
 * @throws IOException
 */
HRegion createDaughterRegionFromSplits(final HRegionInfo hri) throws IOException {
    // Move the files from the temporary .splits to the final /table/region directory
    fs.commitDaughterRegion(hri);

    // Create the daughter HRegion instance
    HRegion r = HRegion.newHRegion(this.fs.getTableDir(), this.getLog(), fs.getFileSystem(),
                                   this.getBaseConf(), hri, this.getTableDesc(), rsServices);
    r.readRequestsCount.set(this.getReadRequestsCount() / 2);
    r.writeRequestsCount.set(this.getWriteRequestsCount() / 2);
    return r;
}
    The above is the main process of splitting a region. The overall flow chart is as follows:
[Figure: flow chart of the split process]