(转)测试Lighttpd accept的惊群现象

转载自:http://www.javaeye.com/topic/382107

lighttpd里面采用的是prefork的模型,在fork进程之前就已经创建好了listen socket
那么fork了进程池之后,所有进程都有一份自己独立的listen socket fd,

但实际上这些独立的fd对应的却是同一个文件表项,即实际上仍然是一个共享的文件描述符

在阻塞模型中,各进程分别通过accept阻塞,等待连接到达,当一个连接到达时,所有的进程都会被唤醒,但只有其中一个进程可以成功accept该连接,其余的则继续投入睡眠,这就是所谓的惊群现象

lighttpd使用的是非阻塞IO复用模型,测试一下是否会有惊群现象呢?

先把结论给出:

1.比如有20个进程注册了listen socket的请求连接事件,当一个连接到达时,确实会有多个进程被通知有事件要处理(但不是全部,大约只有5、6个进程)
2.被唤醒的这几个进程会调用accept函数,其中只有一个成功返回连接fd,其余进程均返回EAGAIN或者 EWOULDBLOCK错误(因为是非阻塞的)

测试方法,自己写了一个prefork进程 + epoll的非阻塞server,启动20个进程,client telnet,打印服务器日志

try to accept new connection,pid=29879
try to accept new connection,pid=29876
try to accept new connection,pid=29880

process 29879 accept connection

accept EAGAIN error pid=29876
try to accept new connection,pid=29875
accept EAGAIN error pid=29880
accept EAGAIN error pid=29875

四个进程被通知有事件处理,1个成功accept,3个返回EAGAIN

在lighttpd中,当server被通知有连接要处理时,会循环执行accept,直到返回错误,或者超过一个上限值

这样,当海量请求连接到达时,似乎惊群不会带来太多的性能损耗。

/** States a connection record can be in. */
enum conn_states {
    conn_listening = 0, /**< listening socket that waits for new connections */
    conn_read,          /**< reading in a command line */
    conn_write,         /**< writing out a simple response */
    conn_nread,         /**< reading in a fixed number of bytes */
    conn_swallow,       /**< discarding unneeded bytes without storing them */
    conn_closing,       /**< closing this connection */
    conn_mwrite         /**< writing out many items sequentially */
};
/** Per-descriptor connection record handed to the event handler as context. */
typedef struct {
    int fd;     /* descriptor this record was created for */
    int state;  /* current state; callers in this file pass enum conn_states values */
} conn;

#include<stdio.h>
#include<stdlib.h>
#include<string.h>
#include<errno.h>
#include<unistd.h>
#include<fcntl.h>
#include<sys/types.h>
#include<sys/socket.h>
#include<sys/resource.h>
#include<sys/wait.h>
#include<netinet/in.h>
#include<arpa/inet.h>
#include "event.h"
#include "base.h"

/* ---- file-scope state ---- */

static fdevents *ev;     /* event context created in main() via fdevent_init() */
static conn **conns;     /* array backing the free list of conn records */
static int freetotal;    /* number of slots currently allocated in conns */
static int freecurr;     /* number of slots currently occupied in conns */

/* ---- forward declarations ---- */

/* free-list management */
static int conn_init();
static conn *get_conn_from_freelist();
static int add_conn_to_freelist(conn *c);
conn *conn_new(int fd,int state);

/* listening socket setup */
static int create_listen_fd(char *addr,int port);

/**
 * Prepare the free list of conn records.
 *
 * Reserves room for 200 pointers and marks the list as empty. The slots
 * themselves are left uninitialized.
 *
 * @return 0 on success, -1 when the array cannot be allocated.
 */
static int conn_init(){
    freecurr=0;
    freetotal=200;
    conns=(conn **)malloc(sizeof(conn *) * freetotal);
    return conns ? 0 : -1;
}
static conn *get_conn_from_freelist(){
    conn *con;
    if(freecurr > 0){
        con=conns[--freecurr];
        conns[freecurr]=NULL;
        return con;
    }
    return NULL;
}
/**
 * Put a conn record on the free list so a later conn_new() can reuse it.
 *
 * When the backing array is full it is doubled in size first.
 *
 * @param c record to store.
 * @return 0 on success, -1 if the array had to grow and realloc failed
 *         (the list is left unchanged in that case).
 */
static int add_conn_to_freelist(conn *c){
    if(freecurr >= freetotal){
        conn **grown=(conn **)realloc(conns,sizeof(*grown)*2*freetotal);
        if(!grown){
            return -1;
        }
        conns=grown;
        freetotal*=2;
    }

    conns[freecurr]=c;
    freecurr++;
    return 0;
}
/**
 * Obtain a conn record for a descriptor.
 *
 * A record is taken from the free list when one is available; otherwise a
 * new one is allocated with malloc().
 *
 * @param fd    descriptor to store in the record.
 * @param state initial state to store in the record.
 * @return the initialized record, or NULL if the free list is empty and
 *         the allocation fails.
 */
conn *conn_new(int fd,int state){
    conn *c=get_conn_from_freelist();

    if(!c){
        c=malloc(sizeof(*c));
        if(!c){
            /* out of memory: report to the caller instead of dereferencing NULL */
            return NULL;
        }
    }

    c->fd=fd;
    c->state=state;

    return c;
}
/**
 * Create a non-blocking IPv4 TCP socket bound to addr:port and put it into
 * the listening state.
 *
 * @param addr dotted-quad IPv4 address to bind to.
 * @param port TCP port in host byte order (0..65535).
 * @return the listening descriptor, or -1 on any failure. On failure a
 *         message is written to stderr and no descriptor is left open.
 */
static int create_listen_fd(char *addr,int port){
    int fd,val,flags;
    struct sockaddr_in sockaddr;

    if(!addr || port<0 || port>65535){
        fprintf(stderr,"invalid listen address or port\n");
        return -1;
    }

    /* Build and validate the address before any descriptor is created. */
    memset(&sockaddr,0,sizeof(sockaddr));
    sockaddr.sin_family=AF_INET;
    sockaddr.sin_port=htons((unsigned short)port);
    if(inet_pton(AF_INET,addr,&sockaddr.sin_addr)!=1){
        fprintf(stderr,"invalid listen address %s\n",addr);
        return -1;
    }

    fd=socket(AF_INET,SOCK_STREAM,0);
    if(fd==-1){
        fprintf(stderr,"socket()\n");
        return -1;
    }

    val=1;
    if(setsockopt(fd,SOL_SOCKET,SO_REUSEADDR,&val,sizeof(val))<0){
        fprintf(stderr,"reuseaddr\n");
        goto fail;
    }

    /* Parenthesize the assignment: `flags=fcntl(...)<0` would store the
     * comparison result in flags instead of the descriptor's real flags. */
    if((flags=fcntl(fd,F_GETFL,0))<0 || fcntl(fd,F_SETFL,flags | O_NONBLOCK)<0){
        fprintf(stderr,"nonblocking\n");
        goto fail;
    }

    if(bind(fd,(struct sockaddr *)&sockaddr,sizeof(sockaddr))<0){
        fprintf(stderr,"bind error %s",strerror(errno));
        goto fail;
    }
    if(listen(fd,2048)<0){
        fprintf(stderr,"listen %s",strerror(errno));
        goto fail;
    }
    return fd;

fail:
    /* Do not leak the socket when a later setup step fails. */
    close(fd);
    return -1;
}
void event_handler(int fd,void *ctx, int revents){
    struct sockaddr_in addr;
    socklen_t sock_len;
    int done=0,connfd;
    conn *c;
    c=(conn *)ctx;
    while(!done){
        switch(c->state){
            case conn_listening:
                printf("try to accept new connection,pid=%d\n",getpid());
                sock_len=sizeof(addr);
                connfd=accept(fd,(struct sockaddr *)&addr,&sock_len);
                if(connfd>0){
                    printf("process %d accept connection\n",getpid());
                    c = conn_new(connfd,conn_read);
                    fdevent_register(ev,connfd,event_handler,c);
                    fdevent_event_add(ev,connfd,FDEVENT_IN);
                }else{
                        if(errno== EAGAIN || errno == EWOULDBLOCK){
                            printf("accept EAGAIN error pid=%d\n",getpid());
                        }
                        if(errno==EINTR){
                            printf("accept EINTR error pid=%d\n",getpid());
                        }
                        if(errno==ECONNABORTED){ /* this is a FreeBSD thingy */
                            printf("accept EABORTED error pid=%d\n",getpid());
                        }
                        if(errno==EMFILE){
                            printf("accept EMFILE error pid=%d\n",getpid());
                        }
                }
                done=1;
                break;
            case conn_read:
                printf("on read");
                break;
        }
    }
}
/**
 * Entry point of the prefork test server.
 *
 * Options: -l listen address (default 127.0.0.1), -p listen port
 * (required, non-zero), -f number of children to fork (default 5), -h usage.
 *
 * The listening socket is created before forking so every child inherits
 * it. The parent stays in the fork/wait loop and forks a replacement each
 * time wait() reports a terminated child; only children (or the original
 * process when -f is 0 or negative) go on to set up the event layer.
 */
int main(int argc,char **argv){
    int fd,o;
    char *listen_addr=NULL;   /* must start as NULL: tested below when -l is absent */
    int port=0,num_childs=5,max_fds;
    struct rlimit rlim;
    conn *c;

    while(-1!=(o=getopt(argc,argv,"l:p:f:h"))){
        switch(o){
            case 'l':
                free(listen_addr);   /* drop the value of an earlier -l, if any */
                listen_addr=strdup(optarg);
                break;
            case 'p':
                port=atoi(optarg);
                break;
            case 'f':
                num_childs=atoi(optarg);
                break;
            case 'h':
                printf("Usage -l listen addr\n");
                printf("Usage -p listen port \n");
                printf("Usage -f fork num\n");
                exit(1);
            default:
                /* unrecognized options are ignored, as before */
                break;
        }
    }
    if(!listen_addr){
        listen_addr=strdup("127.0.0.1");
    }
    if(!port){
        printf("port is unknown\n");
        exit(1);
    }
    if(0 != getrlimit(RLIMIT_NOFILE,&rlim)){
        fprintf(stderr,"getrlimit failed.reason %s\n",strerror(errno));
        exit(1);
    }

    max_fds=rlim.rlim_cur;

    //create listen socket
    if(-1==(fd=create_listen_fd(listen_addr,port))){
        fprintf(stderr,"create listen fd failed\n");
        exit(1);
    }
    //prefork child
    if(num_childs > 0){
        int child=0;
        while(!child){
            if(num_childs >0){
                switch(fork()){
                    case -1:
                        fprintf(stderr,"fork failed.reason %s\n",strerror(errno));
                        return -1;
                    case 0:
                        child=1;
                        break;
                    default:
                        num_childs--;
                        break;
                }
            }else{
                int status;
                /* a child terminated: bump the counter so a replacement is forked */
                if(-1 !=wait(&status)){
                    num_childs++;
                }else{
                    //ignore
                }
            }
        }
    }
    //child process event
    if(0 != conn_init()){
        fprintf(stderr,"conn_init()\n");
        exit(1);
    }
    c=conn_new(fd,conn_listening);
    if(!c){
        fprintf(stderr,"conn_new()\n");
        exit(1);
    }
    ev=fdevent_init(max_fds);
    if(!ev){
        fprintf(stderr,"fdevent_init()\n");
        exit(1);
    }
    fdevent_register(ev,fd,event_handler,c);
    fdevent_event_add(ev,fd,FDEVENT_IN);
    /* NOTE(review): fdevent_poll is called once with 1000; whether it loops
     * internally or returns after a single poll is not visible here - confirm
     * against event.h. */
    fdevent_poll(ev,1000);
    return 0;
}

Leave a Reply

Your email address will not be published.