# gw.py # # Greaseweazle control script. # # Written & released by Keir Fraser # # This is free and unencumbered software released into the public domain. # See the file COPYING for more details, or visit . import crcmod.predefined import sys, struct, argparse, serial from timeit import default_timer as timer from greaseweazle import version, USB # 40MHz scp_freq = 40000000 # flux_to_scp: # Converts Greaseweazle flux samples into a Supercard Pro Track. # Returns the Track Data Header (TDH) and the SCP "bitcell" array. def flux_to_scp(flux, index_times, track, nr_revs): factor = scp_freq / usb.sample_freq tdh = struct.pack("<3sB", b"TRK", track) dat = bytearray() len_at_index = rev = 0 to_index = index_times[0] rem = 0.0 for x in flux: # Are we processing initial samples before the first revolution? if rev == 0: if to_index >= x: # Discard initial samples to_index -= x continue # Now starting the first full revolution rev = 1 to_index += index_times[rev] # Does the next flux interval cross the index mark? while to_index < x: # Append to the Track Data Header for the previous full revolution tdh += struct.pack(" nr_revs: # We're done: We simply discard any surplus flux samples return tdh, dat to_index += index_times[rev] # Process the current flux sample into SCP "bitcell" format to_index -= x y = x * factor + rem val = int(round(y)) if (val & 65535) == 0: val += 1 rem = y - val while val >= 65536: dat.append(0) dat.append(0) val -= 65536 dat.append(val>>8) dat.append(val&255) # Header for last track(s) in case we ran out of flux timings. while rev <= nr_revs: tdh += struct.pack("> (nr_sides - 1) side = track & (nr_sides - 1) print("\rReading Track %u.%u..." % (cyl, side), end="") trk_offs.append(len(trk_dat)) usb.seek(cyl, side) for retry in range(1, 5): ack, index_times, enc_flux = usb.read_track(args.revs+1) if ack == USB.Ack.Okay: break elif ack == USB.Ack.FluxOverflow and retry < 5: print("Retry #%u..." % (retry)) else: raise CmdError(ack) flux = usb.decode_flux(enc_flux) tdh, dat = flux_to_scp(flux, index_times, track, args.revs) trk_dat += tdh trk_dat += dat print() csum = 0 for x in trk_dat: csum += x trk_offs_dat = bytearray() for x in trk_offs: trk_offs_dat += struct.pack("= 5: raise CmdError(ack) drive_ticks = (index_times[1] + index_times[2]) / 2 else: # Simple ratio between the Greaseweazle and SCP sample frequencies. factor = usb.sample_freq / scp_freq # Parse the SCP image header. with open(args.file, "rb") as f: dat = f.read() header = struct.unpack("<3s9BI", dat[0:16]) assert header[0] == b"SCP" trk_offs = struct.unpack("<168I", dat[16:0x2b0]) if args.single_sided: track_range = range(args.scyl, args.ecyl+1) nr_sides = 1 else: track_range = range(args.scyl*2, (args.ecyl+1)*2) nr_sides = 2 for i in track_range: cyl = i >> (nr_sides - 1) side = i & (nr_sides - 1) print("\rWriting Track %u.%u..." % (cyl, side), end="") if trk_offs[i] == 0: continue usb.seek(cyl, side) # Parse the SCP track header and extract the flux data. thdr = struct.unpack("<3sBIII", dat[trk_offs[i]:trk_offs[i]+16]) sig, _, track_ticks, samples,off = thdr assert sig == b"TRK" tdat = dat[trk_offs[i]+off:trk_offs[i]+off+samples*2] # Decode the SCP flux data into a simple list of flux times. flux = [] rem = 0.0 if args.adjust_speed: # @factor adjusts flux times for speed variations between the # read-in and write-out drives. factor = drive_ticks / track_ticks for i in range(0,len(tdat),2): x = tdat[i]*256 + tdat[i+1] if x == 0: rem += 65536.0 continue y = x * factor + rem val = int(round(y)) rem = y - val flux.append(val) # Encode the flux times for Greaseweazle, and write them out. enc_flux = usb.encode_flux(flux) for retry in range(1, 5): ack = usb.write_track(enc_flux) if ack == USB.Ack.Okay: break elif ack == USB.Ack.FluxUnderflow and retry < 5: print("Retry #%u..." % (retry)) else: raise CmdError(ack) print() # update_firmware: # Updates the Greaseweazle firmware using the specified Update File. def update_firmware(args): # Check that an update operation was actually requested. if args.action != "update": print("Greaseweazle is in Firmware Update Mode:") print(" The only available action is \"update \"") if usb.update_jumpered: print(" Remove the Update Jumper for normal operation") else: print(" Main firmware is erased: You *must* perform an update!") return # Check that the firmware is actually in update mode. if not usb.update_mode: print("Greaseweazle is in Normal Mode:") print(" To \"update\" you must install the Update Jumper") return # Read and check the update file. with open(args.file, "rb") as f: dat = f.read() sig, maj, min, pad1, pad2, crc = struct.unpack(">2s4BH", dat[-8:]) if len(dat) & 3 != 0 or sig != b'GW' or pad1 != 0 or pad2 != 0: print("%s: Bad update file" % (args.file)) return crc16 = crcmod.predefined.Crc('crc-ccitt-false') crc16.update(dat) if crc16.crcValue != 0: print("%s: Bad CRC" % (args.file)) # Perform the update. print("Updating to v%u.%u..." % (maj, min)) ack = usb.update_firmware(dat) if ack != 0: print("** UPDATE FAILED: Please retry!") return print("Done.") print("** Disconnect Greaseweazle and remove the Programming Jumper.") # _main: # Argument processing and dispatch. def _main(argv): actions = { "read" : read_to_scp, "write" : write_from_scp, "update" : update_firmware } parser = argparse.ArgumentParser( formatter_class=argparse.ArgumentDefaultsHelpFormatter) parser.add_argument("action") parser.add_argument("--revs", type=int, default=3, help="number of revolutions to read per track") parser.add_argument("--scyl", type=int, default=0, help="first cylinder to read/write") parser.add_argument("--ecyl", type=int, default=81, help="last cylinder to read/write") parser.add_argument("--single-sided", action="store_true", help="read/write a single-sided image") parser.add_argument("--adjust-speed", action="store_true", help="adjust write-flux times for drive speed") parser.add_argument("file", help="in/out filename") parser.add_argument("device", help="serial device") args = parser.parse_args(argv[1:]) if not args.action in actions: print("** Action \"%s\" is not recognised" % args.action) print("Valid actions: ", end="") print(", ".join(str(key) for key in actions.keys())) return global usb usb = USB.Unit(serial.Serial(args.device)) print("** %s v%u.%u, Host Tools v%u.%u" % (("Greaseweazle","Bootloader")[usb.update_mode], usb.major, usb.minor, version.major, version.minor)) if args.action == "update" or usb.update_mode: return actions[args.action](args) elif usb.update_needed: print("Firmware is out of date: Require v%u.%u" % (version.major, version.minor)) print("Install the Update Jumper and \"update \"") return #usb.step_delay = 5000 #print("Select Delay: %uus" % usb.select_delay) #print("Step Delay: %uus" % usb.step_delay) #print("Settle Time: %ums" % usb.seek_settle_delay) #print("Motor Delay: %ums" % usb.motor_delay) #print("Auto Off: %ums" % usb.auto_off_delay) try: usb.drive_select(True) usb.drive_motor(True) actions[args.action](args) except KeyboardInterrupt: print() usb.reset() usb.ser.close() usb.ser.open() finally: usb.drive_motor(False) usb.drive_select(False) def main(argv): try: _main(argv) except USB.CmdError as error: print("Command Failed: %s" % error) if __name__ == "__main__": main(sys.argv) # Local variables: # python-indent: 4 # End: