memcached网络层操作详解
[liexusong原创]
memcached的网络层处理大概如下:
(1) main函数创建一个服务端侦听socket, 并把它添加到libevent中; 当有客户端连接时就会触发libevent事件, 进而调用drive_machine()函数 (原文此处配有示意图).
(2) drive_machine()函数会根据conn的state字段来判断要进行什么操作. memcached中共有7种操作状态, 定义如下:
/*
 * Connection states dispatched on by drive_machine(). The numeric values
 * are the implicit 0..6 of the original declaration, written out explicitly.
 */
enum conn_states {
    conn_listening = 0, /* server socket: accept new client connections */
    conn_read      = 1, /* read from the client and parse a command line */
    conn_write     = 2, /* send the reply prepared in wbuf by out_string() */
    conn_nread     = 3, /* read a value body directly into an item */
    conn_swallow   = 4, /* read and discard a value body that cannot be stored */
    conn_closing   = 5, /* close the connection */
    conn_mwrite    = 6  /* send the items collected by a "get" command */
};
drive_machine()函数大概如下:
/*
 * Per-connection state machine. Dispatches on c->state repeatedly until a
 * case sets `exit` (after re-arming the libevent event because there is no
 * more work to do right now). Only the conn_listening and conn_read cases
 * are shown in full in this excerpt; the other cases are elided.
 * Note: the local `exit` shadows exit(3) inside this function.
 */
void drive_machine(conn *c) {
int exit = 0;
int sfd, flags = 1;
socklen_t addrlen;
struct sockaddr addr;
conn *newc;
int res;
while (!exit) {
switch(c->state) {
case conn_listening:
/* Accept one pending client; the while loop keeps accepting until EAGAIN. */
addrlen = sizeof(addr);
if ((sfd = accept(c->sfd, &addr, &addrlen)) == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
/* No more pending connections: wait for the next event. */
exit = 1;
break;
} else {
/* NOTE(review): state stays conn_listening and exit stays 0, so the
 * loop retries accept() immediately; a persistent error (e.g. EMFILE)
 * could make this spin - confirm. */
perror("accept()");
}
break;
}
// Put the accepted socket into non-blocking mode.
if ((flags = fcntl(sfd, F_GETFL, 0)) < 0 ||
fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0) {
perror("setting O_NONBLOCK");
close(sfd);
break;
}
/* Register the client with libevent, starting in conn_read. */
newc = conn_new(sfd, conn_read, EV_READ | EV_PERSIST);
if (!newc) {
if (settings.verbose > 0)
fprintf(stderr, "couldn't create new connection\n");
close(sfd);
break;
}
break;
case conn_read:
/* First try to execute a complete command already buffered in rbuf;
 * `continue` re-enters the while loop to handle the (possibly new) state. */
if (try_read_command(c)) {
continue;
}
/* No full command yet: pull more bytes from the socket. */
if (try_read_network(c)) {
continue;
}
/* Nothing to do: wait for the socket to become readable again. */
if (!update_event(c, EV_READ | EV_PERSIST)) {
if (settings.verbose > 0)
fprintf(stderr, "Couldn't update event\n");
c->state = conn_closing;
break;
}
exit = 1;
break;
… … …
}
… … …
}
}
drive_machine()函数大概就是根据conn *c的state字段来判断要进行的操作. 每个客户端连接到memcached之后, 其连接都会被添加到libevent中, 之后每当该连接上有事件触发就会调用这个函数. 所以说这个函数是网络层中最重要的.
[1]conn_listening状态:
这个状态主要是接受(accept)客户端连接, 把新连接的socket设置为非阻塞, 然后通过调用conn_new()函数把它放到libevent中. conn_new()函数大概如下:
conn *conn_new(int sfd, int init_state, int event_flags) {
conn *c;
if (freecurr > 0) {
//如果在freeconns中有数据, 直接去freeconns中取
c = freeconns[--freecurr];
} else {
if (!(c = (conn *)malloc(sizeof(conn)))) {
perror("malloc()");
return 0;
}
c->rbuf = c->wbuf = 0;
c->ilist = 0;
c->rbuf = (char *) malloc(DATA_BUFFER_SIZE);//2048
c->wbuf = (char *) malloc(DATA_BUFFER_SIZE);//2048
c->ilist = (item **) malloc(sizeof(item *)*200);//分配200个item指针
//分配内存失败?
if (c->rbuf == 0 || c->wbuf == 0 || c->ilist == 0) {
if (c->rbuf != 0) free(c->rbuf);
if (c->wbuf != 0) free(c->wbuf);
if (c->ilist !=0) free(c->ilist);
free(c);
perror("malloc()");
return 0;
}
c->rsize = c->wsize = DATA_BUFFER_SIZE;//2048
c->isize = 200;
stats.conn_structs++;//状态中增加一个conn
}
if (settings.verbose > 1) {
if (init_state == conn_listening)
fprintf(stderr, "<%d server listening\n", sfd);
else
fprintf(stderr, "<%d new client connection\n", sfd);
}
c->sfd = sfd;//保存socket文件描述符
c->state = init_state;//初始化的处理状态
c->rlbytes = 0;
c->rbytes = c->wbytes = 0;
c->wcurr = c->wbuf;//指向当前的缓存位置
c->rcurr = c->rbuf;
c->icurr = c->ilist;
c->ileft = 0;
c->iptr = c->ibuf;
c->ibytes = 0;
c->write_and_go = conn_read;//写完后进行读操作
c->write_and_free = 0;
c->item = 0;
c->is_corked = 0;
//设置event事件
event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
c->ev_flags = event_flags;
if (event_add(&c->event, 0) == -1) {
free(c);
return 0;
}
stats.curr_conns++;
stats.total_conns++;
return c;
}
conn_new()函数主要是新建一个conn对象, 并且初始化他的属性.
[2]conn_read状态:
状态conn_read处理中有两个函数比较重要, 就是try_read_command()和try_read_network(). drive_machine()先调用try_read_command()尝试处理缓存中已有的完整命令; 如果还没有完整的命令, 再调用try_read_network()从网络端读取数据, 读到数据后回到conn_read状态再次调用try_read_command(). try_read_command()函数调用process_command()函数来解析命令, 然后根据命令的类型来改变连接的状态.
try_read_network()函数如下:
/*
 * Read everything currently available on the client socket into c->rbuf,
 * doubling the buffer whenever it is full.
 *
 * Returns 1 when drive_machine() should run the state machine again:
 *   - new bytes were read (and the socket is now drained);
 *   - the peer closed the connection or a hard read error occurred
 *     (c->state is set to conn_closing);
 *   - rbuf could not be grown (an error reply is queued via out_string()
 *     and the connection closes after it is written).
 * Returns 0 when nothing new was read and the caller should wait for the
 * next readable event.
 */
int try_read_network(conn *c) {
    int gotdata = 0;
    int res;

    while (1) {
        if (c->rbytes >= c->rsize) {
            char *new_rbuf = realloc(c->rbuf, c->rsize * 2);
            if (!new_rbuf) {
                if (settings.verbose > 0)
                    fprintf(stderr, "Couldn't realloc input buffer\n");
                c->rbytes = 0; /* drop the partial request we cannot hold */
                out_string(c, "SERVER_ERROR out of memory");
                c->write_and_go = conn_closing;
                return 1;
            }
            c->rbuf = new_rbuf;
            c->rsize *= 2;
        }

        res = read(c->sfd, c->rbuf + c->rbytes, c->rsize - c->rbytes);
        if (res > 0) {
            stats.bytes_read += res;
            gotdata = 1;
            c->rbytes += res;
            continue;
        }
        if (res == 0) {
            /* The peer closed the connection. */
            c->state = conn_closing;
            return 1;
        }

        /* res == -1 */
        if (errno == EINTR)
            continue; /* interrupted by a signal: retry the read */
        if (errno == EAGAIN || errno == EWOULDBLOCK)
            break;    /* socket drained */

        /* Hard error. Returning 0 here would discard `gotdata`, leaving a
         * complete command in rbuf unprocessed until the client sent more
         * bytes; close the connection as the write/swallow paths do. */
        if (settings.verbose > 0)
            fprintf(stderr, "Failed to read, and not due to blocking\n");
        c->state = conn_closing;
        return 1;
    }
    return gotdata;
}
这个函数的主要操作是, 从客户端处读取数据, 直到没有数据可读为止[res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)]. 读取数据的同时更新conn的rbytes字段, 表示已经读到的数据大小; 当缓存已满时会用realloc()把rbuf扩大一倍. 当read()调用返回0时表示对端已经关闭连接, 此时把连接的状态修改为conn_closing, 下次操作时会关闭该连接并把它从libevent中去掉.
try_read_command()函数如下:
/*
 * Execute one complete command line from the read buffer, if there is one.
 *
 * A line is complete once c->rbuf contains a '\n'. The '\n' (and a '\r'
 * directly before it, unless that '\r' is the very first byte of rbuf) is
 * replaced by '\0', the line is passed to process_command(), and the bytes
 * that followed the '\n' are moved to the front of rbuf.
 *
 * Returns 1 if a command was processed, 0 if no complete line is buffered.
 */
int try_read_command(conn *c) {
    char *newline;
    char *rest;
    long consumed;

    if (c->rbytes == 0)
        return 0;

    newline = memchr(c->rbuf, '\n', c->rbytes);
    if (newline == NULL)
        return 0;

    /* Whatever follows the '\n' belongs to the next request. */
    rest = newline + 1;

    /* Cut the line at "\r\n" when present, otherwise at "\n". */
    if (newline - c->rbuf > 1 && newline[-1] == '\r')
        --newline;
    *newline = '\0';

    process_command(c, c->rbuf);

    /* Discard the processed line and keep the remaining bytes. */
    consumed = rest - c->rbuf;
    if (consumed < c->rbytes)
        memmove(c->rbuf, rest, c->rbytes - consumed);
    c->rbytes -= consumed;

    return 1;
}
try_read_command()函数只有在从客户端读取的数据足够时才会处理, 也就是当读到换行符\n时才会处理, 因为命令是以换行符结束的. 然后通过调用process_command()来解析命令. 解析完命令后, 把这条命令从缓存中去掉, 只保留剩余的数据, 操作如下:
/* Shift the bytes that follow the processed line (starting at cont) to the front of rbuf. */
if (cont - c->rbuf < c->rbytes) {
memmove(c->rbuf, cont, c->rbytes - (cont - c->rbuf));
}
效果如下 (原文此处配有示意图, 图中绿色段是数据, 黄色段是命令):
处理完命令后, 会把命令后面的数据段移动到缓存开头, 覆盖掉命令段; 这是通过memmove(c->rbuf, cont, c->rbytes - (cont - c->rbuf));这条语句来完成的.
process_command()函数如下:
/*
 * Parse and execute one command line (NUL-terminated, line terminator
 * already removed by try_read_command()). Only the add/set/replace branch
 * is shown in this excerpt; the other commands are elided.
 *
 * add/set/replace: "<cmd> <key> <flags> <exptime> <bytes>". An item of
 * <bytes>+2 bytes (room for the trailing "\r\n") is allocated, c->rcurr is
 * pointed at the item's data area and the connection switches to
 * conn_nread, so the value is read straight into the item. If the item
 * cannot be allocated, "SERVER_ERROR out of memory" is queued and the
 * <bytes>+2 value bytes are swallowed afterwards.
 */
void process_command(conn *c, char *command) {
    int comm = 0;
    int incr = 0;

    if (settings.verbose > 1)
        fprintf(stderr, "<%d %s\n", c->sfd, command);

    set_cork(c, 1);

    if ((strncmp(command, "add ", 4) == 0 && (comm = NREAD_ADD)) ||
        (strncmp(command, "set ", 4) == 0 && (comm = NREAD_SET)) ||
        (strncmp(command, "replace ", 8) == 0 && (comm = NREAD_REPLACE))) {
        char key[251];
        unsigned int flags;
        unsigned long exptime_arg;
        time_t expire;
        int len, res;
        item *it;

        /* <command name> <key> <flags> <exptime> <bytes>\r\n
         * %*s skips the command name. The key width must be bounded one
         * below sizeof(key) to leave room for the NUL ("%0s" is not a valid
         * bounded width). Each conversion matches its argument's type. */
        res = sscanf(command, "%*s %250s %u %lu %d\n",
                     key, &flags, &exptime_arg, &len);
        if (res != 4 || strlen(key) == 0 || len < 0) {
            out_string(c, "CLIENT_ERROR bad command line format");
            return;
        }

        expire = realtime((time_t)exptime_arg);
        it = item_alloc(key, flags, expire, len + 2); /* +2 for "\r\n" */
        if (it == 0) {
            out_string(c, "SERVER_ERROR out of memory");
            /* After the error reply, discard the value the client sends. */
            c->write_and_go = conn_swallow;
            c->sbytes = len + 2;
            return;
        }

        c->item_comm = comm;
        c->item = it;
        c->rcurr = ITEM_data(it); /* read the value directly into the item */
        c->rlbytes = it->nbytes;
        c->state = conn_nread;
        return;
    }
    /* … … (remaining commands elided in this excerpt) */
}
process_command()函数主要是解析命令, 然后执行指定的操作. 如上代码就是处理add/set/replace命令的部分: 解析出命令行参数后, 先分配一个item, 然后把conn的rcurr指针指向item的数据域, 并把连接的状态设置为conn_nread. conn_nread状态的操作是从客户端读取完整的数据; 因为rcurr已经指向item的数据域, 所以数据可以直接读到item中. 数据读完之后, 才会完成add/set/replace剩下的操作, 如把数据存储在item中, 并且链接(link)这个item.
[3]conn_nread状态:
conn_nread状态是要从客户端连接处读取还没有读完的数据, 并且把数据放到conn_read状态下(process_command()中)分配的item的数据域中.
[4]conn_write状态:
说conn_write状态之前先要说说out_string()函数, 定义如下:
/*
 * Queue a one-line reply. Copies `str` followed by "\r\n" into c->wbuf,
 * sets c->wbytes/c->wcurr and switches the connection to conn_write. When
 * the reply has been written, the connection goes back to conn_read
 * (callers may override c->write_and_go afterwards, e.g. with conn_swallow
 * or conn_closing).
 *
 * If str plus "\r\n" does not fit in wbuf, the fixed reply
 * "SERVER_ERROR output line too long" is sent instead.
 *
 * wbuf is used as a byte buffer of length c->wbytes (conn_write hands it to
 * write()), so no NUL terminator is stored. strcpy()+strcat() also wrote a
 * NUL (len + 3 bytes), which overran wbuf by one byte when len + 2 == wsize.
 */
void out_string(conn *c, char *str) {
    int len;

    if (settings.verbose > 1)
        fprintf(stderr, ">%d %s\n", c->sfd, str);

    len = strlen(str);
    if (len + 2 > c->wsize) {
        str = "SERVER_ERROR output line too long";
        len = strlen(str);
    }

    memcpy(c->wbuf, str, len);
    memcpy(c->wbuf + len, "\r\n", 2);
    c->wbytes = len + 2;
    c->wcurr = c->wbuf;
    c->state = conn_write;
    c->write_and_go = conn_read;
}
就是把str字符串写到conn的wbuf缓存中, 然后设置conn的状态为conn_write, 表示下次操作要进行conn_write相关的操作, 最后设置conn的write_and_go为conn_read, 表示写操作完毕以后要进行读操作.
而conn_write状态对应的操作就是: 把wbuf缓存的数据写到客户端连接处, 操作如下:
case conn_write:
if (c->wbytes == 0) {
if (c->write_and_free) {
free(c->write_and_free);
c->write_and_free = 0;
}
c->state = c->write_and_go;
if (c->state == conn_read)
set_cork(c, 0);
break;
}
res = write(c->sfd, c->wcurr, c->wbytes);
if (res > 0) {
stats.bytes_written += res;
c->wcurr += res;
c->wbytes -= res;
break;
}
if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
if (!update_event(c, EV_WRITE | EV_PERSIST)) {
if (settings.verbose > 0)
fprintf(stderr, "Couldn't update event\n");
c->state = conn_closing;
break;
}
exit = 1;
break;
}
if (settings.verbose > 0)
fprintf(stderr, "Failed to write, and not due to blocking\n");
c->state = conn_closing;
break;
conn_write状态一般是由out_string()函数设置的, 作用一般都是把一些消息发给客户端连接. 调用out_string()函数之后一般会立即执行conn_write状态对应的操作. 因为out_string()没有修改libevent的事件, 所以要立即执行conn_write状态对应的操作才行, 不然就不会触发相关的事件. 而执行conn_write状态操作时, 如果客户端不可写(write调用返回-1, 并且errno == EAGAIN || errno == EWOULDBLOCK), 那么就会把libevent的事件修改为写(EV_WRITE) [update_event(c, EV_WRITE | EV_PERSIST)], 之后就等libevent事件触发时再次执行conn_write状态对应的操作.
[5]conn_mwrite状态:
conn_mwrite状态是当用户要执行get命令时设置的. get命令处理如下:
/*
 * "get <key> [<key> ...]": collect every live item for the requested keys
 * into c->ilist (taking a reference on each hit), then switch to
 * conn_mwrite to stream them. With no hits, reply "END" directly.
 */
if (strncmp(command, "get ", 4) == 0) {
    char *start = command + 4;
    char key[251];
    int next;
    int i = 0;
    item *it;
    time_t now = time(0);

    /* %250s bounds each key to key[] (250 chars + NUL); %n records how far
     * sscanf advanced so the next key is parsed from there. */
    while (sscanf(start, " %250s%n", key, &next) >= 1) {
        start += next;
        stats.get_cmds++;
        it = assoc_find(key);

        /* Deleted, flushed (not newer than oldest_live) and expired items
         * count as misses. */
        if (it && (it->it_flags & ITEM_DELETED)) {
            it = 0;
        }
        if (settings.oldest_live && it &&
            it->time <= settings.oldest_live) {
            it = 0;
        }
        if (it && it->exptime && it->exptime < now) {
            item_unlink(it);
            it = 0;
        }

        if (it) {
            /* Make room BEFORE storing: valid slots are 0..isize-1. Storing
             * first and growing only when i > isize wrote the
             * (isize+1)-th hit one past the end of ilist. */
            if (i >= c->isize) {
                item **new_list = realloc(c->ilist, sizeof(item *) * c->isize * 2);
                if (!new_list)
                    break; /* out of memory: send the hits gathered so far */
                c->ilist = new_list;
                c->isize *= 2;
            }
            stats.get_hits++;
            it->refcount++;
            item_update(it);
            c->ilist[i++] = it;
        } else {
            stats.get_misses++;
        }
    }

    c->icurr = c->ilist;
    c->ileft = i;
    if (c->ileft) {
        c->ipart = 0;
        c->state = conn_mwrite;
        c->ibytes = 0;
        return;
    } else {
        out_string(c, "END");
        return;
    }
}
此操作主要是根据用户提供的key来查找相关的item, 并且把找到的item放到conn的item列表(ilist)中, 供conn_mwrite状态操作时使用.
conn_mwrite操作是最复杂的, 代码如下:
/*
 * Stream the items collected by "get". Each item is sent in pieces: first
 * its "VALUE <key> <flags> <bytes>\r\n" header (formatted into c->ibuf),
 * then its data (sent directly from the item). Pending bytes are always
 * flushed first; c->ipart selects the next piece once c->ibytes is 0:
 *   0 = format the header for *c->icurr
 *   1 = point iptr at the item's data (nbytes includes the trailing "\r\n")
 *   2 = done with this item: item_remove() it (presumably dropping the
 *       reference taken by the get handler), advance, and fall into 0
 *   3 = all items sent: queue "END" via out_string()
 */
case conn_mwrite:
if (c->ibytes > 0) {
/* Flush pending header/data bytes; breaks here leave the outer switch. */
res = write(c->sfd, c->iptr, c->ibytes);
if (res > 0) {
stats.bytes_written += res;
c->iptr += res;
c->ibytes -= res;
break;
}
if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
/* Socket not writable now: wait for an EV_WRITE event. */
if (!update_event(c, EV_WRITE | EV_PERSIST)) {
if (settings.verbose > 0)
fprintf(stderr, "Couldn't update event\n");
c->state = conn_closing;
break;
}
exit = 1;
break;
}
if (settings.verbose > 0)
fprintf(stderr, "Failed to write, and not due to blocking\n");
c->state = conn_closing;
break;
} else {
/* Nothing pending: prepare the next piece. Breaks below leave only the
 * inner switch; the final `break` then leaves the outer one. */
item *it;
switch (c->ipart) {
case 1:
it = *(c->icurr);
/* NOTE(review): presumably asserts the item was not returned to the slab allocator. */
assert((it->it_flags & ITEM_SLABBED) == 0);
c->iptr = ITEM_data(it);
c->ibytes = it->nbytes;
c->ipart = 2;
break;
case 2:
it = *(c->icurr);
item_remove(it);
c->ileft--;
if (c->ileft <= 0) {
c->ipart = 3;
break;
} else {
c->icurr++;
}
/* fallthrough: format the header of the next item */
case 0:
it = *(c->icurr);
assert((it->it_flags & ITEM_SLABBED) == 0);
sprintf(c->ibuf, "VALUE %s %u %u\r\n", ITEM_key(it), it->flags, it->nbytes - 2);
if (settings.verbose > 1)
fprintf(stderr, ">%d sending key %s\n", c->sfd, ITEM_key(it));
c->iptr = c->ibuf;
c->ibytes = strlen(c->iptr);
c->ipart = 1;
break;
case 3:
out_string(c, "END");
break;
}
}
break;
这个操作主要是把get命令处获取到的item的数据发送到客户端连接.
主要难点是: conn_mwrite操作是遍历所有获取到的item, 然后把item的数据发送给客户端.
遍历的时候使用了一个switch(根据c->ipart)来处理, 主要有4种情况, 如下:
(1) 0是发送key和数据的头信息(VALUE行).
(2) 1是发送数据data.
(3) 2是减少item的refcount并把当前item指针向后移动一个, 然后继续执行(1)的操作; 如果已经没有剩余的item, 则转到3.
(4) 3是发送END, 完成操作.
每次还有待发送的数据(c->ibytes > 0)时, 都会先进行写操作, 代码如下:
/* Flush pending bytes (VALUE header in ibuf, or item data) starting at iptr. */
if (c->ibytes > 0) {
res = write(c->sfd, c->iptr, c->ibytes);
if (res > 0) {
stats.bytes_written += res;
c->iptr += res;
c->ibytes -= res;
break;
}
/* Socket not writable now: re-arm for EV_WRITE and leave the loop. */
if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
if (!update_event(c, EV_WRITE | EV_PERSIST)) {
if (settings.verbose > 0)
fprintf(stderr, "Couldn't update event\n");
c->state = conn_closing;
break;
}
exit = 1;
break;
}
/* Any other write failure closes the connection. */
if (settings.verbose > 0)
fprintf(stderr, "Failed to write, and not due to blocking\n");
c->state = conn_closing;
break;
}
当没有待发送的数据(c->ibytes == 0)时, 才会继续遍历从get命令操作取得的item列表: VALUE头信息会被格式化到ibuf中, 而item的数据并不复制, 而是直接让iptr指向item的数据域来发送.
[6] conn_swallow状态:
conn_swallow状态是当内存不足并且调用add/set/replace命令的时候才会出现, 此操作主要是读取客户端发送过来的数据, 并且忽略掉. 操作如下:
case conn_swallow:
if (c->sbytes == 0) {
c->state = conn_read;
break;
}
if (c->rbytes > 0) {
int tocopy = c->rbytes > c->sbytes ? c->sbytes : c->rbytes;
c->sbytes -= tocopy;
if (c->rbytes > tocopy) {
memmove(c->rbuf, c->rbuf+tocopy, c->rbytes - tocopy);
}
c->rbytes -= tocopy;
break;
}
res = read(c->sfd, c->rbuf, c->rsize > c->sbytes ? c->sbytes : c->rsize);
if (res > 0) {
stats.bytes_read += res;
c->sbytes -= res;
break;
}
if (res == 0) {
c->state = conn_closing;
break;
}
if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
if (!update_event(c, EV_READ | EV_PERSIST)) {
if (settings.verbose > 0)
fprintf(stderr, "Couldn't update event\n");
c->state = conn_closing;
break;
}
exit = 1;
break;
}
if (settings.verbose > 0)
fprintf(stderr, "Failed to read, and not due to blocking\n");
c->state = conn_closing;
break;
至此memcached所有的网络层操作已经全部分析完.