1
0
mirror of https://github.com/kubernetes-sigs/descheduler.git synced 2026-01-28 14:41:10 +01:00

[nodeutilization]: actual usage client through kubernetes metrics

This commit is contained in:
Jan Chaloupka
2024-11-07 16:23:16 +01:00
parent c86416612e
commit 6567f01e86
25 changed files with 1643 additions and 134 deletions

View File

@@ -19,16 +19,16 @@ package client
import (
"fmt"
clientset "k8s.io/client-go/kubernetes"
componentbaseconfig "k8s.io/component-base/config"
// Ensure to load all auth plugins.
clientset "k8s.io/client-go/kubernetes"
_ "k8s.io/client-go/plugin/pkg/client/auth"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
componentbaseconfig "k8s.io/component-base/config"
metricsclient "k8s.io/metrics/pkg/client/clientset/versioned"
)
func CreateClient(clientConnection componentbaseconfig.ClientConnectionConfiguration, userAgt string) (clientset.Interface, error) {
func createConfig(clientConnection componentbaseconfig.ClientConnectionConfiguration, userAgt string) (*rest.Config, error) {
var cfg *rest.Config
if len(clientConnection.Kubeconfig) != 0 {
master, err := GetMasterFromKubeconfig(clientConnection.Kubeconfig)
@@ -56,9 +56,28 @@ func CreateClient(clientConnection componentbaseconfig.ClientConnectionConfigura
cfg = rest.AddUserAgent(cfg, userAgt)
}
return cfg, nil
}
func CreateClient(clientConnection componentbaseconfig.ClientConnectionConfiguration, userAgt string) (clientset.Interface, error) {
cfg, err := createConfig(clientConnection, userAgt)
if err != nil {
return nil, fmt.Errorf("unable to create config: %v", err)
}
return clientset.NewForConfig(cfg)
}
func CreateMetricsClient(clientConnection componentbaseconfig.ClientConnectionConfiguration, userAgt string) (metricsclient.Interface, error) {
cfg, err := createConfig(clientConnection, userAgt)
if err != nil {
return nil, fmt.Errorf("unable to create config: %v", err)
}
// Create the metrics clientset to access the metrics.k8s.io API
return metricsclient.NewForConfig(cfg)
}
func GetMasterFromKubeconfig(filename string) (string, error) {
config, err := clientcmd.LoadFromFile(filename)
if err != nil {

View File

@@ -23,44 +23,43 @@ import (
"strconv"
"time"
policyv1 "k8s.io/api/policy/v1"
schedulingv1 "k8s.io/api/scheduling/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
v1 "k8s.io/api/core/v1"
policy "k8s.io/api/policy/v1"
policyv1 "k8s.io/api/policy/v1"
schedulingv1 "k8s.io/api/scheduling/v1"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
utilversion "k8s.io/apimachinery/pkg/util/version"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/discovery"
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
fakeclientset "k8s.io/client-go/kubernetes/fake"
core "k8s.io/client-go/testing"
"k8s.io/client-go/tools/events"
componentbaseconfig "k8s.io/component-base/config"
"k8s.io/klog/v2"
v1 "k8s.io/api/core/v1"
policy "k8s.io/api/policy/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
utilversion "k8s.io/apimachinery/pkg/util/version"
"k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes"
fakeclientset "k8s.io/client-go/kubernetes/fake"
core "k8s.io/client-go/testing"
"sigs.k8s.io/descheduler/pkg/descheduler/client"
eutils "sigs.k8s.io/descheduler/pkg/descheduler/evictions/utils"
nodeutil "sigs.k8s.io/descheduler/pkg/descheduler/node"
"sigs.k8s.io/descheduler/pkg/tracing"
"sigs.k8s.io/descheduler/pkg/utils"
"sigs.k8s.io/descheduler/pkg/version"
"sigs.k8s.io/descheduler/cmd/descheduler/app/options"
"sigs.k8s.io/descheduler/metrics"
"sigs.k8s.io/descheduler/pkg/api"
"sigs.k8s.io/descheduler/pkg/descheduler/client"
"sigs.k8s.io/descheduler/pkg/descheduler/evictions"
eutils "sigs.k8s.io/descheduler/pkg/descheduler/evictions/utils"
"sigs.k8s.io/descheduler/pkg/descheduler/metricscollector"
nodeutil "sigs.k8s.io/descheduler/pkg/descheduler/node"
podutil "sigs.k8s.io/descheduler/pkg/descheduler/pod"
"sigs.k8s.io/descheduler/pkg/framework/pluginregistry"
frameworkprofile "sigs.k8s.io/descheduler/pkg/framework/profile"
frameworktypes "sigs.k8s.io/descheduler/pkg/framework/types"
"sigs.k8s.io/descheduler/pkg/tracing"
"sigs.k8s.io/descheduler/pkg/utils"
"sigs.k8s.io/descheduler/pkg/version"
)
type eprunner func(ctx context.Context, nodes []*v1.Node) *frameworktypes.Status
@@ -79,6 +78,7 @@ type descheduler struct {
eventRecorder events.EventRecorder
podEvictor *evictions.PodEvictor
podEvictionReactionFnc func(*fakeclientset.Clientset) func(action core.Action) (bool, runtime.Object, error)
metricsCollector *metricscollector.MetricsCollector
}
type informerResources struct {
@@ -163,6 +163,19 @@ func newDescheduler(ctx context.Context, rs *options.DeschedulerServer, deschedu
return nil, err
}
var metricsCollector *metricscollector.MetricsCollector
if deschedulerPolicy.MetricsCollector.Enabled {
nodeSelector := labels.Everything()
if deschedulerPolicy.NodeSelector != nil {
sel, err := labels.Parse(*deschedulerPolicy.NodeSelector)
if err != nil {
return nil, err
}
nodeSelector = sel
}
metricsCollector = metricscollector.NewMetricsCollector(sharedInformerFactory.Core().V1().Nodes().Lister(), rs.MetricsClient, nodeSelector)
}
return &descheduler{
rs: rs,
ir: ir,
@@ -172,6 +185,7 @@ func newDescheduler(ctx context.Context, rs *options.DeschedulerServer, deschedu
eventRecorder: eventRecorder,
podEvictor: podEvictor,
podEvictionReactionFnc: podEvictionReactionFnc,
metricsCollector: metricsCollector,
}, nil
}
@@ -251,6 +265,7 @@ func (d *descheduler) runProfiles(ctx context.Context, client clientset.Interfac
frameworkprofile.WithSharedInformerFactory(d.sharedInformerFactory),
frameworkprofile.WithPodEvictor(d.podEvictor),
frameworkprofile.WithGetPodsAssignedToNodeFnc(d.getPodsAssignedToNode),
frameworkprofile.WithMetricsCollector(d.metricsCollector),
)
if err != nil {
klog.ErrorS(err, "unable to create a profile", "profile", profile.Name)
@@ -315,6 +330,14 @@ func Run(ctx context.Context, rs *options.DeschedulerServer) error {
return err
}
if deschedulerPolicy.MetricsCollector.Enabled {
metricsClient, err := client.CreateMetricsClient(clientConnection, "descheduler")
if err != nil {
return err
}
rs.MetricsClient = metricsClient
}
runFn := func() error {
return RunDeschedulerStrategies(ctx, rs, deschedulerPolicy, evictionPolicyGroupVersion)
}
@@ -423,6 +446,20 @@ func RunDeschedulerStrategies(ctx context.Context, rs *options.DeschedulerServer
sharedInformerFactory.WaitForCacheSync(ctx.Done())
descheduler.podEvictor.WaitForEventHandlersSync(ctx)
if deschedulerPolicy.MetricsCollector.Enabled {
go func() {
klog.V(2).Infof("Starting metrics collector")
descheduler.metricsCollector.Run(ctx)
klog.V(2).Infof("Stopped metrics collector")
}()
klog.V(2).Infof("Waiting for metrics collector to sync")
if err := wait.PollWithContext(ctx, time.Second, time.Minute, func(context.Context) (done bool, err error) {
return descheduler.metricsCollector.HasSynced(), nil
}); err != nil {
return fmt.Errorf("unable to wait for metrics collector to sync: %v", err)
}
}
wait.NonSlidingUntil(func() {
// A next context is created here intentionally to avoid nesting the spans via context.
sCtx, sSpan := tracing.Tracer().Start(ctx, "NonSlidingUntil")

View File

@@ -14,6 +14,7 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
apiversion "k8s.io/apimachinery/pkg/version"
fakediscovery "k8s.io/client-go/discovery/fake"
"k8s.io/client-go/informers"
@@ -21,13 +22,18 @@ import (
core "k8s.io/client-go/testing"
"k8s.io/component-base/featuregate"
"k8s.io/klog/v2"
"k8s.io/metrics/pkg/apis/metrics/v1beta1"
metricsclient "k8s.io/metrics/pkg/client/clientset/versioned"
fakemetricsclient "k8s.io/metrics/pkg/client/clientset/versioned/fake"
utilptr "k8s.io/utils/ptr"
"sigs.k8s.io/descheduler/cmd/descheduler/app/options"
"sigs.k8s.io/descheduler/pkg/api"
"sigs.k8s.io/descheduler/pkg/descheduler/evictions"
"sigs.k8s.io/descheduler/pkg/features"
"sigs.k8s.io/descheduler/pkg/framework/pluginregistry"
"sigs.k8s.io/descheduler/pkg/framework/plugins/defaultevictor"
"sigs.k8s.io/descheduler/pkg/framework/plugins/nodeutilization"
"sigs.k8s.io/descheduler/pkg/framework/plugins/removeduplicates"
"sigs.k8s.io/descheduler/pkg/framework/plugins/removepodsviolatingnodetaints"
"sigs.k8s.io/descheduler/pkg/utils"
@@ -45,6 +51,8 @@ var (
Message: "admission webhook \"virt-launcher-eviction-interceptor.kubevirt.io\" denied the request: Eviction triggered evacuation of VMI",
},
}
nodesgvr = schema.GroupVersionResource{Group: "metrics.k8s.io", Version: "v1beta1", Resource: "nodes"}
podsgvr = schema.GroupVersionResource{Group: "metrics.k8s.io", Version: "v1beta1", Resource: "pods"}
)
func initFeatureGates() featuregate.FeatureGate {
@@ -60,6 +68,7 @@ func initPluginRegistry() {
pluginregistry.Register(removeduplicates.PluginName, removeduplicates.New, &removeduplicates.RemoveDuplicates{}, &removeduplicates.RemoveDuplicatesArgs{}, removeduplicates.ValidateRemoveDuplicatesArgs, removeduplicates.SetDefaults_RemoveDuplicatesArgs, pluginregistry.PluginRegistry)
pluginregistry.Register(defaultevictor.PluginName, defaultevictor.New, &defaultevictor.DefaultEvictor{}, &defaultevictor.DefaultEvictorArgs{}, defaultevictor.ValidateDefaultEvictorArgs, defaultevictor.SetDefaults_DefaultEvictorArgs, pluginregistry.PluginRegistry)
pluginregistry.Register(removepodsviolatingnodetaints.PluginName, removepodsviolatingnodetaints.New, &removepodsviolatingnodetaints.RemovePodsViolatingNodeTaints{}, &removepodsviolatingnodetaints.RemovePodsViolatingNodeTaintsArgs{}, removepodsviolatingnodetaints.ValidateRemovePodsViolatingNodeTaintsArgs, removepodsviolatingnodetaints.SetDefaults_RemovePodsViolatingNodeTaintsArgs, pluginregistry.PluginRegistry)
pluginregistry.Register(nodeutilization.LowNodeUtilizationPluginName, nodeutilization.NewLowNodeUtilization, &nodeutilization.LowNodeUtilization{}, &nodeutilization.LowNodeUtilizationArgs{}, nodeutilization.ValidateLowNodeUtilizationArgs, nodeutilization.SetDefaults_LowNodeUtilizationArgs, pluginregistry.PluginRegistry)
}
func removePodsViolatingNodeTaintsPolicy() *api.DeschedulerPolicy {
@@ -126,7 +135,45 @@ func removeDuplicatesPolicy() *api.DeschedulerPolicy {
}
}
func initDescheduler(t *testing.T, ctx context.Context, featureGates featuregate.FeatureGate, internalDeschedulerPolicy *api.DeschedulerPolicy, objects ...runtime.Object) (*options.DeschedulerServer, *descheduler, *fakeclientset.Clientset) {
func lowNodeUtilizationPolicy(thresholds, targetThresholds api.ResourceThresholds, metricsEnabled bool) *api.DeschedulerPolicy {
return &api.DeschedulerPolicy{
Profiles: []api.DeschedulerProfile{
{
Name: "Profile",
PluginConfigs: []api.PluginConfig{
{
Name: nodeutilization.LowNodeUtilizationPluginName,
Args: &nodeutilization.LowNodeUtilizationArgs{
Thresholds: thresholds,
TargetThresholds: targetThresholds,
MetricsUtilization: nodeutilization.MetricsUtilization{
MetricsServer: metricsEnabled,
},
},
},
{
Name: defaultevictor.PluginName,
Args: &defaultevictor.DefaultEvictorArgs{},
},
},
Plugins: api.Plugins{
Filter: api.PluginSet{
Enabled: []string{
defaultevictor.PluginName,
},
},
Balance: api.PluginSet{
Enabled: []string{
nodeutilization.LowNodeUtilizationPluginName,
},
},
},
},
},
}
}
func initDescheduler(t *testing.T, ctx context.Context, featureGates featuregate.FeatureGate, internalDeschedulerPolicy *api.DeschedulerPolicy, metricsClient metricsclient.Interface, objects ...runtime.Object) (*options.DeschedulerServer, *descheduler, *fakeclientset.Clientset) {
client := fakeclientset.NewSimpleClientset(objects...)
eventClient := fakeclientset.NewSimpleClientset(objects...)
@@ -137,6 +184,7 @@ func initDescheduler(t *testing.T, ctx context.Context, featureGates featuregate
rs.Client = client
rs.EventClient = eventClient
rs.DefaultFeatureGates = featureGates
rs.MetricsClient = metricsClient
sharedInformerFactory := informers.NewSharedInformerFactoryWithOptions(rs.Client, 0, informers.WithTransform(trimManagedFields))
eventBroadcaster, eventRecorder := utils.GetRecorderAndBroadcaster(ctx, client)
@@ -441,7 +489,7 @@ func TestPodEvictorReset(t *testing.T) {
internalDeschedulerPolicy := removePodsViolatingNodeTaintsPolicy()
ctxCancel, cancel := context.WithCancel(ctx)
rs, descheduler, client := initDescheduler(t, ctxCancel, initFeatureGates(), internalDeschedulerPolicy, node1, node2, p1, p2)
rs, descheduler, client := initDescheduler(t, ctxCancel, initFeatureGates(), internalDeschedulerPolicy, nil, node1, node2, p1, p2)
defer cancel()
var evictedPods []string
@@ -543,7 +591,7 @@ func TestEvictionRequestsCache(t *testing.T) {
featureGates.Add(map[featuregate.Feature]featuregate.FeatureSpec{
features.EvictionsInBackground: {Default: true, PreRelease: featuregate.Alpha},
})
_, descheduler, client := initDescheduler(t, ctxCancel, featureGates, internalDeschedulerPolicy, node1, node2, p1, p2, p3, p4)
_, descheduler, client := initDescheduler(t, ctxCancel, featureGates, internalDeschedulerPolicy, nil, node1, node2, p1, p2, p3, p4)
defer cancel()
var fakeEvictedPods []string
@@ -685,7 +733,7 @@ func TestDeschedulingLimits(t *testing.T) {
featureGates.Add(map[featuregate.Feature]featuregate.FeatureSpec{
features.EvictionsInBackground: {Default: true, PreRelease: featuregate.Alpha},
})
_, descheduler, client := initDescheduler(t, ctxCancel, featureGates, tc.policy, node1, node2)
_, descheduler, client := initDescheduler(t, ctxCancel, featureGates, tc.policy, nil, node1, node2)
defer cancel()
var fakeEvictedPods []string
@@ -737,3 +785,81 @@ func TestDeschedulingLimits(t *testing.T) {
})
}
}
func TestLoadAwareDescheduling(t *testing.T) {
initPluginRegistry()
ownerRef1 := test.GetReplicaSetOwnerRefList()
updatePod := func(pod *v1.Pod) {
pod.ObjectMeta.OwnerReferences = ownerRef1
}
ctx := context.Background()
node1 := test.BuildTestNode("n1", 2000, 3000, 10, taintNodeNoSchedule)
node2 := test.BuildTestNode("n2", 2000, 3000, 10, nil)
nodes := []*v1.Node{node1, node2}
p1 := test.BuildTestPod("p1", 300, 0, node1.Name, updatePod)
p2 := test.BuildTestPod("p2", 300, 0, node1.Name, updatePod)
p3 := test.BuildTestPod("p3", 300, 0, node1.Name, updatePod)
p4 := test.BuildTestPod("p4", 300, 0, node1.Name, updatePod)
p5 := test.BuildTestPod("p5", 300, 0, node1.Name, updatePod)
nodemetricses := []*v1beta1.NodeMetrics{
test.BuildNodeMetrics("n1", 2400, 3000),
test.BuildNodeMetrics("n2", 400, 0),
}
podmetricses := []*v1beta1.PodMetrics{
test.BuildPodMetrics("p1", 400, 0),
test.BuildPodMetrics("p2", 400, 0),
test.BuildPodMetrics("p3", 400, 0),
test.BuildPodMetrics("p4", 400, 0),
test.BuildPodMetrics("p5", 400, 0),
}
metricsClientset := fakemetricsclient.NewSimpleClientset()
for _, nodemetrics := range nodemetricses {
metricsClientset.Tracker().Create(nodesgvr, nodemetrics, "")
}
for _, podmetrics := range podmetricses {
metricsClientset.Tracker().Create(podsgvr, podmetrics, podmetrics.Namespace)
}
policy := lowNodeUtilizationPolicy(
api.ResourceThresholds{
v1.ResourceCPU: 30,
v1.ResourcePods: 30,
},
api.ResourceThresholds{
v1.ResourceCPU: 50,
v1.ResourcePods: 50,
},
true, // enabled metrics utilization
)
policy.MetricsCollector.Enabled = true
ctxCancel, cancel := context.WithCancel(ctx)
_, descheduler, _ := initDescheduler(
t,
ctxCancel,
initFeatureGates(),
policy,
metricsClientset,
node1, node2, p1, p2, p3, p4, p5)
defer cancel()
// This needs to be run since the metrics collector is started
// after newDescheduler in RunDeschedulerStrategies.
descheduler.metricsCollector.Collect(ctx)
err := descheduler.runDeschedulerLoop(ctx, nodes)
if err != nil {
t.Fatalf("Unable to run a descheduling loop: %v", err)
}
totalEs := descheduler.podEvictor.TotalEvicted()
if totalEs != 2 {
t.Fatalf("Expected %v evictions in total, got %v instead", 2, totalEs)
}
t.Logf("Total evictions: %v", totalEs)
}

View File

@@ -0,0 +1,151 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package metricscollector
import (
"context"
"fmt"
"math"
"sync"
"time"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/wait"
listercorev1 "k8s.io/client-go/listers/core/v1"
"k8s.io/klog/v2"
metricsclient "k8s.io/metrics/pkg/client/clientset/versioned"
utilptr "k8s.io/utils/ptr"
)
const (
beta float64 = 0.9
)
type MetricsCollector struct {
nodeLister listercorev1.NodeLister
metricsClientset metricsclient.Interface
nodeSelector labels.Selector
nodes map[string]map[v1.ResourceName]*resource.Quantity
mu sync.RWMutex
// hasSynced signals at least one sync succeeded
hasSynced bool
}
func NewMetricsCollector(nodeLister listercorev1.NodeLister, metricsClientset metricsclient.Interface, nodeSelector labels.Selector) *MetricsCollector {
return &MetricsCollector{
nodeLister: nodeLister,
metricsClientset: metricsClientset,
nodeSelector: nodeSelector,
nodes: make(map[string]map[v1.ResourceName]*resource.Quantity),
}
}
func (mc *MetricsCollector) Run(ctx context.Context) {
wait.NonSlidingUntil(func() {
mc.Collect(ctx)
}, 5*time.Second, ctx.Done())
}
// During experiments rounding to int error causes weightedAverage to never
// reach value even when weightedAverage is repeated many times in a row.
// The difference between the limit and computed average stops within 5 units.
// Nevertheless, the value is expected to change in time. So the weighted
// average nevers gets a chance to converge. Which makes the computed
// error negligible.
// The speed of convergence depends on how often the metrics collector
// syncs with the current value. Currently, the interval is set to 5s.
func weightedAverage(prevValue, value int64) int64 {
return int64(math.Round(beta*float64(prevValue) + (1-beta)*float64(value)))
}
func (mc *MetricsCollector) AllNodesUsage() (map[string]map[v1.ResourceName]*resource.Quantity, error) {
mc.mu.RLock()
defer mc.mu.RUnlock()
allNodesUsage := make(map[string]map[v1.ResourceName]*resource.Quantity)
for nodeName := range mc.nodes {
allNodesUsage[nodeName] = map[v1.ResourceName]*resource.Quantity{
v1.ResourceCPU: utilptr.To[resource.Quantity](mc.nodes[nodeName][v1.ResourceCPU].DeepCopy()),
v1.ResourceMemory: utilptr.To[resource.Quantity](mc.nodes[nodeName][v1.ResourceMemory].DeepCopy()),
}
}
return allNodesUsage, nil
}
func (mc *MetricsCollector) NodeUsage(node *v1.Node) (map[v1.ResourceName]*resource.Quantity, error) {
mc.mu.RLock()
defer mc.mu.RUnlock()
if _, exists := mc.nodes[node.Name]; !exists {
klog.V(4).InfoS("unable to find node in the collected metrics", "node", klog.KObj(node))
return nil, fmt.Errorf("unable to find node %q in the collected metrics", node.Name)
}
return map[v1.ResourceName]*resource.Quantity{
v1.ResourceCPU: utilptr.To[resource.Quantity](mc.nodes[node.Name][v1.ResourceCPU].DeepCopy()),
v1.ResourceMemory: utilptr.To[resource.Quantity](mc.nodes[node.Name][v1.ResourceMemory].DeepCopy()),
}, nil
}
func (mc *MetricsCollector) HasSynced() bool {
return mc.hasSynced
}
func (mc *MetricsCollector) MetricsClient() metricsclient.Interface {
return mc.metricsClientset
}
func (mc *MetricsCollector) Collect(ctx context.Context) error {
mc.mu.Lock()
defer mc.mu.Unlock()
nodes, err := mc.nodeLister.List(mc.nodeSelector)
if err != nil {
return fmt.Errorf("unable to list nodes: %v", err)
}
for _, node := range nodes {
metrics, err := mc.metricsClientset.MetricsV1beta1().NodeMetricses().Get(ctx, node.Name, metav1.GetOptions{})
if err != nil {
klog.ErrorS(err, "Error fetching metrics", "node", node.Name)
// No entry -> duplicate the previous value -> do nothing as beta*PV + (1-beta)*PV = PV
continue
}
if _, exists := mc.nodes[node.Name]; !exists {
mc.nodes[node.Name] = map[v1.ResourceName]*resource.Quantity{
v1.ResourceCPU: utilptr.To[resource.Quantity](metrics.Usage.Cpu().DeepCopy()),
v1.ResourceMemory: utilptr.To[resource.Quantity](metrics.Usage.Memory().DeepCopy()),
}
} else {
// get MilliValue to reduce loss of precision
mc.nodes[node.Name][v1.ResourceCPU].SetMilli(
weightedAverage(mc.nodes[node.Name][v1.ResourceCPU].MilliValue(), metrics.Usage.Cpu().MilliValue()),
)
mc.nodes[node.Name][v1.ResourceMemory].Set(
weightedAverage(mc.nodes[node.Name][v1.ResourceMemory].Value(), metrics.Usage.Memory().Value()),
)
}
}
mc.hasSynced = true
return nil
}

View File

@@ -0,0 +1,141 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package metricscollector
import (
"context"
"math"
"testing"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/informers"
fakeclientset "k8s.io/client-go/kubernetes/fake"
fakemetricsclient "k8s.io/metrics/pkg/client/clientset/versioned/fake"
"sigs.k8s.io/descheduler/test"
)
func checkCpuNodeUsage(t *testing.T, usage map[v1.ResourceName]*resource.Quantity, millicpu int64) {
t.Logf("current node cpu usage: %v\n", usage[v1.ResourceCPU].MilliValue())
if usage[v1.ResourceCPU].MilliValue() != millicpu {
t.Fatalf("cpu node usage expected to be %v, got %v instead", millicpu, usage[v1.ResourceCPU].MilliValue())
}
}
func TestMetricsCollector(t *testing.T) {
gvr := schema.GroupVersionResource{Group: "metrics.k8s.io", Version: "v1beta1", Resource: "nodes"}
n1 := test.BuildTestNode("n1", 2000, 3000, 10, nil)
n2 := test.BuildTestNode("n2", 2000, 3000, 10, nil)
n3 := test.BuildTestNode("n3", 2000, 3000, 10, nil)
n1metrics := test.BuildNodeMetrics("n1", 400, 1714978816)
n2metrics := test.BuildNodeMetrics("n2", 1400, 1714978816)
n3metrics := test.BuildNodeMetrics("n3", 300, 1714978816)
clientset := fakeclientset.NewSimpleClientset(n1, n2, n3)
metricsClientset := fakemetricsclient.NewSimpleClientset()
metricsClientset.Tracker().Create(gvr, n1metrics, "")
metricsClientset.Tracker().Create(gvr, n2metrics, "")
metricsClientset.Tracker().Create(gvr, n3metrics, "")
ctx := context.TODO()
sharedInformerFactory := informers.NewSharedInformerFactory(clientset, 0)
nodeLister := sharedInformerFactory.Core().V1().Nodes().Lister()
sharedInformerFactory.Start(ctx.Done())
sharedInformerFactory.WaitForCacheSync(ctx.Done())
t.Logf("Set initial node cpu usage to 1400")
collector := NewMetricsCollector(nodeLister, metricsClientset, labels.Everything())
collector.Collect(context.TODO())
nodesUsage, _ := collector.NodeUsage(n2)
checkCpuNodeUsage(t, nodesUsage, 1400)
allnodesUsage, _ := collector.AllNodesUsage()
checkCpuNodeUsage(t, allnodesUsage[n2.Name], 1400)
t.Logf("Set current node cpu usage to 500")
n2metrics.Usage[v1.ResourceCPU] = *resource.NewMilliQuantity(500, resource.DecimalSI)
metricsClientset.Tracker().Update(gvr, n2metrics, "")
collector.Collect(context.TODO())
nodesUsage, _ = collector.NodeUsage(n2)
checkCpuNodeUsage(t, nodesUsage, 1310)
allnodesUsage, _ = collector.AllNodesUsage()
checkCpuNodeUsage(t, allnodesUsage[n2.Name], 1310)
t.Logf("Set current node cpu usage to 900")
n2metrics.Usage[v1.ResourceCPU] = *resource.NewMilliQuantity(900, resource.DecimalSI)
metricsClientset.Tracker().Update(gvr, n2metrics, "")
collector.Collect(context.TODO())
nodesUsage, _ = collector.NodeUsage(n2)
checkCpuNodeUsage(t, nodesUsage, 1269)
allnodesUsage, _ = collector.AllNodesUsage()
checkCpuNodeUsage(t, allnodesUsage[n2.Name], 1269)
}
func TestMetricsCollectorConvergence(t *testing.T) {
gvr := schema.GroupVersionResource{Group: "metrics.k8s.io", Version: "v1beta1", Resource: "nodes"}
n1 := test.BuildTestNode("n1", 2000, 3000, 10, nil)
n2 := test.BuildTestNode("n2", 2000, 3000, 10, nil)
n3 := test.BuildTestNode("n3", 2000, 3000, 10, nil)
n1metrics := test.BuildNodeMetrics("n1", 400, 1714978816)
n2metrics := test.BuildNodeMetrics("n2", 1400, 1714978816)
n3metrics := test.BuildNodeMetrics("n3", 300, 1714978816)
clientset := fakeclientset.NewSimpleClientset(n1, n2, n3)
metricsClientset := fakemetricsclient.NewSimpleClientset()
metricsClientset.Tracker().Create(gvr, n1metrics, "")
metricsClientset.Tracker().Create(gvr, n2metrics, "")
metricsClientset.Tracker().Create(gvr, n3metrics, "")
ctx := context.TODO()
sharedInformerFactory := informers.NewSharedInformerFactory(clientset, 0)
nodeLister := sharedInformerFactory.Core().V1().Nodes().Lister()
sharedInformerFactory.Start(ctx.Done())
sharedInformerFactory.WaitForCacheSync(ctx.Done())
t.Logf("Set initial node cpu usage to 1400")
collector := NewMetricsCollector(nodeLister, metricsClientset, labels.Everything())
collector.Collect(context.TODO())
nodesUsage, _ := collector.NodeUsage(n2)
checkCpuNodeUsage(t, nodesUsage, 1400)
allnodesUsage, _ := collector.AllNodesUsage()
checkCpuNodeUsage(t, allnodesUsage[n2.Name], 1400)
t.Logf("Set current node cpu/memory usage to 900/1614978816 and wait until it converges to it")
n2metrics.Usage[v1.ResourceCPU] = *resource.NewMilliQuantity(900, resource.DecimalSI)
n2metrics.Usage[v1.ResourceMemory] = *resource.NewQuantity(1614978816, resource.BinarySI)
metricsClientset.Tracker().Update(gvr, n2metrics, "")
converged := false
for i := 0; i < 300; i++ {
collector.Collect(context.TODO())
nodesUsage, _ = collector.NodeUsage(n2)
if math.Abs(float64(900-nodesUsage[v1.ResourceCPU].MilliValue())) < 6 && math.Abs(float64(1614978816-nodesUsage[v1.ResourceMemory].Value())) < 6 {
t.Logf("Node cpu/memory usage converged to 900+-5/1614978816+-5")
converged = true
break
}
t.Logf("The current node usage: cpu=%v, memory=%v", nodesUsage[v1.ResourceCPU].MilliValue(), nodesUsage[v1.ResourceMemory].Value())
}
if !converged {
t.Fatalf("The node usage did not converged to 900+-1")
}
}