Make sure two commands are running before starting a third

I have three commands to run, but I'd like to make sure the first two are running before starting the third one.

Currently, it runs A and B, then C:

  1. I run A and B in goroutines
  2. I communicate their name through chan if there's no stderr
  3. the main function pushes the names received through chan into a slice
  4. once the slice contains all names of module A and B it starts C

Some context

I'm in the process of learning goroutines and chan as a hobbyist. It's not clear to me how to stream the output of exec.Command("foo", "bar").Run() reliably while it's running. It's also not clear how to handle errors received from each process through chan.

I need A and B to run before C because A and B are GraphQL microservices; C needs them running in order to fetch their schemas over HTTP and start doing GraphQL federation (f.k.a. GraphQL stitching).

Inconsistencies

  • With my current approach, I will know if A and B are running only if they print something I guess.
  • I don't like that each subsequent line of stdout will hit an if statement, just to know if the process is running.
  • My error handling is not as clean as I'd like it to be.

Question

How could I have a more reliable way to ensure that A and B are running, even if they don't print anything, and that they did not throw errors?

package main

import (
    "bufio"
    "fmt"
    "log"
    "os/exec"
    "reflect"
    "sort"
    "strings"
    "sync"
)

var wg sync.WaitGroup
var modulesToRun = []string{"micro-post", "micro-hello"}

func main() {
    // Send multiple values to chan
    // https://stackoverflow.com/a/50857250/9077800
    c := make(chan func() (string, error))

    go runModule([]string{"go", "run", "micro-post"}, c)  // PROCESS A
    go runModule([]string{"go", "run", "micro-hello"}, c) // PROCESS B

    modulesRunning := []string{}
    for {
        msg, err := (<-c)()
        if err != nil {
            log.Fatalln(err)
        }

        if strings.HasPrefix(msg, "micro-") && err == nil {
            modulesRunning = append(modulesRunning, msg)
            if CompareUnorderedSlices(modulesToRun, modulesRunning) {
                go runModule([]string{"go", "run", "micro-federation"}, c) // PROCESS C
            }
        }
    }

}

func runModule(commandArgs []string, o chan func() (string, error)) {
    cmd := exec.Command(commandArgs[0], commandArgs[1], commandArgs[2]+"/main.go")

    // Less verbose solution to stream output with io?
    // var stdBuffer bytes.Buffer
    // mw := io.MultiWriter(os.Stdout, &stdBuffer)
    // cmd.Stdout = mw
    // cmd.Stderr = mw

    c := make(chan struct{})
    wg.Add(1)

    // Stream command output
    // https://stackoverflow.com/a/38870609/9077800
    go func(cmd *exec.Cmd, c chan struct{}) {
        defer wg.Done()
        stdout, err := cmd.StdoutPipe()
        if err != nil {
            close(o)
            panic(err)
        }

        stderr, err := cmd.StderrPipe()
        if err != nil {
            close(o)
            panic(err)
        }

        <-c
        outScanner := bufio.NewScanner(stdout)
        for outScanner.Scan() {
            m := outScanner.Text()
            fmt.Println(commandArgs[2]+":", m)
            o <- (func() (string, error) { return commandArgs[2], nil })
        }

        errScanner := bufio.NewScanner(stderr)
        for errScanner.Scan() {
            m := errScanner.Text()
            fmt.Println(commandArgs[2]+":", m)
            o <- (func() (string, error) { return "bad", nil })
        }
    }(cmd, c)

    c <- struct{}{}
    cmd.Start()

    wg.Wait()
    close(o)
}

// CompareUnorderedSlices orders slices before comparing them
func CompareUnorderedSlices(a, b []string) bool {
    if len(a) != len(b) {
        return false
    }

    sort.Strings(a)
    sort.Strings(b)

    return reflect.DeepEqual(a, b)
}

About process management

Starting a process means invoking the binary at a given path with its arguments. Start fails if the binary is not found or cannot be executed.

As a consequence, a process can start successfully and still return an exit error later because its execution fails, for example when it rejects malformed arguments.
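
To illustrate the difference between a start failure and an exit error, here is a minimal sketch. The classify helper and the binary name no-such-binary-for-this-example are hypothetical, and the sketch assumes a POSIX sh is available on PATH.

```go
package main

import (
    "errors"
    "fmt"
    "os/exec"
)

// classify runs a command to completion and reports which phase failed:
// "start" if the process could not be launched, "exit" if it launched but
// exited with a non-zero status, "wait" for any other error, "ok" otherwise.
func classify(name string, args ...string) string {
    cmd := exec.Command(name, args...)
    if err := cmd.Start(); err != nil {
        return "start" // binary not found or not executable
    }
    err := cmd.Wait()
    var exitErr *exec.ExitError
    if errors.As(err, &exitErr) {
        return "exit" // the process ran, then reported failure
    }
    if err != nil {
        return "wait"
    }
    return "ok"
}

func main() {
    // Hypothetical name, assumed not to exist on PATH.
    fmt.Println(classify("no-such-binary-for-this-example"))
    // Assumes a POSIX sh is installed.
    fmt.Println(classify("sh", "-c", "exit 3"))
    fmt.Println(classify("sh", "-c", "exit 0"))
}
```

For a long-running service you would not call Wait inline like this; the point is only that the two errors come from different calls and mean different things.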

These details matter when deciding whether starting the process is enough to consider the operation successful, or whether you need to dig further into its state and/or output.

In your code, it appears you wait for the first line of stdout to be printed to consider the process started, without any consideration of the content being printed.

That resembles a sleep to give the process time to initialize more than an actual check.

Keep in mind that starting the binary is much faster than the execution of its bootstrap sequence.
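
Since A and B are HTTP services, one way to detect the end of the bootstrap sequence without relying on output is to poll their listening port. The following is a sketch under assumptions, not something from your code: waitForPort and startStubService are hypothetical helpers, and the stub listener only stands in for a real service.

```go
package main

import (
    "fmt"
    "net"
    "time"
)

// waitForPort polls addr until a TCP connection succeeds or timeout elapses.
func waitForPort(addr string, timeout time.Duration) error {
    deadline := time.Now().Add(timeout)
    for {
        conn, err := net.DialTimeout("tcp", addr, 200*time.Millisecond)
        if err == nil {
            conn.Close()
            return nil // something is accepting connections
        }
        if time.Now().After(deadline) {
            return fmt.Errorf("%s not reachable after %v: %w", addr, timeout, err)
        }
        time.Sleep(100 * time.Millisecond)
    }
}

// startStubService is a hypothetical stand-in for service A or B: it listens
// on a free local port and returns its address plus a function to stop it.
func startStubService() (addr string, stop func(), err error) {
    ln, err := net.Listen("tcp", "127.0.0.1:0")
    if err != nil {
        return "", nil, err
    }
    return ln.Addr().String(), func() { ln.Close() }, nil
}

func main() {
    addr, stop, err := startStubService()
    if err != nil {
        panic(err)
    }
    defer stop()

    if err := waitForPort(addr, 2*time.Second); err != nil {
        fmt.Println("not ready:", err)
        return
    }
    fmt.Println("ready, safe to start C")
}
```

A successful TCP connection only shows that the port is open. If C needs the schema endpoint to answer, an HTTP request against that endpoint would be a stricter check.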

About the code: your exit rules are unclear. What keeps main from exiting?

In the current code, it will exit before C is executed once A and B have started (not analyzing other cases).

Your implementation of job concurrency in main is not standard. It is missing the loop that collects results, the exit condition, and the close(chan).

The chan signature is awkward; I would rather use a struct{ Module string; Err error }.

The runModule function is buggy. It might close(o) while another goroutine is still attempting to write to it. If Start fails, you are not returning any error signal.
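
The two points above (a struct on the channel, and a single place that closes it) can be sketched as follows. The names result, startAll and errStart are hypothetical, and the start functions are placeholders for whatever actually launches a module.

```go
package main

import (
    "errors"
    "fmt"
    "sync"
)

// result is a hypothetical message type: one per module, carrying its outcome.
type result struct {
    Module string
    Err    error
}

// errStart is a placeholder error used only for this example.
var errStart = errors.New("failed to start")

// startAll runs every start function concurrently and returns the first
// error received, or nil if all of them succeeded.
func startAll(starters map[string]func() error) error {
    out := make(chan result)
    var wg sync.WaitGroup
    wg.Add(len(starters))

    for name, start := range starters {
        go func(name string, start func() error) {
            defer wg.Done()
            out <- result{Module: name, Err: start()}
        }(name, start)
    }

    // Only this goroutine closes out, and only after every sender is done,
    // so no goroutine can write to a closed channel.
    go func() {
        wg.Wait()
        close(out)
    }()

    var firstErr error
    for r := range out { // drain fully so no sender stays blocked
        if r.Err != nil && firstErr == nil {
            firstErr = fmt.Errorf("%s: %w", r.Module, r.Err)
        }
    }
    return firstErr
}

func main() {
    err := startAll(map[string]func() error{
        "A": func() error { return nil },
        "B": func() error { return errStart },
    })
    fmt.Println(err)
}
```

Senders never close the channel themselves, which is what removes the race between close(o) and a pending write.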

A possible solution might look like this. Consider it opinionated: depending on the binary being run, other strategies can or should be implemented to detect errors over the standard file descriptors.

package main

import (
    "bufio"
    "fmt"
    "log"
    "os"
    "os/exec"
    "strings"
    "sync"
    "time"
)

type cmd struct {
    Module string
    Cmd    string
    Args   []string
    Err    error
}

func main() {

    torun := []cmd{
        cmd{
            Module: "A",
            Cmd:    "ping",
            Args:   []string{"8.8.8.8"},
        },
        cmd{
            Module: "B",
            Cmd:    "ping",
            // Args:   []string{"8.8.8.8.9"},
            Args: []string{"8.8.8.8"},
        },
    }

    var wg sync.WaitGroup // use a waitgroup to ensure all concurrent jobs are done
    wg.Add(len(torun))

    out := make(chan cmd) // a channel to output cmd status

    go func() {
        wg.Wait()  // wait for the group to finish
        close(out) // then close the signal channel
    }()

    // start the commands
    for _, c := range torun {
        // go runCmd(c, out, &wg)
        go runCmdAndWaitForSomeOutput(c, out, &wg)
    }

    // loop over the chan to collect errors
    // it ends when wg.Wait returns and out is closed
    for c := range out {
        if c.Err != nil {
            log.Fatalf("%v %v has failed with %v", c.Cmd, c.Args, c.Err)
        }
    }

    // at this point all commands have started; you can go on to run the last command
    fmt.Println("all done")
    os.Exit(0)
}

func runCmd(o cmd, out chan cmd, wg *sync.WaitGroup) {
    defer wg.Done()

    cmd := exec.Command(o.Cmd, o.Args...)

    if err := cmd.Start(); err != nil {
        o.Err = err // save err
        out <- o    // signal completion error
        return      // return to unfreeze the waitgroup wg
    }
    go cmd.Wait() // don't block on command completion;
    // consider the job done once the program has started successfully.

    // out <- o // useless, as main only looks for errors
}

func runCmdAndWaitForSomeOutput(o cmd, out chan cmd, wg *sync.WaitGroup) {
    defer wg.Done()

    cmd := exec.Command(o.Cmd, o.Args...)

    stdout, err := cmd.StdoutPipe()
    if err != nil {
        o.Err = err // save err
        out <- o    // signal completion
        return      // return to unfreeze the waitgroup wg
    }
    stderr, err := cmd.StderrPipe()
    if err != nil {
        o.Err = err
        out <- o
        return
    }

    if err := cmd.Start(); err != nil {
        o.Err = err
        out <- o
        return
    }

    go cmd.Wait() // don't block on completion; note that Wait closes the pipes once the process exits

    // build a concurrent fd's scanner

    // to signal errors detected on the fds; buffered (one slot per scanner)
    // so a late error never blocks its goroutine after a timeout
    outScan := make(chan error, 2)

    var wg2 sync.WaitGroup
    wg2.Add(2) // the number of fds being watched

    go func() {
        defer wg2.Done()
        sc := bufio.NewScanner(stdout)
        for sc.Scan() {
            line := sc.Text()
            if strings.Contains(line, "icmp_seq") { // the OK marker
                return // quit asap to unfreeze wg2
            } else if strings.Contains(line, "not known") { // the nOK marker, if any...
                outScan <- fmt.Errorf("%v", line)
                return // quit  to unfreeze wg2
            }
        }
    }()

    go func() {
        defer wg2.Done()
        sc := bufio.NewScanner(stderr)
        for sc.Scan() {
            line := sc.Text()
            if strings.Contains(line, "icmp_seq") { // the OK marker
                return // quit asap to unfreeze wg2
            } else if strings.Contains(line, "not known") { // the nOK marker, if any...
                outScan <- fmt.Errorf("%v", line) // signal error
                return                            // quit to unfreeze wg2
            }
        }
    }()

    go func() {
        wg2.Wait() // consider that if the program does not output anything,
        // or never prints ok/nok, this will block forever
        close(outScan) // close the chan so the next loop is finite
    }()

    // - simple loop without timeout
    // for err := range outScan {
    //  if err != nil {
    //      o.Err = err // save the execution error
    //      out <- o // signal the cmd
    //      return // quit to unfreeze the wait group wg
    //  }
    // }

    // - more complex version with timeout
    timeout := time.After(time.Second * 3)
    for {
        select {
        case err, ok := <-outScan:
            if !ok { // if !ok, outScan is closed and we should quit the loop
                return
            }
            if err != nil {
                o.Err = err // save the execution error
                out <- o    // signal the cmd
                return      // quit to unfreeze the wait group wg
            }
        case <-timeout:
            o.Err = fmt.Errorf("timed out waiting for output from %v", o.Module) // save the execution error
            out <- o                                                             // signal the cmd
            return                                                               // quit to unfreeze the wait group wg
        }
    }

    // exit and unfreeze the wait group wg
}