资讯中心

联系我们

深圳市维立信电子科技有限公司
地址:深圳市福田区红荔路第一世界广场A座8D-E
咨询电话:0755-83766766
E-mail:info@welissom.com

如何编写更复杂的信号发生器脚本?

2025-10-11 10:21:52  点击:

编写复杂的信号发生器脚本需要结合高级控制逻辑多线程/多进程管理硬件协同以及自动化测试框架。以下从架构设计、关键功能实现和优化策略三个方面展开,并提供Python和C++的代码示例。


一、复杂脚本的核心架构

1. 分层设计

python
# 示例:分层架构(Python)
class SignalGeneratorController:
"""高层控制:任务调度与错误处理"""
def __init__(self, device):
self.device = device  # 底层硬件接口
self.tasks = []      # 任务队列

def add_task(self, task):
self.tasks.append(task)

def run(self):
for task in self.tasks:
try:
task.execute(self.device)
except Exception as e:
self._handle_failure(task, e)

class SweepTask:
"""中层任务:频率扫描"""
def __init__(self, start, stop, step, dwell_time):
self.params = locals()

def execute(self, device):
for freq in np.arange(self.start, self.stop, self.step):
device.set_frequency(freq)
time.sleep(self.dwell_time)

class ModulationTask:
"""中层任务:调制信号生成"""
def __init__(self, mod_type, depth, rate):
self.params = locals()

def execute(self, device):
device.configure_modulation(**self.params)

2. 状态机管理复杂流程

python
from enum import Enum

class GeneratorState(Enum):
IDLE = 0
CONFIGURING = 1
RUNNING = 2
ERROR = 3

class StatefulGenerator:
def __init__(self):
self.state = GeneratorState.IDLE

def transition(self, new_state):
if self._is_valid_transition(new_state):
self.state = new_state
else:
raise RuntimeError(f"非法状态转移: {self.state} -> {new_state}")

def _is_valid_transition(self, new_state):
# 定义状态转移规则
transitions = {
GeneratorState.IDLE: [GeneratorState.CONFIGURING],
GeneratorState.CONFIGURING: [GeneratorState.RUNNING, GeneratorState.ERROR],
GeneratorState.RUNNING: [GeneratorState.IDLE, GeneratorState.ERROR],
GeneratorState.ERROR: [GeneratorState.IDLE]
}
return new_state in transitions[self.state]

二、关键功能实现

1. 多通道协同控制

python
import threading

class MultiChannelGenerator:
def __init__(self, device_map):
"""device_map: {通道名: 设备实例}"""
self.channels = device_map
self.lock = threading.Lock()

def sync_channels(self, freq, amp_map):
"""同步设置多通道参数"""
with self.lock:
for ch_name, device in self.channels.items():
try:
device.set_frequency(freq)
device.set_amplitude(amp_map[ch_name])
except Exception as e:
print(f"通道 {ch_name} 配置失败: {e}")
# 可选:回退到安全状态
device.set_amplitude(0)

# 使用示例
devices = {"CH1": SigGen1(), "CH2": SigGen2()}
generator = MultiChannelGenerator(devices)
generator.sync_channels(1e6, {"CH1": -10, "CH2": -5})

2. 动态参数调整(如PID控制)

python
class DynamicSignalGenerator:
def __init__(self, device):
self.device = device
self.pid = PIDController(kp=0.1, ki=0.01, kd=0.05)

def track_reference(self, reference_signal):
"""根据参考信号动态调整输出"""
for ref_value in reference_signal:
error = ref_value - self.device.get_current_output()
correction = self.pid.compute(error)
new_amp = self.device.get_amplitude() + correction
self.device.set_amplitude(new_amp)
time.sleep(0.01)  # 控制环路周期

3. 自动化测试序列

python
import json

class TestSequenceExecutor:
def __init__(self, device):
self.device = device

def load_sequence(self, file_path):
with open(file_path) as f:
self.sequence = json.load(f)

def run_sequence(self):
for step in self.sequence:
if step["action"] == "set_freq":
self.device.set_frequency(step["value"])
elif step["action"] == "sweep":
self._execute_sweep(step["start"], step["stop"], step["step"])
# 其他操作...

def _execute_sweep(self, start, stop, step):
for freq in np.arange(start, stop, step):
self.device.set_frequency(freq)
self._verify_output(freq)

def _verify_output(self, expected_freq):
measured = self.device.measure_output()
if abs(measured - expected_freq) > 1e3:  # 1kHz容差
raise RuntimeError(f"频率校验失败! 期望: {expected_freq}, 实际: {measured}")

三、性能优化与高级技巧

1. 异步I/O与事件驱动

python
import asyncio

class AsyncSignalGenerator:
async def async_set_frequency(self, freq):
# 模拟异步设备通信
await asyncio.sleep(0.1)  # 替代实际I/O操作
print(f"频率设置为: {freq} Hz")

async def run_complex_sequence():
sg = AsyncSignalGenerator()
tasks = [
sg.async_set_frequency(1e6),
sg.async_set_frequency(2e6),
sg.async_set_frequency(3e6)
]
await asyncio.gather(*tasks)  # 并行执行

asyncio.run(run_complex_sequence())

2. 硬件加速(C++扩展)

cpp
// 高性能波形生成 (C++)
#include <vector>
#include <cmath>

void generate_complex_waveform(double* buffer, size_t length, double freq, double sample_rate) {
const double two_pi = 2.0 * M_PI;
for (size_t i = 0; i < length; ++i) {
double t = i / sample_rate;
// 组合正弦波 + 调制信号
buffer[i] = sin(two_pi * freq * t) + 0.3 * sin(two_pi * 3 * freq * t);
}
}

// Python调用示例 (通过ctypes)
/*
from ctypes import CDLL, POINTER, c_double, c_size_t

lib = CDLL("./waveform_generator.so")
lib.generate_complex_waveform.argtypes = [POINTER(c_double), c_size_t, c_double, c_double]

buffer = (c_double * 1024)()
lib.generate_complex_waveform(buffer, 1024, 1e6, 10e6)
*/

3. 实时数据可视化

python
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation

class RealTimePlotter:
def __init__(self, device):
self.device = device
self.fig, self.ax = plt.subplots()
self.x_data, self.y_data = [], []
self.line, = self.ax.plot([], [])

def update(self, frame):
freq = self.device.get_frequency()
power = self.device.measure_power()
self.x_data.append(freq)
self.y_data.append(power)
self.line.set_data(self.x_data, self.y_data)
self.ax.relim()
self.ax.autoscale_view()
return self.line,

# 使用示例
plotter = RealTimePlotter(device)
ani = FuncAnimation(plotter.fig, plotter.update, interval=100)
plt.show()

四、调试与验证策略

  1. 硬件模拟器

    • 在无硬件时,用虚拟设备模拟响应:
    pythonclass MockSignalGenerator:def set_frequency(self, freq):print(f"[MOCK] 设置频率: {freq}")time.sleep(0.01)  # 模拟延迟
  2. 参数边界检查

    pythondef validate_params(freq, amp):assert 1e3 <= freq <= 10e9, "频率超出范围 (1kHz-10GHz)"assert -100 <= amp <= 20, "幅度超出范围 (-100dBm-+20dBm)"
  3. 自动化回归测试

    python
    import unittest

    class TestSignalGenerator(unittest.TestCase):
    def setUp(self):
    self.sg = MockSignalGenerator()

    def test_frequency_sweep(self):
    self.sg.set_frequency(1e6)
    self.assertEqual(self.sg.get_frequency(), 1e6)

五、完整示例:多通道动态调制系统

python
import numpy as np
import time
from collections import defaultdict

class AdvancedSignalSystem:
def __init__(self, device_map):
self.channels = device_map
self.history = defaultdict(list)  # 记录操作日志

def dynamic_modulation(self, modulation_type, params):
"""多通道动态调制"""
for ch_name, device in self.channels.items():
try:
# 根据调制类型生成波形
if modulation_type == "AM":
waveform = self._generate_am(**params)
elif modulation_type == "FM":
waveform = self._generate_fm(**params)

# 实时下发波形
for chunk in np.array_split(waveform, 10):
device.load_waveform(chunk)
self.history[ch_name].append({
"time": time.time(),
"action": "load_waveform",
"points": len(chunk)
})
except Exception as e:
self._emergency_shutdown(ch_name)
raise

def _generate_am(self, carrier_freq, mod_freq, depth):
t = np.linspace(0, 1, 1000)
carrier = np.sin(2 * np.pi * carrier_freq * t)
modulation = 1 + depth * np.sin(2 * np.pi * mod_freq * t)
return carrier * modulation

def _emergency_shutdown(self, channel):
"""故障时安全关闭通道"""
self.channels[channel].set_amplitude(0)
print(f"紧急关闭通道: {channel}")

# 使用示例
devices = {"TX1": SigGen1(), "TX2": SigGen2()}
system = AdvancedSignalSystem(devices)
system.dynamic_modulation(
modulation_type="AM",
params={"carrier_freq": 10e6, "mod_freq": 1e3, "depth": 0.5}
)

六、关键优化点总结

  1. 模块化设计
    • 将不同功能(如波形生成、设备控制、日志)解耦为独立模块。
  2. 实时性保障
    • 对关键操作使用实时线程优先级(Linux的SCHED_FIFO或Windows的Real-time Priority)。
  3. 资源管理
    • 使用RAII模式(C++)或上下文管理器(Python)确保硬件资源释放。
  4. 可扩展性
    • 通过插件式架构支持新型号设备(如定义统一的IDevice接口)。

通过以上方法,可以构建出既能处理复杂信号生成任务,又具备高度可靠性的自动化控制系统。