diff options
author | sotech117 <michael_foiani@brown.edu> | 2023-09-25 16:17:12 -0400 |
---|---|---|
committer | sotech117 <michael_foiani@brown.edu> | 2023-09-25 16:17:12 -0400 |
commit | c534d8e28a00c9762fcb4ef2bdeb9a735ae26b75 (patch) | |
tree | 096a80c9e20de1daf4babbf610837de0cefd5297 /server.c | |
parent | 13929ac7a2f3d18f1a9d5717e76d0e7725c263c4 (diff) |
add comments and clean client
Diffstat (limited to 'server.c')
-rw-r--r-- | server.c | 58 |
1 files changed, 27 insertions, 31 deletions
@@ -7,24 +7,17 @@ #include <arpa/inet.h> #include <netdb.h> #include <string.h> - #include <sys/stat.h> #include <fcntl.h> #include <sys/types.h> - #include "protocol.c" #define LINE_MAX 1024 #define MAX_USERS 1000 #define MAX_PATH 50 #define MAX_RATE_PER_SECOND 16*1024 / 2 +#define MAX_PACKET_SIZE 512 -// typedef struct station { -// int streamFd; -// char* filePath; -// int fileBufferSize; -// char fileBuffer[MAX_STREAM_RATE]; -// } station_t; typedef struct station { pthread_t streamThread; int readfd; @@ -35,7 +28,6 @@ station_t *stations; int setup_stations(int argc, char *argv[]); void *stream_routine(void *arg); - typedef struct user { int udpPort; int stationNum; @@ -259,24 +251,26 @@ void *stream_routine_cleanup(void *arg) { } void *stream_routine(void *arg) { + int BROADCASTS_PER_SECOND = 2; + int BROADCAST_OFFSET = 10000; + int station_num = (int) arg; // printf("stream routine %d\n", station_num); int read_fd = stations[station_num].readfd; pthread_cleanup_push(stream_routine_cleanup, read_fd); - // make buffer which will be used to stream to children + // make buffer for read_file char buffer[MAX_RATE_PER_SECOND]; - memset(buffer, 0, MAX_RATE_PER_SECOND); - // if (!buffer) { perror("malloc (buffer in station thread)"); exit(1); } for (;;) { - // load bytes into buffer + // load bytes from file into buffer + memset(buffer, 0, MAX_RATE_PER_SECOND); int bytes_read = read_file(read_fd, buffer, station_num); if (bytes_read == -1) { return (NULL); } - // TODO: send buffer to children + // create the threads to send packets to users, which will be released later int *send_buffer; for (int i = 0; i < max_active_users; i++) { @@ -284,26 +278,33 @@ void *stream_routine(void *arg) { continue; if (user_data[i].stationNum == station_num) { - // send the udp packet + // prepare the send buffer + // (note: using int* for easy pointer assignment) send_buffer = malloc(2 + bytes_read); - memset(send_buffer, 0, 2 + bytes_read); + if (!send_buffer) { perror("malloc send_buffer in stream_routine"); return (NULL); } send_buffer[0] = i; send_buffer[1] = bytes_read; memcpy(send_buffer+2, buffer, bytes_read); - // printf("sending udp packet to user %d\n", i); - pthread_t t; - pthread_create(&t, NULL, send_udp_packet_routine, send_buffer); + + // make thread + pthread_t send_udp_packet_thread; + pthread_create(&send_udp_packet_thread, NULL, send_udp_packet_routine, send_buffer); } } - usleep(1000000 / 2 - 5000); + + // wait for the thread to be created + usleep(1000000 / BROADCASTS_PER_SECOND - BROADCAST_OFFSET); // do -1000 for the usleep below + + // let the threads run! start_threads = 1; pthread_cond_broadcast(&cond); - usleep(5000); + // give some time to broadcast, then reset variables + usleep(BROADCAST_OFFSET); start_threads = 0; + // free the buffer after it's been sent free(send_buffer); - memset(buffer, 0, MAX_RATE_PER_SECOND); } return (NULL); @@ -382,7 +383,6 @@ void *print_info_routine(void *arg) { int send_all_udp(int udp_sockfd, char *buf, int *len, struct sockaddr *addr, socklen_t addrlen) { - int MAX_PACKET_SIZE = 512; int total = 0; // how many bytes we've sent int bytesleft = *len; // how many we have left to send int n; @@ -746,12 +746,7 @@ void *init_user_routine(int newfd, int udp_port) { return (NULL); } -// void *update_user_udpPort(int sockfd, int udpPort) { -// pthread_mutex_lock(&mutex_user_data); -// pthread_mutex_unlock(&mutex_user_data); -// return (NULL); -// } void *update_user_station(int sockfd, int stationNum) { pthread_mutex_lock(&mutex_user_data); user_data[sockfd_to_user[sockfd]].stationNum = stationNum; @@ -1015,8 +1010,9 @@ void destroy_station(int station_num) { } } + // cancel the stream's thread and close the read fd + pthread_cancel(stations[station_num].streamThread); close(stations[station_num].readfd); - // stations[station_num].filePath = NULL; stations[station_num].readfd = -1; printf("remove: successfully removed station %d\n", station_num); @@ -1082,8 +1078,6 @@ void add_station(char *file_path) { num_stations++; } - - void cleanup_fds() { // close all the file descriptors for the stations for (int i = 0; i < num_stations; i++) { @@ -1096,4 +1090,6 @@ void cleanup_fds() { close(user_data[i].sockfd); } } + + close(listener); } |