# gw.py # # Greaseweazle control script. # # Written & released by Keir Fraser # # This is free and unencumbered software released into the public domain. # See the file COPYING for more details, or visit . import crcmod.predefined import sys, struct, argparse, serial, collections from timeit import default_timer as timer from greaseweazle import version # 40MHz scp_freq = 40000000 BAUD_CLEAR_COMMS = 10000 BAUD_NORMAL = 9600 CMD_GET_INFO = 0 CMD_SEEK = 1 CMD_SIDE = 2 CMD_SET_PARAMS = 3 CMD_GET_PARAMS = 4 CMD_MOTOR = 5 CMD_READ_FLUX = 6 CMD_WRITE_FLUX = 7 CMD_GET_FLUX_STATUS = 8 CMD_GET_INDEX_TIMES = 9 CMD_SELECT = 10 # Bootloader-specific: CMD_UPDATE = 1 ACK_OKAY = 0 ACK_BAD_COMMAND = 1 ACK_NO_INDEX = 2 ACK_NO_TRK0 = 3 ACK_FLUX_OVERFLOW = 4 ACK_FLUX_UNDERFLOW = 5 ACK_WRPROT = 6 ACK_MAX = 6 # CMD_{GET,SET}_PARAMS: PARAMS_DELAYS = 0 ack_str = [ "Okay", "Bad Command", "No Index", "Track 0 not found", "Flux Overflow", "Flux Underflow", "Disk is Write Protected" ] class CmdError(Exception): def __init__(self, cmd, code): self.cmd = cmd self.code = code def __str__(self): if self.code <= ACK_MAX: return ack_str[self.code] return "Unknown Error (%u)" % self.code def send_cmd(cmd): ser.write(cmd) (c,r) = struct.unpack("2B", ser.read(2)) assert c == cmd[0] if r != 0: raise CmdError(c,r) def get_fw_info(): send_cmd(struct.pack("4B", CMD_GET_INFO, 4, 0, 8)) return struct.unpack("<4BI", ser.read(8)) def print_fw_info(info): (major, minor, max_index, max_cmd, freq) = info print("Greaseweazle v%u.%u" % (major, minor)) print("Max index timings %u" % (max_index)) print("Max cmd %u" % (max_cmd)) print("Sample frequency: %.2f MHz" % (freq / 1000000)) def seek(cyl, side): send_cmd(struct.pack("3B", CMD_SEEK, 3, cyl)) send_cmd(struct.pack("3B", CMD_SIDE, 3, side)) def get_delays(): send_cmd(struct.pack("4B", CMD_GET_PARAMS, 4, PARAMS_DELAYS, 5*2)) return struct.unpack("<5H", ser.read(5*2)) def print_delays(x): (select_delay, step_delay, seek_settle, motor_delay, auto_off) = x print("Select Delay: %uus" % select_delay) print("Step Delay: %uus" % step_delay) print("Settle Time: %ums" % seek_settle) print("Motor Delay: %ums" % motor_delay) print("Auto Off: %ums" % auto_off) def set_delays(seek_delay = None, step_delay = None, seek_settle = None, motor_delay = None, auto_off = None): (_seek_delay, _step_delay, _seek_settle, _motor_delay, _auto_off) = get_delays() if not seek_delay: seek_delay = _seek_delay if not step_delay: step_delay = _step_delay if not seek_settle: seek_settle = _seek_settle if not motor_delay: motor_delay = _motor_delay if not auto_off: auto_off = _auto_off send_cmd(struct.pack("<3B5H", CMD_SET_PARAMS, 3+5*2, PARAMS_DELAYS, seek_delay, step_delay, seek_settle, motor_delay, auto_off)) def drive_select(state): send_cmd(struct.pack("3B", CMD_SELECT, 3, int(state))) def drive_motor(state): send_cmd(struct.pack("3B", CMD_MOTOR, 3, int(state))) def get_index_times(nr): send_cmd(struct.pack("4B", CMD_GET_INDEX_TIMES, 4, 0, nr)) x = struct.unpack("<%dI" % nr, ser.read(4*nr)) return x def print_index_times(index_times): for ticks in index_times: print("%u ticks" % (ticks)) # write_flux: # Write the current track via Greaseweazle with the specified flux timings. def write_flux(flux): start = timer() x = bytearray() for val in flux: if val == 0: pass elif val < 250: x.append(val) else: high = val // 250 if high <= 5: x.append(249+high) x.append(1 + val%250) else: x.append(255) x.append(1 | (val<<1) & 255) x.append(1 | (val>>6) & 255) x.append(1 | (val>>13) & 255) x.append(1 | (val>>20) & 255) x.append(0) # End of Stream end = timer() #print("%u flux -> %u bytes in %f seconds" % (len(flux), len(x), end-start)) retry = 0 while True: start = timer() send_cmd(struct.pack("<2BIB", CMD_WRITE_FLUX, 7, 0, 1)) ser.write(x) ser.read(1) # Sync with Greaseweazle try: send_cmd(struct.pack("2B", CMD_GET_FLUX_STATUS, 2)) except CmdError as error: if error.code == ACK_FLUX_UNDERFLOW and retry < 5: retry += 1 print("Retry #%u..." % retry) continue; raise end = timer() #print("Track written in %f seconds" % (end-start)) break # read_flux: # Read flux timings from Greaseweazle for the current track. def read_flux(nr_idx): # Read the encoded flux stream, retrying if necessary. retry = 0 while True: start = timer() x = collections.deque() send_cmd(struct.pack("3B", CMD_READ_FLUX, 3, nr_idx)) nr = 0 while True: x += ser.read(1) x += ser.read(ser.in_waiting) nr += 1; if x[-1] == 0: break try: send_cmd(struct.pack("2B", CMD_GET_FLUX_STATUS, 2)) except CmdError as error: if error.code == ACK_FLUX_OVERFLOW and retry < 5: retry += 1 print("Retry #%u..." % retry) del x continue; raise end = timer() break #print("Read %u bytes in %u batches in %f seconds" % (len(x), nr, end-start)) # Decode the flux stream into a list of flux samples. start = timer() y = [] while x: i = x.popleft() if i < 250: y.append(i) elif i == 255: val = (x.popleft() & 254) >> 1 val += (x.popleft() & 254) << 6 val += (x.popleft() & 254) << 13 val += (x.popleft() & 254) << 20 y.append(val) else: val = (i - 249) * 250 val += x.popleft() - 1 y.append(val) assert y[-1] == 0 y = y[:-1] end = timer() #print("Processed %u flux values in %f seconds" % (len(y), end-start)) return y # flux_to_scp: # Converts Greaseweazle flux samples into a Supercard Pro Track. # Returns the Track Data Header (TDH) and the SCP "bitcell" array. def flux_to_scp(flux, track, nr_revs): factor = scp_freq / sample_freq index_times = get_index_times(nr_revs+1) tdh = struct.pack("<3sB", b"TRK", track) dat = bytearray() len_at_index = rev = 0 to_index = index_times[0] rem = 0.0 for x in flux: # Are we processing initial samples before the first revolution? if rev == 0: if to_index >= x: # Discard initial samples to_index -= x continue # Now starting the first full revolution rev = 1 to_index += index_times[rev] # Does the next flux interval cross the index mark? while to_index < x: # Append to the Track Data Header for the previous full revolution tdh += struct.pack(" nr_revs: # We're done: We simply discard any surplus flux samples return (tdh, dat) to_index += index_times[rev] # Process the current flux sample into SCP "bitcell" format to_index -= x y = x * factor + rem val = int(round(y)) if (val & 65535) == 0: val += 1 rem = y - val while val >= 65536: dat.append(0) dat.append(0) val -= 65536 dat.append(val>>8) dat.append(val&255) # Header for last track(s) in case we ran out of flux timings. while rev <= nr_revs: tdh += struct.pack("> (nr_sides - 1) side = track & (nr_sides - 1) print("\rReading Track %u.%u..." % (cyl, side), end="") trk_offs.append(len(trk_dat)) seek(cyl, side) flux = read_flux(args.revs+1) (tdh, dat) = flux_to_scp(flux, track, args.revs) trk_dat += tdh trk_dat += dat print() csum = 0 for x in trk_dat: csum += x trk_offs_dat = bytearray() for x in trk_offs: trk_offs_dat += struct.pack("> (nr_sides - 1) side = i & (nr_sides - 1) print("\rWriting Track %u.%u..." % (cyl, side), end="") if trk_offs[i] == 0: continue seek(cyl, side) thdr = struct.unpack("<3sBIII", dat[trk_offs[i]:trk_offs[i]+16]) (sig,_,_,samples,off) = thdr assert sig == b"TRK" tdat = dat[trk_offs[i]+off:trk_offs[i]+off+samples*2] flux = [] rem = 0.0 for i in range(0,len(tdat),2): x = tdat[i]*256 + tdat[i+1] if x == 0: rem += 65536.0 continue y = x * factor + rem val = int(round(y)) rem = y - val flux.append(val) write_flux(flux) print() # update_firmware: # Updates the Greaseweazle firmware using the specified Update File. def update_firmware(args): with open(args.file, "rb") as f: dat = f.read() (sig, maj, min, pad1, pad2, crc) = struct.unpack(">2s4BH", dat[-8:]) if len(dat) & 3 != 0 or sig != b'GW' or pad1 != 0 or pad2 != 0: print("%s: Bad update file" % (args.file)) return crc16 = crcmod.predefined.Crc('crc-ccitt-false') crc16.update(dat) if crc16.crcValue != 0: print("%s: Bad CRC" % (args.file)) print("Updating to v%u.%u..." % (maj, min)) send_cmd(struct.pack("<2BI", CMD_UPDATE, 6, len(dat))) ser.write(dat) (ack,) = struct.unpack("B", ser.read(1)) if ack != 0: print("** UPDATE FAILED: Please retry!") return print("Done.") print("** Disconnect Greaseweazle and remove the Programming Jumper.") # _main: # Argument processing and dispatch. def _main(argv): actions = { "read" : read_to_scp, "write" : write_from_scp, "update" : update_firmware } parser = argparse.ArgumentParser( formatter_class=argparse.ArgumentDefaultsHelpFormatter) parser.add_argument("action") parser.add_argument("--revs", type=int, default=3, help="number of revolutions to read per track") parser.add_argument("--scyl", type=int, default=0, help="first cylinder to read/write") parser.add_argument("--ecyl", type=int, default=81, help="last cylinder to read/write") parser.add_argument("--single-sided", action="store_true") parser.add_argument("file", help="in/out filename") parser.add_argument("device", help="serial device") args = parser.parse_args(argv[1:]) if not args.action in actions: print("** Action \"%s\" is not recognised" % args.action) print("Valid actions: ", end="") print(", ".join(str(key) for key in actions.keys())) return global ser ser = serial.Serial(args.device) ser.baudrate = BAUD_CLEAR_COMMS ser.baudrate = BAUD_NORMAL ser.reset_input_buffer() global sample_freq info = get_fw_info() sample_freq = info[4] update_mode = (info[2] == 0) print("** %s v%u.%u, Host Tools v%u.%u" % (("Greaseweazle","Bootloader")[update_mode], info[0], info[1], version.major, version.minor)) if (not update_mode and (version.major > info[0] or (version.major == info[0] and version.minor > info[1]))): print("Firmware is out of date: Require >= v%u.%u" % (version.major, version.minor)) print("Install the Update Jumper and \"update \"") return if update_mode and args.action != "update": print("Greaseweazle is in Firmware Update Mode:") print(" The only available action is \"update \"") if info[4] & 1: print(" Remove the Update Jumper for normal operation") else: print(" Main firmware is erased: You *must* perform an update!") return if not update_mode and args.action == "update": print("Greaseweazle is in Normal Mode:") print(" To \"update\" you must install the Update Jumper") return set_delays(step_delay=5000) print_delays(get_delays()) if not update_mode: drive_select(True) drive_motor(True) try: actions[args.action](args) except: if not update_mode: ser.reset_output_buffer() ser.baudrate = BAUD_CLEAR_COMMS ser.baudrate = BAUD_NORMAL ser.reset_input_buffer() drive_motor(False) drive_select(False) raise else: if not update_mode: drive_motor(False) drive_select(False) def main(argv): try: _main(argv) except CmdError as error: print("Command Failed: %s" % error) if __name__ == "__main__": main(sys.argv)