aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorsotech117 <michael_foiani@brown.edu>2023-09-24 07:45:27 +0000
committersotech117 <michael_foiani@brown.edu>2023-09-24 07:45:27 +0000
commit538d22688c4ae07d8ee9b9d1ec2ebbe80adddf47 (patch)
tree8f5060a14231f3c0d9c63a27fac79dc242b9a297
parent2acabfaf7308be7517d948743cc6ab9660566327 (diff)
more extra credit :)
-rw-r--r--client.c30
-rwxr-xr-xmove0
-rw-r--r--protocol.h9
-rw-r--r--server.c141
-rwxr-xr-xsnowcast_controlbin18768 -> 18808 bytes
-rwxr-xr-xsnowcast_serverbin33248 -> 37568 bytes
6 files changed, 171 insertions, 9 deletions
diff --git a/client.c b/client.c
index 4ca1165..6773c86 100644
--- a/client.c
+++ b/client.c
@@ -143,7 +143,7 @@ int main(int argc, char *argv[])
exit(1);
}
- if (!waiting) printf("\n", waiting); // note: this is worth the lines for a clean cmd prompt
+ if (!waiting) printf("\n"); // note: this is worth the lines for a clean cmd prompt
waiting = 0;
fflush(stdout);
printf("New song announced: %s\n", song_name);
@@ -197,6 +197,31 @@ int main(int argc, char *argv[])
free(info);
continue;
}
+ else if (reply_type == 7) { // we are getting StationShutdown
+ if (l) printf("received STATIONSHUTDOWN reply.\n");
+ if (!waiting) printf("\n"); // note: this is worth the lines for a clean cmd prompt
+ waiting = 0;
+ remove_timeout(sockfd);
+ fflush(stdout);
+ printf("This station has shut down. Please select different station.\n");
+ printf("snowcast_control> ");
+ fflush(stdout);
+ continue;
+ }
+ else if (reply_type == 8) { // we are getting NewStation
+ uint16_t station_number = -1;
+ if (recv(sockfd, &station_number, 2, 0) == -1) {
+ perror("recv in new station");
+ exit(1);
+ }
+ station_number = ntohs(station_number);
+ if (l) printf("received NEWSTATION reply.\n");
+ fflush(stdout);
+ printf("\nThere is now a new station @ index %d.\n", station_number);
+ printf("snowcast_control> ");
+ fflush(stdout);
+ continue;
+ }
printf("\nsocket to server HUNGUP. Exiting.\n");
close(sockfd);
@@ -243,7 +268,7 @@ void *command_line_routine(void* args) {
// convert input to an int
int inputInt = atoi(input);
if (input[0] != '0' && inputInt == 0) {
- printf("unknown command\n");
+ printf("unknown command: %s\n", input);
printf("snowcast_control> ");
fflush(stdout);
continue;
@@ -252,7 +277,6 @@ void *command_line_routine(void* args) {
// set waiting so no new line on announce
waiting = 1;
-
// send the command to change the station
apply_timeout(sockfd);
struct SetStation setStation;
diff --git a/move b/move
new file mode 100755
index 0000000..e69de29
--- /dev/null
+++ b/move
diff --git a/protocol.h b/protocol.h
index 33ffb07..2b48ef4 100644
--- a/protocol.h
+++ b/protocol.h
@@ -39,6 +39,15 @@ struct StationInfo {
char *infoString;
} __attribute__((packed));
+struct StationShutdown {
+ uint8_t replyType; // 7
+} __attribute__((packed));
+
+struct NewStation {
+ uint8_t replyType; // 8
+ uint16_t stationNumber;
+} __attribute__((packed));
+
int send_all(int sock, char *buf, int *len);
int recv_all(int sock, char *buf, int *len);
diff --git a/server.c b/server.c
index d854aa3..e6b4867 100644
--- a/server.c
+++ b/server.c
@@ -49,7 +49,7 @@ int count = 0;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
-pthread_mutex_t station_mutex = PTHREAD_MUTEX_INITIALIZER;
+pthread_mutex_t mutex_stations = PTHREAD_MUTEX_INITIALIZER;
const char *port;
// int num_stations;
@@ -97,6 +97,10 @@ void *send_stationsinfo_reply(void *arg);
int send_welcome_reply(int fd);
void handle_setstation_command(int fd);
+void add_station(char *station_name);
+void destroy_station(int station_num);
+void send_stationshutdown_reply(int fd);
+
uint16_t handle_handshake(int newfd);
void cleanup_fds();
@@ -145,7 +149,7 @@ main(int argc, char *argv[])
fflush(stdout);
while (read(STDIN_FILENO, input, LINE_MAX) > 0) {
// init tokens
- memset(tokens, 0, (LINE_MAX / 2) * sizeof(char *));
+ memset(tokens, 0, LINE_MAX / 2);
// if 0, all whitespace
if (!parse(input, tokens)) {
@@ -174,6 +178,7 @@ main(int argc, char *argv[])
if ((print_fd = open(output_file_path, O_CREAT | O_WRONLY | O_TRUNC, S_IRWXU)) == -1)
{
perror("open");
+ memset(input, 0, LINE_MAX);
continue;
}
} else {
@@ -197,10 +202,26 @@ main(int argc, char *argv[])
l= !l;
printf("logging is now %s\n", l ? "on" : "off");
}
+ else if (!strcmp(command, "add")) { // add a new station
+ // get the file path
+ char *file_path = tokens[1];
+ if (file_path == NULL) {
+ printf("add: must provide a file path\n");
+ } else add_station(file_path); // add the station
+ }
+ else if (!strcmp(command, "remove")) { // remove a station
+ if (tokens[1] == NULL) {
+ printf("remove: must provide a station number to remove\n");
+ }
+ else if ((tokens[1] != '\0' && atoi(tokens[1]) == 0)) {
+ printf("remove: must provide a number\n");
+ } else destroy_station(atoi(tokens[1]));
+ }
else {
- printf("unknown command:\t%s\n", command);
+ printf("unknown command: %s\n", command);
}
+ memset(input, 0, LINE_MAX);
printf("snowcast_server> ");
fflush(stdout);
}
@@ -209,8 +230,11 @@ main(int argc, char *argv[])
}
int read_file(int fd, char buffer[MAX_RATE_PER_SECOND], int station_num) {
+ // see if fd was closed at some point
+ if (fd == -1) return -1;
+
int bytes_read = read(fd, buffer, MAX_RATE_PER_SECOND);
- if (bytes_read < 0) { perror("read (from station file)"); return -1; }
+ if (bytes_read < 0) { perror("read (in read file)"); return -1; }
// printf("bytes read: %d\n", bytes_read);
if (bytes_read == 0) {
// printf("end of file, restarting\n");
@@ -223,7 +247,7 @@ int read_file(int fd, char buffer[MAX_RATE_PER_SECOND], int station_num) {
return -1;
}
bytes_read = read(fd, buffer, MAX_RATE_PER_SECOND);
- if (bytes_read < 0) { perror("read (from station file, after restart)"); return -1; }
+ if (bytes_read < 0) { perror("read (in read file, after restart)"); return -1; }
}
return bytes_read;
@@ -250,7 +274,7 @@ void *stream_routine(void *arg) {
{
// load bytes into buffer
int bytes_read = read_file(read_fd, buffer, station_num);
- if (bytes_read == -1) { exit(1); }
+ if (bytes_read == -1) { return (NULL); }
// TODO: send buffer to children
char *send_buffer = malloc(2 + bytes_read);
@@ -918,10 +942,115 @@ void handle_setstation_command(int sockfd) {
return;
}
+ // check if station num has been removed
+ if (stations[station_number].readfd == -1) {
+ send_stationshutdown_reply(sockfd);
+ return;
+ }
+
update_user_station(sockfd, station_number);
send_announce_reply(sockfd, station_number);
}
+void send_stationshutdown_reply(int fd) {
+ if (l) printf("sending STATIONSHUTDOWN reply to socket %d\n", fd);
+
+ uint8_t reply_num = 7;
+ if (send(fd, &reply_num, 1, 0) == -1)
+ perror("send in send stationshutdown");
+}
+
+void destroy_station(int station_num) {
+ // check if station num is in range
+ if (station_num >= num_stations || station_num < 0) {
+ printf("remove: station number %d is out of range\n", station_num);
+ return;
+ }
+ // check if station had been removed
+ if (stations[station_num].readfd == -1) {
+ printf("remove: station %d has already been removed\n", station_num);
+ return;
+ }
+
+ // send the stationshutdown command to users
+ for (int j = 0; j < max_active_users; j++) {
+ if (!user_data[j].sockfd || user_data[j].sockfd == -1)
+ continue;
+ if (user_data[j].stationNum == station_num) {
+ send_stationshutdown_reply(user_data[j].sockfd);
+ user_data[j].stationNum = -1;
+ }
+ }
+
+ close(stations[station_num].readfd);
+ // stations[station_num].filePath = NULL;
+ stations[station_num].readfd = -1;
+
+ printf("remove: successfully removed station %d\n", station_num);
+}
+
+void send_newstation_reply(uint16_t station_num) {
+ if (l) printf("sending NEWSTATION reply to all sockets\n");
+
+ uint8_t reply_num = 8;
+ uint16_t station_num_n = htons(station_num);
+ struct NewStation new_station = {reply_num, station_num_n};
+ for (int i = 0; i < max_active_users; i++) {
+ if (!user_data[i].sockfd || user_data[i].sockfd == -1)
+ continue;
+ int bytes_to_send = sizeof(struct NewStation);
+ if (send_all(user_data[i].sockfd, &new_station, &bytes_to_send) == -1)
+ perror("send_all in newstation reply");
+ }
+}
+
+void add_station(char *file_path) {
+ // run through array and see if we've seen file path before
+ for (int i = 0; i < num_stations; i++) {
+ if (strcmp(stations[i].filePath, file_path) == 0) {
+ // this station is still active
+ if (stations[i].readfd != -1) {
+ printf("add: (warning) filepath %s is already streaming on station %d\n", file_path, i);
+ }
+ else {
+ // this station has been removed
+ // reopen the file
+ stations[i].readfd = open(file_path, O_RDONLY);
+ if (stations[i].readfd < 0) { perror("read (from add station)"); return; }
+ pthread_create(&stations[i].streamThread, NULL, stream_routine, i);
+ send_newstation_reply(i);
+ return;
+ }
+ }
+ }
+
+ // get the size to malloc
+ int totalSize = 0;
+ for(int i = 0; i < num_stations; i++)
+ {
+ totalSize += sizeof(pthread_t) + sizeof(int) + strlen(stations[i].filePath);
+ }
+ totalSize += sizeof(pthread_t) + sizeof(int) + strlen(file_path);
+
+ // malloc the stations array
+ char *more_stations = realloc(stations, totalSize);
+ if (!more_stations) { perror("malloc (stations pointer)"); return; }
+ stations = more_stations;
+ // assign the stations, and start the threads
+ stations[num_stations].filePath = malloc(strlen(file_path));
+ memcpy(stations[num_stations].filePath, file_path, strlen(file_path));
+ stations[num_stations].readfd = open(file_path, O_RDONLY);
+ if (stations[num_stations].readfd < 0) { perror("read (from add station)"); return; }
+ pthread_create(&stations[num_stations].streamThread, NULL, stream_routine, num_stations);
+ send_newstation_reply(num_stations);
+
+ printf("add: successfully created station @ index %d\n", num_stations);
+
+ num_stations++;
+}
+
+
+
void cleanup_fds() {
// // close all the file descriptors
// for (int i = 0; i < num_stations; i++) {
diff --git a/snowcast_control b/snowcast_control
index d32ecdb..5cab9dd 100755
--- a/snowcast_control
+++ b/snowcast_control
Binary files differ
diff --git a/snowcast_server b/snowcast_server
index b98a780..e217218 100755
--- a/snowcast_server
+++ b/snowcast_server
Binary files differ