gw.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350
  1. # gw.py
  2. #
  3. # Greaseweazle control script.
  4. #
  5. # Written & released by Keir Fraser <keir.xen@gmail.com>
  6. #
  7. # This is free and unencumbered software released into the public domain.
  8. # See the file COPYING for more details, or visit <http://unlicense.org>.
  9. import crcmod.predefined
  10. import sys, struct, argparse, serial
  11. from timeit import default_timer as timer
  12. from greaseweazle import version
  13. from greaseweazle import USB
  14. from greaseweazle.bitcell import Bitcell
  15. from greaseweazle.flux import Flux
  16. # 40MHz
  17. scp_freq = 40000000
  18. # flux_to_scp:
  19. # Converts Greaseweazle flux samples into a Supercard Pro Track.
  20. # Returns the Track Data Header (TDH) and the SCP "bitcell" array.
  21. def flux_to_scp(flux, track, nr_revs):
  22. factor = scp_freq / flux.sample_freq
  23. tdh = struct.pack("<3sB", b"TRK", track)
  24. dat = bytearray()
  25. len_at_index = rev = 0
  26. to_index = flux.index_list[0]
  27. rem = 0.0
  28. for x in flux.list:
  29. # Are we processing initial samples before the first revolution?
  30. if rev == 0:
  31. if to_index >= x:
  32. # Discard initial samples
  33. to_index -= x
  34. continue
  35. # Now starting the first full revolution
  36. rev = 1
  37. to_index += flux.index_list[rev]
  38. # Does the next flux interval cross the index mark?
  39. while to_index < x:
  40. # Append to the Track Data Header for the previous full revolution
  41. tdh += struct.pack("<III",
  42. int(round(flux.index_list[rev]*factor)),
  43. (len(dat) - len_at_index) // 2,
  44. 4 + nr_revs*12 + len_at_index)
  45. # Set up for the next revolution
  46. len_at_index = len(dat)
  47. rev += 1
  48. if rev > nr_revs:
  49. # We're done: We simply discard any surplus flux samples
  50. return tdh, dat
  51. to_index += flux.index_list[rev]
  52. # Process the current flux sample into SCP "bitcell" format
  53. to_index -= x
  54. y = x * factor + rem
  55. val = int(round(y))
  56. if (val & 65535) == 0:
  57. val += 1
  58. rem = y - val
  59. while val >= 65536:
  60. dat.append(0)
  61. dat.append(0)
  62. val -= 65536
  63. dat.append(val>>8)
  64. dat.append(val&255)
  65. # Header for last track(s) in case we ran out of flux timings.
  66. while rev <= nr_revs:
  67. tdh += struct.pack("<III",
  68. int(round(flux.index_list[rev]*factor)),
  69. (len(dat) - len_at_index) // 2,
  70. 4 + nr_revs*12 + len_at_index)
  71. len_at_index = len(dat)
  72. rev += 1
  73. return tdh, dat
  74. # read_to_scp:
  75. # Reads a floppy disk and dumps it into a new Supercard Pro image file.
  76. def read_to_scp(usb, args):
  77. trk_dat = bytearray()
  78. trk_offs = []
  79. if args.single_sided:
  80. track_range = range(args.scyl, args.ecyl+1)
  81. nr_sides = 1
  82. else:
  83. track_range = range(args.scyl*2, (args.ecyl+1)*2)
  84. nr_sides = 2
  85. for track in track_range:
  86. cyl = track >> (nr_sides - 1)
  87. side = track & (nr_sides - 1)
  88. print("\rReading Track %u.%u..." % (cyl, side), end="")
  89. trk_offs.append(len(trk_dat))
  90. usb.seek(cyl, side)
  91. for retry in range(1, 5):
  92. ack, index_list, enc_flux = usb.read_track(args.revs+1)
  93. if ack == USB.Ack.Okay:
  94. break
  95. elif ack == USB.Ack.FluxOverflow and retry < 5:
  96. print("Retry #%u..." % (retry))
  97. else:
  98. raise CmdError(ack)
  99. flux = Flux(index_list, usb.decode_flux(enc_flux), usb.sample_freq)
  100. tdh, dat = flux_to_scp(flux, track, args.revs)
  101. trk_dat += tdh
  102. trk_dat += dat
  103. print()
  104. csum = 0
  105. for x in trk_dat:
  106. csum += x
  107. trk_offs_dat = bytearray()
  108. for x in trk_offs:
  109. trk_offs_dat += struct.pack("<I", 0x2b0 + x)
  110. trk_offs_dat += bytes(0x2a0 - len(trk_offs_dat))
  111. for x in trk_offs_dat:
  112. csum += x
  113. ds_flag = 0
  114. if args.single_sided:
  115. ds_flag = 1
  116. header_dat = struct.pack("<3s9BI",
  117. b"SCP", # Signature
  118. 0, # Version
  119. 0x80, # DiskType = Other
  120. args.revs, # Nr Revolutions
  121. track_range.start, # Start track
  122. track_range.stop-1, # End track
  123. 0x01, # Flags = Index
  124. 0, # 16-bit cell width
  125. ds_flag, # Double Sided
  126. 0, # 25ns capture
  127. csum & 0xffffffff)
  128. with open(args.file, "wb") as f:
  129. f.write(header_dat)
  130. f.write(trk_offs_dat)
  131. f.write(trk_dat)
  132. # write_from_scp:
  133. # Writes the specified Supercard Pro image file to floppy disk.
  134. def write_from_scp(usb, args):
  135. if args.adjust_speed:
  136. # @drive_ticks is the time in Gresaeweazle ticks between index pulses.
  137. # We will adjust the flux intervals per track to allow for this.
  138. for retry in range(1, 5):
  139. ack, index_list, _ = usb.read_track(3)
  140. if ack == USB.Ack.Okay:
  141. break
  142. elif ack != USB.Ack.FluxOverflow or retry >= 5:
  143. raise CmdError(ack)
  144. drive_ticks = (index_list[1] + index_list[2]) / 2
  145. else:
  146. # Simple ratio between the Greaseweazle and SCP sample frequencies.
  147. factor = usb.sample_freq / scp_freq
  148. # Parse the SCP image header.
  149. with open(args.file, "rb") as f:
  150. dat = f.read()
  151. header = struct.unpack("<3s9BI", dat[0:16])
  152. assert header[0] == b"SCP"
  153. trk_offs = struct.unpack("<168I", dat[16:0x2b0])
  154. if args.single_sided:
  155. track_range = range(args.scyl, args.ecyl+1)
  156. nr_sides = 1
  157. else:
  158. track_range = range(args.scyl*2, (args.ecyl+1)*2)
  159. nr_sides = 2
  160. for i in track_range:
  161. cyl = i >> (nr_sides - 1)
  162. side = i & (nr_sides - 1)
  163. print("\rWriting Track %u.%u..." % (cyl, side), end="")
  164. if trk_offs[i] == 0:
  165. continue
  166. usb.seek(cyl, side)
  167. # Parse the SCP track header and extract the flux data.
  168. thdr = struct.unpack("<3sBIII", dat[trk_offs[i]:trk_offs[i]+16])
  169. sig, _, track_ticks, samples,off = thdr
  170. assert sig == b"TRK"
  171. tdat = dat[trk_offs[i]+off:trk_offs[i]+off+samples*2]
  172. # Decode the SCP flux data into a simple list of flux times.
  173. flux = []
  174. rem = 0.0
  175. if args.adjust_speed:
  176. # @factor adjusts flux times for speed variations between the
  177. # read-in and write-out drives.
  178. factor = drive_ticks / track_ticks
  179. for i in range(0,len(tdat),2):
  180. x = tdat[i]*256 + tdat[i+1]
  181. if x == 0:
  182. rem += 65536.0
  183. continue
  184. y = x * factor + rem
  185. val = int(round(y))
  186. rem = y - val
  187. flux.append(val)
  188. # Encode the flux times for Greaseweazle, and write them out.
  189. enc_flux = usb.encode_flux(flux)
  190. for retry in range(1, 5):
  191. ack = usb.write_track(enc_flux)
  192. if ack == USB.Ack.Okay:
  193. break
  194. elif ack == USB.Ack.FluxUnderflow and retry < 5:
  195. print("Retry #%u..." % (retry))
  196. else:
  197. raise CmdError(ack)
  198. print()
  199. # update_firmware:
  200. # Updates the Greaseweazle firmware using the specified Update File.
  201. def update_firmware(usb, args):
  202. # Check that an update operation was actually requested.
  203. if args.action != "update":
  204. print("Greaseweazle is in Firmware Update Mode:")
  205. print(" The only available action is \"update <update_file>\"")
  206. if usb.update_jumpered:
  207. print(" Remove the Update Jumper for normal operation")
  208. else:
  209. print(" Main firmware is erased: You *must* perform an update!")
  210. return
  211. # Check that the firmware is actually in update mode.
  212. if not usb.update_mode:
  213. print("Greaseweazle is in Normal Mode:")
  214. print(" To \"update\" you must install the Update Jumper")
  215. return
  216. # Read and check the update file.
  217. with open(args.file, "rb") as f:
  218. dat = f.read()
  219. sig, maj, min, pad1, pad2, crc = struct.unpack(">2s4BH", dat[-8:])
  220. if len(dat) & 3 != 0 or sig != b'GW' or pad1 != 0 or pad2 != 0:
  221. print("%s: Bad update file" % (args.file))
  222. return
  223. crc16 = crcmod.predefined.Crc('crc-ccitt-false')
  224. crc16.update(dat)
  225. if crc16.crcValue != 0:
  226. print("%s: Bad CRC" % (args.file))
  227. # Perform the update.
  228. print("Updating to v%u.%u..." % (maj, min))
  229. ack = usb.update_firmware(dat)
  230. if ack != 0:
  231. print("** UPDATE FAILED: Please retry!")
  232. return
  233. print("Done.")
  234. print("** Disconnect Greaseweazle and remove the Programming Jumper.")
  235. # _main:
  236. # Argument processing and dispatch.
  237. def _main(argv):
  238. actions = {
  239. "read" : read_to_scp,
  240. "write" : write_from_scp,
  241. "update" : update_firmware
  242. }
  243. parser = argparse.ArgumentParser(
  244. formatter_class=argparse.ArgumentDefaultsHelpFormatter)
  245. parser.add_argument("action")
  246. parser.add_argument("--revs", type=int, default=3,
  247. help="number of revolutions to read per track")
  248. parser.add_argument("--scyl", type=int, default=0,
  249. help="first cylinder to read/write")
  250. parser.add_argument("--ecyl", type=int, default=81,
  251. help="last cylinder to read/write")
  252. parser.add_argument("--single-sided", action="store_true",
  253. help="read/write a single-sided image")
  254. parser.add_argument("--adjust-speed", action="store_true",
  255. help="adjust write-flux times for drive speed")
  256. parser.add_argument("file", help="in/out filename")
  257. parser.add_argument("device", help="serial device")
  258. args = parser.parse_args(argv[1:])
  259. if not args.action in actions:
  260. print("** Action \"%s\" is not recognised" % args.action)
  261. print("Valid actions: ", end="")
  262. print(", ".join(str(key) for key in actions.keys()))
  263. return
  264. usb = USB.Unit(serial.Serial(args.device))
  265. print("** %s v%u.%u, Host Tools v%u.%u"
  266. % (("Greaseweazle","Bootloader")[usb.update_mode],
  267. usb.major, usb.minor,
  268. version.major, version.minor))
  269. if args.action == "update" or usb.update_mode:
  270. return actions[args.action](usb, args)
  271. elif usb.update_needed:
  272. print("Firmware is out of date: Require v%u.%u"
  273. % (version.major, version.minor))
  274. print("Install the Update Jumper and \"update <update_file>\"")
  275. return
  276. #usb.step_delay = 5000
  277. #print("Select Delay: %uus" % usb.select_delay)
  278. #print("Step Delay: %uus" % usb.step_delay)
  279. #print("Settle Time: %ums" % usb.seek_settle_delay)
  280. #print("Motor Delay: %ums" % usb.motor_delay)
  281. #print("Auto Off: %ums" % usb.auto_off_delay)
  282. try:
  283. usb.drive_select(True)
  284. usb.drive_motor(True)
  285. actions[args.action](usb, args)
  286. except KeyboardInterrupt:
  287. print()
  288. usb.reset()
  289. usb.ser.close()
  290. usb.ser.open()
  291. finally:
  292. usb.drive_motor(False)
  293. usb.drive_select(False)
  294. def main(argv):
  295. try:
  296. _main(argv)
  297. except USB.CmdError as error:
  298. print("Command Failed: %s" % error)
  299. if __name__ == "__main__":
  300. main(sys.argv)
  301. # Local variables:
  302. # python-indent: 4
  303. # End: