From 6ed8bef65e54efdb98841ef9b6eb3ea3c9be82d5 Mon Sep 17 00:00:00 2001 From: sotech117 Date: Mon, 18 Sep 2023 02:34:12 -0400 Subject: big progress on streaming in sync and controlling multiple users --- Makefile | 5 +- client.c | 5 +- htable.c | 78 ++++++++ htable.h | 45 +++++ list.h | 137 +++++++++++++ protocol.h | 2 +- snowcast_control | Bin 34461 -> 34461 bytes snowcast_server | Bin 34476 -> 34636 bytes snowcast_server_concurrent.c | 457 +++++++++++++++++++++++++++++++++++++++++++ test | Bin 0 -> 52849 bytes 10 files changed, 725 insertions(+), 4 deletions(-) create mode 100644 htable.c create mode 100644 htable.h create mode 100644 list.h create mode 100644 snowcast_server_concurrent.c create mode 100755 test diff --git a/Makefile b/Makefile index be4e159..1603f38 100644 --- a/Makefile +++ b/Makefile @@ -10,4 +10,7 @@ server: server.c $(CC) $(CFLAGS) -o snowcast_server snowcast_server.c client: client.c - $(CC) $(CFLAGS) -o snowcast_control client.c \ No newline at end of file + $(CC) $(CFLAGS) -o snowcast_control client.c + +new: + $(CC) $(CFLAGS) -o test snowcast_server_concurrent.c \ No newline at end of file diff --git a/client.c b/client.c index cb524cf..1acd3ae 100644 --- a/client.c +++ b/client.c @@ -107,7 +107,7 @@ int main(int argc, char *argv[]) char input[LINE_MAX]; printf("Enter a number to change to it's station. Click q to end stream.\n"); - while (1==1) { + while (1) { char *line = fgets(input, LINE_MAX, stdin); if (line == NULL) { @@ -118,7 +118,8 @@ int main(int argc, char *argv[]) break; } else { // convert input to an int - int inputInt = (uint16_t)atoi(input); + int inputInt = atoi(input); + printf("Changing to station %d.\n", inputInt); // send the command to change the station struct SetStation setStation; diff --git a/htable.c b/htable.c new file mode 100644 index 0000000..a39c3c6 --- /dev/null +++ b/htable.c @@ -0,0 +1,78 @@ +#include +#include +#include +#include +#include + +#include "htable.h" + +static htable_node_t *__htable_lookup( htable_t *ht, unsigned int id ); + +void htable_init( htable_t *ht, unsigned int cap ) { + unsigned int i; + + ht->ht_hash = (list_t*) malloc( sizeof( list_t ) * cap ); + ht->ht_cap = cap; + ht->ht_size = 0; + + for( i = 0; i < cap; i++ ) + list_init( &ht->ht_hash[ i ] ); +} + +void htable_destroy( htable_t *ht ) { + unsigned int i; + htable_node_t *hn; + + for( i = 0; i < ht->ht_cap; i++ ) { + list_iterate_begin( &ht->ht_hash[ i ], hn, htable_node_t, hn_link ) { + free( hn ); + } list_iterate_end(); + } + + free( ht->ht_hash ); +} + +void *htable_get( htable_t *ht, unsigned int id ) { + htable_node_t *hn; + + if( ( hn = __htable_lookup( ht, id ) ) ) return hn->hn_data; + else return NULL; +} + +void *htable_put( htable_t *ht, unsigned int id, void *data ) { + htable_node_t *hn; + void *old = NULL; + + if( !( hn = __htable_lookup( ht, id ) ) ) { + hn = (htable_node_t*) malloc( sizeof( htable_node_t ) ); + hn->hn_id = id; + list_insert_head( &ht->ht_hash[ id % ht->ht_cap ], &hn->hn_link ); + ht->ht_size++; + } else old = hn->hn_data; + + hn->hn_data = data; + + return old; +} + +void *htable_remove( htable_t *ht, unsigned int id ) { + htable_node_t *hn; + + if( ( hn = __htable_lookup( ht, id ) ) ) { + void *data = hn->hn_data; + list_remove( &hn->hn_link ); + free( hn ); + ht->ht_size--; + return data; + } else return NULL; +} + +htable_node_t *__htable_lookup( htable_t *ht, unsigned int id ) { + htable_node_t *hn; + + list_iterate_begin( &ht->ht_hash[ id % ht->ht_cap ], hn, htable_node_t, hn_link ) { + if( hn->hn_id == id ) return hn; + } list_iterate_end(); + + return NULL; +} \ No newline at end of file diff --git a/htable.h b/htable.h new file mode 100644 index 0000000..65eac58 --- /dev/null +++ b/htable.h @@ -0,0 +1,45 @@ +#ifndef __HASHTABLE_H__ +#define __HASHTABLE_H__ + +#include + +#include "list.h" + +/* FIXME make this a doubly-hashed, dynamically groweable hashtable */ + +typedef struct htable { + list_t *ht_hash; /* table entries */ + unsigned int ht_size; /* table size */ + unsigned int ht_cap; /* table capacity */ +} htable_t; + +typedef struct htable_node { + list_t hn_link; /* link */ + unsigned int hn_id; /* hash id */ + void *hn_data; /* data */ +} htable_node_t; + +void htable_init( htable_t *ht, unsigned int cap ); +void htable_destroy( htable_t *ht ); +void *htable_get( htable_t *ht, unsigned int id ); +void *htable_put( htable_t *ht, unsigned int id, void *data ); +void *htable_remove( htable_t *ht, unsigned int id ); + +#define htable_iterate_begin( ht, key, var, type ) \ +do { \ + unsigned int ___i; \ + htable_t *__ht = (ht); \ + htable_node_t *__hnode; \ + for(___i = 0;___i < __ht->ht_cap;___i++ ) { \ + list_iterate_begin( &__ht->ht_hash[___i ], __hnode, htable_node_t, hn_link ) { \ + (var) = (type*) __hnode->hn_data; \ + (key) = __hnode->hn_id; \ + do + +#define htable_iterate_end() \ + while( 0 ); \ + } list_iterate_end(); \ + } \ +} while( 0 ) + +#endif /* __HASHTABLE_H__ */ \ No newline at end of file diff --git a/list.h b/list.h new file mode 100644 index 0000000..03fcdc5 --- /dev/null +++ b/list.h @@ -0,0 +1,137 @@ +#ifndef __LIST_H__ +#define __LIST_H__ + +#include + +/* +** Generic circular doubly linked list implementation. +** +** list_t is the head of the list. +** list_link_t should be included in structures which want to be +** linked on a list_t. +** +** All of the list functions take pointers to list_t and list_link_t +** types, unless otherwise specified. +** +** list_init(list) initializes a list_t to an empty list. +** +** list_empty(list) returns 1 iff the list is empty. +** +** Insertion functions. +** list_insert_head(list, link) inserts at the front of the list. +** list_insert_tail(list, link) inserts at the end of the list. +** list_insert_before(olink, nlink) inserts nlink before olink in list. +** +** Removal functions. +** Head is list->l_next. Tail is list->l_prev. +** The following functions should only be called on non-empty lists. +** list_remove(link) removes a specific element from the list. +** list_remove_head(list) removes the first element. +** list_remove_tail(list) removes the last element. +** +** Item accessors. +** list_item(link, type, member) given a list_link_t* and the name +** of the type of structure which contains the list_link_t and +** the name of the member corresponding to the list_link_t, +** returns a pointer (of type "type*") to the item. +** +** To iterate over a list, +** +** list_link_t *link; +** for (link = list->l_next; +** link != list; link = link->l_next) +** ... +** +** Or, use the macros, which will work even if you list_remove() the +** current link: +** +** type iterator; +** list_iterate_begin(list, iterator, type, member) { +** ... use iterator ... +** } list_iterate_end; +*/ + +typedef struct llist { + struct llist *l_next; + struct llist *l_prev; +} list_t, list_link_t; + +#define list_init(list) \ + (list)->l_next = (list)->l_prev = (list); + +#define list_link_init(link) \ + (link)->l_next = (link)->l_prev = NULL; + +#define list_empty(list) \ + ((list)->l_next == (list)) + +#define list_insert_before(old, new) \ + do { \ + list_link_t *prev = (new); \ + list_link_t *next = (old); \ + prev->l_next = next; \ + prev->l_prev = next->l_prev; \ + next->l_prev->l_next = prev; \ + next->l_prev = prev; \ + } while(0) + +#define list_insert_head(list, link) \ + list_insert_before((list)->l_next, link) + +#define list_insert_tail(list, link) \ + list_insert_before(list, link) + +#define list_remove(link) \ + do { \ + list_link_t *ll = (link); \ + list_link_t *prev = ll->l_prev; \ + list_link_t *next = ll->l_next; \ + prev->l_next = next; \ + next->l_prev = prev; \ + ll->l_next = ll->l_prev = 0; \ + } while(0) + +#define list_remove_head(list) \ + list_remove((list)->l_next) + +#define list_remove_tail(list) \ + list_remove((list)->l_prev) + +#define list_item(link, type, member) \ + (type*)((char*)(link) - offsetof(type, member)) + +#define list_head(list, type, member) \ + list_item((list)->l_next, type, member) + +#define list_tail(list, type, member) \ + list_item((list)->l_prev, type, member) + +#define list_iterate_begin(list, var, type, member) \ + do { \ + list_link_t *__link; \ + list_link_t *__next; \ + for (__link = (list)->l_next; \ + __link != (list); \ + __link = __next) { \ + var = list_item(__link, type, member); \ + __next = __link->l_next; + +#define list_iterate_end() \ + } \ + } while(0) + +#define list_iterate_reverse_begin(list, var, type, member) \ + do { \ + list_link_t *__link; \ + list_link_t *__prev; \ + for (__link = (list)->l_prev; \ + __link != (list); \ + __link = __prev) { \ + var = list_item(__link, type, member); \ + __prev = __link->l_prev; + +#define list_iterate_reverse_end() \ + } \ + } while(0) + +#endif /* __LIST_H__ */ \ No newline at end of file diff --git a/protocol.h b/protocol.h index b038901..39f26e6 100644 --- a/protocol.h +++ b/protocol.h @@ -4,7 +4,7 @@ struct Command { uint8_t commandType; - u_int16_t number; + uint16_t number; } __attribute__((packed)); struct Hello { diff --git a/snowcast_control b/snowcast_control index d6aaff0..1b879ff 100755 Binary files a/snowcast_control and b/snowcast_control differ diff --git a/snowcast_server b/snowcast_server index a4dfc51..bf1801b 100755 Binary files a/snowcast_server and b/snowcast_server differ diff --git a/snowcast_server_concurrent.c b/snowcast_server_concurrent.c new file mode 100644 index 0000000..7f70294 --- /dev/null +++ b/snowcast_server_concurrent.c @@ -0,0 +1,457 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "protocol.h" + +typedef struct station { + char* filePath; + int currentChunk; +} station_t; + +typedef struct user { + int udpPort; + int stationNum; + int sockfd; +} user_t; + +#define NUM_STATIONS 2 +#define LINE_MAX 1024 +#define MAX_USERS 1000 + +/* For safe condition variable usage, must use a boolean predicate and */ +/* a mutex with the condition. */ +int count = 0; +pthread_cond_t cond = PTHREAD_COND_INITIALIZER; +pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; + + +int start_threads = 0; + +int max_active_users = 0; + +pthread_mutex_t mutex_user_data = PTHREAD_MUTEX_INITIALIZER; +// array from fd to users +user_t *user_data; +// array from fd to user's stream thread +pthread_t *user_stream_threads; +station_t station_data[NUM_STATIONS]; +int sockfd_to_user[MAX_USERS + 4]; + +char* port = "4950"; + +void *send_udp_packet_routine(void* arg); +void *select_thread(void* arg); +void *synchronization_thread(void* arg); + +void *get_in_addr(struct sockaddr *sa); + +void *init_user(int sockfd); +void *update_user_udpPort(int sockfd, int udpPort); +void *update_user_station(int sockfd, int stationNum); +void *print_user_data(int sockfd); +void *destroy_user(int sockfd); + + +// void *load_file(void* arg); + +main(int argc, char *argv[]) +{ + // temporary + station_data[0] = (station_t) {"mp3/Beethoven-SymphonyNo5.mp3", 0}; + station_data[1] = (station_t) {"mp3/DukeEllington-Caravan.mp3", 0}; + + // threads to control reading files at chunks while the other threads sleep + + user_data = malloc(sizeof(user_t) * max_active_users); + if (!user_data) { perror("malloc"); return 1; } + user_stream_threads = malloc(sizeof(pthread_t) * max_active_users); + if (!user_stream_threads) + { + perror("malloc"); return 1; + } + + // thread that manages file descriptors + pthread_t s_thread, sync_thread; + pthread_create(&s_thread, NULL, select_thread, NULL); + + // starts the threads after created + // sleep(1); + // startThreads = 0; + // pthread_cond_broadcast(&cond); + + // command line interface + char input[LINE_MAX]; + while (1) { + char *line = fgets(input, LINE_MAX, stdin); + + if (line == NULL) { + continue; + } else if (strncmp("q\n", input, LINE_MAX) == 0) { + // end code if type in q + printf("Exiting.\n"); + break; + } else if (strncmp("p\n", input, LINE_MAX) == 0) { + // print all user data + for (int i = 0; i < max_active_users; i++) { + print_user_data(i); + } + } else if (strncmp("s\n", input, LINE_MAX) == 0) { + // start the streaming threads + pthread_create(&sync_thread, NULL, synchronization_thread, NULL); + } + } + + return 0; +} + +/* Make the manager routine */ +void *send_udp_packet_routine(void *arg) { + // pthread_mutex_lock(&mutex); + // while(startThreads) { + // pthread_cond_wait(&cond, &mutex); + // } + // pthread_mutex_unlock(&mutex); + int did_send_data = 0; + int did_load_data = 0; + + int * i = (int *) arg; + while (1) + { + pthread_mutex_lock(&mutex); + if (!start_threads && did_send_data && did_load_data) { + did_load_data = 0; + did_send_data = 0; + } + while(!start_threads) { + pthread_cond_wait(&cond, &mutex); + } + pthread_mutex_unlock(&mutex); + if(!did_send_data && start_threads) { + printf("send data: thread %d \n", i); + did_send_data = 1; + } + if(!did_load_data && start_threads) { + printf("load data: thread %d \n", i); + did_load_data = 1; + } + } + return NULL; +} + +// /* Make the manager routine */ +// void *load_file(void *arg) { +// // read first data off the files +// pthread_mutex_lock(&mutex); +// while(startThreads) { +// pthread_cond_wait(&cond, &mutex); +// } +// pthread_mutex_unlock(&mutex); + +// int * i = (int *) arg; +// while (1) +// { +// pthread_mutex_lock(&mutex); +// while(startThreads) { +// pthread_cond_wait(&cond, &mutex); +// } +// pthread_mutex_unlock(&mutex); +// /* Do some work. */ +// printf("Thread %d \n", i); +// // send data to port +// // read data coming in off the file +// // sleep for a secong +// sleep(1); +// } +// return NULL; +// } + +void *synchronization_thread(void *arg) { + int c = 0; + while (1) + { + start_threads = 1; + printf("\nbroadcast %d\n", c++); + pthread_cond_broadcast(&cond); + usleep(10000); + start_threads = 0; + sleep(1); + } +} + +void *select_thread(void *arg) { + fd_set master; // master file descriptor list + fd_set read_fds; // temp file descriptor list for select() + int fdmax; // maximum file descriptor number + + int listener; // listening socket descriptor + int newfd; // newly accept()ed socket descriptor + struct sockaddr_storage remoteaddr; // client address + socklen_t addrlen; + + char buf[256]; // buffer for client data + int nbytes; + + + char remoteIP[INET6_ADDRSTRLEN]; + + int yes=1; // for setsockopt() SO_REUSEADDR, below + int i, j, rv; + + struct addrinfo hints, *ai, *p; + + // const char* port = argv[1]; + + FD_ZERO(&master); // clear the master and temp sets + FD_ZERO(&read_fds); + + // LISTENER: get us a socket and bind it + memset(&hints, 0, sizeof hints); + hints.ai_family = AF_INET; + hints.ai_socktype = SOCK_STREAM; + hints.ai_flags = AI_PASSIVE; + if ((rv = getaddrinfo(NULL, port, &hints, &ai)) != 0) { + fprintf(stderr, "snowcast_server: %s\n", gai_strerror(rv)); + exit(1); + } + + for(p = ai; p != NULL; p = p->ai_next) { + listener = socket(p->ai_family, p->ai_socktype, p->ai_protocol); + if (listener < 0) { + continue; + } + + // lose the pesky "address already in use" error message + setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int)); + + if (bind(listener, p->ai_addr, p->ai_addrlen) < 0) { + close(listener); + continue; + } + + break; + } + + // if we got here, it means we didn't get bound + if (p == NULL) { + fprintf(stderr, "snowcast_server: failed to bind\n"); + exit(2); + } + + freeaddrinfo(ai); // all done with this + + // listen + if (listen(listener, 10) == -1) { + perror("listen"); + exit(3); + } + + // add the listener to the master set + FD_SET(listener, &master); + + // keep track of the biggest file descriptor + fdmax = listener; // so far, it's this one + + while(1==1) { + read_fds = master; // copy it + if (select(fdmax+1, &read_fds, NULL, NULL, NULL) == -1) { + perror("select"); + exit(4); + } + + // run through the existing connections looking for data to read + for(i = 0; i <= fdmax; i++) { + if (FD_ISSET(i, &read_fds)) { // we got one!! + if (i == listener) { + // handle new connections + addrlen = sizeof remoteaddr; + newfd = accept(listener, + (struct sockaddr *)&remoteaddr, + &addrlen); + + if (newfd == -1) { + perror("accept"); + } else { + FD_SET(newfd, &master); // add to master set + if (newfd > fdmax) { // keep track of the max + fdmax = newfd; + } + printf("selectserver: new connection from %s on " + "socket %d\n.", + inet_ntop(remoteaddr.ss_family, + get_in_addr((struct sockaddr*)&remoteaddr), + remoteIP, INET6_ADDRSTRLEN), + newfd); + + // init user with this newfd + init_user(newfd); + + // send the welcome message to client + struct Welcome welcome; + welcome.replyType = 2; + welcome.numStations = htons(NUM_STATIONS); + if ((send(newfd, &welcome, sizeof(struct Welcome), 0)) == -1) + perror("send"); + } + } else { + // handle data from a client + struct Command command; + if ((nbytes = recv(i, (char*)&command, sizeof(struct Command), 0)) <= 0) { + // got error or connection closed by client + if (nbytes == 0) { + // connection closed + printf("selectserver: socket %d hung up\n", i); + } else { + perror("recv"); + } + close(i); // bye! + FD_CLR(i, &master); // remove from master set + // remove user from data structures + destroy_user(i); + } else { + // we got some data from a client + if (command.commandType == 0) { + // hello message with udpPort + printf("udpPort (from Hello) for new connection is %d.\n", ntohs(command.number)); + // update udp port of user + update_user_udpPort(i, ntohs(command.number)); + + + + // // TALKER: get us a udp socket and bind it + // struct addrinfo hintsUdp, *servinfoUdp, *pUdp; + // int rvUdp, sockfdUdp, numbytesUdp; + // memset(&hintsUdp, 0, sizeof hintsUdp); + // hintsUdp.ai_family = AF_INET; // IPv4 + // hintsUdp.ai_socktype = SOCK_DGRAM; // UDP + // if ((rvUdp = getaddrinfo(argv[1], command.number, &hints, &servinfoUdp)) != 0) { + // fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(rvUdp)); + // return 1; + // } + + // // loop through all the results and make a socket + // for(p = servinfoUdp; p != NULL; p = p->ai_next) { + // if ((sockfdUdp = socket(p->ai_family, p->ai_socktype, + // p->ai_protocol)) == -1) { + // perror("talker: socket"); + // continue; + // } + // break; + // } + + // if (p == NULL) { + // fprintf(stderr, "talker: failed to create socket\n"); + // return 2; + // } + + + // if ((numbytesUdp = sendto(sockfdUdp, "test", strlen("test"), 0, + // p->ai_addr, p->ai_addrlen)) == -1) { + // perror("talker: sendto"); + // exit(1); + // } + + // freeaddrinfo(servinfoUdp); + + // printf("talker: sent %d bytes to %d\n", numbytesUdp, sockfdUdp); + // close(sockfdUdp); + } + else if (command.commandType == 1) { + // setStation command for the user + printf("TODO: set station to %d\n", ntohs(command.number)); + // update station of user + update_user_station(i, ntohs(command.number)); + } + else { + // send back in invalid command + struct InvalidCommand invalid; + invalid.replyType = 4; + invalid.replyStringSize = 21; + // make a string with the command.commmandType type in it + invalid.replyString = "Invalid command type"; + if ((send(i, &invalid, sizeof(struct InvalidCommand), 0)) == -1) + perror("send"); + + // drop connection upon invalid command + close(i); + FD_CLR(i, &master); + } + } + } // END handle data from client + } // END got new incoming connection + } // END looping through file descriptors + + // broadcast the new files over the udp socket list for each use + + } // END for(;;)--and you thought it would never end! +} + +void *init_user(int sockfd) { + // add the user to the list of user data + pthread_mutex_lock(&mutex_user_data); + + // this is to save memory space. + // in general, the displacement of 4 is where a user "used to be" + int user_index = max_active_users++; + if(user_data[sockfd-4].sockfd == -1) { + printf("reusing memory\n"); + user_index = sockfd - 4; + } else { + printf("making new memory\n"); + // have to make more memory + user_t *more_users = realloc(user_data, sizeof(user_t) * max_active_users); + if (!more_users) { perror("realloc"); exit(1); } + user_data = more_users; + pthread_t *more_stream_threads = realloc(user_stream_threads, sizeof(pthread_t) * max_active_users); + if (!more_stream_threads) { perror("realloc"); exit(1); } + user_stream_threads = more_stream_threads; + } + // map sockfd to this user index & create its stream thread + user_data[user_index] = (user_t) {-1, -1, sockfd}; + sockfd_to_user[sockfd] = user_index; + pthread_create(&user_stream_threads[user_index], NULL, send_udp_packet_routine, (void *)sockfd); + // free(user_stream_threads); + pthread_mutex_unlock(&mutex_user_data); +} +void *update_user_udpPort(int sockfd, int udpPort) { + pthread_mutex_lock(&mutex_user_data); + user_data[sockfd_to_user[sockfd]].udpPort = udpPort; + pthread_mutex_unlock(&mutex_user_data); +} +void *update_user_station(int sockfd, int stationNum) { + pthread_mutex_lock(&mutex_user_data); + user_data[sockfd_to_user[sockfd]].stationNum = stationNum; + pthread_mutex_unlock(&mutex_user_data); +} +void *print_user_data(int index) { + printf("udpPort: %d, stationNum: %d, sockfd: %d\n", + user_data[index].udpPort, user_data[index].stationNum, user_data[index].sockfd); +} +void *destroy_user(int sockfd) { + pthread_mutex_lock(&mutex_user_data); + + // stop the thread streaming to the user + pthread_cancel(user_stream_threads[sockfd_to_user[sockfd]]); + // "remove" the user from the list of user data + user_data[sockfd_to_user[sockfd]] = (user_t) {-1, -1, -1}; + // map sockfd to -1 + sockfd_to_user[sockfd] = -1; + + pthread_mutex_unlock(&mutex_user_data); +} + + +void *get_in_addr(struct sockaddr *sa) +{ + if (sa->sa_family == AF_INET) { + return &(((struct sockaddr_in*)sa)->sin_addr); + } + + return &(((struct sockaddr_in6*)sa)->sin6_addr); +} \ No newline at end of file diff --git a/test b/test new file mode 100755 index 0000000..4bb883b Binary files /dev/null and b/test differ -- cgit v1.2.3-70-g09d2