kubernetes添加第三方存储(三)

释放双眼,带上耳机,听听看~!

前面两篇文章已经介绍了整体的内容,下面就已对青云的存储为例,先提供一个Provision的实现


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
1func (c *volumeProvisioner) Provision(options controller.VolumeOptions) (*v1.PersistentVolume, error) {
2    glog.V(4).Infof("qingcloudVolumeProvisioner Provision called, options: [%+v]", options)
3
4    // TODO: implement PVC.Selector parsing
5    if options.PVC.Spec.Selector != nil {
6        return nil, fmt.Errorf("claim.Spec.Selector is not supported for dynamic provisioning on qingcloud")
7    }
8
9    // Validate access modes
10    found := false
11    for _, mode := range options.PVC.Spec.AccessModes {
12        if mode == v1.ReadWriteOnce {
13            found = true
14        }
15    }
16    if !found {
17        return nil, fmt.Errorf("Qingcloud volume only supports ReadWriteOnce mounts")
18    }
19
20    volumeOptions := &VolumeOptions{}
21
22    hasSetType := false
23    for k, v := range options.Parameters {
24        switch strings.ToLower(k) {
25        case "type":
26            if !supportVolumeTypes.Has(v) {
27                return nil, fmt.Errorf("invalid option '%q' for qingcloud-volume-provisioner, it only can be 0, 2, 3",
28                    k)
29            }
30            volumeTypeInt, _ := strconv.Atoi(v)
31            volumeOptions.VolumeType = VolumeType(volumeTypeInt)
32            hasSetType = true
33        default:
34            return nil, fmt.Errorf("invalid option '%q' for qingcloud-volume-provisioner", k)
35        }
36    }
37
38    //auto set volume type by instance class.
39    if !hasSetType {
40        volumeOptions.VolumeType = c.manager.GetDefaultVolumeType()
41    }
42
43    capacity := options.PVC.Spec.Resources.Requests[v1.ResourceName(v1.ResourceStorage)]
44    sizeGB, err := RoundUpVolumeCapacity(capacity, volumeOptions.VolumeType)
45    if err != nil {
46        return nil, err
47    }
48    volumeOptions.CapacityGB = sizeGB
49
50    //use pv name as volumeName
51    volumeOptions.VolumeName = options.PVName
52    volumeID, err := c.manager.CreateVolume(volumeOptions)
53    if err != nil {
54        glog.V(2).Infof("Error creating qingcloud volume: %v", err)
55        return nil, err
56    }
57    glog.V(2).Infof("Successfully created qingcloud volume %s", volumeID)
58
59    storageClassName := ""
60    if options.PVC.Spec.StorageClassName != nil {
61        storageClassName = *options.PVC.Spec.StorageClassName
62    }
63
64    annotations := make(map[string]string)
65    annotations[annCreatedBy] = createdBy
66    annotations[annProvisionerId] = ProvisionerName
67
68    flexVolumeConfig := make(map[string]string)
69    flexVolumeConfig[OptionVolumeID] = volumeID
70
71    fsType ,ok := options.PVC.ObjectMeta.Annotations["kubernetes.io/fsType"]
72    if !ok{
73        fsType = DefaultFSType
74    }
75    pv := &v1.PersistentVolume{
76        ObjectMeta: metav1.ObjectMeta{
77            Name:        options.PVName,
78            Labels:      map[string]string{},
79            Annotations: annotations,
80        },
81        Spec: v1.PersistentVolumeSpec{
82            PersistentVolumeReclaimPolicy: options.PersistentVolumeReclaimPolicy,
83            AccessModes:                   []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce},
84            Capacity: v1.ResourceList{
85                v1.ResourceName(v1.ResourceStorage): resource.MustParse(fmt.Sprintf("%dGi", sizeGB)),
86            },
87            StorageClassName: storageClassName,
88            PersistentVolumeSource: v1.PersistentVolumeSource{
89                FlexVolume: &v1.FlexVolumeSource{
90                    Driver:   FlexDriverName,
91                    FSType:   fsType,
92                    ReadOnly: false,
93                    Options:  flexVolumeConfig,
94                },
95            },
96        },
97    }
98
99    return pv, nil
100}
101

上面的代码就是创建pv的整个过程

然后是删除的方法


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
1func (c *volumeProvisioner) Delete(volume *v1.PersistentVolume) error {
2    if volume.Name == "" {
3        return fmt.Errorf("volume name cannot be empty %#v", volume)
4    }
5
6    if volume.Spec.PersistentVolumeReclaimPolicy != v1.PersistentVolumeReclaimRetain {
7        if volume.Spec.PersistentVolumeSource.FlexVolume == nil {
8            return fmt.Errorf("volume [%s] not support by qingcloud-volume-provisioner", volume.Name)
9        }
10        volumeID := volume.Spec.PersistentVolumeSource.FlexVolume.Options["volumeID"]
11        if volumeID == "" {
12            return fmt.Errorf("Spec.PersistentVolumeSource.FlexVolume.Options[\"volumeID\"]  cannot be empty %#v", volume)
13        }
14        _, err := c.manager.DeleteVolume(volumeID)
15        if err != nil {
16            return err
17        }
18        return nil
19    }
20
21    return nil
22}
23

然后是二进制执行文件


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
1func handler(op string, args []string) flex.VolumeResult {
2    volumePlugin, err := qingcloud.NewFlexVolumePlugin()
3
4    if err != nil {
5        return flex.NewVolumeError("Error init FlexVolumePlugin")
6    }
7
8    var ret flex.VolumeResult
9
10    switch op {
11    case "init":
12        ret = volumePlugin.Init()
13    case "attach":
14        if len(args) < 2 {
15            return flex.NewVolumeError("attach requires options in json format and a node name")
16        }
17        ret = volumePlugin.Attach(ensureVolumeOptions(args[0]), args[1])
18    case "isattached":
19        if len(args) < 2 {
20            return flex.NewVolumeError("isattached requires options in json format and a node name")
21        }
22        ret = volumePlugin.IsAttached(ensureVolumeOptions(args[0]), args[1])
23    case "detach":
24        if len(args) < 2 {
25            return flex.NewVolumeError("detach requires a device path and a node name")
26        }
27        ret = volumePlugin.Detach(args[0], args[1])
28    case "mountdevice":
29        if len(args) < 3 {
30            return flex.NewVolumeError("mountdevice requires a mount path, a device path and mount options")
31        }
32        ret = volumePlugin.MountDevice(args[0], args[1], ensureVolumeOptions(args[2]))
33    case "unmountdevice":
34        if len(args) < 1 {
35            return flex.NewVolumeError("unmountdevice requires a mount path")
36        }
37        ret = volumePlugin.UnmountDevice(args[0])
38    case "waitforattach":
39        if len(args) < 2 {
40            return flex.NewVolumeError("waitforattach requires a device path and options in json format")
41        }
42        ret = volumePlugin.WaitForAttach(args[0], ensureVolumeOptions(args[1]))
43    case "getvolumename":
44        if len(args) < 1 {
45            return flex.NewVolumeError("getvolumename requires options in json format")
46        }
47        ret = volumePlugin.GetVolumeName(ensureVolumeOptions(args[0]))
48    default:
49        ret = flex.NewVolumeNotSupported(op)
50    }
51    return ret
52}
53

实现了attach、MountDevice等方法。具体以一个attached为例


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
1func (p *flexVolumePlugin) Attach(options flex.VolumeOptions, node string) flex.VolumeResult {
2    glog.V(4).Infof("Attach")
3    volumeID, _ := options[OptionVolumeID].(string)
4    pvOrVolumeName, _ := options[OptionPVorVolumeName].(string)
5    // flexVolumeDriver GetVolumeName is not yet supported,  so PVorVolumeName is pvName, and store pvName to volumeName
6    if !isVolumeID(pvOrVolumeName) {
7        err := p.manager.UpdateVolume(volumeID, pvOrVolumeName)
8        if err != nil {
9            return flex.NewVolumeError("Error updating volume (%s) name to (%s) : %s", volumeID, pvOrVolumeName, err.Error())
10        }
11    }
12    // VolumeManager.AttachVolume checks if disk is already attached to node and
13    // succeeds in that case, so no need to do that separately.
14    _, err := p.manager.AttachVolume(volumeID, node)
15
16    if err != nil {
17        //ignore already attached error
18        if !strings.Contains(err.Error(), "have been already attached to instance") {
19            glog.Errorf("Error attaching volume %q: %+v", volumeID, err)
20            return flex.NewVolumeError("Error attaching volume %q to node %s: %+v", volumeID, node, err)
21        }
22    }
23
24    return flex.NewVolumeSuccess().WithDevicePath(volumeID)
25}
26

上面的AttachVolume通过调用青云api完成加载


1
2
3
4
5
6
7
8
9
10
11
12
13
1if !attached {
2        output, err := vm.volumeService.AttachVolumes(&qcservice.AttachVolumesInput{
3            Volumes:  []*string{&volumeID},
4            Instance: &instanceID,
5        })
6        if err != nil {
7            return "", err
8        }
9        jobID := *output.JobID
10        //ignore wait job error
11        qcclient.WaitJob(vm.jobService, jobID, operationWaitTimeout, waitInterval)
12    }
13

如果你还有自己的存储,通过相同的方式也就可以介入到k8s了。

给TA打赏
共{{data.count}}人
人已打赏
安全运维

故障复盘的简洁框架-黄金三问

2021-9-30 19:18:23

安全运维

OpenSSH-8.7p1离线升级修复安全漏洞

2021-10-23 10:13:25

个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索