#include #include #include #include #include #include #include #include #include #include #include #include #include "protocol.c" #define LINE_MAX 1024 #define MAX_USERS 1000 #define MAX_PATH 50 #define MAX_RATE_PER_SECOND 16*1024 / 2 // typedef struct station { // int streamFd; // char* filePath; // int fileBufferSize; // char fileBuffer[MAX_STREAM_RATE]; // } station_t; typedef struct station { pthread_t streamThread; int readfd; char *filePath; } station_t; int num_stations; station_t *stations; int setup_stations(int argc, char *argv[]); void *stream_routine(void *arg); typedef struct user { int udpPort; int stationNum; int sockfd; } user_t; /* For safe condition variable usage, must use a boolean predicate and */ /* a mutex with the condition. */ int count = 0; pthread_cond_t cond = PTHREAD_COND_INITIALIZER; pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; pthread_mutex_t mutex_stations = PTHREAD_MUTEX_INITIALIZER; const char *port; // int num_stations; int start_threads = 0; int max_active_users = 0; int max_sockfd = 0; pthread_mutex_t mutex_user_data = PTHREAD_MUTEX_INITIALIZER; // array from index to user_data user_t *user_data; int *sockfd_to_user; // stations array pointer // station_t *station_data; struct udp_packet_routine_args { int user_index; int buffer_size; char *file_buffer; }; void *send_udp_packet_routine(void* arg); void *select_routine(void* arg); void *sync_routine(void* arg); void *send_announce_routine(void* arg); void init_station(int station_num, const char *station_name); int parse(char buffer[LINE_MAX], char *tokens[LINE_MAX / 2]); void *print_info_routine(void *arg); void *get_in_addr(struct sockaddr *sa); void *init_user(int sockfd); void *update_user_udpPort(int sockfd, int udpPort); void *update_user_station(int sockfd, int stationNum); void *print_user_data(int sockfd); void *print_station_data(int station); void destroy_user(int sockfd); void send_announce_reply(int fd, int station_num); void send_invalid_reply(int fd, size_t message_size, char* message); void *send_stationsinfo_reply(void *arg); int send_welcome_reply(int fd); void handle_setstation_command(int fd); void add_station(char *station_name); void destroy_station(int station_num); void send_stationshutdown_reply(int fd); uint16_t handle_handshake(int newfd); void cleanup_fds(); int l = 0; main(int argc, char *argv[]) { // check and assign arguments if (argc < 3) { fprintf(stderr,"usage: ./snowcast_server [file 1] [file 2] ... \n"); exit(1); } // initizlize the port port = argv[1]; // initialize the stations & their threads if (setup_stations(argc, argv) == -1) { perror("setup_stations"); exit(1); } // make array of user data // printf("max active users: %d\n", sizeof(user_t) * max_active_users); user_data = malloc(sizeof(user_t) * max_active_users); if (!user_data) { perror("malloc userdata"); return 1; } sockfd_to_user = malloc(sizeof(int) * max_active_users); if (!sockfd_to_user) { perror("malloc sockfd to user"); return 1; } // make and start "select" thread that manages: // 1) new connections, 2) requests from current connections, 3) closing connections pthread_t select_thread; pthread_create(&select_thread, NULL, select_routine, NULL); // start syncchronization thread to broadcast stations // pthread_t sync_thread; // pthread_create(&sync_thread, NULL, sync_routine, NULL); // command line interface char input[LINE_MAX]; memset(input, 0, LINE_MAX); char *tokens[LINE_MAX / 2]; printf("snowcast_server> "); fflush(stdout); while (read(STDIN_FILENO, input, LINE_MAX) > 0) { // init tokens memset(tokens, 0, LINE_MAX / 2); // if 0, all whitespace if (!parse(input, tokens)) { printf("snowcast_server> "); fflush(stdout); continue; } char *command = tokens[0]; // if q, shutdown! if (!strcmp(command, "q")) { printf("Exiting.\n"); // TODO: exit better than break cleanup_fds(); break; } // if p, print info else if (!strcmp(command, "p")) { // get the file descriptor int print_fd = 0; // see if there is a file path char *output_file_path = tokens[1]; if (output_file_path != NULL) { if ((print_fd = open(output_file_path, O_CREAT | O_WRONLY | O_TRUNC, S_IRWXU)) == -1) { perror("open"); memset(input, 0, LINE_MAX); continue; } } else { print_fd = STDOUT_FILENO; } // printf("print_fd: %d\n", print_fd); // pthread_t print_info_thread; // pthread_create(&print_info_thread, NULL, print_info_routine, print_fd); print_info_routine((void *)print_fd); // note - this file descriptor is closed in the thread } else if (!strcmp(command, "u")) { for (int i = 0; i < max_active_users; i++) print_user_data(i); } else if (!strcmp(command, "s")) { for (int i = 0; i < num_stations; i++) print_station_data(i); } else if (!strcmp(command, "log")) { l= !l; printf("logging is now %s\n", l ? "on" : "off"); } else if (!strcmp(command, "add")) { // add a new station // get the file path char *file_path = tokens[1]; if (file_path == NULL) { printf("add: must provide a file path\n"); } else add_station(file_path); // add the station } else if (!strcmp(command, "remove")) { // remove a station if (tokens[1] == NULL) { printf("remove: must provide a station number to remove\n"); } else if ((tokens[1] != '\0' && atoi(tokens[1]) == 0)) { printf("remove: must provide a number\n"); } else destroy_station(atoi(tokens[1])); } else { printf("unknown command: %s\n", command); } memset(input, 0, LINE_MAX); printf("snowcast_server> "); fflush(stdout); } return 0; } int read_file(int fd, char buffer[MAX_RATE_PER_SECOND], int station_num) { // see if fd was closed at some point if (fd == -1) return -1; int bytes_read = read(fd, buffer, MAX_RATE_PER_SECOND); if (bytes_read < 0) { perror("read (in read file)"); return -1; } // printf("bytes read: %d\n", bytes_read); if (bytes_read == 0) { // printf("end of file, restarting\n"); pthread_t send_announce_thread; pthread_create(&send_announce_thread, NULL, send_announce_routine, station_num); if (lseek(fd, 0, SEEK_SET) == -1) { perror("lseek (in resarting file)"); return -1; } bytes_read = read(fd, buffer, MAX_RATE_PER_SECOND); if (bytes_read < 0) { perror("read (in read file, after restart)"); return -1; } } return bytes_read; } void *stream_routine_cleanup(void *arg) { int read_fd = (int) arg; close(read_fd); } void *stream_routine(void *arg) { int station_num = (int) arg; // printf("stream routine %d\n", station_num); int read_fd = stations[station_num].readfd; pthread_cleanup_push(stream_routine_cleanup, read_fd); // make buffer which will be used to stream to children char buffer[MAX_RATE_PER_SECOND]; memset(buffer, 0, MAX_RATE_PER_SECOND); // if (!buffer) { perror("malloc (buffer in station thread)"); exit(1); } for (;;) { // load bytes into buffer int bytes_read = read_file(read_fd, buffer, station_num); if (bytes_read == -1) { return (NULL); } // TODO: send buffer to children char *send_buffer = malloc(2 + bytes_read); for (int i = 0; i < max_active_users; i++) { if (!user_data[i].sockfd || user_data[i].sockfd == -1) continue; if (user_data[i].stationNum == station_num) { // send the udp packet int *send_buffer = malloc(2 + bytes_read); memset(send_buffer, 0, 2 + bytes_read); send_buffer[0] = i; send_buffer[1] = bytes_read; memcpy(send_buffer+2, buffer, bytes_read); // printf("sending udp packet to user %d\n", i); pthread_t t; pthread_create(&t, NULL, send_udp_packet_routine, send_buffer); } } free(send_buffer); usleep(1000000 / 2 - 5000); start_threads = 1; pthread_cond_broadcast(&cond); usleep(5000); start_threads = 0; memset(buffer, 0, MAX_RATE_PER_SECOND); } return (NULL); pthread_cleanup_pop(1); } int setup_stations(int argc, char *argv[]) { num_stations = argc - 2; // get the size to malloc int totalSize = 0; for(int i = 2; i < argc; i++) { totalSize += sizeof(pthread_t) + sizeof(int) + strlen(argv[i]); } // malloc the stations array stations = malloc(totalSize); if (!stations) { perror("malloc (stations pointer)"); return -1; } // assign the stations, and start the threads for (int i = 0; i < num_stations; i++) { stations[i].filePath = argv[i+2]; stations[i].readfd = open(argv[i+2], O_RDONLY); if (stations[i].readfd < 0) { perror("read (from station file)"); return -1; } pthread_create(&stations[i].streamThread, NULL, stream_routine, i); } // printf("successfully created %d stations\n", num_stations); return 1; } void write_int_to_fd(int fd, int n) { int len = snprintf(NULL, 0, "%d", n); char *num = malloc(len + 1); if (!num) { perror("malloc write to fd"); return; } snprintf(num, len + 1, "%d", n); if (write(fd, num, strlen(num)) == -1) { perror("write"); } free(num); } void *print_info_routine(void *arg) { int print_fd = (int) arg; // printf("thread print_fd: %d\n", print_fd); // printf("num_stations: %d\n", num_stations); for (int i = 0; i < num_stations; i++) { write_int_to_fd(print_fd, i); char *comma = ","; write(print_fd, comma, strlen(comma)); // write file path char* file_path = stations[i].filePath; write(print_fd, file_path, strlen(file_path)); for (int j = 0; j < max_active_users; j++) { if (!user_data[j].sockfd || user_data[j].sockfd == -1) continue; if (user_data[j].stationNum == i) { char *localhost_ip = ",127.0.0.1:"; write(print_fd, localhost_ip, strlen(localhost_ip)); // write udpPort write_int_to_fd(print_fd, user_data[j].udpPort); } } // wrtie new line char *newline = "\n"; write(print_fd, newline, strlen(newline)); } if (print_fd != STDOUT_FILENO) close(print_fd); return (NULL); } int send_all_udp(int udp_sockfd, char *buf, int *len, struct addrinfo *thread_res) { int MAX_PACKET_SIZE = 512; int total = 0; // how many bytes we've sent int bytesleft = *len; // how many we have left to send int n; while(total < *len) { n = sendto(udp_sockfd, buf+total, MAX_PACKET_SIZE, 0, thread_res->ai_addr, thread_res->ai_addrlen); // thread_res->ai_addr, thread_res->ai_addrlen)) == -1; if (n == -1) { break; } total += n; bytesleft -= n; } *len = total; // return number actually sent here return n==-1?-1:0; // return -1 on failure, 0 on success } void udp_port_cleanup_handler(void *arg) { int sockfd = (int) arg; close(sockfd); } /* Make the manager routine */ void *send_udp_packet_routine(void *arg) { // printf("send udp packet routine\n"); int *buf = arg; // unpack args int user_index = buf[0]; int buffer_size = buf[1]; char *file_buffer = malloc(buffer_size); memcpy(file_buffer, buf+2, buffer_size); // printf("udp packet routine, user:%d\n size: %d\n", user_index, buffer_size); // declare vairables to be used int did_work = 1; pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER; int s; int udp_sockfd; struct addrinfo thread_hints, *thread_res, *thread_servinfo; int error_code; // setup hints memset(&thread_hints, 0, sizeof thread_hints); thread_hints.ai_family = AF_INET; // use IPv4 only thread_hints.ai_socktype = SOCK_DGRAM; thread_hints.ai_flags = AI_PASSIVE; // fill in my IP for me int int_port = user_data[user_index].udpPort; int length = snprintf( NULL, 0, "%d", int_port ); char* port = malloc( length + 1 ); if (!port) { perror("malloc on port"); return (NULL); } snprintf(port, length + 1, "%d", int_port); sprintf(port, "%d", int_port); if (error_code = getaddrinfo(NULL, port, &thread_hints, &thread_servinfo) != 0) { fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(error_code)); return (NULL); } free(port); // loop through all the results and make a socket for(thread_res = thread_servinfo; thread_res != NULL; thread_res = thread_res->ai_next) { if ((udp_sockfd = socket(thread_res->ai_family, thread_res->ai_socktype, thread_res->ai_protocol)) == -1) { perror("talker: socket"); continue; } break; } if (udp_sockfd == NULL) { fprintf(stderr, "talker: failed to create socket\n"); return (NULL); } pthread_cleanup_push(udp_port_cleanup_handler, (void *)udp_sockfd); // wait for pthread_mutex_lock(&m); did_work = 0; while (!start_threads) { pthread_cond_wait(&cond, &m); } int station_num = user_data[user_index].stationNum; if (station_num == -1) { did_work = 1; } // potential error here! // printf("station info - bytes read: %d, station_fd: %d, filePath: %s, buffersize: %d\n", bytes_read, station_info->streamFd, station_info->filePath, station_info->fileBufferSize); if (send_all_udp(udp_sockfd, file_buffer, &buffer_size, thread_res) == -1) { perror("send_all_udp"); printf("We only sent %d bytes because of the error!\n", buffer_size); } free(file_buffer); pthread_mutex_unlock(&m); pthread_cleanup_pop(1); return (NULL); } void *send_announce_routine(void *arg) { // unpack args int station_num = (int) arg; // send the announce messages for (int i = 0; i < max_active_users; i++) { if (user_data[i].sockfd == 0 || user_data[i].sockfd == -1) { continue; } // update the station of each user if (user_data[i].stationNum == station_num) { send_announce_reply(user_data[i].sockfd, station_num); } } } fd_set master; // master file descriptor list void *select_routine(void *arg) { fd_set read_fds; // temp file descriptor list for select() int listener; // listening socket descriptor int newfd; // newly accept()ed socket descriptor int fdmax; // maximum file descriptor number struct sockaddr_storage remoteaddr; // client address socklen_t addrlen; char buf[256]; // buffer for client data int nbytes; char remoteIP[INET6_ADDRSTRLEN]; int yes=1; // for setsockopt() SO_REUSEADDR, below int i, j, rv; struct addrinfo hints, *ai, *p; // const char* port = argv[1]; FD_ZERO(&master); // clear the master and temp sets FD_ZERO(&read_fds); // LISTENER: get us a socket and bind it memset(&hints, 0, sizeof hints); hints.ai_family = AF_INET; hints.ai_socktype = SOCK_STREAM; hints.ai_flags = AI_PASSIVE; if ((rv = getaddrinfo(NULL, port, &hints, &ai)) != 0) { fprintf(stderr, "snowcast_server: %s\n", gai_strerror(rv)); exit(1); } for(p = ai; p != NULL; p = p->ai_next) { listener = socket(p->ai_family, p->ai_socktype, p->ai_protocol); if (listener < 0) { continue; } // lose the pesky "address already in use" error message setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int)); if (bind(listener, p->ai_addr, p->ai_addrlen) < 0) { close(listener); continue; } break; } // if we got here, it means we didn't get bound if (p == NULL) { fprintf(stderr, "snowcast_server: failed to bind\n"); exit(2); } freeaddrinfo(ai); // all done with this // listen if (listen(listener, 10) == -1) { perror("listen"); exit(3); } // add the listener to the master set FD_SET(listener, &master); // keep track of the biggest file descriptor fdmax = listener; // so far, it's this one while(1) { read_fds = master; // copy it if (select(fdmax+1, &read_fds, NULL, NULL, NULL) == -1) { perror("select"); exit(4); } // run through the existing connections looking for data to read for(i = 0; i <= fdmax; i++) { if (FD_ISSET(i, &read_fds)) { // we got one!! if (i == listener) { // handle new connections addrlen = sizeof remoteaddr; newfd = accept(listener, (struct sockaddr *)&remoteaddr, &addrlen); if (newfd == -1) { perror("accept"); } else { // new user, we need to do the handshake uint16_t udp_port = handle_handshake(newfd); if (udp_port == 0) { // drop connection upon error close(newfd); continue; } // add the user to the set FD_SET(newfd, &master); // add to master set if (newfd > fdmax) { // keep track of the max fdmax = newfd; } // TODO: thread or combine into one function // init the user init_user(newfd); // update the udp port update_user_udpPort(newfd, udp_port); send_welcome_reply(newfd); } } else { // handle data from a client uint8_t command_type = -1; if ((nbytes = recv(i, &command_type, 1, 0)) <= 0) { // got error or connection closed by client if (nbytes == 0) { // connection closed if (l) printf("socket %d HUNGUP. lost connection.\n", newfd); } else { perror("recv"); } destroy_user(i); } else // we got some data from a client { if (command_type == 0) { // we got a HELLO commmand // send back in invalid command if user already sent a hello if (user_data[sockfd_to_user[i]].udpPort != -1) { if (l) printf("received an extraneous HELLO from socket %d. sending INVALID reply.\n", i); char * message = "must not sent more than one Hello message"; send_invalid_reply(i, strlen(message), message); destroy_user(i); } } else if (command_type == 1) { // we got a SETSTATION command handle_setstation_command(i); } else if (command_type == 5) { // we got a ListStations command if (l) printf("received a LISTSTATIONS from socket %d\n", i); pthread_t t; pthread_create(&t, NULL, send_stationsinfo_reply, i); } else { if (l) printf("received unknown command type %d from socket %d. sending INVALID reply.\n", command_type, i); char *message = "invalid command type"; send_invalid_reply(i, strlen(message), message); destroy_user(i); } } } } } } } void *init_user(int sockfd) { // add the user to the list of user data pthread_mutex_lock(&mutex_user_data); if(sockfd > max_sockfd) { max_sockfd = sockfd; int * more_sockfd_to_user = realloc(sockfd_to_user, sizeof(int) * (max_sockfd + 1)); if (!more_sockfd_to_user) { perror("realloc"); exit(1); } sockfd_to_user = more_sockfd_to_user; } int running_index = 0; while(running_index < max_active_users) { if (user_data[running_index].sockfd == -1) { break; } running_index++; } if (running_index == max_active_users) { // printf("reached max active users\n"); // printf("making new memory\n"); max_active_users++; user_t *more_users = realloc(user_data, sizeof(user_t) * max_active_users); if (!more_users) { perror("realloc"); exit(1); } user_data = more_users; } // map TCP sockfd to this user index user_data[running_index] = (user_t){-1, -1, sockfd}; sockfd_to_user[sockfd] = running_index; // free(user_stream_threads); pthread_mutex_unlock(&mutex_user_data); return (NULL); } void *update_user_udpPort(int sockfd, int udpPort) { pthread_mutex_lock(&mutex_user_data); // get the user user_t *user = &user_data[sockfd_to_user[sockfd]]; // set the udpPort user->udpPort = udpPort; // start the stream thread, now that we have the udpPort // pthread_create(&user->streamThread, NULL, send_udp_packet_routine, (void *)sockfd_to_user[sockfd]); pthread_mutex_unlock(&mutex_user_data); return (NULL); } void *update_user_station(int sockfd, int stationNum) { pthread_mutex_lock(&mutex_user_data); user_data[sockfd_to_user[sockfd]].stationNum = stationNum; pthread_mutex_unlock(&mutex_user_data); } void *print_user_data(int index) { printf("user: %d -> udpPort: %d, stationNum: %d, sockfd: %d\n", index, user_data[index].udpPort, user_data[index].stationNum, user_data[index].sockfd); } void *print_station_data(int station) { printf("station: %d -> filePath: %s, readfd: %d\n", station, stations[station].filePath, stations[station].readfd); } void destroy_user(int sockfd) { close(sockfd); // bye! FD_CLR(sockfd, &master); // remove from master set // remove user from data structures pthread_mutex_lock(&mutex_user_data); user_data[sockfd_to_user[sockfd]] = (user_t) {-1, -1, -1}; // map sockfd to -1 sockfd_to_user[sockfd] = -1; pthread_mutex_unlock(&mutex_user_data); } void *get_in_addr(struct sockaddr *sa) { if (sa->sa_family == AF_INET) { return &(((struct sockaddr_in*)sa)->sin_addr); } return &(((struct sockaddr_in6*)sa)->sin6_addr); } void send_announce_reply(int fd, int station_num) { if (l) printf("sending ANNNOUNCE reply to socket %d\n", fd); char* file_path = stations[station_num].filePath; int len_file_path = strlen(file_path); char *send_buffer = malloc(len_file_path+2); if (!send_buffer) { perror("malloc in send announce"); return; } send_buffer[0] = 3; send_buffer[1] = len_file_path; memcpy(send_buffer + 2, file_path, len_file_path); size_t bytes_to_send = len_file_path + 2; if (send_all(fd, send_buffer, &bytes_to_send) == -1) perror("send_all"); free(send_buffer); } void send_invalid_reply(int fd, size_t message_size, char* message) { char *send_buffer = malloc(message_size+2); if (!send_buffer) { perror("malloc in send invalid command"); return; } // type and payload size send_buffer[0] = 4; send_buffer[1] = message_size; memcpy(send_buffer + 2, message, message_size); int bytes_to_send = message_size + 2; if (send_all(fd, send_buffer, &bytes_to_send) == -1) perror("send"); free(send_buffer); } void *send_stationsinfo_reply(void * arg) { int fd = (int) arg; if (l) printf("sending STATIONSINFO reply to socket %d\n", fd); uint32_t reply_size = 0; for (int i = 0; i < num_stations; i++) { if (stations[i].readfd != -1) reply_size += snprintf(NULL, 0, "%d,%s\n", i, stations[i].filePath); } reply_size--; // don't want final \n // send type uint8_t reply_num = 6; if (send(fd, &reply_num, 1, 0) == -1) perror("send in send stations info"); // send payload size uint32_t reply_size_endian = htonl(reply_size); int bytes = sizeof(uint32_t); if (send_all(fd, &reply_size_endian, &bytes) == -1) perror("send_all in send stations info"); printf("SENT reply size: %d\n", reply_size); char send_buffer[reply_size]; int ptr = 0; for (int i = 0; i < num_stations; i++) { ptr += sprintf(send_buffer + ptr, "%d,%s\n", i, stations[i].filePath); } int bytes_to_send = reply_size; // don't want final \n if (send_all(fd, &send_buffer, &bytes_to_send) == -1) perror("send_all buffer"); return (NULL); } // Parses a buffer into tokens, from cs33 :) int parse(char buffer[LINE_MAX], char *tokens[LINE_MAX / 2]) { const char *regex = " \n\t\f\r"; char *current_token = strtok(buffer, regex); if (current_token == NULL) return 0; for (int i = 0; current_token != NULL; i++) { tokens[i] = current_token; current_token = strtok(NULL, regex); } return 1; } uint16_t handle_handshake(int newfd) { if (l) printf("waiting for HELLO command. new connection on socket %d\n", newfd); // apply timeout to socket apply_timeout(newfd); uint8_t command_type = -1; size_t nbytes; if (nbytes = recv(newfd, &command_type, 1, 0) <= 0) { // got error or connection closed by client if (nbytes == 0) { // connection closed if (l) printf("socket %d HUNGUP. lost connection.\n", newfd); } else { perror("recv in handle handshake"); } // remove user from data structures close(newfd); return 0; } if (command_type != 0) { char * message = "must send a Hello message first"; send_invalid_reply(newfd, strlen(message), message); close(newfd); return 0; } // get the udp port uint16_t udp_port = -1; int bytes_to_read = sizeof(uint16_t); if (recv_all(newfd, &udp_port, &bytes_to_read) == -1) { perror("recv_all"); close(newfd); return 0; } // remove timeout, after reading data remove_timeout(newfd); if(l) printf("received HELLO from socket %d with udp port %u\n", newfd, ntohs(udp_port)); return ntohs(udp_port); } int send_welcome_reply(int fd) { if(l) printf("sending WELCOME reply to socket %d\n", fd); struct Welcome welcome; welcome.replyType = 2; welcome.numStations = htons(num_stations); int bytes_to_send = sizeof(struct Welcome); if (send_all(fd, &welcome, &bytes_to_send) == -1) { perror("send_all (in init_user_routine)"); return -1; } return 1; } void handle_setstation_command(int sockfd) { // get the station number uint16_t station_number = -1; int bytes_to_read = sizeof(uint16_t); if (recv_all(sockfd, &station_number, &bytes_to_read) == -1) { perror("recv_all"); destroy_user(sockfd); } station_number = ntohs(station_number); // check if user has a udpPort to stream to if (user_data[sockfd_to_user[sockfd]].udpPort == -1) { if (l) printf("received a SETSTATION from socket %d before HELLO command. sending INVALID reply.\n", sockfd); // send back in invalid command and drop user char * message = "must send Hello message first"; send_invalid_reply(sockfd, strlen(message), message); destroy_user(sockfd); return; } if (l) printf("received a SETSTATION from socket %d with station_number %u\n", sockfd, station_number); // check if station num is in range if (station_number >= num_stations || station_number < 0) { if (l) printf("station number invalid from socket %d. sending INVALID reply.\n", sockfd); // send back in invalid command and drop user char * message = "station number out of range"; send_invalid_reply(sockfd, strlen(message), message); destroy_user(sockfd); return; } // check if station num has been removed if (stations[station_number].readfd == -1) { send_stationshutdown_reply(sockfd); return; } update_user_station(sockfd, station_number); send_announce_reply(sockfd, station_number); } void send_stationshutdown_reply(int fd) { if (l) printf("sending STATIONSHUTDOWN reply to socket %d\n", fd); uint8_t reply_num = 7; if (send(fd, &reply_num, 1, 0) == -1) perror("send in send stationshutdown"); } void destroy_station(int station_num) { // check if station num is in range if (station_num >= num_stations || station_num < 0) { printf("remove: station number %d is out of range\n", station_num); return; } // check if station had been removed if (stations[station_num].readfd == -1) { printf("remove: station %d has already been removed\n", station_num); return; } // send the stationshutdown command to users for (int j = 0; j < max_active_users; j++) { if (!user_data[j].sockfd || user_data[j].sockfd == -1) continue; if (user_data[j].stationNum == station_num) { send_stationshutdown_reply(user_data[j].sockfd); user_data[j].stationNum = -1; } } close(stations[station_num].readfd); // stations[station_num].filePath = NULL; stations[station_num].readfd = -1; printf("remove: successfully removed station %d\n", station_num); } void send_newstation_reply(uint16_t station_num) { if (l) printf("sending NEWSTATION reply to all sockets\n"); uint8_t reply_num = 8; uint16_t station_num_n = htons(station_num); struct NewStation new_station = {reply_num, station_num_n}; for (int i = 0; i < max_active_users; i++) { if (!user_data[i].sockfd || user_data[i].sockfd == -1) continue; int bytes_to_send = sizeof(struct NewStation); if (send_all(user_data[i].sockfd, &new_station, &bytes_to_send) == -1) perror("send_all in newstation reply"); } } void add_station(char *file_path) { // run through array and see if we've seen file path before for (int i = 0; i < num_stations; i++) { if (strcmp(stations[i].filePath, file_path) == 0) { // this station is still active if (stations[i].readfd != -1) { printf("add: (warning) filepath %s is already streaming on station %d\n", file_path, i); } else { // this station has been removed // reopen the file stations[i].readfd = open(file_path, O_RDONLY); if (stations[i].readfd < 0) { perror("read (from add station)"); return; } pthread_create(&stations[i].streamThread, NULL, stream_routine, i); send_newstation_reply(i); return; } } } // get the size to malloc int totalSize = 0; for(int i = 0; i < num_stations; i++) { totalSize += sizeof(pthread_t) + sizeof(int) + strlen(stations[i].filePath); } totalSize += sizeof(pthread_t) + sizeof(int) + strlen(file_path); // malloc the stations array char *more_stations = realloc(stations, totalSize); if (!more_stations) { perror("malloc (stations pointer)"); return; } stations = more_stations; // assign the stations, and start the threads stations[num_stations].filePath = malloc(strlen(file_path)); memcpy(stations[num_stations].filePath, file_path, strlen(file_path)); stations[num_stations].readfd = open(file_path, O_RDONLY); if (stations[num_stations].readfd < 0) { perror("read (from add station)"); return; } pthread_create(&stations[num_stations].streamThread, NULL, stream_routine, num_stations); send_newstation_reply(num_stations); printf("add: successfully created station @ index %d\n", num_stations); num_stations++; } void cleanup_fds() { // // close all the file descriptors // for (int i = 0; i < num_stations; i++) { // close(stations[i].readfd); // } // for (int i = 0; i < max_active_users; i++) { // if (user_data[i].sockfd != -1) { // close(user_data[i].sockfd); // } // } }