diff options
author | sotech117 <michael_foiani@brown.edu> | 2023-09-25 19:02:19 -0400 |
---|---|---|
committer | sotech117 <michael_foiani@brown.edu> | 2023-09-25 19:02:19 -0400 |
commit | 8c6ae1ecde9faa0af5dacaf7ecf0f9cf47b69159 (patch) | |
tree | 87671ddc40b678e0ec6592015d0e5d26e0df1864 | |
parent | c534d8e28a00c9762fcb4ef2bdeb9a735ae26b75 (diff) |
more refactoring and commenting
-rw-r--r-- | client.c | 4 | ||||
-rw-r--r-- | protocol.c | 5 | ||||
-rw-r--r-- | server.c | 222 | ||||
-rwxr-xr-x | snowcast_control | bin | 18848 -> 22968 bytes | |||
-rwxr-xr-x | snowcast_server | bin | 37856 -> 37816 bytes |
5 files changed, 119 insertions, 112 deletions
@@ -97,7 +97,7 @@ int main(int argc, char *argv[]) pthread_create(&command_line_thread, NULL, command_line_routine, (void*)sockfd); // this while loop hangs on recv and runs when there is new data - while (1) { + while (69) { // get the type of the incoming reply uint8_t reply_type = -1; // print size of utin8 @@ -261,7 +261,7 @@ void *command_line_routine(void* args) { printf("Enter a number to change to it's station. Enter q to end stream.\n"); printf("snowcast_control> "); fflush(stdout); - while (1) { + while (420) { memset(input, 0, LINE_MAX); char *line = fgets(input, LINE_MAX, stdin); @@ -4,6 +4,7 @@ #include "protocol.h" #define TCP_TIMEOUT 100000 // 100ms in microseconds +#define MAX_PACKET_SIZE 512 int send_all(int sock, char *buf, int *len) { @@ -18,8 +19,10 @@ int send_all(int sock, char *buf, int *len) int bytesleft = *len; // how many we have left to send int n; + // ensure we don't send more than MAX_PACKET_SIZE bytes + size_t max_send = bytesleft >= MAX_PACKET_SIZE ? MAX_PACKET_SIZE : bytesleft; while(total < *len) { - n = send(sock, buf+total, bytesleft, 0); + n = send(sock, buf+total, max_send, 0); if (n == -1) { break; } total += n; bytesleft -= n; @@ -15,8 +15,11 @@ #define LINE_MAX 1024 #define MAX_USERS 1000 #define MAX_PATH 50 -#define MAX_RATE_PER_SECOND 16*1024 / 2 -#define MAX_PACKET_SIZE 512 +#define BROADCAST_OFFSET 10000 +#define MAX_RATE_PER_SECOND 16 * 1024 +#define BROADCASTS_PER_SECOND 2 + +#define FILE_READ_SIZE MAX_RATE_PER_SECOND / BROADCASTS_PER_SECOND typedef struct station { pthread_t streamThread; @@ -43,7 +46,6 @@ pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; pthread_mutex_t mutex_stations = PTHREAD_MUTEX_INITIALIZER; -const char *port; // int num_stations; int start_threads = 0; @@ -65,7 +67,7 @@ struct udp_packet_routine_args { }; void *send_udp_packet_routine(void* arg); -void setup_listener(); +int setup_listener(const char* port); void *select_routine(void *arg); void *send_announce_routine(void* arg); @@ -93,50 +95,58 @@ void send_stationshutdown_reply(int fd); uint16_t handle_handshake(int newfd); -void cleanup_fds(); +void cleanup_readfds_and_sockets(); -int l = 0; +int l = 0; // boolean for logging -> if (l) { printf("..."); } +fd_set master; // master file descriptor (sockets) list +int fdmax; // maximum file descriptor number main(int argc, char *argv[]) { - // check and assign arguments + // CHECK AND USE ARGUMENTS + // ------------------------------------------------------------------------------------------------- if (argc < 3) { fprintf(stderr,"usage: ./snowcast_server <listen port> <file0> [file 1] [file 2] ... \n"); exit(1); } - // initizlize the port - port = argv[1]; + // assign port + const char* server_port = argv[1]; - // initialize the stations & their threads + // initialize the stations & their threads, from if (setup_stations(argc, argv) == -1) { perror("setup_stations"); exit(1); } - // make array of user data - // printf("max active users: %d\n", sizeof(user_t) * max_active_users); + // INTIALIZE DATA STRUCTURES + // ------------------------------------------------------------------------------------------------- + // make pointers to array of user data user_data = malloc(sizeof(user_t) * max_active_users); if (!user_data) { perror("malloc userdata"); return 1; } sockfd_to_user = malloc(sizeof(int) * max_active_users); if (!sockfd_to_user) { perror("malloc sockfd to user"); return 1; } - // setup the listener - setup_listener(); + + // SETUP SOCKETS TO LISTEN FOR NEW CONNECTIONS + // ------------------------------------------------------------------------------------------------- + // setup the listener fd to accept new connections + FD_ZERO(&master); // clear the master set + int listenerfd = setup_listener(server_port); // make and start "select" thread that manages: // 1) new connections, 2) requests from current connections, 3) closing connections pthread_t select_thread; - pthread_create(&select_thread, NULL, select_routine, NULL); + pthread_create(&select_thread, NULL, select_routine, listenerfd); - // start syncchronization thread to broadcast stations - // pthread_t sync_thread; - // pthread_create(&sync_thread, NULL, sync_routine, NULL); - - // command line interface + // BELOW IS FOR THE COMMAND LINE INTERFACE + // ------------------------------------------------------------------------------------------------- + + // command line data structures char input[LINE_MAX]; memset(input, 0, LINE_MAX); char *tokens[LINE_MAX / 2]; + // memset(tokens, 0, LINE_MAX / 2); printf("snowcast_server> "); fflush(stdout); @@ -152,18 +162,21 @@ main(int argc, char *argv[]) } char *command = tokens[0]; - // if q, shutdown! + + // if q command: shutdown & close tcp sockets! if (!strcmp(command, "q")) { printf("Exiting.\n"); - cleanup_fds(); + cleanup_readfds_and_sockets(); + close(listenerfd); break; } - // if p, print info + // if p command: print info else if (!strcmp(command, "p")) { - // get the file descriptor - int print_fd = 0; - // see if there is a file path + int print_fd; + + // see if there is a file path to print to, + // if so, open it into print_fd char *output_file_path = tokens[1]; if (output_file_path != NULL) { @@ -173,34 +186,37 @@ main(int argc, char *argv[]) memset(input, 0, LINE_MAX); continue; } - } else { - print_fd = STDOUT_FILENO; - } - // printf("print_fd: %d\n", print_fd); - // pthread_t print_info_thread; - // pthread_create(&print_info_thread, NULL, print_info_routine, print_fd); + } else print_fd = STDOUT_FILENO; + + // print the info print_info_routine((void *)print_fd); - // note - this file descriptor is closed in the thread } + + // if u command: print user data (debugging) else if (!strcmp(command, "u")) - { for (int i = 0; i < max_active_users; i++) print_user_data(i); - } + + // if u command: print station data (debugging) else if (!strcmp(command, "s")) - { for (int i = 0; i < num_stations; i++) print_station_data(i); - } + + // if log command: start logging (debugging) else if (!strcmp(command, "log")) { l= !l; printf("logging is now %s\n", l ? "on" : "off"); } + + // if add command: add a station (EXTRA CREDIT) else if (!strcmp(command, "add")) { // add a new station // get the file path char *file_path = tokens[1]; + if (file_path == NULL) { printf("add: must provide a file path\n"); } else add_station(file_path); // add the station } + + // if add command: remove a station (EXTRA CREDIT) else if (!strcmp(command, "remove")) { // remove a station if (tokens[1] == NULL) { printf("remove: must provide a station number to remove\n"); @@ -209,6 +225,7 @@ main(int argc, char *argv[]) printf("remove: must provide a number\n"); } else destroy_station(atoi(tokens[1])); } + else { printf("unknown command: %s\n", command); } @@ -217,15 +234,14 @@ main(int argc, char *argv[]) printf("snowcast_server> "); fflush(stdout); } - return 0; } -int read_file(int fd, char buffer[MAX_RATE_PER_SECOND], int station_num) { +int read_file(int fd, char buffer[FILE_READ_SIZE], int station_num) { // see if fd was closed at some point if (fd == -1) return -1; - int bytes_read = read(fd, buffer, MAX_RATE_PER_SECOND); + int bytes_read = read(fd, buffer, FILE_READ_SIZE); if (bytes_read < 0) { perror("read (in read file)"); return -1; } // printf("bytes read: %d\n", bytes_read); if (bytes_read == 0) { @@ -238,10 +254,9 @@ int read_file(int fd, char buffer[MAX_RATE_PER_SECOND], int station_num) { perror("lseek (in resarting file)"); return -1; } - bytes_read = read(fd, buffer, MAX_RATE_PER_SECOND); + bytes_read = read(fd, buffer, FILE_READ_SIZE); if (bytes_read < 0) { perror("read (in read file, after restart)"); return -1; } } - return bytes_read; } @@ -251,9 +266,6 @@ void *stream_routine_cleanup(void *arg) { } void *stream_routine(void *arg) { - int BROADCASTS_PER_SECOND = 2; - int BROADCAST_OFFSET = 10000; - int station_num = (int) arg; // printf("stream routine %d\n", station_num); int read_fd = stations[station_num].readfd; @@ -261,17 +273,18 @@ void *stream_routine(void *arg) { pthread_cleanup_push(stream_routine_cleanup, read_fd); // make buffer for read_file - char buffer[MAX_RATE_PER_SECOND]; + char buffer[FILE_READ_SIZE]; for (;;) { // load bytes from file into buffer - memset(buffer, 0, MAX_RATE_PER_SECOND); + memset(buffer, 0, FILE_READ_SIZE); int bytes_read = read_file(read_fd, buffer, station_num); if (bytes_read == -1) { return (NULL); } // create the threads to send packets to users, which will be released later int *send_buffer; + int sent = 0; for (int i = 0; i < max_active_users; i++) { if (!user_data[i].sockfd || user_data[i].sockfd == -1) @@ -289,6 +302,7 @@ void *stream_routine(void *arg) { // make thread pthread_t send_udp_packet_thread; pthread_create(&send_udp_packet_thread, NULL, send_udp_packet_routine, send_buffer); + sent = 1; } } @@ -304,7 +318,7 @@ void *stream_routine(void *arg) { start_threads = 0; // free the buffer after it's been sent - free(send_buffer); + if (sent) free(send_buffer); } return (NULL); @@ -337,6 +351,7 @@ int setup_stations(int argc, char *argv[]) { return 1; } +// helper to write int as a string format buffer void write_int_to_fd(int fd, int n) { int len = snprintf(NULL, 0, "%d", n); char *num = malloc(len + 1); @@ -350,10 +365,11 @@ void write_int_to_fd(int fd, int n) { } void *print_info_routine(void *arg) { + // unpack the fd to print to int print_fd = (int) arg; - // printf("thread print_fd: %d\n", print_fd); - // printf("num_stations: %d\n", num_stations); + for (int i = 0; i < num_stations; i++) { + // prints each station in the desired format write_int_to_fd(print_fd, i); char *comma = ","; write(print_fd, comma, strlen(comma)); @@ -362,11 +378,12 @@ void *print_info_routine(void *arg) { char* file_path = stations[i].filePath; write(print_fd, file_path, strlen(file_path)); + // go through users, and print the udp ports for (int j = 0; j < max_active_users; j++) { if (!user_data[j].sockfd || user_data[j].sockfd == -1) continue; if (user_data[j].stationNum == i) { - char *localhost_ip = ",127.0.0.1:"; + char *localhost_ip = ",127.0.0.1:"; //TODO: possibly update write(print_fd, localhost_ip, strlen(localhost_ip)); // write udpPort write_int_to_fd(print_fd, user_data[j].udpPort); @@ -377,7 +394,9 @@ void *print_info_routine(void *arg) { write(print_fd, newline, strlen(newline)); } + // close the fd if (print_fd != STDOUT_FILENO) close(print_fd); + return (NULL); } @@ -445,8 +464,6 @@ int setup_udp_connection(int udp_port, int *udp_sockfd, socklen_t *addrlen, stru } *udp_sockfd = sock; - // only these two properties of addrinfo are needed to send udp - // TODO: FIX, or reroute somewhere else *addrlen = thread_res->ai_addrlen; *addr = *thread_res->ai_addr; @@ -464,12 +481,13 @@ void *send_udp_packet_routine(void *arg) { // setup variables int did_work = 1; pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER; + // only these two following properties of addrinfo are needed to send udp packets + struct sockaddr addr; + socklen_t addrlen; // setup udp socket for this thread/user int udp_port = user_data[user_index].udpPort; int udp_sockfd; - struct sockaddr addr; - socklen_t addrlen; if (setup_udp_connection(udp_port, &udp_sockfd, &addrlen, &addr) == -1){ fprintf(stderr, "failure in setup_udp_connection"); return (NULL); // error occured @@ -504,54 +522,34 @@ void *send_udp_packet_routine(void *arg) { } void *send_announce_routine(void *arg) { - // unpack args + // unpack arg int station_num = (int) arg; // send the announce messages for (int i = 0; i < max_active_users; i++) { - if (user_data[i].sockfd == 0 || user_data[i].sockfd == -1) { + if (user_data[i].sockfd == 0 || user_data[i].sockfd == -1) continue; - } - // update the station of each user - if (user_data[i].stationNum == station_num) - { + // send announce reply to each user + if (user_data[i].stationNum == station_num) send_announce_reply(user_data[i].sockfd, station_num); - } } } - -fd_set master; // master file descriptor list -int listener; // listening socket descriptor -int fdmax; // maximum file descriptor number -void setup_listener() { - int newfd; // newly accept()ed socket descriptor - - char buf[256]; // buffer for client data - int nbytes; - - - char remoteIP[INET6_ADDRSTRLEN]; - - int yes=1; // for setsockopt() SO_REUSEADDR, below - int i, j, rv; - +int setup_listener(const char* port) { + int listener; // listening socket descriptor, to be set + + // setup and bind to listener struct addrinfo hints, *ai, *p; - - // const char* port = argv[1]; - - FD_ZERO(&master); // clear the master set - - // LISTENER: get us a socket and bind it memset(&hints, 0, sizeof hints); - hints.ai_family = AF_INET; - hints.ai_socktype = SOCK_STREAM; - hints.ai_flags = AI_PASSIVE; - if ((rv = getaddrinfo(NULL, port, &hints, &ai)) != 0) { + hints.ai_family = AF_INET; // Ipv4 + hints.ai_socktype = SOCK_STREAM; // TCP + hints.ai_flags = AI_PASSIVE; // auto select + int rv; + if ((rv = getaddrinfo(NULL, port, &hints, &ai)) != 0) + { fprintf(stderr, "snowcast_server: %s\n", gai_strerror(rv)); exit(1); } - for(p = ai; p != NULL; p = p->ai_next) { listener = socket(p->ai_family, p->ai_socktype, p->ai_protocol); if (listener < 0) { @@ -559,16 +557,15 @@ void setup_listener() { } // lose the pesky "address already in use" error message + int yes=1; setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int)); if (bind(listener, p->ai_addr, p->ai_addrlen) < 0) { close(listener); continue; } - break; } - // if we got here, it means we didn't get bound if (p == NULL) { fprintf(stderr, "snowcast_server: failed to bind\n"); @@ -586,8 +583,11 @@ void setup_listener() { // add the listener to the master set FD_SET(listener, &master); - // keep track of the biggest file descriptor - fdmax = listener; // so far, it's this one + // initialize fdmax. listener will be greatest, + // since after the stations are cerated + fdmax = listener; + + return listener; } int handle_client_command(int clientfd) { @@ -624,7 +624,7 @@ int handle_client_command(int clientfd) { if (command_type == LISTSTATIONS) { if (l) printf("received a LISTSTATIONS from socket %d\n", clientfd); - pthread_t send_thread; // send STATIONINFO in a different thread + pthread_t send_thread; // send STATIONINFO from a different thread pthread_create(&send_thread, NULL, send_stationsinfo_reply, clientfd); return 1; } @@ -637,8 +637,9 @@ int handle_client_command(int clientfd) { return 0; } -int handle_new_connection() { - // handle new connections +int handle_new_connection(int listener) { + + // accept new connection struct sockaddr_storage remoteaddr; socklen_t addrlen = sizeof remoteaddr; int newfd = accept(listener, @@ -670,10 +671,13 @@ int handle_new_connection() { } void *select_routine(void *arg) { + int listener = (int) arg; // the listener fd + fd_set read_fds; // temp file descriptor list for select() FD_ZERO(&read_fds); // clear the temp set - while(1) { + for (;;) + { read_fds = master; // copy set if (select(fdmax+1, &read_fds, NULL, NULL, NULL) == -1) { @@ -689,7 +693,7 @@ void *select_routine(void *arg) { // if listener, then we need to add this new connection if (i == listener) { - handle_new_connection(); + handle_new_connection(listener); continue; } @@ -836,7 +840,7 @@ void *send_stationsinfo_reply(void * arg) { uint32_t reply_size = 0; for (int i = 0; i < num_stations; i++) { if (stations[i].readfd != -1) - reply_size += snprintf(NULL, 0, "%d,%s\n", i, stations[i].filePath); + reply_size += snprintf(NULL, 0, "station #%d -> %s\n", i, stations[i].filePath); } reply_size--; // don't want final \n @@ -851,14 +855,10 @@ void *send_stationsinfo_reply(void * arg) { if (send_all(fd, &reply_size_endian, &bytes) == -1) perror("send_all in send stations info"); - - - printf("SENT reply size: %d\n", reply_size); - char send_buffer[reply_size]; int ptr = 0; for (int i = 0; i < num_stations; i++) { - ptr += sprintf(send_buffer + ptr, "%d,%s\n", i, stations[i].filePath); + ptr += sprintf(send_buffer + ptr, "station %d -> %s\n", i, stations[i].filePath); } int bytes_to_send = reply_size; // don't want final \n @@ -1021,9 +1021,11 @@ void destroy_station(int station_num) { void send_newstation_reply(uint16_t station_num) { if (l) printf("sending NEWSTATION reply to all sockets\n"); + // make the struct uint8_t reply_num = 8; uint16_t station_num_n = htons(station_num); struct NewStation new_station = {reply_num, station_num_n}; + // send the message to each (valid) user for (int i = 0; i < max_active_users; i++) { if (!user_data[i].sockfd || user_data[i].sockfd == -1) continue; @@ -1047,6 +1049,7 @@ void add_station(char *file_path) { stations[i].readfd = open(file_path, O_RDONLY); if (stations[i].readfd < 0) { perror("read (from add station)"); return; } pthread_create(&stations[i].streamThread, NULL, stream_routine, i); + send_newstation_reply(i); return; } @@ -1069,16 +1072,19 @@ void add_station(char *file_path) { stations[num_stations].filePath = malloc(strlen(file_path)); memcpy(stations[num_stations].filePath, file_path, strlen(file_path)); stations[num_stations].readfd = open(file_path, O_RDONLY); + // update fdmax, if applicable + if (stations[num_stations].readfd > fdmax) { + fdmax = stations[num_stations].readfd; + } if (stations[num_stations].readfd < 0) { perror("read (from add station)"); return; } pthread_create(&stations[num_stations].streamThread, NULL, stream_routine, num_stations); send_newstation_reply(num_stations); - printf("add: successfully created station @ index %d\n", num_stations); - num_stations++; + printf("add: successfully created station @ index %d\n", num_stations); } -void cleanup_fds() { +void cleanup_readfds_and_sockets() { // close all the file descriptors for the stations for (int i = 0; i < num_stations; i++) { close(stations[i].readfd); @@ -1090,6 +1096,4 @@ void cleanup_fds() { close(user_data[i].sockfd); } } - - close(listener); } diff --git a/snowcast_control b/snowcast_control Binary files differindex dae574f..e76cedf 100755 --- a/snowcast_control +++ b/snowcast_control diff --git a/snowcast_server b/snowcast_server Binary files differindex cb0c2f1..d3f5e04 100755 --- a/snowcast_server +++ b/snowcast_server |