aboutsummaryrefslogtreecommitdiff
path: root/snowcast_server_concurrent.c
diff options
context:
space:
mode:
authorsotech117 <michael_foiani@brown.edu>2023-09-18 15:32:05 -0400
committersotech117 <michael_foiani@brown.edu>2023-09-18 15:32:05 -0400
commitbc24590991cb27e8bd220fd6d0585e76f804601d (patch)
tree8a982cf0144dd8366aca84cf8027387403f51af2 /snowcast_server_concurrent.c
parent5236560176cfe8e4d06be4812719037937b7f4dc (diff)
good progress. basic num_station data going & listener udp port works
Diffstat (limited to 'snowcast_server_concurrent.c')
-rw-r--r--snowcast_server_concurrent.c181
1 files changed, 135 insertions, 46 deletions
diff --git a/snowcast_server_concurrent.c b/snowcast_server_concurrent.c
index 903b3fd..e09e398 100644
--- a/snowcast_server_concurrent.c
+++ b/snowcast_server_concurrent.c
@@ -10,9 +10,14 @@
#include "protocol.h"
+#define NUM_STATIONS 2
+#define LINE_MAX 1024
+#define MAX_USERS 1000
+#define MAX_PATH 50
+
typedef struct station {
- char* filePath;
int currentChunk;
+ char* filePath;
} station_t;
typedef struct user {
@@ -22,9 +27,6 @@ typedef struct user {
pthread_t streamThread;
} user_t;
-#define NUM_STATIONS 2
-#define LINE_MAX 1024
-#define MAX_USERS 1000
/* For safe condition variable usage, must use a boolean predicate and */
/* a mutex with the condition. */
@@ -32,18 +34,18 @@ int count = 0;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
+const char *port;
int start_threads = 0;
-
int max_active_users = 0;
pthread_mutex_t mutex_user_data = PTHREAD_MUTEX_INITIALIZER;
// array from index to user_data
user_t *user_data;
-station_t station_data[NUM_STATIONS];
-int sockfd_to_user[MAX_USERS + 4];
+int sockfd_to_user[MAX_USERS];
-char* port = "4950";
+// stations array pointer
+station_t *station_data;
void *send_udp_packet_routine(void* arg);
void *select_thread(void* arg);
@@ -62,27 +64,49 @@ void destroy_user(int sockfd);
main(int argc, char *argv[])
{
- // temporary
- station_data[0] = (station_t) {"mp3/Beethoven-SymphonyNo5.mp3", 0};
- station_data[1] = (station_t) {"mp3/DukeEllington-Caravan.mp3", 0};
-
// threads to control reading files at chunks while the other threads sleep
+ // station_data = malloc(sizeof(station_t) * NUM_STATIONS);
+ // check and assign arguments
+ if (argc < 3) {
+ fprintf(stderr,"usage: ./snowcast_server <listen port> <file0> [file 1] [file 2] ... \n");
+ exit(1);
+ }
+
+ port = argv[1];
+
+ // init stations
+ size_t totalSize = 0;
+ // get size to malloc
+ for (int i = 2; i < argc; i++)
+ {
+ printf("file: %s\n", argv[i]);
+ totalSize += sizeof(int) + strlen(argv[i]);
+ }
+ station_data = malloc(totalSize);
+ // assign the stations
+ for (int i = 2; i < argc; i++)
+ {
+ station_data[i - 2] = (station_t) { 0, argv[i]};
+ }
+
+ // print all indexes in station data
+ for (int i = 0; i < NUM_STATIONS; i++)
+ {
+ printf("station %d: %s\n", i, station_data[i].filePath);
+ }
+ // make array of user data
user_data = malloc(sizeof(user_t) * max_active_users);
if (!user_data) { perror("malloc"); return 1; }
- // thread that manages file descriptors
- pthread_t s_thread, sync_thread;
+ // make and start "select" thread that manages:
+ // 1) new connections, 2) requests from current connections, 3)cloing connections
+ pthread_t s_thread;
pthread_create(&s_thread, NULL, select_thread, NULL);
- // starts the threads after created
- // sleep(1);
- // startThreads = 0;
- // pthread_cond_broadcast(&cond);
-
// command line interface
char input[LINE_MAX];
- while (1) {
+ while (1) {
char *line = fgets(input, LINE_MAX, stdin);
if (line == NULL) {
@@ -98,6 +122,7 @@ main(int argc, char *argv[])
}
} else if (strncmp("s\n", input, LINE_MAX) == 0) {
// start the streaming threads
+ pthread_t sync_thread;
pthread_create(&sync_thread, NULL, synchronization_thread, NULL);
}
}
@@ -107,33 +132,95 @@ main(int argc, char *argv[])
/* Make the manager routine */
void *send_udp_packet_routine(void *arg) {
- pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER;
+ // unpack args
+ int user_index = (int) arg;
+ printf("thread : user_index: %d\n", user_index);
+ // print user data
+ print_user_data(user_index);
+ // declare vairables to be used
int did_work = 1;
+ pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER;
+ int s;
+ int udp_sockfd;
+ struct addrinfo thread_hints, *thread_res, *thread_servinfo;
+ int error_code;
+
+ // TODO: add error checking on these calls***
+
+ // setup hints
+ memset(&thread_hints, 0, sizeof thread_hints);
+ thread_hints.ai_family = AF_INET; // use IPv4 only
+ thread_hints.ai_socktype = SOCK_DGRAM;
+ thread_hints.ai_flags = AI_PASSIVE; // fill in my IP for me
+
+ // setup the socket for client listener DATAGRAM (udp)
+ // cover the port integer to a string
+ int int_port = user_data[user_index].udpPort;
+ int length = snprintf( NULL, 0, "%d", int_port );
+ char* port = malloc( length + 1 );
+ snprintf( port, length + 1, "%d", int_port );
+ sprintf(port, "%d", int_port);
+
+ if (error_code = getaddrinfo(NULL, port, &thread_hints, &thread_servinfo) != 0)
+ {
+ fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(error_code));
+ return 1;
+ }
+ free(port);
- int * i = (int *) arg;
- while (1)
+ // loop through all the results and make a socket
+ for(thread_res = thread_servinfo; thread_res != NULL; thread_res = thread_res->ai_next) {
+ if ((udp_sockfd = socket(thread_res->ai_family, thread_res->ai_socktype,
+ thread_res->ai_protocol)) == -1) {
+ perror("talker: socket");
+ continue;
+ }
+ break;
+ }
+ if (udp_sockfd == NULL) {
+ fprintf(stderr, "talker: failed to create socket\n");
+ return (NULL);
+ }
+
+ // bind(udp_sockfd, thread_res->ai_addr, thread_res->ai_addrlen);
+
+
+ // freeaddrinfo(thread_servinfo);
+
+ while (1) {
+ // wait for
+ pthread_mutex_lock(&m);
+ did_work = 0;
+ while (!start_threads)
{
- // wait for
- pthread_mutex_lock(&m);
- did_work = 0;
- while (!start_threads)
- {
- pthread_cond_wait(&cond, &m);
- }
+ pthread_cond_wait(&cond, &m);
+ }
+ int station_num = user_data[user_index].stationNum;
+ if (station_num == -1) {
+ did_work = 1;
+ }
- if (!did_work) {
- printf("send data: thread %d \n", i);
- printf("load data: thread %d \n", i);
- did_work = 1;
+ if (!did_work) {
+ // sendto a random string of data to the user
+ int station_num = user_data[user_index].stationNum - 1;
+ char *data = station_data[station_num].filePath;
+ printf("load data: thread %d \n", user_index);
+ int numbytes;
+ if ((numbytes = sendto(udp_sockfd, data, strlen(data), 0,
+ thread_res->ai_addr, thread_res->ai_addrlen)) == -1) {
+ perror("talker: sendto");
+ return (NULL);
}
- pthread_mutex_unlock(&m);
+ printf("send data: thread %d \n", user_index);
- usleep(500000);
- // pthread_mutex_lock(&mutex);
- // start_threads = 0;
- // pthread_mutex_unlock(&mutex);
+ did_work = 1;
}
+
+ pthread_mutex_unlock(&m);
+
+ usleep(500000);
+ }
return NULL;
}
@@ -223,7 +310,7 @@ void *select_thread(void *arg) {
// keep track of the biggest file descriptor
fdmax = listener; // so far, it's this one
- while(1==1) {
+ while(1) {
read_fds = master; // copy it
if (select(fdmax+1, &read_fds, NULL, NULL, NULL) == -1) {
perror("select");
@@ -253,7 +340,6 @@ void *select_thread(void *arg) {
get_in_addr((struct sockaddr*)&remoteaddr),
remoteIP, INET6_ADDRSTRLEN),
newfd);
-
// init user with this newfd
init_user(newfd);
@@ -334,17 +420,20 @@ void *init_user(int sockfd) {
if (!more_users) { perror("realloc"); exit(1); }
user_data = more_users;
}
- // map sockfd to this user index & create its stream thread
- pthread_t user_thread;
- pthread_create(&user_thread, NULL, send_udp_packet_routine, (void *)user_index);
- user_data[user_index] = (user_t){-1, -1, sockfd, user_thread};
+ // map TCP sockfd to this user index
+ user_data[user_index] = (user_t){-1, -1, sockfd, -1};
sockfd_to_user[sockfd] = user_index;
// free(user_stream_threads);
pthread_mutex_unlock(&mutex_user_data);
}
void *update_user_udpPort(int sockfd, int udpPort) {
pthread_mutex_lock(&mutex_user_data);
- user_data[sockfd_to_user[sockfd]].udpPort = udpPort;
+ // get the user
+ user_t *user = &user_data[sockfd_to_user[sockfd]];
+ // set the udpPort
+ user->udpPort = udpPort;
+ // start the stream thread, now that we have the udpPort
+ pthread_create(&user->streamThread, NULL, send_udp_packet_routine, (void *)sockfd_to_user[sockfd]);
pthread_mutex_unlock(&mutex_user_data);
}
void *update_user_station(int sockfd, int stationNum) {