From 97ff64f344c99f76bbed6b934236ee65eda8a9ef Mon Sep 17 00:00:00 2001 From: sotech117 Date: Thu, 21 Sep 2023 20:42:11 +0000 Subject: change model to seek --- server.c | 134 +++++++++++++-------- snowcast_control | Bin 35613 -> 22384 bytes snowcast_listener | Bin 34654 -> 19120 bytes snowcast_server | Bin 56428 -> 44944 bytes .../Contents/Resources/DWARF/snowcast_server | Bin 29386 -> 27469 bytes 5 files changed, 86 insertions(+), 48 deletions(-) diff --git a/server.c b/server.c index 7246885..333f521 100644 --- a/server.c +++ b/server.c @@ -17,10 +17,13 @@ #define LINE_MAX 1024 #define MAX_USERS 1000 #define MAX_PATH 50 +#define MAX_STREAM_RATE 16*1024 typedef struct station { - int seekIndex; + int streamFd; char* filePath; + int fileBufferSize; + char fileBuffer[MAX_STREAM_RATE]; } station_t; typedef struct user { @@ -58,6 +61,9 @@ void *send_udp_packet_routine(void* arg); void *select_thread(void* arg); void *synchronization_thread(void* arg); +void init_station(int station_num, const char *station_name); +void seek_stations(int station_num); + int parse(char buffer[LINE_MAX], char *tokens[LINE_MAX / 2]); void *print_info_routine(void *arg); @@ -85,20 +91,22 @@ main(int argc, char *argv[]) port = argv[1]; num_stations = argc - 2; + // init stations size_t totalSize = 0; // get size to malloc for (int i = 2; i < argc; i++) { // printf("file: %s\n", argv[i]); - totalSize += sizeof(int) + strlen(argv[i]); + // each "station" has a fd (int), filePath (char*), file_buffer_size (int), buffer_size (MAX_STREAM_RATE) + totalSize += sizeof(int) + strlen(argv[i]) + sizeof(int) + MAX_STREAM_RATE; } station_data = malloc(totalSize); if (!station_data) { perror("malloc station data"); return 1; } // assign the stations for (int i = 2; i < argc; i++) { - station_data[i - 2] = (station_t) { 0, argv[i]}; + init_station(i - 2, argv[i]); } // make array of user data @@ -308,35 +316,39 @@ void *send_udp_packet_routine(void *arg) { if (!did_work) { // sendto a random string of data to the user - int station_num = user_data[user_index].stationNum; - char *data = station_data[station_num].filePath; + // int station_num = user_data[user_index].stationNum; + // char *data = station_data[station_num].filePath; // printf("load data: thread %d \n", user_index); // get file path - char* file_path = station_data[station_num].filePath; - // get current seek chunk - int stream_fd = open(file_path, O_RDONLY); - if (stream_fd == -1) { - perror("open"); - return (NULL); - } - int current_chunk = station_data[station_num].seekIndex; - if (lseek(stream_fd, current_chunk, SEEK_SET) == -1) { - perror("fseek"); - return (NULL); - } - size_t BYTES_PER_SECOND = 16*1024; + // char* file_path = station_data[station_num].filePath; + // // get current seek chunk + // int stream_fd = open(file_path, O_RDONLY); + // if (stream_fd == -1) { + // perror("open"); + // return (NULL); + // } + // int current_chunk = station_data[station_num].seekIndex; + // if (lseek(stream_fd, current_chunk, SEEK_SET) == -1) { + // perror("fseek"); + // return (NULL); + // } // read 1000 bytes of the file - char file_buffer[BYTES_PER_SECOND]; - int bytes_read = 0; - if ((bytes_read = read(stream_fd, file_buffer, BYTES_PER_SECOND)) == -1) { - perror("fread"); - return (NULL); - } - close(stream_fd); + // char file_buffer[BYTES_PER_SECOND]; + // int bytes_read = 0; + // if ((bytes_read = read(stream_fd, file_buffer, BYTES_PER_SECOND)) == -1) { + // perror("fread"); + // return (NULL); + // } + // close(stream_fd); + + station_t *station_info = &station_data[station_num]; + int bytes_read = station_info->fileBufferSize; + // potential error here! + // printf("station info - bytes read: %d, station_fd: %d, filePath: %s, buffersize: %d\n", bytes_read, station_info->streamFd, station_info->filePath, station_info->fileBufferSize); - if (send_all_udp(udp_sockfd, file_buffer, &bytes_read, thread_res) == -1) + if (send_all_udp(udp_sockfd, station_info->fileBuffer, &bytes_read, thread_res) == -1) { perror("send_all_udp"); printf("We only sent %d bytes because of the error!\n", bytes_read); @@ -385,31 +397,13 @@ void *synchronization_thread(void *arg) { start_threads = 0; size_t BYTES_PER_SECOND = 16*1024; + // seek each file and read for (int i = 0; i < num_stations; i++) { - // get size of file - struct stat f_info; - if (stat(station_data[i].filePath, &f_info) == -1) { - perror("fstat"); - return (NULL); - } - - size_t size = f_info.st_size; - - station_data[i].seekIndex += BYTES_PER_SECOND; - // if the seek index is greater than the size of the file, reset it - if (station_data[i].seekIndex >= size) - { - station_data[i].seekIndex = 0; - - pthread_t send_announce_thread; - pthread_create(&send_announce_thread, NULL, send_announce_routine, (void *)i); - } + seek_stations(i); } - - usleep(2000); - usleep(1000000-4000); + usleep(1000000-2000); } } @@ -769,7 +763,7 @@ void send_announce_reply(int fd, int station_num) { void send_invalid_command_reply(int fd, size_t message_size, char* message) { char *send_buffer = malloc(message_size+2); if (!send_buffer) { - perror("malloc in send ancalid command"); + perror("malloc in send invalid command"); return; } @@ -786,6 +780,50 @@ void send_invalid_command_reply(int fd, size_t message_size, char* message) { free(send_buffer); } +void init_station(int station_num, const char* station_name) { + station_t *station = &station_data[station_num]; + + // open the file + int stream_fd = open(station_name, O_RDONLY); + if (stream_fd == -1) { + perror("open"); + return; + } + station->streamFd = stream_fd; + station->filePath = station_name; + + // setup file buffer + char stream_buffer[MAX_STREAM_RATE]; + memset(stream_buffer, 0, MAX_STREAM_RATE); + + station->fileBufferSize = MAX_STREAM_RATE; + memcpy(&station->fileBufferSize, stream_buffer, MAX_STREAM_RATE); + + + // load the first buffer into the stations + seek_stations(station_num); +} + +void seek_stations(int station_num) { + station_t *station_info = &station_data[station_num]; + memset(&station_info->fileBuffer, 0, MAX_STREAM_RATE); + int bytes_read = read(station_info->streamFd, &station_info->fileBuffer, MAX_STREAM_RATE); + // printf("station info - bytes read: %d, station_fd: %d, filePath: %s, buffersize: %d\n", bytes_read, station_info->streamFd, station_info->filePath, station_info->fileBufferSize); + + // time to restart the file + if (bytes_read == 0) { + if (lseek(station_info->streamFd, 0, SEEK_SET) == -1) { + perror("fseek"); + } + pthread_t send_announce_thread; + pthread_create(&send_announce_thread, NULL, send_announce_routine, (void *)station_num); + + // load first chunk + bytes_read = read(station_info->streamFd, &station_info->fileBuffer, MAX_STREAM_RATE); + } + station_info->fileBufferSize = bytes_read; +} + // Parses a buffer into tokens, from cs33 :) int parse(char buffer[LINE_MAX], char *tokens[LINE_MAX / 2]) { diff --git a/snowcast_control b/snowcast_control index c1feb84..413a63b 100755 Binary files a/snowcast_control and b/snowcast_control differ diff --git a/snowcast_listener b/snowcast_listener index 5d30ec3..3e47e13 100755 Binary files a/snowcast_listener and b/snowcast_listener differ diff --git a/snowcast_server b/snowcast_server index c296476..f549d85 100755 Binary files a/snowcast_server and b/snowcast_server differ diff --git a/snowcast_server.dSYM/Contents/Resources/DWARF/snowcast_server b/snowcast_server.dSYM/Contents/Resources/DWARF/snowcast_server index 0bc1a97..d52de8f 100644 Binary files a/snowcast_server.dSYM/Contents/Resources/DWARF/snowcast_server and b/snowcast_server.dSYM/Contents/Resources/DWARF/snowcast_server differ -- cgit v1.2.3-70-g09d2