diff options
author | sotech117 <michael_foiani@brown.edu> | 2023-09-25 16:17:12 -0400 |
---|---|---|
committer | sotech117 <michael_foiani@brown.edu> | 2023-09-25 16:17:12 -0400 |
commit | c534d8e28a00c9762fcb4ef2bdeb9a735ae26b75 (patch) | |
tree | 096a80c9e20de1daf4babbf610837de0cefd5297 | |
parent | 13929ac7a2f3d18f1a9d5717e76d0e7725c263c4 (diff) |
add comments and clean client
-rw-r--r-- | client.c | 195 | ||||
-rw-r--r-- | protocol.h | 2 | ||||
-rw-r--r-- | select.c | 0 | ||||
-rw-r--r-- | server.c | 58 | ||||
-rwxr-xr-x | snowcast_control | bin | 18848 -> 18848 bytes | |||
-rwxr-xr-x | snowcast_server | bin | 37800 -> 37856 bytes |
6 files changed, 148 insertions, 107 deletions
@@ -9,13 +9,9 @@ #include <sys/socket.h> #include <ctype.h> #include <pthread.h> - #include <arpa/inet.h> - #include "protocol.c" -#define MAXDATASIZE 100 // max number of bytes we can get at once - #define MAX_READ_SIZE 1024 #define LINE_MAX 1024 @@ -43,9 +39,9 @@ int main(int argc, char *argv[]) int sockfd, numbytesrecv, numbytessent, recvbytes; // char buf[MAXDATASIZE]; struct addrinfo hints, *servinfo, *p; - int rv; char s[INET6_ADDRSTRLEN]; + // check arugments if (argc != 4) { fprintf(stderr,"<server IP> <server port> <listener port>\n"); exit(1); @@ -58,11 +54,13 @@ int main(int argc, char *argv[]) char* tcpPort = argv[2]; // port we use to connect to server's tcp stream char* udpPort = argv[3]; // port we use to connect to server's udp info and command - if ((rv = getaddrinfo(argv[1], tcpPort, &hints, &servinfo)) != 0) { + // resolve host + int rv; + if ((rv = getaddrinfo(argv[1], tcpPort, &hints, &servinfo)) != 0) + { fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(rv)); return 1; } - // loop through all the results and connect to the first we can for(p = servinfo; p != NULL; p = p->ai_next) { if ((sockfd = socket(p->ai_family, p->ai_socktype, @@ -76,34 +74,31 @@ int main(int argc, char *argv[]) perror("client: connect"); continue; } - break; } - if (p == NULL) { fprintf(stderr, "client: failed to connect\n"); return 2; } - - inet_ntop(p->ai_family, get_in_addr((struct sockaddr *)p->ai_addr), - s, sizeof s); - // printf("client: connecting to %s\n", s); - + inet_ntop(p->ai_family, get_in_addr((struct sockaddr *)p->ai_addr), s, sizeof s); freeaddrinfo(servinfo); // all done with this structure + // now that we're connectioned, let's do the handshake uint16_t num_stations = handle_handshake(sockfd, udpPort); + // handle_handshake will end the program on fail + // so if we're here, num_stations is correct fflush(stdout); printf("Welcome to Snowcast! The server has %u stations.\n", num_stations); fflush(stdout); + // start the thread to accept command lines pthread_t command_line_thread; pthread_create(&command_line_thread, NULL, command_line_routine, (void*)sockfd); - // CONSIDER: could recieve the welcome message here - // int recvbytes; + // this while loop hangs on recv and runs when there is new data while (1) { - // recv the first byte of the message to get it's type + // get the type of the incoming reply uint8_t reply_type = -1; // print size of utin8 if (recv(sockfd, &reply_type, 1, 0) == -1) { @@ -114,35 +109,38 @@ int main(int argc, char *argv[]) exit(1); } - if (reply_type == 2) { // we have a second welcome message + if (reply_type == WELCOME) { fprintf(stderr, "WECLOME reply received twice. Exiting.\n"); close(sockfd); exit(1); } - if (reply_type == 3) { // we have an announce message - + if (reply_type == ANNOUNCE) { if (!station_is_set) { fprintf(stderr, "ANNOUNCE reply received before SETSTATION command. Exiting.\n"); close(sockfd); exit(1); } - // get the string size + // get the string size for the songname u_int8_t string_size = -1; if (recv(sockfd, &string_size, 1, 0) == -1) { perror("recv in announce"); exit(1); } + + // read the songname char *song_name = malloc(string_size); if(song_name == NULL) { perror("malloc in song name"); } - int bytes_to_read = string_size; if (recv_all(sockfd, song_name, &bytes_to_read) == -1) { perror("recv_all in announce"); exit(1); } + remove_timeout(sockfd); // have received all, so can remove timeout + if (l) printf("received ANNOUNCE reply.\n"); + // print the songname if (!waiting) printf("\n"); // note: this is worth the lines for a clean cmd prompt waiting = 0; fflush(stdout); @@ -151,7 +149,9 @@ int main(int argc, char *argv[]) fflush(stdout); free(song_name); continue; - } else if (reply_type == 4) { // we have an invalid command message + } + + if (reply_type == INVALID) { // we have an invalid command message // get the string size u_int8_t string_size = -1; if (recv(sockfd, &string_size, 1, 0) == -1) { @@ -170,10 +170,15 @@ int main(int argc, char *argv[]) fflush(stdout); free(message); close(sockfd); + + // close the program on all INVALID COMMANDS exit(1); } - else if (reply_type == 6) { // we are getting STATIONINFO - // get the string size + + if (reply_type == STATIONINFO) { // we are getting STATIONINFO + if (l) printf("received STATIONINFO reply.\n"); + + // get the string size, which can be farily long (uint32_t) uint32_t buf_string_size = -1; int bytes_to_read = sizeof(uint32_t); if (recv_all(sockfd, &buf_string_size, &bytes_to_read) == -1) { @@ -182,7 +187,8 @@ int main(int argc, char *argv[]) } uint32_t string_size = ntohl(buf_string_size); - printf("string size: %d\n", string_size); + + // recieve the message char *info = malloc(string_size); if(info == NULL) { perror("malloc in info"); } bytes_to_read = string_size; @@ -190,9 +196,9 @@ int main(int argc, char *argv[]) perror("recv_all 2 in stationinfo"); exit(1); } - remove_timeout(sockfd); - if (l) printf("received STATIONINFO reply.\n"); + remove_timeout(sockfd); // remove the timeout, now that we have the data + // print the info fflush(stdout); printf("Station Information:\n%s\n", info); printf("snowcast_control> "); @@ -201,32 +207,43 @@ int main(int argc, char *argv[]) free(info); continue; } - else if (reply_type == 7) { // we are getting StationShutdown + + if (reply_type == STATIONSHUTDOWN) { // we are getting StationShutdown if (l) printf("received STATIONSHUTDOWN reply.\n"); - if (!waiting) printf("\n"); // note: this is worth the lines for a clean cmd prompt + + if (!waiting) printf("\n"); // note: this is worth the lines for a clean cmd prompt :) waiting = 0; - remove_timeout(sockfd); + + // station no longer set + station_is_set = 0; fflush(stdout); printf("This station has shut down. Please select a different station.\n"); printf("snowcast_control> "); fflush(stdout); continue; } - else if (reply_type == 8) { // we are getting NewStation + + if (reply_type == NEWSTATION) { // we are getting NewStation + if (l) printf("received NEWSTATION reply.\n"); + + // get station number uint16_t station_number = -1; if (recv(sockfd, &station_number, 2, 0) == -1) { perror("recv in new station"); exit(1); } station_number = ntohs(station_number); - if (l) printf("received NEWSTATION reply.\n"); + + // print fflush(stdout); - printf("\nThere is now a new station @ index %d.\n", station_number); + printf("\nThere is now a new station @ index %u.\n", station_number); printf("snowcast_control> "); fflush(stdout); + continue; } + // if we're here, lost conneciton to the server -> end the program printf("\nsocket to server HUNGUP. Exiting.\n"); close(sockfd); exit(1); @@ -235,105 +252,131 @@ int main(int argc, char *argv[]) } void *command_line_routine(void* args) { + // unpack sockfd as arg int sockfd = (int) args; + // buffer for input char input[LINE_MAX]; printf("Enter a number to change to it's station. Enter q to end stream.\n"); printf("snowcast_control> "); fflush(stdout); while (1) { + memset(input, 0, LINE_MAX); char *line = fgets(input, LINE_MAX, stdin); + // nothing was typed if (line == NULL) { continue; - } else if (strncmp("q\n", input, LINE_MAX) == 0) { + } + + // q command: exit the program + if (strncmp("q\n", input, LINE_MAX) == 0) { // end code if type in q exiting = 1; printf("Exiting.\n"); close(sockfd); exit(0); - } else if (strncmp("l\n", input, LINE_MAX) == 0) { + } + + // l command: STATIONINFO command (EXTRA CREDIT) + if (strncmp("l\n", input, LINE_MAX) == 0) { // send the command to list stations if (l) printf("sending LISTSTATIONS command. waiting for STATIONINFO reply.\n"); - apply_timeout(sockfd); + apply_timeout(sockfd); // apply a timeout, will be released when we get the reply int list_station_reply_type = 5; if (send(sockfd, &list_station_reply_type, 1, 0) == -1) { perror("send"); exit(1); } - } else if (strncmp("log\n", input, LINE_MAX) == 0) { + continue; + } + + // log command: toggle logging + if (strncmp("log\n", input, LINE_MAX) == 0) + { l = !l; printf("LOGGING is now %s!\n", l ? "on" : "off"); printf("snowcast_control> "); fflush(stdout); + continue; } - else { - // convert input to an int - int inputInt = atoi(input); - if (input[0] != '0' && inputInt == 0) { - printf("unknown command: %si", input); - printf("snowcast_control> "); - fflush(stdout); - continue; - } - // printf("Changing to station %d.\n", inputInt); - - // set waiting so no new line on announce - waiting = 1; - // send the command to change the station - apply_timeout(sockfd); - struct SetStation setStation; - setStation.commandType = 1; - setStation.stationNumber = htons(inputInt); - int bytes_to_send = sizeof(struct SetStation); - // apply_timeout(sockfd); - if (send_all(sockfd, &setStation, &bytes_to_send) == -1) { - perror("send_all"); - exit(1); - } - if (!station_is_set) { - station_is_set = 1; - } + + // check if this could be a station number + int inputInt = atoi(input); + if (input[0] != '0' && inputInt == 0) { + // if we're in here, it's ~likely~ not a number (ik it's not perfect, sorry :/) + printf("unknown command: %si", input); + printf("snowcast_control> "); + fflush(stdout); + continue; } + + // if we're here, it was a valid number, + // & we need to send SETSTATION command. + if (l) printf("sending SETSTATION command.\n"); + waiting = 1; // just for clean cmd + + // setup struct + struct SetStation setStation; + setStation.commandType = SETSTATION; + setStation.stationNumber = htons(inputInt); + // send it + apply_timeout(sockfd); // apply timeout, will be released when we get ANNOUNCE reply + int bytes_to_send = sizeof(struct SetStation); + if (send_all(sockfd, &setStation, &bytes_to_send) == -1) { + perror("send_all"); + exit(1); + } + + if (!station_is_set) station_is_set = 1; // update, if needed + if (l) printf("waiting for ANNOUNCE reply...\n"); } return (NULL); } uint16_t handle_handshake(int sockfd, char* udp_port) { - apply_timeout(sockfd); // apply timeout for handshake + if (l) printf("found server, sending HELLO command.\n"); - if (l) printf("found server, sending HELLO command. waiting for ANNOUNCE reply.\n"); + apply_timeout(sockfd); // apply timeout for handshake + // after we find connection, we need to send HELLO command struct Hello hello; - hello.commandType = 0; + hello.commandType = HELLO; // convert updPort to an int int udp_port_int = atoi(udp_port); hello.udpPort = htons(udp_port_int); - if (send(sockfd, &hello, sizeof(struct Hello), 0) == -1) { + int hello_len = sizeof(struct Hello); + if (send_all(sockfd, &hello, &hello_len) == -1) { perror("send"); exit(1); } - // recv the first byte of the message to get it's type + if (l) printf("waiting for WELCOME reply...\n"); + + // read WELCOME reply type uint8_t reply_type = -1; // print size of utin8 - if (recv(sockfd, &reply_type, 1, 0) == -1) { + if (recv(sockfd, &reply_type, 1, 0) == -1) { // only one byte, so can use recv perror("recv in handshake"); exit(1); } - // recv the message, check for errors too + if (reply_type != WELCOME) { + fprintf(stderr, "first reply is not WELCOME. Exiting.\n"); + close(sockfd); + exit(1); + } + + // get the number of stations int16_t num_stations = -1; int bytes_to_read = sizeof(uint16_t); if (recv_all(sockfd, &num_stations, &bytes_to_read) == -1) { perror("recv_all in handshake"); exit(1); } - if (l) printf("received ANNOUNCE reply.\n"); - // remove timeout since we no longer are "waiting" for an immediate reply - remove_timeout(sockfd); - return ntohs(num_stations); + remove_timeout(sockfd); // remove timeout since we no longer are "waiting" for an immediate reply + return ntohs(num_stations); // return the num_stations to be printed }
\ No newline at end of file @@ -8,6 +8,8 @@ #define LISTSTATIONS 5 #define STATIONINFO 6 +#define STATIONSHUTDOWN 7 +#define NEWSTATION 8 // client to server messages (commands) diff --git a/select.c b/select.c new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/select.c @@ -7,24 +7,17 @@ #include <arpa/inet.h> #include <netdb.h> #include <string.h> - #include <sys/stat.h> #include <fcntl.h> #include <sys/types.h> - #include "protocol.c" #define LINE_MAX 1024 #define MAX_USERS 1000 #define MAX_PATH 50 #define MAX_RATE_PER_SECOND 16*1024 / 2 +#define MAX_PACKET_SIZE 512 -// typedef struct station { -// int streamFd; -// char* filePath; -// int fileBufferSize; -// char fileBuffer[MAX_STREAM_RATE]; -// } station_t; typedef struct station { pthread_t streamThread; int readfd; @@ -35,7 +28,6 @@ station_t *stations; int setup_stations(int argc, char *argv[]); void *stream_routine(void *arg); - typedef struct user { int udpPort; int stationNum; @@ -259,24 +251,26 @@ void *stream_routine_cleanup(void *arg) { } void *stream_routine(void *arg) { + int BROADCASTS_PER_SECOND = 2; + int BROADCAST_OFFSET = 10000; + int station_num = (int) arg; // printf("stream routine %d\n", station_num); int read_fd = stations[station_num].readfd; pthread_cleanup_push(stream_routine_cleanup, read_fd); - // make buffer which will be used to stream to children + // make buffer for read_file char buffer[MAX_RATE_PER_SECOND]; - memset(buffer, 0, MAX_RATE_PER_SECOND); - // if (!buffer) { perror("malloc (buffer in station thread)"); exit(1); } for (;;) { - // load bytes into buffer + // load bytes from file into buffer + memset(buffer, 0, MAX_RATE_PER_SECOND); int bytes_read = read_file(read_fd, buffer, station_num); if (bytes_read == -1) { return (NULL); } - // TODO: send buffer to children + // create the threads to send packets to users, which will be released later int *send_buffer; for (int i = 0; i < max_active_users; i++) { @@ -284,26 +278,33 @@ void *stream_routine(void *arg) { continue; if (user_data[i].stationNum == station_num) { - // send the udp packet + // prepare the send buffer + // (note: using int* for easy pointer assignment) send_buffer = malloc(2 + bytes_read); - memset(send_buffer, 0, 2 + bytes_read); + if (!send_buffer) { perror("malloc send_buffer in stream_routine"); return (NULL); } send_buffer[0] = i; send_buffer[1] = bytes_read; memcpy(send_buffer+2, buffer, bytes_read); - // printf("sending udp packet to user %d\n", i); - pthread_t t; - pthread_create(&t, NULL, send_udp_packet_routine, send_buffer); + + // make thread + pthread_t send_udp_packet_thread; + pthread_create(&send_udp_packet_thread, NULL, send_udp_packet_routine, send_buffer); } } - usleep(1000000 / 2 - 5000); + + // wait for the thread to be created + usleep(1000000 / BROADCASTS_PER_SECOND - BROADCAST_OFFSET); // do -1000 for the usleep below + + // let the threads run! start_threads = 1; pthread_cond_broadcast(&cond); - usleep(5000); + // give some time to broadcast, then reset variables + usleep(BROADCAST_OFFSET); start_threads = 0; + // free the buffer after it's been sent free(send_buffer); - memset(buffer, 0, MAX_RATE_PER_SECOND); } return (NULL); @@ -382,7 +383,6 @@ void *print_info_routine(void *arg) { int send_all_udp(int udp_sockfd, char *buf, int *len, struct sockaddr *addr, socklen_t addrlen) { - int MAX_PACKET_SIZE = 512; int total = 0; // how many bytes we've sent int bytesleft = *len; // how many we have left to send int n; @@ -746,12 +746,7 @@ void *init_user_routine(int newfd, int udp_port) { return (NULL); } -// void *update_user_udpPort(int sockfd, int udpPort) { -// pthread_mutex_lock(&mutex_user_data); -// pthread_mutex_unlock(&mutex_user_data); -// return (NULL); -// } void *update_user_station(int sockfd, int stationNum) { pthread_mutex_lock(&mutex_user_data); user_data[sockfd_to_user[sockfd]].stationNum = stationNum; @@ -1015,8 +1010,9 @@ void destroy_station(int station_num) { } } + // cancel the stream's thread and close the read fd + pthread_cancel(stations[station_num].streamThread); close(stations[station_num].readfd); - // stations[station_num].filePath = NULL; stations[station_num].readfd = -1; printf("remove: successfully removed station %d\n", station_num); @@ -1082,8 +1078,6 @@ void add_station(char *file_path) { num_stations++; } - - void cleanup_fds() { // close all the file descriptors for the stations for (int i = 0; i < num_stations; i++) { @@ -1096,4 +1090,6 @@ void cleanup_fds() { close(user_data[i].sockfd); } } + + close(listener); } diff --git a/snowcast_control b/snowcast_control Binary files differindex 01f4d14..dae574f 100755 --- a/snowcast_control +++ b/snowcast_control diff --git a/snowcast_server b/snowcast_server Binary files differindex 55a64fb..cb0c2f1 100755 --- a/snowcast_server +++ b/snowcast_server |