aboutsummaryrefslogtreecommitdiff
path: root/server.c
diff options
context:
space:
mode:
authorsotech117 <michael_foiani@brown.edu>2023-09-25 16:17:12 -0400
committersotech117 <michael_foiani@brown.edu>2023-09-25 16:17:12 -0400
commitc534d8e28a00c9762fcb4ef2bdeb9a735ae26b75 (patch)
tree096a80c9e20de1daf4babbf610837de0cefd5297 /server.c
parent13929ac7a2f3d18f1a9d5717e76d0e7725c263c4 (diff)
add comments and clean client
Diffstat (limited to 'server.c')
-rw-r--r--server.c58
1 files changed, 27 insertions, 31 deletions
diff --git a/server.c b/server.c
index bd2f5f4..456ac47 100644
--- a/server.c
+++ b/server.c
@@ -7,24 +7,17 @@
#include <arpa/inet.h>
#include <netdb.h>
#include <string.h>
-
#include <sys/stat.h>
#include <fcntl.h>
#include <sys/types.h>
-
#include "protocol.c"
#define LINE_MAX 1024
#define MAX_USERS 1000
#define MAX_PATH 50
#define MAX_RATE_PER_SECOND 16*1024 / 2
+#define MAX_PACKET_SIZE 512
-// typedef struct station {
-// int streamFd;
-// char* filePath;
-// int fileBufferSize;
-// char fileBuffer[MAX_STREAM_RATE];
-// } station_t;
typedef struct station {
pthread_t streamThread;
int readfd;
@@ -35,7 +28,6 @@ station_t *stations;
int setup_stations(int argc, char *argv[]);
void *stream_routine(void *arg);
-
typedef struct user {
int udpPort;
int stationNum;
@@ -259,24 +251,26 @@ void *stream_routine_cleanup(void *arg) {
}
void *stream_routine(void *arg) {
+ int BROADCASTS_PER_SECOND = 2;
+ int BROADCAST_OFFSET = 10000;
+
int station_num = (int) arg;
// printf("stream routine %d\n", station_num);
int read_fd = stations[station_num].readfd;
pthread_cleanup_push(stream_routine_cleanup, read_fd);
- // make buffer which will be used to stream to children
+ // make buffer for read_file
char buffer[MAX_RATE_PER_SECOND];
- memset(buffer, 0, MAX_RATE_PER_SECOND);
- // if (!buffer) { perror("malloc (buffer in station thread)"); exit(1); }
for (;;)
{
- // load bytes into buffer
+ // load bytes from file into buffer
+ memset(buffer, 0, MAX_RATE_PER_SECOND);
int bytes_read = read_file(read_fd, buffer, station_num);
if (bytes_read == -1) { return (NULL); }
- // TODO: send buffer to children
+ // create the threads to send packets to users, which will be released later
int *send_buffer;
for (int i = 0; i < max_active_users; i++)
{
@@ -284,26 +278,33 @@ void *stream_routine(void *arg) {
continue;
if (user_data[i].stationNum == station_num)
{
- // send the udp packet
+ // prepare the send buffer
+ // (note: using int* for easy pointer assignment)
send_buffer = malloc(2 + bytes_read);
- memset(send_buffer, 0, 2 + bytes_read);
+ if (!send_buffer) { perror("malloc send_buffer in stream_routine"); return (NULL); }
send_buffer[0] = i;
send_buffer[1] = bytes_read;
memcpy(send_buffer+2, buffer, bytes_read);
- // printf("sending udp packet to user %d\n", i);
- pthread_t t;
- pthread_create(&t, NULL, send_udp_packet_routine, send_buffer);
+
+ // make thread
+ pthread_t send_udp_packet_thread;
+ pthread_create(&send_udp_packet_thread, NULL, send_udp_packet_routine, send_buffer);
}
}
- usleep(1000000 / 2 - 5000);
+
+ // wait for the thread to be created
+ usleep(1000000 / BROADCASTS_PER_SECOND - BROADCAST_OFFSET); // do -1000 for the usleep below
+
+ // let the threads run!
start_threads = 1;
pthread_cond_broadcast(&cond);
- usleep(5000);
+ // give some time to broadcast, then reset variables
+ usleep(BROADCAST_OFFSET);
start_threads = 0;
+ // free the buffer after it's been sent
free(send_buffer);
- memset(buffer, 0, MAX_RATE_PER_SECOND);
}
return (NULL);
@@ -382,7 +383,6 @@ void *print_info_routine(void *arg) {
int send_all_udp(int udp_sockfd, char *buf, int *len, struct sockaddr *addr, socklen_t addrlen)
{
- int MAX_PACKET_SIZE = 512;
int total = 0; // how many bytes we've sent
int bytesleft = *len; // how many we have left to send
int n;
@@ -746,12 +746,7 @@ void *init_user_routine(int newfd, int udp_port) {
return (NULL);
}
-// void *update_user_udpPort(int sockfd, int udpPort) {
-// pthread_mutex_lock(&mutex_user_data);
-// pthread_mutex_unlock(&mutex_user_data);
-// return (NULL);
-// }
void *update_user_station(int sockfd, int stationNum) {
pthread_mutex_lock(&mutex_user_data);
user_data[sockfd_to_user[sockfd]].stationNum = stationNum;
@@ -1015,8 +1010,9 @@ void destroy_station(int station_num) {
}
}
+ // cancel the stream's thread and close the read fd
+ pthread_cancel(stations[station_num].streamThread);
close(stations[station_num].readfd);
- // stations[station_num].filePath = NULL;
stations[station_num].readfd = -1;
printf("remove: successfully removed station %d\n", station_num);
@@ -1082,8 +1078,6 @@ void add_station(char *file_path) {
num_stations++;
}
-
-
void cleanup_fds() {
// close all the file descriptors for the stations
for (int i = 0; i < num_stations; i++) {
@@ -1096,4 +1090,6 @@ void cleanup_fds() {
close(user_data[i].sockfd);
}
}
+
+ close(listener);
}