diff options
author | Martijn van Beurden <mvanb1@gmail.com> | 2023-07-25 13:41:54 +0200 |
---|---|---|
committer | Martijn van Beurden <mvanb1@gmail.com> | 2023-09-22 21:10:15 +0200 |
commit | 021e82bfcce8a57b304adeafd7554bdd9b28ea27 (patch) | |
tree | 1ac689a2ee97017577fc6cc43f53c05e990e5396 | |
parent | bcc37540aa8a627cf8b26cffb73040b19b9fff27 (diff) | |
download | flac-021e82bfcce8a57b304adeafd7554bdd9b28ea27.tar.gz |
Add checks for 'overcommitting'
This scales the number of active threads back when threads have to
wait for work too often
-rw-r--r-- | src/libFLAC/stream_encoder.c | 41 |
1 files changed, 40 insertions, 1 deletions
diff --git a/src/libFLAC/stream_encoder.c b/src/libFLAC/stream_encoder.c index 025dc18e..9c15a3c2 100644 --- a/src/libFLAC/stream_encoder.c +++ b/src/libFLAC/stream_encoder.c @@ -497,15 +497,18 @@ typedef struct FLAC__StreamEncoderPrivate { uint32_t num_created_threads; uint32_t next_thread; /* This is the next thread that needs start, or needs to finish and be restarted */ uint32_t num_started_threadtasks; - uint32_t num_available_threadtasks; + uint32_t num_available_threadtasks; /* Number of threadtasks that are available to work on */ + uint32_t num_running_threads; uint32_t next_threadtask; /* Next threadtask that is available to work on */ pthread_mutex_t mutex_md5_fifo; pthread_mutex_t mutex_md5_active; pthread_mutex_t mutex_work_queue; /* To lock work related variables in this struct */ pthread_cond_t cond_md5_emptied; /* To signal to main thread that MD5 queue has been emptied */ pthread_cond_t cond_work_available; /* To signal to threads that work is available */ + pthread_cond_t cond_wake_up_thread; /* To signal that one sleeping thread can wake up */ FLAC__bool md5_work_available; /* To signal to threads that work is available */ FLAC__bool finish_work_threads; + int32_t overcommitted_indicator; verify_input_fifo md5_fifo; #endif } FLAC__StreamEncoderPrivate; @@ -1188,6 +1191,15 @@ static FLAC__StreamEncoderInitStatus init_stream_internal_( encoder->protected_->state = FLAC__STREAM_ENCODER_MEMORY_ALLOCATION_ERROR; return FLAC__STREAM_ENCODER_INIT_STATUS_ENCODER_ERROR; } + if(pthread_cond_init(&encoder->private_->cond_wake_up_thread, NULL)) { + pthread_mutex_destroy(&encoder->private_->mutex_md5_fifo); + pthread_mutex_destroy(&encoder->private_->mutex_md5_active); + pthread_mutex_destroy(&encoder->private_->mutex_work_queue); + pthread_cond_destroy(&encoder->private_->cond_md5_emptied); + pthread_cond_destroy(&encoder->private_->cond_work_available); + encoder->protected_->state = FLAC__STREAM_ENCODER_MEMORY_ALLOCATION_ERROR; + return FLAC__STREAM_ENCODER_INIT_STATUS_ENCODER_ERROR; + } if(encoder->protected_->do_md5) { encoder->private_->md5_fifo.size = (encoder->protected_->blocksize+OVERREAD_) * (encoder->private_->num_threadtasks + 2); for(i = 0; i < encoder->protected_->channels; i++) { @@ -1197,6 +1209,7 @@ static FLAC__StreamEncoderInitStatus init_stream_internal_( pthread_mutex_destroy(&encoder->private_->mutex_work_queue); pthread_cond_destroy(&encoder->private_->cond_md5_emptied); pthread_cond_destroy(&encoder->private_->cond_work_available); + pthread_cond_destroy(&encoder->private_->cond_wake_up_thread); encoder->protected_->state = FLAC__STREAM_ENCODER_MEMORY_ALLOCATION_ERROR; return FLAC__STREAM_ENCODER_INIT_STATUS_ENCODER_ERROR; } @@ -1719,6 +1732,7 @@ FLAC_API FLAC__bool FLAC__stream_encoder_finish(FLAC__StreamEncoder *encoder) pthread_mutex_lock(&encoder->private_->mutex_work_queue); for(t = 1; t < encoder->private_->num_created_threads; t++) encoder->private_->finish_work_threads = true; + pthread_cond_broadcast(&encoder->private_->cond_wake_up_thread); pthread_cond_broadcast(&encoder->private_->cond_work_available); pthread_mutex_unlock(&encoder->private_->mutex_work_queue); @@ -2666,8 +2680,10 @@ void set_defaults_(FLAC__StreamEncoder *encoder) #ifdef HAVE_PTHREAD encoder->private_->num_created_threads = 1; encoder->private_->next_thread = 1; + encoder->private_->num_running_threads = 1; encoder->private_->num_started_threadtasks = 1; encoder->private_->num_available_threadtasks = 0; + encoder->private_->overcommitted_indicator = 0; encoder->private_->next_threadtask = 1; encoder->private_->md5_work_available = false; encoder->private_->finish_work_threads = false; @@ -2777,6 +2793,7 @@ void free_(FLAC__StreamEncoder *encoder) pthread_mutex_destroy(&encoder->private_->mutex_work_queue); pthread_cond_destroy(&encoder->private_->cond_md5_emptied); pthread_cond_destroy(&encoder->private_->cond_work_available); + pthread_cond_destroy(&encoder->private_->cond_wake_up_thread); if(encoder->protected_->do_md5) { for(i = 0; i < encoder->protected_->channels; i++) { if(0 != encoder->private_->md5_fifo.data[i]) { @@ -3547,8 +3564,30 @@ void * process_frame_thread_(void * args) { FLAC__StreamEncoder * encoder = args; uint32_t channel; + pthread_mutex_lock(&encoder->private_->mutex_work_queue); + encoder->private_->num_running_threads++; + pthread_mutex_unlock(&encoder->private_->mutex_work_queue); + while(1) { pthread_mutex_lock(&encoder->private_->mutex_work_queue); + if(encoder->private_->finish_work_threads) { + pthread_mutex_unlock(&encoder->private_->mutex_work_queue); + return NULL; + } + if(encoder->private_->num_available_threadtasks == 0) + encoder->private_->overcommitted_indicator++; + else if(encoder->private_->num_available_threadtasks > encoder->private_->num_running_threads) + encoder->private_->overcommitted_indicator--; + if(encoder->private_->overcommitted_indicator < -20) { + encoder->private_->overcommitted_indicator = 0; + pthread_cond_signal(&encoder->private_->cond_wake_up_thread); + } + else if(encoder->private_->overcommitted_indicator > 20 && encoder->private_->num_running_threads > 2) { + encoder->private_->overcommitted_indicator = 0; + encoder->private_->num_running_threads--; + pthread_cond_wait(&encoder->private_->cond_wake_up_thread, &encoder->private_->mutex_work_queue); + encoder->private_->num_running_threads++; + } while(encoder->private_->num_available_threadtasks == 0 && !encoder->private_->md5_work_available) { if(encoder->private_->finish_work_threads) { pthread_mutex_unlock(&encoder->private_->mutex_work_queue); |