Python bindings for a C++ serial port library providing asynchronous serial communication support.
- Non-blocking and blocking read/write operations.
- Cross-platform support for Windows, Linux, macOS, and FreeBSD.
- Event-driven architecture for handling data events.
- Support for gevent, eventlet, asyncio, callback, and synchronous operations.
- Uses epoll, iocp, and kqueue for efficient, scalable I/O operations based on the underlying operating system.
You can install the async-pyserial package using either poetry or pip.
### Using Poetry
Install poetry if you haven't already:
curl -sSL https://install.python-poetry.org | python3 -
Add async-pyserial to your project:
poetry add async-pyserial
### Using pip
Install the package from PyPI:
pip install async-pyserial
### FreeBSD Installation
For FreeBSD, you need to build the package manually:
Clone the repository:
git clone https://github.com/Lei-k/async-pyserial.git
Navigate to the project directory:
cd async-pyserial
Install the dependencies using poetry:
poetry install
Build the package:
python -m build
Install the package:
pip install dist/*.whl
Here's a simple example of how to use async-pyserial:
from async_pyserial import SerialPort, SerialPortOptions, SerialPortEvent, SerialPortParity
def on_data(data):
print(f"Received: {data}")
options = SerialPortOptions()
options.baudrate = 9600
options.bytesize = 8
options.stopbits = 1
options.parity = SerialPortParity.NONE # NONE, ODD, EVEN
serial_port = SerialPort('/dev/ttyUSB0', options)
serial_port.on(SerialPortEvent.ON_DATA, on_data)
serial_port.open()
try:
while True:
data_to_send = input("Enter data to send (or 'exit' to quit): ")
if data_to_send.lower() == 'exit':
break
serial_port.write(data_to_send.encode('utf-8'))
finally:
serial_port.close()
### SerialPort A class for serial communication.
#### Methods
- __init__(self, port: str, options: SerialPortOptions): Initializes the serial port with the specified parameters.
- def write(self, data: bytes, callback: Callable | None = None): Writes data to the serial port. Can be blocking or non-blocking. If a callback is provided, the write will be asynchronous. Supports gevent, eventlet, asyncio, callback, and synchronous operations.
- def read(self, bufsize: int = 512, callback: Callable | None = None): Reads data from the serial port. Can be blocking or non-blocking. If a callback is provided, the read will be asynchronous. Supports gevent, eventlet, asyncio, callback, and synchronous operations.
- def open(self): Opens the serial port.
- def close(self): Closes the serial port.
- def on(self, event: SerialPortEvent, callback: Callable[[bytes], None]): Registers a callback for the specified event.
- def emit(self, evt: str, *args, **kwargs): Emits an event, triggering all registered callbacks for that event.
- def remove_all_listeners(self, evt: str): Removes all listeners for the specified event.
- def remove_listener(self, evt: str, listener: Callable): Removes a specific listener for the specified event.
- def off(self, evt: str, listener: Callable): Alias for remove_listener.
### SerialPortOptions A class for specifying serial port options.
#### Attributes
- baudrate: int: The baud rate for the serial port.
- bytesize: int: The number of data bits.
- stopbits: int: The number of stop bits.
- parity: int: The parity checking (0: None, 1: Odd, 2: Even).
- read_timeout: int: The read timeout in milliseconds.
- write_timeout: int: The write timeout in milliseconds.
- read_bufsize: int: The read buffer size. Default is 0. When read_bufsize is 0, the internal buffer is not used, and only data received after the read call will be returned. If read_bufsize is not 0, both buffered and new data will be returned.
### SerialPortEvent An enumeration for serial port events.
- ON_DATA: Event triggered when data is received.
### SerialPortError An exception class for handling serial port errors.
- __init__(self, *args: object): Initializes the SerialPortError with the specified arguments.
### PlatformNotSupported An exception class for handling unsupported platforms.
- __init__(self, *args: object): Initializes the PlatformNotSupported exception with the specified arguments.
### set_async_worker A function for setting the asynchronous worker.
- def set_async_worker(w: str, loop = None): Sets the asynchronous worker to gevent, eventlet, or asyncio. Optionally, an event loop can be provided for asyncio.
The examples directory contains sample scripts demonstrating how to use async-pyserial for various operations. Below are a few examples to help you get started.
- Basic read and write operations.
- Non-blocking read with asyncio.
- Using gevent and eventlet for asynchronous operations.
Example scripts included in the examples directory:
- interval_write.py: Demonstrates periodic writing to the serial port.
- serialport_read.py: Demonstrates reading from the serial port with different async workers.
- serialport_terminal.py: A terminal interface for interacting with the serial port.
Supports Windows, Linux, macOS, and FreeBSD.
To contribute to the project, follow these steps:
Clone the repository:
git clone https://github.com/Lei-k/async-pyserial.git
Navigate to the project directory:
cd async-pyserial
Install the dependencies using poetry:
poetry install
Run the tests:
poetry run pytest
This project is licensed under the MIT License. See the LICENSE file for more details.