1
0

Merge commit 'a881aea5f1a87929fb768f05a395cbcaf9812f30' into prow-update-master

This commit is contained in:
Patrick Ohly
2021-11-30 21:17:23 +01:00
2385 changed files with 883714 additions and 14 deletions

View File

@@ -0,0 +1,201 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "{}"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright {yyyy} {name of copyright owner}
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,17 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controller // import "sigs.k8s.io/sig-storage-lib-external-provisioner/v6/controller"

View File

@@ -0,0 +1,17 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package metrics // import "sigs.k8s.io/sig-storage-lib-external-provisioner/v6/controller/metrics"

View File

@@ -0,0 +1,117 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package metrics
import (
"github.com/prometheus/client_golang/prometheus"
)
const (
// ControllerSubsystem is prometheus subsystem name.
ControllerSubsystem = "controller"
)
// Metrics contains the metrics for a certain subsystem name.
type Metrics struct {
// PersistentVolumeClaimProvisionTotal is used to collect accumulated count of persistent volumes provisioned.
PersistentVolumeClaimProvisionTotal *prometheus.CounterVec
// PersistentVolumeClaimProvisionFailedTotal is used to collect accumulated count of persistent volume provision failed attempts.
PersistentVolumeClaimProvisionFailedTotal *prometheus.CounterVec
// PersistentVolumeClaimProvisionDurationSeconds is used to collect latency in seconds to provision persistent volumes.
PersistentVolumeClaimProvisionDurationSeconds *prometheus.HistogramVec
// PersistentVolumeDeleteTotal is used to collect accumulated count of persistent volumes deleted.
PersistentVolumeDeleteTotal *prometheus.CounterVec
// PersistentVolumeDeleteFailedTotal is used to collect accumulated count of persistent volume delete failed attempts.
PersistentVolumeDeleteFailedTotal *prometheus.CounterVec
// PersistentVolumeDeleteDurationSeconds is used to collect latency in seconds to delete persistent volumes.
PersistentVolumeDeleteDurationSeconds *prometheus.HistogramVec
}
// M contains the metrics with ControllerSubsystem as subsystem name.
var M = New(ControllerSubsystem)
// These variables are defined merely for API compatibility.
var (
// PersistentVolumeClaimProvisionTotal is used to collect accumulated count of persistent volumes provisioned.
PersistentVolumeClaimProvisionTotal = M.PersistentVolumeClaimProvisionTotal
// PersistentVolumeClaimProvisionFailedTotal is used to collect accumulated count of persistent volume provision failed attempts.
PersistentVolumeClaimProvisionFailedTotal = M.PersistentVolumeClaimProvisionFailedTotal
// PersistentVolumeClaimProvisionDurationSeconds is used to collect latency in seconds to provision persistent volumes.
PersistentVolumeClaimProvisionDurationSeconds = M.PersistentVolumeClaimProvisionDurationSeconds
// PersistentVolumeDeleteTotal is used to collect accumulated count of persistent volumes deleted.
PersistentVolumeDeleteTotal = M.PersistentVolumeDeleteTotal
// PersistentVolumeDeleteFailedTotal is used to collect accumulated count of persistent volume delete failed attempts.
PersistentVolumeDeleteFailedTotal = M.PersistentVolumeDeleteFailedTotal
// PersistentVolumeDeleteDurationSeconds is used to collect latency in seconds to delete persistent volumes.
PersistentVolumeDeleteDurationSeconds = M.PersistentVolumeDeleteDurationSeconds
)
// New creates a new set of metrics with the goven subsystem name.
func New(subsystem string) Metrics {
return Metrics{
PersistentVolumeClaimProvisionTotal: prometheus.NewCounterVec(
prometheus.CounterOpts{
Subsystem: subsystem,
Name: "persistentvolumeclaim_provision_total",
Help: "Total number of persistent volumes provisioned succesfully. Broken down by storage class name.",
},
[]string{"class"},
),
PersistentVolumeClaimProvisionFailedTotal: prometheus.NewCounterVec(
prometheus.CounterOpts{
Subsystem: subsystem,
Name: "persistentvolumeclaim_provision_failed_total",
Help: "Total number of persistent volume provision failed attempts. Broken down by storage class name.",
},
[]string{"class"},
),
PersistentVolumeClaimProvisionDurationSeconds: prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Subsystem: subsystem,
Name: "persistentvolumeclaim_provision_duration_seconds",
Help: "Latency in seconds to provision persistent volumes. Failed provisioning attempts are ignored. Broken down by storage class name.",
Buckets: prometheus.DefBuckets,
},
[]string{"class"},
),
PersistentVolumeDeleteTotal: prometheus.NewCounterVec(
prometheus.CounterOpts{
Subsystem: subsystem,
Name: "persistentvolume_delete_total",
Help: "Total number of persistent volumes deleted succesfully. Broken down by storage class name.",
},
[]string{"class"},
),
PersistentVolumeDeleteFailedTotal: prometheus.NewCounterVec(
prometheus.CounterOpts{
Subsystem: subsystem,
Name: "persistentvolume_delete_failed_total",
Help: "Total number of persistent volume delete failed attempts. Broken down by storage class name.",
},
[]string{"class"},
),
PersistentVolumeDeleteDurationSeconds: prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Subsystem: subsystem,
Name: "persistentvolume_delete_duration_seconds",
Help: "Latency in seconds to delete persistent volumes. Failed deletion attempts are ignored. Broken down by storage class name.",
Buckets: prometheus.DefBuckets,
},
[]string{"class"},
),
}
}

View File

@@ -0,0 +1,133 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controller
import (
"context"
"fmt"
"k8s.io/api/core/v1"
storageapis "k8s.io/api/storage/v1"
)
// Provisioner is an interface that creates templates for PersistentVolumes
// and can create the volume as a new resource in the infrastructure provider.
// It can also remove the volume it created from the underlying storage
// provider.
type Provisioner interface {
// Provision creates a volume i.e. the storage asset and returns a PV object
// for the volume. The provisioner can return an error (e.g. timeout) and state
// ProvisioningInBackground to tell the controller that provisioning may be in
// progress after Provision() finishes. The controller will call Provision()
// again with the same parameters, assuming that the provisioner continues
// provisioning the volume. The provisioner must return either final error (with
// ProvisioningFinished) or success eventually, otherwise the controller will try
// forever (unless FailedProvisionThreshold is set).
Provision(context.Context, ProvisionOptions) (*v1.PersistentVolume, ProvisioningState, error)
// Delete removes the storage asset that was created by Provision backing the
// given PV. Does not delete the PV object itself.
//
// May return IgnoredError to indicate that the call has been ignored and no
// action taken.
Delete(context.Context, *v1.PersistentVolume) error
}
// Qualifier is an optional interface implemented by provisioners to determine
// whether a claim should be provisioned as early as possible (e.g. prior to
// leader election).
type Qualifier interface {
// ShouldProvision returns whether provisioning for the claim should
// be attempted.
ShouldProvision(context.Context, *v1.PersistentVolumeClaim) bool
}
// DeletionGuard is an optional interface implemented by provisioners to determine
// whether a PV should be deleted.
type DeletionGuard interface {
// ShouldDelete returns whether deleting the PV should be attempted.
ShouldDelete(context.Context, *v1.PersistentVolume) bool
}
// BlockProvisioner is an optional interface implemented by provisioners to determine
// whether it supports block volume.
type BlockProvisioner interface {
Provisioner
// SupportsBlock returns whether provisioner supports block volume.
SupportsBlock(context.Context) bool
}
// ProvisioningState is state of volume provisioning. It tells the controller if
// provisioning could be in progress in the background after Provision() call
// returns or the provisioning is 100% finished (either with success or error).
type ProvisioningState string
const (
// ProvisioningInBackground tells the controller that provisioning may be in
// progress in background after Provision call finished.
ProvisioningInBackground ProvisioningState = "Background"
// ProvisioningFinished tells the controller that provisioning for sure does
// not continue in background, error code of Provision() is final.
ProvisioningFinished ProvisioningState = "Finished"
// ProvisioningNoChange tells the controller that provisioning state is the same as
// before the call - either ProvisioningInBackground or ProvisioningFinished from
// the previous Provision(). This state is typically returned by a provisioner
// before it could reach storage backend - the provisioner could not check status
// of provisioning and previous state applies. If this state is returned from the
// first Provision call, ProvisioningFinished is assumed (the provisioning
// could not even start).
ProvisioningNoChange ProvisioningState = "NoChange"
// ProvisioningReschedule tells the controller that it shall stop all further
// attempts to provision the volume and instead ask the Kubernetes scheduler
// to pick a different node. This only makes sense for volumes with a selected
// node, i.e. those with late binding, and must only be returned when it is certain
// that provisioning does not continue in the background. The error returned together
// with this state contains further information why rescheduling is needed.
ProvisioningReschedule ProvisioningState = "Reschedule"
)
// IgnoredError is the value for Delete to return to indicate that the call has
// been ignored and no action taken. In case multiple provisioners are serving
// the same storage class, provisioners may ignore PVs they are not responsible
// for (e.g. ones they didn't create). The controller will act accordingly,
// i.e. it won't emit a misleading VolumeFailedDelete event.
type IgnoredError struct {
Reason string
}
func (e *IgnoredError) Error() string {
return fmt.Sprintf("ignored because %s", e.Reason)
}
// ProvisionOptions contains all information required to provision a volume
type ProvisionOptions struct {
// StorageClass is a reference to the storage class that is used for
// provisioning for this volume
StorageClass *storageapis.StorageClass
// PV.Name of the appropriate PersistentVolume. Used to generate cloud
// volume name.
PVName string
// PVC is reference to the claim that lead to provisioning of a new PV.
// Provisioners *must* create a PV that would be matched by this PVC,
// i.e. with required capacity, accessMode, labels matching PVC.Selector and
// so on.
PVC *v1.PersistentVolumeClaim
// Node selected by the scheduler for the volume.
SelectedNode *v1.Node
}

View File

@@ -0,0 +1,269 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controller
import (
"context"
"fmt"
"sync"
"time"
v1 "k8s.io/api/core/v1"
apierrs "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"
)
// VolumeStore is an interface that's used to save PersistentVolumes to API server.
// Implementation of the interface add custom error recovery policy.
// A volume is added via StoreVolume(). It's enough to store the volume only once.
// It is not possible to remove a volume, even when corresponding PVC is deleted
// and PV is not necessary any longer. PV will be always created.
// If corresponding PVC is deleted, the PV will be deleted by Kubernetes using
// standard deletion procedure. It saves us some code here.
type VolumeStore interface {
// StoreVolume makes sure a volume is saved to Kubernetes API server.
// If no error is returned, caller can assume that PV was saved or
// is being saved in background.
// In error is returned, no PV was saved and corresponding PVC needs
// to be re-queued (so whole provisioning needs to be done again).
StoreVolume(claim *v1.PersistentVolumeClaim, volume *v1.PersistentVolume) error
// Runs any background goroutines for implementation of the interface.
Run(ctx context.Context, threadiness int)
}
// queueStore is implementation of VolumeStore that re-tries saving
// PVs to API server using a workqueue running in its own goroutine(s).
// After failed save, volume is re-qeueued with exponential backoff.
type queueStore struct {
client kubernetes.Interface
queue workqueue.RateLimitingInterface
eventRecorder record.EventRecorder
claimsIndexer cache.Indexer
volumes sync.Map
}
var _ VolumeStore = &queueStore{}
// NewVolumeStoreQueue returns VolumeStore that uses asynchronous workqueue to save PVs.
func NewVolumeStoreQueue(
client kubernetes.Interface,
limiter workqueue.RateLimiter,
claimsIndexer cache.Indexer,
eventRecorder record.EventRecorder,
) VolumeStore {
return &queueStore{
client: client,
queue: workqueue.NewNamedRateLimitingQueue(limiter, "unsavedpvs"),
claimsIndexer: claimsIndexer,
eventRecorder: eventRecorder,
}
}
func (q *queueStore) StoreVolume(_ *v1.PersistentVolumeClaim, volume *v1.PersistentVolume) error {
if err := q.doSaveVolume(volume); err != nil {
q.volumes.Store(volume.Name, volume)
q.queue.Add(volume.Name)
klog.Errorf("Failed to save volume %s: %s", volume.Name, err)
}
// Consume any error, this Store will retry in background.
return nil
}
func (q *queueStore) Run(ctx context.Context, threadiness int) {
klog.Infof("Starting save volume queue")
defer q.queue.ShutDown()
for i := 0; i < threadiness; i++ {
go wait.Until(q.saveVolumeWorker, time.Second, ctx.Done())
}
<-ctx.Done()
klog.Infof("Stopped save volume queue")
}
func (q *queueStore) saveVolumeWorker() {
for q.processNextWorkItem() {
}
}
func (q *queueStore) processNextWorkItem() bool {
obj, shutdown := q.queue.Get()
defer q.queue.Done(obj)
if shutdown {
return false
}
var volumeName string
var ok bool
if volumeName, ok = obj.(string); !ok {
q.queue.Forget(obj)
utilruntime.HandleError(fmt.Errorf("expected string in save workqueue but got %#v", obj))
return true
}
volumeObj, found := q.volumes.Load(volumeName)
if !found {
q.queue.Forget(volumeName)
utilruntime.HandleError(fmt.Errorf("did not find saved volume %s", volumeName))
return true
}
volume, ok := volumeObj.(*v1.PersistentVolume)
if !ok {
q.queue.Forget(volumeName)
utilruntime.HandleError(fmt.Errorf("saved object is not volume: %+v", volumeObj))
return true
}
if err := q.doSaveVolume(volume); err != nil {
q.queue.AddRateLimited(volumeName)
utilruntime.HandleError(err)
klog.V(5).Infof("Volume %s enqueued", volume.Name)
return true
}
q.volumes.Delete(volumeName)
q.queue.Forget(volumeName)
return true
}
func (q *queueStore) doSaveVolume(volume *v1.PersistentVolume) error {
klog.V(5).Infof("Saving volume %s", volume.Name)
_, err := q.client.CoreV1().PersistentVolumes().Create(context.Background(), volume, metav1.CreateOptions{})
if err == nil || apierrs.IsAlreadyExists(err) {
klog.V(5).Infof("Volume %s saved", volume.Name)
q.sendSuccessEvent(volume)
return nil
}
return fmt.Errorf("error saving volume %s: %s", volume.Name, err)
}
func (q *queueStore) sendSuccessEvent(volume *v1.PersistentVolume) {
claimObjs, err := q.claimsIndexer.ByIndex(uidIndex, string(volume.Spec.ClaimRef.UID))
if err != nil {
klog.V(2).Infof("Error sending event to claim %s: %s", volume.Spec.ClaimRef.UID, err)
return
}
if len(claimObjs) != 1 {
return
}
claim, ok := claimObjs[0].(*v1.PersistentVolumeClaim)
if !ok {
return
}
msg := fmt.Sprintf("Successfully provisioned volume %s", volume.Name)
q.eventRecorder.Event(claim, v1.EventTypeNormal, "ProvisioningSucceeded", msg)
}
// backoffStore is implementation of VolumeStore that blocks and tries to save
// a volume to API server with configurable backoff. If saving fails,
// StoreVolume() deletes the storage asset in the end and returns appropriate
// error code.
type backoffStore struct {
client kubernetes.Interface
eventRecorder record.EventRecorder
backoff *wait.Backoff
ctrl *ProvisionController
}
var _ VolumeStore = &backoffStore{}
// NewBackoffStore returns VolumeStore that uses blocking exponential backoff to save PVs.
func NewBackoffStore(client kubernetes.Interface,
eventRecorder record.EventRecorder,
backoff *wait.Backoff,
ctrl *ProvisionController,
) VolumeStore {
return &backoffStore{
client: client,
eventRecorder: eventRecorder,
backoff: backoff,
ctrl: ctrl,
}
}
func (b *backoffStore) StoreVolume(claim *v1.PersistentVolumeClaim, volume *v1.PersistentVolume) error {
// Try to create the PV object several times
var lastSaveError error
err := wait.ExponentialBackoff(*b.backoff, func() (bool, error) {
klog.Infof("Trying to save persistentvolume %q", volume.Name)
var err error
if _, err = b.client.CoreV1().PersistentVolumes().Create(context.Background(), volume, metav1.CreateOptions{}); err == nil || apierrs.IsAlreadyExists(err) {
// Save succeeded.
if err != nil {
klog.Infof("persistentvolume %q already exists, reusing", volume.Name)
} else {
klog.Infof("persistentvolume %q saved", volume.Name)
}
return true, nil
}
// Save failed, try again after a while.
klog.Infof("Failed to save persistentvolume %q: %v", volume.Name, err)
lastSaveError = err
return false, nil
})
if err == nil {
// Save succeeded
msg := fmt.Sprintf("Successfully provisioned volume %s", volume.Name)
b.eventRecorder.Event(claim, v1.EventTypeNormal, "ProvisioningSucceeded", msg)
return nil
}
// Save failed. Now we have a storage asset outside of Kubernetes,
// but we don't have appropriate PV object for it.
// Emit some event here and try to delete the storage asset several
// times.
strerr := fmt.Sprintf("Error creating provisioned PV object for claim %s: %v. Deleting the volume.", claimToClaimKey(claim), lastSaveError)
klog.Error(strerr)
b.eventRecorder.Event(claim, v1.EventTypeWarning, "ProvisioningFailed", strerr)
var lastDeleteError error
err = wait.ExponentialBackoff(*b.backoff, func() (bool, error) {
if err = b.ctrl.provisioner.Delete(context.Background(), volume); err == nil {
// Delete succeeded
klog.Infof("Cleaning volume %q succeeded", volume.Name)
return true, nil
}
// Delete failed, try again after a while.
klog.Infof("Failed to clean volume %q: %v", volume.Name, err)
lastDeleteError = err
return false, nil
})
if err != nil {
// Delete failed several times. There is an orphaned volume and there
// is nothing we can do about it.
strerr := fmt.Sprintf("Error cleaning provisioned volume for claim %s: %v. Please delete manually.", claimToClaimKey(claim), lastDeleteError)
klog.Error(strerr)
b.eventRecorder.Event(claim, v1.EventTypeWarning, "ProvisioningCleanupFailed", strerr)
}
return lastSaveError
}
func (b *backoffStore) Run(ctx context.Context, threadiness int) {
// There is not background processing
}

View File

@@ -0,0 +1,17 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util // import "sigs.k8s.io/sig-storage-lib-external-provisioner/v6/util"

View File

@@ -0,0 +1,161 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"context"
"net"
"github.com/miekg/dns"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
glog "k8s.io/klog"
)
// Common allocation units
const (
KiB int64 = 1024
MiB int64 = 1024 * KiB
GiB int64 = 1024 * MiB
TiB int64 = 1024 * GiB
)
// RoundUpSize calculates how many allocation units are needed to accommodate
// a volume of given size. E.g. when user wants 1500MiB volume, while AWS EBS
// allocates volumes in gibibyte-sized chunks,
// RoundUpSize(1500 * 1024*1024, 1024*1024*1024) returns '2'
// (2 GiB is the smallest allocatable volume that can hold 1500MiB)
func RoundUpSize(volumeSizeBytes int64, allocationUnitBytes int64) int64 {
return (volumeSizeBytes + allocationUnitBytes - 1) / allocationUnitBytes
}
// RoundUpToGiB rounds up given quantity upto chunks of GiB
func RoundUpToGiB(sizeBytes int64) int64 {
return RoundUpSize(sizeBytes, GiB)
}
// AccessModesContains returns whether the requested mode is contained by modes
func AccessModesContains(modes []v1.PersistentVolumeAccessMode, mode v1.PersistentVolumeAccessMode) bool {
for _, m := range modes {
if m == mode {
return true
}
}
return false
}
// AccessModesContainedInAll returns whether all of the requested modes are contained by modes
func AccessModesContainedInAll(indexedModes []v1.PersistentVolumeAccessMode, requestedModes []v1.PersistentVolumeAccessMode) bool {
for _, mode := range requestedModes {
if !AccessModesContains(indexedModes, mode) {
return false
}
}
return true
}
// GetPersistentVolumeClass returns StorageClassName.
func GetPersistentVolumeClass(volume *v1.PersistentVolume) string {
// Use beta annotation first
if class, found := volume.Annotations[v1.BetaStorageClassAnnotation]; found {
return class
}
return volume.Spec.StorageClassName
}
// GetPersistentVolumeClaimClass returns StorageClassName. If no storage class was
// requested, it returns "".
func GetPersistentVolumeClaimClass(claim *v1.PersistentVolumeClaim) string {
// Use beta annotation first
if class, found := claim.Annotations[v1.BetaStorageClassAnnotation]; found {
return class
}
if claim.Spec.StorageClassName != nil {
return *claim.Spec.StorageClassName
}
return ""
}
// CheckPersistentVolumeClaimModeBlock checks VolumeMode.
// If the mode is Block, return true otherwise return false.
func CheckPersistentVolumeClaimModeBlock(pvc *v1.PersistentVolumeClaim) bool {
return pvc.Spec.VolumeMode != nil && *pvc.Spec.VolumeMode == v1.PersistentVolumeBlock
}
// FindDNSIP looks up the cluster DNS service by label "coredns", falling back to "kube-dns" if not found
func FindDNSIP(ctx context.Context, client kubernetes.Interface) (dnsip string) {
// find DNS server address through client API
// cache result in rbdProvisioner
var dnssvc *v1.Service
coredns, err := client.CoreV1().Services(metav1.NamespaceSystem).Get(ctx, "coredns", metav1.GetOptions{})
if err != nil {
glog.Warningf("error getting coredns service: %v. Falling back to kube-dns\n", err)
kubedns, err := client.CoreV1().Services(metav1.NamespaceSystem).Get(ctx, "kube-dns", metav1.GetOptions{})
if err != nil {
glog.Errorf("error getting kube-dns service: %v\n", err)
return ""
}
dnssvc = kubedns
} else {
dnssvc = coredns
}
if len(dnssvc.Spec.ClusterIP) == 0 {
glog.Errorf("DNS service ClusterIP bad\n")
return ""
}
return dnssvc.Spec.ClusterIP
}
// LookupHost looks up IP addresses of hostname on specified DNS server
func LookupHost(hostname string, serverip string) (iplist []string, err error) {
glog.V(4).Infof("lookuphost %q on %q\n", hostname, serverip)
m := new(dns.Msg)
m.SetQuestion(dns.Fqdn(hostname), dns.TypeA)
in, err := dns.Exchange(m, JoinHostPort(serverip, "53"))
if err != nil {
glog.Errorf("dns lookup of %q failed: err %v", hostname, err)
return nil, err
}
for _, a := range in.Answer {
glog.V(4).Infof("lookuphost answer: %v\n", a)
if t, ok := a.(*dns.A); ok {
iplist = append(iplist, t.A.String())
}
}
return iplist, nil
}
// SplitHostPort split a string into host and port (port is optional)
func SplitHostPort(hostport string) (host, port string) {
host, port, err := net.SplitHostPort(hostport)
if err != nil {
host, port = hostport, ""
}
return host, port
}
// JoinHostPort joins a hostname and an optional port
func JoinHostPort(host, port string) (hostport string) {
if port != "" {
return net.JoinHostPort(host, port)
}
return host
}