diff --git a/README.md b/README.md index d3e2622..a5f92e2 100644 --- a/README.md +++ b/README.md @@ -31,6 +31,7 @@ For server - DEFAULT_MAX_THREAD_NUMBER(server only): 10 - DEFAULT_RESPONSE_REQUEST(p-server only): 3(-1 is INF) - USE_TRACE +- USE_NO_QUEUE For client - MUL_CLIENT(second unit) diff --git a/server.c b/server.c index f3ec465..d74fde3 100644 --- a/server.c +++ b/server.c @@ -60,6 +60,8 @@ enum{ MAX_THREAD_NUMBER = DEFAULT_MAX_THREAD_NUMBER #endif }; +#define USE_TRACE +#define USE_NO_QUEUE /*======== *Operation @@ -233,6 +235,13 @@ int parse_args(int argc,const char * argv[] , in_port_t * port){ //============ //Simple Thread Pool //============ +#ifdef USE_TRACE +enum{ + Trace_Timer_ID = CLOCK_PROCESS_CPUTIME_ID +}; +#endif + +#ifndef USE_NO_QUEUE typedef struct SharedState{ //empty if less than 0 queue_struct(int,WORK_QUEUE_SIZE) socks; @@ -244,11 +253,6 @@ typedef struct SharedState{ //int progress[MAX_THREAD_NUMBER]; } shared_state_t; -#ifdef USE_TRACE -enum{ - Trace_Timer_ID = CLOCK_PROCESS_CPUTIME_ID -}; -#endif void init_shared_state(shared_state_t * state) { queue_init(&state->socks); #ifdef USE_TRACE @@ -257,22 +261,32 @@ void init_shared_state(shared_state_t * state) { pthread_mutex_init(&state->sock_mutex,NULL); pthread_cond_init(&state->ready,NULL); } +#endif // typedef struct WorkerArgument { int id; int bufsize; uint8_t * buf; - - //pthread_mutex_t * cond_mutex; - //pthread_cond_t cond; - +#ifdef USE_NO_QUEUE + int csock; +#ifdef USE_TRACE + struct timespec ts; +#endif +#endif } worker_argument_t; -__attribute_malloc__ worker_argument_t * create_worker_argument(int id, int bufsize){ +__attribute_malloc__ worker_argument_t * create_worker_argument(int id, int bufsize +#ifdef USE_NO_QUEUE +, int csock +#endif +){ worker_argument_t * ret = (worker_argument_t *)malloc(sizeof(worker_argument_t)); if (ret == NULL) return ret; ret->id = id; +#ifdef USE_NO_QUEUE + ret->csock = csock; +#endif ret->bufsize = bufsize; ret->buf = (uint8_t *)malloc(sizeof(*ret->buf)*bufsize); if(ret->buf == NULL){ @@ -281,45 +295,76 @@ __attribute_malloc__ worker_argument_t * create_worker_argument(int id, int bufs } return ret; } - +void destory_worker_argument(worker_argument_t * arg){ + free(arg->buf); + free(arg); +} +#ifndef USE_NO_QUEUE static shared_state_t globalState; - void * worker_proc(void * data){ worker_argument_t * args = (worker_argument_t *)data; int fd, csock; -#ifdef USE_TRACE + #ifdef USE_TRACE struct timespec ts,ts_middle,ts_end; -#endif + #endif for(;;){ + pthread_mutex_lock(&globalState.sock_mutex); while (queue_isempty(&globalState.socks)){ pthread_cond_wait(&globalState.ready,&globalState.sock_mutex); } csock = dequeue(&globalState.socks); -#ifdef USE_TRACE + #ifdef USE_TRACE ts = dequeue(&globalState.trace_timer); -#endif + #endif pthread_mutex_unlock(&globalState.sock_mutex); -#ifdef USE_TRACE + #ifdef USE_TRACE clock_gettime(Trace_Timer_ID,&ts_middle); -#endif + #endif if((fd = read_request(csock,args->buf,args->bufsize)) > 0){ send_response(csock,fd,args->buf,args->bufsize); close(fd); } -#ifdef USE_TRACE + #ifdef USE_TRACE clock_gettime(Trace_Timer_ID,&ts_end); struct timespec tophalf = timespec_sub(ts_middle,ts); struct timespec bottomhelf = timespec_sub(ts_end,ts_middle); fprintf(stderr,"top : %ld ns, bottom : %ld ns\n",tophalf.tv_nsec,bottomhelf.tv_nsec); -#endif - + #endif if(close(csock) < 0) perror("csock close error"); } + destory_worker_argument(args); + return NULL; } - static pthread_t worker_threads[MAX_THREAD_NUMBER]; +#else +void * worker_proc(void * data){ + worker_argument_t * args = (worker_argument_t *)data; + int fd, csock; + csock = args->csock; + #ifdef USE_TRACE + struct timespec ts,ts_middle,ts_end; + ts = args->ts; + clock_gettime(Trace_Timer_ID,&ts_middle); + #endif + if((fd = read_request(csock,args->buf,args->bufsize)) > 0){ + send_response(csock,fd,args->buf,args->bufsize); + close(fd); + } + #ifdef USE_TRACE + clock_gettime(Trace_Timer_ID,&ts_end); + struct timespec tophalf = timespec_sub(ts_middle,ts); + struct timespec bottomhelf = timespec_sub(ts_end,ts_middle); + fprintf(stderr,"top : %ld ns, bottom : %ld ns\n",tophalf.tv_nsec,bottomhelf.tv_nsec); + #endif + if(close(csock) < 0) + perror("csock close error"); + destory_worker_argument(args); + return NULL; +} +#endif + static int sock; void safe_exit(){ @@ -331,7 +376,7 @@ int main(int argc, const char *argv[]){ socklen_t client_addr_len = sizeof(client_addr); int csock; int bufsize; - int i; + int i = 0; in_port_t binding_port_number = SERVER_PORT; if (argc > 1){ int d = parse_args(argc,argv,&binding_port_number); @@ -350,8 +395,9 @@ int main(int argc, const char *argv[]){ perror("setsockopt"); } } - init_shared_state(&globalState); bufsize = getBufferSizeFrom(sock); +#ifndef USE_NO_QUEUE + init_shared_state(&globalState); for (i = 0; i < MAX_THREAD_NUMBER; i++) { worker_argument_t * args = create_worker_argument(i,bufsize); if (args == NULL) { @@ -360,7 +406,7 @@ int main(int argc, const char *argv[]){ } pthread_create(&worker_threads[i],NULL,worker_proc,args); } - +#endif addr.sin_addr.s_addr = htonl(INADDR_ANY); /*0.0.0.0 모든 네트워크 인터페이스에 묶임.*/ addr.sin_family = AF_INET; addr.sin_port = htons(binding_port_number); @@ -389,27 +435,37 @@ int main(int argc, const char *argv[]){ struct timespec ts; clock_gettime(Trace_Timer_ID, &ts); #endif +#ifndef USE_NO_QUEUE for(;;){ pthread_mutex_lock(&globalState.sock_mutex); if (queue_isfull(&globalState.socks)){ pthread_mutex_unlock(&globalState.sock_mutex); -#ifdef _GNU_SOURCE + #ifdef _GNU_SOURCE pthread_yield(); -#else + #else usleep(400); -#endif + #endif continue; } else { enqueue(&globalState.socks,csock); -#ifdef USE_TRACE + #ifdef USE_TRACE enqueue(&globalState.trace_timer,ts); -#endif + #endif } break; } pthread_mutex_unlock(&globalState.sock_mutex); pthread_cond_signal(&globalState.ready); +#else + pthread_t thread_a; + worker_argument_t * args = create_worker_argument(i++,bufsize,csock); + #ifdef USE_TRACE + args->ts = ts; + #endif + pthread_create(&thread_a,NULL,worker_proc,args); + pthread_detach(thread_a); +#endif } return 1;