From bc24590991cb27e8bd220fd6d0585e76f804601d Mon Sep 17 00:00:00 2001 From: sotech117 Date: Mon, 18 Sep 2023 15:32:05 -0400 Subject: good progress. basic num_station data going & listener udp port works --- Makefile | 4 +- c | Bin 0 -> 34446 bytes l | Bin 0 -> 34254 bytes listener.c | 50 +++++++----- s | Bin 0 -> 52974 bytes snowcast_server_concurrent.c | 181 ++++++++++++++++++++++++++++++++----------- test | Bin 52753 -> 52753 bytes 7 files changed, 169 insertions(+), 66 deletions(-) create mode 100755 c create mode 100755 l create mode 100755 s diff --git a/Makefile b/Makefile index 1603f38..c47fa52 100644 --- a/Makefile +++ b/Makefile @@ -13,4 +13,6 @@ client: client.c $(CC) $(CFLAGS) -o snowcast_control client.c new: - $(CC) $(CFLAGS) -o test snowcast_server_concurrent.c \ No newline at end of file + $(CC) $(CFLAGS) -o s snowcast_server_concurrent.c + $(CC) $(CFLAGS) -o l listener.c + $(CC) $(CFLAGS) -o c client.c \ No newline at end of file diff --git a/c b/c new file mode 100755 index 0000000..fdfd8d6 Binary files /dev/null and b/c differ diff --git a/l b/l new file mode 100755 index 0000000..6eaa41d Binary files /dev/null and b/l differ diff --git a/listener.c b/listener.c index 723cb1b..2d46307 100644 --- a/listener.c +++ b/listener.c @@ -13,9 +13,9 @@ #include #include -#define MYPORT "4950" // the port users will be connecting to +// #define MYPORT "4950" // the port users will be connecting to -#define MAXBUFLEN 100 +#define MAXBUFLEN 16384 // get sockaddr, IPv4 or IPv6: void *get_in_addr(struct sockaddr *sa) @@ -27,7 +27,7 @@ void *get_in_addr(struct sockaddr *sa) return &(((struct sockaddr_in6*)sa)->sin6_addr); } -int main(void) +int main(int argc, char *argv[]) { int sockfd; struct addrinfo hints, *servinfo, *p; @@ -38,12 +38,20 @@ int main(void) socklen_t addr_len; char s[INET6_ADDRSTRLEN]; + if (argc != 2) { + fprintf(stderr,"\n"); + exit(1); + } + + const char* udp_port = argv[1]; + + memset(&hints, 0, sizeof hints); - hints.ai_family = AF_INET6; // set to AF_INET to use IPv4 + hints.ai_family = AF_INET; // set to AF_INET to use IPv4 hints.ai_socktype = SOCK_DGRAM; hints.ai_flags = AI_PASSIVE; // use my IP - if ((rv = getaddrinfo(NULL, MYPORT, &hints, &servinfo)) != 0) { + if ((rv = getaddrinfo(NULL, udp_port, &hints, &servinfo)) != 0) { fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(rv)); return 1; } @@ -72,22 +80,26 @@ int main(void) freeaddrinfo(servinfo); - printf("listener: waiting to recvfrom...\n"); + int count = 0; - addr_len = sizeof their_addr; - if ((numbytes = recvfrom(sockfd, buf, MAXBUFLEN-1 , 0, - (struct sockaddr *)&their_addr, &addr_len)) == -1) { - perror("recvfrom"); - exit(1); - } + while(1) { + printf("\nlistener: waiting to recvfrom... %d times\n", count++); - printf("listener: got packet from %s\n", - inet_ntop(their_addr.ss_family, - get_in_addr((struct sockaddr *)&their_addr), - s, sizeof s)); - printf("listener: packet is %d bytes long\n", numbytes); - buf[numbytes] = '\0'; - printf("listener: packet contains \"%s\"\n", buf); + addr_len = sizeof their_addr; + if ((numbytes = recvfrom(sockfd, buf, MAXBUFLEN , 0, + (struct sockaddr *)&their_addr, &addr_len)) == -1) { + perror("recvfrom"); + exit(1); + } + + printf("listener: got packet from %s\n", + inet_ntop(their_addr.ss_family, + get_in_addr((struct sockaddr *)&their_addr), + s, sizeof s)); + printf("listener: packet is %d bytes long\n", numbytes); + buf[numbytes] = '\0'; + printf("listener: packet contains \"%s\"\n", buf); + } close(sockfd); diff --git a/s b/s new file mode 100755 index 0000000..05d1ffc Binary files /dev/null and b/s differ diff --git a/snowcast_server_concurrent.c b/snowcast_server_concurrent.c index 903b3fd..e09e398 100644 --- a/snowcast_server_concurrent.c +++ b/snowcast_server_concurrent.c @@ -10,9 +10,14 @@ #include "protocol.h" +#define NUM_STATIONS 2 +#define LINE_MAX 1024 +#define MAX_USERS 1000 +#define MAX_PATH 50 + typedef struct station { - char* filePath; int currentChunk; + char* filePath; } station_t; typedef struct user { @@ -22,9 +27,6 @@ typedef struct user { pthread_t streamThread; } user_t; -#define NUM_STATIONS 2 -#define LINE_MAX 1024 -#define MAX_USERS 1000 /* For safe condition variable usage, must use a boolean predicate and */ /* a mutex with the condition. */ @@ -32,18 +34,18 @@ int count = 0; pthread_cond_t cond = PTHREAD_COND_INITIALIZER; pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; +const char *port; int start_threads = 0; - int max_active_users = 0; pthread_mutex_t mutex_user_data = PTHREAD_MUTEX_INITIALIZER; // array from index to user_data user_t *user_data; -station_t station_data[NUM_STATIONS]; -int sockfd_to_user[MAX_USERS + 4]; +int sockfd_to_user[MAX_USERS]; -char* port = "4950"; +// stations array pointer +station_t *station_data; void *send_udp_packet_routine(void* arg); void *select_thread(void* arg); @@ -62,27 +64,49 @@ void destroy_user(int sockfd); main(int argc, char *argv[]) { - // temporary - station_data[0] = (station_t) {"mp3/Beethoven-SymphonyNo5.mp3", 0}; - station_data[1] = (station_t) {"mp3/DukeEllington-Caravan.mp3", 0}; - // threads to control reading files at chunks while the other threads sleep + // station_data = malloc(sizeof(station_t) * NUM_STATIONS); + // check and assign arguments + if (argc < 3) { + fprintf(stderr,"usage: ./snowcast_server [file 1] [file 2] ... \n"); + exit(1); + } + + port = argv[1]; + + // init stations + size_t totalSize = 0; + // get size to malloc + for (int i = 2; i < argc; i++) + { + printf("file: %s\n", argv[i]); + totalSize += sizeof(int) + strlen(argv[i]); + } + station_data = malloc(totalSize); + // assign the stations + for (int i = 2; i < argc; i++) + { + station_data[i - 2] = (station_t) { 0, argv[i]}; + } + + // print all indexes in station data + for (int i = 0; i < NUM_STATIONS; i++) + { + printf("station %d: %s\n", i, station_data[i].filePath); + } + // make array of user data user_data = malloc(sizeof(user_t) * max_active_users); if (!user_data) { perror("malloc"); return 1; } - // thread that manages file descriptors - pthread_t s_thread, sync_thread; + // make and start "select" thread that manages: + // 1) new connections, 2) requests from current connections, 3)cloing connections + pthread_t s_thread; pthread_create(&s_thread, NULL, select_thread, NULL); - // starts the threads after created - // sleep(1); - // startThreads = 0; - // pthread_cond_broadcast(&cond); - // command line interface char input[LINE_MAX]; - while (1) { + while (1) { char *line = fgets(input, LINE_MAX, stdin); if (line == NULL) { @@ -98,6 +122,7 @@ main(int argc, char *argv[]) } } else if (strncmp("s\n", input, LINE_MAX) == 0) { // start the streaming threads + pthread_t sync_thread; pthread_create(&sync_thread, NULL, synchronization_thread, NULL); } } @@ -107,33 +132,95 @@ main(int argc, char *argv[]) /* Make the manager routine */ void *send_udp_packet_routine(void *arg) { - pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER; + // unpack args + int user_index = (int) arg; + printf("thread : user_index: %d\n", user_index); + // print user data + print_user_data(user_index); + // declare vairables to be used int did_work = 1; + pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER; + int s; + int udp_sockfd; + struct addrinfo thread_hints, *thread_res, *thread_servinfo; + int error_code; + + // TODO: add error checking on these calls*** + + // setup hints + memset(&thread_hints, 0, sizeof thread_hints); + thread_hints.ai_family = AF_INET; // use IPv4 only + thread_hints.ai_socktype = SOCK_DGRAM; + thread_hints.ai_flags = AI_PASSIVE; // fill in my IP for me + + // setup the socket for client listener DATAGRAM (udp) + // cover the port integer to a string + int int_port = user_data[user_index].udpPort; + int length = snprintf( NULL, 0, "%d", int_port ); + char* port = malloc( length + 1 ); + snprintf( port, length + 1, "%d", int_port ); + sprintf(port, "%d", int_port); + + if (error_code = getaddrinfo(NULL, port, &thread_hints, &thread_servinfo) != 0) + { + fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(error_code)); + return 1; + } + free(port); - int * i = (int *) arg; - while (1) + // loop through all the results and make a socket + for(thread_res = thread_servinfo; thread_res != NULL; thread_res = thread_res->ai_next) { + if ((udp_sockfd = socket(thread_res->ai_family, thread_res->ai_socktype, + thread_res->ai_protocol)) == -1) { + perror("talker: socket"); + continue; + } + break; + } + if (udp_sockfd == NULL) { + fprintf(stderr, "talker: failed to create socket\n"); + return (NULL); + } + + // bind(udp_sockfd, thread_res->ai_addr, thread_res->ai_addrlen); + + + // freeaddrinfo(thread_servinfo); + + while (1) { + // wait for + pthread_mutex_lock(&m); + did_work = 0; + while (!start_threads) { - // wait for - pthread_mutex_lock(&m); - did_work = 0; - while (!start_threads) - { - pthread_cond_wait(&cond, &m); - } + pthread_cond_wait(&cond, &m); + } + int station_num = user_data[user_index].stationNum; + if (station_num == -1) { + did_work = 1; + } - if (!did_work) { - printf("send data: thread %d \n", i); - printf("load data: thread %d \n", i); - did_work = 1; + if (!did_work) { + // sendto a random string of data to the user + int station_num = user_data[user_index].stationNum - 1; + char *data = station_data[station_num].filePath; + printf("load data: thread %d \n", user_index); + int numbytes; + if ((numbytes = sendto(udp_sockfd, data, strlen(data), 0, + thread_res->ai_addr, thread_res->ai_addrlen)) == -1) { + perror("talker: sendto"); + return (NULL); } - pthread_mutex_unlock(&m); + printf("send data: thread %d \n", user_index); - usleep(500000); - // pthread_mutex_lock(&mutex); - // start_threads = 0; - // pthread_mutex_unlock(&mutex); + did_work = 1; } + + pthread_mutex_unlock(&m); + + usleep(500000); + } return NULL; } @@ -223,7 +310,7 @@ void *select_thread(void *arg) { // keep track of the biggest file descriptor fdmax = listener; // so far, it's this one - while(1==1) { + while(1) { read_fds = master; // copy it if (select(fdmax+1, &read_fds, NULL, NULL, NULL) == -1) { perror("select"); @@ -253,7 +340,6 @@ void *select_thread(void *arg) { get_in_addr((struct sockaddr*)&remoteaddr), remoteIP, INET6_ADDRSTRLEN), newfd); - // init user with this newfd init_user(newfd); @@ -334,17 +420,20 @@ void *init_user(int sockfd) { if (!more_users) { perror("realloc"); exit(1); } user_data = more_users; } - // map sockfd to this user index & create its stream thread - pthread_t user_thread; - pthread_create(&user_thread, NULL, send_udp_packet_routine, (void *)user_index); - user_data[user_index] = (user_t){-1, -1, sockfd, user_thread}; + // map TCP sockfd to this user index + user_data[user_index] = (user_t){-1, -1, sockfd, -1}; sockfd_to_user[sockfd] = user_index; // free(user_stream_threads); pthread_mutex_unlock(&mutex_user_data); } void *update_user_udpPort(int sockfd, int udpPort) { pthread_mutex_lock(&mutex_user_data); - user_data[sockfd_to_user[sockfd]].udpPort = udpPort; + // get the user + user_t *user = &user_data[sockfd_to_user[sockfd]]; + // set the udpPort + user->udpPort = udpPort; + // start the stream thread, now that we have the udpPort + pthread_create(&user->streamThread, NULL, send_udp_packet_routine, (void *)sockfd_to_user[sockfd]); pthread_mutex_unlock(&mutex_user_data); } void *update_user_station(int sockfd, int stationNum) { diff --git a/test b/test index 5c87dd6..a576c5e 100755 Binary files a/test and b/test differ -- cgit v1.2.3-70-g09d2