Initial version
This commit is contained in:
139
dog-trainer.py
Executable file
139
dog-trainer.py
Executable file
@@ -0,0 +1,139 @@
|
||||
import RPi.GPIO as GPIO
|
||||
import time
|
||||
from datetime import datetime
|
||||
import subprocess
|
||||
import sys
|
||||
|
||||
PLAY_DEVICE = "my_usb_sound_device_with_converter" # as defined in '~/.asoundrc'
|
||||
REC_DEVICE = "my_usb_sound_device" # as defined in '~/.asoundrc'
|
||||
MAX_RECORDING_TIME = 30 # limit recordings to only e.g. 30 seconds long
|
||||
TRIM_RECORDING_END = 0.8 # remove the clicking noise from the end of recordings
|
||||
RECORD_BUTTON_PRESS_TIME = 2 # pressing and holding a button longer then e.g. 2 seconds triggers recording otherwise playing
|
||||
PINS = [17, 23] # listen to changes on this GPIOs e.g. [4, 17, 23, 0, 5, 6]
|
||||
GPIO.setmode(GPIO.BCM) # use BCM pin layout
|
||||
GPIO.setwarnings(False)
|
||||
|
||||
|
||||
def setup_pins():
|
||||
for pin in PINS:
|
||||
GPIO.setup(pin, GPIO.IN, pull_up_down=GPIO.PUD_OFF)
|
||||
|
||||
|
||||
def terminate_recording(proc, pin):
|
||||
print("Terminating recording...")
|
||||
proc.terminate()
|
||||
proc.wait(timeout=5)
|
||||
print(f"Cutting last {TRIM_RECORDING_END} second (click noise) from the recording")
|
||||
audio_duration = subprocess.run(
|
||||
[
|
||||
"ffprobe",
|
||||
"-v",
|
||||
"error",
|
||||
"-select_streams",
|
||||
"a:0",
|
||||
"-show_entries",
|
||||
"stream=duration",
|
||||
"-of",
|
||||
"default=noprint_wrappers=1:nokey=1",
|
||||
f"recording_temp_{pin}.wav",
|
||||
],
|
||||
stdout=subprocess.PIPE,
|
||||
).stdout.decode("utf-8")
|
||||
subprocess.call(
|
||||
[
|
||||
"ffmpeg",
|
||||
"-hide_banner",
|
||||
"-y",
|
||||
"-i",
|
||||
f"recording_temp_{pin}.wav",
|
||||
"-t",
|
||||
f"{float(audio_duration) - TRIM_RECORDING_END}",
|
||||
"-acodec",
|
||||
"copy",
|
||||
f"recording_{pin}.wav",
|
||||
]
|
||||
)
|
||||
DATE_TIME = datetime.now().strftime("%Y-%m-%d_%H:%M:%S")
|
||||
print(f"Archiving old sound to archive/recording_{pin}_{DATE_TIME}.wav")
|
||||
subprocess.call(
|
||||
["mv", f"recording_temp_{pin}.wav", f"archive/recording_{pin}__{DATE_TIME}.wav"]
|
||||
)
|
||||
play_sound(pin)
|
||||
|
||||
|
||||
def play(file):
|
||||
subprocess.call(["aplay", "-D", PLAY_DEVICE, file])
|
||||
|
||||
|
||||
def play_sound(pin):
|
||||
play(f"recording_{pin}.wav")
|
||||
|
||||
|
||||
def play_beep():
|
||||
play("beep.wav")
|
||||
|
||||
|
||||
def play_intro():
|
||||
play("Windows_95.wav")
|
||||
|
||||
|
||||
def record_sound(pin):
|
||||
print(f"GPIO {pin} LOW - Starting recording_temp_{pin}.wav")
|
||||
proc = subprocess.Popen(
|
||||
[
|
||||
"arecord",
|
||||
"-D",
|
||||
REC_DEVICE,
|
||||
"-f",
|
||||
"S16_LE",
|
||||
"-r",
|
||||
"16000",
|
||||
"-c",
|
||||
"1",
|
||||
"-d",
|
||||
f"{MAX_RECORDING_TIME}",
|
||||
f"recording_temp_{pin}.wav",
|
||||
]
|
||||
)
|
||||
print(f"Recording in progress (max {MAX_RECORDING_TIME} seconds)...")
|
||||
wait_for_button_release_or_timeout(pin, MAX_RECORDING_TIME)
|
||||
print("GPIO went HIGH - terminating recording")
|
||||
play_beep()
|
||||
terminate_recording(proc, pin)
|
||||
|
||||
|
||||
def wait_for_button_release_or_timeout(pin, timeout):
|
||||
timeout_start = time.time()
|
||||
while time.time() < timeout_start + timeout:
|
||||
if GPIO.input(pin) == GPIO.HIGH:
|
||||
break
|
||||
time.sleep(0.05)
|
||||
|
||||
|
||||
def handle_pin(pin):
|
||||
print(f"GPIO {pin} LOW - event triggered")
|
||||
timeout_start = time.time()
|
||||
wait_for_button_release_or_timeout(pin, RECORD_BUTTON_PRESS_TIME)
|
||||
|
||||
if time.time() - timeout_start >= RECORD_BUTTON_PRESS_TIME:
|
||||
# Pin was LOW for longer time; recording
|
||||
play_beep()
|
||||
record_sound(pin)
|
||||
else:
|
||||
# Pin was LOW for shorter time; playing
|
||||
play_sound(pin)
|
||||
|
||||
|
||||
try:
|
||||
setup_pins()
|
||||
subprocess.call(["mkdir", "-p", "archive"])
|
||||
play_intro()
|
||||
for pin in PINS:
|
||||
GPIO.add_event_detect(pin, GPIO.FALLING, callback=handle_pin, bouncetime=1000)
|
||||
while True:
|
||||
time.sleep(0.01)
|
||||
except Exception:
|
||||
print("Cought exception...")
|
||||
sys.exit()
|
||||
finally:
|
||||
GPIO.cleanup()
|
||||
Reference in New Issue
Block a user