aboutsummaryrefslogtreecommitdiff
path: root/server.c
diff options
context:
space:
mode:
authorsotech117 <michael_foiani@brown.edu>2023-09-24 07:45:27 +0000
committersotech117 <michael_foiani@brown.edu>2023-09-24 07:45:27 +0000
commit538d22688c4ae07d8ee9b9d1ec2ebbe80adddf47 (patch)
tree8f5060a14231f3c0d9c63a27fac79dc242b9a297 /server.c
parent2acabfaf7308be7517d948743cc6ab9660566327 (diff)
more extra credit :)
Diffstat (limited to 'server.c')
-rw-r--r--server.c141
1 files changed, 135 insertions, 6 deletions
diff --git a/server.c b/server.c
index d854aa3..e6b4867 100644
--- a/server.c
+++ b/server.c
@@ -49,7 +49,7 @@ int count = 0;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
-pthread_mutex_t station_mutex = PTHREAD_MUTEX_INITIALIZER;
+pthread_mutex_t mutex_stations = PTHREAD_MUTEX_INITIALIZER;
const char *port;
// int num_stations;
@@ -97,6 +97,10 @@ void *send_stationsinfo_reply(void *arg);
int send_welcome_reply(int fd);
void handle_setstation_command(int fd);
+void add_station(char *station_name);
+void destroy_station(int station_num);
+void send_stationshutdown_reply(int fd);
+
uint16_t handle_handshake(int newfd);
void cleanup_fds();
@@ -145,7 +149,7 @@ main(int argc, char *argv[])
fflush(stdout);
while (read(STDIN_FILENO, input, LINE_MAX) > 0) {
// init tokens
- memset(tokens, 0, (LINE_MAX / 2) * sizeof(char *));
+ memset(tokens, 0, LINE_MAX / 2);
// if 0, all whitespace
if (!parse(input, tokens)) {
@@ -174,6 +178,7 @@ main(int argc, char *argv[])
if ((print_fd = open(output_file_path, O_CREAT | O_WRONLY | O_TRUNC, S_IRWXU)) == -1)
{
perror("open");
+ memset(input, 0, LINE_MAX);
continue;
}
} else {
@@ -197,10 +202,26 @@ main(int argc, char *argv[])
l= !l;
printf("logging is now %s\n", l ? "on" : "off");
}
+ else if (!strcmp(command, "add")) { // add a new station
+ // get the file path
+ char *file_path = tokens[1];
+ if (file_path == NULL) {
+ printf("add: must provide a file path\n");
+ } else add_station(file_path); // add the station
+ }
+ else if (!strcmp(command, "remove")) { // remove a station
+ if (tokens[1] == NULL) {
+ printf("remove: must provide a station number to remove\n");
+ }
+ else if ((tokens[1] != '\0' && atoi(tokens[1]) == 0)) {
+ printf("remove: must provide a number\n");
+ } else destroy_station(atoi(tokens[1]));
+ }
else {
- printf("unknown command:\t%s\n", command);
+ printf("unknown command: %s\n", command);
}
+ memset(input, 0, LINE_MAX);
printf("snowcast_server> ");
fflush(stdout);
}
@@ -209,8 +230,11 @@ main(int argc, char *argv[])
}
int read_file(int fd, char buffer[MAX_RATE_PER_SECOND], int station_num) {
+ // see if fd was closed at some point
+ if (fd == -1) return -1;
+
int bytes_read = read(fd, buffer, MAX_RATE_PER_SECOND);
- if (bytes_read < 0) { perror("read (from station file)"); return -1; }
+ if (bytes_read < 0) { perror("read (in read file)"); return -1; }
// printf("bytes read: %d\n", bytes_read);
if (bytes_read == 0) {
// printf("end of file, restarting\n");
@@ -223,7 +247,7 @@ int read_file(int fd, char buffer[MAX_RATE_PER_SECOND], int station_num) {
return -1;
}
bytes_read = read(fd, buffer, MAX_RATE_PER_SECOND);
- if (bytes_read < 0) { perror("read (from station file, after restart)"); return -1; }
+ if (bytes_read < 0) { perror("read (in read file, after restart)"); return -1; }
}
return bytes_read;
@@ -250,7 +274,7 @@ void *stream_routine(void *arg) {
{
// load bytes into buffer
int bytes_read = read_file(read_fd, buffer, station_num);
- if (bytes_read == -1) { exit(1); }
+ if (bytes_read == -1) { return (NULL); }
// TODO: send buffer to children
char *send_buffer = malloc(2 + bytes_read);
@@ -918,10 +942,115 @@ void handle_setstation_command(int sockfd) {
return;
}
+ // check if station num has been removed
+ if (stations[station_number].readfd == -1) {
+ send_stationshutdown_reply(sockfd);
+ return;
+ }
+
update_user_station(sockfd, station_number);
send_announce_reply(sockfd, station_number);
}
+void send_stationshutdown_reply(int fd) {
+ if (l) printf("sending STATIONSHUTDOWN reply to socket %d\n", fd);
+
+ uint8_t reply_num = 7;
+ if (send(fd, &reply_num, 1, 0) == -1)
+ perror("send in send stationshutdown");
+}
+
+void destroy_station(int station_num) {
+ // check if station num is in range
+ if (station_num >= num_stations || station_num < 0) {
+ printf("remove: station number %d is out of range\n", station_num);
+ return;
+ }
+ // check if station had been removed
+ if (stations[station_num].readfd == -1) {
+ printf("remove: station %d has already been removed\n", station_num);
+ return;
+ }
+
+ // send the stationshutdown command to users
+ for (int j = 0; j < max_active_users; j++) {
+ if (!user_data[j].sockfd || user_data[j].sockfd == -1)
+ continue;
+ if (user_data[j].stationNum == station_num) {
+ send_stationshutdown_reply(user_data[j].sockfd);
+ user_data[j].stationNum = -1;
+ }
+ }
+
+ close(stations[station_num].readfd);
+ // stations[station_num].filePath = NULL;
+ stations[station_num].readfd = -1;
+
+ printf("remove: successfully removed station %d\n", station_num);
+}
+
+void send_newstation_reply(uint16_t station_num) {
+ if (l) printf("sending NEWSTATION reply to all sockets\n");
+
+ uint8_t reply_num = 8;
+ uint16_t station_num_n = htons(station_num);
+ struct NewStation new_station = {reply_num, station_num_n};
+ for (int i = 0; i < max_active_users; i++) {
+ if (!user_data[i].sockfd || user_data[i].sockfd == -1)
+ continue;
+ int bytes_to_send = sizeof(struct NewStation);
+ if (send_all(user_data[i].sockfd, &new_station, &bytes_to_send) == -1)
+ perror("send_all in newstation reply");
+ }
+}
+
+void add_station(char *file_path) {
+ // run through array and see if we've seen file path before
+ for (int i = 0; i < num_stations; i++) {
+ if (strcmp(stations[i].filePath, file_path) == 0) {
+ // this station is still active
+ if (stations[i].readfd != -1) {
+ printf("add: (warning) filepath %s is already streaming on station %d\n", file_path, i);
+ }
+ else {
+ // this station has been removed
+ // reopen the file
+ stations[i].readfd = open(file_path, O_RDONLY);
+ if (stations[i].readfd < 0) { perror("read (from add station)"); return; }
+ pthread_create(&stations[i].streamThread, NULL, stream_routine, i);
+ send_newstation_reply(i);
+ return;
+ }
+ }
+ }
+
+ // get the size to malloc
+ int totalSize = 0;
+ for(int i = 0; i < num_stations; i++)
+ {
+ totalSize += sizeof(pthread_t) + sizeof(int) + strlen(stations[i].filePath);
+ }
+ totalSize += sizeof(pthread_t) + sizeof(int) + strlen(file_path);
+
+ // malloc the stations array
+ char *more_stations = realloc(stations, totalSize);
+ if (!more_stations) { perror("malloc (stations pointer)"); return; }
+ stations = more_stations;
+ // assign the stations, and start the threads
+ stations[num_stations].filePath = malloc(strlen(file_path));
+ memcpy(stations[num_stations].filePath, file_path, strlen(file_path));
+ stations[num_stations].readfd = open(file_path, O_RDONLY);
+ if (stations[num_stations].readfd < 0) { perror("read (from add station)"); return; }
+ pthread_create(&stations[num_stations].streamThread, NULL, stream_routine, num_stations);
+ send_newstation_reply(num_stations);
+
+ printf("add: successfully created station @ index %d\n", num_stations);
+
+ num_stations++;
+}
+
+
+
void cleanup_fds() {
// // close all the file descriptors
// for (int i = 0; i < num_stations; i++) {