diff options
author | sotech117 <michael_foiani@brown.edu> | 2023-09-23 17:30:45 -0400 |
---|---|---|
committer | sotech117 <michael_foiani@brown.edu> | 2023-09-23 17:30:45 -0400 |
commit | 3b2aa8c271bf5cd5497decb6577afe5fd7339f57 (patch) | |
tree | bc1d39ad76b15f58ddf61385645fa87a59fb1157 /new.c | |
parent | b417bcc57b9fd49f360087c32c97293a6bc7d2be (diff) | |
parent | 1e9ac5407ef4f2cddc745f35f33a860446526cea (diff) |
merge post-warmup with main
Diffstat (limited to 'new.c')
-rw-r--r-- | new.c | 497 |
1 files changed, 497 insertions, 0 deletions
@@ -0,0 +1,497 @@ +#include <string.h> +#include <stdlib.h> +#include <unistd.h> +#include <sys/syscall.h> +#include <pthread.h> +#include <stdio.h> +#include <fcntl.h> +#include <sys/socket.h> +#include <netinet/in.h> +#include <netdb.h> +#include <arpa/inet.h> + +#include "protocol.c" + + + +#define LINE_MAX 1024 +#define MAX_RATE_PER_SECOND 16*1024 +#define TCP_TIMEOUT 100000 // 100ms in microseconds + +typedef struct station { + pthread_t thread; + int fd; + char *path; +} station_t; + +int num_stations; +station_t *stations; +setup_stations(int argc, char *argv[]); +*stream_routine(void *arg); + +int setup_listener(int port); +*accept_routine(void *arg); + +// user data structure +typedef struct user { + int tcpfd; + int udpfd; + pthread_t command_thread; + int station; +} user_t; +int num_users = 0; +user_t *users; +int tcpfd_max = 0; +int *fd_to_user; +pthread_mutex_t mutex_users = PTHREAD_MUTEX_INITIALIZER; +*init_user_routine(void *arg); +int init_user(int tcpfd, int udpfd); +void destroy_user(int fd); +*command_routine(void *arg); + +send_invalid_reply(int fd, char *message) { + printf("sending INVALID reply to socket %d\n", fd); + size_t message_size = strlen(message); + char buf[message_size + 2]; + // type and payload size + buf[0] = 4; + buf[1] = message_size; + memcpy(buf + 2, message, message_size); + + int size_buf = message_size + 2; + if (send_all(fd, &buf, &size_buf) == -1) { + perror("send_all (in init_user_routine)"); + return -1; + } + + return 1; +} + +print_users(); + +main(int argc, char *argv[]) { + if (argc < 3) { + printf("usage: ./snowcast_server <port> <file0> [file 1] [file 2] ...\n"); + exit(1); + } + + setup_stations(argc, argv); + + users = malloc(0); + fd_to_user = malloc(0); + + int port = atoi(argv[1]); + int listenerfd = setup_listener(port); + pthread_t accept_thread; + pthread_create(&accept_thread, NULL, accept_routine, listenerfd); + + + while(1) + ; +} + +setup_stations(int argc, char *argv[]) { + num_stations = argc - 2; + + // get the size to malloc + int totalSize = 0; + for(int i = 2; i < argc; i++) + { + // printf("file: %s\n", argv[i]); + totalSize += sizeof(pthread_t) + sizeof(int) + strlen(argv[i]); + } + + // malloc the stations array + stations = malloc(totalSize); + if (!stations) { perror("malloc (stations pointer)"); exit(1); } + // assign the stations, and start the threads + for (int i = 0; i < num_stations; i++) { + stations[i].path = argv[i+2]; + stations[i].fd = open(argv[i+2], O_RDONLY); + printf(stations[i].path); + if (stations[i].fd < 0) { perror("read (from station file)"); exit(1); } + pthread_create(&stations[i].thread, NULL, stream_routine, &stations[i].fd); + } + + printf("successfully created %d stations\n", num_stations); +} + +int setup_listener(int port) { + int sock = socket(AF_INET, SOCK_STREAM, 0); + if (sock < 0) { perror("socket (listener)"); return -1; } + + struct sockaddr_in addr; + addr.sin_family = AF_INET; + addr.sin_port = htons(port); + addr.sin_addr.s_addr = INADDR_ANY; + + if (bind(sock, (struct sockaddr *)&addr, sizeof(addr)) < 0) { + perror("bind (listener)"); + return -1; + } + + if (listen(sock, 0) < 0) { perror("listen (listener)"); return -1; } + + return sock; +} + +// THREAD FUNCTIONS + +// helper +int read_file(int fd, char buffer[MAX_RATE_PER_SECOND]) { + int bytes_read = read(fd, buffer, MAX_RATE_PER_SECOND); + if (bytes_read < 0) { perror("read (from station file)"); return -1; } + // printf("bytes read: %d\n", bytes_read); + if (bytes_read == 0) { + // printf("end of file, restarting\n"); + if(lseek(fd, 0, SEEK_SET) == -1) { perror("lseek (in resarting file)"); return -1; } + bytes_read = read(fd, buffer, MAX_RATE_PER_SECOND); + if (bytes_read < 0) { perror("read (from station file, after restart)"); return -1; } + } + + return bytes_read; +} + +*stream_cleanup(void *arg) { + int fd = *(int*)arg; + printf("cleanup/delete station\n"); + return (NULL); +} + +*stream_routine(void *arg) { + int fd = *(int*)arg; + + pthread_cleanup_push(stream_cleanup, fd); + + // make buffer which will be used to stream to children + char buffer[MAX_RATE_PER_SECOND]; + memset(buffer, 0, MAX_RATE_PER_SECOND); + // if (!buffer) { perror("malloc (buffer in station thread)"); exit(1); } + + for (;;) + { + // load bytes into buffer + int bytes_read = read_file(fd, buffer); + if (bytes_read == -1) { exit(1); } + + // TODO: send buffer to children + + sleep(1); + memset(buffer, 0, MAX_RATE_PER_SECOND); + } + + pthread_cleanup_pop(1); + + return (NULL); +} + +*accept_cleanup(void *arg) { + int fd = (int) arg; + printf("cleanup/delete accept\n"); + close(fd); + return (NULL); +} + +*accept_routine(void *arg) { + int listener = (int) arg; + + // pthread_cleanup_push(accept_cleanup, listener); + + while(1) { + printf("accepting %d\n", listener); + int userfd = accept(listener, NULL, NULL); + if (userfd < 0) { perror("accept (in accept thread)"); return(NULL); } + + printf("accepted socket %d\n", userfd); + + pthread_t init_user_thread; + pthread_create(&init_user_thread, NULL, init_user_routine, userfd); + } + + // pthread_cleanup_pop(1); +} + +apply_timeout(int fd) { + // handle handshake + struct timeval tv; + tv.tv_sec = 0; + tv.tv_usec = TCP_TIMEOUT; + if (setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)) < 0) { + perror("setsockopt (in apply_timeout)"); + return -1; + } + + return 1; +} + +remove_timeout(int fd) +{ + // handle handshake + struct timeval tv; + tv.tv_sec = 0; + tv.tv_usec = 0; + if (setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)) < 0) { + perror("setsockopt (in remove_timeout)"); + return -1; + } + + return 1; +} + +int handle_handshake(int userfd) { + if (apply_timeout(userfd) == -1) { return -1; } + + // get the command type + uint8_t command_type = -1; + if ((recv(userfd, &command_type, 1, 0)) < 0) + { + perror("recv (in init_user_routine)"); + return -1; + } + + // check + if (command_type != 0) { + printf("user on socket %d must send a Hello message first", userfd); + return -1; + } + + // get the udp port + uint16_t udp_port = -1; + int bytes_to_read = sizeof(uint16_t); + if (recv_all(userfd, &udp_port, &bytes_to_read) == -1) { + perror("recv_all (in init_user_routine)"); + return -1; + } + // remove timeout + if (remove_timeout(userfd) == -1) { return -1; } + + printf("recieved HELLO command from socket %d\n", userfd); + + + // make udp socket + int udpfd = socket(AF_INET, SOCK_DGRAM, AI_PASSIVE); + if (udpfd < 0) + { + perror("socket (in init_user_routine UDP)"); + return -1; + } + + return udpfd; +} + +send_welcome_reply(int fd) { + printf("sending WELCOME reply to socket %d\n", fd); + + struct Welcome welcome; + welcome.replyType = 2; + printf("num_stations: %d\n", num_stations); + welcome.numStations = htons(num_stations); + int bytes_to_send = sizeof(struct Welcome); + if (send_all(fd, &welcome, &bytes_to_send) == -1) { + perror("send_all (in init_user_routine)"); + return -1; + } + + return 1; +} + +*init_user_cleanup(void *arg) { + int fd = (int) arg; + // printf("cleanup/delete user_maybe?\n"); + close(fd); + return (NULL); +} + +*init_user_routine(void *arg) { + int userfd = (int) arg; + // pthread_cleanup_push(init_user_cleanup, userfd); + + printf("new user on socket %d, waiting for HELLO\n", userfd); + int udpfd = handle_handshake(userfd); + if (udpfd == -1) + { + perror("handle_handshake (in init_user_routine)"); + close(userfd); + return (NULL); + } + + if(send_welcome_reply(userfd) == -1) { + perror("send_welcome_reply (in init_user_routine)"); + close(userfd); + return (NULL); + } + + if (init_user(userfd, udpfd) != 0) { + perror("init_user (in init_user_routine)"); + destroy_user(userfd); + return (NULL); + } + + return (NULL); + + // pthread_cleanup_pop(0); +} + +*command_cleanup(void *arg) { + int fd = (int) arg; + printf("cleanup/delete command\n"); + close(fd); + return (NULL); +} + +handle_setstation_command(int fd, uint16_t station_number) { + printf("received SETSTATION command from socket %d\n", fd); + // check if station number is valid + int station_num = ntohs(station_number); + if (station_num < 0 || station_num >= num_stations) { + printf("station number %d is invalid\n", station_num); + send_invalid_reply(fd, "station number is invalid"); + return -1; + } + + // set the station number + pthread_mutex_lock(&mutex_users); + printf("setting station number of user on socket %d to %d, user! %d\n", fd, station_num, fd_to_user[fd]); + users[fd_to_user[fd]].station = station_num; + pthread_mutex_unlock(&mutex_users); + + print_users(); + + return 1; +} + +*command_routine(void *arg) { + int fd = (int) arg; + + printf("waiting for SETSTATION from socket %d\n", fd); + + while(1) { + // get the command type + uint8_t command_type = -1; + if ((recv(fd, &command_type, 1, 0)) < 0) + { + perror("recv (in command_routine)"); + destroy_user(fd); + return (NULL); + } + + // check if have sent hello before + if (command_type == 0) { + printf("user on socket %d has already sent a HELLO command\n", fd); + send_invalid_reply(fd, "already sent a HELLO command"); + destroy_user(fd); + return (NULL); + } + + else if (command_type == 1) { + // get the station number + uint16_t station_number = -1; + int bytes_to_read = sizeof(uint16_t); + if (recv_all(fd, &station_number, &bytes_to_read) == -1) { + perror("recv_all (in command_routine)"); + destroy_user(fd); + return (NULL); + } + if (handle_setstation_command(fd, station_number) == -1) { + destroy_user(fd); + return (NULL); + } + } + + else if (command_type == 5) { + printf("user on socket %d has requested a LIST\n", fd); + } + + else { + printf("user on socket %d has sent an INVALID command type of %d\n", fd, command_type); + send_invalid_reply(fd, "invalid command type"); + destroy_user(fd); + return (NULL); + } + + + } +} + + +// HELPER FUNCTIONS + +int init_user(int sockfd, int udpfd) { + pthread_mutex_lock(&mutex_users); + printf("initializing user on sockets %d (tcp), %d (udp)\n", sockfd, udpfd); + // update map + if(sockfd > tcpfd_max) { + tcpfd_max = sockfd; + int * more_sockfd_to_user = realloc(fd_to_user, sizeof(int) * (tcpfd_max + 1)); + if (!more_sockfd_to_user) { perror("realloc"); exit(1); } + fd_to_user = more_sockfd_to_user; + } + + int running_index = 0; + while(running_index < num_users) { + if (users[running_index].tcpfd == -1) { + break; + } + running_index++; + } + if (running_index == num_users) { + // printf("reached max active users\n"); + // printf("making new memory\n"); + num_users++; + user_t *more_users = realloc(users, sizeof(user_t) * num_users); + if (!more_users) { perror("realloc"); exit(1); } + users = more_users; + } + + // map TCP sockfd to this user index + users[running_index] = (user_t){sockfd, udpfd, -1, -1}; + pthread_create(&users[running_index].command_thread, NULL, command_routine, sockfd); + fd_to_user[sockfd] = running_index; + // free(user_stream_threads); + + print_users(); + pthread_mutex_unlock(&mutex_users); + + return 0; +} + +void destroy_user(int fd) { + pthread_mutex_lock(&mutex_users); + + printf("destroying user on sockets %d (tcp), %d (udp)\n", users[fd_to_user[fd]].tcpfd, users[fd_to_user[fd]].udpfd); + + // close the sockets + + close(fd); + close(users[fd_to_user[fd]].udpfd); + // stop the thread taking commands to the user + pthread_cancel(&users[fd_to_user[fd]].command_thread); + // "remove" the user from the list of user data + users[fd_to_user[fd]] = (user_t) {-1, -1, -1, -1}; + // map sockfd to -1 + fd_to_user[fd] = -1; + pthread_mutex_unlock(&mutex_users); +} + + +print_users() { + printf("num users: %d\n", num_users); + for (int i = 0; i < num_users; i++) { + printf("tcpfd %d , udpfd %d , station %d |\n", users[i].tcpfd, users[i].udpfd, users[i].station); + } + printf("\n"); +} + +// Parses a buffer into tokens, from cs33 :) +int parse(char buffer[LINE_MAX], char *tokens[LINE_MAX / 2]) { + const char *regex = " \n\t\f\r"; + char *current_token = strtok(buffer, regex); + if (current_token == NULL) return 0; + + for (int i = 0; current_token != NULL; i++) { + tokens[i] = current_token; + current_token = strtok(NULL, regex); + } + + return 1; +}
\ No newline at end of file |