编写高效的信号发生器自动化测试脚本需要兼顾代码性能、可维护性和测试覆盖率,同时适应不同硬件接口和测试场景。以下是分步骤的详细指南,涵盖脚本设计、优化技巧和最佳实践:
python
class
SignalGeneratorDriver:
def
__init__(self, interface='LAN', ip='192.168.1.100'
):
self.conn =
self._connect(interface, ip)
def
_connect(self, interface, ip):
if
interface ==
'LAN':
return
socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 其他接口实现...
def
set_frequency(self, freq):
self.conn.send(f"FREQ
{freq}Hzn".encode())
response =
self.conn.recv(1024)
if
"ERROR"
in
response:
raise
RuntimeError(f"Set frequency failed:
{response}")
python
class
FrequencySweepTest:
def
__init__(self, driver):
self.driver = driver
def
run(self, start, stop, step):
for
freq
in
range(start, stop, step):
self.driver.set_frequency(freq)
# 验证信号输出...
yaml# test_config.yamltest_cases:- name: "Frequency Sweep"params:start: 1kHzstop: 10MHzstep: 100kHz
argparse
或
PyYAML
动态加载配置:
pythonimport yamlwith open("test_config.yaml") as f:config = yaml.safe_load(f)
concurrent.futures
。
python
from
concurrent.futures
import
ThreadPoolExecutor
def
test_device(driver):
# 单台设备测试逻辑...
with
ThreadPoolExecutor(max_workers=4)
as
executor:
drivers = [SignalGeneratorDriver(ip=f"192.168.1.{i}")
for
i
in
range(1,
5)]
executor.map(test_device, drivers)
asyncio
。
python
import
asyncio
async
def
set_freq_async(driver, freq):
await
driver.conn.send(f"FREQ
{freq}Hzn".encode())
response =
await
driver.conn.recv(1024)
# 处理响应...
FREQ 1MHz; AMPL 1Vpp
),减少通信次数。
python
class
CachedDriver(SignalGeneratorDriver):
def
__init__(self):
super().__init__()
self._cache = {"freq":
None,
"ampl":
None}
def
set_frequency(self, freq):
if
self._cache["freq"] != freq:
super().set_frequency(freq)
self._cache["freq"] = freq
python
import
numpy
as
np
def
generate_sine_wave(freq, samples):
t = np.linspace(0,
1, samples)
return
np.sin(2
* np.pi * freq * t)# 向量化生成波形
python# 二进制协议示例(伪代码)def set_freq_binary(driver, freq):freq_bytes = freq.to_bytes(4, byteorder='big')driver.conn.send(b'x01' + freq_bytes) # 命令码0x01表示设置频率
python
import
time
import
random
def
send_command_with_retry(driver, cmd, max_retries=3
):
for
attempt
in
range(max_retries):
try:
driver.conn.send(cmd)
return
driver.conn.recv(1024)
except
(ConnectionError, TimeoutError)
as
e:
wait_time =
2
** attempt + random.uniform(0,
1)
time.sleep(wait_time)
raise
RuntimeError("Command failed after retries")
*TRG
命令)确保测试与信号生成同步。
pythondef verify_signal(actual, expected, tolerance=0.01):return abs(actual - expected) <= tolerance * expected
logging
模块记录不同级别日志(DEBUG/INFO/ERROR),便于问题定位。
python
import
logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
handlers=[
logging.FileHandler("test.log"),
logging.StreamHandler()
]
)
python
import
socket
import
logging
from
concurrent.futures
import
ThreadPoolExecutor
# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class
SignalGeneratorDriver:
def
__init__(self, ip):
self.ip = ip
self.conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.conn.connect((ip,
5025))# 默认SCPI端口
def
set_frequency(self, freq):
cmd =
f"FREQ
{freq}Hzn".encode()
self.conn.send(cmd)
response =
self.conn.recv(1024).decode()
if
"ERROR"
in
response:
logger.error(f"Set freq failed on
{self.ip}:
{response}")
raise
RuntimeError(response)
logger.info(f"Set freq
{freq}Hz on
{self.ip}")
def
run_test(driver, freq):
try:
driver.set_frequency(freq)
# 其他测试逻辑...
except
Exception
as
e:
logger.error(f"Test failed for
{driver.ip}:
{str(e)}")
if
__name__ ==
"__main__":
device_ips = ["192.168.1.101",
"192.168.1.102"]
frequencies = [1e6,
2e6,
5e6]# 1MHz, 2MHz, 5MHz
with
ThreadPoolExecutor(max_workers=len(device_ips))
as
executor:
for
ip
in
device_ips:
driver = SignalGeneratorDriver(ip)
for
freq
in
frequencies:
executor.submit(run_test, driver, freq)
| 优化方向 | 具体方法 |
|---|---|
| 代码结构 | 分层架构(驱动/逻辑/报告)、参数化配置 |
| 性能 | 异步I/O、批量操作、NumPy向量化计算 |
| 通信 | 二进制协议、压缩数据、错误重试 |
| 数据验证 | 动态阈值检查、统计验证 |
| 调试 | 分级日志、远程调试 |
通过以上方法,可显著提升脚本的执行效率、可维护性和测试覆盖率,适应从实验室研发到工业产线的多样化需求。