add client thread

This commit is contained in:
ubuntu201711081 2020-12-06 09:50:13 +00:00
parent f04d3d86ea
commit 22fd0dfea3
9 changed files with 4079 additions and 46 deletions

View File

@ -1,5 +1,5 @@
CC = gcc
CFLAGS = -lm -Wall -O2
CFLAGS = -lm -Wall -O2 -pthread
ServerBin = server p-server
ClientBin = client p-client
Bin = $(ServerBin) $(ClientBin)
@ -13,7 +13,7 @@ socket_wrapper.o: socket_wrapper.c socket_wrapper.h
client: socket_wrapper.o client.c
$(CC) -o client client.c socket_wrapper.o $(CFLAGS)
server: socket_wrapper.o server.c
$(CC) -o server server.c socket_wrapper.o $(CFLAGS) -pthread
$(CC) -o server server.c socket_wrapper.o $(CFLAGS)
p-server: socket_wrapper.o p-server.c
$(CC) -o p-server p-server.c socket_wrapper.o $(CFLAGS)
p-client: socket_wrapper.o p-client.c

190
client.c
View File

@ -15,16 +15,9 @@
#include <time.h>
#include <sys/time.h>
#include <sys/times.h>
#include <pthread.h>
#include "socket_wrapper.h"
#ifdef USE_SENDFILE
#include <sys/sendfile.h>
#ifndef DEFAULT_SEND_FILE_CHUNK_SIZE
const size_t SEND_FILE_CHUNK_SIZE = 0x100000; /*1MB*/
#else
const size_t SEND_FILE_CHUNK_SIZE = DEFAULT_SEND_FILE_CHUNK_SIZE; /*1MB*/
#endif
#endif
#ifndef DEFAULT_TIMEOUT
static const int TIMEOUT = 5;
#else
@ -35,10 +28,14 @@ static const int PROGRESS_BAR_WIDTH = 30;
#else
static const int PROGRESS_BAR_WIDTH = DEFAULT_PROGRESS_BAR_WIDTH;
#endif
enum{
FILENAME_BUF_SIZE = 1024
};
/*========
*Operation
*========*/
//if success, return zero
int sendReadOp(int sock,const char * filename){
struct ReadOp op;
op.file_url_size = strlen(filename);
@ -169,6 +166,29 @@ int recvData(int sock,const char * filename){
return 0;
}
int SendOpAndReceiveFile(const char * filename, struct sockaddr const * addr){
int sock;
int ret = -1;
fprintf(stdout,"request %s\n",filename);
sock = socket(AF_INET,SOCK_STREAM,0);
if(sock < 0){
perror("sock create fail");
return -1;
}
if(connect(sock,(struct sockaddr *)addr,sizeof(*addr)) < 0){
perror("connect failed");
return -1;
}
if(sendReadOp(sock,filename) == 0){
ret = recvData(sock,filename);
}
close(sock);
return ret;
}
struct benchmark_data{
bool benchmode;
struct timespec begin;
@ -199,7 +219,82 @@ static inline struct timespec timespec_sub(struct timespec a,struct timespec b){
return ret;
}
static char filename_buf[1024];
static size_t thread_number_option = 1;
typedef enum{
From_CharArray,
From_FileStream
} queueing_method_t;
struct SimpleThreadGlobal{
size_t thread_arr_size;
pthread_t * thread_arr;
pthread_mutex_t queueing_mutex;
queueing_method_t method;
const char ** filename_begin;
const char ** filename_end;
} global_state;
typedef struct SimpleThreadReturn{
int retval;
int op_count;
} worker_return_t;
__attribute_malloc__ worker_return_t * create_worker_return(){
return (worker_return_t *)malloc(sizeof(worker_return_t));
}
void destroy_worker_return(worker_return_t * r){
free(r);
}
typedef struct SimpleThreadArg{
struct sockaddr addr;
} worker_arg_t;
__attribute_malloc__ worker_arg_t * create_thread_arg(struct sockaddr * r){
worker_arg_t * ret = (worker_arg_t *)malloc(sizeof(*ret));
memcpy(&ret->addr,r,sizeof(*r));
return ret;
}
void destroy_thread_arg(worker_arg_t * arg){
free(arg);
}
void * WorkerProc(void * args){
worker_arg_t * arg = (worker_arg_t *)(args);
worker_return_t * ret = create_worker_return();
char filename_buf[FILENAME_BUF_SIZE];
const char * filename;
for(;;)
{
pthread_mutex_lock(&global_state.queueing_mutex);
if (global_state.method == From_CharArray){
if (__glibc_unlikely(global_state.filename_begin == global_state.filename_end)) {
pthread_mutex_unlock(&global_state.queueing_mutex);
break;
}
filename = *(global_state.filename_begin++);
}
else{
//unsafe.
int t = fscanf(stdin,"%s",filename_buf);
if (__glibc_unlikely(t != 1)) {
pthread_mutex_unlock(&global_state.queueing_mutex);
break;
}
filename = filename_buf;
}
pthread_mutex_unlock(&global_state.queueing_mutex);
fprintf(stdout,"request %s\n",filename);
ret->retval += SendOpAndReceiveFile(filename,&arg->addr);
ret->op_count++;
}
destroy_thread_arg(arg);
return ret;
}
static bool stdinisatty;
int main(int argc, const char *argv[]){
@ -208,8 +303,9 @@ int main(int argc, const char *argv[]){
const char * server_name;
in_port_t server_port = 0;
int arg_filename_start = 3;
int sock, err;
int err;
int retval = 0;
init_bench_data();
stdinisatty = isatty(STDIN_FILENO);
@ -229,6 +325,18 @@ int main(int argc, const char *argv[]){
arg_filename_start++;
DisplayProgress = false;
}
else if(strcmp("-t",argv[arg_filename_start]) == 0 || strcmp("--thread",argv[arg_filename_start]) == 0){
arg_filename_start++;
if (arg_filename_start >= argc){
fprintf(stderr,"need number");
return -2;
}
thread_number_option = atoi(argv[arg_filename_start++]);
if(thread_number_option == 0){
fprintf(stderr,"not number or zero");
return -2;
}
}
else break;
}
if (server_port == 0){
@ -254,35 +362,43 @@ int main(int argc, const char *argv[]){
if (bench.benchmode){
clock_gettime(bench.clock_id,&bench.begin);
}
for (;;){
if (stdinisatty){
if (arg_filename_start >= argc) break;
filename = argv[arg_filename_start++];
if(thread_number_option == 1){
char filename_buf[FILENAME_BUF_SIZE];
for (;;){
if (stdinisatty){
if (arg_filename_start >= argc) break;
filename = argv[arg_filename_start++];
}
else{
//unsafe.
int t = fscanf(stdin,"%s",filename_buf);
if (t != 1) break;
filename = filename_buf;
}
fprintf(stdout,"request %s\n",filename);
retval += SendOpAndReceiveFile(filename,(struct sockaddr *)&addr);
bench.op_count++;
}
else{
//unsafe.
int t = fscanf(stdin,"%s",filename_buf);
if (t != 1) break;
filename = filename_buf;
}
else{
int i = 0;
global_state.method = stdinisatty ? From_CharArray : From_FileStream;
global_state.filename_begin = &argv[arg_filename_start];
global_state.filename_end = &argv[argc];
pthread_mutex_init(&global_state.queueing_mutex,NULL);
global_state.thread_arr_size = thread_number_option;
global_state.thread_arr = (pthread_t *)malloc(sizeof(*global_state.thread_arr) * global_state.thread_arr_size);
for (i = 0; i < global_state.thread_arr_size; i++){
worker_arg_t * arg = create_thread_arg((struct sockaddr *)&addr);
pthread_create(&global_state.thread_arr[i],NULL,WorkerProc,arg);
}
fprintf(stdout,"request %s\n",filename);
sock = socket(AF_INET,SOCK_STREAM,0);
if(sock < 0){
perror("sock create fail");
return 1;
for (i = 0; i < global_state.thread_arr_size; i++){
worker_return_t * ret;
pthread_join(global_state.thread_arr[i],(void **)&ret);
bench.op_count += ret->op_count;
retval += ret->retval;
destroy_worker_return(ret);
}
if(connect(sock,(struct sockaddr *)&addr,sizeof(addr)) < 0){
perror("connect failed");
return 1;
}
if(sendReadOp(sock,filename) == 0){
int ret = recvData(sock,filename);
retval += ret;
}
close(sock);
bench.op_count++;
}
if (bench.benchmode){
struct timespec result;

View File

@ -1,3 +0,0 @@
#! /bin/bash
cd client_test
./p-slowclient localhost 9091 test.txt & ./p-client localhost 9091 test.txt & ./p-client localhost 9091 test.txt

View File

@ -257,7 +257,7 @@ typedef struct WorkerArgument
} worker_argument_t;
__attribute__((malloc)) worker_argument_t * create_worker_argument(int id, int bufsize){
__attribute_malloc__ worker_argument_t * create_worker_argument(int id, int bufsize){
worker_argument_t * ret = (worker_argument_t *)malloc(sizeof(worker_argument_t));
if (ret == NULL) return ret;
ret->id = id;

16
test.sh
View File

@ -14,13 +14,15 @@ cd testdata
server_pid=$!
sleep 1
cd ../tmp
../client localhost 9091 test.txt
../client localhost 9091 list.txt && \
diff list.txt ../testdata/list.txt
do_test "normal"
../client localhost 9091 notexistfile.txt
do_test "notexistfile"
echo test.txt | ../client localhost 9091
echo list.txt | ../client localhost 9091&& \
diff test.txt ../testdata/test.txt
do_test "pipeinput"
../slowclient localhost 9091 test.txt &
@ -40,6 +42,14 @@ if [ $return_code1 -eq 0 -a $return_code2 -eq 0 -a $return_code3 -eq 0 ];then
else
echo -e "multiconnection test \e[91m[fail\e[0m"
fi
rm *.txt
../client localhost 9091 -t 2 test.txt bootstrap.js react.js lorem.txt && \
diff test.txt ../testdata/test.txt && \
diff bootstrap.js ../testdata/bootstrap.js && \
diff react.js ../testdata/react.js && \
diff lorem.txt ../testdata/lorem.txt
do_test "thread_normal"
rm *
echo turn off server :$server_pid
kill $server_pid

3872
testdata/bootstrap.css vendored Normal file

File diff suppressed because it is too large Load Diff

7
testdata/bootstrap.js vendored Normal file

File diff suppressed because one or more lines are too long

4
testdata/lorem.txt vendored Normal file
View File

@ -0,0 +1,4 @@
Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua.
Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat.
Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur.
Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.

27
testdata/react.js vendored Normal file

File diff suppressed because one or more lines are too long