diff options
author | Andrew Comminos <andrewcomminos@gmail.com> | 2014-09-04 04:52:49 +0400 |
---|---|---|
committer | Andrew Comminos <andrewcomminos@gmail.com> | 2014-09-04 04:52:49 +0400 |
commit | 1cc4120a3d262ce3c8ac81c310971f8a4b36b46a (patch) | |
tree | 58f46c1db857cec8c3b50658767ee52bcb482d92 | |
parent | 6ae63631dae976f6854be9b9b685a5e4ac9519a2 (diff) |
Parallelize decode process.
-rw-r--r-- | src/main/java/com/morlunk/jumble/audio/AudioOutput.java | 65 | ||||
-rw-r--r-- | src/main/java/com/morlunk/jumble/audio/AudioOutputSpeech.java | 68 |
2 files changed, 101 insertions, 32 deletions
diff --git a/src/main/java/com/morlunk/jumble/audio/AudioOutput.java b/src/main/java/com/morlunk/jumble/audio/AudioOutput.java index 4ba3674..aa36927 100644 --- a/src/main/java/com/morlunk/jumble/audio/AudioOutput.java +++ b/src/main/java/com/morlunk/jumble/audio/AudioOutput.java @@ -24,7 +24,6 @@ import android.os.Handler; import android.os.Looper; import android.os.Process; import android.util.Log; -import android.util.SparseArray; import com.morlunk.jumble.Constants; import com.morlunk.jumble.exception.AudioInitializationException; @@ -34,33 +33,45 @@ import com.morlunk.jumble.net.JumbleUDPMessageType; import com.morlunk.jumble.net.PacketBuffer; import com.morlunk.jumble.protocol.AudioHandler; -import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; /** * Created by andrew on 16/07/13. */ public class AudioOutput implements Runnable, AudioOutputSpeech.TalkStateListener { + /** The size (in samples) of the mixing buffer. */ + public static final int BUFFER_SIZE = AudioHandler.FRAME_SIZE; - private SparseArray<AudioOutputSpeech> mAudioOutputs = new SparseArray<AudioOutputSpeech>(); + private Map<Integer,AudioOutputSpeech> mAudioOutputs = new HashMap<Integer, AudioOutputSpeech>(); private AudioTrack mAudioTrack; private int mBufferSize; private Thread mThread; private final Object mInactiveLock = new Object(); // Lock that the audio thread waits on when there's no audio to play. Wake when we get a frame. private final Object mPacketLock = new Object(); private boolean mRunning = false; - private List<AudioOutputSpeech> mMixBuffer = new ArrayList<AudioOutputSpeech>(); - private List<AudioOutputSpeech> mDelBuffer = new ArrayList<AudioOutputSpeech>(); + private List<AudioOutputSpeech.Result> mMixBuffer = new ArrayList<AudioOutputSpeech.Result>(); + private List<AudioOutputSpeech.Result> mDelBuffer = new ArrayList<AudioOutputSpeech.Result>(); private Handler mMainHandler; private AudioOutputListener mListener; private int mAudioStream; + private int mNumThreads; // Set the number of decoding threads to number of cores + private ExecutorService mDecodeExecutorService; + public AudioOutput(AudioOutputListener listener, int audioStream) { mListener = listener; mAudioStream = audioStream; mMainHandler = new Handler(Looper.getMainLooper()); + mNumThreads = Runtime.getRuntime().availableProcessors(); + mDecodeExecutorService = Executors.newFixedThreadPool(mNumThreads); } public void startPlaying(boolean scoEnabled) throws AudioInitializationException { @@ -102,8 +113,8 @@ public class AudioOutput implements Runnable, AudioOutputSpeech.TalkStateListene e.printStackTrace(); } mThread = null; - for(int i = 0; i < mAudioOutputs.size(); i++) { - mAudioOutputs.valueAt(i).destroy(); + for(AudioOutputSpeech speech : mAudioOutputs.values()) { + speech.destroy(); } mAudioOutputs.clear(); @@ -122,11 +133,11 @@ public class AudioOutput implements Runnable, AudioOutputSpeech.TalkStateListene mRunning = true; mAudioTrack.play(); - final short[] mix = new short[AudioHandler.FRAME_SIZE]; + final short[] mix = new short[BUFFER_SIZE]; while(mRunning) { Arrays.fill(mix, (short)0); - boolean play = mix(mix, mix.length); + boolean play = mix(mix); if(play) { mAudioTrack.write(mix, 0, mix.length); } else { @@ -146,31 +157,43 @@ public class AudioOutput implements Runnable, AudioOutputSpeech.TalkStateListene mAudioTrack.stop(); } - private boolean mix(short[] outBuffer, int bufferSize) { + private boolean mix(short[] outBuffer) { mMixBuffer.clear(); mDelBuffer.clear(); // TODO add priority speaker support synchronized (mPacketLock) { - for(int i = 0; i < mAudioOutputs.size(); i++) { - AudioOutputSpeech speech = mAudioOutputs.valueAt(i); - if(!speech.needSamples(bufferSize)) - mDelBuffer.add(speech); - else - mMixBuffer.add(speech); + try { + // Parallelize decoding using a fixed thread pool equal to the number of cores + List<Future<AudioOutputSpeech.Result>> futureResults = mDecodeExecutorService.invokeAll(mAudioOutputs.values()); + for(Future<AudioOutputSpeech.Result> future : futureResults) { + AudioOutputSpeech.Result result = future.get(); + if(!result.isAlive()) { + mDelBuffer.add(result); + } else { + mMixBuffer.add(result); + } + } + } catch (InterruptedException e) { + e.printStackTrace(); + return false; + } catch (ExecutionException e) { + e.printStackTrace(); + return false; } if(!mMixBuffer.isEmpty()) { - for(AudioOutputSpeech speech : mMixBuffer) { - float[] buffer = speech.getBuffer(); - for(int i = 0; i < bufferSize; i++) { + for(AudioOutputSpeech.Result result : mMixBuffer) { + float[] buffer = result.getSamples(); + for(int i = 0; i < BUFFER_SIZE; i++) { short pcm = (short) (buffer[i]*Short.MAX_VALUE); // Convert float to short pcm = pcm <= Short.MAX_VALUE ? (pcm >= Short.MIN_VALUE ? pcm : Short.MIN_VALUE) : Short.MIN_VALUE; // Clip audio outBuffer[i] += pcm; } } } - for(AudioOutputSpeech speech : mDelBuffer) { + for(AudioOutputSpeech.Result result : mDelBuffer) { + AudioOutputSpeech speech = result.getSpeechOutput(); Log.v(Constants.TAG, "Deleted audio user "+speech.getUser().getName()); mAudioOutputs.remove(speech.getSession()); speech.destroy(); @@ -202,7 +225,7 @@ public class AudioOutput implements Runnable, AudioOutputSpeech.TalkStateListene } if(aop == null) { try { - aop = new AudioOutputSpeech(user, messageType, this); + aop = new AudioOutputSpeech(user, messageType, BUFFER_SIZE, this); } catch (NativeAudioException e) { Log.v(Constants.TAG, "Failed to create audio user "+user.getName()); e.printStackTrace(); diff --git a/src/main/java/com/morlunk/jumble/audio/AudioOutputSpeech.java b/src/main/java/com/morlunk/jumble/audio/AudioOutputSpeech.java index fd4380b..cbbc70d 100644 --- a/src/main/java/com/morlunk/jumble/audio/AudioOutputSpeech.java +++ b/src/main/java/com/morlunk/jumble/audio/AudioOutputSpeech.java @@ -33,12 +33,15 @@ import java.nio.BufferUnderflowException; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Queue; +import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.RunnableFuture; /** * Created by andrew on 16/07/13. */ -public class AudioOutputSpeech { +public class AudioOutputSpeech implements Callable<AudioOutputSpeech.Result> { + interface TalkStateListener { public void onTalkStateUpdated(int session, User.TalkState state); } @@ -50,6 +53,7 @@ public class AudioOutputSpeech { private User mUser; private JumbleUDPMessageType mCodec; private int mAudioBufferSize = AudioHandler.FRAME_SIZE; + private int mRequestedSamples; // Number of samples requested // State-specific private float[] mBuffer; @@ -66,10 +70,11 @@ public class AudioOutputSpeech { private TalkStateListener mTalkStateListener; - public AudioOutputSpeech(User user, JumbleUDPMessageType codec, TalkStateListener listener) throws NativeAudioException { + public AudioOutputSpeech(User user, JumbleUDPMessageType codec, int requestedSamples, TalkStateListener listener) throws NativeAudioException { // TODO: consider implementing resampling if some Android devices not support 48kHz? mUser = user; mCodec = codec; + mRequestedSamples = requestedSamples; mTalkStateListener = listener; switch (codec) { case UDPVoiceOpus: @@ -145,19 +150,20 @@ public class AudioOutputSpeech { } } - public boolean needSamples(int num) { + @Override + public Result call() throws Exception { for(int i = mLastConsume; i < mBufferFilled; ++i) mBuffer[i-mLastConsume] = mBuffer[i]; mBufferFilled -= mLastConsume; - mLastConsume = num; + mLastConsume = mRequestedSamples; - if(mBufferFilled >= num) - return mLastAlive; + if(mBufferFilled >= mRequestedSamples) + return new Result(this, mLastAlive, mBuffer, mBufferFilled); boolean nextAlive = mLastAlive; - while(mBufferFilled < num) { + while(mBufferFilled < mRequestedSamples) { int decodedSamples = AudioHandler.FRAME_SIZE; resizeBuffer(mBufferFilled + mAudioBufferSize); @@ -311,18 +317,22 @@ public class AudioOutputSpeech { boolean tmp = mLastAlive; mLastAlive = nextAlive; - return tmp; + return new Result(this, tmp, mBuffer, mBufferFilled); } - public void resizeBuffer(int newSize) { + private void resizeBuffer(int newSize) { if(newSize > mBuffer.length) { float[] newBuffer = Arrays.copyOf(mBuffer, newSize); mBuffer = newBuffer; } } - public float[] getBuffer() { - return mBuffer; + /** + * Sets the preferred number of samples to return when the callable is executed. + * @param samples The number of floating point samples to retrieve. + */ + public void setRequestedSamples(int samples) { + mRequestedSamples = samples; } public JumbleUDPMessageType getCodec() { @@ -345,4 +355,40 @@ public class AudioOutputSpeech { if(mDecoder != null) mDecoder.destroy(); mJitterBuffer.destroy(); } + + /** + * The outcome of a decoding pass. + */ + protected static class Result { + private AudioOutputSpeech mSpeechOutput; + private boolean mAlive; + private float[] mSamples; + private int mNumSamples; + + private Result(AudioOutputSpeech speechOutput, + boolean alive, + float[] samples, + int numSamples) { + mSpeechOutput = speechOutput; + mAlive = alive; + mSamples = samples; + mNumSamples = numSamples; + } + + public AudioOutputSpeech getSpeechOutput() { + return mSpeechOutput; + } + + public boolean isAlive() { + return mAlive; + } + + public float[] getSamples() { + return mSamples; + } + + public int getNumSamples() { + return mNumSamples; + } + } } |