高级教程
开放验证平台工具链的高级特性教程。picker目前已经支持使用vcs进行仿真,相关文档还在完善中
回调函数
利用回调处理电路事件
异步编程
利用异步模式简化回调
消息驱动
利用消息对电路和软件激励进行解耦
验证框架
MLVP 验证框架。
回调函数
利用回调处理电路事件回调
概述
硬件描述语言不同于C++/Python等高级软件编程语言,具有独特的“时钟”特性,在使用picker工具进行验证时,我们可能会遇到想在时钟的上升沿触发某种操作的情况,要想实现这个过程,就要会使用回调函数
- 回调函数就是一个被作为参数传递的函数。
- 在C语言中,回调函数只能使用函数指针实现。
- 在C++、Python、ECMAScript等更现代的编程语言中还可以使用仿函数或匿名函数。
回调函数是一个函数或过程,不过它是一个由调用方自己实现,供被调用方使用的特殊函数,一般使用方法如下
- 在a()方法中调用了b()方法
- 在b方法执行完毕主动调用提供的callback()方法
这个下面的例子中,实现了一个简单的callback 示例,我们定一个了一个打印结果的方法 print_result,一个两数相加的方法add (), 当完成add 后,调用 print_result()方法将结果打印出来
def add(x, y):
return x + y
def sub(x, y):
return x - y
def mul(x, y):
return x * y
def div(x, y):
return x / y
def calc(x, y, func):
return func(x, y)
# 将函数作为参数传入,再调用函数
print(calc(1, 2, add))
>>> 3
回调函数的优点
优点
- 回调函数的作用是将代码逻辑分离出来使得代码更加模块化和可维护。。
- 提高代码的复用性和灵活性:回调函数可以将一个函数作为参数传递给另一个函数,从而实现模块化编程,提高代码的复用性和灵活性。
- 解耦合:回调函数可以将不同模块之间的关系解耦,使得代码更易于维护和扩展。
- 可以异步执行:回调函数可以在异步操作完成后被执行,这样避免了阻塞线程,提高应用程序的效率。
例如在下面这个例子中,我们定义了两个回调函数addOne和addTwo,一个是生成x+1,另一个是生成x+2,还有一个生成倒数的中间函数
- 我们可以通过一个中间函数,来分别调用addOne和addTwo来生成形如1/(x+1)和1/(x+2)形式的数
- 也可以使用匿名函数的形式生成1/(x+3)形式的数
def addOne(x):
return x + 1
def addTwo(x):
return x + 2
from even import *
# 中间函数
# 接受一个回调函数作为参数,返回它的倒数
def getNumber(k, getEvenNumber):
return 1 / getEvenNumber(k)
if __name__ == "__main__":
x = 1
# 当需要生成一个1/(x+1)形式的数时
print(getNumber(x, addOne))
# 当需要一个1/(x+2)形式的数时
print(getNumber(x, addTwo))
# 当需要一个1/(x+3)形式数
print(getNumber(x, lambda k: k +3))
回调函数的使用场景包括
- 事件处理:回调函数可以用于处理各种事件,例如鼠标点击、键盘输入、网络请求等。
- 异步操作:回调函数可以用于异步操作,例如读取文件、发送邮件、下载文件等。
- 数据处理:回调函数可以用于处理数据,例如对数组进行排序、过滤、映射等。
使用回调函数在时钟上升沿触发操作
下面的测试代码里,我们将在随机数生成器的测试中使用回调函数。
在整个测试过程中,我们会在114514个时钟周期内验证随机数生成器的结果,并统计生成的随机数中大于中位数和小于等于中位数的数量。其中,结果的验证和数据的统计都在时钟上升沿进行。
TestRandomGenerator
是对随机数生成器进行测试的类,在它的属性和方法中:
self.dut
是用于测试的实例化DUTRandomGenerator
对象。self.ref
是用于验证结果的实例化LFSR_16
对象。callback1(self, clk)
会对随机数生成器进行验证,在时钟上升沿触发。callback2(self, clk)
会统计生成随机数的分布,也在时钟上升沿触发。test_rg(self, callback3)
方法会执行整个测试流程,最后执行callback3
函数。
Picker生成的DUT类会包含一个驱动电路的时钟源self.xclock
,DUTRandomGenerator
也同样如此。
测试前需要把待测模块的clk
引脚接入时钟源,我们可以调用dut
中已经封装好的方法:
self.dut.init_clock("clk") # 等价于 self.xclock.Add(self.port["clk"])
之后再对生成器进行复位,进行初始化赋值:
self.dut.reset.value = 1
self.dut.Step(1) # 时钟等待一个周期,下个周期dut的输出会置为0
self.dut.reset.value = 0 # 设置完成后需要记得复位原信号!
完成初始化后,添加在时钟上升沿触发的回调函数,用于验证与统计:
self.dut.StepRis(self.callback1) # 添加在时钟上升沿触发的回调函数
self.dut.StepRis(self.callback2) # 当然可也添加多个
然后等待时钟经过114514个周期,期间每个时钟的上升沿会对结果进行验证并统计生成随机数的分布:
self.dut.Step(114514)
最后进行收尾工作,以回调函数的形式调用median_distribution_stats
输出随机数的分布情况:
self.dut.finalize()
callback3(self.greater, self.less_equal, self.MEDIAN)
至此,测试完成。
随机数生成器测试代码
from UT_RandomGenerator import *
import random
def median_distribution_stats(gt, le, mid) -> None:
# 输出产生结果中大于中位数的个数和小于等于中位数的个数。
print(f"There are {gt} numbers > {mid} and {le} numbers <= {mid}")
# 16位线性移位寄存器模拟类
class LFSR_16:
def __init__(self, seed):
self.state = seed & ((1 << 16) - 1)
def step(self):
new_bit = (self.state >> 15) ^ (self.state >> 14) & 1
self.state = ((self.state << 1) | new_bit) & ((1 << 16) - 1)
class TestRandomGenerator:
def __init__(self) -> None:
self.MEDIAN = 2**15
self.SEED = random.randint(0, 2**16 - 1)
self.greater = 0
self.less_equal = 0
self.ref = LFSR_16(self.SEED)
self.dut = DUTRandomGenerator()
def test_rg(self, callback3) -> None:
# clk引脚接入时钟源
self.dut.init_clock("clk")
self.dut.seed.value = self.SEED
# 复位操作
self.dut.reset.value = 1
self.dut.Step(1) # 时钟等待一个周期,下个周期dut的输出会置为0
self.dut.reset.value = 0 # 设置完成后需要记得复位原信号!
# 设置回调函数
self.dut.StepRis(self.callback1) # 添加在时钟上升沿触发的回调函数
self.dut.StepRis(self.callback2) # 当然可也添加多个
# 测试,启动!
self.dut.Step(114514) # 等待时钟经过114514个周期
# 结束
self.dut.finalize()
callback3(self.greater, self.less_equal, self.MEDIAN)
pass
def callback1(self, clk):
# 比对结果是否符合预期
assert self.dut.random_number.value == self.ref.state, "Mismatch"
print(
f"Cycle {clk}, DUT: {self.dut.random_number.value:x},"
+ f" REF: {self.ref.state:x}"
)
self.ref.step()
def callback2(self, clk):
# 统计产生的随机数中,大于中位数和小于等于中位数的分布
if self.dut.random_number.value > self.MEDIAN:
self.greater += 1
else:
self.less_equal += 1
if __name__ == "__main__":
TestRandomGenerator().test_rg(median_distribution_stats)
pass
在验证加法器时添加回调函数
在这里定义一个32位的加法器RisAdder
,它只在时钟上升沿更新输出。RTL代码为:
module RisAdder #(
parameter WIDTH = 32
) (
input clk,
input [WIDTH-1:0] a,
input [WIDTH-1:0] b,
input cin,
output [WIDTH-1:0] sum,
output cout
);
reg Cout;
reg [WIDTH-1:0] Sum;
always @(posedge clk) begin
{Cout, Sum} <= a + b + cin;
end
assign {cout, sum} = {Cout, Sum};
endmodule
下面的测试代码里,我们将在加法器的测试中使用回调函数。
在测试开始前,先创建DUTRisAdder
对象的实例dut
和SimpleRisAdder
对象的实例ref
。其中,ref
用于模拟加法器预期的行为,用作参考加法器。
Picker生成的DUT类会包含一个驱动电路的时钟源self.xclock
,DUTRisAdder
也同样如此。
在测试开始前,我们会把待测模块的clk
引脚接入时钟源:
dut.init_clock("clk") # 等价于 self.xclock.Add(self.port[name])
之给加法器的输入置为0,让它下个周期的输出为0:
dut.a.value = 0
dut.b.value = 0
dut.cin.value = 0
dut.Step(1) # 等待时钟进入下个周期
随后,添加在时钟上升沿触发的回调函数test_adder(clk: int, dut: DUTRisAdder, ref: SimpleRisAdder) -> None
,并向test_adder
传入dut
和ref
对象:
# 测试函数,验证加法器的输出是否正确
def test_adder(clk: int, dut: DUTRisAdder, ref: SimpleRisAdder) -> None:
# 获取加法器的输入和输出
a = dut.a.value
b = dut.b.value
cin = dut.cin.value
cout = dut.cout.value
sum = dut.sum.value
# 检查加法器的输出是否与预期一致
isEqual = (cout, sum) == (ref.cout, ref.sum)
# 输出测试结果
print(f"Cycle: {clk}, Input(a, b, cin) = ({a:x}, {b:x}, {cin:x})")
print(
FONT_GREEN + "Pass." # 输出绿色的“Pass.”,如果测试通过
if isEqual
else FONT_RED + f"MisMatch! Expect cout: {ref.cout:x}, sum: {ref.sum:x}." +
FONT_COLOR_RESET + f"Get cout: {cout:x}, sum: {sum:x}."
)
assert isEqual # 如果测试失败,触发断言异常
if __name__ == "__main__":
...
dut.StepRis(test_adder, (dut, ref)) # 添加在时钟上升沿触发的回调函数, 给回调函数传入dut和ref
...
test_adder
函数将会在时钟上升沿比较dut
和ref
的输出,验证RTL代码的实现是否符合我们的预期。
最后,执行114514个周期的测试,每个测试数据的信号都会持续一个周期:
# 测试114514个周期
for _ in range(114514):
a = random.randint(0, (1<<WIDTH) - 1)
b = random.randint(0, (1<<WIDTH) - 1)
cin = random.randint(0, 1)
dut.a.value = a
dut.b.value = b
dut.cin.value = cin
ref.step(a, b, cin) # 更新参考加法器的状态
dut.Step(1) # 等待时钟进入下个周期
dut.finalize()
上升沿更新的加法器的代码
from UT_RisAdder import *
import random
# 控制字体颜色
FONT_GREEN = "\033[0;32m"
FONT_RED = "\033[0;31m"
FONT_COLOR_RESET = "\033[0m"
class SimpleRisAdder:
"""
SimpleRisAdder 类是一个作为参考的加法器类,
它模拟了我们预期的RisAdder的行为
"""
def __init__(self, width) -> None:
self.WIDTH = width # 加法器的位宽
# 端口定义
self.a = 0 # 输入端口a
self.b = 0 # 输入端口b
self.cin = 0 # 输入端口cin
self.cout = 0 # 输出端口cout
self.sum = 0 # 输出端口sum
def step(self, a, b, cin):
"""
模拟上升沿更新输出: 先用上个周期的输入更新输出,之后再更新输入
"""
sum = self.a + self.b + self.cin
self.cout = sum >> self.WIDTH # 计算进位
self.sum = sum & ((1 << self.WIDTH) - 1) # 计算和
self.a = a # 更新输入a
self.b = b # 更新输入b
self.cin = cin # 更新输入cin
# 测试函数,验证加法器的输出是否正确
def test_adder(clk: int, dut: DUTRisAdder, ref: SimpleRisAdder) -> None:
# 获取加法器的输入和输出
a = dut.a.value
b = dut.b.value
cin = dut.cin.value
cout = dut.cout.value
sum = dut.sum.value
# 验证输出是否符合预期
isEqual = (cout, sum) == (ref.cout, ref.sum)
# 输出测试结果
print(f"Cycle: {clk}, Input(a, b, cin) = ({a:x}, {b:x}, {cin:x})")
print(
FONT_GREEN + "Pass."
if isEqual
else FONT_RED + f"MisMatch! Expect cout: {ref.cout:x}, sum: {ref.sum:x}.",
FONT_COLOR_RESET + f"Get cout: {cout:x}, sum: {sum:x}.",
)
assert isEqual # 如果测试失败,触发断言异常
if __name__ == "__main__":
WIDTH = 32 # 设置加法器的位宽
ref = SimpleRisAdder(WIDTH) # 创建一个参考加法器
dut = DUTRisAdder() # 创建被测试的加法器
# 绑定时钟信号
dut.init_clock("clk") # 等价于 self.xclock.Add(self.port[name])
# dut输入信号置0
dut.a.value = 0
dut.b.value = 0
dut.cin.value = 0
dut.Step(1) # 等待时钟进入下个周期
dut.StepRis(test_adder, (dut, ref)) # 添加在时钟上升沿触发的回调函数, 给回调函数传入dut和ref
# 测试114514个周期
for _ in range(114514):
# 随机生成输入
a = random.randint(0, (1<<WIDTH) - 1)
b = random.randint(0, (1<<WIDTH) - 1)
cin = random.randint(0, 1)
ref.step(a, b, cin) # 更新参考加法器的状态
dut.a.value = a # 设置被测试加法器的输入a
dut.b.value = b # 设置被测试加法器的输入b
dut.cin.value = cin # 设置被测试加法器的输入cin
dut.Step(1) # 等待时钟进入下个周期
dut.finalize()
pass
Eventloop
概述
Event Loop:事件循环机制是一种计算机编程模型,其目的是使程序能够在一种非阻塞方式下等待事件(如
输入、计时器、定时器、网络
等)的发生,并在发生事件时被通知及时处理事件,用于等待和分配消息和事件,单线程运行时不会阻塞的一种机制,也就是实现异步的原理。作为一种单线程语言,事件循环机制的核心是事件循环,即程序会轮询事件队列中是否有待处理事件,如果有,就执行相应的回调函数来处理该事件。然后继续等待下一个事件。事件可以是来自外部资源(如网络套接字、文件、定时器等)的输入、用户输入、系统通知等。由此,程序就可以实现异步、非阻塞的编程方式,提高程序的响应速度和运行效率.
基本原理
事件循环的工作流程通常如下:
- 启动程序,执行同步代码直到遇到异步代码,
- 将异步代码的回调函数放入事件队列中,以便在事件发生时执行。
- 当所有同步代码执行完毕,开始事件循环,不断检查是否有事件发生。
- 如果有事件队列不为空,则执行与之关联的回调函数。
- 回到步骤 4,继续循环处理事件。 伪代码的形式如下
while(1) {
events = getEvents();
for (e in events)
processEvent(e);
}
Python中的Evenloop
python中的Asyncio模块提供了以下方法来管理事件循环
- loop = get_event_loop() : 得到当前的事件循环。
- asyncio.set_event_loop() : 为当前上下文设置事件循环。
- asyncio.new_event_loop() : 根据此策略创建一个新的事件循环并返回。
- loop.call_at():在指定时间的运行。
- loop.call_later(delay, callback, arg) : 延迟delay 秒再执行 callback 方法。
- loop.call_soon(callback, argument) : 尽可能快调用 callback方法, call_soon() 函数结束,主线程回到事件循环之后就会马上调用 callback 。
- loop.time() : 返回当前事件循环的内部时间。
- loop.run_forever() : 在调用 stop() 之前将一直运行。
在下面的例子中,我们定义了一个callback方法用于打印参数和loop内时间,以观察函数的定义顺序和执行顺序
- 在main方法中,首先我们先获取当前的事件循环loop,和当前的时间
- 依次调用callback方法,设置不同的开始执行时间
import asyncio
def callback(a, loop):
print("我的参数为 {0},执行的时间为{1}".format(a,loop.time()))
if __name__ == "__main__":
try:
loop = asyncio.get_event_loop()
now = loop.time()
loop.call_later(5, callback, 5, loop)
loop.call_at(now+2, callback, 2, loop)
loop.call_at(now+1, callback, 1, loop)
loop.call_at(now+3, callback, 3, loop)
loop.call_soon(callback, 4, loop)
loop.run_forever() #要用这个run_forever运行
except KeyboardInterrupt:
print("Goodbye!")
运行结果为:
我的参数为 4,执行的时间为266419.843
我的参数为 1,执行的时间为266420.843
我的参数为 2,执行的时间为266421.859
我的参数为 3,执行的时间为266422.859
我的参数为 5,执行的时间为266424.843
回调函数和Eventloop的缺点
回调函数也存在如下的一些缺点,因此在下一小节中引入了异步的概念, 缺点:
- 回调函数嵌套过多会导致代码难以维护:如果回调函数嵌套层数过多,代码会变得非常复杂,难以维护。
- 回调函数容易造成竞态条件:如果回调函数中有共享资源访问,容易出现竞态条件,导致程序出错。
- 代码可读性差:回调函数的使用可能会破坏代码的结构和可读性,尤其是在处理大量数据时
异步编程
概述
为什么要引入异步编程?
上一节中我们学习了如何使用回调函数,但是在使用回调函数时可能会遇到回调地狱,即如果回调嵌套过多会导致代码会变得非常复杂,并且难以,因此我们可以通过异步(async,await)的方式来避免这种情况,使用异步,可以使代码结构变得清晰,过await关键字,可以使得异步操作按顺序执行,而不需要通过回调函数来管理执行顺序。
实现原理
在python的asyncio中异步编程的实现基于以下三个核心概念,我们会在下一小节进行更详细的介绍
- 回调函数(Callback) 回调函数预先注册的回是异步编程的基础。当一个任务完成时,系统会调用调函数来处理任务的结果。通过回调函数的方式,程序可以在等待任务完成的同时继续执行其他任务,提高了程序的并发性。
- 事件循环(Event Loop) 事件循环是异步编程的核心机制之一。它负责监听各种事件(如用户输入、I/O 操作等),当事件发生时,触发相应的回调函数进行处理。事件循环通过不断地轮询事件队列,实现了非阻塞式的任务处理。
- 协程 其中协程就是用户自己定义的任务
常见的异步编程框架和工具
为了方便开发者进行异步编程,有许多优秀的框架和工具可供选择。以下是一些常见的异步编程框架和工具:
- Asyncio Asyncio 是 Python 的一个强大的异步编程框架,提供了高效的协程(Coroutine)支持。它可以用于编写并发性能优秀的网络应用、爬虫程序等。
- Node. Js Node. Js 是基于 Chrome V 8 引擎构建的 JavaScript 运行时环境,天生支持非阻塞 I/O 操作。它在 Web 开发领域广泛应用,尤其擅长处理高并发的实时应用。
- RxJava RxJava 是一个基于观察者模式和迭代器模式的异步编程库。它为 Java 开发者提供了丰富的操作符和组合方式,简化了异步编程的复杂性。
在python中使用异步,需要使用async和await两个关键字
- async:用于定义异步函数,在异步函数中,通常需要包含异步操作
- await:用于在异步函数中等待异步操作的完成
下面是一个简单的python代码,来演示async和await关键字的用法
async def my_async_function():
print("Start async_function and wait some funcion ")
await some_other_async_function()
print("End of my_async_function")
在python中要想实现异步,通常使用asyncio模块,在下面的例子中,我们定义了一个greet函数,分别打印Hello+name和Goodbye+name,两次打印中间间隔2s.使用asyncio.create创建两个异步任务,并收集执行结果
- asyncio.create_task():用于创建一个协程任务,并安排其立即执行
- asyncio.gather():等待多个协程任务的全部完成,并且可以收集执行结果
- asyncio.sleep():在异步操作中等待一段实际
import asyncio
# 定义一个异步函数
async def greet(name):
print("Hello, " + name)
await asyncio.sleep(2) # 使用异步的sleep函数
print("Goodbye, " + name)
# 执行异步函数
async def main():
# 创建任务并发执行
task1 = asyncio.create_task(greet("verify chip"))
task2 = asyncio.create_task(greet("picker"))
# 等待所有任务完成
await asyncio.gather(task1, task2)
# 运行主函数
if __name__ == "__main__":
asyncio.run(main())
- 首先执行greet(“verify chip”),打印Hello,verify chip
- 当遇到await时,转去执行greet(“picker”), 打印Hello,picker
- 当要等待的操作执行完以后两个task分别输出Goodbye,verify chip,Goodbye,picker
异步编程的优势
异步编程具有以下几个显著的优势:
- 提高响应速度 通过异步编程,程序能够在等待某个任务完成时继续执行其他任务,避免了任务阻塞带来的延迟。这样能够大幅度提高程序的响应速度,提升用户体验。
- 提升并发性能 异步编程允许程序同时处理多个任务,充分利用计算资源,提升了系统的并发能力。特别是在处理大量 I/O 密集型任务时,异步编程能够更好地发挥优势,降低资源消耗。
- 简化编程逻辑 异步编程可以避免编写复杂的多线程代码,降低了程序的复杂性和出错的概率。通过简化编程逻辑,开发者能够更专注于业务逻辑的实现。
因此异步编程广泛应用于以下几个领域:
- Web 开发 在 Web 开发中,异步编程常用于处理网络请求、数据库操作等耗时任务。通过异步方式处理这些任务,可以避免阻塞主线程,保证 Web 服务器的并发性能。
- 并行计算 异步编程可以帮助实现并行计算,将一个大任务拆分成多个小任务并发执行,提高计算效率。这在科学计算、数据处理等领域非常常见。
- 消息队列 消息队列是异步编程的经典应用之一。异步消息队列可以实现不同系统之间的解耦和异步通信,提高系统的可扩展性和稳定性。
picker中异步的用法
例如在picker中,我们可以通过如下方法通过周期来控制代码执行的流程
- await clk.AStep(3):等待时钟 clk 走 3 个时钟周期。await 关键字使得程序在这里暂停执行,直到时钟走完指定的时钟周期后才继续执行下一行代码。
- await clk.ACondition(lambda: clk.clk == 20):它等待条件 clk.clk == 20 成立。类似地,程序在这里暂停执行,直到条件成立后才继续执行下一行代码。
async def test_async():
clk = XClock(lambda a: 0)
clk.StepRis(lambda c : print("lambda ris: ", c))
task = create_task(clk.RunStep(30))
print("test AStep:", clk.clk)
await clk.AStep(3)
print("test ACondition:", clk.clk)
await clk.ACondition(lambda: clk.clk == 20)
print("test cpm:", clk.clk)
await task
验证加法器时使用异步
这里继续用在上升沿触发的加法器作为例子,我们对之前的代码做了一些微小的变动,把时钟信号的产生和等待时钟周期换成了picker
提供的异步方法:
XClock的
Step(i)
方法会推进i
个时钟周期,具有两个主要功能:
- 生成
i
个时钟周期的时钟信号。- 等待时钟经过
i
个周期。在异步编程中,这两个功能对应着 XClock 提供的两种异步方法:
RunStep(i)
:生成i
个时钟周期的时钟信号。需要通过asyncio.create_task
创建任务,并在测试代码的最后使用await
等待其完成。AStep(i)
:等待时钟经过i
个周期。如果
RunStep(i)
方法在AStep(i)
之前完成,整个程序将在AStep(i)
处被阻塞。
使用异步的测试代码
from UT_RisAdder import *
import random
import asyncio
# 控制字体颜色
FONT_GREEN = "\033[0;32m" # 绿色
FONT_RED = "\033[0;31m" # 红色
FONT_COLOR_RESET = "\033[0m" # 重置颜色
class SimpleRisAdder:
"""
SimpleRisAdder 类是一个作为参考的加法器类,
它模拟了我们预期的RisAdder的行为
"""
def __init__(self, width) -> None:
self.WIDTH = width # 加法器的位宽
# 端口定义
self.a = 0 # 输入端口a
self.b = 0 # 输入端口b
self.cin = 0 # 输入端口cin
self.cout = 0 # 输出端口cout
self.sum = 0 # 输出端口sum
def step(self, a, b, cin):
"""
模拟上升沿更新输出: 先用上个周期的输入更新输出,之后再更新输入
"""
sum = self.a + self.b + self.cin
self.cout = sum >> self.WIDTH # 计算进位
self.sum = sum & ((1 << self.WIDTH) - 1) # 计算和
self.a = a # 更新输入a
self.b = b # 更新输入b
self.cin = cin # 更新输入cin
# 测试函数,验证加法器的输出是否正确
def test_adder(clk: int, dut: DUTRisAdder, ref: SimpleRisAdder) -> None:
# 获取加法器的输入和输出
a = dut.a.value
b = dut.b.value
cin = dut.cin.value
cout = dut.cout.value
sum = dut.sum.value
# 检查加法器的输出是否与预期一致
isEqual = (cout, sum) == (ref.cout, ref.sum)
# 输出测试结果
print(f"Cycle: {clk}, Input(a, b, cin) = ({a:x}, {b:x}, {cin:x})")
print(
FONT_GREEN + "Pass.
消息驱动
利用消息对电路和软件激励进行解耦概述
消息驱动编程是一种编程范式,它依赖于异步消息传递以促进组件间的通信和协作。在这种模式下,系统的各个组件不是通过直接调用对方的函数或方法,而是通过发送和接收消息来交流。例如,在picker环境中,我们可以利用消息驱动的方法将电路的行为和软件的激励相解耦,这样就可以避免受到硬件电路时序限制的束缚。 在硬件电路测试中,硬件时序指的是电路中各个元件操作的顺序和时间间隔,这对于电路的正确运行至关重要。软件激励则是指用软件生成的一系列动作或信号,用以模拟外部事件对电路的影响,以测试电路的反应。将硬件时序与软件激励解耦是必要的,因为这样可以使得测试过程更加灵活和高效。这种解耦还有助于在不同的环境中重用软件激励,提高测试资源的利用率。总之,使用消息驱动来解耦硬件时序和软件激励可以提升测试的质量和可维护性,同时降低复杂性。
消息驱动编程通常涉及以下几个概念和组件:
- 消息: 消息是在组件之间传递的数据单元。消息可以是简单的数据结构、事件对象,甚至是命令。发送方将消息发送到一个目标,接收方则从目标接收消息。
- 消息队列: 消息队列是消息传递的中介。它负责存储和管理发送到它的消息,并将消息传递给接收方。消息队列可以基于内存或磁盘,可以是单播(一对一)、多播(一对多)或广播(一对所有)。
- 发布-订阅模式: 发布-订阅模式是消息驱动编程的一种常见实现方式。在这种模式中,发布者发布消息到一个或多个主题(topic),订阅者订阅感兴趣的主题,并接收相应的消息。
- 消息代理: 消息代理是处理消息传递的中间件组件。它负责接收和分发消息,管理消息队列,确保消息的可靠传递,以及提供其他消息相关的功能,如消息路由、消息过滤、消息持久化等
使用Pub/Sub模式来实现消息驱动
发布/订阅模式是一种在软件架构中常见的消息通信方式。在这个模式中,发布者不直接将消息发送给特定的接收者,而是发布(发送)到一个中间层,即消息代理。订阅者通过订阅感兴趣的消息类型或主题,来表明他们希望接收哪些消息。消息代理的职责是确保所有订阅了特定主题的客户端都能收到相应的消息。 这种模式的一个关键特点是发布者和订阅者之间的解耦。他们不需要知道对方的存在,也不需要直接通信。这提高了系统的灵活性和可扩展性,因为可以独立地添加或移除发布者和订阅者,而不会影响系统的其他部分。
- 使用 Python 的内置队列模块实现的基本发布/订阅模型:
- 此处 Publisher 类具有消息队列和订阅者列表。使用发布方法发布消息时,会将其添加到队列中,并通过调用其接收方法传递到所有订阅的客户端。Subscriber 类具有一个 receive 方法,该方法仅打印收到的消息。
import queue # 发布者类 class Publisher: def __init__(self): # 初始化消息队列和订阅者列表 self.message_queue = queue.Queue() self.subscribers = [] def subscribe(self, subscriber): # 添加一个新的订阅者到订阅者列表 self.subscribers.append(subscriber) def publish(self, message): # 将消息放入队列并通知所有订阅者 self.message_queue.put(message) for subscriber in self.subscribers: # 调用订阅者的接收方法 subscriber.receive(message) # 订阅者类 class Subscriber: def __init__(self, name): # 初始化订阅者的名称 self.name = name def receive(self, message): # 打印接收到的消息 print(f"{self.name} received message: {message}") # 创建一个发布者实例 publisher = Publisher() # 创建两个订阅者实例 subscriber_1 = Subscriber("Subscriber 1") subscriber_2 = Subscriber("Subscriber 2") # 将订阅者添加到发布者的订阅者列表中 publisher.subscribe(subscriber_1) publisher.subscribe(subscriber_2) # 发布者发布一条消息 publisher.publish("Hello World")
- 使用 Python 的线程模块实现的发布/订阅模型:
- 在此示例中,Publisher 类有一个订阅者字典,其中键是主题,值是订阅者列表。subscribe 方法将订阅服务器添加到指定主题的列表中。publish 方法检查指定主题是否有任何订阅者,如果有,则设置事件并存储每个订阅者的消息。Subscriber 类和 receive 方法与前面的示例相同。
import threading # 发布者类 class Publisher: def __init__(self): # 初始化订阅者字典,按主题组织 self.subscribers = {} def subscribe(self, subscriber, topic): # 订阅方法,将订阅者添加到特定主题 if topic not in self.subscribers: self.subscribers[topic] = [] self.subscribers[topic].append(subscriber) def publish(self, message, topic): # 发布方法,向特定主题的所有订阅者发送消息 if topic in self.subscribers: for subscriber in self.subscribers[topic]: # 设置事件标志,通知订阅者有新消息 subscriber.event.set() # 将消息传递给订阅者 subscriber.message = message # 订阅者类 class Subscriber: def __init__(self, name): # 初始化订阅者名称和事件标志 self.name = name self.event = threading.Event() self.message = None def receive(self): # 接收方法,等待事件标志被设置 self.event.wait() # 打印接收到的消息 print(f"{self.name} received message: {self.message}") # 清除事件标志,准备接收下一个消息 self.event.clear() # 创建发布者实例 publisher = Publisher() # 创建三个订阅者实例 subscriber_1 = Subscriber("Subscriber 1") subscriber_2 = Subscriber("Subscriber 2") subscriber_3 = Subscriber("Subscriber 3") # 将订阅者根据主题订阅到发布者 publisher.subscribe(subscriber_1, "sports") publisher.subscribe(subscriber_2, "entertainment") publisher.subscribe(subscriber_3, "sports") # 发布者发布一条属于'sports'主题的消息 publisher.publish("Soccer match result", "sports") # 订阅者1接收并处理消息 subscriber_1.receive()
使用消息驱动进行验证
下面我们将以果壳cache的验证过程为例,来介绍消息驱动在验证中的使用。 完整代码参见。
from util.simplebus import SimpleBusWrapper
from tools.colorprint import Color as cl
import xspcomm as xsp
import queue
# 请求消息类,用于封装通信请求的详细信息
class ReqMsg:
def __init__(self, addr, cmd, user=0x123, size=7, mask=0, data=0):
self.user = user
self.size = size
self.addr = addr
self.cmd = cmd
self.mask = mask
self.data = data
def display(self):
print(f"[REQ MSG] user {self.user:x}, size {self.size}, addr 0x{self.addr:x} "
f"cmd 0x{self.cmd:x}, mask {self.mask:b}, data {self.data:x}")
# 缓存包装器类,模拟缓存的行为并与外部总线通信
class CacheWrapper:
def __init__(self, io_bus: SimpleBusWrapper, clk: xsp.XClock, cache_port: xsp.XPort):
self.xclk = clk
# 简单总线包装器,用于与外部通信
self.io_bus = io_bus
# 缓存端口,可能用于与外部组件交互
self.cache_port = cache_port
# 初始化请求队列,用于存储即将处理的请求消息
self.req_que = queue.Queue()
# 初始化响应队列,用于存储处理完的响应消息
self.resp_que = queue.Queue()
# 注册硬件时钟上升沿的回调方法,用于处理请求和响应
self.xclk.StepRis(self.__callback)
# 发起一个读请求
def trigger_read_req(self, addr):
# 将读请求消息放入请求队列,不等待队列锁定
self.req_que.put_nowait(ReqMsg(addr=addr, cmd=self.io_bus.cmd_read))
# 发起一个写请求
def trigger_write_req(self, addr, data, mask):
# 将写请求消息放入请求队列,不等待队列锁定
self.req_que.put_nowait(ReqMsg(addr=addr, cmd=self.io_bus.cmd_write, mask=mask, data=data))
# 接收响应
def recv(self):
# 等待响应队列非空,然后取出响应
while self.resp_que.empty():
self.xclk.Step(1)
return self.resp_que.get()
# 读取数据
def read(self, addr):
# 发起读请求,然后等待并返回响应
self.trigger_read_req(addr)
return self.recv()
# 写入数据
def write(self, addr, data, mask):
# 发起写请求,然后等待并返回响应
self.trigger_write_req(addr, data, mask)
return self.recv()
# 重置缓存
def reset(self):
# 设置复位信号,等待一定时钟周期,然后清除复位信号
self.cache_port["reset"].value = 1
self.xclk.Step(100)
self.cache_port["reset"].value = 0
self.cache_port["io_flush"].value = 0
# 等待请求准备就绪信号
while not self.io_bus.IsReqReady():
self.xclk.Step(1)
# 硬件时钟上升沿的回调方法
def __callback(self, *a, **b):
# 处理请求
if self.io_bus.IsReqSend():
# 如果有请求发送,从请求队列取出一个请求
self.req_que.get()
# 检查请求队列是否为空
if self.req_que.empty():
# 如果请求队列为空,向io_bus发送无效请求信号
self.io_bus.ReqUnValid()
else:
# 如果请求队列不为空,向io_bus发送有效请求信号
self.io_bus.ReqSetValid()
# 取出队首的请求消息
msg: ReqMsg = self.req_que.queue[0]
# 根据请求命令类型,执行读或写操作
if msg.cmd == self.io_bus.cmd_read:
self.io_bus.ReqReadData(msg.addr)
if msg.cmd == self.io_bus.cmd_write:
self.io_bus.ReqWriteData(msg.addr, msg.data, msg.mask)
# 处理接收
self.io_bus.port["resp_ready"].value = 1
# 如果响应有效,从io_bus获取响应数据,并放入响应队列
if self.io_bus.IsRespValid():
res = self.io_bus.get_resp_rdata()
self.resp_que.put_nowait(res)
在上述代码中,进行消息驱动的流程如下:
- 封装软件激励:
- 软件激励首先被封装进ReqMsg对象中,这个对象包含了所有必要的信息,如地址、命令、数据等。此处以果壳cache的验证为例。
- 使用消息队列存储请求:
- 封装后的请求被放入CacheWrapper类的请求队列req_que中。这个队列作为软件激励的缓冲区,允许软件在任何时刻发送请求,而不必等待硬件的即时响应。
- 解耦的回调机制:
- 在硬件时钟上升沿,CacheWrapper类的__callback方法被触发。这个方法检查请求队列中是否有待处理的请求,并根据当前的硬件状态决定是否处理这些请求。这是解耦过程中的关键步骤,因为它将软件激励的发送与硬件时序的处理分离开来。
- 模拟硬件响应:
- 封装后的请求被放入CacheWrapper类的请求队列req_que中。这个队列作为软件激励的缓冲区,允许软件在任何时刻发送请求,而不必等待硬件的即时响应。
- 软件接收响应:
- 软件可以通过CacheWrapper类的recv方法从响应队列中取出响应。这个过程是同步的,但它允许软件在任何时刻检查响应队列,而不是必须在特定的硬件时序点上。
通过上述过程,软件的请求(激励)和硬件的响应(时序)被有效地解耦。软件可以自由地发送请求,而硬件则在适当的时序下处理这些请求,生成响应。这样的分离确保了软件的开发和测试可以与硬件的具体行为相独立,极大提升了开发效率和系统的可扩展性。为了避免每次都手动编写代码,我们提供了一个名为MLVP的框架,它包含了一系列现成的消息驱动方法。
验证框架
MLVP 验证框架。通过 Picker 工具生成的 Python DUT,已经可以使我们在 Python 环境中进行简单的验证,包括对 DUT 的实例化、信号赋值、时钟驱动等操作。但是在实际的验证工作中,我们通常需要更为高级的验证特性,例如协程支持、覆盖率收集与报告生成等功能。为此,我们提供了 MLVP(Multi-language Verification Platform) 验证框架,用于提供这些高级验证特性。
目前,MLVP 验证框架支持的功能包括:
- 协程支持:MLVP 验证框架提供了协程支持,使用户可以方便地编写异步验证代码。
- 覆盖率收集与报告生成:MLVP 验证框架提供了覆盖率收集与报告生成功能,使用户可以方便地收集覆盖率数据,并生成覆盖率报告。
- 日志记录:MLVP 验证框架提供了日志记录功能,使用户可以方便地记录验证过程中的信息。
- 接口 :MLVP 验证框架提供了接口的创建,方便用户定义一组用于完成某个特定功能的接口集合,同时也使得软件模块的编写与 DUT 的具体实现解耦。
- 验证实用模块: MLVP 验证框架提供了一些验证实用模块,方便用户编写软件模块,目前包含 “两比特饱和计数器”, “伪 LRU 算法” 等。
有关 MLVP 验证框架的详细使用方法,请参见 MLVP。
标签:__,异步,RV,函数,--,self,教程,dut,def From: https://www.cnblogs.com/shiyue1702/p/18319764