编写复杂的信号发生器脚本需要结合高级控制逻辑、多线程/多进程管理、硬件协同以及自动化测试框架。以下从架构设计、关键功能实现和优化策略三个方面展开,并提供Python和C++的代码示例。
一、复杂脚本的核心架构
1. 分层设计
| # 示例:分层架构(Python) |
| class SignalGeneratorController: |
| """高层控制:任务调度与错误处理""" |
| def __init__(self, device): |
| self.device = device |
| self.tasks = [] |
|
| def add_task(self, task): |
| self.tasks.append(task) |
|
| def run(self): |
| for task in self.tasks: |
| try: |
| task.execute(self.device) |
| except Exception as e: |
| self._handle_failure(task, e) |
|
| class SweepTask: |
| """中层任务:频率扫描""" |
| def __init__(self, start, stop, step, dwell_time): |
| self.params = locals() |
|
| def execute(self, device): |
| for freq in np.arange(self.start, self.stop, self.step): |
| device.set_frequency(freq) |
| time.sleep(self.dwell_time) |
|
| class ModulationTask: |
| """中层任务:调制信号生成""" |
| def __init__(self, mod_type, depth, rate): |
| self.params = locals() |
|
| def execute(self, device): |
| device.configure_modulation(**self.params) |
2. 状态机管理复杂流程
| from enum import Enum |
|
| class GeneratorState(Enum): |
| IDLE = 0 |
| CONFIGURING = 1 |
| RUNNING = 2 |
| ERROR = 3 |
|
| class StatefulGenerator: |
| def __init__(self): |
| self.state = GeneratorState.IDLE |
|
| def transition(self, new_state): |
| if self._is_valid_transition(new_state): |
| self.state = new_state |
| else: |
| raise RuntimeError(f"非法状态转移: {self.state} -> {new_state}") |
|
| def _is_valid_transition(self, new_state): |
| # 定义状态转移规则 |
| transitions = { |
| GeneratorState.IDLE: [GeneratorState.CONFIGURING], |
| GeneratorState.CONFIGURING: [GeneratorState.RUNNING, GeneratorState.ERROR], |
| GeneratorState.RUNNING: [GeneratorState.IDLE, GeneratorState.ERROR], |
| GeneratorState.ERROR: [GeneratorState.IDLE] |
| } |
| return new_state in transitions[self.state] |
二、关键功能实现
1. 多通道协同控制
| import threading |
|
| class MultiChannelGenerator: |
| def __init__(self, device_map): |
| """device_map: {通道名: 设备实例}""" |
| self.channels = device_map |
| self.lock = threading.Lock() |
|
| def sync_channels(self, freq, amp_map): |
| """同步设置多通道参数""" |
| with self.lock: |
| for ch_name, device in self.channels.items(): |
| try: |
| device.set_frequency(freq) |
| device.set_amplitude(amp_map[ch_name]) |
| except Exception as e: |
| print(f"通道 {ch_name} 配置失败: {e}") |
| # 可选:回退到安全状态 |
| device.set_amplitude(0) |
|
| # 使用示例 |
| devices = {"CH1": SigGen1(), "CH2": SigGen2()} |
| generator = MultiChannelGenerator(devices) |
| generator.sync_channels(1e6, {"CH1": -10, "CH2": -5}) |
2. 动态参数调整(如PID控制)
| class DynamicSignalGenerator: |
| def __init__(self, device): |
| self.device = device |
| self.pid = PIDController(kp=0.1, ki=0.01, kd=0.05) |
|
| def track_reference(self, reference_signal): |
| """根据参考信号动态调整输出""" |
| for ref_value in reference_signal: |
| error = ref_value - self.device.get_current_output() |
| correction = self.pid.compute(error) |
| new_amp = self.device.get_amplitude() + correction |
| self.device.set_amplitude(new_amp) |
| time.sleep(0.01) |
3. 自动化测试序列
| import json |
|
| class TestSequenceExecutor: |
| def __init__(self, device): |
| self.device = device |
|
| def load_sequence(self, file_path): |
| with open(file_path) as f: |
| self.sequence = json.load(f) |
|
| def run_sequence(self): |
| for step in self.sequence: |
| if step["action"] == "set_freq": |
| self.device.set_frequency(step["value"]) |
| elif step["action"] == "sweep": |
| self._execute_sweep(step["start"], step["stop"], step["step"]) |
| # 其他操作... |
|
| def _execute_sweep(self, start, stop, step): |
| for freq in np.arange(start, stop, step): |
| self.device.set_frequency(freq) |
| self._verify_output(freq) |
|
| def _verify_output(self, expected_freq): |
| measured = self.device.measure_output() |
| if abs(measured - expected_freq) > 1e3: |
| raise RuntimeError(f"频率校验失败! 期望: {expected_freq}, 实际: {measured}") |
三、性能优化与高级技巧
1. 异步I/O与事件驱动
| import asyncio |
|
| class AsyncSignalGenerator: |
| async def async_set_frequency(self, freq): |
| # 模拟异步设备通信 |
| await asyncio.sleep(0.1) |
| print(f"频率设置为: {freq} Hz") |
|
| async def run_complex_sequence(): |
| sg = AsyncSignalGenerator() |
| tasks = [ |
| sg.async_set_frequency(1e6), |
| sg.async_set_frequency(2e6), |
| sg.async_set_frequency(3e6) |
| ] |
| await asyncio.gather(*tasks) |
|
| asyncio.run(run_complex_sequence()) |
2. 硬件加速(C++扩展)
| // 高性能波形生成 (C++) |
| #include <vector> |
| #include <cmath> |
|
| void generate_complex_waveform(double* buffer, size_t length, double freq, double sample_rate) { |
| const double two_pi = 2.0 * M_PI; |
| for (size_t i = 0; i < length; ++i) { |
| double t = i / sample_rate; |
| // 组合正弦波 + 调制信号 |
| buffer[i] = sin(two_pi * freq * t) + 0.3 * sin(two_pi * 3 * freq * t); |
| } |
| } |
|
| // Python调用示例 (通过ctypes) |
| /* |
| from ctypes import CDLL, POINTER, c_double, c_size_t |
|
| lib = CDLL("./waveform_generator.so") |
| lib.generate_complex_waveform.argtypes = [POINTER(c_double), c_size_t, c_double, c_double] |
|
| buffer = (c_double * 1024)() |
| lib.generate_complex_waveform(buffer, 1024, 1e6, 10e6) |
| */ |
3. 实时数据可视化
| import matplotlib.pyplot as plt |
| from matplotlib.animation import FuncAnimation |
|
| class RealTimePlotter: |
| def __init__(self, device): |
| self.device = device |
| self.fig, self.ax = plt.subplots() |
| self.x_data, self.y_data = [], [] |
| self.line, = self.ax.plot([], []) |
|
| def update(self, frame): |
| freq = self.device.get_frequency() |
| power = self.device.measure_power() |
| self.x_data.append(freq) |
| self.y_data.append(power) |
| self.line.set_data(self.x_data, self.y_data) |
| self.ax.relim() |
| self.ax.autoscale_view() |
| return self.line, |
|
| # 使用示例 |
| plotter = RealTimePlotter(device) |
| ani = FuncAnimation(plotter.fig, plotter.update, interval=100) |
| plt.show() |
四、调试与验证策略
硬件模拟器:
pythonclass MockSignalGenerator:def set_frequency(self, freq):print(f"[MOCK] 设置频率: {freq}")time.sleep(0.01) # 模拟延迟
参数边界检查:
pythondef validate_params(freq, amp):assert 1e3 <= freq <= 10e9, "频率超出范围 (1kHz-10GHz)"assert -100 <= amp <= 20, "幅度超出范围 (-100dBm-+20dBm)"
自动化回归测试:
| import unittest |
|
| class TestSignalGenerator(unittest.TestCase): |
| def setUp(self): |
| self.sg = MockSignalGenerator() |
|
| def test_frequency_sweep(self): |
| self.sg.set_frequency(1e6) |
| self.assertEqual(self.sg.get_frequency(), 1e6) |
五、完整示例:多通道动态调制系统
| import numpy as np |
| import time |
| from collections import defaultdict |
|
| class AdvancedSignalSystem: |
| def __init__(self, device_map): |
| self.channels = device_map |
| self.history = defaultdict(list) |
|
| def dynamic_modulation(self, modulation_type, params): |
| """多通道动态调制""" |
| for ch_name, device in self.channels.items(): |
| try: |
| # 根据调制类型生成波形 |
| if modulation_type == "AM": |
| waveform = self._generate_am(**params) |
| elif modulation_type == "FM": |
| waveform = self._generate_fm(**params) |
|
| # 实时下发波形 |
| for chunk in np.array_split(waveform, 10): |
| device.load_waveform(chunk) |
| self.history[ch_name].append({ |
| "time": time.time(), |
| "action": "load_waveform", |
| "points": len(chunk) |
| }) |
| except Exception as e: |
| self._emergency_shutdown(ch_name) |
| raise |
|
| def _generate_am(self, carrier_freq, mod_freq, depth): |
| t = np.linspace(0, 1, 1000) |
| carrier = np.sin(2 * np.pi * carrier_freq * t) |
| modulation = 1 + depth * np.sin(2 * np.pi * mod_freq * t) |
| return carrier * modulation |
|
| def _emergency_shutdown(self, channel): |
| """故障时安全关闭通道""" |
| self.channels[channel].set_amplitude(0) |
| print(f"紧急关闭通道: {channel}") |
|
| # 使用示例 |
| devices = {"TX1": SigGen1(), "TX2": SigGen2()} |
| system = AdvancedSignalSystem(devices) |
| system.dynamic_modulation( |
| modulation_type="AM", |
| params={"carrier_freq": 10e6, "mod_freq": 1e3, "depth": 0.5} |
| ) |
六、关键优化点总结
- 模块化设计:
- 将不同功能(如波形生成、设备控制、日志)解耦为独立模块。
- 实时性保障:
- 对关键操作使用实时线程优先级(Linux的
SCHED_FIFO或Windows的Real-time Priority)。
- 资源管理:
- 使用RAII模式(C++)或上下文管理器(Python)确保硬件资源释放。
- 可扩展性:
- 通过插件式架构支持新型号设备(如定义统一的
IDevice接口)。
通过以上方法,可以构建出既能处理复杂信号生成任务,又具备高度可靠性的自动化控制系统。