Audio: Improve buffer signaling

This should stop the deadlocks which were occurring.

Signed-off-by: Christopher Snowhill <kode54@gmail.com>
This commit is contained in:
Christopher Snowhill 2025-02-13 19:55:39 -08:00
parent 4cd5cb8fa7
commit b0e6ec98a9
13 changed files with 252 additions and 54 deletions

View file

@ -201,7 +201,7 @@
- (void)dealloc {
[inputNode setShouldContinue:NO];
[[inputNode exitAtTheEndOfTheStream] signal];
[[inputNode semaphore] signal];
[[inputNode writeSemaphore] signal];
if(![inputNode threadExited])
[[inputNode exitAtTheEndOfTheStream] wait]; // wait for decoder to be closed (see InputNode's -(void)process )

View file

@ -459,7 +459,7 @@ static void convert_be_to_le(uint8_t *buffer, size_t bitsPerSample, size_t bytes
- (BOOL)isFull {
@synchronized (chunkList) {
return (maxDuration - listDuration) < 0.05;
return (maxDuration - listDuration) < 0.001;
}
}
@ -588,7 +588,6 @@ static void convert_be_to_le(uint8_t *buffer, size_t bitsPerSample, size_t bytes
if(block()) {
break;
}
usleep(500);
continue;
}
if(formatSet &&
@ -608,7 +607,6 @@ static void convert_be_to_le(uint8_t *buffer, size_t bitsPerSample, size_t bytes
if(block()) {
break;
}
usleep(500);
continue;
}

View file

@ -482,6 +482,7 @@ static float db_to_scale(float db) {
paused = NO;
[self cleanUp];
[super cleanUp];
}
- (void)setOutputFormat:(AudioStreamBasicDescription)format {

View file

@ -38,6 +38,7 @@
- (void)dealloc {
DLog(@"Downmix dealloc");
[self cleanUp];
[super cleanUp];
}
- (BOOL)fullInit {

View file

@ -157,6 +157,7 @@ static OSStatus eqRenderCallback(void *inRefCon, AudioUnitRenderActionFlags *ioA
DLog(@"Equalizer dealloc");
[self cleanUp];
[self removeObservers];
[super cleanUp];
}
- (void)addObservers {

View file

@ -55,6 +55,7 @@ static void * kDSPFSurroundNodeContext = &kDSPFSurroundNodeContext;
DLog(@"FreeSurround dealloc");
[self cleanUp];
[self removeObservers];
[super cleanUp];
}
- (void)addObservers {

View file

@ -130,6 +130,7 @@ static void unregisterMotionListener(void) {
DLog(@"HRTF dealloc");
[self cleanUp];
[self removeObservers];
[super cleanUp];
}
- (void)addObservers {

View file

@ -65,6 +65,7 @@ static void * kDSPRubberbandNodeContext = &kDSPRubberbandNodeContext;
DLog(@"Rubber Band dealloc");
[self cleanUp];
[self removeObservers];
[super cleanUp];
}
- (void)addObservers {

View file

@ -16,7 +16,8 @@
if(self) {
buffer = [[ChunkList alloc] initWithMaximumDuration:latency];
semaphore = [[Semaphore alloc] init];
writeSemaphore = [[Semaphore alloc] init];
readSemaphore = [[Semaphore alloc] init];
accessLock = [[NSLock alloc] init];
@ -31,6 +32,11 @@
durationPrebuffer = latency * 0.25;
inWrite = NO;
inPeek = NO;
inRead = NO;
inMerge = NO;
[self setPreviousNode:p];
}

View file

@ -246,7 +246,7 @@ static void *kInputNodeContext = &kInputNodeContext;
seekFrame = frame;
shouldSeek = YES;
DLog(@"Should seek!");
[semaphore signal];
[writeSemaphore signal];
if(endOfStream) {
[exitAtTheEndOfTheStream signal];
@ -280,6 +280,7 @@ static void *kInputNodeContext = &kInputNodeContext;
- (void)dealloc {
DLog(@"Input Node dealloc");
[self removeObservers];
[super cleanUp];
}
- (NSDictionary *)properties {

View file

@ -17,7 +17,8 @@
@interface Node : NSObject {
ChunkList *buffer;
Semaphore *semaphore;
Semaphore *writeSemaphore;
Semaphore *readSemaphore;
NSLock *accessLock;
@ -26,6 +27,11 @@
BOOL shouldReset;
BOOL inWrite;
BOOL inPeek;
BOOL inRead;
BOOL inMerge;
BOOL shouldContinue;
BOOL endOfStream; // All data is now in buffer
BOOL initialBufferFilled;
@ -38,6 +44,8 @@
}
- (id _Nullable)initWithController:(id _Nonnull)c previous:(id _Nullable)p;
- (void)cleanUp;
- (void)writeData:(const void *_Nonnull)ptr amount:(size_t)a;
- (void)writeChunk:(AudioChunk *_Nonnull)chunk;
- (AudioChunk *_Nonnull)readChunk:(size_t)maxFrames;
@ -70,7 +78,8 @@
- (uint32_t)nodeChannelConfig;
- (BOOL)nodeLossless;
- (Semaphore *_Nonnull)semaphore;
- (Semaphore *_Nonnull)writeSemaphore;
- (Semaphore *_Nonnull)readSemaphore;
//-(void)resetBuffer;

View file

@ -23,7 +23,8 @@
self = [super init];
if(self) {
buffer = [[ChunkList alloc] initWithMaximumDuration:10.0];
semaphore = [[Semaphore alloc] init];
writeSemaphore = [[Semaphore alloc] init];
readSemaphore = [[Semaphore alloc] init];
accessLock = [[NSLock alloc] init];
@ -38,12 +39,32 @@
durationPrebuffer = 2.0;
inWrite = NO;
inPeek = NO;
inRead = NO;
inMerge = NO;
[self setPreviousNode:p];
}
return self;
}
- (void)dealloc {
[self cleanUp];
}
- (void)cleanUp {
[self setShouldContinue:NO];
while(inWrite || inPeek || inRead || inMerge) {
[writeSemaphore signal];
if(previousNode) {
[[previousNode readSemaphore] signal];
}
usleep(500);
}
}
- (AudioStreamBasicDescription)nodeFormat {
return nodeFormat;
}
@ -57,6 +78,12 @@
}
- (void)writeData:(const void *)ptr amount:(size_t)amount {
inWrite = YES;
if(!shouldContinue) {
inWrite = NO;
return;
}
[accessLock lock];
AudioChunk *chunk = [[AudioChunk alloc] init];
@ -78,22 +105,38 @@
}
}
while(shouldContinue == YES && durationLeft <= 0.0) {
if(durationLeft <= 0.0 || shouldReset) {
while(shouldContinue == YES && durationLeft < 0.0) {
if(durationLeft < 0.0 || shouldReset) {
[accessLock unlock];
[semaphore wait];
[writeSemaphore timedWait:2000];
[accessLock lock];
}
durationLeft = [buffer maxDuration] - [buffer listDuration];
}
BOOL doSignal = NO;
if([chunk frameCount]) {
[buffer addChunk:chunk];
doSignal = YES;
}
[accessLock unlock];
if(doSignal) {
[readSemaphore signal];
}
inWrite = NO;
}
- (void)writeChunk:(AudioChunk *)chunk {
inWrite = YES;
if(!shouldContinue) {
inWrite = NO;
return;
}
[accessLock lock];
double durationList = [buffer listDuration];
@ -107,24 +150,34 @@
}
}
while(shouldContinue == YES && durationLeft <= 0.0) {
while(shouldContinue == YES && durationLeft < 0.0) {
if(previousNode && [previousNode shouldContinue] == NO) {
shouldContinue = NO;
break;
}
if(durationLeft <= 0.0 || shouldReset) {
if(durationLeft < 0.0 || shouldReset) {
[accessLock unlock];
[semaphore wait];
[writeSemaphore timedWait:2000];
[accessLock lock];
}
durationLeft = [buffer maxDuration] - [buffer listDuration];
}
BOOL doSignal = NO;
if([chunk frameCount]) {
[buffer addChunk:chunk];
doSignal = YES;
}
[accessLock unlock];
if(doSignal) {
[readSemaphore signal];
}
inWrite = NO;
}
// Should be overwriten by subclass.
@ -138,11 +191,30 @@
}
- (BOOL)peekFormat:(nonnull AudioStreamBasicDescription *)format channelConfig:(nonnull uint32_t *)config {
inPeek = YES;
if(!shouldContinue) {
inPeek = NO;
return NO;
}
[accessLock lock];
while(shouldContinue && [[previousNode buffer] isEmpty] && [previousNode endOfStream] == NO) {
[accessLock unlock];
[[previousNode readSemaphore] timedWait:2000];
[accessLock lock];
}
if(!shouldContinue) {
[accessLock unlock];
inPeek = NO;
return NO;
}
if([[previousNode buffer] isEmpty] && [previousNode endOfStream] == YES) {
endOfStream = YES;
[accessLock unlock];
inPeek = NO;
return NO;
}
@ -150,15 +222,36 @@
[accessLock unlock];
inPeek = NO;
return ret;
}
- (BOOL)peekTimestamp:(double *_Nonnull)timestamp timeRatio:(double *_Nonnull)timeRatio {
inPeek = YES;
if(!shouldContinue) {
inPeek = NO;
return NO;
}
[accessLock lock];
while(shouldContinue && [[previousNode buffer] isEmpty] && [previousNode endOfStream] == NO) {
[accessLock unlock];
[[previousNode readSemaphore] timedWait:2000];
[accessLock lock];
}
if(!shouldContinue) {
[accessLock unlock];
inPeek = NO;
return NO;
}
if([[previousNode buffer] isEmpty] && [previousNode endOfStream] == YES) {
endOfStream = YES;
[accessLock unlock];
inPeek = NO;
return NO;
}
@ -166,15 +259,39 @@
[accessLock unlock];
inPeek = NO;
return ret;
}
- (AudioChunk *)readChunk:(size_t)maxFrames {
inRead = YES;
if(!shouldContinue) {
inRead = NO;
return [[AudioChunk alloc] init];
}
[accessLock lock];
while(shouldContinue && [[previousNode buffer] isEmpty] && [previousNode endOfStream] == NO) {
[accessLock unlock];
[[previousNode readSemaphore] timedWait:2000];
[accessLock lock];
if([previousNode shouldReset] == YES) {
break;
}
}
if(!shouldContinue) {
[accessLock unlock];
inRead = NO;
return [[AudioChunk alloc] init];
}
if([[previousNode buffer] isEmpty] && [previousNode endOfStream] == YES) {
endOfStream = YES;
[accessLock unlock];
inRead = NO;
return [[AudioChunk alloc] init];
}
@ -186,7 +303,7 @@
shouldReset = YES;
[previousNode setShouldReset:NO];
[[previousNode semaphore] signal];
[[previousNode writeSemaphore] signal];
}
AudioChunk *ret;
@ -198,18 +315,42 @@
[accessLock unlock];
if([ret frameCount]) {
[[previousNode semaphore] signal];
[[previousNode writeSemaphore] signal];
}
inRead = NO;
return ret;
}
- (AudioChunk *)readChunkAsFloat32:(size_t)maxFrames {
inRead = YES;
if(!shouldContinue) {
inRead = NO;
return [[AudioChunk alloc] init];
}
[accessLock lock];
while(shouldContinue && [[previousNode buffer] isEmpty] && [previousNode endOfStream] == NO) {
[accessLock unlock];
[[previousNode readSemaphore] timedWait:2000];
[accessLock lock];
if([previousNode shouldReset] == YES) {
break;
}
}
if(!shouldContinue) {
[accessLock unlock];
inRead = NO;
return [[AudioChunk alloc] init];
}
if([[previousNode buffer] isEmpty] && [previousNode endOfStream] == YES) {
endOfStream = YES;
[accessLock unlock];
inRead = NO;
return [[AudioChunk alloc] init];
}
@ -221,7 +362,7 @@
shouldReset = YES;
[previousNode setShouldReset:NO];
[[previousNode semaphore] signal];
[[previousNode writeSemaphore] signal];
}
AudioChunk *ret;
@ -233,58 +374,34 @@
[accessLock unlock];
if([ret frameCount]) {
[[previousNode semaphore] signal];
[[previousNode writeSemaphore] signal];
}
inRead = NO;
return ret;
}
- (AudioChunk *)readAndMergeChunks:(size_t)maxFrames {
inMerge = YES;
if(!shouldContinue) {
inMerge = NO;
return [[AudioChunk alloc] init];
}
[accessLock lock];
if([[previousNode buffer] isEmpty] && [previousNode endOfStream] == YES) {
endOfStream = YES;
[accessLock unlock];
inMerge = NO;
return [[AudioChunk alloc] init];
}
if([previousNode shouldReset] == YES) {
@autoreleasepool {
[buffer reset];
}
shouldReset = YES;
[previousNode setShouldReset:NO];
[[previousNode semaphore] signal];
}
[accessLock unlock];
AudioChunk *ret;
@autoreleasepool {
ret = [[previousNode buffer] removeAndMergeSamples:maxFrames callBlock:^BOOL{
return [[previousNode buffer] isEmpty] && [previousNode endOfStream] == YES;
}];
}
if([ret frameCount]) {
[[previousNode semaphore] signal];
}
return ret;
}
- (AudioChunk *)readAndMergeChunksAsFloat32:(size_t)maxFrames {
[accessLock lock];
if([[previousNode buffer] isEmpty] && [previousNode endOfStream] == YES) {
endOfStream = YES;
[accessLock unlock];
return [[AudioChunk alloc] init];
}
if([previousNode shouldReset] == YES) {
@autoreleasepool {
[buffer reset];
@ -292,24 +409,74 @@
shouldReset = YES;
[previousNode setShouldReset:NO];
[[previousNode semaphore] signal];
}
[accessLock unlock];
[[previousNode writeSemaphore] signal];
[[previousNode readSemaphore] timedWait:2000];
[accessLock lock];
return !shouldContinue || ([[previousNode buffer] isEmpty] && [previousNode endOfStream] == YES);
}];
}
[accessLock unlock];
if([ret frameCount]) {
[[previousNode writeSemaphore] signal];
}
inMerge = NO;
return ret;
}
- (AudioChunk *)readAndMergeChunksAsFloat32:(size_t)maxFrames {
inMerge = YES;
if(!shouldContinue) {
inMerge = NO;
return [[AudioChunk alloc] init];
}
[accessLock lock];
if([[previousNode buffer] isEmpty] && [previousNode endOfStream] == YES) {
endOfStream = YES;
[accessLock unlock];
inMerge = NO;
return [[AudioChunk alloc] init];
}
AudioChunk *ret;
@autoreleasepool {
ret = [[previousNode buffer] removeAndMergeSamplesAsFloat32:maxFrames callBlock:^BOOL{
return [[previousNode buffer] isEmpty] && [previousNode endOfStream] == YES;
if([previousNode shouldReset] == YES) {
@autoreleasepool {
[buffer reset];
}
shouldReset = YES;
[previousNode setShouldReset:NO];
}
[accessLock unlock];
[[previousNode writeSemaphore] signal];
[[previousNode readSemaphore] timedWait:2000];
[accessLock lock];
return !shouldContinue || ([[previousNode buffer] isEmpty] && [previousNode endOfStream] == YES);
}];
}
[accessLock unlock];
if([ret frameCount]) {
[[previousNode semaphore] signal];
[[previousNode writeSemaphore] signal];
}
inMerge = NO;
return ret;
}
@ -348,8 +515,12 @@
}
}
- (Semaphore *)semaphore {
return semaphore;
- (Semaphore *)writeSemaphore {
return writeSemaphore;
}
- (Semaphore *)readSemaphore {
return readSemaphore;
}
- (BOOL)endOfStream {

View file

@ -111,7 +111,8 @@ static VisualizationCollection *theCollection = nil;
if(self) {
buffer = [[ChunkList alloc] initWithMaximumDuration:latency];
semaphore = [[Semaphore alloc] init];
writeSemaphore = [[Semaphore alloc] init];
readSemaphore = [[Semaphore alloc] init];
accessLock = [[NSLock alloc] init];
@ -133,6 +134,11 @@ static VisualizationCollection *theCollection = nil;
replay = NO;
prerollBuffer = [[NSMutableData alloc] init];
inWrite = NO;
inPeek = NO;
inRead = NO;
inMerge = NO;
[self setPreviousNode:p];
}
@ -143,6 +149,7 @@ static VisualizationCollection *theCollection = nil;
DLog(@"Visualization node dealloc");
[self cleanUp];
[self pop];
[super cleanUp];
}
- (void)pop {