diff options
author | sotech117 <michael_foiani@brown.edu> | 2023-09-23 07:08:28 -0400 |
---|---|---|
committer | sotech117 <michael_foiani@brown.edu> | 2023-09-23 07:08:28 -0400 |
commit | d93da5af53d6beb9a2339839aa47fbbbbeafc208 (patch) | |
tree | 7f9d2013845137979e0805b96585ae54f9cdb47f | |
parent | 2707112ffc0b0eed6af8271d32f94e1622203a80 (diff) |
bitrate still low, but pass the no read from file test
-rw-r--r-- | Makefile | 5 | ||||
-rwxr-xr-x | new | bin | 0 -> 55440 bytes | |||
-rw-r--r-- | new.c | 497 | ||||
-rw-r--r-- | new.dSYM/Contents/Info.plist | 20 | ||||
-rw-r--r-- | new.dSYM/Contents/Resources/DWARF/new | bin | 0 -> 19866 bytes | |||
-rw-r--r-- | server.c | 407 | ||||
-rwxr-xr-x | snowcast_server | bin | 44944 -> 45200 bytes | |||
-rw-r--r-- | snowcast_server.dSYM/Contents/Resources/DWARF/snowcast_server | bin | 27442 -> 27865 bytes |
8 files changed, 759 insertions, 170 deletions
@@ -17,4 +17,7 @@ client: # $(CC) $(CFLAGS) -o c client.c clean: - rm -fv snowcast_server snowcast_control snowcast_listener
\ No newline at end of file + rm -fv snowcast_server snowcast_control snowcast_listener + +new: + $(CC) $(CFLAGS) -o snowcast_server new.c Binary files differ@@ -0,0 +1,497 @@ +#include <string.h> +#include <stdlib.h> +#include <unistd.h> +#include <sys/syscall.h> +#include <pthread.h> +#include <stdio.h> +#include <fcntl.h> +#include <sys/socket.h> +#include <netinet/in.h> +#include <netdb.h> +#include <arpa/inet.h> + +#include "protocol.c" + + + +#define LINE_MAX 1024 +#define MAX_RATE_PER_SECOND 16*1024 +#define TCP_TIMEOUT 100000 // 100ms in microseconds + +typedef struct station { + pthread_t thread; + int fd; + char *path; +} station_t; + +int num_stations; +station_t *stations; +setup_stations(int argc, char *argv[]); +*stream_routine(void *arg); + +int setup_listener(int port); +*accept_routine(void *arg); + +// user data structure +typedef struct user { + int tcpfd; + int udpfd; + pthread_t command_thread; + int station; +} user_t; +int num_users = 0; +user_t *users; +int tcpfd_max = 0; +int *fd_to_user; +pthread_mutex_t mutex_users = PTHREAD_MUTEX_INITIALIZER; +*init_user_routine(void *arg); +int init_user(int tcpfd, int udpfd); +void destroy_user(int fd); +*command_routine(void *arg); + +send_invalid_reply(int fd, char *message) { + printf("sending INVALID reply to socket %d\n", fd); + size_t message_size = strlen(message); + char buf[message_size + 2]; + // type and payload size + buf[0] = 4; + buf[1] = message_size; + memcpy(buf + 2, message, message_size); + + int size_buf = message_size + 2; + if (send_all(fd, &buf, &size_buf) == -1) { + perror("send_all (in init_user_routine)"); + return -1; + } + + return 1; +} + +print_users(); + +main(int argc, char *argv[]) { + if (argc < 3) { + printf("usage: ./snowcast_server <port> <file0> [file 1] [file 2] ...\n"); + exit(1); + } + + setup_stations(argc, argv); + + users = malloc(0); + fd_to_user = malloc(0); + + int port = atoi(argv[1]); + int listenerfd = setup_listener(port); + pthread_t accept_thread; + pthread_create(&accept_thread, NULL, accept_routine, listenerfd); + + + while(1) + ; +} + +setup_stations(int argc, char *argv[]) { + num_stations = argc - 2; + + // get the size to malloc + int totalSize = 0; + for(int i = 2; i < argc; i++) + { + // printf("file: %s\n", argv[i]); + totalSize += sizeof(pthread_t) + sizeof(int) + strlen(argv[i]); + } + + // malloc the stations array + stations = malloc(totalSize); + if (!stations) { perror("malloc (stations pointer)"); exit(1); } + // assign the stations, and start the threads + for (int i = 0; i < num_stations; i++) { + stations[i].path = argv[i+2]; + stations[i].fd = open(argv[i+2], O_RDONLY); + printf(stations[i].path); + if (stations[i].fd < 0) { perror("read (from station file)"); exit(1); } + pthread_create(&stations[i].thread, NULL, stream_routine, &stations[i].fd); + } + + printf("successfully created %d stations\n", num_stations); +} + +int setup_listener(int port) { + int sock = socket(AF_INET, SOCK_STREAM, 0); + if (sock < 0) { perror("socket (listener)"); return -1; } + + struct sockaddr_in addr; + addr.sin_family = AF_INET; + addr.sin_port = htons(port); + addr.sin_addr.s_addr = INADDR_ANY; + + if (bind(sock, (struct sockaddr *)&addr, sizeof(addr)) < 0) { + perror("bind (listener)"); + return -1; + } + + if (listen(sock, 0) < 0) { perror("listen (listener)"); return -1; } + + return sock; +} + +// THREAD FUNCTIONS + +// helper +int read_file(int fd, char buffer[MAX_RATE_PER_SECOND]) { + int bytes_read = read(fd, buffer, MAX_RATE_PER_SECOND); + if (bytes_read < 0) { perror("read (from station file)"); return -1; } + // printf("bytes read: %d\n", bytes_read); + if (bytes_read == 0) { + // printf("end of file, restarting\n"); + if(lseek(fd, 0, SEEK_SET) == -1) { perror("lseek (in resarting file)"); return -1; } + bytes_read = read(fd, buffer, MAX_RATE_PER_SECOND); + if (bytes_read < 0) { perror("read (from station file, after restart)"); return -1; } + } + + return bytes_read; +} + +*stream_cleanup(void *arg) { + int fd = *(int*)arg; + printf("cleanup/delete station\n"); + return (NULL); +} + +*stream_routine(void *arg) { + int fd = *(int*)arg; + + pthread_cleanup_push(stream_cleanup, fd); + + // make buffer which will be used to stream to children + char buffer[MAX_RATE_PER_SECOND]; + memset(buffer, 0, MAX_RATE_PER_SECOND); + // if (!buffer) { perror("malloc (buffer in station thread)"); exit(1); } + + for (;;) + { + // load bytes into buffer + int bytes_read = read_file(fd, buffer); + if (bytes_read == -1) { exit(1); } + + // TODO: send buffer to children + + sleep(1); + memset(buffer, 0, MAX_RATE_PER_SECOND); + } + + pthread_cleanup_pop(1); + + return (NULL); +} + +*accept_cleanup(void *arg) { + int fd = (int) arg; + printf("cleanup/delete accept\n"); + close(fd); + return (NULL); +} + +*accept_routine(void *arg) { + int listener = (int) arg; + + // pthread_cleanup_push(accept_cleanup, listener); + + while(1) { + printf("accepting %d\n", listener); + int userfd = accept(listener, NULL, NULL); + if (userfd < 0) { perror("accept (in accept thread)"); return(NULL); } + + printf("accepted socket %d\n", userfd); + + pthread_t init_user_thread; + pthread_create(&init_user_thread, NULL, init_user_routine, userfd); + } + + // pthread_cleanup_pop(1); +} + +apply_timeout(int fd) { + // handle handshake + struct timeval tv; + tv.tv_sec = 0; + tv.tv_usec = TCP_TIMEOUT; + if (setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)) < 0) { + perror("setsockopt (in apply_timeout)"); + return -1; + } + + return 1; +} + +remove_timeout(int fd) +{ + // handle handshake + struct timeval tv; + tv.tv_sec = 0; + tv.tv_usec = 0; + if (setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)) < 0) { + perror("setsockopt (in remove_timeout)"); + return -1; + } + + return 1; +} + +int handle_handshake(int userfd) { + if (apply_timeout(userfd) == -1) { return -1; } + + // get the command type + uint8_t command_type = -1; + if ((recv(userfd, &command_type, 1, 0)) < 0) + { + perror("recv (in init_user_routine)"); + return -1; + } + + // check + if (command_type != 0) { + printf("user on socket %d must send a Hello message first", userfd); + return -1; + } + + // get the udp port + uint16_t udp_port = -1; + int bytes_to_read = sizeof(uint16_t); + if (recv_all(userfd, &udp_port, &bytes_to_read) == -1) { + perror("recv_all (in init_user_routine)"); + return -1; + } + // remove timeout + if (remove_timeout(userfd) == -1) { return -1; } + + printf("recieved HELLO command from socket %d\n", userfd); + + + // make udp socket + int udpfd = socket(AF_INET, SOCK_DGRAM, AI_PASSIVE); + if (udpfd < 0) + { + perror("socket (in init_user_routine UDP)"); + return -1; + } + + return udpfd; +} + +send_welcome_reply(int fd) { + printf("sending WELCOME reply to socket %d\n", fd); + + struct Welcome welcome; + welcome.replyType = 2; + printf("num_stations: %d\n", num_stations); + welcome.numStations = htons(num_stations); + int bytes_to_send = sizeof(struct Welcome); + if (send_all(fd, &welcome, &bytes_to_send) == -1) { + perror("send_all (in init_user_routine)"); + return -1; + } + + return 1; +} + +*init_user_cleanup(void *arg) { + int fd = (int) arg; + // printf("cleanup/delete user_maybe?\n"); + close(fd); + return (NULL); +} + +*init_user_routine(void *arg) { + int userfd = (int) arg; + // pthread_cleanup_push(init_user_cleanup, userfd); + + printf("new user on socket %d, waiting for HELLO\n", userfd); + int udpfd = handle_handshake(userfd); + if (udpfd == -1) + { + perror("handle_handshake (in init_user_routine)"); + close(userfd); + return (NULL); + } + + if(send_welcome_reply(userfd) == -1) { + perror("send_welcome_reply (in init_user_routine)"); + close(userfd); + return (NULL); + } + + if (init_user(userfd, udpfd) != 0) { + perror("init_user (in init_user_routine)"); + destroy_user(userfd); + return (NULL); + } + + return (NULL); + + // pthread_cleanup_pop(0); +} + +*command_cleanup(void *arg) { + int fd = (int) arg; + printf("cleanup/delete command\n"); + close(fd); + return (NULL); +} + +handle_setstation_command(int fd, uint16_t station_number) { + printf("received SETSTATION command from socket %d\n", fd); + // check if station number is valid + int station_num = ntohs(station_number); + if (station_num < 0 || station_num >= num_stations) { + printf("station number %d is invalid\n", station_num); + send_invalid_reply(fd, "station number is invalid"); + return -1; + } + + // set the station number + pthread_mutex_lock(&mutex_users); + printf("setting station number of user on socket %d to %d, user! %d\n", fd, station_num, fd_to_user[fd]); + users[fd_to_user[fd]].station = station_num; + pthread_mutex_unlock(&mutex_users); + + print_users(); + + return 1; +} + +*command_routine(void *arg) { + int fd = (int) arg; + + printf("waiting for SETSTATION from socket %d\n", fd); + + while(1) { + // get the command type + uint8_t command_type = -1; + if ((recv(fd, &command_type, 1, 0)) < 0) + { + perror("recv (in command_routine)"); + destroy_user(fd); + return (NULL); + } + + // check if have sent hello before + if (command_type == 0) { + printf("user on socket %d has already sent a HELLO command\n", fd); + send_invalid_reply(fd, "already sent a HELLO command"); + destroy_user(fd); + return (NULL); + } + + else if (command_type == 1) { + // get the station number + uint16_t station_number = -1; + int bytes_to_read = sizeof(uint16_t); + if (recv_all(fd, &station_number, &bytes_to_read) == -1) { + perror("recv_all (in command_routine)"); + destroy_user(fd); + return (NULL); + } + if (handle_setstation_command(fd, station_number) == -1) { + destroy_user(fd); + return (NULL); + } + } + + else if (command_type == 5) { + printf("user on socket %d has requested a LIST\n", fd); + } + + else { + printf("user on socket %d has sent an INVALID command type of %d\n", fd, command_type); + send_invalid_reply(fd, "invalid command type"); + destroy_user(fd); + return (NULL); + } + + + } +} + + +// HELPER FUNCTIONS + +int init_user(int sockfd, int udpfd) { + pthread_mutex_lock(&mutex_users); + printf("initializing user on sockets %d (tcp), %d (udp)\n", sockfd, udpfd); + // update map + if(sockfd > tcpfd_max) { + tcpfd_max = sockfd; + int * more_sockfd_to_user = realloc(fd_to_user, sizeof(int) * (tcpfd_max + 1)); + if (!more_sockfd_to_user) { perror("realloc"); exit(1); } + fd_to_user = more_sockfd_to_user; + } + + int running_index = 0; + while(running_index < num_users) { + if (users[running_index].tcpfd == -1) { + break; + } + running_index++; + } + if (running_index == num_users) { + // printf("reached max active users\n"); + // printf("making new memory\n"); + num_users++; + user_t *more_users = realloc(users, sizeof(user_t) * num_users); + if (!more_users) { perror("realloc"); exit(1); } + users = more_users; + } + + // map TCP sockfd to this user index + users[running_index] = (user_t){sockfd, udpfd, -1, -1}; + pthread_create(&users[running_index].command_thread, NULL, command_routine, sockfd); + fd_to_user[sockfd] = running_index; + // free(user_stream_threads); + + print_users(); + pthread_mutex_unlock(&mutex_users); + + return 0; +} + +void destroy_user(int fd) { + pthread_mutex_lock(&mutex_users); + + printf("destroying user on sockets %d (tcp), %d (udp)\n", users[fd_to_user[fd]].tcpfd, users[fd_to_user[fd]].udpfd); + + // close the sockets + + close(fd); + close(users[fd_to_user[fd]].udpfd); + // stop the thread taking commands to the user + pthread_cancel(&users[fd_to_user[fd]].command_thread); + // "remove" the user from the list of user data + users[fd_to_user[fd]] = (user_t) {-1, -1, -1, -1}; + // map sockfd to -1 + fd_to_user[fd] = -1; + pthread_mutex_unlock(&mutex_users); +} + + +print_users() { + printf("num users: %d\n", num_users); + for (int i = 0; i < num_users; i++) { + printf("tcpfd %d , udpfd %d , station %d |\n", users[i].tcpfd, users[i].udpfd, users[i].station); + } + printf("\n"); +} + +// Parses a buffer into tokens, from cs33 :) +int parse(char buffer[LINE_MAX], char *tokens[LINE_MAX / 2]) { + const char *regex = " \n\t\f\r"; + char *current_token = strtok(buffer, regex); + if (current_token == NULL) return 0; + + for (int i = 0; current_token != NULL; i++) { + tokens[i] = current_token; + current_token = strtok(NULL, regex); + } + + return 1; +}
\ No newline at end of file diff --git a/new.dSYM/Contents/Info.plist b/new.dSYM/Contents/Info.plist new file mode 100644 index 0000000..ba6d460 --- /dev/null +++ b/new.dSYM/Contents/Info.plist @@ -0,0 +1,20 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!DOCTYPE plist PUBLIC "-//Apple Computer//DTD PLIST 1.0//EN" "http://www.apple.com/DTDs/PropertyList-1.0.dtd"> +<plist version="1.0"> + <dict> + <key>CFBundleDevelopmentRegion</key> + <string>English</string> + <key>CFBundleIdentifier</key> + <string>com.apple.xcode.dsym.new</string> + <key>CFBundleInfoDictionaryVersion</key> + <string>6.0</string> + <key>CFBundlePackageType</key> + <string>dSYM</string> + <key>CFBundleSignature</key> + <string>????</string> + <key>CFBundleShortVersionString</key> + <string>1.0</string> + <key>CFBundleVersion</key> + <string>1</string> + </dict> +</plist> diff --git a/new.dSYM/Contents/Resources/DWARF/new b/new.dSYM/Contents/Resources/DWARF/new Binary files differnew file mode 100644 index 0000000..3200cf1 --- /dev/null +++ b/new.dSYM/Contents/Resources/DWARF/new @@ -17,20 +17,29 @@ #define LINE_MAX 1024 #define MAX_USERS 1000 #define MAX_PATH 50 -#define MAX_STREAM_RATE 16*1024 - +#define MAX_RATE_PER_SECOND 16*1024 + +// typedef struct station { +// int streamFd; +// char* filePath; +// int fileBufferSize; +// char fileBuffer[MAX_STREAM_RATE]; +// } station_t; typedef struct station { - int streamFd; - char* filePath; - int fileBufferSize; - char fileBuffer[MAX_STREAM_RATE]; + pthread_t streamThread; + int readfd; + char *filePath; } station_t; +int num_stations; +station_t *stations; +int setup_stations(int argc, char *argv[]); +void *stream_routine(void *arg); + typedef struct user { int udpPort; int stationNum; int sockfd; - pthread_t streamThread; } user_t; @@ -43,7 +52,7 @@ pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; pthread_mutex_t station_mutex = PTHREAD_MUTEX_INITIALIZER; const char *port; -int num_stations; +// int num_stations; int start_threads = 0; int max_active_users = 0; @@ -55,14 +64,20 @@ user_t *user_data; int *sockfd_to_user; // stations array pointer -station_t *station_data; +// station_t *station_data; + +struct udp_packet_routine_args { + int user_index; + int buffer_size; + char *file_buffer; +}; void *send_udp_packet_routine(void* arg); -void *select_thread(void* arg); -void *synchronization_thread(void* arg); +void *select_routine(void* arg); +void *sync_routine(void* arg); +void *send_announce_routine(void* arg); void init_station(int station_num, const char *station_name); -void seek_stations(int station_num); int parse(char buffer[LINE_MAX], char *tokens[LINE_MAX / 2]); void *print_info_routine(void *arg); @@ -88,41 +103,30 @@ main(int argc, char *argv[]) exit(1); } + // initizlize the port port = argv[1]; - num_stations = argc - 2; - - // init stations - size_t totalSize = 0; - // get size to malloc - for (int i = 2; i < argc; i++) - { - // printf("file: %s\n", argv[i]); - // each "station" has a fd (int), filePath (char*), file_buffer_size (int), buffer_size (MAX_STREAM_RATE) - totalSize += sizeof(int) + strlen(argv[i]) + sizeof(int) + MAX_STREAM_RATE; - } - station_data = malloc(totalSize); - if (!station_data) { perror("malloc station data"); return 1; } - // assign the stations - for (int i = 2; i < argc; i++) - { - init_station(i - 2, argv[i]); + // initialize the stations & their threads + if (setup_stations(argc, argv) == -1) { + perror("setup_stations"); + exit(1); } // make array of user data + printf("max active users: %d\n", sizeof(user_t) * max_active_users); user_data = malloc(sizeof(user_t) * max_active_users); if (!user_data) { perror("malloc userdata"); return 1; } sockfd_to_user = malloc(sizeof(int) * max_active_users); if (!sockfd_to_user) { perror("malloc sockfd to user"); return 1; } // make and start "select" thread that manages: - // 1) new connections, 2) requests from current connections, 3)cloing connections - pthread_t s_thread; - pthread_create(&s_thread, NULL, select_thread, NULL); + // 1) new connections, 2) requests from current connections, 3) closing connections + pthread_t select_thread; + pthread_create(&select_thread, NULL, select_routine, NULL); // start syncchronization thread to broadcast stations - pthread_t sync_thread; - pthread_create(&sync_thread, NULL, synchronization_thread, NULL); + // pthread_t sync_thread; + // pthread_create(&sync_thread, NULL, sync_routine, NULL); // command line interface char input[LINE_MAX]; @@ -179,6 +183,110 @@ main(int argc, char *argv[]) return 0; } +int read_file(int fd, char buffer[MAX_RATE_PER_SECOND], int station_num) { + int bytes_read = read(fd, buffer, MAX_RATE_PER_SECOND); + if (bytes_read < 0) { perror("read (from station file)"); return -1; } + // printf("bytes read: %d\n", bytes_read); + if (bytes_read == 0) { + // printf("end of file, restarting\n"); + pthread_t send_announce_thread; + pthread_create(&send_announce_thread, NULL, send_announce_routine, station_num); + + if (lseek(fd, 0, SEEK_SET) == -1) + { + perror("lseek (in resarting file)"); + return -1; + } + bytes_read = read(fd, buffer, MAX_RATE_PER_SECOND); + if (bytes_read < 0) { perror("read (from station file, after restart)"); return -1; } + } + + return bytes_read; +} + +void *stream_routine_cleanup(void *arg) { + int read_fd = (int) arg; + close(read_fd); +} + +void *stream_routine(void *arg) { + int station_num = (int) arg; + printf("stream routine %d\n", station_num); + int read_fd = stations[station_num].readfd; + + pthread_cleanup_push(stream_routine_cleanup, read_fd); + + // make buffer which will be used to stream to children + char buffer[MAX_RATE_PER_SECOND]; + memset(buffer, 0, MAX_RATE_PER_SECOND); + // if (!buffer) { perror("malloc (buffer in station thread)"); exit(1); } + + for (;;) + { + // load bytes into buffer + int bytes_read = read_file(read_fd, buffer, station_num); + if (bytes_read == -1) { exit(1); } + + // TODO: send buffer to children + char *send_buffer = malloc(2 + bytes_read); + for (int i = 0; i < max_active_users; i++) + { + if (!user_data[i].sockfd || user_data[i].sockfd == -1) + continue; + if (user_data[i].stationNum == station_num) + { + // send the udp packet + int *send_buffer = malloc(2 + bytes_read); + memset(send_buffer, 0, 2 + bytes_read); + send_buffer[0] = i; + send_buffer[1] = bytes_read; + memcpy(send_buffer+2, buffer, bytes_read); + // printf("sending udp packet to user %d\n", i); + pthread_t t; + pthread_create(&t, NULL, send_udp_packet_routine, send_buffer); + } + } + free(send_buffer); + usleep(1000000-5000); + start_threads = 1; + pthread_cond_broadcast(&cond); + + usleep(5000); + start_threads = 0; + + memset(buffer, 0, MAX_RATE_PER_SECOND); + } + + return (NULL); + + pthread_cleanup_pop(1); +} + +int setup_stations(int argc, char *argv[]) { + num_stations = argc - 2; + + // get the size to malloc + int totalSize = 0; + for(int i = 2; i < argc; i++) + { + totalSize += sizeof(pthread_t) + sizeof(int) + strlen(argv[i]); + } + + // malloc the stations array + stations = malloc(totalSize); + if (!stations) { perror("malloc (stations pointer)"); return -1; } + // assign the stations, and start the threads + for (int i = 0; i < num_stations; i++) { + stations[i].filePath = argv[i+2]; + stations[i].readfd = open(argv[i+2], O_RDONLY); + if (stations[i].readfd < 0) { perror("read (from station file)"); return -1; } + pthread_create(&stations[i].streamThread, NULL, stream_routine, i); + } + + printf("successfully created %d stations\n", num_stations); + return 1; +} + void write_int_to_fd(int fd, int n) { int l = snprintf(NULL, 0, "%d", n); char *num = malloc(l + 1); @@ -203,7 +311,7 @@ void *print_info_routine(void *arg) { write(print_fd, comma, strlen(comma)); // write file path - char* file_path = station_data[i].filePath; + char* file_path = stations[i].filePath; write(print_fd, file_path, strlen(file_path)); for (int j = 0; j < max_active_users; j++) { @@ -253,8 +361,15 @@ void udp_port_cleanup_handler(void *arg) /* Make the manager routine */ void *send_udp_packet_routine(void *arg) { + printf("send udp packet routine\n"); + int *buf = arg; // unpack args - int user_index = (int) arg; + int user_index = buf[0]; + int buffer_size = buf[1]; + char *file_buffer = malloc(buffer_size); + memcpy(file_buffer, buf+2, buffer_size); + + // printf("udp packet routine, user:%d\n size: %d\n", user_index, buffer_size); // declare vairables to be used int did_work = 1; @@ -299,73 +414,34 @@ void *send_udp_packet_routine(void *arg) { } pthread_cleanup_push(udp_port_cleanup_handler, (void *)udp_sockfd); - while (1) - { // wait for - pthread_mutex_lock(&m); - did_work = 0; - while (!start_threads) - { - pthread_cond_wait(&cond, &m); - } - - int station_num = user_data[user_index].stationNum; - if (station_num == -1) { - did_work = 1; - } - - if (!did_work) { - // sendto a random string of data to the user - // int station_num = user_data[user_index].stationNum; - // char *data = station_data[station_num].filePath; - // printf("load data: thread %d \n", user_index); - - // get file path - // char* file_path = station_data[station_num].filePath; - // // get current seek chunk - // int stream_fd = open(file_path, O_RDONLY); - // if (stream_fd == -1) { - // perror("open"); - // return (NULL); - // } - // int current_chunk = station_data[station_num].seekIndex; - // if (lseek(stream_fd, current_chunk, SEEK_SET) == -1) { - // perror("fseek"); - // return (NULL); - // } - // read 1000 bytes of the file - - // char file_buffer[BYTES_PER_SECOND]; - // int bytes_read = 0; - // if ((bytes_read = read(stream_fd, file_buffer, BYTES_PER_SECOND)) == -1) { - // perror("fread"); - // return (NULL); - // } - // close(stream_fd); - - station_t *station_info = &station_data[station_num]; - int bytes_read = station_info->fileBufferSize; - // potential error here! - // printf("station info - bytes read: %d, station_fd: %d, filePath: %s, buffersize: %d\n", bytes_read, station_info->streamFd, station_info->filePath, station_info->fileBufferSize); - - if (send_all_udp(udp_sockfd, station_info->fileBuffer, &bytes_read, thread_res) == -1) - { - perror("send_all_udp"); - printf("We only sent %d bytes because of the error!\n", bytes_read); - } - did_work = 1; + pthread_mutex_lock(&m); + did_work = 0; + while (!start_threads) + { + pthread_cond_wait(&cond, &m); + } + int station_num = user_data[user_index].stationNum; + if (station_num == -1) { + did_work = 1; + } + // potential error here! + // printf("station info - bytes read: %d, station_fd: %d, filePath: %s, buffersize: %d\n", bytes_read, station_info->streamFd, station_info->filePath, station_info->fileBufferSize); - usleep(400000); - } + if (send_all_udp(udp_sockfd, file_buffer, &buffer_size, thread_res) == -1) + { + perror("send_all_udp"); + printf("We only sent %d bytes because of the error!\n", buffer_size); + } - pthread_mutex_unlock(&m); + free(file_buffer); - usleep(100000); - } + pthread_mutex_unlock(&m); pthread_cleanup_pop(1); - return NULL; + + return (NULL); } void *send_announce_routine(void *arg) { @@ -385,28 +461,21 @@ void *send_announce_routine(void *arg) { } } -void *synchronization_thread(void *arg) { - int c = 0; - while (1) - { - start_threads = 1; - pthread_cond_broadcast(&cond); - usleep(2000); - - start_threads = 0; - size_t BYTES_PER_SECOND = 16*1024; +// void *sync_routine(void *arg) { +// int c = 0; +// while (1) +// { +// start_threads = 1; +// pthread_cond_broadcast(&cond); +// usleep(2000); - // seek each file and read - for (int i = 0; i < num_stations; i++) - { - seek_stations(i); - } +// start_threads = 0; - usleep(1000000-2000); - } -} +// usleep(1000000-2000); +// } +// } -void *select_thread(void *arg) { +void *select_routine(void *arg) { fd_set master; // master file descriptor list fd_set read_fds; // temp file descriptor list for select() int fdmax; // maximum file descriptor number @@ -688,7 +757,7 @@ void *init_user(int sockfd) { } // map TCP sockfd to this user index - user_data[running_index] = (user_t){-1, -1, sockfd, -1}; + user_data[running_index] = (user_t){-1, -1, sockfd}; sockfd_to_user[sockfd] = running_index; // free(user_stream_threads); pthread_mutex_unlock(&mutex_user_data); @@ -700,7 +769,7 @@ void *update_user_udpPort(int sockfd, int udpPort) { // set the udpPort user->udpPort = udpPort; // start the stream thread, now that we have the udpPort - pthread_create(&user->streamThread, NULL, send_udp_packet_routine, (void *)sockfd_to_user[sockfd]); + // pthread_create(&user->streamThread, NULL, send_udp_packet_routine, (void *)sockfd_to_user[sockfd]); pthread_mutex_unlock(&mutex_user_data); } void *update_user_station(int sockfd, int stationNum) { @@ -709,19 +778,19 @@ void *update_user_station(int sockfd, int stationNum) { pthread_mutex_unlock(&mutex_user_data); } void *print_user_data(int index) { - printf("udpPort: %d, stationNum: %d, sockfd: %d, threadId:%d\n", - user_data[index].udpPort, user_data[index].stationNum, user_data[index].sockfd, user_data[index].streamThread); + printf("udpPort: %d, stationNum: %d, sockfd: %d\n", + user_data[index].udpPort, user_data[index].stationNum, user_data[index].sockfd); } void destroy_user(int sockfd) { pthread_mutex_lock(&mutex_user_data); // stop the thread streaming to the user - pthread_t thread = user_data[sockfd_to_user[sockfd]].streamThread; - if (thread != -1) { - pthread_cancel(thread); - } + // pthread_t thread = user_data[sockfd_to_user[sockfd]].streamThread; + // if (thread != -1) { + // pthread_cancel(thread); + // } // "remove" the user from the list of user data - user_data[sockfd_to_user[sockfd]] = (user_t) {-1, -1, -1, -1}; + user_data[sockfd_to_user[sockfd]] = (user_t) {-1, -1, -1}; // map sockfd to -1 sockfd_to_user[sockfd] = -1; @@ -739,7 +808,7 @@ void *get_in_addr(struct sockaddr *sa) } void send_announce_reply(int fd, int station_num) { - char* file_path = station_data[station_num].filePath; + char* file_path = stations[station_num].filePath; int len_file_path = strlen(file_path); char *send_buffer = malloc(len_file_path+2); @@ -779,50 +848,50 @@ void send_invalid_command_reply(int fd, size_t message_size, char* message) { free(send_buffer); } -void init_station(int station_num, const char* station_name) { - station_t *station = &station_data[station_num]; - - // open the file - int stream_fd = open(station_name, O_RDONLY); - if (stream_fd == -1) { - perror("open"); - return; - } - station->streamFd = stream_fd; - station->filePath = station_name; - - // setup file buffer - char stream_buffer[MAX_STREAM_RATE]; - memset(stream_buffer, 0, MAX_STREAM_RATE); - - station->fileBufferSize = MAX_STREAM_RATE; - memcpy(&station->fileBufferSize, stream_buffer, MAX_STREAM_RATE); - - - // load the first buffer into the stations - seek_stations(station_num); -} - -void seek_stations(int station_num) { - station_t *station_info = &station_data[station_num]; - memset(&station_info->fileBuffer, 0, MAX_STREAM_RATE); - int bytes_read = read(station_info->streamFd, &station_info->fileBuffer, MAX_STREAM_RATE); - lseek(station_info->streamFd, -16, SEEK_SET); - // printf("station info - bytes read: %d, station_fd: %d, filePath: %s, buffersize: %d\n", bytes_read, station_info->streamFd, station_info->filePath, station_info->fileBufferSize); - - // time to restart the file - if (bytes_read == 0) { - if (lseek(station_info->streamFd, 0, SEEK_SET) == -1) { - perror("fseek"); - } - pthread_t send_announce_thread; - pthread_create(&send_announce_thread, NULL, send_announce_routine, (void *)station_num); - - // load first chunk - bytes_read = read(station_info->streamFd, &station_info->fileBuffer, MAX_STREAM_RATE); - } - station_info->fileBufferSize = bytes_read; -} +// void init_station(int station_num, const char* station_name) { +// station_t *station = &station_data[station_num]; + +// // open the file +// int stream_fd = open(station_name, O_RDONLY); +// if (stream_fd == -1) { +// perror("open"); +// return; +// } +// station->streamFd = stream_fd; +// station->filePath = station_name; + +// // setup file buffer +// char stream_buffer[MAX_STREAM_RATE]; +// memset(stream_buffer, 0, MAX_STREAM_RATE); + +// station->fileBufferSize = MAX_STREAM_RATE; +// memcpy(&station->fileBufferSize, stream_buffer, MAX_STREAM_RATE); + + +// // load the first buffer into the stations +// seek_stations(station_num); +// } + +// void seek_stations(int station_num) { +// station_t *station_info = &station_data[station_num]; +// memset(&station_info->fileBuffer, 0, MAX_STREAM_RATE); +// int bytes_read = read(station_info->streamFd, &station_info->fileBuffer, MAX_STREAM_RATE); +// lseek(station_info->streamFd, -16, SEEK_SET); +// // printf("station info - bytes read: %d, station_fd: %d, filePath: %s, buffersize: %d\n", bytes_read, station_info->streamFd, station_info->filePath, station_info->fileBufferSize); + +// // time to restart the file +// if (bytes_read == 0) { +// if (lseek(station_info->streamFd, 0, SEEK_SET) == -1) { +// perror("fseek"); +// } +// pthread_t send_announce_thread; +// pthread_create(&send_announce_thread, NULL, send_announce_routine, (void *)station_num); + +// // load first chunk +// bytes_read = read(station_info->streamFd, &station_info->fileBuffer, MAX_STREAM_RATE); +// } +// station_info->fileBufferSize = bytes_read; +// } // Parses a buffer into tokens, from cs33 :) diff --git a/snowcast_server b/snowcast_server Binary files differindex cb074ef..89e4cae 100755 --- a/snowcast_server +++ b/snowcast_server diff --git a/snowcast_server.dSYM/Contents/Resources/DWARF/snowcast_server b/snowcast_server.dSYM/Contents/Resources/DWARF/snowcast_server Binary files differindex 948984a..667c8c1 100644 --- a/snowcast_server.dSYM/Contents/Resources/DWARF/snowcast_server +++ b/snowcast_server.dSYM/Contents/Resources/DWARF/snowcast_server |