关于多路IO写服务器和使用多线程、多进程写服务器的区别(简单理解)
那这篇博客主要是承接上面的poll select epoll的区别,上升到多线程、多进程和高级IO复用的区别了。
https://blog.****.net/lailaiquququ11/article/details/83066593
那我们还是从代码开始说起,先看看,简单的多线程和多进程的代码,然后在进行补充说明:
使用多线程和多进程也能够实现多个客户端和服务器的数据收发的功能。那么所有的监听连接请求和请求的接受处理都会由服务器来处理,即用户自定义的程序来处理(就是我们自己编写的server.c的代码了),这在大型的项目,是极其不推荐的,它不仅降低了程序执行的效率,而且还占用了大量的CPU资源(即accept去监听)。
用户的应用来处理。若使用多路I/O转接,那么 这里关于监听和连接的不再是由用户自己处理,取而代之由内核应用程序监视文件。(即把内核请过来当帮手,来监听相应的客户端,当内核接受客户端的连接请求,给应用程序(server.c)一个反馈,然后应用程序立即与客户端建立连接(不需要再去accept()等待,此部分为链接的部分),然后把它继续放到内核里面去监听,当再次反馈给应用程序的时候,就是有数据过来需要读取了(就不在需要read()的等待)。不然应用程序会在accept(),read()部分会被阻塞。这里使用这种机制,可以当有这些事件发生了,应用程序才去处理,这就空出了很多的时间,不再是阻塞,应用程序可以去干其他的事情。整个机制类似于signal()的功能)
内核反馈的时候是一定有事件发生了,具体是什么事件,建立连接,读事件或者写事件,应用程序处理的时候就需要去具体区分。
这就是使用整个多路IO与多进程多线程的简单区别。
1、多进程并发服务器
使用多进程并发服务器时要考虑以下几点:
- 父进程最大文件描述个数(父进程中需要close关闭accept返回的新文件描述符)
- 系统内创建进程个数(与内存大小相关)
- 进程创建过多是否降低整体服务性能(进程调度)
2、多线程并发服务器
在使用线程模型开发服务器时需考虑以下问题:
- 调整进程内最大文件描述符上限
- 线程如有共享数据,考虑线程同步
- 服务于客户端线程退出时,退出处理。(退出值,分离态)
- 系统负载,随着链接客户端增加,导致其它线程不能及时得到CPU
3、多路I/O转接服务器
多路IO转接服务器也叫做多任务IO服务器。该类服务器实现的主旨思想是,不再由应用程序自己监视客户端连接,取而代之由内核替应用程序监视文件。
主要使用的方法有三种
select、poll、epoll
多进程服务器:
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <pthread.h>
#include <signal.h>
#define SER_PORT 7777
#define SIZE 1024
static pthread_rwlock_t rw;
struct node{
char ip[16];
int sfd;
struct node *next,*prev;
}head={.next=&head,.prev=&head};
int appendList(int sfd,char *ip);
void deleteList(int sfd);
/////////////////////////////////////////////
int initSocket(void);
void *listenMsg(void *arg);
int main(void)
{
pthread_t tid;
long ret,newsfd,sockfd;
struct sockaddr_in peer;
socklen_t len=sizeof(peer);
char buf[SIZE+32];
struct node *i;
signal(SIGPIPE,SIG_IGN);//!!!!!!!!!!!!!!!!!!!!!
if(pthread_rwlock_init(&rw,NULL)!=0)
{
fprintf(stderr,"init rwlock failed.\n");
return 2;
}
sockfd=initSocket();
if(sockfd<0) return 1;
printf("server start success.\n");
while(1)
{
newsfd=accept(sockfd,(struct sockaddr *)&peer,&len);
if(newsfd==-1) continue;//failed
pthread_rwlock_wrlock(&rw);
//记录连接者的套接字 便于转发
ret=appendList(newsfd,inet_ntoa(peer.sin_addr));
if(ret!=0)
{
send(newsfd,"服务器:\n 无法进入聊天室.\n",strlen("服务器:\n 无法进入聊天室.\n"),0);
shutdown(newsfd,SHUT_RDWR);
continue;
}
//新建线程
ret=pthread_create(&tid,NULL,listenMsg,(void *)newsfd);
if(ret!=0)
{
deleteList(newsfd);
send(newsfd,"服务器:\n 无法进入聊天室.\n",strlen("服务器:\n 无法进入聊天室.\n"),0);
shutdown(newsfd,SHUT_RDWR);
}
//
snprintf(buf,SIZE,"[服务器]:\n 欢迎 %s 进入聊天室!\n",inet_ntoa(peer.sin_addr));
for(i=head.next;i!=&head;i=i->next)
{
send(i->sfd,buf,strlen(buf),0);
}
pthread_rwlock_unlock(&rw);
}
return 0;
}
void *listenMsg(void *arg)
{
long ret,len,sfd=(long)arg;
char buf[SIZE+32];
struct node *i,*my;
pthread_detach(pthread_self());
//找到自己对应的信息
pthread_rwlock_rdlock(&rw);
for(i=head.next;i!=&head;i=i->next)
{
if(i->sfd==sfd)
{
my=i;
break;
}
}
pthread_rwlock_unlock(&rw);
printf("\033[36mFrom:%s\033[0m\n",my->ip);
//消息首部
snprintf(buf,SIZE,"[%s]:\n ",my->ip);
len=strlen(buf);
while(1)
{
ret=recv(sfd,buf+len,SIZE,0);
if(ret==0||ret==-1) break;
pthread_rwlock_rdlock(&rw);
//转发给所有再线者
for(i=head.next;i!=&head;i=i->next)
{
send(i->sfd,buf,len+ret,0);
}
pthread_rwlock_unlock(&rw);
sched_yield();
//unlock
}
//移除该连接 deleteList(sfd);
pthread_rwlock_wrlock(&rw);
printf("\033[31mExit:%s\033[0m\n",my->ip);
my->next->prev=my->prev;
my->prev->next=my->next;
free(my);
pthread_rwlock_unlock(&rw);
//
shutdown(sfd,SHUT_RDWR);
pthread_exit((void *)0);
}
int initSocket(void)
{
int sfd,ret;
struct sockaddr_in addr;
sfd=socket(AF_INET,SOCK_STREAM,0);
if(sfd==-1)
{
perror("create socket");
return -1;
}
addr.sin_family=AF_INET;
addr.sin_port =htons(SER_PORT);
addr.sin_addr.s_addr=INADDR_ANY;
memset(addr.sin_zero,0,8);
ret=bind(sfd,(struct sockaddr *)&addr,sizeof(addr));
if(ret==-1)
{
perror("bind");
shutdown(sfd,SHUT_RDWR);
return -2;
}
if(listen(sfd,100)==-1)
{
perror("listen");
shutdown(sfd,SHUT_RDWR);
return -3;
}
return sfd;
}
int appendList(int sfd,char *ip)
{
struct node *one;
one=malloc(sizeof(struct node));
if(one==NULL) return -1;
one->sfd=sfd;
strcpy(one->ip,ip);
one->next=&head;
one->prev=head.prev;
one->next->prev=one;
one->prev->next=one;
return 0;
}
void deleteList(int sfd)
{
struct node *i;
for(i=head.next;i!=&head;i=i->next)
{
if(i->sfd==sfd)//del
{
i->next->prev=i->prev;
i->prev->next=i->next;
free(i);
break;
}
}
}
多线程服务器:
因为会涉及到文件描述符使用时的覆盖,所以使用可加入互斥,保证原子操作。
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <pthread.h>
#include <signal.h>
pthread_mutex_t mm=PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t ok=PTHREAD_COND_INITIALIZER;
static int flag=1;
//端口号1--65535 建议用1024之后的
#define SER_PORT 7777
#define SER_IP "192.168.122.1"
#define MAX_WAIT 10
void *doServer(void *arg)
{
int ret,i,newsfd;
pthread_detach(pthread_self());//分离
//再保护的状态下取newsfd
pthread_mutex_lock(&mm);
newsfd=*(int *)arg;
flag=0;
pthread_mutex_unlock(&mm);
pthread_cond_signal(&ok);
for(i=0;i<10;i++)
{
//如果接收放掉线(关闭) 如同写一个没有读端的管道 收到SIGPIPE信号
ret=write(newsfd,"hello",5);
if(ret==-1)
{
printf("客户端失去连接.\n");
shutdown(newsfd,SHUT_RDWR);
pthread_exit((void *)0);
}
sleep(1);
}
write(newsfd,"bye",3);
shutdown(newsfd,SHUT_RDWR);
pthread_exit((void *)0);
}
int main(void)
{
int sockfd,ret;
pthread_t tid;
//避免当客户端再服务器间意外失去连接,服务线程发送数据时导致触发SIGPIPE 使服务进程死亡
signal(SIGPIPE,SIG_IGN);
//0选默认协议
sockfd=socket(AF_INET,SOCK_STREAM,0);
if(sockfd==-1)
{
perror("create socket");
return 1;
}
//描述主机的结构信息
struct sockaddr_in addr;
addr.sin_family=AF_INET;//指定地址协议
addr.sin_port =htons(SER_PORT);//网络字节序(大端字节字节序)
//addr.sin_addr.s_addr=inet_addr(SER_IP);
addr.sin_addr.s_addr=INADDR_ANY;//指本机的所有IP(包括127.0.0.1 广播 多播)
memset(addr.sin_zero,0,8);//填补字节 为匹配之前的结构struct sockaddr
ret=bind(sockfd,(struct sockaddr *)&addr,sizeof(addr));
if(ret==-1)
{
perror("bind");
shutdown(sockfd,SHUT_RDWR);
return 2;
}
//监听:MAX_WAIT<半连接队列大小>
ret=listen(sockfd,MAX_WAIT);
if(ret==-1)
{
perror("bind");
shutdown(sockfd,SHUT_RDWR);
return 2;
}
printf("Server Start Success,Wait Connect...\n");
//////////////////
int newsfd;
struct sockaddr_in from;
socklen_t len=sizeof(from);
while(1)
{
//连接没有到来则阻塞
//newsfd=accept(sockfd,NULL,NULL);
newsfd=accept(sockfd,(struct sockaddr *)&from,&len);
if(newsfd==-1) continue;//处理失败
printf("sockfd:%d newsfd:%d from:%s<%d>\n",\
sockfd,newsfd,inet_ntoa(from.sin_addr),ntohs(from.sin_port));
//建立进程或线程服务
ret=pthread_create(&tid,NULL,doServer,&newsfd);
if(ret!=0)
{
write(newsfd,"sorry,can't server",sizeof("sorry,can't server"));
shutdown(newsfd,SHUT_RDWR);
continue;
}
//等待线程将newsfd取走 避免竞争
pthread_mutex_lock(&mm);
while(flag!=0)
{
pthread_cond_wait(&ok,&mm);
}
flag=1;
pthread_mutex_unlock(&mm);
}
return 0;
}
测试的客户端代码:
client.c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <netinet/in.h>
#define MAXLINE 80
#define IP "127.0.0.1"
#define SERV_PORT 7777
int main(int argc, char* argv[])
{
struct sockaddr_in servaddr;
char buf[MAXLINE];
int sockfd, n;
sockfd = socket(AF_INET, SOCK_STREAM, 0);
bzero(&servaddr, sizeof(servaddr));
servaddr.sin_family = AF_INET;
inet_pton(AF_INET, IP, &servaddr.sin_addr);
servaddr.sin_port = htons(SERV_PORT);
connect(sockfd, (struct sockaddr*)&servaddr, sizeof(servaddr));
while(fgets(buf, MAXLINE, stdin) != NULL)
{
write(sockfd, buf, strlen(buf));
n = read(sockfd, buf, MAXLINE);
if(n == 0)
{
printf("the other side has been closed.\n");
}
else
{
write(STDOUT_FILENO, buf, n);
}
}
close(sockfd);
return 0;
}