gw.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318
  1. # gw.py
  2. #
  3. # Greaseweazle control script.
  4. #
  5. # Written & released by Keir Fraser <keir.xen@gmail.com>
  6. #
  7. # This is free and unencumbered software released into the public domain.
  8. # See the file COPYING for more details, or visit <http://unlicense.org>.
  9. import crcmod.predefined
  10. import sys, struct, argparse, serial
  11. from timeit import default_timer as timer
  12. from greaseweazle import version, USB
  13. # 40MHz
  14. scp_freq = 40000000
  15. # flux_to_scp:
  16. # Converts Greaseweazle flux samples into a Supercard Pro Track.
  17. # Returns the Track Data Header (TDH) and the SCP "bitcell" array.
  18. def flux_to_scp(flux, track, nr_revs):
  19. factor = scp_freq / usb.sample_freq
  20. index_times = usb.get_index_times(nr_revs+1)
  21. tdh = struct.pack("<3sB", b"TRK", track)
  22. dat = bytearray()
  23. len_at_index = rev = 0
  24. to_index = index_times[0]
  25. rem = 0.0
  26. for x in flux:
  27. # Are we processing initial samples before the first revolution?
  28. if rev == 0:
  29. if to_index >= x:
  30. # Discard initial samples
  31. to_index -= x
  32. continue
  33. # Now starting the first full revolution
  34. rev = 1
  35. to_index += index_times[rev]
  36. # Does the next flux interval cross the index mark?
  37. while to_index < x:
  38. # Append to the Track Data Header for the previous full revolution
  39. tdh += struct.pack("<III",
  40. int(round(index_times[rev]*factor)),
  41. (len(dat) - len_at_index) // 2,
  42. 4 + nr_revs*12 + len_at_index)
  43. # Set up for the next revolution
  44. len_at_index = len(dat)
  45. rev += 1
  46. if rev > nr_revs:
  47. # We're done: We simply discard any surplus flux samples
  48. return (tdh, dat)
  49. to_index += index_times[rev]
  50. # Process the current flux sample into SCP "bitcell" format
  51. to_index -= x
  52. y = x * factor + rem
  53. val = int(round(y))
  54. if (val & 65535) == 0:
  55. val += 1
  56. rem = y - val
  57. while val >= 65536:
  58. dat.append(0)
  59. dat.append(0)
  60. val -= 65536
  61. dat.append(val>>8)
  62. dat.append(val&255)
  63. # Header for last track(s) in case we ran out of flux timings.
  64. while rev <= nr_revs:
  65. tdh += struct.pack("<III",
  66. int(round(index_times[rev]*factor)),
  67. (len(dat) - len_at_index) // 2,
  68. 4 + nr_revs*12 + len_at_index)
  69. len_at_index = len(dat)
  70. rev += 1
  71. return (tdh, dat)
  72. # read_to_scp:
  73. # Reads a floppy disk and dumps it into a new Supercard Pro image file.
  74. def read_to_scp(args):
  75. trk_dat = bytearray()
  76. trk_offs = []
  77. if args.single_sided:
  78. track_range = range(args.scyl, args.ecyl+1)
  79. nr_sides = 1
  80. else:
  81. track_range = range(args.scyl*2, (args.ecyl+1)*2)
  82. nr_sides = 2
  83. for track in track_range:
  84. cyl = track >> (nr_sides - 1)
  85. side = track & (nr_sides - 1)
  86. print("\rReading Track %u.%u..." % (cyl, side), end="")
  87. trk_offs.append(len(trk_dat))
  88. usb.seek(cyl, side)
  89. for retry in range(1, 5):
  90. (ack, enc_flux) = usb.read_track(args.revs+1)
  91. if ack == USB.Ack.Okay:
  92. break
  93. elif ack == USB.Ack.FluxOverflow and retry < 5:
  94. print("Retry #%u..." % (retry))
  95. else:
  96. raise CmdError(ack)
  97. flux = usb.decode_flux(enc_flux)
  98. (tdh, dat) = flux_to_scp(flux, track, args.revs)
  99. trk_dat += tdh
  100. trk_dat += dat
  101. print()
  102. csum = 0
  103. for x in trk_dat:
  104. csum += x
  105. trk_offs_dat = bytearray()
  106. for x in trk_offs:
  107. trk_offs_dat += struct.pack("<I", 0x2b0 + x)
  108. trk_offs_dat += bytes(0x2a0 - len(trk_offs_dat))
  109. for x in trk_offs_dat:
  110. csum += x
  111. ds_flag = 0
  112. if args.single_sided:
  113. ds_flag = 1
  114. header_dat = struct.pack("<3s9BI",
  115. b"SCP", # Signature
  116. 0, # Version
  117. 0x80, # DiskType = Other
  118. args.revs, # Nr Revolutions
  119. track_range.start, # Start track
  120. track_range.stop-1, # End track
  121. 0x01, # Flags = Index
  122. 0, # 16-bit cell width
  123. ds_flag, # Double Sided
  124. 0, # 25ns capture
  125. csum & 0xffffffff)
  126. with open(args.file, "wb") as f:
  127. f.write(header_dat)
  128. f.write(trk_offs_dat)
  129. f.write(trk_dat)
  130. # write_from_scp:
  131. # Writes the specified Supercard Pro image file to floppy disk.
  132. def write_from_scp(args):
  133. factor = usb.sample_freq / scp_freq
  134. with open(args.file, "rb") as f:
  135. dat = f.read()
  136. header = struct.unpack("<3s9BI", dat[0:16])
  137. assert header[0] == b"SCP"
  138. trk_offs = struct.unpack("<168I", dat[16:0x2b0])
  139. if args.single_sided:
  140. track_range = range(args.scyl, args.ecyl+1)
  141. nr_sides = 1
  142. else:
  143. track_range = range(args.scyl*2, (args.ecyl+1)*2)
  144. nr_sides = 2
  145. for i in track_range:
  146. cyl = i >> (nr_sides - 1)
  147. side = i & (nr_sides - 1)
  148. print("\rWriting Track %u.%u..." % (cyl, side), end="")
  149. if trk_offs[i] == 0:
  150. continue
  151. usb.seek(cyl, side)
  152. thdr = struct.unpack("<3sBIII", dat[trk_offs[i]:trk_offs[i]+16])
  153. (sig,_,_,samples,off) = thdr
  154. assert sig == b"TRK"
  155. tdat = dat[trk_offs[i]+off:trk_offs[i]+off+samples*2]
  156. flux = []
  157. rem = 0.0
  158. for i in range(0,len(tdat),2):
  159. x = tdat[i]*256 + tdat[i+1]
  160. if x == 0:
  161. rem += 65536.0
  162. continue
  163. y = x * factor + rem
  164. val = int(round(y))
  165. rem = y - val
  166. flux.append(val)
  167. enc_flux = usb.encode_flux(flux)
  168. for retry in range(1, 5):
  169. ack = usb.write_track(enc_flux)
  170. if ack == USB.Ack.Okay:
  171. break
  172. elif ack == USB.Ack.FluxUnderflow and retry < 5:
  173. print("Retry #%u..." % (retry))
  174. else:
  175. raise CmdError(ack)
  176. print()
  177. # update_firmware:
  178. # Updates the Greaseweazle firmware using the specified Update File.
  179. def update_firmware(args):
  180. # Check that an update operation was actually requested.
  181. if args.action != "update":
  182. print("Greaseweazle is in Firmware Update Mode:")
  183. print(" The only available action is \"update <update_file>\"")
  184. if usb.update_jumpered:
  185. print(" Remove the Update Jumper for normal operation")
  186. else:
  187. print(" Main firmware is erased: You *must* perform an update!")
  188. return
  189. # Check that the firmware is actually in update mode.
  190. if not usb.update_mode:
  191. print("Greaseweazle is in Normal Mode:")
  192. print(" To \"update\" you must install the Update Jumper")
  193. return
  194. # Read and check the update file.
  195. with open(args.file, "rb") as f:
  196. dat = f.read()
  197. (sig, maj, min, pad1, pad2, crc) = struct.unpack(">2s4BH", dat[-8:])
  198. if len(dat) & 3 != 0 or sig != b'GW' or pad1 != 0 or pad2 != 0:
  199. print("%s: Bad update file" % (args.file))
  200. return
  201. crc16 = crcmod.predefined.Crc('crc-ccitt-false')
  202. crc16.update(dat)
  203. if crc16.crcValue != 0:
  204. print("%s: Bad CRC" % (args.file))
  205. # Perform the update.
  206. print("Updating to v%u.%u..." % (maj, min))
  207. ack = usb.update_firmware(dat)
  208. if ack != 0:
  209. print("** UPDATE FAILED: Please retry!")
  210. return
  211. print("Done.")
  212. print("** Disconnect Greaseweazle and remove the Programming Jumper.")
  213. # _main:
  214. # Argument processing and dispatch.
  215. def _main(argv):
  216. actions = {
  217. "read" : read_to_scp,
  218. "write" : write_from_scp,
  219. "update" : update_firmware
  220. }
  221. parser = argparse.ArgumentParser(
  222. formatter_class=argparse.ArgumentDefaultsHelpFormatter)
  223. parser.add_argument("action")
  224. parser.add_argument("--revs", type=int, default=3,
  225. help="number of revolutions to read per track")
  226. parser.add_argument("--scyl", type=int, default=0,
  227. help="first cylinder to read/write")
  228. parser.add_argument("--ecyl", type=int, default=81,
  229. help="last cylinder to read/write")
  230. parser.add_argument("--single-sided", action="store_true")
  231. parser.add_argument("file", help="in/out filename")
  232. parser.add_argument("device", help="serial device")
  233. args = parser.parse_args(argv[1:])
  234. if not args.action in actions:
  235. print("** Action \"%s\" is not recognised" % args.action)
  236. print("Valid actions: ", end="")
  237. print(", ".join(str(key) for key in actions.keys()))
  238. return
  239. global usb
  240. usb = USB.Unit(serial.Serial(args.device))
  241. print("** %s v%u.%u, Host Tools v%u.%u"
  242. % (("Greaseweazle","Bootloader")[usb.update_mode],
  243. usb.major, usb.minor,
  244. version.major, version.minor))
  245. if args.action == "update" or usb.update_mode:
  246. return actions[args.action](args)
  247. elif usb.update_needed:
  248. print("Firmware is out of date: Require v%u.%u"
  249. % (version.major, version.minor))
  250. print("Install the Update Jumper and \"update <update_file>\"")
  251. return
  252. #usb.step_delay = 5000
  253. #print("Select Delay: %uus" % usb.select_delay)
  254. #print("Step Delay: %uus" % usb.step_delay)
  255. #print("Settle Time: %ums" % usb.seek_settle_delay)
  256. #print("Motor Delay: %ums" % usb.motor_delay)
  257. #print("Auto Off: %ums" % usb.auto_off_delay)
  258. try:
  259. usb.drive_select(True)
  260. usb.drive_motor(True)
  261. actions[args.action](args)
  262. except KeyboardInterrupt:
  263. print()
  264. usb.reset()
  265. usb.ser.close()
  266. usb.ser.open()
  267. finally:
  268. usb.drive_motor(False)
  269. usb.drive_select(False)
  270. def main(argv):
  271. try:
  272. _main(argv)
  273. except USB.CmdError as error:
  274. print("Command Failed: %s" % error)
  275. if __name__ == "__main__":
  276. main(sys.argv)
  277. # Local variables:
  278. # python-indent: 4
  279. # End: