Changeset 140641


Ignore:
Timestamp:
Sep 28, 2015, 9:22:19 PM (5 years ago)
Author:
cal@…
Message:

base: tracelib: Refactor, enable process tracking

Refactor both the synchronization and error handling in tracelib to
clean up some of the copy-n-paste cruft that has accumulated over time.
Additionally, add the code that fills and depletes the list of connected
sockets and their processes at the remote ends to implement cleaning
them up after the build finishes.

This simplifies how the synchronization between "tracelib run" and
"tracelib closesocket" works. "tracelib run" now uses a simple event
loop idiom that receives and process asynchronous events and terminates
at the request of "tracelib closesocket" using a well-defined channel (a
selfpipe, since it's the only portable method to do this without kqueue
user events, which are not available on older OS X versions).

Location:
trunk/base/src
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • trunk/base/src/pextlib1.0/tracelib.c

    r140639 r140641  
    113113static int enable_fence = 0;
    114114static Tcl_Interp *interp;
    115 static pthread_mutex_t sock_mutex = PTHREAD_MUTEX_INITIALIZER;
    116 static int cleanuping = 0;
     115
     116/**
     117 * Mutex that shall be acquired to exclusively lock checking and acting upon
     118 * the value of kq, indicating whether the event loop has started. If it has
     119 * started, shutdown of the event loop shall occur by writing to the write end
     120 * of the selfpipe (which is non-blocking), which will in turn trigger the
     121 * event loop termination and a signal on the evloop_signal condition variable
     122 * when the loop has been terminated and it is safe to free the resources that
     123 * were used by the loop.
     124 *
     125 * If kq is -1, the event loop has not been started and resources can
     126 * immediately be free(3)d (under the lock to avoid concurrent set up of the
     127 * event loop in a different thread).
     128 */
     129static pthread_mutex_t evloop_mutex = PTHREAD_MUTEX_INITIALIZER;
     130
     131/**
     132 * Condition variable that shall be used to signal the end of the event loop
     133 * after a termination signal has been sent to it via the write end of
     134 * selfpipe. The associated mutex is evloop_mtx.
     135 */
     136static pthread_cond_t evloop_signal = PTHREAD_COND_INITIALIZER;
    117137
    118138static void send_file_map(int sock);
     
    276296static void answer(int sock, const char *buf) {
    277297    answer_s(sock, buf, (uint32_t) strlen(buf));
     298}
     299
     300/**
     301 * Closes the two sockets given in \a p and sets their values to -1.
     302 */
     303static void pipe_cleanup(int p[2]) {
     304    for (size_t i = 0; i < 2; ++i) {
     305        if (p[i] != -1) {
     306            close(p[i]);
     307            p[i] = -1;
     308        }
     309    }
     310}
     311
     312/**
     313 * Helper function to simplify error handling. Converts the error indicated by
     314 * \a msg, appended with a string representation of the UNIX error \a errno
     315 * into a Tcl error by setting up the result of the Tcl interpreter \a interp
     316 * accordingly.
     317 *
     318 * Returns TCL_ERROR to be used as the return value of the caller.
     319 */
     320static int error2tcl(const char *msg, int err, Tcl_Interp *interp) {
     321    Tcl_SetErrno(err);
     322    Tcl_ResetResult(interp);
     323    if (err != 0) {
     324        Tcl_AppendResult(interp, msg, (char *) Tcl_PosixError(interp), NULL);
     325    } else {
     326        Tcl_AppendResult(interp, msg, NULL);
     327    }
     328
     329    return TCL_ERROR;
    278330}
    279331
     
    600652    struct rlimit rl;
    601653
    602     cleanuping = 0;
    603 
    604     pthread_mutex_lock(&sock_mutex);
    605654    if (-1 == (sock = socket(PF_LOCAL, SOCK_STREAM, 0))) {
    606         Tcl_SetErrno(errno);
    607         Tcl_ResetResult(interp);
    608         Tcl_AppendResult(interp, "socket: ", (char *) Tcl_PosixError(interp), NULL);
    609         pthread_mutex_unlock(&sock_mutex);
    610         return TCL_ERROR;
    611     }
    612     pthread_mutex_unlock(&sock_mutex);
    613 
    614     interp = in;
     655        return error2tcl("socket: ", errno, in);
     656    }
    615657
    616658    /* raise the limit of open files to the maximum from the default soft limit
     
    634676
    635677    if (-1 == (bind(sock, (struct sockaddr *) &sun, sizeof(sun)))) {
    636         Tcl_SetErrno(errno);
    637         Tcl_ResetResult(interp);
    638         Tcl_AppendResult(interp, "bind: ", (char *) Tcl_PosixError(interp), NULL);
     678        int err = errno;
    639679        close(sock);
    640680        sock = -1;
    641         return TCL_ERROR;
     681        return error2tcl("bind: ", err, in);
    642682    }
    643683
    644684    if (-1 == listen(sock, SOMAXCONN)) {
    645         Tcl_SetErrno(errno);
    646         Tcl_ResetResult(interp);
    647         Tcl_AppendResult(interp, "listen: ", (char *) Tcl_PosixError(interp), NULL);
     685        int err = errno;
    648686        close(sock);
    649687        sock = -1;
    650         return TCL_ERROR;
    651     }
     688        return error2tcl("bind: ", err, in);
     689    }
     690
     691    // keep a reference to the interpreter that opened the socket
     692    interp = in;
    652693
    653694    return TCL_OK;
     
    685726static int TracelibRunCmd(Tcl_Interp *in) {
    686727    struct kevent kev;
     728    int retval = TCL_ERROR;
    687729    int flags;
    688     int oldsock;
    689730    int opensockcount = 0;
    690 
    691     pthread_mutex_lock(&sock_mutex);
     731    bool break_eventloop = false;
     732
     733    pthread_mutex_lock(&evloop_mutex);
     734    /* bring all variables into a defined state so the cleanup code can be
     735     * called from anywhere */
     736    selfpipe[0] = -1;
     737    selfpipe[1] = -1;
     738    kq = -1;
     739
    692740    if (-1 == (kq = kqueue())) {
    693         Tcl_SetErrno(errno);
    694         Tcl_ResetResult(in);
    695         Tcl_AppendResult(in, "kqueue: ", (char *) Tcl_PosixError(in), NULL);
    696         return TCL_ERROR;
     741        error2tcl("kqueue: ", errno, in);
     742        goto error_locked;
    697743    }
    698744
    699745    if (sock != -1) {
    700         oldsock = sock;
    701 
    702746        /* mark listen socket non-blocking in order to prevent a race condition
    703747         * that would occur between kevent(2) and accept(2), if a incoming
    704748         * connection is aborted before it is accepted. Using a non-blocking
    705749         * accept(2) prevents the problem.*/
    706         flags = fcntl(oldsock, F_GETFL, 0);
    707         if (-1 == fcntl(oldsock, F_SETFL, flags | O_NONBLOCK)) {
    708             Tcl_SetErrno(errno);
    709             Tcl_ResetResult(in);
    710             Tcl_AppendResult(in, "fcntl(F_SETFL, += O_NONBLOCK): ", (char *) Tcl_PosixError(in), NULL);
    711             pthread_mutex_unlock(&sock_mutex);
    712             return TCL_ERROR;
     750        flags = fcntl(sock, F_GETFL, 0);
     751        if (-1 == fcntl(sock, F_SETFL, flags | O_NONBLOCK)) {
     752            error2tcl("fcntl(F_SETFL, += O_NONBLOCK): ", errno, in);
     753            goto error_locked;
    713754        }
    714755
    715756        /* register the listen socket in the kqueue */
    716         EV_SET(&kev, oldsock, EVFILT_READ, EV_ADD | EV_RECEIPT, 0, 0, NULL);
     757        EV_SET(&kev, sock, EVFILT_READ, EV_ADD | EV_RECEIPT, 0, 0, NULL);
    717758        if (1 != kevent(kq, &kev, 1, &kev, 1, NULL)) {
    718             Tcl_SetErrno(errno);
    719             Tcl_ResetResult(in);
    720             Tcl_AppendResult(in, "kevent (listen socket): ", (char *) Tcl_PosixError(in), NULL);
    721             close(kq);
    722             pthread_mutex_unlock(&sock_mutex);
    723             return TCL_ERROR;
     759            error2tcl("kevent (listen socket): ", errno, in);
     760            goto error_locked;
    724761        }
    725762        /* kevent(2) on EV_RECEIPT: When passed as input, it forces EV_ERROR to
     
    727764         * will be zero. */
    728765        if ((kev.flags & EV_ERROR) == 0 || ((kev.flags & EV_ERROR) > 0 && kev.data != 0)) {
    729             Tcl_SetErrno(kev.data);
    730             Tcl_ResetResult(in);
    731             Tcl_AppendResult(in, "kevent (listen socket receipt): ", (char *) Tcl_PosixError(in), NULL);
    732             close(kq);
    733             pthread_mutex_unlock(&sock_mutex);
    734             return TCL_ERROR;
     766            error2tcl("kevent (listen socket receipt): ", kev.data, in);
     767            goto error_locked;
    735768        }
    736769
     
    739772         * tracelib closesocket is called. */
    740773        if (-1 == pipe(selfpipe)) {
    741             Tcl_SetErrno(errno);
    742             Tcl_ResetResult(in);
    743             Tcl_AppendResult(in, "pipe: ", (char *) Tcl_PosixError(in), NULL);
    744             pthread_mutex_unlock(&sock_mutex);
    745             return TCL_ERROR;
     774            error2tcl("pipe: ", errno, in);
     775            goto error_locked;
    746776        }
    747777
     
    749779        flags = fcntl(selfpipe[1], F_GETFL, 0);
    750780        if (-1 == fcntl(selfpipe[1], F_SETFL, flags | O_NONBLOCK)) {
    751             Tcl_SetErrno(errno);
    752             Tcl_ResetResult(in);
    753             Tcl_AppendResult(in, "fcntl(F_SETFL, += O_NONBLOCK): ", (char *) Tcl_PosixError(in), NULL);
    754             pthread_mutex_unlock(&sock_mutex);
    755             return TCL_ERROR;
     781            error2tcl("fcntl(F_SETFL, += O_NONBLOCK): ", errno, in);
     782            goto error_locked;
    756783        }
    757784
     
    760787        EV_SET(&kev, selfpipe[0], EVFILT_READ, EV_ADD | EV_RECEIPT, 0, 0, NULL);
    761788        if (1 != kevent(kq, &kev, 1, &kev, 1, NULL)) {
    762             Tcl_SetErrno(errno);
    763             Tcl_ResetResult(in);
    764             Tcl_AppendResult(in, "kevent (selfpipe): ", (char *) Tcl_PosixError(in), NULL);
    765             close(kq);
    766             pthread_mutex_unlock(&sock_mutex);
    767             return TCL_ERROR;
     789            error2tcl("kevent (selfpipe): ", errno, in);
     790            goto error_locked;
    768791        }
    769792        /* kevent(2) on EV_RECEIPT: When passed as input, it forces EV_ERROR to
     
    771794         * will be zero. */
    772795        if ((kev.flags & EV_ERROR) == 0 || ((kev.flags & EV_ERROR) > 0 && kev.data != 0)) {
    773             Tcl_SetErrno(kev.data);
    774             Tcl_ResetResult(in);
    775             Tcl_AppendResult(in, "kevent (selfpipe receipt): ", (char *) Tcl_PosixError(in), NULL);
    776             close(kq);
    777             pthread_mutex_unlock(&sock_mutex);
    778             return TCL_ERROR;
    779         }
    780     }
    781     pthread_mutex_unlock(&sock_mutex);
    782 
    783     while (sock != -1 && !cleanuping) {
     796            error2tcl("kevent (selfpipe receipt): ", kev.data, in);
     797            goto error_locked;
     798        }
     799    }
     800    pthread_mutex_unlock(&evloop_mutex);
     801
     802    while (sock != -1 && !break_eventloop) {
    784803        int keventstatus;
    785         int i;
     804        bool incoming = false;
    786805
    787806        /* run kevent(2) until new activity is available */
    788807        do {
    789808            if (-1 == (keventstatus = kevent(kq, NULL, 0, res_kevents, MAX_SOCKETS, NULL))) {
    790                 Tcl_SetErrno(errno);
    791                 Tcl_ResetResult(in);
    792                 Tcl_AppendResult(in, "kevent (main loop): ", (char *) Tcl_PosixError(in), NULL);
    793                 close(kq);
    794                 return TCL_ERROR;
     809                error2tcl("kevent (main loop): ", errno, in);
     810                goto error_unlocked;
    795811            }
    796812        } while (keventstatus == 0);
    797813
    798         for (i = 0; i < keventstatus; ++i) {
     814        for (int i = 0; i < keventstatus; ++i) {
    799815            /* handle traffic on the selfpipe */
    800816            if ((int) res_kevents[i].ident == selfpipe[0]) {
    801                 pthread_mutex_lock(&sock_mutex);
    802                 close(selfpipe[0]);
    803                 close(selfpipe[1]);
    804                 selfpipe[0] = -1;
    805                 selfpipe[1] = -1;
    806                 pthread_mutex_unlock(&sock_mutex);
    807                 break;
    808             }
    809 
    810             /* the control socket has activity – we might have a new
    811              * connection. We use a copy of sock here, because sock might have
    812              * been set to -1 by the close command */
    813             if ((int) res_kevents[i].ident == oldsock) {
    814                 int s;
    815 
    816                 /* handle error conditions */
    817                 if ((res_kevents[i].flags & (EV_ERROR | EV_EOF)) > 0) {
    818                     if (cleanuping) {
    819                         break;
    820                     }
    821                     Tcl_ResetResult(in);
    822                     Tcl_SetResult(in, "control socket closed", NULL);
    823                     close(kq);
    824                     return TCL_ERROR;
    825                 }
    826 
    827                 /* else: new connection attempt(s) */
    828                 for (;;) {
    829                     if (-1 == (s = accept(sock, NULL, NULL))) {
    830                         if (cleanuping) {
    831                             break;
    832                         }
    833                         if (errno == EWOULDBLOCK) {
    834                             break;
    835                         }
    836                         Tcl_SetErrno(errno);
    837                         Tcl_ResetResult(in);
    838                         Tcl_AppendResult(in, "accept: ", (char *) Tcl_PosixError(in), NULL);
    839                         close(kq);
    840                         return TCL_ERROR;
    841                     }
    842 
    843                     flags = fcntl(s, F_GETFL, 0);
    844                     if (-1 == fcntl(s, F_SETFL, flags & ~O_NONBLOCK)) {
    845                         ui_warn(interp, "tracelib: couldn't mark socket as blocking");
    846                         close(s);
    847                         continue;
    848                     }
    849 
    850                     /* register the new socket in the kqueue */
    851                     EV_SET(&kev, s, EVFILT_READ, EV_ADD | EV_RECEIPT, 0, 0, NULL);
    852                     if (1 != kevent(kq, &kev, 1, &kev, 1, NULL)) {
    853                         ui_warn(interp, "tracelib: error adding socket to kqueue");
    854                         close(s);
    855                         continue;
    856                     }
    857                     /* kevent(2) on EV_RECEIPT: When passed as input, it forces EV_ERROR to
    858                      * always be returned. When a filter is successfully added, the data field
    859                      * will be zero. */
    860                     if ((kev.flags & EV_ERROR) == 0 || ((kev.flags & EV_ERROR) > 0 && kev.data != 0)) {
    861                         ui_warn(interp, "tracelib: error adding socket to kqueue");
    862                         close(s);
    863                         continue;
    864                     }
    865 
    866                     opensockcount++;
    867                 }
    868 
    869                 if (cleanuping) {
    870                     break;
    871                 }
    872             } else {
     817                /* traffic on the selfpipe means we should clean up */
     818                break_eventloop = true;
     819                /* finish processing this batch */
     820                continue;
     821            } else if ((int) res_kevents[i].ident != sock) {
    873822                /* if the socket is to be closed, or */
    874823                if ((res_kevents[i].flags & (EV_EOF | EV_ERROR)) > 0
     
    876825                     * close the socket */
    877826                    || (!process_line(res_kevents[i].ident))) {
    878                     /* an error occured or process_line suggested closing this
    879                      * socket */
    880                     close(res_kevents[i].ident);
    881                     /* closing the socket will automatically remove it from the
    882                      * kqueue :) */
    883                     opensockcount--;
    884                 }
     827                        /* an error occured or process_line suggested closing
     828                         * this socket */
     829                        close(res_kevents[i].ident);
     830                        /* closing the socket will automatically remove it from the
     831                         * kqueue :) */
     832                        opensockcount--;
     833
     834#ifdef HAVE_PEERPID_LIST
     835                        if (peerpid_list_dequeue(res_kevents[i].ident) == (pid_t) -1) {
     836                            fprintf(stderr, "tracelib: didn't find PID for closed socket %d\n", (int) res_kevents[i].ident);
     837                        }
     838#endif
     839                }
     840            } else {
     841                /* the control socket has activity – we might have a new
     842                 * connection. */
     843
     844                /* handle error conditions */
     845                if ((res_kevents[i].flags & (EV_ERROR | EV_EOF)) > 0) {
     846                    error2tcl("control socket closed", 0, in);
     847                    goto error_unlocked;
     848                }
     849
     850                /* delay processing, process data on existing sockets first */
     851                incoming = true;
    885852            }
    886853        }
    887     }
    888 
    889     /* NOTE: We aren't necessarily closing all client sockets here! */
    890 
     854
     855        if (incoming) {
     856            /* new connection attempt(s) */
     857            for (;;) {
     858                int s;
     859
     860                if (-1 == (s = accept(sock, NULL, NULL))) {
     861                    if (errno == EWOULDBLOCK) {
     862                        break;
     863                    }
     864
     865                    error2tcl("accept: ", errno, in);
     866                    goto error_unlocked;
     867                }
     868
     869                flags = fcntl(s, F_GETFL, 0);
     870                if (-1 == fcntl(s, F_SETFL, flags & ~O_NONBLOCK)) {
     871                    ui_warn(interp, "tracelib: couldn't mark socket as blocking");
     872                    close(s);
     873                    continue;
     874                }
     875
     876                /* register the new socket in the kqueue */
     877                EV_SET(&kev, s, EVFILT_READ, EV_ADD | EV_RECEIPT, 0, 0, NULL);
     878                if (1 != kevent(kq, &kev, 1, &kev, 1, NULL)) {
     879                    ui_warn(interp, "tracelib: error adding socket to kqueue");
     880                    close(s);
     881                    continue;
     882                }
     883                /* kevent(2) on EV_RECEIPT: When passed as input, it forces EV_ERROR to
     884                 * always be returned. When a filter is successfully added, the data field
     885                 * will be zero. */
     886                if ((kev.flags & EV_ERROR) == 0 || ((kev.flags & EV_ERROR) > 0 && kev.data != 0)) {
     887                    ui_warn(interp, "tracelib: error adding socket to kqueue (receipt)");
     888                    close(s);
     889                    continue;
     890                }
     891
     892#ifdef HAVE_PEERPID_LIST
     893                pid_t peer_pid = (pid_t) -1;
     894                socklen_t peer_pid_len = sizeof(peer_pid);
     895                if (getsockopt(s, SOL_LOCAL, LOCAL_PEERPID, &peer_pid, &peer_pid_len) == 0) {
     896                    // We found a PID for the remote side
     897                    peerpid_list_enqueue(s, peer_pid);
     898                } else {
     899                    // Error occured, process has probably already terminated
     900                    close(s);
     901                    continue;
     902                }
     903#endif
     904                opensockcount++;
     905            }
     906        }
     907    }
     908
     909    retval = TCL_OK;
     910
     911error_unlocked:
     912    pthread_mutex_lock(&evloop_mutex);
     913error_locked:
    891914    // Close remainig sockets to avoid dangling processes
    892915    if (opensockcount > 0) {
     
    899922#endif
    900923    }
    901     pthread_mutex_lock(&sock_mutex);
    902     close(kq);
    903     kq = -1;
    904     pthread_mutex_unlock(&sock_mutex);
    905 
    906     return TCL_OK;
     924
     925    // cleanup selfpipe and set it to -1
     926    pipe_cleanup(selfpipe);
     927
     928    // close kqueue(2) socket
     929    if (kq != -1) {
     930        close(kq);
     931        kq = -1;
     932    }
     933
     934    pthread_mutex_unlock(&evloop_mutex);
     935    // wake up any waiting threads in TracelibCloseSocketCmd
     936    pthread_cond_broadcast(&evloop_signal);
     937
     938    return retval;
    907939}
    908940
    909941static int TracelibCleanCmd(Tcl_Interp *interp UNUSED) {
    910942#define safe_free(x) do{free(x); x=0;}while(0);
    911     cleanuping = 1;
    912     pthread_mutex_lock(&sock_mutex);
    913943    if (sock != -1) {
    914944        close(sock);
    915945        sock = -1;
    916946    }
    917     pthread_mutex_unlock(&sock_mutex);
     947
    918948    if (name) {
    919949        unlink(name);
    920950        safe_free(name);
    921951    }
    922     if (depends) {
    923         safe_free(depends);
    924     }
     952
     953    safe_free(depends);
     954
    925955    enable_fence = 0;
     956    return TCL_OK;
     957
    926958#undef safe_free
    927     return TCL_OK;
    928959}
    929960
    930961static int TracelibCloseSocketCmd(Tcl_Interp *interp UNUSED) {
    931     cleanuping = 1;
    932     pthread_mutex_lock(&sock_mutex);
    933     if (sock != -1) {
    934         close(sock);
    935         sock = -1;
    936 
    937         if (kq != -1) {
    938             /* We know the pipes have been created because kq != -1 and we have
    939              * the lock. We don't have to check for errors, because none should
    940              * occur but when the pipe is full, which we wouldn't care about.
    941              * */
    942             write(selfpipe[1], "!", 1);
    943         }
    944     }
    945     pthread_mutex_unlock(&sock_mutex);
     962    pthread_mutex_lock(&evloop_mutex);
     963    if (kq != -1 && selfpipe[1] != -1) {
     964        /* We know the pipes have been created because kq != -1 and we have the
     965         * lock. We don't have to check for errors, because none should occur
     966         * but when the pipe is full, which we wouldn't care about. */
     967        write(selfpipe[1], "!", 1);
     968
     969        /* Wait for the kqueue event loop to terminate. We must not return
     970         * earlier than that because the next call will be to tracelib clean,
     971         * and that frees up memory that would be used by the event loop
     972         * otherwise. */
     973        pthread_cond_wait(&evloop_signal, &evloop_mutex);
     974    } else {
     975        /* The kqueue(2) loop isn't running yet, so we can just close the
     976         * socket and make sure it stays closed. In this situation, the kqueue
     977         * will not be created. */
     978        if (sock != -1) {
     979            close(sock);
     980            sock = -1;
     981        }
     982    }
     983    pthread_mutex_unlock(&evloop_mutex);
     984
    946985    return TCL_OK;
    947986}
  • trunk/base/src/port1.0/porttrace.tcl

    r138100 r140641  
    267267
    268268                # Kill socket
     269                tracelib closesocket
    269270                tracelib clean
    270271                # Delete the socket file
Note: See TracChangeset for help on using the changeset viewer.