Python使用Canoe控制系统变量和信号值
1、Canoe获取启动Canoe,停止Canoe,获取系统变量,更改系统变量,获取信号值Canoe工程如下
2、Python_canoe_start_and_stop_variables_and_signals_read_and_write.py代码如下
# --------------------------------------------------------------------------
# Standard library imports
import os
import sys
import subprocess
import time
import msvcrt
from win32com.client import *
from win32com.client.connect import *
# Vector Canoe Class
class CANoe:
def __init__(self, visible=True):
self.application = None
self.application = DispatchEx("CANoe.Application")
#self.application.Versible = visible
self.ver = self.application.Version
print('Loaded CANoe version ',
self.ver.major, '.',
self.ver.minor, '.',
self.ver.Build, '...') # , sep,''
self.Measurement = self.application.Measurement.Running
def open_cfg(self, cfgname):
# open CANoe simulation
cfgname = os.path.abspath(cfgname)
if (self.application != None):
# check for valid file and it is *.cfg file
if os.path.isfile(cfgname) and (os.path.splitext(cfgname)[1] == ".cfg"):
self.application.Open(cfgname)
print("opening..." + cfgname)
else:
raise RuntimeError("Can't find CANoe cfg file")
else:
raise RuntimeError("CANoe Application is missing,unable to open simulation")
def close_cfg(self):
# close CANoe simulation
if (self.application != None):
print("close cfg ...")
# self.stop_Measurement()
self.application.Quit()
self.application = None
def start_Measurement(self):
retry = 0
retry_counter = 5
# try to establish measurement within 5s timeout
while not self.application.Measurement.Running and (retry < retry_counter):
self.application.Measurement.Start()
time.sleep(1)
retry += 1
if (retry == retry_counter):
raise RuntimeWarning("CANoe start measuremet failed, Please Check Connection!")
def stop_Measurement(self):
if self.application.Measurement.Running:
self.application.Measurement.Stop()
else:
pass
def get_SigVal(self, channel_num, msg_name, sig_name, bus_type="CAN"):
"""
@summary Get the value of a raw CAN signal on the CAN simulation bus
@param channel_num - Integer value to indicate from which channel we will read the signal, usually start from 1,
Check with CANoe can channel setup.
@param msg_name - String value that indicate the message name to which the signal belong. Check DBC setup.
@param sig_name - String value of the signal to be read
@param bus_type - String value of the bus type - e.g. "CAN", "LIN" and etc.
@return The CAN signal value in floating point value.
Even if the signal is of integer type, we will still return by
floating point value.
@exception None
"""
if (self.application != None):
result = self.application.GetBus(bus_type).GetSignal(channel_num, msg_name, sig_name)
return result.Value
else:
raise RuntimeError("CANoe is not open,unable to GetVariable")
def set_SigVal(self, channel_num, msg_name, sig_name, bus_type, setValue):
if (self.application != None):
result = self.application.GetBus(bus_type).GetSignal(channel_num, msg_name, sig_name)
result.Value = setValue
else:
raise RuntimeError("CANoe is not open,unable to GetVariable")
def get_EnvVar(self, var):
if (self.application != None):
result = self.application.Environment.GetVariable(var)
return result.Value
else:
raise RuntimeError("CANoe is not open,unable to GetVariable")
def set_EnvVar(self, var, value):
result = None
if (self.application != None):
# set the environment varible
result = self.application.Environment.GetVariable(var)
result.Value = value
checker = self.get_EnvVar(var)
# check the environment varible is set properly?
while (checker != value):
checker = self.get_EnvVar(var)
else:
raise RuntimeError("CANoe is not open,unable to SetVariable")
def get_SysVar(self, ns_name, sysvar_name):
if (self.application != None):
namespace = self.application.System.Namespaces(ns_name)
# tokens[0]是py_CANoe,即namespace的名字
var_value = namespace.Variables(sysvar_name).Value
# systemCAN = self.application.System.Namespaces
# print(systemCAN)
# sys_namespace = systemCAN(ns_name)
# sys_value = sys_namespace.Variables(sysvar_name)
return var_value
else:
raise RuntimeError("CANoe is not open,unable to GetVariable")
def set_SysVar(self, ns_name, sysvar_name, var):
if (self.application != None):
systemCAN = self.application.System.Namespaces(ns_name)
# sys_namespace = systemCAN.Variables(sysvar_name).Value
# sys_value = sys_namespace.Variables(sysvar_name)
systemCAN.Variables(sysvar_name).Value = var
else:
raise RuntimeError("CANoe is not open,unable to GetVariable")
def DoEvents(self):
pythoncom.PumpWaitingMessages()
time.sleep(1)
app = CANoe() # 定义CANoe为app
app.open_cfg(r"../Common_if_testapp/CANoecfg/Common_if_testapp.cfg") # 导入某个CANoe congif
time.sleep(5)
app.start_Measurement()
# 信号测试
time.sleep(15)
#获取can 路上的信号值
while not msvcrt.kbhit():
EngineSpeedDspMeter=app.get_SigVal(5,"IPD_20ms_PDU04", "IRC_IPDPriyChannleFltSts","CAN")
print(EngineSpeedDspMeter)
if (EngineSpeedDspMeter == 2):
# app.set_SigVal(1,"EngineState", "EngineSpeed","CAN",3.0)
print(EngineSpeedDspMeter)
break
app.DoEvents()
#获取系统变量的值以及更改系统变量的值
while not msvcrt.kbhit():
EngineSpeedDspMeter = app.get_SysVar("seil_bianliang", "zj")
print(EngineSpeedDspMeter)
if (EngineSpeedDspMeter == 1):
print("正确")
app.set_SysVar("seil_bianliang", "zj",5)
# app.set_SysVar("Engine", "EngineSpeedDspMeter", 3.0)
break
app.DoEvents()
#更改系统变量的值
while not msvcrt.kbhit():
EngineSpeedDspMeter = app.get_SysVar("seil_bianliang", "zj")
print(EngineSpeedDspMeter)
if (EngineSpeedDspMeter == 5):
print("正确等于5")
# app.set_SysVar("seil_bianliang", "zj",5)
# app.set_SysVar("Engine", "EngineSpeedDspMeter", 3.0)
break
app.DoEvents()
time.sleep(5)
app.stop_Measurement()
app.close_cfg()
3、运行xml形式的testcase python进行调用
4、代码如下 Python_Canoe_Test_module_configuration.py
import time, os, msvcrt
from win32com.client import *
from win32com.client.connect import *
def DoEvents():
pythoncom.PumpWaitingMessages()
time.sleep(.1)
def DoEventsUntil(cond):
while not cond():
DoEvents()
class CanoeSync(object):
"""Wrapper class for CANoe Application object"""
Started = False
Stopped = False
ConfigPath = ""
def __init__(self):
app = DispatchEx('CANoe.Application')
app.Configuration.Modified = False
ver = app.Version
print('Loaded CANoe version ',
ver.major, '.',
ver.minor, '.',
ver.Build, '...', sep='')
self.App = app
self.Measurement = app.Measurement
self.Running = lambda: self.Measurement.Running
self.WaitForStart = lambda: DoEventsUntil(lambda: CanoeSync.Started)
self.WaitForStop = lambda: DoEventsUntil(lambda: CanoeSync.Stopped)
WithEvents(self.App.Measurement, CanoeMeasurementEvents)
def Start(self):
if not self.Running():
self.Measurement.Start()
self.WaitForStart()
def Stop(self):
if self.Running():
self.Measurement.Stop()
self.WaitForStop()
def Load(self, cfgPath):
# current dir must point to the script file
cfg = os.path.join(os.curdir, cfgPath)
cfg = os.path.abspath(cfg)
print('Opening: ', cfg)
self.ConfigPath = os.path.dirname(cfg)
self.Configuration = self.App.Configuration
self.App.Open(cfg)
def LoadTestSetup(self, testsetup):
self.TestSetup = self.App.Configuration.TestSetup
print(self.ConfigPath)
path = os.path.join(self.ConfigPath, testsetup)
print("传入的tse路径:", path)
# 如果目标 tse 已存在,直接读取,否则添加它,如果已经存在,直接add的话会报错
tse_count = self.TestSetup.TestEnvironments.Count
print("add前tse数量:",tse_count)
_existTse = False
for _index_tse in range(1, tse_count + 1):
if self.TestSetup.TestEnvironments.Item(_index_tse).FullName == path:
testenv = self.TestSetup.TestEnvironments.Item(_index_tse)
_existTse = True
break
if _existTse == False:
testenv = self.TestSetup.TestEnvironments.Add(path)
print("add后tse数量:", self.TestSetup.TestEnvironments.Count)
testenv = CastTo(testenv, "ITestEnvironment2")
# TestModules property to access the test modules
self.TestModules = []
self.TraverseTestItem(testenv, lambda tm: self.TestModules.append(CanoeTestModule(tm)))
def TraverseTestItem(self, parent, testf):
for test in parent.TestModules:
testf(test)
for folder in parent.Folders:
found = self.TraverseTestItem(folder, testf)
def RunTestModules(self):
""" starts all test modules and waits for all of them to finish"""
# start all test modules
for tm in self.TestModules:
tm.Start()
# wait for test modules to stop
while not all([not tm.Enabled or tm.IsDone() for tm in self.TestModules]):
DoEvents()
class CanoeTestModule:
"""Wrapper class for CANoe TestModule object"""
def __init__(self, tm):
self.tm = tm
self.Events = DispatchWithEvents(tm, CanoeTestEvents)
self.Name = tm.Name
self.IsDone = lambda: self.Events.stopped
self.Enabled = tm.Enabled
def Start(self):
if self.tm.Enabled:
self.tm.Start()
self.Events.WaitForStart()
class CanoeTestEvents:
"""Utility class to handle the test events"""
def __init__(self):
self.started = False
self.stopped = False
self.WaitForStart = lambda: DoEventsUntil(lambda: self.started)
self.WaitForStop = lambda: DoEventsUntil(lambda: self.stopped)
def OnStart(self):
self.started = True
self.stopped = False
print("<", self.Name, " started >")
def OnStop(self, reason):
self.started = False
self.stopped = True
print("<", self.Name, " stopped >")
class CanoeMeasurementEvents(object):
"""Handler for CANoe measurement events"""
def OnStart(self):
CanoeSync.Started = True
CanoeSync.Stopped = False
print("< measurement started >")
def OnStop(self):
CanoeSync.Started = False
CanoeSync.Stopped = True
print("< measurement stopped >")
def main():
Tester = CanoeSync()
Tester.Load(r'../Pressure_measurement_collection/Pressure_measurement_collection/CANoecfg/Pressure_measurement_collection.cfg')
Tester.LoadTestSetup("Pressure_measurement_collection.tse")
Tester.Start()
Tester.RunTestModules()
Tester.Stop()
if __name__ == "__main__":
main()
文章借鉴:Python调用CANoe(1)(启动与停止,变量和信号读写)-CSDN博客
作者:习惯有梅自傲举