Stream handler DLLs create worker threads for each stream created. A worker thread, for example, would consist of code to request a buffer from the Sync/Stream Manager, fill it with data from a device and return it to the Sync/Stream Manager. This would continue until the end of stream or until some other kind of stream stop.
A worker thread does all the work of the stream handler itself. It is a good idea to create a worker thread for each stream instance. Basically, a worker thread either loops in the read routine or loops in the write routine. This depends on whether it is a source or target. If it is a source, the thread loops in the read routine and reads from its device using whatever commands it uses to interface with its device. It also interfaces with the Sync/Stream Manager requesting empty buffers and returning full buffers. On the other hand, when the DLL stream handler is the target, the thread will also be in a big loop, but it is performing different operations. It will be requesting full buffers from the Sync/Stream Manager and then consuming those buffers by passing them off to the device (in whatever way it communicates with its device).
#include <os2.h>
#include <os2me.h>
#include <hhpheap.h>
#include <shi.h>
PSIB psib; /* Stream Instance Block */
{ /* Start of FsshRead */
RC rc = NO_ERROR; /* local return code */
char NeedBuf; /* local loop Boolean */
LONG NumBytesIO; /* Number of bytes from mmio */
PARM_NOTIFY npget, npret; /* parms for SMH_NOTIFY calls */
SRCBUFTAB SrcBufTab = {0}; /* Source buffer table */
ULONG ulStopped = DO_SLEEP; /* did we get stop disc or flush */
BOOL bAtEOS = FALSE; /* End of Stream indicator */
ULONG ulPostCount; /* Temp to hold count */
/* Before we start lets do some init stuff: */
npget.ulFunction = SMH_NOTIFY;
npget.hid = psib->HandlerID;
npget.hstream = psib->hStream;
npget.ulGetNumEntries = 1L;
npget.ulRetNumEntries = 0L;
npget.pGetBufTab = &SrcBufTab;
npget.pRetBufTab = NULL;
npret.ulFunction = SMH_NOTIFY;
npret.hid = psib->HandlerID;
npret.hstream = psib->hStream;
npret.ulFlags = BUF_RETURNFULL;
npret.ulGetNumEntries = 0L;
npret.ulRetNumEntries = 1L;
npret.pGetBufTab = NULL;
npret.pRetBufTab = &SrcBufTab;
/* Wait until we get the ShcStart */
DosWaitEventSem(psib->hevStop, SEM_INDEFINITE_WAIT);
/* We will loop forever getting an empty buffer, calling the device to */
/* fill up the buffer, sending it to the consumer. During each */
/* iteration of the loop we will check the action flags for */
/* asynchronous requests to do things. */
if (psib->ulActionFlags & SIBACTFLG_KILL)
{ /* Must have been a create error */
rc = 1L;
} /* Must have been a create error */
/* Start the main loop */
while (!rc)
{ /* while no severe error */
if (psib->ulActionFlags)
rc = CheckNSleep(psib);
/*
* Get a buffer
*/
NeedBuf = TRUE;
while ((NeedBuf) && (!rc))
{ /* while we don't have a buffer */
/* Clear the stop sem, so if after we call ssm to get a buffer if */
/* it returns none avail then we won't miss a SSMBuffer Start */
/* before we go to sleep. */
DosResetEventSem(psib->hevStop, &ulPostCount);
npget.ulFlags = BUF_GETEMPTY;
rc = SMHEntryPoint(&npget); /* get a buffer */
if (!rc)
{
NeedBuf = FALSE;
/* make sure attribute is 0 so we don't pass around a bad value */
SrcBufTab.ulMessageParm = 0L;
}
else
{ /* return code from smhnotify */
if (rc == ERROR_BUFFER_NOT_AVAILABLE)
{ /* buffer not available */
/* the smhnotify resets the num entries to 0 when none avail */
npget.ulGetNumEntries = 1L;
ulStopped = DO_SLEEP;
rc = SleepNCheck(psib, &ulStopped);
} /* buffer not available */
} /* return code from smhnotify */
} /* while we don't have a buffer */
/* We have a buffer or an error */
if (!rc)
{ /* have a buffer - do the read */
NumBytesIO = mmioRead((HMMIO)psib->ulAssocP1,
(PCHAR)SrcBufTab.pBuffer,
(LONG)SrcBufTab.ulLength);
if (NumBytesIO == -1L)
{ /* an error */
SrcBufTab.ulLength = 0L;
/* get the real error code */
rc = mmioGetLastError((HMMIO)psib->ulAssocP1);
rc = ShIOError(psib, npret, rc);
} /* an error */
else
{ /* We have some data */
if (NumBytesIO != (LONG)SrcBufTab.ulLength)
{ /* End of stream */
npret.ulFlags |= BUF_EOS;
bAtEOS = TRUE;
DosResetEventSem(psib->hevStop, &ulPostCount);
SrcBufTab.ulLength = NumBytesIO;
} /* End of stream */
/* Send the data to the stream manager */
rc = SMHEntryPoint(&npret);
if (!rc)
{ /* data sent ok */
if (bAtEOS)
{
bAtEOS = FALSE;
ulStopped = DO_SLEEP;
rc = SleepNCheck(psib, &ulStopped);
}
} /* data sent ok */
} /* We have some data */
/* Clear the EOS if it was set. And attribute */
npret.ulFlags = BUF_RETURNFULL;
SrcBufTab.ulMessageParm = 0L;
} /* have a buffer - do the read */
} /* while no severe error */
/* We get here if an error has occurred or a kill has */
/* been sent. In the case of the kill, reset the */
/* return code to 0 (no error) and exit the thread. */
/* Otherwise, report the error event and exit the */
/* thread. */
if (psib->ulActionFlags & SIBACTFLG_KILL)
{
rc = 0L;
}
else
{
ReportEvent(psib,
rc, /* Return code */
EVENT_ERROR, /* event type */
0L, /* user info */
NONRECOVERABLE_ERROR); /* Severe Error */
}
/* Only set this flag when we no longer need access to the sib since */
/* Destroy may get control and Free the sib. */
psib->ulActionFlags |= SIBACTFLG_THREAD_DEAD;
return;
} /* End of FsshRead */