aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorsotech117 <michael_foiani@brown.edu>2023-09-25 19:02:19 -0400
committersotech117 <michael_foiani@brown.edu>2023-09-25 19:02:19 -0400
commit8c6ae1ecde9faa0af5dacaf7ecf0f9cf47b69159 (patch)
tree87671ddc40b678e0ec6592015d0e5d26e0df1864
parentc534d8e28a00c9762fcb4ef2bdeb9a735ae26b75 (diff)
more refactoring and commenting
-rw-r--r--client.c4
-rw-r--r--protocol.c5
-rw-r--r--server.c222
-rwxr-xr-xsnowcast_controlbin18848 -> 22968 bytes
-rwxr-xr-xsnowcast_serverbin37856 -> 37816 bytes
5 files changed, 119 insertions, 112 deletions
diff --git a/client.c b/client.c
index f9617cd..df9f731 100644
--- a/client.c
+++ b/client.c
@@ -97,7 +97,7 @@ int main(int argc, char *argv[])
pthread_create(&command_line_thread, NULL, command_line_routine, (void*)sockfd);
// this while loop hangs on recv and runs when there is new data
- while (1) {
+ while (69) {
// get the type of the incoming reply
uint8_t reply_type = -1;
// print size of utin8
@@ -261,7 +261,7 @@ void *command_line_routine(void* args) {
printf("Enter a number to change to it's station. Enter q to end stream.\n");
printf("snowcast_control> ");
fflush(stdout);
- while (1) {
+ while (420) {
memset(input, 0, LINE_MAX);
char *line = fgets(input, LINE_MAX, stdin);
diff --git a/protocol.c b/protocol.c
index 5c8127f..9c8c65b 100644
--- a/protocol.c
+++ b/protocol.c
@@ -4,6 +4,7 @@
#include "protocol.h"
#define TCP_TIMEOUT 100000 // 100ms in microseconds
+#define MAX_PACKET_SIZE 512
int send_all(int sock, char *buf, int *len)
{
@@ -18,8 +19,10 @@ int send_all(int sock, char *buf, int *len)
int bytesleft = *len; // how many we have left to send
int n;
+ // ensure we don't send more than MAX_PACKET_SIZE bytes
+ size_t max_send = bytesleft >= MAX_PACKET_SIZE ? MAX_PACKET_SIZE : bytesleft;
while(total < *len) {
- n = send(sock, buf+total, bytesleft, 0);
+ n = send(sock, buf+total, max_send, 0);
if (n == -1) { break; }
total += n;
bytesleft -= n;
diff --git a/server.c b/server.c
index 456ac47..a02fc93 100644
--- a/server.c
+++ b/server.c
@@ -15,8 +15,11 @@
#define LINE_MAX 1024
#define MAX_USERS 1000
#define MAX_PATH 50
-#define MAX_RATE_PER_SECOND 16*1024 / 2
-#define MAX_PACKET_SIZE 512
+#define BROADCAST_OFFSET 10000
+#define MAX_RATE_PER_SECOND 16 * 1024
+#define BROADCASTS_PER_SECOND 2
+
+#define FILE_READ_SIZE MAX_RATE_PER_SECOND / BROADCASTS_PER_SECOND
typedef struct station {
pthread_t streamThread;
@@ -43,7 +46,6 @@ pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t mutex_stations = PTHREAD_MUTEX_INITIALIZER;
-const char *port;
// int num_stations;
int start_threads = 0;
@@ -65,7 +67,7 @@ struct udp_packet_routine_args {
};
void *send_udp_packet_routine(void* arg);
-void setup_listener();
+int setup_listener(const char* port);
void *select_routine(void *arg);
void *send_announce_routine(void* arg);
@@ -93,50 +95,58 @@ void send_stationshutdown_reply(int fd);
uint16_t handle_handshake(int newfd);
-void cleanup_fds();
+void cleanup_readfds_and_sockets();
-int l = 0;
+int l = 0; // boolean for logging -> if (l) { printf("..."); }
+fd_set master; // master file descriptor (sockets) list
+int fdmax; // maximum file descriptor number
main(int argc, char *argv[])
{
- // check and assign arguments
+ // CHECK AND USE ARGUMENTS
+ // -------------------------------------------------------------------------------------------------
if (argc < 3) {
fprintf(stderr,"usage: ./snowcast_server <listen port> <file0> [file 1] [file 2] ... \n");
exit(1);
}
- // initizlize the port
- port = argv[1];
+ // assign port
+ const char* server_port = argv[1];
- // initialize the stations & their threads
+ // initialize the stations & their threads, from
if (setup_stations(argc, argv) == -1) {
perror("setup_stations");
exit(1);
}
- // make array of user data
- // printf("max active users: %d\n", sizeof(user_t) * max_active_users);
+ // INTIALIZE DATA STRUCTURES
+ // -------------------------------------------------------------------------------------------------
+ // make pointers to array of user data
user_data = malloc(sizeof(user_t) * max_active_users);
if (!user_data) { perror("malloc userdata"); return 1; }
sockfd_to_user = malloc(sizeof(int) * max_active_users);
if (!sockfd_to_user) { perror("malloc sockfd to user"); return 1; }
- // setup the listener
- setup_listener();
+
+ // SETUP SOCKETS TO LISTEN FOR NEW CONNECTIONS
+ // -------------------------------------------------------------------------------------------------
+ // setup the listener fd to accept new connections
+ FD_ZERO(&master); // clear the master set
+ int listenerfd = setup_listener(server_port);
// make and start "select" thread that manages:
// 1) new connections, 2) requests from current connections, 3) closing connections
pthread_t select_thread;
- pthread_create(&select_thread, NULL, select_routine, NULL);
+ pthread_create(&select_thread, NULL, select_routine, listenerfd);
- // start syncchronization thread to broadcast stations
- // pthread_t sync_thread;
- // pthread_create(&sync_thread, NULL, sync_routine, NULL);
-
- // command line interface
+ // BELOW IS FOR THE COMMAND LINE INTERFACE
+ // -------------------------------------------------------------------------------------------------
+
+ // command line data structures
char input[LINE_MAX];
memset(input, 0, LINE_MAX);
char *tokens[LINE_MAX / 2];
+ // memset(tokens, 0, LINE_MAX / 2);
printf("snowcast_server> ");
fflush(stdout);
@@ -152,18 +162,21 @@ main(int argc, char *argv[])
}
char *command = tokens[0];
- // if q, shutdown!
+
+ // if q command: shutdown & close tcp sockets!
if (!strcmp(command, "q")) {
printf("Exiting.\n");
- cleanup_fds();
+ cleanup_readfds_and_sockets();
+ close(listenerfd);
break;
}
- // if p, print info
+ // if p command: print info
else if (!strcmp(command, "p")) {
- // get the file descriptor
- int print_fd = 0;
- // see if there is a file path
+ int print_fd;
+
+ // see if there is a file path to print to,
+ // if so, open it into print_fd
char *output_file_path = tokens[1];
if (output_file_path != NULL)
{
@@ -173,34 +186,37 @@ main(int argc, char *argv[])
memset(input, 0, LINE_MAX);
continue;
}
- } else {
- print_fd = STDOUT_FILENO;
- }
- // printf("print_fd: %d\n", print_fd);
- // pthread_t print_info_thread;
- // pthread_create(&print_info_thread, NULL, print_info_routine, print_fd);
+ } else print_fd = STDOUT_FILENO;
+
+ // print the info
print_info_routine((void *)print_fd);
- // note - this file descriptor is closed in the thread
}
+
+ // if u command: print user data (debugging)
else if (!strcmp(command, "u"))
- {
for (int i = 0; i < max_active_users; i++) print_user_data(i);
- }
+
+ // if u command: print station data (debugging)
else if (!strcmp(command, "s"))
- {
for (int i = 0; i < num_stations; i++) print_station_data(i);
- }
+
+ // if log command: start logging (debugging)
else if (!strcmp(command, "log")) {
l= !l;
printf("logging is now %s\n", l ? "on" : "off");
}
+
+ // if add command: add a station (EXTRA CREDIT)
else if (!strcmp(command, "add")) { // add a new station
// get the file path
char *file_path = tokens[1];
+
if (file_path == NULL) {
printf("add: must provide a file path\n");
} else add_station(file_path); // add the station
}
+
+ // if add command: remove a station (EXTRA CREDIT)
else if (!strcmp(command, "remove")) { // remove a station
if (tokens[1] == NULL) {
printf("remove: must provide a station number to remove\n");
@@ -209,6 +225,7 @@ main(int argc, char *argv[])
printf("remove: must provide a number\n");
} else destroy_station(atoi(tokens[1]));
}
+
else {
printf("unknown command: %s\n", command);
}
@@ -217,15 +234,14 @@ main(int argc, char *argv[])
printf("snowcast_server> ");
fflush(stdout);
}
-
return 0;
}
-int read_file(int fd, char buffer[MAX_RATE_PER_SECOND], int station_num) {
+int read_file(int fd, char buffer[FILE_READ_SIZE], int station_num) {
// see if fd was closed at some point
if (fd == -1) return -1;
- int bytes_read = read(fd, buffer, MAX_RATE_PER_SECOND);
+ int bytes_read = read(fd, buffer, FILE_READ_SIZE);
if (bytes_read < 0) { perror("read (in read file)"); return -1; }
// printf("bytes read: %d\n", bytes_read);
if (bytes_read == 0) {
@@ -238,10 +254,9 @@ int read_file(int fd, char buffer[MAX_RATE_PER_SECOND], int station_num) {
perror("lseek (in resarting file)");
return -1;
}
- bytes_read = read(fd, buffer, MAX_RATE_PER_SECOND);
+ bytes_read = read(fd, buffer, FILE_READ_SIZE);
if (bytes_read < 0) { perror("read (in read file, after restart)"); return -1; }
}
-
return bytes_read;
}
@@ -251,9 +266,6 @@ void *stream_routine_cleanup(void *arg) {
}
void *stream_routine(void *arg) {
- int BROADCASTS_PER_SECOND = 2;
- int BROADCAST_OFFSET = 10000;
-
int station_num = (int) arg;
// printf("stream routine %d\n", station_num);
int read_fd = stations[station_num].readfd;
@@ -261,17 +273,18 @@ void *stream_routine(void *arg) {
pthread_cleanup_push(stream_routine_cleanup, read_fd);
// make buffer for read_file
- char buffer[MAX_RATE_PER_SECOND];
+ char buffer[FILE_READ_SIZE];
for (;;)
{
// load bytes from file into buffer
- memset(buffer, 0, MAX_RATE_PER_SECOND);
+ memset(buffer, 0, FILE_READ_SIZE);
int bytes_read = read_file(read_fd, buffer, station_num);
if (bytes_read == -1) { return (NULL); }
// create the threads to send packets to users, which will be released later
int *send_buffer;
+ int sent = 0;
for (int i = 0; i < max_active_users; i++)
{
if (!user_data[i].sockfd || user_data[i].sockfd == -1)
@@ -289,6 +302,7 @@ void *stream_routine(void *arg) {
// make thread
pthread_t send_udp_packet_thread;
pthread_create(&send_udp_packet_thread, NULL, send_udp_packet_routine, send_buffer);
+ sent = 1;
}
}
@@ -304,7 +318,7 @@ void *stream_routine(void *arg) {
start_threads = 0;
// free the buffer after it's been sent
- free(send_buffer);
+ if (sent) free(send_buffer);
}
return (NULL);
@@ -337,6 +351,7 @@ int setup_stations(int argc, char *argv[]) {
return 1;
}
+// helper to write int as a string format buffer
void write_int_to_fd(int fd, int n) {
int len = snprintf(NULL, 0, "%d", n);
char *num = malloc(len + 1);
@@ -350,10 +365,11 @@ void write_int_to_fd(int fd, int n) {
}
void *print_info_routine(void *arg) {
+ // unpack the fd to print to
int print_fd = (int) arg;
- // printf("thread print_fd: %d\n", print_fd);
- // printf("num_stations: %d\n", num_stations);
+
for (int i = 0; i < num_stations; i++) {
+ // prints each station in the desired format
write_int_to_fd(print_fd, i);
char *comma = ",";
write(print_fd, comma, strlen(comma));
@@ -362,11 +378,12 @@ void *print_info_routine(void *arg) {
char* file_path = stations[i].filePath;
write(print_fd, file_path, strlen(file_path));
+ // go through users, and print the udp ports
for (int j = 0; j < max_active_users; j++) {
if (!user_data[j].sockfd || user_data[j].sockfd == -1)
continue;
if (user_data[j].stationNum == i) {
- char *localhost_ip = ",127.0.0.1:";
+ char *localhost_ip = ",127.0.0.1:"; //TODO: possibly update
write(print_fd, localhost_ip, strlen(localhost_ip));
// write udpPort
write_int_to_fd(print_fd, user_data[j].udpPort);
@@ -377,7 +394,9 @@ void *print_info_routine(void *arg) {
write(print_fd, newline, strlen(newline));
}
+ // close the fd
if (print_fd != STDOUT_FILENO) close(print_fd);
+
return (NULL);
}
@@ -445,8 +464,6 @@ int setup_udp_connection(int udp_port, int *udp_sockfd, socklen_t *addrlen, stru
}
*udp_sockfd = sock;
- // only these two properties of addrinfo are needed to send udp
- // TODO: FIX, or reroute somewhere else
*addrlen = thread_res->ai_addrlen;
*addr = *thread_res->ai_addr;
@@ -464,12 +481,13 @@ void *send_udp_packet_routine(void *arg) {
// setup variables
int did_work = 1;
pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER;
+ // only these two following properties of addrinfo are needed to send udp packets
+ struct sockaddr addr;
+ socklen_t addrlen;
// setup udp socket for this thread/user
int udp_port = user_data[user_index].udpPort;
int udp_sockfd;
- struct sockaddr addr;
- socklen_t addrlen;
if (setup_udp_connection(udp_port, &udp_sockfd, &addrlen, &addr) == -1){
fprintf(stderr, "failure in setup_udp_connection");
return (NULL); // error occured
@@ -504,54 +522,34 @@ void *send_udp_packet_routine(void *arg) {
}
void *send_announce_routine(void *arg) {
- // unpack args
+ // unpack arg
int station_num = (int) arg;
// send the announce messages
for (int i = 0; i < max_active_users; i++)
{
- if (user_data[i].sockfd == 0 || user_data[i].sockfd == -1) {
+ if (user_data[i].sockfd == 0 || user_data[i].sockfd == -1)
continue;
- }
- // update the station of each user
- if (user_data[i].stationNum == station_num)
- {
+ // send announce reply to each user
+ if (user_data[i].stationNum == station_num)
send_announce_reply(user_data[i].sockfd, station_num);
- }
}
}
-
-fd_set master; // master file descriptor list
-int listener; // listening socket descriptor
-int fdmax; // maximum file descriptor number
-void setup_listener() {
- int newfd; // newly accept()ed socket descriptor
-
- char buf[256]; // buffer for client data
- int nbytes;
-
-
- char remoteIP[INET6_ADDRSTRLEN];
-
- int yes=1; // for setsockopt() SO_REUSEADDR, below
- int i, j, rv;
-
+int setup_listener(const char* port) {
+ int listener; // listening socket descriptor, to be set
+
+ // setup and bind to listener
struct addrinfo hints, *ai, *p;
-
- // const char* port = argv[1];
-
- FD_ZERO(&master); // clear the master set
-
- // LISTENER: get us a socket and bind it
memset(&hints, 0, sizeof hints);
- hints.ai_family = AF_INET;
- hints.ai_socktype = SOCK_STREAM;
- hints.ai_flags = AI_PASSIVE;
- if ((rv = getaddrinfo(NULL, port, &hints, &ai)) != 0) {
+ hints.ai_family = AF_INET; // Ipv4
+ hints.ai_socktype = SOCK_STREAM; // TCP
+ hints.ai_flags = AI_PASSIVE; // auto select
+ int rv;
+ if ((rv = getaddrinfo(NULL, port, &hints, &ai)) != 0)
+ {
fprintf(stderr, "snowcast_server: %s\n", gai_strerror(rv));
exit(1);
}
-
for(p = ai; p != NULL; p = p->ai_next) {
listener = socket(p->ai_family, p->ai_socktype, p->ai_protocol);
if (listener < 0) {
@@ -559,16 +557,15 @@ void setup_listener() {
}
// lose the pesky "address already in use" error message
+ int yes=1;
setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int));
if (bind(listener, p->ai_addr, p->ai_addrlen) < 0) {
close(listener);
continue;
}
-
break;
}
-
// if we got here, it means we didn't get bound
if (p == NULL) {
fprintf(stderr, "snowcast_server: failed to bind\n");
@@ -586,8 +583,11 @@ void setup_listener() {
// add the listener to the master set
FD_SET(listener, &master);
- // keep track of the biggest file descriptor
- fdmax = listener; // so far, it's this one
+ // initialize fdmax. listener will be greatest,
+ // since after the stations are cerated
+ fdmax = listener;
+
+ return listener;
}
int handle_client_command(int clientfd) {
@@ -624,7 +624,7 @@ int handle_client_command(int clientfd) {
if (command_type == LISTSTATIONS) {
if (l) printf("received a LISTSTATIONS from socket %d\n", clientfd);
- pthread_t send_thread; // send STATIONINFO in a different thread
+ pthread_t send_thread; // send STATIONINFO from a different thread
pthread_create(&send_thread, NULL, send_stationsinfo_reply, clientfd);
return 1;
}
@@ -637,8 +637,9 @@ int handle_client_command(int clientfd) {
return 0;
}
-int handle_new_connection() {
- // handle new connections
+int handle_new_connection(int listener) {
+
+ // accept new connection
struct sockaddr_storage remoteaddr;
socklen_t addrlen = sizeof remoteaddr;
int newfd = accept(listener,
@@ -670,10 +671,13 @@ int handle_new_connection() {
}
void *select_routine(void *arg) {
+ int listener = (int) arg; // the listener fd
+
fd_set read_fds; // temp file descriptor list for select()
FD_ZERO(&read_fds); // clear the temp set
- while(1) {
+ for (;;)
+ {
read_fds = master; // copy set
if (select(fdmax+1, &read_fds, NULL, NULL, NULL) == -1) {
@@ -689,7 +693,7 @@ void *select_routine(void *arg) {
// if listener, then we need to add this new connection
if (i == listener) {
- handle_new_connection();
+ handle_new_connection(listener);
continue;
}
@@ -836,7 +840,7 @@ void *send_stationsinfo_reply(void * arg) {
uint32_t reply_size = 0;
for (int i = 0; i < num_stations; i++) {
if (stations[i].readfd != -1)
- reply_size += snprintf(NULL, 0, "%d,%s\n", i, stations[i].filePath);
+ reply_size += snprintf(NULL, 0, "station #%d -> %s\n", i, stations[i].filePath);
}
reply_size--; // don't want final \n
@@ -851,14 +855,10 @@ void *send_stationsinfo_reply(void * arg) {
if (send_all(fd, &reply_size_endian, &bytes) == -1)
perror("send_all in send stations info");
-
-
- printf("SENT reply size: %d\n", reply_size);
-
char send_buffer[reply_size];
int ptr = 0;
for (int i = 0; i < num_stations; i++) {
- ptr += sprintf(send_buffer + ptr, "%d,%s\n", i, stations[i].filePath);
+ ptr += sprintf(send_buffer + ptr, "station %d -> %s\n", i, stations[i].filePath);
}
int bytes_to_send = reply_size; // don't want final \n
@@ -1021,9 +1021,11 @@ void destroy_station(int station_num) {
void send_newstation_reply(uint16_t station_num) {
if (l) printf("sending NEWSTATION reply to all sockets\n");
+ // make the struct
uint8_t reply_num = 8;
uint16_t station_num_n = htons(station_num);
struct NewStation new_station = {reply_num, station_num_n};
+ // send the message to each (valid) user
for (int i = 0; i < max_active_users; i++) {
if (!user_data[i].sockfd || user_data[i].sockfd == -1)
continue;
@@ -1047,6 +1049,7 @@ void add_station(char *file_path) {
stations[i].readfd = open(file_path, O_RDONLY);
if (stations[i].readfd < 0) { perror("read (from add station)"); return; }
pthread_create(&stations[i].streamThread, NULL, stream_routine, i);
+
send_newstation_reply(i);
return;
}
@@ -1069,16 +1072,19 @@ void add_station(char *file_path) {
stations[num_stations].filePath = malloc(strlen(file_path));
memcpy(stations[num_stations].filePath, file_path, strlen(file_path));
stations[num_stations].readfd = open(file_path, O_RDONLY);
+ // update fdmax, if applicable
+ if (stations[num_stations].readfd > fdmax) {
+ fdmax = stations[num_stations].readfd;
+ }
if (stations[num_stations].readfd < 0) { perror("read (from add station)"); return; }
pthread_create(&stations[num_stations].streamThread, NULL, stream_routine, num_stations);
send_newstation_reply(num_stations);
- printf("add: successfully created station @ index %d\n", num_stations);
-
num_stations++;
+ printf("add: successfully created station @ index %d\n", num_stations);
}
-void cleanup_fds() {
+void cleanup_readfds_and_sockets() {
// close all the file descriptors for the stations
for (int i = 0; i < num_stations; i++) {
close(stations[i].readfd);
@@ -1090,6 +1096,4 @@ void cleanup_fds() {
close(user_data[i].sockfd);
}
}
-
- close(listener);
}
diff --git a/snowcast_control b/snowcast_control
index dae574f..e76cedf 100755
--- a/snowcast_control
+++ b/snowcast_control
Binary files differ
diff --git a/snowcast_server b/snowcast_server
index cb0c2f1..d3f5e04 100755
--- a/snowcast_server
+++ b/snowcast_server
Binary files differ