mirror of
https://github.com/kemko/xc.git
synced 2026-01-01 15:55:43 +03:00
92 lines
2.0 KiB
Go
92 lines
2.0 KiB
Go
package remote
|
|
|
|
import (
|
|
"github.com/viert/xc/log"
|
|
)
|
|
|
|
const (
|
|
dataQueueSize = 1024
|
|
)
|
|
|
|
// Pool is a class representing a worker pool
|
|
type Pool struct {
|
|
workers []*Worker
|
|
queue chan *Task
|
|
Data chan *Message
|
|
}
|
|
|
|
// NewPool creates a new worker pool of a given size
|
|
func NewPool() *Pool {
|
|
|
|
p := &Pool{
|
|
workers: make([]*Worker, poolSize),
|
|
queue: make(chan *Task, dataQueueSize),
|
|
Data: make(chan *Message, dataQueueSize),
|
|
}
|
|
|
|
for i := 0; i < poolSize; i++ {
|
|
p.workers[i] = NewWorker(p.queue, p.Data)
|
|
}
|
|
log.Debugf("Remote execution pool created with %d workers", poolSize)
|
|
log.Debugf("Data Queue Size is %d", dataQueueSize)
|
|
return p
|
|
}
|
|
|
|
// ForceStopAllTasks removes all pending tasks and force stops those in progress
|
|
func (p *Pool) ForceStopAllTasks() int {
|
|
// Remove all pending tasks from the queue
|
|
log.Debug("Force stopping all tasks")
|
|
i := 0
|
|
rmvLoop:
|
|
for {
|
|
select {
|
|
case <-p.queue:
|
|
i++
|
|
continue
|
|
default:
|
|
break rmvLoop
|
|
}
|
|
}
|
|
log.Debugf("%d queued (and not yet started) tasks removed from the queue", i)
|
|
|
|
stopped := 0
|
|
for _, wrk := range p.workers {
|
|
if wrk.ForceStop() {
|
|
log.Debugf("Worker %d was running a task so force stopped", wrk.ID())
|
|
stopped++
|
|
}
|
|
}
|
|
return stopped
|
|
}
|
|
|
|
// Close shuts down the pool itself and all its workers
|
|
func (p *Pool) Close() {
|
|
log.Debug("Closing remote execution pool")
|
|
p.ForceStopAllTasks()
|
|
close(p.queue) // this should make all the workers step out of range loop on queue chan and shut down
|
|
log.Debug("Closing the task queue")
|
|
close(p.Data)
|
|
}
|
|
|
|
// AddTask adds a task to the pool queue
|
|
func (p *Pool) AddTask(task *Task) {
|
|
if task.WG != nil {
|
|
task.WG.Add(1)
|
|
}
|
|
p.queue <- task
|
|
}
|
|
|
|
// AddTaskHostlist creates multiple tasks to be run on a multiple hosts
|
|
func (p *Pool) AddTaskHostlist(task *Task, hosts []string) {
|
|
for _, host := range hosts {
|
|
t := &Task{
|
|
Hostname: host,
|
|
LocalFilename: task.LocalFilename,
|
|
RemoteFilename: task.RemoteFilename,
|
|
Cmd: task.Cmd,
|
|
WG: task.WG,
|
|
}
|
|
p.AddTask(t)
|
|
}
|
|
}
|