aboutsummaryrefslogtreecommitdiff
path: root/server.c
diff options
context:
space:
mode:
authorsotech117 <michael_foiani@brown.edu>2023-09-23 07:08:28 -0400
committersotech117 <michael_foiani@brown.edu>2023-09-23 07:08:28 -0400
commitd93da5af53d6beb9a2339839aa47fbbbbeafc208 (patch)
tree7f9d2013845137979e0805b96585ae54f9cdb47f /server.c
parent2707112ffc0b0eed6af8271d32f94e1622203a80 (diff)
bitrate still low, but pass the no read from file test
Diffstat (limited to 'server.c')
-rw-r--r--server.c407
1 files changed, 238 insertions, 169 deletions
diff --git a/server.c b/server.c
index 7df1f12..324527e 100644
--- a/server.c
+++ b/server.c
@@ -17,20 +17,29 @@
#define LINE_MAX 1024
#define MAX_USERS 1000
#define MAX_PATH 50
-#define MAX_STREAM_RATE 16*1024
-
+#define MAX_RATE_PER_SECOND 16*1024
+
+// typedef struct station {
+// int streamFd;
+// char* filePath;
+// int fileBufferSize;
+// char fileBuffer[MAX_STREAM_RATE];
+// } station_t;
typedef struct station {
- int streamFd;
- char* filePath;
- int fileBufferSize;
- char fileBuffer[MAX_STREAM_RATE];
+ pthread_t streamThread;
+ int readfd;
+ char *filePath;
} station_t;
+int num_stations;
+station_t *stations;
+int setup_stations(int argc, char *argv[]);
+void *stream_routine(void *arg);
+
typedef struct user {
int udpPort;
int stationNum;
int sockfd;
- pthread_t streamThread;
} user_t;
@@ -43,7 +52,7 @@ pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t station_mutex = PTHREAD_MUTEX_INITIALIZER;
const char *port;
-int num_stations;
+// int num_stations;
int start_threads = 0;
int max_active_users = 0;
@@ -55,14 +64,20 @@ user_t *user_data;
int *sockfd_to_user;
// stations array pointer
-station_t *station_data;
+// station_t *station_data;
+
+struct udp_packet_routine_args {
+ int user_index;
+ int buffer_size;
+ char *file_buffer;
+};
void *send_udp_packet_routine(void* arg);
-void *select_thread(void* arg);
-void *synchronization_thread(void* arg);
+void *select_routine(void* arg);
+void *sync_routine(void* arg);
+void *send_announce_routine(void* arg);
void init_station(int station_num, const char *station_name);
-void seek_stations(int station_num);
int parse(char buffer[LINE_MAX], char *tokens[LINE_MAX / 2]);
void *print_info_routine(void *arg);
@@ -88,41 +103,30 @@ main(int argc, char *argv[])
exit(1);
}
+ // initizlize the port
port = argv[1];
- num_stations = argc - 2;
-
- // init stations
- size_t totalSize = 0;
- // get size to malloc
- for (int i = 2; i < argc; i++)
- {
- // printf("file: %s\n", argv[i]);
- // each "station" has a fd (int), filePath (char*), file_buffer_size (int), buffer_size (MAX_STREAM_RATE)
- totalSize += sizeof(int) + strlen(argv[i]) + sizeof(int) + MAX_STREAM_RATE;
- }
- station_data = malloc(totalSize);
- if (!station_data) { perror("malloc station data"); return 1; }
- // assign the stations
- for (int i = 2; i < argc; i++)
- {
- init_station(i - 2, argv[i]);
+ // initialize the stations & their threads
+ if (setup_stations(argc, argv) == -1) {
+ perror("setup_stations");
+ exit(1);
}
// make array of user data
+ printf("max active users: %d\n", sizeof(user_t) * max_active_users);
user_data = malloc(sizeof(user_t) * max_active_users);
if (!user_data) { perror("malloc userdata"); return 1; }
sockfd_to_user = malloc(sizeof(int) * max_active_users);
if (!sockfd_to_user) { perror("malloc sockfd to user"); return 1; }
// make and start "select" thread that manages:
- // 1) new connections, 2) requests from current connections, 3)cloing connections
- pthread_t s_thread;
- pthread_create(&s_thread, NULL, select_thread, NULL);
+ // 1) new connections, 2) requests from current connections, 3) closing connections
+ pthread_t select_thread;
+ pthread_create(&select_thread, NULL, select_routine, NULL);
// start syncchronization thread to broadcast stations
- pthread_t sync_thread;
- pthread_create(&sync_thread, NULL, synchronization_thread, NULL);
+ // pthread_t sync_thread;
+ // pthread_create(&sync_thread, NULL, sync_routine, NULL);
// command line interface
char input[LINE_MAX];
@@ -179,6 +183,110 @@ main(int argc, char *argv[])
return 0;
}
+int read_file(int fd, char buffer[MAX_RATE_PER_SECOND], int station_num) {
+ int bytes_read = read(fd, buffer, MAX_RATE_PER_SECOND);
+ if (bytes_read < 0) { perror("read (from station file)"); return -1; }
+ // printf("bytes read: %d\n", bytes_read);
+ if (bytes_read == 0) {
+ // printf("end of file, restarting\n");
+ pthread_t send_announce_thread;
+ pthread_create(&send_announce_thread, NULL, send_announce_routine, station_num);
+
+ if (lseek(fd, 0, SEEK_SET) == -1)
+ {
+ perror("lseek (in resarting file)");
+ return -1;
+ }
+ bytes_read = read(fd, buffer, MAX_RATE_PER_SECOND);
+ if (bytes_read < 0) { perror("read (from station file, after restart)"); return -1; }
+ }
+
+ return bytes_read;
+}
+
+void *stream_routine_cleanup(void *arg) {
+ int read_fd = (int) arg;
+ close(read_fd);
+}
+
+void *stream_routine(void *arg) {
+ int station_num = (int) arg;
+ printf("stream routine %d\n", station_num);
+ int read_fd = stations[station_num].readfd;
+
+ pthread_cleanup_push(stream_routine_cleanup, read_fd);
+
+ // make buffer which will be used to stream to children
+ char buffer[MAX_RATE_PER_SECOND];
+ memset(buffer, 0, MAX_RATE_PER_SECOND);
+ // if (!buffer) { perror("malloc (buffer in station thread)"); exit(1); }
+
+ for (;;)
+ {
+ // load bytes into buffer
+ int bytes_read = read_file(read_fd, buffer, station_num);
+ if (bytes_read == -1) { exit(1); }
+
+ // TODO: send buffer to children
+ char *send_buffer = malloc(2 + bytes_read);
+ for (int i = 0; i < max_active_users; i++)
+ {
+ if (!user_data[i].sockfd || user_data[i].sockfd == -1)
+ continue;
+ if (user_data[i].stationNum == station_num)
+ {
+ // send the udp packet
+ int *send_buffer = malloc(2 + bytes_read);
+ memset(send_buffer, 0, 2 + bytes_read);
+ send_buffer[0] = i;
+ send_buffer[1] = bytes_read;
+ memcpy(send_buffer+2, buffer, bytes_read);
+ // printf("sending udp packet to user %d\n", i);
+ pthread_t t;
+ pthread_create(&t, NULL, send_udp_packet_routine, send_buffer);
+ }
+ }
+ free(send_buffer);
+ usleep(1000000-5000);
+ start_threads = 1;
+ pthread_cond_broadcast(&cond);
+
+ usleep(5000);
+ start_threads = 0;
+
+ memset(buffer, 0, MAX_RATE_PER_SECOND);
+ }
+
+ return (NULL);
+
+ pthread_cleanup_pop(1);
+}
+
+int setup_stations(int argc, char *argv[]) {
+ num_stations = argc - 2;
+
+ // get the size to malloc
+ int totalSize = 0;
+ for(int i = 2; i < argc; i++)
+ {
+ totalSize += sizeof(pthread_t) + sizeof(int) + strlen(argv[i]);
+ }
+
+ // malloc the stations array
+ stations = malloc(totalSize);
+ if (!stations) { perror("malloc (stations pointer)"); return -1; }
+ // assign the stations, and start the threads
+ for (int i = 0; i < num_stations; i++) {
+ stations[i].filePath = argv[i+2];
+ stations[i].readfd = open(argv[i+2], O_RDONLY);
+ if (stations[i].readfd < 0) { perror("read (from station file)"); return -1; }
+ pthread_create(&stations[i].streamThread, NULL, stream_routine, i);
+ }
+
+ printf("successfully created %d stations\n", num_stations);
+ return 1;
+}
+
void write_int_to_fd(int fd, int n) {
int l = snprintf(NULL, 0, "%d", n);
char *num = malloc(l + 1);
@@ -203,7 +311,7 @@ void *print_info_routine(void *arg) {
write(print_fd, comma, strlen(comma));
// write file path
- char* file_path = station_data[i].filePath;
+ char* file_path = stations[i].filePath;
write(print_fd, file_path, strlen(file_path));
for (int j = 0; j < max_active_users; j++) {
@@ -253,8 +361,15 @@ void udp_port_cleanup_handler(void *arg)
/* Make the manager routine */
void *send_udp_packet_routine(void *arg) {
+ printf("send udp packet routine\n");
+ int *buf = arg;
// unpack args
- int user_index = (int) arg;
+ int user_index = buf[0];
+ int buffer_size = buf[1];
+ char *file_buffer = malloc(buffer_size);
+ memcpy(file_buffer, buf+2, buffer_size);
+
+ // printf("udp packet routine, user:%d\n size: %d\n", user_index, buffer_size);
// declare vairables to be used
int did_work = 1;
@@ -299,73 +414,34 @@ void *send_udp_packet_routine(void *arg) {
}
pthread_cleanup_push(udp_port_cleanup_handler, (void *)udp_sockfd);
- while (1)
- {
// wait for
- pthread_mutex_lock(&m);
- did_work = 0;
- while (!start_threads)
- {
- pthread_cond_wait(&cond, &m);
- }
-
- int station_num = user_data[user_index].stationNum;
- if (station_num == -1) {
- did_work = 1;
- }
-
- if (!did_work) {
- // sendto a random string of data to the user
- // int station_num = user_data[user_index].stationNum;
- // char *data = station_data[station_num].filePath;
- // printf("load data: thread %d \n", user_index);
-
- // get file path
- // char* file_path = station_data[station_num].filePath;
- // // get current seek chunk
- // int stream_fd = open(file_path, O_RDONLY);
- // if (stream_fd == -1) {
- // perror("open");
- // return (NULL);
- // }
- // int current_chunk = station_data[station_num].seekIndex;
- // if (lseek(stream_fd, current_chunk, SEEK_SET) == -1) {
- // perror("fseek");
- // return (NULL);
- // }
- // read 1000 bytes of the file
-
- // char file_buffer[BYTES_PER_SECOND];
- // int bytes_read = 0;
- // if ((bytes_read = read(stream_fd, file_buffer, BYTES_PER_SECOND)) == -1) {
- // perror("fread");
- // return (NULL);
- // }
- // close(stream_fd);
-
- station_t *station_info = &station_data[station_num];
- int bytes_read = station_info->fileBufferSize;
- // potential error here!
- // printf("station info - bytes read: %d, station_fd: %d, filePath: %s, buffersize: %d\n", bytes_read, station_info->streamFd, station_info->filePath, station_info->fileBufferSize);
-
- if (send_all_udp(udp_sockfd, station_info->fileBuffer, &bytes_read, thread_res) == -1)
- {
- perror("send_all_udp");
- printf("We only sent %d bytes because of the error!\n", bytes_read);
- }
- did_work = 1;
+ pthread_mutex_lock(&m);
+ did_work = 0;
+ while (!start_threads)
+ {
+ pthread_cond_wait(&cond, &m);
+ }
+ int station_num = user_data[user_index].stationNum;
+ if (station_num == -1) {
+ did_work = 1;
+ }
+ // potential error here!
+ // printf("station info - bytes read: %d, station_fd: %d, filePath: %s, buffersize: %d\n", bytes_read, station_info->streamFd, station_info->filePath, station_info->fileBufferSize);
- usleep(400000);
- }
+ if (send_all_udp(udp_sockfd, file_buffer, &buffer_size, thread_res) == -1)
+ {
+ perror("send_all_udp");
+ printf("We only sent %d bytes because of the error!\n", buffer_size);
+ }
- pthread_mutex_unlock(&m);
+ free(file_buffer);
- usleep(100000);
- }
+ pthread_mutex_unlock(&m);
pthread_cleanup_pop(1);
- return NULL;
+
+ return (NULL);
}
void *send_announce_routine(void *arg) {
@@ -385,28 +461,21 @@ void *send_announce_routine(void *arg) {
}
}
-void *synchronization_thread(void *arg) {
- int c = 0;
- while (1)
- {
- start_threads = 1;
- pthread_cond_broadcast(&cond);
- usleep(2000);
-
- start_threads = 0;
- size_t BYTES_PER_SECOND = 16*1024;
+// void *sync_routine(void *arg) {
+// int c = 0;
+// while (1)
+// {
+// start_threads = 1;
+// pthread_cond_broadcast(&cond);
+// usleep(2000);
- // seek each file and read
- for (int i = 0; i < num_stations; i++)
- {
- seek_stations(i);
- }
+// start_threads = 0;
- usleep(1000000-2000);
- }
-}
+// usleep(1000000-2000);
+// }
+// }
-void *select_thread(void *arg) {
+void *select_routine(void *arg) {
fd_set master; // master file descriptor list
fd_set read_fds; // temp file descriptor list for select()
int fdmax; // maximum file descriptor number
@@ -688,7 +757,7 @@ void *init_user(int sockfd) {
}
// map TCP sockfd to this user index
- user_data[running_index] = (user_t){-1, -1, sockfd, -1};
+ user_data[running_index] = (user_t){-1, -1, sockfd};
sockfd_to_user[sockfd] = running_index;
// free(user_stream_threads);
pthread_mutex_unlock(&mutex_user_data);
@@ -700,7 +769,7 @@ void *update_user_udpPort(int sockfd, int udpPort) {
// set the udpPort
user->udpPort = udpPort;
// start the stream thread, now that we have the udpPort
- pthread_create(&user->streamThread, NULL, send_udp_packet_routine, (void *)sockfd_to_user[sockfd]);
+ // pthread_create(&user->streamThread, NULL, send_udp_packet_routine, (void *)sockfd_to_user[sockfd]);
pthread_mutex_unlock(&mutex_user_data);
}
void *update_user_station(int sockfd, int stationNum) {
@@ -709,19 +778,19 @@ void *update_user_station(int sockfd, int stationNum) {
pthread_mutex_unlock(&mutex_user_data);
}
void *print_user_data(int index) {
- printf("udpPort: %d, stationNum: %d, sockfd: %d, threadId:%d\n",
- user_data[index].udpPort, user_data[index].stationNum, user_data[index].sockfd, user_data[index].streamThread);
+ printf("udpPort: %d, stationNum: %d, sockfd: %d\n",
+ user_data[index].udpPort, user_data[index].stationNum, user_data[index].sockfd);
}
void destroy_user(int sockfd) {
pthread_mutex_lock(&mutex_user_data);
// stop the thread streaming to the user
- pthread_t thread = user_data[sockfd_to_user[sockfd]].streamThread;
- if (thread != -1) {
- pthread_cancel(thread);
- }
+ // pthread_t thread = user_data[sockfd_to_user[sockfd]].streamThread;
+ // if (thread != -1) {
+ // pthread_cancel(thread);
+ // }
// "remove" the user from the list of user data
- user_data[sockfd_to_user[sockfd]] = (user_t) {-1, -1, -1, -1};
+ user_data[sockfd_to_user[sockfd]] = (user_t) {-1, -1, -1};
// map sockfd to -1
sockfd_to_user[sockfd] = -1;
@@ -739,7 +808,7 @@ void *get_in_addr(struct sockaddr *sa)
}
void send_announce_reply(int fd, int station_num) {
- char* file_path = station_data[station_num].filePath;
+ char* file_path = stations[station_num].filePath;
int len_file_path = strlen(file_path);
char *send_buffer = malloc(len_file_path+2);
@@ -779,50 +848,50 @@ void send_invalid_command_reply(int fd, size_t message_size, char* message) {
free(send_buffer);
}
-void init_station(int station_num, const char* station_name) {
- station_t *station = &station_data[station_num];
-
- // open the file
- int stream_fd = open(station_name, O_RDONLY);
- if (stream_fd == -1) {
- perror("open");
- return;
- }
- station->streamFd = stream_fd;
- station->filePath = station_name;
-
- // setup file buffer
- char stream_buffer[MAX_STREAM_RATE];
- memset(stream_buffer, 0, MAX_STREAM_RATE);
-
- station->fileBufferSize = MAX_STREAM_RATE;
- memcpy(&station->fileBufferSize, stream_buffer, MAX_STREAM_RATE);
-
-
- // load the first buffer into the stations
- seek_stations(station_num);
-}
-
-void seek_stations(int station_num) {
- station_t *station_info = &station_data[station_num];
- memset(&station_info->fileBuffer, 0, MAX_STREAM_RATE);
- int bytes_read = read(station_info->streamFd, &station_info->fileBuffer, MAX_STREAM_RATE);
- lseek(station_info->streamFd, -16, SEEK_SET);
- // printf("station info - bytes read: %d, station_fd: %d, filePath: %s, buffersize: %d\n", bytes_read, station_info->streamFd, station_info->filePath, station_info->fileBufferSize);
-
- // time to restart the file
- if (bytes_read == 0) {
- if (lseek(station_info->streamFd, 0, SEEK_SET) == -1) {
- perror("fseek");
- }
- pthread_t send_announce_thread;
- pthread_create(&send_announce_thread, NULL, send_announce_routine, (void *)station_num);
-
- // load first chunk
- bytes_read = read(station_info->streamFd, &station_info->fileBuffer, MAX_STREAM_RATE);
- }
- station_info->fileBufferSize = bytes_read;
-}
+// void init_station(int station_num, const char* station_name) {
+// station_t *station = &station_data[station_num];
+
+// // open the file
+// int stream_fd = open(station_name, O_RDONLY);
+// if (stream_fd == -1) {
+// perror("open");
+// return;
+// }
+// station->streamFd = stream_fd;
+// station->filePath = station_name;
+
+// // setup file buffer
+// char stream_buffer[MAX_STREAM_RATE];
+// memset(stream_buffer, 0, MAX_STREAM_RATE);
+
+// station->fileBufferSize = MAX_STREAM_RATE;
+// memcpy(&station->fileBufferSize, stream_buffer, MAX_STREAM_RATE);
+
+
+// // load the first buffer into the stations
+// seek_stations(station_num);
+// }
+
+// void seek_stations(int station_num) {
+// station_t *station_info = &station_data[station_num];
+// memset(&station_info->fileBuffer, 0, MAX_STREAM_RATE);
+// int bytes_read = read(station_info->streamFd, &station_info->fileBuffer, MAX_STREAM_RATE);
+// lseek(station_info->streamFd, -16, SEEK_SET);
+// // printf("station info - bytes read: %d, station_fd: %d, filePath: %s, buffersize: %d\n", bytes_read, station_info->streamFd, station_info->filePath, station_info->fileBufferSize);
+
+// // time to restart the file
+// if (bytes_read == 0) {
+// if (lseek(station_info->streamFd, 0, SEEK_SET) == -1) {
+// perror("fseek");
+// }
+// pthread_t send_announce_thread;
+// pthread_create(&send_announce_thread, NULL, send_announce_routine, (void *)station_num);
+
+// // load first chunk
+// bytes_read = read(station_info->streamFd, &station_info->fileBuffer, MAX_STREAM_RATE);
+// }
+// station_info->fileBufferSize = bytes_read;
+// }
// Parses a buffer into tokens, from cs33 :)