util.py 12 KB


  1. # greaseweazle/tools/util.py
  2. #
  3. # Greaseweazle control script: Utility functions.
  4. #
  5. # Written & released by Keir Fraser <keir.xen@gmail.com>
  6. #
  7. # This is free and unencumbered software released into the public domain.
  8. # See the file COPYING for more details, or visit <http://unlicense.org>.
  9. import argparse, os, sys, serial, struct, time, re, platform
  10. import importlib
  11. import serial.tools.list_ports
  12. from collections import OrderedDict
  13. from greaseweazle import version
  14. from greaseweazle import error
  15. from greaseweazle import usb as USB
  16. class CmdlineHelpFormatter(argparse.ArgumentDefaultsHelpFormatter,
  17. argparse.RawDescriptionHelpFormatter):
  18. def _get_help_string(self, action):
  19. help = action.help
  20. if '%no_default' in help:
  21. return help.replace('%no_default', '')
  22. if ('%(default)' in help
  23. or action.default is None
  24. or action.default is False
  25. or action.default is argparse.SUPPRESS):
  26. return help
  27. return help + ' (default: %(default)s)'
  28. class ArgumentParser(argparse.ArgumentParser):
  29. def __init__(self, formatter_class=CmdlineHelpFormatter, *args, **kwargs):
  30. return super().__init__(formatter_class=formatter_class,
  31. *args, **kwargs)
  32. def drive_letter(letter):
  33. types = {
  34. 'A': (USB.BusType.IBMPC, 0),
  35. 'B': (USB.BusType.IBMPC, 1),
  36. '0': (USB.BusType.Shugart, 0),
  37. '1': (USB.BusType.Shugart, 1),
  38. '2': (USB.BusType.Shugart, 2)
  39. }
  40. if not letter.upper() in types:
  41. raise argparse.ArgumentTypeError("invalid drive letter: '%s'" % letter)
  42. return types[letter.upper()]
  43. def range_str(l):
  44. if len(l) == 0:
  45. return '<none>'
  46. p, str = None, ''
  47. for i in l:
  48. if p is not None and i == p[1]+1:
  49. p = p[0], i
  50. continue
  51. if p is not None:
  52. str += ('%d,' % p[0]) if p[0] == p[1] else ('%d-%d,' % p)
  53. p = (i,i)
  54. if p is not None:
  55. str += ('%d' % p[0]) if p[0] == p[1] else ('%d-%d' % p)
  56. return str
  57. class TrackSet:
  58. class TrackIter:
  59. """Iterate over a TrackSet in physical <cyl,head> order."""
  60. def __init__(self, ts):
  61. l = []
  62. for c in ts.cyls:
  63. for h in ts.heads:
  64. pc = c*ts.step + ts.h_off[h]
  65. l.append((pc, h, c))
  66. l.sort()
  67. self.l = iter(l)
  68. def __next__(self):
  69. self.physical_cyl, self.head, self.cyl = next(self.l)
  70. return self
  71. def __init__(self, trackspec):
  72. self.cyls = list()
  73. self.heads = list()
  74. self.h_off = [0]*2
  75. self.step = 1
  76. self.trackspec = ''
  77. self.update_from_trackspec(trackspec)
  78. def update_from_trackspec(self, trackspec):
  79. """Update a TrackSet based on a trackspec."""
  80. self.trackspec += trackspec
  81. for x in trackspec.split(':'):
  82. k,v = x.split('=')
  83. if k == 'c':
  84. cyls = [False]*100
  85. for crange in v.split(','):
  86. m = re.match('(\d\d?)(-(\d\d?))?$', crange)
  87. if m is None: raise ValueError()
  88. if m.group(3) is None:
  89. s,e = int(m.group(1)), int(m.group(1))
  90. else:
  91. s,e = int(m.group(1)), int(m.group(3))
  92. for c in range(s, e+1):
  93. cyls[c] = True
  94. self.cyls = []
  95. for c in range(len(cyls)):
  96. if cyls[c]: self.cyls.append(c)
  97. elif k == 'h':
  98. heads = [False]*2
  99. for hrange in v.split(','):
  100. m = re.match('([01])(-([01]))?$', hrange)
  101. if m is None: raise ValueError()
  102. if m.group(3) is None:
  103. s,e = int(m.group(1)), int(m.group(1))
  104. else:
  105. s,e = int(m.group(1)), int(m.group(3))
  106. for h in range(s, e+1):
  107. heads[h] = True
  108. self.heads = []
  109. for h in range(len(heads)):
  110. if heads[h]: self.heads.append(h)
  111. elif re.match('h[01].off$', k):
  112. h = int(re.match('h([01]).off$', k).group(1))
  113. m = re.match('([+-][\d])$', v)
  114. if m is None: raise ValueError()
  115. self.h_off[h] = int(m.group(1))
  116. elif k == 'step':
  117. self.step = int(v)
  118. if self.step <= 0: raise ValueError()
  119. else:
  120. raise ValueError()
  121. def __str__(self):
  122. s = 'c=%s' % range_str(self.cyls)
  123. s += ':h=%s' % range_str(self.heads)
  124. for i in range(len(self.h_off)):
  125. x = self.h_off[i]
  126. if x != 0:
  127. s += ':h%d.off=%s%d' % (i, '+' if x >= 0 else '', x)
  128. if self.step != 1: s += ':step=%d' % self.step
  129. return s
  130. def __iter__(self):
  131. return self.TrackIter(self)
  132. def split_opts(seq):
  133. """Splits a name from its list of options."""
  134. parts = seq.split('::')
  135. name, opts = parts[0], dict()
  136. for x in map(lambda x: x.split(':'), parts[1:]):
  137. for y in x:
  138. try:
  139. opt, val = y.split('=')
  140. except ValueError:
  141. opt, val = y, True
  142. if opt:
  143. opts[opt] = val
  144. return name, opts
  145. image_types = OrderedDict(
  146. { '.adf': 'ADF',
  147. '.ads': ('ADS','acorn'),
  148. '.adm': ('ADM','acorn'),
  149. '.adl': ('ADL','acorn'),
  150. '.d81': 'D81',
  151. '.dsd': ('DSD','acorn'),
  152. '.dsk': 'EDSK',
  153. '.hfe': 'HFE',
  154. '.ima': 'IMG',
  155. '.img': 'IMG',
  156. '.ipf': 'IPF',
  157. '.raw': 'KryoFlux',
  158. '.scp': 'SCP',
  159. '.ssd': ('SSD','acorn'),
  160. '.st' : 'IMG' })
  161. def get_image_class(name):
  162. if os.path.isdir(name):
  163. typespec = 'KryoFlux'
  164. else:
  165. _, ext = os.path.splitext(name)
  166. error.check(ext.lower() in image_types,
  167. """\
  168. %s: Unrecognised file suffix '%s'
  169. Known suffixes: %s"""
  170. % (name, ext, ', '.join(image_types)))
  171. typespec = image_types[ext.lower()]
  172. if isinstance(typespec, tuple):
  173. typename, classname = typespec
  174. else:
  175. typename, classname = typespec, typespec.lower()
  176. mod = importlib.import_module('greaseweazle.image.' + classname)
  177. return mod.__dict__[typename]
  178. def with_drive_selected(fn, usb, args, *_args, **_kwargs):
  179. usb.set_bus_type(args.drive[0])
  180. try:
  181. usb.drive_select(args.drive[1])
  182. usb.drive_motor(args.drive[1], _kwargs.pop('motor', True))
  183. fn(usb, args, *_args, **_kwargs)
  184. except KeyboardInterrupt:
  185. print()
  186. usb.reset()
  187. raise
  188. finally:
  189. usb.drive_motor(args.drive[1], False)
  190. usb.drive_deselect()
  191. def valid_ser_id(ser_id):
  192. return ser_id and ser_id.upper().startswith("GW")
  193. def score_port(x, old_port=None):
  194. score = 0
  195. if x.manufacturer == "Keir Fraser" and x.product == "Greaseweazle":
  196. score = 20
  197. elif x.vid == 0x1209 and x.pid == 0x4d69:
  198. # Our very own properly-assigned PID. Guaranteed to be us.
  199. score = 20
  200. elif x.vid == 0x1209 and x.pid == 0x0001:
  201. # Our old shared Test PID. It's not guaranteed to be us.
  202. score = 10
  203. if score > 0 and valid_ser_id(x.serial_number):
  204. # A valid serial id is a good sign unless this is a reopen, and
  205. # the serials don't match!
  206. if not old_port or not valid_ser_id(old_port.serial_number):
  207. score = 20
  208. elif x.serial_number == old_port.serial_number:
  209. score = 30
  210. else:
  211. score = 0
  212. if old_port and old_port.location:
  213. # If this is a reopen, location field must match. A match is not
  214. # sufficient in itself however, as Windows may supply the same
  215. # location for multiple USB ports (this may be an interaction with
  216. # BitDefender). Hence we do not increase the port's score here.
  217. if not x.location or x.location != old_port.location:
  218. score = 0
  219. return score
  220. def find_port(old_port=None):
  221. best_score, best_port = 0, None
  222. for x in serial.tools.list_ports.comports():
  223. score = score_port(x, old_port)
  224. if score > best_score:
  225. best_score, best_port = score, x
  226. if best_port:
  227. return best_port.device
  228. raise serial.SerialException('Cannot find the Greaseweazle device')
  229. def port_info(devname):
  230. for x in serial.tools.list_ports.comports():
  231. if x.device == devname:
  232. return x
  233. return None
  234. def usb_reopen(usb, is_update):
  235. mode = { False: 1, True: 0 }
  236. try:
  237. usb.switch_fw_mode(mode[is_update])
  238. except (serial.SerialException, struct.error):
  239. # Mac and Linux raise SerialException ("... returned no data")
  240. # Win10 pyserial returns a short read which fails struct.unpack
  241. pass
  242. usb.ser.close()
  243. for i in range(10):
  244. time.sleep(0.5)
  245. try:
  246. devicename = find_port(usb.port_info)
  247. new_ser = serial.Serial(devicename)
  248. except serial.SerialException:
  249. # Device not found
  250. pass
  251. else:
  252. new_usb = USB.Unit(new_ser)
  253. new_usb.port_info = port_info(devicename)
  254. new_usb.jumperless_update = usb.jumperless_update
  255. return new_usb
  256. raise serial.SerialException('Could not reopen port after mode switch')
  257. def print_update_instructions(usb):
  258. print("To perform an Update:")
  259. if not usb.jumperless_update:
  260. print(" - Disconnect from USB")
  261. print(" - Install the Update Jumper at pins %s"
  262. % ("RXI-TXO" if usb.hw_model != 1 else "DCLK-GND"))
  263. print(" - Reconnect to USB")
  264. print(" - Run \"gw update\" to install firmware v%u.%u" %
  265. (version.major, version.minor))
  266. def usb_open(devicename, is_update=False, mode_check=True):
  267. if devicename is None:
  268. devicename = find_port()
  269. usb = USB.Unit(serial.Serial(devicename))
  270. usb.port_info = port_info(devicename)
  271. is_win7 = (platform.system() == 'Windows' and platform.release() == '7')
  272. usb.jumperless_update = ((usb.hw_model, usb.hw_submodel) != (1, 0)
  273. and not is_win7)
  274. if not mode_check:
  275. return usb
  276. if usb.update_mode and not is_update:
  277. if usb.jumperless_update and not usb.update_jumpered:
  278. usb = usb_reopen(usb, is_update)
  279. if not usb.update_mode:
  280. return usb
  281. print("ERROR: Greaseweazle is in Firmware Update Mode")
  282. print(" - The only available action is \"gw update\"")
  283. if usb.update_jumpered:
  284. print(" - For normal operation disconnect from USB and remove "
  285. "the Update Jumper at pins %s"
  286. % ("RXI-TXO" if usb.hw_model != 1 else "DCLK-GND"))
  287. else:
  288. print(" - Main firmware is erased: You *must* perform an update!")
  289. sys.exit(1)
  290. if is_update and not usb.update_mode:
  291. if usb.jumperless_update:
  292. usb = usb_reopen(usb, is_update)
  293. error.check(usb.update_mode, """\
  294. Greaseweazle F7 did not change to Firmware Update Mode as requested.
  295. If the problem persists, install the Update Jumper at pins RXI-TXO.""")
  296. return usb
  297. print("ERROR: Greaseweazle is not in Firmware Update Mode")
  298. print_update_instructions(usb)
  299. sys.exit(1)
  300. if not usb.update_mode and usb.update_needed:
  301. print("ERROR: Greaseweazle firmware v%u.%u is unsupported"
  302. % (usb.major, usb.minor))
  303. print_update_instructions(usb)
  304. sys.exit(1)
  305. return usb
  306. # Local variables:
  307. # python-indent: 4
  308. # End: