HBase MVCC principle
Part I---MVCC concepts
Most databases guarantee ACID semantics, and so does HBase. The difference is scope: in most databases the guarantees cover the whole database, while in HBase they cover a single row (which is why HBase is said to provide row-level transactions). In HBase the ACID semantics are addressed as follows:
atomicity: all cell operations that belong to one transaction either complete together or fail together.
consistency: also called integrity; the state of HBase moves from one valid state to another (e.g. in a bank transfer the total amount stays constant across the withdrawal and the deposit; a row does not disappear during an update, etc.).
isolation: each transaction works in its own data space, i.e. it does not interfere with other transactions while it is being processed. HBase scans support two isolation levels: READ_COMMITTED and READ_UNCOMMITTED.
In general, the three properties above are achieved with the help of the edit log (WAL).
durability: the data written by a successful transaction must be persistent, i.e. later readers get the same data back until another transaction changes it (in HBase, by syncing the edit to the WAL before the write returns).
MVCC (multiversion concurrency control, named multiversion consistency control in the HBase code) is a method of concurrency control. You can think of it this way:
multi-version is the means; concurrency control is the goal.
HBase implements this in a class named MultiVersionConsistencyControl.java, together with a related class, IsolationLevel.java. MultiVersionConsistencyControl.java is used to:
1. implement isolation (reads during writes) within one component or across several (e.g. the WAL and the memstore)
2. avoid making reads wait for writes: reads are not blocked, as they would be if a plain Lock were used
-- without isolation, reads would conflict with concurrent writes, e.g. a reader could see only part of a row that is still being updated.
Part II---HBase's MVCC implementation
Here is the structure of MultiVersionConsistencyControl.java:
class MultiVersionConsistencyControl {
  long memstoreRead;   // the highest write number (seq no) that is currently readable
  long memstoreWrite;  // the highest write number handed out so far
  Object readWaiters;  // monitor used to wait until an entry becomes readable
  LinkedList<WriteEntry> writeQueue;  // FIFO list that coordinates read-write consistency
  ......
  // advance the write number and remember it in the queue
  public WriteEntry beginMemstoreInsert() {...}
  // the transaction is complete; wait until this entry is readable.
  // it invokes both advanceMemstore() and waitForRead().
  void completeMemstoreInsert(WriteEntry e) {...}
  // remove the leading completed entries and update memstoreRead
  boolean advanceMemstore(WriteEntry e) {...}
  // wait until this entry is readable
  void waitForRead(WriteEntry e) {...}
  ......
}
In practice this class is instantiated in only one place in the whole HBase project: the HRegion.mvcc field, so every region carries its own mvcc instance.
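To make the outline above concrete, here is a minimal sketch of the same idea. It is not the HBase source: the class name SimpleMvcc and its field layout are assumptions made for illustration, and only the method names follow the outline. It shows how the read point moves forward only over a completed prefix of the write queue.

```java
import java.util.LinkedList;

/** Hypothetical, simplified stand-in for MultiVersionConsistencyControl. */
class SimpleMvcc {
    /** One in-flight write, identified by its write number. */
    static final class WriteEntry {
        final long writeNumber;
        boolean completed;
        WriteEntry(long writeNumber) { this.writeNumber = writeNumber; }
    }

    private long memstoreRead = 0;   // highest write number visible to readers
    private long memstoreWrite = 0;  // highest write number handed out so far
    private final Object readWaiters = new Object();
    private final LinkedList<WriteEntry> writeQueue = new LinkedList<>();

    /** Hands out the next write number and queues the entry. */
    WriteEntry beginMemstoreInsert() {
        synchronized (writeQueue) {
            WriteEntry e = new WriteEntry(++memstoreWrite);
            writeQueue.add(e);
            return e;
        }
    }

    /** Marks e completed, drops the completed prefix, returns true if e is readable. */
    boolean advanceMemstore(WriteEntry e) {
        synchronized (writeQueue) {
            e.completed = true;
            long next = -1;
            while (!writeQueue.isEmpty() && writeQueue.getFirst().completed) {
                next = writeQueue.removeFirst().writeNumber;
            }
            if (next > 0) {
                synchronized (readWaiters) {
                    memstoreRead = next;      // publish the new read point
                    readWaiters.notifyAll();  // wake writers blocked in waitForRead
                }
            }
            return memstoreRead >= e.writeNumber;
        }
    }

    /** Blocks until the read point has reached e. */
    void waitForRead(WriteEntry e) throws InterruptedException {
        synchronized (readWaiters) {
            while (memstoreRead < e.writeNumber) {
                readWaiters.wait();
            }
        }
    }

    /** Completes the write and returns only once it is visible to readers. */
    void completeMemstoreInsert(WriteEntry e) throws InterruptedException {
        if (!advanceMemstore(e)) {
            waitForRead(e);
        }
    }

    long memstoreReadPoint() {
        synchronized (readWaiters) {
            return memstoreRead;
        }
    }
}
```

If two writes are begun and the second one is advanced first, advanceMemstore returns false and the read point stays where it was; once the first write is advanced, both entries leave the queue and the read point jumps straight to the second write number.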
------ the process model for Puts/Deletes:
1. take a read lock on this.lock
   // then apply the mutations and append them to the WAL
   a. take a read lock on updateLock
   b. beginMemstoreInsert()
   c. add the Puts to the memstore
   d. append to the WAL without sync
   e. release the updateLock read lock taken in a
   f. sync the WAL (flushLock is used for synchronization)
   g. completeMemstoreInsert()
2. calculate the global memstore size used for the flush decision (but there is a bug in addAndGetGlobalMemstoreSize())
3. release the read lock taken in step 1
4. flush the memstore if its size, including the newly added mutations, exceeds the flush threshold
   a. add a FlushRegionEntry to flushQueue, so that the MemStoreFlusher picks it up periodically
      // the actual memstore flush follows
      1) enter flushcache()
      2) take a read lock on this.lock
      3) take a write lock on this.updateLock
      4) beginMemstoreInsert(), then advanceMemstore() // mark this write number
      5) generate a new log sequence number through the mvcc instance
      6) take snapshots of all stores
      7) release the write lock
      8) mvcc.waitForRead() // wait for all write entries with a smaller write number than the current one to finish. All edits (Puts) are therefore synced to the WAL before the memstore is flushed, so the data in the HFile is consistent with the WAL.
   b. flush the memstore snapshot to an HFile
   c. append FLUSHCACHE-COMPLETE to the log (WAL)
   d. finally, release the read lock
ReadWriteLock model (o = allowed, x = blocked):
lock already held | later reads | later writes |
read lock         | o           | x            |
write lock        | x           | x            |
So with a ReadWriteLock we avoid the delay caused by a plain Lock object: readers can run concurrently with other readers.
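The table can be checked directly against java.util.concurrent. The sketch below is only an illustration: the class RwLockTable and its helper names are made up for this note. It holds one side of a ReentrantReadWriteLock and lets a second thread try to take the read or write side without waiting.

```java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Hypothetical helper that probes the four cells of the ReadWriteLock table. */
class RwLockTable {
    /** Tries the lock from a separate thread, so it counts as a "later" caller. */
    static boolean otherThreadCanLock(Lock lock) {
        boolean[] got = new boolean[1];
        Thread t = new Thread(() -> {
            got[0] = lock.tryLock();   // never blocks
            if (got[0]) {
                lock.unlock();
            }
        });
        t.start();
        try {
            t.join();                  // join makes got[0] visible to this thread
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
        return got[0];
    }

    /** Returns {read then read, read then write, write then read, write then write}. */
    static boolean[] probe() {
        ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
        boolean[] result = new boolean[4];
        rw.readLock().lock();
        try {
            result[0] = otherThreadCanLock(rw.readLock());
            result[1] = otherThreadCanLock(rw.writeLock());
        } finally {
            rw.readLock().unlock();
        }
        rw.writeLock().lock();
        try {
            result[2] = otherThreadCanLock(rw.readLock());
            result[3] = otherThreadCanLock(rw.writeLock());
        } finally {
            rw.writeLock().unlock();
        }
        return result;
    }
}
```

RwLockTable.probe() yields true only for the first cell (a read while another read lock is held), which matches the single "o" in the table.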
From this model, combined with mvcc, we learn:
1. the abstract model: lock.readLock { updateLock.readLock/writeLock { memstore-related logic ... } }
2. adding KVs to the memstore (step 1.c) is blocked while a flush is taking its snapshots, but concurrent additions to the memstore do not exclude each other.
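The nesting in point 1 can be sketched roughly as follows. This is a toy model under stated assumptions, not HRegion code: RegionLockSketch is a made-up class, the memstore is reduced to a queue of strings, and the WAL and mvcc steps are left out so that only the lock structure remains.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Hypothetical toy region: only the lock nesting of put and flush is modelled. */
class RegionLockSketch {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final ReentrantReadWriteLock updateLock = new ReentrantReadWriteLock();
    private Queue<String> memstore = new ConcurrentLinkedQueue<>();

    /** Puts share both read locks, so many puts can run at the same time. */
    void put(String kv) {
        lock.readLock().lock();
        try {
            updateLock.readLock().lock();
            try {
                memstore.add(kv);
            } finally {
                updateLock.readLock().unlock();
            }
        } finally {
            lock.readLock().unlock();
        }
    }

    /** The snapshot takes the updateLock write lock, which keeps all puts out. */
    List<String> flush() {
        lock.readLock().lock();
        try {
            Queue<String> snapshot;
            updateLock.writeLock().lock();
            try {
                snapshot = memstore;                      // take the snapshot
                memstore = new ConcurrentLinkedQueue<>(); // new puts go to a fresh memstore
            } finally {
                updateLock.writeLock().unlock();
            }
            // Writing the snapshot out happens after the write lock is released.
            return new ArrayList<>(snapshot);
        } finally {
            lock.readLock().unlock();
        }
    }
}
```

The write lock is held only for the swap, so puts are blocked for the duration of the snapshot and not for the whole flush.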
How is MultiVersion implemented?
1. use mvcc as the version keeper
2. keep versioned data in the memstore. todo
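Since point 2 is still a todo, the following is only a guess at its shape, assuming each cell in the memstore is tagged with the write number of the transaction that added it. VersionedMemstore and Cell are hypothetical names. A reader passes in the read point it obtained from mvcc and ignores every cell with a larger write number.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical memstore whose cells carry the write number that created them. */
class VersionedMemstore {
    static final class Cell {
        final String row;
        final String value;
        final long writeNumber;
        Cell(String row, String value, long writeNumber) {
            this.row = row;
            this.value = value;
            this.writeNumber = writeNumber;
        }
    }

    private final List<Cell> cells = new ArrayList<>();

    void add(String row, String value, long writeNumber) {
        cells.add(new Cell(row, value, writeNumber));
    }

    /** Returns the newest value of row that is visible at readPoint, or null. */
    String get(String row, long readPoint) {
        Cell best = null;
        for (Cell c : cells) {
            boolean visible = c.writeNumber <= readPoint;
            if (c.row.equals(row) && visible
                    && (best == null || c.writeNumber > best.writeNumber)) {
                best = c;
            }
        }
        return best == null ? null : best.value;
    }
}
```

With two versions of one row stored under write numbers 1 and 2, a reader at read point 1 still gets the first value even though the second is already in the memstore, so the writer never has to block the reader.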
Part III---Practice in HBase
I changed some of the code to demonstrate how mvcc operates:
---MultiVersionConsistencyControl.java
boolean advanceMemstore(WriteEntry e) { //-returns true if the specified entry is readable
  synchronized (writeQueue) {
    e.markCompleted(); //-mark e as completed first; the loop below may then check it
    long nextReadValue = -1;
    boolean ranOnce = false; //-the same queue entry may be inspected by this loop on several calls
    while (!writeQueue.isEmpty()) { //-remove leading elements until an uncompleted one is found
      ranOnce = true;
      WriteEntry queueFirst = writeQueue.getFirst();
      if (nextReadValue > 0) {
        if (nextReadValue + 1 != queueFirst.getWriteNumber()) {
          throw new RuntimeException("invariant in completeMemstoreInsert violated, prev: "
              + nextReadValue + " next: " + queueFirst.getWriteNumber());
        }
      }
      System.out.println(Thread.currentThread().getId() + "-" + System.nanoTime() + "-last w id " + writeQueue.getLast().writeNumber + "first " + queueFirst.writeNumber + ",size " + writeQueue.size()); //TODO
      if (queueFirst.isCompleted()) {
        nextReadValue = queueFirst.getWriteNumber(); //-let the readers see this version
        System.out.println(Thread.currentThread().getId() + "-" + System.nanoTime() + "-*complete w id " + queueFirst.getWriteNumber() + ",passed id " + e.getWriteNumber()); //TODO
        writeQueue.removeFirst(); //-remove the first element now
      } else {
        System.out.println(Thread.currentThread().getId() + "-" + System.nanoTime() + "-queue size " + writeQueue.size() + ",passed id " + e.getWriteNumber() +
            ",get first id " + queueFirst.getWriteNumber()); //TODO test
        break;
      }
    } //while
    if (!ranOnce) { //-writeQueue is empty
      throw new RuntimeException("never was a first");
    }
    if (nextReadValue > 0) {
      synchronized (readWaiters) {
        memstoreRead = nextReadValue; //-advance the readable version if at least one write entry completed
        readWaiters.notifyAll(); //-see waitForRead(x)
      }
    }
    if (memstoreRead >= e.getWriteNumber()) {
      return true;
    }
    return false;
  } //sync writeQueue
}
---TestMultiVersionConsistencyControl.java
static class Writer implements Runnable {
  final AtomicBoolean finished;
  final MultiVersionConsistencyControl mvcc;
  final AtomicBoolean status;

  Writer(AtomicBoolean finished, MultiVersionConsistencyControl mvcc, AtomicBoolean status) {
    this.finished = finished;
    this.mvcc = mvcc;
    this.status = status;
  }

  private Random rnd = new Random();
  public boolean failed = false;

  public void run() {
    while (!finished.get()) {
      MultiVersionConsistencyControl.WriteEntry e = mvcc.beginMemstoreInsert();
      // System.out.println("Begin write: " + e.getWriteNumber());
      // 0 usec - 499 usec
      int sleepTime = rnd.nextInt(500);
      // 500 * 1000 = 500,000ns = 500 usec
      // 1 * 1000 = 1,000ns = 1 usec
      try {
        if (sleepTime > 0) Thread.sleep(0, sleepTime * 1000);
      } catch (InterruptedException e1) {
        // ignored
      }
      try {
        mvcc.completeMemstoreInsert(e);
        System.out.println(Thread.currentThread().getId() + "-" + System.nanoTime() + "-done insert");
      } catch (RuntimeException ex) {
        // got a failure; report it through the status flag
        System.out.println(ex.toString());
        ex.printStackTrace();
        status.set(false);
        return;
      }
    }
  }
}
public void testParallelism() throws Exception {
  final MultiVersionConsistencyControl mvcc = new MultiVersionConsistencyControl();
  final AtomicBoolean finished = new AtomicBoolean(false);
  // fail flag for the reader thread
  final AtomicBoolean readerFailed = new AtomicBoolean(false);
  final AtomicLong failedAt = new AtomicLong();
  Runnable reader = new Runnable() {
    public void run() { //-loop to check that the read point stays consistent
      long prev = mvcc.memstoreReadPoint();
      while (!finished.get()) {
        long newPrev = mvcc.memstoreReadPoint(); //-the point is read as fast as possible, so there is no wait/sleep here
        System.out.println(Thread.currentThread().getId() + "-" + System.nanoTime() + "-cur read point " + newPrev);
        if (newPrev < prev) {
          // serious problem.
          System.out.println("Reader got out of order, prev: " + prev + " next was: " + newPrev);
          readerFailed.set(true);
          // might as well give up
          failedAt.set(newPrev);
          return;
        }
      }
    }
  };
  // writer thread parallelism.
  int n = 20;
  n = 3; //-
  Thread[] writers = new Thread[n];
  AtomicBoolean[] statuses = new AtomicBoolean[n];
  Thread readThread = new Thread(reader);
  for (int i = 0; i < n; ++i) {
    statuses[i] = new AtomicBoolean(true);
    writers[i] = new Thread(new Writer(finished, mvcc, statuses[i]));
    writers[i].start();
  }
  readThread.start();
  try {
    int sec = 10 * 1000;
    sec = 20; //-
    Thread.sleep(sec);
  } catch (InterruptedException ex) {
    // ignored
  }
  finished.set(true); //-flag all threads to finish
  readThread.join(); //-wait for this thread to complete, same as below
  for (int i = 0; i < n; ++i) {
    writers[i].join();
  }
  // check failure.
  assertFalse(readerFailed.get());
  for (int i = 0; i < n; ++i) {
    assertTrue(statuses[i].get()); //-the writers must not have changed this status
  }
}
Some of the log output looks like this:
9-1408679151040267000-cur read point 0
9-1408679151040587000-cur read point 0
9-1408679151040637000-cur read point 0
9-1408679151040683000-cur read point 0
10-1408679151040686000-last w id 3first 1,size 3
10-1408679151040821000-*complete w id 1,passed id 1
10-1408679151040889000-last w id 3first 2,size 2
10-1408679151040937000-queue size 2,passed id 1,get first id 2
9-1408679151040736000-cur read point 0
11-1408679151041070000-last w id 3first 2,size 2
11-1408679151041201000-*complete w id 2,passed id 2
11-1408679151041256000-last w id 3first 3,size 1
11-1408679151041303000-queue size 1,passed id 2,get first id 3
11-1408679151041394000-done insert
12-1408679151041447000-last w id 3first 3,size 1
12-1408679151041512000-*complete w id 3,passed id 3
12-1408679151041603000-done insert
10-1408679151041018000-done insert
9-1408679151041142000-cur read point 1
9-1408679151041753000-cur read point 3
….
9-1408679151045943000-cur read point 3
12-1408679151045954000-last w id 6first 4,size 3
12-1408679151046069000-*complete w id 4,passed id 4
12-1408679151046141000-last w id 6first 5,size 2
12-1408679151046280000-queue size 2,passed id 4,get first id 5
12-1408679151046352000-done insert
9-1408679151045983000-cur read point 3
9-1408679151046515000-cur read point 4
9-1408679151046570000-cur read point 4
….
9-1408679151048488000-cur read point 4
9-1408679151048543000-cur read point 4
10-1408679151046449000-last w id 7first 5,size 3
10-1408679151048692000-queue size 3,passed id 6,get first id 5
9-1408679151048598000-cur read point 4
9-1408679151048818000-cur read point 4
9-1408679151048856000-cur read point 4
….
9-1408679151050356000-cur read point 4
9-1408679151050648000-cur read point 4
9-1408679151050800000-cur read point 4
11-1408679151049531000-last w id 7first 5,size 3
11-1408679151050973000-*complete w id 5,passed id 5
11-1408679151051028000-last w id 7first 6,size 2
11-1408679151051090000-*complete w id 6,passed id 5
11-1408679151051142000-last w id 7first 7,size 1
11-1408679151051197000-queue size 1,passed id 5,get first id 7
9-1408679151050924000-cur read point 4
12-1408679151051306000-last w id 7first 7,size 1
10-1408679151051286000-done insert
11-1408679151051263000-done insert
12-1408679151051393000-*complete w id 7,passed id 7
12-1408679151051524000-done insert
9-1408679151051346000-cur read point 6
9-1408679151051638000-cur read point 7
9-1408679151051710000-cur read point 7
…
9-1408679151057924000-cur read point 7
9-1408679151057971000-cur read point 7
11-1408679151052653000-last w id 10first 8,size 3
9-1408679151058023000-cur read point 7
9-1408679151058150000-cur read point 7
9-1408679151058190000-cur read point 7
9-1408679151058234000-cur read point 7
9-1408679151058272000-cur read point 7
9-1408679151058333000-cur read point 7
9-1408679151058387000-cur read point 7
9-1408679151058442000-cur read point 7
9-1408679151058500000-cur read point 7
11-1408679151058085000-queue size 3,passed id 9,get first id 8
9-1408679151058560000-cur read point 7
9-1408679151058760000-cur read point 7
9-1408679151058817000-cur read point 7
9-1408679151058867000-cur read point 7
9-1408679151058916000-cur read point 7
……
9-1408679151060011000-cur read point 7
10-1408679151058692000-last w id 10first 8,size 3
10-1408679151060162000-queue size 3,passed id 10,get first id 8
9-1408679151060094000-cur read point 7
9-1408679151060317000-cur read point 7
9-1408679151060373000-cur read point 7
9-1408679151060428000-cur read point 7
9-1408679151060482000-cur read point 7
9-1408679151060536000-cur read point 7
9-1408679151060590000-cur read point 7
12-1408679151060256000-last w id 10first 8,size 3
12-1408679151060769000-*complete w id 8,passed id 8
12-1408679151060837000-last w id 10first 9,size 2
12-1408679151060896000-*complete w id 9,passed id 8
12-1408679151060953000-last w id 10first 10,size 1
12-1408679151061008000-*complete w id 10,passed id 8
12-1408679151061069000-done insert
10-1408679151061164000-done insert
9-1408679151060653000-cur read point 7
11-1408679151061233000-done insert
From these logs we can draw some conclusions:
1. the read point only ever increases (monotonically), so reads see the most recent committed data
2. memstoreRead advances to a write number only after that write entry is done, which means reads are read_committed (isolation)
3. the size of the writeQueue in mvcc is bounded by the number of callers (write threads)
4. operations are not guaranteed to run sequentially, neither among reads, nor among writes, nor between the two (concurrency)
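Conclusions 1 and 2 can be replayed without threads. The sketch below is an assumption-laden restatement, not the HBase algorithm as written: ReadPointReplay is a made-up class that defines the read point as "one less than the smallest write number still pending", which should give the same values as dropping the completed prefix of the queue. It replays the part of the log where write 6 finishes before write 5.

```java
import java.util.TreeSet;

/** Hypothetical single-threaded model of the read point. */
class ReadPointReplay {
    private final TreeSet<Long> pending = new TreeSet<>(); // begun but not yet completed
    private long lastIssued = 0;

    /** Starts a write and returns its write number. */
    long begin() {
        pending.add(++lastIssued);
        return lastIssued;
    }

    /** Marks the write as done. */
    void complete(long writeNumber) {
        pending.remove(writeNumber);
    }

    /** Everything below the oldest pending write is readable. */
    long readPoint() {
        return pending.isEmpty() ? lastIssued : pending.first() - 1;
    }
}
```

After beginning seven writes and completing 1 to 4, the read point is 4. Completing 6 leaves it at 4 because 5 is still pending; completing 5 moves it to 6 in one step, and completing 7 moves it to 7. This is the same 4, 6, 7 sequence the reader thread prints in the log.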
ref:
2. HBase CRUD--Read (query) operations (server side)
Apache HBase Internals: Locking and Multiversion Concurrency Control
http://hbase.apache.org/acid-semantics.html
The ACID properties that database transactions must have
http://en.wikipedia.org/wiki/Multiversion_concurrency_control
http://dev.mysql.com/doc/refman/5.1/en/innodb-multi-versioning.html
http://en.wikipedia.org/wiki/Concurrency_control