kubernetes发展到1.7以后,就朝着平台去开发了,上一个讨论的点是kubernetes把metrics单独出去,成为一个kubernetes的监控标准,这个我们以后再慢慢说,先看存储这个孵化的项目。github.com/kubernetes-incubator/external-storage第三方存储对接。
它和上一篇介绍的custom-metrics-apiserver项目一样都是孵化的项目,我们先看看他是如何工作的。
核心在lib/controller/controller.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54 1//启动了pvc的listwatch
2controller.claimSource = &cache.ListWatch{
3 ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
4 return client.Core().PersistentVolumeClaims(v1.NamespaceAll).List(options)
5 },
6 WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
7 return client.Core().PersistentVolumeClaims(v1.NamespaceAll).Watch(options)
8 },
9 }
10 controller.claims, controller.claimController = cache.NewInformer(
11 controller.claimSource,
12 &v1.PersistentVolumeClaim{},
13 controller.resyncPeriod,
14 cache.ResourceEventHandlerFuncs{
15 AddFunc: controller.addClaim,
16 UpdateFunc: controller.updateClaim,
17 DeleteFunc: nil,
18 },
19 )
20//启动了pv的listwatch
21 controller.volumeSource = &cache.ListWatch{
22 ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
23 return client.Core().PersistentVolumes().List(options)
24 },
25 WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
26 return client.Core().PersistentVolumes().Watch(options)
27 },
28 }
29 controller.volumes, controller.volumeController = cache.NewInformer(
30 controller.volumeSource,
31 &v1.PersistentVolume{},
32 controller.resyncPeriod,
33 cache.ResourceEventHandlerFuncs{
34 AddFunc: nil,
35 UpdateFunc: controller.updateVolume,
36 DeleteFunc: nil,
37 },
38 )
39//启动了storageclass的listwatch
40controller.classSource = &cache.ListWatch{
41 ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
42 return client.StorageV1().StorageClasses().List(options)
43 },
44 WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
45 return client.StorageV1().StorageClasses().Watch(options)
46 },
47 }
48 controller.classReflector = cache.NewReflector(
49 controller.classSource,
50 &storage.StorageClass{},
51 controller.classes,
52 controller.resyncPeriod,
53 )
54
上面启动服务主要是watch k8s对存储管理的各种事件,只要是pvc、pv和storageclass的事件的监听。
先看如果pvc创建了改如何处理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 1func (ctrl *ProvisionController) addClaim(obj interface{}) {
2 claim, ok := obj.(*v1.PersistentVolumeClaim)
3 if !ok {
4 glog.Errorf("Expected PersistentVolumeClaim but addClaim received %+v", obj)
5 return
6 }
7
8 if ctrl.shouldProvision(claim) {
9 ctrl.leaderElectorsMutex.Lock()
10 le, ok := ctrl.leaderElectors[claim.UID]
11 ctrl.leaderElectorsMutex.Unlock()
12 if ok && le.IsLeader() {
13 opName := fmt.Sprintf("provision-%s[%s]", claimToClaimKey(claim), string(claim.UID))
14 ctrl.scheduleOperation(opName, func() error {
15 err := ctrl.provisionClaimOperation(claim)
16 ctrl.updateProvisionStats(claim, err)
17 return err
18 })
19 } else {
20 opName := fmt.Sprintf("lock-provision-%s[%s]", claimToClaimKey(claim), string(claim.UID))
21 ctrl.scheduleOperation(opName, func() error {
22 ctrl.lockProvisionClaimOperation(claim)
23 return nil
24 })
25 }
26 }
27}
28
关于选主的方法在此先跳过,主要看看,如何执行的
1
2
3 1err := ctrl.provisionClaimOperation(claim)
2ctrl.updateProvisionStats(claim, err)
3
先看provisionClaimOperation这个方法,这个方法比较长,挑选核心介绍
1
2
3
4
5
6
7
8
9 1先获取到pv的名称通过pvc+pvc的uid拼接而成
2pvName := ctrl.getProvisionedVolumeNameForClaim(claim)
3获取pv是否存在,存在则跳过
4volume, err := ctrl.client.Core().PersistentVolumes().Get(pvName, metav1.GetOptions{})
5如果不存在则执行Provision,创建pv对象
6volume, err = ctrl.provisioner.Provision(options)
7调用k8s创建pv
8ctrl.client.Core().PersistentVolumes().Create(volume)
9
核心是上面的Provision,它是通过pvc创建pv的入口。
它是一个接口
1
2
3
4
5
6
7 1type Provisioner interface {
2 //创建pv对象
3 Provision(VolumeOptions) (*v1.PersistentVolume, error)
4 // 删除pv
5 Delete(*v1.PersistentVolume) error
6}
7
这个就是让第三方去实现的接口,去创建自己的pv对象。这样存储就创建好了。
如果是pv被删除呢?当然按照一开始listwatch方式也是监听到这个消息,会执行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 1func (ctrl *ProvisionController) updateVolume(oldObj, newObj interface{}) {
2 volume, ok := newObj.(*v1.PersistentVolume)
3 if !ok {
4 glog.Errorf("Expected PersistentVolume but handler received %#v", newObj)
5 return
6 }
7
8 if ctrl.shouldDelete(volume) {
9 opName := fmt.Sprintf("delete-%s[%s]", volume.Name, string(volume.UID))
10 ctrl.scheduleOperation(opName, func() error {
11 err := ctrl.deleteVolumeOperation(volume)
12 ctrl.updateDeleteStats(volume, err)
13 return err
14 })
15 }
16}
17
deleteVolumeOperation这个方法原理和之前的一样会调用删除接口
1
2 1err = ctrl.provisioner.Delete(volume)
2
释放存储。