Python 操作Kubernetes集群完全指南

目录

  1. 基础环境准备
  2. Python Kubernetes客户端介绍
  3. 连接Kubernetes集群
  4. Pod操作实战
  5. Deployment管理
  6. Service资源操作
  7. ConfigMap和Secret管理
  8. 自定义资源定义(CRD)操作
  9. 事件监听和Watch操作
  10. 高级应用场景

基础环境准备

1. 安装必要的包

首先,我们需要安装Python的Kubernetes客户端库:

pip install kubernetes
pip install openshift # 可选,用于OpenShift集群

2. 配置文件准备

import os
from kubernetes import client, config

# Load cluster connection settings from the local kubeconfig file; client API
# objects created afterwards (e.g. client.CoreV1Api()) use these settings.
config.load_kube_config()

Python Kubernetes客户端介绍

1. 主要模块说明

from kubernetes import client, config, watch
from kubernetes.client import ApiClient
from kubernetes.client.rest import ApiException

主要模块功能:

  • client: 提供各种API操作接口
  • config: 处理配置文件加载
  • watch: 用于监控资源变化
  • ApiClient: 底层API客户端
  • ApiException: 异常处理

连接Kubernetes集群

    示例1:基础连接配置

    from kubernetes import client, config
    
    def connect_kubernetes():
        """Connect to the cluster described by the local kubeconfig.

        Loads the kubeconfig, creates a CoreV1Api client and sends one cheap
        request (list at most one Pod) to check that the API server is
        reachable and accepts the credentials.

        Returns:
            A client.CoreV1Api on success, or None if loading the config or the
            probe request failed (the error is printed).
        """
        try:
            # Load the local kubeconfig into the default client configuration.
            config.load_kube_config()

            v1 = client.CoreV1Api()

            # Connectivity probe only: with limit=1 the response holds at most
            # one Pod, so its length says nothing about how many Pods exist and
            # must not be reported as a Pod count.
            v1.list_pod_for_all_namespaces(limit=1)
            print("连接成功!")
            return v1
        except Exception as e:
            # Deliberately broad: any failure while loading the config or
            # probing the API server means "not connected" for this helper.
            print(f"连接失败:{e}")
            return None

    # Test the connection
    api = connect_kubernetes()
    

    示例2:多集群配置

    def connect_multiple_clusters(clusters=None):
        """Create one CoreV1Api client per cluster kubeconfig.

        Each cluster gets its own ApiClient built by
        config.new_client_from_config(). Calling config.load_kube_config() in
        a loop would instead overwrite the process-wide default configuration,
        so every client.*Api() created later without an explicit api_client
        would talk to whichever cluster was loaded last.

        Args:
            clusters: mapping of cluster name -> kubeconfig file path. When
                None, the sample 'prod'/'dev' paths below are used.

        Returns:
            dict mapping cluster name -> client.CoreV1Api for every cluster
            whose kubeconfig loaded successfully. Failures are printed and
            skipped.
        """
        if clusters is None:
            clusters = {
                'prod': '/path/to/prod-kubeconfig',
                'dev': '/path/to/dev-kubeconfig'
            }

        apis = {}
        for cluster_name, config_file in clusters.items():
            try:
                api_client = config.new_client_from_config(config_file=config_file)
            except Exception as e:
                print(f"连接{cluster_name}集群失败:{str(e)}")
                continue
            # Bind the API object to this cluster's own ApiClient.
            apis[cluster_name] = client.CoreV1Api(api_client=api_client)
            print(f"成功连接到{cluster_name}集群")

        return apis
    

    Pod操作实战

    示例3:创建Pod

    from kubernetes import client, config
    
    def create_pod(name, image, namespace="default"):
        """Create a single-container Pod that exposes container port 80.

        The container is given the same name as the Pod.

        Returns:
            The V1Pod returned by the API server, or None when the request
            fails with an ApiException (the error is printed).
        """
        container = client.V1Container(
            name=name,
            image=image,
            ports=[client.V1ContainerPort(container_port=80)],
        )
        manifest = client.V1Pod(
            metadata=client.V1ObjectMeta(name=name),
            spec=client.V1PodSpec(containers=[container]),
        )

        core_api = client.CoreV1Api()
        try:
            created = core_api.create_namespaced_pod(namespace=namespace, body=manifest)
        except ApiException as err:
            print(f"Pod创建失败:{err}")
            return None
        print(f"Pod {name} 创建成功")
        return created

    # Usage example
    create_pod("nginx-pod", "nginx:latest")
    

    示例4:查询Pod状态

    def get_pod_status(name, namespace="default"):
        """Return a summary dict describing a Pod's current status.

        Keys: name, status (the Pod phase), pod_ip, host_ip, start_time and
        conditions (a list of {"type", "status"} dicts, empty when the Pod
        reports no conditions). Returns None if reading the Pod fails with an
        ApiException (the error is printed).
        """
        core_api = client.CoreV1Api()
        try:
            pod = core_api.read_namespaced_pod(name=name, namespace=namespace)
        except ApiException as err:
            print(f"获取Pod状态失败:{err}")
            return None

        pod_status = pod.status
        condition_list = [
            {"type": cond.type, "status": cond.status}
            for cond in (pod_status.conditions or [])
        ]
        return {
            "name": pod.metadata.name,
            "status": pod_status.phase,
            "pod_ip": pod_status.pod_ip,
            "host_ip": pod_status.host_ip,
            "start_time": pod_status.start_time,
            "conditions": condition_list,
        }

    # Usage example
    status = get_pod_status("nginx-pod")
    print(status)
    

    Deployment管理

    示例5:创建Deployment

    def create_deployment(name, image, replicas=3, namespace="default"):
        """Create a Deployment running *replicas* copies of *image*.

        Pod template and selector both use the label app=<name>; the single
        container is named after the Deployment and exposes port 80.

        Returns:
            The created V1Deployment, or None on ApiException (printed).
        """
        container = client.V1Container(
            name=name,
            image=image,
            ports=[client.V1ContainerPort(container_port=80)],
        )
        pod_template = client.V1PodTemplateSpec(
            metadata=client.V1ObjectMeta(labels={"app": name}),
            spec=client.V1PodSpec(containers=[container]),
        )
        deployment_spec = client.V1DeploymentSpec(
            replicas=replicas,
            selector=client.V1LabelSelector(match_labels={"app": name}),
            template=pod_template,
        )
        manifest = client.V1Deployment(
            metadata=client.V1ObjectMeta(name=name),
            spec=deployment_spec,
        )

        apps_api = client.AppsV1Api()
        try:
            created = apps_api.create_namespaced_deployment(
                namespace=namespace,
                body=manifest,
            )
        except ApiException as err:
            print(f"Deployment创建失败:{err}")
            return None
        print(f"Deployment {name} 创建成功")
        return created

    # Usage example
    create_deployment("nginx-deployment", "nginx:latest")
    

    示例6:更新Deployment

    def update_deployment(name, new_image, namespace="default"):
        """Switch the first container of a Deployment to *new_image*.

        The current Deployment is read, containers[0].image in its Pod
        template is replaced, and the whole modified object is sent back
        through patch_namespaced_deployment.

        Returns:
            The patched V1Deployment, or None on ApiException (printed).
        """
        apps_api = client.AppsV1Api()
        try:
            current = apps_api.read_namespaced_deployment(name, namespace)
            pod_spec = current.spec.template.spec
            pod_spec.containers[0].image = new_image
            patched = apps_api.patch_namespaced_deployment(
                name=name,
                namespace=namespace,
                body=current,
            )
        except ApiException as err:
            print(f"Deployment更新失败:{err}")
            return None
        print(f"Deployment {name} 更新成功")
        return patched

    # Usage example
    update_deployment("nginx-deployment", "nginx:1.19")
    

    Service资源操作

    示例7:创建Service

    def create_service(name, selector, port, target_port, namespace="default"):
        """Create a Service forwarding *port* to *target_port* on Pods matching *selector*.

        Returns:
            The created V1Service, or None on ApiException (printed).
        """
        service_port = client.V1ServicePort(port=port, target_port=target_port)
        manifest = client.V1Service(
            metadata=client.V1ObjectMeta(name=name),
            spec=client.V1ServiceSpec(selector=selector, ports=[service_port]),
        )

        core_api = client.CoreV1Api()
        try:
            created = core_api.create_namespaced_service(namespace=namespace, body=manifest)
        except ApiException as err:
            print(f"Service创建失败:{err}")
            return None
        print(f"Service {name} 创建成功")
        return created

    # Usage example: select Pods labelled app=nginx-deployment
    create_service(
        name="nginx-service",
        selector={"app": "nginx-deployment"},
        port=80,
        target_port=80,
    )
    

    ConfigMap和Secret管理

    示例8:创建ConfigMap

    def create_configmap(name, data, namespace="default"):
        """Create a ConfigMap named *name* whose data is the *data* mapping.

        Returns:
            The created V1ConfigMap, or None on ApiException (printed).
        """
        manifest = client.V1ConfigMap(
            metadata=client.V1ObjectMeta(name=name),
            data=data,
        )
        core_api = client.CoreV1Api()
        try:
            created = core_api.create_namespaced_config_map(namespace=namespace, body=manifest)
        except ApiException as err:
            print(f"ConfigMap创建失败:{err}")
            return None
        print(f"ConfigMap {name} 创建成功")
        return created

    # Usage example: one key holding a properties-file snippet
    config_data = {
        "app.properties": """
        app.name=myapp
        app.env=production
        """
    }
    create_configmap("app-config", config_data)
    

    示例9:创建Secret

    import base64
    
    def create_secret(name, data, namespace="default"):
        """Create an Opaque Secret from a mapping of plain-text string values.

        Every value is encoded with str.encode() (UTF-8) and then base64 before
        being stored in the Secret's data field.

        Returns:
            The created V1Secret, or None on ApiException (printed).
        """
        encoded_data = {}
        for key, plain in data.items():
            encoded_data[key] = base64.b64encode(plain.encode()).decode()

        manifest = client.V1Secret(
            metadata=client.V1ObjectMeta(name=name),
            type="Opaque",
            data=encoded_data,
        )

        core_api = client.CoreV1Api()
        try:
            created = core_api.create_namespaced_secret(namespace=namespace, body=manifest)
        except ApiException as err:
            print(f"Secret创建失败:{err}")
            return None
        print(f"Secret {name} 创建成功")
        return created

    # Usage example (demo values only; never hard-code real credentials)
    secret_data = {
        "username": "admin",
        "password": "secret123",
    }
    create_secret("app-secrets", secret_data)
    

    自定义资源定义(CRD)操作

    示例10:操作CRD资源

    def create_custom_resource(group, version, plural, namespace, body):
        """Create a namespaced custom object (an instance of a CRD).

        Args:
            group, version, plural: identify the custom resource type, for
                example "stable.example.com", "v1", "crontabs".
            namespace: namespace to create the object in.
            body: the object to create (a dict with apiVersion, kind,
                metadata and spec in the usage example below).

        Returns:
            The result of create_namespaced_custom_object, or None on
            ApiException (printed).
        """
        custom_api = client.CustomObjectsApi()
        try:
            created = custom_api.create_namespaced_custom_object(
                group=group,
                version=version,
                namespace=namespace,
                plural=plural,
                body=body,
            )
        except ApiException as err:
            print(f"自定义资源创建失败:{err}")
            return None
        print("自定义资源创建成功")
        return created

    # Usage example
    custom_resource = {
        "apiVersion": "stable.example.com/v1",
        "kind": "CronTab",
        "metadata": {
            "name": "my-crontab"
        },
        "spec": {
            "cronSpec": "* * * * */5",
            "image": "my-cron-image"
        }
    }

    create_custom_resource(
        group="stable.example.com",
        version="v1",
        plural="crontabs",
        namespace="default",
        body=custom_resource
    )
    

    事件监听和Watch操作

    示例11:监听Pod事件

    from kubernetes import watch
    
    def watch_pods(namespace="default"):
        """Print every Pod watch event in *namespace* until interrupted.

        Ctrl-C (KeyboardInterrupt) stops the watch and prints a notice; an
        ApiException from the watch is printed and ends the function.
        """
        core_api = client.CoreV1Api()
        watcher = watch.Watch()

        try:
            for evt in watcher.stream(core_api.list_namespaced_pod, namespace=namespace):
                pod, kind = evt['object'], evt['type']
                print(f"事件类型: {kind}")
                print(f"Pod名称: {pod.metadata.name}")
                print(f"Pod状态: {pod.status.phase}")
                print("-------------------")
        except ApiException as err:
            print(f"监听失败:{err}")
        except KeyboardInterrupt:
            watcher.stop()
            print("监听已停止")

    # Usage example
    # watch_pods()  # blocks until interrupted
    

    高级应用场景

    示例12:批量操作和错误处理

    def batch_create_resources(resources):
        """Create a batch of Deployments and Services, collecting per-item results.

        Args:
            resources: iterable of dicts with keys 'kind' ('Deployment' or
                'Service'), 'namespace', and 'spec' (the client model object to
                create, e.g. a client.V1Deployment; its metadata.name is used in
                the report).

        Returns:
            {'success': [{'kind', 'name'}, ...],
             'failed': [{'kind', 'name', 'error'}, ...]}
            A resource whose kind is not supported is reported under 'failed'
            instead of being silently skipped.
        """
        # Create the API objects once for the whole batch, not per resource.
        apps_v1 = client.AppsV1Api()
        v1 = client.CoreV1Api()
        # kind -> create function; extend this table to support more kinds.
        creators = {
            'Deployment': apps_v1.create_namespaced_deployment,
            'Service': v1.create_namespaced_service,
        }

        results = {
            'success': [],
            'failed': []
        }

        for resource in resources:
            kind = resource['kind']
            name = resource['spec'].metadata.name
            create = creators.get(kind)
            if create is None:
                results['failed'].append({
                    'kind': kind,
                    'name': name,
                    'error': f"不支持的资源类型: {kind}"
                })
                continue

            try:
                create(namespace=resource['namespace'], body=resource['spec'])
            except ApiException as e:
                results['failed'].append({
                    'kind': kind,
                    'name': name,
                    'error': str(e)
                })
            else:
                results['success'].append({'kind': kind, 'name': name})

        return results
    
    # Usage example: a batch holding one Deployment. Each entry gives the kind,
    # the target namespace and the client model object to create.
    resources = [
        {
            'kind': 'Deployment',
            'namespace': 'default',
            'spec': client.V1Deployment(
                metadata=client.V1ObjectMeta(name="nginx-deployment"),
                spec=client.V1DeploymentSpec(
                    replicas=3,
                    selector=client.V1LabelSelector(match_labels={"app": "nginx"}),
                    template=client.V1PodTemplateSpec(
                        metadata=client.V1ObjectMeta(labels={"app": "nginx"}),
                        spec=client.V1PodSpec(
                            containers=[
                                client.V1Container(name="nginx", image="nginx:latest"),
                            ]
                        ),
                    ),
                ),
            ),
        },
    ]

    示例13:资源清理和垃圾回收

    def _delete_each(items, delete_by_name, kind, deleted, errors, ignore_missing=False):
        """Delete each object in *items*, recording its name in *deleted* or an error in *errors*.

        With ignore_missing=True, a 404 (object already gone) is skipped
        instead of being reported as an error.
        """
        for obj in items:
            obj_name = obj.metadata.name
            try:
                delete_by_name(obj_name)
            except ApiException as e:
                if ignore_missing and e.status == 404:
                    continue
                errors.append(f"{kind} {obj_name}: {str(e)}")
            else:
                deleted.append(obj_name)


    def cleanup_resources(namespace="default", label_selector=None):
        """Delete the Deployments, Services and Pods in *namespace*.

        Args:
            namespace: namespace to clean up.
            label_selector: optional label selector such as "app=nginx"; None
                selects every object of each kind. The selector is matched
                against each object's own metadata.labels, so a Deployment whose
                Pod template carries app=nginx but whose own metadata does not
                is not selected.

        Returns:
            dict with the deleted names per kind ('deployments', 'services',
            'pods') and per-object error strings ('errors'), or None if one of
            the list calls fails (the error is printed).
        """
        v1 = client.CoreV1Api()
        apps_v1 = client.AppsV1Api()

        cleanup_results = {
            'pods': [],
            'deployments': [],
            'services': [],
            'errors': []
        }

        try:
            # 1. Deployments first, with background cascading deletion so the
            #    garbage collector removes their ReplicaSets and Pods. Deleting
            #    the Pods first would only make the still-existing Deployment
            #    recreate them.
            deployments = apps_v1.list_namespaced_deployment(
                namespace=namespace,
                label_selector=label_selector
            )
            _delete_each(
                deployments.items,
                lambda n: apps_v1.delete_namespaced_deployment(
                    name=n, namespace=namespace, propagation_policy="Background"
                ),
                "Deployment", cleanup_results['deployments'], cleanup_results['errors'],
            )

            # 2. Services.
            services = v1.list_namespaced_service(
                namespace=namespace,
                label_selector=label_selector
            )
            _delete_each(
                services.items,
                lambda n: v1.delete_namespaced_service(name=n, namespace=namespace),
                "Service", cleanup_results['services'], cleanup_results['errors'],
            )

            # 3. Remaining Pods: stand-alone Pods, or Pods the garbage collector
            #    has not removed yet. A 404 means it already removed them.
            pods = v1.list_namespaced_pod(
                namespace=namespace,
                label_selector=label_selector
            )
            _delete_each(
                pods.items,
                lambda n: v1.delete_namespaced_pod(name=n, namespace=namespace),
                "Pod", cleanup_results['pods'], cleanup_results['errors'],
                ignore_missing=True,
            )
        except ApiException as e:
            print(f"清理资源时发生错误:{str(e)}")
            return None

        return cleanup_results

    # Usage example
    cleanup_result = cleanup_resources(namespace="default", label_selector="app=nginx")
    print("清理结果:", cleanup_result)
    

    示例14:资源健康检查和自动修复

    import time
    from datetime import datetime, timezone
    from typing import Dict, List, Optional

    class ResourceHealthChecker:
        """Find unhealthy Pods and Deployments in one namespace and try to repair them.

        Pods in phase Failed are deleted. Deployments with fewer ready replicas
        than desired are restarted by setting the
        kubectl.kubernetes.io/restartedAt annotation on their Pod template.
        """

        def __init__(self, namespace: str = "default"):
            self.namespace = namespace
            self.v1 = client.CoreV1Api()
            self.apps_v1 = client.AppsV1Api()

        def check_pod_health(self) -> Optional[Dict[str, List[str]]]:
            """Classify the namespace's Pods by phase.

            Returns:
                {'unhealthy': [names of Pods in phase Failed],
                 'pending': [names of Pods in phase Pending]},
                or None if listing Pods fails (the error is printed).
            """
            try:
                pods = self.v1.list_namespaced_pod(namespace=self.namespace)
            except ApiException as e:
                print(f"检查Pod健康状态时发生错误:{str(e)}")
                return None

            unhealthy_pods = []
            pending_pods = []
            for pod in pods.items:
                if pod.status.phase == 'Failed':
                    unhealthy_pods.append(pod.metadata.name)
                elif pod.status.phase == 'Pending':
                    pending_pods.append(pod.metadata.name)

            return {
                'unhealthy': unhealthy_pods,
                'pending': pending_pods
            }

        def check_deployment_health(self) -> Optional[Dict[str, List[str]]]:
            """Find Deployments with fewer ready replicas than desired.

            The comparison is against spec.replicas (the desired count), not
            status.replicas: during a rolling update status.replicas also counts
            the extra surge Pods, which would flag healthy rollouts and make
            auto_repair restart them.

            Returns:
                {'unhealthy': [Deployment names]}, or None if listing
                Deployments fails (the error is printed).
            """
            try:
                deployments = self.apps_v1.list_namespaced_deployment(namespace=self.namespace)
            except ApiException as e:
                print(f"检查Deployment健康状态时发生错误:{str(e)}")
                return None

            unhealthy_deployments = []
            for deployment in deployments.items:
                # ready_replicas is None while no replica is ready.
                desired = deployment.spec.replicas or 0
                ready = deployment.status.ready_replicas or 0
                if ready < desired:
                    unhealthy_deployments.append(deployment.metadata.name)

            return {
                'unhealthy': unhealthy_deployments
            }

        def auto_repair(self) -> List[str]:
            """Repair what the health checks report and describe what was done.

            Failed Pods are deleted; unhealthy Deployments get a fresh
            kubectl.kubernetes.io/restartedAt Pod-template annotation, which
            changes the template and so triggers a new rollout.

            Returns:
                One human-readable message per attempted action.
            """
            repair_actions = []

            pod_health = self.check_pod_health()
            if pod_health is not None:
                for unhealthy_pod in pod_health['unhealthy']:
                    try:
                        self.v1.delete_namespaced_pod(
                            name=unhealthy_pod,
                            namespace=self.namespace
                        )
                        repair_actions.append(f"删除不健康的Pod: {unhealthy_pod}")
                    except ApiException as e:
                        repair_actions.append(f"修复Pod {unhealthy_pod} 失败: {str(e)}")

            deployment_health = self.check_deployment_health()
            if deployment_health is not None:
                for unhealthy_deployment in deployment_health['unhealthy']:
                    try:
                        # Restart the Deployment; a timezone-aware UTC timestamp
                        # keeps the value unambiguous across machines.
                        patch = {
                            "spec": {
                                "template": {
                                    "metadata": {
                                        "annotations": {
                                            "kubectl.kubernetes.io/restartedAt": datetime.now(timezone.utc).isoformat()
                                        }
                                    }
                                }
                            }
                        }
                        self.apps_v1.patch_namespaced_deployment(
                            name=unhealthy_deployment,
                            namespace=self.namespace,
                            body=patch
                        )
                        repair_actions.append(f"重启Deployment: {unhealthy_deployment}")
                    except ApiException as e:
                        repair_actions.append(f"修复Deployment {unhealthy_deployment} 失败: {str(e)}")

            return repair_actions

    # Usage example
    health_checker = ResourceHealthChecker("default")
    repair_results = health_checker.auto_repair()
    print("修复操作:", repair_results)
    

    示例15:自定义控制器实现

    from kubernetes import watch
    import threading
    import queue
    import time

    class CustomController:
        """Minimal event-driven controller for Pods and Deployments in one namespace.

        Two watch threads put (resource_type, event) tuples on a queue. A worker
        thread drains the queue and dispatches to the _handle_* methods. Failed
        Pods are deleted; Deployments with fewer ready replicas than desired are
        only reported.
        """

        def __init__(self, namespace="default", watch_timeout=60):
            """
            Args:
                namespace: namespace to watch.
                watch_timeout: server-side timeout in seconds for each watch
                    request. When it expires the stream ends and the watch loop
                    re-checks self.running, so stop() takes effect within about
                    this long even when no events arrive.
            """
            self.namespace = namespace
            self.watch_timeout = watch_timeout
            self.v1 = client.CoreV1Api()
            self.apps_v1 = client.AppsV1Api()
            self.event_queue = queue.Queue()
            self.running = False
            # Active watch.Watch objects, so stop() can end their streams.
            self._watches = []

        def start(self):
            """Start the event-processing thread and the two watch threads."""
            self.running = True
            for target in (self._process_events, self._watch_pods, self._watch_deployments):
                threading.Thread(target=target).start()

        def stop(self):
            """Ask all threads to exit.

            The queue worker notices within about one second, the watch threads
            within about watch_timeout seconds.
            """
            self.running = False
            for w in list(self._watches):
                w.stop()

        def _watch_pods(self):
            """Watch Pods and enqueue their events."""
            self._watch_resource('Pod', self.v1.list_namespaced_pod)

        def _watch_deployments(self):
            """Watch Deployments and enqueue their events."""
            self._watch_resource('Deployment', self.apps_v1.list_namespaced_deployment)

        def _watch_resource(self, resource_type, list_func):
            """Keep a watch on *list_func* running until stop() is called.

            Each new watch request is made without a resourceVersion, so the API
            server first replays existing objects as ADDED events; the handlers
            below only act on MODIFIED events.
            """
            w = watch.Watch()
            self._watches.append(w)
            while self.running:
                try:
                    for event in w.stream(
                        list_func,
                        namespace=self.namespace,
                        timeout_seconds=self.watch_timeout,
                    ):
                        if not self.running:
                            break
                        self.event_queue.put((resource_type, event))
                except Exception as e:
                    print(f"{resource_type}监控异常:{str(e)}")
                    if self.running:
                        time.sleep(5)  # back off before retrying

        def _process_events(self):
            """Drain the event queue until stop() is called."""
            while self.running:
                try:
                    # Timeout so the loop can notice self.running turning False.
                    resource_type, event = self.event_queue.get(timeout=1)
                    self._handle_event(resource_type, event)
                except queue.Empty:
                    continue
                except Exception as e:
                    print(f"事件处理异常:{str(e)}")

        def _handle_event(self, resource_type, event):
            """Print an event summary and dispatch it to the per-type handler."""
            event_type = event['type']
            obj = event['object']

            # ERROR events carry a status payload rather than a Pod/Deployment,
            # so there is no metadata.name to read.
            if event_type == 'ERROR':
                print(f"收到{resource_type}错误事件: {obj}")
                return

            print(f"收到{resource_type}事件:")
            print(f"  类型: {event_type}")
            print(f"  名称: {obj.metadata.name}")

            if resource_type == 'Pod':
                self._handle_pod_event(event_type, obj)
            elif resource_type == 'Deployment':
                self._handle_deployment_event(event_type, obj)

        def _handle_pod_event(self, event_type, pod):
            """Delete a Pod that has moved to phase Failed."""
            if event_type == 'MODIFIED':
                if pod.status.phase == 'Failed':
                    print(f"检测到Pod {pod.metadata.name} 失败,尝试重启")
                    try:
                        self.v1.delete_namespaced_pod(
                            name=pod.metadata.name,
                            namespace=self.namespace
                        )
                    except ApiException as e:
                        print(f"重启Pod失败:{str(e)}")

        def _handle_deployment_event(self, event_type, deployment):
            """Report a Deployment with fewer ready replicas than desired.

            Compares against spec.replicas because status.replicas also counts
            surge Pods during a rolling update.
            """
            if event_type == 'MODIFIED':
                desired = deployment.spec.replicas or 0
                ready = deployment.status.ready_replicas or 0  # None when 0 ready
                if ready < desired:
                    print(f"检测到Deployment {deployment.metadata.name} 副本不一致")
                    # Custom handling logic can be added here.

    # Usage example
    controller = CustomController("default")
    controller.start()

    # Run for a while, then stop:
    # time.sleep(3600)
    # controller.stop()
    

    示例16:资源指标监控

    from kubernetes.client import CustomObjectsApi
    import time
    
    class MetricsCollector:
        """Read CPU/memory usage from the metrics.k8s.io/v1beta1 API.

        NOTE(review): that API is normally served by metrics-server; confirm it
        is installed in the target cluster.
        """

        def __init__(self):
            self.custom_api = CustomObjectsApi()

        def get_node_metrics(self):
            """Return {node_name: {'cpu': ..., 'memory': ...}}, or None on ApiException.

            The cpu/memory values are passed through unchanged from each item's
            'usage' field.
            """
            try:
                node_list = self.custom_api.list_cluster_custom_object(
                    group="metrics.k8s.io",
                    version="v1beta1",
                    plural="nodes",
                )
                return {
                    entry['metadata']['name']: {
                        'cpu': entry['usage']['cpu'],
                        'memory': entry['usage']['memory'],
                    }
                    for entry in node_list['items']
                }
            except ApiException as err:
                print(f"获取节点指标失败:{err}")
                return None

        def get_pod_metrics(self, namespace="default"):
            """Return {pod_name: {container_name: {'cpu', 'memory'}}}, or None on ApiException."""
            try:
                pod_list = self.custom_api.list_namespaced_custom_object(
                    group="metrics.k8s.io",
                    version="v1beta1",
                    namespace=namespace,
                    plural="pods",
                )
                return {
                    entry['metadata']['name']: {
                        c['name']: {
                            'cpu': c['usage']['cpu'],
                            'memory': c['usage']['memory'],
                        }
                        for c in entry['containers']
                    }
                    for entry in pod_list['items']
                }
            except ApiException as err:
                print(f"获取Pod指标失败:{err}")
                return None

        def monitor_resources(self, interval=30):
            """Print node and Pod usage every *interval* seconds; never returns."""
            while True:
                print("\n=== 资源使用情况 ===")

                node_usage = self.get_node_metrics()
                if node_usage:
                    print("\n节点资源使用情况:")
                    for node, usage in node_usage.items():
                        print(f"\n节点: {node}")
                        print(f"CPU使用: {usage['cpu']}")
                        print(f"内存使用: {usage['memory']}")

                pod_usage = self.get_pod_metrics()
                if pod_usage:
                    print("\nPod资源使用情况:")
                    for pod, per_container in pod_usage.items():
                        print(f"\nPod: {pod}")
                        for container, usage in per_container.items():
                            print(f"容器: {container}")
                            print(f"CPU使用: {usage['cpu']}")
                            print(f"内存使用: {usage['memory']}")

                time.sleep(interval)

    # Usage example
    collector = MetricsCollector()
    # collector.monitor_resources()  # runs until interrupted
    

    最佳实践和注意事项

    1. 错误处理
      • 始终使用try-except块处理API调用
      • 实现重试机制处理临时性故障
      • 记录详细的错误信息便于调试
    2. 性能优化
      • 使用批量操作代替单个操作
      • 实现合适的缓存机制
      • 避免频繁的API调用
    3. 安全考虑
      • 使用最小权限原则
      • 保护敏感信息(如密钥和证书)
      • 实现适当的认证和授权机制
    4. 可维护性
      • 模块化代码结构
      • 完善的日志记录
      • 清晰的代码注释

    总结

    本文详细介绍了如何使用Python操作Kubernetes集群,包括:

    1. 基础环境配置
    2. 常见资源操作
    3. 高级应用场景
    4. 自动化运维实践
    5. 资源监控与自动修复

    通过这些示例和最佳实践,可以构建强大的Kubernetes自动化工具和运维系统。

    作者:IT策士

    物联沃分享整理
    物联沃-IOTWORD物联网 » Python 操作Kubernetes集群完全指南

    发表回复