# gw.py # # Greaseweazle control script. # # Written & released by Keir Fraser # # This is free and unencumbered software released into the public domain. # See the file COPYING for more details, or visit . import crcmod.predefined import sys, struct, argparse, serial from timeit import default_timer as timer from greaseweazle import version, USB # 40MHz scp_freq = 40000000 # flux_to_scp: # Converts Greaseweazle flux samples into a Supercard Pro Track. # Returns the Track Data Header (TDH) and the SCP "bitcell" array. def flux_to_scp(flux, track, nr_revs): factor = scp_freq / usb.sample_freq index_times = usb.get_index_times(nr_revs+1) tdh = struct.pack("<3sB", b"TRK", track) dat = bytearray() len_at_index = rev = 0 to_index = index_times[0] rem = 0.0 for x in flux: # Are we processing initial samples before the first revolution? if rev == 0: if to_index >= x: # Discard initial samples to_index -= x continue # Now starting the first full revolution rev = 1 to_index += index_times[rev] # Does the next flux interval cross the index mark? while to_index < x: # Append to the Track Data Header for the previous full revolution tdh += struct.pack(" nr_revs: # We're done: We simply discard any surplus flux samples return (tdh, dat) to_index += index_times[rev] # Process the current flux sample into SCP "bitcell" format to_index -= x y = x * factor + rem val = int(round(y)) if (val & 65535) == 0: val += 1 rem = y - val while val >= 65536: dat.append(0) dat.append(0) val -= 65536 dat.append(val>>8) dat.append(val&255) # Header for last track(s) in case we ran out of flux timings. while rev <= nr_revs: tdh += struct.pack("> (nr_sides - 1) side = track & (nr_sides - 1) print("\rReading Track %u.%u..." % (cyl, side), end="") trk_offs.append(len(trk_dat)) usb.seek(cyl, side) for retry in range(1, 5): (ack, enc_flux) = usb.read_track(args.revs+1) if ack == USB.Ack.Okay: break elif ack == USB.Ack.FluxOverflow and retry < 5: print("Retry #%u..." % (retry)) else: raise CmdError(ack) flux = usb.decode_flux(enc_flux) (tdh, dat) = flux_to_scp(flux, track, args.revs) trk_dat += tdh trk_dat += dat print() csum = 0 for x in trk_dat: csum += x trk_offs_dat = bytearray() for x in trk_offs: trk_offs_dat += struct.pack("> (nr_sides - 1) side = i & (nr_sides - 1) print("\rWriting Track %u.%u..." % (cyl, side), end="") if trk_offs[i] == 0: continue usb.seek(cyl, side) thdr = struct.unpack("<3sBIII", dat[trk_offs[i]:trk_offs[i]+16]) (sig,_,_,samples,off) = thdr assert sig == b"TRK" tdat = dat[trk_offs[i]+off:trk_offs[i]+off+samples*2] flux = [] rem = 0.0 for i in range(0,len(tdat),2): x = tdat[i]*256 + tdat[i+1] if x == 0: rem += 65536.0 continue y = x * factor + rem val = int(round(y)) rem = y - val flux.append(val) enc_flux = usb.encode_flux(flux) for retry in range(1, 5): ack = usb.write_track(enc_flux) if ack == USB.Ack.Okay: break elif ack == USB.Ack.FluxUnderflow and retry < 5: print("Retry #%u..." % (retry)) else: raise CmdError(ack) print() # update_firmware: # Updates the Greaseweazle firmware using the specified Update File. def update_firmware(args): with open(args.file, "rb") as f: dat = f.read() (sig, maj, min, pad1, pad2, crc) = struct.unpack(">2s4BH", dat[-8:]) if len(dat) & 3 != 0 or sig != b'GW' or pad1 != 0 or pad2 != 0: print("%s: Bad update file" % (args.file)) return crc16 = crcmod.predefined.Crc('crc-ccitt-false') crc16.update(dat) if crc16.crcValue != 0: print("%s: Bad CRC" % (args.file)) print("Updating to v%u.%u..." % (maj, min)) ack = usb.update_firmware(dat) if ack != 0: print("** UPDATE FAILED: Please retry!") return print("Done.") print("** Disconnect Greaseweazle and remove the Programming Jumper.") # _main: # Argument processing and dispatch. def _main(argv): actions = { "read" : read_to_scp, "write" : write_from_scp, "update" : update_firmware } parser = argparse.ArgumentParser( formatter_class=argparse.ArgumentDefaultsHelpFormatter) parser.add_argument("action") parser.add_argument("--revs", type=int, default=3, help="number of revolutions to read per track") parser.add_argument("--scyl", type=int, default=0, help="first cylinder to read/write") parser.add_argument("--ecyl", type=int, default=81, help="last cylinder to read/write") parser.add_argument("--single-sided", action="store_true") parser.add_argument("file", help="in/out filename") parser.add_argument("device", help="serial device") args = parser.parse_args(argv[1:]) if not args.action in actions: print("** Action \"%s\" is not recognised" % args.action) print("Valid actions: ", end="") print(", ".join(str(key) for key in actions.keys())) return global usb usb = USB.Unit(serial.Serial(args.device)) update_mode = (usb.max_index == 0) print("** %s v%u.%u, Host Tools v%u.%u" % (("Greaseweazle","Bootloader")[update_mode], usb.major, usb.minor, version.major, version.minor)) if (not update_mode and (version.major > usb.major or (version.major == usb.major and version.minor > usb.minor))): print("Firmware is out of date: Require >= v%u.%u" % (version.major, version.minor)) print("Install the Update Jumper and \"update \"") return if update_mode and args.action != "update": print("Greaseweazle is in Firmware Update Mode:") print(" The only available action is \"update \"") if usb.sample_freq & 1: print(" Remove the Update Jumper for normal operation") else: print(" Main firmware is erased: You *must* perform an update!") return if not update_mode and args.action == "update": print("Greaseweazle is in Normal Mode:") print(" To \"update\" you must install the Update Jumper") return usb.step_delay = 5000 print("Select Delay: %uus" % usb.select_delay) print("Step Delay: %uus" % usb.step_delay) print("Settle Time: %ums" % usb.seek_settle_delay) print("Motor Delay: %ums" % usb.motor_delay) print("Auto Off: %ums" % usb.auto_off_delay) try: if not update_mode: usb.drive_select(True) usb.drive_motor(True) actions[args.action](args) except KeyboardInterrupt: print() usb.reset() usb.ser.close() usb.ser.open() finally: if not update_mode: usb.drive_motor(False) usb.drive_select(False) def main(argv): try: _main(argv) except USB.CmdError as error: print("Command Failed: %s" % error) if __name__ == "__main__": main(sys.argv)