]> git.cworth.org Git - vogl/blob - src/common/channelmgr.cpp
Initial vogl checkin
[vogl] / src / common / channelmgr.cpp
1 /**************************************************************************
2  *
3  * Copyright 2013-2014 RAD Game Tools and Valve Software
4  * All Rights Reserved.
5  *
6  * Permission is hereby granted, free of charge, to any person obtaining a copy
7  * of this software and associated documentation files (the "Software"), to deal
8  * in the Software without restriction, including without limitation the rights
9  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10  * copies of the Software, and to permit persons to whom the Software is
11  * furnished to do so, subject to the following conditions:
12  *
13  * The above copyright notice and this permission notice shall be included in
14  * all copies or substantial portions of the Software.
15  *
16  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
22  * THE SOFTWARE.
23  *
24  **************************************************************************/
25
26
27
28 #include <stdio.h>
29 #include <stdlib.h>
30 #include <unistd.h>
31 #include <errno.h>
32 #include <string.h>
33 #include <sys/types.h>
34 #include <stdint.h>
35 #include <time.h>
36
37 #include "channel.h"
38 #include "mtqueue.h"
39
40 #include <pthread.h>
41
42 #if defined(_DEBUG) || defined(DEBUG)
43 #define DEBUG_PRINT(fmt, args...)    fprintf( stderr, fmt, ## args )
44 #else
45 #define DEBUG_PRINT(...) /* Don't do anything in release builds */
46 #endif
47
48 void *RecvThreadLoop(void *arg);
49 void *ReqRespRecvThreadLoop(void *arg);
50 void *SendThreadLoop(void *arg);
51 void *ReqRespSendThreadLoop(void *arg);
52
53 using namespace network;
54
55
56 channelmgr::channelmgr()
57 {
58
59     m_threadRecv = (pthread_t) 0;
60     m_threadReqRespRecv = (pthread_t) 0;
61     m_threadSend = (pthread_t) 0;
62     m_threadReqRespSend = (pthread_t) 0;
63     m_basePort = 0;
64     m_fTerminate = false;
65     m_fServer = false;
66     m_fLocal = false;
67     memset(m_server, '\0', sizeof(m_server));
68     m_errno = 0;
69     m_recvCallbackfn = NULL;
70     m_reqCallbackfn = NULL;
71     m_recvCallbackParam = NULL;
72     m_reqCallbackParam = NULL;
73     m_pReqRespChannel = NULL;
74     m_pSendChannel = NULL;
75     m_pRecvChannel = NULL;
76
77     m_pSendQueue = NULL;
78     m_pReqRespSendQueue = NULL;
79
80 }
81 channelmgr::~channelmgr()
82 {
83     Disconnect();
84 }
85
86 CHEC 
87 channelmgr::Connect(char *szServer, int port, char serviceMask, FnReadCallbackPtr recvCallbackfn, void *recvCallbackParam)
88 {
89     CHEC ec = EC_NONE;
90
91     //  int portReqResp = port;
92     //  int portSend = port+1;  //  Needs to match portRecv in channelmgr::Accept
93     //  int portRecv = port+2;  //  Needs to match portSend in channelmgr::Accept
94
95     int err = 0;
96
97     //
98     //  Need to connect to the three different ports on the server, if requested
99
100     if (0 == (serviceMask & (REQRESP|RECVASYNC|SENDASYNC)))
101     {
102         //  No service was requested?
103
104         return EC_NOTALLOWED;
105     }
106
107     m_fServer = false;
108     m_basePort = port;
109     strncpy(m_server, szServer, sizeof(m_server)-1);
110
111     m_fReadyReqResp = true;
112     if (serviceMask & REQRESP)
113     {
114         m_fReadyReqResp = false;
115         //  spin up a thread to handle the recv channel
116         err = pthread_create(&m_threadReqRespSend, NULL, ReqRespSendThreadLoop, (void *)this);
117         if (0 != err)
118         {
119             m_errno = errno;
120             ec = EC_SYSTEM;
121             goto out;
122         }
123     }
124
125     m_fReadySendAsync = true;
126     if (serviceMask & SENDASYNC)
127     {
128         m_fReadySendAsync = false;
129
130         //  spin up a thread to handle the recv channel
131         err = pthread_create(&m_threadSend, NULL, SendThreadLoop, (void *)this);
132         if (0 != err)
133         {
134             m_errno = errno;
135             ec = EC_SYSTEM;
136             goto out;
137         }
138
139     }
140
141     m_fReadyRecvAsync = true;
142     if (serviceMask & RECVASYNC)
143     {    
144         m_fReadyRecvAsync = false;
145         m_recvCallbackfn = recvCallbackfn;
146         m_recvCallbackParam = recvCallbackParam;
147
148         //  spin up a thread to handle the recv channel
149         err = pthread_create(&m_threadRecv, NULL, RecvThreadLoop, (void *)this);
150         if (0 != err)
151         {
152             m_errno = errno;
153             ec = EC_SYSTEM;
154             goto out;
155         }
156     }
157
158     //  Wait for threads to spin up and signal that they're ready.
159     //
160     //  Just loop for it.
161     while ((false == m_fReadyRecvAsync) || (false == m_fReadySendAsync) || (false == m_fReadyReqResp));
162
163
164 out:
165     if (EC_NONE != ec)
166     {
167         // Cleanup
168         Disconnect();
169     }
170
171     return ec;
172 }
173
174 CHEC 
175 channelmgr::Accept(int port, char serviceMask, bool fLocal, FnReadCallbackPtr readcallbackfn, void *readCallbackParam, FnRRCallbackPtr reqCallbackfn, void *reqCallbackParam)
176 {
177     CHEC ec = EC_NONE;
178
179     //  int portReqResp = port;
180     //  int portSend = port+2;  //  Needs to match portRecv in channelmgr::Connect
181     //  int portRecv = port+1;  //  Needs to match portSend in channelmgr::Connect
182
183     int err = 0;
184
185     if (0 == (serviceMask & (REQRESP|RECVASYNC|SENDASYNC)))
186     {
187         //  No service was requested?
188
189         return EC_NOTALLOWED;
190     }
191
192     m_fServer = true;
193     m_basePort = port;
194     m_fLocal = fLocal;
195
196     m_fReadyReqResp = true;
197     if (serviceMask & REQRESP)
198     {
199         //  spin up a thread to handle the reqresp channel
200         m_fReadyReqResp = false;
201
202         m_reqCallbackfn = reqCallbackfn;
203         m_reqCallbackParam = reqCallbackParam;
204
205         err = pthread_create(&m_threadReqRespRecv, NULL, ReqRespRecvThreadLoop, (void *)this);
206         if (0 != err)
207         {
208             ec = EC_SYSTEM;
209             m_errno = errno;
210             goto out;
211         }
212     }
213
214     m_fReadyRecvAsync = true;
215     if (serviceMask & RECVASYNC)
216     {
217         m_fReadyRecvAsync = false;
218         //  spin up a thread to handle the recv channel
219         m_recvCallbackfn = readcallbackfn;
220         m_recvCallbackParam = readCallbackParam;
221
222         err = pthread_create(&m_threadRecv, NULL, RecvThreadLoop, (void *)this);
223         if (0 != err)
224         {
225             ec = EC_SYSTEM;
226             m_errno = errno;
227             goto out;
228         }
229     }
230
231     m_fReadySendAsync = true;
232     if (serviceMask & SENDASYNC)
233     {
234         m_fReadySendAsync = false;
235
236         //  spin up a thread to handle the recv channel
237         err = pthread_create(&m_threadSend, NULL, SendThreadLoop, (void *)this);
238         if (0 != err)
239         {
240             m_errno = errno;
241             ec = EC_SYSTEM;
242             goto out;
243         }
244     }
245
246     //  Wait for threads to spin up and signal that they're ready.
247     //
248     //  Just loop for it.
249     while ((false == m_fReadyRecvAsync) || (false == m_fReadySendAsync) || (false == m_fReadyReqResp));
250
251 out:
252     if (EC_NONE != ec)
253     {
254         // Cleanup
255         Disconnect();
256     }
257
258     return ec;
259 }
260
261 CHEC 
262 channelmgr::SendData(unsigned int cbData, char *pbData, FnReadCallbackPtr respCallbackfn, void *respCallbackParam)
263 {
264     CHEC ec = EC_NONE;
265     queues::MTQ_CODE mtqec = MTQ_NONE;
266
267     //  Don't allow this call if we accepted connections (i.e. this is the "server" side of things)
268     if (true == m_fServer)
269         return EC_NOTALLOWED;
270
271     mtqec = m_pReqRespSendQueue->Enqueue(cbData, pbData, 500, 2);
272     if (MTQ_NONE != mtqec)
273     {
274         if (MTQ_FULL == mtqec)
275         {
276             DEBUG_PRINT("%s:%d %s Unable to send Request (QFull) (error = %x, %d)\n", __FILE__, __LINE__, __func__, mtqec, errno);
277             ec = EC_NETWORK;
278         }
279         else
280         {
281             DEBUG_PRINT("%s:%d %s Unable to send Request (error = %x, %d)\n", __FILE__, __LINE__, __func__, mtqec, errno);
282         }
283         goto out;
284     }
285
286     ec = m_pReqRespChannel->ReadMsg(&cbData, &pbData, 2, 0);
287     if (EC_NONE != ec)
288     {
289         DEBUG_PRINT("%s:%d %s Unable to read response (error = %x)\n", __FILE__, __LINE__, __func__, ec);
290         goto out;
291     }   
292
293     (*respCallbackfn)(respCallbackParam, cbData, pbData);
294
295     if (NULL != pbData)
296         free(pbData);
297
298 out:
299     return ec;
300 }
301
302 CHEC 
303 channelmgr::SendData(unsigned int cbData, char *pbData)
304 {
305     CHEC ec = EC_NONE;
306     queues::MTQ_CODE mtqec = MTQ_NONE;
307
308     mtqec = m_pSendQueue->Enqueue(cbData, pbData, 500, 2);
309     if (MTQ_NONE != mtqec)
310     {
311         if (MTQ_FULL == mtqec)
312         {
313             DEBUG_PRINT("%s:%d %s Unable to send (QFull) (error = %x, %d)\n", __FILE__, __LINE__, __func__, mtqec, errno);
314             ec = EC_NETWORK;
315         }
316         else
317         {
318             DEBUG_PRINT("%s:%d %s Unable to send (error = %x, %d)\n", __FILE__, __LINE__, __func__, mtqec, errno);
319         }
320         goto out;
321     }
322
323 out:
324     return ec;
325 }
326
327 bool 
328 channelmgr::HasError(int *perrno)
329 {
330     if (NULL != perrno)
331     {
332         *perrno = m_errno;
333     }
334     
335     return (0 != m_errno);
336 }
337
338 void 
339 channelmgr::Disconnect()
340 {
341
342     m_fTerminate = true;
343
344     //  Wait for the recvthread to notice and clean itself up
345     if (m_threadRecv)
346         pthread_join(m_threadRecv, NULL);
347
348     if (m_threadSend)
349         pthread_join(m_threadSend, NULL);
350
351     if (m_threadReqRespRecv)
352         pthread_join(m_threadReqRespRecv, NULL);
353
354     if (m_threadReqRespSend)
355         pthread_join(m_threadReqRespSend, NULL);
356
357     //  Clean up all remaining channels
358     if (NULL != m_pReqRespChannel)
359     {
360         m_pReqRespChannel->Disconnect();
361         delete m_pReqRespChannel;
362         m_pReqRespChannel = NULL;
363     }
364
365     if (NULL != m_pSendChannel)
366     {
367         m_pSendChannel->Disconnect();
368         delete m_pSendChannel;
369         m_pSendChannel = NULL;
370     }
371
372     if (NULL != m_pRecvChannel)
373     {
374         m_pRecvChannel->Disconnect();
375         delete m_pRecvChannel;
376         m_pRecvChannel = NULL;
377     }
378
379     if (m_pSendQueue)
380     {
381         delete m_pSendQueue;
382         m_pSendQueue = NULL;
383     }
384
385     if (m_pReqRespSendQueue)
386     {
387         delete m_pReqRespSendQueue;
388         m_pReqRespSendQueue = NULL;
389     }
390
391     m_threadRecv = (pthread_t) 0;
392     m_threadReqRespRecv= (pthread_t) 0;
393     m_basePort = 0;
394     m_fTerminate = false;
395     m_fServer = false;
396     m_fLocal = false;
397     memset(m_server, '\0', sizeof(m_server));
398     m_errno = 0;
399     m_recvCallbackfn = NULL;
400     m_reqCallbackfn = NULL;
401     m_recvCallbackParam = NULL;
402     m_reqCallbackParam = NULL;
403
404     return;
405 }
406
407
408 //
409 //  Handle the recv thread
410 //
411
412 void *RecvThreadLoop(void *arg)
413 {
414     channelmgr *pChannelMgr = (channelmgr *) arg;
415
416     pChannelMgr->DriveRecvLoop();
417
418     return NULL;
419 }
420
421 void 
422 channelmgr::DriveRecvLoop()
423 {
424     CHEC ec = EC_NONE;
425     int err = 0;
426     unsigned int cbData = 0;
427     char *pbData = NULL;
428     pthread_t thisThread = pthread_self();
429     int portRecv;
430
431     //  Set this thread's name:
432     err = pthread_setname_np(thisThread, "VOGLRecvThd");
433     if (0 != err)
434     {
435         DEBUG_PRINT("%s:%d %s Unable to set the name of the thread. returned %d, errno %d\n", __FILE__, __LINE__, __func__, err, errno);
436     }
437
438     m_pRecvChannel = new channel();
439     if (NULL == m_pRecvChannel)
440     {
441         //  Unable to allocate a new channel
442         ec = EC_MEMORY;
443         DEBUG_PRINT("%s:%d %s Unable to allocate new channel for Recv - OOM?\n", __FILE__, __LINE__, __func__);
444         goto out;
445     }
446
447     if (m_fServer)
448     {
449         portRecv = m_basePort+1;  //  Needs to match portSend in channelmgr::Connect
450         ec = m_pRecvChannel->Connect(portRecv, 0, m_fLocal);
451         if (EC_NONE != ec)
452         {
453             DEBUG_PRINT("%s:%d %s Failed to accept on port %d for Recv. returned %d, errno %d\n", __FILE__, __LINE__, __func__, portRecv, ec, errno);
454             goto out;
455         }
456     }
457     else
458     {
459         portRecv = m_basePort+2;  //  Needs to match portSend in channelmgr::Accept 
460         ec = m_pRecvChannel->Connect(m_server, portRecv, 100, 500); // loop 100 times, waiting 500MS between attempts
461         if (EC_NONE != ec)
462         {
463             DEBUG_PRINT("%s:%d %s Failed to connect to %s on port %d for Recv. returned %d, errno %d\n", __FILE__, __LINE__, __func__, m_server, portRecv, ec, errno);
464             goto out;
465         }
466     }
467
468     m_fReadyRecvAsync = true;
469
470     while (!m_fTerminate)
471     {
472
473         ec = m_pRecvChannel->ReadMsg(&cbData, &pbData, 5, 100); // 500
474         //if (EC_TIMEOUT == ec)
475         //{
476             //  Timeouts are not really errors
477         //    continue;
478         //}
479
480         if (EC_NONE != ec)
481         {
482             //  Keep going on the server side.  We'll accept new connections as they come along.
483             if (m_fServer)
484             {
485                 m_pRecvChannel->Disconnect();
486                 continue;
487             }
488
489             //  On the client side, we need to just keep going.
490             //DEBUG_PRINT("%s:%d %s Error reading message (error = %x, %d)\n", __FILE__, __LINE__, __func__, ec, errno);
491             continue;
492         }
493
494         (*m_recvCallbackfn)(m_recvCallbackParam, cbData, pbData);
495
496         if (NULL != pbData)
497             free(pbData);
498
499         cbData = 0;
500         pbData = NULL;
501     }
502 out:
503     return;
504 }
505
506
507 //
508 //  Handle the Async Send thread
509 //
510
511 void *SendThreadLoop(void *arg)
512 {
513     channelmgr *pChannelMgr = (channelmgr *) arg;
514
515     pChannelMgr->DriveSendLoop();
516
517     return NULL;
518 }
519
520 void
521 channelmgr::DriveSendLoop()
522 {
523     CHEC ec = EC_NONE;
524     queues::MTQ_CODE mtqec = MTQ_NONE;
525     int err = 0;
526     bool fmessageSent;
527     unsigned int message_size = 0;
528     char *message = NULL;
529     pthread_t thisThread = pthread_self();
530     int portSend;
531
532     //  Set this thread's name:
533     err = pthread_setname_np(thisThread, "VOGLSendThd");
534     if (0 != err)
535     {
536         DEBUG_PRINT("%s:%d %s Unable to set the name of the thread. returned %d, errno %d\n", __FILE__, __LINE__, __func__, err, errno);
537     }
538
539     //
540     //  Generate the sendQ to hold the messages.
541     m_pSendQueue = new queues::MtQueue();
542     if (NULL == m_pSendQueue)
543     {
544         //  Unable to allocate a new send queue
545         ec = EC_MEMORY;
546         goto out;
547     }
548
549     mtqec = m_pSendQueue->Initialize(1000); // Not sure how large this should be.  This is a guess.
550     if (MTQ_NONE != mtqec)
551     {
552         ec = EC_MEMORY;
553         goto out;
554     }
555
556     m_pSendChannel = new channel();
557     if (NULL == m_pSendChannel)
558     {
559         //  Unable to allocate a new channel
560         ec = EC_MEMORY;
561         goto out;
562     }
563
564     if (m_fServer)
565     {
566         portSend = m_basePort+2;  //  Needs to match portRecv in channelmgr::Connect
567         ec = m_pSendChannel->Connect(portSend, 0, m_fLocal);
568         if (EC_NONE != ec)
569         {
570             DEBUG_PRINT("%s:%d %s Failed to accept on port %d for Send. returned %d, errno %d\n", __FILE__, __LINE__, __func__, portSend, ec, errno);
571             goto out;
572         }
573     }
574     else
575     {
576         portSend = m_basePort+1;  //  Needs to match portRecv in channelmgr::Accept
577         ec = m_pSendChannel->Connect(m_server, portSend, 100, 500); // loop 100 times, waiting 500MS between attempts
578         if (EC_NONE != ec)
579         {
580             DEBUG_PRINT("%s:%d %s Failed to connect to %s on port %d for Send. returned %d, errno %d\n", __FILE__, __LINE__, __func__, m_server, portSend, ec, errno);
581             goto out;
582         }
583     }
584
585     m_fReadySendAsync = true;
586     fmessageSent = true;
587     while (!m_fTerminate)
588     {
589         if (fmessageSent && message)
590         {
591             free(message);
592             message = NULL;
593             message_size = 0;
594         }
595
596         if (fmessageSent)
597         {
598             //  if we sent the last message dequeued, get a new one to send.
599             mtqec = m_pSendQueue->Dequeue(&message_size, &message, 20, 5);
600             if (MTQ_NONE != mtqec)
601             {
602                 //  Just keep going round until you get a message to send.
603                 continue;
604             }
605             DEBUG_PRINT("%s:%d %s DQ'd message to send (size = %d)\n", __FILE__, __LINE__, __func__, message_size);
606             fmessageSent = false; // we have a new message to send
607         }
608         ec = this->SendDataInt(message_size, message);
609         if (EC_NONE != ec)
610         {
611             DEBUG_PRINT("%s:%d %s Unable to send message (error = %x)\n", __FILE__, __LINE__, __func__, ec);
612             continue;
613         }
614         DEBUG_PRINT("%s:%d %s sent message (size = %d)\n", __FILE__, __LINE__, __func__, message_size);
615         fmessageSent = true; // the message was sent successfully
616     }
617
618     if (message)
619         free(message);
620
621
622 out:
623     return;
624 }
625
626
627 CHEC
628 channelmgr::SendDataInt(unsigned int cbData, char *pbData)
629 {
630     CHEC ec = EC_NONE;
631
632     ec = m_pSendChannel->WriteMsg(cbData, pbData, 2, 500);
633     if (EC_NONE != ec)
634     {
635         DEBUG_PRINT("%s:%d %s Unable to send (error = %x, %d)\n", __FILE__, __LINE__, __func__, ec, errno);
636         goto out;
637     }
638
639     DEBUG_PRINT("%s:%d %s sent(internal) message (size = %d)\n", __FILE__, __LINE__, __func__, cbData);
640
641 out:
642     return ec;
643 }
644
645 //
646 //  Handle the reqresp thread
647 //
648
649 void *ReqRespRecvThreadLoop(void *arg)
650 {
651     channelmgr *pChannelMgr = (channelmgr *) arg;
652
653     pChannelMgr->DriveReqRespRecvLoop();
654
655     return NULL;
656 }
657
658 void 
659 channelmgr::DriveReqRespRecvLoop()
660 {
661     CHEC ec = EC_NONE;
662     int err = 0;
663     unsigned int cbReq = 0, cbResp = 0;
664     char *pbReq = NULL, *pbResp = NULL;
665     pthread_t thisThread = pthread_self();
666     int portReqResp = m_basePort;
667
668
669     //  Set this thread's name:
670     err = pthread_setname_np(thisThread, "VOGLRRThd");
671     if (0 != err)
672     {
673         DEBUG_PRINT("%s:%d %s Unable to set the name of the thread. returned %d, errno %d\n", __FILE__, __LINE__, __func__, err, errno);
674     }
675
676     m_pReqRespChannel = new channel();
677     if (NULL == m_pReqRespChannel)
678     {
679         //  Unable to allocate a new channel
680         ec = EC_MEMORY;
681         goto out;
682     }
683
684     //  This thread should only be running when m_fServer is true.
685     ec = m_pReqRespChannel->Connect(portReqResp, 5, m_fLocal);
686     if (EC_NONE != ec)
687     {
688         DEBUG_PRINT("%s:%d %s Failed to accept on port %d for ReqResp. returned %d, errno %d\n", __FILE__, __LINE__, __func__, portReqResp, ec, errno);
689         goto out;
690     }
691
692     m_fReadyReqResp = true;
693
694     while (!m_fTerminate)
695     {
696         if (NULL != pbReq)
697             free(pbReq);
698         cbReq = 0;
699         pbReq = NULL;
700
701         if (NULL != pbResp)
702             free(pbResp);
703         cbResp = 0;
704         pbResp = NULL;
705
706         ec = m_pReqRespChannel->ReadMsg(&cbReq, &pbReq, 5, 100); //100
707         if (EC_TIMEOUT == ec)
708         {
709             //  Timeouts are not really errors
710             continue;
711         }
712
713         if (EC_NONE != ec)
714         {
715             DEBUG_PRINT("%s:%d %s Error reading request (error = %x)\n", __FILE__, __LINE__, __func__, ec);
716             continue;
717         }
718
719         (*m_reqCallbackfn)(m_reqCallbackParam, cbReq, pbReq, &cbResp, &pbResp);
720
721         ec = m_pReqRespChannel->WriteMsg(cbResp, pbResp, 5, 0); //100
722         if (EC_TIMEOUT == ec)
723         {
724             //  Timeouts are not really errors
725             continue;
726         }
727
728         if (EC_NONE != ec)
729         {
730             DEBUG_PRINT("%s:%d %s Unable to write response (error = %x)\n", __FILE__, __LINE__, __func__, ec);
731             continue;
732         }        
733
734     }
735     if (NULL != pbReq)
736         free(pbReq);
737     if (NULL != pbResp)
738         free(pbResp);
739
740 out:
741     return;
742 }
743
744 //
745 //  Handle the Async Send thread
746 //
747
748 void *ReqRespSendThreadLoop(void *arg)
749 {
750     channelmgr *pChannelMgr = (channelmgr *) arg;
751
752     pChannelMgr->DriveReqRespSendLoop();
753
754     return NULL;
755 }
756
757 void
758 channelmgr::DriveReqRespSendLoop()
759 {
760     CHEC ec = EC_NONE;
761     queues::MTQ_CODE mtqec = MTQ_NONE;
762     int err = 0;
763     unsigned int message_size = 0;
764     char *message = NULL;
765     pthread_t thisThread = pthread_self();
766     int portReqResp = m_basePort;
767
768     //  Set this thread's name:
769     err = pthread_setname_np(thisThread, "VOGLRRSndThd");
770     if (0 != err)
771     {
772         DEBUG_PRINT("%s:%d %s Unable to set the name of the thread. returned %d, errno %d\n", __FILE__, __LINE__, __func__, err, errno);
773     }
774
775     //
776     //  Generate the sendQ to hold the messages.
777     m_pReqRespSendQueue = new queues::MtQueue();
778     if (NULL == m_pReqRespSendQueue)
779     {
780         //  Unable to allocate a new send queue
781         ec = EC_MEMORY;
782         goto out;
783     }
784
785     mtqec = m_pReqRespSendQueue->Initialize(1000); // Not sure how large this should be.  This is a guess.
786     if (MTQ_NONE != mtqec)
787     {
788         ec = EC_MEMORY;
789         goto out;
790     }
791
792     m_pReqRespChannel = new channel();
793     if (NULL == m_pReqRespChannel)
794     {
795         //  Unable to allocate a new channel
796         ec = EC_MEMORY;
797         goto out;
798     }
799
800     ec = m_pReqRespChannel->Connect(m_server, portReqResp, 100, 500); // loop 100 times, waiting 500MS between attempts
801     if (EC_NONE != ec)
802     {
803         goto out;
804     }
805     m_fReadyReqResp = true;
806
807     while (!m_fTerminate)
808     {
809         if (message)
810             free(message);
811         message = NULL;
812         message_size = 0;
813
814         mtqec = m_pReqRespSendQueue->Dequeue(&message_size, &message, 20, 5);
815         if (MTQ_NONE != mtqec)
816         {
817             //  Just keep going round until you get a message to send.
818             continue;
819         }
820
821         ec = this->SendDataRRInt(message_size, message);
822         if (EC_NONE != ec)
823         {
824             DEBUG_PRINT("%s:%d %s Unable to send message (error = %x)\n", __FILE__, __LINE__, __func__, ec);
825             continue;
826         }
827     }
828
829     if (message)
830         free(message);
831
832 out:
833     return;
834 }
835
836
837 CHEC
838 channelmgr::SendDataRRInt(unsigned int cbData, char *pbData)
839 {
840     CHEC ec = EC_NONE;
841
842     //  Don't allow this call if we accepted connections (i.e. this is the "server" side of things)
843     if (true == m_fServer)
844         return EC_NOTALLOWED;
845
846     ec = m_pReqRespChannel->WriteMsg(cbData, pbData, 2, 0);
847     if (EC_NONE != ec)
848     {
849         DEBUG_PRINT("%s:%d %s Unable to send request (error = %x, %d)\n", __FILE__, __LINE__, __func__, ec, errno);
850         goto out;
851     }
852
853 out:
854     return ec;
855 }
856