ACQ2XX_API
MdsProcessController.cpp
Go to the documentation of this file.
00001 /*
00002  * MdsProcessController.cpp
00003  *
00004  *  Created on: Feb 12, 2011
00005  *      Author: pgm
00006  */
00007 
00008 #define TIMEBASE_DEBUG
00009 
00010 #include "local.h"
00011 
00012 #include <assert.h>
00013 #include <iostream>
00014 #include <map>
00015 #include <vector>
00016 #include <set>
00017 
00018 
00019 #include <stdlib.h>
00020 #include <libgen.h>     /* dirname() */
00021 #include <unistd.h>
00022 #include <errno.h>
00023 
00024 #include <fcntl.h>
00025 #include <sys/mman.h>
00026 #include <sys/stat.h>
00027 #include <sys/types.h>
00028 #include <sys/stat.h>
00029 #include <sys/wait.h>
00030 
00031 
00032 #include "popt.h"
00033 #include <string.h>
00034 #include <vector>
00035 
00036 #include "AcqType.h"
00037 #include "AcqDataModel.h"
00038 #include "ProcessController.h"
00039 #include "acq_demux.h"
00040 
00041 #include "MdsProcessController.h"
00042 #include "Timer.h"
00043 #include <mdsobjects.h>
00044 using namespace MDSplus;
00045 
/* when set, the mdsPutSegments() implementations log instead of storing */
bool no_store = false;

/* declared here only; no definition is visible in this file section */
char *fgets_t(char *s, int size, FILE* stream, int timeout);

#define TBCHAN	0		/* channel 0 ("CH00") holds the timebase */
00051 
/* marker words stored in the first and last fields of a ProcessState */
#define PS_HEADER 0xfeedc0de
#define PS_FOOTER 0xdeadc0de

/**
 * Progress record for the process.
 * createState() writes a freshly constructed instance to a file and then
 * maps that file, so the whole object - padding included - is zeroed
 * before the two marker words are filled in.
 */
struct ProcessState {
	unsigned ps_header;		/* always PS_HEADER */
	unsigned long long timestamp;
	int nfiles;
	int nsegs;
	char last_file[128];
	unsigned ps_footer;		/* always PS_FOOTER */

	ProcessState()
	{
		/* blanket zero first: this also clears the padding bytes */
		memset(this, 0, sizeof(*this));
		ps_footer = PS_FOOTER;
		ps_header = PS_HEADER;
	}
};
00069 
/* list of channel numbers, and the iterator used to walk it */
typedef vector<int> ActiveChannels;
typedef ActiveChannels::iterator ActiveChannelsIt;
00072 
00073 class MdsWrapper {
00074 
00075 protected:
00076         ActiveChannels active_channels;
00077         AcqDataModel& dataModel;
00078         virtual void mdsPutSegments(Tree *tree, const char *field_fmt) = 0;
00079 
00080         void mdsPutCal(Tree *tree, const char *field_fmt);
00081 
00082         void storeChannels(Tree *tree, const char* field_fmt,
00083                         Data* start, Data* end, Data* dimension,
00084                         unsigned istart, unsigned len);
00085         void storeTimebase(Tree *tree, const char* field_fmt,
00086                                 Data* start, Data* end, Data* dimension,
00087                                 unsigned istart, unsigned len,
00088                                 double t1, double isi);
00089 
00090 
00091 public:
00092         void mdsPut(const char* tree, const char* field_fmt);
00093 
00094         MdsWrapper(AcqDataModel& _dataModel);
00095         virtual ~MdsWrapper() {}
00096 
00097         virtual void setN1(long long t1) {
00098                 err("setN1 not supported by this wrapper");
00099                 return;
00100         }
00101 
00102         void setActiveChannel(int channel, bool on);
00103         void setActiveChannelRange(int c1, int c2, bool on);
00104         void clearActiveChannelRange(void){
00105                 active_channels.clear();
00106         }
00107 };
00108 
00109 MdsWrapper::MdsWrapper(AcqDataModel& _dataModel) :
00110         dataModel(_dataModel)
00111 {
00112         setActiveChannelRange(1, dataModel.getAcqType().getNumChannels(), true);
00113 }
00114 
00115 
00116 void MdsWrapper::setActiveChannel(int channel, bool on)
00117 {
00118         for (ActiveChannelsIt it = active_channels.begin();
00119                                                                 it < active_channels.end(); it++){
00120                 if (*it == channel){
00121                         if (on){
00122                                 return;
00123                         }else{
00124                                 active_channels.erase(it);
00125                                 return;
00126                         }
00127                 }
00128         }
00129         if (on){
00130                 active_channels.push_back(channel);
00131         }
00132 }
00133 
00134 /* this isn't the most efficient implementation, but it's only an init thing */
00135 void MdsWrapper::setActiveChannelRange(int c1, int c2, bool on)
00136 {
00137         for (int cx = c1; cx <= c2; ++cx){
00138                 setActiveChannel(cx, on);
00139         }
00140 }
00141 
00142 
00143 class ContinuousMdsWrapper : public MdsWrapper {
00144         /** continuous capture, no timestamp **/
00145 protected:
00146         long long n1;           /* _may_ be used by subclass */
00147         virtual void mdsPutSegments(Tree *tree, const char *field_fmt);
00148 
00149 public:
00150         ContinuousMdsWrapper(AcqDataModel& _dataModel) :
00151                 MdsWrapper(_dataModel), n1(0)
00152         {}
00153         virtual ~ContinuousMdsWrapper() {}
00154         virtual void setN1(long long _n1) {
00155                         n1 = _n1;
00156         }
00157 };
00158 
00159 
00160 class SegmentMdsWrapper : public MdsWrapper {
00161 
00162         virtual void mdsPutSegments(Tree *tree, const char *field_fmt);
00163 public:
00164         SegmentMdsWrapper(AcqDataModel& _dataModel) :
00165                 MdsWrapper(_dataModel)
00166         {}
00167 };
00168 
00169 class TimestampedSegmentMdsWrapper : public MdsWrapper {
00170         void storeChannels(Tree *tree, const char *field_fmt,
00171                         unsigned long long ts, unsigned istart, unsigned len);
00172         virtual void mdsPutSegments(Tree *tree, const char *field_fmt);
00173 public:
00174         TimestampedSegmentMdsWrapper(AcqDataModel& _dataModel) :
00175                 MdsWrapper(_dataModel)
00176         {}
00177 };
00178 
00179 class MdsProcessController : public ProcessController {
00180 
00181 protected:
00182         virtual int dump() {
00183                 return 0;
00184         }
00185 
00186         MdsWrapper* mdsWrapper;
00187 
00188         char *eventName;
00189         double eventMin;
00190 
00191         Timer *shot_time;
00192         double last_time;
00193         ProcessState* state;
00194         int fd_state;
00195         bool first_time;
00196         bool remove_when_done;
00197 
00198         int consecutive_child_errors;
00199         const int CHILD_ERROR_MAX;
00200 
00201         void mdsPut(const char* tree, const char* field_fmt);
00202 
00203         void onSegmentComplete(int segment);
00204 
00205         void createState(const char* tree, const char *field);
00206 
00207         virtual bool getDef(char *fname, int maxdef);
00208 
00209 public:
00210         MdsProcessController(AcqDataModel& _dataModel, MdsWrapper* _mdsWrapper);
00211         MdsProcessController(AcqDataModel& _dataModel);
00212         virtual ~MdsProcessController();
00213 
00214         virtual void processRawFiles(const char* tbdef);
00215         virtual void run(poptContext& opt_context);
00216 
00217         static void set_alarm(bool enable);
00218         static int rtrim;               /* trim samples from end of pulse */
00219 
00220         static bool isReadableFile(const char* path);
00221 };
00222 
00223 int MdsProcessController::rtrim;
00224 
00225 bool MdsProcessController::isReadableFile(const char* path)
00226 {
00227         struct stat sb;
00228 
00229         if (stat(path, &sb) == 0){
00230                 return sb.st_size > 0;
00231         }
00232         return false;
00233 }
00234 bool MdsProcessController::getDef(char *fname, int maxdef) {
00235         if (!first_time){
00236                 set_alarm(1);
00237         }
00238         first_time = false;
00239         while (true){
00240                 if (fgets(fname, maxdef, Args::config_fp) == 0){
00241                         return false;
00242                 }
00243 
00244                 set_alarm(0);
00245                 if (fname[0] == '#'){
00246                         continue;
00247                 }else{
00248                         chomp(fname);
00249                         if (!isReadableFile(fname)){
00250                                 err("input not appropriate file");
00251                                 exit(1);
00252                         }
00253                         dbg(1, "%s", fname);
00254                         return true;
00255                 }
00256         }
00257 }
00258 void MdsProcessController::processRawFiles(const char* tbdef)
00259 {
00260         if (strstr(tbdef, ",") == 0){
00261                 processRaw(tbdef);
00262         }else{
00263                 char buf[128];
00264                 char *str;
00265                 char *tp;
00266                 char *tbid;
00267 
00268 
00269                 dbg(1, "files:\"%s\"", tbdef);
00270 
00271                 strncpy(buf, tbdef, sizeof(buf)-1);
00272 
00273                 for(str = buf; (tbid = strtok_r(str, ",", &tp)); str = NULL){
00274                         char tbname[80];
00275                         sprintf(tbname, src_fmt, tbid);
00276                         dbg(2, "tbid:\"%s\" tbname:\"%s\"", tbid, tbname);
00277                         processRaw(tbname, strtol(tbid, 0, 10));
00278                 }
00279         }
00280 }
00281 
00282 void MdsProcessController::onSegmentComplete(int segment)
00283 {
00284         if (eventName && shot_time->timeFromStart() - last_time > eventMin){
00285                 char *key = getenv("CPP_EVENTS_GOOD");
00286                 if (key != 0 && atoi(key)){
00287                         Data* data = new Int32(segment);
00288                         dbg(1, "call setEvent \"%s\"", eventName);
00289                         Event::setEvent(eventName, data);
00290                         dbg(2, "done setEvent \"%s\"", eventName);
00291                         deleteData(data);
00292                         dbg(2, "deleteData done");
00293                 }else{
00294                         FILE *pp = popen("/usr/local/mdsplus/bin/tdic", "w");
00295                         fprintf(pp, "setevent(\"%s\");\nexit\n", eventName);
00296                         pclose(pp);
00297                 }
00298                 last_time = shot_time->timeFromStart();
00299         }
00300 }
00301 
00302 MdsProcessController::MdsProcessController(
00303                 AcqDataModel& _dataModel, MdsWrapper* _mdsWrapper) :
00304         ProcessController(_dataModel),
00305         mdsWrapper(_mdsWrapper),
00306         first_time(true),
00307         remove_when_done(true),
00308         consecutive_child_errors(0),
00309         CHILD_ERROR_MAX(10)
00310 {
00311         char *key;
00312 
00313         if ((key = getenv("MDS_EVENT")) != 0){
00314                 eventName = new char[128];
00315                 int nc = sscanf(key, "%s %f", eventName, &eventMin);
00316                 if (nc == 1){
00317                         eventMin = 1;
00318                 }
00319         }else{
00320                 eventName = 0;
00321         }
00322 
00323         if ((key = getenv("MDS_ACTIVE_CHANNELS")) != 0){
00324                 int c1, c2;
00325                 if (sscanf(key, "%d,%d", &c1, &c2) == 2){
00326                         info("setting active channel range %d,%d", c1, c2);
00327                         mdsWrapper->clearActiveChannelRange();
00328                         mdsWrapper->setActiveChannelRange(c1, c2, true);
00329                 }
00330         }
00331         if ((key = getenv("MDS_RTRIM")) != 0){
00332                 rtrim = atoi(getenv("MDS_RTRIM"));
00333         }
00334 }
00335 
00336 MdsProcessController::MdsProcessController(AcqDataModel& _dataModel) :
00337         ProcessController(_dataModel),
00338         first_time(true),
00339         remove_when_done(true),
00340         consecutive_child_errors(0),
00341         CHILD_ERROR_MAX(10)
00342 {
00343         // TODO Auto-generated constructor stub
00344         const char* key = getenv("MDS_USE_TIMESTAMP_SEGS");
00345         MdsWrapper* _mdsWrapper;
00346         if (key && atoi(key) == 1){
00347                 _mdsWrapper = new TimestampedSegmentMdsWrapper(dataModel);
00348         }else{
00349                 _mdsWrapper = new SegmentMdsWrapper(dataModel);
00350         }
00351 
00352         MdsProcessController(_dataModel, _mdsWrapper);
00353 }
00354 
00355 MdsProcessController::~MdsProcessController() {
00356         // TODO Auto-generated destructor stub
00357 }
00358 
00359 
00360 void MdsWrapper::storeChannels(Tree *tree, const char* field_fmt,
00361                         Data* start, Data* end, Data* dimension,
00362                         unsigned istart, unsigned len)
00363 {
00364         int nchan = dataModel.getAcqType().getNumChannels();
00365 
00366         if (getenv("ACQ_DEMUX_CHANNEL_ONE_ONLY")){
00367                 nchan = 1;                              /* reduced data for testing */
00368         }
00369 
00370         for (ActiveChannelsIt it = active_channels.begin();
00371                                                         it < active_channels.end(); it++){
00372                 int ch = *it;
00373                 char node_name[128];
00374                 snprintf(node_name, 128, field_fmt, ch);
00375                 Array *data = new Int16Array(
00376                                 &dataModel.getChannelData(ch)[istart], len);
00377                 TreeNode *node = tree->getNode(node_name);
00378                 node->beginSegment(start, end, dimension, data);
00379                 deleteData(node);
00380                 deleteData(data);
00381         }
00382 }
00383 
00384 void MdsWrapper::storeTimebase(Tree *tree, const char* field_fmt,
00385                 Data* start, Data* end, Data* dimension,
00386                 unsigned istart, unsigned len,
00387                 double t1, double isi)
00388 {
00389         double* tb_seg = new double[len];
00390         double tx = t1;
00391         char node_name[128];
00392         snprintf(node_name, 128, field_fmt, TBCHAN);
00393 
00394         for (unsigned ii = 0; ii != len; ++ii){
00395                 tb_seg[ii] = tx;
00396                 tx += isi;
00397         }
00398         Array *data = new Float64Array(tb_seg, len);
00399         TreeNode *node = tree->getNode(node_name);
00400         node->beginSegment(start, end, dimension, data);
00401         deleteData(node);
00402         deleteData(data);
00403         delete [] tb_seg;
00404 }
00405 
00406 void ContinuousMdsWrapper::mdsPutSegments(Tree *tree, const char *field_fmt)
00407 {
00408         double isi_sec = Clock::sample_clock_ns/1e9;
00409         double t1 = (double)n1 * isi_sec;
00410 
00411         unsigned int len = dataModel.getChannelData(1).size();
00412         double len_sec = (double)len * isi_sec;
00413 
00414         Data* start = new Float64(t1);
00415         Data* end   = new Float64(t1 + len_sec);
00416         Data* dimension = new Range(start, end, new Float64(isi_sec));
00417 
00418         dbg(1, "t1:%.3f len:%d", t1, len);
00419 
00420         if (!no_store){
00421                 storeChannels(tree, field_fmt, start, end, dimension, 0, len);
00422                 storeTimebase(tree, field_fmt, start, end, dimension, 0, len,
00423                                 t1, isi_sec);
00424         }else{
00425                 info("storeSeg STUBBED");
00426         }
00427 
00428         deleteData(dimension);
00429 }
00430 
00431 void SegmentMdsWrapper::mdsPutSegments(Tree *tree, const char *field_fmt)
00432 {
00433         vector<NewEventSignature*> eventSignatures =
00434                         dataModel.getEventSignatures();
00435         vector<NewEventSignature*>::iterator it = eventSignatures.begin();
00436         unsigned int sample_cursor = 0;
00437 
00438         if (it == eventSignatures.end()){
00439                 err("no event signatures");
00440                 exit(1);
00441         }
00442         if ((*it)->getSampleCursor() != 0){
00443                 err("unable to create timebase unless es at sample 0");
00444                 exit(1);
00445         }
00446         double t1 = (*it)->timeInSeconds();
00447         double isi_sec = Clock::sample_clock_ns/1e9;
00448         unsigned int sample_max = dataModel.getChannelData(1).size();
00449 
00450 #ifdef TIMEBASE_DEBUG
00451         File tb("/tmp", field_fmt);
00452 #endif
00453         for (++it; it != eventSignatures.end(); ++it){
00454                 double t2 = (*it)->timeInSeconds();
00455                 unsigned c2 = (*it)->getSampleCursor();
00456                 int len = c2 - sample_cursor - MdsProcessController::rtrim;
00457                 double lensec = ((double)(len))*isi_sec;
00458 
00459                 Data* start = new Float64(t1);
00460                 Data* end   = new Float64(t1 + lensec);
00461                 Data* dimension = new Range(start, end, new Float64(isi_sec));
00462 #ifdef TIMEBASE_DEBUG
00463                 fprintf(tb.getFp(), "%f,%f,%f,%d\n",
00464                                 t1, t1+lensec, isi_sec, len);
00465 #endif
00466                 dbg(1, "call storeSeg t1:%f t2:%f isi:%f", t1, t1+lensec, isi_sec);
00467 
00468                 if (!no_store){
00469                         storeChannels(tree, field_fmt, start, end, dimension,
00470                                         sample_cursor, len);
00471                         storeTimebase(tree, field_fmt, start, end, dimension,
00472                                         sample_cursor, len,
00473                                         t1, isi_sec);
00474                 }else{
00475                         info("storeSeg STUBBED");
00476                 }
00477 
00478                 deleteData(dimension);
00479                 t1 = t2;
00480                 sample_cursor = c2;
00481         }
00482 
00483         if (sample_cursor < sample_max){
00484                 int len = sample_max - sample_cursor - MdsProcessController::rtrim;
00485                 double lensec = ((double)(len))*isi_sec;
00486 
00487                 Data* start = new Float64(t1);
00488                 Data* end   = new Float64(t1 + lensec);
00489                 Data* dimension = new Range(start, end, new Float64(isi_sec));
00490 
00491                 storeChannels(tree, field_fmt, start, end, dimension,
00492                         sample_cursor, len);
00493                 storeTimebase(tree, field_fmt, start, end, dimension,
00494                         sample_cursor, len,
00495                         t1, isi_sec);
00496         }
00497 }
00498 
00499 
00500 void TimestampedSegmentMdsWrapper::storeChannels(
00501         Tree *tree, const char *field_fmt, unsigned long long ts, unsigned istart, unsigned len)
00502 {
00503         int nchan = dataModel.getAcqType().getNumChannels();
00504 
00505         if (getenv("ACQ_DEMUX_CHANNEL_ONE_ONLY")){
00506                 nchan = 1;                              /* reduced data for testing */
00507         }
00508 
00509         _int64 ts64 = ts;
00510 
00511         /* @@todo .. big assumption that ch starts at 1 and ends at N */
00512         for (int ch = 1; ch <= nchan; ++ch){
00513                 char node_name[128];
00514                 snprintf(node_name, 128, field_fmt, ch);
00515                 Array *data = new Int16Array(
00516                                 &dataModel.getChannelData(ch)[istart], len);
00517                 TreeNode *node = tree->getNode(node_name);
00518                 node->putRow(data, &ts64);
00519                 deleteData(data);
00520         }
00521 }
00522 
00523 
00524 void TimestampedSegmentMdsWrapper::mdsPutSegments(Tree *tree, const char *field_fmt)
00525 {
00526         vector<NewEventSignature*> eventSignatures =
00527                         dataModel.getEventSignatures();
00528         vector<NewEventSignature*>::iterator it = eventSignatures.begin();
00529         unsigned int sample_cursor = 0;
00530 
00531         if (it == eventSignatures.end()){
00532                 err("no event signatures");
00533                 exit(1);
00534         }
00535         if ((*it)->getSampleCursor() != 0){
00536                 err("unable to create timebase unless es at sample 0");
00537                 exit(1);
00538         }
00539         unsigned long long ts1 = (*it)->getTimeStamp();
00540         unsigned int sample_max = dataModel.getChannelData(1).size();
00541 
00542 #ifdef TIMEBASE_DEBUG
00543         File tb("/tmp", field_fmt);
00544 #endif
00545         for (++it; it != eventSignatures.end(); ++it){
00546                 unsigned long long ts2 = (*it)->getTimeStamp();
00547                 unsigned c2 = (*it)->getSampleCursor();
00548 
00549 #ifdef TIMEBASE_DEBUG
00550                 fprintf(tb.getFp(), "%ull %u\n", ts1, sample_cursor);
00551 #endif
00552                 if (!no_store){
00553                         storeChannels(tree, field_fmt, ts1,
00554                                         sample_cursor, c2-sample_cursor);
00555                 }else{
00556                         info("storeSeg STUBBED");
00557                 }
00558 
00559                 ts1 = ts2;
00560                 sample_cursor = c2;
00561         }
00562 
00563         if (sample_cursor < sample_max && !no_store){
00564                 storeChannels(tree, field_fmt, ts1,
00565                                         sample_cursor, sample_max-sample_cursor);
00566         }
00567 }
00568 
00569 void MdsWrapper::mdsPutCal(Tree *tree, const char* field_fmt)
00570 {
00571         int nchan = dataModel.getAcqType().getNumChannels();
00572 
00573         for (int ch = 1; ch <= nchan; ++ch){
00574                 char node_name[128];
00575                 char gain_name[128];
00576                 char offset_name[128];
00577 
00578                 snprintf(node_name, 128, field_fmt, ch);
00579                 snprintf(gain_name, 128, "%s:GAIN_V", node_name);
00580                 snprintf(offset_name, 128, "%s:OFFSET_V", node_name);
00581 
00582                 double gain_v, offset_v;
00583 
00584                 dataModel.getAcqCal()->getCal(ch, gain_v, offset_v);
00585                 Float64* gv = new Float64(gain_v);
00586                 Float64* ov = new Float64(offset_v);
00587 
00588                 tree->getNode(gain_name)->putData(gv);
00589                 tree->getNode(offset_name)->putData(ov);
00590 
00591                 deleteData(gv);
00592                 deleteData(ov);
00593         }
00594 }
00595 void MdsWrapper::mdsPut(const char* _tree, const char* field_fmt)
00596 {
00597         try {
00598                 // Tree signature should be const char*
00599                 char *tree_tmp = new char[strlen(_tree)+1];
00600                 strcpy(tree_tmp, _tree);
00601                 Tree *tree = new Tree(tree_tmp, 0, "EDIT");
00602                 mdsPutCal(tree, field_fmt);
00603                 mdsPutSegments(tree, field_fmt);
00604                 tree->write();
00605                 delete tree;
00606         } catch(...) {
00607                 cerr << "ERROR failed to open Tree \"" << _tree << "\"" << endl;
00608                 exit(1);
00609         }
00610 
00611 }
00612 
00613 
00614 
00615 
00616 void MdsProcessController::set_alarm(bool enable)
00617 /* some lib code has nobbled normal alarm handling. So we have to get creative*/
00618 {
00619         static FILE *wdt;
00620 
00621         if (wdt == 0){
00622                 if (getenv("MDS_TIMEOUT")){
00623                         int msecs = atoi(getenv("MDS_TIMEOUT"));
00624 
00625                         char cmd[80];
00626                         sprintf(cmd, "wdt %d %d", msecs, getpid());
00627                         wdt = popen(cmd, "w");
00628 
00629                         if (wdt == 0){
00630                                 perror("FAILED to spawn wdt");
00631                                 exit(errno);
00632                         }
00633                 }else{
00634                         return;
00635                 }
00636 
00637         }
00638 
00639         fprintf(wdt, "%d\n", enable);
00640 }
00641 
00642 void MdsProcessController::createState(const char* tree, const char *field)
00643 {
00644         char shm_name[256];
00645         sprintf(shm_name, "/dev/shm/demux_mds-%s.%s", tree, field);
00646 
00647         fd_state = open(shm_name, O_RDWR|O_CREAT|O_TRUNC, S_IRWXU);
00648         if (fd_state == -1){
00649                 err("failed to create shm \"%s\"", shm_name);
00650                 state = new ProcessState;
00651         }else{
00652                 ProcessState p;
00653                 write(fd_state, &p, sizeof(p));
00654 
00655                 state = (ProcessState*)mmap(0, sizeof(p), PROT_READ|PROT_WRITE, MAP_SHARED,
00656                                 fd_state, 0);
00657 
00658                 if (state == MAP_FAILED){
00659                         perror("mmap failed");
00660                         exit(errno);
00661                 }
00662         }
00663 }
00664 void MdsProcessController::run(poptContext& opt_context)
00665 {
00666         const char* tree = poptGetArg(opt_context);
00667         const char* field = poptGetArg(opt_context);
00668 
00669         if (tree == 0 || field == 0){
00670                 cerr << "ERROR: usage acq_demux-mds [opts] TREE FIELD [files]" <<endl;
00671                 exit(1);
00672         }
00673         cout << "acq_demux-mds TREE:" << tree << " FIELD:" << field << endl;
00674 
00675         addCal(tree);
00676         shot_time = new Timer;
00677         last_time = shot_time->timeFromStart();
00678 
00679         createState(tree, field);
00680         if (Args::config_fp != 0){
00681                 char *fname = new char [4096];
00682                 int rawblock = 0;
00683 
00684                 for ( ; getDef(fname, 4096); ){
00685                         if (fork() == 0){
00686                                 if (Args::log_fp != 0){
00687                                         dup2(fileno(Args::log_fp), 1);
00688                                         dup2(fileno(Args::log_fp), 2);
00689                                 }
00690                                 processRawFiles(fname);
00691                                 mdsWrapper->mdsPut(tree, field);
00692                                 if (remove_when_done){
00693                                         unlink(fname);
00694                                 }
00695                                 exit(0);
00696                         }else{
00697                                 int status;
00698                                 Timer timer;
00699 
00700                                 wait(&status);
00701 
00702                                 bool child_exit_error = true;
00703 
00704                                 if (WIFEXITED(status)){
00705                                         char *last_file = rindex(fname, ',');
00706                                         if (last_file){
00707                                                 last_file += 1;
00708                                         }else{
00709                                                 last_file = "";
00710                                         }
00711                                         int child_exit_code = WEXITSTATUS(status);
00712                                         printf("child exit %d time: %.2f s file %s\n",
00713                                             child_exit_code, timer.timeFromStart(), last_file);
00714                                         onSegmentComplete(rawblock++);
00715                                         if (child_exit_code == 0){
00716                                                 child_exit_error = false;
00717                                         }
00718 
00719                                 }else{
00720                                         err("CHILD PROCESS FAILED %d", status);
00721                                 }
00722 
00723                                 if (child_exit_error){
00724                                         if (++consecutive_child_errors > CHILD_ERROR_MAX){
00725                                                 err("too many child errors, dropping out");
00726                                                 exit(1);
00727                                         }else{
00728                                                 err("child exit error %d", consecutive_child_errors);
00729                                         }
00730                                 }else{
00731                                                 consecutive_child_errors = 0;
00732                                 }
00733                         }
00734                 }
00735         }else{
00736                 const char* fname;
00737 
00738                 while((fname = poptGetArg(opt_context)) != 0){
00739                         processRaw(fname);
00740                 }
00741                 mdsWrapper->mdsPut(tree, field);
00742         }
00743 
00744         dbg(1, "99");
00745 }
00746 
00747 
00748 class MdsContinuousProcessController : public MdsProcessController {
00749 
00750 protected:
00751         virtual int dump() {
00752                 return 0;
00753         }
00754 
00755         const int files_per_segment;
00756         const int burn_after_reading;
00757         int ibuf;
00758         virtual bool getDef(char *fname, int maxdef);
00759 
00760 public:
00761         MdsContinuousProcessController(AcqDataModel& _dataModel);
00762         virtual ~MdsContinuousProcessController();
00763 
00764         virtual void run(poptContext& opt_context);
00765 
00766         virtual void processRawFiles(const char* tbdef);
00767 };
00768 
00769 
00770 MdsContinuousProcessController::MdsContinuousProcessController(AcqDataModel& _dataModel) :
00771                 MdsProcessController(_dataModel, new ContinuousMdsWrapper(_dataModel)),
00772                 files_per_segment(getenvInt("MDS_FILES_PER_SEGMENT", 6)),
00773                 burn_after_reading(getenvInt("MDS_BURN_AFTER_READING", 0)),
00774                 ibuf(0)
00775 {
00776         remove_when_done = false;
00777         src_fmt = "%s";
00778 }
00779 
00780 MdsContinuousProcessController::~MdsContinuousProcessController()
00781 {
00782 
00783 }
00784 
00785 #define FLEN                    0x100000        /* files (membufs) are this size */
00786 #define PAGE_SIZE       4096
00787 
00788 void MdsContinuousProcessController::processRawFiles(const char* tbdef)
00789 {
00790         if (strstr(tbdef, ",") == 0){
00791                 processRaw(tbdef);
00792         }else{
00793                 void *pbuf;
00794                 char* pdata;
00795 
00796                 int rc;
00797                 int totlen = files_per_segment*FLEN;
00798 
00799                 rc = posix_memalign(&pbuf, PAGE_SIZE, totlen);
00800                 if (rc == 0){
00801                         pdata = (char*)pbuf;
00802                         dbg(2, "buf:%p", pdata);
00803                 }else{
00804                         perror("posix_memalign failed");
00805                         exit(rc);
00806                 }
00807 
00808                 dbg(1, "files:\"%s\"", tbdef);
00809 
00810                 char* nbuf = new char[strlen(tbdef)];
00811                 char *str;
00812                 char *tp;
00813                 char *tbid;
00814                 int ii = 0;
00815                 int* fd = new int[files_per_segment];
00816                 char** map = new char*[files_per_segment];
00817                 char** names = new char*[files_per_segment];
00818 
00819                 strcpy(nbuf, tbdef);
00820 
00821                 for(str = nbuf; (tbid = strtok_r(str, ",", &tp)); str = NULL, ++ii){
00822                         assert(ii < files_per_segment);
00823 
00824                         names[ii] = tbid;
00825                         dbg(2, "tbid:\"%s\"", tbid);
00826 
00827                         fd[ii] = open(names[ii], O_RDONLY);
00828                         if (fd[ii] < 0){
00829                                 perror(names[ii]);
00830                                 exit(errno);
00831                         }
00832                         map[ii] = (char*)mmap(pdata+ii*FLEN, FLEN,
00833                                         PROT_READ, MAP_PRIVATE|MAP_FIXED,
00834                                         fd[ii], 0);
00835                 }
00836 
00837                 dataModel.setDataSourceName(names[0]);
00838                 processAction(pdata, totlen, 0);
00839 
00840                 for ( ; --ii >= 0; ){
00841                         munmap(map[ii], FLEN);
00842                         close(fd[ii]);
00843                         if (burn_after_reading){
00844                                 dbg(2, "unlink(%s)", names[ii]);
00845                                 unlink(names[ii]);
00846                                 char id[256];
00847                                 sprintf(id, "%s.id", names[ii]);
00848                                 unlink(id);
00849                         }
00850                 }
00851 
00852                 delete [] names;
00853                 delete [] fd;
00854                 delete [] map;
00855                 delete [] nbuf;
00856         }
00857 }
00858 bool MdsContinuousProcessController::getDef(char *fname, int maxdef) {
00859         /* collect "files_per_segment" names before returning
00860          * no validation is attempted ...
00861          */
00862         if (!first_time){
00863                 set_alarm(1);
00864         }
00865         first_time = false;
00866         fname[0] = '\0';
00867         while (true){
00868                 int cursor = strlen(fname);
00869                 if (fgets(fname+cursor, maxdef-cursor, Args::config_fp) == 0){
00870                         return false;
00871                 }
00872 
00873                 set_alarm(0);
00874                 if (fname[cursor] == '#'){
00875                         fname[cursor] = '\0';
00876                         continue;
00877                 }else{
00878                         chomp(fname);
00879 
00880                         if (!isReadableFile(fname+cursor)){
00881                                 err("input it not a file");
00882                                 exit(1);
00883                         }
00884                         if (ibuf == 0){
00885                                 FILE *fp = fopen(fname, "r");
00886                                 char stuff[4096];
00887                                 int nb = fread(stuff, 1, 4096, fp);
00888                                 if (nb <= 0){
00889                                         err("bad return from fread()");
00890                                 }
00891                                 stuff[nb] = '\0';
00892                                 char* key = strstr(stuff, "NSAMPLES=");
00893                                 long long nsamples1;
00894                                 if (key != 0 && sscanf(key, "NSAMPLES=%lld", &nsamples1) == 1){
00895                                         dbg(1, "nsamples:%lld", nsamples1);
00896                                         mdsWrapper->setN1(nsamples1);
00897                                 }else{
00898                                         err("failed to get nsamples");
00899                                 }
00900                                 fclose(fp);
00901                         }
00902                         char *id;
00903                         if ((id = strstr(fname, ".id")) != 0){
00904                                 *id = '\0';
00905                         }
00906                         if (++ibuf == files_per_segment){
00907                                 ibuf = 0;
00908                                 dbg(2, "%s", fname);
00909                                 return true;
00910                         }else{
00911                                 strcat(fname, ",");
00912                         }
00913                 }
00914         }
00915 }
00916 void MdsContinuousProcessController::run(poptContext& opt_context)
00917 {
00918         MdsProcessController::run(opt_context);
00919 }
00920 class MdsProcessControllerCreator : public ProcessControllerCreator {
00921 public:
00922         MdsProcessControllerCreator() {
00923         }
00924         virtual ProcessController* create(AcqDataModel& _dataModel) const {
00925                 return new MdsProcessController( _dataModel);
00926         }
00927 };
00928 
00929 class MdsContinuousControllerCreator : public ProcessControllerCreator {
00930 public:
00931         MdsContinuousControllerCreator() {
00932         }
00933 
00934         virtual ProcessController* create(AcqDataModel& _dataModel) const {
00935                 return new MdsContinuousProcessController( _dataModel);
00936         }
00937 };
00938 
00939 class MdsInitializesDefaults {
00940 public:
00941         MdsInitializesDefaults() {
00942                 MdsProcessControllerCreator* creator = new MdsProcessControllerCreator();
00943                 cerr << "MdsInitializesDefaults() B1050" << endl;
00944                 ProcessControllerRegistry::instance().registerController(
00945                                 "acq_demux-mds", creator);
00946 
00947                 MdsContinuousControllerCreator* c2 = new MdsContinuousControllerCreator();
00948                 ProcessControllerRegistry::instance().registerController(
00949                                                 "acq_demux-mds-continuous", c2);
00950 
00951         }
00952 };
00953 
00954 
00955 
00956 
00957 MdsInitializesDefaults ID1;	/* file-scope instance: constructing it registers the MDS controllers */