ACQ2XX_API
ProcessController.cpp
Go to the documentation of this file.
00001 /*
00002  * ProcessController.cpp
00003  *
00004  *  Created on: Feb 12, 2011
00005  *      Author: pgm
00006  */
00007 
00008 
00009 #include <assert.h>
00010 #include <iostream>
00011 #include <map>
00012 #include <vector>
00013 #include <set>
00014 
00015 
00016 #include <stdlib.h>
00017 #include <libgen.h>     /* dirname() */
00018 #include <unistd.h>
00019 #include <errno.h>
00020 
00021 #include <fcntl.h>
00022 #include <sys/mman.h>
00023 #include <sys/stat.h>
00024 #include <sys/types.h>
00025 #include <sys/stat.h>
00026 #include <sys/wait.h>
00027 
00028 
00029 #include "popt.h"
00030 
00031 #include "ProcessController.h"
00032 #include "acq_demux.h"
00033 
00034 
/* Feed path handed to SubsetProcessController by the live-log and
 * live-mean controllers defined later in this file. */
#define LM_FEED         "/dev/acq200/data.dma/tbstat2"
/* Default value of the _line_sz argument of LiveFeedProcessController. */
#define LM_LINE_SZ      32

/* Tblock length in bytes; LiveLogProcessController divides it by the
 * sample size to get samples per tblock. */
#define TBLOCK_LEN      0x600000        /* @@todo read the knob */
/* Queue depth copied into LiveFeedProcessController::maxq at construction. */
int MAXQ = 16;
00040 
00041 
00042 #include "local.h"
00043 
00044 #include "AcqType.h"
00045 #include "AcqDataModel.h"
00046 
00047 #include "CommandBuffer.h"
00048 #include "Timer.h"
00049 #include "WorkingDir.h"
00050 
00051 extern "C" {
00052         extern int acq200_debug;
00053 };
00054 
00055 
00056 
/* Built-in printf-style format for a tblock file name; %s receives the
 * tblock id. */
#define _ACQ_DEMUX_TBLOCK_ROOT  "/dev/acq200/tblocks/%s"
/* Effective format: the environment variable ACQ_DEMUX_TBLOCK_ROOT when
 * set, otherwise the built-in default above. getenv() is evaluated each
 * time the macro is expanded. */
#define ACQ_DEMUX_TBLOCK_ROOT \
        (getenv("ACQ_DEMUX_TBLOCK_ROOT")? \
         getenv("ACQ_DEMUX_TBLOCK_ROOT"): _ACQ_DEMUX_TBLOCK_ROOT)
00061 
00062 ProcessController::ProcessController(AcqDataModel& _dataModel) :
00063         dataModel(_dataModel),
00064         use_fork(true), nice2(0), euid(0),
00065         src_fmt(ACQ_DEMUX_TBLOCK_ROOT),
00066         show_exit_level(1)
00067 {
00068         int n2;
00069         if ((n2 = getenvInt("ACQ_DEMUX_NICE2", 0)) != 0){
00070                 if (n2 < 0){
00071                         use_fork = false;
00072                 }else{
00073                         use_fork = true;
00074                         nice2 = n2;
00075                 }
00076         }
00077         euid = getenvInt("ACQ_DEMUX_EUID", euid);
00078 }
00079 
00080 int ProcessController::flen(const char *fname) {
00081         struct stat statbuf;
00082         int rc = stat(fname, &statbuf);
00083         if (rc == 0){
00084                 return statbuf.st_size;
00085         }else{
00086                 cerr << "stat() failed on file " << fname << endl;
00087                 perror("");
00088                 exit(-1);
00089                 return -1;
00090         }
00091 }
00092 
00093 
00094 
00095 void addLiveCal(AcqDataModel& dataModel)
00096 {
00097 #ifdef __arm
00098         static bool acq_cal_has_been_set;
00099 
00100         if (!acq_cal_has_been_set){
00101                 system("get.vin >/tmp/local.vin");
00102                 dataModel.setAcqCal(
00103                                 AcqCal::create(dataModel.getAcqType(), "/tmp/local.vin"));
00104                 acq_cal_has_been_set = true;
00105         }
00106 #else
00107 #warning "feature not available in x86 (it could be..)"
00108 #endif
00109 }
00110 
00111 void ProcessController::addCal(const char *rawfname)
00112 {
00113         char cal_file[128];
00114         struct stat stat_buf;
00115         sprintf(cal_file, "%s.vin", rawfname);
00116 
00117         if (stat(cal_file, &stat_buf) == 0){
00118                 dataModel.setAcqCal(
00119                         AcqCal::create(dataModel.getAcqType(), cal_file));
00120         }
00121 }
00122 
00123 unsigned ValidatorData::getSampleOffset(unsigned offset_in_block)
00124 {
00125         if (msecs_at_esoff == 0){
00126                 return pss + offset_in_block;
00127         }else{
00128                 pc.dataModel.setWallClockPolicy(
00129                         AcqDataModel::wallclock_policy == WCP_TIMED_AT_EVENT?
00130                                         msecs_at_esoff: msecs_at_tblock_start);
00131                 //@@todo .. NB DR is an issue
00132                 return offset_in_block;
00133         }
00134 }
00135 
00136 void ValidatorData::adjust(int samples){
00137         if (!adjusted){
00138                 dbg(1, "pss:%d adj:%d new:%d msecs_at_esoff:%d",
00139                                 pss, samples, pss+samples,
00140                                 msecs_at_esoff);
00141 
00142                 if (msecs_at_esoff){
00143                         if (AcqDataModel::wallclock_policy ==
00144                                                         WCP_TIMED_AT_EVENT){
00145                                 return;
00146                         }
00147                         int dms = (int)(samples*Clock::sample_clock_ns/1000000);
00148                         msecs_at_esoff += dms;
00149                         msecs_at_tblock_start += dms;
00150                         pc.dataModel.setWallClockPolicy(msecs_at_tblock_start);
00151                 }else{
00152                         pss += samples;
00153                 }
00154                 adjusted = true;
00155         }
00156 }
00157 ValidatorData::ValidatorData(ProcessController& _pc) :
00158                 pc(_pc),
00159                 pss(0),  adjusted(0),
00160                 msecs_at_esoff(0), msecs_at_tblock_start(0),
00161                 tblockN(0), esoff(0), esoff2(0),
00162                 tblock(0), nblocks(0) {
00163         /* Java is so much better .. */
00164         blocks123[0] = blocks123[1] = blocks123[2] = 0;
00165 }
00166 
00167 ValidatorData* ValidatorData::create(ProcessController& _pc, char* input_line)
00168 /** @factory */
00169 {
00170 /*
00171  * tblock=030,031,999 pss=17399808 esoff=0x0006c000,0x00372000 ecount=2,2,0 msec=17308 tblockN=177
00172  */
00173 
00174         ValidatorData tv(_pc);
00175         const int NCONV = 11;
00176         int nconv;
00177 
00178         if ((nconv = sscanf(input_line,
00179         "tblock=%d,%d,%d pss=%u esoff=%x,%x ecount=%d,%d,%d msec=%u tblockN=%u",
00180                 tv.blocks123+0, tv.blocks123+1, tv.blocks123+2,
00181                 &tv.pss, &tv.esoff, &tv.esoff2,
00182                 tv.ecount+0, tv.ecount+1, tv.ecount+2,
00183                 &tv.msecs_at_esoff,
00184                 &tv.tblockN)) == NCONV){
00185 
00186                 AcqDataModel::wallclock_policy = WCP_TIMED_AT_EVENT;
00187 
00188                 unsigned offsam =
00189                         tv.esoff/tv.pc.dataModel.getAcqType().getSampleSize();
00190                 unsigned delta_ms = (unsigned)
00191                                 (offsam * Clock::sample_clock_ns /1000000);
00192 
00193                 if (tv.msecs_at_esoff >= delta_ms){
00194                         tv.msecs_at_tblock_start =
00195                                         tv.msecs_at_esoff -     delta_ms;
00196                 }else{
00197                         err("esoff*clock greater than msecs");
00198                 }
00199                 tv.tblock = tv.blocks123[1];
00200 /*
00201                 for (int ib = 0; ib <= 2; ++ib){
00202                         tv.nblocks += tv.blocks123[ib] != 999;
00203                 }
00204 */
00205                 tv.nblocks = 3;
00206                 if (tv.esoff2 == 0){
00207                         tv.esoff2 = tv.esoff;
00208                 }
00209                 ValidatorData* vdata = new ValidatorData(_pc);
00210                 memcpy(vdata, &tv, sizeof(tv));
00211 
00212                 return vdata;
00213         }
00214         err("validation failed %d conversions out of %d", nconv, NCONV);
00215         return 0;
00216 }
00217 
00218 
00219 
/**
 * Base line validator: rejects every input line.
 *
 * Subclasses override isValidInput() to accept (and possibly rewrite in
 * place) the lines they understand. `instance` is a shared default
 * object of this base type.
 */
class InputValidator {
public:
        static InputValidator instance;

        virtual ~InputValidator() {}

        /** @return always false in the base class. */
        virtual bool isValidInput(char* /* input_line */) {
                return false;
        }
};

InputValidator InputValidator::instance;
00231 
00232 class LiveFeedProcessController : public ProcessController {
00233 
00234 protected:
00235         int line_sz;
00236         const char* feed;
00237         int maxq;
00238 
00239         InputValidator *inputValidator;
00240 public:
00241         LiveFeedProcessController(
00242                         AcqDataModel& _dataModel,
00243                         const char* _feed,
00244                         int _line_sz = LM_LINE_SZ) :
00245                 ProcessController(_dataModel),
00246                 line_sz(_line_sz),
00247                 feed(_feed)
00248         {
00249                 inputValidator = &InputValidator::instance;
00250                 maxq = MAXQ;
00251         }
00252         virtual void run(poptContext& opt_context);
00253 };
00254 
00255 void ProcessController::processAction(void *pdata, int len, int tblock)
00256 {
00257         dbg(2, "pdata %p len %d", pdata, len);
00258 
00259         dataModel.processRaw(pdata, len);
00260         dbg(2, "99");
00261 }
00262 
00263 
00264 
00265 void ProcessController::processRaw(const char *rawfname, int tblock) {
00266         dbg(1, "rawfname:\"%s\"", rawfname);
00267         int fd = open(rawfname, O_RDONLY);
00268         int len = flen(rawfname);
00269         void *pdata;
00270 
00271         if (fd <= 0) {
00272                 cerr << "Failed to open file:" << rawfname << endl;
00273                 perror("");
00274                 exit(-1);
00275         }
00276 
00277         pdata = mmap(0, len, PROT_READ, MAP_PRIVATE, fd, 0);
00278 
00279         dbg(1, "fname %s len %d pdata %p", rawfname, len, pdata);
00280 
00281         if (pdata == MAP_FAILED) {
00282                 cerr << "Failed to map file:" << rawfname << " len:" << len
00283                                 << endl;
00284                 perror("");
00285                 exit(-1);
00286         }
00287 
00288 #ifdef __arm
00289         addCal(dataModel, rawfname);
00290 #endif
00291         dataModel.setDataSourceName(rawfname);
00292 
00293         processAction(pdata, len, tblock);
00294 
00295         munmap(pdata, len);
00296         close(fd);
00297 }
00298 
00299 void ProcessController::processRawFiles(const char* tbdef)
00300 {
00301         if (strstr(tbdef, ",") == 0){
00302                 processRaw(tbdef);
00303         }else{
00304                 char buf[128];
00305                 char *str;
00306                 char *tp;
00307                 char *tbid;
00308 
00309 
00310                 dbg(1, "files:\"%s\"", tbdef);
00311 
00312                 strncpy(buf, tbdef, sizeof(buf)-1);
00313 
00314                 for(str = buf; (tbid = strtok_r(str, ",", &tp)); str = NULL){
00315                         char tbname[80];
00316                         sprintf(tbname, src_fmt, tbid);
00317                         dbg(2, "tbid:\"%s\" tbname:\"%s\"", tbid, tbname);
00318                         processRaw(tbname, strtol(tbid, 0, 10));
00319                 }
00320         }
00321 }
00322 
00323 
00324 int ProcessController::processTblock(const char* tbid){
00325         dbg(1, "01");
00326 
00327         if (use_fork){
00328                 /* we fork() to do the process - now we don't need to worry about
00329                  * any cleanup/memory leak in dataModel!
00330                  */
00331                 int pid;
00332 
00333                 if ((pid = fork()) == 0){
00334                         if (nice2){
00335                                 nice(nice2);
00336                         }
00337                         if (euid){
00338                                 setegid((gid_t)(euid+1));/* D-TACQ convention */
00339                                 seteuid((uid_t)(euid));
00340                         }
00341 
00342                         processRawFiles(tbid);
00343 
00344                         int rc = dump();
00345                         dbg(1, "exit(%d)", rc);
00346                         exit(rc);
00347                         return rc;              /* doesn't happen */
00348                 }else{
00349                         dbg(1, "40 pid %d", pid);
00350                         int status;
00351                         Timer timer;
00352                         wait(&status);
00353 
00354                         if (WIFEXITED(status)){
00355                                 parentDump();
00356                                 dbg(show_exit_level,
00357                                     "90: child status %d ret %d time:%.2f s",
00358                                     status, WEXITSTATUS(status),
00359                                     timer.timeFromStart());
00360                                 dbg(1, "\n\n");
00361                                 return WEXITSTATUS(status);
00362                         }else{
00363                                 err("CHILD PROCESS FAILED %d", status);
00364                                 return 0;
00365                         }
00366                 }
00367         }else{
00368                 processRawFiles(tbid);
00369 
00370                 int rc = dump();
00371                 parentDump();
00372                 dataModel.clear();
00373                 return rc;
00374         }
00375         dbg(1, "99");
00376 }
00377 
/**
 * @return true when the line is empty (starts with a newline) or starts
 *         with '#'; such lines are skipped by the feed loop.
 */
static bool isComment(char *myline)
{
        switch (myline[0]){
        case '\n':
        case '#':
                return true;
        default:
                return false;
        }
}
00382 
/**
 * Report whether /dev/dtacq/state currently reads as state 0.
 *
 * @return true when the first line of the state file parses (atoi) to
 *         0; false when the file cannot be opened (e.g. on a host
 *         without the device), cannot be read, or holds another value.
 *
 * The stream is now closed on every path; previously each call leaked
 * one FILE handle.
 */
bool acq_state_idle(void)
{
        FILE* fp = fopen("/dev/dtacq/state", "r");
        if (fp == 0) return false;                      /* on HOST? */

        char state[16];
        const bool idle =
                fgets(state, sizeof(state), fp) != 0 && atoi(state) == 0;

        fclose(fp);
        return idle;
}
00396 
00397 void LiveFeedProcessController::run(poptContext& opt_context)
00398 {
00399         dbg(1, "01");
00400 
00401         addLiveCal(dataModel);
00402         CommandBuffer& cb = *CommandBuffer::create(feed, line_sz, maxq);
00403         char* lm_line = new char[line_sz+1];
00404 
00405         int tblock_count = 0;
00406         int max_tblocks = 0;    /* no limit */
00407         bool quit_on_stop = false;
00408 
00409         max_tblocks = getenvInt("MAX_TBLOCKS", max_tblocks);
00410         quit_on_stop = getenvInt("QUIT_ON_STOP", quit_on_stop);
00411 
00412         while(max_tblocks == 0 || tblock_count < max_tblocks){
00413                 cb.getNext(lm_line, line_sz);
00414 
00415                 if (isComment(lm_line)){
00416                         continue;
00417                 }else if (inputValidator->isValidInput(lm_line)){
00418                         processTblock(lm_line);
00419                         cb.writeBack(lm_line);
00420                 }else{
00421                         err("bad line: \"%s\"", lm_line);
00422                 }
00423                 cb.dump(lm_line);
00424 
00425                 if (quit_on_stop && !cb.hasDataAvailable() && acq_state_idle()){
00426                         printf("COMPLETE");
00427                         break;
00428                 }
00429         }
00430 
00431         delete [] lm_line;
00432 }
00433 
00434 
00435 
00436 class TblockLiveFeedInputValidator : public InputValidator {
00437 public:
00438         virtual bool isValidInput(char* input_line){
00439                 if (strchr("0123456789", input_line[0]) &&
00440                     strchr("0123456789", input_line[1]) &&
00441                     strchr("0123456789", input_line[2])){
00442                         input_line[3] = '\0';
00443                         return true;
00444                 }else{
00445                         return false;
00446                 }
00447         }
00448 };
00449 
00450 
00451 class SubsetProcessController : public LiveFeedProcessController {
00452 
00453 protected:
00454         int sample_size;
00455 
00456         virtual void processAction(void *pdata, int len, int tblock);
00457 public:
00458         SubsetProcessController(AcqDataModel& _dataModel, const char *_feed) :
00459                 LiveFeedProcessController(_dataModel, _feed)
00460         {
00461                 sample_size = dataModel.getAcqType().getWordSize()*
00462                                 dataModel.getAcqType().getNumChannels();
00463         }
00464         virtual ~SubsetProcessController() {}
00465 };
00466 
00467 void SubsetProcessController::processAction(void *pdata, int len, int tblock)
00468 /* tackle 128 sample bites of the data to the limit of maxlen */
00469 {
00470         int maxlen_sam = Args::maxlen;
00471 
00472         int nbites = maxlen_sam>128? (maxlen_sam / 128): 1;
00473         int stride = len/nbites;
00474         int runlen = MIN(maxlen_sam, 128) * sample_size;
00475         int start = Args::startoff;
00476 
00477         dbg(1, "stride %d runlen %d, len %d", stride, runlen, len);
00478 
00479         for ( ; start + runlen < len; start += stride){
00480                 dbg(1, "45: call ProcessController::processAction %d", start);
00481                 ProcessController::processAction((char*)pdata+start, runlen, tblock);
00482                 dbg(1, "50: done start=%d", start);
00483         }
00484 
00485         dbg(1, "99 start %d runlen %d len %d",
00486                         start, runlen, len);
00487 }
00488 
00489 class LiveLogProcessController : public SubsetProcessController {
00490         unsigned ess;
00491         int samples_per_tblock;
00492         bool appending;
00493 protected:
00494         virtual int processTblock(const char* tbid){
00495                 dbg(1,"01: call ProcessController::processTblock then set appending");
00496 
00497                 int rc = ProcessController::processTblock(tbid);
00498                 appending = true;
00499                 return rc;
00500         }
00501         virtual int dump() {
00502 
00503                 dbg(1, "ess %u samples_per_tblock %d sample_size %d app %d",
00504                                         ess, samples_per_tblock, sample_size,
00505                                         appending);
00506 
00507                 dbg(1, "%p 001 appending %d", this, appending);
00508                 DumpDef dd(WorkingDir::outbase ==0? "/tmp/live_log": WorkingDir::outbase, ess);
00509 
00510                 dd.setAppending(appending);
00511 
00512                 const char* acq_demux_id = getenv("ACQ_DEMUX_ID");
00513                 if (acq_demux_id){
00514                         string pfx(acq_demux_id);
00515                         pfx.append(".");
00516 
00517                         dataModel.setPrefix(pfx);
00518                 }
00519                 dataModel.dump(dd);
00520                 return 0;
00521         }
00522 
00523         virtual void parentDump() {
00524                 ess += samples_per_tblock;
00525                 dbg(1, "ess %d", ess);
00526         }
00527 public:
00528         LiveLogProcessController(AcqDataModel& _dataModel):
00529                 SubsetProcessController(_dataModel, LM_FEED),
00530                 ess(0),
00531                 appending(false)
00532         {
00533                 assert(sample_size != 0);
00534                 samples_per_tblock = TBLOCK_LEN/sample_size;
00535                 inputValidator = new TblockLiveFeedInputValidator();
00536                 maxq = 1;       /* best latency */
00537         }
00538 };
00539 
00540 
00541 #define MEANFILE "/dev/shm/live_means"
00542 
00543 class MeanVisitor: public ChannelVisitor {
00544         AcqDataModel& dataModel;
00545         int fd;
00546         int *pmeans;
00547         int *nmeans;
00548         int len;
00549 
00550         void clear() {
00551                 memset(pmeans, 0, len*sizeof(int));
00552                 memset(nmeans, 0, len*sizeof(int));
00553         }
00554 public:
00555         MeanVisitor(AcqDataModel& _dataModel) :
00556                 dataModel(_dataModel)
00557         {
00558 
00559                 fd = open(MEANFILE, O_WRONLY);
00560                 if (fd <= 0){
00561                         cerr << "Failed to open file:" << MEANFILE << endl;
00562                         perror("");
00563                         exit(-1);
00564                 }
00565                 len = 32+1;             // @@todo also, index from 1?
00566                 pmeans = new int[len];
00567                 nmeans = new int[len];
00568                 clear();
00569 
00570                 dbg(1, "01 fd:%d", fd);
00571         }
00572         virtual ~MeanVisitor() {
00573                 close(fd);
00574                 delete [] pmeans;
00575                 delete [] nmeans;
00576         }
00577 
00578         void dump() {
00579                 dbg(1, "01");
00580 
00581                 for (int ii = 0; ii < len; ++ii){
00582                         int nm = nmeans[ii];
00583                         if (nm){
00584                                 pmeans[ii] /= nm;
00585                         }
00586                 }
00587                 write(fd, pmeans, len);
00588                 clear();
00589                 dbg(1, "99");
00590         }
00591         virtual void onSample(int ichan, int sample)
00592         {
00593                 dbg(2, "chan:%2d sample:%u", ichan, sample);
00594                 pmeans[ichan] += sample;
00595                 nmeans[ichan]++;
00596         }
00597 };
00598 class LiveMeanProcessController : public SubsetProcessController {
00599         MeanVisitor channelVisitor;
00600 protected:
00601         virtual int dump() {
00602                 dataModel.visitChannels(channelVisitor);
00603                 channelVisitor.dump();
00604                 return 0;
00605         }
00606 public:
00607         LiveMeanProcessController(AcqDataModel& _dataModel);
00608 };
00609 
00610 LiveMeanProcessController::LiveMeanProcessController(AcqDataModel& _dataModel) :
00611         SubsetProcessController(_dataModel, LM_FEED),
00612         channelVisitor(_dataModel)
00613 /* map shared memory, set up semaphore ? */
00614 {
00615         dbg(1, "01");
00616 }
00617 
/* Built-in path of the event feed. */
#define _EV_FEED        "/dev/acq200/data.dma/tbstat_ev"

/* Effective event feed: environment variable EV_FEED when set, else the
 * built-in path above. */
#define EV_FEED (getenv("EV_FEED")? getenv("EV_FEED"): _EV_FEED)
00621 
00622 
00623 class LivePrePostProcessController;
00624 
00625 class EvLiveFeedInputValidator : public InputValidator {
00626         LivePrePostProcessController& parent;
00627 
00628 public:
00629         virtual bool isValidInput(char* input_line);
00630         EvLiveFeedInputValidator(LivePrePostProcessController& _parent) :
00631                 parent(_parent)
00632         {}
00633 };
00634 
/* @@todo hack!! */
/* Hard-coded bytes per sample: 32 values of sizeof(short) each.
 * Used by LivePrePostProcessController::B2S(). */
#define SAMPLE_SIZE     (32 * sizeof(short))
/* Tblock length in bytes (same value as the definition near the top of
 * the file). */
#define TBLOCK_LEN      0x600000
/* Half of TBLOCK_LEN; selectable as adjacent_search_length. */
#define TBLOCK_HALF     0x300000
/* The three values below are not referenced in the visible part of this
 * file. */
#define TBLOCK_GT       0x400000
#define TBLOCK_LT       0x200000
#define TBLOCK_TRIM     0x100000
00642 
00643 class LivePrePostProcessController : public LiveFeedProcessController {
00644         void preprocessCallout();
00645         void postprocessCallout();
00646 protected:
00647         virtual int dump();
00648 
00649         File event_log;
00650 
00651         int previous;
00652         bool rc_previous;
00653         int adjacent_search_length;
00654 public:
00655         bool tblockAlreadyKnown(int tblock){
00656                 if (tblock == 999){
00657                         return true;
00658                 }else if (tblock == previous){
00659                         return !rc_previous;
00660                 }else{
00661                         previous = tblock;
00662                         return false;
00663                 }
00664         }
00665         virtual void processAction(void *pdata, int len, int tblock);
00666 
00667         LivePrePostProcessController(AcqDataModel& _dataModel) :
00668                 LiveFeedProcessController(_dataModel, EV_FEED, 256),
00669                 event_log(WorkingDir::outbase, "tbstat_ev"),
00670                 rc_previous(getenvInt("ACQ_DEMUX_PROCESS_PREVIOUS", true))
00671         {
00672                 MAXQ = 128;
00673                 previous = 999;
00674                 memset(&validatorData, 0, sizeof(validatorData));
00675                 inputValidator = new EvLiveFeedInputValidator(*this);
00676                 show_exit_level = 0;
00677 
00678                 if (getenvInt("ACQ_DEMUX_ADJACENT_SEARCH_HALF", 0) != 0){
00679                         adjacent_search_length = TBLOCK_HALF;
00680                 }else{
00681                         adjacent_search_length = TBLOCK_LEN;
00682                 }
00683         }
00684 
00685         virtual int processTblock(const char *tbdef);
00686         friend class EvLiveFeedInputValidator;
00687 
00688         static inline int B2S(int bytes){
00689                 int samples;
00690 
00691                 samples = bytes/(int)SAMPLE_SIZE;/* (signed) essential!!?? */
00692                 dbg(1, "bytes:%d samples:%d", bytes, samples);
00693                 return samples;
00694         }
00695 };
00696 
00697 
00698 
/* Rounding granule in bytes: 256 samples of 32 shorts = 16384. */
#define ACQ132_BLOCKLEN (32 * sizeof(short) * 256)

/**
 * Round a byte offset up to the next multiple of ACQ132_BLOCKLEN;
 * offsets already on a boundary are returned unchanged.
 */
unsigned round_up(unsigned offset)
{
        const unsigned remainder = offset & (ACQ132_BLOCKLEN-1);

        if (remainder == 0){
                return offset;
        }
        return (offset - remainder) + ACQ132_BLOCKLEN;
}

/** Round a byte offset down to a multiple of ACQ132_BLOCKLEN. */
unsigned round_down(unsigned offset){
        const unsigned remainder = offset & (ACQ132_BLOCKLEN-1);

        return offset - remainder;
}

/**
 * Convert a sample count to bytes (32 shorts per sample) and round the
 * result up to a multiple of ACQ132_BLOCKLEN.
 */
unsigned round_up_samples(int samples) {
        const unsigned nbytes = samples * 32 * sizeof(short);

        return round_up(nbytes);
}
00718 
00719 
00720 void
00721 LivePrePostProcessController::processAction(void *pdata, int len, int tblock) {
00722         const char *optimise = "none";
00723 
00724         dbg(1, "01: tblock %03d [%03d,%03d,%03d] pdata:%p rawlen 0x%06x optimised:%s",
00725                 tblock,
00726                 validatorData->blocks123[0], validatorData->blocks123[1],
00727                 validatorData->blocks123[2], pdata, len, optimise);
00728 
00729         if (validatorData->tblock == tblock) {
00730                 unsigned low = round_up_samples(Args::pre) + ACQ132_BLOCKLEN;
00731                 unsigned high =
00732                         len - (round_up_samples(Args::post) + ACQ132_BLOCKLEN);
00733 
00734                 if (round_down(validatorData->esoff) > low){
00735                         unsigned del = round_down(validatorData->esoff) - low;
00736                         pdata = (char*)pdata + del;
00737                         validatorData->adjust(B2S(del));
00738                         len -= del;
00739                         optimise = "single block > pre";
00740                         dbg(1, "45: tblock %03d pdata:%p rawlen 0x%06x optimised:%s",
00741                                                 tblock, pdata, len, optimise);
00742                 }
00743                 if (round_up(validatorData->esoff2) < high){
00744                         unsigned del = high - round_up(validatorData->esoff2);
00745                         len -= del;
00746                         optimise = "single block < post";
00747                         dbg(1, "45: tblock %03d pdata:%p rawlen 0x%06x optimised:%s",
00748                                                         tblock, pdata, len, optimise);
00749                 }
00750         } else if (tblock == validatorData->blocks123[0]) {
00751                 int pre_bytes = round_up_samples(Args::pre);
00752                 /* optimise previous - maybe start halfway */
00753                 pdata = (char*) pdata + (TBLOCK_LEN - pre_bytes);
00754                 validatorData->adjust(B2S(-pre_bytes));
00755                 len = pre_bytes;
00756                 AcqDataModel::setProcessNoStashES(len);
00757                 optimise = "left block";
00758         } else if (tblock == validatorData->blocks123[2]) {
00759                 int post_bytes = round_up_samples(Args::post);
00760                 /* optimise next - finish halfway */
00761                 len = post_bytes;
00762                 AcqDataModel::setProcessNoStashES(len);
00763                 optimise = "right block";
00764         }
00765 
00766         dbg(0, "99: tblock %03d pdata:%p rawlen 0x%06x optimised:%s",
00767                                                         tblock, pdata, len, optimise);
00768         ProcessController::processAction(pdata, len, tblock);
00769 }
00770 
00771 
00772 
00773 
00774 /* get the triplet [singlet]
00775  * set parent.validatorData.tblock as the middle [first] number
00776  * eliminate tblocks that have been already processed, pass the rest on
00777  */
00778 bool EvLiveFeedInputValidator::isValidInput(char* input_line){
00779         parent.event_log.writeln(input_line);
00780 
00781         dbg(1, "input_line:\"%s\"", input_line);
00782 
00783         if (parent.validatorData != 0){
00784                 delete parent.validatorData;
00785         }
00786 
00787         ValidatorData* vdata;
00788         if ((vdata = ValidatorData::create(parent, input_line)) != 0){
00789                 char tblocks_to_process[80];
00790                 tblocks_to_process[0] = '\0';
00791                 int nblocks = 0;
00792 
00793                 for (int iblock = 0; iblock < vdata->nblocks; ++iblock){
00794                         if (!parent.tblockAlreadyKnown(
00795                                         vdata->blocks123[iblock])){
00796                                 char block_id[5];
00797                                 snprintf(block_id, 4, "%03d",
00798                                         vdata->blocks123[iblock]);
00799                                 if (strlen(tblocks_to_process)){
00800                                         strcat(tblocks_to_process, ",");
00801                                 }
00802                                 strcat(tblocks_to_process, block_id);
00803                                 ++nblocks;
00804                         }
00805                 }
00806 
00807                 if (nblocks){
00808                         dbg(1, "99 tblocks_to_process \"%s\"", tblocks_to_process);
00809                         strcpy(input_line, tblocks_to_process);
00810                         parent.validatorData = vdata;
00811                         return true;
00812                 }
00813         }
00814         return false;
00815 }
00816 
00817 
00818 void spawn_task(const char *cbuf){
00819         int rc = system(cbuf);
00820 
00821         if (rc == -1){
00822                 err("system() failed");
00823                 exit(-1);
00824         }
00825         if (WEXITSTATUS(rc) != 0){
00826                 err("call to %s failed, drop out", cbuf);
00827                 exit(WEXITSTATUS(rc));
00828         }
00829 }
00830 void LivePrePostProcessController::preprocessCallout()
00831 {
00832         if (char *cmd = getenv("PP_PREP_CALLOUT")){
00833                 char cbuf[128];
00834                 snprintf(cbuf, 128, "%s %d", cmd, validatorData->evnum);
00835 
00836                 spawn_task(cbuf);
00837         }
00838 }
00839 
00840 void LivePrePostProcessController::postprocessCallout()
00841 {
00842         if (char *cmd = getenv("PP_POST_CALLOUT")){
00843                 char cbuf[128];
00844                 snprintf(cbuf, 128, "%s %d", cmd, validatorData->evnum);
00845 
00846                 spawn_task(cbuf);
00847         }
00848 }
00849 
00850 /*
00851  * making a group format:
00852 ls -1 * | grep COOKED | sed -e 's!^!/INCLUDE !' -e 's/:$//'
00853 /INCLUDE 00_0124870400.COOKED
00854 /INCLUDE 01_0126870015.COOKED
00855 /INCLUDE 02_0127003648.COOKED
00856  *
00857  * >format
00858  ls -1 * | grep COOKED | sed -e 's!^!INCLUDE !' -e 's!:$!/format!' >format
00859  */
00860 
00861 int LivePrePostProcessController::processTblock(const char *tbdef)
00862 {
00863         preprocessCallout();
00864 
00865         int rc = LiveFeedProcessController::processTblock(tbdef);
00866 
00867         if (rc < 0){
00868                 err("LiveFeedProcessController::processTblock returned %d", rc);
00869         }else if (rc == 0){
00870                 err("LiveFeedProcessController::processTblock NO EVENTS");
00871         }else{
00872                 validatorData->evnum += rc;
00873                 if (rc > 1){
00874                         dbg(1, "evnum:%d multiple events in Tblock: %d",
00875                                         validatorData->evnum, rc);
00876                 }else{
00877                         dbg(1, "single event processed");
00878                 }
00879 
00880                 postprocessCallout();
00881         }
00882         return rc;
00883 }
00884 
00885 static void updateEvnIndicator(int evnum)
00886 {
00887         char evn_buf[80];
00888         sprintf(evn_buf, "%d\n", evnum);
00889         File evn("/tmp", "acq_demux_evn", "w");
00890         evn.writeln(evn_buf);
00891 }
00892 
00893 int LivePrePostProcessController::dump() {
00894         int num_events = 0;
00895         int evnum = validatorData->evnum;
00896 
00897         for (vector<int>::iterator event_it = dataModel.getEvents().begin();
00898                         event_it != dataModel.getEvents().end();
00899                         ++event_it, ++num_events){
00900 
00901                 int event_offset = *event_it;
00902 
00903                 ++evnum;
00904 
00905                 dbg(1, "evnum:%d dump event at %d", evnum, event_offset);
00906 
00907                 char ev_id[80];
00908                 const char* acq_demux_id = getenv("ACQ_DEMUX_ID");
00909                 if (acq_demux_id){
00910                         strncpy(ev_id, acq_demux_id, 80);
00911                 }
00912 
00913                 sprintf(ev_id, "%s%sEV%02d%s",
00914                                 acq_demux_id? acq_demux_id: "",
00915                                 acq_demux_id? ".": "", evnum, ".");
00916                 string pfx(ev_id);
00917                 dataModel.setPrefix(pfx);
00918 
00919                 char _current_root[128];
00920 
00921                 sprintf(_current_root, "%03d", evnum);
00922 
00923 
00924                 WorkingDir mydir(_current_root);
00925                 DumpDef dd(mydir.name(),
00926                                 validatorData->getSampleOffset(event_offset),
00927                                 event_offset, Args::pre, Args::post);
00928 
00929                 dbg(1, "dirFile %s pss %lu", dd.root.c_str(), dd.event_sample_start);
00930                 dataModel.dump(dd);
00931                 dataModel.dumpFormat(dd.root, event_offset +
00932                                 validatorData->getSampleOffset(event_offset));
00933                 dbg(1, "finished with event %d", evnum);
00934 
00935                 updateEvnIndicator(evnum);
00936         }
00937 
00938         const char *pj = getenv("ACQ_DEMUX_POSTEVENT_JOB");
00939         if (pj){
00940                 system(pj);
00941         }
00942 
00943         dbg(1, "return num_events %d", num_events);
00944         return num_events;
00945 }
00946 
00947 
00948 class OfflineProcessController: public ProcessController {
00949         const char* arg1;
00950 public:
00951         OfflineProcessController(AcqDataModel& _dataModel):
00952                 ProcessController(_dataModel),
00953                 arg1(0)
00954         {}
00955 
00956         virtual int dump();
00957         virtual void run(poptContext& opt_context);
00958 };
00959 
00960 int OfflineProcessController::dump()
00961 {
00962         WorkingDir mydir(arg1);
00963         string dirFile(mydir.name());
00964         if (Args::pre == 0 && Args::post == 0){
00965                 dataModel.dump(dirFile);
00966         }else{
00967                 DumpDef dd(dirFile, 0, 0, Args::pre, Args::post);
00968                 dataModel.dump(dd);
00969         }
00970         dataModel.dumpFormat(dirFile);
00971         return 0;
00972 }
00973 void OfflineProcessController::run(poptContext& opt_context)
00974 {
00975         const char* arg;
00976         int nargs = 0;
00977 
00978         while((arg = poptGetArg(opt_context)) != 0){
00979                 processRaw(arg);
00980                 if (nargs == 0){
00981                         if (arg1 == 0){
00982                                 arg1 = arg;
00983                         }
00984                 }
00985                 ++nargs;
00986         }
00987         dump();
00988 }
00989 
00990 class IncomingProcessController: public ProcessController {
00991 
00992         const char* tb_name;
00993         int pulse;
00994         int pps;                /* pulse per second */
00995         int second;
00996         int minute;
00997         char *current_root;
00998 
00999         void copyLatest(char *complete_second);
01000 
01001         void runPulse(char time[], char file[], char event[]);
01002 
01003 protected:
01004         virtual void processRawFiles(const char* tbdef) {
01005                 processRaw(tbdef);
01006         }
01007 public:
01008         IncomingProcessController(AcqDataModel& _dataModel):
01009                 ProcessController(_dataModel),
01010                 pulse(0)
01011         {
01012                 pps = getenvInt("PULSE_PER_SECOND", 5);
01013                 src_fmt = "";
01014                 current_root = new char[256];
01015 
01016                 char current[256];
01017                 sprintf(current, "%s/current", WorkingDir::outbase);
01018                 mkdir(current, 0777);
01019         }
01020         virtual ~IncomingProcessController() {
01021                 delete [] current_root;
01022         }
01023 
01024         virtual int dump();
01025         virtual void run(poptContext& opt_context);
01026 };
01027 
01028 int IncomingProcessController::dump()
01029 {
01030         dbg(1, "");
01031         char wd[128];
01032 
01033         sprintf(wd, "%s/%03d", current_root, pulse);
01034 
01035 
01036         WorkingDir mydir(wd, ABSDIR);
01037         DumpDef dd(mydir.name(), 0, 0, 0, 0);
01038 /*
01039                    event_offset + validatorData.pss,
01040                    event_offset, pre, post);
01041 */
01042         dbg(1, "dirFile %s pss %lu", dd.root.c_str(), dd.event_sample_start);
01043         dataModel.dump(dd);
01044         dataModel.dumpFormat(dd.root, 0);
01045         return 0;
01046 }
01047 
01048 void IncomingProcessController::copyLatest(char *complete_second)
01049 {
01050         char cmd[256];
01051 #if 1
01052         sprintf(cmd,
01053         "mkdir %s/current.new;cp -r %s/* %s/current.new;make.format %s/current.new",
01054                 WorkingDir::outbase, complete_second, WorkingDir::outbase, WorkingDir::outbase);
01055         system(cmd);
01056         sprintf(cmd, "mv -T %s/current.new %s/current", WorkingDir::outbase, WorkingDir::outbase);
01057         system(cmd);
01058 #endif
01059 #if 0
01060         "rm -Rf %s/current/*;cp -r %s/* %s/current;make.format %s/current",
01061                         WorkingDir::outbase, complete_second, WorkingDir::outbase, WorkingDir::outbase);
01062         system(cmd);
01063 #endif
01064         /* this really doesn't work well. Re-write using getdata library? */
01065 }
01066 
01067 void IncomingProcessController::runPulse(char time[], char file[], char event[])
01068 {
01069         if (pulse%pps == 0){
01070                 copyLatest(current_root);
01071                 ++second;
01072 
01073                 if ((pulse/pps)%60 == 0){
01074                         ++minute;
01075                         second = 0;
01076                         sprintf(current_root, "%s/%03d",
01077                                         WorkingDir::outbase, minute);
01078                         if (mkdir(current_root, 0777)){
01079                                 err("failed to create \"%s\"", current_root);
01080                                 exit(errno);
01081                         }
01082                 }
01083 
01084                 sprintf(current_root, "%s/%03d/%02d",
01085                                 WorkingDir::outbase, minute, second);
01086                 if (mkdir(current_root, 0777)){
01087                         err("failed to create \"%s\"", current_root);
01088                         exit(errno);
01089                 }
01090         }
01091         char _pulse_pfx[80];
01092         sprintf(_pulse_pfx, "%02d_", (pulse%pps)+1);
01093         string pulse_pfx(_pulse_pfx);
01094         dataModel.setPrefix(pulse_pfx);
01095 
01096         ++pulse;
01097 
01098         dbg(1, "file:\"%s\"", file);
01099         processTblock(tb_name = file);
01100         dataModel.clear();
01101 }
01102 void IncomingProcessController::run(poptContext& opt_context)
01103 {
01104         const char* arg = poptGetArg(opt_context);
01105         char waits_tblocks[80];
01106         char latest_tblock[80] = {};
01107         char time[80];
01108         char file[80];
01109         char event[80];
01110 
01111         if (arg == 0){
01112                 arg = ".";
01113         }
01114 
01115         sprintf(waits_tblocks, "dir-watch %s", arg);
01116 
01117         FILE *fstat = popen(waits_tblocks, "r");
01118 
01119         if (fstat == 0){
01120                 err("failed to spawn \"%s\"", waits_tblocks);
01121                 exit(errno);
01122         }
01123         while(fgets(latest_tblock, 80, fstat)){
01124                 dbg(2, "tblock: \"%s\"", latest_tblock);
01125                 if (sscanf(latest_tblock, "%s %s %s", time, file, event) == 3){
01126                         runPulse(time, file, event);
01127                 }
01128         }
01129         pclose(fstat);
01130 }
01131 
01132 
01133 
01134 
01135 ProcessControllerRegistry& ProcessControllerRegistry::instance()
01136 {
01137         static ProcessControllerRegistry pcr;
01138 
01139         return pcr;
01140 }
01141 void ProcessControllerRegistry::registerController(const string key, ProcessControllerCreator* pcc)
01142 {
01143         creators[key] = pcc;
01144 }
01145 
01146 ProcessController* ProcessController::create(
01147                 const string key,  AcqDataModel& dataModel)
01148 {
01149         ProcessControllerCreator* pcc = ProcessControllerRegistry::instance().creators[key];
01150 
01151         if (pcc){
01152                 return pcc->create(dataModel);
01153         }
01154 
01155         map<const string, ProcessControllerCreator*>::const_iterator it;
01156         for (it = ProcessControllerRegistry::instance().creators.begin();
01157              it != ProcessControllerRegistry::instance().creators.end(); ++it){
01158                 cerr << it->first << endl;
01159                 if (it->first == key){
01160                         pcc = it->second;
01161                         return pcc->create(dataModel);
01162                 }
01163         }
01164         exit(1);
01165 #if 0
01166         if (pcc){
01167                 return pcc;
01168         }
01169 
01170         }else if (key == "make.acq200.format"){
01171                 /*
01172                 addLiveCal(dataModel);
01173                 dataModel.has_timebase = false;
01174                 dataModel.ch_name_core = "";
01175                 dataModel.dumpFormat(WorkingDir::outbase?
01176                                 WorkingDir::outbase: "/dev/acq200/data");
01177                 */
01178                 return 0;
01179         }else if (key == "acq_demux-trial"){
01180                 /*
01181                 dataModel.print();
01182                 */
01183                 return 0;
01184         }
01185 #endif
01186 }
01187 
01188 
01189 class InitializesDefaults {
01190         class OfflineProcessControllerCreator : public ProcessControllerCreator {
01191         public:
01192                 ProcessController* create(AcqDataModel& _dataModel) const {
01193                         return new OfflineProcessController( _dataModel);
01194                 }
01195         };
01196         class LiveMeanProcessControllerCreator : public ProcessControllerCreator {
01197         public:
01198                 ProcessController* create(AcqDataModel& _dataModel) const {
01199                         return new LiveMeanProcessController( _dataModel);
01200                 }
01201         };
01202         class LiveLogProcessControllerCreator : public ProcessControllerCreator {
01203         public:
01204                 ProcessController* create(AcqDataModel& _dataModel) const {
01205                         return new LiveLogProcessController( _dataModel);
01206                 }
01207         };
01208         class LivePrePostProcessControllerCreator : public ProcessControllerCreator {
01209         public:
01210                 ProcessController* create(AcqDataModel& _dataModel) const {
01211                         return new LivePrePostProcessController( _dataModel);
01212                 }
01213         };
01214         class IncomingProcessControllerCreator : public ProcessControllerCreator {
01215         public:
01216                 ProcessController* create(AcqDataModel& _dataModel) const {
01217                         return new IncomingProcessController( _dataModel);
01218                 }
01219         };
01220 public:
01221         InitializesDefaults() {
01222                 ProcessControllerRegistry::instance().registerController(
01223                                 "acq_demux",
01224                                 new OfflineProcessControllerCreator());
01225 
01226                 ProcessControllerRegistry::instance().registerController(
01227                                 "acq_demux-lm",
01228                                 new LiveMeanProcessControllerCreator());
01229                 ProcessControllerRegistry::instance().registerController(
01230                                 "acq_demux-ll",
01231                                 new LiveLogProcessControllerCreator());
01232                 ProcessControllerRegistry::instance().registerController(
01233                                 "acq_demux-lpp",
01234                                 new LivePrePostProcessControllerCreator());
01235                 ProcessControllerRegistry::instance().registerController(
01236                                 "acq_demux_incoming",
01237                                 new IncomingProcessControllerCreator());
01238         }
01239 };
01240 
01241 int ValidatorData::evnum;
01242 
01243 static InitializesDefaults ID;
01244 
01245