python常用语法笔记(持续更新)
文章目录
一、基础语法
1、sleep休眠
import time
# 休眠5秒钟,可以写小数
time.sleep(5)
2、os系统操作
(1)获取环境变量
import os
# 第一个参数为key,第二个参数为如果没有该key的默认值
name = os.getenv("MY_NAME", "World")
print(f"Hello {name} from Python")
(2)os.path操作
import os
path_temp = 'E:\test.txt'
# 判断是否是目录
print(os.path.isdir(path_temp))
# 判断是否是文件
print(os.path.isfile(path_temp))
# 判断文件是否存在,包括文件和目录
print(os.path.exists(path_temp))
# 获取当前脚本的绝对路径
script_path = os.path.abspath(__file__)
# 获取文件所在的目录
project_path = os.path.dirname(script_path)
(3)获取命令行参数
import sys
# 获取参数列表
arg = sys.argv
print(arg)
# 参数长度
print(len(arg))
# 获取脚本名称
arg1 = arg[0]
# 获取第一个参数
arg2 = arg[1]
print(arg2)
执行:
python test.py 1
3、文件操作
(1)文件读取模式详解
(2)逐行读取文件
file_path = 'E:\phone.txt'
file_content= []
# 逐行读取文件
with open(file_path, 'r') as file:
for line in file:
# 处理每一行数据,去除换行符
file_content.append(line.strip())
print(file_content)
(3)逐行写入文件
# 假设我们有一个大文件file.txt,我们要向其中追加文本
filename = "file.txt"
# 要追加的文本行
lines_to_append = ["这是第一行\n", "这是第二行\n", "这是第三行\n"]
# 使用with语句确保文件正确打开和关闭
with open(filename, "w") as file:
for line in lines_to_append:
file.write(line)
4、字符串操作
(1)字符串转义:r
# 如果希望反斜杠被解释为普通字符,你可以在字符串前面添加一个 r,表示这是一个原始字符串。例如:r"E:\photo/naked_wife.jpg"
# 或者使用 R
image = cv.imread(r"E:\photo/naked_wife.jpg")
(2)格式化字符串:f
# 格式化字符串允许在字符串中插入变量或表达式的值,使用大括号 {} 包裹变量或表达式。
name = "Alice"
age = 30
formatted_string = f"My name is {name} and I am {age} years old."
print(formatted_string)
5、时间操作
(1)获取当前时间
import datetime
# 获取当前时间
current_time = datetime.datetime.now()
# 输出当前时间 2024-11-21 06:23:54.802026
print(current_time)
# 分别获取当前年、月、日、时、分、秒
current_year = current_datetime.year
current_month = current_datetime.month
current_day = current_datetime.day
current_hour = current_datetime.hour
current_minute = current_datetime.minute
current_second = current_datetime.second
import time
# 获取当前时间戳
timestamp = time.time()
# 转换为可读性更好的格式 Thu Nov 21 06:24:23 2024
current_time = time.ctime(timestamp)
import time
# 获取当前时间的结构化元组
current_time_struct = time.localtime()
# 分别获取当前年、月、日、时、分、秒
current_year = current_time_struct.tm_year
current_month = current_time_struct.tm_mon
current_day = current_time_struct.tm_mday
current_hour = current_time_struct.tm_hour
current_minute = current_time_struct.tm_min
current_second = current_time_struct.tm_sec
# 输出当前时间 time.struct_time(tm_year=2024, tm_mon=11, tm_mday=21, tm_hour=9, tm_min=25, tm_sec=44, tm_wday=3, tm_yday=326, tm_isdst=0)
print(current_time_struct)
(2)时间格式化
import datetime
# 获取当前时间
now = datetime.datetime.now()
# 格式化当前时间
now_str = now.strftime("%Y-%m-%d %H:%M:%S")
print(now_str)
import time
# 获取当前时间戳
timestamp = time.time()
# 格式化时间戳为字符串
formatted_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(timestamp))
print(formatted_time)
6、多线程与多进程
(1)多进程:multiprocessing
'''
Process类的参数说明如下:
group: 表示进程组,目前只能设置为None,一般不需要进行设置。
target:表示当前进程启动时执行的可调用对象。即进程执行的目标任务,一般是函数名或者是方法名
name : 为当前进程实例的别名。如果不设置,默认为Process-1,Process-2,...Process-N
args: 表示传递给target函数的参数元组。
kwargs: 表示传递给target函数的参数字典。
Process的实例常用的方法除start()外,还有如下常用方法:
is_ alive0): 判断进程实例是否还在执行。
join(timecout]):是否等待进程实例执行结束,或等待多少秒。
start(): 启动进程实例(创建子进程)。
run(): 如果没有给定target参数,对这个对象调用start()方法时,就将执行对象中的run()方法。
terminate(): 不管任务是否完成,立即终止。
Process类还有如下常用属性:
name:当前进程实例别名,默认为Process一N, N为从1开始递增的整数。
pid:当前进程实例的PID值。
'''
# 1.导入进程包
import multiprocessing
import random
import time
# 任务
def task():
print(f' {time.strftime("%H:%M:%S")} 执行任务...')
time.sleep(2)
# 任务
def task2(sec):
print(f' {time.strftime("%H:%M:%S")} 执行任务 {sec}...')
time.sleep(sec)
# 2.创建子进程
# 自己手动创建的进程称为子进程
if __name__ == "__main__":
for i in range(8):
task_process = multiprocessing.Process(target=task2, args=(random.randint(1, 3),)) # 注意元组只有一个元素的时候需要加上,
# 3.启动进程执行对应的任务
task_process.start()
(2)多进程:Pool
'''
Pool类的常用方法。常用方法及说明如下:
apply_ async(func[, args[, kwds]):
使用非阻塞方式调用func()函数( 并行执行,堵塞方式必须等待上一个进程退出才能执行下一个进程),
args为传递给func()函数的参数列表,
kwds为传递给func()函数的关键字参数列表。
apply(func[, args[, kwdsI): 使用阻塞方式调用func() 函数。
close(): 关闭Pool, 使其不再接受新的任务。
terminate():不管任务是否完成,立即终止。
join(): 主进程阻塞,等待子进程的退出,必须在close或terminate之后使用.
'''
from multiprocessing import Pool # 导入
import os
import time
def task(name):
print(f"子进程 {os.getpid()} 执行 task{name} ...")
time.sleep(2)
if __name__ == "__main__":
# 1.创建进程池 最大进程数为3
pool = Pool(3)
# 2.从0开始执行10次
for i in range(16):
# 使用非阻塞方式调用task()函数
pool.apply_async(task, args=(i,))
print("等待所有子进程结束...")
pool.close() # 关闭进程池,关闭后pool不再接受新的任务请求
pool.join() # 等待子进程结束, 没有这句代码,程序会直接结束
print("所有子进程结束.")
(3)多线程:threading
'''
Thread类的参数说明如下:
group: 值为None,为以后版本而保留。
target: 表示一个可调用对象,线程启动时,run()方法将调用此对象,默认值为None, 表示不调用任何内容。
name : 表示当前线程名称,默认创建一个"Thread一N"格式的唯一名称。
args : 表示传递给target()函数的参数元组。
kwargs:表示传递给target()函数的参数字典。
'''
import threading # 1.导入线程模块
import time
# 任务
def task():
print(f' {time.strftime("%H:%M:%S")} 执行任务...')
time.sleep(2)
if __name__ == '__main__':
# 获取当前线程
print("main_thread:", threading.current_thread())
for i in range(16):
# 1.创建子线程
task_thread = threading.Thread(target=task, name="task_thread")
# 2.启动子线程执行对应的任务
task_thread.start()
二、高级用法
1、sqlite数据库封装
import sqlite3
from collections import namedtuple
from math import ceil
class SQLiteDB:
def __init__(self, db_file):
"""
初始化SQLite数据库连接和游标
:param db_file: 数据库文件路径
"""
self.conn = sqlite3.connect(db_file)
self.cursor = self.conn.cursor()
def create_table(self, table_name, columns):
"""
创建表格
:param table_name: 表格名称
:param columns: 列定义,格式为 "列名1 数据类型1, 列名2 数据类型2, ..."
"""
self.cursor.execute(f'''
CREATE TABLE IF NOT EXISTS {table_name} (
{columns}
)
''')
self.conn.commit()
def insert(self, table_name, data):
"""
插入数据
:param table_name: 表格名称
:param data: 要插入的数据,格式为 {"列名1": 值1, "列名2": 值2, ...}
:return: 插入的数据的所有行
"""
columns = ', '.join(data.keys())
placeholders = ', '.join(['?'] * len(data))
values = tuple(data.values())
self.cursor.execute(f'''
INSERT INTO {table_name} ({columns}) VALUES ({placeholders})
''', values)
self.conn.commit()
return self.cursor.fetchall()
def update(self, table_name, data, condition):
"""
更新数据
:param table_name: 表格名称
:param data: 要更新的数据,格式为 {"列名1": 值1, "列名2": 值2, ...}
:param condition: 更新条件
"""
columns = ', '.join([f'{column}=?' for column in data.keys()])
values = tuple(data.values())
self.cursor.execute(f'''
UPDATE {table_name} SET {columns} WHERE {condition}
''', values)
self.conn.commit()
def delete(self, table_name, condition):
"""
删除数据
:param table_name: 表格名称
:param condition: 删除条件
"""
self.cursor.execute(f'''
DELETE FROM {table_name} WHERE {condition}
''')
self.conn.commit()
def query_all(self, table_name):
"""
查询表格中的所有数据
:param table_name: 表格名称
:return: 所有数据的所有行
"""
self.cursor.execute(f'''
SELECT * FROM {table_name}
''')
return self.cursor.fetchall()
def query_page(self, table_name, page_size, page_number, condition=None):
"""
分页查询表格中的数据
:param table_name: 表格名称
:param page_size: 每页数据数量
:param page_number: 页码
:param condition: 查询条件 ”age > 10“
:return: 分页数据的所有行、总数和总页数
"""
if condition:
query = f'''
SELECT * FROM {table_name}
WHERE {condition}
LIMIT {page_size} OFFSET {page_size * (page_number - 1)}
'''
else:
query = f'''
SELECT * FROM {table_name}
LIMIT {page_size} OFFSET {page_size * (page_number - 1)}
'''
self.cursor.execute(query)
results = self.cursor.fetchall()
column_names = [desc[0] for desc in self.cursor.description]
Row = namedtuple('Row', column_names)
rows = [Row(*row) for row in results]
# 查询总数
count_query = f'SELECT COUNT(*) FROM {table_name}'
if condition:
count_query += f' WHERE {condition}'
self.cursor.execute(count_query)
total_count = self.cursor.fetchone()[0]
# 计算总页数
total_pages = ceil(total_count / page_size)
return rows, total_count, total_pages
def execute_sql(self, sql, params=None):
"""
执行自定义的SQL语句
:param sql: SQL语句
:param params: SQL语句中的参数,可选
:return: 执行结果的所有行
"""
if params is None:
self.cursor.execute(sql)
else:
self.cursor.execute(sql, params)
# self.conn.commit()
return self.cursor.fetchall()
def query_join(self, table1_name, table2_name, on_condition):
"""
执行表格的连接查询
:param table1_name: 第一个表格名称
:param table2_name: 第二个表格名称
:param on_condition: 连接条件
:return: 连接查询结果的所有行
"""
self.cursor.execute(f'''
SELECT * FROM {table1_name}
INNER JOIN {table2_name} ON {on_condition}
''')
return self.cursor.fetchall()
def close(self):
"""
关闭数据库连接
"""
self.conn.close()
# 使用示例:
if __name__ == '__main__':
# 创建数据库
db = SQLiteDB(r'E:\example.db')
# 创建表格
db.create_table('students', 'id INTEGER PRIMARY KEY, name TEXT NOT NULL, age INTEGER, gender TEXT')
db.create_table('courses', 'id INTEGER PRIMARY KEY, name TEXT NOT NULL, teacher TEXT')
# 插入数据
db.insert('students', {'name': 'Alice', 'age': 20, 'gender': 'Female'})
db.insert('students', {'name': 'Bob', 'age': 22, 'gender': 'Male'})
db.insert('courses', {'name': 'Math', 'teacher': 'Mr. Smith'})
db.insert('courses', {'name': 'English', 'teacher': 'Ms. Johnson'})
# 查询所有数据
print('All Students:')
print(db.query_all('students'))
# 两表联查
print('Join Students and Courses:')
print(db.query_join('students', 'courses', 'students.id = courses.id'))
# 分页查询
rows, total_count, total_pages = db.query_page('students', 10, 1)
for row in rows:
print(row.name, row.age, row.gender)
print(total_count)
print(total_pages)
# 修改数据
db.update('students', {'age': 21}, 'name = "Alice"')
# 新增数据
db.insert('students', {'name': 'Charlie', 'age': 19, 'gender': 'Male'})
# 删除数据
db.delete('students', 'name = "Bob"')
# 自定义SQL执行
db.execute_sql('DELETE FROM students WHERE name = "Charlie"')
db.close()
作者:秃了也弱了。