Python 局域网远程控制电脑
Python 局域网远程控制电脑
1.简介:
一款由Python可以远程控制局域网电脑关机、重启、注销、锁定 、休眠、退出登录甚至能操作远程电脑Cmd终端命令的一款工具。资源及源码已打包,大家可自行下载。
工具分为1.0以及2.0版本,2.0版本在1.0终端命令行模式更改为网页界面化操作,可利用手机等多终端设备控制,更加美观实用;优化了端口设置,添加了条件判断;可结合“Radmin_LAN”软件,实现异地控制。工具可用Pyinstaller打包成无窗口后台静默运行。
默认账号:root
默认密码:123456
2. 运行效果:
1.0版本运行效果:
2.0版本运行效果:
PC端页面效果:
服务端页面:
客户端登录页面:
客户端操作页面:
手机移动端展示效果:
3. 1.0版本相关源码:
服务端server.py
import socket
import keyboard
import subprocess
import threading
import logging
import queue
'''
import win32gui
import win32con
# 获取当前窗口句柄
hwnd = win32gui.GetForegroundWindow()
# 设置窗口属性,隐藏窗口
win32gui.ShowWindow(hwnd, win32con.SW_HIDE)
'''
# 配置日志
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
print("客户端登录账号:root,密码:123456,F3关闭服务器")
print()
ip1 = '192.168.137.1' # 默认ip地址
print(f"请输入服务器IP地址(电脑的IPv4地址),不输入默认为 {ip1}")
# 超时设置,15秒
def get_input(prompt, timeout, result_queue):
try:
user_input = input(prompt)
if user_input.strip() == "":
result_queue.put(None) # 如果用户直接回车,返回None
else:
result_queue.put(user_input)
except Exception as e:
logging.error(f"输入过程中发生错误: {e}")
# 创建一个线程来获取输入
print("超时时间---15秒")
print()
result_queue = queue.Queue()
input_thread = threading.Thread(target=get_input, args=("请输入服务器IP地址:", 15, result_queue))
input_thread.start()
# 等待输入,如果超时则返回默认值
input_thread.join(15)
if not result_queue.empty():
ip = result_queue.get()
if ip is None:
ip = ip1 # 如果用户直接回车,使用默认值
else:
ip = ip1
print(f"最终IP地址: {ip}")
'''
def get_ipv4_address():
hostname = socket.gethostname()
ip_address = socket.gethostbyname(hostname)
return ip_address
ip=get_ipv4_address()
print(ip)
'''
# 创建一个 TCP/IP socket
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 设置 SO_REUSEADDR 选项
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# 绑定服务器地址和端口
server_address = (ip, 5000)
server_socket.bind(server_address)
# 监听连接
server_socket.listen(5) # 支持最多5个并发连接
print('服务器正在等待连接...')
def close_server():
print('服务器已关闭')
server_socket.close()
exit()
# 监听 F3 键关闭服务器
keyboard.add_hotkey('F3', close_server)
def handle_client(client_socket, client_address):
try:
print('连接来自:', client_address)
# 接收账号和密码
data = client_socket.recv(1024)
logging.debug(f"接收到的数据: {data.decode()}")
# 解析账号和密码
try:
account, password = data.decode().split(':')
except ValueError:
logging.error("账号和密码格式错误")
message = '账号和密码格式错误!'
client_socket.send(message.encode())
return
# 验证账号和密码
if account == 'root' and password == '123456':
# 发送成功消息
message = '登录成功!'
client_socket.send(message.encode())
# 接收客户端请求
while True:
request = client_socket.recv(1024).decode()
logging.debug(f"接收到的请求: {request}")
if request == '1':
# 关机
subprocess.call('shutdown /s /t 0', shell=True)
elif request == '2':
# 重启
subprocess.call('shutdown /r /t 0', shell=True)
elif request == '3':
# 休眠
subprocess.call('rundll32.exe powrprof.dll,SetSuspendState 0,1,0', shell=True)
elif request == '4':
# 锁定
subprocess.call('rundll32.exe user32.dll,LockWorkStation', shell=True)
elif request == '0':
# 退出登录
print(client_address, "退出登录")
break
elif request == '5':
# 等待用户输入
user_input = 1
while True:
# 执行cmd命令
command = client_socket.recv(1024).decode()
logging.debug(f"接收到的命令:{command}")
try:
# 执行命令
result = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, text=True)
# 获取命令的输出
output, error = result.communicate()
# 打印命令的输出到本地控制台
logging.debug(f"{output}")
# 发送命令的输出
if error:
output = " "
client_socket.sendall(output.encode())
logging.debug(f"{error}")
# 发送命令的错误
client_socket.sendall(error.encode())
logging.debug("命令结果已发送")
except subprocess.CalledProcessError as e:
logging.error(f"{e.stderr}")
# 发送错误信息
client_socket.sendall(str(e).encode())
finally:
# 发送一个结束标记
client_socket.sendall(' '.encode())
# 等待用户输入
#接收客户端的user_input值
user_input = client_socket.recv(1024).decode()
logging.debug(f"接收到的请求: {user_input}")
if user_input == '1':
# 继续执行命令
continue
elif user_input == '2':
print(client_address, "退出登录")
# 退出循环
break
else:
# 无效输入
client_socket.sendall('无效输入'.encode())
# 继续执行命令
continue
else:
# 无效输入
client_socket.sendall('无效输入'.encode())
else:
# 无效请求
message = '无效请求!'
client_socket.send(message.encode())
else:
# 发送失败消息
message = '账号或密码错误!'
client_socket.send(message.encode())
except Exception as e:
logging.error(f"处理客户端连接时发生错误: {e}")
finally:
# 关闭连接
client_socket.close()
while True:
try:
# 等待客户端连接
client_socket, client_address = server_socket.accept()
logging.debug(f"接受到来自 {client_address} 的连接")
# 创建新线程处理客户端连接
client_thread = threading.Thread(target=handle_client, args=(client_socket, client_address))
client_thread.start()
except OSError as e:
if e.errno == 10038:
print('服务器已关闭')
break
else:
raise
input("按回车键退出...")
客户端client.py
import socket
def run_script():
try:
print()
ip1 = '192.168.137.1'
print("请输入服务器的IP地址,不输入默认ip为", ip1)
print()
ip = input("请输入服务器的IP地址:") or ip1
print()
# 服务器地址和端口
server_address = (ip, 5000)
# 创建一个 TCP/IP socket
client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
# 连接到服务器
client_socket.connect(server_address)
print('已连接到服务器')
# 获取用户输入的账号和密码
account = input('请输入账号:') or 'root'
password = input('请输入密码:') or '123456'
# 发送账号和密码
message = f'{account}:{password}'
client_socket.sendall(message.encode())
# 接收服务器响应
response = client_socket.recv(1024).decode()
print('服务器响应:', response)
if response == '登录成功!':
print('登录成功')
print()
# 发送默认请求
print("[0]退出登录---[1]关机---[2]重启---[3]休眠")
print("[4]锁定---[5]执行cmd命令")
print()
request = input('输入数字: ') or "5" # 你可以修改这个请求
if request in ['0', '1', '2', '3', '4', '5']:
client_socket.sendall(request.encode())
print("请求已发送")
if request == '0':
print("退出登录成功")
elif request == '5':
while True:
# 获取用户输入的命令
print()
command = input('请输入要执行的命令:') or "echo %date% %time%"
# 发送命令
client_socket.sendall(command.encode())
# 设置超时时间
client_socket.settimeout(60)
try:
print()
print("接收超时时间为60秒")
print()
# 接收命令的输出
data = b''
count = 0
while count < 2:
packet = client_socket.recv(4096)
data += packet
count += 1
output = data.decode('utf-8', errors='ignore')
print(output)
except socket.timeout:
print("接收命令的输出超时")
# 等待用户输入
print()
user_input = input("是否继续执行命令?[1]继续---[2]退出: ") or "1"
if user_input == '1':
# 发送
client_socket.sendall(user_input.encode())
# 继续执行命令
continue
elif user_input == '2':
print("退出登录成功")
break
else:
# 无效输入
print('无效输入,继续执行命令')
# 发送
client_socket.sendall(user_input.encode())
# 继续执行命令
continue
else:
print("无效的请求")
else:
print('登录失败')
finally:
# 关闭连接
client_socket.close()
user_choice = input("输入1继续运行,输入2结束代码: ")
if user_choice == '2':
print("结束代码")
return
elif user_choice == '1':
print("继续运行")
run_script()
else:
print("无效输入,默认退出代码")
except Exception as e:
print(f"An error occurred: {e}")
else:
print("Script completed successfully.")
run_script()
4. 2.0版本相关源码:
from flask import Flask, request, render_template_string, jsonify
import subprocess
import socket
import os
app = Flask(__name__)
def get_local_ip():
try:
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
# 连接到一个公网DNS服务器,不发送任何数据,只是为了获取本地IP
s.connect(("8.8.8.8", 53))
return s.getsockname()[0]
except:
return None # 如果无法获取IP,返回None
def load_server_ip():
ip_file = 'server_ip.txt'
if not os.path.exists(ip_file):
return None
with open(ip_file, 'r') as f:
ip = f.read().strip()
return ip if ip else None
def save_server_ip(ip):
ip_file = 'server_ip.txt'
with open(ip_file, 'w') as f:
f.write(ip)
def ping_ip(ip):
"""
Ping指定的IP,返回True如果IP可达,否则返回False
"""
try:
# 在Windows系统上使用-4参数强制使用IPv4
output = subprocess.check_output(['ping', '-4', '-n', '1', ip], stderr=subprocess.STDOUT, universal_newlines=True)
if "unreachable" in output.lower():
return False
return True
except subprocess.CalledProcessError:
return False
def bind_server(host_ip):
try:
print(f"尝试绑定到IP: {host_ip}")
app.run(host=host_ip, port=80)
except Exception as e:
print(f"绑定到 {host_ip} 失败: {e}")
return False
return True
def main():
# 尝试从server_ip.txt读取IP
server_ip = load_server_ip()
if server_ip:
print(f"从文件加载服务器IP: {server_ip}")
if not ping_ip(server_ip):
print(f"Ping测试失败,IP {server_ip} 不可用")
server_ip = None
else:
print(f"Ping测试成功,IP {server_ip} 可用")
else:
server_ip = None # 重置server_ip,因为文件不存在或内容为空
# 如果server_ip.txt中的IP不可用,尝试获取本机IP
if not server_ip:
server_ip = get_local_ip()
if server_ip:
print(f"获取到服务器IP: {server_ip}")
if not ping_ip(server_ip):
print(f"Ping测试失败,IP {server_ip} 不可用")
server_ip = None
else:
print(f"Ping测试成功,IP {server_ip} 可用")
save_server_ip(server_ip)
else:
print("无法获取本机IP")
# 尝试绑定到server_ip
if server_ip:
if not bind_server(server_ip):
# 如果绑定失败,尝试绑定到127.0.0.1
print("绑定到指定IP失败,尝试绑定到127.0.0.1")
if not bind_server('127.0.0.1'):
print("绑定到127.0.0.1也失败,终止脚本")
exit(1)
else:
# 如果没有有效的IP,尝试绑定到127.0.0.1
print("没有有效的IP,尝试绑定到127.0.0.1")
if not bind_server('127.0.0.1'):
print("绑定到127.0.0.1失败,终止脚本")
exit(1)
@app.route('/', methods=['GET', 'POST'])
def login():
if request.method == 'POST':
data = request.form
if 'username' in data and 'password' in data:
username = data['username']
password = data['password']
print(f"Received username: {username}, password: {password}")
if username == 'root' and password == '123456': # 这里是用户名和密码,请自行修改。
return render_template_string('''
<!DOCTYPE html>
<html>
<head>
<title>登录成功</title>
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<style>
body {
font-family: Arial, sans-serif;
margin: 20px;
}
h1, h2 {
text-align: center;
}
/* 按钮样式 */
.btn {
padding: 10px 20px;
font-size: 16px;
margin: 5px;
border: none;
border-radius: 4px;
cursor: pointer;
background-color: #007bff;
color: white;
}
.btn:hover {
background-color: #0056b3;
}
/* 输入框样式 */
#command-input {
width: 100%;
height: 100px;
font-size: 16px;
padding: 10px;
resize: vertical;
border: 1px solid #ccc;
border-radius: 4px;
box-sizing: border-box;
}
/* 结果显示区域 */
#result {
margin-top: 20px;
padding: 10px;
border: 1px solid #ddd;
border-radius: 4px;
background-color: #f9f9f9;
white-space: pre-wrap; /* 保留换行 */
}
/* 执行按钮样式 */
#execute-button {
float: right;
margin-top: 10px;
}
/* 响应式设计 */
@media (max-width: 600px) {
.btn {
width: 100%;
font-size: 14px;
}
#command-input {
height: 100px;
font-size: 14px;
}
#execute-button {
width: 100%;
float: none;
}
#result {
font-size: 14px;
}
}
</style>
</head>
<body>
<h1>登录成功</h1>
<div style="text-align: center;">
<button class="btn">退出登录</button>
<button class="btn">关机</button>
<button class="btn">重启</button>
<button class="btn">休眠</button>
<button class="btn">锁定</button>
</div>
<br>
<h2>执行命令</h2>
<textarea id="command-input" placeholder="请输入命令"></textarea>
<button id="execute-button" class="btn">执行</button>
<h3>结果:</h3>
<div id="result"></div>
<script>
function logout() {
fetch('/logout', { method: 'GET' })
.then(response => response.json())
.then(data => {
alert(data.message);
window.location.href = '/';
});
}
function shutdown() {
fetch('/shutdown', { method: 'GET' })
.then(response => response.json())
.then(data => alert(data.message));
}
function restart() {
fetch('/restart', { method: 'GET' })
.then(response => response.json())
.then(data => alert(data.message));
}
function sleep() {
fetch('/sleep', { method: 'GET' })
.then(response => response.json())
.then(data => alert(data.message));
}
function lock() {
fetch('/lock', { method: 'GET' })
.then(response => response.json())
.then(data => alert(data.message));
}
function executeCommand() {
const command = document.getElementById('command-input').value;
fetch('/execute', {
method: 'POST',
headers: {
'Content-Type': 'application/json'
},
body: JSON.stringify({ command: command })
})
.then(response => response.json())
.then(data => {
document.getElementById('result').innerText = data.output;
})
.catch(error => {
document.getElementById('result').innerText = '执行失败';
});
}
</script>
</body>
</html>
''')
else:
return jsonify({"message": "登录失败"}), 401
else:
return jsonify({"message": "缺少用户名或密码"}), 400
else:
return render_template_string('''
<!DOCTYPE html>
<html>
<head>
<title>登录</title>
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<style>
body {
font-family: Arial, sans-serif;
margin: 20px;
}
form {
max-width: 400px;
margin: 0 auto;
}
input[type="text"], input[type="password"] {
width: 100%;
padding: 10px;
margin: 10px 0;
border: 1px solid #ccc;
border-radius: 4px;
box-sizing: border-box;
}
input[type="submit"] {
width: 100%;
padding: 10px;
background-color: #28a745;
color: white;
border: none;
border-radius: 4px;
cursor: pointer;
}
input[type="submit"]:hover {
background-color: #218838;
}
@media (max-width: 600px) {
form {
max-width: 100%;
}
}
</style>
</head>
<body>
<h1>登录</h1>
<form method="post">
<input type="text" name="username" placeholder="用户名"><br>
<input type="password" name="password" placeholder="密码"><br>
<input type="submit" value="登录">
</form>
</body>
</html>
''')
@app.route('/logout', methods=['GET'])
def logout():
return jsonify({"message": "退出登录成功"}), 200
@app.route('/shutdown', methods=['GET'])
def shutdown():
try:
subprocess.run(["shutdown", "/s", "/t", "1"], check=True)
return jsonify({"message": "关机成功"}), 200
except subprocess.CalledProcessError as e:
return jsonify({"message": "关机失败", "error": str(e)}), 500
@app.route('/restart', methods=['GET'])
def restart():
try:
subprocess.run(["shutdown", "/r", "/t", "1"], check=True)
return jsonify({"message": "重启成功"}), 200
except subprocess.CalledProcessError as e:
return jsonify({"message": "重启失败", "error": str(e)}), 500
@app.route('/sleep', methods=['GET'])
def sleep():
try:
subprocess.run(["rundll32.exe", "powrprof.dll,SetSuspendState", "0,1,0"], check=True)
return jsonify({"message": "休眠成功"}), 200
except subprocess.CalledProcessError as e:
return jsonify({"message": "休眠失败", "error": str(e)}), 500
@app.route('/lock', methods=['GET'])
def lock():
try:
subprocess.run(["rundll32", "user32.dll,LockWorkStation"], check=True)
return jsonify({"message": "锁定成功"}), 200
except subprocess.CalledProcessError as e:
return jsonify({"message": "锁定失败", "error": str(e)}), 500
@app.route('/execute', methods=['POST'])
def execute():
data = request.get_json()
command = data.get('command', '')
try:
result = subprocess.run(command, shell=True, capture_output=True, text=True)
output = result.stdout + result.stderr
return jsonify({"output": output}), 200
except Exception as e:
return jsonify({"output": f"执行失败: {str(e)}"}), 500
if __name__ == '__main__':
main()
作者:hvinsion