diff options
author | sotech117 <michael_foiani@brown.edu> | 2023-09-18 15:32:05 -0400 |
---|---|---|
committer | sotech117 <michael_foiani@brown.edu> | 2023-09-18 15:32:05 -0400 |
commit | bc24590991cb27e8bd220fd6d0585e76f804601d (patch) | |
tree | 8a982cf0144dd8366aca84cf8027387403f51af2 /snowcast_server_concurrent.c | |
parent | 5236560176cfe8e4d06be4812719037937b7f4dc (diff) |
good progress. basic num_station data going & listener udp port works
Diffstat (limited to 'snowcast_server_concurrent.c')
-rw-r--r-- | snowcast_server_concurrent.c | 181 |
1 files changed, 135 insertions, 46 deletions
diff --git a/snowcast_server_concurrent.c b/snowcast_server_concurrent.c index 903b3fd..e09e398 100644 --- a/snowcast_server_concurrent.c +++ b/snowcast_server_concurrent.c @@ -10,9 +10,14 @@ #include "protocol.h" +#define NUM_STATIONS 2 +#define LINE_MAX 1024 +#define MAX_USERS 1000 +#define MAX_PATH 50 + typedef struct station { - char* filePath; int currentChunk; + char* filePath; } station_t; typedef struct user { @@ -22,9 +27,6 @@ typedef struct user { pthread_t streamThread; } user_t; -#define NUM_STATIONS 2 -#define LINE_MAX 1024 -#define MAX_USERS 1000 /* For safe condition variable usage, must use a boolean predicate and */ /* a mutex with the condition. */ @@ -32,18 +34,18 @@ int count = 0; pthread_cond_t cond = PTHREAD_COND_INITIALIZER; pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; +const char *port; int start_threads = 0; - int max_active_users = 0; pthread_mutex_t mutex_user_data = PTHREAD_MUTEX_INITIALIZER; // array from index to user_data user_t *user_data; -station_t station_data[NUM_STATIONS]; -int sockfd_to_user[MAX_USERS + 4]; +int sockfd_to_user[MAX_USERS]; -char* port = "4950"; +// stations array pointer +station_t *station_data; void *send_udp_packet_routine(void* arg); void *select_thread(void* arg); @@ -62,27 +64,49 @@ void destroy_user(int sockfd); main(int argc, char *argv[]) { - // temporary - station_data[0] = (station_t) {"mp3/Beethoven-SymphonyNo5.mp3", 0}; - station_data[1] = (station_t) {"mp3/DukeEllington-Caravan.mp3", 0}; - // threads to control reading files at chunks while the other threads sleep + // station_data = malloc(sizeof(station_t) * NUM_STATIONS); + // check and assign arguments + if (argc < 3) { + fprintf(stderr,"usage: ./snowcast_server <listen port> <file0> [file 1] [file 2] ... \n"); + exit(1); + } + + port = argv[1]; + + // init stations + size_t totalSize = 0; + // get size to malloc + for (int i = 2; i < argc; i++) + { + printf("file: %s\n", argv[i]); + totalSize += sizeof(int) + strlen(argv[i]); + } + station_data = malloc(totalSize); + // assign the stations + for (int i = 2; i < argc; i++) + { + station_data[i - 2] = (station_t) { 0, argv[i]}; + } + + // print all indexes in station data + for (int i = 0; i < NUM_STATIONS; i++) + { + printf("station %d: %s\n", i, station_data[i].filePath); + } + // make array of user data user_data = malloc(sizeof(user_t) * max_active_users); if (!user_data) { perror("malloc"); return 1; } - // thread that manages file descriptors - pthread_t s_thread, sync_thread; + // make and start "select" thread that manages: + // 1) new connections, 2) requests from current connections, 3)cloing connections + pthread_t s_thread; pthread_create(&s_thread, NULL, select_thread, NULL); - // starts the threads after created - // sleep(1); - // startThreads = 0; - // pthread_cond_broadcast(&cond); - // command line interface char input[LINE_MAX]; - while (1) { + while (1) { char *line = fgets(input, LINE_MAX, stdin); if (line == NULL) { @@ -98,6 +122,7 @@ main(int argc, char *argv[]) } } else if (strncmp("s\n", input, LINE_MAX) == 0) { // start the streaming threads + pthread_t sync_thread; pthread_create(&sync_thread, NULL, synchronization_thread, NULL); } } @@ -107,33 +132,95 @@ main(int argc, char *argv[]) /* Make the manager routine */ void *send_udp_packet_routine(void *arg) { - pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER; + // unpack args + int user_index = (int) arg; + printf("thread : user_index: %d\n", user_index); + // print user data + print_user_data(user_index); + // declare vairables to be used int did_work = 1; + pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER; + int s; + int udp_sockfd; + struct addrinfo thread_hints, *thread_res, *thread_servinfo; + int error_code; + + // TODO: add error checking on these calls*** + + // setup hints + memset(&thread_hints, 0, sizeof thread_hints); + thread_hints.ai_family = AF_INET; // use IPv4 only + thread_hints.ai_socktype = SOCK_DGRAM; + thread_hints.ai_flags = AI_PASSIVE; // fill in my IP for me + + // setup the socket for client listener DATAGRAM (udp) + // cover the port integer to a string + int int_port = user_data[user_index].udpPort; + int length = snprintf( NULL, 0, "%d", int_port ); + char* port = malloc( length + 1 ); + snprintf( port, length + 1, "%d", int_port ); + sprintf(port, "%d", int_port); + + if (error_code = getaddrinfo(NULL, port, &thread_hints, &thread_servinfo) != 0) + { + fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(error_code)); + return 1; + } + free(port); - int * i = (int *) arg; - while (1) + // loop through all the results and make a socket + for(thread_res = thread_servinfo; thread_res != NULL; thread_res = thread_res->ai_next) { + if ((udp_sockfd = socket(thread_res->ai_family, thread_res->ai_socktype, + thread_res->ai_protocol)) == -1) { + perror("talker: socket"); + continue; + } + break; + } + if (udp_sockfd == NULL) { + fprintf(stderr, "talker: failed to create socket\n"); + return (NULL); + } + + // bind(udp_sockfd, thread_res->ai_addr, thread_res->ai_addrlen); + + + // freeaddrinfo(thread_servinfo); + + while (1) { + // wait for + pthread_mutex_lock(&m); + did_work = 0; + while (!start_threads) { - // wait for - pthread_mutex_lock(&m); - did_work = 0; - while (!start_threads) - { - pthread_cond_wait(&cond, &m); - } + pthread_cond_wait(&cond, &m); + } + int station_num = user_data[user_index].stationNum; + if (station_num == -1) { + did_work = 1; + } - if (!did_work) { - printf("send data: thread %d \n", i); - printf("load data: thread %d \n", i); - did_work = 1; + if (!did_work) { + // sendto a random string of data to the user + int station_num = user_data[user_index].stationNum - 1; + char *data = station_data[station_num].filePath; + printf("load data: thread %d \n", user_index); + int numbytes; + if ((numbytes = sendto(udp_sockfd, data, strlen(data), 0, + thread_res->ai_addr, thread_res->ai_addrlen)) == -1) { + perror("talker: sendto"); + return (NULL); } - pthread_mutex_unlock(&m); + printf("send data: thread %d \n", user_index); - usleep(500000); - // pthread_mutex_lock(&mutex); - // start_threads = 0; - // pthread_mutex_unlock(&mutex); + did_work = 1; } + + pthread_mutex_unlock(&m); + + usleep(500000); + } return NULL; } @@ -223,7 +310,7 @@ void *select_thread(void *arg) { // keep track of the biggest file descriptor fdmax = listener; // so far, it's this one - while(1==1) { + while(1) { read_fds = master; // copy it if (select(fdmax+1, &read_fds, NULL, NULL, NULL) == -1) { perror("select"); @@ -253,7 +340,6 @@ void *select_thread(void *arg) { get_in_addr((struct sockaddr*)&remoteaddr), remoteIP, INET6_ADDRSTRLEN), newfd); - // init user with this newfd init_user(newfd); @@ -334,17 +420,20 @@ void *init_user(int sockfd) { if (!more_users) { perror("realloc"); exit(1); } user_data = more_users; } - // map sockfd to this user index & create its stream thread - pthread_t user_thread; - pthread_create(&user_thread, NULL, send_udp_packet_routine, (void *)user_index); - user_data[user_index] = (user_t){-1, -1, sockfd, user_thread}; + // map TCP sockfd to this user index + user_data[user_index] = (user_t){-1, -1, sockfd, -1}; sockfd_to_user[sockfd] = user_index; // free(user_stream_threads); pthread_mutex_unlock(&mutex_user_data); } void *update_user_udpPort(int sockfd, int udpPort) { pthread_mutex_lock(&mutex_user_data); - user_data[sockfd_to_user[sockfd]].udpPort = udpPort; + // get the user + user_t *user = &user_data[sockfd_to_user[sockfd]]; + // set the udpPort + user->udpPort = udpPort; + // start the stream thread, now that we have the udpPort + pthread_create(&user->streamThread, NULL, send_udp_packet_routine, (void *)sockfd_to_user[sockfd]); pthread_mutex_unlock(&mutex_user_data); } void *update_user_station(int sockfd, int stationNum) { |