aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorsotech117 <michael_foiani@brown.edu>2023-09-25 22:21:42 -0400
committersotech117 <michael_foiani@brown.edu>2023-09-25 22:21:42 -0400
commit6a2c567b85be275bb431c09952a88ea4cdf210aa (patch)
treeff649ae093b3b8eb9931e3a2e4cf7bb3e5ce0bb7
parent8c6ae1ecde9faa0af5dacaf7ecf0f9cf47b69159 (diff)
massive restrcuting of code for readability
-rw-r--r--client.c100
-rw-r--r--listener.c58
-rw-r--r--protocol.c31
-rw-r--r--protocol.h3
-rw-r--r--server.c1215
-rwxr-xr-xsnowcast_controlbin22968 -> 35133 bytes
-rwxr-xr-xsnowcast_listenerbin13656 -> 34542 bytes
-rwxr-xr-xsnowcast_serverbin37816 -> 71756 bytes
8 files changed, 780 insertions, 627 deletions
diff --git a/client.c b/client.c
index df9f731..17a8a81 100644
--- a/client.c
+++ b/client.c
@@ -1,3 +1,12 @@
+/*
+ author: sotech117
+ date: 9/25/2023
+ course: csci1680
+ description: client for snowcast, a music streaming service
+
+ enjoy :)
+*/
+
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
@@ -12,53 +21,42 @@
#include <arpa/inet.h>
#include "protocol.c"
-#define MAX_READ_SIZE 1024
-#define LINE_MAX 1024
-
-void *command_line_routine(void *args);
+#define COMMAND_LINE_MAX 1024
+// handles the handshake and returns the number of stations
u_int16_t handle_handshake(int sockfd, char* udpPort);
-int station_is_set = 0;
-int l = 0;
-int waiting = 0;
-int exiting = 0;
-
-// get sockaddr, IPv4 or IPv6:
-void *get_in_addr(struct sockaddr *sa)
-{
- if (sa->sa_family == AF_INET) {
- return &(((struct sockaddr_in*)sa)->sin_addr);
- }
+// routine thread for the command line
+void *command_line_routine(void *args);
- return &(((struct sockaddr_in6*)sa)->sin6_addr);
-}
+int l = 0; // logging
+int station_is_set = 0; // if we have a station set
+int waiting = 0; // for command line "snowcast_control>" prompt to be a new lone
+int exiting = 0; // to not have error messages when the program exits "gracefully"
-int main(int argc, char *argv[])
+main(int argc, char *argv[]) // no int here for good luck :)
{
- int sockfd, numbytesrecv, numbytessent, recvbytes;
- // char buf[MAXDATASIZE];
- struct addrinfo hints, *servinfo, *p;
- char s[INET6_ADDRSTRLEN];
-
- // check arugments
+ // CHECK AND USE ARGUMENTS
+ // -------------------------------------------------------------------------------------------------
if (argc != 4) {
fprintf(stderr,"<server IP> <server port> <listener port>\n");
exit(1);
}
+ const char* tcpPort = argv[2]; // port we use to connect to server's tcp stream
+ const char* udpPort = argv[3]; // port we use to connect to server's udp info and command
+ // SETUP TCP CONNECTION (getaaadrinfo->socket->connect)
+ // -------------------------------------------------------------------------------------------------
+ struct addrinfo hints, *servinfo, *p;
memset(&hints, 0, sizeof hints);
hints.ai_family = AF_INET; // only IPv4
hints.ai_socktype = SOCK_STREAM;
- char* tcpPort = argv[2]; // port we use to connect to server's tcp stream
- char* udpPort = argv[3]; // port we use to connect to server's udp info and command
-
- // resolve host
- int rv;
- if ((rv = getaddrinfo(argv[1], tcpPort, &hints, &servinfo)) != 0)
+ // resolve host & make socket
+ int sockfd, err;
+ if ((err = getaddrinfo(argv[1], tcpPort, &hints, &servinfo)) != 0)
{
- fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(rv));
+ fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(err));
return 1;
}
// loop through all the results and connect to the first we can
@@ -80,9 +78,12 @@ int main(int argc, char *argv[])
fprintf(stderr, "client: failed to connect\n");
return 2;
}
- inet_ntop(p->ai_family, get_in_addr((struct sockaddr *)p->ai_addr), s, sizeof s);
+ // char s[INET_ADDRSTRLEN]; // for printing the ip address
+ // inet_ntop(p->ai_family, get_in_addr((struct sockaddr *)p->ai_addr), s, sizeof s);
freeaddrinfo(servinfo); // all done with this structure
+ // DO HANDSHAKE & PRINT STATION NUMBER
+ // -------------------------------------------------------------------------------------------------
// now that we're connectioned, let's do the handshake
uint16_t num_stations = handle_handshake(sockfd, udpPort);
@@ -92,17 +93,19 @@ int main(int argc, char *argv[])
printf("Welcome to Snowcast! The server has %u stations.\n", num_stations);
fflush(stdout);
- // start the thread to accept command lines
+ // START COMMAND LINE THREAD
+ // -------------------------------------------------------------------------------------------------
pthread_t command_line_thread;
pthread_create(&command_line_thread, NULL, command_line_routine, (void*)sockfd);
- // this while loop hangs on recv and runs when there is new data
+ // START WHILE LOOP THAT HANDLES ALL REPLIES
+ // -------------------------------------------------------------------------------------------------
while (69) {
// get the type of the incoming reply
uint8_t reply_type = -1;
// print size of utin8
if (recv(sockfd, &reply_type, 1, 0) == -1) {
- if (exiting) {
+ if (exiting) { // just to remove that pesky error message
break;
}
perror("recv in first byte");
@@ -211,6 +214,9 @@ int main(int argc, char *argv[])
if (reply_type == STATIONSHUTDOWN) { // we are getting StationShutdown
if (l) printf("received STATIONSHUTDOWN reply.\n");
+ // remove timeout, in case we were waiting for an announce
+ remove_timeout(sockfd);
+
if (!waiting) printf("\n"); // note: this is worth the lines for a clean cmd prompt :)
waiting = 0;
@@ -251,27 +257,28 @@ int main(int argc, char *argv[])
return 0;
}
+/* thread for managing the command line */
void *command_line_routine(void* args) {
// unpack sockfd as arg
int sockfd = (int) args;
// buffer for input
- char input[LINE_MAX];
+ char input[COMMAND_LINE_MAX];
printf("Enter a number to change to it's station. Enter q to end stream.\n");
printf("snowcast_control> ");
fflush(stdout);
while (420) {
- memset(input, 0, LINE_MAX);
- char *line = fgets(input, LINE_MAX, stdin);
+ memset(input, 0, COMMAND_LINE_MAX);
+ char *line = fgets(input, COMMAND_LINE_MAX, stdin);
// nothing was typed
if (line == NULL) {
continue;
}
- // q command: exit the program
- if (strncmp("q\n", input, LINE_MAX) == 0) {
+ // "q" command: exit the program
+ if (strncmp("q\n", input, COMMAND_LINE_MAX) == 0) {
// end code if type in q
exiting = 1;
printf("Exiting.\n");
@@ -279,8 +286,8 @@ void *command_line_routine(void* args) {
exit(0);
}
- // l command: STATIONINFO command (EXTRA CREDIT)
- if (strncmp("l\n", input, LINE_MAX) == 0) {
+ // "l" command: STATIONINFO command (EXTRA CREDIT)
+ if (strncmp("l\n", input, COMMAND_LINE_MAX) == 0) {
// send the command to list stations
if (l) printf("sending LISTSTATIONS command. waiting for STATIONINFO reply.\n");
apply_timeout(sockfd); // apply a timeout, will be released when we get the reply
@@ -292,8 +299,8 @@ void *command_line_routine(void* args) {
continue;
}
- // log command: toggle logging
- if (strncmp("log\n", input, LINE_MAX) == 0)
+ // "log" command: toggle logging
+ if (strncmp("log\n", input, COMMAND_LINE_MAX) == 0)
{
l = !l;
printf("LOGGING is now %s!\n", l ? "on" : "off");
@@ -335,6 +342,11 @@ void *command_line_routine(void* args) {
return (NULL);
}
+/*
+ handles the handshake, given the new socket fd and the udp port of this client
+ returns the number of stations
+ note: will close the program on failure!
+*/
uint16_t handle_handshake(int sockfd, char* udp_port) {
if (l) printf("found server, sending HELLO command.\n");
diff --git a/listener.c b/listener.c
index 04e3030..3b5084b 100644
--- a/listener.c
+++ b/listener.c
@@ -1,5 +1,11 @@
+
/*
-** listener.c -- a datagram sockets "server" demo
+ author: sotech117
+ date: 9/25/2023
+ course: csci1680
+ description: listener for snowcast, a music streaming service
+
+ enjoy :)
*/
#include <stdio.h>
@@ -13,45 +19,30 @@
#include <arpa/inet.h>
#include <netdb.h>
-// #define MYPORT "4950" // the port users will be connecting to
-
-#define MAXBUFLEN 16384
-
-// get sockaddr, IPv4 or IPv6:
-void *get_in_addr(struct sockaddr *sa)
-{
- if (sa->sa_family == AF_INET) {
- return &(((struct sockaddr_in*)sa)->sin_addr);
- }
-
- return &(((struct sockaddr_in6*)sa)->sin6_addr);
-}
+#include "protocol.c"
int main(int argc, char *argv[])
{
- int sockfd;
- struct addrinfo hints, *servinfo, *p;
- int rv;
- int numbytes;
- struct sockaddr_storage their_addr;
- socklen_t addr_len;
- char s[INET6_ADDRSTRLEN];
-
+ // CHECK AND USE ARGUMENTS
+ // -------------------------------------------------------------------------------------------------
if (argc != 2) {
fprintf(stderr,"<udp port>\n");
exit(1);
}
-
+ // get the udp port
const char* udp_port = argv[1];
-
+ // GET UDP SOCKET
+ // -------------------------------------------------------------------------------------------------
+ int sockfd, err;
+ struct addrinfo hints, *servinfo, *p;
memset(&hints, 0, sizeof hints);
hints.ai_family = AF_INET; // set to AF_INET to use IPv4
hints.ai_socktype = SOCK_DGRAM;
hints.ai_flags = AI_PASSIVE; // use my IP
- if ((rv = getaddrinfo(NULL, udp_port, &hints, &servinfo)) != 0) {
- fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(rv));
+ if ((err = getaddrinfo(NULL, udp_port, &hints, &servinfo)) != 0) {
+ fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(err));
return 1;
}
@@ -71,20 +62,21 @@ int main(int argc, char *argv[])
break;
}
-
if (p == NULL) {
fprintf(stderr, "listener: failed to bind socket\n");
return 2;
}
-
freeaddrinfo(servinfo);
- int count = 0;
-
- char buf[MAXBUFLEN];
+ // START READ/WRITE LOOP
+ // -------------------------------------------------------------------------------------------------
+ char buf[MAX_PACKET_SIZE]; // buffer for reading
+ struct sockaddr_storage their_addr; // connector's address information
+ socklen_t addr_len;
while(1) {
addr_len = sizeof their_addr;
- if ((numbytes = recvfrom(sockfd, buf, MAXBUFLEN , 0,
+ int numbytes;
+ if ((numbytes = recvfrom(sockfd, buf, MAX_PACKET_SIZE , 0,
(struct sockaddr *)&their_addr, &addr_len)) == -1) {
perror("recvfrom");
exit(1);
@@ -92,7 +84,7 @@ int main(int argc, char *argv[])
// print the buffer
write(STDOUT_FILENO, buf, numbytes);
- memset(buf, 0, MAXBUFLEN);
+ memset(buf, 0, MAX_PACKET_SIZE);
}
close(sockfd);
diff --git a/protocol.c b/protocol.c
index 9c8c65b..b70c70a 100644
--- a/protocol.c
+++ b/protocol.c
@@ -3,9 +3,11 @@
#include "protocol.h"
-#define TCP_TIMEOUT 100000 // 100ms in microseconds
-#define MAX_PACKET_SIZE 512
-
+/*
+ ensures all bytes from the buffer are sent
+ note: applies a timeout during the send of bytes
+ note: modyfies the len variable to reflect the number of bytes send
+*/
int send_all(int sock, char *buf, int *len)
{
struct timeval timeout;
@@ -33,6 +35,11 @@ int send_all(int sock, char *buf, int *len)
return n==-1?-1:0; // return -1 on failure, 0 on success
}
+/*
+ ensures all bytes that can be sent are loaded into the buffer
+ note: applies a timeout during the collection of bytes
+ note: modyfies the len variable to reflect the number of bytes read
+*/
int recv_all(int sock, char *buf, int *len)
{
// setup the timeout on the socket
@@ -68,6 +75,10 @@ int recv_all(int sock, char *buf, int *len)
return n==-1?-1:0; // return -1 on failure, 0 on success
}
+/*
+ applies a timeout to the socket itself
+ note: should only be used with tcp connections, after connect()
+*/
int apply_timeout(int fd) {
// handle handshake
struct timeval tv;
@@ -81,6 +92,10 @@ int apply_timeout(int fd) {
return 1;
}
+/*
+ removes the timeout on a socket
+ note: should only be used with tcp connections, after connect()
+*/
int remove_timeout(int fd)
{
// handle handshake
@@ -93,4 +108,14 @@ int remove_timeout(int fd)
}
return 1;
+}
+
+/*
+ basic helper, not "really" used as we are only Ipv4
+*/
+void *get_in_addr(struct sockaddr *sa){
+ if (sa->sa_family == AF_INET) {
+ return &(((struct sockaddr_in*)sa)->sin_addr);
+ }
+ return &(((struct sockaddr_in6*)sa)->sin6_addr);
} \ No newline at end of file
diff --git a/protocol.h b/protocol.h
index fc5b536..7970e18 100644
--- a/protocol.h
+++ b/protocol.h
@@ -11,6 +11,8 @@
#define STATIONSHUTDOWN 7
#define NEWSTATION 8
+#define TCP_TIMEOUT 100000 // 100ms in microseconds
+#define MAX_PACKET_SIZE 512
// client to server messages (commands)
@@ -63,3 +65,4 @@ struct NewStation {
int send_all(int sock, char *buf, int *len);
int recv_all(int sock, char *buf, int *len);
+void *get_in_addr(struct sockaddr *sa);
diff --git a/server.c b/server.c
index a02fc93..b0ad3ed 100644
--- a/server.c
+++ b/server.c
@@ -1,3 +1,16 @@
+
+/*
+ author: sotech117
+ date: 9/25/2023
+ course: csci1680
+ description: server for snowcast, a music streaming service
+
+ disclaimer: I have outlined the structure of the server by categorizing the code into sections.
+ Each category is implemented in the order that it appears in the declaration below.
+
+ enjoy :)
+*/
+
#include <stdlib.h>
#include <pthread.h>
#include <stdio.h>
@@ -12,96 +25,112 @@
#include <sys/types.h>
#include "protocol.c"
-#define LINE_MAX 1024
-#define MAX_USERS 1000
-#define MAX_PATH 50
-#define BROADCAST_OFFSET 10000
-#define MAX_RATE_PER_SECOND 16 * 1024
-#define BROADCASTS_PER_SECOND 2
+// ----------------------------------------------------------------------------------------------------------
+// 0) MACROS
+// ----------------------------------------------------------------------------------------------------------
+#define COMMAND_LINE_MAX 1024
+#define BROADCAST_OFFSET 10000 // how long to "allow" the threads to broadcast
+#define MAX_RATE_PER_SECOND 16 * 1024 // stream rate per second
+#define BROADCASTS_PER_SECOND 2 // how often to "seek" the file
#define FILE_READ_SIZE MAX_RATE_PER_SECOND / BROADCASTS_PER_SECOND
+// ----------------------------------------------------------------------------------------------------------
+// 1) STATION DATA AND FUNCTIONS
+// ----------------------------------------------------------------------------------------------------------
typedef struct station {
- pthread_t streamThread;
- int readfd;
- char *filePath;
+ pthread_t streamThread;
+ int readfd;
+ char *filePath;
} station_t;
-int num_stations;
-station_t *stations;
-int setup_stations(int argc, char *argv[]);
-void *stream_routine(void *arg);
-
-typedef struct user {
+int num_stations;
+pthread_mutex_t mutex_stations = PTHREAD_MUTEX_INITIALIZER;
+station_t *stations;
+int setup_stations(int argc, char *argv[]);
+void *stream_routine(void *arg);
+void *stream_routine_cleanup(void *arg);
+int read_file(int fd, char buffer[FILE_READ_SIZE], int station_num);
+
+// ----------------------------------------------------------------------------------------------------------
+// 2) USER DATA AND FUNCTIONS
+// ----------------------------------------------------------------------------------------------------------
+typedef struct user
+{
int udpPort;
int stationNum;
- int sockfd;
+ int tcpfd;
} user_t;
-
-
-/* For safe condition variable usage, must use a boolean predicate and */
-/* a mutex with the condition. */
-int count = 0;
-pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
-pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
-
-pthread_mutex_t mutex_stations = PTHREAD_MUTEX_INITIALIZER;
-
-// int num_stations;
-
-int start_threads = 0;
-int max_active_users = 0;
-int max_sockfd = 0;
-
-pthread_mutex_t mutex_user_data = PTHREAD_MUTEX_INITIALIZER;
-// array from index to user_data
-user_t *user_data;
-int *sockfd_to_user;
-
-// stations array pointer
-// station_t *station_data;
-
-struct udp_packet_routine_args {
- int user_index;
- int buffer_size;
- char *file_buffer;
-};
-
-void *send_udp_packet_routine(void* arg);
-int setup_listener(const char* port);
-void *select_routine(void *arg);
-void *send_announce_routine(void* arg);
-
-int parse(char buffer[LINE_MAX], char *tokens[LINE_MAX / 2]);
-void *print_info_routine(void *arg);
-
-void *get_in_addr(struct sockaddr *sa);
-
-void *init_user_routine(int sockfd, int udp_port);
-// void *update_user_udpPort(int sockfd, int udpPort);
-void *update_user_station(int sockfd, int stationNum);
-void *print_user_data(int sockfd);
-void *print_station_data(int station);
-void destroy_user(int sockfd);
-
-void send_announce_reply(int fd, int station_num);
-void send_invalid_reply(int fd, size_t message_size, char* message);
-void *send_stationsinfo_reply(void *arg);
-int send_welcome_reply(int fd);
-int handle_setstation_command(int fd);
-
-void add_station(char *station_name);
-void destroy_station(int station_num);
-void send_stationshutdown_reply(int fd);
-
-uint16_t handle_handshake(int newfd);
-
-void cleanup_readfds_and_sockets();
-
-int l = 0; // boolean for logging -> if (l) { printf("..."); }
-
-fd_set master; // master file descriptor (sockets) list
-int fdmax; // maximum file descriptor number
-main(int argc, char *argv[])
+int max_active_users = 0; // maximum number of users that have been connected at once
+int max_tcpfd = 0; // maximum tcpfd for users that has been seen
+pthread_mutex_t mutex_users = PTHREAD_MUTEX_INITIALIZER;
+user_t *users;
+int *tcpfd_to_user; // maps tcpfd to user index
+void init_user(int tcpfd, int udp_port);
+void destroy_user(int tcpfd);
+
+// ----------------------------------------------------------------------------------------------------------
+// 3) UDP STREAM DATA AND FUNCTIONS
+// ----------------------------------------------------------------------------------------------------------
+typedef struct send_udp_packet_routine_args {
+ int user_index;
+ int buffer_size;
+ char *file_buffer;
+}; // not used, but nice to have as a guideline
+int start_threads = 0; // for synchronization, with the cond
+pthread_cond_t cond = PTHREAD_COND_INITIALIZER; // for synchronization, with the following routine
+void *send_udp_packet_routine(void* arg);
+void *send_udp_packet_routine_cleanup(void *arg);
+int setup_udp_connection(int udp_port, int *udp_sockfd, socklen_t *addrlen, struct sockaddr *addr);
+int send_all_udp(int udp_sockfd, char *buf, int *len, struct sockaddr *addr, socklen_t addrlen);
+
+// ----------------------------------------------------------------------------------------------------------
+// 4) TCP LISTENER DATA AND FUNCTIONS
+// -------------------------------------------------------------------------å---------------------------------
+fd_set master; // master file list for socket descriptors
+int fdmax; // maximum file descriptor number
+int setup_listener(const char* port);
+int handle_new_connection(int listener);
+void *select_routine(void *arg);
+
+// ----------------------------------------------------------------------------------------------------------
+// 5) PROTOCOL REPLY AND COMMAND FUNCTIONS
+// ----------------------------------------------------------------------------------------------------------
+int handle_client_command(int clientfd);
+uint16_t handle_handshake(int newfd);
+uint8_t send_welcome_reply(int fd);
+uint8_t handle_setstation_command(int fd);
+void update_user_station(int tcpfd, int stationNum); // setstation helper
+void *send_announce_routine(void* arg);
+void send_announce_reply(int fd, int station_num); // called in send_announce_routine
+void send_invalid_reply(int fd, size_t message_size, char* message);
+
+// ----------------------------------------------------------------------------------------------------------
+// 6) COMMAND LINE INTERFACE FUNCTIONS
+// ----------------------------------------------------------------------------------------------------------
+uint8_t l = 0; // boolean for logging -> if (l) { printf("..."); }
+int parse(char buffer[COMMAND_LINE_MAX], char *tokens[COMMAND_LINE_MAX / 2]);
+void print_info_routine(int fd);
+void write_int_to_fd(int fd, int n); // helper to print_info_routine
+void *print_user_data(int index);
+void *print_station_data(int station);
+void cleanup_readfds_and_sockets();
+
+
+// ---------------------------------------------------------------------------------------------------------
+// 7) EXTRA CREDIT
+// ---------------------------------------------------------------------------------------------------------
+// ability for client to receive station list
+void *send_stationsinfo_routine(void *arg); // sends station info (index, name) to client
+// ability for server to add/remove stations through command line
+void add_station(char *station_name);
+void send_newstation_reply(uint16_t station_num); // notifies users that a new station has been added
+void destroy_station(int station_num);
+void send_stationshutdown_reply(int fd); // sends to users if listening to a station that is destroyed
+
+// ----------------------------------------------------------------------------------------------------------
+// 0) MAIN FUNCTION
+// ----------------------------------------------------------------------------------------------------------
+main(int argc, char *argv[]) // no int here for good luck :)
{
// CHECK AND USE ARGUMENTS
// -------------------------------------------------------------------------------------------------
@@ -110,7 +139,7 @@ main(int argc, char *argv[])
exit(1);
}
- // assign port
+ // get listener's port
const char* server_port = argv[1];
// initialize the stations & their threads, from
@@ -119,16 +148,16 @@ main(int argc, char *argv[])
exit(1);
}
- // INTIALIZE DATA STRUCTURES
+ // INTIALIZE USER DATA STRUCTURES
// -------------------------------------------------------------------------------------------------
- // make pointers to array of user data
- user_data = malloc(sizeof(user_t) * max_active_users);
- if (!user_data) { perror("malloc userdata"); return 1; }
- sockfd_to_user = malloc(sizeof(int) * max_active_users);
- if (!sockfd_to_user) { perror("malloc sockfd to user"); return 1; }
+ // make pointers user data structures
+ users = malloc(sizeof(user_t) * max_active_users);
+ if (!users) { perror("malloc userdata"); return 1; }
+ tcpfd_to_user = malloc(sizeof(int) * max_active_users);
+ if (!tcpfd_to_user) { perror("malloc tcpfd to user"); return 1; }
- // SETUP SOCKETS TO LISTEN FOR NEW CONNECTIONS
+ // LISTENER LIST. SOCKER, and SELECT THREAD
// -------------------------------------------------------------------------------------------------
// setup the listener fd to accept new connections
FD_ZERO(&master); // clear the master set
@@ -140,19 +169,16 @@ main(int argc, char *argv[])
pthread_create(&select_thread, NULL, select_routine, listenerfd);
// BELOW IS FOR THE COMMAND LINE INTERFACE
- // -------------------------------------------------------------------------------------------------
-
+ // -------------------------------------------------------------------------------------------------s
// command line data structures
- char input[LINE_MAX];
- memset(input, 0, LINE_MAX);
- char *tokens[LINE_MAX / 2];
- // memset(tokens, 0, LINE_MAX / 2);
-
- printf("snowcast_server> ");
+ char input[COMMAND_LINE_MAX];
+ memset(input, 0, COMMAND_LINE_MAX);
+ char *tokens[COMMAND_LINE_MAX / 2];
+ printf("snowcast_server> "); // very cute to have :)
fflush(stdout);
- while (read(STDIN_FILENO, input, LINE_MAX) > 0) {
+ while (read(STDIN_FILENO, input, COMMAND_LINE_MAX) > 0) {
// init tokens
- memset(tokens, 0, LINE_MAX / 2);
+ memset(tokens, 0, COMMAND_LINE_MAX / 2);
// if 0, all whitespace
if (!parse(input, tokens)) {
@@ -160,10 +186,10 @@ main(int argc, char *argv[])
fflush(stdout);
continue;
}
-
+
char *command = tokens[0];
- // if q command: shutdown & close tcp sockets!
+ // if "q" command: shutdown & close tcp sockets!
if (!strcmp(command, "q")) {
printf("Exiting.\n");
cleanup_readfds_and_sockets();
@@ -171,7 +197,7 @@ main(int argc, char *argv[])
break;
}
- // if p command: print info
+ // if "p" command: print info
else if (!strcmp(command, "p")) {
int print_fd;
@@ -183,30 +209,30 @@ main(int argc, char *argv[])
if ((print_fd = open(output_file_path, O_CREAT | O_WRONLY | O_TRUNC, S_IRWXU)) == -1)
{
perror("open");
- memset(input, 0, LINE_MAX);
+ memset(input, 0, COMMAND_LINE_MAX);
continue;
}
} else print_fd = STDOUT_FILENO;
// print the info
- print_info_routine((void *)print_fd);
+ print_info_routine(print_fd);
}
- // if u command: print user data (debugging)
+ // if "u" command: print user data (debugging)
else if (!strcmp(command, "u"))
for (int i = 0; i < max_active_users; i++) print_user_data(i);
- // if u command: print station data (debugging)
+ // if "s" command: print station data (debugging)
else if (!strcmp(command, "s"))
for (int i = 0; i < num_stations; i++) print_station_data(i);
- // if log command: start logging (debugging)
+ // if "log" command: start logging (debugging)
else if (!strcmp(command, "log")) {
l= !l;
printf("logging is now %s\n", l ? "on" : "off");
}
- // if add command: add a station (EXTRA CREDIT)
+ // if "add" command: add a station (EXTRA CREDIT)
else if (!strcmp(command, "add")) { // add a new station
// get the file path
char *file_path = tokens[1];
@@ -216,7 +242,7 @@ main(int argc, char *argv[])
} else add_station(file_path); // add the station
}
- // if add command: remove a station (EXTRA CREDIT)
+ // if "remove" command: remove a station (EXTRA CREDIT)
else if (!strcmp(command, "remove")) { // remove a station
if (tokens[1] == NULL) {
printf("remove: must provide a station number to remove\n");
@@ -230,41 +256,55 @@ main(int argc, char *argv[])
printf("unknown command: %s\n", command);
}
- memset(input, 0, LINE_MAX);
+ memset(input, 0, COMMAND_LINE_MAX);
printf("snowcast_server> ");
fflush(stdout);
}
+
return 0;
}
-int read_file(int fd, char buffer[FILE_READ_SIZE], int station_num) {
- // see if fd was closed at some point
- if (fd == -1) return -1;
+// ----------------------------------------------------------------------------------------------------------
+// 1) STATION DATA AND FUNCTIONS IMPLEMENTATIONS
+// ----------------------------------------------------------------------------------------------------------
+/*
+ given the command line arguments, setups the stations.
+ 1) mallocs the stations pointer
+ 2) assigns the stations' variables (filepath & opens file into readfd)
+ 3) starts the routine for each station
+*/
+int setup_stations(int argc, char *argv[]) {
+ num_stations = argc - 2;
- int bytes_read = read(fd, buffer, FILE_READ_SIZE);
- if (bytes_read < 0) { perror("read (in read file)"); return -1; }
- // printf("bytes read: %d\n", bytes_read);
- if (bytes_read == 0) {
- // printf("end of file, restarting\n");
- pthread_t send_announce_thread;
- pthread_create(&send_announce_thread, NULL, send_announce_routine, station_num);
+ // get the size to malloc
+ int totalSize = 0;
+ for(int i = 2; i < argc; i++)
+ {
+ totalSize += sizeof(pthread_t) + sizeof(int) + strlen(argv[i]);
+ }
- if (lseek(fd, 0, SEEK_SET) == -1)
- {
- perror("lseek (in resarting file)");
- return -1;
- }
- bytes_read = read(fd, buffer, FILE_READ_SIZE);
- if (bytes_read < 0) { perror("read (in read file, after restart)"); return -1; }
+ // malloc the stations array
+ stations = malloc(totalSize);
+ if (!stations) { perror("malloc (stations pointer)"); return -1; }
+
+ // assign the stations, and start the threads
+ for (int i = 0; i < num_stations; i++) {
+ stations[i].filePath = argv[i+2];
+ stations[i].readfd = open(argv[i+2], O_RDONLY);
+ if (stations[i].readfd < 0) { perror("read (from station file)"); return -1; }
+ pthread_create(&stations[i].streamThread, NULL, stream_routine, i);
}
- return bytes_read;
-}
-void *stream_routine_cleanup(void *arg) {
- int read_fd = (int) arg;
- close(read_fd);
+ return 1;
}
+/*
+ thread route for each station, given the station num
+ 1) reads the section of the file into a buffer
+ 2) creates a thread for each user that is listening to the station
+ 3) waits for the threads to be created, then broadcasts the cond variable to let them run
+ note: you can modify how often and how much is read off the file by changing the MACROS
+*/
void *stream_routine(void *arg) {
int station_num = (int) arg;
// printf("stream routine %d\n", station_num);
@@ -287,9 +327,9 @@ void *stream_routine(void *arg) {
int sent = 0;
for (int i = 0; i < max_active_users; i++)
{
- if (!user_data[i].sockfd || user_data[i].sockfd == -1)
+ if (!users[i].tcpfd || users[i].tcpfd == -1)
continue;
- if (user_data[i].stationNum == station_num)
+ if (users[i].stationNum == station_num)
{
// prepare the send buffer
// (note: using int* for easy pointer assignment)
@@ -299,7 +339,7 @@ void *stream_routine(void *arg) {
send_buffer[1] = bytes_read;
memcpy(send_buffer+2, buffer, bytes_read);
- // make thread
+ // make thread to send packet data
pthread_t send_udp_packet_thread;
pthread_create(&send_udp_packet_thread, NULL, send_udp_packet_routine, send_buffer);
sent = 1;
@@ -325,105 +365,178 @@ void *stream_routine(void *arg) {
pthread_cleanup_pop(1);
}
+void *stream_routine_cleanup(void *arg) { // closes the read fd if thread is cancelled
+ int read_fd = (int) arg;
+ close(read_fd);
+}
-int setup_stations(int argc, char *argv[]) {
- num_stations = argc - 2;
+/*
+ reads the file into a buffer. returns the number of bytes read
+ note: if it reaches the end of a file, it will send an ANNOUNCE message to all users
+*/
+int read_file(int fd, char buffer[FILE_READ_SIZE], int station_num) {
+ // see if fd was closed at some point
+ if (fd == -1) return -1;
- // get the size to malloc
- int totalSize = 0;
- for(int i = 2; i < argc; i++)
- {
- totalSize += sizeof(pthread_t) + sizeof(int) + strlen(argv[i]);
- }
+ int bytes_read = read(fd, buffer, FILE_READ_SIZE);
+ if (bytes_read < 0) { perror("read (in read file)"); return -1; }
+ // printf("bytes read: %d\n", bytes_read);
+ if (bytes_read == 0) {
+ // printf("end of file, restarting\n");
+ pthread_t send_announce_thread;
+ pthread_create(&send_announce_thread, NULL, send_announce_routine, station_num);
- // malloc the stations array
- stations = malloc(totalSize);
- if (!stations) { perror("malloc (stations pointer)"); return -1; }
- // assign the stations, and start the threads
- for (int i = 0; i < num_stations; i++) {
- stations[i].filePath = argv[i+2];
- stations[i].readfd = open(argv[i+2], O_RDONLY);
- if (stations[i].readfd < 0) { perror("read (from station file)"); return -1; }
- pthread_create(&stations[i].streamThread, NULL, stream_routine, i);
+ if (lseek(fd, 0, SEEK_SET) == -1)
+ {
+ perror("lseek (in resarting file)");
+ return -1;
+ }
+ bytes_read = read(fd, buffer, FILE_READ_SIZE);
+ if (bytes_read < 0) { perror("read (in read file, after restart)"); return -1; }
}
-
- // printf("successfully created %d stations\n", num_stations);
- return 1;
+ return bytes_read;
}
-// helper to write int as a string format buffer
-void write_int_to_fd(int fd, int n) {
- int len = snprintf(NULL, 0, "%d", n);
- char *num = malloc(len + 1);
- if (!num) { perror("malloc write to fd"); return; }
+// ----------------------------------------------------------------------------------------------------------
+// 2) USER DATA AND FUNCTIONS IMPLEMENTATIONS
+// ----------------------------------------------------------------------------------------------------------
+/*
+ given the newfd and udp_port of a newly connected user, initializes them to the data structure
+ note: this function does its best to optimize memory (dynamic resize of tcpfd map & resusing space in array)
+*/
+void init_user(int newfd, int udp_port) {
+ pthread_mutex_lock(&mutex_users);
- snprintf(num, len + 1, "%d", n);
- if (write(fd, num, strlen(num)) == -1) {
- perror("write");
+ // FOLLOWING IS FOR MEMORY OPTIMIZATION
+
+ // if the newfd is larger than the max, we need to resize the array
+ if(newfd > max_tcpfd) {
+ max_tcpfd = newfd*2; // double the array
+ int *more_tcpfd_to_user = realloc(tcpfd_to_user, sizeof(int) * (max_tcpfd + 1));
+ if (!more_tcpfd_to_user) { perror("realloc in init_user1"); exit(1); }
+ tcpfd_to_user = more_tcpfd_to_user;
}
- free(num);
-}
-void *print_info_routine(void *arg) {
- // unpack the fd to print to
- int print_fd = (int) arg;
+ // let's check if we can resuse the space from a previous user
+ int running_index = 0;
+ while(running_index < max_active_users) {
+ // -1 on tcpfd indicates that we have directly assigned it when "destroying a user"
+ // -> so a previous user was there, but discconnected
+ if (users[running_index].tcpfd == -1) {
+ break; // we found an index
+ }
+ running_index++;
+ }
- for (int i = 0; i < num_stations; i++) {
- // prints each station in the desired format
- write_int_to_fd(print_fd, i);
- char *comma = ",";
- write(print_fd, comma, strlen(comma));
+ if (running_index == max_active_users) {
+ // if we're here, we went through the whole array to no avail,
+ // so we need to extend users array
+ max_active_users++;
+ user_t *more_users = realloc(users, sizeof(user_t) * max_active_users);
+ if (!more_users) { perror("realloc in init_user2"); exit(1); }
+ users = more_users;
+ }
+
+ // map TCP tcpfd to this user index
+ users[running_index] = (user_t){udp_port, -1, newfd};
+ tcpfd_to_user[newfd] = running_index;
- // write file path
- char* file_path = stations[i].filePath;
- write(print_fd, file_path, strlen(file_path));
+ pthread_mutex_unlock(&mutex_users);
- // go through users, and print the udp ports
- for (int j = 0; j < max_active_users; j++) {
- if (!user_data[j].sockfd || user_data[j].sockfd == -1)
- continue;
- if (user_data[j].stationNum == i) {
- char *localhost_ip = ",127.0.0.1:"; //TODO: possibly update
- write(print_fd, localhost_ip, strlen(localhost_ip));
- // write udpPort
- write_int_to_fd(print_fd, user_data[j].udpPort);
- }
- }
- // wrtie new line
- char *newline = "\n";
- write(print_fd, newline, strlen(newline));
- }
+ // successfully created user, let's send the welcome
+ send_welcome_reply(newfd);
+}
+/*
+ given the tcpfd of a user, destroys them from the data structures
+ 1) close the tcpfd
+ 2) remove from master set
+ 3) remove from data structures
+ note: destroying a user sets all fields of that user to -1. often, this is how it's known if a user exists
+*/
+void destroy_user(int tcpfd) {
+ close(tcpfd); // bye!
+ FD_CLR(tcpfd, &master); // remove from master set
- // close the fd
- if (print_fd != STDOUT_FILENO) close(print_fd);
+ // remove user from data structures
+ pthread_mutex_lock(&mutex_users);
+ users[tcpfd_to_user[tcpfd]] = (user_t) {-1, -1, -1};
+ // map tcpfd to -1
+ tcpfd_to_user[tcpfd] = -1;
- return (NULL);
+ pthread_mutex_unlock(&mutex_users);
}
-int send_all_udp(int udp_sockfd, char *buf, int *len, struct sockaddr *addr, socklen_t addrlen)
-{
- int total = 0; // how many bytes we've sent
- int bytesleft = *len; // how many we have left to send
- int n;
+// ----------------------------------------------------------------------------------------------------------
+// 3) UDP STREAM DATA AND FUNCTIONS IMPLEMENTATIONS
+// ----------------------------------------------------------------------------------------------------------
+/*
+ thread routine that streams the binary data to the user
+ 1) unpack the arguments (see struct send_udp_packet_routine_args)
+ 2) setup udp socket
+ 3) wait for the cond variable to be broadcasted
+ 4) send the data over udp
+*/
+void *send_udp_packet_routine(void *arg) {
+ // unpack args
+ int *buf = arg;
+ int user_index = buf[0];
+ int buffer_size = buf[1];
+ char *data_ptr = buf + 2; // add two will skip first ints, since int* buf
- while(total < *len) {
- n = sendto(udp_sockfd, buf+total, MAX_PACKET_SIZE, 0, addr, addrlen);
- if (n == -1) { break; }
- total += n;
- bytesleft -= n;
+ // setup variables
+ int did_work = 1;
+ pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER;
+ // only these two following properties of addrinfo are needed to send udp packets
+ struct sockaddr addr;
+ socklen_t addrlen;
+
+ // setup udp socket for this thread/user
+ int udp_port = users[user_index].udpPort;
+ int udp_sockfd;
+ if (setup_udp_connection(udp_port, &udp_sockfd, &addrlen, &addr) == -1){
+ fprintf(stderr, "failure in setup_udp_connection");
+ return (NULL); // error occured
}
- *len = total; // return number actually sent here
+ // setup cleanup for thread that closes the fd if closed
+ pthread_cleanup_push(send_udp_packet_routine_cleanup, (void *)udp_sockfd);
- return n==-1?-1:0; // return -1 on failure, 0 on success
-}
+ pthread_mutex_lock(&m);
+ // wait for threads
+ did_work = 0;
+ while (!start_threads)
+ {
+ pthread_cond_wait(&cond, &m);
+ }
+
+ int station_num = users[user_index].stationNum;
+ if (station_num == -1) {
+ did_work = 1;
+ }
+
+ // send the data! (no error check since udp)
+ send_all_udp(udp_sockfd, data_ptr, &buffer_size, &addr, addrlen);
+
+ // cleanup
+ close(udp_sockfd);
+
+ pthread_mutex_unlock(&m);
+ pthread_cleanup_pop(1);
-void udp_port_cleanup_handler(void *arg)
+ return (NULL);
+}
+void *send_udp_packet_routine_cleanup(void *arg) // closes the udpfd if thread is cancelled
{
int sockfd = (int) arg;
close(sockfd);
+ return (NULL);
}
+/*
+ given the udp_port, makes a socket and returns it's file descriptor. (returns -1 on failure)
+ note: pointers to the desired vairables to be set (udp_sockfd, addrlen, addr) must be passed in,
+ this function will also set those variables
+*/
int setup_udp_connection(int udp_port, int *udp_sockfd, socklen_t *addrlen, struct sockaddr* addr) {
// setup hints to get udp_port
struct addrinfo thread_hints, *thread_res, *thread_servinfo;
@@ -444,7 +557,7 @@ int setup_udp_connection(int udp_port, int *udp_sockfd, socklen_t *addrlen, stru
if (error_code = getaddrinfo(NULL, port, &thread_hints, &thread_servinfo) != 0)
{
fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(error_code));
- return 0;
+ return -1;
}
free(port);
@@ -460,7 +573,7 @@ int setup_udp_connection(int udp_port, int *udp_sockfd, socklen_t *addrlen, stru
}
if (sock == NULL) {
fprintf(stderr, "socket: failed to create udp socket(setup_udp_connection)\n");
- return 0;
+ return -1;
}
*udp_sockfd = sock;
@@ -470,71 +583,36 @@ int setup_udp_connection(int udp_port, int *udp_sockfd, socklen_t *addrlen, stru
return 1;
}
-/* Make the manager routine */
-void *send_udp_packet_routine(void *arg) {
- // unpack args
- int *buf = arg;
- int user_index = buf[0];
- int buffer_size = buf[1];
- char *data_ptr = buf + 2; // add two will skip first ints, since int* buf
-
- // setup variables
- int did_work = 1;
- pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER;
- // only these two following properties of addrinfo are needed to send udp packets
- struct sockaddr addr;
- socklen_t addrlen;
-
- // setup udp socket for this thread/user
- int udp_port = user_data[user_index].udpPort;
- int udp_sockfd;
- if (setup_udp_connection(udp_port, &udp_sockfd, &addrlen, &addr) == -1){
- fprintf(stderr, "failure in setup_udp_connection");
- return (NULL); // error occured
- }
-
- // setup cleanup for thread that closes the fd if closed
- pthread_cleanup_push(udp_port_cleanup_handler, (void *)udp_sockfd);
-
- pthread_mutex_lock(&m);
- // wait for threads
- did_work = 0;
- while (!start_threads)
- {
- pthread_cond_wait(&cond, &m);
- }
+/*
+ sends the binary data over udp, but limits individual packets to MAX_PACKET_SIZE
+ note: the use of pointers modifying the arguments
+*/
+int send_all_udp(int udp_sockfd, char *buf, int *len, struct sockaddr *addr, socklen_t addrlen)
+{
+ int total = 0; // how many bytes we've sent
+ int bytesleft = *len; // how many we have left to send
+ int n;
- int station_num = user_data[user_index].stationNum;
- if (station_num == -1) {
- did_work = 1;
+ while(total < *len) {
+ n = sendto(udp_sockfd, buf+total, MAX_PACKET_SIZE, 0, addr, addrlen);
+ if (n == -1) { break; }
+ total += n;
+ bytesleft -= n;
}
- // send the data! (no error check since udp)
- send_all_udp(udp_sockfd, data_ptr, &buffer_size, &addr, addrlen);
-
- // cleanup
- close(udp_sockfd);
-
- pthread_mutex_unlock(&m);
- pthread_cleanup_pop(1);
-
- return (NULL);
-}
+ *len = total; // return number actually sent here
-void *send_announce_routine(void *arg) {
- // unpack arg
- int station_num = (int) arg;
- // send the announce messages
- for (int i = 0; i < max_active_users; i++)
- {
- if (user_data[i].sockfd == 0 || user_data[i].sockfd == -1)
- continue;
- // send announce reply to each user
- if (user_data[i].stationNum == station_num)
- send_announce_reply(user_data[i].sockfd, station_num);
- }
-}
+ return n==-1?-1:0; // return -1 on failure, 0 on success
+}
+// ----------------------------------------------------------------------------------------------------------
+// 4) TCP LISTENER DATA AND FUNCTIONS IMPLEMENTATIONS
+// ----------------------------------------------------------------------------------------------------------
+/*
+ given the port, sets up the listener socket on that port (getaddrinfo->socket->bind->listen)
+ returns the file descriptor of the socket!
+ note: ends whole program on failure
+*/
int setup_listener(const char* port) {
int listener; // listening socket descriptor, to be set
@@ -590,55 +668,11 @@ int setup_listener(const char* port) {
return listener;
}
-int handle_client_command(int clientfd) {
- uint8_t command_type = -1;
- if (recv(clientfd, &command_type, 1, 0) <= 0) // check user has disconnected
- {
- if (l) printf("socket %d HUNGUP. lost connection.\n", clientfd);
-
- destroy_user(clientfd);
- return 0;
- }
-
- // check for each command type...
- if (command_type == HELLO) {
- // if user already sent a HELLO, send back in invalid command
- // since only the only the handshake handles HELLO
- if (l) printf("received an extraneous HELLO from socket %d. sending INVALID reply.\n", clientfd);
-
- char * message = "must not sent more than one Hello message";
- send_invalid_reply(clientfd, strlen(message), message);
- destroy_user(clientfd);
- return 0;
- }
-
- if (command_type == SETSTATION) {
- if(!handle_setstation_command(clientfd)) { // failure
- // remove user
- destroy_user(clientfd);
- return 0;
- }
- return 1; // sucess, return 1
- }
-
- if (command_type == LISTSTATIONS) {
- if (l) printf("received a LISTSTATIONS from socket %d\n", clientfd);
-
- pthread_t send_thread; // send STATIONINFO from a different thread
- pthread_create(&send_thread, NULL, send_stationsinfo_reply, clientfd);
- return 1;
- }
-
- // if we're here, we got an invalid/unknown command
- if (l) printf("received unknown command type %d from socket %d. sending INVALID reply.\n", command_type, clientfd);
- char *message = "invalid command type";
- send_invalid_reply(clientfd, strlen(message), message);
- destroy_user(clientfd);
- return 0;
-}
-
+/*
+ calls accept() when the listener sees has a new connection
+ returns 1 on success, 0 on failure
+ */
int handle_new_connection(int listener) {
-
// accept new connection
struct sockaddr_storage remoteaddr;
socklen_t addrlen = sizeof remoteaddr;
@@ -665,11 +699,16 @@ int handle_new_connection(int listener) {
fdmax = newfd;
}
- // TODO: thread this
- init_user_routine(newfd, udp_port);
+ init_user(newfd, udp_port);
return 1;
}
+/*
+ thread routine that manages the select() call
+ 1) copies the master set
+ 2) calls select()
+ 3) handles new connections or responds on new data from clients
+*/
void *select_routine(void *arg) {
int listener = (int) arg; // the listener fd
@@ -705,181 +744,61 @@ void *select_routine(void *arg) {
return (NULL);
}
-void *init_user_routine(int newfd, int udp_port) {
- pthread_mutex_lock(&mutex_user_data);
-
- // FOLLOWING IS FOR MEMORY OPTIMIZATION
-
- // if the newfd is larger than the max, we need to resize the array
- if(newfd > max_sockfd) {
- max_sockfd = newfd*2; // double the array
- int *more_sockfd_to_user = realloc(sockfd_to_user, sizeof(int) * (max_sockfd + 1));
- if (!more_sockfd_to_user) { perror("realloc in init_user_routine1"); exit(1); }
- sockfd_to_user = more_sockfd_to_user;
- }
-
- // let's check if we can resuse the space from a previous user
- int running_index = 0;
- while(running_index < max_active_users) {
- // -1 on sockfd indicates that we have directly assigned it when "destroying a user"
- // -> so a previous user was there, but discconnected
- if (user_data[running_index].sockfd == -1) {
- break; // we found an index
- }
- running_index++;
- }
-
- if (running_index == max_active_users) {
- // if we're here, we went through the whole array to no avail,
- // so we need to extend users array
- max_active_users++;
- user_t *more_users = realloc(user_data, sizeof(user_t) * max_active_users);
- if (!more_users) { perror("realloc in init_user_routine2"); exit(1); }
- user_data = more_users;
- }
-
- // map TCP sockfd to this user index
- user_data[running_index] = (user_t){udp_port, -1, newfd};
- sockfd_to_user[newfd] = running_index;
-
- pthread_mutex_unlock(&mutex_user_data);
-
- // successfully created user, let's send the welcome
- send_welcome_reply(newfd);
-
- return (NULL);
-}
-
-
-void *update_user_station(int sockfd, int stationNum) {
- pthread_mutex_lock(&mutex_user_data);
- user_data[sockfd_to_user[sockfd]].stationNum = stationNum;
- pthread_mutex_unlock(&mutex_user_data);
-}
-void *print_user_data(int index) {
- printf("user: %d -> udpPort: %d, stationNum: %d, sockfd: %d\n", index,
- user_data[index].udpPort, user_data[index].stationNum, user_data[index].sockfd);
-}
-void *print_station_data(int station) {
- printf("station: %d -> filePath: %s, readfd: %d\n",
- station, stations[station].filePath, stations[station].readfd);
-}
-
-void destroy_user(int sockfd) {
- close(sockfd); // bye!
- FD_CLR(sockfd, &master); // remove from master set
-
- // remove user from data structures
- pthread_mutex_lock(&mutex_user_data);
- user_data[sockfd_to_user[sockfd]] = (user_t) {-1, -1, -1};
- // map sockfd to -1
- sockfd_to_user[sockfd] = -1;
-
- pthread_mutex_unlock(&mutex_user_data);
-}
-
-
-void *get_in_addr(struct sockaddr *sa)
-{
- if (sa->sa_family == AF_INET) {
- return &(((struct sockaddr_in*)sa)->sin_addr);
- }
-
- return &(((struct sockaddr_in6*)sa)->sin6_addr);
-}
-
-void send_announce_reply(int fd, int station_num) {
- if (l) printf("sending ANNNOUNCE reply to socket %d\n", fd);
-
-
- char* file_path = stations[station_num].filePath;
- int len_file_path = strlen(file_path);
-
- char *send_buffer = malloc(len_file_path+2);
- if (!send_buffer) {
- perror("malloc in send announce");
- return;
- }
- send_buffer[0] = 3;
- send_buffer[1] = len_file_path;
-
- memcpy(send_buffer + 2, file_path, len_file_path);
-
- size_t bytes_to_send = len_file_path + 2;
- if (send_all(fd, send_buffer, &bytes_to_send) == -1)
- perror("send_all");
-
- free(send_buffer);
-}
+// ----------------------------------------------------------------------------------------------------------
+// 5) PROTOCOL REPLY AND COMMAND FUNCTIONS IMPLEMENTATIONS
+// ----------------------------------------------------------------------------------------------------------
+/*
+ given the tcpfd of a user under the impression there is new data, handles the command logic
+ 1) recv the command type (only 1 byte, so ok)
+ 2) check for each command type
+ 3) after basic error checking, calls the function that executes the reply (this may be threaded out)
+ note: returns 0 on failure and will destroy the user (returns 1 on suncess)
+*/
+int handle_client_command(int clientfd) {
+ uint8_t command_type = -1;
+ if (recv(clientfd, &command_type, 1, 0) <= 0) // check user has disconnected
+ {
+ if (l) printf("socket %d HUNGUP. lost connection.\n", clientfd);
-void send_invalid_reply(int fd, size_t message_size, char* message) {
- char *send_buffer = malloc(message_size+2);
- if (!send_buffer) {
- perror("malloc in send invalid command");
- return;
+ destroy_user(clientfd);
+ return 0;
}
- // type and payload size
- send_buffer[0] = 4;
- send_buffer[1] = message_size;
-
- memcpy(send_buffer + 2, message, message_size);
-
- int bytes_to_send = message_size + 2;
- if (send_all(fd, send_buffer, &bytes_to_send) == -1)
- perror("send");
-
- free(send_buffer);
-}
-
-void *send_stationsinfo_reply(void * arg) {
- int fd = (int) arg;
-
- if (l) printf("sending STATIONSINFO reply to socket %d\n", fd);
+ // check for each command type
+ if (command_type == HELLO) {
+ // if user already sent a HELLO, send back in invalid command
+ // since only the only the handshake handles HELLO
+ if (l) printf("received an extraneous HELLO from socket %d. sending INVALID reply.\n", clientfd);
- uint32_t reply_size = 0;
- for (int i = 0; i < num_stations; i++) {
- if (stations[i].readfd != -1)
- reply_size += snprintf(NULL, 0, "station #%d -> %s\n", i, stations[i].filePath);
+ char * message = "must not sent more than one Hello message";
+ send_invalid_reply(clientfd, strlen(message), message);
+ destroy_user(clientfd);
+ return 0;
}
- reply_size--; // don't want final \n
- // send type
- uint8_t reply_num = 6;
- if (send(fd, &reply_num, 1, 0) == -1)
- perror("send in send stations info");
-
- // send payload size
- uint32_t reply_size_endian = htonl(reply_size);
- int bytes = sizeof(uint32_t);
- if (send_all(fd, &reply_size_endian, &bytes) == -1)
- perror("send_all in send stations info");
-
- char send_buffer[reply_size];
- int ptr = 0;
- for (int i = 0; i < num_stations; i++) {
- ptr += sprintf(send_buffer + ptr, "station %d -> %s\n", i, stations[i].filePath);
+ if (command_type == SETSTATION) {
+ if(!handle_setstation_command(clientfd)) { // failure
+ // remove user
+ destroy_user(clientfd);
+ return 0;
+ }
+ return 1; // sucess, return 1
}
- int bytes_to_send = reply_size; // don't want final \n
- if (send_all(fd, &send_buffer, &bytes_to_send) == -1)
- perror("send_all buffer");
-
- return (NULL);
-}
-
-// Parses a buffer into tokens, from cs33 :)
-int parse(char buffer[LINE_MAX], char *tokens[LINE_MAX / 2]) {
- const char *regex = " \n\t\f\r";
- char *current_token = strtok(buffer, regex);
- if (current_token == NULL) return 0;
+ if (command_type == LISTSTATIONS) {
+ if (l) printf("received a LISTSTATIONS from socket %d\n", clientfd);
- for (int i = 0; current_token != NULL; i++) {
- tokens[i] = current_token;
- current_token = strtok(NULL, regex);
+ pthread_t send_thread; // send STATIONINFO from a different thread
+ pthread_create(&send_thread, NULL, send_stationsinfo_routine, clientfd);
+ return 1;
}
- return 1;
+ // if we're here, we got an invalid/unknown command
+ if (l) printf("received unknown command type %d from socket %d. sending INVALID reply.\n", command_type, clientfd);
+ char *message = "invalid command type";
+ send_invalid_reply(clientfd, strlen(message), message);
+ destroy_user(clientfd);
+ return 0;
}
uint16_t handle_handshake(int newfd)
@@ -921,9 +840,7 @@ uint16_t handle_handshake(int newfd)
return ntohs(udp_port);
}
-
-
-int send_welcome_reply(int fd) {
+uint8_t send_welcome_reply(int fd) { // returns 0 on failure
if(l) printf("sending WELCOME reply to socket %d\n", fd);
struct Welcome welcome;
@@ -931,108 +848,266 @@ int send_welcome_reply(int fd) {
welcome.numStations = htons(num_stations);
int bytes_to_send = sizeof(struct Welcome);
if (send_all(fd, &welcome, &bytes_to_send) == -1) {
- perror("send_all (in init_user_routine)");
- return -1;
+ perror("send_all (in send_welcome_reply)");
+ return 0;
}
return 1;
}
-int handle_setstation_command(int sockfd) {
+/*
+ given the socket fd of the user, handles the SETSTATION command
+ note: returns 0 on failure and DOES NOT remove the user
+*/
+uint8_t handle_setstation_command(int tcpfd) {
// get the station number
uint16_t station_number;
int bytes_to_read = sizeof(uint16_t);
- if (recv_all(sockfd, &station_number, &bytes_to_read) == -1) {
+ if (recv_all(tcpfd, &station_number, &bytes_to_read) == -1) {
perror("recv_all");
return 0;
}
station_number = ntohs(station_number);
// check if user has a udpPort to stream to
- if (user_data[sockfd_to_user[sockfd]].udpPort == -1) {
- if (l) printf("received a SETSTATION from socket %d before HELLO command. sending INVALID reply.\n", sockfd);
+ if (users[tcpfd_to_user[tcpfd]].udpPort == -1) {
+ if (l) printf("received a SETSTATION from socket %d before HELLO command. sending INVALID reply.\n", tcpfd);
// send back in invalid command and drop user
char * message = "must send Hello message first";
- send_invalid_reply(sockfd, strlen(message), message);
+ send_invalid_reply(tcpfd, strlen(message), message);
return 0;
}
- if (l) printf("received a SETSTATION from socket %d with station_number %u\n", sockfd, station_number);
+ if (l) printf("received a SETSTATION from socket %d with station_number %u\n", tcpfd, station_number);
// check if station num is in range
if (station_number >= num_stations || station_number < 0) {
- if (l) printf("station number invalid from socket %d. sending INVALID reply.\n", sockfd);
+ if (l) printf("station number invalid from socket %d. sending INVALID reply.\n", tcpfd);
// send back in invalid command and drop user
char * message = "station number out of range";
- send_invalid_reply(sockfd, strlen(message), message);
+ send_invalid_reply(tcpfd, strlen(message), message);
return 0;
}
// check if station num has been removed
if (stations[station_number].readfd == -1) {
- send_stationshutdown_reply(sockfd);
+ send_stationshutdown_reply(tcpfd);
return 1;
}
- update_user_station(sockfd, station_number);
- send_announce_reply(sockfd, station_number);
+ update_user_station(tcpfd, station_number);
+ send_announce_reply(tcpfd, station_number);
return 1;
}
-void send_stationshutdown_reply(int fd) {
- if (l) printf("sending STATIONSHUTDOWN reply to socket %d\n", fd);
+void update_user_station(int tcpfd, int stationNum) {
+ pthread_mutex_lock(&mutex_users);
+ users[tcpfd_to_user[tcpfd]].stationNum = stationNum;
+ pthread_mutex_unlock(&mutex_users);
+}
- uint8_t reply_num = 7;
- if (send(fd, &reply_num, 1, 0) == -1)
- perror("send in send stationshutdown");
+/*
+ given the station number of a station (as void* arg), sends an ANNOUNCE reply to all users on that station
+ note: this is a thread routine, but may not always be run in a thread
+*/
+void *send_announce_routine(void *arg) {
+ // unpack arg
+ int station_num = (int) arg;
+ // send the announce messages
+ for (int i = 0; i < max_active_users; i++)
+ {
+ if (users[i].tcpfd == 0 || users[i].tcpfd == -1)
+ continue;
+ // send announce reply to each user
+ if (users[i].stationNum == station_num)
+ send_announce_reply(users[i].tcpfd, station_num);
+ }
}
-void destroy_station(int station_num) {
- // check if station num is in range
- if (station_num >= num_stations || station_num < 0) {
- printf("remove: station number %d is out of range\n", station_num);
+/*
+ given the socket fd and station number of the user, sends an ANNOUNCE reply to that user
+*/
+void send_announce_reply(int fd, int station_num) {
+ if (l) printf("sending ANNNOUNCE reply to socket %d\n", fd);
+
+ // get the file path
+ char* file_path = stations[station_num].filePath;
+ int len_file_path = strlen(file_path);
+
+ // make & fill the buffer to send
+ char *send_buffer = malloc(len_file_path+2);
+ if (!send_buffer) {
+ perror("malloc in send announce");
return;
}
- // check if station had been removed
- if (stations[station_num].readfd == -1) {
- printf("remove: station %d has already been removed\n", station_num);
+ send_buffer[0] = ANNOUNCE;
+ send_buffer[1] = len_file_path;
+ memcpy(send_buffer + 2, file_path, len_file_path);
+
+ // send it
+ int bytes_to_send = len_file_path + 2;
+ if (send_all(fd, send_buffer, &bytes_to_send) == -1)
+ perror("send_all");
+
+ // cleanup
+ free(send_buffer);
+}
+
+void send_invalid_reply(int fd, size_t message_size, char* message) {
+ if (l) printf("sending INVALID reply to socket %d\n", fd);
+
+ char *send_buffer = malloc(message_size+2);
+ if (!send_buffer) {
+ perror("malloc in send invalid command");
return;
}
- // send the stationshutdown command to users
- for (int j = 0; j < max_active_users; j++) {
- if (!user_data[j].sockfd || user_data[j].sockfd == -1)
- continue;
- if (user_data[j].stationNum == station_num) {
- send_stationshutdown_reply(user_data[j].sockfd);
- user_data[j].stationNum = -1;
- }
+ // make & fill the buffer to send
+ send_buffer[0] = INVALID;
+ send_buffer[1] = message_size;
+ memcpy(send_buffer + 2, message, message_size);
+
+ // send!
+ int bytes_to_send = message_size + 2;
+ if (send_all(fd, send_buffer, &bytes_to_send) == -1)
+ perror("send");
+
+ free(send_buffer);
+}
+
+// ----------------------------------------------------------------------------------------------------------
+// 6) PROTOCOL REPLY AND COMMAND FUNCTIONS IMPLEMENTATIONS
+// ----------------------------------------------------------------------------------------------------------
+/*
+ parses a buffer into tokens, from cs33 :)
+*/
+int parse(char buffer[COMMAND_LINE_MAX], char *tokens[COMMAND_LINE_MAX / 2])
+{
+ const char *regex = " \n\t\f\r";
+ char *current_token = strtok(buffer, regex);
+ if (current_token == NULL) return 0;
+
+ for (int i = 0; current_token != NULL; i++) {
+ tokens[i] = current_token;
+ current_token = strtok(NULL, regex);
}
- // cancel the stream's thread and close the read fd
- pthread_cancel(stations[station_num].streamThread);
- close(stations[station_num].readfd);
- stations[station_num].readfd = -1;
+ return 1;
+}
- printf("remove: successfully removed station %d\n", station_num);
+/*
+ given a file descriptor to write to, prints the station/user info in the desired format
+ note: it will close the fd inputted, except for stdout
+*/
+void print_info_routine(int fd) {
+ // for each station, print the info
+ for (int i = 0; i < num_stations; i++) {
+
+ // prints each station in the desired format
+ write_int_to_fd(fd, i);
+ char *comma = ",";
+ write(fd, comma, strlen(comma));
+
+ // write file path
+ char* file_path = stations[i].filePath;
+ write(fd, file_path, strlen(file_path));
+
+ // go through users, and print the udp ports
+ for (int j = 0; j < max_active_users; j++) {
+ if (!users[j].tcpfd || users[j].tcpfd == -1)
+ continue;
+ if (users[j].stationNum == i) {
+ char *localhost_ip = ",127.0.0.1:"; //TODO: possibly update
+ write(fd, localhost_ip, strlen(localhost_ip));
+ // write udpPort
+ write_int_to_fd(fd, users[j].udpPort);
+ }
+ }
+ // wrtie new line
+ char *newline = "\n";
+ write(fd, newline, strlen(newline));
+ }
+
+ // close the fd
+ if (fd != STDOUT_FILENO) close(fd);
}
-void send_newstation_reply(uint16_t station_num) {
- if (l) printf("sending NEWSTATION reply to all sockets\n");
+/* helper to write int as a string format buffer to an fd */
+void write_int_to_fd(int fd, int n) {
+ int len = snprintf(NULL, 0, "%d", n);
+ char *num = malloc(len + 1);
+ if (!num) { perror("malloc write to fd"); return; }
- // make the struct
- uint8_t reply_num = 8;
- uint16_t station_num_n = htons(station_num);
- struct NewStation new_station = {reply_num, station_num_n};
- // send the message to each (valid) user
+ snprintf(num, len + 1, "%d", n);
+ if (write(fd, num, strlen(num)) == -1) {
+ perror("write");
+ }
+ free(num);
+}
+void *print_user_data(int index) {
+ printf("user: %d -> udpPort: %d, stationNum: %d, tcpfd: %d\n", index,
+ users[index].udpPort, users[index].stationNum, users[index].tcpfd);
+}
+void *print_station_data(int station) {
+ printf("station: %d -> filePath: %s, readfd: %d\n",
+ station, stations[station].filePath, stations[station].readfd);
+}
+/*
+ to be called for a "graceful" exit
+ note: this will not close the listener
+*/
+void cleanup_readfds_and_sockets() {
+ // close all the file descriptors for the stations
+ for (int i = 0; i < num_stations; i++) {
+ close(stations[i].readfd);
+ }
+ // close all the tcp connections
for (int i = 0; i < max_active_users; i++) {
- if (!user_data[i].sockfd || user_data[i].sockfd == -1)
- continue;
- int bytes_to_send = sizeof(struct NewStation);
- if (send_all(user_data[i].sockfd, &new_station, &bytes_to_send) == -1)
- perror("send_all in newstation reply");
+ if (users[i].tcpfd != -1) {
+ close(users[i].tcpfd);
+ }
+ }
+}
+
+// ----------------------------------------------------------------------------------------------------------
+// 7) EXTRA CREDIT IMPLEMENTATIONS (no broad function comments below, but it is all pretty self explanatory)
+// ----------------------------------------------------------------------------------------------------------
+void *send_stationsinfo_routine(void * arg) {
+ int fd = (int) arg;
+
+ if (l) printf("sending STATIONSINFO reply to socket %d\n", fd);
+
+ uint32_t reply_size = 0;
+ for (int i = 0; i < num_stations; i++) {
+ if (stations[i].readfd != -1)
+ reply_size += snprintf(NULL, 0, "station #%d -> %s\n", i, stations[i].filePath);
+ }
+ reply_size--; // don't want final \n
+
+ // send type
+ uint8_t reply_num = 6;
+ if (send(fd, &reply_num, 1, 0) == -1)
+ perror("send in send stations info");
+
+ // send payload size
+ uint32_t reply_size_endian = htonl(reply_size);
+ int bytes = sizeof(uint32_t);
+ if (send_all(fd, &reply_size_endian, &bytes) == -1)
+ perror("send_all in send stations info");
+
+ char send_buffer[reply_size];
+ int ptr = 0;
+ for (int i = 0; i < num_stations; i++) {
+ if (stations[i].readfd != -1)
+ ptr += sprintf(send_buffer + ptr, "station #%d -> %s\n", i, stations[i].filePath);
}
+
+ int bytes_to_send = reply_size; // don't want final \n
+ if (send_all(fd, &send_buffer, &bytes_to_send) == -1)
+ perror("send_all buffer");
+
+ return (NULL);
}
void add_station(char *file_path) {
@@ -1080,20 +1155,66 @@ void add_station(char *file_path) {
pthread_create(&stations[num_stations].streamThread, NULL, stream_routine, num_stations);
send_newstation_reply(num_stations);
- num_stations++;
printf("add: successfully created station @ index %d\n", num_stations);
+
+ num_stations++;
}
-void cleanup_readfds_and_sockets() {
- // close all the file descriptors for the stations
- for (int i = 0; i < num_stations; i++) {
- close(stations[i].readfd);
+void destroy_station(int station_num) {
+ // check if station num is in range
+ if (station_num >= num_stations || station_num < 0) {
+ printf("remove: station number %d is out of range\n", station_num);
+ return;
+ }
+ // check if station had been removed
+ if (stations[station_num].readfd == -1) {
+ printf("remove: station %d has already been removed\n", station_num);
+ return;
}
- // close all the tcp connections
- for (int i = 0; i < max_active_users; i++) {
- if (user_data[i].sockfd != -1) {
- close(user_data[i].sockfd);
+ // send the stationshutdown command to users
+ for (int j = 0; j < max_active_users; j++) {
+ if (!users[j].tcpfd || users[j].tcpfd == -1)
+ continue;
+ if (users[j].stationNum == station_num) {
+ send_stationshutdown_reply(users[j].tcpfd);
+ users[j].stationNum = -1;
}
}
+
+ // cancel the stream's thread and close the read fd
+ pthread_cancel(stations[station_num].streamThread);
+ close(stations[station_num].readfd);
+ stations[station_num].readfd = -1;
+
+ printf("remove: successfully removed station %d\n", station_num);
}
+
+void send_newstation_reply(uint16_t station_num) {
+ if (l) printf("sending NEWSTATION reply to all sockets\n");
+
+ // make the struct
+ uint8_t reply_num = 8;
+ uint16_t station_num_n = htons(station_num);
+ struct NewStation new_station = {reply_num, station_num_n};
+ // send the message to each (valid) user
+ for (int i = 0; i < max_active_users; i++) {
+ if (!users[i].tcpfd || users[i].tcpfd == -1)
+ continue;
+ int bytes_to_send = sizeof(struct NewStation);
+ if (send_all(users[i].tcpfd, &new_station, &bytes_to_send) == -1)
+ perror("send_all in newstation reply");
+ }
+}
+
+void send_stationshutdown_reply(int fd) {
+ if (l) printf("sending STATIONSHUTDOWN reply to socket %d\n", fd);
+
+ uint8_t reply_num = 7;
+ if (send(fd, &reply_num, 1, 0) == -1)
+ perror("send in send stationshutdown");
+}
+
+// ---------------------------------------------------------------------------------------------------------
+// EOF :)
+// --------------------------------------------------------------------------------------------------------- \ No newline at end of file
diff --git a/snowcast_control b/snowcast_control
index e76cedf..4b59e54 100755
--- a/snowcast_control
+++ b/snowcast_control
Binary files differ
diff --git a/snowcast_listener b/snowcast_listener
index f27a5c1..455fcdc 100755
--- a/snowcast_listener
+++ b/snowcast_listener
Binary files differ
diff --git a/snowcast_server b/snowcast_server
index d3f5e04..9733366 100755
--- a/snowcast_server
+++ b/snowcast_server
Binary files differ