编写复杂的信号发生器脚本需要结合高级控制逻辑、多线程/多进程管理、硬件协同以及自动化测试框架。以下从架构设计、关键功能实现和优化策略三个方面展开,并提供Python和C++的代码示例。
python
# 示例:分层架构(Python)
class SignalGeneratorController:
    """High-level control: task scheduling and error handling.

    A task is any object exposing ``execute(device)``. Tasks run in the
    order they were added. A task that raises is recorded in
    ``self.failures`` and the tasks queued after it still run.
    """

    def __init__(self, device):
        self.device = device  # low-level hardware interface passed to every task
        self.tasks = []  # task queue, executed in insertion order
        self.failures = []  # (task, exception) pairs collected by run()

    def add_task(self, task):
        """Append *task* to the end of the queue."""
        self.tasks.append(task)

    def run(self):
        """Execute every queued task against ``self.device``.

        An exception raised by one task is handed to ``_handle_failure``
        and does not prevent the remaining tasks from running.
        """
        for task in self.tasks:
            try:
                task.execute(self.device)
            except Exception as exc:  # boundary: keep tasks isolated from each other
                self._handle_failure(task, exc)

    def _handle_failure(self, task, exc):
        """Record a failed task and report it on stdout.

        ``run()`` depends on this hook; without it the first failing task
        would surface as an ``AttributeError`` and hide the real error.
        """
        self.failures.append((task, exc))
        print(f"任务 {task!r} 执行失败: {exc}")
class SweepTask:
    """Mid-level task: frequency sweep.

    Steps the device frequency from ``start`` towards ``stop`` in
    increments of ``step`` and waits ``dwell_time`` seconds after each
    setting. The points come from ``np.arange``, so ``stop`` itself is
    not included.
    """

    def __init__(self, start, stop, step, dwell_time):
        # execute() reads these attributes, so they must be set explicitly.
        self.start = start
        self.stop = stop
        self.step = step
        self.dwell_time = dwell_time
        # Mapping view of the same values. Built by hand because locals()
        # would also capture `self`.
        self.params = {
            "start": start,
            "stop": stop,
            "step": step,
            "dwell_time": dwell_time,
        }

    def execute(self, device):
        """Run the sweep on *device* (needs ``set_frequency(freq)``)."""
        for freq in np.arange(self.start, self.stop, self.step):
            device.set_frequency(freq)
            time.sleep(self.dwell_time)
class ModulationTask:
    """Mid-level task: modulated signal generation.

    Forwards ``mod_type``, ``depth`` and ``rate`` as keyword arguments to
    ``device.configure_modulation``.
    """

    def __init__(self, mod_type, depth, rate):
        # Built explicitly: locals() would also contain `self`, which would
        # then be passed to configure_modulation() as a stray `self=` keyword.
        self.params = {
            "mod_type": mod_type,
            "depth": depth,
            "rate": rate,
        }

    def execute(self, device):
        """Apply the stored modulation settings to *device*."""
        device.configure_modulation(**self.params)
python
from enum import Enum
class GeneratorState(Enum):
    """Operating states of a signal generator."""

    IDLE = 0
    CONFIGURING = 1
    RUNNING = 2
    ERROR = 3
class StatefulGenerator:
    """State holder that only accepts whitelisted state changes.

    Starts in ``GeneratorState.IDLE``. Allowed moves are listed in
    ``_is_valid_transition``.
    """

    def __init__(self):
        self.state = GeneratorState.IDLE

    def transition(self, new_state):
        """Switch to *new_state*.

        Raises:
            RuntimeError: if the move from the current state to
                *new_state* is not allowed. The state is left unchanged.
        """
        if not self._is_valid_transition(new_state):
            raise RuntimeError(f"非法状态转移: {self.state} -> {new_state}")
        self.state = new_state

    def _is_valid_transition(self, new_state):
        """Return True if *new_state* is reachable from the current state."""
        # State-transition rules: current state -> states it may move to.
        transitions = {
            GeneratorState.IDLE: [GeneratorState.CONFIGURING],
            GeneratorState.CONFIGURING: [GeneratorState.RUNNING, GeneratorState.ERROR],
            GeneratorState.RUNNING: [GeneratorState.IDLE, GeneratorState.ERROR],
            GeneratorState.ERROR: [GeneratorState.IDLE],
        }
        return new_state in transitions[self.state]
python
import threading
class MultiChannelGenerator:
    """Apply one frequency and per-channel amplitudes to several devices."""

    def __init__(self, device_map):
        """device_map: {channel name: device instance}"""
        self.channels = device_map
        self.lock = threading.Lock()  # serialises concurrent sync_channels() calls

    def sync_channels(self, freq, amp_map):
        """Set the same frequency and a per-channel amplitude on every channel.

        Channels are configured one after another while ``self.lock`` is
        held. A channel that fails, including one with no entry in
        *amp_map*, is reported on stdout and its amplitude is set to 0.
        The remaining channels are still configured.
        """
        with self.lock:
            for ch_name, device in self.channels.items():
                try:
                    device.set_frequency(freq)
                    device.set_amplitude(amp_map[ch_name])
                except Exception as e:
                    print(f"通道 {ch_name} 配置失败: {e}")
                    self._fall_back_to_safe_state(ch_name, device)

    @staticmethod
    def _fall_back_to_safe_state(ch_name, device):
        """Best-effort: drop the channel amplitude to 0 after a failure."""
        try:
            device.set_amplitude(0)
        except Exception as e:
            # A device that already failed may reject the fallback as well.
            # Report it and move on so the other channels are not skipped.
            print(f"通道 {ch_name} 回退失败: {e}")
# Usage example: one shared frequency, a separate amplitude per channel.
devices = dict(CH1=SigGen1(), CH2=SigGen2())
generator = MultiChannelGenerator(devices)
generator.sync_channels(
    1e6,
    {"CH1": -10, "CH2": -5},
)
python
class DynamicSignalGenerator:
    """Closed-loop amplitude control driven by a reference signal."""

    def __init__(self, device, pid=None):
        """Bind the device and its controller.

        Args:
            device: object providing ``get_current_output()``,
                ``get_amplitude()`` and ``set_amplitude(value)``.
            pid: controller providing ``compute(error)``. When omitted, a
                ``PIDController(kp=0.1, ki=0.01, kd=0.05)`` is created.
        """
        self.device = device
        if pid is None:
            pid = PIDController(kp=0.1, ki=0.01, kd=0.05)
        self.pid = pid

    def track_reference(self, reference_signal, period=0.01):
        """Adjust the output amplitude to follow *reference_signal*.

        For every reference value, the error against the current output is
        fed to the controller and the correction is added to the present
        amplitude.

        Args:
            reference_signal: iterable of target output values.
            period: control-loop period in seconds, slept after each update.
        """
        for ref_value in reference_signal:
            error = ref_value - self.device.get_current_output()
            correction = self.pid.compute(error)
            new_amp = self.device.get_amplitude() + correction
            self.device.set_amplitude(new_amp)
            time.sleep(period)  # control-loop period
python
import json
class TestSequenceExecutor:
    """Run a JSON-described test sequence against a signal generator."""

    # The name starts with "Test" but this is not a test case; stop pytest
    # from trying to collect it.
    __test__ = False

    def __init__(self, device):
        self.device = device
        # Empty until load_sequence() is called, so run_sequence() on a fresh
        # executor is a no-op instead of an AttributeError.
        self.sequence = []

    def load_sequence(self, file_path):
        """Load the list of steps from the JSON file at *file_path*."""
        with open(file_path, encoding="utf-8") as f:
            self.sequence = json.load(f)

    def run_sequence(self):
        """Execute the loaded steps in order.

        Each step is a mapping with an ``"action"`` key. ``"set_freq"``
        reads ``"value"``; ``"sweep"`` reads ``"start"``, ``"stop"`` and
        ``"step"``.
        """
        for step in self.sequence:
            action = step["action"]
            if action == "set_freq":
                self.device.set_frequency(step["value"])
            elif action == "sweep":
                self._execute_sweep(step["start"], step["stop"], step["step"])
            # Other actions are not implemented yet and are skipped.

    def _execute_sweep(self, start, stop, step):
        """Set each frequency in ``np.arange(start, stop, step)`` and verify it."""
        for freq in np.arange(start, stop, step):
            self.device.set_frequency(freq)
            self._verify_output(freq)

    def _verify_output(self, expected_freq, tolerance=1e3):
        """Check the measured output against *expected_freq*.

        Args:
            expected_freq: the frequency that was just set.
            tolerance: largest accepted absolute deviation (default 1 kHz).

        Raises:
            RuntimeError: if the deviation exceeds *tolerance*.
        """
        measured = self.device.measure_output()
        if abs(measured - expected_freq) > tolerance:
            raise RuntimeError(f"频率校验失败! 期望: {expected_freq}, 实际: {measured}")
python
import asyncio
class AsyncSignalGenerator:
    """Signal generator stub with a coroutine-based frequency setter."""

    async def async_set_frequency(self, freq, delay=0.1):
        """Pretend to send *freq* to a device, then report it on stdout.

        Args:
            freq: frequency value to report.
            delay: seconds to sleep in place of real device I/O.
        """
        # Simulated asynchronous device communication; stands in for real I/O.
        await asyncio.sleep(delay)
        print(f"频率设置为: {freq} Hz")
async def run_complex_sequence():
    """Issue three frequency updates concurrently and wait for all of them."""
    sg = AsyncSignalGenerator()
    tasks = [
        sg.async_set_frequency(1e6),
        sg.async_set_frequency(2e6),
        sg.async_set_frequency(3e6),
    ]
    # gather() runs the coroutines concurrently on one event loop.
    await asyncio.gather(*tasks)


asyncio.run(run_complex_sequence())
cpp
// 高性能波形生成 (C++)
#include <cmath>
#include <cstddef>
// Fills buffer[0 .. length) with a sine at `freq` plus a 0.3-amplitude sine
// at three times `freq`, sampled at `sample_rate`.
//
// Preconditions: `buffer` points to at least `length` doubles and
// `sample_rate` is non-zero.
//
// Declared with C linkage so the symbol keeps its plain name and can be
// looked up from a shared library (for example through Python's ctypes).
extern "C" void generate_complex_waveform(double* buffer,
                                          std::size_t length,
                                          double freq,
                                          double sample_rate)
{
    // Spelled out because M_PI is not part of standard C++.
    const double two_pi = 6.283185307179586476925286766559;

    for (std::size_t i = 0; i < length; ++i) {
        const double t = static_cast<double>(i) / sample_rate;
        // Fundamental plus third-harmonic term.
        buffer[i] = std::sin(two_pi * freq * t)
                  + 0.3 * std::sin(two_pi * 3 * freq * t);
    }
}
// Python调用示例 (通过ctypes)
python
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
class RealTimePlotter:
    """Live plot of measured power against frequency for one device."""

    def __init__(self, device):
        """Create the figure and an empty line for *device*.

        *device* must provide ``get_frequency()`` and ``measure_power()``.
        """
        self.device = device
        self.fig, self.ax = plt.subplots()
        self.x_data, self.y_data = [], []
        # plot() returns a list of lines; unpack the single one.
        (self.line,) = self.ax.plot([], [])

    def update(self, frame):
        """Animation callback: sample the device and extend the plotted line.

        *frame* is accepted for the callback signature and not used.
        Returns a one-element tuple containing the updated line.
        """
        freq = self.device.get_frequency()
        power = self.device.measure_power()
        self.x_data.append(freq)
        self.y_data.append(power)
        self.line.set_data(self.x_data, self.y_data)
        # Recompute the axis limits so the new point stays visible.
        self.ax.relim()
        self.ax.autoscale_view()
        return (self.line,)
# Usage example: redraw every 100 ms. Keep the animation bound to a name so
# it stays alive while the window is open.
plotter = RealTimePlotter(device)
ani = FuncAnimation(
    fig=plotter.fig,
    func=plotter.update,
    interval=100,
)
plt.show()
硬件模拟器:
class MockSignalGenerator:
    """Hardware-free stand-in for a signal generator."""

    def __init__(self):
        self._frequency = None  # last value passed to set_frequency()

    def set_frequency(self, freq):
        """Log *freq*, wait 10 ms to imitate device latency, and remember it."""
        print(f"[MOCK] 设置频率: {freq}")
        time.sleep(0.01)  # simulated latency
        self._frequency = freq

    def get_frequency(self):
        """Return the last frequency set, or None if none was set yet."""
        return self._frequency
参数边界检查:
def validate_params(freq, amp):
    """Reject a frequency or amplitude outside the supported range.

    Accepts 1e3 <= freq <= 10e9 and -100 <= amp <= 20; the error messages
    name these limits as 1 kHz-10 GHz and -100 dBm to +20 dBm.

    Raises:
        AssertionError: if either value is out of range. Raised explicitly
            instead of via ``assert`` so the check still runs under
            ``python -O``, while callers catching AssertionError keep working.
    """
    if not 1e3 <= freq <= 10e9:
        raise AssertionError("频率超出范围 (1kHz-10GHz)")
    if not -100 <= amp <= 20:
        raise AssertionError("幅度超出范围 (-100dBm-+20dBm)")
自动化回归测试:
python
import unittest
class TestSignalGenerator(unittest.TestCase):
    """Regression tests run against the mock signal generator."""

    def setUp(self):
        # A fresh mock per test keeps the tests independent of each other.
        self.sg = MockSignalGenerator()

    def test_frequency_sweep(self):
        """A frequency written to the generator can be read back."""
        self.sg.set_frequency(1e6)
        self.assertEqual(self.sg.get_frequency(), 1e6)
python
import time
from collections import defaultdict

import numpy as np
class AdvancedSignalSystem:
    """Multi-channel waveform loader with a per-channel operation log."""

    def __init__(self, device_map):
        self.channels = device_map  # {channel name: device instance}
        self.history = defaultdict(list)  # operation log, keyed by channel name

    def dynamic_modulation(self, modulation_type, params):
        """Generate a modulated waveform and load it onto every channel.

        The waveform is built once, split into 10 chunks, and each chunk is
        sent to each device with ``load_waveform``. Every load is recorded
        in ``self.history``.

        Args:
            modulation_type: ``"AM"`` or ``"FM"``.
            params: keyword arguments for the matching ``_generate_*`` method.

        Raises:
            ValueError: if *modulation_type* is not supported. Raised before
                any device is touched.
            Exception: whatever a device raises while loading. That channel
                is shut down first, then the error is re-raised.
        """
        generators = {
            "AM": self._generate_am,
            "FM": self._generate_fm,
        }
        try:
            generate = generators[modulation_type]
        except KeyError:
            raise ValueError(f"不支持的调制类型: {modulation_type!r}") from None

        # The waveform does not depend on the channel, so build it once.
        waveform = generate(**params)
        chunks = np.array_split(waveform, 10)

        for ch_name, device in self.channels.items():
            try:
                # Send the waveform piece by piece.
                for chunk in chunks:
                    device.load_waveform(chunk)
                    self.history[ch_name].append({
                        "time": time.time(),
                        "action": "load_waveform",
                        "points": len(chunk),
                    })
            except Exception:
                self._emergency_shutdown(ch_name)
                raise

    def _generate_am(self, carrier_freq, mod_freq, depth,
                     duration=1.0, num_points=1000):
        """Return ``sin(2*pi*fc*t) * (1 + depth*sin(2*pi*fm*t))``.

        ``t`` is ``num_points`` samples spread evenly over [0, duration].
        NOTE: the defaults give roughly 1000 samples per second, so a
        carrier far above a few hundred Hz is undersampled.
        """
        t = np.linspace(0, duration, num_points)
        carrier = np.sin(2 * np.pi * carrier_freq * t)
        modulation = 1 + depth * np.sin(2 * np.pi * mod_freq * t)
        return carrier * modulation

    def _generate_fm(self, carrier_freq, mod_freq, deviation,
                     duration=1.0, num_points=1000):
        """Return a sine carrier frequency-modulated by a sine tone.

        Uses the same time base as ``_generate_am``. The phase is
        ``2*pi*fc*t + (deviation / mod_freq) * sin(2*pi*fm*t)``.

        Raises:
            ValueError: if *mod_freq* is zero (modulation index undefined).
        """
        if mod_freq == 0:
            raise ValueError("mod_freq must be non-zero")
        t = np.linspace(0, duration, num_points)
        index = deviation / mod_freq
        phase = 2 * np.pi * carrier_freq * t + index * np.sin(2 * np.pi * mod_freq * t)
        return np.sin(phase)

    def _emergency_shutdown(self, channel):
        """Safely close a channel after a fault by setting its amplitude to 0."""
        self.channels[channel].set_amplitude(0)
        print(f"紧急关闭通道: {channel}")
# Usage example: load an AM waveform onto two channels.
devices = dict(TX1=SigGen1(), TX2=SigGen2())
system = AdvancedSignalSystem(devices)
system.dynamic_modulation(
    modulation_type="AM",
    params=dict(carrier_freq=10e6, mod_freq=1e3, depth=0.5),
)
实时性:为时序关键的线程或进程设置实时调度优先级(例如 Linux 的 `SCHED_FIFO` 或 Windows 的 `Real-time Priority`)。
可扩展性:通过统一的抽象设备接口接入不同的硬件(例如定义 `IDevice` 接口)。
通过以上方法,可以构建出既能处理复杂信号生成任务,又具备高度可靠性的自动化控制系统。