#include #include #include #include #include #include #include #include #include #include "protocol.h" typedef struct station { char* filePath; int currentChunk; } station_t; typedef struct user { int udpPort; int stationNum; int sockfd; pthread_t streamThread; } user_t; #define NUM_STATIONS 2 #define LINE_MAX 1024 #define MAX_USERS 1000 /* For safe condition variable usage, must use a boolean predicate and */ /* a mutex with the condition. */ int count = 0; pthread_cond_t cond = PTHREAD_COND_INITIALIZER; pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; int start_threads = 0; int max_active_users = 0; pthread_mutex_t mutex_user_data = PTHREAD_MUTEX_INITIALIZER; // array from index to user_data user_t *user_data; station_t station_data[NUM_STATIONS]; int sockfd_to_user[MAX_USERS + 4]; char* port = "4950"; void *send_udp_packet_routine(void* arg); void *select_thread(void* arg); void *synchronization_thread(void* arg); void *get_in_addr(struct sockaddr *sa); void *init_user(int sockfd); void *update_user_udpPort(int sockfd, int udpPort); void *update_user_station(int sockfd, int stationNum); void *print_user_data(int sockfd); void *destroy_user(int sockfd); // void *load_file(void* arg); main(int argc, char *argv[]) { // temporary station_data[0] = (station_t) {"mp3/Beethoven-SymphonyNo5.mp3", 0}; station_data[1] = (station_t) {"mp3/DukeEllington-Caravan.mp3", 0}; // threads to control reading files at chunks while the other threads sleep user_data = malloc(sizeof(user_t) * max_active_users); if (!user_data) { perror("malloc"); return 1; } // thread that manages file descriptors pthread_t s_thread, sync_thread; pthread_create(&s_thread, NULL, select_thread, NULL); // starts the threads after created // sleep(1); // startThreads = 0; // pthread_cond_broadcast(&cond); // command line interface char input[LINE_MAX]; while (1) { char *line = fgets(input, LINE_MAX, stdin); if (line == NULL) { continue; } else if (strncmp("q\n", input, LINE_MAX) == 0) { // end code if type in q printf("Exiting.\n"); break; } else if (strncmp("p\n", input, LINE_MAX) == 0) { // print all user data for (int i = 0; i < max_active_users; i++) { print_user_data(i); } } else if (strncmp("s\n", input, LINE_MAX) == 0) { // start the streaming threads pthread_create(&sync_thread, NULL, synchronization_thread, NULL); } } return 0; } /* Make the manager routine */ void *send_udp_packet_routine(void *arg) { // pthread_mutex_lock(&mutex); // while(startThreads) { // pthread_cond_wait(&cond, &mutex); // } // pthread_mutex_unlock(&mutex); int did_send_data = 0; int did_load_data = 0; pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER; int * i = (int *) arg; while (1) { pthread_mutex_lock(&m); if (!start_threads && did_send_data && did_load_data) { did_load_data = 0; did_send_data = 0; } while(!start_threads) { pthread_cond_wait(&cond, &m); } pthread_mutex_unlock(&m); if(!did_send_data) { printf("send data: thread %d \n", i); did_send_data = 1; } if(!did_load_data) { printf("load data: thread %d \n", i); did_load_data = 1; } } return NULL; } // /* Make the manager routine */ // void *load_file(void *arg) { // // read first data off the files // pthread_mutex_lock(&mutex); // while(startThreads) { // pthread_cond_wait(&cond, &mutex); // } // pthread_mutex_unlock(&mutex); // int * i = (int *) arg; // while (1) // { // pthread_mutex_lock(&mutex); // while(startThreads) { // pthread_cond_wait(&cond, &mutex); // } // pthread_mutex_unlock(&mutex); // /* Do some work. */ // printf("Thread %d \n", i); // // send data to port // // read data coming in off the file // // sleep for a secong // sleep(1); // } // return NULL; // } void *synchronization_thread(void *arg) { int c = 0; while (1) { start_threads = 1; printf("\nbroadcast %d\n", c++); pthread_cond_broadcast(&cond); usleep(1000); start_threads = 0; sleep(1); } } void *select_thread(void *arg) { fd_set master; // master file descriptor list fd_set read_fds; // temp file descriptor list for select() int fdmax; // maximum file descriptor number int listener; // listening socket descriptor int newfd; // newly accept()ed socket descriptor struct sockaddr_storage remoteaddr; // client address socklen_t addrlen; char buf[256]; // buffer for client data int nbytes; char remoteIP[INET6_ADDRSTRLEN]; int yes=1; // for setsockopt() SO_REUSEADDR, below int i, j, rv; struct addrinfo hints, *ai, *p; // const char* port = argv[1]; FD_ZERO(&master); // clear the master and temp sets FD_ZERO(&read_fds); // LISTENER: get us a socket and bind it memset(&hints, 0, sizeof hints); hints.ai_family = AF_INET; hints.ai_socktype = SOCK_STREAM; hints.ai_flags = AI_PASSIVE; if ((rv = getaddrinfo(NULL, port, &hints, &ai)) != 0) { fprintf(stderr, "snowcast_server: %s\n", gai_strerror(rv)); exit(1); } for(p = ai; p != NULL; p = p->ai_next) { listener = socket(p->ai_family, p->ai_socktype, p->ai_protocol); if (listener < 0) { continue; } // lose the pesky "address already in use" error message setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int)); if (bind(listener, p->ai_addr, p->ai_addrlen) < 0) { close(listener); continue; } break; } // if we got here, it means we didn't get bound if (p == NULL) { fprintf(stderr, "snowcast_server: failed to bind\n"); exit(2); } freeaddrinfo(ai); // all done with this // listen if (listen(listener, 10) == -1) { perror("listen"); exit(3); } // add the listener to the master set FD_SET(listener, &master); // keep track of the biggest file descriptor fdmax = listener; // so far, it's this one while(1==1) { read_fds = master; // copy it if (select(fdmax+1, &read_fds, NULL, NULL, NULL) == -1) { perror("select"); exit(4); } // run through the existing connections looking for data to read for(i = 0; i <= fdmax; i++) { if (FD_ISSET(i, &read_fds)) { // we got one!! if (i == listener) { // handle new connections addrlen = sizeof remoteaddr; newfd = accept(listener, (struct sockaddr *)&remoteaddr, &addrlen); if (newfd == -1) { perror("accept"); } else { FD_SET(newfd, &master); // add to master set if (newfd > fdmax) { // keep track of the max fdmax = newfd; } printf("selectserver: new connection from %s on " "socket %d\n.", inet_ntop(remoteaddr.ss_family, get_in_addr((struct sockaddr*)&remoteaddr), remoteIP, INET6_ADDRSTRLEN), newfd); // init user with this newfd init_user(newfd); // send the welcome message to client struct Welcome welcome; welcome.replyType = 2; welcome.numStations = htons(NUM_STATIONS); if ((send(newfd, &welcome, sizeof(struct Welcome), 0)) == -1) perror("send"); } } else { // handle data from a client struct Command command; if ((nbytes = recv(i, (char*)&command, sizeof(struct Command), 0)) <= 0) { // got error or connection closed by client if (nbytes == 0) { // connection closed printf("selectserver: socket %d hung up\n", i); } else { perror("recv"); } close(i); // bye! FD_CLR(i, &master); // remove from master set // remove user from data structures destroy_user(i); } else { // we got some data from a client if (command.commandType == 0) { // hello message with udpPort printf("udpPort (from Hello) for new connection is %d.\n", ntohs(command.number)); // update udp port of user update_user_udpPort(i, ntohs(command.number)); // // TALKER: get us a udp socket and bind it // struct addrinfo hintsUdp, *servinfoUdp, *pUdp; // int rvUdp, sockfdUdp, numbytesUdp; // memset(&hintsUdp, 0, sizeof hintsUdp); // hintsUdp.ai_family = AF_INET; // IPv4 // hintsUdp.ai_socktype = SOCK_DGRAM; // UDP // if ((rvUdp = getaddrinfo(argv[1], command.number, &hints, &servinfoUdp)) != 0) { // fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(rvUdp)); // return 1; // } // // loop through all the results and make a socket // for(p = servinfoUdp; p != NULL; p = p->ai_next) { // if ((sockfdUdp = socket(p->ai_family, p->ai_socktype, // p->ai_protocol)) == -1) { // perror("talker: socket"); // continue; // } // break; // } // if (p == NULL) { // fprintf(stderr, "talker: failed to create socket\n"); // return 2; // } // if ((numbytesUdp = sendto(sockfdUdp, "test", strlen("test"), 0, // p->ai_addr, p->ai_addrlen)) == -1) { // perror("talker: sendto"); // exit(1); // } // freeaddrinfo(servinfoUdp); // printf("talker: sent %d bytes to %d\n", numbytesUdp, sockfdUdp); // close(sockfdUdp); } else if (command.commandType == 1) { // setStation command for the user printf("TODO: set station to %d\n", ntohs(command.number)); // update station of user update_user_station(i, ntohs(command.number)); } else { // send back in invalid command struct InvalidCommand invalid; invalid.replyType = 4; invalid.replyStringSize = 21; // make a string with the command.commmandType type in it invalid.replyString = "Invalid command type"; if ((send(i, &invalid, sizeof(struct InvalidCommand), 0)) == -1) perror("send"); // drop connection upon invalid command close(i); FD_CLR(i, &master); } } } // END handle data from client } // END got new incoming connection } // END looping through file descriptors // broadcast the new files over the udp socket list for each use } // END for(;;)--and you thought it would never end! } void *init_user(int sockfd) { // add the user to the list of user data pthread_mutex_lock(&mutex_user_data); // this is to save memory space. // in general, the displacement of 4 is where a user "used to be" int user_index = max_active_users++; if(user_data[sockfd-4].sockfd == -1) { printf("reusing memory\n"); user_index = sockfd - 4; } else { printf("making new memory\n"); // have to make more memory user_t *more_users = realloc(user_data, sizeof(user_t) * max_active_users); if (!more_users) { perror("realloc"); exit(1); } user_data = more_users; } // map sockfd to this user index & create its stream thread pthread_t user_thread; pthread_create(&user_thread, NULL, send_udp_packet_routine, (void *)user_index); user_data[user_index] = (user_t){-1, -1, sockfd, user_thread}; sockfd_to_user[sockfd] = user_index; // free(user_stream_threads); pthread_mutex_unlock(&mutex_user_data); } void *update_user_udpPort(int sockfd, int udpPort) { pthread_mutex_lock(&mutex_user_data); user_data[sockfd_to_user[sockfd]].udpPort = udpPort; pthread_mutex_unlock(&mutex_user_data); } void *update_user_station(int sockfd, int stationNum) { pthread_mutex_lock(&mutex_user_data); user_data[sockfd_to_user[sockfd]].stationNum = stationNum; pthread_mutex_unlock(&mutex_user_data); } void *print_user_data(int index) { printf("udpPort: %d, stationNum: %d, sockfd: %d, threadId:%d\n", user_data[index].udpPort, user_data[index].stationNum, user_data[index].sockfd, user_data[index].streamThread); } void *destroy_user(int sockfd) { pthread_mutex_lock(&mutex_user_data); // stop the thread streaming to the user pthread_cancel(user_data[sockfd_to_user[sockfd]].streamThread); // "remove" the user from the list of user data user_data[sockfd_to_user[sockfd]] = (user_t) {-1, -1, -1, -1}; // map sockfd to -1 sockfd_to_user[sockfd] = -1; pthread_mutex_unlock(&mutex_user_data); } void *get_in_addr(struct sockaddr *sa) { if (sa->sa_family == AF_INET) { return &(((struct sockaddr_in*)sa)->sin_addr); } return &(((struct sockaddr_in6*)sa)->sin6_addr); }