aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorsotech117 <michael_foiani@brown.edu>2023-09-25 13:11:59 -0400
committersotech117 <michael_foiani@brown.edu>2023-09-25 13:11:59 -0400
commit150ff61c978b7ae381d1a71953c92bf4dd5c3571 (patch)
treef24a4cf4ebbf312ca15bbd04d77386c8d2ea1329
parent0e34f9ce017d0a9afc2b2354854a0e76019a86ec (diff)
big refactoring code. some issues with udp stream now, though. will need to fix up
-rw-r--r--client.c10
-rw-r--r--protocol.h10
-rw-r--r--server.c410
-rwxr-xr-xsnowcast_controlbin35277 -> 18848 bytes
-rwxr-xr-xsnowcast_listenerbin34270 -> 13656 bytes
-rwxr-xr-xsnowcast_serverbin71644 -> 37800 bytes
6 files changed, 234 insertions, 196 deletions
diff --git a/client.c b/client.c
index 7b9e036..4c3573c 100644
--- a/client.c
+++ b/client.c
@@ -21,7 +21,7 @@
void *command_line_routine(void *args);
-int handle_handshake(int sockfd, char* udpPort);
+u_int16_t handle_handshake(int sockfd, char* udpPort);
int station_is_set = 0;
int l = 0;
@@ -272,7 +272,7 @@ void *command_line_routine(void* args) {
// convert input to an int
int inputInt = atoi(input);
if (input[0] != '0' && inputInt == 0) {
- printf("unknown command: %s\n", input);
+ printf("unknown command: %si", input);
printf("snowcast_control> ");
fflush(stdout);
continue;
@@ -300,8 +300,8 @@ void *command_line_routine(void* args) {
return (NULL);
}
-int handle_handshake(int sockfd, char* udp_port) {
- apply_timeout(sockfd);
+uint16_t handle_handshake(int sockfd, char* udp_port) {
+ apply_timeout(sockfd); // apply timeout for handshake
if (l) printf("found server, sending HELLO command. waiting for ANNOUNCE reply.\n");
@@ -333,7 +333,7 @@ int handle_handshake(int sockfd, char* udp_port) {
if (l) printf("received ANNOUNCE reply.\n");
+ // remove timeout since we no longer are "waiting" for an immediate reply
remove_timeout(sockfd);
-
return ntohs(num_stations);
} \ No newline at end of file
diff --git a/protocol.h b/protocol.h
index d4f7b2a..574537b 100644
--- a/protocol.h
+++ b/protocol.h
@@ -1,5 +1,15 @@
#include <stdint.h> // Provides uint8_t, int8_t, etc.
+#define HELLO 0
+#define SETSTATION 1
+#define WELCOME 2
+#define ANNOUNCE 3
+#define INVALID 4
+
+#define LISTSTATIONS 5
+#define STATIONINFO 6
+
+
// client to server messages (commands)
struct Hello {
diff --git a/server.c b/server.c
index d5ca55f..ec0e6a7 100644
--- a/server.c
+++ b/server.c
@@ -73,19 +73,17 @@ struct udp_packet_routine_args {
};
void *send_udp_packet_routine(void* arg);
-void *select_routine(void* arg);
-void *sync_routine(void* arg);
+void setup_listener();
+void *select_routine(void *arg);
void *send_announce_routine(void* arg);
-void init_station(int station_num, const char *station_name);
-
int parse(char buffer[LINE_MAX], char *tokens[LINE_MAX / 2]);
void *print_info_routine(void *arg);
void *get_in_addr(struct sockaddr *sa);
-void *init_user(int sockfd);
-void *update_user_udpPort(int sockfd, int udpPort);
+void *init_user_routine(int sockfd, int udp_port);
+// void *update_user_udpPort(int sockfd, int udpPort);
void *update_user_station(int sockfd, int stationNum);
void *print_user_data(int sockfd);
void *print_station_data(int station);
@@ -95,7 +93,7 @@ void send_announce_reply(int fd, int station_num);
void send_invalid_reply(int fd, size_t message_size, char* message);
void *send_stationsinfo_reply(void *arg);
int send_welcome_reply(int fd);
-void handle_setstation_command(int fd);
+int handle_setstation_command(int fd);
void add_station(char *station_name);
void destroy_station(int station_num);
@@ -131,6 +129,9 @@ main(int argc, char *argv[])
sockfd_to_user = malloc(sizeof(int) * max_active_users);
if (!sockfd_to_user) { perror("malloc sockfd to user"); return 1; }
+ // setup the listener
+ setup_listener();
+
// make and start "select" thread that manages:
// 1) new connections, 2) requests from current connections, 3) closing connections
pthread_t select_thread;
@@ -162,7 +163,6 @@ main(int argc, char *argv[])
// if q, shutdown!
if (!strcmp(command, "q")) {
printf("Exiting.\n");
- // TODO: exit better than break
cleanup_fds();
break;
}
@@ -406,63 +406,81 @@ void udp_port_cleanup_handler(void *arg)
close(sockfd);
}
-/* Make the manager routine */
-void *send_udp_packet_routine(void *arg) {
- // printf("send udp packet routine\n");
- int *buf = arg;
- // unpack args
- int user_index = buf[0];
- int buffer_size = buf[1];
- char *file_buffer = malloc(buffer_size);
- memcpy(file_buffer, buf+2, buffer_size);
-
- // printf("udp packet routine, user:%d\n size: %d\n", user_index, buffer_size);
-
- // declare vairables to be used
- int did_work = 1;
- pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER;
- int s;
- int udp_sockfd;
+int setup_udp_connection(int udp_port, int *udp_sockfd, struct addrinfo* res) {
+ // setup hints to get udp_port
struct addrinfo thread_hints, *thread_res, *thread_servinfo;
- int error_code;
-
- // setup hints
memset(&thread_hints, 0, sizeof thread_hints);
thread_hints.ai_family = AF_INET; // use IPv4 only
- thread_hints.ai_socktype = SOCK_DGRAM;
+ thread_hints.ai_socktype = SOCK_DGRAM; // udp
thread_hints.ai_flags = AI_PASSIVE; // fill in my IP for me
- int int_port = user_data[user_index].udpPort;
- int length = snprintf( NULL, 0, "%d", int_port );
+ // convert port uint16 into a string for getaddrinfo
+ int length = snprintf( NULL, 0, "%d", udp_port );
char* port = malloc( length + 1 );
if (!port) { perror("malloc on port"); return (NULL); }
- snprintf(port, length + 1, "%d", int_port);
- sprintf(port, "%d", int_port);
+ snprintf(port, length + 1, "%d", udp_port);
+ sprintf(port, "%d", udp_port);
+ // resolve host
+ int error_code;
if (error_code = getaddrinfo(NULL, port, &thread_hints, &thread_servinfo) != 0)
{
fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(error_code));
- return (NULL);
+ return 0;
}
free(port);
- // loop through all the results and make a socket
+ // make the socket
+ int sock;
for(thread_res = thread_servinfo; thread_res != NULL; thread_res = thread_res->ai_next) {
- if ((udp_sockfd = socket(thread_res->ai_family, thread_res->ai_socktype,
+ if ((sock = socket(thread_res->ai_family, thread_res->ai_socktype,
thread_res->ai_protocol)) == -1) {
- perror("talker: socket");
+ perror("socket in setup_udp_connection");
continue;
}
break;
}
- if (udp_sockfd == NULL) {
- fprintf(stderr, "talker: failed to create socket\n");
- return (NULL);
+ if (sock == NULL) {
+ fprintf(stderr, "socket: failed to create udp socket(setup_udp_connection)\n");
+ return 0;
}
+
+ *udp_sockfd = sock;
+ // only these two properties of addrinfo are needed to send udp
+ *res->ai_addr = *thread_res->ai_addr;
+ res->ai_addrlen = thread_res->ai_addrlen;
+ return 1;
+}
+
+/* Make the manager routine */
+void *send_udp_packet_routine(void *arg) {
+ // printf("send udp packet routine\n");
+ int *buf = arg;
+ // unpack args
+ int user_index = buf[0];
+ int buffer_size = buf[1];
+ char *file_buffer = malloc(buffer_size);
+ memcpy(file_buffer, buf+2, buffer_size);
+
+ // setup variables
+ int did_work = 1;
+ pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER;
+
+ // setup udp socket for this thread/user
+ int udp_port = user_data[user_index].udpPort;
+ int udp_sockfd;
+ struct addrinfo *res;
+ if (setup_udp_connection(udp_port, &udp_sockfd, &res) == -1){
+ fprintf(stderr, "failure in setup_udp_connection");
+ return (NULL); // error occured
+ }
+
+ // setup cleanup for thread that closes the fd if closed
pthread_cleanup_push(udp_port_cleanup_handler, (void *)udp_sockfd);
- // wait for
+
pthread_mutex_lock(&m);
+ // wait for threads
did_work = 0;
while (!start_threads)
{
@@ -473,17 +491,14 @@ void *send_udp_packet_routine(void *arg) {
if (station_num == -1) {
did_work = 1;
}
- // potential error here!
- // printf("station info - bytes read: %d, station_fd: %d, filePath: %s, buffersize: %d\n", bytes_read, station_info->streamFd, station_info->filePath, station_info->fileBufferSize);
- if (send_all_udp(udp_sockfd, file_buffer, &buffer_size, thread_res) == -1)
- {
- perror("send_all_udp");
- printf("We only sent %d bytes because of the error!\n", buffer_size);
- }
+ // send the data! (no error check since udp)
+ send_all_udp(udp_sockfd, file_buffer, &buffer_size, &res);
+ // cleanup
+ close(udp_sockfd);
free(file_buffer);
-
+
pthread_mutex_unlock(&m);
pthread_cleanup_pop(1);
@@ -510,15 +525,10 @@ void *send_announce_routine(void *arg) {
fd_set master; // master file descriptor list
-
-void *select_routine(void *arg) {
- fd_set read_fds; // temp file descriptor list for select()
-
- int listener; // listening socket descriptor
+int listener; // listening socket descriptor
+int fdmax; // maximum file descriptor number
+void setup_listener() {
int newfd; // newly accept()ed socket descriptor
- int fdmax; // maximum file descriptor number
- struct sockaddr_storage remoteaddr; // client address
- socklen_t addrlen;
char buf[256]; // buffer for client data
int nbytes;
@@ -533,8 +543,7 @@ void *select_routine(void *arg) {
// const char* port = argv[1];
- FD_ZERO(&master); // clear the master and temp sets
- FD_ZERO(&read_fds);
+ FD_ZERO(&master); // clear the master set
// LISTENER: get us a socket and bind it
memset(&hints, 0, sizeof hints);
@@ -582,144 +591,170 @@ void *select_routine(void *arg) {
// keep track of the biggest file descriptor
fdmax = listener; // so far, it's this one
+}
+
+int handle_client_command(int clientfd) {
+ uint8_t command_type = -1;
+ if (recv(clientfd, &command_type, 1, 0) <= 0) // check user has disconnected
+ {
+ if (l) printf("socket %d HUNGUP. lost connection.\n", clientfd);
+
+ destroy_user(clientfd);
+ return 0;
+ }
+
+ // check for each command type...
+ if (command_type == HELLO) {
+ // if user already sent a HELLO, send back in invalid command
+ // since only the only the handshake handles HELLO
+ if (l) printf("received an extraneous HELLO from socket %d. sending INVALID reply.\n", clientfd);
+
+ char * message = "must not sent more than one Hello message";
+ send_invalid_reply(clientfd, strlen(message), message);
+ destroy_user(clientfd);
+ return 0;
+ }
+
+ if (command_type == SETSTATION) {
+ if(!handle_setstation_command(clientfd)) { // failure
+ // remove user
+ destroy_user(clientfd);
+ return 0;
+ }
+ return 1; // sucess, return 1
+ }
+
+ if (command_type == LISTSTATIONS) {
+ if (l) printf("received a LISTSTATIONS from socket %d\n", clientfd);
+
+ pthread_t send_thread; // send STATIONINFO in a different thread
+ pthread_create(&send_thread, NULL, send_stationsinfo_reply, clientfd);
+ return 1;
+ }
+
+ // if we're here, we got an invalid/unknown command
+ if (l) printf("received unknown command type %d from socket %d. sending INVALID reply.\n", command_type, clientfd);
+ char *message = "invalid command type";
+ send_invalid_reply(clientfd, strlen(message), message);
+ destroy_user(clientfd);
+ return 0;
+}
+
+int handle_new_connection() {
+ // handle new connections
+ struct sockaddr_storage remoteaddr;
+ socklen_t addrlen = sizeof remoteaddr;
+ int newfd = accept(listener,
+ (struct sockaddr *)&remoteaddr,
+ &addrlen);
+ // check if newfd is valid
+ if (newfd == -1) {
+ perror("accept");
+ return 0;
+ }
+
+ // new user, we need to do the handshake
+ uint16_t udp_port = handle_handshake(newfd);
+ if (udp_port == 0) { // 0 is used for err since unsigned can't be negative
+ // drop connection upon error
+ close(newfd);
+ return 0;
+ }
+
+ // add the user to the set
+ FD_SET(newfd, &master); // add to master set
+ if (newfd > fdmax) { // keep track of the max
+ fdmax = newfd;
+ }
+
+ // TODO: thread this
+ init_user_routine(newfd, udp_port);
+ return 1;
+}
+
+void *select_routine(void *arg) {
+ fd_set read_fds; // temp file descriptor list for select()
+ FD_ZERO(&read_fds); // clear the temp set
while(1) {
- read_fds = master; // copy it
+ read_fds = master; // copy set
+
if (select(fdmax+1, &read_fds, NULL, NULL, NULL) == -1) {
perror("select");
exit(4);
}
// run through the existing connections looking for data to read
- for(i = 0; i <= fdmax; i++) {
- if (FD_ISSET(i, &read_fds)) { // we got one!!
- if (i == listener) {
- // handle new connections
- addrlen = sizeof remoteaddr;
- newfd = accept(listener,
- (struct sockaddr *)&remoteaddr,
- &addrlen);
-
- if (newfd == -1) {
- perror("accept");
- } else {
- // new user, we need to do the handshake
- uint16_t udp_port = handle_handshake(newfd);
- if (udp_port == 0) {
- // drop connection upon error
- close(newfd);
- continue;
- }
-
- // add the user to the set
- FD_SET(newfd, &master); // add to master set
- if (newfd > fdmax) { // keep track of the max
- fdmax = newfd;
- }
-
- // TODO: thread or combine into one function
- // init the user
- init_user(newfd);
- // update the udp port
- update_user_udpPort(newfd, udp_port);
-
- send_welcome_reply(newfd);
- }
- } else {
- // handle data from a client
- uint8_t command_type = -1;
- if ((nbytes = recv(i, &command_type, 1, 0)) <= 0)
- {
- // got error or connection closed by client
- if (nbytes == 0) {
- // connection closed
- if (l) printf("socket %d HUNGUP. lost connection.\n", newfd);
- } else {
- perror("recv");
- }
- destroy_user(i);
- }
- else // we got some data from a client
- {
- if (command_type == 0) { // we got a HELLO commmand
- // send back in invalid command if user already sent a hello
- if (user_data[sockfd_to_user[i]].udpPort != -1) {
- if (l) printf("received an extraneous HELLO from socket %d. sending INVALID reply.\n", i);
- char * message = "must not sent more than one Hello message";
- send_invalid_reply(i, strlen(message), message);
- destroy_user(i);
- }
- }
-
- else if (command_type == 1) { // we got a SETSTATION command
- handle_setstation_command(i);
- }
- else if (command_type == 5) { // we got a ListStations command
- if (l) printf("received a LISTSTATIONS from socket %d\n", i);
- pthread_t t;
- pthread_create(&t, NULL, send_stationsinfo_reply, i);
- }
- else {
- if (l) printf("received unknown command type %d from socket %d. sending INVALID reply.\n", command_type, i);
- char *message = "invalid command type";
- send_invalid_reply(i, strlen(message), message);
- destroy_user(i);
- }
- }
- }
+ for(int i = 0; i <= fdmax; i++) {
+ if (!FD_ISSET(i, &read_fds)) { // no new connections
+ continue;
+ }
+
+ // if listener, then we need to add this new connection
+ if (i == listener) {
+ handle_new_connection();
+ continue;
}
+
+ // if not the listener, then we need to handle new data from the client
+ handle_client_command(i);
}
}
+
+ return (NULL);
}
-void *init_user(int sockfd) {
- // add the user to the list of user data
+void *init_user_routine(int newfd, int udp_port) {
pthread_mutex_lock(&mutex_user_data);
- if(sockfd > max_sockfd) {
- max_sockfd = sockfd;
- int * more_sockfd_to_user = realloc(sockfd_to_user, sizeof(int) * (max_sockfd + 1));
- if (!more_sockfd_to_user) { perror("realloc"); exit(1); }
+ // FOLLOWING IS FOR MEMORY OPTIMIZATION
+
+ // if the newfd is larger than the max, we need to resize the array
+ if(newfd > max_sockfd) {
+ max_sockfd = newfd*2; // double the array
+ int *more_sockfd_to_user = realloc(sockfd_to_user, sizeof(int) * (max_sockfd + 1));
+ if (!more_sockfd_to_user) { perror("realloc in init_user_routine1"); exit(1); }
sockfd_to_user = more_sockfd_to_user;
}
+ // let's check if we can resuse the space from a previous user
int running_index = 0;
while(running_index < max_active_users) {
+ // -1 on sockfd indicates that we have directly assigned it when "destroying a user"
+ // -> so a previous user was there, but discconnected
if (user_data[running_index].sockfd == -1) {
- break;
+ break; // we found an index
}
running_index++;
}
+
if (running_index == max_active_users) {
- // printf("reached max active users\n");
- // printf("making new memory\n");
+ // if we're here, we went through the whole array to no avail,
+ // so we need to extend users array
max_active_users++;
user_t *more_users = realloc(user_data, sizeof(user_t) * max_active_users);
- if (!more_users) { perror("realloc"); exit(1); }
+ if (!more_users) { perror("realloc in init_user_routine2"); exit(1); }
user_data = more_users;
}
// map TCP sockfd to this user index
- user_data[running_index] = (user_t){-1, -1, sockfd};
- sockfd_to_user[sockfd] = running_index;
- // free(user_stream_threads);
+ user_data[running_index] = (user_t){udp_port, -1, newfd};
+ sockfd_to_user[newfd] = running_index;
+
pthread_mutex_unlock(&mutex_user_data);
+ // successfully created user, let's send the welcome
+ send_welcome_reply(newfd);
+
return (NULL);
}
-void *update_user_udpPort(int sockfd, int udpPort) {
- pthread_mutex_lock(&mutex_user_data);
- // get the user
- user_t *user = &user_data[sockfd_to_user[sockfd]];
- // set the udpPort
- user->udpPort = udpPort;
- // start the stream thread, now that we have the udpPort
- // pthread_create(&user->streamThread, NULL, send_udp_packet_routine, (void *)sockfd_to_user[sockfd]);
- pthread_mutex_unlock(&mutex_user_data);
+// void *update_user_udpPort(int sockfd, int udpPort) {
+// pthread_mutex_lock(&mutex_user_data);
+// pthread_mutex_unlock(&mutex_user_data);
- return (NULL);
-}
+// return (NULL);
+// }
void *update_user_station(int sockfd, int stationNum) {
pthread_mutex_lock(&mutex_user_data);
user_data[sockfd_to_user[sockfd]].stationNum = stationNum;
@@ -859,23 +894,14 @@ uint16_t handle_handshake(int newfd)
{
if (l) printf("waiting for HELLO command. new connection on socket %d\n", newfd);
- // apply timeout to socket
+ // apply timeout to socket since handshake must be completed quickly
apply_timeout(newfd);
uint8_t command_type = -1;
- size_t nbytes;
- if (nbytes = recv(newfd, &command_type, 1, 0) <= 0)
+ if (recv(newfd, &command_type, 1, 0) <= 0)
{
// got error or connection closed by client
- if (nbytes == 0) {
- // connection closed
- if (l) printf("socket %d HUNGUP. lost connection.\n", newfd);
- }
- else
- {
- perror("recv in handle handshake");
- }
- // remove user from data structures
+ if (l) printf("socket %d HUNGUP in handshake. lost connection.\n", newfd);
close(newfd);
return 0;
}
@@ -888,14 +914,15 @@ uint16_t handle_handshake(int newfd)
}
// get the udp port
- uint16_t udp_port = -1;
+ uint16_t udp_port;
int bytes_to_read = sizeof(uint16_t);
if (recv_all(newfd, &udp_port, &bytes_to_read) == -1) {
perror("recv_all");
close(newfd);
return 0;
}
- // remove timeout, after reading data
+
+ // remove timeout, as we aren't "waiting" for immediate reply
remove_timeout(newfd);
if(l) printf("received HELLO from socket %d with udp port %u\n", newfd, ntohs(udp_port));
@@ -919,13 +946,13 @@ int send_welcome_reply(int fd) {
return 1;
}
-void handle_setstation_command(int sockfd) {
+int handle_setstation_command(int sockfd) {
// get the station number
- uint16_t station_number = -1;
+ uint16_t station_number;
int bytes_to_read = sizeof(uint16_t);
if (recv_all(sockfd, &station_number, &bytes_to_read) == -1) {
perror("recv_all");
- destroy_user(sockfd);
+ return 0;
}
station_number = ntohs(station_number);
@@ -935,8 +962,7 @@ void handle_setstation_command(int sockfd) {
// send back in invalid command and drop user
char * message = "must send Hello message first";
send_invalid_reply(sockfd, strlen(message), message);
- destroy_user(sockfd);
- return;
+ return 0;
}
if (l) printf("received a SETSTATION from socket %d with station_number %u\n", sockfd, station_number);
@@ -948,18 +974,18 @@ void handle_setstation_command(int sockfd) {
// send back in invalid command and drop user
char * message = "station number out of range";
send_invalid_reply(sockfd, strlen(message), message);
- destroy_user(sockfd);
- return;
+ return 0;
}
// check if station num has been removed
if (stations[station_number].readfd == -1) {
send_stationshutdown_reply(sockfd);
- return;
+ return 1;
}
update_user_station(sockfd, station_number);
send_announce_reply(sockfd, station_number);
+ return 1;
}
void send_stationshutdown_reply(int fd) {
@@ -1062,13 +1088,15 @@ void add_station(char *file_path) {
void cleanup_fds() {
- // // close all the file descriptors
- // for (int i = 0; i < num_stations; i++) {
- // close(stations[i].readfd);
- // }
- // for (int i = 0; i < max_active_users; i++) {
- // if (user_data[i].sockfd != -1) {
- // close(user_data[i].sockfd);
- // }
- // }
+ // close all the file descriptors for the stations
+ for (int i = 0; i < num_stations; i++) {
+ close(stations[i].readfd);
+ }
+
+ // close all the tcp connections
+ for (int i = 0; i < max_active_users; i++) {
+ if (user_data[i].sockfd != -1) {
+ close(user_data[i].sockfd);
+ }
+ }
}
diff --git a/snowcast_control b/snowcast_control
index 8a8a2ce..01f4d14 100755
--- a/snowcast_control
+++ b/snowcast_control
Binary files differ
diff --git a/snowcast_listener b/snowcast_listener
index d15c2ac..f27a5c1 100755
--- a/snowcast_listener
+++ b/snowcast_listener
Binary files differ
diff --git a/snowcast_server b/snowcast_server
index ab6cf33..5c97922 100755
--- a/snowcast_server
+++ b/snowcast_server
Binary files differ