Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/Morlunk/Jumble.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndrew Comminos <andrewcomminos@gmail.com>2014-09-04 04:52:49 +0400
committerAndrew Comminos <andrewcomminos@gmail.com>2014-09-04 04:52:49 +0400
commit1cc4120a3d262ce3c8ac81c310971f8a4b36b46a (patch)
tree58f46c1db857cec8c3b50658767ee52bcb482d92
parent6ae63631dae976f6854be9b9b685a5e4ac9519a2 (diff)
Parallelize decode process.
-rw-r--r--src/main/java/com/morlunk/jumble/audio/AudioOutput.java65
-rw-r--r--src/main/java/com/morlunk/jumble/audio/AudioOutputSpeech.java68
2 files changed, 101 insertions, 32 deletions
diff --git a/src/main/java/com/morlunk/jumble/audio/AudioOutput.java b/src/main/java/com/morlunk/jumble/audio/AudioOutput.java
index 4ba3674..aa36927 100644
--- a/src/main/java/com/morlunk/jumble/audio/AudioOutput.java
+++ b/src/main/java/com/morlunk/jumble/audio/AudioOutput.java
@@ -24,7 +24,6 @@ import android.os.Handler;
import android.os.Looper;
import android.os.Process;
import android.util.Log;
-import android.util.SparseArray;
import com.morlunk.jumble.Constants;
import com.morlunk.jumble.exception.AudioInitializationException;
@@ -34,33 +33,45 @@ import com.morlunk.jumble.net.JumbleUDPMessageType;
import com.morlunk.jumble.net.PacketBuffer;
import com.morlunk.jumble.protocol.AudioHandler;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
/**
* Created by andrew on 16/07/13.
*/
public class AudioOutput implements Runnable, AudioOutputSpeech.TalkStateListener {
+ /** The size (in samples) of the mixing buffer. */
+ public static final int BUFFER_SIZE = AudioHandler.FRAME_SIZE;
- private SparseArray<AudioOutputSpeech> mAudioOutputs = new SparseArray<AudioOutputSpeech>();
+ private Map<Integer,AudioOutputSpeech> mAudioOutputs = new HashMap<Integer, AudioOutputSpeech>();
private AudioTrack mAudioTrack;
private int mBufferSize;
private Thread mThread;
private final Object mInactiveLock = new Object(); // Lock that the audio thread waits on when there's no audio to play. Wake when we get a frame.
private final Object mPacketLock = new Object();
private boolean mRunning = false;
- private List<AudioOutputSpeech> mMixBuffer = new ArrayList<AudioOutputSpeech>();
- private List<AudioOutputSpeech> mDelBuffer = new ArrayList<AudioOutputSpeech>();
+ private List<AudioOutputSpeech.Result> mMixBuffer = new ArrayList<AudioOutputSpeech.Result>();
+ private List<AudioOutputSpeech.Result> mDelBuffer = new ArrayList<AudioOutputSpeech.Result>();
private Handler mMainHandler;
private AudioOutputListener mListener;
private int mAudioStream;
+ private int mNumThreads; // Set the number of decoding threads to number of cores
+ private ExecutorService mDecodeExecutorService;
+
public AudioOutput(AudioOutputListener listener, int audioStream) {
mListener = listener;
mAudioStream = audioStream;
mMainHandler = new Handler(Looper.getMainLooper());
+ mNumThreads = Runtime.getRuntime().availableProcessors();
+ mDecodeExecutorService = Executors.newFixedThreadPool(mNumThreads);
}
public void startPlaying(boolean scoEnabled) throws AudioInitializationException {
@@ -102,8 +113,8 @@ public class AudioOutput implements Runnable, AudioOutputSpeech.TalkStateListene
e.printStackTrace();
}
mThread = null;
- for(int i = 0; i < mAudioOutputs.size(); i++) {
- mAudioOutputs.valueAt(i).destroy();
+ for(AudioOutputSpeech speech : mAudioOutputs.values()) {
+ speech.destroy();
}
mAudioOutputs.clear();
@@ -122,11 +133,11 @@ public class AudioOutput implements Runnable, AudioOutputSpeech.TalkStateListene
mRunning = true;
mAudioTrack.play();
- final short[] mix = new short[AudioHandler.FRAME_SIZE];
+ final short[] mix = new short[BUFFER_SIZE];
while(mRunning) {
Arrays.fill(mix, (short)0);
- boolean play = mix(mix, mix.length);
+ boolean play = mix(mix);
if(play) {
mAudioTrack.write(mix, 0, mix.length);
} else {
@@ -146,31 +157,43 @@ public class AudioOutput implements Runnable, AudioOutputSpeech.TalkStateListene
mAudioTrack.stop();
}
- private boolean mix(short[] outBuffer, int bufferSize) {
+ private boolean mix(short[] outBuffer) {
mMixBuffer.clear();
mDelBuffer.clear();
// TODO add priority speaker support
synchronized (mPacketLock) {
- for(int i = 0; i < mAudioOutputs.size(); i++) {
- AudioOutputSpeech speech = mAudioOutputs.valueAt(i);
- if(!speech.needSamples(bufferSize))
- mDelBuffer.add(speech);
- else
- mMixBuffer.add(speech);
+ try {
+ // Parallelize decoding using a fixed thread pool equal to the number of cores
+ List<Future<AudioOutputSpeech.Result>> futureResults = mDecodeExecutorService.invokeAll(mAudioOutputs.values());
+ for(Future<AudioOutputSpeech.Result> future : futureResults) {
+ AudioOutputSpeech.Result result = future.get();
+ if(!result.isAlive()) {
+ mDelBuffer.add(result);
+ } else {
+ mMixBuffer.add(result);
+ }
+ }
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ return false;
+ } catch (ExecutionException e) {
+ e.printStackTrace();
+ return false;
}
if(!mMixBuffer.isEmpty()) {
- for(AudioOutputSpeech speech : mMixBuffer) {
- float[] buffer = speech.getBuffer();
- for(int i = 0; i < bufferSize; i++) {
+ for(AudioOutputSpeech.Result result : mMixBuffer) {
+ float[] buffer = result.getSamples();
+ for(int i = 0; i < BUFFER_SIZE; i++) {
short pcm = (short) (buffer[i]*Short.MAX_VALUE); // Convert float to short
pcm = pcm <= Short.MAX_VALUE ? (pcm >= Short.MIN_VALUE ? pcm : Short.MIN_VALUE) : Short.MIN_VALUE; // Clip audio
outBuffer[i] += pcm;
}
}
}
- for(AudioOutputSpeech speech : mDelBuffer) {
+ for(AudioOutputSpeech.Result result : mDelBuffer) {
+ AudioOutputSpeech speech = result.getSpeechOutput();
Log.v(Constants.TAG, "Deleted audio user "+speech.getUser().getName());
mAudioOutputs.remove(speech.getSession());
speech.destroy();
@@ -202,7 +225,7 @@ public class AudioOutput implements Runnable, AudioOutputSpeech.TalkStateListene
}
if(aop == null) {
try {
- aop = new AudioOutputSpeech(user, messageType, this);
+ aop = new AudioOutputSpeech(user, messageType, BUFFER_SIZE, this);
} catch (NativeAudioException e) {
Log.v(Constants.TAG, "Failed to create audio user "+user.getName());
e.printStackTrace();
diff --git a/src/main/java/com/morlunk/jumble/audio/AudioOutputSpeech.java b/src/main/java/com/morlunk/jumble/audio/AudioOutputSpeech.java
index fd4380b..cbbc70d 100644
--- a/src/main/java/com/morlunk/jumble/audio/AudioOutputSpeech.java
+++ b/src/main/java/com/morlunk/jumble/audio/AudioOutputSpeech.java
@@ -33,12 +33,15 @@ import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Queue;
+import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.RunnableFuture;
/**
* Created by andrew on 16/07/13.
*/
-public class AudioOutputSpeech {
+public class AudioOutputSpeech implements Callable<AudioOutputSpeech.Result> {
+
interface TalkStateListener {
public void onTalkStateUpdated(int session, User.TalkState state);
}
@@ -50,6 +53,7 @@ public class AudioOutputSpeech {
private User mUser;
private JumbleUDPMessageType mCodec;
private int mAudioBufferSize = AudioHandler.FRAME_SIZE;
+ private int mRequestedSamples; // Number of samples requested
// State-specific
private float[] mBuffer;
@@ -66,10 +70,11 @@ public class AudioOutputSpeech {
private TalkStateListener mTalkStateListener;
- public AudioOutputSpeech(User user, JumbleUDPMessageType codec, TalkStateListener listener) throws NativeAudioException {
+ public AudioOutputSpeech(User user, JumbleUDPMessageType codec, int requestedSamples, TalkStateListener listener) throws NativeAudioException {
// TODO: consider implementing resampling if some Android devices not support 48kHz?
mUser = user;
mCodec = codec;
+ mRequestedSamples = requestedSamples;
mTalkStateListener = listener;
switch (codec) {
case UDPVoiceOpus:
@@ -145,19 +150,20 @@ public class AudioOutputSpeech {
}
}
- public boolean needSamples(int num) {
+ @Override
+ public Result call() throws Exception {
for(int i = mLastConsume; i < mBufferFilled; ++i)
mBuffer[i-mLastConsume] = mBuffer[i];
mBufferFilled -= mLastConsume;
- mLastConsume = num;
+ mLastConsume = mRequestedSamples;
- if(mBufferFilled >= num)
- return mLastAlive;
+ if(mBufferFilled >= mRequestedSamples)
+ return new Result(this, mLastAlive, mBuffer, mBufferFilled);
boolean nextAlive = mLastAlive;
- while(mBufferFilled < num) {
+ while(mBufferFilled < mRequestedSamples) {
int decodedSamples = AudioHandler.FRAME_SIZE;
resizeBuffer(mBufferFilled + mAudioBufferSize);
@@ -311,18 +317,22 @@ public class AudioOutputSpeech {
boolean tmp = mLastAlive;
mLastAlive = nextAlive;
- return tmp;
+ return new Result(this, tmp, mBuffer, mBufferFilled);
}
- public void resizeBuffer(int newSize) {
+ private void resizeBuffer(int newSize) {
if(newSize > mBuffer.length) {
float[] newBuffer = Arrays.copyOf(mBuffer, newSize);
mBuffer = newBuffer;
}
}
- public float[] getBuffer() {
- return mBuffer;
+ /**
+ * Sets the preferred number of samples to return when the callable is executed.
+ * @param samples The number of floating point samples to retrieve.
+ */
+ public void setRequestedSamples(int samples) {
+ mRequestedSamples = samples;
}
public JumbleUDPMessageType getCodec() {
@@ -345,4 +355,40 @@ public class AudioOutputSpeech {
if(mDecoder != null) mDecoder.destroy();
mJitterBuffer.destroy();
}
+
+ /**
+ * The outcome of a decoding pass.
+ */
+ protected static class Result {
+ private AudioOutputSpeech mSpeechOutput;
+ private boolean mAlive;
+ private float[] mSamples;
+ private int mNumSamples;
+
+ private Result(AudioOutputSpeech speechOutput,
+ boolean alive,
+ float[] samples,
+ int numSamples) {
+ mSpeechOutput = speechOutput;
+ mAlive = alive;
+ mSamples = samples;
+ mNumSamples = numSamples;
+ }
+
+ public AudioOutputSpeech getSpeechOutput() {
+ return mSpeechOutput;
+ }
+
+ public boolean isAlive() {
+ return mAlive;
+ }
+
+ public float[] getSamples() {
+ return mSamples;
+ }
+
+ public int getNumSamples() {
+ return mNumSamples;
+ }
+ }
}