Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/SoftEtherVPN/SoftEtherVPN_Stable.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'src/Cedar/Command.c')
-rw-r--r--src/Cedar/Command.c762
1 files changed, 456 insertions, 306 deletions
diff --git a/src/Cedar/Command.c b/src/Cedar/Command.c
index 365257cd..81e12af0 100644
--- a/src/Cedar/Command.c
+++ b/src/Cedar/Command.c
@@ -3,9 +3,9 @@
//
// SoftEther VPN Server, Client and Bridge are free software under GPLv2.
//
-// Copyright (c) 2012-2016 Daiyuu Nobori.
-// Copyright (c) 2012-2016 SoftEther VPN Project, University of Tsukuba, Japan.
-// Copyright (c) 2012-2016 SoftEther Corporation.
+// Copyright (c) Daiyuu Nobori, Ph.D..
+// Copyright (c) SoftEther VPN Project, University of Tsukuba, Japan.
+// Copyright (c) SoftEther Corporation.
//
// All Rights Reserved.
//
@@ -1166,6 +1166,7 @@ void TtGenerateRandomData(UCHAR **buf, UINT *size)
void TtsWorkerThread(THREAD *thread, void *param)
{
TTS *tts;
+ TTS_WORKER *w;
UINT buf_size;
UCHAR *send_buf_data, *recv_buf_data;
bool all_sockets_blocked = false;
@@ -1185,14 +1186,15 @@ void TtsWorkerThread(THREAD *thread, void *param)
TtGenerateRandomData(&send_buf_data, &buf_size);
TtGenerateRandomData(&recv_buf_data, &buf_size);
- tts = (TTS *)param;
+ w = (TTS_WORKER *)param;
+ tts = (TTS *)w->Tts;
// Preparation of socket events
- tts->SockEvent = NewSockEvent();
- AddRef(tts->SockEvent->ref);
+ w->SockEvent = NewSockEvent();
+ AddRef(w->SockEvent->ref);
// Preparing the Server socket list
- tts->TtsSockList = NewList(NULL);
+ w->TtsSockList = NewList(NULL);
// Notify completion of preparation to parent thread
NoticeThreadInit(thread);
@@ -1206,12 +1208,12 @@ void TtsWorkerThread(THREAD *thread, void *param)
// Wait for all sockets
if (dont_block_next_time == false)
{
- WaitSockEvent(tts->SockEvent, 50);
+ WaitSockEvent(w->SockEvent, 50);
}
dont_block_next_time = false;
// Process for sockets that are currently registered
- LockList(tts->TtsSockList);
+ LockList(w->TtsSockList);
{
UINT i;
@@ -1223,17 +1225,17 @@ void TtsWorkerThread(THREAD *thread, void *param)
{
all_sockets_blocked = true;
- for (i = 0;i < LIST_NUM(tts->TtsSockList);i++)
+ for (i = 0;i < LIST_NUM(w->TtsSockList);i++)
{
UINT ret = SOCK_LATER;
UCHAR *send_data = NULL, *recv_data = NULL;
UINT send_size = 0, recv_size = 0;
- TTS_SOCK *ts = LIST_DATA(tts->TtsSockList, i);
+ TTS_SOCK *ts = LIST_DATA(w->TtsSockList, i);
bool blocked_for_this_socket = false;
if (ts->SockJoined == false)
{
- JoinSockToSockEvent(ts->Sock, tts->SockEvent);
+ JoinSockToSockEvent(ts->Sock, w->SockEvent);
ts->SockJoined = true;
}
@@ -1396,9 +1398,9 @@ void TtsWorkerThread(THREAD *thread, void *param)
{
// Not to send more data to the socket of the
// transmission direction in the same session ID
- for (j = 0;j < LIST_NUM(tts->TtsSockList);j++)
+ for (j = 0;j < LIST_NUM(w->TtsSockList);j++)
{
- TTS_SOCK *ts2 = LIST_DATA(tts->TtsSockList, j);
+ TTS_SOCK *ts2 = LIST_DATA(w->TtsSockList, j);
if (ts2->SessionId == ts->SessionId &&
ts2 != ts)
@@ -1457,7 +1459,7 @@ void TtsWorkerThread(THREAD *thread, void *param)
Disconnect(ts->Sock);
ReleaseSock(ts->Sock);
- Delete(tts->TtsSockList, ts);
+ Delete(w->TtsSockList, ts);
Free(ts);
}
@@ -1465,23 +1467,23 @@ void TtsWorkerThread(THREAD *thread, void *param)
DeleteAll(o);
}
- if (tts->NewSocketArrived || tts->Halt)
+ if (w->NewSocketArrived || tts->Halt)
{
- tts->NewSocketArrived = false;
+ w->NewSocketArrived = false;
all_sockets_blocked = true;
dont_block_next_time = true;
}
}
}
- UnlockList(tts->TtsSockList);
+ UnlockList(w->TtsSockList);
}
- LockList(tts->TtsSockList);
+ LockList(w->TtsSockList);
{
// Release the sockets of all remaining
- for (i = 0;i < LIST_NUM(tts->TtsSockList);i++)
+ for (i = 0;i < LIST_NUM(w->TtsSockList);i++)
{
- TTS_SOCK *ts = LIST_DATA(tts->TtsSockList, i);
+ TTS_SOCK *ts = LIST_DATA(w->TtsSockList, i);
UniFormat(tmp, sizeof(tmp), _UU("TTS_DISCONNECT"), ts->Id, ts->Sock->RemoteHostname);
TtPrint(tts->Param, tts->Print, tmp);
@@ -1492,12 +1494,12 @@ void TtsWorkerThread(THREAD *thread, void *param)
Free(ts);
}
}
- UnlockList(tts->TtsSockList);
+ UnlockList(w->TtsSockList);
// Cleanup
ReleaseList(o);
- ReleaseList(tts->TtsSockList);
- ReleaseSockEvent(tts->SockEvent);
+ ReleaseList(w->TtsSockList);
+ ReleaseSockEvent(w->SockEvent);
Free(send_buf_data);
Free(recv_buf_data);
}
@@ -1519,6 +1521,7 @@ void TtsIPv6AcceptThread(THREAD *thread, void *param)
void TtsAcceptProc(TTS *tts, SOCK *listen_socket)
{
wchar_t tmp[MAX_SIZE];
+ UINT seed = 0;
// Validate arguments
if (tts == NULL || listen_socket == NULL)
{
@@ -1541,10 +1544,23 @@ void TtsAcceptProc(TTS *tts, SOCK *listen_socket)
}
else
{
+ UINT num, i;
+ TTS_WORKER *w;
+
// Connected from the client
AcceptInitEx(s, true);
- tts->NewSocketArrived = true;
- LockList(tts->TtsSockList);
+
+ // Choose a worker thread
+ num = LIST_NUM(tts->WorkerList);
+
+ i = seed % num;
+
+ seed++;
+
+ w = LIST_DATA(tts->WorkerList, i);
+
+ w->NewSocketArrived = true;
+ LockList(w->TtsSockList);
{
TTS_SOCK *ts = ZeroMalloc(sizeof(TTS_SOCK));
@@ -1558,12 +1574,12 @@ void TtsAcceptProc(TTS *tts, SOCK *listen_socket)
s->RemoteHostname, s->RemotePort);
TtPrint(tts->Param, tts->Print, tmp);
- Insert(tts->TtsSockList, ts);
- tts->NewSocketArrived = true;
+ Insert(w->TtsSockList, ts);
+ w->NewSocketArrived = true;
}
- UnlockList(tts->TtsSockList);
+ UnlockList(w->TtsSockList);
- SetSockEvent(tts->SockEvent);
+ SetSockEvent(w->SockEvent);
}
}
}
@@ -1598,6 +1614,8 @@ void TtsListenThread(THREAD *thread, void *param)
}
else
{
+ UINT i, num_worker_threads;
+
UniFormat(tmp, sizeof(tmp), _UU("TTS_LISTEN_STARTED"), tts->Port);
TtPrint(tts->Param, tts->Print, tmp);
@@ -1621,9 +1639,19 @@ void TtsListenThread(THREAD *thread, void *param)
AddRef(tts->ListenSocketV6->ref);
}
- // Start the worker thread
- tts->WorkThread = NewThread(TtsWorkerThread, tts);
- WaitThreadInit(tts->WorkThread);
+ num_worker_threads = GetNumberOfCpu();
+
+ // Start the worker threads
+ for (i = 0;i < num_worker_threads;i++)
+ {
+ TTS_WORKER *w = ZeroMalloc(sizeof(TTS_WORKER));
+
+ w->Tts = tts;
+ w->WorkThread = NewThread(TtsWorkerThread, w);
+ WaitThreadInit(w->WorkThread);
+
+ Add(tts->WorkerList, w);
+ }
// Notify completion of preparation to parent thread
NoticeThreadInit(thread);
@@ -1647,12 +1675,20 @@ void TtsListenThread(THREAD *thread, void *param)
ReleaseSock(tts->ListenSocket);
ReleaseSock(tts->ListenSocketV6);
- SetSockEvent(tts->SockEvent);
- // Wait for stopping the worker thread
- WaitThread(tts->WorkThread, INFINITE);
- ReleaseThread(tts->WorkThread);
- ReleaseSockEvent(tts->SockEvent);
+ for (i = 0;i < LIST_NUM(tts->WorkerList);i++)
+ {
+ TTS_WORKER *w = LIST_DATA(tts->WorkerList, i);
+
+ SetSockEvent(w->SockEvent);
+
+ // Wait for stopping the worker thread
+ WaitThread(w->WorkThread, INFINITE);
+ ReleaseThread(w->WorkThread);
+ ReleaseSockEvent(w->SockEvent);
+
+ Free(w);
+ }
}
}
@@ -1737,7 +1773,6 @@ void StopTtc(TTC *ttc)
TtPrint(ttc->Param, ttc->Print, _UU("TTC_STOPPING"));
ttc->Halt = true;
- SetSockEvent(ttc->SockEvent);
}
// Generate a result
@@ -1800,6 +1835,301 @@ void TtcGenerateResult(TTC *ttc)
res->BpsTotal = res->BpsUpload + res->BpsDownload;
}
+// Client worker thread
+void TtcWorkerThread(THREAD *thread, void *param)
+{
+ TTC_WORKER *w;
+ TTC *ttc;
+ bool dont_block_next_time = false;
+ bool halting = false;
+ UINT64 halt_timeout = 0;
+ bool all_sockets_blocked;
+ wchar_t tmp[MAX_SIZE];
+ UCHAR *send_buf_data, *recv_buf_data;
+ UINT buf_size;
+ UINT64 tmp64;
+
+ if (thread == NULL || param == NULL)
+ {
+ return;
+ }
+
+ w = (TTC_WORKER *)param;
+ ttc = w->Ttc;
+
+ // Allocate the data area
+ TtGenerateRandomData(&send_buf_data, &buf_size);
+ TtGenerateRandomData(&recv_buf_data, &buf_size);
+
+ NoticeThreadInit(thread);
+
+ // Wait for start
+ Wait(w->StartEvent, INFINITE);
+
+ // Main loop
+ while (true)
+ {
+ UINT i;
+
+ if (dont_block_next_time == false)
+ {
+ WaitSockEvent(w->SockEvent, 50);
+ }
+
+ dont_block_next_time = false;
+
+ if (ttc->AbnormalTerminated)
+ {
+ // Abnormal termination occured
+ break;
+ }
+
+ if (ttc->Halt || ttc->end_tick <= Tick64() || (ttc->Cancel != NULL && (*ttc->Cancel)))
+ {
+ // End measurement
+ if (halting == false)
+ {
+ if (ttc->Halt || (ttc->Cancel != NULL && (*ttc->Cancel)))
+ {
+ if ((ttc->flag1++) == 0)
+ {
+ // User cancel
+ TtPrint(ttc->Param, ttc->Print, _UU("TTC_COMM_USER_CANCEL"));
+ }
+ }
+ else
+ {
+ // Time elapsed
+ if ((ttc->flag2++) == 0)
+ {
+ UniFormat(tmp, sizeof(tmp), _UU("TTC_COMM_END"),
+ (double)ttc->Span / 1000.0);
+ TtPrint(ttc->Param, ttc->Print, tmp);
+ }
+ }
+
+ if (ttc->RealSpan == 0)
+ {
+ ttc->RealSpan = Tick64() - ttc->start_tick;
+ }
+
+ halting = true;
+
+ // Wait for reporting data from the server
+ halt_timeout = Tick64() + 60000ULL;
+ }
+ }
+
+ if (halt_timeout != 0)
+ {
+ bool ok = true;
+
+ // Wait that all TCP connections to finish processing
+ for (i = 0;i < LIST_NUM(w->SockList);i++)
+ {
+ TTC_SOCK *ts = LIST_DATA(w->SockList, i);
+
+ if (ts->Download == false)
+ {
+ if (ts->ServerUploadReportReceived == false)
+ {
+ ok = false;
+ }
+ }
+ }
+
+ if (ok)
+ {
+ // Measurement completed
+ w->Ok = true;
+ break;
+ }
+ else
+ {
+ if (halt_timeout <= Tick64())
+ {
+ // An error occured
+ ttc->AbnormalTerminated = true;
+ ttc->ErrorCode = ERR_PROTOCOL_ERROR;
+ break;
+ }
+ }
+ }
+
+ all_sockets_blocked = false;
+
+ // Continue to send and receive data
+ // until all sockets become block state
+ while (all_sockets_blocked == false)
+ {
+ all_sockets_blocked = true;
+
+ for (i = 0;i < LIST_NUM(w->SockList);i++)
+ {
+ UINT ret = SOCK_LATER;
+ TTC_SOCK *ts = LIST_DATA(w->SockList, i);
+ bool blocked_for_this_socket = false;
+ UCHAR c = 0;
+ UCHAR c_and_session_id[1 + sizeof(UINT64) + sizeof(UINT64)];
+
+ if (halt_timeout != 0)
+ {
+ if (ts->State != 3 && ts->State != 4)
+ {
+ if (ts->Download == false)
+ {
+ if (ts->State != 0)
+ {
+ ts->State = 3;
+ }
+ else
+ {
+ ts->ServerUploadReportReceived = true;
+ ts->State = 4;
+ }
+ }
+ else
+ {
+ ts->State = 4;
+ }
+ }
+ }
+
+ switch (ts->State)
+ {
+ case 0:
+ // Initial state: Specify the direction of
+ // the data flow between client-server
+ if (ts->Download)
+ {
+ c = 1;
+ }
+ else
+ {
+ c = 0;
+ }
+
+ c_and_session_id[0] = c;
+ WRITE_UINT64(c_and_session_id + 1, ttc->session_id);
+ WRITE_UINT64(c_and_session_id + sizeof(UINT64) + 1, ttc->Span);
+
+ ret = Send(ts->Sock, c_and_session_id, 1 + sizeof(UINT64) + sizeof(UINT64), false);
+
+ if (ret != 0 && ret != SOCK_LATER)
+ {
+ if (ts->Download)
+ {
+ ts->State = 1;
+ }
+ else
+ {
+ ts->State = 2;
+ }
+ }
+ break;
+
+ case 1:
+ // Server -> Client (download)
+ ret = Recv(ts->Sock, recv_buf_data, buf_size, false);
+ break;
+
+ case 2:
+ // Client -> Server (upload)
+ ret = Send(ts->Sock, send_buf_data, buf_size, false);
+ break;
+
+ case 3:
+ // Transmission completion client -> server (upload)
+ // Request the data size
+ if (ts->NextSendRequestReportTick == 0 ||
+ (Tick64() >= ts->NextSendRequestReportTick))
+ {
+ UCHAR suprise[MAX_SIZE];
+ UINT i;
+
+ ts->NextSendRequestReportTick = Tick64() + 200ULL;
+
+ for (i = 0;i < sizeof(suprise);i++)
+ {
+ suprise[i] = '!';
+ }
+
+ ret = Send(ts->Sock, suprise, sizeof(suprise), false);
+ }
+
+ ret = Recv(ts->Sock, &tmp64, sizeof(tmp64), false);
+ if (ret != 0 && ret != SOCK_LATER && ret == sizeof(tmp64))
+ {
+ ts->NumBytes = Endian64(tmp64);
+
+ ts->ServerUploadReportReceived = true;
+
+ ts->State = 4;
+ }
+ break;
+
+ case 4:
+ // Do Nothing
+ if (Recv(ts->Sock, recv_buf_data, buf_size, false) == SOCK_LATER)
+ {
+ ret = SOCK_LATER;
+ }
+ break;
+ }
+
+ if (ret == 0)
+ {
+ // The socket is disconnected
+ ttc->AbnormalTerminated = true;
+ ttc->ErrorCode = ERR_PROTOCOL_ERROR;
+ blocked_for_this_socket = true;
+ dont_block_next_time = false;
+
+ if (ts->HideErrMsg == false)
+ {
+ UniFormat(tmp, sizeof(tmp), _UU("TTC_COMM_DISCONNECTED"), ts->Id);
+ TtPrint(ttc->Param, ttc->Print, tmp);
+ ts->HideErrMsg = true;
+ }
+ }
+ else if (ret == SOCK_LATER)
+ {
+ // Delay has occurred
+ blocked_for_this_socket = true;
+ dont_block_next_time = false;
+ }
+ else
+ {
+ if (ts->Download)
+ {
+ ts->NumBytes += (UINT64)ret;
+ }
+ }
+
+ if (blocked_for_this_socket == false)
+ {
+ all_sockets_blocked = false;
+ }
+ }
+
+ if (ttc->Halt || (ttc->Cancel != NULL && (*ttc->Cancel)))
+ {
+ all_sockets_blocked = true;
+ dont_block_next_time = true;
+ }
+
+ if (ttc->end_tick <= Tick64())
+ {
+ all_sockets_blocked = true;
+ dont_block_next_time = true;
+ }
+ }
+ }
+
+ Free(send_buf_data);
+ Free(recv_buf_data);
+}
+
// Client thread
void TtcThread(THREAD *thread, void *param)
{
@@ -1807,8 +2137,6 @@ void TtcThread(THREAD *thread, void *param)
UINT i;
wchar_t tmp[MAX_SIZE];
bool ok = false;
- UINT buf_size;
- UCHAR *send_buf_data, *recv_buf_data;
IP ip_ret;
// Validate arguments
if (thread == NULL || param == NULL)
@@ -1816,15 +2144,8 @@ void TtcThread(THREAD *thread, void *param)
return;
}
- // Allocate the data area
- TtGenerateRandomData(&send_buf_data, &buf_size);
- TtGenerateRandomData(&recv_buf_data, &buf_size);
-
ttc = (TTC *)param;
- ttc->SockEvent = NewSockEvent();
- AddRef(ttc->SockEvent->ref);
-
// Ready
NoticeThreadInit(thread);
@@ -1904,8 +2225,6 @@ void TtcThread(THREAD *thread, void *param)
ts->Sock = s;
SetTimeout(s, TIMEOUT_INFINITE);
-
- JoinSockToSockEvent(s, ttc->SockEvent);
}
Insert(ttc->ItcSockList, ts);
@@ -1921,278 +2240,109 @@ void TtcThread(THREAD *thread, void *param)
if (ok)
{
- bool all_sockets_blocked;
- bool dont_block_next_time = false;
- bool halt_flag = false;
UINT64 start_tick, end_tick;
UINT64 halt_timeout = 0;
wchar_t tmp1[MAX_SIZE], tmp2[MAX_SIZE];
UINT check_clock_seed = 0;
bool halting = false;
- UINT64 tmp64;
UINT64 session_id = Rand64();
+ UINT i, num_cpu;
+ bool all_ok = false;
- // Record the current time
- start_tick = Tick64();
- end_tick = start_tick + ttc->Span;
-
- // Show start message
- GetDateTimeStrEx64(tmp1, sizeof(tmp1), SystemToLocal64(TickToTime(start_tick)), NULL);
- GetDateTimeStrEx64(tmp2, sizeof(tmp2), SystemToLocal64(TickToTime(end_tick)), NULL);
- UniFormat(tmp, sizeof(tmp), _UU("TTC_COMM_START"), tmp1, tmp2);
- TtPrint(ttc->Param, ttc->Print, tmp);
-
- // Main loop
- while (true)
- {
- UINT i;
-
- if (dont_block_next_time == false)
- {
- WaitSockEvent(ttc->SockEvent, 50);
- }
-
- dont_block_next_time = false;
-
- if (ttc->AbnormalTerminated)
- {
- // Abnormal termination occured
- break;
- }
-
- if (ttc->Halt || end_tick <= Tick64() || (ttc->Cancel != NULL && (*ttc->Cancel)))
- {
- // End measurement
- if (halting == false)
- {
- if (ttc->Halt || (ttc->Cancel != NULL && (*ttc->Cancel)))
- {
- // User cancel
- TtPrint(ttc->Param, ttc->Print, _UU("TTC_COMM_USER_CANCEL"));
- }
- else
- {
- // Time elapsed
- UniFormat(tmp, sizeof(tmp), _UU("TTC_COMM_END"),
- (double)ttc->Span / 1000.0);
- TtPrint(ttc->Param, ttc->Print, tmp);
- }
-
- ttc->RealSpan = Tick64() - start_tick;
-
- halting = true;
-
- // Wait for reporting data from the server
- halt_timeout = Tick64() + 60000ULL;
- }
- }
-
- if (halt_timeout != 0)
- {
- bool ok = true;
-
- // Wait that all TCP connections to finish processing
- for (i = 0;i < LIST_NUM(ttc->ItcSockList);i++)
- {
- TTC_SOCK *ts = LIST_DATA(ttc->ItcSockList, i);
+ ttc->session_id = session_id;
- if (ts->Download == false)
- {
- if (ts->ServerUploadReportReceived == false)
- {
- ok = false;
- }
- }
- }
+ num_cpu = GetNumberOfCpu();
- if (ok)
- {
- // Measurement completed
- // Show the result
- TtcGenerateResult(ttc);
- break;
- }
- else
- {
- if (halt_timeout <= Tick64())
- {
- // An error occured
- ttc->AbnormalTerminated = true;
- ttc->ErrorCode = ERR_PROTOCOL_ERROR;
- break;
- }
- }
- }
+ ttc->WorkerThreadList = NewList(NULL);
- all_sockets_blocked = false;
+ for (i = 0;i < num_cpu;i++)
+ {
+ TTC_WORKER *w = ZeroMalloc(sizeof(TTC_WORKER));
- // Continue to send and receive data
- // until all sockets become block state
- while (all_sockets_blocked == false)
- {
- all_sockets_blocked = true;
+ w->Ttc = ttc;
+ w->SockList = NewList(NULL);
+ w->StartEvent = NewEvent();
+ w->SockEvent = NewSockEvent();
- for (i = 0;i < LIST_NUM(ttc->ItcSockList);i++)
- {
- UINT ret = SOCK_LATER;
- TTC_SOCK *ts = LIST_DATA(ttc->ItcSockList, i);
- bool blocked_for_this_socket = false;
- UCHAR c = 0;
- UCHAR c_and_session_id[1 + sizeof(UINT64) + sizeof(UINT64)];
-
- if (halt_timeout != 0)
- {
- if (ts->State != 3 && ts->State != 4)
- {
- if (ts->Download == false)
- {
- if (ts->State != 0)
- {
- ts->State = 3;
- }
- else
- {
- ts->ServerUploadReportReceived = true;
- ts->State = 4;
- }
- }
- else
- {
- ts->State = 4;
- }
- }
- }
+ w->WorkerThread = NewThread(TtcWorkerThread, w);
- switch (ts->State)
- {
- case 0:
- // Initial state: Specify the direction of
- // the data flow between client-server
- if (ts->Download)
- {
- c = 1;
- }
- else
- {
- c = 0;
- }
+ WaitThreadInit(w->WorkerThread);
- c_and_session_id[0] = c;
- WRITE_UINT64(c_and_session_id + 1, session_id);
- WRITE_UINT64(c_and_session_id + sizeof(UINT64) + 1, ttc->Span);
+ Add(ttc->WorkerThreadList, w);
+ }
- ret = Send(ts->Sock, c_and_session_id, 1 + sizeof(UINT64) + sizeof(UINT64), false);
+ // Assign each of sockets to each of worker threads
+ for (i = 0;i < LIST_NUM(ttc->ItcSockList);i++)
+ {
+ TTC_SOCK *ts = LIST_DATA(ttc->ItcSockList, i);
+ UINT num = LIST_NUM(ttc->WorkerThreadList);
+ UINT j = i % num;
+ TTC_WORKER *w = LIST_DATA(ttc->WorkerThreadList, j);
- if (ret != 0 && ret != SOCK_LATER)
- {
- if (ts->Download)
- {
- ts->State = 1;
- }
- else
- {
- ts->State = 2;
- }
- }
- break;
+ Add(w->SockList, ts);
- case 1:
- // Server -> Client (download)
- ret = Recv(ts->Sock, recv_buf_data, buf_size, false);
- break;
+ JoinSockToSockEvent(ts->Sock, w->SockEvent);
+ }
- case 2:
- // Client -> Server (upload)
- ret = Send(ts->Sock, send_buf_data, buf_size, false);
- break;
+ // Record the current time
+ start_tick = Tick64();
+ end_tick = start_tick + ttc->Span;
- case 3:
- // Transmission completion client -> server (upload)
- // Request the data size
- if (ts->NextSendRequestReportTick == 0 ||
- (Tick64() >= ts->NextSendRequestReportTick))
- {
- UCHAR suprise[MAX_SIZE];
- UINT i;
+ ttc->start_tick = start_tick;
+ ttc->end_tick = end_tick;
- ts->NextSendRequestReportTick = Tick64() + 200ULL;
+ // Set the start event for all worker threads
+ for (i = 0;i < LIST_NUM(ttc->WorkerThreadList);i++)
+ {
+ TTC_WORKER *w = LIST_DATA(ttc->WorkerThreadList, i);
- for (i = 0;i < sizeof(suprise);i++)
- {
- suprise[i] = '!';
- }
+ Set(w->StartEvent);
+ }
- ret = Send(ts->Sock, suprise, sizeof(suprise), false);
- }
+ // Show start message
+ GetDateTimeStrEx64(tmp1, sizeof(tmp1), SystemToLocal64(TickToTime(start_tick)), NULL);
+ GetDateTimeStrEx64(tmp2, sizeof(tmp2), SystemToLocal64(TickToTime(end_tick)), NULL);
+ UniFormat(tmp, sizeof(tmp), _UU("TTC_COMM_START"), tmp1, tmp2);
+ TtPrint(ttc->Param, ttc->Print, tmp);
- ret = Recv(ts->Sock, &tmp64, sizeof(tmp64), false);
- if (ret != 0 && ret != SOCK_LATER && ret == sizeof(tmp64))
- {
- ts->NumBytes = Endian64(tmp64);
+ // Wait for all worker threads finish
+ all_ok = true;
+ for (i = 0;i < LIST_NUM(ttc->WorkerThreadList);i++)
+ {
+ TTC_WORKER *w = LIST_DATA(ttc->WorkerThreadList, i);
- ts->ServerUploadReportReceived = true;
+ WaitThread(w->WorkerThread, INFINITE);
- ts->State = 4;
- }
- break;
+ if (w->Ok == false)
+ {
+ all_ok = false;
+ }
+ }
- case 4:
- // Do Nothing
- if (Recv(ts->Sock, recv_buf_data, buf_size, false) == SOCK_LATER)
- {
- ret = SOCK_LATER;
- }
- break;
- }
+ if (all_ok)
+ {
+ // Measurement completed
+ // Show the result
+ TtcGenerateResult(ttc);
+ }
- if (ret == 0)
- {
- // The socket is disconnected
- ttc->AbnormalTerminated = true;
- ttc->ErrorCode = ERR_PROTOCOL_ERROR;
- blocked_for_this_socket = true;
- dont_block_next_time = false;
+ // Release worker threads
+ for (i = 0;i < LIST_NUM(ttc->WorkerThreadList);i++)
+ {
+ TTC_WORKER *w = LIST_DATA(ttc->WorkerThreadList, i);
- if (ts->HideErrMsg == false)
- {
- UniFormat(tmp, sizeof(tmp), _UU("TTC_COMM_DISCONNECTED"), ts->Id);
- TtPrint(ttc->Param, ttc->Print, tmp);
- ts->HideErrMsg = true;
- }
- }
- else if (ret == SOCK_LATER)
- {
- // Delay has occurred
- blocked_for_this_socket = true;
- dont_block_next_time = false;
- }
- else
- {
- if (ts->Download)
- {
- ts->NumBytes += (UINT64)ret;
- }
- }
+ ReleaseThread(w->WorkerThread);
- if (blocked_for_this_socket == false)
- {
- all_sockets_blocked = false;
- }
- }
+ ReleaseEvent(w->StartEvent);
+ ReleaseList(w->SockList);
- if (ttc->Halt || (ttc->Cancel != NULL && (*ttc->Cancel)))
- {
- all_sockets_blocked = true;
- dont_block_next_time = true;
- }
+ ReleaseSockEvent(w->SockEvent);
- if (end_tick <= Tick64())
- {
- all_sockets_blocked = true;
- dont_block_next_time = true;
- }
- }
+ Free(w);
}
+
+ ReleaseList(ttc->WorkerThreadList);
+ ttc->WorkerThreadList = NULL;
}
else
{
@@ -2211,10 +2361,7 @@ void TtcThread(THREAD *thread, void *param)
Free(ts);
}
- ReleaseSockEvent(ttc->SockEvent);
ReleaseList(ttc->ItcSockList);
- Free(send_buf_data);
- Free(recv_buf_data);
}
// Start the communication throughput measurement client
@@ -2280,7 +2427,6 @@ UINT FreeTtc(TTC *ttc, TT_RESULT *result)
}
}
- ReleaseSockEvent(ttc->SockEvent);
ReleaseEvent(ttc->InitedEvent);
Free(ttc);
@@ -2301,6 +2447,8 @@ TTS *NewTts(UINT port, void *param, TT_PRINT_PROC *print_proc)
TtPrint(param, print_proc, _UU("TTS_INIT"));
+ tts->WorkerList = NewList(NULL);
+
// Creating a thread
t = NewThread(TtsListenThread, tts);
WaitThreadInit(t);
@@ -2337,6 +2485,8 @@ UINT FreeTts(TTS *tts)
ret = tts->ErrorCode;
+ ReleaseList(tts->WorkerList);
+
Free(tts);
return ret;
@@ -2356,16 +2506,20 @@ void PtTrafficPrintProc(void *param, wchar_t *str)
if (c->ConsoleType == CONSOLE_LOCAL)
{
- wchar_t tmp[MAX_SIZE];
-
- // Display only if the local console
- // (Can not be displayed because threads aren't synchronized otherwise?)
- UniStrCpy(tmp, sizeof(tmp), str);
- if (UniEndWith(str, L"\n") == false)
+ Lock(c->OutputLock);
{
- UniStrCat(tmp, sizeof(tmp), L"\n");
+ wchar_t tmp[MAX_SIZE];
+
+ // Display only if the local console
+ // (Can not be displayed because threads aren't synchronized otherwise?)
+ UniStrCpy(tmp, sizeof(tmp), str);
+ if (UniEndWith(str, L"\n") == false)
+ {
+ UniStrCat(tmp, sizeof(tmp), L"\n");
+ }
+ UniPrint(L"%s", tmp);
}
- UniPrint(L"%s", tmp);
+ Unlock(c->OutputLock);
}
}
@@ -23850,7 +24004,3 @@ LABEL_CLEANUP:
#endif // OS_WIN32
-
-// Developed by SoftEther VPN Project at University of Tsukuba in Japan.
-// Department of Computer Science has dozens of overly-enthusiastic geeks.
-// Join us: http://www.tsukuba.ac.jp/english/admission/