ACQ2XX_API
CommandBuffer.cpp
Go to the documentation of this file.
00001 /*
00002  * CommandBuffer.cpp
00003  *
00004  *  Created on: Jul 27, 2010
00005  *      Author: pgm
00006  */
00007 
00008 
00009 #include <assert.h>
00010 #include <iostream>
00011 #include <map>
00012 #include <vector>
00013 #include <list>
00014 
00015 #include "local.h"
00016 
00017 
00018 #include <stdlib.h>
00019 #include <libgen.h>     /* dirname() */
00020 #include <unistd.h>
00021 #include <errno.h>
00022 
00023 #include <fcntl.h>
00024 #include <sys/mman.h>
00025 #include <sys/stat.h>
00026 #include <sys/types.h>
00027 #include <sys/stat.h>
00028 #include <sys/wait.h>
00029 
00030 #include "CommandBuffer.h"
00031 #include "iclient3.h"
00032 #include <sys/select.h>
00033 
00034 #include "AcqType.h"            // File
00035 using namespace std;
00036 
00037 class CommandBufferImpl : public CommandBuffer {
00038         list<char *> pool;
00039         list<char *> queue;
00040         int maxlen;
00041 
00042         unsigned discards;
00043         unsigned processed;
00044         unsigned no_data_waiting;
00045         unsigned blocked_on_read;
00046 
00047         unsigned hitide;
00048 
00049         int fd;
00050         fd_set inset;
00051         struct timeval timeout;
00052         enum POLICY { POL_LOWLAT, POL_NODROP } policy;
00053 
00054         char *getBuffer()
00055         /**< return from pool, else steal one from queue. */
00056         {
00057                 char *rc;
00058 
00059                 if (!pool.empty()){
00060                         rc = pool.front();
00061                         pool.pop_front();
00062                 }else{
00063                         assert(!queue.empty());
00064                         ++discards;
00065                         rc = queue.front();
00066                         queue.pop_front();
00067                 }
00068                 return rc;
00069         }
00070         bool inputDataAvailable() {
00071                 struct timeval timeout2 = timeout;
00072                 FD_SET(fd, &inset);
00073                 int rc = select(fd+1, &inset, 0, 0, &timeout2);
00074 
00075                 if (rc < 0){
00076                         perror("select failed");
00077                         _exit(errno);
00078                 }
00079                 return rc == 1 && FD_ISSET(fd, &inset);
00080         }
00081 
00082         void readLinePushQueue() {
00083                 char *lm_line = getBuffer();
00084                 int rc;
00085 
00086                 memset(lm_line, 0, maxlen);
00087 
00088                 if ((rc = read(fd, lm_line, maxlen-1)) > 0){
00089                         chomp(lm_line);
00090                         dbg(1, "\"%s\"", lm_line);
00091                         queue.push_back(lm_line);
00092                 }else if (rc < 0){
00093                         perror("command read error");
00094                         _exit(errno);
00095                 }else{
00096                         err("read returned 0");
00097                 }
00098         }
00099 public:
00100         CommandBufferImpl(const char* fname, int _maxlen = 80, int maxq=4) :
00101                 pool(), queue(), maxlen(_maxlen),
00102                 discards(0), processed(0),
00103                 no_data_waiting(0), blocked_on_read(0), hitide(0)
00104         {
00105                 memset(&timeout, 0, sizeof(timeout));
00106                 for (int ibuf = 0; ibuf < maxq; ++ibuf){
00107                         pool.push_back(new char [maxlen]);
00108                 }
00109                 if (maxq > 2){
00110                         policy = POL_NODROP;
00111                 }else{
00112                         policy = POL_LOWLAT;
00113                 }
00114 
00115                 fd = open(fname, O_RDWR);
00116                 if (fd < 0){
00117                         err("failed to open \"%s\"", fname);
00118                         perror("");
00119                         _exit(errno);
00120                 }
00121                 FD_ZERO(&inset);
00122 
00123                 dump(FN);
00124         }
00125         ~CommandBufferImpl() {
00126                 while(!queue.empty()){
00127                         delete [] queue.front();
00128                         queue.pop_front();
00129                 }
00130                 while (!pool.empty()){
00131                         delete [] pool.front();
00132                         pool.pop_front();
00133                 }
00134                 close(fd);
00135         }
00136 
00137         void getNext(char* ubuf, int _maxlen) {
00138                 bool nodata = true;
00139 
00140                 /* first, clear any waiting commands */
00141 
00142                 while(inputDataAvailable()){
00143                         nodata = false;
00144 
00145                         if (policy == POL_NODROP && pool.empty()){
00146                                 break;
00147                         }else{
00148                                 readLinePushQueue();
00149                         }
00150                 }
00151                 /* if we don't have a command, block until one shows */
00152                 if (queue.empty()){
00153                         readLinePushQueue();
00154                         blocked_on_read++;
00155                 }
00156 
00157                 if (queue.size() > hitide){
00158                         hitide = queue.size();
00159                 }
00160 
00161                 /* pop the next command in queue and output it */
00162                 char *cmd = queue.front();
00163                 queue.pop_front();
00164                 strncpy(ubuf, cmd, _maxlen);
00165 
00166                 dbg(1, "p:%d q:%d \"%s\"", pool.size(), queue.size(), cmd);
00167 
00168                 pool.push_back(cmd);
00169                 ++processed;
00170 
00171 
00172                 if (nodata){
00173                         ++no_data_waiting;
00174                 }
00175         }
00176         void dump(const char *user) {
00177                 char buf[80];
00178                 char *cp;
00179                 const char *root = "";
00180 
00181                 if ((cp = getenv("ACQ_DEMUX_COMMAND_LOG")) == 0){
00182                         sprintf(buf, "acq_demux.%d.log", getpid());
00183                         cp = buf;
00184                         root = "/dev/shm";
00185                 }
00186 
00187                 File log(root, cp, "w");
00188                 fprintf(log.getFp(),
00189                         "q:%d hitide:%d max:%d proc:%u/%u disc:%u "
00190                         "no_data_waiting:%u blocked_on_read %u: %s\n",
00191                         queue.size(), hitide, maxlen,
00192                         processed, discards+processed,
00193                         discards,
00194                         no_data_waiting, blocked_on_read, user);
00195         }
00196         virtual int writeBack(char* ubuf){
00197                 dbg(1, "\"%s\"", ubuf);
00198                 return write(fd, ubuf, strlen(ubuf));
00199         }
00200         virtual bool hasDataAvailable(void) {
00201                 return !queue.empty() || inputDataAvailable();
00202         }
00203 };
00204 
00205 class CommandBufferDbg: public CommandBuffer {
00206         FILE *fp;
00207 public:
00208         CommandBufferDbg(const char *fname){
00209                 fp = fopen(fname, "r");
00210                 assert(fp != 0);
00211         }
00212         virtual ~CommandBufferDbg() {
00213                 fclose(fp);
00214         }
00215         void getNext(char* ubuf, int _maxlen) {
00216                 if (fgets(ubuf, _maxlen, fp) == 0){
00217                         info("quitting time!");
00218                         _exit(0);
00219                 }
00220         }
00221 
00222         virtual bool hasDataAvailable(void) {
00223                 return true;
00224         }
00225 };
00226 CommandBuffer* CommandBuffer::create(
00227                         const char* fname, int _maxlen, int maxq)
00228 {
00229         if (strncmp(fname, "/dev/", 5)){
00230                 return new CommandBufferDbg(fname);
00231         }else{
00232                 return new CommandBufferImpl(fname, _maxlen, maxq);
00233         }
00234 }
00235