ACQ2XX_API
Dt100Transport.cpp
Go to the documentation of this file.
00001 /* ------------------------------------------------------------------------- */
00002 /* file Dt100Transport.cpp                                                   */
00003 /* ------------------------------------------------------------------------- */
00004 /*   Copyright (C) 2008 Peter Milne, D-TACQ Solutions Ltd
00005  *                      <Peter dot Milne at D hyphen TACQ dot com>
00006 
00007     This program is free software; you can redistribute it and/or modify
00008     it under the terms of Version 2 of the GNU General Public License
00009     as published by the Free Software Foundation;
00010 
00011     This program is distributed in the hope that it will be useful,
00012     but WITHOUT ANY WARRANTY; without even the implied warranty of
00013     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00014     GNU General Public License for more details.
00015 
00016     You should have received a copy of the GNU General Public License
00017     along with this program; if not, write to the Free Software
00018     Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.                */
00019 /* ------------------------------------------------------------------------- */
00020 
00021 /** @file Dt100Transport.cpp defines the dt100 Transport.
00022  *  ACQ2xx cards offer the "dt100 service" at port 0xd100
00023  *  This Transport controls the card using the dt100 service
00024  */
00025 #include "local.h"
00026 #include <stdio.h>
00027 #include <stdlib.h>
00028 #include <string.h>
00029 #include "acq_transport.h"
00030 #include "Dt100Transport.h"
00031 #include "Frame.h"
00032 
00033 #include <sys/socket.h>
00034 #include <sys/un.h>
00035 #include <unistd.h>
00036 #include <unistd.h>
00037 #include <stdarg.h>
00038 #include <string.h>
00039 #include <errno.h>
00040 #include <netdb.h>
00041 #include <fcntl.h>
00042 #include <sys/time.h>
00043 #include <sys/socket.h>
00044 #include <netinet/in.h>
00045 #include <arpa/inet.h>
00046 
00047 #include <sys/uio.h>
00048 
00049 #include "iclient3.h"
00050 
/* Reply timeouts in seconds; signon() stores one of these in timeout.tv_sec
 * according to the connection mode.
 */
#define MASTER_TO (30)          /* "master" and "shell" connections */
#define SLAVE_TO  (10)          /* "data*" and "stream" connections */
#define PUT_TO    (10)          /* every other mode (get, put) */

/** Diagnostic verbosity; set from DT100_CONNECT_VERBOSE by the constructor. */
int verbose = 0;

/** Bytes per sample used by readChannel(); DT100_SAMPLE_SIZE overrides it. */
static int sample_size = (int)sizeof(short);
00058 
00059 class Dt100Transport: public Transport {
00060         struct State {
00061                 SOCKET cmd_sock;
00062                 SOCKET stat_sock;
00063                 SOCKET stream_sock;
00064 
00065                 const char* remotehost;
00066                 const char* remoteport;
00067                 char* my_id;
00068                 char *stream_buf;
00069                 int stream_buf_len;
00070         } s;
00071 
00072         void init(void) {
00073                 memset(&s, 0, sizeof(s));
00074         }
00075 
00076 /** Transport implementation connects to dt100d service on card. */
00077 public:
00078         Dt100Transport(const char* id);
00079         virtual ~Dt100Transport();
00080 
00081         virtual STATUS acqcmd(
00082                 const char* command, char *response, int maxresponse);
00083         /**< send an "acqcmd" (acquisition command or query) to the card.
00084          * @param command - the command (or query) to send
00085          * @param response - user buffer to collect response.
00086          * @param maxresponse - maximum response size required.
00087          */
00088         
00089         virtual STATUS acq2sh(
00090                 const char* command, char *response, int maxresponse);
00091         /**< run a remote shell command or query. 
00092          * @param command - the command (or query) to send
00093          * @param response - user buffer to collect response.
00094          * @param maxresponse - maximum response size required.
00095          */
00096         
00097         virtual STATUS waitStateChange(
00098                 int timeout, char* response, int maxresponse);
00099         /**< block until remote state changes or timeout. */
00100 
00101         virtual STATUS readChannel(
00102                 int channel, short* data,
00103                 int nsamples, int start = 0, int stride = 1);
00104         /**< read and output raw data for channel
00105          * @param channel - channel number 1..N
00106          * @param data - caller's buffer
00107          * @param nsamples - max samples to read
00108          * @param start - start sample in data set
00109          * @param stride - stride [subsample] value
00110          * @returns actual samples returned or STATUS_ERR
00111          */     
00112 
00113         /** streaming interface: not all transports can do this. */
00114         virtual STATUS startStreaming(void);
00115         virtual STATUS readStreamingFrame(Frame* frame, unsigned id);
00116         virtual STATUS stopStreaming(void);
00117 };
00118 
/* Trace output to stderr, gated on the global verbose level.
 * Each macro is wrapped in do { } while (0) so that it expands to exactly one
 * statement and cannot capture an "else" that follows the call site.
 */
#define VFPRINTF(fmt...) do { if (verbose) fprintf(stderr, ## fmt); } while (0)
#define VFPRINTF2(fmt...) do { if (verbose > 1) fprintf(stderr, ## fmt); } while (0)

/** Size requested for the socket send/receive buffers and for each S_buf. */
static unsigned S_BUFLEN = (4096*16);
/** Two S_BUFLEN-byte buffers allocated by the Dt100Transport constructor. */
static char *S_buf[2];



/** The most recent sign-on line built by signon(). */
static char signon_command[80];
/** Command prefix chosen by the most recent signon(). */
static const char *prefix;
/** tv_sec is set by signon() according to the connection mode. */
static struct timeval timeout;
struct timeval zero_time = {};
00130 
00131 
00132 static void signon(
00133         int sock, const char *remotedev, const char* mode)
00134 {
00135         char buf[80];
00136         int wait_prompt = 1;
00137 #define SIGNON_FMT "dt100 open %s %s\n" 
00138 
00139         readline(sock, buf, sizeof(buf));
00140         dbg(2,"signon:%s\n", buf);
00141 
00142         if (strcmp(mode, "master") == 0){
00143                 sprintf(signon_command, SIGNON_FMT, mode, remotedev);
00144                 prefix = "acqcmd ";
00145                 timeout.tv_sec = MASTER_TO;
00146         }else if (strncmp(mode, "data", 4) == 0){
00147                 sprintf(signon_command, SIGNON_FMT, mode, remotedev);
00148                 prefix = "dt100 ";
00149                 timeout.tv_sec = SLAVE_TO;
00150         }else if (strcmp(mode, "stream") == 0){
00151                 sprintf(signon_command, SIGNON_FMT, "data", remotedev);
00152                 prefix = "dt100 ";
00153                 timeout.tv_sec = SLAVE_TO;              
00154         }else if (strcmp(mode, "shell") == 0){
00155                 sprintf(signon_command, SIGNON_FMT, "shell", remotedev);
00156                 prefix = "";
00157                 timeout.tv_sec = MASTER_TO;
00158         }else{ 
00159                 /* mode: get, put */
00160                 sprintf(signon_command, "%s %s\n", mode, remotedev);
00161                 prefix = "";
00162                 timeout.tv_sec = PUT_TO;
00163                 wait_prompt = 0;
00164         }
00165         dbg(2, "signon:command:%s", signon_command);
00166         write(sock, signon_command, strlen(signon_command));
00167         if (wait_prompt){
00168                 readline(sock, buf, sizeof(buf));
00169                 dbg(2, "signon:response from %d:%s\n", sock, buf);      
00170         }
00171 }
00172 
00173 static SOCKET get_sock(void)
00174 {
00175         SOCKET sock;
00176 
00177         if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0){
00178                 die(errno, "socket");
00179         }
00180 
00181         if (setsockopt(sock, SOL_SOCKET, SO_RCVBUF,
00182                        (char*)&S_BUFLEN, sizeof(S_BUFLEN)) ){
00183                 die(errno, "setsockopt() SO_RCVBUF failed\n");
00184         }
00185         if (setsockopt(sock, SOL_SOCKET, SO_SNDBUF,
00186                        (char*)&S_BUFLEN, sizeof(S_BUFLEN)) ){
00187                 die(errno, "setsockopt() SO_SNDBUF failed\n");
00188         }
00189         return sock;
00190 }
00191 
00192 
00193 static void connect_to(
00194         SOCKET sock, const char *remotehost, const char *remoteport)
00195 {
00196         struct sockaddr_in peer;
00197 
00198         set_address(remotehost, remoteport, &peer, "tcp" );
00199 
00200         if (connect(sock, (struct sockaddr *)&peer, sizeof(peer))){
00201                 die(errno, "connect failed SOCK" );
00202         }
00203 }
00204 
00205 
00206 
00207 Dt100Transport::Dt100Transport(const char* id): 
00208         Transport(id)
00209 {
00210         init();
00211         if (getenv("DT100_CONNECT_VERBOSE")){
00212                 verbose = atoi(getenv("DT100_CONNECT_VERBOSE"));
00213                 info("Dt100Transport host %s", id);
00214         }
00215         S_buf[0] = (char*)malloc(S_BUFLEN);
00216         S_buf[1] = (char*)malloc(S_BUFLEN);     
00217 
00218         s.my_id = new char[strlen(id)+1];
00219         strcpy(s.my_id, id);
00220         if (index(s.my_id, ':')){
00221                 char *sep = index(s.my_id, ':');
00222                 *sep++ = '\0';
00223                 s.remotehost = s.my_id;
00224                 s.remoteport = sep;
00225         }else{
00226                 s.remotehost = s.my_id;
00227                 s.remoteport = "0xd100";
00228         }
00229 }
00230 
00231 Dt100Transport::~Dt100Transport() 
00232 {
00233         if (s.cmd_sock) close(s.cmd_sock);
00234         delete [] s.my_id;
00235 }
00236 #define ACQCMD "acqcmd "
00237 
00238 STATUS Dt100Transport::acqcmd(
00239         const char* command, char *response, int maxresponse)
00240 /**< send an "acqcmd" (acquisition command or query) to the card.
00241  * @param command - the command (or query) to send
00242  * @param response - user buffer to collect response.
00243  * @param maxresponse - maximum response size required.
00244  */
00245 {
00246         char* my_command = new char[strlen(command) + strlen(ACQCMD) + 1];
00247         strcpy(my_command, ACQCMD);
00248         strcat(my_command, command);
00249         STATUS rc = acq2sh(my_command, response, maxresponse);
00250         delete [] my_command;
00251         return rc;
00252 }
00253 
00254 
00255 #define DUMMYCMD "hostname\n"
00256 
00257 STATUS Dt100Transport::acq2sh(
00258         const char* command, char *response, int maxresponse)
00259 /**< run a remote shell command or query. 
00260  * @param command - the command (or query) to send
00261  * @param response - user buffer to collect response.
00262  * @param maxresponse - maximum response size required.
00263  */
00264 {
00265         if (!s.cmd_sock){
00266                 s.cmd_sock = get_sock();
00267                 connect_to(s.cmd_sock, s.remotehost, s.remoteport);
00268                 signon(s.cmd_sock, "1", "shell");
00269         }
00270 
00271         int buflen = MAX(maxresponse+20, 1024);
00272         char* rxbuf = new char[buflen];
00273         char *pend;
00274         int ibuf = 0;
00275         int rc = -1;
00276 
00277 
00278 
00279         dbg(1, "->%s", command);
00280         rc = write(s.cmd_sock, command, strlen(command));
00281         if (command[strlen(command)-1] != '\n'){
00282                 write(s.cmd_sock, "\n", 1);
00283         }
00284 
00285         if (rc <= 0){
00286                 die(errno, "write() failed");
00287         }
00288 
00289         while(ibuf < buflen-1 && 
00290                 (rc = read(s.cmd_sock, rxbuf+ibuf, buflen-ibuf)) > 0){
00291                 ibuf += rc;
00292                 rxbuf[ibuf] = '\0';
00293 
00294                 dbg(1, "<-%s", rxbuf);
00295 
00296                 if ((pend = strstr(rxbuf, "EOF")) != 0){
00297                         rc = pend - rxbuf;
00298 
00299                         int cnum, ecode = 0;
00300                         if (sscanf(pend, "EOF %d %d", &cnum, &ecode) == 2){
00301                                 if (ecode > 0){
00302                                         rc = -ecode;
00303                                 }
00304                         }else{
00305                                 err("non conformant EOF \"%s\"", pend);
00306                         }
00307                         *pend = '\0';
00308                         strcpy(response, rxbuf);
00309                         break;
00310                 }
00311         }
00312 
00313 
00314         delete []rxbuf;
00315 
00316         return rc;
00317 }
00318 
00319 #define STATCMD "cat /dev/acq200/tblocks/acqstate\n"
00320 
00321 STATUS Dt100Transport::waitStateChange(
00322         int timeout, char* response, int maxresponse)
00323 /**< block until remote state changes or timeout. 
00324  *  @@todo timeout not implemented
00325  */
00326 {
00327         if (!s.stat_sock){
00328                 s.stat_sock = get_sock();
00329                 connect_to(s.stat_sock, s.remotehost, s.remoteport);
00330                 signon(s.stat_sock, "1", "shell");
00331                 write(s.stat_sock, STATCMD, strlen(STATCMD));
00332         }
00333         int rc = readline(s.stat_sock, response, maxresponse);
00334         return rc;
00335 }
00336 
00337 #define MAXREAD 0x10000
00338 
00339 int readb(int sock, char* buf, int nbytes)
00340 {
00341         int total;
00342         int nread;
00343 
00344         for (total = 0; total < nbytes; total += nread){
00345                 dbg(2, "read(%d)", nbytes);
00346                 nread = read(sock, buf+total, nbytes-total);
00347                 dbg(2, "read(%d) returned %d", nbytes-total, nread);
00348                 if (nread < 0){
00349                         return nread;
00350                 } 
00351         }
00352 
00353         return total;
00354 }
00355 STATUS Dt100Transport::readChannel(
00356         int channel, short* data,
00357         int nsamples, int start, int stride)
00358 /**< read and output raw data for channel
00359  * @param channel - channel number 1..N
00360  * @param data - caller's buffer
00361  * @param nsamples - max samples to read
00362  * @param start - start sample in data set
00363  * @param stride - stride [subsample] value
00364  * @returns actual samples returned or STATUS_ERR
00365  */
00366 {
00367         char channel_dev[80];
00368         int remain;
00369         int nsam;
00370         int nbytes;
00371         int rc;
00372 
00373         sprintf(channel_dev, "/dev/acq32/acq32.1.%02d", channel);
00374         SOCKET dsock = get_sock();
00375         connect_to(dsock, s.remotehost, s.remoteport);
00376         signon(dsock, channel_dev, "data1");
00377 
00378         dbg(1, "channel: %d dev: %s signon OK", channel, channel_dev);
00379 
00380         for (remain = nsamples; remain > 0 ; remain -= nsam, start += nsam ){
00381                 nsam = MIN(remain, MAXREAD);
00382                 char command[80];
00383                 char reply[80];
00384 
00385                 sprintf(command, "dt100 read %d %d %d\n",
00386                         start, start+nsam, stride);
00387 
00388                 dbg(1, "asking:%s", command);
00389 
00390                 write(dsock, command, strlen(command));
00391                 rc = readline(dsock, reply, sizeof(reply));
00392 
00393                 dbg(1, "dt100:\"%s\"", reply);
00394                 if (rc < 0 ){
00395                         return rc;
00396                 }else if (sscanf(reply, "DT100:%d bytes", &nbytes) == 0){
00397                         err("reply doesn't scan \"%s\"", reply);
00398                         return -10;
00399                 }else{
00400                         dbg(1, "DT100 says read %d bytes", nbytes);
00401                         if (nbytes == 0){
00402                                 break;
00403                         }else{
00404 /* @todo - size hardcoded as 2 bytes - should be wordsize .. short* data? */
00405                                 nsam = readb(dsock, (char*)data, nbytes)/
00406                                                                 sample_size;
00407                         }               
00408                 }
00409         }
00410         dbg(1, "return %d", nsamples-remain);
00411 
00412         return nsamples-remain;
00413 }
00414 
00415 
00416 /** streaming interface: not all transports can do this. */
00417 STATUS Dt100Transport::startStreaming(void)
00418 {
00419         return STATUS_WORKTODO;
00420 }
00421 
00422 STATUS Dt100Transport::readStreamingFrame(Frame* frame, unsigned id)
00423 {
00424         if (!s.stream_sock){
00425                 if (s.stream_buf_len < frame->frameSize()){
00426                         if (s.stream_buf){
00427                                 delete [] s.stream_buf;
00428                         }
00429                         s.stream_buf = new char[frame->frameSize()];
00430                         s.stream_buf_len = frame->frameSize();
00431                 }
00432 
00433                 s.stream_sock = get_sock();
00434                 connect_to(s.stream_sock, s.remotehost, s.remoteport);
00435                 signon(s.stream_sock, "/dev/acq32/acq32.1.01", "stream");
00436 
00437                 char command[80];
00438                 sprintf(command, "%s stream 1 0 %d\n", 
00439                                 prefix,
00440                                 frame->sampleSize()/sizeof(unsigned));
00441                 write(s.stream_sock, command, strlen(command));
00442         }
00443 
00444 /* it's possible that we are readin misaligned from frame start.
00445  * we can put in code to adjust, but first up, we just try read a whole
00446  * frame.
00447  */
00448         int nread = 0;
00449         char *bp = s.stream_buf;
00450         while (nread < frame->frameSize()){
00451                 int rc = read(s.stream_sock, bp, 
00452                                 frame->frameSize() - nread);
00453                 if (rc < 0){
00454                         die(errno, "read() failed");
00455                 }
00456                 nread += rc;
00457                 bp += rc;
00458         }
00459 
00460         return Frame::buildFrame(id, frame, s.stream_buf, nread);
00461 }
00462 
00463 STATUS Dt100Transport::stopStreaming(void)
00464 {
00465         int rc = close(s.stream_sock);
00466         s.stream_sock = 0;
00467         
00468         return rc;
00469 }
00470 
00471 
00472 Transport* Dt100TransportFactory::createTransport(const char* id)
00473 {
00474         if (getenv("DT100_SAMPLE_SIZE")){
00475                 sample_size = atoi(getenv("DT100_SAMPLE_SIZE"));
00476         }
00477         return new Dt100Transport(id);
00478 }