在信号发生器编程软件中记录日志是调试和监控系统运行状态的关键手段,能够帮助开发者快速定位问题、追溯操作历史以及分析性能。以下是详细的日志记录方法及实践建议,涵盖日志设计原则、实现方式、高级功能和工具推荐。
根据信息的重要性和紧急程度,定义不同级别的日志:
示例场景:
每条日志应包含以下要素:
示例日志格式:
[2024-03-15 14:23:45.123] [Thread-1] [DEVICE_1234] INFO - Set frequency to 1000000Hz (Command: FREQ 1MHz)[2024-03-15 14:23:45.456] [Thread-1] [DEVICE_1234] ERROR - Failed to enable output: VISA timeout (Error code: -1073807339)
log_20240315.txt
)。
Python的
logging
模块支持多级别日志和灵活的输出格式。
基础实现:
python
import
logging
# 配置日志
logging.basicConfig(
level=logging.DEBUG,# 全局最低级别
format='[%(asctime)s] [%(threadName)s] [%(device_id)s] %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S.%f',
filename='signal_generator.log',
filemode='a'
# 追加模式
)
# 添加自定义字段(如设备ID)
class
DeviceFilter(logging.Filter):
def
__init__(self, device_id):
self.device_id = device_id
def
filter(self, record):
record.device_id =
self.device_id
return
True
# 使用示例
logger = logging.getLogger('SignalGenerator')
logger.addFilter(DeviceFilter('DEVICE_1234'))
logger.debug("Preparing to send command...")
try:
sg.write("FREQ 1MHz")
logger.info("Frequency set successfully")
except
Exception
as
e:
logger.error(f"Command failed:
{str(e)}", exc_info=True)# 记录堆栈
在多线程环境中,需确保日志写入是线程安全的。
解决方案:
QueueHandler
:将日志消息放入队列,由单独线程处理。
loguru
(内置异步支持)。
示例(QueueHandler):
python
import
logging.handlers
import
queue
import
threading
log_queue = queue.Queue()
def
queue_listener(queue):
while
True:
record = queue.get()
if
record
is
None:# 终止信号
break
logger = logging.getLogger(record.name)
logger.handle(record)
# 配置队列处理器
q_handler = logging.handlers.QueueHandler(log_queue)
root_logger = logging.getLogger()
root_logger.addHandler(q_handler)
root_logger.setLevel(logging.DEBUG)
# 启动监听线程
listener_thread = threading.Thread(target=queue_listener, args=(log_queue,))
listener_thread.daemon =
True
listener_thread.start()
# 线程中记录日志
def
worker():
logger = logging.getLogger('Worker')
logger.info("Thread started")
worker()
log_queue.put(None)# 终止监听线程
在每次与设备通信时自动记录请求和响应。
封装SCPI命令记录:
python
class
SCPILogger:
def
__init__(self, device, logger):
self.device = device
self.logger = logger
def
write(self, command):
self.logger.debug(f"Sending SCPI:
{command}")
self.device.write(command)
def
query(self, command):
self.logger.debug(f"Querying SCPI:
{command}")
response =
self.device.query(command)
self.logger.debug(f"Received response:
{response}")
return
response
# 使用示例
import
pyvisa
rm = pyvisa.ResourceManager()
sg = rm.open_resource("TCPIP0::192.168.1.1::INSTR")
logger = logging.getLogger('SCPI')
scpi_logger = SCPILogger(sg, logger)
scpi_logger.write("FREQ 1MHz")
response = scpi_logger.query("FREQ?")
VISA timeout
)。
示例(Python分析脚本):
python
import
re
from
collections
import
defaultdict
def
analyze_logs(log_file):
error_counts = defaultdict(int)
with
open(log_file,
'r')
as
f:
for
line
in
f:
if
'ERROR'
in
line:
match
= re.search(r'ERROR - (.*?):', line)
if
match:
error_type =
match.group(1)
error_counts[error_type] +=
1
return
error_counts
errors = analyze_logs('signal_generator.log')
print("Top errors:",
sorted(errors.items(), key=lambda
x: x[1], reverse=True))
示例(Prometheus指标):
python
from
prometheus_client
import
start_http_server, Counter, Histogram
# 定义指标
COMMAND_SUCCESS = Counter('scpi_commands_total',
'Total SCPI commands', ['status'])
COMMAND_LATENCY = Histogram('scpi_command_latency_seconds',
'SCPI command latency')
def
logged_query(scpi_logger, command):
start_time = time.time()
try:
response = scpi_logger.query(command)
latency = time.time() - start_time
COMMAND_SUCCESS.labels(status='success').inc()
COMMAND_LATENCY.observe(latency)
return
response
except
Exception
as
e:
COMMAND_SUCCESS.labels(status='failure').inc()
raise
start_http_server(8000)# 暴露指标端点
| 工具类型 | 推荐方案 |
|---|---|
| 基础日志库 |
Python
logging
、Java
Log4j
、C++
spdlog
|
| 结构化日志 |
loguru
(Python)、
JSONLogger
(自定义格式)
|
| 日志分析 |
logwatch
(命令行)、
Splunk
(企业级)
|
| 实时监控 |
Grafana
+
Loki
(日志聚合)、
ELK
(Elasticsearch+Logstash+Kibana)
|
| 异步日志 |
Python QueueHandler
、
ZeroMQ
(跨进程)
|
logrotate
(Linux)或
logging.handlers.RotatingFileHandler
防止日志文件过大。
完整示例(Python):
python
import
logging
import
logging.handlers
import
time
class
SignalGeneratorLogger:
def
__init__(self, device_id, log_file='signal_generator.log'
):
self.logger = logging.getLogger(f'SignalGenerator_{device_id}')
self.logger.setLevel(logging.DEBUG)
# 文件处理器(带轮转)
file_handler = logging.handlers.RotatingFileHandler(
log_file, maxBytes=10*1024*1024, backupCount=5
)
file_handler.setFormatter(logging.Formatter(
'[%(asctime)s] [%(threadName)s] [%(device_id)s] %(levelname)s - %(message)s'
))
self.logger.addHandler(file_handler)
# 控制台处理器(生产环境可移除)
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
self.logger.addHandler(console_handler)
# 添加设备ID过滤器
class
DeviceFilter(logging.Filter):
def
filter(self, record):
record.device_id = device_id
return
True
self.logger.addFilter(DeviceFilter())
def
log_command(self, command, is_query=False, response=None
):
self.logger.debug(f"SCPI
{'Query'
if
is_query
else
'Command'}:
{command}")
if
is_query
and
response
is
not
None:
self.logger.debug(f"Response:
{response}")
# 使用示例
sg_logger = SignalGeneratorLogger('DEVICE_1234')
sg_logger.log_command("FREQ 1MHz")
response =
"FREQ 1000000Hz"
# 模拟设备响应
sg_logger.log_command("FREQ?", is_query=True, response=response)
通过以上方法,可以构建一个高效、可维护的日志系统,显著提升信号发生器编程软件的调试效率和运行可靠性。