diff options
Diffstat (limited to 'server.c')
-rw-r--r-- | server.c | 141 |
1 files changed, 135 insertions, 6 deletions
@@ -49,7 +49,7 @@ int count = 0; pthread_cond_t cond = PTHREAD_COND_INITIALIZER; pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; -pthread_mutex_t station_mutex = PTHREAD_MUTEX_INITIALIZER; +pthread_mutex_t mutex_stations = PTHREAD_MUTEX_INITIALIZER; const char *port; // int num_stations; @@ -97,6 +97,10 @@ void *send_stationsinfo_reply(void *arg); int send_welcome_reply(int fd); void handle_setstation_command(int fd); +void add_station(char *station_name); +void destroy_station(int station_num); +void send_stationshutdown_reply(int fd); + uint16_t handle_handshake(int newfd); void cleanup_fds(); @@ -145,7 +149,7 @@ main(int argc, char *argv[]) fflush(stdout); while (read(STDIN_FILENO, input, LINE_MAX) > 0) { // init tokens - memset(tokens, 0, (LINE_MAX / 2) * sizeof(char *)); + memset(tokens, 0, LINE_MAX / 2); // if 0, all whitespace if (!parse(input, tokens)) { @@ -174,6 +178,7 @@ main(int argc, char *argv[]) if ((print_fd = open(output_file_path, O_CREAT | O_WRONLY | O_TRUNC, S_IRWXU)) == -1) { perror("open"); + memset(input, 0, LINE_MAX); continue; } } else { @@ -197,10 +202,26 @@ main(int argc, char *argv[]) l= !l; printf("logging is now %s\n", l ? "on" : "off"); } + else if (!strcmp(command, "add")) { // add a new station + // get the file path + char *file_path = tokens[1]; + if (file_path == NULL) { + printf("add: must provide a file path\n"); + } else add_station(file_path); // add the station + } + else if (!strcmp(command, "remove")) { // remove a station + if (tokens[1] == NULL) { + printf("remove: must provide a station number to remove\n"); + } + else if ((tokens[1] != '\0' && atoi(tokens[1]) == 0)) { + printf("remove: must provide a number\n"); + } else destroy_station(atoi(tokens[1])); + } else { - printf("unknown command:\t%s\n", command); + printf("unknown command: %s\n", command); } + memset(input, 0, LINE_MAX); printf("snowcast_server> "); fflush(stdout); } @@ -209,8 +230,11 @@ main(int argc, char *argv[]) } int read_file(int fd, char buffer[MAX_RATE_PER_SECOND], int station_num) { + // see if fd was closed at some point + if (fd == -1) return -1; + int bytes_read = read(fd, buffer, MAX_RATE_PER_SECOND); - if (bytes_read < 0) { perror("read (from station file)"); return -1; } + if (bytes_read < 0) { perror("read (in read file)"); return -1; } // printf("bytes read: %d\n", bytes_read); if (bytes_read == 0) { // printf("end of file, restarting\n"); @@ -223,7 +247,7 @@ int read_file(int fd, char buffer[MAX_RATE_PER_SECOND], int station_num) { return -1; } bytes_read = read(fd, buffer, MAX_RATE_PER_SECOND); - if (bytes_read < 0) { perror("read (from station file, after restart)"); return -1; } + if (bytes_read < 0) { perror("read (in read file, after restart)"); return -1; } } return bytes_read; @@ -250,7 +274,7 @@ void *stream_routine(void *arg) { { // load bytes into buffer int bytes_read = read_file(read_fd, buffer, station_num); - if (bytes_read == -1) { exit(1); } + if (bytes_read == -1) { return (NULL); } // TODO: send buffer to children char *send_buffer = malloc(2 + bytes_read); @@ -918,10 +942,115 @@ void handle_setstation_command(int sockfd) { return; } + // check if station num has been removed + if (stations[station_number].readfd == -1) { + send_stationshutdown_reply(sockfd); + return; + } + update_user_station(sockfd, station_number); send_announce_reply(sockfd, station_number); } +void send_stationshutdown_reply(int fd) { + if (l) printf("sending STATIONSHUTDOWN reply to socket %d\n", fd); + + uint8_t reply_num = 7; + if (send(fd, &reply_num, 1, 0) == -1) + perror("send in send stationshutdown"); +} + +void destroy_station(int station_num) { + // check if station num is in range + if (station_num >= num_stations || station_num < 0) { + printf("remove: station number %d is out of range\n", station_num); + return; + } + // check if station had been removed + if (stations[station_num].readfd == -1) { + printf("remove: station %d has already been removed\n", station_num); + return; + } + + // send the stationshutdown command to users + for (int j = 0; j < max_active_users; j++) { + if (!user_data[j].sockfd || user_data[j].sockfd == -1) + continue; + if (user_data[j].stationNum == station_num) { + send_stationshutdown_reply(user_data[j].sockfd); + user_data[j].stationNum = -1; + } + } + + close(stations[station_num].readfd); + // stations[station_num].filePath = NULL; + stations[station_num].readfd = -1; + + printf("remove: successfully removed station %d\n", station_num); +} + +void send_newstation_reply(uint16_t station_num) { + if (l) printf("sending NEWSTATION reply to all sockets\n"); + + uint8_t reply_num = 8; + uint16_t station_num_n = htons(station_num); + struct NewStation new_station = {reply_num, station_num_n}; + for (int i = 0; i < max_active_users; i++) { + if (!user_data[i].sockfd || user_data[i].sockfd == -1) + continue; + int bytes_to_send = sizeof(struct NewStation); + if (send_all(user_data[i].sockfd, &new_station, &bytes_to_send) == -1) + perror("send_all in newstation reply"); + } +} + +void add_station(char *file_path) { + // run through array and see if we've seen file path before + for (int i = 0; i < num_stations; i++) { + if (strcmp(stations[i].filePath, file_path) == 0) { + // this station is still active + if (stations[i].readfd != -1) { + printf("add: (warning) filepath %s is already streaming on station %d\n", file_path, i); + } + else { + // this station has been removed + // reopen the file + stations[i].readfd = open(file_path, O_RDONLY); + if (stations[i].readfd < 0) { perror("read (from add station)"); return; } + pthread_create(&stations[i].streamThread, NULL, stream_routine, i); + send_newstation_reply(i); + return; + } + } + } + + // get the size to malloc + int totalSize = 0; + for(int i = 0; i < num_stations; i++) + { + totalSize += sizeof(pthread_t) + sizeof(int) + strlen(stations[i].filePath); + } + totalSize += sizeof(pthread_t) + sizeof(int) + strlen(file_path); + + // malloc the stations array + char *more_stations = realloc(stations, totalSize); + if (!more_stations) { perror("malloc (stations pointer)"); return; } + stations = more_stations; + // assign the stations, and start the threads + stations[num_stations].filePath = malloc(strlen(file_path)); + memcpy(stations[num_stations].filePath, file_path, strlen(file_path)); + stations[num_stations].readfd = open(file_path, O_RDONLY); + if (stations[num_stations].readfd < 0) { perror("read (from add station)"); return; } + pthread_create(&stations[num_stations].streamThread, NULL, stream_routine, num_stations); + send_newstation_reply(num_stations); + + printf("add: successfully created station @ index %d\n", num_stations); + + num_stations++; +} + + + void cleanup_fds() { // // close all the file descriptors // for (int i = 0; i < num_stations; i++) { |