From 91f776275d2f3c5b1b07deeefbc76f3e789bdf31 Mon Sep 17 00:00:00 2001 From: Pavel Vorobyov Date: Thu, 19 Dec 2019 16:13:43 +0300 Subject: [PATCH] pool is created on demand (#6) --- cli/alias.go | 1 - cli/cli.go | 1 + cli/handlers.go | 18 ++++++++++++++++++ remote/distribute.go | 2 ++ remote/executer.go | 4 ++++ remote/pool.go | 8 ++++---- remote/remote.go | 10 +++------- 7 files changed, 32 insertions(+), 12 deletions(-) diff --git a/cli/alias.go b/cli/alias.go index e24a3e4..a7fa129 100644 --- a/cli/alias.go +++ b/cli/alias.go @@ -50,7 +50,6 @@ func (c *Cli) runAlias(name string, argsLine string, args ...string) { } func exterpolate(al *alias, argsLine string, args ...string) (string, error) { - fmt.Println(al, argsLine, args) res := "" for i := 0; i < len(al.proxy); i++ { if i < len(al.proxy)-1 && al.proxy[i] == '#' { diff --git a/cli/cli.go b/cli/cli.go index ed2b27c..229441e 100644 --- a/cli/cli.go +++ b/cli/cli.go @@ -134,6 +134,7 @@ func New(cfg *config.XCConfig, backend store.Backend) (*Cli, error) { remote.SetConnectTimeout(cli.connectTimeout) remote.SetDebug(cli.debug) remote.SetUsePasswordManager(cli.usePasswordMgr) + remote.SetNumThreads(cli.sshThreads) // interpreter cli.setInterpreter("none", cfg.Interpreter) diff --git a/cli/handlers.go b/cli/handlers.go index 6670de7..4444628 100644 --- a/cli/handlers.go +++ b/cli/handlers.go @@ -5,6 +5,7 @@ import ( "os" "os/exec" "os/signal" + "runtime" "strconv" "syscall" @@ -53,6 +54,7 @@ func (c *Cli) setupCmdHandlers() { c.handlers["distribute_type"] = c.doDistributeType c.handlers["_passmgr_debug"] = c.doPassmgrDebug c.handlers["version"] = c.doVersion + c.handlers["goruntime"] = c.doGoruntime commands := make([]string, len(c.handlers)) i := 0 @@ -478,3 +480,19 @@ func (c *Cli) doPRunScript(name string, argsLine string, args ...string) { func (c *Cli) doPassmgrDebug(name string, argsLine string, args ...string) { passmgr.PrintDebug() } + +func (c *Cli) doGoruntime(name string, argsLine string, args ...string) { + var ms runtime.MemStats + numGr := runtime.NumGoroutine() + runtime.ReadMemStats(&ms) + term.Warnf("Heap Allocations: %d\n", ms.HeapAlloc) + term.Warnf("Heap Objects: %d\n", ms.HeapObjects) + term.Warnf("Heap In Use: %d\n", ms.HeapInuse) + term.Warnf("Mallocs: %d\n", ms.Mallocs) + term.Warnf("Frees: %d\n", ms.Frees) + term.Warnf("Last GC TStamp: %d\n", ms.LastGC/1_000_000_000) + term.Warnf("Next GC HeapSize: %d\n", ms.NextGC) + term.Warnf("Num GC: %d\n", ms.NumGC) + term.Warnf("Num Forced GC: %d\n\n", ms.NumForcedGC) + term.Warnf("Goroutines: %d\n", numGr) +} diff --git a/remote/distribute.go b/remote/distribute.go index 66d8ad5..343ec72 100644 --- a/remote/distribute.go +++ b/remote/distribute.go @@ -34,6 +34,8 @@ func Distribute(hosts []string, localFilename string, remoteFilename string, rec signal.Notify(sigs, syscall.SIGINT) defer signal.Reset() + pool = NewPool() + defer pool.Close() go func() { for _, host := range hosts { t = &Task{ diff --git a/remote/executer.go b/remote/executer.go index 9b620c4..b9f453a 100644 --- a/remote/executer.go +++ b/remote/executer.go @@ -111,6 +111,8 @@ func RunParallel(hosts []string, cmd string) *ExecResult { signal.Notify(sigs, syscall.SIGINT) defer signal.Reset() + pool = NewPool() + defer pool.Close() go enqueue(local, remote, hosts) for running > 0 { @@ -182,6 +184,8 @@ func RunCollapse(hosts []string, cmd string) *ExecResult { signal.Notify(sigs, syscall.SIGINT) defer signal.Reset() + pool = NewPool() + defer pool.Close() go enqueue(local, remote, hosts) for running > 0 { diff --git a/remote/pool.go b/remote/pool.go index 9b4fb76..8c0a111 100644 --- a/remote/pool.go +++ b/remote/pool.go @@ -16,18 +16,18 @@ type Pool struct { } // NewPool creates a new worker pool of a given size -func NewPool(size int) *Pool { +func NewPool() *Pool { p := &Pool{ - workers: make([]*Worker, size), + workers: make([]*Worker, poolSize), queue: make(chan *Task, dataQueueSize), Data: make(chan *Message, dataQueueSize), } - for i := 0; i < size; i++ { + for i := 0; i < poolSize; i++ { p.workers[i] = NewWorker(p.queue, p.Data) } - log.Debugf("Remote execution pool created with %d workers", size) + log.Debugf("Remote execution pool created with %d workers", poolSize) log.Debugf("Data Queue Size is %d", dataQueueSize) return p } diff --git a/remote/remote.go b/remote/remote.go index 9cb9b78..b97a2c3 100644 --- a/remote/remote.go +++ b/remote/remote.go @@ -23,6 +23,7 @@ var ( currentDebug bool outputFile *os.File poolLock *sync.Mutex + poolSize int noneInterpreter string suInterpreter string @@ -31,7 +32,6 @@ var ( // Initialize initializes new execution pool func Initialize(numThreads int, username string) { - pool = NewPool(numThreads) poolLock = new(sync.Mutex) SetUser(username) SetPassword("") @@ -109,13 +109,9 @@ func SetOutputFile(f *os.File) { outputFile = f } -// SetNumThreads recreates the execution pool with the given number of threads +// SetNumThreads sets execution pool size func SetNumThreads(numThreads int) { - if len(pool.workers) == numThreads { - return - } - pool.Close() - pool = NewPool(numThreads) + poolSize = numThreads } func prepareTempFiles(cmd string) (string, string, error) {