aboutsummaryrefslogtreecommitdiff
path: root/new.c
diff options
context:
space:
mode:
authorsotech117 <michael_foiani@brown.edu>2023-09-23 07:08:28 -0400
committersotech117 <michael_foiani@brown.edu>2023-09-23 07:08:28 -0400
commitd93da5af53d6beb9a2339839aa47fbbbbeafc208 (patch)
tree7f9d2013845137979e0805b96585ae54f9cdb47f /new.c
parent2707112ffc0b0eed6af8271d32f94e1622203a80 (diff)
bitrate still low, but pass the no read from file test
Diffstat (limited to 'new.c')
-rw-r--r--new.c497
1 files changed, 497 insertions, 0 deletions
diff --git a/new.c b/new.c
new file mode 100644
index 0000000..e180200
--- /dev/null
+++ b/new.c
@@ -0,0 +1,497 @@
+#include <string.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <sys/syscall.h>
+#include <pthread.h>
+#include <stdio.h>
+#include <fcntl.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <netdb.h>
+#include <arpa/inet.h>
+
+#include "protocol.c"
+
+
+
+#define LINE_MAX 1024
+#define MAX_RATE_PER_SECOND 16*1024
+#define TCP_TIMEOUT 100000 // 100ms in microseconds
+
+typedef struct station {
+ pthread_t thread;
+ int fd;
+ char *path;
+} station_t;
+
+int num_stations;
+station_t *stations;
+setup_stations(int argc, char *argv[]);
+*stream_routine(void *arg);
+
+int setup_listener(int port);
+*accept_routine(void *arg);
+
+// user data structure
+typedef struct user {
+ int tcpfd;
+ int udpfd;
+ pthread_t command_thread;
+ int station;
+} user_t;
+int num_users = 0;
+user_t *users;
+int tcpfd_max = 0;
+int *fd_to_user;
+pthread_mutex_t mutex_users = PTHREAD_MUTEX_INITIALIZER;
+*init_user_routine(void *arg);
+int init_user(int tcpfd, int udpfd);
+void destroy_user(int fd);
+*command_routine(void *arg);
+
+send_invalid_reply(int fd, char *message) {
+ printf("sending INVALID reply to socket %d\n", fd);
+ size_t message_size = strlen(message);
+ char buf[message_size + 2];
+ // type and payload size
+ buf[0] = 4;
+ buf[1] = message_size;
+ memcpy(buf + 2, message, message_size);
+
+ int size_buf = message_size + 2;
+ if (send_all(fd, &buf, &size_buf) == -1) {
+ perror("send_all (in init_user_routine)");
+ return -1;
+ }
+
+ return 1;
+}
+
+print_users();
+
+main(int argc, char *argv[]) {
+ if (argc < 3) {
+ printf("usage: ./snowcast_server <port> <file0> [file 1] [file 2] ...\n");
+ exit(1);
+ }
+
+ setup_stations(argc, argv);
+
+ users = malloc(0);
+ fd_to_user = malloc(0);
+
+ int port = atoi(argv[1]);
+ int listenerfd = setup_listener(port);
+ pthread_t accept_thread;
+ pthread_create(&accept_thread, NULL, accept_routine, listenerfd);
+
+
+ while(1)
+ ;
+}
+
+setup_stations(int argc, char *argv[]) {
+ num_stations = argc - 2;
+
+ // get the size to malloc
+ int totalSize = 0;
+ for(int i = 2; i < argc; i++)
+ {
+ // printf("file: %s\n", argv[i]);
+ totalSize += sizeof(pthread_t) + sizeof(int) + strlen(argv[i]);
+ }
+
+ // malloc the stations array
+ stations = malloc(totalSize);
+ if (!stations) { perror("malloc (stations pointer)"); exit(1); }
+ // assign the stations, and start the threads
+ for (int i = 0; i < num_stations; i++) {
+ stations[i].path = argv[i+2];
+ stations[i].fd = open(argv[i+2], O_RDONLY);
+ printf(stations[i].path);
+ if (stations[i].fd < 0) { perror("read (from station file)"); exit(1); }
+ pthread_create(&stations[i].thread, NULL, stream_routine, &stations[i].fd);
+ }
+
+ printf("successfully created %d stations\n", num_stations);
+}
+
+int setup_listener(int port) {
+ int sock = socket(AF_INET, SOCK_STREAM, 0);
+ if (sock < 0) { perror("socket (listener)"); return -1; }
+
+ struct sockaddr_in addr;
+ addr.sin_family = AF_INET;
+ addr.sin_port = htons(port);
+ addr.sin_addr.s_addr = INADDR_ANY;
+
+ if (bind(sock, (struct sockaddr *)&addr, sizeof(addr)) < 0) {
+ perror("bind (listener)");
+ return -1;
+ }
+
+ if (listen(sock, 0) < 0) { perror("listen (listener)"); return -1; }
+
+ return sock;
+}
+
+// THREAD FUNCTIONS
+
+// helper
+int read_file(int fd, char buffer[MAX_RATE_PER_SECOND]) {
+ int bytes_read = read(fd, buffer, MAX_RATE_PER_SECOND);
+ if (bytes_read < 0) { perror("read (from station file)"); return -1; }
+ // printf("bytes read: %d\n", bytes_read);
+ if (bytes_read == 0) {
+ // printf("end of file, restarting\n");
+ if(lseek(fd, 0, SEEK_SET) == -1) { perror("lseek (in resarting file)"); return -1; }
+ bytes_read = read(fd, buffer, MAX_RATE_PER_SECOND);
+ if (bytes_read < 0) { perror("read (from station file, after restart)"); return -1; }
+ }
+
+ return bytes_read;
+}
+
+*stream_cleanup(void *arg) {
+ int fd = *(int*)arg;
+ printf("cleanup/delete station\n");
+ return (NULL);
+}
+
+*stream_routine(void *arg) {
+ int fd = *(int*)arg;
+
+ pthread_cleanup_push(stream_cleanup, fd);
+
+ // make buffer which will be used to stream to children
+ char buffer[MAX_RATE_PER_SECOND];
+ memset(buffer, 0, MAX_RATE_PER_SECOND);
+ // if (!buffer) { perror("malloc (buffer in station thread)"); exit(1); }
+
+ for (;;)
+ {
+ // load bytes into buffer
+ int bytes_read = read_file(fd, buffer);
+ if (bytes_read == -1) { exit(1); }
+
+ // TODO: send buffer to children
+
+ sleep(1);
+ memset(buffer, 0, MAX_RATE_PER_SECOND);
+ }
+
+ pthread_cleanup_pop(1);
+
+ return (NULL);
+}
+
+*accept_cleanup(void *arg) {
+ int fd = (int) arg;
+ printf("cleanup/delete accept\n");
+ close(fd);
+ return (NULL);
+}
+
+*accept_routine(void *arg) {
+ int listener = (int) arg;
+
+ // pthread_cleanup_push(accept_cleanup, listener);
+
+ while(1) {
+ printf("accepting %d\n", listener);
+ int userfd = accept(listener, NULL, NULL);
+ if (userfd < 0) { perror("accept (in accept thread)"); return(NULL); }
+
+ printf("accepted socket %d\n", userfd);
+
+ pthread_t init_user_thread;
+ pthread_create(&init_user_thread, NULL, init_user_routine, userfd);
+ }
+
+ // pthread_cleanup_pop(1);
+}
+
+apply_timeout(int fd) {
+ // handle handshake
+ struct timeval tv;
+ tv.tv_sec = 0;
+ tv.tv_usec = TCP_TIMEOUT;
+ if (setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)) < 0) {
+ perror("setsockopt (in apply_timeout)");
+ return -1;
+ }
+
+ return 1;
+}
+
+remove_timeout(int fd)
+{
+ // handle handshake
+ struct timeval tv;
+ tv.tv_sec = 0;
+ tv.tv_usec = 0;
+ if (setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)) < 0) {
+ perror("setsockopt (in remove_timeout)");
+ return -1;
+ }
+
+ return 1;
+}
+
+int handle_handshake(int userfd) {
+ if (apply_timeout(userfd) == -1) { return -1; }
+
+ // get the command type
+ uint8_t command_type = -1;
+ if ((recv(userfd, &command_type, 1, 0)) < 0)
+ {
+ perror("recv (in init_user_routine)");
+ return -1;
+ }
+
+ // check
+ if (command_type != 0) {
+ printf("user on socket %d must send a Hello message first", userfd);
+ return -1;
+ }
+
+ // get the udp port
+ uint16_t udp_port = -1;
+ int bytes_to_read = sizeof(uint16_t);
+ if (recv_all(userfd, &udp_port, &bytes_to_read) == -1) {
+ perror("recv_all (in init_user_routine)");
+ return -1;
+ }
+ // remove timeout
+ if (remove_timeout(userfd) == -1) { return -1; }
+
+ printf("recieved HELLO command from socket %d\n", userfd);
+
+
+ // make udp socket
+ int udpfd = socket(AF_INET, SOCK_DGRAM, AI_PASSIVE);
+ if (udpfd < 0)
+ {
+ perror("socket (in init_user_routine UDP)");
+ return -1;
+ }
+
+ return udpfd;
+}
+
+send_welcome_reply(int fd) {
+ printf("sending WELCOME reply to socket %d\n", fd);
+
+ struct Welcome welcome;
+ welcome.replyType = 2;
+ printf("num_stations: %d\n", num_stations);
+ welcome.numStations = htons(num_stations);
+ int bytes_to_send = sizeof(struct Welcome);
+ if (send_all(fd, &welcome, &bytes_to_send) == -1) {
+ perror("send_all (in init_user_routine)");
+ return -1;
+ }
+
+ return 1;
+}
+
+*init_user_cleanup(void *arg) {
+ int fd = (int) arg;
+ // printf("cleanup/delete user_maybe?\n");
+ close(fd);
+ return (NULL);
+}
+
+*init_user_routine(void *arg) {
+ int userfd = (int) arg;
+ // pthread_cleanup_push(init_user_cleanup, userfd);
+
+ printf("new user on socket %d, waiting for HELLO\n", userfd);
+ int udpfd = handle_handshake(userfd);
+ if (udpfd == -1)
+ {
+ perror("handle_handshake (in init_user_routine)");
+ close(userfd);
+ return (NULL);
+ }
+
+ if(send_welcome_reply(userfd) == -1) {
+ perror("send_welcome_reply (in init_user_routine)");
+ close(userfd);
+ return (NULL);
+ }
+
+ if (init_user(userfd, udpfd) != 0) {
+ perror("init_user (in init_user_routine)");
+ destroy_user(userfd);
+ return (NULL);
+ }
+
+ return (NULL);
+
+ // pthread_cleanup_pop(0);
+}
+
+*command_cleanup(void *arg) {
+ int fd = (int) arg;
+ printf("cleanup/delete command\n");
+ close(fd);
+ return (NULL);
+}
+
+handle_setstation_command(int fd, uint16_t station_number) {
+ printf("received SETSTATION command from socket %d\n", fd);
+ // check if station number is valid
+ int station_num = ntohs(station_number);
+ if (station_num < 0 || station_num >= num_stations) {
+ printf("station number %d is invalid\n", station_num);
+ send_invalid_reply(fd, "station number is invalid");
+ return -1;
+ }
+
+ // set the station number
+ pthread_mutex_lock(&mutex_users);
+ printf("setting station number of user on socket %d to %d, user! %d\n", fd, station_num, fd_to_user[fd]);
+ users[fd_to_user[fd]].station = station_num;
+ pthread_mutex_unlock(&mutex_users);
+
+ print_users();
+
+ return 1;
+}
+
+*command_routine(void *arg) {
+ int fd = (int) arg;
+
+ printf("waiting for SETSTATION from socket %d\n", fd);
+
+ while(1) {
+ // get the command type
+ uint8_t command_type = -1;
+ if ((recv(fd, &command_type, 1, 0)) < 0)
+ {
+ perror("recv (in command_routine)");
+ destroy_user(fd);
+ return (NULL);
+ }
+
+ // check if have sent hello before
+ if (command_type == 0) {
+ printf("user on socket %d has already sent a HELLO command\n", fd);
+ send_invalid_reply(fd, "already sent a HELLO command");
+ destroy_user(fd);
+ return (NULL);
+ }
+
+ else if (command_type == 1) {
+ // get the station number
+ uint16_t station_number = -1;
+ int bytes_to_read = sizeof(uint16_t);
+ if (recv_all(fd, &station_number, &bytes_to_read) == -1) {
+ perror("recv_all (in command_routine)");
+ destroy_user(fd);
+ return (NULL);
+ }
+ if (handle_setstation_command(fd, station_number) == -1) {
+ destroy_user(fd);
+ return (NULL);
+ }
+ }
+
+ else if (command_type == 5) {
+ printf("user on socket %d has requested a LIST\n", fd);
+ }
+
+ else {
+ printf("user on socket %d has sent an INVALID command type of %d\n", fd, command_type);
+ send_invalid_reply(fd, "invalid command type");
+ destroy_user(fd);
+ return (NULL);
+ }
+
+
+ }
+}
+
+
+// HELPER FUNCTIONS
+
+int init_user(int sockfd, int udpfd) {
+ pthread_mutex_lock(&mutex_users);
+ printf("initializing user on sockets %d (tcp), %d (udp)\n", sockfd, udpfd);
+ // update map
+ if(sockfd > tcpfd_max) {
+ tcpfd_max = sockfd;
+ int * more_sockfd_to_user = realloc(fd_to_user, sizeof(int) * (tcpfd_max + 1));
+ if (!more_sockfd_to_user) { perror("realloc"); exit(1); }
+ fd_to_user = more_sockfd_to_user;
+ }
+
+ int running_index = 0;
+ while(running_index < num_users) {
+ if (users[running_index].tcpfd == -1) {
+ break;
+ }
+ running_index++;
+ }
+ if (running_index == num_users) {
+ // printf("reached max active users\n");
+ // printf("making new memory\n");
+ num_users++;
+ user_t *more_users = realloc(users, sizeof(user_t) * num_users);
+ if (!more_users) { perror("realloc"); exit(1); }
+ users = more_users;
+ }
+
+ // map TCP sockfd to this user index
+ users[running_index] = (user_t){sockfd, udpfd, -1, -1};
+ pthread_create(&users[running_index].command_thread, NULL, command_routine, sockfd);
+ fd_to_user[sockfd] = running_index;
+ // free(user_stream_threads);
+
+ print_users();
+ pthread_mutex_unlock(&mutex_users);
+
+ return 0;
+}
+
+void destroy_user(int fd) {
+ pthread_mutex_lock(&mutex_users);
+
+ printf("destroying user on sockets %d (tcp), %d (udp)\n", users[fd_to_user[fd]].tcpfd, users[fd_to_user[fd]].udpfd);
+
+ // close the sockets
+
+ close(fd);
+ close(users[fd_to_user[fd]].udpfd);
+ // stop the thread taking commands to the user
+ pthread_cancel(&users[fd_to_user[fd]].command_thread);
+ // "remove" the user from the list of user data
+ users[fd_to_user[fd]] = (user_t) {-1, -1, -1, -1};
+ // map sockfd to -1
+ fd_to_user[fd] = -1;
+ pthread_mutex_unlock(&mutex_users);
+}
+
+
+print_users() {
+ printf("num users: %d\n", num_users);
+ for (int i = 0; i < num_users; i++) {
+ printf("tcpfd %d , udpfd %d , station %d |\n", users[i].tcpfd, users[i].udpfd, users[i].station);
+ }
+ printf("\n");
+}
+
+// Parses a buffer into tokens, from cs33 :)
+int parse(char buffer[LINE_MAX], char *tokens[LINE_MAX / 2]) {
+ const char *regex = " \n\t\f\r";
+ char *current_token = strtok(buffer, regex);
+ if (current_token == NULL) return 0;
+
+ for (int i = 0; current_token != NULL; i++) {
+ tokens[i] = current_token;
+ current_token = strtok(NULL, regex);
+ }
+
+ return 1;
+} \ No newline at end of file