pool is created on demand (#6)

This commit is contained in:
Pavel Vorobyov
2019-12-19 16:13:43 +03:00
committed by GitHub
parent 93e477ed62
commit 91f776275d
7 changed files with 32 additions and 12 deletions

View File

@@ -50,7 +50,6 @@ func (c *Cli) runAlias(name string, argsLine string, args ...string) {
} }
func exterpolate(al *alias, argsLine string, args ...string) (string, error) { func exterpolate(al *alias, argsLine string, args ...string) (string, error) {
fmt.Println(al, argsLine, args)
res := "" res := ""
for i := 0; i < len(al.proxy); i++ { for i := 0; i < len(al.proxy); i++ {
if i < len(al.proxy)-1 && al.proxy[i] == '#' { if i < len(al.proxy)-1 && al.proxy[i] == '#' {

View File

@@ -134,6 +134,7 @@ func New(cfg *config.XCConfig, backend store.Backend) (*Cli, error) {
remote.SetConnectTimeout(cli.connectTimeout) remote.SetConnectTimeout(cli.connectTimeout)
remote.SetDebug(cli.debug) remote.SetDebug(cli.debug)
remote.SetUsePasswordManager(cli.usePasswordMgr) remote.SetUsePasswordManager(cli.usePasswordMgr)
remote.SetNumThreads(cli.sshThreads)
// interpreter // interpreter
cli.setInterpreter("none", cfg.Interpreter) cli.setInterpreter("none", cfg.Interpreter)

View File

@@ -5,6 +5,7 @@ import (
"os" "os"
"os/exec" "os/exec"
"os/signal" "os/signal"
"runtime"
"strconv" "strconv"
"syscall" "syscall"
@@ -53,6 +54,7 @@ func (c *Cli) setupCmdHandlers() {
c.handlers["distribute_type"] = c.doDistributeType c.handlers["distribute_type"] = c.doDistributeType
c.handlers["_passmgr_debug"] = c.doPassmgrDebug c.handlers["_passmgr_debug"] = c.doPassmgrDebug
c.handlers["version"] = c.doVersion c.handlers["version"] = c.doVersion
c.handlers["goruntime"] = c.doGoruntime
commands := make([]string, len(c.handlers)) commands := make([]string, len(c.handlers))
i := 0 i := 0
@@ -478,3 +480,19 @@ func (c *Cli) doPRunScript(name string, argsLine string, args ...string) {
func (c *Cli) doPassmgrDebug(name string, argsLine string, args ...string) { func (c *Cli) doPassmgrDebug(name string, argsLine string, args ...string) {
passmgr.PrintDebug() passmgr.PrintDebug()
} }
func (c *Cli) doGoruntime(name string, argsLine string, args ...string) {
var ms runtime.MemStats
numGr := runtime.NumGoroutine()
runtime.ReadMemStats(&ms)
term.Warnf("Heap Allocations: %d\n", ms.HeapAlloc)
term.Warnf("Heap Objects: %d\n", ms.HeapObjects)
term.Warnf("Heap In Use: %d\n", ms.HeapInuse)
term.Warnf("Mallocs: %d\n", ms.Mallocs)
term.Warnf("Frees: %d\n", ms.Frees)
term.Warnf("Last GC TStamp: %d\n", ms.LastGC/1_000_000_000)
term.Warnf("Next GC HeapSize: %d\n", ms.NextGC)
term.Warnf("Num GC: %d\n", ms.NumGC)
term.Warnf("Num Forced GC: %d\n\n", ms.NumForcedGC)
term.Warnf("Goroutines: %d\n", numGr)
}

View File

@@ -34,6 +34,8 @@ func Distribute(hosts []string, localFilename string, remoteFilename string, rec
signal.Notify(sigs, syscall.SIGINT) signal.Notify(sigs, syscall.SIGINT)
defer signal.Reset() defer signal.Reset()
pool = NewPool()
defer pool.Close()
go func() { go func() {
for _, host := range hosts { for _, host := range hosts {
t = &Task{ t = &Task{

View File

@@ -111,6 +111,8 @@ func RunParallel(hosts []string, cmd string) *ExecResult {
signal.Notify(sigs, syscall.SIGINT) signal.Notify(sigs, syscall.SIGINT)
defer signal.Reset() defer signal.Reset()
pool = NewPool()
defer pool.Close()
go enqueue(local, remote, hosts) go enqueue(local, remote, hosts)
for running > 0 { for running > 0 {
@@ -182,6 +184,8 @@ func RunCollapse(hosts []string, cmd string) *ExecResult {
signal.Notify(sigs, syscall.SIGINT) signal.Notify(sigs, syscall.SIGINT)
defer signal.Reset() defer signal.Reset()
pool = NewPool()
defer pool.Close()
go enqueue(local, remote, hosts) go enqueue(local, remote, hosts)
for running > 0 { for running > 0 {

View File

@@ -16,18 +16,18 @@ type Pool struct {
} }
// NewPool creates a new worker pool of a given size // NewPool creates a new worker pool of a given size
func NewPool(size int) *Pool { func NewPool() *Pool {
p := &Pool{ p := &Pool{
workers: make([]*Worker, size), workers: make([]*Worker, poolSize),
queue: make(chan *Task, dataQueueSize), queue: make(chan *Task, dataQueueSize),
Data: make(chan *Message, dataQueueSize), Data: make(chan *Message, dataQueueSize),
} }
for i := 0; i < size; i++ { for i := 0; i < poolSize; i++ {
p.workers[i] = NewWorker(p.queue, p.Data) p.workers[i] = NewWorker(p.queue, p.Data)
} }
log.Debugf("Remote execution pool created with %d workers", size) log.Debugf("Remote execution pool created with %d workers", poolSize)
log.Debugf("Data Queue Size is %d", dataQueueSize) log.Debugf("Data Queue Size is %d", dataQueueSize)
return p return p
} }

View File

@@ -23,6 +23,7 @@ var (
currentDebug bool currentDebug bool
outputFile *os.File outputFile *os.File
poolLock *sync.Mutex poolLock *sync.Mutex
poolSize int
noneInterpreter string noneInterpreter string
suInterpreter string suInterpreter string
@@ -31,7 +32,6 @@ var (
// Initialize initializes new execution pool // Initialize initializes new execution pool
func Initialize(numThreads int, username string) { func Initialize(numThreads int, username string) {
pool = NewPool(numThreads)
poolLock = new(sync.Mutex) poolLock = new(sync.Mutex)
SetUser(username) SetUser(username)
SetPassword("") SetPassword("")
@@ -109,13 +109,9 @@ func SetOutputFile(f *os.File) {
outputFile = f outputFile = f
} }
// SetNumThreads recreates the execution pool with the given number of threads // SetNumThreads sets execution pool size
func SetNumThreads(numThreads int) { func SetNumThreads(numThreads int) {
if len(pool.workers) == numThreads { poolSize = numThreads
return
}
pool.Close()
pool = NewPool(numThreads)
} }
func prepareTempFiles(cmd string) (string, string, error) { func prepareTempFiles(cmd string) (string, string, error) {