gw.py 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247
  1. # gw.py
  2. #
  3. # Greaseweazle control script.
  4. #
  5. # Written & released by Keir Fraser <keir.xen@gmail.com>
  6. #
  7. # This is free and unencumbered software released into the public domain.
  8. # See the file COPYING for more details, or visit <http://unlicense.org>.
  9. #from timeit import default_timer as timer
  10. import sys, struct, argparse, serial
  11. import crcmod.predefined
  12. from greaseweazle import version
  13. from greaseweazle import USB
  14. from greaseweazle.bitcell import Bitcell
  15. from greaseweazle.flux import Flux
  16. from greaseweazle.scp import SCP
  17. # read_to_scp:
  18. # Reads a floppy disk and dumps it into a new Supercard Pro image file.
  19. def read_to_scp(usb, args):
  20. nr_sides = 1 if args.single_sided else 2
  21. scp = SCP(args.scyl, nr_sides)
  22. for cyl in range(args.scyl, args.ecyl+1):
  23. for side in range(0, nr_sides):
  24. print("\rReading Track %u.%u..." % (cyl, side), end="")
  25. usb.seek(cyl, side)
  26. for retry in range(1, 5):
  27. ack, index_list, enc_flux = usb.read_track(args.revs+1)
  28. if ack == USB.Ack.Okay:
  29. break
  30. elif ack == USB.Ack.FluxOverflow and retry < 5:
  31. print("Retry #%u..." % (retry))
  32. else:
  33. raise CmdError(ack)
  34. flux = Flux(index_list, usb.decode_flux(enc_flux), usb.sample_freq)
  35. scp.append_track(flux)
  36. print()
  37. with open(args.file, "wb") as f:
  38. f.write(scp.get_image())
  39. # write_from_scp:
  40. # Writes the specified Supercard Pro image file to floppy disk.
  41. def write_from_scp(usb, args):
  42. if args.adjust_speed:
  43. # @drive_ticks is the time in Gresaeweazle ticks between index pulses.
  44. # We will adjust the flux intervals per track to allow for this.
  45. for retry in range(1, 5):
  46. ack, index_list, _ = usb.read_track(3)
  47. if ack == USB.Ack.Okay:
  48. break
  49. elif ack != USB.Ack.FluxOverflow or retry >= 5:
  50. raise CmdError(ack)
  51. drive_ticks = (index_list[1] + index_list[2]) / 2
  52. else:
  53. # Simple ratio between the Greaseweazle and SCP sample frequencies.
  54. factor = usb.sample_freq / SCP.sample_freq
  55. # Parse the SCP image header.
  56. with open(args.file, "rb") as f:
  57. dat = f.read()
  58. header = struct.unpack("<3s9BI", dat[0:16])
  59. assert header[0] == b"SCP"
  60. trk_offs = struct.unpack("<168I", dat[16:0x2b0])
  61. if args.single_sided:
  62. track_range = range(args.scyl, args.ecyl+1)
  63. nr_sides = 1
  64. else:
  65. track_range = range(args.scyl*2, (args.ecyl+1)*2)
  66. nr_sides = 2
  67. for track in track_range:
  68. cyl = track >> (nr_sides - 1)
  69. side = track & (nr_sides - 1)
  70. print("\rWriting Track %u.%u..." % (cyl, side), end="")
  71. trk_off = trk_offs[track]
  72. if trk_off == 0:
  73. continue
  74. usb.seek(cyl, side)
  75. # Parse the SCP track header and extract the flux data.
  76. thdr = struct.unpack("<3sBIII", dat[trk_off:trk_off+16])
  77. sig, _, track_ticks, samples, off = thdr
  78. assert sig == b"TRK"
  79. tdat = dat[trk_off+off:trk_off+off+samples*2]
  80. # Decode the SCP flux data into a simple list of flux times.
  81. flux = []
  82. rem = 0.0
  83. if args.adjust_speed:
  84. # @factor adjusts flux times for speed variations between the
  85. # read-in and write-out drives.
  86. factor = drive_ticks / track_ticks
  87. for i in range(0, len(tdat), 2):
  88. x = tdat[i]*256 + tdat[i+1]
  89. if x == 0:
  90. rem += 65536.0
  91. continue
  92. y = x * factor + rem
  93. val = int(round(y))
  94. rem = y - val
  95. flux.append(val)
  96. # Encode the flux times for Greaseweazle, and write them out.
  97. enc_flux = usb.encode_flux(flux)
  98. for retry in range(1, 5):
  99. ack = usb.write_track(enc_flux)
  100. if ack == USB.Ack.Okay:
  101. break
  102. elif ack == USB.Ack.FluxUnderflow and retry < 5:
  103. print("Retry #%u..." % (retry))
  104. else:
  105. raise CmdError(ack)
  106. print()
  107. # update_firmware:
  108. # Updates the Greaseweazle firmware using the specified Update File.
  109. def update_firmware(usb, args):
  110. # Check that an update operation was actually requested.
  111. if args.action != "update":
  112. print("Greaseweazle is in Firmware Update Mode:")
  113. print(" The only available action is \"update <update_file>\"")
  114. if usb.update_jumpered:
  115. print(" Remove the Update Jumper for normal operation")
  116. else:
  117. print(" Main firmware is erased: You *must* perform an update!")
  118. return
  119. # Check that the firmware is actually in update mode.
  120. if not usb.update_mode:
  121. print("Greaseweazle is in Normal Mode:")
  122. print(" To \"update\" you must install the Update Jumper")
  123. return
  124. # Read and check the update file.
  125. with open(args.file, "rb") as f:
  126. dat = f.read()
  127. sig, maj, min, pad1, pad2, crc = struct.unpack(">2s4BH", dat[-8:])
  128. if len(dat) & 3 != 0 or sig != b'GW' or pad1 != 0 or pad2 != 0:
  129. print("%s: Bad update file" % (args.file))
  130. return
  131. crc16 = crcmod.predefined.Crc('crc-ccitt-false')
  132. crc16.update(dat)
  133. if crc16.crcValue != 0:
  134. print("%s: Bad CRC" % (args.file))
  135. # Perform the update.
  136. print("Updating to v%u.%u..." % (maj, min))
  137. ack = usb.update_firmware(dat)
  138. if ack != 0:
  139. print("** UPDATE FAILED: Please retry!")
  140. return
  141. print("Done.")
  142. print("** Disconnect Greaseweazle and remove the Programming Jumper.")
  143. # _main:
  144. # Argument processing and dispatch.
  145. def _main(argv):
  146. actions = {
  147. "read" : read_to_scp,
  148. "write" : write_from_scp,
  149. "update" : update_firmware
  150. }
  151. parser = argparse.ArgumentParser(
  152. formatter_class=argparse.ArgumentDefaultsHelpFormatter)
  153. parser.add_argument("action")
  154. parser.add_argument("--revs", type=int, default=3,
  155. help="number of revolutions to read per track")
  156. parser.add_argument("--scyl", type=int, default=0,
  157. help="first cylinder to read/write")
  158. parser.add_argument("--ecyl", type=int, default=81,
  159. help="last cylinder to read/write")
  160. parser.add_argument("--single-sided", action="store_true",
  161. help="read/write a single-sided image")
  162. parser.add_argument("--adjust-speed", action="store_true",
  163. help="adjust write-flux times for drive speed")
  164. parser.add_argument("file", help="in/out filename")
  165. parser.add_argument("device", help="serial device")
  166. args = parser.parse_args(argv[1:])
  167. if not args.action in actions:
  168. print("** Action \"%s\" is not recognised" % args.action)
  169. print("Valid actions: ", end="")
  170. print(", ".join(str(key) for key in actions.keys()))
  171. return
  172. usb = USB.Unit(serial.Serial(args.device))
  173. print("** %s v%u.%u, Host Tools v%u.%u"
  174. % (("Greaseweazle", "Bootloader")[usb.update_mode],
  175. usb.major, usb.minor,
  176. version.major, version.minor))
  177. if args.action == "update" or usb.update_mode:
  178. actions[args.action](usb, args)
  179. return
  180. elif usb.update_needed:
  181. print("Firmware is out of date: Require v%u.%u"
  182. % (version.major, version.minor))
  183. print("Install the Update Jumper and \"update <update_file>\"")
  184. return
  185. #usb.step_delay = 5000
  186. #print("Select Delay: %uus" % usb.select_delay)
  187. #print("Step Delay: %uus" % usb.step_delay)
  188. #print("Settle Time: %ums" % usb.seek_settle_delay)
  189. #print("Motor Delay: %ums" % usb.motor_delay)
  190. #print("Auto Off: %ums" % usb.auto_off_delay)
  191. try:
  192. usb.drive_select(True)
  193. usb.drive_motor(True)
  194. actions[args.action](usb, args)
  195. except KeyboardInterrupt:
  196. print()
  197. usb.reset()
  198. usb.ser.close()
  199. usb.ser.open()
  200. finally:
  201. usb.drive_motor(False)
  202. usb.drive_select(False)
  203. def main(argv):
  204. try:
  205. _main(argv)
  206. except USB.CmdError as error:
  207. print("Command Failed: %s" % error)
  208. if __name__ == "__main__":
  209. main(sys.argv)
  210. # Local variables:
  211. # python-indent: 4
  212. # End: