gw.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352
  1. # gw.py
  2. #
  3. # Greaseweazle control script.
  4. #
  5. # Written & released by Keir Fraser <keir.xen@gmail.com>
  6. #
  7. # This is free and unencumbered software released into the public domain.
  8. # See the file COPYING for more details, or visit <http://unlicense.org>.
  9. #from timeit import default_timer as timer
  10. import sys, struct, argparse, serial
  11. import crcmod.predefined
  12. from greaseweazle import version
  13. from greaseweazle import USB
  14. from greaseweazle.bitcell import Bitcell
  15. from greaseweazle.flux import Flux
  16. # 40MHz
  17. scp_freq = 40000000
  18. # flux_to_scp:
  19. # Converts Greaseweazle flux samples into a Supercard Pro Track.
  20. # Returns the Track Data Header (TDH) and the SCP "bitcell" array.
  21. def flux_to_scp(flux, track, nr_revs):
  22. factor = scp_freq / flux.sample_freq
  23. tdh = struct.pack("<3sB", b"TRK", track)
  24. dat = bytearray()
  25. len_at_index = rev = 0
  26. to_index = flux.index_list[0]
  27. rem = 0.0
  28. for x in flux.list:
  29. # Are we processing initial samples before the first revolution?
  30. if rev == 0:
  31. if to_index >= x:
  32. # Discard initial samples
  33. to_index -= x
  34. continue
  35. # Now starting the first full revolution
  36. rev = 1
  37. to_index += flux.index_list[rev]
  38. # Does the next flux interval cross the index mark?
  39. while to_index < x:
  40. # Append to the Track Data Header for the previous full revolution
  41. tdh += struct.pack("<III",
  42. int(round(flux.index_list[rev]*factor)),
  43. (len(dat) - len_at_index) // 2,
  44. 4 + nr_revs*12 + len_at_index)
  45. # Set up for the next revolution
  46. len_at_index = len(dat)
  47. rev += 1
  48. if rev > nr_revs:
  49. # We're done: We simply discard any surplus flux samples
  50. return tdh, dat
  51. to_index += flux.index_list[rev]
  52. # Process the current flux sample into SCP "bitcell" format
  53. to_index -= x
  54. y = x * factor + rem
  55. val = int(round(y))
  56. if (val & 65535) == 0:
  57. val += 1
  58. rem = y - val
  59. while val >= 65536:
  60. dat.append(0)
  61. dat.append(0)
  62. val -= 65536
  63. dat.append(val>>8)
  64. dat.append(val&255)
  65. # Header for last track(s) in case we ran out of flux timings.
  66. while rev <= nr_revs:
  67. tdh += struct.pack("<III",
  68. int(round(flux.index_list[rev]*factor)),
  69. (len(dat) - len_at_index) // 2,
  70. 4 + nr_revs*12 + len_at_index)
  71. len_at_index = len(dat)
  72. rev += 1
  73. return tdh, dat
  74. # read_to_scp:
  75. # Reads a floppy disk and dumps it into a new Supercard Pro image file.
  76. def read_to_scp(usb, args):
  77. trk_dat = bytearray()
  78. trk_offs = []
  79. if args.single_sided:
  80. track_range = range(args.scyl, args.ecyl+1)
  81. nr_sides = 1
  82. else:
  83. track_range = range(args.scyl*2, (args.ecyl+1)*2)
  84. nr_sides = 2
  85. for track in track_range:
  86. cyl = track >> (nr_sides - 1)
  87. side = track & (nr_sides - 1)
  88. print("\rReading Track %u.%u..." % (cyl, side), end="")
  89. trk_offs.append(len(trk_dat))
  90. usb.seek(cyl, side)
  91. for retry in range(1, 5):
  92. ack, index_list, enc_flux = usb.read_track(args.revs+1)
  93. if ack == USB.Ack.Okay:
  94. break
  95. elif ack == USB.Ack.FluxOverflow and retry < 5:
  96. print("Retry #%u..." % (retry))
  97. else:
  98. raise CmdError(ack)
  99. flux = Flux(index_list, usb.decode_flux(enc_flux), usb.sample_freq)
  100. tdh, dat = flux_to_scp(flux, track, args.revs)
  101. trk_dat += tdh
  102. trk_dat += dat
  103. print()
  104. csum = 0
  105. for x in trk_dat:
  106. csum += x
  107. trk_offs_dat = bytearray()
  108. for x in trk_offs:
  109. trk_offs_dat += struct.pack("<I", 0x2b0 + x)
  110. trk_offs_dat += bytes(0x2a0 - len(trk_offs_dat))
  111. for x in trk_offs_dat:
  112. csum += x
  113. ds_flag = 0
  114. if args.single_sided:
  115. ds_flag = 1
  116. header_dat = struct.pack("<3s9BI",
  117. b"SCP", # Signature
  118. 0, # Version
  119. 0x80, # DiskType = Other
  120. args.revs, # Nr Revolutions
  121. track_range.start, # Start track
  122. track_range.stop-1, # End track
  123. 0x01, # Flags = Index
  124. 0, # 16-bit cell width
  125. ds_flag, # Double Sided
  126. 0, # 25ns capture
  127. csum & 0xffffffff)
  128. with open(args.file, "wb") as f:
  129. f.write(header_dat)
  130. f.write(trk_offs_dat)
  131. f.write(trk_dat)
  132. # write_from_scp:
  133. # Writes the specified Supercard Pro image file to floppy disk.
  134. def write_from_scp(usb, args):
  135. if args.adjust_speed:
  136. # @drive_ticks is the time in Gresaeweazle ticks between index pulses.
  137. # We will adjust the flux intervals per track to allow for this.
  138. for retry in range(1, 5):
  139. ack, index_list, _ = usb.read_track(3)
  140. if ack == USB.Ack.Okay:
  141. break
  142. elif ack != USB.Ack.FluxOverflow or retry >= 5:
  143. raise CmdError(ack)
  144. drive_ticks = (index_list[1] + index_list[2]) / 2
  145. else:
  146. # Simple ratio between the Greaseweazle and SCP sample frequencies.
  147. factor = usb.sample_freq / scp_freq
  148. # Parse the SCP image header.
  149. with open(args.file, "rb") as f:
  150. dat = f.read()
  151. header = struct.unpack("<3s9BI", dat[0:16])
  152. assert header[0] == b"SCP"
  153. trk_offs = struct.unpack("<168I", dat[16:0x2b0])
  154. if args.single_sided:
  155. track_range = range(args.scyl, args.ecyl+1)
  156. nr_sides = 1
  157. else:
  158. track_range = range(args.scyl*2, (args.ecyl+1)*2)
  159. nr_sides = 2
  160. for track in track_range:
  161. cyl = track >> (nr_sides - 1)
  162. side = track & (nr_sides - 1)
  163. print("\rWriting Track %u.%u..." % (cyl, side), end="")
  164. trk_off = trk_offs[track]
  165. if trk_off == 0:
  166. continue
  167. usb.seek(cyl, side)
  168. # Parse the SCP track header and extract the flux data.
  169. thdr = struct.unpack("<3sBIII", dat[trk_off:trk_off+16])
  170. sig, _, track_ticks, samples, off = thdr
  171. assert sig == b"TRK"
  172. tdat = dat[trk_off+off:trk_off+off+samples*2]
  173. # Decode the SCP flux data into a simple list of flux times.
  174. flux = []
  175. rem = 0.0
  176. if args.adjust_speed:
  177. # @factor adjusts flux times for speed variations between the
  178. # read-in and write-out drives.
  179. factor = drive_ticks / track_ticks
  180. for i in range(0, len(tdat), 2):
  181. x = tdat[i]*256 + tdat[i+1]
  182. if x == 0:
  183. rem += 65536.0
  184. continue
  185. y = x * factor + rem
  186. val = int(round(y))
  187. rem = y - val
  188. flux.append(val)
  189. # Encode the flux times for Greaseweazle, and write them out.
  190. enc_flux = usb.encode_flux(flux)
  191. for retry in range(1, 5):
  192. ack = usb.write_track(enc_flux)
  193. if ack == USB.Ack.Okay:
  194. break
  195. elif ack == USB.Ack.FluxUnderflow and retry < 5:
  196. print("Retry #%u..." % (retry))
  197. else:
  198. raise CmdError(ack)
  199. print()
  200. # update_firmware:
  201. # Updates the Greaseweazle firmware using the specified Update File.
  202. def update_firmware(usb, args):
  203. # Check that an update operation was actually requested.
  204. if args.action != "update":
  205. print("Greaseweazle is in Firmware Update Mode:")
  206. print(" The only available action is \"update <update_file>\"")
  207. if usb.update_jumpered:
  208. print(" Remove the Update Jumper for normal operation")
  209. else:
  210. print(" Main firmware is erased: You *must* perform an update!")
  211. return
  212. # Check that the firmware is actually in update mode.
  213. if not usb.update_mode:
  214. print("Greaseweazle is in Normal Mode:")
  215. print(" To \"update\" you must install the Update Jumper")
  216. return
  217. # Read and check the update file.
  218. with open(args.file, "rb") as f:
  219. dat = f.read()
  220. sig, maj, min, pad1, pad2, crc = struct.unpack(">2s4BH", dat[-8:])
  221. if len(dat) & 3 != 0 or sig != b'GW' or pad1 != 0 or pad2 != 0:
  222. print("%s: Bad update file" % (args.file))
  223. return
  224. crc16 = crcmod.predefined.Crc('crc-ccitt-false')
  225. crc16.update(dat)
  226. if crc16.crcValue != 0:
  227. print("%s: Bad CRC" % (args.file))
  228. # Perform the update.
  229. print("Updating to v%u.%u..." % (maj, min))
  230. ack = usb.update_firmware(dat)
  231. if ack != 0:
  232. print("** UPDATE FAILED: Please retry!")
  233. return
  234. print("Done.")
  235. print("** Disconnect Greaseweazle and remove the Programming Jumper.")
  236. # _main:
  237. # Argument processing and dispatch.
  238. def _main(argv):
  239. actions = {
  240. "read" : read_to_scp,
  241. "write" : write_from_scp,
  242. "update" : update_firmware
  243. }
  244. parser = argparse.ArgumentParser(
  245. formatter_class=argparse.ArgumentDefaultsHelpFormatter)
  246. parser.add_argument("action")
  247. parser.add_argument("--revs", type=int, default=3,
  248. help="number of revolutions to read per track")
  249. parser.add_argument("--scyl", type=int, default=0,
  250. help="first cylinder to read/write")
  251. parser.add_argument("--ecyl", type=int, default=81,
  252. help="last cylinder to read/write")
  253. parser.add_argument("--single-sided", action="store_true",
  254. help="read/write a single-sided image")
  255. parser.add_argument("--adjust-speed", action="store_true",
  256. help="adjust write-flux times for drive speed")
  257. parser.add_argument("file", help="in/out filename")
  258. parser.add_argument("device", help="serial device")
  259. args = parser.parse_args(argv[1:])
  260. if not args.action in actions:
  261. print("** Action \"%s\" is not recognised" % args.action)
  262. print("Valid actions: ", end="")
  263. print(", ".join(str(key) for key in actions.keys()))
  264. return
  265. usb = USB.Unit(serial.Serial(args.device))
  266. print("** %s v%u.%u, Host Tools v%u.%u"
  267. % (("Greaseweazle", "Bootloader")[usb.update_mode],
  268. usb.major, usb.minor,
  269. version.major, version.minor))
  270. if args.action == "update" or usb.update_mode:
  271. actions[args.action](usb, args)
  272. return
  273. elif usb.update_needed:
  274. print("Firmware is out of date: Require v%u.%u"
  275. % (version.major, version.minor))
  276. print("Install the Update Jumper and \"update <update_file>\"")
  277. return
  278. #usb.step_delay = 5000
  279. #print("Select Delay: %uus" % usb.select_delay)
  280. #print("Step Delay: %uus" % usb.step_delay)
  281. #print("Settle Time: %ums" % usb.seek_settle_delay)
  282. #print("Motor Delay: %ums" % usb.motor_delay)
  283. #print("Auto Off: %ums" % usb.auto_off_delay)
  284. try:
  285. usb.drive_select(True)
  286. usb.drive_motor(True)
  287. actions[args.action](usb, args)
  288. except KeyboardInterrupt:
  289. print()
  290. usb.reset()
  291. usb.ser.close()
  292. usb.ser.open()
  293. finally:
  294. usb.drive_motor(False)
  295. usb.drive_select(False)
  296. def main(argv):
  297. try:
  298. _main(argv)
  299. except USB.CmdError as error:
  300. print("Command Failed: %s" % error)
  301. if __name__ == "__main__":
  302. main(sys.argv)
  303. # Local variables:
  304. # python-indent: 4
  305. # End: