OpenMAXBellagio  0.9.3
omx_base_sink.c
Go to the documentation of this file.
1 
28 #include <omxcore.h>
29 #include <omx_base_sink.h>
30 
33  omx_base_sink_PrivateType* omx_base_sink_Private;
34 
35  if (openmaxStandComp->pComponentPrivate) {
36  omx_base_sink_Private = (omx_base_sink_PrivateType*)openmaxStandComp->pComponentPrivate;
37  } else {
38  omx_base_sink_Private = calloc(1,sizeof(omx_base_sink_PrivateType));
39  if (!omx_base_sink_Private) {
41  }
42  }
43 
44  // we could create our own port structures here
45  // fixme maybe the base class could use a "port factory" function pointer?
46  err = omx_base_component_Constructor(openmaxStandComp,cComponentName);
47 
48  /* here we can override whatever defaults the base_component constructor set
49  * e.g. we can override the function pointers in the private struct */
50  omx_base_sink_Private = openmaxStandComp->pComponentPrivate;
51 
53 
54  return err;
55 }
56 
58 {
59  return omx_base_component_Destructor(openmaxStandComp);
60 }
61 
67 void* omx_base_sink_BufferMgmtFunction (void* param) {
68  OMX_COMPONENTTYPE* openmaxStandComp = (OMX_COMPONENTTYPE*)param;
69  omx_base_component_PrivateType* omx_base_component_Private = (omx_base_component_PrivateType*)openmaxStandComp->pComponentPrivate;
70  omx_base_sink_PrivateType* omx_base_sink_Private = (omx_base_sink_PrivateType*)omx_base_component_Private;
71  omx_base_PortType *pInPort = (omx_base_PortType *)omx_base_sink_Private->ports[OMX_BASE_SINK_INPUTPORT_INDEX];
72  tsem_t* pInputSem = pInPort->pBufferSem;
73  queue_t* pInputQueue = pInPort->pBufferQueue;
74  OMX_BUFFERHEADERTYPE* pInputBuffer = NULL;
75  OMX_COMPONENTTYPE* target_component;
76  OMX_BOOL isInputBufferNeeded = OMX_TRUE;
77  int inBufExchanged = 0;
78 
79 #if defined(__linux__)
80  omx_base_sink_Private->bellagioThreads->nThreadBufferMngtID = (long int)syscall(__NR_gettid);
81  DEBUG(DEB_LEV_SIMPLE_SEQ, "In %s the thread ID is %i\n", __func__, (int)omx_base_sink_Private->bellagioThreads->nThreadBufferMngtID);
82 #endif
83 
84  DEBUG(DEB_LEV_FUNCTION_NAME, "In %s \n", __func__);
85  while(omx_base_component_Private->state == OMX_StateIdle || omx_base_component_Private->state == OMX_StateExecuting || omx_base_component_Private->state == OMX_StatePause ||
86  omx_base_component_Private->transientState == OMX_TransStateLoadedToIdle){
87 
88  /*Wait till the ports are being flushed*/
89  pthread_mutex_lock(&omx_base_sink_Private->flush_mutex);
90  while( PORT_IS_BEING_FLUSHED(pInPort)) {
91  pthread_mutex_unlock(&omx_base_sink_Private->flush_mutex);
92 
93  if(isInputBufferNeeded==OMX_FALSE) {
94  pInPort->ReturnBufferFunction(pInPort,pInputBuffer);
95  inBufExchanged--;
96  pInputBuffer=NULL;
97  isInputBufferNeeded=OMX_TRUE;
98  DEBUG(DEB_LEV_FULL_SEQ, "Ports are flushing,so returning input buffer\n");
99  }
100  DEBUG(DEB_LEV_FULL_SEQ, "In %s signalling flush all condition \n", __func__);
101 
102  tsem_up(omx_base_sink_Private->flush_all_condition);
103  tsem_down(omx_base_sink_Private->flush_condition);
104  pthread_mutex_lock(&omx_base_sink_Private->flush_mutex);
105  }
106  pthread_mutex_unlock(&omx_base_sink_Private->flush_mutex);
107 
108  /*No buffer to process. So wait here*/
109  if((pInputSem->semval==0 && isInputBufferNeeded==OMX_TRUE ) &&
110  (omx_base_sink_Private->state != OMX_StateLoaded && omx_base_sink_Private->state != OMX_StateInvalid)) {
111  DEBUG(DEB_LEV_SIMPLE_SEQ, "Waiting for input buffer \n");
112  tsem_down(omx_base_sink_Private->bMgmtSem);
113  }
114 
115  if(omx_base_sink_Private->state == OMX_StateLoaded || omx_base_sink_Private->state == OMX_StateInvalid) {
116  DEBUG(DEB_LEV_FULL_SEQ, "In %s Buffer Management Thread is exiting\n",__func__);
117  break;
118  }
119 
120  DEBUG(DEB_LEV_FULL_SEQ, "Waiting for input buffer semval=%d in %s\n",pInputSem->semval, __func__);
121  if(pInputSem->semval>0 && isInputBufferNeeded==OMX_TRUE ) {
122  tsem_down(pInputSem);
123  if(pInputQueue->nelem>0){
124  inBufExchanged++;
125  isInputBufferNeeded=OMX_FALSE;
126  pInputBuffer = dequeue(pInputQueue);
127  if(pInputBuffer == NULL){
128  DEBUG(DEB_LEV_ERR, "Had NULL input buffer!!\n");
129  break;
130  }
131  }
132  }
133 
134  if(isInputBufferNeeded==OMX_FALSE) {
135  if((pInputBuffer->nFlags & OMX_BUFFERFLAG_EOS) ==OMX_BUFFERFLAG_EOS) {
136  DEBUG(DEB_LEV_SIMPLE_SEQ, "Detected EOS flags in input buffer\n");
137 
138  (*(omx_base_component_Private->callbacks->EventHandler))
139  (openmaxStandComp,
140  omx_base_component_Private->callbackData,
141  OMX_EventBufferFlag, /* The command was completed */
142  0, /* The commands was a OMX_CommandStateSet */
143  pInputBuffer->nFlags, /* The state has been changed in message->messageParam2 */
144  NULL);
145  pInputBuffer->nFlags=0;
146  }
147 
148  target_component=(OMX_COMPONENTTYPE*)pInputBuffer->hMarkTargetComponent;
149  if(target_component==(OMX_COMPONENTTYPE *)openmaxStandComp) {
150  /*Clear the mark and generate an event*/
151  (*(omx_base_component_Private->callbacks->EventHandler))
152  (openmaxStandComp,
153  omx_base_component_Private->callbackData,
154  OMX_EventMark, /* The command was completed */
155  1, /* The commands was a OMX_CommandStateSet */
156  0, /* The state has been changed in message->messageParam2 */
157  pInputBuffer->pMarkData);
158  } else if(pInputBuffer->hMarkTargetComponent!=NULL){
159  /*If this is not the target component then pass the mark*/
160  DEBUG(DEB_LEV_FULL_SEQ, "Can't Pass Mark. This is a Sink!!\n");
161  }
162 
163  if((omx_base_sink_Private->state == OMX_StateExecuting) || (omx_base_sink_Private->state == OMX_StateIdle)) {
164  if ((omx_base_sink_Private->BufferMgmtCallback && pInputBuffer->nFilledLen > 0)
165  || (pInputBuffer->nFlags)){
166  (*(omx_base_sink_Private->BufferMgmtCallback))(openmaxStandComp, pInputBuffer);
167  }
168  else {
169  /*If no buffer management call back the explicitly consume input buffer*/
170  pInputBuffer->nFilledLen = 0;
171  }
172  } else {
173  DEBUG(DEB_LEV_ERR, "In %s Received Buffer in non-Executing State(%s) TrState (%s)\n",
174  __func__, stateName(omx_base_sink_Private->state),
175  transientStateName(omx_base_component_Private->transientState));
176  if(OMX_TransStateExecutingToIdle == omx_base_component_Private->transientState ||
177  OMX_TransStatePauseToIdle == omx_base_component_Private->transientState) {
178  pInputBuffer->nFilledLen = 0;
179  }
180  }
181  /*Input Buffer has been completely consumed. So, get new input buffer*/
182 
183  if(omx_base_sink_Private->state==OMX_StatePause && !PORT_IS_BEING_FLUSHED(pInPort)) {
184  /*Waiting at paused state*/
185  tsem_wait(omx_base_sink_Private->bStateSem);
186  }
187 
188  /*Input Buffer has been completely consumed. So, return input buffer*/
189  if(pInputBuffer->nFilledLen==0) {
190  pInPort->ReturnBufferFunction(pInPort,pInputBuffer);
191  inBufExchanged--;
192  pInputBuffer=NULL;
193  isInputBufferNeeded = OMX_TRUE;
194  }
195 
196  }
197  }
198  DEBUG(DEB_LEV_SIMPLE_SEQ,"Exiting Buffer Management Thread\n");
199  return NULL;
200 }
201 
209  OMX_COMPONENTTYPE* openmaxStandComp = (OMX_COMPONENTTYPE*)param;
210  omx_base_component_PrivateType* omx_base_component_Private=(omx_base_component_PrivateType*)openmaxStandComp->pComponentPrivate;
211  omx_base_sink_PrivateType* omx_base_sink_Private = (omx_base_sink_PrivateType*)omx_base_component_Private;
212  omx_base_PortType *pInPort[2];
213  tsem_t* pInputSem[2];
214  queue_t* pInputQueue[2];
215  OMX_BUFFERHEADERTYPE* pInputBuffer[2];
216  OMX_COMPONENTTYPE* target_component;
217  OMX_BOOL isInputBufferNeeded[2];
218  int i,outBufExchanged[2];
219 
220  pInPort[0]=(omx_base_PortType *)omx_base_sink_Private->ports[OMX_BASE_SINK_INPUTPORT_INDEX];
221  pInPort[1]=(omx_base_PortType *)omx_base_sink_Private->ports[OMX_BASE_SINK_INPUTPORT_INDEX_1];
222  pInputSem[0] = pInPort[0]->pBufferSem;
223  pInputSem[1] = pInPort[1]->pBufferSem;
224  pInputQueue[0] = pInPort[0]->pBufferQueue;
225  pInputQueue[1] = pInPort[1]->pBufferQueue;
226  pInputBuffer[1]= pInputBuffer[0]=NULL;
227  isInputBufferNeeded[0]=isInputBufferNeeded[1]=OMX_TRUE;
228  outBufExchanged[0]=outBufExchanged[1]=0;
229 
230  DEBUG(DEB_LEV_FUNCTION_NAME, "In %s\n", __func__);
231  while(omx_base_sink_Private->state == OMX_StateIdle || omx_base_sink_Private->state == OMX_StateExecuting || omx_base_sink_Private->state == OMX_StatePause ||
232  omx_base_sink_Private->transientState == OMX_TransStateLoadedToIdle){
233 
234  /*Wait till the ports are being flushed*/
235  pthread_mutex_lock(&omx_base_sink_Private->flush_mutex);
236  while( PORT_IS_BEING_FLUSHED(pInPort[0]) ||
237  PORT_IS_BEING_FLUSHED(pInPort[1])) {
238  pthread_mutex_unlock(&omx_base_sink_Private->flush_mutex);
239 
240  DEBUG(DEB_LEV_FULL_SEQ, "In %s 1 signalling flush all cond iE=%d,iF=%d,oE=%d,oF=%d iSemVal=%d,oSemval=%d\n",
241  __func__,outBufExchanged[0],isInputBufferNeeded[0],outBufExchanged[1],isInputBufferNeeded[1],pInputSem[0]->semval,pInputSem[1]->semval);
242 
243  if(isInputBufferNeeded[1]==OMX_FALSE && PORT_IS_BEING_FLUSHED(pInPort[1])) {
244  pInPort[1]->ReturnBufferFunction(pInPort[1],pInputBuffer[1]);
245  outBufExchanged[1]--;
246  pInputBuffer[1]=NULL;
247  isInputBufferNeeded[1]=OMX_TRUE;
248  DEBUG(DEB_LEV_FULL_SEQ, "Ports are flushing,so returning Input 1 buffer\n");
249  }
250 
251  if(isInputBufferNeeded[0]==OMX_FALSE && PORT_IS_BEING_FLUSHED(pInPort[0])) {
252  pInPort[0]->ReturnBufferFunction(pInPort[0],pInputBuffer[0]);
253  outBufExchanged[0]--;
254  pInputBuffer[0]=NULL;
255  isInputBufferNeeded[0]=OMX_TRUE;
256  DEBUG(DEB_LEV_FULL_SEQ, "Ports are flushing,so returning Input 0 buffer\n");
257  }
258 
259  DEBUG(DEB_LEV_FULL_SEQ, "In %s 2 signalling flush all cond iE=%d,iF=%d,oE=%d,oF=%d iSemVal=%d,oSemval=%d\n",
260  __func__,outBufExchanged[0],isInputBufferNeeded[0],outBufExchanged[1],isInputBufferNeeded[1],pInputSem[0]->semval,pInputSem[1]->semval);
261 
262  tsem_up(omx_base_sink_Private->flush_all_condition);
263  tsem_down(omx_base_sink_Private->flush_condition);
264  pthread_mutex_lock(&omx_base_sink_Private->flush_mutex);
265  }
266  pthread_mutex_unlock(&omx_base_sink_Private->flush_mutex);
267 
268  /*No buffer to process. So wait here*/
269  if((isInputBufferNeeded[0]==OMX_TRUE && pInputSem[0]->semval==0) &&
270  (omx_base_sink_Private->state != OMX_StateLoaded && omx_base_sink_Private->state != OMX_StateInvalid)) {
271  //Signalled from EmptyThisBuffer or FillThisBuffer or some thing else
272  DEBUG(DEB_LEV_FULL_SEQ, "Waiting for next Input buffer 0\n");
273  tsem_down(omx_base_sink_Private->bMgmtSem);
274 
275  }
276  if(omx_base_sink_Private->state == OMX_StateLoaded || omx_base_sink_Private->state == OMX_StateInvalid) {
277  DEBUG(DEB_LEV_SIMPLE_SEQ, "In %s Buffer Management Thread is exiting\n",__func__);
278  break;
279  }
280  if((isInputBufferNeeded[1]==OMX_TRUE && pInputSem[1]->semval==0) &&
281  (omx_base_sink_Private->state != OMX_StateLoaded && omx_base_sink_Private->state != OMX_StateInvalid) &&
282  !(PORT_IS_BEING_FLUSHED(pInPort[0]) || PORT_IS_BEING_FLUSHED(pInPort[1]))) {
283  //Signalled from EmptyThisBuffer or FillThisBuffer or some thing else
284  DEBUG(DEB_LEV_FULL_SEQ, "Waiting for next Input buffer 1\n");
285  tsem_down(omx_base_sink_Private->bMgmtSem);
286 
287  }
288  if(omx_base_sink_Private->state == OMX_StateLoaded || omx_base_sink_Private->state == OMX_StateInvalid) {
289  DEBUG(DEB_LEV_SIMPLE_SEQ, "In %s Buffer Management Thread is exiting\n",__func__);
290  break;
291  }
292 
293  DEBUG(DEB_LEV_SIMPLE_SEQ, "Waiting for Input buffer 0 semval=%d \n",pInputSem[0]->semval);
294  if(pInputSem[0]->semval>0 && isInputBufferNeeded[0]==OMX_TRUE ) {
295  tsem_down(pInputSem[0]);
296  if(pInputQueue[0]->nelem>0){
297  outBufExchanged[0]++;
298  isInputBufferNeeded[0]=OMX_FALSE;
299  pInputBuffer[0] = dequeue(pInputQueue[0]);
300  if(pInputBuffer[0] == NULL){
301  DEBUG(DEB_LEV_ERR, "Had NULL Input buffer!!\n");
302  break;
303  }
304  }
305  }
306  /*When we have input buffer to process then get one Input buffer*/
307  if(pInputSem[1]->semval>0 && isInputBufferNeeded[1]==OMX_TRUE) {
308  tsem_down(pInputSem[1]);
309  DEBUG(DEB_LEV_FULL_SEQ, "Wait over for Input buffer 1 semval=%d \n",pInputSem[1]->semval);
310  if(pInputQueue[1]->nelem>0){
311  outBufExchanged[1]++;
312  isInputBufferNeeded[1]=OMX_FALSE;
313  pInputBuffer[1] = dequeue(pInputQueue[1]);
314  if(pInputBuffer[1] == NULL){
315  DEBUG(DEB_LEV_ERR, "Had NULL Input buffer!! op is=%d,iq=%d\n",pInputSem[1]->semval,pInputQueue[1]->nelem);
316  break;
317  }
318  }
319  }
320 
321  for(i=0;i < (omx_base_component_Private->sPortTypesParam[OMX_PortDomainAudio].nPorts +
322  omx_base_component_Private->sPortTypesParam[OMX_PortDomainVideo].nPorts +
323  omx_base_component_Private->sPortTypesParam[OMX_PortDomainImage].nPorts +
324  omx_base_component_Private->sPortTypesParam[OMX_PortDomainOther].nPorts);i++) {
325 
326  if(omx_base_sink_Private->ports[i]->sPortParam.eDomain != OMX_PortDomainOther){ /* clock ports are not to be processed */
327  /*Process Input buffer of Port i */
328  if(isInputBufferNeeded[i]==OMX_FALSE) {
329 
330  /*Pass the Mark to all outgoing buffers*/
331  if(omx_base_sink_Private->pMark.hMarkTargetComponent != NULL){
332  pInputBuffer[i]->hMarkTargetComponent = omx_base_sink_Private->pMark.hMarkTargetComponent;
333  pInputBuffer[i]->pMarkData = omx_base_sink_Private->pMark.pMarkData;
334  }
335 
336  target_component=(OMX_COMPONENTTYPE*)pInputBuffer[i]->hMarkTargetComponent;
337  if(target_component==(OMX_COMPONENTTYPE *)openmaxStandComp) {
338  /*Clear the mark and generate an event*/
339  (*(omx_base_sink_Private->callbacks->EventHandler))
340  (openmaxStandComp,
341  omx_base_sink_Private->callbackData,
342  OMX_EventMark, /* The command was completed */
343  1, /* The commands was a OMX_CommandStateSet */
344  i, /* The state has been changed in message->messageParam2 */
345  pInputBuffer[i]->pMarkData);
346  } else if(pInputBuffer[i]->hMarkTargetComponent!=NULL){
347  /*If this is not the target component then pass the mark*/
348  //pInputBuffer[i]->pMarkData=NULL;
349  DEBUG(DEB_LEV_FULL_SEQ, "Pass Mark. This is a Source!!\n");
350  }
351 
352  if(omx_base_sink_Private->state == OMX_StateExecuting) {
353  if (omx_base_sink_Private->BufferMgmtCallback && pInputBuffer[i]->nFilledLen > 0) {
354  (*(omx_base_sink_Private->BufferMgmtCallback))(openmaxStandComp, pInputBuffer[i]);
355  } else {
356  /*If no buffer management call back then don't produce any Input buffer*/
357  pInputBuffer[i]->nFilledLen = 0;
358  }
359  } else {
360  DEBUG(DEB_LEV_ERR, "In %s Received Buffer in non-Executing State(%x)\n", __func__, (int)omx_base_sink_Private->state);
361 
362  if(OMX_TransStateExecutingToIdle == omx_base_component_Private->transientState ||
363  OMX_TransStatePauseToIdle == omx_base_component_Private->transientState) {
364  pInputBuffer[i]->nFilledLen = 0;
365  }
366  }
367 
368  if((pInputBuffer[i]->nFlags & OMX_BUFFERFLAG_EOS) == OMX_BUFFERFLAG_EOS && pInputBuffer[i]->nFilledLen==0) {
369  DEBUG(DEB_LEV_FULL_SEQ, "Detected EOS flags in input buffer filled len=%d\n", (int)pInputBuffer[i]->nFilledLen);
370  (*(omx_base_sink_Private->callbacks->EventHandler))
371  (openmaxStandComp,
372  omx_base_sink_Private->callbackData,
373  OMX_EventBufferFlag, /* The command was completed */
374  i, /* The commands was a OMX_CommandStateSet */
375  pInputBuffer[i]->nFlags, /* The state has been changed in message->messageParam2 */
376  NULL);
377  }
378  if(omx_base_sink_Private->state==OMX_StatePause && !(PORT_IS_BEING_FLUSHED(pInPort[0]) || PORT_IS_BEING_FLUSHED(pInPort[1]))) {
379  /*Waiting at paused state*/
380  tsem_wait(omx_base_component_Private->bStateSem);
381  }
382 
383  /*Input Buffer has been produced or EOS. So, return Input buffer and get new buffer*/
384  if(pInputBuffer[i]->nFilledLen ==0 || ((pInputBuffer[i]->nFlags & OMX_BUFFERFLAG_EOS) == OMX_BUFFERFLAG_EOS)){
385  pInPort[i]->ReturnBufferFunction(pInPort[i],pInputBuffer[i]);
386  outBufExchanged[i]--;
387  pInputBuffer[i]=NULL;
388  isInputBufferNeeded[i]=OMX_TRUE;
389  }
390  }
391  }
392  }
393 
394  /*Clear the Mark*/
395  if(omx_base_sink_Private->pMark.hMarkTargetComponent != NULL){
396  omx_base_sink_Private->pMark.hMarkTargetComponent = NULL;
397  omx_base_sink_Private->pMark.pMarkData = NULL;
398  }
399  }
400  DEBUG(DEB_LEV_SIMPLE_SEQ,"Exiting Buffer Management Thread\n");
401  return NULL;
402 }
#define OMX_BASE_SINK_INPUTPORT_INDEX
Definition: omx_base_sink.h:36
OMX_ERRORTYPE omx_base_sink_Destructor(OMX_COMPONENTTYPE *openmaxStandComp)
Definition: omx_base_sink.c:57
void tsem_wait(tsem_t *tsem)
Definition: tsemaphore.c:131
#define OMX_BASE_SINK_INPUTPORT_INDEX_1
Definition: omx_base_sink.h:38
void *(* BufferMgmtFunction)(void *param)
Definition: omx_base_sink.h:50
OMX_ERRORTYPE omx_base_component_Constructor(OMX_COMPONENTTYPE *openmaxStandComp, OMX_STRING cComponentName)
The base constructor for the OpenMAX ST components.
#define DEB_LEV_SIMPLE_SEQ
#define PORT_IS_BEING_FLUSHED(pPort)
Definition: omx_base_port.h:39
char * stateName(OMX_STATETYPE state)
Definition: utils.c:29
void * omx_base_sink_BufferMgmtFunction(void *param)
Definition: omx_base_sink.c:67
#define DEBUG(n, fmt, args...)
queue_t * pBufferQueue
char * OMX_STRING
Definition: OMX_Types.h:206
OMX_BOOL
Definition: OMX_Types.h:189
#define DEB_LEV_ERR
OMX_ERRORTYPE(* EventHandler)(OMX_IN OMX_HANDLETYPE hComponent, OMX_IN OMX_PTR pAppData, OMX_IN OMX_EVENTTYPE eEvent, OMX_IN OMX_U32 nData1, OMX_IN OMX_U32 nData2, OMX_IN OMX_PTR pEventData)
Definition: OMX_Core.h:530
void tsem_up(tsem_t *tsem)
Definition: tsemaphore.c:110
void tsem_down(tsem_t *tsem)
Definition: tsemaphore.c:97
Definition: queue.h:43
#define OMX_BUFFERFLAG_EOS
Definition: OMX_Core.h:299
OMX_ERRORTYPE omx_base_component_Destructor(OMX_COMPONENTTYPE *openmaxStandComp)
The base destructor for ST OpenMAX components.
OMX_ERRORTYPE err
#define DEB_LEV_FULL_SEQ
OMX_HANDLETYPE hMarkTargetComponent
Definition: OMX_Core.h:417
OMX_PTR pComponentPrivate
#define DEB_LEV_FUNCTION_NAME
void * omx_base_sink_twoport_BufferMgmtFunction(void *param)
char * transientStateName(int state)
Definition: utils.c:55
OMX_ERRORTYPE(* ReturnBufferFunction)(omx_base_PortType *openmaxStandPort, OMX_BUFFERHEADERTYPE *pBuffer)
OMX_ERRORTYPE omx_base_sink_Constructor(OMX_COMPONENTTYPE *openmaxStandComp, OMX_STRING cComponentName)
Definition: omx_base_sink.c:31
void * dequeue(queue_t *queue)
Definition: queue.c:122
OMX_PORT_PARAM_TYPE sPortTypesParam[4]
OMX_ERRORTYPE
Definition: OMX_Core.h:126

Generated for OpenMAX Bellagio rel. 0.9.3 by  doxygen 1.5.1
SourceForge.net Logo