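Source Code Analysis of the Split Process
Where split requests come from:
1. When a memstore is flushed, CompactSplitThread.requestSplit is called directly.
2. A request issued from the HBaseAdmin client: HRegionServer receives it and hands it over to CompactSplitThread.requestSplit.
The following describes how a request issued through HBaseAdmin is implemented.
The client issues an action from the web UI, which invokes the _jspService method of the generated JSP class in the org.apache.hadoop.hbase.generated.master package.
Below is a fragment of that method: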
...

if (action.equals("split")) {
  if (key != null && key.length() > 0) {
    hbadmin.split(key);
  } else {
    hbadmin.split(fqtn);
  }
  out.write(" Split request accepted. ");

}
...
So a split request calls HBaseAdmin's split method. Next we step into split in org.apache.hadoop.hbase.client.HBaseAdmin. On this path the second argument (splitPoint) is null by default.
/**
 * Split a table or an individual region.
 * Asynchronous operation.
 */
public void split(final byte[] tableNameOrRegionName,
    final byte [] splitPoint) throws IOException, InterruptedException {
  CatalogTracker ct = getCatalogTracker();
  try {
    Pair<HRegionInfo, ServerName> regionServerPair = getRegion(tableNameOrRegionName, ct);
    if (regionServerPair != null) {
      if (regionServerPair.getSecond() == null) {
        throw new NoServerForRegionException(Bytes.toStringBinary(tableNameOrRegionName));
      } else {
        split(regionServerPair.getSecond(), regionServerPair.getFirst(), splitPoint);
      }
    }
    ...
}
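Next comes split(regionServerPair.getSecond(), regionServerPair.getFirst(), splitPoint). This method rejects a split point that equals the region's start key, obtains the admin interface of the regionserver hosting the region, and calls that regionserver's split through ProtobufUtil: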
private void split(final ServerName sn, final HRegionInfo hri,
    byte[] splitPoint) throws IOException {
  if (hri.getStartKey() != null && splitPoint != null &&
      Bytes.compareTo(hri.getStartKey(), splitPoint) == 0) {
    throw new IOException("should not give a splitkey which equals to startkey!");
  }
  AdminService.BlockingInterface admin = this.connection.getAdmin(sn);
  ProtobufUtil.split(admin, hri, splitPoint);
}
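The guard above exists because a split point equal to the start key would leave daughter A with an empty key range. HBase's Bytes.compareTo orders byte arrays lexicographically, treating each byte as unsigned. The following is a minimal plain-Java sketch of the same check. SplitKeyGuard is a hypothetical class, Arrays.compareUnsigned (Java 9+) stands in for Bytes.compareTo, and IllegalArgumentException replaces the original IOException so the example needs no checked exceptions.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical helper mirroring the start-key guard in HBaseAdmin's private split(...).
final class SplitKeyGuard {
  private SplitKeyGuard() {}

  // Stand-in for Bytes.compareTo: lexicographic order, each byte treated as unsigned.
  static int compareBytes(byte[] a, byte[] b) {
    return Arrays.compareUnsigned(a, b);
  }

  // Rejects a split point equal to the region's start key; a null split point is allowed.
  static void checkSplitPoint(byte[] startKey, byte[] splitPoint) {
    if (startKey != null && splitPoint != null && compareBytes(startKey, splitPoint) == 0) {
      throw new IllegalArgumentException("should not give a splitkey which equals to startkey!");
    }
  }

  // Convenience for building UTF-8 row keys in examples.
  static byte[] bytes(String s) {
    return s.getBytes(StandardCharsets.UTF_8);
  }
}
```

Unsigned comparison matters for binary row keys. A byte 0xFF must sort after 0x01, but Java's signed byte type would place it first.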
Inside ProtobufUtil's split method:
/**
 * A helper to split a region using admin protocol.
 */
public static void split(final AdminService.BlockingInterface admin,
    final HRegionInfo hri, byte[] splitPoint) throws IOException {
  SplitRegionRequest request = RequestConverter.buildSplitRegionRequest(hri.getRegionName(), splitPoint);
  try {
    admin.splitRegion(null, request);
  } catch (ServiceException se) {
    throw ProtobufUtil.getRemoteException(se);
  }
}
This calls splitRegion on AdminService.BlockingInterface, which is declared as follows:
public org.apache.hadoop.hbase.protobuf.generated.AdminProtos.SplitRegionResponse splitRegion(
    com.google.protobuf.RpcController controller,
    org.apache.hadoop.hbase.protobuf.generated.AdminProtos.SplitRegionRequest request)
    throws com.google.protobuf.ServiceException;
HRegionServer implements this BlockingInterface and provides the corresponding splitRegion method:
/**
 * HRegionServer makes a set of HRegions available to clients. It checks in with
 * the HMaster. There are many HRegionServers in a single HBase deployment.
 */
@InterfaceAudience.Private
@SuppressWarnings("deprecation")
public class HRegionServer implements ClientProtos.ClientService.BlockingInterface,
    AdminProtos.AdminService.BlockingInterface, Runnable, RegionServerServices,
    HBaseRPCErrorHandler, LastSequenceId {

  ...

  /**
   * Split a region on the region server.
   *
   * @param controller the RPC controller
   * @param request the request
   * @throws ServiceException
   */
  @Override
  @QosPriority(priority=HConstants.HIGH_QOS)
  public SplitRegionResponse splitRegion(final RpcController controller,
      final SplitRegionRequest request) throws ServiceException {
    try {
      checkOpen();
      requestCount.increment();
      HRegion region = getRegion(request.getRegion());
      region.startRegionOperation(Operation.SPLIT_REGION);
      LOG.info("Splitting " + region.getRegionNameAsString());
      long startTime = EnvironmentEdgeManager.currentTimeMillis();
      HRegion.FlushResult flushResult = region.flushcache();
      if (flushResult.isFlushSucceeded()) {
        long endTime = EnvironmentEdgeManager.currentTimeMillis();
        metricsRegionServer.updateFlushTime(endTime - startTime);
      }
      byte[] splitPoint = null;
      if (request.hasSplitPoint()) {
        splitPoint = request.getSplitPoint().toByteArray();
      }
      region.forceSplit(splitPoint);
      // Ask CompactSplitThread to split; region.checkSplit() computes the split point (midkey)
      compactSplitThread.requestSplit(region, region.checkSplit(), RpcServer.getRequestUser());
      ...

  }
}
First, look inside region.checkSplit():
/**
 * Return the splitpoint. null indicates the region isn't splittable
 * If the splitpoint isn't explicitly specified, it will go over the stores
 * to find the best splitpoint. Currently the criteria of best splitpoint
 * is based on the size of the store.
 */
public byte[] checkSplit() {
  ...
  if (!splitPolicy.shouldSplit()) {
    return null;
  }

  byte[] ret = splitPolicy.getSplitPoint();
  ...
  return ret;
}
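Next is the split policy's getSplitPoint (RegionSplitPolicy). If an explicit split point was given, it is returned as-is. Otherwise the method asks every store of the region (not of the whole regionserver) for its split point and keeps the one proposed by the largest store that can be split: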
/**
 * @return the key at which the region should be split, or null
 * if it cannot be split. This will only be called if shouldSplit
 * previously returned true.
 */
protected byte[] getSplitPoint() {
  byte[] explicitSplitPoint = this.region.getExplicitSplitPoint();
  if (explicitSplitPoint != null) {
    return explicitSplitPoint;
  }
  Map<byte[], Store> stores = region.getStores();

  byte[] splitPointFromLargestStore = null;
  long largestStoreSize = 0;
  for (Store s : stores.values()) {
    byte[] splitPoint = s.getSplitPoint();
    long storeSize = s.getSize();
    if (splitPoint != null && largestStoreSize < storeSize) {
      splitPointFromLargestStore = splitPoint;
      largestStoreSize = storeSize;
    }
  }

  return splitPointFromLargestStore;
}
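The selection rule can be checked in isolation. The sketch below models each store as a hypothetical StoreStub with a size and an optional split point, and uses String row keys instead of byte[] for readability. It applies the same loop as above: an explicit split point wins, and otherwise the method returns the split point of the largest store that has one. StoreStub and pickSplitPoint are illustrative names, not HBase classes.

```java
import java.util.List;

// Hypothetical model of RegionSplitPolicy.getSplitPoint()'s selection loop.
final class SplitPointPicker {
  private SplitPointPicker() {}

  // Hypothetical stand-in for a Store: its size and the split point it proposes (null if none).
  static final class StoreStub {
    final long size;
    final String splitPoint;

    StoreStub(long size, String splitPoint) {
      this.size = size;
      this.splitPoint = splitPoint;
    }
  }

  static String pickSplitPoint(String explicitSplitPoint, List<StoreStub> stores) {
    if (explicitSplitPoint != null) {
      return explicitSplitPoint;
    }
    String splitPointFromLargestStore = null;
    long largestStoreSize = 0;
    for (StoreStub s : stores) {
      // Stores that cannot propose a split point (e.g. they still hold references) are skipped.
      if (s.splitPoint != null && largestStoreSize < s.size) {
        splitPointFromLargestStore = s.splitPoint;
        largestStoreSize = s.size;
      }
    }
    return splitPointFromLargestStore;
  }
}
```

In this sketch, the biggest store is ignored when it cannot be split, so the next largest splittable store decides the split point.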
Into HStore's getSplitPoint method:
@Override
public byte[] getSplitPoint() {
  this.lock.readLock().lock();
  try {
    // Should already be enforced by the split policy!
    assert !this.getRegionInfo().isMetaRegion();
    // Not split-able if we find a reference store file present in the store.
    if (hasReferences()) {
      return null;
    }
    return this.storeEngine.getStoreFileManager().getSplitPoint();
  } catch(IOException e) {
    LOG.warn("Failed getting store size for " + this, e);
  } finally {
    this.lock.readLock().unlock();
  }
  return null;
}
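Next, the store file manager's getSplitPoint (the first excerpt below) picks the largest storefile of the store. StoreFile.getFileSplitPoint (the second excerpt) then returns the row of the first key of that file's middle block. In other words, the split point is the row of the first key of the middle block of the largest storefile in the largest store: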
@Override
public final byte[] getSplitPoint() throws IOException {
  if (this.storefiles.isEmpty()) {
    return null;
  }
  return StoreUtils.getLargestFile(this.storefiles).getFileSplitPoint(this.kvComparator);
}

/**
 * Gets the approximate mid-point of this file that is optimal for use in splitting it.
 * @param comparator Comparator used to compare KVs.
 * @return The split point row, or null if splitting is not possible, or reader is null.
 */
@SuppressWarnings("deprecation")
byte[] getFileSplitPoint(KVComparator comparator) throws IOException {
  if (this.reader == null) {
    LOG.warn("Storefile " + this + " Reader is null; cannot get split point");
    return null;
  }
  // Get first, last, and mid keys. Midkey is the key that starts block
  // in middle of hfile. Has column and timestamp. Need to return just
  // the row we want to split on as midkey.
  byte [] midkey = this.reader.midkey();
  if (midkey != null) {
    KeyValue mk = KeyValue.createKeyValueFromKey(midkey, 0, midkey.length);
    byte [] fk = this.reader.getFirstKey();
    KeyValue firstKey = KeyValue.createKeyValueFromKey(fk, 0, fk.length);
    byte [] lk = this.reader.getLastKey();
    KeyValue lastKey = KeyValue.createKeyValueFromKey(lk, 0, lk.length);
    // if the midkey is the same as the first or last keys, we cannot (ever) split this region.
    if (comparator.compareRows(mk, firstKey) == 0 || comparator.compareRows(mk, lastKey) == 0) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("cannot split because midkey is the same as first or last row");
      }
      return null;
    }
    return mk.getRow();
  }
  return null;
}
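HBase also stipulates that if the row located this way is the first or the last row of the whole file, the file has no split point.
When does a file have no split point? The most common case is a file that consists of a single block. The middle block is then also the first block, so the midkey's row is the file's first row, and split finds nothing to cut. Many newcomers test split by creating a new table, inserting a few rows, running flush and then split, only to find that the table was not actually split. This is the reason. If you read the debug log carefully, you can find the message "cannot split because midkey is the same as first or last row".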
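To see why a freshly flushed, single-block file cannot be split, here is a simplified model of getFileSplitPoint. It describes a file only by the first row of each block plus the file's last row. These are hypothetical inputs, since the real reader works on full KeyValue keys and a block index. Taking the "middle block" at index n / 2 is also an assumption of this sketch. The candidate row is rejected when it equals the file's first or last row, as in the code above.

```java
import java.util.List;

// Hypothetical, simplified model of StoreFile.getFileSplitPoint().
final class MidkeyModel {
  private MidkeyModel() {}

  // blockFirstRows: first row of each block in file order; lastRow: last row in the file.
  // Returns the split row, or null when the file cannot be split.
  static String fileSplitPoint(List<String> blockFirstRows, String lastRow) {
    if (blockFirstRows.isEmpty()) {
      return null;
    }
    // Assumption for this sketch: the middle block is taken at index n / 2.
    String midRow = blockFirstRows.get(blockFirstRows.size() / 2);
    String firstRow = blockFirstRows.get(0);
    // Same rule as HBase: a midkey on the first or last row is never a valid split point.
    if (midRow.equals(firstRow) || midRow.equals(lastRow)) {
      return null;
    }
    return midRow;
  }
}
```

With a single block, index 0 is both the middle block and the first block, so the candidate always equals the first row and the method returns null.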
Back in CompactSplitThread's requestSplit method:
/*
 * The User parameter allows the split thread to assume the correct user identity
 */
public synchronized void requestSplit(final HRegion r, byte[] midKey, User user) {
  if (midKey == null) {
    LOG.debug("Region " + r.getRegionNameAsString() +
        " not splittable because midkey=null");
    if (r.shouldForceSplit()) {
      r.clearSplit();
    }
    return;
  }
  try {
    // Run a SplitRequest in the split thread pool; it ends up calling SplitRequest.doSplitting()
    this.splits.execute(new SplitRequest(r, midKey, this.server, user));
    if (LOG.isDebugEnabled()) {
      LOG.debug("Split requested for " + r + ". " + this);
    }
  } catch (RejectedExecutionException ree) {
    LOG.info("Could not execute split for " + r, ree);
  }
}
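requestSplit follows a common pattern. It returns early when there is no midkey (after clearing any forced-split flag). Otherwise it hands the work to an executor and only logs if the executor refuses the task. The minimal sketch below reproduces that control flow with hypothetical names (SplitRequester, a boolean force-split flag, a rejection counter). It uses a plain fixed thread pool, which rejects new tasks once it has been shut down.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of CompactSplitThread.requestSplit's control flow.
final class SplitRequester {
  final ExecutorService splits = Executors.newFixedThreadPool(1);
  final AtomicBoolean forceSplit = new AtomicBoolean(false);
  final AtomicInteger rejected = new AtomicInteger();

  // Returns true if a split task was handed to the pool.
  synchronized boolean requestSplit(byte[] midKey, Runnable splitTask) {
    if (midKey == null) {
      // Not splittable: drop a pending forced split so it does not linger.
      forceSplit.set(false);
      return false;
    }
    try {
      splits.execute(splitTask);
      return true;
    } catch (RejectedExecutionException ree) {
      // The real code only logs here; the request is simply dropped.
      rejected.incrementAndGet();
      return false;
    }
  }
}
```

Because a rejected request is only logged, a split that fails to enqueue is not retried by requestSplit itself.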
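Next is SplitRequest's doSplitting method, which has two main phases: prepare and execute. Before them, the midkey is passed to the SplitTransaction constructor and stored in its splitrow field. splitrow is used later when the reference files are created, where it is compared with the keys of each storefile: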
private void doSplitting(User user) {
  boolean success = false;
  server.getMetrics().incrSplitRequest();
  long startTime = EnvironmentEdgeManager.currentTimeMillis();
  SplitTransaction st = new SplitTransaction(parent, midKey);
  try {
    //acquire a shared read lock on the table, so that table schema modifications
    //do not happen concurrently
    tableLock = server.getTableLockManager().readLock(parent.getTableDesc().getTableName()
        , "SPLIT_REGION:" + parent.getRegionNameAsString());
    try {
      tableLock.acquire();
    } catch (IOException ex) {
      tableLock = null;
      throw ex;
    }

    // If prepare does not return true, for some reason -- logged inside in
    // the prepare call -- we are not ready to split just now. Just return.
    if (!st.prepare()) return;
    try {
      st.execute(this.server, this.server, user);
      success = true;
    } catch (Exception e) {
      ... // elided in the original excerpt; this inner catch closes before the outer catch below
    }
  } catch (IOException ex) {
    LOG.error("Split failed " + this, RemoteExceptionHandler.checkIOException(ex));
    server.checkFileSystem();
  } finally {
    if (this.parent.getCoprocessorHost() != null) {
      try {
        this.parent.getCoprocessorHost().postCompleteSplit();
      } catch (IOException io) {
        LOG.error("Split failed " + this,
            RemoteExceptionHandler.checkIOException(io));
      }
    }
    if (parent.shouldForceSplit()) {
      parent.clearSplit();
    }
    releaseTableLock();
    long endTime = EnvironmentEdgeManager.currentTimeMillis();
    // Update regionserver metrics with the split transaction total running time
    server.getMetrics().updateSplitTime(endTime - startTime);
    if (success) {
      ...
    }
  }
}
SplitTransaction's prepare method validates the split inputs and initializes the HRegionInfo of the two daughter regions:
/**
 * Does checks on split inputs.
 * @return <code>true</code> if the region is splittable else
 * <code>false</code> if it is not (e.g. its already closed, etc.).
 */
public boolean prepare() {
  if (!this.parent.isSplittable()) return false;
  // Split key can be null if this region is unsplittable; i.e. has refs.
  if (this.splitrow == null) return false;
  HRegionInfo hri = this.parent.getRegionInfo();
  parent.prepareToSplit();
  // Check splitrow.
  byte [] startKey = hri.getStartKey();
  byte [] endKey = hri.getEndKey();
  if (Bytes.equals(startKey, splitrow) ||
      !this.parent.getRegionInfo().containsRow(splitrow)) {
    LOG.info("Split row is not inside region key range or is equal to " +
        "startkey: " + Bytes.toStringBinary(this.splitrow));
    return false;
  }
  long rid = getDaughterRegionIdTimestamp(hri);
  this.hri_a = new HRegionInfo(hri.getTable(), startKey, this.splitrow, false, rid);
  this.hri_b = new HRegionInfo(hri.getTable(), this.splitrow, endKey, false, rid);
  this.journal.add(new JournalEntry(JournalEntryType.PREPARED));
  return true;
}
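prepare() accepts the split row only if it lies inside the parent's key range and differs from the start key. It then describes daughter A as [startKey, splitrow) and daughter B as [splitrow, endKey). The sketch below models HRegionInfo.containsRow with plain strings, treating an empty end key as "no upper bound" as HBase does for a table's last region. RegionRange and split are hypothetical names. String.compareTo matches HBase's unsigned byte order only for ASCII keys, which is all the example uses.

```java
// Hypothetical model of the split-row checks in SplitTransaction.prepare().
final class RegionRange {
  final String startKey; // "" means no lower bound (first region of the table)
  final String endKey;   // "" means no upper bound (last region of the table)

  RegionRange(String startKey, String endKey) {
    this.startKey = startKey;
    this.endKey = endKey;
  }

  // Models HRegionInfo.containsRow: startKey <= row < endKey, empty endKey = unbounded.
  boolean containsRow(String row) {
    return row.compareTo(startKey) >= 0 && (endKey.isEmpty() || row.compareTo(endKey) < 0);
  }

  // Returns {daughter A, daughter B}, or null when prepare() would reject the split row.
  RegionRange[] split(String splitRow) {
    if (splitRow == null || splitRow.equals(startKey) || !containsRow(splitRow)) {
      return null;
    }
    return new RegionRange[] {
        new RegionRange(startKey, splitRow), // daughter A: [startKey, splitRow)
        new RegionRange(splitRow, endKey)    // daughter B: [splitRow, endKey)
    };
  }
}
```

Because the end key is exclusive, a split row equal to the parent's end key is also rejected.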
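The heart of the process is SplitTransaction's execute method. After createDaughters, it runs stepsAfterPONR (PONR stands for "point of no return"). stepsAfterPONR calls openDaughters, which calls openDaughterRegion to open the daughter regions and perform some initialization on them: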
/**
 * Run the transaction.
 * @param server Hosting server instance. Can be null when testing (won't try
 * and update in zk if a null server)
 * @param services Used to online/offline regions.
 * @throws IOException If thrown, transaction failed.
 * Call {@link #rollback(Server, RegionServerServices)}
 * @return Regions created
 * @throws IOException
 * @see #rollback(Server, RegionServerServices)
 */
public PairOfSameType<HRegion> execute(final Server server,
    final RegionServerServices services, User user)
    throws IOException {
  useZKForAssignment =
      server == null ? true : ConfigUtil.useZKForAssignment(server.getConfiguration());
  PairOfSameType<HRegion> regions = createDaughters(server, services, user);
  ...
  return stepsAfterPONR(server, services, regions, user);
}
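Into SplitTransaction's createDaughters method, which delegates the pre-PONR work to stepsBeforePONR. The parent region's write lock is not acquired here. It is taken later, in HRegion.doClose (lock.writeLock().lock()), when stepsBeforePONR closes the parent region: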
/* package */PairOfSameType<HRegion> createDaughters(final Server server,
    final RegionServerServices services, User user) throws IOException {
  ...

  PairOfSameType<HRegion> daughterRegions = stepsBeforePONR(server, services, testing);

  final List<Mutation> metaEntries = new ArrayList<Mutation>();
  boolean ret = false;

  ...
  return daughterRegions;
}
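Into SplitTransaction's stepsBeforePONR method, which performs the following steps:
1. It marks the parent region as splitting. With ZooKeeper-based assignment, createNodeSplitting creates a PENDING_SPLIT znode for the parent, and the method then waits for the master to move it to SPLITTING. Without ZooKeeper-based assignment, it reports READY_TO_SPLIT to the master instead.
2. It records each step in the journal.
3. It creates the .splits directory under the parent region on the filesystem (not in ZooKeeper).
4. It closes the parent region.
5. It calls splitStoreFiles to create reference files, through which the two daughter regions point to the halves of the parent region's hfiles.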
public PairOfSameType<HRegion> stepsBeforePONR(final Server server,
    final RegionServerServices services, boolean testing) throws IOException {
  // Set ephemeral SPLITTING znode up in zk. Mocked servers sometimes don't
  // have zookeeper so don't do zk stuff if server or zookeeper is null
  if (server != null && server.getZooKeeper() != null && useZKForAssignment) {
    try {
      createNodeSplitting(server.getZooKeeper(), parent.getRegionInfo(), server.getServerName(), hri_a, hri_b);
    } catch (KeeperException e) {
      throw new IOException("Failed creating PENDING_SPLIT znode on " +
          this.parent.getRegionNameAsString(), e);
    }
  } else if (services != null && !useZKForAssignment) {
    if (!services.reportRegionStateTransition(TransitionCode.READY_TO_SPLIT, parent.getRegionInfo(), hri_a, hri_b)) {
      throw new IOException("Failed to get ok from master to split " + parent.getRegionNameAsString());
    }
  }
  this.journal.add(new JournalEntry(JournalEntryType.SET_SPLITTING_IN_ZK));
  if (server != null && server.getZooKeeper() != null && useZKForAssignment) {
    // After creating the split node, wait for master to transition it
    // from PENDING_SPLIT to SPLITTING so that we can move on. We want master
    // knows about it and won't transition any region which is splitting.
    znodeVersion = getZKNode(server, services);
  }

  this.parent.getRegionFileSystem().createSplitsDir();
  this.journal.add(new JournalEntry(JournalEntryType.CREATE_SPLIT_DIR));

  Map<byte[], List<StoreFile>> hstoreFilesToSplit = null;
  Exception exceptionToThrow = null;
  try {
    // Close the parent region and collect the store files of all its stores
    hstoreFilesToSplit = this.parent.close(false);
  } catch (Exception e) {
    exceptionToThrow = e;
  }
  ...
  this.journal.add(new JournalEntry(JournalEntryType.OFFLINED_PARENT));

  // splitStoreFiles creates daughter region dirs under the parent splits dir
  // Nothing to unroll here if failure -- clean up of CREATE_SPLIT_DIR will
  // clean this up.
  Pair<Integer, Integer> expectedReferences = splitStoreFiles(hstoreFilesToSplit);

  // Log to the journal that we are creating region A, the first daughter
  // region. We could fail halfway through. If we do, we could have left
  // stuff in fs that needs cleanup -- a storefile or two. Thats why we
  // add entry to journal BEFORE rather than AFTER the change.
  this.journal.add(new JournalEntry(JournalEntryType.STARTED_REGION_A_CREATION));
  assertReferenceFileCount(expectedReferences.getFirst(),
      this.parent.getRegionFileSystem().getSplitsDir(this.hri_a));
  HRegion a = this.parent.createDaughterRegionFromSplits(this.hri_a);
  assertReferenceFileCount(expectedReferences.getFirst(),
      new Path(this.parent.getRegionFileSystem().getTableDir(), this.hri_a.getEncodedName()));

  // Ditto
  this.journal.add(new JournalEntry(JournalEntryType.STARTED_REGION_B_CREATION));
  assertReferenceFileCount(expectedReferences.getSecond(), this.parent.getRegionFileSystem().getSplitsDir(this.hri_b));
  HRegion b = this.parent.createDaughterRegionFromSplits(this.hri_b);
  assertReferenceFileCount(expectedReferences.getSecond(),
      new Path(this.parent.getRegionFileSystem().getTableDir(), this.hri_b.getEncodedName()));

  return new PairOfSameType<HRegion>(a, b);
}
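The journal entries are what make the steps before the point of no return undoable. Each step is recorded (some, like STARTED_REGION_A_CREATION, before the work starts), so a failed split can be unwound by walking the journal backwards. The rollback code is not shown in this article. The following is only a minimal sketch of the idea: step names are borrowed from JournalEntryType, the undo actions are recorded as strings, and the hypothetical PONR step models "too late to roll back".

```java
import java.util.ArrayList;
import java.util.List;
import java.util.ListIterator;

// Hypothetical model of a journal-driven rollback; not HBase's actual rollback code.
final class SplitJournal {
  enum Step { PREPARED, SET_SPLITTING_IN_ZK, CREATE_SPLIT_DIR, OFFLINED_PARENT,
      STARTED_REGION_A_CREATION, STARTED_REGION_B_CREATION, PONR }

  private final List<Step> journal = new ArrayList<>();
  final List<String> undone = new ArrayList<>();

  void record(Step step) {
    journal.add(step);
  }

  // Walks the journal from the newest entry to the oldest, undoing each step.
  // Returns false if the point of no return was reached (assumed: the caller must abort).
  boolean rollback() {
    ListIterator<Step> it = journal.listIterator(journal.size());
    while (it.hasPrevious()) {
      Step step = it.previous();
      if (step == Step.PONR) {
        return false;
      }
      undone.add("undo " + step);
    }
    return true;
  }
}
```

Undoing in reverse order matters here. For example, daughter directories are cleaned up before the split directory that contains them.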
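Inside splitStoreFiles, which iterates over hstoreFilesToSplit. This map is passed in from the caller and maps each column family of the parent region to the list of its storefiles. One StoreFileSplitter task per storefile is submitted to a thread pool: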
/**
 * Creates reference files for top and bottom half of the
 * @param hstoreFilesToSplit map of store files to create half file references for.
 * @return the number of reference files that were created.
 * @throws IOException
 */
private Pair<Integer, Integer> splitStoreFiles(final Map<byte[],
    List<StoreFile>> hstoreFilesToSplit) throws IOException {
  if (hstoreFilesToSplit == null) {
    // Could be null because close didn't succeed -- for now consider it fatal
    throw new IOException("Close returned empty list of StoreFiles");
  }
  // The following code sets up a thread pool executor with as many slots as
  // there's files to split. It then fires up everything, waits for
  // completion and finally checks for any exception
  int nbFiles = 0;
  for (Map.Entry<byte[], List<StoreFile>> entry: hstoreFilesToSplit.entrySet()) {
    nbFiles += entry.getValue().size();
  }
  if (nbFiles == 0) {
    // no file needs to be splitted.
    return new Pair<Integer, Integer>(0,0);
  }
  // Default max #threads to use is the smaller of table's configured number of blocking store
  // files or the available number of logical cores.
  int defMaxThreads = Math.min(parent.conf.getInt(HStore.BLOCKING_STOREFILES_KEY,
      HStore.DEFAULT_BLOCKING_STOREFILE_COUNT),
      Runtime.getRuntime().availableProcessors());
  // Max #threads is the smaller of the number of storefiles or the default max determined above.
  int maxThreads = Math.min(parent.conf.getInt(HConstants.REGION_SPLIT_THREADS_MAX, defMaxThreads), nbFiles);
  LOG.info("Preparing to split " + nbFiles + " storefiles for region " + this.parent +
      " using " + maxThreads + " threads");
  ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
  builder.setNameFormat("StoreFileSplitter-%1$d");
  ThreadFactory factory = builder.build();
  ThreadPoolExecutor threadPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(maxThreads, factory);
  List<Future<Pair<Path,Path>>> futures = new ArrayList<Future<Pair<Path,Path>>> (nbFiles);

  // Split each store file.
  for (Map.Entry<byte[], List<StoreFile>> entry: hstoreFilesToSplit.entrySet()) {
    for (StoreFile sf: entry.getValue()) {
      StoreFileSplitter sfs = new StoreFileSplitter(entry.getKey(), sf);
      futures.add(threadPool.submit(sfs));
    }
  }
  // Shutdown the pool
  threadPool.shutdown();

  // Wait for all the tasks to finish
  try {
    boolean stillRunning = !threadPool.awaitTermination(this.fileSplitTimeout, TimeUnit.MILLISECONDS);
    if (stillRunning) {
      threadPool.shutdownNow();
      // wait for the thread to shutdown completely.
      while (!threadPool.isTerminated()) {
        Thread.sleep(50);
      }
      throw new IOException("Took too long to split the" + " files and create the references, aborting split");
    }
  } catch (InterruptedException e) {
    throw (InterruptedIOException)new InterruptedIOException().initCause(e);
  }

  int created_a = 0;
  int created_b = 0;
  // Look for any exception
  for (Future<Pair<Path, Path>> future : futures) {
    try {
      Pair<Path, Path> p = future.get();
      created_a += p.getFirst() != null ? 1 : 0;
      created_b += p.getSecond() != null ? 1 : 0;
    } catch (InterruptedException e) {
      throw (InterruptedIOException) new InterruptedIOException().initCause(e);
    } catch (ExecutionException e) {
      throw new IOException(e);
    }
  }

  if (LOG.isDebugEnabled()) {
    LOG.debug("Split storefiles for region " + this.parent + " Daugther A: " + created_a
        + " storefiles, Daugther B: " + created_b + " storefiles.");
  }
  return new Pair<Integer, Integer>(created_a, created_b);
}
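The thread-pool sizing above can be summarized in two steps. First, defMaxThreads = min(blocking store files setting, available cores). Second, maxThreads = min(REGION_SPLIT_THREADS_MAX if configured, else defMaxThreads, number of files). The sketch below reproduces that arithmetic and the fan-out, timed wait, and collect pattern with a plain ExecutorService. All inputs are passed as parameters rather than read from a configuration, "not configured" is modeled as configuredMax <= 0, and splitOne is a hypothetical stand-in for StoreFileSplitter that reports whether daughters A and B received a reference.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Hypothetical sketch of the pattern used by SplitTransaction.splitStoreFiles.
final class ParallelSplitSketch {
  private ParallelSplitSketch() {}

  // Same arithmetic as splitStoreFiles; configuredMax <= 0 stands for "not configured".
  static int maxThreads(int blockingStoreFiles, int cores, int configuredMax, int nbFiles) {
    int defMaxThreads = Math.min(blockingStoreFiles, cores);
    int max = configuredMax > 0 ? configuredMax : defMaxThreads;
    return Math.min(max, nbFiles);
  }

  // One task per file; splitOne returns {reference created for A, reference created for B}.
  static int[] splitAll(List<String> files, Function<String, boolean[]> splitOne, int threads,
      long timeoutMs) throws InterruptedException, ExecutionException {
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    List<Future<boolean[]>> futures = new ArrayList<>();
    for (String f : files) {
      futures.add(pool.submit(() -> splitOne.apply(f)));
    }
    // No new tasks; wait for the submitted ones, giving up after the timeout.
    pool.shutdown();
    if (!pool.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)) {
      pool.shutdownNow();
      throw new IllegalStateException("Took too long to split the files");
    }
    int createdA = 0;
    int createdB = 0;
    for (Future<boolean[]> future : futures) {
      boolean[] r = future.get(); // rethrows a task failure as ExecutionException
      createdA += r[0] ? 1 : 0;
      createdB += r[1] ? 1 : 0;
    }
    return new int[] {createdA, createdB};
  }
}
```

Capping the pool at the number of files avoids idle threads when a region has only a few storefiles.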
Back to where hstoreFilesToSplit comes from: this.parent.close(false) ends up in HRegion's doClose method. All the stores come from HRegion's stores member variable, declared as follows:
protected final Map<byte[], Store> stores = new ConcurrentSkipListMap<byte[], Store>(Bytes.BYTES_RAWCOMPARATOR);

private Map<byte[], List<StoreFile>> doClose(final boolean abort, MonitoredTask status)
    throws IOException {
  if (isClosed()) {
    LOG.warn("Region " + this + " already closed");
    return null;
  }

  if (coprocessorHost != null) {
    status.setStatus("Running coprocessor pre-close hooks");
    this.coprocessorHost.preClose(abort);
  }

  status.setStatus("Disabling compacts and flushes for region");
  synchronized (writestate) {
    // Disable compacting and flushing by background threads for this
    // region.
    writestate.writesEnabled = false;
    LOG.debug("Closing " + this + ": disabling compactions & flushes");
    waitForFlushesAndCompactions();
  }
  // If we were not just flushing, is it worth doing a preflush...one
  // that will clear out of the bulk of the memstore before we put up
  // the close flag?
  if (!abort && worthPreFlushing()) {
    status.setStatus("Pre-flushing region before close");
    LOG.info("Running close preflush of " + this.getRegionNameAsString());
    try {
      internalFlushcache(status);
    } catch (IOException ioe) {
      // Failed to flush the region. Keep going.
      status.setStatus("Failed pre-flush " + this + "; " + ioe.getMessage());
    }
  }

  // block waiting for the lock for closing
  lock.writeLock().lock();
  this.closing.set(true);
  status.setStatus("Disabling writes for close");
  try {
    if (this.isClosed()) {
      status.abort("Already got closed by another process");
      // SplitTransaction handles the null
      return null;
    }
    LOG.debug("Updates disabled for region " + this);
    // Don't flush the cache if we are aborting
    if (!abort) {
      int flushCount = 0;
      while (this.getMemstoreSize().get() > 0) {
        try {
          if (flushCount++ > 0) {
            int actualFlushes = flushCount - 1;
            if (actualFlushes > 5) {
              // If we tried 5 times and are unable to clear memory, abort
              // so we do not lose data
              throw new DroppedSnapshotException("Failed clearing memory after " +
                  actualFlushes + " attempts on region: " + Bytes.toStringBinary(getRegionName()));
            }
            LOG.info("Running extra flush, " + actualFlushes + " (carrying snapshot?) " + this);
          }
          internalFlushcache(status);
        } catch (IOException ioe) {
          status.setStatus("Failed flush " + this + ", putting online again");
          synchronized (writestate) {
            writestate.writesEnabled = true;
          }
          // Have to throw to upper layers. I can't abort server from here.
          throw ioe;
        }
      }
    }

    Map<byte[], List<StoreFile>> result =
        new TreeMap<byte[], List<StoreFile>>(Bytes.BYTES_COMPARATOR);
    if (!stores.isEmpty()) {
      // initialize the thread pool for closing stores in parallel.
      ThreadPoolExecutor storeCloserThreadPool =
          getStoreOpenAndCloseThreadPool("StoreCloserThread-" + this.getRegionNameAsString());
      CompletionService<Pair<byte[], Collection<StoreFile>>> completionService =
          new ExecutorCompletionService<Pair<byte[], Collection<StoreFile>>>(storeCloserThreadPool);

      // close each store in parallel
      for (final Store store : stores.values()) {
        long flushableSize = store.getFlushableSize();
        if (!(abort || flushableSize == 0)) {
          getRegionServerServices().abort("Assertion failed while closing store "
              + getRegionInfo().getRegionNameAsString() + " " + store
              + ". flushableSize expected=0, actual= " + flushableSize
              + ". Current memstoreSize=" + getMemstoreSize() + ". Maybe a coprocessor "
              + "operation failed and left the memstore in a partially updated state.", null);
        }
        completionService.submit(new Callable<Pair<byte[], Collection<StoreFile>>>() {
          @Override
          public Pair<byte[], Collection<StoreFile>> call() throws IOException {
            return new Pair<byte[], Collection<StoreFile>>(
                store.getFamily().getName(), store.close());
          }
        });
      }
      try {
        for (int i = 0; i < stores.size(); i++) {
          Future<Pair<byte[], Collection<StoreFile>>> future = completionService.take();
          Pair<byte[], Collection<StoreFile>> storeFiles = future.get();
          List<StoreFile> familyFiles = result.get(storeFiles.getFirst());
          if (familyFiles == null) {
            familyFiles = new ArrayList<StoreFile>();
            result.put(storeFiles.getFirst(), familyFiles);
          }
          familyFiles.addAll(storeFiles.getSecond());
        }
      } catch (InterruptedException e) {
        throw (InterruptedIOException)new InterruptedIOException().initCause(e);
      } catch (ExecutionException e) {
        throw new IOException(e.getCause());
      } finally {
        storeCloserThreadPool.shutdownNow();
      }
    }
    this.closed.set(true);
    if (memstoreSize.get() != 0) LOG.error("Memstore size is " + memstoreSize.get());
    if (coprocessorHost != null) {
      status.setStatus("Running coprocessor post-close hooks");
      this.coprocessorHost.postClose(abort);
    }
    if (this.metricsRegion != null) {
      this.metricsRegion.close();
    }
    if (this.metricsRegionWrapper != null) {
      Closeables.closeQuietly(this.metricsRegionWrapper);
    }
    status.markComplete("Closed");
    LOG.info("Closed " + this);
    return result;
  } finally {
    lock.writeLock().unlock();
  }
}
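Before the stores are closed, doClose keeps flushing while the memstore is non-empty. After more than five extra flushes it gives up with DroppedSnapshotException, so data is never silently discarded. The sketch below isolates that retry loop with the same counter logic. The memstore is modeled as a plain long, flushOnce is a hypothetical function returning the size left after one flush, and IllegalStateException stands in for DroppedSnapshotException.

```java
import java.util.function.LongUnaryOperator;

// Hypothetical model of the flush-until-empty loop in HRegion.doClose.
final class CloseFlushLoop {
  private CloseFlushLoop() {}

  // Returns the number of flushes performed; throws if the memstore never drains.
  static int flushUntilEmpty(long memstoreSize, LongUnaryOperator flushOnce) {
    int flushCount = 0;
    while (memstoreSize > 0) {
      if (flushCount++ > 0) {
        int actualFlushes = flushCount - 1;
        if (actualFlushes > 5) {
          // Stand-in for DroppedSnapshotException: stop instead of losing data.
          throw new IllegalStateException("Failed clearing memory after " + actualFlushes + " attempts");
        }
      }
      memstoreSize = flushOnce.applyAsLong(memstoreSize);
    }
    return flushCount;
  }
}
```

With this counter logic, at most six flushes run (one normal flush plus five extra) before the loop gives up on the seventh attempt.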
The stores member variable is populated when the region's stores are initialized, in HRegion's initializeRegionStores method:
private long initializeRegionStores(final CancelableProgressable reporter, MonitoredTask status)
    throws IOException, UnsupportedEncodingException {
  // Load in all the HStores.

  long maxSeqId = -1;
  // initialized to -1 so that we pick up MemstoreTS from column families
  long maxMemstoreTS = -1;

  if (!htableDescriptor.getFamilies().isEmpty()) {
    // initialize the thread pool for opening stores in parallel.
    ThreadPoolExecutor storeOpenerThreadPool =
        getStoreOpenAndCloseThreadPool("StoreOpener-" + this.getRegionInfo().getShortNameToLog());
    CompletionService<HStore> completionService =
        new ExecutorCompletionService<HStore>(storeOpenerThreadPool);

    // initialize each store in parallel
    for (final HColumnDescriptor family : htableDescriptor.getFamilies()) {
      status.setStatus("Instantiating store for column family " + family);
      completionService.submit(new Callable<HStore>() {
        @Override
        public HStore call() throws IOException {
          return instantiateHStore(family);
        }
      });
    }
    boolean allStoresOpened = false;
    try {
      for (int i = 0; i < htableDescriptor.getFamilies().size(); i++) {
        Future<HStore> future = completionService.take();
        HStore store = future.get();
        this.stores.put(store.getColumnFamilyName().getBytes(), store);

        long storeMaxSequenceId = store.getMaxSequenceId();
        maxSeqIdInStores.put(store.getColumnFamilyName().getBytes(),
            storeMaxSequenceId);
        if (maxSeqId == -1 || storeMaxSequenceId > maxSeqId) {
          maxSeqId = storeMaxSequenceId;
        }
        long maxStoreMemstoreTS = store.getMaxMemstoreTS();
        if (maxStoreMemstoreTS > maxMemstoreTS) {
          maxMemstoreTS = maxStoreMemstoreTS;
        }
      }
      allStoresOpened = true;
    } catch (InterruptedException e) {
      throw (InterruptedIOException)new InterruptedIOException().initCause(e);
    } catch (ExecutionException e) {
      throw new IOException(e.getCause());
    } finally {
      storeOpenerThreadPool.shutdownNow();
      if (!allStoresOpened) {
        // something went wrong, close all opened stores
        LOG.error("Could not initialize all stores for the region=" + this);
        for (Store store : this.stores.values()) {
          try {
            store.close();
          } catch (IOException e) {
            LOG.warn(e.getMessage());
          }
        }
      }
    }
  }
  mvcc.initialize(maxMemstoreTS + 1);
  // Recover any edits if available.
  maxSeqId = Math.max(maxSeqId, replayRecoveredEditsIfAny(this.fs.getRegionDir(), maxSeqIdInStores, reporter, status));
  return maxSeqId;
}
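doClose and initializeRegionStores share one pattern. They submit one Callable per column family to a pool wrapped in an ExecutorCompletionService, then call take() exactly as many times as tasks were submitted, receiving results in completion order. The stores map is a ConcurrentSkipListMap with an explicit comparator because byte[] keys use identity equals/hashCode. The sketch below reproduces the pattern. Arrays::compareUnsigned stands in for Bytes.BYTES_RAWCOMPARATOR, and openStore is a hypothetical function that "opens" a store for a family name.

```java
import java.nio.charset.StandardCharsets;
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

// Hypothetical sketch of opening stores in parallel into a byte[]-keyed map.
final class ParallelStoreOpener {
  private ParallelStoreOpener() {}

  static <S> ConcurrentSkipListMap<byte[], S> openAll(List<String> families,
      Function<String, S> openStore, int threads) throws InterruptedException, ExecutionException {
    // byte[] has identity equals/hashCode, so keys must be ordered by content.
    Comparator<byte[]> rawComparator = Arrays::compareUnsigned;
    ConcurrentSkipListMap<byte[], S> stores = new ConcurrentSkipListMap<>(rawComparator);
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    try {
      CompletionService<Map.Entry<String, S>> completionService = new ExecutorCompletionService<>(pool);
      for (String family : families) {
        completionService.submit(
            () -> new AbstractMap.SimpleImmutableEntry<String, S>(family, openStore.apply(family)));
      }
      // take() hands back futures in completion order, one per submitted task.
      for (int i = 0; i < families.size(); i++) {
        Map.Entry<String, S> opened = completionService.take().get();
        stores.put(opened.getKey().getBytes(StandardCharsets.UTF_8), opened.getValue());
      }
    } finally {
      pool.shutdownNow();
    }
    return stores;
  }
}
```

Because the map orders keys by content, a lookup with a freshly built byte[] for the same family name finds the entry. A HashMap<byte[], S> would not.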
That is how parent.close() obtains the stores. Now let's look inside splitStoreFile, which each StoreFileSplitter task calls, to see how the reference files are created:
private Pair<Path, Path> splitStoreFile(final byte[] family, final StoreFile sf)
    throws IOException {
  if (LOG.isDebugEnabled()) {
    LOG.debug("Splitting started for store file: " + sf.getPath() + " for region: " +
        this.parent);
  }
  HRegionFileSystem fs = this.parent.getRegionFileSystem();
  String familyName = Bytes.toString(family);
  Path path_a = fs.splitStoreFile(this.hri_a, familyName, sf, this.splitrow, false,
      this.parent.getSplitPolicy());
  Path path_b = fs.splitStoreFile(this.hri_b, familyName, sf, this.splitrow, true,
      this.parent.getSplitPolicy());
  if (LOG.isDebugEnabled()) {
    LOG.debug("Splitting complete for store file: " + sf.getPath() + " for region: " +
        this.parent);
  }
  return new Pair<Path,Path>(path_a, path_b);
}
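In the method below, daughter A is called with top == false (the bottom half) and daughter B with top == true (the top half). Unless the split policy skips the range check, the method behaves as follows:
- top == true: the split key is compared with the storefile's lastKey. If it is greater than lastKey, the file has no rows in the top half and no reference file is created for daughter B.
- top == false: the split key is compared with the storefile's firstKey. If it is smaller than firstKey, the file has no rows in the bottom half and no reference file is created for daughter A.
As a result, most hfiles get one reference file in each of daughters A and B.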
/**
 * Write out a split reference. Package local so it doesnt leak out of
 * regionserver.
 * @param hri {@link HRegionInfo} of the destination
 * @param familyName Column Family Name
 * @param f File to split.
 * @param splitRow Split Row
 * @param top True if we are referring to the top half of the hfile.
 * @param splitPolicy
 * @return Path to created reference.
 * @throws IOException
 */
Path splitStoreFile(final HRegionInfo hri, final String familyName, final StoreFile f,
    final byte[] splitRow, final boolean top, RegionSplitPolicy splitPolicy) throws IOException {

  if (splitPolicy == null || !splitPolicy.skipStoreFileRangeCheck(familyName)) {
    // Check whether the split row lies in the range of the store file
    // If it is outside the range, return directly.
    try {
      if (top) {
        //check if larger than last key.
        KeyValue splitKey = KeyValue.createFirstOnRow(splitRow);
        byte[] lastKey = f.createReader().getLastKey();
        // If lastKey is null means storefile is empty.
        if (lastKey == null) return null;
        if (f.getReader().getComparator().compareFlatKey(splitKey.getBuffer(),
            splitKey.getKeyOffset(), splitKey.getKeyLength(), lastKey, 0, lastKey.length) > 0) {
          return null;
        }
      } else {
        //check if smaller than first key
        KeyValue splitKey = KeyValue.createLastOnRow(splitRow);
        byte[] firstKey = f.createReader().getFirstKey();
        // If firstKey is null means storefile is empty.
        if (firstKey == null) return null;
        if (f.getReader().getComparator().compareFlatKey(splitKey.getBuffer(),
            splitKey.getKeyOffset(), splitKey.getKeyLength(), firstKey, 0, firstKey.length) < 0) {
          return null;
        }
      }
    } finally {
      f.closeReader(f.getCacheConf() != null ? f.getCacheConf().shouldEvictOnClose() : true);
    }
  }

  Path splitDir = new Path(getSplitsDir(hri), familyName);
  // A reference to the bottom half of the hsf store file.
  Reference r =
      top ? Reference.createTopReference(splitRow): Reference.createBottomReference(splitRow);
  // Add the referred-to regions name as a dot separated suffix.
  // See REF_NAME_REGEX regex above. The referred-to regions name is
  // up in the path of the passed in <code>f</code> -- parentdir is family,
  // then the directory above is the region name.
  String parentRegionName = regionInfo.getEncodedName();
  // Write reference with same file id only with the other region name as
  // suffix and into the new region location (under same family).
  Path p = new Path(splitDir, f.getPath().getName() + "." + parentRegionName);
  return r.write(fs, p);
}
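At row level, the two range checks reduce to simple rules. A top (daughter B) reference is written unless splitRow is greater than the file's last row, and a bottom (daughter A) reference is written unless splitRow is smaller than the file's first row. The reference file keeps the hfile's name and appends the parent's encoded region name after a dot. The sketch below assumes String rows and hypothetical file and region names. It ignores the splitPolicy skip and the KeyValue-level details (createFirstOnRow and createLastOnRow are what make this row-level reduction hold).

```java
import java.util.Optional;

// Hypothetical row-level model of HRegionFileSystem.splitStoreFile's range check and naming.
final class ReferencePlanner {
  private ReferencePlanner() {}

  // top == true  -> reference for daughter B (rows >= splitRow)
  // top == false -> reference for daughter A (rows <  splitRow)
  static boolean needsReference(String firstRow, String lastRow, String splitRow, boolean top) {
    if (firstRow == null || lastRow == null) {
      return false; // empty store file: no reference either way
    }
    return top ? splitRow.compareTo(lastRow) <= 0 : splitRow.compareTo(firstRow) >= 0;
  }

  // Reference file name: "<hfile name>.<parent encoded region name>", or empty if not needed.
  static Optional<String> referenceName(String hfileName, String parentEncodedName,
      String firstRow, String lastRow, String splitRow, boolean top) {
    if (!needsReference(firstRow, lastRow, splitRow, top)) {
      return Optional.empty();
    }
    return Optional.of(hfileName + "." + parentEncodedName);
  }
}
```

A file whose rows straddle the split row gets both references. A file lying entirely on one side of the split row gets only one.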
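Finally, inside createDaughterRegionFromSplits, the reference files are moved from the temporary .splits directory into the corresponding daughter region directory. The daughter HRegion instance is then created, inheriting half of the parent's read and write request counts, and returned: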
/**
 * Create a daughter region from given a temp directory with the region data.
 * @param hri Spec. for daughter region to open.
 * @throws IOException
 */
HRegion createDaughterRegionFromSplits(final HRegionInfo hri) throws IOException {
  // Move the files from the temporary .splits to the final /table/region directory
  fs.commitDaughterRegion(hri);

  // Create the daughter HRegion instance
  HRegion r = HRegion.newHRegion(this.fs.getTableDir(), this.getLog(), fs.getFileSystem(),
      this.getBaseConf(), hri, this.getTableDesc(), rsServices);
  r.readRequestsCount.set(this.getReadRequestsCount() / 2);
  r.writeRequestsCount.set(this.getWriteRequestsCount() / 2);
  return r;
}
That covers the main steps of splitting a region.
[Figure: flowchart of the overall region split process]