1''' Top-level python bindings for the lircd socket interface. '''
28from abc
import ABCMeta, abstractmethod
41_DEFAULT_PROG =
'lircd-client'
44def get_default_socket_path() -> str:
45 ''' Get default value for the lircd socket path, using (falling priority):
47 - The environment variable LIRC_SOCKET_PATH.
48 - The 'output' value in the lirc_options.conf file if value and the
49 corresponding file exists.
50 - A hardcoded default lirc.config.VARRUNDIR/lirc/lircd, possibly
54 if 'LIRC_SOCKET_PATH' in os.environ:
55 return os.environ[
'LIRC_SOCKET_PATH']
56 path = lirc.config.SYSCONFDIR +
'/lirc/lirc_options.conf'
57 if sys.version_info < (3, 2):
58 parser = configparser.SafeConfigParser()
60 parser = configparser.ConfigParser()
63 except configparser.Error:
66 if parser.has_section(
'lircd'):
68 path = str(parser.get(
'lircd',
'output'))
69 if os.path.exists(path):
71 except configparser.NoOptionError:
73 return lirc.config.VARRUNDIR +
'/lirc/lircd'
def get_default_lircrc_path() -> str:
    ''' Get default path to the lircrc file according to (falling priority):

    - $XDG_CONFIG_HOME/lircrc if environment variable and file exists.
    - ~/.config/lircrc if it exists.
    - ~/.lircrc if it exists.
    - A hardcoded default lirc.config.SYSCONFDIR/lirc/lircrc, whether
      it exists or not.

    Returns: the first candidate path that exists on disk, else the
    hardcoded default.
    '''
    candidates = []
    if 'XDG_CONFIG_HOME' in os.environ:
        candidates.append(
            os.path.join(os.environ['XDG_CONFIG_HOME'], 'lircrc'))
    home = os.path.expanduser('~')
    # '.config' and 'lircrc' must be separate join() components: two
    # adjacent string literals concatenate into '.configlircrc', which
    # made the ~/.config/lircrc location impossible to find.
    candidates.append(os.path.join(home, '.config', 'lircrc'))
    candidates.append(os.path.join(home, '.lircrc'))
    for path in candidates:
        if os.path.exists(path):
            return path
    # Last resort, returned without checking for existence.
    return os.path.join(lirc.config.SYSCONFDIR, 'lirc', 'lircrc')
class BadPacketException(Exception):
    ''' Signals a malformed or otherwise unparsable packet. '''
class TimeoutException(Exception):
    ''' Signals a timeout while receiving data from the remote host. '''
159 ''' Abstract interface for all connections. '''
164 def __exit__(self, exc_type, exc, traceback):
168 def readline(self, timeout: float =
None) -> str:
169 ''' Read a buffered line
173 - If set to 0 immediately return either a line or None.
174 - If set to None (default mode) use blocking read.
176 Returns: code string as described in lircd(8) without trailing
179 Raises: TimeoutException if timeout > 0 expires.
185 ''' Return the file nr used for IO, suitable for select() etc. '''
190 ''' Return true if next readline(None) won't block . '''
195 ''' Close/release all resources '''
199class RawConnection(AbstractConnection):
200 ''' Interface to receive code strings as described in lircd(8).
203 - socket_path: lircd output socket path, see get_default_socket_path()
205 - prog: Program name used in lircrc decoding, see ircat(1). Could be
206 omitted if only raw keypresses should be read.
211 def __init__(self, socket_path: str =
None, prog: str = _DEFAULT_PROG):
213 os.environ[
'LIRC_SOCKET_PATH'] = socket_path
216 _client.lirc_deinit()
217 fd = _client.lirc_init(prog)
218 self.
_socket = socket.fromfd(fd, socket.AF_UNIX, socket.SOCK_STREAM)
219 self.
_select = selectors.DefaultSelector()
223 def readline(self, timeout: float =
None) -> str:
224 ''' Implements AbstractConnection.readline(). '''
226 start = time.perf_counter()
229 start + timeout - time.perf_counter()
if timeout
else timeout)
233 "readline: no data within %f seconds" % timeout)
238 raise ConnectionResetError(
'Connection lost')
241 return line.decode(
'ascii',
'ignore')
244 ''' Implements AbstractConnection.fileno(). '''
248 ''' Implements AbstractConnection.has_data() '''
252 ''' Implements AbstractConnection.close() '''
254 _client.lirc_deinit()
257AbstractConnection.register(RawConnection)
261 ''' Interface to receive lircrc-translated keypresses. This is basically
262 built on top of lirc_code2char() and as such supporting centralized
263 translations using lircrc_class. See lircrcd(8).
266 - program: string, used to identify client. See ircat(1)
267 - lircrc: lircrc file path. See get_default_lircrc_path() for defaults.
268 - socket_path: lircd output socket path, see get_default_socket_path()
273 def __init__(self, program: str,
274 lircrc_path: str =
None,
275 socket_path: str =
None):
279 raise FileNotFoundError(
'Cannot find lircrc config file.')
281 self.
_lircrc = _client.lirc_readconfig(lircrc_path)
285 def readline(self, timeout: float =
None):
286 ''' Implements AbstractConnection.readline(). '''
293 if not strings
or len(strings) == 0:
301 ''' Implements AbstractConnection.has_data() '''
305 ''' Implements AbstractConnection.fileno(). '''
309 ''' Implements AbstractConnection.close() '''
311 _client.lirc_freeconfig(self.
_lircrc)
314AbstractConnection.register(LircdConnection)
371 ''' Extends the parent with a send() method. '''
def __init__(self, socket_path: str = None):
    ''' Set up the connection by delegating to RawConnection.

    Parameters:
      - socket_path: forwarded unchanged to RawConnection.__init__().
    '''
    RawConnection.__init__(self, socket_path=socket_path)
376 def send(self, command: (bytearray, str)):
377 ''' Send single line over socket '''
378 if not isinstance(command, bytearray):
379 command = command.encode(
'ascii')
380 while len(command) > 0:
382 command = command[sent:]
386 ''' Public reply parser result, available when completed. '''
393 ''' Command, parser and connection container with a run() method. '''
395 def __init__(self, cmd: str,
396 connection: AbstractConnection,
397 timeout: float = 0.4):
398 self.
_conn = connection
402 def run(self, timeout: float =
None):
403 ''' Run the command and return a Reply. Timeout as of
404 AbstractConnection.readline()
407 while not self.
_parser.is_completed():
416 ''' The status/result from parsing a command reply.
419 result: Enum Result, reflects parser state.
420 success: bool, reflects SUCCESS/ERROR.
421 data: List of lines, the command DATA payload.
422 sighup: bool, reflects if a SIGHUP package has been received
423 (these are otherwise ignored).
424 last_line: str, last input line (for error messages).
427 self.
result = Result.INCOMPLETE
435 ''' Handles the actual parsing of a command reply. '''
439 self._state = self._State.BEGIN
440 self._lines_expected =
None
441 self._buffer = bytearray(0)
443 def is_completed(self) -> bool:
444 ''' Returns true if no more reply input is required. '''
447 def feed(self, line: str):
448 ''' Enter a line of data into parsing FSM, update state. '''
451 self._State.BEGIN: self._begin,
452 self._State.COMMAND: self._command,
453 self._State.RESULT: self._result,
454 self._State.DATA: self._data,
455 self._State.LINE_COUNT: self._line_count,
456 self._State.LINES: self._lines,
457 self._State.END: self._end,
458 self._State.SIGHUP_END: self._sighup_end
465 if self.
_state == self._State.DONE:
475 ''' Internal FSM state. '''
487 def _bad_packet_exception(self, line):
490 'Cannot parse: %s\nat state: %s\n' % (line, self.
_state))
492 def _begin(self, line):
494 self._state = self._State.COMMAND
496 def _command(self, line):
498 self._bad_packet_exception(line)
499 elif line ==
'SIGHUP':
500 self._state = self._State.SIGHUP_END
503 self._state = self._State.RESULT
505 def _result(self, line):
506 if line
in [
'SUCCESS',
'ERROR']:
507 self.success = line ==
'SUCCESS'
508 self._state = self._State.DATA
510 self._bad_packet_exception(line)
512 def _data(self, line):
514 self._state = self._State.DONE
516 self._state = self._State.LINE_COUNT
518 self._bad_packet_exception(line)
520 def _line_count(self, line):
522 self._lines_expected = int(line)
524 self._bad_packet_exception(line)
525 if self._lines_expected == 0:
526 self._state = self._State.END
528 self._state = self._State.LINES
def _lines(self, line):
    ''' Store one payload line; switch to END once enough are stored. '''
    collected = self.data
    collected.append(line)
    if len(collected) < self._lines_expected:
        # Still waiting for more payload lines; stay in current state.
        return
    self._state = self._State.END
535 def _end(self, line):
537 self._bad_packet_exception(line)
538 self._state = self._State.DONE
540 def _sighup_end(self, line):
542 ReplyParser.__init__(self)
545 self._bad_packet_exception(line)
563 ''' Simulate a button press, see SIMULATE in lircd(8) manpage. '''
def __init__(self, connection: AbstractConnection,
             remote: str, key: str, repeat: int = 1, keycode: int = 0):
    ''' Build the SIMULATE command line and hand it to Command.

    Parameters:
      - connection: passed on to Command.__init__().
      - remote: remote name, last field of the command line.
      - key: key name, third field of the command line.
      - repeat: coerced with int() and zero-padded to two digits.
      - keycode: coerced with int() and zero-padded to 16 digits.
    '''
    fields = (int(keycode), int(repeat), key, remote)
    Command.__init__(self, 'SIMULATE %016d %02d %s %s\n' % fields,
                     connection)
574 ''' List available remotes, see LIST in lircd(8) manpage. '''
def __init__(self, connection: AbstractConnection):
    ''' Hand the fixed 'LIST' command line to Command.__init__(). '''
    cmd = 'LIST\n'
    Command.__init__(self, cmd, connection)
581 ''' List available keys in given remote, see LIST in lircd(8) manpage. '''
def __init__(self, connection: AbstractConnection, remote: str):
    ''' Build the 'LIST <remote>' command line and hand it to Command.

    Parameters:
      - connection: passed on to Command.__init__().
      - remote: remote name interpolated into the command line.
    '''
    cmd = 'LIST %s\n' % remote
    Command.__init__(self, cmd, connection)
588 ''' Start repeating given key, see SEND_START in lircd(8) manpage. '''
def __init__(self, connection: AbstractConnection,
             remote: str, key: str):
    ''' Build the SEND_START command line and hand it to Command.

    Parameters:
      - connection: passed on to Command.__init__().
      - remote: remote name, first argument of the command line.
      - key: key name, second argument of the command line.
    '''
    args = (remote, key)
    Command.__init__(self, 'SEND_START %s %s\n' % args, connection)
597 ''' Stop repeating given key, see SEND_STOP in lircd(8) manpage. '''
def __init__(self, connection: AbstractConnection,
             remote: str, key: str):
    ''' Build the SEND_STOP command line and hand it to Command.

    Parameters:
      - connection: passed on to Command.__init__().
      - remote: remote name, first argument of the command line.
      - key: key name, second argument of the command line.
    '''
    args = (remote, key)
    Command.__init__(self, 'SEND_STOP %s %s\n' % args, connection)
606 ''' Send given key, see SEND_ONCE in lircd(8) manpage. '''
608 def __init__(self, connection: AbstractConnection,
609 remote: str, keys: str):
611 raise ValueError(
'No keys to send given')
612 cmd =
'SEND_ONCE %s %s\n' % (remote,
' '.join(keys))
613 Command.__init__(self, cmd, connection)
617 ''' Set transmitters to use, see SET_TRANSMITTERS in lircd(8) manpage.
620 transmitter: Either a bitmask or a list of int describing active
624 def __init__(self, connection: AbstractConnection,
625 transmitters: (int, list)):
626 if isinstance(transmitters, list):
628 for transmitter
in transmitters:
629 mask |= (1 << (int(transmitter) - 1))
632 cmd =
'SET_TRANSMITTERS %d\n' % mask
633 Command.__init__(self, cmd, connection)
637 ''' Get lircd version, see VERSION in lircd(8) manpage. '''
def __init__(self, connection: AbstractConnection):
    ''' Hand the fixed 'VERSION' command line to Command.__init__(). '''
    cmd = 'VERSION\n'
    Command.__init__(self, cmd, connection)
644 ''' Set a driver option value, see DRV_OPTION in lircd(8) manpage. '''
def __init__(self, connection: AbstractConnection,
             option: str, value: str):
    ''' Build the DRV_OPTION command line and hand it to Command.

    Parameters:
      - connection: passed on to Command.__init__().
      - option: option name, first argument of the command line.
      - value: option value, second argument of the command line.
    '''
    args = (option, value)
    Command.__init__(self, 'DRV_OPTION %s %s\n' % args, connection)
653 ''' Start/stop logging lircd output , see SET_INPUTLOG in lircd(8)
def __init__(self, connection: AbstractConnection,
             logfile: str = None):
    ''' Build the SET_INPUTLOG command line and hand it to Command.

    Parameters:
      - connection: passed on to Command.__init__().
      - logfile: appended after a space when truthy; when None or
        empty the command is sent without an argument.
    '''
    if logfile:
        suffix = ' ' + logfile
    else:
        suffix = ''
    Command.__init__(self, 'SET_INPUTLOG' + suffix + '\n', connection)
674 ''' Identify client using the prog token, see IDENT in lircrcd(8) '''
676 def __init__(self, connection: AbstractConnection,
679 raise ValueError(
'The prog argument cannot be None')
680 cmd =
'IDENT {}\n'.format(prog)
681 Command.__init__(self, cmd, connection)
685 '''Translate a keypress to application string, see CODE in lircrcd(8) '''
687 def __init__(self, connection: AbstractConnection,
690 raise ValueError(
'The prog argument cannot be None')
691 Command.__init__(self,
'CODE {}\n'.format(code), connection)
695 '''Get current translation mode, see GETMODE in lircrcd(8) '''
def __init__(self, connection: AbstractConnection):
    ''' Hand the fixed 'GETMODE' command line to Command.__init__(). '''
    cmd = "GETMODE\n"
    Command.__init__(self, cmd, connection)
702 '''Set current translation mode, see SETMODE in lircrcd(8) '''
704 def __init__(self, connection: AbstractConnection,
707 raise ValueError(
'The mode argument cannot be None')
708 Command.__init__(self,
'SETMODE {}\n'.format(mode), connection)
Abstract interface for all connections.
int fileno(self)
Return the file descriptor number used for IO, suitable for select() etc.
close(self)
Close/release all resources.
bool has_data(self)
Return true if next readline(None) won't block.
str readline(self, float timeout=None)
Read a buffered line.
Malformed or otherwise unparsable packet received.
Translate a keypress to application string, see CODE in lircrcd(8)
Extends the parent with a send() method.
send(self,(bytearray, str) command)
Send single line over socket.
Command, parser and connection container with a run() method.
run(self, float timeout=None)
Run the command and return a Reply.
Set a driver option value, see DRV_OPTION in lircd(8) manpage.
Get current translation mode, see GETMODE in lircrcd(8)
Identify client using the prog token, see IDENT in lircrcd(8)
Interface to receive lircrc-translated keypresses.
int fileno(self)
Implements AbstractConnection.fileno().
bool has_data(self)
Implements AbstractConnection.has_data()
readline(self, float timeout=None)
Implements AbstractConnection.readline().
close(self)
Implements AbstractConnection.close()
List available keys in given remote, see LIST in lircd(8) manpage.
List available remotes, see LIST in lircd(8) manpage.
Interface to receive code strings as described in lircd(8).
int fileno(self)
Implements AbstractConnection.fileno().
close(self)
Implements AbstractConnection.close()
str readline(self, float timeout=None)
Implements AbstractConnection.readline().
bool has_data(self)
Implements AbstractConnection.has_data()
Handles the actual parsing of a command reply.
The status/result from parsing a command reply.
result
Enum Result, reflects parser state.
str last_line
str, last input line (for error messages).
success
bool, reflects SUCCESS/ERROR.
bool sighup
bool, reflects if a SIGHUP packet has been received.
list data
List of lines, the command DATA payload.
Public reply parser result, available when completed.
Send given key, see SEND_ONCE in lircd(8) manpage.
Start/stop logging lircd output, see SET_INPUTLOG in lircd(8) manpage.
Set current translation mode, see SETMODE in lircrcd(8)
Set transmitters to use, see SET_TRANSMITTERS in lircd(8) manpage.
Simulate a button press, see SIMULATE in lircd(8) manpage.
Start repeating given key, see SEND_START in lircd(8) manpage.
Stop repeating given key, see SEND_STOP in lircd(8) manpage.
Timeout receiving data from remote host.
Get lircd version, see VERSION in lircd(8) manpage.
str get_default_socket_path()
Get default value for the lircd socket path, using (falling priority):
str get_default_lircrc_path()
Get default path to the lircrc file according to (falling priority):