aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMartijn van Beurden <mvanb1@gmail.com>2023-07-25 13:41:54 +0200
committerMartijn van Beurden <mvanb1@gmail.com>2023-09-22 21:10:15 +0200
commit021e82bfcce8a57b304adeafd7554bdd9b28ea27 (patch)
tree1ac689a2ee97017577fc6cc43f53c05e990e5396
parentbcc37540aa8a627cf8b26cffb73040b19b9fff27 (diff)
downloadflac-021e82bfcce8a57b304adeafd7554bdd9b28ea27.tar.gz
Add checks for 'overcommitting'
This scales the number of active threads back when threads have to wait for work too often
-rw-r--r--src/libFLAC/stream_encoder.c41
1 files changed, 40 insertions, 1 deletions
diff --git a/src/libFLAC/stream_encoder.c b/src/libFLAC/stream_encoder.c
index 025dc18e..9c15a3c2 100644
--- a/src/libFLAC/stream_encoder.c
+++ b/src/libFLAC/stream_encoder.c
@@ -497,15 +497,18 @@ typedef struct FLAC__StreamEncoderPrivate {
uint32_t num_created_threads;
uint32_t next_thread; /* This is the next thread that needs start, or needs to finish and be restarted */
uint32_t num_started_threadtasks;
- uint32_t num_available_threadtasks;
+ uint32_t num_available_threadtasks; /* Number of threadtasks that are available to work on */
+ uint32_t num_running_threads;
uint32_t next_threadtask; /* Next threadtask that is available to work on */
pthread_mutex_t mutex_md5_fifo;
pthread_mutex_t mutex_md5_active;
pthread_mutex_t mutex_work_queue; /* To lock work related variables in this struct */
pthread_cond_t cond_md5_emptied; /* To signal to main thread that MD5 queue has been emptied */
pthread_cond_t cond_work_available; /* To signal to threads that work is available */
+ pthread_cond_t cond_wake_up_thread; /* To signal that one sleeping thread can wake up */
FLAC__bool md5_work_available; /* To signal to threads that work is available */
FLAC__bool finish_work_threads;
+ int32_t overcommitted_indicator;
verify_input_fifo md5_fifo;
#endif
} FLAC__StreamEncoderPrivate;
@@ -1188,6 +1191,15 @@ static FLAC__StreamEncoderInitStatus init_stream_internal_(
encoder->protected_->state = FLAC__STREAM_ENCODER_MEMORY_ALLOCATION_ERROR;
return FLAC__STREAM_ENCODER_INIT_STATUS_ENCODER_ERROR;
}
+ if(pthread_cond_init(&encoder->private_->cond_wake_up_thread, NULL)) {
+ pthread_mutex_destroy(&encoder->private_->mutex_md5_fifo);
+ pthread_mutex_destroy(&encoder->private_->mutex_md5_active);
+ pthread_mutex_destroy(&encoder->private_->mutex_work_queue);
+ pthread_cond_destroy(&encoder->private_->cond_md5_emptied);
+ pthread_cond_destroy(&encoder->private_->cond_work_available);
+ encoder->protected_->state = FLAC__STREAM_ENCODER_MEMORY_ALLOCATION_ERROR;
+ return FLAC__STREAM_ENCODER_INIT_STATUS_ENCODER_ERROR;
+ }
if(encoder->protected_->do_md5) {
encoder->private_->md5_fifo.size = (encoder->protected_->blocksize+OVERREAD_) * (encoder->private_->num_threadtasks + 2);
for(i = 0; i < encoder->protected_->channels; i++) {
@@ -1197,6 +1209,7 @@ static FLAC__StreamEncoderInitStatus init_stream_internal_(
pthread_mutex_destroy(&encoder->private_->mutex_work_queue);
pthread_cond_destroy(&encoder->private_->cond_md5_emptied);
pthread_cond_destroy(&encoder->private_->cond_work_available);
+ pthread_cond_destroy(&encoder->private_->cond_wake_up_thread);
encoder->protected_->state = FLAC__STREAM_ENCODER_MEMORY_ALLOCATION_ERROR;
return FLAC__STREAM_ENCODER_INIT_STATUS_ENCODER_ERROR;
}
@@ -1719,6 +1732,7 @@ FLAC_API FLAC__bool FLAC__stream_encoder_finish(FLAC__StreamEncoder *encoder)
pthread_mutex_lock(&encoder->private_->mutex_work_queue);
for(t = 1; t < encoder->private_->num_created_threads; t++)
encoder->private_->finish_work_threads = true;
+ pthread_cond_broadcast(&encoder->private_->cond_wake_up_thread);
pthread_cond_broadcast(&encoder->private_->cond_work_available);
pthread_mutex_unlock(&encoder->private_->mutex_work_queue);
@@ -2666,8 +2680,10 @@ void set_defaults_(FLAC__StreamEncoder *encoder)
#ifdef HAVE_PTHREAD
encoder->private_->num_created_threads = 1;
encoder->private_->next_thread = 1;
+ encoder->private_->num_running_threads = 1;
encoder->private_->num_started_threadtasks = 1;
encoder->private_->num_available_threadtasks = 0;
+ encoder->private_->overcommitted_indicator = 0;
encoder->private_->next_threadtask = 1;
encoder->private_->md5_work_available = false;
encoder->private_->finish_work_threads = false;
@@ -2777,6 +2793,7 @@ void free_(FLAC__StreamEncoder *encoder)
pthread_mutex_destroy(&encoder->private_->mutex_work_queue);
pthread_cond_destroy(&encoder->private_->cond_md5_emptied);
pthread_cond_destroy(&encoder->private_->cond_work_available);
+ pthread_cond_destroy(&encoder->private_->cond_wake_up_thread);
if(encoder->protected_->do_md5) {
for(i = 0; i < encoder->protected_->channels; i++) {
if(0 != encoder->private_->md5_fifo.data[i]) {
@@ -3547,8 +3564,30 @@ void * process_frame_thread_(void * args) {
FLAC__StreamEncoder * encoder = args;
uint32_t channel;
+ pthread_mutex_lock(&encoder->private_->mutex_work_queue);
+ encoder->private_->num_running_threads++;
+ pthread_mutex_unlock(&encoder->private_->mutex_work_queue);
+
while(1) {
pthread_mutex_lock(&encoder->private_->mutex_work_queue);
+ if(encoder->private_->finish_work_threads) {
+ pthread_mutex_unlock(&encoder->private_->mutex_work_queue);
+ return NULL;
+ }
+ if(encoder->private_->num_available_threadtasks == 0)
+ encoder->private_->overcommitted_indicator++;
+ else if(encoder->private_->num_available_threadtasks > encoder->private_->num_running_threads)
+ encoder->private_->overcommitted_indicator--;
+ if(encoder->private_->overcommitted_indicator < -20) {
+ encoder->private_->overcommitted_indicator = 0;
+ pthread_cond_signal(&encoder->private_->cond_wake_up_thread);
+ }
+ else if(encoder->private_->overcommitted_indicator > 20 && encoder->private_->num_running_threads > 2) {
+ encoder->private_->overcommitted_indicator = 0;
+ encoder->private_->num_running_threads--;
+ pthread_cond_wait(&encoder->private_->cond_wake_up_thread, &encoder->private_->mutex_work_queue);
+ encoder->private_->num_running_threads++;
+ }
while(encoder->private_->num_available_threadtasks == 0 && !encoder->private_->md5_work_available) {
if(encoder->private_->finish_work_threads) {
pthread_mutex_unlock(&encoder->private_->mutex_work_queue);