Welcome to mirror list, hosted at ThFree Co, Russian Federation.

gitlab.com/quite/humla.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndrew Comminos <andrew@comminos.com>2017-08-23 04:39:54 +0300
committerAndrew Comminos <andrew@comminos.com>2017-08-23 04:39:54 +0300
commit4a0fe1d3c8592491809783d97f61088b98fa0027 (patch)
tree73a79a9ebc9e325fa1ff0d86fc2b077264444105
parent4a44ab69bdf4bd2c755f694dbac95fcc06373078 (diff)
Improve UDP socket logic, fixing various issues with concurrency when sending messages.
-rw-r--r--src/main/java/com/morlunk/jumble/net/JumbleConnection.java13
-rw-r--r--src/main/java/com/morlunk/jumble/net/JumbleNetworkThread.java1
-rw-r--r--src/main/java/com/morlunk/jumble/net/JumbleTCP.java30
-rw-r--r--src/main/java/com/morlunk/jumble/net/JumbleUDP.java222
4 files changed, 166 insertions, 100 deletions
diff --git a/src/main/java/com/morlunk/jumble/net/JumbleConnection.java b/src/main/java/com/morlunk/jumble/net/JumbleConnection.java
index 6f54199..9194902 100644
--- a/src/main/java/com/morlunk/jumble/net/JumbleConnection.java
+++ b/src/main/java/com/morlunk/jumble/net/JumbleConnection.java
@@ -625,17 +625,12 @@ public class JumbleConnection implements JumbleTCP.TCPConnectionListener, Jumble
mConnected = true;
// Attempt to start UDP thread once connected.
- if(!shouldForceTCP()) {
- try {
- mUDP = new JumbleUDP(mCryptState);
- mUDP.setUDPConnectionListener(this);
- mUDP.connect(mHost, mPort);
- } catch (ConnectException e) {
- onUDPConnectionError(e);
- }
+ if (!shouldForceTCP()) {
+ mUDP = new JumbleUDP(mCryptState, this, mMainHandler);
+ mUDP.connect(mHost, mPort);
}
- if(mListener != null) mListener.onConnectionEstablished();
+ if (mListener != null) mListener.onConnectionEstablished();
}
@Override
diff --git a/src/main/java/com/morlunk/jumble/net/JumbleNetworkThread.java b/src/main/java/com/morlunk/jumble/net/JumbleNetworkThread.java
index b58f0e5..9c89117 100644
--- a/src/main/java/com/morlunk/jumble/net/JumbleNetworkThread.java
+++ b/src/main/java/com/morlunk/jumble/net/JumbleNetworkThread.java
@@ -27,6 +27,7 @@ import java.util.concurrent.Executors;
* Base class for TCP/UDP protocol implementations.
* Provides a common threading model (single threaded queue for write)
* Created by andrew on 25/03/14.
+ * @deprecated This shouldn't be needed. Redundant inheritance with limited shared code.
*/
public abstract class JumbleNetworkThread implements Runnable {
diff --git a/src/main/java/com/morlunk/jumble/net/JumbleTCP.java b/src/main/java/com/morlunk/jumble/net/JumbleTCP.java
index 764b2af..6ffbbbc 100644
--- a/src/main/java/com/morlunk/jumble/net/JumbleTCP.java
+++ b/src/main/java/com/morlunk/jumble/net/JumbleTCP.java
@@ -126,15 +126,6 @@ public class JumbleTCP extends JumbleNetworkThread {
});
}
}
-
- if(mListener != null) {
- executeOnMainThread(new Runnable() {
- @Override
- public void run() {
- mListener.onTCPConnectionDisconnect();
- }
- });
- }
} catch (SocketException e) {
error("Could not open a connection to the host", e);
} catch (SSLHandshakeException e) {
@@ -162,6 +153,13 @@ public class JumbleTCP extends JumbleNetworkThread {
e.printStackTrace();
}
mRunning = false;
+
+ executeOnMainThread(new Runnable() {
+ @Override
+ public void run() {
+ mListener.onTCPConnectionDisconnect();
+ }
+ });
stopThreads();
}
}
@@ -215,6 +213,8 @@ public class JumbleTCP extends JumbleNetworkThread {
* Attempts to disconnect gracefully on the Tx thread.
* Disconnects interrupt the socket listening on the Tx thread, suppressing any exceptions
* caused by this request. Any remaining protobuf messages will be dispatched first.
+ *
+ * Suppresses all future errors on this connection.
*/
public void disconnect() {
if (!mRunning) return;
@@ -256,6 +256,18 @@ public class JumbleTCP extends JumbleNetworkThread {
});
}
+ /**
+ * Runnable that
+ */
+ private static class OutboxConsumer implements Runnable {
+
+
+ @Override
+ public void run() {
+
+ }
+ }
+
public interface TCPConnectionListener {
public void onTCPConnectionEstablished();
public void onTLSHandshakeFailed(X509Certificate[] chain);
diff --git a/src/main/java/com/morlunk/jumble/net/JumbleUDP.java b/src/main/java/com/morlunk/jumble/net/JumbleUDP.java
index ab59c82..b593a7f 100644
--- a/src/main/java/com/morlunk/jumble/net/JumbleUDP.java
+++ b/src/main/java/com/morlunk/jumble/net/JumbleUDP.java
@@ -17,15 +17,22 @@
package com.morlunk.jumble.net;
+import android.os.Handler;
import android.util.Log;
import com.morlunk.jumble.Constants;
+import org.jetbrains.annotations.NotNull;
+
import java.io.IOException;
import java.net.ConnectException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
import javax.crypto.BadPaddingException;
import javax.crypto.IllegalBlockSizeException;
@@ -33,8 +40,11 @@ import javax.crypto.ShortBufferException;
/**
* Class to maintain and receive packets from the UDP connection to a Mumble server.
+ * Public interface is not thread safe.
*/
-public class JumbleUDP extends JumbleNetworkThread {
+public class JumbleUDP implements Runnable {
+ private static final String TAG = "JumbleUDP";
+
private static final int BUFFER_SIZE = 2048;
private final CryptState mCryptState;
@@ -45,23 +55,34 @@ public class JumbleUDP extends JumbleNetworkThread {
private InetAddress mResolvedHost;
private boolean mConnected;
+ /** Main datagram thread hosting this runnable. */
+ private final Thread mDatagramThread;
+
+ /** Handler to invoke listener callback invocations on. */
+ private final Handler mCallbackHandler;
+
+ /** Unbounded queue of outgoing packets to be sent. */
+ private final BlockingQueue<DatagramPacket> mSendQueue;
+
/**
* Sets up a new UDP connection context.
* @param cryptState Cryptographic state provider.
+ * @param listener Callback target. Messages will be posted on the callback handler given.
+ * @param callbackHandler Handler to post listener invocations on.
*/
- public JumbleUDP(CryptState cryptState) {
+ public JumbleUDP(@NotNull CryptState cryptState, @NotNull UDPConnectionListener listener,
+ @NotNull Handler callbackHandler) {
mCryptState = cryptState;
- }
-
- public void setUDPConnectionListener(UDPConnectionListener listener) {
mListener = listener;
+ mCallbackHandler = callbackHandler;
+ mDatagramThread = new Thread(this);
+ mSendQueue = new LinkedBlockingQueue<>();
}
- public void connect(String host, int port) throws ConnectException {
- if(mConnected) throw new ConnectException("UDP connection already established!");
+ public void connect(@NotNull String host, @NotNull int port) {
mHost = host;
mPort = port;
- startThreads();
+ mDatagramThread.start();
}
public boolean isRunning() {
@@ -70,39 +91,32 @@ public class JumbleUDP extends JumbleNetworkThread {
@Override
public void run() {
+ Thread outgoingConsumerThread = null;
+ mConnected = true;
try {
mResolvedHost = InetAddress.getByName(mHost);
mUDPSocket = new DatagramSocket();
- } catch (final IOException e) {
- if(mListener != null) {
- executeOnMainThread(new Runnable() {
- @Override
- public void run() {
- mListener.onUDPConnectionError(e);
- }
- });
- }
- return;
- }
- mUDPSocket.connect(mResolvedHost, mPort);
- final DatagramPacket packet = new DatagramPacket(new byte[BUFFER_SIZE], BUFFER_SIZE);
+ mUDPSocket.connect(mResolvedHost, mPort);
+ Log.d(TAG, "Created socket");
- Log.d(Constants.TAG, "[UDP] Created socket");
- mConnected = true;
+ // Start outgoing consumer once the UDP socket is open, as a child thread.
+ final OutgoingConsumer outgoingConsumer = new OutgoingConsumer(mUDPSocket, mSendQueue);
+ outgoingConsumerThread = new Thread(outgoingConsumer);
+ outgoingConsumerThread.start();
- while(mConnected) {
- try {
+ final DatagramPacket packet = new DatagramPacket(new byte[BUFFER_SIZE], BUFFER_SIZE);
+ while (mConnected) {
mUDPSocket.receive(packet);
final byte[] data = packet.getData();
final int length = packet.getLength();
if (!mCryptState.isValid()) {
- Log.d(Constants.TAG, "[UDP] CryptState invalid, discarding packet");
+ Log.d(TAG, "CryptState invalid, discarding packet");
continue;
}
if (length < 5) {
- Log.d(Constants.TAG, "[UDP] Packet too short, discarding");
+ Log.d(TAG, "Packet too short, discarding");
continue;
}
@@ -111,82 +125,94 @@ public class JumbleUDP extends JumbleNetworkThread {
if (mListener != null) {
if (buffer != null) {
- executeOnReceiveThread(new Runnable() {
+ mCallbackHandler.post(new Runnable() {
@Override
public void run() {
mListener.onUDPDataReceived(buffer);
}
});
- } else if(mCryptState.getLastGoodElapsed() > 5000000 &&
+ } else if (mCryptState.getLastGoodElapsed() > 5000000 &&
mCryptState.getLastRequestElapsed() > 5000000) {
mCryptState.resetLastRequestTime();
- executeOnMainThread(new Runnable() {
+ mCallbackHandler.post(new Runnable() {
@Override
public void run() {
mListener.resyncCryptState();
}
});
- Log.d(Constants.TAG, "[UDP] Packet failed to decrypt, discarding and " +
- "requesting crypt state resync");
+ Log.d(TAG, "Packet failed to decrypt, discarding and requesting crypt state resync");
} else {
- Log.d(Constants.TAG, "[UDP] Packet failed to decrypt, discarding");
+ Log.d(TAG, "Packet failed to decrypt, discarding");
}
}
- } catch (BadPaddingException|IllegalBlockSizeException|ShortBufferException e) {
- Log.d(Constants.TAG, "[UDP] Discarding packet", e);
- }
- } catch (final IOException e) {
- // If a UDP exception is thrown while connected, notify the listener to fall back to TCP.
- if(mConnected && mListener != null) {
- executeOnMainThread(new Runnable() {
- @Override
- public void run() {
- mListener.onUDPConnectionError(e);
- }
- });
+ } catch (BadPaddingException | IllegalBlockSizeException | ShortBufferException e) {
+ Log.d(Constants.TAG, "Discarding packet", e);
}
- break;
}
+ } catch (final IOException e) {
+ // If mConnected is false, then this is a user-triggered disconnection. Report no error.
+ if (mConnected) {
+ Log.d(TAG, "UDP socket closed unexpectedly");
+ mCallbackHandler.post(new Runnable() {
+ @Override
+ public void run() {
+ mListener.onUDPConnectionError(e);
+ }
+ });
+ } else {
+ Log.d(TAG, "UDP socket closed in response to user disconnect");
+ }
+ } finally {
+ mConnected = false;
+
+ // We want to interrupt the outgoing queue consumer thread to avoid sends after socket
+ // cleanup. Blocking shouldn't be necessary.
+ if (outgoingConsumerThread != null) {
+ outgoingConsumerThread.interrupt();
+ }
+
+ // Clear the outgoing queue, in case the caller decides to reconnect with the same socket.
+ mSendQueue.clear();
+
+ mUDPSocket.close();
}
- disconnect(); // Make sure we close the socket if disconnect wasn't controlled
}
- public void sendMessage(final byte[] data, final int length) {
- if(!mCryptState.isValid() || !mConnected) return;
- executeOnSendThread(new Runnable() {
- @Override
- public void run() {
- try {
- if(!mCryptState.isValid() || !mConnected) return;
- byte[] encryptedData = mCryptState.encrypt(data, length);
- final DatagramPacket packet = new DatagramPacket(encryptedData, encryptedData.length);
- packet.setAddress(mResolvedHost);
- packet.setPort(mPort);
- mUDPSocket.send(packet);
- } catch (IOException e) {
- e.printStackTrace();
- } catch (IllegalBlockSizeException e) {
- e.printStackTrace();
- } catch (ShortBufferException e) {
- e.printStackTrace();
- } catch (BadPaddingException e) {
- e.printStackTrace();
- }
- }
- });
+ public void sendMessage(@NotNull final byte[] data, final int length) {
+ if (!mCryptState.isValid()) {
+ Log.w(TAG, "Invalid cryptstate prior to sendMessage call.");
+ return;
+ }
+ if (!mConnected) {
+ Log.w(TAG, "Tried to send UDP message without an active connection.");
+ return;
+ }
+
+ try {
+ byte[] encryptedData = mCryptState.encrypt(data, length);
+ final DatagramPacket packet = new DatagramPacket(encryptedData, encryptedData.length);
+ packet.setAddress(mResolvedHost);
+ packet.setPort(mPort);
+ mSendQueue.add(packet);
+ } catch (BadPaddingException e) {
+ // TODO
+ e.printStackTrace();
+ } catch (IllegalBlockSizeException e) {
+ // TODO
+ e.printStackTrace();
+ } catch (ShortBufferException e) {
+ // TODO
+ e.printStackTrace();
+ }
}
+ /**
+ * Lazy, non-blocking idempotent disconnect.
+ */
public void disconnect() {
- if(!mConnected) return;
mConnected = false;
- executeOnSendThread(new Runnable() {
- @Override
- public void run() {
- mUDPSocket.disconnect();
- mUDPSocket.close();
- }
- });
- stopThreads();
+ // Closing a socket will trigger an IOException on the consumer thread.
+ mUDPSocket.close();
}
/**
@@ -194,8 +220,40 @@ public class JumbleUDP extends JumbleNetworkThread {
* onUDPDataReceived is always called on the UDP receive thread.
*/
public interface UDPConnectionListener {
- public void onUDPDataReceived(byte[] data);
- public void onUDPConnectionError(Exception e);
- public void resyncCryptState();
+ void onUDPDataReceived(byte[] data);
+ void onUDPConnectionError(Exception e);
+ void resyncCryptState();
+ }
+
+ /**
+ * Runnable that reads from a shared blocking queue, dispatching datagrams when available.
+ */
+ private static class OutgoingConsumer implements Runnable {
+ private final DatagramSocket mSocket;
+ private final BlockingQueue<DatagramPacket> mQueue;
+
+ public OutgoingConsumer(@NotNull DatagramSocket socket,
+ @NotNull BlockingQueue<DatagramPacket> queue) {
+ mSocket = socket;
+ mQueue = queue;
+ }
+
+ @Override
+ public void run() {
+ Log.d(TAG, "Datagram outbox consumer active");
+ boolean interrupted = false;
+ while (!interrupted) {
+ try {
+ DatagramPacket packet = mQueue.take();
+ mSocket.send(packet);
+ } catch (IOException e) {
+ e.printStackTrace();
+ } catch (InterruptedException e) {
+ // Our datagram thread interrupted us. We should stop reading.
+ interrupted = true;
+ }
+ }
+ Log.d(TAG, "Datagram outbox consumer shutdown");
+ }
}
}