diff --git a/pkg/kubelet/cm/cpumanager/cpu_manager.go b/pkg/kubelet/cm/cpumanager/cpu_manager.go index 9e72a7cc97aa3..6b1a34165b56c 100644 --- a/pkg/kubelet/cm/cpumanager/cpu_manager.go +++ b/pkg/kubelet/cm/cpumanager/cpu_manager.go @@ -35,6 +35,7 @@ import ( "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager" "k8s.io/kubernetes/pkg/kubelet/config" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" + "k8s.io/kubernetes/pkg/kubelet/managed" "k8s.io/kubernetes/pkg/kubelet/status" "k8s.io/utils/cpuset" ) @@ -409,6 +410,7 @@ func (m *manager) reconcileState() (success []reconciledContainer, failure []rec failure = []reconciledContainer{} m.removeStaleState() + workloadEnabled := managed.IsEnabled() for _, pod := range m.activePods() { pstatus, ok := m.podStatusProvider.GetPodStatus(pod.UID) if !ok { @@ -416,6 +418,10 @@ func (m *manager) reconcileState() (success []reconciledContainer, failure []rec failure = append(failure, reconciledContainer{pod.Name, "", ""}) continue } + if enabled, _, _ := managed.IsPodManaged(pod); workloadEnabled && enabled { + klog.V(4).InfoS("[cpumanager] reconcileState: skipping pod; pod is managed (pod: %s)", pod.Name) + continue + } allContainers := pod.Spec.InitContainers allContainers = append(allContainers, pod.Spec.Containers...) diff --git a/pkg/kubelet/cm/cpumanager/policy_static.go b/pkg/kubelet/cm/cpumanager/policy_static.go index 26d1fc6d91bef..eba1e454f6f6a 100644 --- a/pkg/kubelet/cm/cpumanager/policy_static.go +++ b/pkg/kubelet/cm/cpumanager/policy_static.go @@ -29,6 +29,7 @@ import ( "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology" "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager" "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager/bitmask" + "k8s.io/kubernetes/pkg/kubelet/managed" "k8s.io/kubernetes/pkg/kubelet/metrics" "k8s.io/utils/cpuset" ) @@ -214,6 +215,10 @@ func (p *staticPolicy) validateState(s state.State) error { // state is empty initialize s.SetDefaultCPUSet(allCPUs) klog.InfoS("Static policy initialized", "defaultCPUSet", allCPUs) + if managed.IsEnabled() { + defaultCpus := s.GetDefaultCPUSet().Difference(p.reservedCPUs) + s.SetDefaultCPUSet(defaultCpus) + } return nil } @@ -227,7 +232,9 @@ func (p *staticPolicy) validateState(s state.State) error { p.reservedCPUs.Intersection(tmpDefaultCPUset).String(), tmpDefaultCPUset.String()) } } else { - if !p.reservedCPUs.Intersection(tmpDefaultCPUset).Equals(p.reservedCPUs) { + // 2. This only applies when managed mode is disabled. Active workload partitioning feature + // removes the reserved cpus from the default cpu mask on purpose. + if !managed.IsEnabled() && !p.reservedCPUs.Intersection(tmpDefaultCPUset).Equals(p.reservedCPUs) { return fmt.Errorf("not all reserved cpus: \"%s\" are present in defaultCpuSet: \"%s\"", p.reservedCPUs.String(), tmpDefaultCPUset.String()) } @@ -259,10 +266,17 @@ func (p *staticPolicy) validateState(s state.State) error { } } totalKnownCPUs = totalKnownCPUs.Union(tmpCPUSets...) 
- if !totalKnownCPUs.Equals(allCPUs) { - return fmt.Errorf("current set of available CPUs \"%s\" doesn't match with CPUs in state \"%s\"", - allCPUs.String(), totalKnownCPUs.String()) + availableCPUs := p.topology.CPUDetails.CPUs() + // CPU (workload) partitioning removes reserved cpus + // from the default mask intentionally + if managed.IsEnabled() { + availableCPUs = availableCPUs.Difference(p.reservedCPUs) + } + + if !totalKnownCPUs.Equals(availableCPUs) { + return fmt.Errorf("current set of available CPUs \"%s\" doesn't match with CPUs in state \"%s\"", + availableCPUs.String(), totalKnownCPUs.String()) } return nil diff --git a/pkg/kubelet/cm/cpumanager/policy_static_test.go b/pkg/kubelet/cm/cpumanager/policy_static_test.go index 9e9618873cd8e..87983038f91a6 100644 --- a/pkg/kubelet/cm/cpumanager/policy_static_test.go +++ b/pkg/kubelet/cm/cpumanager/policy_static_test.go @@ -21,6 +21,8 @@ import ( "reflect" "testing" + "k8s.io/kubernetes/pkg/kubelet/managed" + v1 "k8s.io/api/core/v1" utilfeature "k8s.io/apiserver/pkg/util/feature" featuregatetesting "k8s.io/component-base/featuregate/testing" @@ -961,18 +963,19 @@ func TestTopologyAwareAllocateCPUs(t *testing.T) { // above test cases are without kubelet --reserved-cpus cmd option // the following tests are with --reserved-cpus configured type staticPolicyTestWithResvList struct { - description string - topo *topology.CPUTopology - numReservedCPUs int - reserved cpuset.CPUSet - cpuPolicyOptions map[string]string - stAssignments state.ContainerCPUAssignments - stDefaultCPUSet cpuset.CPUSet - pod *v1.Pod - expErr error - expNewErr error - expCPUAlloc bool - expCSet cpuset.CPUSet + description string + topo *topology.CPUTopology + numReservedCPUs int + reserved cpuset.CPUSet + cpuPolicyOptions map[string]string + stAssignments state.ContainerCPUAssignments + stDefaultCPUSet cpuset.CPUSet + pod *v1.Pod + expErr error + expNewErr error + expCPUAlloc bool + expCSet cpuset.CPUSet + managementPartition bool } func TestStaticPolicyStartWithResvList(t *testing.T) { @@ -1024,9 +1027,31 @@ func TestStaticPolicyStartWithResvList(t *testing.T) { stDefaultCPUSet: cpuset.New(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11), expNewErr: fmt.Errorf("[cpumanager] unable to reserve the required amount of CPUs (size of 0-1 did not equal 1)"), }, + { + description: "reserved cores 0 & 6 are not present in available cpuset when management partitioning is enabled", + topo: topoDualSocketHT, + numReservedCPUs: 2, + stAssignments: state.ContainerCPUAssignments{}, + managementPartition: true, + expCSet: cpuset.New(1, 2, 3, 4, 5, 7, 8, 9, 10, 11), + }, + { + description: "reserved cores 0 & 6 are not present in available cpuset when management partitioning is enabled during recovery", + topo: topoDualSocketHT, + numReservedCPUs: 2, + stAssignments: state.ContainerCPUAssignments{}, + stDefaultCPUSet: cpuset.New(1, 2, 3, 4, 5, 7, 8, 9, 10, 11), + managementPartition: true, + expCSet: cpuset.New(1, 2, 3, 4, 5, 7, 8, 9, 10, 11), + }, } for _, testCase := range testCases { t.Run(testCase.description, func(t *testing.T) { + wasManaged := managed.IsEnabled() + managed.TestOnlySetEnabled(testCase.managementPartition) + defer func() { + managed.TestOnlySetEnabled(wasManaged) + }() featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, pkgfeatures.CPUManagerPolicyAlphaOptions, true) p, err := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, testCase.reserved, topologymanager.NewFakeManager(), testCase.cpuPolicyOptions) if !reflect.DeepEqual(err, 
testCase.expNewErr) { diff --git a/pkg/kubelet/cm/qos_container_manager_linux.go b/pkg/kubelet/cm/qos_container_manager_linux.go index 0f88e10ff69bc..499373eba3cd7 100644 --- a/pkg/kubelet/cm/qos_container_manager_linux.go +++ b/pkg/kubelet/cm/qos_container_manager_linux.go @@ -35,6 +35,7 @@ import ( "k8s.io/component-helpers/resource" v1qos "k8s.io/kubernetes/pkg/apis/core/v1/helper/qos" kubefeatures "k8s.io/kubernetes/pkg/features" + "k8s.io/kubernetes/pkg/kubelet/managed" ) const ( @@ -174,6 +175,9 @@ func (m *qosContainerManagerImpl) setCPUCgroupConfig(configs map[v1.PodQOSClass] reuseReqs := make(v1.ResourceList, 4) for i := range pods { pod := pods[i] + if enabled, _, _ := managed.IsPodManaged(pod); enabled { + continue + } qosClass := v1qos.GetPodQOS(pod) if qosClass != v1.PodQOSBurstable { // we only care about the burstable qos tier diff --git a/pkg/kubelet/config/file.go b/pkg/kubelet/config/file.go index 79e2af6ed6216..d2526ec989d97 100644 --- a/pkg/kubelet/config/file.go +++ b/pkg/kubelet/config/file.go @@ -30,6 +30,7 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/cache" api "k8s.io/kubernetes/pkg/apis/core" + "k8s.io/kubernetes/pkg/kubelet/managed" kubetypes "k8s.io/kubernetes/pkg/kubelet/types" utilio "k8s.io/utils/io" ) @@ -230,6 +231,16 @@ func (s *sourceFile) extractFromFile(filename string) (pod *v1.Pod, err error) { if podErr != nil { return pod, podErr } + if managed.IsEnabled() { + if newPod, _, err := managed.ModifyStaticPodForPinnedManagement(pod); err != nil { + klog.V(2).Error(err, "Static Pod is managed but errored", "name", pod.ObjectMeta.Name, "namespace", pod.ObjectMeta.Namespace) + } else if newPod != nil { + klog.V(2).InfoS("Static Pod is managed. Using modified pod", "name", newPod.ObjectMeta.Name, "namespace", newPod.ObjectMeta.Namespace, "annotations", newPod.Annotations) + pod = newPod + } else { + klog.V(2).InfoS("Static Pod is not managed", "name", pod.ObjectMeta.Name, "namespace", pod.ObjectMeta.Namespace) + } + } return pod, nil } diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index e44d877c8fe01..414232775f159 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -95,6 +95,7 @@ import ( "k8s.io/kubernetes/pkg/kubelet/kuberuntime" "k8s.io/kubernetes/pkg/kubelet/lifecycle" "k8s.io/kubernetes/pkg/kubelet/logs" + "k8s.io/kubernetes/pkg/kubelet/managed" "k8s.io/kubernetes/pkg/kubelet/metrics" "k8s.io/kubernetes/pkg/kubelet/metrics/collectors" "k8s.io/kubernetes/pkg/kubelet/network/dns" @@ -668,6 +669,10 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration, klet.runtimeService = kubeDeps.RemoteRuntimeService + if managed.IsEnabled() { + klog.InfoS("Pinned Workload Management Enabled") + } + if kubeDeps.KubeClient != nil { klet.runtimeClassManager = runtimeclass.NewManager(kubeDeps.KubeClient) } diff --git a/pkg/kubelet/kubelet_node_status.go b/pkg/kubelet/kubelet_node_status.go index 0505821a75bbb..6d7156070b613 100644 --- a/pkg/kubelet/kubelet_node_status.go +++ b/pkg/kubelet/kubelet_node_status.go @@ -42,7 +42,9 @@ import ( kubeletapis "k8s.io/kubelet/pkg/apis" v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper" "k8s.io/kubernetes/pkg/features" + "k8s.io/kubernetes/pkg/kubelet/cadvisor" "k8s.io/kubernetes/pkg/kubelet/events" + "k8s.io/kubernetes/pkg/kubelet/managed" "k8s.io/kubernetes/pkg/kubelet/nodestatus" taintutil "k8s.io/kubernetes/pkg/util/taints" volutil "k8s.io/kubernetes/pkg/volume/util" @@ -131,6 +133,9 @@ func (kl *Kubelet) tryRegisterWithAPIServer(node *v1.Node) bool { 
requiresUpdate = kl.updateDefaultLabels(node, existingNode) || requiresUpdate requiresUpdate = kl.reconcileExtendedResource(node, existingNode) || requiresUpdate requiresUpdate = kl.reconcileHugePageResource(node, existingNode) || requiresUpdate + if managed.IsEnabled() { + requiresUpdate = kl.addManagementNodeCapacity(node, existingNode) || requiresUpdate + } if requiresUpdate { if _, _, err := nodeutil.PatchNodeStatus(kl.kubeClient.CoreV1(), types.NodeName(kl.nodeName), originalNode, existingNode); err != nil { klog.ErrorS(err, "Unable to reconcile node with API server,error updating node", "node", klog.KObj(node)) @@ -141,6 +146,25 @@ func (kl *Kubelet) tryRegisterWithAPIServer(node *v1.Node) bool { return true } +// addManagementNodeCapacity adds the managednode capacity to the node +func (kl *Kubelet) addManagementNodeCapacity(initialNode, existingNode *v1.Node) bool { + updateDefaultResources(initialNode, existingNode) + machineInfo, err := kl.cadvisor.MachineInfo() + if err != nil { + klog.Errorf("Unable to calculate managed node capacity for %q: %v", kl.nodeName, err) + return false + } + cpuRequest := cadvisor.CapacityFromMachineInfo(machineInfo)[v1.ResourceCPU] + cpuRequestInMilli := cpuRequest.MilliValue() + newCPURequest := resource.NewMilliQuantity(cpuRequestInMilli*1000, cpuRequest.Format) + managedResourceName := managed.GenerateResourceName("management") + if existingCapacity, ok := existingNode.Status.Capacity[managedResourceName]; ok && existingCapacity.Equal(*newCPURequest) { + return false + } + existingNode.Status.Capacity[managedResourceName] = *newCPURequest + return true +} + // reconcileHugePageResource will update huge page capacity for each page size and remove huge page sizes no longer supported func (kl *Kubelet) reconcileHugePageResource(initialNode, existingNode *v1.Node) bool { requiresUpdate := updateDefaultResources(initialNode, existingNode) @@ -432,6 +456,9 @@ func (kl *Kubelet) initialNode(ctx context.Context) (*v1.Node, error) { } } } + if managed.IsEnabled() { + kl.addManagementNodeCapacity(node, node) + } kl.setNodeStatus(ctx, node) diff --git a/pkg/kubelet/managed/cpu_shares.go b/pkg/kubelet/managed/cpu_shares.go new file mode 100644 index 0000000000000..de60b6d6e755e --- /dev/null +++ b/pkg/kubelet/managed/cpu_shares.go @@ -0,0 +1,30 @@ +package managed + +const ( + // These limits are defined in the kernel: + // https://github.com/torvalds/linux/blob/0bddd227f3dc55975e2b8dfa7fc6f959b062a2c7/kernel/sched/sched.h#L427-L428 + MinShares = 2 + MaxShares = 262144 + + SharesPerCPU = 1024 + MilliCPUToCPU = 1000 +) + +// MilliCPUToShares converts the milliCPU to CFS shares. +func MilliCPUToShares(milliCPU int64) uint64 { + if milliCPU == 0 { + // Docker converts zero milliCPU to unset, which maps to kernel default + // for unset: 1024. Return 2 here to really match kernel default for + // zero milliCPU. + return MinShares + } + // Conceptually (milliCPU / milliCPUToCPU) * sharesPerCPU, but factored to improve rounding. + shares := (milliCPU * SharesPerCPU) / MilliCPUToCPU + if shares < MinShares { + return MinShares + } + if shares > MaxShares { + return MaxShares + } + return uint64(shares) +} diff --git a/pkg/kubelet/managed/managed.go b/pkg/kubelet/managed/managed.go new file mode 100644 index 0000000000000..3d9ff87aa625b --- /dev/null +++ b/pkg/kubelet/managed/managed.go @@ -0,0 +1,204 @@ +/* +Copyright 2021 The Kubernetes Authors. 
+ +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package managed + +import ( + "encoding/json" + "fmt" + "os" + "strings" + + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" +) + +var ( + pinnedManagementEnabled bool + pinnedManagementFilename = "/etc/kubernetes/openshift-workload-pinning" +) + +const ( + qosWarning = "skipping pod CPUs requests modifications because it has guaranteed QoS class" + WorkloadsAnnotationPrefix = "target.workload.openshift.io/" + WorkloadsCapacitySuffix = "workload.openshift.io/cores" + ContainerAnnotationPrefix = "resources.workload.openshift.io/" + WorkloadAnnotationWarning = "workload.openshift.io/warning" +) + +type WorkloadContainerAnnotation struct { + CpuShares uint64 `json:"cpushares"` +} + +func NewWorkloadContainerAnnotation(cpushares uint64) WorkloadContainerAnnotation { + return WorkloadContainerAnnotation{ + CpuShares: cpushares, + } +} + +func (w WorkloadContainerAnnotation) Serialize() ([]byte, error) { + return json.Marshal(w) +} + +func init() { + readEnablementFile() +} + +func readEnablementFile() { + if _, err := os.Stat(pinnedManagementFilename); err == nil { + pinnedManagementEnabled = true + } +} + +// TestOnlySetEnabled allows changing the state of management partition enablement +// This method MUST NOT be used outside of test code +func TestOnlySetEnabled(enabled bool) { + pinnedManagementEnabled = enabled +} + +func IsEnabled() bool { + return pinnedManagementEnabled +} + +// IsPodManaged returns true and the name of the workload if enabled. +// returns true, workload name, and the annotation payload. 
+func IsPodManaged(pod *v1.Pod) (bool, string, string) { + if pod.ObjectMeta.Annotations == nil { + return false, "", "" + } + for annotation, value := range pod.ObjectMeta.Annotations { + if strings.HasPrefix(annotation, WorkloadsAnnotationPrefix) { + return true, strings.TrimPrefix(annotation, WorkloadsAnnotationPrefix), value + } + } + return false, "", "" +} + +// ModifyStaticPodForPinnedManagement will modify a pod for pod management +func ModifyStaticPodForPinnedManagement(pod *v1.Pod) (*v1.Pod, string, error) { + pod = pod.DeepCopy() + enabled, workloadName, value := IsPodManaged(pod) + if !enabled { + return nil, "", nil + } + if pod.Annotations == nil { + pod.Annotations = make(map[string]string) + } + if isPodGuaranteed(pod) { + stripWorkloadAnnotations(pod.Annotations) + pod.Annotations[WorkloadAnnotationWarning] = qosWarning + return pod, "", nil + } + pod.Annotations[fmt.Sprintf("%v%v", WorkloadsAnnotationPrefix, workloadName)] = value + if err := updateContainers(workloadName, pod); err != nil { + return nil, "", err + } + return pod, workloadName, nil +} + +func GenerateResourceName(workloadName string) v1.ResourceName { + return v1.ResourceName(fmt.Sprintf("%v.%v", workloadName, WorkloadsCapacitySuffix)) +} + +func updateContainers(workloadName string, pod *v1.Pod) error { + updateContainer := func(container *v1.Container) error { + if container.Resources.Requests == nil { + return fmt.Errorf("managed container %v does not have Resource.Requests", container.Name) + } + if _, ok := container.Resources.Requests[v1.ResourceCPU]; !ok { + return fmt.Errorf("managed container %v does not have cpu requests", container.Name) + } + if _, ok := container.Resources.Requests[v1.ResourceMemory]; !ok { + return fmt.Errorf("managed container %v does not have memory requests", container.Name) + } + if container.Resources.Limits == nil { + container.Resources.Limits = v1.ResourceList{} + } + cpuRequest := container.Resources.Requests[v1.ResourceCPU] + cpuRequestInMilli := cpuRequest.MilliValue() + + containerAnnotation := NewWorkloadContainerAnnotation(MilliCPUToShares(cpuRequestInMilli)) + jsonAnnotation, _ := containerAnnotation.Serialize() + containerNameKey := fmt.Sprintf("%v%v", ContainerAnnotationPrefix, container.Name) + + newCPURequest := resource.NewMilliQuantity(cpuRequestInMilli*1000, cpuRequest.Format) + + pod.Annotations[containerNameKey] = string(jsonAnnotation) + container.Resources.Requests[GenerateResourceName(workloadName)] = *newCPURequest + container.Resources.Limits[GenerateResourceName(workloadName)] = *newCPURequest + + delete(container.Resources.Requests, v1.ResourceCPU) + return nil + } + for idx := range pod.Spec.Containers { + if err := updateContainer(&pod.Spec.Containers[idx]); err != nil { + return err + } + } + for idx := range pod.Spec.InitContainers { + if err := updateContainer(&pod.Spec.InitContainers[idx]); err != nil { + return err + } + } + return nil +} + +// isPodGuaranteed checks if the pod has a guaranteed QoS. +// This QoS check is different from the library versions that check QoS, +// this is because of the order at which changes get observed. +// (i.e. the library assumes the defaulter has ran on the pod resource before calculating QoS). +// +// The files will get interpreted before they reach the API server and before the defaulter applies changes, +// this function takes into account the case where only `limits.cpu` are provided but no `requests.cpu` are since that +// counts as a Guaranteed QoS after the defaulter runs. 
+func isPodGuaranteed(pod *v1.Pod) bool { + isGuaranteed := func(containers []v1.Container) bool { + for _, c := range containers { + // only memory and CPU resources are relevant to decide pod QoS class + for _, r := range []v1.ResourceName{v1.ResourceMemory, v1.ResourceCPU} { + limit := c.Resources.Limits[r] + request, requestExist := c.Resources.Requests[r] + if limit.IsZero() { + return false + } + if !requestExist { + continue + } + // In some corner case, when you set CPU requests to 0 the k8s defaulter will change it to the value + // specified under the limit. + if r == v1.ResourceCPU && request.IsZero() { + continue + } + if !limit.Equal(request) { + return false + } + } + } + return true + } + return isGuaranteed(pod.Spec.InitContainers) && isGuaranteed(pod.Spec.Containers) +} + +func stripWorkloadAnnotations(annotations map[string]string) { + for k := range annotations { + if strings.HasPrefix(k, WorkloadsAnnotationPrefix) { + delete(annotations, k) + } + if strings.HasPrefix(k, ContainerAnnotationPrefix) { + delete(annotations, k) + } + } +} diff --git a/pkg/kubelet/managed/managed_test.go b/pkg/kubelet/managed/managed_test.go new file mode 100644 index 0000000000000..16acda0868cdb --- /dev/null +++ b/pkg/kubelet/managed/managed_test.go @@ -0,0 +1,675 @@ +package managed + +import ( + "fmt" + "testing" + + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func TestModifyStaticPodForPinnedManagementErrorStates(t *testing.T) { + + workloadAnnotations := map[string]string{ + "target.workload.openshift.io/management": `{"effect": "PreferredDuringScheduling"}`, + } + + testCases := []struct { + pod *v1.Pod + expectedError error + }{ + { + pod: createPod(workloadAnnotations, nil, + &v1.Container{ + Name: "nginx", + Image: "test/image", + Resources: v1.ResourceRequirements{ + Requests: nil, + }, + }), + expectedError: fmt.Errorf("managed container nginx does not have Resource.Requests"), + }, + { + pod: createPod(workloadAnnotations, nil, + &v1.Container{ + Name: "nginx", + Image: "test/image", + Resources: v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceName(v1.ResourceMemory): resource.MustParse("100m"), + }, + }, + }), + expectedError: fmt.Errorf("managed container nginx does not have cpu requests"), + }, + { + pod: createPod(workloadAnnotations, nil, + &v1.Container{ + Name: "nginx", + Image: "test/image", + Resources: v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceName(v1.ResourceCPU): resource.MustParse("100m"), + }, + }, + }), + expectedError: fmt.Errorf("managed container nginx does not have memory requests"), + }, + { + pod: createPod(workloadAnnotations, + &v1.Container{ + Name: "nginx", + Image: "test/image", + Resources: v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceName(v1.ResourceMemory): resource.MustParse("100m"), + }, + }, + }, nil), + expectedError: fmt.Errorf("managed container nginx does not have cpu requests"), + }, + { + pod: createPod(workloadAnnotations, + &v1.Container{ + Name: "nginx", + Image: "test/image", + Resources: v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceName(v1.ResourceCPU): resource.MustParse("100m"), + }, + }, + }, nil), + expectedError: fmt.Errorf("managed container nginx does not have memory requests"), + }, + { + pod: createPod(nil, nil, + &v1.Container{ + Name: "nginx", + Image: "test/image", + Resources: v1.ResourceRequirements{ + Requests: v1.ResourceList{ + 
v1.ResourceName(v1.ResourceMemory): resource.MustParse("100m"), + }, + }, + }), + expectedError: fmt.Errorf("managed container nginx does not have cpu requests"), + }, + { + pod: createPod(map[string]string{"something": "else"}, nil, + &v1.Container{ + Name: "nginx", + Image: "test/image", + Resources: v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceName(v1.ResourceCPU): resource.MustParse("100m"), + }, + }, + }), + expectedError: fmt.Errorf("managed container nginx does not have memory requests"), + }, + } + + for _, tc := range testCases { + pod, workloadName, err := ModifyStaticPodForPinnedManagement(tc.pod) + if err != nil && err.Error() != tc.expectedError.Error() { + t.Errorf("ModifyStaticPodForPinned got error of (%v) but expected (%v)", err, tc.expectedError) + } + if pod != nil { + t.Errorf("ModifyStaticPodForPinned should return pod with nil value") + } + if workloadName != "" { + t.Errorf("ModifyStaticPodForPinned should return empty workloadName but got %v", workloadName) + } + } +} + +func TestStaticPodManaged(t *testing.T) { + testCases := []struct { + pod *v1.Pod + expectedAnnotations map[string]string + isGuaranteed bool + }{ + { + pod: &v1.Pod{ + TypeMeta: metav1.TypeMeta{ + Kind: "Pod", + APIVersion: "", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + UID: "12345", + Namespace: "mynamespace", + Annotations: map[string]string{ + "target.workload.openshift.io/management": `{"effect": "PreferredDuringScheduling"}`, + }, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "nginx", + Image: "test/image", + Resources: v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceName(v1.ResourceCPU): resource.MustParse("100m"), + v1.ResourceName(v1.ResourceMemory): resource.MustParse("100m"), + }, + }, + }, + }, + SecurityContext: &v1.PodSecurityContext{}, + }, + Status: v1.PodStatus{ + Phase: v1.PodPending, + }, + }, + expectedAnnotations: map[string]string{ + "target.workload.openshift.io/management": `{"effect": "PreferredDuringScheduling"}`, + "resources.workload.openshift.io/nginx": `{"cpushares":102}`, + }, + }, + { + pod: &v1.Pod{ + TypeMeta: metav1.TypeMeta{ + Kind: "Pod", + APIVersion: "", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + UID: "12345", + Namespace: "mynamespace", + Annotations: map[string]string{ + "target.workload.openshift.io/management": `{"effect": "PreferredDuringScheduling"}`, + }, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "c1", + Image: "test/nginx", + Resources: v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceName(v1.ResourceCPU): resource.MustParse("100m"), + v1.ResourceName(v1.ResourceMemory): resource.MustParse("100m"), + }, + }, + }, + { + Name: "c2", + Image: "test/image", + Resources: v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceName(v1.ResourceCPU): resource.MustParse("1"), + v1.ResourceName(v1.ResourceMemory): resource.MustParse("100m"), + }, + }, + }, + { + Name: "c_3", + Image: "test/image", + Resources: v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceName(v1.ResourceCPU): resource.MustParse("1"), + v1.ResourceName(v1.ResourceMemory): resource.MustParse("100m"), + }, + }, + }, + }, + SecurityContext: &v1.PodSecurityContext{}, + }, + Status: v1.PodStatus{ + Phase: v1.PodPending, + }, + }, + expectedAnnotations: map[string]string{ + "target.workload.openshift.io/management": `{"effect": "PreferredDuringScheduling"}`, + "resources.workload.openshift.io/c1": `{"cpushares":102}`, + 
"resources.workload.openshift.io/c2": `{"cpushares":1024}`, + "resources.workload.openshift.io/c_3": `{"cpushares":1024}`, + }, + }, + { + pod: &v1.Pod{ + TypeMeta: metav1.TypeMeta{ + Kind: "Pod", + APIVersion: "", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + UID: "12345", + Namespace: "mynamespace", + Annotations: map[string]string{ + "target.workload.openshift.io/management": `{"effect": "PreferredDuringScheduling"}`, + }, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "c1", + Image: "test/nginx", + Resources: v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceName(v1.ResourceCPU): resource.MustParse("20m"), + v1.ResourceName(v1.ResourceMemory): resource.MustParse("100m"), + }, + Limits: v1.ResourceList{ + v1.ResourceName(v1.ResourceCPU): resource.MustParse("100m"), + v1.ResourceName(v1.ResourceMemory): resource.MustParse("100m"), + }, + }, + }, + }, + SecurityContext: &v1.PodSecurityContext{}, + }, + Status: v1.PodStatus{ + Phase: v1.PodPending, + }, + }, + expectedAnnotations: map[string]string{ + "target.workload.openshift.io/management": `{"effect": "PreferredDuringScheduling"}`, + "resources.workload.openshift.io/c1": `{"cpushares":20}`, + }, + }, + { + pod: &v1.Pod{ + TypeMeta: metav1.TypeMeta{ + Kind: "Pod", + APIVersion: "", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + UID: "12345", + Namespace: "mynamespace", + Annotations: map[string]string{ + "target.workload.openshift.io/management": `{"effect": "PreferredDuringScheduling"}`, + "resources.workload.openshift.io/c1": `{"cpushares":20}`, + }, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "c1", + Image: "test/nginx", + Resources: v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceName(v1.ResourceCPU): resource.MustParse("100m"), + v1.ResourceName(v1.ResourceMemory): resource.MustParse("100m"), + }, + Limits: v1.ResourceList{ + v1.ResourceName(v1.ResourceCPU): resource.MustParse("100m"), + v1.ResourceName(v1.ResourceMemory): resource.MustParse("100m"), + }, + }, + }, + }, + SecurityContext: &v1.PodSecurityContext{}, + }, + Status: v1.PodStatus{ + Phase: v1.PodPending, + }, + }, + expectedAnnotations: map[string]string{ + WorkloadAnnotationWarning: qosWarning, + }, + isGuaranteed: true, + }, + { + pod: &v1.Pod{ + TypeMeta: metav1.TypeMeta{ + Kind: "Pod", + APIVersion: "", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + UID: "12345", + Namespace: "mynamespace", + Annotations: map[string]string{ + "target.workload.openshift.io/management": `{"effect": "PreferredDuringScheduling"}`, + }, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "c1", + Image: "test/nginx", + Resources: v1.ResourceRequirements{ + Limits: v1.ResourceList{ + v1.ResourceName(v1.ResourceCPU): resource.MustParse("100m"), + v1.ResourceName(v1.ResourceMemory): resource.MustParse("100m"), + }, + }, + }, + }, + SecurityContext: &v1.PodSecurityContext{}, + }, + Status: v1.PodStatus{ + Phase: v1.PodPending, + }, + }, + expectedAnnotations: map[string]string{ + WorkloadAnnotationWarning: qosWarning, + }, + isGuaranteed: true, + }, + { + pod: &v1.Pod{ + TypeMeta: metav1.TypeMeta{ + Kind: "Pod", + APIVersion: "", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + UID: "12345", + Namespace: "mynamespace", + Annotations: map[string]string{ + "target.workload.openshift.io/management": `{"effect": "PreferredDuringScheduling"}`, + }, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "c1", + Image: "test/nginx", + Resources: 
v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceName(v1.ResourceCPU): resource.MustParse("0m"), + v1.ResourceName(v1.ResourceMemory): resource.MustParse("100m"), + }, + Limits: v1.ResourceList{ + v1.ResourceName(v1.ResourceCPU): resource.MustParse("100m"), + v1.ResourceName(v1.ResourceMemory): resource.MustParse("100m"), + }, + }, + }, + }, + SecurityContext: &v1.PodSecurityContext{}, + }, + Status: v1.PodStatus{ + Phase: v1.PodPending, + }, + }, + expectedAnnotations: map[string]string{ + WorkloadAnnotationWarning: qosWarning, + }, + isGuaranteed: true, + }, + } + + for _, tc := range testCases { + pod, workloadName, err := ModifyStaticPodForPinnedManagement(tc.pod) + if err != nil { + t.Errorf("ModifyStaticPodForPinned should not error") + } + for expectedKey, expectedValue := range tc.expectedAnnotations { + value, exists := pod.Annotations[expectedKey] + if !exists { + t.Errorf("%v key not found", expectedKey) + } + if expectedValue != value { + t.Errorf("'%v' key's value does not equal '%v' and got '%v'", expectedKey, expectedValue, value) + } + } + for _, container := range pod.Spec.Containers { + if container.Resources.Requests.Cpu().String() != "0" && !tc.isGuaranteed { + t.Errorf("cpu requests should be 0 got %v", container.Resources.Requests.Cpu().String()) + } + if container.Resources.Requests.Memory().String() == "0" && !tc.isGuaranteed { + t.Errorf("memory requests were %v but should be %v", container.Resources.Requests.Memory().String(), container.Resources.Requests.Memory().String()) + } + if _, exists := container.Resources.Requests[GenerateResourceName(workloadName)]; !exists && !tc.isGuaranteed { + t.Errorf("managed capacity label missing from pod %v and container %v", tc.pod.Name, container.Name) + } + if _, exists := container.Resources.Limits[GenerateResourceName(workloadName)]; !exists && !tc.isGuaranteed { + t.Errorf("managed capacity label missing from pod %v and container %v limits", tc.pod.Name, container.Name) + } + } + } +} + +func TestStaticPodThrottle(t *testing.T) { + testCases := []struct { + pod *v1.Pod + expectedAnnotations map[string]string + isGuaranteed bool + }{ + { + pod: &v1.Pod{ + TypeMeta: metav1.TypeMeta{ + Kind: "Pod", + APIVersion: "", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + UID: "12345", + Namespace: "mynamespace", + Annotations: map[string]string{ + "target.workload.openshift.io/throttle": `{"effect": "PreferredDuringScheduling"}`, + }, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "nginx", + Image: "test/image", + Resources: v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceName(v1.ResourceCPU): resource.MustParse("100m"), + v1.ResourceName(v1.ResourceMemory): resource.MustParse("100m"), + }, + }, + }, + }, + SecurityContext: &v1.PodSecurityContext{}, + }, + Status: v1.PodStatus{ + Phase: v1.PodPending, + }, + }, + expectedAnnotations: map[string]string{ + "target.workload.openshift.io/throttle": `{"effect": "PreferredDuringScheduling"}`, + "resources.workload.openshift.io/nginx": `{"cpushares":102}`, + }, + }, + { + pod: &v1.Pod{ + TypeMeta: metav1.TypeMeta{ + Kind: "Pod", + APIVersion: "", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + UID: "12345", + Namespace: "mynamespace", + Annotations: map[string]string{ + "target.workload.openshift.io/throttle": `{"effect": "PreferredDuringScheduling"}`, + }, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "c1", + Image: "test/image", + Resources: v1.ResourceRequirements{ + Requests: v1.ResourceList{ 
+ v1.ResourceName(v1.ResourceCPU): resource.MustParse("100m"), + v1.ResourceName(v1.ResourceMemory): resource.MustParse("100m"), + }, + }, + }, + { + Name: "c2", + Image: "test/image", + Resources: v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceName(v1.ResourceCPU): resource.MustParse("1"), + v1.ResourceName(v1.ResourceMemory): resource.MustParse("100m"), + }, + }, + }, + { + Name: "c_3", + Image: "test/image", + Resources: v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceName(v1.ResourceCPU): resource.MustParse("1"), + v1.ResourceName(v1.ResourceMemory): resource.MustParse("100m"), + }, + }, + }, + }, + SecurityContext: &v1.PodSecurityContext{}, + }, + Status: v1.PodStatus{ + Phase: v1.PodPending, + }, + }, + expectedAnnotations: map[string]string{ + "target.workload.openshift.io/throttle": `{"effect": "PreferredDuringScheduling"}`, + "resources.workload.openshift.io/c1": `{"cpushares":102}`, + "resources.workload.openshift.io/c2": `{"cpushares":1024}`, + "resources.workload.openshift.io/c_3": `{"cpushares":1024}`, + }, + }, + { + pod: &v1.Pod{ + TypeMeta: metav1.TypeMeta{ + Kind: "Pod", + APIVersion: "", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + UID: "12345", + Namespace: "mynamespace", + Annotations: map[string]string{ + "target.workload.openshift.io/throttle": `{"effect": "PreferredDuringScheduling"}`, + }, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "c1", + Image: "test/nginx", + Resources: v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceName(v1.ResourceCPU): resource.MustParse("100m"), + v1.ResourceName(v1.ResourceMemory): resource.MustParse("100m"), + }, + Limits: v1.ResourceList{ + v1.ResourceName(v1.ResourceCPU): resource.MustParse("100m"), + v1.ResourceName(v1.ResourceMemory): resource.MustParse("100m"), + }, + }, + }, + }, + SecurityContext: &v1.PodSecurityContext{}, + }, + Status: v1.PodStatus{ + Phase: v1.PodPending, + }, + }, + expectedAnnotations: map[string]string{ + WorkloadAnnotationWarning: qosWarning, + }, + isGuaranteed: true, + }, + { + pod: &v1.Pod{ + TypeMeta: metav1.TypeMeta{ + Kind: "Pod", + APIVersion: "", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + UID: "12345", + Namespace: "mynamespace", + Annotations: map[string]string{ + "target.workload.openshift.io/throttle": `{"effect": "PreferredDuringScheduling"}`, + }, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "c1", + Image: "test/nginx", + Resources: v1.ResourceRequirements{ + Limits: v1.ResourceList{ + v1.ResourceName(v1.ResourceCPU): resource.MustParse("100m"), + v1.ResourceName(v1.ResourceMemory): resource.MustParse("100m"), + }, + }, + }, + }, + SecurityContext: &v1.PodSecurityContext{}, + }, + Status: v1.PodStatus{ + Phase: v1.PodPending, + }, + }, + expectedAnnotations: map[string]string{ + WorkloadAnnotationWarning: qosWarning, + }, + isGuaranteed: true, + }, + } + + for _, tc := range testCases { + pod, workloadName, err := ModifyStaticPodForPinnedManagement(tc.pod) + if err != nil { + t.Errorf("ModifyStaticPodForPinned should not error") + } + for expectedKey, expectedValue := range tc.expectedAnnotations { + value, exists := pod.Annotations[expectedKey] + if !exists { + t.Errorf("%v key not found", expectedKey) + } + if expectedValue != value { + t.Errorf("'%v' key's value does not equal '%v' and got '%v'", expectedKey, expectedValue, value) + } + } + for _, container := range pod.Spec.Containers { + if container.Resources.Requests.Cpu().String() != "0" && !tc.isGuaranteed { + 
t.Errorf("cpu requests should be 0 got %v", container.Resources.Requests.Cpu().String()) + } + if container.Resources.Requests.Memory().String() == "0" && !tc.isGuaranteed { + t.Errorf("memory requests were %v but should be %v", container.Resources.Requests.Memory().String(), container.Resources.Requests.Memory().String()) + } + if _, exists := container.Resources.Requests[GenerateResourceName(workloadName)]; !exists && !tc.isGuaranteed { + t.Errorf("managed capacity label missing from pod %v and container %v", tc.pod.Name, container.Name) + } + if _, exists := container.Resources.Limits[GenerateResourceName(workloadName)]; !exists && !tc.isGuaranteed { + t.Errorf("managed limits capacity label missing from pod %v and container %v", tc.pod.Name, container.Name) + } + } + } +} + +func createPod(annotations map[string]string, initContainer, container *v1.Container) *v1.Pod { + pod := &v1.Pod{ + TypeMeta: metav1.TypeMeta{ + Kind: "Pod", + APIVersion: "", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + UID: "12345", + Namespace: "mynamespace", + Annotations: annotations, + }, + Spec: v1.PodSpec{ + SecurityContext: &v1.PodSecurityContext{}, + }, + Status: v1.PodStatus{ + Phase: v1.PodPending, + }, + } + + if initContainer != nil { + pod.Spec.InitContainers = append(pod.Spec.InitContainers, *initContainer) + } + + if container != nil { + pod.Spec.Containers = append(pod.Spec.Containers, *container) + } + + return pod +}