Android Media Player 框架分析-AHandler AMessage ALooper
在之前一篇中简单的介绍了一下setDataSource的流程,其中遇到了一个新的消息机制AHandler,其实这东西本来不需要介绍,因为消息机制原本就是一个很成熟和常见的技术技巧,这玩意里面包含了计算机哲学和计算机玄学的双学位问题,听起来牛逼轰轰,其实也就那回事了。为了这次文档的完整性,再一个后面可能要整理到公司的文档库中,所以在此介绍一下,熟悉的同学直接飘过了。
先说说为什么要用消息机制,在我们的软件开发中经常会遇到多线程的问题,由此便带来了线程同步的问题,比如各种锁,最常见的就是Mutex,而线程同步通常是有代价的,这个代价一方面来自于锁本身,而另一方面来自于程序设计,在一个有众多线程的进程中使用同步锁常常造成各种死锁问题,当然你可以说你水平高超没有这个问题,而通常解决锁之间的竞争往往在效率上也要付出相应的代价,当然了你也可以说你连这也能解决,那我也只能膜拜了。然而通常还有另一个问题,便是线程间的执行顺序问题,如果使用锁和条件变量来控制,在线程众多的情况下,可控性只会越来越差,另外通常来讲CPU的利用率也不高。那么消息机制是如何来解决这些问题的呢?
第一个问题是什么是消息机制,这里先介绍一个同步/异步的的概念,举例来说明,
比如说肚子饿了去饭店吃饭,点好菜后你坐那老老实实等着服务员上菜,什么时候菜做了好你才能结束等待,那这就是同步。
而异步就不用干等了,掏出手机,打开外卖软件,下单,OK,这时你就可以去干别的事情了,应该没有人死盯着手机干等吧,到时候饭店做好饭,小哥自然送上门。
听起来异步比同步是先进很多了,但在现实中事情往往没有想象的那么容易,首先你用外卖软件,如果我是老板,餐厅已经爆满了,顾客在饭店里已经骂娘了,哪还顾得上你,第二你点完得等送餐吧,又多了一段路程,而同步也有自己的缺点,坐在餐馆等饭的时间对于你而言算是浪费了,比如你还有其他事情没做此时也只能乖乖干坐着了。
好吧,扯太远了,回到消息机制上来。
当我们想要我们的程序执行一些事情的时候要么同步,要么异步,而消息机制便是一种异步的方式,首先创建一个消息队列,这个消息队列在一个单独的线程中轮询,一旦有人发送消息给它,它就会将消息取出执行,执行后又回到等待状态直到有新的消息到来。
如下图:
消息被投递到一个消息队列中去,而Looper会持续不断的从Message Queue中拿出消息,再投递给合适的Handler去处理。
好了,开始对Android的这个消息队列进行分析吧,依旧从使用上入手,通常在使用的开始会创建一个ALooper对象,然后会实现一个AHandler的子类,最后创建消息进行投递进入Message Queue。
首先我们先看一下AHandler的定义。代码不多直接贴上来。
- namespace android {
- struct AMessage;
- struct AHandler : public RefBase {
- AHandler()
- : mID(0),
- mVerboseStats(false),
- mMessageCounter(0) {
- }
- ALooper::handler_id id() const {
- return mID;
- }
- sp<ALooper> looper() const {
- return mLooper.promote();
- }
- wp<ALooper> getLooper() const {
- return mLooper;
- }
- wp<AHandler> getHandler() const {
- // allow getting a weak reference to a const handler
- return const_cast<AHandler *>(this);
- }
- protected:
- virtual void onMessageReceived(const sp<AMessage> &msg) = 0;
- private:
- friend struct AMessage; // deliverMessage()
- friend struct ALooperRoster; // setID()
- ALooper::handler_id mID;
- wp<ALooper> mLooper;
- inline void setID(ALooper::handler_id id, wp<ALooper> looper) {
- mID = id;
- mLooper = looper;
- }
- bool mVerboseStats;
- uint32_t mMessageCounter;
- KeyedVector<uint32_t, uint32_t> mMessages;
- void deliverMessage(const sp<AMessage> &msg);
- DISALLOW_EVIL_CONSTRUCTORS(AHandler);
- };
- } // namespace android
cpp文件:
- namespace android {
- void AHandler::deliverMessage(const sp<AMessage> &msg) {
- onMessageReceived(msg);
- mMessageCounter++;
- if (mVerboseStats) {
- uint32_t what = msg->what();
- ssize_t idx = mMessages.indexOfKey(what);
- if (idx < 0) {
- mMessages.add(what, 1);
- } else {
- mMessages.editValueAt(idx)++;
- }
- }
- }
- } // namespace android
由于该类为基类,在使用时需要自己定义子类去继承重新类中的方法,所先以该类的Cpp文件相对比较简单,只定义了一个deliverMessage方法,
先看看.h文件中该类的构成,其中有一个容易混淆的成员mMessages被定义为KeyedVector类型,该并不是真正的Message Queue,相反该对象并没有什么太大的实际作用,只有在verbose模式下才存储一个消息被传递的次数,对我们分析没什么帮助。剩下的就是一个id值,和一个ALooper的弱引用。刚刚我们已经说过了,Looper负责将消息源源不断的发送给Handler来处理,所以两者是必然有关联的。另外还有一个mVerboseStats的bool值,这玩意实际上没啥用,先不看了。之后看看一个重要的纯虚函数onMessageReceived,我们定义的子类最重要的就是实现这个方法。之后会通过调用把Message发给这个函数,而这个函数会根据这个Message中的what值来判断需要进行什么操作,习惯用法是通过一个switch来处理。
- namespace android {
- struct AHandler;
- struct AMessage;
- struct AReplyToken;
- struct ALooper : public RefBase {
- typedef int32_t event_id;
- typedef int32_t handler_id;
- ALooper();
- // Takes effect in a subsequent call to start().
- void setName(const char *name);
- handler_id registerHandler(const sp<AHandler> &handler);
- void unregisterHandler(handler_id handlerID);
- status_t start(
- bool runOnCallingThread = false,
- bool canCallJava = false,
- int32_t priority = PRIORITY_DEFAULT
- );
- status_t stop();
- static int64_t GetNowUs();
- const char *getName() const {
- return mName.c_str();
- }
- protected:
- virtual ~ALooper();
- private:
- friend struct AMessage; // post()
- struct Event {
- int64_t mWhenUs;
- sp<AMessage> mMessage;
- };
- Mutex mLock;
- Condition mQueueChangedCondition;
- AString mName;
- List<Event> mEventQueue;
- struct LooperThread;
- sp<LooperThread> mThread;
- bool mRunningLocally;
- // use a separate lock for reply handling, as it is always on another thread
- // use a central lock, however, to avoid creating a mutex for each reply
- Mutex mRepliesLock;
- Condition mRepliesCondition;
- // START --- methods used only by AMessage
- // posts a message on this looper with the given timeout
- void post(const sp<AMessage> &msg, int64_t delayUs);
- // creates a reply token to be used with this looper
- sp<AReplyToken> createReplyToken();
- // waits for a response for the reply token. If status is OK, the response
- // is stored into the supplied variable. Otherwise, it is unchanged.
- status_t awaitResponse(const sp<AReplyToken> &replyToken, sp<AMessage> *response);
- // posts a reply for a reply token. If the reply could be successfully posted,
- // it returns OK. Otherwise, it returns an error value.
- status_t postReply(const sp<AReplyToken> &replyToken, const sp<AMessage> &msg);
- // END --- methods used only by AMessage
- bool loop();
- DISALLOW_EVIL_CONSTRUCTORS(ALooper);
- };
- } // namespace android
- namespace android {
- ALooperRoster gLooperRoster;
- struct ALooper::LooperThread : public Thread {
- LooperThread(ALooper *looper, bool canCallJava)
- : Thread(canCallJava),
- mLooper(looper),
- mThreadId(NULL) {
- }
- virtual status_t readyToRun() {
- mThreadId = androidGetThreadId();
- return Thread::readyToRun();
- }
- virtual bool threadLoop() {
- return mLooper->loop();
- }
- bool isCurrentThread() const {
- return mThreadId == androidGetThreadId();
- }
- protected:
- virtual ~LooperThread() {}
- private:
- ALooper *mLooper;
- android_thread_id_t mThreadId;
- DISALLOW_EVIL_CONSTRUCTORS(LooperThread);
- };
- // static
- int64_t ALooper::GetNowUs() {
- return systemTime(SYSTEM_TIME_MONOTONIC) / 1000ll;
- }
- ALooper::ALooper()
- : mRunningLocally(false) {
- // clean up stale AHandlers. Doing it here instead of in the destructor avoids
- // the side effect of objects being deleted from the unregister function recursively.
- gLooperRoster.unregisterStaleHandlers();
- }
- ALooper::~ALooper() {
- stop();
- // stale AHandlers are now cleaned up in the constructor of the next ALooper to come along
- }
- void ALooper::setName(const char *name) {
- mName = name;
- }
- ALooper::handler_id ALooper::registerHandler(const sp<AHandler> &handler) {
- return gLooperRoster.registerHandler(this, handler);
- }
- void ALooper::unregisterHandler(handler_id handlerID) {
- gLooperRoster.unregisterHandler(handlerID);
- }
- status_t ALooper::start(
- bool runOnCallingThread, bool canCallJava, int32_t priority) {
- if (runOnCallingThread) {
- {
- Mutex::Autolock autoLock(mLock);
- if (mThread != NULL || mRunningLocally) {
- return INVALID_OPERATION;
- }
- mRunningLocally = true;
- }
- do {
- } while (loop());
- return OK;
- }
- Mutex::Autolock autoLock(mLock);
- if (mThread != NULL || mRunningLocally) {
- return INVALID_OPERATION;
- }
- mThread = new LooperThread(this, canCallJava);
- status_t err = mThread->run(
- mName.empty() ? "ALooper" : mName.c_str(), priority);
- if (err != OK) {
- mThread.clear();
- }
- return err;
- }
- status_t ALooper::stop() {
- sp<LooperThread> thread;
- bool runningLocally;
- {
- Mutex::Autolock autoLock(mLock);
- thread = mThread;
- runningLocally = mRunningLocally;
- mThread.clear();
- mRunningLocally = false;
- }
- if (thread == NULL && !runningLocally) {
- return INVALID_OPERATION;
- }
- if (thread != NULL) {
- thread->requestExit();
- }
- mQueueChangedCondition.signal();
- {
- Mutex::Autolock autoLock(mRepliesLock);
- mRepliesCondition.broadcast();
- }
- if (!runningLocally && !thread->isCurrentThread()) {
- // If not running locally and this thread _is_ the looper thread,
- // the loop() function will return and never be called again.
- thread->requestExitAndWait();
- }
- return OK;
- }
- void ALooper::post(const sp<AMessage> &msg, int64_t delayUs) {
- Mutex::Autolock autoLock(mLock);
- int64_t whenUs;
- if (delayUs > 0) {
- whenUs = GetNowUs() + delayUs;
- } else {
- whenUs = GetNowUs();
- }
- List<Event>::iterator it = mEventQueue.begin();
- while (it != mEventQueue.end() && (*it).mWhenUs <= whenUs) {
- ++it;
- }
- Event event;
- event.mWhenUs = whenUs;
- event.mMessage = msg;
- if (it == mEventQueue.begin()) {
- mQueueChangedCondition.signal();
- }
- mEventQueue.insert(it, event);
- }
- bool ALooper::loop() {
- Event event;
- {
- Mutex::Autolock autoLock(mLock);
- if (mThread == NULL && !mRunningLocally) {
- return false;
- }
- if (mEventQueue.empty()) {
- mQueueChangedCondition.wait(mLock);
- return true;
- }
- int64_t whenUs = (*mEventQueue.begin()).mWhenUs;
- int64_t nowUs = GetNowUs();
- if (whenUs > nowUs) {
- int64_t delayUs = whenUs - nowUs;
- mQueueChangedCondition.waitRelative(mLock, delayUs * 1000ll);
- return true;
- }
- event = *mEventQueue.begin();
- mEventQueue.erase(mEventQueue.begin());
- }
- event.mMessage->deliver();
- // NOTE: It's important to note that at this point our "ALooper" object
- // may no longer exist (its final reference may have gone away while
- // delivering the message). We have made sure, however, that loop()
- // won't be called again.
- return true;
- }
- // to be called by AMessage::postAndAwaitResponse only
- sp<AReplyToken> ALooper::createReplyToken() {
- return new AReplyToken(this);
- }
- // to be called by AMessage::postAndAwaitResponse only
- status_t ALooper::awaitResponse(const sp<AReplyToken> &replyToken, sp<AMessage> *response) {
- // return status in case we want to handle an interrupted wait
- Mutex::Autolock autoLock(mRepliesLock);
- CHECK(replyToken != NULL);
- while (!replyToken->retrieveReply(response)) {
- {
- Mutex::Autolock autoLock(mLock);
- if (mThread == NULL) {
- return -ENOENT;
- }
- }
- mRepliesCondition.wait(mRepliesLock);
- }
- return OK;
- }
- status_t ALooper::postReply(const sp<AReplyToken> &replyToken, const sp<AMessage> &reply) {
- Mutex::Autolock autoLock(mRepliesLock);
- status_t err = replyToken->setReply(reply);
- if (err == OK) {
- mRepliesCondition.broadcast();
- }
- return err;
- }
- } // namespace android
- Mutex mLock;
- Condition mQueueChangedCondition;
- Mutex mRepliesLock;
- Condition mRepliesCondition;
- struct Item {
- union {
- int32_t int32Value;
- int64_t int64Value;
- size_t sizeValue;
- float floatValue;
- double doubleValue;
- void *ptrValue;
- RefBase *refValue;
- AString *stringValue;
- Rect rectValue;
- } u;
- const char *mName;
- size_t mNameLength;
- Type mType;
- void setName(const char *name, size_t len);
- };
这个类型所定义的就是一个key-value,被命名为Item, 其中使用联合体这段空间来存储value,使用mName来作为key。mNameLength来存储key值得长度,这个长度在这里还挺重要的,后面我们再看,mType是一个枚举值,保存了所存数据是什么类型的,枚举体定义如下:
- enum Type {
- kTypeInt32,
- kTypeInt64,
- kTypeSize,
- kTypeFloat,
- kTypeDouble,
- kTypePointer,
- kTypeString,
- kTypeObject,
- kTypeMessage,
- kTypeRect,
- kTypeBuffer,
- };
- wp<AHandler> mHandler;
- wp<ALooper> mLooper;
- uint32_t mWhat;
- AMessage();
- AMessage(uint32_t what, const sp<const AHandler> &handler);
- AMessage::AMessage(void)
- : mWhat(0),
- mTarget(0),
- mNumItems(0) {
- }
- AMessage::AMessage(uint32_t what, const sp<const AHandler> &handler)
- : mWhat(what),
- mNumItems(0) {
- setTarget(handler);
- }
- void AMessage::setTarget(const sp<const AHandler> &handler) {
- if (handler == NULL) {
- mTarget = 0;
- mHandler.clear();
- mLooper.clear();
- } else {
- mTarget = handler->id();
- mHandler = handler->getHandler();
- mLooper = handler->getLooper();
- }
- }
- status_t AMessage::post(int64_t delayUs) {
- sp<ALooper> looper = mLooper.promote();
- if (looper == NULL) {
- ALOGW("failed to post message as target looper for handler %d is gone.", mTarget);
- return -ENOENT;
- }
- looper->post(this, delayUs);
- return OK;
- }
- void AMessage::deliver() {
- sp<AHandler> handler = mHandler.promote();
- if (handler == NULL) {
- ALOGW("failed to deliver message as target handler %d is gone.", mTarget);
- return;
- }
- handler->deliverMessage(this);
- }
- struct AReplyToken : public RefBase {
- AReplyToken(const sp<ALooper> &looper)
- : mLooper(looper),
- mReplied(false) {
- }
- private:
- friend struct AMessage;
- friend struct ALooper;
- wp<ALooper> mLooper;
- sp<AMessage> mReply;
- bool mReplied;
- sp<ALooper> getLooper() const {
- return mLooper.promote();
- }
- // if reply is not set, returns false; otherwise, it retrieves the reply and returns true
- bool retrieveReply(sp<AMessage> *reply) {
- if (mReplied) {
- *reply = mReply;
- mReply.clear();
- }
- return mReplied;
- }
- // sets the reply for this token. returns OK or error
- status_t setReply(const sp<AMessage> &reply);
- };
我们还是从源头上来看,这种情形通常是从AMessage的对象调用postAndAwaitResponse开始的,这个函数有一个参数是sp<AMessage>*类型了,sp包含的已经是指针了,那这里显然是一个out型的参数,也就是在调用结束后这个参数会被赋值。我们先看看这个函数的定义。
- status_t AMessage::postAndAwaitResponse(sp<AMessage> *response) {
- sp<ALooper> looper = mLooper.promote();
- if (looper == NULL) {
- ALOGW("failed to post message as target looper for handler %d is gone.", mTarget);
- return -ENOENT;
- }
- sp<AReplyToken> token = looper->createReplyToken();
- if (token == NULL) {
- ALOGE("failed to create reply token");
- return -ENOMEM;
- }
- setObject("replyID", token);
- looper->post(this, 0 /* delayUs */);
- return looper->awaitResponse(token, response);
- }
- // to be called by AMessage::postAndAwaitResponse only
- status_t ALooper::awaitResponse(const sp<AReplyToken> &replyToken, sp<AMessage> *response) {
- // return status in case we want to handle an interrupted wait
- Mutex::Autolock autoLock(mRepliesLock);
- CHECK(replyToken != NULL);
- while (!replyToken->retrieveReply(response)) {
- {
- Mutex::Autolock autoLock(mLock);
- if (mThread == NULL) {
- return -ENOENT;
- }
- }
- mRepliesCondition.wait(mRepliesLock);
- }
- return OK;
- }
- status_t ALooper::postReply(const sp<AReplyToken> &replyToken, const sp<AMessage> &reply) {
- Mutex::Autolock autoLock(mRepliesLock);
- status_t err = replyToken->setReply(reply);
- if (err == OK) {
- mRepliesCondition.broadcast();
- }
- return err;
- }
- status_t AReplyToken::setReply(const sp<AMessage> &reply) {
- if (mReplied) {
- ALOGE("trying to post a duplicate reply");
- return -EBUSY;
- }
- CHECK(mReply == NULL);
- mReply = reply;
- mReplied = true;
- return OK;
- }
- bool retrieveReply(sp<AMessage> *reply) {
- if (mReplied) {
- *reply = mReply;
- mReply.clear();
- }
- return mReplied;
- }
- // sets the reply for this token. returns OK or error
- status_t setReply(const sp<AMessage> &reply);
- status_t AMessage::postReply(const sp<AReplyToken> &replyToken) {
- if (replyToken == NULL) {
- ALOGW("failed to post reply to a NULL token");
- return -ENOENT;
- }
- sp<ALooper> looper = replyToken->getLooper();
- if (looper == NULL) {
- ALOGW("failed to post reply as target looper is gone.");
- return -ENOENT;
- }
- return looper->postReply(replyToken, this);
- }
- bool AMessage::senderAwaitsResponse(sp<AReplyToken> *replyToken) {
- sp<RefBase> tmp;
- bool found = findObject("replyID", &tmp);
- if (!found) {
- return false;
- }
- *replyToken = static_cast<AReplyToken *>(tmp.get());
- tmp.clear();
- setObject("replyID", tmp);
- // TODO: delete Object instead of setting it to NULL
- return *replyToken != NULL;
- }
- sp<AReplyToken> replyID;
- CHECK(msg->senderAwaitsResponse(&replyID));
- sp<AMessage> response = new AMessage;
- response->postReply(replyID);