aboutsummaryrefslogtreecommitdiff
path: root/snowcast_server_concurrent.c
diff options
context:
space:
mode:
Diffstat (limited to 'snowcast_server_concurrent.c')
-rw-r--r--snowcast_server_concurrent.c121
1 files changed, 107 insertions, 14 deletions
diff --git a/snowcast_server_concurrent.c b/snowcast_server_concurrent.c
index 03d6414..3b71156 100644
--- a/snowcast_server_concurrent.c
+++ b/snowcast_server_concurrent.c
@@ -10,13 +10,12 @@
#include "protocol.h"
-#define NUM_STATIONS 2
#define LINE_MAX 1024
#define MAX_USERS 1000
#define MAX_PATH 50
typedef struct station {
- int currentChunk;
+ int seekIndex;
char* filePath;
} station_t;
@@ -34,7 +33,10 @@ int count = 0;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
+pthread_mutex_t station_mutex = PTHREAD_MUTEX_INITIALIZER;
+
const char *port;
+int num_stations;
int start_threads = 0;
int max_active_users = 0;
@@ -75,6 +77,10 @@ int main(int argc, char *argv[])
}
port = argv[1];
+ num_stations = argc - 2;
+
+ printf("port: %s\n", port);
+ printf("num_stations: %d\n", num_stations);
// init stations
size_t totalSize = 0;
@@ -92,7 +98,7 @@ int main(int argc, char *argv[])
}
// print all indexes in station data
- for (int i = 0; i < NUM_STATIONS; i++)
+ for (int i = 0; i < num_stations; i++)
{
printf("station %d: %s\n", i, station_data[i].filePath);
}
@@ -132,6 +138,27 @@ int main(int argc, char *argv[])
return 0;
}
+int sendall(int udp_sockfd, char *buf, int *len, struct addrinfo *thread_res)
+{
+ int MAX_PACKET_SIZE = 512;
+ int total = 0; // how many bytes we've sent
+ int bytesleft = *len; // how many we have left to send
+ int n;
+
+ while(total < *len) {
+ n = sendto(udp_sockfd, buf+total, MAX_PACKET_SIZE, 0, thread_res->ai_addr, thread_res->ai_addrlen);
+ // thread_res->ai_addr, thread_res->ai_addrlen)) == -1;
+ if (n == -1) { break; }
+ total += n;
+ bytesleft -= n;
+ }
+
+ *len = total; // return number actually sent here
+
+ return n==-1?-1:0; // return -1 on failure, 0 on success
+}
+
+
/* Make the manager routine */
void *send_udp_packet_routine(void *arg) {
// unpack args
@@ -198,6 +225,7 @@ void *send_udp_packet_routine(void *arg) {
{
pthread_cond_wait(&cond, &m);
}
+
int station_num = user_data[user_index].stationNum;
if (station_num == -1) {
did_work = 1;
@@ -207,21 +235,52 @@ void *send_udp_packet_routine(void *arg) {
// sendto a random string of data to the user
int station_num = user_data[user_index].stationNum;
char *data = station_data[station_num].filePath;
- printf("load data: thread %d \n", user_index);
- int numbytes;
- if ((numbytes = sendto(udp_sockfd, data, strlen(data), 0,
- thread_res->ai_addr, thread_res->ai_addrlen)) == -1) {
- perror("talker: sendto");
+ // printf("load data: thread %d \n", user_index);
+
+ // get file path
+ char* file_path = station_data[station_num].filePath;
+ // get current seek chunk
+ int current_chunk = station_data[station_num].seekIndex;
+ FILE* file_stream = fopen(file_path, "r");
+ if (fseek(file_stream, current_chunk, SEEK_SET) == -1) {
+ perror("fseek");
return (NULL);
}
- printf("send data: thread %d \n", user_index);
+ size_t BYTES_PER_SECOND = 16*1024;
+ // read 1000 bytes of the file
+ char file_buffer[BYTES_PER_SECOND];
+ if (fread(file_buffer, BYTES_PER_SECOND, 1, file_stream) == -1) {
+ perror("fread");
+ return (NULL);
+ }
+ // printf("send data: thread %d \n", user_index);
+ // int numbytes;
+ // if ((numbytes = sendto(udp_sockfd, data, strlen(data), 0,
+ // thread_res->ai_addr, thread_res->ai_addrlen)) == -1) {
+ // perror("talker: sendto");
+ // return (NULL);
+ // }
+ // print the size of the file_buffer
+ // printf("size of file_buffer: %lu\n", sizeof(file_buffer));
+
+ int bytes_sent = sizeof(file_buffer);
+ if (sendall(udp_sockfd, file_buffer, &bytes_sent, thread_res) == -1)
+ {
+ perror("sendall");
+ printf("We only sent %d bytes because of the error!\n", bytes_sent);
+ }
+ // printf("We sent all %d bytes!\n", bytes_sent);
did_work = 1;
+
+ close(file_stream);
+
+ usleep(400000);
}
pthread_mutex_unlock(&m);
- usleep(500000);
+ usleep(100000);
}
return NULL;
}
@@ -231,11 +290,39 @@ void *synchronization_thread(void *arg) {
while (1)
{
start_threads = 1;
- printf("\nbroadcast %d\n", c++);
+ // printf("\nbroadcast %d\n", c++);
pthread_cond_broadcast(&cond);
usleep(2000);
start_threads = 0;
- usleep(1000000-2000);
+ // printf("before loop");
+ // update file seek index for each station
+ size_t BYTES_PER_SECOND = 16*1024;
+ // print num_stations
+ // printf("num_stations: %d\n", num_stations);
+ for (int i = 0; i < num_stations; i++)
+ {
+ // printf("checking station %d\n", i);
+ // get size of file
+ FILE* fp = fopen(station_data[i].filePath, "r");
+ fseek(fp, 0L, SEEK_END);
+ size_t size = ftell(fp);
+ if (size == -1) {
+ perror("ftell");
+ return (NULL);
+ }
+ station_data[i].seekIndex += BYTES_PER_SECOND;
+ // if the seek index is greater than the size of the file, reset it
+ if (station_data[i].seekIndex >= size)
+ {
+ // printf("resetting seek index for station %d\n", i);
+ station_data[i].seekIndex = 0;
+ }
+ fclose(fp);
+ }
+
+
+ usleep(2000);
+ usleep(1000000-4000);
}
}
@@ -348,9 +435,15 @@ void *select_thread(void *arg) {
// send the welcome message to client
struct Welcome welcome;
welcome.replyType = 2;
- welcome.numStations = htons(NUM_STATIONS);
- if ((send(newfd, &welcome, sizeof(struct Welcome), 0)) == -1)
+ welcome.numStations = htons(num_stations);
+ int numbytes;
+ if ((numbytes=send(newfd, &welcome, sizeof(struct Welcome), 0)) == -1)
perror("send");
+
+ //print the num bytes
+ // print the size of the struct welcome
+ printf("size of welcome struct: %lu\n", sizeof(struct Welcome));
+ printf("sent %d bytes\n", numbytes);
}
} else {
// handle data from a client