aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorsotech117 <michael_foiani@brown.edu>2023-09-25 16:17:12 -0400
committersotech117 <michael_foiani@brown.edu>2023-09-25 16:17:12 -0400
commitc534d8e28a00c9762fcb4ef2bdeb9a735ae26b75 (patch)
tree096a80c9e20de1daf4babbf610837de0cefd5297
parent13929ac7a2f3d18f1a9d5717e76d0e7725c263c4 (diff)
add comments and clean client
-rw-r--r--client.c195
-rw-r--r--protocol.h2
-rw-r--r--select.c0
-rw-r--r--server.c58
-rwxr-xr-xsnowcast_controlbin18848 -> 18848 bytes
-rwxr-xr-xsnowcast_serverbin37800 -> 37856 bytes
6 files changed, 148 insertions, 107 deletions
diff --git a/client.c b/client.c
index 4c3573c..f9617cd 100644
--- a/client.c
+++ b/client.c
@@ -9,13 +9,9 @@
#include <sys/socket.h>
#include <ctype.h>
#include <pthread.h>
-
#include <arpa/inet.h>
-
#include "protocol.c"
-#define MAXDATASIZE 100 // max number of bytes we can get at once
-
#define MAX_READ_SIZE 1024
#define LINE_MAX 1024
@@ -43,9 +39,9 @@ int main(int argc, char *argv[])
int sockfd, numbytesrecv, numbytessent, recvbytes;
// char buf[MAXDATASIZE];
struct addrinfo hints, *servinfo, *p;
- int rv;
char s[INET6_ADDRSTRLEN];
+ // check arugments
if (argc != 4) {
fprintf(stderr,"<server IP> <server port> <listener port>\n");
exit(1);
@@ -58,11 +54,13 @@ int main(int argc, char *argv[])
char* tcpPort = argv[2]; // port we use to connect to server's tcp stream
char* udpPort = argv[3]; // port we use to connect to server's udp info and command
- if ((rv = getaddrinfo(argv[1], tcpPort, &hints, &servinfo)) != 0) {
+ // resolve host
+ int rv;
+ if ((rv = getaddrinfo(argv[1], tcpPort, &hints, &servinfo)) != 0)
+ {
fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(rv));
return 1;
}
-
// loop through all the results and connect to the first we can
for(p = servinfo; p != NULL; p = p->ai_next) {
if ((sockfd = socket(p->ai_family, p->ai_socktype,
@@ -76,34 +74,31 @@ int main(int argc, char *argv[])
perror("client: connect");
continue;
}
-
break;
}
-
if (p == NULL) {
fprintf(stderr, "client: failed to connect\n");
return 2;
}
-
- inet_ntop(p->ai_family, get_in_addr((struct sockaddr *)p->ai_addr),
- s, sizeof s);
- // printf("client: connecting to %s\n", s);
-
+ inet_ntop(p->ai_family, get_in_addr((struct sockaddr *)p->ai_addr), s, sizeof s);
freeaddrinfo(servinfo); // all done with this structure
+ // now that we're connectioned, let's do the handshake
uint16_t num_stations = handle_handshake(sockfd, udpPort);
+ // handle_handshake will end the program on fail
+ // so if we're here, num_stations is correct
fflush(stdout);
printf("Welcome to Snowcast! The server has %u stations.\n", num_stations);
fflush(stdout);
+ // start the thread to accept command lines
pthread_t command_line_thread;
pthread_create(&command_line_thread, NULL, command_line_routine, (void*)sockfd);
- // CONSIDER: could recieve the welcome message here
- // int recvbytes;
+ // this while loop hangs on recv and runs when there is new data
while (1) {
- // recv the first byte of the message to get it's type
+ // get the type of the incoming reply
uint8_t reply_type = -1;
// print size of utin8
if (recv(sockfd, &reply_type, 1, 0) == -1) {
@@ -114,35 +109,38 @@ int main(int argc, char *argv[])
exit(1);
}
- if (reply_type == 2) { // we have a second welcome message
+ if (reply_type == WELCOME) {
fprintf(stderr, "WECLOME reply received twice. Exiting.\n");
close(sockfd);
exit(1);
}
- if (reply_type == 3) { // we have an announce message
-
+ if (reply_type == ANNOUNCE) {
if (!station_is_set) {
fprintf(stderr, "ANNOUNCE reply received before SETSTATION command. Exiting.\n");
close(sockfd);
exit(1);
}
- // get the string size
+ // get the string size for the songname
u_int8_t string_size = -1;
if (recv(sockfd, &string_size, 1, 0) == -1) {
perror("recv in announce");
exit(1);
}
+
+ // read the songname
char *song_name = malloc(string_size);
if(song_name == NULL) { perror("malloc in song name"); }
-
int bytes_to_read = string_size;
if (recv_all(sockfd, song_name, &bytes_to_read) == -1) {
perror("recv_all in announce");
exit(1);
}
+ remove_timeout(sockfd); // have received all, so can remove timeout
+ if (l) printf("received ANNOUNCE reply.\n");
+ // print the songname
if (!waiting) printf("\n"); // note: this is worth the lines for a clean cmd prompt
waiting = 0;
fflush(stdout);
@@ -151,7 +149,9 @@ int main(int argc, char *argv[])
fflush(stdout);
free(song_name);
continue;
- } else if (reply_type == 4) { // we have an invalid command message
+ }
+
+ if (reply_type == INVALID) { // we have an invalid command message
// get the string size
u_int8_t string_size = -1;
if (recv(sockfd, &string_size, 1, 0) == -1) {
@@ -170,10 +170,15 @@ int main(int argc, char *argv[])
fflush(stdout);
free(message);
close(sockfd);
+
+ // close the program on all INVALID COMMANDS
exit(1);
}
- else if (reply_type == 6) { // we are getting STATIONINFO
- // get the string size
+
+ if (reply_type == STATIONINFO) { // we are getting STATIONINFO
+ if (l) printf("received STATIONINFO reply.\n");
+
+ // get the string size, which can be farily long (uint32_t)
uint32_t buf_string_size = -1;
int bytes_to_read = sizeof(uint32_t);
if (recv_all(sockfd, &buf_string_size, &bytes_to_read) == -1) {
@@ -182,7 +187,8 @@ int main(int argc, char *argv[])
}
uint32_t string_size = ntohl(buf_string_size);
- printf("string size: %d\n", string_size);
+
+ // recieve the message
char *info = malloc(string_size);
if(info == NULL) { perror("malloc in info"); }
bytes_to_read = string_size;
@@ -190,9 +196,9 @@ int main(int argc, char *argv[])
perror("recv_all 2 in stationinfo");
exit(1);
}
- remove_timeout(sockfd);
- if (l) printf("received STATIONINFO reply.\n");
+ remove_timeout(sockfd); // remove the timeout, now that we have the data
+ // print the info
fflush(stdout);
printf("Station Information:\n%s\n", info);
printf("snowcast_control> ");
@@ -201,32 +207,43 @@ int main(int argc, char *argv[])
free(info);
continue;
}
- else if (reply_type == 7) { // we are getting StationShutdown
+
+ if (reply_type == STATIONSHUTDOWN) { // we are getting StationShutdown
if (l) printf("received STATIONSHUTDOWN reply.\n");
- if (!waiting) printf("\n"); // note: this is worth the lines for a clean cmd prompt
+
+ if (!waiting) printf("\n"); // note: this is worth the lines for a clean cmd prompt :)
waiting = 0;
- remove_timeout(sockfd);
+
+ // station no longer set
+ station_is_set = 0;
fflush(stdout);
printf("This station has shut down. Please select a different station.\n");
printf("snowcast_control> ");
fflush(stdout);
continue;
}
- else if (reply_type == 8) { // we are getting NewStation
+
+ if (reply_type == NEWSTATION) { // we are getting NewStation
+ if (l) printf("received NEWSTATION reply.\n");
+
+ // get station number
uint16_t station_number = -1;
if (recv(sockfd, &station_number, 2, 0) == -1) {
perror("recv in new station");
exit(1);
}
station_number = ntohs(station_number);
- if (l) printf("received NEWSTATION reply.\n");
+
+ // print
fflush(stdout);
- printf("\nThere is now a new station @ index %d.\n", station_number);
+ printf("\nThere is now a new station @ index %u.\n", station_number);
printf("snowcast_control> ");
fflush(stdout);
+
continue;
}
+ // if we're here, lost conneciton to the server -> end the program
printf("\nsocket to server HUNGUP. Exiting.\n");
close(sockfd);
exit(1);
@@ -235,105 +252,131 @@ int main(int argc, char *argv[])
}
void *command_line_routine(void* args) {
+ // unpack sockfd as arg
int sockfd = (int) args;
+ // buffer for input
char input[LINE_MAX];
printf("Enter a number to change to it's station. Enter q to end stream.\n");
printf("snowcast_control> ");
fflush(stdout);
while (1) {
+ memset(input, 0, LINE_MAX);
char *line = fgets(input, LINE_MAX, stdin);
+ // nothing was typed
if (line == NULL) {
continue;
- } else if (strncmp("q\n", input, LINE_MAX) == 0) {
+ }
+
+ // q command: exit the program
+ if (strncmp("q\n", input, LINE_MAX) == 0) {
// end code if type in q
exiting = 1;
printf("Exiting.\n");
close(sockfd);
exit(0);
- } else if (strncmp("l\n", input, LINE_MAX) == 0) {
+ }
+
+ // l command: STATIONINFO command (EXTRA CREDIT)
+ if (strncmp("l\n", input, LINE_MAX) == 0) {
// send the command to list stations
if (l) printf("sending LISTSTATIONS command. waiting for STATIONINFO reply.\n");
- apply_timeout(sockfd);
+ apply_timeout(sockfd); // apply a timeout, will be released when we get the reply
int list_station_reply_type = 5;
if (send(sockfd, &list_station_reply_type, 1, 0) == -1) {
perror("send");
exit(1);
}
- } else if (strncmp("log\n", input, LINE_MAX) == 0) {
+ continue;
+ }
+
+ // log command: toggle logging
+ if (strncmp("log\n", input, LINE_MAX) == 0)
+ {
l = !l;
printf("LOGGING is now %s!\n", l ? "on" : "off");
printf("snowcast_control> ");
fflush(stdout);
+ continue;
}
- else {
- // convert input to an int
- int inputInt = atoi(input);
- if (input[0] != '0' && inputInt == 0) {
- printf("unknown command: %si", input);
- printf("snowcast_control> ");
- fflush(stdout);
- continue;
- }
- // printf("Changing to station %d.\n", inputInt);
-
- // set waiting so no new line on announce
- waiting = 1;
- // send the command to change the station
- apply_timeout(sockfd);
- struct SetStation setStation;
- setStation.commandType = 1;
- setStation.stationNumber = htons(inputInt);
- int bytes_to_send = sizeof(struct SetStation);
- // apply_timeout(sockfd);
- if (send_all(sockfd, &setStation, &bytes_to_send) == -1) {
- perror("send_all");
- exit(1);
- }
- if (!station_is_set) {
- station_is_set = 1;
- }
+
+ // check if this could be a station number
+ int inputInt = atoi(input);
+ if (input[0] != '0' && inputInt == 0) {
+ // if we're in here, it's ~likely~ not a number (ik it's not perfect, sorry :/)
+ printf("unknown command: %si", input);
+ printf("snowcast_control> ");
+ fflush(stdout);
+ continue;
}
+
+ // if we're here, it was a valid number,
+ // & we need to send SETSTATION command.
+ if (l) printf("sending SETSTATION command.\n");
+ waiting = 1; // just for clean cmd
+
+ // setup struct
+ struct SetStation setStation;
+ setStation.commandType = SETSTATION;
+ setStation.stationNumber = htons(inputInt);
+ // send it
+ apply_timeout(sockfd); // apply timeout, will be released when we get ANNOUNCE reply
+ int bytes_to_send = sizeof(struct SetStation);
+ if (send_all(sockfd, &setStation, &bytes_to_send) == -1) {
+ perror("send_all");
+ exit(1);
+ }
+
+ if (!station_is_set) station_is_set = 1; // update, if needed
+ if (l) printf("waiting for ANNOUNCE reply...\n");
}
return (NULL);
}
uint16_t handle_handshake(int sockfd, char* udp_port) {
- apply_timeout(sockfd); // apply timeout for handshake
+ if (l) printf("found server, sending HELLO command.\n");
- if (l) printf("found server, sending HELLO command. waiting for ANNOUNCE reply.\n");
+ apply_timeout(sockfd); // apply timeout for handshake
+ // after we find connection, we need to send HELLO command
struct Hello hello;
- hello.commandType = 0;
+ hello.commandType = HELLO;
// convert updPort to an int
int udp_port_int = atoi(udp_port);
hello.udpPort = htons(udp_port_int);
- if (send(sockfd, &hello, sizeof(struct Hello), 0) == -1) {
+ int hello_len = sizeof(struct Hello);
+ if (send_all(sockfd, &hello, &hello_len) == -1) {
perror("send");
exit(1);
}
- // recv the first byte of the message to get it's type
+ if (l) printf("waiting for WELCOME reply...\n");
+
+ // read WELCOME reply type
uint8_t reply_type = -1;
// print size of utin8
- if (recv(sockfd, &reply_type, 1, 0) == -1) {
+ if (recv(sockfd, &reply_type, 1, 0) == -1) { // only one byte, so can use recv
perror("recv in handshake");
exit(1);
}
- // recv the message, check for errors too
+ if (reply_type != WELCOME) {
+ fprintf(stderr, "first reply is not WELCOME. Exiting.\n");
+ close(sockfd);
+ exit(1);
+ }
+
+ // get the number of stations
int16_t num_stations = -1;
int bytes_to_read = sizeof(uint16_t);
if (recv_all(sockfd, &num_stations, &bytes_to_read) == -1) {
perror("recv_all in handshake");
exit(1);
}
-
if (l) printf("received ANNOUNCE reply.\n");
- // remove timeout since we no longer are "waiting" for an immediate reply
- remove_timeout(sockfd);
- return ntohs(num_stations);
+ remove_timeout(sockfd); // remove timeout since we no longer are "waiting" for an immediate reply
+ return ntohs(num_stations); // return the num_stations to be printed
} \ No newline at end of file
diff --git a/protocol.h b/protocol.h
index 574537b..fc5b536 100644
--- a/protocol.h
+++ b/protocol.h
@@ -8,6 +8,8 @@
#define LISTSTATIONS 5
#define STATIONINFO 6
+#define STATIONSHUTDOWN 7
+#define NEWSTATION 8
// client to server messages (commands)
diff --git a/select.c b/select.c
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/select.c
diff --git a/server.c b/server.c
index bd2f5f4..456ac47 100644
--- a/server.c
+++ b/server.c
@@ -7,24 +7,17 @@
#include <arpa/inet.h>
#include <netdb.h>
#include <string.h>
-
#include <sys/stat.h>
#include <fcntl.h>
#include <sys/types.h>
-
#include "protocol.c"
#define LINE_MAX 1024
#define MAX_USERS 1000
#define MAX_PATH 50
#define MAX_RATE_PER_SECOND 16*1024 / 2
+#define MAX_PACKET_SIZE 512
-// typedef struct station {
-// int streamFd;
-// char* filePath;
-// int fileBufferSize;
-// char fileBuffer[MAX_STREAM_RATE];
-// } station_t;
typedef struct station {
pthread_t streamThread;
int readfd;
@@ -35,7 +28,6 @@ station_t *stations;
int setup_stations(int argc, char *argv[]);
void *stream_routine(void *arg);
-
typedef struct user {
int udpPort;
int stationNum;
@@ -259,24 +251,26 @@ void *stream_routine_cleanup(void *arg) {
}
void *stream_routine(void *arg) {
+ int BROADCASTS_PER_SECOND = 2;
+ int BROADCAST_OFFSET = 10000;
+
int station_num = (int) arg;
// printf("stream routine %d\n", station_num);
int read_fd = stations[station_num].readfd;
pthread_cleanup_push(stream_routine_cleanup, read_fd);
- // make buffer which will be used to stream to children
+ // make buffer for read_file
char buffer[MAX_RATE_PER_SECOND];
- memset(buffer, 0, MAX_RATE_PER_SECOND);
- // if (!buffer) { perror("malloc (buffer in station thread)"); exit(1); }
for (;;)
{
- // load bytes into buffer
+ // load bytes from file into buffer
+ memset(buffer, 0, MAX_RATE_PER_SECOND);
int bytes_read = read_file(read_fd, buffer, station_num);
if (bytes_read == -1) { return (NULL); }
- // TODO: send buffer to children
+ // create the threads to send packets to users, which will be released later
int *send_buffer;
for (int i = 0; i < max_active_users; i++)
{
@@ -284,26 +278,33 @@ void *stream_routine(void *arg) {
continue;
if (user_data[i].stationNum == station_num)
{
- // send the udp packet
+ // prepare the send buffer
+ // (note: using int* for easy pointer assignment)
send_buffer = malloc(2 + bytes_read);
- memset(send_buffer, 0, 2 + bytes_read);
+ if (!send_buffer) { perror("malloc send_buffer in stream_routine"); return (NULL); }
send_buffer[0] = i;
send_buffer[1] = bytes_read;
memcpy(send_buffer+2, buffer, bytes_read);
- // printf("sending udp packet to user %d\n", i);
- pthread_t t;
- pthread_create(&t, NULL, send_udp_packet_routine, send_buffer);
+
+ // make thread
+ pthread_t send_udp_packet_thread;
+ pthread_create(&send_udp_packet_thread, NULL, send_udp_packet_routine, send_buffer);
}
}
- usleep(1000000 / 2 - 5000);
+
+ // wait for the thread to be created
+ usleep(1000000 / BROADCASTS_PER_SECOND - BROADCAST_OFFSET); // do -1000 for the usleep below
+
+ // let the threads run!
start_threads = 1;
pthread_cond_broadcast(&cond);
- usleep(5000);
+ // give some time to broadcast, then reset variables
+ usleep(BROADCAST_OFFSET);
start_threads = 0;
+ // free the buffer after it's been sent
free(send_buffer);
- memset(buffer, 0, MAX_RATE_PER_SECOND);
}
return (NULL);
@@ -382,7 +383,6 @@ void *print_info_routine(void *arg) {
int send_all_udp(int udp_sockfd, char *buf, int *len, struct sockaddr *addr, socklen_t addrlen)
{
- int MAX_PACKET_SIZE = 512;
int total = 0; // how many bytes we've sent
int bytesleft = *len; // how many we have left to send
int n;
@@ -746,12 +746,7 @@ void *init_user_routine(int newfd, int udp_port) {
return (NULL);
}
-// void *update_user_udpPort(int sockfd, int udpPort) {
-// pthread_mutex_lock(&mutex_user_data);
-// pthread_mutex_unlock(&mutex_user_data);
-// return (NULL);
-// }
void *update_user_station(int sockfd, int stationNum) {
pthread_mutex_lock(&mutex_user_data);
user_data[sockfd_to_user[sockfd]].stationNum = stationNum;
@@ -1015,8 +1010,9 @@ void destroy_station(int station_num) {
}
}
+ // cancel the stream's thread and close the read fd
+ pthread_cancel(stations[station_num].streamThread);
close(stations[station_num].readfd);
- // stations[station_num].filePath = NULL;
stations[station_num].readfd = -1;
printf("remove: successfully removed station %d\n", station_num);
@@ -1082,8 +1078,6 @@ void add_station(char *file_path) {
num_stations++;
}
-
-
void cleanup_fds() {
// close all the file descriptors for the stations
for (int i = 0; i < num_stations; i++) {
@@ -1096,4 +1090,6 @@ void cleanup_fds() {
close(user_data[i].sockfd);
}
}
+
+ close(listener);
}
diff --git a/snowcast_control b/snowcast_control
index 01f4d14..dae574f 100755
--- a/snowcast_control
+++ b/snowcast_control
Binary files differ
diff --git a/snowcast_server b/snowcast_server
index 55a64fb..cb0c2f1 100755
--- a/snowcast_server
+++ b/snowcast_server
Binary files differ