quartz: Add a function that can be called when stopping processing data.
diff --git a/dlls/quartz/avisplit.c b/dlls/quartz/avisplit.c
index 77ebc9a..e09adb7 100644
--- a/dlls/quartz/avisplit.c
+++ b/dlls/quartz/avisplit.c
@@ -1080,7 +1080,7 @@
     This->streams = NULL;
     This->oldindex = NULL;
 
-    hr = Parser_Create(&(This->Parser), &AVISplitter_Vtbl, &CLSID_AviSplitter, AVISplitter_Sample, AVISplitter_QueryAccept, AVISplitter_InputPin_PreConnect, AVISplitter_Cleanup, AVISplitter_Disconnect, NULL, NULL, NULL, NULL);
+    hr = Parser_Create(&(This->Parser), &AVISplitter_Vtbl, &CLSID_AviSplitter, AVISplitter_Sample, AVISplitter_QueryAccept, AVISplitter_InputPin_PreConnect, AVISplitter_Cleanup, AVISplitter_Disconnect, NULL, NULL, NULL, NULL, NULL);
 
     if (FAILED(hr))
         return hr;
diff --git a/dlls/quartz/mpegsplit.c b/dlls/quartz/mpegsplit.c
index 1b88739..71027e9 100644
--- a/dlls/quartz/mpegsplit.c
+++ b/dlls/quartz/mpegsplit.c
@@ -830,7 +830,7 @@
     }
     This->seek_entries = 64;
 
-    hr = Parser_Create(&(This->Parser), &MPEGSplitter_Vtbl, &CLSID_MPEG1Splitter, MPEGSplitter_process_sample, MPEGSplitter_query_accept, MPEGSplitter_pre_connect, MPEGSplitter_cleanup, MPEGSplitter_disconnect, MPEGSplitter_first_request, NULL, MPEGSplitter_seek, NULL);
+    hr = Parser_Create(&(This->Parser), &MPEGSplitter_Vtbl, &CLSID_MPEG1Splitter, MPEGSplitter_process_sample, MPEGSplitter_query_accept, MPEGSplitter_pre_connect, MPEGSplitter_cleanup, MPEGSplitter_disconnect, MPEGSplitter_first_request, NULL, NULL, MPEGSplitter_seek, NULL);
     if (FAILED(hr))
     {
         CoTaskMemFree(This);
diff --git a/dlls/quartz/parser.c b/dlls/quartz/parser.c
index 7a7f9ba..f9140ae 100644
--- a/dlls/quartz/parser.c
+++ b/dlls/quartz/parser.c
@@ -52,7 +52,7 @@
 }
 
 
-HRESULT Parser_Create(ParserImpl* pParser, const IBaseFilterVtbl *Parser_Vtbl, const CLSID* pClsid, PFN_PROCESS_SAMPLE fnProcessSample, PFN_QUERY_ACCEPT fnQueryAccept, PFN_PRE_CONNECT fnPreConnect, PFN_CLEANUP fnCleanup, PFN_DISCONNECT fnDisconnect, REQUESTPROC fnRequest, CHANGEPROC stop, CHANGEPROC current, CHANGEPROC rate)
+HRESULT Parser_Create(ParserImpl* pParser, const IBaseFilterVtbl *Parser_Vtbl, const CLSID* pClsid, PFN_PROCESS_SAMPLE fnProcessSample, PFN_QUERY_ACCEPT fnQueryAccept, PFN_PRE_CONNECT fnPreConnect, PFN_CLEANUP fnCleanup, PFN_DISCONNECT fnDisconnect, REQUESTPROC fnRequest, STOPPROCESSPROC fnDone, CHANGEPROC stop, CHANGEPROC current, CHANGEPROC rate)
 {
     HRESULT hr;
     PIN_INFO piInput;
@@ -67,8 +67,8 @@
     pParser->pClock = NULL;
     pParser->fnDisconnect = fnDisconnect;
     ZeroMemory(&pParser->filterInfo, sizeof(FILTER_INFO));
-
     pParser->lastpinchange = GetTickCount();
+
     pParser->cStreams = 0;
     pParser->ppPins = CoTaskMemAlloc(1 * sizeof(IPin *));
 
@@ -89,7 +89,7 @@
     MediaSeekingImpl_Init((IBaseFilter*)pParser, stop, current, rate, &pParser->mediaSeeking, &pParser->csFilter);
     pParser->mediaSeeking.lpVtbl = &Parser_Seeking_Vtbl;
 
-    hr = PullPin_Construct(&Parser_InputPin_Vtbl, &piInput, fnProcessSample, (LPVOID)pParser, fnQueryAccept, fnCleanup, fnRequest, &pParser->csFilter, (IPin **)&pParser->pInputPin);
+    hr = PullPin_Construct(&Parser_InputPin_Vtbl, &piInput, fnProcessSample, (LPVOID)pParser, fnQueryAccept, fnCleanup, fnRequest, fnDone, &pParser->csFilter, (IPin **)&pParser->pInputPin);
 
     if (SUCCEEDED(hr))
     {
@@ -150,6 +150,10 @@
 void Parser_Destroy(ParserImpl *This)
 {
     IPin *connected = NULL;
+    ULONG pinref;
+
+    assert(!This->refCount);
+    PullPin_WaitForStateChange(This->pInputPin, INFINITE);
 
     if (This->pClock)
         IReferenceClock_Release(This->pClock);
@@ -158,11 +162,21 @@
     IPin_ConnectedTo((IPin *)This->pInputPin, &connected);
     if (connected)
     {
-        IPin_Disconnect(connected);
+        assert(IPin_Disconnect(connected) == S_OK);
         IPin_Release(connected);
-        IPin_Disconnect((IPin *)This->pInputPin);
+        assert(IPin_Disconnect((IPin *)This->pInputPin) == S_OK);
     }
-    IPin_Release((IPin *)This->pInputPin);
+    pinref = IPin_Release((IPin *)This->pInputPin);
+    if (pinref)
+    {
+        /* Valgrind could find this, if I kill it here */
+        ERR("pinref should be null, is %u, destroying anyway\n", pinref);
+        assert((LONG)pinref > 0);
+
+        while (pinref)
+            pinref = IPin_Release((IPin *)This->pInputPin);
+    }
+
     CoTaskMemFree(This->ppPins);
     This->lpVtbl = NULL;
 
@@ -378,6 +392,7 @@
 
 /** IBaseFilter implementation **/
 
+/* FIXME: WRONG */
 static HRESULT Parser_GetPin(IBaseFilter *iface, ULONG pos, IPin **pin, DWORD *lastsynctick)
 {
     ParserImpl *This = (ParserImpl *)iface;
@@ -464,7 +479,7 @@
     This->ppPins = CoTaskMemAlloc((This->cStreams + 2) * sizeof(IPin *));
     memcpy(This->ppPins, ppOldPins, (This->cStreams + 1) * sizeof(IPin *));
 
-    hr = OutputPin_Construct(&Parser_OutputPin_Vtbl, sizeof(Parser_OutputPin), piOutput, props, NULL, Parser_OutputPin_QueryAccept, &This->csFilter, This->ppPins + This->cStreams + 1);
+    hr = OutputPin_Construct(&Parser_OutputPin_Vtbl, sizeof(Parser_OutputPin), piOutput, props, NULL, Parser_OutputPin_QueryAccept, &This->csFilter, This->ppPins + (This->cStreams + 1));
 
     if (SUCCEEDED(hr))
     {
@@ -494,22 +509,25 @@
 static HRESULT Parser_RemoveOutputPins(ParserImpl * This)
 {
     /* NOTE: should be in critical section when calling this function */
-
+    HRESULT hr;
     ULONG i;
     IPin ** ppOldPins = This->ppPins;
 
+    TRACE("(%p)\n", This);
+
     /* reduce the pin array down to 1 (just our input pin) */
     This->ppPins = CoTaskMemAlloc(sizeof(IPin *) * 1);
     memcpy(This->ppPins, ppOldPins, sizeof(IPin *) * 1);
 
     for (i = 0; i < This->cStreams; i++)
     {
-        OutputPin_DeliverDisconnect((OutputPin *)ppOldPins[i + 1]);
+        hr = OutputPin_DeliverDisconnect((OutputPin *)ppOldPins[i + 1]);
+        FIXME("Other side: %08x\n", hr);
         IPin_Release(ppOldPins[i + 1]);
     }
 
-    This->cStreams = 0;
     This->lastpinchange = GetTickCount();
+    This->cStreams = 0;
     CoTaskMemFree(ppOldPins);
 
     return S_OK;
@@ -613,7 +631,7 @@
     ULONG refCount = InterlockedDecrement(&This->pin.pin.refCount);
     
     TRACE("(%p)->() Release from %d\n", iface, refCount + 1);
-    
+
     if (!refCount)
     {
         FreeMediaType(This->pmt);
@@ -687,27 +705,31 @@
 static HRESULT WINAPI Parser_PullPin_Disconnect(IPin * iface)
 {
     HRESULT hr;
-    IPinImpl *This = (IPinImpl *)iface;
+    PullPin *This = (PullPin *)iface;
 
     TRACE("()\n");
 
-    EnterCriticalSection(This->pCritSec);
+    EnterCriticalSection(&This->thread_lock);
+    EnterCriticalSection(This->pin.pCritSec);
     {
-        if (This->pConnectedTo)
+        if (This->pin.pConnectedTo)
         {
             PullPin *ppin = (PullPin *)This;
             FILTER_STATE state;
-            ParserImpl *Parser = (ParserImpl *)This->pinInfo.pFilter;
+            ParserImpl *Parser = (ParserImpl *)This->pin.pinInfo.pFilter;
 
-            hr = IBaseFilter_GetState(This->pinInfo.pFilter, 0, &state);
+            LeaveCriticalSection(This->pin.pCritSec);
+            hr = IBaseFilter_GetState(This->pin.pinInfo.pFilter, INFINITE, &state);
+            EnterCriticalSection(This->pin.pCritSec);
 
             if (SUCCEEDED(hr) && (state == State_Stopped) && SUCCEEDED(Parser->fnDisconnect(Parser)))
             {
-                IPin_Release(This->pConnectedTo);
-                This->pConnectedTo = NULL;
+                IPin_Release(This->pin.pConnectedTo);
+                This->pin.pConnectedTo = NULL;
+
                 if (FAILED(hr = IMemAllocator_Decommit(ppin->pAlloc)))
                     ERR("Allocator decommit failed with error %x. Possible memory leak\n", hr);
-                hr = Parser_RemoveOutputPins((ParserImpl *)This->pinInfo.pFilter);
+                hr = Parser_RemoveOutputPins((ParserImpl *)This->pin.pinInfo.pFilter);
             }
             else
                 hr = VFW_E_NOT_STOPPED;
@@ -715,8 +737,9 @@
         else
             hr = S_FALSE;
     }
-    LeaveCriticalSection(This->pCritSec);
-    
+    LeaveCriticalSection(This->pin.pCritSec);
+    LeaveCriticalSection(&This->thread_lock);
+
     return hr;
 }
 
diff --git a/dlls/quartz/parser.h b/dlls/quartz/parser.h
index f30a8fd..b542388 100644
--- a/dlls/quartz/parser.h
+++ b/dlls/quartz/parser.h
@@ -58,7 +58,7 @@
 extern HRESULT Parser_AddPin(ParserImpl * This, const PIN_INFO * piOutput, ALLOCATOR_PROPERTIES * props, const AM_MEDIA_TYPE * amt);
 
 extern HRESULT Parser_Create(ParserImpl*, const IBaseFilterVtbl *, const CLSID*, PFN_PROCESS_SAMPLE, PFN_QUERY_ACCEPT, PFN_PRE_CONNECT,
-                             PFN_CLEANUP, PFN_DISCONNECT, REQUESTPROC, CHANGEPROC stop, CHANGEPROC current, CHANGEPROC rate);
+                             PFN_CLEANUP, PFN_DISCONNECT, REQUESTPROC, STOPPROCESSPROC, CHANGEPROC stop, CHANGEPROC current, CHANGEPROC rate);
 
 /* Override the _Release function and call this when releasing */
 extern void Parser_Destroy(ParserImpl *This);
diff --git a/dlls/quartz/pin.c b/dlls/quartz/pin.c
index 6bbcf9a..803a751 100644
--- a/dlls/quartz/pin.c
+++ b/dlls/quartz/pin.c
@@ -1205,7 +1205,7 @@
 
 
 static HRESULT PullPin_Init(const IPinVtbl *PullPin_Vtbl, const PIN_INFO * pPinInfo, SAMPLEPROC_PULL pSampleProc, LPVOID pUserData,
-                            QUERYACCEPTPROC pQueryAccept, CLEANUPPROC pCleanUp, REQUESTPROC pCustomRequest, LPCRITICAL_SECTION pCritSec, PullPin * pPinImpl)
+                            QUERYACCEPTPROC pQueryAccept, CLEANUPPROC pCleanUp, REQUESTPROC pCustomRequest, STOPPROCESSPROC pDone, LPCRITICAL_SECTION pCritSec, PullPin * pPinImpl)
 {
     /* Common attributes */
     pPinImpl->pin.lpVtbl = PullPin_Vtbl;
@@ -1220,6 +1220,7 @@
     /* Input pin attributes */
     pPinImpl->fnSampleProc = pSampleProc;
     pPinImpl->fnCleanProc = pCleanUp;
+    pPinImpl->fnDone = pDone;
     pPinImpl->fnPreConnect = NULL;
     pPinImpl->pAlloc = NULL;
     pPinImpl->pReader = NULL;
@@ -1241,7 +1242,7 @@
     return S_OK;
 }
 
-HRESULT PullPin_Construct(const IPinVtbl *PullPin_Vtbl, const PIN_INFO * pPinInfo, SAMPLEPROC_PULL pSampleProc, LPVOID pUserData, QUERYACCEPTPROC pQueryAccept, CLEANUPPROC pCleanUp, REQUESTPROC pCustomRequest, LPCRITICAL_SECTION pCritSec, IPin ** ppPin)
+HRESULT PullPin_Construct(const IPinVtbl *PullPin_Vtbl, const PIN_INFO * pPinInfo, SAMPLEPROC_PULL pSampleProc, LPVOID pUserData, QUERYACCEPTPROC pQueryAccept, CLEANUPPROC pCleanUp, REQUESTPROC pCustomRequest, STOPPROCESSPROC pDone, LPCRITICAL_SECTION pCritSec, IPin ** ppPin)
 {
     PullPin * pPinImpl;
 
@@ -1258,7 +1259,7 @@
     if (!pPinImpl)
         return E_OUTOFMEMORY;
 
-    if (SUCCEEDED(PullPin_Init(PullPin_Vtbl, pPinInfo, pSampleProc, pUserData, pQueryAccept, pCleanUp, pCustomRequest, pCritSec, pPinImpl)))
+    if (SUCCEEDED(PullPin_Init(PullPin_Vtbl, pPinInfo, pSampleProc, pUserData, pQueryAccept, pCleanUp, pCustomRequest, pDone, pCritSec, pPinImpl)))
     {
         *ppPin = (IPin *)(&pPinImpl->pin.lpVtbl);
         return S_OK;
@@ -1381,6 +1382,9 @@
 
     if (!refCount)
     {
+        WaitForSingleObject(This->hEventStateChanged, INFINITE);
+        assert(!This->hThread);
+
         if(This->pAlloc)
             IMemAllocator_Release(This->pAlloc);
         if(This->pReader)
@@ -1565,6 +1569,8 @@
      * Flush remaining samples
      */
     PullPin_Flush(This);
+    if (This->fnDone)
+        This->fnDone(This->pin.pUserData);
 
     TRACE("End: %08x, %d\n", hr, This->stop_playback);
 }
diff --git a/dlls/quartz/pin.h b/dlls/quartz/pin.h
index 147a64c..80b439d 100644
--- a/dlls/quartz/pin.h
+++ b/dlls/quartz/pin.h
@@ -59,6 +59,11 @@
  */
 typedef HRESULT (* REQUESTPROC) (LPVOID userdata);
 
+/* This function is called after processing is done (for whatever reason that is caused)
+ * This is useful if you create processing threads that need to die
+ */
+typedef HRESULT (* STOPPROCESSPROC) (LPVOID userdata);
+
 #define ALIGNDOWN(value,boundary) ((value)/(boundary)*(boundary))
 #define ALIGNUP(value,boundary) (ALIGNDOWN((value)+(boundary)-1, (boundary)))
 
@@ -115,6 +120,7 @@
 	PRECONNECTPROC fnPreConnect;
 	REQUESTPROC fnCustomRequest;
 	CLEANUPPROC fnCleanProc;
+	STOPPROCESSPROC fnDone;
 	double dRate;
 	BOOL stop_playback;
 	DWORD cbAlign;
@@ -138,7 +144,7 @@
 /*** Constructors ***/
 HRESULT InputPin_Construct(const IPinVtbl *InputPin_Vtbl, const PIN_INFO * pPinInfo, SAMPLEPROC_PUSH pSampleProc, LPVOID pUserData, QUERYACCEPTPROC pQueryAccept, CLEANUPPROC pCleanUp, LPCRITICAL_SECTION pCritSec, IPin ** ppPin);
 HRESULT OutputPin_Construct(const IPinVtbl *OutputPin_Vtbl, long outputpin_size, const PIN_INFO * pPinInfo, ALLOCATOR_PROPERTIES *props, LPVOID pUserData, QUERYACCEPTPROC pQueryAccept, LPCRITICAL_SECTION pCritSec, IPin ** ppPin);
-HRESULT PullPin_Construct(const IPinVtbl *PullPin_Vtbl, const PIN_INFO * pPinInfo, SAMPLEPROC_PULL pSampleProc, LPVOID pUserData, QUERYACCEPTPROC pQueryAccept, CLEANUPPROC pCleanUp, REQUESTPROC pCustomRequest, LPCRITICAL_SECTION pCritSec, IPin ** ppPin);
+HRESULT PullPin_Construct(const IPinVtbl *PullPin_Vtbl, const PIN_INFO * pPinInfo, SAMPLEPROC_PULL pSampleProc, LPVOID pUserData, QUERYACCEPTPROC pQueryAccept, CLEANUPPROC pCleanUp, STOPPROCESSPROC, REQUESTPROC pCustomRequest, LPCRITICAL_SECTION pCritSec, IPin ** ppPin);
 
 /**************************/
 /*** Pin Implementation ***/
diff --git a/dlls/quartz/waveparser.c b/dlls/quartz/waveparser.c
index 8e3767e..3818214 100644
--- a/dlls/quartz/waveparser.c
+++ b/dlls/quartz/waveparser.c
@@ -459,7 +459,7 @@
 
     This->pCurrentSample = NULL;
 
-    hr = Parser_Create(&(This->Parser), &WAVEParser_Vtbl, &CLSID_WAVEParser, WAVEParser_Sample, WAVEParser_QueryAccept, WAVEParser_InputPin_PreConnect, WAVEParser_Cleanup, WAVEParser_disconnect, WAVEParser_first_request, NULL, WAVEParserImpl_seek, NULL);
+    hr = Parser_Create(&(This->Parser), &WAVEParser_Vtbl, &CLSID_WAVEParser, WAVEParser_Sample, WAVEParser_QueryAccept, WAVEParser_InputPin_PreConnect, WAVEParser_Cleanup, WAVEParser_disconnect, WAVEParser_first_request, NULL, NULL, WAVEParserImpl_seek, NULL);
 
     if (FAILED(hr))
         return hr;