#include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "socket_wrapper.h" #include "simple_circular_buffer.h" #include "timerhelper.h" #include "display_bar.h" #ifndef DEFAULT_MAX_LISTEN_SOCKET static const int MAX_LISTEN_SOCKET = 16; #else static const int MAX_LISTEN_SOCKET = DEFAULT_MAX_LISTEN_SOCKET; #endif #ifdef USE_SENDFILE #include #ifndef DEFAULT_SEND_FILE_CHUNK_SIZE const size_t SEND_FILE_CHUNK_SIZE = 0x100000; /*1MB*/ #else const size_t SEND_FILE_CHUNK_SIZE = DEFAULT_SEND_FILE_CHUNK_SIZE; /*1MB*/ #endif #endif #ifndef DEFAULT_SERVER_PORT static const in_port_t SERVER_PORT = 9091; #else static const in_port_t SERVER_PORT = DEFAULT_SERVER_PORT; #endif #ifndef DEFAULT_MAX_PATH_SIZE /*0 < x < MAX_PATH_SIZE*/ static const uint16_t MAX_PATH_SIZE = 256; #else static const uint16_t MAX_PATH_SIZE = DEFAULT_MAX_PATH_SIZE; #endif #ifndef DEFAULT_TIMEOUT static const int TIMEOUT = 5; #else static const int TIMEOUT = DEFAULT_TIMEOUT; #endif enum{ #ifndef DEFAULT_WORK_QUEUE_SIZE WORK_QUEUE_SIZE = 10, #else WORK_QUEUE_SIZE = DEFAULT_WORK_QUEUE_SIZE, #endif #ifndef DEFAULT_MAX_THREAD_NUMBER MAX_THREAD_NUMBER = 10 #else MAX_THREAD_NUMBER = DEFAULT_MAX_THREAD_NUMBER #endif }; #define USE_TRACE /*======== *Operation *========*/ /** * send user error message * 80 character limit * thread safe */ int send_fail(int sock,const char * msg){ struct TransferResult res; res.res = RES_USR_ERR; res.file_size = 0; res.err_number = 0; res.error_msg_size = strlen(msg); //os will be combining if tcp_autocorking emabled. if(send(sock,&res,sizeof(res),0) < 0){ myd_perror("error msg send"); return -1; } if (send(sock,msg,res.error_msg_size,0) < 0){ myd_perror("error msg send"); return -1; } return 0; } /** * send errno to client * thread safe */ int send_errno(int sock){ struct TransferResult r; r.res = RES_ERR; r.err_number = errno; r.file_size = 0; r.error_msg_size = 0; if(send(sock,&r,sizeof(r),0)){ myd_perror("errno send"); return -1; } return 0; } /** * return fd, if success. otherwise, return -1. * thread safe */ int read_request(int sock,uint8_t * buf,size_t bufsize){ struct ReadOp p; int fd; ssize_t n = recv_until_byte(sock,&p,sizeof(p),TIMEOUT); if (n < 0){ if (n == -2) fprintf(stderr,"timeout!"); else myd_perror("receive fail"); return -1; } if(bufsize <= ((size_t)p.file_url_size) + sizeof(p) + 1){ send_fail(sock,"buffer overflow"); return -1; } else if(p.file_url_size + 1 > MAX_PATH_SIZE){ send_fail(sock,"max path fail"); return -1; } else if(p.file_url_size == 0){ send_fail(sock,"filename zero fail"); return -1; } n = recv_until_byte(sock,buf,p.file_url_size,TIMEOUT); buf[p.file_url_size] = '\0'; //truncate if (isatty_file(stdout)){ lock_scrolled(); add_scrolled_unlocked(1); fprintf(stdout,"str size: %d, request %s\n",p.file_url_size,buf); unlock_scrolled(); } else fprintf(stdout,"str size: %d, request %s\n",p.file_url_size,buf); if(strchr((char *)buf,'/') != NULL){ send_fail(sock,"Illegal character /"); return -1; } fd = open((char *)buf,O_RDONLY); if(fd < 0){ send_errno(sock); close(fd); return -1; } return fd; } /** * send response to client * thread safe */ int send_response(int sock,int fd, uint8_t * buf, size_t bufsize){ struct TransferResult r; struct stat st; off_t offset = 0; ssize_t readed = 0; progress_bar_t pbar; r.res = RES_OK; r.err_number = 0; r.error_msg_size = 0; if(fstat(fd,&st) < 0){ return send_errno(sock); } if(S_ISDIR(st.st_mode)){ return send_fail(sock,"is a directory"); } r.file_size = st.st_size; if(send(sock,&r,sizeof(r),0)<0){ myd_perror("send fail"); return -1; } init_progress_bar(&pbar,10); #ifdef USE_SENDFILE while (r.file_size != offset) { size_t count = SEND_FILE_CHUNK_SIZE < (r.file_size - offset) ? SEND_FILE_CHUNK_SIZE : (r.file_size - offset); if((readed = sendfile(sock,fd,&offset,count)) < 0){ myd_perror("send file fail"); return -1; } DisplayProgressBar(&pbar,offset,r.file_size,"",false); } #else while (offset < r.file_size) { readed = bufsize < (r.file_size - offset) ? bufsize : r.file_size - offset; if(read(fd,buf,readed)<0){ myd_perror("send response read fail"); return -1; } if(send(sock,buf,readed,0)<0){ myd_perror("send response send fail"); return -1; } offset += readed; usleep(500000); DisplayProgressBar(&pbar,offset,r.file_size,"",false); } DisplayProgressBar(&pbar,offset,r.file_size,"",true); #endif return 0; } const char * help(const char * n){ const char * msg = "USASE : %s [Option] ...\n" "Options and arguments: \n" "-p port\t:set to port binding. couldn't set to 0\n" "-h\t:print help message.\n"; printf(msg,n); return msg; } /** return 0 ok. otherwise invalid format*/ int parse_args(int argc,const char * argv[] , in_port_t * port){ int pos = 1; const char * opt; while (pos < argc) { opt = argv[pos++]; if (strcmp(opt,"-h") == 0 || strcmp(opt,"--help") == 0) { help(argv[0]); return 0; } else if(strcmp(opt,"-p") == 0 || strcmp(opt,"--port") == 0){ if (pos < argc){ const char * value = argv[pos++]; *port = atoi(value); if (port == 0){ // either not number or zero fprintf(stderr,"argument is either not number or zero\n"); return 2; } } else{ fprintf(stderr,"need argument\n"); return 2; //failed to find argument. } } } return 0; } //============ //Simple Thread Pool //============ #ifdef USE_TRACE enum{ Trace_Timer_ID = CLOCK_REALTIME }; #endif #ifndef USE_NO_QUEUE typedef struct SharedState{ //empty if less than 0 queue_struct(int,WORK_QUEUE_SIZE) socks; #ifdef USE_TRACE queue_struct(struct timespec,WORK_QUEUE_SIZE) trace_timer; #endif pthread_mutex_t sock_mutex; pthread_cond_t ready; //int progress[MAX_THREAD_NUMBER]; } shared_state_t; void init_shared_state(shared_state_t * state) { queue_init(&state->socks); #ifdef USE_TRACE queue_init(&state->trace_timer); #endif pthread_mutex_init(&state->sock_mutex,NULL); pthread_cond_init(&state->ready,NULL); } #endif // typedef struct WorkerArgument { int id; int bufsize; uint8_t * buf; #ifdef USE_NO_QUEUE int csock; #ifdef USE_TRACE struct timespec ts; #endif #endif } worker_argument_t; __attribute_malloc__ worker_argument_t * create_worker_argument(int id, int bufsize #ifdef USE_NO_QUEUE , int csock #endif ){ worker_argument_t * ret = (worker_argument_t *)malloc(sizeof(worker_argument_t)); if (ret == NULL) return ret; ret->id = id; #ifdef USE_NO_QUEUE ret->csock = csock; #endif ret->bufsize = bufsize; ret->buf = (uint8_t *)malloc(sizeof(*ret->buf)*bufsize); if(ret->buf == NULL){ free(ret); ret = NULL; } return ret; } void destory_worker_argument(worker_argument_t * arg){ free(arg->buf); free(arg); } #ifndef USE_NO_QUEUE static shared_state_t globalState; void * worker_proc(void * data){ worker_argument_t * args = (worker_argument_t *)data; int fd, csock; #ifdef USE_TRACE struct timespec ts,ts_middle,ts_end; #endif for(;;){ pthread_mutex_lock(&globalState.sock_mutex); while (queue_isempty(&globalState.socks)){ pthread_cond_wait(&globalState.ready,&globalState.sock_mutex); } csock = dequeue(&globalState.socks); #ifdef USE_TRACE ts = dequeue(&globalState.trace_timer); #endif pthread_mutex_unlock(&globalState.sock_mutex); #ifdef USE_TRACE clock_gettime(Trace_Timer_ID,&ts_middle); #endif if((fd = read_request(csock,args->buf,args->bufsize)) > 0){ send_response(csock,fd,args->buf,args->bufsize); close(fd); } #ifdef USE_TRACE clock_gettime(Trace_Timer_ID,&ts_end); struct timespec tophalf = timespec_sub(ts_middle,ts); struct timespec bottomhelf = timespec_sub(ts_end,ts_middle); if(isatty_file(stderr)){ lock_scrolled(); add_scrolled_unlocked(1); fprintf(stderr,"top : %ld ns, bottom : %ld ns\n",tophalf.tv_nsec,bottomhelf.tv_nsec); unlock_scrolled(); } #endif if(close(csock) < 0) myd_perror("csock close error"); } destory_worker_argument(args); return NULL; } static pthread_t worker_threads[MAX_THREAD_NUMBER]; #else void * worker_proc(void * data){ worker_argument_t * args = (worker_argument_t *)data; int fd, csock; csock = args->csock; #ifdef USE_TRACE struct timespec ts,ts_middle,ts_end; ts = args->ts; clock_gettime(Trace_Timer_ID,&ts_middle); #endif if((fd = read_request(csock,args->buf,args->bufsize)) > 0){ send_response(csock,fd,args->buf,args->bufsize); close(fd); } #ifdef USE_TRACE clock_gettime(Trace_Timer_ID,&ts_end); struct timespec tophalf = timespec_sub(ts_middle,ts); struct timespec bottomhelf = timespec_sub(ts_end,ts_middle); if(isatty_file(stderr)){ lock_scrolled(); add_scrolled_unlocked(1); fprintf(stderr,"top : %ld ns, bottom : %ld ns\n",tophalf.tv_nsec,bottomhelf.tv_nsec); unlock_scrolled(); } else fprintf(stderr,"top : %ld ns, bottom : %ld ns\n",tophalf.tv_nsec,bottomhelf.tv_nsec); #endif if(close(csock) < 0) myd_perror("csock close error"); destory_worker_argument(args); return NULL; } #endif static int sock; void safe_exit(){ close(sock); } int main(int argc, const char *argv[]){ struct sockaddr_in addr; struct sockaddr_in client_addr; socklen_t client_addr_len = sizeof(client_addr); int csock; int bufsize; int i = 0; in_port_t binding_port_number = SERVER_PORT; if (argc > 1){ int d = parse_args(argc,argv,&binding_port_number); if(d != 0 ) return d; } ready_progress_bar(); sock = socket(AF_INET,SOCK_STREAM,0); atexit(safe_exit); if(sock < 0){ myd_perror("sock create fail"); return 1; } else { int option = 1; if(setsockopt(sock,SOL_SOCKET,SO_REUSEADDR,&option,sizeof(option)) < 0){ myd_perror("setsockopt"); } } bufsize = getBufferSizeFrom(sock); #ifndef USE_NO_QUEUE init_shared_state(&globalState); for (i = 0; i < MAX_THREAD_NUMBER; i++) { worker_argument_t * args = create_worker_argument(i,bufsize); if (args == NULL) { fprintf(stderr,"malloc: lack of memory"); return 1; } pthread_create(&worker_threads[i],NULL,worker_proc,args); } #endif addr.sin_addr.s_addr = htonl(INADDR_ANY); /*0.0.0.0 모든 네트워크 인터페이스에 묶임.*/ addr.sin_family = AF_INET; addr.sin_port = htons(binding_port_number); if(bind(sock, (struct sockaddr *)&addr,sizeof(addr)) < 0){ myd_perror("bind failed"); return 1; } else { char ip_buf[INET_ADDRSTRLEN]; const char * msg = inet_ntop(AF_INET,&addr.sin_addr,ip_buf,sizeof(ip_buf)); assert(msg != NULL); if(isatty_file(stdout)){ lock_scrolled(); add_scrolled_unlocked(1); fprintf(stdout,"server bind on %s:%d\n",msg ,binding_port_number); unlock_scrolled(); } else fprintf(stdout,"server bind on %s:%d\n",msg ,binding_port_number); } if(listen(sock,MAX_LISTEN_SOCKET) < 0){ myd_perror("listen failed"); return 1; } while ((csock = accept(sock, (struct sockaddr *)&client_addr,&client_addr_len)) >= 0) { char ip_buf[INET_ADDRSTRLEN]; const char * msg = inet_ntop(AF_INET,&client_addr.sin_addr,ip_buf,sizeof(ip_buf)); if(isatty_file(stdout)){ lock_scrolled(); add_scrolled_unlocked(1); fprintf(stdout,"Connected on : %s:%d\n",msg == NULL ? "(null)" : msg , ntohs(addr.sin_port)); unlock_scrolled(); } else fprintf(stdout,"Connected on : %s:%d\n",msg == NULL ? "(null)" : msg , ntohs(addr.sin_port)); #ifdef USE_TRACE struct timespec ts; clock_gettime(Trace_Timer_ID, &ts); #endif #ifndef USE_NO_QUEUE for(;;){ pthread_mutex_lock(&globalState.sock_mutex); if (queue_isfull(&globalState.socks)){ pthread_mutex_unlock(&globalState.sock_mutex); #ifdef _GNU_SOURCE pthread_yield(); #else usleep(400); #endif continue; } else { enqueue(&globalState.socks,csock); #ifdef USE_TRACE enqueue(&globalState.trace_timer,ts); #endif } break; } pthread_mutex_unlock(&globalState.sock_mutex); pthread_cond_signal(&globalState.ready); #else pthread_t thread_a; worker_argument_t * args = create_worker_argument(i++,bufsize,csock); #ifdef USE_TRACE args->ts = ts; #endif pthread_create(&thread_a,NULL,worker_proc,args); pthread_detach(thread_a); #endif } return 1; }