aboutsummaryrefslogtreecommitdiff
path: root/snowcast_server_concurrent.c
diff options
context:
space:
mode:
Diffstat (limited to 'snowcast_server_concurrent.c')
-rw-r--r--snowcast_server_concurrent.c42
1 files changed, 19 insertions, 23 deletions
diff --git a/snowcast_server_concurrent.c b/snowcast_server_concurrent.c
index 7f70294..0586635 100644
--- a/snowcast_server_concurrent.c
+++ b/snowcast_server_concurrent.c
@@ -19,6 +19,7 @@ typedef struct user {
int udpPort;
int stationNum;
int sockfd;
+ pthread_t streamThread;
} user_t;
#define NUM_STATIONS 2
@@ -37,10 +38,8 @@ int start_threads = 0;
int max_active_users = 0;
pthread_mutex_t mutex_user_data = PTHREAD_MUTEX_INITIALIZER;
-// array from fd to users
+// array from index to user_data
user_t *user_data;
-// array from fd to user's stream thread
-pthread_t *user_stream_threads;
station_t station_data[NUM_STATIONS];
int sockfd_to_user[MAX_USERS + 4];
@@ -71,11 +70,6 @@ main(int argc, char *argv[])
user_data = malloc(sizeof(user_t) * max_active_users);
if (!user_data) { perror("malloc"); return 1; }
- user_stream_threads = malloc(sizeof(pthread_t) * max_active_users);
- if (!user_stream_threads)
- {
- perror("malloc"); return 1;
- }
// thread that manages file descriptors
pthread_t s_thread, sync_thread;
@@ -121,26 +115,29 @@ void *send_udp_packet_routine(void *arg) {
int did_send_data = 0;
int did_load_data = 0;
+ pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER;
+
int * i = (int *) arg;
while (1)
{
- pthread_mutex_lock(&mutex);
+ pthread_mutex_lock(&m);
if (!start_threads && did_send_data && did_load_data) {
did_load_data = 0;
did_send_data = 0;
}
while(!start_threads) {
- pthread_cond_wait(&cond, &mutex);
+ pthread_cond_wait(&cond, &m);
}
- pthread_mutex_unlock(&mutex);
- if(!did_send_data && start_threads) {
+ pthread_mutex_unlock(&m);
+ if(!did_send_data) {
printf("send data: thread %d \n", i);
did_send_data = 1;
}
- if(!did_load_data && start_threads) {
+ if(!did_load_data) {
printf("load data: thread %d \n", i);
did_load_data = 1;
}
+
}
return NULL;
}
@@ -179,9 +176,10 @@ void *synchronization_thread(void *arg) {
start_threads = 1;
printf("\nbroadcast %d\n", c++);
pthread_cond_broadcast(&cond);
- usleep(10000);
+ usleep(1000);
start_threads = 0;
sleep(1);
+
}
}
@@ -408,14 +406,12 @@ void *init_user(int sockfd) {
user_t *more_users = realloc(user_data, sizeof(user_t) * max_active_users);
if (!more_users) { perror("realloc"); exit(1); }
user_data = more_users;
- pthread_t *more_stream_threads = realloc(user_stream_threads, sizeof(pthread_t) * max_active_users);
- if (!more_stream_threads) { perror("realloc"); exit(1); }
- user_stream_threads = more_stream_threads;
}
// map sockfd to this user index & create its stream thread
- user_data[user_index] = (user_t) {-1, -1, sockfd};
+ pthread_t user_thread;
+ pthread_create(&user_thread, NULL, send_udp_packet_routine, (void *)user_index);
+ user_data[user_index] = (user_t){-1, -1, sockfd, user_thread};
sockfd_to_user[sockfd] = user_index;
- pthread_create(&user_stream_threads[user_index], NULL, send_udp_packet_routine, (void *)sockfd);
// free(user_stream_threads);
pthread_mutex_unlock(&mutex_user_data);
}
@@ -430,16 +426,16 @@ void *update_user_station(int sockfd, int stationNum) {
pthread_mutex_unlock(&mutex_user_data);
}
void *print_user_data(int index) {
- printf("udpPort: %d, stationNum: %d, sockfd: %d\n",
- user_data[index].udpPort, user_data[index].stationNum, user_data[index].sockfd);
+ printf("udpPort: %d, stationNum: %d, sockfd: %d, threadId:%d\n",
+ user_data[index].udpPort, user_data[index].stationNum, user_data[index].sockfd, user_data[index].streamThread);
}
void *destroy_user(int sockfd) {
pthread_mutex_lock(&mutex_user_data);
// stop the thread streaming to the user
- pthread_cancel(user_stream_threads[sockfd_to_user[sockfd]]);
+ pthread_cancel(user_data[sockfd_to_user[sockfd]].streamThread);
// "remove" the user from the list of user data
- user_data[sockfd_to_user[sockfd]] = (user_t) {-1, -1, -1};
+ user_data[sockfd_to_user[sockfd]] = (user_t) {-1, -1, -1, -1};
// map sockfd to -1
sockfd_to_user[sockfd] = -1;