【Python】使用Luckysheet实现Excel多人在线协同编辑
前言
闲的无事,捣鼓了一下Luckysheet,虽然luckysheet已经出3.0并改名为Univer了,但不影响我继续使用。今天要弄的是多人在线协同编辑Excel,虽然网上也有一些凌乱的教程,但是好像没涉及到Python的,而最近在转行做Python项目,所以借这个机会好好捣鼓一下。不知道luckysheet为何物的朋友可先百度查一下,这里就不多说了,直接进入正题。
正文
准备工作
前端:Vue 2.0;后端:Python 3.8 + FastAPI + WebSocket(注意:下文 websocketrouter.py 中导入的 functools.cache 需要 Python 3.9 及以上版本)
引入luckysheet依赖
CSDN
<!-- Luckysheet stylesheets, served from the jsDelivr CDN -->
<link rel='stylesheet' href='https://cdn.jsdelivr.net/npm/luckysheet/dist/plugins/css/pluginsCss.css' />
<link rel='stylesheet' href='https://cdn.jsdelivr.net/npm/luckysheet/dist/plugins/plugins.css' />
<link rel='stylesheet' href='https://cdn.jsdelivr.net/npm/luckysheet/dist/css/luckysheet.css' />
<link rel='stylesheet' href='https://cdn.jsdelivr.net/npm/luckysheet/dist/assets/iconfont/iconfont.css' />
<!-- Luckysheet plugin script first, then the main UMD build -->
<script src="https://cdn.jsdelivr.net/npm/luckysheet/dist/plugins/js/plugin.js"></script>
<script src="https://cdn.jsdelivr.net/npm/luckysheet/dist/luckysheet.umd.js"></script>
或者自己本地打包
Luckysheet: 🚀Luckysheet ,一款纯前端类似excel的在线表格,功能强大、配置简单、完全开源。
官网建议我们下载完整的包,这样,我们得到的是luckysheet的源码,可以进行二次开发。
npm i -s // 执行 npm 命令,进行依赖包的下载
npm run build // 执行打包命令(二次开发是需要修改源码的)
然后把dist包放到public目录下,可以改个名字,最后在index.html中引入即可
开始工作
home.vue
<template>
  <div>
    <button @click="add">新建Excel</button>
    <div>
      <ul>
        <li v-for="(item, index) in excels" :key="index">
          <a @click="detail(item.wbId)">{{item.option.title}}</a>
        </li>
      </ul>
    </div>
  </div>
</template>
<script>
import axios from "axios"

// Landing page: lists the stored workbooks and lets the user create a new one.
export default {
  data() {
    return {
      // Workbooks returned by the list endpoint.
      excels: [],
      // Workbook most recently created through add().
      workbook: {},
    }
  },
  created() {
    this.getList();
  },
  methods: {
    // Ask the backend for a new workbook, remember it, then open its editor page.
    async add() {
      try {
        const { data } = await axios.post('/api/luckysheet/index/create');
        this.workbook = data;
        this.detail(this.workbook.wbId);
      } catch (error) {
        console.log(error)
      }
    },
    // Navigate to the editor route of the given workbook.
    detail(wbId) {
      const path = `/luckysheet/${wbId}`;
      this.$router.push({ path });
    },
    // Load the workbook list shown on this page.
    async getList() {
      try {
        const { data } = await axios.get('/api/luckysheet/index');
        this.excels = data;
      } catch (error) {
        console.log(error)
      }
    },
  },
}
</script>
luckysheet.vue
<template>
  <div>
    <div :id="id" style="margin:0px;padding:0px;position:absolute;width:100%;height:100%;left: 0px;top: 0px;"></div>
  </div>
</template>
<script>
import axios from "axios"

// Build a client id that is practically unique per page load.
// The backend keys open sockets by this id, so two editors sharing an id would
// replace each other's connection; a 0-100 random integer collides far too easily.
function makeClientId() {
  const timePart = Date.now().toString(36);
  const randomPart = Math.random().toString(36).slice(2, 10);
  return `${timePart}${randomPart}`;
}

// Editor page: fetches one workbook's options and mounts Luckysheet on it.
export default {
  data() {
    return {
      // Workbook description returned by the backend.
      wb: {},
      // Workbook id taken from the route.
      wbId: 0,
      // DOM id of the Luckysheet container; filled in from the workbook options.
      id: "",
    }
  },
  created() {
    this.wbId = this.$route.params.wbId;
    this.getWorkbook(this.wbId);
  },
  methods: {
    // Fetch the workbook, publish its container id, then start Luckysheet.
    getWorkbook(wbId) {
      axios.get(`/api/luckysheet/index/${wbId}`)
        .then(response => {
          this.wb = response.data;
          this.id = this.wb.option.container;
          // The :id binding is only written to the DOM on Vue's next flush, so
          // wait for it before Luckysheet looks the container up by id.
          return this.$nextTick();
        })
        .then(() => {
          this.initSheet(wbId);
        })
        .catch(error => {
          console.log(error)
        })
    },
    // Create the Luckysheet instance wired to the load and websocket endpoints.
    initSheet(wbId) {
      const options = {
        container: this.id, // id of the container element
        title: this.wb.option.title,
        allowUpdate: this.wb.option.allowUpdate,
        loadUrl: `/api/luckysheet/load/${wbId}`,
        // NOTE(review): the websocket host is hard-coded to the local backend.
        updateUrl: `ws://localhost:8000/ws/${makeClientId()}/${wbId}`,
      };
      window.luckysheet.create(options);
    },
  },
}
</script>
后端
config.py 用于存储数据(本示例没有用到数据库)
# In-memory store of every workbook, keyed by workbook id (this demo uses no database).
work_books = {}
# In-memory store of each workbook's sheets: workbook id -> {sheet index -> sheet record}.
work_sheets = {}
router.py
from fastapi import APIRouter
import json
from config import *
import uuid
# Router on which the workbook and sheet HTTP endpoints below are registered.
router = APIRouter()
# Return all workbooks.
@router.get("/luckysheet/index")
def index():
    """Return every stored workbook as a list, in the store's iteration order."""
    return list(work_books.values())
# Create a workbook.
@router.post("/luckysheet/index/create")
def createbook():
    """Create a workbook with default options, register it and give it sheets.

    Returns:
        The new workbook record (``wbId``, ``name`` and ``option``).
    """
    # A random UUID identifies the workbook.
    wb_id = str(uuid.uuid4())
    option = dict(
        container="luckysheet",
        title="leckysheet demo",
        lang="zh",
        allowUpdate=True,
        loadUrl="",
        loadSheetUrl="",
        updateUrl="",
    )
    wb = dict(wbId=wb_id, name="default", option=option)
    # Register the workbook before its sheets are generated.
    work_books[wb_id] = wb
    createsheet(wb_id)
    return wb
# Fetch a workbook.
@router.get("/luckysheet/index/{wb_id}")
def getworkbook(wb_id: str):
    """Return the workbook stored under ``wb_id``, creating a default one if absent.

    The fallback workbook is registered under the *requested* id. Previously it
    kept the random id generated by ``createbook``, so the id the client asked
    for still had no workbook or sheets behind it.

    Args:
        wb_id: Workbook id taken from the URL path.

    Returns:
        The workbook record whose ``wbId`` equals ``wb_id``.
    """
    wb = work_books.get(wb_id)
    if wb:
        return wb

    # Build a default workbook, then move it and its sheets under the
    # requested id so later requests for wb_id find them.
    wb = createbook()
    generated_id = wb["wbId"]
    work_books.pop(generated_id, None)
    wb["wbId"] = wb_id
    work_books[wb_id] = wb

    sheets = work_sheets.pop(generated_id, {})
    for sheet in sheets.values():
        sheet["wb_id"] = wb_id
    work_sheets[wb_id] = sheets
    return wb
# Create the sheets of a workbook.
def createsheet(wb_id: str):
    """Add four empty sheets to the workbook ``wb_id`` in ``work_sheets``.

    Each sheet gets a random UUID as its index; only the first sheet is
    created with ``status`` 1, the others with 0.
    """
    book_sheets = work_sheets.setdefault(wb_id, {})
    for order in range(4):
        sheet_index = str(uuid.uuid4())
        data = {
            "row": 84,
            "column": 60,
            "name": f"sheet{order}",
            "index": sheet_index,
            "order": order,
            "status": int(order == 0),
            "celldata": [],
        }
        book_sheets[sheet_index] = {
            "wb_id": wb_id,
            "data": data,
            "delete_status": 0,
        }
# Load every sheet of a workbook.
@router.post("/luckysheet/load/{wb_id}")
def load(wb_id: str):
    """Return the ``data`` of every sheet of workbook ``wb_id`` as a JSON string.

    An unknown ``wb_id`` yields ``"[]"`` instead of an unhandled ``KeyError``.

    NOTE(review): the result is deliberately a JSON-encoded *string* rather than
    a list, presumably because the Luckysheet client expects a text body for
    ``loadUrl``; confirm before changing the return type.
    """
    sheets = [sheet["data"] for sheet in work_sheets.get(wb_id, {}).values()]
    return json.dumps(sheets)
# Load every sheet of a workbook, creating default sheets when it has none.
@router.post("/luckysheet/loadsheet/{wb_id}")
def loadsheet(wb_id: str):
    """Return the ``data`` of every sheet of ``wb_id`` as a JSON string.

    When the workbook has no sheets yet, or is unknown, the default sheets are
    created first. The original looked ``wb_id`` up with ``[]`` before reaching
    that fallback, so an unknown id raised ``KeyError`` and the fallback never ran.
    """
    if not work_sheets.get(wb_id):
        createsheet(wb_id)
    sheets = [sheet["data"] for sheet in work_sheets.get(wb_id, {}).values()]
    return json.dumps(sheets)
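gzipUtil.py(文件名需与下文 websocketrouter.py 中的 from gzipUtil import GzipManager 保持一致;不要命名为 gzip.py,否则会遮蔽本文件要导入的标准库 gzip)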
import gzip
from urllib import parse
class GzipManager:
    """Decode the compressed text frames sent by the front end."""

    # The front end compresses its data with pako, so every message has to be
    # inflated before it can be read.
    def decompress_data(self, message: str):
        """Return the plain text carried by one compressed frame.

        The frame is mapped back to bytes with ISO-8859-1, gunzipped, decoded
        as UTF-8 and finally percent-decoded.
        """
        raw = message.encode("ISO-8859-1")
        quoted = gzip.decompress(raw).decode("utf-8")
        return parse.unquote(quoted)
websocketrouter.py
from functools import cache
from fastapi import APIRouter,WebSocket,WebSocketDisconnect
from websocketutils import WebsocketManager
from gzipUtil import GzipManager
import logging
import json
# Router on which the websocket endpoint below is registered.
router = APIRouter()
# Shared registry of open client connections, used by the endpoint below.
manager = WebsocketManager()
# NOTE: this name is the project's GzipManager helper, not a module called gzip.
gzip = GzipManager()
@router.websocket("/ws/{user_id}/{grid_key}")
async def websocket_serve(
        user_id: str,
        grid_key: str,
        websocket: WebSocket):
    """Serve one collaborative-editing connection.

    Every received frame is decompressed and parsed. Everything except cursor
    moves ("mv") is applied to the stored workbook, and everything except sheet
    switches ("shs") is relayed to the other clients of the same workbook.

    Changes from the original:
      * relayed envelopes carry the *sender's* id (the recipient's id was used);
      * the connection is always unregistered, not only on WebSocketDisconnect;
      * an undecodable frame is logged and skipped instead of ending the session;
      * recipients are iterated over a snapshot, because the registry can change
        while a send is awaited;
      * a failed send to one peer is logged instead of silently ignored.

    Args:
        user_id: Client identifier from the URL path.
        grid_key: Workbook identifier from the URL path.
        websocket: The connection being served.
    """
    # Accept the connection and record the client.
    await manager.connect(user_id, grid_key, websocket)
    logging.error(f'用户连接:{user_id}, 打开的表格:{grid_key}, 当前在线人数:{manager.active_clients_len}')
    try:
        while True:
            msg = await websocket.receive_text()
            # Empty frames and the "rub" frame carry no update.
            if not msg or msg == "rub":
                continue

            try:
                message = gzip.decompress_data(msg)
                logging.error(f"用户消息:{user_id},报文:{message}")
                jsonmsg = json.loads(message)
            except Exception:
                # Boundary with untrusted client input: one bad frame must not
                # end the session.
                logging.exception("discarding undecodable frame from %s", user_id)
                continue
            if not isinstance(jsonmsg, dict):
                logging.error("discarding non-object message from %s", user_id)
                continue

            action = jsonmsg.get("t")
            if action != "mv":
                await manager.process_message(grid_key, jsonmsg)
            if action == "shs":
                # Switching sheets is local to the sender; nothing to relay.
                continue

            # Cursor moves and data updates use different envelope types.
            if action == "mv":
                envelope = manager.mv_sheet(user_id, user_id, message)
            else:
                envelope = manager.shs_sheet(user_id, user_id, message)
            payload = json.dumps(envelope)

            # Snapshot the recipients: peers may connect or leave during a send.
            for user in list(manager.active_clients.get(grid_key, {})):
                if user == user_id:
                    continue
                try:
                    await manager.send_message_to_client(user, grid_key, payload)
                except Exception:
                    # Best effort: one broken peer must not block the others.
                    logging.exception("failed to relay message to %s", user)
    except WebSocketDisconnect:
        pass
    finally:
        manager.disconnect(user_id, grid_key)
websocketutils.py
from fastapi import WebSocket
from typing import List
from switchlang import switch
import copy
import re
import logging
from config import *
"""
创建工具管理类 处理服务端和客户端的交互
"""
class WebsocketManager:
def __init__(self):
# Initialise the registry of active clients.
# active_clients maps grid_key -> {user_id: websocket}; active_clients_len holds the
# client count of the grid that connect()/disconnect() touched most recently.
self.active_clients = {}
self.active_clients_len = 0
# NOTE(review): the bare string below is a no-op expression, not a docstring, and the
# shape it sketches does not match how connect() fills active_clients; confirm what
# it was meant to describe.
"""
{
"grid_key1":{
"grid_key": "1101",
"data": {
"column": 60,
"name": "sheet1",
"index": ""
}
},
}
"""
async def connect(self, user_id:str ,grid_key:str, websocket: WebSocket):
    """Accept ``websocket`` and register it under ``grid_key`` / ``user_id``.

    ``active_clients_len`` is set to the number of clients now registered for
    ``grid_key``.
    """
    await websocket.accept()
    peers = self.active_clients.setdefault(grid_key, {})
    peers[user_id] = websocket
    self.active_clients_len = len(peers)
def disconnect(self, user_id:str ,grid_key:str):
    """Forget the connection of ``user_id`` on ``grid_key``.

    Does nothing when the grid or the user is not registered. When the last
    client of a grid leaves, the grid entry itself is dropped.
    """
    peers = self.active_clients.get(grid_key)
    if peers is None or user_id not in peers:
        return
    del peers[user_id]
    if peers:
        self.active_clients_len = len(peers)
    else:
        # Nobody left on this grid.
        del self.active_clients[grid_key]
        self.active_clients_len = 0
async def send_message_to_client(self, user_id:str ,grid_key:str, message: str):
    """Send ``message`` as text to one registered client; no-op if it is unknown."""
    peers = self.active_clients.get(grid_key)
    if peers is None or user_id not in peers:
        return
    await peers[user_id].send_text(message)
async def process_message(self, grid_key: str, message: dict):
    """Apply one decoded Luckysheet update message to the in-memory store.

    ``message["t"]`` names the operation. Workbook-level operations (switch
    sheet, rename workbook, reorder sheets) are delegated and return at once.
    Every other operation resolves a target sheet, transforms it through the
    matching ``*_refresh`` helper and stores the result.

    Changes from the original:
      * an operation whose target sheet cannot be found is logged and ignored,
        instead of passing ``None`` into a helper and crashing;
      * a new sheet ("sha") is stored under the index carried in its payload,
        not under ``message["i"]``;
      * copying a sheet ("shc") works on a deep copy stored under the new index,
        so the source sheet is no longer renamed and overwritten;
      * delete/restore write ``delete_status``, the key ``createsheet`` uses,
        instead of a second ``deleteStatus`` key.

    Args:
        grid_key: Workbook id the message belongs to.
        message: Parsed message object sent by the client.
    """
    action = message.get("t")
    payload = message.get("v")

    # Workbook-level operations: no single sheet entry is rewritten.
    workbook_handlers = {
        "shs": self.shs_refresh,  # switch to a sheet
        "na": self.na_refresh,    # rename the workbook
        "shr": self.shr_refresh,  # reorder the sheets
    }
    if action in workbook_handlers:
        workbook_handlers[action](grid_key, message)
        return

    # New sheet: there is no existing entry to look up.
    if action == "sha":
        ws = self.sha_refresh(grid_key, message)
        new_index = payload.get("index") if isinstance(payload, dict) else None
        if new_index is None:
            new_index = message.get("i")
        work_sheets.setdefault(grid_key, {})[new_index] = ws
        return

    # Copy, delete and restore name their target sheet inside the payload.
    index_field = {"shc": "copyindex", "shd": "deleIndex", "shre": "reIndex"}.get(action)
    if index_field is None:
        index = message.get("i")
    else:
        index = payload.get(index_field) if isinstance(payload, dict) else None

    try:
        ws = work_sheets[grid_key][index]
    except (KeyError, TypeError):
        ws = None
    if not ws:
        logging.warning("ignoring %r: no sheet %r in workbook %r", action, index, grid_key)
        return

    if action == "v":
        logging.error("单个单元格刷新")

    sheet_handlers = {
        "v": self.single_cell_refresh,   # single cell
        "rv": self.range_cell_refresh,   # cell range
        "cg": self.config_refresh,       # sheet config
        "all": self.all_refresh,         # generic attribute save
        "fc": self.calc_chain_refresh,   # formula chain
        "drc": self.drc_refresh,         # delete rows/columns
        "arc": self.arc_refresh,         # add rows/columns
        "fsc": self.fsc_refresh,         # clear filter
        # fsr_refresh is an empty stub; fsc_refresh handles restore as well.
        "fsr": self.fsc_refresh,         # restore filter
        "sh": self.sh_refresh,           # hide/show sheet
    }
    handler = sheet_handlers.get(action)
    if handler is not None:
        ws = handler(ws, message)
    elif action == "shc":
        # Work on a copy and file it under the new sheet's own index.
        ws = self.shc_refresh(copy.deepcopy(ws), message)
        index = message.get("i")
    elif action == "shd":
        ws["delete_status"] = 1
    elif action == "shre":
        ws["delete_status"] = 0

    if not ws:
        return
    # Save.
    work_sheets[grid_key][index] = ws
def single_cell_refresh(self, ws: dict, message: dict):
    """Set or clear one cell of ``ws`` from a single-cell update message.

    Any stored cell at ``(message["r"], message["c"])`` is dropped. When
    ``message["v"]`` is truthy, exactly one replacement cell is appended.

    The original deep-copied the whole cell list on every edit, removed
    entries one by one, and appended one replacement per duplicate it found.

    Args:
        ws: Sheet record; its ``data["celldata"]`` list is rewritten.
        message: Update with ``r``, ``c`` and an optional ``v``.

    Returns:
        The same ``ws`` object.
    """
    row, col = message["r"], message["c"]
    value = message.get("v")
    data = ws["data"]
    # Falsy entries are not cells and are kept untouched.
    kept = [
        cell for cell in (data.get("celldata") or [])
        if not (cell and cell["r"] == row and cell["c"] == col)
    ]
    if value:
        kept.append({"r": row, "c": col, "v": value})
    data["celldata"] = kept
    return ws
def range_cell_refresh(self, ws: dict, message: dict):
    """Apply a rectangular block of cell values to ``ws``.

    ``message["range"]`` gives inclusive ``row`` and ``column`` bounds, and
    ``message["v"]`` is a matching 2-D list of values. Every stored cell inside
    the rectangle is dropped; a new cell is appended for each non-empty value.

    The original iterated ``(first, last + 1)`` as a two-element tuple instead
    of ``range(first, last + 1)``, so it touched the wrong rows and columns. It
    also removed entries from the list it was iterating.

    Args:
        ws: Sheet record; its ``data["celldata"]`` list is rewritten.
        message: Update with ``range`` and ``v``.

    Returns:
        The same ``ws`` object.

    Raises:
        IndexError: If ``v`` is smaller than the rectangle described by ``range``.
    """
    rows = message["range"]["row"]
    cols = message["range"]["column"]
    values = message.get("v")

    # Map every coordinate of the rectangle to its incoming value.
    incoming = {}
    for row_offset, row in enumerate(range(rows[0], rows[1] + 1)):
        for col_offset, col in enumerate(range(cols[0], cols[1] + 1)):
            incoming[(row, col)] = values[row_offset][col_offset]

    data = ws["data"]
    kept = [
        cell for cell in (data.get("celldata") or [])
        if not (cell and (cell["r"], cell["c"]) in incoming)
    ]
    for (row, col), value in incoming.items():
        # Empty values and the literal "null" clear the cell.
        if value and str(value) != "null":
            kept.append({"r": row, "c": col, "v": value})
    data["celldata"] = kept
    return ws
def config_refresh(self, ws: dict, message: dict):
    """Store one config entry, ``message["k"]`` -> ``message["v"]``, on ``ws``.

    Only the named entry is written. The original replaced the whole config
    dict with that single entry, discarding every other setting on the sheet.

    Returns:
        The same ``ws`` object.
    """
    data = ws["data"]
    config = data.get("config")
    if not isinstance(config, dict):
        # No usable config yet: start a fresh one.
        config = {}
        data["config"] = config
    config[message.get("k")] = message.get("v")
    return ws
def all_refresh(self, ws: dict, message: dict):
    """Set or remove the sheet attribute ``message["k"]`` on ``ws["data"]``.

    A ``None`` value removes the attribute; any other value is stored.

    The original called ``dict.remove``, which does not exist, so every removal
    raised ``AttributeError``. It also treated every falsy value as a removal,
    so an explicit ``0`` or ``False`` could never be stored.

    Returns:
        The same ``ws`` object.
    """
    key = message.get("k")
    value = message.get("v")
    if value is None:
        ws["data"].pop(key, None)
    else:
        ws["data"][key] = value
    return ws
def calc_chain_refresh(self, ws: dict, message: dict):
    """Apply an add / update / del operation to the sheet's ``calcChain`` list.

    ``message["op"]`` selects the operation and ``message["pos"]`` is the list
    position used by "update" and "del".

    The original always started from an empty list, which threw the stored
    chain away, and indexed it with ``message[message["pos"]]``, so "update"
    and "del" could only raise. Here the stored chain is edited in place:
    "update" replaces the entry at ``pos`` (appending if ``pos`` is out of
    range) and "del" removes it (ignoring an out-of-range ``pos``).

    NOTE(review): Luckysheet appears to send ``v`` for this operation as a
    JSON-encoded string, so a string value is decoded when it parses as JSON.
    Confirm against the client before relying on this.

    Returns:
        The same ``ws`` object.
    """
    import json  # local import: this module does not import json at file level

    value = message.get("v")
    if isinstance(value, str):
        try:
            value = json.loads(value)
        except ValueError:
            pass  # not JSON; keep the raw string

    chain = ws["data"].get("calcChain")
    if not isinstance(chain, list):
        chain = []

    op = message.get("op")
    pos = message.get("pos")
    pos_valid = isinstance(pos, int) and 0 <= pos < len(chain)

    if op == "add":
        chain.append(value)
    elif op == "update":
        if pos_valid:
            chain[pos] = value
        else:
            chain.append(value)
    elif op == "del" and pos_valid:
        del chain[pos]

    ws["data"]["calcChain"] = chain
    return ws
def drc_refresh(self, ws: dict, message: dict):
    """Delete a run of rows or columns from ``ws``.

    ``message["rc"]`` is ``"r"`` for rows; anything else means columns.
    ``message["v"]`` gives the first deleted ``index`` and the count ``len``.
    Cells inside the deleted run are dropped, cells after it move up (or left)
    by ``len``, and the sheet's row/column count shrinks by ``len``.

    The original deep-copied the cell list and removed and re-appended entries
    one by one, shadowed the builtin ``len``, and crashed on ``None`` entries.
    Here the list is rebuilt in one pass, so cells keep their relative order.

    Returns:
        The same ``ws`` object.
    """
    start = message["v"]["index"]
    count = message["v"]["len"]
    end = start + count
    if message.get("rc") == "r":
        axis, size_key = "r", "row"
    else:
        axis, size_key = "c", "column"

    data = ws["data"]
    if size_key in data:
        data[size_key] = data[size_key] - count

    kept = []
    for cell in data.get("celldata") or []:
        if not cell:
            kept.append(cell)
            continue
        position = cell[axis]
        if start <= position < end:
            continue  # inside the deleted run
        if position >= end:
            cell[axis] = position - count  # close the gap
        kept.append(cell)
    data["celldata"] = kept
    return ws
def arc_refresh(self, ws: dict, message: dict):
    """Insert rows or columns into ``ws``.

    ``message["rc"]`` is ``"r"`` for rows; anything else means columns.
    ``message["v"]`` carries the anchor ``index``, the count ``len``, the
    ``direction`` ("lefttop" inserts before the anchor, "rightbottom" after it)
    and optional ``data`` holding values for the new cells.

    Existing cells at or beyond the insertion point move by ``len``, the sheet's
    row/column count grows by ``len``, and non-empty ``data`` values become new
    cells starting at the insertion point.

    Changes from the original:
      * for "rightbottom" the new cells start one past the anchor; they used to
        land on the anchor row/column itself, on top of existing cells;
      * a missing or ``None`` ``data`` no longer raises;
      * cells are shifted in place instead of being deep-copied, removed and
        re-appended, and the builtin ``len`` is no longer shadowed.

    Returns:
        The same ``ws`` object.
    """
    spec = message["v"]
    anchor = spec["index"]
    count = spec["len"]
    direction = spec.get("direction")
    by_row = message.get("rc") == "r"
    if by_row:
        axis, size_key = "r", "row"
    else:
        axis, size_key = "c", "column"

    # First position occupied by the inserted rows/columns.
    first_new = anchor + 1 if direction == "rightbottom" else anchor

    data = ws["data"]
    cells = data.get("celldata") or []
    # As before, an unrecognised direction shifts nothing.
    if direction in ("lefttop", "rightbottom"):
        for cell in cells:
            if cell and cell[axis] >= first_new:
                cell[axis] += count

    if size_key in data:
        data[size_key] = data[size_key] + count

    for row_offset, row_values in enumerate(spec.get("data") or []):
        for col_offset, value in enumerate(row_values or []):
            if not value:
                continue
            if by_row:
                cells.append({"r": first_new + row_offset, "c": col_offset, "v": value})
            else:
                cells.append({"r": row_offset, "c": first_new + col_offset, "v": value})
    data["celldata"] = cells
    return ws
def fsc_refresh(self, ws: dict, message: dict):
    """Clear or restore the filter state stored on ``ws``.

    With an empty ``message["v"]``, the ``filter`` and ``filter_select``
    attributes are removed. Otherwise both are set from the payload.

    The original removed them with ``dict.remove``, which does not exist, so
    clearing a filter always raised ``AttributeError``.

    Returns:
        The same ``ws`` object.
    """
    data = ws["data"]
    payload = message.get("v")
    if not payload:
        data.pop("filter", None)
        data.pop("filter_select", None)
    else:
        data["filter"] = payload.get("filter")
        data["filter_select"] = payload.get("filter_select")
    return ws
def fsr_refresh(self, ws:dict, message:dict):
    """Placeholder for the filter-restore operation; does nothing and returns ``None``."""
    return None
def sha_refresh(self, grid_key:str, message:dict):
    """Build the sheet record for a new sheet whose data is ``message["v"]``."""
    return dict(wb_id=grid_key, data=message.get("v"))
def shs_refresh(self, grid_key: str, message: dict):
    """Make the sheet named by ``message["v"]`` the active sheet of a workbook.

    Every sheet with ``status`` 1 is reset to 0, then the target sheet is set
    to 1. If the workbook or the target sheet is unknown, nothing changes.

    The original iterated the sheet dict directly while unpacking two names, so
    it failed on the first key, and it cleared the active flags before checking
    that the target existed.
    """
    sheets = work_sheets.get(grid_key, {})
    target = sheets.get(message.get("v"))
    if not target:
        return
    for sheet in sheets.values():
        if sheet["data"].get("status") == 1:
            sheet["data"]["status"] = 0
    target["data"]["status"] = 1
def shc_refresh(self, ws: dict, message: dict):
    """Return a copy of sheet ``ws`` carrying the copy's index and name.

    The copy's index comes from ``message["i"]`` and its name from
    ``message["v"]["name"]``. The source sheet is left untouched; the original
    wrote the new index and name straight into the source sheet.

    Returns:
        A new sheet record, independent of ``ws``.
    """
    duplicate = copy.deepcopy(ws)
    duplicate["data"]["index"] = message.get("i")
    duplicate["data"]["name"] = message["v"]["name"]
    return duplicate
def na_refresh(self, grid_key:str, message:dict):
    """Rename workbook ``grid_key``: its option title becomes ``message["v"]``."""
    title = message["v"]
    option = work_books[grid_key]["option"]
    option["title"] = title
def shr_refresh(self, grid_key:str, message:dict):
    """Reorder the sheets of ``grid_key``.

    ``message["v"]`` maps each sheet index to its new order; every sheet of the
    workbook is looked up in that mapping.
    """
    for sheet in work_sheets[grid_key].values():
        data = sheet["data"]
        data["order"] = message["v"][data["index"]]
def sh_refresh(self, ws: dict, message: dict):
    """Hide or show sheet ``ws`` and move the workbook's active flag accordingly.

    ``message["v"]`` is stored as the sheet's ``hide`` attribute.

    * op "hide": ``ws`` becomes inactive, and the sheet named by
      ``message["cur"]`` (if it exists) becomes active.
    * otherwise (show): the currently active sheet is deactivated and ``ws``
      becomes active.

    Changes from the original:
      * the show branch iterated the sheet dict directly while unpacking two
        names, so it raised on the first key;
      * the shown sheet is now marked active. The original cleared the previous
        active flag but set none. NOTE(review): confirm against the client's
        behaviour after showing a hidden sheet;
      * a missing workbook or ``cur`` sheet is tolerated instead of raising
        ``KeyError``.

    Returns:
        The same ``ws`` object.
    """
    ws["data"].update({"hide": message["v"]})
    sheets = work_sheets.get(ws["wb_id"], {})
    if message.get("op") == "hide":
        ws["data"]["status"] = 0
        successor = sheets.get(message.get("cur"))
        if successor:
            successor["data"]["status"] = 1
    else:
        for sheet in sheets.values():
            if sheet["data"].get("status") == 1:
                sheet["data"]["status"] = 0
                break
        ws["data"]["status"] = 1
    return ws
def is_json(self, str):
    """Tell whether ``str`` looks like a JSON object or array.

    This is a shape check only: on a single line, optional whitespace, an
    opening ``{`` or ``[``, any text, a closing ``}`` or ``]``, optional
    whitespace. The text is not parsed.
    """
    # Regular expression for the JSON-like shape.
    looks_bracketed = re.match(r'^\s*[\{\[](.*)[\}\]]\s*$', str)
    return looks_bracketed is not None
def mv_sheet(self, user_id:str, user_name:str, msg:str):
    """Wrap ``msg`` in a type-3 envelope with the given id and username."""
    return dict(type=3, id=user_id, username=user_name, data=msg)
def shs_sheet(self, user_id:str, user_name:str, msg:str):
    """Wrap ``msg`` in a type-2 envelope with the given id and username."""
    return dict(type=2, id=user_id, username=user_name, data=msg)
async def broadcast(self, message: str):
    """Send ``message`` as text to every registered client of every workbook.

    The recipients are collected before the first send. The original iterated
    the live registry across ``await`` points, so a client connecting or
    leaving mid-broadcast raised "dictionary changed size during iteration".
    """
    recipients = [
        websocket
        for users in self.active_clients.values()
        for websocket in users.values()
    ]
    for websocket in recipients:
        await websocket.send_text(message)
最后
能用代码解决的事,就不多说一句话,所以大家直接照搬吧