package performance

import (
	"context"
	"fmt"
	"sync"
	"time"

	"github.com/sirupsen/logrus"
)

// ScalingManager manages horizontal scaling and load balancing.
type ScalingManager struct {
	nodes        map[string]*Node
	loadBalancer *LoadBalancer
	autoscaler   *AutoScaler
	mu           sync.RWMutex
	logger       *logrus.Logger
	enabled      bool
	ctx          context.Context
	cancel       context.CancelFunc
}

// Node represents a compute node in the cluster.
type Node struct {
	ID           string                 `json:"id"`
	Hostname     string                 `json:"hostname"`
	Address      string                 `json:"address"`
	Port         int                    `json:"port"`
	Status       NodeStatus             `json:"status"`
	Capabilities map[string]interface{} `json:"capabilities"`
	Metrics      *NodeMetrics           `json:"metrics"`
	LastSeen     time.Time              `json:"last_seen"`
	Tags         map[string]string      `json:"tags"`
}

// NodeStatus represents the status of a node.
type NodeStatus string

const (
	NodeStatusOnline  NodeStatus = "online"
	NodeStatusOffline NodeStatus = "offline"
	NodeStatusBusy    NodeStatus = "busy"
	NodeStatusError   NodeStatus = "error"
)

// NodeMetrics represents performance metrics for a node.
type NodeMetrics struct {
	CPUUsage    float64   `json:"cpu_usage"`
	MemoryUsage float64   `json:"memory_usage"`
	DiskUsage   float64   `json:"disk_usage"`
	LoadAverage float64   `json:"load_average"`
	ActiveJobs  int       `json:"active_jobs"`
	MaxJobs     int       `json:"max_jobs"`
	LastUpdate  time.Time `json:"last_update"`
}

// LoadBalancer manages load distribution across nodes.
type LoadBalancer struct {
	strategy LoadBalancingStrategy
	nodes    map[string]*Node
	mu       sync.RWMutex
	logger   *logrus.Logger
}

// LoadBalancingStrategy defines the interface for load balancing strategies.
type LoadBalancingStrategy interface {
	SelectNode(nodes map[string]*Node, request *LoadRequest) (*Node, error)
	GetName() string
}

// LoadRequest represents a load balancing request.
type LoadRequest struct {
	Type         string                 `json:"type"`
	Priority     int                    `json:"priority"`
	Requirements map[string]interface{} `json:"requirements"`
	Metadata     map[string]interface{} `json:"metadata"`
}

// AutoScaler manages automatic scaling of the cluster.
type AutoScaler struct {
	config  *AutoScalerConfig
	nodes   map[string]*Node
	mu      sync.RWMutex
	logger  *logrus.Logger
	enabled bool
	ctx     context.Context
	cancel  context.CancelFunc
}

// AutoScalerConfig represents auto-scaling configuration.
type AutoScalerConfig struct {
	Enabled            bool          `yaml:"enabled"`
	MinNodes           int           `yaml:"min_nodes"`
	MaxNodes           int           `yaml:"max_nodes"`
	ScaleUpThreshold   float64       `yaml:"scale_up_threshold"`
	ScaleDownThreshold float64       `yaml:"scale_down_threshold"`
	ScaleUpCooldown    time.Duration `yaml:"scale_up_cooldown"`
	ScaleDownCooldown  time.Duration `yaml:"scale_down_cooldown"`
	CheckInterval      time.Duration `yaml:"check_interval"`
}

// NewScalingManager creates a new scaling manager. The context pair is used
// to stop the health-monitoring goroutine on shutdown.
func NewScalingManager(enabled bool) *ScalingManager {
	ctx, cancel := context.WithCancel(context.Background())
	sm := &ScalingManager{
		nodes:   make(map[string]*Node),
		logger:  logrus.New(),
		enabled: enabled,
		ctx:     ctx,
		cancel:  cancel,
	}

	// Initialize the load balancer.
	sm.loadBalancer = NewLoadBalancer(sm.logger)

	// Initialize the auto-scaler with default thresholds.
	sm.autoscaler = NewAutoScaler(&AutoScalerConfig{
		Enabled:            true,
		MinNodes:           2,
		MaxNodes:           10,
		ScaleUpThreshold:   80.0,
		ScaleDownThreshold: 20.0,
		ScaleUpCooldown:    5 * time.Minute,
		ScaleDownCooldown:  10 * time.Minute,
		CheckInterval:      30 * time.Second,
	}, sm.logger)

	return sm
}
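// Illustrative wiring (a sketch, not an additional API): construct a manager,
// start it, and make sure it is stopped on shutdown. Only the exported
// functions defined in this file are assumed.
//
//	sm := NewScalingManager(true)
//	if err := sm.Start(); err != nil {
//		// handle startup failure
//	}
//	defer sm.Stop()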
// RegisterNode registers a new node in the cluster.
func (sm *ScalingManager) RegisterNode(node *Node) error {
	sm.mu.Lock()
	defer sm.mu.Unlock()

	// Validate required fields.
	if node.ID == "" {
		return fmt.Errorf("node ID is required")
	}
	if node.Address == "" {
		return fmt.Errorf("node address is required")
	}

	// Reject duplicate registrations.
	if _, exists := sm.nodes[node.ID]; exists {
		return fmt.Errorf("node %s already exists", node.ID)
	}

	// Apply defaults for optional fields.
	if node.Status == "" {
		node.Status = NodeStatusOnline
	}
	if node.Capabilities == nil {
		node.Capabilities = make(map[string]interface{})
	}
	if node.Tags == nil {
		node.Tags = make(map[string]string)
	}
	if node.Metrics == nil {
		node.Metrics = &NodeMetrics{
			LastUpdate: time.Now(),
		}
	}
	node.LastSeen = time.Now()

	sm.nodes[node.ID] = node

	// Keep the load balancer's view of the cluster in sync.
	sm.loadBalancer.AddNode(node)

	sm.logger.Infof("Registered node: %s (%s)", node.ID, node.Hostname)
	return nil
}

// UnregisterNode removes a node from the cluster.
func (sm *ScalingManager) UnregisterNode(nodeID string) error {
	sm.mu.Lock()
	defer sm.mu.Unlock()

	node, exists := sm.nodes[nodeID]
	if !exists {
		return fmt.Errorf("node %s not found", nodeID)
	}

	delete(sm.nodes, nodeID)
	sm.loadBalancer.RemoveNode(nodeID)

	sm.logger.Infof("Unregistered node: %s (%s)", node.ID, node.Hostname)
	return nil
}

// UpdateNodeMetrics updates metrics for a specific node.
func (sm *ScalingManager) UpdateNodeMetrics(nodeID string, metrics *NodeMetrics) error {
	sm.mu.Lock()
	defer sm.mu.Unlock()

	node, exists := sm.nodes[nodeID]
	if !exists {
		return fmt.Errorf("node %s not found", nodeID)
	}

	metrics.LastUpdate = time.Now()
	node.Metrics = metrics
	node.LastSeen = time.Now()

	// Keep the load balancer's view of the node current.
	sm.loadBalancer.UpdateNode(node)

	return nil
}

// GetNode returns a node by ID.
func (sm *ScalingManager) GetNode(nodeID string) (*Node, bool) {
	sm.mu.RLock()
	defer sm.mu.RUnlock()

	node, exists := sm.nodes[nodeID]
	return node, exists
}

// GetAllNodes returns all registered nodes.
func (sm *ScalingManager) GetAllNodes() map[string]*Node {
	sm.mu.RLock()
	defer sm.mu.RUnlock()

	// Return a shallow copy of the map so callers can iterate without
	// holding the lock.
	nodes := make(map[string]*Node, len(sm.nodes))
	for k, v := range sm.nodes {
		nodes[k] = v
	}
	return nodes
}

// GetAvailableNodes returns all online nodes with spare job capacity.
func (sm *ScalingManager) GetAvailableNodes() []*Node {
	sm.mu.RLock()
	defer sm.mu.RUnlock()

	var available []*Node
	for _, node := range sm.nodes {
		if node.Status == NodeStatusOnline && node.Metrics != nil &&
			node.Metrics.ActiveJobs < node.Metrics.MaxJobs {
			available = append(available, node)
		}
	}
	return available
}

// SelectNode selects a node for a specific request.
func (sm *ScalingManager) SelectNode(request *LoadRequest) (*Node, error) {
	return sm.loadBalancer.SelectNode(request)
}

// Start starts the scaling manager.
func (sm *ScalingManager) Start() error {
	if !sm.enabled {
		sm.logger.Info("Scaling manager is disabled")
		return nil
	}

	sm.logger.Info("Starting scaling manager")

	// Start the auto-scaler.
	if err := sm.autoscaler.Start(); err != nil {
		return fmt.Errorf("failed to start auto-scaler: %w", err)
	}

	// Start node health monitoring in the background.
	go sm.monitorNodeHealth()

	return nil
}

// Stop stops the scaling manager.
func (sm *ScalingManager) Stop() error {
	sm.logger.Info("Stopping scaling manager")

	// Stop the health-monitoring goroutine.
	sm.cancel()

	// Stop the auto-scaler.
	if err := sm.autoscaler.Stop(); err != nil {
		return fmt.Errorf("failed to stop auto-scaler: %w", err)
	}

	return nil
}

// monitorNodeHealth checks the health of all nodes on a fixed interval until
// the manager is stopped.
func (sm *ScalingManager) monitorNodeHealth() {
	ticker := time.NewTicker(30 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-sm.ctx.Done():
			return
		case <-ticker.C:
			sm.checkNodeHealth()
		}
	}
}

// checkNodeHealth checks the health of all nodes.
func (sm *ScalingManager) checkNodeHealth() {
	nodes := sm.GetAllNodes()

	for _, node := range nodes {
		// Mark nodes offline when their heartbeat is overdue.
		if time.Since(node.LastSeen) > 2*time.Minute {
			sm.logger.Warnf("Node %s appears to be unresponsive", node.ID)
			sm.markNodeOffline(node.ID)
		}

		// Warn when a node has stopped reporting metrics.
		if node.Metrics != nil && time.Since(node.Metrics.LastUpdate) > 5*time.Minute {
			sm.logger.Warnf("Node %s metrics are stale", node.ID)
		}
	}
}
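// Illustrative flow (a sketch with hypothetical values): register a worker,
// report fresh metrics for it, then route a request through the load
// balancer. Fields not set here are filled with defaults by RegisterNode.
//
//	node := &Node{ID: "node-1", Hostname: "worker-1", Address: "10.0.0.5",
//		Port: 8080, Metrics: &NodeMetrics{MaxJobs: 8}}
//	if err := sm.RegisterNode(node); err != nil {
//		// handle registration failure
//	}
//	_ = sm.UpdateNodeMetrics("node-1", &NodeMetrics{CPUUsage: 35.0, ActiveJobs: 2, MaxJobs: 8})
//	selected, err := sm.SelectNode(&LoadRequest{Type: "transcode", Priority: 1})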
// markNodeOffline marks a node as offline.
func (sm *ScalingManager) markNodeOffline(nodeID string) {
	sm.mu.Lock()
	defer sm.mu.Unlock()

	if node, exists := sm.nodes[nodeID]; exists {
		node.Status = NodeStatusOffline
		sm.logger.Infof("Marked node %s as offline", nodeID)
	}
}

// GetClusterStatus returns the current status of the cluster.
func (sm *ScalingManager) GetClusterStatus() map[string]interface{} {
	nodes := sm.GetAllNodes()

	// Tally node states and job capacity in one pass.
	var online, offline, busy, errored, totalCapacity, usedCapacity int
	for _, node := range nodes {
		switch node.Status {
		case NodeStatusOnline:
			online++
		case NodeStatusOffline:
			offline++
		case NodeStatusBusy:
			busy++
		case NodeStatusError:
			errored++
		}
		if node.Metrics != nil {
			totalCapacity += node.Metrics.MaxJobs
			usedCapacity += node.Metrics.ActiveJobs
		}
	}

	status := map[string]interface{}{
		"total_nodes":    len(nodes),
		"online_nodes":   online,
		"offline_nodes":  offline,
		"busy_nodes":     busy,
		"error_nodes":    errored,
		"total_capacity": totalCapacity,
		"used_capacity":  usedCapacity,
		"timestamp":      time.Now(),
	}

	// Report utilization as a percentage of total job capacity.
	if totalCapacity > 0 {
		status["utilization_percentage"] = float64(usedCapacity) / float64(totalCapacity) * 100
	}

	return status
}

// NewLoadBalancer creates a new load balancer.
func NewLoadBalancer(logger *logrus.Logger) *LoadBalancer {
	return &LoadBalancer{
		strategy: NewRoundRobinStrategy(),
		nodes:    make(map[string]*Node),
		logger:   logger,
	}
}

// AddNode adds a node to the load balancer.
func (lb *LoadBalancer) AddNode(node *Node) {
	lb.mu.Lock()
	defer lb.mu.Unlock()

	lb.nodes[node.ID] = node
	lb.logger.Debugf("Added node %s to load balancer", node.ID)
}

// RemoveNode removes a node from the load balancer.
func (lb *LoadBalancer) RemoveNode(nodeID string) {
	lb.mu.Lock()
	defer lb.mu.Unlock()

	delete(lb.nodes, nodeID)
	lb.logger.Debugf("Removed node %s from load balancer", nodeID)
}

// UpdateNode updates a node in the load balancer.
func (lb *LoadBalancer) UpdateNode(node *Node) {
	lb.mu.Lock()
	defer lb.mu.Unlock()

	lb.nodes[node.ID] = node
}

// SelectNode selects a node using the configured strategy.
func (lb *LoadBalancer) SelectNode(request *LoadRequest) (*Node, error) {
	lb.mu.RLock()
	defer lb.mu.RUnlock()

	// Consider only online nodes with spare job capacity.
	availableNodes := make(map[string]*Node)
	for id, node := range lb.nodes {
		if node.Status == NodeStatusOnline && node.Metrics != nil &&
			node.Metrics.ActiveJobs < node.Metrics.MaxJobs {
			availableNodes[id] = node
		}
	}

	if len(availableNodes) == 0 {
		return nil, fmt.Errorf("no available nodes")
	}

	return lb.strategy.SelectNode(availableNodes, request)
}

// SetStrategy sets the load balancing strategy.
func (lb *LoadBalancer) SetStrategy(strategy LoadBalancingStrategy) {
	lb.mu.Lock()
	defer lb.mu.Unlock()

	lb.strategy = strategy
	lb.logger.Infof("Load balancing strategy changed to: %s", strategy.GetName())
}
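// NewRoundRobinStrategy is referenced by NewLoadBalancer above but not defined
// in this file. The implementation below is a minimal sketch, assuming the
// concrete type does not live elsewhere in the package. Because Go randomizes
// map iteration order, the rotation tracks the last-served node ID instead of
// a positional index so selection stays deterministic.
type RoundRobinStrategy struct {
	mu   sync.Mutex
	last string
}

// NewRoundRobinStrategy creates a round-robin load balancing strategy.
func NewRoundRobinStrategy() *RoundRobinStrategy {
	return &RoundRobinStrategy{}
}

// SelectNode returns the node whose ID follows the last-served ID, wrapping
// around to the smallest ID at the end of the rotation.
func (s *RoundRobinStrategy) SelectNode(nodes map[string]*Node, request *LoadRequest) (*Node, error) {
	if len(nodes) == 0 {
		return nil, fmt.Errorf("no available nodes")
	}

	s.mu.Lock()
	defer s.mu.Unlock()

	// first is the smallest ID overall; next is the smallest ID strictly
	// greater than the last-served ID.
	var first, next string
	for id := range nodes {
		if first == "" || id < first {
			first = id
		}
		if id > s.last && (next == "" || id < next) {
			next = id
		}
	}
	if next == "" {
		next = first
	}
	s.last = next
	return nodes[next], nil
}

// GetName returns the strategy name.
func (s *RoundRobinStrategy) GetName() string {
	return "round-robin"
}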
// NewAutoScaler creates a new auto-scaler.
func NewAutoScaler(config *AutoScalerConfig, logger *logrus.Logger) *AutoScaler {
	ctx, cancel := context.WithCancel(context.Background())

	return &AutoScaler{
		config:  config,
		nodes:   make(map[string]*Node),
		logger:  logger,
		enabled: config.Enabled,
		ctx:     ctx,
		cancel:  cancel,
	}
}

// Start starts the auto-scaler.
func (as *AutoScaler) Start() error {
	if !as.enabled {
		as.logger.Info("Auto-scaler is disabled")
		return nil
	}

	as.logger.Info("Starting auto-scaler")

	// Run periodic scaling checks in the background.
	go as.runScalingChecks()

	return nil
}

// Stop stops the auto-scaler.
func (as *AutoScaler) Stop() error {
	as.logger.Info("Stopping auto-scaler")
	as.cancel()
	return nil
}

// runScalingChecks runs periodic scaling checks until the auto-scaler is
// stopped.
func (as *AutoScaler) runScalingChecks() {
	ticker := time.NewTicker(as.config.CheckInterval)
	defer ticker.Stop()

	for {
		select {
		case <-as.ctx.Done():
			return
		case <-ticker.C:
			as.checkScaling()
		}
	}
}

// checkScaling checks whether scaling is needed. The cluster status would
// typically come from the scaling manager; the decision logic is still a
// placeholder.
func (as *AutoScaler) checkScaling() {
	as.logger.Debug("Running scaling check")

	// TODO: check whether we need to scale up or down, and implement the
	// scaling logic based on node metrics.
}
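// checkScaling above leaves the decision logic open. The function below is a
// minimal sketch of the threshold comparison it could apply, assuming average
// CPU usage across nodes as the scaling signal; the name desiredAction and
// its string results are hypothetical, and cooldown handling and node
// provisioning are intentionally out of scope.
func (as *AutoScaler) desiredAction(avgCPU float64, nodeCount int) string {
	switch {
	case avgCPU > as.config.ScaleUpThreshold && nodeCount < as.config.MaxNodes:
		return "scale-up"
	case avgCPU < as.config.ScaleDownThreshold && nodeCount > as.config.MinNodes:
		return "scale-down"
	default:
		return "hold"
	}
}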