Tomcat源码分析(三)------ 可携带状态的线程池

最近想实现一个可携带状态的线程池,具体需求就是池中的线程被用来处理某种信息,而此信息可视为线程所依赖的外部状态。如果用简单的线程池来实现,线程初始化时就得赋予某些信息,使得线程无法被再次利用。在看老版Tomcat的源码时,找到了答案,其实现思路主要是利用了线程的等待和唤起,HttpProcessor的实现正好基于此思路,时序图如下所示:

Tomcat源码分析(三)------ 可携带状态的线程池

初始化HttpProcessor线程时,没法赋予所需的Socket对象,因为如果在初始化阶段就赋予Socket会导致此线程没法回收用来处理其他Socket。因此,在HttpProcessor的run阶段,先把线程给wait住,具体在await方法里体现,代码如下所示:

  1. /**
  2. *AwaitanewlyassignedSocketfromourConnector,or<code>null</code>
  3. *ifwearesupposedtoshutdown.
  4. */
  5. privatesynchronizedSocketawait(){
  6. //WaitfortheConnectortoprovideanewSocket
  7. while(!available){
  8. try{
  9. wait();
  10. }catch(InterruptedExceptione){
  11. }
  12. }
  13. //NotifytheConnectorthatwehavereceivedthisSocket
  14. Socketsocket=this.socket;
  15. available=false;
  16. notifyAll();
  17. if((debug>=1)&&(socket!=null))
  18. log("Theincomingrequesthasbeenawaited");
  19. return(socket);
  20. }

当HttpConnector调用HttpProcessor.assign(socket)方法时,会给此线程赋予Socket对象,并唤起此线程,使其继续执行,assign方法的源码如下所示:

  1. /**
  2. *ProcessanincomingTCP/IPconnectiononthespecifiedsocket.Any
  3. *exceptionthatoccursduringprocessingmustbeloggedandswallowed.
  4. *<b>NOTE</b>:ThismethodiscalledfromourConnector'sthread.We
  5. *mustassignittoourownthreadsothatmultiplesimultaneous
  6. *requestscanbehandled.
  7. *
  8. *@paramsocketTCPsockettoprocess
  9. */
  10. synchronizedvoidassign(Socketsocket){
  11. //WaitfortheProcessortogetthepreviousSocket
  12. while(available){
  13. try{
  14. wait();
  15. }catch(InterruptedExceptione){
  16. }
  17. }
  18. //StorethenewlyavailableSocketandnotifyourthread
  19. this.socket=socket;
  20. available=true;
  21. notifyAll();
  22. if((debug>=1)&&(socket!=null))
  23. log("Anincomingrequestisbeingassigned");
  24. }

线程被唤起和赋予socket对象后,继续执行核心的process方法,HttpProcessor.run的完整源码如下所示:

  1. /**
  2. *ThebackgroundthreadthatlistensforincomingTCP/IPconnectionsand
  3. *handsthemofftoanappropriateprocessor.
  4. */
  5. publicvoidrun(){
  6. //Processrequestsuntilwereceiveashutdownsignal
  7. while(!stopped){
  8. //Waitforthenextsockettobeassigned
  9. Socketsocket=await();
  10. if(socket==null)
  11. continue;
  12. //Processtherequestfromthissocket
  13. try{
  14. process(socket);
  15. }catch(Throwablet){
  16. log("process.invoke",t);
  17. }
  18. //Finishupthisrequest
  19. connector.recycle(this);
  20. }
  21. //TellthreadStop()wehaveshutourselvesdownsuccessfully
  22. synchronized(threadSync){
  23. threadSync.notifyAll();
  24. }
  25. }