/* author: sotech117 date: 9/25/2023 course: csci1680 description: server for snowcast, a music streaming service disclaimer: I have outlined the structure of the server by categorizing the code into sections. Each category is implemented in the order that it appears in the declaration below. enjoy :) */ #include #include #include #include #include #include #include #include #include #include #include #include #include "protocol.c" // ---------------------------------------------------------------------------------------------------------- // 0) MACROS // ---------------------------------------------------------------------------------------------------------- #define COMMAND_LINE_MAX 1024 #define BROADCAST_OFFSET 10000 // how long to "allow" the threads to broadcast #define MAX_RATE_PER_SECOND 16 * 1024 // stream rate per second #define BROADCASTS_PER_SECOND 2 // how often to "seek" the file #define FILE_READ_SIZE MAX_RATE_PER_SECOND / BROADCASTS_PER_SECOND // ---------------------------------------------------------------------------------------------------------- // 1) STATION DATA AND FUNCTIONS // ---------------------------------------------------------------------------------------------------------- typedef struct station { pthread_t streamThread; int readfd; char *filePath; } station_t; int num_stations; pthread_mutex_t mutex_stations = PTHREAD_MUTEX_INITIALIZER; station_t *stations; int setup_stations(int argc, char *argv[]); void *stream_routine(void *arg); void stream_routine_cleanup(void *arg); int read_file(int fd, char buffer[FILE_READ_SIZE], int station_num); // ---------------------------------------------------------------------------------------------------------- // 2) USER DATA AND FUNCTIONS // ---------------------------------------------------------------------------------------------------------- typedef struct user { int tcpfd; int stationNum; int udpPort; // need this (& the following) for udp packet sending int udpfd; socklen_t addrlen; struct sockaddr addr; } user_t; int max_active_users = 0; // maximum number of users that have been connected at once int max_tcpfd = 0; // maximum tcpfd for users that has been seen pthread_mutex_t mutex_users = PTHREAD_MUTEX_INITIALIZER; user_t *users; int *tcpfd_to_user; // maps tcpfd to user index void init_user(int tcpfd, int udp_port); void destroy_user(int tcpfd); // ---------------------------------------------------------------------------------------------------------- // 3) UDP STREAM DATA AND FUNCTIONS // ---------------------------------------------------------------------------------------------------------- struct send_udp_packet_routine_args { int user_index; int buffer_size; char *file_buffer; }; // not used, but nice to have as a guideline int start_threads = 0; // for synchronization, with the cond pthread_cond_t cond = PTHREAD_COND_INITIALIZER; // for synchronization, with the following routine void *send_udp_packet_routine(void* arg); int setup_udp_connection(int udp_port, int *udp_sockfd, socklen_t *addrlen, struct sockaddr *addr); int send_all_udp(int udp_sockfd, char *buf, int *len, struct sockaddr *addr, socklen_t addrlen); // ---------------------------------------------------------------------------------------------------------- // 4) TCP LISTENER DATA AND FUNCTIONS // -------------------------------------------------------------------------å--------------------------------- fd_set master; // master file list for socket descriptors int fdmax; // maximum file descriptor number int setup_listener(const char* port); int handle_new_connection(int listener); void *select_routine(void *arg); // ---------------------------------------------------------------------------------------------------------- // 5) PROTOCOL REPLY AND COMMAND FUNCTIONS // ---------------------------------------------------------------------------------------------------------- int handle_client_command(int clientfd); uint16_t handle_handshake(int newfd); uint8_t send_welcome_reply(int fd); uint8_t handle_setstation_command(int fd); void update_user_station(int tcpfd, int stationNum); // setstation helper void *send_announce_routine(void* arg); void send_announce_reply(int fd, int station_num); // called in send_announce_routine void send_invalid_reply(int fd, size_t message_size, char* message); // ---------------------------------------------------------------------------------------------------------- // 6) COMMAND LINE INTERFACE FUNCTIONS // ---------------------------------------------------------------------------------------------------------- uint8_t l = 0; // boolean for logging -> if (l) { printf("..."); } int parse(char buffer[COMMAND_LINE_MAX], char *tokens[COMMAND_LINE_MAX / 2]); void print_info(int fd); void write_int_to_fd(int fd, int n); // helper to print_info_routine void print_user_data(int index); void print_station_data(int station); void cleanup_readfds_and_sockets(); // --------------------------------------------------------------------------------------------------------- // 7) EXTRA CREDIT // --------------------------------------------------------------------------------------------------------- // ability for client to receive station list void send_stationinfo_reply(int clientfd); // sends station info (index, name) to client // ability for server to add/remove stations through command line void add_station(char *station_name); void send_newstation_reply(uint16_t station_num); // notifies users that a new station has been added void destroy_station(int station_num); void send_stationshutdown_reply(int fd); // sends to users if listening to a station that is destroyed // ---------------------------------------------------------------------------------------------------------- // 0) MAIN FUNCTION // ---------------------------------------------------------------------------------------------------------- main(int argc, char *argv[]) // no int here for good luck :) { // CHECK AND USE ARGUMENTS // ------------------------------------------------------------------------------------------------- if (argc < 3) { fprintf(stderr,"usage: ./snowcast_server [file 1] [file 2] ... \n"); exit(1); } // get listener's port const char* server_port = argv[1]; // initialize the stations & their threads, from if (setup_stations(argc, argv) == -1) { perror("setup_stations"); exit(1); } // INTIALIZE USER DATA STRUCTURES // ------------------------------------------------------------------------------------------------- // make pointers user data structures users = malloc(sizeof(user_t) * max_active_users); if (!users) { perror("malloc userdata"); return 1; } tcpfd_to_user = malloc(sizeof(int) * max_active_users); if (!tcpfd_to_user) { perror("malloc tcpfd to user"); return 1; } // LISTENER LIST. SOCKER, and SELECT THREAD // ------------------------------------------------------------------------------------------------- // setup the listener fd to accept new connections FD_ZERO(&master); // clear the master set int listenerfd = setup_listener(server_port); // make and start "select" thread that manages: // 1) new connections, 2) requests from current connections, 3) closing connections pthread_t select_thread; pthread_create(&select_thread, NULL, select_routine, (void *) &listenerfd); // BELOW IS FOR THE COMMAND LINE INTERFACE // -------------------------------------------------------------------------------------------------- // command line data structures char input[COMMAND_LINE_MAX]; memset(input, 0, COMMAND_LINE_MAX); char *tokens[COMMAND_LINE_MAX / 2]; printf("snowcast_server> "); // very cute to have :) fflush(stdout); while (read(STDIN_FILENO, input, COMMAND_LINE_MAX) > 0) { // init tokens memset(tokens, 0, COMMAND_LINE_MAX / 2); // if 0, all whitespace if (!parse(input, tokens)) { printf("snowcast_server> "); fflush(stdout); continue; } char *command = tokens[0]; // if "q" command: shutdown & close tcp sockets! if (!strcmp(command, "q")) { printf("Exiting.\n"); close(listenerfd); cleanup_readfds_and_sockets(); break; } // if "p" command: print info else if (!strcmp(command, "p")) { // get the file path to print to int print_fd; // see if there is a file path to print to, // if so, open it into print_fd char *output_file_path = tokens[1]; if (output_file_path != NULL) { if ((print_fd = open(output_file_path, O_CREAT | O_WRONLY | O_TRUNC, S_IRWXU)) == -1) { perror("open"); memset(input, 0, COMMAND_LINE_MAX); continue; } } else print_fd = STDOUT_FILENO; // print the info print_info(print_fd); } // if "u" command: print user data (debugging) else if (!strcmp(command, "u")) for (int i = 0; i < max_active_users; i++) print_user_data(i); // if "s" command: print station data (debugging) else if (!strcmp(command, "s")) for (int i = 0; i < num_stations; i++) print_station_data(i); // if "log" command: start logging (debugging) else if (!strcmp(command, "log")) { l= !l; printf("logging is now %s\n", l ? "on" : "off"); } // if "add" command: add a station (EXTRA CREDIT) else if (!strcmp(command, "add")) { // add a new station // get the file path char *file_path = tokens[1]; if (file_path == NULL) { printf("add: must provide a file path\n"); } else add_station(file_path); // add the station } // if "remove" command: remove a station (EXTRA CREDIT) else if (!strcmp(command, "remove")) { // remove a station if (tokens[1] == NULL) { printf("remove: must provide a station number to remove\n"); } else if ((tokens[1][0] != '0' && atoi(tokens[1]) == 0)) { printf("remove: must provide a number\n"); } else destroy_station(atoi(tokens[1])); } else { printf("unknown command: %s\n", command); } memset(input, 0, COMMAND_LINE_MAX); printf("snowcast_server> "); fflush(stdout); } return 0; } // ---------------------------------------------------------------------------------------------------------- // 1) STATION DATA AND FUNCTIONS IMPLEMENTATIONS // ---------------------------------------------------------------------------------------------------------- /* given the command line arguments, setups the stations. 1) mallocs the stations pointer 2) assigns the stations' variables (filepath & opens file into readfd) 3) starts the routine for each station */ int setup_stations(int argc, char *argv[]) { num_stations = argc - 2; // get the size to malloc int totalSize = 0; for(int i = 2; i < argc; i++) { totalSize += sizeof(pthread_t) + sizeof(int) + strlen(argv[i]); } // malloc the stations array stations = malloc(totalSize); if (!stations) { perror("malloc (stations pointer)"); return -1; } // assign the stations, and start the threads for (int i = 0; i < num_stations; i++) { stations[i].filePath = argv[i+2]; stations[i].readfd = open(argv[i+2], O_RDONLY); if (stations[i].readfd < 0) { perror("read (from station file)"); return -1; } pthread_create(&stations[i].streamThread, NULL, stream_routine, (void *) (uint64_t) i); // uint64_t to avoid warning in compiler } return 1; } /* thread route for each station, given the station num 1) reads the section of the file into a buffer 2) creates a thread for each user that is listening to the station 3) waits for the threads to be created, then broadcasts the cond variable to let them run note: you can modify how often and how much is read off the file by changing the MACROS */ void *stream_routine(void *arg) { uint64_t station_num = (uint64_t) arg; // printf("stream routine %d\n", station_num); int read_fd = stations[station_num].readfd; pthread_cleanup_push(stream_routine_cleanup, (void *) &read_fd); // make buffer for read_file char buffer[FILE_READ_SIZE]; for (;;) { // load bytes from file into buffer memset(buffer, 0, FILE_READ_SIZE); int bytes_read = read_file(read_fd, buffer, station_num); if (bytes_read == -1) { return (NULL); } // create the threads to send packets to users, which will be released later int *send_buffer; int sent = 0; for (int i = 0; i < max_active_users; i++) { if (!users[i].tcpfd || users[i].tcpfd == -1) continue; if (users[i].stationNum == station_num) { // prepare the send buffer // (note: using int* for easy pointer assignment) send_buffer = malloc(2 + bytes_read); if (!send_buffer) { perror("malloc send_buffer in stream_routine"); return (NULL); } send_buffer[0] = i; send_buffer[1] = bytes_read; memcpy(send_buffer+2, buffer, bytes_read); // make thread to send packet data pthread_t send_udp_packet_thread; pthread_create(&send_udp_packet_thread, NULL, send_udp_packet_routine, send_buffer); sent = 1; } } // wait for the thread to be created usleep(1000000 / BROADCASTS_PER_SECOND - BROADCAST_OFFSET); // do -1000 for the usleep below // let the threads run! start_threads = 1; pthread_cond_broadcast(&cond); // give some time to broadcast, then reset variables usleep(BROADCAST_OFFSET); start_threads = 0; // free the buffer after it's been sent if (sent) free(send_buffer); } return (NULL); pthread_cleanup_pop(1); } void stream_routine_cleanup(void *arg) { // closes the read fd if thread is cancelled int read_fd = * (int *) arg; close(read_fd); } /* reads the file into a buffer. returns the number of bytes read note: if it reaches the end of a file, it will send an ANNOUNCE message to all users */ int read_file(int fd, char buffer[FILE_READ_SIZE], int station_num) { // see if fd was closed at some point if (fd == -1) return -1; int bytes_read = read(fd, buffer, FILE_READ_SIZE); if (bytes_read < 0) { perror("read (in read file)"); return -1; } // printf("bytes read: %d\n", bytes_read); if (bytes_read == 0) { // printf("end of file, restarting\n"); pthread_t send_announce_thread; pthread_create(&send_announce_thread, NULL, send_announce_routine, (void *) (uint64_t) station_num); if (lseek(fd, 0, SEEK_SET) == -1) { perror("lseek (in resarting file)"); return -1; } bytes_read = read(fd, buffer, FILE_READ_SIZE); if (bytes_read < 0) { perror("read (in read file, after restart)"); return -1; } } return bytes_read; } // ---------------------------------------------------------------------------------------------------------- // 2) USER DATA AND FUNCTIONS IMPLEMENTATIONS // ---------------------------------------------------------------------------------------------------------- /* given the newfd and udp_port of a newly connected user, initializes them to the data structure 1) update the tcpfd_to_user map size if needed 2) find the first available index in the users array 3) make a new user, set the tcp info & udp info 4) map the tcpfd to the user index 5) send WELCOME reply note: this function does its best to optimize memory (dynamic resize of tcpfd map & resusing space in array) */ void init_user(int newfd, int udp_port) { pthread_mutex_lock(&mutex_users); // FOLLOWING IS FOR MEMORY OPTIMIZATION // if the newfd is larger than the max, we need to resize the array if(newfd > max_tcpfd) { max_tcpfd = newfd*2; // double the array int *more_tcpfd_to_user = realloc(tcpfd_to_user, sizeof(int) * (max_tcpfd + 1)); if (!more_tcpfd_to_user) { perror("realloc in init_user1"); exit(1); } tcpfd_to_user = more_tcpfd_to_user; } // let's check if we can resuse the space from a previous user int running_index = 0; while(running_index < max_active_users) { // -1 on tcpfd indicates that we have directly assigned it when "destroying a user" // -> so a previous user was there, but discconnected if (users[running_index].tcpfd == -1) { break; // we found an index } running_index++; } if (running_index == max_active_users) { // if we're here, we went through the whole array to no avail, // so we need to extend users array max_active_users++; user_t *more_users = realloc(users, sizeof(user_t) * max_active_users); if (!more_users) { perror("realloc in init_user2"); exit(1); } users = more_users; } // make new user, set tcp info user_t new_user; new_user.tcpfd = newfd; new_user.stationNum = -1; new_user.udpPort = udp_port; // set the udp info if(setup_udp_connection(udp_port, &new_user.udpfd, &new_user.addrlen, &new_user.addr) == -1) { fprintf(stderr, "failure in init_user (setup_udp_connection)"); return; // error occured } // map TCP tcpfd to this user index users[running_index] = new_user; tcpfd_to_user[newfd] = running_index; pthread_mutex_unlock(&mutex_users); // successfully created user, let's send the welcome send_welcome_reply(newfd); } /* given the tcpfd of a user, destroys them from the data structures 1) close both tcpfd and udpfd of user 2) remove from master set 3) remove from data structures note: destroying a user sets all fields of that user to -1. often, this is how it's known if a user exists */ void destroy_user(int tcpfd) { close(tcpfd); // close tcpfd! FD_CLR(tcpfd, &master); // remove from master set if (tcpfd_to_user[tcpfd] == -1) return; // if -1, user has already been destroyed close(users[tcpfd_to_user[tcpfd]].udpfd); // close udpfd // remove user from data structures pthread_mutex_lock(&mutex_users); users[tcpfd_to_user[tcpfd]] = (user_t) { -1, -1, -1, -1, 0, 0 }; // set all fields to -1 or 0 // map tcpfd to -1 tcpfd_to_user[tcpfd] = -1; pthread_mutex_unlock(&mutex_users); } // ---------------------------------------------------------------------------------------------------------- // 3) UDP STREAM DATA AND FUNCTIONS IMPLEMENTATIONS // ---------------------------------------------------------------------------------------------------------- /* thread routine that streams the binary data to the user 1) unpack the arguments (see struct send_udp_packet_routine_args) 2) setup udp socket 3) wait for the cond variable to be broadcasted 4) send the data over udp */ void *send_udp_packet_routine(void *arg) { // unpack args int *buf = arg; int user_index = buf[0]; int buffer_size = buf[1]; char *data_ptr = (char *) (buf + 2); // add two will skip first ints, since int* buf // setup variables int did_work = 1; pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER; pthread_mutex_lock(&m); // wait for threads did_work = 0; while (!start_threads) { pthread_cond_wait(&cond, &m); } int station_num = users[user_index].stationNum; if (station_num == -1) { did_work = 1; } // send the data! (no error check since udp) send_all_udp(users[user_index].udpfd, (char *) data_ptr, &buffer_size, &users[user_index].addr, users[user_index].addrlen); pthread_mutex_unlock(&m); return (NULL); } /* given the udp_port, makes a socket and returns it's file descriptor. (returns -1 on failure) note: pointers to the desired vairables to be set (udp_sockfd, addrlen, addr) must be passed in, this function will also set those variables */ int setup_udp_connection(int udp_port, int *udp_sockfd, socklen_t *addrlen, struct sockaddr* addr) { // setup hints to get udp_port struct addrinfo thread_hints, *thread_res, *thread_servinfo; memset(&thread_hints, 0, sizeof thread_hints); thread_hints.ai_family = AF_INET; // use IPv4 only thread_hints.ai_socktype = SOCK_DGRAM; // udp thread_hints.ai_flags = AI_PASSIVE; // fill in my IP for me // convert port uint16 into a string for getaddrinfo int length = snprintf( NULL, 0, "%d", udp_port ); char* port = malloc( length + 1 ); if (!port) { perror("malloc on port"); return -1; } snprintf(port, length + 1, "%d", udp_port); sprintf(port, "%d", udp_port); // resolve host int error_code; if ((error_code = getaddrinfo(NULL, port, &thread_hints, &thread_servinfo)) != 0) { fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(error_code)); return -1; } free(port); // make the socket int sock; for(thread_res = thread_servinfo; thread_res != NULL; thread_res = thread_res->ai_next) { if ((sock = socket(thread_res->ai_family, thread_res->ai_socktype, thread_res->ai_protocol)) == -1) { perror("socket in setup_udp_connection"); continue; } break; } if (sock == -1) { fprintf(stderr, "socket: failed to create udp socket(setup_udp_connection)\n"); return -1; } *udp_sockfd = sock; *addrlen = thread_res->ai_addrlen; *addr = *thread_res->ai_addr; return 1; } /* sends the binary data over udp, but limits individual packets to MAX_PACKET_SIZE note: the use of pointers modifying the arguments */ int send_all_udp(int udp_sockfd, char *buf, int *len, struct sockaddr *addr, socklen_t addrlen) { int total = 0; // how many bytes we've sent int bytesleft = *len; // how many we have left to send int n; while(total < *len) { n = sendto(udp_sockfd, buf+total, MAX_PACKET_SIZE, 0, addr, addrlen); if (n == -1) { break; } total += n; bytesleft -= n; } *len = total; // return number actually sent here return n==-1?-1:0; // return -1 on failure, 0 on success } // ---------------------------------------------------------------------------------------------------------- // 4) TCP LISTENER DATA AND FUNCTIONS IMPLEMENTATIONS // ---------------------------------------------------------------------------------------------------------- /* given the port, sets up the listener socket on that port (getaddrinfo->socket->bind->listen) returns the file descriptor of the socket! note: ends whole program on failure */ int setup_listener(const char* port) { int listener; // listening socket descriptor, to be set // setup and bind to listener struct addrinfo hints, *ai, *p; memset(&hints, 0, sizeof hints); hints.ai_family = AF_INET; // Ipv4 hints.ai_socktype = SOCK_STREAM; // TCP hints.ai_flags = AI_PASSIVE; // auto select int rv; if ((rv = getaddrinfo(NULL, port, &hints, &ai)) != 0) { fprintf(stderr, "snowcast_server: %s\n", gai_strerror(rv)); exit(1); } for(p = ai; p != NULL; p = p->ai_next) { listener = socket(p->ai_family, p->ai_socktype, p->ai_protocol); if (listener < 0) { continue; } // lose the pesky "address already in use" error message int yes=1; setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int)); if (bind(listener, p->ai_addr, p->ai_addrlen) < 0) { close(listener); continue; } break; } // if we got here, it means we didn't get bound if (p == NULL) { fprintf(stderr, "snowcast_server: failed to bind\n"); exit(2); } freeaddrinfo(ai); // all done with this // listen if (listen(listener, 10) == -1) { perror("listen"); exit(3); } // add the listener to the master set FD_SET(listener, &master); // initialize fdmax. listener will be greatest, // since after the stations are cerated fdmax = listener; return listener; } /* calls accept() when the listener sees has a new connection returns 1 on success, 0 on failure */ int handle_new_connection(int listener) { // accept new connection struct sockaddr_storage remoteaddr; socklen_t addrlen = sizeof remoteaddr; int newfd = accept(listener, (struct sockaddr *)&remoteaddr, &addrlen); // check if newfd is valid if (newfd == -1) { perror("accept"); return 0; } // new user, we need to do the handshake uint16_t udp_port = handle_handshake(newfd); if (udp_port == 0) { // 0 is used for err since unsigned can't be negative // drop connection upon error close(newfd); return 0; } // determine if we've seen this udp_port before for (int i = 0; i < max_active_users; i++) { if (users[i].udpPort == udp_port) { // if we have, we need to close the fd and move on fprintf(stderr, "error connecting new user: udp port already in use\n"); return 0; } } // add the user to the set FD_SET(newfd, &master); // add to master set if (newfd > fdmax) { // keep track of the max fdmax = newfd; } init_user(newfd, udp_port); return 1; } /* thread routine that manages the select() call 1) copies the master set 2) calls select() 3) handles new connections or responds on new data from clients */ void *select_routine(void *arg) { int listener = * (int *) arg; // the listener fd fd_set read_fds; // temp file descriptor list for select() FD_ZERO(&read_fds); // clear the temp set for (;;) { read_fds = master; // copy set if (select(fdmax+1, &read_fds, NULL, NULL, NULL) == -1) { perror("select"); exit(4); } // run through the existing connections looking for data to read for(int i = 0; i <= fdmax; i++) { if (!FD_ISSET(i, &read_fds)) { // no new connections continue; } // if listener, then we need to add this new connection if (i == listener) { handle_new_connection(listener); continue; } // if not the listener, then we need to handle new data from the client handle_client_command(i); } } return (NULL); } // ---------------------------------------------------------------------------------------------------------- // 5) PROTOCOL REPLY AND COMMAND FUNCTIONS IMPLEMENTATIONS // ---------------------------------------------------------------------------------------------------------- /* given the tcpfd of a user under the impression there is new data, handles the command logic 1) recv the command type (only 1 byte, so ok) 2) check for each command type 3) after basic error checking, calls the function that executes the reply (this may be threaded out) note: returns 0 on failure and will destroy the user (returns 1 on suncess) */ int handle_client_command(int clientfd) { uint8_t command_type = -1; if (recv(clientfd, &command_type, 1, 0) <= 0) // check user has disconnected { if (l) printf("socket %d HUNGUP. lost connection.\n", clientfd); destroy_user(clientfd); return 0; } // check for each command type if (command_type == HELLO) { // if user already sent a HELLO, send back in invalid command // since only the only the handshake handles HELLO if (l) printf("received an extraneous HELLO from socket %d. sending INVALID reply.\n", clientfd); char * message = "must not sent more than one Hello message"; send_invalid_reply(clientfd, strlen(message), message); destroy_user(clientfd); return 0; } if (command_type == SETSTATION) { if(!handle_setstation_command(clientfd)) { // on failure, remove user destroy_user(clientfd); return 0; } return 1; // sucess, return 1 } if (command_type == LISTSTATIONS) { if (l) printf("received a LISTSTATIONS from socket %d\n", clientfd); send_stationinfo_reply(clientfd); return 1; } // if we're here, we got an invalid/unknown command if (l) printf("received unknown command type %d from socket %d. sending INVALID reply.\n", command_type, clientfd); char *message = "invalid command type"; send_invalid_reply(clientfd, strlen(message), message); destroy_user(clientfd); return 0; } uint16_t handle_handshake(int newfd) { if (l) printf("waiting for HELLO command. new connection on socket %d\n", newfd); // apply timeout to socket since handshake must be completed quickly apply_timeout(newfd); uint8_t command_type = -1; if (recv(newfd, &command_type, 1, 0) <= 0) { // got error or connection closed by client if (l) printf("socket %d HUNGUP in handshake. lost connection.\n", newfd); close(newfd); return 0; } if (command_type != 0) { char * message = "must send a Hello message first"; send_invalid_reply(newfd, strlen(message), message); close(newfd); return 0; } // get the udp port uint16_t udp_port; int bytes_to_read = sizeof(uint16_t); if (recv_all(newfd, (char *) &udp_port, &bytes_to_read) == -1) { perror("recv_all"); close(newfd); return 0; } // remove timeout, as we aren't "waiting" for immediate reply remove_timeout(newfd); if(l) printf("received HELLO from socket %d with udp port %u\n", newfd, ntohs(udp_port)); return ntohs(udp_port); } uint8_t send_welcome_reply(int fd) { // returns 0 on failure if(l) printf("sending WELCOME reply to socket %d\n", fd); struct Welcome welcome; welcome.replyType = 2; welcome.numStations = htons(num_stations); int bytes_to_send = sizeof(struct Welcome); if (send_all(fd, (char *) &welcome, &bytes_to_send) == -1) { perror("send_all (in send_welcome_reply)"); return 0; } return 1; } /* given the socket fd of the user, handles the SETSTATION command note: returns 0 on failure and DOES NOT remove the user */ uint8_t handle_setstation_command(int tcpfd) { // get the station number uint16_t station_number; int bytes_to_read = sizeof(uint16_t); if (recv_all(tcpfd, (char *) &station_number, &bytes_to_read) == -1) { perror("recv_all"); return 0; } station_number = ntohs(station_number); // check if user has a udpPort to stream to if (users[tcpfd_to_user[tcpfd]].udpfd == -1) { if (l) printf("received a SETSTATION from socket %d before HELLO command. sending INVALID reply.\n", tcpfd); // send back in invalid command and drop user char * message = "must send Hello message first"; send_invalid_reply(tcpfd, strlen(message), message); return 0; } if (l) printf("received a SETSTATION from socket %d with station_number %u\n", tcpfd, station_number); // check if station num is in range if (station_number >= num_stations || station_number < 0) { if (l) printf("station number invalid from socket %d. sending INVALID reply.\n", tcpfd); // send back in invalid command and drop user char * message = "station number out of range"; send_invalid_reply(tcpfd, strlen(message), message); return 0; } // check if station num has been removed if (stations[station_number].readfd == -1) { send_stationshutdown_reply(tcpfd); return 1; } update_user_station(tcpfd, station_number); send_announce_reply(tcpfd, station_number); return 1; } void update_user_station(int tcpfd, int stationNum) { pthread_mutex_lock(&mutex_users); users[tcpfd_to_user[tcpfd]].stationNum = stationNum; pthread_mutex_unlock(&mutex_users); } /* given the station number of a station (as void* arg), sends an ANNOUNCE reply to all users on that station note: this is a thread routine, but may not always be run in a thread */ void *send_announce_routine(void *arg) { // unpack arg uint64_t station_num = (uint64_t) arg; // send the announce messages for (int i = 0; i < max_active_users; i++) { if (users[i].tcpfd == 0 || users[i].tcpfd == -1) continue; // send announce reply to each user if (users[i].stationNum == station_num) send_announce_reply(users[i].tcpfd, station_num); } return (NULL); } /* given the socket fd and station number of the user, sends an ANNOUNCE reply to that user */ void send_announce_reply(int fd, int station_num) { if (l) printf("sending ANNNOUNCE reply to socket %d\n", fd); // get the file path char* file_path = stations[station_num].filePath; int len_file_path = strlen(file_path); // make & fill the buffer to send char *send_buffer = malloc(len_file_path+2); if (!send_buffer) { perror("malloc in send announce"); return; } send_buffer[0] = ANNOUNCE; send_buffer[1] = len_file_path; memcpy(send_buffer + 2, file_path, len_file_path); // send it int bytes_to_send = len_file_path + 2; if (send_all(fd, send_buffer, &bytes_to_send) == -1) perror("send_all"); // cleanup free(send_buffer); } void send_invalid_reply(int fd, size_t message_size, char* message) { if (l) printf("sending INVALID reply to socket %d\n", fd); char *send_buffer = malloc(message_size+2); if (!send_buffer) { perror("malloc in send invalid command"); return; } // make & fill the buffer to send send_buffer[0] = INVALID; send_buffer[1] = message_size; memcpy(send_buffer + 2, message, message_size); // send! int bytes_to_send = message_size + 2; if (send_all(fd, send_buffer, &bytes_to_send) == -1) perror("send"); free(send_buffer); } // ---------------------------------------------------------------------------------------------------------- // 6) PROTOCOL REPLY AND COMMAND FUNCTIONS IMPLEMENTATIONS // ---------------------------------------------------------------------------------------------------------- /* parses a buffer into tokens, from cs33 :) */ int parse(char buffer[COMMAND_LINE_MAX], char *tokens[COMMAND_LINE_MAX / 2]) { const char *regex = " \n\t\f\r"; char *current_token = strtok(buffer, regex); if (current_token == NULL) return 0; for (int i = 0; current_token != NULL; i++) { tokens[i] = current_token; current_token = strtok(NULL, regex); } return 1; } /* given a file descriptor to write to, prints the station/user info in the desired format note: it will close the fd inputted, except for stdout */ void print_info(int fd) { // for each station, print the info for (int i = 0; i < num_stations; i++) { // prints each station in the desired format write_int_to_fd(fd, i); char *comma = ","; write(fd, comma, strlen(comma)); // write file path char* file_path = stations[i].filePath; write(fd, file_path, strlen(file_path)); // go through users, and print the udp ports for (int j = 0; j < max_active_users; j++) { if (!users[j].tcpfd || users[j].tcpfd == -1) continue; if (users[j].stationNum == i) { char *localhost_ip = ",127.0.0.1:"; //TODO: possibly update write(fd, localhost_ip, strlen(localhost_ip)); // write udpPort write_int_to_fd(fd, users[j].udpPort); } } // wrtie new line char *newline = "\n"; write(fd, newline, strlen(newline)); } // close the fd if (fd != STDOUT_FILENO) close(fd); } /* helper to write int as a string format buffer to an fd */ void write_int_to_fd(int fd, int n) { int len = snprintf(NULL, 0, "%d", n); char *num = malloc(len + 1); if (!num) { perror("malloc write to fd"); return; } snprintf(num, len + 1, "%d", n); if (write(fd, num, strlen(num)) == -1) { perror("write"); } free(num); } void print_user_data(int index) { printf("user: %d -> tcpfd: %d, stationNum: %d, udpPort: %d, udpfd: %d, addrlen: %d\n", index, users[index].tcpfd, users[index].stationNum, users[index].udpPort, users[index].udpfd, users[index].addrlen); } void print_station_data(int station) { printf("station: %d -> filePath: %s, readfd: %d\n", station, stations[station].filePath, stations[station].readfd); } /* to be called for a "graceful" exit note: this will not close the listener */ void cleanup_readfds_and_sockets() { // close all the file descriptors for the stations for (int i = 0; i < num_stations; i++) { close(stations[i].readfd); } // close all the tcp connections for (int i = 0; i < max_active_users; i++) { if (users[i].tcpfd != -1) { close(users[i].tcpfd); } } } // ---------------------------------------------------------------------------------------------------------- // 7) EXTRA CREDIT IMPLEMENTATIONS (no broad function comments below, but it is all pretty self explanatory) // ---------------------------------------------------------------------------------------------------------- void send_stationinfo_reply(int clientfd) { if (l) printf("sending STATIONSINFO reply to socket %d\n", clientfd); uint32_t reply_size = 0; for (int i = 0; i < num_stations; i++) { if (stations[i].readfd != -1) reply_size += snprintf(NULL, 0, "station #%d -> %s\n", i, stations[i].filePath); } reply_size--; // don't want final \n // send type uint8_t reply_num = 6; if (send(clientfd, &reply_num, 1, 0) == -1) perror("send in send stations info"); // send payload size uint32_t reply_size_endian = htonl(reply_size); int bytes = sizeof(uint32_t); if (send_all(clientfd, (char *) &reply_size_endian, &bytes) == -1) perror("send_all in send stations info"); char send_buffer[reply_size]; int ptr = 0; for (int i = 0; i < num_stations; i++) { if (stations[i].readfd != -1) ptr += sprintf(send_buffer + ptr, "station #%d -> %s\n", i, stations[i].filePath); } int bytes_to_send = reply_size; // don't want final \n if (send_all(clientfd, (char *) &send_buffer, &bytes_to_send) == -1) perror("send_all buffer"); } void add_station(char *file_path) { // run through array and see if we've seen file path before for (int i = 0; i < num_stations; i++) { if (strcmp(stations[i].filePath, file_path) == 0) { // this station is still active if (stations[i].readfd != -1) { printf("add: (warning) filepath %s is already streaming on station %d\n", file_path, i); } else { // this station has been removed // reopen the file stations[i].readfd = open(file_path, O_RDONLY); if (stations[i].readfd < 0) { perror("read (from add station)"); return; } pthread_create(&stations[i].streamThread, NULL, stream_routine, (void *) (uint64_t) i); send_newstation_reply(i); return; } } } // get the size to malloc int totalSize = 0; for(int i = 0; i < num_stations; i++) { totalSize += sizeof(pthread_t) + sizeof(int) + strlen(stations[i].filePath); } totalSize += sizeof(pthread_t) + sizeof(int) + strlen(file_path); // malloc the stations array station_t *more_stations = realloc(stations, totalSize); if (!more_stations) { perror("malloc (stations pointer)"); return; } stations = more_stations; // assign the stations, and start the threads stations[num_stations].filePath = malloc(strlen(file_path)); memcpy(stations[num_stations].filePath, file_path, strlen(file_path)); stations[num_stations].readfd = open(file_path, O_RDONLY); // update fdmax, if applicable if (stations[num_stations].readfd > fdmax) { fdmax = stations[num_stations].readfd; } if (stations[num_stations].readfd < 0) { perror("read (from add station)"); return; } pthread_create(&stations[num_stations].streamThread, NULL, stream_routine, (void *) (uint64_t) num_stations); send_newstation_reply(num_stations); printf("add: successfully created station @ index %d\n", num_stations); num_stations++; } void destroy_station(int station_num) { // check if station num is in range if (station_num >= num_stations || station_num < 0) { printf("remove: station number %d is out of range\n", station_num); return; } // check if station had been removed if (stations[station_num].readfd == -1) { printf("remove: station %d has already been removed\n", station_num); return; } // send the stationshutdown command to users for (int j = 0; j < max_active_users; j++) { if (!users[j].tcpfd || users[j].tcpfd == -1) continue; if (users[j].stationNum == station_num) { send_stationshutdown_reply(users[j].tcpfd); users[j].stationNum = -1; } } // cancel the stream's thread and close the read fd pthread_cancel(stations[station_num].streamThread); close(stations[station_num].readfd); stations[station_num].readfd = -1; printf("remove: successfully removed station %d\n", station_num); } void send_newstation_reply(uint16_t station_num) { if (l) printf("sending NEWSTATION reply to all sockets\n"); // make the struct uint8_t reply_num = 8; uint16_t station_num_n = htons(station_num); struct NewStation new_station = {reply_num, station_num_n}; // send the message to each (valid) user for (int i = 0; i < max_active_users; i++) { if (!users[i].tcpfd || users[i].tcpfd == -1) continue; int bytes_to_send = sizeof(struct NewStation); if (send_all(users[i].tcpfd, (char *) &new_station, &bytes_to_send) == -1) perror("send_all in newstation reply"); } } void send_stationshutdown_reply(int fd) { if (l) printf("sending STATIONSHUTDOWN reply to socket %d\n", fd); uint8_t reply_num = 7; if (send(fd, &reply_num, 1, 0) == -1) perror("send in send stationshutdown"); } // --------------------------------------------------------------------------------------------------------- // EOF :) // ---------------------------------------------------------------------------------------------------------