//------------------------------------------------------------------------------ // File: AsyncIo.cpp // // Desc: DirectShow sample code - base library with I/O functionality. // // Copyright (c) Microsoft Corporation. All rights reserved. //------------------------------------------------------------------------------ #include "stdafx.h" #include "BaseClasses/streams.h" #include "asyncio.h" // --- CAsyncRequest --- // implementation of CAsyncRequest representing a single // outstanding request. All the i/o for this object is done // in the Complete method. // init the params for this request. // Read is not issued until the complete call HRESULT CAsyncRequest::Request( CAsyncIo *pIo, CAsyncStream *pStream, LONGLONG llPos, LONG lLength, BOOL bAligned, BYTE* pBuffer, LPVOID pContext, // filter's context DWORD_PTR dwUser) // downstream filter's context { m_pIo = pIo; m_pStream = pStream; m_llPos = llPos; m_lLength = lLength; m_bAligned = bAligned; m_pBuffer = pBuffer; m_pContext = pContext; m_dwUser = dwUser; m_hr = VFW_E_TIMEOUT; // not done yet return S_OK; } // issue the i/o if not overlapped, and block until i/o complete. 
// returns error code of file i/o // // HRESULT CAsyncRequest::Complete() { m_pStream->Lock(); m_hr = m_pStream->SetPointer(m_llPos); if(S_OK == m_hr) { DWORD dwActual; m_hr = m_pStream->Read(m_pBuffer, m_lLength, m_bAligned, &dwActual); if(m_hr == OLE_S_FIRST) { if(m_pContext) { IMediaSample *pSample = reinterpret_cast(m_pContext); pSample->SetDiscontinuity(TRUE); m_hr = S_OK; } } if(FAILED(m_hr)) { } else if(dwActual != (DWORD)m_lLength) { // tell caller size changed - probably because of EOF m_lLength = (LONG) dwActual; m_hr = S_FALSE; } else { m_hr = S_OK; } } m_pStream->Unlock(); return m_hr; } // --- CAsyncIo --- // note - all events created manual reset CAsyncIo::CAsyncIo(CAsyncStream *pStream) : m_pStream(pStream), m_bFlushing(FALSE), m_listWork(NAME("Work list")), m_listDone(NAME("Done list")), m_evWork(TRUE), m_evDone(TRUE), m_cItemsOut(0), m_bWaiting(FALSE), m_evStop(TRUE), m_hThread(NULL) { } CAsyncIo::~CAsyncIo() { // move everything to the done list BeginFlush(); // shutdown worker thread CloseThread(); // empty the done list POSITION pos = m_listDone.GetHeadPosition(); while(pos) { CAsyncRequest* pRequest = m_listDone.GetNext(pos); delete pRequest; } m_listDone.RemoveAll(); } // ready for async activity - call this before calling Request. // // start the worker thread if we need to // // !!! use overlapped i/o if possible HRESULT CAsyncIo::AsyncActive(void) { return StartThread(); } // call this when no more async activity will happen before // the next AsyncActive call // // stop the worker thread if active HRESULT CAsyncIo::AsyncInactive(void) { return CloseThread(); } // add a request to the queue. 
HRESULT CAsyncIo::Request( LONGLONG llPos, LONG lLength, BOOL bAligned, BYTE * pBuffer, LPVOID pContext, DWORD_PTR dwUser) { if(bAligned) { if(!IsAligned(llPos) || !IsAligned(lLength) || !IsAligned((LONG_PTR) pBuffer)) { return VFW_E_BADALIGN; } } CAsyncRequest* pRequest = new CAsyncRequest; if (!pRequest) return E_OUTOFMEMORY; HRESULT hr = pRequest->Request(this, m_pStream, llPos, lLength, bAligned, pBuffer, pContext, dwUser); if(SUCCEEDED(hr)) { // might fail if flushing hr = PutWorkItem(pRequest); } if(FAILED(hr)) { delete pRequest; } return hr; } // wait for the next request to complete HRESULT CAsyncIo::WaitForNext( DWORD dwTimeout, LPVOID * ppContext, DWORD_PTR * pdwUser, LONG * pcbActual) { CheckPointer(ppContext,E_POINTER); CheckPointer(pdwUser,E_POINTER); CheckPointer(pcbActual,E_POINTER); // some errors find a sample, others don't. Ensure that // *ppContext is NULL if no sample found *ppContext = NULL; // wait until the event is set, but since we are not // holding the critsec when waiting, we may need to re-wait for(;;) { if(!m_evDone.Wait(dwTimeout)) { // timeout occurred return VFW_E_TIMEOUT; } // get next event from list CAsyncRequest* pRequest = GetDoneItem(); if(pRequest) { // found a completed request // check if ok HRESULT hr = pRequest->GetHResult(); if(hr == S_FALSE) { // this means the actual length was less than // requested - may be ok if he aligned the end of file if((pRequest->GetActualLength() + pRequest->GetStart()) == Size()) { hr = S_OK; } else { // it was an actual read error hr = E_FAIL; } } // return actual bytes read *pcbActual = pRequest->GetActualLength(); // return his context *ppContext = pRequest->GetContext(); *pdwUser = pRequest->GetUser(); delete pRequest; return hr; } else { // Hold the critical section while checking the list state CAutoLock lck(&m_csLists); if(m_bFlushing && !m_bWaiting) { // can't block as we are between BeginFlush and EndFlush // but note that if m_bWaiting is set, then there are some // items not yet 
complete that we should block for. return VFW_E_WRONG_STATE; } } // done item was grabbed between completion and // us locking m_csLists. } } // perform a synchronous read request on this thread. // Need to hold m_csFile while doing this (done in request object) HRESULT CAsyncIo::SyncReadAligned( LONGLONG llPos, LONG lLength, BYTE * pBuffer, LONG * pcbActual, PVOID pvContext) { CheckPointer(pcbActual,E_POINTER); if(!IsAligned(llPos) || !IsAligned(lLength) || !IsAligned((LONG_PTR) pBuffer)) { return VFW_E_BADALIGN; } CAsyncRequest request; HRESULT hr = request.Request(this, m_pStream, llPos, lLength, TRUE, pBuffer, pvContext, 0); if(FAILED(hr)) return hr; hr = request.Complete(); // return actual data length *pcbActual = request.GetActualLength(); return hr; } HRESULT CAsyncIo::Length(LONGLONG *pllTotal, LONGLONG *pllAvailable) { CheckPointer(pllTotal,E_POINTER); *pllTotal = m_pStream->Size(pllAvailable); return S_OK; } // cancel all items on the worklist onto the done list // and refuse further requests or further WaitForNext calls // until the end flush // // WaitForNext must return with NULL only if there are no successful requests. // So Flush does the following: // 1. set m_bFlushing ensures no more requests succeed // 2. move all items from work list to the done list. // 3. If there are any outstanding requests, then we need to release the // critsec to allow them to complete. The m_bWaiting as well as ensuring // that we are signalled when they are all done is also used to indicate // to WaitForNext that it should continue to block. // 4. Once all outstanding requests are complete, we force m_evDone set and // m_bFlushing set and m_bWaiting false. This ensures that WaitForNext will // not block when the done list is empty. HRESULT CAsyncIo::BeginFlush() { // hold the lock while emptying the work list { CAutoLock lock(&m_csLists); // prevent further requests being queued. 
// Also WaitForNext will refuse to block if this is set // unless m_bWaiting is also set which it will be when we release // the critsec if there are any outstanding). m_bFlushing = TRUE; CAsyncRequest * preq; while((preq = GetWorkItem()) != 0) { preq->Cancel(); PutDoneItem(preq); } // now wait for any outstanding requests to complete if(m_cItemsOut > 0) { // can be only one person waiting ASSERT(!m_bWaiting); // this tells the completion routine that we need to be // signalled via m_evAllDone when all outstanding items are // done. It also tells WaitForNext to continue blocking. m_bWaiting = TRUE; } else { // all done // force m_evDone set so that even if list is empty, // WaitForNext will not block // don't do this until we are sure that all // requests are on the done list. m_evDone.Set(); return S_OK; } } ASSERT(m_bWaiting); // wait without holding critsec for(;;) { m_evAllDone.Wait(); { // hold critsec to check CAutoLock lock(&m_csLists); if(m_cItemsOut == 0) { // now we are sure that all outstanding requests are on // the done list and no more will be accepted m_bWaiting = FALSE; // force m_evDone set so that even if list is empty, // WaitForNext will not block // don't do this until we are sure that all // requests are on the done list. 
m_evDone.Set(); return S_OK; } } } } // end a flushing state HRESULT CAsyncIo::EndFlush() { CAutoLock lock(&m_csLists); m_bFlushing = FALSE; ASSERT(!m_bWaiting); // m_evDone might have been set by BeginFlush - ensure it is // set IFF m_listDone is non-empty if(m_listDone.GetCount() > 0) { m_evDone.Set(); } else { m_evDone.Reset(); } return S_OK; } // start the thread HRESULT CAsyncIo::StartThread(void) { if(m_hThread) { return S_OK; } // clear the stop event before starting m_evStop.Reset(); DWORD dwThreadID; m_hThread = CreateThread(NULL, 0, InitialThreadProc, this, 0, &dwThreadID); if(!m_hThread) { DWORD dwErr = GetLastError(); return HRESULT_FROM_WIN32(dwErr); } return S_OK; } // stop the thread and close the handle HRESULT CAsyncIo::CloseThread(void) { // signal the thread-exit object m_evStop.Set(); if(m_hThread) { WaitForSingleObject(m_hThread, INFINITE); CloseHandle(m_hThread); m_hThread = NULL; } return S_OK; } // manage the list of requests. hold m_csLists and ensure // that the (manual reset) event hevList is set when things on // the list but reset when the list is empty. // returns null if list empty CAsyncRequest* CAsyncIo::GetWorkItem() { CAutoLock lck(&m_csLists); CAsyncRequest * preq = m_listWork.RemoveHead(); // force event set correctly if(m_listWork.GetCount() == 0) { m_evWork.Reset(); } return preq; } // get an item from the done list CAsyncRequest* CAsyncIo::GetDoneItem() { CAutoLock lock(&m_csLists); CAsyncRequest * preq = m_listDone.RemoveHead(); // force event set correctly if list now empty // or we're in the final stages of flushing // Note that during flushing the way it's supposed to work is that // everything is shoved on the Done list then the application is // supposed to pull until it gets nothing more // // Thus we should not set m_evDone unconditionally until everything // has moved to the done list which means we must wait until // cItemsOut is 0 (which is guaranteed by m_bWaiting being TRUE). 
if(m_listDone.GetCount() == 0 && (!m_bFlushing || m_bWaiting)) { m_evDone.Reset(); } return preq; } // put an item on the work list - fail if bFlushing HRESULT CAsyncIo::PutWorkItem(CAsyncRequest* pRequest) { CAutoLock lock(&m_csLists); HRESULT hr; if(m_bFlushing) { hr = VFW_E_WRONG_STATE; } else if(m_listWork.AddTail(pRequest)) { // event should now be in a set state - force this m_evWork.Set(); // start the thread now if not already started hr = StartThread(); } else { hr = E_OUTOFMEMORY; } return(hr); } // put an item on the done list - ok to do this when // flushing HRESULT CAsyncIo::PutDoneItem(CAsyncRequest* pRequest) { ASSERT(CritCheckIn(&m_csLists)); if(m_listDone.AddTail(pRequest)) { // event should now be in a set state - force this m_evDone.Set(); return S_OK; } else { return E_OUTOFMEMORY; } } // called on thread to process any active requests void CAsyncIo::ProcessRequests(void) { // lock to get the item and increment the outstanding count CAsyncRequest * preq = NULL; for(;;) { { CAutoLock lock(&m_csLists); preq = GetWorkItem(); if(preq == NULL) { // done return; } // one more item not on the done or work list m_cItemsOut++; // release critsec } preq->Complete(); // regain critsec to replace on done list { CAutoLock l(&m_csLists); PutDoneItem(preq); if(--m_cItemsOut == 0) { if(m_bWaiting) m_evAllDone.Set(); } } } } // the thread proc - assumes that DWORD thread param is the // this pointer DWORD CAsyncIo::ThreadProc(void) { HANDLE ahev[] = {m_evStop, m_evWork}; for(;;) { DWORD dw = WaitForMultipleObjects(2, ahev, FALSE, INFINITE); if(dw == WAIT_OBJECT_0+1) { // requests need processing ProcessRequests(); } else { // any error or stop event - we should exit return 0; } } } // perform a synchronous read request on this thread. // may not be aligned - so we will have to buffer. 
// Synchronous read carried out on the calling thread.
//
// When position, length and buffer address all pass IsAligned the call is
// forwarded to SyncReadAligned (the byte count it reports is discarded).
// Otherwise a local request is set up with the aligned flag FALSE and
// completed in place.
//
// Returns the result of SyncReadAligned, or of the local request's
// Request()/Complete().
HRESULT CAsyncIo::SyncRead(
    LONGLONG llPos,
    LONG lLength,
    BYTE * pBuffer)
{
    const bool fEverythingAligned = IsAligned(llPos)
                                 && IsAligned(lLength)
                                 && IsAligned((LONG_PTR) pBuffer);

    if(!fEverythingAligned)
    {
        // not aligned with requirements - use buffered file handle.
        //!!! might want to fix this to buffer the data ourselves?
        CAsyncRequest unalignedRead;

        const HRESULT hrSetup = unalignedRead.Request(this,
                                                      m_pStream,
                                                      llPos,
                                                      lLength,
                                                      FALSE,
                                                      pBuffer,
                                                      NULL,
                                                      0);
        if(FAILED(hrSetup))
        {
            return hrSetup;
        }

        return unalignedRead.Complete();
    }

    // aligned case: the aligned reader does all the work
    LONG cbIgnored;
    return SyncReadAligned(llPos, lLength, pBuffer, &cbIgnored, NULL);
}


// Hand back the alignment through an out-parameter.
//
// Stores the value of the parameterless Alignment() in *pAlignment.
// Returns E_POINTER if pAlignment is NULL, otherwise S_OK.
HRESULT CAsyncIo::Alignment(LONG *pAlignment)
{
    CheckPointer(pAlignment, E_POINTER);

    const LONG lCurrentAlignment = Alignment();
    *pAlignment = lCurrentAlignment;

    return S_OK;
}