Browse Source

python: Break the low-level stuff out into a separate module.

Keir Fraser 5 năm trước cách đây
mục cha
commit
8ad29b77b6
2 tập tin đã thay đổi với 333 bổ sung251 xóa
  1. 280 0
      scripts/greaseweazle/USB.py
  2. 53 251
      scripts/gw.py

+ 280 - 0
scripts/greaseweazle/USB.py

@@ -0,0 +1,280 @@
+# greaseweazle/USB.py
+#
+# Written & released by Keir Fraser <keir.xen@gmail.com>
+#
+# This is free and unencumbered software released into the public domain.
+# See the file COPYING for more details, or visit <http://unlicense.org>.
+
+import struct, collections
+
+
+## Control-Path command set
+class ControlCmd:
+  ClearComms      = 10000
+  Normal          =  9600
+
+  
+## Command set
+class Cmd:
+  GetInfo         =  0
+  Seek            =  1
+  Side            =  2
+  SetParams       =  3
+  GetParams       =  4
+  Motor           =  5
+  ReadFlux        =  6
+  WriteFlux       =  7
+  GetFluxStatus   =  8
+  GetIndexTimes   =  9
+  Select          = 10
+  # Bootloader specific:
+  Update          =  1
+
+  
+## Command responses/acknowledgements
+class Ack:
+  Okay            = 0
+  BadCommand      = 1
+  NoIndex         = 2
+  NoTrk0          = 3
+  FluxOverflow    = 4
+  FluxUnderflow   = 5
+  Wrprot          = 6
+  Max             = 6
+
+  
+## Cmd.{Get,Set}Params indexes
+class Params:
+  Delays          = 0
+
+
+## CmdError: Encapsulates a command acknowledgement.
+class CmdError(Exception):
+
+  str = [ "Okay", "Bad Command", "No Index (No disk?)", "Track 0 not found",
+          "Flux Overflow", "Flux Underflow", "Disk is Write Protected" ]
+
+  def __init__(self, cmd, code):
+    self.cmd = cmd
+    self.code = code
+
+  def __str__(self):
+    if self.code <= Ack.Max:
+      return self.str[self.code]
+    return "Unknown Error (%u)" % self.code
+
+
+class Unit:
+
+  ## Unit information, instance variables:
+  ##  major, minor: Greaseweazle firmware version number
+  ##  max_index:    Maximum index timings for Cmd.ReadFlux
+  ##  max_cmd:      Maximum Cmd number accepted by this unit
+  ##  sample_freq:  Resolution of all time values passed to/from this unit
+
+  ## Unit(ser):
+  ## Accepts a Pyserial instance for Greaseweazle communications.
+  def __init__(self, ser):
+    self.ser = ser
+    self.reset()
+    # Initialise the delay properties with current firmware values.
+    self.send_cmd(struct.pack("4B", Cmd.GetParams, 4, Params.Delays, 10))
+    (self._select_delay, self._step_delay,
+     self._seek_settle_delay, self._motor_delay,
+     self._auto_off_delay) = struct.unpack("<5H", self.ser.read(10))
+    # Copy firmware info to instance variables (see above for definitions).
+    self.send_cmd(struct.pack("4B", Cmd.GetInfo, 4, 0, 8))
+    (self.major, self.minor, self.max_index,
+     self.max_cmd, self.sample_freq) = struct.unpack("<4BI", self.ser.read(8))
+    
+
+  ## reset:
+  ## Resets communications with Greaseweazle.
+  def reset(self):
+    self.ser.reset_output_buffer()
+    self.ser.baudrate = ControlCmd.ClearComms
+    self.ser.baudrate = ControlCmd.Normal
+    self.ser.reset_input_buffer()
+
+
+  ## send_cmd:
+  ## Send given command byte sequence to Greaseweazle.
+  ## Raise a CmdError if command fails.
+  def send_cmd(self, cmd):
+    self.ser.write(cmd)
+    (c,r) = struct.unpack("2B", self.ser.read(2))
+    assert c == cmd[0]
+    if r != 0:
+      raise CmdError(c,r)
+
+
+  ## seek:
+  ## Seek currently-selected drive's heads to the specified track (cyl, side).
+  def seek(self, cyl, side):
+    self.send_cmd(struct.pack("3B", Cmd.Seek, 3, cyl))
+    self.send_cmd(struct.pack("3B", Cmd.Side, 3, side))
+
+
+  ## drive_select:
+  ## Select/deselect the drive.
+  def drive_select(self, state):
+    self.send_cmd(struct.pack("3B", Cmd.Select, 3, int(state)))
+
+
+  ## drive_motor:
+  ## Turn the selected drive's motor on/off.
+  def drive_motor(self, state):
+    self.send_cmd(struct.pack("3B", Cmd.Motor, 3, int(state)))
+
+
+  ## get_index_times:
+  ## Get index timing values for the last .read_track() command.
+  def get_index_times(self, nr):
+    self.send_cmd(struct.pack("4B", Cmd.GetIndexTimes, 4, 0, nr))
+    x = struct.unpack("<%dI" % nr, self.ser.read(4*nr))
+    return x
+
+
+  ## update_firmware:
+  ## Update Greaseweazle to the given new firmware.
+  def update_firmware(self, dat):
+    self.send_cmd(struct.pack("<2BI", Cmd.Update, 6, len(dat)))
+    self.ser.write(dat)
+    (ack,) = struct.unpack("B", self.ser.read(1))
+    return ack
+
+  
+  ## decode_flux:
+  ## Decode the Greaseweazle data stream into a list of flux samples.
+  def decode_flux(self, dat):
+    flux = []
+    while dat:
+      i = dat.popleft()
+      if i < 250:
+        flux.append(i)
+      elif i == 255:
+        val =  (dat.popleft() & 254) >>  1
+        val += (dat.popleft() & 254) <<  6
+        val += (dat.popleft() & 254) << 13
+        val += (dat.popleft() & 254) << 20
+        flux.append(val)
+      else:
+        val = (i - 249) * 250
+        val += dat.popleft() - 1
+        flux.append(val)
+    assert flux[-1] == 0
+    return flux[:-1]
+
+
+  ## encode_flux:
+  ## Convert the given flux timings into an encoded data stream.
+  def encode_flux(self, flux):
+    dat = bytearray()
+    for val in flux:
+      if val == 0:
+        pass
+      elif val < 250:
+        dat.append(val)
+      else:
+        high = val // 250
+        if high <= 5:
+          dat.append(249+high)
+          dat.append(1 + val%250)
+        else:
+          dat.append(255)
+          dat.append(1 | (val<<1) & 255)
+          dat.append(1 | (val>>6) & 255)
+          dat.append(1 | (val>>13) & 255)
+          dat.append(1 | (val>>20) & 255)
+    dat.append(0) # End of Stream
+    return dat
+
+  
+  ## read_track:
+  ## Read flux timings as encoded data stream for the current track.
+  def read_track(self, nr_idx):
+    dat = collections.deque()
+    self.send_cmd(struct.pack("3B", Cmd.ReadFlux, 3, nr_idx))
+    while True:
+      dat += self.ser.read(1)
+      dat += self.ser.read(self.ser.in_waiting)
+      if dat[-1] == 0:
+        break
+    try:
+      self.send_cmd(struct.pack("2B", Cmd.GetFluxStatus, 2))
+    except CmdError as error:
+      del dat
+      return (error.code, None)
+    return (Ack.Okay, dat)
+    
+
+  ## write_track:
+  ## Write the given data stream to the current track via Greaseweazle.
+  def write_track(self, dat):
+    self.send_cmd(struct.pack("<2BIB", Cmd.WriteFlux, 7, 0, 1))
+    self.ser.write(dat)
+    self.ser.read(1) # Sync with Greaseweazle
+    try:
+      self.send_cmd(struct.pack("2B", Cmd.GetFluxStatus, 2))
+    except CmdError as error:
+      return error.code
+    return Ack.Okay
+
+  
+  ##
+  ## Delay-property public getters and setters:
+  ##  select_delay:      Delay (usec) after asserting drive select
+  ##  step_delay:        Delay (usec) after issuing a head-step command
+  ##  seek_settle_delay: Delay (msec) after completing a head-seek operation
+  ##  motor_delay:       Delay (msec) after turning on drive spindle motor
+  ##  auto_off_delay:    Timeout (msec) since last command upon which all
+  ##                     drives are deselected and spindle motors turned off
+  ##
+
+  def _set_delays(self):
+    self.send_cmd(struct.pack("<3B5H", Cmd.SetParams,
+                              3+5*2, Params.Delays,
+                              self._select_delay, self._step_delay,
+                              self._seek_settle_delay,
+                              self._motor_delay, self._auto_off_delay))
+
+  @property
+  def select_delay(self):
+    return self._select_delay
+  @select_delay.setter
+  def select_delay(self, select_delay):
+    self._select_delay = select_delay
+    self._set_delays()
+    
+  @property
+  def step_delay(self):
+    return self._step_delay
+  @step_delay.setter
+  def step_delay(self, step_delay):
+    self._step_delay = step_delay
+    self._set_delays()
+    
+  @property
+  def seek_settle_delay(self):
+    return self._seek_settle_delay
+  @seek_settle_delay.setter
+  def seek_settle_delay(self, seek_settle_delay):
+    self._seek_settle_delay = seek_settle_delay
+    self._set_delays()
+    
+  @property
+  def motor_delay(self):
+    return self._motor_delay
+  @motor_delay.setter
+  def motor_delay(self, motor_delay):
+    self._motor_delay = motor_delay
+    self._set_delays()
+    
+  @property
+  def auto_off_delay(self):
+    return self._auto_off_delay
+  @auto_off_delay.setter
+  def auto_off_delay(self, auto_off_delay):
+    self._auto_off_delay = auto_off_delay
+    self._set_delays()
+    

+ 53 - 251
scripts/gw.py

@@ -8,228 +8,22 @@
 # See the file COPYING for more details, or visit <http://unlicense.org>.
 
 import crcmod.predefined
-import sys, struct, argparse, serial, collections
+import sys, struct, argparse, serial
 from timeit import default_timer as timer
 
-from greaseweazle import version
+from greaseweazle import version, USB
 
 # 40MHz
 scp_freq = 40000000
 
-BAUD_CLEAR_COMMS    = 10000
-BAUD_NORMAL         = 9600
-
-CMD_GET_INFO        =  0
-CMD_SEEK            =  1
-CMD_SIDE            =  2
-CMD_SET_PARAMS      =  3
-CMD_GET_PARAMS      =  4
-CMD_MOTOR           =  5
-CMD_READ_FLUX       =  6
-CMD_WRITE_FLUX      =  7
-CMD_GET_FLUX_STATUS =  8
-CMD_GET_INDEX_TIMES =  9
-CMD_SELECT          = 10
-
-# Bootloader-specific:
-CMD_UPDATE          = 1
-
-ACK_OKAY            = 0
-ACK_BAD_COMMAND     = 1
-ACK_NO_INDEX        = 2
-ACK_NO_TRK0         = 3
-ACK_FLUX_OVERFLOW   = 4
-ACK_FLUX_UNDERFLOW  = 5
-ACK_WRPROT          = 6
-ACK_MAX             = 6
-
-# CMD_{GET,SET}_PARAMS:
-PARAMS_DELAYS       = 0
-
-ack_str = [
-  "Okay", "Bad Command", "No Index", "Track 0 not found",
-  "Flux Overflow", "Flux Underflow", "Disk is Write Protected" ]
-
-class CmdError(Exception):
-  def __init__(self, cmd, code):
-    self.cmd = cmd
-    self.code = code
-  def __str__(self):
-    if self.code <= ACK_MAX:
-      return ack_str[self.code]
-    return "Unknown Error (%u)" % self.code
-
-def send_cmd(cmd):
-  ser.write(cmd)
-  (c,r) = struct.unpack("2B", ser.read(2))
-  assert c == cmd[0]
-  if r != 0:
-    raise CmdError(c,r)
-
-def get_fw_info():
-  send_cmd(struct.pack("4B", CMD_GET_INFO, 4, 0, 8))
-  return struct.unpack("<4BI", ser.read(8))
-
-def print_fw_info(info):
-  (major, minor, max_index, max_cmd, freq) = info
-  print("Greaseweazle v%u.%u" % (major, minor))
-  print("Max index timings %u" % (max_index))
-  print("Max cmd %u" % (max_cmd))
-  print("Sample frequency: %.2f MHz" % (freq / 1000000))
-  
-def seek(cyl, side):
-  send_cmd(struct.pack("3B", CMD_SEEK, 3, cyl))
-  send_cmd(struct.pack("3B", CMD_SIDE, 3, side))
-
-def get_delays():
-  send_cmd(struct.pack("4B", CMD_GET_PARAMS, 4, PARAMS_DELAYS, 5*2))
-  return struct.unpack("<5H", ser.read(5*2))
-  
-def print_delays(x):
-  (select_delay, step_delay, seek_settle, motor_delay, auto_off) = x
-  print("Select Delay: %uus" % select_delay)
-  print("Step Delay: %uus" % step_delay)
-  print("Settle Time: %ums" % seek_settle)
-  print("Motor Delay: %ums" % motor_delay)
-  print("Auto Off: %ums" % auto_off)
-
-def set_delays(seek_delay = None, step_delay = None, seek_settle = None,
-               motor_delay = None, auto_off = None):
-  (_seek_delay, _step_delay, _seek_settle,
-   _motor_delay, _auto_off) = get_delays()
-  if not seek_delay: seek_delay = _seek_delay
-  if not step_delay: step_delay = _step_delay
-  if not seek_settle: seek_settle = _seek_settle
-  if not motor_delay: motor_delay = _motor_delay
-  if not auto_off: auto_off = _auto_off
-  send_cmd(struct.pack("<3B5H", CMD_SET_PARAMS, 3+5*2, PARAMS_DELAYS,
-                       seek_delay, step_delay, seek_settle,
-                       motor_delay, auto_off))
-
-def drive_select(state):
-  send_cmd(struct.pack("3B", CMD_SELECT, 3, int(state)))
-
-def drive_motor(state):
-  send_cmd(struct.pack("3B", CMD_MOTOR, 3, int(state)))
-
-def get_index_times(nr):
-  send_cmd(struct.pack("4B", CMD_GET_INDEX_TIMES, 4, 0, nr))
-  x = struct.unpack("<%dI" % nr, ser.read(4*nr))
-  return x
-
-def print_index_times(index_times):
-  for ticks in index_times:
-    print("%u ticks" % (ticks))
-
-
-# write_flux:
-# Write the current track via Greaseweazle with the specified flux timings.
-def write_flux(flux):
-  start = timer()
-  x = bytearray()
-  for val in flux:
-    if val == 0:
-      pass
-    elif val < 250:
-      x.append(val)
-    else:
-      high = val // 250
-      if high <= 5:
-        x.append(249+high)
-        x.append(1 + val%250)
-      else:
-        x.append(255)
-        x.append(1 | (val<<1) & 255)
-        x.append(1 | (val>>6) & 255)
-        x.append(1 | (val>>13) & 255)
-        x.append(1 | (val>>20) & 255)
-  x.append(0) # End of Stream
-  end = timer()
-  #print("%u flux -> %u bytes in %f seconds" % (len(flux), len(x), end-start))
-  retry = 0
-  while True:
-    start = timer()
-    send_cmd(struct.pack("<2BIB", CMD_WRITE_FLUX, 7, 0, 1))
-    ser.write(x)
-    ser.read(1) # Sync with Greaseweazle
-    try:
-      send_cmd(struct.pack("2B", CMD_GET_FLUX_STATUS, 2))
-    except CmdError as error:
-      if error.code == ACK_FLUX_UNDERFLOW and retry < 5:
-        retry += 1
-        print("Retry #%u..." % retry)
-        continue;
-      raise
-    end = timer()
-    #print("Track written in %f seconds" % (end-start))
-    break
-
-
-# read_flux:
-# Read flux timings from Greaseweazle for the current track.
-def read_flux(nr_idx):
-
-  # Read the encoded flux stream, retrying if necessary.
-  retry = 0
-  while True:
-    start = timer()
-    x = collections.deque()
-    send_cmd(struct.pack("3B", CMD_READ_FLUX, 3, nr_idx))
-    nr = 0
-    while True:
-      x += ser.read(1)
-      x += ser.read(ser.in_waiting)
-      nr += 1;
-      if x[-1] == 0:
-        break
-    try:
-      send_cmd(struct.pack("2B", CMD_GET_FLUX_STATUS, 2))
-    except CmdError as error:
-      if error.code == ACK_FLUX_OVERFLOW and retry < 5:
-        retry += 1
-        print("Retry #%u..." % retry)
-        del x
-        continue;
-      raise
-    end = timer()
-    break
-    
-  #print("Read %u bytes in %u batches in %f seconds" % (len(x), nr, end-start))
-
-  # Decode the flux stream into a list of flux samples.
-  start = timer()
-  y = []
-  while x:
-    i = x.popleft()
-    if i < 250:
-      y.append(i)
-    elif i == 255:
-      val =  (x.popleft() & 254) >>  1
-      val += (x.popleft() & 254) <<  6
-      val += (x.popleft() & 254) << 13
-      val += (x.popleft() & 254) << 20
-      y.append(val)
-    else:
-      val = (i - 249) * 250
-      val += x.popleft() - 1
-      y.append(val)
-  assert y[-1] == 0
-  y = y[:-1]
-  end = timer()
-
-  #print("Processed %u flux values in %f seconds" % (len(y), end-start))
-
-  return y
-
-
 # flux_to_scp:
 # Converts Greaseweazle flux samples into a Supercard Pro Track.
 # Returns the Track Data Header (TDH) and the SCP "bitcell" array.
 def flux_to_scp(flux, track, nr_revs):
 
-  factor = scp_freq / sample_freq
+  factor = scp_freq / usb.sample_freq
 
-  index_times = get_index_times(nr_revs+1)
+  index_times = usb.get_index_times(nr_revs+1)
   tdh = struct.pack("<3sB", b"TRK", track)
   dat = bytearray()
 
@@ -306,8 +100,16 @@ def read_to_scp(args):
     side = track & (nr_sides - 1)
     print("\rReading Track %u.%u..." % (cyl, side), end="")
     trk_offs.append(len(trk_dat))
-    seek(cyl, side)
-    flux = read_flux(args.revs+1)
+    usb.seek(cyl, side)
+    for retry in range(1, 5):
+      (ack, enc_flux) = usb.read_track(args.revs+1)
+      if ack == USB.Ack.Okay:
+        break
+      elif ack == USB.Ack.FluxOverflow and retry < 5:
+        print("Retry #%u..." % (retry))
+      else:
+        raise CmdError(ack)
+    flux = usb.decode_flux(enc_flux)
     (tdh, dat) = flux_to_scp(flux, track, args.revs)
     trk_dat += tdh
     trk_dat += dat
@@ -345,7 +147,7 @@ def read_to_scp(args):
 # write_from_scp:
 # Writes the specified Supercard Pro image file to floppy disk.
 def write_from_scp(args):
-  factor = sample_freq / scp_freq
+  factor = usb.sample_freq / scp_freq
   with open(args.file, "rb") as f:
     dat = f.read()
   header = struct.unpack("<3s9BI", dat[0:16])
@@ -363,7 +165,7 @@ def write_from_scp(args):
     print("\rWriting Track %u.%u..." % (cyl, side), end="")
     if trk_offs[i] == 0:
       continue
-    seek(cyl, side)
+    usb.seek(cyl, side)
     thdr = struct.unpack("<3sBIII", dat[trk_offs[i]:trk_offs[i]+16])
     (sig,_,_,samples,off) = thdr
     assert sig == b"TRK"
@@ -379,7 +181,15 @@ def write_from_scp(args):
       val = int(round(y))
       rem = y - val
       flux.append(val)
-    write_flux(flux)
+    enc_flux = usb.encode_flux(flux)
+    for retry in range(1, 5):
+      ack = usb.write_track(enc_flux)
+      if ack == USB.Ack.Okay:
+        break
+      elif ack == USB.Ack.FluxUnderflow and retry < 5:
+        print("Retry #%u..." % (retry))
+      else:
+        raise CmdError(ack)
   print()
 
 
@@ -397,9 +207,7 @@ def update_firmware(args):
   if crc16.crcValue != 0:
     print("%s: Bad CRC" % (args.file))
   print("Updating to v%u.%u..." % (maj, min))
-  send_cmd(struct.pack("<2BI", CMD_UPDATE, 6, len(dat)))
-  ser.write(dat)
-  (ack,) = struct.unpack("B", ser.read(1))
+  ack = usb.update_firmware(dat)
   if ack != 0:
     print("** UPDATE FAILED: Please retry!")
     return
@@ -437,24 +245,19 @@ def _main(argv):
     print(", ".join(str(key) for key in actions.keys()))
     return
   
-  global ser
-  ser = serial.Serial(args.device)
-  ser.baudrate = BAUD_CLEAR_COMMS
-  ser.baudrate = BAUD_NORMAL
-  ser.reset_input_buffer()
+  global usb
+  usb = USB.Unit(serial.Serial(args.device))
 
-  global sample_freq
-  info = get_fw_info()
-  sample_freq = info[4]
-  update_mode = (info[2] == 0)
+  update_mode = (usb.max_index == 0)
 
   print("** %s v%u.%u, Host Tools v%u.%u"
-        % (("Greaseweazle","Bootloader")[update_mode], info[0], info[1],
+        % (("Greaseweazle","Bootloader")[update_mode],
+           usb.major, usb.minor,
            version.major, version.minor))
   
   if (not update_mode
-      and (version.major > info[0]
-           or (version.major == info[0] and version.minor > info[1]))):
+      and (version.major > usb.major
+           or (version.major == usb.major and version.minor > usb.minor))):
     print("Firmware is out of date: Require >= v%u.%u"
           % (version.major, version.minor))
     print("Install the Update Jumper and \"update <update_file>\"")
@@ -463,7 +266,7 @@ def _main(argv):
   if update_mode and args.action != "update":
     print("Greaseweazle is in Firmware Update Mode:")
     print(" The only available action is \"update <update_file>\"")
-    if info[4] & 1:
+    if usb.sample_freq & 1:
       print(" Remove the Update Jumper for normal operation")
     else:
       print(" Main firmware is erased: You *must* perform an update!")
@@ -473,35 +276,34 @@ def _main(argv):
     print("Greaseweazle is in Normal Mode:")
     print(" To \"update\" you must install the Update Jumper")
     return
-  
-  set_delays(step_delay=5000)
-  print_delays(get_delays())
 
-  if not update_mode:
-    drive_select(True)
-    drive_motor(True)
-  
+  usb.step_delay = 5000
+  print("Select Delay: %uus" % usb.select_delay)
+  print("Step Delay: %uus" % usb.step_delay)
+  print("Settle Time: %ums" % usb.seek_settle_delay)
+  print("Motor Delay: %ums" % usb.motor_delay)
+  print("Auto Off: %ums" % usb.auto_off_delay)
+
   try:
-    actions[args.action](args)
-  except:
     if not update_mode:
-      ser.reset_output_buffer()
-      ser.baudrate = BAUD_CLEAR_COMMS
-      ser.baudrate = BAUD_NORMAL
-      ser.reset_input_buffer()
-      drive_motor(False)
-      drive_select(False)
-    raise
-  else:
+      usb.drive_select(True)
+      usb.drive_motor(True)
+    actions[args.action](args)
+  except KeyboardInterrupt:
+    print()
+    usb.reset()
+    usb.ser.close()
+    usb.ser.open()
+  finally:
     if not update_mode:
-      drive_motor(False)
-      drive_select(False)
+      usb.drive_motor(False)
+      usb.drive_select(False)
 
 
 def main(argv):
   try:
     _main(argv)
-  except CmdError as error:
+  except USB.CmdError as error:
     print("Command Failed: %s" % error)