diff options
author | sotech117 <michael_foiani@brown.edu> | 2023-09-25 13:11:59 -0400 |
---|---|---|
committer | sotech117 <michael_foiani@brown.edu> | 2023-09-25 13:11:59 -0400 |
commit | 150ff61c978b7ae381d1a71953c92bf4dd5c3571 (patch) | |
tree | f24a4cf4ebbf312ca15bbd04d77386c8d2ea1329 | |
parent | 0e34f9ce017d0a9afc2b2354854a0e76019a86ec (diff) |
big refactoring code. some issues with udp stream now, though. will need to fix up
-rw-r--r-- | client.c | 10 | ||||
-rw-r--r-- | protocol.h | 10 | ||||
-rw-r--r-- | server.c | 410 | ||||
-rwxr-xr-x | snowcast_control | bin | 35277 -> 18848 bytes | |||
-rwxr-xr-x | snowcast_listener | bin | 34270 -> 13656 bytes | |||
-rwxr-xr-x | snowcast_server | bin | 71644 -> 37800 bytes |
6 files changed, 234 insertions, 196 deletions
@@ -21,7 +21,7 @@ void *command_line_routine(void *args); -int handle_handshake(int sockfd, char* udpPort); +u_int16_t handle_handshake(int sockfd, char* udpPort); int station_is_set = 0; int l = 0; @@ -272,7 +272,7 @@ void *command_line_routine(void* args) { // convert input to an int int inputInt = atoi(input); if (input[0] != '0' && inputInt == 0) { - printf("unknown command: %s\n", input); + printf("unknown command: %si", input); printf("snowcast_control> "); fflush(stdout); continue; @@ -300,8 +300,8 @@ void *command_line_routine(void* args) { return (NULL); } -int handle_handshake(int sockfd, char* udp_port) { - apply_timeout(sockfd); +uint16_t handle_handshake(int sockfd, char* udp_port) { + apply_timeout(sockfd); // apply timeout for handshake if (l) printf("found server, sending HELLO command. waiting for ANNOUNCE reply.\n"); @@ -333,7 +333,7 @@ int handle_handshake(int sockfd, char* udp_port) { if (l) printf("received ANNOUNCE reply.\n"); + // remove timeout since we no longer are "waiting" for an immediate reply remove_timeout(sockfd); - return ntohs(num_stations); }
\ No newline at end of file @@ -1,5 +1,15 @@ #include <stdint.h> // Provides uint8_t, int8_t, etc. +#define HELLO 0 +#define SETSTATION 1 +#define WELCOME 2 +#define ANNOUNCE 3 +#define INVALID 4 + +#define LISTSTATIONS 5 +#define STATIONINFO 6 + + // client to server messages (commands) struct Hello { @@ -73,19 +73,17 @@ struct udp_packet_routine_args { }; void *send_udp_packet_routine(void* arg); -void *select_routine(void* arg); -void *sync_routine(void* arg); +void setup_listener(); +void *select_routine(void *arg); void *send_announce_routine(void* arg); -void init_station(int station_num, const char *station_name); - int parse(char buffer[LINE_MAX], char *tokens[LINE_MAX / 2]); void *print_info_routine(void *arg); void *get_in_addr(struct sockaddr *sa); -void *init_user(int sockfd); -void *update_user_udpPort(int sockfd, int udpPort); +void *init_user_routine(int sockfd, int udp_port); +// void *update_user_udpPort(int sockfd, int udpPort); void *update_user_station(int sockfd, int stationNum); void *print_user_data(int sockfd); void *print_station_data(int station); @@ -95,7 +93,7 @@ void send_announce_reply(int fd, int station_num); void send_invalid_reply(int fd, size_t message_size, char* message); void *send_stationsinfo_reply(void *arg); int send_welcome_reply(int fd); -void handle_setstation_command(int fd); +int handle_setstation_command(int fd); void add_station(char *station_name); void destroy_station(int station_num); @@ -131,6 +129,9 @@ main(int argc, char *argv[]) sockfd_to_user = malloc(sizeof(int) * max_active_users); if (!sockfd_to_user) { perror("malloc sockfd to user"); return 1; } + // setup the listener + setup_listener(); + // make and start "select" thread that manages: // 1) new connections, 2) requests from current connections, 3) closing connections pthread_t select_thread; @@ -162,7 +163,6 @@ main(int argc, char *argv[]) // if q, shutdown! if (!strcmp(command, "q")) { printf("Exiting.\n"); - // TODO: exit better than break cleanup_fds(); break; } @@ -406,63 +406,81 @@ void udp_port_cleanup_handler(void *arg) close(sockfd); } -/* Make the manager routine */ -void *send_udp_packet_routine(void *arg) { - // printf("send udp packet routine\n"); - int *buf = arg; - // unpack args - int user_index = buf[0]; - int buffer_size = buf[1]; - char *file_buffer = malloc(buffer_size); - memcpy(file_buffer, buf+2, buffer_size); - - // printf("udp packet routine, user:%d\n size: %d\n", user_index, buffer_size); - - // declare vairables to be used - int did_work = 1; - pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER; - int s; - int udp_sockfd; +int setup_udp_connection(int udp_port, int *udp_sockfd, struct addrinfo* res) { + // setup hints to get udp_port struct addrinfo thread_hints, *thread_res, *thread_servinfo; - int error_code; - - // setup hints memset(&thread_hints, 0, sizeof thread_hints); thread_hints.ai_family = AF_INET; // use IPv4 only - thread_hints.ai_socktype = SOCK_DGRAM; + thread_hints.ai_socktype = SOCK_DGRAM; // udp thread_hints.ai_flags = AI_PASSIVE; // fill in my IP for me - int int_port = user_data[user_index].udpPort; - int length = snprintf( NULL, 0, "%d", int_port ); + // convert port uint16 into a string for getaddrinfo + int length = snprintf( NULL, 0, "%d", udp_port ); char* port = malloc( length + 1 ); if (!port) { perror("malloc on port"); return (NULL); } - snprintf(port, length + 1, "%d", int_port); - sprintf(port, "%d", int_port); + snprintf(port, length + 1, "%d", udp_port); + sprintf(port, "%d", udp_port); + // resolve host + int error_code; if (error_code = getaddrinfo(NULL, port, &thread_hints, &thread_servinfo) != 0) { fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(error_code)); - return (NULL); + return 0; } free(port); - // loop through all the results and make a socket + // make the socket + int sock; for(thread_res = thread_servinfo; thread_res != NULL; thread_res = thread_res->ai_next) { - if ((udp_sockfd = socket(thread_res->ai_family, thread_res->ai_socktype, + if ((sock = socket(thread_res->ai_family, thread_res->ai_socktype, thread_res->ai_protocol)) == -1) { - perror("talker: socket"); + perror("socket in setup_udp_connection"); continue; } break; } - if (udp_sockfd == NULL) { - fprintf(stderr, "talker: failed to create socket\n"); - return (NULL); + if (sock == NULL) { + fprintf(stderr, "socket: failed to create udp socket(setup_udp_connection)\n"); + return 0; } + + *udp_sockfd = sock; + // only these two properties of addrinfo are needed to send udp + *res->ai_addr = *thread_res->ai_addr; + res->ai_addrlen = thread_res->ai_addrlen; + return 1; +} + +/* Make the manager routine */ +void *send_udp_packet_routine(void *arg) { + // printf("send udp packet routine\n"); + int *buf = arg; + // unpack args + int user_index = buf[0]; + int buffer_size = buf[1]; + char *file_buffer = malloc(buffer_size); + memcpy(file_buffer, buf+2, buffer_size); + + // setup variables + int did_work = 1; + pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER; + + // setup udp socket for this thread/user + int udp_port = user_data[user_index].udpPort; + int udp_sockfd; + struct addrinfo *res; + if (setup_udp_connection(udp_port, &udp_sockfd, &res) == -1){ + fprintf(stderr, "failure in setup_udp_connection"); + return (NULL); // error occured + } + + // setup cleanup for thread that closes the fd if closed pthread_cleanup_push(udp_port_cleanup_handler, (void *)udp_sockfd); - // wait for + pthread_mutex_lock(&m); + // wait for threads did_work = 0; while (!start_threads) { @@ -473,17 +491,14 @@ void *send_udp_packet_routine(void *arg) { if (station_num == -1) { did_work = 1; } - // potential error here! - // printf("station info - bytes read: %d, station_fd: %d, filePath: %s, buffersize: %d\n", bytes_read, station_info->streamFd, station_info->filePath, station_info->fileBufferSize); - if (send_all_udp(udp_sockfd, file_buffer, &buffer_size, thread_res) == -1) - { - perror("send_all_udp"); - printf("We only sent %d bytes because of the error!\n", buffer_size); - } + // send the data! (no error check since udp) + send_all_udp(udp_sockfd, file_buffer, &buffer_size, &res); + // cleanup + close(udp_sockfd); free(file_buffer); - + pthread_mutex_unlock(&m); pthread_cleanup_pop(1); @@ -510,15 +525,10 @@ void *send_announce_routine(void *arg) { fd_set master; // master file descriptor list - -void *select_routine(void *arg) { - fd_set read_fds; // temp file descriptor list for select() - - int listener; // listening socket descriptor +int listener; // listening socket descriptor +int fdmax; // maximum file descriptor number +void setup_listener() { int newfd; // newly accept()ed socket descriptor - int fdmax; // maximum file descriptor number - struct sockaddr_storage remoteaddr; // client address - socklen_t addrlen; char buf[256]; // buffer for client data int nbytes; @@ -533,8 +543,7 @@ void *select_routine(void *arg) { // const char* port = argv[1]; - FD_ZERO(&master); // clear the master and temp sets - FD_ZERO(&read_fds); + FD_ZERO(&master); // clear the master set // LISTENER: get us a socket and bind it memset(&hints, 0, sizeof hints); @@ -582,144 +591,170 @@ void *select_routine(void *arg) { // keep track of the biggest file descriptor fdmax = listener; // so far, it's this one +} + +int handle_client_command(int clientfd) { + uint8_t command_type = -1; + if (recv(clientfd, &command_type, 1, 0) <= 0) // check user has disconnected + { + if (l) printf("socket %d HUNGUP. lost connection.\n", clientfd); + + destroy_user(clientfd); + return 0; + } + + // check for each command type... + if (command_type == HELLO) { + // if user already sent a HELLO, send back in invalid command + // since only the only the handshake handles HELLO + if (l) printf("received an extraneous HELLO from socket %d. sending INVALID reply.\n", clientfd); + + char * message = "must not sent more than one Hello message"; + send_invalid_reply(clientfd, strlen(message), message); + destroy_user(clientfd); + return 0; + } + + if (command_type == SETSTATION) { + if(!handle_setstation_command(clientfd)) { // failure + // remove user + destroy_user(clientfd); + return 0; + } + return 1; // sucess, return 1 + } + + if (command_type == LISTSTATIONS) { + if (l) printf("received a LISTSTATIONS from socket %d\n", clientfd); + + pthread_t send_thread; // send STATIONINFO in a different thread + pthread_create(&send_thread, NULL, send_stationsinfo_reply, clientfd); + return 1; + } + + // if we're here, we got an invalid/unknown command + if (l) printf("received unknown command type %d from socket %d. sending INVALID reply.\n", command_type, clientfd); + char *message = "invalid command type"; + send_invalid_reply(clientfd, strlen(message), message); + destroy_user(clientfd); + return 0; +} + +int handle_new_connection() { + // handle new connections + struct sockaddr_storage remoteaddr; + socklen_t addrlen = sizeof remoteaddr; + int newfd = accept(listener, + (struct sockaddr *)&remoteaddr, + &addrlen); + // check if newfd is valid + if (newfd == -1) { + perror("accept"); + return 0; + } + + // new user, we need to do the handshake + uint16_t udp_port = handle_handshake(newfd); + if (udp_port == 0) { // 0 is used for err since unsigned can't be negative + // drop connection upon error + close(newfd); + return 0; + } + + // add the user to the set + FD_SET(newfd, &master); // add to master set + if (newfd > fdmax) { // keep track of the max + fdmax = newfd; + } + + // TODO: thread this + init_user_routine(newfd, udp_port); + return 1; +} + +void *select_routine(void *arg) { + fd_set read_fds; // temp file descriptor list for select() + FD_ZERO(&read_fds); // clear the temp set while(1) { - read_fds = master; // copy it + read_fds = master; // copy set + if (select(fdmax+1, &read_fds, NULL, NULL, NULL) == -1) { perror("select"); exit(4); } // run through the existing connections looking for data to read - for(i = 0; i <= fdmax; i++) { - if (FD_ISSET(i, &read_fds)) { // we got one!! - if (i == listener) { - // handle new connections - addrlen = sizeof remoteaddr; - newfd = accept(listener, - (struct sockaddr *)&remoteaddr, - &addrlen); - - if (newfd == -1) { - perror("accept"); - } else { - // new user, we need to do the handshake - uint16_t udp_port = handle_handshake(newfd); - if (udp_port == 0) { - // drop connection upon error - close(newfd); - continue; - } - - // add the user to the set - FD_SET(newfd, &master); // add to master set - if (newfd > fdmax) { // keep track of the max - fdmax = newfd; - } - - // TODO: thread or combine into one function - // init the user - init_user(newfd); - // update the udp port - update_user_udpPort(newfd, udp_port); - - send_welcome_reply(newfd); - } - } else { - // handle data from a client - uint8_t command_type = -1; - if ((nbytes = recv(i, &command_type, 1, 0)) <= 0) - { - // got error or connection closed by client - if (nbytes == 0) { - // connection closed - if (l) printf("socket %d HUNGUP. lost connection.\n", newfd); - } else { - perror("recv"); - } - destroy_user(i); - } - else // we got some data from a client - { - if (command_type == 0) { // we got a HELLO commmand - // send back in invalid command if user already sent a hello - if (user_data[sockfd_to_user[i]].udpPort != -1) { - if (l) printf("received an extraneous HELLO from socket %d. sending INVALID reply.\n", i); - char * message = "must not sent more than one Hello message"; - send_invalid_reply(i, strlen(message), message); - destroy_user(i); - } - } - - else if (command_type == 1) { // we got a SETSTATION command - handle_setstation_command(i); - } - else if (command_type == 5) { // we got a ListStations command - if (l) printf("received a LISTSTATIONS from socket %d\n", i); - pthread_t t; - pthread_create(&t, NULL, send_stationsinfo_reply, i); - } - else { - if (l) printf("received unknown command type %d from socket %d. sending INVALID reply.\n", command_type, i); - char *message = "invalid command type"; - send_invalid_reply(i, strlen(message), message); - destroy_user(i); - } - } - } + for(int i = 0; i <= fdmax; i++) { + if (!FD_ISSET(i, &read_fds)) { // no new connections + continue; + } + + // if listener, then we need to add this new connection + if (i == listener) { + handle_new_connection(); + continue; } + + // if not the listener, then we need to handle new data from the client + handle_client_command(i); } } + + return (NULL); } -void *init_user(int sockfd) { - // add the user to the list of user data +void *init_user_routine(int newfd, int udp_port) { pthread_mutex_lock(&mutex_user_data); - if(sockfd > max_sockfd) { - max_sockfd = sockfd; - int * more_sockfd_to_user = realloc(sockfd_to_user, sizeof(int) * (max_sockfd + 1)); - if (!more_sockfd_to_user) { perror("realloc"); exit(1); } + // FOLLOWING IS FOR MEMORY OPTIMIZATION + + // if the newfd is larger than the max, we need to resize the array + if(newfd > max_sockfd) { + max_sockfd = newfd*2; // double the array + int *more_sockfd_to_user = realloc(sockfd_to_user, sizeof(int) * (max_sockfd + 1)); + if (!more_sockfd_to_user) { perror("realloc in init_user_routine1"); exit(1); } sockfd_to_user = more_sockfd_to_user; } + // let's check if we can resuse the space from a previous user int running_index = 0; while(running_index < max_active_users) { + // -1 on sockfd indicates that we have directly assigned it when "destroying a user" + // -> so a previous user was there, but discconnected if (user_data[running_index].sockfd == -1) { - break; + break; // we found an index } running_index++; } + if (running_index == max_active_users) { - // printf("reached max active users\n"); - // printf("making new memory\n"); + // if we're here, we went through the whole array to no avail, + // so we need to extend users array max_active_users++; user_t *more_users = realloc(user_data, sizeof(user_t) * max_active_users); - if (!more_users) { perror("realloc"); exit(1); } + if (!more_users) { perror("realloc in init_user_routine2"); exit(1); } user_data = more_users; } // map TCP sockfd to this user index - user_data[running_index] = (user_t){-1, -1, sockfd}; - sockfd_to_user[sockfd] = running_index; - // free(user_stream_threads); + user_data[running_index] = (user_t){udp_port, -1, newfd}; + sockfd_to_user[newfd] = running_index; + pthread_mutex_unlock(&mutex_user_data); + // successfully created user, let's send the welcome + send_welcome_reply(newfd); + return (NULL); } -void *update_user_udpPort(int sockfd, int udpPort) { - pthread_mutex_lock(&mutex_user_data); - // get the user - user_t *user = &user_data[sockfd_to_user[sockfd]]; - // set the udpPort - user->udpPort = udpPort; - // start the stream thread, now that we have the udpPort - // pthread_create(&user->streamThread, NULL, send_udp_packet_routine, (void *)sockfd_to_user[sockfd]); - pthread_mutex_unlock(&mutex_user_data); +// void *update_user_udpPort(int sockfd, int udpPort) { +// pthread_mutex_lock(&mutex_user_data); +// pthread_mutex_unlock(&mutex_user_data); - return (NULL); -} +// return (NULL); +// } void *update_user_station(int sockfd, int stationNum) { pthread_mutex_lock(&mutex_user_data); user_data[sockfd_to_user[sockfd]].stationNum = stationNum; @@ -859,23 +894,14 @@ uint16_t handle_handshake(int newfd) { if (l) printf("waiting for HELLO command. new connection on socket %d\n", newfd); - // apply timeout to socket + // apply timeout to socket since handshake must be completed quickly apply_timeout(newfd); uint8_t command_type = -1; - size_t nbytes; - if (nbytes = recv(newfd, &command_type, 1, 0) <= 0) + if (recv(newfd, &command_type, 1, 0) <= 0) { // got error or connection closed by client - if (nbytes == 0) { - // connection closed - if (l) printf("socket %d HUNGUP. lost connection.\n", newfd); - } - else - { - perror("recv in handle handshake"); - } - // remove user from data structures + if (l) printf("socket %d HUNGUP in handshake. lost connection.\n", newfd); close(newfd); return 0; } @@ -888,14 +914,15 @@ uint16_t handle_handshake(int newfd) } // get the udp port - uint16_t udp_port = -1; + uint16_t udp_port; int bytes_to_read = sizeof(uint16_t); if (recv_all(newfd, &udp_port, &bytes_to_read) == -1) { perror("recv_all"); close(newfd); return 0; } - // remove timeout, after reading data + + // remove timeout, as we aren't "waiting" for immediate reply remove_timeout(newfd); if(l) printf("received HELLO from socket %d with udp port %u\n", newfd, ntohs(udp_port)); @@ -919,13 +946,13 @@ int send_welcome_reply(int fd) { return 1; } -void handle_setstation_command(int sockfd) { +int handle_setstation_command(int sockfd) { // get the station number - uint16_t station_number = -1; + uint16_t station_number; int bytes_to_read = sizeof(uint16_t); if (recv_all(sockfd, &station_number, &bytes_to_read) == -1) { perror("recv_all"); - destroy_user(sockfd); + return 0; } station_number = ntohs(station_number); @@ -935,8 +962,7 @@ void handle_setstation_command(int sockfd) { // send back in invalid command and drop user char * message = "must send Hello message first"; send_invalid_reply(sockfd, strlen(message), message); - destroy_user(sockfd); - return; + return 0; } if (l) printf("received a SETSTATION from socket %d with station_number %u\n", sockfd, station_number); @@ -948,18 +974,18 @@ void handle_setstation_command(int sockfd) { // send back in invalid command and drop user char * message = "station number out of range"; send_invalid_reply(sockfd, strlen(message), message); - destroy_user(sockfd); - return; + return 0; } // check if station num has been removed if (stations[station_number].readfd == -1) { send_stationshutdown_reply(sockfd); - return; + return 1; } update_user_station(sockfd, station_number); send_announce_reply(sockfd, station_number); + return 1; } void send_stationshutdown_reply(int fd) { @@ -1062,13 +1088,15 @@ void add_station(char *file_path) { void cleanup_fds() { - // // close all the file descriptors - // for (int i = 0; i < num_stations; i++) { - // close(stations[i].readfd); - // } - // for (int i = 0; i < max_active_users; i++) { - // if (user_data[i].sockfd != -1) { - // close(user_data[i].sockfd); - // } - // } + // close all the file descriptors for the stations + for (int i = 0; i < num_stations; i++) { + close(stations[i].readfd); + } + + // close all the tcp connections + for (int i = 0; i < max_active_users; i++) { + if (user_data[i].sockfd != -1) { + close(user_data[i].sockfd); + } + } } diff --git a/snowcast_control b/snowcast_control Binary files differindex 8a8a2ce..01f4d14 100755 --- a/snowcast_control +++ b/snowcast_control diff --git a/snowcast_listener b/snowcast_listener Binary files differindex d15c2ac..f27a5c1 100755 --- a/snowcast_listener +++ b/snowcast_listener diff --git a/snowcast_server b/snowcast_server Binary files differindex ab6cf33..5c97922 100755 --- a/snowcast_server +++ b/snowcast_server |