前面两篇文章已经介绍了整体的内容,下面就以青云的存储为例,先提供一个 Provision 的实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101 1func (c *volumeProvisioner) Provision(options controller.VolumeOptions) (*v1.PersistentVolume, error) {
2 glog.V(4).Infof("qingcloudVolumeProvisioner Provision called, options: [%+v]", options)
3
4 // TODO: implement PVC.Selector parsing
5 if options.PVC.Spec.Selector != nil {
6 return nil, fmt.Errorf("claim.Spec.Selector is not supported for dynamic provisioning on qingcloud")
7 }
8
9 // Validate access modes
10 found := false
11 for _, mode := range options.PVC.Spec.AccessModes {
12 if mode == v1.ReadWriteOnce {
13 found = true
14 }
15 }
16 if !found {
17 return nil, fmt.Errorf("Qingcloud volume only supports ReadWriteOnce mounts")
18 }
19
20 volumeOptions := &VolumeOptions{}
21
22 hasSetType := false
23 for k, v := range options.Parameters {
24 switch strings.ToLower(k) {
25 case "type":
26 if !supportVolumeTypes.Has(v) {
27 return nil, fmt.Errorf("invalid option '%q' for qingcloud-volume-provisioner, it only can be 0, 2, 3",
28 k)
29 }
30 volumeTypeInt, _ := strconv.Atoi(v)
31 volumeOptions.VolumeType = VolumeType(volumeTypeInt)
32 hasSetType = true
33 default:
34 return nil, fmt.Errorf("invalid option '%q' for qingcloud-volume-provisioner", k)
35 }
36 }
37
38 //auto set volume type by instance class.
39 if !hasSetType {
40 volumeOptions.VolumeType = c.manager.GetDefaultVolumeType()
41 }
42
43 capacity := options.PVC.Spec.Resources.Requests[v1.ResourceName(v1.ResourceStorage)]
44 sizeGB, err := RoundUpVolumeCapacity(capacity, volumeOptions.VolumeType)
45 if err != nil {
46 return nil, err
47 }
48 volumeOptions.CapacityGB = sizeGB
49
50 //use pv name as volumeName
51 volumeOptions.VolumeName = options.PVName
52 volumeID, err := c.manager.CreateVolume(volumeOptions)
53 if err != nil {
54 glog.V(2).Infof("Error creating qingcloud volume: %v", err)
55 return nil, err
56 }
57 glog.V(2).Infof("Successfully created qingcloud volume %s", volumeID)
58
59 storageClassName := ""
60 if options.PVC.Spec.StorageClassName != nil {
61 storageClassName = *options.PVC.Spec.StorageClassName
62 }
63
64 annotations := make(map[string]string)
65 annotations[annCreatedBy] = createdBy
66 annotations[annProvisionerId] = ProvisionerName
67
68 flexVolumeConfig := make(map[string]string)
69 flexVolumeConfig[OptionVolumeID] = volumeID
70
71 fsType ,ok := options.PVC.ObjectMeta.Annotations["kubernetes.io/fsType"]
72 if !ok{
73 fsType = DefaultFSType
74 }
75 pv := &v1.PersistentVolume{
76 ObjectMeta: metav1.ObjectMeta{
77 Name: options.PVName,
78 Labels: map[string]string{},
79 Annotations: annotations,
80 },
81 Spec: v1.PersistentVolumeSpec{
82 PersistentVolumeReclaimPolicy: options.PersistentVolumeReclaimPolicy,
83 AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce},
84 Capacity: v1.ResourceList{
85 v1.ResourceName(v1.ResourceStorage): resource.MustParse(fmt.Sprintf("%dGi", sizeGB)),
86 },
87 StorageClassName: storageClassName,
88 PersistentVolumeSource: v1.PersistentVolumeSource{
89 FlexVolume: &v1.FlexVolumeSource{
90 Driver: FlexDriverName,
91 FSType: fsType,
92 ReadOnly: false,
93 Options: flexVolumeConfig,
94 },
95 },
96 },
97 }
98
99 return pv, nil
100}
101
上面的代码就是创建pv的整个过程
然后是删除的方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23 1func (c *volumeProvisioner) Delete(volume *v1.PersistentVolume) error {
2 if volume.Name == "" {
3 return fmt.Errorf("volume name cannot be empty %#v", volume)
4 }
5
6 if volume.Spec.PersistentVolumeReclaimPolicy != v1.PersistentVolumeReclaimRetain {
7 if volume.Spec.PersistentVolumeSource.FlexVolume == nil {
8 return fmt.Errorf("volume [%s] not support by qingcloud-volume-provisioner", volume.Name)
9 }
10 volumeID := volume.Spec.PersistentVolumeSource.FlexVolume.Options["volumeID"]
11 if volumeID == "" {
12 return fmt.Errorf("Spec.PersistentVolumeSource.FlexVolume.Options[\"volumeID\"] cannot be empty %#v", volume)
13 }
14 _, err := c.manager.DeleteVolume(volumeID)
15 if err != nil {
16 return err
17 }
18 return nil
19 }
20
21 return nil
22}
23
然后是二进制执行文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53 1func handler(op string, args []string) flex.VolumeResult {
2 volumePlugin, err := qingcloud.NewFlexVolumePlugin()
3
4 if err != nil {
5 return flex.NewVolumeError("Error init FlexVolumePlugin")
6 }
7
8 var ret flex.VolumeResult
9
10 switch op {
11 case "init":
12 ret = volumePlugin.Init()
13 case "attach":
14 if len(args) < 2 {
15 return flex.NewVolumeError("attach requires options in json format and a node name")
16 }
17 ret = volumePlugin.Attach(ensureVolumeOptions(args[0]), args[1])
18 case "isattached":
19 if len(args) < 2 {
20 return flex.NewVolumeError("isattached requires options in json format and a node name")
21 }
22 ret = volumePlugin.IsAttached(ensureVolumeOptions(args[0]), args[1])
23 case "detach":
24 if len(args) < 2 {
25 return flex.NewVolumeError("detach requires a device path and a node name")
26 }
27 ret = volumePlugin.Detach(args[0], args[1])
28 case "mountdevice":
29 if len(args) < 3 {
30 return flex.NewVolumeError("mountdevice requires a mount path, a device path and mount options")
31 }
32 ret = volumePlugin.MountDevice(args[0], args[1], ensureVolumeOptions(args[2]))
33 case "unmountdevice":
34 if len(args) < 1 {
35 return flex.NewVolumeError("unmountdevice requires a mount path")
36 }
37 ret = volumePlugin.UnmountDevice(args[0])
38 case "waitforattach":
39 if len(args) < 2 {
40 return flex.NewVolumeError("waitforattach requires a device path and options in json format")
41 }
42 ret = volumePlugin.WaitForAttach(args[0], ensureVolumeOptions(args[1]))
43 case "getvolumename":
44 if len(args) < 1 {
45 return flex.NewVolumeError("getvolumename requires options in json format")
46 }
47 ret = volumePlugin.GetVolumeName(ensureVolumeOptions(args[0]))
48 default:
49 ret = flex.NewVolumeNotSupported(op)
50 }
51 return ret
52}
53
实现了 Attach、MountDevice 等方法。下面具体以 Attach 为例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26 1func (p *flexVolumePlugin) Attach(options flex.VolumeOptions, node string) flex.VolumeResult {
2 glog.V(4).Infof("Attach")
3 volumeID, _ := options[OptionVolumeID].(string)
4 pvOrVolumeName, _ := options[OptionPVorVolumeName].(string)
5 // flexVolumeDriver GetVolumeName is not yet supported, so PVorVolumeName is pvName, and store pvName to volumeName
6 if !isVolumeID(pvOrVolumeName) {
7 err := p.manager.UpdateVolume(volumeID, pvOrVolumeName)
8 if err != nil {
9 return flex.NewVolumeError("Error updating volume (%s) name to (%s) : %s", volumeID, pvOrVolumeName, err.Error())
10 }
11 }
12 // VolumeManager.AttachVolume checks if disk is already attached to node and
13 // succeeds in that case, so no need to do that separately.
14 _, err := p.manager.AttachVolume(volumeID, node)
15
16 if err != nil {
17 //ignore already attached error
18 if !strings.Contains(err.Error(), "have been already attached to instance") {
19 glog.Errorf("Error attaching volume %q: %+v", volumeID, err)
20 return flex.NewVolumeError("Error attaching volume %q to node %s: %+v", volumeID, node, err)
21 }
22 }
23
24 return flex.NewVolumeSuccess().WithDevicePath(volumeID)
25}
26
上面的AttachVolume通过调用青云api完成加载
1
2
3
4
5
6
7
8
9
10
11
12
13 1if !attached {
2 output, err := vm.volumeService.AttachVolumes(&qcservice.AttachVolumesInput{
3 Volumes: []*string{&volumeID},
4 Instance: &instanceID,
5 })
6 if err != nil {
7 return "", err
8 }
9 jobID := *output.JobID
10 //ignore wait job error
11 qcclient.WaitJob(vm.jobService, jobID, operationWaitTimeout, waitInterval)
12 }
13
如果你还有自己的存储,通过相同的方式也就可以接入到 k8s 了。