From c8d28648629384ab345e4a6c08f3fe5780a02f2d Mon Sep 17 00:00:00 2001 From: Christopher Snowhill Date: Thu, 13 Jan 2022 23:05:32 -0800 Subject: [PATCH] Cog Audio: Enhance playback queue handler, so it always halts buffering when there are at least 30 seconds worth of buffers filled, possibly spanning multiple files. Also improve the chain reset function so that playlist changes and playback order control reset the queue properly when the queue refill function is currently entered in another thread. --- Audio/AudioPlayer.h | 9 ++++ Audio/AudioPlayer.m | 88 +++++++++++++++++++++++++------------ Audio/Chain/BufferChain.h | 2 + Audio/Chain/BufferChain.m | 11 +++++ Audio/Chain/ConverterNode.m | 5 +++ Audio/Chain/InputNode.m | 6 +++ Audio/Chain/Node.h | 2 + Audio/Chain/Node.m | 6 +++ 8 files changed, 102 insertions(+), 27 deletions(-) diff --git a/Audio/AudioPlayer.h b/Audio/AudioPlayer.h index 7559e0473..e16095885 100644 --- a/Audio/AudioPlayer.h +++ b/Audio/AudioPlayer.h @@ -8,6 +8,10 @@ #import +#import + +#import + @class BufferChain; @class OutputNode; @@ -30,6 +34,11 @@ BOOL endOfInputReached; BOOL startedPaused; BOOL initialBufferFilled; + + Semaphore *semaphore; + + atomic_bool resettingNow; + atomic_int refCount; } - (id)init; diff --git a/Audio/AudioPlayer.m b/Audio/AudioPlayer.m index b955326d2..29e2107d4 100644 --- a/Audio/AudioPlayer.m +++ b/Audio/AudioPlayer.m @@ -29,6 +29,11 @@ endOfInputReached = NO; chainQueue = [[NSMutableArray alloc] init]; + + semaphore = [[Semaphore alloc] init]; + + atomic_init(&resettingNow, false); + atomic_init(&refCount, 0); } return self; @@ -189,6 +194,21 @@ // Called when the playlist changed before we actually started playing a requested stream. We will re-request. - (void)resetNextStreams { + // This sucks! And since the thread that's inside the function can be calling + // event dispatches, we have to pump the message queue if we're on the main + // thread. Damn. + if (atomic_load_explicit(&refCount, memory_order_relaxed) != 0) { + BOOL mainThread = (dispatch_queue_get_label(dispatch_get_main_queue()) == dispatch_queue_get_label(DISPATCH_CURRENT_QUEUE_LABEL)); + atomic_store(&resettingNow, true); + while (atomic_load_explicit(&refCount, memory_order_relaxed) != 0) { + [semaphore signal]; // Gotta poke this periodically + if (mainThread) + [[NSRunLoop currentRunLoop] runUntilDate:[NSDate dateWithTimeIntervalSinceNow:0.001]]; + else + usleep(500); + } + atomic_store(&resettingNow, false); + } @synchronized (chainQueue) { for (id anObject in chainQueue) { [anObject setShouldContinue:NO]; @@ -246,31 +266,14 @@ } - (BOOL)endOfInputReached:(BufferChain *)sender //Sender is a BufferChain -{ - // Stop single or series of short tracks from queueing forever - { - unsigned long queueCount; - - @synchronized (chainQueue) { - queueCount = [chainQueue count]; - } - - while (queueCount >= 5) - { - usleep(10000); - @synchronized (chainQueue) { - queueCount = [chainQueue count]; - } - } - } - - return [self endOfInputReachedInternal:sender]; -} - -- (BOOL)endOfInputReachedInternal:(BufferChain *)sender //Sender is a BufferChain { BufferChain *newChain = nil; + if (atomic_load_explicit(&resettingNow, memory_order_relaxed)) + return YES; + + atomic_fetch_add(&refCount, 1); + @synchronized (chainQueue) { // No point in constructing new chain for the next playlist entry // if there's already one at the head of chainQueue... r-r-right? @@ -278,6 +281,7 @@ { if ([chain isRunning]) { + atomic_fetch_sub(&refCount, 1); return YES; } } @@ -287,17 +291,42 @@ //{ // return YES; //} - - nextStreamUserInfo = [sender userInfo]; - - nextStreamRGInfo = [sender rgInfo]; } + double duration = 0.0; + + @synchronized (chainQueue) { + for (BufferChain *chain in chainQueue) { + duration += [chain secondsBuffered]; + } + } + + while (duration >= 30.0) + { + [semaphore wait]; + if (atomic_load_explicit(&resettingNow, memory_order_relaxed)) { + atomic_fetch_sub(&refCount, 1); + return YES; + } + @synchronized (chainQueue) { + duration = 0.0; + for (BufferChain *chain in chainQueue) { + duration += [chain secondsBuffered]; + } + } + } + + nextStreamUserInfo = [sender userInfo]; + + nextStreamRGInfo = [sender rgInfo]; + // This call can sometimes lead to invoking a chainQueue block on another thread [self requestNextStream: nextStreamUserInfo]; - if (!nextStream) + if (!nextStream) { + atomic_fetch_sub(&refCount, 1); return YES; + } @synchronized (chainQueue) { newChain = [[BufferChain alloc] initWithController:self]; @@ -326,6 +355,7 @@ //Keep on-playin newChain = nil; + atomic_fetch_sub(&refCount, 1); return NO; } } @@ -337,6 +367,7 @@ if (nextStream == nil) { newChain = nil; + atomic_fetch_sub(&refCount, 1); return YES; } @@ -362,6 +393,7 @@ // - head of chainQueue is the buffer chain for the next entry (which has launched its threads already) } + atomic_fetch_sub(&refCount, 1); return YES; } @@ -390,6 +422,8 @@ [chainQueue removeObjectAtIndex:0]; DLog(@"New!!! %@ %@", bufferChain, [[bufferChain inputNode] decoder]); + + [semaphore signal]; } [self notifyStreamChanged:[bufferChain userInfo]]; diff --git a/Audio/Chain/BufferChain.h b/Audio/Chain/BufferChain.h index d1888127f..05a71b9dc 100644 --- a/Audio/Chain/BufferChain.h +++ b/Audio/Chain/BufferChain.h @@ -73,4 +73,6 @@ - (ConverterNode *)converter; - (AudioStreamBasicDescription)inputFormat; +- (double)secondsBuffered; + @end diff --git a/Audio/Chain/BufferChain.m b/Audio/Chain/BufferChain.m index 31389aced..244251048 100644 --- a/Audio/Chain/BufferChain.m +++ b/Audio/Chain/BufferChain.m @@ -228,4 +228,15 @@ return inputFormat; } +- (double)secondsBuffered +{ + double duration = 0.0; + Node * node = [self finalNode]; + while (node) { + duration += [node secondsBuffered]; + node = [node previousNode]; + } + return duration; +} + @end diff --git a/Audio/Chain/ConverterNode.m b/Audio/Chain/ConverterNode.m index 5190e1fb5..85cb977a2 100644 --- a/Audio/Chain/ConverterNode.m +++ b/Audio/Chain/ConverterNode.m @@ -1000,4 +1000,9 @@ static float db_to_scale(float db) floatSize = 0; } +- (double) secondsBuffered +{ + return ((double)[buffer bufferedLength] / (outputFormat.mSampleRate * outputFormat.mBytesPerPacket)); +} + @end diff --git a/Audio/Chain/InputNode.m b/Audio/Chain/InputNode.m index 5fa51a627..8568dc501 100644 --- a/Audio/Chain/InputNode.m +++ b/Audio/Chain/InputNode.m @@ -236,4 +236,10 @@ return decoder; } +- (double) secondsBuffered +{ + AudioStreamBasicDescription inputFormat = [[[controller controller] bufferChain] inputFormat]; + return ((double)[buffer bufferedLength] / (inputFormat.mSampleRate * inputFormat.mBytesPerPacket)); +} + @end diff --git a/Audio/Chain/Node.h b/Audio/Chain/Node.h index 1e1ebfc9f..778dea9c0 100644 --- a/Audio/Chain/Node.h +++ b/Audio/Chain/Node.h @@ -60,4 +60,6 @@ - (BOOL)endOfStream; - (void)setEndOfStream:(BOOL)e; +- (double)secondsBuffered; + @end diff --git a/Audio/Chain/Node.m b/Audio/Chain/Node.m index 8b2efb99d..e8d935ad7 100644 --- a/Audio/Chain/Node.m +++ b/Audio/Chain/Node.m @@ -219,5 +219,11 @@ return shouldReset; } +// Buffering nodes should implement this +- (double)secondsBuffered +{ + return 0.0; +} + @end