#include #include #include #include #include #include #include #include #include #include #include #include "protocol.c" #define LINE_MAX 1024 #define MAX_RATE_PER_SECOND 16*1024 #define TCP_TIMEOUT 100000 // 100ms in microseconds typedef struct station { pthread_t thread; int fd; char *path; } station_t; int num_stations; station_t *stations; setup_stations(int argc, char *argv[]); *stream_routine(void *arg); int setup_listener(int port); *accept_routine(void *arg); // user data structure typedef struct user { int tcpfd; int udpfd; pthread_t command_thread; int station; } user_t; int num_users = 0; user_t *users; int tcpfd_max = 0; int *fd_to_user; pthread_mutex_t mutex_users = PTHREAD_MUTEX_INITIALIZER; *init_user_routine(void *arg); int init_user(int tcpfd, int udpfd); void destroy_user(int fd); *command_routine(void *arg); send_invalid_reply(int fd, char *message) { printf("sending INVALID reply to socket %d\n", fd); size_t message_size = strlen(message); char buf[message_size + 2]; // type and payload size buf[0] = 4; buf[1] = message_size; memcpy(buf + 2, message, message_size); int size_buf = message_size + 2; if (send_all(fd, &buf, &size_buf) == -1) { perror("send_all (in init_user_routine)"); return -1; } return 1; } print_users(); main(int argc, char *argv[]) { if (argc < 3) { printf("usage: ./snowcast_server [file 1] [file 2] ...\n"); exit(1); } setup_stations(argc, argv); users = malloc(0); fd_to_user = malloc(0); int port = atoi(argv[1]); int listenerfd = setup_listener(port); pthread_t accept_thread; pthread_create(&accept_thread, NULL, accept_routine, listenerfd); while(1) ; } setup_stations(int argc, char *argv[]) { num_stations = argc - 2; // get the size to malloc int totalSize = 0; for(int i = 2; i < argc; i++) { // printf("file: %s\n", argv[i]); totalSize += sizeof(pthread_t) + sizeof(int) + strlen(argv[i]); } // malloc the stations array stations = malloc(totalSize); if (!stations) { perror("malloc (stations pointer)"); exit(1); } // assign the stations, and start the threads for (int i = 0; i < num_stations; i++) { stations[i].path = argv[i+2]; stations[i].fd = open(argv[i+2], O_RDONLY); printf(stations[i].path); if (stations[i].fd < 0) { perror("read (from station file)"); exit(1); } pthread_create(&stations[i].thread, NULL, stream_routine, &stations[i].fd); } printf("successfully created %d stations\n", num_stations); } int setup_listener(int port) { int sock = socket(AF_INET, SOCK_STREAM, 0); if (sock < 0) { perror("socket (listener)"); return -1; } struct sockaddr_in addr; addr.sin_family = AF_INET; addr.sin_port = htons(port); addr.sin_addr.s_addr = INADDR_ANY; if (bind(sock, (struct sockaddr *)&addr, sizeof(addr)) < 0) { perror("bind (listener)"); return -1; } if (listen(sock, 0) < 0) { perror("listen (listener)"); return -1; } return sock; } // THREAD FUNCTIONS // helper int read_file(int fd, char buffer[MAX_RATE_PER_SECOND]) { int bytes_read = read(fd, buffer, MAX_RATE_PER_SECOND); if (bytes_read < 0) { perror("read (from station file)"); return -1; } // printf("bytes read: %d\n", bytes_read); if (bytes_read == 0) { // printf("end of file, restarting\n"); if(lseek(fd, 0, SEEK_SET) == -1) { perror("lseek (in resarting file)"); return -1; } bytes_read = read(fd, buffer, MAX_RATE_PER_SECOND); if (bytes_read < 0) { perror("read (from station file, after restart)"); return -1; } } return bytes_read; } *stream_cleanup(void *arg) { int fd = *(int*)arg; printf("cleanup/delete station\n"); return (NULL); } *stream_routine(void *arg) { int fd = *(int*)arg; pthread_cleanup_push(stream_cleanup, fd); // make buffer which will be used to stream to children char buffer[MAX_RATE_PER_SECOND]; memset(buffer, 0, MAX_RATE_PER_SECOND); // if (!buffer) { perror("malloc (buffer in station thread)"); exit(1); } for (;;) { // load bytes into buffer int bytes_read = read_file(fd, buffer); if (bytes_read == -1) { exit(1); } // TODO: send buffer to children sleep(1); memset(buffer, 0, MAX_RATE_PER_SECOND); } pthread_cleanup_pop(1); return (NULL); } *accept_cleanup(void *arg) { int fd = (int) arg; printf("cleanup/delete accept\n"); close(fd); return (NULL); } *accept_routine(void *arg) { int listener = (int) arg; // pthread_cleanup_push(accept_cleanup, listener); while(1) { printf("accepting %d\n", listener); int userfd = accept(listener, NULL, NULL); if (userfd < 0) { perror("accept (in accept thread)"); return(NULL); } printf("accepted socket %d\n", userfd); pthread_t init_user_thread; pthread_create(&init_user_thread, NULL, init_user_routine, userfd); } // pthread_cleanup_pop(1); } apply_timeout(int fd) { // handle handshake struct timeval tv; tv.tv_sec = 0; tv.tv_usec = TCP_TIMEOUT; if (setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)) < 0) { perror("setsockopt (in apply_timeout)"); return -1; } return 1; } remove_timeout(int fd) { // handle handshake struct timeval tv; tv.tv_sec = 0; tv.tv_usec = 0; if (setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)) < 0) { perror("setsockopt (in remove_timeout)"); return -1; } return 1; } int handle_handshake(int userfd) { if (apply_timeout(userfd) == -1) { return -1; } // get the command type uint8_t command_type = -1; if ((recv(userfd, &command_type, 1, 0)) < 0) { perror("recv (in init_user_routine)"); return -1; } // check if (command_type != 0) { printf("user on socket %d must send a Hello message first", userfd); return -1; } // get the udp port uint16_t udp_port = -1; int bytes_to_read = sizeof(uint16_t); if (recv_all(userfd, &udp_port, &bytes_to_read) == -1) { perror("recv_all (in init_user_routine)"); return -1; } // remove timeout if (remove_timeout(userfd) == -1) { return -1; } printf("recieved HELLO command from socket %d\n", userfd); // make udp socket int udpfd = socket(AF_INET, SOCK_DGRAM, AI_PASSIVE); if (udpfd < 0) { perror("socket (in init_user_routine UDP)"); return -1; } return udpfd; } send_welcome_reply(int fd) { printf("sending WELCOME reply to socket %d\n", fd); struct Welcome welcome; welcome.replyType = 2; printf("num_stations: %d\n", num_stations); welcome.numStations = htons(num_stations); int bytes_to_send = sizeof(struct Welcome); if (send_all(fd, &welcome, &bytes_to_send) == -1) { perror("send_all (in init_user_routine)"); return -1; } return 1; } *init_user_cleanup(void *arg) { int fd = (int) arg; // printf("cleanup/delete user_maybe?\n"); close(fd); return (NULL); } *init_user_routine(void *arg) { int userfd = (int) arg; // pthread_cleanup_push(init_user_cleanup, userfd); printf("new user on socket %d, waiting for HELLO\n", userfd); int udpfd = handle_handshake(userfd); if (udpfd == -1) { perror("handle_handshake (in init_user_routine)"); close(userfd); return (NULL); } if(send_welcome_reply(userfd) == -1) { perror("send_welcome_reply (in init_user_routine)"); close(userfd); return (NULL); } if (init_user(userfd, udpfd) != 0) { perror("init_user (in init_user_routine)"); destroy_user(userfd); return (NULL); } return (NULL); // pthread_cleanup_pop(0); } *command_cleanup(void *arg) { int fd = (int) arg; printf("cleanup/delete command\n"); close(fd); return (NULL); } handle_setstation_command(int fd, uint16_t station_number) { printf("received SETSTATION command from socket %d\n", fd); // check if station number is valid int station_num = ntohs(station_number); if (station_num < 0 || station_num >= num_stations) { printf("station number %d is invalid\n", station_num); send_invalid_reply(fd, "station number is invalid"); return -1; } // set the station number pthread_mutex_lock(&mutex_users); printf("setting station number of user on socket %d to %d, user! %d\n", fd, station_num, fd_to_user[fd]); users[fd_to_user[fd]].station = station_num; pthread_mutex_unlock(&mutex_users); print_users(); return 1; } *command_routine(void *arg) { int fd = (int) arg; printf("waiting for SETSTATION from socket %d\n", fd); while(1) { // get the command type uint8_t command_type = -1; if ((recv(fd, &command_type, 1, 0)) < 0) { perror("recv (in command_routine)"); destroy_user(fd); return (NULL); } // check if have sent hello before if (command_type == 0) { printf("user on socket %d has already sent a HELLO command\n", fd); send_invalid_reply(fd, "already sent a HELLO command"); destroy_user(fd); return (NULL); } else if (command_type == 1) { // get the station number uint16_t station_number = -1; int bytes_to_read = sizeof(uint16_t); if (recv_all(fd, &station_number, &bytes_to_read) == -1) { perror("recv_all (in command_routine)"); destroy_user(fd); return (NULL); } if (handle_setstation_command(fd, station_number) == -1) { destroy_user(fd); return (NULL); } } else if (command_type == 5) { printf("user on socket %d has requested a LIST\n", fd); } else { printf("user on socket %d has sent an INVALID command type of %d\n", fd, command_type); send_invalid_reply(fd, "invalid command type"); destroy_user(fd); return (NULL); } } } // HELPER FUNCTIONS int init_user(int sockfd, int udpfd) { pthread_mutex_lock(&mutex_users); printf("initializing user on sockets %d (tcp), %d (udp)\n", sockfd, udpfd); // update map if(sockfd > tcpfd_max) { tcpfd_max = sockfd; int * more_sockfd_to_user = realloc(fd_to_user, sizeof(int) * (tcpfd_max + 1)); if (!more_sockfd_to_user) { perror("realloc"); exit(1); } fd_to_user = more_sockfd_to_user; } int running_index = 0; while(running_index < num_users) { if (users[running_index].tcpfd == -1) { break; } running_index++; } if (running_index == num_users) { // printf("reached max active users\n"); // printf("making new memory\n"); num_users++; user_t *more_users = realloc(users, sizeof(user_t) * num_users); if (!more_users) { perror("realloc"); exit(1); } users = more_users; } // map TCP sockfd to this user index users[running_index] = (user_t){sockfd, udpfd, -1, -1}; pthread_create(&users[running_index].command_thread, NULL, command_routine, sockfd); fd_to_user[sockfd] = running_index; // free(user_stream_threads); print_users(); pthread_mutex_unlock(&mutex_users); return 0; } void destroy_user(int fd) { pthread_mutex_lock(&mutex_users); printf("destroying user on sockets %d (tcp), %d (udp)\n", users[fd_to_user[fd]].tcpfd, users[fd_to_user[fd]].udpfd); // close the sockets close(fd); close(users[fd_to_user[fd]].udpfd); // stop the thread taking commands to the user pthread_cancel(&users[fd_to_user[fd]].command_thread); // "remove" the user from the list of user data users[fd_to_user[fd]] = (user_t) {-1, -1, -1, -1}; // map sockfd to -1 fd_to_user[fd] = -1; pthread_mutex_unlock(&mutex_users); } print_users() { printf("num users: %d\n", num_users); for (int i = 0; i < num_users; i++) { printf("tcpfd %d , udpfd %d , station %d |\n", users[i].tcpfd, users[i].udpfd, users[i].station); } printf("\n"); } // Parses a buffer into tokens, from cs33 :) int parse(char buffer[LINE_MAX], char *tokens[LINE_MAX / 2]) { const char *regex = " \n\t\f\r"; char *current_token = strtok(buffer, regex); if (current_token == NULL) return 0; for (int i = 0; current_token != NULL; i++) { tokens[i] = current_token; current_token = strtok(NULL, regex); } return 1; }