该程序从流式麦克风获取输入,并通过在listen_print_loop() 中对其进行处理来生成输出。 点击有效,但向上或向下移动仅有效一次。 在添加 pydub 库之前它工作正常。
import queue
import sys
import pyautogui as pag
from pydub import AudioSegment
from pydub.playback import play
from google.api_core.client_options import ClientOptions
from google.cloud import speech
import pyaudio
RATE = 16000
CHUNK = int(RATE / 10)
class MicrophoneStream:
def __init__(self: object, rate: int = RATE, chunk: int = CHUNK) -> None:
self._rate = rate
self._chunk = chunk
self._buff = queue.Queue()
self.closed = True
def __enter__(self: object) -> object:
self._audio_interface = pyaudio.PyAudio()
self._audio_stream = self._audio_interface.open(
format=pyaudio.paInt16,
channels=1,
rate=self._rate,
input=True,
frames_per_buffer=self._chunk,
stream_callback=self._fill_buffer,
)
self.closed = False
return self
def __exit__(
self: object,
type: object,
value: object,
traceback: object,
) -> None:
self._audio_stream.stop_stream()
self._audio_stream.close()
self.closed = True
self._buff.put(None)
self._audio_interface.terminate()
def _fill_buffer(
self: object,
in_data: object,
frame_count: int,
time_info: object,
status_flags: object,
) -> object:
self._buff.put(in_data)
return None, pyaudio.paContinue
def generator(self: object) -> object:
while not self.closed:
chunk = self._buff.get()
if chunk is None:
return
data = [chunk]
while True:
try:
chunk = self._buff.get(block=False)
if chunk is None:
return
data.append(chunk)
except queue.Empty:
break
yield b"".join(data)
def listen_print_loop(responses: object) -> str:
b= 1
b2= 0
num_chars_printed = 0
for response in responses:
if not response.results:
continue
result = response.results[0]
if not result.alternatives:
continue
transcript = result.alternatives[0].transcript
overwrite_chars = " " * (num_chars_printed - len(transcript))
if not result.is_final:
sys.stdout.write(transcript + overwrite_chars + "\r")
sys.stdout.flush()
num_chars_printed = len(transcript)
else:
print(transcript + overwrite_chars)
if b == 1:
if transcript.strip()== "exit":
play(AudioSegment.from_wav("5.wav"))
sys.exit()
if transcript.strip()== "single click":
pag.click()
play(AudioSegment.from_wav("2.wav"))
elif transcript.strip()== "double click":
pag.doubleClick()
play(AudioSegment.from_wav("2.wav"))
elif transcript.strip()== "right click":
pag.click(button= "right")
play(AudioSegment.from_wav("2.wav"))
elif transcript[:7].strip()== "move up":
pag.move( 0, -int(transcript[8:].replace(" ", "").replace(",", "")), 0.5, pag.easeOutQuad)
play(AudioSegment.from_wav("2.wav"))
elif transcript[:9].strip()== "move down":
pag.move( 0, int(transcript[10:].replace(" ", "").replace(",", "")), 0.5, pag.easeOutQuad)
play(AudioSegment.from_wav("2.wav"))
num_chars_printed = 0
return transcript
def main() -> None:
play(AudioSegment.from_wav("1.wav"))
play(AudioSegment.from_wav("2.wav"))
language_code = "en-US"
client = speech.SpeechClient.from_service_account_file('Key.json')
config = speech.RecognitionConfig(
encoding=speech.RecognitionConfig.AudioEncoding.LINEAR16,
sample_rate_hertz=RATE,
language_code=language_code,
)
streaming_config = speech.StreamingRecognitionConfig(
config=config, interim_results=True
)
with MicrophoneStream(RATE, CHUNK) as stream:
audio_generator = stream.generator()
requests = (
speech.StreamingRecognizeRequest(audio_content=content)
for content in audio_generator
)
responses = client.streaming_recognize(streaming_config, requests)
listen_print_loop(responses)
if __name__ == "__main__":
main()
我重新启动了 VS Code 以检查这是否只是一个小问题,但它不起作用。
问题在于变量
b
在
listen_print_loop
函数中被设置为
1
,并且在条件语句之外从未被修改过。这意味着该条件
if b == 1:
始终为真,因此与“向上移动”和“向下移动”相关的代码块只会被执行
一次
,即第一次满足此条件时。
为了解决这个问题,需要在每次成功识别“向上移动”或“向下移动”命令后将
b
的值更改为其他值(例如
0
)。
以下是更新后的
listen_print_loop
函数:
def listen_print_loop(responses: object) -> str:
num_chars_printed = 0
b = 1 # 初始化 b
for response in responses:
# ... [现有代码] ...
else:
print(transcript + overwrite_chars)
if b == 1:
if transcript.strip() == "exit":
play(AudioSegment.from_wav("5.wav"))
sys.exit()
elif transcript.strip() == "single click":
pag.click()
play(AudioSegment.from_wav("2.wav"))
elif transcript.strip() == "double click":
pag.doubleClick()
play(AudioSegment.from_wav("2.wav"))
elif transcript.strip() == "right click":
pag.click(button="right")
play(AudioSegment.from_wav("2.wav"))
elif transcript[:7].strip() == "move up":
pag.move(0, -int(transcript[8:].replace(" ", "").replace(",", "")), 0.5, pag.easeOutQuad)
play(AudioSegment.from_wav("2.wav"))
b = 0 # 修改 b 的值
elif transcript[:9].strip() == "move down":
pag.move(0, int(transcript[10:].replace(" ", "").replace(",", "")), 0.5, pag.easeOutQuad)
play(AudioSegment.from_wav("2.wav"))
b = 0 # 修改 b 的值
num_chars_printed = 0
return transcript
通过在每次成功识别“向上移动”或“向下移动”后将
b
设置为
0
,确保
if b == 1:
条件在第一次之后不再为真,从而允许后续识别这些命令。
请注意,变量
b2
似乎没有被使用,可以将其从代码中删除。