Source code for fx2

import struct
import usb1


__all__ = ["FX2Config", "FX2Device", "FX2DeviceError"]


VID_CYPRESS = 0x04B4
PID_FX2     = 0x8613

CFG_DISCON  = 1 << 6
CFG_400KHZ  = 1 << 0

REG_CPUCS   = 0xE600

REQ_RAM        = 0xA0
REQ_EEPROM_SB  = 0xA2
REQ_EXT_RAM    = 0xA3
REQ_RENUMERATE = 0xA8
REQ_EEPROM_DB  = 0xA9
REQ_PAGE_SIZE  = 0xB0


[docs]class FX2Config: """ Cypress FX2 EEPROM configuration data. vendor_id : int USB vendor ID, 16 bits. product_id : int USB product ID, 16 bits. device_id : int USB device ID, 16 bits in binary-coded decimal. disconnect : bool If ``True``, do not enumerate on startup. If ``False``, enumerate as a default FX2 device with specified VID/PID/DID. i2c_400khz : bool If ``True``, use 400 kHz I2C clock to read firmware from EEPROM. If ``False``, use 100 kHz clock. firmware : list Firmware to be loaded into on-chip program/data RAM. If empty, "C0 load" is used; if not, "C2 load" is used, and the command to bring the CPU out of reset is inserted automatically. """ def __init__(self, vendor_id=VID_CYPRESS, product_id=PID_FX2, device_id=0x0000, disconnect=False, i2c_400khz=False): self.vendor_id = vendor_id self.product_id = product_id self.device_id = device_id self.disconnect = disconnect self.i2c_400khz = i2c_400khz self.firmware = []
[docs] def append(self, addr, chunk): """ Append a command to load ``chunk`` at ``addr`` into on-chip program/data RAM on device startup. """ first, last = addr, addr + len(chunk) assert (first >= 0x0000 and last <= 0x4000 or first >= 0xE000 and last <= 0xE200) while len(chunk) > 0: # Split into 1023-byte chunks, since the hardware is not able to handle # length field over 10 bit. self.firmware.append((addr, bytes(chunk[:1023]))) addr += 1023 chunk = chunk[1023:]
[docs] def encode(self, max_size=None): """ Convert configuration to an image that can be loaded into an EEPROM. Returns the EEPROM image if the resulting configuration is smaller than ``max_size``, or raises :class:`ValueError` if it is not. """ data = bytearray() data.append(0xC0 if len(self.firmware) == 0 else 0xC2) # Windows wouldn't enumerate a device with these VID/PID. assert (self.vendor_id not in (0x0000, 0xFFFF) and self.product_id not in (0x0000, 0xFFFF)) data += struct.pack("<HHH", self.vendor_id, self.product_id, self.device_id) data.append((CFG_DISCON if self.disconnect else 0) | (CFG_400KHZ if self.i2c_400khz else 0)) if len(self.firmware) > 0: for (addr, chunk) in self.firmware: assert len(chunk) > 0 and len(chunk) < 1024 data += struct.pack(">HH", len(chunk), addr) data += chunk data += struct.pack(">HHb", 0x8001, REG_CPUCS, 0) if max_size is not None and len(data) > max_size: raise ValueError("configuration image exceeds maximum size") return data
[docs] @classmethod def decode(cls, data, partial=False): """ Parse configuration from an image loaded from an EEPROM. Returns ``None`` if the EEPROM image is empty (the EEPROM was erased), :class:`FX2Config` if it contains a valid configuration, or raises :class:`ValueError` if it does not. If ``partial`` is ``True``, only requires any data records present to be complete; if it is ``False``, it is an error unless the image contains the final data record. """ if data[0] == 0xFF: return None elif data[0] == 0xC0: has_firmware = False elif data[0] == 0xC2: has_firmware = True else: raise ValueError("Invalid load command") vendor_id, product_id, device_id, config_byte = \ struct.unpack_from("<HHHb", data, offset=1) if (vendor_id in (0x0000, 0xFFFF) or product_id in (0x0000, 0xFFFF)): raise ValueError("Invalid VID/PID") disconnect = (config_byte & CFG_DISCON) != 0 i2c_400khz = (config_byte & CFG_400KHZ) != 0 config = cls(vendor_id, product_id, device_id, disconnect, i2c_400khz) if has_firmware: offset = 8 while not partial or offset < len(data): if offset + 4 > len(data): raise ValueError("Truncated data record") length, addr = struct.unpack_from(">HH", data, offset) offset += 4 if length & 0x8000: break if offset + length > len(data): raise ValueError("Truncated data record") config.firmware.append((addr, data[offset:offset + length])) offset += length return config
def __eq__(self, other): return (self.vendor_id == other.vendor_id and self.product_id == other.product_id and self.device_id == other.device_id and self.disconnect == other.disconnect and self.i2c_400khz == other.i2c_400khz and self.firmware == other.firmware) def __ne__(self, other): return not (self == other)
[docs]class FX2DeviceError(Exception): """An exception raised on a communication error."""
[docs]class FX2Device: """ A Cypress FX2 series device. The initializer of this class locates the device by provided VID:PID pair, or raises a :exc:`FX2DeviceError`. usb : usb1.USBDeviceHandle Raw USB device handle. """ def __init__(self, vendor_id=VID_CYPRESS, product_id=PID_FX2): self.timeout = 1000 self.usb_context = usb1.USBContext() try: self.usb = self.usb_context.openByVendorIDAndProductID(vendor_id, product_id) except usb1.USBErrorAccess: raise FX2DeviceError("Cannot access device {:04x}:{:04x}" .format(vendor_id, product_id)) if self.usb is None: raise FX2DeviceError("Device {:04x}:{:04x} not found" .format(vendor_id, product_id)) try: self.usb.setAutoDetachKernelDriver(True) except usb1.USBErrorNotSupported: pass
[docs] def control_read(self, request_type, request, value, index, length, timeout=None): """ Issue an USB control read request with timeout defaulting to ``self.timeout``. """ if timeout is None: timeout = self.timeout return self.usb.controlRead(request_type, request, value, index, length, timeout)
[docs] def control_write(self, request_type, request, value, index, data, timeout=None): """ Issue an USB control write request with timeout defaulting to ``self.timeout``. """ if timeout is None: timeout = self.timeout self.usb.controlWrite(request_type, request, value, index, data, timeout)
[docs] def bulk_read(self, endpoint, length, timeout=None): """ Issue an USB bulk read request with timeout defaulting to ``self.timeout``. """ if timeout is None: timeout = self.timeout return self.usb.bulkRead(endpoint, length, timeout)
[docs] def bulk_write(self, endpoint, data, timeout=None): """ Issue an USB bulk write request with timeout defaulting to ``self.timeout``. """ if timeout is None: timeout = self.timeout self.usb.bulkWrite(endpoint, data, timeout)
[docs] def read_ram(self, addr, length): """ Read ``length`` bytes at ``addr`` from internal RAM. Note that not all memory can be addressed this way; consult the TRM. """ data = bytearray() while length > 0: chunk_length = min(length, 4096) if addr & 1: # unaligned data += self.control_read(usb1.REQUEST_TYPE_VENDOR, REQ_RAM, addr, 0, chunk_length + 1)[1:] else: data += self.control_read(usb1.REQUEST_TYPE_VENDOR, REQ_RAM, addr, 0, chunk_length) addr += chunk_length length -= chunk_length return data
[docs] def write_ram(self, addr, data): """ Write ``data`` to ``addr`` to internal RAM. Note that not all memory can be addressed this way; consult the TRM. """ while len(data) > 0: chunk_length = min(len(data), 4096) self.control_write(usb1.REQUEST_TYPE_VENDOR, REQ_RAM, addr, 0, data[:chunk_length]) addr += chunk_length data = data[chunk_length:]
[docs] def cpu_reset(self, is_reset): """Bring CPU in or out of reset.""" self.write_ram(REG_CPUCS, [1 if is_reset else 0])
[docs] def load_ram(self, chunks): """ Write ``chunks``, a list of ``(address, data)`` pairs, to internal RAM, and start the CPU core. See also ``write_ram``. """ self.cpu_reset(True) for address, data in chunks: self.write_ram(address, data) self.cpu_reset(False)
@staticmethod def _eeprom_cmd(addr_width): if addr_width == 1: return REQ_EEPROM_SB elif addr_width == 2: return REQ_EEPROM_DB else: raise ValueError("Address width {addr_width} is not supported" .format(addr_width=addr_width))
[docs] def read_boot_eeprom(self, addr, length, addr_width, chunk_size=0x100): """ Read ``length`` bytes at ``addr`` from boot EEPROM in ``chunk_size`` chunks. Requires the second stage bootloader. """ data = bytearray() while length > 0: chunk_length = min(length, chunk_size) data += self.control_read(usb1.REQUEST_TYPE_VENDOR, self._eeprom_cmd(addr_width), addr, 0, chunk_length) addr += chunk_length length -= chunk_length return data
[docs] def write_boot_eeprom(self, addr, data, addr_width, chunk_size=0x10, page_size=0): """ Write ``data`` to ``addr`` in boot EEPROM that has ``2 ** page_size`` byte pages in ``chunk_size`` chunks. Writing EEPROM is much slower than reading; for best performance, specify ``page_size`` per EEPROM datasheet, and set ``chunk_size`` to a small multiple of ``2 ** page_size``. Otherwise, timeouts may occur. Requires the second stage bootloader or a compatible firmware. """ self.control_write(usb1.REQUEST_TYPE_VENDOR, REQ_PAGE_SIZE, page_size, 0, []) while len(data) > 0: chunk_length = min(len(data), chunk_size) self.control_write(usb1.REQUEST_TYPE_VENDOR, self._eeprom_cmd(addr_width), addr, 0, data[:chunk_length]) addr += chunk_length data = data[chunk_length:]
def read_ext_ram(self, addr, length): """ Read ``length`` bytes at ``addr`` in RAM using the ``movx`` instruction. Unlike ``read_ram``, this can access the entire address space. Requires the second stage bootloader. """ return self.control_read(usb1.REQUEST_TYPE_VENDOR, REQ_EXT_RAM, addr, 0, length) def write_ext_ram(self, addr, data): """ Write ``data`` to ``addr`` in RAM using the ``movx`` instruction. Unlike ``write_ram``, this can access the entire address space. Requires the second stage bootloader or a compatible firmware. """ self.control_write(usb1.REQUEST_TYPE_VENDOR, REQ_EXT_RAM, addr, 0, data)
[docs] def reenumerate(self): """ Trigger re-enumeration. Requires the second stage bootloader or a compatible firmware. """ self.control_write(usb1.REQUEST_TYPE_VENDOR, REQ_RENUMERATE, 0, 0, [])