diff options
author | sotech117 <michael_foiani@brown.edu> | 2023-09-25 22:21:42 -0400 |
---|---|---|
committer | sotech117 <michael_foiani@brown.edu> | 2023-09-25 22:21:42 -0400 |
commit | 6a2c567b85be275bb431c09952a88ea4cdf210aa (patch) | |
tree | ff649ae093b3b8eb9931e3a2e4cf7bb3e5ce0bb7 | |
parent | 8c6ae1ecde9faa0af5dacaf7ecf0f9cf47b69159 (diff) |
massive restrcuting of code for readability
-rw-r--r-- | client.c | 100 | ||||
-rw-r--r-- | listener.c | 58 | ||||
-rw-r--r-- | protocol.c | 31 | ||||
-rw-r--r-- | protocol.h | 3 | ||||
-rw-r--r-- | server.c | 1215 | ||||
-rwxr-xr-x | snowcast_control | bin | 22968 -> 35133 bytes | |||
-rwxr-xr-x | snowcast_listener | bin | 13656 -> 34542 bytes | |||
-rwxr-xr-x | snowcast_server | bin | 37816 -> 71756 bytes |
8 files changed, 780 insertions, 627 deletions
@@ -1,3 +1,12 @@ +/* + author: sotech117 + date: 9/25/2023 + course: csci1680 + description: client for snowcast, a music streaming service + + enjoy :) +*/ + #include <stdio.h> #include <stdlib.h> #include <unistd.h> @@ -12,53 +21,42 @@ #include <arpa/inet.h> #include "protocol.c" -#define MAX_READ_SIZE 1024 -#define LINE_MAX 1024 - -void *command_line_routine(void *args); +#define COMMAND_LINE_MAX 1024 +// handles the handshake and returns the number of stations u_int16_t handle_handshake(int sockfd, char* udpPort); -int station_is_set = 0; -int l = 0; -int waiting = 0; -int exiting = 0; - -// get sockaddr, IPv4 or IPv6: -void *get_in_addr(struct sockaddr *sa) -{ - if (sa->sa_family == AF_INET) { - return &(((struct sockaddr_in*)sa)->sin_addr); - } +// routine thread for the command line +void *command_line_routine(void *args); - return &(((struct sockaddr_in6*)sa)->sin6_addr); -} +int l = 0; // logging +int station_is_set = 0; // if we have a station set +int waiting = 0; // for command line "snowcast_control>" prompt to be a new lone +int exiting = 0; // to not have error messages when the program exits "gracefully" -int main(int argc, char *argv[]) +main(int argc, char *argv[]) // no int here for good luck :) { - int sockfd, numbytesrecv, numbytessent, recvbytes; - // char buf[MAXDATASIZE]; - struct addrinfo hints, *servinfo, *p; - char s[INET6_ADDRSTRLEN]; - - // check arugments + // CHECK AND USE ARGUMENTS + // ------------------------------------------------------------------------------------------------- if (argc != 4) { fprintf(stderr,"<server IP> <server port> <listener port>\n"); exit(1); } + const char* tcpPort = argv[2]; // port we use to connect to server's tcp stream + const char* udpPort = argv[3]; // port we use to connect to server's udp info and command + // SETUP TCP CONNECTION (getaaadrinfo->socket->connect) + // ------------------------------------------------------------------------------------------------- + struct addrinfo hints, *servinfo, *p; memset(&hints, 0, sizeof hints); hints.ai_family = AF_INET; // only IPv4 hints.ai_socktype = SOCK_STREAM; - char* tcpPort = argv[2]; // port we use to connect to server's tcp stream - char* udpPort = argv[3]; // port we use to connect to server's udp info and command - - // resolve host - int rv; - if ((rv = getaddrinfo(argv[1], tcpPort, &hints, &servinfo)) != 0) + // resolve host & make socket + int sockfd, err; + if ((err = getaddrinfo(argv[1], tcpPort, &hints, &servinfo)) != 0) { - fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(rv)); + fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(err)); return 1; } // loop through all the results and connect to the first we can @@ -80,9 +78,12 @@ int main(int argc, char *argv[]) fprintf(stderr, "client: failed to connect\n"); return 2; } - inet_ntop(p->ai_family, get_in_addr((struct sockaddr *)p->ai_addr), s, sizeof s); + // char s[INET_ADDRSTRLEN]; // for printing the ip address + // inet_ntop(p->ai_family, get_in_addr((struct sockaddr *)p->ai_addr), s, sizeof s); freeaddrinfo(servinfo); // all done with this structure + // DO HANDSHAKE & PRINT STATION NUMBER + // ------------------------------------------------------------------------------------------------- // now that we're connectioned, let's do the handshake uint16_t num_stations = handle_handshake(sockfd, udpPort); @@ -92,17 +93,19 @@ int main(int argc, char *argv[]) printf("Welcome to Snowcast! The server has %u stations.\n", num_stations); fflush(stdout); - // start the thread to accept command lines + // START COMMAND LINE THREAD + // ------------------------------------------------------------------------------------------------- pthread_t command_line_thread; pthread_create(&command_line_thread, NULL, command_line_routine, (void*)sockfd); - // this while loop hangs on recv and runs when there is new data + // START WHILE LOOP THAT HANDLES ALL REPLIES + // ------------------------------------------------------------------------------------------------- while (69) { // get the type of the incoming reply uint8_t reply_type = -1; // print size of utin8 if (recv(sockfd, &reply_type, 1, 0) == -1) { - if (exiting) { + if (exiting) { // just to remove that pesky error message break; } perror("recv in first byte"); @@ -211,6 +214,9 @@ int main(int argc, char *argv[]) if (reply_type == STATIONSHUTDOWN) { // we are getting StationShutdown if (l) printf("received STATIONSHUTDOWN reply.\n"); + // remove timeout, in case we were waiting for an announce + remove_timeout(sockfd); + if (!waiting) printf("\n"); // note: this is worth the lines for a clean cmd prompt :) waiting = 0; @@ -251,27 +257,28 @@ int main(int argc, char *argv[]) return 0; } +/* thread for managing the command line */ void *command_line_routine(void* args) { // unpack sockfd as arg int sockfd = (int) args; // buffer for input - char input[LINE_MAX]; + char input[COMMAND_LINE_MAX]; printf("Enter a number to change to it's station. Enter q to end stream.\n"); printf("snowcast_control> "); fflush(stdout); while (420) { - memset(input, 0, LINE_MAX); - char *line = fgets(input, LINE_MAX, stdin); + memset(input, 0, COMMAND_LINE_MAX); + char *line = fgets(input, COMMAND_LINE_MAX, stdin); // nothing was typed if (line == NULL) { continue; } - // q command: exit the program - if (strncmp("q\n", input, LINE_MAX) == 0) { + // "q" command: exit the program + if (strncmp("q\n", input, COMMAND_LINE_MAX) == 0) { // end code if type in q exiting = 1; printf("Exiting.\n"); @@ -279,8 +286,8 @@ void *command_line_routine(void* args) { exit(0); } - // l command: STATIONINFO command (EXTRA CREDIT) - if (strncmp("l\n", input, LINE_MAX) == 0) { + // "l" command: STATIONINFO command (EXTRA CREDIT) + if (strncmp("l\n", input, COMMAND_LINE_MAX) == 0) { // send the command to list stations if (l) printf("sending LISTSTATIONS command. waiting for STATIONINFO reply.\n"); apply_timeout(sockfd); // apply a timeout, will be released when we get the reply @@ -292,8 +299,8 @@ void *command_line_routine(void* args) { continue; } - // log command: toggle logging - if (strncmp("log\n", input, LINE_MAX) == 0) + // "log" command: toggle logging + if (strncmp("log\n", input, COMMAND_LINE_MAX) == 0) { l = !l; printf("LOGGING is now %s!\n", l ? "on" : "off"); @@ -335,6 +342,11 @@ void *command_line_routine(void* args) { return (NULL); } +/* + handles the handshake, given the new socket fd and the udp port of this client + returns the number of stations + note: will close the program on failure! +*/ uint16_t handle_handshake(int sockfd, char* udp_port) { if (l) printf("found server, sending HELLO command.\n"); @@ -1,5 +1,11 @@ + /* -** listener.c -- a datagram sockets "server" demo + author: sotech117 + date: 9/25/2023 + course: csci1680 + description: listener for snowcast, a music streaming service + + enjoy :) */ #include <stdio.h> @@ -13,45 +19,30 @@ #include <arpa/inet.h> #include <netdb.h> -// #define MYPORT "4950" // the port users will be connecting to - -#define MAXBUFLEN 16384 - -// get sockaddr, IPv4 or IPv6: -void *get_in_addr(struct sockaddr *sa) -{ - if (sa->sa_family == AF_INET) { - return &(((struct sockaddr_in*)sa)->sin_addr); - } - - return &(((struct sockaddr_in6*)sa)->sin6_addr); -} +#include "protocol.c" int main(int argc, char *argv[]) { - int sockfd; - struct addrinfo hints, *servinfo, *p; - int rv; - int numbytes; - struct sockaddr_storage their_addr; - socklen_t addr_len; - char s[INET6_ADDRSTRLEN]; - + // CHECK AND USE ARGUMENTS + // ------------------------------------------------------------------------------------------------- if (argc != 2) { fprintf(stderr,"<udp port>\n"); exit(1); } - + // get the udp port const char* udp_port = argv[1]; - + // GET UDP SOCKET + // ------------------------------------------------------------------------------------------------- + int sockfd, err; + struct addrinfo hints, *servinfo, *p; memset(&hints, 0, sizeof hints); hints.ai_family = AF_INET; // set to AF_INET to use IPv4 hints.ai_socktype = SOCK_DGRAM; hints.ai_flags = AI_PASSIVE; // use my IP - if ((rv = getaddrinfo(NULL, udp_port, &hints, &servinfo)) != 0) { - fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(rv)); + if ((err = getaddrinfo(NULL, udp_port, &hints, &servinfo)) != 0) { + fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(err)); return 1; } @@ -71,20 +62,21 @@ int main(int argc, char *argv[]) break; } - if (p == NULL) { fprintf(stderr, "listener: failed to bind socket\n"); return 2; } - freeaddrinfo(servinfo); - int count = 0; - - char buf[MAXBUFLEN]; + // START READ/WRITE LOOP + // ------------------------------------------------------------------------------------------------- + char buf[MAX_PACKET_SIZE]; // buffer for reading + struct sockaddr_storage their_addr; // connector's address information + socklen_t addr_len; while(1) { addr_len = sizeof their_addr; - if ((numbytes = recvfrom(sockfd, buf, MAXBUFLEN , 0, + int numbytes; + if ((numbytes = recvfrom(sockfd, buf, MAX_PACKET_SIZE , 0, (struct sockaddr *)&their_addr, &addr_len)) == -1) { perror("recvfrom"); exit(1); @@ -92,7 +84,7 @@ int main(int argc, char *argv[]) // print the buffer write(STDOUT_FILENO, buf, numbytes); - memset(buf, 0, MAXBUFLEN); + memset(buf, 0, MAX_PACKET_SIZE); } close(sockfd); @@ -3,9 +3,11 @@ #include "protocol.h" -#define TCP_TIMEOUT 100000 // 100ms in microseconds -#define MAX_PACKET_SIZE 512 - +/* + ensures all bytes from the buffer are sent + note: applies a timeout during the send of bytes + note: modyfies the len variable to reflect the number of bytes send +*/ int send_all(int sock, char *buf, int *len) { struct timeval timeout; @@ -33,6 +35,11 @@ int send_all(int sock, char *buf, int *len) return n==-1?-1:0; // return -1 on failure, 0 on success } +/* + ensures all bytes that can be sent are loaded into the buffer + note: applies a timeout during the collection of bytes + note: modyfies the len variable to reflect the number of bytes read +*/ int recv_all(int sock, char *buf, int *len) { // setup the timeout on the socket @@ -68,6 +75,10 @@ int recv_all(int sock, char *buf, int *len) return n==-1?-1:0; // return -1 on failure, 0 on success } +/* + applies a timeout to the socket itself + note: should only be used with tcp connections, after connect() +*/ int apply_timeout(int fd) { // handle handshake struct timeval tv; @@ -81,6 +92,10 @@ int apply_timeout(int fd) { return 1; } +/* + removes the timeout on a socket + note: should only be used with tcp connections, after connect() +*/ int remove_timeout(int fd) { // handle handshake @@ -93,4 +108,14 @@ int remove_timeout(int fd) } return 1; +} + +/* + basic helper, not "really" used as we are only Ipv4 +*/ +void *get_in_addr(struct sockaddr *sa){ + if (sa->sa_family == AF_INET) { + return &(((struct sockaddr_in*)sa)->sin_addr); + } + return &(((struct sockaddr_in6*)sa)->sin6_addr); }
\ No newline at end of file @@ -11,6 +11,8 @@ #define STATIONSHUTDOWN 7 #define NEWSTATION 8 +#define TCP_TIMEOUT 100000 // 100ms in microseconds +#define MAX_PACKET_SIZE 512 // client to server messages (commands) @@ -63,3 +65,4 @@ struct NewStation { int send_all(int sock, char *buf, int *len); int recv_all(int sock, char *buf, int *len); +void *get_in_addr(struct sockaddr *sa); @@ -1,3 +1,16 @@ + +/* + author: sotech117 + date: 9/25/2023 + course: csci1680 + description: server for snowcast, a music streaming service + + disclaimer: I have outlined the structure of the server by categorizing the code into sections. + Each category is implemented in the order that it appears in the declaration below. + + enjoy :) +*/ + #include <stdlib.h> #include <pthread.h> #include <stdio.h> @@ -12,96 +25,112 @@ #include <sys/types.h> #include "protocol.c" -#define LINE_MAX 1024 -#define MAX_USERS 1000 -#define MAX_PATH 50 -#define BROADCAST_OFFSET 10000 -#define MAX_RATE_PER_SECOND 16 * 1024 -#define BROADCASTS_PER_SECOND 2 +// ---------------------------------------------------------------------------------------------------------- +// 0) MACROS +// ---------------------------------------------------------------------------------------------------------- +#define COMMAND_LINE_MAX 1024 +#define BROADCAST_OFFSET 10000 // how long to "allow" the threads to broadcast +#define MAX_RATE_PER_SECOND 16 * 1024 // stream rate per second +#define BROADCASTS_PER_SECOND 2 // how often to "seek" the file #define FILE_READ_SIZE MAX_RATE_PER_SECOND / BROADCASTS_PER_SECOND +// ---------------------------------------------------------------------------------------------------------- +// 1) STATION DATA AND FUNCTIONS +// ---------------------------------------------------------------------------------------------------------- typedef struct station { - pthread_t streamThread; - int readfd; - char *filePath; + pthread_t streamThread; + int readfd; + char *filePath; } station_t; -int num_stations; -station_t *stations; -int setup_stations(int argc, char *argv[]); -void *stream_routine(void *arg); - -typedef struct user { +int num_stations; +pthread_mutex_t mutex_stations = PTHREAD_MUTEX_INITIALIZER; +station_t *stations; +int setup_stations(int argc, char *argv[]); +void *stream_routine(void *arg); +void *stream_routine_cleanup(void *arg); +int read_file(int fd, char buffer[FILE_READ_SIZE], int station_num); + +// ---------------------------------------------------------------------------------------------------------- +// 2) USER DATA AND FUNCTIONS +// ---------------------------------------------------------------------------------------------------------- +typedef struct user +{ int udpPort; int stationNum; - int sockfd; + int tcpfd; } user_t; - - -/* For safe condition variable usage, must use a boolean predicate and */ -/* a mutex with the condition. */ -int count = 0; -pthread_cond_t cond = PTHREAD_COND_INITIALIZER; -pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; - -pthread_mutex_t mutex_stations = PTHREAD_MUTEX_INITIALIZER; - -// int num_stations; - -int start_threads = 0; -int max_active_users = 0; -int max_sockfd = 0; - -pthread_mutex_t mutex_user_data = PTHREAD_MUTEX_INITIALIZER; -// array from index to user_data -user_t *user_data; -int *sockfd_to_user; - -// stations array pointer -// station_t *station_data; - -struct udp_packet_routine_args { - int user_index; - int buffer_size; - char *file_buffer; -}; - -void *send_udp_packet_routine(void* arg); -int setup_listener(const char* port); -void *select_routine(void *arg); -void *send_announce_routine(void* arg); - -int parse(char buffer[LINE_MAX], char *tokens[LINE_MAX / 2]); -void *print_info_routine(void *arg); - -void *get_in_addr(struct sockaddr *sa); - -void *init_user_routine(int sockfd, int udp_port); -// void *update_user_udpPort(int sockfd, int udpPort); -void *update_user_station(int sockfd, int stationNum); -void *print_user_data(int sockfd); -void *print_station_data(int station); -void destroy_user(int sockfd); - -void send_announce_reply(int fd, int station_num); -void send_invalid_reply(int fd, size_t message_size, char* message); -void *send_stationsinfo_reply(void *arg); -int send_welcome_reply(int fd); -int handle_setstation_command(int fd); - -void add_station(char *station_name); -void destroy_station(int station_num); -void send_stationshutdown_reply(int fd); - -uint16_t handle_handshake(int newfd); - -void cleanup_readfds_and_sockets(); - -int l = 0; // boolean for logging -> if (l) { printf("..."); } - -fd_set master; // master file descriptor (sockets) list -int fdmax; // maximum file descriptor number -main(int argc, char *argv[]) +int max_active_users = 0; // maximum number of users that have been connected at once +int max_tcpfd = 0; // maximum tcpfd for users that has been seen +pthread_mutex_t mutex_users = PTHREAD_MUTEX_INITIALIZER; +user_t *users; +int *tcpfd_to_user; // maps tcpfd to user index +void init_user(int tcpfd, int udp_port); +void destroy_user(int tcpfd); + +// ---------------------------------------------------------------------------------------------------------- +// 3) UDP STREAM DATA AND FUNCTIONS +// ---------------------------------------------------------------------------------------------------------- +typedef struct send_udp_packet_routine_args { + int user_index; + int buffer_size; + char *file_buffer; +}; // not used, but nice to have as a guideline +int start_threads = 0; // for synchronization, with the cond +pthread_cond_t cond = PTHREAD_COND_INITIALIZER; // for synchronization, with the following routine +void *send_udp_packet_routine(void* arg); +void *send_udp_packet_routine_cleanup(void *arg); +int setup_udp_connection(int udp_port, int *udp_sockfd, socklen_t *addrlen, struct sockaddr *addr); +int send_all_udp(int udp_sockfd, char *buf, int *len, struct sockaddr *addr, socklen_t addrlen); + +// ---------------------------------------------------------------------------------------------------------- +// 4) TCP LISTENER DATA AND FUNCTIONS +// -------------------------------------------------------------------------å--------------------------------- +fd_set master; // master file list for socket descriptors +int fdmax; // maximum file descriptor number +int setup_listener(const char* port); +int handle_new_connection(int listener); +void *select_routine(void *arg); + +// ---------------------------------------------------------------------------------------------------------- +// 5) PROTOCOL REPLY AND COMMAND FUNCTIONS +// ---------------------------------------------------------------------------------------------------------- +int handle_client_command(int clientfd); +uint16_t handle_handshake(int newfd); +uint8_t send_welcome_reply(int fd); +uint8_t handle_setstation_command(int fd); +void update_user_station(int tcpfd, int stationNum); // setstation helper +void *send_announce_routine(void* arg); +void send_announce_reply(int fd, int station_num); // called in send_announce_routine +void send_invalid_reply(int fd, size_t message_size, char* message); + +// ---------------------------------------------------------------------------------------------------------- +// 6) COMMAND LINE INTERFACE FUNCTIONS +// ---------------------------------------------------------------------------------------------------------- +uint8_t l = 0; // boolean for logging -> if (l) { printf("..."); } +int parse(char buffer[COMMAND_LINE_MAX], char *tokens[COMMAND_LINE_MAX / 2]); +void print_info_routine(int fd); +void write_int_to_fd(int fd, int n); // helper to print_info_routine +void *print_user_data(int index); +void *print_station_data(int station); +void cleanup_readfds_and_sockets(); + + +// --------------------------------------------------------------------------------------------------------- +// 7) EXTRA CREDIT +// --------------------------------------------------------------------------------------------------------- +// ability for client to receive station list +void *send_stationsinfo_routine(void *arg); // sends station info (index, name) to client +// ability for server to add/remove stations through command line +void add_station(char *station_name); +void send_newstation_reply(uint16_t station_num); // notifies users that a new station has been added +void destroy_station(int station_num); +void send_stationshutdown_reply(int fd); // sends to users if listening to a station that is destroyed + +// ---------------------------------------------------------------------------------------------------------- +// 0) MAIN FUNCTION +// ---------------------------------------------------------------------------------------------------------- +main(int argc, char *argv[]) // no int here for good luck :) { // CHECK AND USE ARGUMENTS // ------------------------------------------------------------------------------------------------- @@ -110,7 +139,7 @@ main(int argc, char *argv[]) exit(1); } - // assign port + // get listener's port const char* server_port = argv[1]; // initialize the stations & their threads, from @@ -119,16 +148,16 @@ main(int argc, char *argv[]) exit(1); } - // INTIALIZE DATA STRUCTURES + // INTIALIZE USER DATA STRUCTURES // ------------------------------------------------------------------------------------------------- - // make pointers to array of user data - user_data = malloc(sizeof(user_t) * max_active_users); - if (!user_data) { perror("malloc userdata"); return 1; } - sockfd_to_user = malloc(sizeof(int) * max_active_users); - if (!sockfd_to_user) { perror("malloc sockfd to user"); return 1; } + // make pointers user data structures + users = malloc(sizeof(user_t) * max_active_users); + if (!users) { perror("malloc userdata"); return 1; } + tcpfd_to_user = malloc(sizeof(int) * max_active_users); + if (!tcpfd_to_user) { perror("malloc tcpfd to user"); return 1; } - // SETUP SOCKETS TO LISTEN FOR NEW CONNECTIONS + // LISTENER LIST. SOCKER, and SELECT THREAD // ------------------------------------------------------------------------------------------------- // setup the listener fd to accept new connections FD_ZERO(&master); // clear the master set @@ -140,19 +169,16 @@ main(int argc, char *argv[]) pthread_create(&select_thread, NULL, select_routine, listenerfd); // BELOW IS FOR THE COMMAND LINE INTERFACE - // ------------------------------------------------------------------------------------------------- - + // -------------------------------------------------------------------------------------------------s // command line data structures - char input[LINE_MAX]; - memset(input, 0, LINE_MAX); - char *tokens[LINE_MAX / 2]; - // memset(tokens, 0, LINE_MAX / 2); - - printf("snowcast_server> "); + char input[COMMAND_LINE_MAX]; + memset(input, 0, COMMAND_LINE_MAX); + char *tokens[COMMAND_LINE_MAX / 2]; + printf("snowcast_server> "); // very cute to have :) fflush(stdout); - while (read(STDIN_FILENO, input, LINE_MAX) > 0) { + while (read(STDIN_FILENO, input, COMMAND_LINE_MAX) > 0) { // init tokens - memset(tokens, 0, LINE_MAX / 2); + memset(tokens, 0, COMMAND_LINE_MAX / 2); // if 0, all whitespace if (!parse(input, tokens)) { @@ -160,10 +186,10 @@ main(int argc, char *argv[]) fflush(stdout); continue; } - + char *command = tokens[0]; - // if q command: shutdown & close tcp sockets! + // if "q" command: shutdown & close tcp sockets! if (!strcmp(command, "q")) { printf("Exiting.\n"); cleanup_readfds_and_sockets(); @@ -171,7 +197,7 @@ main(int argc, char *argv[]) break; } - // if p command: print info + // if "p" command: print info else if (!strcmp(command, "p")) { int print_fd; @@ -183,30 +209,30 @@ main(int argc, char *argv[]) if ((print_fd = open(output_file_path, O_CREAT | O_WRONLY | O_TRUNC, S_IRWXU)) == -1) { perror("open"); - memset(input, 0, LINE_MAX); + memset(input, 0, COMMAND_LINE_MAX); continue; } } else print_fd = STDOUT_FILENO; // print the info - print_info_routine((void *)print_fd); + print_info_routine(print_fd); } - // if u command: print user data (debugging) + // if "u" command: print user data (debugging) else if (!strcmp(command, "u")) for (int i = 0; i < max_active_users; i++) print_user_data(i); - // if u command: print station data (debugging) + // if "s" command: print station data (debugging) else if (!strcmp(command, "s")) for (int i = 0; i < num_stations; i++) print_station_data(i); - // if log command: start logging (debugging) + // if "log" command: start logging (debugging) else if (!strcmp(command, "log")) { l= !l; printf("logging is now %s\n", l ? "on" : "off"); } - // if add command: add a station (EXTRA CREDIT) + // if "add" command: add a station (EXTRA CREDIT) else if (!strcmp(command, "add")) { // add a new station // get the file path char *file_path = tokens[1]; @@ -216,7 +242,7 @@ main(int argc, char *argv[]) } else add_station(file_path); // add the station } - // if add command: remove a station (EXTRA CREDIT) + // if "remove" command: remove a station (EXTRA CREDIT) else if (!strcmp(command, "remove")) { // remove a station if (tokens[1] == NULL) { printf("remove: must provide a station number to remove\n"); @@ -230,41 +256,55 @@ main(int argc, char *argv[]) printf("unknown command: %s\n", command); } - memset(input, 0, LINE_MAX); + memset(input, 0, COMMAND_LINE_MAX); printf("snowcast_server> "); fflush(stdout); } + return 0; } -int read_file(int fd, char buffer[FILE_READ_SIZE], int station_num) { - // see if fd was closed at some point - if (fd == -1) return -1; +// ---------------------------------------------------------------------------------------------------------- +// 1) STATION DATA AND FUNCTIONS IMPLEMENTATIONS +// ---------------------------------------------------------------------------------------------------------- +/* + given the command line arguments, setups the stations. + 1) mallocs the stations pointer + 2) assigns the stations' variables (filepath & opens file into readfd) + 3) starts the routine for each station +*/ +int setup_stations(int argc, char *argv[]) { + num_stations = argc - 2; - int bytes_read = read(fd, buffer, FILE_READ_SIZE); - if (bytes_read < 0) { perror("read (in read file)"); return -1; } - // printf("bytes read: %d\n", bytes_read); - if (bytes_read == 0) { - // printf("end of file, restarting\n"); - pthread_t send_announce_thread; - pthread_create(&send_announce_thread, NULL, send_announce_routine, station_num); + // get the size to malloc + int totalSize = 0; + for(int i = 2; i < argc; i++) + { + totalSize += sizeof(pthread_t) + sizeof(int) + strlen(argv[i]); + } - if (lseek(fd, 0, SEEK_SET) == -1) - { - perror("lseek (in resarting file)"); - return -1; - } - bytes_read = read(fd, buffer, FILE_READ_SIZE); - if (bytes_read < 0) { perror("read (in read file, after restart)"); return -1; } + // malloc the stations array + stations = malloc(totalSize); + if (!stations) { perror("malloc (stations pointer)"); return -1; } + + // assign the stations, and start the threads + for (int i = 0; i < num_stations; i++) { + stations[i].filePath = argv[i+2]; + stations[i].readfd = open(argv[i+2], O_RDONLY); + if (stations[i].readfd < 0) { perror("read (from station file)"); return -1; } + pthread_create(&stations[i].streamThread, NULL, stream_routine, i); } - return bytes_read; -} -void *stream_routine_cleanup(void *arg) { - int read_fd = (int) arg; - close(read_fd); + return 1; } +/* + thread route for each station, given the station num + 1) reads the section of the file into a buffer + 2) creates a thread for each user that is listening to the station + 3) waits for the threads to be created, then broadcasts the cond variable to let them run + note: you can modify how often and how much is read off the file by changing the MACROS +*/ void *stream_routine(void *arg) { int station_num = (int) arg; // printf("stream routine %d\n", station_num); @@ -287,9 +327,9 @@ void *stream_routine(void *arg) { int sent = 0; for (int i = 0; i < max_active_users; i++) { - if (!user_data[i].sockfd || user_data[i].sockfd == -1) + if (!users[i].tcpfd || users[i].tcpfd == -1) continue; - if (user_data[i].stationNum == station_num) + if (users[i].stationNum == station_num) { // prepare the send buffer // (note: using int* for easy pointer assignment) @@ -299,7 +339,7 @@ void *stream_routine(void *arg) { send_buffer[1] = bytes_read; memcpy(send_buffer+2, buffer, bytes_read); - // make thread + // make thread to send packet data pthread_t send_udp_packet_thread; pthread_create(&send_udp_packet_thread, NULL, send_udp_packet_routine, send_buffer); sent = 1; @@ -325,105 +365,178 @@ void *stream_routine(void *arg) { pthread_cleanup_pop(1); } +void *stream_routine_cleanup(void *arg) { // closes the read fd if thread is cancelled + int read_fd = (int) arg; + close(read_fd); +} -int setup_stations(int argc, char *argv[]) { - num_stations = argc - 2; +/* + reads the file into a buffer. returns the number of bytes read + note: if it reaches the end of a file, it will send an ANNOUNCE message to all users +*/ +int read_file(int fd, char buffer[FILE_READ_SIZE], int station_num) { + // see if fd was closed at some point + if (fd == -1) return -1; - // get the size to malloc - int totalSize = 0; - for(int i = 2; i < argc; i++) - { - totalSize += sizeof(pthread_t) + sizeof(int) + strlen(argv[i]); - } + int bytes_read = read(fd, buffer, FILE_READ_SIZE); + if (bytes_read < 0) { perror("read (in read file)"); return -1; } + // printf("bytes read: %d\n", bytes_read); + if (bytes_read == 0) { + // printf("end of file, restarting\n"); + pthread_t send_announce_thread; + pthread_create(&send_announce_thread, NULL, send_announce_routine, station_num); - // malloc the stations array - stations = malloc(totalSize); - if (!stations) { perror("malloc (stations pointer)"); return -1; } - // assign the stations, and start the threads - for (int i = 0; i < num_stations; i++) { - stations[i].filePath = argv[i+2]; - stations[i].readfd = open(argv[i+2], O_RDONLY); - if (stations[i].readfd < 0) { perror("read (from station file)"); return -1; } - pthread_create(&stations[i].streamThread, NULL, stream_routine, i); + if (lseek(fd, 0, SEEK_SET) == -1) + { + perror("lseek (in resarting file)"); + return -1; + } + bytes_read = read(fd, buffer, FILE_READ_SIZE); + if (bytes_read < 0) { perror("read (in read file, after restart)"); return -1; } } - - // printf("successfully created %d stations\n", num_stations); - return 1; + return bytes_read; } -// helper to write int as a string format buffer -void write_int_to_fd(int fd, int n) { - int len = snprintf(NULL, 0, "%d", n); - char *num = malloc(len + 1); - if (!num) { perror("malloc write to fd"); return; } +// ---------------------------------------------------------------------------------------------------------- +// 2) USER DATA AND FUNCTIONS IMPLEMENTATIONS +// ---------------------------------------------------------------------------------------------------------- +/* + given the newfd and udp_port of a newly connected user, initializes them to the data structure + note: this function does its best to optimize memory (dynamic resize of tcpfd map & resusing space in array) +*/ +void init_user(int newfd, int udp_port) { + pthread_mutex_lock(&mutex_users); - snprintf(num, len + 1, "%d", n); - if (write(fd, num, strlen(num)) == -1) { - perror("write"); + // FOLLOWING IS FOR MEMORY OPTIMIZATION + + // if the newfd is larger than the max, we need to resize the array + if(newfd > max_tcpfd) { + max_tcpfd = newfd*2; // double the array + int *more_tcpfd_to_user = realloc(tcpfd_to_user, sizeof(int) * (max_tcpfd + 1)); + if (!more_tcpfd_to_user) { perror("realloc in init_user1"); exit(1); } + tcpfd_to_user = more_tcpfd_to_user; } - free(num); -} -void *print_info_routine(void *arg) { - // unpack the fd to print to - int print_fd = (int) arg; + // let's check if we can resuse the space from a previous user + int running_index = 0; + while(running_index < max_active_users) { + // -1 on tcpfd indicates that we have directly assigned it when "destroying a user" + // -> so a previous user was there, but discconnected + if (users[running_index].tcpfd == -1) { + break; // we found an index + } + running_index++; + } - for (int i = 0; i < num_stations; i++) { - // prints each station in the desired format - write_int_to_fd(print_fd, i); - char *comma = ","; - write(print_fd, comma, strlen(comma)); + if (running_index == max_active_users) { + // if we're here, we went through the whole array to no avail, + // so we need to extend users array + max_active_users++; + user_t *more_users = realloc(users, sizeof(user_t) * max_active_users); + if (!more_users) { perror("realloc in init_user2"); exit(1); } + users = more_users; + } + + // map TCP tcpfd to this user index + users[running_index] = (user_t){udp_port, -1, newfd}; + tcpfd_to_user[newfd] = running_index; - // write file path - char* file_path = stations[i].filePath; - write(print_fd, file_path, strlen(file_path)); + pthread_mutex_unlock(&mutex_users); - // go through users, and print the udp ports - for (int j = 0; j < max_active_users; j++) { - if (!user_data[j].sockfd || user_data[j].sockfd == -1) - continue; - if (user_data[j].stationNum == i) { - char *localhost_ip = ",127.0.0.1:"; //TODO: possibly update - write(print_fd, localhost_ip, strlen(localhost_ip)); - // write udpPort - write_int_to_fd(print_fd, user_data[j].udpPort); - } - } - // wrtie new line - char *newline = "\n"; - write(print_fd, newline, strlen(newline)); - } + // successfully created user, let's send the welcome + send_welcome_reply(newfd); +} +/* + given the tcpfd of a user, destroys them from the data structures + 1) close the tcpfd + 2) remove from master set + 3) remove from data structures + note: destroying a user sets all fields of that user to -1. often, this is how it's known if a user exists +*/ +void destroy_user(int tcpfd) { + close(tcpfd); // bye! + FD_CLR(tcpfd, &master); // remove from master set - // close the fd - if (print_fd != STDOUT_FILENO) close(print_fd); + // remove user from data structures + pthread_mutex_lock(&mutex_users); + users[tcpfd_to_user[tcpfd]] = (user_t) {-1, -1, -1}; + // map tcpfd to -1 + tcpfd_to_user[tcpfd] = -1; - return (NULL); + pthread_mutex_unlock(&mutex_users); } -int send_all_udp(int udp_sockfd, char *buf, int *len, struct sockaddr *addr, socklen_t addrlen) -{ - int total = 0; // how many bytes we've sent - int bytesleft = *len; // how many we have left to send - int n; +// ---------------------------------------------------------------------------------------------------------- +// 3) UDP STREAM DATA AND FUNCTIONS IMPLEMENTATIONS +// ---------------------------------------------------------------------------------------------------------- +/* + thread routine that streams the binary data to the user + 1) unpack the arguments (see struct send_udp_packet_routine_args) + 2) setup udp socket + 3) wait for the cond variable to be broadcasted + 4) send the data over udp +*/ +void *send_udp_packet_routine(void *arg) { + // unpack args + int *buf = arg; + int user_index = buf[0]; + int buffer_size = buf[1]; + char *data_ptr = buf + 2; // add two will skip first ints, since int* buf - while(total < *len) { - n = sendto(udp_sockfd, buf+total, MAX_PACKET_SIZE, 0, addr, addrlen); - if (n == -1) { break; } - total += n; - bytesleft -= n; + // setup variables + int did_work = 1; + pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER; + // only these two following properties of addrinfo are needed to send udp packets + struct sockaddr addr; + socklen_t addrlen; + + // setup udp socket for this thread/user + int udp_port = users[user_index].udpPort; + int udp_sockfd; + if (setup_udp_connection(udp_port, &udp_sockfd, &addrlen, &addr) == -1){ + fprintf(stderr, "failure in setup_udp_connection"); + return (NULL); // error occured } - *len = total; // return number actually sent here + // setup cleanup for thread that closes the fd if closed + pthread_cleanup_push(send_udp_packet_routine_cleanup, (void *)udp_sockfd); - return n==-1?-1:0; // return -1 on failure, 0 on success -} + pthread_mutex_lock(&m); + // wait for threads + did_work = 0; + while (!start_threads) + { + pthread_cond_wait(&cond, &m); + } + + int station_num = users[user_index].stationNum; + if (station_num == -1) { + did_work = 1; + } + + // send the data! (no error check since udp) + send_all_udp(udp_sockfd, data_ptr, &buffer_size, &addr, addrlen); + + // cleanup + close(udp_sockfd); + + pthread_mutex_unlock(&m); + pthread_cleanup_pop(1); -void udp_port_cleanup_handler(void *arg) + return (NULL); +} +void *send_udp_packet_routine_cleanup(void *arg) // closes the udpfd if thread is cancelled { int sockfd = (int) arg; close(sockfd); + return (NULL); } +/* + given the udp_port, makes a socket and returns it's file descriptor. (returns -1 on failure) + note: pointers to the desired vairables to be set (udp_sockfd, addrlen, addr) must be passed in, + this function will also set those variables +*/ int setup_udp_connection(int udp_port, int *udp_sockfd, socklen_t *addrlen, struct sockaddr* addr) { // setup hints to get udp_port struct addrinfo thread_hints, *thread_res, *thread_servinfo; @@ -444,7 +557,7 @@ int setup_udp_connection(int udp_port, int *udp_sockfd, socklen_t *addrlen, stru if (error_code = getaddrinfo(NULL, port, &thread_hints, &thread_servinfo) != 0) { fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(error_code)); - return 0; + return -1; } free(port); @@ -460,7 +573,7 @@ int setup_udp_connection(int udp_port, int *udp_sockfd, socklen_t *addrlen, stru } if (sock == NULL) { fprintf(stderr, "socket: failed to create udp socket(setup_udp_connection)\n"); - return 0; + return -1; } *udp_sockfd = sock; @@ -470,71 +583,36 @@ int setup_udp_connection(int udp_port, int *udp_sockfd, socklen_t *addrlen, stru return 1; } -/* Make the manager routine */ -void *send_udp_packet_routine(void *arg) { - // unpack args - int *buf = arg; - int user_index = buf[0]; - int buffer_size = buf[1]; - char *data_ptr = buf + 2; // add two will skip first ints, since int* buf - - // setup variables - int did_work = 1; - pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER; - // only these two following properties of addrinfo are needed to send udp packets - struct sockaddr addr; - socklen_t addrlen; - - // setup udp socket for this thread/user - int udp_port = user_data[user_index].udpPort; - int udp_sockfd; - if (setup_udp_connection(udp_port, &udp_sockfd, &addrlen, &addr) == -1){ - fprintf(stderr, "failure in setup_udp_connection"); - return (NULL); // error occured - } - - // setup cleanup for thread that closes the fd if closed - pthread_cleanup_push(udp_port_cleanup_handler, (void *)udp_sockfd); - - pthread_mutex_lock(&m); - // wait for threads - did_work = 0; - while (!start_threads) - { - pthread_cond_wait(&cond, &m); - } +/* + sends the binary data over udp, but limits individual packets to MAX_PACKET_SIZE + note: the use of pointers modifying the arguments +*/ +int send_all_udp(int udp_sockfd, char *buf, int *len, struct sockaddr *addr, socklen_t addrlen) +{ + int total = 0; // how many bytes we've sent + int bytesleft = *len; // how many we have left to send + int n; - int station_num = user_data[user_index].stationNum; - if (station_num == -1) { - did_work = 1; + while(total < *len) { + n = sendto(udp_sockfd, buf+total, MAX_PACKET_SIZE, 0, addr, addrlen); + if (n == -1) { break; } + total += n; + bytesleft -= n; } - // send the data! (no error check since udp) - send_all_udp(udp_sockfd, data_ptr, &buffer_size, &addr, addrlen); - - // cleanup - close(udp_sockfd); - - pthread_mutex_unlock(&m); - pthread_cleanup_pop(1); - - return (NULL); -} + *len = total; // return number actually sent here -void *send_announce_routine(void *arg) { - // unpack arg - int station_num = (int) arg; - // send the announce messages - for (int i = 0; i < max_active_users; i++) - { - if (user_data[i].sockfd == 0 || user_data[i].sockfd == -1) - continue; - // send announce reply to each user - if (user_data[i].stationNum == station_num) - send_announce_reply(user_data[i].sockfd, station_num); - } -} + return n==-1?-1:0; // return -1 on failure, 0 on success +} +// ---------------------------------------------------------------------------------------------------------- +// 4) TCP LISTENER DATA AND FUNCTIONS IMPLEMENTATIONS +// ---------------------------------------------------------------------------------------------------------- +/* + given the port, sets up the listener socket on that port (getaddrinfo->socket->bind->listen) + returns the file descriptor of the socket! + note: ends whole program on failure +*/ int setup_listener(const char* port) { int listener; // listening socket descriptor, to be set @@ -590,55 +668,11 @@ int setup_listener(const char* port) { return listener; } -int handle_client_command(int clientfd) { - uint8_t command_type = -1; - if (recv(clientfd, &command_type, 1, 0) <= 0) // check user has disconnected - { - if (l) printf("socket %d HUNGUP. lost connection.\n", clientfd); - - destroy_user(clientfd); - return 0; - } - - // check for each command type... - if (command_type == HELLO) { - // if user already sent a HELLO, send back in invalid command - // since only the only the handshake handles HELLO - if (l) printf("received an extraneous HELLO from socket %d. sending INVALID reply.\n", clientfd); - - char * message = "must not sent more than one Hello message"; - send_invalid_reply(clientfd, strlen(message), message); - destroy_user(clientfd); - return 0; - } - - if (command_type == SETSTATION) { - if(!handle_setstation_command(clientfd)) { // failure - // remove user - destroy_user(clientfd); - return 0; - } - return 1; // sucess, return 1 - } - - if (command_type == LISTSTATIONS) { - if (l) printf("received a LISTSTATIONS from socket %d\n", clientfd); - - pthread_t send_thread; // send STATIONINFO from a different thread - pthread_create(&send_thread, NULL, send_stationsinfo_reply, clientfd); - return 1; - } - - // if we're here, we got an invalid/unknown command - if (l) printf("received unknown command type %d from socket %d. sending INVALID reply.\n", command_type, clientfd); - char *message = "invalid command type"; - send_invalid_reply(clientfd, strlen(message), message); - destroy_user(clientfd); - return 0; -} - +/* + calls accept() when the listener sees has a new connection + returns 1 on success, 0 on failure + */ int handle_new_connection(int listener) { - // accept new connection struct sockaddr_storage remoteaddr; socklen_t addrlen = sizeof remoteaddr; @@ -665,11 +699,16 @@ int handle_new_connection(int listener) { fdmax = newfd; } - // TODO: thread this - init_user_routine(newfd, udp_port); + init_user(newfd, udp_port); return 1; } +/* + thread routine that manages the select() call + 1) copies the master set + 2) calls select() + 3) handles new connections or responds on new data from clients +*/ void *select_routine(void *arg) { int listener = (int) arg; // the listener fd @@ -705,181 +744,61 @@ void *select_routine(void *arg) { return (NULL); } -void *init_user_routine(int newfd, int udp_port) { - pthread_mutex_lock(&mutex_user_data); - - // FOLLOWING IS FOR MEMORY OPTIMIZATION - - // if the newfd is larger than the max, we need to resize the array - if(newfd > max_sockfd) { - max_sockfd = newfd*2; // double the array - int *more_sockfd_to_user = realloc(sockfd_to_user, sizeof(int) * (max_sockfd + 1)); - if (!more_sockfd_to_user) { perror("realloc in init_user_routine1"); exit(1); } - sockfd_to_user = more_sockfd_to_user; - } - - // let's check if we can resuse the space from a previous user - int running_index = 0; - while(running_index < max_active_users) { - // -1 on sockfd indicates that we have directly assigned it when "destroying a user" - // -> so a previous user was there, but discconnected - if (user_data[running_index].sockfd == -1) { - break; // we found an index - } - running_index++; - } - - if (running_index == max_active_users) { - // if we're here, we went through the whole array to no avail, - // so we need to extend users array - max_active_users++; - user_t *more_users = realloc(user_data, sizeof(user_t) * max_active_users); - if (!more_users) { perror("realloc in init_user_routine2"); exit(1); } - user_data = more_users; - } - - // map TCP sockfd to this user index - user_data[running_index] = (user_t){udp_port, -1, newfd}; - sockfd_to_user[newfd] = running_index; - - pthread_mutex_unlock(&mutex_user_data); - - // successfully created user, let's send the welcome - send_welcome_reply(newfd); - - return (NULL); -} - - -void *update_user_station(int sockfd, int stationNum) { - pthread_mutex_lock(&mutex_user_data); - user_data[sockfd_to_user[sockfd]].stationNum = stationNum; - pthread_mutex_unlock(&mutex_user_data); -} -void *print_user_data(int index) { - printf("user: %d -> udpPort: %d, stationNum: %d, sockfd: %d\n", index, - user_data[index].udpPort, user_data[index].stationNum, user_data[index].sockfd); -} -void *print_station_data(int station) { - printf("station: %d -> filePath: %s, readfd: %d\n", - station, stations[station].filePath, stations[station].readfd); -} - -void destroy_user(int sockfd) { - close(sockfd); // bye! - FD_CLR(sockfd, &master); // remove from master set - - // remove user from data structures - pthread_mutex_lock(&mutex_user_data); - user_data[sockfd_to_user[sockfd]] = (user_t) {-1, -1, -1}; - // map sockfd to -1 - sockfd_to_user[sockfd] = -1; - - pthread_mutex_unlock(&mutex_user_data); -} - - -void *get_in_addr(struct sockaddr *sa) -{ - if (sa->sa_family == AF_INET) { - return &(((struct sockaddr_in*)sa)->sin_addr); - } - - return &(((struct sockaddr_in6*)sa)->sin6_addr); -} - -void send_announce_reply(int fd, int station_num) { - if (l) printf("sending ANNNOUNCE reply to socket %d\n", fd); - - - char* file_path = stations[station_num].filePath; - int len_file_path = strlen(file_path); - - char *send_buffer = malloc(len_file_path+2); - if (!send_buffer) { - perror("malloc in send announce"); - return; - } - send_buffer[0] = 3; - send_buffer[1] = len_file_path; - - memcpy(send_buffer + 2, file_path, len_file_path); - - size_t bytes_to_send = len_file_path + 2; - if (send_all(fd, send_buffer, &bytes_to_send) == -1) - perror("send_all"); - - free(send_buffer); -} +// ---------------------------------------------------------------------------------------------------------- +// 5) PROTOCOL REPLY AND COMMAND FUNCTIONS IMPLEMENTATIONS +// ---------------------------------------------------------------------------------------------------------- +/* + given the tcpfd of a user under the impression there is new data, handles the command logic + 1) recv the command type (only 1 byte, so ok) + 2) check for each command type + 3) after basic error checking, calls the function that executes the reply (this may be threaded out) + note: returns 0 on failure and will destroy the user (returns 1 on suncess) +*/ +int handle_client_command(int clientfd) { + uint8_t command_type = -1; + if (recv(clientfd, &command_type, 1, 0) <= 0) // check user has disconnected + { + if (l) printf("socket %d HUNGUP. lost connection.\n", clientfd); -void send_invalid_reply(int fd, size_t message_size, char* message) { - char *send_buffer = malloc(message_size+2); - if (!send_buffer) { - perror("malloc in send invalid command"); - return; + destroy_user(clientfd); + return 0; } - // type and payload size - send_buffer[0] = 4; - send_buffer[1] = message_size; - - memcpy(send_buffer + 2, message, message_size); - - int bytes_to_send = message_size + 2; - if (send_all(fd, send_buffer, &bytes_to_send) == -1) - perror("send"); - - free(send_buffer); -} - -void *send_stationsinfo_reply(void * arg) { - int fd = (int) arg; - - if (l) printf("sending STATIONSINFO reply to socket %d\n", fd); + // check for each command type + if (command_type == HELLO) { + // if user already sent a HELLO, send back in invalid command + // since only the only the handshake handles HELLO + if (l) printf("received an extraneous HELLO from socket %d. sending INVALID reply.\n", clientfd); - uint32_t reply_size = 0; - for (int i = 0; i < num_stations; i++) { - if (stations[i].readfd != -1) - reply_size += snprintf(NULL, 0, "station #%d -> %s\n", i, stations[i].filePath); + char * message = "must not sent more than one Hello message"; + send_invalid_reply(clientfd, strlen(message), message); + destroy_user(clientfd); + return 0; } - reply_size--; // don't want final \n - // send type - uint8_t reply_num = 6; - if (send(fd, &reply_num, 1, 0) == -1) - perror("send in send stations info"); - - // send payload size - uint32_t reply_size_endian = htonl(reply_size); - int bytes = sizeof(uint32_t); - if (send_all(fd, &reply_size_endian, &bytes) == -1) - perror("send_all in send stations info"); - - char send_buffer[reply_size]; - int ptr = 0; - for (int i = 0; i < num_stations; i++) { - ptr += sprintf(send_buffer + ptr, "station %d -> %s\n", i, stations[i].filePath); + if (command_type == SETSTATION) { + if(!handle_setstation_command(clientfd)) { // failure + // remove user + destroy_user(clientfd); + return 0; + } + return 1; // sucess, return 1 } - int bytes_to_send = reply_size; // don't want final \n - if (send_all(fd, &send_buffer, &bytes_to_send) == -1) - perror("send_all buffer"); - - return (NULL); -} - -// Parses a buffer into tokens, from cs33 :) -int parse(char buffer[LINE_MAX], char *tokens[LINE_MAX / 2]) { - const char *regex = " \n\t\f\r"; - char *current_token = strtok(buffer, regex); - if (current_token == NULL) return 0; + if (command_type == LISTSTATIONS) { + if (l) printf("received a LISTSTATIONS from socket %d\n", clientfd); - for (int i = 0; current_token != NULL; i++) { - tokens[i] = current_token; - current_token = strtok(NULL, regex); + pthread_t send_thread; // send STATIONINFO from a different thread + pthread_create(&send_thread, NULL, send_stationsinfo_routine, clientfd); + return 1; } - return 1; + // if we're here, we got an invalid/unknown command + if (l) printf("received unknown command type %d from socket %d. sending INVALID reply.\n", command_type, clientfd); + char *message = "invalid command type"; + send_invalid_reply(clientfd, strlen(message), message); + destroy_user(clientfd); + return 0; } uint16_t handle_handshake(int newfd) @@ -921,9 +840,7 @@ uint16_t handle_handshake(int newfd) return ntohs(udp_port); } - - -int send_welcome_reply(int fd) { +uint8_t send_welcome_reply(int fd) { // returns 0 on failure if(l) printf("sending WELCOME reply to socket %d\n", fd); struct Welcome welcome; @@ -931,108 +848,266 @@ int send_welcome_reply(int fd) { welcome.numStations = htons(num_stations); int bytes_to_send = sizeof(struct Welcome); if (send_all(fd, &welcome, &bytes_to_send) == -1) { - perror("send_all (in init_user_routine)"); - return -1; + perror("send_all (in send_welcome_reply)"); + return 0; } return 1; } -int handle_setstation_command(int sockfd) { +/* + given the socket fd of the user, handles the SETSTATION command + note: returns 0 on failure and DOES NOT remove the user +*/ +uint8_t handle_setstation_command(int tcpfd) { // get the station number uint16_t station_number; int bytes_to_read = sizeof(uint16_t); - if (recv_all(sockfd, &station_number, &bytes_to_read) == -1) { + if (recv_all(tcpfd, &station_number, &bytes_to_read) == -1) { perror("recv_all"); return 0; } station_number = ntohs(station_number); // check if user has a udpPort to stream to - if (user_data[sockfd_to_user[sockfd]].udpPort == -1) { - if (l) printf("received a SETSTATION from socket %d before HELLO command. sending INVALID reply.\n", sockfd); + if (users[tcpfd_to_user[tcpfd]].udpPort == -1) { + if (l) printf("received a SETSTATION from socket %d before HELLO command. sending INVALID reply.\n", tcpfd); // send back in invalid command and drop user char * message = "must send Hello message first"; - send_invalid_reply(sockfd, strlen(message), message); + send_invalid_reply(tcpfd, strlen(message), message); return 0; } - if (l) printf("received a SETSTATION from socket %d with station_number %u\n", sockfd, station_number); + if (l) printf("received a SETSTATION from socket %d with station_number %u\n", tcpfd, station_number); // check if station num is in range if (station_number >= num_stations || station_number < 0) { - if (l) printf("station number invalid from socket %d. sending INVALID reply.\n", sockfd); + if (l) printf("station number invalid from socket %d. sending INVALID reply.\n", tcpfd); // send back in invalid command and drop user char * message = "station number out of range"; - send_invalid_reply(sockfd, strlen(message), message); + send_invalid_reply(tcpfd, strlen(message), message); return 0; } // check if station num has been removed if (stations[station_number].readfd == -1) { - send_stationshutdown_reply(sockfd); + send_stationshutdown_reply(tcpfd); return 1; } - update_user_station(sockfd, station_number); - send_announce_reply(sockfd, station_number); + update_user_station(tcpfd, station_number); + send_announce_reply(tcpfd, station_number); return 1; } -void send_stationshutdown_reply(int fd) { - if (l) printf("sending STATIONSHUTDOWN reply to socket %d\n", fd); +void update_user_station(int tcpfd, int stationNum) { + pthread_mutex_lock(&mutex_users); + users[tcpfd_to_user[tcpfd]].stationNum = stationNum; + pthread_mutex_unlock(&mutex_users); +} - uint8_t reply_num = 7; - if (send(fd, &reply_num, 1, 0) == -1) - perror("send in send stationshutdown"); +/* + given the station number of a station (as void* arg), sends an ANNOUNCE reply to all users on that station + note: this is a thread routine, but may not always be run in a thread +*/ +void *send_announce_routine(void *arg) { + // unpack arg + int station_num = (int) arg; + // send the announce messages + for (int i = 0; i < max_active_users; i++) + { + if (users[i].tcpfd == 0 || users[i].tcpfd == -1) + continue; + // send announce reply to each user + if (users[i].stationNum == station_num) + send_announce_reply(users[i].tcpfd, station_num); + } } -void destroy_station(int station_num) { - // check if station num is in range - if (station_num >= num_stations || station_num < 0) { - printf("remove: station number %d is out of range\n", station_num); +/* + given the socket fd and station number of the user, sends an ANNOUNCE reply to that user +*/ +void send_announce_reply(int fd, int station_num) { + if (l) printf("sending ANNNOUNCE reply to socket %d\n", fd); + + // get the file path + char* file_path = stations[station_num].filePath; + int len_file_path = strlen(file_path); + + // make & fill the buffer to send + char *send_buffer = malloc(len_file_path+2); + if (!send_buffer) { + perror("malloc in send announce"); return; } - // check if station had been removed - if (stations[station_num].readfd == -1) { - printf("remove: station %d has already been removed\n", station_num); + send_buffer[0] = ANNOUNCE; + send_buffer[1] = len_file_path; + memcpy(send_buffer + 2, file_path, len_file_path); + + // send it + int bytes_to_send = len_file_path + 2; + if (send_all(fd, send_buffer, &bytes_to_send) == -1) + perror("send_all"); + + // cleanup + free(send_buffer); +} + +void send_invalid_reply(int fd, size_t message_size, char* message) { + if (l) printf("sending INVALID reply to socket %d\n", fd); + + char *send_buffer = malloc(message_size+2); + if (!send_buffer) { + perror("malloc in send invalid command"); return; } - // send the stationshutdown command to users - for (int j = 0; j < max_active_users; j++) { - if (!user_data[j].sockfd || user_data[j].sockfd == -1) - continue; - if (user_data[j].stationNum == station_num) { - send_stationshutdown_reply(user_data[j].sockfd); - user_data[j].stationNum = -1; - } + // make & fill the buffer to send + send_buffer[0] = INVALID; + send_buffer[1] = message_size; + memcpy(send_buffer + 2, message, message_size); + + // send! + int bytes_to_send = message_size + 2; + if (send_all(fd, send_buffer, &bytes_to_send) == -1) + perror("send"); + + free(send_buffer); +} + +// ---------------------------------------------------------------------------------------------------------- +// 6) PROTOCOL REPLY AND COMMAND FUNCTIONS IMPLEMENTATIONS +// ---------------------------------------------------------------------------------------------------------- +/* + parses a buffer into tokens, from cs33 :) +*/ +int parse(char buffer[COMMAND_LINE_MAX], char *tokens[COMMAND_LINE_MAX / 2]) +{ + const char *regex = " \n\t\f\r"; + char *current_token = strtok(buffer, regex); + if (current_token == NULL) return 0; + + for (int i = 0; current_token != NULL; i++) { + tokens[i] = current_token; + current_token = strtok(NULL, regex); } - // cancel the stream's thread and close the read fd - pthread_cancel(stations[station_num].streamThread); - close(stations[station_num].readfd); - stations[station_num].readfd = -1; + return 1; +} - printf("remove: successfully removed station %d\n", station_num); +/* + given a file descriptor to write to, prints the station/user info in the desired format + note: it will close the fd inputted, except for stdout +*/ +void print_info_routine(int fd) { + // for each station, print the info + for (int i = 0; i < num_stations; i++) { + + // prints each station in the desired format + write_int_to_fd(fd, i); + char *comma = ","; + write(fd, comma, strlen(comma)); + + // write file path + char* file_path = stations[i].filePath; + write(fd, file_path, strlen(file_path)); + + // go through users, and print the udp ports + for (int j = 0; j < max_active_users; j++) { + if (!users[j].tcpfd || users[j].tcpfd == -1) + continue; + if (users[j].stationNum == i) { + char *localhost_ip = ",127.0.0.1:"; //TODO: possibly update + write(fd, localhost_ip, strlen(localhost_ip)); + // write udpPort + write_int_to_fd(fd, users[j].udpPort); + } + } + // wrtie new line + char *newline = "\n"; + write(fd, newline, strlen(newline)); + } + + // close the fd + if (fd != STDOUT_FILENO) close(fd); } -void send_newstation_reply(uint16_t station_num) { - if (l) printf("sending NEWSTATION reply to all sockets\n"); +/* helper to write int as a string format buffer to an fd */ +void write_int_to_fd(int fd, int n) { + int len = snprintf(NULL, 0, "%d", n); + char *num = malloc(len + 1); + if (!num) { perror("malloc write to fd"); return; } - // make the struct - uint8_t reply_num = 8; - uint16_t station_num_n = htons(station_num); - struct NewStation new_station = {reply_num, station_num_n}; - // send the message to each (valid) user + snprintf(num, len + 1, "%d", n); + if (write(fd, num, strlen(num)) == -1) { + perror("write"); + } + free(num); +} +void *print_user_data(int index) { + printf("user: %d -> udpPort: %d, stationNum: %d, tcpfd: %d\n", index, + users[index].udpPort, users[index].stationNum, users[index].tcpfd); +} +void *print_station_data(int station) { + printf("station: %d -> filePath: %s, readfd: %d\n", + station, stations[station].filePath, stations[station].readfd); +} +/* + to be called for a "graceful" exit + note: this will not close the listener +*/ +void cleanup_readfds_and_sockets() { + // close all the file descriptors for the stations + for (int i = 0; i < num_stations; i++) { + close(stations[i].readfd); + } + // close all the tcp connections for (int i = 0; i < max_active_users; i++) { - if (!user_data[i].sockfd || user_data[i].sockfd == -1) - continue; - int bytes_to_send = sizeof(struct NewStation); - if (send_all(user_data[i].sockfd, &new_station, &bytes_to_send) == -1) - perror("send_all in newstation reply"); + if (users[i].tcpfd != -1) { + close(users[i].tcpfd); + } + } +} + +// ---------------------------------------------------------------------------------------------------------- +// 7) EXTRA CREDIT IMPLEMENTATIONS (no broad function comments below, but it is all pretty self explanatory) +// ---------------------------------------------------------------------------------------------------------- +void *send_stationsinfo_routine(void * arg) { + int fd = (int) arg; + + if (l) printf("sending STATIONSINFO reply to socket %d\n", fd); + + uint32_t reply_size = 0; + for (int i = 0; i < num_stations; i++) { + if (stations[i].readfd != -1) + reply_size += snprintf(NULL, 0, "station #%d -> %s\n", i, stations[i].filePath); + } + reply_size--; // don't want final \n + + // send type + uint8_t reply_num = 6; + if (send(fd, &reply_num, 1, 0) == -1) + perror("send in send stations info"); + + // send payload size + uint32_t reply_size_endian = htonl(reply_size); + int bytes = sizeof(uint32_t); + if (send_all(fd, &reply_size_endian, &bytes) == -1) + perror("send_all in send stations info"); + + char send_buffer[reply_size]; + int ptr = 0; + for (int i = 0; i < num_stations; i++) { + if (stations[i].readfd != -1) + ptr += sprintf(send_buffer + ptr, "station #%d -> %s\n", i, stations[i].filePath); } + + int bytes_to_send = reply_size; // don't want final \n + if (send_all(fd, &send_buffer, &bytes_to_send) == -1) + perror("send_all buffer"); + + return (NULL); } void add_station(char *file_path) { @@ -1080,20 +1155,66 @@ void add_station(char *file_path) { pthread_create(&stations[num_stations].streamThread, NULL, stream_routine, num_stations); send_newstation_reply(num_stations); - num_stations++; printf("add: successfully created station @ index %d\n", num_stations); + + num_stations++; } -void cleanup_readfds_and_sockets() { - // close all the file descriptors for the stations - for (int i = 0; i < num_stations; i++) { - close(stations[i].readfd); +void destroy_station(int station_num) { + // check if station num is in range + if (station_num >= num_stations || station_num < 0) { + printf("remove: station number %d is out of range\n", station_num); + return; + } + // check if station had been removed + if (stations[station_num].readfd == -1) { + printf("remove: station %d has already been removed\n", station_num); + return; } - // close all the tcp connections - for (int i = 0; i < max_active_users; i++) { - if (user_data[i].sockfd != -1) { - close(user_data[i].sockfd); + // send the stationshutdown command to users + for (int j = 0; j < max_active_users; j++) { + if (!users[j].tcpfd || users[j].tcpfd == -1) + continue; + if (users[j].stationNum == station_num) { + send_stationshutdown_reply(users[j].tcpfd); + users[j].stationNum = -1; } } + + // cancel the stream's thread and close the read fd + pthread_cancel(stations[station_num].streamThread); + close(stations[station_num].readfd); + stations[station_num].readfd = -1; + + printf("remove: successfully removed station %d\n", station_num); } + +void send_newstation_reply(uint16_t station_num) { + if (l) printf("sending NEWSTATION reply to all sockets\n"); + + // make the struct + uint8_t reply_num = 8; + uint16_t station_num_n = htons(station_num); + struct NewStation new_station = {reply_num, station_num_n}; + // send the message to each (valid) user + for (int i = 0; i < max_active_users; i++) { + if (!users[i].tcpfd || users[i].tcpfd == -1) + continue; + int bytes_to_send = sizeof(struct NewStation); + if (send_all(users[i].tcpfd, &new_station, &bytes_to_send) == -1) + perror("send_all in newstation reply"); + } +} + +void send_stationshutdown_reply(int fd) { + if (l) printf("sending STATIONSHUTDOWN reply to socket %d\n", fd); + + uint8_t reply_num = 7; + if (send(fd, &reply_num, 1, 0) == -1) + perror("send in send stationshutdown"); +} + +// --------------------------------------------------------------------------------------------------------- +// EOF :) +// ---------------------------------------------------------------------------------------------------------
\ No newline at end of file diff --git a/snowcast_control b/snowcast_control Binary files differindex e76cedf..4b59e54 100755 --- a/snowcast_control +++ b/snowcast_control diff --git a/snowcast_listener b/snowcast_listener Binary files differindex f27a5c1..455fcdc 100755 --- a/snowcast_listener +++ b/snowcast_listener diff --git a/snowcast_server b/snowcast_server Binary files differindex d3f5e04..9733366 100755 --- a/snowcast_server +++ b/snowcast_server |