python常用语法笔记(持续更新)

文章目录

  • 一、基础语法
  • 1、sleep休眠
  • 2、os系统操作
  • (1)获取环境变量
  • (2)os.path操作
  • (3)获取命令行参数
  • 3、文件操作
  • (1)文件读取模式详解
  • (2)逐行读取文件
  • (3)逐行写入文件
  • 4、字符串操作
  • (1)字符串转义:r
  • (2)格式化字符串:f
  • 5、时间操作
  • (1)获取当前时间
  • (2)时间格式化
  • 6、多线程与多进程
  • (1)多进程:multiprocessing
  • (2)多进程:Pool
  • (3)多线程:threading
  • 二、高级用法
  • 1、sqlite数据库封装
  • 一、基础语法

    1、sleep休眠

    import time
    
    # 休眠5秒钟,可以写小数
    time.sleep(5)
    

    2、os系统操作

    (1)获取环境变量

    import os
    # 第一个参数为key,第二个参数为如果没有该key的默认值
    name = os.getenv("MY_NAME", "World")
    print(f"Hello {name} from Python")
    

    (2)os.path操作

    import os
    
    path_temp = 'E:\test.txt'
    # 判断是否是目录
    print(os.path.isdir(path_temp))
    # 判断是否是文件
    print(os.path.isfile(path_temp))
    # 判断文件是否存在,包括文件和目录
    print(os.path.exists(path_temp))
    
    # 获取当前脚本的绝对路径
    script_path = os.path.abspath(__file__)
    # 获取文件所在的目录
    project_path = os.path.dirname(script_path)
    
    
    

    (3)获取命令行参数

    import sys
    # 获取参数列表
    arg = sys.argv
    print(arg)
    # 参数长度
    print(len(arg))
    # 获取脚本名称
    arg1 = arg[0]
    # 获取第一个参数
    arg2 = arg[1]
    print(arg2)
    

    执行:
    python test.py 1

    3、文件操作

    (1)文件读取模式详解

    (2)逐行读取文件

    file_path = 'E:\phone.txt'
    
    file_content= []
    # 逐行读取文件
    with open(file_path, 'r') as file:
        for line in file:
            # 处理每一行数据,去除换行符
            file_content.append(line.strip())
    
    print(file_content)
    
    

    (3)逐行写入文件

    # 假设我们有一个大文件file.txt,我们要向其中追加文本
    filename = "file.txt"
     
    # 要追加的文本行
    lines_to_append = ["这是第一行\n", "这是第二行\n", "这是第三行\n"]
     
    # 使用with语句确保文件正确打开和关闭
    with open(filename, "w") as file:
        for line in lines_to_append:
            file.write(line)
    

    4、字符串操作

    (1)字符串转义:r

    # 如果希望反斜杠被解释为普通字符,你可以在字符串前面添加一个 r,表示这是一个原始字符串。例如:r"E:\photo/naked_wife.jpg" 
    # 或者使用 R
    image = cv.imread(r"E:\photo/naked_wife.jpg")
    

    (2)格式化字符串:f

    # 格式化字符串允许在字符串中插入变量或表达式的值,使用大括号 {} 包裹变量或表达式。
    name = "Alice"
    age = 30
    formatted_string = f"My name is {name} and I am {age} years old."
    print(formatted_string)
    

    5、时间操作

    (1)获取当前时间

    import datetime
    # 获取当前时间
    current_time = datetime.datetime.now()
    # 输出当前时间 2024-11-21 06:23:54.802026
    print(current_time)
    # 分别获取当前年、月、日、时、分、秒
    current_year = current_datetime.year
    current_month = current_datetime.month
    current_day = current_datetime.day
    current_hour = current_datetime.hour
    current_minute = current_datetime.minute
    current_second = current_datetime.second
    
    import time
    # 获取当前时间戳
    timestamp = time.time()
    # 转换为可读性更好的格式 Thu Nov 21 06:24:23 2024
    current_time = time.ctime(timestamp)
    
    import time
    
    # 获取当前时间的结构化元组
    current_time_struct = time.localtime()
    
    # 分别获取当前年、月、日、时、分、秒
    current_year = current_time_struct.tm_year
    current_month = current_time_struct.tm_mon
    current_day = current_time_struct.tm_mday
    current_hour = current_time_struct.tm_hour
    current_minute = current_time_struct.tm_min
    current_second = current_time_struct.tm_sec
    
    # 输出当前时间 time.struct_time(tm_year=2024, tm_mon=11, tm_mday=21, tm_hour=9, tm_min=25, tm_sec=44, tm_wday=3, tm_yday=326, tm_isdst=0)
    print(current_time_struct)
    

    (2)时间格式化

    import datetime
    # 获取当前时间
    now = datetime.datetime.now()
    # 格式化当前时间
    now_str = now.strftime("%Y-%m-%d %H:%M:%S")
    print(now_str)
    
    import time
    # 获取当前时间戳
    timestamp = time.time()
    # 格式化时间戳为字符串
    formatted_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(timestamp))
    print(formatted_time)
    

    6、多线程与多进程

    (1)多进程:multiprocessing

    '''
    Process类的参数说明如下:
    group: 表示进程组,目前只能设置为None,一般不需要进行设置。
    target:表示当前进程启动时执行的可调用对象。即进程执行的目标任务,一般是函数名或者是方法名
    name : 为当前进程实例的别名。如果不设置,默认为Process-1,Process-2,...Process-N
    args: 表示传递给target函数的参数元组。
    kwargs: 表示传递给target函数的参数字典。
    
    Process的实例常用的方法除start()外,还有如下常用方法:
    is_ alive0): 判断进程实例是否还在执行。
    join(timecout]):是否等待进程实例执行结束,或等待多少秒。
    start(): 启动进程实例(创建子进程)。
    run(): 如果没有给定target参数,对这个对象调用start()方法时,就将执行对象中的run()方法。
    terminate(): 不管任务是否完成,立即终止。
    
    Process类还有如下常用属性:
    name:当前进程实例别名,默认为Process一N, N为从1开始递增的整数。
    pid:当前进程实例的PID值。
    
    '''
    
    # 1.导入进程包
    import multiprocessing
    import random
    import time
    # 任务
    def task():
        print(f' {time.strftime("%H:%M:%S")} 执行任务...')
        time.sleep(2)
    # 任务
    def task2(sec):
        print(f' {time.strftime("%H:%M:%S")} 执行任务 {sec}...')
        time.sleep(sec)
    # 2.创建子进程
    # 自己手动创建的进程称为子进程
    if __name__ == "__main__":
        for i in range(8):
            task_process = multiprocessing.Process(target=task2, args=(random.randint(1, 3),)) # 注意元组只有一个元素的时候需要加上,
            # 3.启动进程执行对应的任务
            task_process.start()
    

    (2)多进程:Pool

    '''
    Pool类的常用方法。常用方法及说明如下:
    apply_ async(func[, args[, kwds]):
        使用非阻塞方式调用func()函数( 并行执行,堵塞方式必须等待上一个进程退出才能执行下一个进程),
        args为传递给func()函数的参数列表,
        kwds为传递给func()函数的关键字参数列表。
    apply(func[, args[, kwdsI): 使用阻塞方式调用func() 函数。
    close(): 关闭Pool, 使其不再接受新的任务。
    terminate():不管任务是否完成,立即终止。
    join(): 主进程阻塞,等待子进程的退出,必须在close或terminate之后使用.
    '''
    
    from multiprocessing import Pool # 导入
    import os
    import time
    def task(name):
        print(f"子进程 {os.getpid()} 执行 task{name} ...")
        time.sleep(2)
    
    if __name__ == "__main__":
        # 1.创建进程池 最大进程数为3
        pool = Pool(3)
        # 2.从0开始执行10次
        for i in range(16):
            # 使用非阻塞方式调用task()函数
            pool.apply_async(task, args=(i,))
        print("等待所有子进程结束...")
        pool.close()  # 关闭进程池,关闭后pool不再接受新的任务请求
        pool.join()  # 等待子进程结束, 没有这句代码,程序会直接结束
        print("所有子进程结束.")
    

    (3)多线程:threading

    '''
    Thread类的参数说明如下:
    group: 值为None,为以后版本而保留。
    target: 表示一个可调用对象,线程启动时,run()方法将调用此对象,默认值为None, 表示不调用任何内容。
    name : 表示当前线程名称,默认创建一个"Thread一N"格式的唯一名称。
    args : 表示传递给target()函数的参数元组。
    kwargs:表示传递给target()函数的参数字典。
    '''
    
    import threading # 1.导入线程模块
    import time
    
    # 任务
    def task():
        print(f' {time.strftime("%H:%M:%S")} 执行任务...')
        time.sleep(2)
    
    if __name__ == '__main__':
        # 获取当前线程
        print("main_thread:", threading.current_thread())
        for i in range(16):
            # 1.创建子线程
            task_thread = threading.Thread(target=task, name="task_thread")
            # 2.启动子线程执行对应的任务
            task_thread.start()
    

    二、高级用法

    1、sqlite数据库封装

    import sqlite3
    from collections import namedtuple
    from math import ceil
    
    class SQLiteDB:
        def __init__(self, db_file):
            """
            初始化SQLite数据库连接和游标
            :param db_file: 数据库文件路径
            """
            self.conn = sqlite3.connect(db_file)
            self.cursor = self.conn.cursor()
    
        def create_table(self, table_name, columns):
            """
            创建表格
            :param table_name: 表格名称
            :param columns: 列定义,格式为 "列名1 数据类型1, 列名2 数据类型2, ..."
            """
            self.cursor.execute(f'''
                CREATE TABLE IF NOT EXISTS {table_name} (
                    {columns}
                )
            ''')
            self.conn.commit()
    
        def insert(self, table_name, data):
            """
            插入数据
            :param table_name: 表格名称
            :param data: 要插入的数据,格式为 {"列名1": 值1, "列名2": 值2, ...}
            :return: 插入的数据的所有行
            """
            columns = ', '.join(data.keys())
            placeholders = ', '.join(['?'] * len(data))
            values = tuple(data.values())
    
            self.cursor.execute(f'''
                INSERT INTO {table_name} ({columns}) VALUES ({placeholders})
            ''', values)
            self.conn.commit()
            return self.cursor.fetchall()
    
        def update(self, table_name, data, condition):
            """
            更新数据
            :param table_name: 表格名称
            :param data: 要更新的数据,格式为 {"列名1": 值1, "列名2": 值2, ...}
            :param condition: 更新条件
            """
            columns = ', '.join([f'{column}=?' for column in data.keys()])
            values = tuple(data.values())
    
            self.cursor.execute(f'''
                UPDATE {table_name} SET {columns} WHERE {condition}
            ''', values)
            self.conn.commit()
    
        def delete(self, table_name, condition):
            """
            删除数据
            :param table_name: 表格名称
            :param condition: 删除条件
            """
            self.cursor.execute(f'''
                DELETE FROM {table_name} WHERE {condition}
            ''')
            self.conn.commit()
    
        def query_all(self, table_name):
            """
            查询表格中的所有数据
            :param table_name: 表格名称
            :return: 所有数据的所有行
            """
            self.cursor.execute(f'''
                SELECT * FROM {table_name}
            ''')
            return self.cursor.fetchall()
    
        def query_page(self, table_name, page_size, page_number, condition=None):
            """
            分页查询表格中的数据
            :param table_name: 表格名称
            :param page_size: 每页数据数量
            :param page_number: 页码
            :param condition: 查询条件 ”age > 10“
            :return: 分页数据的所有行、总数和总页数
            """
            if condition:
                query = f'''
                    SELECT * FROM {table_name}
                    WHERE {condition}
                    LIMIT {page_size} OFFSET {page_size * (page_number - 1)}
                '''
            else:
                query = f'''
                    SELECT * FROM {table_name}
                    LIMIT {page_size} OFFSET {page_size * (page_number - 1)}
                '''
    
            self.cursor.execute(query)
            results = self.cursor.fetchall()
            column_names = [desc[0] for desc in self.cursor.description]
    
            Row = namedtuple('Row', column_names)
            rows = [Row(*row) for row in results]
    
            # 查询总数
            count_query = f'SELECT COUNT(*) FROM {table_name}'
            if condition:
                count_query += f' WHERE {condition}'
            self.cursor.execute(count_query)
            total_count = self.cursor.fetchone()[0]
    
            # 计算总页数
            total_pages = ceil(total_count / page_size)
    
            return rows, total_count, total_pages
    
    
        def execute_sql(self, sql, params=None):
            """
            执行自定义的SQL语句
            :param sql: SQL语句
            :param params: SQL语句中的参数,可选
            :return: 执行结果的所有行
            """
            if params is None:
                self.cursor.execute(sql)
            else:
                self.cursor.execute(sql, params)
            # self.conn.commit()
            return self.cursor.fetchall()
    
        def query_join(self, table1_name, table2_name, on_condition):
            """
            执行表格的连接查询
            :param table1_name: 第一个表格名称
            :param table2_name: 第二个表格名称
            :param on_condition: 连接条件
            :return: 连接查询结果的所有行
            """
            self.cursor.execute(f'''
                   SELECT * FROM {table1_name} 
                   INNER JOIN {table2_name} ON {on_condition}
               ''')
            return self.cursor.fetchall()
    
        def close(self):
            """
            关闭数据库连接
            """
            self.conn.close()
    
    
    # 使用示例:
    if __name__ == '__main__':
        # 创建数据库
        db = SQLiteDB(r'E:\example.db')
    
        # 创建表格
        db.create_table('students', 'id INTEGER PRIMARY KEY, name TEXT NOT NULL, age INTEGER, gender TEXT')
        db.create_table('courses', 'id INTEGER PRIMARY KEY, name TEXT NOT NULL, teacher TEXT')
    
        # 插入数据
        db.insert('students', {'name': 'Alice', 'age': 20, 'gender': 'Female'})
        db.insert('students', {'name': 'Bob', 'age': 22, 'gender': 'Male'})
        db.insert('courses', {'name': 'Math', 'teacher': 'Mr. Smith'})
        db.insert('courses', {'name': 'English', 'teacher': 'Ms. Johnson'})
    
        # 查询所有数据
        print('All Students:')
        print(db.query_all('students'))
    
        # 两表联查
        print('Join Students and Courses:')
        print(db.query_join('students', 'courses', 'students.id = courses.id'))
    
        # 分页查询
        rows, total_count, total_pages = db.query_page('students', 10, 1)
        for row in rows:
            print(row.name, row.age, row.gender)
        print(total_count)
        print(total_pages)
    
        # 修改数据
        db.update('students', {'age': 21}, 'name = "Alice"')
    
        # 新增数据
        db.insert('students', {'name': 'Charlie', 'age': 19, 'gender': 'Male'})
    
        # 删除数据
        db.delete('students', 'name = "Bob"')
    
        # 自定义SQL执行
        db.execute_sql('DELETE FROM students WHERE name = "Charlie"')
    
        db.close()
    
    

    作者:秃了也弱了。

    物联沃分享整理
    物联沃-IOTWORD物联网 » python常用语法笔记(持续更新)

    发表回复