Sending Azure Blob logs to Elasticsearch via Azure Event Hubs (3: the budget pure-Python version)

Previously:

Sending Azure Blob logs to Elasticsearch via Azure Event Hubs (1: the standard version) - CSDN Blog

Sending Azure Blob logs to Elasticsearch via Azure Event Hubs (2: replacing the paid Event Hubs) - CSDN Blog

The Python implementation

A capable teammate eventually implemented this functionality for free with plain Python scripts.

File structure

Below is Cursor's review of the code:

This is a project that downloads Azure Storage access logs and imports them into Elasticsearch. Let me analyze it file by file:

1. Overall architecture

The project follows a clean, modular design with the following components:

  • main.py: main entry point that coordinates the overall flow
  • azure_log_downloader.py: Azure blob storage log downloader
  • elasticsearch_sender.py: Elasticsearch data sender
  • models.py: data model definitions
  • log_config.py: logging configuration
  • .env: environment configuration file

2. File-by-file review

main.py

Pros:

  • Supports date-range processing
  • Good error handling and logging
  • Modular design with a clear separation of responsibilities

Suggested improvements:

  • Add command-line argument support so the date range is configurable (see the sketch after this list)
  • Add concurrency support to speed up processing
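
To make the command-line suggestion concrete, here is a minimal sketch; it assumes the hard-coded start_date/end_date in main() would be replaced by the returned values, and the flag names are made up:

    import argparse
    from datetime import UTC, datetime

    def parse_args():
        # Hypothetical helper: parse --start-date / --end-date into UTC datetimes
        parser = argparse.ArgumentParser(description="Import Azure Storage access logs into Elasticsearch")
        parser.add_argument("--start-date", required=True, help="first day to process, e.g. 2024-01-01")
        parser.add_argument("--end-date", required=True, help="last day to process, e.g. 2024-01-02")
        args = parser.parse_args()
        start = datetime.strptime(args.start_date, "%Y-%m-%d").replace(tzinfo=UTC)
        end = datetime.strptime(args.end_date, "%Y-%m-%d").replace(tzinfo=UTC)
        return start, end
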
azure_log_downloader.py

Pros:

  • Clean class design
  • Good error handling

Suggested improvements:

  • Add a retry mechanism (a sketch follows this list)
  • Consider supporting asynchronous operation
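
A possible retry wrapper for the download step, as a sketch only: it assumes download_and_transform were changed to re-raise failures instead of returning an empty list, and the back-off values are arbitrary:

    import time

    def download_with_retry(downloader, max_retries=3, logger=None):
        # Hypothetical wrapper around AzureLogDownloader.download_and_transform()
        for attempt in range(1, max_retries + 1):
            try:
                return downloader.download_and_transform()
            except Exception as e:
                if logger:
                    logger.warning(f"Download attempt {attempt} failed: {e}")
                if attempt == max_retries:
                    raise
                time.sleep(2 ** attempt)  # simple exponential back-off
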
elasticsearch_sender.py

Pros:

  • Implements batch processing
  • Includes a retry mechanism
  • Good connection testing

Suggested improvements:

  • Consider adding index template management (see the sketch after this list)
  • Expose more Elasticsearch configuration options
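
For index template management, a sketch along these lines could work; it assumes the 7.x-style elasticsearch-py client configured in the script below, and the mappings only cover a few of the fields that actually get indexed:

    def ensure_index_template(es, template_name="storage-logs"):
        # Sketch: register an index template so every logs-* index gets consistent mappings
        es.indices.put_index_template(
            name=template_name,
            body={
                "index_patterns": ["logs-*"],
                "template": {
                    "mappings": {
                        "properties": {
                            "@timestamp": {"type": "date"},
                            "durationMs": {"type": "long"},
                            "operationName": {"type": "keyword"},
                            "callerIpAddress": {"type": "keyword"},
                        }
                    }
                },
            },
        )
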
models.py

Pros:

  • dataclass keeps the class definition concise
  • Detailed field-parsing logic
  • Good type annotations

Suggested improvements:

  • Add field validation
  • Consider using Pydantic for data validation (a sketch follows this list)
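
If Pydantic were adopted, a partial counterpart of the dataclass could look like this (Pydantic v2 syntax, only a few fields shown; this is a sketch, not part of the project):

    from datetime import datetime
    from typing import Optional
    from pydantic import BaseModel, field_validator

    class StorageOperationLogModel(BaseModel):
        # Hypothetical Pydantic version of StorageOperationLog (subset of fields)
        time: datetime
        operationName: Optional[str] = None
        durationMs: Optional[int] = None
        objectKey: Optional[str] = None

        @field_validator("durationMs")
        @classmethod
        def duration_non_negative(cls, value):
            if value is not None and value < 0:
                raise ValueError("durationMs must be non-negative")
            return value
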
log_config.py

Pros:

  • Thorough logging configuration
  • Supports both file and console output

Suggested improvements:

  • Consider adding log rotation (see the sketch after this list)
  • Add a configurable log level
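
Both rotation and a configurable level are covered by the standard library; a sketch, where the LOG_LEVEL environment variable is an assumed addition rather than something the scripts below read:

    import logging
    import os
    from logging.handlers import TimedRotatingFileHandler

    def add_rotating_file_handler(logger, log_file):
        # Sketch: rotate the log file at midnight, keep 7 days of backups,
        # and take the level from an (assumed) LOG_LEVEL environment variable
        level = getattr(logging, os.getenv("LOG_LEVEL", "INFO").upper(), logging.INFO)
        handler = TimedRotatingFileHandler(log_file, when="midnight", backupCount=7, encoding="utf-8")
        handler.setLevel(level)
        handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
        logger.addHandler(handler)
        return handler
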
3. Security suggestions

  • Store sensitive information such as API keys in a more secure way (see the sketch after this list)
  • Enable SSL certificate verification
  • Add request rate limiting
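
One option for the sensitive-information point is Azure Key Vault. A sketch only: the vault URL and secret name are placeholders, and azure-identity / azure-keyvault-secrets would be additional dependencies:

    from azure.identity import DefaultAzureCredential
    from azure.keyvault.secrets import SecretClient

    def load_elasticsearch_api_key(vault_url):
        # Sketch: read the Elasticsearch API key from Key Vault instead of .env
        client = SecretClient(vault_url=vault_url, credential=DefaultAzureCredential())
        return client.get_secret("elasticsearch-api-key").value
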
4. Performance suggestions

  • Consider adding concurrent processing (a sketch follows this list)
  • Implement an incremental update mechanism
  • Consider adding a caching layer
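
As one way to add concurrency, the 24 hourly blobs of a single day could be downloaded in parallel threads. A sketch using the downloader class from the scripts below:

    from concurrent.futures import ThreadPoolExecutor

    from azure_log_downloader import AzureLogDownloader

    def download_day_concurrently(connection_string, container, blob_names, logger, max_workers=8):
        # Sketch: run one download per hourly blob in a small thread pool
        def _download(blob_name):
            downloader = AzureLogDownloader(connection_string, container, blob_name, logger)
            return downloader.download_and_transform()

        with ThreadPoolExecutor(max_workers=max_workers) as pool:
            results = pool.map(_download, blob_names)
        # Flatten the per-blob lists into a single list of log entries
        return [entry for entries in results for entry in entries]
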
5. Maintainability suggestions

  • Add unit tests (a sketch follows this list)
  • Add more detailed documentation
  • Consider adding monitoring metrics
  • Add a CI/CD configuration
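
A first unit test could target the pure parsing logic in models.py, which needs no Azure or Elasticsearch access. A pytest sketch; the sample object key is made up, and the expected values follow the parsing rules shown below:

    from models import StorageOperationLog

    def test_parse_object_key():
        # parse_object_key should return the path segment after the container
        # plus the lower-cased file extension
        function_name, extension = StorageOperationLog.parse_object_key(
            "container-123/my-function/2024/report.PDF"
        )
        assert function_name == "my-function"
        assert extension == "pdf"
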
Overall, this is a well-designed project with good code quality, solid error handling and thorough logging. The main room for further improvement lies in performance, security and maintainability.

    The scripts

    1. azure_log_downloader.py:

    from azure.storage.blob import BlobServiceClient
    import json
    from models import StorageOperationLog
    
    class AzureLogDownloader:
        def __init__(self, connection_string, container_name, blob_name, logger=None):
            self.logger = logger
            self.connection_string = connection_string
            self.container_name = container_name
            self.blob_name = blob_name
            
        def _get_blob_client(self):
            blob_service_client = BlobServiceClient.from_connection_string(self.connection_string)
            blob_client = blob_service_client.get_blob_client(
                container=self.container_name, 
                blob=self.blob_name
            )
            return blob_client
    
        def download_and_transform(self):
            """Download and transform log data from Azure storage"""
            try:
                blob_client = self._get_blob_client()
    
                if not blob_client.exists():
                    self.logger.info(f"Blob does not exist, skipping: {self.blob_name}")
                    return []
                
                blob_data = blob_client.download_blob().readall().decode('utf-8')
                
                transformed_entries = []
                for line in blob_data.splitlines():
                    if line.strip():
                        try:
                            log_entry = json.loads(line)
                            log_obj = StorageOperationLog.from_log_entry(log_entry, self.logger)
                            if log_obj:
                                transformed_entries.append(log_obj)
                        except json.JSONDecodeError as e:
                            self.logger.error(f"Error parsing line: {str(e)}")
                            continue
                
                self.logger.info(f"Downloaded and transformed {len(transformed_entries)} logs")
                
                return transformed_entries
            except Exception as e:
                self.logger.error(f"Error downloading blob: {str(e)}")
                self.logger.error(f"Blob: {self.blob_name}, Container: {self.container_name}")
                self.logger.error(f"Error type: {type(e).__name__}")
                return []

    2. elasticsearch_sender.py:

    from elasticsearch import Elasticsearch, helpers
    import time
    import uuid
    
    class ElasticsearchSender:
        def __init__(self, host, api_key=None, index_name="logs", logger=None):
            self.logger = logger
    
            self.config = {
                'hosts': host,
                'timeout': 30,
                'retry_on_timeout': True,
                'max_retries': 3,
                'verify_certs': False,
                'ssl_show_warn': False,
                'use_ssl': True
            }
            if api_key:
                self.config['api_key'] = api_key
                
            self.index_name = index_name
            self.es = Elasticsearch(**self.config)
    
        def test_connection(self):
            """Test Elasticsearch connection"""
            try:
                info = self.es.info()
                self.logger.info("\nElasticsearch Server Info:")
                self.logger.info(f"Version: {info['version']['number']}")
                self.logger.info(f"Cluster Name: {info['cluster_name']}")
                return True
            except Exception as e:
                self.logger.error(f"\nElasticsearch connection failed: {str(e)}")
                return False
    
        def send_logs(self, log_entries, batch_size=500, max_retries=3):
            """Send logs to Elasticsearch"""
            def generate_actions():
                for entry in log_entries:
                    doc_data = entry.__dict__.copy()
                    if 'time' in doc_data:
                        doc_data['@timestamp'] = doc_data.pop('time')
    
                    action = {
                        '_index': self.index_name,
                        '_id': str(uuid.uuid4()),
                        '_source': doc_data
                    }
                    yield action
    
            success_count = 0
            failure_count = 0
            retry_count = 0
    
            while retry_count < max_retries:
                try:
                    success, failed = helpers.bulk(
                        self.es,
                        generate_actions(),
                        chunk_size=batch_size,
                        raise_on_error=False,
                        raise_on_exception=False
                    )
                    
                    success_count += success
                    failure_count += len(failed) if failed else 0
                    
                    self.logger.info(f"\nBatch processing results:")
                    self.logger.info(f"- Successfully indexed: {success_count} documents")
                    self.logger.info(f"- Failed: {failure_count} documents")
                    
                    if not failed:
                        break
                        
                    retry_count += 1
                    if retry_count < max_retries:
                        self.logger.info(f"Retrying... (Attempt {retry_count + 1}/{max_retries})")
                        time.sleep(2 ** retry_count)
                    
                except Exception as e:
                    self.logger.error(f"\nBulk indexing error: {str(e)}")
                    retry_count += 1
                    if retry_count < max_retries:
                        self.logger.info(f"Retrying... (Attempt {retry_count + 1}/{max_retries})")
                        time.sleep(2 ** retry_count)
                    else:
                        self.logger.info("Maximum retry attempts reached")
                        break
    
            return success_count, failure_count

    3. log_config.py:

    import logging
    import os
    from datetime import UTC, datetime
    
    def setup_logger(target_date: datetime = None, log_prefix: str = "app"):
        base_dir = os.path.dirname(os.path.abspath(__file__))
        log_dir = os.path.join(base_dir, 'logs')
    
        if not os.path.exists(log_dir):
            os.makedirs(log_dir)
        
        current_time = datetime.now(UTC).strftime("%Y%m%d_%H%M%S")
        target_date_str = target_date.strftime("%Y%m%d") if target_date else "None"
        log_file = os.path.join(log_dir, f'{log_prefix}_target_date_{target_date_str}_export_at_{current_time}.log')
        
        logger = logging.getLogger('AccessLog')
        logger.setLevel(logging.INFO)
        
        file_handler = logging.FileHandler(log_file, encoding='utf-8')
        file_handler.setLevel(logging.INFO)
        
        console_handler = logging.StreamHandler()
        console_handler.setLevel(logging.INFO)
        
        formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        file_handler.setFormatter(formatter)
        console_handler.setFormatter(formatter)
        
        logger.addHandler(file_handler)
        logger.addHandler(console_handler)
        
        return logger

    4. models.py:

    from dataclasses import dataclass
    from datetime import datetime
    import re
    from typing import Any, Optional
    
    @dataclass
    class StorageOperationLog:
        time: datetime
        category: Optional[str]
        operationName: Optional[str]
        callerIpAddress: Optional[str]
        location: Optional[str]
        uri: Optional[str]
        durationMs: Optional[int]
        referrerHeader: Optional[str]
        userAgentHeader: Optional[str]
        requestBodySize: Optional[int]
        responseBodySize: Optional[int]
        serverLatencyMs: Optional[int]
        objectKey: Optional[str]
        functionName: Optional[str]
        file_extension: Optional[str]
    
        @staticmethod
        def parse_object_key(object_key: str, logger=None) -> tuple[Optional[str], Optional[str]]:
            """Parse objectKey to get institution_id and functionName"""
            try:
                container_match = re.search(r'container-(\d+)', object_key)
    
                parts = object_key.split('/')
                function_name = None
                if container_match:
                    container_index = next((i for i, part in enumerate(parts) 
                                        if 'container-' in part), None)
                    if container_index is not None and container_index + 1 < len(parts):
                        function_name = parts[container_index + 1]
    
                file_extension = None
                if parts and '.' in parts[-1]:
                    file_extension = parts[-1].split('.')[-1].lower()
    
                return function_name, file_extension
            except Exception as e:
                if logger:
                    logger.error(f"Error parsing object_key {object_key}: {str(e)}")
                return None, None
    
        @classmethod
        def from_log_entry(cls, entry: dict[str, Any], logger=None) -> Optional['StorageOperationLog']:
            """Create StorageOperationLog instance from raw log entry"""
            try:
                properties = entry.get('properties', {})
                object_key = properties.get('objectKey', '')
                function_name, file_extension = cls.parse_object_key(object_key, logger)
                
                return cls(
                    time=entry.get('time'),
                    category=entry.get('category'),
                    operationName=entry.get('operationName'),
                    callerIpAddress=entry.get('callerIpAddress'),
                    location=entry.get('location'),
                    uri=entry.get('uri'),
                    durationMs=int(entry.get('durationMs')) if entry.get('durationMs') is not None else None,
                    referrerHeader=properties.get('referrerHeader'),
                    userAgentHeader=properties.get('userAgentHeader'),
                    requestBodySize=int(properties.get('requestBodySize')) if properties.get('requestBodySize') is not None else None,
                    responseBodySize=int(properties.get('responseBodySize')) if properties.get('responseBodySize') is not None else None,
                    serverLatencyMs=int(properties.get('serverLatencyMs')) if properties.get('serverLatencyMs') is not None else None,
                    objectKey=object_key,
                    functionName=function_name,
                    file_extension=file_extension
                )
            except Exception as e:
                if logger:
                    logger.error(f"Error creating StorageOperationLog: {str(e)}")
                return None
    
        def __post_init__(self):
            if isinstance(self.time, str):
                if 'Z' in self.time:
                    time_parts = self.time.split('.')
                    if len(time_parts) > 1:
                        microseconds = time_parts[1].replace('Z', '')[:6]
                        time_str = f"{time_parts[0]}.{microseconds}Z"
                        self.time = datetime.strptime(time_str, "%Y-%m-%dT%H:%M:%S.%fZ")
                    else:
                        self.time = datetime.strptime(self.time, "%Y-%m-%dT%H:%M:%SZ")
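
    For reference, a minimal raw entry that from_log_entry can consume looks roughly like this; the field names follow the parsing code above, while the values themselves are invented:

    sample_entry = {
        "time": "2024-01-01T08:15:30.1234567Z",
        "category": "StorageRead",
        "operationName": "GetBlob",
        "callerIpAddress": "203.0.113.10",
        "location": "eastasia",
        "uri": "https://example.blob.core.windows.net/container-123/my-function/report.pdf",
        "durationMs": 42,
        "properties": {
            "objectKey": "/example/container-123/my-function/report.pdf",
            "userAgentHeader": "python-requests/2.31.0",
            "requestBodySize": 0,
            "responseBodySize": 1024,
            "serverLatencyMs": 40,
        },
    }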

    5. main.py:

    from log_config import setup_logger
    from azure_log_downloader import AzureLogDownloader
    from elasticsearch_sender import ElasticsearchSender
    from datetime import datetime, timedelta, UTC
    from dotenv import load_dotenv
    import os
    
    load_dotenv()
    
    def _get_index_name(target_date: datetime):
        """Get full index name for the specified date"""
        return os.getenv('ELASTICSEARCH_INDEX_TEMPLATE', 'logs-{year}-{month}').format(
            year=target_date.year,
            month=target_date.month
        )
    
    def _get_blob_name_list(target_date: datetime):
        """Get blob paths for all hours of the specified date"""
        blobs = []
        for hour in range(24):
            blob_time = target_date.replace(hour=hour, minute=0, second=0, microsecond=0)
            blob_name = os.getenv('AZURE_STORAGE_BLOB_TEMPLATE', 'logs/y={year}/m={month}/d={day}/h={hour}').format(
                year=blob_time.year,
                month=blob_time.month,
                day=blob_time.day,
                hour=blob_time.hour
            )
            blobs.append(blob_name)
        return blobs
    
    def main():
        start_date = datetime(2024, 1, 1, tzinfo=UTC)
        end_date = datetime(2024, 1, 2, tzinfo=UTC)
    
        current_date = start_date
        while current_date <= end_date:
            target_date = current_date
            logger = setup_logger(target_date, os.getenv('LOG_PREFIX', 'app'))
    
            try:
                logger.info(f"\nProcessing data for {current_date.date()}")
                elasticsearch_index = _get_index_name(target_date)
    
                sender = ElasticsearchSender(
                    os.getenv('ELASTICSEARCH_HOST', 'http://localhost:9200'),
                    os.getenv('ELASTICSEARCH_API_KEY'),
                    elasticsearch_index,
                    logger
                )
    
                if not sender.test_connection():
                    logger.error("Elasticsearch connection failed")
                    current_date += timedelta(days=1)
                    continue
    
                total_logs = total_success = total_failed = 0
                blobs = _get_blob_name_list(target_date)
    
                for container in os.getenv('AZURE_STORAGE_CONTAINERS', 'logs').split(','):
                    logger.info(f"\nProcessing container: {container}")
                    
                    for blob_name in blobs:
                        logger.info(f"\nProcessing blob: {blob_name}")
                        
                        downloader = AzureLogDownloader(
                            os.getenv('AZURE_STORAGE_URI'),
                            container,
                            blob_name,
                            logger
                        )
                        
                        try:
                            log_entries = downloader.download_and_transform()
                            success, failed = sender.send_logs(log_entries)
                            
                            total_logs += len(log_entries)
                            total_success += success
                            total_failed += failed
                            
                        except Exception as e:
                            logger.error(f"Error processing {blob_name}: {str(e)}")
                            continue
    
                logger.info(f"\n{current_date.date()} Processing completed:")
                logger.info(f"Total documents processed: {total_logs}")
                logger.info(f"Successfully indexed: {total_success}")
                logger.info(f"Failed: {total_failed}")
    
            finally:
                for handler in logger.handlers[:]:
                    handler.close()
                    logger.removeHandler(handler)
    
            current_date += timedelta(days=1)
    
    if __name__ == "__main__":
        main()

    6. .env :

    ELASTICSEARCH_HOST=http://localhost:9200
    ELASTICSEARCH_API_KEY=your_api_key
    ELASTICSEARCH_INDEX_TEMPLATE=logs-{year}-{month}
    AZURE_STORAGE_URI=your_storage_connection_string
    AZURE_STORAGE_CONTAINERS=logs
    AZURE_STORAGE_BLOB_TEMPLATE=logs/y={year}/m={month}/d={day}/h={hour}
    LOG_PREFIX=app
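
    The imports in the scripts above pull in three third-party packages: azure-storage-blob, elasticsearch and python-dotenv. With those installed and the .env values filled in, the whole import runs with python main.py.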


    Related posts in this series:

    Sending Azure Blob logs to Elasticsearch via Azure Event Hubs (1: the standard version) - CSDN Blog

    Sending Azure Blob logs to Elasticsearch via Azure Event Hubs (2: replacing the paid Event Hubs) - CSDN Blog

    Sending Azure Blob logs to Elasticsearch via Azure Event Hubs (3: the budget pure-Python version) - CSDN Blog




    Author: petunsecn
